blob: c8f7254c1a02ffefeb3dbe487baf007f45c5dd7c [file] [log] [blame]
/*
* Copyright 2015 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.internal;
import static com.google.common.truth.Truth.assertThat;
import static io.grpc.internal.GrpcUtil.ACCEPT_ENCODING_SPLITTER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Attributes;
import io.grpc.Attributes.Key;
import io.grpc.CallOptions;
import io.grpc.ClientCall;
import io.grpc.ClientStreamTracer;
import io.grpc.Codec;
import io.grpc.Context;
import io.grpc.Deadline;
import io.grpc.Decompressor;
import io.grpc.DecompressorRegistry;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType;
import io.grpc.Status;
import io.grpc.internal.ClientCallImpl.ClientTransportProvider;
import io.grpc.internal.testing.SingleMessageProducer;
import io.grpc.testing.TestMethodDescriptors;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
/**
* Test for {@link ClientCallImpl}.
*/
@RunWith(JUnit4.class)
public class ClientCallImplTest {
private final FakeClock fakeClock = new FakeClock();
private final ScheduledExecutorService deadlineCancellationExecutor =
fakeClock.getScheduledExecutorService();
private final CallTracer channelCallTracer = CallTracer.getDefaultFactory().create();
private final DecompressorRegistry decompressorRegistry =
DecompressorRegistry.getDefaultInstance().with(new Codec.Gzip(), true);
private final MethodDescriptor<Void, Void> method = MethodDescriptor.<Void, Void>newBuilder()
.setType(MethodType.UNARY)
.setFullMethodName("service/method")
.setRequestMarshaller(TestMethodDescriptors.voidMarshaller())
.setResponseMarshaller(TestMethodDescriptors.voidMarshaller())
.build();
@Mock private ClientStreamListener streamListener;
@Mock private ClientTransport clientTransport;
@Captor private ArgumentCaptor<Status> statusCaptor;
@Mock
private ClientStreamTracer.Factory streamTracerFactory;
@Mock
private ClientTransport transport;
@Mock
private ClientTransportProvider provider;
@Mock
private ClientStream stream;
@Mock
private ClientCall.Listener<Void> callListener;
@Captor
private ArgumentCaptor<ClientStreamListener> listenerArgumentCaptor;
@Captor
private ArgumentCaptor<Status> statusArgumentCaptor;
private CallOptions baseCallOptions;
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
when(provider.get(any(PickSubchannelArgsImpl.class))).thenReturn(transport);
when(transport.newStream(
any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)))
.thenReturn(stream);
baseCallOptions = CallOptions.DEFAULT.withStreamTracerFactory(streamTracerFactory);
}
@After
public void tearDown() {
verifyZeroInteractions(streamTracerFactory);
}
@Test
public void statusPropagatedFromStreamToCallListener() {
DelayedExecutor executor = new DelayedExecutor();
ClientCallImpl<Void, Void> call = new ClientCallImpl<Void, Void>(
method,
executor,
baseCallOptions,
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */);
call.start(callListener, new Metadata());
verify(stream).start(listenerArgumentCaptor.capture());
final ClientStreamListener streamListener = listenerArgumentCaptor.getValue();
streamListener.headersRead(new Metadata());
Status status = Status.RESOURCE_EXHAUSTED.withDescription("simulated");
streamListener.closed(status , new Metadata());
executor.release();
verify(callListener).onClose(same(status), Matchers.isA(Metadata.class));
}
@Test
public void exceptionInOnMessageTakesPrecedenceOverServer() {
DelayedExecutor executor = new DelayedExecutor();
ClientCallImpl<Void, Void> call = new ClientCallImpl<Void, Void>(
method,
executor,
baseCallOptions,
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */);
call.start(callListener, new Metadata());
verify(stream).start(listenerArgumentCaptor.capture());
final ClientStreamListener streamListener = listenerArgumentCaptor.getValue();
streamListener.headersRead(new Metadata());
RuntimeException failure = new RuntimeException("bad");
doThrow(failure).when(callListener).onMessage(Matchers.<Void>any());
/*
* In unary calls, the server closes the call right after responding, so the onClose call is
* queued to run. When messageRead is called, an exception will occur and attempt to cancel the
* stream. However, since the server closed it "first" the second exception is lost leading to
* the call being counted as successful.
*/
streamListener
.messagesAvailable(new SingleMessageProducer(new ByteArrayInputStream(new byte[]{})));
streamListener.closed(Status.OK, new Metadata());
executor.release();
verify(callListener).onClose(statusArgumentCaptor.capture(), Matchers.isA(Metadata.class));
Status callListenerStatus = statusArgumentCaptor.getValue();
assertThat(callListenerStatus.getCode()).isEqualTo(Status.Code.CANCELLED);
assertThat(callListenerStatus.getCause()).isSameAs(failure);
verify(stream).cancel(same(callListenerStatus));
}
@Test
public void exceptionInOnHeadersTakesPrecedenceOverServer() {
DelayedExecutor executor = new DelayedExecutor();
ClientCallImpl<Void, Void> call = new ClientCallImpl<Void, Void>(
method,
executor,
baseCallOptions,
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */);
call.start(callListener, new Metadata());
verify(stream).start(listenerArgumentCaptor.capture());
final ClientStreamListener streamListener = listenerArgumentCaptor.getValue();
RuntimeException failure = new RuntimeException("bad");
doThrow(failure).when(callListener).onHeaders(any(Metadata.class));
/*
* In unary calls, the server closes the call right after responding, so the onClose call is
* queued to run. When headersRead is called, an exception will occur and attempt to cancel the
* stream. However, since the server closed it "first" the second exception is lost leading to
* the call being counted as successful.
*/
streamListener.headersRead(new Metadata());
streamListener.closed(Status.OK, new Metadata());
executor.release();
verify(callListener).onClose(statusArgumentCaptor.capture(), Matchers.isA(Metadata.class));
Status callListenerStatus = statusArgumentCaptor.getValue();
assertThat(callListenerStatus.getCode()).isEqualTo(Status.Code.CANCELLED);
assertThat(callListenerStatus.getCause()).isSameAs(failure);
verify(stream).cancel(same(callListenerStatus));
}
@Test
public void exceptionInOnReadyTakesPrecedenceOverServer() {
DelayedExecutor executor = new DelayedExecutor();
ClientCallImpl<Void, Void> call = new ClientCallImpl<Void, Void>(
method,
executor,
baseCallOptions,
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */);
call.start(callListener, new Metadata());
verify(stream).start(listenerArgumentCaptor.capture());
final ClientStreamListener streamListener = listenerArgumentCaptor.getValue();
RuntimeException failure = new RuntimeException("bad");
doThrow(failure).when(callListener).onReady();
/*
* In unary calls, the server closes the call right after responding, so the onClose call is
* queued to run. When onReady is called, an exception will occur and attempt to cancel the
* stream. However, since the server closed it "first" the second exception is lost leading to
* the call being counted as successful.
*/
streamListener.onReady();
streamListener.closed(Status.OK, new Metadata());
executor.release();
verify(callListener).onClose(statusArgumentCaptor.capture(), Matchers.isA(Metadata.class));
Status callListenerStatus = statusArgumentCaptor.getValue();
assertThat(callListenerStatus.getCode()).isEqualTo(Status.Code.CANCELLED);
assertThat(callListenerStatus.getCause()).isSameAs(failure);
verify(stream).cancel(same(callListenerStatus));
}
@Test
public void advertisedEncodingsAreSent() {
ClientCallImpl<Void, Void> call = new ClientCallImpl<Void, Void>(
method,
MoreExecutors.directExecutor(),
baseCallOptions,
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */)
.setDecompressorRegistry(decompressorRegistry);
call.start(callListener, new Metadata());
ArgumentCaptor<Metadata> metadataCaptor = ArgumentCaptor.forClass(Metadata.class);
verify(transport).newStream(eq(method), metadataCaptor.capture(), same(baseCallOptions));
Metadata actual = metadataCaptor.getValue();
// there should only be one.
Set<String> acceptedEncodings = ImmutableSet.of(
new String(actual.get(GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY), GrpcUtil.US_ASCII));
assertEquals(decompressorRegistry.getAdvertisedMessageEncodings(), acceptedEncodings);
}
@Test
public void authorityPropagatedToStream() {
ClientCallImpl<Void, Void> call = new ClientCallImpl<Void, Void>(
method,
MoreExecutors.directExecutor(),
baseCallOptions.withAuthority("overridden-authority"),
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */)
.setDecompressorRegistry(decompressorRegistry);
call.start(callListener, new Metadata());
verify(stream).setAuthority("overridden-authority");
}
@Test
public void callOptionsPropagatedToTransport() {
final CallOptions callOptions = baseCallOptions.withAuthority("dummy_value");
final ClientCallImpl<Void, Void> call = new ClientCallImpl<Void, Void>(
method,
MoreExecutors.directExecutor(),
callOptions,
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */)
.setDecompressorRegistry(decompressorRegistry);
final Metadata metadata = new Metadata();
call.start(callListener, metadata);
verify(transport).newStream(same(method), same(metadata), same(callOptions));
}
@Test
public void authorityNotPropagatedToStream() {
ClientCallImpl<Void, Void> call = new ClientCallImpl<Void, Void>(
method,
MoreExecutors.directExecutor(),
// Don't provide an authority
baseCallOptions,
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */)
.setDecompressorRegistry(decompressorRegistry);
call.start(callListener, new Metadata());
verify(stream, never()).setAuthority(any(String.class));
}
@Test
public void prepareHeaders_userAgentIgnored() {
Metadata m = new Metadata();
m.put(GrpcUtil.USER_AGENT_KEY, "batmobile");
ClientCallImpl.prepareHeaders(m, decompressorRegistry, Codec.Identity.NONE, false);
// User Agent is removed and set by the transport
assertThat(m.get(GrpcUtil.USER_AGENT_KEY)).isNotNull();
}
@Test
public void prepareHeaders_ignoreIdentityEncoding() {
Metadata m = new Metadata();
ClientCallImpl.prepareHeaders(m, decompressorRegistry, Codec.Identity.NONE, false);
assertNull(m.get(GrpcUtil.MESSAGE_ENCODING_KEY));
}
@Test
public void prepareHeaders_acceptedMessageEncodingsAdded() {
Metadata m = new Metadata();
DecompressorRegistry customRegistry = DecompressorRegistry.emptyInstance()
.with(new Decompressor() {
@Override
public String getMessageEncoding() {
return "a";
}
@Override
public InputStream decompress(InputStream is) throws IOException {
return null;
}
}, true)
.with(new Decompressor() {
@Override
public String getMessageEncoding() {
return "b";
}
@Override
public InputStream decompress(InputStream is) throws IOException {
return null;
}
}, true)
.with(new Decompressor() {
@Override
public String getMessageEncoding() {
return "c";
}
@Override
public InputStream decompress(InputStream is) throws IOException {
return null;
}
}, false); // not advertised
ClientCallImpl.prepareHeaders(m, customRegistry, Codec.Identity.NONE, false);
Iterable<String> acceptedEncodings = ACCEPT_ENCODING_SPLITTER.split(
new String(m.get(GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY), GrpcUtil.US_ASCII));
// Order may be different, since decoder priorities have not yet been implemented.
assertEquals(ImmutableSet.of("b", "a"), ImmutableSet.copyOf(acceptedEncodings));
}
@Test
public void prepareHeaders_noAcceptedContentEncodingsWithoutFullStreamDecompressionEnabled() {
Metadata m = new Metadata();
ClientCallImpl.prepareHeaders(m, decompressorRegistry, Codec.Identity.NONE, false);
assertNull(m.get(GrpcUtil.CONTENT_ACCEPT_ENCODING_KEY));
}
@Test
public void prepareHeaders_acceptedMessageAndContentEncodingsAdded() {
Metadata m = new Metadata();
DecompressorRegistry customRegistry =
DecompressorRegistry.emptyInstance()
.with(
new Decompressor() {
@Override
public String getMessageEncoding() {
return "a";
}
@Override
public InputStream decompress(InputStream is) throws IOException {
return null;
}
},
true)
.with(
new Decompressor() {
@Override
public String getMessageEncoding() {
return "b";
}
@Override
public InputStream decompress(InputStream is) throws IOException {
return null;
}
},
true)
.with(
new Decompressor() {
@Override
public String getMessageEncoding() {
return "c";
}
@Override
public InputStream decompress(InputStream is) throws IOException {
return null;
}
},
false); // not advertised
ClientCallImpl.prepareHeaders(m, customRegistry, Codec.Identity.NONE, true);
Iterable<String> acceptedMessageEncodings =
ACCEPT_ENCODING_SPLITTER.split(
new String(m.get(GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY), GrpcUtil.US_ASCII));
// Order may be different, since decoder priorities have not yet been implemented.
assertEquals(ImmutableSet.of("b", "a"), ImmutableSet.copyOf(acceptedMessageEncodings));
Iterable<String> acceptedContentEncodings =
ACCEPT_ENCODING_SPLITTER.split(
new String(m.get(GrpcUtil.CONTENT_ACCEPT_ENCODING_KEY), GrpcUtil.US_ASCII));
assertEquals(
ImmutableSet.of("gzip"), ImmutableSet.copyOf(acceptedContentEncodings));
}
@Test
public void prepareHeaders_removeReservedHeaders() {
Metadata m = new Metadata();
m.put(GrpcUtil.MESSAGE_ENCODING_KEY, "gzip");
m.put(GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY, "gzip".getBytes(GrpcUtil.US_ASCII));
m.put(GrpcUtil.CONTENT_ENCODING_KEY, "gzip");
m.put(GrpcUtil.CONTENT_ACCEPT_ENCODING_KEY, "gzip".getBytes(GrpcUtil.US_ASCII));
ClientCallImpl.prepareHeaders(
m, DecompressorRegistry.emptyInstance(), Codec.Identity.NONE, false);
assertNull(m.get(GrpcUtil.MESSAGE_ENCODING_KEY));
assertNull(m.get(GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY));
assertNull(m.get(GrpcUtil.CONTENT_ENCODING_KEY));
assertNull(m.get(GrpcUtil.CONTENT_ACCEPT_ENCODING_KEY));
}
@Test
public void callerContextPropagatedToListener() throws Exception {
// Attach the context which is recorded when the call is created
final Context.Key<String> testKey = Context.key("testing");
Context context = Context.current().withValue(testKey, "testValue");
Context previous = context.attach();
ClientCallImpl<Void, Void> call = new ClientCallImpl<Void, Void>(
method,
new SerializingExecutor(Executors.newSingleThreadExecutor()),
baseCallOptions,
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */)
.setDecompressorRegistry(decompressorRegistry);
context.detach(previous);
// Override the value after creating the call, this should not be seen by callbacks
context = Context.current().withValue(testKey, "badValue");
previous = context.attach();
final AtomicBoolean onHeadersCalled = new AtomicBoolean();
final AtomicBoolean onMessageCalled = new AtomicBoolean();
final AtomicBoolean onReadyCalled = new AtomicBoolean();
final AtomicBoolean observedIncorrectContext = new AtomicBoolean();
final CountDownLatch latch = new CountDownLatch(1);
call.start(new ClientCall.Listener<Void>() {
@Override
public void onHeaders(Metadata headers) {
onHeadersCalled.set(true);
checkContext();
}
@Override
public void onMessage(Void message) {
onMessageCalled.set(true);
checkContext();
}
@Override
public void onClose(Status status, Metadata trailers) {
checkContext();
latch.countDown();
}
@Override
public void onReady() {
onReadyCalled.set(true);
checkContext();
}
private void checkContext() {
if (!"testValue".equals(testKey.get())) {
observedIncorrectContext.set(true);
}
}
}, new Metadata());
context.detach(previous);
verify(stream).start(listenerArgumentCaptor.capture());
ClientStreamListener listener = listenerArgumentCaptor.getValue();
listener.onReady();
listener.headersRead(new Metadata());
listener.messagesAvailable(new SingleMessageProducer(new ByteArrayInputStream(new byte[0])));
listener.messagesAvailable(new SingleMessageProducer(new ByteArrayInputStream(new byte[0])));
listener.closed(Status.OK, new Metadata());
assertTrue(latch.await(5, TimeUnit.SECONDS));
assertTrue(onHeadersCalled.get());
assertTrue(onMessageCalled.get());
assertTrue(onReadyCalled.get());
assertFalse(observedIncorrectContext.get());
}
@Test
public void contextCancellationCancelsStream() throws Exception {
// Attach the context which is recorded when the call is created
Context.CancellableContext cancellableContext = Context.current().withCancellation();
Context previous = cancellableContext.attach();
ClientCallImpl<Void, Void> call = new ClientCallImpl<Void, Void>(
method,
new SerializingExecutor(Executors.newSingleThreadExecutor()),
baseCallOptions,
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */)
.setDecompressorRegistry(decompressorRegistry);
cancellableContext.detach(previous);
call.start(callListener, new Metadata());
Throwable t = new Throwable();
cancellableContext.cancel(t);
verify(stream, times(1)).cancel(statusArgumentCaptor.capture());
Status streamStatus = statusArgumentCaptor.getValue();
assertEquals(Status.Code.CANCELLED, streamStatus.getCode());
}
@Test
public void contextAlreadyCancelledNotifiesImmediately() throws Exception {
// Attach the context which is recorded when the call is created
Context.CancellableContext cancellableContext = Context.current().withCancellation();
Throwable cause = new Throwable();
cancellableContext.cancel(cause);
Context previous = cancellableContext.attach();
ClientCallImpl<Void, Void> call = new ClientCallImpl<Void, Void>(
method,
new SerializingExecutor(Executors.newSingleThreadExecutor()),
baseCallOptions,
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */)
.setDecompressorRegistry(decompressorRegistry);
cancellableContext.detach(previous);
final SettableFuture<Status> statusFuture = SettableFuture.create();
call.start(new ClientCall.Listener<Void>() {
@Override
public void onClose(Status status, Metadata trailers) {
statusFuture.set(status);
}
}, new Metadata());
// Caller should receive onClose callback.
Status status = statusFuture.get(5, TimeUnit.SECONDS);
assertEquals(Status.Code.CANCELLED, status.getCode());
assertSame(cause, status.getCause());
// Following operations should be no-op.
call.request(1);
call.sendMessage(null);
call.halfClose();
// Stream should never be created.
verifyZeroInteractions(transport);
try {
call.sendMessage(null);
fail("Call has been cancelled");
} catch (IllegalStateException ise) {
// expected
}
}
@Test
public void deadlineExceededBeforeCallStarted() {
CallOptions callOptions = baseCallOptions.withDeadlineAfter(0, TimeUnit.SECONDS);
fakeClock.forwardTime(System.nanoTime(), TimeUnit.NANOSECONDS);
ClientCallImpl<Void, Void> call = new ClientCallImpl<Void, Void>(
method,
new SerializingExecutor(Executors.newSingleThreadExecutor()),
callOptions,
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */)
.setDecompressorRegistry(decompressorRegistry);
call.start(callListener, new Metadata());
verify(transport, times(0))
.newStream(any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class));
verify(callListener, timeout(1000)).onClose(statusCaptor.capture(), any(Metadata.class));
assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode());
verifyZeroInteractions(provider);
}
@Test
public void contextDeadlineShouldBePropagatedToStream() {
Context context = Context.current()
.withDeadlineAfter(1000, TimeUnit.MILLISECONDS, deadlineCancellationExecutor);
Context origContext = context.attach();
ClientCallImpl<Void, Void> call = new ClientCallImpl<Void, Void>(
method,
MoreExecutors.directExecutor(),
baseCallOptions,
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */);
call.start(callListener, new Metadata());
context.detach(origContext);
ArgumentCaptor<Deadline> deadlineCaptor = ArgumentCaptor.forClass(Deadline.class);
verify(stream).setDeadline(deadlineCaptor.capture());
assertTimeoutBetween(deadlineCaptor.getValue().timeRemaining(TimeUnit.MILLISECONDS), 600, 1000);
}
@Test
public void contextDeadlineShouldOverrideLargerCallOptionsDeadline() {
Context context = Context.current()
.withDeadlineAfter(1000, TimeUnit.MILLISECONDS, deadlineCancellationExecutor);
Context origContext = context.attach();
CallOptions callOpts = baseCallOptions.withDeadlineAfter(2000, TimeUnit.MILLISECONDS);
ClientCallImpl<Void, Void> call = new ClientCallImpl<Void, Void>(
method,
MoreExecutors.directExecutor(),
callOpts,
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */);
call.start(callListener, new Metadata());
context.detach(origContext);
ArgumentCaptor<Deadline> deadlineCaptor = ArgumentCaptor.forClass(Deadline.class);
verify(stream).setDeadline(deadlineCaptor.capture());
assertTimeoutBetween(deadlineCaptor.getValue().timeRemaining(TimeUnit.MILLISECONDS), 600, 1000);
}
@Test
public void contextDeadlineShouldNotOverrideSmallerCallOptionsDeadline() {
Context context = Context.current()
.withDeadlineAfter(2000, TimeUnit.MILLISECONDS, deadlineCancellationExecutor);
Context origContext = context.attach();
CallOptions callOpts = baseCallOptions.withDeadlineAfter(1000, TimeUnit.MILLISECONDS);
ClientCallImpl<Void, Void> call = new ClientCallImpl<Void, Void>(
method,
MoreExecutors.directExecutor(),
callOpts,
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */);
call.start(callListener, new Metadata());
context.detach(origContext);
ArgumentCaptor<Deadline> deadlineCaptor = ArgumentCaptor.forClass(Deadline.class);
verify(stream).setDeadline(deadlineCaptor.capture());
assertTimeoutBetween(deadlineCaptor.getValue().timeRemaining(TimeUnit.MILLISECONDS), 600, 1000);
}
@Test
public void callOptionsDeadlineShouldBePropagatedToStream() {
CallOptions callOpts = baseCallOptions.withDeadlineAfter(1000, TimeUnit.MILLISECONDS);
ClientCallImpl<Void, Void> call = new ClientCallImpl<Void, Void>(
method,
MoreExecutors.directExecutor(),
callOpts,
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */);
call.start(callListener, new Metadata());
ArgumentCaptor<Deadline> deadlineCaptor = ArgumentCaptor.forClass(Deadline.class);
verify(stream).setDeadline(deadlineCaptor.capture());
assertTimeoutBetween(deadlineCaptor.getValue().timeRemaining(TimeUnit.MILLISECONDS), 600, 1000);
}
@Test
public void noDeadlineShouldBePropagatedToStream() {
ClientCallImpl<Void, Void> call = new ClientCallImpl<Void, Void>(
method,
MoreExecutors.directExecutor(),
baseCallOptions,
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */);
call.start(callListener, new Metadata());
verify(stream, never()).setDeadline(any(Deadline.class));
}
@Test
public void expiredDeadlineCancelsStream_CallOptions() {
fakeClock.forwardTime(System.nanoTime(), TimeUnit.NANOSECONDS);
// The deadline needs to be a number large enough to get encompass the call to start, otherwise
// the scheduled cancellation won't be created, and the call will fail early.
ClientCallImpl<Void, Void> call = new ClientCallImpl<Void, Void>(
method,
MoreExecutors.directExecutor(),
baseCallOptions.withDeadline(Deadline.after(1, TimeUnit.SECONDS)),
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */);
call.start(callListener, new Metadata());
fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(1) + 1);
verify(stream, times(1)).cancel(statusCaptor.capture());
assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode());
}
@Test
public void expiredDeadlineCancelsStream_Context() {
fakeClock.forwardTime(System.nanoTime(), TimeUnit.NANOSECONDS);
Context context = Context.current()
.withDeadlineAfter(1, TimeUnit.SECONDS, deadlineCancellationExecutor);
Context origContext = context.attach();
ClientCallImpl<Void, Void> call = new ClientCallImpl<Void, Void>(
method,
MoreExecutors.directExecutor(),
baseCallOptions,
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */);
context.detach(origContext);
call.start(callListener, new Metadata());
fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(1) + 1);
verify(stream, times(1)).cancel(statusCaptor.capture());
assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode());
}
@Test
public void streamCancelAbortsDeadlineTimer() {
fakeClock.forwardTime(System.nanoTime(), TimeUnit.NANOSECONDS);
ClientCallImpl<Void, Void> call = new ClientCallImpl<Void, Void>(
method,
MoreExecutors.directExecutor(),
baseCallOptions.withDeadline(Deadline.after(1, TimeUnit.SECONDS)),
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */);
call.start(callListener, new Metadata());
call.cancel("canceled", null);
// Run the deadline timer, which should have been cancelled by the previous call to cancel()
fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(1) + 1);
verify(stream, times(1)).cancel(statusCaptor.capture());
assertEquals(Status.CANCELLED.getCode(), statusCaptor.getValue().getCode());
}
/**
* Without a context or call options deadline,
* a timeout should not be set in metadata.
*/
@Test
public void timeoutShouldNotBeSet() {
ClientCallImpl<Void, Void> call = new ClientCallImpl<Void, Void>(
method,
MoreExecutors.directExecutor(),
baseCallOptions,
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */);
Metadata headers = new Metadata();
call.start(callListener, headers);
assertFalse(headers.containsKey(GrpcUtil.TIMEOUT_KEY));
}
@Test
public void cancelInOnMessageShouldInvokeStreamCancel() throws Exception {
final ClientCallImpl<Void, Void> call = new ClientCallImpl<Void, Void>(
method,
MoreExecutors.directExecutor(),
baseCallOptions,
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */);
final Exception cause = new Exception();
ClientCall.Listener<Void> callListener =
new ClientCall.Listener<Void>() {
@Override
public void onMessage(Void message) {
call.cancel("foo", cause);
}
};
call.start(callListener, new Metadata());
call.halfClose();
call.request(1);
verify(stream).start(listenerArgumentCaptor.capture());
ClientStreamListener streamListener = listenerArgumentCaptor.getValue();
streamListener.onReady();
streamListener.headersRead(new Metadata());
streamListener
.messagesAvailable(new SingleMessageProducer(new ByteArrayInputStream(new byte[0])));
verify(stream).cancel(statusCaptor.capture());
Status status = statusCaptor.getValue();
assertEquals(Status.CANCELLED.getCode(), status.getCode());
assertEquals("foo", status.getDescription());
assertSame(cause, status.getCause());
}
@Test
public void startAddsMaxSize() {
CallOptions callOptions =
baseCallOptions.withMaxInboundMessageSize(1).withMaxOutboundMessageSize(2);
ClientCallImpl<Void, Void> call = new ClientCallImpl<Void, Void>(
method,
new SerializingExecutor(Executors.newSingleThreadExecutor()),
callOptions,
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */)
.setDecompressorRegistry(decompressorRegistry);
call.start(callListener, new Metadata());
verify(stream).setMaxInboundMessageSize(1);
verify(stream).setMaxOutboundMessageSize(2);
}
@Test
public void getAttributes() {
ClientCallImpl<Void, Void> call = new ClientCallImpl<Void, Void>(
method, MoreExecutors.directExecutor(), baseCallOptions, provider,
deadlineCancellationExecutor, channelCallTracer, false /* retryEnabled */);
Attributes attrs =
Attributes.newBuilder().set(Key.<String>create("fake key"), "fake value").build();
when(stream.getAttributes()).thenReturn(attrs);
assertNotEquals(attrs, call.getAttributes());
call.start(callListener, new Metadata());
assertEquals(attrs, call.getAttributes());
}
private static void assertTimeoutBetween(long timeout, long from, long to) {
assertTrue("timeout: " + timeout + " ns", timeout <= to);
assertTrue("timeout: " + timeout + " ns", timeout >= from);
}
private static final class DelayedExecutor implements Executor {
private final BlockingQueue<Runnable> commands = new LinkedBlockingQueue<Runnable>();
@Override
public void execute(Runnable command) {
commands.add(command);
}
void release() {
while (!commands.isEmpty()) {
commands.poll().run();
}
}
}
}