写入自定义数组容器

Numpy版本v1.16中引入的Numpy调度机制是编写与numpyapi兼容并提供Numpy功能的自定义实现的自定义N维数组容器的推荐方法。应用包括 dask 数组,分布在多个节点上的N维数组,以及 cupy 数组,GPU上的N维数组。

为了了解如何编写自定义数组容器,我们将从一个简单的示例开始,该示例的实用程序相当狭窄,但说明了所涉及的概念。

>>> import numpy as np
>>> class DiagonalArray:
...     def __init__(self, N, value):
...         self._N = N
...         self._i = value
...     def __repr__(self):
...         return f"{self.__class__.__name__}(N={self._N}, value={self._i})"
...     def __array__(self):
...         return self._i * np.eye(self._N)

我们的自定义数组可以实例化为:

>>> arr = DiagonalArray(5, 1)
>>> arr
DiagonalArray(N=5, value=1)

我们可以使用 numpy.arraynumpy.asarray ,将其称为 __array__ 获取标准的方法 numpy.ndarray .

>>> np.asarray(arr)
array([[1., 0., 0., 0., 0.],
       [0., 1., 0., 0., 0.],
       [0., 0., 1., 0., 0.],
       [0., 0., 0., 1., 0.],
       [0., 0., 0., 0., 1.]])

如果我们在 arr 使用numpy函数,numpy将再次使用 __array__ 接口将其转换为数组,然后按常规方式应用函数。

>>> np.multiply(arr, 2)
array([[2., 0., 0., 0., 0.],
       [0., 2., 0., 0., 0.],
       [0., 0., 2., 0., 0.],
       [0., 0., 0., 2., 0.],
       [0., 0., 0., 0., 2.]])

请注意,返回类型是标准的 numpy.ndarray .

>>> type(arr)
numpy.ndarray

如何通过这个函数传递自定义数组类型?Numpy允许类指示它希望通过接口以自定义的方式处理计算 __array_ufunc____array_function__ . 我们一次拿一个,从 _array_ufunc__ . 此方法包括 通用函数 (ufunc ) ,一类函数,包括,例如, numpy.multiplynumpy.sin .

这个 __array_ufunc__ 接收:

  • ufunc, a function like numpy.multiply

  • method ,一个字符串,区分 numpy.multiply(...) 以及类似 numpy.multiply.outernumpy.multiply.accumulate ,等等。一般情况下, numpy.multiply(...)method == '__call__' .

  • inputs ,可能是不同类型的混合体

  • kwargs ,传递给函数的关键字参数

对于这个例子,我们只处理这个方法 __call__

>>> from numbers import Number
>>> class DiagonalArray:
...     def __init__(self, N, value):
...         self._N = N
...         self._i = value
...     def __repr__(self):
...         return f"{self.__class__.__name__}(N={self._N}, value={self._i})"
...     def __array__(self):
...         return self._i * np.eye(self._N)
...     def __array_ufunc__(self, ufunc, method, *inputs, **kwargs):
...         if method == '__call__':
...             N = None
...             scalars = []
...             for input in inputs:
...                 if isinstance(input, Number):
...                     scalars.append(input)
...                 elif isinstance(input, self.__class__):
...                     scalars.append(input._i)
...                     if N is not None:
...                         if N != self._N:
...                             raise TypeError("inconsistent sizes")
...                     else:
...                         N = self._N
...                 else:
...                     return NotImplemented
...             return self.__class__(N, ufunc(*scalars, **kwargs))
...         else:
...             return NotImplemented

现在我们的自定义数组类型通过numpy函数。

>>> arr = DiagonalArray(5, 1)
>>> np.multiply(arr, 3)
DiagonalArray(N=5, value=3)
>>> np.add(arr, 3)
DiagonalArray(N=5, value=4)
>>> np.sin(arr)
DiagonalArray(N=5, value=0.8414709848078965)

在这一点上 arr + 3 不工作。

>>> arr + 3
TypeError: unsupported operand type(s) for *: 'DiagonalArray' and 'int'

定义Python所需要的接口 __add____lt__ ,依此类推以分派到相应的ufunc。我们可以通过从mixin继承来方便地实现这一点 NDArrayOperatorsMixin .

>>> import numpy.lib.mixins
>>> class DiagonalArray(numpy.lib.mixins.NDArrayOperatorsMixin):
...     def __init__(self, N, value):
...         self._N = N
...         self._i = value
...     def __repr__(self):
...         return f"{self.__class__.__name__}(N={self._N}, value={self._i})"
...     def __array__(self):
...         return self._i * np.eye(self._N)
...     def __array_ufunc__(self, ufunc, method, *inputs, **kwargs):
...         if method == '__call__':
...             N = None
...             scalars = []
...             for input in inputs:
...                 if isinstance(input, Number):
...                     scalars.append(input)
...                 elif isinstance(input, self.__class__):
...                     scalars.append(input._i)
...                     if N is not None:
...                         if N != self._N:
...                             raise TypeError("inconsistent sizes")
...                     else:
...                         N = self._N
...                 else:
...                     return NotImplemented
...             return self.__class__(N, ufunc(*scalars, **kwargs))
...         else:
...             return NotImplemented
>>> arr = DiagonalArray(5, 1)
>>> arr + 3
DiagonalArray(N=5, value=4)
>>> arr > 0
DiagonalArray(N=5, value=True)

现在我们来解决这个问题 __array_function__ . 我们将创建将numpy函数映射到自定义变体的dict。

>>> HANDLED_FUNCTIONS = {}
>>> class DiagonalArray(numpy.lib.mixins.NDArrayOperatorsMixin):
...     def __init__(self, N, value):
...         self._N = N
...         self._i = value
...     def __repr__(self):
...         return f"{self.__class__.__name__}(N={self._N}, value={self._i})"
...     def __array__(self):
...         return self._i * np.eye(self._N)
...     def __array_ufunc__(self, ufunc, method, *inputs, **kwargs):
...         if method == '__call__':
...             N = None
...             scalars = []
...             for input in inputs:
...                 # In this case we accept only scalar numbers or DiagonalArrays.
...                 if isinstance(input, Number):
...                     scalars.append(input)
...                 elif isinstance(input, self.__class__):
...                     scalars.append(input._i)
...                     if N is not None:
...                         if N != self._N:
...                             raise TypeError("inconsistent sizes")
...                     else:
...                         N = self._N
...                 else:
...                     return NotImplemented
...             return self.__class__(N, ufunc(*scalars, **kwargs))
...         else:
...             return NotImplemented
...    def __array_function__(self, func, types, args, kwargs):
...        if func not in HANDLED_FUNCTIONS:
...            return NotImplemented
...        # Note: this allows subclasses that don't override
...        # __array_function__ to handle DiagonalArray objects.
...        if not all(issubclass(t, self.__class__) for t in types):
...            return NotImplemented
...        return HANDLED_FUNCTIONS[func](*args, **kwargs)
...

一个方便的模式是定义一个装饰器 implements 可用于将函数添加到 HANDLED_FUNCTIONS .

>>> def implements(np_function):
...    "Register an __array_function__ implementation for DiagonalArray objects."
...    def decorator(func):
...        HANDLED_FUNCTIONS[np_function] = func
...        return func
...    return decorator
...

现在我们为编写numpy函数的实现 DiagonalArray . 为了完整性,支持 arr.sum() 添加方法 sum 那个电话 numpy.sum(self) ,和 mean .

>>> @implements(np.sum)
... def sum(arr):
...     "Implementation of np.sum for DiagonalArray objects"
...     return arr._i * arr._N
...
>>> @implements(np.mean)
... def mean(arr):
...     "Implementation of np.mean for DiagonalArray objects"
...     return arr._i / arr._N
...
>>> arr = DiagonalArray(5, 1)
>>> np.sum(arr)
5
>>> np.mean(arr)
0.2

如果用户尝试使用未包含在 HANDLED_FUNCTIONS ,A TypeError 将由numpy引发,表示不支持此操作。例如,连接两个 DiagonalArrays 不会生成另一个对角数组,因此不支持它。

>>> np.concatenate([arr, arr])
TypeError: no implementation found for 'numpy.concatenate' on types that implement __array_function__: [<class '__main__.DiagonalArray'>]

此外,我们的 summean 不要接受numpy实现所做的可选参数。

>>> np.sum(arr, axis=0)
TypeError: sum() got an unexpected keyword argument 'axis'

用户始终可以选择转换为普通 numpy.ndarray 具有 numpy.asarray 从那里开始使用标准numpy。

>>> np.concatenate([np.asarray(arr), np.asarray(arr)])
array([[1., 0., 0., 0., 0.],
       [0., 1., 0., 0., 0.],
       [0., 0., 1., 0., 0.],
       [0., 0., 0., 1., 0.],
       [0., 0., 0., 0., 1.],
       [1., 0., 0., 0., 0.],
       [0., 1., 0., 0., 0.],
       [0., 0., 1., 0., 0.],
       [0., 0., 0., 1., 0.],
       [0., 0., 0., 0., 1.]])

参考 dask source codecupy source code 有关更完整的自定义数组容器示例。

也见 NEP 18 .