/**
* Copyright (c) 2017-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*/
#include "gloo/cuda_allreduce_ring_chunked.h"
#include <string.h>
#include "gloo/cuda_private.h"
#include "gloo/nccl/nccl.h"
namespace gloo {
template <typename T>
struct CudaAllreduceRingChunked<T>::ChunkContext {
ChunkContext(
CudaDevicePointer<T>&& rootDevicePtr,
T* hostPtr,
const ReductionFunction<T>* fn,
std::vector<nccl::NCCLElement<T>>&& reduceElements,
std::vector<nccl::NCCLElement<T>>&& broadcastElements)
: rootDevicePtr(std::move(rootDevicePtr)),
hostPtr(hostPtr),
        // Read the count from the already-initialized member rather than the
        // moved-from constructor argument.
        length(this->rootDevicePtr.getCount()),
reduceOp(
nccl::NCCLExecution<T>(std::move(reduceElements)),
fn,
this->rootDevicePtr.getDeviceID()),
broadcastOp(
nccl::NCCLExecution<T>(std::move(broadcastElements)),
this->rootDevicePtr.getDeviceID()) {}
ChunkContext(ChunkContext&& other) = default;
// Instances cannot be copied or copy-assigned
ChunkContext(const ChunkContext&) = delete;
ChunkContext& operator=(const ChunkContext&) = delete;
// Pointers for copying between the device and host
CudaDevicePointer<T> rootDevicePtr;
T* hostPtr;
const size_t length;
  // NCCL operations for the local reduce across devices before the ring
  // algorithm runs, and the local broadcast back to all devices after it
  // completes.
nccl::ReduceOp<T> reduceOp;
nccl::BroadcastOp<T> broadcastOp;
};

template <typename T>
CudaAllreduceRingChunked<T>::CudaAllreduceRingChunked(
const std::shared_ptr<Context>& context,
const std::vector<T*>& ptrs,
const int count,
const std::vector<cudaStream_t>& streams)
: Allreduce<T>(context, nullptr),
count_(count),
bytes_(count * sizeof(T)),
synchronizeDeviceOutputs_(streams.size() == 0),
leftPair_(this->getLeftPair()),
rightPair_(this->getRightPair()) {
auto newStream = true;
if (streams.size() > 0) {
GLOO_ENFORCE_EQ(streams.size(), ptrs.size());
newStream = false;
}
for (int i = 0; i < ptrs.size(); i++) {
if (newStream) {
devicePtrs_.push_back(CudaDevicePointer<T>::create(ptrs[i], count_));
} else {
devicePtrs_.push_back(
CudaDevicePointer<T>::create(ptrs[i], count_, streams[i]));
}
}

  // Determine chunk size. Use chunks of no fewer than 256 elements
  // (1024 bytes for float).
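  // For example (illustrative numbers only): with contextSize_ == 4 and
  // count_ == 1000000, chunks_ == 8 and chunkSize_ == 125000 elements. With a
  // small count_ such as 1000, the 256-element floor applies and only
  // ceil(1000 / 256) == 4 chunk contexts are created below, so the remaining
  // chunk offsets are empty (hence the `chunkOffset < chunkContext_.size()`
  // checks in run() and copyChunkAtOffset()).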
constexpr unsigned long minSize = 256;
chunks_ = this->contextSize_ * 2;
chunkSize_ = std::max(minSize, (count_ + chunks_ - 1) / chunks_);
chunkBytes_ = chunkSize_ * sizeof(T);
// Setup host and device memory
{
// Synchronize memory allocation with NCCL operations
std::lock_guard<std::mutex> lock(CudaShared::getMutex());
CUDA_CHECK(cudaMallocHost(&hostPtr_, bytes_));
}
for (auto offset = 0; offset < count_; offset += chunkSize_) {
auto length = chunkSize_;
if (offset + length <= count_) {
// Chunk completely in range, full chunk.
} else {
// Chunk partially in range, partial chunk.
length = count_ - offset;
}
// Create NCCL elements for the chunk on each device
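    // Source and destination pointers are identical, so both the local reduce
    // and the local broadcast operate in place on each device's chunk.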
std::vector<nccl::NCCLElement<T>> reduceElements;
std::vector<nccl::NCCLElement<T>> broadcastElements;
for (auto i = 0; i < devicePtrs_.size(); i++) {
const auto chunkPtr = *devicePtrs_[i] + offset;
const auto stream = devicePtrs_[i].getStream();
reduceElements.push_back(nccl::NCCLElement<T>(
CudaDevicePointer<T>::create(chunkPtr, length, stream),
CudaDevicePointer<T>::create(chunkPtr, length, stream)));
broadcastElements.push_back(nccl::NCCLElement<T>(
CudaDevicePointer<T>::create(chunkPtr, length, stream),
CudaDevicePointer<T>::create(chunkPtr, length, stream)));
}
// Create a device pointer for the chunk on device ptrs[0]. We will use the
// associated stream to serialize NCCL operations and device-host memcpys.
// The NCCL operation will synchronize the independent device streams with
// this master stream.
CudaDevicePointer<T> rootDevicePtr = CudaDevicePointer<T>::create(
*devicePtrs_[0] + offset, length, devicePtrs_[0].getStream());
chunkContext_.push_back(ChunkContext(
std::move(rootDevicePtr),
hostPtr_ + offset,
this->fn_,
std::move(reduceElements),
std::move(broadcastElements)));
}

  // Allocate inboxes. Each inbox holds a single chunk, matching the size
  // registered with the receive buffers below.
  for (auto i = 0; i < 2; i++) {
    inbox_[i] = static_cast<T*>(malloc(chunkBytes_));
  }
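  // Two pairs of data buffers are used so that the transfer of one chunk can
  // overlap with the processing of the previously received chunk (double
  // buffering). The low bit of the chunk offset selects which pair to use.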
for (auto i = 0; i < 2; i++) {
auto slot = this->context_->nextSlot();
// Buffer to send to (rank+1).
sendDataBuf_[i] = rightPair_->createSendBuffer(slot, hostPtr_, bytes_);
// Buffer that (rank-1) writes to.
recvDataBuf_[i] = leftPair_->createRecvBuffer(slot, inbox_[i], chunkBytes_);
}
// Dummy buffers for localized barrier.
// Before sending to the right, we only need to know that the node
// on the right is done using the inbox that's about to be written
// into. No need for a global barrier.
auto notificationSlot = this->context_->nextSlot();
sendNotificationBuf_ =
leftPair_->createSendBuffer(notificationSlot, &dummy_, sizeof(dummy_));
recvNotificationBuf_ =
rightPair_->createRecvBuffer(notificationSlot, &dummy_, sizeof(dummy_));
}

template <typename T>
CudaAllreduceRingChunked<T>::~CudaAllreduceRingChunked() {
{
// Synchronize memory allocation with NCCL operations
std::lock_guard<std::mutex> lock(CudaShared::getMutex());
CUDA_CHECK(cudaFreeHost(hostPtr_));
}
for (auto i = 0; i < 2; i++) {
if (inbox_[i] != nullptr) {
free(inbox_[i]);
}
}
}

template <typename T>
void CudaAllreduceRingChunked<T>::run() {
CudaDeviceGuard guard;
// Kick off local reduction for each chunk, then copy result to host.
// Make sure to iterate over the chunks in the order they will be sent.
for (auto i = 0; i < chunks_; i++) {
const auto chunkOffset = getChunkOffset(i);
if (chunkOffset < chunkContext_.size()) {
auto& context = chunkContext_[chunkOffset];
context.reduceOp.runAsync();
context.rootDevicePtr.copyToHostAsync(context.hostPtr);
}
}
// First pass reduces a chunk in each round
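  // The inbox lags two rounds behind: in rounds 0 and 1 this rank only sends
  // its own pair of chunks, and the first chunk written by the left neighbor
  // becomes available in round 2. That is why the `round >= 2` checks below
  // skip the receive, reduce, and notification steps in the first two rounds.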
for (auto round = 0; round < chunks_; round++) {
const auto chunkOffset = getChunkOffset(round);
if (chunkOffset < chunkContext_.size()) {
auto& context = chunkContext_[chunkOffset];
// Wait for the local reduction and copy to host memory to complete
context.rootDevicePtr.wait();
// Reduce chunk from previous round. Nothing to do for initial rounds.
if (round >= 2) {
// Wait for inbox write to complete
recvDataBuf_[chunkOffset & 1]->waitRecv();
// Reduce
this->fn_->call(
context.hostPtr, inbox_[chunkOffset & 1], context.length);
}
} else {
// Empty chunk but still need to wait on the inbox write to ensure the
// algorithm progresses. Nothing to do for initial rounds.
if (round >= 2) {
recvDataBuf_[chunkOffset & 1]->waitRecv();
}
}
// Skip buffer passing notifications in initial rounds
if (round >= 2) {
// Send notification to node on the left that
// this node is ready for an inbox write.
sendNotificationBuf_->send();
// Wait for notification from node on the right
// to be sure this node can start an inbox write.
recvNotificationBuf_->waitRecv();
}
// Copy accumulated chunk
copyChunkAtOffset(chunkOffset);
}
// Second pass around the ring to broadcast result.
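  // Every chunk received in this pass is already fully reduced, so it only
  // needs to be copied into host memory and broadcast to the local devices.
  // The chunks for the last two rounds are the ones this rank finished
  // accumulating at the end of the first pass, so nothing is received for
  // them; they are broadcast straight from host memory.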
for (int round = 0; round < chunks_; round++) {
const auto chunkOffset = getChunkOffset(round);
if (chunkOffset < chunkContext_.size()) {
auto& context = chunkContext_[chunkOffset];
// End at chunks_-2 since that's where the accumulation
// stopped in the previous set of rounds.
if (round < (chunks_ - 2)) {
// Wait for inbox write to complete
recvDataBuf_[chunkOffset & 1]->waitRecv();
// Copy from inbox
memcpy(
context.hostPtr,
inbox_[chunkOffset & 1],
context.length * sizeof(T));
}
      // Broadcast chunk to devices. Do this in all rounds with a non-empty
      // chunk.
context.rootDevicePtr.copyFromHostAsync(context.hostPtr);
context.broadcastOp.runAsync();
} else {
// Empty chunk but still need to wait on the inbox write to ensure the
// algorithm progresses.
if (round < (chunks_ - 2)) {
recvDataBuf_[chunkOffset & 1]->waitRecv();
}
}
    // Stop forwarding chunks after round chunks_-5: a chunk sent in round r is
    // consumed by the right neighbor in its round r+2, and receives stop after
    // round chunks_-3, so anything sent later would never be read.
if (round < (chunks_ - 4)) {
// Send notification to node on the left that
// this node is ready for an inbox write.
sendNotificationBuf_->send();
// Wait for notification from node on the right
// to be sure this node can start an inbox write.
recvNotificationBuf_->waitRecv();
// Copy accumulated chunks
copyChunkAtOffset(chunkOffset);
}
}
  // Final barrier to make sure every node has finished.
  // Otherwise, a second allreduce call might interfere
  // with one that is still in progress on some nodes.
sendNotificationBuf_->send();
recvNotificationBuf_->waitRecv();
// If running synchronously, wait for all chunk broadcasts to complete
if (synchronizeDeviceOutputs_) {
for (auto i = 0; i < chunks_; i++) {
const auto chunkOffset = getChunkOffset(i);
if (chunkOffset < chunkContext_.size()) {
chunkContext_[chunkOffset].broadcastOp.wait();
}
}
}
}

template <typename T>
int CudaAllreduceRingChunked<T>::getChunkOffset(int round) {
  // Imagine a square grid with chunks of memory laid out vertically and nodes
  // horizontally. The diagonal of this grid marks which node sends which
  // chunk of memory in the prelude. Processing happens by moving this
  // diagonal forward and having it wrap around the edge. This means that the
  // node with rank 0 processes the last pair of chunks starting at round 2,
  // which is why the round is subtracted in the offset equation below.
//
// Because we're dealing with double buffering in this implementation, we
// have twice the number of chunks and process them in pairs. This explains
// why we ignore the LSB on the round number when subtracting it. The LSB is
// later added to flip back and forth between the two buffers for this pair
// of chunks. The number of chunks is finally added to make sure we can wrap
// correctly (no modulo against negative number).
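  //
  // For example (illustrative): with contextSize_ == 4 (chunks_ == 8) and
  // contextRank_ == 1, rounds 0..7 map to chunk offsets 2, 3, 0, 1, 6, 7, 4,
  // 5: the diagonal moves backwards one pair of chunks per pair of rounds,
  // and the low bit of the round alternates between the two chunks in a pair.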
return ((2 * this->contextRank_) - (round & ~0x1) + (round & 0x1) + chunks_) %
chunks_;
}

template <typename T>
void CudaAllreduceRingChunked<T>::copyChunkAtOffset(int chunkOffset) {
// Populate inbox of next participant in the ring.
size_t offset;
size_t length;
if (chunkOffset < chunkContext_.size()) {
const auto& context = chunkContext_[chunkOffset];
offset = chunkOffset * chunkSize_;
length = context.length;
} else {
    // When nothing is put on the wire for empty chunks, @pietern has seen
    // this algorithm hang. This is probably related to the chunk iteration
    // order described in the run function.
    // The chunk is out of range, so copy _something_.
offset = 0;
length = 1;
}
// Initiate write to inbox of node on the right.
sendDataBuf_[chunkOffset & 0x1]->send(offset * sizeof(T), length * sizeof(T));
}

// Instantiate template
template class CudaAllreduceRingChunked<float>;
} // namespace gloo