blob: 3c86667f5bcec5b486cab530ae6a866d19889113 [file] [log] [blame]
#include "nccl.h"
#include "torch/csrc/cuda/device_set.h"
#include "torch/csrc/utils/functional.h"
#include "torch/csrc/utils/auto_gpu.h"
#include "torch/csrc/utils/hash.h"
#include <unordered_map>
#include <sstream>
#include <ATen/ATen.h>
#include <THC/THC.h>
namespace torch { namespace cuda { namespace nccl {
using namespace at;
namespace detail {
// Reports a failed NCCL call by throwing std::runtime_error. The message is
// "NCCL Error <status>: <text>", where <text> is whatever ncclGetErrorString
// returns for `status`.
void throw_nccl_error(ncclResult_t status) {
  std::ostringstream message;
  message << "NCCL Error " << status << ": " << ncclGetErrorString(status);
  const std::string text = message.str();
  throw std::runtime_error(text);
}
// Owns an array of NCCL communicators, one slot per entry of the device list
// it was built from. The communicators are created in the constructor and
// destroyed in the destructor.
struct NcclCommList {
  std::unique_ptr<ncclComm_t[]> comms;  // communicator array; null once moved from
  int ndevices;                         // number of entries in `comms`

  // Allocates one communicator slot per device and initializes all of them
  // with a single ncclCommInitAll call whose status is passed to CHECK.
  NcclCommList(const std::vector<int>& devices)
      : comms(new ncclComm_t[devices.size()]), ndevices(devices.size()) {
    CHECK(ncclCommInitAll(comms.get(), devices.size(), devices.data()));
  }

  // Move-only: the defaulted move transfers ownership of the array.
  NcclCommList(NcclCommList&& other) = default;

  ~NcclCommList() {
    // Nothing to release when the array has been moved out of this object.
    if (!comms) {
      return;
    }
    for (int idx = 0; idx < ndevices; idx++) {
      // This destructor can run after the CUDA driver has already been
      // unloaded from the process. When cudaGetDevice no longer succeeds,
      // stop and skip ncclCommDestroy for the remaining communicators.
      int current_device;
      const bool cuda_usable = cudaGetDevice(&current_device) == cudaSuccess;
      if (!cuda_usable) {
        return;
      }
      ncclCommDestroy(comms[idx]);
    }
  }

  // Non-owning view over the communicators; it points into `comms`.
  ArrayRef<ncclComm_t> ref() const {
    ncclComm_t* const first = comms.get();
    return ArrayRef<ncclComm_t>(first, ndevices);
  }
};
// List of device indices (as returned by Tensor::get_device()); used as the
// key of the communicator cache below.
using device_list = std::vector<int>;
// Cache of communicator lists, keyed by the device list they were created
// for. Entries are inserted on demand by _get_communicators.
// Accesses to this object have to be guarded by THC's CudaFreeMutex.
static std::unordered_map<device_list, NcclCommList, torch::hash<device_list>> _communicators;
// Returns the cached communicators for the devices that `inputs` live on
// (in input order), creating and caching a new NcclCommList on a cache miss.
// The returned view points into the cache entry.
ArrayRef<ncclComm_t> _get_communicators(TensorList inputs) {
  static auto device_of = [](const at::Tensor& tensor) -> int { return tensor.get_device(); };
  device_list key = fmap(inputs, device_of);
  auto entry = _communicators.find(key);
  if (entry == _communicators.end()) {
    // The key doubles as the constructor argument of the new NcclCommList.
    entry = _communicators.emplace(key, key).first;
  }
  return entry->second.ref();
}
// Maps an ATen tensor type to the NCCL data type used to transfer it.
// Throws std::runtime_error when the backend is not CUDA or the scalar type
// has no mapping. Note that both kChar and kByte map to ncclChar.
ncclDataType_t _get_data_type(const Type& type) {
  const char* const unsupported = "Unconvertible NCCL type";
  if (type.backend() != kCUDA) {
    throw std::runtime_error(unsupported);
  }
  const auto scalar = type.scalarType();
  if (scalar == at::kFloat) return ncclFloat;
  if (scalar == at::kHalf) return ncclHalf;
  if (scalar == at::kDouble) return ncclDouble;
  if (scalar == at::kLong) return ncclInt64;
  if (scalar == at::kInt) return ncclInt;
  if (scalar == at::kChar) return ncclChar;
  if (scalar == at::kByte) return ncclChar;
  throw std::runtime_error(unsupported);
}
// Validates the tensor lists handed to an NCCL collective and throws
// std::runtime_error on the first requirement that is violated:
//   - `inputs` is non-empty and `outputs` has the same length;
//   - every input and output is a dense CUDA tensor;
//   - every input and output has the same Type as inputs[0];
//   - every input and output is contiguous;
//   - no two inputs share a device, and outputs[i] is on inputs[i]'s device;
//   - every input has inputs[0].numel() elements;
//   - output.numel() * output_multiplier == inputs[0].numel() * input_multiplier.
void _check_inputs(TensorList inputs, TensorList outputs, int input_multiplier, int output_multiplier) {
  const size_t count = inputs.size();
  if (count == 0) {
    throw std::runtime_error("input sequence can't be empty");
  }
  const size_t output_count = outputs.size();
  if (count != output_count) {
    std::stringstream msg;
    msg << "inputs and outputs sequences have to be of the same length, but got input of length "
        << count << " and output of length " << output_count;
    throw std::runtime_error(msg.str());
  }

  // True when the tensor is on CUDA and is not sparse.
  const auto is_dense_cuda = [](const at::Tensor& t) {
    auto& t_type = t.type();
    return t_type.is_cuda() && !t_type.is_sparse();
  };

  device_set used_devices;
  const int64_t expected_numel = inputs[0].numel();
  auto& expected_type = inputs[0].type();
  for (size_t idx = 0; idx < count; idx++) {
    const auto& src = inputs[idx];
    const auto& dst = outputs[idx];

    if (!is_dense_cuda(src) || !is_dense_cuda(dst)) {
      throw std::runtime_error("input and output elements have to be cuda dense Tensors");
    }
    if (!(expected_type == src.type()) || !(expected_type == dst.type())) {
      throw std::runtime_error("all inputs and outputs must be of the same Tensor type");
    }
    if (!(src.is_contiguous() && dst.is_contiguous())) {
      throw std::runtime_error("all inputs and outputs have to be contiguous");
    }

    // Each input has to live on a device no earlier input used.
    const auto src_device = src.get_device();
    if (used_devices.test(src_device)) {
      throw std::runtime_error("inputs must be on unique devices");
    }
    used_devices.set(src_device);

    // The paired output has to live on the same device as its input.
    if (dst.get_device() != src_device) {
      throw std::runtime_error("input and output must be on the same device");
    }

    // Element counts: inputs all match inputs[0]; outputs match after scaling.
    if (src.numel() != expected_numel) {
      throw std::runtime_error("all inputs must have the same number of elements");
    }
    if (dst.numel() * output_multiplier != expected_numel * input_multiplier) {
      throw std::runtime_error("output must be of size input_size * size_multiplier");
    }
  }
}
} // namespace detail
// Reports whether NCCL can be used for `tensors`. Without WITH_NCCL this is
// always false. With it, the answer is true only when every tensor is a
// dense CUDA tensor, is contiguous, and sits on a device that no earlier
// tensor in the list uses.
bool is_available(TensorList tensors) {
#ifdef WITH_NCCL
  device_set seen;
  for (size_t idx = 0; idx < tensors.size(); idx++) {
    const auto& candidate = tensors[idx];
    auto& kind = candidate.type();
    const bool dense_cuda = kind.is_cuda() && !kind.is_sparse();
    if (!dense_cuda || !candidate.is_contiguous()) {
      return false;
    }
    auto device = candidate.get_device();
    if (seen[device]) {
      return false;  // two tensors on the same device
    }
    seen[device] = true;
  }
  return true;
#else
  return false;
#endif
}
// Returns the NCCL version this file was compiled against, encoded as
// NCCL_MAJOR * 1000 + NCCL_MINOR * 100 + NCCL_PATCH when the NCCL_MAJOR macro
// is defined. Falls back to 1000 when only WITH_NCCL is defined, and to 0
// when neither macro is defined.
std::uint64_t version() {
  std::uint64_t encoded = 0;
#if defined(NCCL_MAJOR)
  encoded = NCCL_MAJOR * 1000 + NCCL_MINOR * 100 + NCCL_PATCH;
#elif defined(WITH_NCCL)
  encoded = 1000;
#endif
  return encoded;
}
// Issues one ncclBcast (root rank 0) per tensor in `tensors`, each on that
// tensor's device, transferring tensors[0].numel() elements.
//
// tensors     validated with _check_inputs(tensors, tensors, 1, 1).
// streams     either empty (the NULL stream is used for every call) or at
//             least one entry per tensor; a null entry also selects the NULL
//             stream for that tensor.
// user_comms  either empty (communicators are taken from the internal cache
//             for the tensors' devices) or at least one communicator per
//             tensor.
//
// Throws std::runtime_error when input validation fails, when a non-empty
// `streams` / `user_comms` is shorter than `tensors`, or when PyTorch was
// built without NCCL support. NCCL failures are reported through CHECK.
void broadcast(TensorList tensors, const stream_list& streams, const comm_list& user_comms) {
#ifdef WITH_NCCL
  using namespace torch::cuda::nccl::detail;
  _check_inputs(tensors, tensors, 1, 1);
  ncclDataType_t data_type = _get_data_type(tensors[0].type());
  int64_t numel = tensors[0].numel();
  const size_t num_tensors = tensors.size();

  // `streams` is indexed once per tensor below; a short list would be read
  // out of bounds, so reject it up front.
  if (!streams.empty() && streams.size() < num_tensors) {
    throw std::runtime_error("streams must be empty or contain one entry per tensor");
  }

  // The communicator cache is only touched while holding THC's CudaFreeMutex.
  std::lock_guard<std::mutex> free_mutex(*(THCCachingAllocator_getCudaFreeMutex()));
  const auto comms = user_comms.empty() ? _get_communicators(tensors) : ArrayRef<ncclComm_t>(user_comms);

  // Same reasoning as for `streams`: comms[i] is used for every tensor.
  if (comms.size() < num_tensors) {
    throw std::runtime_error("comms must be empty or contain one communicator per tensor");
  }

  AutoGPU gpu_guard;
  AutoNcclGroup nccl_group_guard;
  for (size_t i = 0; i < num_tensors; i++) {
    gpu_guard.setDevice(tensors[i].get_device());
    // TODO: use current stream
    const auto stream = (streams.empty() || !streams[i]) ? nullptr : streams[i]->stream;
    CHECK(ncclBcast(tensors[i].data_ptr(), numel, data_type, 0, comms[i], stream));
  }
#else
  throw std::runtime_error("PyTorch built without NCCL support");
#endif
}
}}}