#pragma once
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <tuple>
#include <unordered_map>
#include <vector>

#include <gloo/algorithm.h>
#include <gloo/common/error.h>
#include <gloo/context.h>
#include <gloo/rendezvous/store.h>
#include <gloo/transport/device.h>

#include <torch/csrc/utils/hash.h>

#include "CUDAUtils.hpp"
#include "ProcessGroup.hpp"
#include "Store.hpp"
#include "Types.hpp"
#include "Utils.hpp"

// Forward declaration
struct THCState;

namespace c10d {
// AlgorithmKey is an immutable identifier for a Gloo algorithm.
//
// It captures the collective type, the set of participating devices,
// the source and destination ranks and tensors, the tensor sizes, and
// the reduction type (if applicable). This key is used to cache
// instances of a Gloo algorithm for reuse. The number of cached
// instances can vary over time and is agreed upon between all
// processes in the group. A usage sketch follows the struct
// definition below.
//
// When we're dealing with multiple entries per key, it is also used
// to broadcast the number of entries such that all processes agree.
//
struct AlgorithmKey {
bool operator==(const AlgorithmKey& other) const {
return (collectiveType == other.collectiveType) && (type == other.type) &&
(devices == other.devices) && (srcSizes == other.srcSizes) &&
(dstSizes == other.dstSizes) && (srcRank == other.srcRank) &&
(dstRank == other.dstRank) && (srcTensor == other.srcTensor) &&
(dstTensor == other.dstTensor) && (reduceOp == other.reduceOp);
}
CollectiveType collectiveType = CollectiveType::UNUSED;
at::Type* type = nullptr;
std::vector<int> devices;
std::vector<std::vector<int64_t>> srcSizes;
std::vector<std::vector<int64_t>> dstSizes;
int srcRank = -1;
int dstRank = -1;
int srcTensor = -1;
int dstTensor = -1;
ReduceOp reduceOp = ReduceOp::UNUSED;
// This function is called by torch::hash<AlgorithmKey>
static size_t hash(const AlgorithmKey& k) {
return torch::get_hash(
k.collectiveType,
k.type,
k.devices,
k.srcSizes,
k.dstSizes,
k.srcRank,
k.dstRank,
k.srcTensor,
k.dstTensor,
k.reduceOp);
}
};
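// Illustrative sketch (not part of this header): because AlgorithmKey
// defines operator== and a hash function compatible with torch::hash,
// it can be used directly as an unordered_map key. The CollectiveType
// value below is an assumption about what Types.hpp defines.
//
//   std::unordered_map<AlgorithmKey, int, torch::hash<AlgorithmKey>> cache;
//   AlgorithmKey key;
//   key.collectiveType = CollectiveType::ALLREDUCE;
//   key.srcSizes = {{1024}};
//   cache[key] = 1;
//   AlgorithmKey same = key; // identical fields: equal hash, compares equal
//   assert(cache.count(same) == 1);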
// AlgorithmEntry is the state associated with a single algorithm instance.
//
// Keeping Gloo algorithms around for reuse is a win: most of them
// end up allocating some memory, constructing them takes some time,
// and they may do some I/O (to set up communication buffers between
// processes). Also, until Gloo supports executing on arbitrary
// memory, we need to hold on to the memory that we instantiated the
// algorithm with. Since the lifecycle of memory in ATen is arbitrary,
// this entry holds on to persistent memory that we copy to/from.
//
// Every unique call (in terms of number of tensors, tensor types,
// tensor sizes, etc.) gets its own entry. In the future we may extend
// this to allow multiple entries per unique call, to better exploit
// parallelism for calls with the same signature.
//
struct AlgorithmEntry {
AlgorithmKey key;
std::unique_ptr<::gloo::Algorithm> algorithm;
std::vector<at::Tensor> src;
std::vector<at::Tensor> dst;
std::function<void()> run;
// For CUDA tensors, the following happens:
//
// - Input tensor A is copied to persistent tensor B on the stream
// associated with the device that stores A (the stream is a
// per-device thread local stored by THC).
// - This stream is recorded in an event (see events below) so that
// the copy can be synchronized.
// - The private stream (see streams below) that is used to execute
// the algorithm on a worker thread waits for this event such that
// we know the copy to tensor B has finished.
// - Once the algorithm has finished executing, the work object
// associated with the execution records the private streams in
// its own events. Then, when the wait() function on the work
// object is called, the streams of the caller are synchronized
// with asynchronous completion of the memory copies back to the
// destination tensors.
//
// This approach means the caller of the process group function can
// retain asynchrony (there is no need to synchronize its CUDA streams).
// Once the wait() function on the associated work object returns
// true, the caller can launch new CUDA kernels and they will be
// correctly sequenced.
//
std::vector<CUDAStream> streams;
std::vector<CUDAEvent> events;
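// Illustrative sketch of the ordering described above, in terms of
// the raw CUDA runtime API (hypothetical; the CUDAStream/CUDAEvent
// wrappers in CUDAUtils.hpp encapsulate the actual calls):
//
//   cudaEventRecord(copyDone, thcStream);            // after copying A -> B
//   cudaStreamWaitEvent(privateStream, copyDone, 0); // algorithm waits on copy
//   // ... run the algorithm on privateStream ...
//   cudaEventRecord(algoDone, privateStream);        // recorded by the work object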
// Used to synchronize between calling thread and worker threads.
std::mutex m;
std::condition_variable cv;
bool busy = false;
// The default constructor must be specified explicitly, since the
// deleted copy constructor below suppresses the implicit one.
AlgorithmEntry() = default;
// Must not be copyable.
// This is implied by the std::unique_ptr member field, but serves
// as documentation in case it ever is removed.
AlgorithmEntry& operator=(const AlgorithmEntry&) = delete;
AlgorithmEntry(const AlgorithmEntry&) = delete;
};
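// Illustrative sketch (hypothetical caller code; the real logic lives
// in ProcessGroupGloo.cpp): how the mutex, condition variable, and
// busy flag serialize use of a single cached entry.
//
//   AlgorithmEntry* entry = ...; // looked up in the cache by key
//   {
//     std::unique_lock<std::mutex> lock(entry->m);
//     entry->cv.wait(lock, [&] { return !entry->busy; });
//     entry->busy = true;  // claim the entry
//   }
//   entry->run();          // execute the cached Gloo algorithm
//   {
//     std::lock_guard<std::mutex> lock(entry->m);
//     entry->busy = false; // return the entry to the cache
//   }
//   entry->cv.notify_one();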
// ProcessGroupGloo implements Gloo bindings for c10d.
//
// All functions on this class are expected to be called in the same
// order across processes in the group. This is the only way that we
// can guarantee to match up the same calls across processes. For
// multi-threaded usage of process groups, consider using multiple
// process group instances.
//
// The Gloo algorithms that this class calls into are cached by their
// signature (see description of AlgorithmKey above). This cache works
// as follows: every function call instantiates an AlgorithmKey and
// looks in the cache for existing entries. If there is one, it is
// removed from the cache and returned to the caller. If there are
// none, a new entry is created and returned. If an entry was created
// before, but is still in use, the call will block and wait until the
// entry is returned to the cache.
//
// In the future, we hope to extend this to allow multiple entries per
// key, to enable parallelism for a single key. The number of entries
// per key must always be identical for all processes. This maximum
// number can be automatically tuned, but only if we let a single
// process take charge, and have it broadcast the limits.
//
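// Example usage (illustrative sketch; FileStore is one c10d::Store
// implementation, and its exact constructor signature here is an
// assumption):
//
//   auto store = std::make_shared<c10d::FileStore>("/tmp/c10d_store");
//   ProcessGroupGloo pg(store, rank, size);
//   std::vector<at::Tensor> tensors = {tensor};
//   auto work = pg.allreduce(tensors);
//   work->wait(); // returns once the allreduce result is visible
//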
class ProcessGroupGloo : public ProcessGroup {
public:
class WorkGloo : public ProcessGroup::Work {
public:
explicit WorkGloo();
virtual ~WorkGloo();
bool isCompleted() const override;
bool isSuccess() const override;
void synchronize() override;
bool wait() override;
const std::exception& exception() const override;
protected:
void finish(const AlgorithmEntry& entry);
void finishWithException(const ::gloo::Exception& ex);
std::mutex m_;
std::condition_variable cv_;
std::atomic<bool> completed_;
// Use a pointer to ::gloo::Exception because it doesn't have a
// default constructor, and constructing an empty std::unique_ptr
// is probably cheaper (this is highly speculative).
std::unique_ptr<::gloo::Exception> ex_;
// List of devices and events so that we can synchronize the
// streams of the caller with the kernels that were launched
// asynchronously to finish this operation.
//
// These events are private to a single work instance. An event
// captures the progress of a stream at a single point in time. If
// we were to use events stored on the algorithm entry, then
// multiple work instances might end up using the same events, and
// end up interfering with each other (causing unnecessary
// synchronization delays). Using events that are private to a
// single work instance avoids this. Ad hoc benchmarks showed that
// event construction is relatively cheap: creating 8 events takes
// 3 microseconds on a fast machine.
//
// Also see CUDA comment in AlgorithmEntry struct.
//
bool cuda_;
std::vector<int> devices_;
std::vector<CUDAEvent> events_;
friend class ProcessGroupGloo;
};
struct Options {
explicit Options();
std::vector<std::shared_ptr<::gloo::transport::Device>> devices;
std::chrono::milliseconds timeout;
int threads;
};
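// Illustrative sketch of populating Options (assumes Gloo's TCP
// transport is available; <gloo/transport/tcp/device.h> declares
// CreateDevice, and the interface name is hypothetical):
//
//   ::gloo::transport::tcp::attr attr;
//   attr.iface = "eth0";
//   ProcessGroupGloo::Options options;
//   options.devices.push_back(::gloo::transport::tcp::CreateDevice(attr));
//   options.timeout = std::chrono::seconds(30);
//   options.threads = 2;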
explicit ProcessGroupGloo(
const std::shared_ptr<Store>& store,
int rank,
int size,
Options options = Options());
virtual ~ProcessGroupGloo();
std::shared_ptr<Work> broadcast(
std::vector<at::Tensor>& data,
const BroadcastOptions& opts = BroadcastOptions()) override;
std::shared_ptr<Work> allreduce(
std::vector<at::Tensor>& tensors,
const AllreduceOptions& opts = AllreduceOptions()) override;
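// Illustrative sketch (the field names on the options structs are
// assumptions about Types.hpp):
//
//   BroadcastOptions opts;
//   opts.rootRank = 0; // rank that holds the source tensor
//   auto work = pg.broadcast(tensors, opts);
//   work->wait();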
protected:
using KeyType = AlgorithmKey;
using EntryType = std::unique_ptr<AlgorithmEntry>;
using HashType = torch::hash<AlgorithmKey>;
using WorkType = std::tuple<AlgorithmEntry*, std::shared_ptr<WorkGloo>>;
std::unique_ptr<::gloo::rendezvous::Store> store_;
std::vector<std::shared_ptr<::gloo::Context>> contexts_;
std::vector<std::thread> threads_;
bool stop_;
void runLoop();
void runSingle(WorkType work);
void createAlgorithm(AlgorithmEntry& entry);
template <typename T>
void createAllreduce(AlgorithmEntry& entry);
template <typename T>
void createBroadcast(AlgorithmEntry& entry);
// Creates a new AlgorithmEntry for the specified key.
EntryType construct(const KeyType& key);
// Returns the entry for the specified key, constructing it if it
// doesn't exist yet. If the entry is in use, this blocks until it is
// returned to the cache (see the class comment above).
AlgorithmEntry* checkout(const KeyType& key);
std::unordered_map<KeyType, EntryType, HashType> cache_;
std::shared_ptr<Work> enqueue(AlgorithmEntry* entry);
std::deque<WorkType> queue_;
std::mutex queueMutex_;
std::condition_variable queueProduceCV_;
std::condition_variable queueConsumeCV_;
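// Illustrative sketch of the producer/consumer loop these members
// imply (hypothetical; the real loop lives in ProcessGroupGloo.cpp):
//
//   std::unique_lock<std::mutex> lock(queueMutex_);
//   while (!stop_) {
//     if (queue_.empty()) {
//       queueProduceCV_.wait(lock);
//       continue;
//     }
//     auto work = std::move(queue_.front());
//     queue_.pop_front();
//     queueConsumeCV_.notify_one();
//     lock.unlock();
//     runSingle(std::move(work));
//     lock.lock();
//   }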
// Cached pointer to THCState, retrieved from ::at::globalContext().
THCState* thcState_;
};
} // namespace c10d