| # Owner(s): ["module: dynamo"] |
| |
| import functools |
| import inspect |
| from unittest import expectedFailure as xfail, skipIf as skip |
| |
| import numpy as _np |
| from pytest import raises as assert_raises |
| |
| import torch |
| import torch._numpy as w |
| import torch._numpy._ufuncs as _ufuncs |
| import torch._numpy._util as _util |
| from torch._numpy.testing import assert_allclose, assert_equal |
| from torch.testing._internal.common_cuda import TEST_CUDA |
| from torch.testing._internal.common_utils import ( |
| instantiate_parametrized_tests, |
| parametrize, |
| run_tests, |
| TestCase, |
| ) |
| |
| |
| # These function receive one array_like arg and return one array_like result |
| one_arg_funcs = [ |
| w.asarray, |
| w.empty_like, |
| w.ones_like, |
| w.zeros_like, |
| functools.partial(w.full_like, fill_value=42), |
| w.corrcoef, |
| w.squeeze, |
| w.argmax, |
| # w.bincount, # XXX: input dtypes |
| w.prod, |
| w.sum, |
| w.real, |
| w.imag, |
| w.angle, |
| w.real_if_close, |
| w.isreal, |
| w.iscomplex, |
| w.isneginf, |
| w.isposinf, |
| w.i0, |
| w.copy, |
| w.array, |
| w.round, |
| w.around, |
| w.flip, |
| w.vstack, |
| w.hstack, |
| w.dstack, |
| w.column_stack, |
| w.row_stack, |
| w.flatnonzero, |
| ] |
| |
# Add all unary ufuncs to the one-arg smoke-test roster.
# Copy before mutating: `_ufuncs._unary` is module-level state shared with any
# other importer of torch._numpy._ufuncs, and list.remove() would otherwise
# mutate it in place.
ufunc_names = list(_ufuncs._unary)
ufunc_names.remove("invert")  # torch: bitwise_not_cpu not implemented for 'Float'
ufunc_names.remove("bitwise_not")

one_arg_funcs += [getattr(_ufuncs, name) for name in ufunc_names]
| |
| |
@instantiate_parametrized_tests
class TestOneArr(TestCase):
    """Base for smoke tests of one-arg functions: (array_like) -> (array_like)

    Accepts array_likes, torch.Tensors, w.ndarrays; returns an ndarray.
    """

    def _check_returns_ndarray(self, func, arg):
        # Shared assertion: func(arg) must come back as a wrapper ndarray.
        result = func(arg)
        assert isinstance(result, w.ndarray)

    @parametrize("func", one_arg_funcs)
    def test_asarray_tensor(self, func):
        self._check_returns_ndarray(func, torch.Tensor([[1.0, 2, 3], [4, 5, 6]]))

    @parametrize("func", one_arg_funcs)
    def test_asarray_list(self, func):
        self._check_returns_ndarray(func, [[1.0, 2, 3], [4, 5, 6]])

    @parametrize("func", one_arg_funcs)
    def test_asarray_array(self, func):
        self._check_returns_ndarray(func, w.asarray([[1.0, 2, 3], [4, 5, 6]]))
| |
| |
# Functions accepting an `axis` keyword: (array_like, axis) -> array_like.
one_arg_axis_funcs = [
    w.argmax,
    w.argmin,
    w.prod,
    w.sum,
    w.all,
    w.any,
    w.mean,
    w.argsort,
    w.std,
    w.var,
    w.flip,
]
| |
| |
@instantiate_parametrized_tests
class TestOneArrAndAxis(TestCase):
    """Smoke tests of (array_like, axis) -> array_like functions."""

    def _check_axis(self, func, arg, axis):
        # Every func must produce a wrapper ndarray for any valid axis value.
        result = func(arg, axis=axis)
        assert isinstance(result, w.ndarray)

    @parametrize("func", one_arg_axis_funcs)
    @parametrize("axis", [0, 1, -1, None])
    def test_andaxis_tensor(self, func, axis):
        self._check_axis(func, torch.Tensor([[1.0, 2, 3], [4, 5, 6]]), axis)

    @parametrize("func", one_arg_axis_funcs)
    @parametrize("axis", [0, 1, -1, None])
    def test_andaxis_list(self, func, axis):
        self._check_axis(func, [[1.0, 2, 3], [4, 5, 6]], axis)

    @parametrize("func", one_arg_axis_funcs)
    @parametrize("axis", [0, 1, -1, None])
    def test_andaxis_array(self, func, axis):
        self._check_axis(func, w.asarray([[1.0, 2, 3], [4, 5, 6]]), axis)
| |
| |
@instantiate_parametrized_tests
class TestOneArrAndAxesTuple(TestCase):
    """Smoke test (array_like, axes=tuple) -> array_like, via w.transpose."""

    @staticmethod
    def _transposed_shape(shape, axes):
        # np.transpose semantics: axes=None reverses the axis order,
        # otherwise axis i of the result is axis axes[i] of the input.
        if axes is None:
            return tuple(reversed(shape))
        return tuple(shape[ax] for ax in axes)

    @parametrize("func", [w.transpose])
    @parametrize("axes", [(0, 2, 1), (1, 2, 0), None])
    def test_andtuple_tensor(self, func, axes):
        t = torch.ones((1, 2, 3))
        result = func(t, axes=axes)
        assert isinstance(result, w.ndarray)

        # a np.transpose-specific shape check
        assert result.shape == self._transposed_shape(tuple(t.shape), axes)

    @parametrize("func", [w.transpose])
    @parametrize("axes", [(0, 2, 1), (1, 2, 0), None])
    def test_andtuple_list(self, func, axes):
        lst = [[[1.0, 1.0, 1.0], [1.0, 1.0, 1.0]]]  # shape = (1, 2, 3)
        result = func(lst, axes=axes)
        assert isinstance(result, w.ndarray)

    @parametrize("func", [w.transpose])
    @parametrize("axes", [(0, 2, 1), (1, 2, 0), None])
    def test_andtuple_array(self, func, axes):
        a = w.asarray([[[1.0, 1.0, 1.0], [1.0, 1.0, 1.0]]])
        result = func(a, axes=axes)
        assert isinstance(result, w.ndarray)
        assert result.shape == self._transposed_shape(a.shape, axes)
| |
| |
# Functions of (array_like, shape_like) -> array_like.
arr_shape_funcs = [
    w.reshape,
    w.empty_like,
    w.ones_like,
    functools.partial(w.full_like, fill_value=42),
    w.broadcast_to,
]
| |
| |
@instantiate_parametrized_tests
class TestOneArrAndShape(TestCase):
    """Smoke test of functions (array_like, shape_like) -> array_like"""

    def setUp(self):
        self.shape = (2, 3)
        # w.reshape spells its shape argument `newshape`; everybody else uses `shape`
        self.shape_arg_name = {
            w.reshape: "newshape",
        }

    def _check_with_shape(self, func, arg):
        kwarg_name = self.shape_arg_name.get(func, "shape")
        result = func(arg, **{kwarg_name: self.shape})
        assert isinstance(result, w.ndarray)
        assert result.shape == self.shape

    @parametrize("func", arr_shape_funcs)
    def test_andshape_tensor(self, func):
        self._check_with_shape(func, torch.Tensor([[1, 2, 3], [4, 5, 6]]))

    @parametrize("func", arr_shape_funcs)
    def test_andshape_list(self, func):
        self._check_with_shape(func, [[1, 2, 3], [4, 5, 6]])

    @parametrize("func", arr_shape_funcs)
    def test_andshape_array(self, func):
        self._check_with_shape(func, w.asarray([[1, 2, 3], [4, 5, 6]]))
| |
| |
# (wrapper_func, numpy_func) pairs: each returns a scalar/python object, not an array.
one_arg_scalar_funcs = [(w.size, _np.size), (w.shape, _np.shape), (w.ndim, _np.ndim)]
| |
| |
@instantiate_parametrized_tests
class TestOneArrToScalar(TestCase):
    """Smoke test of functions (array_like) -> scalar or python object."""

    def _check_against_numpy(self, func, np_func, arg, ref_arg):
        # The wrapper result is a plain python object equal to NumPy's answer.
        result = func(arg)
        expected = np_func(ref_arg)

        assert not isinstance(result, w.ndarray)
        assert result == expected

    @parametrize("func, np_func", one_arg_scalar_funcs)
    def test_toscalar_tensor(self, func, np_func):
        t = torch.Tensor([[1, 2, 3], [4, 5, 6]])
        # NumPy does not know torch tensors; feed it an equivalent ndarray.
        self._check_against_numpy(func, np_func, t, _np.asarray(t))

    @parametrize("func, np_func", one_arg_scalar_funcs)
    def test_toscalar_list(self, func, np_func):
        lst = [[1, 2, 3], [4, 5, 6]]
        self._check_against_numpy(func, np_func, lst, lst)

    @parametrize("func, np_func", one_arg_scalar_funcs)
    def test_toscalar_array(self, func, np_func):
        a = w.asarray([[1, 2, 3], [4, 5, 6]])
        self._check_against_numpy(func, np_func, a, a)
| |
| |
# Array creation functions taking a shape tuple as their first argument.
shape_funcs = [w.zeros, w.empty, w.ones, functools.partial(w.full, fill_value=42)]
| |
| |
@instantiate_parametrized_tests
class TestShapeLikeToArray(TestCase):
    """Smoke test (shape_like) -> array."""

    shape = (3, 4)

    @parametrize("func", shape_funcs)
    def test_shape(self, func):
        # Creation functions must honor the requested shape and wrap the result.
        result = func(self.shape)

        assert isinstance(result, w.ndarray)
        assert result.shape == self.shape
| |
| |
# Functions which accept one or several arrays and return a sequence of arrays.
seq_funcs = [w.atleast_1d, w.atleast_2d, w.atleast_3d, w.broadcast_arrays]
| |
| |
@instantiate_parametrized_tests
class TestSequenceOfArrays(TestCase):
    """Smoke test (sequence of arrays) -> (sequence of arrays)."""

    def _check_single(self, func, arg):
        result = func(arg)

        # for a single argument, broadcast_arrays returns a tuple, while
        # atleast_?d return an array
        if func is w.broadcast_arrays:
            result = result[0]

        assert isinstance(result, w.ndarray)

    @parametrize("func", seq_funcs)
    def test_single_tensor(self, func):
        self._check_single(func, torch.Tensor([[1, 2, 3], [4, 5, 6]]))

    @parametrize("func", seq_funcs)
    def test_single_list(self, func):
        self._check_single(func, [[1, 2, 3], [4, 5, 6]])

    @parametrize("func", seq_funcs)
    def test_single_array(self, func):
        self._check_single(func, w.asarray([[1, 2, 3], [4, 5, 6]]))

    @parametrize("func", seq_funcs)
    def test_several(self, func):
        # Several positional arguments come back as a same-length sequence
        # of wrapper ndarrays.
        inputs = (
            torch.Tensor([[1, 2, 3], [4, 5, 6]]),
            w.asarray([[1, 2, 3], [4, 5, 6]]),
            [[1, 2, 3], [4, 5, 6]],
        )

        result = func(*inputs)
        assert isinstance(result, (tuple, list))
        assert len(result) == len(inputs)
        assert all(isinstance(item, w.ndarray) for item in result)
| |
| |
# Functions which take a sequence of arrays and return a single stacked array.
seq_to_single_funcs = [
    w.concatenate,
    w.stack,
    w.vstack,
    w.hstack,
    w.dstack,
    w.column_stack,
    w.row_stack,
]
| |
| |
@instantiate_parametrized_tests
class TestSequenceOfArraysToSingle(TestCase):
    """Smoke test (sequence of arrays) -> (array)."""

    @parametrize("func", seq_to_single_funcs)
    def test_several(self, func):
        # A single sequence argument is concatenated/stacked into one ndarray,
        # regardless of the mix of element types.
        inputs = (
            torch.Tensor([[1, 2, 3], [4, 5, 6]]),
            w.asarray([[1, 2, 3], [4, 5, 6]]),
            [[1, 2, 3], [4, 5, 6]],
        )

        assert isinstance(func(inputs), w.ndarray)
| |
| |
# Functions which take a single array and return a tuple of arrays.
single_to_seq_funcs = (
    w.nonzero,
    # https://github.com/Quansight-Labs/numpy_pytorch_interop/pull/121#discussion_r1172824545
    # w.tril_indices_from,
    # w.triu_indices_from,
    w.where,
)
| |
| |
@instantiate_parametrized_tests
class TestArrayToSequence(TestCase):
    """Smoke test array -> (tuple of arrays)."""

    def _check_tuple_of_ndarrays(self, func, arg):
        result = func(arg)

        assert isinstance(result, tuple)
        assert all(isinstance(item, w.ndarray) for item in result)

    @parametrize("func", single_to_seq_funcs)
    def test_asarray_tensor(self, func):
        self._check_tuple_of_ndarrays(func, torch.Tensor([[1, 2, 3], [4, 5, 6]]))

    @parametrize("func", single_to_seq_funcs)
    def test_asarray_list(self, func):
        self._check_tuple_of_ndarrays(func, [[1, 2, 3], [4, 5, 6]])

    @parametrize("func", single_to_seq_funcs)
    def test_asarray_array(self, func):
        self._check_tuple_of_ndarrays(func, w.asarray([[1, 2, 3], [4, 5, 6]]))
| |
| |
# (function, args) pairs: each constructs an array from python scalars only.
funcs_and_args = [
    (w.linspace, (0, 10, 11)),
    (w.logspace, (1, 2, 5)),
    (w.logspace, (1, 2, 5, 11)),
    (w.geomspace, (1, 1000, 5, 11)),
    (w.eye, (5, 6)),
    (w.identity, (3,)),
    (w.arange, (5,)),
    (w.arange, (5, 8)),
    (w.arange, (5, 8, 0.5)),
    (w.tri, (3, 3, -1)),
]
| |
| |
@instantiate_parametrized_tests
class TestPythonArgsToArray(TestCase):
    """Smoke_test (sequence of scalars) -> (array)"""

    @parametrize("func, args", funcs_and_args)
    def test_argstoarray_simple(self, func, args):
        # Creation from python scalars must produce a wrapper ndarray.
        result = func(*args)
        assert isinstance(result, w.ndarray)
| |
| |
class TestNormalizations(TestCase):
    """Smoke test generic problems with normalizations."""

    def test_unknown_args(self):
        # Decorated functions must reject arguments they do not know about.
        mask = w.arange(7) % 2 == 0

        # unknown positional arg
        with assert_raises(TypeError):
            w.nonzero(mask, "kaboom")

        # unknown kwarg
        with assert_raises(TypeError):
            w.nonzero(mask, oops="ouch")

    def test_too_few_args_positional(self):
        with assert_raises(TypeError):
            w.nonzero()

    def test_unknown_args_with_defaults(self):
        # w.eye has five arguments, four of which have defaults:
        # one positional argument works ...
        w.eye(3)

        # ... while zero positional arguments must fail.
        with assert_raises(TypeError):
            w.eye()
| |
| |
class TestCopyTo(TestCase):
    """Smoke tests for w.copyto: plain copy, broadcasting, and casting rules."""

    def test_copyto_basic(self):
        destination = w.empty(4)
        source = w.arange(4)
        w.copyto(destination, source)
        assert (destination == source).all()

    def test_copytobcast(self):
        destination = w.empty((4, 2))
        source = w.arange(4)

        # (4, 2) <- (4,) cannot broadcast => error out
        with assert_raises(RuntimeError):
            w.copyto(destination, source)

        # (2, 4) <- (4,): broadcast src against dst
        destination = w.empty((2, 4))
        w.copyto(destination, source)
        assert (destination == source).all()

    def test_copyto_typecast(self):
        destination = w.empty(4, dtype=int)
        source = w.arange(4, dtype=float)

        # float -> int is not allowed under casting="no" ...
        with assert_raises(TypeError):
            w.copyto(destination, source, casting="no")

        # ... but can be forced
        w.copyto(destination, source, casting="unsafe")
        assert (destination == source).all()
| |
| |
class TestDivmod(TestCase):
    """Exercise the out= machinery of w.divmod."""

    def test_divmod_out(self):
        x1 = w.arange(8, 15)
        x2 = w.arange(4, 11)

        buffers = (w.empty_like(x1), w.empty_like(x1))

        quot, rem = w.divmod(x1, x2, out=buffers)

        assert_equal(quot, x1 // x2)
        assert_equal(rem, x1 % x2)

        # the results are written into the provided buffers
        assert quot is buffers[0]
        assert rem is buffers[1]

    def test_divmod_out_list(self):
        x1 = [4, 5, 6]
        x2 = [2, 1, 2]

        buffers = (w.empty_like(x1), w.empty_like(x1))

        quot, rem = w.divmod(x1, x2, out=buffers)

        assert quot is buffers[0]
        assert rem is buffers[1]

    @xfail  # ("out1, out2 not implemented")
    def test_divmod_pos_only(self):
        x1 = [4, 5, 6]
        x2 = [2, 1, 2]

        out1, out2 = w.empty_like(x1), w.empty_like(x1)

        quot, rem = w.divmod(x1, x2, out1, out2)

        assert quot is out1
        assert rem is out2

    def test_divmod_no_out(self):
        # check that the out= machinery handles no out at all
        x1 = w.array([4, 5, 6])
        x2 = w.array([2, 1, 2])
        quot, rem = w.divmod(x1, x2)

        assert_equal(quot, x1 // x2)
        assert_equal(rem, x1 % x2)

    def test_divmod_out_both_pos_and_kw(self):
        # supplying out both positionally and as a keyword is rejected
        o = w.empty(1)
        with assert_raises(TypeError):
            w.divmod(1, 2, o, o, out=(o, o))
| |
| |
class TestSmokeNotImpl(TestCase):
    def test_nimpl_basic(self):
        # smoke test that the "NotImplemented" annotation is picked up:
        # the `like=` argument is not supported by the wrapper
        with assert_raises(NotImplementedError):
            w.empty(3, like="ooops")
| |
| |
@instantiate_parametrized_tests
class TestDefaultDtype(TestCase):
    """Default dtype selection and the set_default_dtype override."""

    def test_defaultdtype_defaults(self):
        # by default floats are 64 bit, complex 128 bit, and ints 64 bit
        real = w.empty(3)
        cplx = real + 1j * real

        assert real.dtype.torch_dtype == torch.float64
        assert cplx.dtype.torch_dtype == torch.complex128

        assert w.arange(3).dtype.torch_dtype == torch.int64

    @parametrize("dt", ["pytorch", "float32", torch.float32])
    def test_set_default_float(self, dt):
        try:
            w.set_default_dtype(fp_dtype=dt)

            real = w.empty(3)
            cplx = real + 1j * real

            assert real.dtype.torch_dtype == torch.float32
            assert cplx.dtype.torch_dtype == torch.complex64

        finally:
            # restore the numpy-compatible defaults for subsequent tests
            w.set_default_dtype(fp_dtype="numpy")
| |
| |
# NOTE: compare numeric (major, minor) tuples, not version *strings*:
# lexicographically "1.9" > "1.23", so the original string comparison would
# fail to skip on old NumPy, and `<=` wrongly skipped exactly-1.23 where
# from_dlpack already exists. Strict `< (1, 23)` skips only pre-1.23.
@skip(
    tuple(int(part) for part in _np.__version__.split(".")[:2]) < (1, 23),
    reason="from_dlpack is new in NumPy 1.23",
)
class TestExport(TestCase):
    def test_exported_objects(self):
        # Every public function of the wrapper (except the torch-only
        # set_default_dtype) must also exist in the numpy namespace.
        exported_fns = (
            x
            for x in dir(w)
            if inspect.isfunction(getattr(w, x))
            and not x.startswith("_")
            and x != "set_default_dtype"
        )
        diff = set(exported_fns).difference(set(dir(_np)))
        assert len(diff) == 0, str(diff)
| |
| |
class TestCtorNested(TestCase):
    def test_arrays_in_lists(self):
        # nested lists may contain w.ndarray elements; they get unwrapped
        nested = [[1, 2], [3, w.array(4)]]
        assert_equal(w.asarray(nested), [[1, 2], [3, 4]])
| |
| |
class TestMisc(TestCase):
    def test_ndarrays_to_tensors(self):
        # ndarrays_to_tensors maps w.ndarray leaves of a nested structure
        # to torch.Tensors, leaving the structure itself intact.
        out = _util.ndarrays_to_tensors(((w.asarray(42), 7), 3))
        assert len(out) == 2
        inner = out[0]
        assert isinstance(inner, tuple) and len(inner) == 2
        assert isinstance(inner[0], torch.Tensor)

    @skip(not TEST_CUDA, reason="requires cuda")
    def test_f16_on_cuda(self):
        # make sure operations with float16 tensors give same results on CUDA and on CPU
        t = torch.arange(5, dtype=torch.float16)
        for binary_op in (w.vdot, w.inner, w.matmul):
            assert_allclose(binary_op(t.cuda(), t.cuda()), binary_op(t, t))
        assert_allclose(w.einsum("i,i", t.cuda(), t.cuda()), w.einsum("i,i", t, t))

        assert_allclose(w.mean(t.cuda()), w.mean(t))

        assert_allclose(w.cov(t.cuda(), t.cuda()), w.cov(t, t).tensor.cuda())
        assert_allclose(w.corrcoef(t.cuda()), w.corrcoef(t).tensor.cuda())
| |
| |
| if __name__ == "__main__": |
| run_tests() |