blob: 4b53a7efaa265fd624083310d080e947242fa259 [file] [log] [blame]
/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#include "tensorflow/core/common_runtime/process_function_library_runtime.h"
#include <memory>
#include <vector>
#include "tensorflow/core/common_runtime/device_factory.h"
#include "tensorflow/core/common_runtime/function_testlib.h"
#include "tensorflow/core/common_runtime/rendezvous_mgr.h"
#include "tensorflow/core/framework/function.h"
#include "tensorflow/core/framework/function_testlib.h"
#include "tensorflow/core/framework/op_kernel.h"
#include "tensorflow/core/framework/resource_var.h"
#include "tensorflow/core/framework/tensor_testutil.h"
#include "tensorflow/core/framework/type_index.h"
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/lib/core/status_test_util.h"
#include "tensorflow/core/lib/core/threadpool.h"
#include "tensorflow/core/lib/strings/str_util.h"
#include "tensorflow/core/platform/test.h"
#include "tensorflow/core/protobuf/config.pb.h"
#include "tensorflow/core/public/session_options.h"
#include "tensorflow/core/public/version.h"
#if GOOGLE_CUDA
#include "third_party/gpus/cuda/include/cuda.h"
#include "third_party/gpus/cuda/include/cuda_runtime_api.h"
#elif TENSORFLOW_USE_ROCM
#include "rocm/include/hip/hip_runtime.h"
#endif // GOOGLE_CUDA
namespace tensorflow {
namespace {
class TestClusterFLR : public DistributedFunctionLibraryRuntime {
public:
explicit TestClusterFLR(DeviceMgr* device_mgr) : device_mgr_(device_mgr) {}
Status Instantiate(const string& function_name,
const FunctionLibraryDefinition& lib_def, AttrSlice attrs,
const FunctionLibraryRuntime::InstantiateOptions& options,
FunctionLibraryRuntime::LocalHandle* handle) override {
mutex_lock l(mu_);
*handle = next_handle_;
next_handle_++;
return Status::OK();
}
void Run(const FunctionLibraryRuntime::Options& opts,
FunctionLibraryRuntime::LocalHandle handle,
gtl::ArraySlice<Tensor> args, std::vector<Tensor>* rets,
FunctionLibraryRuntime::DoneCallback done) override {}
void CleanUp(uint64 step_id, FunctionLibraryRuntime::LocalHandle handle,
FunctionLibraryRuntime::DoneCallback done) override {}
DeviceMgr* remote_device_mgr() const override { return device_mgr_; }
private:
mutex mu_;
int next_handle_ GUARDED_BY(mu_) = 0;
DeviceMgr* device_mgr_;
};
SessionMetadata GenerateSessionMetadata() {
SessionMetadata session_metadata;
session_metadata.set_name("name");
session_metadata.set_version(42);
return session_metadata;
}
// TODO(b/128707168): Tests requiring a GPU device are currently always skipped
// because the check for whether a GPU device is present happens before the GPU
// device is set up.
class ProcessFunctionLibraryRuntimeTest : public ::testing::Test {
public:
ProcessFunctionLibraryRuntimeTest() {
SessionOptions options;
auto* device_count = options.config.mutable_device_count();
device_count->insert({"CPU", 2});
std::vector<std::unique_ptr<Device>> devices;
TF_CHECK_OK(DeviceFactory::AddDevices(options, "/job:a/replica:0/task:0",
&devices));
device_mgr_ = absl::make_unique<StaticDeviceMgr>(std::move(devices));
TF_CHECK_OK(device_mgr_->LookupDevice(
"/job:a/replica:0/task:0/device:CPU:0", &device0_));
TF_CHECK_OK(device_mgr_->LookupDevice(
"/job:a/replica:0/task:0/device:CPU:1", &device1_));
// If no GPU is available, gpu_device_ will remain nullptr.
Status status = device_mgr_->LookupDevice(
"/job:a/replica:0/task:0/device:GPU:0", &gpu_device_);
if (!status.ok()) {
CHECK_EQ(nullptr, gpu_device_);
}
}
~ProcessFunctionLibraryRuntimeTest() override {
if (rendezvous_ != nullptr) {
rendezvous_->Unref();
}
}
void Init(const std::vector<FunctionDef>& flib,
const SessionMetadata* session_metadata = nullptr) {
FunctionDefLibrary proto;
for (const auto& fdef : flib) *(proto.add_function()) = fdef;
lib_def_.reset(new FunctionLibraryDefinition(OpRegistry::Global(), proto));
OptimizerOptions opts;
cluster_flr_.reset(new TestClusterFLR(device_mgr_.get()));
proc_flr_.reset(new ProcessFunctionLibraryRuntime(
device_mgr_.get(), Env::Default(), /*config=*/nullptr,
TF_GRAPH_DEF_VERSION, lib_def_.get(), opts, nullptr, cluster_flr_.get(),
nullptr, session_metadata));
rendezvous_ = new IntraProcessRendezvous(device_mgr_.get());
}
Status Instantiate(
const string& name, test::function::Attrs attrs,
const FunctionLibraryRuntime::InstantiateOptions& instantiate_opts,
FunctionLibraryRuntime::Handle* handle) {
return proc_flr_->Instantiate(name, attrs, instantiate_opts, handle);
}
Tensor GPUToCPU(const Tensor& device_tensor) {
#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM
CHECK(gpu_device_);
CHECK(gpu_device_->tensorflow_gpu_device_info() != nullptr);
DeviceContext* device_context =
gpu_device_->tensorflow_gpu_device_info()->default_context;
Notification n;
Status status;
Tensor cpu_tensor(device_tensor.dtype(), device_tensor.shape());
device_context->CopyDeviceTensorToCPU(&device_tensor, "", gpu_device_,
&cpu_tensor,
[&n, &status](const Status& s) {
status = s;
n.Notify();
});
n.WaitForNotification();
CHECK(status.ok());
return cpu_tensor;
#else
CHECK(false);
#endif // GOOGLE_CUDA
}
Tensor CPUToGPU(const Tensor& cpu_tensor) {
#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM
CHECK(gpu_device_);
CHECK(gpu_device_->tensorflow_gpu_device_info() != nullptr);
DeviceContext* device_context =
gpu_device_->tensorflow_gpu_device_info()->default_context;
Notification n;
Status status;
Tensor device_tensor(gpu_device_->GetAllocator({}), cpu_tensor.dtype(),
cpu_tensor.shape(), {});
device_context->CopyCPUTensorToDevice(&cpu_tensor, gpu_device_,
&device_tensor,
[&n, &status](const Status& s) {
status = s;
n.Notify();
});
n.WaitForNotification();
CHECK(status.ok());
return device_tensor;
#else
CHECK(false);
#endif // GOOGLE_CUDA
}
Status RunWithRuntime(
const string& name, FunctionLibraryRuntime::Options opts,
test::function::Attrs attrs,
const FunctionLibraryRuntime::InstantiateOptions& instantiate_opts,
const std::vector<Tensor>& args, std::vector<Tensor*> rets,
ProcessFunctionLibraryRuntime* pflr) {
FunctionLibraryRuntime::Handle handle;
Status status = pflr->Instantiate(name, attrs, instantiate_opts, &handle);
if (!status.ok()) {
return status;
}
std::atomic<int32> call_count(0);
std::function<void(std::function<void()>)> runner =
[&call_count](std::function<void()> fn) {
++call_count;
test::function::FunctionTestSchedClosure(fn);
};
Notification done;
opts.runner = &runner;
std::vector<Tensor> out;
pflr->Run(opts, handle, args, &out, [&status, &done](const Status& s) {
status = s;
done.Notify();
});
done.WaitForNotification();
if (!status.ok()) {
return status;
}
CHECK_EQ(rets.size(), out.size());
for (size_t i = 0; i < rets.size(); ++i) {
*rets[i] = out[i];
}
EXPECT_GE(call_count, 1); // Test runner is used.
// Release the handle and then try running the function. It shouldn't
// succeed.
status = pflr->ReleaseHandle(handle);
if (!status.ok()) {
return status;
}
Notification done2;
pflr->Run(opts, handle, args, &out, [&status, &done2](const Status& s) {
status = s;
done2.Notify();
});
done2.WaitForNotification();
EXPECT_TRUE(errors::IsNotFound(status)) << "Actual status: " << status;
EXPECT_TRUE(absl::StrContains(status.error_message(), "not found."));
return Status::OK();
}
Status Run(const string& name, FunctionLibraryRuntime::Options opts,
test::function::Attrs attrs,
const FunctionLibraryRuntime::InstantiateOptions& instantiate_opts,
const std::vector<Tensor>& args, std::vector<Tensor*> rets) {
return RunWithRuntime(name, opts, attrs, instantiate_opts, args, rets,
proc_flr_.get());
}
Status RunInstantiated(FunctionLibraryRuntime::Handle handle,
FunctionLibraryRuntime::Options opts,
const std::vector<Tensor>& args,
std::vector<Tensor*> rets) {
std::atomic<int32> call_count(0);
std::function<void(std::function<void()>)> runner =
[&call_count](std::function<void()> fn) {
++call_count;
test::function::FunctionTestSchedClosure(fn);
};
opts.rendezvous = rendezvous_;
opts.runner = &runner;
Status status;
Notification done;
std::vector<Tensor> out;
proc_flr_->Run(opts, handle, args, &out, [&status, &done](const Status& s) {
status = s;
done.Notify();
});
done.WaitForNotification();
if (!status.ok()) {
return status;
}
CHECK_EQ(rets.size(), out.size());
for (size_t i = 0; i < rets.size(); ++i) {
*rets[i] = out[i];
}
EXPECT_GE(call_count, 1); // Test runner is used.
return Status::OK();
}
std::unique_ptr<DeviceMgr> device_mgr_;
Device* device0_ = nullptr; // Not owned. (Owned by device_mgr_.)
Device* device1_ = nullptr; // Not owned. (Owned by device_mgr_.)
// Remains as nullptr if no GPU is available.
Device* gpu_device_ = nullptr; // Not owned. (Owned by device_mgr_.)
std::unique_ptr<FunctionLibraryDefinition> lib_def_;
std::unique_ptr<TestClusterFLR> cluster_flr_;
std::unique_ptr<ProcessFunctionLibraryRuntime> proc_flr_;
IntraProcessRendezvous* rendezvous_ = nullptr;
};
TEST_F(ProcessFunctionLibraryRuntimeTest, GetFLRNull) {
FunctionDefLibrary proto;
std::unique_ptr<FunctionLibraryDefinition> lib_def(
new FunctionLibraryDefinition(OpRegistry::Global(), proto));
OptimizerOptions opts;
std::unique_ptr<ProcessFunctionLibraryRuntime> proc_flr(
new ProcessFunctionLibraryRuntime(
nullptr /* device_mgr */, Env::Default(), /*config=*/nullptr,
TF_GRAPH_DEF_VERSION, lib_def.get(), opts));
FunctionLibraryRuntime* flr =
proc_flr->GetFLR(ProcessFunctionLibraryRuntime::kDefaultFLRDevice);
EXPECT_NE(flr, nullptr);
}
TEST_F(ProcessFunctionLibraryRuntimeTest, Basic) {
Init({});
FunctionLibraryRuntime* flr =
proc_flr_->GetFLR("/job:a/replica:0/task:0/cpu:0");
EXPECT_NE(flr, nullptr);
EXPECT_EQ(flr->device(), device0_);
flr = proc_flr_->GetFLR("/job:a/replica:0/task:0/device:CPU:0");
EXPECT_NE(flr, nullptr);
EXPECT_EQ(flr->device(), device0_);
flr = proc_flr_->GetFLR("/device:CPU:0");
EXPECT_NE(flr, nullptr);
EXPECT_EQ(flr->device(), device0_);
flr = proc_flr_->GetFLR("/job:a/replica:0/task:0/cpu:1");
EXPECT_NE(flr, nullptr);
EXPECT_EQ(flr->device(), device1_);
flr = proc_flr_->GetFLR("abc");
EXPECT_EQ(flr, nullptr);
}
TEST_F(ProcessFunctionLibraryRuntimeTest, GetDeviceIncarnation) {
Init({});
int64 incarnation;
TF_EXPECT_OK(proc_flr_->GetDeviceIncarnation("/job:a/replica:0/task:0/cpu:1",
&incarnation));
// Incarnation is a random number other than 0.
EXPECT_NE(incarnation, 0);
Status s = proc_flr_->GetDeviceIncarnation("/job:a/replica:0/task:0/cpu:2",
&incarnation);
EXPECT_EQ(s.code(), error::INVALID_ARGUMENT);
}
TEST_F(ProcessFunctionLibraryRuntimeTest, SingleCall) {
Init({test::function::XTimesTwo()});
FunctionLibraryRuntime::Options opts;
opts.source_device = "/job:a/replica:0/task:0/cpu:0";
opts.rendezvous = rendezvous_;
opts.remote_execution = true;
FunctionLibraryRuntime::InstantiateOptions instantiate_opts;
instantiate_opts.target = "/job:a/replica:0/task:0/cpu:0";
auto x = test::AsTensor<float>({1, 2, 3, 4});
Tensor y;
TF_CHECK_OK(
Run("XTimesTwo", opts, {{"T", DT_FLOAT}}, instantiate_opts, {x}, {&y}));
test::ExpectTensorEqual<float>(y, test::AsTensor<float>({2, 4, 6, 8}));
}
TEST_F(ProcessFunctionLibraryRuntimeTest, SingleCallFindDevice) {
Init({test::function::FindDevice()});
FunctionLibraryRuntime::Options opts;
opts.source_device = "/job:a/replica:0/task:0/cpu:0";
opts.rendezvous = rendezvous_;
opts.remote_execution = true;
FunctionLibraryRuntime::InstantiateOptions instantiate_opts;
instantiate_opts.target = "/job:a/replica:0/task:0/cpu:0";
Tensor y;
TF_CHECK_OK(Run("FindDevice", opts, {}, instantiate_opts, {}, {&y}));
test::ExpectTensorEqual<tstring>(
y, test::AsTensor<tstring>({"/job:a/replica:0/task:0/device:CPU:0"},
TensorShape({})));
}
TEST_F(ProcessFunctionLibraryRuntimeTest, MultipleCallsSameDeviceXTimes) {
Init({test::function::XTimesTwo(), test::function::XTimesFour()});
auto x = test::AsTensor<float>({1, 2, 3, 4});
FunctionLibraryRuntime::Options opts;
opts.source_device = "/job:a/replica:0/task:0/cpu:0";
opts.rendezvous = rendezvous_;
opts.remote_execution = true;
FunctionLibraryRuntime::InstantiateOptions instantiate_opts;
instantiate_opts.target = "/job:a/replica:0/task:0/cpu:0";
Tensor y;
TF_CHECK_OK(
Run("XTimesTwo", opts, {{"T", DT_FLOAT}}, instantiate_opts, {x}, {&y}));
test::ExpectTensorEqual<float>(y, test::AsTensor<float>({2, 4, 6, 8}));
TF_CHECK_OK(
Run("XTimesFour", opts, {{"T", DT_FLOAT}}, instantiate_opts, {x}, {&y}));
test::ExpectTensorEqual<float>(y, test::AsTensor<float>({4, 8, 12, 16}));
}
TEST_F(ProcessFunctionLibraryRuntimeTest, MultipleCallsSameDeviceFindDevice) {
Init({test::function::FindDevice()});
FunctionLibraryRuntime::Options opts;
opts.source_device = "/job:a/replica:0/task:0/cpu:0";
opts.rendezvous = rendezvous_;
opts.remote_execution = true;
FunctionLibraryRuntime::InstantiateOptions instantiate_opts;
instantiate_opts.target = "/job:a/replica:0/task:0/cpu:1";
Tensor y;
TF_CHECK_OK(Run("FindDevice", opts, {}, instantiate_opts, {}, {&y}));
test::ExpectTensorEqual<tstring>(
y, test::AsTensor<tstring>({"/job:a/replica:0/task:0/device:CPU:1"},
TensorShape({})));
TF_CHECK_OK(Run("FindDevice", opts, {}, instantiate_opts, {}, {&y}));
test::ExpectTensorEqual<tstring>(
y, test::AsTensor<tstring>({"/job:a/replica:0/task:0/device:CPU:1"},
TensorShape({})));
}
TEST_F(ProcessFunctionLibraryRuntimeTest, MultipleCallsDiffDeviceFindDevice) {
Init({test::function::FindDevice()});
FunctionLibraryRuntime::Options opts;
opts.source_device = "/job:a/replica:0/task:0/cpu:0";
opts.rendezvous = rendezvous_;
opts.remote_execution = true;
Tensor y;
FunctionLibraryRuntime::InstantiateOptions instantiate_opts_0;
instantiate_opts_0.target = "/job:a/replica:0/task:0/device:CPU:0";
TF_CHECK_OK(Run("FindDevice", opts, {}, instantiate_opts_0, {}, {&y}));
test::ExpectTensorEqual<tstring>(
y, test::AsTensor<tstring>({"/job:a/replica:0/task:0/device:CPU:0"},
TensorShape({})));
FunctionLibraryRuntime::InstantiateOptions instantiate_opts_1;
instantiate_opts_1.target = "/job:a/replica:0/task:0/device:CPU:1";
TF_CHECK_OK(Run("FindDevice", opts, {}, instantiate_opts_1, {}, {&y}));
test::ExpectTensorEqual<tstring>(
y, test::AsTensor<tstring>({"/job:a/replica:0/task:0/device:CPU:1"},
TensorShape({})));
}
TEST_F(ProcessFunctionLibraryRuntimeTest, ClusterFLRSerialTest) {
Init({test::function::FindDevice()});
FunctionLibraryRuntime::Options opts;
opts.source_device = "/job:a/replica:0/task:0/cpu:0";
opts.rendezvous = rendezvous_;
opts.remote_execution = true;
FunctionLibraryRuntime::InstantiateOptions instantiate_opts;
instantiate_opts.target = "/job:b/replica:0/task:0/device:CPU:0";
FunctionLibraryRuntime::Handle h;
TF_CHECK_OK(Instantiate("FindDevice",
{{"_target", "/job:b/replica:0/task:0/device:CPU:0"}},
instantiate_opts, &h));
EXPECT_EQ(0, proc_flr_->GetHandleOnDevice(
"/job:b/replica:0/task:0/device:CPU:0", h));
TF_CHECK_OK(Instantiate("FindDevice",
{{"_target", "/job:b/replica:0/task:0/device:CPU:0"}},
instantiate_opts, &h));
EXPECT_EQ(0, proc_flr_->GetHandleOnDevice(
"/job:b/replica:0/task:0/device:CPU:0", h));
instantiate_opts.target = "/job:c/replica:0/task:0/device:CPU:0";
TF_CHECK_OK(Instantiate("FindDevice",
{{"_target", "/job:c/replica:0/task:0/device:CPU:0"}},
instantiate_opts, &h));
EXPECT_EQ(1, proc_flr_->GetHandleOnDevice(
"/job:c/replica:0/task:0/device:CPU:0", h));
}
TEST_F(ProcessFunctionLibraryRuntimeTest, ClusterFLRParallelTest) {
Init({test::function::FindDevice()});
FunctionLibraryRuntime::Options opts;
opts.source_device = "/job:a/replica:0/task:0/cpu:0";
opts.rendezvous = rendezvous_;
opts.remote_execution = true;
FunctionLibraryRuntime::InstantiateOptions instantiate_opts;
instantiate_opts.target = "/job:b/replica:0/task:0/device:CPU:0";
thread::ThreadPool* tp = new thread::ThreadPool(Env::Default(), "test", 4);
auto fn = [this, &instantiate_opts]() {
FunctionLibraryRuntime::Handle h;
TF_CHECK_OK(Instantiate(
"FindDevice", {{"_target", "/job:b/replica:0/task:0/device:CPU:0"}},
instantiate_opts, &h));
EXPECT_EQ(0, proc_flr_->GetHandleOnDevice(
"/job:b/replica:0/task:0/device:CPU:0", h));
};
for (int i = 0; i < 100; ++i) {
tp->Schedule(fn);
}
delete tp;
}
bool IsCUDATensor(const Tensor& t) {
#ifdef GOOGLE_CUDA
cudaPointerAttributes attributes;
cudaError_t err =
cudaPointerGetAttributes(&attributes, t.tensor_data().data());
if (err == cudaErrorInvalidValue) return false;
CHECK_EQ(cudaSuccess, err) << cudaGetErrorString(err);
return (attributes.memoryType == cudaMemoryTypeDevice);
#elif TENSORFLOW_USE_ROCM
hipPointerAttribute_t attributes;
hipError_t err = hipPointerGetAttributes(&attributes, t.tensor_data().data());
if (err == hipErrorInvalidValue) return false;
CHECK_EQ(hipSuccess, err) << hipGetErrorString(err);
return (attributes.memoryType == hipMemoryTypeDevice);
#else
CHECK(false)
<< "IsCUDATensor should not be called when CUDA is not available";
#endif // GOOGLE_CUDA
}
void TestTwoDeviceMult(
ProcessFunctionLibraryRuntimeTest* fixture,
const FunctionLibraryRuntime::InstantiateOptions& inst_opts,
const string& error = "") {
fixture->Init({test::function::TwoDeviceMult()});
FunctionLibraryRuntime::Options opts;
opts.rendezvous = fixture->rendezvous_;
auto x = test::AsTensor<float>({1, 2, 3});
Tensor y_cpu;
Tensor y_gpu;
Status status = fixture->Run("TwoDeviceMult", opts, {{"T", DT_FLOAT}},
inst_opts, {x}, {&y_cpu, &y_gpu});
if (!error.empty()) {
EXPECT_TRUE(errors::IsInvalidArgument(status))
<< "Actual status: " << status;
EXPECT_TRUE(absl::StrContains(status.error_message(), error))
<< "Actual error message: " << status.error_message();
return;
}
EXPECT_TRUE(status.ok()) << "Actual status: " << status;
EXPECT_FALSE(IsCUDATensor(y_cpu));
test::ExpectTensorEqual<float>(y_cpu, test::AsTensor<float>({2, 4, 6}));
EXPECT_TRUE(IsCUDATensor(y_gpu));
Tensor y_gpu_on_cpu = fixture->GPUToCPU(y_gpu);
test::ExpectTensorEqual<float>(y_gpu_on_cpu,
test::AsTensor<float>({3, 6, 9}));
}
void TestTwoDeviceInputOutput(
ProcessFunctionLibraryRuntimeTest* fixture,
const FunctionLibraryRuntime::InstantiateOptions& inst_opts) {
if (fixture->gpu_device_ == nullptr) {
GTEST_SKIP() << "No GPUs available";
}
fixture->Init({test::function::TwoDeviceInputOutput()});
FunctionLibraryRuntime::Options opts;
opts.rendezvous = fixture->rendezvous_;
Tensor x1 = test::AsTensor<float>({1, 2});
if (absl::StrContains(inst_opts.input_devices[0], "GPU")) {
x1 = fixture->CPUToGPU(x1);
}
Tensor x2 = test::AsTensor<float>({10, 20});
if (absl::StrContains(inst_opts.input_devices[1], "GPU")) {
x2 = fixture->CPUToGPU(x2);
}
Tensor y1;
Tensor y2;
TF_CHECK_OK(fixture->Run("TwoDeviceInputOutput", opts, {{"T", DT_FLOAT}},
inst_opts, {x1, x2}, {&y1, &y2}));
if (absl::StrContains(inst_opts.output_devices[0], "GPU")) {
EXPECT_TRUE(IsCUDATensor(y1));
y1 = fixture->GPUToCPU(y1);
} else {
EXPECT_FALSE(IsCUDATensor(y1));
}
test::ExpectTensorEqual<float>(y1, test::AsTensor<float>({2, 4}));
if (absl::StrContains(inst_opts.output_devices[1], "GPU")) {
EXPECT_TRUE(IsCUDATensor(y2));
y2 = fixture->GPUToCPU(y2);
} else {
EXPECT_FALSE(IsCUDATensor(y2));
}
test::ExpectTensorEqual<float>(y2, test::AsTensor<float>({30, 60}));
}
std::vector<string> CompleteDevices(const std::vector<string>& v) {
std::vector<string> result;
result.reserve(v.size());
for (const string& s : v) {
result.push_back(strings::StrCat("/job:a/replica:0/task:0/device:", s));
}
return result;
}
FunctionLibraryRuntime::InstantiateOptions MakeOptions(
const string& target, const std::vector<string>& input_devices,
const std::vector<string>& output_devices) {
FunctionLibraryRuntime::InstantiateOptions inst_opts;
inst_opts.target = target;
inst_opts.input_devices = CompleteDevices(input_devices);
inst_opts.output_devices = CompleteDevices(output_devices);
inst_opts.is_multi_device_function = true;
return inst_opts;
}
TEST_F(ProcessFunctionLibraryRuntimeTest, MultiDevice_ExplicitOutputDevice) {
if (gpu_device_ == nullptr) {
GTEST_SKIP() << "No GPUs available";
}
TestTwoDeviceMult(this, MakeOptions("CPU:0", {"CPU:0"}, {"CPU:0", "GPU:0"}));
}
TEST_F(ProcessFunctionLibraryRuntimeTest, MultiDevice_InferredOutputDevice) {
if (gpu_device_ == nullptr) {
GTEST_SKIP() << "No GPUs available";
}
TestTwoDeviceMult(this, MakeOptions("CPU:0", {"CPU:0"}, {}));
}
TEST_F(ProcessFunctionLibraryRuntimeTest, MultiDevice_ErrorWhenNoInputDevices) {
if (gpu_device_ == nullptr) {
GTEST_SKIP() << "No GPUs available";
}
TestTwoDeviceMult(this, MakeOptions("CPU:0", {}, {}),
"input_devices must have the same length");
}
TEST_F(ProcessFunctionLibraryRuntimeTest,
MultiDevice_ErrorWhenTooManyInputDevices) {
if (gpu_device_ == nullptr) {
GTEST_SKIP() << "No GPUs available";
}
TestTwoDeviceMult(this, MakeOptions("CPU:0", {"CPU:0", "CPU:1"}, {}),
"input_devices must have the same length");
}
TEST_F(ProcessFunctionLibraryRuntimeTest,
MultiDevice_ErrorWhenTooManyOutputDevices) {
TestTwoDeviceMult(
this, MakeOptions("CPU:0", {"CPU:0"}, {"CPU:0", "GPU:0", "CPU:1"}),
"output_devices must either be empty or have the same length");
}
TEST_F(ProcessFunctionLibraryRuntimeTest,
MultiDevice_ErrorWhenBadTargetDevice) {
TestTwoDeviceMult(
this, MakeOptions("GPU:11", {"CPU:0"}, {"CPU:0", "GPU:0"}),
"Cannot instantiate multi-device function with target device GPU:11");
}
TEST_F(ProcessFunctionLibraryRuntimeTest, MultiDevice_ErrorWhenListInput) {
const FunctionDef& def = test::function::FuncWithListInput();
Init({def});
FunctionLibraryRuntime::Handle handle;
Status status = proc_flr_->Instantiate(
"FuncWithListInput", test::function::Attrs({{"T", DT_FLOAT}, {"N", 1}}),
MakeOptions("CPU:0", {"CPU:0"}, {}), &handle);
ASSERT_TRUE(errors::IsInvalidArgument(status)) << "Actual status: " << status;
ASSERT_TRUE(absl::StrContains(
status.error_message(),
"FuncWithListInput has an input named \"x1\" that is a list of tensors"))
<< "Actual error message: " << status.error_message();
}
TEST_F(ProcessFunctionLibraryRuntimeTest, MultiDevice_ErrorWhenListOutput) {
const FunctionDef& def = test::function::FuncWithListOutput();
Init({def});
FunctionLibraryRuntime::Handle handle;
Status status = proc_flr_->Instantiate(
"FuncWithListOutput", test::function::Attrs({{"T", DT_FLOAT}, {"N", 1}}),
MakeOptions("CPU:0", {}, {"CPU:0"}), &handle);
ASSERT_TRUE(errors::IsInvalidArgument(status)) << "Actual status: " << status;
ASSERT_TRUE(absl::StrContains(
status.error_message(),
"FuncWithListOutput has an output named \"y\" that is a list of tensors"))
<< "Actual error message: " << status.error_message();
}
TEST_F(ProcessFunctionLibraryRuntimeTest,
MultiDevice_ExplicitMultiInputOutput) {
TestTwoDeviceInputOutput(
this, MakeOptions("CPU:0", {"CPU:0", "GPU:0"}, {"CPU:0", "GPU:0"}));
}
TEST_F(ProcessFunctionLibraryRuntimeTest, MultiDevice_FlipInputs) {
TestTwoDeviceInputOutput(
this, MakeOptions("CPU:0", {"GPU:0", "CPU:0"}, {"CPU:0", "GPU:0"}));
}
TEST_F(ProcessFunctionLibraryRuntimeTest, MultiDevice_FlipOutputs) {
TestTwoDeviceInputOutput(
this, MakeOptions("CPU:0", {"CPU:0", "GPU:0"}, {"GPU:0", "CPU:0"}));
}
TEST_F(ProcessFunctionLibraryRuntimeTest, MultiDevice_FlipBoth) {
TestTwoDeviceInputOutput(
this, MakeOptions("CPU:0", {"GPU:0", "CPU:0"}, {"GPU:0", "CPU:0"}));
}
TEST_F(ProcessFunctionLibraryRuntimeTest, MultiDevice_EmptyBodySwap) {
if (gpu_device_ == nullptr) {
GTEST_SKIP() << "No GPUs available";
}
FunctionLibraryRuntime::InstantiateOptions inst_opts =
MakeOptions("CPU:0", {"GPU:0", "CPU:0"}, {"CPU:0", "GPU:0"});
Init({test::function::EmptyBodySwap()});
Tensor x1 = CPUToGPU(test::AsTensor<float>({1, 2}));
Tensor x2 = test::AsTensor<float>({10, 20});
Tensor y1;
Tensor y2;
TF_CHECK_OK(Run("EmptyBodySwap", {}, {{"T", DT_FLOAT}}, inst_opts, {x1, x2},
{&y1, &y2}));
EXPECT_FALSE(IsCUDATensor(y1));
test::ExpectTensorEqual<float>(y1, test::AsTensor<float>({10, 20}));
EXPECT_TRUE(IsCUDATensor(y2));
y2 = GPUToCPU(y2);
test::ExpectTensorEqual<float>(y2, test::AsTensor<float>({1, 2}));
}
Tensor GetResourceHandle(const string& var_name, const string& container,
const string& device_name) {
ResourceHandle handle;
handle.set_device(device_name);
handle.set_container(container);
handle.set_name(var_name);
handle.set_hash_code(MakeTypeIndex<Var>().hash_code());
handle.set_maybe_type_name(MakeTypeIndex<Var>().name());
Tensor tensor(DT_RESOURCE, TensorShape({}));
tensor.scalar<ResourceHandle>()() = handle;
return tensor;
}
TEST_F(ProcessFunctionLibraryRuntimeTest, MultiDevice_ResourceOutput_GPU) {
if (gpu_device_ == nullptr) {
GTEST_SKIP() << "No GPUs available";
}
FunctionLibraryRuntime::InstantiateOptions inst_opts =
MakeOptions("CPU:0", {"GPU:0", "GPU:0"}, {"GPU:0", "GPU:0"});
Init({test::function::ResourceOutput(),
test::function::ReadResourceVariable()});
// Make resource var
Tensor resource_value = CPUToGPU(test::AsTensor<float>({10, 20}));
Var* resource = new Var(DT_FLOAT);
*resource->tensor() = resource_value;
resource->is_initialized = true;
ResourceMgr* mgr = gpu_device_->resource_manager();
Status status = mgr->Create(mgr->default_container(), "my_gpu_var", resource);
ASSERT_TRUE(status.ok()) << status.error_message();
// Run the function taking a resource and outputing it
FunctionLibraryRuntime::Options opts;
opts.rendezvous = rendezvous_;
Tensor x1 = CPUToGPU(test::AsTensor<float>({1, 2}));
Tensor x2 = GetResourceHandle("my_gpu_var", mgr->default_container(),
"/job:a/replica:0/task:0/device:GPU:0");
Tensor returned_handle;
Tensor y2;
TF_CHECK_OK(Run("ResourceOutput", opts, {{"T", DT_FLOAT}}, inst_opts,
{x1, x2}, {&returned_handle, &y2}));
EXPECT_FALSE(IsCUDATensor(returned_handle));
EXPECT_TRUE(IsCUDATensor(y2));
y2 = GPUToCPU(y2);
test::ExpectTensorEqual<float>(y2, test::AsTensor<float>({2, 4}));
// Read the variable using the handle returned from previous function to
// make sure the handle and read value is on the right device.
inst_opts = MakeOptions("GPU:0", {"GPU:0"}, {"GPU:0"});
Tensor read_resource;
TF_CHECK_OK(Run("ReadResourceVariable", opts, {{"T", DT_FLOAT}}, inst_opts,
{returned_handle}, {&read_resource}));
EXPECT_TRUE(IsCUDATensor(read_resource));
read_resource = GPUToCPU(read_resource);
test::ExpectTensorEqual<float>(read_resource,
test::AsTensor<float>({10, 20}));
}
TEST_F(ProcessFunctionLibraryRuntimeTest, MultiDevice_PlacerError) {
if (gpu_device_ == nullptr) {
GTEST_SKIP() << "No GPUs available";
}
// ResourceOutput forwards second input to first output. Both are resources.
// Placer should not be able to place this graph because we ask it to place
// second input on GPU but first output to CPU.
FunctionLibraryRuntime::InstantiateOptions inst_opts =
MakeOptions("CPU:0", {"GPU:0", "GPU:0"}, {"CPU:0", "GPU:0"});
Init({test::function::ResourceOutput(),
test::function::ReadResourceVariable()});
FunctionLibraryRuntime::Handle handle;
Status status = proc_flr_->Instantiate(
"ResourceOutput", test::function::Attrs({{"T", DT_FLOAT}}), inst_opts,
&handle);
ASSERT_TRUE(errors::IsInvalidArgument(status)) << "Actual status: " << status;
ASSERT_TRUE(absl::StrContains(status.error_message(), "Cannot place"));
}
REGISTER_OP("BrokenOp")
.Input("in: T")
.Output("out: T")
.Attr("T: type")
.SetShapeFn(shape_inference::UnknownShape);
class BrokenOp : public OpKernel {
public:
explicit BrokenOp(OpKernelConstruction* ctx) : OpKernel(ctx) {
ctx->SetStatus(errors::Internal("I am broken"));
}
void Compute(OpKernelContext* ctx) override {
ctx->SetStatus(errors::Internal("I am broken"));
}
};
REGISTER_KERNEL_BUILDER(Name("BrokenOp").Device(DEVICE_CPU), BrokenOp);
TEST_F(ProcessFunctionLibraryRuntimeTest, MultiDevice_CreateKernelsEagerly) {
auto T = DT_INT32;
// The expected sequence of outputs from this function is [6, 4, 0, 1, ...].
FunctionDef broken_func = FunctionDefHelper::Define(
// Name
"Broken",
// Args
{"x: int32"},
// Return values
{"y: int32"},
// Attrs
{},
// Nodes
{{{"y"}, "BrokenOp", {"x"}, {{"T", T}}}});
Init({broken_func});
FunctionLibraryRuntime::InstantiateOptions inst_opts =
MakeOptions("CPU:0", {"CPU:0"}, {"CPU:0"});
// Instantiating the broken function should work.
FunctionLibraryRuntime::Handle handle;
TF_CHECK_OK(Instantiate("Broken", {{"T", DT_INT32}}, inst_opts, &handle));
TF_CHECK_OK(proc_flr_->ReleaseHandle(handle));
// Instantiating the broken function while creating kernels eagerly should
// fail.
inst_opts.create_kernels_eagerly = true;
Status status = Instantiate("Broken", {{"T", DT_INT32}}, inst_opts, &handle);
EXPECT_TRUE(errors::IsInternal(status));
}
TEST_F(ProcessFunctionLibraryRuntimeTest, MultiDevice_StateHandle) {
auto T = DT_INT32;
// The expected sequence of outputs from this function is [6, 4, 0, 1, ...].
FunctionDef stateful_func = FunctionDefHelper::Define(
// Name
"RandomUniformWrapper",
// Args
{"x: resource"},
// Return values
{"y: int32"},
// Attrs
{},
// Nodes
{FunctionDefHelper::Const<int32>("shape", gtl::ArraySlice<int32>({1})),
FunctionDefHelper::Const<int32>("minval", 0),
{{"maxval"}, "ReadVariableOp", {"x"}, {{"dtype", T}}, {}},
// A stateful node.
{{"y"},
"RandomUniformInt",
{"shape", "minval", "maxval"},
{{"seed", 37}, {"seed2", 48}, {"Tout", T}, {"T", T}}}});
Init({stateful_func});
if (gpu_device_ == nullptr) {
GTEST_SKIP() << "No GPUs available";
}
// Make resource variables.
ResourceMgr* mgr = gpu_device_->resource_manager();
Tensor resource_value = CPUToGPU(test::AsScalar<int>(10));
Var* resource = new Var(T);
*resource->tensor() = resource_value;
resource->is_initialized = true;
Status status = mgr->Create(mgr->default_container(), "my_gpu_var", resource);
ASSERT_TRUE(status.ok()) << status.error_message();
Tensor x = GetResourceHandle("my_gpu_var", mgr->default_container(),
"/job:a/replica:0/task:0/device:GPU:0");
Tensor y;
FunctionLibraryRuntime::InstantiateOptions inst_opts =
MakeOptions("CPU:0", {"GPU:0"}, {"CPU:0"});
// Instantiate the function with no state handle.
FunctionLibraryRuntime::Handle handle;
TF_CHECK_OK(Instantiate("RandomUniformWrapper", {{"T", DT_INT32}}, inst_opts,
&handle));
for (auto expected : {6, 4}) {
TF_CHECK_OK(RunInstantiated(handle, {}, {x}, {&y}));
test::ExpectTensorEqual<int>(y, test::AsTensor<int>({expected}));
}
// Instantiating the function again with no state handle should result in the
// same handle.
FunctionLibraryRuntime::Handle other_handle;
TF_CHECK_OK(Instantiate("RandomUniformWrapper", {{"T", DT_INT32}}, inst_opts,
&other_handle));
EXPECT_EQ(handle, other_handle);
// Running the function should yield continuation of the same sequence.
for (auto expected : {0, 1}) {
TF_CHECK_OK(RunInstantiated(other_handle, {}, {x}, {&y}));
test::ExpectTensorEqual<int>(y, test::AsTensor<int>({expected}));
}
// Instantiating the function with a state handle should result in a different
// handle.
inst_opts.state_handle = "handle_1";
TF_CHECK_OK(Instantiate("RandomUniformWrapper", {{"T", DT_INT32}}, inst_opts,
&other_handle));
EXPECT_NE(handle, other_handle);
// Running the function should yield the original sequeunce.
for (auto expected : {6, 4, 0, 1}) {
TF_CHECK_OK(RunInstantiated(other_handle, {}, {x}, {&y}));
test::ExpectTensorEqual<int>(y, test::AsTensor<int>({expected}));
}
// Instantiating the function with a different state handle should result in a
// different handle.
inst_opts.state_handle = "handle_2";
TF_CHECK_OK(Instantiate("RandomUniformWrapper", {{"T", DT_INT32}}, inst_opts,
&other_handle));
EXPECT_NE(handle, other_handle);
// Running the function should yield the original sequeunce.
for (auto expected : {6, 4, 0, 1}) {
TF_CHECK_OK(RunInstantiated(other_handle, {}, {x}, {&y}));
test::ExpectTensorEqual<int>(y, test::AsTensor<int>({expected}));
}
// Repeatedly instantiating a function and releasing its handle will result in
// repeating the original sequence.
inst_opts.state_handle = "handle_3";
for (int i = 0; i < 2; ++i) {
TF_CHECK_OK(Instantiate("RandomUniformWrapper", {{"T", DT_INT32}},
inst_opts, &other_handle));
EXPECT_NE(handle, other_handle);
// Running the function should yield the original sequeunce.
for (auto expected : {6, 4, 0, 1}) {
TF_CHECK_OK(RunInstantiated(other_handle, {}, {x}, {&y}));
test::ExpectTensorEqual<int>(y, test::AsTensor<int>({expected}));
}
TF_CHECK_OK(proc_flr_->ReleaseHandle(other_handle));
}
}
REGISTER_OP("SessionMetadataReader")
.Input("x: int64")
.Output("y: string")
.SetIsStateful()
.Doc(R"doc(SessionMetadataReader returns the session metadata.
x: int64
y: string
)doc");
class SessionMetadataReaderOp : public OpKernel {
public:
explicit SessionMetadataReaderOp(OpKernelConstruction* ctx) : OpKernel(ctx) {}
void Compute(OpKernelContext* ctx) override {
Tensor* out_tensor = nullptr;
OP_REQUIRES_OK(ctx,
ctx->allocate_output("y", TensorShape({}), &out_tensor));
if (ctx->session_metadata() != nullptr) {
out_tensor->scalar<tstring>()() = ctx->session_metadata()->DebugString();
} else {
out_tensor->scalar<tstring>()() = "";
}
}
};
REGISTER_KERNEL_BUILDER(Name("SessionMetadataReader").Device(DEVICE_CPU),
SessionMetadataReaderOp);
FunctionDef SessionMetadataReaderOpFn() {
return FunctionDefHelper::Define(
// Name
"SessionMetadataReaderFn",
// Args
{"x: int64"},
// Return values
{"y: string"},
// Attr def
{},
// Nodes
{{{"y"}, "SessionMetadataReader", {"x"}, {}}});
}
TEST_F(ProcessFunctionLibraryRuntimeTest, SessionMetadataAbsent) {
Init({SessionMetadataReaderOpFn()}, /*session_metadata=*/nullptr);
FunctionLibraryRuntime::Options opts;
opts.source_device = "/job:a/replica:0/task:0/cpu:0";
opts.rendezvous = rendezvous_;
opts.remote_execution = true;
FunctionLibraryRuntime::InstantiateOptions instantiate_opts;
instantiate_opts.target = "/job:a/replica:0/task:0/cpu:0";
const auto x = test::AsTensor<int64>({17});
Tensor y;
TF_CHECK_OK(
Run("SessionMetadataReaderFn", opts, {}, instantiate_opts, {x}, {&y}));
EXPECT_EQ("", y.scalar<tstring>()());
}
TEST_F(ProcessFunctionLibraryRuntimeTest, SessionMetadataPresent) {
const SessionMetadata session_metadata = GenerateSessionMetadata();
Init({SessionMetadataReaderOpFn()}, &session_metadata);
FunctionLibraryRuntime::Options opts;
opts.source_device = "/job:a/replica:0/task:0/cpu:0";
opts.rendezvous = rendezvous_;
opts.remote_execution = true;
FunctionLibraryRuntime::InstantiateOptions instantiate_opts;
instantiate_opts.target = "/job:a/replica:0/task:0/cpu:0";
const auto x = test::AsTensor<int64>({17});
Tensor y;
TF_CHECK_OK(
Run("SessionMetadataReaderFn", opts, {}, instantiate_opts, {x}, {&y}));
SessionMetadata read_metadata;
ASSERT_TRUE(protobuf::TextFormat::ParseFromString(y.scalar<tstring>()(),
&read_metadata));
EXPECT_EQ(session_metadata.name(), read_metadata.name());
EXPECT_EQ(session_metadata.version(), read_metadata.version());
}
TEST_F(ProcessFunctionLibraryRuntimeTest, SessionMetadataPresentAfterCloning) {
const SessionMetadata session_metadata = GenerateSessionMetadata();
Init({SessionMetadataReaderOpFn()}, &session_metadata);
auto* flr = proc_flr_->GetFLR("/job:a/replica:0/task:0/cpu:0");
ASSERT_NE(nullptr, flr);
std::unique_ptr<FunctionLibraryDefinition> cloned_lib_def;
std::unique_ptr<ProcessFunctionLibraryRuntime> cloned_proc_flr;
FunctionLibraryRuntime* cloned_flr;
TF_ASSERT_OK(flr->Clone(&cloned_lib_def, &cloned_proc_flr, &cloned_flr));
FunctionLibraryRuntime::Options opts;
opts.source_device = "/job:a/replica:0/task:0/cpu:0";
opts.rendezvous = rendezvous_;
opts.remote_execution = true;
FunctionLibraryRuntime::InstantiateOptions instantiate_opts;
instantiate_opts.target = "/job:a/replica:0/task:0/cpu:0";
const auto x = test::AsTensor<int64>({17});
Tensor y;
TF_CHECK_OK(RunWithRuntime("SessionMetadataReaderFn", opts, {},
instantiate_opts, {x}, {&y},
cloned_proc_flr.get()));
SessionMetadata read_metadata;
ASSERT_TRUE(protobuf::TextFormat::ParseFromString(y.scalar<tstring>()(),
&read_metadata));
EXPECT_EQ(session_metadata.name(), read_metadata.name());
EXPECT_EQ(session_metadata.version(), read_metadata.version());
}
} // anonymous namespace
} // namespace tensorflow