Use fresh stream from pool for each FutureNCCL callback (#48498)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/48498
This commit is part of a stack that reworks FutureNCCL in order to extract a generic CUDA-aware Future subclass. The stack deliberately breaks this transition up into elementary changes to make it easier to verify that the behavior is preserved (or to highlight how it changes).
---
FutureNCCL has a dedicated CUDA stream that it sets as current when running callbacks. ProcessGroupNCCL initializes this stream by extracting it from the global ATen pool.
In order to decouple FutureNCCL from that specific ProcessGroup and make it more generic, this commit makes FutureNCCL extract a fresh stream from the ATen pool each time it needs one.
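For illustration, here is a minimal sketch of that per-callback stream handling, mirroring the new `addCallback` logic in the diff below (the helper name `runCallbackOnFreshStream` and its signature are illustrative, not part of the patch):
```
#include <functional>

#include <ATen/cuda/CUDAContext.h>
#include <ATen/cuda/CUDAEvent.h>
#include <c10/cuda/CUDAGuard.h>

// Hypothetical helper: run one callback on a stream taken fresh from the
// global ATen pool, after waiting for the future's value to be ready.
void runCallbackOnFreshStream(
    c10::DeviceIndex deviceIndex,
    at::cuda::CUDAEvent& readyEvent,
    const std::function<void(void)>& callback) {
  // A fresh stream from the pool, used only for this callback.
  at::cuda::CUDAStream stream =
      at::cuda::getStreamFromPool(/*isHighPriority=*/false, deviceIndex);
  // The callback's stream must wait until the future's value is ready.
  readyEvent.block(stream);
  // Make the fresh stream current while the callback runs.
  c10::cuda::CUDAStreamGuard guard(stream);
  callback();
}
```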
This introduces a functional change, because it removes the implicit synchronization and ordering between the callbacks of the same Future. In fact, such an ordering is hard to guarantee in the general case: for example, a user could attach a new callback just after the future becomes complete, and that callback would then run inline, immediately, out of order with respect to the other callbacks. (There are ways to "fix" this, but they are complicated.) NCCL got around this because its futures are already marked complete when they are returned, but even so it could run into issues if multiple threads added callbacks simultaneously.
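To make the hazard concrete, here is a hedged toy sketch (the function name and the event argument are illustrative, not the actual FutureNCCL code): with fresh streams, sibling callbacks each wait on the future's completion event but never on each other.
```
#include <ATen/cuda/CUDAContext.h>
#include <ATen/cuda/CUDAEvent.h>

// Illustrative only: two callbacks attached separately to the same future.
void scheduleSiblingCallbacks(at::cuda::CUDAEvent& completionEvent) {
  at::cuda::CUDAStream s1 = at::cuda::getStreamFromPool();
  at::cuda::CUDAStream s2 = at::cuda::getStreamFromPool();
  completionEvent.block(s1);  // cb1's stream waits for the value...
  completionEvent.block(s2);  // ...and so does cb2's, but not for cb1.
  // cb1's kernels are enqueued on s1 and cb2's on s2; the GPU may
  // interleave them, so cb1 is no longer guaranteed to finish first.
}
```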
Note that it is still possible to enforce ordering between callbacks, but one must now do so explicitly. Namely, instead of this:
```
fut.then(cb1)
fut.then(cb2)
```
one must now do:
```
fut.then(cb1).then(cb2)
```
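Chaining restores ordering because each `then` returns a new FutureNCCL whose CUDA event records the callback's stream, and the next callback's fresh stream blocks on that event before running. A hedged sketch of that mechanism (names are illustrative):
```
#include <ATen/cuda/CUDAContext.h>
#include <ATen/cuda/CUDAEvent.h>
#include <c10/cuda/CUDAGuard.h>

void chainedCallbacks() {
  // cb1 runs on its own fresh stream; an event records when it is done.
  at::cuda::CUDAStream s1 = at::cuda::getStreamFromPool();
  at::cuda::CUDAEvent cb1Done;
  {
    c10::cuda::CUDAStreamGuard guard(s1);
    // ... cb1 enqueues its kernels on s1 ...
    cb1Done.record(s1);
  }
  // cb2 also gets a fresh stream, but first blocks on cb1's event;
  // this is what fut.then(cb1).then(cb2) arranges automatically.
  at::cuda::CUDAStream s2 = at::cuda::getStreamFromPool();
  cb1Done.block(s2);
  c10::cuda::CUDAStreamGuard guard2(s2);
  // ... cb2 enqueues its kernels on s2 ...
}
```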
ghstack-source-id: 118180029
Test Plan: Unit tests
Reviewed By: mrshenli
Differential Revision: D25177559
fbshipit-source-id: 4d4e73ea7bda0ea65066548109b9ea6d5b465599
diff --git a/torch/lib/c10d/ProcessGroupNCCL.cpp b/torch/lib/c10d/ProcessGroupNCCL.cpp
index 4b5fb5f..18948c8 100644
--- a/torch/lib/c10d/ProcessGroupNCCL.cpp
+++ b/torch/lib/c10d/ProcessGroupNCCL.cpp
@@ -452,7 +452,6 @@
ncclCommCounter_(0),
terminateProcessGroup_(false),
opTimeout_(options->opTimeout),
- futureNCCLCallbackStreams_(c10::cuda::device_count()),
isHighPriorityStream_(options->isHighPriorityStream) {
TORCH_CHECK(at::cuda::getNumGPUs() != 0,
"ProcessGroupNCCL is only supported with GPUs, no GPUs found!");
@@ -867,15 +866,6 @@
// Creates the NCCL streams
streamVal.push_back(at::cuda::getStreamFromPool(isHighPriorityStream_));
-
- // If not set before, get a dedicated stream for the device to run
- // FutureNCCL then callbacks.
- std::lock_guard<std::mutex> lock(mutex_);
- if (futureNCCLCallbackStreams_[deviceIndex] == nullptr) {
- futureNCCLCallbackStreams_[deviceIndex] =
- std::make_shared<at::cuda::CUDAStream>(
- at::cuda::getStreamFromPool(isHighPriorityStream_));
- }
}
// [Note 2 ]
@@ -1027,8 +1017,7 @@
return c10::make_intrusive<FutureNCCL>(
at::IValue(*outputs_),
deviceIndex,
- cudaEvents_,
- futureNCCLCallbackStreams_[deviceIndex]);
+ cudaEvents_);
}
void ProcessGroupNCCL::workEnqueue(
@@ -1066,10 +1055,8 @@
bool can_profile = outputs.size() == 1;
auto work = initWork(devices, rank_, opType, can_profile ? profilingTitle : nullptr);
- // Store references to outputs and futureNCCLCallbackStream to be used by
- // WorkNCCL::getFuture.
+ // Store references to outputs to be used by WorkNCCL::getFuture.
work->outputs_ = std::make_shared<std::vector<at::Tensor>>(outputs);
- work->futureNCCLCallbackStreams_ = futureNCCLCallbackStreams_;
if (work->recordFunctionEndCallback_) {
// recordFunctionEndCallback_ is normally called in fininsh() function by
@@ -1152,10 +1139,8 @@
auto work = initWork(devices, rank_, opType);
if (opType == OpType::RECV) {
- // Store references to outputs and futureNCCLCallbackStream to be used by
- // WorkNCCL::getFuture.
+ // Store references to outputs to be used by WorkNCCL::getFuture.
work->outputs_ = std::make_shared<std::vector<at::Tensor>>(tensors);
- work->futureNCCLCallbackStreams_ = futureNCCLCallbackStreams_;
}
at::cuda::OptionalCUDAGuard gpuGuard;
diff --git a/torch/lib/c10d/ProcessGroupNCCL.hpp b/torch/lib/c10d/ProcessGroupNCCL.hpp
index fe6c113..c1b67cf 100644
--- a/torch/lib/c10d/ProcessGroupNCCL.hpp
+++ b/torch/lib/c10d/ProcessGroupNCCL.hpp
@@ -171,10 +171,6 @@
// Store a reference to NCCL collective's outputs to be used by getFuture.
std::shared_ptr<std::vector<at::Tensor>> outputs_;
- // Store streams that run FutureNCCL then callbacks.
- std::vector<std::shared_ptr<at::cuda::CUDAStream>>
- futureNCCLCallbackStreams_;
-
friend class ProcessGroupNCCL;
};
@@ -200,10 +196,8 @@
// or NCCL's barrier().
//
// If created by WorkNCCL's getFuture API, FutureNCCL has a reference to
- // WorkNCCL's cudaEvents, NCCL collective's outputs, device index of
- // outputs' device, and the ProcesGroupNCCL's dedicated
- // futureNCCLCallbackStream for outputs' device that runs all the then
- // callbacks called from this FutureNCCL. Its value is NCCL collective's
+ // WorkNCCL's cudaEvents, NCCL collective's outputs, and the device index of
+ // outputs' device. Its value is NCCL collective's
// outputs. FutureNCCL only supports single-process single-device mode where
// the size of outputs is equal to 1.
//
@@ -213,21 +207,17 @@
// own cudaEvents with the stream that runs the callback. This design
// enables synchronizing the appropriate streams and avoids stalling PyTorch's
// default stream while running the callback. In case of multiple then
- // callbacks, the design will work like a chain such that FutureNCCL n will
- // wait on the cudaEvents from FutureNCCL n - 1. All callbacks are executed on
- // outputs' device's dedicated futureNCCLCallbackStream.
+ // callbacks, each will be executed on its own fresh stream.
struct FutureNCCL : at::ivalue::Future {
public:
explicit FutureNCCL(
at::IValue value,
c10::DeviceIndex deviceIndex,
- std::shared_ptr<std::vector<at::cuda::CUDAEvent>> cudaEvents,
- std::shared_ptr<at::cuda::CUDAStream> futureNCCLCallbackStream)
+ std::shared_ptr<std::vector<at::cuda::CUDAEvent>> cudaEvents)
: at::ivalue::Future(c10::ListType::create(c10::TensorType::get())),
value_(std::move(value)),
deviceIndex_(deviceIndex),
- cudaEvents_(cudaEvents),
- futureNCCLCallbackStream_(futureNCCLCallbackStream) {
+ cudaEvents_(std::move(cudaEvents)) {
TORCH_INTERNAL_ASSERT(
cudaEvents_->size() == 1,
"FutureNCCL only supports single-process single-device mode.");
@@ -246,12 +236,10 @@
// return value of callback.
explicit FutureNCCL(
c10::DeviceIndex deviceIndex,
- std::shared_ptr<std::vector<at::cuda::CUDAEvent>> cudaEvents,
- std::shared_ptr<at::cuda::CUDAStream> futureNCCLCallbackStream)
+ std::shared_ptr<std::vector<at::cuda::CUDAEvent>> cudaEvents)
: at::ivalue::Future(c10::ListType::create(c10::TensorType::get())),
deviceIndex_(deviceIndex),
- cudaEvents_(cudaEvents),
- futureNCCLCallbackStream_(futureNCCLCallbackStream) {
+ cudaEvents_(std::move(cudaEvents)) {
TORCH_INTERNAL_ASSERT(
cudaEvents_->size() == 1,
"FutureNCCL only supports single-process single-device mode.");
@@ -307,16 +295,19 @@
// this callback. This new FutureNCCL's cudaEvents will record the
// callback's stream and will have the result value of the callback.
void addCallback(std::function<void(void)> callback) override {
+ // FIXME Should we find a way to allow to change the priority of streams?
+ at::cuda::CUDAStream stream =
+ at::cuda::getStreamFromPool(/*isHighPriority=*/false, deviceIndex_);
+
// Do not free the underlying data storage of value_ before its
- // usage on futureNCCLCallbackStream_ finish.
+ // usage on the stream finishes.
for (const at::DataPtr& data_ptr : extractDataPtrs(value_)) {
- c10::cuda::CUDACachingAllocator::recordStream(
- data_ptr, *futureNCCLCallbackStream_);
+ c10::cuda::CUDACachingAllocator::recordStream(data_ptr, stream);
}
- (*cudaEvents_)[0].block(*futureNCCLCallbackStream_);
+ (*cudaEvents_)[0].block(stream);
// Use the dedicated callback stream to run callback.
- c10::StreamGuard streamGuard{*futureNCCLCallbackStream_};
+ c10::StreamGuard streamGuard{stream};
callback();
}
@@ -326,14 +317,13 @@
c10::intrusive_ptr<Future> then(
std::function<at::IValue(void)> callback,
at::TypePtr /* unused */) override {
- // Create a new cudaEvents object of size 1 that will record
- // futureNCCLCallbackStream_ after callback and will be passed to the new
- // FutureNCCL.
+ // Create a new cudaEvents object of size 1 that will record the current
+ // stream after callback and will be passed to the new FutureNCCL.
auto thenFutCudaEvents =
std::make_shared<std::vector<at::cuda::CUDAEvent>>(1);
// Create a FutureNCCL without setting a value.
auto fut = c10::make_intrusive<FutureNCCL>(
- deviceIndex_, thenFutCudaEvents, futureNCCLCallbackStream_);
+ deviceIndex_, thenFutCudaEvents);
// The new future needs the DataPtr extractor when it gets marked complete
// but this might happen immediately inline or in parallel by another
// thread. In both these cases this would/might happen before the user has
@@ -385,7 +375,6 @@
at::IValue value_;
c10::DeviceIndex deviceIndex_;
std::shared_ptr<std::vector<at::cuda::CUDAEvent>> cudaEvents_;
- std::shared_ptr<at::cuda::CUDAStream> futureNCCLCallbackStream_;
DataPtrExtractor dataPtrExtractor_;
std::mutex dataPtrExtractorMutex_;
c10::optional<FutureError> error_;
@@ -743,16 +732,6 @@
// set contains the string representation of ncclUniqueId.
std::unordered_set<std::string> abortedComms_;
- // In single-process single-device mode, WorkNCCL::getFuture is supported.
- // Depending on the device index of collective outputs, WorkNCCL will pass
- // the corresponding device's then callback stream to FutureNCCL.
- // We just inititalize futureNCCLCallbackStreams_ inside the constructor and
- // set its size to the total number of available devices and depending on the
- // device of the NCCL collective's outputs, we later set the callback stream
- // of the corresponding device inside ProcessGroupNCCL::getNCCLComm if not set
- // before.
- std::vector<std::shared_ptr<at::cuda::CUDAStream>> futureNCCLCallbackStreams_;
-
// Schedule NCCL operations on high priority CUDA streams.
bool isHighPriorityStream_ = false;