#include <c10d/comm.hpp>

#include <deque>

#include <ATen/core/functional.h>
#include <c10d/reducer.hpp>
#include <torch/csrc/jit/python/pybind_utils.h>
#include <torch/csrc/utils/tensor_flatten.h>

namespace c10d {
namespace {

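// Wraps a single asynchronous broadcast of one bucket of tensors.
// The bucket is flattened into one contiguous tensor before the
// collective is launched and unflattened again once it completes.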
class BroadcastWork {
 public:
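  // Flattens the given bucket of tensors into a single contiguous
  // tensor and immediately kicks off an asynchronous broadcast of it
  // from the given root rank.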
  BroadcastWork(
      const c10::intrusive_ptr<c10d::ProcessGroup>& process_group,
      std::vector<at::Tensor> bucket_tensors,
      int root_rank = 0)
      : bucket_tensors_(std::move(bucket_tensors)),
        flat_tensor_({torch::utils::flatten_dense_tensors(bucket_tensors_)}) {
    BroadcastOptions broadcastOptions;
    broadcastOptions.rootRank = root_rank;
    work_ = process_group->broadcast(flat_tensor_, broadcastOptions);
  }

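  // Blocks until the broadcast has completed, then scatters the
  // flattened result back into the individual bucket tensors.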
  void finish() {
    work_->wait();

    // Copy the output of the broadcast operation back.
    auto output_tensors = torch::utils::unflatten_dense_tensors(
        flat_tensor_.front(), bucket_tensors_);
    TORCH_INTERNAL_ASSERT(output_tensors.size() == bucket_tensors_.size());
    for (size_t i = 0; i < output_tensors.size(); i++) {
      bucket_tensors_[i].copy_(output_tensors[i], /*non_blocking=*/true);
    }
  }

 protected:
  // The list of tensors to broadcast. They are guaranteed to be
  // placed on the same device and have the same dtype.
  std::vector<at::Tensor> bucket_tensors_;

  // The vector with a single flattened tensor containing the contents
  // of the tensors in bucket_tensors_. It must be stored in a vector
  // because c10d::ProcessGroup::broadcast takes a vector argument.
  std::vector<at::Tensor> flat_tensor_;

 private:
  // The broadcast work that is kicked off upon construction.
  c10::intrusive_ptr<c10d::ProcessGroup::Work> work_;
};

} // namespace

// Broadcast many tensors to all processes in the process group.
// The tensors are coalesced into buckets of at most `buffer_size`
// bytes and broadcast from the process with the given rank.
void broadcast_coalesced(
    c10::intrusive_ptr<c10d::ProcessGroup> process_group,
    at::TensorList tensors,
    size_t buffer_size,
    int rank) {
  // Coalesce tensors into buckets taking into account the maximum buffer size.
  // This routine is multi-device aware, so the tensors can be split across
  // multiple devices and can contain a mix of CPU and CUDA tensors.
  const auto buckets =
      compute_bucket_assignment_by_size(tensors.vec(), {buffer_size});

  // Returns the tensor at the specified index in the input tensor list.
  const auto lookup = [&tensors](size_t index) { return tensors[index]; };

  // We maintain a maximum of 2 in-flight broadcast operations to avoid
  // allocating too much memory (in case the specified tensors are very large).
  std::deque<BroadcastWork> in_flight;
  constexpr size_t max_in_flight = 2;
  for (const auto& bucket : buckets) {
    if (in_flight.size() >= max_in_flight) {
      in_flight.front().finish();
      in_flight.pop_front();
    }

    in_flight.emplace_back(process_group, c10::fmap(bucket, lookup), rank);
  }

  // Drain the remaining in-flight operations.
  while (!in_flight.empty()) {
    in_flight.front().finish();
    in_flight.pop_front();
  }
}

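// Returns one view per parameter into the flat gradient bucket, each
// reshaped to the size of the corresponding parameter.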
std::vector<at::Tensor> GradBucket::getPerParameterTensors() const {
  std::vector<at::Tensor> per_parameter_tensors;
  size_t num_parameters = offsets_.size();
  per_parameter_tensors.reserve(num_parameters);
  for (size_t i = 0; i < num_parameters; ++i) {
    per_parameter_tensors.push_back(
        tensor_.slice(0, offsets_[i], offsets_[i] + lengths_[i])
            .view(sizes_vec_[i]));
  }
  return per_parameter_tensors;
}

} // namespace c10d