#include <cstdlib>
#include <iostream>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

#include <unistd.h>

#include <c10d/ProcessGroupMPI.hpp>

#define STR_HELPER(x) #x
#define STR(x) STR_HELPER(x)
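
// Each test below builds a batch of 16x16 tensors, queues the corresponding
// collective on a ProcessGroupMPI instance, waits for all outstanding work,
// and then checks the results element-wise against the values the ranks
// contributed.

// All ranks contribute ones * i; after allreduce (default op: sum) every rank
// should hold worldSize * i.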
void testAllreduce(int iter = 1000) {
  auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();

  // Generate inputs
  std::vector<std::vector<at::Tensor>> allTensors(iter);
  for (auto i = 0; i < iter; ++i) {
    auto tensor = at::ones({16, 16}) * i;
    allTensors[i] = std::vector<at::Tensor>({tensor});
  }

  std::vector<std::shared_ptr<::c10d::ProcessGroup::Work>> works;
  for (auto& tensors : allTensors) {
    // Kick off work
    std::shared_ptr<::c10d::ProcessGroup::Work> work = pg->allreduce(tensors);
    works.push_back(std::move(work));
  }

  for (auto& work : works) {
    // Wait for work to complete
    if (!work->wait()) {
      std::cerr << "Exception received: " << work->exception().what()
                << std::endl;
      pg->abort();
    }
  }

  // Get the world size
  auto worldSize = pg->getSize();

  // Verify outputs
  for (int i = 0; i < iter; ++i) {
    const auto expected = worldSize * i;
    auto data = allTensors[i][0].data<float>();
    for (auto j = 0; j < allTensors[i][0].numel(); ++j) {
      if (data[j] != expected) {
        throw std::runtime_error("BOOM!");
      }
    }
  }
}
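
// Rank 0 holds ones * i and every other rank holds zeros; after the broadcast
// from the default root (rank 0), every rank should hold i.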
void testBroadcast(int iter = 10000) {
  auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();

  // Generate inputs
  std::vector<std::vector<at::Tensor>> allTensors(iter);
  for (auto i = 0; i < iter; ++i) {
    if (pg->getRank() == 0) {
      auto tensor = at::ones({16, 16}) * i;
      allTensors[i] = std::vector<at::Tensor>({tensor});
    } else {
      auto tensor = at::zeros({16, 16});
      allTensors[i] = std::vector<at::Tensor>({tensor});
    }
  }

  std::vector<std::shared_ptr<::c10d::ProcessGroup::Work>> works;
  for (auto& tensors : allTensors) {
    // Kick off work
    std::shared_ptr<::c10d::ProcessGroup::Work> work = pg->broadcast(tensors);
    works.push_back(std::move(work));
  }

  for (auto& work : works) {
    // Wait for work to complete
    if (!work->wait()) {
      std::cerr << "Exception received: " << work->exception().what()
                << std::endl;
      pg->abort();
    }
  }

  // Verify outputs
  for (int i = 0; i < iter; ++i) {
    const auto expected = i;
    auto data = allTensors[i][0].data<float>();
    for (auto j = 0; j < allTensors[i][0].numel(); ++j) {
      if (data[j] != expected) {
        throw std::runtime_error("BOOM!");
      }
    }
  }
}
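
// All ranks contribute ones * i; after reduce (default: sum to rank 0), only
// the root verifies and expects worldSize * i.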
void testReduce(int iter = 10000) {
  auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();

  // Generate inputs
  std::vector<std::vector<at::Tensor>> allTensors(iter);
  for (auto i = 0; i < iter; ++i) {
    auto tensor = at::ones({16, 16}) * i;
    allTensors[i] = std::vector<at::Tensor>({tensor});
  }

  std::vector<std::shared_ptr<::c10d::ProcessGroup::Work>> works;
  for (auto& tensors : allTensors) {
    // Kick off work
    std::shared_ptr<::c10d::ProcessGroup::Work> work = pg->reduce(tensors);
    works.push_back(std::move(work));
  }

  for (auto& work : works) {
    // Wait for work to complete
    if (!work->wait()) {
      std::cerr << "Exception received: " << work->exception().what()
                << std::endl;
      pg->abort();
    }
  }

  // Get the world size
  auto worldSize = pg->getSize();

  if (pg->getRank() == 0) {
    // Verify outputs
    for (int i = 0; i < iter; ++i) {
      const auto expected = worldSize * i;
      auto data = allTensors[i][0].data<float>();
      for (auto j = 0; j < allTensors[i][0].numel(); ++j) {
        if (data[j] != expected) {
          throw std::runtime_error("BOOM!");
        }
      }
    }
  }
}
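
// Each rank contributes ones * i * rank; after allgather, output slot j on
// every rank should hold i * j.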
void testAllgather(int iter = 10000) {
  auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();
  std::vector<std::vector<at::Tensor>> allTensors(iter);
  std::vector<std::vector<std::vector<at::Tensor>>> allOutputTensors(iter);

  // Get the world size
  auto worldSize = pg->getSize();
  auto rank = pg->getRank();

  // Generate inputs
  for (auto i = 0; i < iter; ++i) {
    auto tensor = at::ones({16, 16}) * i * rank;
    allTensors[i] = std::vector<at::Tensor>({tensor});
    allOutputTensors[i] = std::vector<std::vector<at::Tensor>>(1);
    allOutputTensors[i][0].resize(worldSize);
    for (auto j = 0; j < worldSize; ++j) {
      allOutputTensors[i][0][j] = at::zeros({16, 16});
    }
  }

  std::vector<std::shared_ptr<::c10d::ProcessGroup::Work>> works;
  for (size_t i = 0; i < allTensors.size(); ++i) {
    // Kick off work
    std::shared_ptr<::c10d::ProcessGroup::Work> work =
        pg->allgather(allOutputTensors[i], allTensors[i]);
    works.push_back(std::move(work));
  }

  for (auto& work : works) {
    // Wait for work to complete
    if (!work->wait()) {
      std::cerr << "Exception received: " << work->exception().what()
                << std::endl;
      pg->abort();
    }
  }

  // Verify outputs
  for (int i = 0; i < iter; ++i) {
    for (int j = 0; j < worldSize; ++j) {
      const auto expected = i * j;
      auto data = allOutputTensors[i][0][j].data<float>();
      for (auto k = 0; k < allOutputTensors[i][0][j].numel(); ++k) {
        if (data[k] != expected) {
          throw std::runtime_error("BOOM!");
        }
      }
    }
  }
}
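
// Each rank contributes ones * i * rank; after gather, rank 0's output slot j
// should hold i * j. Non-root ranks pass an empty output list.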
void testGather(int iter = 10000) {
  auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();
  std::vector<std::vector<at::Tensor>> allTensors(iter);
  std::vector<std::vector<std::vector<at::Tensor>>> allOutputTensors(iter);

  // Get the world size
  auto worldSize = pg->getSize();
  auto rank = pg->getRank();

  // Generate inputs
  for (auto i = 0; i < iter; ++i) {
    auto tensor = at::ones({16, 16}) * i * rank;
    allTensors[i] = std::vector<at::Tensor>({tensor});
    if (rank == 0) {
      allOutputTensors[i] = std::vector<std::vector<at::Tensor>>(1);
      allOutputTensors[i][0].resize(worldSize);
      for (auto j = 0; j < worldSize; ++j) {
        allOutputTensors[i][0][j] = at::zeros({16, 16});
      }
    } else {
      allOutputTensors[i] = std::vector<std::vector<at::Tensor>>(1);
    }
  }

  std::vector<std::shared_ptr<::c10d::ProcessGroup::Work>> works;
  for (size_t i = 0; i < allTensors.size(); ++i) {
    // Kick off work
    std::shared_ptr<::c10d::ProcessGroup::Work> work =
        pg->gather(allOutputTensors[i], allTensors[i]);
    works.push_back(std::move(work));
  }

  for (auto& work : works) {
    // Wait for work to complete
    if (!work->wait()) {
      std::cerr << "Exception received: " << work->exception().what()
                << std::endl;
      pg->abort();
    }
  }

  // Verify outputs
  if (rank == 0) {
    for (int i = 0; i < iter; ++i) {
      for (int j = 0; j < worldSize; ++j) {
        const auto expected = i * j;
        auto data = allOutputTensors[i][0][j].data<float>();
        for (auto k = 0; k < allOutputTensors[i][0][j].numel(); ++k) {
          if (data[k] != expected) {
            throw std::runtime_error("BOOM!");
          }
        }
      }
    }
  }
}
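
// Rank 0 prepares one tensor of value i * j for each destination rank j;
// after scatter, rank r should hold i * r.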
void testScatter(int iter = 1) {
  auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();
  std::vector<std::vector<std::vector<at::Tensor>>> allInputTensors(iter);
  std::vector<std::vector<at::Tensor>> allTensors(iter);

  // Get the world size
  auto worldSize = pg->getSize();
  auto rank = pg->getRank();

  // Generate inputs: the root prepares one tensor per destination rank
  for (auto i = 0; i < iter; ++i) {
    auto tensor = at::zeros({16, 16});
    allTensors[i] = std::vector<at::Tensor>({tensor});
    if (rank == 0) {
      allInputTensors[i] = std::vector<std::vector<at::Tensor>>(1);
      allInputTensors[i][0].resize(worldSize);
      for (auto j = 0; j < worldSize; ++j) {
        allInputTensors[i][0][j] = at::ones({16, 16}) * i * j;
      }
    } else {
      allInputTensors[i] = std::vector<std::vector<at::Tensor>>(1);
    }
  }

  std::vector<std::shared_ptr<::c10d::ProcessGroup::Work>> works;
  for (size_t i = 0; i < allTensors.size(); ++i) {
    // Kick off work
    std::shared_ptr<::c10d::ProcessGroup::Work> work =
        pg->scatter(allTensors[i], allInputTensors[i]);
    works.push_back(std::move(work));
  }

  for (auto& work : works) {
    // Wait for work to complete
    if (!work->wait()) {
      std::cerr << "Exception received: " << work->exception().what()
                << std::endl;
      pg->abort();
    }
  }

  // Verify outputs: each rank checks the slice it received from the root
  for (int i = 0; i < iter; ++i) {
    const auto expected = i * rank;
    auto data = allTensors[i][0].data<float>();
    for (auto k = 0; k < allTensors[i][0].numel(); ++k) {
      if (data[k] != expected) {
        throw std::runtime_error("BOOM!");
      }
    }
  }
}
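
// Rank 0 sends ones * i to rank 1 with tag 0; rank 1 receives either from
// rank 0 directly or, when recvAnysource is set, from any source and then
// checks that the reported source rank is 0. Requires exactly two ranks.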
void testSendRecv(bool recvAnysource, int iter = 10000) {
  auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();

  // Generate inputs
  std::vector<std::vector<at::Tensor>> allTensors(iter);
  auto rank = pg->getRank();
  for (auto i = 0; i < iter; ++i) {
    if (rank == 0) {
      auto tensor = at::ones({16, 16}) * i;
      allTensors[i] = std::vector<at::Tensor>({tensor});
    } else {
      auto tensor = at::zeros({16, 16});
      allTensors[i] = std::vector<at::Tensor>({tensor});
    }
  }

  if (rank == 0) {
    std::vector<std::shared_ptr<::c10d::ProcessGroup::Work>> works;
    for (auto& tensors : allTensors) {
      // Kick off work
      std::shared_ptr<::c10d::ProcessGroup::Work> work =
          pg->send(tensors, 1, 0);
      works.push_back(std::move(work));
    }
    for (auto& work : works) {
      // Wait for work to complete
      if (!work->wait()) {
        std::cerr << "Exception received: " << work->exception().what()
                  << std::endl;
        pg->abort();
      }
    }
  }

  if (rank == 1) {
    std::vector<std::shared_ptr<::c10d::ProcessGroup::Work>> works;
    std::vector<int> srcRanks(allTensors.size(), -1);
    size_t i = 0;
    for (auto& tensors : allTensors) {
      // Kick off work
      if (!recvAnysource) {
        std::shared_ptr<::c10d::ProcessGroup::Work> work =
            pg->recv(tensors, 0, 0);
        works.push_back(std::move(work));
      } else {
        std::shared_ptr<::c10d::ProcessGroup::Work> work =
            pg->recvAnysource(tensors, &srcRanks[i], 0);
        works.push_back(std::move(work));
      }
      ++i;
    }
    for (auto& work : works) {
      // Wait for work to complete
      if (!work->wait()) {
        std::cerr << "Exception received: " << work->exception().what()
                  << std::endl;
        pg->abort();
      }
    }

    // Verify outputs
    for (int i = 0; i < iter; ++i) {
      if (recvAnysource && srcRanks[i] != 0) {
        throw std::runtime_error("src rank is wrong for recvAnysource");
      }
      const auto expected = i;
      auto data = allTensors[i][0].data<float>();
      for (auto j = 0; j < allTensors[i][0].numel(); ++j) {
        if (data[j] != expected) {
          throw std::runtime_error("BOOM!");
        }
      }
    }
  }
}
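
// When run outside of mpirun (no OMPI_COMM_WORLD_SIZE in the environment),
// re-exec this binary under the mpiexec located at build time (the MPIEXEC
// macro, presumably supplied by the build system) with two ranks, then run
// every test.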
int main(int argc, char** argv) {
#ifdef MPIEXEC
  // If we are within an openmpi mpirun, then skip the exec
  if (!std::getenv("OMPI_COMM_WORLD_SIZE")) {
    std::cout << "Execute mpiexec from: " << STR(MPIEXEC) << std::endl;
    // execl needs each argument as a separate parameter and a trailing
    // null terminator
    execl(STR(MPIEXEC), STR(MPIEXEC), "-np", "2", argv[0], (char*)nullptr);
    // execl only returns on failure; don't fall through and run without MPI
    std::cerr << "Failed to exec " << STR(MPIEXEC) << std::endl;
    return EXIT_FAILURE;
  }

  testAllreduce();
  testBroadcast();
  testReduce();
  testAllgather();
  testGather();
  testScatter();
  testSendRecv(false);
  testSendRecv(true);

  std::cout << "Test successful" << std::endl;
#else
  std::cout << "MPI executable not found, skipping test" << std::endl;
#endif
  return EXIT_SUCCESS;
}