core: delay sending cancel request on client-side when deadline expires (#6328)
diff --git a/core/src/main/java/io/grpc/internal/ClientCallImpl.java b/core/src/main/java/io/grpc/internal/ClientCallImpl.java
index 13acf79..372a16d 100644
--- a/core/src/main/java/io/grpc/internal/ClientCallImpl.java
+++ b/core/src/main/java/io/grpc/internal/ClientCallImpl.java
@@ -68,13 +68,19 @@
private static final Logger log = Logger.getLogger(ClientCallImpl.class.getName());
private static final byte[] FULL_STREAM_DECOMPRESSION_ENCODINGS
= "gzip".getBytes(Charset.forName("US-ASCII"));
+ // When a deadline is exceeded, there is a race between the server receiving the cancellation from
+ // the client and the server cancelling the stream itself. If the client's cancellation is
+ // received first, then the stream's status will be CANCELLED instead of DEADLINE_EXCEEDED.
+ // This prevents server monitoring from noticing a high rate of DEADLINE_EXCEEDED, a common
+ // monitoring metric (b/118879795). Mitigate this by delaying the client's cancellation.
+ @VisibleForTesting
+ static final long DEADLINE_EXPIRATION_CANCEL_DELAY_NANOS = TimeUnit.SECONDS.toNanos(1);
private final MethodDescriptor<ReqT, RespT> method;
private final Tag tag;
private final Executor callExecutor;
private final CallTracer channelCallsTracer;
private final Context context;
- private volatile ScheduledFuture<?> deadlineCancellationFuture;
private final boolean unaryRequest;
private final CallOptions callOptions;
private final boolean retryEnabled;
@@ -83,11 +89,14 @@
private boolean cancelCalled;
private boolean halfCloseCalled;
private final ClientTransportProvider clientTransportProvider;
- private final CancellationListener cancellationListener = new ContextCancellationListener();
+ private ContextCancellationListener cancellationListener;
private final ScheduledExecutorService deadlineCancellationExecutor;
private boolean fullStreamDecompression;
private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance();
private CompressorRegistry compressorRegistry = CompressorRegistry.getDefaultInstance();
+ private volatile ScheduledFuture<?> deadlineCancellationNotifyApplicationFuture;
+ private volatile ScheduledFuture<?> deadlineCancellationSendToServerFuture;
+ private boolean observerClosed = false;
ClientCallImpl(
MethodDescriptor<ReqT, RespT> method, Executor executor, CallOptions callOptions,
@@ -117,9 +126,20 @@
}
+ /**
+ * Context listener that cancels the stream when the call's context is cancelled. A cancellation
+ * caused by an expired context deadline is routed through delayedCancelOnDeadlineExceeded();
+ * every other cancellation is forwarded to the stream immediately.
+ */
private final class ContextCancellationListener implements CancellationListener {
+ // Application listener passed to delayedCancelOnDeadlineExceeded() on deadline expiry.
+ private Listener<RespT> observer;
+
+ private ContextCancellationListener(Listener<RespT> observer) {
+ this.observer = observer;
+ }
+
@Override
public void cancelled(Context context) {
- stream.cancel(statusFromCancelled(context));
+ // No deadline, or a deadline that has not expired yet: this is not a timeout, so cancel the
+ // stream right away.
+ if (context.getDeadline() == null || !context.getDeadline().isExpired()) {
+ stream.cancel(statusFromCancelled(context));
+ } else {
+ // Expired context deadline: take the delayed-cancel path.
+ Status status = statusFromCancelled(context);
+ delayedCancelOnDeadlineExceeded(status, observer);
+ }
}
}
@@ -203,18 +223,7 @@
// Context is already cancelled so no need to create a real stream, just notify the observer
// of cancellation via callback on the executor
stream = NoopClientStream.INSTANCE;
- class ClosedByContext extends ContextRunnable {
- ClosedByContext() {
- super(context);
- }
-
- @Override
- public void runInContext() {
- closeObserver(observer, statusFromCancelled(context), new Metadata());
- }
- }
-
- callExecutor.execute(new ClosedByContext());
+ executeCloseObserverInContext(observer, statusFromCancelled(context));
return;
}
final String compressorName = callOptions.getCompressor();
@@ -223,22 +232,9 @@
compressor = compressorRegistry.lookupCompressor(compressorName);
if (compressor == null) {
stream = NoopClientStream.INSTANCE;
- class ClosedByNotFoundCompressor extends ContextRunnable {
- ClosedByNotFoundCompressor() {
- super(context);
- }
-
- @Override
- public void runInContext() {
- closeObserver(
- observer,
- Status.INTERNAL.withDescription(
- String.format("Unable to find compressor by name %s", compressorName)),
- new Metadata());
- }
- }
-
- callExecutor.execute(new ClosedByNotFoundCompressor());
+ Status status = Status.INTERNAL.withDescription(
+ String.format("Unable to find compressor by name %s", compressorName));
+ executeCloseObserverInContext(observer, status);
return;
}
} else {
@@ -287,6 +283,7 @@
}
stream.setDecompressorRegistry(decompressorRegistry);
channelCallsTracer.reportCallStarted();
+ cancellationListener = new ContextCancellationListener(observer);
stream.start(new ClientStreamListenerImpl(observer));
// Delay any sources of cancellation after start(), because most of the transports are broken if
@@ -298,8 +295,11 @@
// If the context has the effective deadline, we don't need to schedule an extra task.
&& !effectiveDeadline.equals(context.getDeadline())
// If the channel has been terminated, we don't need to schedule an extra task.
- && deadlineCancellationExecutor != null) {
- deadlineCancellationFuture = startDeadlineTimer(effectiveDeadline);
+ && deadlineCancellationExecutor != null
+ // If the deadline has already expired, let the failing stream handle it.
+ && !(stream instanceof FailingClientStream)) {
+ deadlineCancellationNotifyApplicationFuture =
+ startDeadlineNotifyApplicationTimer(effectiveDeadline, observer);
}
if (cancelListenersShouldBeRemoved) {
// Race detected! ClientStreamListener.closed may have been called before
@@ -333,46 +333,98 @@
+ /**
+ * Removes the context cancellation listener and cancels both pending deadline tasks: the
+ * delayed send-cancel-to-server task and the notify-application timer.
+ */
private void removeContextListenerAndCancelDeadlineFuture() {
+ // NOTE(review): cancellationListener is no longer final and is only assigned just before
+ // stream.start(); confirm this method is never reached while it is still null.
context.removeListener(cancellationListener);
- ScheduledFuture<?> f = deadlineCancellationFuture;
+ // Copy each volatile field to a local so the null check and cancel() use the same reference.
+ ScheduledFuture<?> f = deadlineCancellationSendToServerFuture;
+ if (f != null) {
+ f.cancel(false);
+ }
+
+ f = deadlineCancellationNotifyApplicationFuture;
if (f != null) {
f.cancel(false);
}
}
- private class DeadlineTimer implements Runnable {
- private final long remainingNanos;
+ /**
+ * Schedules a task on deadlineCancellationExecutor that runs once the time remaining until
+ * {@code deadline} has elapsed. The task builds a DEADLINE_EXCEEDED status and hands it, together
+ * with {@code observer}, to delayedCancelOnDeadlineExceeded().
+ *
+ * @param deadline the deadline whose remaining time is used as the scheduling delay
+ * @param observer the application listener passed on to delayedCancelOnDeadlineExceeded()
+ * @return the future of the scheduled task
+ */
+ private ScheduledFuture<?> startDeadlineNotifyApplicationTimer(Deadline deadline,
+ final Listener<RespT> observer) {
+ // Captured once: used both as the scheduling delay and in the status description.
+ final long remainingNanos = deadline.timeRemaining(TimeUnit.NANOSECONDS);
- DeadlineTimer(long remainingNanos) {
- this.remainingNanos = remainingNanos;
- }
-
- @Override
- public void run() {
- InsightBuilder insight = new InsightBuilder();
- stream.appendTimeoutInsight(insight);
- // DelayedStream.cancel() is safe to call from a thread that is different from where the
- // stream is created.
- long seconds = Math.abs(remainingNanos) / TimeUnit.SECONDS.toNanos(1);
- long nanos = Math.abs(remainingNanos) % TimeUnit.SECONDS.toNanos(1);
-
- StringBuilder buf = new StringBuilder();
- buf.append("deadline exceeded after ");
- if (remainingNanos < 0) {
- buf.append('-');
+ class DeadlineExceededNotifyApplicationTimer implements Runnable {
+ @Override
+ public void run() {
+ Status status = buildDeadlineExceededStatusWithRemainingNanos(remainingNanos);
+ delayedCancelOnDeadlineExceeded(status, observer);
}
- buf.append(seconds);
- buf.append(String.format(".%09d", nanos));
- buf.append("s. ");
- buf.append(insight);
- stream.cancel(DEADLINE_EXCEEDED.augmentDescription(buf.toString()));
}
+
+ return deadlineCancellationExecutor.schedule(
+ new LogExceptionRunnable(new DeadlineExceededNotifyApplicationTimer()),
+ remainingNanos,
+ TimeUnit.NANOSECONDS);
}
- private ScheduledFuture<?> startDeadlineTimer(Deadline deadline) {
- long remainingNanos = deadline.timeRemaining(TimeUnit.NANOSECONDS);
- return deadlineCancellationExecutor.schedule(
- new LogExceptionRunnable(
- new DeadlineTimer(remainingNanos)), remainingNanos, TimeUnit.NANOSECONDS);
+ /**
+ * Builds the DEADLINE_EXCEEDED status used when the deadline timer fires. The description reads
+ * "deadline exceeded after [-]S.NNNNNNNNNs. " followed by the stream's timeout insight, where the
+ * duration is {@code remainingNanos} split into whole seconds and a nine-digit nanosecond part.
+ *
+ * @param remainingNanos the time that was remaining on the deadline, in nanoseconds; a negative
+ *     value is rendered with a leading '-'
+ * @return DEADLINE_EXCEEDED augmented with the formatted description
+ */
+ private Status buildDeadlineExceededStatusWithRemainingNanos(long remainingNanos) {
+ final long nanosPerSecond = TimeUnit.SECONDS.toNanos(1);
+ final long magnitude = Math.abs(remainingNanos);
+ final String sign = remainingNanos < 0 ? "-" : "";
+
+ final InsightBuilder timeoutInsight = new InsightBuilder();
+ stream.appendTimeoutInsight(timeoutInsight);
+
+ String description = new StringBuilder("deadline exceeded after ")
+ .append(sign)
+ .append(magnitude / nanosPerSecond)
+ .append(String.format(".%09d", magnitude % nanosPerSecond))
+ .append("s. ")
+ .append(timeoutInsight)
+ .toString();
+
+ return DEADLINE_EXCEEDED.augmentDescription(description);
+ }
+
+ /**
+ * Handles an exceeded deadline: schedules the stream cancellation to run
+ * {@link #DEADLINE_EXPIRATION_CANCEL_DELAY_NANOS} from now and schedules {@code observer} to be
+ * closed with {@code status} on the call executor.
+ *
+ * <p>Does nothing if the delayed cancellation has already been scheduled. If no scheduling
+ * executor is available, the stream is cancelled immediately instead.
+ *
+ * @param status the status used to cancel the stream and to close {@code observer}
+ * @param observer the application listener to close
+ */
+ private void delayedCancelOnDeadlineExceeded(final Status status, Listener<RespT> observer) {
+ if (deadlineCancellationSendToServerFuture != null) {
+ return;
+ }
+
+ if (deadlineCancellationExecutor == null) {
+ // The deadline-timer setup null-checks this executor (terminated channel), so it can also be
+ // null when the context deadline fires. The cancel cannot be delayed without a scheduler, so
+ // send it right away, mirroring the non-deadline context-cancellation path.
+ stream.cancel(status);
+ return;
+ }
+
+ class DeadlineExceededSendCancelToServerTimer implements Runnable {
+ @Override
+ public void run() {
+ // DelayedStream.cancel() is safe to call from a thread that is different from where the
+ // stream is created.
+ stream.cancel(status);
+ }
+ }
+
+ // This races with removeContextListenerAndCancelDeadlineFuture(). Since calling cancel() on a
+ // stream multiple times is safe, the race here is fine.
+ deadlineCancellationSendToServerFuture = deadlineCancellationExecutor.schedule(
+ new LogExceptionRunnable(new DeadlineExceededSendCancelToServerTimer()),
+ DEADLINE_EXPIRATION_CANCEL_DELAY_NANOS,
+ TimeUnit.NANOSECONDS);
+ executeCloseObserverInContext(observer, status);
+ }
+
+ /**
+ * Submits a task to the call executor that closes {@code observer} with {@code status} and
+ * empty trailers. The task is a ContextRunnable constructed with this call's context.
+ *
+ * @param observer the application listener to close
+ * @param status the status passed to closeObserver()
+ */
+ private void executeCloseObserverInContext(final Listener<RespT> observer, final Status status) {
+ class CloseInContext extends ContextRunnable {
+ CloseInContext() {
+ super(context);
+ }
+
+ @Override
+ public void runInContext() {
+ closeObserver(observer, status, new Metadata());
+ }
+ }
+
+ callExecutor.execute(new CloseInContext());
+ }
+
+ /**
+ * Delivers onClose() to {@code observer} at most once per call; once observerClosed is set,
+ * further invocations return without notifying the observer.
+ */
+ private void closeObserver(Listener<RespT> observer, Status status, Metadata trailers) {
+ if (observerClosed) {
+ return;
+ }
+ observerClosed = true;
+ observer.onClose(status, trailers);
}
@Nullable
@@ -517,10 +569,6 @@
return Attributes.EMPTY;
}
- private void closeObserver(Listener<RespT> observer, Status status, Metadata trailers) {
- observer.onClose(status, trailers);
- }
-
@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("method", method).toString();
diff --git a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java
index ffd511d..091de2c 100644
--- a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java
+++ b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java
@@ -17,6 +17,7 @@
package io.grpc.internal;
import static com.google.common.truth.Truth.assertThat;
+import static io.grpc.internal.ClientCallImpl.DEADLINE_EXPIRATION_CANCEL_DELAY_NANOS;
import static io.grpc.internal.GrpcUtil.ACCEPT_ENCODING_SPLITTER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -54,6 +55,7 @@
import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType;
import io.grpc.Status;
+import io.grpc.Status.Code;
import io.grpc.internal.ClientCallImpl.ClientTransportProvider;
import io.grpc.internal.testing.SingleMessageProducer;
import io.grpc.testing.TestMethodDescriptors;
@@ -131,6 +133,9 @@
@Captor
private ArgumentCaptor<Status> statusArgumentCaptor;
+ @Captor
+ private ArgumentCaptor<Metadata> metadataArgumentCaptor;
+
private CallOptions baseCallOptions;
@Before
@@ -165,7 +170,7 @@
provider,
deadlineCancellationExecutor,
channelCallTracer,
- false /* retryEnabled */);
+ /* retryEnabled= */ false);
call.start(callListener, new Metadata());
verify(stream).start(listenerArgumentCaptor.capture());
final ClientStreamListener streamListener = listenerArgumentCaptor.getValue();
@@ -187,7 +192,7 @@
provider,
deadlineCancellationExecutor,
channelCallTracer,
- false /* retryEnabled */);
+ /* retryEnabled= */ false);
call.start(callListener, new Metadata());
verify(stream).start(listenerArgumentCaptor.capture());
final ClientStreamListener streamListener = listenerArgumentCaptor.getValue();
@@ -225,7 +230,7 @@
provider,
deadlineCancellationExecutor,
channelCallTracer,
- false /* retryEnabled */);
+ /* retryEnabled= */ false);
call.start(callListener, new Metadata());
verify(stream).start(listenerArgumentCaptor.capture());
final ClientStreamListener streamListener = listenerArgumentCaptor.getValue();
@@ -261,7 +266,7 @@
provider,
deadlineCancellationExecutor,
channelCallTracer,
- false /* retryEnabled */);
+ /* retryEnabled= */ false);
call.start(callListener, new Metadata());
verify(stream).start(listenerArgumentCaptor.capture());
final ClientStreamListener streamListener = listenerArgumentCaptor.getValue();
@@ -296,7 +301,7 @@
provider,
deadlineCancellationExecutor,
channelCallTracer,
- false /* retryEnabled */)
+ /* retryEnabled= */ false)
.setDecompressorRegistry(decompressorRegistry);
call.start(callListener, new Metadata());
@@ -320,7 +325,7 @@
provider,
deadlineCancellationExecutor,
channelCallTracer,
- false /* retryEnabled */)
+ /* retryEnabled= */ false)
.setDecompressorRegistry(decompressorRegistry);
call.start(callListener, new Metadata());
@@ -337,7 +342,7 @@
provider,
deadlineCancellationExecutor,
channelCallTracer,
- false /* retryEnabled */)
+ /* retryEnabled= */ false)
.setDecompressorRegistry(decompressorRegistry);
final Metadata metadata = new Metadata();
@@ -356,7 +361,7 @@
provider,
deadlineCancellationExecutor,
channelCallTracer,
- false /* retryEnabled */)
+ /* retryEnabled= */ false)
.setDecompressorRegistry(decompressorRegistry);
call.start(callListener, new Metadata());
@@ -527,7 +532,7 @@
provider,
deadlineCancellationExecutor,
channelCallTracer,
- false /* retryEnabled */)
+ /* retryEnabled= */ false)
.setDecompressorRegistry(decompressorRegistry);
context.detach(previous);
@@ -605,7 +610,7 @@
provider,
deadlineCancellationExecutor,
channelCallTracer,
- false /* retryEnabled */)
+ /* retryEnabled= */ false)
.setDecompressorRegistry(decompressorRegistry);
cancellableContext.detach(previous);
@@ -635,7 +640,7 @@
provider,
deadlineCancellationExecutor,
channelCallTracer,
- false /* retryEnabled */)
+ /* retryEnabled= */ false)
.setDecompressorRegistry(decompressorRegistry);
cancellableContext.detach(previous);
@@ -680,7 +685,7 @@
provider,
deadlineCancellationExecutor,
channelCallTracer,
- false /* retryEnabled */)
+ /* retryEnabled= */ false)
.setDecompressorRegistry(decompressorRegistry);
call.start(callListener, new Metadata());
verify(transport, times(0))
@@ -705,7 +710,7 @@
provider,
deadlineCancellationExecutor,
channelCallTracer,
- false /* retryEnabled */);
+ /* retryEnabled= */ false);
call.start(callListener, new Metadata());
context.detach(origContext);
@@ -730,7 +735,7 @@
provider,
deadlineCancellationExecutor,
channelCallTracer,
- false /* retryEnabled */);
+ /* retryEnabled= */ false);
call.start(callListener, new Metadata());
context.detach(origContext);
@@ -755,7 +760,7 @@
provider,
deadlineCancellationExecutor,
channelCallTracer,
- false /* retryEnabled */);
+ /* retryEnabled= */ false);
call.start(callListener, new Metadata());
context.detach(origContext);
@@ -776,7 +781,7 @@
provider,
deadlineCancellationExecutor,
channelCallTracer,
- false /* retryEnabled */);
+ /* retryEnabled= */ false);
call.start(callListener, new Metadata());
ArgumentCaptor<Deadline> deadlineCaptor = ArgumentCaptor.forClass(Deadline.class);
@@ -794,7 +799,7 @@
provider,
deadlineCancellationExecutor,
channelCallTracer,
- false /* retryEnabled */);
+ /* retryEnabled= */ false);
call.start(callListener, new Metadata());
verify(stream, never()).setDeadline(any(Deadline.class));
@@ -808,17 +813,30 @@
ClientCallImpl<Void, Void> call = new ClientCallImpl<>(
method,
MoreExecutors.directExecutor(),
- baseCallOptions.withDeadline(Deadline.after(1, TimeUnit.SECONDS)),
+ baseCallOptions.withDeadline(
+ Deadline.after(1, TimeUnit.SECONDS, fakeClock.getDeadlineTicker())),
provider,
deadlineCancellationExecutor,
channelCallTracer,
- false /* retryEnabled */);
+ /* retryEnabled= */ false);
call.start(callListener, new Metadata());
- fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(1) + 1);
+ fakeClock.forwardTime(1000, TimeUnit.MILLISECONDS);
- verify(stream, times(1)).cancel(statusCaptor.capture());
+ // Verify that the application is notified as soon as the deadline has passed.
+ verify(callListener).onClose(statusCaptor.capture(), metadataArgumentCaptor.capture());
+ assertThat(statusCaptor.getValue().getDescription())
+ .matches("deadline exceeded after [0-9]+\\.[0-9]+s. \\[remote_addr=127\\.0\\.0\\.1:443\\]");
+ assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.DEADLINE_EXCEEDED);
+ verify(stream, never()).cancel(statusCaptor.capture());
+
+ fakeClock.forwardNanos(DEADLINE_EXPIRATION_CANCEL_DELAY_NANOS - 1);
+ verify(stream, never()).cancel(any(Status.class));
+
+ // Verify the cancel sent to the server is delayed by DEADLINE_EXPIRATION_CANCEL_DELAY_NANOS.
+ fakeClock.forwardNanos(1);
+ verify(stream).cancel(statusCaptor.capture());
assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode());
assertThat(statusCaptor.getValue().getDescription())
.matches("deadline exceeded after [0-9]+\\.[0-9]+s. \\[remote_addr=127\\.0\\.0\\.1:443\\]");
@@ -828,8 +846,8 @@
public void expiredDeadlineCancelsStream_Context() {
fakeClock.forwardTime(System.nanoTime(), TimeUnit.NANOSECONDS);
- Context context = Context.current()
- .withDeadlineAfter(1, TimeUnit.SECONDS, deadlineCancellationExecutor);
+ Deadline deadline = Deadline.after(1, TimeUnit.SECONDS, fakeClock.getDeadlineTicker());
+ Context context = Context.current().withDeadline(deadline, deadlineCancellationExecutor);
Context origContext = context.attach();
ClientCallImpl<Void, Void> call = new ClientCallImpl<>(
@@ -839,15 +857,22 @@
provider,
deadlineCancellationExecutor,
channelCallTracer,
- false /* retryEnabled */);
+ /* retryEnabled= */ false);
context.detach(origContext);
call.start(callListener, new Metadata());
- fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(1) + 1);
+ fakeClock.forwardTime(1000, TimeUnit.MILLISECONDS);
+ verify(stream, never()).cancel(statusCaptor.capture());
+ // verify app is notified.
+ verify(callListener).onClose(statusCaptor.capture(), metadataArgumentCaptor.capture());
+ assertThat(statusCaptor.getValue().getDescription()).contains("context timed out");
+ assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.DEADLINE_EXCEEDED);
- verify(stream, times(1)).cancel(statusCaptor.capture());
+ // Verify the cancel sent to the server is delayed by DEADLINE_EXPIRATION_CANCEL_DELAY_NANOS.
+ fakeClock.forwardNanos(DEADLINE_EXPIRATION_CANCEL_DELAY_NANOS);
+ verify(stream).cancel(statusCaptor.capture());
assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode());
assertThat(statusCaptor.getValue().getDescription()).isEqualTo("context timed out");
}
@@ -863,7 +888,7 @@
provider,
deadlineCancellationExecutor,
channelCallTracer,
- false /* retryEnabled */);
+ /* retryEnabled= */ false);
call.start(callListener, new Metadata());
call.cancel("canceled", null);
@@ -888,7 +913,7 @@
provider,
deadlineCancellationExecutor,
channelCallTracer,
- false /* retryEnabled */);
+ /* retryEnabled= */ false);
Metadata headers = new Metadata();
@@ -906,7 +931,7 @@
provider,
deadlineCancellationExecutor,
channelCallTracer,
- false /* retryEnabled */);
+ /* retryEnabled= */ false);
final Exception cause = new Exception();
ClientCall.Listener<Void> callListener =
new ClientCall.Listener<Void>() {
@@ -944,7 +969,7 @@
provider,
deadlineCancellationExecutor,
channelCallTracer,
- false /* retryEnabled */)
+ /* retryEnabled= */ false)
.setDecompressorRegistry(decompressorRegistry);
call.start(callListener, new Metadata());
@@ -957,7 +982,7 @@
public void getAttributes() {
ClientCallImpl<Void, Void> call = new ClientCallImpl<>(
method, MoreExecutors.directExecutor(), baseCallOptions, provider,
- deadlineCancellationExecutor, channelCallTracer, false /* retryEnabled */);
+ deadlineCancellationExecutor, channelCallTracer, /* retryEnabled= */ false);
Attributes attrs =
Attributes.newBuilder().set(Key.<String>create("fake key"), "fake value").build();
when(stream.getAttributes()).thenReturn(attrs);
diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
index 8dc4a51..a278ae5 100644
--- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
+++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
@@ -3477,7 +3477,7 @@
CallOptions.DEFAULT.withDeadlineAfter(5, TimeUnit.SECONDS));
ListenableFuture<Void> future2 = ClientCalls.futureUnaryCall(call2, null);
- timer.forwardTime(1234, TimeUnit.SECONDS);
+ timer.forwardTime(5, TimeUnit.SECONDS);
executor.runDueTasks();
try {
@@ -3488,6 +3488,9 @@
}
mychannel.shutdownNow();
+ // Stream shutdown is now delayed on DEADLINE_EXCEEDED, so calling shutdownNow() on an open
+ // stream adds a task to the executor. Run that task here to clean it up.
+ executor.runDueTasks();
}
@Deprecated