allow querying tensor device + tool to validate that all ops get tensors from the correct devices (GPUs)

Summary:
A quite common and hard-to-debug performance bug in multi-GPU training is an operator being passed tensors that reside on a different GPU than the one the op runs on. Since we have peer access enabled, this works, but it is much slower. With the data parallel model this problem rarely arises, since DPM does static analysis of the operators, but it can happen if someone bypasses DPM or uses FeedBlob with incorrect device options.
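
A minimal standalone CUDA sketch of the failure mode (the scale kernel,
grid/block, and N are hypothetical, for illustration only):

  float* buf;
  cudaSetDevice(1);                   // allocate on GPU 1
  cudaMalloc(&buf, N * sizeof(float));

  cudaSetDevice(0);                   // ...but launch on GPU 0
  cudaDeviceEnablePeerAccess(1, 0);
  // With peer access enabled, the kernel silently reads buf over the
  // interconnect instead of local device memory: results are correct,
  // just much slower, and nothing flags the mismatch.
  scale<<<grid, block>>>(buf, N);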

To make debugging easier, I added a device field to the tensor info call that reports which device allocated the tensor's memory. In addition, I added a function that goes through an operator's inputs and outputs and compares each tensor's device to the operator's device. This check runs after the first iteration, and only with prof_dag.
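
With this diff the allocation device can be queried through the per-type
info registry. A sketch using the names introduced below (blob name
hypothetical, error handling omitted):

  Blob* b = ws->GetBlob("fc1_w");
  TensorInfoCall info = GetTensorInfoFunction(b->meta().id());
  if (info != nullptr) {
    bool shares_data;
    size_t capacity;
    DeviceOption device;
    info(b->GetRaw(), &shares_data, &capacity, &device);
    // For CUDA tensors, device.cuda_gpu_id() is derived from the pointer
    // via GetGPUIDForPointer(), so it reflects where the memory actually
    // lives rather than what some op def claimed.
  }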

Also renamed ShapeCall to TensorInfoCall, since it now returns more information than just the shape.
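
Old vs. new signature (as in tensor.h below):

  typedef vector<TIndex> (*ShapeCall)(void*, bool& shares_data, size_t& capacity);
  typedef vector<TIndex> (*TensorInfoCall)(
      void*, bool* shares_data, size_t* capacity, DeviceOption* device);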

I think this is a pretty safe diff, but do you find it problematic to add a new field to the tensor?

Reviewed By: dzhulgakov

Differential Revision: D5335505

fbshipit-source-id: 511b6c122dff9a205f43951984868ffd40f7ac30
diff --git a/caffe2/contrib/prof/prof_dag_net.cc b/caffe2/contrib/prof/prof_dag_net.cc
index dc35cfc..c83b39e 100644
--- a/caffe2/contrib/prof/prof_dag_net.cc
+++ b/caffe2/contrib/prof/prof_dag_net.cc
@@ -21,12 +21,33 @@
   PrintStats();
 }
 
+void ProfDAGNet::ValidateOpTensorDevices() {
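+  // For each node, compare the op's device option against the device that
+  // allocated each of its input/output tensors, and log any mismatch.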
+  bool had_mismatches = false;
+  for (size_t idx = 0; idx < operator_nodes_.size(); idx++) {
+    const auto& node = operator_nodes_[idx];
+    auto mismatches = ValidateTensorDevices(*node.operator_);
+    for (auto& mismatch : mismatches) {
+      had_mismatches = true;
+      LOG(INFO) << "== PERFORMANCE WARNING == \n"
+                << " Operator " << node.operator_->def().type()
+                << " expects GPU " << mismatch.second.first.cuda_gpu_id()
+                << " but tensor [" << mismatch.first << "] is on GPU "
+                << mismatch.second.second.cuda_gpu_id();
+    }
+  }
+  if (!had_mismatches) {
+    LOG(INFO) << "Analyzed operator & blob GPU assignments -- no mismatches";
+  }
+}
+
 bool ProfDAGNet::Run() {
   runs_++;
 
-    // don't collect statistics from first run
+    // don't collect statistics from the first run; instead, use it to
+    // validate tensor <-> op device placement
   if (runs_ <= 1) {
-    return DAGNetBase::Run();
+    bool success = DAGNetBase::Run();
+    ValidateOpTensorDevices();
+    return success;
   }
 
   CAFFE_ENFORCE(
diff --git a/caffe2/contrib/prof/prof_dag_net.h b/caffe2/contrib/prof/prof_dag_net.h
index be1a576..4b4ea2e 100644
--- a/caffe2/contrib/prof/prof_dag_net.h
+++ b/caffe2/contrib/prof/prof_dag_net.h
@@ -27,6 +27,7 @@
  protected:
   bool RunAt(const std::vector<int>& chain) override;
   void PrintStats();
+  void ValidateOpTensorDevices();
   ProfDAGProto ProtoMsg(std::pair<std::string, Stats> op_stat) const;
   std::vector<Stats> time_per_op_;
   CaffeMap<std::string, Stats> time_per_op_type_;
diff --git a/caffe2/core/context_gpu.cu b/caffe2/core/context_gpu.cu
index b14638d..231664d 100644
--- a/caffe2/core/context_gpu.cu
+++ b/caffe2/core/context_gpu.cu
@@ -100,6 +100,19 @@
   return g_cuda_memory_pool_type;
 }
 
+vector<TIndex> GetCUDATensorInfo(
+    void* c,
+    bool* shares_data,
+    size_t* capacity,
+    DeviceOption* device) {
+  vector<TIndex> dims =
+      GetTensorInfo<CUDAContext>(c, shares_data, capacity, device);
+  Tensor<CUDAContext>* tc = static_cast<Tensor<CUDAContext>*>(c);
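+  // GetTensorInfo() filled in CPU defaults; override them with the GPU that
+  // actually owns this allocation.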
+  device->set_device_type(CUDA);
+  device->set_cuda_gpu_id(GetGPUIDForPointer(tc->raw_data()));
+  return dims;
+}
+
 ///////////////////////////////////////////////////////////////////////////////
 // A wrapper to allow us to lazily initialize all cuda environments that Caffe
 // uses. This gets done the first time a caffe2::CUDAContext::New() gets called
@@ -167,10 +180,8 @@
     GetTensorType<CUDAContext>
   );
 
-  RegisterShapeCallFunction(
-    TypeMeta::Id<Tensor<CUDAContext>>(),
-    GetTensorShape<CUDAContext>
-  );
+  RegisterTensorInfoFunction(
+      TypeMeta::Id<Tensor<CUDAContext>>(), GetCUDATensorInfo);
 
   // Check the versions of cuDNN that were compiled and linked with are compatible
   CheckCuDNNVersions();
diff --git a/caffe2/core/operator.cc b/caffe2/core/operator.cc
index d8d58ad..d39ed93 100644
--- a/caffe2/core/operator.cc
+++ b/caffe2/core/operator.cc
@@ -348,16 +348,18 @@
   for (const auto& s : ws_blobs) {
     Blob* b = ws->GetBlob(s);
     TypeCall type_fun = GetTypeCallFunction(b->meta().id());
-    ShapeCall shape_fun = GetShapeCallFunction(b->meta().id());
+    TensorInfoCall tensor_info_fun = GetTensorInfoFunction(b->meta().id());
     TensorShape tp;
 
     if (type_fun) {
         tp.set_data_type(TypeMetaToDataType(type_fun(b->GetRaw())));
     }
-    if (shape_fun) {
+    if (tensor_info_fun) {
       bool _shares_data;
       size_t _capacity;
-      auto shape = shape_fun(b->GetRaw(), _shares_data, _capacity);
+      DeviceOption _device;
+      auto shape =
+          tensor_info_fun(b->GetRaw(), &_shares_data, &_capacity, &_device);
       for (auto d : shape) {
         tp.add_dims(d);
       }
@@ -385,4 +387,46 @@
   return InferBlobShapesAndTypes(blob_desc, nets);
 }
 
+std::map<string, std::pair<DeviceOption, DeviceOption>> ValidateTensorDevices(
+    OperatorBase& op) {
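+  // Returned map: blob name -> (device the op expects, device on which the
+  // blob was actually allocated).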
+  std::map<string, std::pair<DeviceOption, DeviceOption>> mismatches;
+  DeviceOption op_device = op.def().device_option();
+
+  // Check from op schema if this op is used for crossing devices
+  auto op_schema = OpSchemaRegistry::Schema(op.def().type());
+  if (op_schema != nullptr && op_schema->inputs_can_cross_devices()) {
+    return mismatches;
+  }
+
+  auto Check = [&](const Blob& blob, const std::string& blob_name) {
+    TensorInfoCall tensor_info_fun = GetTensorInfoFunction(blob.meta().id());
+    if (tensor_info_fun) {
+      bool _shares_data;
+      size_t _capacity;
+      DeviceOption blob_device;
+      tensor_info_fun(
+          const_cast<Blob&>(blob).GetRaw(),
+          &_shares_data,
+          &_capacity,
+          &blob_device);
+
+      if (blob_device.device_type() == CUDA &&
+          blob_device.cuda_gpu_id() != op_device.cuda_gpu_id()) {
+        mismatches[blob_name] = std::make_pair(op_device, blob_device);
+      }
+    }
+  };
+
+  // Check that inputs and outputs are on the same device as the op
+  for (int i = 0; i < op.InputSize(); i++) {
+    Check(op.InputBlob(i), op.def().input(i));
+  }
+  for (int i = 0; i < op.OutputSize(); i++) {
+    Check(*op.OutputBlob(i), op.def().output(i));
+  }
+  return mismatches;
+}
+
 }  // namespace caffe2
diff --git a/caffe2/core/operator.h b/caffe2/core/operator.h
index 6afce26..d48c8d0 100644
--- a/caffe2/core/operator.h
+++ b/caffe2/core/operator.h
@@ -584,6 +584,9 @@
     const CaffeMap<std::string, std::vector<TIndex>>& blob_dimensions,
     const vector<std::unique_ptr<NetDef>>& nets);
 
+std::map<string, std::pair<DeviceOption, DeviceOption>> ValidateTensorDevices(
+    OperatorBase& op);
+
 }  // namespace caffe2
 
 #endif  // CAFFE2_CORE_OPERATOR_H_
diff --git a/caffe2/core/operator_schema.cc b/caffe2/core/operator_schema.cc
index b66ace1..4adaa30 100644
--- a/caffe2/core/operator_schema.cc
+++ b/caffe2/core/operator_schema.cc
@@ -171,6 +171,11 @@
   return *this;
 }
 
+OpSchema& OpSchema::InputsCanCrossDevices() {
+  inputs_can_cross_devices_ = true;
+  return *this;
+}
+
 OpSchema& OpSchema::TensorInferenceFunction(
     TensorInferenceFunctionType function) {
   tensor_inference_function_ = function;
diff --git a/caffe2/core/operator_schema.h b/caffe2/core/operator_schema.h
index 00b7440..bd4908f 100644
--- a/caffe2/core/operator_schema.h
+++ b/caffe2/core/operator_schema.h
@@ -199,6 +199,9 @@
   // Remove from documentation
   OpSchema& Private();
 
+  // Marks an op whose inputs may legitimately live on different devices
+  // than the op itself (e.g. Copy*, collective ops); ValidateTensorDevices()
+  // skips such ops.
+  OpSchema& InputsCanCrossDevices();
+
   /**
    * @brief A function to allow one to get the number of outputs based on the
    * number of inputs, if this schema supports it.
@@ -219,6 +222,9 @@
   bool private_op() {
     return private_;
   }
+  bool inputs_can_cross_devices() const {
+    return inputs_can_cross_devices_;
+  }
 
   /**
    * @brief Returns the required device location of inputs and outputs.
@@ -256,6 +262,7 @@
   int min_output_ = 0;
   int max_output_ = std::numeric_limits<int>::max();
   bool private_ = false;
+  bool inputs_can_cross_devices_ = false;
   std::function<bool(int)> num_inputs_allowed_
       = [](int) { return true; };
   std::function<bool(int)> num_outputs_allowed_
diff --git a/caffe2/core/tensor.cc b/caffe2/core/tensor.cc
index 4889709..b973fbf 100644
--- a/caffe2/core/tensor.cc
+++ b/caffe2/core/tensor.cc
@@ -72,20 +72,19 @@
   type_call_registry_[id] = c;
 }
 
-static CaffeMap<CaffeTypeId, ShapeCall> shape_call_registry_ {
-  {TypeMeta::Id<Tensor<CPUContext>>(), GetTensorShape<Tensor<CPUContext>>}
-};
+static CaffeMap<CaffeTypeId, TensorInfoCall> tensor_info_call_registry_{
+    {TypeMeta::Id<Tensor<CPUContext>>(), GetTensorInfo<CPUContext>}};
 
-ShapeCall GetShapeCallFunction(CaffeTypeId id) {
-  auto f = shape_call_registry_.find(id);
-  if (f == shape_call_registry_.end()) {
+TensorInfoCall GetTensorInfoFunction(CaffeTypeId id) {
+  auto f = tensor_info_call_registry_.find(id);
+  if (f == tensor_info_call_registry_.end()) {
     return nullptr;
   }
   return f->second;
 }
 
-void RegisterShapeCallFunction(CaffeTypeId id, ShapeCall c) {
-  shape_call_registry_[id] = c;
+void RegisterTensorInfoFunction(CaffeTypeId id, TensorInfoCall c) {
+  tensor_info_call_registry_[id] = c;
 }
 
 namespace {
diff --git a/caffe2/core/tensor.h b/caffe2/core/tensor.h
index a952630..4fa9ba9 100644
--- a/caffe2/core/tensor.h
+++ b/caffe2/core/tensor.h
@@ -733,15 +733,25 @@
 }
 
-// Shape call registry
+// Tensor info call registry
-typedef vector<TIndex> (*ShapeCall)(void*, bool& shares_data, size_t& capacity);
-ShapeCall GetShapeCallFunction(CaffeTypeId id);
-void RegisterShapeCallFunction(CaffeTypeId id, ShapeCall c);
+typedef vector<TIndex> (*TensorInfoCall)(
+    void*,
+    bool* shares_data,
+    size_t* capacity,
+    DeviceOption* device);
+TensorInfoCall GetTensorInfoFunction(CaffeTypeId id);
+void RegisterTensorInfoFunction(CaffeTypeId id, TensorInfoCall c);
 
 template <class Context>
-vector<TIndex> GetTensorShape(void* c, bool& shares_data, size_t& capacity) {
+vector<TIndex> GetTensorInfo(
+    void* c,
+    bool* shares_data,
+    size_t* capacity,
+    DeviceOption* device) {
   Tensor<Context>* tc = static_cast<Tensor<Context>*>(c);
-  shares_data = tc->shares_data();
-  capacity = tc->capacity_nbytes();
+  *shares_data = tc->shares_data();
+  *capacity = tc->capacity_nbytes();
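+  // Generic (CPU) defaults; the CUDA build registers GetCUDATensorInfo,
+  // which overrides these with the owning GPU.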
+  device->set_device_type(CPU);
+  device->set_cuda_gpu_id(0);
   return tc->dims();
 }
 
diff --git a/caffe2/core/workspace.cc b/caffe2/core/workspace.cc
index ee196d4..2f99709 100644
--- a/caffe2/core/workspace.cc
+++ b/caffe2/core/workspace.cc
@@ -37,11 +37,12 @@
   vector<std::pair<size_t, std::string>> blob_sizes;
   for (const auto& s : blobs) {
     Blob* b = this->GetBlob(s);
-    ShapeCall shape_fun = GetShapeCallFunction(b->meta().id());
+    TensorInfoCall shape_fun = GetTensorInfoFunction(b->meta().id());
     if (shape_fun) {
       bool shares_data = false;
       size_t capacity;
-      auto shape = shape_fun(b->GetRaw(), shares_data, capacity);
+      DeviceOption _device;
+      auto shape = shape_fun(b->GetRaw(), &shares_data, &capacity, &_device);
       if (shares_data) {
         // Blobs sharing data do not actually take any memory
         capacity = 0;
@@ -63,11 +64,13 @@
   LOG(INFO) << "name;current shape;capacity bytes;percentage";
   for (const auto& sb : blob_sizes) {
     Blob* b = this->GetBlob(sb.second);
-    ShapeCall shape_fun = GetShapeCallFunction(b->meta().id());
+    TensorInfoCall shape_fun = GetTensorInfoFunction(b->meta().id());
     CHECK(shape_fun != nullptr);
     bool _shares_data = false;
     size_t capacity;
-    auto shape = shape_fun(b->GetRaw(), _shares_data, capacity);
+    DeviceOption _device;
+    auto shape = shape_fun(b->GetRaw(), &_shares_data, &capacity, &_device);
     std::stringstream ss;
     ss << sb.second << ";";
     for (const auto d : shape) {
diff --git a/caffe2/operators/communicator_op.cc b/caffe2/operators/communicator_op.cc
index 37a4606..878ea29 100644
--- a/caffe2/operators/communicator_op.cc
+++ b/caffe2/operators/communicator_op.cc
@@ -19,6 +19,7 @@
       return in >= 2 && out == (in - 1);
     })
     .EnforceInplace([](int in, int out) { return (in - 1) == out; })
+    .InputsCanCrossDevices()
     .IdenticalTypeAndShapeOfInput(0)
     .SetDoc(R"DOC(
 Does a broadcast operation from the root node to every other node. The tensor
@@ -32,6 +33,7 @@
 OPERATOR_SCHEMA(Reduce)
     .NumInputs(2)
     .NumOutputs(1)
+    .InputsCanCrossDevices()
     .IdenticalTypeAndShapeOfInput(0)
     .SetDoc(R"DOC(
 Does a reduce operation from every node to the root node. Currently only
@@ -48,6 +50,7 @@
     })
     .EnforceInplace([](int in, int out) { return (in - 1) == out; })
     .IdenticalTypeAndShapeOfInput(0)
+    .InputsCanCrossDevices()
     .SetDoc(R"DOC(
 Does an allreduce operation among the nodes. Currently only Sum is supported.
 )DOC")
@@ -58,6 +61,7 @@
 OPERATOR_SCHEMA(Allgather)
     .NumInputs(2)
     .NumOutputs(1)
+    .InputsCanCrossDevices()
     .SetDoc(R"DOC(
 Does an allgather operation among the nodes.
 )DOC")
diff --git a/caffe2/operators/utility_ops.cc b/caffe2/operators/utility_ops.cc
index 6bcd498..88b789b 100644
--- a/caffe2/operators/utility_ops.cc
+++ b/caffe2/operators/utility_ops.cc
@@ -219,19 +219,21 @@
 OPERATOR_SCHEMA(SumInt)
     .NumInputs(1, INT_MAX)
     .NumOutputs(1)
-    .TensorInferenceFunction(
-        [](const OperatorDef& def, const vector<TensorShape>& in) {
-          vector<TensorShape> out(1);
-          out.push_back(in[0]);
-          out[0].set_data_type(TensorProto::INT32);
-          return out;
-        })
+    .InputsCanCrossDevices()
+    .TensorInferenceFunction([](const OperatorDef& def,
+                                const vector<TensorShape>& in) {
+      vector<TensorShape> out(1);
+      out[0] = in[0];
+      out[0].set_data_type(TensorProto::INT32);
+      return out;
+    })
     .AllowInplace({{0, 0}});
 
 OPERATOR_SCHEMA(Sum)
     .NumInputs(1, INT_MAX)
     .NumOutputs(1)
     .AllowInplace({{0, 0}})
+    .InputsCanCrossDevices()
     .IdenticalTypeAndShapeOfInput(0)
     .SetDoc(R"DOC(
 Element-wise sum of each of the input tensors. The first input tensor can be
@@ -352,6 +354,7 @@
     .NumInputs(1)
     .NumOutputs(1)
     .IdenticalTypeAndShape()
+    .InputsCanCrossDevices()
     .SetDoc("Copy input tensor into output, potentially across devices.")
     .Input(0, "input", "The input tensor.")
     .Output(0, "output", "Tensor that will contain a copy of the input.");
@@ -360,6 +363,7 @@
     .NumInputs(1)
     .NumOutputs(1)
     .IdenticalTypeAndShape()
+    .InputsCanCrossDevices()
     .DeviceInferenceFunction([](const OperatorDef& def) {
       CAFFE_ENFORCE(
           def.has_device_option(),
@@ -380,6 +384,7 @@
     .NumInputs(1)
     .NumOutputs(1)
     .IdenticalTypeAndShape()
+    .InputsCanCrossDevices()
     .DeviceInferenceFunction([](const OperatorDef& def) {
       CAFFE_ENFORCE(
           def.has_device_option(),
@@ -400,6 +405,7 @@
     .NumInputs(1)
     .NumOutputs(1)
     .IdenticalTypeAndShape()
+    .InputsCanCrossDevices()
     .DeviceInferenceFunction([](const OperatorDef& def) {
       auto op_device =
           def.has_device_option() ? def.device_option() : DeviceOption();
@@ -419,6 +425,7 @@
     .NumInputs(1)
     .NumOutputs(1)
     .IdenticalTypeAndShape()
+    .InputsCanCrossDevices()
     .DeviceInferenceFunction([](const OperatorDef& def) {
       auto op_device =
           def.has_device_option() ? def.device_option() : DeviceOption();