blob: d9f0b65d43a681c6be574650a44963b13d088c1f [file] [log] [blame]
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from hypothesis import given
from caffe2.python import core, workspace
from caffe2.proto import caffe2_pb2
import caffe2.python.hypothesis_test_util as hu
import hypothesis.strategies as st
import numpy as np
def feed_inputs(inputs):
for name, value in inputs.items():
workspace.FeedBlob(name, value)
def assert_proto_equals(proto, expected):
proto_lines = proto.strip().split('\n')
expected_lines = expected.strip().split('\n')
assert len(proto_lines) == len(expected_lines), \
'{} != {}'.format(proto, expected)
for left, right in zip(proto_lines, expected_lines):
assert left.strip() == right.strip(), \
'{} != {}'.format(proto, expected)
class TestCaffe2Script(hu.HypothesisTestCase):
test_program = """
def foo(a,b,X,W) -> (c):
t = a + b*b
c = FC(X,W,t)
def testIf(c0,c1,t,f) -> (r):
if c0 < c1:
r = t
else:
r = f
r = Add(r,3f,broadcast=1)
def testWhile(r) -> (r):
m = 0
while m < 4:
# Plus operator automatically broadcasts, and we cannot
# do in-place B and C arguments when we broadcast, so use
# an explicit Add op.
r = Add(r, r)
m = m + 1
"""
@given(firstdim=st.integers(min_value=1, max_value=4096),
seconddim=st.integers(min_value=1, max_value=4096),
seed=st.integers(min_value=0, max_value=65536),
**hu.gcs)
def test_foo(self, firstdim, seconddim, seed, gc, dc):
np.random.seed(int(seed))
inputs = {}
a = inputs['a'] = np.random.rand(seconddim).astype(np.float32)
b = inputs['b'] = np.random.rand(seconddim).astype(np.float32)
X = inputs['X'] = np.random.rand(firstdim, firstdim).astype(np.float32)
W = inputs['W'] = np.random.rand(seconddim, firstdim).astype(np.float32)
feed_inputs(inputs)
CU = core.C.CompilationUnit()
CU.define(self.test_program)
CU.create_net('foo').run()
ref_t = a + b * b
ref_c = np.matmul(X, W.transpose()) + ref_t
actual_c = workspace.FetchBlob('c')
np.testing.assert_allclose(actual_c, ref_c, rtol=1e-05)
def test_trinary(self):
CU = core.C.CompilationUnit()
CU.define("""
def foo(c) -> (d):
d = 1 + (2 if c else 4)
""")
workspace.FeedBlob('c', np.ones((1), dtype=bool))
net = CU.create_net('foo')
net.run()
assert(3 == workspace.FetchBlob('d'))
workspace.FeedBlob('c', np.zeros((1), dtype=bool))
net.run()
assert(5 == workspace.FetchBlob('d'))
def test_bool_literal(self):
CU = core.C.CompilationUnit()
CU.define("""
def foo() -> (a,b):
a = True
b = False
""")
net = CU.create_net('foo')
net.run()
assert(workspace.FetchBlob('a'))
assert(not workspace.FetchBlob('b'))
def test_bool_operators(self):
CU = core.C.CompilationUnit()
CU.define("""
def foo() -> (a, b, c, d, e):
a = True and False
b = True or False
c = not b
d = not False or True
e = not (1 if a else 0) == (1 if b else 0)
""")
net = CU.create_net('foo')
net.run()
assert(not workspace.FetchBlob('a'))
assert(workspace.FetchBlob('b'))
assert(not workspace.FetchBlob('c'))
assert(workspace.FetchBlob('d'))
assert(workspace.FetchBlob('e'))
def expect_fail(self, fn, msg):
try:
fn()
except RuntimeError as r:
if msg not in str(r):
raise RuntimeError(
"Failed wrong: expected string '{}' ".format(msg) +
"in error message but found\n{}".format(str(r)))
def test_fails(self):
def fail_inputs():
CU = core.C.CompilationUnit()
CU.define("""
def foo() -> ():
Print(1,4)
""")
self.expect_fail(fail_inputs, "expects 1 inputs but found 2")
def fail_undef():
CU = core.C.CompilationUnit()
CU.define("""
def foo(a) -> (b):
a = what()
""")
self.expect_fail(fail_undef, "attempting to call unknown operation")
def fail_schema():
CU = core.C.CompilationUnit()
CU.define("""
def foo(a) -> (b):
a = FC(a,a,a)
""")
self.expect_fail(fail_schema, "failed schema checking")
def test_print(self):
CU = core.C.CompilationUnit()
CU.define("""
def foo() -> ():
a = 1
Print(a)
Print(a+1)
_ = 4
Print(_) # verify in print this isn't _ but some temorary
Print(1)
Print(1.f)
Print(3.0)
""")
net = CU.create_net('foo')
net.run()
def test_method(self):
CU = core.C.CompilationUnit()
CU.define("""
def foo() -> (a):
a = (3+1).Add(4).Add(1)
""")
net = CU.create_net('foo')
net.run()
assert(9 == workspace.FetchBlob('a'))
def test_plus_eq(self):
CU = core.C.CompilationUnit()
CU.define("""
def foo() -> (a):
a = 4
a += 1
""")
net = CU.create_net('foo')
net.run()
assert(5 == workspace.FetchBlob('a'))
def test_cast(self):
CU = core.C.CompilationUnit()
CU.define("""
def foo() -> (a):
a = int(4.5f)
""")
net = CU.create_net('foo')
net.run()
assert(4 == workspace.FetchBlob('a'))
def test_global(self):
CU = core.C.CompilationUnit()
CU.define("""
def foo() -> (a):
global m
m.a = 4
m.b = 5
a = m.a + m.b
""")
net = CU.create_net('foo')
net.run()
assert(9 == workspace.FetchBlob('a'))
def test_module_as_arg_ret(self):
CU = core.C.CompilationUnit()
CU.define("""
def bar(a,c) -> (b):
b = Module()
temp = a.second
b.first = temp
b.second = a.first + c
def foo() -> (a,b):
x = Module()
x.first = 1
x.second = 2
x.y = bar(x,4)
a = x.y.first
b = x.y.second
""")
net = CU.create_net('foo')
net.run()
assert(2 == workspace.FetchBlob('a'))
assert(5 == workspace.FetchBlob('b'))
def test_call_extern(self):
CU = core.C.CompilationUnit()
net = caffe2_pb2.NetDef()
net.op.extend([
core.CreateOperator(
'Mul',
['i', 'i'],
['o'],
)
])
net.external_input.append('i')
net.external_output.append('o')
CU.extern("myActualExtern", net)
CU.define("""
def myExtern(x) -> (y):
t = x
if t > 1:
y = t * t
else:
y = 5
def foo() -> (b):
a = 4
a += 1
b = 2 + myExtern(a) + myExtern(a, rename=False) + myActualExtern(a)
""")
net = CU.create_net('foo')
net.run()
assert(77 == workspace.FetchBlob('b'))
@given(seed=st.integers(min_value=0, max_value=65536), **hu.gcs)
def test_if(self, seed, gc, dc):
np.random.seed(int(seed))
inputs = {}
c0 = inputs['c0'] = np.random.rand(1).astype(np.float32)
c1 = inputs['c1'] = np.random.rand(1).astype(np.float32)
t = inputs['t'] = np.random.rand(3, 3).astype(np.float32)
f = inputs['f'] = np.random.rand(3, 3).astype(np.float32)
feed_inputs(inputs)
CU = core.C.CompilationUnit()
CU.define(self.test_program)
CU.create_net('testIf').run()
if c0 < c1:
ref_r = t + 3
else:
ref_r = f + 3
actual_r = workspace.FetchBlob('r')
np.testing.assert_allclose(actual_r, ref_r)
@given(seed=st.integers(min_value=0, max_value=65536), **hu.gcs)
def test_while(self, seed, gc, dc):
np.random.seed(int(seed))
inputs = {}
r = inputs['r'] = np.ones([3, 3]).astype(np.float32)
feed_inputs(inputs)
CU = core.C.CompilationUnit()
CU.define(self.test_program)
CU.create_net('testWhile').run()
m = 0
while m < 4:
r = r + r
m = m + 1
actual_r = workspace.FetchBlob('r')
np.testing.assert_allclose(actual_r, r)
@given(seed=st.integers(min_value=0, max_value=65536), **hu.gcs)
def test_gather(self, seed, gc, dc):
CU = core.C.CompilationUnit()
CU.define("""
def easy(tensor, indices) -> (output):
output = tensor[indices]
def hard(tensor, i, j, k) -> (output):
output = tensor[i][j][k]
""")
# First check that the generated proto is as expected. This tests that
# we desugar the gather syntax correctly and emit the right code.
proto = CU.get_proto('easy')
assert_proto_equals(proto, """
name: "easy"
op {
input: "tensor"
input: "indices"
output: "output"
type: "Gather"
}""")
proto = CU.get_proto('hard')
assert_proto_equals(proto, """
name: "hard"
op {
input: "tensor"
input: "i"
output: "$t1"
type: "Gather"
}
op {
input: "$t1"
input: "j"
output: "$t0"
type: "Gather"
}
op {
input: "$t0"
input: "k"
output: "output"
type: "Gather"
}""")
# Now just test that the effect of the generated code is as expected.
np.random.seed(int(seed))
tensor = np.random.rand(5, 4, 3).astype(np.float32)
indices = np.random.randint(len(tensor), size=(5, 5))
feed_inputs(dict(tensor=tensor, indices=indices))
net = CU.create_net('easy')
net.run()
output = workspace.FetchBlob('output')
expected_output = [tensor[sample] for sample in indices]
np.testing.assert_allclose(output, expected_output)
@given(seed=st.integers(min_value=0, max_value=65536), **hu.gcs)
def test_slice(self, seed, gc, dc):
CU = core.C.CompilationUnit()
CU.define("""
def slice_from_tensor(tensor, start, end) -> (output):
output = tensor[start:end]
def slice_from_vector(vector, start, end) -> (a, b, c, d):
a = vector[start:end]
b = vector[start:]
c = vector[:end]
d = vector[:]
""")
# slice_from_tensor
proto = CU.get_proto('slice_from_tensor')
assert_proto_equals(proto, """
name: "slice_from_tensor"
op {
input: "tensor"
input: "start"
input: "end"
output: "output"
type: "Slice"
}""")
np.random.seed(int(seed))
tensor = np.random.rand(5, 4, 3).astype(np.float32)
start = np.array([0, 1, 0], dtype=np.int32)
end = np.array([-1, 2, -1], dtype=np.int32)
feed_inputs(dict(tensor=tensor, start=start, end=end))
net = CU.create_net('slice_from_tensor')
net.run()
output = workspace.FetchBlob('output')
np.testing.assert_allclose(output, tensor[:, 1:2])
# slice_from_vector
proto = CU.get_proto('slice_from_vector')
assert_proto_equals(proto, """
name: "slice_from_vector"
op {
input: "vector"
input: "start"
input: "end"
output: "a"
type: "Slice"
}
op {
output: "$t0"
type: "ConstantFill"
arg {
name: "dtype"
i: 2
}
arg {
name: "value"
i: -1
}
arg {
name: "shape"
ints: 1
}
}
op {
input: "vector"
input: "start"
input: "$t0"
output: "b"
type: "Slice"
}
op {
output: "$t1"
type: "ConstantFill"
arg {
name: "dtype"
i: 2
}
arg {
name: "value"
i: 0
}
arg {
name: "shape"
ints: 1
}
}
op {
input: "vector"
input: "$t1"
input: "end"
output: "c"
type: "Slice"
}
op {
output: "$t2"
type: "ConstantFill"
arg {
name: "dtype"
i: 2
}
arg {
name: "value"
i: 0
}
arg {
name: "shape"
ints: 1
}
}
op {
output: "$t3"
type: "ConstantFill"
arg {
name: "dtype"
i: 2
}
arg {
name: "value"
i: -1
}
arg {
name: "shape"
ints: 1
}
}
op {
input: "vector"
input: "$t2"
input: "$t3"
output: "d"
type: "Slice"
}""")
vector = np.random.rand(10).astype(np.float32)
start = np.array([2], dtype=np.int32)
end = np.array([6], dtype=np.int32)
feed_inputs(dict(vector=vector, start=start, end=end))
net = CU.create_net('slice_from_vector')
net.run()
output = workspace.FetchBlob('a')
np.testing.assert_allclose(output, vector[2:6])
output = workspace.FetchBlob('b')
np.testing.assert_allclose(output, vector[2:])
output = workspace.FetchBlob('c')
np.testing.assert_allclose(output, vector[:6])
output = workspace.FetchBlob('d')
np.testing.assert_allclose(output, vector)