blob: efd894a3d988b884fe6ccd8999fa84820f2c66f6 [file] [log] [blame]
#include <torch/csrc/python_headers.h>
#include <memory>
#include <unordered_map>
#include <vector>
#include <torch/csrc/utils/python_strings.h>
#include <torch/csrc/distributed/THDP.h>
#include <torch/csrc/PythonTypes.h>
#include <torch/csrc/autograd/python_variable.h>
#ifdef USE_CUDA
#include <torch/csrc/cuda/Stream.h>
#endif
// Maps the backend name passed to init_process_group onto the THD channel
// type handed to THDProcessGroupInit.
static std::unordered_map<std::string, THDChannelType> name2channel_type = {
{"mpi", THDChannelMPI},
{"tcp", THDChannelTCP},
{"gloo", THDChannelGloo},
{"nccl", THDChannelNccl},
};
// Python object -> THD reduce op. Populated by THDPModule_initExtension and
// keyed on the raw object pointer, so lookups in _getReduceOp are by identity.
static std::unordered_map<PyObject*, THDReduceOp> obj2reduceop;
// Python object -> THD group. Populated by THDPModule_initExtension and keyed
// on the raw object pointer; _getGroup falls back to plain ints on a miss.
static std::unordered_map<PyObject*, THDGroup> obj2group;
#ifdef USE_CUDA
// Defined in another translation unit; its address is passed to
// THDSetCudaStatePtr by THDPModule_initProcessGroup.
extern THCState* state;
#endif
// Python binding for init_process_group.
//
// `args` must be the 5-tuple
//   (string backend, string init_method, int world_size, string group_name, int rank).
// On malformed arguments a Python error is set and nullptr is returned.
// An unknown backend name raises std::runtime_error naming the backend.
// Returns None on success.
PyObject* THDPModule_initProcessGroup(PyObject *_unused, PyObject *args)
{
  HANDLE_TH_ERRORS
  if (PyTuple_GET_SIZE(args) != 5 || !THPUtils_checkString(PyTuple_GET_ITEM(args, 0)) ||
        !THPUtils_checkString(PyTuple_GET_ITEM(args, 1)) ||
        !THPUtils_checkLong(PyTuple_GET_ITEM(args, 2)) ||
        !THPUtils_checkString(PyTuple_GET_ITEM(args, 3)) ||
        !THPUtils_checkLong(PyTuple_GET_ITEM(args, 4))) {
    THPUtils_invalidArguments(args, nullptr, "init_process_group", 1, "(string backend, string init_method, int world_size, string group_name, int rank)");
    return nullptr;
  }
  std::string backend_name = THPUtils_unpackString(PyTuple_GET_ITEM(args, 0));
  std::string init_method = THPUtils_unpackString(PyTuple_GET_ITEM(args, 1));
  int world_size = THPUtils_unpackLong(PyTuple_GET_ITEM(args, 2));
  std::string group_name = THPUtils_unpackString(PyTuple_GET_ITEM(args, 3));
  int rank = THPUtils_unpackLong(PyTuple_GET_ITEM(args, 4));

  // Look the backend up explicitly instead of using .at(): .at() would throw
  // a std::out_of_range whose message says nothing about which backend name
  // was rejected.
  auto channel_it = name2channel_type.find(backend_name);
  if (channel_it == name2channel_type.end()) {
    throw std::runtime_error("unsupported distributed backend: " + backend_name);
  }
  THDChannelType channel_type = channel_it->second;

  {
    // All Python objects have been unpacked above; the init call itself runs
    // under the AutoNoGIL guard.
    AutoNoGIL nogil;
    THDProcessGroupInit(channel_type, init_method, world_size, group_name, rank);
  }
#ifdef USE_CUDA
  THDSetCudaStatePtr(&state);
#endif
  Py_RETURN_NONE;
  END_HANDLE_TH_ERRORS
}
// Python binding that calls THDProcessGroupDestroy. Takes no arguments and
// returns None.
PyObject* THDPModule_destroyProcessGroup(PyObject *_unused) {
  HANDLE_TH_ERRORS
  {
    // The guard lives only for the duration of the destroy call.
    AutoNoGIL no_gil;
    THDProcessGroupDestroy();
  }
  Py_INCREF(Py_None);
  return Py_None;
  END_HANDLE_TH_ERRORS
}
#ifdef USE_CUDA
// Python binding that registers a CUDA stream with THD.
//
// `_stream` must pass THCPStream_Check; otherwise a Python error is set and
// nullptr is returned. Returns None on success.
PyObject* THDPModule_registerStream(PyObject *_unused, PyObject *_stream)
{
  HANDLE_TH_ERRORS
  THPUtils_assert(THCPStream_Check(_stream),
                  "_register_stream expects a torch.cuda.Stream object");
  // Safe to reinterpret: the type was verified by the assertion above.
  THDRegisterCudaStream(reinterpret_cast<THCPStream*>(_stream)->cuda_stream);
  Py_RETURN_NONE;
  END_HANDLE_TH_ERRORS
}
#endif
// Python binding returning the value of THDGetRank() as a Python int.
PyObject* THDPModule_getRank(PyObject *_unused)
{
  HANDLE_TH_ERRORS
  const long current_rank = THDGetRank();
  return PyInt_FromLong(current_rank);
  END_HANDLE_TH_ERRORS
}
// Python binding returning the value of THDGetNumProcesses() as a Python int.
PyObject* THDPModule_getNumProcesses(PyObject *_unused)
{
  HANDLE_TH_ERRORS
  const long process_count = THDGetNumProcesses();
  return PyInt_FromLong(process_count);
  END_HANDLE_TH_ERRORS
}
#ifdef USE_CUDA
// Python class objects for the CUDA tensor types, defined in another
// translation unit.
// NOTE(review): none of these is referenced in the visible part of this file;
// they may be leftovers — confirm before removing.
extern PyObject* THCPDoubleTensorClass;
extern PyObject* THCPFloatTensorClass;
extern PyObject* THCPHalfTensorClass;
extern PyObject* THCPLongTensorClass;
extern PyObject* THCPIntTensorClass;
extern PyObject* THCPShortTensorClass;
extern PyObject* THCPCharTensorClass;
extern PyObject* THCPByteTensorClass;
#endif
// Builds the THD tensor descriptor for a Python Variable object by calling
// tensor_data() on its wrapped cdata. The caller must already have verified
// that `obj` is a THPVariable (no type check is performed here).
THDTensorDescriptor THDPModule_makeDescriptor(PyObject *obj) {
  return reinterpret_cast<THPVariable*>(obj)->cdata.tensor_data();
}
// Extracts the THDRequest pointer stored in a THPWrapper object. Callers are
// expected to have validated `obj` with THPWrapper_check first.
static THDRequest* _unpackRequest(PyObject *obj)
{
  void* payload = THPWrapper_get(obj);
  return static_cast<THDRequest*>(payload);
}
// Translates a Python reduce-op constant into its THDReduceOp value by
// looking the object pointer up in obj2reduceop.
// Throws std::runtime_error when the object was never registered.
static THDReduceOp _getReduceOp(PyObject *obj)
{
  const auto entry = obj2reduceop.find(obj);
  if (entry != obj2reduceop.end()) {
    return entry->second;
  }
  throw std::runtime_error(
      "op should be a constant from torch.distributed.deprecated.reduce_op");
}
// Translates a Python group argument into a THDGroup.
//
// Registered group objects (see obj2group) take precedence; any other object
// must be an integer, which is unpacked and used as the group id directly.
// Throws std::runtime_error for anything else.
static THDGroup _getGroup(PyObject *obj)
{
  const auto entry = obj2group.find(obj);
  if (entry != obj2group.end()) {
    return entry->second;
  }
  if (THPUtils_checkLong(obj)) {
    return THPUtils_unpackLong(obj);
  }
  throw std::runtime_error(
      "group should be an int or one of the values from torch.distributed.deprecated.group");
}
// Python binding for clear_group_cache.
//
// `args` must be the 1-tuple (group gr). Sets a Python error and returns
// nullptr on a wrong argument count; returns None on success.
PyObject* THDPModule_clearGroupCache(PyObject *_unused, PyObject *args) {
  HANDLE_TH_ERRORS
  const bool arity_ok = PyTuple_GET_SIZE(args) == 1;
  if (!arity_ok) {
    THPUtils_invalidArguments(args, nullptr, "clear_group_cache", 1, "(group gr)");
    return nullptr;
  }
  // Resolve the group before the guarded section below.
  const THDGroup target_group = _getGroup(PyTuple_GET_ITEM(args, 0));
  {
    AutoNoGIL no_gil;
    THDClearGroupCache(target_group);
  }
  Py_RETURN_NONE;
  END_HANDLE_TH_ERRORS
}
// Python binding for isend.
//
// `args` must be the 2-tuple (tensor input, int dst_rank). Sets a Python
// error and returns nullptr on malformed arguments. On success returns a
// THPWrapper owning the THDRequest produced by THDIsend; the wrapper is
// created with THDRequest_free as its destructor callback.
PyObject* THDPModule_isend(PyObject *_unused, PyObject *args)
{
  HANDLE_TH_ERRORS
  const bool args_ok = PyTuple_GET_SIZE(args) == 2 &&
      THPVariable_Check(PyTuple_GET_ITEM(args, 0)) &&
      THPUtils_checkLong(PyTuple_GET_ITEM(args, 1));
  if (!args_ok) {
    THPUtils_invalidArguments(args, nullptr, "isend", 1, "(tensor input, int dst_rank)");
    return nullptr;
  }
  PyObject* const tensor_obj = PyTuple_GET_ITEM(args, 0);
  PyObject* const rank_obj = PyTuple_GET_ITEM(args, 1);
  auto tensor = THDPModule_makeDescriptor(tensor_obj);
  const int destination = THPUtils_unpackLong(rank_obj);
  THDRequest* pending;
  {
    AutoNoGIL no_gil;
    pending = THDIsend(tensor, destination);
  }
  return THPWrapper_New(pending, reinterpret_cast<void (*)(void*)>(THDRequest_free));
  END_HANDLE_TH_ERRORS
}
// Python binding for irecv.
//
// `args` must be the 2-tuple (tensor output, int src_rank). Sets a Python
// error and returns nullptr on malformed arguments. On success returns a
// THPWrapper owning the THDRequest produced by THDIrecv; the wrapper is
// created with THDRequest_free as its destructor callback.
PyObject* THDPModule_irecv(PyObject *_unused, PyObject *args)
{
  HANDLE_TH_ERRORS
  const bool args_ok = PyTuple_GET_SIZE(args) == 2 &&
      THPVariable_Check(PyTuple_GET_ITEM(args, 0)) &&
      THPUtils_checkLong(PyTuple_GET_ITEM(args, 1));
  if (!args_ok) {
    THPUtils_invalidArguments(args, nullptr, "irecv", 1, "(tensor output, int src_rank)");
    return nullptr;
  }
  PyObject* const tensor_obj = PyTuple_GET_ITEM(args, 0);
  PyObject* const rank_obj = PyTuple_GET_ITEM(args, 1);
  auto tensor = THDPModule_makeDescriptor(tensor_obj);
  const int source = THPUtils_unpackLong(rank_obj);
  THDRequest* pending;
  {
    AutoNoGIL no_gil;
    pending = THDIrecv(tensor, source);
  }
  return THPWrapper_New(pending, reinterpret_cast<void (*)(void*)>(THDRequest_free));
  END_HANDLE_TH_ERRORS
}
// Python binding for send.
//
// `args` must be the 2-tuple (tensor input, int dst_rank). Sets a Python
// error and returns nullptr on malformed arguments; returns None once
// THDSend has returned.
PyObject* THDPModule_send(PyObject *_unused, PyObject *args)
{
  HANDLE_TH_ERRORS
  const bool args_ok = PyTuple_GET_SIZE(args) == 2 &&
      THPVariable_Check(PyTuple_GET_ITEM(args, 0)) &&
      THPUtils_checkLong(PyTuple_GET_ITEM(args, 1));
  if (!args_ok) {
    THPUtils_invalidArguments(args, nullptr, "send", 1, "(tensor input, int dst_rank)");
    return nullptr;
  }
  PyObject* const tensor_obj = PyTuple_GET_ITEM(args, 0);
  PyObject* const rank_obj = PyTuple_GET_ITEM(args, 1);
  auto tensor = THDPModule_makeDescriptor(tensor_obj);
  const int destination = THPUtils_unpackLong(rank_obj);
  {
    AutoNoGIL no_gil;
    THDSend(tensor, destination);
  }
  Py_RETURN_NONE;
  END_HANDLE_TH_ERRORS
}
// Python binding for recv without a fixed source (registered as METH_O, so
// the single argument arrives directly rather than in a tuple).
//
// `_tensor` must be a Variable; otherwise a Python error is set and nullptr
// is returned. Returns the value produced by THDRecvAnySource as a Python int.
PyObject* THDPModule_recvAnySource(PyObject *_unused, PyObject *_tensor)
{
  HANDLE_TH_ERRORS
  const bool is_variable = THPVariable_Check(_tensor);
  if (!is_variable) {
    THPUtils_invalidArguments(_tensor, nullptr, "recv", 1, "(tensor output)");
    return nullptr;
  }
  auto tensor = THDPModule_makeDescriptor(_tensor);
  int source_rank;
  {
    AutoNoGIL no_gil;
    source_rank = THDRecvAnySource(tensor);
  }
  return PyInt_FromLong(source_rank);
  END_HANDLE_TH_ERRORS
}
// Python binding for recv from a specific rank.
//
// `args` must be the 2-tuple (tensor output, int src_rank). Sets a Python
// error and returns nullptr on malformed arguments. On success returns a new
// reference to the src_rank object that was passed in.
PyObject* THDPModule_recv(PyObject *_unused, PyObject *args)
{
  HANDLE_TH_ERRORS
  const bool args_ok = PyTuple_GET_SIZE(args) == 2 &&
      THPVariable_Check(PyTuple_GET_ITEM(args, 0)) &&
      THPUtils_checkLong(PyTuple_GET_ITEM(args, 1));
  if (!args_ok) {
    THPUtils_invalidArguments(args, nullptr, "recv", 1, "(tensor output, int src_rank)");
    return nullptr;
  }
  PyObject* const tensor_obj = PyTuple_GET_ITEM(args, 0);
  PyObject* const rank_obj = PyTuple_GET_ITEM(args, 1);
  auto tensor = THDPModule_makeDescriptor(tensor_obj);
  const int source = THPUtils_unpackLong(rank_obj);
  {
    AutoNoGIL no_gil;
    THDRecv(tensor, source);
  }
  // The sender rank is the caller's own argument; hand back a new reference
  // to it because tuple items are borrowed.
  Py_INCREF(rank_obj);
  return rank_obj;
  END_HANDLE_TH_ERRORS
}
// Python binding for all_reduce_multigpu.
//
// `args` must be the 3-tuple (list[tensor] in_out, reduce_op op, group gr).
// On malformed arguments a Python error is set and nullptr is returned.
// _getGroup / _getReduceOp throw std::runtime_error for unrecognised values.
// Returns None on success.
PyObject* THDPModule_allReduceMultiGPU(PyObject *_unused, PyObject *args)
{
  HANDLE_TH_ERRORS
  // All locals are declared before the first goto so that no jump crosses an
  // initialisation.
  std::vector<at::Tensor> descriptors;
  size_t length;
  THDGroup group;
  THDReduceOp op;
  THPObjectPtr sequence;
  if (PyTuple_GET_SIZE(args) != 3) {
    goto invalid_arguments;
  }
  if (!PySequence_Check(PyTuple_GET_ITEM(args, 0))) {
    goto invalid_arguments;
  }
  sequence = THPObjectPtr(PySequence_Fast(PyTuple_GET_ITEM(args, 0),
                                          "expected a sequence"));
  if (!sequence.get()) {
    goto invalid_arguments;
  }
  length = static_cast<size_t>(PySequence_Fast_GET_SIZE(sequence.get()));
  descriptors.reserve(length);
  for (size_t i = 0; i < length; ++i) {
    if (!THPVariable_Check(PySequence_Fast_GET_ITEM(sequence.get(), i))) {
      goto invalid_arguments;
    }
    descriptors.push_back(
      THDPModule_makeDescriptor(PySequence_Fast_GET_ITEM(sequence.get(), i))
    );
  }
  group = _getGroup(PyTuple_GET_ITEM(args, 2));
  op = _getReduceOp(PyTuple_GET_ITEM(args, 1));
  {
    AutoNoGIL guard;
    THDAllReduceMultiGPU(descriptors.data(), length, op, group);
  }
  Py_RETURN_NONE;
invalid_arguments:
  THPUtils_invalidArguments(args, nullptr, "all_reduce_multigpu", 1,
                            "(list[tensor] in_out, reduce_op op, group gr)");
  // An error has been set, so the call must report failure with nullptr;
  // returning None here would hand the interpreter a result with a pending
  // exception.
  return nullptr;
  END_HANDLE_TH_ERRORS
}
// Python binding for reduce_multigpu.
//
// `args` must be the 4-tuple
//   (list[tensor] in_out, int dst_rank, reduce_op op, group gr).
// On malformed arguments a Python error is set and nullptr is returned.
// _getGroup / _getReduceOp throw std::runtime_error for unrecognised values.
// Returns None on success.
PyObject* THDPModule_reduceMultiGPU(PyObject *_unused, PyObject *args)
{
  HANDLE_TH_ERRORS
  // All locals are declared before the first goto so that no jump crosses an
  // initialisation.
  THPObjectPtr sequence;
  size_t length;
  std::vector<at::Tensor> descriptors;
  THDGroup group;
  THDReduceOp op;
  int dst_rank;
  if (PyTuple_GET_SIZE(args) != 4) {
    goto invalid_arguments;
  }
  if (!PySequence_Check(PyTuple_GET_ITEM(args, 0)) ||
        !THPUtils_checkLong(PyTuple_GET_ITEM(args, 1))) {
    goto invalid_arguments;
  }
  sequence = THPObjectPtr(PySequence_Fast(PyTuple_GET_ITEM(args, 0),
                                          "expected a sequence"));
  if (!sequence.get()) {
    goto invalid_arguments;
  }
  length = static_cast<size_t>(PySequence_Fast_GET_SIZE(sequence.get()));
  descriptors.reserve(length);
  for (size_t i = 0; i < length; ++i) {
    if (!THPVariable_Check(PySequence_Fast_GET_ITEM(sequence.get(), i))) {
      goto invalid_arguments;
    }
    descriptors.push_back(
      THDPModule_makeDescriptor(PySequence_Fast_GET_ITEM(sequence.get(), i))
    );
  }
  group = _getGroup(PyTuple_GET_ITEM(args, 3));
  op = _getReduceOp(PyTuple_GET_ITEM(args, 2));
  dst_rank = THPUtils_unpackLong(PyTuple_GET_ITEM(args, 1));
  {
    AutoNoGIL guard;
    THDReduceMultiGPU(descriptors.data(), length, op, dst_rank, group);
  }
  Py_RETURN_NONE;
invalid_arguments:
  THPUtils_invalidArguments(args, nullptr, "reduce_multigpu", 1,
                            "(list[tensor] in_out, int dst_rank, "
                            "reduce_op op, group gr)");
  // An error has been set, so the call must report failure with nullptr;
  // returning None here would hand the interpreter a result with a pending
  // exception.
  return nullptr;
  END_HANDLE_TH_ERRORS
}
// Python binding for broadcast_multigpu.
//
// `args` must be the 3-tuple (list[tensor] in_out, int src_rank, group gr).
// On malformed arguments a Python error is set and nullptr is returned.
// _getGroup throws std::runtime_error for an unrecognised group.
// Returns None on success.
PyObject* THDPModule_broadcastMultiGPU(PyObject *_unused, PyObject *args)
{
  HANDLE_TH_ERRORS
  // All locals are declared before the first goto so that no jump crosses an
  // initialisation.
  THPObjectPtr sequence;
  size_t length;
  std::vector<at::Tensor> descriptors;
  THDGroup group;
  int src_rank;
  if (PyTuple_GET_SIZE(args) != 3) {
    goto invalid_arguments;
  }
  if (!PySequence_Check(PyTuple_GET_ITEM(args, 0)) ||
        !THPUtils_checkLong(PyTuple_GET_ITEM(args, 1))) {
    goto invalid_arguments;
  }
  sequence = THPObjectPtr(PySequence_Fast(PyTuple_GET_ITEM(args, 0),
                                          "expected a sequence"));
  if (!sequence.get()) {
    goto invalid_arguments;
  }
  length = static_cast<size_t>(PySequence_Fast_GET_SIZE(sequence.get()));
  descriptors.reserve(length);
  for (size_t i = 0; i < length; ++i) {
    if (!THPVariable_Check(PySequence_Fast_GET_ITEM(sequence.get(), i))) {
      goto invalid_arguments;
    }
    descriptors.push_back(
      THDPModule_makeDescriptor(PySequence_Fast_GET_ITEM(sequence.get(), i))
    );
  }
  group = _getGroup(PyTuple_GET_ITEM(args, 2));
  src_rank = THPUtils_unpackLong(PyTuple_GET_ITEM(args, 1));
  {
    AutoNoGIL guard;
    THDBroadcastMultiGPU(descriptors.data(), length, src_rank, group);
  }
  Py_RETURN_NONE;
invalid_arguments:
  THPUtils_invalidArguments(args, nullptr, "broadcast_multigpu", 1,
                            "(list[tensor] in_out, int src_rank, group gr)");
  // An error has been set, so the call must report failure with nullptr;
  // returning None here would hand the interpreter a result with a pending
  // exception.
  return nullptr;
  END_HANDLE_TH_ERRORS
}
// Python binding for all_gather_multigpu.
//
// `args` must be a 3-tuple (output sequence, input sequence, group gr). Both
// sequences must have the same length and contain only Variables.
// NOTE(review): the usage string advertises `list[list[tensor]] output`, but
// the validation below requires every element of the output sequence to be a
// Variable itself — confirm which is intended.
// On malformed arguments a Python error is set and nullptr is returned.
// Returns None on success.
PyObject* THDPModule_allGatherMultiGPU(PyObject *_unused, PyObject *args)
{
  HANDLE_TH_ERRORS
  // All locals are declared before the first goto so that no jump crosses an
  // initialisation.
  THPObjectPtr sequence_one;
  THPObjectPtr sequence_two;
  size_t length_one;
  size_t length_two;
  std::vector<at::Tensor> output_descriptors;
  std::vector<at::Tensor> input_descriptors;
  THDGroup group;
  if (PyTuple_GET_SIZE(args) != 3) {
    goto invalid_arguments;
  }
  if (!PySequence_Check(PyTuple_GET_ITEM(args, 0)) ||
        !PySequence_Check(PyTuple_GET_ITEM(args, 1))) {
    goto invalid_arguments;
  }
  sequence_one = THPObjectPtr(PySequence_Fast(PyTuple_GET_ITEM(args, 0),
                                              "expected a sequence"));
  sequence_two = THPObjectPtr(PySequence_Fast(PyTuple_GET_ITEM(args, 1),
                                              "expected a sequence"));
  if (!sequence_one.get() || !sequence_two.get()) {
    goto invalid_arguments;
  }
  length_one = static_cast<size_t>(
      PySequence_Fast_GET_SIZE(sequence_one.get()));
  length_two = static_cast<size_t>(
      PySequence_Fast_GET_SIZE(sequence_two.get()));
  if (length_one != length_two) {
    goto invalid_arguments;
  }
  output_descriptors.reserve(length_one);
  input_descriptors.reserve(length_two);
  // Validate and convert inputs and outputs pairwise; the lengths are equal.
  for (size_t i = 0; i < length_two; ++i) {
    if (!THPVariable_Check(PySequence_Fast_GET_ITEM(sequence_two.get(), i)) ||
        !THPVariable_Check(PySequence_Fast_GET_ITEM(sequence_one.get(), i))) {
      goto invalid_arguments;
    }
    input_descriptors.push_back(
      THDPModule_makeDescriptor(PySequence_Fast_GET_ITEM(sequence_two.get(), i))
    );
    output_descriptors.push_back(
      THDPModule_makeDescriptor(PySequence_Fast_GET_ITEM(sequence_one.get(), i))
    );
  }
  group = _getGroup(PyTuple_GET_ITEM(args, 2));
  {
    AutoNoGIL guard;
    THDAllGatherMultiGPU(output_descriptors.data(),
                         length_one,
                         input_descriptors.data(),
                         length_two,
                         group);
  }
  Py_RETURN_NONE;
invalid_arguments:
  THPUtils_invalidArguments(args, nullptr, "all_gather_multigpu", 1,
                            "(list[list[tensor]] output, list[tensor] input, group gr)");
  // An error has been set, so the call must report failure with nullptr;
  // returning None here would hand the interpreter a result with a pending
  // exception.
  return nullptr;
  END_HANDLE_TH_ERRORS
}
// Python binding for all_reduce.
//
// `args` must be the 3-tuple (tensor in_out, reduce_op op, group gr). Sets a
// Python error and returns nullptr when the arity is wrong or the first item
// is not a Variable. _getGroup / _getReduceOp throw std::runtime_error for
// unrecognised values. Returns None on success.
PyObject* THDPModule_allReduce(PyObject *_unused, PyObject *args)
{
  HANDLE_TH_ERRORS
  const bool args_ok = PyTuple_GET_SIZE(args) == 3 &&
      THPVariable_Check(PyTuple_GET_ITEM(args, 0));
  if (!args_ok) {
    THPUtils_invalidArguments(args, nullptr, "all_reduce", 1, "(tensor in_out, reduce_op op, group gr)");
    return nullptr;
  }
  PyObject* const tensor_obj = PyTuple_GET_ITEM(args, 0);
  PyObject* const op_obj = PyTuple_GET_ITEM(args, 1);
  PyObject* const group_obj = PyTuple_GET_ITEM(args, 2);
  // Group is resolved before the op, so a bad group is reported first.
  const THDGroup target_group = _getGroup(group_obj);
  const THDReduceOp reduce_kind = _getReduceOp(op_obj);
  auto tensor = THDPModule_makeDescriptor(tensor_obj);
  {
    AutoNoGIL no_gil;
    THDAllReduce(tensor, reduce_kind, target_group);
  }
  Py_RETURN_NONE;
  END_HANDLE_TH_ERRORS
}
// Python binding for reduce.
//
// `args` must be the 4-tuple
//   (tensor reduced, int dst_rank, reduce_op op, group gr).
// Sets a Python error and returns nullptr on malformed arguments.
// _getGroup / _getReduceOp throw std::runtime_error for unrecognised values.
// Returns None on success.
PyObject* THDPModule_reduce(PyObject *_unused, PyObject *args)
{
  HANDLE_TH_ERRORS
  const bool args_ok = PyTuple_GET_SIZE(args) == 4 &&
      THPVariable_Check(PyTuple_GET_ITEM(args, 0)) &&
      THPUtils_checkLong(PyTuple_GET_ITEM(args, 1));
  if (!args_ok) {
    THPUtils_invalidArguments(args, nullptr, "reduce", 1,
        "(tensor reduced, int dst_rank, reduce_op op, group gr)");
    return nullptr;
  }
  PyObject* const tensor_obj = PyTuple_GET_ITEM(args, 0);
  PyObject* const rank_obj = PyTuple_GET_ITEM(args, 1);
  PyObject* const op_obj = PyTuple_GET_ITEM(args, 2);
  PyObject* const group_obj = PyTuple_GET_ITEM(args, 3);
  // Group is resolved before the op, so a bad group is reported first.
  const THDGroup target_group = _getGroup(group_obj);
  const THDReduceOp reduce_kind = _getReduceOp(op_obj);
  auto tensor = THDPModule_makeDescriptor(tensor_obj);
  const int destination = THPUtils_unpackLong(rank_obj);
  {
    AutoNoGIL no_gil;
    THDReduce(tensor, reduce_kind, destination, target_group);
  }
  Py_RETURN_NONE;
  END_HANDLE_TH_ERRORS
}
// Python binding for broadcast.
//
// `args` must be the 3-tuple (tensor src_dst, int src_rank, group gr). Sets a
// Python error and returns nullptr on malformed arguments. _getGroup throws
// std::runtime_error for an unrecognised group. Returns None on success.
PyObject* THDPModule_broadcast(PyObject *_unused, PyObject *args)
{
  HANDLE_TH_ERRORS
  const bool args_ok = PyTuple_GET_SIZE(args) == 3 &&
      THPVariable_Check(PyTuple_GET_ITEM(args, 0)) &&
      THPUtils_checkLong(PyTuple_GET_ITEM(args, 1));
  if (!args_ok) {
    THPUtils_invalidArguments(args, nullptr, "broadcast", 1,
        "(tensor src_dst, int src_rank, group gr)");
    return nullptr;
  }
  PyObject* const tensor_obj = PyTuple_GET_ITEM(args, 0);
  PyObject* const rank_obj = PyTuple_GET_ITEM(args, 1);
  PyObject* const group_obj = PyTuple_GET_ITEM(args, 2);
  const THDGroup target_group = _getGroup(group_obj);
  auto tensor = THDPModule_makeDescriptor(tensor_obj);
  const int source = THPUtils_unpackLong(rank_obj);
  {
    AutoNoGIL no_gil;
    THDBroadcast(tensor, source, target_group);
  }
  Py_RETURN_NONE;
  END_HANDLE_TH_ERRORS
}
// Python binding for all_gather.
//
// `args` must be the 3-tuple (list[tensor] output, tensor input, group gr).
// On malformed arguments a Python error is set and nullptr is returned.
// _getGroup throws std::runtime_error for an unrecognised group.
// Returns None on success.
PyObject* THDPModule_allGather(PyObject *_unused, PyObject *args)
{
  HANDLE_TH_ERRORS
  // All locals are declared before the first goto so that no jump crosses an
  // initialisation.
  THPObjectPtr sequence;
  size_t length;
  std::vector<at::Tensor> descriptors;
  THDGroup group;
  at::Tensor desc;
  if (PyTuple_GET_SIZE(args) != 3 ||
        !PySequence_Check(PyTuple_GET_ITEM(args, 0)) ||
        !THPVariable_Check(PyTuple_GET_ITEM(args, 1))) {
    goto invalid_arguments;
  }
  sequence = THPObjectPtr(PySequence_Fast(PyTuple_GET_ITEM(args, 0),
                                          "expected a sequence"));
  if (!sequence.get()) {
    goto invalid_arguments;
  }
  length = static_cast<size_t>(PySequence_Fast_GET_SIZE(sequence.get()));
  descriptors.reserve(length);
  for (size_t i = 0; i < length; ++i) {
    if (!THPVariable_Check(PySequence_Fast_GET_ITEM(sequence.get(), i)))
      goto invalid_arguments;
    descriptors.push_back(
      THDPModule_makeDescriptor(PySequence_Fast_GET_ITEM(sequence.get(), i))
    );
  }
  group = _getGroup(PyTuple_GET_ITEM(args, 2));
  desc = THDPModule_makeDescriptor(PyTuple_GET_ITEM(args, 1));
  {
    AutoNoGIL guard;
    THDAllGather(descriptors.data(), length, desc, group);
  }
  Py_RETURN_NONE;
invalid_arguments:
  THPUtils_invalidArguments(args, nullptr, "allGather", 1,
                            "(list[tensor] output, tensor input, group gr)");
  // An error has been set, so the call must report failure with nullptr
  // (as gatherRecv and scatterSend already do); returning None here would
  // hand the interpreter a result with a pending exception.
  return nullptr;
  END_HANDLE_TH_ERRORS
}
// Python binding for the sending side of gather.
//
// `args` must be the 3-tuple (tensor input, int dst_rank, group gr). Sets a
// Python error and returns nullptr on malformed arguments. _getGroup throws
// std::runtime_error for an unrecognised group. Returns None on success.
PyObject* THDPModule_gatherSend(PyObject *_unused, PyObject *args)
{
  HANDLE_TH_ERRORS
  // dst_rank is validated here, like src_rank in scatterRecv, so that a
  // non-integer is reported through the usual invalid-arguments message
  // rather than surfacing from the unpack call below.
  if (PyTuple_GET_SIZE(args) != 3 || !THPVariable_Check(PyTuple_GET_ITEM(args, 0)) ||
        !THPUtils_checkLong(PyTuple_GET_ITEM(args, 1))) {
    THPUtils_invalidArguments(args, nullptr, "gatherSend", 1,
                              "(tensor input, int dst_rank, group gr)");
    return nullptr;
  }
  THDGroup group = _getGroup(PyTuple_GET_ITEM(args, 2));
  auto desc = THDPModule_makeDescriptor(PyTuple_GET_ITEM(args, 0));
  int dst_rank = THPUtils_unpackLong(PyTuple_GET_ITEM(args, 1));
  {
    AutoNoGIL guard;
    THDGatherSend(desc, dst_rank, group);
  }
  Py_RETURN_NONE;
  END_HANDLE_TH_ERRORS
}
// Python binding for the receiving side of gather.
//
// `args` must be the 3-tuple (list[tensor] output, tensor input, group gr).
// Sets a Python error and returns nullptr on malformed arguments. _getGroup
// throws std::runtime_error for an unrecognised group. Returns None on success.
PyObject* THDPModule_gatherRecv(PyObject *_unused, PyObject *args)
{
  HANDLE_TH_ERRORS
  // Declared up front: the gotos below must not jump over initialisations.
  THPObjectPtr fast_seq;
  size_t count;
  std::vector<at::Tensor> outputs;
  THDGroup target_group;
  at::Tensor input;
  if (PyTuple_GET_SIZE(args) != 3) {
    goto invalid_arguments;
  }
  if (!PySequence_Check(PyTuple_GET_ITEM(args, 0))) {
    goto invalid_arguments;
  }
  if (!THPVariable_Check(PyTuple_GET_ITEM(args, 1))) {
    goto invalid_arguments;
  }
  fast_seq = PySequence_Fast(PyTuple_GET_ITEM(args, 0), "expected a sequence");
  if (fast_seq.get() == nullptr) {
    goto invalid_arguments;
  }
  count = static_cast<size_t>(PySequence_Fast_GET_SIZE(fast_seq.get()));
  outputs.reserve(count);
  for (size_t idx = 0; idx != count; ++idx) {
    PyObject* const item = PySequence_Fast_GET_ITEM(fast_seq.get(), idx);
    if (!THPVariable_Check(item)) {
      goto invalid_arguments;
    }
    outputs.push_back(THDPModule_makeDescriptor(item));
  }
  input = THDPModule_makeDescriptor(PyTuple_GET_ITEM(args, 1));
  target_group = _getGroup(PyTuple_GET_ITEM(args, 2));
  {
    AutoNoGIL no_gil;
    THDGatherRecv(outputs.data(), count, input, target_group);
  }
  Py_RETURN_NONE;
invalid_arguments:
  THPUtils_invalidArguments(args, nullptr, "gatherRecv", 1,
                            "(list[tensor] output, tensor input, group gr)");
  return nullptr;
  END_HANDLE_TH_ERRORS
}
// Python binding for the sending side of scatter.
//
// `args` must be the 3-tuple (list[tensor] input, tensor output, group gr).
// Sets a Python error and returns nullptr on malformed arguments. _getGroup
// throws std::runtime_error for an unrecognised group. Returns None on success.
PyObject* THDPModule_scatterSend(PyObject *_unused, PyObject *args)
{
  HANDLE_TH_ERRORS
  // Declared up front: the gotos below must not jump over initialisations.
  THPObjectPtr fast_seq;
  size_t count;
  std::vector<at::Tensor> inputs;
  THDGroup target_group;
  at::Tensor output;
  if (PyTuple_GET_SIZE(args) != 3) {
    goto invalid_arguments;
  }
  if (!PySequence_Check(PyTuple_GET_ITEM(args, 0))) {
    goto invalid_arguments;
  }
  if (!THPVariable_Check(PyTuple_GET_ITEM(args, 1))) {
    goto invalid_arguments;
  }
  fast_seq = PySequence_Fast(PyTuple_GET_ITEM(args, 0), "expected a sequence");
  if (fast_seq.get() == nullptr) {
    goto invalid_arguments;
  }
  count = static_cast<size_t>(PySequence_Fast_GET_SIZE(fast_seq.get()));
  inputs.reserve(count);
  for (size_t idx = 0; idx != count; ++idx) {
    PyObject* const item = PySequence_Fast_GET_ITEM(fast_seq.get(), idx);
    if (!THPVariable_Check(item)) {
      goto invalid_arguments;
    }
    inputs.push_back(THDPModule_makeDescriptor(item));
  }
  output = THDPModule_makeDescriptor(PyTuple_GET_ITEM(args, 1));
  target_group = _getGroup(PyTuple_GET_ITEM(args, 2));
  {
    AutoNoGIL no_gil;
    THDScatterSend(inputs.data(), count, output, target_group);
  }
  Py_RETURN_NONE;
invalid_arguments:
  THPUtils_invalidArguments(args, nullptr, "scatterSend", 1,
                            "(list[tensor] input, tensor output, group gr)");
  return nullptr;
  END_HANDLE_TH_ERRORS
}
// Python binding for the receiving side of scatter.
//
// `args` must be the 3-tuple (tensor output, int src_rank, group gr). Sets a
// Python error and returns nullptr on malformed arguments. _getGroup throws
// std::runtime_error for an unrecognised group. Returns None on success.
PyObject* THDPModule_scatterRecv(PyObject *_unused, PyObject *args)
{
  HANDLE_TH_ERRORS
  const bool args_ok = PyTuple_GET_SIZE(args) == 3 &&
      THPVariable_Check(PyTuple_GET_ITEM(args, 0)) &&
      THPUtils_checkLong(PyTuple_GET_ITEM(args, 1));
  if (!args_ok) {
    THPUtils_invalidArguments(args, nullptr, "scatterRecv", 1,
        "(tensor output, int src_rank, group gr)");
    return nullptr;
  }
  PyObject* const tensor_obj = PyTuple_GET_ITEM(args, 0);
  PyObject* const rank_obj = PyTuple_GET_ITEM(args, 1);
  PyObject* const group_obj = PyTuple_GET_ITEM(args, 2);
  const THDGroup target_group = _getGroup(group_obj);
  auto tensor = THDPModule_makeDescriptor(tensor_obj);
  const int source = THPUtils_unpackLong(rank_obj);
  {
    AutoNoGIL no_gil;
    THDScatterRecv(tensor, source, target_group);
  }
  Py_RETURN_NONE;
  END_HANDLE_TH_ERRORS
}
// Python binding for barrier (registered as METH_O, so `_group` is the single
// argument itself rather than a tuple).
//
// `_group` must be a registered group object or an int; _getGroup throws
// std::runtime_error otherwise. Returns None once THDBarrier has returned.
PyObject* THDPModule_barrier(PyObject *_unused, PyObject *_group)
{
  HANDLE_TH_ERRORS
  // Resolve the group before entering the AutoNoGIL scope: _getGroup inspects
  // and unpacks a Python object and may throw, which every other binding in
  // this file does outside the guarded section.
  const THDGroup group = _getGroup(_group);
  {
    AutoNoGIL guard;
    THDBarrier(group);
  }
  Py_RETURN_NONE;
  END_HANDLE_TH_ERRORS
}
// Python binding for new_group.
//
// `args` must be the 1-tuple (list[int] ranks). Sets a Python error and
// returns nullptr when the argument is malformed or when a rank appears more
// than once. On success returns the value produced by THDNewGroup as a
// Python int.
PyObject* THDPModule_newGroup(PyObject *_unused, PyObject *args)
{
  HANDLE_TH_ERRORS
  // Declared up front: the gotos below must not jump over initialisations.
  THPObjectPtr fast_seq;
  size_t count;
  std::vector<int> rank_list;
  if (PyTuple_GET_SIZE(args) != 1) {
    goto invalid_arguments;
  }
  if (!PySequence_Check(PyTuple_GET_ITEM(args, 0))) {
    goto invalid_arguments;
  }
  fast_seq = PySequence_Fast(PyTuple_GET_ITEM(args, 0), "expected a sequence");
  if (fast_seq.get() == nullptr) {
    goto invalid_arguments;
  }
  count = static_cast<size_t>(PySequence_Fast_GET_SIZE(fast_seq.get()));
  rank_list.reserve(count);
  for (size_t idx = 0; idx != count; ++idx) {
    PyObject* const item = PySequence_Fast_GET_ITEM(fast_seq.get(), idx);
    if (!THPUtils_checkLong(item)) {
      goto invalid_arguments;
    }
    const int candidate = THPUtils_unpackLong(item);
    // Reject the rank if it matches any rank accepted so far.
    for (const int seen : rank_list) {
      THPUtils_assert(candidate != seen, "ranks should be unique");
    }
    rank_list.push_back(candidate);
  }
  {
    THDGroup created;
    {
      AutoNoGIL no_gil;
      created = THDNewGroup(rank_list.data(), count);
    }
    return PyInt_FromLong(created);
  }
invalid_arguments:
  THPUtils_invalidArguments(args, nullptr, "newGroup", 1, "(list[int] ranks)");
  return nullptr;
  END_HANDLE_TH_ERRORS
}
// Python binding reporting whether a request has completed (registered as
// METH_O, so `_req` is the single argument itself).
//
// `_req` must pass THPWrapper_check; otherwise a Python error is set and
// nullptr is returned. Returns a Python bool built from
// THDRequest_isCompleted.
PyObject* THDPModule_requestIsCompleted(PyObject *_unused, PyObject *_req)
{
  HANDLE_TH_ERRORS
  const bool is_wrapper = THPWrapper_check(_req);
  if (!is_wrapper) {
    THPUtils_invalidArguments(_req, nullptr, "requestIsCompleted", 1, "(request req)");
    return nullptr;
  }
  THDRequest* const request = _unpackRequest(_req);
  return PyBool_FromLong(THDRequest_isCompleted(request));
  END_HANDLE_TH_ERRORS
}
// Python binding that waits on a request (registered as METH_O, so `_req` is
// the single argument itself).
//
// `_req` must pass THPWrapper_check; otherwise a Python error is set and
// nullptr is returned. Returns None once THDRequest_wait has returned.
PyObject* THDPModule_requestWait(PyObject *_unused, PyObject *_req)
{
  HANDLE_TH_ERRORS
  if (!THPWrapper_check(_req)) {
    THPUtils_invalidArguments(_req, nullptr, "requestWait", 1, "(request req)");
    return nullptr;
  }
  // Pull the request pointer out of the Python wrapper before entering the
  // AutoNoGIL scope, so no Python object is touched inside it.
  THDRequest* request = _unpackRequest(_req);
  {
    AutoNoGIL guard;
    THDRequest_wait(request);
  }
  Py_RETURN_NONE;
  END_HANDLE_TH_ERRORS
}
// Python binding that initialises the extension's lookup tables.
//
// `args` must be the 3-tuple (bool is_master_worker, reduce_op obj, group obj).
// Registers the SUM / PRODUCT / MIN / MAX attributes of the reduce_op object
// in obj2reduceop and the WORLD attribute of the group object in obj2group.
// Sets a Python error and returns nullptr on a wrong argument count, a
// non-bool first argument, or a missing attribute. Throws std::runtime_error
// when is_master_worker is true. Returns True on success.
PyObject* THDPModule_initExtension(PyObject *_unused, PyObject *args) {
  // This function throws below, so it needs the same HANDLE_TH_ERRORS /
  // END_HANDLE_TH_ERRORS wrapping as every other binding in this file;
  // without it the exception would escape into the interpreter's C frames.
  HANDLE_TH_ERRORS
  if (PyTuple_GET_SIZE(args) != 3) {
    THPUtils_invalidArguments(args, nullptr, "initExtension", 1, "(bool is_master_worker, reduce_op obj, group obj)");
    return nullptr;
  }
  PyObject* is_master_worker_obj = PyTuple_GET_ITEM(args, 0);
  PyObject* reduce_op_obj = PyTuple_GET_ITEM(args, 1);
  PyObject* group_obj = PyTuple_GET_ITEM(args, 2);
  THPUtils_assert(PyBool_Check(is_master_worker_obj), "first argument should be a bool");
  bool is_master_worker = is_master_worker_obj == Py_True;

  // The maps are keyed on the raw attribute pointers; the THPObjectPtr only
  // holds each attribute for the duration of its registration.
  THPObjectPtr reduce_op;
#define REGISTER_REDUCE_OP(NAME)                                       \
  reduce_op = PyObject_GetAttrString(reduce_op_obj, #NAME);            \
  THPUtils_assert(reduce_op, "Missing object for reduce op " #NAME);   \
  obj2reduceop.emplace(reduce_op.get(), THDReduce##NAME);
  REGISTER_REDUCE_OP(SUM);
  REGISTER_REDUCE_OP(PRODUCT);
  REGISTER_REDUCE_OP(MIN);
  REGISTER_REDUCE_OP(MAX);
#undef REGISTER_REDUCE_OP

  THPObjectPtr group;
#define REGISTER_GROUP(NAME)                                           \
  group = PyObject_GetAttrString(group_obj, #NAME);                    \
  THPUtils_assert(group, "Missing object for group " #NAME);           \
  obj2group.emplace(group.get(), THDGroup##NAME);
  REGISTER_GROUP(WORLD);
#undef REGISTER_GROUP

  if (is_master_worker) {
    throw std::runtime_error("THD master_worker no longer supported");
  }
  Py_RETURN_TRUE;
  END_HANDLE_TH_ERRORS
}
// Method table exposing the bindings above to Python. Each entry is
// {python name, C function, calling-convention flag, docstring}; every
// docstring is nullptr. METH_VARARGS functions receive an argument tuple,
// METH_O functions receive their single argument directly, and METH_NOARGS
// functions take no arguments.
static struct PyMethodDef _THDPModule_methods[] = {
{"_dist_init_extension", (PyCFunction)THDPModule_initExtension, METH_VARARGS, nullptr},
{"_dist_init_process_group", (PyCFunction)THDPModule_initProcessGroup, METH_VARARGS, nullptr},
{"_dist_destroy_process_group", (PyCFunction)THDPModule_destroyProcessGroup, METH_NOARGS, nullptr},
{"_dist_clear_group_cache", (PyCFunction)THDPModule_clearGroupCache, METH_VARARGS, nullptr},
#ifdef USE_CUDA
// Only available in CUDA builds, matching the function's own #ifdef.
{"_dist_register_stream", (PyCFunction)THDPModule_registerStream, METH_O, nullptr},
#endif
{"_dist_get_rank", (PyCFunction)THDPModule_getRank, METH_NOARGS, nullptr},
{"_dist_get_num_processes", (PyCFunction)THDPModule_getNumProcesses, METH_NOARGS, nullptr},
{"_dist_isend", (PyCFunction)THDPModule_isend, METH_VARARGS, nullptr},
{"_dist_irecv", (PyCFunction)THDPModule_irecv, METH_VARARGS, nullptr},
{"_dist_send", (PyCFunction)THDPModule_send, METH_VARARGS, nullptr},
{"_dist_recv_any_source", (PyCFunction)THDPModule_recvAnySource, METH_O, nullptr},
{"_dist_recv", (PyCFunction)THDPModule_recv, METH_VARARGS, nullptr},
{"_dist_all_reduce", (PyCFunction)THDPModule_allReduce, METH_VARARGS, nullptr},
{"_dist_all_reduce_multigpu", (PyCFunction)THDPModule_allReduceMultiGPU, METH_VARARGS, nullptr},
{"_dist_reduce", (PyCFunction)THDPModule_reduce, METH_VARARGS, nullptr},
{"_dist_reduce_multigpu", (PyCFunction)THDPModule_reduceMultiGPU, METH_VARARGS, nullptr},
{"_dist_broadcast", (PyCFunction)THDPModule_broadcast, METH_VARARGS, nullptr},
{"_dist_broadcast_multigpu", (PyCFunction)THDPModule_broadcastMultiGPU, METH_VARARGS, nullptr},
{"_dist_all_gather", (PyCFunction)THDPModule_allGather, METH_VARARGS, nullptr},
{"_dist_all_gather_multigpu", (PyCFunction)THDPModule_allGatherMultiGPU, METH_VARARGS, nullptr},
{"_dist_gather_send", (PyCFunction)THDPModule_gatherSend, METH_VARARGS, nullptr},
{"_dist_gather_recv", (PyCFunction)THDPModule_gatherRecv, METH_VARARGS, nullptr},
{"_dist_scatter_send", (PyCFunction)THDPModule_scatterSend, METH_VARARGS, nullptr},
{"_dist_scatter_recv", (PyCFunction)THDPModule_scatterRecv, METH_VARARGS, nullptr},
{"_dist_barrier", (PyCFunction)THDPModule_barrier, METH_O, nullptr},
{"_dist_new_group", (PyCFunction)THDPModule_newGroup, METH_VARARGS, nullptr},
{"_dist_request_is_completed", (PyCFunction)THDPModule_requestIsCompleted, METH_O, nullptr},
{"_dist_request_wait", (PyCFunction)THDPModule_requestWait, METH_O, nullptr},
// Sentinel entry terminating the table.
{nullptr}
};
// Accessor for this file's method table, which is otherwise file-local.
PyMethodDef* THDPModule_methods() {
  PyMethodDef* const table = _THDPModule_methods;
  return table;
}