#include <signal.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <sstream>
#include <thread>

#include <gloo/transport/tcp/device.h>

#include <c10d/CUDAUtils.hpp>
#include <c10d/FileStore.hpp>
#include <c10d/ProcessGroupGloo.hpp>
#include <c10d/test/TestUtils.hpp>

using namespace c10d::test;

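// Helper that runs allreduce in a loop and can arm a background thread to
// deliver a signal to a peer process after the first collective completes.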
class SignalTest {
 public:
  SignalTest(const std::string& path) : path_(path) {}

  ~SignalTest() {
    if (arm_.joinable()) {
      arm_.join();
    }
  }

  // Arms the test to send the given signal to the given PID once the
  // semaphore is posted. This happens as soon as the first collective
  // completes successfully.
  void arm(int pid, int signal) {
    arm_ = std::thread([=] {
      sem_.wait();
      kill(pid, signal);
    });
  }

  std::shared_ptr<::c10d::ProcessGroup::Work> run(int rank, int size) {
    auto store = std::make_shared<::c10d::FileStore>(path_);

    // Use a tiny timeout to make this test run fast
    ::c10d::ProcessGroupGloo::Options options;
    options.timeout = std::chrono::milliseconds(50);

    ::c10d::ProcessGroupGloo pg(store, rank, size, options);

    // Initialize tensor list
    std::vector<at::Tensor> tensors = {
        at::ones({16, 16}, at::TensorOptions(at::CPU(at::kFloat))),
    };

    // Run allreduce in a loop; every successful iteration posts the
    // semaphore (which may trigger an armed signal), and the loop exits
    // as soon as an allreduce fails.
    std::shared_ptr<::c10d::ProcessGroup::Work> work;
    while (true) {
      work = pg.allreduce(tensors);
      if (!work->wait()) {
        break;
      }
      sem_.post();
    }

    return work;
  }

 protected:
  std::string path_;
  std::thread arm_;
  Semaphore sem_;
};

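// Forks a child that joins the process group as rank 1, arms the parent to
// send `signal` to the child after the first successful allreduce, then runs
// rank 0 in the parent and returns the work object of the allreduce that
// subsequently failed.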
std::shared_ptr<::c10d::ProcessGroup::Work> testSignal(
    const std::string& path,
    int signal) {
  Fork fork;
  if (fork.isChild()) {
    SignalTest test(path);
    test.run(1, 2);
    exit(1);
  }

  SignalTest test(path);
  test.arm(fork.pid, signal);
  return test.run(0, 2);
}

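// Owns one ProcessGroupGloo instance per rank; initialize() creates `num`
// of them against the same FileStore so the in-process ranks can run
// collectives against each other.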
class CollectiveTest {
 public:
  static std::vector<CollectiveTest> initialize(
      const std::string& path,
      int num) {
    std::vector<CollectiveTest> tests;
    for (auto i = 0; i < num; i++) {
      tests.emplace_back(path);
    }

    std::vector<std::thread> threads;
    for (auto i = 0; i < num; i++) {
      threads.emplace_back(
          [i, &tests] { tests[i].start(i, tests.size()); });
    }
    for (auto& thread : threads) {
      thread.join();
    }

    return tests;
  }

  CollectiveTest(const std::string& path) : path_(path) {}

  CollectiveTest(CollectiveTest&& other) {
    path_ = std::move(other.path_);
    pg_ = std::move(other.pg_);
  }

  ::c10d::ProcessGroupGloo& getProcessGroup() {
    return *pg_;
  }

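  // Creates the process group for this rank, rendezvousing via the shared
  // FileStore. Called concurrently for all ranks from initialize().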
  void start(int rank, int size) {
    auto store = std::make_shared<::c10d::FileStore>(path_);

    // Use a tiny timeout to make this test run fast
    ::c10d::ProcessGroupGloo::Options options;
    options.timeout = std::chrono::milliseconds(50);

    pg_ = std::unique_ptr<::c10d::ProcessGroupGloo>(
        new ::c10d::ProcessGroupGloo(store, rank, size, options));
  }

 protected:
  std::string path_;
  std::unique_ptr<::c10d::ProcessGroupGloo> pg_;
};

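// Copies tensors to CPU so their contents can be inspected with
// data<float>() during verification.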
std::vector<std::vector<at::Tensor>> copyTensors(
    const std::vector<std::vector<at::Tensor>>& inputs) {
  std::vector<std::vector<at::Tensor>> outputs(inputs.size());
  for (size_t i = 0; i < inputs.size(); i++) {
    const auto& input = inputs[i];
    std::vector<at::Tensor> output(input.size());
    for (size_t j = 0; j < input.size(); j++) {
      output[j] = input[j].cpu();
    }
    outputs[i] = std::move(output);
  }
  return outputs;
}

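// Runs allreduce across `size` in-process ranks, where rank i contributes a
// tensor filled with the value i. Every rank should end up with the sum
// 0 + 1 + ... + (size - 1) = size * (size - 1) / 2.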
void testAllreduce(const std::string& path, const at::Backend b) {
  const auto size = 4;
  auto tests = CollectiveTest::initialize(path, size);

  // Generate inputs
  std::vector<std::vector<at::Tensor>> inputs(size);
  for (auto i = 0; i < size; i++) {
    auto tensor =
        at::ones({16, 16}, at::TensorOptions(at::getType(b, at::kFloat))) * i;
    inputs[i] = std::vector<at::Tensor>({tensor});
  }

  // Kick off work
  std::vector<std::shared_ptr<::c10d::ProcessGroup::Work>> work(size);
  for (auto i = 0; i < size; i++) {
    work[i] = tests[i].getProcessGroup().allreduce(inputs[i]);
  }

  // Wait for work to complete
  for (auto i = 0; i < size; i++) {
    if (!work[i]->wait()) {
      throw work[i]->exception();
    }
  }

  // Verify outputs
  const auto expected = (size * (size - 1)) / 2;
  auto outputs = copyTensors(inputs);
  for (auto i = 0; i < size; i++) {
    auto& tensor = outputs[i][0];
    auto data = tensor.data<float>();
    for (auto j = 0; j < tensor.numel(); j++) {
      if (data[j] != expected) {
        throw std::runtime_error(
            "allreduce result does not match expected value");
      }
    }
  }
}

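// Broadcasts from every combination of root rank and root tensor. Every rank
// should end up with all tensors equal to the root tensor's initial value,
// i.e. rootRank * stride + rootTensor.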
void testBroadcast(const std::string& path, const at::Backend b) {
  const auto size = 2;
  const auto stride = 2;
  auto tests = CollectiveTest::initialize(path, size);

  std::vector<std::vector<at::Tensor>> inputs(size);
  const auto& type = at::getType(b, at::kFloat);

  // Try every combination of root rank and root tensor
  for (auto i = 0; i < size; i++) {
    for (auto j = 0; j < stride; j++) {
      // Initialize inputs
      for (auto k = 0; k < size; k++) {
        inputs[k].resize(stride);
        at::DeviceGuard deviceGuard;
        for (auto l = 0; l < stride; l++) {
          if (type.is_cuda()) {
            deviceGuard.set_index(l);
          }
          inputs[k][l] =
              at::ones({16, 16}, at::TensorOptions(type)) * (k * stride + l);
        }
      }

      ::c10d::BroadcastOptions options;
      options.rootRank = i;
      options.rootTensor = j;

      // Kick off work
      std::vector<std::shared_ptr<::c10d::ProcessGroup::Work>> work(size);
      for (auto k = 0; k < size; k++) {
        work[k] = tests[k].getProcessGroup().broadcast(inputs[k], options);
      }

      // Wait for work to complete
      for (auto k = 0; k < size; k++) {
        if (!work[k]->wait()) {
          throw work[k]->exception();
        }
      }

      // Verify outputs
      const auto expected = (i * stride + j);
      auto outputs = copyTensors(inputs);
      for (auto k = 0; k < size; k++) {
        for (auto l = 0; l < stride; l++) {
          auto& tensor = outputs[k][l];
          auto data = tensor.data<float>();
          for (auto n = 0; n < tensor.numel(); n++) {
            if (data[n] != expected) {
              throw std::runtime_error(
                  "broadcast result does not match expected value");
            }
          }
        }
      }
    }
  }
}

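// Each test run below rendezvouses through its own temporary file backing a
// FileStore.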
int main(int argc, char** argv) {
  {
    TemporaryFile file;
    auto work = testSignal(file.path, SIGSTOP);
    auto& ex = work->exception();
    std::cout << "SIGSTOP test got: " << ex.what() << std::endl;
  }

  {
    TemporaryFile file;
    auto work = testSignal(file.path, SIGKILL);
    auto& ex = work->exception();
    std::cout << "SIGKILL test got: " << ex.what() << std::endl;
  }

  {
    TemporaryFile file;
    testAllreduce(file.path, at::Backend::CPU);
  }

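  // The CUDA variants below assume a CUDA-enabled build; the CUDA broadcast
  // test additionally places each root tensor on its own device and thus
  // needs at least `stride` (two) visible GPUs.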
  {
    TemporaryFile file;
    testAllreduce(file.path, at::Backend::CUDA);
  }

  {
    TemporaryFile file;
    testBroadcast(file.path, at::Backend::CPU);
  }

  {
    TemporaryFile file;
    testBroadcast(file.path, at::Backend::CUDA);
  }

  return 0;
}