#include <torch/csrc/distributed/c10d/frontend.h>
#include <c10/util/Exception.h>
#include <sstream>
#include <stdexcept>
namespace c10d {
// Note: We assume that group.WORLD is equivalent to default_pg_. Otherwise,
// we would need many additional conditionals to check whether group is WORLD
// and then use default_pg_ explicitly.
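// Returns the calling process's rank within `group`, or -1 if the caller is
// not a member of the group.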
int64_t DistributedC10d::getRank(const std::shared_ptr<ProcessGroup>& group) const {
if (rankNotInGroup(group)) {
return -1;
}
return group->getRank();
}
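// Returns the number of processes in `group`, or -1 if the caller is not a
// member of the group.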
int64_t DistributedC10d::getWorldSize(
const std::shared_ptr<ProcessGroup>& group) const {
if (rankNotInGroup(group)) {
return -1;
}
return getGroupSize(group);
}
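// Returns the size of `group`. The default group's size comes straight from
// the process group itself; subgroup sizes are looked up in the cached
// group-rank maps.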
int64_t DistributedC10d::getGroupSize(
const std::shared_ptr<ProcessGroup>& group) const {
if (group == default_pg_) {
return default_pg_->getSize();
}
auto it = pg_group_ranks_.find(group);
TORCH_CHECK(it != pg_group_ranks_.end(), "The given group does not exist");
return it->second.size();
}
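// Returns the default (world) process group; checkDefaultPg() guards against
// use before the default group has been initialized.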
std::shared_ptr<ProcessGroup> DistributedC10d::worldProcessGroup() {
checkDefaultPg();
return default_pg_;
}
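// A null group handle denotes a caller that is not a member of the group;
// the default group always contains the caller.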
bool DistributedC10d::rankNotInGroup(
const std::shared_ptr<ProcessGroup>& group) const {
if (group == default_pg_) {
return false;
}
return group == nullptr;
}
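// Translates a global rank into its group-local rank. Only valid for
// subgroups: the default group carries no local-to-global mapping.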
int64_t DistributedC10d::getGroupRank(
const std::shared_ptr<ProcessGroup>& group,
const int64_t rank) const {
TORCH_CHECK(
group != default_pg_,
"group.WORLD does not have local rank to global rank mapping");
auto it = pg_group_ranks_.find(group);
TORCH_CHECK(it != pg_group_ranks_.end(), "The given group does not exist");
auto& group_rank_map = it->second;
auto g_it = group_rank_map.find(rank);
if (g_it == group_rank_map.end()) {
std::string group_name = "Unknown";
auto name_it = pg_names_.find(group);
if (name_it != pg_names_.end()) {
group_name = name_it->second;
}
TORCH_CHECK(
false,
"The global rank ",
rank,
" is not part of the group ",
group_name);
}
return g_it->second;
}
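// Translates a group-local rank back into a global rank by scanning the
// group's rank map.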
int64_t DistributedC10d::getGlobalRank(
const std::shared_ptr<ProcessGroup>& group,
const int64_t group_rank) const {
TORCH_CHECK(
group != default_pg_,
"group.WORLD does not have local rank to global rank mapping");
auto it = pg_group_ranks_.find(group);
TORCH_CHECK(it != pg_group_ranks_.end(), "The given group does not exist");
auto& group_rank_map = it->second;
for (const auto& p : group_rank_map) {
if (p.second == group_rank) {
return p.first;
}
}
TORCH_CHECK(false, "The group rank is not part of the group");
}
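// Returns the name of the backend (e.g. "gloo" or "nccl") that `group` was
// created with.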
std::string DistributedC10d::getBackend(
const std::shared_ptr<ProcessGroup>& group) {
TORCH_CHECK(!rankNotInGroup(group), "Invalid process group specified");
auto it = pg_map_.find(group);
TORCH_CHECK(it != pg_map_.end(), "The given group does not exist");
return it->second.first;
}
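// Non-blocking send. Returns a Work handle to wait on, or nullptr if the
// caller is not a member of `group`. For subgroups, `dst` is a global rank
// and is translated into the group-local rank before the send is issued.
//
// A minimal usage sketch (assuming `dist` is an initialized DistributedC10d
// instance, `pg` is one of its process groups, and `t` is an at::Tensor):
//
//   c10::optional<int64_t> tag; // defaults to tag 0 when unset
//   auto work = dist.isend(t, /*dst=*/1, pg, tag);
//   if (work) {
//     work->wait(); // block until the send has completed
//   }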
std::shared_ptr<ProcessGroup::Work> DistributedC10d::isend(
at::Tensor tensor,
int64_t dst,
const std::shared_ptr<ProcessGroup>& group,
c10::optional<int64_t>& tag) {
if (rankNotInGroup(group)) {
return nullptr;
}
std::vector<at::Tensor> inputs = {std::move(tensor)};
checkDefaultPg();
if (group == default_pg_) {
return default_pg_->send(inputs, dst, tag.value_or(0));
}
auto group_dst_rank = getGroupRank(group, dst);
return group->send(inputs, group_dst_rank, tag.value_or(0));
}
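// Non-blocking receive; the mirror image of isend() above. `src` is a global
// rank and is translated to the group-local rank for subgroups.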
std::shared_ptr<ProcessGroup::Work> DistributedC10d::irecv(
at::Tensor tensor,
int64_t src,
const std::shared_ptr<ProcessGroup>& group,
c10::optional<int64_t>& tag) {
if (rankNotInGroup(group)) {
return nullptr;
}
std::vector<at::Tensor> inputs = {std::move(tensor)};
checkDefaultPg();
if (group == default_pg_) {
return default_pg_->recv(inputs, src, tag.value_or(0));
}
auto group_src_rank = getGroupRank(group, src);
return group->recv(inputs, group_src_rank, tag.value_or(0));
}
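// Blocking send: issues an isend() and waits for it to complete.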
void DistributedC10d::send(
at::Tensor tensor,
int64_t dst,
const std::shared_ptr<ProcessGroup>& group,
c10::optional<int64_t>& tag) {
auto work = isend(std::move(tensor), dst, group, tag);
if (work) {
work->wait();
}
}
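// Blocking receive. With no `src`, receives from any source and returns the
// sender's global rank; otherwise returns `src` itself. Returns -1 if the
// caller is not a member of `group`.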
int64_t DistributedC10d::recv(
at::Tensor tensor,
const c10::optional<int64_t>& src,
const std::shared_ptr<ProcessGroup>& group,
c10::optional<int64_t>& tag) {
if (rankNotInGroup(group)) {
return -1;
}
std::vector<at::Tensor> outputs = {std::move(tensor)};
if (!src.has_value()) {
auto work = group->recvAnysource(outputs, tag.value_or(0));
work->wait();
auto src_rank = work->sourceRank();
if (group == default_pg_) {
return src_rank;
}
return getGlobalRank(group, src_rank);
}
if (group == default_pg_) {
group->recv(outputs, src.value(), tag.value_or(0))->wait();
} else {
int64_t group_src_rank = getGroupRank(group, src.value());
group->recv(outputs, group_src_rank, tag.value_or(0))->wait();
}
return src.value();
}
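// The collectives below share one convention: when async_op is true, the
// Work handle is returned immediately for the caller to wait on; otherwise
// the call blocks until completion and returns nullptr.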
std::shared_ptr<ProcessGroup::Work> DistributedC10d::broadcastMultiGPU(
std::vector<at::Tensor>& tensor_list,
int64_t src,
const std::shared_ptr<ProcessGroup>& group,
bool async_op,
int64_t src_tensor) {
if (rankNotInGroup(group)) {
return nullptr;
}
BroadcastOptions opts;
opts.rootRank = src;
opts.rootTensor = src_tensor;
checkDefaultPg();
std::shared_ptr<ProcessGroup::Work> work;
if (group == default_pg_) {
work = default_pg_->broadcast(tensor_list, opts);
} else {
int64_t group_src_rank = getGroupRank(group, src);
opts.rootRank = group_src_rank;
work = group->broadcast(tensor_list, opts);
}
if (async_op) {
return work;
}
work->wait();
return nullptr;
}
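// Single-tensor broadcast. `src` is a global rank even for subgroups and is
// translated to the group-local root before the call.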
std::shared_ptr<ProcessGroup::Work> DistributedC10d::broadcast(
at::Tensor tensor,
int64_t src,
const std::shared_ptr<ProcessGroup>& group,
bool async_op) {
if (rankNotInGroup(group)) {
return nullptr;
}
BroadcastOptions opts;
opts.rootRank = src;
opts.rootTensor = 0;
std::vector<at::Tensor> tensors = {std::move(tensor)};
std::shared_ptr<ProcessGroup::Work> work;
checkDefaultPg();
if (group == default_pg_) {
work = group->broadcast(tensors, opts);
} else {
int64_t group_src_rank = getGroupRank(group, src);
opts.rootRank = group_src_rank;
work = group->broadcast(tensors, opts);
}
if (async_op) {
return work;
}
work->wait();
return nullptr;
}
std::shared_ptr<ProcessGroup::Work> DistributedC10d::allReduceMultiGPU(
std::vector<at::Tensor>& tensor_list,
const std::shared_ptr<ProcessGroup>& group,
ReduceOp op,
bool async_op) {
if (rankNotInGroup(group)) {
return nullptr;
}
AllreduceOptions opts;
opts.reduceOp = op;
auto work = group->allreduce(tensor_list, opts);
if (async_op) {
return work;
}
work->wait();
return nullptr;
}
std::shared_ptr<ProcessGroup::Work> DistributedC10d::allReduce(
at::Tensor tensor,
const std::shared_ptr<ProcessGroup>& group,
ReduceOp op,
bool async_op) {
if (rankNotInGroup(group)) {
return nullptr;
}
AllreduceOptions opts;
opts.reduceOp = op;
std::vector<at::Tensor> tensors = {std::move(tensor)};
auto work = group->allreduce(tensors, opts);
if (async_op) {
return work;
}
work->wait();
return nullptr;
}
std::shared_ptr<ProcessGroup::Work> DistributedC10d::allReduceCoalesced(
std::vector<at::Tensor>& tensors,
const std::shared_ptr<ProcessGroup>& group,
ReduceOp op,
bool async_op) {
if (rankNotInGroup(group)) {
return nullptr;
}
AllreduceCoalescedOptions opts;
opts.reduceOp = op;
auto work = group->allreduce_coalesced(tensors, opts);
if (async_op) {
return work;
}
work->wait();
return nullptr;
}
std::shared_ptr<ProcessGroup::Work> DistributedC10d::reduceMultiGPU(
std::vector<at::Tensor>& tensor_list,
int64_t dst,
const std::shared_ptr<ProcessGroup>& group,
ReduceOp op,
bool async_op,
int64_t dst_tensor) {
if (rankNotInGroup(group)) {
return nullptr;
}
ReduceOptions opts;
opts.reduceOp = op;
opts.rootRank = dst;
opts.rootTensor = dst_tensor;
checkDefaultPg();
std::shared_ptr<ProcessGroup::Work> work;
if (group == default_pg_) {
work = group->reduce(tensor_list, opts);
} else {
int64_t group_dst_rank = getGroupRank(group, dst);
opts.rootRank = group_dst_rank;
work = group->reduce(tensor_list, opts);
}
if (async_op) {
return work;
}
work->wait();
return nullptr;
}
std::shared_ptr<ProcessGroup::Work> DistributedC10d::reduce(
at::Tensor tensor,
int64_t dst,
const std::shared_ptr<ProcessGroup>& group,
ReduceOp op,
bool async_op) {
if (rankNotInGroup(group)) {
return nullptr;
}
ReduceOptions opts;
opts.reduceOp = op;
opts.rootRank = dst;
checkDefaultPg();
std::shared_ptr<ProcessGroup::Work> work;
std::vector<at::Tensor> tensors = {std::move(tensor)};
if (group == default_pg_) {
work = group->reduce(tensors, opts);
} else {
int64_t group_dst_rank = getGroupRank(group, dst);
opts.rootRank = group_dst_rank;
work = group->reduce(tensors, opts);
}
if (async_op) {
return work;
}
work->wait();
return nullptr;
}
std::shared_ptr<ProcessGroup::Work> DistributedC10d::allGatherMultiGPU(
std::vector<std::vector<at::Tensor>>& output_tensor_lists,
std::vector<at::Tensor>& input_tensor_list,
const std::shared_ptr<ProcessGroup>& group,
bool async_op) {
if (rankNotInGroup(group)) {
return nullptr;
}
auto work = group->allgather(output_tensor_lists, input_tensor_list);
if (async_op) {
return work;
}
work->wait();
return nullptr;
}
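// Gathers `tensor` from every rank into `tensor_list` on all ranks.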
std::shared_ptr<ProcessGroup::Work> DistributedC10d::allGather(
std::vector<at::Tensor>& tensor_list,
at::Tensor tensor,
const std::shared_ptr<ProcessGroup>& group,
bool async_op) {
if (rankNotInGroup(group)) {
return nullptr;
}
std::vector<std::vector<at::Tensor>> output_tensor_lists = {std::move(tensor_list)};
std::vector<at::Tensor> input_tensor_list = {std::move(tensor)};
auto work = group->allgather(output_tensor_lists, input_tensor_list);
if (async_op) {
return work;
}
work->wait();
return nullptr;
}
std::shared_ptr<ProcessGroup::Work> DistributedC10d::allGatherCoalesced(
std::vector<std::vector<at::Tensor>>& output_tensor_lists,
std::vector<at::Tensor>& input_tensor_list,
const std::shared_ptr<ProcessGroup>& group,
bool async_op) {
if (rankNotInGroup(group)) {
return nullptr;
}
auto work =
group->allgather_coalesced(output_tensor_lists, input_tensor_list);
if (async_op) {
return work;
}
work->wait();
return nullptr;
}
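// Gathers `tensor` from all ranks onto rank `dst`. Only the destination rank
// may (and must) supply `gather_list`.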
std::shared_ptr<ProcessGroup::Work> DistributedC10d::gather(
at::Tensor tensor,
const c10::optional<std::vector<at::Tensor>>& gather_list,
const std::shared_ptr<ProcessGroup>& group,
int64_t dst,
bool async_op) {
if (rankNotInGroup(group)) {
return nullptr;
}
auto my_rank = group->getRank();
std::vector<std::vector<at::Tensor>> output_tensors;
if (dst == my_rank) {
TORCH_CHECK(
gather_list.has_value(),
"Argument ``gather_list`` must be specified on destination rank");
output_tensors.push_back(gather_list.value());
} else {
TORCH_CHECK(
!gather_list.has_value(),
"Argument ``gather_list`` must NOT be specified on non-destination ranks.");
}
std::vector<at::Tensor> input_tensors = {std::move(tensor)};
GatherOptions opts;
opts.rootRank = dst;
std::shared_ptr<ProcessGroup::Work> work;
if (group == default_pg_) {
work = group->gather(output_tensors, input_tensors, opts);
} else {
int64_t group_dst_rank = getGroupRank(group, dst);
opts.rootRank = group_dst_rank;
work = group->gather(output_tensors, input_tensors, opts);
}
if (async_op) {
return work;
}
work->wait();
return nullptr;
}
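// Scatters the tensors in `scatter_list` from rank `src`; every rank
// receives its slice into `tensor`.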
std::shared_ptr<ProcessGroup::Work> DistributedC10d::scatter(
at::Tensor tensor,
std::vector<at::Tensor>& scatter_list,
const std::shared_ptr<ProcessGroup>& group,
int64_t src,
bool async_op) {
if (rankNotInGroup(group)) {
return nullptr;
}
auto my_rank = getRank(default_pg_);
std::vector<at::Tensor> output_tensors = {std::move(tensor)};
std::vector<std::vector<at::Tensor>> input_tensors;
if (src == my_rank) {
input_tensors.push_back(scatter_list);
}
ScatterOptions opts;
opts.rootRank = src;
std::shared_ptr<ProcessGroup::Work> work;
if (group == default_pg_) {
work = group->scatter(output_tensors, input_tensors, opts);
} else {
int64_t group_src_rank = getGroupRank(group, src);
opts.rootRank = group_src_rank;
work = group->scatter(output_tensors, input_tensors, opts);
}
if (async_op) {
return work;
}
work->wait();
return nullptr;
}
std::shared_ptr<ProcessGroup::Work> DistributedC10d::reduceScatterMultiGPU(
std::vector<at::Tensor>& output_tensor_list,
std::vector<std::vector<at::Tensor>>& input_tensor_lists,
const std::shared_ptr<ProcessGroup>& group,
ReduceOp op,
bool async_op) {
if (rankNotInGroup(group)) {
return nullptr;
}
ReduceScatterOptions opts;
opts.reduceOp = op;
auto work =
group->reduce_scatter(output_tensor_list, input_tensor_lists, opts);
if (async_op) {
return work;
}
work->wait();
return nullptr;
}
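// Reduces `input_tensor_list` across ranks with `op`, then scatters one
// reduced chunk per rank into `output`.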
std::shared_ptr<ProcessGroup::Work> DistributedC10d::reduceScatter(
at::Tensor output,
std::vector<at::Tensor>& input_tensor_list,
const std::shared_ptr<ProcessGroup>& group,
ReduceOp op,
bool async_op) {
if (rankNotInGroup(group)) {
return nullptr;
}
ReduceScatterOptions opts;
opts.reduceOp = op;
std::vector<at::Tensor> output_tensor_list = {std::move(output)};
std::vector<std::vector<at::Tensor>> input_tensor_lists = {std::move(input_tensor_list)};
auto work =
group->reduce_scatter(output_tensor_list, input_tensor_lists, opts);
if (async_op) {
return work;
}
work->wait();
return nullptr;
}
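// All-to-all over a single packed tensor, sliced according to the per-rank
// split sizes.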
std::shared_ptr<ProcessGroup::Work> DistributedC10d::allToAllSingle(
at::Tensor output,
at::Tensor input,
std::vector<int64_t>& output_split_sizes,
std::vector<int64_t>& input_split_sizes,
const std::shared_ptr<ProcessGroup>& group,
bool async_op) {
if (rankNotInGroup(group)) {
return nullptr;
}
AllToAllOptions opts;
auto work = group->alltoall_base(
output, input, output_split_sizes, input_split_sizes, opts);
if (async_op) {
return work;
}
work->wait();
return nullptr;
}
std::shared_ptr<ProcessGroup::Work> DistributedC10d::allToAll(
std::vector<at::Tensor>& output_tensor_list,
std::vector<at::Tensor>& input_tensor_list,
const std::shared_ptr<ProcessGroup>& group,
bool async_op) {
if (rankNotInGroup(group)) {
return nullptr;
}
AllToAllOptions opts;
auto work = group->alltoall(output_tensor_list, input_tensor_list, opts);
if (async_op) {
return work;
}
work->wait();
return nullptr;
}
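// Blocks (unless async_op) until every process in `group` reaches the
// barrier.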
std::shared_ptr<ProcessGroup::Work> DistributedC10d::barrier(
const std::shared_ptr<ProcessGroup>& group,
bool async_op) {
if (rankNotInGroup(group)) {
return nullptr;
}
auto work = group->barrier();
if (async_op) {
return work;
}
work->wait();
return nullptr;
}
} // namespace c10d