| from __future__ import absolute_import |
| from __future__ import division |
| from __future__ import print_function |
| from __future__ import unicode_literals |
| from hypothesis import given |
| import numpy as np |
| import unittest |
| |
| from caffe2.proto import caffe2_pb2, hsm_pb2 |
| from caffe2.python import workspace, core, gradient_checker |
| import caffe2.python.hypothesis_test_util as hu |
| import caffe2.python.hsm_util as hsmu |
| |
| # User inputs tree using protobuf file or, in this case, python utils |
| # The hierarchy in this test looks as shown below. Note that the final subtrees |
| # (with word_ids as leaves) have been collapsed for visualization |
| # * |
| # / \ |
| # * 5,6,7,8 |
| # / \ |
| # 0,1,2 3,4 |
| tree = hsm_pb2.TreeProto() |
| words = [[0, 1, 2], [3, 4], [5, 6, 7, 8]] |
| node1 = hsmu.create_node_with_words(words[0], "node1") |
| node2 = hsmu.create_node_with_words(words[1], "node2") |
| node3 = hsmu.create_node_with_words(words[2], "node3") |
| node4 = hsmu.create_node_with_nodes([node1, node2], "node4") |
| node = hsmu.create_node_with_nodes([node4, node3], "node5") |
| tree.root_node.MergeFrom(node) |
| |
| # structure: |
| # node5: [0, 2, ["node4", "node3"]] # offset, length, "node4, node3" |
| # node4: [2, 2, ["node1", "node2"]] |
| # node1: [4, 3, [0, 1 ,2]] |
| # node2: [7, 2, [3, 4] |
| # node3: [9, 4, [5, 6, 7, 8] |
| struct = [[0, 2, ["node4", "node3"], "node5"], |
| [2, 2, ["node1", "node2"], "node4"], |
| [4, 3, [0, 1, 2], "node1"], |
| [7, 2, [3, 4], "node2"], |
| [9, 4, [5, 6, 7, 8], "node3"]] |
| |
| # Internal util to translate input tree to list of (word_id,path). serialized |
| # hierarchy is passed into the operator_def as a string argument, |
| hierarchy_proto = hsmu.create_hierarchy(tree) |
| arg = caffe2_pb2.Argument() |
| arg.name = "hierarchy" |
| arg.s = hierarchy_proto.SerializeToString() |
| |
| beam = 5 |
| args_search = [] |
| arg_search = caffe2_pb2.Argument() |
| arg_search.name = "tree" |
| arg_search.s = tree.SerializeToString() |
| args_search.append(arg_search) |
| arg_search = caffe2_pb2.Argument() |
| arg_search.name = "beam" |
| arg_search.f = beam |
| args_search.append(arg_search) |
| |
| |
| class TestHsm(hu.HypothesisTestCase): |
| def test_hsm_search(self): |
| samples = 10 |
| dim_in = 5 |
| X = np.random.rand(samples, dim_in).astype(np.float32) - 0.5 |
| w = np.random.rand(hierarchy_proto.size, dim_in) \ |
| .astype(np.float32) - 0.5 |
| b = np.random.rand(hierarchy_proto.size).astype(np.float32) - 0.5 |
| labels = np.array([np.random.randint(0, 8) for i in range(samples)]) \ |
| .astype(np.int32) |
| |
| workspace.GlobalInit(['caffe2']) |
| workspace.FeedBlob("data", X) |
| workspace.FeedBlob("weights", w) |
| workspace.FeedBlob("bias", b) |
| workspace.FeedBlob("labels", labels) |
| op = core.CreateOperator( |
| 'HSoftmaxSearch', |
| ['data', 'weights', 'bias'], |
| ['names', 'scores'], |
| 'HSoftmaxSearch', |
| arg=args_search) |
| workspace.RunOperatorOnce(op) |
| names = workspace.FetchBlob('names') |
| scores = workspace.FetchBlob('scores') |
| |
| def simulation_hsm_search(): |
| names = [] |
| scores = [] |
| for line in struct: |
| s, e = line[0], line[0] + line[1] |
| score = np.dot(X, w[s:e].transpose()) + b[s:e] |
| score = np.exp(score - np.max(score, axis=1, keepdims=True)) |
| score /= score.sum(axis=1, keepdims=True) |
| score = -np.log(score) |
| |
| score = score.transpose() |
| idx = -1 |
| for j, n in enumerate(names): |
| if n == line[3]: |
| idx = j |
| score += scores[j] |
| if idx == -1: |
| score[score > beam] = np.inf |
| else: |
| score[score - scores[idx] > beam] = np.inf |
| |
| for i, name in enumerate(line[2]): |
| scores.append(score[i]) |
| names.append(name) |
| scores = np.vstack(scores) |
| return names, scores.transpose() |
| |
| p_names, p_scores = simulation_hsm_search() |
| idx = np.argsort(p_scores, axis=1) |
| p_scores = np.sort(p_scores, axis=1) |
| p_names = np.array(p_names)[idx] |
| for i in range(names.shape[0]): |
| for j in range(names.shape[1]): |
| if names[i][j]: |
| assert(names[i][j] == p_names[i][j]) |
| self.assertAlmostEqual( |
| scores[i][j], p_scores[i][j], delta=0.001) |
| |
| def test_hsm_run_once(self): |
| workspace.GlobalInit(['caffe2']) |
| workspace.FeedBlob("data", |
| np.random.randn(1000, 100).astype(np.float32)) |
| workspace.FeedBlob("weights", |
| np.random.randn(1000, 100).astype(np.float32)) |
| workspace.FeedBlob("bias", np.random.randn(1000).astype(np.float32)) |
| workspace.FeedBlob("labels", np.random.rand(1000).astype(np.int32) * 9) |
| op = core.CreateOperator( |
| 'HSoftmax', |
| ['data', 'weights', 'bias', 'labels'], |
| ['output', 'intermediate_output'], |
| 'HSoftmax', |
| arg=[arg]) |
| self.assertTrue(workspace.RunOperatorOnce(op)) |
| |
| # Test to check value of sum of squared losses in forward pass for given |
| # input |
| def test_hsm_forward(self): |
| cpu_device_option = caffe2_pb2.DeviceOption() |
| grad_checker = gradient_checker.GradientChecker( |
| 0.01, 0.05, cpu_device_option, "default") |
| samples = 9 |
| dim_in = 5 |
| X = np.zeros((samples, dim_in)).astype(np.float32) + 1 |
| w = np.zeros((hierarchy_proto.size, dim_in)).astype(np.float32) + 1 |
| b = np.array([i for i in range(hierarchy_proto.size)])\ |
| .astype(np.float32) |
| labels = np.array([i for i in range(samples)]).astype(np.int32) |
| |
| workspace.GlobalInit(['caffe2']) |
| workspace.FeedBlob("data", X) |
| workspace.FeedBlob("weights", w) |
| workspace.FeedBlob("bias", b) |
| workspace.FeedBlob("labels", labels) |
| |
| op = core.CreateOperator( |
| 'HSoftmax', |
| ['data', 'weights', 'bias', 'labels'], |
| ['output', 'intermediate_output'], |
| 'HSoftmax', |
| arg=[arg]) |
| grad_ops, g_input = core.GradientRegistry.GetGradientForOp( |
| op, [s + '_grad' for s in op.output]) |
| |
| loss, _ = grad_checker.GetLossAndGrad( |
| op, grad_ops, X, op.input[0], g_input[0], [0] |
| ) |
| self.assertAlmostEqual(loss, 44.269, delta=0.001) |
| |
| # Test to compare gradient calculated using the gradient operator and the |
| # symmetric derivative calculated using Euler Method |
| # TODO: convert to both cpu and gpu test when ready. |
| @given(**hu.gcs_cpu_only) |
| def test_hsm_gradient(self, gc, dc): |
| samples = 10 |
| dim_in = 5 |
| X = np.random.rand(samples, dim_in).astype(np.float32) - 0.5 |
| w = np.random.rand(hierarchy_proto.size, dim_in) \ |
| .astype(np.float32) - 0.5 |
| b = np.random.rand(hierarchy_proto.size).astype(np.float32) - 0.5 |
| labels = np.array([np.random.randint(0, 8) for i in range(samples)]) \ |
| .astype(np.int32) |
| |
| workspace.GlobalInit(['caffe2']) |
| workspace.FeedBlob("data", X) |
| workspace.FeedBlob("weights", w) |
| workspace.FeedBlob("bias", b) |
| workspace.FeedBlob("labels", labels) |
| |
| op = core.CreateOperator( |
| 'HSoftmax', |
| ['data', 'weights', 'bias', 'labels'], |
| ['output', 'intermediate_output'], |
| 'HSoftmax', |
| arg=[arg]) |
| |
| self.assertDeviceChecks(dc, op, [X, w, b, labels], [0]) |
| |
| for i in range(3): |
| self.assertGradientChecks(gc, op, [X, w, b, labels], i, [0]) |
| |
| |
| if __name__ == '__main__': |
| unittest.main() |