core: delay sending cancel request on client-side when deadline expires (#6328)

diff --git a/core/src/main/java/io/grpc/internal/ClientCallImpl.java b/core/src/main/java/io/grpc/internal/ClientCallImpl.java
index 13acf79..372a16d 100644
--- a/core/src/main/java/io/grpc/internal/ClientCallImpl.java
+++ b/core/src/main/java/io/grpc/internal/ClientCallImpl.java
@@ -68,13 +68,19 @@
   private static final Logger log = Logger.getLogger(ClientCallImpl.class.getName());
   private static final byte[] FULL_STREAM_DECOMPRESSION_ENCODINGS
       = "gzip".getBytes(Charset.forName("US-ASCII"));
+  // When a deadline is exceeded, there is a race between the server receiving the cancellation from
+  // the client and the server cancelling the stream itself. If the client's cancellation is
+  // received first, then the stream's status will be CANCELLED instead of DEADLINE_EXCEEDED.
+  // This prevents server monitoring from noticing high rate of DEADLINE_EXCEEDED, a common
+  // monitoring metric (b/118879795). Mitigate this by delayed sending of the client's cancellation.
+  @VisibleForTesting
+  static final long DEADLINE_EXPIRATION_CANCEL_DELAY_NANOS = TimeUnit.SECONDS.toNanos(1);
 
   private final MethodDescriptor<ReqT, RespT> method;
   private final Tag tag;
   private final Executor callExecutor;
   private final CallTracer channelCallsTracer;
   private final Context context;
-  private volatile ScheduledFuture<?> deadlineCancellationFuture;
   private final boolean unaryRequest;
   private final CallOptions callOptions;
   private final boolean retryEnabled;
@@ -83,11 +89,14 @@
   private boolean cancelCalled;
   private boolean halfCloseCalled;
   private final ClientTransportProvider clientTransportProvider;
-  private final CancellationListener cancellationListener = new ContextCancellationListener();
+  private ContextCancellationListener cancellationListener;
   private final ScheduledExecutorService deadlineCancellationExecutor;
   private boolean fullStreamDecompression;
   private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance();
   private CompressorRegistry compressorRegistry = CompressorRegistry.getDefaultInstance();
+  private volatile ScheduledFuture<?> deadlineCancellationNotifyApplicationFuture;
+  private volatile ScheduledFuture<?> deadlineCancellationSendToServerFuture;
+  private boolean observerClosed = false;
 
   ClientCallImpl(
       MethodDescriptor<ReqT, RespT> method, Executor executor, CallOptions callOptions,
@@ -117,9 +126,20 @@
   }
 
   private final class ContextCancellationListener implements CancellationListener {
+    private Listener<RespT> observer;
+
+    private ContextCancellationListener(Listener<RespT> observer) {
+      this.observer = observer;
+    }
+
     @Override
     public void cancelled(Context context) {
-      stream.cancel(statusFromCancelled(context));
+      if (context.getDeadline() == null || !context.getDeadline().isExpired()) {
+        stream.cancel(statusFromCancelled(context));
+      } else {
+        Status status = statusFromCancelled(context);
+        delayedCancelOnDeadlineExceeded(status, observer);
+      }
     }
   }
 
@@ -203,18 +223,7 @@
       // Context is already cancelled so no need to create a real stream, just notify the observer
       // of cancellation via callback on the executor
       stream = NoopClientStream.INSTANCE;
-      class ClosedByContext extends ContextRunnable {
-        ClosedByContext() {
-          super(context);
-        }
-
-        @Override
-        public void runInContext() {
-          closeObserver(observer, statusFromCancelled(context), new Metadata());
-        }
-      }
-
-      callExecutor.execute(new ClosedByContext());
+      executeCloseObserverInContext(observer, statusFromCancelled(context));
       return;
     }
     final String compressorName = callOptions.getCompressor();
@@ -223,22 +232,9 @@
       compressor = compressorRegistry.lookupCompressor(compressorName);
       if (compressor == null) {
         stream = NoopClientStream.INSTANCE;
-        class ClosedByNotFoundCompressor extends ContextRunnable {
-          ClosedByNotFoundCompressor() {
-            super(context);
-          }
-
-          @Override
-          public void runInContext() {
-            closeObserver(
-                observer,
-                Status.INTERNAL.withDescription(
-                    String.format("Unable to find compressor by name %s", compressorName)),
-                new Metadata());
-          }
-        }
-
-        callExecutor.execute(new ClosedByNotFoundCompressor());
+        Status status = Status.INTERNAL.withDescription(
+            String.format("Unable to find compressor by name %s", compressorName));
+        executeCloseObserverInContext(observer, status);
         return;
       }
     } else {
@@ -287,6 +283,7 @@
     }
     stream.setDecompressorRegistry(decompressorRegistry);
     channelCallsTracer.reportCallStarted();
+    cancellationListener = new ContextCancellationListener(observer);
     stream.start(new ClientStreamListenerImpl(observer));
 
     // Delay any sources of cancellation after start(), because most of the transports are broken if
@@ -298,8 +295,11 @@
         // If the context has the effective deadline, we don't need to schedule an extra task.
         && !effectiveDeadline.equals(context.getDeadline())
         // If the channel has been terminated, we don't need to schedule an extra task.
-        && deadlineCancellationExecutor != null) {
-      deadlineCancellationFuture = startDeadlineTimer(effectiveDeadline);
+        && deadlineCancellationExecutor != null
+        // if already expired deadline let failing stream handle
+        && !(stream instanceof FailingClientStream)) {
+      deadlineCancellationNotifyApplicationFuture =
+          startDeadlineNotifyApplicationTimer(effectiveDeadline, observer);
     }
     if (cancelListenersShouldBeRemoved) {
       // Race detected! ClientStreamListener.closed may have been called before
@@ -333,46 +333,98 @@
 
   private void removeContextListenerAndCancelDeadlineFuture() {
     context.removeListener(cancellationListener);
-    ScheduledFuture<?> f = deadlineCancellationFuture;
+    ScheduledFuture<?> f = deadlineCancellationSendToServerFuture;
+    if (f != null) {
+      f.cancel(false);
+    }
+
+    f = deadlineCancellationNotifyApplicationFuture;
     if (f != null) {
       f.cancel(false);
     }
   }
 
-  private class DeadlineTimer implements Runnable {
-    private final long remainingNanos;
+  private ScheduledFuture<?> startDeadlineNotifyApplicationTimer(Deadline deadline,
+      final Listener<RespT> observer) {
+    final long remainingNanos = deadline.timeRemaining(TimeUnit.NANOSECONDS);
 
-    DeadlineTimer(long remainingNanos) {
-      this.remainingNanos = remainingNanos;
-    }
-
-    @Override
-    public void run() {
-      InsightBuilder insight = new InsightBuilder();
-      stream.appendTimeoutInsight(insight);
-      // DelayedStream.cancel() is safe to call from a thread that is different from where the
-      // stream is created.
-      long seconds = Math.abs(remainingNanos) / TimeUnit.SECONDS.toNanos(1);
-      long nanos = Math.abs(remainingNanos) % TimeUnit.SECONDS.toNanos(1);
-
-      StringBuilder buf = new StringBuilder();
-      buf.append("deadline exceeded after ");
-      if (remainingNanos < 0) {
-        buf.append('-');
+    class DeadlineExceededNotifyApplicationTimer implements Runnable {
+      @Override
+      public void run() {
+        Status status = buildDeadlineExceededStatusWithRemainingNanos(remainingNanos);
+        delayedCancelOnDeadlineExceeded(status, observer);
       }
-      buf.append(seconds);
-      buf.append(String.format(".%09d", nanos));
-      buf.append("s. ");
-      buf.append(insight);
-      stream.cancel(DEADLINE_EXCEEDED.augmentDescription(buf.toString()));
     }
+
+    return deadlineCancellationExecutor.schedule(
+        new LogExceptionRunnable(new DeadlineExceededNotifyApplicationTimer()),
+        remainingNanos,
+        TimeUnit.NANOSECONDS);
   }
 
-  private ScheduledFuture<?> startDeadlineTimer(Deadline deadline) {
-    long remainingNanos = deadline.timeRemaining(TimeUnit.NANOSECONDS);
-    return deadlineCancellationExecutor.schedule(
-        new LogExceptionRunnable(
-            new DeadlineTimer(remainingNanos)), remainingNanos, TimeUnit.NANOSECONDS);
+  private Status buildDeadlineExceededStatusWithRemainingNanos(long remainingNanos) {
+    final InsightBuilder insight = new InsightBuilder();
+    stream.appendTimeoutInsight(insight);
+
+    long seconds = Math.abs(remainingNanos) / TimeUnit.SECONDS.toNanos(1);
+    long nanos = Math.abs(remainingNanos) % TimeUnit.SECONDS.toNanos(1);
+
+    StringBuilder buf = new StringBuilder();
+    buf.append("deadline exceeded after ");
+    if (remainingNanos < 0) {
+      buf.append('-');
+    }
+    buf.append(seconds);
+    buf.append(String.format(".%09d", nanos));
+    buf.append("s. ");
+    buf.append(insight);
+
+    return DEADLINE_EXCEEDED.augmentDescription(buf.toString());
+  }
+
+  private void delayedCancelOnDeadlineExceeded(final Status status, Listener<RespT> observer) {
+    if (deadlineCancellationSendToServerFuture != null) {
+      return;
+    }
+
+    class DeadlineExceededSendCancelToServerTimer implements Runnable {
+      @Override
+      public void run() {
+        // DelayedStream.cancel() is safe to call from a thread that is different from where the
+        // stream is created.
+        stream.cancel(status);
+      }
+    }
+
+    // This races with removeContextListenerAndCancelDeadlineFuture(). Since calling cancel() on a
+    // stream multiple time is safe, the race here is fine.
+    deadlineCancellationSendToServerFuture =  deadlineCancellationExecutor.schedule(
+        new LogExceptionRunnable(new DeadlineExceededSendCancelToServerTimer()),
+        DEADLINE_EXPIRATION_CANCEL_DELAY_NANOS,
+        TimeUnit.NANOSECONDS);
+    executeCloseObserverInContext(observer, status);
+  }
+
+  private void executeCloseObserverInContext(final Listener<RespT> observer, final Status status) {
+    class CloseInContext extends ContextRunnable {
+      CloseInContext() {
+        super(context);
+      }
+
+      @Override
+      public void runInContext() {
+        closeObserver(observer, status, new Metadata());
+      }
+    }
+
+    callExecutor.execute(new CloseInContext());
+  }
+
+  private void closeObserver(Listener<RespT> observer, Status status, Metadata trailers) {
+    if (!observerClosed) {
+      observerClosed = true;
+      observer.onClose(status, trailers);
+    }
   }
 
   @Nullable
@@ -517,10 +569,6 @@
     return Attributes.EMPTY;
   }
 
-  private void closeObserver(Listener<RespT> observer, Status status, Metadata trailers) {
-    observer.onClose(status, trailers);
-  }
-
   @Override
   public String toString() {
     return MoreObjects.toStringHelper(this).add("method", method).toString();
diff --git a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java
index ffd511d..091de2c 100644
--- a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java
+++ b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java
@@ -17,6 +17,7 @@
 package io.grpc.internal;
 
 import static com.google.common.truth.Truth.assertThat;
+import static io.grpc.internal.ClientCallImpl.DEADLINE_EXPIRATION_CANCEL_DELAY_NANOS;
 import static io.grpc.internal.GrpcUtil.ACCEPT_ENCODING_SPLITTER;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -54,6 +55,7 @@
 import io.grpc.MethodDescriptor;
 import io.grpc.MethodDescriptor.MethodType;
 import io.grpc.Status;
+import io.grpc.Status.Code;
 import io.grpc.internal.ClientCallImpl.ClientTransportProvider;
 import io.grpc.internal.testing.SingleMessageProducer;
 import io.grpc.testing.TestMethodDescriptors;
@@ -131,6 +133,9 @@
   @Captor
   private ArgumentCaptor<Status> statusArgumentCaptor;
 
+  @Captor
+  private ArgumentCaptor<Metadata> metadataArgumentCaptor;
+
   private CallOptions baseCallOptions;
 
   @Before
@@ -165,7 +170,7 @@
         provider,
         deadlineCancellationExecutor,
         channelCallTracer,
-        false /* retryEnabled */);
+        /* retryEnabled= */ false);
     call.start(callListener, new Metadata());
     verify(stream).start(listenerArgumentCaptor.capture());
     final ClientStreamListener streamListener = listenerArgumentCaptor.getValue();
@@ -187,7 +192,7 @@
         provider,
         deadlineCancellationExecutor,
         channelCallTracer,
-        false /* retryEnabled */);
+        /* retryEnabled= */ false);
     call.start(callListener, new Metadata());
     verify(stream).start(listenerArgumentCaptor.capture());
     final ClientStreamListener streamListener = listenerArgumentCaptor.getValue();
@@ -225,7 +230,7 @@
         provider,
         deadlineCancellationExecutor,
         channelCallTracer,
-        false /* retryEnabled */);
+        /* retryEnabled= */ false);
     call.start(callListener, new Metadata());
     verify(stream).start(listenerArgumentCaptor.capture());
     final ClientStreamListener streamListener = listenerArgumentCaptor.getValue();
@@ -261,7 +266,7 @@
         provider,
         deadlineCancellationExecutor,
         channelCallTracer,
-        false /* retryEnabled */);
+        /* retryEnabled= */ false);
     call.start(callListener, new Metadata());
     verify(stream).start(listenerArgumentCaptor.capture());
     final ClientStreamListener streamListener = listenerArgumentCaptor.getValue();
@@ -296,7 +301,7 @@
         provider,
         deadlineCancellationExecutor,
         channelCallTracer,
-        false /* retryEnabled */)
+        /* retryEnabled= */ false)
             .setDecompressorRegistry(decompressorRegistry);
 
     call.start(callListener, new Metadata());
@@ -320,7 +325,7 @@
         provider,
         deadlineCancellationExecutor,
         channelCallTracer,
-        false /* retryEnabled */)
+        /* retryEnabled= */ false)
             .setDecompressorRegistry(decompressorRegistry);
 
     call.start(callListener, new Metadata());
@@ -337,7 +342,7 @@
         provider,
         deadlineCancellationExecutor,
         channelCallTracer,
-        false /* retryEnabled */)
+        /* retryEnabled= */ false)
         .setDecompressorRegistry(decompressorRegistry);
     final Metadata metadata = new Metadata();
 
@@ -356,7 +361,7 @@
         provider,
         deadlineCancellationExecutor,
         channelCallTracer,
-        false /* retryEnabled */)
+        /* retryEnabled= */ false)
             .setDecompressorRegistry(decompressorRegistry);
 
     call.start(callListener, new Metadata());
@@ -527,7 +532,7 @@
         provider,
         deadlineCancellationExecutor,
         channelCallTracer,
-        false /* retryEnabled */)
+        /* retryEnabled= */ false)
             .setDecompressorRegistry(decompressorRegistry);
 
     context.detach(previous);
@@ -605,7 +610,7 @@
         provider,
         deadlineCancellationExecutor,
         channelCallTracer,
-        false /* retryEnabled */)
+        /* retryEnabled= */ false)
             .setDecompressorRegistry(decompressorRegistry);
 
     cancellableContext.detach(previous);
@@ -635,7 +640,7 @@
         provider,
         deadlineCancellationExecutor,
         channelCallTracer,
-        false /* retryEnabled */)
+        /* retryEnabled= */ false)
         .setDecompressorRegistry(decompressorRegistry);
 
     cancellableContext.detach(previous);
@@ -680,7 +685,7 @@
         provider,
         deadlineCancellationExecutor,
         channelCallTracer,
-        false /* retryEnabled */)
+        /* retryEnabled= */ false)
             .setDecompressorRegistry(decompressorRegistry);
     call.start(callListener, new Metadata());
     verify(transport, times(0))
@@ -705,7 +710,7 @@
         provider,
         deadlineCancellationExecutor,
         channelCallTracer,
-        false /* retryEnabled */);
+        /* retryEnabled= */ false);
     call.start(callListener, new Metadata());
 
     context.detach(origContext);
@@ -730,7 +735,7 @@
         provider,
         deadlineCancellationExecutor,
         channelCallTracer,
-        false /* retryEnabled */);
+        /* retryEnabled= */ false);
     call.start(callListener, new Metadata());
 
     context.detach(origContext);
@@ -755,7 +760,7 @@
         provider,
         deadlineCancellationExecutor,
         channelCallTracer,
-        false /* retryEnabled */);
+        /* retryEnabled= */ false);
     call.start(callListener, new Metadata());
 
     context.detach(origContext);
@@ -776,7 +781,7 @@
         provider,
         deadlineCancellationExecutor,
         channelCallTracer,
-        false /* retryEnabled */);
+        /* retryEnabled= */ false);
     call.start(callListener, new Metadata());
 
     ArgumentCaptor<Deadline> deadlineCaptor = ArgumentCaptor.forClass(Deadline.class);
@@ -794,7 +799,7 @@
         provider,
         deadlineCancellationExecutor,
         channelCallTracer,
-        false /* retryEnabled */);
+        /* retryEnabled= */ false);
     call.start(callListener, new Metadata());
 
     verify(stream, never()).setDeadline(any(Deadline.class));
@@ -808,17 +813,30 @@
     ClientCallImpl<Void, Void> call = new ClientCallImpl<>(
         method,
         MoreExecutors.directExecutor(),
-        baseCallOptions.withDeadline(Deadline.after(1, TimeUnit.SECONDS)),
+        baseCallOptions.withDeadline(
+            Deadline.after(1, TimeUnit.SECONDS, fakeClock.getDeadlineTicker())),
         provider,
         deadlineCancellationExecutor,
         channelCallTracer,
-        false /* retryEnabled */);
+        /* retryEnabled= */ false);
 
     call.start(callListener, new Metadata());
 
-    fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(1) + 1);
+    fakeClock.forwardTime(1000, TimeUnit.MILLISECONDS);
 
-    verify(stream, times(1)).cancel(statusCaptor.capture());
+    // Verify cancel sent to application when deadline just past
+    verify(callListener).onClose(statusCaptor.capture(), metadataArgumentCaptor.capture());
+    assertThat(statusCaptor.getValue().getDescription())
+        .matches("deadline exceeded after [0-9]+\\.[0-9]+s. \\[remote_addr=127\\.0\\.0\\.1:443\\]");
+    assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.DEADLINE_EXCEEDED);
+    verify(stream, never()).cancel(statusCaptor.capture());
+
+    fakeClock.forwardNanos(DEADLINE_EXPIRATION_CANCEL_DELAY_NANOS - 1);
+    verify(stream, never()).cancel(any(Status.class));
+
+    // verify cancel send to server is delayed with DEADLINE_EXPIRATION_CANCEL_DELAY
+    fakeClock.forwardNanos(1);
+    verify(stream).cancel(statusCaptor.capture());
     assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode());
     assertThat(statusCaptor.getValue().getDescription())
         .matches("deadline exceeded after [0-9]+\\.[0-9]+s. \\[remote_addr=127\\.0\\.0\\.1:443\\]");
@@ -828,8 +846,8 @@
   public void expiredDeadlineCancelsStream_Context() {
     fakeClock.forwardTime(System.nanoTime(), TimeUnit.NANOSECONDS);
 
-    Context context = Context.current()
-        .withDeadlineAfter(1, TimeUnit.SECONDS, deadlineCancellationExecutor);
+    Deadline deadline = Deadline.after(1, TimeUnit.SECONDS, fakeClock.getDeadlineTicker());
+    Context context = Context.current().withDeadline(deadline, deadlineCancellationExecutor);
     Context origContext = context.attach();
 
     ClientCallImpl<Void, Void> call = new ClientCallImpl<>(
@@ -839,15 +857,22 @@
         provider,
         deadlineCancellationExecutor,
         channelCallTracer,
-        false /* retryEnabled */);
+        /* retryEnabled= */ false);
 
     context.detach(origContext);
 
     call.start(callListener, new Metadata());
 
-    fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(1) + 1);
+    fakeClock.forwardTime(1000, TimeUnit.MILLISECONDS);
+    verify(stream, never()).cancel(statusCaptor.capture());
+    // verify app is notified.
+    verify(callListener).onClose(statusCaptor.capture(), metadataArgumentCaptor.capture());
+    assertThat(statusCaptor.getValue().getDescription()).contains("context timed out");
+    assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.DEADLINE_EXCEEDED);
 
-    verify(stream, times(1)).cancel(statusCaptor.capture());
+    // verify cancel send to server is delayed with DEADLINE_EXPIRATION_CANCEL_DELAY
+    fakeClock.forwardNanos(DEADLINE_EXPIRATION_CANCEL_DELAY_NANOS);
+    verify(stream).cancel(statusCaptor.capture());
     assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode());
     assertThat(statusCaptor.getValue().getDescription()).isEqualTo("context timed out");
   }
@@ -863,7 +888,7 @@
         provider,
         deadlineCancellationExecutor,
         channelCallTracer,
-        false /* retryEnabled */);
+        /* retryEnabled= */ false);
     call.start(callListener, new Metadata());
     call.cancel("canceled", null);
 
@@ -888,7 +913,7 @@
         provider,
         deadlineCancellationExecutor,
         channelCallTracer,
-        false /* retryEnabled */);
+        /* retryEnabled= */ false);
 
     Metadata headers = new Metadata();
 
@@ -906,7 +931,7 @@
         provider,
         deadlineCancellationExecutor,
         channelCallTracer,
-        false /* retryEnabled */);
+        /* retryEnabled= */ false);
     final Exception cause = new Exception();
     ClientCall.Listener<Void> callListener =
         new ClientCall.Listener<Void>() {
@@ -944,7 +969,7 @@
         provider,
         deadlineCancellationExecutor,
         channelCallTracer,
-        false /* retryEnabled */)
+        /* retryEnabled= */ false)
             .setDecompressorRegistry(decompressorRegistry);
 
     call.start(callListener, new Metadata());
@@ -957,7 +982,7 @@
   public void getAttributes() {
     ClientCallImpl<Void, Void> call = new ClientCallImpl<>(
         method, MoreExecutors.directExecutor(), baseCallOptions, provider,
-        deadlineCancellationExecutor, channelCallTracer, false /* retryEnabled */);
+        deadlineCancellationExecutor, channelCallTracer, /* retryEnabled= */ false);
     Attributes attrs =
         Attributes.newBuilder().set(Key.<String>create("fake key"), "fake value").build();
     when(stream.getAttributes()).thenReturn(attrs);
diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
index 8dc4a51..a278ae5 100644
--- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
+++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
@@ -3477,7 +3477,7 @@
         CallOptions.DEFAULT.withDeadlineAfter(5, TimeUnit.SECONDS));
     ListenableFuture<Void> future2 = ClientCalls.futureUnaryCall(call2, null);
 
-    timer.forwardTime(1234, TimeUnit.SECONDS);
+    timer.forwardTime(5, TimeUnit.SECONDS);
 
     executor.runDueTasks();
     try {
@@ -3488,6 +3488,9 @@
     }
 
     mychannel.shutdownNow();
+    // Now for Deadline_exceeded, stream shutdown is delayed, calling shutdownNow() on a open stream
+    // will add a task to executor. Cleaning that task here.
+    executor.runDueTasks();
   }
 
   @Deprecated