Remove USE_CUDA from c10d reducer/logger (#59562)

Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/59562

Needed to merge c10d into libtorch(_cuda): the reducer and logger currently compile CUDA timing code behind USE_CUDA. This change moves the timing logic behind a device-agnostic Timer interface, with per-device implementations registered in a typed registry; the CUDA implementation moves into a new reducer_cuda.cpp, so reducer.cpp and logger.cpp no longer need the macro.

ghstack-source-id: 131169542

Test Plan: CI

Reviewed By: agolynski

Differential Revision: D28931378

fbshipit-source-id: 71376b862ff6ef7dbfa7331ec8d269bd3fcc7e0d
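
For context on the new mechanism: instead of `#ifdef USE_CUDA` branches, the reducer asks a typed registry for a `Timer` implementation matching the module's device, and each implementation registers itself from its own translation unit. Below is a minimal standalone sketch of the c10 typed-registry pattern used here, with hypothetical `MyTimer`/`MyCpuTimer` names rather than the real classes:

```cpp
// Minimal sketch of the c10 typed-registry pattern this change adopts.
// MyTimer/MyCpuTimer are hypothetical illustration names, not part of
// the change itself; only the c10 headers from this repo are assumed.
#include <c10/core/Device.h>
#include <c10/util/Registry.h>

#include <chrono>
#include <cstdint>
#include <memory>

struct MyTimer {
  virtual ~MyTimer() = default;
  virtual void record() = 0;
  int64_t last_ns = -1;
};

// Declare (normally in a header) and define (in exactly one .cpp) a
// registry keyed by DeviceType that creates std::unique_ptr<MyTimer>
// from a c10::Device constructor argument.
C10_DECLARE_TYPED_REGISTRY(
    MyTimerRegistry, c10::DeviceType, MyTimer, std::unique_ptr, c10::Device);
C10_DEFINE_TYPED_REGISTRY(
    MyTimerRegistry, c10::DeviceType, MyTimer, std::unique_ptr, c10::Device);

struct MyCpuTimer : MyTimer {
  explicit MyCpuTimer(c10::Device /* unused */) {}
  void record() override {
    // Stamp the event with the host monotonic clock, in nanoseconds.
    last_ns = std::chrono::steady_clock::now().time_since_epoch().count();
  }
};

// Each implementation registers itself in its own translation unit, so
// linking in a CUDA file adds kCUDA support without any #ifdef in the
// shared code.
C10_REGISTER_TYPED_CLASS(MyTimerRegistry, c10::kCPU, MyCpuTimer);

int main() {
  c10::Device device(c10::kCPU);
  // Dispatch on the device type at runtime; Create returns nullptr if
  // no implementation was registered for this device type.
  std::unique_ptr<MyTimer> timer =
      MyTimerRegistry()->Create(device.type(), device);
  if (timer) {
    timer->record();
  }
  return 0;
}
```

Since registration happens during static initialization of whichever object files get linked, a build without reducer_cuda.cpp simply leaves `c10::kCUDA` unregistered; `Create` then returns nullptr and the reducer's timing calls stay guarded behind `if (timer_)`.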
diff --git a/tools/build_variables.bzl b/tools/build_variables.bzl
index 718809f..0a9aa42 100644
--- a/tools/build_variables.bzl
+++ b/tools/build_variables.bzl
@@ -669,6 +669,7 @@
     "torch/lib/c10d/comm.cpp",
     "torch/lib/c10d/default_comm_hooks.cpp",
     "torch/lib/c10d/reducer.cpp",
+    "torch/lib/c10d/reducer_cuda.cpp",
     "torch/lib/c10d/logger.cpp",
     "torch/csrc/distributed/c10d/python_comm_hook.cpp",
     "torch/csrc/distributed/c10d/init.cpp",
diff --git a/torch/lib/c10d/logger.cpp b/torch/lib/c10d/logger.cpp
index 6cc4614..14a1295 100644
--- a/torch/lib/c10d/logger.cpp
+++ b/torch/lib/c10d/logger.cpp
@@ -9,12 +9,6 @@
 // stats.
 const int LoggingIterations[] = {10, 20, 100, 1000};
 
-namespace {
-
-const int kMilliSecondToNanosSecond = 1000000;
-
-} // anonymous namespace
-
 std::ostream& operator<<(std::ostream& output, const Logger& logger) {
   auto& ddp_logging_data = (*logger.ddp_logging_data_);
 
@@ -176,49 +170,22 @@
   at::LogPyTorchDDPUsage(*ddp_logging_data_);
 }
 
-void Logger::calculate_avg_cpu_time(
+void Logger::calculate_avg_time(
     int64_t& avg_time,
     int64_t& time_duration,
-    int64_t cpu_start_time,
-    int64_t cpu_end_time) {
-  // If cpu_end_time is not recorded in this iteration,
-  // avg_time will return invalid value.
-  // For some cases like DDP runs on non-sync mode, backward compute
-  // end time can not be recorded in this iteration and thus can not
-  // calculate the valid avg_time.
-  // In this case, skip calculating the avg_time and return.
+    Timer& timer,
+    Timer::Event start_event,
+    Timer::Event end_event) {
   TORCH_CHECK(num_iterations_stats_recorded_ > 0);
-  if (cpu_end_time < cpu_start_time) {
+  c10::optional<int64_t> maybe_time_duration = timer.measureDifference(start_event, end_event);
+  if (!maybe_time_duration.has_value()) {
     return;
   }
-  time_duration = cpu_end_time - cpu_start_time;
+  time_duration = maybe_time_duration.value();
   avg_time = (time_duration + avg_time * (num_iterations_stats_recorded_ - 1)) /
       num_iterations_stats_recorded_;
 }
 
-#ifdef USE_CUDA
-void Logger::calculate_avg_gpu_time(
-    int64_t& avg_time,
-    int64_t& time_duration,
-    at::cuda::CUDAEvent& gpu_start,
-    at::cuda::CUDAEvent& gpu_end) {
-  TORCH_CHECK(num_iterations_stats_recorded_ > 0);
-  float milliseconds = gpu_start.elapsed_time(gpu_end);
-  // If gpu_end is not recorded in this iteration,
-  // milliseconds will have invalid value.
-  // For some cases like DDP runs on non-sync mode,
-  // gpu_end can not be recorded in this iteration and thus can not
-  // calculate the valid avg_time.
-  // In this case, skip calculating the avg_time and return.
-  if (milliseconds < 0) {
-    return;
-  }
-  time_duration = int64_t(milliseconds * kMilliSecondToNanosSecond);
-  avg_time = (time_duration + avg_time * (num_iterations_stats_recorded_ - 1)) /
-      num_iterations_stats_recorded_;
-}
-#endif
-
 void Logger::reset_performance_stats() {
   ddp_logging_data_->ints_map["forward_compute_time"] = 0;
   ddp_logging_data_->ints_map["backward_comm_time"] = 0;
@@ -260,85 +227,39 @@
 
   reset_performance_stats();
 
-  if (reducer_->replicas_[0][0].is_cuda()) {
-#ifdef USE_CUDA
-    // Cuda time stats are only collected for single device modules.
-    if (reducer_->is_multi_device_module_) {
-      TORCH_WARN_ONCE(
-        "Cuda time stats are not collected for multi-device modules."
-      );
-      return;
-    }
-    // Check events on the replicas_[0][0].device().
-    at::DeviceGuard g(reducer_->replicas_[0][0].device());
-    // It is possible users did not call backward or run codes in
-    // no-sync mode, in this case, some cudaEvents like "backward_compute_end"
-    // or "backward_comm_start" or "backward_comm_end" will not be recorded.
-    // cudaEvent is created when it is first time to be recorded.
-    // If it is never recorded/created, skip synchronize and calculation.
-    // Otherwise it will throw cuda errors.
-    if (!reducer_->gpu_timer_.forward_start.isCreated() ||
-        !reducer_->gpu_timer_.backward_compute_start.isCreated() ||
-        !reducer_->gpu_timer_.backward_compute_end.isCreated() ||
-        !reducer_->gpu_timer_.backward_comm_start.isCreated() ||
-        !reducer_->gpu_timer_.backward_comm_end.isCreated()) {
-      return;
-    }
-
-    // set_runtime_stats_and_log is called at the beginning of forward call,
-    // when it is cheap to synchronize the cuda events of previous iteration,
-    // as mostly all cuda operations are finished in previous iteration.
-    reducer_->gpu_timer_.forward_start.synchronize();
-    reducer_->gpu_timer_.backward_compute_start.synchronize();
-    reducer_->gpu_timer_.backward_compute_end.synchronize();
-    reducer_->gpu_timer_.backward_comm_start.synchronize();
-    reducer_->gpu_timer_.backward_comm_end.synchronize();
-    calculate_avg_gpu_time(
-        ddp_logging_data_->ints_map["avg_forward_compute_time"],
-        ddp_logging_data_->ints_map["forward_compute_time"],
-        reducer_->gpu_timer_.forward_start,
-        reducer_->gpu_timer_.backward_compute_start);
-    calculate_avg_gpu_time(
-        ddp_logging_data_->ints_map["avg_backward_compute_time"],
-        ddp_logging_data_->ints_map["backward_compute_time"],
-        reducer_->gpu_timer_.backward_compute_start,
-        reducer_->gpu_timer_.backward_compute_end);
-    calculate_avg_gpu_time(
-        ddp_logging_data_->ints_map["avg_backward_comm_time"],
-        ddp_logging_data_->ints_map["backward_comm_time"],
-        reducer_->gpu_timer_.backward_comm_start,
-        reducer_->gpu_timer_.backward_comm_end);
-    calculate_avg_gpu_time(
-        ddp_logging_data_->ints_map["avg_backward_compute_comm_overlap_time"],
-        ddp_logging_data_->ints_map["backward_compute_comm_overlap_time"],
-        reducer_->gpu_timer_.backward_comm_start,
-        reducer_->gpu_timer_.backward_compute_end);
-#endif
-  } else {
-    calculate_avg_cpu_time(
-        ddp_logging_data_->ints_map["avg_forward_compute_time"],
-        ddp_logging_data_->ints_map["forward_compute_time"],
-        reducer_->cpu_timer_.forward_start_time,
-        reducer_->cpu_timer_.backward_compute_start_time);
-
-    calculate_avg_cpu_time(
-        ddp_logging_data_->ints_map["avg_backward_compute_time"],
-        ddp_logging_data_->ints_map["backward_compute_time"],
-        reducer_->cpu_timer_.backward_compute_start_time,
-        reducer_->cpu_timer_.backward_compute_end_time);
-
-    calculate_avg_cpu_time(
-        ddp_logging_data_->ints_map["avg_backward_comm_time"],
-        ddp_logging_data_->ints_map["backward_comm_time"],
-        reducer_->cpu_timer_.backward_comm_start_time,
-        reducer_->cpu_timer_.backward_comm_end_time);
-
-    calculate_avg_cpu_time(
-        ddp_logging_data_->ints_map["avg_backward_compute_comm_overlap_time"],
-        ddp_logging_data_->ints_map["backward_compute_comm_overlap_time"],
-        reducer_->cpu_timer_.backward_comm_start_time,
-        reducer_->cpu_timer_.backward_compute_end_time);
+  // Cuda time stats are only collected for single device modules.
+  if (reducer_->replicas_[0][0].is_cuda() && reducer_->is_multi_device_module_) {
+    TORCH_WARN_ONCE(
+      "Cuda time stats are not collected for multi-device modules."
+    );
+    return;
   }
+  TORCH_INTERNAL_ASSERT(reducer_->timer_);
+  calculate_avg_time(
+      ddp_logging_data_->ints_map["avg_forward_compute_time"],
+      ddp_logging_data_->ints_map["forward_compute_time"],
+      *reducer_->timer_,
+      Timer::Event::kForwardStart,
+      Timer::Event::kBackwardComputeStart);
+  calculate_avg_time(
+      ddp_logging_data_->ints_map["avg_backward_compute_time"],
+      ddp_logging_data_->ints_map["backward_compute_time"],
+      *reducer_->timer_,
+      Timer::Event::kBackwardComputeStart,
+      Timer::Event::kBackwardComputeEnd);
+  calculate_avg_time(
+      ddp_logging_data_->ints_map["avg_backward_comm_time"],
+      ddp_logging_data_->ints_map["backward_comm_time"],
+      *reducer_->timer_,
+      Timer::Event::kBackwardCommStart,
+      Timer::Event::kBackwardCommEnd);
+  calculate_avg_time(
+      ddp_logging_data_->ints_map["avg_backward_compute_comm_overlap_time"],
+      ddp_logging_data_->ints_map["backward_compute_comm_overlap_time"],
+      *reducer_->timer_,
+      Timer::Event::kBackwardCommStart,
+      Timer::Event::kBackwardComputeEnd);
+
   // Log runtime stats to stderr if TORCH_DISTRIBUTED_DEBUG=DETAIL is enabled.
   if (parseDistDebugLevel() == DistributedDebugLevel::DETAIL) {
     LOG(INFO) << *this;
diff --git a/torch/lib/c10d/logger.hpp b/torch/lib/c10d/logger.hpp
index fe40026..1895e0a 100644
--- a/torch/lib/c10d/logger.hpp
+++ b/torch/lib/c10d/logger.hpp
@@ -41,18 +41,12 @@
 
   // Calculate the average of a timing stat, given its start and end
   // events, using the Timer recorded in the reducer.
-  void calculate_avg_cpu_time(
+  void calculate_avg_time(
       int64_t& avg_time,
       int64_t& time_duration,
-      int64_t cpu_start_time,
-      int64_t cpu_end_time);
-#ifdef USE_CUDA
-  void calculate_avg_gpu_time(
-      int64_t& avg_time,
-      int64_t& time_duration,
-      at::cuda::CUDAEvent& gpu_start,
-      at::cuda::CUDAEvent& gpu_end);
-#endif
+      Timer& timer,
+      Timer::Event start_event,
+      Timer::Event end_event);
   // Set stats that can be collected only during
   // the training loop. It is called at the beginning of the forward call
   // to record the run time stats of sampled iterations that previously ran.
diff --git a/torch/lib/c10d/reducer.cpp b/torch/lib/c10d/reducer.cpp
index 383c91b..28312b6 100644
--- a/torch/lib/c10d/reducer.cpp
+++ b/torch/lib/c10d/reducer.cpp
@@ -38,6 +38,67 @@
 
 } // namespace
 
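+// Define the TimerRegistry declared in reducer.hpp. Device-specific
+// Timer implementations register themselves with it: CpuTimer below,
+// CudaTimer in the new reducer_cuda.cpp.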
+C10_DEFINE_TYPED_REGISTRY(TimerRegistry, c10::DeviceType, Timer, std::unique_ptr, c10::Device);
+
+namespace {
+
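+// Timer implementation for CPU modules: stamps each event with the
+// host wall-clock time from current_time_in_nanos().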
+class CpuTimer : public Timer {
+ private:
+  // The timestamp of forward call start time in each iteration.
+  int64_t forward_start_time = -1;
+  // The timestamp of backward computation start and end time in each
+  // iteration.
+  int64_t backward_compute_start_time = -1;
+  int64_t backward_compute_end_time = -1;
+  // The timestamp of first communication call start time in each iteration.
+  int64_t backward_comm_start_time = -1;
+  // The timestamp of last communication call end time in each iteration.
+  int64_t backward_comm_end_time = -1;
+
+  int64_t& getTime(Event event) {
+    switch (event) {
+      case Event::kForwardStart:
+        return forward_start_time;
+      case Event::kBackwardComputeStart:
+        return backward_compute_start_time;
+      case Event::kBackwardComputeEnd:
+        return backward_compute_end_time;
+      case Event::kBackwardCommStart:
+        return backward_comm_start_time;
+      case Event::kBackwardCommEnd:
+        return backward_comm_end_time;
+      default:
+        TORCH_INTERNAL_ASSERT(false);
+    }
+  }
+
+ public:
+  explicit CpuTimer(c10::Device /* unused */) {}
+
+  void record(Event event) override {
+    getTime(event) = current_time_in_nanos();
+  }
+
+  c10::optional<int64_t> measureDifference(Event start, Event end) override {
+    int64_t start_time = getTime(start);
+    int64_t end_time = getTime(end);
+    // If the end event was not recorded in this iteration (e.g. when
+    // DDP runs in no-sync mode, the backward compute end time is never
+    // set), the difference is meaningless. Skip the measurement and
+    // return nullopt so the caller leaves the average unchanged.
+    if (end_time < start_time) {
+      return c10::nullopt;
+    }
+    return end_time - start_time;
+  }
+};
+
+C10_REGISTER_TYPED_CLASS(TimerRegistry, c10::kCPU, CpuTimer);
+
+} // namespace
+
 Reducer::Reducer(
     std::vector<std::vector<at::Tensor>> replicas,
     std::vector<std::vector<size_t>> bucket_indices,
@@ -88,6 +149,12 @@
     }
   }
 
+  // For CUDA, record events only for single-device modules.
+  c10::Device device = replicas_[0][0].device();
+  if (!(device.is_cuda() && is_multi_device_module_)) {
+    timer_ = TimerRegistry()->Create(device.type(), device);
+  }
+
   // If `expect_sparse_gradients` is not specified, initialize it such that
   // we do not expect sparse gradients for any parameter.
   if (expect_sparse_gradients_.empty()) {
@@ -724,7 +791,7 @@
   checkAndRaiseMarkedTwiceError(variable_index);
   perIterationReadyParams_.insert(variable_index);
   backward_stats_[0][variable_index] =
-      current_time_in_nanos() - cpu_timer_.backward_compute_start_time;
+      current_time_in_nanos() - backward_compute_start_time_;
 
   // Any time we mark a variable ready (be it in line due to unused parameters,
   // or via an autograd hook), we require a call to the finalize function. If
@@ -1180,7 +1247,7 @@
     const std::vector<torch::autograd::Variable>& outputs) {
   std::lock_guard<std::mutex> lock(mutex_);
 
-  cpu_timer_.backward_compute_start_time = current_time_in_nanos();
+  backward_compute_start_time_ = current_time_in_nanos();
   if (should_collect_runtime_stats()) {
     record_backward_compute_start_time();
   }
@@ -1723,72 +1790,32 @@
 }
 
 void Reducer::record_forward_compute_start_time() {
-  if (replicas_[0][0].is_cuda()) {
-#ifdef USE_CUDA
-    // Record event only for single device module.
-    if (!is_multi_device_module_) {
-      // Create and record event on the replicas_[0][0].device().
-      at::DeviceGuard g(replicas_[0][0].device());
-      gpu_timer_.forward_start.record();
-    }
-#endif
-  } else {
-    cpu_timer_.forward_start_time = current_time_in_nanos();
+  if (timer_) {
+    timer_->record(Timer::Event::kForwardStart);
   }
 }
 
 void Reducer::record_backward_compute_start_time() {
-  if (replicas_[0][0].is_cuda()) {
-#ifdef USE_CUDA
-    // Record event only for single device module.
-    if (!is_multi_device_module_) {
-      // Create and record event on the replicas_[0][0].device().
-      at::DeviceGuard g(replicas_[0][0].device());
-      gpu_timer_.backward_compute_start.record();
-    }
-#endif
+  if (timer_) {
+    timer_->record(Timer::Event::kBackwardComputeStart);
   }
 }
 
 void Reducer::record_backward_compute_end_time() {
-  if (replicas_[0][0].is_cuda()) {
-#ifdef USE_CUDA
-    // Record event only for single device module.
-    if (!is_multi_device_module_) {
-      at::DeviceGuard g(replicas_[0][0].device());
-      gpu_timer_.backward_compute_end.record();
-    }
-#endif
-  } else {
-    cpu_timer_.backward_compute_end_time = current_time_in_nanos();
+  if (timer_) {
+    timer_->record(Timer::Event::kBackwardComputeEnd);
   }
 }
 
 void Reducer::record_backward_comm_start_time() {
-  if (replicas_[0][0].is_cuda()) {
-#ifdef USE_CUDA
-    // Record event only for single device module
-    if (!is_multi_device_module_) {
-      at::DeviceGuard g(replicas_[0][0].device());
-      gpu_timer_.backward_comm_start.record();
-    }
-#endif
-  } else {
-    cpu_timer_.backward_comm_start_time = current_time_in_nanos();
+  if (timer_) {
+    timer_->record(Timer::Event::kBackwardCommStart);
   }
 }
 
 void Reducer::record_backward_comm_end_time() {
-  if (replicas_[0][0].is_cuda()) {
-#ifdef USE_CUDA
-    // Record event only for single device module.
-    if (!is_multi_device_module_) {
-      at::DeviceGuard g(replicas_[0][0].device());
-      gpu_timer_.backward_comm_end.record();
-    }
-#endif
-  } else {
-    cpu_timer_.backward_comm_end_time = current_time_in_nanos();
+  if (timer_) {
+    timer_->record(Timer::Event::kBackwardCommEnd);
   }
 }
 
diff --git a/torch/lib/c10d/reducer.hpp b/torch/lib/c10d/reducer.hpp
index 72d925c..9d03808 100644
--- a/torch/lib/c10d/reducer.hpp
+++ b/torch/lib/c10d/reducer.hpp
@@ -1,9 +1,6 @@
 #pragma once
 
 #include <atomic>
-#ifdef USE_CUDA
-#include <ATen/cuda/CUDAEvent.h>
-#endif
 #include <memory>
 #include <mutex>
 #include <tuple>
@@ -29,6 +26,28 @@
 // Forward declaration
 class Logger;
 
+class Timer {
+ public:
+  enum class Event {
+    kForwardStart,
+    kBackwardComputeStart,
+    kBackwardComputeEnd,
+    kBackwardCommStart,
+    kBackwardCommEnd,
+  };
+
+  // Record the current event, i.e., mark it as having occurred now.
+  virtual void record(Event event) = 0;
+
+  // Return the difference between when the two events occurred, in
+  // nanoseconds, or nullopt if either event has not been recorded.
+  virtual c10::optional<int64_t> measureDifference(Event start, Event end) = 0;
+
+  virtual ~Timer() = default;
+};
+
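+// Registry of per-device-type Timer implementations. CpuTimer is
+// registered in reducer.cpp, CudaTimer in reducer_cuda.cpp (CUDA
+// builds only).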
+C10_DECLARE_TYPED_REGISTRY(TimerRegistry, c10::DeviceType, Timer, std::unique_ptr, c10::Device);
+
 class Reducer {
  public:
   // The constructor takes a list of variables for every model replica.
@@ -335,37 +354,9 @@
   // communication calls like allReduce or communication hooks.
   int num_buckets_ready_;
 
-  // CPU timestamp to record event start and end time.
-  struct CPUTimer {
-    // The timestamp of forward call start time in each iteration.
-    int64_t forward_start_time;
-    // The timestamp of backward computation start and end time in each
-    // iteration.
-    int64_t backward_compute_start_time;
-    int64_t backward_compute_end_time;
-    // The timestamp of first communication call start time in each iteration.
-    int64_t backward_comm_start_time;
-    // The timestamp of last communication call end time in each iteration.
-    int64_t backward_comm_end_time;
-  };
-
-  CPUTimer cpu_timer_{};
-
-#ifdef USE_CUDA
-  // GPU events to record event start and end time.
-  struct GPUTimer {
-    at::cuda::CUDAEvent forward_start = at::cuda::CUDAEvent(cudaEventDefault);
-    at::cuda::CUDAEvent backward_compute_start =
-        at::cuda::CUDAEvent(cudaEventDefault);
-    at::cuda::CUDAEvent backward_compute_end =
-        at::cuda::CUDAEvent(cudaEventDefault);
-    at::cuda::CUDAEvent backward_comm_start =
-        at::cuda::CUDAEvent(cudaEventDefault);
-    at::cuda::CUDAEvent backward_comm_end =
-        at::cuda::CUDAEvent(cudaEventDefault);
-  };
-  GPUTimer gpu_timer_;
-#endif
+  // Timing information.
+  int64_t backward_compute_start_time_ = -1;
+  std::unique_ptr<Timer> timer_;
 
   // We collect the relative timestamp of every gradient being ready
   // when executing autograd. This can be used to derive a timeline of
diff --git a/torch/lib/c10d/reducer_cuda.cpp b/torch/lib/c10d/reducer_cuda.cpp
new file mode 100644
index 0000000..0f55b5a
--- /dev/null
+++ b/torch/lib/c10d/reducer_cuda.cpp
@@ -0,0 +1,89 @@
+#include <c10d/reducer.hpp>
+
+#ifdef USE_CUDA
+
+#include <c10/core/DeviceGuard.h>
+#include <ATen/cuda/CUDAEvent.h>
+
+namespace c10d {
+namespace {
+
+const int kMilliSecondToNanosSecond = 1000000;
+
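+// Timer implementation for single-device CUDA modules: records CUDA
+// events on the module's device and measures the elapsed device time
+// between them.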
+class CudaTimer : public Timer {
+ private:
+  c10::Device device;
+
+  at::cuda::CUDAEvent forward_start = at::cuda::CUDAEvent(cudaEventDefault);
+  at::cuda::CUDAEvent backward_compute_start =
+      at::cuda::CUDAEvent(cudaEventDefault);
+  at::cuda::CUDAEvent backward_compute_end =
+      at::cuda::CUDAEvent(cudaEventDefault);
+  at::cuda::CUDAEvent backward_comm_start =
+      at::cuda::CUDAEvent(cudaEventDefault);
+  at::cuda::CUDAEvent backward_comm_end =
+      at::cuda::CUDAEvent(cudaEventDefault);
+
+  at::cuda::CUDAEvent& getEvent(Event event) {
+    switch (event) {
+      case Event::kForwardStart:
+        return forward_start;
+      case Event::kBackwardComputeStart:
+        return backward_compute_start;
+      case Event::kBackwardComputeEnd:
+        return backward_compute_end;
+      case Event::kBackwardCommStart:
+        return backward_comm_start;
+      case Event::kBackwardCommEnd:
+        return backward_comm_end;
+      default:
+        TORCH_INTERNAL_ASSERT(false);
+    }
+  }
+
+ public:
+  explicit CudaTimer(c10::Device dev) : device(dev) {}
+
+  void record(Event event) override {
+    c10::DeviceGuard g(device);
+    getEvent(event).record();
+  }
+
+  c10::optional<int64_t> measureDifference(Event start, Event end) override {
+    c10::DeviceGuard g(device);
+    at::cuda::CUDAEvent& start_event = getEvent(start);
+    at::cuda::CUDAEvent& end_event = getEvent(end);
+    // The user may not have called backward, or may be running in
+    // no-sync mode; in those cases events such as "backward_compute_end",
+    // "backward_comm_start", or "backward_comm_end" are never recorded.
+    // A cudaEvent is only created the first time it is recorded, so if
+    // an event was never recorded/created, skip the synchronization and
+    // calculation to avoid CUDA errors.
+    if (!start_event.isCreated() || !end_event.isCreated()) {
+      return c10::nullopt;
+    }
+    // set_runtime_stats_and_log is called at the beginning of the
+    // forward pass, when it is cheap to synchronize the CUDA events of
+    // the previous iteration, since most of that iteration's CUDA work
+    // has already finished.
+    start_event.synchronize();
+    end_event.synchronize();
+    float milliseconds = start_event.elapsed_time(end_event);
+    // If the end event was created but not recorded in this iteration
+    // (e.g. DDP ran in no-sync mode), elapsed_time returns a negative
+    // value. Skip the measurement and return nullopt.
+    if (milliseconds < 0) {
+      return c10::nullopt;
+    }
+    return int64_t(milliseconds * kMilliSecondToNanosSecond);
+  }
+};
+
+C10_REGISTER_TYPED_CLASS(TimerRegistry, c10::kCUDA, CudaTimer);
+
+} // namespace
+} // namespace c10d
+
+#endif
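
A note on the averaging math, which this refactor leaves unchanged: `calculate_avg_time` keeps a running mean over the sampled iterations, updating `avg_time = (time_duration + avg_time * (n - 1)) / n`, where `n` is `num_iterations_stats_recorded_`. A standalone arithmetic sketch of that update rule (hypothetical helper, not code from this patch):

```cpp
// Running (incremental) mean, as in Logger::calculate_avg_time: after
// the n-th recorded sample, avg = (sample + avg * (n - 1)) / n, so no
// history of past durations has to be stored.
#include <cstdint>
#include <cstdio>

int64_t update_avg(int64_t avg, int64_t sample, int64_t n) {
  return (sample + avg * (n - 1)) / n;
}

int main() {
  int64_t avg = 0;
  const int64_t samples[] = {100, 200, 300}; // durations in nanoseconds
  for (int64_t n = 1; n <= 3; ++n) {
    avg = update_avg(avg, samples[n - 1], n);
    std::printf("after %lld samples: avg = %lld ns\n",
                static_cast<long long>(n), static_cast<long long>(avg));
  }
  // Prints 100, 150, 200 (integer division, exact for these inputs).
  return 0;
}
```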