/* Copyright 2020 The TensorFlow Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#include <stdlib.h>
#include <string.h>
#include <algorithm>
#include <list>
#include <map>
#include <tuple>
#include <vector>
#include "tensorflow/core/common_runtime/device/device_id.h"
#include "tensorflow/core/common_runtime/device/device_id_manager.h"
#include "tensorflow/core/common_runtime/device/device_id_utils.h"
#include "tensorflow/core/common_runtime/device_factory.h"
#include "tensorflow/core/common_runtime/pluggable_device/pluggable_device.h"
#include "tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.h"
#include "tensorflow/core/common_runtime/pluggable_device/pluggable_device_init.h"
#include "tensorflow/core/common_runtime/pluggable_device/pluggable_device_process_state.h"
#include "tensorflow/core/common_runtime/pluggable_device/pluggable_device_util.h"
#include "tensorflow/core/framework/allocator.h"
namespace tensorflow {
namespace {
int64 MinSystemMemory(int64 available_memory) {
// We use the following heuristic for now:
//
// If the available_memory is < 2GiB, we allocate 225MiB to system memory,
// Otherwise, allocate max(300MiB, kMinSystemMemoryFraction *
// available_memory) to system memory.
//
// In the future we could be more sophisticated by using a table of devices.
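//
// As a worked example of the heuristic above: with 8 GiB of available
// memory this reserves max(300 MiB, 0.06 * 8 GiB), roughly 492 MiB, for the
// system.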
int64 min_system_memory;
constexpr float kMinSystemMemoryFraction = 0.06;
if (available_memory < (1LL << 31)) {
// 225MiB
min_system_memory = 225 * 1024 * 1024;
} else {
// max(300 MiB, kMinSystemMemoryFraction * available_memory)
min_system_memory = std::max(
int64{314572800},
static_cast<int64>(available_memory * kMinSystemMemoryFraction));
}
#if defined(__GNUC__) && defined(__OPTIMIZE__)
// Do nothing
#elif !defined(__GNUC__) && defined(NDEBUG)
// Do nothing
#else
// Reserve twice as much system memory in non-opt builds (debug builds on
// Windows), because non-opt builds need more system memory.
min_system_memory *= 2;
#endif
VLOG(5) << "available_memory = " << available_memory;
VLOG(5) << "min_system_memory = " << min_system_memory;
return min_system_memory;
}
// Get the memory limit for the virtual device being created on PluggableDevice
// with 'platform_device_id', when that virtual device is the only
// virtual device being created on that PluggableDevice.
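//
// For example, in an optimized build with per_process_gpu_memory_fraction
// left at its default of 0 and 4 GiB of device memory available, the limit
// works out to 4 GiB - max(300 MiB, 0.06 * 4 GiB) = 4 GiB - 300 MiB,
// roughly 3.7 GiB.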
Status SingleVirtualDeviceMemoryLimit(const string& platform_name,
const GPUOptions& gpu_options,
PlatformDeviceId platform_device_id,
int64* memory_limit) {
int64 total_memory = 0;
int64 available_memory = 0;
se::Platform* platform = PluggableDeviceMachineManager(platform_name);
se::StreamExecutor* se =
DeviceIdUtil::ExecutorForPlatformDeviceId(platform, platform_device_id)
.ValueOrDie();
if (!se->DeviceMemoryUsage(&available_memory, &total_memory)) {
return errors::Unknown(
"Failed to query available memory for PluggableDevice ",
platform_device_id.value());
}
int64 allocated_memory = 0;
const double per_process_gpu_memory_fraction =
gpu_options.per_process_gpu_memory_fraction();
if (per_process_gpu_memory_fraction > 1.0 ||
gpu_options.experimental().use_unified_memory()) {
return errors::Internal("not support unified memory yet.");
}
if (per_process_gpu_memory_fraction == 0) {
allocated_memory = available_memory;
const int64 min_system_memory = MinSystemMemory(available_memory);
if (min_system_memory < allocated_memory) {
allocated_memory -= min_system_memory;
}
} else {
allocated_memory = total_memory * per_process_gpu_memory_fraction;
}
*memory_limit = allocated_memory;
return Status::OK();
}
} // namespace
PluggableDeviceFactory::PluggableDeviceFactory(const string& device_type,
const string& platform_name)
: device_type_(device_type), platform_name_(platform_name) {}
Status PluggableDeviceFactory::ListPhysicalDevices(
std::vector<string>* devices) {
TF_RETURN_IF_ERROR(ValidatePluggableDeviceMachineManager(platform_name_));
se::Platform* platform = PluggableDeviceMachineManager(platform_name_);
int device_count = platform->VisibleDeviceCount();
for (int i = 0; i < device_count; ++i) {
const string device_name =
strings::StrCat("/physical_device:", device_type_, ":", i);
devices->push_back(device_name);
}
return Status::OK();
}
Status PluggableDeviceFactory::GetDeviceDetails(
int device_index, std::unordered_map<string, string>* details) {
TF_RETURN_IF_ERROR(ValidatePluggableDeviceMachineManager(platform_name_));
se::Platform* platform = PluggableDeviceMachineManager(platform_name_);
if (platform == nullptr) {
return Status::OK();
}
int device_count = platform->VisibleDeviceCount();
if (device_index < 0 || device_index >= device_count) {
return errors::Internal("Invalid device index: ", device_index);
}
auto desc_status = platform->DescriptionForDevice(device_index);
if (!desc_status.ok()) {
return desc_status.status();
}
auto desc = desc_status.ConsumeValueOrDie();
(*details)["device_name"] = desc->name();
return Status::OK();
}
Status PluggableDeviceFactory::CreateDevices(
const SessionOptions& options, const string& name_prefix,
std::vector<std::unique_ptr<Device>>* devices) {
TF_RETURN_IF_ERROR(ValidatePluggableDeviceMachineManager(platform_name_));
se::Platform* platform = PluggableDeviceMachineManager(platform_name_);
if (platform == nullptr) {
return Status::OK();
}
if (platform->VisibleDeviceCount() <= 0) {
return Status::OK();
}
size_t num_devices_to_use = INT_MAX;
auto iter = options.config.device_count().find(device_type_);
if (iter != options.config.device_count().end()) {
num_devices_to_use = iter->second;
}
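// For example, a session ConfigProto whose device_count map contains an
// entry mapping this device type to 1 caps creation at a single device.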
const auto& gpu_options = options.config.gpu_options();
std::vector<PlatformDeviceId> visible_device_order;
if (num_devices_to_use > 0) {
TF_RETURN_IF_ERROR(DeviceIdUtil::ParseVisibleDeviceList(
gpu_options.visible_device_list(), platform->VisibleDeviceCount(),
&visible_device_order));
}
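// The order of visible_device_list determines the TF device numbering
// assigned below: for example, "1,0" maps platform device 1 to TF device 0
// and platform device 0 to TF device 1.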
if (num_devices_to_use > visible_device_order.size()) {
num_devices_to_use = visible_device_order.size();
}
const auto& virtual_devices = gpu_options.experimental().virtual_devices();
if (!virtual_devices.empty())
VLOG(2) << "Pluggable device does not support virtual device setting yet";
int next_tf_device_id = 0;
std::vector<int64> memory_limit_bytes;
for (int i = 0; i < num_devices_to_use; ++i) {
const PlatformDeviceId platform_device_id = visible_device_order[i];
int64 single_virtual_device_memory_limit = 0;
TF_RETURN_IF_ERROR(SingleVirtualDeviceMemoryLimit(
platform_name_, gpu_options, platform_device_id,
&single_virtual_device_memory_limit));
memory_limit_bytes.push_back(single_virtual_device_memory_limit);
while (next_tf_device_id < memory_limit_bytes.size()) {
TfDeviceId tf_device_id(next_tf_device_id);
++next_tf_device_id;
TF_RETURN_IF_ERROR(DeviceIdManager::InsertTfPlatformDeviceIdPair(
DeviceType(device_type_), tf_device_id, platform_device_id));
}
}
const int num_tf_devices = next_tf_device_id;
std::vector<DeviceLocality> device_localities;
TF_RETURN_IF_ERROR(GetDeviceLocalities(num_tf_devices, &device_localities));
// Build the PluggableDevices.
DCHECK_EQ(next_tf_device_id, memory_limit_bytes.size());
for (int di = 0; di < num_tf_devices; ++di) {
TfDeviceId tf_device_id(di);
int64 bytes = memory_limit_bytes[di];
TF_RETURN_IF_ERROR(CreatePluggableDevice(options, name_prefix, tf_device_id,
bytes, device_localities[di],
devices));
}
return Status::OK();
}
static string GetShortDeviceDescription(PlatformDeviceId platform_device_id,
const se::DeviceDescription& desc) {
return strings::StrCat("device: ", platform_device_id.value(),
", name: ", desc.name(),
", pci bus id: ", desc.pci_bus_id());
}
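// For instance (illustrative values only), this yields a string such as
// "device: 0, name: ExampleAccelerator, pci bus id: 0000:04:00.0".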
Status PluggableDeviceFactory::CreatePluggableDevice(
const SessionOptions& options, const string& name_prefix,
TfDeviceId tf_device_id, int64 memory_limit,
const DeviceLocality& dev_locality,
std::vector<std::unique_ptr<Device>>* devices) {
DCHECK_GE(tf_device_id.value(), 0);
const string device_name = strings::StrCat(
name_prefix, "/device:", device_type_, ":", tf_device_id.value());
se::Platform* platform = PluggableDeviceMachineManager(platform_name_);
DeviceIdUtil::CheckValidTfDeviceId(DeviceType(device_type_), platform,
tf_device_id);
PlatformDeviceId platform_device_id;
TF_RETURN_IF_ERROR(DeviceIdManager::TfToPlatformDeviceId(
DeviceType(device_type_), tf_device_id, &platform_device_id));
int numa_node = dev_locality.numa_node();
auto desc_status = platform->DescriptionForDevice(platform_device_id.value());
if (!desc_status.ok()) {
return desc_status.status();
}
auto desc = desc_status.ConsumeValueOrDie();
PluggableDeviceProcessState* process_state =
PluggableDeviceProcessState::singleton(device_type_, platform_name_);
Allocator* device_allocator = process_state->GetPluggableDeviceAllocator(
options.config.gpu_options(), tf_device_id, memory_limit);
if (device_allocator == nullptr) {
return errors::Internal(
"Failed to get memory allocator for TF PluggableDevice ",
tf_device_id.value(), " with", memory_limit, " bytes of memory. ");
}
absl::optional<AllocatorStats> stats = device_allocator->GetStats();
if (!stats) {
return errors::Internal("No allocator statistics");
}
// 'memory_limit' is the requested memory size, but if an allocator with the
// given tf_device_id was created before, we reuse it instead of creating a
// new one (as a TF device is a shared resource), in which case the actual
// memory limit represented by 'stats.bytes_limit' used by that allocator
// may be different (which should be an error).
int64 bytes_limit = stats->bytes_limit ? *stats->bytes_limit : 0;
std::unique_ptr<PluggableDevice> pluggable_device =
absl::make_unique<PluggableDevice>(
options, device_name, device_type_, platform_name_,
static_cast<Bytes>(bytes_limit), dev_locality, tf_device_id,
GetShortDeviceDescription(platform_device_id, *desc),
device_allocator,
ProcessState::singleton()->GetCPUAllocator(numa_node),
false /*sync every op*/);
LOG(INFO) << "Created TensorFlow device (" << device_name << " with "
<< (bytes_limit >> 20)
<< " MB memory) -> physical PluggableDevice ("
<< GetShortDeviceDescription(platform_device_id, *desc) << ")";
TF_RETURN_IF_ERROR(pluggable_device->Init(options));
devices->push_back(std::move(pluggable_device));
return Status::OK();
}
Status PluggableDeviceFactory::GetDeviceLocalities(
int num_tf_devices, std::vector<DeviceLocality>* device_localities) {
std::vector<TfDeviceId> all_tf_device_ids;
all_tf_device_ids.reserve(num_tf_devices);
for (int i = 0; i < num_tf_devices; ++i) {
all_tf_device_ids.push_back(TfDeviceId(i));
}
for (TfDeviceId tf_device_id : all_tf_device_ids) {
PlatformDeviceId platform_device_id;
TF_RETURN_IF_ERROR(DeviceIdManager::TfToPlatformDeviceId(
DeviceType(device_type_), tf_device_id, &platform_device_id));
// Get the PluggableDevice bus_id from its reported NUMA affinity. Because
// devices are virtualized in some environments, we can't just use the
// device id. NUMA locales are indexed from 0, buses are indexed from 1.
se::Platform* platform = PluggableDeviceMachineManager(platform_name_);
auto desc_status =
platform->DescriptionForDevice(platform_device_id.value());
if (!desc_status.ok()) {
return desc_status.status();
}
auto desc = desc_status.ConsumeValueOrDie();
int numa_node = desc->numa_node();
if (numa_node < 0) {
// For some reason the StreamExecutor couldn't get the NUMA
// affinity of the device. If this is not a multi-socket mobo with
// devices local to different buses, it doesn't matter. If it is,
// we may run into trouble later with data transfer operations.
// The trouble may manifest as slower than expected performance,
// or outright features.
LOG(INFO) << "Could not identify NUMA node of platform " << device_type_
<< "Id" << platform_device_id
<< ", defaulting to 0. Your kernel may not have been built "
<< "with NUMA support. ";
numa_node = 0;
}
DeviceLocality dev_locality;
dev_locality.set_numa_node(numa_node);
dev_locality.set_bus_id(numa_node + 1);
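// Under the 1-indexed bus convention noted above, a device on NUMA node 0 is
// reported as bus_id 1.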
device_localities->push_back(dev_locality);
VLOG(1) << "PluggableDevice PlatformId" << platform_device_id
<< "TfDeviceId " << tf_device_id << "on bus "
<< dev_locality.bus_id() << " numa: " << numa_node;
}
return Status::OK();
}
} // namespace tensorflow