#include <c10/cuda/CUDAGuard.h>
#include <c10/util/irange.h>

#include <ATen/cuda/CUDAContext.h>
#include <gtest/gtest.h>
#include <torch/csrc/distributed/c10d/FileStore.hpp>
#include <torch/csrc/distributed/c10d/ProcessGroupGloo.hpp>
#include "CUDATest.hpp"
#include "TestUtils.hpp"

using namespace c10d::test;

using at::cuda::CUDAStream;

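// Constructs N test fixtures that share a FileStore at `path` and calls
// start(rank, size) on each from its own thread, so all ranks can take part
// in process group rendezvous concurrently.
// Typical use (see runAsyncAllreduceTest below):
//   auto tests = initialize<AsyncAllreduceTest>(path, 4, /*numTensors=*/2);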
template <typename T, typename... Args>
std::vector<T> initialize(const std::string& path, int N, Args&&... args) {
  std::vector<T> tests;
  for (C10_UNUSED const auto i : c10::irange(N)) {
    tests.push_back(T(path, std::forward<Args>(args)...));
  }

  std::vector<std::thread> threads;
  for (const auto i : c10::irange(N)) {
    threads.emplace_back([i, N, &tests] { tests[i].start(i, N); });
  }

  for (auto& thread : threads) {
    thread.join();
  }

  return tests;
}

class AsyncTest {
 public:
  AsyncTest(std::string path) : path_(std::move(path)) {}

  AsyncTest(AsyncTest&& other) noexcept {
    path_ = std::move(other.path_);
    pg_ = std::move(other.pg_);
  }

  ::c10d::ProcessGroupGloo& getProcessGroup() {
    return *pg_;
  }

  void start(int rank, int size) {
    auto store = c10::make_intrusive<::c10d::FileStore>(path_, size);

    // Use tiny timeout to make this test run fast
    auto options = ::c10d::ProcessGroupGloo::Options::create();
    options->timeout = std::chrono::milliseconds(50);
    options->devices.push_back(
        ::c10d::ProcessGroupGloo::createDeviceForHostname("127.0.0.1"));

    pg_ = std::make_unique<::c10d::ProcessGroupGloo>(
        store, rank, size, options);
  }

 protected:
  std::string path_;
  std::unique_ptr<::c10d::ProcessGroupGloo> pg_;
};

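// Fixture for in-place ("input is output") collectives: allocates
// `numTensors` CUDA tensors round-robin across the available devices and one
// non-default stream per device, so the tests can queue extra work (e.g.
// cudaSleep) ahead of the collective on those streams.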
class AsyncInputIsOutputTest : public AsyncTest {
 public:
  AsyncInputIsOutputTest(const std::string& path, int numTensors)
      : AsyncTest(path),
        numTensors_(numTensors),
        numDevices_(cudaNumDevices()) {
    // Allocate inputs on available devices in a round robin fashion.
    ::at::globalContext().lazyInitCUDA();
    inputs_.resize(numTensors_);
    for (const auto i : c10::irange(numTensors_)) {
      inputs_[i] = at::empty(
          {16, 16},
          at::device(
              {at::kCUDA, static_cast<c10::DeviceIndex>(i % numDevices_)}));
    }

    // Allocate a stream per device.
    //
    // The "current stream" is a global per-device setting, so we can't
    // make two tensors on the same device use different streams and pass
    // this along to the collective (it uses the current-stream getters).
    //
    at::cuda::OptionalCUDAGuard deviceGuard;
    streams_.reserve(numDevices_);
    for (const auto i : c10::irange(numDevices_)) {
      deviceGuard.set_index(i);
      streams_.push_back(at::cuda::getStreamFromPool());
    }
  }

  void wait(c10::intrusive_ptr<c10d::Work>& work) {
    c10::cuda::CUDAMultiStreamGuard guard(streams_);
    work->wait();
  }

  std::vector<at::Tensor> getCpuTensors(
      const std::vector<at::Tensor>& gpu_tensors) {
    std::vector<at::Tensor> outputs(gpu_tensors.size());

    // For the duration of this function, make our streams the current streams
    c10::cuda::CUDAMultiStreamGuard guard(streams_);

    // Copy inputs to outputs
    for (const auto i : c10::irange(gpu_tensors.size())) {
      outputs[i] = gpu_tensors[i].cpu();
    }

    return outputs;
  }

  std::vector<at::Tensor> getTensors() {
    return getCpuTensors(inputs_);
  }

 protected:
  const int numTensors_;
  const int numDevices_;
  std::vector<at::Tensor> inputs_;
  std::vector<CUDAStream> streams_;
};

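// Queues a long cudaSleep on every stream, fills tensor i with
// rank * numTensors + i, and then launches an asynchronous in-place
// allreduce over all local tensors.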
class AsyncAllreduceTest : public AsyncInputIsOutputTest {
 public:
  AsyncAllreduceTest(const std::string& path, int numTensors)
      : AsyncInputIsOutputTest(path, numTensors) {}

  c10::intrusive_ptr<c10d::Work> run() {
    // For the duration of this function, make our streams the current streams
    c10::cuda::CUDAMultiStreamGuard guard(streams_);

    // Launch sleep on every stream
    at::cuda::OptionalCUDAGuard deviceGuard;
    for (const auto i : c10::irange(numDevices_)) {
      deviceGuard.set_index(i);
      cudaSleep(streams_[i], 10 * 1000 * 1000);
    }

    // Launch value initialization for every tensor
    for (const auto i : c10::irange(numTensors_)) {
      deviceGuard.set_index(i % numDevices_);
      inputs_[i].fill_(pg_->getRank() * numTensors_ + i);
    }

    return pg_->allreduce(inputs_);
  }
};

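// Same setup as the allreduce test, but broadcasts tensor `rootTensor` of
// rank `rootRank` to every tensor on every rank.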
class AsyncBroadcastTest : public AsyncInputIsOutputTest {
 public:
  AsyncBroadcastTest(const std::string& path, int numTensors)
      : AsyncInputIsOutputTest(path, numTensors) {}

  c10::intrusive_ptr<c10d::Work> run(int rootRank, int rootTensor) {
    // For the duration of this function, make our streams the current streams
    c10::cuda::CUDAMultiStreamGuard guard(streams_);

    // Launch sleep on every stream
    at::cuda::OptionalCUDAGuard deviceGuard;
    for (const auto i : c10::irange(numDevices_)) {
      deviceGuard.set_index(i);
      cudaSleep(streams_[i], 10 * 1000 * 1000);
    }

    // Launch value initialization for every tensor
    for (const auto i : c10::irange(numTensors_)) {
      deviceGuard.set_index(i % numDevices_);
      inputs_[i].fill_(pg_->getRank() * numTensors_ + i);
    }

    ::c10d::BroadcastOptions options;
    options.rootRank = rootRank;
    options.rootTensor = rootTensor;
    return pg_->broadcast(inputs_, options);
  }
};

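// Each element starts out as rank * numTensors + tensorIndex, so the
// allreduce sum over all (rank, tensor) pairs is 0 + 1 + ... + (size - 1)
// = size * (size - 1) / 2 with size = numProcesses * numTensors.
// For the defaults (4 processes, 2 tensors each): size = 8, expected = 28.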
void runAsyncAllreduceTest(
    const std::string& path,
    size_t numProcesses = 4,
    size_t numTensors = 2) {
  auto tests = initialize<AsyncAllreduceTest>(path, numProcesses, numTensors);
  std::vector<c10::intrusive_ptr<c10d::Work>> work(numProcesses);
  for (const auto i : c10::irange(numProcesses)) {
    work[i] = tests[i].run();
  }

  // Wait for work to complete
  for (const auto i : c10::irange(numProcesses)) {
    tests[i].wait(work[i]);
  }

  // Check results
  for (const auto i : c10::irange(numProcesses)) {
    const auto size = numProcesses * numTensors;
    const auto expected = (size * (size - 1)) / 2;
    auto tensors = tests[i].getTensors();
    auto results = tests[i].getCpuTensors(work[i]->result());
    EXPECT_EQ(tensors.size(), results.size());

    for (const auto j : c10::irange(tensors.size())) {
      auto& tensor = tensors[j];
      auto data = tensor.data_ptr<float>();

      auto& result_tensor = results[j];
      auto result_data = result_tensor.data_ptr<float>();

      EXPECT_EQ(tensor.numel(), result_tensor.numel());

      for (const auto k : c10::irange(tensor.numel())) {
        EXPECT_EQ(data[k], expected);
        EXPECT_EQ(result_data[k], expected);
      }
    }
  }
}

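// Exercises every (rootRank, rootTensor) combination; after each broadcast,
// every element on every rank should equal rootRank * numTensors + rootTensor,
// the value the root rank filled into the broadcast tensor.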
void runAsyncBroadcastTest(
    const std::string& path,
    size_t numProcesses = 4,
    size_t numTensors = 1) {
  auto tests = initialize<AsyncBroadcastTest>(path, numProcesses, numTensors);

  // Try every combination of root rank and root tensor
  for (const auto rootRank : c10::irange(numProcesses)) {
    for (const auto rootTensor : c10::irange(numTensors)) {
      std::vector<c10::intrusive_ptr<c10d::Work>> work(numProcesses);
      for (const auto i : c10::irange(numProcesses)) {
        work[i] = tests[i].run(rootRank, rootTensor);
      }

      // Wait for work to complete
      for (const auto i : c10::irange(numProcesses)) {
        tests[i].wait(work[i]);
      }

      // Check results
      const auto expected = rootRank * numTensors + rootTensor;
      for (const auto i : c10::irange(numProcesses)) {
        auto tensors = tests[i].getTensors();
        for (const auto& tensor : tensors) {
          const auto* const data = tensor.const_data_ptr<float>();
          for (const auto k : c10::irange(tensor.numel())) {
            EXPECT_EQ(data[k], expected);
          }
        }
      }
    }
  }
}

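// The tests below are compiled only when USE_CUDA is defined and exit early
// (after logging) on machines without a visible CUDA device.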
#ifdef USE_CUDA
TEST(ProcessGroupGlooAsyncTest, testAsyncAllreduce) {
  if (!at::cuda::is_available()) {
    LOG(INFO) << "CUDA not available, skipping testAsyncAllreduce";
    return;
  }
  TemporaryFile file;
  runAsyncAllreduceTest(file.path);
}

TEST(ProcessGroupGlooAsyncTest, testAsyncBroadcast) {
  if (!at::cuda::is_available()) {
    LOG(INFO) << "CUDA not available, skipping testAsyncBroadcast";
    return;
  }
  TemporaryFile file;
  runAsyncBroadcastTest(file.path);
}
#endif