blob: 84e2f6ae19224a9fa4984a1a077be3c3c30f68e1 [file] [log] [blame]
/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#ifndef TENSORFLOW_FRAMEWORK_RENDEZVOUS_H_
#define TENSORFLOW_FRAMEWORK_RENDEZVOUS_H_
#include <string>
#include "tensorflow/core/framework/cancellation.h"
#include "tensorflow/core/framework/control_flow.h"
#include "tensorflow/core/framework/device_base.h"
#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/lib/core/refcount.h"
#include "tensorflow/core/lib/core/status.h"
#include "tensorflow/core/util/device_name_utils.h"
namespace tensorflow {
// A Rendezvous is an abstraction for passing tensors from producers
// to consumers. A rendezvous is a table of channels. Each channel is
// keyed by a rendezvous key. The key encodes a pair of <producer,
// consumer>, where the producer and the consumer are tensorflow
// devices.
//
// The producer calls the Send() method to send one tensor over one
// named channel. The consumer calls the Recv() method to receive one
// tensor from a named channel. A sequence of tensors can be passed
// from the producer to the consumer; the consumer receives them in
// the same order in which the producer sent them.
//
// A consumer may safely request the tensor before or after it has
// been produced. A consumer has the choice of making a blocking call
// or providing a callback: in either case, the consumer receives the
// Tensor as soon as it is available. A producer never blocks.
class Rendezvous : public core::RefCounted {
 public:
  // Per-call options. The sender supplies one to Send() and the receiver
  // supplies one to RecvAsync()/Recv(); the DoneCallback is later handed
  // both of them.
  struct Args {
    DeviceContext* device_context = nullptr;
    AllocatorAttributes alloc_attrs;
    CancellationManager* cancellation_manager = nullptr;  // not owned.
  };

  // Builds the rendezvous key that names tensor `name` travelling from
  // `src_device` (incarnation `src_incarnation`) to `dst_device`, where the
  // tensor is produced in the frame/iteration described by `frame_iter`.
  // The resulting string can be decoded again with ParseKey().
  static string CreateKey(const string& src_device, uint64 src_incarnation,
                          const string& dst_device, const string& name,
                          const FrameAndIter& frame_iter);

  // Decoded view of a key produced by CreateKey(): the raw source and
  // destination device strings, their parsed device names, the source
  // incarnation and the edge name.
  // NOTE(review): the StringPiece fields presumably refer into the private
  // buf_, which would explain why copying is funnelled through the
  // out-of-line operator= -- confirm against the .cc implementation.
  struct ParsedKey {
    StringPiece src_device;
    DeviceNameUtils::ParsedName src;
    uint64 src_incarnation = 0;
    StringPiece dst_device;
    DeviceNameUtils::ParsedName dst;
    StringPiece edge_name;

    ParsedKey() {}
    // Copy construction reuses the user-defined copy assignment below.
    ParsedKey(const ParsedKey& other) { *this = other; }
    ParsedKey& operator=(const ParsedKey& other);

    // The complete key text held by this object.
    StringPiece FullKey() const { return buf_; }

   private:
    friend class Rendezvous;
    friend class SendOp;
    friend class RecvOp;

    string buf_;
  };

  // Decodes `key` (as built by CreateKey()) into `*out`, including parsing
  // the source and destination device names into their structured form.
  // Problems are reported through the returned Status.
  static Status ParseKey(StringPiece key, ParsedKey* out);

  // Producer side: publishes the message {val, is_dead} under `key`.
  //
  // `is_dead` is usually set by control-flow nodes, for instance on a branch
  // that was not taken. `args` travels from Send to the matching receive so
  // the receiver can use whatever information the sender provided; this is
  // normally only needed when Send and Recv run on the same worker.
  //
  // Send() never blocks.
  virtual Status Send(const ParsedKey& key, const Args& args, const Tensor& val,
                      const bool is_dead) = 0;

  // Completion callback registered by a consumer. It is invoked when the
  // tensor becomes available, or when a non-OK status arises while producing
  // it. Alongside the status, tensor and dead flag it receives two
  // Rendezvous::Args -- one from the sender and one from the receiver --
  // which either side may need when a non-CPU device is involved.
  using DoneCallback = std::function<void(const Status&, const Args&,
                                          const Args&, const Tensor&,
                                          const bool)>;

  // Consumer side: arranges for `done` to be called with the message
  // published under `key`.
  virtual void RecvAsync(const ParsedKey& key, const Args& args,
                         DoneCallback done) = 0;

  // Blocking counterparts of RecvAsync(). The output pointers presumably
  // receive the tensor and its dead flag.
  // NOTE(review): the exact meaning of `timeout_ms` (e.g. how non-positive
  // values are treated) lives in the implementation -- confirm there.
  Status Recv(const ParsedKey& key, const Args& args, Tensor* val,
              bool* is_dead, int64 timeout_ms);
  Status Recv(const ParsedKey& key, const Args& args, Tensor* val,
              bool* is_dead);

  // Fails every pending and every future Send/Recv with `status`.
  // Returns without waiting for calls already in progress to finish.
  // REQUIRES: !status.ok()
  virtual void StartAbort(const Status& status) = 0;

 protected:
  // Protected: instances are not meant to be deleted directly by users.
  ~Rendezvous() override;
};
// Returns a Rendezvous instance that is limited to use only by
// producers and consumers in the local process. The caller assumes
// ownership of one Ref() on the returned object.
Rendezvous* NewLocalRendezvous();
} // end namespace tensorflow
#endif // TENSORFLOW_FRAMEWORK_RENDEZVOUS_H_