| /* |
| * Copyright 2016 The gRPC Authors |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package io.grpc.grpclb; |
| |
| import static com.google.common.truth.Truth.assertThat; |
| import static io.grpc.ConnectivityState.CONNECTING; |
| import static io.grpc.ConnectivityState.IDLE; |
| import static io.grpc.ConnectivityState.READY; |
| import static io.grpc.ConnectivityState.SHUTDOWN; |
| import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; |
| import static io.grpc.grpclb.GrpclbState.BUFFER_ENTRY; |
| import static io.grpc.grpclb.GrpclbState.DROP_PICK_RESULT; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertSame; |
| import static org.junit.Assert.assertTrue; |
| import static org.mockito.AdditionalAnswers.delegatesTo; |
| import static org.mockito.Matchers.any; |
| import static org.mockito.Matchers.eq; |
| import static org.mockito.Matchers.same; |
| import static org.mockito.Mockito.atLeast; |
| import static org.mockito.Mockito.doAnswer; |
| import static org.mockito.Mockito.inOrder; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.never; |
| import static org.mockito.Mockito.times; |
| import static org.mockito.Mockito.verify; |
| import static org.mockito.Mockito.verifyNoMoreInteractions; |
| import static org.mockito.Mockito.when; |
| |
| import com.google.common.collect.Iterables; |
| import com.google.common.util.concurrent.MoreExecutors; |
| import com.google.protobuf.ByteString; |
| import com.google.protobuf.util.Durations; |
| import com.google.protobuf.util.Timestamps; |
| import io.grpc.Attributes; |
| import io.grpc.CallOptions; |
| import io.grpc.ClientStreamTracer; |
| import io.grpc.ConnectivityState; |
| import io.grpc.ConnectivityStateInfo; |
| import io.grpc.EquivalentAddressGroup; |
| import io.grpc.LoadBalancer.Helper; |
| import io.grpc.LoadBalancer.PickResult; |
| import io.grpc.LoadBalancer.PickSubchannelArgs; |
| import io.grpc.LoadBalancer.Subchannel; |
| import io.grpc.LoadBalancer.SubchannelPicker; |
| import io.grpc.ManagedChannel; |
| import io.grpc.Metadata; |
| import io.grpc.Status; |
| import io.grpc.grpclb.GrpclbState.BackendEntry; |
| import io.grpc.grpclb.GrpclbState.DropEntry; |
| import io.grpc.grpclb.GrpclbState.ErrorEntry; |
| import io.grpc.grpclb.GrpclbState.RoundRobinPicker; |
| import io.grpc.inprocess.InProcessChannelBuilder; |
| import io.grpc.inprocess.InProcessServerBuilder; |
| import io.grpc.internal.BackoffPolicy; |
| import io.grpc.internal.FakeClock; |
| import io.grpc.internal.GrpcAttributes; |
| import io.grpc.internal.ObjectPool; |
| import io.grpc.internal.SerializingExecutor; |
| import io.grpc.internal.TimeProvider; |
| import io.grpc.lb.v1.ClientStats; |
| import io.grpc.lb.v1.ClientStatsPerToken; |
| import io.grpc.lb.v1.InitialLoadBalanceRequest; |
| import io.grpc.lb.v1.InitialLoadBalanceResponse; |
| import io.grpc.lb.v1.LoadBalanceRequest; |
| import io.grpc.lb.v1.LoadBalanceResponse; |
| import io.grpc.lb.v1.LoadBalancerGrpc; |
| import io.grpc.lb.v1.Server; |
| import io.grpc.lb.v1.ServerList; |
| import io.grpc.stub.StreamObserver; |
| import java.net.InetSocketAddress; |
| import java.net.SocketAddress; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import javax.annotation.Nullable; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.JUnit4; |
| import org.mockito.ArgumentCaptor; |
| import org.mockito.Captor; |
| import org.mockito.InOrder; |
| import org.mockito.Mock; |
| import org.mockito.MockitoAnnotations; |
| import org.mockito.invocation.InvocationOnMock; |
| import org.mockito.stubbing.Answer; |
| |
| /** Unit tests for {@link GrpclbLoadBalancer}. */ |
| @RunWith(JUnit4.class) |
| public class GrpclbLoadBalancerTest { |
| private static final Attributes.Key<String> RESOLUTION_ATTR = |
| Attributes.Key.create("resolution-attr"); |
| private static final String SERVICE_AUTHORITY = "api.google.com"; |
| private static final FakeClock.TaskFilter LOAD_REPORTING_TASK_FILTER = |
| new FakeClock.TaskFilter() { |
| @Override |
| public boolean shouldAccept(Runnable command) { |
| return command instanceof GrpclbState.LoadReportingTask; |
| } |
| }; |
| private static final FakeClock.TaskFilter FALLBACK_MODE_TASK_FILTER = |
| new FakeClock.TaskFilter() { |
| @Override |
| public boolean shouldAccept(Runnable command) { |
| return command instanceof GrpclbState.FallbackModeTask; |
| } |
| }; |
| private static final FakeClock.TaskFilter LB_RPC_RETRY_TASK_FILTER = |
| new FakeClock.TaskFilter() { |
| @Override |
| public boolean shouldAccept(Runnable command) { |
| return command instanceof GrpclbState.LbRpcRetryTask; |
| } |
| }; |
| private static final Attributes LB_BACKEND_ATTRS = |
| Attributes.newBuilder().set(GrpcAttributes.ATTR_LB_PROVIDED_BACKEND, true).build(); |
| |
| @Mock |
| private Helper helper; |
| @Mock |
| private SubchannelPool subchannelPool; |
| private SubchannelPicker currentPicker; |
| private LoadBalancerGrpc.LoadBalancerImplBase mockLbService; |
| @Captor |
| private ArgumentCaptor<StreamObserver<LoadBalanceResponse>> lbResponseObserverCaptor; |
| private final FakeClock fakeClock = new FakeClock(); |
| private final LinkedList<StreamObserver<LoadBalanceRequest>> lbRequestObservers = |
| new LinkedList<StreamObserver<LoadBalanceRequest>>(); |
| private final LinkedList<Subchannel> mockSubchannels = new LinkedList<Subchannel>(); |
| private final LinkedList<ManagedChannel> fakeOobChannels = new LinkedList<ManagedChannel>(); |
| private final ArrayList<Subchannel> subchannelTracker = new ArrayList<>(); |
| private final ArrayList<ManagedChannel> oobChannelTracker = new ArrayList<>(); |
| private final ArrayList<String> failingLbAuthorities = new ArrayList<>(); |
| private final TimeProvider timeProvider = new TimeProvider() { |
| @Override |
| public long currentTimeNanos() { |
| return fakeClock.getTicker().read(); |
| } |
| }; |
| private io.grpc.Server fakeLbServer; |
| @Captor |
| private ArgumentCaptor<SubchannelPicker> pickerCaptor; |
| private final SerializingExecutor channelExecutor = |
| new SerializingExecutor(MoreExecutors.directExecutor()); |
| @Mock |
| private ObjectPool<ScheduledExecutorService> timerServicePool; |
| @Mock |
| private BackoffPolicy.Provider backoffPolicyProvider; |
| @Mock |
| private BackoffPolicy backoffPolicy1; |
| @Mock |
| private BackoffPolicy backoffPolicy2; |
| private GrpclbLoadBalancer balancer; |
| |
| @SuppressWarnings("unchecked") |
| @Before |
| public void setUp() throws Exception { |
| MockitoAnnotations.initMocks(this); |
| mockLbService = mock(LoadBalancerGrpc.LoadBalancerImplBase.class, delegatesTo( |
| new LoadBalancerGrpc.LoadBalancerImplBase() { |
| @Override |
| public StreamObserver<LoadBalanceRequest> balanceLoad( |
| final StreamObserver<LoadBalanceResponse> responseObserver) { |
| StreamObserver<LoadBalanceRequest> requestObserver = |
| mock(StreamObserver.class); |
| Answer<Void> closeRpc = new Answer<Void>() { |
| @Override |
| public Void answer(InvocationOnMock invocation) { |
| responseObserver.onCompleted(); |
| return null; |
| } |
| }; |
| doAnswer(closeRpc).when(requestObserver).onCompleted(); |
| lbRequestObservers.add(requestObserver); |
| return requestObserver; |
| } |
| })); |
| fakeLbServer = InProcessServerBuilder.forName("fakeLb") |
| .directExecutor().addService(mockLbService).build().start(); |
| doAnswer(new Answer<ManagedChannel>() { |
| @Override |
| public ManagedChannel answer(InvocationOnMock invocation) throws Throwable { |
| String authority = (String) invocation.getArguments()[1]; |
| ManagedChannel channel; |
| if (failingLbAuthorities.contains(authority)) { |
| channel = InProcessChannelBuilder.forName("nonExistFakeLb").directExecutor() |
| .overrideAuthority(authority).build(); |
| } else { |
| channel = InProcessChannelBuilder.forName("fakeLb").directExecutor() |
| .overrideAuthority(authority).build(); |
| } |
| fakeOobChannels.add(channel); |
| oobChannelTracker.add(channel); |
| return channel; |
| } |
| }).when(helper).createOobChannel(any(EquivalentAddressGroup.class), any(String.class)); |
| doAnswer(new Answer<Subchannel>() { |
| @Override |
| public Subchannel answer(InvocationOnMock invocation) throws Throwable { |
| Subchannel subchannel = mock(Subchannel.class); |
| EquivalentAddressGroup eag = (EquivalentAddressGroup) invocation.getArguments()[0]; |
| Attributes attrs = (Attributes) invocation.getArguments()[1]; |
| when(subchannel.getAddresses()).thenReturn(eag); |
| when(subchannel.getAttributes()).thenReturn(attrs); |
| mockSubchannels.add(subchannel); |
| subchannelTracker.add(subchannel); |
| return subchannel; |
| } |
| }).when(subchannelPool).takeOrCreateSubchannel( |
| any(EquivalentAddressGroup.class), any(Attributes.class)); |
| doAnswer(new Answer<Void>() { |
| @Override |
| public Void answer(InvocationOnMock invocation) throws Throwable { |
| Runnable task = (Runnable) invocation.getArguments()[0]; |
| channelExecutor.execute(task); |
| return null; |
| } |
| }).when(helper).runSerialized(any(Runnable.class)); |
| doAnswer(new Answer<Void>() { |
| @Override |
| public Void answer(InvocationOnMock invocation) throws Throwable { |
| currentPicker = (SubchannelPicker) invocation.getArguments()[1]; |
| return null; |
| } |
| }).when(helper).updateBalancingState( |
| any(ConnectivityState.class), any(SubchannelPicker.class)); |
| when(helper.getAuthority()).thenReturn(SERVICE_AUTHORITY); |
| ScheduledExecutorService timerService = fakeClock.getScheduledExecutorService(); |
| when(timerServicePool.getObject()).thenReturn(timerService); |
| when(backoffPolicy1.nextBackoffNanos()).thenReturn(10L, 100L); |
| when(backoffPolicy2.nextBackoffNanos()).thenReturn(10L, 100L); |
| when(backoffPolicyProvider.get()).thenReturn(backoffPolicy1, backoffPolicy2); |
| balancer = new GrpclbLoadBalancer( |
| helper, |
| subchannelPool, |
| timerServicePool, |
| timeProvider, |
| backoffPolicyProvider); |
| verify(subchannelPool).init(same(helper), same(timerService)); |
| } |
| |
| @After |
| public void tearDown() { |
| try { |
| if (balancer != null) { |
| channelExecutor.execute(new Runnable() { |
| @Override |
| public void run() { |
| balancer.shutdown(); |
| } |
| }); |
| } |
| for (ManagedChannel channel : oobChannelTracker) { |
| assertTrue(channel + " is shutdown", channel.isShutdown()); |
| // balancer should have closed the LB stream, terminating the OOB channel. |
| assertTrue(channel + " is terminated", channel.isTerminated()); |
| } |
| // GRPCLB manages subchannels only through subchannelPool |
| for (Subchannel subchannel: subchannelTracker) { |
| verify(subchannelPool).returnSubchannel(same(subchannel)); |
| // Our mock subchannelPool never calls Subchannel.shutdown(), thus we can tell if |
| // LoadBalancer has called it expectedly. |
| verify(subchannel, never()).shutdown(); |
| } |
| verify(helper, never()) |
| .createSubchannel(any(EquivalentAddressGroup.class), any(Attributes.class)); |
| // No timer should linger after shutdown |
| assertThat(fakeClock.getPendingTasks()).isEmpty(); |
| } finally { |
| if (fakeLbServer != null) { |
| fakeLbServer.shutdownNow(); |
| } |
| } |
| } |
| |
| @Test |
| public void roundRobinPickerNoDrop() { |
| GrpclbClientLoadRecorder loadRecorder = new GrpclbClientLoadRecorder(timeProvider); |
| Subchannel subchannel = mock(Subchannel.class); |
| BackendEntry b1 = new BackendEntry(subchannel, loadRecorder, "LBTOKEN0001"); |
| BackendEntry b2 = new BackendEntry(subchannel, loadRecorder, "LBTOKEN0002"); |
| |
| List<BackendEntry> pickList = Arrays.asList(b1, b2); |
| RoundRobinPicker picker = new RoundRobinPicker(Collections.<DropEntry>emptyList(), pickList); |
| |
| PickSubchannelArgs args1 = mock(PickSubchannelArgs.class); |
| Metadata headers1 = new Metadata(); |
| // The existing token on the headers will be replaced |
| headers1.put(GrpclbConstants.TOKEN_METADATA_KEY, "LBTOKEN__OLD"); |
| when(args1.getHeaders()).thenReturn(headers1); |
| assertSame(b1.result, picker.pickSubchannel(args1)); |
| verify(args1).getHeaders(); |
| assertThat(headers1.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0001"); |
| |
| PickSubchannelArgs args2 = mock(PickSubchannelArgs.class); |
| Metadata headers2 = new Metadata(); |
| when(args2.getHeaders()).thenReturn(headers2); |
| assertSame(b2.result, picker.pickSubchannel(args2)); |
| verify(args2).getHeaders(); |
| assertThat(headers2.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0002"); |
| |
| PickSubchannelArgs args3 = mock(PickSubchannelArgs.class); |
| Metadata headers3 = new Metadata(); |
| when(args3.getHeaders()).thenReturn(headers3); |
| assertSame(b1.result, picker.pickSubchannel(args3)); |
| verify(args3).getHeaders(); |
| assertThat(headers3.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0001"); |
| |
| verify(subchannel, never()).getAttributes(); |
| } |
| |
| |
| @Test |
| public void roundRobinPickerWithDrop() { |
| assertTrue(DROP_PICK_RESULT.isDrop()); |
| GrpclbClientLoadRecorder loadRecorder = new GrpclbClientLoadRecorder(timeProvider); |
| Subchannel subchannel = mock(Subchannel.class); |
| // 1 out of 2 requests are to be dropped |
| DropEntry d = new DropEntry(loadRecorder, "LBTOKEN0003"); |
| List<DropEntry> dropList = Arrays.asList(null, d); |
| |
| BackendEntry b1 = new BackendEntry(subchannel, loadRecorder, "LBTOKEN0001"); |
| BackendEntry b2 = new BackendEntry(subchannel, loadRecorder, "LBTOKEN0002"); |
| List<BackendEntry> pickList = Arrays.asList(b1, b2); |
| RoundRobinPicker picker = new RoundRobinPicker(dropList, pickList); |
| |
| // dropList[0], pickList[0] |
| PickSubchannelArgs args1 = mock(PickSubchannelArgs.class); |
| Metadata headers1 = new Metadata(); |
| headers1.put(GrpclbConstants.TOKEN_METADATA_KEY, "LBTOKEN__OLD"); |
| when(args1.getHeaders()).thenReturn(headers1); |
| assertSame(b1.result, picker.pickSubchannel(args1)); |
| verify(args1).getHeaders(); |
| assertThat(headers1.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0001"); |
| |
| // dropList[1]: drop |
| PickSubchannelArgs args2 = mock(PickSubchannelArgs.class); |
| Metadata headers2 = new Metadata(); |
| when(args2.getHeaders()).thenReturn(headers2); |
| assertSame(DROP_PICK_RESULT, picker.pickSubchannel(args2)); |
| verify(args2, never()).getHeaders(); |
| |
| // dropList[0], pickList[1] |
| PickSubchannelArgs args3 = mock(PickSubchannelArgs.class); |
| Metadata headers3 = new Metadata(); |
| when(args3.getHeaders()).thenReturn(headers3); |
| assertSame(b2.result, picker.pickSubchannel(args3)); |
| verify(args3).getHeaders(); |
| assertThat(headers3.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0002"); |
| |
| // dropList[1]: drop |
| PickSubchannelArgs args4 = mock(PickSubchannelArgs.class); |
| Metadata headers4 = new Metadata(); |
| when(args4.getHeaders()).thenReturn(headers4); |
| assertSame(DROP_PICK_RESULT, picker.pickSubchannel(args4)); |
| verify(args4, never()).getHeaders(); |
| |
| // dropList[0], pickList[0] |
| PickSubchannelArgs args5 = mock(PickSubchannelArgs.class); |
| Metadata headers5 = new Metadata(); |
| when(args5.getHeaders()).thenReturn(headers5); |
| assertSame(b1.result, picker.pickSubchannel(args5)); |
| verify(args5).getHeaders(); |
| assertThat(headers5.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0001"); |
| |
| verify(subchannel, never()).getAttributes(); |
| } |
| |
| @Test |
| public void loadReporting() { |
| Metadata headers = new Metadata(); |
| PickSubchannelArgs args = mock(PickSubchannelArgs.class); |
| when(args.getHeaders()).thenReturn(headers); |
| |
| long loadReportIntervalMillis = 1983; |
| List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true); |
| Attributes grpclbResolutionAttrs = Attributes.EMPTY; |
| deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); |
| |
| // Fallback timer is started as soon as address is resolved. |
| assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); |
| |
| assertEquals(1, fakeOobChannels.size()); |
| verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); |
| StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); |
| assertEquals(1, lbRequestObservers.size()); |
| StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll(); |
| InOrder inOrder = inOrder(lbRequestObserver); |
| InOrder helperInOrder = inOrder(helper, subchannelPool); |
| |
| inOrder.verify(lbRequestObserver).onNext( |
| eq(LoadBalanceRequest.newBuilder().setInitialRequest( |
| InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) |
| .build())); |
| |
| // Simulate receiving LB response |
| assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); |
| lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); |
| |
| // Load reporting task is scheduled |
| assertEquals(1, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); |
| assertEquals(0, fakeClock.runDueTasks()); |
| |
| List<ServerEntry> backends = Arrays.asList( |
| new ServerEntry("127.0.0.1", 2000, "token0001"), |
| new ServerEntry("token0001"), // drop |
| new ServerEntry("127.0.0.1", 2010, "token0002"), |
| new ServerEntry("token0003")); // drop |
| |
| lbResponseObserver.onNext(buildLbResponse(backends)); |
| |
| assertEquals(2, mockSubchannels.size()); |
| Subchannel subchannel1 = mockSubchannels.poll(); |
| Subchannel subchannel2 = mockSubchannels.poll(); |
| deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING)); |
| deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(CONNECTING)); |
| deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY)); |
| deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY)); |
| |
| helperInOrder.verify(helper, atLeast(1)) |
| .updateBalancingState(eq(READY), pickerCaptor.capture()); |
| RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue(); |
| assertThat(picker.dropList).containsExactly( |
| null, |
| new DropEntry(getLoadRecorder(), "token0001"), |
| null, |
| new DropEntry(getLoadRecorder(), "token0003")).inOrder(); |
| assertThat(picker.pickList).containsExactly( |
| new BackendEntry(subchannel1, getLoadRecorder(), "token0001"), |
| new BackendEntry(subchannel2, getLoadRecorder(), "token0002")).inOrder(); |
| |
| // Report, no data |
| assertNextReport( |
| inOrder, lbRequestObserver, loadReportIntervalMillis, |
| ClientStats.newBuilder().build()); |
| |
| PickResult pick1 = picker.pickSubchannel(args); |
| assertSame(subchannel1, pick1.getSubchannel()); |
| assertSame(getLoadRecorder(), pick1.getStreamTracerFactory()); |
| |
| // Merely the pick will not be recorded as upstart. |
| assertNextReport( |
| inOrder, lbRequestObserver, loadReportIntervalMillis, |
| ClientStats.newBuilder().build()); |
| |
| ClientStreamTracer tracer1 = |
| pick1.getStreamTracerFactory().newClientStreamTracer(CallOptions.DEFAULT, new Metadata()); |
| |
| PickResult pick2 = picker.pickSubchannel(args); |
| assertNull(pick2.getSubchannel()); |
| assertSame(DROP_PICK_RESULT, pick2); |
| |
| // Report includes upstart of pick1 and the drop of pick2 |
| assertNextReport( |
| inOrder, lbRequestObserver, loadReportIntervalMillis, |
| ClientStats.newBuilder() |
| .setNumCallsStarted(2) |
| .setNumCallsFinished(1) // pick2 |
| .addCallsFinishedWithDrop( |
| ClientStatsPerToken.newBuilder() |
| .setLoadBalanceToken("token0001") |
| .setNumCalls(1) // pick2 |
| .build()) |
| .build()); |
| |
| PickResult pick3 = picker.pickSubchannel(args); |
| assertSame(subchannel2, pick3.getSubchannel()); |
| assertSame(getLoadRecorder(), pick3.getStreamTracerFactory()); |
| ClientStreamTracer tracer3 = |
| pick3.getStreamTracerFactory().newClientStreamTracer(CallOptions.DEFAULT, new Metadata()); |
| |
| // pick3 has sent out headers |
| tracer3.outboundHeaders(); |
| |
| // 3rd report includes pick3's upstart |
| assertNextReport( |
| inOrder, lbRequestObserver, loadReportIntervalMillis, |
| ClientStats.newBuilder() |
| .setNumCallsStarted(1) |
| .build()); |
| |
| PickResult pick4 = picker.pickSubchannel(args); |
| assertNull(pick4.getSubchannel()); |
| assertSame(DROP_PICK_RESULT, pick4); |
| |
| // pick1 ended without sending anything |
| tracer1.streamClosed(Status.CANCELLED); |
| |
| // 4th report includes end of pick1 and drop of pick4 |
| assertNextReport( |
| inOrder, lbRequestObserver, loadReportIntervalMillis, |
| ClientStats.newBuilder() |
| .setNumCallsStarted(1) // pick4 |
| .setNumCallsFinished(2) |
| .setNumCallsFinishedWithClientFailedToSend(1) // pick1 |
| .addCallsFinishedWithDrop( |
| ClientStatsPerToken.newBuilder() |
| .setLoadBalanceToken("token0003") |
| .setNumCalls(1) // pick4 |
| .build()) |
| .build()); |
| |
| PickResult pick5 = picker.pickSubchannel(args); |
| assertSame(subchannel1, pick1.getSubchannel()); |
| assertSame(getLoadRecorder(), pick5.getStreamTracerFactory()); |
| ClientStreamTracer tracer5 = |
| pick5.getStreamTracerFactory().newClientStreamTracer(CallOptions.DEFAULT, new Metadata()); |
| |
| // pick3 ended without receiving response headers |
| tracer3.streamClosed(Status.DEADLINE_EXCEEDED); |
| |
| // pick5 sent and received headers |
| tracer5.outboundHeaders(); |
| tracer5.inboundHeaders(); |
| |
| // 5th report includes pick3's end and pick5's upstart |
| assertNextReport( |
| inOrder, lbRequestObserver, loadReportIntervalMillis, |
| ClientStats.newBuilder() |
| .setNumCallsStarted(1) // pick5 |
| .setNumCallsFinished(1) // pick3 |
| .build()); |
| |
| // pick5 ends |
| tracer5.streamClosed(Status.OK); |
| |
| // 6th report includes pick5's end |
| assertNextReport( |
| inOrder, lbRequestObserver, loadReportIntervalMillis, |
| ClientStats.newBuilder() |
| .setNumCallsFinished(1) |
| .setNumCallsFinishedKnownReceived(1) |
| .build()); |
| |
| assertEquals(1, fakeClock.numPendingTasks()); |
| // Balancer closes the stream, scheduled reporting task cancelled |
| lbResponseObserver.onError(Status.UNAVAILABLE.asException()); |
| assertEquals(0, fakeClock.numPendingTasks()); |
| |
| // New stream created |
| verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture()); |
| lbResponseObserver = lbResponseObserverCaptor.getValue(); |
| assertEquals(1, lbRequestObservers.size()); |
| lbRequestObserver = lbRequestObservers.poll(); |
| inOrder = inOrder(lbRequestObserver); |
| |
| inOrder.verify(lbRequestObserver).onNext( |
| eq(LoadBalanceRequest.newBuilder().setInitialRequest( |
| InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) |
| .build())); |
| |
| // Load reporting is also requested |
| lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); |
| |
| // No picker created because balancer is still using the results from the last stream |
| helperInOrder.verify(helper, never()) |
| .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); |
| |
| // Make a new pick on that picker. It will not show up on the report of the new stream, because |
| // that picker is associated with the previous stream. |
| PickResult pick6 = picker.pickSubchannel(args); |
| assertNull(pick6.getSubchannel()); |
| assertSame(DROP_PICK_RESULT, pick6); |
| assertNextReport( |
| inOrder, lbRequestObserver, loadReportIntervalMillis, |
| ClientStats.newBuilder().build()); |
| |
| // New stream got the list update |
| lbResponseObserver.onNext(buildLbResponse(backends)); |
| |
| // Same backends, thus no new subchannels |
| helperInOrder.verify(subchannelPool, never()).takeOrCreateSubchannel( |
| any(EquivalentAddressGroup.class), any(Attributes.class)); |
| // But the new RoundRobinEntries have a new loadRecorder, thus considered different from |
| // the previous list, thus a new picker is created |
| helperInOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); |
| picker = (RoundRobinPicker) pickerCaptor.getValue(); |
| |
| PickResult pick1p = picker.pickSubchannel(args); |
| assertSame(subchannel1, pick1p.getSubchannel()); |
| assertSame(getLoadRecorder(), pick1p.getStreamTracerFactory()); |
| pick1p.getStreamTracerFactory().newClientStreamTracer(CallOptions.DEFAULT, new Metadata()); |
| |
| // The pick from the new stream will be included in the report |
| assertNextReport( |
| inOrder, lbRequestObserver, loadReportIntervalMillis, |
| ClientStats.newBuilder() |
| .setNumCallsStarted(1) |
| .build()); |
| |
| verify(args, atLeast(0)).getHeaders(); |
| verifyNoMoreInteractions(args); |
| } |
| |
| @Test |
| public void abundantInitialResponse() { |
| Metadata headers = new Metadata(); |
| PickSubchannelArgs args = mock(PickSubchannelArgs.class); |
| when(args.getHeaders()).thenReturn(headers); |
| |
| List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true); |
| Attributes grpclbResolutionAttrs = Attributes.EMPTY; |
| deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); |
| assertEquals(1, fakeOobChannels.size()); |
| verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); |
| StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); |
| |
| // Simulate LB initial response |
| assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); |
| lbResponseObserver.onNext(buildInitialResponse(1983)); |
| |
| // Load reporting task is scheduled |
| assertEquals(1, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); |
| FakeClock.ScheduledTask scheduledTask = fakeClock.getPendingTasks().iterator().next(); |
| assertEquals(1983, scheduledTask.getDelay(TimeUnit.MILLISECONDS)); |
| |
| // Simulate an abundant LB initial response, with a different report interval |
| lbResponseObserver.onNext(buildInitialResponse(9097)); |
| // It doesn't affect load-reporting at all |
| assertThat(fakeClock.getPendingTasks(LOAD_REPORTING_TASK_FILTER)) |
| .containsExactly(scheduledTask); |
| assertEquals(1983, scheduledTask.getDelay(TimeUnit.MILLISECONDS)); |
| } |
| |
| @Test |
| public void raceBetweenLoadReportingAndLbStreamClosure() { |
| Metadata headers = new Metadata(); |
| PickSubchannelArgs args = mock(PickSubchannelArgs.class); |
| when(args.getHeaders()).thenReturn(headers); |
| |
| List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true); |
| Attributes grpclbResolutionAttrs = Attributes.EMPTY; |
| deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); |
| assertEquals(1, fakeOobChannels.size()); |
| verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); |
| StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); |
| assertEquals(1, lbRequestObservers.size()); |
| StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll(); |
| InOrder inOrder = inOrder(lbRequestObserver); |
| |
| inOrder.verify(lbRequestObserver).onNext( |
| eq(LoadBalanceRequest.newBuilder().setInitialRequest( |
| InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) |
| .build())); |
| |
| // Simulate receiving LB response |
| assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); |
| lbResponseObserver.onNext(buildInitialResponse(1983)); |
| |
| // Load reporting task is scheduled |
| assertEquals(1, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); |
| FakeClock.ScheduledTask scheduledTask = fakeClock.getPendingTasks().iterator().next(); |
| assertEquals(1983, scheduledTask.getDelay(TimeUnit.MILLISECONDS)); |
| |
| // Close lbStream |
| lbResponseObserver.onCompleted(); |
| |
| // Reporting task cancelled |
| assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); |
| |
| // Simulate a race condition where the task has just started when its cancelled |
| scheduledTask.command.run(); |
| |
| // No report sent. No new task scheduled |
| inOrder.verify(lbRequestObserver, never()).onNext(any(LoadBalanceRequest.class)); |
| assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); |
| } |
| |
| private void assertNextReport( |
| InOrder inOrder, StreamObserver<LoadBalanceRequest> lbRequestObserver, |
| long loadReportIntervalMillis, ClientStats expectedReport) { |
| assertEquals(0, fakeClock.forwardTime(loadReportIntervalMillis - 1, TimeUnit.MILLISECONDS)); |
| inOrder.verifyNoMoreInteractions(); |
| assertEquals(1, fakeClock.forwardTime(1, TimeUnit.MILLISECONDS)); |
| assertEquals(1, fakeClock.numPendingTasks()); |
| inOrder.verify(lbRequestObserver).onNext( |
| eq(LoadBalanceRequest.newBuilder() |
| .setClientStats( |
| ClientStats.newBuilder(expectedReport) |
| .setTimestamp(Timestamps.fromNanos(fakeClock.getTicker().read())) |
| .build()) |
| .build())); |
| } |
| |
| @Test |
| public void acquireAndReleaseScheduledExecutor() { |
| verify(timerServicePool).getObject(); |
| verifyNoMoreInteractions(timerServicePool); |
| |
| balancer.shutdown(); |
| verify(timerServicePool).returnObject(same(fakeClock.getScheduledExecutorService())); |
| verifyNoMoreInteractions(timerServicePool); |
| } |
| |
| @Test |
| public void nameResolutionFailsThenRecoverToDelegate() { |
| Status error = Status.NOT_FOUND.withDescription("www.google.com not found"); |
| deliverNameResolutionError(error); |
| verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); |
| RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue(); |
| assertThat(picker.dropList).isEmpty(); |
| assertThat(picker.pickList).containsExactly(new ErrorEntry(error)); |
| |
| // Recover with a subsequent success |
| List<EquivalentAddressGroup> resolvedServers = createResolvedServerAddresses(false); |
| |
| Attributes resolutionAttrs = Attributes.newBuilder().set(RESOLUTION_ATTR, "yeah").build(); |
| deliverResolvedAddresses(resolvedServers, resolutionAttrs); |
| } |
| |
| @Test |
| public void nameResolutionFailsThenRecoverToGrpclb() { |
| Status error = Status.NOT_FOUND.withDescription("www.google.com not found"); |
| deliverNameResolutionError(error); |
| verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); |
| RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue(); |
| assertThat(picker.dropList).isEmpty(); |
| assertThat(picker.pickList).containsExactly(new ErrorEntry(error)); |
| |
| // Recover with a subsequent success |
| List<EquivalentAddressGroup> resolvedServers = createResolvedServerAddresses(true); |
| EquivalentAddressGroup eag = resolvedServers.get(0); |
| |
| Attributes resolutionAttrs = Attributes.EMPTY; |
| deliverResolvedAddresses(resolvedServers, resolutionAttrs); |
| |
| verify(helper).createOobChannel(eq(eag), eq(lbAuthority(0))); |
| verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); |
| } |
| |
| @Test |
| public void grpclbThenNameResolutionFails() { |
| InOrder inOrder = inOrder(helper, subchannelPool); |
| // Go to GRPCLB first |
| List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true); |
| Attributes grpclbResolutionAttrs = Attributes.EMPTY; |
| deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); |
| |
| verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0))); |
| assertEquals(1, fakeOobChannels.size()); |
| ManagedChannel oobChannel = fakeOobChannels.poll(); |
| verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); |
| StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); |
| |
| // Let name resolution fail before round-robin list is ready |
| Status error = Status.NOT_FOUND.withDescription("www.google.com not found"); |
| deliverNameResolutionError(error); |
| |
| inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); |
| RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue(); |
| assertThat(picker.dropList).isEmpty(); |
| assertThat(picker.pickList).containsExactly(new ErrorEntry(error)); |
| assertFalse(oobChannel.isShutdown()); |
| |
| // Simulate receiving LB response |
| List<ServerEntry> backends = Arrays.asList( |
| new ServerEntry("127.0.0.1", 2000, "TOKEN1"), |
| new ServerEntry("127.0.0.1", 2010, "TOKEN2")); |
| verify(helper, never()).runSerialized(any(Runnable.class)); |
| lbResponseObserver.onNext(buildInitialResponse()); |
| lbResponseObserver.onNext(buildLbResponse(backends)); |
| |
| verify(helper, times(2)).runSerialized(any(Runnable.class)); |
| inOrder.verify(subchannelPool).takeOrCreateSubchannel( |
| eq(new EquivalentAddressGroup(backends.get(0).addr, LB_BACKEND_ATTRS)), |
| any(Attributes.class)); |
| inOrder.verify(subchannelPool).takeOrCreateSubchannel( |
| eq(new EquivalentAddressGroup(backends.get(1).addr, LB_BACKEND_ATTRS)), |
| any(Attributes.class)); |
| } |
| |
| @Test |
| public void grpclbUpdatedAddresses_avoidsReconnect() { |
| List<EquivalentAddressGroup> grpclbResolutionList = |
| createResolvedServerAddresses(true, false); |
| Attributes grpclbResolutionAttrs = Attributes.EMPTY; |
| deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); |
| |
| verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0))); |
| ManagedChannel oobChannel = fakeOobChannels.poll(); |
| assertEquals(1, lbRequestObservers.size()); |
| |
| List<EquivalentAddressGroup> grpclbResolutionList2 = |
| createResolvedServerAddresses(true, false, true); |
| EquivalentAddressGroup combinedEag = new EquivalentAddressGroup(Arrays.asList( |
| grpclbResolutionList2.get(0).getAddresses().get(0), |
| grpclbResolutionList2.get(2).getAddresses().get(0)), |
| lbAttributes(lbAuthority(0))); |
| deliverResolvedAddresses(grpclbResolutionList2, grpclbResolutionAttrs); |
| verify(helper).updateOobChannelAddresses(eq(oobChannel), eq(combinedEag)); |
| assertEquals(1, lbRequestObservers.size()); // No additional RPC |
| } |
| |
| @Test |
| public void grpclbUpdatedAddresses_reconnectOnAuthorityChange() { |
| List<EquivalentAddressGroup> grpclbResolutionList = |
| createResolvedServerAddresses(true, false); |
| Attributes grpclbResolutionAttrs = Attributes.EMPTY; |
| deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); |
| |
| verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0))); |
| ManagedChannel oobChannel = fakeOobChannels.poll(); |
| assertEquals(1, lbRequestObservers.size()); |
| |
| final String newAuthority = "some-new-authority"; |
| List<EquivalentAddressGroup> grpclbResolutionList2 = |
| createResolvedServerAddresses(false); |
| grpclbResolutionList2.add(new EquivalentAddressGroup( |
| new FakeSocketAddress("somethingNew"), lbAttributes(newAuthority))); |
| deliverResolvedAddresses(grpclbResolutionList2, grpclbResolutionAttrs); |
| assertTrue(oobChannel.isTerminated()); |
| verify(helper).createOobChannel(eq(grpclbResolutionList2.get(1)), eq(newAuthority)); |
| assertEquals(2, lbRequestObservers.size()); // An additional RPC |
| } |
| |
| @Test |
| public void grpclbWorking() { |
| InOrder inOrder = inOrder(helper, subchannelPool); |
| List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true); |
| Attributes grpclbResolutionAttrs = Attributes.EMPTY; |
| deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); |
| |
| // Fallback timer is started as soon as the addresses are resolved. |
| assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); |
| |
| verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0))); |
| assertEquals(1, fakeOobChannels.size()); |
| ManagedChannel oobChannel = fakeOobChannels.poll(); |
| verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); |
| StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); |
| assertEquals(1, lbRequestObservers.size()); |
| StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll(); |
| verify(lbRequestObserver).onNext( |
| eq(LoadBalanceRequest.newBuilder().setInitialRequest( |
| InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) |
| .build())); |
| |
| // Simulate receiving LB response |
| List<ServerEntry> backends1 = Arrays.asList( |
| new ServerEntry("127.0.0.1", 2000, "token0001"), |
| new ServerEntry("127.0.0.1", 2010, "token0002")); |
| inOrder.verify(helper, never()) |
| .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); |
| lbResponseObserver.onNext(buildInitialResponse()); |
| lbResponseObserver.onNext(buildLbResponse(backends1)); |
| |
| inOrder.verify(subchannelPool).takeOrCreateSubchannel( |
| eq(new EquivalentAddressGroup(backends1.get(0).addr, LB_BACKEND_ATTRS)), |
| any(Attributes.class)); |
| inOrder.verify(subchannelPool).takeOrCreateSubchannel( |
| eq(new EquivalentAddressGroup(backends1.get(1).addr, LB_BACKEND_ATTRS)), |
| any(Attributes.class)); |
| assertEquals(2, mockSubchannels.size()); |
| Subchannel subchannel1 = mockSubchannels.poll(); |
| Subchannel subchannel2 = mockSubchannels.poll(); |
| verify(subchannel1).requestConnection(); |
| verify(subchannel2).requestConnection(); |
| assertEquals( |
| new EquivalentAddressGroup(backends1.get(0).addr, LB_BACKEND_ATTRS), |
| subchannel1.getAddresses()); |
| assertEquals( |
| new EquivalentAddressGroup(backends1.get(1).addr, LB_BACKEND_ATTRS), |
| subchannel2.getAddresses()); |
| |
| deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING)); |
| deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(CONNECTING)); |
| inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); |
| RoundRobinPicker picker0 = (RoundRobinPicker) pickerCaptor.getValue(); |
| assertThat(picker0.dropList).containsExactly(null, null); |
| assertThat(picker0.pickList).containsExactly(BUFFER_ENTRY); |
| inOrder.verifyNoMoreInteractions(); |
| |
| // Let subchannels be connected |
| deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY)); |
| inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); |
| RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue(); |
| |
| assertThat(picker1.dropList).containsExactly(null, null); |
| assertThat(picker1.pickList).containsExactly( |
| new BackendEntry(subchannel2, getLoadRecorder(), "token0002")); |
| |
| deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY)); |
| inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); |
| RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue(); |
| assertThat(picker2.dropList).containsExactly(null, null); |
| assertThat(picker2.pickList).containsExactly( |
| new BackendEntry(subchannel1, getLoadRecorder(), "token0001"), |
| new BackendEntry(subchannel2, getLoadRecorder(), "token0002")) |
| .inOrder(); |
| |
| // Disconnected subchannels |
| verify(subchannel1).requestConnection(); |
| deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(IDLE)); |
| verify(subchannel1, times(2)).requestConnection(); |
| inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); |
| RoundRobinPicker picker3 = (RoundRobinPicker) pickerCaptor.getValue(); |
| assertThat(picker3.dropList).containsExactly(null, null); |
| assertThat(picker3.pickList).containsExactly( |
| new BackendEntry(subchannel2, getLoadRecorder(), "token0002")); |
| |
| deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING)); |
| inOrder.verifyNoMoreInteractions(); |
| |
| // As long as there is at least one READY subchannel, round robin will work. |
| Status error1 = Status.UNAVAILABLE.withDescription("error1"); |
| deliverSubchannelState(subchannel1, ConnectivityStateInfo.forTransientFailure(error1)); |
| inOrder.verifyNoMoreInteractions(); |
| |
| // If no subchannel is READY, some with error and the others are IDLE, will report CONNECTING |
| verify(subchannel2).requestConnection(); |
| deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(IDLE)); |
| verify(subchannel2, times(2)).requestConnection(); |
| inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); |
| RoundRobinPicker picker4 = (RoundRobinPicker) pickerCaptor.getValue(); |
| assertThat(picker4.dropList).containsExactly(null, null); |
| assertThat(picker4.pickList).containsExactly(BUFFER_ENTRY); |
| |
| // Update backends, with a drop entry |
| List<ServerEntry> backends2 = |
| Arrays.asList( |
| new ServerEntry("127.0.0.1", 2030, "token0003"), // New address |
| new ServerEntry("token0003"), // drop |
| new ServerEntry("127.0.0.1", 2010, "token0004"), // Existing address with token changed |
| new ServerEntry("127.0.0.1", 2030, "token0005"), // New address appearing second time |
| new ServerEntry("token0006")); // drop |
| verify(subchannelPool, never()).returnSubchannel(same(subchannel1)); |
| |
| lbResponseObserver.onNext(buildLbResponse(backends2)); |
| // not in backends2, closed |
| verify(subchannelPool).returnSubchannel(same(subchannel1)); |
| // backends2[2], will be kept |
| verify(subchannelPool, never()).returnSubchannel(same(subchannel2)); |
| |
| inOrder.verify(subchannelPool, never()).takeOrCreateSubchannel( |
| eq(new EquivalentAddressGroup(backends2.get(2).addr, LB_BACKEND_ATTRS)), |
| any(Attributes.class)); |
| inOrder.verify(subchannelPool).takeOrCreateSubchannel( |
| eq(new EquivalentAddressGroup(backends2.get(0).addr, LB_BACKEND_ATTRS)), |
| any(Attributes.class)); |
| assertEquals(1, mockSubchannels.size()); |
| Subchannel subchannel3 = mockSubchannels.poll(); |
| verify(subchannel3).requestConnection(); |
| assertEquals( |
| new EquivalentAddressGroup(backends2.get(0).addr, LB_BACKEND_ATTRS), |
| subchannel3.getAddresses()); |
| inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); |
| RoundRobinPicker picker7 = (RoundRobinPicker) pickerCaptor.getValue(); |
| assertThat(picker7.dropList).containsExactly( |
| null, |
| new DropEntry(getLoadRecorder(), "token0003"), |
| null, |
| null, |
| new DropEntry(getLoadRecorder(), "token0006")).inOrder(); |
| assertThat(picker7.pickList).containsExactly(BUFFER_ENTRY); |
| |
| // State updates on obsolete subchannel1 will have no effect |
| deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY)); |
| deliverSubchannelState( |
| subchannel1, ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE)); |
| deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(SHUTDOWN)); |
| inOrder.verifyNoMoreInteractions(); |
| |
| deliverSubchannelState(subchannel3, ConnectivityStateInfo.forNonError(READY)); |
| inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); |
| RoundRobinPicker picker8 = (RoundRobinPicker) pickerCaptor.getValue(); |
| assertThat(picker8.dropList).containsExactly( |
| null, |
| new DropEntry(getLoadRecorder(), "token0003"), |
| null, |
| null, |
| new DropEntry(getLoadRecorder(), "token0006")).inOrder(); |
| // subchannel2 is still IDLE, thus not in the active list |
| assertThat(picker8.pickList).containsExactly( |
| new BackendEntry(subchannel3, getLoadRecorder(), "token0003"), |
| new BackendEntry(subchannel3, getLoadRecorder(), "token0005")).inOrder(); |
| // subchannel2 becomes READY and makes it into the list |
| deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY)); |
| inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); |
| RoundRobinPicker picker9 = (RoundRobinPicker) pickerCaptor.getValue(); |
| assertThat(picker9.dropList).containsExactly( |
| null, |
| new DropEntry(getLoadRecorder(), "token0003"), |
| null, |
| null, |
| new DropEntry(getLoadRecorder(), "token0006")).inOrder(); |
| assertThat(picker9.pickList).containsExactly( |
| new BackendEntry(subchannel3, getLoadRecorder(), "token0003"), |
| new BackendEntry(subchannel2, getLoadRecorder(), "token0004"), |
| new BackendEntry(subchannel3, getLoadRecorder(), "token0005")).inOrder(); |
| verify(subchannelPool, never()).returnSubchannel(same(subchannel3)); |
| |
| // Update backends, with no entry |
| lbResponseObserver.onNext(buildLbResponse(Collections.<ServerEntry>emptyList())); |
| verify(subchannelPool).returnSubchannel(same(subchannel2)); |
| verify(subchannelPool).returnSubchannel(same(subchannel3)); |
| inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); |
| RoundRobinPicker picker10 = (RoundRobinPicker) pickerCaptor.getValue(); |
| assertThat(picker10.dropList).isEmpty(); |
| assertThat(picker10.pickList).containsExactly(BUFFER_ENTRY); |
| |
| assertFalse(oobChannel.isShutdown()); |
| assertEquals(0, lbRequestObservers.size()); |
| verify(lbRequestObserver, never()).onCompleted(); |
| verify(lbRequestObserver, never()).onError(any(Throwable.class)); |
| |
| // Load reporting was not requested, thus never scheduled |
| assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); |
| |
| verify(subchannelPool, never()).clear(); |
| balancer.shutdown(); |
| verify(subchannelPool).clear(); |
| } |
| |
| @Test |
| public void grpclbFallback_initialTimeout_serverListReceivedBeforeTimerExpires() { |
| subtestGrpclbFallbackInitialTimeout(false); |
| } |
| |
| @Test |
| public void grpclbFallback_initialTimeout_timerExpires() { |
| subtestGrpclbFallbackInitialTimeout(true); |
| } |
| |
| // Fallback or not within the period of the initial timeout. |
| private void subtestGrpclbFallbackInitialTimeout(boolean timerExpires) { |
| long loadReportIntervalMillis = 1983; |
| InOrder inOrder = inOrder(helper, subchannelPool); |
| |
| // Create a resolution list with a mixture of balancer and backend addresses |
| List<EquivalentAddressGroup> resolutionList = |
| createResolvedServerAddresses(false, true, false); |
| Attributes resolutionAttrs = Attributes.EMPTY; |
| deliverResolvedAddresses(resolutionList, resolutionAttrs); |
| |
| inOrder.verify(helper).createOobChannel(eq(resolutionList.get(1)), eq(lbAuthority(0))); |
| |
| // Attempted to connect to balancer |
| assertEquals(1, fakeOobChannels.size()); |
| ManagedChannel oobChannel = fakeOobChannels.poll(); |
| verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); |
| StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); |
| assertEquals(1, lbRequestObservers.size()); |
| StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll(); |
| |
| verify(lbRequestObserver).onNext( |
| eq(LoadBalanceRequest.newBuilder().setInitialRequest( |
| InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) |
| .build())); |
| lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); |
| // We don't care if runSerialized() has been run. |
| inOrder.verify(helper, atLeast(0)).runSerialized(any(Runnable.class)); |
| inOrder.verifyNoMoreInteractions(); |
| |
| assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); |
| fakeClock.forwardTime(GrpclbState.FALLBACK_TIMEOUT_MS - 1, TimeUnit.MILLISECONDS); |
| assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); |
| |
| ///////////////////////////////////////////// |
| // Break the LB stream before timer expires |
| ///////////////////////////////////////////// |
| Status streamError = Status.UNAVAILABLE.withDescription("OOB stream broken"); |
| lbResponseObserver.onError(streamError.asException()); |
| // Not in fallback mode. The error will be propagated. |
| verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); |
| RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue(); |
| assertThat(picker.dropList).isEmpty(); |
| ErrorEntry errorEntry = (ErrorEntry) Iterables.getOnlyElement(picker.pickList); |
| Status status = errorEntry.result.getStatus(); |
| assertThat(status.getCode()).isEqualTo(streamError.getCode()); |
| assertThat(status.getDescription()).contains(streamError.getDescription()); |
| // A new stream is created |
| verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture()); |
| lbResponseObserver = lbResponseObserverCaptor.getValue(); |
| assertEquals(1, lbRequestObservers.size()); |
| lbRequestObserver = lbRequestObservers.poll(); |
| verify(lbRequestObserver).onNext( |
| eq(LoadBalanceRequest.newBuilder().setInitialRequest( |
| InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) |
| .build())); |
| |
| ////////////////////////////////// |
| // Fallback timer expires (or not) |
| ////////////////////////////////// |
| if (timerExpires) { |
| fakeClock.forwardTime(1, TimeUnit.MILLISECONDS); |
| assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); |
| // Fall back to the backends from resolver |
| fallbackTestVerifyUseOfFallbackBackendLists( |
| inOrder, Arrays.asList(resolutionList.get(0), resolutionList.get(2))); |
| |
| assertFalse(oobChannel.isShutdown()); |
| verify(lbRequestObserver, never()).onCompleted(); |
| } |
| |
| //////////////////////////////////////////////////////// |
| // Name resolver sends new list without any backend addr |
| //////////////////////////////////////////////////////// |
| resolutionList = createResolvedServerAddresses(true, true); |
| deliverResolvedAddresses(resolutionList, resolutionAttrs); |
| |
| // New addresses are updated to the OobChannel |
| inOrder.verify(helper).updateOobChannelAddresses( |
| same(oobChannel), |
| eq(new EquivalentAddressGroup( |
| Arrays.asList( |
| resolutionList.get(0).getAddresses().get(0), |
| resolutionList.get(1).getAddresses().get(0)), |
| lbAttributes(lbAuthority(0))))); |
| |
| if (timerExpires) { |
| // Still in fallback logic, except that the backend list is empty |
| fallbackTestVerifyUseOfFallbackBackendLists( |
| inOrder, Collections.<EquivalentAddressGroup>emptyList()); |
| } |
| |
| ////////////////////////////////////////////////// |
| // Name resolver sends new list with backend addrs |
| ////////////////////////////////////////////////// |
| resolutionList = createResolvedServerAddresses(true, false, false); |
| deliverResolvedAddresses(resolutionList, resolutionAttrs); |
| |
| // New LB address is updated to the OobChannel |
| inOrder.verify(helper).updateOobChannelAddresses( |
| same(oobChannel), |
| eq(resolutionList.get(0))); |
| |
| if (timerExpires) { |
| // New backend addresses are used for fallback |
| fallbackTestVerifyUseOfFallbackBackendLists( |
| inOrder, Arrays.asList(resolutionList.get(1), resolutionList.get(2))); |
| } |
| |
| //////////////////////////////////////////////// |
| // Break the LB stream after the timer expires |
| //////////////////////////////////////////////// |
| if (timerExpires) { |
| lbResponseObserver.onError(streamError.asException()); |
| |
| // The error will NOT propagate to picker because fallback list is in use. |
| inOrder.verify(helper, never()) |
| .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); |
| // A new stream is created |
| verify(mockLbService, times(3)).balanceLoad(lbResponseObserverCaptor.capture()); |
| lbResponseObserver = lbResponseObserverCaptor.getValue(); |
| assertEquals(1, lbRequestObservers.size()); |
| lbRequestObserver = lbRequestObservers.poll(); |
| verify(lbRequestObserver).onNext( |
| eq(LoadBalanceRequest.newBuilder().setInitialRequest( |
| InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) |
| .build())); |
| } |
| |
| ///////////////////////////////// |
| // Balancer returns a server list |
| ///////////////////////////////// |
| List<ServerEntry> serverList = Arrays.asList( |
| new ServerEntry("127.0.0.1", 2000, "token0001"), |
| new ServerEntry("127.0.0.1", 2010, "token0002")); |
| lbResponseObserver.onNext(buildInitialResponse()); |
| lbResponseObserver.onNext(buildLbResponse(serverList)); |
| |
| // Balancer-provided server list now in effect |
| fallbackTestVerifyUseOfBalancerBackendLists(inOrder, serverList); |
| |
| /////////////////////////////////////////////////////////////// |
| // New backend addresses from resolver outside of fallback mode |
| /////////////////////////////////////////////////////////////// |
| resolutionList = createResolvedServerAddresses(true, false); |
| deliverResolvedAddresses(resolutionList, resolutionAttrs); |
| // Will not affect the round robin list at all |
| inOrder.verify(helper, never()) |
| .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); |
| |
| // No fallback timeout timer scheduled. |
| assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); |
| } |
| |
| @Test |
| public void grpclbFallback_balancerLost() { |
| subtestGrpclbFallbackConnectionLost(true, false); |
| } |
| |
| @Test |
| public void grpclbFallback_subchannelsLost() { |
| subtestGrpclbFallbackConnectionLost(false, true); |
| } |
| |
| @Test |
| public void grpclbFallback_allLost() { |
| subtestGrpclbFallbackConnectionLost(true, true); |
| } |
| |
| // Fallback outside of the initial timeout, where all connections are lost. |
| private void subtestGrpclbFallbackConnectionLost( |
| boolean balancerBroken, boolean allSubchannelsBroken) { |
| long loadReportIntervalMillis = 1983; |
| InOrder inOrder = inOrder(helper, mockLbService, subchannelPool); |
| |
| // Create a resolution list with a mixture of balancer and backend addresses |
| List<EquivalentAddressGroup> resolutionList = |
| createResolvedServerAddresses(false, true, false); |
| Attributes resolutionAttrs = Attributes.EMPTY; |
| deliverResolvedAddresses(resolutionList, resolutionAttrs); |
| |
| inOrder.verify(helper).createOobChannel(eq(resolutionList.get(1)), eq(lbAuthority(0))); |
| |
| // Attempted to connect to balancer |
| assertEquals(1, fakeOobChannels.size()); |
| fakeOobChannels.poll(); |
| inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); |
| StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); |
| assertEquals(1, lbRequestObservers.size()); |
| StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll(); |
| |
| verify(lbRequestObserver).onNext( |
| eq(LoadBalanceRequest.newBuilder().setInitialRequest( |
| InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) |
| .build())); |
| lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); |
| // We don't care if runSerialized() has been run. |
| inOrder.verify(helper, atLeast(0)).runSerialized(any(Runnable.class)); |
| inOrder.verifyNoMoreInteractions(); |
| |
| // Balancer returns a server list |
| List<ServerEntry> serverList = Arrays.asList( |
| new ServerEntry("127.0.0.1", 2000, "token0001"), |
| new ServerEntry("127.0.0.1", 2010, "token0002")); |
| lbResponseObserver.onNext(buildInitialResponse()); |
| lbResponseObserver.onNext(buildLbResponse(serverList)); |
| |
| List<Subchannel> subchannels = fallbackTestVerifyUseOfBalancerBackendLists(inOrder, serverList); |
| |
| // Break connections |
| if (balancerBroken) { |
| lbResponseObserver.onError(Status.UNAVAILABLE.asException()); |
| // A new stream to LB is created |
| inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); |
| lbResponseObserver = lbResponseObserverCaptor.getValue(); |
| assertEquals(1, lbRequestObservers.size()); |
| lbRequestObserver = lbRequestObservers.poll(); |
| } |
| if (allSubchannelsBroken) { |
| for (Subchannel subchannel : subchannels) { |
| // A READY subchannel transits to IDLE when receiving a go-away |
| deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE)); |
| } |
| } |
| |
| if (balancerBroken && allSubchannelsBroken) { |
| // Going into fallback |
| subchannels = fallbackTestVerifyUseOfFallbackBackendLists( |
| inOrder, Arrays.asList(resolutionList.get(0), resolutionList.get(2))); |
| |
| // When in fallback mode, fallback timer should not be scheduled when all backend |
| // connections are lost |
| for (Subchannel subchannel : subchannels) { |
| deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE)); |
| } |
| |
| // Exit fallback mode or cancel fallback timer when receiving a new server list from balancer |
| List<ServerEntry> serverList2 = Arrays.asList( |
| new ServerEntry("127.0.0.1", 2001, "token0003"), |
| new ServerEntry("127.0.0.1", 2011, "token0004")); |
| lbResponseObserver.onNext(buildInitialResponse()); |
| lbResponseObserver.onNext(buildLbResponse(serverList2)); |
| |
| fallbackTestVerifyUseOfBalancerBackendLists(inOrder, serverList2); |
| } |
| assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); |
| |
| if (!(balancerBroken && allSubchannelsBroken)) { |
| verify(subchannelPool, never()).takeOrCreateSubchannel( |
| eq(resolutionList.get(0)), any(Attributes.class)); |
| verify(subchannelPool, never()).takeOrCreateSubchannel( |
| eq(resolutionList.get(2)), any(Attributes.class)); |
| } |
| } |
| |
| private List<Subchannel> fallbackTestVerifyUseOfFallbackBackendLists( |
| InOrder inOrder, List<EquivalentAddressGroup> addrs) { |
| return fallbackTestVerifyUseOfBackendLists(inOrder, addrs, null); |
| } |
| |
| private List<Subchannel> fallbackTestVerifyUseOfBalancerBackendLists( |
| InOrder inOrder, List<ServerEntry> servers) { |
| ArrayList<EquivalentAddressGroup> addrs = new ArrayList<>(); |
| ArrayList<String> tokens = new ArrayList<>(); |
| for (ServerEntry server : servers) { |
| addrs.add(new EquivalentAddressGroup(server.addr, LB_BACKEND_ATTRS)); |
| tokens.add(server.token); |
| } |
| return fallbackTestVerifyUseOfBackendLists(inOrder, addrs, tokens); |
| } |
| |
| private List<Subchannel> fallbackTestVerifyUseOfBackendLists( |
| InOrder inOrder, List<EquivalentAddressGroup> addrs, |
| @Nullable List<String> tokens) { |
| if (tokens != null) { |
| assertEquals(addrs.size(), tokens.size()); |
| } |
| for (EquivalentAddressGroup addr : addrs) { |
| inOrder.verify(subchannelPool).takeOrCreateSubchannel(eq(addr), any(Attributes.class)); |
| } |
| RoundRobinPicker picker = (RoundRobinPicker) currentPicker; |
| assertThat(picker.dropList).containsExactlyElementsIn(Collections.nCopies(addrs.size(), null)); |
| assertThat(picker.pickList).containsExactly(GrpclbState.BUFFER_ENTRY); |
| assertEquals(addrs.size(), mockSubchannels.size()); |
| ArrayList<Subchannel> subchannels = new ArrayList<>(mockSubchannels); |
| mockSubchannels.clear(); |
| for (Subchannel subchannel : subchannels) { |
| deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(CONNECTING)); |
| } |
| inOrder.verify(helper, atLeast(0)) |
| .updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); |
| inOrder.verify(helper, never()) |
| .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); |
| |
| ArrayList<BackendEntry> pickList = new ArrayList<>(); |
| for (int i = 0; i < addrs.size(); i++) { |
| Subchannel subchannel = subchannels.get(i); |
| BackendEntry backend; |
| if (tokens == null) { |
| backend = new BackendEntry(subchannel); |
| } else { |
| backend = new BackendEntry(subchannel, getLoadRecorder(), tokens.get(i)); |
| } |
| pickList.add(backend); |
| deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); |
| inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); |
| picker = (RoundRobinPicker) pickerCaptor.getValue(); |
| assertThat(picker.dropList) |
| .containsExactlyElementsIn(Collections.nCopies(addrs.size(), null)); |
| assertThat(picker.pickList).containsExactlyElementsIn(pickList); |
| inOrder.verify(helper, never()) |
| .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); |
| } |
| return subchannels; |
| } |
| |
| @Test |
| public void grpclbMultipleAuthorities() throws Exception { |
| List<EquivalentAddressGroup> grpclbResolutionList = Arrays.asList( |
| new EquivalentAddressGroup( |
| new FakeSocketAddress("fake-address-1"), |
| lbAttributes("fake-authority-1")), |
| new EquivalentAddressGroup( |
| new FakeSocketAddress("fake-address-2"), |
| lbAttributes("fake-authority-2")), |
| new EquivalentAddressGroup( |
| new FakeSocketAddress("not-a-lb-address")), |
| new EquivalentAddressGroup( |
| new FakeSocketAddress("fake-address-3"), |
| lbAttributes("fake-authority-1"))); |
| final EquivalentAddressGroup goldenOobChannelEag = new EquivalentAddressGroup( |
| Arrays.<SocketAddress>asList( |
| new FakeSocketAddress("fake-address-1"), |
| new FakeSocketAddress("fake-address-3")), |
| lbAttributes("fake-authority-1")); // Supporting multiple authorities would be good, one day |
| |
| Attributes grpclbResolutionAttrs = Attributes.EMPTY; |
| deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); |
| |
| verify(helper).createOobChannel(goldenOobChannelEag, "fake-authority-1"); |
| } |
| |
| @Test |
| public void grpclbBalancerStreamRetry() throws Exception { |
| LoadBalanceRequest expectedInitialRequest = |
| LoadBalanceRequest.newBuilder() |
| .setInitialRequest( |
| InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) |
| .build(); |
| InOrder inOrder = |
| inOrder(mockLbService, backoffPolicyProvider, backoffPolicy1, backoffPolicy2); |
| List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true); |
| Attributes grpclbResolutionAttrs = Attributes.EMPTY; |
| deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); |
| |
| assertEquals(1, fakeOobChannels.size()); |
| @SuppressWarnings("unused") |
| ManagedChannel oobChannel = fakeOobChannels.poll(); |
| |
| // First balancer RPC |
| inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); |
| StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); |
| assertEquals(1, lbRequestObservers.size()); |
| StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll(); |
| verify(lbRequestObserver).onNext(eq(expectedInitialRequest)); |
| assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); |
| |
| // Balancer closes it immediately (erroneously) |
| lbResponseObserver.onCompleted(); |
| // Will start backoff sequence 1 (10ns) |
| inOrder.verify(backoffPolicyProvider).get(); |
| inOrder.verify(backoffPolicy1).nextBackoffNanos(); |
| assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); |
| |
| // Fast-forward to a moment before the retry |
| fakeClock.forwardNanos(9); |
| verifyNoMoreInteractions(mockLbService); |
| // Then time for retry |
| fakeClock.forwardNanos(1); |
| inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); |
| lbResponseObserver = lbResponseObserverCaptor.getValue(); |
| assertEquals(1, lbRequestObservers.size()); |
| lbRequestObserver = lbRequestObservers.poll(); |
| verify(lbRequestObserver).onNext(eq(expectedInitialRequest)); |
| assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); |
| |
| // Balancer closes it with an error. |
| lbResponseObserver.onError(Status.UNAVAILABLE.asException()); |
| // Will continue the backoff sequence 1 (100ns) |
| verifyNoMoreInteractions(backoffPolicyProvider); |
| inOrder.verify(backoffPolicy1).nextBackoffNanos(); |
| assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); |
| |
| // Fast-forward to a moment before the retry |
| fakeClock.forwardNanos(100 - 1); |
| verifyNoMoreInteractions(mockLbService); |
| // Then time for retry |
| fakeClock.forwardNanos(1); |
| inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); |
| lbResponseObserver = lbResponseObserverCaptor.getValue(); |
| assertEquals(1, lbRequestObservers.size()); |
| lbRequestObserver = lbRequestObservers.poll(); |
| verify(lbRequestObserver).onNext(eq(expectedInitialRequest)); |
| assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); |
| |
| // Balancer sends initial response. |
| lbResponseObserver.onNext(buildInitialResponse()); |
| |
| // Then breaks the RPC |
| lbResponseObserver.onError(Status.UNAVAILABLE.asException()); |
| |
| // Will reset the retry sequence and retry immediately, because balancer has responded. |
| inOrder.verify(backoffPolicyProvider).get(); |
| inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); |
| lbResponseObserver = lbResponseObserverCaptor.getValue(); |
| assertEquals(1, lbRequestObservers.size()); |
| lbRequestObserver = lbRequestObservers.poll(); |
| verify(lbRequestObserver).onNext(eq(expectedInitialRequest)); |
| |
| // Fail the retry after spending 4ns |
| fakeClock.forwardNanos(4); |
| lbResponseObserver.onError(Status.UNAVAILABLE.asException()); |
| |
| // Will be on the first retry (10ns) of backoff sequence 2. |
| inOrder.verify(backoffPolicy2).nextBackoffNanos(); |
| assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); |
| |
| // Fast-forward to a moment before the retry, the time spent in the last try is deducted. |
| fakeClock.forwardNanos(10 - 4 - 1); |
| verifyNoMoreInteractions(mockLbService); |
| // Then time for retry |
| fakeClock.forwardNanos(1); |
| inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); |
| assertEquals(1, lbRequestObservers.size()); |
| lbRequestObserver = lbRequestObservers.poll(); |
| verify(lbRequestObserver).onNext(eq(expectedInitialRequest)); |
| assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); |
| |
| // Wrapping up |
| verify(backoffPolicyProvider, times(2)).get(); |
| verify(backoffPolicy1, times(2)).nextBackoffNanos(); |
| verify(backoffPolicy2, times(1)).nextBackoffNanos(); |
| } |
| |
| private void deliverSubchannelState( |
| final Subchannel subchannel, final ConnectivityStateInfo newState) { |
| channelExecutor.execute(new Runnable() { |
| @Override |
| public void run() { |
| balancer.handleSubchannelState(subchannel, newState); |
| } |
| }); |
| } |
| |
| private void deliverNameResolutionError(final Status error) { |
| channelExecutor.execute(new Runnable() { |
| @Override |
| public void run() { |
| balancer.handleNameResolutionError(error); |
| } |
| }); |
| } |
| |
| private void deliverResolvedAddresses( |
| final List<EquivalentAddressGroup> addrs, final Attributes attrs) { |
| channelExecutor.execute(new Runnable() { |
| @Override |
| public void run() { |
| balancer.handleResolvedAddressGroups(addrs, attrs); |
| } |
| }); |
| } |
| |
| private GrpclbClientLoadRecorder getLoadRecorder() { |
| return balancer.getGrpclbState().getLoadRecorder(); |
| } |
| |
| private static List<EquivalentAddressGroup> createResolvedServerAddresses(boolean ... isLb) { |
| ArrayList<EquivalentAddressGroup> list = new ArrayList<>(); |
| for (int i = 0; i < isLb.length; i++) { |
| SocketAddress addr = new FakeSocketAddress("fake-address-" + i); |
| EquivalentAddressGroup eag = |
| new EquivalentAddressGroup( |
| addr, |
| isLb[i] ? lbAttributes(lbAuthority(i)) : Attributes.EMPTY); |
| list.add(eag); |
| } |
| return list; |
| } |
| |
| private static String lbAuthority(int unused) { |
| // TODO(ejona): Support varying authorities |
| return "lb.google.com"; |
| } |
| |
| private static Attributes lbAttributes(String authority) { |
| return Attributes.newBuilder() |
| .set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, authority) |
| .build(); |
| } |
| |
| private static LoadBalanceResponse buildInitialResponse() { |
| return buildInitialResponse(0); |
| } |
| |
| private static LoadBalanceResponse buildInitialResponse(long loadReportIntervalMillis) { |
| return LoadBalanceResponse.newBuilder() |
| .setInitialResponse( |
| InitialLoadBalanceResponse.newBuilder() |
| .setClientStatsReportInterval(Durations.fromMillis(loadReportIntervalMillis))) |
| .build(); |
| } |
| |
| private static LoadBalanceResponse buildLbResponse(List<ServerEntry> servers) { |
| ServerList.Builder serverListBuilder = ServerList.newBuilder(); |
| for (ServerEntry server : servers) { |
| if (server.addr != null) { |
| serverListBuilder.addServers(Server.newBuilder() |
| .setIpAddress(ByteString.copyFrom(server.addr.getAddress().getAddress())) |
| .setPort(server.addr.getPort()) |
| .setLoadBalanceToken(server.token) |
| .build()); |
| } else { |
| serverListBuilder.addServers(Server.newBuilder() |
| .setDrop(true) |
| .setLoadBalanceToken(server.token) |
| .build()); |
| } |
| } |
| return LoadBalanceResponse.newBuilder() |
| .setServerList(serverListBuilder.build()) |
| .build(); |
| } |
| |
| private static class ServerEntry { |
| final InetSocketAddress addr; |
| final String token; |
| |
| ServerEntry(String host, int port, String token) { |
| this.addr = new InetSocketAddress(host, port); |
| this.token = token; |
| } |
| |
| // Drop entry |
| ServerEntry(String token) { |
| this.addr = null; |
| this.token = token; |
| } |
| } |
| } |