[C10d] Add PG::enableCollectivesTiming to make it dynamically enabled. (#108814)
Collectives timing gates the tracking of when a collective starts on the device.
Currently it's enabled by setting the NCCL_ENABLE_TIMING env var.
The goal of this PR is to make it possible to enable that flag dynamically, so users of the PG hooks don't have to set the env var for their hooks to work.
The design is that once enabled, all newly issued collectives pick up this behavior, so we track the flag on each Work object.
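For context, the intended usage from Python via the new binding looks roughly like this (a sketch: the setup boilerplate is illustrative, and _get_default_group is a private helper):

    import torch.distributed as dist

    # Assumes a torchrun-style environment; illustrative setup only.
    dist.init_process_group(backend="nccl")
    pg = dist.distributed_c10d._get_default_group()

    # Previously timing had to be baked in at launch with NCCL_ENABLE_TIMING=1;
    # after this PR it can be turned on at runtime. Only collectives issued
    # after this call record start events.
    pg._enable_collectives_timing()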
We make enableTiming_ atomic in PGNCCL to avoid races on non-TSO hardware.
To ensure consistency, we copy its value during Work construction and replace all previous uses of the PG's enableTiming_ with the Work's copy, which is immutable for the lifetime of that Work.
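To illustrate the snapshot pattern, here is a minimal pure-Python analogue (not the actual PGNCCL code; all names below are invented for illustration, and the lock stands in for std::atomic):

    import threading

    class GroupSketch:
        def __init__(self):
            self._enable_timing = False    # analogue of the PG's enableTiming_
            self._lock = threading.Lock()  # stands in for std::atomic<bool>

        def enable_collectives_timing(self):
            with self._lock:
                self._enable_timing = True

        def new_work(self):
            # Snapshot the flag once, at Work construction time.
            with self._lock:
                return WorkSketch(timing_enabled=self._enable_timing)

    class WorkSketch:
        def __init__(self, timing_enabled):
            # Immutable for the lifetime of this work object: flipping the
            # group flag later only affects work created afterwards.
            self.timing_enabled = timing_enabled

Because every timing-dependent code path reads the Work's copy rather than the PG's flag, enabling timing mid-run never changes the behavior of an in-flight Work.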
Pull Request resolved: https://github.com/pytorch/pytorch/pull/108814
Approved by: https://github.com/wconstab, https://github.com/fduwjj
ghstack dependencies: #108813
diff --git a/torch/csrc/distributed/c10d/Backend.hpp b/torch/csrc/distributed/c10d/Backend.hpp
index 7eb42cf..3b67594 100644
--- a/torch/csrc/distributed/c10d/Backend.hpp
+++ b/torch/csrc/distributed/c10d/Backend.hpp
@@ -342,6 +342,14 @@
" backend.");
}
+ virtual void enableCollectivesTiming() {
+ TORCH_CHECK(
+ false,
+ "Backend ",
+ getBackendName(),
+ " is missing implementation of enableCollectivesTiming.");
+ }
+
bool hasHooks() const {
return onCompletionHook_ != nullptr;
}
diff --git a/torch/csrc/distributed/c10d/ProcessGroup.cpp b/torch/csrc/distributed/c10d/ProcessGroup.cpp
index 956965c..ffec42a 100644
--- a/torch/csrc/distributed/c10d/ProcessGroup.cpp
+++ b/torch/csrc/distributed/c10d/ProcessGroup.cpp
@@ -165,4 +165,10 @@
}
}
+void ProcessGroup::enableCollectivesTiming() {
+ for (auto& kv : deviceTypeToBackend_) {
+ kv.second->enableCollectivesTiming();
+ }
+}
+
} // namespace c10d
diff --git a/torch/csrc/distributed/c10d/ProcessGroup.hpp b/torch/csrc/distributed/c10d/ProcessGroup.hpp
index f35ac38..114d681 100644
--- a/torch/csrc/distributed/c10d/ProcessGroup.hpp
+++ b/torch/csrc/distributed/c10d/ProcessGroup.hpp
@@ -680,6 +680,7 @@
const std::string& getGroupName() const;
void setGroupName(const std::string& name);
+ void enableCollectivesTiming();
protected:
// Implementations of this interface need to call this to setup
diff --git a/torch/csrc/distributed/c10d/ProcessGroupGloo.cpp b/torch/csrc/distributed/c10d/ProcessGroupGloo.cpp
index 398808b..e11ec59 100644
--- a/torch/csrc/distributed/c10d/ProcessGroupGloo.cpp
+++ b/torch/csrc/distributed/c10d/ProcessGroupGloo.cpp
@@ -2901,6 +2901,10 @@
return sequenceNum_->get();
}
+void ProcessGroupGloo::enableCollectivesTiming() {
+ // Nothing to do to enable timing
+}
+
} // namespace c10d
#endif // USE_C10D_GLOO
diff --git a/torch/csrc/distributed/c10d/ProcessGroupGloo.hpp b/torch/csrc/distributed/c10d/ProcessGroupGloo.hpp
index 215f8ea..d02f007 100644
--- a/torch/csrc/distributed/c10d/ProcessGroupGloo.hpp
+++ b/torch/csrc/distributed/c10d/ProcessGroupGloo.hpp
@@ -341,6 +341,8 @@
c10::intrusive_ptr<Work> barrier(
const BarrierOptions& opts = BarrierOptions()) override;
+ void enableCollectivesTiming() override;
+
const std::unique_ptr<::gloo::rendezvous::Store>& _getStore() const {
return store_;
}
diff --git a/torch/csrc/distributed/c10d/ProcessGroupNCCL.cpp b/torch/csrc/distributed/c10d/ProcessGroupNCCL.cpp
index 26d8b17..23fdccf 100644
--- a/torch/csrc/distributed/c10d/ProcessGroupNCCL.cpp
+++ b/torch/csrc/distributed/c10d/ProcessGroupNCCL.cpp
@@ -319,7 +319,8 @@
: Work(rank, opType, profilingTitle, inputs),
devices_(devices),
workStartTime_(std::chrono::steady_clock::now()),
- seq_(seq) {
+ seq_(seq),
+ timingEnabled_(enableTiming) {
// Creates the CUDA event wrappers
// Note: The actual events are lazily created when first recorded to with
// DEFAULT_FLAGS = cudaEventDisableTiming.
@@ -353,7 +354,8 @@
startTraceUpdated_(w.startTraceUpdated_),
numelIn_(w.numelIn_),
numelOut_(w.numelOut_),
- store_(w.store_) {
+ store_(w.store_),
+ timingEnabled_(w.timingEnabled_) {
exception_ = w.exception_;
}
@@ -408,6 +410,10 @@
bool ProcessGroupNCCL::WorkNCCL::startedGPUExecutionInternal() const {
try {
+ // If timing is disabled, we won't have allocated start events
+ if (!timingEnabled_) {
+ return false;
+ }
for (const auto i : c10::irange(devices_.size())) {
// Checking the work's corresponding CUDA events' status
if (!(*ncclStartEvents_)[i].query()) {
@@ -641,7 +647,7 @@
desyncDebug_ = parseEnvVarFlag(NCCL_DESYNC_DEBUG) ||
(dist_debug_level_ >= DebugLevel::Detail);
#ifdef ENABLE_NCCL_ERROR_CHECKING
- enableTiming_ = parseEnvVarFlag(NCCL_ENABLE_TIMING) || desyncDebug_;
+ enableTiming_.store(parseEnvVarFlag(NCCL_ENABLE_TIMING) || desyncDebug_);
#endif
avoidRecordStreams_ = parseEnvVarFlag(TORCH_NCCL_AVOID_RECORD_STREAMS);
@@ -686,7 +692,7 @@
LOG(INFO) << "[Rank " << rank_ << "] ProcessGroupNCCL initialization options:"
<< "NCCL_ASYNC_ERROR_HANDLING: " << asyncErrorHandling_
<< ", NCCL_DESYNC_DEBUG: " << desyncDebug_
- << ", NCCL_ENABLE_TIMING: " << enableTiming_
+ << ", NCCL_ENABLE_TIMING: " << enableTiming_.load()
<< ", NCCL_BLOCKING_WAIT: " << blockingWait_
<< ", TIMEOUT(ms): " << options_->timeout.count()
<< ", USE_HIGH_PRIORITY_STREAM: "
@@ -802,7 +808,7 @@
"ProcessGroupNCCL OnCompletion hook already registered");
TORCH_CHECK(
- enableTiming_,
+ enableTiming_.load(),
"ProcessGroupNCCL OnCompletion hook requires recording start and end "
"events which require setting NCCL_ENABLE_TIMING environment variable. "
"This is only available for NCCL version >= 2.4.");
@@ -847,6 +853,9 @@
}
}
+void ProcessGroupNCCL::enableCollectivesTiming() {
+ enableTiming_.store(true);
+}
void abortCommsFromMap(
std::unordered_map<std::string, std::vector<std::shared_ptr<NCCLComm>>>&
ncclCommsMap,
@@ -1552,7 +1561,7 @@
profilingTitle,
inputs,
desyncDebug_,
- enableTiming_);
+ enableTiming_.load());
}
std::vector<at::Tensor> ProcessGroupNCCL::WorkNCCL::result() {
@@ -1565,6 +1574,7 @@
}
float ProcessGroupNCCL::WorkNCCL::getDuration() const {
+ TORCH_CHECK(timingEnabled_, "getDuration only works if timing was enabled");
TORCH_CHECK(
ncclStartEvents_->size() == 1,
"getDuration only works for single device per ProcessGroup.");
@@ -1726,7 +1736,7 @@
at::cuda::OptionalCUDAGuard gpuGuard;
// Start event should only be recorded before the ncclGroupStart()
- if (enableTiming_) {
+ if (work->timingEnabled_) {
for (const auto i : c10::irange(devices.size())) {
at::cuda::CUDAStream& ncclStream = ncclStreams[i];
(*work->ncclStartEvents_)[i].record(ncclStream);
@@ -1901,7 +1911,7 @@
at::cuda::OptionalCUDAGuard gpuGuard;
// Start event should only be recorded before the ncclGroupStart()
- if (enableTiming_) {
+ if (work->timingEnabled_) {
for (const auto i : c10::irange(tensors.size())) {
at::cuda::CUDAStream& ncclStream = ncclStreams_[key][i];
(*work->ncclStartEvents_)[i].record(ncclStream);
diff --git a/torch/csrc/distributed/c10d/ProcessGroupNCCL.hpp b/torch/csrc/distributed/c10d/ProcessGroupNCCL.hpp
index ec21dd1..e0cd44d 100644
--- a/torch/csrc/distributed/c10d/ProcessGroupNCCL.hpp
+++ b/torch/csrc/distributed/c10d/ProcessGroupNCCL.hpp
@@ -267,6 +267,7 @@
// The future returned by getFuture.
c10::intrusive_ptr<at::ivalue::Future> future_;
+ bool timingEnabled_;
friend class ProcessGroupNCCL;
};
@@ -482,6 +483,8 @@
std::function<void(std::shared_ptr<WorkInfo>)>&& hook) override;
void waitForPendingWorks() override;
+ void enableCollectivesTiming() override;
+
// Tests if the UCC fallback path is available
bool isUCCAvailable() const;
@@ -759,7 +762,7 @@
// Whether or not to create start CUDAEvent and enable timing for start
// and end events. Note that enableTiming_ is always true if desyncDebug_
// is set to true.
- bool enableTiming_;
+ std::atomic<bool> enableTiming_;
// Whether or not TORCH_NCCL_AVOID_RECORD_STREAMS was set
bool avoidRecordStreams_ = false;
diff --git a/torch/csrc/distributed/c10d/init.cpp b/torch/csrc/distributed/c10d/init.cpp
index 97c4660..1896f15 100644
--- a/torch/csrc/distributed/c10d/init.cpp
+++ b/torch/csrc/distributed/c10d/init.cpp
@@ -1758,6 +1758,11 @@
&::c10d::ProcessGroup::hasHooks,
py::call_guard<py::gil_scoped_acquire>())
.def(
+ "_enable_collectives_timing",
+ &::c10d::ProcessGroup::enableCollectivesTiming,
+ py::call_guard<py::gil_scoped_acquire>(),
+ "Enable timing of collectives by all backends. This might incur in additional overhead.")
+ .def(
"_set_group_name",
&::c10d::ProcessGroup::setGroupName,
py::call_guard<py::gil_scoped_acquire>(),