Source code for tensorcircuit.backends.cupy_backend

"""
CuPy backend. Not in the tensornetwork package and highly experimental.
"""
# pylint: disable=invalid-name

import logging
import warnings
from typing import Any, Callable, Optional, Sequence, Tuple, Union

import numpy as np
import scipy

try:  # old version tn compatibility
    from tensornetwork.backends import base_backend

    tnbackend = base_backend.BaseBackend

except ImportError:
    from tensornetwork.backends import abstract_backend

    tnbackend = abstract_backend.AbstractBackend


from .abstract_backend import ExtendedBackend

logger = logging.getLogger(__name__)

dtypestr: str
npdtype: Any
Tensor = Any

cp: Any
cpx: Any


class CuPyBackend(tnbackend, ExtendedBackend):  # type: ignore
    def __init__(self) -> None:
        super().__init__()
        try:
            import cupy
            import cupyx
        except ImportError:
            raise ImportError(
                "CuPy not installed, please switch to a different "
                "backend or install CuPy."
            )

        global cp
        global cpx
        cp = cupy
        cpx = cupyx
        self.name = "cupy"
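
    # Hedged usage sketch (not part of this module): in tensorcircuit the
    # backend is typically selected globally, e.g.
    #
    #     import tensorcircuit as tc
    #     K = tc.set_backend("cupy")  # should return this backend instance
    #
    # Direct instantiation also works whenever CuPy itself imports cleanly:
    #
    #     K = CuPyBackend()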

    def convert_to_tensor(self, a: Tensor) -> Tensor:
        if not isinstance(a, cp.ndarray) and not cp.isscalar(a):
            a = cp.array(a)
        a = cp.asarray(a)
        return a

    def sum(
        self: Any,
        a: Tensor,
        axis: Optional[Sequence[int]] = None,
        keepdims: bool = False,
    ) -> Tensor:
        return cp.sum(a, axis=axis, keepdims=keepdims)

    def conj(self, tensor: Tensor) -> Tensor:
        return tensor.conj()

    def sign(self, tensor: Tensor) -> Tensor:
        return cp.sign(tensor)

    def multiply(self, tensor1: Tensor, tensor2: Tensor) -> Tensor:
        return tensor1 * tensor2

    def norm(self, tensor: Tensor) -> Tensor:
        return cp.linalg.norm(tensor)

    def shape_tuple(self, tensor: Tensor) -> Tuple[int]:
        return tensor.shape  # type: ignore

    def tensordot(
        self, a: Tensor, b: Tensor, axes: Union[int, Sequence[Sequence[int]]]
    ) -> Tensor:
        return cp.tensordot(a, b, axes)

    def outer_product(self, tensor1: Tensor, tensor2: Tensor) -> Tensor:
        return cp.tensordot(tensor1, tensor2, 0)

    def transpose(self, tensor: Tensor, perm: Optional[Sequence[int]] = None) -> Tensor:
        return cp.transpose(tensor, perm)

    def reshape(self, tensor: Tensor, shape: Tensor) -> Tensor:
        return cp.reshape(tensor, np.asarray(shape).astype(np.int32))

    def eye(
        self, N: int, dtype: Optional[str] = None, M: Optional[int] = None
    ) -> Tensor:
        if dtype is None:
            dtype = dtypestr
        return cp.eye(N, M=M, dtype=dtype)

    def ones(self, shape: Sequence[int], dtype: Optional[str] = None) -> Tensor:
        if dtype is None:
            dtype = dtypestr
        return cp.ones(shape, dtype=dtype)

    def zeros(self, shape: Sequence[int], dtype: Optional[str] = None) -> Tensor:
        if dtype is None:
            dtype = dtypestr
        return cp.zeros(shape, dtype=dtype)

    def copy(self, a: Tensor) -> Tensor:
        return a.copy()

    def expm(self, a: Tensor) -> Tensor:
        return self.convert_to_tensor(scipy.linalg.expm(self.numpy(a)))

    def abs(self, a: Tensor) -> Tensor:
        return cp.abs(a)

    def sin(self, a: Tensor) -> Tensor:
        return cp.sin(a)

    def cos(self, a: Tensor) -> Tensor:
        return cp.cos(a)

    # acos acosh asin asinh atan atan2 atanh cosh (cos) tan tanh sinh (sin)
    def acos(self, a: Tensor) -> Tensor:
        return cp.arccos(a)

    def acosh(self, a: Tensor) -> Tensor:
        return cp.arccosh(a)

    def asin(self, a: Tensor) -> Tensor:
        return cp.arcsin(a)

    def asinh(self, a: Tensor) -> Tensor:
        return cp.arcsinh(a)

    def atan(self, a: Tensor) -> Tensor:
        return cp.arctan(a)

    def atan2(self, y: Tensor, x: Tensor) -> Tensor:
        return cp.arctan2(y, x)

    def atanh(self, a: Tensor) -> Tensor:
        return cp.arctanh(a)

    def cosh(self, a: Tensor) -> Tensor:
        return cp.cosh(a)

    def tan(self, a: Tensor) -> Tensor:
        return cp.tan(a)

    def tanh(self, a: Tensor) -> Tensor:
        return cp.tanh(a)

    def sinh(self, a: Tensor) -> Tensor:
        return cp.sinh(a)

    def size(self, a: Tensor) -> Tensor:
        return a.size

    def eigvalsh(self, a: Tensor) -> Tensor:
        return cp.linalg.eigvalsh(a)

    def kron(self, a: Tensor, b: Tensor) -> Tensor:
        return cp.kron(a, b)

    def dtype(self, a: Tensor) -> str:
        return a.dtype.__str__()  # type: ignore

    def numpy(self, a: Tensor) -> Tensor:
        if isinstance(a, cp.ndarray):
            return a.get()
        else:
            return np.array(a)
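
    # Illustrative sketch of the host/device round trip above (assumes a
    # working CuPy install):
    #
    #     K = CuPyBackend()
    #     x = K.convert_to_tensor([1.0, 2.0])  # cp.ndarray living on the GPU
    #     K.numpy(x)  # .get() copies device memory back to a host np.ndarray
    #     # -> array([1., 2.])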

    def i(self, dtype: Any = None) -> Tensor:
        if not dtype:
            dtype = npdtype  # type: ignore
        if isinstance(dtype, str):
            dtype = getattr(np, dtype)
        return cp.array(1j, dtype=dtype)

    def stack(self, a: Sequence[Tensor], axis: int = 0) -> Tensor:
        return cp.stack(a, axis=axis)

    def concat(self, a: Sequence[Tensor], axis: int = 0) -> Tensor:
        return cp.concatenate(a, axis=axis)

    def tile(self, a: Tensor, rep: Tensor) -> Tensor:
        return cp.tile(a, rep)

    def mean(
        self,
        a: Tensor,
        axis: Optional[Sequence[int]] = None,
        keepdims: bool = False,
    ) -> Tensor:
        return cp.mean(a, axis=axis, keepdims=keepdims)

    def std(
        self, a: Tensor, axis: Optional[Sequence[int]] = None, keepdims: bool = False
    ) -> Tensor:
        return cp.std(a, axis=axis, keepdims=keepdims)

    def unique_with_counts(self, a: Tensor, **kws: Any) -> Tuple[Tensor, Tensor]:
        return cp.unique(a, return_counts=True)  # type: ignore

    def min(self, a: Tensor, axis: Optional[int] = None) -> Tensor:
        return cp.min(a, axis=axis)

    def max(self, a: Tensor, axis: Optional[int] = None) -> Tensor:
        return cp.max(a, axis=axis)

    def argmax(self, a: Tensor, axis: int = 0) -> Tensor:
        return cp.argmax(a, axis=axis)

    def argmin(self, a: Tensor, axis: int = 0) -> Tensor:
        return cp.argmin(a, axis=axis)

    def sigmoid(self, a: Tensor) -> Tensor:
        return cpx.scipy.special.expit(a)

    def relu(self, a: Tensor) -> Tensor:
        return (abs(a) + a) / 2
        # this impl seems to be the fastest
        # see https://stackoverflow.com/questions/32109319/how-to-implement-the-relu-function-in-numpy

    def softmax(self, a: Sequence[Tensor], axis: Optional[int] = None) -> Tensor:
        return cpx.scipy.special.softmax(a, axis=axis)

    def onehot(self, a: Tensor, num: int) -> Tensor:
        res = cp.eye(num)[a.reshape([-1])]
        return res.reshape(list(a.shape) + [num])
        # https://stackoverflow.com/questions/38592324/one-hot-encoding-using-numpy
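
    # Sketch of the eye-indexing one-hot trick above:
    #
    #     K.onehot(cp.array([0, 2, 1]), num=3)
    #     # -> [[1., 0., 0.],
    #     #     [0., 0., 1.],
    #     #     [0., 1., 0.]]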

    def cumsum(self, a: Tensor, axis: Optional[int] = None) -> Tensor:
        return cp.cumsum(a, axis)

    def is_tensor(self, a: Any) -> bool:
        if isinstance(a, cp.ndarray):
            return True
        return False

    def real(self, a: Tensor) -> Tensor:
        return cp.real(a)

    def imag(self, a: Tensor) -> Tensor:
        return cp.imag(a)

    def cast(self, a: Tensor, dtype: str) -> Tensor:
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", np.ComplexWarning)
            if isinstance(dtype, str):
                return a.astype(getattr(np, dtype))
            return a.astype(dtype)

    def arange(self, start: int, stop: Optional[int] = None, step: int = 1) -> Tensor:
        if stop is None:
            return cp.arange(start=0, stop=start, step=step)
        return cp.arange(start=start, stop=stop, step=step)

    def mod(self, x: Tensor, y: Tensor) -> Tensor:
        return cp.mod(x, y)

    def right_shift(self, x: Tensor, y: Tensor) -> Tensor:
        return cp.right_shift(x, y)

    def left_shift(self, x: Tensor, y: Tensor) -> Tensor:
        return cp.left_shift(x, y)

    def solve(self, A: Tensor, b: Tensor, assume_a: str = "gen") -> Tensor:  # type: ignore
        raise NotImplementedError

    def searchsorted(self, a: Tensor, v: Tensor, side: str = "left") -> Tensor:
        return cp.searchsorted(a, v, side=side)

    def set_random_state(
        self, seed: Optional[int] = None, get_only: bool = False
    ) -> Any:
        g = cp.random.default_rng(seed)  # None auto supported
        if get_only is False:
            self.g = g
        return g
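
    # Sketch: seed once and reuse the stored generator, or fetch one without
    # touching backend state:
    #
    #     K.set_random_state(42)                     # stores the rng on K.g
    #     g = K.set_random_state(42, get_only=True)  # returns rng, K.g untouched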

    def stateful_randn(
        self,
        g: "cp.random.Generator",
        shape: Union[int, Sequence[int]] = 1,
        mean: float = 0,
        stddev: float = 1,
        dtype: str = "32",
    ) -> Tensor:
        if isinstance(dtype, str):
            dtype = dtype[-2:]
        if isinstance(shape, int):
            shape = (shape,)
        r = g.normal(loc=mean, scale=stddev, size=shape)
        if dtype == "32":
            r = r.astype(np.float32)
        elif dtype == "64":
            r = r.astype(np.float64)
        elif not isinstance(dtype, str):
            r = r.astype(dtype)
        else:
            raise ValueError("unsupported `dtype` %s" % dtype)
        return r

    def stateful_randu(
        self,
        g: "cp.random.Generator",
        shape: Union[int, Sequence[int]] = 1,
        low: float = 0,
        high: float = 1,
        dtype: str = "32",
    ) -> Tensor:
        if isinstance(dtype, str):
            dtype = dtype[-2:]
        if isinstance(shape, int):
            shape = (shape,)
        r = g.random(shape) * (high - low) + low
        if dtype == "32":
            r = r.astype(np.float32)
        elif dtype == "64":
            r = r.astype(np.float64)
        elif not isinstance(dtype, str):
            r = r.astype(dtype)
        else:
            raise ValueError("unsupported `dtype` %s" % dtype)
        return r
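
    # Sketch of the "32"/"64" dtype-suffix convention above: only the last two
    # characters of `dtype` are inspected, so "float32" and "32" are equivalent:
    #
    #     g = cp.random.default_rng(0)
    #     K.stateful_randn(g, shape=(2, 3), dtype="float32")  # float32 normals
    #     K.stateful_randu(g, shape=4, low=-1.0, high=1.0, dtype="64")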

    def stateful_randc(
        self,
        g: "cp.random.Generator",
        a: Union[int, Sequence[int], Tensor],
        shape: Union[int, Sequence[int]],
        p: Optional[Union[Sequence[float], Tensor]] = None,
    ) -> Tensor:
        if isinstance(shape, int):
            shape = (shape,)
        return g.choice(a, size=shape, replace=True, p=p)

    def scatter(self, operand: Tensor, indices: Tensor, updates: Tensor) -> Tensor:
        operand_new = cp.copy(operand)
        operand_new[tuple([indices[:, i] for i in range(indices.shape[1])])] = updates
        return operand_new
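
    # Sketch of the scatter index layout: `indices` carries one row per update
    # and one column per tensor axis:
    #
    #     op = cp.zeros((3, 3))
    #     idx = cp.array([[0, 0], [2, 1]])  # update positions (0, 0) and (2, 1)
    #     K.scatter(op, idx, cp.array([5.0, 7.0]))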

    def coo_sparse_matrix(
        self, indices: Tensor, values: Tensor, shape: Tensor
    ) -> Tensor:
        values = self.convert_to_tensor(values)
        indices = self.convert_to_tensor(indices).T
        return cp.sparse.coo_matrix((values, indices), shape=shape)
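
    # Sketch: `indices` is an (nnz, 2) array of (row, col) pairs; the transpose
    # puts it into the (row, col) layout that coo_matrix expects:
    #
    #     sp = K.coo_sparse_matrix(
    #         indices=cp.array([[0, 1], [1, 0]]),
    #         values=cp.array([1.0, 2.0]),
    #         shape=(2, 2),
    #     )  # 1.0 at (0, 1), 2.0 at (1, 0)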

    def sparse_dense_matmul(
        self,
        sp_a: Tensor,
        b: Tensor,
    ) -> Tensor:
        return sp_a @ b

    def to_dense(self, sp_a: Tensor) -> Tensor:
        return sp_a.todense()

    def is_sparse(self, a: Tensor) -> bool:
        return cpx.scipy.sparse.issparse(a)  # type: ignore

    def cond(
        self,
        pred: bool,
        true_fun: Callable[[], Tensor],
        false_fun: Callable[[], Tensor],
    ) -> Tensor:
        if pred:
            return true_fun()
        return false_fun()

    def switch(
        self, index: Tensor, branches: Sequence[Callable[[], Tensor]]
    ) -> Tensor:
        return branches[index]()
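
    # Sketch: both control-flow helpers above evaluate eagerly in Python:
    #
    #     K.cond(True, lambda: cp.array(1.0), lambda: cp.array(-1.0))  # -> 1.0
    #     K.switch(1, [lambda: cp.array(0.0), lambda: cp.array(10.0)])  # -> 10.0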

    def device(self, a: Tensor) -> str:
        return self._dev2str(a.device)

    def device_move(self, a: Tensor, dev: Any) -> Tensor:
        if isinstance(dev, str):
            dev = self._str2dev(dev)
        with dev:
            return cp.asarray(a)

    def _dev2str(self, dev: Any) -> str:
        return f"gpu:{dev.id}"

    def _str2dev(self, str_: str) -> Any:
        if str_ == "cpu":
            raise ValueError("CuPy backend only supports GPU devices")
        else:
            return cp.cuda.Device(int(str_.split(":")[-1]))
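
    # Sketch of the device-string convention ("gpu:<id>" only; "cpu" raises):
    #
    #     K.device(a)                # e.g. "gpu:0"
    #     K.device_move(a, "gpu:1")  # copies `a` onto cp.cuda.Device(1)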

    def stop_gradient(self, a: Tensor) -> Tensor:
        raise NotImplementedError("CuPy backend doesn't support AD")

    def grad(
        self,
        f: Callable[..., Any],
        argnums: Union[int, Sequence[int]] = 0,
        has_aux: bool = False,
    ) -> Callable[..., Any]:
        raise NotImplementedError("CuPy backend doesn't support AD")

    def value_and_grad(
        self,
        f: Callable[..., Any],
        argnums: Union[int, Sequence[int]] = 0,
        has_aux: bool = False,
    ) -> Callable[..., Tuple[Any, Any]]:
        raise NotImplementedError("CuPy backend doesn't support AD")

    def jit(
        self,
        f: Callable[..., Any],
        static_argnums: Optional[Union[int, Sequence[int]]] = None,
        jit_compile: Optional[bool] = None,
        **kws: Any,
    ) -> Callable[..., Any]:
        logger.warning("CuPy backend has no jit interface, just do nothing")
        return f
        # raise NotImplementedError("CuPy backend doesn't support jit compiling")

    def vmap(
        self, f: Callable[..., Any], vectorized_argnums: Union[int, Sequence[int]] = 0
    ) -> Any:
        logger.warning(
            "CuPy backend has no intrinsic vmap like interface"
            ", use vectorize instead (plain for loop)"
        )
        if isinstance(vectorized_argnums, int):
            vectorized_argnums = (vectorized_argnums,)

        def wrapper(*args: Any, **kws: Any) -> Tensor:
            results = []
            for barg in zip(*[args[i] for i in vectorized_argnums]):  # type: ignore
                narg = []
                j = 0
                for k in range(len(args)):
                    if k in vectorized_argnums:  # type: ignore
                        narg.append(barg[j])
                        j += 1
                    else:
                        narg.append(args[k])
                results.append(f(*narg, **kws))
            return cp.array(results)

        return wrapper
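
    # Sketch: the wrapper above emulates vmap with a plain Python loop over the
    # leading axis of the vectorized arguments, so it adds no GPU-side speedup:
    #
    #     batched_norm = K.vmap(lambda x: cp.linalg.norm(x))
    #     batched_norm(cp.ones((10, 4)))  # ten row norms stacked into one array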

    def vectorized_value_and_grad(
        self,
        f: Callable[..., Any],
        argnums: Union[int, Sequence[int]] = 0,
        vectorized_argnums: Union[int, Sequence[int]] = 0,
        has_aux: bool = False,
    ) -> Callable[..., Tuple[Any, Any]]:
        raise NotImplementedError("CuPy backend doesn't support AD")

    vvag = vectorized_value_and_grad

    def vjp(
        self,
        f: Callable[..., Any],
        inputs: Union[Tensor, Sequence[Tensor]],
        v: Union[Tensor, Sequence[Tensor]],
    ) -> Tuple[Union[Tensor, Sequence[Tensor]], Union[Tensor, Sequence[Tensor]]]:
        raise NotImplementedError("CuPy backend doesn't support AD")

    def jvp(
        self,
        f: Callable[..., Any],
        inputs: Union[Tensor, Sequence[Tensor]],
        v: Union[Tensor, Sequence[Tensor]],
    ) -> Tuple[Union[Tensor, Sequence[Tensor]], Union[Tensor, Sequence[Tensor]]]:
        raise NotImplementedError("CuPy backend doesn't support AD")