Implement a OpStats combiner, with fixes for tensorflow build.
PiperOrigin-RevId: 329985619
Change-Id: Ib8a68a30c685b8ebc38f726a6516c64accf54375
diff --git a/tensorflow/core/profiler/convert/BUILD b/tensorflow/core/profiler/convert/BUILD
index 4931d52..f1dc37e 100644
--- a/tensorflow/core/profiler/convert/BUILD
+++ b/tensorflow/core/profiler/convert/BUILD
@@ -540,3 +540,22 @@
"@com_google_absl//absl/strings",
],
)
+
+cc_library(
+ name = "op_stats_combiner",
+ srcs = ["op_stats_combiner.cc"],
+ hdrs = ["op_stats_combiner.h"],
+ deps = [
+ ":op_metrics_db_combiner",
+ ":xplane_to_tf_functions",
+ "//tensorflow/core:lib",
+ "//tensorflow/core/profiler/protobuf:diagnostics_proto_cc",
+ "//tensorflow/core/profiler/protobuf:hardware_types_proto_cc",
+ "//tensorflow/core/profiler/protobuf:kernel_stats_proto_cc",
+ "//tensorflow/core/profiler/protobuf:op_stats_proto_cc",
+ "//tensorflow/core/profiler/protobuf:steps_db_proto_cc",
+ "//tensorflow/core/profiler/utils:hardware_type_utils",
+ "//tensorflow/core/profiler/utils:step_interval",
+ "@com_google_absl//absl/container:flat_hash_map",
+ ],
+)
diff --git a/tensorflow/core/profiler/convert/op_stats_combiner.cc b/tensorflow/core/profiler/convert/op_stats_combiner.cc
new file mode 100644
index 0000000..aafc619
--- /dev/null
+++ b/tensorflow/core/profiler/convert/op_stats_combiner.cc
@@ -0,0 +1,174 @@
+/* Copyright 2020 The TensorFlow Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+==============================================================================*/
+
+#include "tensorflow/core/profiler/convert/op_stats_combiner.h"
+
+#include "absl/container/flat_hash_map.h"
+#include "tensorflow/core/platform/logging.h"
+#include "tensorflow/core/platform/macros.h"
+#include "tensorflow/core/platform/protobuf.h"
+#include "tensorflow/core/profiler/convert/op_metrics_db_combiner.h"
+#include "tensorflow/core/profiler/convert/xplane_to_tf_functions.h"
+#include "tensorflow/core/profiler/protobuf/diagnostics.pb.h"
+#include "tensorflow/core/profiler/protobuf/hardware_types.pb.h"
+#include "tensorflow/core/profiler/protobuf/kernel_stats.pb.h"
+#include "tensorflow/core/profiler/protobuf/op_stats.pb.h"
+#include "tensorflow/core/profiler/protobuf/steps_db.pb.h"
+#include "tensorflow/core/profiler/utils/hardware_type_utils.h"
+#include "tensorflow/core/profiler/utils/step_interval.h"
+
+namespace tensorflow {
+namespace profiler {
+
+namespace {
+
+// Combines the src PerCoreStepInfo into the dst PerCoreStepInfo.
+void CombinePerCoreStepInfo(
+ int src_host_id, bool use_incomplete_step, const PerCoreStepInfo& src,
+ PerCoreStepInfo* dst,
+ OpMetricsDbCombiner* hlo_metrics_db_complete_steps_only_combiner,
+ OpMetricsDbCombiner* hlo_metrics_db_per_step_combiner) {
+ DCHECK_EQ(dst->step_num(), src.step_num());
+ CombineCoreIdMap(src_host_id, src.step_info_per_core(),
+ dst->mutable_step_info_per_core());
+ if (!use_incomplete_step) {
+ hlo_metrics_db_complete_steps_only_combiner->Combine(src.hlo_metrics_db());
+ }
+ hlo_metrics_db_per_step_combiner->Combine(src.hlo_metrics_db());
+ CombineCoreIdMap(src_host_id, src.flow_db_per_core(),
+ dst->mutable_flow_db_per_core());
+ CombineCoreIdMap(src_host_id, src.all_reduce_db_per_core(),
+ dst->mutable_all_reduce_db_per_core());
+ CombineCoreIdMap(src_host_id, src.core_id_to_replica_id_map(),
+ dst->mutable_core_id_to_replica_id_map());
+}
+
+void CombineStepDatabase(
+ int src_host_id, StepInterval step_intersection,
+ const StepDatabaseResult& src, StepDatabaseResult* dst,
+ OpMetricsDbCombiner* hlo_metrics_db_complete_steps_only_combiner,
+ std::vector<OpMetricsDbCombiner>* hlo_metrics_db_per_step_combiners) {
+ if (src.use_incomplete_step()) {
+ dst->set_use_incomplete_step(true);
+ }
+ for (const PerCoreStepInfo& src_step_info : src.step_sequence()) {
+ uint32 step_num = src_step_info.step_num();
+ if (!step_intersection.Contains(step_num)) {
+ continue;
+ }
+ uint32 dst_step_sequence_index = step_intersection.Index(step_num);
+ CombinePerCoreStepInfo(
+ src_host_id, src.use_incomplete_step(), src_step_info,
+ dst->mutable_step_sequence(dst_step_sequence_index),
+ hlo_metrics_db_complete_steps_only_combiner,
+ &(*hlo_metrics_db_per_step_combiners)[dst_step_sequence_index]);
+ }
+}
+
+void CombineRunEnvironment(const RunEnvironment& src, RunEnvironment* dst) {
+ dst->mutable_hostnames()->insert(src.hostnames().begin(),
+ src.hostnames().end());
+ dst->set_host_count(dst->hostnames_size());
+ if (src.device_type() != "CPU") {
+ dst->set_device_type(src.device_type());
+ // TODO(b/111402648): Batch size may differ per-core. Currently, we report
+ // the max batch size. We need to come up with a better measure.
+ dst->set_per_core_batch_size(
+ std::max(src.per_core_batch_size(), dst->per_core_batch_size()));
+ dst->set_device_core_count(src.device_core_count() +
+ dst->device_core_count());
+ // Replica count and num cores per replica must be same for all copies.
+ dst->set_replica_count(std::max(src.replica_count(), dst->replica_count()));
+ dst->set_num_cores_per_replica(
+ std::max(src.num_cores_per_replica(), dst->num_cores_per_replica()));
+ *dst->mutable_topology() = src.topology();
+ }
+ dst->set_task_count(src.task_count() + dst->task_count());
+ (*dst->mutable_host_independent_job_info()) = src.host_independent_job_info();
+ for (const auto& job_info : src.host_dependent_job_info()) {
+ *(dst->add_host_dependent_job_info()) = job_info;
+ }
+ dst->set_host_trace_level(src.host_trace_level());
+}
+
+// Combines the src PerfEnv into the dst PerfEnv.
+void CombinePerfEnv(const PerfEnv& src, PerfEnv* dst) {
+ dst->set_peak_tera_flops_per_second(src.peak_tera_flops_per_second());
+ dst->set_peak_hbm_bw_giga_bytes_per_second(
+ src.peak_hbm_bw_giga_bytes_per_second());
+ dst->set_ridge_point(src.ridge_point());
+}
+
+// Combines the src Diagnostics into the dst Diagnostics.
+void CombineDiagnostics(const Diagnostics& src, Diagnostics* dst) {
+ dst->mutable_info()->MergeFrom(src.info());
+ dst->mutable_warnings()->MergeFrom(src.warnings());
+ dst->mutable_errors()->MergeFrom(src.errors());
+}
+
+} // namespace
+
+bool IsCoordinator(bool no_accelerator_in_system, HardwareType hardware_type) {
+ // A host is a coordinator if:
+ // (1) The host doesn't have a device, and
+ // (2) The system does use accelerator (if not, it uses CPU only and so this
+ // host should be regarded as a worker as well).
+ return !HasDevice(hardware_type) && !no_accelerator_in_system;
+}
+
+uint32 GlobalCoreId(int host_id, uint32 device_ordinal) {
+ constexpr uint32 kMaxDevicesPerHost = 1000; // power-of-10 for debuggability
+ return host_id * kMaxDevicesPerHost + device_ordinal;
+}
+
+void CombineOpStats(
+ bool no_accelerator_in_system, int src_host_id, HardwareType hardware_type,
+ StepInterval step_intersection, const OpStats& src, OpStats* dst,
+ OpMetricsDbCombiner* host_op_metrics_db_combiner,
+ OpMetricsDbCombiner* device_op_metrics_db_combiner,
+ OpMetricsDbCombiner* hlo_metrics_db_complete_steps_only_combiner,
+ std::vector<OpMetricsDbCombiner>* hlo_metrics_db_per_step_combiners) {
+ // Combine host_metrics_db.
+ host_op_metrics_db_combiner->Combine(src.host_op_metrics_db());
+ // Combine device_metrics_db.
+ device_op_metrics_db_combiner->Combine(src.device_op_metrics_db());
+
+ // Combine step_db.
+ if (!IsCoordinator(no_accelerator_in_system, hardware_type)) {
+ CombineStepDatabase(src_host_id, step_intersection, src.step_db(),
+ dst->mutable_step_db(),
+ hlo_metrics_db_complete_steps_only_combiner,
+ hlo_metrics_db_per_step_combiners);
+ }
+
+ // Combine run environment info.
+ CombineRunEnvironment(src.run_environment(), dst->mutable_run_environment());
+
+ // Combine the perf environment info.
+ CombinePerfEnv(src.perf_env(), dst->mutable_perf_env());
+
+ // Combine diagnostics.
+ CombineDiagnostics(src.diagnostics(), dst->mutable_diagnostics());
+
+ // Combine kernel stats.
+ dst->mutable_kernel_stats_db()->mutable_reports()->MergeFrom(
+ src.kernel_stats_db().reports());
+
+ // Combine tf-function stats.
+ CombineTfFunctionDb(src.tf_function_db(), dst->mutable_tf_function_db());
+}
+
+} // namespace profiler
+} // namespace tensorflow
diff --git a/tensorflow/core/profiler/convert/op_stats_combiner.h b/tensorflow/core/profiler/convert/op_stats_combiner.h
new file mode 100644
index 0000000..1b338c9
--- /dev/null
+++ b/tensorflow/core/profiler/convert/op_stats_combiner.h
@@ -0,0 +1,66 @@
+/* Copyright 2020 The TensorFlow Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+==============================================================================*/
+
+#ifndef TENSORFLOW_CORE_PROFILER_CONVERT_OP_STATS_COMBINER_H_
+#define TENSORFLOW_CORE_PROFILER_CONVERT_OP_STATS_COMBINER_H_
+
+#include "absl/container/flat_hash_map.h"
+#include "tensorflow/core/platform/logging.h"
+#include "tensorflow/core/platform/macros.h"
+#include "tensorflow/core/profiler/convert/op_metrics_db_combiner.h"
+#include "tensorflow/core/profiler/protobuf/hardware_types.pb.h"
+#include "tensorflow/core/profiler/protobuf/op_stats.pb.h"
+#include "tensorflow/core/profiler/utils/step_interval.h"
+
+namespace tensorflow {
+namespace profiler {
+
+// Whether a host is a coordinator.
+bool IsCoordinator(bool no_accelerator_in_system, HardwareType hardware_type);
+
+// Translates the core id from single host to the one for multiple-host.
+// We need this translation because the device_ordinal was assigned when a
+// single host response was given. Now, we need a global core_id to distinguish
+// it with multiple hosts.
+uint32 GlobalCoreId(int host_id, uint32 device_ordinal);
+
+// Combines the src map into the dst map.
+// The src map keys are local core_ids. The src_host_id is used to convert them
+// into global core_ids used as keys in the dst map.
+// REQUIRED: cores from src_host_id are not already in dst.
+template <typename CoreIdMap>
+void CombineCoreIdMap(int src_host_id, const CoreIdMap& src, CoreIdMap* dst) {
+ for (const auto& core_id_and_value : src) {
+ uint32 global_core_id = GlobalCoreId(src_host_id, core_id_and_value.first);
+ auto iter_and_inserted =
+ dst->insert({global_core_id, core_id_and_value.second});
+ DCHECK(iter_and_inserted.second)
+ << "Duplicated core_id: " << iter_and_inserted.first->first;
+ }
+}
+
+// Combine the src OpStats into the dst OpStats.
+void CombineOpStats(
+ bool no_accelerator_in_system, int src_host_id, HardwareType hardware_type,
+ StepInterval step_intersection, const OpStats& src, OpStats* dst,
+ OpMetricsDbCombiner* host_op_metrics_db_combiner,
+ OpMetricsDbCombiner* device_op_metrics_db_combiner,
+ OpMetricsDbCombiner* hlo_metrics_db_complete_steps_only_combiner,
+ std::vector<OpMetricsDbCombiner>* hlo_metrics_db_per_step_combiners);
+
+} // namespace profiler
+} // namespace tensorflow
+
+#endif // TENSORFLOW_CORE_PROFILER_CONVERT_OP_STATS_COMBINER_H_
diff --git a/tensorflow/core/profiler/utils/hardware_type_utils.cc b/tensorflow/core/profiler/utils/hardware_type_utils.cc
index 69b5d47..b7cae76 100644
--- a/tensorflow/core/profiler/utils/hardware_type_utils.cc
+++ b/tensorflow/core/profiler/utils/hardware_type_utils.cc
@@ -82,5 +82,7 @@
return HardwareType::UNKNOWN_HARDWARE;
}
+bool HasDevice(HardwareType x) { return x > tensorflow::profiler::CPU_ONLY; }
+
} // namespace profiler
} // namespace tensorflow
diff --git a/tensorflow/core/profiler/utils/hardware_type_utils.h b/tensorflow/core/profiler/utils/hardware_type_utils.h
index 70090fb..4a1470a 100644
--- a/tensorflow/core/profiler/utils/hardware_type_utils.h
+++ b/tensorflow/core/profiler/utils/hardware_type_utils.h
@@ -28,6 +28,9 @@
HardwareType ParseHardwareType(absl::string_view device_type);
+// Returns true if the given hardware type has a device.
+bool HasDevice(HardwareType x);
+
} // namespace profiler
} // namespace tensorflow