core: add PerfMark tasks/events for MigratingThreadDeframer (#7146)
diff --git a/core/src/main/java/io/grpc/internal/AbstractStream.java b/core/src/main/java/io/grpc/internal/AbstractStream.java
index 4ae7214..19bdbbc 100644
--- a/core/src/main/java/io/grpc/internal/AbstractStream.java
+++ b/core/src/main/java/io/grpc/internal/AbstractStream.java
@@ -23,6 +23,8 @@
import io.grpc.Codec;
import io.grpc.Compressor;
import io.grpc.Decompressor;
+import io.perfmark.Link;
+import io.perfmark.PerfMark;
import java.io.InputStream;
import javax.annotation.concurrent.GuardedBy;
@@ -219,15 +221,25 @@
*/
private void requestMessagesFromDeframer(final int numMessages) {
if (deframer instanceof ThreadOptimizedDeframer) {
- deframer.request(numMessages);
+ PerfMark.startTask("AbstractStream.request");
+ try {
+ deframer.request(numMessages);
+ } finally {
+ PerfMark.stopTask("AbstractStream.request");
+ }
return;
}
+ final Link link = PerfMark.linkOut();
class RequestRunnable implements Runnable {
@Override public void run() {
+ PerfMark.startTask("AbstractStream.request");
+ PerfMark.linkIn(link);
try {
deframer.request(numMessages);
} catch (Throwable t) {
deframeFailed(t);
+ } finally {
+ PerfMark.stopTask("AbstractStream.request");
}
}
}
diff --git a/core/src/main/java/io/grpc/internal/MigratingThreadDeframer.java b/core/src/main/java/io/grpc/internal/MigratingThreadDeframer.java
index 90678d2..f820e7f 100644
--- a/core/src/main/java/io/grpc/internal/MigratingThreadDeframer.java
+++ b/core/src/main/java/io/grpc/internal/MigratingThreadDeframer.java
@@ -19,6 +19,8 @@
import static com.google.common.base.Preconditions.checkNotNull;
import io.grpc.Decompressor;
+import io.perfmark.Link;
+import io.perfmark.PerfMark;
import java.io.Closeable;
import java.io.InputStream;
import java.util.ArrayDeque;
@@ -105,13 +107,25 @@
} else {
if (!alreadyEnqueued) {
if (currentThreadIsTransportThread) {
- transportListener.messagesAvailable(messageProducer);
+ PerfMark.startTask("MigratingThreadDeframer.messageAvailable");
+ try {
+ transportListener.messagesAvailable(messageProducer);
+ } finally {
+ PerfMark.stopTask("MigratingThreadDeframer.messageAvailable");
+ }
} else {
+ final Link link = PerfMark.linkOut();
// SLOW path. This is the "normal" thread-hopping approach for request() when _not_ using
// MigratingThreadDeframer
transportExecutor.runOnTransportThread(new Runnable() {
@Override public void run() {
- transportListener.messagesAvailable(messageProducer);
+ PerfMark.startTask("MigratingThreadDeframer.messageAvailable");
+ PerfMark.linkIn(link);
+ try {
+ transportListener.messagesAvailable(messageProducer);
+ } finally {
+ PerfMark.stopTask("MigratingThreadDeframer.messageAvailable");
+ }
}
});
}
@@ -126,23 +140,33 @@
class RequestOp implements Op {
@Override public void run(boolean isDeframerOnTransportThread) {
if (isDeframerOnTransportThread) {
+ final Link link = PerfMark.linkOut();
// We may not be currently on the transport thread, so jump over to it and then do the
// necessary processing
transportExecutor.runOnTransportThread(new Runnable() {
@Override public void run() {
- // Since processing continues from transport thread while this runnable was
- // enqueued, the state may have changed since we ran runOnTransportThread. So we
- // must make sure deframerOnTransportThread==true
- requestFromTransportThread(numMessages);
+ PerfMark.startTask("MigratingThreadDeframer.request");
+ PerfMark.linkIn(link);
+ try {
+ // Since processing continues from transport thread while this runnable was
+ // enqueued, the state may have changed since we ran runOnTransportThread. So we
+ // must make sure deframerOnTransportThread==true
+ requestFromTransportThread(numMessages);
+ } finally {
+ PerfMark.stopTask("MigratingThreadDeframer.request");
+ }
}
});
return;
}
+ PerfMark.startTask("MigratingThreadDeframer.request");
try {
deframer.request(numMessages);
} catch (Throwable t) {
appListener.deframeFailed(t);
deframer.close(); // unrecoverable state
+ } finally {
+ PerfMark.stopTask("MigratingThreadDeframer.request");
}
}
}
@@ -166,6 +190,7 @@
}
if (!deframer.hasPendingDeliveries()) {
synchronized (lock) {
+ PerfMark.event("MigratingThreadDeframer.deframerOnApplicationThread");
migratingListener.setDelegate(appListener);
deframerOnTransportThread = false;
}
@@ -180,16 +205,21 @@
public void deframe(final ReadableBuffer data) {
class DeframeOp implements Op, Closeable {
@Override public void run(boolean isDeframerOnTransportThread) {
- if (isDeframerOnTransportThread) {
- deframer.deframe(data);
- return;
- }
-
+ PerfMark.startTask("MigratingThreadDeframer.deframe");
try {
- deframer.deframe(data);
- } catch (Throwable t) {
- appListener.deframeFailed(t);
- deframer.close(); // unrecoverable state
+ if (isDeframerOnTransportThread) {
+ deframer.deframe(data);
+ return;
+ }
+
+ try {
+ deframer.deframe(data);
+ } catch (Throwable t) {
+ appListener.deframeFailed(t);
+ deframer.close(); // unrecoverable state
+ }
+ } finally {
+ PerfMark.stopTask("MigratingThreadDeframer.deframe");
}
}
@@ -238,6 +268,7 @@
op = opQueue.poll();
if (op == null) {
if (deframer.hasPendingDeliveries()) {
+ PerfMark.event("MigratingThreadDeframer.deframerOnTransportThread");
migratingListener.setDelegate(transportListener);
deframerOnTransportThread = true;
}