| /* |
| * Copyright 2018 The gRPC Authors |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package io.grpc.protobuf.services; |
| |
| import static com.google.common.base.Preconditions.checkNotNull; |
| import static com.google.common.base.Preconditions.checkState; |
| import static com.google.common.truth.Truth.assertThat; |
| import static io.grpc.ConnectivityState.CONNECTING; |
| import static io.grpc.ConnectivityState.IDLE; |
| import static io.grpc.ConnectivityState.READY; |
| import static io.grpc.ConnectivityState.SHUTDOWN; |
| import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; |
| import static org.mockito.AdditionalAnswers.delegatesTo; |
| import static org.mockito.ArgumentMatchers.any; |
| import static org.mockito.ArgumentMatchers.eq; |
| import static org.mockito.ArgumentMatchers.same; |
| import static org.mockito.Mockito.atLeast; |
| import static org.mockito.Mockito.doAnswer; |
| import static org.mockito.Mockito.inOrder; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.times; |
| import static org.mockito.Mockito.verify; |
| import static org.mockito.Mockito.verifyNoInteractions; |
| import static org.mockito.Mockito.verifyNoMoreInteractions; |
| import static org.mockito.Mockito.when; |
| |
| import com.google.common.util.concurrent.MoreExecutors; |
| import io.grpc.Attributes; |
| import io.grpc.Channel; |
| import io.grpc.ChannelLogger; |
| import io.grpc.ConnectivityState; |
| import io.grpc.ConnectivityStateInfo; |
| import io.grpc.Context; |
| import io.grpc.Context.CancellationListener; |
| import io.grpc.EquivalentAddressGroup; |
| import io.grpc.LoadBalancer; |
| import io.grpc.LoadBalancer.CreateSubchannelArgs; |
| import io.grpc.LoadBalancer.Helper; |
| import io.grpc.LoadBalancer.ResolvedAddresses; |
| import io.grpc.LoadBalancer.Subchannel; |
| import io.grpc.LoadBalancer.SubchannelPicker; |
| import io.grpc.LoadBalancer.SubchannelStateListener; |
| import io.grpc.ManagedChannel; |
| import io.grpc.Server; |
| import io.grpc.Status; |
| import io.grpc.Status.Code; |
| import io.grpc.SynchronizationContext; |
| import io.grpc.health.v1.HealthCheckRequest; |
| import io.grpc.health.v1.HealthCheckResponse; |
| import io.grpc.health.v1.HealthCheckResponse.ServingStatus; |
| import io.grpc.health.v1.HealthGrpc; |
| import io.grpc.inprocess.InProcessChannelBuilder; |
| import io.grpc.inprocess.InProcessServerBuilder; |
| import io.grpc.internal.BackoffPolicy; |
| import io.grpc.internal.FakeClock; |
| import io.grpc.internal.ServiceConfigUtil; |
| import io.grpc.protobuf.services.HealthCheckingLoadBalancerFactory.SubchannelImpl; |
| import io.grpc.stub.StreamObserver; |
| import java.net.SocketAddress; |
| import java.text.MessageFormat; |
| import java.util.ArrayDeque; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Queue; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicReference; |
| import javax.annotation.Nullable; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.JUnit4; |
| import org.mockito.ArgumentCaptor; |
| import org.mockito.Captor; |
| import org.mockito.InOrder; |
| import org.mockito.Mock; |
| import org.mockito.MockitoAnnotations; |
| import org.mockito.hamcrest.MockitoHamcrest; |
| import org.mockito.invocation.InvocationOnMock; |
| import org.mockito.stubbing.Answer; |
| |
| /** Tests for {@link HealthCheckingLoadBalancerFactory}. */ |
| @RunWith(JUnit4.class) |
| public class HealthCheckingLoadBalancerFactoryTest { |
| private static final Attributes.Key<String> SUBCHANNEL_ATTR_KEY = |
| Attributes.Key.create("subchannel-attr-for-test"); |
| /* typicalWorkflow() sets this key on the Subchannels it creates and checks that it reaches origHelper. */ |
| // We use in-process channels for Subchannel.asChannel(), so that the tests issue real RPCs and |
| // we can be sure they are made correctly. Mocking Channel and ClientCall is a bad idea because |
| // it can easily be done wrong. Each Channel goes to a different server, so that we can verify |
| // the health check activity on each Subchannel separately. |
| private static final int NUM_SUBCHANNELS = 2; |
| private final EquivalentAddressGroup[] eags = new EquivalentAddressGroup[NUM_SUBCHANNELS]; |
| @SuppressWarnings({"rawtypes", "unchecked"}) |
| private final List<EquivalentAddressGroup>[] eagLists = new List[NUM_SUBCHANNELS]; /* eagLists[i] holds only eags[i], see setup() */ |
| private final SubchannelStateListener[] mockStateListeners = |
| new SubchannelStateListener[NUM_SUBCHANNELS]; |
| private List<EquivalentAddressGroup> resolvedAddressList; /* list view of eags, assigned in setup() */ |
| private final FakeSubchannel[] subchannels = new FakeSubchannel[NUM_SUBCHANNELS]; |
| private final ManagedChannel[] channels = new ManagedChannel[NUM_SUBCHANNELS]; |
| private final Server[] servers = new Server[NUM_SUBCHANNELS]; |
| private final HealthImpl[] healthImpls = new HealthImpl[NUM_SUBCHANNELS]; |
| /* The uncaught-exception handler below rethrows anything escaping a syncContext task as an AssertionError. */ |
| private final SynchronizationContext syncContext = new SynchronizationContext( |
| new Thread.UncaughtExceptionHandler() { |
| @Override |
| public void uncaughtException(Thread t, Throwable e) { |
| throw new AssertionError(e); |
| } |
| }); |
| private final FakeClock clock = new FakeClock(); /* provides the stopwatch supplier passed to the factory; tests advance it with forwardNanos() */ |
| private final Helper origHelper = mock(Helper.class, delegatesTo(new FakeHelper())); |
| // The helper seen by the origLb. Captured when origLbFactory.newLoadBalancer() is called. |
| private Helper wrappedHelper; |
| private final LoadBalancer.Factory origLbFactory = |
| mock(LoadBalancer.Factory.class, delegatesTo(new LoadBalancer.Factory() { |
| @Override |
| public LoadBalancer newLoadBalancer(Helper helper) { |
| checkState(wrappedHelper == null, "LoadBalancer already created"); |
| wrappedHelper = helper; |
| return origLb; |
| } |
| })); |
| /* The @Mock and @Captor fields below are populated by MockitoAnnotations.initMocks(this) in setup(). */ |
| @Mock |
| private LoadBalancer origLb; |
| private LoadBalancer hcLb; |
| @Captor |
| ArgumentCaptor<CreateSubchannelArgs> createArgsCaptor; |
| @Mock |
| private BackoffPolicy.Provider backoffPolicyProvider; |
| @Mock |
| private BackoffPolicy backoffPolicy1; |
| @Mock |
| private BackoffPolicy backoffPolicy2; |
| |
| private HealthCheckingLoadBalancerFactory hcLbFactory; |
| private LoadBalancer hcLbEventDelivery; /* proxy built in setup() that calls hcLb from inside syncContext */ |
| |
| /** |
| * Starts one in-process health server and one channel per Subchannel index, stubs the backoff |
| * policies, creates the health-checking LoadBalancer under test, and builds a proxy that runs |
| * every call into it inside {@code syncContext}. |
| */ |
| @Before |
| @SuppressWarnings("unchecked") |
| public void setup() throws Exception { |
| MockitoAnnotations.initMocks(this); |
| |
| for (int idx = 0; idx < NUM_SUBCHANNELS; idx++) { |
| // Server and channel share the same in-process name, so channel idx talks to server idx. |
| String inProcessName = "health-check-test-" + idx; |
| healthImpls[idx] = new HealthImpl(); |
| servers[idx] = |
| InProcessServerBuilder.forName(inProcessName) |
| .addService(healthImpls[idx]).directExecutor().build().start(); |
| channels[idx] = InProcessChannelBuilder.forName(inProcessName).directExecutor().build(); |
| |
| eags[idx] = new EquivalentAddressGroup(new FakeSocketAddress("address-" + idx)); |
| eagLists[idx] = Arrays.asList(eags[idx]); |
| mockStateListeners[idx] = mock(SubchannelStateListener.class); |
| } |
| resolvedAddressList = Arrays.asList(eags); |
| |
| // The provider hands out backoffPolicy1 on its first call and backoffPolicy2 afterwards. |
| when(backoffPolicyProvider.get()).thenReturn(backoffPolicy1, backoffPolicy2); |
| when(backoffPolicy1.nextBackoffNanos()).thenReturn(11L, 21L, 31L); |
| when(backoffPolicy2.nextBackoffNanos()).thenReturn(12L, 22L, 32L); |
| |
| hcLbFactory = |
| new HealthCheckingLoadBalancerFactory( |
| origLbFactory, backoffPolicyProvider, clock.getStopwatchSupplier()); |
| hcLb = hcLbFactory.newLoadBalancer(origHelper); |
| // Every call into hcLb is routed through the syncContext by this proxy. |
| hcLbEventDelivery = new LoadBalancer() { |
| // Per LoadBalancer API, no callback may be delivered once shutdown() has been called. |
| boolean shutdownDelivered; |
| |
| @Override |
| public void handleResolvedAddresses(final ResolvedAddresses resolvedAddresses) { |
| syncContext.execute(new Runnable() { |
| @Override |
| public void run() { |
| if (shutdownDelivered) { |
| return; |
| } |
| hcLb.handleResolvedAddresses(resolvedAddresses); |
| } |
| }); |
| } |
| |
| @Override |
| public void handleNameResolutionError(Status error) { |
| throw new AssertionError("Not supposed to be called"); |
| } |
| |
| @Override |
| public void shutdown() { |
| syncContext.execute(new Runnable() { |
| @Override |
| public void run() { |
| if (shutdownDelivered) { |
| return; |
| } |
| shutdownDelivered = true; |
| hcLb.shutdown(); |
| } |
| }); |
| } |
| }; |
| // Creating hcLb must have created the wrapped LoadBalancer exactly once. |
| verify(origLbFactory).newLoadBalancer(any(Helper.class)); |
| } |
| |
| /** |
| * Checks that the test left no task pending on the fake clock, then force-closes the in-process |
| * servers and channels created by setup(). The cleanup sits in a {@code finally} block so that a |
| * failed pending-task check can no longer skip it. |
| * |
| * @throws Exception if waiting for a server or channel to terminate is interrupted |
| */ |
| @After |
| public void teardown() throws Exception { |
| try { |
| // All scheduled tasks have been accounted for |
| assertThat(clock.getPendingTasks()).isEmpty(); |
| } finally { |
| // Health-check streams are usually not closed in the tests because onSubchannelState() is |
| // faked. Force closing for clean up. setup() reuses the same in-process names for every |
| // test, so a server leaked here would presumably break the next test's setup(). |
| for (Server server : servers) { |
| // Entries stay null if setup() failed before creating them. |
| if (server == null) { |
| continue; |
| } |
| server.shutdownNow(); |
| assertThat(server.awaitTermination(1, TimeUnit.SECONDS)).isTrue(); |
| } |
| for (ManagedChannel channel : channels) { |
| if (channel == null) { |
| continue; |
| } |
| channel.shutdownNow(); |
| assertThat(channel.awaitTermination(1, TimeUnit.SECONDS)).isTrue(); |
| } |
| } |
| for (HealthImpl impl : healthImpls) { |
| if (impl == null) { |
| continue; |
| } |
| assertThat(impl.checkCalled).isFalse(); |
| } |
| } |
| |
| /** |
| * Walks through the normal life cycle with health checking configured for "FooService": |
| * Subchannel creation, a health-check RPC started once the Subchannel is READY, serving statuses |
| * translated into connectivity states, and RPC cancellation when the Subchannel is shut down. |
| */ |
| @Test |
| public void typicalWorkflow() { |
| Attributes healthCheckAttrs = attrsWithHealthCheckService("FooService"); |
| ResolvedAddresses result = |
| ResolvedAddresses.newBuilder() |
| .setAddresses(resolvedAddressList) |
| .setAttributes(healthCheckAttrs) |
| .build(); |
| hcLbEventDelivery.handleResolvedAddresses(result); |
| |
| verify(origLb).handleResolvedAddresses(result); |
| // These two getters may be called any number of times, including never. |
| verify(origHelper, atLeast(0)).getSynchronizationContext(); |
| verify(origHelper, atLeast(0)).getScheduledExecutorService(); |
| verifyNoMoreInteractions(origHelper); |
| verifyNoMoreInteractions(origLb); |
| |
| ConnectivityStateInfo connecting = ConnectivityStateInfo.forNonError(CONNECTING); |
| ConnectivityStateInfo idle = ConnectivityStateInfo.forNonError(IDLE); |
| ConnectivityStateInfo ready = ConnectivityStateInfo.forNonError(READY); |
| ConnectivityStateInfo unavailable = |
| ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE); |
| Subchannel[] hcSubchannels = new Subchannel[NUM_SUBCHANNELS]; |
| |
| // Simulate that the original LB creates Subchannels |
| for (int idx = 0; idx < NUM_SUBCHANNELS; idx++) { |
| // Attributes chosen by origLb must arrive at the underlying helper |
| String attrValue = "eag attr " + idx; |
| hcSubchannels[idx] = |
| createSubchannel( |
| idx, Attributes.newBuilder().set(SUBCHANNEL_ATTR_KEY, attrValue).build()); |
| assertThat(unwrap(hcSubchannels[idx])).isSameInstanceAs(subchannels[idx]); |
| verify(origHelper, times(idx + 1)).createSubchannel(createArgsCaptor.capture()); |
| CreateSubchannelArgs capturedArgs = createArgsCaptor.getValue(); |
| assertThat(capturedArgs.getAddresses()).isEqualTo(eagLists[idx]); |
| assertThat(capturedArgs.getAttributes().get(SUBCHANNEL_ATTR_KEY)).isEqualTo(attrValue); |
| } |
| |
| List<ServingStatus> responses = |
| Arrays.asList( |
| ServingStatus.UNKNOWN, ServingStatus.NOT_SERVING, ServingStatus.SERVICE_UNKNOWN, |
| ServingStatus.SERVING, ServingStatus.NOT_SERVING, ServingStatus.SERVING); |
| for (int idx = NUM_SUBCHANNELS - 1; idx >= 0; idx--) { |
| FakeSubchannel fakeSubchannel = subchannels[idx]; |
| HealthImpl healthService = healthImpls[idx]; |
| SubchannelStateListener listener = mockStateListeners[idx]; |
| InOrder ordered = inOrder(listener); |
| |
| // Until the underlying Subchannel is READY, its states pass through and no health check |
| // is started. |
| deliverSubchannelState(idx, connecting); |
| deliverSubchannelState(idx, unavailable); |
| deliverSubchannelState(idx, idle); |
| |
| ordered.verify(listener).onSubchannelState(eq(connecting)); |
| ordered.verify(listener).onSubchannelState(eq(unavailable)); |
| ordered.verify(listener).onSubchannelState(eq(idle)); |
| verifyNoMoreInteractions(listener); |
| |
| assertThat(fakeSubchannel.logs).isEmpty(); |
| assertThat(healthService.calls).isEmpty(); |
| deliverSubchannelState(idx, ready); |
| assertThat(healthService.calls).hasSize(1); |
| ServerSideCall healthCall = healthService.calls.peek(); |
| assertThat(healthCall.request).isEqualTo(makeRequest("FooService")); |
| |
| // While the health check is starting, origLb sees the Subchannel as CONNECTING. |
| ordered.verify(listener).onSubchannelState(eq(connecting)); |
| verifyNoMoreInteractions(listener); |
| |
| assertThat(fakeSubchannel.logs).containsExactly( |
| "INFO: CONNECTING: Starting health-check for \"FooService\""); |
| fakeSubchannel.logs.clear(); |
| |
| // Simulate a series of responses. |
| for (ServingStatus servingStatus : responses) { |
| healthCall.responseObserver.onNext(makeResponse(servingStatus)); |
| // Only SERVING maps to READY; every other status maps to TRANSIENT_FAILURE |
| if (servingStatus != ServingStatus.SERVING) { |
| ordered.verify(listener).onSubchannelState( |
| unavailableStateWithMsg( |
| "Health-check service responded " + servingStatus + " for 'FooService'")); |
| assertThat(fakeSubchannel.logs).containsExactly( |
| "INFO: TRANSIENT_FAILURE: health-check responded " + servingStatus); |
| } else { |
| ordered.verify(listener).onSubchannelState(eq(ready)); |
| assertThat(fakeSubchannel.logs).containsExactly( |
| "INFO: READY: health-check responded SERVING"); |
| } |
| fakeSubchannel.logs.clear(); |
| verifyNoMoreInteractions(listener); |
| } |
| } |
| |
| // origLb shuts down Subchannels |
| for (int idx = 0; idx < NUM_SUBCHANNELS; idx++) { |
| FakeSubchannel fakeSubchannel = subchannels[idx]; |
| SubchannelStateListener listener = mockStateListeners[idx]; |
| ServerSideCall healthCall = healthImpls[idx].calls.peek(); |
| |
| assertThat(healthCall.cancelled).isFalse(); |
| verifyNoMoreInteractions(listener); |
| assertThat(fakeSubchannel.isShutdown).isFalse(); |
| |
| final Subchannel toShutDown = hcSubchannels[idx]; |
| // shutdown() moves the Subchannel to SHUTDOWN, which in turn cancels the health check RPC |
| syncContext.execute(new Runnable() { |
| @Override |
| public void run() { |
| toShutDown.shutdown(); |
| } |
| }); |
| assertThat(fakeSubchannel.isShutdown).isTrue(); |
| assertThat(healthCall.cancelled).isTrue(); |
| verify(listener).onSubchannelState(eq(ConnectivityStateInfo.forNonError(SHUTDOWN))); |
| assertThat(fakeSubchannel.logs).isEmpty(); |
| } |
| |
| // Exactly one health-check RPC was made per server over the whole test. |
| for (HealthImpl healthService : healthImpls) { |
| assertThat(healthService.calls).hasSize(1); |
| } |
| |
| verifyNoInteractions(backoffPolicyProvider); |
| } |
| |
| @Test |
| public void healthCheckDisabledWhenServiceNotImplemented() { |
| Attributes resolutionAttrs = attrsWithHealthCheckService("BarService"); |
| ResolvedAddresses result = ResolvedAddresses.newBuilder() |
| .setAddresses(resolvedAddressList) |
| .setAttributes(resolutionAttrs) |
| .build(); |
| hcLbEventDelivery.handleResolvedAddresses(result); |
| |
| verify(origLb).handleResolvedAddresses(result); |
| verifyNoMoreInteractions(origLb); |
| |
| // We create 2 Subchannels. One of them connects to a server that doesn't implement health check |
| for (int i = 0; i < 2; i++) { |
| createSubchannel(i, Attributes.EMPTY); |
| } |
| |
| InOrder inOrder = inOrder(mockStateListeners[0], mockStateListeners[1]); |
| |
| for (int i = 0; i < 2; i++) { |
| deliverSubchannelState(i, ConnectivityStateInfo.forNonError(READY)); |
| assertThat(healthImpls[i].calls).hasSize(1); |
| inOrder.verify(mockStateListeners[i]).onSubchannelState( |
| eq(ConnectivityStateInfo.forNonError(CONNECTING))); |
| } |
| |
| ServerSideCall serverCall0 = healthImpls[0].calls.poll(); |
| ServerSideCall serverCall1 = healthImpls[1].calls.poll(); |
| |
| subchannels[0].logs.clear(); |
| // subchannels[0] gets UNIMPLEMENTED for health checking, which will disable health |
| // checking and it'll use the original state, which is currently READY. |
| // In reality UNIMPLEMENTED is generated by GRPC server library, but the client can't tell |
| // whether it's the server library or the service implementation that returned this status. |
| serverCall0.responseObserver.onError(Status.UNIMPLEMENTED.asException()); |
| inOrder.verify(mockStateListeners[0]).onSubchannelState( |
| eq(ConnectivityStateInfo.forNonError(READY))); |
| assertThat(subchannels[0].logs).containsExactly( |
| "ERROR: Health-check disabled: " + Status.UNIMPLEMENTED, |
| "INFO: READY (no health-check)").inOrder(); |
| |
| // subchannels[1] has normal health checking |
| serverCall1.responseObserver.onNext(makeResponse(ServingStatus.NOT_SERVING)); |
| inOrder.verify(mockStateListeners[1]).onSubchannelState( |
| unavailableStateWithMsg("Health-check service responded NOT_SERVING for 'BarService'")); |
| |
| // Without health checking, states from underlying Subchannel are delivered directly to the mock |
| // listeners. |
| deliverSubchannelState(0, ConnectivityStateInfo.forNonError(IDLE)); |
| inOrder.verify(mockStateListeners[0]).onSubchannelState( |
| eq(ConnectivityStateInfo.forNonError(IDLE))); |
| |
| // Re-connecting on a Subchannel will reset the "disabled" flag. |
| assertThat(healthImpls[0].calls).hasSize(0); |
| deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY)); |
| assertThat(healthImpls[0].calls).hasSize(1); |
| serverCall0 = healthImpls[0].calls.poll(); |
| inOrder.verify(mockStateListeners[0]).onSubchannelState( |
| eq(ConnectivityStateInfo.forNonError(CONNECTING))); |
| |
| // Health check now works as normal |
| serverCall0.responseObserver.onNext(makeResponse(ServingStatus.SERVICE_UNKNOWN)); |
| inOrder.verify(mockStateListeners[0]).onSubchannelState( |
| unavailableStateWithMsg("Health-check service responded SERVICE_UNKNOWN for 'BarService'")); |
| |
| verifyNoMoreInteractions(origLb, mockStateListeners[0], mockStateListeners[1]); |
| verifyNoInteractions(backoffPolicyProvider); |
| } |
| |
| @Test |
| public void backoffRetriesWhenServerErroneouslyClosesRpcBeforeAnyResponse() { |
| Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService"); |
| ResolvedAddresses result = ResolvedAddresses.newBuilder() |
| .setAddresses(resolvedAddressList) |
| .setAttributes(resolutionAttrs) |
| .build(); |
| hcLbEventDelivery.handleResolvedAddresses(result); |
| |
| verify(origLb).handleResolvedAddresses(result); |
| verifyNoMoreInteractions(origLb); |
| |
| FakeSubchannel subchannel = unwrap(createSubchannel(0, Attributes.EMPTY)); |
| assertThat(subchannel).isSameInstanceAs(subchannels[0]); |
| SubchannelStateListener mockListener = mockStateListeners[0]; |
| InOrder inOrder = inOrder(mockListener, backoffPolicyProvider, backoffPolicy1, backoffPolicy2); |
| |
| deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY)); |
| inOrder.verify(mockListener).onSubchannelState( |
| eq(ConnectivityStateInfo.forNonError(CONNECTING))); |
| HealthImpl healthImpl = healthImpls[0]; |
| assertThat(healthImpl.calls).hasSize(1); |
| assertThat(clock.getPendingTasks()).isEmpty(); |
| |
| subchannel.logs.clear(); |
| // Server closes the health checking RPC without any response |
| healthImpl.calls.poll().responseObserver.onCompleted(); |
| |
| // which results in TRANSIENT_FAILURE |
| inOrder.verify(mockListener).onSubchannelState( |
| unavailableStateWithMsg( |
| "Health-check stream unexpectedly closed with " + Status.OK + " for 'TeeService'")); |
| assertThat(subchannel.logs).containsExactly( |
| "INFO: TRANSIENT_FAILURE: health-check stream closed with " + Status.OK, |
| "DEBUG: Will retry health-check after 11 ns").inOrder(); |
| |
| // Retry with backoff is scheduled |
| inOrder.verify(backoffPolicyProvider).get(); |
| inOrder.verify(backoffPolicy1).nextBackoffNanos(); |
| assertThat(clock.getPendingTasks()).hasSize(1); |
| |
| verifyRetryAfterNanos(inOrder, mockListener, healthImpl, 11); |
| assertThat(clock.getPendingTasks()).isEmpty(); |
| |
| subchannel.logs.clear(); |
| // Server closes the health checking RPC without any response |
| healthImpl.calls.poll().responseObserver.onError(Status.CANCELLED.asException()); |
| |
| // which also results in TRANSIENT_FAILURE, with a different description |
| inOrder.verify(mockListener).onSubchannelState( |
| unavailableStateWithMsg( |
| "Health-check stream unexpectedly closed with " |
| + Status.CANCELLED + " for 'TeeService'")); |
| assertThat(subchannel.logs).containsExactly( |
| "INFO: TRANSIENT_FAILURE: health-check stream closed with " + Status.CANCELLED, |
| "DEBUG: Will retry health-check after 21 ns").inOrder(); |
| |
| // Retry with backoff |
| inOrder.verify(backoffPolicy1).nextBackoffNanos(); |
| |
| verifyRetryAfterNanos(inOrder, mockListener, healthImpl, 21); |
| |
| // Server responds this time |
| healthImpl.calls.poll().responseObserver.onNext(makeResponse(ServingStatus.SERVING)); |
| |
| inOrder.verify(mockListener).onSubchannelState( |
| eq(ConnectivityStateInfo.forNonError(READY))); |
| |
| verifyNoMoreInteractions(origLb, mockListener, backoffPolicyProvider, backoffPolicy1); |
| } |
| |
| /** |
| * Once the server has sent a response, the next stream failure is retried immediately, and the |
| * following backed-off retry uses a new backoff policy obtained from the provider. |
| */ |
| @Test |
| public void serverRespondResetsBackoff() { |
| Attributes healthCheckAttrs = attrsWithHealthCheckService("TeeService"); |
| ResolvedAddresses result = |
| ResolvedAddresses.newBuilder() |
| .setAddresses(resolvedAddressList) |
| .setAttributes(healthCheckAttrs) |
| .build(); |
| hcLbEventDelivery.handleResolvedAddresses(result); |
| |
| verify(origLb).handleResolvedAddresses(result); |
| verifyNoMoreInteractions(origLb); |
| |
| SubchannelStateListener listener = mockStateListeners[0]; |
| Subchannel hcSubchannel = createSubchannel(0, Attributes.EMPTY); |
| assertThat(unwrap(hcSubchannel)).isSameInstanceAs(subchannels[0]); |
| InOrder ordered = inOrder(listener, backoffPolicyProvider, backoffPolicy1, backoffPolicy2); |
| ConnectivityStateInfo connecting = ConnectivityStateInfo.forNonError(CONNECTING); |
| ConnectivityStateInfo ready = ConnectivityStateInfo.forNonError(READY); |
| |
| deliverSubchannelState(0, ready); |
| ordered.verify(listener).onSubchannelState(eq(connecting)); |
| HealthImpl healthService = healthImpls[0]; |
| assertThat(healthService.calls).hasSize(1); |
| assertThat(clock.getPendingTasks()).isEmpty(); |
| |
| // The server fails the health checking RPC without sending a response |
| ServerSideCall firstCall = healthService.calls.poll(); |
| firstCall.responseObserver.onError(Status.CANCELLED.asException()); |
| |
| // which results in TRANSIENT_FAILURE |
| ordered.verify(listener).onSubchannelState( |
| unavailableStateWithMsg( |
| "Health-check stream unexpectedly closed with " |
| + Status.CANCELLED + " for 'TeeService'")); |
| |
| // A retry is scheduled using the first backoff value |
| ordered.verify(backoffPolicyProvider).get(); |
| ordered.verify(backoffPolicy1).nextBackoffNanos(); |
| assertThat(clock.getPendingTasks()).hasSize(1); |
| |
| verifyRetryAfterNanos(ordered, listener, healthService, 11); |
| assertThat(clock.getPendingTasks()).isEmpty(); |
| |
| // Server responds; the call is left in the queue for the next step |
| ServerSideCall retriedCall = healthService.calls.peek(); |
| retriedCall.responseObserver.onNext(makeResponse(ServingStatus.SERVING)); |
| ordered.verify(listener).onSubchannelState(eq(ready)); |
| |
| verifyNoMoreInteractions(listener); |
| |
| // then closes the stream |
| healthService.calls.poll().responseObserver.onError(Status.UNAVAILABLE.asException()); |
| ordered.verify(listener).onSubchannelState( |
| unavailableStateWithMsg( |
| "Health-check stream unexpectedly closed with " |
| + Status.UNAVAILABLE + " for 'TeeService'")); |
| |
| // Because the server has responded, this first retry happens without backoff. |
| // The backoff policy has been reset though: the next backed-off retry will use a new |
| // policy. |
| ordered.verify(listener).onSubchannelState(eq(connecting)); |
| assertThat(healthService.calls).hasSize(1); |
| assertThat(clock.getPendingTasks()).isEmpty(); |
| ordered.verifyNoMoreInteractions(); |
| |
| // then closes the stream for this retry |
| healthService.calls.poll().responseObserver.onError(Status.UNAVAILABLE.asException()); |
| ordered.verify(listener).onSubchannelState( |
| unavailableStateWithMsg( |
| "Health-check stream unexpectedly closed with " |
| + Status.UNAVAILABLE + " for 'TeeService'")); |
| |
| // New backoff policy is used |
| ordered.verify(backoffPolicyProvider).get(); |
| // Retry with a new backoff policy |
| ordered.verify(backoffPolicy2).nextBackoffNanos(); |
| |
| verifyRetryAfterNanos(ordered, listener, healthService, 12); |
| } |
| |
| /** |
| * Asserts that no health-check RPC is started until exactly {@code nanos} nanoseconds have |
| * passed on the fake clock, and that at that point the listener is told CONNECTING and one new |
| * RPC reaches {@code impl}. |
| * |
| * @param inOrder the ordered verifier the calling test is using |
| * @param listener the mock state listener of the Subchannel under test |
| * @param impl the health service whose incoming calls are inspected |
| * @param nanos the expected retry delay in nanoseconds |
| */ |
| private void verifyRetryAfterNanos( |
| InOrder inOrder, SubchannelStateListener listener, HealthImpl impl, |
| long nanos) { |
| ConnectivityStateInfo connecting = ConnectivityStateInfo.forNonError(CONNECTING); |
| long justBeforeRetry = nanos - 1; |
| |
| assertThat(impl.calls).isEmpty(); |
| // One nanosecond short of the delay: nothing may have happened yet. |
| clock.forwardNanos(justBeforeRetry); |
| assertThat(impl.calls).isEmpty(); |
| inOrder.verifyNoMoreInteractions(); |
| verifyNoMoreInteractions(listener); |
| |
| // The final nanosecond triggers the retry. |
| clock.forwardNanos(1); |
| inOrder.verify(listener).onSubchannelState(eq(connecting)); |
| assertThat(impl.calls).hasSize(1); |
| } |
| |
| /** |
| * Health checking is off while the resolution attributes carry no health-check config, and it |
| * starts on the existing READY Subchannel as soon as a later update enables it. |
| */ |
| @Test |
| public void serviceConfigHasNoHealthCheckingInitiallyButDoesLater() { |
| ConnectivityStateInfo ready = ConnectivityStateInfo.forNonError(READY); |
| SubchannelStateListener listener0 = mockStateListeners[0]; |
| |
| // No service config, thus no health check. |
| ResolvedAddresses withoutHealthCheck = |
| ResolvedAddresses.newBuilder() |
| .setAddresses(resolvedAddressList) |
| .setAttributes(Attributes.EMPTY) |
| .build(); |
| hcLbEventDelivery.handleResolvedAddresses(withoutHealthCheck); |
| |
| verify(origLb).handleResolvedAddresses(withoutHealthCheck); |
| verifyNoMoreInteractions(origLb); |
| |
| // First, create Subchannel 0 |
| createSubchannel(0, Attributes.EMPTY); |
| |
| // No health check activity. The state of the underlying Subchannel is propagated as is |
| deliverSubchannelState(0, ready); |
| assertThat(healthImpls[0].calls).isEmpty(); |
| verify(listener0).onSubchannelState(eq(ready)); |
| |
| verifyNoMoreInteractions(listener0); |
| |
| // Service config enables health check |
| Attributes healthCheckAttrs = attrsWithHealthCheckService("FooService"); |
| ResolvedAddresses withHealthCheck = |
| ResolvedAddresses.newBuilder() |
| .setAddresses(resolvedAddressList) |
| .setAttributes(healthCheckAttrs) |
| .build(); |
| hcLbEventDelivery.handleResolvedAddresses(withHealthCheck); |
| verify(origLb).handleResolvedAddresses(withHealthCheck); |
| |
| // Health check started on existing Subchannel |
| assertThat(healthImpls[0].calls).hasSize(1); |
| |
| // State stays in READY, instead of switching to CONNECTING. |
| verifyNoMoreInteractions(listener0); |
| |
| // Start Subchannel 1, which will have health check |
| createSubchannel(1, Attributes.EMPTY); |
| assertThat(healthImpls[1].calls).isEmpty(); |
| deliverSubchannelState(1, ready); |
| assertThat(healthImpls[1].calls).hasSize(1); |
| } |
| |
| @Test |
| public void serviceConfigDisablesHealthCheckWhenRpcActive() { |
| Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService"); |
| ResolvedAddresses result1 = ResolvedAddresses.newBuilder() |
| .setAddresses(resolvedAddressList) |
| .setAttributes(resolutionAttrs) |
| .build(); |
| hcLbEventDelivery.handleResolvedAddresses(result1); |
| |
| verify(origLb).handleResolvedAddresses(result1); |
| verifyNoMoreInteractions(origLb); |
| |
| Subchannel subchannel = createSubchannel(0, Attributes.EMPTY); |
| assertThat(unwrap(subchannel)).isSameInstanceAs(subchannels[0]); |
| InOrder inOrder = inOrder(origLb, mockStateListeners[0]); |
| |
| deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY)); |
| inOrder.verify(mockStateListeners[0]).onSubchannelState( |
| eq(ConnectivityStateInfo.forNonError(CONNECTING))); |
| inOrder.verifyNoMoreInteractions(); |
| HealthImpl healthImpl = healthImpls[0]; |
| assertThat(healthImpl.calls).hasSize(1); |
| ServerSideCall serverCall = healthImpl.calls.poll(); |
| assertThat(serverCall.cancelled).isFalse(); |
| |
| // NameResolver gives an update without service config, thus health check will be disabled |
| ResolvedAddresses result2 = ResolvedAddresses.newBuilder() |
| .setAddresses(resolvedAddressList) |
| .setAttributes(Attributes.EMPTY) |
| .build(); |
| hcLbEventDelivery.handleResolvedAddresses(result2); |
| |
| // Health check RPC cancelled. |
| assertThat(serverCall.cancelled).isTrue(); |
| // Subchannel uses original state |
| inOrder.verify(mockStateListeners[0]).onSubchannelState( |
| eq(ConnectivityStateInfo.forNonError(READY))); |
| |
| inOrder.verify(origLb).handleResolvedAddresses(result2); |
| |
| verifyNoMoreInteractions(origLb, mockStateListeners[0]); |
| assertThat(healthImpl.calls).isEmpty(); |
| } |
| |
| /** |
| * A resolution update without health-check config, arriving while a retry timer is pending, |
| * cancels the timer and makes the Subchannel report its original READY state. |
| */ |
| @Test |
| public void serviceConfigDisablesHealthCheckWhenRetryPending() { |
| Attributes healthCheckAttrs = attrsWithHealthCheckService("TeeService"); |
| ResolvedAddresses result = |
| ResolvedAddresses.newBuilder() |
| .setAddresses(resolvedAddressList) |
| .setAttributes(healthCheckAttrs) |
| .build(); |
| hcLbEventDelivery.handleResolvedAddresses(result); |
| |
| verify(origLb).handleResolvedAddresses(result); |
| verifyNoMoreInteractions(origLb); |
| |
| Subchannel hcSubchannel = createSubchannel(0, Attributes.EMPTY); |
| assertThat(unwrap(hcSubchannel)).isSameInstanceAs(subchannels[0]); |
| SubchannelStateListener listener = mockStateListeners[0]; |
| InOrder ordered = inOrder(origLb, listener); |
| ConnectivityStateInfo ready = ConnectivityStateInfo.forNonError(READY); |
| |
| deliverSubchannelState(0, ready); |
| ordered.verify(listener).onSubchannelState( |
| eq(ConnectivityStateInfo.forNonError(CONNECTING))); |
| ordered.verifyNoMoreInteractions(); |
| HealthImpl healthService = healthImpls[0]; |
| assertThat(healthService.calls).hasSize(1); |
| |
| // The server closes the stream without responding, which puts the client in retry backoff |
| assertThat(clock.getPendingTasks()).isEmpty(); |
| ServerSideCall closedCall = healthService.calls.poll(); |
| closedCall.responseObserver.onCompleted(); |
| assertThat(clock.getPendingTasks()).hasSize(1); |
| ordered.verify(listener).onSubchannelState( |
| unavailableStateWithMsg( |
| "Health-check stream unexpectedly closed with " + Status.OK + " for 'TeeService'")); |
| |
| // NameResolver gives an update without service config, thus health check will be disabled |
| ResolvedAddresses result2 = |
| ResolvedAddresses.newBuilder() |
| .setAddresses(resolvedAddressList) |
| .setAttributes(Attributes.EMPTY) |
| .build(); |
| hcLbEventDelivery.handleResolvedAddresses(result2); |
| |
| // Retry timer is cancelled |
| assertThat(clock.getPendingTasks()).isEmpty(); |
| |
| // No retry was attempted |
| assertThat(healthService.calls).isEmpty(); |
| |
| // The Subchannel falls back to its original state |
| ordered.verify(listener).onSubchannelState(eq(ready)); |
| |
| ordered.verify(origLb).handleResolvedAddresses(result2); |
| |
| verifyNoMoreInteractions(origLb, listener); |
| } |
| |
| /** |
| * A resolution update without health-check config, arriving before the Subchannel has ever been |
| * READY, means a later READY state is passed through without starting any health-check RPC. |
| */ |
| @Test |
| public void serviceConfigDisablesHealthCheckWhenRpcInactive() { |
| Attributes healthCheckAttrs = attrsWithHealthCheckService("TeeService"); |
| ResolvedAddresses withHealthCheck = |
| ResolvedAddresses.newBuilder() |
| .setAddresses(resolvedAddressList) |
| .setAttributes(healthCheckAttrs) |
| .build(); |
| hcLbEventDelivery.handleResolvedAddresses(withHealthCheck); |
| |
| verify(origLb).handleResolvedAddresses(withHealthCheck); |
| verifyNoMoreInteractions(origLb); |
| |
| Subchannel hcSubchannel = createSubchannel(0, Attributes.EMPTY); |
| assertThat(unwrap(hcSubchannel)).isSameInstanceAs(subchannels[0]); |
| SubchannelStateListener listener = mockStateListeners[0]; |
| InOrder ordered = inOrder(origLb, listener); |
| |
| // Underlying subchannel is not READY initially |
| Status connectionRefused = Status.UNAVAILABLE.withDescription("connection refused"); |
| ConnectivityStateInfo underlyingErrorState = |
| ConnectivityStateInfo.forTransientFailure(connectionRefused); |
| deliverSubchannelState(0, underlyingErrorState); |
| // The very same state object must be handed to the listener |
| ordered.verify(listener).onSubchannelState(same(underlyingErrorState)); |
| ordered.verifyNoMoreInteractions(); |
| |
| // NameResolver gives an update without service config, thus health check will be disabled |
| ResolvedAddresses withoutHealthCheck = |
| ResolvedAddresses.newBuilder() |
| .setAddresses(resolvedAddressList) |
| .setAttributes(Attributes.EMPTY) |
| .build(); |
| hcLbEventDelivery.handleResolvedAddresses(withoutHealthCheck); |
| |
| ordered.verify(origLb).handleResolvedAddresses(withoutHealthCheck); |
| |
| // Underlying subchannel is now ready |
| ConnectivityStateInfo ready = ConnectivityStateInfo.forNonError(READY); |
| deliverSubchannelState(0, ready); |
| |
| // Since health check is disabled, READY state is propagated directly. |
| ordered.verify(listener).onSubchannelState(eq(ready)); |
| |
| // and there is no health check activity. |
| assertThat(healthImpls[0].calls).isEmpty(); |
| |
| verifyNoMoreInteractions(origLb, listener); |
| } |
| |
| /** |
| * The health-checked service name changes while a health-check RPC is active: the running RPC |
| * is cancelled and a new one for the new name is started without any extra state updates. |
| */ |
| @Test |
| public void serviceConfigChangesServiceNameWhenRpcActive() { |
| ResolvedAddresses teeResult = ResolvedAddresses.newBuilder() |
| .setAddresses(resolvedAddressList) |
| .setAttributes(attrsWithHealthCheckService("TeeService")) |
| .build(); |
| hcLbEventDelivery.handleResolvedAddresses(teeResult); |
| |
| verify(origLb).handleResolvedAddresses(teeResult); |
| verifyNoMoreInteractions(origLb); |
| |
| Subchannel wrapped = createSubchannel(0, Attributes.EMPTY); |
| SubchannelStateListener listener = mockStateListeners[0]; |
| assertThat(unwrap(wrapped)).isSameInstanceAs(subchannels[0]); |
| InOrder ordered = inOrder(origLb, listener); |
| |
| // A READY transport is reported as CONNECTING until the health check answers. |
| ConnectivityStateInfo ready = ConnectivityStateInfo.forNonError(READY); |
| deliverSubchannelState(0, ready); |
| ordered.verify(listener).onSubchannelState( |
| eq(ConnectivityStateInfo.forNonError(CONNECTING))); |
| |
| HealthImpl health = healthImpls[0]; |
| assertThat(health.calls).hasSize(1); |
| ServerSideCall teeCall = health.calls.poll(); |
| assertThat(teeCall.cancelled).isFalse(); |
| assertThat(teeCall.request).isEqualTo(makeRequest("TeeService")); |
| |
| // The server reports SERVING, so the listener now sees READY. |
| teeCall.responseObserver.onNext(makeResponse(ServingStatus.SERVING)); |
| ordered.verify(listener).onSubchannelState(eq(ready)); |
| |
| // Re-delivering the same service name only forwards the resolution to origLb. |
| hcLbEventDelivery.handleResolvedAddresses(teeResult); |
| ordered.verify(origLb).handleResolvedAddresses(teeResult); |
| verifyNoMoreInteractions(origLb, listener); |
| |
| // Now the service config names a different service. |
| ResolvedAddresses fooResult = ResolvedAddresses.newBuilder() |
| .setAddresses(resolvedAddressList) |
| .setAttributes(attrsWithHealthCheckService("FooService")) |
| .build(); |
| hcLbEventDelivery.handleResolvedAddresses(fooResult); |
| ordered.verify(origLb).handleResolvedAddresses(fooResult); |
| |
| // The RPC for the old name has been cancelled. |
| assertThat(teeCall.cancelled).isTrue(); |
| |
| // A replacement RPC was started right away, carrying the new service name. |
| assertThat(health.calls).hasSize(1); |
| ServerSideCall fooCall = health.calls.poll(); |
| assertThat(fooCall.request).isEqualTo(makeRequest("FooService")); |
| |
| // The reported state stays READY; no CONNECTING update is emitted. |
| verifyNoMoreInteractions(origLb, listener); |
| } |
| |
| /** |
| * The health-checked service name changes while a retry is scheduled: the retry timer is |
| * cancelled, CONNECTING is reported and a new RPC for the new name is started immediately. |
| */ |
| @Test |
| public void serviceConfigChangesServiceNameWhenRetryPending() { |
| ResolvedAddresses teeResult = ResolvedAddresses.newBuilder() |
| .setAddresses(resolvedAddressList) |
| .setAttributes(attrsWithHealthCheckService("TeeService")) |
| .build(); |
| hcLbEventDelivery.handleResolvedAddresses(teeResult); |
| |
| verify(origLb).handleResolvedAddresses(teeResult); |
| verifyNoMoreInteractions(origLb); |
| |
| Subchannel wrapped = createSubchannel(0, Attributes.EMPTY); |
| SubchannelStateListener listener = mockStateListeners[0]; |
| assertThat(unwrap(wrapped)).isSameInstanceAs(subchannels[0]); |
| InOrder ordered = inOrder(origLb, listener); |
| |
| deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY)); |
| ordered.verify(listener).onSubchannelState( |
| eq(ConnectivityStateInfo.forNonError(CONNECTING))); |
| |
| HealthImpl health = healthImpls[0]; |
| assertThat(health.calls).hasSize(1); |
| ServerSideCall teeCall = health.calls.poll(); |
| assertThat(teeCall.cancelled).isFalse(); |
| assertThat(teeCall.request).isEqualTo(makeRequest("TeeService")); |
| |
| // The server ends the stream without a response, which schedules a retry. |
| String streamClosedMsg = |
| "Health-check stream unexpectedly closed with " + Status.OK + " for 'TeeService'"; |
| assertThat(clock.getPendingTasks()).isEmpty(); |
| teeCall.responseObserver.onCompleted(); |
| assertThat(clock.getPendingTasks()).hasSize(1); |
| assertThat(health.calls).isEmpty(); |
| ordered.verify(listener).onSubchannelState(unavailableStateWithMsg(streamClosedMsg)); |
| |
| // Re-delivering the same service name only forwards the resolution to origLb; |
| // the retry stays scheduled and no RPC is started. |
| hcLbEventDelivery.handleResolvedAddresses(teeResult); |
| ordered.verify(origLb).handleResolvedAddresses(teeResult); |
| verifyNoMoreInteractions(origLb, listener); |
| assertThat(clock.getPendingTasks()).hasSize(1); |
| assertThat(health.calls).isEmpty(); |
| |
| // Now the service config names a different service. |
| ResolvedAddresses fooResult = ResolvedAddresses.newBuilder() |
| .setAddresses(resolvedAddressList) |
| .setAttributes(attrsWithHealthCheckService("FooService")) |
| .build(); |
| hcLbEventDelivery.handleResolvedAddresses(fooResult); |
| |
| // The listener is told CONNECTING before origLb receives the resolution. |
| ordered.verify(listener).onSubchannelState( |
| eq(ConnectivityStateInfo.forNonError(CONNECTING))); |
| |
| ordered.verify(origLb).handleResolvedAddresses(fooResult); |
| |
| // The pending retry timer is gone. |
| assertThat(clock.getPendingTasks()).isEmpty(); |
| |
| // A new RPC was started right away, carrying the new service name. |
| assertThat(health.calls).hasSize(1); |
| ServerSideCall fooCall = health.calls.poll(); |
| assertThat(fooCall.request).isEqualTo(makeRequest("FooService")); |
| |
| verifyNoMoreInteractions(origLb, listener); |
| } |
| |
| @Test |
| public void serviceConfigChangesServiceNameWhenRpcInactive() { |
| Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService"); |
| ResolvedAddresses result1 = ResolvedAddresses.newBuilder() |
| .setAddresses(resolvedAddressList) |
| .setAttributes(resolutionAttrs) |
| .build(); |
| hcLbEventDelivery.handleResolvedAddresses(result1); |
| |
| verify(origLb).handleResolvedAddresses(result1); |
| verifyNoMoreInteractions(origLb); |
| |
| Subchannel subchannel = createSubchannel(0, Attributes.EMPTY); |
| SubchannelStateListener mockListener = mockStateListeners[0]; |
| assertThat(unwrap(subchannel)).isSameInstanceAs(subchannels[0]); |
| InOrder inOrder = inOrder(origLb, mockListener); |
| HealthImpl healthImpl = healthImpls[0]; |
| |
| // Underlying subchannel is not READY initially |
| ConnectivityStateInfo underlyingErrorState = |
| ConnectivityStateInfo.forTransientFailure( |
| Status.UNAVAILABLE.withDescription("connection refused")); |
| deliverSubchannelState(0, underlyingErrorState); |
| inOrder.verify(mockListener).onSubchannelState(same(underlyingErrorState)); |
| inOrder.verifyNoMoreInteractions(); |
| |
| // Service config returns with the same health check name. |
| hcLbEventDelivery.handleResolvedAddresses(result1); |
| // It's delivered to origLb, but nothing else happens |
| inOrder.verify(origLb).handleResolvedAddresses(result1); |
| assertThat(healthImpl.calls).isEmpty(); |
| verifyNoMoreInteractions(origLb); |
| |
| // Service config returns a different health check name. |
| resolutionAttrs = attrsWithHealthCheckService("FooService"); |
| ResolvedAddresses result2 = ResolvedAddresses.newBuilder() |
| .setAddresses(resolvedAddressList) |
| .setAttributes(resolutionAttrs) |
| .build(); |
| hcLbEventDelivery.handleResolvedAddresses(result2); |
| |
| inOrder.verify(origLb).handleResolvedAddresses(result2); |
| |
| // Underlying subchannel is now ready |
| deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY)); |
| |
| // Concluded CONNECTING state |
| inOrder.verify(mockListener).onSubchannelState( |
| eq(ConnectivityStateInfo.forNonError(CONNECTING))); |
| |
| // Health check RPC is started |
| assertThat(healthImpl.calls).hasSize(1); |
| // with the new service name |
| assertThat(healthImpl.calls.poll().request).isEqualTo(makeRequest("FooService")); |
| |
| verifyNoMoreInteractions(origLb, mockListener); |
| } |
| |
| @Test |
| public void getHealthCheckedServiceName_nullHealthCheckConfig() { |
| assertThat(ServiceConfigUtil.getHealthCheckedServiceName(null)).isNull(); |
| } |
| |
| /** A health-check config without a "serviceName" entry yields a null service name. */ |
| @Test |
| public void getHealthCheckedServiceName_missingServiceName() { |
| HashMap<String, Object> emptyConfig = new HashMap<>(); |
| String serviceName = ServiceConfigUtil.getHealthCheckedServiceName(emptyConfig); |
| assertThat(serviceName).isNull(); |
| } |
| |
| /** The "serviceName" entry of the health-check config is returned as the service name. */ |
| @Test |
| public void getHealthCheckedServiceName_healthCheckConfigHasServiceName() { |
| HashMap<String, Object> configWithName = new HashMap<>(); |
| configWithName.put("serviceName", "FooService"); |
| String serviceName = ServiceConfigUtil.getHealthCheckedServiceName(configWithName); |
| assertThat(serviceName).isEqualTo("FooService"); |
| } |
| |
| /** |
| * Shutting down the balancer (whose delegate shuts down every subchannel) cancels all active |
| * health-check RPCs, reports SHUTDOWN to each listener and leaves nothing scheduled. |
| */ |
| @Test |
| public void balancerShutdown() { |
| ResolvedAddresses resolved = ResolvedAddresses.newBuilder() |
| .setAddresses(resolvedAddressList) |
| .setAttributes(attrsWithHealthCheckService("TeeService")) |
| .build(); |
| hcLbEventDelivery.handleResolvedAddresses(resolved); |
| |
| verify(origLb).handleResolvedAddresses(resolved); |
| verifyNoMoreInteractions(origLb); |
| |
| // Must be final: it is captured by the anonymous Answer below. |
| final Subchannel[] wrapped = new Subchannel[NUM_SUBCHANNELS]; |
| ServerSideCall[] activeCalls = new ServerSideCall[NUM_SUBCHANNELS]; |
| |
| for (int idx = 0; idx < NUM_SUBCHANNELS; idx++) { |
| wrapped[idx] = createSubchannel(idx, Attributes.EMPTY); |
| assertThat(unwrap(wrapped[idx])).isSameInstanceAs(subchannels[idx]); |
| |
| // A READY transport kicks off the health-check RPC. |
| deliverSubchannelState(idx, ConnectivityStateInfo.forNonError(READY)); |
| |
| Queue<ServerSideCall> startedCalls = healthImpls[idx].calls; |
| assertThat(startedCalls).hasSize(1); |
| activeCalls[idx] = startedCalls.poll(); |
| assertThat(activeCalls[idx].cancelled).isFalse(); |
| |
| verify(mockStateListeners[idx]).onSubchannelState( |
| eq(ConnectivityStateInfo.forNonError(CONNECTING))); |
| } |
| |
| // Make the delegate balancer shut down every subchannel when it is shut down itself. |
| doAnswer(new Answer<Void>() { |
| @Override |
| public Void answer(InvocationOnMock invocation) { |
| for (Subchannel each : wrapped) { |
| each.shutdown(); |
| } |
| return null; |
| } |
| }).when(origLb).shutdown(); |
| |
| // Shut down the health-checking balancer. |
| hcLbEventDelivery.shutdown(); |
| verify(origLb).shutdown(); |
| |
| for (int idx = 0; idx < NUM_SUBCHANNELS; idx++) { |
| // The health-check stream has been cancelled. |
| assertThat(activeCalls[idx].cancelled).isTrue(); |
| verifyNoMoreInteractions(origLb); |
| verify(mockStateListeners[idx]).onSubchannelState( |
| ConnectivityStateInfo.forNonError(SHUTDOWN)); |
| // No further health-check call was started. |
| assertThat(healthImpls[idx].calls).isEmpty(); |
| } |
| |
| // Nothing is left scheduled on the fake clock. |
| assertThat(clock.getPendingTasks()).isEmpty(); |
| } |
| |
| /** |
| * A balancer built through HealthCheckingLoadBalancerUtil starts a health-check RPC once a |
| * subchannel becomes READY. |
| */ |
| @Test |
| public void util_newHealthCheckingLoadBalancer() { |
| LoadBalancer.Factory utilBackedFactory = |
| new LoadBalancer.Factory() { |
| @Override |
| public LoadBalancer newLoadBalancer(Helper helper) { |
| return HealthCheckingLoadBalancerUtil.newHealthCheckingLoadBalancer( |
| origLbFactory, helper); |
| } |
| }; |
| |
| // setUp() has already populated hcLb and wrappedHelper. This test resets wrappedHelper |
| // and rebuilds hcLb through the util-based factory instead. |
| wrappedHelper = null; |
| hcLb = utilBackedFactory.newLoadBalancer(origHelper); |
| |
| // Check that health checking is functional. |
| ResolvedAddresses resolved = ResolvedAddresses.newBuilder() |
| .setAddresses(resolvedAddressList) |
| .setAttributes(attrsWithHealthCheckService("BarService")) |
| .build(); |
| hcLbEventDelivery.handleResolvedAddresses(resolved); |
| verify(origLb).handleResolvedAddresses(resolved); |
| |
| createSubchannel(0, Attributes.EMPTY); |
| HealthImpl health = healthImpls[0]; |
| assertThat(health.calls).isEmpty(); |
| |
| deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY)); |
| assertThat(health.calls).hasSize(1); |
| } |
| |
| /** |
| * Builds resolution attributes whose health-checking config maps "serviceName" to the given |
| * value (which may be null). |
| */ |
| private Attributes attrsWithHealthCheckService(@Nullable String serviceName) { |
| HashMap<String, Object> healthCheckConfig = new HashMap<>(); |
| healthCheckConfig.put("serviceName", serviceName); |
| Attributes.Builder attrs = Attributes.newBuilder(); |
| attrs.set(LoadBalancer.ATTR_HEALTH_CHECKING_CONFIG, healthCheckConfig); |
| return attrs.build(); |
| } |
| |
| /** Builds a health-check request for the given service name. */ |
| private HealthCheckRequest makeRequest(String service) { |
| HealthCheckRequest.Builder request = HealthCheckRequest.newBuilder(); |
| request.setService(service); |
| return request.build(); |
| } |
| |
| /** Builds a health-check response carrying the given serving status. */ |
| private HealthCheckResponse makeResponse(ServingStatus status) { |
| HealthCheckResponse.Builder response = HealthCheckResponse.newBuilder(); |
| response.setStatus(status); |
| return response.build(); |
| } |
| |
| private ConnectivityStateInfo unavailableStateWithMsg(final String expectedMsg) { |
| return MockitoHamcrest.argThat( |
| new org.hamcrest.BaseMatcher<ConnectivityStateInfo>() { |
| @Override |
| public boolean matches(Object item) { |
| if (!(item instanceof ConnectivityStateInfo)) { |
| return false; |
| } |
| ConnectivityStateInfo info = (ConnectivityStateInfo) item; |
| if (!info.getState().equals(TRANSIENT_FAILURE)) { |
| return false; |
| } |
| Status error = info.getStatus(); |
| if (!error.getCode().equals(Code.UNAVAILABLE)) { |
| return false; |
| } |
| if (!error.getDescription().equals(expectedMsg)) { |
| return false; |
| } |
| return true; |
| } |
| |
| @Override |
| public void describeTo(org.hamcrest.Description desc) { |
| desc.appendText("Matches unavailable state with msg='" + expectedMsg + "'"); |
| } |
| }); |
| } |
| |
| /** |
| * Fake health service. It queues every {@code watch} call for the test to inspect and |
| * answer, and flags any use of the unary {@code check} method. |
| */ |
| private static class HealthImpl extends HealthGrpc.HealthImplBase { |
| // Set as soon as check() is invoked. |
| boolean checkCalled; |
| // Watch calls received so far, in arrival order; tests poll() them off. |
| final Queue<ServerSideCall> calls = new ArrayDeque<>(); |
| |
| /** Records that the unary method was invoked, then fails the call. */ |
| @Override |
| public void check(HealthCheckRequest request, |
| StreamObserver<HealthCheckResponse> responseObserver) { |
| // Record the call first so the flag is set even if the observer throws. |
| checkCalled = true; |
| responseObserver.onError(new UnsupportedOperationException("Should never be called")); |
| } |
| |
| /** |
| * Queues the call without responding and marks it cancelled when the current Context is |
| * cancelled. |
| */ |
| @Override |
| public void watch(HealthCheckRequest request, |
| StreamObserver<HealthCheckResponse> responseObserver) { |
| final ServerSideCall call = new ServerSideCall(request, responseObserver); |
| Context.current().addListener( |
| new CancellationListener() { |
| @Override |
| public void cancelled(Context ctx) { |
| call.cancelled = true; |
| } |
| }, MoreExecutors.directExecutor()); |
| calls.add(call); |
| } |
| } |
| |
| /** Server-side record of one health-check watch call. */ |
| private static class ServerSideCall { |
| // Request sent by the client for this call. |
| final HealthCheckRequest request; |
| // Observer through which the test sends responses or closes the stream. |
| final StreamObserver<HealthCheckResponse> responseObserver; |
| // Set by the cancellation listener that HealthImpl.watch() registers. |
| boolean cancelled = false; |
| |
| ServerSideCall( |
| HealthCheckRequest request, StreamObserver<HealthCheckResponse> responseObserver) { |
| this.responseObserver = responseObserver; |
| this.request = request; |
| } |
| } |
| |
| private class FakeSubchannel extends Subchannel { |
| final List<EquivalentAddressGroup> eagList; |
| final Attributes attrs; |
| final Channel channel; |
| final ArrayList<String> logs = new ArrayList<>(); |
| final int index; |
| SubchannelStateListener listener; |
| boolean isShutdown; |
| private final ChannelLogger logger = new ChannelLogger() { |
| @Override |
| public void log(ChannelLogLevel level, String msg) { |
| logs.add(level + ": " + msg); |
| } |
| |
| @Override |
| public void log(ChannelLogLevel level, String template, Object... args) { |
| log(level, MessageFormat.format(template, args)); |
| } |
| }; |
| |
| FakeSubchannel(int index, CreateSubchannelArgs args, Channel channel) { |
| this.index = index; |
| this.eagList = args.getAddresses(); |
| this.attrs = args.getAttributes(); |
| this.channel = checkNotNull(channel); |
| } |
| |
| @Override |
| public void start(SubchannelStateListener listener) { |
| checkState(this.listener == null); |
| this.listener = listener; |
| } |
| |
| @Override |
| public void shutdown() { |
| isShutdown = true; |
| deliverSubchannelState(index, ConnectivityStateInfo.forNonError(SHUTDOWN)); |
| } |
| |
| @Override |
| public void requestConnection() { |
| throw new AssertionError("Should not be called"); |
| } |
| |
| @Override |
| public List<EquivalentAddressGroup> getAllAddresses() { |
| return eagList; |
| } |
| |
| @Override |
| public Attributes getAttributes() { |
| return attrs; |
| } |
| |
| @Override |
| public Channel asChannel() { |
| return channel; |
| } |
| |
| @Override |
| public ChannelLogger getChannelLogger() { |
| return logger; |
| } |
| } |
| |
| private class FakeHelper extends Helper { |
| @Override |
| public Subchannel createSubchannel(CreateSubchannelArgs args) { |
| int index = -1; |
| for (int i = 0; i < NUM_SUBCHANNELS; i++) { |
| if (eagLists[i].equals(args.getAddresses())) { |
| index = i; |
| break; |
| } |
| } |
| checkState(index >= 0, "addrs " + args.getAddresses() + " not found"); |
| FakeSubchannel subchannel = new FakeSubchannel(index, args, channels[index]); |
| checkState(subchannels[index] == null, "subchannels[" + index + "] already created"); |
| subchannels[index] = subchannel; |
| return subchannel; |
| } |
| |
| @Override |
| public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) { |
| throw new AssertionError("Should not be called"); |
| } |
| |
| @Override |
| public SynchronizationContext getSynchronizationContext() { |
| return syncContext; |
| } |
| |
| @Override |
| public ScheduledExecutorService getScheduledExecutorService() { |
| return clock.getScheduledExecutorService(); |
| } |
| |
| @Override |
| public String getAuthority() { |
| throw new AssertionError("Should not be called"); |
| } |
| |
| @Override |
| public ManagedChannel createOobChannel(EquivalentAddressGroup eag, String authority) { |
| throw new AssertionError("Should not be called"); |
| } |
| } |
| |
| /** Socket address identified only by a name, which is also its string form. */ |
| private static class FakeSocketAddress extends SocketAddress { |
| final String name; |
| |
| FakeSocketAddress(String name) { |
| this.name = name; |
| } |
| |
| /** Returns the name given at construction. */ |
| @Override |
| public String toString() { |
| return this.name; |
| } |
| } |
| |
| // Production code always calls wrappedHelper.createSubchannel() from within syncContext, |
| // so the test does the same. The new subchannel is started with the mock listener |
| // registered for the same index. |
| private Subchannel createSubchannel(final int index, final Attributes attrs) { |
| final AtomicReference<Subchannel> created = new AtomicReference<>(); |
| syncContext.execute(new Runnable() { |
| @Override |
| public void run() { |
| CreateSubchannelArgs args = CreateSubchannelArgs.newBuilder() |
| .setAddresses(eagLists[index]) |
| .setAttributes(attrs) |
| .build(); |
| Subchannel subchannel = wrappedHelper.createSubchannel(args); |
| subchannel.start(mockStateListeners[index]); |
| created.set(subchannel); |
| } |
| }); |
| return created.get(); |
| } |
| |
| private void deliverSubchannelState(final int index, final ConnectivityStateInfo newState) { |
| syncContext.execute(new Runnable() { |
| @Override |
| public void run() { |
| subchannels[index].listener.onSubchannelState(newState); |
| } |
| }); |
| } |
| |
| /** Returns the FakeSubchannel that the given SubchannelImpl wrapper delegates to. */ |
| private static FakeSubchannel unwrap(Subchannel s) { |
| SubchannelImpl wrapper = (SubchannelImpl) s; |
| return (FakeSubchannel) wrapper.delegate(); |
| } |
| } |