core: fix RetriableStream edge case bug introduced in #8386 (#8393)
While adding regression tests to #8386, I found a bug in an edge case: while retry attempt is draining the last buffered entry, if it is in the mean time committed and then we cancel the call, the stream will never be cancelled. See the regression test case `commitAndCancelWhileDraining()`.
diff --git a/core/src/main/java/io/grpc/internal/RetriableStream.java b/core/src/main/java/io/grpc/internal/RetriableStream.java
index 396c7ce..d19a260 100644
--- a/core/src/main/java/io/grpc/internal/RetriableStream.java
+++ b/core/src/main/java/io/grpc/internal/RetriableStream.java
@@ -252,10 +252,14 @@
synchronized (lock) {
savedState = state;
- if (savedState.winningSubstream != null && savedState.winningSubstream != substream
- && streamStarted) {
- // committed but not me, to be cancelled
- break;
+ if (streamStarted) {
+ if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
+ // committed but not me, to be cancelled
+ break;
+ }
+ if (savedState.cancelled) {
+ break;
+ }
}
if (index == savedState.buffer.size()) { // I'm drained
state = savedState.substreamDrained(substream);
@@ -277,27 +281,25 @@
}
for (BufferEntry bufferEntry : list) {
- savedState = state;
- if (savedState.winningSubstream != null && savedState.winningSubstream != substream
- && streamStarted) {
- // committed but not me, to be cancelled
- break;
- }
- if (savedState.cancelled && streamStarted) {
- checkState(
- savedState.winningSubstream == substream,
- "substream should be CANCELLED_BECAUSE_COMMITTED already");
- substream.stream.cancel(cancellationStatus);
- return;
- }
bufferEntry.runWith(substream);
if (bufferEntry instanceof RetriableStream.StartEntry) {
streamStarted = true;
}
+ if (streamStarted) {
+ savedState = state;
+ if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
+ // committed but not me, to be cancelled
+ break;
+ }
+ if (savedState.cancelled) {
+ break;
+ }
+ }
}
}
- substream.stream.cancel(CANCELLED_BECAUSE_COMMITTED);
+ substream.stream.cancel(
+ state.winningSubstream == substream ? cancellationStatus : CANCELLED_BECAUSE_COMMITTED);
}
/**
diff --git a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java
index 26c6fcf..95d2c2b 100644
--- a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java
+++ b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java
@@ -750,6 +750,91 @@
}
@Test
+ public void cancelWhileDraining() {
+ ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
+ ArgumentCaptor.forClass(ClientStreamListener.class);
+ ClientStream mockStream1 = mock(ClientStream.class);
+ ClientStream mockStream2 =
+ mock(
+ ClientStream.class,
+ delegatesTo(
+ new NoopClientStream() {
+ @Override
+ public void request(int numMessages) {
+ retriableStream.cancel(
+ Status.CANCELLED.withDescription("cancelled while requesting"));
+ }
+ }));
+
+ InOrder inOrder = inOrder(retriableStreamRecorder, mockStream1, mockStream2);
+ doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
+ retriableStream.start(masterListener);
+ inOrder.verify(mockStream1).start(sublistenerCaptor1.capture());
+ retriableStream.request(3);
+ inOrder.verify(mockStream1).request(3);
+
+ // retry
+ doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1);
+ sublistenerCaptor1.getValue().closed(
+ Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());
+ fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
+
+ inOrder.verify(mockStream2).start(any(ClientStreamListener.class));
+ inOrder.verify(mockStream2).request(3);
+ inOrder.verify(retriableStreamRecorder).postCommit();
+ ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
+ inOrder.verify(mockStream2).cancel(statusCaptor.capture());
+ assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED);
+ assertThat(statusCaptor.getValue().getDescription())
+ .isEqualTo("Stream thrown away because RetriableStream committed");
+ verify(masterListener).closed(
+ statusCaptor.capture(), any(RpcProgress.class), any(Metadata.class));
+ assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED);
+ assertThat(statusCaptor.getValue().getDescription()).isEqualTo("cancelled while requesting");
+ }
+
+ @Test
+ public void cancelWhileRetryStart() {
+ ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
+ ArgumentCaptor.forClass(ClientStreamListener.class);
+ ClientStream mockStream1 = mock(ClientStream.class);
+ ClientStream mockStream2 =
+ mock(
+ ClientStream.class,
+ delegatesTo(
+ new NoopClientStream() {
+ @Override
+ public void start(ClientStreamListener listener) {
+ retriableStream.cancel(
+ Status.CANCELLED.withDescription("cancelled while retry start"));
+ }
+ }));
+
+ InOrder inOrder = inOrder(retriableStreamRecorder, mockStream1, mockStream2);
+ doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
+ retriableStream.start(masterListener);
+ inOrder.verify(mockStream1).start(sublistenerCaptor1.capture());
+
+ // retry
+ doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1);
+ sublistenerCaptor1.getValue().closed(
+ Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());
+ fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
+
+ inOrder.verify(mockStream2).start(any(ClientStreamListener.class));
+ inOrder.verify(retriableStreamRecorder).postCommit();
+ ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
+ inOrder.verify(mockStream2).cancel(statusCaptor.capture());
+ assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED);
+ assertThat(statusCaptor.getValue().getDescription())
+ .isEqualTo("Stream thrown away because RetriableStream committed");
+ verify(masterListener).closed(
+ statusCaptor.capture(), any(RpcProgress.class), any(Metadata.class));
+ assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED);
+ assertThat(statusCaptor.getValue().getDescription()).isEqualTo("cancelled while retry start");
+ }
+
+ @Test
public void operationsAfterImmediateCommit() {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class);
@@ -916,6 +1001,47 @@
verify(mockStream3).request(1);
}
+ @Test
+ public void commitAndCancelWhileDraining() {
+ ClientStream mockStream1 = mock(ClientStream.class);
+ ClientStream mockStream2 =
+ mock(
+ ClientStream.class,
+ delegatesTo(
+ new NoopClientStream() {
+ @Override
+ public void start(ClientStreamListener listener) {
+ // commit while draining
+ listener.headersRead(new Metadata());
+ // cancel while draining
+ retriableStream.cancel(
+ Status.CANCELLED.withDescription("cancelled while drained"));
+ }
+ }));
+
+ when(retriableStreamRecorder.newSubstream(anyInt()))
+ .thenReturn(mockStream1, mockStream2);
+
+ retriableStream.start(masterListener);
+
+ ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
+ ArgumentCaptor.forClass(ClientStreamListener.class);
+ verify(mockStream1).start(sublistenerCaptor1.capture());
+
+ ClientStreamListener listener1 = sublistenerCaptor1.getValue();
+
+ // retry
+ listener1.closed(
+ Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());
+ fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
+
+ verify(mockStream2).start(any(ClientStreamListener.class));
+ verify(retriableStreamRecorder).postCommit();
+ ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
+ verify(mockStream2).cancel(statusCaptor.capture());
+ assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED);
+ assertThat(statusCaptor.getValue().getDescription()).isEqualTo("cancelled while drained");
+ }
@Test
public void perRpcBufferLimitExceeded() {