Avoid 2 extra copies when reducing sparse tensors and fix result() vs inplace output discrepancy (#57822)

Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/57822

* `AsyncSparseAllreduceWork` can avoid copying output tensors, since all results are kept alive by writing into the input vector directly
* `AsyncSparseAllreduceWork` now returns the input tensors to the user instead of copies of them, as it did before. This is consistent with other operations and process group implementations
* `AsyncSparseAllreduceCUDAWork` now copies the CPU result directly into the input tensors, avoiding the extra copy chain `output` -> `outputs` -> `inputs`. The inputs are returned to the user, which is consistent with other operations and process group implementations.

Overall, `AsyncSparseAllreduceCUDAWork` now avoids 2 extra copies, since it reuses `AsyncSparseAllreduceWork`'s implementation. A user-level sketch of the resulting behavior follows below.
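
The following is a hedged, illustrative sketch (not part of the patch) of what the change means for callers. It assumes a Gloo process group initialized from environment variables, that `Work.result()` is exposed in Python, and made-up tensor shapes. After `wait()`, the reduced values are in the input tensor itself, and `result()` returns that same tensor rather than a copy.

```python
# Illustrative sketch only -- assumes MASTER_ADDR/MASTER_PORT/RANK/WORLD_SIZE
# are set in the environment and the Gloo backend (the one with sparse
# allreduce support) is used.
import torch
import torch.distributed as dist

dist.init_process_group(backend="gloo")

# A small sparse COO tensor standing in for a sparse gradient.
indices = torch.tensor([[0, 2]])
values = torch.tensor([[1.0, 2.0], [3.0, 4.0]])
t = torch.sparse_coo_tensor(indices, values, size=(4, 2))

work = dist.all_reduce(t, op=dist.ReduceOp.SUM, async_op=True)
work.wait()

# The input tensor now holds the allreduced values in place ...
print(t.to_dense())
# ... and result() hands back the input tensors rather than copies,
# which is the result() vs in-place discrepancy this PR fixes.
print(work.result()[0].to_dense())

dist.destroy_process_group()
```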

Test Plan: Imported from OSS

Reviewed By: mrshenli

Differential Revision: D28298325

Pulled By: agolynski

fbshipit-source-id: 18e2104413cdf5e73a01aad464e2613807779297
diff --git a/torch/lib/c10d/ProcessGroupGloo.cpp b/torch/lib/c10d/ProcessGroupGloo.cpp
index 2f8b9a3..4f63148 100644
--- a/torch/lib/c10d/ProcessGroupGloo.cpp
+++ b/torch/lib/c10d/ProcessGroupGloo.cpp
@@ -984,7 +984,6 @@
 
   std::shared_ptr<gloo::Context> context;
   std::vector<at::Tensor> inputs;
-  std::vector<at::Tensor> outputs;
   const uint32_t tag;
 
   // We share dimensionality about the sparse tensors before collecting
@@ -1118,22 +1117,13 @@
   void run() override {
     auto output = allreduce(inputs);
 
-    // Copy back to input tensors.
-    outputs.reserve(inputs.size());
-    for (auto& input : inputs) {
-      input.copy_(output);
-      if (output.is_sparse()) {
-        outputs.push_back(output.clone());
-      } else {
-        outputs.push_back(output.clone(at::MemoryFormat::Contiguous));
-      }
+    // This copy is needed when we run a multi-gpu version of reduce (multiple
+    // inputs per rank).
+    for (int i = 0; i < inputs.size(); ++i) {
+      inputs[i].copy_(output);
     }
   }
 
-  std::vector<at::Tensor> result() override {
-    return outputs;
-  }
-
  private:
   std::vector<SparseTensorMetadata> allgather_metadata(
       const at::Tensor& tensor) {
@@ -1339,7 +1329,7 @@
     at::cuda::OptionalCUDAStreamGuard stream_guard;
     for (size_t i = 0; i < inputs.size(); i++) {
       stream_guard.reset_stream(streams[i]);
-      outputs.push_back(output.to(inputs[i].device(), /*non_blocking=*/true));
+      inputs[i].copy_(output, /*non_blocking=*/true);
       events[i].record(streams[i]);
     }
   }
@@ -1351,12 +1341,6 @@
       guard.set_index(inputs[i].device().index());
       events[i].block(at::cuda::getCurrentCUDAStream());
     }
-
-    // Copy outputs back to inputs after synchronization, so that users can
-    // access all reduce results from input tensors
-    for (size_t i = 0; i < inputs.size(); i++) {
-      inputs[i].copy_(outputs[i]);
-    }
   }
 
   std::vector<at::Tensor> tmp;