Avoid 2 extra copies when reducing sparse tensors and fix result() vs in-place output discrepancy (#57822)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/57822
* `AsyncSparseAllreduceWork` no longer clones the output tensors into a separate `outputs` vector; the results are kept alive by writing them directly into the input tensors.
* `AsyncSparseAllreduceWork::result()` now returns the input tensors to the user instead of copies of them. This is consistent with other operations and process group implementations.
* `AsyncSparseAllreduceCUDAWork` now copies the reduced CPU tensor directly into the CUDA input tensors, avoiding the extra `output` -> `outputs` -> `inputs` copies. The inputs are returned to the user, consistent with other operations and process group implementations.
Overall, `AsyncSparseAllreduceCUDAWork` now avoids 2 extra copies, since it reuses `AsyncSparseAllreduceWork`'s implementation. A minimal sketch of the new data flow is shown below.
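To make the change concrete, here is a minimal standalone sketch (not the actual c10d code): `finishSparseAllreduce` is a hypothetical helper that condenses what `run()` and `result()` now do together, i.e. copy the reduced `output` into each input in place and hand the inputs back to the caller.

```cpp
// Illustrative sketch only; `finishSparseAllreduce` is a made-up helper and
// not part of c10d.
#include <torch/torch.h>
#include <iostream>
#include <vector>

std::vector<at::Tensor> finishSparseAllreduce(
    std::vector<at::Tensor>& inputs,
    const at::Tensor& output) {
  // Write the reduced result into every input tensor in place. No separate
  // `outputs` vector and no clone() are needed: the inputs themselves keep
  // the result alive for the caller.
  for (size_t i = 0; i < inputs.size(); ++i) {
    inputs[i].copy_(output);
  }
  // result() now hands the (updated) inputs back to the user, matching other
  // collectives and process group implementations.
  return inputs;
}

int main() {
  // A toy "already reduced" sparse tensor standing in for allreduce(inputs).
  auto indices = torch::tensor({{0, 1}, {0, 1}}, torch::kLong);
  auto values = torch::tensor({3.0f, 4.0f});
  auto reduced = torch::sparse_coo_tensor(indices, values, {2, 2});

  std::vector<at::Tensor> inputs = {torch::zeros({2, 2}).to_sparse()};
  auto results = finishSparseAllreduce(inputs, reduced);

  // The returned tensors are the inputs themselves, now holding the result.
  std::cout << results[0].to_dense() << std::endl;
  return 0;
}
```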
Test Plan: Imported from OSS
Reviewed By: mrshenli
Differential Revision: D28298325
Pulled By: agolynski
fbshipit-source-id: 18e2104413cdf5e73a01aad464e2613807779297
diff --git a/torch/lib/c10d/ProcessGroupGloo.cpp b/torch/lib/c10d/ProcessGroupGloo.cpp
index 2f8b9a3..4f63148 100644
--- a/torch/lib/c10d/ProcessGroupGloo.cpp
+++ b/torch/lib/c10d/ProcessGroupGloo.cpp
@@ -984,7 +984,6 @@
std::shared_ptr<gloo::Context> context;
std::vector<at::Tensor> inputs;
- std::vector<at::Tensor> outputs;
const uint32_t tag;
// We share dimensionality about the sparse tensors before collecting
@@ -1118,22 +1117,13 @@
void run() override {
auto output = allreduce(inputs);
- // Copy back to input tensors.
- outputs.reserve(inputs.size());
- for (auto& input : inputs) {
- input.copy_(output);
- if (output.is_sparse()) {
- outputs.push_back(output.clone());
- } else {
- outputs.push_back(output.clone(at::MemoryFormat::Contiguous));
- }
+ // This copy is needed when we run a multi-gpu version of reduce (multiple
+ // inputs per rank).
+ for (int i = 0; i < inputs.size(); ++i) {
+ inputs[i].copy_(output);
}
}
- std::vector<at::Tensor> result() override {
- return outputs;
- }
-
private:
std::vector<SparseTensorMetadata> allgather_metadata(
const at::Tensor& tensor) {
@@ -1339,7 +1329,7 @@
at::cuda::OptionalCUDAStreamGuard stream_guard;
for (size_t i = 0; i < inputs.size(); i++) {
stream_guard.reset_stream(streams[i]);
- outputs.push_back(output.to(inputs[i].device(), /*non_blocking=*/true));
+ inputs[i].copy_(output, /*non_blocking=*/true);
events[i].record(streams[i]);
}
}
@@ -1351,12 +1341,6 @@
guard.set_index(inputs[i].device().index());
events[i].block(at::cuda::getCurrentCUDAStream());
}
-
- // Copy outputs back to inputs after synchronization, so that users can
- // access all reduce results from input tensors
- for (size_t i = 0; i < inputs.size(); i++) {
- inputs[i].copy_(outputs[i]);
- }
}
std::vector<at::Tensor> tmp;