blob: 77ce74c58c0cbc4e0b1d85818602ef2dd2dab188 [file] [log] [blame]
/* Copyright 2021 The TensorFlow Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#include "tensorflow/core/common_runtime/collective_test_util.h"
#include "absl/synchronization/notification.h"
#include "tensorflow/core/common_runtime/base_collective_executor.h"
#include "tensorflow/core/common_runtime/device_resolver_local.h"
#include "tensorflow/core/common_runtime/process_util.h"
#include "tensorflow/core/common_runtime/threadpool_device.h"
#include "tensorflow/core/framework/cancellation.h"
#include "tensorflow/core/framework/device_attributes.pb.h"
#include "tensorflow/core/framework/device_base.h"
#include "tensorflow/core/framework/op_kernel.h"
#include "tensorflow/core/platform/refcount.h"
#include "tensorflow/core/public/session_options.h"
namespace tensorflow {
FailTestRMA::FailTestRMA(const DeviceMgr* dev_mgr,
DeviceResolverInterface* dev_resolver, int64_t step_id)
: CollectiveRemoteAccessLocal(dev_mgr, dev_resolver, step_id),
fail_after_(0) {}
bool FailTestRMA::MaybeFail(const StatusCallback& done) {
bool fail_now = false;
{
mutex_lock l(mu_);
if (fail_after_ > 0) {
fail_now = (--fail_after_ == 0);
}
}
if (fail_now) {
auto error = errors::Internal("Deliberate failure");
LOG(INFO) << "triggering failure " << error;
buf_rendezvous()->StartAbort(error);
// The current call hasn't reached BufRendezvous yet, so we need to call
// its done separately.
done(error);
return true;
}
return false;
}
void FailTestRMA::RecvFromPeer(
const string& peer_device, const string& peer_task, bool peer_is_local,
const string& key, Device* to_device, DeviceContext* to_device_ctx,
const AllocatorAttributes& to_alloc_attr, Tensor* to_tensor,
const DeviceLocality& client_locality, int dev_to_dev_stream_index,
CancellationManager* cancellation_manager, const StatusCallback& done) {
if (MaybeFail(done)) return;
CollectiveRemoteAccessLocal::RecvFromPeer(
peer_device, peer_task, peer_is_local, key, to_device, to_device_ctx,
to_alloc_attr, to_tensor, client_locality, dev_to_dev_stream_index,
cancellation_manager, done);
}
void FailTestRMA::PostToPeer(const string& peer_device, const string& peer_task,
const string& key, Device* from_device,
DeviceContext* from_device_ctx,
const AllocatorAttributes& from_alloc_attr,
const Tensor* from_tensor,
const DeviceLocality& client_locality,
CancellationManager* cancellation_manager,
const StatusCallback& done) {
if (MaybeFail(done)) return;
CollectiveRemoteAccessLocal::PostToPeer(
peer_device, peer_task, key, from_device, from_device_ctx,
from_alloc_attr, from_tensor, client_locality, cancellation_manager,
done);
}
namespace {
constexpr int kStepId = 0;
std::vector<std::unique_ptr<Device>> CreateCPUDevices(
int num_workers, int num_devices_per_worker) {
SessionOptions sess_opts;
sess_opts.env = Env::Default();
Bytes mem_limit(4 << 20);
DeviceLocality dev_locality;
std::vector<std::unique_ptr<Device>> devices;
for (int wi = 0; wi < num_workers; ++wi) {
for (int di = 0; di < num_devices_per_worker; ++di) {
string dev_name = strings::StrCat("/job:worker/replica:0/task:", wi,
"/device:CPU:", di);
devices.push_back(absl::make_unique<ThreadPoolDevice>(
sess_opts, dev_name, mem_limit, dev_locality, cpu_allocator()));
}
}
return devices;
}
#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM
std::vector<std::unique_ptr<Device>> CreateGPUDevices() {
// It's required to use the same virtual device configuration in one process,
// so we configure kNumVirtualDevices which should be the maximum used in
// tests.
static constexpr int kNumVirtualDevices = 8;
auto device_factory = DeviceFactory::GetFactory("GPU");
CHECK(device_factory);
SessionOptions options;
std::vector<string> physical_devices;
TF_CHECK_OK(device_factory->ListPhysicalDevices(&physical_devices));
if (physical_devices.size() < kNumVirtualDevices) {
int num_virtual_per_phsyical = static_cast<int>(std::ceil(
static_cast<double>(kNumVirtualDevices) / physical_devices.size()));
auto* virtual_devices = options.config.mutable_gpu_options()
->mutable_experimental()
->mutable_virtual_devices();
for (int i = 0; i < physical_devices.size(); ++i) {
auto* virtual_device = virtual_devices->Add();
for (int j = 0; j < num_virtual_per_phsyical; ++j) {
virtual_device->add_memory_limit_mb(1024); // in MiB.
virtual_device->add_priority(0);
}
}
}
std::vector<std::unique_ptr<Device>> devices;
Status s = device_factory->CreateDevices(
options, "/job:worker/replica:0/task:0", &devices);
CHECK(s.ok());
return devices;
}
#endif
} // namespace
std::unique_ptr<CollectiveTestEnv> CreateCollectiveTestEnv(
int num_workers, int num_devices_per_worker, DeviceType device_type) {
auto test_env = absl::make_unique<CollectiveTestEnv>();
test_env->param_resolver = absl::make_unique<TestParamResolver>();
// We don't create CollecticeExecutor from the CollecticeExecutorMgr so we
// don't need to pass rma.
test_env->col_exec_mgr = absl::make_unique<TestCollectiveExecutorMgr>(
test_env->param_resolver.get(), /*rma=*/nullptr);
test_env->num_workers = num_workers;
test_env->num_devices_per_worker = num_devices_per_worker;
test_env->device_type = device_type;
std::vector<std::unique_ptr<Device>> devices;
if (device_type == DEVICE_CPU) {
devices = CreateCPUDevices(num_workers, num_devices_per_worker);
#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM
} else if (device_type == DEVICE_GPU) {
CHECK(num_workers == 1) << "GPU only supports single worker tests";
devices = CreateGPUDevices();
if (devices.size() < num_devices_per_worker) {
LOG(FATAL) << "The test is requesting more GPUs than available";
}
#endif // GOOGLE_CUDA || TENSORFLOW_USE_ROCM
} else {
LOG(FATAL) << "Unsupported device_type " << device_type;
}
test_env->device_mgr = absl::make_unique<StaticDeviceMgr>(std::move(devices));
test_env->device_resolver =
absl::make_unique<DeviceResolverLocal>(test_env->device_mgr.get());
test_env->work_queue =
std::make_shared<UnboundedWorkQueue>(Env::Default(), "test");
// BaseCollectiveExecutor takes the ownership of remote_access.
test_env->remote_access = new FailTestRMA(
test_env->device_mgr.get(), test_env->device_resolver.get(), kStepId);
test_env->col_exec.reset(new BaseCollectiveExecutor(
test_env->col_exec_mgr.get(), test_env->remote_access, kStepId,
test_env->device_mgr.get(), test_env->work_queue));
return test_env;
}
core::RefCountPtr<CollectiveParams> CreateCollectiveParams(
const CollectiveTestEnv& test_env, int rank, const string& collective_name,
CollectiveType collective_type, DataType dtype, const TensorShape& shape) {
static constexpr int kGroupKey = 5;
static constexpr int kInstanceKey = 17;
core::RefCountPtr<CollectiveParams> col_params(new CollectiveParams());
col_params->name = "test_collective";
col_params->default_rank = rank;
// Set up a local device ring order that's not just 0,1,2...
std::vector<int> local_ring_order;
local_ring_order.reserve(test_env.num_devices_per_worker);
for (int di = 0; di < test_env.num_devices_per_worker; ++di) {
local_ring_order.push_back(di);
}
for (int di = 0; di < test_env.num_devices_per_worker; ++di) {
bool is_odd = ((di % 2) == 1);
int other = (di + (is_odd ? 7 : 3)) % test_env.num_devices_per_worker;
if (di == other) continue;
std::iter_swap(local_ring_order.begin() + di,
local_ring_order.begin() + other);
}
string lro_buf;
for (auto d : local_ring_order) strings::StrAppend(&lro_buf, d, ", ");
VLOG(1) << "local_ring_order " << lro_buf;
// Set up group parameters.
col_params->group.group_key = kGroupKey;
col_params->group.group_size =
test_env.num_workers * test_env.num_devices_per_worker;
col_params->group.num_tasks = test_env.num_workers;
col_params->group.device_type = test_env.device_type;
for (int wi = 0; wi < test_env.num_workers; ++wi) {
string task_name = strings::StrCat("/job:worker/replica:0/task:", wi);
col_params->group.num_devices_per_task[task_name] =
test_env.num_devices_per_worker;
for (int di = 0; di < test_env.num_devices_per_worker; ++di) {
CollGroupMember member;
member.device.set_name(strings::StrCat(
task_name, "/device:", test_env.device_type.type_string(), ":", di));
member.task = task_name;
// Normally each device would set is_local to its own perspective but
// this test runs in a single process so is_local is always true.
member.is_local = true;
col_params->group.members.push_back(member);
}
}
// Set up instance parameters.
col_params->instance.instance_key = kInstanceKey;
col_params->instance.type = collective_type;
col_params->instance.impl_details.collective_name = collective_name;
col_params->instance.data_type = dtype;
col_params->instance.shape = shape;
col_params->instance.impl_details.subdiv_offsets.push_back(0);
return col_params;
}
std::vector<int> GenerateEvenSubdivOffsets(int num_devices_per_worker,
int num_subdivs) {
std::vector<int> offsets;
offsets.reserve(num_subdivs);
int subdiv_stride = num_devices_per_worker / num_subdivs;
for (int sdi = 0; sdi < num_subdivs; ++sdi) {
offsets.push_back(sdi * subdiv_stride);
}
return offsets;
}
Tensor CopyTensorToDevice(Device* device, const Tensor& tensor) {
if (device->device_type() == DEVICE_CPU) {
return tensor;
} else if (device->device_type() == DEVICE_GPU) {
Tensor copied(device->GetAllocator(AllocatorAttributes()), tensor.dtype(),
tensor.shape());
auto* dev_info = device->tensorflow_gpu_device_info();
CHECK(dev_info);
TF_CHECK_OK(dev_info->default_context->CopyCPUTensorToDeviceSync(
&tensor, device, &copied));
return copied;
}
LOG(FATAL) << "Unsupported device_type " << device->device_type();
}
Tensor CopyTensorToHost(Device* device, const Tensor& tensor) {
if (device->device_type() == DEVICE_CPU) {
return tensor;
} else if (device->device_type() == DEVICE_GPU) {
Tensor copied(tensor.dtype(), tensor.shape());
auto* dev_info = device->tensorflow_gpu_device_info();
CHECK(dev_info);
TF_CHECK_OK(dev_info->default_context->CopyDeviceTensorToCPUSync(
&tensor, "" /*tensor_name*/, device, &copied));
return copied;
}
LOG(FATAL) << "Unsupported device_type " << device->device_type();
}
Status RunCollective(CollectiveTestEnv* test_env, CollectiveParams* col_params,
Device* device, Tensor* input, Tensor* output) {
// Copy input and allocate output if on GPU.
Tensor input_buffer;
Tensor output_buffer;
if (device->device_type() == DEVICE_CPU) {
input_buffer = *input;
output_buffer = *output;
} else if (device->device_type() == DEVICE_GPU) {
input_buffer = CopyTensorToDevice(device, *input);
if (input == output) {
// If the input is forwarded to the output, we keep the forwarding so that
// we can test if the collective can run in-place.
output_buffer = input_buffer;
} else {
output_buffer = Tensor(device->GetAllocator(AllocatorAttributes()),
output->dtype(), output->shape());
}
} else {
LOG(FATAL) << "Unsupported device_type " << device->device_type();
}
// Requires the user the allocate output since we cannot infer the output
// shape.
CHECK(output->NumElements()) << "output must be allocated";
// Prepare an OpKernelContext.
OpKernelContext::Params op_params;
CancellationManager cancellation_manager;
op_params.step_id = kStepId;
op_params.device = device;
op_params.cancellation_manager = &cancellation_manager;
gtl::InlinedVector<TensorValue, 4> inputs;
inputs.push_back(TensorValue(&input_buffer));
op_params.inputs = &inputs;
gtl::InlinedVector<AllocatorAttributes, 4> input_aa({AllocatorAttributes()});
op_params.input_alloc_attrs = &input_aa;
DeviceContext* dev_ctx = nullptr;
auto* dev_info = device->tensorflow_gpu_device_info();
if (dev_info) {
dev_ctx = dev_info->default_context;
dev_ctx->Ref();
} else {
dev_ctx = new DeviceContext;
}
core::ScopedUnref unref_dev_ctx(dev_ctx);
op_params.op_device_context = dev_ctx;
int forward_from = 0;
op_params.forward_from_array = &forward_from;
AllocatorAttributes generic_alloc_attr;
op_params.output_attr_array = &generic_alloc_attr;
op_params.resource_manager = device->resource_manager();
OpKernelContext ctx(&op_params, 1);
// Prepare a collective instance.
CollectiveImplementationInterface* collective_impl = nullptr;
TF_CHECK_OK(CollectiveRegistry::Lookup(
col_params->instance.impl_details.collective_name, &collective_impl));
core::ScopedUnref unref_collective_impl(collective_impl);
TF_RETURN_IF_ERROR(collective_impl->InitializeCollectiveParams(col_params));
string exec_key = strings::StrCat(col_params->instance.instance_key, ":0:0");
auto col_ctx = std::make_shared<CollectiveContext>(
test_env->col_exec.get(), /*nccl_communicator*/ nullptr,
test_env->device_mgr.get(), &ctx, &op_params, col_params, exec_key,
kStepId, &input_buffer, &output_buffer);
TF_RETURN_IF_ERROR(collective_impl->InitializeCollectiveContext(col_ctx));
// Run the collective.
Status status;
Notification n;
collective_impl->Run([&status, &n](Status s) {
status = s;
n.Notify();
});
n.WaitForNotification();
if (status.ok()) {
*output = CopyTensorToHost(device, output_buffer);
}
return status;
}
} // namespace tensorflow