| /* |
| * Copyright 2015 The gRPC Authors |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package io.grpc.internal; |
| |
| import static com.google.common.base.Preconditions.checkState; |
| import static com.google.common.truth.Truth.assertThat; |
| import static com.google.common.truth.Truth.assertWithMessage; |
| import static io.grpc.ConnectivityState.CONNECTING; |
| import static io.grpc.ConnectivityState.IDLE; |
| import static io.grpc.ConnectivityState.READY; |
| import static io.grpc.ConnectivityState.SHUTDOWN; |
| import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; |
| import static io.grpc.EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE; |
| import static junit.framework.TestCase.assertNotSame; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotEquals; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertSame; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| import static org.mockito.AdditionalAnswers.delegatesTo; |
| import static org.mockito.ArgumentMatchers.any; |
| import static org.mockito.ArgumentMatchers.eq; |
| import static org.mockito.ArgumentMatchers.isA; |
| import static org.mockito.ArgumentMatchers.same; |
| import static org.mockito.Mockito.atLeast; |
| import static org.mockito.Mockito.doAnswer; |
| import static org.mockito.Mockito.doThrow; |
| import static org.mockito.Mockito.inOrder; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.never; |
| import static org.mockito.Mockito.times; |
| import static org.mockito.Mockito.verify; |
| import static org.mockito.Mockito.verifyNoInteractions; |
| import static org.mockito.Mockito.verifyNoMoreInteractions; |
| import static org.mockito.Mockito.when; |
| |
| import com.google.common.base.Throwables; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.Iterables; |
| import com.google.common.util.concurrent.ListenableFuture; |
| import com.google.common.util.concurrent.MoreExecutors; |
| import com.google.common.util.concurrent.SettableFuture; |
| import io.grpc.Attributes; |
| import io.grpc.BinaryLog; |
| import io.grpc.CallCredentials; |
| import io.grpc.CallCredentials.RequestInfo; |
| import io.grpc.CallOptions; |
| import io.grpc.Channel; |
| import io.grpc.ChannelCredentials; |
| import io.grpc.ChannelLogger; |
| import io.grpc.ClientCall; |
| import io.grpc.ClientInterceptor; |
| import io.grpc.ClientInterceptors; |
| import io.grpc.ClientStreamTracer; |
| import io.grpc.CompositeChannelCredentials; |
| import io.grpc.ConnectivityState; |
| import io.grpc.ConnectivityStateInfo; |
| import io.grpc.Context; |
| import io.grpc.EquivalentAddressGroup; |
| import io.grpc.InsecureChannelCredentials; |
| import io.grpc.IntegerMarshaller; |
| import io.grpc.InternalChannelz; |
| import io.grpc.InternalChannelz.ChannelStats; |
| import io.grpc.InternalChannelz.ChannelTrace; |
| import io.grpc.InternalConfigSelector; |
| import io.grpc.InternalInstrumented; |
| import io.grpc.LoadBalancer; |
| import io.grpc.LoadBalancer.CreateSubchannelArgs; |
| import io.grpc.LoadBalancer.Helper; |
| import io.grpc.LoadBalancer.PickResult; |
| import io.grpc.LoadBalancer.PickSubchannelArgs; |
| import io.grpc.LoadBalancer.ResolvedAddresses; |
| import io.grpc.LoadBalancer.Subchannel; |
| import io.grpc.LoadBalancer.SubchannelPicker; |
| import io.grpc.LoadBalancer.SubchannelStateListener; |
| import io.grpc.LoadBalancerProvider; |
| import io.grpc.LoadBalancerRegistry; |
| import io.grpc.ManagedChannel; |
| import io.grpc.Metadata; |
| import io.grpc.MethodDescriptor; |
| import io.grpc.MethodDescriptor.MethodType; |
| import io.grpc.NameResolver; |
| import io.grpc.NameResolver.ConfigOrError; |
| import io.grpc.NameResolver.ResolutionResult; |
| import io.grpc.NameResolverRegistry; |
| import io.grpc.ProxiedSocketAddress; |
| import io.grpc.ProxyDetector; |
| import io.grpc.SecurityLevel; |
| import io.grpc.ServerMethodDefinition; |
| import io.grpc.Status; |
| import io.grpc.Status.Code; |
| import io.grpc.StringMarshaller; |
| import io.grpc.internal.ClientTransportFactory.ClientTransportOptions; |
| import io.grpc.internal.ClientTransportFactory.SwapChannelCredentialsResult; |
| import io.grpc.internal.InternalSubchannel.TransportLogger; |
| import io.grpc.internal.ManagedChannelImpl.ScParser; |
| import io.grpc.internal.ManagedChannelImplBuilder.ClientTransportFactoryBuilder; |
| import io.grpc.internal.ManagedChannelImplBuilder.FixedPortProvider; |
| import io.grpc.internal.ManagedChannelImplBuilder.UnsupportedClientTransportFactoryBuilder; |
| import io.grpc.internal.ServiceConfigUtil.PolicySelection; |
| import io.grpc.internal.TestUtils.MockClientTransportInfo; |
| import io.grpc.stub.ClientCalls; |
| import io.grpc.testing.TestMethodDescriptors; |
| import io.grpc.util.ForwardingSubchannel; |
| import java.io.IOException; |
| import java.net.SocketAddress; |
| import java.net.URI; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Random; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.atomic.AtomicReference; |
| import javax.annotation.Nullable; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.JUnit4; |
| import org.mockito.ArgumentCaptor; |
| import org.mockito.Captor; |
| import org.mockito.InOrder; |
| import org.mockito.Mock; |
| import org.mockito.invocation.InvocationOnMock; |
| import org.mockito.junit.MockitoJUnit; |
| import org.mockito.junit.MockitoRule; |
| import org.mockito.stubbing.Answer; |
| |
| /** Unit tests for {@link ManagedChannelImpl}. */ |
| @RunWith(JUnit4.class) |
| // TODO(creamsoup) remove backward compatible check when fully migrated |
| @SuppressWarnings("deprecation") |
| public class ManagedChannelImplTest { |
| private static final int DEFAULT_PORT = 447; |
| |
| private static final MethodDescriptor<String, Integer> method = |
| MethodDescriptor.<String, Integer>newBuilder() |
| .setType(MethodType.UNKNOWN) |
| .setFullMethodName("service/method") |
| .setRequestMarshaller(new StringMarshaller()) |
| .setResponseMarshaller(new IntegerMarshaller()) |
| .build(); |
| private static final Attributes.Key<String> SUBCHANNEL_ATTR_KEY = |
| Attributes.Key.create("subchannel-attr-key"); |
| private static final long RECONNECT_BACKOFF_INTERVAL_NANOS = 10; |
| private static final String SERVICE_NAME = "fake.example.com"; |
| private static final String AUTHORITY = SERVICE_NAME; |
| private static final String USER_AGENT = "userAgent"; |
| private static final ClientTransportOptions clientTransportOptions = |
| new ClientTransportOptions() |
| .setAuthority(AUTHORITY) |
| .setUserAgent(USER_AGENT); |
| private static final String TARGET = "fake://" + SERVICE_NAME; |
| private static final String MOCK_POLICY_NAME = "mock_lb"; |
| private URI expectedUri; |
| private final SocketAddress socketAddress = |
| new SocketAddress() { |
| @Override |
| public String toString() { |
| return "test-addr"; |
| } |
| }; |
| private final SocketAddress socketAddress2 = |
| new SocketAddress() { |
| @Override |
| public String toString() { |
| return "test-addr"; |
| } |
| }; |
| private final EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(socketAddress); |
| private final EquivalentAddressGroup addressGroup2 = |
| new EquivalentAddressGroup(Arrays.asList(socketAddress, socketAddress2)); |
| private final FakeClock timer = new FakeClock(); |
| private final FakeClock executor = new FakeClock(); |
| private final FakeClock balancerRpcExecutor = new FakeClock(); |
| private static final FakeClock.TaskFilter NAME_RESOLVER_REFRESH_TASK_FILTER = |
| new FakeClock.TaskFilter() { |
| @Override |
| public boolean shouldAccept(Runnable command) { |
| return command.toString().contains( |
| ManagedChannelImpl.DelayedNameResolverRefresh.class.getName()); |
| } |
| }; |
| |
| private final InternalChannelz channelz = new InternalChannelz(); |
| |
| @Rule public final MockitoRule mocks = MockitoJUnit.rule(); |
| |
| private ManagedChannelImpl channel; |
| private Helper helper; |
| @Captor |
| private ArgumentCaptor<Status> statusCaptor; |
| @Captor |
| private ArgumentCaptor<CallOptions> callOptionsCaptor; |
| @Mock |
| private LoadBalancer mockLoadBalancer; |
| @Mock |
| private SubchannelStateListener subchannelStateListener; |
| private final LoadBalancerProvider mockLoadBalancerProvider = |
| mock(LoadBalancerProvider.class, delegatesTo(new LoadBalancerProvider() { |
| @Override |
| public LoadBalancer newLoadBalancer(Helper helper) { |
| return mockLoadBalancer; |
| } |
| |
| @Override |
| public boolean isAvailable() { |
| return true; |
| } |
| |
| @Override |
| public int getPriority() { |
| return 999; |
| } |
| |
| @Override |
| public String getPolicyName() { |
| return MOCK_POLICY_NAME; |
| } |
| })); |
| |
| @Captor |
| private ArgumentCaptor<ConnectivityStateInfo> stateInfoCaptor; |
| @Mock |
| private SubchannelPicker mockPicker; |
| @Mock |
| private ClientTransportFactory mockTransportFactory; |
| @Mock |
| private ClientCall.Listener<Integer> mockCallListener; |
| @Mock |
| private ClientCall.Listener<Integer> mockCallListener2; |
| @Mock |
| private ClientCall.Listener<Integer> mockCallListener3; |
| @Mock |
| private ClientCall.Listener<Integer> mockCallListener4; |
| @Mock |
| private ClientCall.Listener<Integer> mockCallListener5; |
| @Mock |
| private ObjectPool<Executor> executorPool; |
| @Mock |
| private ObjectPool<Executor> balancerRpcExecutorPool; |
| @Mock |
| private CallCredentials creds; |
| @Mock |
| private Executor offloadExecutor; |
| private ManagedChannelImplBuilder channelBuilder; |
| private boolean requestConnection = true; |
| private BlockingQueue<MockClientTransportInfo> transports; |
| private boolean panicExpected; |
| @Captor |
| private ArgumentCaptor<ResolvedAddresses> resolvedAddressCaptor; |
| |
| private ArgumentCaptor<ClientStreamListener> streamListenerCaptor = |
| ArgumentCaptor.forClass(ClientStreamListener.class); |
| |
| private void createChannel(ClientInterceptor... interceptors) { |
| checkState(channel == null); |
| |
| channel = new ManagedChannelImpl( |
| channelBuilder, mockTransportFactory, new FakeBackoffPolicyProvider(), |
| balancerRpcExecutorPool, timer.getStopwatchSupplier(), Arrays.asList(interceptors), |
| timer.getTimeProvider()); |
| |
| if (requestConnection) { |
| int numExpectedTasks = 0; |
| |
| // Force-exit the initial idle-mode |
| channel.syncContext.execute(new Runnable() { |
| @Override |
| public void run() { |
| channel.exitIdleMode(); |
| } |
| }); |
| if (channelBuilder.idleTimeoutMillis != ManagedChannelImpl.IDLE_TIMEOUT_MILLIS_DISABLE) { |
| numExpectedTasks += 1; |
| } |
| |
| if (getNameResolverRefresh() != null) { |
| numExpectedTasks += 1; |
| } |
| |
| assertEquals(numExpectedTasks, timer.numPendingTasks()); |
| |
| ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(null); |
| verify(mockLoadBalancerProvider).newLoadBalancer(helperCaptor.capture()); |
| helper = helperCaptor.getValue(); |
| } |
| } |
| |
| @Before |
| public void setUp() throws Exception { |
| when(mockLoadBalancer.canHandleEmptyAddressListFromNameResolution()).thenCallRealMethod(); |
| LoadBalancerRegistry.getDefaultRegistry().register(mockLoadBalancerProvider); |
| expectedUri = new URI(TARGET); |
| transports = TestUtils.captureTransports(mockTransportFactory); |
| when(mockTransportFactory.getScheduledExecutorService()) |
| .thenReturn(timer.getScheduledExecutorService()); |
| when(executorPool.getObject()).thenReturn(executor.getScheduledExecutorService()); |
| when(balancerRpcExecutorPool.getObject()) |
| .thenReturn(balancerRpcExecutor.getScheduledExecutorService()); |
| |
| channelBuilder = new ManagedChannelImplBuilder(TARGET, |
| new UnsupportedClientTransportFactoryBuilder(), new FixedPortProvider(DEFAULT_PORT)); |
| configureBuilder(channelBuilder); |
| } |
| |
| private void configureBuilder(ManagedChannelImplBuilder channelBuilder) { |
| channelBuilder |
| .nameResolverFactory(new FakeNameResolverFactory.Builder(expectedUri).build()) |
| .defaultLoadBalancingPolicy(MOCK_POLICY_NAME) |
| .userAgent(USER_AGENT) |
| .idleTimeout(ManagedChannelImplBuilder.IDLE_MODE_MAX_TIMEOUT_DAYS, TimeUnit.DAYS) |
| .offloadExecutor(offloadExecutor); |
| channelBuilder.executorPool = executorPool; |
| channelBuilder.binlog = null; |
| channelBuilder.channelz = channelz; |
| } |
| |
| @After |
| public void allPendingTasksAreRun() throws Exception { |
| // The "never" verifications in the tests only hold up if all due tasks are done. |
| // As for timer, although there may be scheduled tasks in a future time, since we don't test |
| // any time-related behavior in this test suite, we only care the tasks that are due. This |
| // would ignore any time-sensitive tasks, e.g., back-off and the idle timer. |
| assertTrue(timer.getDueTasks() + " should be empty", timer.getDueTasks().isEmpty()); |
| assertEquals(executor.getPendingTasks() + " should be empty", 0, executor.numPendingTasks()); |
| if (channel != null) { |
| if (!panicExpected) { |
| assertFalse(channel.isInPanicMode()); |
| } |
| channel.shutdownNow(); |
| channel = null; |
| } |
| } |
| |
| @After |
| public void cleanUp() { |
| LoadBalancerRegistry.getDefaultRegistry().deregister(mockLoadBalancerProvider); |
| } |
| |
| @Test |
| public void createSubchannel_outsideSynchronizationContextShouldThrow() { |
| createChannel(); |
| try { |
| helper.createSubchannel(CreateSubchannelArgs.newBuilder() |
| .setAddresses(addressGroup) |
| .build()); |
| fail("Should throw"); |
| } catch (IllegalStateException e) { |
| assertThat(e).hasMessageThat().isEqualTo("Not called from the SynchronizationContext"); |
| } |
| } |
| |
| @Test |
| public void createSubchannel_resolverOverrideAuthority() { |
| EquivalentAddressGroup addressGroup = new EquivalentAddressGroup( |
| socketAddress, |
| Attributes.newBuilder() |
| .set(ATTR_AUTHORITY_OVERRIDE, "resolver.override.authority") |
| .build()); |
| channelBuilder.nameResolverFactory( |
| new FakeNameResolverFactory.Builder(expectedUri) |
| .setServers(Collections.singletonList(addressGroup)) |
| .build()); |
| createChannel(); |
| |
| Subchannel subchannel = |
| createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener); |
| requestConnectionSafely(helper, subchannel); |
| ArgumentCaptor<ClientTransportOptions> transportOptionCaptor = ArgumentCaptor.forClass(null); |
| verify(mockTransportFactory) |
| .newClientTransport( |
| any(SocketAddress.class), transportOptionCaptor.capture(), any(ChannelLogger.class)); |
| assertThat(transportOptionCaptor.getValue().getAuthority()) |
| .isEqualTo("resolver.override.authority"); |
| } |
| |
| @Test |
| public void createSubchannel_channelBuilderOverrideAuthority() { |
| channelBuilder.overrideAuthority("channel-builder.override.authority"); |
| EquivalentAddressGroup addressGroup = new EquivalentAddressGroup( |
| socketAddress, |
| Attributes.newBuilder() |
| .set(ATTR_AUTHORITY_OVERRIDE, "resolver.override.authority") |
| .build()); |
| channelBuilder.nameResolverFactory( |
| new FakeNameResolverFactory.Builder(expectedUri) |
| .setServers(Collections.singletonList(addressGroup)) |
| .build()); |
| createChannel(); |
| |
| final Subchannel subchannel = |
| createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener); |
| requestConnectionSafely(helper, subchannel); |
| ArgumentCaptor<ClientTransportOptions> transportOptionCaptor = ArgumentCaptor.forClass(null); |
| verify(mockTransportFactory) |
| .newClientTransport( |
| any(SocketAddress.class), transportOptionCaptor.capture(), any(ChannelLogger.class)); |
| assertThat(transportOptionCaptor.getValue().getAuthority()) |
| .isEqualTo("channel-builder.override.authority"); |
| final List<EquivalentAddressGroup> subchannelEags = new ArrayList<>(); |
| helper.getSynchronizationContext().execute( |
| new Runnable() { |
| @Override |
| public void run() { |
| subchannelEags.addAll(subchannel.getAllAddresses()); |
| } |
| }); |
| assertThat(subchannelEags).isEqualTo(ImmutableList.of(addressGroup)); |
| } |
| |
| @Test |
| public void idleModeDisabled() { |
| channelBuilder.nameResolverFactory( |
| new FakeNameResolverFactory.Builder(expectedUri) |
| .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress))) |
| .build()); |
| createChannel(); |
| |
| // In this test suite, the channel is always created with idle mode disabled. |
| // No task is scheduled to enter idle mode |
| assertEquals(0, timer.numPendingTasks()); |
| assertEquals(0, executor.numPendingTasks()); |
| } |
| |
| @Test |
| public void immediateDeadlineExceeded() { |
| createChannel(); |
| ClientCall<String, Integer> call = |
| channel.newCall(method, CallOptions.DEFAULT.withDeadlineAfter(0, TimeUnit.NANOSECONDS)); |
| call.start(mockCallListener, new Metadata()); |
| assertEquals(1, executor.runDueTasks()); |
| |
| verify(mockCallListener).onClose(statusCaptor.capture(), any(Metadata.class)); |
| Status status = statusCaptor.getValue(); |
| assertSame(Status.DEADLINE_EXCEEDED.getCode(), status.getCode()); |
| } |
| |
| @Test |
| public void startCallBeforeNameResolution() throws Exception { |
| FakeNameResolverFactory nameResolverFactory = |
| new FakeNameResolverFactory.Builder(expectedUri) |
| .setServers(ImmutableList.of(addressGroup)).build(); |
| channelBuilder.nameResolverFactory(nameResolverFactory); |
| channel = new ManagedChannelImpl( |
| channelBuilder, mockTransportFactory, new FakeBackoffPolicyProvider(), |
| balancerRpcExecutorPool, timer.getStopwatchSupplier(), |
| Collections.<ClientInterceptor>emptyList(), timer.getTimeProvider()); |
| Map<String, Object> rawServiceConfig = |
| parseConfig("{\"methodConfig\":[{" |
| + "\"name\":[{\"service\":\"service\"}]," |
| + "\"waitForReady\":true}]}"); |
| ManagedChannelServiceConfig managedChannelServiceConfig = |
| createManagedChannelServiceConfig(rawServiceConfig, null); |
| nameResolverFactory.nextConfigOrError.set( |
| ConfigOrError.fromConfig(managedChannelServiceConfig)); |
| Metadata headers = new Metadata(); |
| ClientStream mockStream = mock(ClientStream.class); |
| ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT); |
| call.start(mockCallListener, headers); |
| |
| ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(null); |
| verify(mockLoadBalancerProvider).newLoadBalancer(helperCaptor.capture()); |
| helper = helperCaptor.getValue(); |
| // Make the transport available |
| Subchannel subchannel = |
| createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener); |
| verify(mockTransportFactory, never()) |
| .newClientTransport( |
| any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class)); |
| requestConnectionSafely(helper, subchannel); |
| verify(mockTransportFactory) |
| .newClientTransport( |
| any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class)); |
| MockClientTransportInfo transportInfo = transports.poll(); |
| ConnectionClientTransport mockTransport = transportInfo.transport; |
| ManagedClientTransport.Listener transportListener = transportInfo.listener; |
| when(mockTransport.newStream(same(method), same(headers), any(CallOptions.class))) |
| .thenReturn(mockStream); |
| transportListener.transportReady(); |
| when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) |
| .thenReturn(PickResult.withSubchannel(subchannel)); |
| updateBalancingStateSafely(helper, READY, mockPicker); |
| executor.runDueTasks(); |
| |
| ArgumentCaptor<CallOptions> callOptionsCaptor = ArgumentCaptor.forClass(null); |
| verify(mockTransport).newStream(same(method), same(headers), callOptionsCaptor.capture()); |
| assertThat(callOptionsCaptor.getValue().isWaitForReady()).isTrue(); |
| verify(mockStream).start(streamListenerCaptor.capture()); |
| |
| // Clean up as much as possible to allow the channel to terminate. |
| shutdownSafely(helper, subchannel); |
| timer.forwardNanos( |
| TimeUnit.SECONDS.toNanos(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS)); |
| } |
| |
| @Test |
| public void newCallWithConfigSelector() { |
| FakeNameResolverFactory nameResolverFactory = |
| new FakeNameResolverFactory.Builder(expectedUri) |
| .setServers(ImmutableList.of(addressGroup)).build(); |
| channelBuilder.nameResolverFactory(nameResolverFactory); |
| channel = new ManagedChannelImpl( |
| channelBuilder, mockTransportFactory, new FakeBackoffPolicyProvider(), |
| balancerRpcExecutorPool, timer.getStopwatchSupplier(), |
| Collections.<ClientInterceptor>emptyList(), timer.getTimeProvider()); |
| nameResolverFactory.nextConfigOrError.set( |
| ConfigOrError.fromConfig(ManagedChannelServiceConfig.empty())); |
| final Metadata.Key<String> metadataKey = |
| Metadata.Key.of("test", Metadata.ASCII_STRING_MARSHALLER); |
| final CallOptions.Key<String> callOptionsKey = CallOptions.Key.create("test"); |
| InternalConfigSelector configSelector = new InternalConfigSelector() { |
| @Override |
| public Result selectConfig(final PickSubchannelArgs args) { |
| return Result.newBuilder() |
| .setConfig(ManagedChannelServiceConfig.empty()) |
| .setInterceptor( |
| // An interceptor that mutates CallOptions based on headers value. |
| new ClientInterceptor() { |
| String value = args.getHeaders().get(metadataKey); |
| @Override |
| public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall( |
| MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) { |
| callOptions = callOptions.withOption(callOptionsKey, value); |
| return next.newCall(method, callOptions); |
| } |
| }) |
| .build(); |
| } |
| }; |
| nameResolverFactory.nextAttributes.set( |
| Attributes.newBuilder().set(InternalConfigSelector.KEY, configSelector).build()); |
| channel.getState(true); |
| Metadata headers = new Metadata(); |
| headers.put(metadataKey, "fooValue"); |
| ClientStream mockStream = mock(ClientStream.class); |
| ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT); |
| call.start(mockCallListener, headers); |
| |
| ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(null); |
| verify(mockLoadBalancerProvider).newLoadBalancer(helperCaptor.capture()); |
| helper = helperCaptor.getValue(); |
| // Make the transport available |
| Subchannel subchannel = |
| createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener); |
| requestConnectionSafely(helper, subchannel); |
| verify(mockTransportFactory) |
| .newClientTransport( |
| any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class)); |
| MockClientTransportInfo transportInfo = transports.poll(); |
| ConnectionClientTransport mockTransport = transportInfo.transport; |
| ManagedClientTransport.Listener transportListener = transportInfo.listener; |
| when(mockTransport.newStream(same(method), same(headers), any(CallOptions.class))) |
| .thenReturn(mockStream); |
| transportListener.transportReady(); |
| when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) |
| .thenReturn(PickResult.withSubchannel(subchannel)); |
| updateBalancingStateSafely(helper, READY, mockPicker); |
| executor.runDueTasks(); |
| |
| ArgumentCaptor<CallOptions> callOptionsCaptor = ArgumentCaptor.forClass(null); |
| verify(mockTransport).newStream(same(method), same(headers), callOptionsCaptor.capture()); |
| assertThat(callOptionsCaptor.getValue().getOption(callOptionsKey)).isEqualTo("fooValue"); |
| verify(mockStream).start(streamListenerCaptor.capture()); |
| |
| // Clean up as much as possible to allow the channel to terminate. |
| shutdownSafely(helper, subchannel); |
| timer.forwardNanos( |
| TimeUnit.SECONDS.toNanos(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS)); |
| } |
| |
| @Test |
| public void shutdownWithNoTransportsEverCreated() { |
| channelBuilder.nameResolverFactory( |
| new FakeNameResolverFactory.Builder(expectedUri) |
| .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress))) |
| .build()); |
| createChannel(); |
| verify(executorPool).getObject(); |
| verify(executorPool, never()).returnObject(any()); |
| channel.shutdown(); |
| assertTrue(channel.isShutdown()); |
| assertTrue(channel.isTerminated()); |
| verify(executorPool).returnObject(executor.getScheduledExecutorService()); |
| } |
| |
| @Test |
| public void shutdownNow_pendingCallShouldFail() { |
| channelBuilder.nameResolverFactory( |
| new FakeNameResolverFactory.Builder(expectedUri) |
| .setResolvedAtStart(false) |
| .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress))) |
| .build()); |
| createChannel(); |
| ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT); |
| call.start(mockCallListener, new Metadata()); |
| channel.shutdown(); |
| executor.runDueTasks(); |
| verify(mockCallListener, never()).onClose(any(Status.class), any(Metadata.class)); |
| channel.shutdownNow(); |
| executor.runDueTasks(); |
| verify(mockCallListener).onClose(statusCaptor.capture(), any(Metadata.class)); |
| assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED); |
| } |
| |
| @Test |
| public void shutdownWithNoNameResolution_newCallShouldFail() { |
| channelBuilder.nameResolverFactory( |
| new FakeNameResolverFactory.Builder(expectedUri) |
| .setResolvedAtStart(false) |
| .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress))) |
| .build()); |
| createChannel(); |
| channel.shutdown(); |
| ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT); |
| call.start(mockCallListener, new Metadata()); |
| executor.runDueTasks(); |
| verify(mockCallListener).onClose(statusCaptor.capture(), any(Metadata.class)); |
| assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); |
| } |
| |
| @Test |
| public void channelzMembership() throws Exception { |
| createChannel(); |
| assertNotNull(channelz.getRootChannel(channel.getLogId().getId())); |
| assertFalse(channelz.containsSubchannel(channel.getLogId())); |
| channel.shutdownNow(); |
| channel.awaitTermination(5, TimeUnit.SECONDS); |
| assertNull(channelz.getRootChannel(channel.getLogId().getId())); |
| assertFalse(channelz.containsSubchannel(channel.getLogId())); |
| } |
| |
| @Test |
| public void channelzMembership_subchannel() throws Exception { |
| createChannel(); |
| assertNotNull(channelz.getRootChannel(channel.getLogId().getId())); |
| |
| AbstractSubchannel subchannel = |
| (AbstractSubchannel) createSubchannelSafely( |
| helper, addressGroup, Attributes.EMPTY, subchannelStateListener); |
| // subchannels are not root channels |
| assertNull( |
| channelz.getRootChannel(subchannel.getInstrumentedInternalSubchannel().getLogId().getId())); |
| assertTrue( |
| channelz.containsSubchannel(subchannel.getInstrumentedInternalSubchannel().getLogId())); |
| assertThat(getStats(channel).subchannels) |
| .containsExactly(subchannel.getInstrumentedInternalSubchannel()); |
| |
| requestConnectionSafely(helper, subchannel); |
| MockClientTransportInfo transportInfo = transports.poll(); |
| assertNotNull(transportInfo); |
| assertTrue(channelz.containsClientSocket(transportInfo.transport.getLogId())); |
| transportInfo.listener.transportReady(); |
| |
| // terminate transport |
| transportInfo.listener.transportShutdown(Status.CANCELLED); |
| transportInfo.listener.transportTerminated(); |
| assertFalse(channelz.containsClientSocket(transportInfo.transport.getLogId())); |
| |
| // terminate subchannel |
| assertTrue( |
| channelz.containsSubchannel(subchannel.getInstrumentedInternalSubchannel().getLogId())); |
| shutdownSafely(helper, subchannel); |
| timer.forwardTime(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS); |
| timer.runDueTasks(); |
| assertFalse( |
| channelz.containsSubchannel(subchannel.getInstrumentedInternalSubchannel().getLogId())); |
| assertThat(getStats(channel).subchannels).isEmpty(); |
| |
| // channel still appears |
| assertNotNull(channelz.getRootChannel(channel.getLogId().getId())); |
| } |
| |
| @Test |
| public void channelzMembership_oob() throws Exception { |
| createChannel(); |
| OobChannel oob = (OobChannel) helper.createOobChannel( |
| Collections.singletonList(addressGroup), AUTHORITY); |
| // oob channels are not root channels |
| assertNull(channelz.getRootChannel(oob.getLogId().getId())); |
| assertTrue(channelz.containsSubchannel(oob.getLogId())); |
| assertThat(getStats(channel).subchannels).containsExactly(oob); |
| assertTrue(channelz.containsSubchannel(oob.getLogId())); |
| |
| AbstractSubchannel subchannel = (AbstractSubchannel) oob.getSubchannel(); |
| assertTrue( |
| channelz.containsSubchannel(subchannel.getInstrumentedInternalSubchannel().getLogId())); |
| assertThat(getStats(oob).subchannels) |
| .containsExactly(subchannel.getInstrumentedInternalSubchannel()); |
| assertTrue( |
| channelz.containsSubchannel(subchannel.getInstrumentedInternalSubchannel().getLogId())); |
| |
| oob.getSubchannel().requestConnection(); |
| MockClientTransportInfo transportInfo = transports.poll(); |
| assertNotNull(transportInfo); |
| assertTrue(channelz.containsClientSocket(transportInfo.transport.getLogId())); |
| |
| // terminate transport |
| transportInfo.listener.transportShutdown(Status.INTERNAL); |
| transportInfo.listener.transportTerminated(); |
| assertFalse(channelz.containsClientSocket(transportInfo.transport.getLogId())); |
| |
| // terminate oobchannel |
| oob.shutdown(); |
| assertFalse(channelz.containsSubchannel(oob.getLogId())); |
| assertThat(getStats(channel).subchannels).isEmpty(); |
| assertFalse( |
| channelz.containsSubchannel(subchannel.getInstrumentedInternalSubchannel().getLogId())); |
| |
| // channel still appears |
| assertNotNull(channelz.getRootChannel(channel.getLogId().getId())); |
| } |
| |
| @Test |
| public void callsAndShutdown() { |
| subtestCallsAndShutdown(false, false); |
| } |
| |
| @Test |
| public void callsAndShutdownNow() { |
| subtestCallsAndShutdown(true, false); |
| } |
| |
| /** Make sure shutdownNow() after shutdown() has an effect. */ |
| @Test |
| public void callsAndShutdownAndShutdownNow() { |
| subtestCallsAndShutdown(false, true); |
| } |
| |
| private void subtestCallsAndShutdown(boolean shutdownNow, boolean shutdownNowAfterShutdown) { |
| FakeNameResolverFactory nameResolverFactory = |
| new FakeNameResolverFactory.Builder(expectedUri).build(); |
| channelBuilder.nameResolverFactory(nameResolverFactory); |
| createChannel(); |
| verify(executorPool).getObject(); |
| ClientStream mockStream = mock(ClientStream.class); |
| ClientStream mockStream2 = mock(ClientStream.class); |
| Metadata headers = new Metadata(); |
| Metadata headers2 = new Metadata(); |
| |
| // Configure the picker so that first RPC goes to delayed transport, and second RPC goes to |
| // real transport. |
| Subchannel subchannel = |
| createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener); |
| requestConnectionSafely(helper, subchannel); |
| verify(mockTransportFactory) |
| .newClientTransport( |
| any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class)); |
| MockClientTransportInfo transportInfo = transports.poll(); |
| ConnectionClientTransport mockTransport = transportInfo.transport; |
| verify(mockTransport).start(any(ManagedClientTransport.Listener.class)); |
| ManagedClientTransport.Listener transportListener = transportInfo.listener; |
| when(mockTransport.newStream(same(method), same(headers), same(CallOptions.DEFAULT))) |
| .thenReturn(mockStream); |
| when(mockTransport.newStream(same(method), same(headers2), same(CallOptions.DEFAULT))) |
| .thenReturn(mockStream2); |
| transportListener.transportReady(); |
| when(mockPicker.pickSubchannel( |
| new PickSubchannelArgsImpl(method, headers, CallOptions.DEFAULT))).thenReturn( |
| PickResult.withNoResult()); |
| when(mockPicker.pickSubchannel( |
| new PickSubchannelArgsImpl(method, headers2, CallOptions.DEFAULT))).thenReturn( |
| PickResult.withSubchannel(subchannel)); |
| updateBalancingStateSafely(helper, READY, mockPicker); |
| |
| // First RPC, will be pending |
| ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT); |
| verify(mockTransportFactory) |
| .newClientTransport( |
| any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class)); |
| call.start(mockCallListener, headers); |
| |
| verify(mockTransport, never()) |
| .newStream(same(method), same(headers), same(CallOptions.DEFAULT)); |
| |
| // Second RPC, will be assigned to the real transport |
| ClientCall<String, Integer> call2 = channel.newCall(method, CallOptions.DEFAULT); |
| call2.start(mockCallListener2, headers2); |
| verify(mockTransport).newStream(same(method), same(headers2), same(CallOptions.DEFAULT)); |
| verify(mockTransport).newStream(same(method), same(headers2), same(CallOptions.DEFAULT)); |
| verify(mockStream2).start(any(ClientStreamListener.class)); |
| |
| // Shutdown |
| if (shutdownNow) { |
| channel.shutdownNow(); |
| } else { |
| channel.shutdown(); |
| if (shutdownNowAfterShutdown) { |
| channel.shutdownNow(); |
| shutdownNow = true; |
| } |
| } |
| assertTrue(channel.isShutdown()); |
| assertFalse(channel.isTerminated()); |
| assertThat(nameResolverFactory.resolvers).hasSize(1); |
| verify(mockLoadBalancerProvider).newLoadBalancer(any(Helper.class)); |
| |
| // Further calls should fail without going to the transport |
| ClientCall<String, Integer> call3 = channel.newCall(method, CallOptions.DEFAULT); |
| call3.start(mockCallListener3, headers2); |
| timer.runDueTasks(); |
| executor.runDueTasks(); |
| |
| verify(mockCallListener3).onClose(statusCaptor.capture(), any(Metadata.class)); |
| assertSame(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode()); |
| |
| if (shutdownNow) { |
| // LoadBalancer and NameResolver are shut down as soon as delayed transport is terminated. |
| verify(mockLoadBalancer).shutdown(); |
| assertTrue(nameResolverFactory.resolvers.get(0).shutdown); |
| // call should have been aborted by delayed transport |
| executor.runDueTasks(); |
| verify(mockCallListener).onClose(same(ManagedChannelImpl.SHUTDOWN_NOW_STATUS), |
| any(Metadata.class)); |
| } else { |
| // LoadBalancer and NameResolver are still running. |
| verify(mockLoadBalancer, never()).shutdown(); |
| assertFalse(nameResolverFactory.resolvers.get(0).shutdown); |
| // call and call2 are still alive, and can still be assigned to a real transport |
| SubchannelPicker picker2 = mock(SubchannelPicker.class); |
| when(picker2.pickSubchannel(new PickSubchannelArgsImpl(method, headers, CallOptions.DEFAULT))) |
| .thenReturn(PickResult.withSubchannel(subchannel)); |
| updateBalancingStateSafely(helper, READY, picker2); |
| executor.runDueTasks(); |
| verify(mockTransport).newStream(same(method), same(headers), same(CallOptions.DEFAULT)); |
| verify(mockStream).start(any(ClientStreamListener.class)); |
| } |
| |
| // After call is moved out of delayed transport, LoadBalancer, NameResolver and the transports |
| // will be shutdown. |
| verify(mockLoadBalancer).shutdown(); |
| assertTrue(nameResolverFactory.resolvers.get(0).shutdown); |
| |
| if (shutdownNow) { |
| // Channel shutdownNow() all subchannels after shutting down LoadBalancer |
| verify(mockTransport).shutdownNow(ManagedChannelImpl.SHUTDOWN_NOW_STATUS); |
| } else { |
| verify(mockTransport, never()).shutdownNow(any(Status.class)); |
| } |
| // LoadBalancer should shutdown the subchannel |
| shutdownSafely(helper, subchannel); |
| if (shutdownNow) { |
| verify(mockTransport).shutdown(same(ManagedChannelImpl.SHUTDOWN_NOW_STATUS)); |
| } else { |
| verify(mockTransport).shutdown(same(ManagedChannelImpl.SHUTDOWN_STATUS)); |
| } |
| |
| // Killing the remaining real transport will terminate the channel |
| transportListener.transportShutdown(Status.UNAVAILABLE); |
| assertFalse(channel.isTerminated()); |
| verify(executorPool, never()).returnObject(any()); |
| transportListener.transportTerminated(); |
| assertTrue(channel.isTerminated()); |
| verify(executorPool).returnObject(executor.getScheduledExecutorService()); |
| verifyNoMoreInteractions(balancerRpcExecutorPool); |
| |
| verify(mockTransportFactory) |
| .newClientTransport( |
| any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class)); |
| verify(mockTransportFactory).close(); |
| verify(mockTransport, atLeast(0)).getLogId(); |
| verifyNoMoreInteractions(mockTransport); |
| } |
| |
| @Test |
| public void noMoreCallbackAfterLoadBalancerShutdown() { |
| FakeNameResolverFactory nameResolverFactory = |
| new FakeNameResolverFactory.Builder(expectedUri) |
| .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress))) |
| .build(); |
| channelBuilder.nameResolverFactory(nameResolverFactory); |
| Status resolutionError = Status.UNAVAILABLE.withDescription("Resolution failed"); |
| createChannel(); |
| |
| FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.get(0); |
| verify(mockLoadBalancerProvider).newLoadBalancer(any(Helper.class)); |
| verify(mockLoadBalancer).handleResolvedAddresses(resolvedAddressCaptor.capture()); |
| assertThat(resolvedAddressCaptor.getValue().getAddresses()).containsExactly(addressGroup); |
| |
| SubchannelStateListener stateListener1 = mock(SubchannelStateListener.class); |
| SubchannelStateListener stateListener2 = mock(SubchannelStateListener.class); |
| Subchannel subchannel1 = |
| createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, stateListener1); |
| Subchannel subchannel2 = |
| createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, stateListener2); |
| requestConnectionSafely(helper, subchannel1); |
| requestConnectionSafely(helper, subchannel2); |
| verify(mockTransportFactory, times(2)) |
| .newClientTransport( |
| any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class)); |
| MockClientTransportInfo transportInfo1 = transports.poll(); |
| MockClientTransportInfo transportInfo2 = transports.poll(); |
| |
| // LoadBalancer receives all sorts of callbacks |
| transportInfo1.listener.transportReady(); |
| |
| verify(stateListener1, times(2)).onSubchannelState(stateInfoCaptor.capture()); |
| assertSame(CONNECTING, stateInfoCaptor.getAllValues().get(0).getState()); |
| assertSame(READY, stateInfoCaptor.getAllValues().get(1).getState()); |
| |
| verify(stateListener2).onSubchannelState(stateInfoCaptor.capture()); |
| assertSame(CONNECTING, stateInfoCaptor.getValue().getState()); |
| |
| resolver.listener.onError(resolutionError); |
| verify(mockLoadBalancer).handleNameResolutionError(resolutionError); |
| |
| verifyNoMoreInteractions(mockLoadBalancer); |
| |
| channel.shutdown(); |
| verify(mockLoadBalancer).shutdown(); |
| verifyNoMoreInteractions(stateListener1, stateListener2); |
| |
| // LoadBalancer will normally shutdown all subchannels |
| shutdownSafely(helper, subchannel1); |
| shutdownSafely(helper, subchannel2); |
| |
| // Since subchannels are shutdown, SubchannelStateListeners will only get SHUTDOWN regardless of |
| // the transport states. |
| transportInfo1.listener.transportShutdown(Status.UNAVAILABLE); |
| transportInfo2.listener.transportReady(); |
| verify(stateListener1).onSubchannelState(ConnectivityStateInfo.forNonError(SHUTDOWN)); |
| verify(stateListener2).onSubchannelState(ConnectivityStateInfo.forNonError(SHUTDOWN)); |
| verifyNoMoreInteractions(stateListener1, stateListener2); |
| |
| // No more callback should be delivered to LoadBalancer after it's shut down |
| resolver.listener.onError(resolutionError); |
| resolver.resolved(); |
| verifyNoMoreInteractions(mockLoadBalancer); |
| } |
| |
| @Test |
| public void interceptor() throws Exception { |
| final AtomicLong atomic = new AtomicLong(); |
| ClientInterceptor interceptor = new ClientInterceptor() { |
| @Override |
| public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> interceptCall( |
| MethodDescriptor<RequestT, ResponseT> method, CallOptions callOptions, |
| Channel next) { |
| atomic.set(1); |
| return next.newCall(method, callOptions); |
| } |
| }; |
| createChannel(interceptor); |
| assertNotNull(channel.newCall(method, CallOptions.DEFAULT)); |
| assertEquals(1, atomic.get()); |
| } |
| |
| @Test |
| public void callOptionsExecutor() { |
| Metadata headers = new Metadata(); |
| ClientStream mockStream = mock(ClientStream.class); |
| FakeClock callExecutor = new FakeClock(); |
| createChannel(); |
| |
| // Start a call with a call executor |
| CallOptions options = |
| CallOptions.DEFAULT.withExecutor(callExecutor.getScheduledExecutorService()); |
| ClientCall<String, Integer> call = channel.newCall(method, options); |
| call.start(mockCallListener, headers); |
| |
| // Make the transport available |
| Subchannel subchannel = |
| createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener); |
| verify(mockTransportFactory, never()) |
| .newClientTransport( |
| any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class)); |
| requestConnectionSafely(helper, subchannel); |
| verify(mockTransportFactory) |
| .newClientTransport( |
| any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class)); |
| MockClientTransportInfo transportInfo = transports.poll(); |
| ConnectionClientTransport mockTransport = transportInfo.transport; |
| ManagedClientTransport.Listener transportListener = transportInfo.listener; |
| when(mockTransport.newStream(same(method), same(headers), any(CallOptions.class))) |
| .thenReturn(mockStream); |
| transportListener.transportReady(); |
| when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) |
| .thenReturn(PickResult.withSubchannel(subchannel)); |
| assertEquals(0, callExecutor.numPendingTasks()); |
| updateBalancingStateSafely(helper, READY, mockPicker); |
| |
| // Real streams are started in the call executor if they were previously buffered. |
| assertEquals(1, callExecutor.runDueTasks()); |
| verify(mockTransport).newStream(same(method), same(headers), same(options)); |
| verify(mockStream).start(streamListenerCaptor.capture()); |
| |
| // Call listener callbacks are also run in the call executor |
| ClientStreamListener streamListener = streamListenerCaptor.getValue(); |
| Metadata trailers = new Metadata(); |
| assertEquals(0, callExecutor.numPendingTasks()); |
| streamListener.closed(Status.CANCELLED, trailers); |
| verify(mockCallListener, never()).onClose(same(Status.CANCELLED), same(trailers)); |
| assertEquals(1, callExecutor.runDueTasks()); |
| verify(mockCallListener).onClose(same(Status.CANCELLED), same(trailers)); |
| |
| |
| transportListener.transportShutdown(Status.UNAVAILABLE); |
| transportListener.transportTerminated(); |
| |
| // Clean up as much as possible to allow the channel to terminate. |
| shutdownSafely(helper, subchannel); |
| timer.forwardNanos( |
| TimeUnit.SECONDS.toNanos(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS)); |
| } |
| |
| @Test |
| public void nameResolutionFailed() { |
| Status error = Status.UNAVAILABLE.withCause(new Throwable("fake name resolution error")); |
| FakeNameResolverFactory nameResolverFactory = |
| new FakeNameResolverFactory.Builder(expectedUri) |
| .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress))) |
| .setError(error) |
| .build(); |
| channelBuilder.nameResolverFactory(nameResolverFactory); |
| // Name resolution is started as soon as channel is created. |
| createChannel(); |
| FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.get(0); |
| verify(mockLoadBalancer).handleNameResolutionError(same(error)); |
| assertEquals(1, timer.numPendingTasks(NAME_RESOLVER_REFRESH_TASK_FILTER)); |
| |
| timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS - 1); |
| assertEquals(0, resolver.refreshCalled); |
| |
| timer.forwardNanos(1); |
| assertEquals(1, resolver.refreshCalled); |
| verify(mockLoadBalancer, times(2)).handleNameResolutionError(same(error)); |
| |
| // Verify an additional name resolution failure does not schedule another timer |
| resolver.refresh(); |
| verify(mockLoadBalancer, times(3)).handleNameResolutionError(same(error)); |
| assertEquals(1, timer.numPendingTasks(NAME_RESOLVER_REFRESH_TASK_FILTER)); |
| |
| // Allow the next refresh attempt to succeed |
| resolver.error = null; |
| |
| // For the second attempt, the backoff should occur at RECONNECT_BACKOFF_INTERVAL_NANOS * 2 |
| timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS * 2 - 1); |
| assertEquals(2, resolver.refreshCalled); |
| timer.forwardNanos(1); |
| assertEquals(3, resolver.refreshCalled); |
| assertEquals(0, timer.numPendingTasks()); |
| |
| // Verify that the successful resolution reset the backoff policy |
| resolver.listener.onError(error); |
| timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS - 1); |
| assertEquals(3, resolver.refreshCalled); |
| timer.forwardNanos(1); |
| assertEquals(4, resolver.refreshCalled); |
| assertEquals(0, timer.numPendingTasks()); |
| } |
| |
| @Test |
| public void nameResolutionFailed_delayedTransportShutdownCancelsBackoff() { |
| Status error = Status.UNAVAILABLE.withCause(new Throwable("fake name resolution error")); |
| |
| FakeNameResolverFactory nameResolverFactory = |
| new FakeNameResolverFactory.Builder(expectedUri).setError(error).build(); |
| channelBuilder.nameResolverFactory(nameResolverFactory); |
| // Name resolution is started as soon as channel is created. |
| createChannel(); |
| verify(mockLoadBalancer).handleNameResolutionError(same(error)); |
| |
| FakeClock.ScheduledTask nameResolverBackoff = getNameResolverRefresh(); |
| assertNotNull(nameResolverBackoff); |
| assertFalse(nameResolverBackoff.isCancelled()); |
| |
| // Add a pending call to the delayed transport |
| ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT); |
| Metadata headers = new Metadata(); |
| call.start(mockCallListener, headers); |
| |
| // The pending call on the delayed transport stops the name resolver backoff from cancelling |
| channel.shutdown(); |
| assertFalse(nameResolverBackoff.isCancelled()); |
| |
| // Notify that a subchannel is ready, which drains the delayed transport |
| SubchannelPicker picker = mock(SubchannelPicker.class); |
| Status status = Status.UNAVAILABLE.withDescription("for test"); |
| when(picker.pickSubchannel(any(PickSubchannelArgs.class))) |
| .thenReturn(PickResult.withDrop(status)); |
| updateBalancingStateSafely(helper, READY, picker); |
| executor.runDueTasks(); |
| verify(mockCallListener).onClose(same(status), any(Metadata.class)); |
| |
| assertTrue(nameResolverBackoff.isCancelled()); |
| } |
| |
| @Test |
| public void nameResolverReturnsEmptySubLists_becomeErrorByDefault() throws Exception { |
| String errorDescription = "NameResolver returned no usable address"; |
| |
| // Pass a FakeNameResolverFactory with an empty list and LB config |
| FakeNameResolverFactory nameResolverFactory = |
| new FakeNameResolverFactory.Builder(expectedUri).build(); |
| Map<String, Object> rawServiceConfig = |
| parseConfig("{\"loadBalancingConfig\": [ {\"mock_lb\": { \"setting1\": \"high\" } } ] }"); |
| ManagedChannelServiceConfig parsedServiceConfig = |
| createManagedChannelServiceConfig(rawServiceConfig, null); |
| nameResolverFactory.nextConfigOrError.set(ConfigOrError.fromConfig(parsedServiceConfig)); |
| channelBuilder.nameResolverFactory(nameResolverFactory); |
| createChannel(); |
| |
| // LoadBalancer received the error |
| verify(mockLoadBalancerProvider).newLoadBalancer(any(Helper.class)); |
| verify(mockLoadBalancer).handleNameResolutionError(statusCaptor.capture()); |
| Status status = statusCaptor.getValue(); |
| assertSame(Status.Code.UNAVAILABLE, status.getCode()); |
| assertThat(status.getDescription()).startsWith(errorDescription); |
| |
| // A resolution retry has been scheduled |
| assertEquals(1, timer.numPendingTasks(NAME_RESOLVER_REFRESH_TASK_FILTER)); |
| } |
| |
| @Test |
| public void nameResolverReturnsEmptySubLists_optionallyAllowed() throws Exception { |
| when(mockLoadBalancer.canHandleEmptyAddressListFromNameResolution()).thenReturn(true); |
| |
| // Pass a FakeNameResolverFactory with an empty list and LB config |
| FakeNameResolverFactory nameResolverFactory = |
| new FakeNameResolverFactory.Builder(expectedUri).build(); |
| String rawLbConfig = "{ \"setting1\": \"high\" }"; |
| Object parsedLbConfig = new Object(); |
| Map<String, Object> rawServiceConfig = |
| parseConfig("{\"loadBalancingConfig\": [ {\"mock_lb\": " + rawLbConfig + " } ] }"); |
| ManagedChannelServiceConfig parsedServiceConfig = |
| createManagedChannelServiceConfig( |
| rawServiceConfig, |
| new PolicySelection( |
| mockLoadBalancerProvider, |
| parsedLbConfig)); |
| nameResolverFactory.nextConfigOrError.set(ConfigOrError.fromConfig(parsedServiceConfig)); |
| channelBuilder.nameResolverFactory(nameResolverFactory); |
| createChannel(); |
| |
| // LoadBalancer received the empty list and the LB config |
| verify(mockLoadBalancerProvider).newLoadBalancer(any(Helper.class)); |
| ArgumentCaptor<ResolvedAddresses> resultCaptor = |
| ArgumentCaptor.forClass(ResolvedAddresses.class); |
| verify(mockLoadBalancer).handleResolvedAddresses(resultCaptor.capture()); |
| assertThat(resultCaptor.getValue().getAddresses()).isEmpty(); |
| assertThat(resultCaptor.getValue().getLoadBalancingPolicyConfig()).isEqualTo(parsedLbConfig); |
| |
| // A no resolution retry |
| assertEquals(0, timer.numPendingTasks(NAME_RESOLVER_REFRESH_TASK_FILTER)); |
| } |
| |
| @Test |
| public void loadBalancerThrowsInHandleResolvedAddresses() { |
| RuntimeException ex = new RuntimeException("simulated"); |
| // Delay the success of name resolution until allResolved() is called |
| FakeNameResolverFactory nameResolverFactory = |
| new FakeNameResolverFactory.Builder(expectedUri) |
| .setResolvedAtStart(false) |
| .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress))) |
| .build(); |
| channelBuilder.nameResolverFactory(nameResolverFactory); |
| createChannel(); |
| |
| verify(mockLoadBalancerProvider).newLoadBalancer(any(Helper.class)); |
| doThrow(ex).when(mockLoadBalancer).handleResolvedAddresses(any(ResolvedAddresses.class)); |
| |
| // NameResolver returns addresses. |
| nameResolverFactory.allResolved(); |
| |
| // Exception thrown from balancer is caught by ChannelExecutor, making channel enter panic mode. |
| verifyPanicMode(ex); |
| } |
| |
| @Test |
| public void nameResolvedAfterChannelShutdown() { |
| // Delay the success of name resolution until allResolved() is called. |
| FakeNameResolverFactory nameResolverFactory = |
| new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build(); |
| channelBuilder.nameResolverFactory(nameResolverFactory); |
| createChannel(); |
| |
| channel.shutdown(); |
| |
| assertTrue(channel.isShutdown()); |
| assertTrue(channel.isTerminated()); |
| verify(mockLoadBalancer).shutdown(); |
| // Name resolved after the channel is shut down, which is possible if the name resolution takes |
| // time and is not cancellable. The resolved address will be dropped. |
| nameResolverFactory.allResolved(); |
| verifyNoMoreInteractions(mockLoadBalancer); |
| } |
| |
| /** |
| * Verify that if the first resolved address points to a server that cannot be connected, the call |
| * will end up with the second address which works. |
| */ |
| @Test |
| public void firstResolvedServerFailedToConnect() throws Exception { |
| final SocketAddress goodAddress = new SocketAddress() { |
| @Override public String toString() { |
| return "goodAddress"; |
| } |
| }; |
| final SocketAddress badAddress = new SocketAddress() { |
| @Override public String toString() { |
| return "badAddress"; |
| } |
| }; |
| InOrder inOrder = inOrder(mockLoadBalancer, subchannelStateListener); |
| |
| List<SocketAddress> resolvedAddrs = Arrays.asList(badAddress, goodAddress); |
| FakeNameResolverFactory nameResolverFactory = |
| new FakeNameResolverFactory.Builder(expectedUri) |
| .setServers(Collections.singletonList(new EquivalentAddressGroup(resolvedAddrs))) |
| .build(); |
| channelBuilder.nameResolverFactory(nameResolverFactory); |
| createChannel(); |
| |
| // Start the call |
| ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT); |
| Metadata headers = new Metadata(); |
| call.start(mockCallListener, headers); |
| executor.runDueTasks(); |
| |
| // Simulate name resolution results |
| EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(resolvedAddrs); |
| inOrder.verify(mockLoadBalancer).handleResolvedAddresses(resolvedAddressCaptor.capture()); |
| assertThat(resolvedAddressCaptor.getValue().getAddresses()).containsExactly(addressGroup); |
| Subchannel subchannel = |
| createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener); |
| when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) |
| .thenReturn(PickResult.withSubchannel(subchannel)); |
| requestConnectionSafely(helper, subchannel); |
| inOrder.verify(subchannelStateListener).onSubchannelState(stateInfoCaptor.capture()); |
| assertEquals(CONNECTING, stateInfoCaptor.getValue().getState()); |
| |
| // The channel will starts with the first address (badAddress) |
| verify(mockTransportFactory) |
| .newClientTransport( |
| same(badAddress), any(ClientTransportOptions.class), any(ChannelLogger.class)); |
| verify(mockTransportFactory, times(0)) |
| .newClientTransport( |
| same(goodAddress), any(ClientTransportOptions.class), any(ChannelLogger.class)); |
| |
| MockClientTransportInfo badTransportInfo = transports.poll(); |
| // Which failed to connect |
| badTransportInfo.listener.transportShutdown(Status.UNAVAILABLE); |
| inOrder.verifyNoMoreInteractions(); |
| |
| // The channel then try the second address (goodAddress) |
| verify(mockTransportFactory) |
| .newClientTransport( |
| same(goodAddress), any(ClientTransportOptions.class), any(ChannelLogger.class)); |
| MockClientTransportInfo goodTransportInfo = transports.poll(); |
| when(goodTransportInfo.transport.newStream( |
| any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class))) |
| .thenReturn(mock(ClientStream.class)); |
| |
| goodTransportInfo.listener.transportReady(); |
| inOrder.verify(subchannelStateListener).onSubchannelState(stateInfoCaptor.capture()); |
| assertEquals(READY, stateInfoCaptor.getValue().getState()); |
| |
| // A typical LoadBalancer will call this once the subchannel becomes READY |
| updateBalancingStateSafely(helper, READY, mockPicker); |
| // Delayed transport uses the app executor to create real streams. |
| executor.runDueTasks(); |
| |
| verify(goodTransportInfo.transport).newStream(same(method), same(headers), |
| same(CallOptions.DEFAULT)); |
| // The bad transport was never used. |
| verify(badTransportInfo.transport, times(0)).newStream(any(MethodDescriptor.class), |
| any(Metadata.class), any(CallOptions.class)); |
| } |
| |
| @Test |
| public void failFastRpcFailFromErrorFromBalancer() { |
| subtestFailRpcFromBalancer(false, false, true); |
| } |
| |
| @Test |
| public void failFastRpcFailFromDropFromBalancer() { |
| subtestFailRpcFromBalancer(false, true, true); |
| } |
| |
| @Test |
| public void waitForReadyRpcImmuneFromErrorFromBalancer() { |
| subtestFailRpcFromBalancer(true, false, false); |
| } |
| |
| @Test |
| public void waitForReadyRpcFailFromDropFromBalancer() { |
| subtestFailRpcFromBalancer(true, true, true); |
| } |
| |
| private void subtestFailRpcFromBalancer(boolean waitForReady, boolean drop, boolean shouldFail) { |
| createChannel(); |
| |
| // This call will be buffered by the channel, thus involve delayed transport |
| CallOptions callOptions = CallOptions.DEFAULT; |
| if (waitForReady) { |
| callOptions = callOptions.withWaitForReady(); |
| } else { |
| callOptions = callOptions.withoutWaitForReady(); |
| } |
| ClientCall<String, Integer> call1 = channel.newCall(method, callOptions); |
| call1.start(mockCallListener, new Metadata()); |
| |
| SubchannelPicker picker = mock(SubchannelPicker.class); |
| Status status = Status.UNAVAILABLE.withDescription("for test"); |
| |
| when(picker.pickSubchannel(any(PickSubchannelArgs.class))) |
| .thenReturn(drop ? PickResult.withDrop(status) : PickResult.withError(status)); |
| updateBalancingStateSafely(helper, READY, picker); |
| |
| executor.runDueTasks(); |
| if (shouldFail) { |
| verify(mockCallListener).onClose(same(status), any(Metadata.class)); |
| } else { |
| verifyNoInteractions(mockCallListener); |
| } |
| |
| // This call doesn't involve delayed transport |
| ClientCall<String, Integer> call2 = channel.newCall(method, callOptions); |
| call2.start(mockCallListener2, new Metadata()); |
| |
| executor.runDueTasks(); |
| if (shouldFail) { |
| verify(mockCallListener2).onClose(same(status), any(Metadata.class)); |
| } else { |
| verifyNoInteractions(mockCallListener2); |
| } |
| } |
| |
| /** |
| * Verify that if all resolved addresses failed to connect, a fail-fast call will fail, while a |
| * wait-for-ready call will still be buffered. |
| */ |
| @Test |
| public void allServersFailedToConnect() throws Exception { |
| final SocketAddress addr1 = new SocketAddress() { |
| @Override public String toString() { |
| return "addr1"; |
| } |
| }; |
| final SocketAddress addr2 = new SocketAddress() { |
| @Override public String toString() { |
| return "addr2"; |
| } |
| }; |
| InOrder inOrder = inOrder(mockLoadBalancer, subchannelStateListener); |
| |
| List<SocketAddress> resolvedAddrs = Arrays.asList(addr1, addr2); |
| |
| FakeNameResolverFactory nameResolverFactory = |
| new FakeNameResolverFactory.Builder(expectedUri) |
| .setServers(Collections.singletonList(new EquivalentAddressGroup(resolvedAddrs))) |
| .build(); |
| channelBuilder.nameResolverFactory(nameResolverFactory); |
| createChannel(); |
| |
| // Start a wait-for-ready call |
| ClientCall<String, Integer> call = |
| channel.newCall(method, CallOptions.DEFAULT.withWaitForReady()); |
| Metadata headers = new Metadata(); |
| call.start(mockCallListener, headers); |
| // ... and a fail-fast call |
| ClientCall<String, Integer> call2 = |
| channel.newCall(method, CallOptions.DEFAULT.withoutWaitForReady()); |
| call2.start(mockCallListener2, headers); |
| executor.runDueTasks(); |
| |
| // Simulate name resolution results |
| EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(resolvedAddrs); |
| inOrder.verify(mockLoadBalancer).handleResolvedAddresses(resolvedAddressCaptor.capture()); |
| assertThat(resolvedAddressCaptor.getValue().getAddresses()).containsExactly(addressGroup); |
| |
| Subchannel subchannel = |
| createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener); |
| when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) |
| .thenReturn(PickResult.withSubchannel(subchannel)); |
| requestConnectionSafely(helper, subchannel); |
| |
| inOrder.verify(subchannelStateListener).onSubchannelState(stateInfoCaptor.capture()); |
| assertEquals(CONNECTING, stateInfoCaptor.getValue().getState()); |
| |
| // Connecting to server1, which will fail |
| verify(mockTransportFactory) |
| .newClientTransport( |
| same(addr1), any(ClientTransportOptions.class), any(ChannelLogger.class)); |
| verify(mockTransportFactory, times(0)) |
| .newClientTransport( |
| same(addr2), any(ClientTransportOptions.class), any(ChannelLogger.class)); |
| MockClientTransportInfo transportInfo1 = transports.poll(); |
| transportInfo1.listener.transportShutdown(Status.UNAVAILABLE); |
| |
| // Connecting to server2, which will fail too |
| verify(mockTransportFactory) |
| .newClientTransport( |
| same(addr2), any(ClientTransportOptions.class), any(ChannelLogger.class)); |
| MockClientTransportInfo transportInfo2 = transports.poll(); |
| Status server2Error = Status.UNAVAILABLE.withDescription("Server2 failed to connect"); |
| transportInfo2.listener.transportShutdown(server2Error); |
| |
| // ... which makes the subchannel enter TRANSIENT_FAILURE. The last error Status is propagated |
| // to LoadBalancer. |
| inOrder.verify(subchannelStateListener).onSubchannelState(stateInfoCaptor.capture()); |
| assertEquals(TRANSIENT_FAILURE, stateInfoCaptor.getValue().getState()); |
| assertSame(server2Error, stateInfoCaptor.getValue().getStatus()); |
| |
| // A typical LoadBalancer would create a picker with error |
| SubchannelPicker picker2 = mock(SubchannelPicker.class); |
| when(picker2.pickSubchannel(any(PickSubchannelArgs.class))) |
| .thenReturn(PickResult.withError(server2Error)); |
| updateBalancingStateSafely(helper, TRANSIENT_FAILURE, picker2); |
| executor.runDueTasks(); |
| |
| // ... which fails the fail-fast call |
| verify(mockCallListener2).onClose(same(server2Error), any(Metadata.class)); |
| // ... while the wait-for-ready call stays |
| verifyNoMoreInteractions(mockCallListener); |
| // No real stream was ever created |
| verify(transportInfo1.transport, times(0)) |
| .newStream(any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)); |
| verify(transportInfo2.transport, times(0)) |
| .newStream(any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)); |
| } |
| |
| @Test |
| public void subchannels() { |
| createChannel(); |
| |
| // createSubchannel() always return a new Subchannel |
| Attributes attrs1 = Attributes.newBuilder().set(SUBCHANNEL_ATTR_KEY, "attr1").build(); |
| Attributes attrs2 = Attributes.newBuilder().set(SUBCHANNEL_ATTR_KEY, "attr2").build(); |
| SubchannelStateListener listener1 = mock(SubchannelStateListener.class); |
| SubchannelStateListener listener2 = mock(SubchannelStateListener.class); |
| final Subchannel sub1 = createSubchannelSafely(helper, addressGroup, attrs1, listener1); |
| final Subchannel sub2 = createSubchannelSafely(helper, addressGroup, attrs2, listener2); |
| assertNotSame(sub1, sub2); |
| assertNotSame(attrs1, attrs2); |
| assertSame(attrs1, sub1.getAttributes()); |
| assertSame(attrs2, sub2.getAttributes()); |
| |
| final AtomicBoolean snippetPassed = new AtomicBoolean(false); |
| helper.getSynchronizationContext().execute(new Runnable() { |
| @Override |
| public void run() { |
| // getAddresses() must be called from sync context |
| assertSame(addressGroup, sub1.getAddresses()); |
| assertSame(addressGroup, sub2.getAddresses()); |
| snippetPassed.set(true); |
| } |
| }); |
| assertThat(snippetPassed.get()).isTrue(); |
| |
| // requestConnection() |
| verify(mockTransportFactory, never()) |
| .newClientTransport( |
| any(SocketAddress.class), |
| any(ClientTransportOptions.class), |
| any(TransportLogger.class)); |
| requestConnectionSafely(helper, sub1); |
| verify(mockTransportFactory) |
| .newClientTransport( |
| eq(socketAddress), |
| eq(clientTransportOptions), |
| isA(TransportLogger.class)); |
| MockClientTransportInfo transportInfo1 = transports.poll(); |
| assertNotNull(transportInfo1); |
| |
| requestConnectionSafely(helper, sub2); |
| verify(mockTransportFactory, times(2)) |
| .newClientTransport( |
| eq(socketAddress), |
| eq(clientTransportOptions), |
| isA(TransportLogger.class)); |
| MockClientTransportInfo transportInfo2 = transports.poll(); |
| assertNotNull(transportInfo2); |
| |
| requestConnectionSafely(helper, sub1); |
| requestConnectionSafely(helper, sub2); |
| // The subchannel doesn't matter since this isn't called |
| verify(mockTransportFactory, times(2)) |
| .newClientTransport( |
| eq(socketAddress), eq(clientTransportOptions), isA(TransportLogger.class)); |
| |
| // updateAddresses() |
| updateAddressesSafely(helper, sub1, Collections.singletonList(addressGroup2)); |
| assertThat(((InternalSubchannel) sub1.getInternalSubchannel()).getAddressGroups()) |
| .isEqualTo(Collections.singletonList(addressGroup2)); |
| |
| // shutdown() has a delay |
| shutdownSafely(helper, sub1); |
| timer.forwardTime(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS - 1, TimeUnit.SECONDS); |
| shutdownSafely(helper, sub1); |
| verify(transportInfo1.transport, never()).shutdown(any(Status.class)); |
| timer.forwardTime(1, TimeUnit.SECONDS); |
| verify(transportInfo1.transport).shutdown(same(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_STATUS)); |
| |
| // ... but not after Channel is terminating |
| verify(mockLoadBalancer, never()).shutdown(); |
| channel.shutdown(); |
| verify(mockLoadBalancer).shutdown(); |
| verify(transportInfo2.transport, never()).shutdown(any(Status.class)); |
| |
| shutdownSafely(helper, sub2); |
| verify(transportInfo2.transport).shutdown(same(ManagedChannelImpl.SHUTDOWN_STATUS)); |
| |
| // Cleanup |
| transportInfo1.listener.transportShutdown(Status.UNAVAILABLE); |
| transportInfo1.listener.transportTerminated(); |
| transportInfo2.listener.transportShutdown(Status.UNAVAILABLE); |
| transportInfo2.listener.transportTerminated(); |
| timer.forwardTime(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS); |
| } |
| |
| @Test |
| public void subchannelStringableBeforeStart() { |
| createChannel(); |
| Subchannel subchannel = createUnstartedSubchannel(helper, addressGroup, Attributes.EMPTY); |
| assertThat(subchannel.toString()).isNotNull(); |
| } |
| |
| @Test |
| public void subchannelLoggerCreatedBeforeSubchannelStarted() { |
| createChannel(); |
| Subchannel subchannel = createUnstartedSubchannel(helper, addressGroup, Attributes.EMPTY); |
| assertThat(subchannel.getChannelLogger()).isNotNull(); |
| } |
| |
| @Test |
| public void subchannelsWhenChannelShutdownNow() { |
| createChannel(); |
| Subchannel sub1 = |
| createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener); |
| Subchannel sub2 = |
| createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener); |
| requestConnectionSafely(helper, sub1); |
| requestConnectionSafely(helper, sub2); |
| |
| assertThat(transports).hasSize(2); |
| MockClientTransportInfo ti1 = transports.poll(); |
| MockClientTransportInfo ti2 = transports.poll(); |
| |
| ti1.listener.transportReady(); |
| ti2.listener.transportReady(); |
| |
| channel.shutdownNow(); |
| verify(ti1.transport).shutdownNow(any(Status.class)); |
| verify(ti2.transport).shutdownNow(any(Status.class)); |
| |
| ti1.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now")); |
| ti2.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now")); |
| ti1.listener.transportTerminated(); |
| |
| assertFalse(channel.isTerminated()); |
| ti2.listener.transportTerminated(); |
| assertTrue(channel.isTerminated()); |
| } |
| |
| @Test |
| public void subchannelsNoConnectionShutdown() { |
| createChannel(); |
| Subchannel sub1 = |
| createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener); |
| Subchannel sub2 = |
| createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener); |
| |
| channel.shutdown(); |
| verify(mockLoadBalancer).shutdown(); |
| shutdownSafely(helper, sub1); |
| assertFalse(channel.isTerminated()); |
| shutdownSafely(helper, sub2); |
| assertTrue(channel.isTerminated()); |
| verify(mockTransportFactory, never()) |
| .newClientTransport( |
| any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class)); |
| } |
| |
| @Test |
| public void subchannelsNoConnectionShutdownNow() { |
| createChannel(); |
| createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener); |
| createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener); |
| channel.shutdownNow(); |
| |
| verify(mockLoadBalancer).shutdown(); |
| // Channel's shutdownNow() will call shutdownNow() on all subchannels and oobchannels. |
| // Therefore, channel is terminated without relying on LoadBalancer to shutdown subchannels. |
| assertTrue(channel.isTerminated()); |
| verify(mockTransportFactory, never()) |
| .newClientTransport( |
| any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class)); |
| } |
| |
| @Test |
| public void oobchannels() { |
| createChannel(); |
| |
| ManagedChannel oob1 = helper.createOobChannel( |
| Collections.singletonList(addressGroup), "oob1authority"); |
| ManagedChannel oob2 = helper.createOobChannel( |
| Collections.singletonList(addressGroup), "oob2authority"); |
| verify(balancerRpcExecutorPool, times(2)).getObject(); |
| |
| assertEquals("oob1authority", oob1.authority()); |
| assertEquals("oob2authority", oob2.authority()); |
| |
| // OOB channels create connections lazily. A new call will initiate the connection. |
| Metadata headers = new Metadata(); |
| ClientCall<String, Integer> call = oob1.newCall(method, CallOptions.DEFAULT); |
| call.start(mockCallListener, headers); |
| verify(mockTransportFactory) |
| .newClientTransport( |
| eq(socketAddress), |
| eq(new ClientTransportOptions().setAuthority("oob1authority").setUserAgent(USER_AGENT)), |
| isA(ChannelLogger.class)); |
| MockClientTransportInfo transportInfo = transports.poll(); |
| assertNotNull(transportInfo); |
| |
| assertEquals(0, balancerRpcExecutor.numPendingTasks()); |
| transportInfo.listener.transportReady(); |
| assertEquals(1, balancerRpcExecutor.runDueTasks()); |
| verify(transportInfo.transport).newStream(same(method), same(headers), |
| same(CallOptions.DEFAULT)); |
| |
| // The transport goes away |
| transportInfo.listener.transportShutdown(Status.UNAVAILABLE); |
| transportInfo.listener.transportTerminated(); |
| |
| // A new call will trigger a new transport |
| ClientCall<String, Integer> call2 = oob1.newCall(method, CallOptions.DEFAULT); |
| call2.start(mockCallListener2, headers); |
| ClientCall<String, Integer> call3 = |
| oob1.newCall(method, CallOptions.DEFAULT.withWaitForReady()); |
| call3.start(mockCallListener3, headers); |
| verify(mockTransportFactory, times(2)).newClientTransport( |
| eq(socketAddress), |
| eq(new ClientTransportOptions().setAuthority("oob1authority").setUserAgent(USER_AGENT)), |
| isA(ChannelLogger.class)); |
| transportInfo = transports.poll(); |
| assertNotNull(transportInfo); |
| |
| // This transport fails |
| Status transportError = Status.UNAVAILABLE.withDescription("Connection refused"); |
| assertEquals(0, balancerRpcExecutor.numPendingTasks()); |
| transportInfo.listener.transportShutdown(transportError); |
| assertTrue(balancerRpcExecutor.runDueTasks() > 0); |
| |
| // Fail-fast RPC will fail, while wait-for-ready RPC will still be pending |
| verify(mockCallListener2).onClose(same(transportError), any(Metadata.class)); |
| verify(mockCallListener3, never()).onClose(any(Status.class), any(Metadata.class)); |
| |
| // Shutdown |
| assertFalse(oob1.isShutdown()); |
| assertFalse(oob2.isShutdown()); |
| oob1.shutdown(); |
| oob2.shutdownNow(); |
| assertTrue(oob1.isShutdown()); |
| assertTrue(oob2.isShutdown()); |
| assertTrue(oob2.isTerminated()); |
| verify(balancerRpcExecutorPool).returnObject(balancerRpcExecutor.getScheduledExecutorService()); |
| |
| // New RPCs will be rejected. |
| assertEquals(0, balancerRpcExecutor.numPendingTasks()); |
| ClientCall<String, Integer> call4 = oob1.newCall(method, CallOptions.DEFAULT); |
| ClientCall<String, Integer> call5 = oob2.newCall(method, CallOptions.DEFAULT); |
| call4.start(mockCallListener4, headers); |
| call5.start(mockCallListener5, headers); |
| assertTrue(balancerRpcExecutor.runDueTasks() > 0); |
| verify(mockCallListener4).onClose(statusCaptor.capture(), any(Metadata.class)); |
| Status status4 = statusCaptor.getValue(); |
| assertEquals(Status.Code.UNAVAILABLE, status4.getCode()); |
| verify(mockCallListener5).onClose(statusCaptor.capture(), any(Metadata.class)); |
| Status status5 = statusCaptor.getValue(); |
| assertEquals(Status.Code.UNAVAILABLE, status5.getCode()); |
| |
| // The pending RPC will still be pending |
| verify(mockCallListener3, never()).onClose(any(Status.class), any(Metadata.class)); |
| |
| // This will shutdownNow() the delayed transport, terminating the pending RPC |
| assertEquals(0, balancerRpcExecutor.numPendingTasks()); |
| oob1.shutdownNow(); |
| assertTrue(balancerRpcExecutor.runDueTasks() > 0); |
| verify(mockCallListener3).onClose(any(Status.class), any(Metadata.class)); |
| |
| // Shut down the channel, and it will not terminated because OOB channel has not. |
| channel.shutdown(); |
| assertFalse(channel.isTerminated()); |
| // Delayed transport has already terminated. Terminating the transport terminates the |
| // subchannel, which in turn terimates the OOB channel, which terminates the channel. |
| assertFalse(oob1.isTerminated()); |
| verify(balancerRpcExecutorPool).returnObject(balancerRpcExecutor.getScheduledExecutorService()); |
| transportInfo.listener.transportTerminated(); |
| assertTrue(oob1.isTerminated()); |
| assertTrue(channel.isTerminated()); |
| verify(balancerRpcExecutorPool, times(2)) |
| .returnObject(balancerRpcExecutor.getScheduledExecutorService()); |
| } |
| |
| @Test |
| public void oobChannelHasNoChannelCallCredentials() { |
| Metadata.Key<String> metadataKey = |
| Metadata.Key.of("token", Metadata.ASCII_STRING_MARSHALLER); |
| String channelCredValue = "channel-provided call cred"; |
| channelBuilder = new ManagedChannelImplBuilder( |
| TARGET, InsecureChannelCredentials.create(), |
| new FakeCallCredentials(metadataKey, channelCredValue), |
| new UnsupportedClientTransportFactoryBuilder(), new FixedPortProvider(DEFAULT_PORT)); |
| configureBuilder(channelBuilder); |
| createChannel(); |
| |
| // Verify that the normal channel has call creds, to validate configuration |
| Subchannel subchannel = |
| createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener); |
| requestConnectionSafely(helper, subchannel); |
| MockClientTransportInfo transportInfo = transports.poll(); |
| assertNotNull(transportInfo); |
| transportInfo.listener.transportReady(); |
| when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn( |
| PickResult.withSubchannel(subchannel)); |
| updateBalancingStateSafely(helper, READY, mockPicker); |
| |
| String callCredValue = "per-RPC call cred"; |
| CallOptions callOptions = CallOptions.DEFAULT |
| .withCallCredentials(new FakeCallCredentials(metadataKey, callCredValue)); |
| Metadata headers = new Metadata(); |
| ClientCall<String, Integer> call = channel.newCall(method, callOptions); |
| call.start(mockCallListener, headers); |
| |
| verify(transportInfo.transport).newStream(same(method), same(headers), same(callOptions)); |
| assertThat(headers.getAll(metadataKey)) |
| .containsExactly(channelCredValue, callCredValue).inOrder(); |
| |
| // Verify that the oob channel does not |
| ManagedChannel oob = helper.createOobChannel( |
| Collections.singletonList(addressGroup), "oobauthority"); |
| |
| headers = new Metadata(); |
| call = oob.newCall(method, callOptions); |
| call.start(mockCallListener2, headers); |
| |
| transportInfo = transports.poll(); |
| assertNotNull(transportInfo); |
| transportInfo.listener.transportReady(); |
| balancerRpcExecutor.runDueTasks(); |
| |
| verify(transportInfo.transport).newStream(same(method), same(headers), same(callOptions)); |
| assertThat(headers.getAll(metadataKey)).containsExactly(callCredValue); |
| oob.shutdownNow(); |
| |
| // Verify that resolving oob channel does not |
| oob = helper.createResolvingOobChannelBuilder("oobauthority") |
| .nameResolverFactory( |
| new FakeNameResolverFactory.Builder(URI.create("oobauthority")).build()) |
| .defaultLoadBalancingPolicy(MOCK_POLICY_NAME) |
| .idleTimeout(ManagedChannelImplBuilder.IDLE_MODE_MAX_TIMEOUT_DAYS, TimeUnit.DAYS) |
| .build(); |
| oob.getState(true); |
| ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class); |
| verify(mockLoadBalancerProvider, times(2)).newLoadBalancer(helperCaptor.capture()); |
| Helper oobHelper = helperCaptor.getValue(); |
| |
| subchannel = |
| createSubchannelSafely(oobHelper, addressGroup, Attributes.EMPTY, subchannelStateListener); |
| requestConnectionSafely(oobHelper, subchannel); |
| transportInfo = transports.poll(); |
| assertNotNull(transportInfo); |
| transportInfo.listener.transportReady(); |
| SubchannelPicker mockPicker2 = mock(SubchannelPicker.class); |
| when(mockPicker2.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn( |
| PickResult.withSubchannel(subchannel)); |
| updateBalancingStateSafely(oobHelper, READY, mockPicker2); |
| |
| headers = new Metadata(); |
| call = oob.newCall(method, callOptions); |
| call.start(mockCallListener2, headers); |
| |
| // CallOptions may contain StreamTracerFactory for census that is added by default. |
| verify(transportInfo.transport).newStream(same(method), same(headers), any(CallOptions.class)); |
| assertThat(headers.getAll(metadataKey)).containsExactly(callCredValue); |
| oob.shutdownNow(); |
| } |
| |
| @Test |
| public void oobChannelWithOobChannelCredsHasChannelCallCredentials() { |
| Metadata.Key<String> metadataKey = |
| Metadata.Key.of("token", Metadata.ASCII_STRING_MARSHALLER); |
| String channelCredValue = "channel-provided call cred"; |
| when(mockTransportFactory.swapChannelCredentials(any(CompositeChannelCredentials.class))) |
| .thenAnswer(new Answer<SwapChannelCredentialsResult>() { |
| @Override |
| public SwapChannelCredentialsResult answer(InvocationOnMock invocation) { |
| CompositeChannelCredentials c = |
| invocation.getArgument(0, CompositeChannelCredentials.class); |
| return new SwapChannelCredentialsResult(mockTransportFactory, c.getCallCredentials()); |
| } |
| }); |
| channelBuilder = new ManagedChannelImplBuilder( |
| TARGET, InsecureChannelCredentials.create(), |
| new FakeCallCredentials(metadataKey, channelCredValue), |
| new UnsupportedClientTransportFactoryBuilder(), new FixedPortProvider(DEFAULT_PORT)); |
| configureBuilder(channelBuilder); |
| createChannel(); |
| |
| // Verify that the normal channel has call creds, to validate configuration |
| Subchannel subchannel = |
| createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener); |
| requestConnectionSafely(helper, subchannel); |
| MockClientTransportInfo transportInfo = transports.poll(); |
| transportInfo.listener.transportReady(); |
| when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn( |
| PickResult.withSubchannel(subchannel)); |
| updateBalancingStateSafely(helper, READY, mockPicker); |
| |
| String callCredValue = "per-RPC call cred"; |
| CallOptions callOptions = CallOptions.DEFAULT |
| .withCallCredentials(new FakeCallCredentials(metadataKey, callCredValue)); |
| Metadata headers = new Metadata(); |
| ClientCall<String, Integer> call = channel.newCall(method, callOptions); |
| call.start(mockCallListener, headers); |
| |
| verify(transportInfo.transport).newStream(same(method), same(headers), same(callOptions)); |
| assertThat(headers.getAll(metadataKey)) |
| .containsExactly(channelCredValue, callCredValue).inOrder(); |
| |
| // Verify that resolving oob channel with oob channel creds provides call creds |
| String oobChannelCredValue = "oob-channel-provided call cred"; |
| ChannelCredentials oobChannelCreds = CompositeChannelCredentials.create( |
| InsecureChannelCredentials.create(), |
| new FakeCallCredentials(metadataKey, oobChannelCredValue)); |
| ManagedChannel oob = helper.createResolvingOobChannelBuilder( |
| "fake://oobauthority/", oobChannelCreds) |
| .nameResolverFactory( |
| new FakeNameResolverFactory.Builder(URI.create("fake://oobauthority/")).build()) |
| .defaultLoadBalancingPolicy(MOCK_POLICY_NAME) |
| .idleTimeout(ManagedChannelImplBuilder.IDLE_MODE_MAX_TIMEOUT_DAYS, TimeUnit.DAYS) |
| .build(); |
| oob.getState(true); |
| ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class); |
| verify(mockLoadBalancerProvider, times(2)).newLoadBalancer(helperCaptor.capture()); |
| Helper oobHelper = helperCaptor.getValue(); |
| |
| subchannel = |
| createSubchannelSafely(oobHelper, addressGroup, Attributes.EMPTY, subchannelStateListener); |
| requestConnectionSafely(oobHelper, subchannel); |
| transportInfo = transports.poll(); |
| transportInfo.listener.transportReady(); |
| SubchannelPicker mockPicker2 = mock(SubchannelPicker.class); |
| when(mockPicker2.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn( |
| PickResult.withSubchannel(subchannel)); |
| updateBalancingStateSafely(oobHelper, READY, mockPicker2); |
| |
| headers = new Metadata(); |
| call = oob.newCall(method, callOptions); |
| call.start(mockCallListener2, headers); |
| |
| // CallOptions may contain StreamTracerFactory for census that is added by default. |
| verify(transportInfo.transport).newStream(same(method), same(headers), any(CallOptions.class)); |
| assertThat(headers.getAll(metadataKey)) |
| .containsExactly(oobChannelCredValue, callCredValue).inOrder(); |
| oob.shutdownNow(); |
| } |
| |
| @Test |
| public void oobChannelsWhenChannelShutdownNow() { |
| createChannel(); |
| ManagedChannel oob1 = helper.createOobChannel( |
| Collections.singletonList(addressGroup), "oob1Authority"); |
| ManagedChannel oob2 = helper.createOobChannel( |
| Collections.singletonList(addressGroup), "oob2Authority"); |
| |
| oob1.newCall(method, CallOptions.DEFAULT).start(mockCallListener, new Metadata()); |
| oob2.newCall(method, CallOptions.DEFAULT).start(mockCallListener2, new Metadata()); |
| |
| assertThat(transports).hasSize(2); |
| MockClientTransportInfo ti1 = transports.poll(); |
| MockClientTransportInfo ti2 = transports.poll(); |
| |
| ti1.listener.transportReady(); |
| ti2.listener.transportReady(); |
| |
| channel.shutdownNow(); |
| verify(ti1.transport).shutdownNow(any(Status.class)); |
| verify(ti2.transport).shutdownNow(any(Status.class)); |
| |
| ti1.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now")); |
| ti2.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now")); |
| ti1.listener.transportTerminated(); |
| |
| assertFalse(channel.isTerminated()); |
| ti2.listener.transportTerminated(); |
| assertTrue(channel.isTerminated()); |
| } |
| |
| @Test |
| public void oobChannelsNoConnectionShutdown() { |
| createChannel(); |
| ManagedChannel oob1 = helper.createOobChannel( |
| Collections.singletonList(addressGroup), "oob1Authority"); |
| ManagedChannel oob2 = helper.createOobChannel( |
| Collections.singletonList(addressGroup), "oob2Authority"); |
| channel.shutdown(); |
| |
| verify(mockLoadBalancer).shutdown(); |
| oob1.shutdown(); |
| assertTrue(oob1.isTerminated()); |
| assertFalse(channel.isTerminated()); |
| oob2.shutdown(); |
| assertTrue(oob2.isTerminated()); |
| assertTrue(channel.isTerminated()); |
| verify(mockTransportFactory, never()) |
| .newClientTransport( |
| any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class)); |
| } |
| |
| @Test |
| public void oobChannelsNoConnectionShutdownNow() { |
| createChannel(); |
| helper.createOobChannel(Collections.singletonList(addressGroup), "oob1Authority"); |
| helper.createOobChannel(Collections.singletonList(addressGroup), "oob2Authority"); |
| channel.shutdownNow(); |
| |
| verify(mockLoadBalancer).shutdown(); |
| assertTrue(channel.isTerminated()); |
| // Channel's shutdownNow() will call shutdownNow() on all subchannels and oobchannels. |
| // Therefore, channel is terminated without relying on LoadBalancer to shutdown oobchannels. |
| verify(mockTransportFactory, never()) |
| .newClientTransport( |
| any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class)); |
| } |
| |
| @Test |
| public void subchannelChannel_normalUsage() { |
| createChannel(); |
| Subchannel subchannel = |
| createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener); |
| verify(balancerRpcExecutorPool, never()).getObject(); |
| |
| Channel sChannel = subchannel.asChannel(); |
| verify(balancerRpcExecutorPool).getObject(); |
| |
| Metadata headers = new Metadata(); |
| CallOptions callOptions = CallOptions.DEFAULT.withDeadlineAfter(5, TimeUnit.SECONDS); |
| |
| // Subchannel must be READY when creating the RPC. |
| requestConnectionSafely(helper, subchannel); |
| verify(mockTransportFactory) |
| .newClientTransport( |
| any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class)); |
| MockClientTransportInfo transportInfo = transports.poll(); |
| ConnectionClientTransport mockTransport = transportInfo.transport; |
| ManagedClientTransport.Listener transportListener = transportInfo.listener; |
| transportListener.transportReady(); |
| |
| ClientCall<String, Integer> call = sChannel.newCall(method, callOptions); |
| call.start(mockCallListener, headers); |
| verify(mockTransport).newStream(same(method), same(headers), callOptionsCaptor.capture()); |
| |
| CallOptions capturedCallOption = callOptionsCaptor.getValue(); |
| assertThat(capturedCallOption.getDeadline()).isSameInstanceAs(callOptions.getDeadline()); |
| assertThat(capturedCallOption.getOption(GrpcUtil.CALL_OPTIONS_RPC_OWNED_BY_BALANCER)).isTrue(); |
| } |
| |
| @Test |
| public void subchannelChannel_failWhenNotReady() { |
| createChannel(); |
| Subchannel subchannel = |
| createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener); |
| Channel sChannel = subchannel.asChannel(); |
| Metadata headers = new Metadata(); |
| |
| requestConnectionSafely(helper, subchannel); |
| verify(mockTransportFactory) |
| .newClientTransport( |
| any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class)); |
| MockClientTransportInfo transportInfo = transports.poll(); |
| ConnectionClientTransport mockTransport = transportInfo.transport; |
| |
| assertEquals(0, balancerRpcExecutor.numPendingTasks()); |
| |
| // Subchannel is still CONNECTING, but not READY yet |
| ClientCall<String, Integer> call = sChannel.newCall(method, CallOptions.DEFAULT); |
| call.start(mockCallListener, headers); |
| verify(mockTransport, never()).newStream( |
| any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)); |
| |
| verifyNoInteractions(mockCallListener); |
| assertEquals(1, balancerRpcExecutor.runDueTasks()); |
| verify(mockCallListener).onClose( |
| same(SubchannelChannel.NOT_READY_ERROR), any(Metadata.class)); |
| } |
| |
| @Test |
| public void subchannelChannel_failWaitForReady() { |
| createChannel(); |
| Subchannel subchannel = |
| createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener); |
| Channel sChannel = subchannel.asChannel(); |
| Metadata headers = new Metadata(); |
| |
| // Subchannel must be READY when creating the RPC. |
| requestConnectionSafely(helper, subchannel); |
| verify(mockTransportFactory) |
| .newClientTransport( |
| any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class)); |
| MockClientTransportInfo transportInfo = transports.poll(); |
| ConnectionClientTransport mockTransport = transportInfo.transport; |
| ManagedClientTransport.Listener transportListener = transportInfo.listener; |
| transportListener.transportReady(); |
| assertEquals(0, balancerRpcExecutor.numPendingTasks()); |
| |
| // Wait-for-ready RPC is not allowed |
| ClientCall<String, Integer> call = |
| sChannel.newCall(method, CallOptions.DEFAULT.withWaitForReady()); |
| call.start(mockCallListener, headers); |
| verify(mockTransport, never()).newStream( |
| any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)); |
| |
| verifyNoInteractions(mockCallListener); |
| assertEquals(1, balancerRpcExecutor.runDueTasks()); |
| verify(mockCallListener).onClose( |
| same(SubchannelChannel.WAIT_FOR_READY_ERROR), any(Metadata.class)); |
| } |
| |
| @Test |
| public void lbHelper_getScheduledExecutorService() { |
| createChannel(); |
| |
| ScheduledExecutorService ses = helper.getScheduledExecutorService(); |
| Runnable task = mock(Runnable.class); |
| helper.getSynchronizationContext().schedule(task, 110, TimeUnit.NANOSECONDS, ses); |
| timer.forwardNanos(109); |
| verify(task, never()).run(); |
| timer.forwardNanos(1); |
| verify(task).run(); |
| |
| try { |
| ses.shutdown(); |
| fail("Should throw"); |
| } catch (UnsupportedOperationException e) { |
| // expected |
| } |
| |
| try { |
| ses.shutdownNow(); |
| fail("Should throw"); |
| } catch (UnsupportedOperationException e) { |
| // expected |
| } |
| } |
| |
| @Test |
| public void lbHelper_getNameResolverArgs() { |
| createChannel(); |
| |
| NameResolver.Args args = helper.getNameResolverArgs(); |
| assertThat(args.getDefaultPort()).isEqualTo(DEFAULT_PORT); |
| assertThat(args.getProxyDetector()).isSameInstanceAs(GrpcUtil.DEFAULT_PROXY_DETECTOR); |
| assertThat(args.getSynchronizationContext()) |
| .isSameInstanceAs(helper.getSynchronizationContext()); |
| assertThat(args.getServiceConfigParser()).isNotNull(); |
| } |
| |
| @Test |
| public void lbHelper_getNameResolverRegistry() { |
| createChannel(); |
| |
| assertThat(helper.getNameResolverRegistry()) |
| .isSameInstanceAs(NameResolverRegistry.getDefaultRegistry()); |
| } |
| |
| @Test |
| public void refreshNameResolution_whenSubchannelConnectionFailed_notIdle() { |
| subtestNameResolutionRefreshWhenConnectionFailed(false, false); |
| } |
| |
| @Test |
| public void refreshNameResolution_whenOobChannelConnectionFailed_notIdle() { |
| subtestNameResolutionRefreshWhenConnectionFailed(true, false); |
| } |
| |
| @Test |
| public void notRefreshNameResolution_whenSubchannelConnectionFailed_idle() { |
| subtestNameResolutionRefreshWhenConnectionFailed(false, true); |
| } |
| |
| @Test |
| public void notRefreshNameResolution_whenOobChannelConnectionFailed_idle() { |
| subtestNameResolutionRefreshWhenConnectionFailed(true, true); |
| } |
| |
| private void subtestNameResolutionRefreshWhenConnectionFailed( |
| boolean isOobChannel, boolean isIdle) { |
| FakeNameResolverFactory nameResolverFactory = |
| new FakeNameResolverFactory.Builder(expectedUri) |
| .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress))) |
| .build(); |
| channelBuilder.nameResolverFactory(nameResolverFactory); |
| createChannel(); |
| if (isOobChannel) { |
| OobChannel oobChannel = (OobChannel) helper.createOobChannel( |
| Collections.singletonList(addressGroup), "oobAuthority"); |
| oobChannel.getSubchannel().requestConnection(); |
| } else { |
| Subchannel subchannel = |
| createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener); |
| requestConnectionSafely(helper, subchannel); |
| } |
| |
| MockClientTransportInfo transportInfo = transports.poll(); |
| assertNotNull(transportInfo); |
| |
| FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.remove(0); |
| |
| if (isIdle) { |
| channel.enterIdle(); |
| // Entering idle mode will result in a new resolver |
| resolver = nameResolverFactory.resolvers.remove(0); |
| } |
| |
| assertEquals(0, nameResolverFactory.resolvers.size()); |
| |
| int expectedRefreshCount = 0; |
| |
| // Transport closed when connecting |
| assertEquals(expectedRefreshCount, resolver.refreshCalled); |
| transportInfo.listener.transportShutdown(Status.UNAVAILABLE); |
| // When channel enters idle, new resolver is created but not started. |
| if (!isIdle) { |
| expectedRefreshCount++; |
| } |
| assertEquals(expectedRefreshCount, resolver.refreshCalled); |
| |
| timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS); |
| transportInfo = transports.poll(); |
| assertNotNull(transportInfo); |
| |
| transportInfo.listener.transportReady(); |
| |
| // Transport closed when ready |
| assertEquals(expectedRefreshCount, resolver.refreshCalled); |
| transportInfo.listener.transportShutdown(Status.UNAVAILABLE); |
| // When channel enters idle, new resolver is created but not started. |
| if (!isIdle) { |
| expectedRefreshCount++; |
| } |
| assertEquals(expectedRefreshCount, resolver.refreshCalled); |
| } |
| |
| @Test |
| public void uriPattern() { |
| assertTrue(ManagedChannelImpl.URI_PATTERN.matcher("a:/").matches()); |
| assertTrue(ManagedChannelImpl.URI_PATTERN.matcher("Z019+-.:/!@ #~ ").matches()); |
| assertFalse(ManagedChannelImpl.URI_PATTERN.matcher("a/:").matches()); // "/:" not matched |
| assertFalse(ManagedChannelImpl.URI_PATTERN.matcher("0a:/").matches()); // '0' not matched |
| assertFalse(ManagedChannelImpl.URI_PATTERN.matcher("a,:/").matches()); // ',' not matched |
| assertFalse(ManagedChannelImpl.URI_PATTERN.matcher(" a:/").matches()); // space not matched |
| } |
| |
| /** |
| * Test that information such as the Call's context, MethodDescriptor, authority, executor are |
| * propagated to newStream() and applyRequestMetadata(). |
| */ |
| @Test |
| public void informationPropagatedToNewStreamAndCallCredentials() { |
| createChannel(); |
| CallOptions callOptions = CallOptions.DEFAULT.withCallCredentials(creds); |
| final Context.Key<String> testKey = Context.key("testing"); |
| Context ctx = Context.current().withValue(testKey, "testValue"); |
| final LinkedList<Context> credsApplyContexts = new LinkedList<>(); |
| final LinkedList<Context> newStreamContexts = new LinkedList<>(); |
| doAnswer(new Answer<Void>() { |
| @Override |
| public Void answer(InvocationOnMock in) throws Throwable { |
| credsApplyContexts.add(Context.current()); |
| return null; |
| } |
| }).when(creds).applyRequestMetadata( |
| any(RequestInfo.class), any(Executor.class), any(CallCredentials.MetadataApplier.class)); |
| |
| // First call will be on delayed transport. Only newCall() is run within the expected context, |
| // so that we can verify that the context is explicitly attached before calling newStream() and |
| // applyRequestMetadata(), which happens after we detach the context from the thread. |
| Context origCtx = ctx.attach(); |
| assertEquals("testValue", testKey.get()); |
| ClientCall<String, Integer> call = channel.newCall(method, callOptions); |
| ctx.detach(origCtx); |
| assertNull(testKey.get()); |
| call.start(mockCallListener, new Metadata()); |
| |
| // Simulate name resolution results |
| EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(socketAddress); |
| Subchannel subchannel = |
| createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener); |
| requestConnectionSafely(helper, subchannel); |
| verify(mockTransportFactory) |
| .newClientTransport( |
| same(socketAddress), eq(clientTransportOptions), any(ChannelLogger.class)); |
| MockClientTransportInfo transportInfo = transports.poll(); |
| final ConnectionClientTransport transport = transportInfo.transport; |
| when(transport.getAttributes()).thenReturn(Attributes.EMPTY); |
| doAnswer(new Answer<ClientStream>() { |
| @Override |
| public ClientStream answer(InvocationOnMock in) throws Throwable { |
| newStreamContexts.add(Context.current()); |
| return mock(ClientStream.class); |
| } |
| }).when(transport).newStream( |
| any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)); |
| |
| verify(creds, never()).applyRequestMetadata( |
| any(RequestInfo.class), any(Executor.class), any(CallCredentials.MetadataApplier.class)); |
| |
| // applyRequestMetadata() is called after the transport becomes ready. |
| transportInfo.listener.transportReady(); |
| when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) |
| .thenReturn(PickResult.withSubchannel(subchannel)); |
| updateBalancingStateSafely(helper, READY, mockPicker); |
| executor.runDueTasks(); |
| ArgumentCaptor<RequestInfo> infoCaptor = ArgumentCaptor.forClass(null); |
| ArgumentCaptor<CallCredentials.MetadataApplier> applierCaptor = ArgumentCaptor.forClass(null); |
| verify(creds).applyRequestMetadata(infoCaptor.capture(), |
| same(executor.getScheduledExecutorService()), applierCaptor.capture()); |
| assertEquals("testValue", testKey.get(credsApplyContexts.poll())); |
| assertEquals(AUTHORITY, infoCaptor.getValue().getAuthority()); |
| assertEquals(SecurityLevel.NONE, infoCaptor.getValue().getSecurityLevel()); |
| verify(transport, never()).newStream( |
| any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)); |
| |
| // newStream() is called after apply() is called |
| applierCaptor.getValue().apply(new Metadata()); |
| verify(transport).newStream(same(method), any(Metadata.class), same(callOptions)); |
| assertEquals("testValue", testKey.get(newStreamContexts.poll())); |
| // The context should not live beyond the scope of newStream() and applyRequestMetadata() |
| assertNull(testKey.get()); |
| |
| |
| // Second call will not be on delayed transport |
| origCtx = ctx.attach(); |
| call = channel.newCall(method, callOptions); |
| ctx.detach(origCtx); |
| call.start(mockCallListener, new Metadata()); |
| |
| verify(creds, times(2)).applyRequestMetadata(infoCaptor.capture(), |
| same(executor.getScheduledExecutorService()), applierCaptor.capture()); |
| assertEquals("testValue", testKey.get(credsApplyContexts.poll())); |
| assertEquals(AUTHORITY, infoCaptor.getValue().getAuthority()); |
| assertEquals(SecurityLevel.NONE, infoCaptor.getValue().getSecurityLevel()); |
| // This is from the first call |
| verify(transport).newStream( |
| any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)); |
| |
| // Still, newStream() is called after apply() is called |
| applierCaptor.getValue().apply(new Metadata()); |
| verify(transport, times(2)).newStream(same(method), any(Metadata.class), same(callOptions)); |
| assertEquals("testValue", testKey.get(newStreamContexts.poll())); |
| |
| assertNull(testKey.get()); |
| } |
| |
| @Test |
| public void pickerReturnsStreamTracer_noDelay() { |
| ClientStream mockStream = mock(ClientStream.class); |
| ClientStreamTracer.Factory factory1 = mock(ClientStreamTracer.Factory.class); |
| ClientStreamTracer.Factory factory2 = mock(ClientStreamTracer.Factory.class); |
| createChannel(); |
| Subchannel subchannel = |
| createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener); |
| requestConnectionSafely(helper, subchannel); |
| MockClientTransportInfo transportInfo = transports.poll(); |
| transportInfo.listener.transportReady(); |
| ClientTransport mockTransport = transportInfo.transport; |
| when(mockTransport.newStream( |
| any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class))) |
| .thenReturn(mockStream); |
| |
| when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn( |
| PickResult.withSubchannel(subchannel, factory2)); |
| updateBalancingStateSafely(helper, READY, mockPicker); |
| |
| CallOptions callOptions = CallOptions.DEFAULT.withStreamTracerFactory(factory1); |
| ClientCall<String, Integer> call = channel.newCall(method, callOptions); |
| call.start(mockCallListener, new Metadata()); |
| |
| verify(mockPicker).pickSubchannel(any(PickSubchannelArgs.class)); |
| verify(mockTransport).newStream(same(method), any(Metadata.class), callOptionsCaptor.capture()); |
| assertEquals( |
| Arrays.asList(factory1, factory2), |
| callOptionsCaptor.getValue().getStreamTracerFactories()); |
| // The factories are safely not stubbed because we do not expect any usage of them. |
| verifyNoInteractions(factory1); |
| verifyNoInteractions(factory2); |
| } |
| |
| @Test |
| public void pickerReturnsStreamTracer_delayed() { |
| ClientStream mockStream = mock(ClientStream.class); |
| ClientStreamTracer.Factory factory1 = mock(ClientStreamTracer.Factory.class); |
| ClientStreamTracer.Factory factory2 = mock(ClientStreamTracer.Factory.class); |
| createChannel(); |
| |
| CallOptions callOptions = CallOptions.DEFAULT.withStreamTracerFactory(factory1); |
| ClientCall<String, Integer> call = channel.newCall(method, callOptions); |
| call.start(mockCallListener, new Metadata()); |
| |
| Subchannel subchannel = |
| createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener); |
| requestConnectionSafely(helper, subchannel); |
| MockClientTransportInfo transportInfo = transports.poll(); |
| transportInfo.listener.transportReady(); |
| ClientTransport mockTransport = transportInfo.transport; |
| when(mockTransport.newStream( |
| any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class))) |
| .thenReturn(mockStream); |
| when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn( |
| PickResult.withSubchannel(subchannel, factory2)); |
| |
| updateBalancingStateSafely(helper, READY, mockPicker); |
| assertEquals(1, executor.runDueTasks()); |
| |
| verify(mockPicker).pickSubchannel(any(PickSubchannelArgs.class)); |
| verify(mockTransport).newStream(same(method), any(Metadata.class), callOptionsCaptor.capture()); |
| assertEquals( |
| Arrays.asList(factory1, factory2), |
| callOptionsCaptor.getValue().getStreamTracerFactories()); |
| // The factories are safely not stubbed because we do not expect any usage of them. |
| verifyNoInteractions(factory1); |
| verifyNoInteractions(factory2); |
| } |
| |
| @Test |
| public void getState_loadBalancerSupportsChannelState() { |
| channelBuilder.nameResolverFactory( |
| new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build()); |
| createChannel(); |
| assertEquals(IDLE, channel.getState(false)); |
| |
| updateBalancingStateSafely(helper, TRANSIENT_FAILURE, mockPicker); |
| assertEquals(TRANSIENT_FAILURE, channel.getState(false)); |
| } |
| |
| @Test |
| public void getState_withRequestConnect() { |
| channelBuilder.nameResolverFactory( |
| new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build()); |
| requestConnection = false; |
| createChannel(); |
| |
| assertEquals(IDLE, channel.getState(false)); |
| verify(mockLoadBalancerProvider, never()).newLoadBalancer(any(Helper.class)); |
| |
| // call getState() with requestConnection = true |
| assertEquals(IDLE, channel.getState(true)); |
| ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(null); |
| verify(mockLoadBalancerProvider).newLoadBalancer(helperCaptor.capture()); |
| helper = helperCaptor.getValue(); |
| |
| updateBalancingStateSafely(helper, CONNECTING, mockPicker); |
| assertEquals(CONNECTING, channel.getState(false)); |
| assertEquals(CONNECTING, channel.getState(true)); |
| verify(mockLoadBalancerProvider).newLoadBalancer(any(Helper.class)); |
| } |
| |
| @SuppressWarnings("deprecation") |
| @Test |
| public void getState_withRequestConnect_IdleWithLbRunning() { |
| channelBuilder.nameResolverFactory( |
| new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build()); |
| createChannel(); |
| verify(mockLoadBalancerProvider).newLoadBalancer(any(Helper.class)); |
| |
| updateBalancingStateSafely(helper, IDLE, mockPicker); |
| |
| assertEquals(IDLE, channel.getState(true)); |
| verify(mockLoadBalancerProvider).newLoadBalancer(any(Helper.class)); |
| verify(mockPicker).requestConnection(); |
| verify(mockLoadBalancer).requestConnection(); |
| } |
| |
| @Test |
| public void notifyWhenStateChanged() { |
| final AtomicBoolean stateChanged = new AtomicBoolean(); |
| Runnable onStateChanged = new Runnable() { |
| @Override |
| public void run() { |
| stateChanged.set(true); |
| } |
| }; |
| |
| channelBuilder.nameResolverFactory( |
| new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build()); |
| createChannel(); |
| assertEquals(IDLE, channel.getState(false)); |
| |
| channel.notifyWhenStateChanged(IDLE, onStateChanged); |
| executor.runDueTasks(); |
| assertFalse(stateChanged.get()); |
| |
| // state change from IDLE to CONNECTING |
| updateBalancingStateSafely(helper, CONNECTING, mockPicker); |
| // onStateChanged callback should run |
| executor.runDueTasks(); |
| assertTrue(stateChanged.get()); |
| |
| // clear and test form CONNECTING |
| stateChanged.set(false); |
| channel.notifyWhenStateChanged(IDLE, onStateChanged); |
| // onStateChanged callback should run immediately |
| executor.runDueTasks(); |
| assertTrue(stateChanged.get()); |
| } |
| |
| @Test |
| public void channelStateWhenChannelShutdown() { |
| final AtomicBoolean stateChanged = new AtomicBoolean(); |
| Runnable onStateChanged = new Runnable() { |
| @Override |
| public void run() { |
| stateChanged.set(true); |
| } |
| }; |
| |
| channelBuilder.nameResolverFactory( |
| new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build()); |
| createChannel(); |
| assertEquals(IDLE, channel.getState(false)); |
| channel.notifyWhenStateChanged(IDLE, onStateChanged); |
| executor.runDueTasks(); |
| assertFalse(stateChanged.get()); |
| |
| channel.shutdown(); |
| assertEquals(SHUTDOWN, channel.getState(false)); |
| executor.runDueTasks(); |
| assertTrue(stateChanged.get()); |
| |
| stateChanged.set(false); |
| channel.notifyWhenStateChanged(SHUTDOWN, onStateChanged); |
| updateBalancingStateSafely(helper, CONNECTING, mockPicker); |
| |
| assertEquals(SHUTDOWN, channel.getState(false)); |
| executor.runDueTasks(); |
| assertFalse(stateChanged.get()); |
| } |
| |
| @Test |
| public void stateIsIdleOnIdleTimeout() { |
| long idleTimeoutMillis = 2000L; |
| channelBuilder.idleTimeout(idleTimeoutMillis, TimeUnit.MILLISECONDS); |
| createChannel(); |
| assertEquals(IDLE, channel.getState(false)); |
| |
| updateBalancingStateSafely(helper, CONNECTING, mockPicker); |
| assertEquals(CONNECTING, channel.getState(false)); |
| |
| timer.forwardNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeoutMillis)); |
| assertEquals(IDLE, channel.getState(false)); |
| } |
| |
| @Test |
| public void panic_whenIdle() { |
| subtestPanic(IDLE); |
| } |
| |
| @Test |
| public void panic_whenConnecting() { |
| subtestPanic(CONNECTING); |
| } |
| |
| @Test |
| public void panic_whenTransientFailure() { |
| subtestPanic(TRANSIENT_FAILURE); |
| } |
| |
| @Test |
| public void panic_whenReady() { |
| subtestPanic(READY); |
| } |
| |
| private void subtestPanic(ConnectivityState initialState) { |
| assertNotEquals("We don't test panic mode if it's already SHUTDOWN", SHUTDOWN, initialState); |
| long idleTimeoutMillis = 2000L; |
| FakeNameResolverFactory nameResolverFactory = |
| new FakeNameResolverFactory.Builder(expectedUri).build(); |
| channelBuilder.nameResolverFactory(nameResolverFactory); |
| channelBuilder.idleTimeout(idleTimeoutMillis, TimeUnit.MILLISECONDS); |
| createChannel(); |
| |
| verify(mockLoadBalancerProvider).newLoadBalancer(any(Helper.class)); |
| assertThat(nameResolverFactory.resolvers).hasSize(1); |
| FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.remove(0); |
| |
| final Throwable panicReason = new Exception("Simulated uncaught exception"); |
| if (initialState == IDLE) { |
| timer.forwardNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeoutMillis)); |
| } else { |
| updateBalancingStateSafely(helper, initialState, mockPicker); |
| } |
| assertEquals(initialState, channel.getState(false)); |
| |
| if (initialState == IDLE) { |
| // IDLE mode will shutdown resolver and balancer |
| verify(mockLoadBalancer).shutdown(); |
| assertTrue(resolver.shutdown); |
| // A new resolver is created |
| assertThat(nameResolverFactory.resolvers).hasSize(1); |
| resolver = nameResolverFactory.resolvers.remove(0); |
| assertFalse(resolver.shutdown); |
| } else { |
| verify(mockLoadBalancer, never()).shutdown(); |
| assertFalse(resolver.shutdown); |
| } |
| |
| // Make channel panic! |
| channel.syncContext.execute( |
| new Runnable() { |
| @Override |
| public void run() { |
| channel.panic(panicReason); |
| } |
| }); |
| |
| // Calls buffered in delayedTransport will fail |
| |
| // Resolver and balancer are shutdown |
| verify(mockLoadBalancer).shutdown(); |
| assertTrue(resolver.shutdown); |
| |
| // Channel will stay in TRANSIENT_FAILURE. getState(true) will not revive it. |
| assertEquals(TRANSIENT_FAILURE, channel.getState(true)); |
| assertEquals(TRANSIENT_FAILURE, channel.getState(true)); |
| verifyPanicMode(panicReason); |
| |
| // Besides the resolver created initially, no new resolver or balancer are created. |
| verify(mockLoadBalancerProvider).newLoadBalancer(any(Helper.class)); |
| assertThat(nameResolverFactory.resolvers).isEmpty(); |
| |
| // A misbehaving balancer that calls updateBalancingState() after it's shut down will not be |
| // able to revive it. |
| updateBalancingStateSafely(helper, READY, mockPicker); |
| verifyPanicMode(panicReason); |
| |
| // Cannot be revived by exitIdleMode() |
| channel.syncContext.execute(new Runnable() { |
| @Override |
| public void run() { |
| channel.exitIdleMode(); |
| } |
| }); |
| verifyPanicMode(panicReason); |
| |
| // Can still shutdown normally |
| channel.shutdown(); |
| assertTrue(channel.isShutdown()); |
| assertTrue(channel.isTerminated()); |
| assertEquals(SHUTDOWN, channel.getState(false)); |
| |
| // We didn't stub mockPicker, because it should have never been called in this test. |
| verifyNoInteractions(mockPicker); |
| } |
| |
| @Test |
| public void panic_bufferedCallsWillFail() { |
| createChannel(); |
| |
| when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) |
| .thenReturn(PickResult.withNoResult()); |
| updateBalancingStateSafely(helper, CONNECTING, mockPicker); |
| |
| // Start RPCs that will be buffered in delayedTransport |
| ClientCall<String, Integer> call = |
| channel.newCall(method, CallOptions.DEFAULT.withoutWaitForReady()); |
| call.start(mockCallListener, new Metadata()); |
| |
| ClientCall<String, Integer> call2 = |
| channel.newCall(method, CallOptions.DEFAULT.withWaitForReady()); |
| call2.start(mockCallListener2, new Metadata()); |
| |
| executor.runDueTasks(); |
| verifyNoInteractions(mockCallListener, mockCallListener2); |
| |
| // Enter panic |
| final Throwable panicReason = new Exception("Simulated uncaught exception"); |
| channel.syncContext.execute( |
| new Runnable() { |
| @Override |
| public void run() { |
| channel.panic(panicReason); |
| } |
| }); |
| |
| // Buffered RPCs fail immediately |
| executor.runDueTasks(); |
| verifyCallListenerClosed(mockCallListener, Status.Code.INTERNAL, panicReason); |
| verifyCallListenerClosed(mockCallListener2, Status.Code.INTERNAL, panicReason); |
| panicExpected = true; |
| } |
| |
| private void verifyPanicMode(Throwable cause) { |
| panicExpected = true; |
| @SuppressWarnings("unchecked") |
| ClientCall.Listener<Integer> mockListener = |
| (ClientCall.Listener<Integer>) mock(ClientCall.Listener.class); |
| assertEquals(TRANSIENT_FAILURE, channel.getState(false)); |
| ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT); |
| call.start(mockListener, new Metadata()); |
| executor.runDueTasks(); |
| verifyCallListenerClosed(mockListener, Status.Code.INTERNAL, cause); |
| |
| // Channel is dead. No more pending task to possibly revive it. |
| assertEquals(0, timer.numPendingTasks()); |
| assertEquals(0, executor.numPendingTasks()); |
| assertEquals(0, balancerRpcExecutor.numPendingTasks()); |
| } |
| |
| private void verifyCallListenerClosed( |
| ClientCall.Listener<Integer> listener, Status.Code code, Throwable cause) { |
| ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(null); |
| verify(listener).onClose(captor.capture(), any(Metadata.class)); |
| Status rpcStatus = captor.getValue(); |
| assertEquals(code, rpcStatus.getCode()); |
| assertSame(cause, rpcStatus.getCause()); |
| verifyNoMoreInteractions(listener); |
| } |
| |
| @Test |
| public void idleTimeoutAndReconnect() { |
| long idleTimeoutMillis = 2000L; |
| channelBuilder.idleTimeout(idleTimeoutMillis, TimeUnit.MILLISECONDS); |
| createChannel(); |
| |
| timer.forwardNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeoutMillis)); |
| assertEquals(IDLE, channel.getState(true /* request connection */)); |
| |
| ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class); |
| // Two times of requesting connection will create loadBalancer twice. |
| verify(mockLoadBalancerProvider, times(2)).newLoadBalancer(helperCaptor.capture()); |
| Helper helper2 = helperCaptor.getValue(); |
| |
| // Updating on the old helper (whose balancer has been shutdown) does not change the channel |
| // state. |
| updateBalancingStateSafely(helper, CONNECTING, mockPicker); |
| assertEquals(IDLE, channel.getState(false)); |
| |
| updateBalancingStateSafely(helper2, CONNECTING, mockPicker); |
| assertEquals(CONNECTING, channel.getState(false)); |
| } |
| |
| @Test |
| public void idleMode_resetsDelayedTransportPicker() { |
| ClientStream mockStream = mock(ClientStream.class); |
| Status pickError = Status.UNAVAILABLE.withDescription("pick result error"); |
| long idleTimeoutMillis = 1000L; |
| channelBuilder.idleTimeout(idleTimeoutMillis, TimeUnit.MILLISECONDS); |
| channelBuilder.nameResolverFactory( |
| new FakeNameResolverFactory.Builder(expectedUri) |
| .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress))) |
| .build()); |
| createChannel(); |
| assertEquals(IDLE, channel.getState(false)); |
| |
| // This call will be buffered in delayedTransport |
| ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT); |
| call.start(mockCallListener, new Metadata()); |
| |
| // Move channel into TRANSIENT_FAILURE, which will fail the pending call |
| when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) |
| .thenReturn(PickResult.withError(pickError)); |
| updateBalancingStateSafely(helper, TRANSIENT_FAILURE, mockPicker); |
| assertEquals(TRANSIENT_FAILURE, channel.getState(false)); |
| executor.runDueTasks(); |
| verify(mockCallListener).onClose(same(pickError), any(Metadata.class)); |
| |
| // Move channel to idle |
| timer.forwardNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeoutMillis)); |
| assertEquals(IDLE, channel.getState(false)); |
| |
| // This call should be buffered, but will move the channel out of idle |
| ClientCall<String, Integer> call2 = channel.newCall(method, CallOptions.DEFAULT); |
| call2.start(mockCallListener2, new Metadata()); |
| executor.runDueTasks(); |
| verifyNoMoreInteractions(mockCallListener2); |
| |
| // Get the helper created on exiting idle |
| ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class); |
| verify(mockLoadBalancerProvider, times(2)).newLoadBalancer(helperCaptor.capture()); |
| Helper helper2 = helperCaptor.getValue(); |
| |
| // Establish a connection |
| Subchannel subchannel = |
| createSubchannelSafely(helper2, addressGroup, Attributes.EMPTY, subchannelStateListener); |
| requestConnectionSafely(helper, subchannel); |
| MockClientTransportInfo transportInfo = transports.poll(); |
| ConnectionClientTransport mockTransport = transportInfo.transport; |
| ManagedClientTransport.Listener transportListener = transportInfo.listener; |
| when(mockTransport.newStream(same(method), any(Metadata.class), any(CallOptions.class))) |
| .thenReturn(mockStream); |
| transportListener.transportReady(); |
| |
| when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) |
| .thenReturn(PickResult.withSubchannel(subchannel)); |
| updateBalancingStateSafely(helper2, READY, mockPicker); |
| assertEquals(READY, channel.getState(false)); |
| executor.runDueTasks(); |
| |
| // Verify the buffered call was drained |
| verify(mockTransport).newStream(same(method), any(Metadata.class), any(CallOptions.class)); |
| verify(mockStream).start(any(ClientStreamListener.class)); |
| } |
| |
| @Test |
| public void enterIdleEntersIdle() { |
| createChannel(); |
| updateBalancingStateSafely(helper, READY, mockPicker); |
| assertEquals(READY, channel.getState(false)); |
| |
| channel.enterIdle(); |
| |
| assertEquals(IDLE, channel.getState(false)); |
| } |
| |
| @Test |
| public void enterIdleAfterIdleTimerIsNoOp() { |
| long idleTimeoutMillis = 2000L; |
| channelBuilder.idleTimeout(idleTimeoutMillis, TimeUnit.MILLISECONDS); |
| createChannel(); |
| timer.forwardNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeoutMillis)); |
| assertEquals(IDLE, channel.getState(false)); |
| |
| channel.enterIdle(); |
| |
| assertEquals(IDLE, channel.getState(false)); |
| } |
| |
| @Test |
| public void enterIdle_exitsIdleIfDelayedStreamPending() { |
| FakeNameResolverFactory nameResolverFactory = |
| new FakeNameResolverFactory.Builder(expectedUri) |
| .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress))) |
| .build(); |
| channelBuilder.nameResolverFactory(nameResolverFactory); |
| createChannel(); |
| |
| // Start a call that will be buffered in delayedTransport |
| ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT); |
| call.start(mockCallListener, new Metadata()); |
| |
| // enterIdle() will shut down the name resolver and lb policy used to get a pick for the delayed |
| // call |
| channel.enterIdle(); |
| assertEquals(IDLE, channel.getState(false)); |
| |
| // enterIdle() will restart the delayed call by exiting idle. This creates a new helper. |
| ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class); |
| verify(mockLoadBalancerProvider, times(2)).newLoadBalancer(helperCaptor.capture()); |
| Helper helper2 = helperCaptor.getValue(); |
| |
| // Establish a connection |
| Subchannel subchannel = |
| createSubchannelSafely(helper2, addressGroup, Attributes.EMPTY, subchannelStateListener); |
| requestConnectionSafely(helper, subchannel); |
| ClientStream mockStream = mock(ClientStream.class); |
| MockClientTransportInfo transportInfo = transports.poll(); |
| ConnectionClientTransport mockTransport = transportInfo.transport; |
| ManagedClientTransport.Listener transportListener = transportInfo.listener; |
| when(mockTransport.newStream(same(method), any(Metadata.class), any(CallOptions.class))) |
| .thenReturn(mockStream); |
| transportListener.transportReady(); |
| when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) |
| .thenReturn(PickResult.withSubchannel(subchannel)); |
| updateBalancingStateSafely(helper2, READY, mockPicker); |
| assertEquals(READY, channel.getState(false)); |
| |
| // Verify the original call was drained |
| executor.runDueTasks(); |
| verify(mockTransport).newStream(same(method), any(Metadata.class), any(CallOptions.class)); |
| verify(mockStream).start(any(ClientStreamListener.class)); |
| } |
| |
| @Test |
| public void updateBalancingStateDoesUpdatePicker() { |
| ClientStream mockStream = mock(ClientStream.class); |
| createChannel(); |
| |
| ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT); |
| call.start(mockCallListener, new Metadata()); |
| |
| // Make the transport available with subchannel2 |
| Subchannel subchannel1 = |
| createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener); |
| Subchannel subchannel2 = |
| createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener); |
| requestConnectionSafely(helper, subchannel2); |
| |
| MockClientTransportInfo transportInfo = transports.poll(); |
| ConnectionClientTransport mockTransport = transportInfo.transport; |
| ManagedClientTransport.Listener transportListener = transportInfo.listener; |
| when(mockTransport.newStream(same(method), any(Metadata.class), any(CallOptions.class))) |
| .thenReturn(mockStream); |
| transportListener.transportReady(); |
| |
| when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) |
| .thenReturn(PickResult.withSubchannel(subchannel1)); |
| updateBalancingStateSafely(helper, READY, mockPicker); |
| |
| executor.runDueTasks(); |
| verify(mockTransport, never()) |
| .newStream(any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)); |
| verify(mockStream, never()).start(any(ClientStreamListener.class)); |
| |
| |
| when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) |
| .thenReturn(PickResult.withSubchannel(subchannel2)); |
| updateBalancingStateSafely(helper, READY, mockPicker); |
| |
| executor.runDueTasks(); |
| verify(mockTransport).newStream(same(method), any(Metadata.class), any(CallOptions.class)); |
| verify(mockStream).start(any(ClientStreamListener.class)); |
| } |
| |
| @Test |
| public void updateBalancingState_withWrappedSubchannel() { |
| ClientStream mockStream = mock(ClientStream.class); |
| createChannel(); |
| |
| ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT); |
| call.start(mockCallListener, new Metadata()); |
| |
| final Subchannel subchannel1 = |
| createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener); |
| requestConnectionSafely(helper, subchannel1); |
| |
| MockClientTransportInfo transportInfo = transports.poll(); |
| ConnectionClientTransport mockTransport = transportInfo.transport; |
| ManagedClientTransport.Listener transportListener = transportInfo.listener; |
| when(mockTransport.newStream(same(method), any(Metadata.class), any(CallOptions.class))) |
| .thenReturn(mockStream); |
| transportListener.transportReady(); |
| |
| Subchannel wrappedSubchannel1 = new ForwardingSubchannel() { |
| @Override |
| protected Subchannel delegate() { |
| return subchannel1; |
| } |
| }; |
| when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) |
| .thenReturn(PickResult.withSubchannel(wrappedSubchannel1)); |
| updateBalancingStateSafely(helper, READY, mockPicker); |
| |
| executor.runDueTasks(); |
| verify(mockTransport).newStream(same(method), any(Metadata.class), any(CallOptions.class)); |
| verify(mockStream).start(any(ClientStreamListener.class)); |
| } |
| |
| @Test |
| public void updateBalancingStateWithShutdownShouldBeIgnored() { |
| channelBuilder.nameResolverFactory( |
| new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build()); |
| createChannel(); |
| assertEquals(IDLE, channel.getState(false)); |
| |
| Runnable onStateChanged = mock(Runnable.class); |
| channel.notifyWhenStateChanged(IDLE, onStateChanged); |
| |
| updateBalancingStateSafely(helper, SHUTDOWN, mockPicker); |
| |
| assertEquals(IDLE, channel.getState(false)); |
| executor.runDueTasks(); |
| verify(onStateChanged, never()).run(); |
| } |
| |
| @Test |
| public void balancerRefreshNameResolution() { |
| FakeNameResolverFactory nameResolverFactory = |
| new FakeNameResolverFactory.Builder(expectedUri).build(); |
| channelBuilder.nameResolverFactory(nameResolverFactory); |
| createChannel(); |
| |
| FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.get(0); |
| int initialRefreshCount = resolver.refreshCalled; |
| refreshNameResolutionSafely(helper); |
| assertEquals(initialRefreshCount + 1, resolver.refreshCalled); |
| } |
| |
| @Test |
| public void resetConnectBackoff() { |
| // Start with a name resolution failure to trigger backoff attempts |
| Status error = Status.UNAVAILABLE.withCause(new Throwable("fake name resolution error")); |
| FakeNameResolverFactory nameResolverFactory = |
| new FakeNameResolverFactory.Builder(expectedUri).setError(error).build(); |
| channelBuilder.nameResolverFactory(nameResolverFactory); |
| // Name resolution is started as soon as channel is created. |
| createChannel(); |
| FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.get(0); |
| verify(mockLoadBalancer).handleNameResolutionError(same(error)); |
| |
| FakeClock.ScheduledTask nameResolverBackoff = getNameResolverRefresh(); |
| assertNotNull("There should be a name resolver backoff task", nameResolverBackoff); |
| assertEquals(0, resolver.refreshCalled); |
| |
| // Verify resetConnectBackoff() calls refresh and cancels the scheduled backoff |
| channel.resetConnectBackoff(); |
| assertEquals(1, resolver.refreshCalled); |
| assertTrue(nameResolverBackoff.isCancelled()); |
| |
| // Simulate a race between cancel and the task scheduler. Should be a no-op. |
| nameResolverBackoff.command.run(); |
| assertEquals(1, resolver.refreshCalled); |
| |
| // Verify that the reconnect policy was recreated and the backoff multiplier reset to 1 |
| timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS); |
| assertEquals(2, resolver.refreshCalled); |
| } |
| |
| @Test |
| public void resetConnectBackoff_noOpWithoutPendingResolverBackoff() { |
| FakeNameResolverFactory nameResolverFactory = |
| new FakeNameResolverFactory.Builder(expectedUri) |
| .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress))) |
| .build(); |
| channelBuilder.nameResolverFactory(nameResolverFactory); |
| createChannel(); |
| FakeNameResolverFactory.FakeNameResolver nameResolver = nameResolverFactory.resolvers.get(0); |
| assertEquals(0, nameResolver.refreshCalled); |
| |
| channel.resetConnectBackoff(); |
| |
| assertEquals(0, nameResolver.refreshCalled); |
| } |
| |
| @Test |
| public void resetConnectBackoff_noOpWhenChannelShutdown() { |
| FakeNameResolverFactory nameResolverFactory = |
| new FakeNameResolverFactory.Builder(expectedUri).build(); |
| channelBuilder.nameResolverFactory(nameResolverFactory); |
| createChannel(); |
| |
| channel.shutdown(); |
| assertTrue(channel.isShutdown()); |
| channel.resetConnectBackoff(); |
| |
| FakeNameResolverFactory.FakeNameResolver nameResolver = nameResolverFactory.resolvers.get(0); |
| assertEquals(0, nameResolver.refreshCalled); |
| } |
| |
| @Test |
| public void resetConnectBackoff_noOpWhenNameResolverNotStarted() { |
| FakeNameResolverFactory nameResolverFactory = |
| new FakeNameResolverFactory.Builder(expectedUri).build(); |
| channelBuilder.nameResolverFactory(nameResolverFactory); |
| requestConnection = false; |
| createChannel(); |
| |
| channel.resetConnectBackoff(); |
| |
| FakeNameResolverFactory.FakeNameResolver nameResolver = nameResolverFactory.resolvers.get(0); |
| assertEquals(0, nameResolver.refreshCalled); |
| } |
| |
| @Test |
| public void channelsAndSubchannels_instrumented_name() throws Exception { |
| createChannel(); |
| assertEquals(TARGET, getStats(channel).target); |
| |
| Subchannel subchannel = |
| createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener); |
| assertEquals(Collections.singletonList(addressGroup).toString(), |
| getStats((AbstractSubchannel) subchannel).target); |
| } |
| |
| @Test |
| public void channelTracing_channelCreationEvent() throws Exception { |
| timer.forwardNanos(1234); |
| channelBuilder.maxTraceEvents(10); |
| createChannel(); |
| assertThat(getStats(channel).channelTrace.events).contains(new ChannelTrace.Event.Builder() |
| .setDescription("Channel for 'fake://fake.example.com' created") |
| .setSeverity(ChannelTrace.Event.Severity.CT_INFO) |
| .setTimestampNanos(timer.getTicker().read()) |
| .build()); |
| } |
| |
| @Test |
| public void channelTracing_subchannelCreationEvents() throws Exception { |
| channelBuilder.maxTraceEvents(10); |
| createChannel(); |
| timer.forwardNanos(1234); |
| AbstractSubchannel subchannel = |
| (AbstractSubchannel) createSubchannelSafely( |
| helper, addressGroup, Attributes.EMPTY, subchannelStateListener); |
| assertThat(getStats(channel).channelTrace.events).contains(new ChannelTrace.Event.Builder() |
| .setDescription("Child Subchannel started") |
| .setSeverity(ChannelTrace.Event.Severity.CT_INFO) |
| .setTimestampNanos(timer.getTicker().read()) |
| .setSubchannelRef(subchannel.getInstrumentedInternalSubchannel()) |
| .build()); |
| assertThat(getStats(subchannel).channelTrace.events).contains(new ChannelTrace.Event.Builder() |
| .setDescription("Subchannel for [[[test-addr]/{}]] created") |
| .setSeverity(ChannelTrace.Event.Severity.CT_INFO) |
| .setTimestampNanos(timer.getTicker().read()) |
| .build()); |
| } |
| |
| @Test |
| public void channelTracing_nameResolvingErrorEvent() throws Exception { |
| timer.forwardNanos(1234); |
| channelBuilder.maxTraceEvents(10); |
| |
| Status error = Status.UNAVAILABLE.withDescription("simulated error"); |
| FakeNameResolverFactory nameResolverFactory = |
| new FakeNameResolverFactory.Builder(expectedUri).setError(error).build(); |
| channelBuilder.nameResolverFactory(nameResolverFactory); |
| createChannel(); |
| |
| assertThat(getStats(channel).channelTrace.events).contains(new ChannelTrace.Event.Builder() |
| .setDescription("Failed to resolve name: " + error) |
| .setSeverity(ChannelTrace.Event.Severity.CT_WARNING) |
| .setTimestampNanos(timer.getTicker().read()) |
| .build()); |
| } |
| |
| @Test |
| public void channelTracing_nameResolvedEvent() throws Exception { |
| timer.forwardNanos(1234); |
| channelBuilder.maxTraceEvents(10); |
| FakeNameResolverFactory nameResolverFactory = |
| new FakeNameResolverFactory.Builder(expectedUri) |
| .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress))) |
| .build(); |
| channelBuilder.nameResolverFactory(nameResolverFactory); |
| createChannel(); |
| assertThat(getStats(channel).channelTrace.events).contains(new ChannelTrace.Event.Builder() |
| .setDescription("Address resolved: " |
| + Collections.singletonList(new EquivalentAddressGroup(socketAddress))) |
| .setSeverity(ChannelTrace.Event.Severity.CT_INFO) |
| .setTimestampNanos(timer.getTicker().read()) |
| .build()); |
| } |
| |
| @Test |
| public void channelTracing_nameResolvedEvent_zeorAndNonzeroBackends() throws Exception { |
| timer.forwardNanos(1234); |
| channelBuilder.maxTraceEvents(10); |
| List<EquivalentAddressGroup> servers = new ArrayList<>(); |
| servers.add(new EquivalentAddressGroup(socketAddress)); |
| FakeNameResolverFactory nameResolverFactory = |
| new FakeNameResolverFactory.Builder(expectedUri).setServers(servers).build(); |
| channelBuilder.nameResolverFactory(nameResolverFactory); |
| createChannel(); |
| |
| int prevSize = getStats(channel).channelTrace.events.size(); |
| ResolutionResult resolutionResult1 = ResolutionResult.newBuilder() |
| .setAddresses(Collections.singletonList( |
| new EquivalentAddressGroup( |
| Arrays.asList(new SocketAddress() {}, new SocketAddress() {})))) |
| .build(); |
| nameResolverFactory.resolvers.get(0).listener.onResult(resolutionResult1); |
| assertThat(getStats(channel).channelTrace.events).hasSize(prevSize); |
| |
| prevSize = getStats(channel).channelTrace.events.size(); |
| nameResolverFactory.resolvers.get(0).listener.onError(Status.INTERNAL); |
| assertThat(getStats(channel).channelTrace.events).hasSize(prevSize + 1); |
| |
| prevSize = getStats(channel).channelTrace.events.size(); |
| nameResolverFactory.resolvers.get(0).listener.onError(Status.INTERNAL); |
| assertThat(getStats(channel).channelTrace.events).hasSize(prevSize); |
| |
| prevSize = getStats(channel).channelTrace.events.size(); |
| ResolutionResult resolutionResult2 = ResolutionResult.newBuilder() |
| .setAddresses(Collections.singletonList( |
| new EquivalentAddressGroup( |
| Arrays.asList(new SocketAddress() {}, new SocketAddress() {})))) |
| .build(); |
| nameResolverFactory.resolvers.get(0).listener.onResult(resolutionResult2); |
| assertThat(getStats(channel).channelTrace.events).hasSize(prevSize + 1); |
| } |
| |
| @Test |
| public void channelTracing_serviceConfigChange() throws Exception { |
| timer.forwardNanos(1234); |
| channelBuilder.maxTraceEvents(10); |
| List<EquivalentAddressGroup> servers = new ArrayList<>(); |
| servers.add(new EquivalentAddressGroup(socketAddress)); |
| FakeNameResolverFactory nameResolverFactory = |
| new FakeNameResolverFactory.Builder(expectedUri).setServers(servers).build(); |
| channelBuilder.nameResolverFactory(nameResolverFactory); |
| createChannel(); |
| |
| int prevSize = getStats(channel).channelTrace.events.size(); |
| ManagedChannelServiceConfig mcsc1 = createManagedChannelServiceConfig( |
| ImmutableMap.<String, Object>of(), |
| new PolicySelection( |
| mockLoadBalancerProvider, null)); |
| ResolutionResult resolutionResult1 = ResolutionResult.newBuilder() |
| .setAddresses(Collections.singletonList( |
| new EquivalentAddressGroup( |
| Arrays.asList(new SocketAddress() {}, new SocketAddress() {})))) |
| .setServiceConfig(ConfigOrError.fromConfig(mcsc1)) |
| .build(); |
| nameResolverFactory.resolvers.get(0).listener.onResult(resolutionResult1); |
| assertThat(getStats(channel).channelTrace.events).hasSize(prevSize + 1); |
| assertThat(getStats(channel).channelTrace.events.get(prevSize)) |
| .isEqualTo(new ChannelTrace.Event.Builder() |
| .setDescription("Service config changed") |
| .setSeverity(ChannelTrace.Event.Severity.CT_INFO) |
| .setTimestampNanos(timer.getTicker().read()) |
| .build()); |
| |
| prevSize = getStats(channel).channelTrace.events.size(); |
| ResolutionResult resolutionResult2 = ResolutionResult.newBuilder().setAddresses( |
| Collections.singletonList( |
| new EquivalentAddressGroup( |
| Arrays.asList(new SocketAddress() {}, new SocketAddress() {})))) |
| .setServiceConfig(ConfigOrError.fromConfig(mcsc1)) |
| .build(); |
| nameResolverFactory.resolvers.get(0).listener.onResult(resolutionResult2); |
| assertThat(getStats(channel).channelTrace.events).hasSize(prevSize); |
| |
| prevSize = getStats(channel).channelTrace.events.size(); |
| timer.forwardNanos(1234); |
| ResolutionResult resolutionResult3 = ResolutionResult.newBuilder() |
| .setAddresses(Collections.singletonList( |
| new EquivalentAddressGroup( |
| Arrays.asList(new SocketAddress() {}, new SocketAddress() {})))) |
| .setServiceConfig(ConfigOrError.fromConfig(ManagedChannelServiceConfig.empty())) |
| .build(); |
| nameResolverFactory.resolvers.get(0).listener.onResult(resolutionResult3); |
| assertThat(getStats(channel).channelTrace.events).hasSize(prevSize + 1); |
| assertThat(getStats(channel).channelTrace.events.get(prevSize)) |
| .isEqualTo(new ChannelTrace.Event.Builder() |
| .setDescription("Service config changed") |
| .setSeverity(ChannelTrace.Event.Severity.CT_INFO) |
| .setTimestampNanos(timer.getTicker().read()) |
| .build()); |
| } |
| |
| @Test |
| public void channelTracing_stateChangeEvent() throws Exception { |
| channelBuilder.maxTraceEvents(10); |
| createChannel(); |
| timer.forwardNanos(1234); |
| updateBalancingStateSafely(helper, CONNECTING, mockPicker); |
| assertThat(getStats(channel).channelTrace.events).contains(new ChannelTrace.Event.Builder() |
| .setDescription("Entering CONNECTING state with picker: mockPicker") |
| .setSeverity(ChannelTrace.Event.Severity.CT_INFO) |
| .setTimestampNanos(timer.getTicker().read()) |
| .build()); |
| } |
| |
| @Test |
| public void channelTracing_subchannelStateChangeEvent() throws Exception { |
| channelBuilder.maxTraceEvents(10); |
| createChannel(); |
| AbstractSubchannel subchannel = |
| (AbstractSubchannel) createSubchannelSafely( |
| helper, addressGroup, Attributes.EMPTY, subchannelStateListener); |
| timer.forwardNanos(1234); |
| ((TransportProvider) subchannel.getInternalSubchannel()).obtainActiveTransport(); |
| assertThat(getStats(subchannel).channelTrace.events).contains(new ChannelTrace.Event.Builder() |
| .setDescription("CONNECTING as requested") |
| .setSeverity(ChannelTrace.Event.Severity.CT_INFO) |
| .setTimestampNanos(timer.getTicker().read()) |
| .build()); |
| } |
| |
| @Test |
| public void channelTracing_oobChannelStateChangeEvent() throws Exception { |
| channelBuilder.maxTraceEvents(10); |
| createChannel(); |
| OobChannel oobChannel = (OobChannel) helper.createOobChannel( |
| Collections.singletonList(addressGroup), "authority"); |
| timer.forwardNanos(1234); |
| oobChannel.handleSubchannelStateChange( |
| ConnectivityStateInfo.forNonError(ConnectivityState.CONNECTING)); |
| assertThat(getStats(oobChannel).channelTrace.events).contains(new ChannelTrace.Event.Builder() |
| .setDescription("Entering CONNECTING state") |
| .setSeverity(ChannelTrace.Event.Severity.CT_INFO) |
| .setTimestampNanos(timer.getTicker().read()) |
| .build()); |
| } |
| |
| @Test |
| public void channelTracing_oobChannelCreationEvents() throws Exception { |
| channelBuilder.maxTraceEvents(10); |
| createChannel(); |
| timer.forwardNanos(1234); |
| OobChannel oobChannel = (OobChannel) helper.createOobChannel( |
| Collections.singletonList(addressGroup), "authority"); |
| assertThat(getStats(channel).channelTrace.events).contains(new ChannelTrace.Event.Builder() |
| .setDescription("Child OobChannel created") |
| .setSeverity(ChannelTrace.Event.Severity.CT_INFO) |
| .setTimestampNanos(timer.getTicker().read()) |
| .setChannelRef(oobChannel) |
| .build()); |
| assertThat(getStats(oobChannel).channelTrace.events).contains(new ChannelTrace.Event.Builder() |
| .setDescription("OobChannel for [[[test-addr]/{}]] created") |
| .setSeverity(ChannelTrace.Event.Severity.CT_INFO) |
| .setTimestampNanos(timer.getTicker().read()) |
| .build()); |
| assertThat(getStats(oobChannel.getInternalSubchannel()).channelTrace.events).contains( |
| new ChannelTrace.Event.Builder() |
| .setDescription("Subchannel for [[[test-addr]/{}]] created") |
| .setSeverity(ChannelTrace.Event.Severity.CT_INFO) |
| .setTimestampNanos(timer.getTicker().read()) |
| .build()); |
| } |
| |
| @Test |
| public void channelsAndSubchannels_instrumented_state() throws Exception { |
| createChannel(); |
| |
| ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(null); |
| verify(mockLoadBalancerProvider).newLoadBalancer(helperCaptor.capture()); |
| helper = helperCaptor.getValue(); |
| |
| assertEquals(IDLE, getStats(channel).state); |
| updateBalancingStateSafely(helper, CONNECTING, mockPicker); |
| assertEquals(CONNECTING, getStats(channel).state); |
| |
| AbstractSubchannel subchannel = |
| (AbstractSubchannel) createSubchannelSafely( |
| helper, addressGroup, Attributes.EMPTY, subchannelStateListener); |
| |
| assertEquals(IDLE, getStats(subchannel).state); |
| requestConnectionSafely(helper, subchannel); |
| assertEquals(CONNECTING, getStats(subchannel).state); |
| |
| MockClientTransportInfo transportInfo = transports.poll(); |
| |
| assertEquals(CONNECTING, getStats(subchannel).state); |
| transportInfo.listener.transportReady(); |
| assertEquals(READY, getStats(subchannel).state); |
| |
| assertEquals(CONNECTING, getStats(channel).state); |
| updateBalancingStateSafely(helper, READY, mockPicker); |
| assertEquals(READY, getStats(channel).state); |
| |
| channel.shutdownNow(); |
| assertEquals(SHUTDOWN, getStats(channel).state); |
| assertEquals(SHUTDOWN, getStats(subchannel).state); |
| } |
| |
| @Test |
| public void channelStat_callStarted() throws Exception { |
| createChannel(); |
| ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT); |
| assertEquals(0, getStats(channel).callsStarted); |
| call.start(mockCallListener, new Metadata()); |
| assertEquals(1, getStats(channel).callsStarted); |
| assertEquals(executor.getTicker().read(), getStats(channel).lastCallStartedNanos); |
| } |
| |
| @Test |
| public void channelsAndSubChannels_instrumented_success() throws Exception { |
| channelsAndSubchannels_instrumented0(true); |
| } |
| |
| @Test |
| public void channelsAndSubChannels_instrumented_fail() throws Exception { |
| channelsAndSubchannels_instrumented0(false); |
| } |
| |
| private void channelsAndSubchannels_instrumented0(boolean success) throws Exception { |
| createChannel(); |
| |
| ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT); |
| |
| // Channel stat bumped when ClientCall.start() called |
| assertEquals(0, getStats(channel).callsStarted); |
| call.start(mockCallListener, new Metadata()); |
| assertEquals(1, getStats(channel).callsStarted); |
| |
| ClientStream mockStream = mock(ClientStream.class); |
| ClientStreamTracer.Factory factory = mock(ClientStreamTracer.Factory.class); |
| AbstractSubchannel subchannel = |
| (AbstractSubchannel) createSubchannelSafely( |
| helper, addressGroup, Attributes.EMPTY, subchannelStateListener); |
| requestConnectionSafely(helper, subchannel); |
| MockClientTransportInfo transportInfo = transports.poll(); |
| transportInfo.listener.transportReady(); |
| ClientTransport mockTransport = transportInfo.transport; |
| when(mockTransport.newStream( |
| any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class))) |
| .thenReturn(mockStream); |
| when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn( |
| PickResult.withSubchannel(subchannel, factory)); |
| |
| // subchannel stat bumped when call gets assigned to it |
| assertEquals(0, getStats(subchannel).callsStarted); |
| updateBalancingStateSafely(helper, READY, mockPicker); |
| assertEquals(1, executor.runDueTasks()); |
| verify(mockStream).start(streamListenerCaptor.capture()); |
| assertEquals(1, getStats(subchannel).callsStarted); |
| |
| ClientStreamListener streamListener = streamListenerCaptor.getValue(); |
| call.halfClose(); |
| |
| // closing stream listener affects subchannel stats immediately |
| assertEquals(0, getStats(subchannel).callsSucceeded); |
| assertEquals(0, getStats(subchannel).callsFailed); |
| streamListener.closed(success ? Status.OK : Status.UNKNOWN, new Metadata()); |
| if (success) { |
| assertEquals(1, getStats(subchannel).callsSucceeded); |
| assertEquals(0, getStats(subchannel).callsFailed); |
| } else { |
| assertEquals(0, getStats(subchannel).callsSucceeded); |
| assertEquals(1, getStats(subchannel).callsFailed); |
| } |
| |
| // channel stats bumped when the ClientCall.Listener is notified |
| assertEquals(0, getStats(channel).callsSucceeded); |
| assertEquals(0, getStats(channel).callsFailed); |
| executor.runDueTasks(); |
| if (success) { |
| assertEquals(1, getStats(channel).callsSucceeded); |
| assertEquals(0, getStats(channel).callsFailed); |
| } else { |
| assertEquals(0, getStats(channel).callsSucceeded); |
| assertEquals(1, getStats(channel).callsFailed); |
| } |
| } |
| |
| @Test |
| public void channelsAndSubchannels_oob_instrumented_success() throws Exception { |
| channelsAndSubchannels_oob_instrumented0(true); |
| } |
| |
| @Test |
| public void channelsAndSubchannels_oob_instrumented_fail() throws Exception { |
| channelsAndSubchannels_oob_instrumented0(false); |
| } |
| |
| private void channelsAndSubchannels_oob_instrumented0(boolean success) throws Exception { |
| // set up |
| ClientStream mockStream = mock(ClientStream.class); |
| createChannel(); |
| |
| OobChannel oobChannel = (OobChannel) helper.createOobChannel( |
| Collections.singletonList(addressGroup), "oobauthority"); |
| AbstractSubchannel oobSubchannel = (AbstractSubchannel) oobChannel.getSubchannel(); |
| FakeClock callExecutor = new FakeClock(); |
| CallOptions options = |
| CallOptions.DEFAULT.withExecutor(callExecutor.getScheduledExecutorService()); |
| ClientCall<String, Integer> call = oobChannel.newCall(method, options); |
| Metadata headers = new Metadata(); |
| |
| // Channel stat bumped when ClientCall.start() called |
| assertEquals(0, getStats(oobChannel).callsStarted); |
| call.start(mockCallListener, headers); |
| assertEquals(1, getStats(oobChannel).callsStarted); |
| |
| MockClientTransportInfo transportInfo = transports.poll(); |
| ConnectionClientTransport mockTransport = transportInfo.transport; |
| ManagedClientTransport.Listener transportListener = transportInfo.listener; |
| when(mockTransport.newStream(same(method), same(headers), any(CallOptions.class))) |
| .thenReturn(mockStream); |
| |
| // subchannel stat bumped when call gets assigned to it |
| assertEquals(0, getStats(oobSubchannel).callsStarted); |
| transportListener.transportReady(); |
| callExecutor.runDueTasks(); |
| verify(mockStream).start(streamListenerCaptor.capture()); |
| assertEquals(1, getStats(oobSubchannel).callsStarted); |
| |
| ClientStreamListener streamListener = streamListenerCaptor.getValue(); |
| call.halfClose(); |
| |
| // closing stream listener affects subchannel stats immediately |
| assertEquals(0, getStats(oobSubchannel).callsSucceeded); |
| assertEquals(0, getStats(oobSubchannel).callsFailed); |
| streamListener.closed(success ? Status.OK : Status.UNKNOWN, new Metadata()); |
| if (success) { |
| assertEquals(1, getStats(oobSubchannel).callsSucceeded); |
| assertEquals(0, getStats(oobSubchannel).callsFailed); |
| } else { |
| assertEquals(0, getStats(oobSubchannel).callsSucceeded); |
| assertEquals(1, getStats(oobSubchannel).callsFailed); |
| } |
| |
| // channel stats bumped when the ClientCall.Listener is notified |
| assertEquals(0, getStats(oobChannel).callsSucceeded); |
| assertEquals(0, getStats(oobChannel).callsFailed); |
| callExecutor.runDueTasks(); |
| if (success) { |
| assertEquals(1, getStats(oobChannel).callsSucceeded); |
| assertEquals(0, getStats(oobChannel).callsFailed); |
| } else { |
| assertEquals(0, getStats(oobChannel).callsSucceeded); |
| assertEquals(1, getStats(oobChannel).callsFailed); |
| } |
| // oob channel is separate from the original channel |
| assertEquals(0, getStats(channel).callsSucceeded); |
| assertEquals(0, getStats(channel).callsFailed); |
| } |
| |
| @Test |
| public void channelsAndSubchannels_oob_instrumented_name() throws Exception { |
| createChannel(); |
| |
| String authority = "oobauthority"; |
| OobChannel oobChannel = (OobChannel) helper.createOobChannel( |
| Collections.singletonList(addressGroup), authority); |
| assertEquals(authority, getStats(oobChannel).target); |
| } |
| |
| @Test |
| public void channelsAndSubchannels_oob_instrumented_state() throws Exception { |
| createChannel(); |
| |
| OobChannel oobChannel = (OobChannel) helper.createOobChannel( |
| Collections.singletonList(addressGroup), "oobauthority"); |
| assertEquals(IDLE, getStats(oobChannel).state); |
| |
| oobChannel.getSubchannel().requestConnection(); |
| assertEquals(CONNECTING, getStats(oobChannel).state); |
| |
| MockClientTransportInfo transportInfo = transports.poll(); |
| ManagedClientTransport.Listener transportListener = transportInfo.listener; |
| |
| transportListener.transportReady(); |
| assertEquals(READY, getStats(oobChannel).state); |
| |
| // oobchannel state is separate from the ManagedChannel |
| assertEquals(IDLE, getStats(channel).state); |
| channel.shutdownNow(); |
| assertEquals(SHUTDOWN, getStats(channel).state); |
| assertEquals(SHUTDOWN, getStats(oobChannel).state); |
| } |
| |
| @Test |
| public void binaryLogInstalled() throws Exception { |
| final SettableFuture<Boolean> intercepted = SettableFuture.create(); |
| channelBuilder.binlog = new BinaryLog() { |
| @Override |
| public void close() throws IOException { |
| // noop |
| } |
| |
| @Override |
| public <ReqT, RespT> ServerMethodDefinition<?, ?> wrapMethodDefinition( |
| ServerMethodDefinition<ReqT, RespT> oMethodDef) { |
| return oMethodDef; |
| } |
| |
| @Override |
| public Channel wrapChannel(Channel channel) { |
| return ClientInterceptors.intercept(channel, |
| new ClientInterceptor() { |
| @Override |
| public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall( |
| MethodDescriptor<ReqT, RespT> method, |
| CallOptions callOptions, |
| Channel next) { |
| intercepted.set(true); |
| return next.newCall(method, callOptions); |
| } |
| }); |
| } |
| }; |
| |
| createChannel(); |
| ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT); |
| call.start(mockCallListener, new Metadata()); |
| assertTrue(intercepted.get()); |
| } |
| |
| @Test |
| public void retryBackoffThenChannelShutdown_retryShouldStillHappen_newCallShouldFail() { |
| Map<String, Object> retryPolicy = new HashMap<>(); |
| retryPolicy.put("maxAttempts", 3D); |
| retryPolicy.put("initialBackoff", "10s"); |
| retryPolicy.put("maxBackoff", "30s"); |
| retryPolicy.put("backoffMultiplier", 2D); |
| retryPolicy.put("retryableStatusCodes", Arrays.<Object>asList("UNAVAILABLE")); |
| Map<String, Object> methodConfig = new HashMap<>(); |
| Map<String, Object> name = new HashMap<>(); |
| name.put("service", "service"); |
| methodConfig.put("name", Arrays.<Object>asList(name)); |
| methodConfig.put("retryPolicy", retryPolicy); |
| Map<String, Object> rawServiceConfig = new HashMap<>(); |
| rawServiceConfig.put("methodConfig", Arrays.<Object>asList(methodConfig)); |
| |
| FakeNameResolverFactory nameResolverFactory = |
| new FakeNameResolverFactory.Builder(expectedUri) |
| .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress))) |
| .build(); |
| ManagedChannelServiceConfig managedChannelServiceConfig = |
| createManagedChannelServiceConfig(rawServiceConfig, null); |
| nameResolverFactory.nextConfigOrError.set( |
| ConfigOrError.fromConfig(managedChannelServiceConfig)); |
| |
| channelBuilder.nameResolverFactory(nameResolverFactory); |
| channelBuilder.executor(MoreExecutors.directExecutor()); |
| channelBuilder.enableRetry(); |
| RetriableStream.setRandom( |
| // not random |
| new Random() { |
| @Override |
| public double nextDouble() { |
| return 1D; // fake random |
| } |
| }); |
| |
| requestConnection = false; |
| createChannel(); |
| |
| ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT); |
| call.start(mockCallListener, new Metadata()); |
| ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class); |
| verify(mockLoadBalancerProvider).newLoadBalancer(helperCaptor.capture()); |
| helper = helperCaptor.getValue(); |
| verify(mockLoadBalancer).handleResolvedAddresses( |
| ResolvedAddresses.newBuilder() |
| .setAddresses(nameResolverFactory.servers) |
| .build()); |
| |
| // simulating request connection and then transport ready after resolved address |
| Subchannel subchannel = |
| createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener); |
| when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) |
| .thenReturn(PickResult.withSubchannel(subchannel)); |
| requestConnectionSafely(helper, subchannel); |
| MockClientTransportInfo transportInfo = transports.poll(); |
| ConnectionClientTransport mockTransport = transportInfo.transport; |
| ClientStream mockStream = mock(ClientStream.class); |
| ClientStream mockStream2 = mock(ClientStream.class); |
| when(mockTransport.newStream(same(method), any(Metadata.class), any(CallOptions.class))) |
| .thenReturn(mockStream).thenReturn(mockStream2); |
| transportInfo.listener.transportReady(); |
| updateBalancingStateSafely(helper, READY, mockPicker); |
| |
| ArgumentCaptor<ClientStreamListener> streamListenerCaptor = |
| ArgumentCaptor.forClass(ClientStreamListener.class); |
| verify(mockStream).start(streamListenerCaptor.capture()); |
| assertThat(timer.getPendingTasks()).isEmpty(); |
| |
| // trigger retry |
| streamListenerCaptor.getValue().closed(Status.UNAVAILABLE, new Metadata()); |
| |
| // in backoff |
| timer.forwardTime(5, TimeUnit.SECONDS); |
| assertThat(timer.getPendingTasks()).hasSize(1); |
| verify(mockStream2, never()).start(any(ClientStreamListener.class)); |
| |
| // shutdown during backoff period |
| channel.shutdown(); |
| |
| assertThat(timer.getPendingTasks()).hasSize(1); |
| verify(mockCallListener, never()).onClose(any(Status.class), any(Metadata.class)); |
| |
| ClientCall<String, Integer> call2 = channel.newCall(method, CallOptions.DEFAULT); |
| call2.start(mockCallListener2, new Metadata()); |
| |
| ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class); |
| verify(mockCallListener2).onClose(statusCaptor.capture(), any(Metadata.class)); |
| assertSame(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode()); |
| assertEquals("Channel shutdown invoked", statusCaptor.getValue().getDescription()); |
| |
| // backoff ends |
| timer.forwardTime(5, TimeUnit.SECONDS); |
| assertThat(timer.getPendingTasks()).isEmpty(); |
| verify(mockStream2).start(streamListenerCaptor.capture()); |
| verify(mockLoadBalancer, never()).shutdown(); |
| assertFalse( |
| "channel.isTerminated() is expected to be false but was true", |
| channel.isTerminated()); |
| |
| streamListenerCaptor.getValue().closed(Status.INTERNAL, new Metadata()); |
| verify(mockLoadBalancer).shutdown(); |
| // simulating the shutdown of load balancer triggers the shutdown of subchannel |
| shutdownSafely(helper, subchannel); |
| transportInfo.listener.transportShutdown(Status.INTERNAL); |
| transportInfo.listener.transportTerminated(); // simulating transport terminated |
| assertTrue( |
| "channel.isTerminated() is expected to be true but was false", |
| channel.isTerminated()); |
| } |
| |
| @Test |
| public void hedgingScheduledThenChannelShutdown_hedgeShouldStillHappen_newCallShouldFail() { |
| Map<String, Object> hedgingPolicy = new HashMap<>(); |
| hedgingPolicy.put("maxAttempts", 3D); |
| hedgingPolicy.put("hedgingDelay", "10s"); |
| hedgingPolicy.put("nonFatalStatusCodes", Arrays.<Object>asList("UNAVAILABLE")); |
| Map<String, Object> methodConfig = new HashMap<>(); |
| Map<String, Object> name = new HashMap<>(); |
| name.put("service", "service"); |
| methodConfig.put("name", Arrays.<Object>asList(name)); |
| methodConfig.put("hedgingPolicy", hedgingPolicy); |
| Map<String, Object> rawServiceConfig = new HashMap<>(); |
| rawServiceConfig.put("methodConfig", Arrays.<Object>asList(methodConfig)); |
| |
| FakeNameResolverFactory nameResolverFactory = |
| new FakeNameResolverFactory.Builder(expectedUri) |
| .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress))) |
| .build(); |
| ManagedChannelServiceConfig managedChannelServiceConfig = |
| createManagedChannelServiceConfig(rawServiceConfig, null); |
| nameResolverFactory.nextConfigOrError.set( |
| ConfigOrError.fromConfig(managedChannelServiceConfig)); |
| |
| channelBuilder.nameResolverFactory(nameResolverFactory); |
| channelBuilder.executor(MoreExecutors.directExecutor()); |
| channelBuilder.enableRetry(); |
| |
| requestConnection = false; |
| createChannel(); |
| |
| ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT); |
| call.start(mockCallListener, new Metadata()); |
| ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class); |
| verify(mockLoadBalancerProvider).newLoadBalancer(helperCaptor.capture()); |
| helper = helperCaptor.getValue(); |
| verify(mockLoadBalancer).handleResolvedAddresses( |
| ResolvedAddresses.newBuilder() |
| .setAddresses(nameResolverFactory.servers) |
| .build()); |
| |
| // simulating request connection and then transport ready after resolved address |
| Subchannel subchannel = |
| createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener); |
| when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) |
| .thenReturn(PickResult.withSubchannel(subchannel)); |
| requestConnectionSafely(helper, subchannel); |
| MockClientTransportInfo transportInfo = transports.poll(); |
| ConnectionClientTransport mockTransport = transportInfo.transport; |
| ClientStream mockStream = mock(ClientStream.class); |
| ClientStream mockStream2 = mock(ClientStream.class); |
| when(mockTransport.newStream(same(method), any(Metadata.class), any(CallOptions.class))) |
| .thenReturn(mockStream).thenReturn(mockStream2); |
| transportInfo.listener.transportReady(); |
| updateBalancingStateSafely(helper, READY, mockPicker); |
| |
| ArgumentCaptor<ClientStreamListener> streamListenerCaptor = |
| ArgumentCaptor.forClass(ClientStreamListener.class); |
| verify(mockStream).start(streamListenerCaptor.capture()); |
| |
| // in hedging delay backoff |
| timer.forwardTime(5, TimeUnit.SECONDS); |
| assertThat(timer.numPendingTasks()).isEqualTo(1); |
| // first hedge fails |
| streamListenerCaptor.getValue().closed(Status.UNAVAILABLE, new Metadata()); |
| verify(mockStream2, never()).start(any(ClientStreamListener.class)); |
| |
| // shutdown during backoff period |
| channel.shutdown(); |
| |
| assertThat(timer.numPendingTasks()).isEqualTo(1); |
| verify(mockCallListener, never()).onClose(any(Status.class), any(Metadata.class)); |
| |
| ClientCall<String, Integer> call2 = channel.newCall(method, CallOptions.DEFAULT); |
| call2.start(mockCallListener2, new Metadata()); |
| |
| ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class); |
| verify(mockCallListener2).onClose(statusCaptor.capture(), any(Metadata.class)); |
| assertSame(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode()); |
| assertEquals("Channel shutdown invoked", statusCaptor.getValue().getDescription()); |
| |
| // backoff ends |
| timer.forwardTime(5, TimeUnit.SECONDS); |
| assertThat(timer.numPendingTasks()).isEqualTo(1); |
| verify(mockStream2).start(streamListenerCaptor.capture()); |
| verify(mockLoadBalancer, never()).shutdown(); |
| assertFalse( |
| "channel.isTerminated() is expected to be false but was true", |
| channel.isTerminated()); |
| |
| streamListenerCaptor.getValue().closed(Status.INTERNAL, new Metadata()); |
| assertThat(timer.numPendingTasks()).isEqualTo(0); |
| verify(mockLoadBalancer).shutdown(); |
| // simulating the shutdown of load balancer triggers the shutdown of subchannel |
| shutdownSafely(helper, subchannel); |
| // simulating transport shutdown & terminated |
| transportInfo.listener.transportShutdown(Status.INTERNAL); |
| transportInfo.listener.transportTerminated(); |
| assertTrue( |
| "channel.isTerminated() is expected to be true but was false", |
| channel.isTerminated()); |
| } |
| |
| @Test |
| public void badServiceConfigIsRecoverable() throws Exception { |
| final List<EquivalentAddressGroup> addresses = |
| ImmutableList.of(new EquivalentAddressGroup(new SocketAddress() {})); |
| final class FakeNameResolver extends NameResolver { |
| Listener2 listener; |
| |
| @Override |
| public String getServiceAuthority() { |
| return "also fake"; |
| } |
| |
| @Override |
| public void start(Listener2 listener) { |
| this.listener = listener; |
| listener.onResult( |
| ResolutionResult.newBuilder() |
| .setAddresses(addresses) |
| .setServiceConfig( |
| ConfigOrError.fromError( |
| Status.INTERNAL.withDescription("kaboom is invalid"))) |
| .build()); |
| } |
| |
| @Override |
| public void shutdown() {} |
| } |
| |
| final class FakeNameResolverFactory2 extends NameResolver.Factory { |
| FakeNameResolver resolver; |
| |
| @Nullable |
| @Override |
| public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) { |
| return (resolver = new FakeNameResolver()); |
| } |
| |
| @Override |
| public String getDefaultScheme() { |
| return "fake"; |
| } |
| } |
| |
| FakeNameResolverFactory2 factory = new FakeNameResolverFactory2(); |
| |
| ManagedChannelImplBuilder customBuilder = new ManagedChannelImplBuilder(TARGET, |
| new ClientTransportFactoryBuilder() { |
| @Override |
| public ClientTransportFactory buildClientTransportFactory() { |
| return mockTransportFactory; |
| } |
| }, |
| null); |
| customBuilder.executorPool = executorPool; |
| customBuilder.channelz = channelz; |
| ManagedChannel mychannel = customBuilder.nameResolverFactory(factory).build(); |
| |
| ClientCall<Void, Void> call1 = |
| mychannel.newCall(TestMethodDescriptors.voidMethod(), CallOptions.DEFAULT); |
| ListenableFuture<Void> future1 = ClientCalls.futureUnaryCall(call1, null); |
| executor.runDueTasks(); |
| try { |
| future1.get(1, TimeUnit.SECONDS); |
| Assert.fail(); |
| } catch (ExecutionException e) { |
| assertThat(Throwables.getStackTraceAsString(e.getCause())).contains("kaboom"); |
| } |
| |
| // ok the service config is bad, let's fix it. |
| Map<String, Object> rawServiceConfig = |
| parseConfig("{\"loadBalancingConfig\": [{\"round_robin\": {}}]}"); |
| Object fakeLbConfig = new Object(); |
| PolicySelection lbConfigs = |
| new PolicySelection( |
| mockLoadBalancerProvider, fakeLbConfig); |
| mockLoadBalancerProvider.parseLoadBalancingPolicyConfig(rawServiceConfig); |
| ManagedChannelServiceConfig managedChannelServiceConfig = |
| createManagedChannelServiceConfig(rawServiceConfig, lbConfigs); |
| factory.resolver.listener.onResult( |
| ResolutionResult.newBuilder() |
| .setAddresses(addresses) |
| .setServiceConfig(ConfigOrError.fromConfig(managedChannelServiceConfig)) |
| .build()); |
| |
| ClientCall<Void, Void> call2 = mychannel.newCall( |
| TestMethodDescriptors.voidMethod(), |
| CallOptions.DEFAULT.withDeadlineAfter(5, TimeUnit.SECONDS)); |
| ListenableFuture<Void> future2 = ClientCalls.futureUnaryCall(call2, null); |
| |
| timer.forwardTime(1234, TimeUnit.SECONDS); |
| |
| executor.runDueTasks(); |
| try { |
| future2.get(); |
| Assert.fail(); |
| } catch (ExecutionException e) { |
| assertThat(Throwables.getStackTraceAsString(e.getCause())).contains("deadline"); |
| } |
| |
| mychannel.shutdownNow(); |
| } |
| |
| @Test |
| public void nameResolverArgsPropagation() { |
| final AtomicReference<NameResolver.Args> capturedArgs = new AtomicReference<>(); |
| final NameResolver noopResolver = new NameResolver() { |
| @Override |
| public String getServiceAuthority() { |
| return "fake-authority"; |
| } |
| |
| @Override |
| public void start(Listener2 listener) { |
| } |
| |
| @Override |
| public void shutdown() {} |
| }; |
| ProxyDetector neverProxy = new ProxyDetector() { |
| @Override |
| public ProxiedSocketAddress proxyFor(SocketAddress targetAddress) { |
| return null; |
| } |
| }; |
| NameResolver.Factory factory = new NameResolver.Factory() { |
| @Override |
| public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) { |
| capturedArgs.set(args); |
| return noopResolver; |
| } |
| |
| @Override |
| public String getDefaultScheme() { |
| return "fakescheme"; |
| } |
| }; |
| channelBuilder.nameResolverFactory(factory).proxyDetector(neverProxy); |
| createChannel(); |
| |
| NameResolver.Args args = capturedArgs.get(); |
| assertThat(args).isNotNull(); |
| assertThat(args.getDefaultPort()).isEqualTo(DEFAULT_PORT); |
| assertThat(args.getProxyDetector()).isSameInstanceAs(neverProxy); |
| |
| verify(offloadExecutor, never()).execute(any(Runnable.class)); |
| args.getOffloadExecutor() |
| .execute( |
| new Runnable() { |
| @Override |
| public void run() {} |
| }); |
| verify(offloadExecutor, times(1)).execute(any(Runnable.class)); |
| } |
| |
| @Test |
| public void getAuthorityAfterShutdown() throws Exception { |
| createChannel(); |
| assertEquals(SERVICE_NAME, channel.authority()); |
| channel.shutdownNow().awaitTermination(1, TimeUnit.SECONDS); |
| assertEquals(SERVICE_NAME, channel.authority()); |
| } |
| |
| @Test |
| public void nameResolverHelper_emptyConfigSucceeds() { |
| boolean retryEnabled = false; |
| int maxRetryAttemptsLimit = 2; |
| int maxHedgedAttemptsLimit = 3; |
| AutoConfiguredLoadBalancerFactory autoConfiguredLoadBalancerFactory = |
| new AutoConfiguredLoadBalancerFactory("pick_first"); |
| |
| ScParser parser = new ScParser( |
| retryEnabled, |
| maxRetryAttemptsLimit, |
| maxHedgedAttemptsLimit, |
| autoConfiguredLoadBalancerFactory); |
| |
| ConfigOrError coe = parser.parseServiceConfig(ImmutableMap.<String, Object>of()); |
| |
| assertThat(coe.getError()).isNull(); |
| ManagedChannelServiceConfig cfg = (ManagedChannelServiceConfig) coe.getConfig(); |
| assertThat(cfg.getMethodConfig(method)).isEqualTo( |
| ManagedChannelServiceConfig.empty().getMethodConfig(method)); |
| } |
| |
| @Test |
| public void nameResolverHelper_badConfigFails() { |
| boolean retryEnabled = false; |
| int maxRetryAttemptsLimit = 2; |
| int maxHedgedAttemptsLimit = 3; |
| AutoConfiguredLoadBalancerFactory autoConfiguredLoadBalancerFactory = |
| new AutoConfiguredLoadBalancerFactory("pick_first"); |
| |
| ScParser parser = new ScParser( |
| retryEnabled, |
| maxRetryAttemptsLimit, |
| maxHedgedAttemptsLimit, |
| autoConfiguredLoadBalancerFactory); |
| |
| ConfigOrError coe = |
| parser.parseServiceConfig(ImmutableMap.<String, Object>of("methodConfig", "bogus")); |
| |
| assertThat(coe.getError()).isNotNull(); |
| assertThat(coe.getError().getCode()).isEqualTo(Code.UNKNOWN); |
| assertThat(coe.getError().getDescription()).contains("failed to parse service config"); |
| assertThat(coe.getError().getCause()).isInstanceOf(ClassCastException.class); |
| } |
| |
| @Test |
| public void nameResolverHelper_noConfigChosen() { |
| boolean retryEnabled = false; |
| int maxRetryAttemptsLimit = 2; |
| int maxHedgedAttemptsLimit = 3; |
| AutoConfiguredLoadBalancerFactory autoConfiguredLoadBalancerFactory = |
| new AutoConfiguredLoadBalancerFactory("pick_first"); |
| |
| ScParser parser = new ScParser( |
| retryEnabled, |
| maxRetryAttemptsLimit, |
| maxHedgedAttemptsLimit, |
| autoConfiguredLoadBalancerFactory); |
| |
| ConfigOrError coe = |
| parser.parseServiceConfig(ImmutableMap.of("loadBalancingConfig", ImmutableList.of())); |
| |
| assertThat(coe.getError()).isNull(); |
| ManagedChannelServiceConfig cfg = (ManagedChannelServiceConfig) coe.getConfig(); |
| assertThat(cfg.getLoadBalancingConfig()).isNull(); |
| } |
| |
| @Test |
| public void disableServiceConfigLookUp_noDefaultConfig() throws Exception { |
| LoadBalancerRegistry.getDefaultRegistry().register(mockLoadBalancerProvider); |
| try { |
| FakeNameResolverFactory nameResolverFactory = |
| new FakeNameResolverFactory.Builder(expectedUri) |
| .setServers(ImmutableList.of(addressGroup)).build(); |
| channelBuilder.nameResolverFactory(nameResolverFactory); |
| channelBuilder.disableServiceConfigLookUp(); |
| |
| Map<String, Object> rawServiceConfig = |
| parseConfig("{\"methodConfig\":[{" |
| + "\"name\":[{\"service\":\"SimpleService1\"}]," |
| + "\"waitForReady\":true}]}"); |
| ManagedChannelServiceConfig managedChannelServiceConfig = |
| createManagedChannelServiceConfig(rawServiceConfig, null); |
| nameResolverFactory.nextConfigOrError.set( |
| ConfigOrError.fromConfig(managedChannelServiceConfig)); |
| nameResolverFactory.nextAttributes.set( |
| Attributes.newBuilder() |
| .set(InternalConfigSelector.KEY, mock(InternalConfigSelector.class)) |
| .build()); |
| |
| createChannel(); |
| |
| ArgumentCaptor<ResolvedAddresses> resultCaptor = |
| ArgumentCaptor.forClass(ResolvedAddresses.class); |
| verify(mockLoadBalancer).handleResolvedAddresses(resultCaptor.capture()); |
| assertThat(resultCaptor.getValue().getAddresses()).containsExactly(addressGroup); |
| assertThat(resultCaptor.getValue().getAttributes().get(InternalConfigSelector.KEY)).isNull(); |
| verify(mockLoadBalancer, never()).handleNameResolutionError(any(Status.class)); |
| } finally { |
| LoadBalancerRegistry.getDefaultRegistry().deregister(mockLoadBalancerProvider); |
| } |
| } |
| |
| @Test |
| public void disableServiceConfigLookUp_withDefaultConfig() throws Exception { |
| LoadBalancerRegistry.getDefaultRegistry().register(mockLoadBalancerProvider); |
| try { |
| FakeNameResolverFactory nameResolverFactory = |
| new FakeNameResolverFactory.Builder(expectedUri) |
| .setServers(ImmutableList.of(addressGroup)).build(); |
| channelBuilder.nameResolverFactory(nameResolverFactory); |
| channelBuilder.disableServiceConfigLookUp(); |
| Map<String, Object> defaultServiceConfig = |
| parseConfig("{\"methodConfig\":[{" |
| + "\"name\":[{\"service\":\"SimpleService1\"}]," |
| + "\"waitForReady\":true}]}"); |
| channelBuilder.defaultServiceConfig(defaultServiceConfig); |
| |
| Map<String, Object> rawServiceConfig = new HashMap<>(); |
| ManagedChannelServiceConfig managedChannelServiceConfig = |
| createManagedChannelServiceConfig(rawServiceConfig, null); |
| nameResolverFactory.nextConfigOrError.set( |
| ConfigOrError.fromConfig(managedChannelServiceConfig)); |
| nameResolverFactory.nextAttributes.set( |
| Attributes.newBuilder() |
| .set(InternalConfigSelector.KEY, mock(InternalConfigSelector.class)) |
| .build()); |
| |
| createChannel(); |
| |
| ArgumentCaptor<ResolvedAddresses> resultCaptor = |
| ArgumentCaptor.forClass(ResolvedAddresses.class); |
| verify(mockLoadBalancer).handleResolvedAddresses(resultCaptor.capture()); |
| assertThat(resultCaptor.getValue().getAddresses()).containsExactly(addressGroup); |
| assertThat(resultCaptor.getValue().getAttributes().get(InternalConfigSelector.KEY)).isNull(); |
| verify(mockLoadBalancer, never()).handleNameResolutionError(any(Status.class)); |
| } finally { |
| LoadBalancerRegistry.getDefaultRegistry().deregister(mockLoadBalancerProvider); |
| } |
| } |
| |
| @Test |
| public void enableServiceConfigLookUp_noDefaultConfig() throws Exception { |
| LoadBalancerRegistry.getDefaultRegistry().register(mockLoadBalancerProvider); |
| try { |
| FakeNameResolverFactory nameResolverFactory = |
| new FakeNameResolverFactory.Builder(expectedUri) |
| .setServers(ImmutableList.of(addressGroup)).build(); |
| channelBuilder.nameResolverFactory(nameResolverFactory); |
| |
| Map<String, Object> rawServiceConfig = |
| parseConfig("{\"methodConfig\":[{" |
| + "\"name\":[{\"service\":\"SimpleService1\"}]," |
| + "\"waitForReady\":true}]}"); |
| ManagedChannelServiceConfig managedChannelServiceConfig = |
| createManagedChannelServiceConfig(rawServiceConfig, null); |
| nameResolverFactory.nextConfigOrError.set( |
| ConfigOrError.fromConfig(managedChannelServiceConfig)); |
| |
| createChannel(); |
| ArgumentCaptor<ResolvedAddresses> resultCaptor = |
| ArgumentCaptor.forClass(ResolvedAddresses.class); |
| verify(mockLoadBalancer).handleResolvedAddresses(resultCaptor.capture()); |
| assertThat(resultCaptor.getValue().getAddresses()).containsExactly(addressGroup); |
| verify(mockLoadBalancer, never()).handleNameResolutionError(any(Status.class)); |
| |
| // new config |
| rawServiceConfig = |
| parseConfig("{\"methodConfig\":[{" |
| + "\"name\":[{\"service\":\"SimpleService1\"}]," |
| + "\"waitForReady\":false}]}"); |
| managedChannelServiceConfig = |
| createManagedChannelServiceConfig(rawServiceConfig, null); |
| nameResolverFactory.nextConfigOrError.set( |
| ConfigOrError.fromConfig(managedChannelServiceConfig)); |
| nameResolverFactory.allResolved(); |
| |
| resultCaptor = ArgumentCaptor.forClass(ResolvedAddresses.class); |
| verify(mockLoadBalancer, times(2)).handleResolvedAddresses(resultCaptor.capture()); |
| assertThat(resultCaptor.getValue().getAddresses()).containsExactly(addressGroup); |
| verify(mockLoadBalancer, never()).handleNameResolutionError(any(Status.class)); |
| } finally { |
| LoadBalancerRegistry.getDefaultRegistry().deregister(mockLoadBalancerProvider); |
| } |
| } |
| |
| @Test |
| public void enableServiceConfigLookUp_withDefaultConfig() throws Exception { |
| LoadBalancerRegistry.getDefaultRegistry().register(mockLoadBalancerProvider); |
| try { |
| FakeNameResolverFactory nameResolverFactory = |
| new FakeNameResolverFactory.Builder(expectedUri) |
| .setServers(ImmutableList.of(addressGroup)).build(); |
| channelBuilder.nameResolverFactory(nameResolverFactory); |
| Map<String, Object> defaultServiceConfig = |
| parseConfig("{\"methodConfig\":[{" |
| + "\"name\":[{\"service\":\"SimpleService1\"}]," |
| + "\"waitForReady\":true}]}"); |
| channelBuilder.defaultServiceConfig(defaultServiceConfig); |
| |
| Map<String, Object> rawServiceConfig = |
| parseConfig("{\"methodConfig\":[{" |
| + "\"name\":[{\"service\":\"SimpleService2\"}]," |
| + "\"waitForReady\":false}]}"); |
| ManagedChannelServiceConfig managedChannelServiceConfig = |
| createManagedChannelServiceConfig(rawServiceConfig, null); |
| nameResolverFactory.nextConfigOrError.set( |
| ConfigOrError.fromConfig(managedChannelServiceConfig)); |
| |
| createChannel(); |
| ArgumentCaptor<ResolvedAddresses> resultCaptor = |
| ArgumentCaptor.forClass(ResolvedAddresses.class); |
| verify(mockLoadBalancer).handleResolvedAddresses(resultCaptor.capture()); |
| assertThat(resultCaptor.getValue().getAddresses()).containsExactly(addressGroup); |
| verify(mockLoadBalancer, never()).handleNameResolutionError(any(Status.class)); |
| } finally { |
| LoadBalancerRegistry.getDefaultRegistry().deregister(mockLoadBalancerProvider); |
| } |
| } |
| |
| @Test |
| public void enableServiceConfigLookUp_resolverReturnsNoConfig_withDefaultConfig() |
| throws Exception { |
| LoadBalancerRegistry.getDefaultRegistry().register(mockLoadBalancerProvider); |
| try { |
| FakeNameResolverFactory nameResolverFactory = |
| new FakeNameResolverFactory.Builder(expectedUri) |
| .setServers(ImmutableList.of(addressGroup)).build(); |
| channelBuilder.nameResolverFactory(nameResolverFactory); |
| Map<String, Object> defaultServiceConfig = |
| parseConfig("{\"methodConfig\":[{" |
| + "\"name\":[{\"service\":\"SimpleService1\"}]," |
| + "\"waitForReady\":true}]}"); |
| channelBuilder.defaultServiceConfig(defaultServiceConfig); |
| |
| nameResolverFactory.nextConfigOrError.set(null); |
| |
| createChannel(); |
| ArgumentCaptor<ResolvedAddresses> resultCaptor = |
| ArgumentCaptor.forClass(ResolvedAddresses.class); |
| verify(mockLoadBalancer).handleResolvedAddresses(resultCaptor.capture()); |
| assertThat(resultCaptor.getValue().getAddresses()).containsExactly(addressGroup); |
| verify(mockLoadBalancer, never()).handleNameResolutionError(any(Status.class)); |
| } finally { |
| LoadBalancerRegistry.getDefaultRegistry().deregister(mockLoadBalancerProvider); |
| } |
| } |
| |
| @Test |
| public void enableServiceConfigLookUp_resolverReturnsNoConfig_noDefaultConfig() { |
| LoadBalancerRegistry.getDefaultRegistry().register(mockLoadBalancerProvider); |
| try { |
| FakeNameResolverFactory nameResolverFactory = |
| new FakeNameResolverFactory.Builder(expectedUri) |
| .setServers(ImmutableList.of(addressGroup)).build(); |
| channelBuilder.nameResolverFactory(nameResolverFactory); |
| |
| Map<String, Object> rawServiceConfig = Collections.emptyMap(); |
| ManagedChannelServiceConfig managedChannelServiceConfig = |
| createManagedChannelServiceConfig(rawServiceConfig, null); |
| nameResolverFactory.nextConfigOrError.set( |
| ConfigOrError.fromConfig(managedChannelServiceConfig)); |
| |
| createChannel(); |
| ArgumentCaptor<ResolvedAddresses> resultCaptor = |
| ArgumentCaptor.forClass(ResolvedAddresses.class); |
| verify(mockLoadBalancer).handleResolvedAddresses(resultCaptor.capture()); |
| assertThat(resultCaptor.getValue().getAddresses()).containsExactly(addressGroup); |
| verify(mockLoadBalancer, never()).handleNameResolutionError(any(Status.class)); |
| } finally { |
| LoadBalancerRegistry.getDefaultRegistry().deregister(mockLoadBalancerProvider); |
| } |
| } |
| |
| @Test |
| public void useDefaultImmediatelyIfDisableLookUp() throws Exception { |
| FakeNameResolverFactory nameResolverFactory = |
| new FakeNameResolverFactory.Builder(expectedUri) |
| .setServers(ImmutableList.of(addressGroup)).build(); |
| channelBuilder.nameResolverFactory(nameResolverFactory); |
| channelBuilder.disableServiceConfigLookUp(); |
| Map<String, Object> defaultServiceConfig = |
| parseConfig("{\"methodConfig\":[{" |
| + "\"name\":[{\"service\":\"SimpleService1\"}]," |
| + "\"waitForReady\":true}]}"); |
| channelBuilder.defaultServiceConfig(defaultServiceConfig); |
| requestConnection = false; |
| channelBuilder.maxTraceEvents(10); |
| |
| createChannel(); |
| |
| int size = getStats(channel).channelTrace.events.size(); |
| assertThat(getStats(channel).channelTrace.events.get(size - 1)) |
| .isEqualTo(new ChannelTrace.Event.Builder() |
| .setDescription("Service config look-up disabled, using default service config") |
| .setSeverity(ChannelTrace.Event.Severity.CT_INFO) |
| .setTimestampNanos(timer.getTicker().read()) |
| .build()); |
| } |
| |
| @Test |
| public void notUseDefaultImmediatelyIfEnableLookUp() throws Exception { |
| FakeNameResolverFactory nameResolverFactory = |
| new FakeNameResolverFactory.Builder(expectedUri) |
| .setServers(ImmutableList.of(addressGroup)).build(); |
| channelBuilder.nameResolverFactory(nameResolverFactory); |
| Map<String, Object> defaultServiceConfig = |
| parseConfig("{\"methodConfig\":[{" |
| + "\"name\":[{\"service\":\"SimpleService1\"}]," |
| + "\"waitForReady\":true}]}"); |
| channelBuilder.defaultServiceConfig(defaultServiceConfig); |
| requestConnection = false; |
| channelBuilder.maxTraceEvents(10); |
| |
| createChannel(); |
| |
| int size = getStats(channel).channelTrace.events.size(); |
| assertThat(getStats(channel).channelTrace.events.get(size - 1)) |
| .isNotEqualTo(new ChannelTrace.Event.Builder() |
| .setDescription("Using default service config") |
| .setSeverity(ChannelTrace.Event.Severity.CT_INFO) |
| .setTimestampNanos(timer.getTicker().read()) |
| .build()); |
| } |
| |
| @Test |
| public void healthCheckingConfigPropagated() throws Exception { |
| LoadBalancerRegistry.getDefaultRegistry().register(mockLoadBalancerProvider); |
| try { |
| FakeNameResolverFactory nameResolverFactory = |
| new FakeNameResolverFactory.Builder(expectedUri) |
| .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress))) |
| .build(); |
| channelBuilder.nameResolverFactory(nameResolverFactory); |
| |
| Map<String, Object> rawServiceConfig = |
| parseConfig("{\"healthCheckConfig\": {\"serviceName\": \"service1\"}}"); |
| ManagedChannelServiceConfig managedChannelServiceConfig = |
| createManagedChannelServiceConfig(rawServiceConfig, null); |
| nameResolverFactory.nextConfigOrError.set( |
| ConfigOrError.fromConfig(managedChannelServiceConfig)); |
| |
| createChannel(); |
| |
| ArgumentCaptor<ResolvedAddresses> resultCaptor = |
| ArgumentCaptor.forClass(ResolvedAddresses.class); |
| verify(mockLoadBalancer).handleResolvedAddresses(resultCaptor.capture()); |
| assertThat(resultCaptor.getValue().getAttributes() |
| .get(LoadBalancer.ATTR_HEALTH_CHECKING_CONFIG)) |
| .containsExactly("serviceName", "service1"); |
| } finally { |
| LoadBalancerRegistry.getDefaultRegistry().deregister(mockLoadBalancerProvider); |
| } |
| } |
| |
| @Test |
| public void createResolvingOobChannel() throws Exception { |
| String oobTarget = "fake://second.example.com"; |
| URI oobUri = new URI(oobTarget); |
| channelBuilder |
| .nameResolverFactory(new FakeNameResolverFactory.Builder(expectedUri, oobUri).build()); |
| createChannel(); |
| |
| ManagedChannel resolvedOobChannel = null; |
| try { |
| resolvedOobChannel = helper.createResolvingOobChannel(oobTarget); |
| |
| assertWithMessage("resolving oob channel should have same authority") |
| .that(resolvedOobChannel.authority()) |
| .isEqualTo(channel.authority()); |
| } finally { |
| if (resolvedOobChannel != null) { |
| resolvedOobChannel.shutdownNow(); |
| } |
| } |
| } |
| |
| private static final class FakeBackoffPolicyProvider implements BackoffPolicy.Provider { |
| @Override |
| public BackoffPolicy get() { |
| return new BackoffPolicy() { |
| int multiplier = 1; |
| |
| @Override |
| public long nextBackoffNanos() { |
| return RECONNECT_BACKOFF_INTERVAL_NANOS * multiplier++; |
| } |
| }; |
| } |
| } |
| |
| private static final class FakeNameResolverFactory extends NameResolver.Factory { |
| final List<URI> expectedUris; |
| final List<EquivalentAddressGroup> servers; |
| final boolean resolvedAtStart; |
| final Status error; |
| final ArrayList<FakeNameResolverFactory.FakeNameResolver> resolvers = new ArrayList<>(); |
| final AtomicReference<ConfigOrError> nextConfigOrError = new AtomicReference<>(); |
| final AtomicReference<Attributes> nextAttributes = new AtomicReference<>(Attributes.EMPTY); |
| |
| FakeNameResolverFactory( |
| List<URI> expectedUris, |
| List<EquivalentAddressGroup> servers, |
| boolean resolvedAtStart, |
| Status error) { |
| this.expectedUris = expectedUris; |
| this.servers = servers; |
| this.resolvedAtStart = resolvedAtStart; |
| this.error = error; |
| } |
| |
| @Override |
| public NameResolver newNameResolver(final URI targetUri, NameResolver.Args args) { |
| if (!expectedUris.contains(targetUri)) { |
| return null; |
| } |
| assertEquals(DEFAULT_PORT, args.getDefaultPort()); |
| FakeNameResolverFactory.FakeNameResolver resolver = |
| new FakeNameResolverFactory.FakeNameResolver(targetUri, error); |
| resolvers.add(resolver); |
| return resolver; |
| } |
| |
| @Override |
| public String getDefaultScheme() { |
| return "fake"; |
| } |
| |
| void allResolved() { |
| for (FakeNameResolverFactory.FakeNameResolver resolver : resolvers) { |
| resolver.resolved(); |
| } |
| } |
| |
| final class FakeNameResolver extends NameResolver { |
| final URI targetUri; |
| Listener2 listener; |
| boolean shutdown; |
| int refreshCalled; |
| Status error; |
| |
| FakeNameResolver(URI targetUri, Status error) { |
| this.targetUri = targetUri; |
| this.error = error; |
| } |
| |
| @Override public String getServiceAuthority() { |
| return targetUri.getAuthority(); |
| } |
| |
| @Override public void start(Listener2 listener) { |
| this.listener = listener; |
| if (resolvedAtStart) { |
| resolved(); |
| } |
| } |
| |
| @Override public void refresh() { |
| refreshCalled++; |
| resolved(); |
| } |
| |
| void resolved() { |
| if (error != null) { |
| listener.onError(error); |
| return; |
| } |
| ResolutionResult.Builder builder = |
| ResolutionResult.newBuilder() |
| .setAddresses(servers) |
| .setAttributes(nextAttributes.get()); |
| ConfigOrError configOrError = nextConfigOrError.get(); |
| if (configOrError != null) { |
| builder.setServiceConfig(configOrError); |
| } |
| listener.onResult(builder.build()); |
| } |
| |
| @Override public void shutdown() { |
| shutdown = true; |
| } |
| |
| @Override |
| public String toString() { |
| return "FakeNameResolver"; |
| } |
| } |
| |
| static final class Builder { |
| List<URI> expectedUris; |
| List<EquivalentAddressGroup> servers = ImmutableList.of(); |
| boolean resolvedAtStart = true; |
| Status error = null; |
| |
| Builder(URI... expectedUris) { |
| this.expectedUris = Collections.unmodifiableList(Arrays.asList(expectedUris)); |
| } |
| |
| FakeNameResolverFactory.Builder setServers(List<EquivalentAddressGroup> servers) { |
| this.servers = servers; |
| return this; |
| } |
| |
| FakeNameResolverFactory.Builder setResolvedAtStart(boolean resolvedAtStart) { |
| this.resolvedAtStart = resolvedAtStart; |
| return this; |
| } |
| |
| FakeNameResolverFactory.Builder setError(Status error) { |
| this.error = error; |
| return this; |
| } |
| |
| FakeNameResolverFactory build() { |
| return new FakeNameResolverFactory(expectedUris, servers, resolvedAtStart, error); |
| } |
| } |
| } |
| |
| private static ChannelStats getStats(AbstractSubchannel subchannel) throws Exception { |
| return subchannel.getInstrumentedInternalSubchannel().getStats().get(); |
| } |
| |
| private static ChannelStats getStats( |
| InternalInstrumented<ChannelStats> instrumented) throws Exception { |
| return instrumented.getStats().get(); |
| } |
| |
| private FakeClock.ScheduledTask getNameResolverRefresh() { |
| return Iterables.getOnlyElement(timer.getPendingTasks(NAME_RESOLVER_REFRESH_TASK_FILTER), null); |
| } |
| |
| // Helper methods to call methods from SynchronizationContext |
| private static Subchannel createSubchannelSafely( |
| final Helper helper, final EquivalentAddressGroup addressGroup, final Attributes attrs, |
| final SubchannelStateListener stateListener) { |
| final AtomicReference<Subchannel> resultCapture = new AtomicReference<>(); |
| helper.getSynchronizationContext().execute( |
| new Runnable() { |
| @Override |
| public void run() { |
| Subchannel s = helper.createSubchannel(CreateSubchannelArgs.newBuilder() |
| .setAddresses(addressGroup) |
| .setAttributes(attrs) |
| .build()); |
| s.start(stateListener); |
| resultCapture.set(s); |
| } |
| }); |
| return resultCapture.get(); |
| } |
| |
| private static Subchannel createUnstartedSubchannel( |
| final Helper helper, final EquivalentAddressGroup addressGroup, final Attributes attrs) { |
| final AtomicReference<Subchannel> resultCapture = new AtomicReference<>(); |
| helper.getSynchronizationContext().execute( |
| new Runnable() { |
| @Override |
| public void run() { |
| Subchannel s = helper.createSubchannel(CreateSubchannelArgs.newBuilder() |
| .setAddresses(addressGroup) |
| .setAttributes(attrs) |
| .build()); |
| resultCapture.set(s); |
| } |
| }); |
| return resultCapture.get(); |
| } |
| |
| private static void requestConnectionSafely(Helper helper, final Subchannel subchannel) { |
| helper.getSynchronizationContext().execute( |
| new Runnable() { |
| @Override |
| public void run() { |
| subchannel.requestConnection(); |
| } |
| }); |
| } |
| |
| private static void updateBalancingStateSafely( |
| final Helper helper, final ConnectivityState state, final SubchannelPicker picker) { |
| helper.getSynchronizationContext().execute( |
| new Runnable() { |
| @Override |
| public void run() { |
| helper.updateBalancingState(state, picker); |
| } |
| }); |
| } |
| |
| private static void refreshNameResolutionSafely(final Helper helper) { |
| helper.getSynchronizationContext().execute( |
| new Runnable() { |
| @Override |
| public void run() { |
| helper.refreshNameResolution(); |
| } |
| }); |
| } |
| |
| private static void updateAddressesSafely( |
| Helper helper, final Subchannel subchannel, final List<EquivalentAddressGroup> addrs) { |
| helper.getSynchronizationContext().execute( |
| new Runnable() { |
| @Override |
| public void run() { |
| subchannel.updateAddresses(addrs); |
| } |
| }); |
| } |
| |
| private static void shutdownSafely( |
| final Helper helper, final Subchannel subchannel) { |
| helper.getSynchronizationContext().execute( |
| new Runnable() { |
| @Override |
| public void run() { |
| subchannel.shutdown(); |
| } |
| }); |
| } |
| |
| @SuppressWarnings("unchecked") |
| private static Map<String, Object> parseConfig(String json) throws Exception { |
| return (Map<String, Object>) JsonParser.parse(json); |
| } |
| |
| private static ManagedChannelServiceConfig createManagedChannelServiceConfig( |
| Map<String, Object> rawServiceConfig, PolicySelection policySelection) { |
| // Provides dummy variable for retry related params (not used in this test class) |
| return ManagedChannelServiceConfig |
| .fromServiceConfig(rawServiceConfig, true, 3, 4, policySelection); |
| } |
| } |