Instrument ExecutorState::Process with activity watcher.

PiperOrigin-RevId: 463409168
diff --git a/tensorflow/core/common_runtime/BUILD b/tensorflow/core/common_runtime/BUILD
index df062f0..dede6fa 100644
--- a/tensorflow/core/common_runtime/BUILD
+++ b/tensorflow/core/common_runtime/BUILD
@@ -638,6 +638,7 @@
         "//tensorflow/core:lib",
         "//tensorflow/core:lib_internal",
         "//tensorflow/core:protos_all_cc",
+        "//tensorflow/core/activity_watcher",
         "//tensorflow/core/profiler/lib:annotated_traceme",
         "//tensorflow/core/profiler/lib:connected_traceme",
         "//tensorflow/core/profiler/lib:scoped_annotation",
diff --git a/tensorflow/core/common_runtime/executor.cc b/tensorflow/core/common_runtime/executor.cc
index 136fd52..906ae02 100644
--- a/tensorflow/core/common_runtime/executor.cc
+++ b/tensorflow/core/common_runtime/executor.cc
@@ -22,6 +22,7 @@
 #include "absl/memory/memory.h"
 #include "absl/time/time.h"
 #include "absl/types/optional.h"
+#include "tensorflow/core/activity_watcher/activity.h"
 #include "tensorflow/core/common_runtime/costmodel_manager.h"
 #include "tensorflow/core/common_runtime/entry.h"
 #include "tensorflow/core/common_runtime/executor_factory.h"
@@ -298,7 +299,8 @@
                      EntryVector* outputs, NodeExecStatsInterface* stats);
   void ProcessAsync(const NodeItem& item, const OpKernelContext::Params& params,
                     const TaggedNode& tagged_node, Entry* first_input,
-                    NodeExecStatsInterface* stats);
+                    NodeExecStatsInterface* stats,
+                    activity_watcher::ActivityId activity_id);
   void ProcessNoop(NodeExecStatsInterface* stats);
   void ProcessConstTensor(const NodeItem& item, EntryVector* outputs,
                           NodeExecStatsInterface* stats);
@@ -608,13 +610,13 @@
 void ExecutorState<PropagatorStateType>::ProcessAsync(
     const NodeItem& item, const OpKernelContext::Params& params,
     const TaggedNode& tagged_node, Entry* first_input,
-    NodeExecStatsInterface* stats) {
+    NodeExecStatsInterface* stats, activity_watcher::ActivityId activity_id) {
   AsyncOpKernel* async_kernel = item.kernel->AsAsync();
   DCHECK(async_kernel != nullptr);
   AsyncState* state =
       new AsyncState(params, tagged_node, &item, first_input, stats);
 
-  auto done = [this, state]() {
+  auto done = [this, state, activity_id]() {
     Device* device = immutable_state_.params().device;
     NodeExecStatsInterface* stats = state->stats;  // Shorthand
     Entry* first_input = state->first_input;       // Shorthand
@@ -636,6 +638,7 @@
       (first_input + i)->ClearVal();
     }
     propagator_.MaybeMarkCompleted(state->tagged_node);
+    activity_watcher::ActivityEnd(activity_id);
     TaggedNodeSeq ready;
     if (s.ok()) {
       propagator_.PropagateOutputs(state->tagged_node, &outputs, &ready);
@@ -766,6 +769,22 @@
     const int id = item.node_id;
 
     propagator_.MaybeMarkStarted(tagged_node);
+    const activity_watcher::ActivityId activity_id =
+        activity_watcher::ActivityStart(
+            [&]() {
+              return std::make_unique<activity_watcher::Activity>(
+                  "ExecutorState::Process",
+                  activity_watcher::ActivityCategory::kMisc,
+                  activity_watcher::Activity::Attributes{
+                      {"node_name", item.kernel->def().name()},
+                      {"op", item.kernel->def().op()},
+                      {"iter_num", absl::StrCat(tagged_node.get_iter_num())},
+                      {"step_id", absl::StrCat(params.step_id)},
+                      {"node_id", absl::StrCat(id)},
+                      {"device", device->name()},
+                  });
+            },
+            /*level=*/2);
 
     params.track_allocations = false;
     stats = nullptr;
@@ -809,6 +828,7 @@
           (first_input + i)->ClearVal();
         }
         propagator_.MaybeMarkCompleted(tagged_node);
+        activity_watcher::ActivityEnd(activity_id);
         // Continue to process the nodes in 'inline_ready'.
         completed = NodeDone(s, &ready, stats, &inline_ready);
         continue;
@@ -825,7 +845,8 @@
       params.input_alloc_attrs = input_alloc_attrs;
 
       if (item.kernel_is_async) {
-        ProcessAsync(item, params, tagged_node, first_input, stats);
+        ProcessAsync(item, params, tagged_node, first_input, stats,
+                     activity_id);
         launched_asynchronously = true;
       } else {
         s = ProcessSync(item, &params, &outputs, stats);
@@ -846,6 +867,7 @@
         (first_input + i)->ClearVal();
       }
       propagator_.MaybeMarkCompleted(tagged_node);
+      activity_watcher::ActivityEnd(activity_id);
       // Propagates outputs.
       if (s.ok()) {
         propagator_.PropagateOutputs(tagged_node, &outputs, &ready);