| /* Copyright 2018 The TensorFlow Authors. All Rights Reserved. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| ==============================================================================*/ |
| |
| #include "tensorflow/core/framework/model.h" |
| |
| #include <memory> |
| |
| #include "absl/time/clock.h" |
| #include "tensorflow/core/lib/strings/str_util.h" |
| |
| namespace tensorflow { |
| namespace data { |
| namespace model { |
| namespace { |
| |
// Name of the entry in the gradient map produced by `OutputTime` that holds
// the derivative w.r.t. the last input time.
constexpr char kInputTimeDerivativeKey[]{"last_input_time"};
| |
// Returns `x * x`; exists only to keep the derivative formulas readable.
inline double Square(double x) { return x * x; }

// Estimates how long an input event is expected to wait, given the average
// time between output events (`output_time`), the average time between input
// events (`input_time`) and the size of the buffer between them
// (`buffer_size`).
//
// The estimate is the probability that the buffer is empty multiplied by the
// time it takes to produce one element into the buffer. The probability is
// obtained by modeling the buffer as an M/M/1/K queue
// (https://en.wikipedia.org/wiki/Birth%E2%80%93death_process#M/M/1/K_queue).
//
// Each of `output_time_derivative`, `input_time_derivative` and
// `buffer_size_derivative` that is not `nullptr` receives the partial
// derivative of the returned value w.r.t. `output_time`, `input_time` and
// `buffer_size` respectively.
double ComputeWaitTime(double output_time, double input_time,
                       double buffer_size, double* output_time_derivative,
                       double* input_time_derivative,
                       double* buffer_size_derivative) {
  // Writes the three partial derivatives through whichever out-pointers the
  // caller supplied.
  const auto store_derivatives = [&](double d_output_time,
                                     double d_input_time,
                                     double d_buffer_size) {
    if (output_time_derivative != nullptr) {
      *output_time_derivative = d_output_time;
    }
    if (input_time_derivative != nullptr) {
      *input_time_derivative = d_input_time;
    }
    if (buffer_size_derivative != nullptr) {
      *buffer_size_derivative = d_buffer_size;
    }
  };

  // Case 0: the producer or the consumer is infinitely fast, so the wait time
  // equals the time needed to produce an output.
  if (output_time == 0 || input_time == 0) {
    store_derivatives(1.0L, 0.0L, 0.0L);
    return output_time;
  }

  // Case 1: the consumer is slower than the producer. In the long run the
  // buffer is full, hence there is nothing to wait for.
  if (input_time > output_time) {
    store_derivatives(0.0L, 0.0L, 0.0L);
    return 0;
  }

  // Case 2: both sides are equally fast. The probability of an empty buffer
  // is 1 / (buffer_size + 1).
  if (input_time == output_time) {
    const double empty_probability = 1.0L / (buffer_size + 1.0L);
    const double empty_probability_der =
        -1.0L / Square(buffer_size + 1.0L);
    store_derivatives(empty_probability, 0.0L,
                      empty_probability_der * output_time);
    return empty_probability * output_time;
  }

  // Case 3: the producer is slower than the consumer and neither of them is
  // infinitely fast.
  const double consumer_rate = 1.0L / input_time;
  const double producer_rate = 1.0L / output_time;
  const double rate_ratio_pow =
      std::pow((producer_rate / consumer_rate), (buffer_size + 1.0L));
  const double empty_probability =
      (1.0L - producer_rate / consumer_rate) / (1.0L - rate_ratio_pow);

  // The derivatives are only evaluated on request since they are
  // comparatively expensive.
  if (output_time_derivative != nullptr) {
    *output_time_derivative =
        (1.0L - rate_ratio_pow -
         (output_time - input_time) * (buffer_size + 1.0L) * rate_ratio_pow /
             output_time) /
        Square(1.0L - rate_ratio_pow);
  }
  if (input_time_derivative != nullptr) {
    *input_time_derivative =
        (rate_ratio_pow - 1.0L +
         (buffer_size + 1.0L) * rate_ratio_pow *
             (consumer_rate / producer_rate - 1.0L)) /
        Square(1.0L - rate_ratio_pow);
  }
  if (buffer_size_derivative != nullptr) {
    const double empty_probability_der =
        (1.0L - producer_rate / consumer_rate) * rate_ratio_pow *
        std::log(producer_rate / consumer_rate) /
        Square(1.0L - rate_ratio_pow);
    *buffer_size_derivative = empty_probability_der * output_time;
  }

  return empty_probability * output_time;
}
| |
| // The first input of InterleaveMany corresponds to the input dataset whose |
| // elements are used to create the (derived) input datasets whose elements are |
| // interleaved as output. |
| // |
| // TODO(jsimsa): model the first input |
| class InterleaveMany : public Node { |
| public: |
| using Node::Node; |
| |
| virtual ~InterleaveMany() {} |
| |
| protected: |
| std::shared_ptr<Node> Clone(std::shared_ptr<Node> output) const override |
| SHARED_LOCKS_REQUIRED(mu_) { |
| return std::make_shared<InterleaveMany>( |
| Args{id_, name_, std::move(output)}); |
| } |
| |
| // The output time is the sum of the self processing time and the average |
| // output time of inputs comprising the interleave "cycle". |
| double OutputTimeLocked(std::vector<double>* input_times, |
| std::map<string, double>* gradient) const override |
| SHARED_LOCKS_REQUIRED(mu_) { |
| if (num_inputs() <= 1) { |
| return SelfProcessingTimeLocked(); |
| } |
| double delta = SelfProcessingTimeLocked() * (num_inputs() - 1); |
| input_times->back() += delta; |
| auto cleanup = gtl::MakeCleanup( |
| [input_times, delta]() { input_times->back() -= delta; }); |
| double output_time; |
| if (gradient) { |
| std::map<string, double> inputs_gradient; |
| output_time = |
| (OutputTimeForInputs(input_times, &inputs_gradient) - |
| inputs_.front()->OutputTime(input_times, /*gradient=*/nullptr)) / |
| static_cast<double>(num_inputs() - 1); |
| for (auto& pair : inputs_gradient) { |
| (*gradient)[pair.first] = |
| pair.second / static_cast<double>(num_inputs() - 1); |
| } |
| auto last_input_time_der = |
| gtl::FindWithDefault(*gradient, kInputTimeDerivativeKey, 0.0L); |
| (*gradient)[kInputTimeDerivativeKey] = |
| last_input_time_der + inputs_gradient[kInputTimeDerivativeKey] / |
| static_cast<double>(num_inputs() - 1); |
| // Set derivatives w.r.t. tunable parameters of the subtree rooted in the |
| // first input equal to 0 since its output time is excluded from |
| // computations. |
| std::map<string, std::shared_ptr<Parameter>> first_input_parameters; |
| inputs_.front()->CollectTunableParameters(&first_input_parameters); |
| for (auto& pair : first_input_parameters) { |
| (*gradient)[pair.first] = 0.0L; |
| } |
| } else { |
| output_time = |
| (OutputTimeForInputs(input_times, /*gradient=*/nullptr) - |
| inputs_.front()->OutputTime(input_times, /*gradient=*/nullptr)) / |
| static_cast<double>(num_inputs() - 1); |
| } |
| return SelfProcessingTimeLocked() + output_time; |
| } |
| |
| // The processing time is the sum of the self processing time and the average |
| // processing time of inputs comprising the interleave "cycle". |
| double TotalProcessingTimeLocked(std::map<string, double>* processing_times) |
| override SHARED_LOCKS_REQUIRED(mu_) { |
| double self_processing_time = SelfProcessingTimeLocked(); |
| if (processing_times) { |
| (*processing_times)[long_name()] = self_processing_time; |
| } |
| if (num_inputs() <= 1) { |
| return self_processing_time; |
| } |
| double processing_time = |
| (TotalProcessingTimeForInputs(processing_times) - |
| inputs_.front()->TotalProcessingTime(/*processing_times=*/nullptr)) / |
| static_cast<double>(num_inputs() - 1); |
| return self_processing_time + processing_time; |
| } |
| }; |
| |
| // The first input of AsyncInterleaveMany corresponds to the input dataset whose |
| // elements are used to create the (derived) input datasets whose elements are |
| // interleaved as output. |
| // |
| // TODO(jsimsa): model the first input |
| class AsyncInterleaveMany : public Node { |
| public: |
| AsyncInterleaveMany(Node::Args args, |
| std::vector<std::shared_ptr<Parameter>> parameters) |
| : Node(args) { |
| for (auto& parameter : parameters) { |
| parameters_[parameter->name] = std::move(parameter); |
| } |
| } |
| |
| virtual ~AsyncInterleaveMany() {} |
| |
| protected: |
| std::shared_ptr<Node> Clone(std::shared_ptr<Node> output) const override |
| SHARED_LOCKS_REQUIRED(mu_) { |
| std::vector<std::shared_ptr<Parameter>> parameters; |
| for (auto& pair : parameters_) { |
| parameters.push_back(pair.second); |
| } |
| return std::make_shared<AsyncInterleaveMany>( |
| Args{id_, name_, std::move(output)}, parameters); |
| } |
| |
| // The output time is estimated using `ComputeWaitTime(output_time, |
| // input_time, parallelism, ...)`, where `output_time` is the sum of the |
| // self-processing time and the average output time of inputs comprising the |
| // interleave "cycle", `input_time` is specified through `input_times` and |
| // `buffer_size` is derived from parallelism. |
| double OutputTimeLocked(std::vector<double>* input_times, |
| std::map<string, double>* gradient) const override |
| SHARED_LOCKS_REQUIRED(mu_) { |
| if (num_inputs() <= 1) { |
| return SelfProcessingTimeLocked(); |
| } |
| double old_input_time = input_times->back(); |
| double new_input_time = |
| SelfProcessingTimeLocked() * static_cast<double>(num_inputs() - 1); |
| input_times->push_back(new_input_time); |
| auto cleanup = |
| gtl::MakeCleanup([input_times]() { input_times->pop_back(); }); |
| double parallelism = num_inputs() - 1; // default to cycle length |
| auto* parameter = gtl::FindOrNull(parameters_, "parallelism"); |
| if (parameter) { |
| parallelism = std::min(parallelism, (*parameter)->value); |
| } |
| if (gradient) { |
| std::map<string, double> inputs_gradient; |
| double output_time_for_inputs = |
| OutputTimeForInputs(input_times, &inputs_gradient) - |
| inputs_.front()->OutputTime(input_times, /*gradient=*/nullptr); |
| double output_time = output_time_for_inputs / |
| static_cast<double>(num_inputs() - 1) / parallelism; |
| double output_time_der = 0.0L; |
| double input_time_der = 0.0L; |
| double buffer_size_der = 0.0L; |
| double result = ComputeWaitTime( |
| SelfProcessingTimeLocked() + output_time, old_input_time, parallelism, |
| &output_time_der, &input_time_der, &buffer_size_der); |
| auto last_input_time_der = |
| gtl::FindWithDefault(*gradient, kInputTimeDerivativeKey, 0.0L); |
| (*gradient)[kInputTimeDerivativeKey] = |
| last_input_time_der + input_time_der; |
| double parallelism_der = -output_time_for_inputs / |
| static_cast<double>(num_inputs() - 1) / |
| Square(parallelism); |
| for (auto& pair : inputs_gradient) { |
| if (pair.first != kInputTimeDerivativeKey) { |
| (*gradient)[pair.first] = output_time_der * pair.second / |
| static_cast<double>(num_inputs() - 1) / |
| parallelism; |
| } |
| } |
| // Set derivatives w.r.t. tunable parameters of the subtree rooted in the |
| // first input equal to 0 since its output time is excluded from |
| // computations. |
| std::map<string, std::shared_ptr<Parameter>> first_input_parameters; |
| inputs_.front()->CollectTunableParameters(&first_input_parameters); |
| for (auto& pair : first_input_parameters) { |
| (*gradient)[pair.first] = 0.0L; |
| } |
| // Add derivative w.r.t. own parallelism parameter. |
| if (parameter && (*parameter)->state->tunable) { |
| (*gradient)[long_name()] = |
| output_time_der * parallelism_der + buffer_size_der; |
| } |
| return result; |
| } |
| double output_time = |
| (OutputTimeForInputs(input_times, /*gradient=*/nullptr) - |
| inputs_.front()->OutputTime(input_times, /*gradient=*/nullptr)) / |
| static_cast<double>(num_inputs() - 1) / parallelism; |
| return ComputeWaitTime( |
| SelfProcessingTimeLocked() + output_time, old_input_time, parallelism, |
| /*output_time_derivative=*/nullptr, |
| /*input_time_derivative=*/nullptr, /*buffer_size_derivative=*/nullptr); |
| } |
| |
| // The processing time is the sum of the self processing time and the average |
| // processing time of inputs comprising the interleave "cycle". |
| double TotalProcessingTimeLocked(std::map<string, double>* processing_times) |
| override SHARED_LOCKS_REQUIRED(mu_) { |
| double self_processing_time = SelfProcessingTimeLocked(); |
| if (processing_times) { |
| (*processing_times)[long_name()] = self_processing_time; |
| } |
| if (num_inputs() <= 1) { |
| return self_processing_time; |
| } |
| double processing_time = |
| TotalProcessingTimeForInputs(processing_times) - |
| inputs_.front()->TotalProcessingTime(/*processing_times=*/nullptr); |
| return self_processing_time + |
| processing_time / static_cast<double>(num_inputs() - 1); |
| } |
| }; |
| |
| class KnownRatio : public Node { |
| public: |
| KnownRatio(Node::Args args, int64 ratio) : Node(args), ratio_(ratio) {} |
| |
| virtual ~KnownRatio() {} |
| |
| protected: |
| std::shared_ptr<Node> Clone(std::shared_ptr<Node> output) const override |
| SHARED_LOCKS_REQUIRED(mu_) { |
| return std::make_shared<KnownRatio>(Args{id_, name_, std::move(output)}, |
| ratio_); |
| } |
| |
| // The output time is the sum of the self processing time and the product of |
| // `ratio_` and the sum of output times of inputs. |
| double OutputTimeLocked(std::vector<double>* input_times, |
| std::map<string, double>* gradient) const override |
| SHARED_LOCKS_REQUIRED(mu_) { |
| if (ratio_ == 0) { |
| return SelfProcessingTimeLocked(); |
| } |
| double old_input_time = input_times->back(); |
| input_times->back() += |
| (old_input_time + SelfProcessingTimeLocked()) / ratio_; |
| auto cleanup = gtl::MakeCleanup([input_times, old_input_time]() { |
| input_times->back() = old_input_time; |
| }); |
| double result; |
| if (gradient) { |
| std::map<string, double> inputs_gradient; |
| result = SelfProcessingTimeLocked() + |
| ratio_ * OutputTimeForInputs(input_times, &inputs_gradient); |
| auto last_input_time_der = |
| gtl::FindWithDefault(*gradient, kInputTimeDerivativeKey, 0.0L); |
| (*gradient)[kInputTimeDerivativeKey] = |
| last_input_time_der + ratio_ * |
| inputs_gradient[kInputTimeDerivativeKey] * |
| (1.0L + 1.0L / ratio_); |
| for (auto& pair : inputs_gradient) { |
| if (pair.first != kInputTimeDerivativeKey) { |
| (*gradient)[pair.first] = pair.second * ratio_; |
| } |
| } |
| } else { |
| result = SelfProcessingTimeLocked() + |
| ratio_ * OutputTimeForInputs(input_times, /*gradient=*/nullptr); |
| } |
| return result; |
| } |
| |
| // The processing time is the sum of the self processing time and the product |
| // of `ratio_` and the sum of processing times of inputs. |
| double TotalProcessingTimeLocked(std::map<string, double>* processing_times) |
| override SHARED_LOCKS_REQUIRED(mu_) { |
| double self_processing_time = SelfProcessingTimeLocked(); |
| if (processing_times) { |
| (*processing_times)[long_name()] = self_processing_time; |
| } |
| return self_processing_time + |
| ratio_ * TotalProcessingTimeForInputs(processing_times); |
| } |
| |
| private: |
| const double ratio_; |
| }; |
| |
| class AsyncKnownRatio : public Node { |
| public: |
| AsyncKnownRatio(Node::Args args, double ratio, |
| std::vector<std::shared_ptr<Parameter>> parameters) |
| : Node(args), ratio_(ratio) { |
| for (auto& parameter : parameters) { |
| parameters_[parameter->name] = std::move(parameter); |
| } |
| } |
| |
| virtual ~AsyncKnownRatio() {} |
| |
| protected: |
| std::shared_ptr<Node> Clone(std::shared_ptr<Node> output) const override |
| SHARED_LOCKS_REQUIRED(mu_) { |
| std::vector<std::shared_ptr<Parameter>> parameters; |
| for (auto& pair : parameters_) { |
| parameters.push_back(pair.second); |
| } |
| return std::make_shared<AsyncKnownRatio>( |
| Args{id_, name_, std::move(output)}, ratio_, parameters); |
| } |
| |
| // The output time is estimated using `ComputeWaitTime(output_time, |
| // input_time, parallelism, ...)`, where `output_time` is the sum of the self |
| // processing time and the product of `ratio_` and the sum of output times of |
| // inputs, `input_time` is specified through `input_times` and `buffer_size` |
| // is derived from parallelism. |
| double OutputTimeLocked(std::vector<double>* input_times, |
| std::map<string, double>* gradient) const override |
| SHARED_LOCKS_REQUIRED(mu_) { |
| double parallelism = 1.0; |
| auto* parameter = gtl::FindOrNull(parameters_, "parallelism"); |
| if (parameter) { |
| parallelism = (*parameter)->value; |
| } |
| double self_processing_time = SelfProcessingTimeLocked(); |
| if (ratio_ == 0.0) { |
| double output_time = self_processing_time / parallelism; |
| if (gradient) { |
| double output_time_der = 0.0L; |
| double input_time_der = 0.0L; |
| double buffer_size_der = 0.0L; |
| double result = ComputeWaitTime(output_time, input_times->back(), |
| parallelism, &output_time_der, |
| &input_time_der, &buffer_size_der); |
| auto last_input_time_der = |
| gtl::FindWithDefault(*gradient, kInputTimeDerivativeKey, 0.0L); |
| (*gradient)[kInputTimeDerivativeKey] = |
| last_input_time_der + input_time_der; |
| // Add derivative w.r.t. own parallelism parameter. |
| if (parameter && (*parameter)->state->tunable) { |
| (*gradient)[long_name()] = |
| -output_time_der * self_processing_time / Square(parallelism) + |
| buffer_size_der; |
| } |
| return result; |
| } |
| return ComputeWaitTime(output_time, input_times->back(), parallelism, |
| /*output_time_derivative=*/nullptr, |
| /*input_time_derivative=*/nullptr, |
| /*buffer_size_derivative=*/nullptr); |
| } |
| double old_input_time = input_times->back(); |
| double new_input_time = self_processing_time / ratio_ / parallelism; |
| input_times->push_back(new_input_time); |
| auto cleanup = |
| gtl::MakeCleanup([input_times]() { input_times->pop_back(); }); |
| if (gradient) { |
| std::map<string, double> inputs_gradient; |
| double output_time_der = 0.0L; |
| double input_time_der = 0.0L; |
| double buffer_size_der = 0.0L; |
| double output_time = |
| self_processing_time / parallelism + |
| ratio_ * OutputTimeForInputs(input_times, &inputs_gradient); |
| double result = |
| ComputeWaitTime(output_time, old_input_time, parallelism, |
| &output_time_der, &input_time_der, &buffer_size_der); |
| auto last_input_time_der = |
| gtl::FindWithDefault(*gradient, kInputTimeDerivativeKey, 0.0L); |
| (*gradient)[kInputTimeDerivativeKey] = |
| last_input_time_der + input_time_der; |
| for (auto& pair : inputs_gradient) { |
| if (pair.first != kInputTimeDerivativeKey) { |
| (*gradient)[pair.first] = pair.second * ratio_ * output_time_der; |
| } |
| } |
| // Add derivative w.r.t. own parallelism parameter. |
| if (parameter && (*parameter)->state->tunable) { |
| (*gradient)[long_name()] = |
| -output_time_der * self_processing_time / Square(parallelism) + |
| buffer_size_der - |
| output_time_der * inputs_gradient[kInputTimeDerivativeKey] * |
| self_processing_time / Square(parallelism); |
| } |
| return result; |
| } |
| double output_time = |
| self_processing_time / parallelism + |
| ratio_ * OutputTimeForInputs(input_times, /*gradient=*/nullptr); |
| return ComputeWaitTime(output_time, old_input_time, parallelism, |
| /*output_time_derivative=*/nullptr, |
| /*input_time_derivative=*/nullptr, |
| /*buffer_size_derivative=*/nullptr); |
| } |
| |
| // The processing time is the sum of the self processing time and the product |
| // of `ratio_` and the sum of processing times of inputs. |
| double TotalProcessingTimeLocked(std::map<string, double>* processing_times) |
| override SHARED_LOCKS_REQUIRED(mu_) { |
| double self_processing_time = SelfProcessingTimeLocked(); |
| if (processing_times) { |
| (*processing_times)[long_name()] = self_processing_time; |
| } |
| return self_processing_time + |
| ratio_ * TotalProcessingTimeForInputs(processing_times); |
| } |
| |
| private: |
| const double ratio_; |
| }; |
| |
| class UnknownRatio : public Node { |
| public: |
| using Node::Node; |
| |
| virtual ~UnknownRatio() {} |
| |
| protected: |
| std::shared_ptr<Node> Clone(std::shared_ptr<Node> output) const override |
| SHARED_LOCKS_REQUIRED(mu_) { |
| return std::make_shared<UnknownRatio>(Args{id_, name_, std::move(output)}); |
| } |
| |
| // The output time is the sum of the self processing time and the product of |
| // the ratio estimate and the sum of output times of inputs. |
| double OutputTimeLocked(std::vector<double>* input_times, |
| std::map<string, double>* gradient) const override |
| SHARED_LOCKS_REQUIRED(mu_) { |
| if (num_elements_ == 0 || inputs_.empty() || |
| inputs_.front()->num_elements() == 0) { |
| return SelfProcessingTimeLocked(); |
| } |
| // TODO(jsimsa): The current implementation assumes that the number of input |
| // elements consumed per output is the same across all inputs. |
| std::shared_ptr<Node> input = inputs_.front(); |
| double ratio = static_cast<double>(input->num_elements()) / |
| static_cast<double>(num_elements_); |
| double old_input_time = input_times->back(); |
| input_times->back() = (old_input_time + SelfProcessingTimeLocked()) / ratio; |
| auto cleanup = gtl::MakeCleanup([input_times, old_input_time]() { |
| input_times->back() = old_input_time; |
| }); |
| if (gradient) { |
| std::map<string, double> inputs_gradient; |
| double result = |
| SelfProcessingTimeLocked() + |
| ratio * OutputTimeForInputs(input_times, &inputs_gradient); |
| auto last_input_time_der = |
| gtl::FindWithDefault(*gradient, kInputTimeDerivativeKey, 0.0L); |
| (*gradient)[kInputTimeDerivativeKey] = |
| last_input_time_der + |
| inputs_gradient[kInputTimeDerivativeKey] / ratio; |
| for (auto& pair : inputs_gradient) { |
| if (pair.first != kInputTimeDerivativeKey) { |
| (*gradient)[pair.first] = pair.second * ratio; |
| } |
| } |
| return result; |
| } |
| return SelfProcessingTimeLocked() + |
| ratio * OutputTimeForInputs(input_times, /*gradient=*/nullptr); |
| } |
| |
| // The processing time is the sum of the self processing time and the product |
| // of the ratio estimate and the sum of processing times of inputs. |
| double TotalProcessingTimeLocked(std::map<string, double>* processing_times) |
| override SHARED_LOCKS_REQUIRED(mu_) { |
| double self_processing_time = SelfProcessingTimeLocked(); |
| if (processing_times) { |
| (*processing_times)[long_name()] = self_processing_time; |
| } |
| if (inputs_.empty() || num_elements_ == 0) { |
| return self_processing_time; |
| } |
| // TODO(jsimsa): The current implementation assumes that the number of input |
| // elements consumed per output is the same across all inputs. |
| std::shared_ptr<Node> input = inputs_.front(); |
| double ratio = static_cast<double>(input->num_elements()) / |
| static_cast<double>(num_elements_); |
| return self_processing_time + |
| ratio * TotalProcessingTimeForInputs(processing_times); |
| } |
| }; |
| |
| class Unknown : public Node { |
| public: |
| using Node::Node; |
| |
| virtual ~Unknown() {} |
| |
| protected: |
| std::shared_ptr<Node> Clone(std::shared_ptr<Node> output) const override |
| SHARED_LOCKS_REQUIRED(mu_) { |
| return std::make_shared<Unknown>(Args{id_, name_, std::move(output)}); |
| } |
| |
| // The output time is the sum of output times of inputs. |
| double OutputTimeLocked(std::vector<double>* input_times, |
| std::map<string, double>* gradient) const override |
| SHARED_LOCKS_REQUIRED(mu_) { |
| return OutputTimeForInputs(input_times, gradient); |
| } |
| |
| // The processing time is the sum of processing times of inputs. |
| double TotalProcessingTimeLocked(std::map<string, double>* processing_times) |
| override SHARED_LOCKS_REQUIRED(mu_) { |
| return TotalProcessingTimeForInputs(processing_times); |
| } |
| }; |
| |
| } // namespace |
| |
| std::shared_ptr<Parameter> MakeParameter(const string& name, |
| std::shared_ptr<SharedState> state, |
| double min, double max) { |
| return std::make_shared<Parameter>(name, state, min, max); |
| } |
| |
| std::shared_ptr<Node> MakeInterleaveManyNode(Node::Args args) { |
| return std::make_shared<InterleaveMany>(std::move(args)); |
| } |
| |
| std::shared_ptr<Node> MakeAsyncInterleaveManyNode( |
| Node::Args args, std::vector<std::shared_ptr<Parameter>> parameters) { |
| return std::make_shared<AsyncInterleaveMany>(std::move(args), |
| std::move(parameters)); |
| } |
| |
| std::shared_ptr<Node> MakeKnownRatioNode(Node::Args args, double ratio) { |
| return std::make_shared<KnownRatio>(std::move(args), ratio); |
| } |
| |
| std::shared_ptr<Node> MakeAsyncKnownRatioNode( |
| Node::Args args, double ratio, |
| std::vector<std::shared_ptr<Parameter>> parameters) { |
| return std::make_shared<AsyncKnownRatio>(std::move(args), ratio, |
| std::move(parameters)); |
| } |
| |
| std::shared_ptr<Node> MakeSourceNode(Node::Args args) { |
| return MakeKnownRatioNode(std::move(args), 0); |
| } |
| |
| std::shared_ptr<Node> MakeUnknownRatioNode(Node::Args args) { |
| return std::make_shared<UnknownRatio>(std::move(args)); |
| } |
| |
| std::shared_ptr<Node> MakeUnknownNode(Node::Args args) { |
| return std::make_shared<Unknown>(std::move(args)); |
| } |
| |
| std::shared_ptr<Node> Model::AddNode(Node::Factory factory, const string& name, |
| const string& output_name) { |
| // The name captures the sequence of iterators joined by `::`. We use the full |
| // sequence as the key in the lookup table, but only the last element of the |
| // sequence as the name node. |
| std::vector<string> tokens = |
| str_util::Split(name, ':', str_util::SkipEmpty()); |
| // The output name might contain an index. We need to strip it to make it |
| // possible for the model to successfully identify the output node. |
| string sanitized_output_name = output_name; |
| if (str_util::EndsWith(output_name, "]")) { |
| sanitized_output_name = output_name.substr(0, output_name.rfind('[')); |
| } |
| std::shared_ptr<Node> output; |
| mutex_lock l(mu_); |
| auto it = lookup_table_.find(sanitized_output_name); |
| if (it != lookup_table_.end()) { |
| output = it->second; |
| } |
| std::shared_ptr<Node> node = factory({id_counter_++, tokens.back(), output}); |
| if (!output_) { |
| output_ = node; |
| } |
| if (output) { |
| VLOG(3) << "Adding " << node->long_name() << " as input for " |
| << output->long_name(); |
| output->add_input(node); |
| } else { |
| VLOG(3) << "Adding " << node->long_name(); |
| } |
| collect_resource_usage_ = |
| collect_resource_usage_ || node->has_tunable_parameters(); |
| lookup_table_.insert(std::make_pair(name, node)); |
| return node; |
| } |
| |
| void Model::AddProcessingTime(const string& name, int64 delta) { |
| tf_shared_lock l(mu_); |
| auto node = gtl::FindOrNull(lookup_table_, name); |
| if (node) { |
| (*node)->add_processing_time(delta); |
| } |
| } |
| |
| void Model::Optimize(AutotuneAlgorithm algorithm, int64 cpu_budget) { |
| switch (algorithm) { |
| case AutotuneAlgorithm::HILL_CLIMB: |
| OptimizeHillClimb(cpu_budget); |
| break; |
| case AutotuneAlgorithm::GRADIENT_DESCENT: |
| OptimizeGradientDescent(cpu_budget); |
| break; |
| } |
| } |
| |
| void Model::RecordElement(const string& name) { |
| tf_shared_lock l(mu_); |
| auto node = gtl::FindOrNull(lookup_table_, name); |
| if (node) { |
| (*node)->record_element(); |
| } |
| } |
| |
| int64 Model::NumElements(const string& name) { |
| tf_shared_lock l(mu_); |
| auto node = gtl::FindOrNull(lookup_table_, name); |
| if (node) { |
| return (*node)->num_elements(); |
| } |
| return 0; |
| } |
| |
| void Model::RecordStart(const string& name, bool stop_output) { |
| tf_shared_lock l(mu_); |
| auto node = gtl::FindOrNull(lookup_table_, name); |
| if (collect_resource_usage_ && node) { |
| int64 now_nanos = absl::GetCurrentTimeNanos(); |
| if (stop_output && (*node)->output()) { |
| (*node)->output()->record_stop(now_nanos); |
| } |
| (*node)->record_start(now_nanos); |
| } |
| } |
| |
| void Model::RecordStop(const string& name, bool start_output) { |
| tf_shared_lock l(mu_); |
| auto node = gtl::FindOrNull(lookup_table_, name); |
| if (collect_resource_usage_ && node) { |
| int64 now_nanos = absl::GetCurrentTimeNanos(); |
| (*node)->record_stop(now_nanos); |
| if (start_output && (*node)->output()) { |
| (*node)->output()->record_start(now_nanos); |
| } |
| } |
| } |
| |
| void Model::RemoveNode(const string& name) { |
| mutex_lock l(mu_); |
| auto node = gtl::FindOrNull(lookup_table_, name); |
| if (node) { |
| if ((*node)->output()) { |
| (*node)->output()->remove_input(*node); |
| } |
| VLOG(3) << "Removing " << (*node)->long_name(); |
| remove_node_hook_(*node); |
| } |
| lookup_table_.erase(name); |
| } |
| |
| std::map<string, std::shared_ptr<Parameter>> Model::CollectTunableParameters( |
| std::shared_ptr<Node> node) { |
| std::map<string, std::shared_ptr<Parameter>> parameters; |
| node->CollectTunableParameters(¶meters); |
| return parameters; |
| } |
| |
| std::map<string, std::shared_ptr<Parameter>> Model::CollectEssentialParallelism( |
| std::shared_ptr<Node> node) { |
| // Parallelism parameter is considered to be essential if the coressponding |
| // transformations's processing time is greater than essential rate times the |
| // average transformation self processing time. |
| constexpr double kEssentialRate = 0.3L; |
| |
| std::map<string, std::shared_ptr<Parameter>> parameters; |
| node->CollectTunableParameters(¶meters); |
| std::map<string, double> processing_times; |
| double processing_time = node->TotalProcessingTime(&processing_times); |
| double uniform_share = |
| processing_time / static_cast<double>(processing_times.size()); |
| std::map<string, std::shared_ptr<Parameter>> essential_parameters; |
| for (auto& pair : parameters) { |
| if (processing_times[pair.first] > kEssentialRate * uniform_share) { |
| essential_parameters.insert(pair); |
| } |
| } |
| return essential_parameters; |
| } |
| |
| void Model::OptimizeGradientDescent(int64 cpu_budget) { |
| std::shared_ptr<Node> snapshot; |
| { |
| tf_shared_lock lock(mu_); |
| snapshot = output_->Snapshot(nullptr); |
| } |
| VLOG(2) << "Starting optimization of tunable parameters with GradientDescent"; |
| auto parameters = CollectTunableParameters(snapshot); |
| auto essential_parameters = CollectEssentialParallelism(snapshot); |
| for (auto& pair : parameters) { |
| pair.second->value = pair.second->min; |
| } |
| // Gradient descent step size. |
| constexpr double kDescentStep = 0.1L; |
| |
| // Optimization is stopped once the `OutputTime` improvement is smaller than |
| // this value. |
| constexpr double kOptimizationPrecision = 100.0L; |
| |
| // Maximum number of iterations for optimization. |
| constexpr int64 kMaxIterations = 1000; |
| |
| double output_time = 0; |
| double new_output_time; |
| double new_value; |
| for (int i = 0; i < kMaxIterations; ++i) { |
| std::map<string, double> gradient; |
| new_output_time = OutputTime(snapshot, &gradient); |
| int64 model_parallelism = 0; |
| for (auto& pair : essential_parameters) { |
| model_parallelism += std::round(pair.second->value); |
| } |
| // We terminate once the improvement of the output latency is too small or |
| // the essential transformations' parallelism reaches the CPU budget. |
| if (std::abs(output_time - new_output_time) < kOptimizationPrecision || |
| model_parallelism > cpu_budget) { |
| break; |
| } |
| double max_abs_derivative = 1.0; |
| for (auto& pair : parameters) { |
| if (pair.second->value != pair.second->max) { |
| max_abs_derivative = |
| std::max(max_abs_derivative, std::abs(gradient[pair.first])); |
| } |
| } |
| for (auto& pair : parameters) { |
| new_value = pair.second->value - |
| kDescentStep * gradient[pair.first] / max_abs_derivative; |
| // Projection on a feasible interval. |
| if (new_value > pair.second->max) { |
| pair.second->value = pair.second->max; |
| } else if (new_value < pair.second->min) { |
| pair.second->value = pair.second->min; |
| } else { |
| pair.second->value = new_value; |
| } |
| } |
| output_time = new_output_time; |
| } |
| VLOG(2) << "Number of tunable parameters: " << parameters.size(); |
| for (auto& pair : parameters) { |
| pair.second->value = std::round(pair.second->value); |
| auto& parameter = pair.second; |
| VLOG(2) << "Setting tunable parameter " << pair.first << " to " |
| << parameter->value; |
| mutex_lock l(*parameter->state->mu); |
| parameter->state->value = parameter->value; |
| parameter->state->cond_var->notify_all(); |
| } |
| } |
| |
| void Model::OptimizeHillClimb(int64 cpu_budget) { |
| std::shared_ptr<Node> snapshot; |
| { |
| tf_shared_lock lock(mu_); |
| snapshot = output_->Snapshot(nullptr); |
| } |
| VLOG(2) << "Starting optimization of tunable parameters with HillClimb"; |
| const double processing_time = TotalProcessingTime(snapshot); |
| auto parameters = CollectTunableParameters(snapshot); |
| for (auto& pair : parameters) { |
| pair.second->value = 1; |
| } |
| while (true) { |
| const double output_time = OutputTime(snapshot, /*gradient=*/nullptr); |
| bool all_max = true; |
| for (auto& pair : parameters) { |
| if (pair.second->value < pair.second->max) { |
| all_max = false; |
| break; |
| } |
| } |
| if (output_time < processing_time / cpu_budget || all_max) { |
| break; |
| } |
| double best_delta = -1.0L; |
| Parameter* best_parameter = nullptr; |
| for (auto& pair : parameters) { |
| if (pair.second->value == pair.second->max) { |
| continue; |
| } |
| pair.second->value++; |
| double new_output_time = OutputTime(snapshot, /*gradient=*/nullptr); |
| double delta = output_time - new_output_time; |
| if (delta > best_delta) { |
| best_delta = delta; |
| best_parameter = pair.second.get(); |
| } |
| pair.second->value--; |
| } |
| if (!best_parameter) { |
| LOG(WARNING) << "Failed to find a tunable parameter that would " |
| "decrease the output time. This means that the " |
| "autotuning optimization got stuck in a local maximum. " |
| "The optimization attempt will be aborted."; |
| return; |
| } |
| best_parameter->value++; |
| } |
| VLOG(2) << "Number of tunable parameters: " << parameters.size(); |
| for (auto& pair : parameters) { |
| auto& parameter = pair.second; |
| VLOG(2) << "Setting tunable parameter " << pair.first << " to " |
| << parameter->value; |
| mutex_lock l(*parameter->state->mu); |
| parameter->state->value = parameter->value; |
| parameter->state->cond_var->notify_all(); |
| } |
| } |
| |
| double Model::OutputTime(std::shared_ptr<Node> node, |
| std::map<string, double>* gradient) { |
| std::vector<double> input_times(1, 0); |
| // TODO(jsimsa): Now that we are accounting for buffer size in wait time |
| // computation, assuming that the input is infinitely fast will result in |
| // inaccurate estimates of the output latency. |
| // |
| // We should compute the output latency as a fix-point of the following |
| // equation: `output_time = node(OutputTime(input_times(1, output_time))`. |
| return node->OutputTime(&input_times, gradient); |
| } |
| |
| double Model::TotalProcessingTime(std::shared_ptr<Node> node) { |
| return node->TotalProcessingTime(/*processing_times=*/nullptr); |
| } |
| |
| } // namespace model |
| } // namespace data |
| } // namespace tensorflow |