fixes to make data parallel model work for RecurrentNet + test case

Summary:
This diff adds a full test of a data-parallel LSTM, which confirms that training gives the same result independent of the number of GPUs. To make it work, the following changes had to be made:
 - cell net/step net external inputs must be namescoped (via the new AddScopedExternalInputs helper)
 - prevent double-namescoping of cell net inputs
 - make data parallel model recurse into the step nets of RecurrentNetwork ops so the blob-to-device mapping works

Reviewed By: salexspb

Differential Revision: D4708840

fbshipit-source-id: 4b0ddc43642d449076a2b6f67ad1c47f84138ff4
diff --git a/caffe2/python/core.py b/caffe2/python/core.py
index 6272851..c8d2651 100644
--- a/caffe2/python/core.py
+++ b/caffe2/python/core.py
@@ -1587,6 +1587,16 @@
         for output in outputs:
             self.Proto().external_output.extend([str(output)])
 
+    def AddScopedExternalInputs(self, *inputs):
+        return self.AddExternalInput(
+            * [ScopedBlobReference(str(b)) for b in inputs]
+        )
+
+    def AddScopedExternalOutputs(self, *outputs):
+        return self.AddExternalOutput(
+            * [ScopedBlobReference(str(b)) for b in outputs]
+        )
+
     @property
     def external_inputs(self):
         return map(_get_blob_ref, self._net.external_input)
diff --git a/caffe2/python/data_parallel_model.py b/caffe2/python/data_parallel_model.py
index 484ac2a..342e543 100644
--- a/caffe2/python/data_parallel_model.py
+++ b/caffe2/python/data_parallel_model.py
@@ -578,9 +578,19 @@
     Assign blob to device option based on the operator outputing it
     '''
     mapping = {}
-    for op in model.Proto().op:
-        for b in list(op.input) + list(op.output):
-            mapping[b] = op.device_option
+
+    def map_ops(proto):
+        for op in proto.op:
+            for b in list(op.input) + list(op.output):
+                mapping[b] = op.device_option
+            if op.type.startswith('RecurrentNetwork'):
+                import google.protobuf.text_format as protobuftx
+                step_args = [a for a in op.arg if a.name.endswith("step_net")]
+                for step_arg in step_args:
+                    step_proto = caffe2_pb2.NetDef()
+                    protobuftx.Merge(step_arg.s, step_proto)
+                    map_ops(step_proto)
+    map_ops(model.net.Proto())
 
     model._blob_to_device = mapping
 
diff --git a/caffe2/python/data_parallel_model_test.py b/caffe2/python/data_parallel_model_test.py
index 14a2365..234ffc7 100644
--- a/caffe2/python/data_parallel_model_test.py
+++ b/caffe2/python/data_parallel_model_test.py
@@ -5,7 +5,7 @@
 import numpy as np
 import unittest
 from caffe2.proto import caffe2_pb2
-from caffe2.python import core, workspace, data_parallel_model, cnn
+from caffe2.python import core, workspace, data_parallel_model, cnn, recurrent
 from caffe2.python.test_util import TestCase
 
 
@@ -80,6 +80,7 @@
                 workspace.RunNetOnce(model.param_init_net)
                 workspace.CreateNet(model.net)
 
+            print(i, workspace.FetchBlob("gpu_0/fc_w").flatten()[:5])
             workspace.RunNet(model.net.Proto().name)
 
         return workspace.FetchBlob("gpu_0/fc_w")
@@ -105,141 +106,278 @@
 
 @unittest.skipIf(not workspace.has_gpu_support, "No gpu support.")
 @unittest.skipIf(workspace.NumCudaDevices() < 2, "Need at least 2 GPUs.")
+class RecurrentNetworkParallelTest(TestCase):
+
+    def run_model(self, gpu_devices):
+
+        '''
+        Helper function for test_equiv_recurrent
+        '''
+        def input_builder_fun(model):
+            return None
+
+        def model_build_fun(model, loss_scale):
+            workspace.FeedBlob(
+                core.ScopedBlobReference("seq_lengths"),
+                np.array([self.T] * self.batch_per_device, dtype=np.int32)
+            )
+            model.param_init_net.ConstantFill(
+                [],
+                "hidden_init",
+                value=0.0,
+                shape=[1, self.batch_per_device, self.hidden_dim]
+            )
+            model.param_init_net.ConstantFill(
+                [],
+                "cell_init",
+                value=0.0,
+                shape=[1, self.batch_per_device, self.hidden_dim]
+            )
+
+            output, _last_hidden, _, _last_state, = recurrent.LSTM(
+                model=model,
+                input_blob="data",
+                seq_lengths="seq_lengths",
+                initial_states=("hidden_init", "cell_init"),
+                dim_in=self.input_dim,
+                dim_out=self.hidden_dim,
+                scope="partest",
+            )
+
+            # A silly loss function
+            loss = model.AveragedLoss(
+                model.Sub([output, "target"], "dist"),
+                "loss",
+            )
+            loss = model.Scale(loss, "loss_scaled", scale=loss_scale)
+            return [loss]
+
+        def param_update_fun(model):
+            ITER = model.Iter("ITER")
+            LR = model.net.LearningRate(
+                [ITER],
+                "LR",
+                base_lr=(-0.1),
+                policy="fixed",
+            )
+            ONE = model.param_init_net.ConstantFill(
+                [], "ONE", shape=[1], value=1.0,
+            )
+            for param in model.GetParams():
+                param_grad = model.param_to_grad[param]
+                model.WeightedSum([param, ONE, param_grad, LR], param)
+
+            assert len(model.GetParams()) == len(model.params) // len(model._devices)
+
+        workspace.ResetWorkspace()
+        model = cnn.CNNModelHelper(
+            name="recurrent_test{}".format(gpu_devices),
+        )
+
+        self.T = 8
+        self.batch_size = 64
+        self.input_dim = 8
+        self.hidden_dim = 31
+        self.batch_per_device = self.batch_size // len(gpu_devices)
+
+        data_parallel_model.Parallelize_GPU(
+            model,
+            input_builder_fun=input_builder_fun,
+            forward_pass_builder_fun=model_build_fun,
+            param_update_builder_fun=param_update_fun,
+            devices=gpu_devices,
+        )
+
+        # Change all initialization to be ConstantFills so that
+        # everything is deterministic
+        for op in model.param_init_net.Proto().op:
+            if op.type.endswith('Fill'):
+                op.type = 'ConstantFill'
+
+        # Each run has same input, independent of number of gpus
+        np.random.seed(20150210)
+        for i in range(0, 10):
+            full_data = np.random.rand(self.T, self.batch_size, self.input_dim)
+            full_target = np.random.rand(
+                self.T, self.batch_size, self.hidden_dim
+            )
+
+            for (j, g) in enumerate(gpu_devices):
+                st = j * self.batch_per_device
+                en = st + self.batch_per_device
+                data = full_data[:, st:en, :].astype(np.float32)
+                targets = full_target[:, st:en, :].astype(np.float32)
+                with core.DeviceScope(core.DeviceOption(caffe2_pb2.CUDA, g)):
+                    workspace.FeedBlob("gpu_{}/data".format(g), data)
+                    workspace.FeedBlob("gpu_{}/target".format(g), targets)
+
+            if i == 0:
+                workspace.RunNetOnce(model.param_init_net)
+                workspace.CreateNet(model.net)
+
+            workspace.RunNet(model.net.Proto().name)
+
+        return workspace.FetchBlob("gpu_0/partest/i2h_w")
+
+    def test_equiv_recurrent(self):
+        '''
+        Test that the model produces the same results (within np.allclose
+        tolerance) for a given total batch size, independent of number of GPUs.
+        '''
+        result_2gpus = self.run_model([0, 1])
+        result_1gpus = self.run_model([0])
+
+        print("result 1", result_1gpus.flatten()[:5])
+        print("result 2", result_2gpus.flatten()[:5])
+
+        self.assertTrue(np.allclose(result_1gpus, result_2gpus))
+
+        if workspace.NumCudaDevices() >= 4:
+            result_4gpus = self.run_model(range(4))
+            self.assertTrue(np.allclose(result_1gpus, result_4gpus))
+
+        if workspace.NumCudaDevices() >= 8:
+            result_8gpus = self.run_model(range(8))
+            self.assertTrue(np.allclose(result_1gpus, result_8gpus))
+
+
+@unittest.skipIf(not workspace.has_gpu_support, "No gpu support.")
+@unittest.skipIf(workspace.NumCudaDevices() < 2, "Need at least 2 GPUs.")
 class SparseDataParallelModelTest(TestCase):
 
-        def run_model(self, V, gpu_devices):
+    def run_model(self, V, gpu_devices):
 
-            '''
+        '''
             Helper function for test_equiv
             '''
-            def input_builder_fun(model):
-                return None
+        def input_builder_fun(model):
+            return None
 
-            def model_build_fun(model, loss_scale):
-                with core.DeviceScope(core.DeviceOption(caffe2_pb2.CPU)):
-                    gathered_cpu = model.net.Gather(
-                        [self.vecs, 'indices'], 'gathered_cpu')
-                gathered = model.CopyCPUToGPU(gathered_cpu, "gathered")
-                flattened = model.Flatten(gathered, "flattened")
-                fc = model.FC(flattened, "fc", 16 * 16, 1,
-                              ("ConstantFill", {}), ("ConstantFill", {}))
-                fc_fl = model.FlattenToVec(fc, "fc_fl")
-                sigm = model.Sigmoid(fc_fl, "sigm")
-                sq = model.SquaredL2Distance([sigm, "label"], "sq")
-                loss = model.AveragedLoss(sq, "loss")
-                loss = model.Scale(loss, scale=loss_scale)
-                return [loss]
+        def model_build_fun(model, loss_scale):
+            with core.DeviceScope(core.DeviceOption(caffe2_pb2.CPU)):
+                gathered_cpu = model.net.Gather(
+                    [self.vecs, 'indices'], 'gathered_cpu')
+            gathered = model.CopyCPUToGPU(gathered_cpu, "gathered")
+            flattened = model.Flatten(gathered, "flattened")
+            fc = model.FC(flattened, "fc", 16 * 16, 1,
+                          ("ConstantFill", {}), ("ConstantFill", {}))
+            fc_fl = model.FlattenToVec(fc, "fc_fl")
+            sigm = model.Sigmoid(fc_fl, "sigm")
+            sq = model.SquaredL2Distance([sigm, "label"], "sq")
+            loss = model.AveragedLoss(sq, "loss")
+            loss = model.Scale(loss, scale=loss_scale)
+            return [loss]
 
-            def param_update_fun(model):
+        def param_update_fun(model):
 
-                ONE = model.param_init_net.ConstantFill(
-                    [], "ONE", shape=[1], value=1.0,
+            ONE = model.param_init_net.ConstantFill(
+                [], "ONE", shape=[1], value=1.0,
+            )
+            LR = model.CopyCPUToGPU(self.LR, "LR")
+            for param in model.GetParams():
+                param_grad = model.param_to_grad[param]
+                assert not isinstance(param_grad, core.GradientSlice)
+                model.WeightedSum([param, ONE, param_grad, LR], param)
+
+        workspace.ResetWorkspace()
+        model = cnn.CNNModelHelper(
+            order="NHWC",
+            name="sparse_test{}".format(gpu_devices),
+        )
+
+        with core.NameScope("cpu"):
+            with core.DeviceScope(core.DeviceOption(caffe2_pb2.CPU)):
+                self.ITER = model.Iter("ITER")
+                self.LR = model.net.LearningRate(
+                    [self.ITER],
+                    "LR",
+                    base_lr=(-0.1),
+                    policy="fixed",
                 )
-                LR = model.CopyCPUToGPU(self.LR, "LR")
+                self.vecs = model.param_init_net.UniformFill(
+                    [], "vecs", shape=[V, 16])
+                model.params.append(self.vecs)
+
+        data_parallel_model.Parallelize_GPU(
+            model,
+            input_builder_fun=input_builder_fun,
+            forward_pass_builder_fun=model_build_fun,
+            param_update_builder_fun=param_update_fun,
+            devices=gpu_devices,
+        )
+
+        # Update the vecs
+        ONE_CPU = model.param_init_net.ConstantFill(
+            [], "ONE_CPU", shape=[1], value=1.0,
+        )
+
+        with core.NameScope("cpu"):
+            with core.DeviceScope(core.DeviceOption(caffe2_pb2.CPU)):
                 for param in model.GetParams():
                     param_grad = model.param_to_grad[param]
-                    assert not isinstance(param_grad, core.GradientSlice)
-                    model.WeightedSum([param, ONE, param_grad, LR], param)
+                    model.ScatterWeightedSum([param, ONE_CPU,
+                                              param_grad.indices,
+                                              param_grad.values,
+                                              self.LR],
+                                              self.vecs)
 
-            workspace.ResetWorkspace()
-            model = cnn.CNNModelHelper(
-                order="NHWC",
-                name="sparse_test{}".format(gpu_devices),
-            )
+        np.random.seed(2603)
 
-            with core.NameScope("cpu"):
+        # Each run has same input, independent of number of gpus
+        batch_size = 64
+        for i in range(0, 10):
+            full_indices = (np.random.rand(batch_size, 16) * V).astype(np.int32)
+            full_labels = full_indices[:, 0] % 2
+            batch_per_device = batch_size // len(gpu_devices)
+
+            for (j, g) in enumerate(gpu_devices):
+                st = j * batch_per_device
+                en = st + batch_per_device
+                indices = full_indices[st:en, :].astype(np.int32)
+                labels = full_labels[st:en].astype(np.float32)
+
                 with core.DeviceScope(core.DeviceOption(caffe2_pb2.CPU)):
-                    self.ITER = model.Iter("ITER")
-                    self.LR = model.net.LearningRate(
-                        [self.ITER],
-                        "LR",
-                        base_lr=(-0.1),
-                        policy="fixed",
-                    )
-                    self.vecs = model.param_init_net.UniformFill(
-                        [], "vecs", shape=[V, 16])
-                    model.params.append(self.vecs)
+                    workspace.FeedBlob("gpu_{}/indices".format(g), indices)
 
-            data_parallel_model.Parallelize_GPU(
-                model,
-                input_builder_fun=input_builder_fun,
-                forward_pass_builder_fun=model_build_fun,
-                param_update_builder_fun=param_update_fun,
-                devices=gpu_devices,
-            )
+                with core.DeviceScope(core.DeviceOption(caffe2_pb2.CUDA, g)):
+                    workspace.FeedBlob("gpu_{}/label".format(g), labels)
 
-            # Update the vecs
-            ONE_CPU = model.param_init_net.ConstantFill(
-                [], "ONE_CPU", shape=[1], value=1.0,
-            )
+            if i == 0:
+                workspace.RunNetOnce(model.param_init_net)
+                # Force vecs to be same on all runs
+                orig_vecs = np.random.rand(V, 16).astype(np.float32)
+                workspace.FeedBlob(
+                    self.vecs,
+                    orig_vecs
+                )
+                workspace.CreateNet(model.net)
 
-            with core.NameScope("cpu"):
-                with core.DeviceScope(core.DeviceOption(caffe2_pb2.CPU)):
-                    for param in model.GetParams():
-                        param_grad = model.param_to_grad[param]
-                        model.ScatterWeightedSum([param, ONE_CPU,
-                                                  param_grad.indices,
-                                                  param_grad.values,
-                                                  self.LR],
-                                                  self.vecs)
+            workspace.RunNet(model.net.Proto().name)
 
-            np.random.seed(2603)
+        # Sanity check to see the vecs were updated
+        self.assertFalse(
+            np.allclose(workspace.FetchBlob(self.vecs), orig_vecs))
+        return [workspace.FetchBlob(self.vecs),
+                workspace.FetchBlob("gpu_0/fc_w")]
 
-            # Each run has same input, independent of number of gpus
-            batch_size = 64
-            for i in range(0, 10):
-                full_indices = (np.random.rand(batch_size, 16) * V).astype(np.int32)
-                full_labels = full_indices[:, 0] % 2
-                batch_per_device = batch_size // len(gpu_devices)
-
-                for (j, g) in enumerate(gpu_devices):
-                    st = j * batch_per_device
-                    en = st + batch_per_device
-                    indices = full_indices[st:en, :].astype(np.int32)
-                    labels = full_labels[st:en].astype(np.float32)
-
-                    with core.DeviceScope(core.DeviceOption(caffe2_pb2.CPU)):
-                        workspace.FeedBlob("gpu_{}/indices".format(g), indices)
-
-                    with core.DeviceScope(core.DeviceOption(caffe2_pb2.CUDA, g)):
-                        workspace.FeedBlob("gpu_{}/label".format(g), labels)
-
-                if i == 0:
-                    workspace.RunNetOnce(model.param_init_net)
-                    # Force vecs to be same on all runs
-                    orig_vecs = np.random.rand(V, 16).astype(np.float32)
-                    workspace.FeedBlob(
-                        self.vecs,
-                        orig_vecs
-                    )
-                    workspace.CreateNet(model.net)
-
-                workspace.RunNet(model.net.Proto().name)
-
-            # Sanity check to see the vecs were updated
-            self.assertFalse(
-                np.allclose(workspace.FetchBlob(self.vecs), orig_vecs))
-            return [workspace.FetchBlob(self.vecs),
-                    workspace.FetchBlob("gpu_0/fc_w")]
-
-        def test_equiv_sparse(self):
-            '''
+    def test_equiv_sparse(self):
+        '''
             Test that the model produces exactly same results given
             total batchsize, independent of number of GPUs.
             '''
-            V = 10000
-            result_2gpus = self.run_model(V, [0, 1])
-            result_1gpus = self.run_model(V, [0])
+        V = 10000
+        result_2gpus = self.run_model(V, [0, 1])
+        result_1gpus = self.run_model(V, [0])
 
-            self.assertTrue(np.allclose(result_1gpus[0], result_2gpus[0]))
-            self.assertTrue(np.allclose(result_1gpus[1], result_2gpus[1]))
+        self.assertTrue(np.allclose(result_1gpus[0], result_2gpus[0]))
+        self.assertTrue(np.allclose(result_1gpus[1], result_2gpus[1]))
 
-            if workspace.NumCudaDevices() >= 4:
-                result_4gpus = self.run_model(V, range(4))
-                self.assertTrue(np.allclose(result_1gpus[0], result_4gpus[0]))
-                self.assertTrue(np.allclose(result_1gpus[1], result_4gpus[1]))
+        if workspace.NumCudaDevices() >= 4:
+            result_4gpus = self.run_model(V, range(4))
+            self.assertTrue(np.allclose(result_1gpus[0], result_4gpus[0]))
+            self.assertTrue(np.allclose(result_1gpus[1], result_4gpus[1]))
 
-            if workspace.NumCudaDevices() >= 8:
-                result_8gpus = self.run_model(V, range(8))
-                self.assertTrue(np.allclose(result_1gpus[0], result_8gpus[0]))
-                self.assertTrue(np.allclose(result_1gpus[1], result_8gpus[1]))
+        if workspace.NumCudaDevices() >= 8:
+            result_8gpus = self.run_model(V, range(8))
+            self.assertTrue(np.allclose(result_1gpus[0], result_8gpus[0]))
+            self.assertTrue(np.allclose(result_1gpus[1], result_8gpus[1]))
diff --git a/caffe2/python/model_helper.py b/caffe2/python/model_helper.py
index 28e1340..ee1f91e 100644
--- a/caffe2/python/model_helper.py
+++ b/caffe2/python/model_helper.py
@@ -145,7 +145,8 @@
         if namescope == '':
             return self.params[:]
         else:
-            return [p for p in self.params if p.GetNameScope() == namescope]
+            return [p for p in self.params if
+                    p.GetNameScope().startswith(namescope)]
 
     def Proto(self):
         return self.net.Proto()
diff --git a/caffe2/python/recurrent.py b/caffe2/python/recurrent.py
index 9b4098a..7737436 100644
--- a/caffe2/python/recurrent.py
+++ b/caffe2/python/recurrent.py
@@ -4,6 +4,7 @@
 from __future__ import unicode_literals
 
 from caffe2.python import core
+from caffe2.python.scope import CurrentNameScope
 from caffe2.python.cnn import CNNModelHelper
 from caffe2.python.attention import (
     apply_regular_attention,
@@ -45,6 +46,14 @@
     '''
     assert len(inputs) == 1, "Only one input blob is supported so far"
 
+    # Validate scoping
+    for einp in cell_net.Proto().external_input:
+        assert einp.startswith(CurrentNameScope()), \
+            '''
+            Cell net external inputs are not properly scoped, use
+            AddScopedExternalInputs() when creating them
+            '''
+
     input_blobs = [str(i[0]) for i in inputs]
     initial_input_blobs = [str(x[1]) for x in initial_cell_inputs]
     op_name = net.NextName('recurrent')
@@ -62,7 +71,7 @@
     if timestep is not None:
         known_inputs.append(str(timestep))
     references = [
-        b for b in cell_net.Proto().external_input
+        core.BlobReference(b) for b in cell_net.Proto().external_input
         if b not in known_inputs]
 
     inner_outputs = list(cell_net.Proto().external_output)
@@ -223,7 +232,7 @@
     """ the step net """
     step_model = CNNModelHelper(name='lstm_cell', param_model=model)
     input_t, timestep, cell_t_prev, hidden_t_prev = (
-        step_model.net.AddExternalInputs(
+        step_model.net.AddScopedExternalInputs(
             'input_t', 'timestep', 'cell_t_prev', 'hidden_t_prev'))
     gates_t = step_model.FC(
         hidden_t_prev, s('gates_t'), dim_in=dim_out,
@@ -358,20 +367,21 @@
         timestep,
         cell_t_prev,
         hidden_t_prev,
-        _,
-        _,
         attention_weighted_encoder_context_t_prev,
     ) = (
-        step_model.net.AddExternalInputs(
+        step_model.net.AddScopedExternalInputs(
             'input_t',
             'timestep',
             'cell_t_prev',
             'hidden_t_prev',
-            encoder_outputs_transposed,
-            weighted_encoder_outputs,
             'attention_weighted_encoder_context_t_prev',
         )
     )
+    step_model.net.AddExternalInputs(
+        encoder_outputs_transposed,
+        weighted_encoder_outputs
+    )
+
     gates_concatenated_input_t, _ = step_model.net.Concat(
         [hidden_t_prev, attention_weighted_encoder_context_t_prev],
         [
@@ -488,7 +498,7 @@
     """ the step net """
     step_model = CNNModelHelper(name='milstm_cell', param_model=model)
     input_t, timestep, cell_t_prev, hidden_t_prev = (
-        step_model.net.AddExternalInputs(
+        step_model.net.AddScopedExternalInputs(
             'input_t', 'timestep', 'cell_t_prev', 'hidden_t_prev'))
     # hU^T
     # Shape: [1, batch_size, 4 * hidden_size]