import unittest
import contextlib
from itertools import product
from copy import deepcopy
import torch
import torch.cuda
from torch.autograd import Variable, Function
# Numerical gradient checks need double precision and a fixed seed.
torch.set_default_tensor_type('torch.DoubleTensor')
torch.manual_seed(123)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(123)
TEST_NUMPY = True
try:
    import numpy
except ImportError:
    TEST_NUMPY = False
def get_cpu_type(t):
    # Maps a CUDA tensor *instance* to the matching CPU tensor class
    # (note the asymmetry with get_gpu_type, which takes a class).
    assert t.__module__ == 'torch.cuda'
    return getattr(torch, t.__class__.__name__)


def get_gpu_type(t):
    # Maps a CPU tensor *class* to the matching CUDA tensor class.
    assert t.__module__ == 'torch'
    return getattr(torch.cuda, t.__name__)
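# Example (illustrative, assumes a CUDA device): the two helpers translate
# between the torch and torch.cuda namespaces:
#
#     get_gpu_type(torch.FloatTensor)    # -> torch.cuda.FloatTensor
#     get_cpu_type(torch.cuda.FloatTensor())  # -> torch.FloatTensor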
def to_gpu(obj, type_map=None):
    # Recursively copies tensors, storages and Variables to the GPU.
    # type_map optionally overrides the default CPU -> CUDA type mapping.
    if type_map is None:
        type_map = {}
    if torch.is_tensor(obj):
        t = type_map.get(type(obj), get_gpu_type(type(obj)))
        return obj.clone().type(t)
    elif torch.is_storage(obj):
        return obj.new().resize_(obj.size()).copy_(obj)
    elif isinstance(obj, Variable):
        assert obj.creator is None
        t = type_map.get(type(obj.data), get_gpu_type(type(obj.data)))
        return Variable(obj.data.clone().type(t), requires_grad=obj.requires_grad)
    elif isinstance(obj, list):
        return [to_gpu(o, type_map) for o in obj]
    elif isinstance(obj, tuple):
        return tuple(to_gpu(o, type_map) for o in obj)
    else:
        return deepcopy(obj)
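# Example (illustrative, assumes a CUDA device): tensors are cloned and moved
# to the corresponding CUDA type; type_map lets a test run a double-precision
# fixture in single precision on the GPU:
#
#     gpu_t = to_gpu(torch.DoubleTensor(5).zero_(),
#                    type_map={torch.DoubleTensor: torch.cuda.FloatTensor})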
@contextlib.contextmanager
def freeze_rng_state():
    # Saves the CPU (and, if available, CUDA) RNG state on entry and restores
    # it on exit, so random operations inside the block don't affect the
    # surrounding code, even if the block raises.
    rng_state = torch.get_rng_state()
    if torch.cuda.is_available():
        cuda_rng_state = torch.cuda.get_rng_state()
    try:
        yield
    finally:
        if torch.cuda.is_available():
            torch.cuda.set_rng_state(cuda_rng_state)
        torch.set_rng_state(rng_state)
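# Example (illustrative): code inside the block can consume random numbers
# without advancing the RNG sequence observed by the surrounding test:
#
#     with freeze_rng_state():
#         noise = torch.rand(10)   # does not advance the outer RNG state
#     torch.rand(1)                # same value as if the block never ran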
def iter_indices(tensor):
    # Iterates over all valid indices of `tensor`: plain integers for 1-D
    # tensors, index tuples otherwise.
    if tensor.dim() == 0:
        return range(0)
    if tensor.dim() == 1:
        return range(tensor.size(0))
    return product(*(range(s) for s in tensor.size()))
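# Example (illustrative): for a 2x3 tensor, iter_indices yields
# (0, 0), (0, 1), (0, 2), (1, 0), (1, 1), (1, 2), so callers can compare
# tensors element by element with plain indexing.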
def is_iterable(obj):
    try:
        iter(obj)
        return True
    except TypeError:
        return False
class TestCase(unittest.TestCase):
    precision = 1e-5

    def assertEqual(self, x, y, prec=None, message=''):
        # Fuzzy equality: tensors and numbers are compared up to `prec`,
        # containers are compared recursively, everything else falls back
        # to unittest's exact assertEqual.
        if prec is None:
            prec = self.precision
        if isinstance(x, Variable) and isinstance(y, Variable):
            x = x.data
            y = y.data
        if torch.is_tensor(x) and torch.is_tensor(y):
            max_err = 0
            super(TestCase, self).assertEqual(x.size().tolist(), y.size().tolist())
            for index in iter_indices(x):
                max_err = max(max_err, abs(x[index] - y[index]))
            self.assertLessEqual(max_err, prec, message)
        elif isinstance(x, str) and isinstance(y, str):
            super(TestCase, self).assertEqual(x, y)
        elif is_iterable(x) and is_iterable(y):
            for x_, y_ in zip(x, y):
                self.assertEqual(x_, y_, prec, message)
        else:
            try:
                self.assertLessEqual(abs(x - y), prec, message)
                return
            except Exception:
                pass
            super(TestCase, self).assertEqual(x, y, message)
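# Example (illustrative): a test can rely on the fuzzy comparison for tensors
# and nested containers alike:
#
#     class TestExample(TestCase):
#         def test_close(self):
#             self.assertEqual(torch.ones(3), torch.ones(3) + 1e-7)
#             self.assertEqual([1.0, (2.0,)], [1.0 + 1e-7, (2.0,)])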
def make_jacobian(input, num_out):
    # Allocates zero-filled Jacobian buffers mirroring the structure of
    # `input`: one (nelement x num_out) matrix per tensor or Variable that
    # requires grad.
    if isinstance(input, Variable) and not input.requires_grad:
        return None
    if torch.is_tensor(input) or isinstance(input, Variable):
        return torch.zeros(input.nelement(), num_out)
    else:
        return type(input)(filter(lambda x: x is not None,
                                  (make_jacobian(elem, num_out) for elem in input)))
def iter_tensors(x, only_requiring_grad=False):
    # Flattens a (possibly nested) structure of tensors and Variables into
    # a stream of raw tensors.
    if torch.is_tensor(x):
        yield x
    elif isinstance(x, Variable):
        if x.requires_grad or not only_requiring_grad:
            yield x.data
    else:
        for elem in x:
            for result in iter_tensors(elem, only_requiring_grad):
                yield result
def contiguous(input):
    if torch.is_tensor(input) or isinstance(input, Variable):
        return input.contiguous()
    else:
        return type(input)(contiguous(e) for e in input)
def get_numerical_jacobian(fn, input, target):
    # Central finite differences: J[i, j] ~= (f(x + h)_j - f(x - h)_j) / (2h),
    # evaluated one input element i at a time.
    perturbation = 1e-6
    # To be able to use .view(-1) the input must be contiguous.
    input = contiguous(input)
    output_size = fn(input).numel()
    jacobian = make_jacobian(target, output_size)
    # It's much easier to iterate over flattened lists of tensors.
    # These are references to the same objects in jacobian, so any changes
    # will be reflected in it as well.
    x_tensors = [t for t in iter_tensors(target, True)]
    j_tensors = [t for t in iter_tensors(jacobian)]
    outa = torch.DoubleTensor(output_size)
    outb = torch.DoubleTensor(output_size)
    # TODO: compare structure
    for x_tensor, d_tensor in zip(x_tensors, j_tensors):
        flat_tensor = x_tensor.view(-1)
        for i in range(flat_tensor.nelement()):
            orig = flat_tensor[i]
            flat_tensor[i] = orig - perturbation
            outa.copy_(fn(input))
            flat_tensor[i] = orig + perturbation
            outb.copy_(fn(input))
            flat_tensor[i] = orig
            outb.add_(-1, outa).div_(2 * perturbation)
            d_tensor[i] = outb
    return jacobian
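# Example (illustrative sketch, not used by the tests directly): checking a
# simple function against its known Jacobian. For fn(x) = x * 2 the Jacobian
# is 2 * I, so central differences should recover ~2 on the diagonal and ~0
# elsewhere:
#
#     x = torch.randn(3)
#     jacobian = get_numerical_jacobian(lambda inp: inp * 2, x, x)
#     expected = torch.eye(3) * 2
#     assert (jacobian - expected).abs().max() < 1e-4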