/* Copyright 2018 The TensorFlow Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#ifndef TENSORFLOW_CORE_FRAMEWORK_MODEL_H_
#define TENSORFLOW_CORE_FRAMEWORK_MODEL_H_
#include <list>
#include <memory>
#include <string>
// TODO(b/114492873): Move this include into core/platform.
#include <thread> // NOLINT
#include <utility>
#include <vector>
#include "tensorflow/core/framework/types.h"
#include "tensorflow/core/lib/gtl/cleanup.h"
#include "tensorflow/core/lib/gtl/map_util.h"
#include "tensorflow/core/lib/histogram/histogram.h"
#include "tensorflow/core/lib/random/random.h"
#include "tensorflow/core/platform/cpu_info.h"
#include "tensorflow/core/platform/env.h"
namespace tensorflow {
namespace data {
namespace model {
// A constant that can be used as a parameter value to enable autotuning.
constexpr int64 kAutotune = -1;
enum class AutotuneAlgorithm {
HILL_CLIMB = 0,
GRADIENT_DESCENT = 1,
};
// Represents thread-safe state that can be shared between an input pipeline and
// the performance model.
struct SharedState {
public:
SharedState(int64 value, std::shared_ptr<mutex> mu,
std::shared_ptr<condition_variable> cond_var)
: value(value),
mu(std::move(mu)),
cond_var(std::move(cond_var)),
tunable(value == kAutotune) {}
double value;
const std::shared_ptr<mutex> mu;
const std::shared_ptr<condition_variable> cond_var;
const bool tunable;
};
// Represents a parameter.
struct Parameter {
Parameter(const string& name, std::shared_ptr<SharedState> state, double min,
double max)
: name(name),
value(state->value),
min(min),
max(max),
state(std::move(state)) {}
// Human-readable name of the parameter.
const string name;
// Identifies the model value of the parameter. This can be different from
// the actual value (e.g. during optimization search).
double value;
// Identifies the minimum value of the parameter.
const double min;
// Identifies the maximum value of the parameter.
const double max;
// Shared state of the parameter.
std::shared_ptr<SharedState> state;
};
std::shared_ptr<Parameter> MakeParameter(const string& name,
std::shared_ptr<SharedState> state,
double min, double max);
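// Example (a hypothetical usage sketch; constructing the state with
// `kAutotune` marks the parameter as tunable):
//
//   auto mu = std::make_shared<mutex>();
//   auto cond_var = std::make_shared<condition_variable>();
//   auto state = std::make_shared<SharedState>(kAutotune, mu, cond_var);
//   auto parallelism = MakeParameter(
//       "parallelism", state, /*min=*/1,
//       /*max=*/port::NumSchedulableCPUs());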
// Abstract representation of a TensorFlow input pipeline node. It collects
// information about inputs to this node, processing time spent executing the
// node logic, number of elements produced by the node, various other
// information (e.g. batch size or execution parallelism).
//
// Developers of tf.data transformations are not expected to interact with
this class directly. Boilerplate code for creating the abstract
// representation of the input pipeline and collecting common information has
// been added to the implementation of `DatasetBase` and `DatasetBaseIterator`
// respectively.
//
// In addition, `DatasetBaseIterator` provides wrappers that can be used for
// transformation-specific information collection. The `SetMetadata` wrapper
// can be used to pass arbitrary metadata to the modeling framework, while the
// `StartWork` and `StopWork` wrappers should be used to correctly account for
processing time of multi-threaded transformations that yield the CPU; such
// transformations should invoke `StartWork()` when a transformation thread
// starts executing (e.g. when created or woken up) and `StopWork()` when a
// transformation thread stops executing (e.g. when returning or waiting).
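//
// For example, at the `Node` level the accounting pattern looks as follows
// (a sketch; `EnvTime::NowNanos()` is used here as the clock, which is an
// assumption about the caller, not a requirement of this class):
//
//   node->record_start(EnvTime::NowNanos());
//   // ... perform work ...
//   node->record_stop(EnvTime::NowNanos());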
class Node {
public:
// Arguments for `Node` constructor.
struct Args {
int64 id;
string name;
std::shared_ptr<Node> output;
};
using Factory = std::function<std::shared_ptr<Node>(Args)>;
explicit Node(Args args)
: id_(args.id), name_(args.name), output_(args.output.get()) {}
virtual ~Node() {}
// Increments the bytes buffered by the given delta.
void add_buffered_bytes(int64 delta) LOCKS_EXCLUDED(mu_) {
mutex_lock l(mu_);
buffered_bytes_ += delta;
}
// Adds an input.
void add_input(std::shared_ptr<Node> node) LOCKS_EXCLUDED(mu_) {
mutex_lock l(mu_);
inputs_.push_back(node);
}
// Increments the aggregate processing time by the given delta.
void add_processing_time(int64 delta) LOCKS_EXCLUDED(mu_) {
mutex_lock l(mu_);
processing_time_ += delta;
}
// Indicates whether autotuning is enabled for this node.
bool autotune() const LOCKS_EXCLUDED(mu_) {
tf_shared_lock l(mu_);
return autotune_;
}
// Returns the number of bytes stored in this node's buffer.
int64 buffered_bytes() const LOCKS_EXCLUDED(mu_) {
tf_shared_lock l(mu_);
return buffered_bytes_;
}
// Indicates whether the node has tunable parameters.
bool has_tunable_parameters() const LOCKS_EXCLUDED(mu_) {
tf_shared_lock l(mu_);
for (const auto& pair : parameters_) {
if (pair.second->state->tunable) return true;
}
return false;
}
// Returns the unique node ID.
int64 id() const LOCKS_EXCLUDED(mu_) { return id_; }
// Returns the node inputs.
std::list<std::shared_ptr<Node>> inputs() const LOCKS_EXCLUDED(mu_) {
tf_shared_lock l(mu_);
return inputs_;
}
// Returns a longer node name that is guaranteed to be unique.
string long_name() const { return strings::StrCat(name_, "(id:", id_, ")"); }
// Returns the node name.
const string& name() const { return name_; }
// Returns the number of elements produced by the node.
int64 num_elements() const LOCKS_EXCLUDED(mu_) {
tf_shared_lock l(mu_);
return num_elements_;
}
// Returns the node output.
Node* output() const { return output_; }
// Returns the aggregate processing time.
int64 processing_time() const LOCKS_EXCLUDED(mu_) {
tf_shared_lock l(mu_);
return processing_time_;
}
// Records that the node produced an element.
void record_element() LOCKS_EXCLUDED(mu_) {
mutex_lock l(mu_);
num_elements_++;
}
// Records that a node thread has started executing.
void record_start(int64 time_nanos) LOCKS_EXCLUDED(mu_) {
mutex_lock l(mu_);
work_start_[std::this_thread::get_id()] = time_nanos;
}
// Records that a node thread has stopped executing.
void record_stop(int64 time_nanos) LOCKS_EXCLUDED(mu_) {
mutex_lock l(mu_);
std::thread::id tid = std::this_thread::get_id();
auto iter = work_start_.find(tid);
if (iter != work_start_.end()) {
processing_time_ += time_nanos - iter->second;
work_start_.erase(iter);
} else {
VLOG(1)
<< "Encountered a stop event that was not preceded by a start event.";
}
}
// Removes an input.
void remove_input(std::shared_ptr<Node> input) LOCKS_EXCLUDED(mu_) {
mutex_lock l(mu_);
inputs_.remove(input);
}
// Sets the value that determines whether autotuning is enabled for this node.
void set_autotune(bool autotune) LOCKS_EXCLUDED(mu_) {
mutex_lock l(mu_);
autotune_ = autotune;
}
// Collects tunable parameters in the subtree rooted in this node.
void CollectTunableParameters(
std::map<string, std::shared_ptr<Parameter>>* parameters) const
LOCKS_EXCLUDED(mu_) {
tf_shared_lock l(mu_);
if (!autotune_) {
return;
}
for (auto& pair : parameters_) {
if (pair.second->state->tunable) {
parameters->insert(std::make_pair(long_name(), pair.second));
}
}
for (auto& input : inputs_) {
input->CollectTunableParameters(parameters);
}
}
// Returns a human-readable representation of this node.
string DebugString() const LOCKS_EXCLUDED(mu_) {
tf_shared_lock l(mu_);
string result;
strings::StrAppend(&result, long_name(), ":\n");
strings::StrAppend(&result, " autotune=", autotune_, "\n");
strings::StrAppend(&result, " buffered_bytes=", buffered_bytes_, "\n");
strings::StrAppend(&result, " processing_time=", processing_time_, "\n");
strings::StrAppend(&result, " num_elements=", num_elements_, "\n");
string inputs;
for (auto& input : inputs_) {
strings::StrAppend(&inputs, input->long_name(), ",");
}
strings::StrAppend(&result, " inputs={", inputs, "}\n");
for (auto& input : inputs_) {
strings::StrAppend(&result, input->DebugString());
}
return result;
}
// Returns the per-element output time for this node and if `gradient` is not
// `nullptr`, collects the gradient of the output time w.r.t. tunable
// parameters of the subtree rooted in this node and the last input time.
double OutputTime(std::vector<double>* input_times,
std::map<string, double>* gradient) const
LOCKS_EXCLUDED(mu_) {
tf_shared_lock l(mu_);
return OutputTimeLocked(input_times, gradient);
}
// Returns a copy of this node, making a deep copy of its inputs and a
// shallow copy of its tunable parameters.
//
// The purpose for this method is to allow the model optimization logic to
// operate over immutable state while allowing concurrent model updates.
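//
// A minimal usage sketch (assuming `root` is the model's output node; the
// optimizer works on the snapshot while pipeline threads keep updating the
// original):
//
//   std::shared_ptr<Node> snapshot = root->Snapshot(nullptr);
//   std::vector<double> input_times(1, 0);
//   double output_time = snapshot->OutputTime(&input_times, nullptr);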
std::shared_ptr<Node> Snapshot(std::shared_ptr<Node> output)
LOCKS_EXCLUDED(mu_) {
tf_shared_lock l(mu_);
std::shared_ptr<Node> result = Clone(output);
{
mutex_lock l2(result->mu_);
result->autotune_ = autotune_;
result->buffered_bytes_ = buffered_bytes_;
result->processing_time_ = processing_time_;
result->num_elements_ = num_elements_;
result->parameters_ = parameters_;
}
for (auto& input : inputs_) {
result->add_input(input->Snapshot(result));
}
return result;
}
// Returns the per-element processing time spent in this node.
double SelfProcessingTime() const LOCKS_EXCLUDED(mu_) {
tf_shared_lock l(mu_);
return SelfProcessingTimeLocked();
}
// Returns the per-element CPU time spent in the subtree rooted in this node.
// If `processing_times` is not `nullptr`, collects the per-element CPU time
// spent in each node of the subtree.
double TotalProcessingTime(std::map<string, double>* processing_times)
LOCKS_EXCLUDED(mu_) {
tf_shared_lock l(mu_);
return TotalProcessingTimeLocked(processing_times);
}
protected:
// Returns the number of inputs.
int64 num_inputs() const SHARED_LOCKS_REQUIRED(mu_) {
int64 num_inputs = 0;
for (auto& input : inputs_) {
// Inputs for which autotuning is disabled are excluded.
if (input->autotune()) {
++num_inputs;
}
}
return num_inputs;
}
// Creates a clone of this node.
virtual std::shared_ptr<Node> Clone(std::shared_ptr<Node> output) const
SHARED_LOCKS_REQUIRED(mu_) = 0;
// Returns the sum of per-element output time for the inputs of this node and
// if `gradient` is not `nullptr`, collects gradients of output times w.r.t.
// tunable parameters and the last input time.
double OutputTimeForInputs(std::vector<double>* input_times,
std::map<string, double>* gradient) const
SHARED_LOCKS_REQUIRED(mu_) {
double sum = 0;
for (auto& input : inputs_) {
// Inputs for which autotuning is disabled are excluded.
if (input->autotune()) {
sum += input->OutputTime(input_times, gradient);
}
}
return sum;
}
// Returns the per-element output time for this node and if `gradient` is not
// `nullptr`, collects the gradient of the output time w.r.t. tunable
// parameters of the subtree rooted in this node and the last input time.
virtual double OutputTimeLocked(std::vector<double>* input_times,
std::map<string, double>* gradient) const
SHARED_LOCKS_REQUIRED(mu_) = 0;
// Returns the sum of per-element processing time for the inputs of this node.
// The processing time for a given input is a weighted combination of the
// actually measured time and a statistic based on the history of input
// processing times; the weighting improves the accuracy of the estimate for
// newly created inputs, which have produced few elements.
// If `processing_times` is not `nullptr`, collects the per-element CPU time
// spent in each input node.
//
// Uniform distribution of per-element processing times across different
// inputs is assumed.
double TotalProcessingTimeForInputs(
std::map<string, double>* processing_times) SHARED_LOCKS_REQUIRED(mu_) {
// If the number of elements produced by an input is smaller than this
// constant, then its processing time is estimated using a weighted average
// of the empirical processing time and processing time history.
constexpr int kNumElementsThreshold = 30;
// Identifies the minimum number of input processing times to collect
// before the processing time history is used as a prior.
constexpr int kCountThreshold = 30;
double sum = 0;
for (auto& input : inputs_) {
// Inputs for which autotuning is disabled are excluded.
if (input->autotune()) {
double input_processing_time =
input->TotalProcessingTime(processing_times);
int64 num_elements = input->num_elements();
if (num_elements < kNumElementsThreshold) {
if (input_processing_time_count_ < kCountThreshold) {
sum += input_processing_time;
} else {
// The fewer elements the input has produced so far, the more weight
// is assigned to the prior to reduce volatility.
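// For example, an input that has produced 5 elements gets
// `prior_weight` = 1.0 / (2 << 5) = 1/64, so the prior contributes
// only ~1.6% of the estimate, while a brand-new input (0 elements)
// gets `prior_weight` = 1/2.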
double prior_weight = 1.0L / static_cast<double>(2 << num_elements);
double prior =
input_processing_time_sum_ / input_processing_time_count_;
sum += (1.0L - prior_weight) * input_processing_time +
prior_weight * prior;
}
} else {
sum += input_processing_time;
input_processing_time_count_++;
input_processing_time_sum_ += input_processing_time;
}
}
}
return sum;
}
// Returns the per-element processing time spent in this node.
double SelfProcessingTimeLocked() const SHARED_LOCKS_REQUIRED(mu_) {
if (num_elements_ == 0) {
return 0;
}
return static_cast<double>(processing_time_) /
static_cast<double>(num_elements_);
}
// Returns the per-element CPU time spent in the subtree rooted in this node.
// If `processing_times` is not `nullptr`, collects the per-element CPU time
// spent in each node of the subtree.
virtual double TotalProcessingTimeLocked(
std::map<string, double>* processing_times)
SHARED_LOCKS_REQUIRED(mu_) = 0;
mutable mutex mu_;
const int64 id_;
const string name_;
// Indicates whether the subtree rooted in this node should be included in
// autotuning. In particular, if this is `false`, then the subtree is excluded
// from computation of output time and processing time.
bool autotune_ GUARDED_BY(mu_) = true;
int64 buffered_bytes_ GUARDED_BY(mu_) = 0;
int64 processing_time_ GUARDED_BY(mu_) = 0;
int64 num_elements_ GUARDED_BY(mu_) = 0;
std::map<std::thread::id, int64> work_start_ GUARDED_BY(mu_);
std::map<string, std::shared_ptr<Parameter>> parameters_ GUARDED_BY(mu_);
// Statistic of inputs processing time history.
double input_processing_time_sum_ = 0.0L;
int64 input_processing_time_count_ = 0;
// Inputs of this node. These can represent an iterator created from the input
// dataset, but also other input iterators (e.g. created by the user-defined
// functions of `flat_map` or `interleave`).
std::list<std::shared_ptr<Node>> inputs_ GUARDED_BY(mu_);
// The reference to the output node is not owned so that deletion of a
// node results in recursive deletion of the subtree rooted in the node.
Node* const output_;
};
// InterleaveMany nodes model datasets whose inputs are used to create datasets
// whose elements are then interleaved.
std::shared_ptr<Node> MakeInterleaveManyNode(Node::Args args);
// AsyncInterleaveMany nodes are the asynchronous version of InterleaveMany
// nodes.
std::shared_ptr<Node> MakeAsyncInterleaveManyNode(
Node::Args args, std::vector<std::shared_ptr<Parameter>> parameters);
// KnownRatio nodes model datasets that synchronously consume a known number of
// input elements per output element.
std::shared_ptr<Node> MakeKnownRatioNode(Node::Args args, double ratio);
// AsyncKnownRatio nodes are the asynchronous version of KnownRatio nodes.
std::shared_ptr<Node> MakeAsyncKnownRatioNode(
Node::Args args, double ratio,
std::vector<std::shared_ptr<Parameter>> parameters);
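// Example (a hedged sketch): a `prefetch`-like transformation, which produces
// one output element per input element, might be modeled as follows (the
// `state` construction is shown in the `MakeParameter` example above):
//
//   auto node = MakeAsyncKnownRatioNode(
//       std::move(args), /*ratio=*/1,
//       {MakeParameter("buffer_size", state, /*min=*/1, /*max=*/16)});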
// Source nodes represent data sources.
std::shared_ptr<Node> MakeSourceNode(Node::Args args);
// UnknownRatio nodes represent datasets that synchronously consume an unknown
// number of input elements per output element.
//
// Unlike KnownRatio nodes, for which the ratio between inputs and outputs is
// specified as a parameter, UnknownRatio nodes estimate the ratio empirically.
std::shared_ptr<Node> MakeUnknownRatioNode(Node::Args args);
// Unknown nodes represent datasets for which we do not have a model. They act
// as a pass-through between inputs and output.
std::shared_ptr<Node> MakeUnknownNode(Node::Args args);
// Abstract representation of a TensorFlow input pipeline that can be used
// for collecting runtime information and optimizing performance. It collects
// runtime information about execution of the input pipeline that is used to
// create a performance model, which is in turn used to identify optimal values
// of tunable parameters.
//
// Developers of tf.data transformations are not expected to interact with this
class directly. Boilerplate code for creating the abstract representation of
// the input pipeline and collecting runtime information has been added to the
// implementation of `DatasetBase` and `DatasetBaseIterator` respectively.
class Model {
public:
using NodeHook = std::function<void(std::shared_ptr<Node>)>;
// Creates a new model.
//
// The `remove_node_hook` argument can be used to specify functionality that
// should be invoked before a node is removed from the model. The hook can be
// used for dependency injection -- to allow the model to invoke functionality
// from modules that it could not depend on statically.
Model(NodeHook remove_node_hook)
: collect_resource_usage_(false),
remove_node_hook_(std::move(remove_node_hook)) {
DCHECK(remove_node_hook_ != nullptr);
}
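// Example (a hypothetical hook; `LOG` is TensorFlow's logging macro):
//
//   Model model([](std::shared_ptr<Node> node) {
//     LOG(INFO) << "removing " << node->long_name();
//   });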
// Indicates whether to collect resource usage.
bool collect_resource_usage() const { return collect_resource_usage_; }
// Adds a node with the given name and given output.
std::shared_ptr<Node> AddNode(Node::Factory factory, const string& name,
const string& output_name) LOCKS_EXCLUDED(mu_);
// Increments the processing time for the given node.
void AddProcessingTime(const string& name, int64 delta) LOCKS_EXCLUDED(mu_);
// Uses the given algorithm to perform the autotuning optimization.
void Optimize(AutotuneAlgorithm algorithm, int64 cpu_budget)
LOCKS_EXCLUDED(mu_);
// Records that a node has produced an element.
void RecordElement(const string& name) LOCKS_EXCLUDED(mu_);
// Returns the number of elements that the input pipeline has produced.
int64 NumElements(const string& name) LOCKS_EXCLUDED(mu_);
// Records that the given node has started work. If `stop_output` is set, it
// also records that the output of the given node has stopped work.
void RecordStart(const string& name, bool stop_output) LOCKS_EXCLUDED(mu_);
// Records that the given node has stopped work. If `start_output` is set, it
// also records that the output of the given node has started work.
void RecordStop(const string& name, bool start_output) LOCKS_EXCLUDED(mu_);
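// Example (a sketch of the intended pairing, so that at any point in time
// exactly one node in a chain is accounted as working):
//
//   model->RecordStart(name, /*stop_output=*/true);
//   // ... the node produces an element ...
//   model->RecordStop(name, /*start_output=*/true);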
// Removes the given node.
void RemoveNode(const string& name) LOCKS_EXCLUDED(mu_);
private:
// Collects tunable parameters in the tree rooted in the given node, returning
// a mapping from a (unique) node name to a tunable parameter.
std::map<string, std::shared_ptr<Parameter>> CollectTunableParameters(
std::shared_ptr<Node> node);
// Collects "essential" parallelism parameters of transformations in the tree
// rooted in the given node. Which parameters are essential is determined by
// comparing the processing time spent in the corresponding transformation to
// that of other transformations. The collected parameters are returned
// as a mapping from a (unique) node name to a parallelism parameter.
std::map<string, std::shared_ptr<Parameter>> CollectEssentialParallelism(
std::shared_ptr<Node> node);
// This optimization algorithm starts by setting all tunable parallelism
// parameters to 1. It then repeatedly identifies the parameter whose increase
// in parallelism decreases the output time the most. This process is repeated
// until all parameters reach their maximum values or the projected output
// time is less than or equal to the processing time needed to produce an
// element divided by CPU budget.
void OptimizeHillClimb(int64 cpu_budget);
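//
// A sketch of one hill-climbing iteration (an assumed shape, not necessarily
// the exact implementation):
//
//   for each tunable parameter p:
//     tentatively set p.value += 1 and measure the decrease in OutputTime;
//     revert p.value;
//   commit the increment for the parameter with the largest decrease.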
// This optimization algorithm starts by setting all tunable parallelism
// parameters to the minimum value. It then improves current parameters by
// making a step in the direction opposite to the gradient of `OutputTime` and
// projecting the resulting values onto the feasible intervals. The improvement
// step is repeated until either the output time improvement is smaller than a
// threshold value or the output time is less than the processing time needed
// to produce an element divided by the CPU budget.
void OptimizeGradientDescent(int64 cpu_budget);
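//
// In symbols, each step performs (an assumed formalization of the comment
// above, with `lr` denoting the step size):
//
//   value = clamp(value - lr * d(OutputTime)/d(value), min, max)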
// Collects the output time and if `gradient` is not `nullptr`, the output
// time gradient w.r.t. tunable parameters of the subtree rooted in the given
// node and the last input time.
double OutputTime(std::shared_ptr<Node> node,
std::map<string, double>* gradient);
// Collects the per-element CPU time spent in the subtree rooted in the given
// node.
double TotalProcessingTime(std::shared_ptr<Node> node);
// Used for coordination between different input pipeline threads. Exclusive
// access is required only when adding or removing nodes. Concurrent access to
// existing nodes is protected by a node mutex.
mutex mu_;
int64 id_counter_ GUARDED_BY(mu_) = 1;
std::shared_ptr<Node> output_ GUARDED_BY(mu_);
std::map<string, std::shared_ptr<Node>> lookup_table_ GUARDED_BY(mu_);
// Indicates whether the modeling framework should collect resource usage
// (e.g. CPU, memory). The logic for collecting this information assumes that
// the collection is not repeatedly disabled and enabled. As a consequence,
// the implementation starts collecting resource usage when it encounters a
// tunable parameter (because the information is used for tuning the value
// of the parameter) and never stops.
std::atomic<bool> collect_resource_usage_;
// A hook invoked immediately before a node is removed from the model.
const NodeHook remove_node_hook_;
};
} // namespace model
} // namespace data
} // namespace tensorflow
#endif // TENSORFLOW_CORE_FRAMEWORK_MODEL_H_