blob: c16e5c271d3e239316af9bff11be90918fb2bf2a [file] [log] [blame]
/* Copyright 2018 The TensorFlow Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#include "tensorflow/core/common_runtime/ring_reducer.h"
#include <algorithm>
#include "absl/memory/memory.h"
#include "tensorflow/core/common_runtime/base_collective_executor.h"
#include "tensorflow/core/common_runtime/collective_rma_local.h"
#include "tensorflow/core/common_runtime/collective_test_util.h"
#include "tensorflow/core/common_runtime/device.h"
#include "tensorflow/core/common_runtime/device_mgr.h"
#include "tensorflow/core/common_runtime/device_resolver_local.h"
#include "tensorflow/core/common_runtime/process_util.h"
#include "tensorflow/core/common_runtime/test_collective_executor_mgr.h"
#include "tensorflow/core/common_runtime/threadpool_device.h"
#include "tensorflow/core/framework/cancellation.h"
#include "tensorflow/core/framework/collective.h"
#include "tensorflow/core/framework/fake_input.h"
#include "tensorflow/core/framework/node_def.pb.h"
#include "tensorflow/core/framework/node_def_builder.h"
#include "tensorflow/core/framework/op_kernel.h"
#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/framework/tensor_testutil.h"
#include "tensorflow/core/lib/core/notification.h"
#include "tensorflow/core/lib/core/status_test_util.h"
#include "tensorflow/core/platform/refcount.h"
#include "tensorflow/core/platform/test.h"
#include "tensorflow/core/platform/unbounded_work_queue.h"
#include "tensorflow/core/public/session_options.h"
#include "tensorflow/core/public/version.h"
namespace tensorflow {
std::unique_ptr<OpKernel> GetKernel(const NodeDef& node,
const DeviceType& device_type,
DeviceBase* device) {
Status status;
std::unique_ptr<OpKernel> k = CreateOpKernel(
device_type, device, device->GetAllocator(AllocatorAttributes()), node,
TF_GRAPH_DEF_VERSION, &status);
if (!status.ok()) {
LOG(FATAL) << status;
}
return k;
}
std::unique_ptr<OpKernel> GetAdd(DataType dtype, const DeviceType& device_type,
DeviceBase* device) {
NodeDef node_def;
NodeDefBuilder builder("add_node", "Add");
TF_CHECK_OK(builder.Attr("T", dtype)
.Input(FakeInput(dtype))
.Input(FakeInput(dtype))
.Finalize(&node_def));
return GetKernel(node_def, device_type, device);
}
std::unique_ptr<OpKernel> GetDiv(DataType dtype, const DeviceType& device_type,
DeviceBase* device) {
NodeDef node_def;
NodeDefBuilder builder("add_node", "Div");
TF_CHECK_OK(builder.Attr("T", dtype)
.Input(FakeInput(dtype))
.Input(FakeInput(dtype))
.Finalize(&node_def));
return GetKernel(node_def, device_type, device);
}
class RingReducerTest : public ::testing::Test {
protected:
void Init(int num_workers, int num_devices, DataType dtype,
const TensorShape& shape, const DeviceType& device_type,
int num_subdivs, int fail_after) {
test_env_ = CreateCollectiveTestEnv(num_workers, num_devices, device_type);
test_env_->remote_access->set_fail_after(fail_after);
for (int wi = 0; wi < num_workers; ++wi) {
for (int di = 0; di < num_devices; ++di) {
int rank = wi * num_devices + di;
instances_.push_back(absl::make_unique<DeviceInstance>(
rank, num_subdivs, dtype, shape, test_env_.get()));
}
}
}
void Reduce(int fail_after) {
std::atomic<int> done(0);
for (auto& di : instances_) {
SchedClosure([&di, &done] {
di->DoReduce();
++done;
});
if (fail_after > 0) {
// Stagger the op execution starts.
Env::Default()->SleepForMicroseconds(100);
}
}
while (done < static_cast<int>(instances_.size())) {
Env::Default()->SleepForMicroseconds(1000);
}
}
template <typename T>
void RunTest(DataType dtype, const DeviceType& device_type, int num_workers,
int num_devices, int num_subdivs, int tensor_len,
int fail_after) {
Init(num_workers, num_devices, dtype, TensorShape({tensor_len}),
device_type, num_subdivs, fail_after);
std::vector<T> expected(tensor_len);
for (int di = 0; di < static_cast<int>(instances_.size()); ++di) {
instances_[di]->InitTensor([&expected, dtype, di](Tensor* t) {
for (size_t i = 0; i < t->NumElements(); ++i) {
// The cast is necessary to prevent clang-tidy from insisting
// that a faster non-open source function be substituted.
float value = pow(10, static_cast<double>(di)) * i;
if (dtype == DT_INT32 || dtype == DT_INT64) {
value = di * 10 + i;
}
t->flat<T>()(i) = static_cast<T>(value);
expected[i] += static_cast<T>(value);
}
});
}
Reduce(fail_after);
if (fail_after > 0) {
// Confirm that every device terminated with the expected error status.
for (int di = 0; di < static_cast<int>(instances_.size()); ++di) {
EXPECT_NE(
instances_[di]->status_.error_message().find("Deliberate failure"),
string::npos);
}
} else {
// Confirm that every device computed the same correct reduction value.
for (int i = 0; i < tensor_len; ++i) {
expected[i] /= static_cast<T>(num_workers * num_devices);
}
for (int di = 0; di < static_cast<int>(instances_.size()); ++di) {
TF_EXPECT_OK(instances_[di]->status_);
test::ExpectTensorEqual<T>(test::AsTensor<T>(expected),
instances_[di]->tensor());
}
}
}
class DeviceInstance {
public:
DeviceInstance(int rank, int num_subdivs, DataType dtype,
const TensorShape& shape, CollectiveTestEnv* test_env)
: test_env_(test_env), tensor_(dtype, shape) {
col_params_ = CreateCollectiveParams(*test_env_, rank, "RingReduce",
REDUCTION_COLLECTIVE, dtype, shape);
if (num_subdivs > 0) {
col_params_->instance.impl_details.subdiv_offsets =
GenerateEvenSubdivOffsets(test_env->num_devices_per_worker,
num_subdivs);
}
string dev_name = col_params_->group.members[rank].device.name();
TF_CHECK_OK(test_env_->device_mgr->LookupDevice(dev_name, &device_))
<< "Couldn't find device " << dev_name
<< " existing devices: " << test_env_->device_mgr->DebugString();
merge_op_ = GetAdd(col_params_->instance.data_type,
test_env_->device_type, device_);
final_op_ = GetDiv(col_params_->instance.data_type,
test_env_->device_type, device_);
col_params_->merge_op = merge_op_.get();
col_params_->final_op = final_op_.get();
}
void InitTensor(const std::function<void(Tensor*)>& init_f) {
init_f(&tensor_);
}
void DoReduce() {
status_ = RunCollective(test_env_, col_params_.get(), device_, &tensor_,
&tensor_);
}
const Tensor& tensor() { return tensor_; }
CollectiveTestEnv* test_env_;
Tensor tensor_;
Device* device_;
core::RefCountPtr<CollectiveParams> col_params_;
std::unique_ptr<OpKernel> merge_op_;
std::unique_ptr<OpKernel> final_op_;
Status status_;
};
std::unique_ptr<CollectiveTestEnv> test_env_;
std::vector<std::unique_ptr<DeviceInstance>> instances_;
mutex mu_;
int32 reduce_counter_ TF_GUARDED_BY(mu_) = 0;
};
class RingReducerInitParamsTest : public ::testing::Test {
protected:
void RunSubdivPermsTest(
CollectiveParams* cp,
const std::vector<std::vector<int>>& expected_subdiv_perms,
const std::vector<int>& expected_subdiv_rank) {
cp->instance.impl_details.subdiv_permutations.clear();
cp->subdiv_rank.clear();
// Create a stub ring reducer only for testing param initialization.
core::RefCountPtr<RingReducer> reducer(new RingReducer());
TF_CHECK_OK(reducer->InitializeCollectiveParams(cp));
EXPECT_EQ(expected_subdiv_perms,
cp->instance.impl_details.subdiv_permutations);
EXPECT_EQ(expected_subdiv_rank, cp->subdiv_rank);
reducer->group_size_tensor_ready_.Notify(); // To unblock destructor.
}
};
TEST_F(RingReducerInitParamsTest, SpecifiedSubdivs) {
const int kNumDevsPerWorker = 8;
const int kNumWorkers = 3;
auto test_env =
CreateCollectiveTestEnv(kNumWorkers, kNumDevsPerWorker, DEVICE_CPU);
auto cp =
CreateCollectiveParams(*test_env, /*rank*/ 0, "RingReduce",
REDUCTION_COLLECTIVE, DT_FLOAT, TensorShape({1}));
cp->default_rank = 0;
cp->instance.impl_details.subdiv_offsets = {0, 4};
RunSubdivPermsTest(cp.get(),
{{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11,
12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23},
{4, 5, 6, 7, 0, 1, 2, 3, 12, 13, 14, 15,
8, 9, 10, 11, 20, 21, 22, 23, 16, 17, 18, 19}},
{0, 4});
cp->instance.impl_details.subdiv_offsets = {0, -4};
RunSubdivPermsTest(cp.get(),
{{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11,
12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23},
{3, 2, 1, 0, 7, 6, 5, 4, 11, 10, 9, 8,
15, 14, 13, 12, 19, 18, 17, 16, 23, 22, 21, 20}},
{0, 3});
cp->default_rank = 3;
cp->instance.impl_details.subdiv_offsets = {3, -3};
RunSubdivPermsTest(cp.get(),
{{3, 4, 5, 6, 7, 0, 1, 2, 11, 12, 13, 14,
15, 8, 9, 10, 19, 20, 21, 22, 23, 16, 17, 18},
{4, 3, 2, 1, 0, 7, 6, 5, 12, 11, 10, 9,
8, 15, 14, 13, 20, 19, 18, 17, 16, 23, 22, 21}},
{0, 1});
}
TEST_F(RingReducerInitParamsTest, AutomaticSubdivs) {
const int kNumDevsPerWorker = 8;
const int kNumWorkers = 3;
const int kNumDevs = kNumDevsPerWorker * kNumWorkers;
auto test_env =
CreateCollectiveTestEnv(kNumWorkers, kNumDevsPerWorker, DEVICE_CPU);
auto cp =
CreateCollectiveParams(*test_env, /*rank*/ 0, "RingReduce",
REDUCTION_COLLECTIVE, DT_FLOAT, TensorShape({1}));
// Test automatic generation of subdiv offsets.
cp->default_rank = 0;
cp->instance.impl_details.subdiv_offsets.clear();
cp->instance.impl_details.max_subdivs_per_device = 0;
RunSubdivPermsTest(cp.get(),
{{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11,
12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23}},
{0});
// Set shape so that with 2 subdivs chunk_size is 3 MiB. This should cause 2
// offsets, {0, -4}, to be generated.
{
int num_subdivs = 2;
int num_chunks = kNumDevs * num_subdivs;
size_t chunk_size = 3 * 1048576; // 3 MB
size_t tensor_size = chunk_size * num_chunks;
cp->instance.shape = TensorShape(
{static_cast<int64_t>(tensor_size / DataTypeSize(DT_FLOAT))});
}
cp->instance.impl_details.subdiv_offsets.clear();
RunSubdivPermsTest(cp.get(),
{{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11,
12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23},
{3, 2, 1, 0, 7, 6, 5, 4, 11, 10, 9, 8,
15, 14, 13, 12, 19, 18, 17, 16, 23, 22, 21, 20}},
{0, 3});
}
TEST_F(RingReducerInitParamsTest, AutomaticSubdivUpperBound) {
const int kNumDevsPerWorker = 1;
const int kNumWorkers = 4;
auto test_env =
CreateCollectiveTestEnv(kNumWorkers, kNumDevsPerWorker, DEVICE_CPU);
auto cp =
CreateCollectiveParams(*test_env, /*rank*/ 0, "RingReduce",
REDUCTION_COLLECTIVE, DT_FLOAT, TensorShape({1}));
cp->default_rank = 0;
cp->instance.impl_details.subdiv_offsets.clear();
cp->instance.impl_details.max_subdivs_per_device = 0;
cp->instance.shape = TensorShape({104857600 / DataTypeSize(DT_FLOAT)});
RunSubdivPermsTest(cp.get(), {{0, 1, 2, 3}, {0, 1, 2, 3}}, {0, 0});
}
TEST_F(RingReducerInitParamsTest, AutomaticSubdivIgnoresMaxNumSubdivs) {
const int kNumDevsPerWorker = 1;
const int kNumWorkers = 4;
auto test_env =
CreateCollectiveTestEnv(kNumWorkers, kNumDevsPerWorker, DEVICE_CPU);
auto cp =
CreateCollectiveParams(*test_env, /*rank*/ 0, "RingReduce",
REDUCTION_COLLECTIVE, DT_FLOAT, TensorShape({1}));
cp->default_rank = 0;
// When subdiv_offsets is present it will override automatic generation of
// offsets even when max_subdivs_per_device is present.
// cp->instance.impl_details.subdiv_offsets.clear();
cp->instance.impl_details.max_subdivs_per_device = 4;
cp->instance.shape = TensorShape({104857600 / DataTypeSize(DT_FLOAT)});
RunSubdivPermsTest(cp.get(), {{0, 1, 2, 3}}, {0});
cp->default_rank = 0;
// subdiv_offsets cleared, max_subdivs_per_device = 4 takes effect.
cp->instance.impl_details.subdiv_offsets.clear();
cp->instance.impl_details.max_subdivs_per_device = 4;
cp->instance.shape = TensorShape({104857600 / DataTypeSize(DT_FLOAT)});
RunSubdivPermsTest(cp.get(),
{{0, 1, 2, 3}, {0, 1, 2, 3}, {0, 1, 2, 3}, {0, 1, 2, 3}},
{0, 0, 0, 0});
}
TEST_F(RingReducerInitParamsTest, AutomaticSubdivUsesDefault) {
const int kNumDevsPerWorker = 1;
const int kNumWorkers = 4;
auto test_env =
CreateCollectiveTestEnv(kNumWorkers, kNumDevsPerWorker, DEVICE_CPU);
auto cp =
CreateCollectiveParams(*test_env, /*rank*/ 0, "RingReduce",
REDUCTION_COLLECTIVE, DT_FLOAT, TensorShape({1}));
cp->default_rank = 0;
// When subdiv_offsets is NOT present and max_subdivs_per_device has a
// == 0 value, the default setting of 2 is used.
cp->instance.impl_details.subdiv_offsets.clear();
cp->instance.impl_details.max_subdivs_per_device = 0;
cp->instance.shape = TensorShape({104857600 / DataTypeSize(DT_FLOAT)});
RunSubdivPermsTest(cp.get(), {{0, 1, 2, 3}, {0, 1, 2, 3}}, {0, 0});
}
TEST_F(RingReducerInitParamsTest, AutomaticSubdivDisabled) {
const int kNumDevsPerWorker = 1;
const int kNumWorkers = 4;
auto test_env =
CreateCollectiveTestEnv(kNumWorkers, kNumDevsPerWorker, DEVICE_CPU);
auto cp =
CreateCollectiveParams(*test_env, /*rank*/ 0, "RingReduce",
REDUCTION_COLLECTIVE, DT_FLOAT, TensorShape({1}));
cp->default_rank = 0;
// When subdiv_offsets is NOT present and max_subdivs_per_device = -1 no
// subidivision should be done. (old behavior)
cp->instance.impl_details.subdiv_offsets.clear();
cp->instance.impl_details.max_subdivs_per_device = -1;
cp->instance.shape = TensorShape({104857600 / DataTypeSize(DT_FLOAT)});
RunSubdivPermsTest(cp.get(), {{0, 1, 2, 3}}, {0});
}
// TODO(b/113171733): change to use TEST_P.
#define DEF_TEST(B, T, W, D, S, L, A) \
TEST_F(RingReducerTest, \
DaTy##B##_DevTy##T##_Wkr##W##_Dev##D##_Sdiv##S##_Len##L##_Abrt##A) { \
DataType dtype = DT_##B; \
switch (dtype) { \
case DT_FLOAT: { \
RunTest<float>(dtype, DEVICE_##T, W, D, S, L, A); \
} break; \
case DT_DOUBLE: { \
RunTest<double>(dtype, DEVICE_##T, W, D, S, L, A); \
} break; \
case DT_BFLOAT16: { \
RunTest<tensorflow::bfloat16>(dtype, DEVICE_##T, W, D, S, L, A); \
} break; \
case DT_INT32: { \
RunTest<int32>(dtype, DEVICE_##T, W, D, S, L, A); \
} break; \
case DT_INT64: { \
RunTest<int64_t>(dtype, DEVICE_##T, W, D, S, L, A); \
} break; \
default: \
LOG(FATAL) << "Unimplemented"; \
} \
}
#if !(GOOGLE_CUDA || TENSORFLOW_USE_ROCM)
// Success tests
DEF_TEST(FLOAT, CPU, 1, 2, 1, 1, 0)
DEF_TEST(FLOAT, CPU, 1, 2, 1, 2, 0)
DEF_TEST(FLOAT, CPU, 1, 2, 1, 8, 0)
DEF_TEST(FLOAT, CPU, 1, 2, 1, 16, 0)
DEF_TEST(FLOAT, CPU, 1, 2, 1, 1001, 0)
DEF_TEST(FLOAT, CPU, 2, 4, 1, 128, 0)
DEF_TEST(FLOAT, CPU, 2, 8, 1, 1001, 0)
DEF_TEST(FLOAT, CPU, 2, 8, 1, 4096, 0)
DEF_TEST(FLOAT, CPU, 2, 8, 1, 9408, 0)
DEF_TEST(FLOAT, CPU, 2, 8, 3, 4095, 0)
DEF_TEST(FLOAT, CPU, 2, 8, 3, 1045991, 0)
DEF_TEST(FLOAT, CPU, 4, 4, 4, 1045991, 0)
DEF_TEST(DOUBLE, CPU, 1, 2, 1, 1001, 0)
DEF_TEST(DOUBLE, CPU, 2, 8, 3, 4095, 0)
DEF_TEST(BFLOAT16, CPU, 1, 2, 1, 8, 0)
DEF_TEST(BFLOAT16, CPU, 2, 8, 3, 16, 0)
DEF_TEST(INT32, CPU, 1, 2, 1, 1001, 0)
DEF_TEST(INT32, CPU, 2, 8, 3, 4095, 0)
DEF_TEST(INT64, CPU, 1, 2, 1, 1001, 0)
DEF_TEST(INT64, CPU, 2, 8, 3, 4095, 0)
// Failure tests
DEF_TEST(FLOAT, CPU, 2, 8, 1, 9408, 1)
DEF_TEST(FLOAT, CPU, 2, 8, 1, 9408, 7)
DEF_TEST(FLOAT, CPU, 2, 8, 2, 9408, 11)
#endif
#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM
// GPU tests. So long as the device names are all in a single tasks we
// bypass inter-worker routing code and can fake multiple GPUs with a single
// GPU, from the perspective of the RingReducer logic. So these tests
// are all single-worker.
DEF_TEST(FLOAT, GPU, 1, 2, 1, 1, 0)
DEF_TEST(FLOAT, GPU, 1, 2, 1, 2, 0)
DEF_TEST(FLOAT, GPU, 1, 2, 1, 8, 0)
DEF_TEST(FLOAT, GPU, 1, 2, 1, 16, 0)
DEF_TEST(FLOAT, GPU, 1, 2, 1, 1001, 0)
DEF_TEST(FLOAT, GPU, 1, 8, 1, 1001, 0)
DEF_TEST(FLOAT, GPU, 1, 8, 1, 4096, 0)
DEF_TEST(FLOAT, GPU, 1, 8, 3, 4095, 0)
DEF_TEST(FLOAT, GPU, 1, 8, 3, 1045991, 0)
DEF_TEST(FLOAT, GPU, 1, 4, 4, 1045991, 0)
DEF_TEST(DOUBLE, GPU, 1, 2, 1, 1001, 0)
// INT32 values are never on the GPU.
// DEF_TEST(INT32, GPU, 1, 2, 1, 1001, 0)
DEF_TEST(INT64, GPU, 1, 2, 1, 1001, 0)
// Failure tests
DEF_TEST(FLOAT, GPU, 1, 8, 1, 9408, 2)
DEF_TEST(FLOAT, GPU, 1, 8, 2, 9408, 5)
#endif
} // namespace tensorflow