/* Copyright 2018 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#ifndef TENSORFLOW_CORE_COMMON_RUNTIME_BASE_COLLECTIVE_EXECUTOR_H_
#define TENSORFLOW_CORE_COMMON_RUNTIME_BASE_COLLECTIVE_EXECUTOR_H_

#include <memory>
#include <string>

#include "tensorflow/core/common_runtime/buf_rendezvous.h"
#include "tensorflow/core/framework/collective.h"
#include "tensorflow/core/framework/device_attributes.pb.h"

namespace tensorflow {
class CollectiveImplementation;
class DeviceMgr;
class Device;
// Helper interface that aliases regular subfields of a Tensor as separate
// Tensors for in-place update.
class CollectiveAdapter {
 public:
  virtual ~CollectiveAdapter() {}

  // Move the backing tensor to 'output' with its original storage and
  // shape. After this call, this CollectiveAdapter object should be
  // deleted immediately, without calling any of its other methods.
  virtual void ConsumeFinalValue(Tensor* output) = 0;

  // Const access to the entire intermediate value, for debugging.
  virtual const Tensor& Value() const = 0;

  // Returns a tensor for chunk i that aliases the backing buffer.
  virtual Tensor ChunkAlias(int i) = 0;

  // Returns a tensor allocated on the same device but with its own
  // separate backing buffer. It will have the same type and size as
  // chunk i.
  virtual Tensor TempChunk(int i) const = 0;

  // Returns the number of bytes in chunk i.
  virtual int64 ChunkBytes(int i) const = 0;

  // Generate a CPU RAM scalar tensor of the same DataType as the
  // backing tensor with the given integer value.
  virtual Tensor Scalar(int v) const = 0;

  // Generate a scalar tensor of the same DataType and on the same
  // device as the backing tensor.
  virtual Tensor Scalar(Allocator* a,
                        const AllocationAttributes& attr) const = 0;

  // Debugging string describing buffer location.
  virtual string TBounds(const Tensor& t) const = 0;

  virtual string DebugString() const = 0;

  // Computes the number of elements per alias chunk tensor.
  //
  // A CHECK in tensor.cc expects that the memory buffer backing a
  // Tensor will be aligned according to EIGEN_MAX_ALIGN_BYTES. To
  // ensure that all chunk aliasing Tensors maintain this alignment we
  // need to pick a chunk size that preserves it. Note that in extreme
  // cases (impractical, but possible with very small tensors) one or
  // more tail chunks can end up empty.
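  //
  // For illustration (assuming EIGEN_MAX_ALIGN_BYTES == 64): with
  // elt_bytes == 4, each chunk must span a multiple of 64 / 4 == 16
  // elements to keep every chunk boundary aligned. Splitting
  // total_elts == 50 into num_chunks == 3 starts from
  // ceil(50 / 3) == 17 elements per chunk, which rounds up to 32, so
  // the three chunks cover 32, 18, and 0 elements -- the last chunk
  // is empty.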
  static int64 AlignedChunkElts(int64 elt_bytes, int64 total_elts,
                                int64 num_chunks);
};

// Create a CollectiveAdapter wrapping 'output', specialized to its
// data-type and shape. If align_chunks == true then chunk size may
// be larger than output->NumElements() / num_chunks and one or more
// of the suffix chunks may be empty. Chunks will be arranged to start
// and end on alignment boundaries. If align_chunks == false then
// output->NumElements() % num_chunks must be 0 and all chunks will
// have exactly the same size, ignoring alignment issues.
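//
// A minimal usage sketch (hypothetical per-chunk reduction loop, e.g.
// one pass of a ring algorithm; `output`, `num_chunks` and `allocator`
// stand in for the caller's values):
//
//   std::unique_ptr<CollectiveAdapter> adapter(
//       MakeCollectiveAdapter(&output, num_chunks, allocator));
//   for (int i = 0; i < num_chunks; ++i) {
//     Tensor chunk = adapter->ChunkAlias(i);   // aliases output's buffer
//     if (chunk.NumElements() == 0) continue;  // possible empty tail chunk
//     // ... exchange and reduce `chunk` with peers, in place ...
//   }
//   adapter->ConsumeFinalValue(&output);  // reclaim the backing tensor,
//   adapter.reset();                      // then delete the adapter.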
CollectiveAdapter* MakeCollectiveAdapter(Tensor* output, int num_chunks,
                                         Allocator* allocator,
                                         bool align_chunks = true);

// Default implementation of CollectiveExecutor. Delegates the actual
// work of moving data to a class specialized for the operation type,
// arguments and device+interconnect topology.
class BaseCollectiveExecutor : public CollectiveExecutor {
 public:
  BaseCollectiveExecutor(CollectiveExecutorMgrInterface* cem,
                         PerStepCollectiveRemoteAccess* remote_access,
                         int64 step_id, const DeviceMgr* dev_mgr,
                         const string* gpu_ring_order)
      : CollectiveExecutor(cem),
        step_id_(step_id),
        dev_mgr_(dev_mgr),
        remote_access_(remote_access),
        gpu_ring_order_(gpu_ring_order) {}

  ~BaseCollectiveExecutor() override;

  void StartAbort(const Status& s) override;

  void ExecuteAsync(OpKernelContext* ctx, const CollectiveParams& col_params,
                    const string& exec_key, StatusCallback done) override;

  void CompleteParamsAsync(const string& device, CollectiveParams* cp,
                           CancellationManager* cancel_mgr,
                           StatusCallback done) override;

  PerStepCollectiveRemoteAccess* remote_access() override {
    return remote_access_.get();
  }

  void RecvFromPeer(const string& peer_device, const string& peer_task,
                    bool peer_is_local, const string& key, Device* to_device,
                    DeviceContext* to_device_ctx,
                    const AllocatorAttributes& to_alloc_attr, Tensor* to_tensor,
                    const DeviceLocality& client_locality, int stream_index,
                    const StatusCallback& done) override {
    remote_access_->RecvFromPeer(
        peer_device, peer_task, peer_is_local, key, to_device, to_device_ctx,
        to_alloc_attr, to_tensor, client_locality, stream_index, done);
  }

  void PostToPeer(const string& peer_device, const string& peer_task,
                  const string& key, Device* from_device,
                  DeviceContext* from_device_ctx,
                  const AllocatorAttributes& from_alloc_attr,
                  const Tensor* from_tensor,
                  const DeviceLocality& client_locality,
                  const StatusCallback& done) override {
    remote_access_->PostToPeer(peer_device, peer_task, key, from_device,
                               from_device_ctx, from_alloc_attr, from_tensor,
                               client_locality, done);
  }

  // If an ordering must be enforced on any portion of a collective
  // implementation, and that ordering is encoded via an attribute on
  // the collective op, this function blocks until all dependencies for
  // this collective have completed.
  void WaitForDependencies(const CollectiveParams& col_params) override;
  // Records that this collective has completed the portion of its
  // implementation that must be ordered with respect to other
  // collectives, unblocking any dependent ops.
  void Launched(const CollectiveParams& col_params) override;
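
  // A sketch of the intended call pattern (hypothetical; LaunchOrderedOp
  // stands in for whatever part of a collective implementation must be
  // ordered, e.g. a NCCL kernel launch):
  //
  //   col_exec->WaitForDependencies(col_params);  // block until deps launch
  //   LaunchOrderedOp(...);                       // the ordered portion
  //   col_exec->Launched(col_params);             // unblock dependent ops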

 protected:
  const int64 step_id_;
  const DeviceMgr* dev_mgr_;  // Not owned.
  std::unique_ptr<PerStepCollectiveRemoteAccess> remote_access_;
  const string* gpu_ring_order_;  // Not owned.
  mutex launch_mu_;
  condition_variable launch_cv_;
  // collective instance key -> number of local devices for which NCCL ops
  // have been launched.
  std::unordered_map<int32, int32> launched_ GUARDED_BY(launch_mu_);

 private:
  Status CreateCollective(const CollectiveParams& col_params,
                          CollectiveImplementationInterface** col_impl);
  // Checks whether all ops on which this collective depends have launched.
  bool CheckDependencies(const CollectiveParams& col_params)
      EXCLUSIVE_LOCKS_REQUIRED(launch_mu_);
};

}  // namespace tensorflow
#endif  // TENSORFLOW_CORE_COMMON_RUNTIME_BASE_COLLECTIVE_EXECUTOR_H_