#include <c10d/ProcessGroupNCCL.hpp>
#include <c10/util/Optional.h>
#include <exception>
#include <map>
#include <tuple>
#include <unordered_set>
#include <THC/THC.h>
#include <ATen/cuda/CUDAContext.h>
#include <c10/cuda/CUDAGuard.h>
#include <c10/util/Logging.h>
#include <c10d/ParamCommsUtils.hpp>
#include <torch/csrc/cuda/nccl.h>
#include <c10d/Utils.hpp>
namespace c10d {
constexpr const char* const kNCCLAbortedCommStoreKey = "NCCLABORTEDCOMM";
namespace {
constexpr int kBytes = 8;
// RAII helper class to manage NCCL group API and CUDA free mutex.
// The destructor is allowed to throw since this helper class only
// manages group and lock lifetimes.
struct AutoNcclGroup {
AutoNcclGroup() {
(c10::cuda::CUDACachingAllocator::getFreeMutex())->lock();
#if defined(NCCL_MAJOR) && (NCCL_MAJOR >= 2)
C10D_NCCL_CHECK(ncclGroupStart());
#endif
}
~AutoNcclGroup() noexcept(false) {
#if defined(NCCL_MAJOR) && (NCCL_MAJOR >= 2)
C10D_NCCL_CHECK(ncclGroupEnd());
#endif
(c10::cuda::CUDACachingAllocator::getFreeMutex())->unlock();
}
};
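// Illustrative usage sketch: the collective and point-to-point paths below
// scope their per-device NCCL calls with this guard, e.g.
//   {
//     AutoNcclGroup nccl_group_guard;
//     // issue ncclAllReduce / ncclSend / ... for each device here
//   }
// so the group start/end calls and the CUDA caching-allocator free mutex
// bracket the whole batch of NCCL calls.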
// NCCL op mapping
const std::map<ReduceOp, ncclRedOp_t> ncclOp = {
{ReduceOp::MIN, ncclMin},
{ReduceOp::MAX, ncclMax},
{ReduceOp::SUM, ncclSum},
{ReduceOp::PRODUCT, ncclProd},
};
// NCCL data type mapping
std::map<at::ScalarType, ncclDataType_t> ncclDataType = {
{at::kChar, ncclInt8},
{at::kByte, ncclUint8},
{at::kFloat, ncclFloat},
{at::kDouble, ncclDouble},
{at::kInt, ncclInt32},
{at::kLong, ncclInt64},
{at::kHalf, ncclHalf},
{at::kBool, ncclUint8},
#if defined(__HIP_PLATFORM_HCC__) && HIP_VERSION >= 301
{at::kBFloat16, ncclBfloat16},
#endif
};
// Helper function that gets the NCCL data type and issues an error if it is not supported
ncclDataType_t getNcclDataType(at::ScalarType type) {
auto it = ncclDataType.find(type);
TORCH_CHECK(
it != ncclDataType.end(),
"Input tensor data type is not supported for NCCL process group: ",
type);
return it->second;
}
ncclRedOp_t getNcclReduceOp(const ReduceOp reduceOp, at::Tensor& input) {
try {
if (reduceOp == ReduceOp::SUM && input.scalar_type() == at::kBool) {
// For bool tensors, map sum to max, which both represent a bitwise or.
// This is to prevent overflow issues with sum, since we use uint8 to
// represent a bool (see ncclDataType mapping).
return ncclMax;
}
return ncclOp.at(reduceOp);
} catch (const std::out_of_range& e) {
switch (reduceOp) {
case ReduceOp::BAND:
throw std::runtime_error("Cannot use ReduceOp.BAND with NCCL");
break;
case ReduceOp::BOR:
throw std::runtime_error("Cannot use ReduceOp.BOR with NCCL");
break;
case ReduceOp::BXOR:
throw std::runtime_error("Cannot use ReduceOp.BXOR with NCCL");
break;
default:
throw std::runtime_error("Unhandled ReduceOp");
break;
}
}
}
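// Example (illustrative): for a bool tensor, getNcclReduceOp(ReduceOp::SUM, t)
// returns ncclMax rather than ncclSum, so the uint8-backed bool values behave
// like a bitwise OR instead of overflowing.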
// Get the comma-separated device-list string from the list of devices
std::string getKeyFromDevices(const std::vector<at::Device>& devices) {
std::string deviceList;
for (auto& device : devices) {
if (deviceList.empty()) {
deviceList = std::to_string(device.index());
} else {
deviceList += "," + std::to_string(device.index());
}
}
return deviceList;
}
std::string getKeySendRecv(int myRank, int peer) {
int lowRank = myRank < peer ? myRank : peer;
int highRank = myRank < peer ? peer : myRank;
std::string sendRecvPair = std::to_string(lowRank) + ":" + std::to_string(highRank);
return sendRecvPair;
}
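// Examples (illustrative): getKeyFromDevices({cuda:0, cuda:2}) yields "0,2",
// and getKeySendRecv(3, 1) yields "1:3" (the lower rank comes first, so both
// peers compute the same key).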
// Get the list of devices from list of tensors
std::vector<at::Device> getDeviceList(const std::vector<at::Tensor>& tensors) {
std::vector<at::Device> res;
res.reserve(tensors.size());
for (auto& tensor : tensors) {
res.push_back(tensor.device());
}
return res;
}
// [Sync Streams] Helper that lets the input ncclStreams wait for the current
// stream. NCCL communications run on ncclStreams, but input tensors are
// allocated on different streams (i.e., current streams). Communications on
// ncclStreams cannot start before pending input tensor ops on current streams
// finish. Otherwise, ops on the two streams might read/write the same tensors
// concurrently.
//
// The synchronization above alone is not enough. We also need to make sure
// input tensors are not freed before their usages on ncclStreams finish. This
// can be achieved by calling c10::cuda::CUDACachingAllocator::recordStream,
// which remembers the usage stream (ncclStream), creates an event on the usage
// stream when GC attempts to free the input tensor, and delays GC until that
// event is done.
void syncStreams(
const std::vector<at::Device>& devices,
std::vector<at::cuda::CUDAEvent>& ncclEvents,
std::vector<at::cuda::CUDAStream>& ncclStreams) {
for (size_t i = 0; i < devices.size(); ++i) {
at::cuda::CUDAStream& ncclStream = ncclStreams[i];
at::cuda::CUDAEvent& ncclEvent = ncclEvents[i];
ncclEvent.record(at::cuda::getCurrentCUDAStream(devices[i].index()));
ncclEvent.block(ncclStream);
}
}
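// A minimal sketch of the pattern described in [Sync Streams], assuming a
// single device and a single input tensor (names mirror the members used in
// the collective path below):
//   syncStreams(devices, ncclEvents_[key], ncclStreams_[key]);
//   c10::cuda::CUDACachingAllocator::recordStream(
//       tensor.storage().data_ptr(), ncclStreams_[key][0]);
//   // ... launch the NCCL call on ncclStreams_[key][0] ...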
// Given a ncclUniqueId, convert it to a string representation that can be put
// in the store.
std::string buildNcclUniqueIdStr(const ncclUniqueId& ncclID) {
const uint8_t* bytes = reinterpret_cast<const uint8_t*>(&ncclID);
std::ostringstream oss;
for (size_t i = 0; i < NCCL_UNIQUE_ID_BYTES; i++) {
oss << std::hex << static_cast<int>(bytes[i]);
}
return oss.str();
}
std::string getNcclAbortedCommStoreKey(const std::string& ncclIdStr) {
return std::string(kNCCLAbortedCommStoreKey) + ":" + ncclIdStr;
}
// Returns exception's what() given an exception_ptr instance.
std::string getExceptionMsgFromExceptionPtr(
const std::exception_ptr& exceptionPtr) {
TORCH_CHECK(exceptionPtr != nullptr);
try {
std::rethrow_exception(exceptionPtr);
} catch (const std::exception& e) {
return e.what();
} catch (...) {
return "Unknown exception type";
}
}
std::vector<c10::DeviceIndex> getIndicesOfDevices(
const std::vector<c10::Device>& devices) {
std::vector<c10::DeviceIndex> deviceIndices;
deviceIndices.reserve(devices.size());
for (const at::Device& device : devices) {
TORCH_INTERNAL_ASSERT(device.is_cuda());
deviceIndices.push_back(device.index());
}
return deviceIndices;
}
} // namespace
const int64_t ProcessGroupNCCL::kWatchdogThreadSleepMillis = 10000;
const int64_t ProcessGroupNCCL::kWorkCleanupThreadSleepMillis = 1000;
constexpr int64_t kWaitForAbortCommStoreKey = 1000;
constexpr int64_t kSynchronizeBusyWaitMillis = 10;
thread_local uint64_t ProcessGroupNCCL::ncclActiveGroupCounter_ = 0;
std::ostream& operator<<(
std::ostream& output,
const ProcessGroupNCCL::WorkNCCL& workNCCL) {
std::string workInfo;
if (workNCCL.outputs_) {
workInfo = c10::str("WorkNCCL(",
"OpType=", opTypeToString(workNCCL.opType_),
", TensorShape=", (*workNCCL.outputs_)[0].sizes(),
", Timeout(ms)=", workNCCL.opTimeout_.count(),
")");
} else {
workInfo = c10::str("WorkNCCL(",
"OpType=", opTypeToString(workNCCL.opType_),
", Timeout(ms)=", workNCCL.opTimeout_.count(),
")");
}
return output << workInfo;
}
ProcessGroupNCCL::WorkNCCL::WorkNCCL(
const std::vector<at::Device>& devices,
int rank,
OpType opType,
const char* profilingTitle, const c10::optional<std::vector<at::Tensor>>& inputs)
: Work(rank, opType, profilingTitle, inputs),
devices_(devices),
workStartTime_(std::chrono::steady_clock::now()) {
// Creates the CUDA event wrappers
// Note: The actual events are lazily created when first recorded to with
// DEFAULT_FLAGS = cudaEventDisableTiming.
cudaEvents_ =
std::make_shared<std::vector<at::cuda::CUDAEvent>>(devices.size());
ncclComms_.resize(devices.size());
}
ProcessGroupNCCL::WorkNCCL::WorkNCCL(const WorkNCCL& w)
: Work(w.rank_, w.opType_),
std::enable_shared_from_this<WorkNCCL>(w),
devices_(w.devices_),
cudaEvents_(w.cudaEvents_),
ncclComms_(w.ncclComms_),
blockingWait_(w.blockingWait_),
opTimeout_(w.opTimeout_),
workStartTime_(w.workStartTime_) {
completed_ = w.completed_;
exception_ = w.exception_;
}
ProcessGroupNCCL::WorkNCCL::~WorkNCCL() {}
bool ProcessGroupNCCL::WorkNCCL::isCompleted() {
checkAndSetException();
return exception() || finishedGPUExecutionInternal();
}
bool ProcessGroupNCCL::WorkNCCL::isSuccess() const {
if (exception()) {
// Already detected an exception.
return false;
}
return !checkForNCCLErrors(ncclComms_) && finishedGPUExecutionInternal();
}
void ProcessGroupNCCL::WorkNCCL::checkAndSetException() {
if (exception()) {
// We already have an exception.
return;
}
auto exception_ptr = checkForNCCLErrors(ncclComms_);
std::unique_lock<std::mutex> lock(mutex_);
exception_ = exception_ptr;
if (exception_) {
LOG(INFO) << "[Rank " << rank_ << "]"
<< " found async exception when checking for NCCL errors: "
<< getExceptionMsgFromExceptionPtr(exception_);
}
}
void ProcessGroupNCCL::WorkNCCL::setException(
std::exception_ptr exception_ptr) {
std::unique_lock<std::mutex> lock(mutex_);
exception_ = exception_ptr;
}
// Helper that checks if the NCCL kernels are completed on the GPUs
bool ProcessGroupNCCL::WorkNCCL::finishedGPUExecution() {
checkAndSetException();
return finishedGPUExecutionInternal();
}
bool ProcessGroupNCCL::WorkNCCL::finishedGPUExecutionInternal() const {
for (size_t i = 0; i < devices_.size(); ++i) {
// Checking the work's corresponding CUDA events' status
if (!(*cudaEvents_)[i].query()) {
return false;
}
}
return true;
}
void ProcessGroupNCCL::WorkNCCL::checkAndThrowException() {
// Set the appropriate exception if found.
checkAndSetException();
// Throw an exception, only if we have a valid exception.
if (exception()) {
std::rethrow_exception(exception());
}
}
void ProcessGroupNCCL::WorkNCCL::handleNCCLGuard() {
std::lock_guard<std::mutex> lock(mutex_);
completed_ = true;
if (exception_) {
auto exceptionMsg = c10::str(
"Some NCCL operations have failed or timed out. Due to the ",
"asynchronous nature of CUDA kernels, subsequent GPU operations ",
"might run on corrupted/incomplete data. To avoid this inconsistency, ",
"we are taking the entire process down.");
LOG(ERROR) << exceptionMsg;
C10_LOG_API_USAGE_ONCE("ProcessGroupNCCL.WorkNCCL.handleNCCLGuard");
std::rethrow_exception(exception_);
}
}
void ProcessGroupNCCL::WorkNCCL::synchronize() {
// Call Synchronize without a timeout. We use this method to avoid adding a
// timeout argument to the public synchronize API.
synchronizeInternal(kNoTimeout);
}
void ProcessGroupNCCL::WorkNCCL::synchronizeStreams() {
for (size_t i = 0; i < devices_.size(); ++i) {
auto currentStream = at::cuda::getCurrentCUDAStream(devices_[i].index());
// Block the current stream on the NCCL stream
(*cudaEvents_)[i].block(currentStream);
}
}
// Waiting on the work's corresponding CUDA events
void ProcessGroupNCCL::WorkNCCL::synchronizeInternal(
std::chrono::milliseconds timeout) {
synchronizeStreams();
// In case of blocking, wait for the operation to complete.
if (blockingWait_) {
// Use the passed in timeout if provided, otherwise use the default
// opTimeout for each WorkNCCL object.
std::chrono::milliseconds workTimeout =
timeout == kNoTimeout ? opTimeout_ : timeout;
// Wait for the operation to complete.
while (!isCompleted()) {
if (timedOut()) {
// When an operation times out due to errors that are not
// detected by the NCCL communicators, ncclCommWatchdog cannot see the
// timeout and thus cannot abort the ncclComms accordingly.
// So we explicitly abort the ncclComms here before throwing the timeout
// exception to users; after this, ncclCommWatchdog can detect that the
// NCCL communicators are aborted and clean up devNCCLCommMap_ accordingly.
// If the timeout exception is thrown without aborting the NCCL
// communicators here, it was observed that the CUDA GPU stays at 100%
// utilization and cannot run new work successfully.
for (const auto& ncclComm : ncclComms_) {
ncclComm->ncclCommAbort();
const auto& storeKey = getNcclAbortedCommStoreKey(
buildNcclUniqueIdStr(ncclComm->getNcclId()));
auto rankStr = std::to_string(rank_);
store_->set(
storeKey,
std::vector<uint8_t>(
reinterpret_cast<const uint8_t*>(rankStr.data()),
reinterpret_cast<const uint8_t*>(rankStr.data()) +
rankStr.size()));
LOG(INFO) << "[Rank " << rank_
<< "] Wrote aborted communicator id to store: " << storeKey;
}
auto currentTimepoint = std::chrono::steady_clock::now();
auto timeElapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
currentTimepoint - workStartTime_);
std::string exceptionMsg = c10::str("[Rank ", rank_, "] ",
"Caught collective operation timeout: ",
(*this),
" ran for ",
timeElapsed.count(),
" milliseconds before timing out.");
throw std::runtime_error(exceptionMsg);
}
// Check for errors and throw appropriate exception.
checkAndThrowException();
std::this_thread::sleep_for(
std::chrono::milliseconds(kSynchronizeBusyWaitMillis));
}
checkAndThrowException();
}
// Device synchronize only after we've completed timeout checks.
if (!barrierTensors_.empty()) {
// If we use the work to do barrier, we should block here
for (auto& device : devices_) {
at::cuda::CUDAGuard gpuGuard(device);
AT_CUDA_CHECK(cudaDeviceSynchronize());
}
}
}
// Same as calling synchronize().
bool ProcessGroupNCCL::WorkNCCL::wait(std::chrono::milliseconds timeout) {
RECORD_PARAM_COMMS(
rank_, // rank
"wait", // colName
0, // inSize
0, // outSize
at::kByte, // dType
{}, // inSplitSizes
{}); // outSplitSizes
synchronizeInternal(timeout);
// Always return true, because abort API is not implemented.
return true;
}
void ProcessGroupNCCL::WorkNCCL::abort() {
TORCH_CHECK(false, "ProcessGroupNCCL::WorkNCCL::abort not implemented.");
}
bool ProcessGroupNCCL::WorkNCCL::timedOut() {
auto currentTimepoint = std::chrono::steady_clock::now();
return (
std::chrono::duration_cast<std::chrono::milliseconds>(
currentTimepoint - workStartTime_) >= opTimeout_);
}
ProcessGroupNCCL::ProcessGroupNCCL(
const c10::intrusive_ptr<Store>& store,
int rank,
int size,
c10::intrusive_ptr<Options> options)
: ProcessGroup(rank, size),
store_(store),
options_(options),
ncclCommCounter_(0),
terminateProcessGroup_(false) {
TORCH_CHECK(at::cuda::getNumGPUs() != 0,
"ProcessGroupNCCL is only supported with GPUs, no GPUs found!");
blockingWait_ = parseEnvVarFlag(NCCL_BLOCKING_WAIT);
asyncErrorHandling_ = parseEnvVarFlag(NCCL_ASYNC_ERROR_HANDLING);
if (blockingWait_ && asyncErrorHandling_) {
LOG(INFO) << "[Rank " << rank_
<< "] NCCL_BLOCKING_WAIT and NCCL_ASYNC_ERROR_HANDLING "
<< "should not both be enabled. "
<< "Only NCCL_BLOCKING_WAIT is being used in this process.";
asyncErrorHandling_ = false;
}
#ifdef ENABLE_NCCL_ERROR_CHECKING
ncclCommWatchdogThread_ =
std::thread(&ProcessGroupNCCL::ncclCommWatchdog, this);
#endif
if (asyncErrorHandling_) {
workCleanupThread_ = std::thread(&ProcessGroupNCCL::workCleanupLoop, this);
}
const char * ncclDebugLevel = std::getenv("NCCL_DEBUG");
if (!ncclDebugLevel) {
ncclDebugLevel = "UNSET";
}
LOG(INFO) << "[Rank " << rank_
<< "] ProcessGroupNCCL initialized with following options:"
<< "\nNCCL_ASYNC_ERROR_HANDLING: " << asyncErrorHandling_
<< "\nNCCL_BLOCKING_WAIT: " << blockingWait_
<< "\nTIMEOUT(ms): " << options_->timeout.count()
<< "\nUSE_HIGH_PRIORITY_STREAM: "
<< options_->is_high_priority_stream
<< "\nNCCL_DEBUG: " << ncclDebugLevel;
}
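// Configuration sketch (assumed typical usage): the flags logged above are
// read from the environment before construction, e.g. launching with
// NCCL_ASYNC_ERROR_HANDLING=1 enables workCleanupThread_, while
// NCCL_BLOCKING_WAIT=1 makes wait() poll and time out on the caller's thread
// (and, as noted above, takes precedence if both flags are set).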
void ProcessGroupNCCL::setSequenceNumberForGroup() {
if (rank_ == 0) {
// Create and broadcast sequence number
auto seq = 1 + rand();
sequenceNum_ = c10d::SequenceNum(seq);
std::vector<uint8_t> values = c10d::toVec<uint8_t>(seq, kBytes);
store_->set(kSeqNumStoreKey, values);
} else {
// Read rank 0's sequence number from store.
sequenceNum_ = c10d::SequenceNum();
store_->wait({kSeqNumStoreKey}, options_->timeout);
std::vector<uint8_t> values = store_->get(kSeqNumStoreKey);
uint64_t num = c10d::fromVec<uint8_t>(values);
sequenceNum_->set(num);
}
}
uint64_t ProcessGroupNCCL::getSequenceNumberForGroup() {
TORCH_CHECK(
sequenceNum_ != c10::nullopt,
"Sequence number is not set for rank ", rank_
);
return sequenceNum_->get();
}
ProcessGroupNCCL::~ProcessGroupNCCL() {
terminateProcessGroup_.store(true);
watchdogCV_.notify_one();
#ifdef ENABLE_NCCL_ERROR_CHECKING
ncclCommWatchdogThread_.join();
#endif
if (asyncErrorHandling_) {
workMetaListCV_.notify_one();
workCleanupThread_.join();
}
{
// Abort all NCCL Communicators on Process Group Destruction
std::lock_guard<std::mutex> lock(mutex_);
for (auto & it : devNCCLCommMap_) {
auto& ncclComms = it.second;
for (const auto& ncclComm : ncclComms) {
ncclComm->ncclCommAbort();
}
}
}
}
void ProcessGroupNCCL::abortTimedOutCollectives(std::unordered_set<std::string>& abortedCommIds) {
std::unique_lock<std::mutex> lock(workMetaListMutex_);
for (auto& work : workMetaList_) {
work.checkAndSetException();
// Aborting NCCL Communicators due to errors is already handled above.
if (work.exception()) {
continue;
}
// Check for Timeouts in the WorkNCCL Operations, and abort all
// communicators accordingly.
if (work.timedOut()) {
auto currentTimepoint = std::chrono::steady_clock::now();
auto timeElapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
currentTimepoint - work.workStartTime_);
std::string exceptionMsg = c10::str("[Rank ", rank_, "] ",
"Watchdog caught collective operation timeout: ",
work,
" ran for ",
timeElapsed.count(),
" milliseconds before timing out.");
LOG(ERROR) << exceptionMsg;
std::exception_ptr exception_ptr = std::make_exception_ptr(
std::runtime_error(exceptionMsg));
work.setException(exception_ptr);
for (const auto& ncclComm : work.ncclComms_) {
ncclComm->ncclCommAbort();
abortedCommIds.emplace(buildNcclUniqueIdStr(ncclComm->getNcclId()));
}
}
}
}
void ProcessGroupNCCL::ncclCommWatchdog() {
try {
LOG(INFO) << "[Rank " << rank_ << "] NCCL watchdog thread started!";
ncclCommWatchdogInternal();
LOG(INFO) << "[Rank " << rank_
<< "] NCCL watchdog thread terminated normally";
} catch (std::exception& e) {
LOG(INFO) << "[Rank " << rank_
<< "] NCCL watchdog thread terminated with exception: "
<< e.what();
} catch (...) {
LOG(INFO) << "[Rank " << rank_
<< "] NCCL watchdog thread terminated with unknown exception";
}
}
void ProcessGroupNCCL::ncclCommWatchdogInternal() {
while (!terminateProcessGroup_.load()) {
std::unordered_set<std::string> abortedCommIds;
std::unordered_set<std::string> allCommIds;
{
// Loop through the cache of communicators for NCCL errors.
std::lock_guard<std::mutex> lock(mutex_);
for (auto & it : devNCCLCommMap_) {
auto& ncclComms = it.second;
for (const auto& ncclComm : ncclComms) {
allCommIds.emplace(buildNcclUniqueIdStr(ncclComm->getNcclId()));
}
std::exception_ptr ncclErrorException = checkForNCCLErrors(ncclComms);
if (ncclErrorException) {
LOG(INFO) << "[Rank " << rank_
<< "] Received NCCL errors for communicators in the cache: \n"
<< "NCCL error: \n"
<< getExceptionMsgFromExceptionPtr(ncclErrorException);
if (blockingWait_ || asyncErrorHandling_) {
LOG(INFO) << "[Rank " << rank_
<< "] Aborting communicators that received errors";
// We abort NCCL communicators that have received errors from this
// thread, and exceptions are set on the corresponding work objects.
// The workCleanupThread will then loop through the unfinished
// collectives and throw exceptions if an exception has been set on
// any of the work objects from this thread.
for (const auto& ncclComm : ncclComms) {
ncclComm->ncclCommAbort();
// Note that we don't remove the aborted communicators from the
// cache. The reason is that if we do remove the communicator
// from the cache, it is possible that a new collective operation
// calls `ncclCommInitRank` to create a new communicator whereas
// other ranks might have failed/timed out and didn't enter
// `ncclCommInitRank`. As a result, when there is a failure on
// a communicator, the application receives an exception and it is
// the application's responsibility to destroy the process group and
// recreate it to recover from errors.
abortedCommIds.emplace(
buildNcclUniqueIdStr(ncclComm->getNcclId()));
}
}
}
}
}
if (asyncErrorHandling_) {
abortTimedOutCollectives(abortedCommIds);
}
if (blockingWait_) {
// When we abort a communicator on one rank, it is likely to cause other
// ranks to hang indefinitely. As a result, whenever we abort a
// communicator, we write its ID to the store. The watchdogs on other ranks
// then monitor the store, find the aborted communicator ID, and abort
// their respective communicators as well.
// Record the aborted communicators locally and in the store.
for (const auto& abortedCommId : abortedCommIds) {
abortedComms_.emplace(abortedCommId);
const auto& storeKey = getNcclAbortedCommStoreKey(abortedCommId);
auto rankStr = std::to_string(rank_);
store_->set(
storeKey,
std::vector<uint8_t>(
reinterpret_cast<const uint8_t*>(rankStr.data()),
reinterpret_cast<const uint8_t*>(rankStr.data()) +
rankStr.size()));
LOG(INFO) << "[Rank " << rank_
<< "] Watchdog wrote aborted communicator id to store: "
<< storeKey;
}
// Check for any communicators in the store and abort them if needed.
for (const auto& commId : allCommIds) {
if (abortedComms_.find(commId) == abortedComms_.end()) {
// Abort communicators reported in the store, if they have not already
// been aborted locally (don't wait longer than the watchdog sleep time).
const auto& storeKey = getNcclAbortedCommStoreKey(commId);
try {
store_->wait(
{storeKey},
std::chrono::milliseconds(kWaitForAbortCommStoreKey));
auto val = store_->get(storeKey);
std::string rank(reinterpret_cast<char*>(val.data()), val.size());
LOG(INFO) << "[Rank " << rank_
<< "] Found key in store: " << storeKey
<< ", from rank: " << rank
<< ", aborting appropriate communicators";
// Now abort the appropriate communicators.
std::lock_guard<std::mutex> lock(mutex_);
auto it = ncclIdToCommMap_.find(commId);
TORCH_INTERNAL_ASSERT(it != ncclIdToCommMap_.end());
for (const auto& ncclComm : it->second) {
ncclComm->ncclCommAbort();
}
abortedComms_.emplace(commId);
LOG(INFO) << "[Rank " << rank_
<< "] Aborted communicators for key in store: "
<< storeKey;
} catch (std::exception& e) {
VLOG(1) << "Did not find key in store: " << storeKey
<< ", error: " << e.what();
}
}
}
}
std::unique_lock<std::mutex> lock(watchdogCVMutex_);
watchdogCV_.wait_for(
lock,
std::chrono::milliseconds(kWatchdogThreadSleepMillis),
[&]() -> bool { return terminateProcessGroup_.load(); });
}
}
void ProcessGroupNCCL::workCleanupLoop() {
bool done = false;
while (!terminateProcessGroup_.load() || !done) {
std::list<WorkNCCL> doneWorks;
{
std::unique_lock<std::mutex> lock(workMetaListMutex_);
// We busy-poll the work vector every kWorkCleanupThreadSleepMillis
// milliseconds until the process group is asked to terminate.
workMetaListCV_.wait_for(
lock,
std::chrono::milliseconds(kWorkCleanupThreadSleepMillis),
[&]() -> bool { return terminateProcessGroup_.load(); });
for (auto it = workMetaList_.begin(); it != workMetaList_.end();
/* no increment*/) {
auto& work = *it;
if (work.isCompleted()) {
// Handle Exceptions on failed GPU operations and remove completed
// workNCCL objects from work vector.
if (!terminateProcessGroup_.load()) {
work.handleNCCLGuard();
}
doneWorks.push_back(std::move(*it));
it = workMetaList_.erase(it);
} else {
// Increment the iterator if the current WorkNCCL object is not
// completed.
++it;
}
}
done = workMetaList_.empty();
}
doneWorks.clear();
}
}
std::exception_ptr ProcessGroupNCCL::WorkNCCL::checkForNCCLErrors(
const std::vector<std::shared_ptr<NCCLComm>>& ncclComms) const {
return checkForNCCLErrorsInternal(ncclComms);
}
std::exception_ptr ProcessGroupNCCL::checkForNCCLErrors(
const std::vector<std::shared_ptr<NCCLComm>>& ncclComms) {
return checkForNCCLErrorsInternal(ncclComms);
}
std::exception_ptr ProcessGroupNCCL::checkForNCCLErrorsInternal(
const std::vector<std::shared_ptr<NCCLComm>>& ncclComms) {
for (const auto& ncclComm : ncclComms) {
ncclResult_t ncclAsyncErr = ncclComm->checkForNcclError();
if (ncclAsyncErr != ncclSuccess) {
return std::make_exception_ptr(std::runtime_error(
"NCCL error: " + ncclGetErrorWithVersion(ncclAsyncErr) + "\n" +
getNcclErrorDetailStr(ncclAsyncErr)));
}
}
return nullptr;
}
void ProcessGroupNCCL::broadcastUniqueNCCLID(
ncclUniqueId* ncclID,
OpType opType,
const std::string& p2pKey,
int p2pRank) {
// For collective operations:
// For every NCCL communicator that we create we need to broadcast
// a unique ID from rank 0 to all other ranks. This broadcast is
// done by rank 0 setting a key in the store and all other ranks
// retrieving the contents of that key. A single process group
// may create multiple NCCL communicators, so we use a sequence
// number to differentiate between them.
// For point-to-point operations:
// The sequence number will only be increased on 2 out of all the
// processes in a Process Group. So all following collective
// operations will see different sequence numbers which will cause
// runtime errors. To avoid that, use the src:target pair instead
// of sequence number for p2p communications.
std::string storeKey;
if (!isP2POp(opType)) {
storeKey = std::to_string(ncclCommCounter_++);
} else {
storeKey = p2pKey;
}
if (rank_ == 0 || (isP2POp(opType) && p2pRank == 0)) {
auto vec = std::vector<uint8_t>(
reinterpret_cast<uint8_t*>(ncclID),
reinterpret_cast<uint8_t*>(ncclID) + NCCL_UNIQUE_ID_BYTES);
store_->set(storeKey, vec);
} else {
auto vec = store_->get(storeKey);
TORCH_CHECK(vec.size() == NCCL_UNIQUE_ID_BYTES);
std::memcpy(ncclID, vec.data(), vec.size());
}
}
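// Store protocol sketch (illustrative): for the first collective, rank 0
// calls ncclGetUniqueId and then store_->set("0", idBytes); every other rank
// does store_->get("0") and memcpy's the bytes back into its ncclUniqueId.
// For p2p ops, the "low:high" key from getKeySendRecv is used instead of the
// counter.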
std::vector<std::shared_ptr<NCCLComm>>& ProcessGroupNCCL::getNCCLComm(
const std::string& devicesKey,
const std::vector<at::Device>& devices,
OpType opType,
int p2pRank,
bool isSendRecvSelf) {
// Sanity check
if (devicesKey.empty()) {
throw std::runtime_error(
"Not able to create/get the NCCL Communicator since "
"the GPU devices are not known");
}
for (auto& device : devices) {
usedDeviceIdxs_.insert(device.index());
}
{
std::lock_guard<std::mutex> lock(mutex_);
if (devNCCLCommMap_.find(devicesKey) != devNCCLCommMap_.end()) {
// Reuse the cached communicator if there is one.
return devNCCLCommMap_[devicesKey];
}
}
// NCCL communicator not cached, create a new entry
std::vector<std::shared_ptr<NCCLComm>> ncclComms;
ncclComms.resize(devices.size());
// Create the unique NCCL ID and broadcast it
ncclUniqueId ncclID;
// For point-to-point communication, the lower rank of the two will get the unique id.
if (rank_ == 0 || (isP2POp(opType) && p2pRank == 0)) {
C10D_NCCL_CHECK(ncclGetUniqueId(&ncclID));
}
// For point-to-point communication within the same process, no broadcast is needed.
if (!isSendRecvSelf) {
// Broadcast so that each process can have a unique NCCL ID
broadcastUniqueNCCLID(&ncclID, opType, devicesKey, p2pRank);
}
at::cuda::OptionalCUDAGuard gpuGuard;
std::vector<at::cuda::CUDAStream> streamVal;
streamVal.reserve(devices.size());
// [Group Start/End Note] This is used to ensure that the NCCL communicator is created
// before any communication primitive is called. Consider this example:
// using batch_isend_irecv to send a tensor to a target process. On the sender side,
// the corresponding underlying NCCL calls look like
// ncclGroupStart() // This is in batch_isend_irecv
// ncclGroupStart() // This is [Note 1]
// ncclCommInitRank() // Inside NCCLComm::create
// ncclSend()
// ncclGroupEnd() // This is [Note 2]
// ncclGroupEnd() // This is in batch_isend_irecv
// With this pattern, the NCCL communicator is only created in the last ncclGroupEnd,
// which means that when ncclSend is processed, the communicator argument passed to it
// is NULL and leads to a runtime error. So we need to "close" all active nccl groups
// to ensure the NCCL communicator is actually created before encountering any
// communication calls. This is why we need the following for loop.
for (size_t i = 0; i < ncclActiveGroupCounter_; ++i) {
C10D_NCCL_CHECK(ncclGroupEnd());
}
// [Note 1] Create the NCCL communicators for each GPU
C10D_NCCL_CHECK(ncclGroupStart());
for (size_t i = 0; i < devices.size(); ++i) {
// GPU world size and GPU rank
int numRanks, rank;
if (!isP2POp(opType)) {
numRanks = getSize() * devices.size();
rank = getRank() * devices.size() + i;
} else if(isSendRecvSelf) {
// Same process send and recv.
numRanks = 1;
rank = 0;
} else {
// For point-to-point operation, there are only 2 processes involved so
// the GPU rank is either 0 or 1.
numRanks = 2;
rank = p2pRank;
}
// Get the device index
int deviceIndex = devices[i].index();
gpuGuard.set_index(deviceIndex);
ncclComms[i] = NCCLComm::create(numRanks, rank, ncclID);
// Creates the NCCL streams
streamVal.push_back(
at::cuda::getStreamFromPool(options_->is_high_priority_stream));
}
// [Note 2 ]
C10D_NCCL_CHECK(ncclGroupEnd());
// See [Group Start/End Note]
for (size_t i = 0; i < ncclActiveGroupCounter_; ++i) {
C10D_NCCL_CHECK(ncclGroupStart());
}
ncclStreams_.emplace(devicesKey, std::move(streamVal));
// Note: these events are created with the (default) cudaEventDisableTiming
// flag. This flag provides the best performance when used with
// cudaStreamWaitEvent() and cudaEventQuery(). Since we don't measure
// performance with these events, the flag should remain set.
ncclEvents_.emplace(
std::piecewise_construct,
std::make_tuple(devicesKey),
std::make_tuple(devices.size()));
// Hold the lock before modifying the cache.
std::lock_guard<std::mutex> lock(mutex_);
// Record the communicators based on ncclUniqueId.
ncclIdToCommMap_.emplace(buildNcclUniqueIdStr(ncclID), ncclComms);
// Move the NCCL resource to cache
devNCCLCommMap_.emplace(devicesKey, std::move(ncclComms));
return devNCCLCommMap_[devicesKey];
}
namespace {
// Check validity of tensor
void check_gpu_single_tensor(const at::Tensor& tensor) {
if (!tensor.is_cuda() || tensor.is_sparse()) {
throw std::runtime_error("Tensors must be CUDA and dense");
}
if (!tensor.is_contiguous()) {
throw std::runtime_error("Tensors must be contiguous");
}
}
// Check that all `tensors' have the same type and shape and are distributed
// across distinct GPUs.
void check_gpu_tensors(const std::vector<at::Tensor>& tensors) {
if (tensors.size() == 0) {
throw std::runtime_error("Tensor list must be nonempty");
}
if (tensors.size() > static_cast<size_t>(at::cuda::getNumGPUs())) {
throw std::runtime_error(
"Tensor list mustn't be larger than the number of available GPUs");
}
const auto& first = tensors.front();
// Set for ensuring that tensors are on separate devices.
std::unordered_set<decltype(first.get_device())> usedDevices;
usedDevices.reserve(tensors.size());
for (const auto& t : tensors) {
if (!t.is_cuda() || t.is_sparse()) {
throw std::runtime_error("Tensors must be CUDA and dense");
}
if (t.scalar_type() != first.scalar_type()) {
throw std::runtime_error("Tensors must have identical type");
}
if (t.sizes() != first.sizes()) {
throw std::runtime_error("Tensors must have identical size");
}
if (t.strides() != first.strides()) {
throw std::runtime_error("Tensors must have identical strides");
}
if (!t.is_non_overlapping_and_dense()) {
throw std::runtime_error("Tensors must be non-overlapping and dense");
}
const auto inserted = usedDevices.insert(t.get_device()).second;
if (!inserted) {
throw std::runtime_error("Tensors must be on distinct GPU devices");
}
}
}
// Flatten each list in `tensor_lists' for a gather or scatter operation, and
// ensure compatibility with the corresponding tensor in `other'.
std::vector<at::Tensor> flatten_for_scatter_gather(
std::vector<std::vector<at::Tensor>>& tensor_lists,
std::vector<at::Tensor>& other,
size_t world_size) {
if (tensor_lists.size() != other.size()) {
throw std::runtime_error(
"Tensor list operands to scatter/gather must have the same length");
}
const auto num_devices = tensor_lists.size();
std::vector<at::Tensor> flattened;
flattened.resize(num_devices);
for (auto i = size_t{}; i < num_devices; ++i) {
if (tensor_lists[i].size() != world_size * num_devices) {
throw std::runtime_error(
"Tensor list input to scatter/gather must match number of collective"
" participants");
}
// Only check device match for the first tensor in the list; the call to
// newLikeFlat() below will check the rest.
if (tensor_lists[i].front().get_device() != other[i].get_device()) {
throw std::runtime_error(
"Corresponding input/output tensors to scatter/gather must all reside"
" on the same device");
}
for (const auto& t : tensor_lists[i]) {
if (t.numel() != other[i].numel()) {
throw std::runtime_error(
"All tensor operands to scatter/gather must have the same number of elements");
}
}
// Flatten the tensors (from all ranks) into a single big tensor.
flattened[i] = newLikeFlat(tensor_lists, i);
}
return flattened;
}
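// Shape example (illustrative): with world_size = 2, a single device, and
// other = {one tensor of shape {4, 5}}, tensor_lists must hold one list of
// 2 tensors of shape {4, 5}; the returned flattened[0] is a single
// contiguous tensor of shape {2, 4, 5} produced by newLikeFlat().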
} // namespace
c10::intrusive_ptr<ProcessGroupNCCL::WorkNCCL> ProcessGroupNCCL::initWork(
std::vector<at::Device> devices,
int rank,
OpType opType,
const char* profilingTitle, const c10::optional<std::vector<at::Tensor>>& inputs) {
return c10::make_intrusive<ProcessGroupNCCL::WorkNCCL>(devices, rank, opType, profilingTitle, inputs);
}
std::vector<at::Tensor> ProcessGroupNCCL::WorkNCCL::result() {
return *outputs_;
}
c10::intrusive_ptr<c10::ivalue::Future> ProcessGroupNCCL::WorkNCCL::
getFuture() {
return future_;
}
void ProcessGroupNCCL::workEnqueue(
c10::intrusive_ptr<ProcessGroupNCCL::WorkNCCL> work) {
if (!terminateProcessGroup_.load()) {
std::lock_guard<std::mutex> lock(workMetaListMutex_);
// Avoid having view tensors processed in the cleanup thread. A view
// tensor's destruction invokes autograd_meta, which needs to be
// destructed in the user thread; otherwise we would deadlock. Hence we
// enqueue the work without outputs_.
workMetaList_.emplace_back(WorkNCCL(*work));
}
}
ProcessGroupNCCL::Options::Options(bool is_high_priority_stream)
: ProcessGroup::Options(NCCL_BACKEND_NAME),
is_high_priority_stream(is_high_priority_stream) {}
template <typename Fn, typename PreProcess, typename PostProcess>
c10::intrusive_ptr<ProcessGroup::Work> ProcessGroupNCCL::collective(
std::vector<at::Tensor>& inputs,
std::vector<at::Tensor>& outputs,
Fn fn,
PreProcess pre,
PostProcess post,
OpType opType,
const char* profilingTitle) {
// Bump collective counter
if (sequenceNum_) {
sequenceNum_->increment();
}
const auto devices = getDeviceList(inputs);
const auto key = getKeyFromDevices(devices);
auto& ncclComms = getNCCLComm(key, devices, opType);
// First let the NCCL streams wait for the input tensors' allocation streams
syncStreams(devices, ncclEvents_[key], ncclStreams_[key]);
// Work itself will create the CUDA events on all GPUs of tensors
bool can_profile = outputs.size() == 1;
auto work = initWork(devices, rank_, opType, can_profile ? profilingTitle : nullptr, can_profile ? c10::optional<std::vector<at::Tensor>>(inputs) : c10::nullopt);
// Store references to outputs to be used by WorkNCCL::result and operator<<.
work->outputs_ = std::make_shared<std::vector<at::Tensor>>(outputs);
at::cuda::OptionalCUDAGuard gpuGuard;
pre(ncclStreams_[key]);
for (size_t i = 0; i < inputs.size(); ++i) {
gpuGuard.set_index(devices[i].index());
at::cuda::CUDAStream& ncclStream = ncclStreams_[key][i];
// Both `inputs' and `outputs' are created on a worker stream and used in
// different ncclStreams. Hence, both must record the ncclStream to
// prevent being freed before the collective finishes.
//
// We only record `inputs' here, and leave recording `outputs' to `fn' for
// operations where `inputs' and `outputs' are not the same.
//
// See [Sync Streams].
c10::cuda::CUDACachingAllocator::recordStream(
inputs[i].storage().data_ptr(), ncclStream);
}
{
AutoNcclGroup nccl_group_guard;
for (size_t i = 0; i < inputs.size(); ++i) {
gpuGuard.set_index(devices[i].index());
at::cuda::CUDAStream& ncclStream = ncclStreams_[key][i];
C10D_NCCL_CHECK(
fn(inputs[i], outputs[i], ncclComms[i]->getNcclComm(), ncclStream));
}
}
post(ncclStreams_[key]);
// Event should only be recorded after the ncclGroupEnd()
for (size_t i = 0; i < inputs.size(); ++i) {
at::cuda::CUDAStream& ncclStream = ncclStreams_[key][i];
(*work->cudaEvents_)[i].record(ncclStream);
work->ncclComms_[i] = ncclComms[i];
}
{
at::cuda::CUDAMultiStreamGuard streamGuard(ncclStreams_[key]);
work->future_ = c10::make_intrusive<at::cuda::CUDAFuture>(
c10::ListType::create(c10::TensorType::get()),
getIndicesOfDevices(devices));
// Add a callback that runs profiling end callbacks. wrapCallback() in the
// CUDA future blocks the stream this callback runs on behind the
// corresponding cudaEvents_, ensuring appropriate synchronization.
if (work->recordFunctionEndCallback_) {
work->future_->addCallback([work]() {
work->recordFunctionEndCallback_();
});
}
work->future_->markCompleted(at::IValue(*work->outputs_));
}
// Set appropriate work parameters.
work->blockingWait_ = blockingWait_;
work->opTimeout_ = options_->timeout;
work->store_ = store_;
if (asyncErrorHandling_) {
workEnqueue(work);
}
return work;
}
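// Usage note: `fn` is expected to launch one NCCL call per device with the
// signature fn(input, output, ncclComm_t, at::cuda::CUDAStream&) and return
// an ncclResult_t; the concrete collectives below (e.g. allreduce) pass
// lambdas that forward to the corresponding nccl* API. `pre`/`post` receive
// the NCCL streams for the device key and default to no-ops in the overload
// without them further down.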
template <typename Fn, typename PreProcess, typename PostProcess>
c10::intrusive_ptr<ProcessGroup::Work> ProcessGroupNCCL::pointToPoint(
std::vector<at::Tensor>& tensors,
Fn fn,
int peer,
OpType opType,
PreProcess pre,
PostProcess post) {
const auto devices = getDeviceList(tensors);
const auto key = getKeySendRecv(rank_, peer);
int p2pRank = rank_ <= peer ? 0 : 1;
auto isSendRecvSelf = rank_ == peer;
auto& ncclComms = getNCCLComm(key, devices, opType, p2pRank, isSendRecvSelf);
// First let the NCCL streams wait for the input tensors' allocation streams
syncStreams(devices, ncclEvents_[key], ncclStreams_[key]);
// Work itself will create the CUDA events on all GPUs of tensors
auto work = initWork(devices, rank_, opType);
if (opType == OpType::RECV) {
// Store references to outputs to be used by WorkNCCL::result and operator<<.
work->outputs_ = std::make_shared<std::vector<at::Tensor>>(tensors);
}
at::cuda::OptionalCUDAGuard gpuGuard;
pre(ncclStreams_[key]);
for (size_t i = 0; i < tensors.size(); ++i) {
gpuGuard.set_index(devices[i].index());
at::cuda::CUDAStream& ncclStream = ncclStreams_[key][i];
// Both send tensor and recv tensor are created on a worker stream and used in
// different ncclStreams. Hence, both must record the ncclStream to
// prevent being freed before the collective finishes.
//
// See [Sync Streams].
c10::cuda::CUDACachingAllocator::recordStream(
tensors[i].storage().data_ptr(), ncclStream);
}
{
AutoNcclGroup nccl_group_guard;
for (size_t i = 0; i < tensors.size(); ++i) {
gpuGuard.set_index(devices[i].index());
at::cuda::CUDAStream& ncclStream = ncclStreams_[key][i];
// For point-to-point communication, NCCL ranks can only
// be 0 or 1.
int p2pTargetRank = isSendRecvSelf ? 0 : 1 - p2pRank;
C10D_NCCL_CHECK(
fn(tensors[i], ncclComms[i]->getNcclComm(), ncclStream, p2pTargetRank));
}
}
post(ncclStreams_[key]);
// Event should only be recorded after the ncclGroupEnd()
for (size_t i = 0; i < tensors.size(); ++i) {
at::cuda::CUDAStream& ncclStream = ncclStreams_[key][i];
(*work->cudaEvents_)[i].record(ncclStream);
work->ncclComms_[i] = ncclComms[i];
work->blockingWait_ = blockingWait_;
work->opTimeout_ = options_->timeout;
work->store_ = store_;
}
if (opType == OpType::RECV) {
at::cuda::CUDAMultiStreamGuard streamGuard(ncclStreams_[key]);
work->future_ = c10::make_intrusive<at::cuda::CUDAFuture>(
c10::ListType::create(c10::TensorType::get()),
getIndicesOfDevices(devices));
work->future_->markCompleted(at::IValue(*work->outputs_));
}
return work;
}
template <typename Fn>
c10::intrusive_ptr<ProcessGroup::Work> ProcessGroupNCCL::collective(
std::vector<at::Tensor>& inputs,
std::vector<at::Tensor>& outputs,
Fn fn,
OpType opType,
const char* profilingTitle) {
return collective(
inputs,
outputs,
fn,
[](std::vector<at::cuda::CUDAStream>&) {},
[](std::vector<at::cuda::CUDAStream>&) {},
opType,
profilingTitle);
}
template <typename Fn>
c10::intrusive_ptr<ProcessGroup::Work> ProcessGroupNCCL::pointToPoint(
std::vector<at::Tensor>& tensor,
Fn fn,
int peer,
OpType opType) {
return pointToPoint(
tensor,
fn,
peer,
opType,
[](std::vector<at::cuda::CUDAStream>&) {},
[](std::vector<at::cuda::CUDAStream>&) {});
}
c10::intrusive_ptr<ProcessGroup::Work> ProcessGroupNCCL::allreduce(
std::vector<at::Tensor>& tensors,
const AllreduceOptions& opts) {
check_gpu_tensors(tensors);
// @lint-ignore CLANGTIDY
auto tensor = tensors.back();
RECORD_PARAM_COMMS(
rank_, // rank
"allreduce", // colName
tensor.numel(), // inSize
tensor.numel(), // outSize
tensor.scalar_type(), // dType
std::vector<int64_t>(), // inSplitSizes
std::vector<int64_t>()); // outSplitSizes
return collective(
tensors,
tensors,
[&](at::Tensor& input,
at::Tensor& output,
ncclComm_t comm,
at::cuda::CUDAStream& stream) {
return ncclAllReduce(
input.data_ptr(),
output.data_ptr(),
input.numel(),
getNcclDataType(input.scalar_type()),
getNcclReduceOp(opts.reduceOp, input),
comm,
stream.stream());
},
OpType::ALLREDUCE,
"nccl:all_reduce");
}
c10::intrusive_ptr<ProcessGroup::Work> ProcessGroupNCCL::allreduce_coalesced(
std::vector<at::Tensor>& tensors,
const AllreduceCoalescedOptions& opts) {
throw std::runtime_error(
"allreduce_coalesced is currently not supported with NCCL");
}
c10::intrusive_ptr<ProcessGroup::Work> ProcessGroupNCCL::broadcast(
std::vector<at::Tensor>& tensors,
const BroadcastOptions& opts) {
check_gpu_tensors(tensors);
// @lint-ignore CLANGTIDY
auto tensor = tensors.back();
RECORD_PARAM_COMMS(
rank_, // rank
"broadcast", // colName
tensor.numel(), // inSize
tensor.numel(), // outSize
tensor.scalar_type(), // dType
{}, // inSplitSizes
{}); // outSplitSizes
return collective(
tensors,
tensors,
[&](at::Tensor& input,
at::Tensor& output,
ncclComm_t comm,
at::cuda::CUDAStream& stream) {
const auto root = opts.rootRank * tensors.size() + opts.rootTensor;
return ncclBcast(
input.data_ptr(),
input.numel(),
getNcclDataType(input.scalar_type()),
root,
comm,
stream.stream());
},
OpType::BROADCAST,
"nccl:broadcast");
}
c10::intrusive_ptr<ProcessGroup::Work> ProcessGroupNCCL::reduce(
std::vector<at::Tensor>& tensors,
const ReduceOptions& opts) {
check_gpu_tensors(tensors);
// @lint-ignore CLANGTIDY
auto tensor = tensors.back();
RECORD_PARAM_COMMS(
rank_, // rank
"reduce", // colName
tensor.numel(), // inSize
tensor.numel(), // outSize
tensor.scalar_type(), // dType
{}, // inSplitSizes
{}); // outSplitSizes
return collective(
tensors,
tensors,
[&](at::Tensor& input,
at::Tensor& output,
ncclComm_t comm,
at::cuda::CUDAStream& stream) {
const auto root = opts.rootRank * tensors.size() + opts.rootTensor;
return ncclReduce(
input.data_ptr(),
output.data_ptr(),
input.numel(),
getNcclDataType(input.scalar_type()),
getNcclReduceOp(opts.reduceOp, input),
root,
comm,
stream.stream());
},
OpType::REDUCE,
"nccl:reduce");
}
c10::intrusive_ptr<ProcessGroup::Work> ProcessGroupNCCL::allgather(
std::vector<std::vector<at::Tensor>>& outputTensors,
std::vector<at::Tensor>& inputTensors,
const AllgatherOptions& opts) {
check_gpu_tensors(inputTensors);
auto outputFlattened =
flatten_for_scatter_gather(outputTensors, inputTensors, size_);
check_gpu_tensors(outputFlattened);
// @lint-ignore CLANGTIDY
auto tensor = inputTensors.back();
RECORD_PARAM_COMMS(
rank_, // rank
"all_gather", // colName
tensor.numel(), // inSize
tensor.numel() * this->getSize(), // outSize
tensor.scalar_type(), // dType
{}, // inSplitSizes
{}); // outSplitSizes
return collective(
inputTensors,
outputFlattened,
[&](at::Tensor& input,
at::Tensor& output,
ncclComm_t comm,
at::cuda::CUDAStream& stream) {
c10::cuda::CUDACachingAllocator::recordStream(
output.storage().data_ptr(), stream);
return ncclAllGather(
input.data_ptr(),
output.data_ptr(),
input.numel(),
getNcclDataType(input.scalar_type()),
comm,
stream.stream());
},
[&](std::vector<at::cuda::CUDAStream>& ncclStreams) {},
[&](std::vector<at::cuda::CUDAStream>& ncclStreams) {
// Copy the flattened output tensors to the outputs.
for (size_t i = 0; i < outputTensors.size(); ++i) {
at::cuda::CUDAStreamGuard guard(ncclStreams[i]);
for (size_t j = 0; j < outputTensors[0].size(); ++j) {
// See [Sync Streams].
c10::cuda::CUDACachingAllocator::recordStream(
outputTensors[i][j].storage().data_ptr(), ncclStreams[i]);
outputTensors[i][j].copy_(outputFlattened[i][j], true);
}
}
},
OpType::ALLGATHER,
"nccl:all_gather");
}
c10::intrusive_ptr<ProcessGroup::Work> ProcessGroupNCCL::allgather_coalesced(
std::vector<std::vector<at::Tensor>>& /* unused */,
std::vector<at::Tensor>& /* unused */,
const AllgatherOptions& /* unused */) {
throw std::runtime_error(
"ProcessGroupNCCL does not support allgather_coalesced");
}
c10::intrusive_ptr<ProcessGroup::Work> ProcessGroupNCCL::reduce_scatter(
std::vector<at::Tensor>& outputTensors,
std::vector<std::vector<at::Tensor>>& inputTensors,
const ReduceScatterOptions& opts) {
check_gpu_tensors(outputTensors);
// @lint-ignore CLANGTIDY
auto tensor = outputTensors.back();
RECORD_PARAM_COMMS(
rank_, // rank
"reduce_scatter", // colName
tensor.numel() * this->getSize(), // inSize
tensor.numel(), // outSize
tensor.scalar_type(), // dType
{}, // inSplitSizes
{}); // outSplitSizes
auto inputFlattened =
flatten_for_scatter_gather(inputTensors, outputTensors, size_);
check_gpu_tensors(inputFlattened);
return collective(
inputFlattened,
outputTensors,
[&](at::Tensor& input,
at::Tensor& output,
ncclComm_t comm,
at::cuda::CUDAStream& stream) {
c10::cuda::CUDACachingAllocator::recordStream(
output.storage().data_ptr(), stream);
return ncclReduceScatter(
input.data_ptr(),
output.data_ptr(),
output.numel(),
getNcclDataType(input.scalar_type()),
getNcclReduceOp(opts.reduceOp, input),
comm,
stream.stream());
},
[&](std::vector<at::cuda::CUDAStream>& ncclStreams) {
// Copy the input tensors to the flattened inputs.
for (size_t i = 0; i < inputTensors.size(); ++i) {
at::cuda::CUDAStreamGuard guard(ncclStreams[i]);
for (size_t j = 0; j < inputTensors[0].size(); ++j) {
// See [Sync Streams].
c10::cuda::CUDACachingAllocator::recordStream(
inputTensors[i][j].storage().data_ptr(), ncclStreams[i]);
inputFlattened[i][j].copy_(inputTensors[i][j], true);
}
}
},
[&](std::vector<at::cuda::CUDAStream>& ncclStreams) {},
OpType::REDUCE_SCATTER,
"nccl:reduce_scatter");
}
c10::intrusive_ptr<ProcessGroup::Work> ProcessGroupNCCL::barrier(
const BarrierOptions& opts) {
RECORD_PARAM_COMMS(
rank_, // rank
"barrier", // colName
0, // inSize
0, // outSize
at::kByte, // dType
{}, // inSplitSizes
{}); // outSplitSizes
std::vector<at::Device> devices;
// Use user defined GPU device ids if provided
if (!opts.device_ids.empty()) {
for (auto device : opts.device_ids) {
devices.emplace_back(at::DeviceType::CUDA, device);
}
} else if (usedDeviceIdxs_.empty()) {
// This means no NCCL collective has been called yet. We have to make a
// best guess and use a single GPU to run an allreduce that acts as the
// barrier. In case multiple processes fall on the same node, we use the
// rank to ensure that each process uses a different GPU.
auto numGPUs = at::cuda::getNumGPUs();
int16_t deviceIdx = static_cast<int16_t>(rank_ % numGPUs);
LOG(WARNING) << c10::str(
"Rank ",
this->getRank(),
" using best-guess GPU ",
deviceIdx,
" to perform barrier as devices used by this process are currently unknown. ",
"This can potentially cause a hang if this rank to GPU mapping is incorrect.",
"Specify device_ids in barrier() to force use of a particular device."
);
devices.emplace_back(at::DeviceType::CUDA, deviceIdx);
} else {
for (auto usedDeviceIdx : usedDeviceIdxs_) {
devices.emplace_back(at::DeviceType::CUDA, usedDeviceIdx);
}
}
std::vector<at::Tensor> barrierTensors;
barrierTensors.reserve(devices.size());
at::cuda::OptionalCUDAGuard gpuGuard;
for (auto& device : devices) {
gpuGuard.set_index(device.index());
barrierTensors.push_back(at::empty(
{1},
at::TensorOptions().device(at::DeviceType::CUDA).dtype(at::kByte)));
}
// All reduce to achieve the barrier
auto work = allreduce(barrierTensors);
// Work will take over barrierTensors
auto ncclWork = dynamic_cast<ProcessGroupNCCL::WorkNCCL*>(work.get());
TORCH_CHECK(ncclWork);
ncclWork->barrierTensors_ = std::move(barrierTensors);
return work;
}
#ifdef ENABLE_NCCL_P2P_SUPPORT
c10::intrusive_ptr<ProcessGroup::Work> ProcessGroupNCCL::alltoall_base(
at::Tensor& outputTensor,
at::Tensor& inputTensor,
std::vector<int64_t>& outputSplitSizes,
std::vector<int64_t>& inputSplitSizes,
const AllToAllOptions& /* unused */) {
check_gpu_single_tensor(outputTensor);
check_gpu_single_tensor(inputTensor);
if (outputSplitSizes.size() == 0 && inputSplitSizes.size() == 0) {
std::vector<at::Tensor> inputTensors = {inputTensor};
std::vector<at::Tensor> outputTensors = {outputTensor};
RECORD_PARAM_COMMS(
rank_, // rank
"all_to_all", // colName
inputTensor.numel(), // inSize
outputTensor.numel(), // outSize
at::kByte, // dType
{}, // inSplitSizes
{}); // outSplitSizes
return collective(
inputTensors,
outputTensors,
[&](at::Tensor& input,
at::Tensor& output,
ncclComm_t comm,
at::cuda::CUDAStream& stream) {
// See [Sync Streams].
c10::cuda::CUDACachingAllocator::recordStream(
output.storage().data_ptr(), stream);
torch::cuda::nccl::all2all_single_equal_split(
input,
output,
this->getSize(),
comm,
stream);
return ncclSuccess;
},
OpType::ALLTOALL_BASE,
"nccl:all_to_all");
} else {
c10d::checkSplitSizes(inputSplitSizes, inputTensor, size_);
c10d::checkSplitSizes(outputSplitSizes, outputTensor, size_);
std::vector<at::Tensor> inputTensors = {inputTensor};
std::vector<at::Tensor> outputTensors = {outputTensor};
RECORD_PARAM_COMMS(
rank_, // rank
"all_to_allv", // colName
inputTensor.numel(), // inSize
outputTensor.numel(), // outSize
at::kByte, // dType
std::move(inputSplitSizes), // inSplitSizes
std::move(outputSplitSizes)); // outSplitSizes
return collective(
inputTensors,
outputTensors,
[&](at::Tensor& input,
at::Tensor& output,
ncclComm_t comm,
at::cuda::CUDAStream& stream) {
std::vector<size_t> send_lengths(size_);
std::vector<size_t> recv_lengths(size_);
std::vector<size_t> send_offsets(size_);
std::vector<size_t> recv_offsets(size_);
c10d::computeLengthsAndOffsets(
inputSplitSizes, input, &send_lengths, &send_offsets);
c10d::computeLengthsAndOffsets(
outputSplitSizes, output, &recv_lengths, &recv_offsets);
// See [Sync Streams].
c10::cuda::CUDACachingAllocator::recordStream(
output.storage().data_ptr(), stream);
torch::cuda::nccl::all2all_single_unequal_split(
input.data_ptr(),
send_lengths.data(),
send_offsets.data(),
output.data_ptr(),
recv_lengths.data(),
recv_offsets.data(),
input.element_size(),
input.scalar_type(),
comm,
stream);
return ncclSuccess;
},
OpType::ALLTOALL_BASE,
"nccl:all_to_all");
}
}
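// Split-size example (illustrative): with size_ = 3 and a 1-D input of 12
// elements, inputSplitSizes = {2, 4, 6} would yield send_lengths = {2, 4, 6}
// and send_offsets = {0, 2, 6}; empty split-size vectors take the
// equal-split path above instead.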
c10::intrusive_ptr<ProcessGroup::Work> ProcessGroupNCCL::alltoall(
std::vector<at::Tensor>& outputTensors,
std::vector<at::Tensor>& inputTensors,
const AllToAllOptions& /* unused */) {
auto device = outputTensors[0].device();
for (size_t r = 0; r < outputTensors.size(); r++) {
check_gpu_single_tensor(outputTensors[r]);
check_gpu_single_tensor(inputTensors[r]);
TORCH_CHECK(device == outputTensors[r].device() && device == inputTensors[r].device(),
"Tensors must be on the same device")
}
std::vector<at::Tensor> inputTensor0 = {inputTensors[0]};
std::vector<at::Tensor> outputTensor0 = {outputTensors[0]};
return collective(
inputTensor0,
outputTensor0,
[&](at::Tensor& /* unused */,
at::Tensor& /* unused */,
ncclComm_t comm,
at::cuda::CUDAStream& stream) {
torch::cuda::nccl::all2all(outputTensors, inputTensors, comm, stream);
return ncclSuccess;
},
OpType::ALLTOALL);
}
c10::intrusive_ptr<ProcessGroup::Work> ProcessGroupNCCL::send(
std::vector<at::Tensor>& tensors,
int dstRank,
int /* unused */) {
check_gpu_tensors(tensors);
auto ret = pointToPoint(
tensors,
[&](at::Tensor& input,
ncclComm_t comm,
at::cuda::CUDAStream& stream,
int dst) {
torch::cuda::nccl::send(input, comm, stream, dst);
return ncclSuccess;
},
dstRank,
OpType::SEND);
return ret;
}
c10::intrusive_ptr<ProcessGroup::Work> ProcessGroupNCCL::recv(
std::vector<at::Tensor>& tensors,
int srcRank,
int /* unused */) {
check_gpu_tensors(tensors);
auto ret = pointToPoint(
tensors,
[&](at::Tensor& output,
ncclComm_t comm,
at::cuda::CUDAStream& stream,
int src) {
torch::cuda::nccl::recv(output, comm, stream, src);
return ncclSuccess;
},
srcRank,
OpType::RECV);
return ret;
}
#else
c10::intrusive_ptr<ProcessGroup::Work> ProcessGroupNCCL::alltoall_base(
at::Tensor& /* unused */,
at::Tensor& /* unused */,
std::vector<int64_t>& /* unused */,
std::vector<int64_t>& /* unused */,
const AllToAllOptions& /* unused */) {
throw std::runtime_error(
"ProcessGroupNCCL only supports alltoall* for NCCL lib version >= 2.7.0");
}
c10::intrusive_ptr<ProcessGroup::Work> ProcessGroupNCCL::alltoall(
std::vector<at::Tensor>& /* unused */,
std::vector<at::Tensor>& /* unused */,
const AllToAllOptions& /* unused */) {
throw std::runtime_error(
"ProcessGroupNCCL only supports alltoall* for NCCL lib version >= 2.7.0");
}
c10::intrusive_ptr<ProcessGroup::Work> ProcessGroupNCCL::send(
std::vector<at::Tensor>& /* unused */,
int /* unused */,
int /* unused */) {
throw std::runtime_error(
"ProcessGroupNCCL only supports send for NCCL lib version >= 2.7.0");
}
c10::intrusive_ptr<ProcessGroup::Work> ProcessGroupNCCL::recv(
std::vector<at::Tensor>& /* unused */,
int /* unused */,
int /* unused */) {
throw std::runtime_error(
"ProcessGroupNCCL only supports recv for NCCL lib version >= 2.7.0");
}
#endif
void ProcessGroupNCCL::groupStart() {
#if defined(NCCL_MAJOR) && (NCCL_MAJOR >= 2)
C10D_NCCL_CHECK(ncclGroupStart());
#endif
++ncclActiveGroupCounter_;
}
void ProcessGroupNCCL::groupEnd() {
#if defined(NCCL_MAJOR) && (NCCL_MAJOR >= 2)
C10D_NCCL_CHECK(ncclGroupEnd());
#endif
--ncclActiveGroupCounter_;
}
c10::intrusive_ptr<ProcessGroup::Work> ProcessGroupNCCL::gather(
std::vector<std::vector<at::Tensor>>& /* unused */,
std::vector<at::Tensor>& /* unused */,
const GatherOptions& /* unused */) {
throw std::runtime_error("ProcessGroupNCCL does not support gather");
}
c10::intrusive_ptr<ProcessGroup::Work> ProcessGroupNCCL::scatter(
std::vector<at::Tensor>& /* unused */,
std::vector<std::vector<at::Tensor>>& /* unused */,
const ScatterOptions& /* unused */) {
throw std::runtime_error("ProcessGroupNCCL does not support scatter");
}
c10::intrusive_ptr<ProcessGroup::Work> ProcessGroupNCCL::recvAnysource(
std::vector<at::Tensor>& /* unused */,
int /* unused */) {
throw std::runtime_error("ProcessGroupNCCL does not support recvAnysource");
}
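// Sizing example for _allgather_base below (illustrative): with size_ = 4 and
// an input_tensor of shape {8, 16} (128 elements), the pre-allocated
// output_tensor must hold 4 * 128 = 512 elements of the same dtype, or the
// checks below throw.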
c10::intrusive_ptr<ProcessGroup::Work> ProcessGroupNCCL::_allgather_base(
at::Tensor& output_tensor,
at::Tensor& input_tensor,
const AllgatherOptions& /*unused */) {
check_gpu_single_tensor(input_tensor);
check_gpu_single_tensor(output_tensor);
if (input_tensor.dtype() != output_tensor.dtype()) {
throw std::runtime_error("output tensor must have the same type as input tensor");
}
if (input_tensor.numel() * size_ != output_tensor.numel()) {
throw std::runtime_error("output tensor size must be equal to world_size times input tensor size");
}
// just a wrapper to fit the collective interface
auto inputs = std::vector<at::Tensor> {input_tensor};
auto outputs = std::vector<at::Tensor> {output_tensor};
return collective(
inputs,
outputs,
[&](at::Tensor& input,
at::Tensor& output,
ncclComm_t comm,
at::cuda::CUDAStream& stream) {
c10::cuda::CUDACachingAllocator::recordStream(
output.storage().data_ptr(), stream);
return ncclAllGather(
input.data_ptr(),
output.data_ptr(),
input.numel(),
getNcclDataType(input.scalar_type()),
comm,
stream.stream());
},
[&](std::vector<at::cuda::CUDAStream>&) {},
[&](std::vector<at::cuda::CUDAStream>&) {},
OpType::_ALLGATHER_BASE,
"nccl:_all_gather_base");
}
} // namespace c10d