blob: 84838a101b666527b06c346f260a8066537cb4cf [file] [log] [blame]
/* Copyright 2018 The TensorFlow Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#include "tensorflow/core/kernels/data/single_threaded_executor.h"

#include <algorithm>
#include <random>

#include "tensorflow/core/common_runtime/device.h"
#include "tensorflow/core/common_runtime/device_factory.h"
#include "tensorflow/core/common_runtime/executor.h"
#include "tensorflow/core/common_runtime/kernel_benchmark_testlib.h"
#include "tensorflow/core/common_runtime/process_util.h"
#include "tensorflow/core/framework/op.h"
#include "tensorflow/core/framework/rendezvous.h"
#include "tensorflow/core/framework/versions.pb.h"
#include "tensorflow/core/graph/algorithm.h"
#include "tensorflow/core/graph/graph_constructor.h"
#include "tensorflow/core/lib/core/status_test_util.h"
#include "tensorflow/core/lib/random/simple_philox.h"
#include "tensorflow/core/lib/strings/strcat.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/test.h"
#include "tensorflow/core/platform/test_benchmark.h"
#include "tensorflow/core/platform/tracing.h"
#include "tensorflow/core/public/session_options.h"
namespace tensorflow {
namespace data {
namespace {
// Test fixture that builds a single-threaded executor for a test graph and
// provides helpers to run it against either a Rendezvous or a CallFrame.
class ExecutorTest : public ::testing::Test {
 protected:
  ExecutorTest()
      : device_(DeviceFactory::NewDevice("CPU", {},
                                         "/job:localhost/replica:0/task:0")) {}

  ~ExecutorTest() override {
    // There should always be exactly one Ref left on the Rendezvous
    // when the test completes. Guard against `rendez_` still being null
    // (i.e. a test that failed or exited before Create() ran) so the
    // original failure is reported instead of a null dereference here.
    if (rendez_ != nullptr) {
      CHECK(rendez_->Unref());
    }
    delete exec_;
  }

  // Resets `exec_` with a new executor based on a graph 'gdef'.
  void Create(std::unique_ptr<const Graph> graph) {
    const int version = graph->versions().producer();
    LocalExecutorParams params;
    params.device = device_.get();
    // Kernels are created fresh (non-cached) against the test CPU device.
    params.create_kernel = [this, version](const NodeDef& ndef,
                                           OpKernel** kernel) {
      return CreateNonCachedKernel(device_.get(), nullptr, ndef, version,
                                   kernel);
    };
    params.delete_kernel = [](OpKernel* kernel) {
      DeleteNonCachedKernel(kernel);
    };
    delete exec_;
    TF_CHECK_OK(NewSingleThreadedExecutor(params, *graph, &exec_));
    // Run closures inline: the executor under test is single-threaded.
    runner_ = [](const std::function<void()> fn) { fn(); };
    // Release any rendezvous from a previous Create() call so repeated
    // calls do not leak it (the dtor expects to drop the last ref).
    if (rendez_ != nullptr) {
      rendez_->Unref();
    }
    rendez_ = NewLocalRendezvous();
  }

  // Runs the current executor, exchanging tensors through `rendez`.
  Status Run(Rendezvous* rendez) {
    Executor::Args args;
    args.rendezvous = rendez;
    args.runner = runner_;
    return exec_->Run(args);
  }

  // Runs the current executor, passing args/retvals through `call_frame`.
  Status Run(CallFrameInterface* call_frame) {
    Executor::Args args;
    args.call_frame = call_frame;
    args.runner = runner_;
    return exec_->Run(args);
  }

  std::unique_ptr<Device> device_;
  Executor* exec_ = nullptr;      // Owned; replaced by each Create() call.
  Executor::Args::Runner runner_;
  Rendezvous* rendez_ = nullptr;  // Ref-counted; last ref dropped in dtor.
};
// A float val -> Tensor<float>
// Wraps a float in a scalar (rank-0) DT_FLOAT tensor.
Tensor V(const float val) {
  Tensor t(DT_FLOAT, TensorShape({}));
  t.scalar<float>()() = val;
  return t;
}
// A int32 val -> Tensor<int32>
// Wraps an int32 in a scalar (rank-0) DT_INT32 tensor.
Tensor VI(const int32 val) {
  Tensor t(DT_INT32, TensorShape({}));
  t.scalar<int32>()() = val;
  return t;
}
// A bool val -> Tensor<bool>
// Wraps a bool in a scalar (rank-0) DT_BOOL tensor.
Tensor VB(const bool val) {
  Tensor t(DT_BOOL, TensorShape({}));
  t.scalar<bool>()() = val;
  return t;
}
// A double val -> Tensor<double>
// Wraps a double in a scalar (rank-0) DT_DOUBLE tensor.
Tensor VD(const double val) {
  Tensor t(DT_DOUBLE, TensorShape({}));
  t.scalar<double>()() = val;
  return t;
}
// Tensor<float> -> a float val.
float V(const Tensor& tensor) {
CHECK_EQ(tensor.dtype(), DT_FLOAT);
CHECK(TensorShapeUtils::IsScalar(tensor.shape()));
return tensor.scalar<float>()();
}
// Builds a parsed rendezvous key for a tensor named `name` sent from
// `sender` (device incarnation `incarnation`) to `receiver`.
Rendezvous::ParsedKey Key(const string& sender, const uint64 incarnation,
                          const string& receiver, const string& name) {
  const string key_str = Rendezvous::CreateKey(sender, incarnation, receiver,
                                               name, FrameAndIter(0, 0));
  Rendezvous::ParsedKey parsed;
  TF_CHECK_OK(Rendezvous::ParseKey(key_str, &parsed));
  return parsed;
}
TEST_F(ExecutorTest, SimpleAdd) {
  // Builds and runs: retval0 = arg0 + arg1, with an extra control edge
  // from arg1 to the retval.
  auto graph = absl::make_unique<Graph>(OpRegistry::Global());
  Node* arg0 = test::graph::Arg(graph.get(), 0, DT_FLOAT);
  Node* arg1 = test::graph::Arg(graph.get(), 1, DT_FLOAT);
  Node* sum = test::graph::Add(graph.get(), arg0, arg1);
  Node* retval0 = test::graph::Retval(graph.get(), 0, sum);
  graph->AddControlEdge(arg1, retval0);
  FixupSourceAndSinkEdges(graph.get());
  Create(std::move(graph));
  FunctionCallFrame frame({DT_FLOAT, DT_FLOAT}, {DT_FLOAT});
  TF_ASSERT_OK(frame.SetArgs({V(1.0), V(2.0)}));
  TF_ASSERT_OK(Run(&frame));
  std::vector<Tensor> outputs;
  TF_ASSERT_OK(frame.ConsumeRetvals(&outputs, false));
  EXPECT_EQ(3.0, V(outputs[0]));  // out = 1.0 + 2.0 = 3.0
}
TEST_F(ExecutorTest, SelfAdd) {
  // Chains N self-additions: v_i = v_{i-1} + v_{i-1}, so the single output
  // is 2^N times the single input. All nodes are executed by one thread.
  auto graph = absl::make_unique<Graph>(OpRegistry::Global());
  Node* cur = test::graph::Arg(graph.get(), 0, DT_FLOAT);
  const int kNumAdds = 10;
  for (int step = 1; step <= kNumAdds; ++step) {
    cur = test::graph::Add(graph.get(), cur, cur);
  }
  // out <- v10
  test::graph::Retval(graph.get(), 0, cur);
  FixupSourceAndSinkEdges(graph.get());
  Create(std::move(graph));
  FunctionCallFrame frame({DT_FLOAT}, {DT_FLOAT});
  // a = 1.0
  TF_ASSERT_OK(frame.SetArgs({V(1.0)}));
  TF_ASSERT_OK(Run(&frame));
  std::vector<Tensor> outputs;
  TF_ASSERT_OK(frame.ConsumeRetvals(&outputs, false));
  // 2^10 * 1.0 == 1024.0
  EXPECT_EQ(1024.0, V(outputs[0]));
}
// Builds a graph which adds N copies of one variable "in". I.e.,
// a + a + a + ... + a
// The returned graph is parenthesized randomly. I.e.,
// a + ((a + a) + a)
// (a + a) + (a + a)
// ((a + a) + a) + a
// are all possibly generated.
// Builds a graph that sums N copies of the single float input by combining
// them pairwise in a random order, producing a randomly parenthesized
// addition tree with one Retval output.
void BuildTree(int N, Graph* g) {
CHECK_GT(N, 1);
// A single input node "in".
auto in = test::graph::Arg(g, 0, DT_FLOAT);
std::vector<Node*> nodes;
int i = 0;
// Duplicate "in" N times via Identity nodes.
for (; i < N; ++i) {
nodes.push_back(test::graph::Identity(g, in, 0));
}
random::PhiloxRandom philox(0, 17);
random::SimplePhilox rnd(&philox);
// Repeatedly pick two random nodes, add them, and put the sum back into
// "nodes", until a single root remains.
while (nodes.size() > 1) {
int x = rnd.Uniform(nodes.size());
auto in0 = nodes[x];
// Swap-remove the first operand so it cannot be picked again.
nodes[x] = nodes.back();
nodes.resize(nodes.size() - 1);
x = rnd.Uniform(nodes.size());
auto in1 = nodes[x];
// node = in0 + in1; it replaces in1 in the worklist.
nodes[x] = test::graph::Add(g, in0, in1);
}
// The final output node "out".
test::graph::Retval(g, 0, nodes.back());
FixupSourceAndSinkEdges(g);
}
TEST_F(ExecutorTest, RandomTree) {
  // Sums 4096 copies of the input via a randomly parenthesized add tree.
  auto graph = absl::make_unique<Graph>(OpRegistry::Global());
  BuildTree(4096, graph.get());
  Create(std::move(graph));
  FunctionCallFrame frame({DT_FLOAT}, {DT_FLOAT});
  TF_ASSERT_OK(frame.SetArgs({V(1.0)}));
  TF_ASSERT_OK(Run(&frame));
  std::vector<Tensor> outputs;
  TF_ASSERT_OK(frame.ConsumeRetvals(&outputs, false));
  // 4096 copies of 1.0 sum to 4096.0.
  EXPECT_EQ(4096.0, V(outputs[0]));
}
// Verifies that an op-level error aborts execution: Reciprocal(0.0)
// produces Inf, which CheckNumerics rejects.
TEST_F(ExecutorTest, OpError) {
auto g = absl::make_unique<Graph>(OpRegistry::Global());
auto zero = test::graph::Constant(g.get(), V(0.0));
auto inf = test::graph::Unary(g.get(), "Reciprocal", zero);
auto check = test::graph::CheckNumerics(g.get(), inf, "message");
auto two = test::graph::Constant(g.get(), V(2.0));
test::graph::Binary(g.get(), "Mul", check, two);
FixupSourceAndSinkEdges(g.get());
Create(std::move(g));
FunctionCallFrame call_frame({}, {});
// CheckNumerics reports the Inf value as an InvalidArgument error.
// NOTE(review): the previous comment said "invalid dtype", which appears
// stale — all dtypes in this graph are DT_FLOAT.
EXPECT_TRUE(errors::IsInvalidArgument(Run(&call_frame)));
}
// Benchmarks the single-threaded executor on a randomly generated graph of
// NoOp nodes connected only by control edges: `depth` layers, each of which
// consumes a random subset of the currently "ready" nodes and fans out to at
// most `width` new nodes.
static void BM_executor(int iters, int width, int depth) {
#ifdef PLATFORM_GOOGLE
BenchmarkUseRealTime();
#endif // PLATFORM_GOOGLE
// NOTE(review): raw pointer handed to test::Benchmark below, which
// presumably takes ownership of the graph (see the TODO after this
// function about graph lifetime) — confirm against kernel_benchmark_testlib.
Graph* g = new Graph(OpRegistry::Global());
random::PhiloxRandom philox(1729, 17);
random::SimplePhilox rand(&philox);
uint64 cur = 0;  // Total node count; reported as the benchmark label.
// Seed the worklist with 1..width root NoOp nodes.
uint32 r = 1 + rand.Rand32() % width;
std::vector<Node*> ready_nodes;
for (int i = 0; i < r; ++i) {
ready_nodes.push_back(test::graph::NoOp(g, {}));
++cur;
}
std::random_device random_device;
std::mt19937 rng(random_device());
for (int i = 0; i < depth; ++i) {
// Shuffle so that a random subset of ready nodes becomes the control
// inputs of the next "join" node.
std::shuffle(ready_nodes.begin(), ready_nodes.end(), rng);
r = 1 + rand.Rand32() % (ready_nodes.size());
std::vector<Node*> control_inputs;
for (int j = 0; j < r; ++j) {
control_inputs.push_back(ready_nodes.back());
ready_nodes.pop_back();
}
Node* n = test::graph::NoOp(g, control_inputs);
++cur;
// Fan out to 1..width new nodes, each control-dependent on `n`.
r = 1 + rand.Rand32() % width;
for (int j = 0; j < r; ++j) {
ready_nodes.push_back(test::graph::NoOp(g, {n}));
++cur;
}
}
FixupSourceAndSinkEdges(g);
#ifdef PLATFORM_GOOGLE
SetBenchmarkLabel(strings::StrCat("Nodes = ", cur));
SetBenchmarkItemsProcessed(cur * static_cast<int64>(iters));
#endif // PLATFORM_GOOGLE
test::Benchmark("cpu", g, nullptr, nullptr, nullptr,
"SINGLE_THREADED_EXECUTOR")
.Run(iters);
}
// Benchmark arguments are ArgPair(width, depth) — see BM_executor.
// Tall skinny graphs
BENCHMARK(BM_executor)->ArgPair(16, 1024);
BENCHMARK(BM_executor)->ArgPair(32, 8192);
// Short fat graphs
BENCHMARK(BM_executor)->ArgPair(1024, 16);
BENCHMARK(BM_executor)->ArgPair(8192, 32);
// Tall fat graph
BENCHMARK(BM_executor)->ArgPair(1024, 1024);
// TODO(mrry): This benchmark currently crashes with a use-after free, because
// test::Benchmark::RunWithArgs() assumes that the executor will take ownership
// of the given graph, *and* keep its nodes (`x`, `y` and `z`) alive for the
// duration of the benchmark. Since the single threaded executor does not retain
// a copy of the graph, this fails.
//
// TODO(mrry): Add support for Arg/Retval "function call convention" in
// `test::Benchmark::RunWithArgs()`.
#if 0
#define ALICE "/job:j/replica:0/task:0/cpu:0"
#define BOB "/job:j/replica:0/task:0/gpu:0"
// Disabled benchmark (see the TODOs above): feeds two scalar inputs via
// Recv nodes, adds them, and fetches the sum via a Send node.
static void BM_FeedInputFetchOutput(int iters) {
Graph* g = new Graph(OpRegistry::Global());
// z = x + y: x and y are provided as benchmark inputs. z is the
// output of the benchmark. Conceptually, the caller is ALICE, the
// benchmark is BOB.
Node* x = test::graph::Recv(g, "x", "float", ALICE, 1, BOB);
Node* y = test::graph::Recv(g, "y", "float", ALICE, 1, BOB);
Node* sum = test::graph::Add(g, x, y);
Node* z = test::graph::Send(g, sum, "z", BOB, 1, ALICE);
FixupSourceAndSinkEdges(g);
// Both inputs are fed the same scalar value.
Tensor val(DT_FLOAT, TensorShape({}));
val.scalar<float>()() = 3.14;
SetBenchmarkItemsProcessed(static_cast<int64>(iters));
test::Benchmark("cpu", g, nullptr, nullptr, nullptr,
"SINGLE_THREADED_EXECUTOR")
.RunWithArgs({{x, val}, {y, val}}, {z}, iters);
}
BENCHMARK(BM_FeedInputFetchOutput);
#endif
} // namespace
} // namespace data
} // namespace tensorflow