Revert D24346771: [caffe2][memonger] Add support for distributed inference predict nets in DAG memonger
Test Plan: revert-hammer
Differential Revision: D24346771 (https://github.com/pytorch/pytorch/commit/5882f2e540a1d146e44cc2d6f85f6af559aff70f)
Original commit changeset: ad2dd2e63f3e
fbshipit-source-id: 90346f08c890eebe71f068748a8e24e4db88c250
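
For context, the reverted change extended the DAG memonger's inference entry point, which the removed tests below drive. A minimal sketch of that call, matching the signature used in the tests (net construction elided, blob names illustrative):

    from caffe2.python import core, memonger

    net = core.Net("example")
    # ... build ops on net under the "name_x" namescope ...

    # Share blobs for inference: traversal starts from the listed head
    # blobs, and only blobs under the given namescope are remapped.
    optim_proto = memonger.optimize_inference_for_dag(
        net, ["name_x/data"], "name_x"
    )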
diff --git a/caffe2/core/memonger.cc b/caffe2/core/memonger.cc
index 8b77b50..d0ea51b 100644
--- a/caffe2/core/memonger.cc
+++ b/caffe2/core/memonger.cc
@@ -199,13 +199,10 @@
}
// The main recursive call. Here we start a DFS in the operator graph
- // from the input blobs. Note that the input ordering does not indicate
- // operator graph ordering. To avoid traversing child operators first,
- // traversal begins from the root ops, and child ops are then visited
- // recursively.
+ // from the input blobs.
for (const auto& input_blob : heads) {
for (const int op_index : blob_to_ops_[input_blob]) {
- if (!op_visited_[op_index] && !op_inputs_[op_index]) {
+ if (!op_visited_[op_index]) {
vector<std::pair<int, string>> free_blobs;
std::unordered_set<int> tokens{tokens_counter_++};
process_op(
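
The guard removed above, !op_inputs_[op_index], kept the DFS from seeding at ops whose inputs are produced by other ops, so traversal always started from root ops. A rough Python sketch of that seeding rule, assuming hypothetical blob_to_ops and op_in_degree lookup tables:

    def dfs_seeds(heads, blob_to_ops, op_in_degree):
        # Yield only root ops: consumers of a head blob with no inputs
        # produced by other ops in the graph (in-degree zero).
        for blob in heads:
            for op_index in blob_to_ops[blob]:
                if op_in_degree[op_index] == 0:
                    yield op_index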
@@ -274,12 +271,6 @@
apply_recurrent_blob_assignments(optimized_net.mutable_op(i));
}
- // Special handling for AsyncIf ops, where internal nets can
- // refer to memongered blobs
- if (optimized_net.op(i).type() == "AsyncIf") {
- apply_asyncif_blob_assignments(optimized_net.mutable_op(i));
- }
-
for (int j = 0; j < optimized_net.op(i).input_size(); ++j) {
const string& input_name =
get_blob_or_mapped_blob(optimized_net.op(i).input(j));
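
This hunk drops the dispatch that routed AsyncIf ops into apply_asyncif_blob_assignments; the next hunk removes the handler itself, and a Python sketch of its remapping logic follows that hunk.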
@@ -339,39 +330,6 @@
}
}
- void apply_asyncif_blob_assignments(OperatorDef* op) {
- for (int i = 0; i < op->arg_size(); i++) {
- Argument* arg = op->mutable_arg(i);
- const string& name = arg->name();
- if (name == "then_net" || name == "else_net") {
- NetDef* step_net_ref = arg->mutable_n();
- NetDef optimized_net = apply_assignments(*step_net_ref);
-
- // update the external input and output mappings as well
- // for this internal net
- std::vector<string> optim_external_inputs;
- for (auto& blob_name : optimized_net.external_input()) {
- optim_external_inputs.push_back(get_blob_or_mapped_blob(blob_name));
- }
- optimized_net.mutable_external_input()->Clear();
- for (const auto& blob_name : optim_external_inputs) {
- optimized_net.add_external_input(blob_name);
- }
-
- std::vector<string> optim_external_outputs;
- for (auto& blob_name : optimized_net.external_output()) {
- optim_external_outputs.push_back(get_blob_or_mapped_blob(blob_name));
- }
- optimized_net.mutable_external_output()->Clear();
- for (const auto& blob_name : optim_external_outputs) {
- optimized_net.add_external_output(blob_name);
- }
-
- step_net_ref->CopyFrom(optimized_net);
- }
- }
- }
-
template <typename K, typename V>
inline bool has_key(const std::unordered_map<K, V>& in_map, const K& key) {
return in_map.find(key) != in_map.end();
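
The deleted apply_asyncif_blob_assignments re-ran the assignment pass on the then_net/else_net NetDef arguments of an AsyncIf op and rewrote each internal net's external interface through the same blob-sharing map. A rough Python equivalent, where get_mapped and apply_assignments are stand-ins for the C++ get_blob_or_mapped_blob and apply_assignments:

    def remap_asyncif_nets(op, get_mapped, apply_assignments):
        # op is a caffe2_pb2.OperatorDef of type "AsyncIf"; its internal
        # nets live in the "then_net"/"else_net" NetDef arguments.
        for arg in op.arg:
            if arg.name in ("then_net", "else_net"):
                optimized = apply_assignments(arg.n)
                # Remap the internal net's external inputs and outputs
                # through the shared-blob mapping as well.
                for field in (optimized.external_input,
                              optimized.external_output):
                    remapped = [get_mapped(name) for name in field]
                    del field[:]
                    field.extend(remapped)
                arg.n.CopyFrom(optimized)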
diff --git a/caffe2/python/memonger_test.py b/caffe2/python/memonger_test.py
index 68d1eb2..8584e8d 100644
--- a/caffe2/python/memonger_test.py
+++ b/caffe2/python/memonger_test.py
@@ -1,6 +1,11 @@
+
+
+
+
+
import numpy as np
-from caffe2.python import workspace, memonger, core, model_helper, brew, dyndep
+from caffe2.python import workspace, memonger, core, model_helper, brew
from caffe2.proto import caffe2_pb2
import caffe2.python.hypothesis_test_util as hu
from future.utils import viewvalues
@@ -451,179 +456,6 @@
np.testing.assert_almost_equal(loss1, optimized_loss1)
np.testing.assert_almost_equal(loss2, optimized_loss2)
- # This test reproduces a scenario where DAG traversal for finding
- # shared blobs did not always start from ops with an in-degree of 0
- @settings(deadline=10000)
- def test_forward_optim_tree_dag_traversal(self):
- input_dim = 4
- output_dim = 4
- batch_size = 4
-
- m = model_helper.ModelHelper()
- m.Proto().type = "dag"
- m.Proto().num_workers = 4
-
- with core.NameScope("name_x"):
- fc1 = brew.fc(m, "data", "fc1", dim_in=input_dim, dim_out=output_dim)
- fc2 = brew.fc(m, fc1, "fc2", dim_in=output_dim, dim_out=output_dim)
-
- fc3 = brew.fc(m, fc2, "fc3", dim_in=output_dim, dim_out=output_dim)
- fc4 = brew.fc(m, fc3, "fc4", dim_in=output_dim, dim_out=output_dim)
- fc5 = brew.fc(m, fc4, "fc5", dim_in=output_dim, dim_out=output_dim)
-
- # Branch
- fc3b = brew.fc(m, fc2, "fc3b", dim_in=output_dim, dim_out=output_dim)
- fc4b = brew.fc(m, fc3b, "fc4b", dim_in=output_dim, dim_out=output_dim)
- fc5b = brew.fc(m, fc4b, "fc5b", dim_in=output_dim, dim_out=output_dim)
-
- fc5sum = brew.sum(m, [fc5, fc5b], "fc5sum")
-
- fc5.Relu([], fc5sum) \
- .Softmax([], "pred1") \
- .LabelCrossEntropy(["label"], ["xent1"]) \
- .AveragedLoss([], "loss1")
- fc6 = brew.fc(m, fc5, "fc6", dim_in=output_dim, dim_out=output_dim)
- fc6.Relu([], fc6) \
- .Softmax([], "pred2") \
- .LabelCrossEntropy(["label"], ["xent2"]) \
- .AveragedLoss([], "loss2")
-
- blobs_before = count_blobs(m.net.Proto())
- # adding name_x/fc5_w as a head (it belongs to a non-root op)
- # to make sure that DAG traversal always starts from root ops
- optim_proto = memonger.optimize_inference_for_dag(
- m.net, ["name_x/fc5_w", "name_x/data"], "name_x"
- )
- blobs_after = count_blobs(optim_proto)
- self.assertLess(blobs_after, blobs_before)
-
- @settings(deadline=10000)
- def test_forward_optim_tree_asyncif_op(self):
- dyndep.InitOpsLibrary("//caffe2/caffe2/fb/operators:async_if_op")
- input_dim = 4
- output_dim = 4
- batch_size = 4
- has_elements_blob = "name_x/has_element_output"
- m = model_helper.ModelHelper()
- m.Proto().type = "async_scheduling"
- m.Proto().num_workers = 4
-
- with core.NameScope("name_x"):
- fc1 = brew.fc(m, "data", "fc1", dim_in=input_dim, dim_out=output_dim)
- fc2 = brew.fc(m, fc1, "fc2", dim_in=output_dim, dim_out=output_dim)
-
- fc3 = brew.fc(m, fc2, "fc3", dim_in=output_dim, dim_out=output_dim)
- fc4 = brew.fc(m, fc3, "fc4", dim_in=output_dim, dim_out=output_dim)
- fc5 = brew.fc(m, fc4, "fc5", dim_in=output_dim, dim_out=output_dim)
-
- # Branch
- fc3b = brew.fc(m, fc2, "fc3b", dim_in=output_dim, dim_out=output_dim)
- fc4b = brew.fc(m, fc3b, "fc4b", dim_in=output_dim, dim_out=output_dim)
- fc5b = brew.fc(m, fc4b, "fc5b", dim_in=output_dim, dim_out=output_dim)
-
- # Replace fc sum with async if op
- m.net.Proto().op.extend(self.creat_async_if_op(
- has_elements_blob,
- ["fc5", "fc5b"],
- "fc5sum",
- "name_x"))
-
- fc5.Relu([], "fc5sum") \
- .Softmax([], "pred1") \
- .LabelCrossEntropy(["label"], ["xent1"]) \
- .AveragedLoss([], "loss1")
- fc6 = brew.fc(m, fc5, "fc6", dim_in=output_dim, dim_out=output_dim)
- fc6.Relu([], fc6) \
- .Softmax([], "pred2") \
- .LabelCrossEntropy(["label"], ["xent2"]) \
- .AveragedLoss([], "loss2")
-
- m.net.Proto().external_input.extend([has_elements_blob])
-
- blobs_before = count_blobs(m.net.Proto())
- optim_proto = memonger.optimize_inference_for_dag(
- m.net, ["name_x/data"], "name_x"
- )
- blobs_after = count_blobs(optim_proto)
- self.assertLess(blobs_after, blobs_before)
-
- # get the AsyncIf op and make sure its inputs and outputs are shared; verify the internal nets as well
- for op in optim_proto.op:
- if op.type == "AsyncIf":
- for blob in op.input:
- if has_elements_blob not in blob:
- self.assertIn("__m", blob, "Expected shared blob: " + blob)
- for blob in op.output:
- self.assertIn("__m", blob, "Expected shared blob: " + blob)
- for arg in op.arg:
- if arg.name in ["then_net", "else_net"]:
- for blob in arg.n.external_input:
- self.assertIn("__m", blob, "Expected shared blob: " + blob)
- for blob in arg.n.external_output:
- self.assertIn("__m", blob, "Expected shared blob: " + blob)
- for blob in arg.n.op[0].input:
- self.assertIn("__m", blob, "Expected shared blob: " + blob)
- for blob in arg.n.op[0].output:
- self.assertIn("__m", blob, "Expected shared blob: " + blob)
-
- # Test networks produce exactly same results
- data = np.random.randn(batch_size, input_dim).astype(np.float32)
- label = np.random.randint(
- low=0, high=output_dim, size=(batch_size,)).astype(np.int32)
- workspace.RunNetOnce(m.param_init_net)
- workspace.FeedBlob("name_x/data", data)
- workspace.FeedBlob("name_x/label", label)
-
- # check for then_net
- workspace.FeedBlob(has_elements_blob, np.array(True, dtype='bool'))
- workspace.RunNetOnce(m.net)
- loss1 = workspace.FetchBlob("name_x/loss1")
- loss2 = workspace.FetchBlob("name_x/loss2")
- workspace.RunNetOnce(optim_proto)
- optimized_loss1 = workspace.FetchBlob("name_x/loss1")
- optimized_loss2 = workspace.FetchBlob("name_x/loss2")
- np.testing.assert_almost_equal(loss1, optimized_loss1)
- np.testing.assert_almost_equal(loss2, optimized_loss2)
-
- # check for else_net
- workspace.FeedBlob(has_elements_blob, np.array(False, dtype='bool'))
- workspace.RunNetOnce(m.net)
- loss1 = workspace.FetchBlob("name_x/loss1")
- loss2 = workspace.FetchBlob("name_x/loss2")
- workspace.RunNetOnce(optim_proto)
- optimized_loss1 = workspace.FetchBlob("name_x/loss1")
- optimized_loss2 = workspace.FetchBlob("name_x/loss2")
- np.testing.assert_almost_equal(loss1, optimized_loss1)
- np.testing.assert_almost_equal(loss2, optimized_loss2)
-
- def creat_async_if_op(self, has_elements_output, input_blobs, output_blob, namescope):
- then_net = core.Net("then_net")
- then_net.Proto().type = "async_scheduling"
- then_net.Sum(input_blobs, output_blob)
-
- else_net = core.Net("else_net")
- else_net.Proto().type = "async_scheduling"
- else_net.Sum(input_blobs, output_blob)
-
- then_net.Proto().external_output.extend([namescope + "/" + output_blob])
- else_net.Proto().external_output.extend([namescope + "/" + output_blob])
-
- async_if = caffe2_pb2.OperatorDef()
- async_if.type = "AsyncIf"
- then_net_arg = async_if.arg.add()
- then_net_arg.name = "then_net"
- then_net_arg.n.CopyFrom(then_net.Proto())
-
- else_net_arg = async_if.arg.add()
- else_net_arg.name = "else_net"
- else_net_arg.n.CopyFrom(else_net.Proto())
-
- input_blobs_with_namescope = [namescope + "/" + input_blob for input_blob in input_blobs]
- async_if.input.extend([has_elements_output])
- async_if.input.extend(input_blobs_with_namescope)
- async_if.output.extend([namescope + "/" + output_blob])
- return [async_if]
-
def test_rnn(self):
from caffe2.python import rnn_cell
T = 5