blob: f575180cb3ecaebab93001638805aff056a14518 [file] [log] [blame]
import math
import torch
import random
import unittest
import contextlib
from copy import deepcopy
from itertools import repeat
from functools import wraps
import torch.nn as nn
import torch.nn.parallel as dp
from torch.autograd import Variable
from torch.nn import Parameter
from common_nn import NNTestCase, ModuleTest, CriterionTest, TestBase, \
module_tests, criterion_tests, TEST_CUDA, TEST_MULTIGPU, TEST_CUDNN, PRECISION
from common import freeze_rng_state
def default_tensor_type(type):
type_str = torch.typename(type)
def decorator(fn):
@wraps(fn)
def wrapper(*args, **kwargs):
old_type = torch.typename(torch.Tensor())
torch.set_default_tensor_type(type_str)
try:
return fn(*args, **kwargs)
finally:
torch.set_default_tensor_type(old_type)
return wrapper
return decorator
class InputVariableMixin(object):
def _get_input(self):
input = TestBase._get_input(self)
def map_variables(i):
if isinstance(i, Variable):
return i
elif torch.is_tensor(i):
return Variable(i, requires_grad=True)
else:
return type(i)(map_variables(elem) for elem in i)
return map_variables(input)
class NewModuleTest(InputVariableMixin, ModuleTest):
def __init__(self, *args, **kwargs):
super(NewModuleTest, self).__init__(*args, **kwargs)
self.cudnn = kwargs.get('cudnn', False)
self.check_inplace = kwargs.get('check_inplace', False)
def _do_test(self, test_case, module, input):
test_case.check_jacobian(module, input, self.jacobian_input)
# check if module can be printed
module.__repr__()
if self.check_inplace:
module_ip = self.constructor(*self.constructor_args, inplace=True)
input_version = input._version
output = module(input)
test_case.assertEqual(input._version, input_version)
input_ip = deepcopy(input)
output_ip = module_ip(input_ip)
test_case.assertNotEqual(input_ip._version, input_version)
test_case.assertEqual(output, output_ip)
if type(input.data) == torch.LongTensor and TEST_CUDA:
input = input.cuda()
module.float().cuda()
module(input)
for p in module.parameters():
test_case.assertEqual(type(p.data), torch.cuda.FloatTensor)
test_case.assertEqual(p.get_device(), 0)
if torch.cuda.device_count() > 1:
input = input.cuda(1)
module.cuda(1)
with torch.cuda.device(1):
module(input)
for p in module.parameters():
test_case.assertEqual(type(p.data), torch.cuda.FloatTensor)
test_case.assertEqual(p.get_device(), 1)
else:
# to float
if type(input.data) != torch.LongTensor:
input = input.float()
module.float()
module(input)
for p in module.parameters():
test_case.assertEqual(type(p.data), torch.FloatTensor)
# and back to double
if type(input.data) != torch.LongTensor:
input = input.double()
module.double()
module(input)
for p in module.parameters():
test_case.assertEqual(type(p.data), torch.DoubleTensor)
# TODO: Hardshrink is lacking a CUDA implementation
if TEST_CUDA and type(module) != nn.Hardshrink:
# to GPU0
input = input.float().cuda()
module.float().cuda()
module(input)
for p in module.parameters():
test_case.assertEqual(type(p.data), torch.cuda.FloatTensor)
test_case.assertEqual(p.get_device(), 0)
# to CPU
input = input.cpu()
module.cpu()
module(input)
for p in module.parameters():
test_case.assertEqual(type(p.data), torch.FloatTensor)
# back to GPU0
input = input.cuda()
module.cuda()
module(input)
for p in module.parameters():
test_case.assertEqual(type(p.data), torch.cuda.FloatTensor)
test_case.assertEqual(p.get_device(), 0)
if self.cudnn:
torch.backends.cudnn.enabled = False
try:
module(input)
for p in module.parameters():
test_case.assertEqual(type(p.data), torch.cuda.FloatTensor)
test_case.assertEqual(p.get_device(), 0)
finally:
torch.backends.cudnn.enabled = True
if torch.cuda.device_count() >= 2:
# to GPU1
input = input.cuda(1)
module.cuda(1)
with torch.cuda.device(1):
module(input)
for p in module.parameters():
test_case.assertEqual(type(p.data), torch.cuda.FloatTensor)
test_case.assertEqual(p.get_device(), 1)
class NewCriterionTest(InputVariableMixin, CriterionTest):
# TODO: check that criterions don't ignore grad_output
def _get_target(self, target):
return Variable(target, requires_grad=False)
class TestNN(NNTestCase):
# # protip: uncomment this line to figure out which test is segfaulting
# def setUp(self):
# print("In method", self._testMethodName)
# super(TestNN, self).setUp()
def _forward(self, module, input):
with freeze_rng_state():
return module(input)
def _backward(self, module, input, output, grad_output):
output.backward(grad_output, retain_variables=True)
return input.grad
def _forward_criterion(self, criterion, input, target):
if isinstance(input, tuple):
args = input + (target,)
output = criterion(*args)
else:
output = criterion(input, target)
return output.data[0]
def _backward_criterion(self, criterion, input, target):
input_tuple = input if isinstance(input, tuple) else (input,)
for i in input_tuple:
i.grad.zero_()
args = input_tuple + (target,)
criterion(*args).backward()
if isinstance(input, tuple):
return tuple(map(lambda i: i.grad, input))
else:
return input.grad
def _zero_grad_parameters(self, module):
if hasattr(module, 'weight') and module.weight is not None:
module.weight.grad.zero_()
if hasattr(module, 'bias') and module.bias is not None:
module.bias.grad.zero_()
def _get_parameters(self, module):
params = []
d_params = []
if hasattr(module, 'weight') and module.weight is not None:
params += [module.weight.data]
d_params += [module.weight.grad]
if hasattr(module, 'bias') and module.bias is not None:
params += [module.bias.data]
d_params += [module.bias.grad]
return params, d_params
def test_hooks(self):
module = nn.Sigmoid()
input = Variable(torch.ones(5, 5), requires_grad=True)
counter = {
'forwards': 0,
'backwards': 0
}
def fw_hook(inc, h_module, input, output):
self.assertIsInstance(input, tuple)
self.assertIsInstance(output, Variable)
self.assertTrue(h_module is module)
self.assertEqual(input[0].data, torch.ones(5, 5))
self.assertEqual(output.data, torch.Tensor(5, 5).fill_(1 / (1 + 1 / math.e)))
counter['forwards'] += inc
def bw_hook(inc, h_module, grad_input, grad_output):
self.assertIsInstance(grad_input, tuple)
self.assertIsInstance(grad_output, tuple)
self.assertTrue(h_module is module)
self.assertEqual(grad_output[0], torch.ones(5, 5) * 2)
counter['backwards'] += inc
module.register_forward_hook('test', lambda *args: fw_hook(1, *args))
module(input)
module(input)
self.assertEqual(counter['forwards'], 2)
self.assertEqual(counter['backwards'], 0)
module.register_backward_hook('test', lambda *args: bw_hook(1, *args))
output = module(input)
self.assertEqual(counter['forwards'], 3)
self.assertEqual(counter['backwards'], 0)
output.backward(torch.ones(5, 5) * 2, retain_variables=True)
self.assertEqual(counter['forwards'], 3)
self.assertEqual(counter['backwards'], 1)
output.backward(torch.ones(5, 5) * 2, retain_variables=True)
self.assertEqual(counter['forwards'], 3)
self.assertEqual(counter['backwards'], 2)
module.register_forward_hook('test2', lambda *args: fw_hook(2, *args))
output = module(input)
self.assertEqual(counter['forwards'], 6)
self.assertEqual(counter['backwards'], 2)
module.register_backward_hook('test2', lambda *args: bw_hook(2, *args))
module(input).backward(torch.ones(5, 5) * 2)
self.assertEqual(counter['forwards'], 9)
self.assertEqual(counter['backwards'], 5)
module.remove_backward_hook('test2')
module(input).backward(torch.ones(5, 5) * 2)
self.assertEqual(counter['forwards'], 12)
self.assertEqual(counter['backwards'], 6)
module.remove_forward_hook('test2')
module(input).backward(torch.ones(5, 5) * 2)
self.assertEqual(counter['forwards'], 13)
self.assertEqual(counter['backwards'], 7)
module.remove_forward_hook('test')
module.remove_backward_hook('test')
def test_hook_fail(self):
module = nn.Sigmoid()
input = Variable(torch.randn(5, 5), requires_grad=True)
def fw_fail1(self, input, output):
return output
def fw_fail2(self, input, output):
return input
def bw_fail1(self, grad_input, grad_output):
return grad_input[:-1]
def bw_fail2(self, grad_input, grad_output):
return grad_input + (torch.randn(2, 2),)
module.register_forward_hook('fw_fail', fw_fail1)
with self.assertRaises(RuntimeError) as err:
module(input)
self.assertIn("fw_fail", err.exception.args[0])
self.assertIn("didn't return None", err.exception.args[0])
module.remove_forward_hook('fw_fail')
module.register_forward_hook('fw_fail2', fw_fail2)
with self.assertRaises(RuntimeError) as err:
module(input)
self.assertIn("fw_fail2", err.exception.args[0])
self.assertIn("didn't return None", err.exception.args[0])
module.remove_forward_hook('fw_fail2')
module.register_backward_hook('bw_fail', bw_fail1)
with self.assertRaises(RuntimeError) as err:
module(input).sum().backward()
self.assertIn("bw_fail", err.exception.args[0])
self.assertIn("got 0, but expected 1", err.exception.args[0])
module.remove_backward_hook('bw_fail')
module.register_backward_hook('bw_fail2', bw_fail2)
with self.assertRaises(RuntimeError) as err:
module(input).sum().backward()
self.assertIn("bw_fail2", err.exception.args[0])
self.assertIn("got 2, but expected 1", err.exception.args[0])
module.remove_backward_hook('bw_fail2')
def test_hook_writeable(self):
module = nn.Linear(5, 5)
input = Variable(torch.randn(5, 5), requires_grad=True)
def bw_hook(self, grad_input, grad_output):
return tuple(gi * 2 for gi in grad_input)
module.register_backward_hook('test', bw_hook)
module(input).backward(torch.ones(5, 5))
expected_grad = torch.ones(5, 5).mm(module.weight.data) * 2
self.assertEqual(input.grad, expected_grad)
def test_volatile(self):
module = nn.Conv2d(2, 5, kernel_size=3, padding=1)
input = torch.randn(1, 2, 10, 10)
x = Variable(input)
y = Variable(input.clone(), volatile=True)
output = module(x)
self.assertFalse(output.volatile)
self.assertTrue(output.requires_grad)
output.backward(torch.ones(1, 5, 10, 10))
vol_output = module(y)
self.assertTrue(vol_output.volatile)
self.assertFalse(vol_output.requires_grad)
self.assertRaises(RuntimeError, lambda: vol_output.backward(torch.ones(1, 5, 10, 10)))
def _test_dropout(self, cls, input):
p = 0.2
input.fill_(1-p)
module = cls(p)
input_var = Variable(input, requires_grad=True)
output = module(input_var)
self.assertLess(abs(output.data.mean() - (1-p)), 0.05)
output.backward(input)
self.assertLess(abs(input_var.grad.mean() - (1-p)), 0.05)
module = cls(p, True)
input_var = Variable(input.clone(), requires_grad=True)
output = module(input_var + 0)
self.assertLess(abs(output.data.mean() - (1-p)), 0.05)
output.backward(input)
self.assertLess(abs(input_var.grad.mean() - (1-p)), 0.05)
# Check that these don't raise errors
module.__repr__()
str(module)
def test_parameters(self):
def num_params(module):
return len(list(module.parameters()))
class Net(nn.Container):
def __init__(self):
super(Net, self).__init__(
l1=l,
l2=l
)
self.param = Parameter(torch.Tensor(3, 5))
l = nn.Linear(10, 20)
n = Net()
s = nn.Sequential(n, n, n, n)
self.assertEqual(num_params(l), 2)
self.assertEqual(num_params(n), 3)
self.assertEqual(num_params(s), 3)
def test_modules(self):
class Net(nn.Container):
def __init__(self):
super(Net, self).__init__()
self.l1 = l
self.l2 = l
self.param = Variable(torch.Tensor(3, 5))
l = nn.Linear(10, 20)
n = Net()
s = nn.Sequential(n, n, n, n)
self.assertEqual(list(s.modules()), [s, n, l])
def test_Sequential_getitem(self):
l1 = nn.Linear(10, 20)
l2 = nn.Linear(20, 30)
l3 = nn.Linear(30, 40)
l4 = nn.Linear(40, 50)
n = nn.Sequential(l1, l2, l3, l4)
self.assertEqual(n[0], l1)
self.assertEqual(n[1], l2)
self.assertEqual(n[2], l3)
self.assertEqual(n[3], l4)
def test_add_module(self):
l = nn.Linear(10, 20)
net = nn.Container(
l=l,
l2=l,
empty=None,
)
self.assertEqual(net.l, l)
self.assertEqual(net.l2, l)
self.assertEqual(net.empty, None)
net.add_module('l3', l)
self.assertEqual(net.l3, l)
self.assertRaises(KeyError, lambda: net.add_module('l', l))
self.assertRaises(TypeError, lambda: net.add_module('x', 'non-module'))
def test_type(self):
l = nn.Linear(10, 20)
net = nn.Container(
l=l,
l2=l,
empty=None,
)
net.float()
self.assertIsInstance(l.weight.data, torch.FloatTensor)
self.assertIsInstance(l.bias.data, torch.FloatTensor)
net.double()
self.assertIsInstance(l.weight.data, torch.DoubleTensor)
self.assertIsInstance(l.bias.data, torch.DoubleTensor)
net.type(torch.FloatTensor)
self.assertIsInstance(l.weight.data, torch.FloatTensor)
self.assertIsInstance(l.bias.data, torch.FloatTensor)
net.type(torch.DoubleTensor)
self.assertIsInstance(l.weight.data, torch.DoubleTensor)
self.assertIsInstance(l.bias.data, torch.DoubleTensor)
if TEST_CUDA:
net.type(torch.cuda.FloatTensor)
self.assertIsInstance(l.weight.data, torch.cuda.FloatTensor)
self.assertIsInstance(l.bias.data, torch.cuda.FloatTensor)
def test_non_leaf_parameters(self):
l1 = nn.Linear(10, 10)
l2 = nn.Linear(10, 10)
def assign_weight():
l2.weight = l1.weight + 2
self.assertRaises(TypeError, assign_weight)
# This should work though
l2.weight = Parameter(torch.randn(10, 10))
def test_embedding_padding_idx(self):
embedding = nn.Embedding(10, 20, padding_idx = 0)
input = Variable(torch.LongTensor([[0,2,4,5],[4,3,0,9]]))
output = embedding(input)
self.assertEqual(output[0][0].sum().data[0], 0)
self.assertEqual(output[1][2].sum().data[0], 0)
def test_Dropout(self):
input = torch.Tensor(1000)
self._test_dropout(nn.Dropout, input)
def test_Dropout2d(self):
b = random.randint(1, 5)
w = random.randint(1, 5)
h = random.randint(1, 5)
num_features = 1000
input = torch.Tensor(num_features, b, w, h)
self._test_dropout(nn.Dropout2d, input)
def test_Dropout3d(self):
b = random.randint(1, 5)
w = random.randint(1, 5)
h = random.randint(1, 5)
d = random.randint(1, 2)
num_features = 1000
input = torch.Tensor(num_features, b, d, w, h)
self._test_dropout(nn.Dropout3d, input)
def _test_maxpool_indices(self, num_dim):
def expected_indices(dim):
if dim == 1:
return torch.DoubleTensor([1, 3])
lower_dim = expected_indices(dim-1)
lower_dim = lower_dim.view(1, *lower_dim.size())
return torch.cat((lower_dim+4, lower_dim+12), 0)
def expected_grad(dim):
if dim == 1:
return torch.DoubleTensor([0, 1, 0, 1])
lower_dim_grad = expected_grad(dim-1)
grad = lower_dim_grad.view(1, *lower_dim_grad.size())
zero = torch.zeros(grad.size())
return torch.cat((zero, grad, zero, grad), 0)
module_cls = getattr(nn, 'MaxPool{}d'.format(num_dim))
module = module_cls(2, return_indices=True)
numel = 4 ** num_dim
input = torch.range(1, numel).view(1, 1, *repeat(4, num_dim))
input_var = Variable(input, requires_grad=True)
# Check forward
output, indices = module(input_var)
if num_dim != 3:
expected_indices = expected_indices(num_dim)
expected_output = expected_indices + 1
self.assertEqual(indices.data.squeeze(), expected_indices)
self.assertEqual(output.data.squeeze(), expected_output)
self.assertTrue(output.requires_grad)
self.assertFalse(indices.requires_grad)
# Make sure backward works
grad_output = torch.DoubleTensor(output.size()).fill_(1)
output.backward(grad_output, retain_variables=True)
expected_grad = expected_grad(num_dim)
self.assertEqual(input_var.grad, expected_grad.view_as(input))
# Make sure backward after changing indices will result in an error
indices.add_(1)
self.assertRaises(RuntimeError, lambda: output.backward(grad_output))
def test_MaxPool1d_indices(self):
self._test_maxpool_indices(1)
def test_MaxPool2d_indices(self):
self._test_maxpool_indices(2)
def test_MaxPool3d_indices(self):
self._test_maxpool_indices(3)
def _test_scatter(self, tensor):
x = Variable(tensor, requires_grad=True)
result = dp.scatter(x, (0, 1))
self.assertEqual(len(result), 2)
self.assertEqual(result[0], x[:2])
self.assertEqual(result[0].get_device(), 0)
self.assertEqual(result[1], x[2:])
self.assertEqual(result[1].get_device(), 1)
grad = result[0].data.clone().fill_(2)
result[0].backward(grad)
self.assertEqual(x.grad[:2], grad)
self.assertEqual(x.grad[2:], grad.clone().zero_())
@unittest.skipIf(not TEST_MULTIGPU, "multi-GPU not supported")
def test_scatter_cpu(self):
self._test_scatter(torch.randn(4, 4))
@unittest.skipIf(not TEST_MULTIGPU, "multi-GPU not supported")
def test_scatter_gpu(self):
self._test_scatter(torch.randn(4, 4).cuda())
def _test_gather(self, output_device):
inputs = (
Variable(torch.randn(2, 4).cuda(0), requires_grad=True),
Variable(torch.randn(2, 4).cuda(1), requires_grad=True)
)
result = dp.gather(inputs, output_device)
self.assertEqual(result.size(), torch.Size([4, 4]))
self.assertEqual(result[:2], inputs[0])
self.assertEqual(result[2:], inputs[1])
if output_device != -1:
self.assertEqual(result.get_device(), output_device)
else:
self.assertFalse(result.is_cuda)
grad = torch.randn(4, 4)
if output_device != -1:
grad = grad.cuda(output_device)
result.backward(grad)
self.assertEqual(inputs[0].grad, grad[:2])
self.assertEqual(inputs[1].grad, grad[2:])
@unittest.skipIf(not TEST_MULTIGPU, "multi-GPU not supported")
def test_gather_cpu(self):
self._test_gather(-1)
@unittest.skipIf(not TEST_MULTIGPU, "multi-GPU not supported")
def test_gather_gpu(self):
self._test_gather(0)
@unittest.skipIf(not TEST_MULTIGPU, "multi-GPU not supported")
def test_replicate(self):
module = nn.Linear(10, 5).float().cuda()
input = Variable(torch.randn(2, 10).float().cuda())
expected_output = module(input).data
replicas = dp.replicate(module, (0, 1))
for i, replica in enumerate(replicas):
for p in replica.parameters():
self.assertEqual(p.get_device(), i)
replica_input = input.cuda(i)
self.assertEqual(replica(replica_input).data, expected_output)
@unittest.skipIf(not TEST_MULTIGPU, "multi-GPU not supported")
def test_replicate_buffers(self):
net = nn.Container()
net.bn = nn.BatchNorm2d(10)
net.cuda()
replicas = dp.replicate(net, (0, 1))
for i, replica in enumerate(replicas):
self.assertEqual(replica.bn.running_mean.get_device(), i, 'buffer on wrong device')
self.assertEqual(replica.bn.running_var.get_device(), i, 'buffer on wrong device')
@unittest.skipIf(not TEST_MULTIGPU, "multi-GPU not supported")
def test_parallel_apply(self):
l1 = nn.Linear(10, 5).float().cuda(0)
l2 = nn.Linear(10, 5).float().cuda(1)
i1 = Variable(torch.randn(2, 10).float().cuda(0))
i2 = Variable(torch.randn(2, 10).float().cuda(1))
expected1 = l1(i1).data
expected2 = l2(i2).data
inputs = (i1, i2)
modules = (l1, l2)
expected_outputs = (expected1, expected2)
outputs = dp.parallel_apply(modules, inputs)
for out, expected in zip(outputs, expected_outputs):
self.assertEqual(out.data, expected)
inputs = (i1, Variable(i2.data.new()))
expected_outputs = (expected1, expected2.new())
def test_data_parallel_noop(self):
l = nn.Linear(10, 5).float()
i = Variable(torch.randn(20, 10).float())
out = dp.data_parallel(l, i, [])
self.assertEqual(out, l(i))
self.assertFalse(out.is_cuda)
@unittest.skipIf(not TEST_MULTIGPU, "multi-GPU not supported")
def test_data_parallel_small_back(self):
l = nn.Linear(10, 5).float().cuda()
i = Variable(torch.randn(20, 10).float().cuda())
out = dp.data_parallel(l, i, (0, 1))
self.assertEqual(out, l(i))
@unittest.skipIf(not TEST_MULTIGPU, "multi-GPU not supported")
def test_data_parallel(self):
l = nn.Linear(10, 5).float().cuda()
i = Variable(torch.randn(20, 10).float().cuda(1))
l.cuda(1)
expected_out = l(i).data
l.cuda(0)
out = dp.data_parallel(l, i, (0, 1))
self.assertEqual(out.get_device(), 0)
self.assertEqual(out.data, expected_out)
@unittest.skipIf(not TEST_MULTIGPU, "multi-GPU not supported")
def test_data_parallel_nested_output(self):
def fn(input):
return [input, (input.sin(), input.cos(), [input.add(1)]), input]
class Net(nn.Container):
def forward(self, input):
return fn(input)
i = Variable(torch.randn(2, 2).float().cuda(1))
gpus = range(torch.cuda.device_count())
output = dp.data_parallel(Net(), i, gpus)
self.assertEqual(output, fn(i))
self.assertIsInstance(output[0], Variable)
self.assertIsInstance(output[1], tuple)
self.assertIsInstance(output[1][0], Variable)
self.assertIsInstance(output[1][1], Variable)
self.assertIsInstance(output[1][2], list)
self.assertIsInstance(output[1][2][0], Variable)
self.assertIsInstance(output[2], Variable)
@unittest.skipIf(not TEST_MULTIGPU, "multi-GPU not supported")
def test_data_parallel_nested_input(self):
def fn(input):
return input[1][0]
class Net(nn.Container):
def forward(self, input):
return fn(input)
i = Variable(torch.randn(20, 3).float().cuda(1))
input = (i.cos(), (i.sin(), i), i.sin())
gpus = range(torch.cuda.device_count())
output = dp.data_parallel(Net(), input, gpus)
self.assertEqual(output, fn(input))
@unittest.skipIf(not TEST_CUDA, "CUDA unavailable")
def test_data_parallel_module(self):
l = nn.Linear(10, 5).float().cuda()
i = Variable(torch.randn(20, 10).float().cuda())
expected_out = l(i).data
net = nn.DataParallel(l)
out = net(i)
self.assertEqual(out.get_device(), 0)
self.assertEqual(out.data, expected_out)
def test_state_dict(self):
l = nn.Linear(5, 5)
block = nn.Container(
conv=nn.Conv2d(3, 3, 3, bias=False)
)
net = nn.Container(
linear1=l,
linear2=l,
bn=nn.BatchNorm2d(2),
block=block,
empty=None,
)
state_dict = net.state_dict()
self.assertEqual(len(state_dict), 9)
self.assertIn('linear1.weight', state_dict)
self.assertIn('linear1.bias', state_dict)
self.assertIn('linear2.weight', state_dict)
self.assertIn('linear2.bias', state_dict)
self.assertIn('block.conv.weight', state_dict)
self.assertIn('block.conv.weight', state_dict)
self.assertNotIn('block.conv.bias', state_dict)
self.assertIn('bn.weight', state_dict)
self.assertIn('bn.bias', state_dict)
self.assertIn('bn.running_var', state_dict)
self.assertIn('bn.running_mean', state_dict)
self.assertFalse(any(map(lambda k: k.startswith('empty'), state_dict.keys())))
for k, v in state_dict.items():
param = net
for component in k.split('.'):
param = getattr(param, component)
self.assertIs(v, param)
l = nn.Linear(5, 5)
state_dict = l.state_dict()
self.assertEqual(len(state_dict), 2)
self.assertIs(state_dict['weight'], l.weight)
self.assertIs(state_dict['bias'], l.bias)
def test_load_state_dict(self):
l = nn.Linear(5, 5)
block = nn.Container(
conv1=nn.Conv2d(3, 3, 3, bias=False),
conv2=nn.Conv2d(3, 3, 3, bias=False),
)
net = nn.Container(
linear1=l,
linear2=l,
bn=nn.BatchNorm2d(2),
block=block,
empty=None,
)
state_dict = {
'linear1.weight': Parameter(torch.ones(5, 5)),
'block.conv1.bias': Parameter(torch.range(1, 3)),
'block.conv2.bias': None,
'bn.running_mean': torch.randn(2),
}
net.load_state_dict(state_dict)
self.assertIs(net.linear1.weight, state_dict['linear1.weight'])
self.assertIs(net.block.conv1.bias, state_dict['block.conv1.bias'])
self.assertIs(net.block.conv2.bias, state_dict['block.conv2.bias'])
self.assertIs(net.bn.running_mean, state_dict['bn.running_mean'])
state_dict = {
'linear1.weight': torch.ones(5, 5)
}
self.assertRaises(TypeError, lambda: net.load_state_dict(state_dict))
def test_parameter_assignment(self):
l = nn.Linear(5, 5)
def num_params():
return len(list(l.parameters()))
self.assertEqual(num_params(), 2)
new_param = Parameter(torch.randn(5, 5))
l.param_name = new_param
self.assertEqual(num_params(), 3)
self.assertObjectIn(new_param, l.parameters())
var = Variable(torch.randn(5, 5))
l.var_name = var
self.assertEqual(num_params(), 3)
self.assertNotIn(var, l.parameters())
# Make sure Variables are not saved as parameters
l.variable_attr = Variable(torch.Tensor(5, 5))
self.assertEqual(num_params(), 3)
l.param_attr = Parameter(torch.Tensor(5, 5))
self.assertEqual(num_params(), 4)
# It shouldn't be possible to replace a parameter with a Variable
def assign_var():
l.param_attr = Variable(torch.Tensor(5, 5))
self.assertRaises(TypeError, assign_var)
# But replacing it with None should be fine
l.param_attr = None
self.assertEqual(num_params(), 3)
def test_ConvTranspose2d_output_size(self):
m = nn.ConvTranspose2d(3, 4, 3, 3, 0, 2)
i = Variable(torch.randn(2, 3, 6, 6))
for h in range(15, 22):
for w in range(15, 22):
if 18 <= h <= 20 and 18 <= w <= 20:
output = m(i, output_size=(h, w))
self.assertEqual(output.size()[2:], (h, w))
else:
self.assertRaises(ValueError, lambda: m(i, (h, w)))
def test_Conv2d_naive_groups(self):
# Check that grouped convolutions matches two half convolutions
m = nn.Conv2d(4, 4, kernel_size=3, groups=2)
i = Variable(torch.randn(2, 4, 6, 6), requires_grad=True)
output = m(i)
grad_output = torch.randn(2, 4, 4, 4)
output.backward(grad_output)
m1 = nn.Conv2d(2, 2, kernel_size=3)
m1.weight.data.copy_(m.weight.data[:2])
m1.bias.data.copy_(m.bias.data[:2])
i1 = Variable(i.data[:, :2].contiguous(), requires_grad=True)
output1 = m1(i1)
output1.backward(grad_output[:, :2].contiguous())
m2 = nn.Conv2d(2, 2, kernel_size=3)
m2.weight.data.copy_(m.weight.data[2:])
m2.bias.data.copy_(m.bias.data[2:])
i2 = Variable(i.data[:, 2:].contiguous(), requires_grad=True)
output2 = m2(i2)
output2.backward(grad_output[:, 2:].contiguous())
self.assertEqual(output, torch.cat([output1, output2], 1))
self.assertEqual(i.grad, torch.cat([i1.grad, i2.grad], 1))
self.assertEqual(m.bias.grad, torch.cat([m1.bias.grad, m2.bias.grad], 0))
self.assertEqual(m.weight.grad, torch.cat([m1.weight.grad, m2.weight.grad], 0))
def test_MaxUnpool2d_output_size(self):
m = nn.MaxPool2d(3, stride=2, return_indices=True)
mu = nn.MaxUnpool2d(3, stride=2)
big_t = torch.rand(1, 1, 6, 6)
big_t[0][0][4][4] = 100
output_big, indices_big = m(Variable(big_t))
self.assertRaises(RuntimeError, lambda: mu(output_big, indices_big))
small_t = torch.rand(1, 1, 5, 5)
for i in range(0, 4, 2):
for j in range(0, 4, 2):
small_t[:,:,i,j] = 100
output_small, indices_small = m(Variable(small_t))
for h in range(3, 10):
for w in range(3, 10):
if 4 <= h <= 6 and 4 <= w <= 6:
size = (h, w)
if h == 5:
size = torch.LongStorage(size)
elif h == 6:
size = torch.LongStorage((1, 1) + size)
mu(output_small, indices_small, output_size=size)
else:
self.assertRaises(ValueError, lambda:
mu(output_small, indices_small, (h, w)))
def test_RNN_cell(self):
# this is just a smoke test; these modules are implemented through
# autograd so no Jacobian test is needed
for module in (nn.RNNCell, nn.GRUCell):
for bias in (True, False):
input = Variable(torch.randn(3, 10))
hx = Variable(torch.randn(3, 20))
cell = module(10, 20, bias=bias)
for i in range(6):
hx = cell(input, hx)
hx.sum().backward()
def test_LSTM_cell(self):
# this is just a smoke test; these modules are implemented through
# autograd so no Jacobian test is needed
for bias in (True, False):
input = Variable(torch.randn(3, 10))
hx = Variable(torch.randn(3, 20))
cx = Variable(torch.randn(3, 20))
lstm = nn.LSTMCell(10, 20, bias=bias)
for i in range(6):
hx, cx = lstm(input, (hx, cx))
(hx+cx).sum().backward()
@unittest.skipIf(not TEST_CUDNN, "needs cudnn")
@default_tensor_type(torch.FloatTensor) # FIXME: just until torch.cuda.DoubleTensor.sum() implemented
def test_RNN_cpu_vs_cudnn(self):
def forward_backward(cuda, rnn, input_val, hx_val, weights_val):
is_lstm = type(rnn) == nn.LSTM
for x_layer, y_layer in zip(rnn.all_weights, weights_val):
for x, y in zip(x_layer, y_layer):
x.data.copy_(y.data)
input = Variable(input_val.clone(), requires_grad=True)
if is_lstm:
hx = (Variable(hx_val.clone(), requires_grad=True),
Variable(hx_val.add(1), requires_grad=True))
else:
hx = Variable(hx_val.clone(), requires_grad=True)
if cuda:
rnn.cuda()
input.data = input.data.cuda()
if is_lstm:
hx[0].data = hx[0].data.cuda()
hx[1].data = hx[1].data.cuda()
else:
hx.data = hx.data.cuda()
output, hy = rnn(input, hx)
# FIXME this is because of a pytorch bug
if is_lstm:
fake_loss = 0*(hy[0] + hy[1]).sum()
else:
fake_loss = 0*hy.sum()
loss = output.sum() + fake_loss
loss.backward()
return {'output': output.data,
'hy': hy[0].data if is_lstm else hy.data,
'weights': rnn.all_weights,
'grad_input': input.grad,
'grad_hx': hx[0].grad if is_lstm else hx.grad,
'cy': hy[1].data if is_lstm else None,
'grad_cx': hx[1].grad if is_lstm else None}
input_size = 10
hidden_size = 6
num_layers = 2
seq_length = 7
batch = 5
def compare_cpu_gpu(outputs_cpu, outputs_gpu):
self.assertEqual(list(outputs_cpu.keys()), list(outputs_gpu.keys()))
for key in outputs_cpu.keys():
if key != 'weights':
self.assertEqual(outputs_cpu[key], outputs_gpu[key], prec=5e-5, message=key)
# check grad weights separately, as nested dict
for cpu_layer_weight, gpu_layer_weight in zip(outputs_cpu['weights'], outputs_gpu['weights']):
for (cpu_weight, gpu_weight) in zip(cpu_layer_weight, gpu_layer_weight):
self.assertEqual(cpu_weight.grad, gpu_weight.grad, prec=5e-5)
input_val = torch.randn(seq_length, batch, input_size)
for module in (nn.RNN, nn.LSTM, nn.GRU):
for bias in (True, False):
for bidirectional in (False, True):
for dropout in (0, 1): # Because of dropout randomness, can only compare 0 and 1
num_directions = 2 if bidirectional else 1
hx_val = torch.randn(num_layers * num_directions, batch, hidden_size)
rnn = module(input_size,
hidden_size,
num_layers,
bias=bias,
dropout=dropout,
bidirectional=bidirectional)
outputs_cpu = forward_backward(
False, rnn, input_val, hx_val, rnn.all_weights)
rnn_gpu = module(input_size,
hidden_size,
num_layers,
bias=bias,
dropout=dropout,
bidirectional=bidirectional)
outputs_gpu = forward_backward(
True, rnn_gpu, input_val, hx_val, rnn.all_weights)
compare_cpu_gpu(outputs_cpu, outputs_gpu)
for nonlinearity in ('tanh', 'relu'):
hx_val = torch.randn(num_layers, batch, hidden_size)
rnn = nn.rnn.RNN(input_size, hidden_size, num_layers, bias=bias, nonlinearity=nonlinearity)
outputs_cpu = forward_backward(False, rnn, input_val, hx_val, rnn.all_weights)
rnn_gpu = nn.rnn.RNN(input_size, hidden_size, num_layers, bias=bias, nonlinearity=nonlinearity)
outputs_gpu = forward_backward(True, rnn_gpu, input_val, hx_val, rnn.all_weights)
compare_cpu_gpu(outputs_cpu, outputs_gpu)
@unittest.skipIf(not TEST_CUDNN, "needs cudnn")
def test_RNN_dropout(self):
# checking the assumption that cuDNN sticks dropout in between
# RNN layers
for p in (0, 0.276, 0.731, 1):
for train in (True, False):
for cuda in (True, False):
rnn = nn.RNN(10, 1000, 2, bias=False, dropout=p, nonlinearity='relu')
if cuda:
rnn.cuda()
if train:
rnn.train()
else:
rnn.eval()
rnn.weight_ih_l0.data.fill_(1)
rnn.weight_hh_l0.data.fill_(1)
rnn.weight_ih_l1.data.fill_(1)
rnn.weight_hh_l1.data.fill_(1)
input = Variable(torch.Tensor(1,1,10).fill_(1))
hx = Variable(torch.Tensor(2,1,1000).fill_(0))
if cuda:
input = input.cuda()
hx = hx.cuda()
output, hy = rnn(input, hx)
self.assertEqual(output.data.min(), output.data.max())
output_val = output.data[0][0][0]
if p == 0 or not train:
self.assertEqual(output_val, 10000)
elif p == 1:
self.assertEqual(output_val, 0)
else:
self.assertGreater(output_val, 8000)
self.assertLess(output_val, 12000)
denorm_mod = (output_val * (1 - p)) % 10
self.assertLess(min(denorm_mod, 10 - denorm_mod), 1e-2)
self.assertEqual(hy[0].data.min(), hy[0].data.max())
self.assertEqual(hy[1].data.min(), hy[1].data.max())
self.assertEqual(hy.data[0][0][0], 10)
self.assertEqual(hy.data[1][0][0], output_val)
@unittest.skipIf(not TEST_CUDNN, "needs cudnn")
def test_RNN_dropout_state(self):
import sys
if sys.version_info[0] == 2:
import cPickle as pickle
else:
import pickle
for p in (0, 0.1234):
for train in (True, False):
for cuda in (True, False):
rnn = nn.RNN(100, 100, 2, bias=False, dropout=p, nonlinearity='relu')
if cuda:
rnn.cuda()
if train:
rnn.train()
else:
rnn.eval()
input = Variable(torch.Tensor(1,1,100).uniform_())
hx = Variable(torch.Tensor(2,1,100).uniform_())
if cuda:
input = input.cuda()
hx = hx.cuda()
output1, hy1 = rnn(input, hx)
output2, hy2 = rnn(input, hx)
rnn_pickle = pickle.dumps(rnn)
rnn2 = pickle.loads(rnn_pickle)
output3, hy3 = rnn2(input, hx)
if p == 0 or not train:
self.assertEqual(output1, output2)
self.assertEqual(output1, output3)
self.assertEqual(hy1, hy2)
self.assertEqual(hy1, hy3)
else:
self.assertNotEqual(output1, output2)
self.assertNotEqual(output1, output3)
self.assertNotEqual(hy1, hy2)
self.assertNotEqual(hy1, hy3)
def _verify_pixel_shuffle(self, input, output, upscale_factor):
for c in range(output.size(1)):
for h in range(output.size(2)):
for w in range(output.size(3)):
height_idx = h // upscale_factor
weight_idx = w // upscale_factor
channel_idx = (upscale_factor * (h % upscale_factor)) + (w % upscale_factor) + \
(c * upscale_factor ** 2)
self.assertEqual(output[:, c, h, w], input[:, channel_idx, height_idx, weight_idx])
def test_pixel_shuffle(self):
batch_size = random.randint(1, 3)
upscale_factor = random.randint(2, 5)
channels = random.randint(1, 4) * upscale_factor ** 2
height = random.randint(5, 10)
width = random.randint(5, 10)
input = Variable(torch.Tensor(batch_size, channels, height, width).uniform_(), requires_grad=True)
ps = nn.PixelShuffle(upscale_factor)
output = ps(input)
self._verify_pixel_shuffle(input.data, output.data, upscale_factor)
output.backward(output.data)
self.assertEqual(input.data, input.grad)
def add_test(test):
test_name = test.get_name()
cuda_test_name = test_name + '_cuda'
if hasattr(TestNN, test_name):
raise RuntimeError('Found two tests with the same name: ' + test_name)
if hasattr(TestNN, cuda_test_name):
raise RuntimeError('Found two tests with the same name: ' + cuda_test_name)
setattr(TestNN, test_name, lambda self,test=test: test(self))
setattr(TestNN, cuda_test_name, lambda self,test=test: test.test_cuda(self))
new_module_tests = [
dict(
module_name='BatchNorm1d',
constructor_args=(10,),
input_size=(4, 10),
cudnn=True,
desc='affine'
),
dict(
module_name='BatchNorm1d',
constructor_args=(5,),
input_size=(4, 5, 3),
cudnn=True,
desc='3d_input'
),
dict(
module_name='BatchNorm1d',
constructor_args=(10, 1e-3, 0.3, False),
input_size=(4, 10),
cudnn=True,
desc='not_affine'
),
dict(
module_name='BatchNorm2d',
constructor_args=(3,),
input_size=(2, 3, 6, 6),
cudnn=True,
),
dict(
module_name='BatchNorm2d',
constructor_args=(3, 1e-3, 0.8),
input_size=(2, 3, 6, 6),
cudnn=True,
desc='momentum',
),
dict(
module_name='BatchNorm2d',
constructor_args=(3, 1e-3, 0.8, False),
input_size=(2, 3, 6, 6),
cudnn=True,
desc='no_affine',
),
dict(
module_name='BatchNorm3d',
constructor_args=(3,),
input_size=(2, 3, 4, 4, 4),
cudnn=True,
),
dict(
module_name='BatchNorm3d',
constructor_args=(3, 1e-3, 0.7),
input_size=(2, 3, 4, 4, 4),
cudnn=True,
desc='momentum'
),
dict(
module_name='BatchNorm3d',
constructor_args=(3, 1e-3, 0.7, False),
input_size=(2, 3, 4, 4, 4),
cudnn=True,
desc='no_affine'
),
dict(
module_name='Conv1d',
constructor_args=(4, 5, 3),
input_size=(2, 4, 10),
cudnn=True,
),
dict(
module_name='Conv1d',
constructor_args=(4, 5, 3),
input_size=(2, 4, 10),
cudnn=True,
desc='stride'
),
dict(
fullname='Conv1d_dilated',
constructor=lambda: nn.Conv1d(4, 5, kernel_size=3, dilation=2),
input_size=(2, 4, 10),
),
dict(
fullname='Conv1d_groups',
constructor=lambda: nn.Conv1d(4, 6, kernel_size=3, groups=2),
input_size=(2, 4, 6),
cudnn=True,
),
dict(
module_name='MaxPool1d',
constructor_args=(4,),
input_size=(2, 10, 4)
),
dict(
module_name='MaxPool1d',
constructor_args=(4, 4),
input_size=(2, 10, 4),
desc='stride'
),
dict(
module_name='Conv2d',
constructor_args=(3, 4, (3, 2)),
input_size=(2, 3, 7, 5),
cudnn=True,
),
dict(
module_name='Conv2d',
constructor_args=(3, 4, (3, 3), (2, 2)),
input_size=(2, 3, 6, 6),
cudnn=True,
desc='strided'
),
dict(
module_name='Conv2d',
constructor_args=(3, 4, (3, 3), (2, 2), (1, 1)),
input_size=(2, 3, 6, 6),
cudnn=True,
desc='padding'
),
dict(
module_name='Conv2d',
constructor_args=(3, 2, (3, 3), (2, 2), (1, 1), (2, 2)),
input_size=(2, 3, 8, 8),
cudnn=True,
desc='dilated'
),
dict(
module_name='Conv2d',
constructor_args=(3, 4, (3, 2), 1, 0, 1, 1, False),
input_size=(2, 3, 6, 5),
cudnn=True,
desc='no_bias',
),
dict(
fullname='Conv2d_groups',
constructor=lambda: nn.Conv2d(4, 6, (3, 2), groups=2),
input_size=(2, 4, 6, 5),
cudnn=True,
),
dict(
module_name='ConvTranspose2d',
constructor_args=(3, 4, 3, (3, 2), 1, (1, 1)),
cudnn=True,
input_size=(1, 3, 7, 6)
),
dict(
module_name='ConvTranspose2d',
constructor_args=(3, 4, 3, (2, 3), 1, (1, 1), 1, False),
input_size=(1, 3, 6, 7),
cudnn=True,
desc='no_bias'
),
dict(
fullname='ConvTranspose2d_groups',
constructor=lambda: nn.ConvTranspose2d(2, 4, (2, 3), groups=2),
input_size=(1, 2, 4, 5),
cudnn=True,
),
dict(
module_name='MaxPool2d',
constructor_args=((3, 3), (2, 2), (1, 1)),
input_size=(1, 3, 7, 7)
),
dict(
module_name='AvgPool2d',
constructor_args=((2, 2),),
input_size=(2, 3, 6, 6),
),
dict(
module_name='AvgPool2d',
constructor_args=((2, 2), (2, 2)),
input_size=(2, 3, 6, 6),
desc='stride',
),
dict(
module_name='AvgPool2d',
constructor_args=((2, 2), (2, 2), (1, 1)),
input_size=(2, 3, 6, 6),
desc='stride_pad',
),
dict(
module_name='LPPool2d',
constructor_args=(2, (2, 2), 2),
input_size=(1, 3, 7, 7)
),
dict(
module_name='LPPool2d',
constructor_args=(1.5, 2),
input=torch.rand(1, 3, 7, 7),
desc='norm'
),
dict(
module_name='ReflectionPad2d',
constructor_args=((1, 2, 3, 4),),
input_size=(2, 3, 8, 8)
),
dict(
module_name='ReplicationPad2d',
constructor_args=((1, 2, 3, 4),),
input_size=(2, 3, 4, 4)
),
dict(
module_name='Conv3d',
constructor_args=(3, 4, (2, 3, 4)),
input_size=(2, 3, 3, 4, 5),
cudnn=True,
),
dict(
module_name='Conv3d',
constructor_args=(3, 4, 2, 2),
input_size=(2, 3, 5, 5, 5),
cudnn=True,
desc='stride'
),
dict(
module_name='Conv3d',
constructor_args=(3, 4, 2, 2, 1),
input_size=(2, 3, 5, 5, 5),
cudnn=True,
desc='stride_padding'
),
dict(
fullname='Conv3d_groups',
constructor=lambda: nn.Conv3d(4, 6, kernel_size=3, groups=2),
input_size=(2, 4, 4, 5, 4),
cudnn=True,
),
dict(
fullname='Conv3d_dilated',
constructor=lambda: nn.Conv3d(3, 4, kernel_size=2, dilation=2),
input_size=(2, 3, 5, 5, 5),
),
dict(
module_name='ConvTranspose3d',
constructor_args=(2, 3, (2, 3, 2)),
cudnn=True,
input_size=(1, 2, 4, 5, 4)
),
dict(
module_name='MaxPool3d',
constructor_args=((2, 2, 2),),
input_size=(2, 3, 5, 5, 5)
),
dict(
module_name='MaxPool3d',
constructor_args=(2, (2, 2, 2)),
input_size=(2, 3, 5, 5, 5),
desc='stride'
),
dict(
module_name='MaxPool3d',
constructor_args=(2, 2, (1, 1, 1)),
input_size=(2, 3, 5, 5, 5),
desc='stride_padding'
),
dict(
module_name='AvgPool3d',
constructor_args=((2, 2, 2),),
input_size=(2, 3, 4, 4, 4)
),
dict(
module_name='AvgPool3d',
constructor_args=(2, (2, 2, 2)),
input_size=(2, 3, 5, 5, 5),
desc='stride'
),
dict(
module_name='ReplicationPad3d',
constructor_args=((1, 2, 3, 4, 5, 6),),
input_size=(2, 3, 5, 5, 5)
),
dict(
module_name='Embedding',
constructor_args=(4, 3),
input=Variable(
torch.randperm(2).repeat(1, 2),
requires_grad=False
),
jacobian_input=False
),
dict(
constructor=lambda: nn.FractionalMaxPool2d(2, output_ratio=0.5, _random_samples=torch.DoubleTensor(1, 3, 2).uniform_()),
input_size=(1, 3, 5, 5),
fullname='FractionalMaxPool2d_ratio',
test_cuda=False
),
dict(
constructor=lambda: nn.FractionalMaxPool2d((2, 2), output_size=(4, 4), _random_samples=torch.DoubleTensor(1, 3, 2).uniform_()),
input_size=(1, 3, 7, 7),
fullname='FractionalMaxPool2d_size',
test_cuda=False
),
dict(
module_name='PixelShuffle',
constructor_args=(3,),
input_size=(1, 9, 4, 4),
),
]
for test_params in module_tests + new_module_tests:
# TODO: CUDA is not implemented yet
if 'constructor' not in test_params:
name = test_params.pop('module_name')
test_params['constructor'] = getattr(nn, name)
test = NewModuleTest(**test_params)
add_test(test)
for test_params in criterion_tests:
name = test_params.pop('module_name')
test_params['constructor'] = getattr(nn, name)
test = NewCriterionTest(**test_params)
add_test(test)
class UnpoolingNet2d(nn.Container):
def __init__(self):
super(UnpoolingNet2d, self).__init__(
pool=nn.MaxPool2d(2, return_indices=True),
unpool=nn.MaxUnpool2d(2)
)
def forward(self, input):
return self.unpool(*self.pool(input))
class UnpoolingNet3d(nn.Container):
def __init__(self):
super(UnpoolingNet3d, self).__init__(
pool=nn.MaxPool3d(2, return_indices=True),
unpool=nn.MaxUnpool3d(2)
)
def forward(self, input):
return self.unpool(*self.pool(input))
add_test(NewModuleTest(
constructor=UnpoolingNet2d,
input_size=(1, 1, 8, 8),
fullname='MaxUnpool2d_net'))
add_test(NewModuleTest(
constructor=UnpoolingNet3d,
input_size=(1, 1, 8, 8, 8),
fullname='MaxUnpool3d_net'))
if __name__ == '__main__':
unittest.main()