| /* Copyright 2016 The TensorFlow Authors. All Rights Reserved. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| ==============================================================================*/ |
| |
| #include "tensorflow/core/distributed_runtime/rpc/grpc_session.h" |
| |
| #include "tensorflow/core/common_runtime/device.h" |
| #include "tensorflow/core/distributed_runtime/rpc/grpc_testlib.h" |
| #include "tensorflow/core/framework/graph.pb.h" |
| #include "tensorflow/core/framework/op.h" |
| #include "tensorflow/core/framework/tensor_testutil.h" |
| #include "tensorflow/core/graph/default_device.h" |
| #include "tensorflow/core/graph/graph.h" |
| #include "tensorflow/core/graph/testlib.h" |
| #include "tensorflow/core/lib/strings/strcat.h" |
| #include "tensorflow/core/platform/env.h" |
| #include "tensorflow/core/platform/init_main.h" |
| #include "tensorflow/core/platform/logging.h" |
| #include "tensorflow/core/platform/test.h" |
| #include "tensorflow/core/protobuf/error_codes.pb.h" |
| #include "tensorflow/core/public/session.h" |
| #include "tensorflow/core/util/port.h" |
| |
| namespace tensorflow { |
| |
| static SessionOptions Devices(int num_cpus, int num_gpus) { |
| SessionOptions result; |
| (*result.config.mutable_device_count())["CPU"] = num_cpus; |
| (*result.config.mutable_device_count())["GPU"] = num_gpus; |
| return result; |
| } |
| |
| void CreateGraphDef(GraphDef* graph_def, string node_names[3]) { |
| Graph graph(OpRegistry::Global()); |
| |
| Tensor a_tensor(DT_FLOAT, TensorShape({1, 2})); |
| test::FillValues<float>(&a_tensor, {1, 2}); |
| Node* a = test::graph::Constant(&graph, a_tensor); |
| node_names[0] = a->name(); |
| |
| Tensor b_tensor(DT_FLOAT, TensorShape({2, 1})); |
| test::FillValues<float>(&b_tensor, {2, 1}); |
| Node* b = test::graph::Constant(&graph, b_tensor); |
| node_names[1] = b->name(); |
| |
| Node* c = test::graph::Matmul(&graph, a, b, false, false); |
| node_names[2] = c->name(); |
| |
| test::graph::ToGraphDef(&graph, graph_def); |
| } |
| |
| // Asserts that "val" is a single float tensor. The only float is |
| // "expected_val". |
| static void IsSingleFloatValue(const Tensor& val, float expected_val) { |
| ASSERT_EQ(val.dtype(), DT_FLOAT); |
| ASSERT_EQ(val.NumElements(), 1); |
| ASSERT_EQ(val.flat<float>()(0), expected_val); |
| } |
| |
| static SessionOptions Options(const string& target, int placement_period) { |
| SessionOptions options; |
| // NOTE(mrry): GrpcSession requires a grpc:// scheme prefix in the target |
| // string. |
| options.target = strings::StrCat("grpc://", target); |
| options.config.set_placement_period(placement_period); |
| options.config.mutable_graph_options() |
| ->mutable_optimizer_options() |
| ->set_opt_level(OptimizerOptions::L0); |
| return options; |
| } |
| |
| static Session* NewRemote(const SessionOptions& options) { |
| return CHECK_NOTNULL(NewSession(options)); |
| } |
| |
| TEST(GrpcSessionTest, BasicNonProtoAPI) { |
| GraphDef graph; |
| string node_names[3]; |
| // c = a * b |
| CreateGraphDef(&graph, node_names); |
| |
| std::unique_ptr<test::TestCluster> cluster; |
| TF_CHECK_OK(test::TestCluster::MakeTestCluster(Devices(1, 0), 2, &cluster)); |
| |
| std::unique_ptr<Session> session( |
| NewRemote(Options(cluster->targets()[0], 1))); |
| ASSERT_TRUE(session != nullptr); |
| |
| for (int iters = 0; iters < 25; ++iters) { |
| TF_CHECK_OK(session->Create(graph)); |
| { |
| // Just run to target node |
| std::vector<std::pair<string, Tensor>> inputs; |
| std::vector<string> targets = {node_names[2]}; |
| TF_CHECK_OK(session->Run(inputs, {}, targets, nullptr)); |
| } |
| { |
| // Run to a target node and a real tensor |
| std::vector<std::pair<string, Tensor>> inputs; |
| std::vector<string> names = {node_names[2] + ":0"}; |
| std::vector<string> targets = {node_names[1]}; |
| std::vector<Tensor> outputs; |
| TF_CHECK_OK(session->Run(inputs, names, targets, &outputs)); |
| ASSERT_TRUE(outputs[0].IsInitialized()); |
| ASSERT_EQ(4.0, outputs[0].flat<float>()(0)); |
| } |
| |
| TF_CHECK_OK(session->Close()); |
| } |
| } |
| |
| TEST(GrpcSessionTest, BasicCallable) { |
| GraphDef graph; |
| string node_names[3]; |
| // c = a * b |
| CreateGraphDef(&graph, node_names); |
| |
| std::unique_ptr<test::TestCluster> cluster; |
| TF_CHECK_OK(test::TestCluster::MakeTestCluster(Devices(1, 0), 2, &cluster)); |
| |
| std::unique_ptr<Session> session( |
| NewRemote(Options(cluster->targets()[0], 1))); |
| ASSERT_TRUE(session != nullptr); |
| |
| for (int iters = 0; iters < 25; ++iters) { |
| TF_CHECK_OK(session->Create(graph)); |
| { |
| // Just run to target node |
| CallableOptions opts; |
| opts.add_target(node_names[2]); |
| Session::CallableHandle handle; |
| TF_CHECK_OK(session->MakeCallable(opts, &handle)); |
| TF_CHECK_OK(session->RunCallable(handle, {}, nullptr, nullptr)); |
| TF_CHECK_OK(session->ReleaseCallable(handle)); |
| } |
| { |
| // Run to a target node and a real tensor |
| CallableOptions opts; |
| opts.add_target(node_names[1]); |
| opts.add_fetch(node_names[2] + ":0"); |
| Session::CallableHandle handle; |
| TF_CHECK_OK(session->MakeCallable(opts, &handle)); |
| std::vector<Tensor> outputs; |
| TF_CHECK_OK(session->RunCallable(handle, {}, &outputs, nullptr)); |
| ASSERT_EQ(1, outputs.size()); |
| ASSERT_TRUE(outputs[0].IsInitialized()); |
| ASSERT_EQ(4.0, outputs[0].flat<float>()(0)); |
| TF_CHECK_OK(session->ReleaseCallable(handle)); |
| } |
| |
| TF_CHECK_OK(session->Close()); |
| } |
| } |
| |
| TEST(GrpcSessionTest, CallableWithOnDeviceFeedsAndFetches) { |
| // Specifying feeds/fetch devices for remote sessions is not yet defined. |
| // Ensure that the error is graceful. |
| GraphDef graph; |
| string node_names[3]; |
| // c = a * b |
| CreateGraphDef(&graph, node_names); |
| |
| std::unique_ptr<test::TestCluster> cluster; |
| TF_CHECK_OK(test::TestCluster::MakeTestCluster(Devices(1, 0), 2, &cluster)); |
| |
| std::unique_ptr<Session> session( |
| NewRemote(Options(cluster->targets()[0], 1))); |
| ASSERT_TRUE(session != nullptr); |
| |
| TF_CHECK_OK(session->Create(graph)); |
| |
| std::vector<DeviceAttributes> devices; |
| TF_CHECK_OK(session->ListDevices(&devices)); |
| ASSERT_GT(devices.size(), 0); |
| const string device_name = devices.back().name(); |
| |
| CallableOptions opts; |
| const string fetch = node_names[2] + ":0"; |
| opts.add_fetch(fetch); |
| opts.mutable_fetch_devices()->insert({fetch, device_name}); |
| |
| Session::CallableHandle handle; |
| Status status = session->MakeCallable(opts, &handle); |
| EXPECT_EQ(error::UNIMPLEMENTED, status.code()); |
| TF_CHECK_OK(session->Close()); |
| } |
| |
| TEST(GrpcSessionTest, BasicNonProtoAPIConsistentOrder) { |
| GraphDef graph; |
| string node_names[3]; |
| // c = a * b |
| CreateGraphDef(&graph, node_names); |
| |
| std::unique_ptr<test::TestCluster> cluster; |
| TF_CHECK_OK(test::TestCluster::MakeTestCluster(Devices(1, 0), 2, &cluster)); |
| |
| std::unique_ptr<Session> session( |
| NewRemote(Options(cluster->targets()[0], 1))); |
| ASSERT_TRUE(session != nullptr); |
| ASSERT_TRUE(session->Create(graph).ok()); |
| |
| // Test that the order of the output names matches the order of the |
| // returned Tensors. |
| std::vector<std::pair<string, Tensor>> inputs; |
| std::vector<string> names = {node_names[2] + ":0", node_names[0] + ":0", |
| node_names[1] + ":0"}; |
| |
| std::vector<string> target_ops = {node_names[1]}; |
| std::vector<Tensor> outputs; |
| ASSERT_TRUE(session->Run(inputs, names, target_ops, &outputs).ok()); |
| ASSERT_TRUE(outputs[0].IsInitialized()); |
| ASSERT_EQ(4.0, outputs[0].flat<float>()(0)); |
| ASSERT_TRUE(outputs[1].IsInitialized()); |
| ASSERT_EQ(1.0, outputs[1].flat<float>()(0)); |
| ASSERT_TRUE(outputs[2].IsInitialized()); |
| ASSERT_EQ(2.0, outputs[2].flat<float>()(0)); |
| ASSERT_TRUE(session->Close().ok()); |
| } |
| |
| TEST(GrpcSessionTest, NonLocalWithFilters) { |
| GraphDef graph; |
| string node_names[3]; |
| // c = a * b |
| CreateGraphDef(&graph, node_names); |
| |
| std::unique_ptr<test::TestCluster> cluster; |
| TF_CHECK_OK(test::TestCluster::MakeTestCluster(Devices(1, 0), 2, &cluster)); |
| |
| SessionOptions options; |
| options.target = strings::StrCat("grpc://", cluster->targets()[0]); |
| options.config.add_device_filters(cluster->devices()[0].name()); |
| |
| std::unique_ptr<Session> session(NewRemote(options)); |
| ASSERT_TRUE(session != nullptr); |
| |
| { |
| GraphDef graph_copy(graph); |
| graph::SetDefaultDevice(cluster->devices()[0].name(), &graph_copy); |
| TF_CHECK_OK(session->Create(graph_copy)); |
| TF_CHECK_OK(session->Run({}, {}, {node_names[2]}, nullptr)); |
| TF_CHECK_OK(session->Close()); |
| } |
| { |
| GraphDef graph_copy(graph); |
| graph::SetDefaultDevice(cluster->devices()[1].name(), &graph_copy); |
| auto status = session->Create(graph_copy); |
| EXPECT_EQ(tensorflow::error::INVALID_ARGUMENT, status.code()); |
| } |
| } |
| |
| TEST(GrpcSessionTest, FetchMultipleTimes) { |
| GraphDef graph; |
| string node_names[3]; |
| CreateGraphDef(&graph, node_names); |
| |
| std::unique_ptr<test::TestCluster> cluster; |
| TF_CHECK_OK(test::TestCluster::MakeTestCluster(Devices(1, 0), 2, &cluster)); |
| |
| std::unique_ptr<Session> session( |
| NewRemote(Options(cluster->targets()[0], 1))); |
| ASSERT_TRUE(session != nullptr); |
| |
| TF_CHECK_OK(session->Create(graph)); |
| const std::vector<std::pair<string, Tensor>> inputs; |
| std::vector<Tensor> outputs; |
| |
| const string node = node_names[2] + ":0"; |
| TF_CHECK_OK(session->Run(inputs, {node, node}, {}, &outputs)); |
| EXPECT_EQ(2, outputs.size()); |
| for (int i = 0; i < outputs.size(); ++i) { |
| const Tensor& t = outputs[i]; |
| ASSERT_TRUE(t.IsInitialized()) << i; |
| ASSERT_EQ(4.0, t.flat<float>()(0)) << i; |
| } |
| TF_CHECK_OK(session->Close()); |
| } |
| |
| // A = [3 2; -1 0]; x = rand(2, 1); We want to compute the largest |
| // eigenvalue for A, which is 2.0. Iteratively, we do |
| // repeat x = y / y.norm(); y = A * x; end |
| // At the end, we expect "lambda" converges to 2.0. |
| void FindMaxEigen(const string& target) { |
| Graph graph(OpRegistry::Global()); |
| |
| Tensor a_tensor(DT_FLOAT, TensorShape({2, 2})); |
| // Store rows [3, 2] and [-1, 0] in row major format. |
| test::FillValues<float>(&a_tensor, {3, 2, -1, 0}); |
| Node* a = test::graph::Constant(&graph, a_tensor); |
| |
| // x is from the feed. |
| Tensor x_tensor(DT_FLOAT, TensorShape({2, 1})); |
| test::FillValues<float>(&x_tensor, {0, 0}); |
| Node* x = test::graph::Constant(&graph, x_tensor); |
| |
| // y = A * x |
| Node* y = test::graph::Matmul(&graph, a, x, false, false); |
| |
| // y2 = y.^2 |
| Node* y2 = test::graph::Unary(&graph, "Square", y); |
| |
| // const tensor for reduction |
| Tensor rdim_tensor(DT_INT32, TensorShape({})); |
| rdim_tensor.scalar<int32>()() = 0; |
| Node* rdim = test::graph::Constant(&graph, rdim_tensor); |
| |
| // y2_sum = sum(y2) |
| Node* y2_sum = test::graph::Reduce(&graph, "Sum", y2, rdim); |
| |
| // y_norm = sqrt(y2_sum) |
| Node* y_norm = test::graph::Unary(&graph, "Sqrt", y2_sum); |
| |
| // y_normalized = y ./ y_norm |
| Node* y_normalized = test::graph::Binary(&graph, "Div", y, y_norm); |
| |
| GraphDef def; |
| test::graph::ToGraphDef(&graph, &def); |
| |
| std::unique_ptr<Session> session(NewRemote(Options(target, 1))); |
| ASSERT_TRUE(session != nullptr); |
| TF_CHECK_OK(session->Create(def)); |
| |
| // Setup feeds and fetches. |
| float lambda; |
| Tensor feed_value(DT_FLOAT, TensorShape({2, 1})); |
| feed_value.matrix<float>()(0, 0) = -3.1415; |
| feed_value.matrix<float>()(1, 0) = +2.7183; |
| |
| for (int i = 0; i < 25; ++i) { |
| std::vector<Tensor> outputs; |
| TF_CHECK_OK(session->Run({{x->name(), feed_value}}, |
| {y->name(), y_normalized->name()}, {}, &outputs)); |
| const Tensor& y = outputs[0]; |
| const Tensor& y_normalized = outputs[1]; |
| // Print out lambda, x, and y. |
| CHECK_EQ(2, feed_value.NumElements()); |
| CHECK_EQ(2, y.NumElements()); |
| lambda = y.flat<float>()(0) / feed_value.flat<float>()(0); |
| printf("%06d lambda = %8.6f x = [%8.6f %8.6f] y = [%8.6f %8.6f]\n", i, |
| lambda, feed_value.flat<float>()(0), feed_value.flat<float>()(1), |
| y.flat<float>()(0), y.flat<float>()(1)); |
| // Copies y_normalized to *x. |
| feed_value = y_normalized; |
| } |
| EXPECT_NEAR(2.0, lambda, 1e-6); |
| } |
| |
| TEST(FindMaxEigenTest, RemoteDevice) { |
| std::unique_ptr<test::TestCluster> cluster; |
| TF_CHECK_OK(test::TestCluster::MakeTestCluster(Devices(1, 0), 2, &cluster)); |
| FindMaxEigen(cluster->targets()[0]); |
| } |
| |
| void SetDevice(GraphDef* graph, const string& name, const string& dev) { |
| for (int i = 0; i < graph->node_size(); ++i) { |
| if (graph->node(i).name() == name) { |
| graph->mutable_node(i)->set_device(dev); |
| return; |
| } |
| } |
| LOG(FATAL) << "Name '" << name << "' not found."; |
| } |
| |
| // TODO(b/32636929): This test fails 1/1000 times. Disable it while we |
| // figure out why. |
| TEST(GrpcSessionTest, DISABLED_MultiDevices) { |
| std::unique_ptr<test::TestCluster> cluster; |
| TF_CHECK_OK(test::TestCluster::MakeTestCluster(Devices(1, 0), 2, &cluster)); |
| |
| Graph graph(OpRegistry::Global()); |
| const int kSize = 1048576; |
| |
| // c = a * b = 2 * 3 * kSize |
| Tensor a_tensor(DT_FLOAT, TensorShape({1, kSize})); |
| Tensor b_tensor(DT_FLOAT, TensorShape({kSize, 1})); |
| for (int i = 0; i < kSize; ++i) { |
| a_tensor.flat<float>()(i) = 2; |
| b_tensor.flat<float>()(i) = 3; |
| } |
| Node* a = test::graph::Constant(&graph, a_tensor); |
| Node* b = test::graph::Constant(&graph, b_tensor); |
| Node* c = test::graph::Matmul(&graph, a, b, false, false); |
| |
| GraphDef def; |
| test::graph::ToGraphDef(&graph, &def); |
| |
| // In this test, we force each node (a, b, c) on every possible device. |
| // We test all possible cases. |
| for (const auto& a_dev : cluster->devices()) { |
| for (const auto& b_dev : cluster->devices()) { |
| for (const auto& c_dev : cluster->devices()) { |
| LOG(INFO) << "a: " << a_dev.name() << " b: " << b_dev.name() |
| << " c: " << c_dev.name(); |
| |
| SetDevice(&def, a->name(), a_dev.name()); |
| SetDevice(&def, b->name(), b_dev.name()); |
| SetDevice(&def, c->name(), c_dev.name()); |
| |
| std::unique_ptr<Session> session( |
| NewRemote(Options(cluster->targets()[0], 1000))); |
| ASSERT_TRUE(session != nullptr); |
| TF_CHECK_OK(session->Create(def)); |
| { |
| std::vector<Tensor> outputs; |
| RunOptions options; |
| options.set_trace_level(RunOptions::FULL_TRACE); |
| RunMetadata metadata; |
| TF_CHECK_OK( |
| session->Run(options, {}, {c->name()}, {}, &outputs, &metadata)); |
| ASSERT_EQ(1, outputs.size()); |
| IsSingleFloatValue(outputs[0], 6.0 * kSize); |
| |
| const StepStats& ss = metadata.step_stats(); |
| // NOTE(mrry): We only assert that `c` is placed correctly, |
| // because the current placement algorithm will move its |
| // inputs to be colocated with it, when it is the sole |
| // consumer. |
| bool c_placed_correctly = false; |
| for (const auto& dev : ss.dev_stats()) { |
| for (const auto& node : dev.node_stats()) { |
| if (node.node_name() == c->name() && |
| dev.device() == c_dev.name()) { |
| c_placed_correctly = true; |
| } |
| } |
| } |
| ASSERT_TRUE(c_placed_correctly); |
| } |
| TF_CHECK_OK(session->Close()); |
| } |
| } |
| } |
| } |
| |
| TEST(GrpcSessionTest, LargeTensorSend) { |
| std::unique_ptr<test::TestCluster> cluster; |
| TF_CHECK_OK(test::TestCluster::MakeTestCluster(Devices(1, 0), 2, &cluster)); |
| |
| Graph graph(OpRegistry::Global()); |
| |
| // Define a 3 GB fill result. |
| Tensor fill_shape_tensor(DT_INT32, TensorShape({4})); |
| fill_shape_tensor.vec<int32>()(0) = 1; |
| fill_shape_tensor.vec<int32>()(1) = 256; |
| fill_shape_tensor.vec<int32>()(2) = 1024; |
| fill_shape_tensor.vec<int32>()(3) = 1024; |
| Node* fill_shape_node = test::graph::Constant(&graph, fill_shape_tensor); |
| |
| Tensor fill_val_tensor(DT_FLOAT, TensorShape({})); |
| fill_val_tensor.flat<float>()(0) = 1.0; |
| Node* fill_val_node = test::graph::Constant(&graph, fill_val_tensor); |
| |
| Node* fill_node = |
| test::graph::Binary(&graph, "Fill", fill_shape_node, fill_val_node); |
| |
| Tensor max_axes_tensor(DT_INT32, TensorShape({4})); |
| max_axes_tensor.vec<int32>()(0) = 0; |
| max_axes_tensor.vec<int32>()(1) = 1; |
| max_axes_tensor.vec<int32>()(2) = 2; |
| max_axes_tensor.vec<int32>()(3) = 3; |
| Node* max_axes_node = test::graph::Constant(&graph, max_axes_tensor); |
| Node* max_node = test::graph::Reduce(&graph, "Max", fill_node, max_axes_node); |
| |
| GraphDef def; |
| test::graph::ToGraphDef(&graph, &def); |
| |
| SetDevice(&def, fill_node->name(), cluster->devices()[0].name()); |
| SetDevice(&def, fill_node->name(), cluster->devices()[1].name()); |
| |
| std::unique_ptr<Session> session( |
| NewRemote(Options(cluster->targets()[0], 1000))); |
| ASSERT_TRUE(session != nullptr); |
| TF_CHECK_OK(session->Create(def)); |
| { |
| std::vector<Tensor> outputs; |
| TF_CHECK_OK(session->Run({}, {max_node->name()}, {}, &outputs)); |
| ASSERT_EQ(1, outputs.size()); |
| IsSingleFloatValue(outputs[0], 1.0); |
| } |
| TF_CHECK_OK(session->Close()); |
| } |
| |
| TEST(GrpcSessionTest, MultiDevices_String) { |
| std::unique_ptr<test::TestCluster> cluster; |
| TF_CHECK_OK(test::TestCluster::MakeTestCluster(Devices(1, 1), 2, &cluster)); |
| std::unique_ptr<Session> session( |
| NewRemote(Options(cluster->targets()[0], 1000))); |
| ASSERT_TRUE(session != nullptr); |
| |
| // b = a |
| Graph graph(OpRegistry::Global()); |
| Tensor a_tensor(DT_STRING, TensorShape({2, 2})); |
| for (int i = 0; i < 4; ++i) { |
| a_tensor.flat<tstring>()(i) = "hello, world"; |
| } |
| Node* a = test::graph::Constant(&graph, a_tensor); |
| Node* b = test::graph::Identity(&graph, a); |
| |
| GraphDef def; |
| test::graph::ToGraphDef(&graph, &def); |
| |
| // In this test, we force each node (a, b) on every possible device. |
| // We test all possible cases. |
| for (const auto& a_dev : cluster->devices()) { |
| for (const auto& b_dev : cluster->devices()) { |
| LOG(INFO) << "a: " << a_dev.name() << " b: " << b_dev.name(); |
| SetDevice(&def, a->name(), a_dev.name()); |
| SetDevice(&def, b->name(), b_dev.name()); |
| |
| Status s = session->Create(def); |
| if (s.ok()) { |
| std::vector<Tensor> outputs; |
| TF_CHECK_OK(session->Run({}, {b->name()}, {}, &outputs)); |
| ASSERT_EQ(1, outputs.size()); |
| ASSERT_EQ(outputs[0].dtype(), DT_STRING); |
| ASSERT_EQ(outputs[0].NumElements(), 4); |
| for (int i = 0; i < outputs[0].NumElements(); ++i) { |
| EXPECT_EQ(outputs[0].flat<tstring>()(i), "hello, world"); |
| } |
| TF_CHECK_OK(session->Close()); |
| } else { |
| LOG(ERROR) << "Error: " << s; |
| ASSERT_TRUE((a_dev.device_type() == DEVICE_GPU) || |
| (b_dev.device_type() == DEVICE_GPU)); |
| ASSERT_FALSE(s.ok()); |
| } |
| } |
| } |
| } |
| |
| TEST(GrpcSessionTest, SendRecv_Node_Naming) { |
| std::unique_ptr<test::TestCluster> cluster; |
| TF_CHECK_OK(test::TestCluster::MakeTestCluster(Devices(1, 0), 3, &cluster)); |
| std::unique_ptr<Session> session( |
| NewRemote(Options(cluster->targets()[0], 1))); |
| ASSERT_TRUE(session != nullptr); |
| |
| // This test case needs at least 3 devices. |
| CHECK_GE(cluster->devices().size(), 3); |
| const DeviceAttributes& src = cluster->devices()[0]; |
| const DeviceAttributes& dst0 = cluster->devices()[1]; |
| const DeviceAttributes& dst1 = cluster->devices()[2]; |
| LOG(INFO) << "src = " << src.name() << " dst0 = " << dst0.name() |
| << " dst1 = " << dst1.name(); |
| |
| // Within the same session, we compute two subgraphs: |
| // 1) a on 'src' sends to b on 'dst0'; |
| // 2) a on 'src' sends to c on 'dst1'. |
| Graph graph(OpRegistry::Global()); |
| Tensor a_tensor(DT_FLOAT, TensorShape({1, 1})); |
| a_tensor.flat<float>()(0) = 100; |
| Node* a = test::graph::Constant(&graph, a_tensor); |
| Node* b = test::graph::Identity(&graph, a); |
| Node* c = test::graph::Identity(&graph, a); |
| |
| GraphDef def; |
| test::graph::ToGraphDef(&graph, &def); |
| |
| // The base graph have a, b, c, assigned to devices explicitly. |
| SetDevice(&def, a->name(), src.name()); |
| SetDevice(&def, b->name(), dst0.name()); |
| SetDevice(&def, c->name(), dst1.name()); |
| TF_CHECK_OK(session->Create(def)); |
| |
| // Run subgraph a -> b, and fetch b. |
| { |
| std::vector<Tensor> outputs; |
| TF_CHECK_OK(session->Run({}, {b->name()}, {}, &outputs)); |
| ASSERT_EQ(1, outputs.size()); |
| IsSingleFloatValue(outputs[0], 100); |
| } |
| |
| // Run subgraph a -> c, and fetch c. |
| { |
| std::vector<Tensor> outputs; |
| TF_CHECK_OK(session->Run({}, {c->name()}, {}, &outputs)); |
| ASSERT_EQ(1, outputs.size()); |
| IsSingleFloatValue(outputs[0], 100); |
| } |
| |
| TF_CHECK_OK(session->Close()); |
| } |
| |
| TEST(GrpcSessionTest, Error) { |
| std::unique_ptr<test::TestCluster> cluster; |
| TF_CHECK_OK(test::TestCluster::MakeTestCluster(Devices(1, 0), 2, &cluster)); |
| const string& master = cluster->targets()[0]; |
| const string& dev_a = cluster->devices()[0].name(); |
| const string& dev_b = cluster->devices()[1].name(); |
| LOG(INFO) << "master " << master << "dev_a " << dev_a << "dev_b " << dev_b; |
| GraphDef gdef; |
| std::vector<string> fetches; |
| { |
| Graph g(OpRegistry::Global()); |
| |
| // a2 = a + error(a) |
| // |
| // Subgraph for "a" fails. The master will cancel the subgraph for |
| // "b" and then returns the Session::Run. |
| auto a = test::graph::Constant(&g, Tensor()); |
| a->set_assigned_device_name(dev_a); |
| auto a_err = test::graph::Error(&g, a, "fantasia!"); |
| a_err->set_assigned_device_name(dev_a); |
| auto a2 = test::graph::Add(&g, a, a_err); |
| a2->set_assigned_device_name(dev_a); |
| fetches.push_back(a2->name()); |
| |
| // b2 = b + delay(b) |
| // |
| // Subgraph for "b" sleeps at the node "b_delay". When the sleep |
| // finishes, the subgraph "b" will continue execution till it |
| // notices that it is canceled. Meanwhile, subgraph's executor |
| // and its related state (registered ops) should still be alive. |
| auto b = test::graph::Constant(&g, Tensor()); |
| b->set_assigned_device_name(dev_b); |
| auto b_delay = test::graph::Delay(&g, b, Microseconds(1000000)); |
| b_delay->set_assigned_device_name(dev_b); |
| auto b2 = test::graph::Add(&g, b, b_delay); |
| b2->set_assigned_device_name(dev_b); |
| fetches.push_back(b2->name()); |
| test::graph::ToGraphDef(&g, &gdef); |
| } |
| std::unique_ptr<Session> session(NewRemote(Options(master, 1))); |
| ASSERT_TRUE(session != nullptr); |
| |
| TF_CHECK_OK(session->Create(gdef)); |
| { |
| Status status = session->Run({}, fetches, {}, nullptr); |
| EXPECT_FALSE(status.ok()); |
| EXPECT_NE(status.ToString().find("fantasia!"), string::npos); |
| } |
| // session->Close() shall clean up all states related to the session-> |
| // E.g., deregisters subgraph with workers, etc. |
| TF_CHECK_OK(session->Close()); |
| |
| // Sleep a bit so that most of asynchronous works finishes before |
| // the test process finishes. |
| Env::Default()->SleepForMicroseconds(2000000); |
| } |
| |
| TEST(GrpcSessionTest, ErrorStatusLog) { |
| std::unique_ptr<test::TestCluster> cluster; |
| TF_CHECK_OK(test::TestCluster::MakeTestCluster(Devices(1, 0), 2, &cluster)); |
| const string& master = cluster->targets()[0]; |
| const string& dev_a = cluster->devices()[0].name(); |
| const string& dev_b = cluster->devices()[1].name(); |
| LOG(INFO) << "master " << master << "dev_a " << dev_a << "dev_b " << dev_b; |
| GraphDef gdef; |
| std::vector<string> fetches; |
| { |
| Graph g(OpRegistry::Global()); |
| |
| // a2 = a + error(a) |
| // |
| // Subgraph for "a" fails. The master will cancel the subgraph for |
| // "b" and then returns the Session::Run. |
| auto a = test::graph::Constant(&g, Tensor()); |
| a->set_assigned_device_name(dev_a); |
| auto a_err = test::graph::Error(&g, a, "fantasia!", true); |
| a_err->set_assigned_device_name(dev_a); |
| auto a2 = test::graph::Add(&g, a, a_err); |
| a2->set_assigned_device_name(dev_a); |
| fetches.push_back(a2->name()); |
| |
| // b2 = b + delay(b) |
| // |
| // Subgraph for "b" sleeps at the node "b_delay". When the sleep |
| // finishes, the subgraph "b" will continue execution till it |
| // notices that it is canceled. Meanwhile, subgraph's executor |
| // and its related state (registered ops) should still be alive. |
| auto b = test::graph::Constant(&g, Tensor()); |
| b->set_assigned_device_name(dev_b); |
| auto b_delay = test::graph::Delay(&g, b, Microseconds(1000000)); |
| b_delay->set_assigned_device_name(dev_b); |
| auto b2 = test::graph::Add(&g, b, b_delay); |
| b2->set_assigned_device_name(dev_b); |
| fetches.push_back(b2->name()); |
| g.ToGraphDef(&gdef); |
| } |
| std::unique_ptr<Session> session(NewRemote(Options(master, 1))); |
| ASSERT_TRUE(session != nullptr); |
| |
| TF_CHECK_OK(session->Create(gdef)); |
| { |
| Status status = session->Run({}, fetches, {}, nullptr); |
| EXPECT_FALSE(status.ok()); |
| std::cerr << status << "\n"; |
| EXPECT_NE(status.ToString().find("fantasia!"), string::npos); |
| EXPECT_NE(status.ToString().find("ErrorOp: fantasia!"), string::npos); |
| } |
| // session->Close() shall clean up all states related to the session-> |
| // E.g., deregisters subgraph with workers, etc. |
| TF_CHECK_OK(session->Close()); |
| |
| // Sleep a bit so that most of asynchronous works finishes before |
| // the test process finishes. |
| Env::Default()->SleepForMicroseconds(2000000); |
| } |
| |
| TEST(GrpcSessionTest, LongErrorMessage) { |
| std::unique_ptr<test::TestCluster> cluster; |
| TF_CHECK_OK(test::TestCluster::MakeTestCluster(Devices(1, 0), 2, &cluster)); |
| const string& master = cluster->targets()[0]; |
| const string& dev_a = cluster->devices()[0].name(); |
| const string& dev_b = cluster->devices()[1].name(); |
| LOG(INFO) << "master " << master << "dev_a " << dev_a << "dev_b " << dev_b; |
| GraphDef gdef; |
| std::vector<string> fetches; |
| { |
| Graph g(OpRegistry::Global()); |
| |
| // a2 = a + error(a) |
| // |
| // Subgraph for "a" fails. The master will cancel the subgraph for |
| // "b" and then returns the Session::Run. |
| auto a = test::graph::Constant(&g, Tensor()); |
| a->set_assigned_device_name(dev_a); |
| std::vector<char> long_string_buffer(1024 * 1024, 'x'); |
| StringPiece long_string(long_string_buffer.data(), 1024 * 1024); |
| string name = strings::StrCat(long_string, "fantasia!"); |
| auto a_err = test::graph::Error(&g, a, name); |
| a_err->set_assigned_device_name(dev_a); |
| auto a2 = test::graph::Add(&g, a, a_err); |
| a2->set_assigned_device_name(dev_a); |
| fetches.push_back(a2->name()); |
| |
| // b2 = b + delay(b) |
| // |
| // Subgraph for "b" sleeps at the node "b_delay". When the sleep |
| // finishes, the subgraph "b" will continue execution till it |
| // notices that it is canceled. Meanwhile, subgraph's executor |
| // and its related state (registered ops) should still be alive. |
| auto b = test::graph::Constant(&g, Tensor()); |
| b->set_assigned_device_name(dev_b); |
| auto b_delay = test::graph::Delay(&g, b, Microseconds(1000000)); |
| b_delay->set_assigned_device_name(dev_b); |
| auto b2 = test::graph::Add(&g, b, b_delay); |
| b2->set_assigned_device_name(dev_b); |
| fetches.push_back(b2->name()); |
| test::graph::ToGraphDef(&g, &gdef); |
| } |
| std::unique_ptr<Session> session(NewRemote(Options(master, 1))); |
| ASSERT_TRUE(session != nullptr); |
| |
| TF_CHECK_OK(session->Create(gdef)); |
| { |
| Status status = session->Run({}, fetches, {}, nullptr); |
| EXPECT_FALSE(status.ok()); |
| EXPECT_NE(status.ToString().find("fantasia!"), string::npos); |
| } |
| // session->Close() shall clean up all states related to the session-> |
| // E.g., deregisters subgraph with workers, etc. |
| TF_CHECK_OK(session->Close()); |
| |
| // Sleep a bit so that most of asynchronous works finishes before |
| // the test process finishes. |
| Env::Default()->SleepForMicroseconds(2000000); |
| } |
| |
| TEST(SessionTest, SharedVar) { |
| std::unique_ptr<test::TestCluster> cluster; |
| TF_CHECK_OK(test::TestCluster::MakeTestCluster(Devices(1, 0), 1, &cluster)); |
| const string master = cluster->targets()[0]; |
| CHECK_EQ(cluster->devices().size(), 1); |
| |
| GraphDef gdef; |
| string init_name; |
| string inc_name; |
| string get_name; |
| { |
| Graph g(OpRegistry::Global()); |
| Tensor one(DT_FLOAT, TensorShape({})); |
| one.scalar<float>()() = 1.0; |
| Node* var = test::graph::Var(&g, DT_FLOAT, one.shape()); |
| Node* init = test::graph::Assign(&g, var, test::graph::Constant(&g, one)); |
| init_name = init->name(); |
| Node* update = test::graph::Assign( |
| &g, var, test::graph::Add(&g, var, test::graph::Constant(&g, one))); |
| inc_name = update->name(); |
| get_name = var->name(); |
| test::graph::ToGraphDef(&g, &gdef); |
| } |
| |
| // Init a variable |
| { |
| Session* sess = NewRemote(Options(master, 1)); |
| TF_CHECK_OK(sess->Create(gdef)); |
| std::vector<std::pair<string, Tensor>> inp; |
| TF_CHECK_OK(sess->Run(inp, {}, {init_name}, nullptr)); |
| TF_CHECK_OK(sess->Close()); |
| delete sess; |
| } |
| |
| for (int rep = 1; rep < 10; ++rep) { |
| // Update a variable |
| { |
| Session* sess = NewRemote(Options(master, 1)); |
| TF_CHECK_OK(sess->Create(gdef)); |
| std::vector<std::pair<string, Tensor>> inp; |
| TF_CHECK_OK(sess->Run(inp, {}, {inc_name}, nullptr)); |
| TF_CHECK_OK(sess->Close()); |
| delete sess; |
| } |
| |
| // Gets the variable's value. |
| { |
| Session* sess = NewRemote(Options(master, 1)); |
| TF_CHECK_OK(sess->Create(gdef)); |
| std::vector<std::pair<string, Tensor>> inp; |
| std::vector<Tensor> ret; |
| TF_CHECK_OK(sess->Run(inp, {get_name}, {}, &ret)); |
| ASSERT_EQ(ret.size(), 1); |
| EXPECT_EQ(ret[0].scalar<float>()(), 1.0 * (1 + rep)); |
| TF_CHECK_OK(sess->Close()); |
| delete sess; |
| } |
| } |
| } |
| |
| void CreateInvalidGraph(const string& graph_def_ascii, |
| const string& error_substring) { |
| GraphDef graph; |
| CHECK(protobuf::TextFormat::ParseFromString(graph_def_ascii, &graph)); |
| |
| std::unique_ptr<test::TestCluster> cluster; |
| TF_CHECK_OK(test::TestCluster::MakeTestCluster(Devices(1, 0), 2, &cluster)); |
| |
| std::unique_ptr<Session> session( |
| NewRemote(Options(cluster->targets()[0], 1))); |
| Status s = session->Create(graph); |
| |
| ASSERT_FALSE(s.ok()); |
| EXPECT_NE(s.error_message().find(error_substring), string::npos); |
| } |
| |
| TEST(SessionTest, InvalidOpName) { |
| CreateInvalidGraph(R"( |
| node { |
| name: 'a:b' op: 'Const' |
| attr { key: 'dtype' value { type: DT_FLOAT } } |
| attr { key: 'value' value { |
| tensor { dtype: DT_FLOAT tensor_shape { dim [{size:1}, {size:1}] } |
| float_val: [100] } |
| } } |
| } |
| )", |
| "Illegal op name"); |
| |
| CreateInvalidGraph(R"( |
| node { |
| name: 'a:0' op: 'Const' |
| attr { key: 'dtype' value { type: DT_FLOAT } } |
| attr { key: 'value' value { |
| tensor { dtype: DT_FLOAT tensor_shape { dim [{size:1}, {size:1}] } |
| float_val: [100] } |
| } } |
| } |
| )", |
| "Illegal op name"); |
| |
| CreateInvalidGraph(R"( |
| node { |
| name: '_a' op: 'Const' |
| attr { key: 'dtype' value { type: DT_FLOAT } } |
| attr { key: 'value' value { |
| tensor { dtype: DT_FLOAT tensor_shape { dim [{size:1}, {size:1}] } |
| float_val: [100] } |
| } } |
| } |
| )", |
| "Illegal op name"); |
| } |
| |
| // Create() is expected to reject graphs in which node 'b' names its first |
| // input 'a:first', '_a', '_a:0' or 'a:01', reporting "Illegal op input name" |
| // in each case. |
| TEST(SessionTest, InvalidOpInputName) { |
| // One two-node graph per rejected input name; the graphs differ only in the |
| // first `input` of node 'b'. |
| const char* const kBadInputGraphs[] = { |
| R"( |
| node { |
| name: 'a' op: 'const' |
| attr { key: 'dtype' value { type: DT_FLOAT } } |
| attr { key: 'value' value { |
| tensor { dtype: DT_FLOAT tensor_shape { dim [{size:1}, {size:1}] } |
| float_val: [100] } |
| } } |
| } |
| node { |
| name:'b' op:'MatMul' input:'a:first' input:'a' |
| attr { key: 'T' value { type: DT_FLOAT } } |
| attr { key: 'transpose_a' value { b: false } } |
| attr { key: 'transpose_b' value { b: false } } |
| attr { key: '_kernel' value { s: 'eigen' } } |
| } |
| )", |
| R"( |
| node { |
| name: 'a' op: 'const' |
| attr { key: 'dtype' value { type: DT_FLOAT } } |
| attr { key: 'value' value { |
| tensor { dtype: DT_FLOAT tensor_shape { dim [{size:1}, {size:1}] } |
| float_val: [100] } |
| } } |
| } |
| node { |
| name:'b' op:'MatMul' input:'_a' input:'a' |
| attr { key: 'T' value { type: DT_FLOAT } } |
| attr { key: 'transpose_a' value { b: false } } |
| attr { key: 'transpose_b' value { b: false } } |
| attr { key: '_kernel' value { s: 'eigen' } } |
| } |
| )", |
| R"( |
| node { |
| name: 'a' op: 'const' |
| attr { key: 'dtype' value { type: DT_FLOAT } } |
| attr { key: 'value' value { |
| tensor { dtype: DT_FLOAT tensor_shape { dim [{size:1}, {size:1}] } |
| float_val: [100] } |
| } } |
| } |
| node { |
| name:'b' op:'MatMul' input:'_a:0' input:'a' |
| attr { key: 'T' value { type: DT_FLOAT } } |
| attr { key: 'transpose_a' value { b: false } } |
| attr { key: 'transpose_b' value { b: false } } |
| attr { key: '_kernel' value { s: 'eigen' } } |
| } |
| )", |
| R"( |
| node { |
| name: 'a' op: 'const' |
| attr { key: 'dtype' value { type: DT_FLOAT } } |
| attr { key: 'value' value { |
| tensor { dtype: DT_FLOAT tensor_shape { dim [{size:1}, {size:1}] } |
| float_val: [100] } |
| } } |
| } |
| node { |
| name:'b' op:'MatMul' input:'a:01' input:'a' |
| attr { key: 'T' value { type: DT_FLOAT } } |
| attr { key: 'transpose_a' value { b: false } } |
| attr { key: 'transpose_b' value { b: false } } |
| attr { key: '_kernel' value { s: 'eigen' } } |
| } |
| )"}; |
| |
| // Each graph gets its own cluster and session inside CreateInvalidGraph(). |
| for (const char* graph_text : kBadInputGraphs) { |
| CreateInvalidGraph(graph_text, "Illegal op input name"); |
| } |
| } |
| |
| // Exercises validation in Session::Extend() on top of a one-node graph: an |
| // extension with a bad input name is rejected, a well-formed extension is |
| // accepted, and extending again with the same node name is rejected. |
| TEST(SessionTest, ExtendValidation) { |
| GraphDef base_graph; |
| bool parsed = protobuf::TextFormat::ParseFromString(R"( |
| node { |
| name: 'a' op: 'Const' |
| attr { key: 'dtype' value { type: DT_FLOAT } } |
| attr { key: 'value' value { |
| tensor { dtype: DT_FLOAT tensor_shape { dim [{size:1}, {size:1}] } |
| float_val: [100] } |
| } } |
| } |
| )", |
| &base_graph); |
| // NOTE(mrry): The parse result is checked on a separate line to avoid a |
| // compilation error in open-source (due to a multi-line string in a macro |
| // argument). |
| ASSERT_TRUE(parsed); |
| |
| std::unique_ptr<test::TestCluster> test_cluster; |
| TF_CHECK_OK( |
| test::TestCluster::MakeTestCluster(Devices(1, 0), 2, &test_cluster)); |
| |
| const string& master_target = test_cluster->targets()[0]; |
| std::unique_ptr<Session> remote(NewRemote(Options(master_target, 1))); |
| TF_CHECK_OK(remote->Create(base_graph)); |
| |
| // 1. Fail with an unknown input name. |
| GraphDef extension_graph; |
| parsed = protobuf::TextFormat::ParseFromString(R"( |
| node { |
| name:'b' op:'MatMul' input:'a:first' input:'a' |
| attr { key: 'T' value { type: DT_FLOAT } } |
| attr { key: 'transpose_a' value { b: false } } |
| attr { key: 'transpose_b' value { b: false } } |
| attr { key: '_kernel' value { s: 'eigen' } } |
| } |
| )", |
| &extension_graph); |
| ASSERT_TRUE(parsed); |
| |
| Status extend_status = remote->Extend(extension_graph); |
| ASSERT_FALSE(extend_status.ok()); |
| EXPECT_NE(extend_status.error_message().find("Illegal op input name"), |
| string::npos); |
| |
| // 2. Succeed with a valid node. |
| parsed = protobuf::TextFormat::ParseFromString(R"( |
| node { |
| name:'b' op:'MatMul' input:'a' input:'a' |
| attr { key: 'T' value { type: DT_FLOAT } } |
| attr { key: 'transpose_a' value { b: false } } |
| attr { key: 'transpose_b' value { b: false } } |
| attr { key: '_kernel' value { s: 'eigen' } } |
| } |
| )", |
| &extension_graph); |
| ASSERT_TRUE(parsed); |
| TF_CHECK_OK(remote->Extend(extension_graph)); |
| |
| // 3. Fail with a duplicate node: 'b' was already added in step 2. |
| parsed = protobuf::TextFormat::ParseFromString(R"( |
| node { |
| name:'b' op:'MatMul' input:'a' input:'a' |
| attr { key: 'T' value { type: DT_FLOAT } } |
| attr { key: 'transpose_a' value { b: false } } |
| attr { key: 'transpose_b' value { b: false } } |
| attr { key: '_kernel' value { s: 'eigen' } } |
| } |
| )", |
| &extension_graph); |
| ASSERT_TRUE(parsed); |
| extend_status = remote->Extend(extension_graph); |
| ASSERT_FALSE(extend_status.ok()); |
| EXPECT_NE(extend_status.error_message().find( |
| "'b', which was created by a previous call"), |
| string::npos); |
| } |
| // Tests that Create() with "operation_timeout_in_ms" set times out. |
| TEST(SessionTest, CreateTimeoutWithSessionOptions) { |
| // Creates a RemoteSession with "operation_timeout_in_ms" set to 100. |
| SessionOptions options = Options("example.org:2222", 1); |
| options.config.set_operation_timeout_in_ms(100); |
| std::unique_ptr<Session> session(NewRemote(options)); |
| |
| // Creates a long running op. |
| Graph graph(OpRegistry::Global()); |
| Node* b = test::graph::Constant(&graph, Tensor()); |
| test::graph::Delay(&graph, b, Microseconds(1000000)); |
| GraphDef gdef; |
| test::graph::ToGraphDef(&graph, &gdef); |
| Status status = session->Create(gdef); |
| // Either error is possible, depending on the environment. |
| EXPECT_TRUE(error::DEADLINE_EXCEEDED == status.code() || |
| error::UNAVAILABLE == status.code()); |
| } |
| |
| // Tests that Create() with "timeout_in_ms" in RunOptions set times out. |
| TEST(SessionTest, CreateTimeoutWithRunOptions) { |
| SessionOptions options = Options("example.org:2222", 1); |
| std::unique_ptr<Session> session(NewRemote(options)); |
| |
| // Creates a long running op. |
| Graph graph(OpRegistry::Global()); |
| Node* b = test::graph::Constant(&graph, Tensor()); |
| test::graph::Delay(&graph, b, Microseconds(1000000)); |
| GraphDef gdef; |
| test::graph::ToGraphDef(&graph, &gdef); |
| RunOptions run_options; |
| // Sets RunOption timeout_in_ms to 20. |
| run_options.set_timeout_in_ms(20); |
| Status status = session->Create(run_options, gdef); |
| // Either error is possible, depending on the environment. |
| EXPECT_TRUE(error::DEADLINE_EXCEEDED == status.code() || |
| error::UNAVAILABLE == status.code()); |
| } |
| |
| // Tests that Run() times out when "operation_timeout_in_ms" is set in the |
| // session config. |
| TEST(SessionTest, RunTimeoutWithSessionOptions) { |
| std::unique_ptr<test::TestCluster> test_cluster; |
| TF_CHECK_OK( |
| test::TestCluster::MakeTestCluster(Devices(1, 0), 1, &test_cluster)); |
| |
| // Creates a RemoteSession with "operation_timeout_in_ms" set to 1. |
| SessionOptions session_options = Options(test_cluster->targets()[0], 100); |
| session_options.config.set_operation_timeout_in_ms(1); |
| std::unique_ptr<Session> remote(NewRemote(session_options)); |
| |
| // Long-running graph: a default-constructed constant feeding a Delay op of |
| // 2000000 microseconds. |
| Graph g(OpRegistry::Global()); |
| Node* source = test::graph::Constant(&g, Tensor()); |
| Node* delay_node = test::graph::Delay(&g, source, Microseconds(2000000)); |
| GraphDef graph_def; |
| test::graph::ToGraphDef(&g, &graph_def); |
| RunOptions create_options; |
| TF_CHECK_OK(remote->Create(create_options, graph_def)); |
| |
| // Run the delay op as a target only (no feeds, no fetches, no outputs) and |
| // expect the call to time out. |
| std::vector<std::pair<string, Tensor>> feeds; |
| Status run_status = remote->Run(feeds, {}, {delay_node->name()}, nullptr); |
| |
| // TODO(sherrym): Due to potentially a GRPC bug, we sometimes get |
| // GRPC_CHTTP2_INTERNAL_ERROR which is mapped to error::INTERNAL. |
| const bool deadline_exceeded = run_status.code() == error::DEADLINE_EXCEEDED; |
| const bool internal_error = run_status.code() == error::INTERNAL; |
| EXPECT_TRUE(deadline_exceeded || internal_error); |
| } |
| |
| // Tests that Run() times out when "timeout_in_ms" is supplied through |
| // RunOptions. |
| TEST(SessionTest, RunTimeoutWithRunOptions) { |
| std::unique_ptr<test::TestCluster> test_cluster; |
| TF_CHECK_OK( |
| test::TestCluster::MakeTestCluster(Devices(1, 0), 1, &test_cluster)); |
| SessionOptions session_options = Options(test_cluster->targets()[0], 1); |
| std::unique_ptr<Session> remote(NewRemote(session_options)); |
| |
| // Long-running graph: a default-constructed constant feeding a Delay op of |
| // 1000000 microseconds. |
| Graph g(OpRegistry::Global()); |
| Node* source = test::graph::Constant(&g, Tensor()); |
| Node* delay_node = test::graph::Delay(&g, source, Microseconds(1000000)); |
| GraphDef graph_def; |
| test::graph::ToGraphDef(&g, &graph_def); |
| TF_CHECK_OK(remote->Create(graph_def)); |
| |
| // Run the delay op as a target only, with a 100 ms per-call timeout, and |
| // expect the call to time out. |
| std::vector<std::pair<string, Tensor>> feeds; |
| RunOptions run_call_options; |
| run_call_options.set_timeout_in_ms(100); |
| Status run_status = remote->Run(run_call_options, feeds, {}, |
| {delay_node->name()}, nullptr, nullptr); |
| |
| // TODO(sherrym): Due to potentially a GRPC bug, we sometimes get |
| // GRPC_CHTTP2_INTERNAL_ERROR which is mapped to error::INTERNAL. |
| const bool deadline_exceeded = run_status.code() == error::DEADLINE_EXCEEDED; |
| const bool internal_error = run_status.code() == error::INTERNAL; |
| EXPECT_TRUE(deadline_exceeded || internal_error); |
| } |
| |
| // Fetches a single constant through a session whose RPC options request |
| // "deflate" compression at GRPC_COMPRESS_LEVEL_HIGH, then hands the fetched |
| // tensor to IsSingleFloatValue() together with the value that was stored. |
| TEST(SessionTest, TestCompression) { |
| std::unique_ptr<test::TestCluster> test_cluster; |
| TF_CHECK_OK( |
| test::TestCluster::MakeTestCluster(Devices(1, 0), 1, &test_cluster)); |
| |
| // Compression settings live in the session config's RPC options. |
| SessionOptions session_options = Options(test_cluster->targets()[0], 100); |
| RPCOptions* rpc = session_options.config.mutable_rpc_options(); |
| rpc->set_compression_algorithm("deflate"); |
| rpc->set_compression_level(GRPC_COMPRESS_LEVEL_HIGH); |
| |
| std::unique_ptr<Session> remote(NewRemote(session_options)); |
| |
| // Graph: one 1x1 float constant holding kTestValue. |
| static const float kTestValue = 409.1934f; |
| Graph g(OpRegistry::Global()); |
| Tensor value_tensor(DT_FLOAT, TensorShape({1, 1})); |
| value_tensor.flat<float>()(0) = kTestValue; |
| Node* const_node = test::graph::Constant(&g, value_tensor); |
| GraphDef graph_def; |
| g.ToGraphDef(&graph_def); |
| RunOptions create_options; |
| TF_CHECK_OK(remote->Create(create_options, graph_def)); |
| |
| // Fetch the constant back and check exactly one tensor was returned. |
| std::vector<std::pair<string, Tensor>> feeds; |
| std::vector<Tensor> fetched; |
| TF_CHECK_OK(remote->Run(feeds, {const_node->name()}, {}, &fetched)); |
| ASSERT_EQ(1, fetched.size()); |
| IsSingleFloatValue(fetched[0], kTestValue); |
| } |
| |
| TEST(GrpcSessionTest, ErrorAggregationTwoWorkersTwoErrors) { |
| std::unique_ptr<test::TestCluster> cluster; |
| TF_CHECK_OK(test::TestCluster::MakeTestCluster(Devices(1, 0), 2, &cluster)); |
| auto& devs = cluster->devices(); |
| const string& master = cluster->targets()[0]; |
| // worker 1 |
| const string w1_dev1 = devs[0].name(); |
| // worker 2 |
| const string w2_dev1 = devs[1].name(); |
| |
| LOG(INFO) << "master " << master << "w1_dev1 " << w1_dev1 << " w2_dev1 " |
| << w2_dev1; |
| GraphDef gdef; |
| std::vector<string> fetches; |
| { |
| // Set up a graph to test the error handling when two workers both reports |
| // original errors. The expected behavior is: |
| // 1. The master issues a cancel operation upon receiving the first error. |
| // 2. The master may receive one or both errors depending on the timing |
| // of the cancel operation. |
| // |
| // Set up: |
| // Set up two workers. Both worker reports error the master without any |
| // delay. |
| Graph g(OpRegistry::Global()); |
| |
| // Worker 1. a_err runs on w1_dev1 and a_delay runs on w2_dev2. |
| auto a = test::graph::Constant(&g, Tensor(1)); |
| a->set_assigned_device_name(w1_dev1); |
| |
| auto a_err = test::graph::Error(&g, a, "fantasia1!"); |
| a_err->set_assigned_device_name(w1_dev1); |
| |
| fetches.push_back(a_err->name()); |
| |
| // Worker 2. b2 depends on a_err and detects the error via the rendezvous |
| // from worker 1. |
| auto b = test::graph::Constant(&g, Tensor(1)); |
| b->set_assigned_device_name(w2_dev1); |
| |
| auto b_err = test::graph::Error(&g, b, "fantasia2!"); |
| b_err->set_assigned_device_name(w2_dev1); |
| |
| fetches.push_back(b_err->name()); |
| |
| g.ToGraphDef(&gdef); |
| } |
| |
| std::unique_ptr<Session> session(NewRemote(Options(master, 1))); |
| ASSERT_TRUE(session != nullptr); |
| |
| TF_CHECK_OK(session->Create(gdef)); |
| { |
| std::vector<Tensor> outputs; |
| Status status = session->Run({}, fetches, {}, &outputs); |
| LOG(INFO) << status; |
| EXPECT_FALSE(status.ok()); |
| // Status contains the error either worker1 or worker2. |
| EXPECT_NE(status.ToString().find("fantasia"), string::npos); |
| EXPECT_EQ(status.code(), error::Code::INTERNAL); |
| } |
| // session->Close() shall clean up all states related to the session-> |
| // E.g., deregisters subgraph with workers, etc. |
| TF_CHECK_OK(session->Close()); |
| |
| // Sleep a bit so that most of asynchronous works finishes before |
| // the test process finishes. |
| Env::Default()->SleepForMicroseconds(2000000); |
| } |
| |
| TEST(GrpcSessionTest, ErrorAggregationTwoWorkerRace) { |
| std::unique_ptr<test::TestCluster> cluster; |
| TF_CHECK_OK(test::TestCluster::MakeTestCluster(Devices(2, 0), 2, &cluster)); |
| auto& devs = cluster->devices(); |
| const string& master = cluster->targets()[0]; |
| // worker 1 |
| const string w1_dev1 = devs[0].name(); |
| const string w1_dev2 = devs[1].name(); |
| // worker 2 |
| const string w2_dev1 = devs[2].name(); |
| |
| LOG(INFO) << "master " << master << "w1_dev1 " << w1_dev1 << " w1_dev2 " |
| << w1_dev2 << " w2_dev1 " << w2_dev1; |
| GraphDef gdef; |
| std::vector<string> fetches; |
| std::vector<string> targets; |
| { |
| // Set up a graph to test the error handling when a derived error is |
| // reported to master before the original error. The expected behavior is: |
| // 1. the original error will be received by the master and reported |
| // to the user as the error status. |
| // |
| // Setup: |
| // |
| // Worker 1 generates the original error but it delays for 5 seconds before |
| // reporting to master. Worker 2 detects the error (via Rendezvous) and |
| // reports to the master before worker 1. |
| Graph g(OpRegistry::Global()); |
| |
| // Worker 1. a_err runs on w1_dev1 and a_delay runs on w2_dev2. |
| auto a = test::graph::Constant(&g, Tensor(1)); |
| a->set_assigned_device_name(w1_dev1); |
| |
| auto a_err = test::graph::Error(&g, a, "fantasia!"); |
| a_err->set_assigned_device_name(w1_dev1); |
| |
| auto a_delay = test::graph::Delay(&g, a, Microseconds(5000000)); |
| a_delay->set_assigned_device_name(w1_dev2); |
| |
| // We need to put a_delay in targets instead of fetches. Putting |
| // a_delay in fetches will cause the executor for w1_dev2 to report failure |
| // status as well as the Rendezvous is already poisoned by the a_err op in |
| // w1_dev1. |
| targets.push_back(a_delay->name()); |
| fetches.push_back(a_err->name()); |
| |
| // Worker 2. b2 depends on a_err and detects the error via the rendezvous |
| // from worker 1. |
| auto b = test::graph::Constant(&g, Tensor(3)); |
| b->set_assigned_device_name(w2_dev1); |
| auto b2 = test::graph::Add(&g, b, a_err); |
| b2->set_assigned_device_name(w2_dev1); |
| fetches.push_back(b2->name()); |
| |
| g.ToGraphDef(&gdef); |
| } |
| |
| std::unique_ptr<Session> session(NewRemote(Options(master, 1))); |
| ASSERT_TRUE(session != nullptr); |
| |
| TF_CHECK_OK(session->Create(gdef)); |
| { |
| std::vector<Tensor> outputs; |
| Status status = session->Run({}, fetches, targets, &outputs); |
| LOG(INFO) << status; |
| EXPECT_FALSE(status.ok()); |
| // assert status contains the root error |
| EXPECT_NE(status.ToString().find("fantasia!"), string::npos); |
| // assert status does not contain cancelled error. |
| EXPECT_EQ(status.ToString().find("Cancelled"), string::npos); |
| EXPECT_EQ(status.code(), error::Code::INTERNAL); |
| } |
| // session->Close() shall clean up all states related to the session-> |
| // E.g., deregisters subgraph with workers, etc. |
| TF_CHECK_OK(session->Close()); |
| |
| // Sleep a bit so that most of asynchronous works finishes before |
| // the test process finishes. |
| Env::Default()->SleepForMicroseconds(2000000); |
| } |
| |
| // Three-worker variant in which worker 3 only runs a delay op listed in |
| // targets. Run() must report the original "fantasia!" error with code |
| // INTERNAL and must not report "Cancelled" or "Aborted". |
| TEST(GrpcSessionTest, ErrorAggregationThreeWorkerRaceVariant1) { |
| std::unique_ptr<test::TestCluster> cluster; |
| TF_CHECK_OK(test::TestCluster::MakeTestCluster(Devices(2, 0), 3, &cluster)); |
| auto& devs = cluster->devices(); |
| // Fail cleanly instead of indexing out of range if the cluster exposes fewer |
| // devices than the fixed indices below assume. |
| ASSERT_GE(devs.size(), 5u); |
| const string& master = cluster->targets()[0]; |
| // worker 1 |
| const string w1_dev1 = devs[0].name(); |
| const string w1_dev2 = devs[1].name(); |
| // worker 2 |
| const string w2_dev1 = devs[2].name(); |
| // worker 3 |
| const string w3_dev1 = devs[4].name(); |
| |
| LOG(INFO) << "master " << master << " w1_dev1 " << w1_dev1 << " w1_dev2 " |
| << w1_dev2 << " w2_dev1 " << w2_dev1 << " w3_dev1 " << w3_dev1; |
| GraphDef gdef; |
| std::vector<string> fetches; |
| std::vector<string> targets; |
| { |
| // Set up a graph to test the error handling when a derived error is |
| // reported to master before the original error and a third worker is |
| // canceled by the master. The expected behavior is that |
| // 1. the original error will be received by the master, |
| // 2. the canceled error will be treated as a derived error. |
| // |
| // Setup: |
| // |
| // Worker 1 generates the original error but it delays for 5 seconds before |
| // reporting to master. Worker 2 detects the error (via Rendezvous) and |
| // reports to the master before worker 1. Worker 3 runs a delay op and will |
| // be canceled by the master. |
| Graph g(OpRegistry::Global()); |
| |
| // Worker 1. a_err runs on w1_dev1 and a_delay runs on w1_dev2. |
| auto a = test::graph::Constant(&g, Tensor(1)); |
| a->set_assigned_device_name(w1_dev1); |
| |
| auto a_err = test::graph::Error(&g, a, "fantasia!"); |
| a_err->set_assigned_device_name(w1_dev1); |
| |
| auto a_delay = test::graph::Delay(&g, a, Microseconds(5000000)); |
| a_delay->set_assigned_device_name(w1_dev2); |
| |
| // Putting a_delay in fetches will cause the executor for w1_dev2 to report |
| // failure status as well due to the use of SendOp, as the Rendezvous is |
| // already poisoned by the a_err op in w1_dev1. |
| targets.push_back(a_delay->name()); |
| fetches.push_back(a_err->name()); |
| |
| // Worker 2. b2 depends on a_err and detects the error via the rendezvous |
| // from worker 1. |
| auto b = test::graph::Constant(&g, Tensor(3)); |
| b->set_assigned_device_name(w2_dev1); |
| auto b2 = test::graph::Add(&g, b, a_err); |
| b2->set_assigned_device_name(w2_dev1); |
| fetches.push_back(b2->name()); |
| |
| // Worker 3. Runs only a delay op. This worker will be cancelled by master |
| // when the master receives the root error from Worker 1. |
| auto c = test::graph::Constant(&g, Tensor(3)); |
| c->set_assigned_device_name(w3_dev1); |
| auto c_delay = test::graph::Delay(&g, c, Microseconds(4000000)); |
| c_delay->set_assigned_device_name(w3_dev1); |
| |
| // Put c_delay in targets so that an implicit SendOp for c_delay to |
| // worker 1 is not generated. |
| targets.push_back(c_delay->name()); |
| |
| g.ToGraphDef(&gdef); |
| } |
| |
| std::unique_ptr<Session> session(NewRemote(Options(master, 1))); |
| ASSERT_TRUE(session != nullptr); |
| |
| TF_CHECK_OK(session->Create(gdef)); |
| { |
| std::vector<Tensor> outputs; |
| Status status = session->Run({}, fetches, targets, &outputs); |
| LOG(INFO) << status; |
| EXPECT_FALSE(status.ok()); |
| // assert status contains the root error |
| EXPECT_NE(status.ToString().find("fantasia!"), string::npos); |
| // assert status does not contain cancelled or aborted error. |
| EXPECT_EQ(status.ToString().find("Cancelled"), string::npos); |
| EXPECT_EQ(status.ToString().find("Aborted"), string::npos); |
| EXPECT_EQ(status.code(), error::Code::INTERNAL); |
| } |
| // session->Close() shall clean up all states related to the session. |
| // E.g., deregisters subgraph with workers, etc. |
| TF_CHECK_OK(session->Close()); |
| |
| // Sleep a bit so that most of the asynchronous work finishes before |
| // the test process finishes. |
| Env::Default()->SleepForMicroseconds(2000000); |
| } |
| |
| // Three-worker variant in which worker 3's delay op is listed in fetches |
| // rather than targets. Run() must report the original "fantasia!" error with |
| // code INTERNAL and must not report "Cancelled" or "Aborted". |
| TEST(GrpcSessionTest, ErrorAggregationThreeWorkerRaceVariant2) { |
| std::unique_ptr<test::TestCluster> cluster; |
| TF_CHECK_OK(test::TestCluster::MakeTestCluster(Devices(2, 0), 3, &cluster)); |
| auto& devs = cluster->devices(); |
| // Fail cleanly instead of indexing out of range if the cluster exposes fewer |
| // devices than the fixed indices below assume. |
| ASSERT_GE(devs.size(), 5u); |
| const string& master = cluster->targets()[0]; |
| // worker 1 |
| const string w1_dev1 = devs[0].name(); |
| const string w1_dev2 = devs[1].name(); |
| // worker 2 |
| const string w2_dev1 = devs[2].name(); |
| // worker 3 |
| const string w3_dev1 = devs[4].name(); |
| |
| LOG(INFO) << "master " << master << " w1_dev1 " << w1_dev1 << " w1_dev2 " |
| << w1_dev2 << " w2_dev1 " << w2_dev1 << " w3_dev1 " << w3_dev1; |
| GraphDef gdef; |
| std::vector<string> fetches; |
| std::vector<string> targets; |
| { |
| // Set up a graph to test the error handling when a derived error is |
| // reported to master before the original error and a third worker is |
| // aborted from an implicit SendOp. The expected behavior is that |
| // 1. the original error will be received by the master, |
| // 2. the aborted error will be treated as a derived error. |
| // |
| // Setup: |
| // |
| // Worker 1 generates the original error but it delays for 5 seconds before |
| // reporting to master. Worker 2 detects the error (via Rendezvous) and |
| // reports to the master before worker 1. Worker 3 runs a delay op and an |
| // implicit SendOp (for sending tensor c_delay to Worker 1) and will be |
| // aborted by worker 1. |
| Graph g(OpRegistry::Global()); |
| |
| // Worker 1. a_err runs on w1_dev1 and a_delay runs on w1_dev2. |
| auto a = test::graph::Constant(&g, Tensor(1)); |
| a->set_assigned_device_name(w1_dev1); |
| |
| auto a_err = test::graph::Error(&g, a, "fantasia!"); |
| a_err->set_assigned_device_name(w1_dev1); |
| |
| auto a_delay = test::graph::Delay(&g, a, Microseconds(5000000)); |
| a_delay->set_assigned_device_name(w1_dev2); |
| |
| // Putting a_delay in fetches will cause the executor for w1_dev2 to report |
| // failure status as well due to the use of SendOp, as the Rendezvous is |
| // already poisoned by the a_err op in w1_dev1. |
| targets.push_back(a_delay->name()); |
| fetches.push_back(a_err->name()); |
| |
| // Worker 2. b2 depends on a_err and detects the error via the rendezvous |
| // from worker 1. |
| auto b = test::graph::Constant(&g, Tensor(3)); |
| b->set_assigned_device_name(w2_dev1); |
| auto b2 = test::graph::Add(&g, b, a_err); |
| b2->set_assigned_device_name(w2_dev1); |
| fetches.push_back(b2->name()); |
| |
| // Worker 3. Runs a delay op whose output is fetched (see below). Per the |
| // setup described above, this worker is expected to be aborted through the |
| // implicit SendOp rather than cancelled by the master. |
| auto c = test::graph::Constant(&g, Tensor(3)); |
| c->set_assigned_device_name(w3_dev1); |
| auto c_delay = test::graph::Delay(&g, c, Microseconds(4000000)); |
| c_delay->set_assigned_device_name(w3_dev1); |
| |
| // Put c_delay in fetches so that an implicit SendOp for c_delay to worker 1 |
| // is generated. |
| fetches.push_back(c_delay->name()); |
| |
| g.ToGraphDef(&gdef); |
| } |
| |
| std::unique_ptr<Session> session(NewRemote(Options(master, 1))); |
| ASSERT_TRUE(session != nullptr); |
| |
| TF_CHECK_OK(session->Create(gdef)); |
| { |
| std::vector<Tensor> outputs; |
| Status status = session->Run({}, fetches, targets, &outputs); |
| LOG(INFO) << status; |
| EXPECT_FALSE(status.ok()); |
| // assert status contains the root error |
| EXPECT_NE(status.ToString().find("fantasia!"), string::npos); |
| // assert status does not contain cancelled or aborted error. |
| EXPECT_EQ(status.ToString().find("Cancelled"), string::npos); |
| EXPECT_EQ(status.ToString().find("Aborted"), string::npos); |
| EXPECT_EQ(status.code(), error::Code::INTERNAL); |
| } |
| // session->Close() shall clean up all states related to the session. |
| // E.g., deregisters subgraph with workers, etc. |
| TF_CHECK_OK(session->Close()); |
| |
| // Sleep a bit so that most of the asynchronous work finishes before |
| // the test process finishes. |
| Env::Default()->SleepForMicroseconds(2000000); |
| } |
| |
| } // namespace tensorflow |