from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

from collections import namedtuple

from caffe2.python import (
    core,
    layer_model_instantiator,
    layer_model_helper,
    schema,
    test_util,
)
import numpy as np


OpSpec = namedtuple("OpSpec", "type input output")
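# OpSpec describes one expected operator in a net: `type` is the operator
# type string, while `input` and `output` are lists of blob names to check.
# Either list (or any element in it) may be None to skip that check; for
# example, OpSpec("FC", None, None) matches any FC op regardless of blobs.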


class TestLayers(test_util.TestCase):

    def setUp(self):
        super(TestLayers, self).setUp()
        input_feature_schema = schema.Struct(
            ('float_features', schema.Scalar((np.float32, (32,)))),
        )
        trainer_extra_schema = schema.Struct()

        self.model = layer_model_helper.LayerModelHelper(
            'test_model',
            input_feature_schema=input_feature_schema,
            trainer_extra_schema=trainer_extra_schema)

    def new_record(self, schema_obj):
        return schema.NewRecord(self.model.net, schema_obj)

    def get_training_nets(self):
| """ |
| We don't use |
| layer_model_instantiator.generate_training_nets_forward_only() |
| here because it includes initialization of global constants, which make |
| testing tricky |
| """ |
| train_net = core.Net('train_net') |
| train_init_net = core.Net('trai_init_net') |
| for layer in self.model.layers: |
| layer.add_operators(train_net, train_init_net) |
| return train_init_net, train_net |

    def get_predict_net(self):
        return layer_model_instantiator.generate_predict_net(self.model)

    def assertBlobsEqual(self, spec_blobs, op_blobs):
| """ |
| spec_blobs can either be None or a list of blob names. If it's None, |
| then no assertion is performed. The elements of the list can be None, |
| in that case, it means that position will not be checked. |
| """ |
        if spec_blobs is None:
            return
        self.assertEqual(len(spec_blobs), len(op_blobs))
        for spec_blob, op_blob in zip(spec_blobs, op_blobs):
            if spec_blob is None:
                continue
            self.assertEqual(spec_blob, op_blob)

    def assertNetContainOps(self, net, op_specs):
| """ |
| Given a net and a list of OpSpec's, check that the net match the spec |
| """ |
        ops = net.Proto().op
        self.assertEqual(len(op_specs), len(ops))
        for op, op_spec in zip(ops, op_specs):
            self.assertEqual(op_spec.type, op.type)
            self.assertBlobsEqual(op_spec.input, op.input)
            self.assertBlobsEqual(op_spec.output, op.output)
        return ops

    def testFCWithoutBias(self):
        output_dims = 2
        fc_without_bias = self.model.FCWithoutBias(
            self.model.input_feature_schema.float_features, output_dims)

        self.assertEqual(
            schema.Scalar((np.float32, (output_dims,))),
            fc_without_bias
        )

        train_init_net, train_net = self.get_training_nets()

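        # FCWithoutBias should initialize only a weight blob, so the init
        # net carries a single UniformFill and the forward pass is a bare
        # MatMul with no bias Add.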
        init_ops = self.assertNetContainOps(
            train_init_net,
            [
                OpSpec("UniformFill", None, None),
            ]
        )

        mat_mul_spec = OpSpec(
            "MatMul",
            [
                self.model.input_feature_schema.float_features(),
                init_ops[0].output[0],
            ],
            fc_without_bias.field_blobs()
        )

        self.assertNetContainOps(train_net, [mat_mul_spec])

        predict_net = self.get_predict_net()
        self.assertNetContainOps(predict_net, [mat_mul_spec])

    def testBatchSigmoidCrossEntropyLoss(self):
        input_record = self.new_record(schema.Struct(
            ('label', schema.Scalar((np.float32, (32,)))),
            ('prediction', schema.Scalar((np.float32, (32,))))
        ))
        loss = self.model.BatchSigmoidCrossEntropyLoss(input_record)
        self.assertEqual(schema.Scalar((np.float32, tuple())), loss)

    def testBatchSoftmaxLoss(self):
        input_record = self.new_record(schema.Struct(
            ('label', schema.Scalar((np.float32, tuple()))),
            ('prediction', schema.Scalar((np.float32, (32,))))
        ))
        loss = self.model.BatchSoftmaxLoss(input_record)
        self.assertEqual(schema.Struct(
            ('softmax', schema.Scalar((np.float32, (32,)))),
            ('loss', schema.Scalar(np.float32)),
        ), loss)

    def testFunctionalLayer(self):
        def normalize(net, in_record, out_record):
            mean = net.ReduceFrontMean(in_record(), 1)
            net.Sub(
                [in_record(), mean],
                out_record[0](),
                broadcast=1)
        normalized = self.model.Functional(
            self.model.input_feature_schema.float_features, 1,
            normalize, name="normalizer")
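        # Functional wraps an arbitrary net-building function as a layer;
        # the second positional argument is the number of outputs (or a
        # list of output names, see testFunctionalLayerWithOutputNames).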

        # Attach metadata to one of the outputs and use it in FC
        normalized[0].set_type((np.float32, (32,)))
        self.model.FC(normalized[0], 2)

        predict_net = layer_model_instantiator.generate_predict_net(
            self.model)
        ops = predict_net.Proto().op
        assert len(ops) == 3
        assert ops[0].type == "ReduceFrontMean"
        assert ops[1].type == "Sub"
        assert ops[2].type == "FC"
        assert len(ops[0].input) == 1
        assert ops[0].input[0] == \
            self.model.input_feature_schema.float_features()
        assert len(ops[1].output) == 1
        assert ops[1].output[0] in ops[2].input

    def testFunctionalLayerHelper(self):
        mean = self.model.ReduceFrontMean(
            self.model.input_feature_schema.float_features, 1)
        normalized = self.model.Sub(
            schema.Tuple(
                self.model.input_feature_schema.float_features, mean[0]),
            1, broadcast=1)
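        # Operator calls on the model (ReduceFrontMean, Sub) that don't
        # name a registered layer appear to be routed through the
        # functional-layer helper; that routing is what this test covers.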
        # Attach metadata to one of the outputs and use it in FC
        normalized[0].set_type((np.float32, (32,)))
        self.model.FC(normalized[0], 2)

        predict_net = layer_model_instantiator.generate_predict_net(
            self.model)
        ops = predict_net.Proto().op
        assert len(ops) == 3
        assert ops[0].type == "ReduceFrontMean"
        assert ops[1].type == "Sub"
        assert ops[2].type == "FC"
        assert len(ops[0].input) == 1
        assert ops[0].input[0] == \
            self.model.input_feature_schema.float_features()
        assert len(ops[1].output) == 1
        assert ops[1].output[0] in ops[2].input

    def testFunctionalLayerHelperAutoInference(self):
        softsign = self.model.Softsign(
            schema.Tuple(self.model.input_feature_schema.float_features),
            1)
        assert len(softsign.field_types()) == 1
        assert softsign.field_types()[0].base == np.float32
        assert softsign.field_types()[0].shape == (32,)
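        # No set_type() call is needed here: the output type and shape are
        # inferred automatically, and Softsign preserves its input's
        # (np.float32, (32,)) signature.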
        self.model.FC(softsign[0], 2)

        predict_net = layer_model_instantiator.generate_predict_net(
            self.model)
        ops = predict_net.Proto().op
        assert len(ops) == 2
        assert ops[0].type == "Softsign"
        assert ops[1].type == "FC"
        assert len(ops[0].input) == 1
        assert ops[0].input[0] == \
            self.model.input_feature_schema.float_features()
        assert len(ops[0].output) == 1
        assert ops[0].output[0] in ops[1].input

    def testFunctionalLayerHelperAutoInferenceScalar(self):
        loss = self.model.AveragedLoss(self.model.input_feature_schema, 1)
        self.assertEqual(1, len(loss.field_types()))
        self.assertEqual(np.float32, loss.field_types()[0].base)
        self.assertEqual(tuple(), loss.field_types()[0].shape)

    def testFunctionalLayerWithOutputNames(self):
        k = 3
        topk = self.model.TopK(
            self.model.input_feature_schema,
            output_names_or_num=['values', 'indices'],
            k=k,
        )
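        # Passing explicit output names (rather than a count) should name
        # the output blobs, namespaced under the layer name ('TopK/...').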
        self.assertEqual(2, len(topk.field_types()))
        self.assertEqual(np.float32, topk.field_types()[0].base)
        self.assertEqual((k,), topk.field_types()[0].shape)
        self.assertEqual(np.int32, topk.field_types()[1].base)
        self.assertEqual((k,), topk.field_types()[1].shape)
        self.assertEqual(['TopK/values', 'TopK/indices'], topk.field_blobs())