blob: 018f1f5b735007a9cbcb7a8adbcfd4c9ad93603d [file] [log] [blame]
import numpy as np
import math
import torch
import io
import unittest
from copy import deepcopy
from hypothesis import given
from hypothesis import strategies as st
from torch.testing._internal.common_utils import TestCase, TEST_WITH_ROCM
import torch.testing._internal.hypothesis_utils as hu
hu.assert_deadline_disabled()
import tempfile
class Foo(torch.nn.Module):
def __init__(self):
super(Foo, self).__init__()
self.qscheme = torch.per_tensor_symmetric
def _calculate_dynamic_qparams(X, dtype, reduce_range=False):
"""Calculate the dynamic quantization parameters (scale, zero_point)
according to the min and max element of the tensor"""
if isinstance(X, torch.Tensor):
X = X.numpy()
if dtype == torch.qint8:
if reduce_range:
qmin, qmax = -64, 63
else:
qmin, qmax = -128, 127
else: # dtype == torch.quint8
if reduce_range:
qmin, qmax = 0, 127
else:
qmin, qmax = 0, 255
min_val = X.min().astype(dtype=np.float32)
max_val = X.max().astype(dtype=np.float32)
min_val = min(0.0, min_val)
max_val = max(0.0, max_val)
scale = (np.float64(max_val) - min_val) / (qmax - qmin)
if scale == 0.0 or math.isinf(1.0 / scale):
scale = np.float64(0.1)
zero_point = 0
zero_point_from_min = qmin - min_val / float(scale)
zero_point_from_max = qmax - max_val / float(scale)
zero_point_from_min_error = abs(qmin) - abs(min_val / float(scale))
zero_point_from_max_error = abs(qmax) - abs(max_val / float(scale))
if zero_point_from_min_error < zero_point_from_max_error:
initial_zero_point = zero_point_from_min
else:
initial_zero_point = zero_point_from_max
nudged_zero_point = 0
if initial_zero_point < qmin:
nudged_zero_point = qmin
elif initial_zero_point > qmax:
nudged_zero_point = qmax
else:
nudged_zero_point = int(round(initial_zero_point))
return [scale.astype(np.float32), int(nudged_zero_point)]
def get_supported_device_types():
return ['cpu', 'cuda'] if torch.cuda.is_available() and not TEST_WITH_ROCM else ['cpu']
class TestQuantizedTensor(TestCase):
def test_qtensor(self):
num_elements = 10
scale = 1.0
zero_point = 2
for device in get_supported_device_types():
for dtype in [torch.qint8, torch.quint8, torch.qint32]:
r = torch.ones(num_elements, dtype=torch.float, device=device)
qr = torch.quantize_per_tensor(r, scale, zero_point, dtype)
self.assertEqual(qr.q_scale(), scale)
self.assertEqual(qr.q_zero_point(), zero_point)
self.assertTrue(qr.is_quantized)
self.assertFalse(r.is_quantized)
self.assertEqual(qr.qscheme(), torch.per_tensor_affine)
self.assertTrue(isinstance(qr.qscheme(), torch.qscheme))
# slicing and int_repr
int_repr = qr.int_repr()
for num in int_repr:
self.assertEqual(num, 3)
for num in qr[2:].int_repr():
self.assertEqual(num, 3)
# dequantize
rqr = qr.dequantize()
for i in range(num_elements):
self.assertEqual(r[i], rqr[i])
# we can also print a qtensor
empty_r = torch.ones((0, 1), dtype=torch.float, device=device)
empty_qr = torch.quantize_per_tensor(empty_r, scale, zero_point, dtype)
device_msg = "" if device == 'cpu' else "device='" + device + ":0', "
dtype_msg = str(dtype) + ", "
self.assertEqual(' '.join(str(empty_qr).split()),
"tensor([], " + device_msg + "size=(0, 1), dtype=" + dtype_msg +
"quantization_scheme=torch.per_tensor_affine, " +
"scale=1.0, zero_point=2)")
def test_qtensor_float_assignment(self):
# Scalar Tensor
# item
scale = 1.0
zero_point = 2
r = torch.ones(1, dtype=torch.float)
for dtype in [torch.qint8, torch.quint8, torch.qint32]:
qr = torch.quantize_per_tensor(r, scale, zero_point, dtype=dtype)
self.assertEqual(qr.item(), 1)
self.assertEqual(qr[0].item(), 1)
# assignment
self.assertTrue(qr[0].is_quantized)
qr[0] = 11.3 # float assignment
self.assertEqual(qr.item(), 11)
x = torch.ones(1, dtype=torch.float) * 15.3
# Copying from a float Tensor
qr[:] = x
self.assertEqual(qr.item(), 15)
dtype_msg = str(dtype) + ", "
self.assertEqual(' '.join(str(qr).split()),
"tensor([15.], size=(1,), dtype=" + dtype_msg +
"quantization_scheme=torch.per_tensor_affine, " +
"scale=1.0, zero_point=2)")
def test_qtensor_quant_dequant(self):
scale = 0.02
zero_point = 2
for device in get_supported_device_types():
r = torch.rand(3, 2, dtype=torch.float, device=device) * 4 - 2
for dtype in [torch.qint8, torch.quint8, torch.qint32]:
qr = torch.quantize_per_tensor(r, scale, zero_point, dtype)
rqr = qr.dequantize()
self.assertTrue(np.allclose(r.cpu().numpy(), rqr.cpu().numpy(), atol=2 / scale))
# legacy constructor/new doesn't support qtensors
def test_qtensor_legacy_new_failure(self):
r = torch.rand(3, 2, dtype=torch.float) * 4 - 2
scale = 0.02
zero_point = 2
qr = torch.quantize_per_tensor(r, scale, zero_point, torch.quint8)
self.assertRaises(RuntimeError, lambda: qr.new(device='cpu'))
self.assertRaises(RuntimeError, lambda: qr.new(r.storage()))
self.assertRaises(RuntimeError, lambda: qr.new(r))
self.assertRaises(RuntimeError, lambda: qr.new(torch.Size([2, 3])))
self.assertRaises(RuntimeError, lambda: qr.new([6]))
def test_per_channel_qtensor_creation(self):
numel = 10
ch_axis = 0
scales = torch.rand(numel)
zero_points = torch.randint(0, 10, size=(numel,))
for dtype in [torch.qint8, torch.quint8]:
q = torch._empty_per_channel_affine_quantized(
[numel], scales=scales, zero_points=zero_points, axis=ch_axis, dtype=dtype)
# TODO(#38095): Replace assertEqualIgnoreType. See issue #38095
self.assertEqualIgnoreType(scales, q.q_per_channel_scales())
self.assertEqual(zero_points, q.q_per_channel_zero_points())
self.assertEqual(ch_axis, q.q_per_channel_axis())
# create Tensor from uint8_t Tensor, scales and zero_points
int_tensor = torch.randint(0, 100, size=(numel,), dtype=torch.uint8)
q = torch._make_per_channel_quantized_tensor(int_tensor, scales, zero_points, ch_axis)
self.assertEqual(int_tensor, q.int_repr())
# TODO(#38095): Replace assertEqualIgnoreType. See issue #38095
self.assertEqualIgnoreType(scales, q.q_per_channel_scales())
self.assertEqual(zero_points, q.q_per_channel_zero_points())
self.assertEqual(ch_axis, q.q_per_channel_axis())
def test_qtensor_creation(self):
scale = 0.5
zero_point = 10
numel = 10
for device in get_supported_device_types():
q = torch._empty_affine_quantized([numel], scale=scale, zero_point=zero_point,
device=device, dtype=torch.quint8)
self.assertEqual(scale, q.q_scale())
self.assertEqual(zero_point, q.q_zero_point())
# create Tensor from uint8_t Tensor, scale and zero_point
int_tensor = torch.randint(0, 100, size=(10,), device=device, dtype=torch.uint8)
q = torch._make_per_tensor_quantized_tensor(int_tensor, scale, zero_point)
self.assertEqual(int_tensor, q.int_repr())
self.assertEqual(scale, q.q_scale())
self.assertEqual(zero_point, q.q_zero_point())
# create via empty_like
q = torch._empty_affine_quantized([numel], scale=scale, zero_point=zero_point,
device=device, dtype=torch.quint8)
q_el = torch.empty_like(q)
self.assertEqual(q.q_scale(), q_el.q_scale())
self.assertEqual(q.q_zero_point(), q_el.q_zero_point())
self.assertEqual(q.dtype, q_el.dtype)
# create via empty_like but change the dtype (currently not supported)
with self.assertRaises(RuntimeError):
torch.empty_like(q, dtype=torch.qint8)
def test_qtensor_dtypes(self):
r = torch.rand(3, 2, dtype=torch.float) * 4 - 2
scale = 0.2
zero_point = 2
qr = torch.quantize_per_tensor(r, scale, zero_point, torch.qint8)
rqr = qr.dequantize()
self.assertTrue(np.allclose(r.numpy(), rqr.numpy(), atol=2 / scale))
qr = torch.quantize_per_tensor(r, scale, zero_point, torch.quint8)
rqr = qr.dequantize()
self.assertTrue(np.allclose(r.numpy(), rqr.numpy(), atol=2 / scale))
qr = torch.quantize_per_tensor(r, scale, zero_point, torch.qint32)
rqr = qr.dequantize()
self.assertTrue(np.allclose(r.numpy(), rqr.numpy(), atol=2 / scale))
def test_qtensor_quantize_per_channel(self):
r = torch.rand(3, 2, dtype=torch.float) * 4 - 2
scales = torch.tensor([0.2, 0.03], dtype=torch.double)
zero_points = torch.tensor([5, 10], dtype=torch.long)
axis = 1
def quantize_c(data, scales, zero_points):
res = torch.empty((3, 2))
quant_min, quant_max = 0, 255
for i in range(3):
for j in range(2):
res[i][j] = np.clip(np.round(data[i][j] / scales[j]) + zero_points[j], quant_min, quant_max)
return res
qr = torch.quantize_per_channel(r, scales, zero_points, axis, torch.quint8)
rqr = qr.dequantize()
self.assertTrue(np.allclose(qr.int_repr(), quantize_c(r, scales, zero_points)))
self.assertTrue(np.allclose(r.numpy(), rqr.numpy(), atol=2 / np.min(scales.numpy())))
def test_qtensor_permute(self):
scale = 0.02
zero_point = 1
for device in get_supported_device_types():
r = torch.rand(10, 30, 2, 2, device=device, dtype=torch.float) * 4 - 2
for dtype in [torch.qint8, torch.quint8, torch.qint32]:
qr = torch.quantize_per_tensor(r, scale, zero_point, dtype=dtype)
qr = qr.transpose(0, 1)
rqr = qr.dequantize()
# compare transpose + dequantized result with orignal transposed result
self.assertTrue(np.allclose(r.cpu().numpy().transpose([1, 0, 2, 3]), rqr.cpu().numpy(), atol=2 / scale))
qr = torch.quantize_per_tensor(r, scale, zero_point, dtype=dtype)
qr1 = qr.permute([1, 0, 2, 3])
qr2 = qr.transpose(0, 1)
# compare int representation after transformations
self.assertEqual(qr1.int_repr(), qr2.int_repr())
self.assertEqual(qr1.q_scale(), qr2.q_scale())
self.assertEqual(qr1.q_zero_point(), qr2.q_zero_point())
# compare dequantized result
self.assertEqual(qr1.dequantize(), qr2.dequantize())
# compare permuted + dequantized result with original transposed result
self.assertTrue(np.allclose(qr2.dequantize().cpu().numpy(),
r.cpu().numpy().transpose([1, 0, 2, 3]), atol=2 / scale))
# make permuted result contiguous
self.assertEqual(qr2.contiguous().int_repr(), qr2.int_repr())
# change memory format
qlast = qr.contiguous(memory_format=torch.channels_last)
self.assertEqual(qr.stride(), list(reversed(sorted(qr.stride()))))
self.assertNotEqual(qlast.stride(), list(reversed(sorted(qlast.stride()))))
self.assertEqual(qr.int_repr(), qlast.int_repr())
self.assertEqual(qr.q_scale(), qlast.q_scale())
self.assertEqual(qr.q_zero_point(), qlast.q_zero_point())
self.assertEqual(qlast.dequantize(), qr.dequantize())
# permuting larger tensors
x = torch.randn(64, 64, device=device)
qx = torch.quantize_per_tensor(x, 1.0, 0, dtype)
# should work
qx.permute([1, 0])
def test_qtensor_per_channel_permute(self):
r = torch.rand(20, 10, 2, 2, dtype=torch.float) * 4 - 2
dtype = torch.qint8
scales = torch.rand(10) * 0.02 + 0.01
zero_points = torch.round(torch.rand(10) * 2 - 1).to(torch.long)
qr = torch.quantize_per_channel(r, scales, zero_points, 1, dtype)
# we can't reorder the axis
with self.assertRaises(RuntimeError):
qr.transpose(0, 1)
# but we can change memory format
qlast = qr.contiguous(memory_format=torch.channels_last)
self.assertEqual(qr.stride(), list(reversed(sorted(qr.stride()))))
self.assertNotEqual(qlast.stride(), list(reversed(sorted(qlast.stride()))))
self.assertEqual(qr.int_repr(), qlast.int_repr())
# TODO(#38095): Replace assertEqualIgnoreType. See issue #38095
self.assertEqualIgnoreType(scales, qlast.q_per_channel_scales())
self.assertEqual(zero_points, qlast.q_per_channel_zero_points())
self.assertEqual(1, qlast.q_per_channel_axis())
self.assertEqual(qlast.dequantize(), qr.dequantize())
def test_qtensor_load_save(self):
scale = 0.2
zero_point = 10
# storage is not accessible on the cuda right now
device = "cpu"
r = torch.rand(15, 2, dtype=torch.float32, device=device) * 2
for dtype in [torch.qint8, torch.quint8, torch.qint32]:
qr = torch.quantize_per_tensor(r, scale, zero_point, dtype=dtype)
qrv = qr[:, 1]
with tempfile.NamedTemporaryFile() as f:
# Serializing and Deserializing Tensor
torch.save((qr, qrv), f)
f.seek(0)
qr2, qrv2 = torch.load(f)
self.assertEqual(qr, qr2)
self.assertEqual(qrv, qrv2)
self.assertEqual(qr2.storage().data_ptr(), qrv2.storage().data_ptr())
def test_qtensor_per_channel_load_save(self):
r = torch.rand(20, 10, dtype=torch.float) * 4 - 2
scales = torch.rand(10, dtype=torch.double) * 0.02 + 0.01
zero_points = torch.round(torch.rand(10) * 20 + 1).to(torch.long)
# quint32, cuda is not supported yet
for dtype in [torch.quint8, torch.qint8]:
qr = torch.quantize_per_channel(r, scales, zero_points, 1, dtype)
with tempfile.NamedTemporaryFile() as f:
# Serializing and Deserializing Tensor
torch.save(qr, f)
f.seek(0)
qr2 = torch.load(f)
self.assertEqual(qr, qr2)
def test_qtensor_copy(self):
scale = 0.5
zero_point = 10
numel = 10
for device in get_supported_device_types():
for dtype in [torch.qint8, torch.quint8, torch.qint32]:
# copy from same scale and zero_point
q = torch._empty_affine_quantized([numel], scale=scale,
zero_point=zero_point, device=device, dtype=dtype)
q2 = torch._empty_affine_quantized([numel], scale=scale,
zero_point=zero_point, device=device, dtype=dtype)
q.copy_(q2)
self.assertEqual(q.int_repr(), q2.int_repr())
self.assertEqual(q.q_scale(), q2.q_scale())
self.assertEqual(q.q_zero_point(), q2.q_zero_point())
# copying from different scale and zero_point
scale = 3.2
zero_point = 5
q = torch._empty_affine_quantized([numel], scale=scale,
zero_point=zero_point, device=device, dtype=dtype)
# check original scale and zero_points are set correctly
self.assertEqual(q.q_scale(), scale)
self.assertEqual(q.q_zero_point(), zero_point)
q.copy_(q2)
# check scale and zero_points has been copied
self.assertEqual(q, q2)
# can't copy from quantized tensor to non-quantized tensor
r = torch.empty([numel], dtype=torch.float)
q = torch._empty_affine_quantized([numel], scale=scale, zero_point=zero_point, dtype=torch.quint8)
with self.assertRaisesRegex(RuntimeError, "please use dequantize"):
r.copy_(q)
def test_torch_qtensor_deepcopy(self):
# cuda is not supported yet
device = "cpu"
q_int = torch.randint(0, 100, [3, 5], device=device, dtype=torch.uint8)
scale, zero_point = 2.0, 3
q = torch._make_per_tensor_quantized_tensor(q_int, scale=scale, zero_point=zero_point)
qc = deepcopy(q)
self.assertEqual(qc, q)
def test_qtensor_clone(self):
numel = 10
scale = 0.5
zero_point = 10
for device in get_supported_device_types():
for dtype in [torch.qint8, torch.quint8, torch.qint32]:
q2 = torch._empty_affine_quantized([numel], scale=scale, zero_point=zero_point,
device=device, dtype=dtype)
q = q2.clone()
# Check to make sure the scale and zero_point has been copied.
self.assertEqual(q, q2)
def test_qtensor_view(self):
scale, zero_point, dtype = 1.0, 2, torch.uint8
for device in get_supported_device_types():
q_int = torch.randint(0, 100, [1, 2, 3], device=device, dtype=dtype)
q = torch._make_per_tensor_quantized_tensor(q_int, scale=scale, zero_point=zero_point)
q2 = q.view(1, 3, 2)
self.assertEqual(q.numel(), q2.numel())
# testing -1
self.assertEqual(q, q2.view(1, -1, 3))
a_int = torch.randint(0, 100, [1, 2, 3, 4], device=device, dtype=dtype)
a = torch._make_per_tensor_quantized_tensor(a_int, scale=scale, zero_point=zero_point)
b = a.transpose(1, 2) # swaps 2nd and 3rd dimension
c = a.view(1, 3, 2, 4) # does not change tensor layout in memory
self.assertEqual(b.size(), c.size())
self.assertEqual(b.q_scale(), c.q_scale())
self.assertEqual(b.q_zero_point(), c.q_zero_point())
self.assertNotEqual(b.stride(), c.stride())
# size is the same but the underlying data is different
self.assertNotEqual(b.int_repr(), c.int_repr())
# torch.equal is not supported for the cuda backend
if device == 'cpu':
self.assertFalse(torch.equal(b, c))
else:
self.assertRaises(RuntimeError, lambda: torch.equal(b, c))
# a case can't view non-contiguos Tensor
a_int = torch.randint(0, 100, [1, 2, 3, 4], device=device, dtype=dtype)
a = torch._make_per_tensor_quantized_tensor(a_int, scale=scale, zero_point=zero_point)
b = a.transpose(1, 2) # swaps 2nd and 3rd dimension
err_str = "view size is not compatible with input tensor's size and stride*"
with self.assertRaisesRegex(RuntimeError, err_str):
b.view(1, 4, 2, 3)
# view on contiguous tensor is fine
b.contiguous().view(1, 4, 2, 3)
def test_qtensor_resize(self):
scale, zero_point, dtype = 1.0, 2, torch.uint8
sizes1 = [1, 2, 3, 4]
sizes2 = [1 * 2, 3 * 4]
sizes3 = [1, 2 * 3, 4]
sizes4 = [1 * 2 * 3 * 4]
sizes5 = [1, 2, 1, 3, 1, 4]
q1_int = torch.randint(0, 100, sizes1, dtype=dtype)
q1 = torch._make_per_tensor_quantized_tensor(q1_int, scale=scale, zero_point=zero_point)
q2 = q1.resize(*sizes2)
q3 = q2.resize(*sizes3)
q4 = q3.resize(*sizes4)
q5 = q4.resize(*sizes5)
self.assertEqual(q1.numel(), q2.numel())
self.assertEqual(q1.numel(), q3.numel())
self.assertEqual(q1.numel(), q4.numel())
self.assertEqual(q1.numel(), q5.numel())
# Compare original and post-transpose
a_int = torch.randint(0, 100, sizes1, dtype=dtype)
a = torch._make_per_tensor_quantized_tensor(a_int, scale=scale, zero_point=zero_point)
b = a.transpose(1, 2) # swaps 2nd and 3rd dimension
c = b.resize(*sizes1) # Change the sizes back to the original
self.assertEqual(a.size(), c.size())
self.assertEqual(b.q_scale(), c.q_scale())
self.assertEqual(b.q_zero_point(), c.q_zero_point())
self.assertNotEqual(b.stride(), c.stride())
# size is the same but the underlying data is different
self.assertNotEqual(b.int_repr(), c.int_repr())
self.assertFalse(torch.equal(b, c))
# Throws an error if numel is wrong
q1_int = torch.randint(0, 100, sizes1, dtype=dtype)
q1 = torch._make_per_tensor_quantized_tensor(a_int, scale=scale, zero_point=zero_point)
err_str = "requested resize to*"
with self.assertRaisesRegex(RuntimeError, err_str):
q2 = q1.resize(*sizes1[:-1])
# resize on both contiguous and non-contiguous tensor should be fine
q3 = q1.resize(*sizes2)
q4 = q1.contiguous().resize(*sizes2)
def test_qtensor_reshape(self):
scale, zero_point, dtype = 1.0, 2, torch.uint8
for device in get_supported_device_types():
q_int = torch.randint(0, 100, [3, 5], dtype=dtype, device=device)
q = torch._make_per_tensor_quantized_tensor(q_int, scale=scale, zero_point=zero_point)
q2 = q.reshape([15])
self.assertEqual(q.numel(), q2.numel())
self.assertEqual(q2.size(), [15])
# testing -1
self.assertEqual(q, q2.reshape([3, -1]))
a_int = torch.randint(0, 100, [1, 2, 3, 4], dtype=dtype, device=device)
a = torch._make_per_tensor_quantized_tensor(a_int, scale=scale, zero_point=zero_point)
b = a.transpose(1, 2) # swaps 2nd and 3rd dimension
c = a.reshape(1, 3, 2, 4) # does not change tensor layout
self.assertEqual(b.size(), c.size())
self.assertEqual(b.q_scale(), c.q_scale())
self.assertEqual(b.q_zero_point(), c.q_zero_point())
self.assertNotEqual(b.stride(), c.stride())
self.assertNotEqual(b.int_repr(), c.int_repr())
# torch.equal is not supported for the cuda backend
if device == 'cpu':
self.assertFalse(torch.equal(b, c))
else:
self.assertRaises(RuntimeError, lambda: torch.equal(b, c))
# we can use reshape for non-contiguous Tensor
a_int = torch.randint(0, 100, [1, 2, 3, 4], dtype=dtype, device=device)
a = torch._make_per_tensor_quantized_tensor(a_int, scale=scale, zero_point=zero_point)
b = a.transpose(1, 2) # swaps 2nd and 3rd dimension
c = b.reshape(1, 4, 2, 3)
def test_qtensor_unsqueeze(self):
x = torch.randn((1, 3, 4))
qx = torch.quantize_per_tensor(x, scale=1.0, zero_point=0, dtype=torch.quint8)
qy = qx.unsqueeze(2)
self.assertEqual(qy.size(), (1, 3, 1, 4))
qy = qy.squeeze(2)
self.assertEqual(qy.size(), qx.size())
# Per channel qtensor
scales = torch.tensor([1.0])
zero_points = torch.tensor([0])
qx = torch.quantize_per_channel(x, scales=scales, zero_points=zero_points, dtype=torch.quint8, axis=0)
qy = qx.unsqueeze(0)
self.assertEqual(qy.size(), (1, 1, 3, 4))
self.assertEqual(qy.q_per_channel_axis(), 1)
qz = qy.squeeze(0)
self.assertEqual(qz.size(), x.size())
self.assertEqual(qz.q_per_channel_axis(), 0)
with self.assertRaisesRegex(RuntimeError, "Squeeze is only possible on non-axis dimension for Per-Channel"):
qz = qy.squeeze(1)
# squeeze without dim specified
x = torch.randn((3, 1, 2, 1, 4))
scales = torch.tensor([1.0, 1.0])
zero_points = torch.tensor([0, 0])
qx = torch.quantize_per_channel(x, scales=scales, zero_points=zero_points, dtype=torch.quint8, axis=2)
qz = qx.squeeze()
self.assertEqual(qz.size(), (3, 2, 4))
self.assertEqual(qz.q_per_channel_axis(), 1)
with self.assertRaisesRegex(RuntimeError, "Squeeze is only possible on non-axis dimension for Per-Channel"):
qz = qy.squeeze()
def test_qscheme_pickle(self):
f = Foo()
buf = io.BytesIO()
torch.save(f, buf)
buf.seek(0)
f2 = torch.load(buf)
self.assertEqual(f2.qscheme, torch.per_tensor_symmetric)
@given(X=hu.tensor(shapes=hu.array_shapes(min_dims=2, max_dims=4,
min_side=1, max_side=10),
qparams=hu.qparams()),
reduce_range=st.booleans()
)
def test_choose_qparams(self, X, reduce_range):
X, (scale, zero_point, torch_type) = X
X = torch.from_numpy(X)
X_scale, X_zp = _calculate_dynamic_qparams(X, torch.quint8, reduce_range=reduce_range)
qparams = torch._choose_qparams_per_tensor(X, reduce_range)
np.testing.assert_array_almost_equal(X_scale, qparams[0], decimal=3)
self.assertEqual(X_zp, qparams[1])
@unittest.skipIf(not torch.cuda.is_available() or TEST_WITH_ROCM, 'CUDA is not available')
def test_cuda_cpu_implementation_consistency(self):
numel, zero_point, scale = 100, 2, 0.02
r = torch.rand(numel, dtype=torch.float32, device='cpu') * 25 - 4
for dtype in [torch.qint8, torch.quint8, torch.qint32]:
qr_cpu = torch.quantize_per_tensor(r, scale, zero_point, dtype=dtype)
qr_cuda = torch.quantize_per_tensor(r.cuda(), scale, zero_point, dtype=dtype)
# intr repr must be the same
np.testing.assert_equal(qr_cpu.int_repr().numpy(), qr_cuda.int_repr().cpu().numpy())
# dequantized values must be the same
r_cpu, r_cuda = qr_cpu.dequantize().numpy(), qr_cuda.dequantize().cpu().numpy()
np.testing.assert_almost_equal(r_cuda, r_cpu, decimal=5)