blob: 3d92de6986c3ab9dd862cdf324c74d739ae5e55f [file] [log] [blame]
/* Copyright 2018 The TensorFlow Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#include "tensorflow/compiler/xrt/xrt_util.h"
#include <stdlib.h>
#include <string.h>
#include "tensorflow/compiler/xla/debug_options_flags.h"
#include "tensorflow/compiler/xla/types.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/mutex.h"
namespace tensorflow {
namespace {
mutex nccl_factory_mutex(LINKER_INITIALIZED);
std::shared_ptr<NcclUniqueIdFactory>* nccl_factory;
// The ScopedHandles data structure is used in the ExecuteChained() API and its
// task is to track tuple allocation registrations. It is used both the track
// intermediate results of a chained computation, or its final results. Anything
// which is marked to be released, will be released using the XRTMemoryManager
// once the object is destroyed (unless an explicit call to Drop() or Release()
// is made).
class ScopedHandles {
public:
explicit ScopedHandles(RefPtr<XRTMemoryManager> memory_manager)
: memory_manager_(std::move(memory_manager)) {}
~ScopedHandles() {
for (size_t i = 0; i < handles_.size(); ++i) {
if (handles_release_[i]) {
memory_manager_->Release(handles_[i]).IgnoreError();
}
}
}
int64_t operator[](size_t index) const { return handles_.at(index); }
size_t size() const { return handles_.size(); }
// Adds the given handle at the index position, by marking it releasable
// according to the release argument. If an existing, and to-be-released
// handle already exists at the same index, it will be released.
Status Add(size_t index, int64_t handle, bool release) {
if (index >= handles_.size()) {
handles_.resize(index + 1, XRTMemoryManager::InvalidKey());
handles_release_.resize(index + 1, false);
}
if (handles_release_[index]) {
Status status = memory_manager_->Release(handles_[index]);
if (!status.ok()) {
if (release) {
memory_manager_->Release(handle).IgnoreError();
}
return status;
}
}
handles_[index] = handle;
handles_release_[index] = release;
return OkStatus();
}
// Adds a to-be-released tuple allocation at the given index.
Status Add(size_t index, RefPtr<XRTTupleAllocation> tuple) {
return Add(index, memory_manager_->Register(std::move(tuple)),
/*release=*/true);
}
// Drops the handle at the given index, and releases it using the
// XRTMemoryManager::Release() if marked as to-be-released.
Status Drop(size_t index) {
if (handles_release_.at(index)) {
TF_RETURN_IF_ERROR(memory_manager_->Release(handles_[index]));
}
Release(index);
return OkStatus();
}
// Releases the handle at the given index. The destructor will not use that
// XRTMemoryManager::Release() API on such handle.
int64_t Release(size_t index) {
int64_t handle = handles_.at(index);
handles_[index] = XRTMemoryManager::InvalidKey();
handles_release_[index] = false;
return handle;
}
// Looks up the handle stored at the given index, and returns the matching
// tuple allocation.
xla::StatusOr<RefPtr<XRTTupleAllocation>> Lookup(size_t index) const {
return memory_manager_->Lookup(handles_.at(index));
}
private:
RefPtr<XRTMemoryManager> memory_manager_;
std::vector<int64_t> handles_;
std::vector<bool> handles_release_;
};
bool DebugOptionsPassThroughEnabled() {
const char* env = getenv("TF_XLA_DEBUG_OPTIONS_PASSTHROUGH");
bool enabled =
env != nullptr && (strcmp(env, "1") == 0 || strcmp(env, "true") == 0);
if (enabled) {
LOG(WARNING) << "Passing through XLA debug options!";
} else {
LOG(WARNING) << "TF_XLA_DEBUG_OPTIONS_PASSTHROUGH not set, not all options "
"will be retained";
}
return enabled;
}
string SafeDebugPath(const string& path) {
if (path.empty() || path.compare(0, 5, "gs://") == 0 ||
path.compare(0, 11, "bigstore://") == 0) {
return path;
}
LOG(WARNING) << "Invalid config path (will be dropped): " << path;
return string();
}
Status MakeOutput(const RefPtr<XRTTupleAllocation>& output, int64_t index,
RefPtr<XRTTupleAllocation>* result) {
if (index == 0) {
*result = output;
} else {
XRTTupleAllocation* tuple;
TF_RETURN_IF_ERROR(
XRTTupleAllocation::MakeSubBuffer(output.get(), {index - 1}, &tuple,
/*alias_parent_allocation=*/true));
result->reset(tuple);
}
return OkStatus();
}
Status PopulateOpWorkingSet(xla::Backend* backend,
const xrt::XRTChainedExecuteOp& op,
int current_index, const ScopedHandles& outputs,
XRTMemoryManager::WorkingSet* working_set,
se::DeviceMemoryAllocator* allocator) {
for (int i = 0; i < op.inputs_size(); ++i) {
auto& input = op.inputs(i);
if (input.op_index() >= current_index) {
return errors::InvalidArgument(
"Input index ", input.op_index(),
" is above the current position: ", current_index);
}
TF_RETURN_IF_ERROR(working_set->LookupAndPin(
backend, outputs[input.op_index()], allocator));
}
return OkStatus();
}
} // namespace
void SetNcclUniqueIdFactory(std::shared_ptr<NcclUniqueIdFactory> factory) {
mutex_lock lock(nccl_factory_mutex);
if (nccl_factory == nullptr) {
nccl_factory = new std::shared_ptr<NcclUniqueIdFactory>();
}
*nccl_factory = std::move(factory);
}
std::shared_ptr<NcclUniqueIdFactory> GetNcclUniqueIdFactory() {
mutex_lock lock(nccl_factory_mutex);
return nccl_factory != nullptr ? *nccl_factory : nullptr;
}
xla::DebugOptions BuildXlaDebugOptions(const xla::DebugOptions& ref_options) {
static const bool options_passthrough = DebugOptionsPassThroughEnabled();
if (options_passthrough) {
return ref_options;
}
xla::DebugOptions options = xla::GetDebugOptionsFromFlags();
options.set_xla_dump_to(SafeDebugPath(ref_options.xla_dump_to()));
options.set_xla_dump_hlo_as_proto(ref_options.xla_dump_hlo_as_proto());
options.set_xla_dump_hlo_as_text(ref_options.xla_dump_hlo_as_text());
options.set_xla_dump_hlo_snapshots(ref_options.xla_dump_hlo_snapshots());
options.set_xla_dump_hlo_pass_re(ref_options.xla_dump_hlo_pass_re());
options.set_xla_dump_include_timestamp(
ref_options.xla_dump_include_timestamp());
options.set_xla_dump_max_hlo_modules(ref_options.xla_dump_max_hlo_modules());
for (auto& pass : ref_options.xla_disable_hlo_passes()) {
options.add_xla_disable_hlo_passes(pass);
}
return options;
}
xla::StatusOr<std::vector<InputCoords>> GetComputationInputs(
OpKernelContext* context, const char* input_name) {
OpInputList arg_list;
TF_RETURN_IF_ERROR(context->input_list(input_name, &arg_list));
// Concatenate all input uids from list of scalars-or-vectors carrying them.
std::vector<InputCoords> input_coords;
for (int i = 0; i < arg_list.size(); ++i) {
const Tensor& arg = arg_list[i];
if (TensorShapeUtils::IsScalar(arg.shape())) {
input_coords.emplace_back(arg.scalar<int64_t>()());
} else {
TF_RET_CHECK(TensorShapeUtils::IsVector(arg.shape()));
auto arg_vec = arg.vec<int64_t>();
const int64_t num_elts = arg.shape().dim_size(0);
for (int i = 0; i < num_elts; ++i) {
input_coords.emplace_back(arg_vec(i));
}
}
}
return std::move(input_coords);
}
bool InputShapeMatches(const xla::Shape& parameter_shape,
const xla::Shape& input_shape) {
auto shape_checker = [&](const xla::Shape& pshape,
const xla::ShapeIndex& index) {
if (pshape.IsArray()) {
TF_ASSIGN_OR_RETURN(const xla::Shape* ishape,
xla::ShapeUtil::TryGetSubshape(input_shape, index));
if (pshape.rank() != ishape->rank() ||
pshape.element_type() != ishape->element_type()) {
return errors::InvalidArgument("Mismatching shapes");
}
if (pshape.is_static() && !xla::Layout::Equal().IgnoreTiles()(
pshape.layout(), ishape->layout())) {
return errors::InvalidArgument("Mismatching layouts");
}
for (int64_t dim = 0; dim < pshape.rank(); ++dim) {
if (pshape.is_dynamic_dimension(dim)) {
if (pshape.dimensions(dim) < ishape->dimensions(dim)) {
return errors::InvalidArgument("Mismatching shapes");
}
} else if (pshape.dimensions(dim) != ishape->dimensions(dim)) {
return errors::InvalidArgument("Mismatching shapes");
}
}
}
return OkStatus();
};
return xla::ShapeUtil::ForEachSubshapeWithStatus(parameter_shape,
shape_checker)
.ok();
}
xla::StatusOr<std::vector<RefPtr<XRTTupleAllocation>>> GetInputTupleAllocations(
const std::vector<InputCoords>& input_coords,
XRTMemoryManager::WorkingSet* working_set, xla::Backend* backend,
int64_t num_input_shapes,
const std::function<xla::Shape(int64_t)>& shape_getter, bool release_inputs,
se::DeviceMemoryAllocator* allocator) {
if (input_coords.size() != num_input_shapes) {
return errors::InvalidArgument(
"Number of inputs does not match executable proto input shapes: ",
input_coords.size(), " vs. ", num_input_shapes);
}
std::vector<RefPtr<XRTTupleAllocation>> input_tuples;
input_tuples.reserve(input_coords.size());
for (size_t i = 0; i < input_coords.size(); ++i) {
TF_RETURN_IF_ERROR(
working_set->LookupAndPin(backend, input_coords[i].handle, allocator));
auto tuple = working_set->PinnedTuples().back();
if (release_inputs) {
// We are holding a reference to the tuple, so we can safely delete it
// from the resource manager here.
TF_RETURN_IF_ERROR(
working_set->MemoryManager()->Release(input_coords[i].handle));
VLOG(2) << "Released allocation handle " << input_coords[i].handle;
}
xla::Shape input_shape = shape_getter(i);
if (!InputShapeMatches(input_shape, tuple->on_host_shape())) {
return errors::InvalidArgument(
"Run-time shape mismatch for XRTExecute argument[", i, "] (",
input_coords[i].handle, "). Expected ", input_shape.DebugString(),
"; got ", tuple->on_host_shape().DebugString());
}
if (input_coords[i].index.empty()) {
input_tuples.emplace_back(std::move(tuple));
} else {
XRTTupleAllocation* sub_tuple;
TF_RETURN_IF_ERROR(XRTTupleAllocation::MakeSubBuffer(
tuple.get(), input_coords[i].index, &sub_tuple,
/*alias_parent_allocation=*/true));
input_tuples.emplace_back(sub_tuple);
}
}
return std::move(input_tuples);
}
Status RebuildOutputAliases(
const RefPtr<XRTTupleAllocation>& output_tuple,
absl::Span<const RefPtr<XRTTupleAllocation>> input_tuples,
const xla::HloInputOutputAliasConfig& input_output_alias) {
auto alias_function =
[&](const xla::ShapeIndex& output_index,
const xla::HloInputOutputAliasConfig::Alias& alias) -> Status {
TF_RET_CHECK(alias.parameter_number < input_tuples.size());
return output_tuple->AliasBufferFrom(*input_tuples[alias.parameter_number],
alias.parameter_index, output_index);
};
return input_output_alias.ForEachAliasWithStatus(alias_function);
}
xla::StatusOr<std::vector<xla::ExecutionInput>> GetArgumentsBuffers(
const xla::HloInputOutputAliasConfig& input_output_alias,
absl::Span<const RefPtr<XRTTupleAllocation>> input_tuples,
const std::vector<bool>& input_is_dynamic, bool release_inputs) {
auto is_dynamic = [&](size_t arg) {
return arg < input_is_dynamic.size() && input_is_dynamic[arg];
};
std::vector<xla::ExecutionInput> arguments;
// Don't alias dynamic input -- Due to the underlying implementation,
// aliased inputs have two owners: XRTAllocation and return value of
// this function. If an argument is dynamic and the ownership is
// released to output of this function, TPUExecute will free it and
// reallocate a new one, which creates a double freeing issue where
// XRTAllocation also attempts to release the buffer.
bool alias_outputs = release_inputs && input_tuples.size() == 1 &&
input_tuples[0]->IsExclusiveOwner() && !is_dynamic(0);
arguments.reserve(input_tuples.size());
for (int64_t i = 0; i < input_tuples.size(); ++i) {
auto alias_checker =
[&](const xla::ShapeIndex& index) -> xla::StatusOr<bool> {
if (input_output_alias.ParameterHasAlias(i, index)) {
TF_RET_CHECK(!is_dynamic(i));
return true;
}
return alias_outputs;
};
TF_ASSIGN_OR_RETURN(xla::ExecutionInput exec_input,
input_tuples[i]->ToExecutionInput(alias_checker));
arguments.emplace_back(std::move(exec_input));
}
return std::move(arguments);
}
Status CreateExecuteOutput(OpKernelContext* context,
XRTMemoryManager* memory_manager,
RefPtr<XRTTupleAllocation> output_tuple,
bool return_exploded_tuple) {
if (return_exploded_tuple && output_tuple->on_host_shape().IsTuple()) {
int64_t tuple_element_count =
xla::ShapeUtil::TupleElementCount(output_tuple->on_device_shape());
Tensor* output_tensor;
TF_RETURN_IF_ERROR(context->allocate_output(
0, TensorShape({tuple_element_count}), &output_tensor));
for (int64_t i = 0; i < tuple_element_count; ++i) {
XRTTupleAllocation* suballocation;
TF_RETURN_IF_ERROR(XRTTupleAllocation::MakeSubBuffer(
output_tuple.get(), {i}, &suballocation,
/*alias_parent_allocation=*/false));
output_tensor->vec<int64_t>()(i) =
memory_manager->Register(suballocation);
}
} else {
Tensor* output_tensor;
TF_RETURN_IF_ERROR(
context->allocate_output(0, TensorShape({}), &output_tensor));
output_tensor->scalar<int64_t>()() =
memory_manager->Register(std::move(output_tuple));
}
return OkStatus();
}
Status ExecuteChained(OpKernelContext* context,
const RefPtr<XRTMemoryManager>& memory_manager,
xla::Backend* backend, int device_ordinal,
const xrt::XRTChainedExecutePlan& plan,
const xrt::XRTChainedExecuteConfig& config,
const ChainedExecuteFn& execute_op,
se::DeviceMemoryAllocator* allocator) {
// Create the vector which tracks the uses of the intermediate chained
// operations outputs.
std::vector<int64_t> uses(plan.ops_size(), 0);
for (auto& op : plan.ops()) {
for (auto& input : op.inputs()) {
uses[input.op_index()] += 1;
}
}
ScopedHandles outputs(memory_manager);
ScopedHandles results(memory_manager);
for (int i = 0; i < plan.ops_size(); ++i) {
auto& op = plan.ops(i);
if (op.op_oneof_case() == xrt::XRTChainedExecuteOp::kDataHandle) {
// This operation is a device data load. Set the handle as output and
// leave the release flag off, since this is not an intermediate output.
TF_RETURN_IF_ERROR(outputs.Add(i, op.data_handle(), /*release=*/false));
} else if (op.op_oneof_case() ==
xrt::XRTChainedExecuteOp::kComputationHandle) {
// This is an XRT execute operation, forward to the device specific
// handler. Populating the working set makes sure the input allocations
// for this execute operations are pinned to device memory.
XRTMemoryManager::WorkingSet working_set(memory_manager);
TF_RETURN_IF_ERROR(PopulateOpWorkingSet(backend, op, i, outputs,
&working_set, allocator));
TF_ASSIGN_OR_RETURN(auto tuple,
execute_op(op, working_set.PinnedTuples()));
TF_RETURN_IF_ERROR(outputs.Add(i, std::move(tuple)));
} else {
return errors::InvalidArgument(
"Undefined operation kind at post-order position ", i);
}
// If the result of this chained operation is an output result, feed the
// results at the desired position.
for (auto& output : op.outputs()) {
TF_ASSIGN_OR_RETURN(auto tuple, outputs.Lookup(i));
RefPtr<XRTTupleAllocation> result;
TF_RETURN_IF_ERROR(MakeOutput(tuple, output.output_index(), &result));
TF_RETURN_IF_ERROR(results.Add(output.result_index(), std::move(result)));
}
// Drop intermediate results which have no more users.
for (auto& input : op.inputs()) {
uses[input.op_index()] -= 1;
if (uses[input.op_index()] == 0) {
TF_RETURN_IF_ERROR(outputs.Drop(input.op_index()));
}
}
}
Tensor* output_tensor;
TF_RETURN_IF_ERROR(context->allocate_output(
0, TensorShape({static_cast<int64_t>(results.size())}), &output_tensor));
for (size_t i = 0; i < results.size(); ++i) {
output_tensor->vec<int64_t>()(i) = results.Release(i);
}
return OkStatus();
}
} // namespace tensorflow