Back out "Back out "[Caffe2] Fix device_option propagation"" (#25908)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/25908
Original commit changeset: f6e961e88c01
device_option propagation is completely broken in Caffe2 when pass-through operators are used. For example, the Gather operator has no gradient and passes through its inputs, which results in incorrect detection of the components for sparse parameter aggregation (the component ends up empty instead of holding the real device).
This diff fixes that issue.
The original diff had a problem: Caffe2 does not handle the case when a device option is present but contains only metadata (for example, the one for auto-generated reduction ops in the backward pass). This diff addresses that by merging device options during the backward pass, with the operator's own settings taking precedence over the net-level defaults.
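For reference, a minimal Python sketch of the merge semantics this diff relies on (not part of the change itself; the field values are made up for illustration): the net-level DeviceOption supplies the defaults, and whatever is set on the operator's own DeviceOption is overlaid on top, matching the CopyFrom/MergeFrom order used in the C++ changes below.

    from caffe2.proto import caffe2_pb2

    # Net-wide default: every op should land on this node unless it says otherwise.
    net_dev = caffe2_pb2.DeviceOption()
    net_dev.node_name = 'node:0'

    # Operator-level option that carries only metadata, e.g. the extra_info
    # attached to an auto-generated reduction op in the backward pass.
    op_dev = caffe2_pb2.DeviceOption()
    op_dev.extra_info.extend(['some_key:1'])

    # Merge: start from the net default, then overlay the operator's settings.
    merged = caffe2_pb2.DeviceOption()
    merged.CopyFrom(net_dev)
    merged.MergeFrom(op_dev)

    assert merged.node_name == 'node:0'                # net default preserved
    assert list(merged.extra_info) == ['some_key:1']   # op metadata preserved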
Test Plan:
1. net_transform finally works with the Gather + FloatToHalf transformed model instead of failing because of an incorrect number of components.
2. New unit test.
3. Verified that the previously broken benchmark now passes.
ezyang, do you have suggestions on what else I should test?
Reviewed By: ezyang
Differential Revision: D17281528
fbshipit-source-id: 4a1bc386f29f6a34fbf8008effde9d4890abebfa
diff --git a/caffe2/core/net_dag_utils.cc b/caffe2/core/net_dag_utils.cc
index 8786ac6..498a86f 100644
--- a/caffe2/core/net_dag_utils.cc
+++ b/caffe2/core/net_dag_utils.cc
@@ -381,9 +381,13 @@
const OperatorDef& op_def = net_def->op(idx);
VLOG(1) << "Creating operator #" << idx << ": " << op_def.name() << ": "
<< op_def.type();
- if (!op_def.has_device_option() && net_def_has_device_option) {
+ if (net_def_has_device_option) {
OperatorDef temp_def(op_def);
- temp_def.mutable_device_option()->CopyFrom(net_def->device_option());
+
+ DeviceOption temp_dev(net_def->device_option());
+ temp_dev.MergeFrom(op_def.device_option());
+
+ temp_def.mutable_device_option()->CopyFrom(temp_dev);
operator_nodes[idx].operator_ = CreateOperator(temp_def, ws, idx);
} else {
auto op = CreateOperator(op_def, ws, idx);
diff --git a/caffe2/core/net_simple.cc b/caffe2/core/net_simple.cc
index 759c2fc..38f03b3 100644
--- a/caffe2/core/net_simple.cc
+++ b/caffe2/core/net_simple.cc
@@ -31,12 +31,16 @@
VLOG(1) << "Creating operator " << operator_def.name() << ": "
<< operator_def.type();
std::unique_ptr<OperatorBase> op{nullptr};
- if (!operator_def.has_device_option() && net_def_has_device_option) {
- // In the case that the operator def does not specify a device option but
- // the net def has a default option, we copy the device option over to the
- // operator def.
+ if (net_def_has_device_option) {
+ // In the case when net def specifies device option, final device option
+ // will be equal to merge of operator and net def device options, with
+ // preference to settings from the operator.
OperatorDef temp_def(operator_def);
- temp_def.mutable_device_option()->CopyFrom(net_def->device_option());
+
+ DeviceOption temp_dev(net_def->device_option());
+ temp_dev.MergeFrom(operator_def.device_option());
+
+ temp_def.mutable_device_option()->CopyFrom(temp_dev);
op = CreateOperator(temp_def, ws, idx);
} else {
op = CreateOperator(operator_def, ws, idx);
diff --git a/caffe2/operators/rnn/recurrent_network_executor.h b/caffe2/operators/rnn/recurrent_network_executor.h
index 2022cc8..33fb457 100644
--- a/caffe2/operators/rnn/recurrent_network_executor.h
+++ b/caffe2/operators/rnn/recurrent_network_executor.h
@@ -39,13 +39,14 @@
timestep_blob_(timestep_blob) {
const bool net_def_has_device_option = step_net_def_.has_device_option();
for (int i = 0; i < step_net_def_.op_size(); i++) {
- if (!step_net_def_.op(i).has_device_option() &&
- net_def_has_device_option) {
- // In the case that the operator def does not specify a device option
- // but the net def has a default option, we copy the device option over
- // to the operator def.
- step_net_def_.mutable_op(i)->mutable_device_option()->CopyFrom(
- step_net_def_.device_option());
+ if (net_def_has_device_option) {
+ // In the case when net def specifies device option, final device option
+ // will be equal to merge of operator and net def device options, with
+ // preference to settings from the operator.
+ DeviceOption option;
+ option.CopyFrom(step_net_def_.device_option());
+ option.MergeFrom(step_net_def_.op(i).device_option());
+ step_net_def_.mutable_op(i)->mutable_device_option()->CopyFrom(option);
}
op_deps_.push_back(op_deps(i));
}
diff --git a/caffe2/python/core.py b/caffe2/python/core.py
index 6589c9a..ef21246 100644
--- a/caffe2/python/core.py
+++ b/caffe2/python/core.py
@@ -451,11 +451,12 @@
OpSSA = namedtuple('OpSSA', ['op', 'in_versions', 'out_versions'])
-GradGenMeta = namedtuple('GradGenMeta', ['grad_op', 'idx', 'gradient'])
+GradGenMeta = namedtuple('GradGenMeta',
+ ['grad_op', 'idx', 'gradient', 'device_option'])
SparseGradGenMeta = namedtuple('SparseGradGenMeta', [
'grad_op_indices', 'idx_indices',
'grad_op_values', 'idx_values',
- 'gradient',
+ 'gradient', 'device_option',
])
@@ -607,9 +608,13 @@
else:
# both indices and values are generated
assert(len(generators) == 2)
- op1_i, idx1_i, op1_v, idx1_v, g1 = generators[0]
- op2_i, idx2_i, op2_v, idx2_v, g2 = generators[1]
+ op1_i, idx1_i, op1_v, idx1_v, g1, dev_1 = generators[0]
+ op2_i, idx2_i, op2_v, idx2_v, g2, dev_2 = generators[1]
assert(g1 == g2)
+ assert dev_1 == dev_2, (
+ "Unequal devices for sparse generators: "
+                "{} and {}".format(dev_1, dev_2)
+ )
assert(op1_i is None or op2_i is None)
assert(op1_v is None or op2_v is None)
assert(idx1_i == 0 or idx2_i == 0)
@@ -617,7 +622,7 @@
generator = SparseGradGenMeta(
op1_i or op2_i, idx1_i + idx2_i,
op1_v or op2_v, idx1_v + idx2_v,
- g1)
+ g1, dev_1)
self.gradient_generators[name][version].append(generator)
def BuildGradientGenerators( # NOQA
@@ -650,15 +655,17 @@
# we'll merge indices and values generators
# corresponding to the same gradient in step (3)
if g.indices == output:
- m = SparseGradGenMeta(grad_op, i, None, 0, g)
+ m = SparseGradGenMeta(
+ grad_op, i, None, 0, g, grad_op.device_option)
else:
assert(g.values == output)
- m = SparseGradGenMeta(None, 0, grad_op, i, g)
+ m = SparseGradGenMeta(
+ None, 0, grad_op, i, g, grad_op.device_option)
sparse_generators[input_name][input_version].append(m)
else:
self.gradient_generators[input_name][input_version] \
.append(GradGenMeta(
- grad_op, i, g))
+ grad_op, i, g, grad_op.device_option))
# (3) merge indices and values generators for sparse gradients, and
# add them to gradient_generators
@@ -678,11 +685,11 @@
if str(g.indices) not in locally_generated_blobs and \
str(g.values) not in locally_generated_blobs:
self.gradient_generators[input_name][input_version].append(
- SparseGradGenMeta(None, 0, None, 0, g))
+ SparseGradGenMeta(None, 0, None, 0, g, forward_op.device_option))
else:
if str(g) not in locally_generated_blobs:
self.gradient_generators[input_name][input_version].append(
- GradGenMeta(None, 0, g))
+ GradGenMeta(None, 0, g, forward_op.device_option))
# Finally, for the gradients specified in g_input, we update the
# gradient frontier to reflect the input versions that the gradients
@@ -701,12 +708,12 @@
for g in generator:
if type(g) is GradGenMeta:
- grad_op, idx, _ = g
+ grad_op, idx, _, _ = g
if grad_op:
return grad_op.output[idx]
else:
assert(type(g) is SparseGradGenMeta)
- op_i, idx_i, op_v, idx_v, _ = g
+ op_i, idx_i, op_v, idx_v, _, _ = g
if op_i:
return remove_suffix(op_i.output[idx_i], '_indices')
if op_v:
@@ -720,16 +727,12 @@
# we already checked that device options are consistent so we can just
# use the first one we find
for generator in generators:
- grad_op = generator.grad_op if type(generator) is GradGenMeta \
- else generator.grad_op_values or generator.grad_op_indices
- if grad_op:
- if grad_op.HasField('device_option'):
- for op in sum_ops:
- op.device_option.CopyFrom(grad_op.device_option)
- op.device_option.extra_info.extend([
- "{}:1".format(IR.IS_AUTO_GEN_SUM_OPS_TAG)
- ])
- break
+ for op in sum_ops:
+ op.device_option.CopyFrom(generator.device_option)
+ op.device_option.extra_info.extend([
+ "{}:1".format(IR.IS_AUTO_GEN_SUM_OPS_TAG)
+ ])
+ break
def _DisambiguateGradOpOutput(self, grad_op, idx, cnt):
new_grad_output = (
@@ -756,7 +759,7 @@
first_grad_op = True
for generator in generators:
- grad_op, idx, g = generator
+ grad_op, idx, g, _ = generator
assert(type(g) is not GradientSlice)
if grad_op:
if first_grad_op:
@@ -790,7 +793,7 @@
for generator in generators:
assert(type(generator) is SparseGradGenMeta)
- op_i, idx_i, op_v, idx_v, g = generator
+ op_i, idx_i, op_v, idx_v, g, _ = generator
if op_i:
out, cnt_i = self._DisambiguateGradOpOutput(op_i, idx_i, cnt_i)
indices_concat_input.append(out)
@@ -864,16 +867,14 @@
all_gradient_names = []
all_device_options = []
for g in generator:
+ if g.device_option:
+ all_device_options.append(g.device_option)
if type(g) is GradGenMeta:
if g.grad_op:
all_gradient_names.append(g.gradient)
- all_device_options.append(g.grad_op.device_option)
else:
assert(type(g) is SparseGradGenMeta)
- if g.grad_op_indices:
- all_device_options.append(g.grad_op_indices.device_option)
- if g.grad_op_values:
- all_device_options.append(g.grad_op_values.device_option)
+ if g.gradient.values:
all_gradient_names.append(g.gradient.values)
# Check if all grad op device options are the same.
@@ -935,7 +936,8 @@
# a ConstantFill operator. Autogeneration for sparse gradients is
# not supported
generator = GradGenMeta(
- autograd_op, 0 if autograd_op else None, str(grad))
+ autograd_op, 0 if autograd_op else None, str(grad),
+ autograd_op.device_option)
self.gradient_generators[str(y)][self.frontier[str(y)]].append(
generator)
diff --git a/caffe2/python/core_gradients_test.py b/caffe2/python/core_gradients_test.py
index b6e9817..a7b736e 100644
--- a/caffe2/python/core_gradients_test.py
+++ b/caffe2/python/core_gradients_test.py
@@ -915,6 +915,44 @@
except Exception as e:
self.assertTrue("schema" in str(e))
+ def testDeviceOptionsPropagation(self):
+ '''
+        Test verifies that aggregation operators in the backward pass are placed
+        on the same device as the parameter.
+ '''
+ device_0 = 'node:0'
+
+ # init_net.
+ init_net = core.Net("init_net")
+ with core.DeviceScope(0, node_name=device_0):
+ w = init_net.UniformFill([], 'w', shape=[10000, 64])
+ ids = init_net.GivenTensorFill(
+ [],
+ 'ids',
+ values=np.random.random_integers(low=0, high=10000, size=10),
+ )
+ ids_2 = init_net.GivenTensorFill(
+ [],
+ 'ids_2',
+ values=np.random.random_integers(low=0, high=10000, size=10),
+ )
+
+ # train_net.
+ train_net = core.Net("train_net")
+ with core.DeviceScope(0, node_name=device_0):
+ vals = train_net.Gather([w, ids], "gathered")
+ r_vals = train_net.ReduceSum([vals], 1, axes=0)
+
+ vals_2 = train_net.Gather([w, ids_2], "gathered_2")
+ r_vals_2 = train_net.ReduceSum([vals_2], 1, axes=0)
+
+ loss = train_net.Sum([r_vals, r_vals_2], 1)
+ train_net.AddGradientOperators([loss])
+ # All concat operators should be on device_0
+ for op in train_net.Proto().op:
+ if op.type == 'Concat':
+ self.assertEqual(op.device_option.node_name, device_0)
+
if __name__ == '__main__':
unittest.main()