blob: c5bd65369f6fbc1dffad47e5f3f90c855a24eca5 [file] [log] [blame]
# Owner(s): ["module: dynamo"]
import functools
import inspect
from unittest import expectedFailure as xfail, skipIf as skip
import numpy as _np
from pytest import raises as assert_raises
import torch
import torch._numpy as w
import torch._numpy._ufuncs as _ufuncs
import torch._numpy._util as _util
from torch._numpy.testing import assert_allclose, assert_equal
from torch.testing._internal.common_cuda import TEST_CUDA
from torch.testing._internal.common_utils import (
instantiate_parametrized_tests,
parametrize,
run_tests,
TestCase,
)
# These function receive one array_like arg and return one array_like result
one_arg_funcs = [
w.asarray,
w.empty_like,
w.ones_like,
w.zeros_like,
functools.partial(w.full_like, fill_value=42),
w.corrcoef,
w.squeeze,
w.argmax,
# w.bincount, # XXX: input dtypes
w.prod,
w.sum,
w.real,
w.imag,
w.angle,
w.real_if_close,
w.isreal,
w.iscomplex,
w.isneginf,
w.isposinf,
w.i0,
w.copy,
w.array,
w.round,
w.around,
w.flip,
w.vstack,
w.hstack,
w.dstack,
w.column_stack,
w.row_stack,
w.flatnonzero,
]
ufunc_names = _ufuncs._unary
ufunc_names.remove("invert") # torch: bitwise_not_cpu not implemented for 'Float'
ufunc_names.remove("bitwise_not")
one_arg_funcs += [getattr(_ufuncs, name) for name in ufunc_names]
@instantiate_parametrized_tests
class TestOneArr(TestCase):
"""Base for smoke tests of one-arg functions: (array_like) -> (array_like)
Accepts array_likes, torch.Tensors, w.ndarays; returns an ndarray
"""
@parametrize("func", one_arg_funcs)
def test_asarray_tensor(self, func):
t = torch.Tensor([[1.0, 2, 3], [4, 5, 6]])
ta = func(t)
assert isinstance(ta, w.ndarray)
@parametrize("func", one_arg_funcs)
def test_asarray_list(self, func):
lst = [[1.0, 2, 3], [4, 5, 6]]
la = func(lst)
assert isinstance(la, w.ndarray)
@parametrize("func", one_arg_funcs)
def test_asarray_array(self, func):
a = w.asarray([[1.0, 2, 3], [4, 5, 6]])
la = func(a)
assert isinstance(la, w.ndarray)
one_arg_axis_funcs = [
w.argmax,
w.argmin,
w.prod,
w.sum,
w.all,
w.any,
w.mean,
w.argsort,
w.std,
w.var,
w.flip,
]
@instantiate_parametrized_tests
class TestOneArrAndAxis(TestCase):
@parametrize("func", one_arg_axis_funcs)
@parametrize("axis", [0, 1, -1, None])
def test_andaxis_tensor(self, func, axis):
t = torch.Tensor([[1.0, 2, 3], [4, 5, 6]])
ta = func(t, axis=axis)
assert isinstance(ta, w.ndarray)
@parametrize("func", one_arg_axis_funcs)
@parametrize("axis", [0, 1, -1, None])
def test_andaxis_list(self, func, axis):
t = [[1.0, 2, 3], [4, 5, 6]]
ta = func(t, axis=axis)
assert isinstance(ta, w.ndarray)
@parametrize("func", one_arg_axis_funcs)
@parametrize("axis", [0, 1, -1, None])
def test_andaxis_array(self, func, axis):
t = w.asarray([[1.0, 2, 3], [4, 5, 6]])
ta = func(t, axis=axis)
assert isinstance(ta, w.ndarray)
@instantiate_parametrized_tests
class TestOneArrAndAxesTuple(TestCase):
@parametrize("func", [w.transpose])
@parametrize("axes", [(0, 2, 1), (1, 2, 0), None])
def test_andtuple_tensor(self, func, axes):
t = torch.ones((1, 2, 3))
ta = func(t, axes=axes)
assert isinstance(ta, w.ndarray)
# a np.transpose -specific test
if axes is None:
newshape = (3, 2, 1)
else:
newshape = tuple(t.shape[axes[i]] for i in range(w.ndim(t)))
assert ta.shape == newshape
@parametrize("func", [w.transpose])
@parametrize("axes", [(0, 2, 1), (1, 2, 0), None])
def test_andtuple_list(self, func, axes):
t = [[[1.0, 1.0, 1.0], [1.0, 1.0, 1.0]]] # shape = (1, 2, 3)
ta = func(t, axes=axes)
assert isinstance(ta, w.ndarray)
@parametrize("func", [w.transpose])
@parametrize("axes", [(0, 2, 1), (1, 2, 0), None])
def test_andtuple_array(self, func, axes):
t = w.asarray([[[1.0, 1.0, 1.0], [1.0, 1.0, 1.0]]])
ta = func(t, axes=axes)
assert isinstance(ta, w.ndarray)
if axes is None:
newshape = (3, 2, 1)
else:
newshape = tuple(t.shape[axes[i]] for i in range(t.ndim))
assert ta.shape == newshape
arr_shape_funcs = [
w.reshape,
w.empty_like,
w.ones_like,
functools.partial(w.full_like, fill_value=42),
w.broadcast_to,
]
@instantiate_parametrized_tests
class TestOneArrAndShape(TestCase):
"""Smoke test of functions (array_like, shape_like) -> array_like"""
def setUp(self):
self.shape = (2, 3)
self.shape_arg_name = {
w.reshape: "newshape",
} # reshape expects `newshape`
@parametrize("func", arr_shape_funcs)
def test_andshape_tensor(self, func):
t = torch.Tensor([[1, 2, 3], [4, 5, 6]])
shape_dict = {self.shape_arg_name.get(func, "shape"): self.shape}
ta = func(t, **shape_dict)
assert isinstance(ta, w.ndarray)
assert ta.shape == self.shape
@parametrize("func", arr_shape_funcs)
def test_andshape_list(self, func):
t = [[1, 2, 3], [4, 5, 6]]
shape_dict = {self.shape_arg_name.get(func, "shape"): self.shape}
ta = func(t, **shape_dict)
assert isinstance(ta, w.ndarray)
assert ta.shape == self.shape
@parametrize("func", arr_shape_funcs)
def test_andshape_array(self, func):
t = w.asarray([[1, 2, 3], [4, 5, 6]])
shape_dict = {self.shape_arg_name.get(func, "shape"): self.shape}
ta = func(t, **shape_dict)
assert isinstance(ta, w.ndarray)
assert ta.shape == self.shape
one_arg_scalar_funcs = [(w.size, _np.size), (w.shape, _np.shape), (w.ndim, _np.ndim)]
@instantiate_parametrized_tests
class TestOneArrToScalar(TestCase):
"""Smoke test of functions (array_like) -> scalar or python object."""
@parametrize("func, np_func", one_arg_scalar_funcs)
def test_toscalar_tensor(self, func, np_func):
t = torch.Tensor([[1, 2, 3], [4, 5, 6]])
ta = func(t)
tn = np_func(_np.asarray(t))
assert not isinstance(ta, w.ndarray)
assert ta == tn
@parametrize("func, np_func", one_arg_scalar_funcs)
def test_toscalar_list(self, func, np_func):
t = [[1, 2, 3], [4, 5, 6]]
ta = func(t)
tn = np_func(t)
assert not isinstance(ta, w.ndarray)
assert ta == tn
@parametrize("func, np_func", one_arg_scalar_funcs)
def test_toscalar_array(self, func, np_func):
t = w.asarray([[1, 2, 3], [4, 5, 6]])
ta = func(t)
tn = np_func(t)
assert not isinstance(ta, w.ndarray)
assert ta == tn
shape_funcs = [w.zeros, w.empty, w.ones, functools.partial(w.full, fill_value=42)]
@instantiate_parametrized_tests
class TestShapeLikeToArray(TestCase):
"""Smoke test (shape_like) -> array."""
shape = (3, 4)
@parametrize("func", shape_funcs)
def test_shape(self, func):
a = func(self.shape)
assert isinstance(a, w.ndarray)
assert a.shape == self.shape
seq_funcs = [w.atleast_1d, w.atleast_2d, w.atleast_3d, w.broadcast_arrays]
@instantiate_parametrized_tests
class TestSequenceOfArrays(TestCase):
"""Smoke test (sequence of arrays) -> (sequence of arrays)."""
@parametrize("func", seq_funcs)
def test_single_tensor(self, func):
t = torch.Tensor([[1, 2, 3], [4, 5, 6]])
ta = func(t)
# for a single argument, broadcast_arrays returns a tuple, while
# atleast_?d return an array
unpack = {w.broadcast_arrays: True}.get(func, False)
res = ta[0] if unpack else ta
assert isinstance(res, w.ndarray)
@parametrize("func", seq_funcs)
def test_single_list(self, func):
lst = [[1, 2, 3], [4, 5, 6]]
la = func(lst)
unpack = {w.broadcast_arrays: True}.get(func, False)
res = la[0] if unpack else la
assert isinstance(res, w.ndarray)
@parametrize("func", seq_funcs)
def test_single_array(self, func):
a = w.asarray([[1, 2, 3], [4, 5, 6]])
la = func(a)
unpack = {w.broadcast_arrays: True}.get(func, False)
res = la[0] if unpack else la
assert isinstance(res, w.ndarray)
@parametrize("func", seq_funcs)
def test_several(self, func):
arys = (
torch.Tensor([[1, 2, 3], [4, 5, 6]]),
w.asarray([[1, 2, 3], [4, 5, 6]]),
[[1, 2, 3], [4, 5, 6]],
)
result = func(*arys)
assert isinstance(result, (tuple, list))
assert len(result) == len(arys)
assert all(isinstance(_, w.ndarray) for _ in result)
seq_to_single_funcs = [
w.concatenate,
w.stack,
w.vstack,
w.hstack,
w.dstack,
w.column_stack,
w.row_stack,
]
@instantiate_parametrized_tests
class TestSequenceOfArraysToSingle(TestCase):
"""Smoke test (sequence of arrays) -> (array)."""
@parametrize("func", seq_to_single_funcs)
def test_several(self, func):
arys = (
torch.Tensor([[1, 2, 3], [4, 5, 6]]),
w.asarray([[1, 2, 3], [4, 5, 6]]),
[[1, 2, 3], [4, 5, 6]],
)
result = func(arys)
assert isinstance(result, w.ndarray)
single_to_seq_funcs = (
w.nonzero,
# https://github.com/Quansight-Labs/numpy_pytorch_interop/pull/121#discussion_r1172824545
# w.tril_indices_from,
# w.triu_indices_from,
w.where,
)
@instantiate_parametrized_tests
class TestArrayToSequence(TestCase):
"""Smoke test array -> (tuple of arrays)."""
@parametrize("func", single_to_seq_funcs)
def test_asarray_tensor(self, func):
t = torch.Tensor([[1, 2, 3], [4, 5, 6]])
ta = func(t)
assert isinstance(ta, tuple)
assert all(isinstance(x, w.ndarray) for x in ta)
@parametrize("func", single_to_seq_funcs)
def test_asarray_list(self, func):
lst = [[1, 2, 3], [4, 5, 6]]
la = func(lst)
assert isinstance(la, tuple)
assert all(isinstance(x, w.ndarray) for x in la)
@parametrize("func", single_to_seq_funcs)
def test_asarray_array(self, func):
a = w.asarray([[1, 2, 3], [4, 5, 6]])
la = func(a)
assert isinstance(la, tuple)
assert all(isinstance(x, w.ndarray) for x in la)
funcs_and_args = [
(w.linspace, (0, 10, 11)),
(w.logspace, (1, 2, 5)),
(w.logspace, (1, 2, 5, 11)),
(w.geomspace, (1, 1000, 5, 11)),
(w.eye, (5, 6)),
(w.identity, (3,)),
(w.arange, (5,)),
(w.arange, (5, 8)),
(w.arange, (5, 8, 0.5)),
(w.tri, (3, 3, -1)),
]
@instantiate_parametrized_tests
class TestPythonArgsToArray(TestCase):
"""Smoke_test (sequence of scalars) -> (array)"""
@parametrize("func, args", funcs_and_args)
def test_argstoarray_simple(self, func, args):
a = func(*args)
assert isinstance(a, w.ndarray)
class TestNormalizations(TestCase):
"""Smoke test generic problems with normalizations."""
def test_unknown_args(self):
# Check that unknown args to decorated functions fail
a = w.arange(7) % 2 == 0
# unknown positional args
with assert_raises(TypeError):
w.nonzero(a, "kaboom")
# unknown kwarg
with assert_raises(TypeError):
w.nonzero(a, oops="ouch")
def test_too_few_args_positional(self):
with assert_raises(TypeError):
w.nonzero()
def test_unknown_args_with_defaults(self):
# check a function 5 arguments and 4 defaults: this should work
w.eye(3)
# five arguments, four defaults: this should fail
with assert_raises(TypeError):
w.eye()
class TestCopyTo(TestCase):
def test_copyto_basic(self):
dst = w.empty(4)
src = w.arange(4)
w.copyto(dst, src)
assert (dst == src).all()
def test_copytobcast(self):
dst = w.empty((4, 2))
src = w.arange(4)
# cannot broadcast => error out
with assert_raises(RuntimeError):
w.copyto(dst, src)
# broadcast src against dst
dst = w.empty((2, 4))
w.copyto(dst, src)
assert (dst == src).all()
def test_copyto_typecast(self):
dst = w.empty(4, dtype=int)
src = w.arange(4, dtype=float)
with assert_raises(TypeError):
w.copyto(dst, src, casting="no")
# force the type cast
w.copyto(dst, src, casting="unsafe")
assert (dst == src).all()
class TestDivmod(TestCase):
def test_divmod_out(self):
x1 = w.arange(8, 15)
x2 = w.arange(4, 11)
out = (w.empty_like(x1), w.empty_like(x1))
quot, rem = w.divmod(x1, x2, out=out)
assert_equal(quot, x1 // x2)
assert_equal(rem, x1 % x2)
out1, out2 = out
assert quot is out[0]
assert rem is out[1]
def test_divmod_out_list(self):
x1 = [4, 5, 6]
x2 = [2, 1, 2]
out = (w.empty_like(x1), w.empty_like(x1))
quot, rem = w.divmod(x1, x2, out=out)
assert quot is out[0]
assert rem is out[1]
@xfail # ("out1, out2 not implemented")
def test_divmod_pos_only(self):
x1 = [4, 5, 6]
x2 = [2, 1, 2]
out1, out2 = w.empty_like(x1), w.empty_like(x1)
quot, rem = w.divmod(x1, x2, out1, out2)
assert quot is out1
assert rem is out2
def test_divmod_no_out(self):
# check that the out= machinery handles no out at all
x1 = w.array([4, 5, 6])
x2 = w.array([2, 1, 2])
quot, rem = w.divmod(x1, x2)
assert_equal(quot, x1 // x2)
assert_equal(rem, x1 % x2)
def test_divmod_out_both_pos_and_kw(self):
o = w.empty(1)
with assert_raises(TypeError):
w.divmod(1, 2, o, o, out=(o, o))
class TestSmokeNotImpl(TestCase):
def test_nimpl_basic(self):
# smoke test that the "NotImplemented" annotation is picked up
with assert_raises(NotImplementedError):
w.empty(3, like="ooops")
@instantiate_parametrized_tests
class TestDefaultDtype(TestCase):
def test_defaultdtype_defaults(self):
# by default, both floats and ints 64 bit
x = w.empty(3)
z = x + 1j * x
assert x.dtype.torch_dtype == torch.float64
assert z.dtype.torch_dtype == torch.complex128
assert w.arange(3).dtype.torch_dtype == torch.int64
@parametrize("dt", ["pytorch", "float32", torch.float32])
def test_set_default_float(self, dt):
try:
w.set_default_dtype(fp_dtype=dt)
x = w.empty(3)
z = x + 1j * x
assert x.dtype.torch_dtype == torch.float32
assert z.dtype.torch_dtype == torch.complex64
finally:
# restore the
w.set_default_dtype(fp_dtype="numpy")
@skip(_np.__version__ <= "1.23", reason="from_dlpack is new in NumPy 1.23")
class TestExport(TestCase):
def test_exported_objects(self):
exported_fns = (
x
for x in dir(w)
if inspect.isfunction(getattr(w, x))
and not x.startswith("_")
and x != "set_default_dtype"
)
diff = set(exported_fns).difference(set(dir(_np)))
assert len(diff) == 0, str(diff)
class TestCtorNested(TestCase):
def test_arrays_in_lists(self):
lst = [[1, 2], [3, w.array(4)]]
assert_equal(w.asarray(lst), [[1, 2], [3, 4]])
class TestMisc(TestCase):
def test_ndarrays_to_tensors(self):
out = _util.ndarrays_to_tensors(((w.asarray(42), 7), 3))
assert len(out) == 2
assert isinstance(out[0], tuple) and len(out[0]) == 2
assert isinstance(out[0][0], torch.Tensor)
@skip(not TEST_CUDA, reason="requires cuda")
def test_f16_on_cuda(self):
# make sure operations with float16 tensors give same results on CUDA and on CPU
t = torch.arange(5, dtype=torch.float16)
assert_allclose(w.vdot(t.cuda(), t.cuda()), w.vdot(t, t))
assert_allclose(w.inner(t.cuda(), t.cuda()), w.inner(t, t))
assert_allclose(w.matmul(t.cuda(), t.cuda()), w.matmul(t, t))
assert_allclose(w.einsum("i,i", t.cuda(), t.cuda()), w.einsum("i,i", t, t))
assert_allclose(w.mean(t.cuda()), w.mean(t))
assert_allclose(w.cov(t.cuda(), t.cuda()), w.cov(t, t).tensor.cuda())
assert_allclose(w.corrcoef(t.cuda()), w.corrcoef(t).tensor.cuda())
if __name__ == "__main__":
run_tests()