core: add PerfMark tasks/events for MigratingThreadDeframer (#7146)

diff --git a/core/src/main/java/io/grpc/internal/AbstractStream.java b/core/src/main/java/io/grpc/internal/AbstractStream.java
index 4ae7214..19bdbbc 100644
--- a/core/src/main/java/io/grpc/internal/AbstractStream.java
+++ b/core/src/main/java/io/grpc/internal/AbstractStream.java
@@ -23,6 +23,8 @@
 import io.grpc.Codec;
 import io.grpc.Compressor;
 import io.grpc.Decompressor;
+import io.perfmark.Link;
+import io.perfmark.PerfMark;
 import java.io.InputStream;
 import javax.annotation.concurrent.GuardedBy;
 
@@ -219,15 +221,25 @@
      */
     private void requestMessagesFromDeframer(final int numMessages) {
       if (deframer instanceof ThreadOptimizedDeframer) {
-        deframer.request(numMessages);
+        PerfMark.startTask("AbstractStream.request");
+        try {
+          deframer.request(numMessages);
+        } finally {
+          PerfMark.stopTask("AbstractStream.request");
+        }
         return;
       }
+      final Link link = PerfMark.linkOut();
       class RequestRunnable implements Runnable {
         @Override public void run() {
+          PerfMark.startTask("AbstractStream.request");
+          PerfMark.linkIn(link);
           try {
             deframer.request(numMessages);
           } catch (Throwable t) {
             deframeFailed(t);
+          } finally {
+            PerfMark.stopTask("AbstractStream.request");
           }
         }
       }
diff --git a/core/src/main/java/io/grpc/internal/MigratingThreadDeframer.java b/core/src/main/java/io/grpc/internal/MigratingThreadDeframer.java
index 90678d2..f820e7f 100644
--- a/core/src/main/java/io/grpc/internal/MigratingThreadDeframer.java
+++ b/core/src/main/java/io/grpc/internal/MigratingThreadDeframer.java
@@ -19,6 +19,8 @@
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import io.grpc.Decompressor;
+import io.perfmark.Link;
+import io.perfmark.PerfMark;
 import java.io.Closeable;
 import java.io.InputStream;
 import java.util.ArrayDeque;
@@ -105,13 +107,25 @@
     } else {
       if (!alreadyEnqueued) {
         if (currentThreadIsTransportThread) {
-          transportListener.messagesAvailable(messageProducer);
+          PerfMark.startTask("MigratingThreadDeframer.messageAvailable");
+          try {
+            transportListener.messagesAvailable(messageProducer);
+          } finally {
+            PerfMark.stopTask("MigratingThreadDeframer.messageAvailable");
+          }
         } else {
+          final Link link = PerfMark.linkOut();
           // SLOW path. This is the "normal" thread-hopping approach for request() when _not_ using
           // MigratingThreadDeframer
           transportExecutor.runOnTransportThread(new Runnable() {
             @Override public void run() {
-              transportListener.messagesAvailable(messageProducer);
+              PerfMark.startTask("MigratingThreadDeframer.messageAvailable");
+              PerfMark.linkIn(link);
+              try {
+                transportListener.messagesAvailable(messageProducer);
+              } finally {
+                PerfMark.stopTask("MigratingThreadDeframer.messageAvailable");
+              }
             }
           });
         }
@@ -126,23 +140,33 @@
     class RequestOp implements Op {
       @Override public void run(boolean isDeframerOnTransportThread) {
         if (isDeframerOnTransportThread) {
+          final Link link = PerfMark.linkOut();
           // We may not be currently on the transport thread, so jump over to it and then do the
           // necessary processing
           transportExecutor.runOnTransportThread(new Runnable() {
             @Override public void run() {
-              // Since processing continues from transport thread while this runnable was
-              // enqueued, the state may have changed since we ran runOnTransportThread. So we
-              // must make sure deframerOnTransportThread==true
-              requestFromTransportThread(numMessages);
+              PerfMark.startTask("MigratingThreadDeframer.request");
+              PerfMark.linkIn(link);
+              try {
+                // Since processing continues from transport thread while this runnable was
+                // enqueued, the state may have changed since we ran runOnTransportThread. So we
+                // must make sure deframerOnTransportThread==true
+                requestFromTransportThread(numMessages);
+              } finally {
+                PerfMark.stopTask("MigratingThreadDeframer.request");
+              }
             }
           });
           return;
         }
+        PerfMark.startTask("MigratingThreadDeframer.request");
         try {
           deframer.request(numMessages);
         } catch (Throwable t) {
           appListener.deframeFailed(t);
           deframer.close(); // unrecoverable state
+        } finally {
+          PerfMark.stopTask("MigratingThreadDeframer.request");
         }
       }
     }
@@ -166,6 +190,7 @@
         }
         if (!deframer.hasPendingDeliveries()) {
           synchronized (lock) {
+            PerfMark.event("MigratingThreadDeframer.deframerOnApplicationThread");
             migratingListener.setDelegate(appListener);
             deframerOnTransportThread = false;
           }
@@ -180,16 +205,21 @@
   public void deframe(final ReadableBuffer data) {
     class DeframeOp implements Op, Closeable {
       @Override public void run(boolean isDeframerOnTransportThread) {
-        if (isDeframerOnTransportThread) {
-          deframer.deframe(data);
-          return;
-        }
-
+        PerfMark.startTask("MigratingThreadDeframer.deframe");
         try {
-          deframer.deframe(data);
-        } catch (Throwable t) {
-          appListener.deframeFailed(t);
-          deframer.close(); // unrecoverable state
+          if (isDeframerOnTransportThread) {
+            deframer.deframe(data);
+            return;
+          }
+
+          try {
+            deframer.deframe(data);
+          } catch (Throwable t) {
+            appListener.deframeFailed(t);
+            deframer.close(); // unrecoverable state
+          }
+        } finally {
+          PerfMark.stopTask("MigratingThreadDeframer.deframe");
         }
       }
 
@@ -238,6 +268,7 @@
           op = opQueue.poll();
           if (op == null) {
             if (deframer.hasPendingDeliveries()) {
+              PerfMark.event("MigratingThreadDeframer.deframerOnTransportThread");
               migratingListener.setDelegate(transportListener);
               deframerOnTransportThread = true;
             }