#include <ATen/core/dispatch/Dispatcher.h>
#include <c10/util/intrusive_ptr.h>
#include <torch/csrc/distributed/c10d/ProcessGroup.hpp>
#include <torch/csrc/distributed/c10d/Types.hpp>
#include <torch/library.h>

#include <chrono>
#include <tuple>
#include <vector>
namespace c10d {
namespace {
TORCH_LIBRARY(c10d, m) {
// The following ProcessGroup, Work, and ReduceOp definitions act more like
// declarations: they don't expose the details of these classes to
// TorchScript.
m.class_<ProcessGroup>("ProcessGroup").def(torch::init<int64_t, int64_t>());
m.class_<Work>("Work")
.def(torch::init<>())
.def("wait", [](const c10::intrusive_ptr<Work>& self) { self->wait(); });
m.class_<ReduceOp>("ReduceOp").def(torch::init<>());
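  // A minimal sketch (illustrative only, not registered here): additional
  // methods could be exposed the same way as Work::wait above, e.g. by
  // chaining another .def onto the ProcessGroup registration:
  //   m.class_<ProcessGroup>("ProcessGroup")
  //       .def(torch::init<int64_t, int64_t>())
  //       .def("rank", [](const c10::intrusive_ptr<ProcessGroup>& self) {
  //         return static_cast<int64_t>(self->getRank());
  //       });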
m.def(
"broadcast_(Tensor[] tensors, __torch__.torch.classes.c10d.ProcessGroup process_group, int root_rank, int root_tensor, int timeout) -> (Tensor[], __torch__.torch.classes.c10d.Work)");
m.def(
"allreduce_(Tensor[] tensors, __torch__.torch.classes.c10d.ProcessGroup process_group, __torch__.torch.classes.c10d.ReduceOp reduce_op, int timeout) -> (Tensor[], __torch__.torch.classes.c10d.Work)");
m.def(
"allreduce_coalesced_(Tensor[] tensors, __torch__.torch.classes.c10d.ProcessGroup process_group, __torch__.torch.classes.c10d.ReduceOp reduce_op, int timeout) -> __torch__.torch.classes.c10d.Work");
m.def(
"allgather_(Tensor[][] output_tensors, Tensor[] input_tensors, __torch__.torch.classes.c10d.ProcessGroup process_group, int timeout) -> (Tensor[][], __torch__.torch.classes.c10d.Work)");
m.def(
"_allgather_base_(Tensor output_tensor, Tensor input_tensor, __torch__.torch.classes.c10d.ProcessGroup process_group) -> (Tensor, __torch__.torch.classes.c10d.Work)");
m.def(
"allgather_coalesced_(Tensor[][] output_lists, Tensor[] input_list, __torch__.torch.classes.c10d.ProcessGroup process_group) -> __torch__.torch.classes.c10d.Work");
m.def(
"reduce_scatter_(Tensor[] output_tensors, Tensor[][] input_tensors, __torch__.torch.classes.c10d.ProcessGroup process_group, __torch__.torch.classes.c10d.ReduceOp reduce_op, int timeout) -> (Tensor[], __torch__.torch.classes.c10d.Work)");
m.def(
"_reduce_scatter_base_(Tensor output_tensor, Tensor input_tensor, __torch__.torch.classes.c10d.ProcessGroup process_group, __torch__.torch.classes.c10d.ReduceOp reduce_op, int timeout) -> (Tensor, __torch__.torch.classes.c10d.Work)");
m.def(
"reduce_(Tensor[] tensors, __torch__.torch.classes.c10d.ProcessGroup process_group, __torch__.torch.classes.c10d.ReduceOp reduce_op, int root_rank, int root_tensor, int timeout) -> __torch__.torch.classes.c10d.Work");
m.def(
"gather_(Tensor[][] output_tensors, Tensor[] input_tensors, __torch__.torch.classes.c10d.ProcessGroup process_group, int root_rank, int timeout) -> __torch__.torch.classes.c10d.Work");
m.def(
"scatter_(Tensor[] output_tensors, Tensor[][] input_tensors, __torch__.torch.classes.c10d.ProcessGroup process_group, int root_rank, int timeout) -> (Tensor[], __torch__.torch.classes.c10d.Work)");
m.def(
"alltoall_(Tensor[] output_tensors, Tensor[] input_tensors, __torch__.torch.classes.c10d.ProcessGroup process_group, int timeout) -> (Tensor[], __torch__.torch.classes.c10d.Work)");
m.def(
"alltoall_base_(Tensor output, Tensor input, __torch__.torch.classes.c10d.ProcessGroup process_group, int[] output_split_sizes, int[] input_split_sizes, int timeout) -> __torch__.torch.classes.c10d.Work");
m.def(
"barrier(Tensor tensor, __torch__.torch.classes.c10d.ProcessGroup process_group, int[] device_ids, int timeout) -> __torch__.torch.classes.c10d.Work");
m.def(
"monitored_barrier_(Tensor tensor, __torch__.torch.classes.c10d.ProcessGroup process_group, int[] device_ids, int timeout, bool wait_all_ranks) -> ()");
m.def(
"send(Tensor[] tensors, __torch__.torch.classes.c10d.ProcessGroup process_group, int dst, int tag) -> __torch__.torch.classes.c10d.Work");
m.def(
"recv_(Tensor[] tensors, __torch__.torch.classes.c10d.ProcessGroup process_group, int src, int tag) -> __torch__.torch.classes.c10d.Work");
m.def(
"recv_any_source_(Tensor[] tensors, __torch__.torch.classes.c10d.ProcessGroup process_group, int tag) -> __torch__.torch.classes.c10d.Work");
}
} // namespace
namespace ops {
// Below are ProcessGroup's corresponding ops for each backend. Ops are
// routed through the dispatcher to the appropriate backend, which each op
// obtains from the process group via getBackend() for its device type.
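// A hedged sketch of how a caller reaches these kernels through the
// dispatcher (the actual call sites live elsewhere; the schema name and
// signature below follow the "c10d::send" registration above):
//   static auto op =
//       c10::Dispatcher::singleton()
//           .findSchemaOrThrow("c10d::send", "")
//           .typed<c10::intrusive_ptr<Work>(
//               at::TensorList,
//               const c10::intrusive_ptr<ProcessGroup>&,
//               int64_t,
//               int64_t)>();
//   auto work = op.call(tensors, process_group, dstRank, tag);
// The dispatch key is computed from the tensor arguments, so CPU tensors
// select send_cpu, CUDA tensors select send_cuda, and so on.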
c10::intrusive_ptr<Work> send_cpu(
at::TensorList tensors,
const c10::intrusive_ptr<ProcessGroup>& process_group,
int64_t dstRank,
int64_t tag) {
auto tensor_vec = tensors.vec();
return process_group->getBackend(c10::DeviceType::CPU)
->send(tensor_vec, static_cast<int>(dstRank), static_cast<int>(tag));
}
c10::intrusive_ptr<Work> send_cuda(
at::TensorList tensors,
const c10::intrusive_ptr<ProcessGroup>& process_group,
int64_t dstRank,
int64_t tag) {
auto tensor_vec = tensors.vec();
return process_group->getBackend(c10::DeviceType::CUDA)
->send(tensor_vec, static_cast<int>(dstRank), static_cast<int>(tag));
}
c10::intrusive_ptr<c10d::Work> send_privateuse1(
at::TensorList tensors,
const c10::intrusive_ptr<c10d::ProcessGroup>& process_group,
int64_t dstRank,
int64_t tag) {
auto tensor_vec = tensors.vec();
return process_group->getBackend(c10::DeviceType::PrivateUse1)
->send(tensor_vec, static_cast<int>(dstRank), static_cast<int>(tag));
}
c10::intrusive_ptr<Work> recv_cpu_(
at::TensorList tensors,
const c10::intrusive_ptr<ProcessGroup>& process_group,
int64_t srcRank,
int64_t tag) {
auto tensor_vec = tensors.vec();
return process_group->getBackend(c10::DeviceType::CPU)
->recv(tensor_vec, static_cast<int>(srcRank), static_cast<int>(tag));
}
c10::intrusive_ptr<Work> recv_cuda_(
at::TensorList tensors,
const c10::intrusive_ptr<ProcessGroup>& process_group,
int64_t srcRank,
int64_t tag) {
auto tensor_vec = tensors.vec();
return process_group->getBackend(c10::DeviceType::CUDA)
->recv(tensor_vec, static_cast<int>(srcRank), static_cast<int>(tag));
}
c10::intrusive_ptr<c10d::Work> recv_privateuse1_(
at::TensorList tensors,
const c10::intrusive_ptr<c10d::ProcessGroup>& process_group,
int64_t srcRank,
int64_t tag) {
auto tensor_vec = tensors.vec();
return process_group->getBackend(c10::DeviceType::PrivateUse1)
->recv(tensor_vec, static_cast<int>(srcRank), static_cast<int>(tag));
}
c10::intrusive_ptr<Work> recv_any_source_cpu_(
at::TensorList tensors,
const c10::intrusive_ptr<ProcessGroup>& process_group,
int64_t tag) {
auto tensor_vec = tensors.vec();
return process_group->getBackend(c10::DeviceType::CPU)
->recvAnysource(tensor_vec, static_cast<int>(tag));
}
c10::intrusive_ptr<Work> recv_any_source_cuda_(
at::TensorList tensors,
const c10::intrusive_ptr<ProcessGroup>& process_group,
int64_t tag) {
auto tensor_vec = tensors.vec();
return process_group->getBackend(c10::DeviceType::CUDA)
->recvAnysource(tensor_vec, static_cast<int>(tag));
}
c10::intrusive_ptr<c10d::Work> recv_any_source_privateuse1_(
at::TensorList tensors,
const c10::intrusive_ptr<c10d::ProcessGroup>& process_group,
int64_t tag) {
auto tensor_vec = tensors.vec();
return process_group->getBackend(c10::DeviceType::PrivateUse1)
->recvAnysource(tensor_vec, static_cast<int>(tag));
}
c10::intrusive_ptr<Work> reduce_cpu_(
at::TensorList tensors,
const c10::intrusive_ptr<ProcessGroup>& process_group,
const c10::intrusive_ptr<ReduceOp>& reduce_op,
int64_t root_rank,
int64_t root_tensor,
int64_t timeout) {
auto tensor_vec = tensors.vec();
return process_group->getBackend(c10::DeviceType::CPU)
->reduce(
tensor_vec,
ReduceOptions{
*reduce_op.get(),
root_rank,
root_tensor,
std::chrono::milliseconds(timeout)});
}
c10::intrusive_ptr<Work> reduce_cuda_(
at::TensorList tensors,
const c10::intrusive_ptr<ProcessGroup>& process_group,
const c10::intrusive_ptr<ReduceOp>& reduce_op,
int64_t root_rank,
int64_t root_tensor,
int64_t timeout) {
auto tensor_vec = tensors.vec();
return process_group->getBackend(c10::DeviceType::CUDA)
->reduce(
tensor_vec,
ReduceOptions{
*reduce_op.get(),
root_rank,
root_tensor,
std::chrono::milliseconds(timeout)});
}
c10::intrusive_ptr<c10d::Work> reduce_privateuse1_(
at::TensorList tensors,
const c10::intrusive_ptr<c10d::ProcessGroup>& process_group,
const c10::intrusive_ptr<c10d::ReduceOp>& reduce_op,
int64_t root_rank,
int64_t root_tensor,
int64_t timeout) {
auto tensor_vec = tensors.vec();
return process_group->getBackend(c10::DeviceType::PrivateUse1)
->reduce(
tensor_vec,
c10d::ReduceOptions{
*reduce_op.get(),
root_rank,
root_tensor,
std::chrono::milliseconds(timeout)});
}
std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>> broadcast_cpu_(
at::TensorList tensors,
const c10::intrusive_ptr<ProcessGroup>& process_group,
int64_t root_rank,
int64_t root_tensor,
int64_t timeout) {
auto tensor_vec = tensors.vec();
auto work =
process_group->getBackend(c10::DeviceType::CPU)
->broadcast(
tensor_vec,
BroadcastOptions{
root_rank, root_tensor, std::chrono::milliseconds(timeout)});
return std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>>(
std::move(tensor_vec), work);
}
std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>> broadcast_cuda_(
at::TensorList tensors,
const c10::intrusive_ptr<ProcessGroup>& process_group,
int64_t root_rank,
int64_t root_tensor,
int64_t timeout) {
auto tensor_vec = tensors.vec();
auto work =
process_group->getBackend(c10::DeviceType::CUDA)
->broadcast(
tensor_vec,
BroadcastOptions{
root_rank, root_tensor, std::chrono::milliseconds(timeout)});
return std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>>(
std::move(tensor_vec), work);
}
std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<c10d::Work>>
broadcast_privateuse1_(
at::TensorList tensors,
const c10::intrusive_ptr<c10d::ProcessGroup>& process_group,
int64_t root_rank,
int64_t root_tensor,
int64_t timeout) {
auto tensor_vec = tensors.vec();
auto work =
process_group->getBackend(c10::DeviceType::PrivateUse1)
->broadcast(
tensor_vec,
c10d::BroadcastOptions{
root_rank, root_tensor, std::chrono::milliseconds(timeout)});
return std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<c10d::Work>>(
std::move(tensor_vec), work);
}
std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>> allreduce_cpu_(
at::TensorList tensors,
const c10::intrusive_ptr<ProcessGroup>& process_group,
const c10::intrusive_ptr<ReduceOp>& reduce_op,
int64_t timeout) {
auto tensor_vec = tensors.vec();
auto work =
process_group->getBackend(c10::DeviceType::CPU)
->allreduce(
tensor_vec,
AllreduceOptions{
*reduce_op.get(), std::chrono::milliseconds(timeout)});
// Return input tensors as output tensors to make inplace allreduce look like
// a functional API, so that make_fx can correctly build the dependencies in
// the graph later.
return std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>>(
std::move(tensor_vec), work);
}
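// Illustrative use of the functional-looking return value (assuming the
// caller holds `tensors`, `pg`, `op`, and a timeout in milliseconds):
//   auto [out, work] = allreduce_cpu_(tensors, pg, op, timeout);
//   work->wait();
//   // `out` aliases the input tensors; the allreduce still runs in place.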
std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>> allreduce_cuda_(
at::TensorList tensors,
const c10::intrusive_ptr<ProcessGroup>& process_group,
const c10::intrusive_ptr<ReduceOp>& reduce_op,
int64_t timeout) {
auto tensor_vec = tensors.vec();
auto work =
process_group->getBackend(c10::DeviceType::CUDA)
->allreduce(
tensor_vec,
AllreduceOptions{
*reduce_op.get(), std::chrono::milliseconds(timeout)});
// Return input tensors as output tensors to make inplace allreduce look like
// a functional API, so that make_fx can correctly build the dependencies in
// the graph later.
return std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>>(
std::move(tensor_vec), work);
}
std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<c10d::Work>>
allreduce_privateuse1_(
at::TensorList tensors,
const c10::intrusive_ptr<c10d::ProcessGroup>& process_group,
const c10::intrusive_ptr<c10d::ReduceOp>& reduce_op,
int64_t timeout) {
auto tensor_vec = tensors.vec();
auto work =
process_group->getBackend(c10::DeviceType::PrivateUse1)
->allreduce(
tensor_vec,
c10d::AllreduceOptions{
*reduce_op.get(), std::chrono::milliseconds(timeout)});
// Return input tensors as output tensors to make inplace allreduce look like
// a functional API, so that make_fx can correctly build the dependencies in
// the graph later.
return std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<c10d::Work>>(
std::move(tensor_vec), work);
}
c10::intrusive_ptr<Work> allreduce_coalesced_cpu_(
at::TensorList tensors,
const c10::intrusive_ptr<ProcessGroup>& process_group,
const c10::intrusive_ptr<ReduceOp>& reduce_op,
int64_t timeout) {
auto tensor_vec = tensors.vec();
AllreduceCoalescedOptions opts = AllreduceCoalescedOptions{};
opts.reduceOp = *reduce_op.get();
opts.timeout = std::chrono::milliseconds(timeout);
return process_group->getBackend(c10::DeviceType::CPU)
->allreduce_coalesced(tensor_vec, opts);
}
c10::intrusive_ptr<Work> allreduce_coalesced_cuda_(
at::TensorList tensors,
const c10::intrusive_ptr<ProcessGroup>& process_group,
const c10::intrusive_ptr<ReduceOp>& reduce_op,
int64_t timeout) {
auto tensor_vec = tensors.vec();
AllreduceCoalescedOptions opts = AllreduceCoalescedOptions{};
opts.reduceOp = *reduce_op.get();
opts.timeout = std::chrono::milliseconds(timeout);
return process_group->getBackend(c10::DeviceType::CUDA)
->allreduce_coalesced(tensor_vec, opts);
}
c10::intrusive_ptr<c10d::Work> allreduce_coalesced_privateuse1_(
at::TensorList tensors,
const c10::intrusive_ptr<c10d::ProcessGroup>& process_group,
const c10::intrusive_ptr<c10d::ReduceOp>& reduce_op,
int64_t timeout) {
auto tensor_vec = tensors.vec();
c10d::AllreduceCoalescedOptions opts = c10d::AllreduceCoalescedOptions{};
opts.reduceOp = *reduce_op.get();
opts.timeout = std::chrono::milliseconds(timeout);
return process_group->getBackend(c10::DeviceType::PrivateUse1)
->allreduce_coalesced(tensor_vec, opts);
}
std::tuple<std::vector<std::vector<at::Tensor>>, c10::intrusive_ptr<Work>>
allgather_cpu_(
const std::vector<std::vector<at::Tensor>>& output_tensors,
at::TensorList input_tensors,
const c10::intrusive_ptr<ProcessGroup>& process_group,
int64_t timeout) {
auto input_tensors_vec = input_tensors.vec();
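  // output_tensors arrives as const to match this wrapper's signature, but
  // the backend fills it in place, hence the const_cast below. The same
  // pattern recurs in the other list-output ops in this file.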
auto work =
process_group->getBackend(c10::DeviceType::CPU)
->allgather(
const_cast<std::vector<std::vector<at::Tensor>>&>(output_tensors),
input_tensors_vec,
AllgatherOptions{std::chrono::milliseconds(timeout)});
// Copy output tensors (not storage) so that this can be used in a functional
// manner
return std::
tuple<std::vector<std::vector<at::Tensor>>, c10::intrusive_ptr<Work>>(
output_tensors, work);
}
std::tuple<std::vector<std::vector<at::Tensor>>, c10::intrusive_ptr<Work>>
allgather_cuda_(
const std::vector<std::vector<at::Tensor>>& output_tensors,
at::TensorList input_tensors,
const c10::intrusive_ptr<ProcessGroup>& process_group,
int64_t timeout) {
auto input_tensors_vec = input_tensors.vec();
auto work =
process_group->getBackend(c10::DeviceType::CUDA)
->allgather(
const_cast<std::vector<std::vector<at::Tensor>>&>(output_tensors),
input_tensors_vec,
AllgatherOptions{std::chrono::milliseconds(timeout)});
// Copy output tensors (not storage) so that this can be used in a functional
// manner
return std::
tuple<std::vector<std::vector<at::Tensor>>, c10::intrusive_ptr<Work>>(
output_tensors, work);
}
std::tuple<std::vector<std::vector<at::Tensor>>, c10::intrusive_ptr<c10d::Work>>
allgather_privateuse1_(
const std::vector<std::vector<at::Tensor>>& output_tensors,
at::TensorList input_tensors,
const c10::intrusive_ptr<c10d::ProcessGroup>& process_group,
int64_t timeout) {
auto input_tensors_vec = input_tensors.vec();
auto work =
process_group->getBackend(c10::DeviceType::PrivateUse1)
->allgather(
const_cast<std::vector<std::vector<at::Tensor>>&>(output_tensors),
input_tensors_vec,
c10d::AllgatherOptions{std::chrono::milliseconds(timeout)});
// Copy output tensors (not storage) so that this can be used in a functional
// manner
return std::tuple<
std::vector<std::vector<at::Tensor>>,
c10::intrusive_ptr<c10d::Work>>(output_tensors, work);
}
std::tuple<at::Tensor, c10::intrusive_ptr<Work>> _allgather_base_cpu_(
at::Tensor& output_tensor,
at::Tensor& input_tensor,
const c10::intrusive_ptr<ProcessGroup>& process_group) {
auto work = process_group->getBackend(c10::DeviceType::CPU)
->_allgather_base(output_tensor, input_tensor);
return std::tuple<at::Tensor, c10::intrusive_ptr<Work>>(output_tensor, work);
}
std::tuple<at::Tensor, c10::intrusive_ptr<Work>> _allgather_base_cuda_(
at::Tensor& output_tensor,
at::Tensor& input_tensor,
const c10::intrusive_ptr<ProcessGroup>& process_group) {
auto work = process_group->getBackend(c10::DeviceType::CUDA)
->_allgather_base(output_tensor, input_tensor);
return std::tuple<at::Tensor, c10::intrusive_ptr<Work>>(output_tensor, work);
}
std::tuple<at::Tensor, c10::intrusive_ptr<c10d::Work>>
_allgather_base_privateuse1_(
at::Tensor& output_tensor,
at::Tensor& input_tensor,
const c10::intrusive_ptr<c10d::ProcessGroup>& process_group) {
auto work = process_group->getBackend(c10::DeviceType::PrivateUse1)
->_allgather_base(output_tensor, input_tensor);
return std::tuple<at::Tensor, c10::intrusive_ptr<c10d::Work>>(
output_tensor, work);
}
c10::intrusive_ptr<Work> allgather_coalesced_cpu_(
const std::vector<std::vector<at::Tensor>>& output_lists,
const at::TensorList& input_list,
const c10::intrusive_ptr<ProcessGroup>& process_group) {
auto input_list_vec = input_list.vec();
return process_group->getBackend(c10::DeviceType::CPU)
->allgather_coalesced(
const_cast<std::vector<std::vector<at::Tensor>>&>(output_lists),
input_list_vec);
}
c10::intrusive_ptr<Work> allgather_coalesced_cuda_(
const std::vector<std::vector<at::Tensor>>& output_lists,
const at::TensorList& input_list,
const c10::intrusive_ptr<ProcessGroup>& process_group) {
auto input_list_vec = input_list.vec();
return process_group->getBackend(c10::DeviceType::CUDA)
->allgather_coalesced(
const_cast<std::vector<std::vector<at::Tensor>>&>(output_lists),
input_list_vec);
}
c10::intrusive_ptr<c10d::Work> allgather_coalesced_privateuse1_(
const std::vector<std::vector<at::Tensor>>& output_lists,
const at::TensorList& input_list,
const c10::intrusive_ptr<c10d::ProcessGroup>& process_group) {
auto input_list_vec = input_list.vec();
return process_group->getBackend(c10::DeviceType::PrivateUse1)
->allgather_coalesced(
const_cast<std::vector<std::vector<at::Tensor>>&>(output_lists),
input_list_vec);
}
std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>>
reduce_scatter_cpu_(
const at::TensorList& output_tensors,
const std::vector<std::vector<at::Tensor>>& input_tensors,
const c10::intrusive_ptr<ProcessGroup>& process_group,
const c10::intrusive_ptr<ReduceOp>& reduce_op,
int64_t timeout) {
auto output_tensors_vec = output_tensors.vec();
auto work =
process_group->getBackend(c10::DeviceType::CPU)
->reduce_scatter(
output_tensors_vec,
const_cast<std::vector<std::vector<at::Tensor>>&>(input_tensors),
ReduceScatterOptions{
*reduce_op.get(), std::chrono::milliseconds(timeout)});
return std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>>(
output_tensors_vec, work);
}
std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>>
reduce_scatter_cuda_(
const at::TensorList& output_tensors,
const std::vector<std::vector<at::Tensor>>& input_tensors,
const c10::intrusive_ptr<ProcessGroup>& process_group,
const c10::intrusive_ptr<ReduceOp>& reduce_op,
int64_t timeout) {
auto output_tensors_vec = output_tensors.vec();
auto work =
process_group->getBackend(c10::DeviceType::CUDA)
->reduce_scatter(
output_tensors_vec,
const_cast<std::vector<std::vector<at::Tensor>>&>(input_tensors),
ReduceScatterOptions{
*reduce_op.get(), std::chrono::milliseconds(timeout)});
return std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>>(
output_tensors_vec, work);
}
std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<c10d::Work>>
reduce_scatter_privateuse1_(
const at::TensorList& output_tensors,
const std::vector<std::vector<at::Tensor>>& input_tensors,
const c10::intrusive_ptr<c10d::ProcessGroup>& process_group,
const c10::intrusive_ptr<c10d::ReduceOp>& reduce_op,
int64_t timeout) {
auto output_tensors_vec = output_tensors.vec();
auto work =
process_group->getBackend(c10::DeviceType::PrivateUse1)
->reduce_scatter(
output_tensors_vec,
const_cast<std::vector<std::vector<at::Tensor>>&>(input_tensors),
c10d::ReduceScatterOptions{
*reduce_op.get(), std::chrono::milliseconds(timeout)});
return std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<c10d::Work>>(
output_tensors_vec, work);
}
std::tuple<at::Tensor, c10::intrusive_ptr<Work>> _reduce_scatter_base_cpu_(
at::Tensor& output_tensor,
at::Tensor& input_tensor,
const c10::intrusive_ptr<ProcessGroup>& process_group,
const c10::intrusive_ptr<ReduceOp>& reduce_op,
int64_t timeout) {
auto work =
process_group->getBackend(c10::DeviceType::CPU)
->_reduce_scatter_base(
output_tensor,
input_tensor,
ReduceScatterOptions{
*reduce_op.get(), std::chrono::milliseconds(timeout)});
return std::tuple<at::Tensor, c10::intrusive_ptr<Work>>(output_tensor, work);
}
std::tuple<at::Tensor, c10::intrusive_ptr<Work>> _reduce_scatter_base_cuda_(
at::Tensor& output_tensor,
at::Tensor& input_tensor,
const c10::intrusive_ptr<ProcessGroup>& process_group,
const c10::intrusive_ptr<ReduceOp>& reduce_op,
int64_t timeout) {
auto work =
process_group->getBackend(c10::DeviceType::CUDA)
->_reduce_scatter_base(
output_tensor,
input_tensor,
ReduceScatterOptions{
*reduce_op.get(), std::chrono::milliseconds(timeout)});
return std::tuple<at::Tensor, c10::intrusive_ptr<Work>>(output_tensor, work);
}
std::tuple<at::Tensor, c10::intrusive_ptr<c10d::Work>>
_reduce_scatter_base_privateuse1_(
at::Tensor& output_tensor,
at::Tensor& input_tensor,
const c10::intrusive_ptr<c10d::ProcessGroup>& process_group,
const c10::intrusive_ptr<c10d::ReduceOp>& reduce_op,
int64_t timeout) {
auto work =
process_group->getBackend(c10::DeviceType::PrivateUse1)
->_reduce_scatter_base(
output_tensor,
input_tensor,
c10d::ReduceScatterOptions{
*reduce_op.get(), std::chrono::milliseconds(timeout)});
return std::tuple<at::Tensor, c10::intrusive_ptr<c10d::Work>>(
output_tensor, work);
}
c10::intrusive_ptr<Work> gather_cpu_(
const std::vector<std::vector<at::Tensor>>& output_tensors,
const at::TensorList& input_tensors,
const c10::intrusive_ptr<ProcessGroup>& process_group,
int64_t root_rank,
int64_t timeout) {
auto input_tensors_vec = input_tensors.vec();
return process_group->getBackend(c10::DeviceType::CPU)
->gather(
const_cast<std::vector<std::vector<at::Tensor>>&>(output_tensors),
input_tensors_vec,
GatherOptions{root_rank, std::chrono::milliseconds(timeout)});
}
c10::intrusive_ptr<Work> gather_cuda_(
const std::vector<std::vector<at::Tensor>>& output_tensors,
const at::TensorList& input_tensors,
const c10::intrusive_ptr<ProcessGroup>& process_group,
int64_t root_rank,
int64_t timeout) {
auto input_tensors_vec = input_tensors.vec();
return process_group->getBackend(c10::DeviceType::CUDA)
->gather(
const_cast<std::vector<std::vector<at::Tensor>>&>(output_tensors),
input_tensors_vec,
GatherOptions{root_rank, std::chrono::milliseconds(timeout)});
}
c10::intrusive_ptr<c10d::Work> gather_privateuse1_(
const std::vector<std::vector<at::Tensor>>& output_tensors,
const at::TensorList& input_tensors,
const c10::intrusive_ptr<c10d::ProcessGroup>& process_group,
int64_t root_rank,
int64_t timeout) {
auto input_tensors_vec = input_tensors.vec();
return process_group->getBackend(c10::DeviceType::PrivateUse1)
->gather(
const_cast<std::vector<std::vector<at::Tensor>>&>(output_tensors),
input_tensors_vec,
c10d::GatherOptions{root_rank, std::chrono::milliseconds(timeout)});
}
std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>> scatter_cpu_(
const at::TensorList& output_tensors,
const std::vector<std::vector<at::Tensor>>& input_tensors,
const c10::intrusive_ptr<ProcessGroup>& process_group,
int64_t root_rank,
int64_t timeout) {
auto output_tensors_vec = output_tensors.vec();
auto work =
process_group->getBackend(c10::DeviceType::CPU)
->scatter(
output_tensors_vec,
const_cast<std::vector<std::vector<at::Tensor>>&>(input_tensors),
ScatterOptions{root_rank, std::chrono::milliseconds(timeout)});
return std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>>(
std::move(output_tensors_vec), work);
}
std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>> scatter_cuda_(
const at::TensorList& output_tensors,
const std::vector<std::vector<at::Tensor>>& input_tensors,
const c10::intrusive_ptr<ProcessGroup>& process_group,
int64_t root_rank,
int64_t timeout) {
auto output_tensors_vec = output_tensors.vec();
auto work =
process_group->getBackend(c10::DeviceType::CUDA)
->scatter(
output_tensors_vec,
const_cast<std::vector<std::vector<at::Tensor>>&>(input_tensors),
ScatterOptions{root_rank, std::chrono::milliseconds(timeout)});
return std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>>(
std::move(output_tensors_vec), work);
}
std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<c10d::Work>>
scatter_privateuse1_(
const at::TensorList& output_tensors,
const std::vector<std::vector<at::Tensor>>& input_tensors,
const c10::intrusive_ptr<c10d::ProcessGroup>& process_group,
int64_t root_rank,
int64_t timeout) {
auto output_tensors_vec = output_tensors.vec();
auto work =
process_group->getBackend(c10::DeviceType::PrivateUse1)
->scatter(
output_tensors_vec,
const_cast<std::vector<std::vector<at::Tensor>>&>(input_tensors),
c10d::ScatterOptions{
root_rank, std::chrono::milliseconds(timeout)});
return std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<c10d::Work>>(
std::move(output_tensors_vec), work);
}
std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>> alltoall_cpu_(
const at::TensorList& output_tensors,
const at::TensorList& input_tensors,
const c10::intrusive_ptr<ProcessGroup>& process_group,
int64_t timeout) {
auto output_tensors_vec = output_tensors.vec();
auto input_tensors_vec = input_tensors.vec();
auto work = process_group->getBackend(c10::DeviceType::CPU)
->alltoall(
output_tensors_vec,
input_tensors_vec,
AllToAllOptions{std::chrono::milliseconds(timeout)});
return std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>>(
std::move(output_tensors_vec), work);
}
std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>> alltoall_cuda_(
const at::TensorList& output_tensors,
const at::TensorList& input_tensors,
const c10::intrusive_ptr<ProcessGroup>& process_group,
int64_t timeout) {
auto output_tensors_vec = output_tensors.vec();
auto input_tensors_vec = input_tensors.vec();
auto work = process_group->getBackend(c10::DeviceType::CUDA)
->alltoall(
output_tensors_vec,
input_tensors_vec,
AllToAllOptions{std::chrono::milliseconds(timeout)});
return std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>>(
std::move(output_tensors_vec), work);
}
std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<c10d::Work>>
alltoall_privateuse1_(
const at::TensorList& output_tensors,
const at::TensorList& input_tensors,
const c10::intrusive_ptr<c10d::ProcessGroup>& process_group,
int64_t timeout) {
auto output_tensors_vec = output_tensors.vec();
auto input_tensors_vec = input_tensors.vec();
auto work =
process_group->getBackend(c10::DeviceType::PrivateUse1)
->alltoall(
output_tensors_vec,
input_tensors_vec,
c10d::AllToAllOptions{std::chrono::milliseconds(timeout)});
return std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<c10d::Work>>(
std::move(output_tensors_vec), work);
}
c10::intrusive_ptr<Work> alltoall_base_cpu_(
at::Tensor& output,
at::Tensor& input,
const c10::intrusive_ptr<ProcessGroup>& process_group,
std::vector<int64_t> output_split_sizes,
std::vector<int64_t> input_split_sizes,
int64_t timeout) {
return process_group->getBackend(c10::DeviceType::CPU)
->alltoall_base(
output,
input,
output_split_sizes,
input_split_sizes,
AllToAllOptions{std::chrono::milliseconds(timeout)});
}
c10::intrusive_ptr<Work> alltoall_base_cuda_(
at::Tensor& output,
at::Tensor& input,
const c10::intrusive_ptr<ProcessGroup>& process_group,
std::vector<int64_t> output_split_sizes,
std::vector<int64_t> input_split_sizes,
int64_t timeout) {
return process_group->getBackend(c10::DeviceType::CUDA)
->alltoall_base(
output,
input,
output_split_sizes,
input_split_sizes,
AllToAllOptions{std::chrono::milliseconds(timeout)});
}
c10::intrusive_ptr<c10d::Work> alltoall_base_privateuse1_(
at::Tensor& output,
at::Tensor& input,
const c10::intrusive_ptr<c10d::ProcessGroup>& process_group,
std::vector<int64_t> output_split_sizes,
std::vector<int64_t> input_split_sizes,
int64_t timeout) {
return process_group->getBackend(c10::DeviceType::PrivateUse1)
->alltoall_base(
output,
input,
output_split_sizes,
input_split_sizes,
c10d::AllToAllOptions{std::chrono::milliseconds(timeout)});
}
c10::intrusive_ptr<Work> barrier_cpu(
at::Tensor /* unused */,
const c10::intrusive_ptr<ProcessGroup>& process_group,
const std::vector<int64_t>& device_ids,
int64_t timeout) {
return process_group->getBackend(c10::DeviceType::CPU)
->barrier(BarrierOptions{device_ids, std::chrono::milliseconds(timeout)});
}
c10::intrusive_ptr<Work> barrier_cuda(
at::Tensor /* unused */,
const c10::intrusive_ptr<ProcessGroup>& process_group,
const std::vector<int64_t>& device_ids,
int64_t timeout) {
return process_group->getBackend(c10::DeviceType::CUDA)
->barrier(BarrierOptions{device_ids, std::chrono::milliseconds(timeout)});
}
c10::intrusive_ptr<c10d::Work> barrier_privateuse1(
at::Tensor /* unused */,
const c10::intrusive_ptr<c10d::ProcessGroup>& process_group,
const std::vector<int64_t>& device_ids,
int64_t timeout) {
return process_group->getBackend(c10::DeviceType::PrivateUse1)
->barrier(
c10d::BarrierOptions{device_ids, std::chrono::milliseconds(timeout)});
}
void monitored_barrier_cpu_(
at::Tensor /* unused */,
const c10::intrusive_ptr<::c10d::ProcessGroup>& process_group,
const std::vector<int64_t>& device_ids,
int64_t timeout,
bool wait_all_ranks) {
process_group->getBackend(c10::DeviceType::CPU)
->monitoredBarrier(
BarrierOptions{device_ids, std::chrono::milliseconds(timeout)},
wait_all_ranks);
}
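// Note: monitoredBarrier is only registered for CPU below; this file has no
// CUDA or PrivateUse1 counterpart for it.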
// Register the ops above with the dispatcher, one backend dispatch key at a
// time.
namespace {
TORCH_LIBRARY_IMPL(c10d, CPU, m) {
m.impl("send", send_cpu);
}
TORCH_LIBRARY_IMPL(c10d, CUDA, m) {
m.impl("send", send_cuda);
}
TORCH_LIBRARY_IMPL(c10d, PrivateUse1, m) {
m.impl("send", send_privateuse1);
}
TORCH_LIBRARY_IMPL(c10d, CPU, m) {
m.impl("recv_", recv_cpu_);
}
TORCH_LIBRARY_IMPL(c10d, CUDA, m) {
m.impl("recv_", recv_cuda_);
}
TORCH_LIBRARY_IMPL(c10d, PrivateUse1, m) {
m.impl("recv_", recv_privateuse1_);
}
TORCH_LIBRARY_IMPL(c10d, CPU, m) {
m.impl("recv_any_source_", recv_any_source_cpu_);
}
TORCH_LIBRARY_IMPL(c10d, CUDA, m) {
m.impl("recv_any_source_", recv_any_source_cuda_);
}
TORCH_LIBRARY_IMPL(c10d, PrivateUse1, m) {
m.impl("recv_any_source_", recv_any_source_privateuse1_);
}
TORCH_LIBRARY_IMPL(c10d, CPU, m) {
m.impl("reduce_", reduce_cpu_);
}
TORCH_LIBRARY_IMPL(c10d, CUDA, m) {
m.impl("reduce_", reduce_cuda_);
}
TORCH_LIBRARY_IMPL(c10d, PrivateUse1, m) {
m.impl("reduce_", reduce_privateuse1_);
}
TORCH_LIBRARY_IMPL(c10d, CPU, m) {
m.impl("broadcast_", broadcast_cpu_);
}
TORCH_LIBRARY_IMPL(c10d, CUDA, m) {
m.impl("broadcast_", broadcast_cuda_);
}
TORCH_LIBRARY_IMPL(c10d, PrivateUse1, m) {
m.impl("broadcast_", broadcast_privateuse1_);
}
TORCH_LIBRARY_IMPL(c10d, CPU, m) {
m.impl("allreduce_", allreduce_cpu_);
}
// TODO: The SparseCPU/SparseCUDA dispatched methods are only used to support
// sparse all_reduce in the Gloo backend
TORCH_LIBRARY_IMPL(c10d, SparseCPU, m) {
m.impl("allreduce_", allreduce_cpu_);
}
TORCH_LIBRARY_IMPL(c10d, SparseCUDA, m) {
m.impl("allreduce_", allreduce_cuda_);
}
TORCH_LIBRARY_IMPL(c10d, CUDA, m) {
m.impl("allreduce_", allreduce_cuda_);
}
TORCH_LIBRARY_IMPL(c10d, PrivateUse1, m) {
m.impl("allreduce_", allreduce_privateuse1_);
}
TORCH_LIBRARY_IMPL(c10d, CPU, m) {
m.impl("allreduce_coalesced_", allreduce_coalesced_cpu_);
}
TORCH_LIBRARY_IMPL(c10d, CUDA, m) {
m.impl("allreduce_coalesced_", allreduce_coalesced_cuda_);
}
TORCH_LIBRARY_IMPL(c10d, PrivateUse1, m) {
m.impl("allreduce_coalesced_", allreduce_coalesced_privateuse1_);
}
TORCH_LIBRARY_IMPL(c10d, CPU, m) {
m.impl("allgather_", allgather_cpu_);
}
TORCH_LIBRARY_IMPL(c10d, CUDA, m) {
m.impl("allgather_", allgather_cuda_);
}
TORCH_LIBRARY_IMPL(c10d, PrivateUse1, m) {
m.impl("allgather_", allgather_privateuse1_);
}
TORCH_LIBRARY_IMPL(c10d, CPU, m) {
m.impl("_allgather_base_", _allgather_base_cpu_);
}
TORCH_LIBRARY_IMPL(c10d, CUDA, m) {
m.impl("_allgather_base_", _allgather_base_cuda_);
}
TORCH_LIBRARY_IMPL(c10d, PrivateUse1, m) {
m.impl("_allgather_base_", _allgather_base_privateuse1_);
}
TORCH_LIBRARY_IMPL(c10d, CPU, m) {
m.impl("allgather_coalesced_", allgather_coalesced_cpu_);
}
TORCH_LIBRARY_IMPL(c10d, CUDA, m) {
m.impl("allgather_coalesced_", allgather_coalesced_cuda_);
}
TORCH_LIBRARY_IMPL(c10d, PrivateUse1, m) {
m.impl("allgather_coalesced_", allgather_coalesced_privateuse1_);
}
TORCH_LIBRARY_IMPL(c10d, CPU, m) {
m.impl("reduce_scatter_", reduce_scatter_cpu_);
}
TORCH_LIBRARY_IMPL(c10d, CUDA, m) {
m.impl("reduce_scatter_", reduce_scatter_cuda_);
}
TORCH_LIBRARY_IMPL(c10d, PrivateUse1, m) {
m.impl("reduce_scatter_", reduce_scatter_privateuse1_);
}
TORCH_LIBRARY_IMPL(c10d, CPU, m) {
m.impl("_reduce_scatter_base_", _reduce_scatter_base_cpu_);
}
TORCH_LIBRARY_IMPL(c10d, CUDA, m) {
m.impl("_reduce_scatter_base_", _reduce_scatter_base_cuda_);
}
TORCH_LIBRARY_IMPL(c10d, PrivateUse1, m) {
m.impl("_reduce_scatter_base_", _reduce_scatter_base_privateuse1_);
}
TORCH_LIBRARY_IMPL(c10d, CPU, m) {
m.impl("gather_", gather_cpu_);
}
TORCH_LIBRARY_IMPL(c10d, CUDA, m) {
m.impl("gather_", gather_cuda_);
}
TORCH_LIBRARY_IMPL(c10d, PrivateUse1, m) {
m.impl("gather_", gather_privateuse1_);
}
TORCH_LIBRARY_IMPL(c10d, CPU, m) {
m.impl("scatter_", scatter_cpu_);
}
TORCH_LIBRARY_IMPL(c10d, CUDA, m) {
m.impl("scatter_", scatter_cuda_);
}
TORCH_LIBRARY_IMPL(c10d, PrivateUse1, m) {
m.impl("scatter_", scatter_privateuse1_);
}
TORCH_LIBRARY_IMPL(c10d, CPU, m) {
m.impl("alltoall_", alltoall_cpu_);
}
TORCH_LIBRARY_IMPL(c10d, CUDA, m) {
m.impl("alltoall_", alltoall_cuda_);
}
TORCH_LIBRARY_IMPL(c10d, PrivateUse1, m) {
m.impl("alltoall_", alltoall_privateuse1_);
}
TORCH_LIBRARY_IMPL(c10d, CPU, m) {
m.impl("alltoall_base_", alltoall_base_cpu_);
}
TORCH_LIBRARY_IMPL(c10d, CUDA, m) {
m.impl("alltoall_base_", alltoall_base_cuda_);
}
TORCH_LIBRARY_IMPL(c10d, PrivateUse1, m) {
m.impl("alltoall_base_", alltoall_base_privateuse1_);
}
TORCH_LIBRARY_IMPL(c10d, CPU, m) {
m.impl("barrier", barrier_cpu);
}
TORCH_LIBRARY_IMPL(c10d, CUDA, m) {
m.impl("barrier", barrier_cuda);
}
TORCH_LIBRARY_IMPL(c10d, PrivateUse1, m) {
m.impl("barrier", barrier_privateuse1);
}
TORCH_LIBRARY_IMPL(c10d, CPU, m) {
m.impl("monitored_barrier_", monitored_barrier_cpu_);
}
} // namespace
} // namespace ops
} // namespace c10d