Remove USE_CUDA from c10d reducer/logger (#59562)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/59562
Needed to merge c10d into libtorch(_cuda).
ghstack-source-id: 131169542
Test Plan: CI
Reviewed By: agolynski
Differential Revision: D28931378
fbshipit-source-id: 71376b862ff6ef7dbfa7331ec8d269bd3fcc7e0d
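In place of the removed `#ifdef USE_CUDA` branches, the reducer and logger now talk to a small `Timer` interface that is implemented per device type and looked up through a device-type registry: the CPU implementation stays in reducer.cpp, the CUDA implementation moves to the new reducer_cuda.cpp (compiled only in CUDA builds), and the shared code never mentions CUDA. Below is a minimal, self-contained sketch of that pattern; the `DeviceType` enum, the map-based registry, and the toy timer classes are simplified stand-ins for the c10 registry machinery, not the actual c10d types.

```cpp
#include <functional>
#include <iostream>
#include <map>
#include <memory>

enum class DeviceType { CPU, CUDA };

// Shared interface: no CUDA types leak into common code.
struct Timer {
  virtual void record() = 0;
  virtual ~Timer() = default;
};

// The registry maps a device type to a factory. Backend-specific translation
// units register themselves; shared code only consults the registry.
using TimerFactory = std::function<std::unique_ptr<Timer>()>;
std::map<DeviceType, TimerFactory>& timerRegistry() {
  static std::map<DeviceType, TimerFactory> registry;
  return registry;
}

// --- always compiled (analogous to CpuTimer in reducer.cpp) ---
struct CpuTimer : Timer {
  void record() override { std::cout << "cpu record\n"; }
};
const bool cpu_registered = [] {
  timerRegistry()[DeviceType::CPU] = [] { return std::make_unique<CpuTimer>(); };
  return true;
}();

// --- compiled only in the CUDA build (analogous to reducer_cuda.cpp) ---
#ifdef USE_CUDA
struct CudaTimer : Timer {
  void record() override { std::cout << "cuda record\n"; }
};
const bool cuda_registered = [] {
  timerRegistry()[DeviceType::CUDA] = [] { return std::make_unique<CudaTimer>(); };
  return true;
}();
#endif

int main() {
  // Shared code: pick the timer for the module's device, no #ifdef needed.
  auto it = timerRegistry().find(DeviceType::CPU);
  if (it != timerRegistry().end()) {
    it->second()->record();  // prints "cpu record"
  }
}
```

With this shape, a build without CUDA simply never registers a CUDA factory, so the shared lookup code needs no conditional compilation.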
diff --git a/tools/build_variables.bzl b/tools/build_variables.bzl
index 718809f..0a9aa42 100644
--- a/tools/build_variables.bzl
+++ b/tools/build_variables.bzl
@@ -669,6 +669,7 @@
"torch/lib/c10d/comm.cpp",
"torch/lib/c10d/default_comm_hooks.cpp",
"torch/lib/c10d/reducer.cpp",
+ "torch/lib/c10d/reducer_cuda.cpp",
"torch/lib/c10d/logger.cpp",
"torch/csrc/distributed/c10d/python_comm_hook.cpp",
"torch/csrc/distributed/c10d/init.cpp",
diff --git a/torch/lib/c10d/logger.cpp b/torch/lib/c10d/logger.cpp
index 6cc4614..14a1295 100644
--- a/torch/lib/c10d/logger.cpp
+++ b/torch/lib/c10d/logger.cpp
@@ -9,12 +9,6 @@
// stats.
const int LoggingIterations[] = {10, 20, 100, 1000};
-namespace {
-
-const int kMilliSecondToNanosSecond = 1000000;
-
-} // anonymous namespace
-
std::ostream& operator<<(std::ostream& output, const Logger& logger) {
auto& ddp_logging_data = (*logger.ddp_logging_data_);
@@ -176,49 +170,22 @@
at::LogPyTorchDDPUsage(*ddp_logging_data_);
}
-void Logger::calculate_avg_cpu_time(
+void Logger::calculate_avg_time(
int64_t& avg_time,
int64_t& time_duration,
- int64_t cpu_start_time,
- int64_t cpu_end_time) {
- // If cpu_end_time is not recorded in this iteration,
- // avg_time will return invalid value.
- // For some cases like DDP runs on non-sync mode, backward compute
- // end time can not be recorded in this iteration and thus can not
- // calculate the valid avg_time.
- // In this case, skip calculating the avg_time and return.
+ Timer& timer,
+ Timer::Event start_event,
+ Timer::Event end_event) {
TORCH_CHECK(num_iterations_stats_recorded_ > 0);
- if (cpu_end_time < cpu_start_time) {
+ c10::optional<int64_t> maybe_time_duration = timer.measureDifference(start_event, end_event);
+ if (!maybe_time_duration.has_value()) {
return;
}
- time_duration = cpu_end_time - cpu_start_time;
+ time_duration = maybe_time_duration.value();
avg_time = (time_duration + avg_time * (num_iterations_stats_recorded_ - 1)) /
num_iterations_stats_recorded_;
}
-#ifdef USE_CUDA
-void Logger::calculate_avg_gpu_time(
- int64_t& avg_time,
- int64_t& time_duration,
- at::cuda::CUDAEvent& gpu_start,
- at::cuda::CUDAEvent& gpu_end) {
- TORCH_CHECK(num_iterations_stats_recorded_ > 0);
- float milliseconds = gpu_start.elapsed_time(gpu_end);
- // If gpu_end is not recorded in this iteration,
- // milliseconds will have invalid value.
- // For some cases like DDP runs on non-sync mode,
- // gpu_end can not be recorded in this iteration and thus can not
- // calculate the valid avg_time.
- // In this case, skip calculating the avg_time and return.
- if (milliseconds < 0) {
- return;
- }
- time_duration = int64_t(milliseconds * kMilliSecondToNanosSecond);
- avg_time = (time_duration + avg_time * (num_iterations_stats_recorded_ - 1)) /
- num_iterations_stats_recorded_;
-}
-#endif
-
void Logger::reset_performance_stats() {
ddp_logging_data_->ints_map["forward_compute_time"] = 0;
ddp_logging_data_->ints_map["backward_comm_time"] = 0;
@@ -260,85 +227,39 @@
reset_performance_stats();
- if (reducer_->replicas_[0][0].is_cuda()) {
-#ifdef USE_CUDA
- // Cuda time stats are only collected for single device modules.
- if (reducer_->is_multi_device_module_) {
- TORCH_WARN_ONCE(
- "Cuda time stats are not collected for multi-device modules."
- );
- return;
- }
- // Check events on the replicas_[0][0].device().
- at::DeviceGuard g(reducer_->replicas_[0][0].device());
- // It is possible users did not call backward or run codes in
- // no-sync mode, in this case, some cudaEvents like "backward_compute_end"
- // or "backward_comm_start" or "backward_comm_end" will not be recorded.
- // cudaEvent is created when it is first time to be recorded.
- // If it is never recorded/created, skip synchronize and calculation.
- // Otherwise it will throw cuda errors.
- if (!reducer_->gpu_timer_.forward_start.isCreated() ||
- !reducer_->gpu_timer_.backward_compute_start.isCreated() ||
- !reducer_->gpu_timer_.backward_compute_end.isCreated() ||
- !reducer_->gpu_timer_.backward_comm_start.isCreated() ||
- !reducer_->gpu_timer_.backward_comm_end.isCreated()) {
- return;
- }
-
- // set_runtime_stats_and_log is called at the beginning of forward call,
- // when it is cheap to synchronize the cuda events of previous iteration,
- // as mostly all cuda operations are finished in previous iteration.
- reducer_->gpu_timer_.forward_start.synchronize();
- reducer_->gpu_timer_.backward_compute_start.synchronize();
- reducer_->gpu_timer_.backward_compute_end.synchronize();
- reducer_->gpu_timer_.backward_comm_start.synchronize();
- reducer_->gpu_timer_.backward_comm_end.synchronize();
- calculate_avg_gpu_time(
- ddp_logging_data_->ints_map["avg_forward_compute_time"],
- ddp_logging_data_->ints_map["forward_compute_time"],
- reducer_->gpu_timer_.forward_start,
- reducer_->gpu_timer_.backward_compute_start);
- calculate_avg_gpu_time(
- ddp_logging_data_->ints_map["avg_backward_compute_time"],
- ddp_logging_data_->ints_map["backward_compute_time"],
- reducer_->gpu_timer_.backward_compute_start,
- reducer_->gpu_timer_.backward_compute_end);
- calculate_avg_gpu_time(
- ddp_logging_data_->ints_map["avg_backward_comm_time"],
- ddp_logging_data_->ints_map["backward_comm_time"],
- reducer_->gpu_timer_.backward_comm_start,
- reducer_->gpu_timer_.backward_comm_end);
- calculate_avg_gpu_time(
- ddp_logging_data_->ints_map["avg_backward_compute_comm_overlap_time"],
- ddp_logging_data_->ints_map["backward_compute_comm_overlap_time"],
- reducer_->gpu_timer_.backward_comm_start,
- reducer_->gpu_timer_.backward_compute_end);
-#endif
- } else {
- calculate_avg_cpu_time(
- ddp_logging_data_->ints_map["avg_forward_compute_time"],
- ddp_logging_data_->ints_map["forward_compute_time"],
- reducer_->cpu_timer_.forward_start_time,
- reducer_->cpu_timer_.backward_compute_start_time);
-
- calculate_avg_cpu_time(
- ddp_logging_data_->ints_map["avg_backward_compute_time"],
- ddp_logging_data_->ints_map["backward_compute_time"],
- reducer_->cpu_timer_.backward_compute_start_time,
- reducer_->cpu_timer_.backward_compute_end_time);
-
- calculate_avg_cpu_time(
- ddp_logging_data_->ints_map["avg_backward_comm_time"],
- ddp_logging_data_->ints_map["backward_comm_time"],
- reducer_->cpu_timer_.backward_comm_start_time,
- reducer_->cpu_timer_.backward_comm_end_time);
-
- calculate_avg_cpu_time(
- ddp_logging_data_->ints_map["avg_backward_compute_comm_overlap_time"],
- ddp_logging_data_->ints_map["backward_compute_comm_overlap_time"],
- reducer_->cpu_timer_.backward_comm_start_time,
- reducer_->cpu_timer_.backward_compute_end_time);
+  // CUDA time stats are only collected for single-device modules.
+ if (reducer_->replicas_[0][0].is_cuda() && reducer_->is_multi_device_module_) {
+ TORCH_WARN_ONCE(
+ "Cuda time stats are not collected for multi-device modules."
+ );
+ return;
}
+ TORCH_INTERNAL_ASSERT(reducer_->timer_);
+ calculate_avg_time(
+ ddp_logging_data_->ints_map["avg_forward_compute_time"],
+ ddp_logging_data_->ints_map["forward_compute_time"],
+ *reducer_->timer_,
+ Timer::Event::kForwardStart,
+ Timer::Event::kBackwardComputeStart);
+ calculate_avg_time(
+ ddp_logging_data_->ints_map["avg_backward_compute_time"],
+ ddp_logging_data_->ints_map["backward_compute_time"],
+ *reducer_->timer_,
+ Timer::Event::kBackwardComputeStart,
+ Timer::Event::kBackwardComputeEnd);
+ calculate_avg_time(
+ ddp_logging_data_->ints_map["avg_backward_comm_time"],
+ ddp_logging_data_->ints_map["backward_comm_time"],
+ *reducer_->timer_,
+ Timer::Event::kBackwardCommStart,
+ Timer::Event::kBackwardCommEnd);
+ calculate_avg_time(
+ ddp_logging_data_->ints_map["avg_backward_compute_comm_overlap_time"],
+ ddp_logging_data_->ints_map["backward_compute_comm_overlap_time"],
+ *reducer_->timer_,
+ Timer::Event::kBackwardCommStart,
+ Timer::Event::kBackwardComputeEnd);
+
// Log runtime stats to stderr if TORCH_DISTRIBUTED_DEBUG=DETAIL is enabled.
if (parseDistDebugLevel() == DistributedDebugLevel::DETAIL) {
LOG(INFO) << *this;
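For reference, `calculate_avg_time` folds each new measurement into a running mean over the iterations whose stats were recorded; assuming every sampled iteration yields a measurement, the recurrence is just the arithmetic mean of the recorded durations, where d_i is the i-th recorded duration and n is num_iterations_stats_recorded_:

$$ \mathrm{avg}_n = \frac{d_n + (n-1)\,\mathrm{avg}_{n-1}}{n} = \frac{1}{n}\sum_{i=1}^{n} d_i $$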
diff --git a/torch/lib/c10d/logger.hpp b/torch/lib/c10d/logger.hpp
index fe40026..1895e0a 100644
--- a/torch/lib/c10d/logger.hpp
+++ b/torch/lib/c10d/logger.hpp
@@ -41,18 +41,12 @@
  // Calculate avg stats using cpu timer and gpu timer
  // that have been recorded in reducer.
- void calculate_avg_cpu_time(
+ void calculate_avg_time(
int64_t& avg_time,
int64_t& time_duration,
- int64_t cpu_start_time,
- int64_t cpu_end_time);
-#ifdef USE_CUDA
- void calculate_avg_gpu_time(
- int64_t& avg_time,
- int64_t& time_duration,
- at::cuda::CUDAEvent& gpu_start,
- at::cuda::CUDAEvent& gpu_end);
-#endif
+ Timer& timer,
+ Timer::Event start_event,
+ Timer::Event end_event);
  // Set stats that can be collected only during
  // the training loop. It is called at the beginning of a forward call
  // to record the runtime stats of sampled iterations that previously ran.
diff --git a/torch/lib/c10d/reducer.cpp b/torch/lib/c10d/reducer.cpp
index 383c91b..28312b6 100644
--- a/torch/lib/c10d/reducer.cpp
+++ b/torch/lib/c10d/reducer.cpp
@@ -38,6 +38,67 @@
} // namespace
+C10_DEFINE_TYPED_REGISTRY(TimerRegistry, c10::DeviceType, Timer, std::unique_ptr, c10::Device);
+
+namespace {
+
+class CpuTimer : public Timer {
+ private:
+ // The timestamp of forward call start time in each iteration.
+ int64_t forward_start_time = -1;
+ // The timestamp of backward computation start and end time in each
+ // iteration.
+ int64_t backward_compute_start_time = -1;
+ int64_t backward_compute_end_time = -1;
+ // The timestamp of first communication call start time in each iteration.
+ int64_t backward_comm_start_time = -1;
+ // The timestamp of last communication call end time in each iteration.
+ int64_t backward_comm_end_time = -1;
+
+ int64_t& getTime(Event event) {
+ switch (event) {
+ case Event::kForwardStart:
+ return forward_start_time;
+ case Event::kBackwardComputeStart:
+ return backward_compute_start_time;
+ case Event::kBackwardComputeEnd:
+ return backward_compute_end_time;
+ case Event::kBackwardCommStart:
+ return backward_comm_start_time;
+ case Event::kBackwardCommEnd:
+ return backward_comm_end_time;
+ default:
+ TORCH_INTERNAL_ASSERT(false);
+ }
+ }
+
+ public:
+ explicit CpuTimer(c10::Device /* unused */) {}
+
+ void record(Event event) override {
+ getTime(event) = current_time_in_nanos();
+ }
+
+ c10::optional<int64_t> measureDifference(Event start, Event end) override {
+ int64_t start_time = getTime(start);
+ int64_t end_time = getTime(end);
+    // If the end-time event was not recorded in this iteration, the stored
+    // end_time is stale and the difference below would be invalid.
+    // This happens, for example, when DDP runs in no-sync mode and the
+    // backward compute end time is never recorded.
+    // In that case skip the measurement and return nullopt so the caller
+    // does not fold an invalid value into avg_time.
+ if (end_time < start_time) {
+ return c10::nullopt;
+ }
+ return end_time - start_time;
+ }
+};
+
+C10_REGISTER_TYPED_CLASS(TimerRegistry, c10::kCPU, CpuTimer);
+
+} // namespace
+
Reducer::Reducer(
std::vector<std::vector<at::Tensor>> replicas,
std::vector<std::vector<size_t>> bucket_indices,
@@ -88,6 +149,12 @@
}
}
+  // For CUDA, record events only for single-device modules.
+ c10::Device device = replicas_[0][0].device();
+ if (!(device.is_cuda() && is_multi_device_module_)) {
+ timer_ = TimerRegistry()->Create(device.type(), device);
+ }
+
// If `expect_sparse_gradients` is not specified, initialize it such that
// we do not expect sparse gradients for any parameter.
if (expect_sparse_gradients_.empty()) {
@@ -724,7 +791,7 @@
checkAndRaiseMarkedTwiceError(variable_index);
perIterationReadyParams_.insert(variable_index);
backward_stats_[0][variable_index] =
- current_time_in_nanos() - cpu_timer_.backward_compute_start_time;
+ current_time_in_nanos() - backward_compute_start_time_;
// Any time we mark a variable ready (be it in line due to unused parameters,
// or via an autograd hook), we require a call to the finalize function. If
@@ -1180,7 +1247,7 @@
const std::vector<torch::autograd::Variable>& outputs) {
std::lock_guard<std::mutex> lock(mutex_);
- cpu_timer_.backward_compute_start_time = current_time_in_nanos();
+ backward_compute_start_time_ = current_time_in_nanos();
if (should_collect_runtime_stats()) {
record_backward_compute_start_time();
}
@@ -1723,72 +1790,32 @@
}
void Reducer::record_forward_compute_start_time() {
- if (replicas_[0][0].is_cuda()) {
-#ifdef USE_CUDA
- // Record event only for single device module.
- if (!is_multi_device_module_) {
- // Create and record event on the replicas_[0][0].device().
- at::DeviceGuard g(replicas_[0][0].device());
- gpu_timer_.forward_start.record();
- }
-#endif
- } else {
- cpu_timer_.forward_start_time = current_time_in_nanos();
+ if (timer_) {
+ timer_->record(Timer::Event::kForwardStart);
}
}
void Reducer::record_backward_compute_start_time() {
- if (replicas_[0][0].is_cuda()) {
-#ifdef USE_CUDA
- // Record event only for single device module.
- if (!is_multi_device_module_) {
- // Create and record event on the replicas_[0][0].device().
- at::DeviceGuard g(replicas_[0][0].device());
- gpu_timer_.backward_compute_start.record();
- }
-#endif
+ if (timer_) {
+ timer_->record(Timer::Event::kBackwardComputeStart);
}
}
void Reducer::record_backward_compute_end_time() {
- if (replicas_[0][0].is_cuda()) {
-#ifdef USE_CUDA
- // Record event only for single device module.
- if (!is_multi_device_module_) {
- at::DeviceGuard g(replicas_[0][0].device());
- gpu_timer_.backward_compute_end.record();
- }
-#endif
- } else {
- cpu_timer_.backward_compute_end_time = current_time_in_nanos();
+ if (timer_) {
+ timer_->record(Timer::Event::kBackwardComputeEnd);
}
}
void Reducer::record_backward_comm_start_time() {
- if (replicas_[0][0].is_cuda()) {
-#ifdef USE_CUDA
- // Record event only for single device module
- if (!is_multi_device_module_) {
- at::DeviceGuard g(replicas_[0][0].device());
- gpu_timer_.backward_comm_start.record();
- }
-#endif
- } else {
- cpu_timer_.backward_comm_start_time = current_time_in_nanos();
+ if (timer_) {
+ timer_->record(Timer::Event::kBackwardCommStart);
}
}
void Reducer::record_backward_comm_end_time() {
- if (replicas_[0][0].is_cuda()) {
-#ifdef USE_CUDA
- // Record event only for single device module.
- if (!is_multi_device_module_) {
- at::DeviceGuard g(replicas_[0][0].device());
- gpu_timer_.backward_comm_end.record();
- }
-#endif
- } else {
- cpu_timer_.backward_comm_end_time = current_time_in_nanos();
+ if (timer_) {
+ timer_->record(Timer::Event::kBackwardCommEnd);
}
}
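The CpuTimer's `end_time < start_time` check is how a stale end timestamp is detected: timestamps persist across iterations, so an end event that was never re-recorded still holds a previous iteration's (smaller) value, or the initial -1. A minimal, self-contained illustration of that behavior, using a hypothetical MiniTimer rather than the c10d types:

```cpp
#include <cstdint>
#include <iostream>
#include <optional>

struct MiniTimer {
  int64_t start = -1;
  int64_t end = -1;
  std::optional<int64_t> diff() const {
    // end is stale (from an earlier iteration) or never recorded.
    if (end < start) return std::nullopt;
    return end - start;
  }
};

int main() {
  MiniTimer t;
  t.start = 100; t.end = 200;                  // iteration 1: both recorded
  std::cout << t.diff().value() << "\n";       // prints 100
  t.start = 300;                               // iteration 2: no-sync, end never recorded
  std::cout << t.diff().has_value() << "\n";   // prints 0 -> measurement skipped
}
```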
diff --git a/torch/lib/c10d/reducer.hpp b/torch/lib/c10d/reducer.hpp
index 72d925c..9d03808 100644
--- a/torch/lib/c10d/reducer.hpp
+++ b/torch/lib/c10d/reducer.hpp
@@ -1,9 +1,6 @@
#pragma once
#include <atomic>
-#ifdef USE_CUDA
-#include <ATen/cuda/CUDAEvent.h>
-#endif
#include <memory>
#include <mutex>
#include <tuple>
@@ -29,6 +26,28 @@
// Forward declaration
class Logger;
+class Timer {
+ public:
+ enum class Event {
+ kForwardStart,
+ kBackwardComputeStart,
+ kBackwardComputeEnd,
+ kBackwardCommStart,
+ kBackwardCommEnd,
+ };
+
+ // Record the current event, i.e., mark it as having occurred now.
+ virtual void record(Event event) = 0;
+
+  // Return the difference between when two events occurred, in nanoseconds,
+  // or nullopt if one of them hasn't been recorded.
+ virtual c10::optional<int64_t> measureDifference(Event start, Event end) = 0;
+
+ virtual ~Timer() = default;
+};
+
+C10_DECLARE_TYPED_REGISTRY(TimerRegistry, c10::DeviceType, Timer, std::unique_ptr, c10::Device);
+
class Reducer {
public:
// The constructor takes a list of variables for every model replica.
@@ -335,37 +354,9 @@
// communication calls like allReduce or communication hooks.
int num_buckets_ready_;
- // CPU timestamp to record event start and end time.
- struct CPUTimer {
- // The timestamp of forward call start time in each iteration.
- int64_t forward_start_time;
- // The timestamp of backward computation start and end time in each
- // iteration.
- int64_t backward_compute_start_time;
- int64_t backward_compute_end_time;
- // The timestamp of first communication call start time in each iteration.
- int64_t backward_comm_start_time;
- // The timestamp of last communication call end time in each iteration.
- int64_t backward_comm_end_time;
- };
-
- CPUTimer cpu_timer_{};
-
-#ifdef USE_CUDA
- // GPU events to record event start and end time.
- struct GPUTimer {
- at::cuda::CUDAEvent forward_start = at::cuda::CUDAEvent(cudaEventDefault);
- at::cuda::CUDAEvent backward_compute_start =
- at::cuda::CUDAEvent(cudaEventDefault);
- at::cuda::CUDAEvent backward_compute_end =
- at::cuda::CUDAEvent(cudaEventDefault);
- at::cuda::CUDAEvent backward_comm_start =
- at::cuda::CUDAEvent(cudaEventDefault);
- at::cuda::CUDAEvent backward_comm_end =
- at::cuda::CUDAEvent(cudaEventDefault);
- };
- GPUTimer gpu_timer_;
-#endif
+ // Timing information.
+ int64_t backward_compute_start_time_ = -1;
+ std::unique_ptr<Timer> timer_;
// We collect the relative timestamp of every gradient being ready
// when executing autograd. This can be used to derive a timeline of
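Putting the pieces together, the call sites in this patch use the interface roughly as follows; this is a condensed, illustrative sketch of the calls made in reducer.cpp and logger.cpp, not a new API.

```cpp
// Reducer constructor: ask the registry for a Timer matching the module's
// device (CpuTimer for CPU; CudaTimer when reducer_cuda.cpp is built).
c10::Device device = replicas_[0][0].device();
std::unique_ptr<Timer> timer = TimerRegistry()->Create(device.type(), device);

// During training: record events. The guard covers the multi-device CUDA
// case, where no timer is created.
if (timer) {
  timer->record(Timer::Event::kForwardStart);
  // ... forward and backward work ...
  timer->record(Timer::Event::kBackwardComputeStart);
}

// Logger (which asserts the timer exists): measure a difference in
// nanoseconds; nullopt means one of the events was not recorded this
// iteration, so the running average is left untouched.
c10::optional<int64_t> ns = timer->measureDifference(
    Timer::Event::kForwardStart, Timer::Event::kBackwardComputeStart);
```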
diff --git a/torch/lib/c10d/reducer_cuda.cpp b/torch/lib/c10d/reducer_cuda.cpp
new file mode 100644
index 0000000..0f55b5a
--- /dev/null
+++ b/torch/lib/c10d/reducer_cuda.cpp
@@ -0,0 +1,89 @@
+#include <c10d/reducer.hpp>
+
+#ifdef USE_CUDA
+
+#include <c10/core/DeviceGuard.h>
+#include <ATen/cuda/CUDAEvent.h>
+
+namespace c10d {
+namespace {
+
+const int kMilliSecondToNanosSecond = 1000000;
+
+class CudaTimer : public Timer {
+ private:
+ c10::Device device;
+
+ at::cuda::CUDAEvent forward_start = at::cuda::CUDAEvent(cudaEventDefault);
+ at::cuda::CUDAEvent backward_compute_start =
+ at::cuda::CUDAEvent(cudaEventDefault);
+ at::cuda::CUDAEvent backward_compute_end =
+ at::cuda::CUDAEvent(cudaEventDefault);
+ at::cuda::CUDAEvent backward_comm_start =
+ at::cuda::CUDAEvent(cudaEventDefault);
+ at::cuda::CUDAEvent backward_comm_end =
+ at::cuda::CUDAEvent(cudaEventDefault);
+
+ at::cuda::CUDAEvent& getEvent(Event event) {
+ switch (event) {
+ case Event::kForwardStart:
+ return forward_start;
+ case Event::kBackwardComputeStart:
+ return backward_compute_start;
+ case Event::kBackwardComputeEnd:
+ return backward_compute_end;
+ case Event::kBackwardCommStart:
+ return backward_comm_start;
+ case Event::kBackwardCommEnd:
+ return backward_comm_end;
+ default:
+ TORCH_INTERNAL_ASSERT(false);
+ }
+ }
+
+ public:
+ explicit CudaTimer(c10::Device dev) : device(dev) {}
+
+ void record(Event event) override {
+ c10::DeviceGuard g(device);
+ getEvent(event).record();
+ }
+
+ c10::optional<int64_t> measureDifference(Event start, Event end) override {
+ c10::DeviceGuard g(device);
+ at::cuda::CUDAEvent& start_event = getEvent(start);
+ at::cuda::CUDAEvent& end_event = getEvent(end);
+    // The user may not have called backward, or may have run in no-sync
+    // mode; in that case some events, such as "backward_compute_end",
+    // "backward_comm_start", or "backward_comm_end", are never recorded.
+    // A cudaEvent is only created the first time it is recorded, so if
+    // either event was never recorded/created, skip the synchronization and
+    // measurement; synchronizing an uncreated event would raise CUDA errors.
+ if (!start_event.isCreated() || !end_event.isCreated()) {
+ return c10::nullopt;
+ }
+    // set_runtime_stats_and_log is called at the beginning of the forward
+    // call, when synchronizing the previous iteration's CUDA events is cheap
+    // because most of that iteration's CUDA work has already finished.
+ start_event.synchronize();
+ end_event.synchronize();
+ float milliseconds = start_event.elapsed_time(end_event);
+    // If the end event was not actually recorded in this iteration (again
+    // possible when DDP runs in no-sync mode), elapsed_time can report an
+    // invalid (negative) value.
+    // In that case skip the measurement and return nullopt so the caller
+    // does not fold an invalid value into avg_time.
+    // Otherwise convert the elapsed milliseconds to nanoseconds below.
+ if (milliseconds < 0) {
+ return c10::nullopt;
+ }
+ return int64_t(milliseconds * kMilliSecondToNanosSecond);
+ }
+};
+
+C10_REGISTER_TYPED_CLASS(TimerRegistry, c10::kCUDA, CudaTimer);
+
+} // namespace
+} // namespace c10d
+
+#endif