blob: 2ba68cda9cc3ea3d6d8649765eb1af44db323665 [file] [log] [blame]
#pragma once
#ifdef USE_C10D_NCCL
#if defined(__linux__)
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#endif
#include <atomic>
#include <chrono>
#include <future>
#include <iostream>
#include <list>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <torch/csrc/distributed/c10d/Backend.hpp>
#include <torch/csrc/distributed/c10d/NCCLUtils.hpp>
#include <torch/csrc/distributed/c10d/PrefixStore.hpp>
#include <torch/csrc/distributed/c10d/Store.hpp>
#include <torch/csrc/distributed/c10d/intra_node_comm.hpp>
#include <ATen/DynamicLibrary.h>
#include <ATen/cuda/CUDAContext.h>
#include <ATen/cuda/CUDAEvent.h>
#include <c10/core/Stream.h>
#include <c10/core/StreamGuard.h>
#include <c10/cuda/CUDACachingAllocator.h>
#include <c10/cuda/CUDAGuard.h>
#include <c10/cuda/CUDAStream.h>
#include <torch/custom_class.h>
namespace c10d {
// Control broadcasting of NCCL uniqueId
static std::vector<std::string> TORCH_NCCL_BCAST_UNIQUEID = {
"TORCH_NCCL_BCAST_UNIQUEID"};
// Control whether to always use high priority streams
static std::vector<std::string> TORCH_NCCL_HIGH_PRIORITY = {
"TORCH_NCCL_HIGH_PRIORITY"};
// Control whether or not wait() is blocking or non-blocking.
static std::vector<std::string> TORCH_NCCL_BLOCKING_WAIT = {
"TORCH_NCCL_BLOCKING_WAIT",
"NCCL_BLOCKING_WAIT"};
// TODO: We want to eventually remove this variable and make users to use
// the default value (3 - SkipCleanUp).
// Control whether or not we perform Async Error Handling with NCCL.
static std::vector<std::string> TORCH_NCCL_ASYNC_ERROR_HANDLING = {
"TORCH_NCCL_ASYNC_ERROR_HANDLING",
"NCCL_ASYNC_ERROR_HANDLING"};
// Control whether dumping debug info on watchdog
// timeout is enabled. This variable must be set together with
// TORCH_NCCL_ENABLE_MONITORING=1 and TORCH_NCCL_TRACE_BUFFER_SIZE > 0.
static std::vector<std::string> TORCH_NCCL_DUMP_ON_TIMEOUT = {
"TORCH_NCCL_DUMP_ON_TIMEOUT"};
// Control whether Desync Debug is enabled. This variable must be set
// together with TORCH_NCCL_ASYNC_ERROR_HANDLING.
static std::vector<std::string> TORCH_NCCL_DESYNC_DEBUG = {
"TORCH_NCCL_DESYNC_DEBUG",
"NCCL_DESYNC_DEBUG"};
// Enable recording start-events for all ProcessGroupNCCL collectives, and
// compute accurate collective timing per-collective. (Note: end-events are
// recorded by default. Turn on this flag can increase chances of a watchdog
// hang due to performing a CUDA event query which eventually calls
// cudaEventElapsedTime() API.
static std::vector<std::string> TORCH_NCCL_ENABLE_TIMING = {
"TORCH_NCCL_ENABLE_TIMING",
"NCCL_ENABLE_TIMING"};
// Enable monitoring thread which aborts the process when the ProcessGroupNCCL
// Watchdog thread gets stuck and no heartbeat is detected after
// TORCH_NCCL_HEARTBEAT_TIMEOUT_SEC. This can happen due to calling CUDA/NCCL
// APIs that may hang. It is Useful to prevent jobs being stuck for a prolonged
// time than necessary tying up cluster resources.
static std::vector<std::string> TORCH_NCCL_ENABLE_MONITORING = {
"TORCH_NCCL_ENABLE_MONITORING"};
// Control the watchdog heartbeat timeout period after which the monitoring
// thread will abort the process.
static std::vector<std::string> TORCH_NCCL_HEARTBEAT_TIMEOUT_SEC = {
"TORCH_NCCL_HEARTBEAT_TIMEOUT_SEC"};
// Whether to rethrow CUDA Errors in the watchdog (default true)
static std::vector<std::string> TORCH_NCCL_RETHROW_CUDA_ERRORS = {
"TORCH_NCCL_RETHROW_CUDA_ERRORS"};
// The maximum number of events we store in the flight recorder's ring buffer.
// (One event could be the start or end of a collective, for example).
static std::vector<std::string> TORCH_NCCL_TRACE_BUFFER_SIZE = {
"TORCH_NCCL_TRACE_BUFFER_SIZE"};
// Control how much extra time we will wait for dumping the debugging info
// before we exit and throws timeout exception.
static std::vector<std::string> TORCH_NCCL_WAIT_TIMEOUT_DUMP_MILSEC = {
"TORCH_NCCL_WAIT_TIMEOUT_DUMP_MILSEC"};
// Control the interval inside the monitoring thread to check the coordinated
// signal from other ranks, e.g. to dump the debugging information.
static std::vector<std::string> TORCH_NCCL_COORD_CHECK_MILSEC = {
"TORCH_NCCL_COORD_CHECK_MILSEC"};
// Whether to log C++ stack traces on unclean shutdown (default true)
static std::vector<std::string> TORCH_NCCL_LOG_CPP_STACK_ON_UNCLEAN_SHUTDOWN = {
"TORCH_NCCL_LOG_CPP_STACK_ON_UNCLEAN_SHUTDOWN"};
// Control whether to use CudaEventCache for the collective in watchdog thread.
// We noticed in the past when cuda global lock is held, destroying CudaEvent
// can cause a hang.
static std::vector<std::string> TORCH_NCCL_CUDA_EVENT_CACHE = {
"TORCH_NCCL_CUDA_EVENT_CACHE"};
static std::vector<std::string> TORCH_NCCL_NAN_CHECK = {"TORCH_NCCL_NAN_CHECK"};
constexpr const char* NCCL_BACKEND_NAME = "nccl";
constexpr const char* EXCEPTION_DUMP = "exception_dump";
constexpr const int kWorkStatusUpdatePeriodMs = 30 * 1000; // 30 seconds
constexpr auto kProcessGroupNCCLDefaultTimeout =
std::chrono::milliseconds(10 * 60 * 1000);
// NoHandling: do not handle asynchronous NCCL errors
// TearDown: tear down process upon error, see `WorkNCCL::handleException`
// CleanUpOnly: just clean up collectives and abort communicators without
// tearing down process SkipCleanUp: (this is a temporary option and can be
// removed in future) tear down process without cleaning up NCCL communicators.
// This should be used as a last resort in case `ncclCommAbort` itself is
// hanging
enum ErrorHandlingMode {
NoHandling = 0,
TearDown = 1,
CleanUpOnly = 2,
SkipCleanUp = 3
};
#define SHOULD_CLEAN_UP(a) (a != NoHandling && a != SkipCleanUp)
#define SHOULD_TEAR_DOWN(a) (a != NoHandling && a != CleanUpOnly)
#define PRINT_COLLECTIVE_HASH_SIGNATURE(phase, opType, numel, hashValue) \
LOG(WARNING) << logPrefix() << "Hash of " << phase << " to NCCL " << opType \
<< " with size " << numel << " is " << hashValue;
// If set, ProcessGroupNCCL doesn't use recordStream calls to ensure
// caching allocator safety for tensors used on both user-facing and
// internal comm streams.
// Instead, it stashes live references to those tensors until after
// user-facing streams are synced with comm streams.
// See stashed_for_allocator_safety_ below.
static std::vector<std::string> TORCH_NCCL_AVOID_RECORD_STREAMS = {
"TORCH_NCCL_AVOID_RECORD_STREAMS"};
// If set, ProcessGroupNCCL registers postAlloc and preFree hooks to cuda cache
// allocator so that whenever a tensor is allocated or freed, ProcessGroupNCCL
// can register/deregister the tensor on all available NCCL communicators.
static std::vector<std::string> TORCH_NCCL_USE_TENSOR_REGISTER_ALLOCATOR_HOOK =
{"TORCH_NCCL_USE_TENSOR_REGISTER_ALLOCATOR_HOOK",
"NCCL_USE_TENSOR_REGISTER_ALLOCATOR_HOOK"};
#if defined(__linux__)
struct DumpPipe {
DumpPipe(int rank) {
std::string fileStem =
getCvarString({"TORCH_NCCL_DEBUG_INFO_PIPE_FILE"}, "");
if (fileStem.empty() ||
getCvarInt({"TORCH_NCCL_TRACE_BUFFER_SIZE"}, 0) <= 0) {
return;
}
TORCH_CHECK(!fileStem.empty(), "TORCH_NCCL_DEBUG_INFO_TEMP_FILE is empty");
std::string filename = c10::str(fileStem, rank, ".pipe");
TORCH_CHECK(
unlink(filename.c_str()) != -1 || errno == ENOENT,
"Error removing existing named pipe ",
filename);
TORCH_CHECK(
mkfifo(filename.c_str(), 0666) != -1,
"Error creating named pipe ",
filename);
fd_ = open(filename.c_str(), O_RDONLY | O_NONBLOCK);
LOG(INFO) << "Pipe file " << filename
<< " has been opened, write to it to trigger NCCL Debug Dump.";
TORCH_CHECK(fd_ != -1, "Error opening named pipe ", filename);
}
bool shouldDump() {
if (fd_ == -1) {
return false;
}
char buf[128];
// non-blocking from O_NONBLOCK above.
// Ignore EINTR because we already will poll this
// again later.
ssize_t bytesRead = read(fd_, &buf, 128);
return bytesRead > 0;
}
~DumpPipe() {
if (fd_ != -1) {
close(fd_);
}
}
private:
int fd_ = -1;
};
#else
struct DumpPipe {
DumpPipe(int rank) {}
bool shouldDump() {
return false;
}
};
#endif
// ProcessGroupNCCL implements NCCL bindings for c10d.
//
// All functions of the class are expected to be called in the same order
// across all processes in the process group. This is the only way that we
// can guarantee to match up the same calls among all processes.
//
// All NCCL functions provided by this class are asynchronous functions. More
// specifically, each NCCL call is scheduled on a separate CUDA stream that is
// different from the current CUDA stream. This is for the purpose of
// achieving potentially concurrency and better performance. As a result,
// it is the callers' responsibility to make sure that the CUDA stream their
// code works on needs to wait for the NCCL operation from
// this class.
//
// This can be done by calling:
//
// either WorkNCCL::wait() or WorkNCCL::synchronize(), both achieves the same
// functionality and are synonyms.
//
// Also note that WorkNCCL::finishedGPUExecution() is a helper function only
// provided by ProcessGroupNCCL to check if the NCCL operation of WorkNCCL has
// finished execution on the GPU (not just scheduled).
//
// Example on using the NCCL process group
//
// ProcessGroupNCCL pg(store, rank, size);
// std::shared_ptr<WorkNCCL> work = pg.allreduce(tensors);
//
// // At this point, NCCL kernel has already by queued successfully
// // Now, let current stream wait for the NCCL to finish, this function is
// // async operation as well
//
// work->wait()
//
// // Now continue on other work in the current stream.
class TORCH_API ProcessGroupNCCL : public Backend {
public:
class WorkNCCL : public Work, public std::enable_shared_from_this<WorkNCCL> {
public:
friend struct WorkInfo;
// Constructor takes a list of CUDA devices
WorkNCCL(
const std::string& pgUID,
const std::string& pgDesc,
at::Device& device,
int rank,
OpType opType,
uint64_t seq,
const char* profilingTitle = nullptr,
const std::optional<std::vector<at::Tensor>>& inputs = std::nullopt,
bool desyncDebug = false,
bool enableTiming = false,
bool cudaEventCacheEnabled = false,
DebugLevel distDebugLevel = DebugLevel::Off);
// Copy constructor doing partial copy without outputs_. Cleanup thread
// monitors and removes finished works. However it will deadlock when
// destructs outputs_ tensors who are view tensors in autograd graph.
WorkNCCL(const WorkNCCL& w);
~WorkNCCL() override;
// Checks if the NCCL kernel has started to execute.
bool isStarted();
// Checks if request has completed. In this specific case of NCCL, it checks
// if the NCCL operation has completed on the GPU in its own NCCL stream.
// Non-blocking operation.
bool isCompleted() override;
bool isSuccess() const override;
// Same as calling synchronize() for NCCL work.
bool wait(std::chrono::milliseconds timeout = kNoTimeout) override;
void abort() override;
// Let current stream wait on the completing of the NCCL work
// Throws on exceptions. Blocking operation, which will wait for work
// completion.
void synchronize() override;
// Synchronize streams by blocking each on the NCCL stream
void synchronizeStream();
// Helper function to handle exception (throw if needed).
void handleException(ErrorHandlingMode asyncErrorHandling);
// Helper function that checks if the NCCL kernels have finished
// execution on the GPUs
bool finishedGPUExecution();
// Get a Future object that will be marked as completed internally.
c10::intrusive_ptr<c10::ivalue::Future> getFuture() override;
float getDuration() const override;
uint64_t getSequencenumber() const override;
const std::string& logPrefix() const;
// Helper function that sets an exception_ptr on the WorkNCCL object.
void setException(std::exception_ptr exception_ptr);
// Helper function that returns True if the WorkNCCL object has timed out
// and False otherwise.
// In case of timeout, set exception on the WorkNCCL object.
bool checkTimeout(
std::optional<std::chrono::milliseconds> timeout = std::nullopt);
std::vector<at::Tensor> result() override;
protected:
// The process group unique id
std::string pgUID_;
// The process group description
std::string pgDesc_;
// The cached list of CUDA devices to operate on
at::Device device_;
// The start CUDA event of NCCL operator tracking this work item. These
// start CUDA events are needed by desync debugging if enabled.
std::shared_ptr<at::cuda::CUDAEvent> ncclStartEvent_;
// The end CUDA event of NCCL operator tracking this work item.
std::shared_ptr<at::cuda::CUDAEvent> ncclEndEvent_;
// The NCCL communicator used for this work item.
std::shared_ptr<NCCLComm> ncclComm_;
// Tensors used for barrier op
at::Tensor barrierTensor_;
// Clone of blockingWait_ from ProcessGroupNCCL.
bool blockingWait_ = false;
// Clone of avoidRecordStreams_ from ProcessGroupNCCL.
bool avoidRecordStreams_ = false;
// Clone of opTimeout_ from ProcessGroupNCCL.
std::chrono::milliseconds opTimeout_;
// Ephemeral timeouts are owned by exactly one work,
// and reset after that work completes.
// There may be more than one ephemeral timeout active at the same time,
// and this variable is used to track the ownership of ephemeral timeout.
std::chrono::milliseconds ownedEphermeralTimeout_ =
std::chrono::milliseconds(0);
// Time point representing when the work started.
std::chrono::time_point<std::chrono::steady_clock> workStartTime_;
// Record the collective sequential number.
uint64_t seq_;
// Indicates if the nccl start event has been updated to the store trace.
// This will be used by desync debug.
bool startTraceUpdated_{false};
// Record collective sizes for debug. We only record the size on the first
// device as multi-device per process is deprecated
size_t numelIn_ = -1;
size_t numelOut_ = -1;
// Wrapper method for the static checkForNCCLErrors which can be overridden
// for tests.
virtual std::exception_ptr checkForNCCLErrors();
friend std::ostream& operator<<(
std::ostream& output,
const WorkNCCL& workNCCL);
private:
// Helper function for synchronize
void synchronizeInternal(std::chrono::milliseconds timeout);
// Checks for NCCL errors and sets an appropriate exception_ptr.
void checkAndSetException();
// Just checks whether GPU execution has started, without modifying
// exception_ptr.
bool startedGPUExecutionInternal() const;
// Just checks whether GPU execution has completed, without modifying
// exception_ptr.
bool finishedGPUExecutionInternal() const;
// Reference to the store so that we can write aborted communicators
// to the store.
c10::intrusive_ptr<Store> store_;
// Store a reference to NCCL collective's outputs, used by result and to
// give a more descriptive message when representing the Work as a string.
std::shared_ptr<std::vector<at::Tensor>> outputs_;
// TORCH_NCCL_AVOID_RECORD_STREAMS implementation helper.
// Stores references to participating non-output tensors (ie inputs,
// flattened intermediates).
// We'll clear this list in synchronizeStream, just after user-facing
// stream(s) are synced with the nccl work stream(s).
// By keeping these refs (as well as outputs_) alive until after the
// collective's work rejoins the user-facing streams, we achieve
// caching allocator safety without any recordStream calls.
// For in-place collectives, some refs stashed here may alias outputs_,
// but that doesn't do any harm.
std::shared_ptr<std::vector<at::Tensor>> stashed_for_allocator_safety_;
// The future returned by getFuture.
c10::intrusive_ptr<at::ivalue::Future> future_;
bool timingEnabled_;
// unique id used to tell the trace buffer that this
// work has completed
std::optional<uint64_t> trace_id_;
DebugLevel distDebugLevel_;
friend class ProcessGroupNCCL;
};
class CUDAEventCache {
public:
CUDAEventCache();
std::shared_ptr<at::cuda::CUDAEvent> create(bool timing);
static CUDAEventCache& get();
private:
std::timed_mutex cacheMutex_;
// NOTE: We intentionaly store raw pointers so that
// we do not attempt to destroy the event objects on process exit,
// because cuda may be gone.
std::vector<at::cuda::CUDAEvent*>
eventsArray_[2]; // 0 for timing=false, 1 for timing=true
};
struct Options : Backend::Options {
// NOTE: timeout in ProcessGroupNCCL::Options denote the timeout for
// operations. This is only used when blockingWait_ is enabled.
explicit Options(bool is_high_priority_stream = false);
// return intrusive_ptr of the object
static c10::intrusive_ptr<Options> create(
bool is_high_priority_stream = false) {
return c10::make_intrusive<Options>(is_high_priority_stream);
}
// Schedule NCCL operations on high priority CUDA streams
bool is_high_priority_stream;
#ifdef NCCL_HAS_COMM_NONBLOCKING
// Configure ranks
ncclConfig_t config = NCCL_CONFIG_INITIALIZER;
#endif
// Optional "parent" backend and color to create communicators from
// via `ncclCommSplit`
std::shared_ptr<ProcessGroupNCCL> split_from;
int64_t split_color{0};
std::vector<uint64_t> global_ranks_in_group;
std::string group_name;
};
// If you wish to create multiple process groups, each with a potentially
// different rank and size, you can do so by passing a new store instance
// to each one. If you have only a single store object, you can
// use the `c10d::PrefixStore` to derive scoped instances.
// This is also what the Python API in torch.distributed does.
//
// The process group instance keeps a reference to the store because
// it may be used long after the constructor runs. In fact, the constructor
// doesn't create any NCCL communicators. A single NCCL communicator can
// only be used on a specific set of devices, and are therefore created
// on-demand when a collective runs. If another collective is executed later,
// against a different set of devices, the process group creates another NCCL
// communicator. These NCCL communicators are cached and reused if possible.
//
ProcessGroupNCCL(
const c10::intrusive_ptr<Store>& store,
int rank,
int size,
c10::intrusive_ptr<Options> options = Options::create());
// This constructor includes the deprecated `groupName` argument.
// If you have existing code that uses the `groupName`, you can replace
// it by specifying a `c10d::PrefixStore(groupName, store)` for store.
C10_DEPRECATED ProcessGroupNCCL(
const c10::intrusive_ptr<Store>& store,
int rank,
int size,
const std::string& groupName,
c10::intrusive_ptr<Options> options = Options::create())
: ProcessGroupNCCL(store, rank, size, options) {}
~ProcessGroupNCCL() override;
// This function returns a local uid for ProcessGroupNCCL.
uint64_t getUid() {
return static_cast<uint64_t>(local_id_);
}
c10::intrusive_ptr<Options> getOptions() {
return options_;
}
const std::string getBackendName() const override {
return std::string(NCCL_BACKEND_NAME);
}
bool supportsSplitting() const override {
return true;
}
void startCoalescing() override;
c10::intrusive_ptr<Work> endCoalescing() override;
// For specifying a composite optype, such as ALLGATHER and REDUCE_SCATTER
c10::intrusive_ptr<Work> endCoalescing(OpType optype);
c10::intrusive_ptr<Work> broadcast(
std::vector<at::Tensor>& tensors,
const BroadcastOptions& opts = BroadcastOptions()) override;
c10::intrusive_ptr<Work> _broadcast_oop(
at::Tensor& outputTensors,
at::Tensor& inputTensors,
const BroadcastOptions& opts = BroadcastOptions());
c10::intrusive_ptr<Work> allreduce_sparse(
std::vector<at::Tensor>& tensors,
const AllreduceOptions& opts = AllreduceOptions()) override;
c10::intrusive_ptr<Work> allreduce(
std::vector<at::Tensor>& tensors,
const AllreduceOptions& opts = AllreduceOptions()) override;
c10::intrusive_ptr<Work> allreduce_coalesced(
std::vector<at::Tensor>& tensors,
const AllreduceCoalescedOptions& opts =
AllreduceCoalescedOptions()) override;
c10::intrusive_ptr<Work> reduce(
std::vector<at::Tensor>& tensors,
const ReduceOptions& opts = ReduceOptions()) override;
c10::intrusive_ptr<Work> _reduce_oop(
at::Tensor& outputTensors,
at::Tensor& inputTensors,
const ReduceOptions& opts = ReduceOptions());
c10::intrusive_ptr<Work> allgather(
std::vector<std::vector<at::Tensor>>& outputTensors,
std::vector<at::Tensor>& inputTensors,
const AllgatherOptions& opts = AllgatherOptions()) override;
c10::intrusive_ptr<Work> _allgather_base(
at::Tensor& outputbuffer,
at::Tensor& inputbuffer,
const AllgatherOptions& opts = AllgatherOptions()) override;
c10::intrusive_ptr<Work> allgather_coalesced(
std::vector<std::vector<at::Tensor>>& outputTensorLists,
std::vector<at::Tensor>& inputTensors,
const AllgatherOptions& opts = AllgatherOptions()) override;
c10::intrusive_ptr<Work> allgather_into_tensor_coalesced(
std::vector<at::Tensor>& outputs,
std::vector<at::Tensor>& inputs,
const AllgatherOptions& opts = AllgatherOptions()) override;
c10::intrusive_ptr<Work> reduce_scatter(
std::vector<at::Tensor>& outputTensors,
std::vector<std::vector<at::Tensor>>& inputTensors,
const ReduceScatterOptions& opts = ReduceScatterOptions()) override;
c10::intrusive_ptr<Work> _reduce_scatter_base(
at::Tensor& outputTensor,
at::Tensor& inputTensor,
const ReduceScatterOptions& opts = ReduceScatterOptions()) override;
c10::intrusive_ptr<Work> reduce_scatter_tensor_coalesced(
std::vector<at::Tensor>& outputs,
std::vector<at::Tensor>& inputs,
const ReduceScatterOptions& opts = ReduceScatterOptions()) override;
c10::intrusive_ptr<Work> barrier(
const BarrierOptions& opts = BarrierOptions()) override;
c10::intrusive_ptr<Work> alltoall_base(
at::Tensor& outputTensor,
at::Tensor& inputTensor,
std::vector<int64_t>& outputSplitSizes,
std::vector<int64_t>& inputSplitSizes,
const AllToAllOptions& opts = AllToAllOptions()) override;
c10::intrusive_ptr<Work> alltoall(
std::vector<at::Tensor>& outputTensors,
std::vector<at::Tensor>& inputTensors,
const AllToAllOptions& opts = AllToAllOptions()) override;
c10::intrusive_ptr<Work> send(
std::vector<at::Tensor>& tensors,
int dstRank,
int tag) override;
c10::intrusive_ptr<Work> recv(
std::vector<at::Tensor>& tensors,
int srcRank,
int tag) override;
void groupStart();
void groupEnd();
void groupEndNonblocking(std::shared_ptr<NCCLComm> comm);
c10::intrusive_ptr<Work> gather(
std::vector<std::vector<at::Tensor>>& outputTensors,
std::vector<at::Tensor>& inputTensors,
const GatherOptions& opts = GatherOptions()) override;
c10::intrusive_ptr<Work> scatter(
std::vector<at::Tensor>& outputTensors,
std::vector<std::vector<at::Tensor>>& inputTensors,
const ScatterOptions& opts = ScatterOptions()) override;
// Unsupported Ops
c10::intrusive_ptr<Work> recvAnysource(
std::vector<at::Tensor>& tensors,
int tag) override;
// Agrees on an initial sequence number for the whole group by having rank 0
// create it and broadcast it to other ranks using the store.
void setSequenceNumberForGroup() override;
// Retrieves the current sequence number for the whole group, which should be
// in sync. If the returned number is not consistent across the group, it
// may indicate that there is some sort of collective desynchronization.
uint64_t getSequenceNumberForGroup() override;
// Return the total number of splits the communicators held by this process
// group have performed. Counts ncclCommCreateFromRanks() for ncclx v2.21.5+
uint64_t getCommSplitCounter() const;
void registerOnCompletionHook(
std::function<void(std::shared_ptr<WorkInfo>)>&& hook) override;
void waitForPendingWorks() override;
void enableCollectivesTiming() override;
// Helper function for iteratively aborting communicators in the provided map
void abortCommsFromMap(
std::unordered_map<std::string, std::shared_ptr<NCCLComm>>& ncclCommsMap,
std::optional<std::string> abortReason);
c10::intrusive_ptr<intra_node_comm::IntraNodeComm> initIntraNodeComm();
// Provides an API to abort the ProcessGroup (similar to ncclCommAbort)
// instead of relying on ProcessGroupNCCL destructor.
// return true if abort is successful, otherwise false
bool abort(std::optional<std::string> abortReason = std::nullopt);
void shutdown(std::optional<std::string> reason = std::nullopt);
void eagerConnectSingleDevice(at::Device device) override;
void performNocolorSplit(at::Device device);
// This method adds a temporary extension for the timeout period,
// applying to all collectives between the calling of this API and
// the completion of the first collective on the GPU. While this feature
// provides flexibility in specific scenarios, it introduces statefulness
// to timeout setting. Therefore, it is advisable to use this API sparingly
// and consider alternative approaches, such as directly setting the timeout
// or utilizing a barrier collective (one can set any timeout to the barrier),
// whenever feasible.
void addEphemeralTimeout(const std::chrono::milliseconds& timeout);
// This function is only intended for testing purposes because we don't
// want to expose the `WorkNCCL` via pybind. It verifies whether the
// `opTimeout_` of the provided WorkNCCL instance is the same as the specified
// timeout.
bool verifyWorkTimeoutForTest(
const c10::intrusive_ptr<Work> work,
const std::chrono::milliseconds& timeout);
protected:
// Helper that broadcasts nccl unique ID to all ranks through the store
void broadcastUniqueNCCLID(
ncclUniqueId* ncclID,
bool isSingleP2POp,
const std::string& devicesKey,
int p2pRank);
// Helper that either looks up the cached NCCL communicators or creates
// a new set of NCCL communicators as a cache entry
std::shared_ptr<NCCLComm> getNCCLComm(
const std::string& deviceKey,
at::Device& device,
OpType opType,
int p2pRank = 0,
bool isSendRecvSelf = false);
// Wrapper method which can be overridden for tests.
virtual std::exception_ptr checkForNCCLErrors(
std::shared_ptr<NCCLComm>& ncclComm);
// Ensure thaht if record is True, the work obj will be enqueued via
// workEnqueue
virtual c10::intrusive_ptr<ProcessGroupNCCL::WorkNCCL> initWork(
at::Device& device,
int rank,
OpType opType,
const char* profilingTitle = nullptr,
const std::vector<at::Tensor>& inputs = {},
const std::vector<at::Tensor>& outputs = {},
bool record = false);
// In the timeout case and we will dump debug info such as the NCCL flight
// recorder to storage. Down the road, if we have more complicated or blocking
// operations, we might need to use a side thread to do it.
bool dumpDebuggingInfo();
private:
int globalRankStart;
int globalRankStride;
// Helper that encapsulates work shared across all collective communication
// primitives. The callbacks have the following signatures:
//
// ncclResult_t fn(at::Tensor& input, at::Tensor& output,
// ncclComm_t, at::cuda::CUDAStream&);
// void {pre,post}(std::vector<at::cuda::CUDAStream&>);
template <typename Fn>
c10::intrusive_ptr<Work> collective(
at::Tensor& input,
at::Tensor& output,
Fn fn,
OpType opType,
const char* profilingTitle = nullptr,
bool avoidRecordStreams = false);
template <typename Fn, typename PreProcess, typename PostProcess>
c10::intrusive_ptr<Work> collective(
at::Tensor& input,
at::Tensor& output,
Fn fn,
PreProcess pre,
PostProcess post,
OpType opType,
const char* profilingTitle = nullptr,
bool avoidRecordStreams = false);
template <typename Fn>
c10::intrusive_ptr<Work> collectiveCoalesced(
std::vector<at::Tensor>& input,
std::vector<at::Tensor>& output,
Fn fn,
OpType opType,
const char* profilingTitle = nullptr,
bool avoidRecordStreams = false);
// Helper that encapsulates work shared across point-to-point communication
// primitives. It is the same structure as the helper used for collective
// communication primitives.
template <typename Fn>
c10::intrusive_ptr<Work> pointToPoint(
at::Tensor& tensor,
Fn fn,
int peer,
OpType opType,
const char* profilingTitle = nullptr);
template <typename Fn, typename PreProcess, typename PostProcess>
c10::intrusive_ptr<Work> pointToPoint(
at::Tensor& tensor,
Fn fn,
int peer,
OpType opType,
PreProcess pre,
PostProcess post,
const char* profilingTitle);
c10::intrusive_ptr<Work> allreduce_impl(
at::Tensor& tensor,
const AllreduceOptions& opts = AllreduceOptions());
// Checks for NCCL errors on each of the communicators and returns an
// appropriate exception_ptr (nullptr if no errors).
static std::exception_ptr checkForNCCLErrorsInternal(
std::shared_ptr<NCCLComm>& ncclComm);
// Function that runs as part of a separate thread and checks for errors on
// NCCL communicators. We need a separate thread to check for NCCL errors
// since we can't rely on the user calling certain methods like wait(),
// isCompleted() etc. to detect and remediate errors. In addition to this, we
// need a mechanism to safely abort and remove NCCL communicators from our
// cache. This can be done cleanly by having a thread for the ProcessGroupNCCL
// class. Attempting to modify the communicator cache from the WorkNCCL class
// might run into issues with object lifetime since the ProcessGroupNCCL
// object might get destroyed before the WorkNCCL object.
void ncclCommWatchdog();
// Return the CUDA device most likely associated with this backend.
// If we aren't bound to a specific device, there is no strict
// guarantee that this heuristic is the correct assignment of ranks
// to GPUs that Python layers use, but in practice it tends to be.
// Fortunately we don't rely on this for correctness of any tensor
// operations, just for ancillary uses like barriers.
at::Device guessDeviceForRank() const;
// Destroys initialized NCCL communicators in devNCCLComMap_ given by input
// key. Throws if there are no communicators to destroy. Also removes
// communicators from the cache and clears used device indices.
void destroyNCCLComms(const std::string& devNCCLCommMapKey);
// Watchdog's inside loop.
// Takes care of cleaning up completed work, and aborting upon failure or
// timeout.
void watchdogHandler();
void runHookLoop();
// Desync debug helper
void logWorkStart(WorkNCCL& work);
// Desync debug helper
void logWorkEnd(WorkNCCL& work);
// Generates a prefix that is unique to this process group and rank, for
// disambiguating logs
std::string createLogPrefix() const;
// Returns the unique prefix created in createLogPrefix
const std::string& logPrefix() const;
// Returns the global rank of the device. This function assumes that users
// always create a default global process group(PG) which includes all
// devices. It is called in the constructor of ProcessGroupNCCL, so it always
// return the rank_ of the the very first PG created, aka, default global PG.
const int& globalRank() const;
// Returns the global ranks of a PG.
const std::vector<uint64_t>& groupRanks() const;
// Util function to assign timeout to each work.
void assignTimeoutToWork(
const c10::intrusive_ptr<ProcessGroupNCCL::WorkNCCL>& work,
const c10::intrusive_ptr<Options>& option);
protected:
// Function that runs as part of a separate thread aside from watchdog
// thread because we need to check the heartbeat from watchdog thread
// so that when we get stuck in some NCCL/CUDA calls,
// we can dump the debugging information and abort the process.
virtual void heartbeatMonitor();
// Function that directly trigger std::abort so that the whole process
// gets terminated.
virtual void terminateProcess(std::string errMsg);
// A helper function to wait for a future to complete or timeout.
void waitForFutureOrTimeout(
std::future<bool>& fut,
const std::chrono::milliseconds& timeOutMilSec,
const std::string& futDescription,
bool throwException = false);
// When watchdog timeout, this function will be called and return debug info
// for users. For now we only get information from retrieveDesyncReport.
// We are working on enabling more useful debug information for watchdog
// timeout.
virtual std::string getNCCLWatchdogDebugInfo();
std::string getNCCLWatchdogTimeoutErrorMsg(const std::string& extraMsg);
std::string getNCCLWatchdogTimeoutExitMsg(const std::string& exitReason);
static const int64_t kWatchdogThreadSleepMillis;
// The store is used to broadcast the NCCL unique ID of rank 0. This store
// comes with prefix and it is different across ProcessGroup NCCL instances
// (aka, different ProcessGroups).
c10::intrusive_ptr<Store> store_;
// Reference to the store without prefix so that keys are same across all
// ProcessGroup NCCL instances and (key, value) pairs written to the store are
// global.
c10::intrusive_ptr<Store> globalStore_;
bool storeError_{false};
// The lock which protects the write/read of
// ephemeralTimeoutActive_/ephemeralTimeoutInflight_.
// TODO(fduwjj): We need to have an audit on all mutexes we are adding here.
// And consolidate them if possible.
std::timed_mutex mtxTimeoutExtension_;
// The ephemeral timeout added on top of existing timeout for works issued
// before first work finishes.
std::chrono::milliseconds ephemeralTimeoutActive_ =
std::chrono::milliseconds(0);
// The ephemeral timeout addition which has been already applied to work.
std::chrono::milliseconds ephemeralTimeoutInflight_ =
std::chrono::milliseconds(0);
const c10::intrusive_ptr<Options> options_;
// The number of NCCL communicators that have been created during
// the lifetime of this process group. This sequence number is
// used to scope keys used in the store.
uint64_t ncclCommCounter_{0};
// The store keys to trace the last NCCL collective kernel CUDA events - start
// event and end event respectively. These are used to do desync root cause
// analysis.
const std::string traceKeyStart_;
const std::string traceKeyEnd_;
// The NCCL communicator that the process group has cached.
//
// For collective operations:
// The key is a list of GPU devices that an operation is operating on
// The GPU devices are stored in a device sequence and the cache NCCL
// communicator is associated with this GPU device sequence
//
// e.g. If the process group op only uses device 0, then the value of
// the used device string stored (value of the hashmap) would be "0".
//
// If the process group op uses device 0 - 7 and the each tensor of the
// input tensor list is on device, 0, 1, 2, 3, 4, 5, 6, 7 separately,
// then the value of the used device string (key) stored would be
// "0,1,2,3,4,5,6,7"
//
// If the process group op uses device 0 - 7 and the each tensor of the
// input tensor list is on device, 0, 4, 5, 6, 7, 1, 2, 3 separately,
// then the value of the used device string stored would be
// "0,4,5,6,7,1,2,3"
//
// Note that the order of the device for the tensor list matters.
//
// For point-to-point operations:
// The key is a string of my current rank and the peer process rank.
// e.g. If process 1 and process 2 are involved in a point-to-point
// communication, the key will be "1:2" on both processes. Note: this is for
// the scenario where there is only 1 GPU per process. When it comes to
// multiple GPUs per process, this part may need to redesigned.
// TODO: we probably need a separte map for P2P comms
std::unordered_map<std::string, std::shared_ptr<NCCLComm>> devNCCLCommMap_;
// The NCCL communicators currently in process of being initialized.
std::unordered_map<std::string, std::shared_ptr<NCCLComm>>
inInitializationCommMap_;
// Mutex to guard maps like devNCCLCommMap_.
std::timed_mutex mutex_;
// Heartbeat of watchdog thread.
std::atomic_uint64_t heartbeat_;
// The time interval used for deciding whether there is no watchdog heartbeat.
int heartbeatTimeoutInSec_;
// timeout for the dump to finish.
int waitTimeoutDumpInMilSec_;
// Interval of check coordinated signals in ProcessGroupNCCL from other ranks
// e.g., trigger the dump of the debugging info for timeout when notified.
int coordCheckIntervalMilSec_;
// Size of ring buffer where we store NCCL Traces for debugging.
int ncclTraceBufferSize_;
// We gate the heartbeat monitor thread so that we can roll it out gradually.
std::atomic<bool> monitorThreadEnabled_;
// We gate the cudaEventCache so that we can roll it out gradually.
std::atomic<bool> cudaEventCacheEnabled_;
// Monitor thread which checks the heartbeat of Watchdog thread.
// If the monitor thread finds there is no heartbeat, it will dump debug info
// and then kill the watchdog thread to avoid hang.
std::thread ncclHeartbeatMonitorThread_;
// Watchdog thread which looks for errors on the cached NCCL communicators.
std::thread ncclCommWatchdogThread_;
std::thread onCompletionHookThread_;
// Whether or not we should terminate the watchdog and workCleanup threads.
std::atomic<bool> terminateProcessGroup_;
// Whether or not we should terminate the heartbeat monitoring threads.
std::atomic<bool> terminateHeartbeatMonitorThread_;
// Whether we are in the shutdown mode when we are trying to get debug info,
// such as desync report.
std::atomic<bool> collectiveDebugInfoMode_;
// Whether there are hooks pending to be fired
std::atomic<bool> hasPendingHooks_;
// This is the signal from watchdog threads to indicate whether the monitor
// thread should dump. Making it static so that it is accessiable from all the
// PGs. With this flag, monitor thread would dump debug info under any one of
// the three conditions:
//
// 1: watchdog thread of any PG detects a collective timeout.
// 2: timeout signal is received from other ranks through tcpstore.
// 3: current PG's watchdog heartbeat timeout occurs.
//
// Note that only the monitor thread from PG0 will dump the debug info for
// case one and two so that the debug info is only dumped once.
static std::atomic<bool> shouldDump_;
// Mutex to Guard workMetaList_
std::timed_mutex workMetaListMutex_;
// Mutex to Guard monitorWakeUpCV_
std::timed_mutex monitorMutex_;
bool writeDebugInfo_ = false;
// Condition Variable for watchdog thread sleep
std::condition_variable_any workMetaListCV_;
// Condition Variable for monitor thread to wake up early
std::condition_variable_any monitorWakeUpCV_;
// Vector to Store WorkNCCL pointers
std::list<ProcessGroupNCCL::WorkNCCL> workMetaList_;
std::chrono::time_point<std::chrono::steady_clock> lastWorkListUpdateTime_;
// Mutex to Guard workMetaList_
std::timed_mutex completedWorkListMutex_;
// Condition Variable for watchdog thread sleep
std::condition_variable_any completedWorkListCV_;
std::list<ProcessGroupNCCL::WorkNCCL> completedWorkList_;
// Add Work Pointer to workVector
void workEnqueue(c10::intrusive_ptr<ProcessGroupNCCL::WorkNCCL>);
// The CUDA streams used by NCCL kernels
std::unordered_map<std::string, at::cuda::CUDAStream> ncclStreams_;
// The CUDA events used to sync NCCL streams
std::unordered_map<std::string, at::cuda::CUDAEvent> ncclEvents_;
// Device Indexes used for all collectives in this group
std::set<int> usedDeviceIdxs_;
// Flag to denote if a coalescing groupStart/groupEnd block is active
int coalescing_state_ = 0;
// Stores device indexes for all collectives run inside a coalescing block
at::Device coalescedDevice_ = at::Device("cuda");
// Stores communicators for all collectives run inside a coalescing block
std::shared_ptr<NCCLComm> coalescedComm_ = nullptr;
// map from the key: "group name + pg counter (ID)" to the
// unique NCCL ID count. This needs to be group and pg specific
//
// For each process group, we need a uniform unique NCCL ID counter to ensure
// that NCCL operation in this process group can be completed successfully.
// Since each process group ID belongs to a group name, the key to this map
// is a combination of group name and ProcessGroupNCCL ID.
static std::unordered_map<std::string, ssize_t> pgUniqueNCCLIDCnt_;
// map from group name to the pg counter (ID) within that group
//
// For each group with the "group name" (which is the key), we need to
// keep track of a unique process group ID when creating a new
// ProcessGroupNCCL for this "group name". Therefore, the value of this
// map keeps the unique ProcessGroupNCCL's ID for a specific group with
// the "group name". The reason we need a per-group process group ID counter
// is that different group can have different ranks and we need ensure that
// each group has its own uniform process group ID for all its ranks.
static std::unordered_map<std::string, ssize_t> processGroupCounterMap_;
// Whether or not wait() and synchronize() are blocking operations that wait
// for the operation to complete.
bool blockingWait_ = false;
// Whether or not to hook the cache allocator to register all allocated
// tensors
bool useTensorRegisterAllocatorHook_ = false;
// Whether or not the workCleanupThread is used to perform async error
// handling.
ErrorHandlingMode asyncErrorHandling_ = NoHandling;
// Whether or not to enable timeout root cause analysis.
bool desyncDebug_;
// Whether or not to dump debug info on exception including both watchdog
// timeout and nccl errors.
bool dumpOnTimeoutOrEx_;
// Whether or not to enable nan check for input tensors to collectives.
bool enableNanCheck_;
// Whether or not to print C++ stack traces to logs on unclean shutdown.
bool logCppStackOnUncleanShutdown_;
// Whether or not to create start CUDAEvent and enable timing for start
// and end events. Note that enableTiming_ is always true if desyncDebug_
// is set to true.
std::atomic<bool> enableTiming_;
// Flag to enable the print of hash value of input/output of collectives for
// verification.
std::atomic<bool> enableCollecticeHashDebug_;
// Whether or not TORCH_NCCL_AVOID_RECORD_STREAMS was set
bool avoidRecordStreams_ = false;
// Whether the NCCL watchdog should rethrow CUDA errors.
bool rethrowCUDAErrors_ = false;
// Set of communicators that this process group has aborted and their
// ncclUniqueId has been written to the store. We don't need a lock
// for this map since only the watchdog thread accesses this set. The
// set contains the string representation of ncclUniqueId.
std::unordered_set<std::string> abortedComms_;
// The number of active ncclGroupStart() calls. This counter will be increased
// by 1 when ncclGroupStart() is called and decreased by 1 when ncclGroupEnd()
// is called.
static thread_local uint64_t ncclActiveGroupCounter_;
// Counting for the sequential number of NCCL collective call.
// (specifically, how many actual kernels we launched, which differs from
// op_id_ when coalescing is enabled)
uint64_t seqCollective_{0};
// Counting for the sequential number of NCCL P2P calls.
uint64_t seqP2P_{0};
// Incrementing counter for logical operations (collective or p2p) issued on
// the ProcessGroup
uint64_t op_id_{0};
std::exception_ptr watchDogException_ = nullptr;
// The number of ProcessGroupNCCL created on the current rank.
size_t local_id_;
std::string logPrefix_;
c10::intrusive_ptr<intra_node_comm::IntraNodeComm> intraNodeComm_;
// Number of devices on this node.
int localDeviceCount_{0};
std::shared_ptr<ProcessGroupStatus> pgStatus_ =
std::make_shared<ProcessGroupStatus>();
};
// Dumps the NCCL comm traces and additional information about the Process
// Group.
TORCH_API std::string dump_nccl_trace(
bool includeCollectives,
bool includeStackTraces,
bool onlyActive);
// Dumps the NCCL comm traces and additional information about the Process
// Group in JSON formatted string.
// We don't include stack traces in JSON format as it is far too much data.
TORCH_API std::string dump_nccl_trace_json(
bool includeCollectives,
bool onlyActive);
// Gets a mutable reference to a global optional function.Heartbeat Monitor
// will use this function to dump traces, if available. Inside fbcode, we
// store a function here that uses an internal tool for process tracing
TORCH_API std::optional<
std::function<void(std::function<void(const std::string&)>)>>&
get_cpp_trace_dumper();
// Similar to get_cpp_trace_dumper, this stores a function defined in
// torch-python layer that lets us check whether the GIL can be acquired,
// helpful for instrumenting in cases where a hang was observed.
typedef bool (*gil_checker_t)();
TORCH_API gil_checker_t& get_gil_checker();
} // namespace c10d
#endif // USE_C10D_NCCL