fixes to make data parallel model work for RecurrentNet + test case
Summary:
First, this diff includes a full test of data-parallel LSTM, which confirms it works correctly. To make it work, some changes had to be made:
- cell net/step net external inputs must be namespace scoped
- prevent double-namescoping of cellnet inputs
- make data parallel model understand recurrentnets so the device-mapping works
Reviewed By: salexspb
Differential Revision: D4708840
fbshipit-source-id: 4b0ddc43642d449076a2b6f67ad1c47f84138ff4
diff --git a/caffe2/python/core.py b/caffe2/python/core.py
index 6272851..c8d2651 100644
--- a/caffe2/python/core.py
+++ b/caffe2/python/core.py
@@ -1587,6 +1587,16 @@
for output in outputs:
self.Proto().external_output.extend([str(output)])
+ def AddScopedExternalInputs(self, *inputs):
+ return self.AddExternalInput(
+ * [ScopedBlobReference(str(b)) for b in inputs]
+ )
+
+ def AddScopedExternalOutputs(self, *outputs):
+ return self.AddExternalOutput(
+ * [ScopedBlobReference(str(b)) for b in outputs]
+ )
+
@property
def external_inputs(self):
return map(_get_blob_ref, self._net.external_input)
diff --git a/caffe2/python/data_parallel_model.py b/caffe2/python/data_parallel_model.py
index 484ac2a..342e543 100644
--- a/caffe2/python/data_parallel_model.py
+++ b/caffe2/python/data_parallel_model.py
@@ -578,9 +578,19 @@
Assign blob to device option based on the operator outputing it
'''
mapping = {}
- for op in model.Proto().op:
- for b in list(op.input) + list(op.output):
- mapping[b] = op.device_option
+
+ def map_ops(proto):
+ for op in proto.op:
+ for b in list(op.input) + list(op.output):
+ mapping[b] = op.device_option
+ if op.type.startswith('RecurrentNetwork'):
+ import google.protobuf.text_format as protobuftx
+ step_args = [a for a in op.arg if a.name.endswith("step_net")]
+ for step_arg in step_args:
+ step_proto = caffe2_pb2.NetDef()
+ protobuftx.Merge(step_arg.s, step_proto)
+ map_ops(step_proto)
+ map_ops(model.net.Proto())
model._blob_to_device = mapping
diff --git a/caffe2/python/data_parallel_model_test.py b/caffe2/python/data_parallel_model_test.py
index 14a2365..234ffc7 100644
--- a/caffe2/python/data_parallel_model_test.py
+++ b/caffe2/python/data_parallel_model_test.py
@@ -5,7 +5,7 @@
import numpy as np
import unittest
from caffe2.proto import caffe2_pb2
-from caffe2.python import core, workspace, data_parallel_model, cnn
+from caffe2.python import core, workspace, data_parallel_model, cnn, recurrent
from caffe2.python.test_util import TestCase
@@ -80,6 +80,7 @@
workspace.RunNetOnce(model.param_init_net)
workspace.CreateNet(model.net)
+ print(i, workspace.FetchBlob("gpu_0/fc_w").flatten()[:5])
workspace.RunNet(model.net.Proto().name)
return workspace.FetchBlob("gpu_0/fc_w")
@@ -105,141 +106,278 @@
@unittest.skipIf(not workspace.has_gpu_support, "No gpu support.")
@unittest.skipIf(workspace.NumCudaDevices() < 2, "Need at least 2 GPUs.")
+class RecurrentNetworkParallelTest(TestCase):
+
+ def run_model(self, gpu_devices):
+
+ '''
+ Helper function for test_equiv
+ '''
+ def input_builder_fun(model):
+ return None
+
+ def model_build_fun(model, loss_scale):
+ workspace.FeedBlob(
+ core.ScopedBlobReference("seq_lengths"),
+ np.array([self.T] * self.batch_per_device, dtype=np.int32)
+ )
+ model.param_init_net.ConstantFill(
+ [],
+ "hidden_init",
+ value=0.0,
+ shape=[1, self.batch_per_device, self.hidden_dim]
+ )
+ model.param_init_net.ConstantFill(
+ [],
+ "cell_init",
+ value=0.0,
+ shape=[1, self.batch_per_device, self.hidden_dim]
+ )
+
+ output, _last_hidden, _, _last_state, = recurrent.LSTM(
+ model=model,
+ input_blob="data",
+ seq_lengths="seq_lengths",
+ initial_states=("hidden_init", "cell_init"),
+ dim_in=self.input_dim,
+ dim_out=self.hidden_dim,
+ scope="partest",
+ )
+
+ # A silly loss function
+ loss = model.AveragedLoss(
+ model.Sub([output, "target"], "dist"),
+ "loss",
+ )
+ loss = model.Scale(loss, "loss_scaled", scale=loss_scale)
+ return [loss]
+
+ def param_update_fun(model):
+ ITER = model.Iter("ITER")
+ LR = model.net.LearningRate(
+ [ITER],
+ "LR",
+ base_lr=(-0.1),
+ policy="fixed",
+ )
+ ONE = model.param_init_net.ConstantFill(
+ [], "ONE", shape=[1], value=1.0,
+ )
+ for param in model.GetParams():
+ param_grad = model.param_to_grad[param]
+ model.WeightedSum([param, ONE, param_grad, LR], param)
+
+ assert len(model.GetParams()) == len(model.params) // len(model._devices)
+
+ workspace.ResetWorkspace()
+ model = cnn.CNNModelHelper(
+ name="recurrent_test{}".format(gpu_devices),
+ )
+
+ self.T = 8
+ self.batch_size = 64
+ self.input_dim = 8
+ self.hidden_dim = 31
+ self.batch_per_device = self.batch_size // len(gpu_devices)
+
+ data_parallel_model.Parallelize_GPU(
+ model,
+ input_builder_fun=input_builder_fun,
+ forward_pass_builder_fun=model_build_fun,
+ param_update_builder_fun=param_update_fun,
+ devices=gpu_devices,
+ )
+
+ # Change all initialization to be ConstantFills so that
+ # the everything is deterministic
+ for op in model.param_init_net.Proto().op:
+ if op.type.endswith('Fill'):
+ op.type = 'ConstantFill'
+
+ # Each run has same input, independent of number of gpus
+ np.random.seed(20150210)
+ for i in range(0, 10):
+ full_data = np.random.rand(self.T, self.batch_size, self.input_dim)
+ full_target = np.random.rand(
+ self.T, self.batch_size, self.hidden_dim
+ )
+
+ for (j, g) in enumerate(gpu_devices):
+ st = j * self.batch_per_device
+ en = st + self.batch_per_device
+ data = full_data[:, st:en, :].astype(np.float32)
+ targets = full_target[:, st:en, :].astype(np.float32)
+ with core.DeviceScope(core.DeviceOption(caffe2_pb2.CUDA, g)):
+ workspace.FeedBlob("gpu_{}/data".format(g), data)
+ workspace.FeedBlob("gpu_{}/target".format(g), targets)
+
+ if i == 0:
+ workspace.RunNetOnce(model.param_init_net)
+ workspace.CreateNet(model.net)
+
+ workspace.RunNet(model.net.Proto().name)
+
+ return workspace.FetchBlob("gpu_0/partest/i2h_w")
+
+ def test_equiv_recurrent(self):
+ '''
+ Test that the model produces exactly same results given
+ total batchsize, independent of number of GPUs.
+ '''
+ result_2gpus = self.run_model([0, 1])
+ result_1gpus = self.run_model([0])
+
+ print("result 1", result_1gpus.flatten()[:5])
+ print("result 2", result_2gpus.flatten()[:5])
+
+ self.assertTrue(np.allclose(result_1gpus, result_2gpus))
+
+ if workspace.NumCudaDevices() >= 4:
+ result_4gpus = self.run_model(range(4))
+ self.assertTrue(np.allclose(result_1gpus, result_4gpus))
+
+ if workspace.NumCudaDevices() >= 8:
+ result_8gpus = self.run_model(range(8))
+ self.assertTrue(np.allclose(result_1gpus, result_8gpus))
+
+
+@unittest.skipIf(not workspace.has_gpu_support, "No gpu support.")
+@unittest.skipIf(workspace.NumCudaDevices() < 2, "Need at least 2 GPUs.")
class SparseDataParallelModelTest(TestCase):
- def run_model(self, V, gpu_devices):
+ def run_model(self, V, gpu_devices):
- '''
+ '''
Helper function for test_equiv
'''
- def input_builder_fun(model):
- return None
+ def input_builder_fun(model):
+ return None
- def model_build_fun(model, loss_scale):
- with core.DeviceScope(core.DeviceOption(caffe2_pb2.CPU)):
- gathered_cpu = model.net.Gather(
- [self.vecs, 'indices'], 'gathered_cpu')
- gathered = model.CopyCPUToGPU(gathered_cpu, "gathered")
- flattened = model.Flatten(gathered, "flattened")
- fc = model.FC(flattened, "fc", 16 * 16, 1,
- ("ConstantFill", {}), ("ConstantFill", {}))
- fc_fl = model.FlattenToVec(fc, "fc_fl")
- sigm = model.Sigmoid(fc_fl, "sigm")
- sq = model.SquaredL2Distance([sigm, "label"], "sq")
- loss = model.AveragedLoss(sq, "loss")
- loss = model.Scale(loss, scale=loss_scale)
- return [loss]
+ def model_build_fun(model, loss_scale):
+ with core.DeviceScope(core.DeviceOption(caffe2_pb2.CPU)):
+ gathered_cpu = model.net.Gather(
+ [self.vecs, 'indices'], 'gathered_cpu')
+ gathered = model.CopyCPUToGPU(gathered_cpu, "gathered")
+ flattened = model.Flatten(gathered, "flattened")
+ fc = model.FC(flattened, "fc", 16 * 16, 1,
+ ("ConstantFill", {}), ("ConstantFill", {}))
+ fc_fl = model.FlattenToVec(fc, "fc_fl")
+ sigm = model.Sigmoid(fc_fl, "sigm")
+ sq = model.SquaredL2Distance([sigm, "label"], "sq")
+ loss = model.AveragedLoss(sq, "loss")
+ loss = model.Scale(loss, scale=loss_scale)
+ return [loss]
- def param_update_fun(model):
+ def param_update_fun(model):
- ONE = model.param_init_net.ConstantFill(
- [], "ONE", shape=[1], value=1.0,
+ ONE = model.param_init_net.ConstantFill(
+ [], "ONE", shape=[1], value=1.0,
+ )
+ LR = model.CopyCPUToGPU(self.LR, "LR")
+ for param in model.GetParams():
+ param_grad = model.param_to_grad[param]
+ assert not isinstance(param_grad, core.GradientSlice)
+ model.WeightedSum([param, ONE, param_grad, LR], param)
+
+ workspace.ResetWorkspace()
+ model = cnn.CNNModelHelper(
+ order="NHWC",
+ name="sparse_test{}".format(gpu_devices),
+ )
+
+ with core.NameScope("cpu"):
+ with core.DeviceScope(core.DeviceOption(caffe2_pb2.CPU)):
+ self.ITER = model.Iter("ITER")
+ self.LR = model.net.LearningRate(
+ [self.ITER],
+ "LR",
+ base_lr=(-0.1),
+ policy="fixed",
)
- LR = model.CopyCPUToGPU(self.LR, "LR")
+ self.vecs = model.param_init_net.UniformFill(
+ [], "vecs", shape=[V, 16])
+ model.params.append(self.vecs)
+
+ data_parallel_model.Parallelize_GPU(
+ model,
+ input_builder_fun=input_builder_fun,
+ forward_pass_builder_fun=model_build_fun,
+ param_update_builder_fun=param_update_fun,
+ devices=gpu_devices,
+ )
+
+ # Update the vecs
+ ONE_CPU = model.param_init_net.ConstantFill(
+ [], "ONE_CPU", shape=[1], value=1.0,
+ )
+
+ with core.NameScope("cpu"):
+ with core.DeviceScope(core.DeviceOption(caffe2_pb2.CPU)):
for param in model.GetParams():
param_grad = model.param_to_grad[param]
- assert not isinstance(param_grad, core.GradientSlice)
- model.WeightedSum([param, ONE, param_grad, LR], param)
+ model.ScatterWeightedSum([param, ONE_CPU,
+ param_grad.indices,
+ param_grad.values,
+ self.LR],
+ self.vecs)
- workspace.ResetWorkspace()
- model = cnn.CNNModelHelper(
- order="NHWC",
- name="sparse_test{}".format(gpu_devices),
- )
+ np.random.seed(2603)
- with core.NameScope("cpu"):
+ # Each run has same input, independent of number of gpus
+ batch_size = 64
+ for i in range(0, 10):
+ full_indices = (np.random.rand(batch_size, 16) * V).astype(np.int32)
+ full_labels = full_indices[:, 0] % 2
+ batch_per_device = batch_size // len(gpu_devices)
+
+ for (j, g) in enumerate(gpu_devices):
+ st = j * batch_per_device
+ en = st + batch_per_device
+ indices = full_indices[st:en, :].astype(np.int32)
+ labels = full_labels[st:en].astype(np.float32)
+
with core.DeviceScope(core.DeviceOption(caffe2_pb2.CPU)):
- self.ITER = model.Iter("ITER")
- self.LR = model.net.LearningRate(
- [self.ITER],
- "LR",
- base_lr=(-0.1),
- policy="fixed",
- )
- self.vecs = model.param_init_net.UniformFill(
- [], "vecs", shape=[V, 16])
- model.params.append(self.vecs)
+ workspace.FeedBlob("gpu_{}/indices".format(g), indices)
- data_parallel_model.Parallelize_GPU(
- model,
- input_builder_fun=input_builder_fun,
- forward_pass_builder_fun=model_build_fun,
- param_update_builder_fun=param_update_fun,
- devices=gpu_devices,
- )
+ with core.DeviceScope(core.DeviceOption(caffe2_pb2.CUDA, g)):
+ workspace.FeedBlob("gpu_{}/label".format(g), labels)
- # Update the vecs
- ONE_CPU = model.param_init_net.ConstantFill(
- [], "ONE_CPU", shape=[1], value=1.0,
- )
+ if i == 0:
+ workspace.RunNetOnce(model.param_init_net)
+ # Force vecs to be same on all runs
+ orig_vecs = np.random.rand(V, 16).astype(np.float32)
+ workspace.FeedBlob(
+ self.vecs,
+ orig_vecs
+ )
+ workspace.CreateNet(model.net)
- with core.NameScope("cpu"):
- with core.DeviceScope(core.DeviceOption(caffe2_pb2.CPU)):
- for param in model.GetParams():
- param_grad = model.param_to_grad[param]
- model.ScatterWeightedSum([param, ONE_CPU,
- param_grad.indices,
- param_grad.values,
- self.LR],
- self.vecs)
+ workspace.RunNet(model.net.Proto().name)
- np.random.seed(2603)
+ # Sanity check to see the vecs were updated
+ self.assertFalse(
+ np.allclose(workspace.FetchBlob(self.vecs), orig_vecs))
+ return [workspace.FetchBlob(self.vecs),
+ workspace.FetchBlob("gpu_0/fc_w")]
- # Each run has same input, independent of number of gpus
- batch_size = 64
- for i in range(0, 10):
- full_indices = (np.random.rand(batch_size, 16) * V).astype(np.int32)
- full_labels = full_indices[:, 0] % 2
- batch_per_device = batch_size // len(gpu_devices)
-
- for (j, g) in enumerate(gpu_devices):
- st = j * batch_per_device
- en = st + batch_per_device
- indices = full_indices[st:en, :].astype(np.int32)
- labels = full_labels[st:en].astype(np.float32)
-
- with core.DeviceScope(core.DeviceOption(caffe2_pb2.CPU)):
- workspace.FeedBlob("gpu_{}/indices".format(g), indices)
-
- with core.DeviceScope(core.DeviceOption(caffe2_pb2.CUDA, g)):
- workspace.FeedBlob("gpu_{}/label".format(g), labels)
-
- if i == 0:
- workspace.RunNetOnce(model.param_init_net)
- # Force vecs to be same on all runs
- orig_vecs = np.random.rand(V, 16).astype(np.float32)
- workspace.FeedBlob(
- self.vecs,
- orig_vecs
- )
- workspace.CreateNet(model.net)
-
- workspace.RunNet(model.net.Proto().name)
-
- # Sanity check to see the vecs were updated
- self.assertFalse(
- np.allclose(workspace.FetchBlob(self.vecs), orig_vecs))
- return [workspace.FetchBlob(self.vecs),
- workspace.FetchBlob("gpu_0/fc_w")]
-
- def test_equiv_sparse(self):
- '''
+ def test_equiv_sparse(self):
+ '''
Test that the model produces exactly same results given
total batchsize, independent of number of GPUs.
'''
- V = 10000
- result_2gpus = self.run_model(V, [0, 1])
- result_1gpus = self.run_model(V, [0])
+ V = 10000
+ result_2gpus = self.run_model(V, [0, 1])
+ result_1gpus = self.run_model(V, [0])
- self.assertTrue(np.allclose(result_1gpus[0], result_2gpus[0]))
- self.assertTrue(np.allclose(result_1gpus[1], result_2gpus[1]))
+ self.assertTrue(np.allclose(result_1gpus[0], result_2gpus[0]))
+ self.assertTrue(np.allclose(result_1gpus[1], result_2gpus[1]))
- if workspace.NumCudaDevices() >= 4:
- result_4gpus = self.run_model(V, range(4))
- self.assertTrue(np.allclose(result_1gpus[0], result_4gpus[0]))
- self.assertTrue(np.allclose(result_1gpus[1], result_4gpus[1]))
+ if workspace.NumCudaDevices() >= 4:
+ result_4gpus = self.run_model(V, range(4))
+ self.assertTrue(np.allclose(result_1gpus[0], result_4gpus[0]))
+ self.assertTrue(np.allclose(result_1gpus[1], result_4gpus[1]))
- if workspace.NumCudaDevices() >= 8:
- result_8gpus = self.run_model(V, range(8))
- self.assertTrue(np.allclose(result_1gpus[0], result_8gpus[0]))
- self.assertTrue(np.allclose(result_1gpus[1], result_8gpus[1]))
+ if workspace.NumCudaDevices() >= 8:
+ result_8gpus = self.run_model(V, range(8))
+ self.assertTrue(np.allclose(result_1gpus[0], result_8gpus[0]))
+ self.assertTrue(np.allclose(result_1gpus[1], result_8gpus[1]))
diff --git a/caffe2/python/model_helper.py b/caffe2/python/model_helper.py
index 28e1340..ee1f91e 100644
--- a/caffe2/python/model_helper.py
+++ b/caffe2/python/model_helper.py
@@ -145,7 +145,8 @@
if namescope == '':
return self.params[:]
else:
- return [p for p in self.params if p.GetNameScope() == namescope]
+ return [p for p in self.params if
+ p.GetNameScope().startswith(namescope)]
def Proto(self):
return self.net.Proto()
diff --git a/caffe2/python/recurrent.py b/caffe2/python/recurrent.py
index 9b4098a..7737436 100644
--- a/caffe2/python/recurrent.py
+++ b/caffe2/python/recurrent.py
@@ -4,6 +4,7 @@
from __future__ import unicode_literals
from caffe2.python import core
+from caffe2.python.scope import CurrentNameScope
from caffe2.python.cnn import CNNModelHelper
from caffe2.python.attention import (
apply_regular_attention,
@@ -45,6 +46,14 @@
'''
assert len(inputs) == 1, "Only one input blob is supported so far"
+ # Validate scoping
+ for einp in cell_net.Proto().external_input:
+ assert einp.startswith(CurrentNameScope()), \
+ '''
+ Cell net external inputs are not properly scoped, use
+ AddScopedExternalInputs() when creating them
+ '''
+
input_blobs = [str(i[0]) for i in inputs]
initial_input_blobs = [str(x[1]) for x in initial_cell_inputs]
op_name = net.NextName('recurrent')
@@ -62,7 +71,7 @@
if timestep is not None:
known_inputs.append(str(timestep))
references = [
- b for b in cell_net.Proto().external_input
+ core.BlobReference(b) for b in cell_net.Proto().external_input
if b not in known_inputs]
inner_outputs = list(cell_net.Proto().external_output)
@@ -223,7 +232,7 @@
""" the step net """
step_model = CNNModelHelper(name='lstm_cell', param_model=model)
input_t, timestep, cell_t_prev, hidden_t_prev = (
- step_model.net.AddExternalInputs(
+ step_model.net.AddScopedExternalInputs(
'input_t', 'timestep', 'cell_t_prev', 'hidden_t_prev'))
gates_t = step_model.FC(
hidden_t_prev, s('gates_t'), dim_in=dim_out,
@@ -358,20 +367,21 @@
timestep,
cell_t_prev,
hidden_t_prev,
- _,
- _,
attention_weighted_encoder_context_t_prev,
) = (
- step_model.net.AddExternalInputs(
+ step_model.net.AddScopedExternalInputs(
'input_t',
'timestep',
'cell_t_prev',
'hidden_t_prev',
- encoder_outputs_transposed,
- weighted_encoder_outputs,
'attention_weighted_encoder_context_t_prev',
)
)
+ step_model.net.AddExternalInputs(
+ encoder_outputs_transposed,
+ weighted_encoder_outputs
+ )
+
gates_concatenated_input_t, _ = step_model.net.Concat(
[hidden_t_prev, attention_weighted_encoder_context_t_prev],
[
@@ -488,7 +498,7 @@
""" the step net """
step_model = CNNModelHelper(name='milstm_cell', param_model=model)
input_t, timestep, cell_t_prev, hidden_t_prev = (
- step_model.net.AddExternalInputs(
+ step_model.net.AddScopedExternalInputs(
'input_t', 'timestep', 'cell_t_prev', 'hidden_t_prev'))
# hU^T
# Shape: [1, batch_size, 4 * hidden_size]