/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#include "tensorflow/core/common_runtime/executor.h"
#include <algorithm>
#include <functional>
#include <random>
#include <vector>
#include "tensorflow/core/common_runtime/device.h"
#include "tensorflow/core/common_runtime/device_factory.h"
#include "tensorflow/core/common_runtime/kernel_benchmark_testlib.h"
#include "tensorflow/core/common_runtime/process_util.h"
#include "tensorflow/core/common_runtime/step_stats_collector.h"
#include "tensorflow/core/framework/op.h"
#include "tensorflow/core/framework/rendezvous.h"
#include "tensorflow/core/framework/step_stats.pb.h"
#include "tensorflow/core/framework/versions.pb.h"
#include "tensorflow/core/graph/graph_constructor.h"
#include "tensorflow/core/lib/core/status_test_util.h"
#include "tensorflow/core/lib/random/simple_philox.h"
#include "tensorflow/core/lib/strings/strcat.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/test.h"
#include "tensorflow/core/platform/test_benchmark.h"
#include "tensorflow/core/platform/tracing.h"
#include "tensorflow/core/public/session_options.h"
namespace tensorflow {
class ExecutorTest : public ::testing::Test {
protected:
ExecutorTest()
: device_(DeviceFactory::NewDevice("CPU", {},
"/job:localhost/replica:0/task:0")),
step_stats_collector_(&step_stats_) {
SessionOptions options;
thread_pool_ = ComputePool(options);
}
~ExecutorTest() override {
// There should always be exactly one Ref left on the Rendezvous
// when the test completes.
CHECK(rendez_->Unref());
delete exec_;
}
// Resets exec_ with a new executor based on the given graph.
void Create(std::unique_ptr<const Graph> graph) {
const int version = graph->versions().producer();
LocalExecutorParams params;
params.device = device_.get();
params.create_kernel = [this, version](const NodeDef& ndef,
OpKernel** kernel) {
return CreateNonCachedKernel(device_.get(), nullptr, ndef, version,
kernel);
};
params.delete_kernel = [](OpKernel* kernel) {
DeleteNonCachedKernel(kernel);
};
rendez_ = NewLocalRendezvous();
params.rendezvous_factory = [this](const int64, const DeviceMgr*,
Rendezvous** r) {
*r = rendez_;
rendez_->Ref();
return Status::OK();
};
delete exec_;
TF_CHECK_OK(NewLocalExecutor(params, *graph, &exec_));
runner_ = [this](std::function<void()> fn) { thread_pool_->Schedule(fn); };
}
Status Run(Rendezvous* rendez) {
Executor::Args args;
args.rendezvous = rendez;
args.stats_collector = &step_stats_collector_;
args.runner = runner_;
return exec_->Run(args);
}
thread::ThreadPool* thread_pool_ = nullptr;
std::unique_ptr<Device> device_;
Executor* exec_ = nullptr;
StepStatsCollector step_stats_collector_;
StepStats step_stats_;
Executor::Args::Runner runner_;
Rendezvous* rendez_ = nullptr;
};
// A float val -> Tensor<float>
Tensor V(const float val) {
Tensor tensor(DT_FLOAT, TensorShape({}));
tensor.scalar<float>()() = val;
return tensor;
}
// An int32 val -> Tensor<int32>
Tensor VI(const int32 val) {
Tensor tensor(DT_INT32, TensorShape({}));
tensor.scalar<int32>()() = val;
return tensor;
}
// A bool val -> Tensor<bool>
Tensor VB(const bool val) {
Tensor tensor(DT_BOOL, TensorShape({}));
tensor.scalar<bool>()() = val;
return tensor;
}
// A double val -> Tensor<double>
Tensor VD(const double val) {
Tensor tensor(DT_DOUBLE, TensorShape({}));
tensor.scalar<double>()() = val;
return tensor;
}
// Tensor<float> -> a float val.
float V(const Tensor& tensor) {
CHECK_EQ(tensor.dtype(), DT_FLOAT);
CHECK(TensorShapeUtils::IsScalar(tensor.shape()));
return tensor.scalar<float>()();
}
static uint64 kIncarnation = 1;  // Used in the following tests.
Rendezvous::ParsedKey Key(const string& sender, const uint64 incarnation,
const string& receiver, const string& name) {
Rendezvous::ParsedKey result;
CHECK(
Rendezvous::ParseKey(Rendezvous::CreateKey(sender, incarnation, receiver,
name, FrameAndIter(0, 0)),
&result)
.ok());
return result;
}
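// For reference (an illustrative example, not load-bearing for the tests):
// Rendezvous::CreateKey produces a ';'-separated string such as
//   /job:j/replica:0/task:0/cpu:0;0000000000000001;/job:j/replica:0/task:0/device:GPU:0;a;0:0
// where the second field is the hex-encoded incarnation number.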
#define ALICE "/job:j/replica:0/task:0/cpu:0"
#define BOB "/job:j/replica:0/task:0/device:GPU:0"
TEST_F(ExecutorTest, SimpleAdd) {
// c = a + b
auto g = absl::make_unique<Graph>(OpRegistry::Global());
auto in0 = test::graph::Recv(g.get(), "a", "float", ALICE, 1, BOB);
auto in1 = test::graph::Recv(g.get(), "b", "float", ALICE, 1, BOB);
auto tmp = test::graph::Add(g.get(), in0, in1);
test::graph::Send(g.get(), tmp, "c", BOB, 1, ALICE);
Create(std::move(g));
Rendezvous::Args args;
TF_ASSERT_OK(rendez_->Send(Key(ALICE, kIncarnation, BOB, "a"), args, V(1.0),
false)); // in0 = 1.0
TF_ASSERT_OK(rendez_->Send(Key(ALICE, kIncarnation, BOB, "b"), args, V(1.0),
false)); // in1 = 1.0
TF_ASSERT_OK(Run(rendez_));
Tensor out = V(-1);
bool is_dead = false;
TF_ASSERT_OK(
rendez_->Recv(Key(BOB, kIncarnation, ALICE, "c"), args, &out, &is_dead));
EXPECT_EQ(2.0, V(out)); // out = 1.0 + 1.0 = 2.0
}
TEST_F(ExecutorTest, SelfAdd) {
// v0 <- a
// v1 = v0 + v0
// v2 = v1 + v1
// ... ...
// v10 = v9 + v9
//
// b <- v10
// All nodes are executed by one thread.
auto g = absl::make_unique<Graph>(OpRegistry::Global());
auto v = test::graph::Recv(g.get(), "a", "float", ALICE, 1, BOB);
const int N = 10;
for (int i = 1; i <= N; ++i) {
v = test::graph::Add(g.get(), v, v);
}
// b <- v10
test::graph::Send(g.get(), v, "b", BOB, 1, ALICE);
Create(std::move(g));
Rendezvous::Args args;
// a = 1.0
TF_ASSERT_OK(
rendez_->Send(Key(ALICE, kIncarnation, BOB, "a"), args, V(1.0), false));
TF_ASSERT_OK(Run(rendez_));
Tensor out = V(-1);
bool is_dead = false;
TF_ASSERT_OK(
rendez_->Recv(Key(BOB, kIncarnation, ALICE, "b"), args, &out, &is_dead));
EXPECT_EQ(1024.0, V(out)); // b=v10=2*v9=4*v8=...=1024*a=1024.0
}
// Builds a graph which adds N copies of one input "in". I.e.,
//     a + a + a + ... + a
// The returned graph is parenthesized randomly. E.g.,
//     a + ((a + a) + a)
//     (a + a) + (a + a)
//     ((a + a) + a) + a
// may all be generated.
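// Since every leaf is the same input, the sum equals N * a regardless of the
// parenthesization (exact here, as the values stay small integers in float).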
void BuildTree(int N, Graph* g) {
CHECK_GT(N, 1);
// A single input node "in".
auto in = test::graph::Recv(g, "a", "float", ALICE, 1, BOB);
std::vector<Node*> nodes;
int i = 0;
// Duplicate "in" N times. Each copy is named l0, l1, l2, ....
for (; i < N; ++i) {
nodes.push_back(test::graph::Identity(g, in, 0));
}
random::PhiloxRandom philox(testing::RandomSeed(), 17);
random::SimplePhilox rnd(&philox);
while (nodes.size() > 1) {
// Randomly pick two nodes from "nodes" and add them. The resulting node
// is named like n10, n11, ..., and is put back into "nodes".
int x = rnd.Uniform(nodes.size());
auto in0 = nodes[x];
nodes[x] = nodes.back();
nodes.resize(nodes.size() - 1);
x = rnd.Uniform(nodes.size());
auto in1 = nodes[x];
// node = in0 + in1.
nodes[x] = test::graph::Add(g, in0, in1);
}
// The final output node "b".
test::graph::Send(g, nodes.back(), "b", BOB, 1, ALICE);
}
TEST_F(ExecutorTest, RandomTree) {
auto g = absl::make_unique<Graph>(OpRegistry::Global());
BuildTree(4096, g.get());
Create(std::move(g));
Rendezvous::Args args;
TF_ASSERT_OK(
rendez_->Send(Key(ALICE, kIncarnation, BOB, "a"), args, V(1.0), false));
TF_ASSERT_OK(Run(rendez_));
Tensor out = V(-1);
bool is_dead = false;
TF_ASSERT_OK(
rendez_->Recv(Key(BOB, kIncarnation, ALICE, "b"), args, &out, &is_dead));
EXPECT_EQ(4096.0, V(out));
}
void BuildConcurrentAddAssign(Graph* g) {
auto one = test::graph::Constant(g, V(1.0));
// A variable that holds one float.
auto var = test::graph::Var(g, DT_FLOAT, TensorShape({}));
// Initialize the variable with 1.0.
auto init = test::graph::Assign(g, var, one);
// Output
auto out = test::graph::Send(g, var, "out", ALICE, kIncarnation, BOB);
// Have many concurrent computations. Each does v = v + 1.
for (int i = 0; i < 1024; ++i) {
auto add = test::graph::Add(g, var, one);
g->AddControlEdge(init, add); // Ensures run after init.
auto assign = test::graph::Assign(g, var, add);
g->AddControlEdge(assign, out);
}
}
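// Note: the Add/Assign pairs above race on the variable, so increments can be
// lost. The value sent on "out" can plausibly land anywhere in [2.0, 1025.0];
// the test below only checks the upper bound.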
#ifndef THREAD_SANITIZER
TEST_F(ExecutorTest, ConcurrentAddAssign) {
auto g = absl::make_unique<Graph>(OpRegistry::Global());
BuildConcurrentAddAssign(g.get());
Create(std::move(g));
for (int iters = 0; iters < 16; ++iters) {
Rendezvous* rendez = NewLocalRendezvous();
TF_ASSERT_OK(Run(rendez));
Rendezvous::Args args;
Tensor out;
bool is_dead;
TF_ASSERT_OK(rendez->Recv(Key(ALICE, kIncarnation, BOB, "out"), args, &out,
&is_dead));
VLOG(1) << "Get " << V(out);
EXPECT_LE(V(out), 1025.0);
rendez->Unref();
}
}
#endif  // THREAD_SANITIZER
TEST_F(ExecutorTest, SimpleSwitchLive) {
auto g = absl::make_unique<Graph>(OpRegistry::Global());
auto in0 = test::graph::Recv(g.get(), "a", "float", ALICE, 1, BOB);
auto in1 = test::graph::Constant(g.get(), VB(false));
auto tmp = test::graph::Switch(g.get(), in0, in1);
test::graph::Send(g.get(), tmp, "c", BOB, 1, ALICE);
Create(std::move(g));
Rendezvous::Args args;
TF_ASSERT_OK(rendez_->Send(Key(ALICE, kIncarnation, BOB, "a"), args, V(1.0),
false)); // in0 = 1.0
TF_ASSERT_OK(Run(rendez_));
Tensor out = V(-1);
bool is_dead = false;
TF_ASSERT_OK(
rendez_->Recv(Key(BOB, kIncarnation, ALICE, "c"), args, &out, &is_dead));
EXPECT_EQ(1.0, V(out)); // out = 1.0
EXPECT_FALSE(is_dead);
}
TEST_F(ExecutorTest, SimpleSwitchDead) {
auto g = absl::make_unique<Graph>(OpRegistry::Global());
auto in0 = test::graph::Recv(g.get(), "a", "float", ALICE, 1, BOB);
auto in1 = test::graph::Constant(g.get(), VB(true));
auto tmp = test::graph::Switch(g.get(), in0, in1);
test::graph::Send(g.get(), tmp, "c", BOB, 1, ALICE);
Create(std::move(g));
Rendezvous::Args args;
TF_ASSERT_OK(rendez_->Send(Key(ALICE, kIncarnation, BOB, "a"), args, V(1.0),
false)); // in0 = 1.0
TF_ASSERT_OK(Run(rendez_));
Tensor out = V(-1);
bool is_dead = false;
TF_ASSERT_OK(
rendez_->Recv(Key(BOB, kIncarnation, ALICE, "c"), args, &out, &is_dead));
EXPECT_TRUE(is_dead);
}
TEST_F(ExecutorTest, Abort) {
// e = a + b + c + d
auto g = absl::make_unique<Graph>(OpRegistry::Global());
auto in0 = test::graph::Recv(g.get(), "a", "float", ALICE, 1, BOB);
auto in1 = test::graph::Recv(g.get(), "b", "float", ALICE, 1, BOB);
auto in2 = test::graph::Recv(g.get(), "c", "float", ALICE, 1, BOB);
auto in3 = test::graph::Recv(g.get(), "d", "float", ALICE, 1, BOB);
auto add0 = test::graph::Add(g.get(), in0, in1);
auto add1 = test::graph::Add(g.get(), in2, in3);
auto add2 = test::graph::Add(g.get(), add0, add1);
test::graph::Send(g.get(), add2, "e", BOB, 1, ALICE);
Create(std::move(g));
// Needs 4 inputs (recvs). One of them is aborted.
rendez_->Ref();
SchedClosure([this]() {
Env::Default()->SleepForMicroseconds(100 * 1000);
Status s = rendez_->Send(Key(ALICE, kIncarnation, BOB, "a"),
Rendezvous::Args(), V(1.0), false);
rendez_->Unref();
});
rendez_->Ref();
SchedClosure([this]() {
Env::Default()->SleepForMicroseconds(100 * 1000);
Status s = rendez_->Send(Key(ALICE, kIncarnation, BOB, "b"),
Rendezvous::Args(), V(1.0), false);
rendez_->Unref();
});
rendez_->Ref();
SchedClosure([this]() {
Env::Default()->SleepForMicroseconds(100 * 1000);
Status s = rendez_->Send(Key(ALICE, kIncarnation, BOB, "c"),
Rendezvous::Args(), V(1.0), false);
rendez_->Unref();
});
rendez_->Ref();
SchedClosure([this]() {
Env::Default()->SleepForMicroseconds(100 * 1000);
rendez_->StartAbort(errors::Aborted(""));
rendez_->Unref();
});
EXPECT_TRUE(errors::IsAborted(Run(rendez_)));
Tensor out = V(-1);
bool is_dead = false;
EXPECT_TRUE(errors::IsAborted(rendez_->Recv(
Key(BOB, kIncarnation, ALICE, "c"), Rendezvous::Args(), &out, &is_dead)));
// At this point there can still be pending (albeit Aborted) Send
// closures holding Refs on rendez_. We need to wait for them, or
// else there can be a memory leak at termination.
while (!rendez_->RefCountIsOne()) {
}
}
TEST_F(ExecutorTest, RecvInvalidDtype) {
auto g = absl::make_unique<Graph>(OpRegistry::Global());
// An input vector of type float of size 1.
auto one = test::graph::Recv(g.get(), "one", "float", ALICE, 1, BOB);
// A floating point variable vector of size 1.
auto var = test::graph::Var(g.get(), DT_FLOAT, TensorShape({1}));
// Initialize the variable with input.
auto init = test::graph::Assign(g.get(), var, one);
// Output
auto* two = test::graph::Send(g.get(), var, "two", BOB, 1, ALICE);
g->AddControlEdge(init, two); // Ensures run after init.
Create(std::move(g));
Rendezvous* rendez = NewLocalRendezvous();
// Send a double instead of float.
TF_ASSERT_OK(rendez->Send(Key(ALICE, 1, BOB, "one"), Rendezvous::Args(),
VD(1.0), false));
// Fails due to invalid dtype.
EXPECT_TRUE(errors::IsInternal(Run(rendez)));
Tensor output;
bool is_dead;
EXPECT_TRUE(errors::IsInternal(rendez->Recv(
Key(BOB, 1, ALICE, "two"), Rendezvous::Args(), &output, &is_dead)));
rendez->Unref();
}
TEST_F(ExecutorTest, RecvInvalidRefDtype) {
auto g = absl::make_unique<Graph>(OpRegistry::Global());
// A var that always produces an invalid dtype.
auto var = test::graph::InvalidRefType(g.get(), DT_FLOAT, DT_DOUBLE);
test::graph::Send(g.get(), var, "out", BOB, 1, ALICE);
Create(std::move(g));
Rendezvous* rendez = NewLocalRendezvous();
EXPECT_TRUE(errors::IsInternal(Run(rendez)));
Tensor output;
bool is_dead;
EXPECT_TRUE(errors::IsInternal(rendez->Recv(
Key(BOB, 1, ALICE, "out"), Rendezvous::Args(), &output, &is_dead)));
rendez->Unref();
}
// Create a graph that is 'depth' deep. At each level, fan-in and fan-out a
// maximum of 'width' nodes. All nodes are no-ops and all dependencies are
// control dependencies.
static void BM_executor(int iters, int width, int depth) {
#ifdef PLATFORM_GOOGLE
BenchmarkUseRealTime();
#endif // PLATFORM_GOOGLE
Graph* g = new Graph(OpRegistry::Global());
random::PhiloxRandom philox(1729, 17);
random::SimplePhilox rand(&philox);
uint64 cur = 0;
uint32 r = 1 + rand.Rand32() % width;
std::vector<Node*> ready_nodes;
for (int i = 0; i < r; ++i) {
ready_nodes.push_back(test::graph::NoOp(g, {}));
++cur;
}
std::random_device random_device;
std::mt19937 rng(random_device());
for (int i = 0; i < depth; ++i) {
std::shuffle(ready_nodes.begin(), ready_nodes.end(), rng);
r = 1 + rand.Rand32() % (ready_nodes.size());
std::vector<Node*> control_inputs;
for (int j = 0; j < r; ++j) {
control_inputs.push_back(ready_nodes.back());
ready_nodes.pop_back();
}
Node* n = test::graph::NoOp(g, control_inputs);
++cur;
r = 1 + rand.Rand32() % width;
for (int j = 0; j < r; ++j) {
ready_nodes.push_back(test::graph::NoOp(g, {n}));
++cur;
}
}
#ifdef PLATFORM_GOOGLE
SetBenchmarkLabel(strings::StrCat("Nodes = ", cur));
SetBenchmarkItemsProcessed(cur * static_cast<int64>(iters));
#endif // PLATFORM_GOOGLE
test::Benchmark("cpu", g).Run(iters);
}
// Tall skinny graphs
BENCHMARK(BM_executor)->ArgPair(16, 1024);
BENCHMARK(BM_executor)->ArgPair(32, 8192);
// Short fat graphs
BENCHMARK(BM_executor)->ArgPair(1024, 16);
BENCHMARK(BM_executor)->ArgPair(8192, 32);
// Tall fat graph
BENCHMARK(BM_executor)->ArgPair(1024, 1024);
static void BM_FeedInputFetchOutput(int iters) {
Graph* g = new Graph(OpRegistry::Global());
// z = x + y: x and y are provided as benchmark inputs. z is the
// output of the benchmark. Conceptually, the caller is ALICE, the
// benchmark is BOB.
Node* x = test::graph::Recv(g, "x", "float", ALICE, 1, BOB);
Node* y = test::graph::Recv(g, "y", "float", ALICE, 1, BOB);
Node* sum = test::graph::Add(g, x, y);
Node* z = test::graph::Send(g, sum, "z", BOB, 1, ALICE);
string x_key = test::GetRendezvousKey(x);
string y_key = test::GetRendezvousKey(y);
string z_key = test::GetRendezvousKey(z);
Tensor val(DT_FLOAT, TensorShape({}));
val.scalar<float>()() = 3.14;
#ifdef PLATFORM_GOOGLE
SetBenchmarkItemsProcessed(static_cast<int64>(iters));
#endif // PLATFORM_GOOGLE
test::Benchmark("cpu", g).RunWithRendezvousArgs({{x_key, val}, {y_key, val}},
{z_key}, iters);
}
BENCHMARK(BM_FeedInputFetchOutput);
} // namespace tensorflow