blob: 7f896892d0d005bf89a32ee78d04477b2729fa0f [file] [log] [blame]
/*
* Copyright 2015 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.internal;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.Truth.assertWithMessage;
import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.IDLE;
import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.SHUTDOWN;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.grpc.EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE;
import static junit.framework.TestCase.assertNotSame;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Attributes;
import io.grpc.BinaryLog;
import io.grpc.CallCredentials;
import io.grpc.CallCredentials.RequestInfo;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ChannelCredentials;
import io.grpc.ChannelLogger;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors;
import io.grpc.ClientStreamTracer;
import io.grpc.CompositeChannelCredentials;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.Context;
import io.grpc.EquivalentAddressGroup;
import io.grpc.InsecureChannelCredentials;
import io.grpc.IntegerMarshaller;
import io.grpc.InternalChannelz;
import io.grpc.InternalChannelz.ChannelStats;
import io.grpc.InternalChannelz.ChannelTrace;
import io.grpc.InternalConfigSelector;
import io.grpc.InternalInstrumented;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.CreateSubchannelArgs;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.ResolvedAddresses;
import io.grpc.LoadBalancer.Subchannel;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LoadBalancer.SubchannelStateListener;
import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType;
import io.grpc.NameResolver;
import io.grpc.NameResolver.ConfigOrError;
import io.grpc.NameResolver.ResolutionResult;
import io.grpc.NameResolverRegistry;
import io.grpc.ProxiedSocketAddress;
import io.grpc.ProxyDetector;
import io.grpc.SecurityLevel;
import io.grpc.ServerMethodDefinition;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StringMarshaller;
import io.grpc.internal.ClientTransportFactory.ClientTransportOptions;
import io.grpc.internal.ClientTransportFactory.SwapChannelCredentialsResult;
import io.grpc.internal.InternalSubchannel.TransportLogger;
import io.grpc.internal.ManagedChannelImpl.ScParser;
import io.grpc.internal.ManagedChannelImplBuilder.ClientTransportFactoryBuilder;
import io.grpc.internal.ManagedChannelImplBuilder.FixedPortProvider;
import io.grpc.internal.ManagedChannelImplBuilder.UnsupportedClientTransportFactoryBuilder;
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
import io.grpc.internal.TestUtils.MockClientTransportInfo;
import io.grpc.stub.ClientCalls;
import io.grpc.testing.TestMethodDescriptors;
import io.grpc.util.ForwardingSubchannel;
import java.io.IOException;
import java.net.SocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.mockito.stubbing.Answer;
/** Unit tests for {@link ManagedChannelImpl}. */
@RunWith(JUnit4.class)
// TODO(creamsoup) remove backward compatible check when fully migrated
@SuppressWarnings("deprecation")
public class ManagedChannelImplTest {
// Port handed to the channel builder through FixedPortProvider in setUp().
private static final int DEFAULT_PORT = 447;
// Method descriptor used for the calls issued by these tests: String requests, Integer responses.
private static final MethodDescriptor<String, Integer> method =
MethodDescriptor.<String, Integer>newBuilder()
.setType(MethodType.UNKNOWN)
.setFullMethodName("service/method")
.setRequestMarshaller(new StringMarshaller())
.setResponseMarshaller(new IntegerMarshaller())
.build();
// Attributes key named "subchannel-attr-key"; its uses lie outside the section reviewed here.
private static final Attributes.Key<String> SUBCHANNEL_ATTR_KEY =
Attributes.Key.create("subchannel-attr-key");
// NOTE(review): presumably the delay handed out by the fake back-off policy defined elsewhere in
// this file -- confirm against FakeBackoffPolicyProvider.
private static final long RECONNECT_BACKOFF_INTERVAL_NANOS = 10;
private static final String SERVICE_NAME = "fake.example.com";
// The authority is simply the service name.
private static final String AUTHORITY = SERVICE_NAME;
private static final String USER_AGENT = "userAgent";
// Transport options carrying AUTHORITY and USER_AGENT.
private static final ClientTransportOptions clientTransportOptions =
new ClientTransportOptions()
.setAuthority(AUTHORITY)
.setUserAgent(USER_AGENT);
// Channel target passed to the builder; setUp() also parses it into expectedUri.
private static final String TARGET = "fake://" + SERVICE_NAME;
// Policy name reported by mockLoadBalancerProvider and installed as the builder's default policy.
private static final String MOCK_POLICY_NAME = "mock_lb";
// URI form of TARGET; assigned in setUp() and passed to every FakeNameResolverFactory.Builder.
private URI expectedUri;
// Two distinct anonymous SocketAddress instances. Both print as "test-addr", so they can only be
// told apart by identity, not by their string form.
private final SocketAddress socketAddress =
new SocketAddress() {
@Override
public String toString() {
return "test-addr";
}
};
private final SocketAddress socketAddress2 =
new SocketAddress() {
@Override
public String toString() {
return "test-addr";
}
};
// Address group holding only socketAddress.
private final EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(socketAddress);
// Address group holding both socketAddress and socketAddress2, in that order.
private final EquivalentAddressGroup addressGroup2 =
new EquivalentAddressGroup(Arrays.asList(socketAddress, socketAddress2));
// Fake clock behind the transport factory's scheduled executor and the channel's stopwatch and
// time provider (see setUp() and createChannel()).
private final FakeClock timer = new FakeClock();
// Fake clock whose scheduled executor is what executorPool hands out (see setUp()).
private final FakeClock executor = new FakeClock();
// Fake clock whose scheduled executor is what balancerRpcExecutorPool hands out (see setUp()).
private final FakeClock balancerRpcExecutor = new FakeClock();
/**
 * Task filter that accepts a command when its {@code toString()} contains the class name of
 * {@link ManagedChannelImpl.DelayedNameResolverRefresh}.
 */
private static final FakeClock.TaskFilter NAME_RESOLVER_REFRESH_TASK_FILTER =
    new FakeClock.TaskFilter() {
      @Override
      public boolean shouldAccept(Runnable command) {
        String description = command.toString();
        String refreshTaskClassName =
            ManagedChannelImpl.DelayedNameResolverRefresh.class.getName();
        return description.contains(refreshTaskClassName);
      }
    };
// Channelz instance installed on the builder in configureBuilder(); one per test instance.
private final InternalChannelz channelz = new InternalChannelz();
// Initializes the @Mock and @Captor fields below.
@Rule public final MockitoRule mocks = MockitoJUnit.rule();
// Channel under test; created by createChannel() (or directly by a test) and shut down in the
// @After method allPendingTasksAreRun().
private ManagedChannelImpl channel;
// Helper the channel passed to the load balancer provider; captured in createChannel().
private Helper helper;
@Captor
private ArgumentCaptor<Status> statusCaptor;
@Captor
private ArgumentCaptor<CallOptions> callOptionsCaptor;
@Mock
private LoadBalancer mockLoadBalancer;
@Mock
private SubchannelStateListener subchannelStateListener;
// Mock provider that delegates to a real provider returning mockLoadBalancer under
// MOCK_POLICY_NAME, so calls such as newLoadBalancer() can be verified.
private final LoadBalancerProvider mockLoadBalancerProvider =
mock(LoadBalancerProvider.class, delegatesTo(new LoadBalancerProvider() {
@Override
public LoadBalancer newLoadBalancer(Helper helper) {
return mockLoadBalancer;
}
@Override
public boolean isAvailable() {
return true;
}
@Override
public int getPriority() {
return 999;
}
@Override
public String getPolicyName() {
return MOCK_POLICY_NAME;
}
}));
@Captor
private ArgumentCaptor<ConnectivityStateInfo> stateInfoCaptor;
@Mock
private SubchannelPicker mockPicker;
@Mock
private ClientTransportFactory mockTransportFactory;
@Mock
private ClientCall.Listener<Integer> mockCallListener;
@Mock
private ClientCall.Listener<Integer> mockCallListener2;
@Mock
private ClientCall.Listener<Integer> mockCallListener3;
@Mock
private ClientCall.Listener<Integer> mockCallListener4;
@Mock
private ClientCall.Listener<Integer> mockCallListener5;
// Stubbed in setUp() to hand out the executor fake clock's scheduled executor.
@Mock
private ObjectPool<Executor> executorPool;
// Stubbed in setUp() to hand out the balancerRpcExecutor fake clock's scheduled executor.
@Mock
private ObjectPool<Executor> balancerRpcExecutorPool;
@Mock
private CallCredentials creds;
@Mock
private Executor offloadExecutor;
// Builder for the channel under test; created and configured in setUp().
private ManagedChannelImplBuilder channelBuilder;
// When true (the default), createChannel() forces the channel out of idle mode and captures the
// load balancer Helper.
private boolean requestConnection = true;
// Transports created through mockTransportFactory; populated via TestUtils.captureTransports().
private BlockingQueue<MockClientTransportInfo> transports;
// When true, the @After check skips the assertion that the channel is not in panic mode.
private boolean panicExpected;
@Captor
private ArgumentCaptor<ResolvedAddresses> resolvedAddressCaptor;
// Created eagerly rather than through @Captor.
private ArgumentCaptor<ClientStreamListener> streamListenerCaptor =
ArgumentCaptor.forClass(ClientStreamListener.class);
/**
 * Builds the channel under test from {@code channelBuilder} and the shared mocks.
 *
 * <p>When {@code requestConnection} is set, the channel is additionally forced out of idle mode,
 * the number of pending timer tasks is checked, and the {@link Helper} given to the load balancer
 * provider is stored in {@code helper}.
 *
 * @param interceptors client interceptors to install on the channel
 */
private void createChannel(ClientInterceptor... interceptors) {
  checkState(channel == null);
  List<ClientInterceptor> interceptorList = Arrays.asList(interceptors);
  channel = new ManagedChannelImpl(
      channelBuilder, mockTransportFactory, new FakeBackoffPolicyProvider(),
      balancerRpcExecutorPool, timer.getStopwatchSupplier(), interceptorList,
      timer.getTimeProvider());
  if (!requestConnection) {
    return;
  }

  // Force-exit the initial idle-mode
  channel.syncContext.execute(new Runnable() {
    @Override
    public void run() {
      channel.exitIdleMode();
    }
  });

  // One timer task for the idle timer (when enabled), one for a pending resolver refresh.
  boolean idleTimerEnabled =
      channelBuilder.idleTimeoutMillis != ManagedChannelImpl.IDLE_TIMEOUT_MILLIS_DISABLE;
  int expectedTimerTasks = idleTimerEnabled ? 1 : 0;
  if (getNameResolverRefresh() != null) {
    expectedTimerTasks++;
  }
  assertEquals(expectedTimerTasks, timer.numPendingTasks());

  ArgumentCaptor<Helper> capturedHelper = ArgumentCaptor.forClass(null);
  verify(mockLoadBalancerProvider).newLoadBalancer(capturedHelper.capture());
  helper = capturedHelper.getValue();
}
/**
 * Registers the mock load balancer provider, wires the fake clocks into the mocked transport
 * factory and executor pools, and prepares a freshly configured channel builder.
 */
@Before
public void setUp() throws Exception {
  when(mockLoadBalancer.canHandleEmptyAddressListFromNameResolution()).thenCallRealMethod();
  LoadBalancerRegistry.getDefaultRegistry().register(mockLoadBalancerProvider);

  expectedUri = new URI(TARGET);
  transports = TestUtils.captureTransports(mockTransportFactory);

  // Back the mocked factory and pools with the fake clocks.
  ScheduledExecutorService timerService = timer.getScheduledExecutorService();
  when(mockTransportFactory.getScheduledExecutorService()).thenReturn(timerService);
  ScheduledExecutorService appExecutorService = executor.getScheduledExecutorService();
  when(executorPool.getObject()).thenReturn(appExecutorService);
  ScheduledExecutorService balancerExecutorService =
      balancerRpcExecutor.getScheduledExecutorService();
  when(balancerRpcExecutorPool.getObject()).thenReturn(balancerExecutorService);

  ManagedChannelImplBuilder builder =
      new ManagedChannelImplBuilder(
          TARGET,
          new UnsupportedClientTransportFactoryBuilder(),
          new FixedPortProvider(DEFAULT_PORT));
  channelBuilder = builder;
  configureBuilder(builder);
}
/**
 * Applies the defaults shared by the tests to {@code channelBuilder}: a fake name resolver for
 * {@code expectedUri}, the mock load balancing policy, the user agent, the maximum idle timeout,
 * the offload executor, the mocked executor pool, no binlog, and the test channelz instance.
 *
 * @param channelBuilder the builder to configure
 */
private void configureBuilder(ManagedChannelImplBuilder channelBuilder) {
  FakeNameResolverFactory defaultResolverFactory =
      new FakeNameResolverFactory.Builder(expectedUri).build();
  channelBuilder
      .nameResolverFactory(defaultResolverFactory)
      .defaultLoadBalancingPolicy(MOCK_POLICY_NAME)
      .userAgent(USER_AGENT)
      .idleTimeout(ManagedChannelImplBuilder.IDLE_MODE_MAX_TIMEOUT_DAYS, TimeUnit.DAYS)
      .offloadExecutor(offloadExecutor);
  // Settings that are plain fields on the builder rather than fluent setters.
  channelBuilder.channelz = channelz;
  channelBuilder.binlog = null;
  channelBuilder.executorPool = executorPool;
}
/**
 * Checks that no due timer task and no executor task is left behind, that the channel did not
 * panic unless the test expected it, and then shuts the channel down.
 */
@After
public void allPendingTasksAreRun() throws Exception {
  // "never" verifications in the tests are only meaningful once every due task has run.
  // Timer tasks scheduled for a future time (e.g. back-off, the idle timer) are deliberately
  // ignored: this suite does not test time-related behavior, so only due tasks matter.
  assertTrue(timer.getDueTasks() + " should be empty", timer.getDueTasks().isEmpty());
  assertEquals(executor.getPendingTasks() + " should be empty", 0, executor.numPendingTasks());
  if (channel == null) {
    return;
  }
  if (!panicExpected) {
    assertFalse(channel.isInPanicMode());
  }
  channel.shutdownNow();
  channel = null;
}
/** Removes the mock load balancer provider that {@code setUp()} put in the default registry. */
@After
public void cleanUp() {
  LoadBalancerRegistry defaultRegistry = LoadBalancerRegistry.getDefaultRegistry();
  defaultRegistry.deregister(mockLoadBalancerProvider);
}
/**
 * Calling {@code Helper.createSubchannel()} directly from the test thread must be rejected with
 * an {@link IllegalStateException}.
 */
@Test
public void createSubchannel_outsideSynchronizationContextShouldThrow() {
  createChannel();
  CreateSubchannelArgs subchannelArgs =
      CreateSubchannelArgs.newBuilder().setAddresses(addressGroup).build();
  try {
    helper.createSubchannel(subchannelArgs);
    fail("Should throw");
  } catch (IllegalStateException expected) {
    assertThat(expected)
        .hasMessageThat()
        .isEqualTo("Not called from the SynchronizationContext");
  }
}
/**
 * An authority override carried in the resolved address group's attributes must be the authority
 * passed to the transport factory.
 */
@Test
public void createSubchannel_resolverOverrideAuthority() {
  Attributes overrideAttrs =
      Attributes.newBuilder()
          .set(ATTR_AUTHORITY_OVERRIDE, "resolver.override.authority")
          .build();
  EquivalentAddressGroup overrideEag = new EquivalentAddressGroup(socketAddress, overrideAttrs);
  FakeNameResolverFactory resolverFactory =
      new FakeNameResolverFactory.Builder(expectedUri)
          .setServers(Collections.singletonList(overrideEag))
          .build();
  channelBuilder.nameResolverFactory(resolverFactory);
  createChannel();

  Subchannel subchannel =
      createSubchannelSafely(helper, overrideEag, Attributes.EMPTY, subchannelStateListener);
  requestConnectionSafely(helper, subchannel);

  ArgumentCaptor<ClientTransportOptions> optionsCaptor = ArgumentCaptor.forClass(null);
  verify(mockTransportFactory)
      .newClientTransport(
          any(SocketAddress.class), optionsCaptor.capture(), any(ChannelLogger.class));
  ClientTransportOptions usedOptions = optionsCaptor.getValue();
  assertThat(usedOptions.getAuthority()).isEqualTo("resolver.override.authority");
}
/**
 * An authority override set on the channel builder wins over the one carried in the resolved
 * address group's attributes, and the subchannel still reports the original address group.
 */
@Test
public void createSubchannel_channelBuilderOverrideAuthority() {
  channelBuilder.overrideAuthority("channel-builder.override.authority");
  Attributes resolverAttrs =
      Attributes.newBuilder()
          .set(ATTR_AUTHORITY_OVERRIDE, "resolver.override.authority")
          .build();
  EquivalentAddressGroup overrideEag = new EquivalentAddressGroup(socketAddress, resolverAttrs);
  FakeNameResolverFactory resolverFactory =
      new FakeNameResolverFactory.Builder(expectedUri)
          .setServers(Collections.singletonList(overrideEag))
          .build();
  channelBuilder.nameResolverFactory(resolverFactory);
  createChannel();

  final Subchannel subchannel =
      createSubchannelSafely(helper, overrideEag, Attributes.EMPTY, subchannelStateListener);
  requestConnectionSafely(helper, subchannel);

  ArgumentCaptor<ClientTransportOptions> optionsCaptor = ArgumentCaptor.forClass(null);
  verify(mockTransportFactory)
      .newClientTransport(
          any(SocketAddress.class), optionsCaptor.capture(), any(ChannelLogger.class));
  ClientTransportOptions usedOptions = optionsCaptor.getValue();
  assertThat(usedOptions.getAuthority()).isEqualTo("channel-builder.override.authority");

  // Read the subchannel's addresses from inside the synchronization context.
  final List<EquivalentAddressGroup> observedEags = new ArrayList<>();
  Runnable collectAddresses =
      new Runnable() {
        @Override
        public void run() {
          observedEags.addAll(subchannel.getAllAddresses());
        }
      };
  helper.getSynchronizationContext().execute(collectAddresses);
  assertThat(observedEags).isEqualTo(ImmutableList.of(overrideEag));
}
/** With the suite's default builder settings, creating a channel schedules no tasks at all. */
@Test
public void idleModeDisabled() {
  List<EquivalentAddressGroup> servers =
      Collections.singletonList(new EquivalentAddressGroup(socketAddress));
  FakeNameResolverFactory resolverFactory =
      new FakeNameResolverFactory.Builder(expectedUri).setServers(servers).build();
  channelBuilder.nameResolverFactory(resolverFactory);
  createChannel();

  // Channels in this test suite are always created with idle mode disabled,
  // so no task is scheduled to enter idle mode.
  assertEquals(0, timer.numPendingTasks());
  assertEquals(0, executor.numPendingTasks());
}
/** A call whose deadline is already due is closed with DEADLINE_EXCEEDED by a single task. */
@Test
public void immediateDeadlineExceeded() {
  createChannel();
  CallOptions expiredOptions = CallOptions.DEFAULT.withDeadlineAfter(0, TimeUnit.NANOSECONDS);
  ClientCall<String, Integer> call = channel.newCall(method, expiredOptions);
  call.start(mockCallListener, new Metadata());

  assertEquals(1, executor.runDueTasks());

  verify(mockCallListener).onClose(statusCaptor.capture(), any(Metadata.class));
  Status closeStatus = statusCaptor.getValue();
  assertSame(Status.DEADLINE_EXCEEDED.getCode(), closeStatus.getCode());
}
/**
 * Starts a call on a channel that was constructed directly (not through createChannel(), so it is
 * never force-exited from idle mode by the test) and checks that, once a transport is ready and a
 * picker selects it, the stream is created with the waitForReady setting from the service config.
 */
@Test
public void startCallBeforeNameResolution() throws Exception {
FakeNameResolverFactory nameResolverFactory =
new FakeNameResolverFactory.Builder(expectedUri)
.setServers(ImmutableList.of(addressGroup)).build();
channelBuilder.nameResolverFactory(nameResolverFactory);
channel = new ManagedChannelImpl(
channelBuilder, mockTransportFactory, new FakeBackoffPolicyProvider(),
balancerRpcExecutorPool, timer.getStopwatchSupplier(),
Collections.<ClientInterceptor>emptyList(), timer.getTimeProvider());
// Service config enabling waitForReady for every method of the service named "service".
Map<String, Object> rawServiceConfig =
parseConfig("{\"methodConfig\":[{"
+ "\"name\":[{\"service\":\"service\"}],"
+ "\"waitForReady\":true}]}");
ManagedChannelServiceConfig managedChannelServiceConfig =
createManagedChannelServiceConfig(rawServiceConfig, null);
nameResolverFactory.nextConfigOrError.set(
ConfigOrError.fromConfig(managedChannelServiceConfig));
Metadata headers = new Metadata();
ClientStream mockStream = mock(ClientStream.class);
ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
call.start(mockCallListener, headers);
// By now the load balancer must have been created exactly once; grab the Helper it was given.
ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(null);
verify(mockLoadBalancerProvider).newLoadBalancer(helperCaptor.capture());
helper = helperCaptor.getValue();
// Make the transport available
Subchannel subchannel =
createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
// Creating the subchannel alone must not create a transport...
verify(mockTransportFactory, never())
.newClientTransport(
any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class));
// ...requesting a connection does.
requestConnectionSafely(helper, subchannel);
verify(mockTransportFactory)
.newClientTransport(
any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class));
MockClientTransportInfo transportInfo = transports.poll();
ConnectionClientTransport mockTransport = transportInfo.transport;
ManagedClientTransport.Listener transportListener = transportInfo.listener;
when(mockTransport.newStream(same(method), same(headers), any(CallOptions.class)))
.thenReturn(mockStream);
transportListener.transportReady();
when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
.thenReturn(PickResult.withSubchannel(subchannel));
updateBalancingStateSafely(helper, READY, mockPicker);
executor.runDueTasks();
// Local captor; it shadows the @Captor field of the same name.
ArgumentCaptor<CallOptions> callOptionsCaptor = ArgumentCaptor.forClass(null);
verify(mockTransport).newStream(same(method), same(headers), callOptionsCaptor.capture());
// The stream must have been created with waitForReady taken from the service config.
assertThat(callOptionsCaptor.getValue().isWaitForReady()).isTrue();
verify(mockStream).start(streamListenerCaptor.capture());
// Clean up as much as possible to allow the channel to terminate.
shutdownSafely(helper, subchannel);
timer.forwardNanos(
TimeUnit.SECONDS.toNanos(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS));
}
/**
 * Installs an InternalConfigSelector through the resolver attributes and checks that the
 * interceptor it returns is applied to a new call: the CallOptions seen by the transport carry
 * an option copied from the call's headers.
 */
@Test
public void newCallWithConfigSelector() {
FakeNameResolverFactory nameResolverFactory =
new FakeNameResolverFactory.Builder(expectedUri)
.setServers(ImmutableList.of(addressGroup)).build();
channelBuilder.nameResolverFactory(nameResolverFactory);
// Constructed directly rather than through createChannel().
channel = new ManagedChannelImpl(
channelBuilder, mockTransportFactory, new FakeBackoffPolicyProvider(),
balancerRpcExecutorPool, timer.getStopwatchSupplier(),
Collections.<ClientInterceptor>emptyList(), timer.getTimeProvider());
nameResolverFactory.nextConfigOrError.set(
ConfigOrError.fromConfig(ManagedChannelServiceConfig.empty()));
final Metadata.Key<String> metadataKey =
Metadata.Key.of("test", Metadata.ASCII_STRING_MARSHALLER);
final CallOptions.Key<String> callOptionsKey = CallOptions.Key.create("test");
InternalConfigSelector configSelector = new InternalConfigSelector() {
@Override
public Result selectConfig(final PickSubchannelArgs args) {
return Result.newBuilder()
.setConfig(ManagedChannelServiceConfig.empty())
.setInterceptor(
// An interceptor that mutates CallOptions based on headers value.
new ClientInterceptor() {
// Read from the headers when the interceptor is created, i.e. inside selectConfig().
String value = args.getHeaders().get(metadataKey);
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
callOptions = callOptions.withOption(callOptionsKey, value);
return next.newCall(method, callOptions);
}
})
.build();
}
};
// Hand the selector to the channel through the attributes of the next resolution result.
nameResolverFactory.nextAttributes.set(
Attributes.newBuilder().set(InternalConfigSelector.KEY, configSelector).build());
// NOTE(review): getState(true) is presumably used here to make the channel start connecting
// (and thus resolving) before the call is created -- confirm against ManagedChannel#getState.
channel.getState(true);
Metadata headers = new Metadata();
headers.put(metadataKey, "fooValue");
ClientStream mockStream = mock(ClientStream.class);
ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
call.start(mockCallListener, headers);
// The load balancer must have been created exactly once; grab the Helper it was given.
ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(null);
verify(mockLoadBalancerProvider).newLoadBalancer(helperCaptor.capture());
helper = helperCaptor.getValue();
// Make the transport available
Subchannel subchannel =
createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
requestConnectionSafely(helper, subchannel);
verify(mockTransportFactory)
.newClientTransport(
any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class));
MockClientTransportInfo transportInfo = transports.poll();
ConnectionClientTransport mockTransport = transportInfo.transport;
ManagedClientTransport.Listener transportListener = transportInfo.listener;
when(mockTransport.newStream(same(method), same(headers), any(CallOptions.class)))
.thenReturn(mockStream);
transportListener.transportReady();
when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
.thenReturn(PickResult.withSubchannel(subchannel));
updateBalancingStateSafely(helper, READY, mockPicker);
executor.runDueTasks();
// Local captor; it shadows the @Captor field of the same name.
ArgumentCaptor<CallOptions> callOptionsCaptor = ArgumentCaptor.forClass(null);
verify(mockTransport).newStream(same(method), same(headers), callOptionsCaptor.capture());
// The option set by the selector's interceptor must carry the header value.
assertThat(callOptionsCaptor.getValue().getOption(callOptionsKey)).isEqualTo("fooValue");
verify(mockStream).start(streamListenerCaptor.capture());
// Clean up as much as possible to allow the channel to terminate.
shutdownSafely(helper, subchannel);
timer.forwardNanos(
TimeUnit.SECONDS.toNanos(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS));
}
/**
 * A channel that never created a transport terminates as soon as it is shut down and only then
 * returns its executor to the pool.
 */
@Test
public void shutdownWithNoTransportsEverCreated() {
  List<EquivalentAddressGroup> servers =
      Collections.singletonList(new EquivalentAddressGroup(socketAddress));
  FakeNameResolverFactory resolverFactory =
      new FakeNameResolverFactory.Builder(expectedUri).setServers(servers).build();
  channelBuilder.nameResolverFactory(resolverFactory);
  createChannel();

  // The executor has been borrowed but not yet given back.
  verify(executorPool).getObject();
  verify(executorPool, never()).returnObject(any());

  channel.shutdown();

  assertTrue(channel.isShutdown());
  assertTrue(channel.isTerminated());
  verify(executorPool).returnObject(executor.getScheduledExecutorService());
}
/**
 * A call that is pending because name resolution has not happened survives {@code shutdown()}
 * but is closed with CANCELLED by {@code shutdownNow()}.
 */
@Test
public void shutdownNow_pendingCallShouldFail() {
  List<EquivalentAddressGroup> servers =
      Collections.singletonList(new EquivalentAddressGroup(socketAddress));
  FakeNameResolverFactory resolverFactory =
      new FakeNameResolverFactory.Builder(expectedUri)
          .setResolvedAtStart(false)
          .setServers(servers)
          .build();
  channelBuilder.nameResolverFactory(resolverFactory);
  createChannel();

  ClientCall<String, Integer> pendingCall = channel.newCall(method, CallOptions.DEFAULT);
  pendingCall.start(mockCallListener, new Metadata());

  // A graceful shutdown leaves the pending call alone.
  channel.shutdown();
  executor.runDueTasks();
  verify(mockCallListener, never()).onClose(any(Status.class), any(Metadata.class));

  // A forceful shutdown closes it.
  channel.shutdownNow();
  executor.runDueTasks();
  verify(mockCallListener).onClose(statusCaptor.capture(), any(Metadata.class));
  Status closeStatus = statusCaptor.getValue();
  assertThat(closeStatus.getCode()).isEqualTo(Code.CANCELLED);
}
/**
 * A call created after {@code shutdown()}, on a channel whose resolver never produced a result,
 * is closed with UNAVAILABLE.
 */
@Test
public void shutdownWithNoNameResolution_newCallShouldFail() {
  List<EquivalentAddressGroup> servers =
      Collections.singletonList(new EquivalentAddressGroup(socketAddress));
  FakeNameResolverFactory resolverFactory =
      new FakeNameResolverFactory.Builder(expectedUri)
          .setResolvedAtStart(false)
          .setServers(servers)
          .build();
  channelBuilder.nameResolverFactory(resolverFactory);
  createChannel();
  channel.shutdown();

  ClientCall<String, Integer> lateCall = channel.newCall(method, CallOptions.DEFAULT);
  lateCall.start(mockCallListener, new Metadata());
  executor.runDueTasks();

  verify(mockCallListener).onClose(statusCaptor.capture(), any(Metadata.class));
  Status closeStatus = statusCaptor.getValue();
  assertThat(closeStatus.getCode()).isEqualTo(Code.UNAVAILABLE);
}
/**
 * While alive, the channel is registered in channelz as a root channel and never as a
 * subchannel; after it terminates, the root entry is gone.
 */
@Test
public void channelzMembership() throws Exception {
  createChannel();
  assertNotNull(channelz.getRootChannel(channel.getLogId().getId()));
  assertFalse(channelz.containsSubchannel(channel.getLogId()));

  channel.shutdownNow();
  // Check the result instead of discarding it: if termination timed out, fail here with a
  // clear message rather than in the channelz assertions below.
  assertTrue(
      "channel should terminate after shutdownNow()",
      channel.awaitTermination(5, TimeUnit.SECONDS));

  assertNull(channelz.getRootChannel(channel.getLogId().getId()));
  assertFalse(channelz.containsSubchannel(channel.getLogId()));
}
/**
 * Follows a subchannel and its transport through channelz: the subchannel is listed as a
 * subchannel (not a root channel) and in the channel's stats, its transport is listed as a client
 * socket until the transport terminates, and the subchannel entry disappears after the subchannel
 * is shut down and the shutdown delay has elapsed. The parent channel stays registered throughout.
 */
@Test
public void channelzMembership_subchannel() throws Exception {
createChannel();
assertNotNull(channelz.getRootChannel(channel.getLogId().getId()));
AbstractSubchannel subchannel =
(AbstractSubchannel) createSubchannelSafely(
helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
// subchannels are not root channels
assertNull(
channelz.getRootChannel(subchannel.getInstrumentedInternalSubchannel().getLogId().getId()));
assertTrue(
channelz.containsSubchannel(subchannel.getInstrumentedInternalSubchannel().getLogId()));
assertThat(getStats(channel).subchannels)
.containsExactly(subchannel.getInstrumentedInternalSubchannel());
// Connecting creates a transport, which must show up as a client socket.
requestConnectionSafely(helper, subchannel);
MockClientTransportInfo transportInfo = transports.poll();
assertNotNull(transportInfo);
assertTrue(channelz.containsClientSocket(transportInfo.transport.getLogId()));
transportInfo.listener.transportReady();
// terminate transport
transportInfo.listener.transportShutdown(Status.CANCELLED);
transportInfo.listener.transportTerminated();
assertFalse(channelz.containsClientSocket(transportInfo.transport.getLogId()));
// terminate subchannel
assertTrue(
channelz.containsSubchannel(subchannel.getInstrumentedInternalSubchannel().getLogId()));
shutdownSafely(helper, subchannel);
// Advance the fake timer by the subchannel shutdown delay before checking for removal.
timer.forwardTime(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
timer.runDueTasks();
assertFalse(
channelz.containsSubchannel(subchannel.getInstrumentedInternalSubchannel().getLogId()));
assertThat(getStats(channel).subchannels).isEmpty();
// channel still appears
assertNotNull(channelz.getRootChannel(channel.getLogId().getId()));
}
/**
 * Follows an out-of-band channel through channelz: the OOB channel and its internal subchannel
 * are listed as subchannels (not root channels), its transport is listed as a client socket until
 * the transport terminates, and both subchannel entries disappear once the OOB channel is shut
 * down. The parent channel stays registered throughout.
 */
@Test
public void channelzMembership_oob() throws Exception {
  createChannel();
  OobChannel oob = (OobChannel) helper.createOobChannel(
      Collections.singletonList(addressGroup), AUTHORITY);
  // oob channels are not root channels
  assertNull(channelz.getRootChannel(oob.getLogId().getId()));
  assertTrue(channelz.containsSubchannel(oob.getLogId()));
  assertThat(getStats(channel).subchannels).containsExactly(oob);

  // The OOB channel's own subchannel is registered too, and reported in the OOB channel's stats.
  AbstractSubchannel subchannel = (AbstractSubchannel) oob.getSubchannel();
  assertTrue(
      channelz.containsSubchannel(subchannel.getInstrumentedInternalSubchannel().getLogId()));
  assertThat(getStats(oob).subchannels)
      .containsExactly(subchannel.getInstrumentedInternalSubchannel());

  oob.getSubchannel().requestConnection();
  MockClientTransportInfo transportInfo = transports.poll();
  assertNotNull(transportInfo);
  assertTrue(channelz.containsClientSocket(transportInfo.transport.getLogId()));

  // terminate transport
  transportInfo.listener.transportShutdown(Status.INTERNAL);
  transportInfo.listener.transportTerminated();
  assertFalse(channelz.containsClientSocket(transportInfo.transport.getLogId()));

  // terminate oobchannel
  oob.shutdown();
  assertFalse(channelz.containsSubchannel(oob.getLogId()));
  assertThat(getStats(channel).subchannels).isEmpty();
  assertFalse(
      channelz.containsSubchannel(subchannel.getInstrumentedInternalSubchannel().getLogId()));

  // channel still appears
  assertNotNull(channelz.getRootChannel(channel.getLogId().getId()));
}
/** Runs the shared calls-and-shutdown scenario with a graceful {@code shutdown()} only. */
@Test
public void callsAndShutdown() {
  final boolean shutdownNow = false;
  final boolean shutdownNowAfterShutdown = false;
  subtestCallsAndShutdown(shutdownNow, shutdownNowAfterShutdown);
}
/** Runs the shared calls-and-shutdown scenario with an immediate {@code shutdownNow()}. */
@Test
public void callsAndShutdownNow() {
  final boolean shutdownNow = true;
  final boolean shutdownNowAfterShutdown = false;
  subtestCallsAndShutdown(shutdownNow, shutdownNowAfterShutdown);
}
/**
 * Runs the shared calls-and-shutdown scenario with {@code shutdown()} followed by
 * {@code shutdownNow()}, making sure the second call still has an effect.
 */
@Test
public void callsAndShutdownAndShutdownNow() {
  final boolean shutdownNow = false;
  final boolean shutdownNowAfterShutdown = true;
  subtestCallsAndShutdown(shutdownNow, shutdownNowAfterShutdown);
}
private void subtestCallsAndShutdown(boolean shutdownNow, boolean shutdownNowAfterShutdown) {
FakeNameResolverFactory nameResolverFactory =
new FakeNameResolverFactory.Builder(expectedUri).build();
channelBuilder.nameResolverFactory(nameResolverFactory);
createChannel();
verify(executorPool).getObject();
ClientStream mockStream = mock(ClientStream.class);
ClientStream mockStream2 = mock(ClientStream.class);
Metadata headers = new Metadata();
Metadata headers2 = new Metadata();
// Configure the picker so that first RPC goes to delayed transport, and second RPC goes to
// real transport.
Subchannel subchannel =
createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
requestConnectionSafely(helper, subchannel);
verify(mockTransportFactory)
.newClientTransport(
any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class));
MockClientTransportInfo transportInfo = transports.poll();
ConnectionClientTransport mockTransport = transportInfo.transport;
verify(mockTransport).start(any(ManagedClientTransport.Listener.class));
ManagedClientTransport.Listener transportListener = transportInfo.listener;
when(mockTransport.newStream(same(method), same(headers), same(CallOptions.DEFAULT)))
.thenReturn(mockStream);
when(mockTransport.newStream(same(method), same(headers2), same(CallOptions.DEFAULT)))
.thenReturn(mockStream2);
transportListener.transportReady();
when(mockPicker.pickSubchannel(
new PickSubchannelArgsImpl(method, headers, CallOptions.DEFAULT))).thenReturn(
PickResult.withNoResult());
when(mockPicker.pickSubchannel(
new PickSubchannelArgsImpl(method, headers2, CallOptions.DEFAULT))).thenReturn(
PickResult.withSubchannel(subchannel));
updateBalancingStateSafely(helper, READY, mockPicker);
// First RPC, will be pending
ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
verify(mockTransportFactory)
.newClientTransport(
any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class));
call.start(mockCallListener, headers);
verify(mockTransport, never())
.newStream(same(method), same(headers), same(CallOptions.DEFAULT));
// Second RPC, will be assigned to the real transport
ClientCall<String, Integer> call2 = channel.newCall(method, CallOptions.DEFAULT);
call2.start(mockCallListener2, headers2);
verify(mockTransport).newStream(same(method), same(headers2), same(CallOptions.DEFAULT));
verify(mockTransport).newStream(same(method), same(headers2), same(CallOptions.DEFAULT));
verify(mockStream2).start(any(ClientStreamListener.class));
// Shutdown
if (shutdownNow) {
channel.shutdownNow();
} else {
channel.shutdown();
if (shutdownNowAfterShutdown) {
channel.shutdownNow();
shutdownNow = true;
}
}
assertTrue(channel.isShutdown());
assertFalse(channel.isTerminated());
assertThat(nameResolverFactory.resolvers).hasSize(1);
verify(mockLoadBalancerProvider).newLoadBalancer(any(Helper.class));
// Further calls should fail without going to the transport
ClientCall<String, Integer> call3 = channel.newCall(method, CallOptions.DEFAULT);
call3.start(mockCallListener3, headers2);
timer.runDueTasks();
executor.runDueTasks();
verify(mockCallListener3).onClose(statusCaptor.capture(), any(Metadata.class));
assertSame(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode());
if (shutdownNow) {
// LoadBalancer and NameResolver are shut down as soon as delayed transport is terminated.
verify(mockLoadBalancer).shutdown();
assertTrue(nameResolverFactory.resolvers.get(0).shutdown);
// call should have been aborted by delayed transport
executor.runDueTasks();
verify(mockCallListener).onClose(same(ManagedChannelImpl.SHUTDOWN_NOW_STATUS),
any(Metadata.class));
} else {
// LoadBalancer and NameResolver are still running.
verify(mockLoadBalancer, never()).shutdown();
assertFalse(nameResolverFactory.resolvers.get(0).shutdown);
// call and call2 are still alive, and can still be assigned to a real transport
SubchannelPicker picker2 = mock(SubchannelPicker.class);
when(picker2.pickSubchannel(new PickSubchannelArgsImpl(method, headers, CallOptions.DEFAULT)))
.thenReturn(PickResult.withSubchannel(subchannel));
updateBalancingStateSafely(helper, READY, picker2);
executor.runDueTasks();
verify(mockTransport).newStream(same(method), same(headers), same(CallOptions.DEFAULT));
verify(mockStream).start(any(ClientStreamListener.class));
}
// After call is moved out of delayed transport, LoadBalancer, NameResolver and the transports
// will be shutdown.
verify(mockLoadBalancer).shutdown();
assertTrue(nameResolverFactory.resolvers.get(0).shutdown);
if (shutdownNow) {
// Channel shutdownNow() all subchannels after shutting down LoadBalancer
verify(mockTransport).shutdownNow(ManagedChannelImpl.SHUTDOWN_NOW_STATUS);
} else {
verify(mockTransport, never()).shutdownNow(any(Status.class));
}
// LoadBalancer should shutdown the subchannel
shutdownSafely(helper, subchannel);
if (shutdownNow) {
verify(mockTransport).shutdown(same(ManagedChannelImpl.SHUTDOWN_NOW_STATUS));
} else {
verify(mockTransport).shutdown(same(ManagedChannelImpl.SHUTDOWN_STATUS));
}
// Killing the remaining real transport will terminate the channel
transportListener.transportShutdown(Status.UNAVAILABLE);
assertFalse(channel.isTerminated());
verify(executorPool, never()).returnObject(any());
transportListener.transportTerminated();
assertTrue(channel.isTerminated());
verify(executorPool).returnObject(executor.getScheduledExecutorService());
verifyNoMoreInteractions(balancerRpcExecutorPool);
verify(mockTransportFactory)
.newClientTransport(
any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class));
verify(mockTransportFactory).close();
verify(mockTransport, atLeast(0)).getLogId();
verifyNoMoreInteractions(mockTransport);
}
/**
* Verifies that after the channel is shut down (which shuts down the LoadBalancer), the
* LoadBalancer receives no further callbacks, and the subchannel state listeners receive only a
* final SHUTDOWN state once their subchannels are shut down.
*/
@Test
public void noMoreCallbackAfterLoadBalancerShutdown() {
FakeNameResolverFactory nameResolverFactory =
new FakeNameResolverFactory.Builder(expectedUri)
.setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
.build();
channelBuilder.nameResolverFactory(nameResolverFactory);
Status resolutionError = Status.UNAVAILABLE.withDescription("Resolution failed");
createChannel();
FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.get(0);
// The balancer is created and handed the resolved address group
verify(mockLoadBalancerProvider).newLoadBalancer(any(Helper.class));
verify(mockLoadBalancer).handleResolvedAddresses(resolvedAddressCaptor.capture());
assertThat(resolvedAddressCaptor.getValue().getAddresses()).containsExactly(addressGroup);
// Two subchannels on the same address group, each with its own state listener
SubchannelStateListener stateListener1 = mock(SubchannelStateListener.class);
SubchannelStateListener stateListener2 = mock(SubchannelStateListener.class);
Subchannel subchannel1 =
createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, stateListener1);
Subchannel subchannel2 =
createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, stateListener2);
requestConnectionSafely(helper, subchannel1);
requestConnectionSafely(helper, subchannel2);
// One transport is created per subchannel
verify(mockTransportFactory, times(2))
.newClientTransport(
any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class));
MockClientTransportInfo transportInfo1 = transports.poll();
MockClientTransportInfo transportInfo2 = transports.poll();
// LoadBalancer receives all sorts of callbacks
transportInfo1.listener.transportReady();
// The first listener has seen CONNECTING and then READY
verify(stateListener1, times(2)).onSubchannelState(stateInfoCaptor.capture());
assertSame(CONNECTING, stateInfoCaptor.getAllValues().get(0).getState());
assertSame(READY, stateInfoCaptor.getAllValues().get(1).getState());
// The second listener has seen only CONNECTING; its transport has not been made ready
verify(stateListener2).onSubchannelState(stateInfoCaptor.capture());
assertSame(CONNECTING, stateInfoCaptor.getValue().getState());
// While the balancer is alive, a resolution error is forwarded to it
resolver.listener.onError(resolutionError);
verify(mockLoadBalancer).handleNameResolutionError(resolutionError);
verifyNoMoreInteractions(mockLoadBalancer);
channel.shutdown();
verify(mockLoadBalancer).shutdown();
// Shutting down the channel by itself delivers nothing new to the subchannel listeners
verifyNoMoreInteractions(stateListener1, stateListener2);
// LoadBalancer will normally shutdown all subchannels
shutdownSafely(helper, subchannel1);
shutdownSafely(helper, subchannel2);
// Since subchannels are shutdown, SubchannelStateListeners will only get SHUTDOWN regardless of
// the transport states.
transportInfo1.listener.transportShutdown(Status.UNAVAILABLE);
transportInfo2.listener.transportReady();
verify(stateListener1).onSubchannelState(ConnectivityStateInfo.forNonError(SHUTDOWN));
verify(stateListener2).onSubchannelState(ConnectivityStateInfo.forNonError(SHUTDOWN));
verifyNoMoreInteractions(stateListener1, stateListener2);
// No more callback should be delivered to LoadBalancer after it's shut down
resolver.listener.onError(resolutionError);
resolver.resolved();
verifyNoMoreInteractions(mockLoadBalancer);
}
/** Verifies that an interceptor supplied to the channel is invoked when a new call is created. */
@Test
public void interceptor() throws Exception {
  // Set to 1 by the interceptor below; remains 0 if the interceptor is never consulted.
  final AtomicLong interceptFlag = new AtomicLong();
  ClientInterceptor recordingInterceptor = new ClientInterceptor() {
    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
        MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions options, Channel delegate) {
      interceptFlag.set(1);
      return delegate.newCall(methodDescriptor, options);
    }
  };
  createChannel(recordingInterceptor);

  ClientCall<String, Integer> interceptedCall = channel.newCall(method, CallOptions.DEFAULT);
  assertNotNull(interceptedCall);
  assertEquals(1, interceptFlag.get());
}
/**
* Verifies that when a call carries its own executor through CallOptions.withExecutor, the
* buffered stream is created on that executor and the call listener callbacks are delivered on it
* as well.
*/
@Test
public void callOptionsExecutor() {
Metadata headers = new Metadata();
ClientStream mockStream = mock(ClientStream.class);
// Stands in for the per-call executor; tasks only run when runDueTasks() is invoked below
FakeClock callExecutor = new FakeClock();
createChannel();
// Start a call with a call executor
CallOptions options =
CallOptions.DEFAULT.withExecutor(callExecutor.getScheduledExecutorService());
ClientCall<String, Integer> call = channel.newCall(method, options);
call.start(mockCallListener, headers);
// Make the transport available
Subchannel subchannel =
createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
// Creating the subchannel alone opens no transport; requesting a connection does
verify(mockTransportFactory, never())
.newClientTransport(
any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class));
requestConnectionSafely(helper, subchannel);
verify(mockTransportFactory)
.newClientTransport(
any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class));
MockClientTransportInfo transportInfo = transports.poll();
ConnectionClientTransport mockTransport = transportInfo.transport;
ManagedClientTransport.Listener transportListener = transportInfo.listener;
// Stubbed for any CallOptions; the exact options instance is checked by the verify below
when(mockTransport.newStream(same(method), same(headers), any(CallOptions.class)))
.thenReturn(mockStream);
transportListener.transportReady();
when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
.thenReturn(PickResult.withSubchannel(subchannel));
// Nothing has been handed to the call executor before the picker is published
assertEquals(0, callExecutor.numPendingTasks());
updateBalancingStateSafely(helper, READY, mockPicker);
// Real streams are started in the call executor if they were previously buffered.
assertEquals(1, callExecutor.runDueTasks());
verify(mockTransport).newStream(same(method), same(headers), same(options));
verify(mockStream).start(streamListenerCaptor.capture());
// Call listener callbacks are also run in the call executor
ClientStreamListener streamListener = streamListenerCaptor.getValue();
Metadata trailers = new Metadata();
assertEquals(0, callExecutor.numPendingTasks());
streamListener.closed(Status.CANCELLED, trailers);
// onClose is not delivered until the call executor runs its queued task
verify(mockCallListener, never()).onClose(same(Status.CANCELLED), same(trailers));
assertEquals(1, callExecutor.runDueTasks());
verify(mockCallListener).onClose(same(Status.CANCELLED), same(trailers));
transportListener.transportShutdown(Status.UNAVAILABLE);
transportListener.transportTerminated();
// Clean up as much as possible to allow the channel to terminate.
shutdownSafely(helper, subchannel);
timer.forwardNanos(
TimeUnit.SECONDS.toNanos(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS));
}
/**
* Verifies the name-resolution retry behaviour: a failure is reported to the LoadBalancer and
* schedules a refresh after the backoff interval, further failures do not schedule a second
* timer, the second backoff is twice as long, and a successful resolution resets the backoff.
*/
@Test
public void nameResolutionFailed() {
Status error = Status.UNAVAILABLE.withCause(new Throwable("fake name resolution error"));
FakeNameResolverFactory nameResolverFactory =
new FakeNameResolverFactory.Builder(expectedUri)
.setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
.setError(error)
.build();
channelBuilder.nameResolverFactory(nameResolverFactory);
// Name resolution is started as soon as channel is created.
createChannel();
FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.get(0);
verify(mockLoadBalancer).handleNameResolutionError(same(error));
// The initial failure schedules exactly one refresh task
assertEquals(1, timer.numPendingTasks(NAME_RESOLVER_REFRESH_TASK_FILTER));
// One nanosecond short of the backoff interval: no refresh yet
timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS - 1);
assertEquals(0, resolver.refreshCalled);
// Reaching the backoff interval triggers the refresh, which fails again
timer.forwardNanos(1);
assertEquals(1, resolver.refreshCalled);
verify(mockLoadBalancer, times(2)).handleNameResolutionError(same(error));
// Verify an additional name resolution failure does not schedule another timer
resolver.refresh();
verify(mockLoadBalancer, times(3)).handleNameResolutionError(same(error));
assertEquals(1, timer.numPendingTasks(NAME_RESOLVER_REFRESH_TASK_FILTER));
// Allow the next refresh attempt to succeed
resolver.error = null;
// For the second attempt, the backoff should occur at RECONNECT_BACKOFF_INTERVAL_NANOS * 2
timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS * 2 - 1);
// Presumably the count of 2 includes the explicit resolver.refresh() call above
assertEquals(2, resolver.refreshCalled);
timer.forwardNanos(1);
assertEquals(3, resolver.refreshCalled);
// The refresh succeeded, so nothing is left scheduled
assertEquals(0, timer.numPendingTasks());
// Verify that the successful resolution reset the backoff policy
resolver.listener.onError(error);
timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS - 1);
assertEquals(3, resolver.refreshCalled);
timer.forwardNanos(1);
assertEquals(4, resolver.refreshCalled);
assertEquals(0, timer.numPendingTasks());
}
/**
* Verifies that a pending name-resolver backoff task survives channel.shutdown() while a call is
* still pending, and is cancelled once that pending call has been closed.
*/
@Test
public void nameResolutionFailed_delayedTransportShutdownCancelsBackoff() {
Status error = Status.UNAVAILABLE.withCause(new Throwable("fake name resolution error"));
FakeNameResolverFactory nameResolverFactory =
new FakeNameResolverFactory.Builder(expectedUri).setError(error).build();
channelBuilder.nameResolverFactory(nameResolverFactory);
// Name resolution is started as soon as channel is created.
createChannel();
verify(mockLoadBalancer).handleNameResolutionError(same(error));
// The failed resolution has scheduled a refresh task that is still live
FakeClock.ScheduledTask nameResolverBackoff = getNameResolverRefresh();
assertNotNull(nameResolverBackoff);
assertFalse(nameResolverBackoff.isCancelled());
// Add a pending call to the delayed transport
ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
Metadata headers = new Metadata();
call.start(mockCallListener, headers);
// The pending call on the delayed transport keeps the name resolver backoff from being
// cancelled at shutdown
channel.shutdown();
assertFalse(nameResolverBackoff.isCancelled());
// Publish a picker that drops every RPC; this closes the pending call with the drop status,
// which drains the delayed transport
SubchannelPicker picker = mock(SubchannelPicker.class);
Status status = Status.UNAVAILABLE.withDescription("for test");
when(picker.pickSubchannel(any(PickSubchannelArgs.class)))
.thenReturn(PickResult.withDrop(status));
updateBalancingStateSafely(helper, READY, picker);
executor.runDueTasks();
verify(mockCallListener).onClose(same(status), any(Metadata.class));
// With no pending call left, the backoff task has been cancelled
assertTrue(nameResolverBackoff.isCancelled());
}
/**
 * Verifies that, by default, an empty address list from the name resolver is reported to the
 * LoadBalancer as an UNAVAILABLE name-resolution error and a resolution retry is scheduled.
 */
@Test
public void nameResolverReturnsEmptySubLists_becomeErrorByDefault() throws Exception {
  String expectedDescriptionPrefix = "NameResolver returned no usable address";

  // Resolver factory with an empty address list, plus a service config naming the mock LB policy
  FakeNameResolverFactory emptyResolverFactory =
      new FakeNameResolverFactory.Builder(expectedUri).build();
  Map<String, Object> rawConfig =
      parseConfig("{\"loadBalancingConfig\": [ {\"mock_lb\": { \"setting1\": \"high\" } } ] }");
  ManagedChannelServiceConfig serviceConfig = createManagedChannelServiceConfig(rawConfig, null);
  emptyResolverFactory.nextConfigOrError.set(ConfigOrError.fromConfig(serviceConfig));
  channelBuilder.nameResolverFactory(emptyResolverFactory);

  createChannel();

  // The balancer is created and is told about the error instead of receiving addresses
  verify(mockLoadBalancerProvider).newLoadBalancer(any(Helper.class));
  verify(mockLoadBalancer).handleNameResolutionError(statusCaptor.capture());
  Status reported = statusCaptor.getValue();
  assertSame(Status.Code.UNAVAILABLE, reported.getCode());
  assertThat(reported.getDescription()).startsWith(expectedDescriptionPrefix);

  // Exactly one resolution retry is pending
  assertEquals(1, timer.numPendingTasks(NAME_RESOLVER_REFRESH_TASK_FILTER));
}
/**
 * Verifies that when the LoadBalancer declares it can handle an empty address list, the empty
 * list and the parsed LB config are delivered to it and no resolution retry is scheduled.
 */
@Test
public void nameResolverReturnsEmptySubLists_optionallyAllowed() throws Exception {
  when(mockLoadBalancer.canHandleEmptyAddressListFromNameResolution()).thenReturn(true);

  // Resolver factory with an empty address list, plus a service config carrying an LB config
  FakeNameResolverFactory emptyResolverFactory =
      new FakeNameResolverFactory.Builder(expectedUri).build();
  String rawLbConfig = "{ \"setting1\": \"high\" }";
  Object expectedLbConfig = new Object();
  Map<String, Object> rawConfig =
      parseConfig("{\"loadBalancingConfig\": [ {\"mock_lb\": " + rawLbConfig + " } ] }");
  PolicySelection policySelection =
      new PolicySelection(mockLoadBalancerProvider, expectedLbConfig);
  ManagedChannelServiceConfig serviceConfig =
      createManagedChannelServiceConfig(rawConfig, policySelection);
  emptyResolverFactory.nextConfigOrError.set(ConfigOrError.fromConfig(serviceConfig));
  channelBuilder.nameResolverFactory(emptyResolverFactory);

  createChannel();

  // The balancer is handed the empty address list together with the LB config
  verify(mockLoadBalancerProvider).newLoadBalancer(any(Helper.class));
  ArgumentCaptor<ResolvedAddresses> addressesCaptor =
      ArgumentCaptor.forClass(ResolvedAddresses.class);
  verify(mockLoadBalancer).handleResolvedAddresses(addressesCaptor.capture());
  ResolvedAddresses delivered = addressesCaptor.getValue();
  assertThat(delivered.getAddresses()).isEmpty();
  assertThat(delivered.getLoadBalancingPolicyConfig()).isEqualTo(expectedLbConfig);

  // No resolution retry is scheduled
  assertEquals(0, timer.numPendingTasks(NAME_RESOLVER_REFRESH_TASK_FILTER));
}
/**
 * Verifies that an exception thrown by the LoadBalancer from handleResolvedAddresses puts the
 * channel into panic mode.
 */
@Test
public void loadBalancerThrowsInHandleResolvedAddresses() {
  RuntimeException simulatedFailure = new RuntimeException("simulated");

  // Resolution is held back until allResolved() is called below
  List<EquivalentAddressGroup> servers =
      Collections.singletonList(new EquivalentAddressGroup(socketAddress));
  FakeNameResolverFactory deferredResolverFactory =
      new FakeNameResolverFactory.Builder(expectedUri)
          .setResolvedAtStart(false)
          .setServers(servers)
          .build();
  channelBuilder.nameResolverFactory(deferredResolverFactory);
  createChannel();

  verify(mockLoadBalancerProvider).newLoadBalancer(any(Helper.class));
  doThrow(simulatedFailure)
      .when(mockLoadBalancer)
      .handleResolvedAddresses(any(ResolvedAddresses.class));

  // The resolver now delivers addresses, which makes the balancer throw
  deferredResolverFactory.allResolved();

  // Exception thrown from balancer is caught by ChannelExecutor, making channel enter panic mode.
  verifyPanicMode(simulatedFailure);
}
/**
 * Verifies that a name-resolution result arriving after the channel has been shut down is not
 * forwarded to the LoadBalancer.
 */
@Test
public void nameResolvedAfterChannelShutdown() {
  // Resolution is held back until allResolved() is called below
  FakeNameResolverFactory.Builder factoryBuilder =
      new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false);
  FakeNameResolverFactory deferredResolverFactory = factoryBuilder.build();
  channelBuilder.nameResolverFactory(deferredResolverFactory);
  createChannel();

  channel.shutdown();

  assertTrue(channel.isShutdown());
  assertTrue(channel.isTerminated());
  verify(mockLoadBalancer).shutdown();

  // Name resolved after the channel is shut down, which is possible if the name resolution takes
  // time and is not cancellable. The resolved address will be dropped.
  deferredResolverFactory.allResolved();

  verifyNoMoreInteractions(mockLoadBalancer);
}
/**
* Verifies that if the first resolved address points to a server that cannot be connected to,
* the subchannel moves on to the second address, and the buffered call ends up on the transport
* for that second, working address.
*/
@Test
public void firstResolvedServerFailedToConnect() throws Exception {
final SocketAddress goodAddress = new SocketAddress() {
@Override public String toString() {
return "goodAddress";
}
};
final SocketAddress badAddress = new SocketAddress() {
@Override public String toString() {
return "badAddress";
}
};
InOrder inOrder = inOrder(mockLoadBalancer, subchannelStateListener);
// A single address group listing the bad address first
List<SocketAddress> resolvedAddrs = Arrays.asList(badAddress, goodAddress);
FakeNameResolverFactory nameResolverFactory =
new FakeNameResolverFactory.Builder(expectedUri)
.setServers(Collections.singletonList(new EquivalentAddressGroup(resolvedAddrs)))
.build();
channelBuilder.nameResolverFactory(nameResolverFactory);
createChannel();
// Start the call
ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
Metadata headers = new Metadata();
call.start(mockCallListener, headers);
executor.runDueTasks();
// Simulate name resolution results
EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(resolvedAddrs);
inOrder.verify(mockLoadBalancer).handleResolvedAddresses(resolvedAddressCaptor.capture());
assertThat(resolvedAddressCaptor.getValue().getAddresses()).containsExactly(addressGroup);
Subchannel subchannel =
createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
.thenReturn(PickResult.withSubchannel(subchannel));
requestConnectionSafely(helper, subchannel);
inOrder.verify(subchannelStateListener).onSubchannelState(stateInfoCaptor.capture());
assertEquals(CONNECTING, stateInfoCaptor.getValue().getState());
// The channel starts with the first address (badAddress)
verify(mockTransportFactory)
.newClientTransport(
same(badAddress), any(ClientTransportOptions.class), any(ChannelLogger.class));
verify(mockTransportFactory, times(0))
.newClientTransport(
same(goodAddress), any(ClientTransportOptions.class), any(ChannelLogger.class));
MockClientTransportInfo badTransportInfo = transports.poll();
// Which failed to connect
badTransportInfo.listener.transportShutdown(Status.UNAVAILABLE);
// The failure of the first address alone is not reported to the balancer or the state listener
inOrder.verifyNoMoreInteractions();
// The channel then tries the second address (goodAddress)
verify(mockTransportFactory)
.newClientTransport(
same(goodAddress), any(ClientTransportOptions.class), any(ChannelLogger.class));
MockClientTransportInfo goodTransportInfo = transports.poll();
when(goodTransportInfo.transport.newStream(
any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)))
.thenReturn(mock(ClientStream.class));
goodTransportInfo.listener.transportReady();
inOrder.verify(subchannelStateListener).onSubchannelState(stateInfoCaptor.capture());
assertEquals(READY, stateInfoCaptor.getValue().getState());
// A typical LoadBalancer will call this once the subchannel becomes READY
updateBalancingStateSafely(helper, READY, mockPicker);
// Delayed transport uses the app executor to create real streams.
executor.runDueTasks();
verify(goodTransportInfo.transport).newStream(same(method), same(headers),
same(CallOptions.DEFAULT));
// The bad transport was never used.
verify(badTransportInfo.transport, times(0)).newStream(any(MethodDescriptor.class),
any(Metadata.class), any(CallOptions.class));
}
/** A non-wait-for-ready RPC is expected to fail when the picker returns an error. */
@Test
public void failFastRpcFailFromErrorFromBalancer() {
  final boolean waitForReady = false;
  final boolean drop = false;
  final boolean shouldFail = true;
  subtestFailRpcFromBalancer(waitForReady, drop, shouldFail);
}
/** A non-wait-for-ready RPC is expected to fail when the picker returns a drop. */
@Test
public void failFastRpcFailFromDropFromBalancer() {
  final boolean waitForReady = false;
  final boolean drop = true;
  final boolean shouldFail = true;
  subtestFailRpcFromBalancer(waitForReady, drop, shouldFail);
}
/** A wait-for-ready RPC is expected not to fail when the picker returns an error. */
@Test
public void waitForReadyRpcImmuneFromErrorFromBalancer() {
  final boolean waitForReady = true;
  final boolean drop = false;
  final boolean shouldFail = false;
  subtestFailRpcFromBalancer(waitForReady, drop, shouldFail);
}
/** A wait-for-ready RPC is expected to fail when the picker returns a drop. */
@Test
public void waitForReadyRpcFailFromDropFromBalancer() {
  final boolean waitForReady = true;
  final boolean drop = true;
  final boolean shouldFail = true;
  subtestFailRpcFromBalancer(waitForReady, drop, shouldFail);
}
/**
 * Shared body for the balancer-failure tests. Starts one RPC that is buffered by the channel and
 * one that is started after the picker is published; the picker returns either a drop or an
 * error, and each RPC's listener is checked for being closed with that status.
 *
 * @param waitForReady whether the RPCs use wait-for-ready call options (otherwise explicitly not)
 * @param drop whether the picker returns a drop result (otherwise an error result)
 * @param shouldFail whether both RPCs are expected to be closed with the picker's status
 */
private void subtestFailRpcFromBalancer(boolean waitForReady, boolean drop, boolean shouldFail) {
  createChannel();

  CallOptions callOptions = waitForReady
      ? CallOptions.DEFAULT.withWaitForReady()
      : CallOptions.DEFAULT.withoutWaitForReady();

  // This call will be buffered by the channel, thus involve delayed transport
  ClientCall<String, Integer> bufferedCall = channel.newCall(method, callOptions);
  bufferedCall.start(mockCallListener, new Metadata());

  Status pickStatus = Status.UNAVAILABLE.withDescription("for test");
  PickResult pickResult;
  if (drop) {
    pickResult = PickResult.withDrop(pickStatus);
  } else {
    pickResult = PickResult.withError(pickStatus);
  }
  SubchannelPicker failingPicker = mock(SubchannelPicker.class);
  when(failingPicker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn(pickResult);
  updateBalancingStateSafely(helper, READY, failingPicker);
  executor.runDueTasks();

  if (!shouldFail) {
    verifyNoInteractions(mockCallListener);
  } else {
    verify(mockCallListener).onClose(same(pickStatus), any(Metadata.class));
  }

  // This call doesn't involve delayed transport
  ClientCall<String, Integer> directCall = channel.newCall(method, callOptions);
  directCall.start(mockCallListener2, new Metadata());
  executor.runDueTasks();

  if (!shouldFail) {
    verifyNoInteractions(mockCallListener2);
  } else {
    verify(mockCallListener2).onClose(same(pickStatus), any(Metadata.class));
  }
}
/**
* Verifies that if all resolved addresses fail to connect, the subchannel reports
* TRANSIENT_FAILURE with the last error, a fail-fast call is closed with that error once the
* balancer publishes an error picker, and a wait-for-ready call is left untouched.
*/
@Test
public void allServersFailedToConnect() throws Exception {
final SocketAddress addr1 = new SocketAddress() {
@Override public String toString() {
return "addr1";
}
};
final SocketAddress addr2 = new SocketAddress() {
@Override public String toString() {
return "addr2";
}
};
InOrder inOrder = inOrder(mockLoadBalancer, subchannelStateListener);
// A single address group containing both addresses
List<SocketAddress> resolvedAddrs = Arrays.asList(addr1, addr2);
FakeNameResolverFactory nameResolverFactory =
new FakeNameResolverFactory.Builder(expectedUri)
.setServers(Collections.singletonList(new EquivalentAddressGroup(resolvedAddrs)))
.build();
channelBuilder.nameResolverFactory(nameResolverFactory);
createChannel();
// Start a wait-for-ready call
ClientCall<String, Integer> call =
channel.newCall(method, CallOptions.DEFAULT.withWaitForReady());
Metadata headers = new Metadata();
call.start(mockCallListener, headers);
// ... and a fail-fast call
ClientCall<String, Integer> call2 =
channel.newCall(method, CallOptions.DEFAULT.withoutWaitForReady());
call2.start(mockCallListener2, headers);
executor.runDueTasks();
// Simulate name resolution results
EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(resolvedAddrs);
inOrder.verify(mockLoadBalancer).handleResolvedAddresses(resolvedAddressCaptor.capture());
assertThat(resolvedAddressCaptor.getValue().getAddresses()).containsExactly(addressGroup);
Subchannel subchannel =
createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
.thenReturn(PickResult.withSubchannel(subchannel));
requestConnectionSafely(helper, subchannel);
inOrder.verify(subchannelStateListener).onSubchannelState(stateInfoCaptor.capture());
assertEquals(CONNECTING, stateInfoCaptor.getValue().getState());
// Connecting to server1, which will fail
verify(mockTransportFactory)
.newClientTransport(
same(addr1), any(ClientTransportOptions.class), any(ChannelLogger.class));
verify(mockTransportFactory, times(0))
.newClientTransport(
same(addr2), any(ClientTransportOptions.class), any(ChannelLogger.class));
MockClientTransportInfo transportInfo1 = transports.poll();
transportInfo1.listener.transportShutdown(Status.UNAVAILABLE);
// Connecting to server2, which will fail too
verify(mockTransportFactory)
.newClientTransport(
same(addr2), any(ClientTransportOptions.class), any(ChannelLogger.class));
MockClientTransportInfo transportInfo2 = transports.poll();
Status server2Error = Status.UNAVAILABLE.withDescription("Server2 failed to connect");
transportInfo2.listener.transportShutdown(server2Error);
// ... which makes the subchannel enter TRANSIENT_FAILURE. The last error Status is propagated
// to LoadBalancer.
inOrder.verify(subchannelStateListener).onSubchannelState(stateInfoCaptor.capture());
assertEquals(TRANSIENT_FAILURE, stateInfoCaptor.getValue().getState());
assertSame(server2Error, stateInfoCaptor.getValue().getStatus());
// A typical LoadBalancer would create a picker with error
SubchannelPicker picker2 = mock(SubchannelPicker.class);
when(picker2.pickSubchannel(any(PickSubchannelArgs.class)))
.thenReturn(PickResult.withError(server2Error));
updateBalancingStateSafely(helper, TRANSIENT_FAILURE, picker2);
executor.runDueTasks();
// ... which fails the fail-fast call
verify(mockCallListener2).onClose(same(server2Error), any(Metadata.class));
// ... while the wait-for-ready call stays
verifyNoMoreInteractions(mockCallListener);
// No real stream was ever created
verify(transportInfo1.transport, times(0))
.newStream(any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class));
verify(transportInfo2.transport, times(0))
.newStream(any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class));
}
/**
* Exercises the Subchannel API exposed through the helper: creation, attributes and addresses,
* requestConnection(), updateAddresses(), and shutdown() both before and after the channel itself
* has been shut down.
*/
@Test
public void subchannels() {
createChannel();
// createSubchannel() always returns a new Subchannel
Attributes attrs1 = Attributes.newBuilder().set(SUBCHANNEL_ATTR_KEY, "attr1").build();
Attributes attrs2 = Attributes.newBuilder().set(SUBCHANNEL_ATTR_KEY, "attr2").build();
SubchannelStateListener listener1 = mock(SubchannelStateListener.class);
SubchannelStateListener listener2 = mock(SubchannelStateListener.class);
final Subchannel sub1 = createSubchannelSafely(helper, addressGroup, attrs1, listener1);
final Subchannel sub2 = createSubchannelSafely(helper, addressGroup, attrs2, listener2);
assertNotSame(sub1, sub2);
assertNotSame(attrs1, attrs2);
// Each subchannel hands back the very Attributes instance it was created with
assertSame(attrs1, sub1.getAttributes());
assertSame(attrs2, sub2.getAttributes());
// Flipped inside the runnable so the test can confirm the runnable actually executed
final AtomicBoolean snippetPassed = new AtomicBoolean(false);
helper.getSynchronizationContext().execute(new Runnable() {
@Override
public void run() {
// getAddresses() must be called from sync context
assertSame(addressGroup, sub1.getAddresses());
assertSame(addressGroup, sub2.getAddresses());
snippetPassed.set(true);
}
});
assertThat(snippetPassed.get()).isTrue();
// requestConnection()
// No transport exists until a connection is requested
verify(mockTransportFactory, never())
.newClientTransport(
any(SocketAddress.class),
any(ClientTransportOptions.class),
any(TransportLogger.class));
requestConnectionSafely(helper, sub1);
verify(mockTransportFactory)
.newClientTransport(
eq(socketAddress),
eq(clientTransportOptions),
isA(TransportLogger.class));
MockClientTransportInfo transportInfo1 = transports.poll();
assertNotNull(transportInfo1);
requestConnectionSafely(helper, sub2);
verify(mockTransportFactory, times(2))
.newClientTransport(
eq(socketAddress),
eq(clientTransportOptions),
isA(TransportLogger.class));
MockClientTransportInfo transportInfo2 = transports.poll();
assertNotNull(transportInfo2);
requestConnectionSafely(helper, sub1);
requestConnectionSafely(helper, sub2);
// Requesting a connection again on either subchannel creates no additional transport
verify(mockTransportFactory, times(2))
.newClientTransport(
eq(socketAddress), eq(clientTransportOptions), isA(TransportLogger.class));
// updateAddresses()
updateAddressesSafely(helper, sub1, Collections.singletonList(addressGroup2));
assertThat(((InternalSubchannel) sub1.getInternalSubchannel()).getAddressGroups())
.isEqualTo(Collections.singletonList(addressGroup2));
// shutdown() has a delay
shutdownSafely(helper, sub1);
timer.forwardTime(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS - 1, TimeUnit.SECONDS);
// A repeated shutdown request does not restart the delay: the transport is shut down one
// second later, as verified below
shutdownSafely(helper, sub1);
verify(transportInfo1.transport, never()).shutdown(any(Status.class));
timer.forwardTime(1, TimeUnit.SECONDS);
verify(transportInfo1.transport).shutdown(same(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_STATUS));
// ... but not after Channel is terminating
verify(mockLoadBalancer, never()).shutdown();
channel.shutdown();
verify(mockLoadBalancer).shutdown();
verify(transportInfo2.transport, never()).shutdown(any(Status.class));
// With the channel already shut down, the transport is shut down without advancing the timer
shutdownSafely(helper, sub2);
verify(transportInfo2.transport).shutdown(same(ManagedChannelImpl.SHUTDOWN_STATUS));
// Cleanup
transportInfo1.listener.transportShutdown(Status.UNAVAILABLE);
transportInfo1.listener.transportTerminated();
transportInfo2.listener.transportShutdown(Status.UNAVAILABLE);
transportInfo2.listener.transportTerminated();
timer.forwardTime(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
}
/** Verifies that toString() on a subchannel that has not been started returns a non-null value. */
@Test
public void subchannelStringableBeforeStart() {
  createChannel();
  Subchannel unstarted = createUnstartedSubchannel(helper, addressGroup, Attributes.EMPTY);
  String description = unstarted.toString();
  assertThat(description).isNotNull();
}
/** Verifies that a subchannel exposes a non-null ChannelLogger even before it has been started. */
@Test
public void subchannelLoggerCreatedBeforeSubchannelStarted() {
  createChannel();
  Subchannel unstarted = createUnstartedSubchannel(helper, addressGroup, Attributes.EMPTY);
  ChannelLogger subchannelLogger = unstarted.getChannelLogger();
  assertThat(subchannelLogger).isNotNull();
}
/**
 * Verifies that channel.shutdownNow() calls shutdownNow on the transport of every connected
 * subchannel, and that the channel becomes terminated only after both transports have
 * terminated.
 */
@Test
public void subchannelsWhenChannelShutdownNow() {
  createChannel();
  Subchannel firstSubchannel =
      createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
  Subchannel secondSubchannel =
      createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
  requestConnectionSafely(helper, firstSubchannel);
  requestConnectionSafely(helper, secondSubchannel);

  // One transport per subchannel; bring both to ready
  assertThat(transports).hasSize(2);
  MockClientTransportInfo firstInfo = transports.poll();
  MockClientTransportInfo secondInfo = transports.poll();
  ManagedClientTransport.Listener firstListener = firstInfo.listener;
  ManagedClientTransport.Listener secondListener = secondInfo.listener;
  firstListener.transportReady();
  secondListener.transportReady();

  channel.shutdownNow();
  verify(firstInfo.transport).shutdownNow(any(Status.class));
  verify(secondInfo.transport).shutdownNow(any(Status.class));

  firstListener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now"));
  secondListener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now"));

  // The channel stays un-terminated until the last transport reports termination
  firstListener.transportTerminated();
  assertFalse(channel.isTerminated());
  secondListener.transportTerminated();
  assertTrue(channel.isTerminated());
}
/**
 * Verifies that after channel.shutdown(), the channel terminates once every subchannel has been
 * shut down, even though no subchannel ever requested a connection.
 */
@Test
public void subchannelsNoConnectionShutdown() {
  createChannel();
  Subchannel firstSubchannel =
      createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
  Subchannel secondSubchannel =
      createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);

  channel.shutdown();
  verify(mockLoadBalancer).shutdown();

  // Termination waits for the last subchannel to be shut down
  shutdownSafely(helper, firstSubchannel);
  assertFalse(channel.isTerminated());
  shutdownSafely(helper, secondSubchannel);
  assertTrue(channel.isTerminated());

  // No transport was ever created
  verify(mockTransportFactory, never())
      .newClientTransport(
          any(SocketAddress.class),
          any(ClientTransportOptions.class),
          any(ChannelLogger.class));
}
/**
 * Verifies that channel.shutdownNow() terminates the channel without any explicit subchannel
 * shutdown when no subchannel ever requested a connection.
 */
@Test
public void subchannelsNoConnectionShutdownNow() {
  createChannel();
  // Two subchannels are created but never connected
  for (int created = 0; created < 2; created++) {
    createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
  }

  channel.shutdownNow();
  verify(mockLoadBalancer).shutdown();

  // Channel's shutdownNow() will call shutdownNow() on all subchannels and oobchannels.
  // Therefore, channel is terminated without relying on LoadBalancer to shutdown subchannels.
  assertTrue(channel.isTerminated());

  // No transport was ever created
  verify(mockTransportFactory, never())
      .newClientTransport(
          any(SocketAddress.class),
          any(ClientTransportOptions.class),
          any(ChannelLogger.class));
}
/**
 * Walks two OOB channels created through the LB helper through their lifecycle: lazy transport
 * creation on the first call, a new transport after the old one goes away, fail-fast versus
 * wait-for-ready behavior when a transport fails, rejection of new RPCs after shutdown, and
 * the parent channel terminating only once the OOB channels have terminated.
 */
@Test
public void oobchannels() {
createChannel();
ManagedChannel oob1 = helper.createOobChannel(
Collections.singletonList(addressGroup), "oob1authority");
ManagedChannel oob2 = helper.createOobChannel(
Collections.singletonList(addressGroup), "oob2authority");
// Each OOB channel takes one executor from the balancer RPC executor pool.
verify(balancerRpcExecutorPool, times(2)).getObject();
assertEquals("oob1authority", oob1.authority());
assertEquals("oob2authority", oob2.authority());
// OOB channels create connections lazily. A new call will initiate the connection.
Metadata headers = new Metadata();
ClientCall<String, Integer> call = oob1.newCall(method, CallOptions.DEFAULT);
call.start(mockCallListener, headers);
verify(mockTransportFactory)
.newClientTransport(
eq(socketAddress),
eq(new ClientTransportOptions().setAuthority("oob1authority").setUserAgent(USER_AGENT)),
isA(ChannelLogger.class));
MockClientTransportInfo transportInfo = transports.poll();
assertNotNull(transportInfo);
assertEquals(0, balancerRpcExecutor.numPendingTasks());
transportInfo.listener.transportReady();
assertEquals(1, balancerRpcExecutor.runDueTasks());
verify(transportInfo.transport).newStream(same(method), same(headers),
same(CallOptions.DEFAULT));
// The transport goes away
transportInfo.listener.transportShutdown(Status.UNAVAILABLE);
transportInfo.listener.transportTerminated();
// A new call will trigger a new transport
ClientCall<String, Integer> call2 = oob1.newCall(method, CallOptions.DEFAULT);
call2.start(mockCallListener2, headers);
ClientCall<String, Integer> call3 =
oob1.newCall(method, CallOptions.DEFAULT.withWaitForReady());
call3.start(mockCallListener3, headers);
verify(mockTransportFactory, times(2)).newClientTransport(
eq(socketAddress),
eq(new ClientTransportOptions().setAuthority("oob1authority").setUserAgent(USER_AGENT)),
isA(ChannelLogger.class));
transportInfo = transports.poll();
assertNotNull(transportInfo);
// This transport fails
Status transportError = Status.UNAVAILABLE.withDescription("Connection refused");
assertEquals(0, balancerRpcExecutor.numPendingTasks());
transportInfo.listener.transportShutdown(transportError);
assertTrue(balancerRpcExecutor.runDueTasks() > 0);
// Fail-fast RPC will fail, while wait-for-ready RPC will still be pending
verify(mockCallListener2).onClose(same(transportError), any(Metadata.class));
verify(mockCallListener3, never()).onClose(any(Status.class), any(Metadata.class));
// Shutdown
assertFalse(oob1.isShutdown());
assertFalse(oob2.isShutdown());
oob1.shutdown();
oob2.shutdownNow();
assertTrue(oob1.isShutdown());
assertTrue(oob2.isShutdown());
assertTrue(oob2.isTerminated());
// Only oob2 has terminated so far, so exactly one executor has been returned to the pool.
verify(balancerRpcExecutorPool).returnObject(balancerRpcExecutor.getScheduledExecutorService());
// New RPCs will be rejected.
assertEquals(0, balancerRpcExecutor.numPendingTasks());
ClientCall<String, Integer> call4 = oob1.newCall(method, CallOptions.DEFAULT);
ClientCall<String, Integer> call5 = oob2.newCall(method, CallOptions.DEFAULT);
call4.start(mockCallListener4, headers);
call5.start(mockCallListener5, headers);
assertTrue(balancerRpcExecutor.runDueTasks() > 0);
verify(mockCallListener4).onClose(statusCaptor.capture(), any(Metadata.class));
Status status4 = statusCaptor.getValue();
assertEquals(Status.Code.UNAVAILABLE, status4.getCode());
verify(mockCallListener5).onClose(statusCaptor.capture(), any(Metadata.class));
Status status5 = statusCaptor.getValue();
assertEquals(Status.Code.UNAVAILABLE, status5.getCode());
// The pending RPC will still be pending
verify(mockCallListener3, never()).onClose(any(Status.class), any(Metadata.class));
// This will shutdownNow() the delayed transport, terminating the pending RPC
assertEquals(0, balancerRpcExecutor.numPendingTasks());
oob1.shutdownNow();
assertTrue(balancerRpcExecutor.runDueTasks() > 0);
verify(mockCallListener3).onClose(any(Status.class), any(Metadata.class));
// Shut down the channel, and it will not be terminated because the OOB channel has not.
channel.shutdown();
assertFalse(channel.isTerminated());
// Delayed transport has already terminated. Terminating the transport terminates the
// subchannel, which in turn terminates the OOB channel, which terminates the channel.
assertFalse(oob1.isTerminated());
verify(balancerRpcExecutorPool).returnObject(balancerRpcExecutor.getScheduledExecutorService());
transportInfo.listener.transportTerminated();
assertTrue(oob1.isTerminated());
assertTrue(channel.isTerminated());
// With oob1 terminated as well, the second executor has been returned to the pool.
verify(balancerRpcExecutorPool, times(2))
.returnObject(balancerRpcExecutor.getScheduledExecutorService());
}
/**
 * Verifies that call credentials configured on the channel are added to RPC headers on the
 * normal channel, but not on an OOB channel from {@code createOobChannel} nor on a resolving
 * OOB channel built without its own channel credentials; those carry only the per-RPC value.
 */
@Test
public void oobChannelHasNoChannelCallCredentials() {
Metadata.Key<String> metadataKey =
Metadata.Key.of("token", Metadata.ASCII_STRING_MARSHALLER);
String channelCredValue = "channel-provided call cred";
// Rebuild the channel builder so the channel carries channel-level call credentials.
channelBuilder = new ManagedChannelImplBuilder(
TARGET, InsecureChannelCredentials.create(),
new FakeCallCredentials(metadataKey, channelCredValue),
new UnsupportedClientTransportFactoryBuilder(), new FixedPortProvider(DEFAULT_PORT));
configureBuilder(channelBuilder);
createChannel();
// Verify that the normal channel has call creds, to validate configuration
Subchannel subchannel =
createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
requestConnectionSafely(helper, subchannel);
MockClientTransportInfo transportInfo = transports.poll();
assertNotNull(transportInfo);
transportInfo.listener.transportReady();
when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn(
PickResult.withSubchannel(subchannel));
updateBalancingStateSafely(helper, READY, mockPicker);
String callCredValue = "per-RPC call cred";
CallOptions callOptions = CallOptions.DEFAULT
.withCallCredentials(new FakeCallCredentials(metadataKey, callCredValue));
Metadata headers = new Metadata();
ClientCall<String, Integer> call = channel.newCall(method, callOptions);
call.start(mockCallListener, headers);
verify(transportInfo.transport).newStream(same(method), same(headers), same(callOptions));
// Both values are present: the channel-provided one first, then the per-RPC one.
assertThat(headers.getAll(metadataKey))
.containsExactly(channelCredValue, callCredValue).inOrder();
// Verify that the oob channel does not have the channel-provided call creds
ManagedChannel oob = helper.createOobChannel(
Collections.singletonList(addressGroup), "oobauthority");
headers = new Metadata();
call = oob.newCall(method, callOptions);
call.start(mockCallListener2, headers);
transportInfo = transports.poll();
assertNotNull(transportInfo);
transportInfo.listener.transportReady();
balancerRpcExecutor.runDueTasks();
verify(transportInfo.transport).newStream(same(method), same(headers), same(callOptions));
assertThat(headers.getAll(metadataKey)).containsExactly(callCredValue);
oob.shutdownNow();
// Verify that the resolving oob channel does not have the channel-provided call creds either
oob = helper.createResolvingOobChannelBuilder("oobauthority")
.nameResolverFactory(
new FakeNameResolverFactory.Builder(URI.create("oobauthority")).build())
.defaultLoadBalancingPolicy(MOCK_POLICY_NAME)
.idleTimeout(ManagedChannelImplBuilder.IDLE_MODE_MAX_TIMEOUT_DAYS, TimeUnit.DAYS)
.build();
oob.getState(true);
ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class);
// Second newLoadBalancer() call overall; the captor's last value is used as the OOB helper.
verify(mockLoadBalancerProvider, times(2)).newLoadBalancer(helperCaptor.capture());
Helper oobHelper = helperCaptor.getValue();
subchannel =
createSubchannelSafely(oobHelper, addressGroup, Attributes.EMPTY, subchannelStateListener);
requestConnectionSafely(oobHelper, subchannel);
transportInfo = transports.poll();
assertNotNull(transportInfo);
transportInfo.listener.transportReady();
SubchannelPicker mockPicker2 = mock(SubchannelPicker.class);
when(mockPicker2.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn(
PickResult.withSubchannel(subchannel));
updateBalancingStateSafely(oobHelper, READY, mockPicker2);
headers = new Metadata();
call = oob.newCall(method, callOptions);
call.start(mockCallListener2, headers);
// CallOptions may contain StreamTracerFactory for census that is added by default.
verify(transportInfo.transport).newStream(same(method), same(headers), any(CallOptions.class));
assertThat(headers.getAll(metadataKey)).containsExactly(callCredValue);
oob.shutdownNow();
}
/**
 * Verifies that a resolving OOB channel built with its own {@link ChannelCredentials} applies
 * the call credentials carried by those credentials to its RPCs, ahead of the per-RPC call
 * credentials, and does not apply the parent channel's call credentials.
 */
@Test
public void oobChannelWithOobChannelCredsHasChannelCallCredentials() {
  Metadata.Key<String> metadataKey =
      Metadata.Key.of("token", Metadata.ASCII_STRING_MARSHALLER);
  String channelCredValue = "channel-provided call cred";
  // Swapping in composite credentials keeps the same mock transport factory and hands back the
  // call credentials contained in the composite.
  when(mockTransportFactory.swapChannelCredentials(any(CompositeChannelCredentials.class)))
      .thenAnswer(new Answer<SwapChannelCredentialsResult>() {
        @Override
        public SwapChannelCredentialsResult answer(InvocationOnMock invocation) {
          CompositeChannelCredentials c =
              invocation.getArgument(0, CompositeChannelCredentials.class);
          return new SwapChannelCredentialsResult(mockTransportFactory, c.getCallCredentials());
        }
      });
  channelBuilder = new ManagedChannelImplBuilder(
      TARGET, InsecureChannelCredentials.create(),
      new FakeCallCredentials(metadataKey, channelCredValue),
      new UnsupportedClientTransportFactoryBuilder(), new FixedPortProvider(DEFAULT_PORT));
  configureBuilder(channelBuilder);
  createChannel();

  // Verify that the normal channel has call creds, to validate configuration
  Subchannel subchannel =
      createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
  requestConnectionSafely(helper, subchannel);
  MockClientTransportInfo transportInfo = transports.poll();
  // poll() returns null when no transport was created; fail with an assertion, not an NPE.
  assertNotNull(transportInfo);
  transportInfo.listener.transportReady();
  when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn(
      PickResult.withSubchannel(subchannel));
  updateBalancingStateSafely(helper, READY, mockPicker);

  String callCredValue = "per-RPC call cred";
  CallOptions callOptions = CallOptions.DEFAULT
      .withCallCredentials(new FakeCallCredentials(metadataKey, callCredValue));
  Metadata headers = new Metadata();
  ClientCall<String, Integer> call = channel.newCall(method, callOptions);
  call.start(mockCallListener, headers);

  verify(transportInfo.transport).newStream(same(method), same(headers), same(callOptions));
  assertThat(headers.getAll(metadataKey))
      .containsExactly(channelCredValue, callCredValue).inOrder();

  // Verify that resolving oob channel with oob channel creds provides call creds
  String oobChannelCredValue = "oob-channel-provided call cred";
  ChannelCredentials oobChannelCreds = CompositeChannelCredentials.create(
      InsecureChannelCredentials.create(),
      new FakeCallCredentials(metadataKey, oobChannelCredValue));
  ManagedChannel oob = helper.createResolvingOobChannelBuilder(
          "fake://oobauthority/", oobChannelCreds)
      .nameResolverFactory(
          new FakeNameResolverFactory.Builder(URI.create("fake://oobauthority/")).build())
      .defaultLoadBalancingPolicy(MOCK_POLICY_NAME)
      .idleTimeout(ManagedChannelImplBuilder.IDLE_MODE_MAX_TIMEOUT_DAYS, TimeUnit.DAYS)
      .build();
  oob.getState(true);
  ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class);
  // Second newLoadBalancer() call overall; the captor's last value is used as the OOB helper.
  verify(mockLoadBalancerProvider, times(2)).newLoadBalancer(helperCaptor.capture());
  Helper oobHelper = helperCaptor.getValue();

  subchannel =
      createSubchannelSafely(oobHelper, addressGroup, Attributes.EMPTY, subchannelStateListener);
  requestConnectionSafely(oobHelper, subchannel);
  transportInfo = transports.poll();
  assertNotNull(transportInfo);
  transportInfo.listener.transportReady();
  SubchannelPicker mockPicker2 = mock(SubchannelPicker.class);
  when(mockPicker2.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn(
      PickResult.withSubchannel(subchannel));
  updateBalancingStateSafely(oobHelper, READY, mockPicker2);

  headers = new Metadata();
  call = oob.newCall(method, callOptions);
  call.start(mockCallListener2, headers);

  // CallOptions may contain StreamTracerFactory for census that is added by default.
  verify(transportInfo.transport).newStream(same(method), same(headers), any(CallOptions.class));
  // The OOB-channel-provided value replaces the parent channel's value and comes first.
  assertThat(headers.getAll(metadataKey))
      .containsExactly(oobChannelCredValue, callCredValue).inOrder();
  oob.shutdownNow();
}
/**
 * {@code shutdownNow()} on the parent channel shuts down the transports of its OOB channels,
 * and the parent only reports terminated after both OOB transports have terminated.
 */
@Test
public void oobChannelsWhenChannelShutdownNow() {
  createChannel();
  ManagedChannel firstOob = helper.createOobChannel(
      Collections.singletonList(addressGroup), "oob1Authority");
  ManagedChannel secondOob = helper.createOobChannel(
      Collections.singletonList(addressGroup), "oob2Authority");

  // One call per OOB channel; afterwards exactly two transports are queued.
  ClientCall<String, Integer> firstCall = firstOob.newCall(method, CallOptions.DEFAULT);
  firstCall.start(mockCallListener, new Metadata());
  ClientCall<String, Integer> secondCall = secondOob.newCall(method, CallOptions.DEFAULT);
  secondCall.start(mockCallListener2, new Metadata());
  assertEquals(2, transports.size());

  MockClientTransportInfo firstTransport = transports.poll();
  MockClientTransportInfo secondTransport = transports.poll();
  firstTransport.listener.transportReady();
  secondTransport.listener.transportReady();

  channel.shutdownNow();
  verify(firstTransport.transport).shutdownNow(any(Status.class));
  verify(secondTransport.transport).shutdownNow(any(Status.class));

  firstTransport.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now"));
  secondTransport.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now"));

  // Terminating only the first transport is not enough for the parent to terminate.
  firstTransport.listener.transportTerminated();
  assertThat(channel.isTerminated()).isFalse();
  secondTransport.listener.transportTerminated();
  assertThat(channel.isTerminated()).isTrue();
}
/**
 * With OOB channels that never connected, {@code shutdown()} on the parent does not terminate
 * it until each OOB channel has been shut down as well; no transport is ever created.
 */
@Test
public void oobChannelsNoConnectionShutdown() {
  createChannel();
  ManagedChannel firstOob = helper.createOobChannel(
      Collections.singletonList(addressGroup), "oob1Authority");
  ManagedChannel secondOob = helper.createOobChannel(
      Collections.singletonList(addressGroup), "oob2Authority");

  channel.shutdown();
  verify(mockLoadBalancer).shutdown();

  // Shutting down the first OOB channel terminates it, but the parent still waits.
  firstOob.shutdown();
  assertThat(firstOob.isTerminated()).isTrue();
  assertThat(channel.isTerminated()).isFalse();

  // Once the second OOB channel is terminated too, the parent is terminated.
  secondOob.shutdown();
  assertThat(secondOob.isTerminated()).isTrue();
  assertThat(channel.isTerminated()).isTrue();

  verify(mockTransportFactory, never()).newClientTransport(
      any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class));
}
/**
 * {@code shutdownNow()} on a channel whose OOB channels never connected terminates the channel
 * right away, and no transport is ever created.
 */
@Test
public void oobChannelsNoConnectionShutdownNow() {
  createChannel();
  String[] oobAuthorities = {"oob1Authority", "oob2Authority"};
  for (String authority : oobAuthorities) {
    helper.createOobChannel(Collections.singletonList(addressGroup), authority);
  }

  channel.shutdownNow();

  verify(mockLoadBalancer).shutdown();
  assertThat(channel.isTerminated()).isTrue();
  // The channel's shutdownNow() itself calls shutdownNow() on every subchannel and OOB channel,
  // so termination does not depend on the LoadBalancer shutting the OOB channels down.
  verify(mockTransportFactory, never()).newClientTransport(
      any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class));
}
/**
 * An RPC issued through {@code Subchannel.asChannel()} on a READY subchannel reaches the
 * transport with the caller's deadline and with the "owned by balancer" call option set.
 */
@Test
public void subchannelChannel_normalUsage() {
  createChannel();
  Subchannel subchannel =
      createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
  // The balancer RPC executor is only fetched from the pool once asChannel() is called.
  verify(balancerRpcExecutorPool, never()).getObject();
  Channel subchannelAsChannel = subchannel.asChannel();
  verify(balancerRpcExecutorPool).getObject();

  Metadata headers = new Metadata();
  CallOptions callOptions = CallOptions.DEFAULT.withDeadlineAfter(5, TimeUnit.SECONDS);

  // Subchannel must be READY when creating the RPC.
  requestConnectionSafely(helper, subchannel);
  verify(mockTransportFactory).newClientTransport(
      any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class));
  MockClientTransportInfo transportInfo = transports.poll();
  ConnectionClientTransport transport = transportInfo.transport;
  ManagedClientTransport.Listener listener = transportInfo.listener;
  listener.transportReady();

  subchannelAsChannel.newCall(method, callOptions).start(mockCallListener, headers);

  verify(transport).newStream(same(method), same(headers), callOptionsCaptor.capture());
  CallOptions effectiveOptions = callOptionsCaptor.getValue();
  assertSame(callOptions.getDeadline(), effectiveOptions.getDeadline());
  assertThat(effectiveOptions.getOption(GrpcUtil.CALL_OPTIONS_RPC_OWNED_BY_BALANCER)).isTrue();
}
/**
 * An RPC issued through {@code Subchannel.asChannel()} before the transport is ready is never
 * sent to the transport; it is closed with {@code SubchannelChannel.NOT_READY_ERROR} when the
 * balancer RPC executor runs its pending task.
 */
@Test
public void subchannelChannel_failWhenNotReady() {
  createChannel();
  Subchannel subchannel =
      createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
  Channel subchannelAsChannel = subchannel.asChannel();
  Metadata headers = new Metadata();

  requestConnectionSafely(helper, subchannel);
  verify(mockTransportFactory).newClientTransport(
      any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class));
  MockClientTransportInfo transportInfo = transports.poll();
  ConnectionClientTransport transport = transportInfo.transport;
  assertEquals(0, balancerRpcExecutor.numPendingTasks());

  // Subchannel is still CONNECTING, but not READY yet
  subchannelAsChannel.newCall(method, CallOptions.DEFAULT).start(mockCallListener, headers);

  verify(transport, never()).newStream(
      any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class));
  // The listener is only notified from the balancer RPC executor, not inline.
  verifyNoInteractions(mockCallListener);
  assertEquals(1, balancerRpcExecutor.runDueTasks());
  verify(mockCallListener)
      .onClose(same(SubchannelChannel.NOT_READY_ERROR), any(Metadata.class));
}
/**
 * A wait-for-ready RPC issued through {@code Subchannel.asChannel()} is never sent to the
 * transport, even when the subchannel is READY; it is closed with
 * {@code SubchannelChannel.WAIT_FOR_READY_ERROR} when the balancer RPC executor runs.
 */
@Test
public void subchannelChannel_failWaitForReady() {
  createChannel();
  Subchannel subchannel =
      createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
  Channel subchannelAsChannel = subchannel.asChannel();
  Metadata headers = new Metadata();

  // Subchannel must be READY when creating the RPC.
  requestConnectionSafely(helper, subchannel);
  verify(mockTransportFactory).newClientTransport(
      any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class));
  MockClientTransportInfo transportInfo = transports.poll();
  ConnectionClientTransport transport = transportInfo.transport;
  ManagedClientTransport.Listener listener = transportInfo.listener;
  listener.transportReady();
  assertEquals(0, balancerRpcExecutor.numPendingTasks());

  // Wait-for-ready RPC is not allowed
  CallOptions waitForReadyOptions = CallOptions.DEFAULT.withWaitForReady();
  subchannelAsChannel.newCall(method, waitForReadyOptions).start(mockCallListener, headers);

  verify(transport, never()).newStream(
      any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class));
  // The listener is only notified from the balancer RPC executor, not inline.
  verifyNoInteractions(mockCallListener);
  assertEquals(1, balancerRpcExecutor.runDueTasks());
  verify(mockCallListener)
      .onClose(same(SubchannelChannel.WAIT_FOR_READY_ERROR), any(Metadata.class));
}
/**
 * The executor returned by {@code Helper.getScheduledExecutorService()} runs scheduled tasks at
 * the requested delay and rejects {@code shutdown()} and {@code shutdownNow()} with
 * {@link UnsupportedOperationException}.
 */
@Test
public void lbHelper_getScheduledExecutorService() {
  createChannel();
  ScheduledExecutorService helperExecutor = helper.getScheduledExecutorService();
  Runnable scheduledTask = mock(Runnable.class);
  helper.getSynchronizationContext()
      .schedule(scheduledTask, 110, TimeUnit.NANOSECONDS, helperExecutor);

  // One nanosecond short of the delay: the task has not run yet.
  timer.forwardNanos(109);
  verify(scheduledTask, never()).run();
  timer.forwardNanos(1);
  verify(scheduledTask).run();

  boolean shutdownRejected = false;
  try {
    helperExecutor.shutdown();
  } catch (UnsupportedOperationException expected) {
    shutdownRejected = true;
  }
  assertTrue("Should throw", shutdownRejected);

  boolean shutdownNowRejected = false;
  try {
    helperExecutor.shutdownNow();
  } catch (UnsupportedOperationException expected) {
    shutdownNowRejected = true;
  }
  assertTrue("Should throw", shutdownNowRejected);
}
/**
 * {@code Helper.getNameResolverArgs()} exposes the default port, the default proxy detector,
 * the helper's own synchronization context, and a non-null service config parser.
 */
@Test
public void lbHelper_getNameResolverArgs() {
  createChannel();

  NameResolver.Args resolverArgs = helper.getNameResolverArgs();

  assertThat(resolverArgs.getDefaultPort()).isEqualTo(DEFAULT_PORT);
  assertSame(GrpcUtil.DEFAULT_PROXY_DETECTOR, resolverArgs.getProxyDetector());
  assertSame(helper.getSynchronizationContext(), resolverArgs.getSynchronizationContext());
  assertNotNull(resolverArgs.getServiceConfigParser());
}
/** {@code Helper.getNameResolverRegistry()} returns the default registry instance. */
@Test
public void lbHelper_getNameResolverRegistry() {
  createChannel();
  NameResolverRegistry fromHelper = helper.getNameResolverRegistry();
  assertSame(NameResolverRegistry.getDefaultRegistry(), fromHelper);
}
/** Subchannel connection failures while the channel is not idle: refresh is expected. */
@Test
public void refreshNameResolution_whenSubchannelConnectionFailed_notIdle() {
  boolean viaOobChannel = false;
  boolean channelIdle = false;
  subtestNameResolutionRefreshWhenConnectionFailed(viaOobChannel, channelIdle);
}
/** OOB channel connection failures while the channel is not idle: refresh is expected. */
@Test
public void refreshNameResolution_whenOobChannelConnectionFailed_notIdle() {
  boolean viaOobChannel = true;
  boolean channelIdle = false;
  subtestNameResolutionRefreshWhenConnectionFailed(viaOobChannel, channelIdle);
}
/** Subchannel connection failures after the channel entered idle: no refresh is expected. */
@Test
public void notRefreshNameResolution_whenSubchannelConnectionFailed_idle() {
  boolean viaOobChannel = false;
  boolean channelIdle = true;
  subtestNameResolutionRefreshWhenConnectionFailed(viaOobChannel, channelIdle);
}
/** OOB channel connection failures after the channel entered idle: no refresh is expected. */
@Test
public void notRefreshNameResolution_whenOobChannelConnectionFailed_idle() {
  boolean viaOobChannel = true;
  boolean channelIdle = true;
  subtestNameResolutionRefreshWhenConnectionFailed(viaOobChannel, channelIdle);
}
/**
 * Shared scenario for the name-resolution refresh tests. A transport is shut down once while
 * still connecting and once after becoming ready; after each shutdown the resolver's refresh
 * counter must have grown by one, unless the channel entered idle mode, in which case it must
 * stay at zero.
 *
 * @param isOobChannel whether the connection is made through an OOB channel rather than a
 *     regular subchannel
 * @param isIdle whether the channel is put into idle mode before the transports fail
 */
private void subtestNameResolutionRefreshWhenConnectionFailed(
    boolean isOobChannel, boolean isIdle) {
  FakeNameResolverFactory resolverFactory =
      new FakeNameResolverFactory.Builder(expectedUri)
          .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
          .build();
  channelBuilder.nameResolverFactory(resolverFactory);
  createChannel();

  if (!isOobChannel) {
    Subchannel subchannel =
        createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
    requestConnectionSafely(helper, subchannel);
  } else {
    OobChannel oobChannel = (OobChannel) helper.createOobChannel(
        Collections.singletonList(addressGroup), "oobAuthority");
    oobChannel.getSubchannel().requestConnection();
  }
  MockClientTransportInfo connectingTransport = transports.poll();
  assertNotNull(connectingTransport);

  FakeNameResolverFactory.FakeNameResolver resolver = resolverFactory.resolvers.remove(0);
  if (isIdle) {
    channel.enterIdle();
    // Entering idle mode will result in a new resolver
    resolver = resolverFactory.resolvers.remove(0);
  }
  assertEquals(0, resolverFactory.resolvers.size());

  // When channel enters idle, new resolver is created but not started, so it sees no refresh.
  int refreshesPerFailure = isIdle ? 0 : 1;

  // Transport closed when connecting
  assertEquals(0, resolver.refreshCalled);
  connectingTransport.listener.transportShutdown(Status.UNAVAILABLE);
  assertEquals(refreshesPerFailure, resolver.refreshCalled);

  timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS);
  MockClientTransportInfo readyTransport = transports.poll();
  assertNotNull(readyTransport);
  readyTransport.listener.transportReady();

  // Transport closed when ready
  assertEquals(refreshesPerFailure, resolver.refreshCalled);
  readyTransport.listener.transportShutdown(Status.UNAVAILABLE);
  assertEquals(2 * refreshesPerFailure, resolver.refreshCalled);
}
/** Checks which target strings {@code ManagedChannelImpl.URI_PATTERN} fully matches. */
@Test
public void uriPattern() {
  assertTrue(matchesUriPattern("a:/"));
  assertTrue(matchesUriPattern("Z019+-.:/!@ #~ "));
  assertFalse(matchesUriPattern("a/:")); // "/:" not matched
  assertFalse(matchesUriPattern("0a:/")); // '0' not matched
  assertFalse(matchesUriPattern("a,:/")); // ',' not matched
  assertFalse(matchesUriPattern(" a:/")); // space not matched
}

/** Returns whether {@code target} is matched in full by {@code URI_PATTERN}. */
private static boolean matchesUriPattern(String target) {
  return ManagedChannelImpl.URI_PATTERN.matcher(target).matches();
}
/**
 * Test that information such as the Call's context, MethodDescriptor, authority, executor are
 * propagated to newStream() and applyRequestMetadata().
 *
 * <p>Two calls are made: the first is started before any transport is ready, the second after
 * the picker already returns a ready subchannel. For both, the context that was current when
 * {@code newCall()} ran must be the one observed inside applyRequestMetadata() and newStream().
 */
@Test
public void informationPropagatedToNewStreamAndCallCredentials() {
createChannel();
CallOptions callOptions = CallOptions.DEFAULT.withCallCredentials(creds);
final Context.Key<String> testKey = Context.key("testing");
Context ctx = Context.current().withValue(testKey, "testValue");
// Contexts observed inside applyRequestMetadata() and newStream(), in invocation order.
final LinkedList<Context> credsApplyContexts = new LinkedList<>();
final LinkedList<Context> newStreamContexts = new LinkedList<>();
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock in) throws Throwable {
credsApplyContexts.add(Context.current());
return null;
}
}).when(creds).applyRequestMetadata(
any(RequestInfo.class), any(Executor.class), any(CallCredentials.MetadataApplier.class));
// First call will be on delayed transport. Only newCall() is run within the expected context,
// so that we can verify that the context is explicitly attached before calling newStream() and
// applyRequestMetadata(), which happens after we detach the context from the thread.
Context origCtx = ctx.attach();
assertEquals("testValue", testKey.get());
ClientCall<String, Integer> call = channel.newCall(method, callOptions);
ctx.detach(origCtx);
assertNull(testKey.get());
call.start(mockCallListener, new Metadata());
// Simulate name resolution results
EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(socketAddress);
Subchannel subchannel =
createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
requestConnectionSafely(helper, subchannel);
verify(mockTransportFactory)
.newClientTransport(
same(socketAddress), eq(clientTransportOptions), any(ChannelLogger.class));
MockClientTransportInfo transportInfo = transports.poll();
final ConnectionClientTransport transport = transportInfo.transport;
when(transport.getAttributes()).thenReturn(Attributes.EMPTY);
// Record the context that is current each time newStream() is invoked on the transport.
doAnswer(new Answer<ClientStream>() {
@Override
public ClientStream answer(InvocationOnMock in) throws Throwable {
newStreamContexts.add(Context.current());
return mock(ClientStream.class);
}
}).when(transport).newStream(
any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class));
verify(creds, never()).applyRequestMetadata(
any(RequestInfo.class), any(Executor.class), any(CallCredentials.MetadataApplier.class));
// applyRequestMetadata() is called after the transport becomes ready.
transportInfo.listener.transportReady();
when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
.thenReturn(PickResult.withSubchannel(subchannel));
updateBalancingStateSafely(helper, READY, mockPicker);
executor.runDueTasks();
ArgumentCaptor<RequestInfo> infoCaptor = ArgumentCaptor.forClass(null);
ArgumentCaptor<CallCredentials.MetadataApplier> applierCaptor = ArgumentCaptor.forClass(null);
verify(creds).applyRequestMetadata(infoCaptor.capture(),
same(executor.getScheduledExecutorService()), applierCaptor.capture());
assertEquals("testValue", testKey.get(credsApplyContexts.poll()));
assertEquals(AUTHORITY, infoCaptor.getValue().getAuthority());
assertEquals(SecurityLevel.NONE, infoCaptor.getValue().getSecurityLevel());
verify(transport, never()).newStream(
any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class));
// newStream() is called after apply() is called
applierCaptor.getValue().apply(new Metadata());
verify(transport).newStream(same(method), any(Metadata.class), same(callOptions));
assertEquals("testValue", testKey.get(newStreamContexts.poll()));
// The context should not live beyond the scope of newStream() and applyRequestMetadata()
assertNull(testKey.get());
// Second call will not be on delayed transport
origCtx = ctx.attach();
call = channel.newCall(method, callOptions);
ctx.detach(origCtx);
call.start(mockCallListener, new Metadata());
verify(creds, times(2)).applyRequestMetadata(infoCaptor.capture(),
same(executor.getScheduledExecutorService()), applierCaptor.capture());
assertEquals("testValue", testKey.get(credsApplyContexts.poll()));
assertEquals(AUTHORITY, infoCaptor.getValue().getAuthority());
assertEquals(SecurityLevel.NONE, infoCaptor.getValue().getSecurityLevel());
// This is from the first call
verify(transport).newStream(
any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class));
// Still, newStream() is called after apply() is called
applierCaptor.getValue().apply(new Metadata());
verify(transport, times(2)).newStream(same(method), any(Metadata.class), same(callOptions));
assertEquals("testValue", testKey.get(newStreamContexts.poll()));
// As with the first call, the context must not leak onto the test thread.
assertNull(testKey.get());
}
/**
 * When the picker is already available, the stream tracer factory from the pick result is
 * appended after the one from the call options in what is passed to {@code newStream()}.
 */
@Test
public void pickerReturnsStreamTracer_noDelay() {
  ClientStream stream = mock(ClientStream.class);
  ClientStreamTracer.Factory callOptionsFactory = mock(ClientStreamTracer.Factory.class);
  ClientStreamTracer.Factory pickResultFactory = mock(ClientStreamTracer.Factory.class);
  createChannel();
  Subchannel subchannel =
      createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
  requestConnectionSafely(helper, subchannel);
  MockClientTransportInfo transportInfo = transports.poll();
  transportInfo.listener.transportReady();
  ClientTransport transport = transportInfo.transport;
  when(transport.newStream(
          any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)))
      .thenReturn(stream);
  PickResult pickWithTracer = PickResult.withSubchannel(subchannel, pickResultFactory);
  when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn(pickWithTracer);
  updateBalancingStateSafely(helper, READY, mockPicker);

  CallOptions callOptions = CallOptions.DEFAULT.withStreamTracerFactory(callOptionsFactory);
  channel.newCall(method, callOptions).start(mockCallListener, new Metadata());

  verify(mockPicker).pickSubchannel(any(PickSubchannelArgs.class));
  verify(transport).newStream(same(method), any(Metadata.class), callOptionsCaptor.capture());
  assertThat(callOptionsCaptor.getValue().getStreamTracerFactories())
      .containsExactly(callOptionsFactory, pickResultFactory).inOrder();
  // The factories are safely not stubbed because we do not expect any usage of them.
  verifyNoInteractions(callOptionsFactory);
  verifyNoInteractions(pickResultFactory);
}
/**
 * When the call is started before a picker is available, the stream tracer factory from the
 * later pick result is still appended after the one from the call options in what is passed to
 * {@code newStream()}.
 */
@Test
public void pickerReturnsStreamTracer_delayed() {
  ClientStream stream = mock(ClientStream.class);
  ClientStreamTracer.Factory callOptionsFactory = mock(ClientStreamTracer.Factory.class);
  ClientStreamTracer.Factory pickResultFactory = mock(ClientStreamTracer.Factory.class);
  createChannel();

  // The call is started first, before any subchannel or picker exists.
  CallOptions callOptions = CallOptions.DEFAULT.withStreamTracerFactory(callOptionsFactory);
  channel.newCall(method, callOptions).start(mockCallListener, new Metadata());

  Subchannel subchannel =
      createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
  requestConnectionSafely(helper, subchannel);
  MockClientTransportInfo transportInfo = transports.poll();
  transportInfo.listener.transportReady();
  ClientTransport transport = transportInfo.transport;
  when(transport.newStream(
          any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)))
      .thenReturn(stream);
  PickResult pickWithTracer = PickResult.withSubchannel(subchannel, pickResultFactory);
  when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn(pickWithTracer);
  updateBalancingStateSafely(helper, READY, mockPicker);
  assertEquals(1, executor.runDueTasks());

  verify(mockPicker).pickSubchannel(any(PickSubchannelArgs.class));
  verify(transport).newStream(same(method), any(Metadata.class), callOptionsCaptor.capture());
  assertThat(callOptionsCaptor.getValue().getStreamTracerFactories())
      .containsExactly(callOptionsFactory, pickResultFactory).inOrder();
  // The factories are safely not stubbed because we do not expect any usage of them.
  verifyNoInteractions(callOptionsFactory);
  verifyNoInteractions(pickResultFactory);
}
/** The channel reports IDLE initially and then the state pushed by the load balancer. */
@Test
public void getState_loadBalancerSupportsChannelState() {
  FakeNameResolverFactory unresolvedFactory =
      new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build();
  channelBuilder.nameResolverFactory(unresolvedFactory);
  createChannel();
  assertThat(channel.getState(false)).isEqualTo(IDLE);

  updateBalancingStateSafely(helper, TRANSIENT_FAILURE, mockPicker);
  assertThat(channel.getState(false)).isEqualTo(TRANSIENT_FAILURE);
}
/**
 * {@code getState(true)} on a channel created without requesting a connection creates the load
 * balancer exactly once; later {@code getState()} calls do not create another one.
 */
@Test
public void getState_withRequestConnect() {
  FakeNameResolverFactory unresolvedFactory =
      new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build();
  channelBuilder.nameResolverFactory(unresolvedFactory);
  requestConnection = false;
  createChannel();

  assertThat(channel.getState(false)).isEqualTo(IDLE);
  verify(mockLoadBalancerProvider, never()).newLoadBalancer(any(Helper.class));

  // call getState() with requestConnection = true
  assertThat(channel.getState(true)).isEqualTo(IDLE);
  ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(null);
  verify(mockLoadBalancerProvider).newLoadBalancer(helperCaptor.capture());
  helper = helperCaptor.getValue();

  updateBalancingStateSafely(helper, CONNECTING, mockPicker);
  assertThat(channel.getState(false)).isEqualTo(CONNECTING);
  assertThat(channel.getState(true)).isEqualTo(CONNECTING);
  // Still only the single newLoadBalancer() call from above.
  verify(mockLoadBalancerProvider).newLoadBalancer(any(Helper.class));
}
/**
 * When the load balancer is already running and has reported IDLE, {@code getState(true)}
 * requests a connection on both the picker and the load balancer without creating a new one.
 */
@SuppressWarnings("deprecation")
@Test
public void getState_withRequestConnect_IdleWithLbRunning() {
  FakeNameResolverFactory unresolvedFactory =
      new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build();
  channelBuilder.nameResolverFactory(unresolvedFactory);
  createChannel();
  verify(mockLoadBalancerProvider).newLoadBalancer(any(Helper.class));

  updateBalancingStateSafely(helper, IDLE, mockPicker);

  assertThat(channel.getState(true)).isEqualTo(IDLE);
  // Still only the single newLoadBalancer() call from channel creation.
  verify(mockLoadBalancerProvider).newLoadBalancer(any(Helper.class));
  verify(mockPicker).requestConnection();
  verify(mockLoadBalancer).requestConnection();
}
/**
 * A callback registered with {@code notifyWhenStateChanged()} runs once the channel leaves the
 * given source state, and runs right away if the channel is already in a different state.
 */
@Test
public void notifyWhenStateChanged() {
  final AtomicBoolean callbackRan = new AtomicBoolean();
  Runnable callback = new Runnable() {
    @Override
    public void run() {
      callbackRan.set(true);
    }
  };
  FakeNameResolverFactory unresolvedFactory =
      new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build();
  channelBuilder.nameResolverFactory(unresolvedFactory);
  createChannel();
  assertThat(channel.getState(false)).isEqualTo(IDLE);

  // Registered while the channel is IDLE: nothing to report yet.
  channel.notifyWhenStateChanged(IDLE, callback);
  executor.runDueTasks();
  assertThat(callbackRan.get()).isFalse();

  // state change from IDLE to CONNECTING
  updateBalancingStateSafely(helper, CONNECTING, mockPicker);
  // onStateChanged callback should run
  executor.runDueTasks();
  assertThat(callbackRan.get()).isTrue();

  // clear and test from CONNECTING
  callbackRan.set(false);
  channel.notifyWhenStateChanged(IDLE, callback);
  // onStateChanged callback should run immediately
  executor.runDueTasks();
  assertThat(callbackRan.get()).isTrue();
}
/**
 * Shutting the channel down moves it to SHUTDOWN and notifies a registered state callback;
 * afterwards balancer state updates neither change the state nor trigger callbacks.
 */
@Test
public void channelStateWhenChannelShutdown() {
  final AtomicBoolean callbackRan = new AtomicBoolean();
  Runnable callback = new Runnable() {
    @Override
    public void run() {
      callbackRan.set(true);
    }
  };
  FakeNameResolverFactory unresolvedFactory =
      new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build();
  channelBuilder.nameResolverFactory(unresolvedFactory);
  createChannel();
  assertThat(channel.getState(false)).isEqualTo(IDLE);

  // Registered while the channel is IDLE: nothing to report yet.
  channel.notifyWhenStateChanged(IDLE, callback);
  executor.runDueTasks();
  assertThat(callbackRan.get()).isFalse();

  channel.shutdown();
  assertThat(channel.getState(false)).isEqualTo(SHUTDOWN);
  executor.runDueTasks();
  assertThat(callbackRan.get()).isTrue();

  // After shutdown, a balancer update must not move the channel out of SHUTDOWN.
  callbackRan.set(false);
  channel.notifyWhenStateChanged(SHUTDOWN, callback);
  updateBalancingStateSafely(helper, CONNECTING, mockPicker);
  assertThat(channel.getState(false)).isEqualTo(SHUTDOWN);
  executor.runDueTasks();
  assertThat(callbackRan.get()).isFalse();
}
/** Expects a CONNECTING channel to report IDLE once the configured idle timeout has elapsed. */
@Test
public void stateIsIdleOnIdleTimeout() {
  final long timeoutMillis = 2000L;
  channelBuilder.idleTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
  createChannel();
  assertEquals(IDLE, channel.getState(false));

  updateBalancingStateSafely(helper, CONNECTING, mockPicker);
  assertEquals(CONNECTING, channel.getState(false));

  // Advance the fake timer by exactly the idle timeout.
  long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
  timer.forwardNanos(timeoutNanos);
  assertEquals(IDLE, channel.getState(false));
}
/** Runs the shared panic scenario ({@code subtestPanic}) with IDLE as the initial state. */
@Test
public void panic_whenIdle() {
subtestPanic(IDLE);
}
/** Runs the shared panic scenario ({@code subtestPanic}) with CONNECTING as the initial state. */
@Test
public void panic_whenConnecting() {
subtestPanic(CONNECTING);
}
/**
 * Runs the shared panic scenario ({@code subtestPanic}) with TRANSIENT_FAILURE as the initial
 * state.
 */
@Test
public void panic_whenTransientFailure() {
subtestPanic(TRANSIENT_FAILURE);
}
/** Runs the shared panic scenario ({@code subtestPanic}) with READY as the initial state. */
@Test
public void panic_whenReady() {
subtestPanic(READY);
}
/**
 * Shared body of the {@code panic_when*} tests. Brings the channel into {@code initialState},
 * calls {@code channel.panic()} from within the channel's sync context, and then asserts what
 * the channel reports afterwards (TRANSIENT_FAILURE, resolver and balancer shut down, no revival
 * through balancer updates or {@code exitIdleMode()}, and a normal shutdown still possible).
 *
 * @param initialState the state to put the channel in before panicking; must not be SHUTDOWN
 *     (enforced by the first assertion)
 */
private void subtestPanic(ConnectivityState initialState) {
assertNotEquals("We don't test panic mode if it's already SHUTDOWN", SHUTDOWN, initialState);
long idleTimeoutMillis = 2000L;
FakeNameResolverFactory nameResolverFactory =
new FakeNameResolverFactory.Builder(expectedUri).build();
channelBuilder.nameResolverFactory(nameResolverFactory);
channelBuilder.idleTimeout(idleTimeoutMillis, TimeUnit.MILLISECONDS);
createChannel();
// Exactly one balancer and one resolver are expected after channel creation.
verify(mockLoadBalancerProvider).newLoadBalancer(any(Helper.class));
assertThat(nameResolverFactory.resolvers).hasSize(1);
// Take the resolver out of the list so that later size checks only see newly created ones.
FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.remove(0);
final Throwable panicReason = new Exception("Simulated uncaught exception");
// Reach the requested initial state: IDLE by letting the idle timer expire, any other state by
// a balancer update.
if (initialState == IDLE) {
timer.forwardNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeoutMillis));
} else {
updateBalancingStateSafely(helper, initialState, mockPicker);
}
assertEquals(initialState, channel.getState(false));
if (initialState == IDLE) {
// IDLE mode will shutdown resolver and balancer
verify(mockLoadBalancer).shutdown();
assertTrue(resolver.shutdown);
// A new resolver is created
assertThat(nameResolverFactory.resolvers).hasSize(1);
resolver = nameResolverFactory.resolvers.remove(0);
assertFalse(resolver.shutdown);
} else {
verify(mockLoadBalancer, never()).shutdown();
assertFalse(resolver.shutdown);
}
// Make channel panic!
channel.syncContext.execute(
new Runnable() {
@Override
public void run() {
channel.panic(panicReason);
}
});
// Resolver and balancer are shutdown
// (verify() without a mode expects exactly one shutdown() call in total, including the one
// already verified in the IDLE branch above.)
verify(mockLoadBalancer).shutdown();
assertTrue(resolver.shutdown);
// Channel will stay in TRANSIENT_FAILURE. getState(true) will not revive it.
// The call is made twice: the second one checks the state after a connection was requested.
assertEquals(TRANSIENT_FAILURE, channel.getState(true));
assertEquals(TRANSIENT_FAILURE, channel.getState(true));
verifyPanicMode(panicReason);
// Besides the resolver created initially, no new resolver or balancer are created.
verify(mockLoadBalancerProvider).newLoadBalancer(any(Helper.class));
assertThat(nameResolverFactory.resolvers).isEmpty();
// A misbehaving balancer that calls updateBalancingState() after it's shut down will not be
// able to revive it.
updateBalancingStateSafely(helper, READY, mockPicker);
verifyPanicMode(panicReason);
// Cannot be revived by exitIdleMode()
channel.syncContext.execute(new Runnable() {
@Override
public void run() {
channel.exitIdleMode();
}
});
verifyPanicMode(panicReason);
// Can still shutdown normally
channel.shutdown();
assertTrue(channel.isShutdown());
assertTrue(channel.isTerminated());
assertEquals(SHUTDOWN, channel.getState(false));
// We didn't stub mockPicker, because it should have never been called in this test.
verifyNoInteractions(mockPicker);
}
/**
 * Starts one fail-fast and one wait-for-ready call while the picker returns no result, then
 * panics the channel and expects both calls to be closed with INTERNAL and the panic cause.
 */
@Test
public void panic_bufferedCallsWillFail() {
  createChannel();
  when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
      .thenReturn(PickResult.withNoResult());
  updateBalancingStateSafely(helper, CONNECTING, mockPicker);

  // Neither call can be picked yet, so both are expected to sit in delayedTransport.
  ClientCall<String, Integer> failFastCall =
      channel.newCall(method, CallOptions.DEFAULT.withoutWaitForReady());
  failFastCall.start(mockCallListener, new Metadata());
  ClientCall<String, Integer> waitForReadyCall =
      channel.newCall(method, CallOptions.DEFAULT.withWaitForReady());
  waitForReadyCall.start(mockCallListener2, new Metadata());
  executor.runDueTasks();
  verifyNoInteractions(mockCallListener, mockCallListener2);

  // Trigger the panic from inside the channel's sync context.
  final Throwable panicCause = new Exception("Simulated uncaught exception");
  Runnable triggerPanic =
      new Runnable() {
        @Override
        public void run() {
          channel.panic(panicCause);
        }
      };
  channel.syncContext.execute(triggerPanic);

  // The buffered RPCs should be failed as soon as due tasks are run.
  executor.runDueTasks();
  verifyCallListenerClosed(mockCallListener, Status.Code.INTERNAL, panicCause);
  verifyCallListenerClosed(mockCallListener2, Status.Code.INTERNAL, panicCause);
  panicExpected = true;
}
/**
 * Asserts that the channel behaves as panicked: it reports TRANSIENT_FAILURE, a freshly started
 * call is closed with INTERNAL carrying {@code cause}, and none of the fake clocks has pending
 * tasks. Also sets {@code panicExpected}.
 *
 * @param cause the throwable expected as the cause of the closing status
 */
private void verifyPanicMode(Throwable cause) {
  panicExpected = true;
  @SuppressWarnings("unchecked")
  ClientCall.Listener<Integer> probeListener =
      (ClientCall.Listener<Integer>) mock(ClientCall.Listener.class);
  assertEquals(TRANSIENT_FAILURE, channel.getState(false));

  // A new call on the panicked channel should be closed immediately.
  ClientCall<String, Integer> probeCall = channel.newCall(method, CallOptions.DEFAULT);
  probeCall.start(probeListener, new Metadata());
  executor.runDueTasks();
  verifyCallListenerClosed(probeListener, Status.Code.INTERNAL, cause);

  // Channel is dead. Nothing may remain scheduled that could bring it back.
  assertEquals(0, timer.numPendingTasks());
  assertEquals(0, executor.numPendingTasks());
  assertEquals(0, balancerRpcExecutor.numPendingTasks());
}
/**
 * Verifies that {@code listener} received exactly one {@code onClose()} and no other callback,
 * and that the closing status has the given code and cause.
 *
 * @param listener the mock call listener to inspect
 * @param code the expected status code
 * @param cause the expected status cause, compared by identity
 */
private void verifyCallListenerClosed(
    ClientCall.Listener<Integer> listener, Status.Code code, Throwable cause) {
  // Use a real class token; forClass(null) relies on undocumented Mockito behavior.
  ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
  verify(listener).onClose(statusCaptor.capture(), any(Metadata.class));
  Status rpcStatus = statusCaptor.getValue();
  assertEquals(code, rpcStatus.getCode());
  assertSame(cause, rpcStatus.getCause());
  verifyNoMoreInteractions(listener);
}
/**
 * After the idle timeout, requesting a connection is expected to create a second load balancer;
 * only updates through the new helper should then change the channel state.
 */
@Test
public void idleTimeoutAndReconnect() {
  final long timeoutMillis = 2000L;
  channelBuilder.idleTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
  createChannel();

  long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
  timer.forwardNanos(timeoutNanos);
  assertEquals(IDLE, channel.getState(true /* request connection */));

  ArgumentCaptor<Helper> captor = ArgumentCaptor.forClass(Helper.class);
  // Two times of requesting connection will create loadBalancer twice.
  verify(mockLoadBalancerProvider, times(2)).newLoadBalancer(captor.capture());
  Helper newestHelper = captor.getValue();

  // An update through the old helper (whose balancer has been shut down) must be ignored.
  updateBalancingStateSafely(helper, CONNECTING, mockPicker);
  assertEquals(IDLE, channel.getState(false));

  // The same update through the new helper takes effect.
  updateBalancingStateSafely(newestHelper, CONNECTING, mockPicker);
  assertEquals(CONNECTING, channel.getState(false));
}
/**
 * Fails a buffered call through an erroring picker, lets the channel go idle, and then expects a
 * second call to be buffered (not failed by the old picker) until a new balancer supplies a
 * usable picker.
 */
@Test
public void idleMode_resetsDelayedTransportPicker() {
  ClientStream mockStream = mock(ClientStream.class);
  Status pickError = Status.UNAVAILABLE.withDescription("pick result error");
  long idleTimeoutMillis = 1000L;
  channelBuilder.idleTimeout(idleTimeoutMillis, TimeUnit.MILLISECONDS);
  channelBuilder.nameResolverFactory(
      new FakeNameResolverFactory.Builder(expectedUri)
          .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
          .build());
  createChannel();
  assertEquals(IDLE, channel.getState(false));

  // This call will be buffered in delayedTransport
  ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
  call.start(mockCallListener, new Metadata());

  // Move channel into TRANSIENT_FAILURE, which will fail the pending call
  when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
      .thenReturn(PickResult.withError(pickError));
  updateBalancingStateSafely(helper, TRANSIENT_FAILURE, mockPicker);
  assertEquals(TRANSIENT_FAILURE, channel.getState(false));
  executor.runDueTasks();
  verify(mockCallListener).onClose(same(pickError), any(Metadata.class));

  // Move channel to idle
  timer.forwardNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeoutMillis));
  assertEquals(IDLE, channel.getState(false));

  // This call should be buffered, but will move the channel out of idle
  ClientCall<String, Integer> call2 = channel.newCall(method, CallOptions.DEFAULT);
  call2.start(mockCallListener2, new Metadata());
  executor.runDueTasks();
  verifyNoMoreInteractions(mockCallListener2);

  // Get the helper created on exiting idle
  ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class);
  verify(mockLoadBalancerProvider, times(2)).newLoadBalancer(helperCaptor.capture());
  Helper helper2 = helperCaptor.getValue();

  // Establish a connection. The subchannel belongs to helper2, so the connection request goes
  // through helper2 as well rather than through the helper of the shut-down balancer.
  Subchannel subchannel =
      createSubchannelSafely(helper2, addressGroup, Attributes.EMPTY, subchannelStateListener);
  requestConnectionSafely(helper2, subchannel);
  MockClientTransportInfo transportInfo = transports.poll();
  ConnectionClientTransport mockTransport = transportInfo.transport;
  ManagedClientTransport.Listener transportListener = transportInfo.listener;
  when(mockTransport.newStream(same(method), any(Metadata.class), any(CallOptions.class)))
      .thenReturn(mockStream);
  transportListener.transportReady();
  when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
      .thenReturn(PickResult.withSubchannel(subchannel));
  updateBalancingStateSafely(helper2, READY, mockPicker);
  assertEquals(READY, channel.getState(false));
  executor.runDueTasks();

  // Verify the buffered call was drained
  verify(mockTransport).newStream(same(method), any(Metadata.class), any(CallOptions.class));
  verify(mockStream).start(any(ClientStreamListener.class));
}
/** Expects {@code enterIdle()} to move a READY channel to IDLE. */
@Test
public void enterIdleEntersIdle() {
  createChannel();
  updateBalancingStateSafely(helper, READY, mockPicker);
  ConnectivityState stateBefore = channel.getState(false);
  assertEquals(READY, stateBefore);

  channel.enterIdle();

  ConnectivityState stateAfter = channel.getState(false);
  assertEquals(IDLE, stateAfter);
}
/**
 * Expects {@code enterIdle()} to leave the channel in IDLE when the idle timer has already put
 * it there.
 */
@Test
public void enterIdleAfterIdleTimerIsNoOp() {
  final long timeoutMillis = 2000L;
  channelBuilder.idleTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
  createChannel();

  // Let the idle timer expire first.
  long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
  timer.forwardNanos(timeoutNanos);
  assertEquals(IDLE, channel.getState(false));

  // An explicit enterIdle() on top of that should change nothing.
  channel.enterIdle();
  assertEquals(IDLE, channel.getState(false));
}
/**
 * Calls {@code enterIdle()} while a call is still buffered and expects the channel to create a
 * second load balancer, through which the buffered call is eventually drained.
 */
@Test
public void enterIdle_exitsIdleIfDelayedStreamPending() {
  FakeNameResolverFactory nameResolverFactory =
      new FakeNameResolverFactory.Builder(expectedUri)
          .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
          .build();
  channelBuilder.nameResolverFactory(nameResolverFactory);
  createChannel();

  // Start a call that will be buffered in delayedTransport
  ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
  call.start(mockCallListener, new Metadata());

  // enterIdle() will shut down the name resolver and lb policy used to get a pick for the delayed
  // call
  channel.enterIdle();
  assertEquals(IDLE, channel.getState(false));

  // enterIdle() will restart the delayed call by exiting idle. This creates a new helper.
  ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class);
  verify(mockLoadBalancerProvider, times(2)).newLoadBalancer(helperCaptor.capture());
  Helper helper2 = helperCaptor.getValue();

  // Establish a connection. The subchannel belongs to helper2, so the connection request goes
  // through helper2 as well rather than through the helper of the shut-down balancer.
  Subchannel subchannel =
      createSubchannelSafely(helper2, addressGroup, Attributes.EMPTY, subchannelStateListener);
  requestConnectionSafely(helper2, subchannel);
  ClientStream mockStream = mock(ClientStream.class);
  MockClientTransportInfo transportInfo = transports.poll();
  ConnectionClientTransport mockTransport = transportInfo.transport;
  ManagedClientTransport.Listener transportListener = transportInfo.listener;
  when(mockTransport.newStream(same(method), any(Metadata.class), any(CallOptions.class)))
      .thenReturn(mockStream);
  transportListener.transportReady();
  when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
      .thenReturn(PickResult.withSubchannel(subchannel));
  updateBalancingStateSafely(helper2, READY, mockPicker);
  assertEquals(READY, channel.getState(false));

  // Verify the original call was drained
  executor.runDueTasks();
  verify(mockTransport).newStream(same(method), any(Metadata.class), any(CallOptions.class));
  verify(mockStream).start(any(ClientStreamListener.class));
}
/**
 * Buffers a call, then supplies two pickers in turn: the first picks a subchannel without a
 * ready transport (no stream expected), the second picks the connected subchannel (stream
 * expected to be created and started).
 */
@Test
public void updateBalancingStateDoesUpdatePicker() {
  ClientStream stream = mock(ClientStream.class);
  createChannel();
  ClientCall<String, Integer> bufferedCall = channel.newCall(method, CallOptions.DEFAULT);
  bufferedCall.start(mockCallListener, new Metadata());

  // Only the second subchannel requests a connection and gets a ready transport.
  Subchannel unconnected =
      createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
  Subchannel connected =
      createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
  requestConnectionSafely(helper, connected);
  MockClientTransportInfo info = transports.poll();
  ConnectionClientTransport transport = info.transport;
  ManagedClientTransport.Listener listener = info.listener;
  when(transport.newStream(same(method), any(Metadata.class), any(CallOptions.class)))
      .thenReturn(stream);
  listener.transportReady();

  // Picker selecting the unconnected subchannel: the call must remain buffered.
  when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
      .thenReturn(PickResult.withSubchannel(unconnected));
  updateBalancingStateSafely(helper, READY, mockPicker);
  executor.runDueTasks();
  verify(transport, never())
      .newStream(any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class));
  verify(stream, never()).start(any(ClientStreamListener.class));

  // Picker selecting the connected subchannel: the call is handed to the transport.
  when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
      .thenReturn(PickResult.withSubchannel(connected));
  updateBalancingStateSafely(helper, READY, mockPicker);
  executor.runDueTasks();
  verify(transport).newStream(same(method), any(Metadata.class), any(CallOptions.class));
  verify(stream).start(any(ClientStreamListener.class));
}
/**
 * Expects a buffered call to be drained when the picker returns a {@code ForwardingSubchannel}
 * that delegates to the real, connected subchannel.
 */
@Test
public void updateBalancingState_withWrappedSubchannel() {
  ClientStream stream = mock(ClientStream.class);
  createChannel();
  ClientCall<String, Integer> bufferedCall = channel.newCall(method, CallOptions.DEFAULT);
  bufferedCall.start(mockCallListener, new Metadata());

  // Connect the real subchannel.
  final Subchannel realSubchannel =
      createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
  requestConnectionSafely(helper, realSubchannel);
  MockClientTransportInfo info = transports.poll();
  ConnectionClientTransport transport = info.transport;
  ManagedClientTransport.Listener listener = info.listener;
  when(transport.newStream(same(method), any(Metadata.class), any(CallOptions.class)))
      .thenReturn(stream);
  listener.transportReady();

  // Hand the picker a wrapper instead of the subchannel itself.
  Subchannel wrapper =
      new ForwardingSubchannel() {
        @Override
        protected Subchannel delegate() {
          return realSubchannel;
        }
      };
  when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
      .thenReturn(PickResult.withSubchannel(wrapper));
  updateBalancingStateSafely(helper, READY, mockPicker);
  executor.runDueTasks();

  verify(transport).newStream(same(method), any(Metadata.class), any(CallOptions.class));
  verify(stream).start(any(ClientStreamListener.class));
}
/**
 * Expects a balancer update carrying SHUTDOWN to leave the channel in IDLE and not to trigger a
 * registered state-change callback.
 */
@Test
public void updateBalancingStateWithShutdownShouldBeIgnored() {
  FakeNameResolverFactory resolverFactory =
      new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build();
  channelBuilder.nameResolverFactory(resolverFactory);
  createChannel();
  assertEquals(IDLE, channel.getState(false));

  Runnable stateCallback = mock(Runnable.class);
  channel.notifyWhenStateChanged(IDLE, stateCallback);

  updateBalancingStateSafely(helper, SHUTDOWN, mockPicker);

  assertEquals(IDLE, channel.getState(false));
  executor.runDueTasks();
  verify(stateCallback, never()).run();
}
/** Expects a refresh requested through the helper to bump the resolver's refresh counter by one. */
@Test
public void balancerRefreshNameResolution() {
  FakeNameResolverFactory resolverFactory =
      new FakeNameResolverFactory.Builder(expectedUri).build();
  channelBuilder.nameResolverFactory(resolverFactory);
  createChannel();

  FakeNameResolverFactory.FakeNameResolver fakeResolver = resolverFactory.resolvers.get(0);
  int refreshesBefore = fakeResolver.refreshCalled;

  refreshNameResolutionSafely(helper);

  assertEquals(refreshesBefore + 1, fakeResolver.refreshCalled);
}
/**
 * With a name-resolution backoff task pending, expects {@code resetConnectBackoff()} to refresh
 * the resolver once, cancel the pending task, and leave a fresh backoff that fires after
 * {@code RECONNECT_BACKOFF_INTERVAL_NANOS}.
 */
@Test
public void resetConnectBackoff() {
  // A failing resolver is used so that a backoff retry gets scheduled.
  Status resolutionError =
      Status.UNAVAILABLE.withCause(new Throwable("fake name resolution error"));
  FakeNameResolverFactory resolverFactory =
      new FakeNameResolverFactory.Builder(expectedUri).setError(resolutionError).build();
  channelBuilder.nameResolverFactory(resolverFactory);
  // Name resolution is started as soon as channel is created.
  createChannel();
  FakeNameResolverFactory.FakeNameResolver fakeResolver = resolverFactory.resolvers.get(0);
  verify(mockLoadBalancer).handleNameResolutionError(same(resolutionError));

  FakeClock.ScheduledTask backoffTask = getNameResolverRefresh();
  assertNotNull("There should be a name resolver backoff task", backoffTask);
  assertEquals(0, fakeResolver.refreshCalled);

  // resetConnectBackoff() should refresh immediately and cancel the scheduled retry.
  channel.resetConnectBackoff();
  assertEquals(1, fakeResolver.refreshCalled);
  assertTrue(backoffTask.isCancelled());

  // Simulate a race between cancel and the task scheduler. Should be a no-op.
  backoffTask.command.run();
  assertEquals(1, fakeResolver.refreshCalled);

  // Verify that the reconnect policy was recreated and the backoff multiplier reset to 1
  timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS);
  assertEquals(2, fakeResolver.refreshCalled);
}
/**
 * Expects {@code resetConnectBackoff()} not to refresh the resolver when resolution returned
 * servers.
 */
@Test
public void resetConnectBackoff_noOpWithoutPendingResolverBackoff() {
  FakeNameResolverFactory.Builder factoryBuilder =
      new FakeNameResolverFactory.Builder(expectedUri)
          .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)));
  FakeNameResolverFactory resolverFactory = factoryBuilder.build();
  channelBuilder.nameResolverFactory(resolverFactory);
  createChannel();

  FakeNameResolverFactory.FakeNameResolver fakeResolver = resolverFactory.resolvers.get(0);
  assertEquals(0, fakeResolver.refreshCalled);

  channel.resetConnectBackoff();

  assertEquals(0, fakeResolver.refreshCalled);
}
/** Expects {@code resetConnectBackoff()} on a shut-down channel not to refresh the resolver. */
@Test
public void resetConnectBackoff_noOpWhenChannelShutdown() {
  FakeNameResolverFactory resolverFactory =
      new FakeNameResolverFactory.Builder(expectedUri).build();
  channelBuilder.nameResolverFactory(resolverFactory);
  createChannel();

  channel.shutdown();
  assertTrue(channel.isShutdown());
  channel.resetConnectBackoff();

  FakeNameResolverFactory.FakeNameResolver fakeResolver = resolverFactory.resolvers.get(0);
  assertEquals(0, fakeResolver.refreshCalled);
}
/**
 * Creates the channel with {@code requestConnection} set to false and expects
 * {@code resetConnectBackoff()} not to refresh the resolver.
 */
@Test
public void resetConnectBackoff_noOpWhenNameResolverNotStarted() {
  FakeNameResolverFactory resolverFactory =
      new FakeNameResolverFactory.Builder(expectedUri).build();
  channelBuilder.nameResolverFactory(resolverFactory);
  requestConnection = false;
  createChannel();

  channel.resetConnectBackoff();

  FakeNameResolverFactory.FakeNameResolver fakeResolver = resolverFactory.resolvers.get(0);
  assertEquals(0, fakeResolver.refreshCalled);
}
/**
 * Checks the {@code target} reported in stats: {@code TARGET} for the channel and the string
 * form of the single-element address list for a subchannel.
 */
@Test
public void channelsAndSubchannels_instrumented_name() throws Exception {
  createChannel();
  assertEquals(TARGET, getStats(channel).target);

  Subchannel created =
      createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
  AbstractSubchannel instrumented = (AbstractSubchannel) created;
  assertEquals(
      Collections.singletonList(addressGroup).toString(), getStats(instrumented).target);
}
/** Expects the channel trace to contain a CT_INFO creation event stamped with the current tick. */
@Test
public void channelTracing_channelCreationEvent() throws Exception {
  timer.forwardNanos(1234);
  channelBuilder.maxTraceEvents(10);
  createChannel();

  ChannelTrace.Event creationEvent =
      new ChannelTrace.Event.Builder()
          .setDescription("Channel for 'fake://fake.example.com' created")
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
          .setTimestampNanos(timer.getTicker().read())
          .build();
  assertThat(getStats(channel).channelTrace.events).contains(creationEvent);
}
/**
 * Expects subchannel creation to be traced twice: as a "Child Subchannel started" event on the
 * channel and as a creation event on the subchannel itself.
 */
@Test
public void channelTracing_subchannelCreationEvents() throws Exception {
  channelBuilder.maxTraceEvents(10);
  createChannel();
  timer.forwardNanos(1234);
  Subchannel created =
      createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
  AbstractSubchannel subchannel = (AbstractSubchannel) created;

  // Event recorded on the parent channel, referencing the new subchannel.
  ChannelTrace.Event parentEvent =
      new ChannelTrace.Event.Builder()
          .setDescription("Child Subchannel started")
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
          .setTimestampNanos(timer.getTicker().read())
          .setSubchannelRef(subchannel.getInstrumentedInternalSubchannel())
          .build();
  assertThat(getStats(channel).channelTrace.events).contains(parentEvent);

  // Event recorded on the subchannel's own trace.
  ChannelTrace.Event ownEvent =
      new ChannelTrace.Event.Builder()
          .setDescription("Subchannel for [[[test-addr]/{}]] created")
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
          .setTimestampNanos(timer.getTicker().read())
          .build();
  assertThat(getStats(subchannel).channelTrace.events).contains(ownEvent);
}
/** Expects a failing name resolution to be traced as a CT_WARNING event on the channel. */
@Test
public void channelTracing_nameResolvingErrorEvent() throws Exception {
  timer.forwardNanos(1234);
  channelBuilder.maxTraceEvents(10);
  Status resolutionError = Status.UNAVAILABLE.withDescription("simulated error");
  FakeNameResolverFactory resolverFactory =
      new FakeNameResolverFactory.Builder(expectedUri).setError(resolutionError).build();
  channelBuilder.nameResolverFactory(resolverFactory);
  createChannel();

  ChannelTrace.Event failureEvent =
      new ChannelTrace.Event.Builder()
          .setDescription("Failed to resolve name: " + resolutionError)
          .setSeverity(ChannelTrace.Event.Severity.CT_WARNING)
          .setTimestampNanos(timer.getTicker().read())
          .build();
  assertThat(getStats(channel).channelTrace.events).contains(failureEvent);
}
/** Expects a successful name resolution to be traced as an "Address resolved" CT_INFO event. */
@Test
public void channelTracing_nameResolvedEvent() throws Exception {
  timer.forwardNanos(1234);
  channelBuilder.maxTraceEvents(10);
  FakeNameResolverFactory.Builder factoryBuilder =
      new FakeNameResolverFactory.Builder(expectedUri)
          .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)));
  FakeNameResolverFactory resolverFactory = factoryBuilder.build();
  channelBuilder.nameResolverFactory(resolverFactory);
  createChannel();

  ChannelTrace.Event resolvedEvent =
      new ChannelTrace.Event.Builder()
          .setDescription(
              "Address resolved: "
                  + Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
          .setTimestampNanos(timer.getTicker().read())
          .build();
  assertThat(getStats(channel).channelTrace.events).contains(resolvedEvent);
}
/**
 * Feeds the resolver listener a result, two errors, and another result, and checks after each
 * step whether the channel trace grew: it is expected to grow only on the first error and on the
 * result that follows the errors.
 */
@Test
public void channelTracing_nameResolvedEvent_zeorAndNonzeroBackends() throws Exception {
  timer.forwardNanos(1234);
  channelBuilder.maxTraceEvents(10);
  List<EquivalentAddressGroup> initialServers = new ArrayList<>();
  initialServers.add(new EquivalentAddressGroup(socketAddress));
  FakeNameResolverFactory resolverFactory =
      new FakeNameResolverFactory.Builder(expectedUri).setServers(initialServers).build();
  channelBuilder.nameResolverFactory(resolverFactory);
  createChannel();

  // Another successful result right after the initial one: no new event expected.
  int eventCount = getStats(channel).channelTrace.events.size();
  ResolutionResult firstResult =
      ResolutionResult.newBuilder()
          .setAddresses(
              Collections.singletonList(
                  new EquivalentAddressGroup(
                      Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))))
          .build();
  resolverFactory.resolvers.get(0).listener.onResult(firstResult);
  assertThat(getStats(channel).channelTrace.events).hasSize(eventCount);

  // First error after a success: one new event expected.
  eventCount = getStats(channel).channelTrace.events.size();
  resolverFactory.resolvers.get(0).listener.onError(Status.INTERNAL);
  assertThat(getStats(channel).channelTrace.events).hasSize(eventCount + 1);

  // Repeated error: no new event expected.
  eventCount = getStats(channel).channelTrace.events.size();
  resolverFactory.resolvers.get(0).listener.onError(Status.INTERNAL);
  assertThat(getStats(channel).channelTrace.events).hasSize(eventCount);

  // Success after the errors: one new event expected.
  eventCount = getStats(channel).channelTrace.events.size();
  ResolutionResult secondResult =
      ResolutionResult.newBuilder()
          .setAddresses(
              Collections.singletonList(
                  new EquivalentAddressGroup(
                      Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))))
          .build();
  resolverFactory.resolvers.get(0).listener.onResult(secondResult);
  assertThat(getStats(channel).channelTrace.events).hasSize(eventCount + 1);
}
/**
 * Delivers three resolution results and checks the "Service config changed" trace event: it is
 * expected for the first result carrying a service config, not for a second result carrying the
 * same config object, and again for a third result carrying the empty config.
 */
@Test
public void channelTracing_serviceConfigChange() throws Exception {
  timer.forwardNanos(1234);
  channelBuilder.maxTraceEvents(10);
  List<EquivalentAddressGroup> initialServers = new ArrayList<>();
  initialServers.add(new EquivalentAddressGroup(socketAddress));
  FakeNameResolverFactory resolverFactory =
      new FakeNameResolverFactory.Builder(expectedUri).setServers(initialServers).build();
  channelBuilder.nameResolverFactory(resolverFactory);
  createChannel();

  // Step 1: a result carrying a service config should add exactly one event.
  int eventCount = getStats(channel).channelTrace.events.size();
  ManagedChannelServiceConfig serviceConfig =
      createManagedChannelServiceConfig(
          ImmutableMap.<String, Object>of(), new PolicySelection(mockLoadBalancerProvider, null));
  ResolutionResult firstResult =
      ResolutionResult.newBuilder()
          .setAddresses(
              Collections.singletonList(
                  new EquivalentAddressGroup(
                      Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))))
          .setServiceConfig(ConfigOrError.fromConfig(serviceConfig))
          .build();
  resolverFactory.resolvers.get(0).listener.onResult(firstResult);
  assertThat(getStats(channel).channelTrace.events).hasSize(eventCount + 1);
  ChannelTrace.Event firstChange =
      new ChannelTrace.Event.Builder()
          .setDescription("Service config changed")
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
          .setTimestampNanos(timer.getTicker().read())
          .build();
  assertThat(getStats(channel).channelTrace.events.get(eventCount)).isEqualTo(firstChange);

  // Step 2: the same config object again should add nothing.
  eventCount = getStats(channel).channelTrace.events.size();
  ResolutionResult secondResult =
      ResolutionResult.newBuilder()
          .setAddresses(
              Collections.singletonList(
                  new EquivalentAddressGroup(
                      Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))))
          .setServiceConfig(ConfigOrError.fromConfig(serviceConfig))
          .build();
  resolverFactory.resolvers.get(0).listener.onResult(secondResult);
  assertThat(getStats(channel).channelTrace.events).hasSize(eventCount);

  // Step 3: switching to the empty config should add one event with the advanced timestamp.
  eventCount = getStats(channel).channelTrace.events.size();
  timer.forwardNanos(1234);
  ResolutionResult thirdResult =
      ResolutionResult.newBuilder()
          .setAddresses(
              Collections.singletonList(
                  new EquivalentAddressGroup(
                      Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))))
          .setServiceConfig(ConfigOrError.fromConfig(ManagedChannelServiceConfig.empty()))
          .build();
  resolverFactory.resolvers.get(0).listener.onResult(thirdResult);
  assertThat(getStats(channel).channelTrace.events).hasSize(eventCount + 1);
  ChannelTrace.Event secondChange =
      new ChannelTrace.Event.Builder()
          .setDescription("Service config changed")
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
          .setTimestampNanos(timer.getTicker().read())
          .build();
  assertThat(getStats(channel).channelTrace.events.get(eventCount)).isEqualTo(secondChange);
}
/** Expects a balancer state update to be traced on the channel together with the picker name. */
@Test
public void channelTracing_stateChangeEvent() throws Exception {
  channelBuilder.maxTraceEvents(10);
  createChannel();
  timer.forwardNanos(1234);

  updateBalancingStateSafely(helper, CONNECTING, mockPicker);

  ChannelTrace.Event stateEvent =
      new ChannelTrace.Event.Builder()
          .setDescription("Entering CONNECTING state with picker: mockPicker")
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
          .setTimestampNanos(timer.getTicker().read())
          .build();
  assertThat(getStats(channel).channelTrace.events).contains(stateEvent);
}
/**
 * Expects obtaining an active transport from the internal subchannel to be traced as
 * "CONNECTING as requested" on the subchannel.
 */
@Test
public void channelTracing_subchannelStateChangeEvent() throws Exception {
  channelBuilder.maxTraceEvents(10);
  createChannel();
  Subchannel created =
      createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
  AbstractSubchannel subchannel = (AbstractSubchannel) created;
  timer.forwardNanos(1234);

  TransportProvider transportProvider = (TransportProvider) subchannel.getInternalSubchannel();
  transportProvider.obtainActiveTransport();

  ChannelTrace.Event connectingEvent =
      new ChannelTrace.Event.Builder()
          .setDescription("CONNECTING as requested")
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
          .setTimestampNanos(timer.getTicker().read())
          .build();
  assertThat(getStats(subchannel).channelTrace.events).contains(connectingEvent);
}
/** Expects a subchannel state change delivered to an OOB channel to be traced on that channel. */
@Test
public void channelTracing_oobChannelStateChangeEvent() throws Exception {
  channelBuilder.maxTraceEvents(10);
  createChannel();
  OobChannel oob =
      (OobChannel)
          helper.createOobChannel(Collections.singletonList(addressGroup), "authority");
  timer.forwardNanos(1234);

  ConnectivityStateInfo connecting =
      ConnectivityStateInfo.forNonError(ConnectivityState.CONNECTING);
  oob.handleSubchannelStateChange(connecting);

  ChannelTrace.Event stateEvent =
      new ChannelTrace.Event.Builder()
          .setDescription("Entering CONNECTING state")
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
          .setTimestampNanos(timer.getTicker().read())
          .build();
  assertThat(getStats(oob).channelTrace.events).contains(stateEvent);
}
/**
 * Expects OOB channel creation to be traced in three places: on the parent channel, on the OOB
 * channel, and on the OOB channel's internal subchannel.
 */
@Test
public void channelTracing_oobChannelCreationEvents() throws Exception {
  channelBuilder.maxTraceEvents(10);
  createChannel();
  timer.forwardNanos(1234);
  OobChannel oob =
      (OobChannel)
          helper.createOobChannel(Collections.singletonList(addressGroup), "authority");

  // Parent channel: references the new OOB channel.
  ChannelTrace.Event parentEvent =
      new ChannelTrace.Event.Builder()
          .setDescription("Child OobChannel created")
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
          .setTimestampNanos(timer.getTicker().read())
          .setChannelRef(oob)
          .build();
  assertThat(getStats(channel).channelTrace.events).contains(parentEvent);

  // OOB channel's own trace.
  ChannelTrace.Event oobEvent =
      new ChannelTrace.Event.Builder()
          .setDescription("OobChannel for [[[test-addr]/{}]] created")
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
          .setTimestampNanos(timer.getTicker().read())
          .build();
  assertThat(getStats(oob).channelTrace.events).contains(oobEvent);

  // Internal subchannel backing the OOB channel.
  ChannelTrace.Event subchannelEvent =
      new ChannelTrace.Event.Builder()
          .setDescription("Subchannel for [[[test-addr]/{}]] created")
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
          .setTimestampNanos(timer.getTicker().read())
          .build();
  assertThat(getStats(oob.getInternalSubchannel()).channelTrace.events)
      .contains(subchannelEvent);
}
/**
 * Follows the {@code state} reported in channel and subchannel stats through creation,
 * connecting, transport-ready, balancer READY update, and {@code shutdownNow()}.
 */
@Test
public void channelsAndSubchannels_instrumented_state() throws Exception {
  createChannel();
  // Use a real class token, as the other helper captors in this file do, rather than null.
  ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class);
  verify(mockLoadBalancerProvider).newLoadBalancer(helperCaptor.capture());
  helper = helperCaptor.getValue();

  // The channel state follows the balancer updates.
  assertEquals(IDLE, getStats(channel).state);
  updateBalancingStateSafely(helper, CONNECTING, mockPicker);
  assertEquals(CONNECTING, getStats(channel).state);

  // The subchannel state follows its own connection lifecycle.
  AbstractSubchannel subchannel =
      (AbstractSubchannel) createSubchannelSafely(
          helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
  assertEquals(IDLE, getStats(subchannel).state);
  requestConnectionSafely(helper, subchannel);
  assertEquals(CONNECTING, getStats(subchannel).state);
  MockClientTransportInfo transportInfo = transports.poll();
  assertEquals(CONNECTING, getStats(subchannel).state);
  transportInfo.listener.transportReady();
  assertEquals(READY, getStats(subchannel).state);

  // A ready subchannel alone does not change the channel state; the balancer update does.
  assertEquals(CONNECTING, getStats(channel).state);
  updateBalancingStateSafely(helper, READY, mockPicker);
  assertEquals(READY, getStats(channel).state);

  channel.shutdownNow();
  assertEquals(SHUTDOWN, getStats(channel).state);
  assertEquals(SHUTDOWN, getStats(subchannel).state);
}
/**
 * Expects {@code ClientCall.start()} to bump the channel's {@code callsStarted} counter and to
 * record the executor ticker's current reading as {@code lastCallStartedNanos}.
 */
@Test
public void channelStat_callStarted() throws Exception {
  createChannel();
  ClientCall<String, Integer> newCall = channel.newCall(method, CallOptions.DEFAULT);
  // Creating the call alone must not count as a started call.
  assertEquals(0, getStats(channel).callsStarted);

  newCall.start(mockCallListener, new Metadata());

  assertEquals(1, getStats(channel).callsStarted);
  assertEquals(executor.getTicker().read(), getStats(channel).lastCallStartedNanos);
}
/** Runs {@code channelsAndSubchannels_instrumented0} with {@code success} set to true. */
@Test
public void channelsAndSubChannels_instrumented_success() throws Exception {
channelsAndSubchannels_instrumented0(true);
}
/** Runs {@code channelsAndSubchannels_instrumented0} with {@code success} set to false. */
@Test
public void channelsAndSubChannels_instrumented_fail() throws Exception {
channelsAndSubchannels_instrumented0(false);
}
/**
 * Shared body for the instrumented success/fail tests. Drives a single call through a connected
 * subchannel and checks when the started/succeeded/failed counters of the channel and the
 * subchannel change.
 *
 * @param success whether the stream is closed with {@code Status.OK} (true) or
 *     {@code Status.UNKNOWN} (false)
 */
private void channelsAndSubchannels_instrumented0(boolean success) throws Exception {
  // Counter values expected once the close has been accounted for.
  final long expectedSucceeded = success ? 1 : 0;
  final long expectedFailed = success ? 0 : 1;

  createChannel();
  ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);

  // Channel stat bumped when ClientCall.start() called
  assertEquals(0, getStats(channel).callsStarted);
  call.start(mockCallListener, new Metadata());
  assertEquals(1, getStats(channel).callsStarted);

  // Connect a subchannel and have the picker route the call to it.
  ClientStream stream = mock(ClientStream.class);
  ClientStreamTracer.Factory tracerFactory = mock(ClientStreamTracer.Factory.class);
  AbstractSubchannel subchannel =
      (AbstractSubchannel)
          createSubchannelSafely(
              helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
  requestConnectionSafely(helper, subchannel);
  MockClientTransportInfo info = transports.poll();
  info.listener.transportReady();
  ClientTransport transport = info.transport;
  when(transport.newStream(
          any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)))
      .thenReturn(stream);
  when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
      .thenReturn(PickResult.withSubchannel(subchannel, tracerFactory));

  // subchannel stat bumped when call gets assigned to it
  assertEquals(0, getStats(subchannel).callsStarted);
  updateBalancingStateSafely(helper, READY, mockPicker);
  assertEquals(1, executor.runDueTasks());
  verify(stream).start(streamListenerCaptor.capture());
  assertEquals(1, getStats(subchannel).callsStarted);

  ClientStreamListener streamListener = streamListenerCaptor.getValue();
  call.halfClose();

  // closing stream listener affects subchannel stats immediately
  assertEquals(0, getStats(subchannel).callsSucceeded);
  assertEquals(0, getStats(subchannel).callsFailed);
  Status closeStatus = success ? Status.OK : Status.UNKNOWN;
  streamListener.closed(closeStatus, new Metadata());
  assertEquals(expectedSucceeded, getStats(subchannel).callsSucceeded);
  assertEquals(expectedFailed, getStats(subchannel).callsFailed);

  // channel stats bumped when the ClientCall.Listener is notified
  assertEquals(0, getStats(channel).callsSucceeded);
  assertEquals(0, getStats(channel).callsFailed);
  executor.runDueTasks();
  assertEquals(expectedSucceeded, getStats(channel).callsSucceeded);
  assertEquals(expectedFailed, getStats(channel).callsFailed);
}
/** Runs the OOB channel call-stats scenario with the stream closing as a success. */
@Test
public void channelsAndSubchannels_oob_instrumented_success() throws Exception {
  final boolean callSucceeds = true;
  channelsAndSubchannels_oob_instrumented0(callSucceeds);
}
/** Runs the OOB channel call-stats scenario with the stream closing as a failure. */
@Test
public void channelsAndSubchannels_oob_instrumented_fail() throws Exception {
  final boolean callSucceeds = false;
  channelsAndSubchannels_oob_instrumented0(callSucceeds);
}
/**
 * Drives a single call through an out-of-band channel and its subchannel, checking the call
 * counters reported by {@code getStats}, and confirms the parent channel's counters stay at 0.
 *
 * @param success {@code true} closes the stream with {@code Status.OK}; {@code false} closes it
 *     with {@code Status.UNKNOWN}
 */
private void channelsAndSubchannels_oob_instrumented0(boolean success) throws Exception {
  final Status closeStatus = success ? Status.OK : Status.UNKNOWN;
  final int expectedSucceeded = success ? 1 : 0;
  final int expectedFailed = success ? 0 : 1;

  // set up
  ClientStream stream = mock(ClientStream.class);
  createChannel();
  OobChannel oob = (OobChannel) helper.createOobChannel(
      Collections.singletonList(addressGroup), "oobauthority");
  AbstractSubchannel oobSub = (AbstractSubchannel) oob.getSubchannel();
  FakeClock callClock = new FakeClock();
  CallOptions callOptions =
      CallOptions.DEFAULT.withExecutor(callClock.getScheduledExecutorService());
  ClientCall<String, Integer> clientCall = oob.newCall(method, callOptions);
  Metadata requestHeaders = new Metadata();

  // The OOB channel's callsStarted counter only moves once ClientCall.start() is invoked.
  assertEquals(0, getStats(oob).callsStarted);
  clientCall.start(mockCallListener, requestHeaders);
  assertEquals(1, getStats(oob).callsStarted);

  MockClientTransportInfo info = transports.poll();
  ConnectionClientTransport transport = info.transport;
  ManagedClientTransport.Listener listener = info.listener;
  when(transport.newStream(same(method), same(requestHeaders), any(CallOptions.class)))
      .thenReturn(stream);

  // The subchannel's callsStarted counter moves when the call is assigned to it.
  assertEquals(0, getStats(oobSub).callsStarted);
  listener.transportReady();
  callClock.runDueTasks();
  verify(stream).start(streamListenerCaptor.capture());
  assertEquals(1, getStats(oobSub).callsStarted);

  ClientStreamListener capturedListener = streamListenerCaptor.getValue();
  clientCall.halfClose();

  // Closing the stream listener is reflected in the subchannel stats right away.
  assertEquals(0, getStats(oobSub).callsSucceeded);
  assertEquals(0, getStats(oobSub).callsFailed);
  capturedListener.closed(closeStatus, new Metadata());
  assertEquals(expectedSucceeded, getStats(oobSub).callsSucceeded);
  assertEquals(expectedFailed, getStats(oobSub).callsFailed);

  // The OOB channel stats only change after the call executor's pending tasks run, i.e. once
  // the ClientCall.Listener has been notified.
  assertEquals(0, getStats(oob).callsSucceeded);
  assertEquals(0, getStats(oob).callsFailed);
  callClock.runDueTasks();
  assertEquals(expectedSucceeded, getStats(oob).callsSucceeded);
  assertEquals(expectedFailed, getStats(oob).callsFailed);

  // The OOB channel keeps its own counters; the original channel is untouched.
  assertEquals(0, getStats(channel).callsSucceeded);
  assertEquals(0, getStats(channel).callsFailed);
}
/** The stats target of an OOB channel equals the authority it was created with. */
@Test
public void channelsAndSubchannels_oob_instrumented_name() throws Exception {
  createChannel();
  final String oobAuthority = "oobauthority";
  OobChannel oob =
      (OobChannel) helper.createOobChannel(
          Collections.singletonList(addressGroup), oobAuthority);
  assertEquals(oobAuthority, getStats(oob).target);
}
/**
 * Follows the OOB channel's reported state through IDLE, CONNECTING and READY, checks that the
 * parent channel's state is tracked separately, and that shutting the parent down moves both
 * to SHUTDOWN.
 */
@Test
public void channelsAndSubchannels_oob_instrumented_state() throws Exception {
  createChannel();
  OobChannel oob =
      (OobChannel) helper.createOobChannel(
          Collections.singletonList(addressGroup), "oobauthority");
  assertEquals(IDLE, getStats(oob).state);

  oob.getSubchannel().requestConnection();
  assertEquals(CONNECTING, getStats(oob).state);

  MockClientTransportInfo info = transports.poll();
  ManagedClientTransport.Listener listener = info.listener;
  listener.transportReady();
  assertEquals(READY, getStats(oob).state);

  // The OOB channel's state is independent of the ManagedChannel's.
  assertEquals(IDLE, getStats(channel).state);

  channel.shutdownNow();
  assertEquals(SHUTDOWN, getStats(channel).state);
  assertEquals(SHUTDOWN, getStats(oob).state);
}
/**
 * Verifies that a {@link BinaryLog} assigned to the builder wraps the built channel: a call
 * created on the channel must pass through the interceptor installed by {@code wrapChannel}.
 */
@Test
public void binaryLogInstalled() throws Exception {
  final SettableFuture<Boolean> intercepted = SettableFuture.create();
  channelBuilder.binlog = new BinaryLog() {
    @Override
    public void close() throws IOException {
      // noop
    }

    @Override
    public <ReqT, RespT> ServerMethodDefinition<?, ?> wrapMethodDefinition(
        ServerMethodDefinition<ReqT, RespT> oMethodDef) {
      return oMethodDef;
    }

    @Override
    public Channel wrapChannel(Channel channel) {
      return ClientInterceptors.intercept(channel,
          new ClientInterceptor() {
            @Override
            public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
                MethodDescriptor<ReqT, RespT> method,
                CallOptions callOptions,
                Channel next) {
              // Record that the call went through the binlog-installed interceptor.
              intercepted.set(true);
              return next.newCall(method, callOptions);
            }
          });
    }
  };
  createChannel();
  ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
  call.start(mockCallListener, new Metadata());
  // Bounded wait: an unbounded get() on a never-set future would hang the test run instead of
  // failing it when the interceptor is not invoked.
  assertTrue(intercepted.get(1, TimeUnit.SECONDS));
}
/**
 * Scenario: a call with a retry policy has its first stream closed with UNAVAILABLE and enters
 * backoff; the channel is shut down during the backoff. The test checks that the scheduled
 * retry still starts a second stream, that a call started after shutdown is closed with
 * UNAVAILABLE / "Channel shutdown invoked", and that the channel only reports terminated after
 * the retried stream has closed and the transport has terminated.
 */
@Test
public void retryBackoffThenChannelShutdown_retryShouldStillHappen_newCallShouldFail() {
// Retry policy: up to 3 attempts, 10s initial backoff, retrying only on UNAVAILABLE.
Map<String, Object> retryPolicy = new HashMap<>();
retryPolicy.put("maxAttempts", 3D);
retryPolicy.put("initialBackoff", "10s");
retryPolicy.put("maxBackoff", "30s");
retryPolicy.put("backoffMultiplier", 2D);
retryPolicy.put("retryableStatusCodes", Arrays.<Object>asList("UNAVAILABLE"));
// Attach the policy to every method of the service named "service".
Map<String, Object> methodConfig = new HashMap<>();
Map<String, Object> name = new HashMap<>();
name.put("service", "service");
methodConfig.put("name", Arrays.<Object>asList(name));
methodConfig.put("retryPolicy", retryPolicy);
Map<String, Object> rawServiceConfig = new HashMap<>();
rawServiceConfig.put("methodConfig", Arrays.<Object>asList(methodConfig));
FakeNameResolverFactory nameResolverFactory =
new FakeNameResolverFactory.Builder(expectedUri)
.setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
.build();
ManagedChannelServiceConfig managedChannelServiceConfig =
createManagedChannelServiceConfig(rawServiceConfig, null);
// The fake resolver hands this service config to the channel on resolution.
nameResolverFactory.nextConfigOrError.set(
ConfigOrError.fromConfig(managedChannelServiceConfig));
channelBuilder.nameResolverFactory(nameResolverFactory);
// directExecutor() runs submitted tasks inline on the calling thread.
channelBuilder.executor(MoreExecutors.directExecutor());
channelBuilder.enableRetry();
// NOTE(review): nextDouble() is pinned to 1D, presumably so the backoff delay is deterministic
// (the full 10s); this replaces a static on RetriableStream and is not restored here — confirm
// a fixture resets it.
RetriableStream.setRandom(
// not random
new Random() {
@Override
public double nextDouble() {
return 1D; // fake random
}
});
// NOTE(review): presumably stops createChannel() from requesting a connection itself — confirm.
requestConnection = false;
createChannel();
ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
call.start(mockCallListener, new Metadata());
// Grab the Helper the channel passed to the load balancer provider.
ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class);
verify(mockLoadBalancerProvider).newLoadBalancer(helperCaptor.capture());
helper = helperCaptor.getValue();
verify(mockLoadBalancer).handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(nameResolverFactory.servers)
.build());
// simulating request connection and then transport ready after resolved address
Subchannel subchannel =
createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
.thenReturn(PickResult.withSubchannel(subchannel));
requestConnectionSafely(helper, subchannel);
MockClientTransportInfo transportInfo = transports.poll();
ConnectionClientTransport mockTransport = transportInfo.transport;
// First newStream() call yields mockStream (original attempt), the second mockStream2 (retry).
ClientStream mockStream = mock(ClientStream.class);
ClientStream mockStream2 = mock(ClientStream.class);
when(mockTransport.newStream(same(method), any(Metadata.class), any(CallOptions.class)))
.thenReturn(mockStream).thenReturn(mockStream2);
transportInfo.listener.transportReady();
updateBalancingStateSafely(helper, READY, mockPicker);
ArgumentCaptor<ClientStreamListener> streamListenerCaptor =
ArgumentCaptor.forClass(ClientStreamListener.class);
verify(mockStream).start(streamListenerCaptor.capture());
// Nothing is scheduled on the timer before the first attempt fails.
assertThat(timer.getPendingTasks()).isEmpty();
// trigger retry
streamListenerCaptor.getValue().closed(Status.UNAVAILABLE, new Metadata());
// in backoff
// 5s into the 10s initial backoff: one task is still pending and the retry stream has not
// been started.
timer.forwardTime(5, TimeUnit.SECONDS);
assertThat(timer.getPendingTasks()).hasSize(1);
verify(mockStream2, never()).start(any(ClientStreamListener.class));
// shutdown during backoff period
channel.shutdown();
// shutdown() neither cancels the pending task nor closes the in-flight call.
assertThat(timer.getPendingTasks()).hasSize(1);
verify(mockCallListener, never()).onClose(any(Status.class), any(Metadata.class));
// A call started after shutdown() is closed with UNAVAILABLE.
ClientCall<String, Integer> call2 = channel.newCall(method, CallOptions.DEFAULT);
call2.start(mockCallListener2, new Metadata());
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
verify(mockCallListener2).onClose(statusCaptor.capture(), any(Metadata.class));
assertSame(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode());
assertEquals("Channel shutdown invoked", statusCaptor.getValue().getDescription());
// backoff ends
// Another 5s (10s in total): the pending task has run and the retry stream is started.
timer.forwardTime(5, TimeUnit.SECONDS);
assertThat(timer.getPendingTasks()).isEmpty();
verify(mockStream2).start(streamListenerCaptor.capture());
// The load balancer is still up and the channel not terminated while the retry is in flight.
verify(mockLoadBalancer, never()).shutdown();
assertFalse(
"channel.isTerminated() is expected to be false but was true",
channel.isTerminated());
// Close the retry stream with INTERNAL (not in retryableStatusCodes); only then is the load
// balancer shut down.
streamListenerCaptor.getValue().closed(Status.INTERNAL, new Metadata());
verify(mockLoadBalancer).shutdown();
// simulating the shutdown of load balancer triggers the shutdown of subchannel
shutdownSafely(helper, subchannel);
transportInfo.listener.transportShutdown(Status.INTERNAL);
transportInfo.listener.transportTerminated(); // simulating transport terminated
assertTrue(
"channel.isTerminated() is expected to be true but was false",
channel.isTerminated());
}
/**
 * Scenario: a call with a hedging policy (10s hedging delay) is in flight when the channel is
 * shut down before the delay elapses. The test checks that the scheduled hedge still starts a
 * second stream, that a call started after shutdown is closed with UNAVAILABLE / "Channel
 * shutdown invoked", and that the channel only reports terminated after the hedged stream has
 * closed and the transport has terminated.
 */
@Test
public void hedgingScheduledThenChannelShutdown_hedgeShouldStillHappen_newCallShouldFail() {
// Hedging policy: up to 3 attempts, 10s between hedges, UNAVAILABLE treated as non-fatal.
Map<String, Object> hedgingPolicy = new HashMap<>();
hedgingPolicy.put("maxAttempts", 3D);
hedgingPolicy.put("hedgingDelay", "10s");
hedgingPolicy.put("nonFatalStatusCodes", Arrays.<Object>asList("UNAVAILABLE"));
// Attach the policy to every method of the service named "service".
Map<String, Object> methodConfig = new HashMap<>();
Map<String, Object> name = new HashMap<>();
name.put("service", "service");
methodConfig.put("name", Arrays.<Object>asList(name));
methodConfig.put("hedgingPolicy", hedgingPolicy);
Map<String, Object> rawServiceConfig = new HashMap<>();
rawServiceConfig.put("methodConfig", Arrays.<Object>asList(methodConfig));
FakeNameResolverFactory nameResolverFactory =
new FakeNameResolverFactory.Builder(expectedUri)
.setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
.build();
ManagedChannelServiceConfig managedChannelServiceConfig =
createManagedChannelServiceConfig(rawServiceConfig, null);
// The fake resolver hands this service config to the channel on resolution.
nameResolverFactory.nextConfigOrError.set(
ConfigOrError.fromConfig(managedChannelServiceConfig));
channelBuilder.nameResolverFactory(nameResolverFactory);
// directExecutor() runs submitted tasks inline on the calling thread.
channelBuilder.executor(MoreExecutors.directExecutor());
channelBuilder.enableRetry();
// NOTE(review): presumably stops createChannel() from requesting a connection itself — confirm.
requestConnection = false;
createChannel();
ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
call.start(mockCallListener, new Metadata());
// Grab the Helper the channel passed to the load balancer provider.
ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class);
verify(mockLoadBalancerProvider).newLoadBalancer(helperCaptor.capture());
helper = helperCaptor.getValue();
verify(mockLoadBalancer).handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(nameResolverFactory.servers)
.build());
// simulating request connection and then transport ready after resolved address
Subchannel subchannel =
createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
.thenReturn(PickResult.withSubchannel(subchannel));
requestConnectionSafely(helper, subchannel);
MockClientTransportInfo transportInfo = transports.poll();
ConnectionClientTransport mockTransport = transportInfo.transport;
// First newStream() call yields mockStream (first attempt), the second mockStream2 (hedge).
ClientStream mockStream = mock(ClientStream.class);
ClientStream mockStream2 = mock(ClientStream.class);
when(mockTransport.newStream(same(method), any(Metadata.class), any(CallOptions.class)))
.thenReturn(mockStream).thenReturn(mockStream2);
transportInfo.listener.transportReady();
updateBalancingStateSafely(helper, READY, mockPicker);
ArgumentCaptor<ClientStreamListener> streamListenerCaptor =
ArgumentCaptor.forClass(ClientStreamListener.class);
verify(mockStream).start(streamListenerCaptor.capture());
// 5s into the 10s hedging delay: one timer task is pending
timer.forwardTime(5, TimeUnit.SECONDS);
assertThat(timer.numPendingTasks()).isEqualTo(1);
// the first attempt's stream closes with UNAVAILABLE (listed as non-fatal above); the second
// stream has still not been started at this point
streamListenerCaptor.getValue().closed(Status.UNAVAILABLE, new Metadata());
verify(mockStream2, never()).start(any(ClientStreamListener.class));
// shutdown while the hedge is still scheduled
channel.shutdown();
// shutdown() neither cancels the pending task nor closes the in-flight call.
assertThat(timer.numPendingTasks()).isEqualTo(1);
verify(mockCallListener, never()).onClose(any(Status.class), any(Metadata.class));
// A call started after shutdown() is closed with UNAVAILABLE.
ClientCall<String, Integer> call2 = channel.newCall(method, CallOptions.DEFAULT);
call2.start(mockCallListener2, new Metadata());
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
verify(mockCallListener2).onClose(statusCaptor.capture(), any(Metadata.class));
assertSame(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode());
assertEquals("Channel shutdown invoked", statusCaptor.getValue().getDescription());
// hedging delay ends (10s in total): the second stream is started and one timer task is
// pending again
timer.forwardTime(5, TimeUnit.SECONDS);
assertThat(timer.numPendingTasks()).isEqualTo(1);
verify(mockStream2).start(streamListenerCaptor.capture());
// The load balancer is still up and the channel not terminated while the hedge is in flight.
verify(mockLoadBalancer, never()).shutdown();
assertFalse(
"channel.isTerminated() is expected to be false but was true",
channel.isTerminated());
// Close the hedged stream with INTERNAL (not in nonFatalStatusCodes): no timer task remains
// and the load balancer is shut down.
streamListenerCaptor.getValue().closed(Status.INTERNAL, new Metadata());
assertThat(timer.numPendingTasks()).isEqualTo(0);
verify(mockLoadBalancer).shutdown();
// simulating the shutdown of load balancer triggers the shutdown of subchannel
shutdownSafely(helper, subchannel);
// simulating transport shutdown & terminated
transportInfo.listener.transportShutdown(Status.INTERNAL);
transportInfo.listener.transportTerminated();
assertTrue(
"channel.isTerminated() is expected to be true but was false",
channel.isTerminated());
}
/**
 * A resolver that first reports an invalid service config makes the first call fail with that
 * error; after the resolver pushes a valid config, a new call is no longer failed by the config
 * error (it instead runs until its deadline expires).
 */
@Test
public void badServiceConfigIsRecoverable() throws Exception {
  final List<EquivalentAddressGroup> addresses =
      ImmutableList.of(new EquivalentAddressGroup(new SocketAddress() {}));

  // Resolver that reports a service-config error on start and keeps its listener so the test
  // can push a corrected result later.
  final class FakeNameResolver extends NameResolver {
    Listener2 listener;

    @Override
    public String getServiceAuthority() {
      return "also fake";
    }

    @Override
    public void start(Listener2 listener) {
      this.listener = listener;
      listener.onResult(
          ResolutionResult.newBuilder()
              .setAddresses(addresses)
              .setServiceConfig(
                  ConfigOrError.fromError(
                      Status.INTERNAL.withDescription("kaboom is invalid")))
              .build());
    }

    @Override
    public void shutdown() {}
  }

  // Factory that remembers the resolver it created.
  final class FakeNameResolverFactory2 extends NameResolver.Factory {
    FakeNameResolver resolver;

    @Nullable
    @Override
    public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) {
      return (resolver = new FakeNameResolver());
    }

    @Override
    public String getDefaultScheme() {
      return "fake";
    }
  }

  FakeNameResolverFactory2 factory = new FakeNameResolverFactory2();
  ManagedChannelImplBuilder customBuilder = new ManagedChannelImplBuilder(TARGET,
      new ClientTransportFactoryBuilder() {
        @Override
        public ClientTransportFactory buildClientTransportFactory() {
          return mockTransportFactory;
        }
      },
      null);
  customBuilder.executorPool = executorPool;
  customBuilder.channelz = channelz;
  ManagedChannel mychannel = customBuilder.nameResolverFactory(factory).build();
  // try/finally so the locally built channel is shut down even when an assertion fails.
  try {
    ClientCall<Void, Void> call1 =
        mychannel.newCall(TestMethodDescriptors.voidMethod(), CallOptions.DEFAULT);
    ListenableFuture<Void> future1 = ClientCalls.futureUnaryCall(call1, null);
    executor.runDueTasks();
    try {
      future1.get(1, TimeUnit.SECONDS);
      Assert.fail("call should fail while the service config is invalid");
    } catch (ExecutionException e) {
      assertThat(Throwables.getStackTraceAsString(e.getCause())).contains("kaboom");
    }

    // ok the service config is bad, let's fix it.
    Map<String, Object> rawServiceConfig =
        parseConfig("{\"loadBalancingConfig\": [{\"round_robin\": {}}]}");
    Object fakeLbConfig = new Object();
    PolicySelection lbConfigs =
        new PolicySelection(
            mockLoadBalancerProvider, fakeLbConfig);
    // NOTE(review): the return value of this call is unused; kept exactly as it was.
    mockLoadBalancerProvider.parseLoadBalancingPolicyConfig(rawServiceConfig);
    ManagedChannelServiceConfig managedChannelServiceConfig =
        createManagedChannelServiceConfig(rawServiceConfig, lbConfigs);
    factory.resolver.listener.onResult(
        ResolutionResult.newBuilder()
            .setAddresses(addresses)
            .setServiceConfig(ConfigOrError.fromConfig(managedChannelServiceConfig))
            .build());

    ClientCall<Void, Void> call2 = mychannel.newCall(
        TestMethodDescriptors.voidMethod(),
        CallOptions.DEFAULT.withDeadlineAfter(5, TimeUnit.SECONDS));
    ListenableFuture<Void> future2 = ClientCalls.futureUnaryCall(call2, null);
    // Move the fake timer well past the 5s deadline, then run the executor's due tasks.
    timer.forwardTime(1234, TimeUnit.SECONDS);
    executor.runDueTasks();
    try {
      // Bounded like future1 above, so a call that never completes fails the test rather
      // than hanging it.
      future2.get(1, TimeUnit.SECONDS);
      Assert.fail("call should fail with a deadline error");
    } catch (ExecutionException e) {
      assertThat(Throwables.getStackTraceAsString(e.getCause())).contains("deadline");
    }
  } finally {
    mychannel.shutdownNow();
  }
}
/**
 * Checks the {@code NameResolver.Args} handed to the resolver factory: default port, the proxy
 * detector set on the builder, and an offload executor that forwards to {@code offloadExecutor}.
 */
@Test
public void nameResolverArgsPropagation() {
  final AtomicReference<NameResolver.Args> argsSeenByFactory = new AtomicReference<>();
  final NameResolver inertResolver = new NameResolver() {
    @Override
    public String getServiceAuthority() {
      return "fake-authority";
    }

    @Override
    public void start(Listener2 listener) {
    }

    @Override
    public void shutdown() {}
  };
  ProxyDetector directDetector = new ProxyDetector() {
    @Override
    public ProxiedSocketAddress proxyFor(SocketAddress targetAddress) {
      return null;
    }
  };
  NameResolver.Factory capturingFactory = new NameResolver.Factory() {
    @Override
    public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) {
      // Remember the args so the test body can inspect them.
      argsSeenByFactory.set(args);
      return inertResolver;
    }

    @Override
    public String getDefaultScheme() {
      return "fakescheme";
    }
  };
  Runnable noop = new Runnable() {
    @Override
    public void run() {}
  };

  channelBuilder.nameResolverFactory(capturingFactory).proxyDetector(directDetector);
  createChannel();

  NameResolver.Args resolverArgs = argsSeenByFactory.get();
  assertThat(resolverArgs).isNotNull();
  assertThat(resolverArgs.getDefaultPort()).isEqualTo(DEFAULT_PORT);
  assertThat(resolverArgs.getProxyDetector()).isSameInstanceAs(directDetector);

  // The offload executor is untouched until something is submitted through the args.
  verify(offloadExecutor, never()).execute(any(Runnable.class));
  resolverArgs.getOffloadExecutor().execute(noop);
  verify(offloadExecutor, times(1)).execute(any(Runnable.class));
}
/** {@code authority()} returns the same value before and after {@code shutdownNow()}. */
@Test
public void getAuthorityAfterShutdown() throws Exception {
  createChannel();
  String authorityWhileRunning = channel.authority();
  assertEquals(SERVICE_NAME, authorityWhileRunning);

  ManagedChannel shuttingDown = channel.shutdownNow();
  shuttingDown.awaitTermination(1, TimeUnit.SECONDS);

  String authorityAfterShutdown = channel.authority();
  assertEquals(SERVICE_NAME, authorityAfterShutdown);
}
/** Parsing an empty service config yields no error and the same method config as empty(). */
@Test
public void nameResolverHelper_emptyConfigSucceeds() {
  ScParser scParser =
      new ScParser(
          /* retryEnabled= */ false,
          /* maxRetryAttemptsLimit= */ 2,
          /* maxHedgedAttemptsLimit= */ 3,
          new AutoConfiguredLoadBalancerFactory("pick_first"));

  ConfigOrError parsed = scParser.parseServiceConfig(ImmutableMap.<String, Object>of());

  assertThat(parsed.getError()).isNull();
  ManagedChannelServiceConfig actual = (ManagedChannelServiceConfig) parsed.getConfig();
  ManagedChannelServiceConfig expected = ManagedChannelServiceConfig.empty();
  assertThat(actual.getMethodConfig(method)).isEqualTo(expected.getMethodConfig(method));
}
/**
 * A service config whose "methodConfig" is a string is reported as an UNKNOWN error caused by
 * a {@code ClassCastException}.
 */
@Test
public void nameResolverHelper_badConfigFails() {
  ScParser scParser =
      new ScParser(
          /* retryEnabled= */ false,
          /* maxRetryAttemptsLimit= */ 2,
          /* maxHedgedAttemptsLimit= */ 3,
          new AutoConfiguredLoadBalancerFactory("pick_first"));

  ConfigOrError parsed =
      scParser.parseServiceConfig(ImmutableMap.<String, Object>of("methodConfig", "bogus"));

  Status error = parsed.getError();
  assertThat(error).isNotNull();
  assertThat(error.getCode()).isEqualTo(Code.UNKNOWN);
  assertThat(error.getDescription()).contains("failed to parse service config");
  assertThat(error.getCause()).isInstanceOf(ClassCastException.class);
}
/** An empty "loadBalancingConfig" list parses without error and leaves the LB config null. */
@Test
public void nameResolverHelper_noConfigChosen() {
  ScParser scParser =
      new ScParser(
          /* retryEnabled= */ false,
          /* maxRetryAttemptsLimit= */ 2,
          /* maxHedgedAttemptsLimit= */ 3,
          new AutoConfiguredLoadBalancerFactory("pick_first"));

  ConfigOrError parsed =
      scParser.parseServiceConfig(ImmutableMap.of("loadBalancingConfig", ImmutableList.of()));

  assertThat(parsed.getError()).isNull();
  ManagedChannelServiceConfig parsedConfig = (ManagedChannelServiceConfig) parsed.getConfig();
  assertThat(parsedConfig.getLoadBalancingConfig()).isNull();
}
/**
 * With look-up disabled and no default config, the load balancer still receives the resolved
 * addresses, without the resolver's {@code InternalConfigSelector} attribute and without a
 * name-resolution error.
 */
@Test
public void disableServiceConfigLookUp_noDefaultConfig() throws Exception {
  LoadBalancerRegistry.getDefaultRegistry().register(mockLoadBalancerProvider);
  try {
    FakeNameResolverFactory resolverFactory =
        new FakeNameResolverFactory.Builder(expectedUri)
            .setServers(ImmutableList.of(addressGroup))
            .build();
    channelBuilder.nameResolverFactory(resolverFactory);
    channelBuilder.disableServiceConfigLookUp();

    // Config and attributes the fake resolver will report.
    Map<String, Object> resolverConfig =
        parseConfig("{\"methodConfig\":[{"
            + "\"name\":[{\"service\":\"SimpleService1\"}],"
            + "\"waitForReady\":true}]}");
    resolverFactory.nextConfigOrError.set(
        ConfigOrError.fromConfig(createManagedChannelServiceConfig(resolverConfig, null)));
    Attributes attrsWithSelector =
        Attributes.newBuilder()
            .set(InternalConfigSelector.KEY, mock(InternalConfigSelector.class))
            .build();
    resolverFactory.nextAttributes.set(attrsWithSelector);

    createChannel();

    ArgumentCaptor<ResolvedAddresses> captor = ArgumentCaptor.forClass(ResolvedAddresses.class);
    verify(mockLoadBalancer).handleResolvedAddresses(captor.capture());
    ResolvedAddresses delivered = captor.getValue();
    assertThat(delivered.getAddresses()).containsExactly(addressGroup);
    assertThat(delivered.getAttributes().get(InternalConfigSelector.KEY)).isNull();
    verify(mockLoadBalancer, never()).handleNameResolutionError(any(Status.class));
  } finally {
    LoadBalancerRegistry.getDefaultRegistry().deregister(mockLoadBalancerProvider);
  }
}
/**
 * With look-up disabled and a default config set, the load balancer receives the resolved
 * addresses, without the resolver's {@code InternalConfigSelector} attribute and without a
 * name-resolution error.
 */
@Test
public void disableServiceConfigLookUp_withDefaultConfig() throws Exception {
  LoadBalancerRegistry.getDefaultRegistry().register(mockLoadBalancerProvider);
  try {
    FakeNameResolverFactory resolverFactory =
        new FakeNameResolverFactory.Builder(expectedUri)
            .setServers(ImmutableList.of(addressGroup))
            .build();
    channelBuilder.nameResolverFactory(resolverFactory);
    channelBuilder.disableServiceConfigLookUp();

    Map<String, Object> builderDefaultConfig =
        parseConfig("{\"methodConfig\":[{"
            + "\"name\":[{\"service\":\"SimpleService1\"}],"
            + "\"waitForReady\":true}]}");
    channelBuilder.defaultServiceConfig(builderDefaultConfig);

    // The fake resolver reports an empty service config plus a config-selector attribute.
    Map<String, Object> emptyResolverConfig = new HashMap<>();
    resolverFactory.nextConfigOrError.set(
        ConfigOrError.fromConfig(
            createManagedChannelServiceConfig(emptyResolverConfig, null)));
    Attributes attrsWithSelector =
        Attributes.newBuilder()
            .set(InternalConfigSelector.KEY, mock(InternalConfigSelector.class))
            .build();
    resolverFactory.nextAttributes.set(attrsWithSelector);

    createChannel();

    ArgumentCaptor<ResolvedAddresses> captor = ArgumentCaptor.forClass(ResolvedAddresses.class);
    verify(mockLoadBalancer).handleResolvedAddresses(captor.capture());
    ResolvedAddresses delivered = captor.getValue();
    assertThat(delivered.getAddresses()).containsExactly(addressGroup);
    assertThat(delivered.getAttributes().get(InternalConfigSelector.KEY)).isNull();
    verify(mockLoadBalancer, never()).handleNameResolutionError(any(Status.class));
  } finally {
    LoadBalancerRegistry.getDefaultRegistry().deregister(mockLoadBalancerProvider);
  }
}
/**
 * With look-up enabled and no default config, each resolution (the initial one and a second
 * one carrying a changed config) delivers the addresses to the load balancer without a
 * name-resolution error.
 */
@Test
public void enableServiceConfigLookUp_noDefaultConfig() throws Exception {
  LoadBalancerRegistry.getDefaultRegistry().register(mockLoadBalancerProvider);
  try {
    FakeNameResolverFactory resolverFactory =
        new FakeNameResolverFactory.Builder(expectedUri)
            .setServers(ImmutableList.of(addressGroup))
            .build();
    channelBuilder.nameResolverFactory(resolverFactory);

    Map<String, Object> initialConfig =
        parseConfig("{\"methodConfig\":[{"
            + "\"name\":[{\"service\":\"SimpleService1\"}],"
            + "\"waitForReady\":true}]}");
    resolverFactory.nextConfigOrError.set(
        ConfigOrError.fromConfig(createManagedChannelServiceConfig(initialConfig, null)));

    createChannel();

    ArgumentCaptor<ResolvedAddresses> firstCaptor =
        ArgumentCaptor.forClass(ResolvedAddresses.class);
    verify(mockLoadBalancer).handleResolvedAddresses(firstCaptor.capture());
    assertThat(firstCaptor.getValue().getAddresses()).containsExactly(addressGroup);
    verify(mockLoadBalancer, never()).handleNameResolutionError(any(Status.class));

    // new config
    Map<String, Object> updatedConfig =
        parseConfig("{\"methodConfig\":[{"
            + "\"name\":[{\"service\":\"SimpleService1\"}],"
            + "\"waitForReady\":false}]}");
    resolverFactory.nextConfigOrError.set(
        ConfigOrError.fromConfig(createManagedChannelServiceConfig(updatedConfig, null)));
    resolverFactory.allResolved();

    ArgumentCaptor<ResolvedAddresses> secondCaptor =
        ArgumentCaptor.forClass(ResolvedAddresses.class);
    verify(mockLoadBalancer, times(2)).handleResolvedAddresses(secondCaptor.capture());
    assertThat(secondCaptor.getValue().getAddresses()).containsExactly(addressGroup);
    verify(mockLoadBalancer, never()).handleNameResolutionError(any(Status.class));
  } finally {
    LoadBalancerRegistry.getDefaultRegistry().deregister(mockLoadBalancerProvider);
  }
}
/**
 * With look-up enabled, a default config set on the builder and a different config reported by
 * the resolver, the load balancer receives the addresses without a name-resolution error.
 */
@Test
public void enableServiceConfigLookUp_withDefaultConfig() throws Exception {
  LoadBalancerRegistry.getDefaultRegistry().register(mockLoadBalancerProvider);
  try {
    FakeNameResolverFactory resolverFactory =
        new FakeNameResolverFactory.Builder(expectedUri)
            .setServers(ImmutableList.of(addressGroup))
            .build();
    channelBuilder.nameResolverFactory(resolverFactory);

    Map<String, Object> builderDefaultConfig =
        parseConfig("{\"methodConfig\":[{"
            + "\"name\":[{\"service\":\"SimpleService1\"}],"
            + "\"waitForReady\":true}]}");
    channelBuilder.defaultServiceConfig(builderDefaultConfig);

    Map<String, Object> resolverConfig =
        parseConfig("{\"methodConfig\":[{"
            + "\"name\":[{\"service\":\"SimpleService2\"}],"
            + "\"waitForReady\":false}]}");
    resolverFactory.nextConfigOrError.set(
        ConfigOrError.fromConfig(createManagedChannelServiceConfig(resolverConfig, null)));

    createChannel();

    ArgumentCaptor<ResolvedAddresses> captor = ArgumentCaptor.forClass(ResolvedAddresses.class);
    verify(mockLoadBalancer).handleResolvedAddresses(captor.capture());
    ResolvedAddresses delivered = captor.getValue();
    assertThat(delivered.getAddresses()).containsExactly(addressGroup);
    verify(mockLoadBalancer, never()).handleNameResolutionError(any(Status.class));
  } finally {
    LoadBalancerRegistry.getDefaultRegistry().deregister(mockLoadBalancerProvider);
  }
}
/**
 * With look-up enabled, a default config set on the builder and a resolver whose next
 * config-or-error is null, the load balancer receives the addresses without a name-resolution
 * error.
 */
@Test
public void enableServiceConfigLookUp_resolverReturnsNoConfig_withDefaultConfig()
    throws Exception {
  LoadBalancerRegistry.getDefaultRegistry().register(mockLoadBalancerProvider);
  try {
    FakeNameResolverFactory resolverFactory =
        new FakeNameResolverFactory.Builder(expectedUri)
            .setServers(ImmutableList.of(addressGroup))
            .build();
    channelBuilder.nameResolverFactory(resolverFactory);

    Map<String, Object> builderDefaultConfig =
        parseConfig("{\"methodConfig\":[{"
            + "\"name\":[{\"service\":\"SimpleService1\"}],"
            + "\"waitForReady\":true}]}");
    channelBuilder.defaultServiceConfig(builderDefaultConfig);

    // The resolver reports no service config at all.
    resolverFactory.nextConfigOrError.set(null);

    createChannel();

    ArgumentCaptor<ResolvedAddresses> captor = ArgumentCaptor.forClass(ResolvedAddresses.class);
    verify(mockLoadBalancer).handleResolvedAddresses(captor.capture());
    ResolvedAddresses delivered = captor.getValue();
    assertThat(delivered.getAddresses()).containsExactly(addressGroup);
    verify(mockLoadBalancer, never()).handleNameResolutionError(any(Status.class));
  } finally {
    LoadBalancerRegistry.getDefaultRegistry().deregister(mockLoadBalancerProvider);
  }
}
/**
 * With look-up enabled, no default config and a resolver reporting a config built from an empty
 * map, the load balancer receives the addresses without a name-resolution error.
 */
@Test
public void enableServiceConfigLookUp_resolverReturnsNoConfig_noDefaultConfig() {
  LoadBalancerRegistry.getDefaultRegistry().register(mockLoadBalancerProvider);
  try {
    FakeNameResolverFactory resolverFactory =
        new FakeNameResolverFactory.Builder(expectedUri)
            .setServers(ImmutableList.of(addressGroup))
            .build();
    channelBuilder.nameResolverFactory(resolverFactory);

    Map<String, Object> emptyResolverConfig = Collections.emptyMap();
    resolverFactory.nextConfigOrError.set(
        ConfigOrError.fromConfig(
            createManagedChannelServiceConfig(emptyResolverConfig, null)));

    createChannel();

    ArgumentCaptor<ResolvedAddresses> captor = ArgumentCaptor.forClass(ResolvedAddresses.class);
    verify(mockLoadBalancer).handleResolvedAddresses(captor.capture());
    ResolvedAddresses delivered = captor.getValue();
    assertThat(delivered.getAddresses()).containsExactly(addressGroup);
    verify(mockLoadBalancer, never()).handleNameResolutionError(any(Status.class));
  } finally {
    LoadBalancerRegistry.getDefaultRegistry().deregister(mockLoadBalancerProvider);
  }
}
/**
 * With look-up disabled and a default config set, the last channel trace event after channel
 * creation is the "look-up disabled, using default service config" INFO event.
 */
@Test
public void useDefaultImmediatelyIfDisableLookUp() throws Exception {
  FakeNameResolverFactory resolverFactory =
      new FakeNameResolverFactory.Builder(expectedUri)
          .setServers(ImmutableList.of(addressGroup))
          .build();
  channelBuilder.nameResolverFactory(resolverFactory);
  channelBuilder.disableServiceConfigLookUp();
  Map<String, Object> builderDefaultConfig =
      parseConfig("{\"methodConfig\":[{"
          + "\"name\":[{\"service\":\"SimpleService1\"}],"
          + "\"waitForReady\":true}]}");
  channelBuilder.defaultServiceConfig(builderDefaultConfig);
  requestConnection = false;
  channelBuilder.maxTraceEvents(10);

  createChannel();

  int lastIndex = getStats(channel).channelTrace.events.size() - 1;
  ChannelTrace.Event expectedEvent =
      new ChannelTrace.Event.Builder()
          .setDescription("Service config look-up disabled, using default service config")
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
          .setTimestampNanos(timer.getTicker().read())
          .build();
  assertThat(getStats(channel).channelTrace.events.get(lastIndex)).isEqualTo(expectedEvent);
}
/**
 * With look-up left enabled and a default config set, the last channel trace event after
 * channel creation must not be the "look-up disabled, using default service config" event.
 */
@Test
public void notUseDefaultImmediatelyIfEnableLookUp() throws Exception {
  FakeNameResolverFactory nameResolverFactory =
      new FakeNameResolverFactory.Builder(expectedUri)
          .setServers(ImmutableList.of(addressGroup)).build();
  channelBuilder.nameResolverFactory(nameResolverFactory);
  Map<String, Object> defaultServiceConfig =
      parseConfig("{\"methodConfig\":[{"
          + "\"name\":[{\"service\":\"SimpleService1\"}],"
          + "\"waitForReady\":true}]}");
  channelBuilder.defaultServiceConfig(defaultServiceConfig);
  requestConnection = false;
  channelBuilder.maxTraceEvents(10);
  createChannel();
  int size = getStats(channel).channelTrace.events.size();
  // Compare against the exact description that useDefaultImmediatelyIfDisableLookUp expects
  // when the default is applied at creation. The previous text ("Using default service config")
  // matches no event asserted anywhere in this file, so the negative check could never fail.
  assertThat(getStats(channel).channelTrace.events.get(size - 1))
      .isNotEqualTo(new ChannelTrace.Event.Builder()
          .setDescription("Service config look-up disabled, using default service config")
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
          .setTimestampNanos(timer.getTicker().read())
          .build());
}
/**
 * The "healthCheckConfig" section of the resolved service config reaches the load balancer
 * under the {@code LoadBalancer.ATTR_HEALTH_CHECKING_CONFIG} attribute.
 */
@Test
public void healthCheckingConfigPropagated() throws Exception {
  LoadBalancerRegistry.getDefaultRegistry().register(mockLoadBalancerProvider);
  try {
    FakeNameResolverFactory resolverFactory =
        new FakeNameResolverFactory.Builder(expectedUri)
            .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
            .build();
    channelBuilder.nameResolverFactory(resolverFactory);

    Map<String, Object> resolverConfig =
        parseConfig("{\"healthCheckConfig\": {\"serviceName\": \"service1\"}}");
    resolverFactory.nextConfigOrError.set(
        ConfigOrError.fromConfig(createManagedChannelServiceConfig(resolverConfig, null)));

    createChannel();

    ArgumentCaptor<ResolvedAddresses> captor = ArgumentCaptor.forClass(ResolvedAddresses.class);
    verify(mockLoadBalancer).handleResolvedAddresses(captor.capture());
    Attributes deliveredAttrs = captor.getValue().getAttributes();
    assertThat(deliveredAttrs.get(LoadBalancer.ATTR_HEALTH_CHECKING_CONFIG))
        .containsExactly("serviceName", "service1");
  } finally {
    LoadBalancerRegistry.getDefaultRegistry().deregister(mockLoadBalancerProvider);
  }
}
/**
 * A resolving OOB channel created through the helper must report the same authority as the
 * parent channel.
 */
@Test
public void createResolvingOobChannel() throws Exception {
  String oobTarget = "fake://second.example.com";
  URI oobUri = new URI(oobTarget);
  FakeNameResolverFactory resolverFactory =
      new FakeNameResolverFactory.Builder(expectedUri, oobUri).build();
  channelBuilder.nameResolverFactory(resolverFactory);
  createChannel();

  ManagedChannel oobChannel = helper.createResolvingOobChannel(oobTarget);
  try {
    assertWithMessage("resolving oob channel should have same authority")
        .that(oobChannel.authority())
        .isEqualTo(channel.authority());
  } finally {
    // Shut the OOB channel down even when the assertion fails.
    oobChannel.shutdownNow();
  }
}
/**
 * Backoff policy provider for tests. Every policy it hands out returns
 * {@code RECONNECT_BACKOFF_INTERVAL_NANOS} multiplied by 1, 2, 3, ... on successive calls.
 */
private static final class FakeBackoffPolicyProvider implements BackoffPolicy.Provider {
  @Override
  public BackoffPolicy get() {
    // Each call builds a fresh policy, so its sequence starts again at one interval.
    return new BackoffPolicy() {
      private int callCount = 0;

      @Override
      public long nextBackoffNanos() {
        callCount++;
        return RECONNECT_BACKOFF_INTERVAL_NANOS * callCount;
      }
    };
  }
}
/**
 * {@link NameResolver.Factory} for tests. It creates resolvers only for the URIs it was built
 * with, keeps every resolver it creates in {@link #resolvers}, and lets a test choose the
 * addresses, attributes, service config or error those resolvers report.
 */
private static final class FakeNameResolverFactory extends NameResolver.Factory {
  // URIs this factory is willing to create resolvers for.
  final List<URI> expectedUris;
  // Addresses reported by every successful resolution.
  final List<EquivalentAddressGroup> servers;
  // Whether a resolver reports a result as soon as it is started.
  final boolean resolvedAtStart;
  // Initial error handed to each new resolver; null means "resolve successfully".
  final Status error;
  // Every resolver created so far, in creation order.
  final ArrayList<FakeNameResolverFactory.FakeNameResolver> resolvers = new ArrayList<>();
  // Service config (or error) attached to the next result; left out of the result when null.
  final AtomicReference<ConfigOrError> nextConfigOrError = new AtomicReference<>();
  // Attributes attached to the next result.
  final AtomicReference<Attributes> nextAttributes = new AtomicReference<>(Attributes.EMPTY);

  FakeNameResolverFactory(
      List<URI> expectedUris,
      List<EquivalentAddressGroup> servers,
      boolean resolvedAtStart,
      Status error) {
    this.expectedUris = expectedUris;
    this.servers = servers;
    this.resolvedAtStart = resolvedAtStart;
    this.error = error;
  }

  /**
   * Creates and records a resolver for {@code targetUri}, or returns {@code null} when the URI
   * is not one of the expected URIs. Also asserts that the default port is {@code DEFAULT_PORT}.
   */
  @Override
  public NameResolver newNameResolver(final URI targetUri, NameResolver.Args args) {
    if (expectedUris.contains(targetUri)) {
      assertEquals(DEFAULT_PORT, args.getDefaultPort());
      FakeNameResolver created = new FakeNameResolver(targetUri, error);
      resolvers.add(created);
      return created;
    }
    return null;
  }

  @Override
  public String getDefaultScheme() {
    return "fake";
  }

  /** Makes every resolver created so far report its result (or error) to its listener. */
  void allResolved() {
    for (FakeNameResolver each : resolvers) {
      each.resolved();
    }
  }

  /** Resolver whose results come from the enclosing factory's current settings. */
  final class FakeNameResolver extends NameResolver {
    final URI targetUri;
    Listener2 listener;
    boolean shutdown;
    int refreshCalled;
    // Not final: when non-null, resolved() reports it instead of a result.
    Status error;

    FakeNameResolver(URI targetUri, Status error) {
      this.targetUri = targetUri;
      this.error = error;
    }

    @Override
    public String getServiceAuthority() {
      return targetUri.getAuthority();
    }

    @Override
    public void start(Listener2 listener) {
      this.listener = listener;
      if (resolvedAtStart) {
        resolved();
      }
    }

    @Override
    public void refresh() {
      refreshCalled++;
      resolved();
    }

    /**
     * Reports to the listener: the error if one is set, otherwise a result built from the
     * factory's servers, the current attributes and, if present, the current service config.
     */
    void resolved() {
      if (error == null) {
        ResolutionResult.Builder result =
            ResolutionResult.newBuilder()
                .setAddresses(servers)
                .setAttributes(nextAttributes.get());
        ConfigOrError serviceConfig = nextConfigOrError.get();
        if (serviceConfig != null) {
          result.setServiceConfig(serviceConfig);
        }
        listener.onResult(result.build());
      } else {
        listener.onError(error);
      }
    }

    @Override
    public void shutdown() {
      shutdown = true;
    }

    @Override
    public String toString() {
      return "FakeNameResolver";
    }
  }

  /**
   * Builder for {@link FakeNameResolverFactory}. Defaults: no servers, resolve at start, no error.
   */
  static final class Builder {
    List<URI> expectedUris;
    List<EquivalentAddressGroup> servers = ImmutableList.of();
    boolean resolvedAtStart = true;
    Status error = null;

    Builder(URI... expectedUris) {
      this.expectedUris = Collections.unmodifiableList(Arrays.asList(expectedUris));
    }

    FakeNameResolverFactory.Builder setServers(List<EquivalentAddressGroup> servers) {
      this.servers = servers;
      return this;
    }

    FakeNameResolverFactory.Builder setResolvedAtStart(boolean resolvedAtStart) {
      this.resolvedAtStart = resolvedAtStart;
      return this;
    }

    FakeNameResolverFactory.Builder setError(Status error) {
      this.error = error;
      return this;
    }

    FakeNameResolverFactory build() {
      return new FakeNameResolverFactory(
          this.expectedUris, this.servers, this.resolvedAtStart, this.error);
    }
  }
}
/** Returns the stats of the instrumented internal subchannel behind {@code subchannel}. */
private static ChannelStats getStats(AbstractSubchannel subchannel) throws Exception {
  final ChannelStats subchannelStats =
      subchannel.getInstrumentedInternalSubchannel().getStats().get();
  return subchannelStats;
}
/** Returns the stats obtained from {@code instrumented.getStats().get()}. */
private static ChannelStats getStats(
    InternalInstrumented<ChannelStats> instrumented) throws Exception {
  final ChannelStats currentStats = instrumented.getStats().get();
  return currentStats;
}
/**
 * Returns the single pending task on {@code timer} that matches
 * {@code NAME_RESOLVER_REFRESH_TASK_FILTER}, or {@code null} when no pending task matches.
 */
private FakeClock.ScheduledTask getNameResolverRefresh() {
  FakeClock.ScheduledTask pendingRefresh =
      Iterables.getOnlyElement(
          timer.getPendingTasks(NAME_RESOLVER_REFRESH_TASK_FILTER),
          null);
  return pendingRefresh;
}
// Helper methods that run Helper and Subchannel calls inside the helper's SynchronizationContext
/**
 * Creates a subchannel for {@code addressGroup} and {@code attrs} and starts it with
 * {@code stateListener}, all inside a task submitted to the helper's SynchronizationContext.
 *
 * @return the subchannel stored by that task, or {@code null} if the task has not stored one by
 *     the time {@code execute} returns
 */
private static Subchannel createSubchannelSafely(
    final Helper helper, final EquivalentAddressGroup addressGroup, final Attributes attrs,
    final SubchannelStateListener stateListener) {
  final AtomicReference<Subchannel> created = new AtomicReference<>();
  Runnable createAndStart = new Runnable() {
    @Override
    public void run() {
      CreateSubchannelArgs subchannelArgs = CreateSubchannelArgs.newBuilder()
          .setAddresses(addressGroup)
          .setAttributes(attrs)
          .build();
      Subchannel subchannel = helper.createSubchannel(subchannelArgs);
      subchannel.start(stateListener);
      created.set(subchannel);
    }
  };
  helper.getSynchronizationContext().execute(createAndStart);
  return created.get();
}
/**
 * Creates a subchannel for {@code addressGroup} and {@code attrs} inside a task submitted to the
 * helper's SynchronizationContext, without calling {@code start} on it.
 *
 * @return the subchannel stored by that task, or {@code null} if the task has not stored one by
 *     the time {@code execute} returns
 */
private static Subchannel createUnstartedSubchannel(
    final Helper helper, final EquivalentAddressGroup addressGroup, final Attributes attrs) {
  final AtomicReference<Subchannel> created = new AtomicReference<>();
  Runnable createOnly = new Runnable() {
    @Override
    public void run() {
      CreateSubchannelArgs subchannelArgs = CreateSubchannelArgs.newBuilder()
          .setAddresses(addressGroup)
          .setAttributes(attrs)
          .build();
      created.set(helper.createSubchannel(subchannelArgs));
    }
  };
  helper.getSynchronizationContext().execute(createOnly);
  return created.get();
}
/** Calls {@code subchannel.requestConnection()} from the helper's SynchronizationContext. */
private static void requestConnectionSafely(Helper helper, final Subchannel subchannel) {
  Runnable connectTask = new Runnable() {
    @Override
    public void run() {
      subchannel.requestConnection();
    }
  };
  helper.getSynchronizationContext().execute(connectTask);
}
/**
 * Calls {@code helper.updateBalancingState(state, picker)} from the helper's
 * SynchronizationContext.
 */
private static void updateBalancingStateSafely(
    final Helper helper, final ConnectivityState state, final SubchannelPicker picker) {
  Runnable updateTask = new Runnable() {
    @Override
    public void run() {
      helper.updateBalancingState(state, picker);
    }
  };
  helper.getSynchronizationContext().execute(updateTask);
}
/** Calls {@code helper.refreshNameResolution()} from the helper's SynchronizationContext. */
private static void refreshNameResolutionSafely(final Helper helper) {
  Runnable refreshTask = new Runnable() {
    @Override
    public void run() {
      helper.refreshNameResolution();
    }
  };
  helper.getSynchronizationContext().execute(refreshTask);
}
/** Calls {@code subchannel.updateAddresses(addrs)} from the helper's SynchronizationContext. */
private static void updateAddressesSafely(
    Helper helper, final Subchannel subchannel, final List<EquivalentAddressGroup> addrs) {
  Runnable updateTask = new Runnable() {
    @Override
    public void run() {
      subchannel.updateAddresses(addrs);
    }
  };
  helper.getSynchronizationContext().execute(updateTask);
}
/** Calls {@code subchannel.shutdown()} from the helper's SynchronizationContext. */
private static void shutdownSafely(
    final Helper helper, final Subchannel subchannel) {
  Runnable shutdownTask = new Runnable() {
    @Override
    public void run() {
      subchannel.shutdown();
    }
  };
  helper.getSynchronizationContext().execute(shutdownTask);
}
/**
 * Parses {@code json} with {@code JsonParser} and returns the result cast to a string-keyed map.
 */
private static Map<String, Object> parseConfig(String json) throws Exception {
  Object parsed = JsonParser.parse(json);
  // The cast itself is unchecked; the warning is suppressed on this one declaration only.
  @SuppressWarnings("unchecked")
  Map<String, Object> config = (Map<String, Object>) parsed;
  return config;
}
/**
 * Builds a {@code ManagedChannelServiceConfig} from {@code rawServiceConfig} and
 * {@code policySelection}.
 */
private static ManagedChannelServiceConfig createManagedChannelServiceConfig(
    Map<String, Object> rawServiceConfig, PolicySelection policySelection) {
  // true, 3 and 4 are dummy values for the retry related params (not used in this test class).
  ManagedChannelServiceConfig serviceConfig =
      ManagedChannelServiceConfig.fromServiceConfig(
          rawServiceConfig, true, 3, 4, policySelection);
  return serviceConfig;
}
}