blob: e768cf018e3d4a3afabb34ff36e8e089e93ce24d [file] [log] [blame]
/*
* Copyright 2016 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.grpclb;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.truth.Truth.assertThat;
import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.IDLE;
import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.SHUTDOWN;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.grpc.grpclb.GrpclbState.BUFFER_ENTRY;
import static io.grpc.grpclb.GrpclbState.DROP_PICK_RESULT;
import static io.grpc.grpclb.GrpclbState.NO_USE_AUTHORITY_SUFFIX;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import com.google.common.collect.Iterables;
import com.google.protobuf.ByteString;
import com.google.protobuf.util.Durations;
import com.google.protobuf.util.Timestamps;
import io.grpc.Attributes;
import io.grpc.ChannelLogger;
import io.grpc.ClientStreamTracer;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer.CreateSubchannelArgs;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.ResolvedAddresses;
import io.grpc.LoadBalancer.Subchannel;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LoadBalancer.SubchannelStateListener;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.SynchronizationContext;
import io.grpc.grpclb.GrpclbState.BackendEntry;
import io.grpc.grpclb.GrpclbState.DropEntry;
import io.grpc.grpclb.GrpclbState.ErrorEntry;
import io.grpc.grpclb.GrpclbState.IdleSubchannelEntry;
import io.grpc.grpclb.GrpclbState.Mode;
import io.grpc.grpclb.GrpclbState.RoundRobinPicker;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.internal.BackoffPolicy;
import io.grpc.internal.FakeClock;
import io.grpc.lb.v1.ClientStats;
import io.grpc.lb.v1.ClientStatsPerToken;
import io.grpc.lb.v1.FallbackResponse;
import io.grpc.lb.v1.InitialLoadBalanceRequest;
import io.grpc.lb.v1.InitialLoadBalanceResponse;
import io.grpc.lb.v1.LoadBalanceRequest;
import io.grpc.lb.v1.LoadBalanceResponse;
import io.grpc.lb.v1.LoadBalancerGrpc;
import io.grpc.lb.v1.Server;
import io.grpc.lb.v1.ServerList;
import io.grpc.stub.StreamObserver;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.text.MessageFormat;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.AdditionalAnswers;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Captor;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
/** Unit tests for {@link GrpclbLoadBalancer}. */
@RunWith(JUnit4.class)
public class GrpclbLoadBalancerTest {
private static final String SERVICE_AUTHORITY = "api.google.com";

// The tasks are wrapped by SynchronizationContext, so we can't compare the types
// directly. Each filter below therefore matches on the task's toString() instead.
private static final FakeClock.TaskFilter LOAD_REPORTING_TASK_FILTER =
    taskFilterForClass(GrpclbState.LoadReportingTask.class);
private static final FakeClock.TaskFilter FALLBACK_MODE_TASK_FILTER =
    taskFilterForClass(GrpclbState.FallbackModeTask.class);
private static final FakeClock.TaskFilter LB_RPC_RETRY_TASK_FILTER =
    taskFilterForClass(GrpclbState.LbRpcRetryTask.class);

/**
 * Creates a filter that accepts a command when its {@code toString()} contains the simple name
 * of {@code taskClass}.
 */
private static FakeClock.TaskFilter taskFilterForClass(final Class<?> taskClass) {
  return new FakeClock.TaskFilter() {
    @Override
    public boolean shouldAccept(Runnable command) {
      return command.toString().contains(taskClass.getSimpleName());
    }
  };
}
// Attributes with ATTR_LB_PROVIDED_BACKEND set to true. Tests use them to build the
// EquivalentAddressGroups they expect for backends that came from the balancer's server list.
private static final Attributes LB_BACKEND_ATTRS =
Attributes.newBuilder().set(GrpclbConstants.ATTR_LB_PROVIDED_BACKEND, true).build();
// Mock whose calls are forwarded to a FakeHelper instance, so interactions can be verified.
// NOTE(review): FakeHelper is declared outside the part of the file reviewed here.
private final Helper helper = mock(Helper.class, delegatesTo(new FakeHelper()));
// Mock forwarding to a CachedSubchannelPool built on the helper above; this initializer reads
// `helper`, so it must stay declared after it.
private final SubchannelPool subchannelPool =
mock(
SubchannelPool.class,
delegatesTo(new CachedSubchannelPool(helper)));
// Log lines captured by channelLogger, each stored as "<level>: <message>".
private final ArrayList<String> logs = new ArrayList<>();
private final ChannelLogger channelLogger = new ChannelLogger() {
@Override
public void log(ChannelLogLevel level, String msg) {
logs.add(level + ": " + msg);
}
@Override
public void log(ChannelLogLevel level, String template, Object... args) {
// Expand the template first so that `logs` only ever holds fully formatted messages.
log(level, MessageFormat.format(template, args));
}
};
// NOTE(review): never read or written in the visible tests; presumably maintained by FakeHelper.
private SubchannelPicker currentPicker;
// Mock around the fake LB service; created in setUp().
private LoadBalancerGrpc.LoadBalancerImplBase mockLbService;
// Captures the response observers handed to mockLbService.balanceLoad().
@Captor
private ArgumentCaptor<StreamObserver<LoadBalanceResponse>> lbResponseObserverCaptor;
// Fake time source; also supplies the time provider and stopwatch given to the balancer.
private final FakeClock fakeClock = new FakeClock();
// Request observers returned by the fake LB service, one added per balanceLoad() call.
private final ArrayDeque<StreamObserver<LoadBalanceRequest>> lbRequestObservers =
new ArrayDeque<>();
// Queues the tests poll for created subchannels / OOB channels.
// NOTE(review): filled outside the visible code, presumably by FakeHelper; confirm.
private final ArrayDeque<Subchannel> mockSubchannels = new ArrayDeque<>();
private final ArrayDeque<ManagedChannel> fakeOobChannels = new ArrayDeque<>();
// tearDown() verifies shutdown() on every subchannel here, and that every channel in
// oobChannelTracker is shut down and terminated.
private final ArrayList<Subchannel> unpooledSubchannelTracker = new ArrayList<>();
private final ArrayList<ManagedChannel> oobChannelTracker = new ArrayList<>();
// Any exception escaping a task run in this context is rethrown as an AssertionError.
private final SynchronizationContext syncContext = new SynchronizationContext(
new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
throw new AssertionError(e);
}
});
// Default StreamInfo passed whenever a test creates a ClientStreamTracer.
private static final ClientStreamTracer.StreamInfo STREAM_INFO =
ClientStreamTracer.StreamInfo.newBuilder().build();
// In-process server hosting mockLbService; started in setUp(), shut down in tearDown().
private io.grpc.Server fakeLbServer;
// Captures the pickers passed to helper.updateBalancingState().
@Captor
private ArgumentCaptor<SubchannelPicker> pickerCaptor;
// setUp() stubs the provider to hand out backoffPolicy1 first and backoffPolicy2 second.
@Mock
private BackoffPolicy.Provider backoffPolicyProvider;
@Mock
private BackoffPolicy backoffPolicy1;
@Mock
private BackoffPolicy backoffPolicy2;
// The load balancer under test; created in setUp().
private GrpclbLoadBalancer balancer;
private final ArgumentCaptor<CreateSubchannelArgs> createSubchannelArgsCaptor =
ArgumentCaptor.forClass(CreateSubchannelArgs.class);
/**
 * Initializes the annotated mocks, starts the in-process fake balancer server, stubs the backoff
 * policies and creates the balancer under test.
 */
@Before
public void setUp() throws Exception {
  MockitoAnnotations.initMocks(this);

  // Fake LB service: every balanceLoad() call hands back a fresh mock request observer, which
  // is recorded in lbRequestObservers. When the client half-closes that observer, the fake
  // completes the response stream in turn.
  LoadBalancerGrpc.LoadBalancerImplBase fakeLbService =
      new LoadBalancerGrpc.LoadBalancerImplBase() {
        @Override
        @SuppressWarnings("unchecked")
        public StreamObserver<LoadBalanceRequest> balanceLoad(
            final StreamObserver<LoadBalanceResponse> responseObserver) {
          StreamObserver<LoadBalanceRequest> mockRequestStream = mock(StreamObserver.class);
          doAnswer(
                  new Answer<Void>() {
                    @Override
                    public Void answer(InvocationOnMock invocation) {
                      responseObserver.onCompleted();
                      return null;
                    }
                  })
              .when(mockRequestStream)
              .onCompleted();
          lbRequestObservers.add(mockRequestStream);
          return mockRequestStream;
        }
      };
  // Wrap the fake in a mock so tests can verify balanceLoad() invocations.
  mockLbService =
      mock(LoadBalancerGrpc.LoadBalancerImplBase.class, delegatesTo(fakeLbService));
  fakeLbServer =
      InProcessServerBuilder.forName("fakeLb")
          .directExecutor()
          .addService(mockLbService)
          .build()
          .start();

  // Both policies yield 10ns for the first backoff and 100ns afterwards; the provider hands
  // out policy 1 first, then policy 2.
  when(backoffPolicy1.nextBackoffNanos()).thenReturn(10L, 100L);
  when(backoffPolicy2.nextBackoffNanos()).thenReturn(10L, 100L);
  when(backoffPolicyProvider.get()).thenReturn(backoffPolicy1, backoffPolicy2);

  balancer =
      new GrpclbLoadBalancer(
          helper,
          subchannelPool,
          fakeClock.getTimeProvider(),
          fakeClock.getStopwatchSupplier().get(),
          backoffPolicyProvider);
}
/**
 * Shuts the balancer down inside the synchronization context and then checks that nothing it
 * created is left behind: tracked OOB channels are terminated, tracked subchannels were shut
 * down and the fake clock holds no pending task. The fake LB server is always stopped.
 */
@After
public void tearDown() {
  try {
    if (balancer != null) {
      Runnable shutdownBalancer =
          new Runnable() {
            @Override
            public void run() {
              balancer.shutdown();
            }
          };
      syncContext.execute(shutdownBalancer);
    }
    for (ManagedChannel oob : oobChannelTracker) {
      assertTrue(oob + " is shutdown", oob.isShutdown());
      // balancer should have closed the LB stream, terminating the OOB channel.
      assertTrue(oob + " is terminated", oob.isTerminated());
    }
    for (Subchannel tracked : unpooledSubchannelTracker) {
      verify(tracked).shutdown();
    }
    // No timer should linger after shutdown
    assertThat(fakeClock.getPendingTasks()).isEmpty();
  } finally {
    // Stop the server even when one of the checks above fails.
    if (fakeLbServer != null) {
      fakeLbServer.shutdownNow();
    }
  }
}
/**
 * With an empty drop list the picker cycles through the backend entries in order, and every
 * pick leaves exactly the picked entry's token in the request headers.
 */
@Test
public void roundRobinPickerNoDrop() {
  GrpclbClientLoadRecorder recorder = new GrpclbClientLoadRecorder(fakeClock.getTimeProvider());
  Subchannel sharedSubchannel = mock(Subchannel.class);
  BackendEntry first = new BackendEntry(sharedSubchannel, recorder, "LBTOKEN0001");
  BackendEntry second = new BackendEntry(sharedSubchannel, recorder, "LBTOKEN0002");
  List<BackendEntry> rotation = Arrays.asList(first, second);
  RoundRobinPicker picker = new RoundRobinPicker(Collections.<DropEntry>emptyList(), rotation);

  // Three picks wrap around the two-entry list: first, second, then first again.
  BackendEntry[] expectedEntries = {first, second, first};
  String[] expectedTokens = {"LBTOKEN0001", "LBTOKEN0002", "LBTOKEN0001"};
  for (int i = 0; i < expectedEntries.length; i++) {
    PickSubchannelArgs pickArgs = mock(PickSubchannelArgs.class);
    Metadata headers = new Metadata();
    if (i == 0) {
      // The existing token on the headers will be replaced
      headers.put(GrpclbConstants.TOKEN_METADATA_KEY, "LBTOKEN__OLD");
    }
    when(pickArgs.getHeaders()).thenReturn(headers);

    assertSame(expectedEntries[i].result, picker.pickSubchannel(pickArgs));
    verify(pickArgs).getHeaders();
    assertThat(headers.getAll(GrpclbConstants.TOKEN_METADATA_KEY))
        .containsExactly(expectedTokens[i]);
  }

  verify(sharedSubchannel, never()).getAttributes();
}
/**
 * With a two-slot drop list whose second slot is a drop entry, every other pick is dropped
 * without reading the headers, while the remaining picks rotate through the backend entries.
 */
@Test
public void roundRobinPickerWithDrop() {
  assertTrue(DROP_PICK_RESULT.isDrop());
  GrpclbClientLoadRecorder recorder = new GrpclbClientLoadRecorder(fakeClock.getTimeProvider());
  Subchannel sharedSubchannel = mock(Subchannel.class);
  // 1 out of 2 requests are to be dropped
  DropEntry dropEntry = new DropEntry(recorder, "LBTOKEN0003");
  List<DropEntry> drops = Arrays.asList(null, dropEntry);
  BackendEntry first = new BackendEntry(sharedSubchannel, recorder, "LBTOKEN0001");
  BackendEntry second = new BackendEntry(sharedSubchannel, recorder, "LBTOKEN0002");
  List<BackendEntry> rotation = Arrays.asList(first, second);
  RoundRobinPicker picker = new RoundRobinPicker(drops, rotation);

  // Expected outcome of each successive pick; a null entry stands for a dropped pick.
  //   dropList[0] -> pickList[0], dropList[1] -> drop, dropList[0] -> pickList[1],
  //   dropList[1] -> drop, dropList[0] -> pickList[0]
  BackendEntry[] expectedEntries = {first, null, second, null, first};
  String[] expectedTokens = {"LBTOKEN0001", null, "LBTOKEN0002", null, "LBTOKEN0001"};
  for (int i = 0; i < expectedEntries.length; i++) {
    PickSubchannelArgs pickArgs = mock(PickSubchannelArgs.class);
    Metadata headers = new Metadata();
    if (i == 0) {
      headers.put(GrpclbConstants.TOKEN_METADATA_KEY, "LBTOKEN__OLD");
    }
    when(pickArgs.getHeaders()).thenReturn(headers);

    BackendEntry expected = expectedEntries[i];
    if (expected == null) {
      assertSame(DROP_PICK_RESULT, picker.pickSubchannel(pickArgs));
      verify(pickArgs, never()).getHeaders();
    } else {
      assertSame(expected.result, picker.pickSubchannel(pickArgs));
      verify(pickArgs).getHeaders();
      assertThat(headers.getAll(GrpclbConstants.TOKEN_METADATA_KEY))
          .containsExactly(expectedTokens[i]);
    }
  }

  verify(sharedSubchannel, never()).getAttributes();
}
/**
 * A picker holding a single idle-subchannel entry returns the "no result" pick every time and
 * requests a connection on the subchannel only once.
 */
@Test
public void roundRobinPickerWithIdleEntry_noDrop() {
  Subchannel idleSubchannel = mock(Subchannel.class);
  IdleSubchannelEntry idleEntry = new IdleSubchannelEntry(idleSubchannel, syncContext);
  RoundRobinPicker picker =
      new RoundRobinPicker(
          Collections.<DropEntry>emptyList(), Collections.singletonList(idleEntry));
  PickSubchannelArgs pickArgs = mock(PickSubchannelArgs.class);

  // Nothing is requested before the first pick.
  verify(idleSubchannel, never()).requestConnection();

  PickResult firstPick = picker.pickSubchannel(pickArgs);
  assertThat(firstPick).isSameInstanceAs(PickResult.withNoResult());
  verify(idleSubchannel, times(1)).requestConnection();

  PickResult secondPick = picker.pickSubchannel(pickArgs);
  assertThat(secondPick).isSameInstanceAs(PickResult.withNoResult());
  // Only the first pick triggers requestConnection()
  verify(idleSubchannel, times(1)).requestConnection();
}
/**
 * Combines an idle-subchannel entry with a drop list that drops every other pick: picks
 * alternate between "no result" and the drop result, and the connection is requested only once.
 */
@Test
public void roundRobinPickerWithIdleEntry_andDrop() {
  GrpclbClientLoadRecorder recorder = new GrpclbClientLoadRecorder(fakeClock.getTimeProvider());
  // 1 out of 2 requests are to be dropped
  DropEntry dropEntry = new DropEntry(recorder, "LBTOKEN0003");
  List<DropEntry> drops = Arrays.asList(null, dropEntry);
  Subchannel idleSubchannel = mock(Subchannel.class);
  IdleSubchannelEntry idleEntry = new IdleSubchannelEntry(idleSubchannel, syncContext);
  RoundRobinPicker picker = new RoundRobinPicker(drops, Collections.singletonList(idleEntry));
  PickSubchannelArgs pickArgs = mock(PickSubchannelArgs.class);

  // Nothing is requested before the first pick.
  verify(idleSubchannel, never()).requestConnection();

  PickResult firstPick = picker.pickSubchannel(pickArgs);
  assertThat(firstPick).isSameInstanceAs(PickResult.withNoResult());
  verify(idleSubchannel, times(1)).requestConnection();

  PickResult secondPick = picker.pickSubchannel(pickArgs);
  assertThat(secondPick).isSameInstanceAs(DROP_PICK_RESULT);
  verify(idleSubchannel, times(1)).requestConnection();

  PickResult thirdPick = picker.pickSubchannel(pickArgs);
  assertThat(thirdPick).isSameInstanceAs(PickResult.withNoResult());
  // Only the first pick triggers requestConnection()
  verify(idleSubchannel, times(1)).requestConnection();
}
/**
 * Exercises client load reporting end to end against the fake balancer.
 *
 * <p>After the initial response sets the report interval, the test makes a series of picks
 * (backends and drops), drives the stream tracers through different outcomes, and checks after
 * each interval that the report sent to the balancer contains exactly the expected counters.
 * It then fails the LB stream, lets a new stream start, and checks that picks made on the old
 * picker are not reported on the new stream while picks on the new picker are.
 */
@Test
public void loadReporting() {
  Metadata headers = new Metadata();
  PickSubchannelArgs args = mock(PickSubchannelArgs.class);
  when(args.getHeaders()).thenReturn(headers);

  long loadReportIntervalMillis = 1983;
  List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
  deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(), grpclbBalancerList);

  // Fallback timer is started as soon as address is resolved.
  assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));

  assertEquals(1, fakeOobChannels.size());
  verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
  StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
  assertEquals(1, lbRequestObservers.size());
  StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
  InOrder inOrder = inOrder(lbRequestObserver);
  InOrder helperInOrder = inOrder(helper, subchannelPool);

  inOrder.verify(lbRequestObserver).onNext(
      eq(LoadBalanceRequest.newBuilder().setInitialRequest(
              InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
          .build()));

  // Simulate receiving LB response
  assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
  lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis));

  // Load reporting task is scheduled
  assertEquals(1, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
  assertEquals(0, fakeClock.runDueTasks());

  List<ServerEntry> backends = Arrays.asList(
      new ServerEntry("127.0.0.1", 2000, "token0001"),
      new ServerEntry("token0001"),  // drop
      new ServerEntry("127.0.0.1", 2010, "token0002"),
      new ServerEntry("token0003"));  // drop

  lbResponseObserver.onNext(buildLbResponse(backends));

  assertEquals(2, mockSubchannels.size());
  Subchannel subchannel1 = mockSubchannels.poll();
  Subchannel subchannel2 = mockSubchannels.poll();
  deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING));
  deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(CONNECTING));
  deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY));
  deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY));

  helperInOrder.verify(helper, atLeast(1))
      .updateBalancingState(eq(READY), pickerCaptor.capture());
  RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue();
  // The drop list has one slot per server-list entry: null for a backend, a DropEntry for a
  // drop. Successive picks walk it cyclically.
  assertThat(picker.dropList).containsExactly(
      null,
      new DropEntry(getLoadRecorder(), "token0001"),
      null,
      new DropEntry(getLoadRecorder(), "token0003")).inOrder();
  assertThat(picker.pickList).containsExactly(
      new BackendEntry(subchannel1, getLoadRecorder(), "token0001"),
      new BackendEntry(subchannel2, getLoadRecorder(), "token0002")).inOrder();

  // Report, no data
  assertNextReport(
      inOrder, lbRequestObserver, loadReportIntervalMillis,
      ClientStats.newBuilder().build());

  PickResult pick1 = picker.pickSubchannel(args);
  assertSame(subchannel1, pick1.getSubchannel());
  assertSame(getLoadRecorder(), pick1.getStreamTracerFactory());

  // Merely the pick will not be recorded as upstart.
  assertNextReport(
      inOrder, lbRequestObserver, loadReportIntervalMillis,
      ClientStats.newBuilder().build());

  ClientStreamTracer tracer1 =
      pick1.getStreamTracerFactory().newClientStreamTracer(STREAM_INFO, new Metadata());

  PickResult pick2 = picker.pickSubchannel(args);
  assertNull(pick2.getSubchannel());
  assertSame(DROP_PICK_RESULT, pick2);

  // Report includes upstart of pick1 and the drop of pick2
  assertNextReport(
      inOrder, lbRequestObserver, loadReportIntervalMillis,
      ClientStats.newBuilder()
          .setNumCallsStarted(2)
          .setNumCallsFinished(1)  // pick2
          .addCallsFinishedWithDrop(
              ClientStatsPerToken.newBuilder()
                  .setLoadBalanceToken("token0001")
                  .setNumCalls(1)          // pick2
                  .build())
          .build());

  PickResult pick3 = picker.pickSubchannel(args);
  assertSame(subchannel2, pick3.getSubchannel());
  assertSame(getLoadRecorder(), pick3.getStreamTracerFactory());
  ClientStreamTracer tracer3 =
      pick3.getStreamTracerFactory().newClientStreamTracer(STREAM_INFO, new Metadata());

  // pick3 has sent out headers
  tracer3.outboundHeaders();

  // 3rd report includes pick3's upstart
  assertNextReport(
      inOrder, lbRequestObserver, loadReportIntervalMillis,
      ClientStats.newBuilder()
          .setNumCallsStarted(1)
          .build());

  PickResult pick4 = picker.pickSubchannel(args);
  assertNull(pick4.getSubchannel());
  assertSame(DROP_PICK_RESULT, pick4);

  // pick1 ended without sending anything
  tracer1.streamClosed(Status.CANCELLED);

  // 4th report includes end of pick1 and drop of pick4
  assertNextReport(
      inOrder, lbRequestObserver, loadReportIntervalMillis,
      ClientStats.newBuilder()
          .setNumCallsStarted(1)  // pick4
          .setNumCallsFinished(2)
          .setNumCallsFinishedWithClientFailedToSend(1)   // pick1
          .addCallsFinishedWithDrop(
              ClientStatsPerToken.newBuilder()
                  .setLoadBalanceToken("token0003")
                  .setNumCalls(1)   // pick4
                  .build())
          .build());

  // The drop list has wrapped around to its first (non-drop) slot and the pick list to its
  // first entry, so the fifth pick lands on subchannel1 again. Assert on pick5 itself.
  PickResult pick5 = picker.pickSubchannel(args);
  assertSame(subchannel1, pick5.getSubchannel());
  assertSame(getLoadRecorder(), pick5.getStreamTracerFactory());
  ClientStreamTracer tracer5 =
      pick5.getStreamTracerFactory().newClientStreamTracer(STREAM_INFO, new Metadata());

  // pick3 ended without receiving response headers
  tracer3.streamClosed(Status.DEADLINE_EXCEEDED);

  // pick5 sent and received headers
  tracer5.outboundHeaders();
  tracer5.inboundHeaders();

  // 5th report includes pick3's end and pick5's upstart
  assertNextReport(
      inOrder, lbRequestObserver, loadReportIntervalMillis,
      ClientStats.newBuilder()
          .setNumCallsStarted(1)  // pick5
          .setNumCallsFinished(1)  // pick3
          .build());

  // pick5 ends
  tracer5.streamClosed(Status.OK);

  // 6th report includes pick5's end
  assertNextReport(
      inOrder, lbRequestObserver, loadReportIntervalMillis,
      ClientStats.newBuilder()
          .setNumCallsFinished(1)
          .setNumCallsFinishedKnownReceived(1)
          .build());

  assertEquals(1, fakeClock.numPendingTasks());
  // Balancer closes the stream, scheduled reporting task cancelled
  lbResponseObserver.onError(Status.UNAVAILABLE.asException());
  assertEquals(0, fakeClock.numPendingTasks());

  // New stream created
  verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture());
  lbResponseObserver = lbResponseObserverCaptor.getValue();
  assertEquals(1, lbRequestObservers.size());
  lbRequestObserver = lbRequestObservers.poll();
  inOrder = inOrder(lbRequestObserver);

  inOrder.verify(lbRequestObserver).onNext(
      eq(LoadBalanceRequest.newBuilder().setInitialRequest(
              InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
          .build()));

  // Load reporting is also requested
  lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis));

  // No picker created because balancer is still using the results from the last stream
  helperInOrder.verify(helper, never())
      .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));

  // Make a new pick on that picker.  It will not show up on the report of the new stream, because
  // that picker is associated with the previous stream.
  PickResult pick6 = picker.pickSubchannel(args);
  assertNull(pick6.getSubchannel());
  assertSame(DROP_PICK_RESULT, pick6);
  assertNextReport(
      inOrder, lbRequestObserver, loadReportIntervalMillis,
      ClientStats.newBuilder().build());

  // New stream got the list update
  lbResponseObserver.onNext(buildLbResponse(backends));

  // Same backends, thus no new subchannels
  helperInOrder.verify(subchannelPool, never()).takeOrCreateSubchannel(
      any(EquivalentAddressGroup.class), any(Attributes.class));
  // But the new RoundRobinEntries have a new loadRecorder, thus considered different from
  // the previous list, thus a new picker is created
  helperInOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
  picker = (RoundRobinPicker) pickerCaptor.getValue();

  PickResult pick1p = picker.pickSubchannel(args);
  assertSame(subchannel1, pick1p.getSubchannel());
  assertSame(getLoadRecorder(), pick1p.getStreamTracerFactory());
  pick1p.getStreamTracerFactory().newClientStreamTracer(STREAM_INFO, new Metadata());

  // The pick from the new stream will be included in the report
  assertNextReport(
      inOrder, lbRequestObserver, loadReportIntervalMillis,
      ClientStats.newBuilder()
          .setNumCallsStarted(1)
          .build());

  // Reading the headers is the only interaction the pickers may have had with the pick args.
  verify(args, atLeast(0)).getHeaders();
  verifyNoMoreInteractions(args);
}
/**
 * A second (redundant) initial response on the same LB stream is logged and ignored: the
 * load-reporting task scheduled by the first initial response keeps its original delay.
 */
@Test
public void abundantInitialResponse() {
  List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
  deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(), grpclbBalancerList);
  assertEquals(1, fakeOobChannels.size());
  verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
  StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();

  // Simulate LB initial response
  assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
  lbResponseObserver.onNext(buildInitialResponse(1983));

  // Load reporting task is scheduled
  assertEquals(1, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
  FakeClock.ScheduledTask scheduledTask =
      Iterables.getOnlyElement(fakeClock.getPendingTasks(LOAD_REPORTING_TASK_FILTER));
  assertEquals(1983, scheduledTask.getDelay(TimeUnit.MILLISECONDS));

  // Drop the log lines produced so far so that only the redundant response is checked below.
  logs.clear();
  // Simulate a redundant LB initial response, with a different report interval
  lbResponseObserver.onNext(buildInitialResponse(9097));

  // This incident is logged
  assertThat(logs).containsExactly(
      "DEBUG: [grpclb-<api.google.com>] Got an LB response: " + buildInitialResponse(9097),
      "WARNING: [grpclb-<api.google.com>] "
          + "Ignoring unexpected response type: INITIAL_RESPONSE")
      .inOrder();

  // It doesn't affect load-reporting at all
  assertThat(fakeClock.getPendingTasks(LOAD_REPORTING_TASK_FILTER))
      .containsExactly(scheduledTask);
  assertEquals(1983, scheduledTask.getDelay(TimeUnit.MILLISECONDS));
}
// Scenario: the LB stream closes and a retry is scheduled, then the same balancer address is
// delivered again before the retry fires. The test expects the pending retry to be cancelled,
// a new LB RPC to start at once without a new OOB channel, and a late run of the cancelled
// retry task to cause no further interaction with the LB service or the backoff mocks.
@SuppressWarnings("unchecked")
@Test
public void raceBetweenHandleAddressesAndLbStreamClosure() {
InOrder inOrder = inOrder(mockLbService, backoffPolicyProvider, backoffPolicy1);
deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(),
createResolvedBalancerAddresses(1));
assertEquals(1, fakeOobChannels.size());
inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
assertEquals(1, lbRequestObservers.size());
StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
verify(lbRequestObserver).onNext(
eq(LoadBalanceRequest.newBuilder().setInitialRequest(
InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
.build()));
// Close lbStream
lbResponseObserver.onCompleted();
inOrder.verify(backoffPolicyProvider).get();
inOrder.verify(backoffPolicy1).nextBackoffNanos();
// Retry task scheduled
assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
FakeClock.ScheduledTask retryTask =
Iterables.getOnlyElement(fakeClock.getPendingTasks(LB_RPC_RETRY_TASK_FILTER));
// 10ns is the first value setUp() stubs for backoffPolicy1.nextBackoffNanos().
assertEquals(10L, retryTask.getDelay(TimeUnit.NANOSECONDS));
// Receive the same Lb address again
deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(),
createResolvedBalancerAddresses(1));
// Retry task cancelled
assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
// Reuse the existing OOB channel
assertEquals(1, fakeOobChannels.size());
// Start a new LoadBalance RPC
inOrder.verify(mockLbService).balanceLoad(any(StreamObserver.class));
assertEquals(1, lbRequestObservers.size());
lbRequestObserver = lbRequestObservers.poll();
verify(lbRequestObserver).onNext(
eq(LoadBalanceRequest.newBuilder().setInitialRequest(
InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
.build()));
// Simulate a race condition where the task has just started when it's cancelled
retryTask.command.run();
// The late run must not have started another RPC or consulted the backoff policy again.
inOrder.verifyNoMoreInteractions();
}
/**
 * Closing the LB stream cancels the pending load-reporting task. If that task nevertheless
 * runs (as in a race where it had just started), it sends no report and schedules no new task.
 */
@Test
public void raceBetweenLoadReportingAndLbStreamClosure() {
  List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
  deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(), grpclbBalancerList);

  assertEquals(1, fakeOobChannels.size());
  verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
  StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
  assertEquals(1, lbRequestObservers.size());
  StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
  InOrder inOrder = inOrder(lbRequestObserver);

  inOrder.verify(lbRequestObserver).onNext(
      eq(LoadBalanceRequest.newBuilder().setInitialRequest(
              InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
          .build()));

  // Simulate receiving LB response
  assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
  lbResponseObserver.onNext(buildInitialResponse(1983));

  // Load reporting task is scheduled
  assertEquals(1, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
  FakeClock.ScheduledTask scheduledTask =
      Iterables.getOnlyElement(fakeClock.getPendingTasks(LOAD_REPORTING_TASK_FILTER));
  assertEquals(1983, scheduledTask.getDelay(TimeUnit.MILLISECONDS));

  // Close lbStream
  lbResponseObserver.onCompleted();

  // Reporting task cancelled
  assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));

  // Simulate a race condition where the task has just started when it's cancelled
  scheduledTask.command.run();

  // No report sent. No new task scheduled
  inOrder.verify(lbRequestObserver, never()).onNext(any(LoadBalanceRequest.class));
  assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
}
/**
 * Advances the fake clock by one load-report interval and verifies the report sent at the end
 * of it.
 *
 * <p>One millisecond before the interval elapses no task may run and {@code inOrder} must see
 * no further interaction. The final millisecond must run exactly one task, leave exactly one
 * task pending, and send {@code expectedReport} stamped with the fake clock's current time.
 *
 * @param inOrder ordered verifier tracking {@code lbRequestObserver}
 * @param lbRequestObserver request observer of the LB stream expected to receive the report
 * @param loadReportIntervalMillis report interval, in milliseconds
 * @param expectedReport expected client stats, without the timestamp
 */
private void assertNextReport(
    InOrder inOrder, StreamObserver<LoadBalanceRequest> lbRequestObserver,
    long loadReportIntervalMillis, ClientStats expectedReport) {
  // Just short of the interval: nothing is due yet.
  assertEquals(0, fakeClock.forwardTime(loadReportIntervalMillis - 1, TimeUnit.MILLISECONDS));
  inOrder.verifyNoMoreInteractions();

  // The last millisecond fires the reporting task.
  assertEquals(1, fakeClock.forwardTime(1, TimeUnit.MILLISECONDS));
  assertEquals(1, fakeClock.numPendingTasks());

  // Read the clock only after time has been advanced, so the stamp matches the report's.
  ClientStats stampedStats =
      expectedReport.toBuilder()
          .setTimestamp(Timestamps.fromNanos(fakeClock.getTicker().read()))
          .build();
  LoadBalanceRequest expectedRequest =
      LoadBalanceRequest.newBuilder().setClientStats(stampedStats).build();
  inOrder.verify(lbRequestObserver).onNext(eq(expectedRequest));
}
/**
 * Resolving to neither backend nor balancer addresses moves the channel to TRANSIENT_FAILURE
 * with a picker whose only entry fails picks with UNAVAILABLE.
 */
@Test
public void receiveNoBackendAndBalancerAddress() {
  List<EquivalentAddressGroup> noBackends = Collections.emptyList();
  List<EquivalentAddressGroup> noBalancers = Collections.emptyList();
  deliverResolvedAddresses(noBackends, noBalancers);

  verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
  RoundRobinPicker failurePicker = (RoundRobinPicker) pickerCaptor.getValue();
  assertThat(failurePicker.dropList).isEmpty();

  Status pickStatus =
      Iterables.getOnlyElement(failurePicker.pickList).picked(new Metadata()).getStatus();
  assertThat(pickStatus.getCode()).isEqualTo(Code.UNAVAILABLE);
  assertThat(pickStatus.getDescription()).isEqualTo("No backend or balancer addresses found");
}
/**
 * A name resolution error before any address is known yields a TRANSIENT_FAILURE picker that
 * carries the error; a later successful resolution with a balancer address then opens the OOB
 * channel and starts the LB RPC.
 */
@Test
public void nameResolutionFailsThenRecover() {
  Status resolutionError = Status.NOT_FOUND.withDescription("www.google.com not found");
  deliverNameResolutionError(resolutionError);
  verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());

  String createdLog = "INFO: [grpclb-<api.google.com>] Created";
  String errorLog = "DEBUG: [grpclb-<api.google.com>] Error: " + resolutionError;
  String stateLog =
      "INFO: [grpclb-<api.google.com>] Update balancing state to TRANSIENT_FAILURE: picks="
          + "[Status{code=NOT_FOUND, description=www.google.com not found, cause=null}],"
          + " drops=[]";
  assertThat(logs).containsExactly(createdLog, errorLog, stateLog).inOrder();
  logs.clear();

  RoundRobinPicker errorPicker = (RoundRobinPicker) pickerCaptor.getValue();
  assertThat(errorPicker.dropList).isEmpty();
  assertThat(errorPicker.pickList).containsExactly(new ErrorEntry(resolutionError));

  // Recover with a subsequent success
  List<EquivalentAddressGroup> balancerAddrs = createResolvedBalancerAddresses(1);
  deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(), balancerAddrs);

  verify(helper)
      .createOobChannel(
          eq(xattr(balancerAddrs)), eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX));
  verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
}
// Scenario: the balancer connection is set up first, then name resolution fails before any
// server list has arrived. The test expects a TRANSIENT_FAILURE picker carrying the error
// while the OOB channel stays open, and expects a server list received afterwards to still
// request one subchannel per listed backend from the pool.
@Test
public void grpclbThenNameResolutionFails() {
InOrder inOrder = inOrder(helper, subchannelPool);
// Go to GRPCLB first
List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(), grpclbBalancerList);
verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)),
eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX));
assertEquals(1, fakeOobChannels.size());
ManagedChannel oobChannel = fakeOobChannels.poll();
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
// Let name resolution fail before round-robin list is ready
Status error = Status.NOT_FOUND.withDescription("www.google.com not found");
deliverNameResolutionError(error);
inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker.dropList).isEmpty();
assertThat(picker.pickList).containsExactly(new ErrorEntry(error));
// The resolution error must not tear down the connection to the balancer.
assertFalse(oobChannel.isShutdown());
// Simulate receiving LB response
List<ServerEntry> backends = Arrays.asList(
new ServerEntry("127.0.0.1", 2000, "TOKEN1"),
new ServerEntry("127.0.0.1", 2010, "TOKEN2"));
lbResponseObserver.onNext(buildInitialResponse());
lbResponseObserver.onNext(buildLbResponse(backends));
// Subchannels are requested in server-list order, each address tagged with LB_BACKEND_ATTRS.
inOrder.verify(subchannelPool).takeOrCreateSubchannel(
eq(new EquivalentAddressGroup(backends.get(0).addr, LB_BACKEND_ATTRS)),
any(Attributes.class));
inOrder.verify(subchannelPool).takeOrCreateSubchannel(
eq(new EquivalentAddressGroup(backends.get(1).addr, LB_BACKEND_ATTRS)),
any(Attributes.class));
}
/**
 * A second resolution result with a different set of balancer addresses updates the addresses
 * of the existing OOB channel and does not start another LB RPC.
 */
@Test
public void grpclbUpdatedAddresses_avoidsReconnect() {
  // First resolution result: one backend, one balancer
  List<EquivalentAddressGroup> initialBackends = createResolvedBackendAddresses(1);
  List<EquivalentAddressGroup> initialBalancers = createResolvedBalancerAddresses(1);
  deliverResolvedAddresses(initialBackends, initialBalancers);
  verify(helper).createOobChannel(
      eq(xattr(initialBalancers)), eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX));
  ManagedChannel lbChannel = fakeOobChannels.poll();
  assertEquals(1, lbRequestObservers.size());

  // Second resolution result: two balancers
  List<EquivalentAddressGroup> updatedBackends = createResolvedBackendAddresses(1);
  List<EquivalentAddressGroup> updatedBalancers = createResolvedBalancerAddresses(2);
  deliverResolvedAddresses(updatedBackends, updatedBalancers);
  verify(helper).updateOobChannelAddresses(eq(lbChannel), eq(xattr(updatedBalancers)));
  // Still only the original RPC
  assertEquals(1, lbRequestObservers.size());
}
/**
 * Delivers a second resolution result whose single balancer address carries a different
 * authority than the first one.
 *
 * <p>NOTE(review): despite the "reconnect" in the method name, the assertions below check that
 * the existing OOB channel has its addresses updated in place and that no additional LB RPC is
 * started. Confirm whether the name or the assertions reflect the intended behavior.
 */
@Test
public void grpclbUpdatedAddresses_reconnectOnAuthorityChange() {
  // First resolution result: one backend, one balancer
  List<EquivalentAddressGroup> initialBackends = createResolvedBackendAddresses(1);
  List<EquivalentAddressGroup> initialBalancers = createResolvedBalancerAddresses(1);
  deliverResolvedAddresses(initialBackends, initialBalancers);
  verify(helper).createOobChannel(
      eq(xattr(initialBalancers)), eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX));
  ManagedChannel lbChannel = fakeOobChannels.poll();
  assertEquals(1, lbRequestObservers.size());

  // Second resolution result: a balancer with a new address and a new authority
  final String newAuthority = "some-new-authority";
  List<EquivalentAddressGroup> updatedBackends = createResolvedBackendAddresses(1);
  EquivalentAddressGroup relocatedBalancer =
      new EquivalentAddressGroup(
          new FakeSocketAddress("somethingNew"), lbAttributes(newAuthority));
  List<EquivalentAddressGroup> updatedBalancers = Collections.singletonList(relocatedBalancer);
  deliverResolvedAddresses(updatedBackends, updatedBalancers);
  verify(helper).updateOobChannelAddresses(eq(lbChannel), eq(xattr(updatedBalancers)));
  // Still only the original RPC
  assertEquals(1, lbRequestObservers.size());
}
/**
 * Walks the balancer through a complete round-robin session driven by a single LB stream: the
 * initial request, a first server list, subchannel connectivity changes, a second server list
 * that contains drop entries, an empty server list, and finally shutdown. Log output and the
 * contents of each published picker are checked along the way.
 */
@Test
public void grpclbWorking() {
InOrder inOrder = inOrder(helper, subchannelPool);
List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(), grpclbBalancerList);
// Fallback timer is started as soon as the addresses are resolved.
assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)),
eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX));
assertEquals(1, fakeOobChannels.size());
ManagedChannel oobChannel = fakeOobChannels.poll();
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
assertEquals(1, lbRequestObservers.size());
StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
// The first message on the LB stream is the initial request naming the service authority.
verify(lbRequestObserver).onNext(
eq(LoadBalanceRequest.newBuilder().setInitialRequest(
InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
.build()));
// Simulate receiving LB response
List<ServerEntry> backends1 = Arrays.asList(
new ServerEntry("127.0.0.1", 2000, "token0001"),
new ServerEntry("127.0.0.1", 2010, "token0002"));
// No balancing state has been published before any LB response arrives.
inOrder.verify(helper, never())
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
logs.clear();
lbResponseObserver.onNext(buildInitialResponse());
assertThat(logs).containsExactly(
"DEBUG: [grpclb-<api.google.com>] Got an LB response: " + buildInitialResponse());
logs.clear();
lbResponseObserver.onNext(buildLbResponse(backends1));
inOrder.verify(subchannelPool).takeOrCreateSubchannel(
eq(new EquivalentAddressGroup(backends1.get(0).addr, LB_BACKEND_ATTRS)),
any(Attributes.class));
inOrder.verify(subchannelPool).takeOrCreateSubchannel(
eq(new EquivalentAddressGroup(backends1.get(1).addr, LB_BACKEND_ATTRS)),
any(Attributes.class));
// One subchannel per backend was obtained, and a connection was requested on each.
assertEquals(2, mockSubchannels.size());
Subchannel subchannel1 = mockSubchannels.poll();
Subchannel subchannel2 = mockSubchannels.poll();
verify(subchannel1).requestConnection();
verify(subchannel2).requestConnection();
assertEquals(
new EquivalentAddressGroup(backends1.get(0).addr, LB_BACKEND_ATTRS),
subchannel1.getAddresses());
assertEquals(
new EquivalentAddressGroup(backends1.get(1).addr, LB_BACKEND_ATTRS),
subchannel2.getAddresses());
deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING));
deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(CONNECTING));
// While no subchannel is READY the published picker only holds the buffer entry.
inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
RoundRobinPicker picker0 = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker0.dropList).containsExactly(null, null);
assertThat(picker0.pickList).containsExactly(BUFFER_ENTRY);
inOrder.verifyNoMoreInteractions();
assertThat(logs).containsExactly(
"DEBUG: [grpclb-<api.google.com>] Got an LB response: " + buildLbResponse(backends1),
"INFO: [grpclb-<api.google.com>] Using RR list="
+ "[[[/127.0.0.1:2000]/{io.grpc.grpclb.lbProvidedBackend=true}](token0001),"
+ " [[/127.0.0.1:2010]/{io.grpc.grpclb.lbProvidedBackend=true}](token0002)],"
+ " drop=[null, null]",
"INFO: [grpclb-<api.google.com>]"
+ " Update balancing state to CONNECTING: picks=[BUFFER_ENTRY], drops=[null, null]")
.inOrder();
logs.clear();
// Let subchannels be connected, subchannel2 first
deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY));
inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
assertThat(logs).containsExactly(
"INFO: [grpclb-<api.google.com>] Update balancing state to READY: picks="
+ "[[[[[/127.0.0.1:2010]/{io.grpc.grpclb.lbProvidedBackend=true}]](token0002)]],"
+ " drops=[null, null]");
logs.clear();
RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker1.dropList).containsExactly(null, null);
assertThat(picker1.pickList).containsExactly(
new BackendEntry(subchannel2, getLoadRecorder(), "token0002"));
// Once subchannel1 is READY too, the pick list follows the order of the server list.
deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY));
inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
assertThat(logs).containsExactly(
"INFO: [grpclb-<api.google.com>] Update balancing state to READY: picks="
+ "[[[[[/127.0.0.1:2000]/{io.grpc.grpclb.lbProvidedBackend=true}]](token0001)],"
+ " [[[[/127.0.0.1:2010]/{io.grpc.grpclb.lbProvidedBackend=true}]](token0002)]],"
+ " drops=[null, null]");
logs.clear();
RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker2.dropList).containsExactly(null, null);
assertThat(picker2.pickList).containsExactly(
new BackendEntry(subchannel1, getLoadRecorder(), "token0001"),
new BackendEntry(subchannel2, getLoadRecorder(), "token0002"))
.inOrder();
// Disconnected subchannels: subchannel1 going IDLE triggers another connection request and
// removes it from the pick list
verify(subchannel1).requestConnection();
deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(IDLE));
verify(subchannel1, times(2)).requestConnection();
inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
assertThat(logs).containsExactly(
"INFO: [grpclb-<api.google.com>] Update balancing state to READY: picks="
+ "[[[[[/127.0.0.1:2010]/{io.grpc.grpclb.lbProvidedBackend=true}]](token0002)]],"
+ " drops=[null, null]");
logs.clear();
RoundRobinPicker picker3 = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker3.dropList).containsExactly(null, null);
assertThat(picker3.pickList).containsExactly(
new BackendEntry(subchannel2, getLoadRecorder(), "token0002"));
// subchannel1 moving on to CONNECTING causes no further helper or pool interaction.
deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING));
inOrder.verifyNoMoreInteractions();
// As long as there is at least one READY subchannel, round robin will work.
ConnectivityStateInfo errorState1 =
ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE.withDescription("error1"));
deliverSubchannelState(subchannel1, errorState1);
inOrder.verifyNoMoreInteractions();
// If no subchannel is READY, some with error and the others are IDLE, will report CONNECTING
verify(subchannel2).requestConnection();
deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(IDLE));
verify(subchannel2, times(2)).requestConnection();
inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
assertThat(logs).containsExactly(
"INFO: [grpclb-<api.google.com>] "
+ "Update balancing state to CONNECTING: picks=[BUFFER_ENTRY], drops=[null, null]");
logs.clear();
RoundRobinPicker picker4 = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker4.dropList).containsExactly(null, null);
assertThat(picker4.pickList).containsExactly(BUFFER_ENTRY);
// Update backends, with a drop entry
List<ServerEntry> backends2 =
Arrays.asList(
new ServerEntry("127.0.0.1", 2030, "token0003"), // New address
new ServerEntry("token0003"), // drop
new ServerEntry("127.0.0.1", 2010, "token0004"), // Existing address with token changed
new ServerEntry("127.0.0.1", 2030, "token0005"), // New address appearing second time
new ServerEntry("token0006")); // drop
// Until the new list is delivered, subchannel1 has not been returned to the pool.
verify(subchannelPool, never())
.returnSubchannel(same(subchannel1), any(ConnectivityStateInfo.class));
lbResponseObserver.onNext(buildLbResponse(backends2));
assertThat(logs).containsExactly(
"DEBUG: [grpclb-<api.google.com>] Got an LB response: " + buildLbResponse(backends2),
"INFO: [grpclb-<api.google.com>] Using RR list="
+ "[[[/127.0.0.1:2030]/{io.grpc.grpclb.lbProvidedBackend=true}](token0003),"
+ " [[/127.0.0.1:2010]/{io.grpc.grpclb.lbProvidedBackend=true}](token0004),"
+ " [[/127.0.0.1:2030]/{io.grpc.grpclb.lbProvidedBackend=true}](token0005)],"
+ " drop=[null, drop(token0003), null, null, drop(token0006)]",
"INFO: [grpclb-<api.google.com>]"
+ " Update balancing state to CONNECTING: picks=[BUFFER_ENTRY],"
+ " drops=[null, drop(token0003), null, null, drop(token0006)]")
.inOrder();
logs.clear();
// subchannel1's address is not in backends2, so it is returned to the pool together with
// its last delivered state
verify(subchannelPool).returnSubchannel(same(subchannel1), same(errorState1));
// backends2[2], will be kept
verify(subchannelPool, never())
.returnSubchannel(same(subchannel2), any(ConnectivityStateInfo.class));
inOrder.verify(subchannelPool, never()).takeOrCreateSubchannel(
eq(new EquivalentAddressGroup(backends2.get(2).addr, LB_BACKEND_ATTRS)),
any(Attributes.class));
// Only the genuinely new address (backends2[0]) is requested from the pool.
inOrder.verify(subchannelPool).takeOrCreateSubchannel(
eq(new EquivalentAddressGroup(backends2.get(0).addr, LB_BACKEND_ATTRS)),
any(Attributes.class));
// Deliver another state change for subchannel1 after it has been returned to the pool.
ConnectivityStateInfo errorOnCachedSubchannel1 =
ConnectivityStateInfo.forTransientFailure(
Status.UNAVAILABLE.withDescription("You can get this error even if you are cached"));
deliverSubchannelState(subchannel1, errorOnCachedSubchannel1);
assertEquals(1, mockSubchannels.size());
Subchannel subchannel3 = mockSubchannels.poll();
verify(subchannel3).requestConnection();
assertEquals(
new EquivalentAddressGroup(backends2.get(0).addr, LB_BACKEND_ATTRS),
subchannel3.getAddresses());
inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
RoundRobinPicker picker7 = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker7.dropList).containsExactly(
null,
new DropEntry(getLoadRecorder(), "token0003"),
null,
null,
new DropEntry(getLoadRecorder(), "token0006")).inOrder();
assertThat(picker7.pickList).containsExactly(BUFFER_ENTRY);
// State updates on obsolete subchannel1 will only be passed to the pool
deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY));
deliverSubchannelState(
subchannel1, ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE));
deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(SHUTDOWN));
deliverSubchannelState(subchannel3, ConnectivityStateInfo.forNonError(READY));
inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
RoundRobinPicker picker8 = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker8.dropList).containsExactly(
null,
new DropEntry(getLoadRecorder(), "token0003"),
null,
null,
new DropEntry(getLoadRecorder(), "token0006")).inOrder();
// subchannel2 is still IDLE, thus not in the active list
assertThat(picker8.pickList).containsExactly(
new BackendEntry(subchannel3, getLoadRecorder(), "token0003"),
new BackendEntry(subchannel3, getLoadRecorder(), "token0005")).inOrder();
// subchannel2 becomes READY and makes it into the list
deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY));
inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
RoundRobinPicker picker9 = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker9.dropList).containsExactly(
null,
new DropEntry(getLoadRecorder(), "token0003"),
null,
null,
new DropEntry(getLoadRecorder(), "token0006")).inOrder();
assertThat(picker9.pickList).containsExactly(
new BackendEntry(subchannel3, getLoadRecorder(), "token0003"),
new BackendEntry(subchannel2, getLoadRecorder(), "token0004"),
new BackendEntry(subchannel3, getLoadRecorder(), "token0005")).inOrder();
// subchannel3 is still in use, hence not returned to the pool so far.
verify(subchannelPool, never())
.returnSubchannel(same(subchannel3), any(ConnectivityStateInfo.class));
// Update backends, with no entry
lbResponseObserver.onNext(buildLbResponse(Collections.<ServerEntry>emptyList()));
verify(subchannelPool)
.returnSubchannel(same(subchannel2), eq(ConnectivityStateInfo.forNonError(READY)));
verify(subchannelPool)
.returnSubchannel(same(subchannel3), eq(ConnectivityStateInfo.forNonError(READY)));
inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
RoundRobinPicker picker10 = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker10.dropList).isEmpty();
assertThat(picker10.pickList).containsExactly(BUFFER_ENTRY);
// The OOB channel stays open and the LB request stream has been neither completed nor failed.
assertFalse(oobChannel.isShutdown());
assertEquals(0, lbRequestObservers.size());
verify(lbRequestObserver, never()).onCompleted();
verify(lbRequestObserver, never()).onError(any(Throwable.class));
// Load reporting was not requested, thus never scheduled
assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
// The subchannel pool is cleared only when the balancer is shut down.
verify(subchannelPool, never()).clear();
balancer.shutdown();
verify(subchannelPool).clear();
}
/**
 * After a subchannel has reported TRANSIENT_FAILURE, its later IDLE and CONNECTING updates do
 * not produce a new balancing state; only READY does.
 */
@Test
public void roundRobinMode_subchannelStayTransientFailureUntilReady() {
  InOrder ordered = inOrder(helper);
  List<EquivalentAddressGroup> balancerAddrs = createResolvedBalancerAddresses(1);
  deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(), balancerAddrs);
  verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
  StreamObserver<LoadBalanceResponse> responseObserver = lbResponseObserverCaptor.getValue();

  // The balancer hands out two backends
  List<ServerEntry> serverList = Arrays.asList(
      new ServerEntry("127.0.0.1", 2000, "token0001"),
      new ServerEntry("127.0.0.1", 2010, "token0002"));
  responseObserver.onNext(buildInitialResponse());
  responseObserver.onNext(buildLbResponse(serverList));
  assertEquals(2, mockSubchannels.size());
  Subchannel first = mockSubchannels.poll();
  Subchannel second = mockSubchannels.poll();

  deliverSubchannelState(first, ConnectivityStateInfo.forNonError(CONNECTING));
  deliverSubchannelState(second, ConnectivityStateInfo.forNonError(CONNECTING));
  ordered.verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class));

  // A failure on the first subchannel turns the overall state into TRANSIENT_FAILURE.
  Status failure = Status.UNAVAILABLE.withDescription("error1");
  deliverSubchannelState(first, ConnectivityStateInfo.forTransientFailure(failure));
  ordered.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
  RoundRobinPicker failurePicker = (RoundRobinPicker) pickerCaptor.getValue();
  assertThat(failurePicker.pickList).containsExactly(new ErrorEntry(failure));

  // IDLE and CONNECTING that follow TRANSIENT_FAILURE leave the overall state untouched.
  deliverSubchannelState(first, ConnectivityStateInfo.forNonError(IDLE));
  deliverSubchannelState(first, ConnectivityStateInfo.forNonError(CONNECTING));
  ordered.verifyNoMoreInteractions();

  // READY does change the overall state.
  deliverSubchannelState(first, ConnectivityStateInfo.forNonError(READY));
  ordered.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
  RoundRobinPicker readyPicker = (RoundRobinPicker) pickerCaptor.getValue();
  assertThat(readyPicker.pickList)
      .containsExactly(new BackendEntry(first, getLoadRecorder(), "token0001"));
  ordered.verifyNoMoreInteractions();
}
/** Runs the initial-timeout scenario in which the fallback timer is not made to expire. */
@Test
public void grpclbFallback_initialTimeout_serverListReceivedBeforeTimerExpires() {
  boolean timerExpires = false;
  subtestGrpclbFallbackInitialTimeout(timerExpires);
}
/** Runs the initial-timeout scenario in which the fallback timer is made to expire. */
@Test
public void grpclbFallback_initialTimeout_timerExpires() {
  boolean timerExpires = true;
  subtestGrpclbFallbackInitialTimeout(timerExpires);
}
/**
 * Fallback or not within the period of the initial timeout. The resolver supplies both backend
 * and balancer addresses and the balancer sends only an initial response. The fallback timer
 * then either fires or is left pending while further resolution results arrive, and finally the
 * balancer delivers a server list.
 *
 * @param timerExpires whether the fake clock is advanced to the fallback timeout, in which case
 *     the resolver-provided backends are expected to be used until the balancer's server list
 *     arrives
 */
private void subtestGrpclbFallbackInitialTimeout(boolean timerExpires) {
long loadReportIntervalMillis = 1983;
InOrder inOrder = inOrder(helper, subchannelPool);
// Create balancer and backend addresses
List<EquivalentAddressGroup> backendList = createResolvedBackendAddresses(2);
List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
deliverResolvedAddresses(backendList, grpclbBalancerList);
inOrder.verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)),
eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX));
// Attempted to connect to balancer
assertEquals(1, fakeOobChannels.size());
ManagedChannel oobChannel = fakeOobChannels.poll();
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
assertEquals(1, lbRequestObservers.size());
StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
verify(lbRequestObserver).onNext(
eq(LoadBalanceRequest.newBuilder().setInitialRequest(
InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
.build()));
lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis));
// We don't care if these methods have been run.
inOrder.verify(helper, atLeast(0)).getSynchronizationContext();
inOrder.verify(helper, atLeast(0)).getScheduledExecutorService();
inOrder.verifyNoMoreInteractions();
// One millisecond short of the timeout the fallback timer is still pending.
assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
fakeClock.forwardTime(GrpclbState.FALLBACK_TIMEOUT_MS - 1, TimeUnit.MILLISECONDS);
assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
//////////////////////////////////
// Fallback timer expires (or not)
//////////////////////////////////
if (timerExpires) {
logs.clear();
fakeClock.forwardTime(1, TimeUnit.MILLISECONDS);
assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
List<EquivalentAddressGroup> fallbackList =
Arrays.asList(backendList.get(0), backendList.get(1));
assertThat(logs).containsExactly(
"INFO: [grpclb-<api.google.com>] Using fallback backends",
"INFO: [grpclb-<api.google.com>]"
+ " Using RR list=[[[FakeSocketAddress-fake-address-0]/{}], "
+ "[[FakeSocketAddress-fake-address-1]/{}]], drop=[null, null]",
"INFO: [grpclb-<api.google.com>] "
+ "Update balancing state to CONNECTING: picks=[BUFFER_ENTRY], "
+ "drops=[null, null]")
.inOrder();
// Fall back to the backends from resolver
fallbackTestVerifyUseOfFallbackBackendLists(inOrder, fallbackList);
// Entering fallback neither shuts down the OOB channel nor completes the LB request stream.
assertFalse(oobChannel.isShutdown());
verify(lbRequestObserver, never()).onCompleted();
}
//////////////////////////////////////////////////////////////////////
// Name resolver sends new resolution results without any backend addr
//////////////////////////////////////////////////////////////////////
grpclbBalancerList = createResolvedBalancerAddresses(2);
deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(),grpclbBalancerList);
// New addresses are updated to the OobChannel
inOrder.verify(helper).updateOobChannelAddresses(
same(oobChannel), eq(xattr(grpclbBalancerList)));
if (timerExpires) {
// Still in fallback logic, except that the backend list is empty
fallbackTestVerifyUseOfFallbackBackendLists(
inOrder, Collections.<EquivalentAddressGroup>emptyList());
}
////////////////////////////////////////////////////////////////
// Name resolver sends new resolution results with backend addrs
////////////////////////////////////////////////////////////////
// Prevents cached subchannels from being used
subchannelPool.clear();
backendList = createResolvedBackendAddresses(2);
grpclbBalancerList = createResolvedBalancerAddresses(1);
deliverResolvedAddresses(backendList, grpclbBalancerList);
// New LB address is updated to the OobChannel
inOrder.verify(helper).updateOobChannelAddresses(
same(oobChannel), eq(xattr(grpclbBalancerList)));
if (timerExpires) {
// New backend addresses are used for fallback
fallbackTestVerifyUseOfFallbackBackendLists(
inOrder, Arrays.asList(backendList.get(0), backendList.get(1)));
}
////////////////////////////////////////////////
// Break the LB stream after the timer expires
////////////////////////////////////////////////
if (timerExpires) {
Status streamError = Status.UNAVAILABLE.withDescription("OOB stream broken");
lbResponseObserver.onError(streamError.asException());
// The error will NOT propagate to picker because fallback list is in use.
inOrder.verify(helper, never())
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
// A new stream is created
verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture());
lbResponseObserver = lbResponseObserverCaptor.getValue();
assertEquals(1, lbRequestObservers.size());
lbRequestObserver = lbRequestObservers.poll();
// The new stream starts with the same initial request as the first one.
verify(lbRequestObserver).onNext(
eq(LoadBalanceRequest.newBuilder().setInitialRequest(
InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
.build()));
}
/////////////////////////////////
// Balancer returns a server list
/////////////////////////////////
List<ServerEntry> serverList = Arrays.asList(
new ServerEntry("127.0.0.1", 2000, "token0001"),
new ServerEntry("127.0.0.1", 2010, "token0002"));
lbResponseObserver.onNext(buildInitialResponse());
lbResponseObserver.onNext(buildLbResponse(serverList));
// Balancer-provided server list now in effect
fallbackTestVerifyUseOfBalancerBackendLists(inOrder, serverList);
///////////////////////////////////////////////////////////////
// New backend addresses from resolver outside of fallback mode
///////////////////////////////////////////////////////////////
backendList = createResolvedBackendAddresses(1);
grpclbBalancerList = createResolvedBalancerAddresses(1);
deliverResolvedAddresses(backendList, grpclbBalancerList);
// Will not affect the round robin list at all
inOrder.verify(helper, never())
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
// No fallback timeout timer scheduled.
assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
}
/**
 * When the LB stream fails while the fallback timer is still pending, the resolver-provided
 * backends are put to use and a new LB stream is opened with a fresh initial request.
 */
@Test
public void grpclbFallback_breakLbStreamBeforeFallbackTimerExpires() {
  final long reportIntervalMillis = 1983;
  // Every LB stream is expected to start with this request
  final LoadBalanceRequest expectedInitialRequest =
      LoadBalanceRequest.newBuilder()
          .setInitialRequest(
              InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
          .build();
  InOrder ordered = inOrder(helper, subchannelPool);

  // Resolver supplies both backend and balancer addresses
  List<EquivalentAddressGroup> resolvedBackends = createResolvedBackendAddresses(2);
  List<EquivalentAddressGroup> balancerAddrs = createResolvedBalancerAddresses(1);
  deliverResolvedAddresses(resolvedBackends, balancerAddrs);
  ordered.verify(helper).createOobChannel(
      eq(xattr(balancerAddrs)), eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX));

  // A connection to the balancer has been attempted
  assertThat(fakeOobChannels).hasSize(1);
  verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
  StreamObserver<LoadBalanceResponse> responseObserver = lbResponseObserverCaptor.getValue();
  assertThat(lbRequestObservers).hasSize(1);
  StreamObserver<LoadBalanceRequest> requestObserver = lbRequestObservers.poll();
  verify(requestObserver).onNext(eq(expectedInitialRequest));
  responseObserver.onNext(buildInitialResponse(reportIntervalMillis));

  // Whether these helper getters were called is irrelevant here.
  ordered.verify(helper, atLeast(0)).getSynchronizationContext();
  ordered.verify(helper, atLeast(0)).getScheduledExecutorService();
  ordered.verifyNoMoreInteractions();
  assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));

  // Fail the LB stream while the fallback timer is still pending
  Status streamFailure = Status.UNAVAILABLE.withDescription("OOB stream broken");
  responseObserver.onError(streamFailure.asException());

  // The resolver-provided backends take over
  fallbackTestVerifyUseOfFallbackBackendLists(
      ordered, Arrays.asList(resolvedBackends.get(0), resolvedBackends.get(1)));

  // A replacement stream has been opened
  verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture());
  assertThat(lbRequestObservers).hasSize(1);
  requestObserver = lbRequestObservers.poll();
  verify(requestObserver).onNext(eq(expectedInitialRequest));
}
/**
 * With backend addresses but no balancer address in the resolution result, the fallback
 * backends are used, no fallback timer is pending and no OOB channel is created.
 */
@Test
public void grpclbFallback_noBalancerAddress() {
  InOrder ordered = inOrder(helper, subchannelPool);

  // Resolver supplies backend addresses only
  List<EquivalentAddressGroup> resolvedBackends = createResolvedBackendAddresses(2);
  deliverResolvedAddresses(resolvedBackends, Collections.<EquivalentAddressGroup>emptyList());

  String fallbackLog = "INFO: [grpclb-<api.google.com>] Using fallback backends";
  String rrListLog =
      "INFO: [grpclb-<api.google.com>] "
          + "Using RR list=[[[FakeSocketAddress-fake-address-0]/{}], "
          + "[[FakeSocketAddress-fake-address-1]/{}]], drop=[null, null]";
  String stateLog =
      "INFO: [grpclb-<api.google.com>] "
          + "Update balancing state to CONNECTING: picks=[BUFFER_ENTRY], drops=[null, null]";
  assertThat(logs).containsAtLeast(fallbackLog, rrListLog, stateLog).inOrder();

  // The resolver-provided backends are in use
  fallbackTestVerifyUseOfFallbackBackendLists(ordered, resolvedBackends);

  // No fallback timer is pending
  assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
  verify(helper, never())
      .createOobChannel(ArgumentMatchers.<EquivalentAddressGroup>anyList(), anyString());
}
/** Connection-lost scenario in which only the LB stream is broken. */
@Test
public void grpclbFallback_balancerLost() {
  boolean balancerBroken = true;
  boolean allSubchannelsBroken = false;
  subtestGrpclbFallbackConnectionLost(balancerBroken, allSubchannelsBroken);
}
/** Connection-lost scenario in which only the backend subchannels are broken. */
@Test
public void grpclbFallback_subchannelsLost() {
  boolean balancerBroken = false;
  boolean allSubchannelsBroken = true;
  subtestGrpclbFallbackConnectionLost(balancerBroken, allSubchannelsBroken);
}
/** Connection-lost scenario in which the LB stream and all backend subchannels are broken. */
@Test
public void grpclbFallback_allLost() {
  boolean balancerBroken = true;
  boolean allSubchannelsBroken = true;
  subtestGrpclbFallbackConnectionLost(balancerBroken, allSubchannelsBroken);
}
/**
 * Fallback outside of the initial timeout, where all connections are lost. The balancer first
 * delivers a server list that is put into use; afterwards the LB stream and/or the
 * balancer-provided subchannels are broken. The resolver-provided backends are expected to be
 * used only when both are broken.
 *
 * @param balancerBroken whether the LB stream is failed with {@code UNAVAILABLE}
 * @param allSubchannelsBroken whether every balancer-provided subchannel is moved to IDLE
 */
private void subtestGrpclbFallbackConnectionLost(
boolean balancerBroken, boolean allSubchannelsBroken) {
long loadReportIntervalMillis = 1983;
InOrder inOrder = inOrder(helper, mockLbService, subchannelPool);
// Create balancer and backend addresses
List<EquivalentAddressGroup> backendList = createResolvedBackendAddresses(2);
List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
deliverResolvedAddresses(backendList, grpclbBalancerList);
inOrder.verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)),
eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX));
// Attempted to connect to balancer
assertEquals(1, fakeOobChannels.size());
fakeOobChannels.poll();
inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
assertEquals(1, lbRequestObservers.size());
StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
verify(lbRequestObserver).onNext(
eq(LoadBalanceRequest.newBuilder().setInitialRequest(
InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
.build()));
lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis));
// We don't care if these methods have been run.
inOrder.verify(helper, atLeast(0)).getSynchronizationContext();
inOrder.verify(helper, atLeast(0)).getScheduledExecutorService();
inOrder.verifyNoMoreInteractions();
// Balancer returns a server list
List<ServerEntry> serverList = Arrays.asList(
new ServerEntry("127.0.0.1", 2000, "token0001"),
new ServerEntry("127.0.0.1", 2010, "token0002"));
lbResponseObserver.onNext(buildInitialResponse());
lbResponseObserver.onNext(buildLbResponse(serverList));
// The balancer-provided backends are in use; keep their subchannels for the steps below.
List<Subchannel> subchannels = fallbackTestVerifyUseOfBalancerBackendLists(inOrder, serverList);
// Break connections
if (balancerBroken) {
lbResponseObserver.onError(Status.UNAVAILABLE.asException());
// A new stream to LB is created
inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
lbResponseObserver = lbResponseObserverCaptor.getValue();
assertEquals(1, lbRequestObservers.size());
lbRequestObserver = lbRequestObservers.poll();
}
if (allSubchannelsBroken) {
for (Subchannel subchannel : subchannels) {
// A READY subchannel transits to IDLE when receiving a go-away
deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE));
}
}
if (balancerBroken && allSubchannelsBroken) {
// Going into fallback
subchannels = fallbackTestVerifyUseOfFallbackBackendLists(
inOrder, Arrays.asList(backendList.get(0), backendList.get(1)));
// When in fallback mode, fallback timer should not be scheduled when all backend
// connections are lost
for (Subchannel subchannel : subchannels) {
deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE));
}
// Exit fallback mode or cancel fallback timer when receiving a new server list from balancer
List<ServerEntry> serverList2 = Arrays.asList(
new ServerEntry("127.0.0.1", 2001, "token0003"),
new ServerEntry("127.0.0.1", 2011, "token0004"));
lbResponseObserver.onNext(buildInitialResponse());
lbResponseObserver.onNext(buildLbResponse(serverList2));
fallbackTestVerifyUseOfBalancerBackendLists(inOrder, serverList2);
}
// In every variant no fallback timer is pending at the end.
assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
if (!(balancerBroken && allSubchannelsBroken)) {
// Unless both the balancer and all subchannels were lost, the resolver-provided backends
// must never have been requested from the pool.
verify(subchannelPool, never()).takeOrCreateSubchannel(
eq(backendList.get(0)), any(Attributes.class));
verify(subchannelPool, never()).takeOrCreateSubchannel(
eq(backendList.get(1)), any(Attributes.class));
}
}
/**
 * Verifies that the given resolver-provided address groups are in use. Delegates to
 * {@code fallbackTestVerifyUseOfBackendLists} without tokens.
 *
 * @param inOrder the ordered verifier shared with the calling test
 * @param addrs the address groups expected to be in use
 * @return the subchannels that were created for {@code addrs}
 */
private List<Subchannel> fallbackTestVerifyUseOfFallbackBackendLists(
    InOrder inOrder, List<EquivalentAddressGroup> addrs) {
  List<String> noTokens = null;
  return fallbackTestVerifyUseOfBackendLists(inOrder, addrs, noTokens);
}
/**
 * Verifies that the given balancer-provided servers are in use. Each server is turned into an
 * address group tagged with {@code LB_BACKEND_ATTRS} and paired with its token before
 * delegating to {@code fallbackTestVerifyUseOfBackendLists}.
 *
 * @param inOrder the ordered verifier shared with the calling test
 * @param servers the server entries expected to be in use
 * @return the subchannels that were created for {@code servers}
 */
private List<Subchannel> fallbackTestVerifyUseOfBalancerBackendLists(
    InOrder inOrder, List<ServerEntry> servers) {
  final int serverCount = servers.size();
  List<EquivalentAddressGroup> backendGroups = new ArrayList<>(serverCount);
  List<String> backendTokens = new ArrayList<>(serverCount);
  for (int i = 0; i < serverCount; i++) {
    ServerEntry entry = servers.get(i);
    backendGroups.add(new EquivalentAddressGroup(entry.addr, LB_BACKEND_ATTRS));
    backendTokens.add(entry.token);
  }
  return fallbackTestVerifyUseOfBackendLists(inOrder, backendGroups, backendTokens);
}
/**
 * Verifies that a subchannel was requested from the pool for every address group, then drives
 * those subchannels through CONNECTING and, one at a time, READY while checking the picker
 * published after each READY transition.
 *
 * @param inOrder the ordered verifier shared with the calling test
 * @param addrs the address groups expected to be in use
 * @param tokens the LB token for each entry of {@code addrs}, or {@code null} when the expected
 *     backend entries carry no token
 * @return the subchannels that were created for {@code addrs}
 */
private List<Subchannel> fallbackTestVerifyUseOfBackendLists(
    InOrder inOrder, List<EquivalentAddressGroup> addrs,
    @Nullable List<String> tokens) {
  final int backendCount = addrs.size();
  if (tokens != null) {
    assertEquals(backendCount, tokens.size());
  }
  for (EquivalentAddressGroup eag : addrs) {
    inOrder.verify(subchannelPool).takeOrCreateSubchannel(eq(eag), any(Attributes.class));
  }

  // Nothing is READY yet: no drops, and picks are buffered
  final List<Object> noDrops = Collections.<Object>nCopies(backendCount, null);
  RoundRobinPicker latestPicker = (RoundRobinPicker) currentPicker;
  assertThat(latestPicker.dropList).containsExactlyElementsIn(noDrops);
  assertThat(latestPicker.pickList).containsExactly(BUFFER_ENTRY);

  // Take over the subchannels that were just created
  assertEquals(backendCount, mockSubchannels.size());
  ArrayList<Subchannel> created = new ArrayList<>(mockSubchannels);
  mockSubchannels.clear();
  for (Subchannel sc : created) {
    deliverSubchannelState(sc, ConnectivityStateInfo.forNonError(CONNECTING));
  }
  inOrder.verify(helper, atLeast(0))
      .updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class));
  inOrder.verify(helper, never())
      .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));

  // Bring the subchannels to READY one by one; each one must show up in the next picker
  ArrayList<BackendEntry> expectedPicks = new ArrayList<>();
  for (int idx = 0; idx < backendCount; idx++) {
    Subchannel sc = created.get(idx);
    BackendEntry expectedEntry =
        tokens == null
            ? new BackendEntry(sc)
            : new BackendEntry(sc, getLoadRecorder(), tokens.get(idx));
    expectedPicks.add(expectedEntry);
    deliverSubchannelState(sc, ConnectivityStateInfo.forNonError(READY));
    inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
    latestPicker = (RoundRobinPicker) pickerCaptor.getValue();
    assertThat(latestPicker.dropList).containsExactlyElementsIn(noDrops);
    assertThat(latestPicker.pickList).containsExactlyElementsIn(expectedPicks);
    inOrder.verify(helper, never())
        .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
  }
  return created;
}
/**
 * Delivers three balancer addresses that carry two different authorities and verifies that a
 * single OOB channel is created with all three addresses, each with
 * {@code ATTR_AUTHORITY_OVERRIDE} set to its own balancer authority, and with the channel
 * authority derived from the first balancer address.
 */
@Test
public void grpclbMultipleAuthorities() throws Exception {
  // Resolver result: one plain backend plus three balancer addresses.
  EquivalentAddressGroup plainBackend =
      new EquivalentAddressGroup(new FakeSocketAddress("not-a-lb-address"));
  List<EquivalentAddressGroup> backendList = Collections.singletonList(plainBackend);

  EquivalentAddressGroup balancer1 = new EquivalentAddressGroup(
      new FakeSocketAddress("fake-address-1"), lbAttributes("fake-authority-1"));
  EquivalentAddressGroup balancer2 = new EquivalentAddressGroup(
      new FakeSocketAddress("fake-address-2"), lbAttributes("fake-authority-2"));
  // The third balancer shares the first authority but carries an additional attribute.
  FakeSocketAddress address3 = new FakeSocketAddress("fake-address-3");
  Attributes attrs3 = lbAttributes("fake-authority-1").toBuilder()
      .set(GrpclbConstants.TOKEN_ATTRIBUTE_KEY, "value")
      .build();
  EquivalentAddressGroup balancer3 = new EquivalentAddressGroup(address3, attrs3);
  List<EquivalentAddressGroup> grpclbBalancerList =
      Arrays.asList(balancer1, balancer2, balancer3);

  deliverResolvedAddresses(backendList, grpclbBalancerList);

  // Expected addresses handed to the OOB channel: the original attributes plus an authority
  // override equal to each address's balancer authority.
  Attributes goldenAttrs1 = Attributes.newBuilder()
      .set(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY, "fake-authority-1")
      .set(EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE, "fake-authority-1")
      .build();
  EquivalentAddressGroup golden1 =
      new EquivalentAddressGroup(new FakeSocketAddress("fake-address-1"), goldenAttrs1);
  Attributes goldenAttrs2 = Attributes.newBuilder()
      .set(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY, "fake-authority-2")
      .set(EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE, "fake-authority-2")
      .build();
  EquivalentAddressGroup golden2 =
      new EquivalentAddressGroup(new FakeSocketAddress("fake-address-2"), goldenAttrs2);
  Attributes goldenAttrs3 = Attributes.newBuilder()
      .set(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY, "fake-authority-1")
      .set(GrpclbConstants.TOKEN_ATTRIBUTE_KEY, "value")
      .set(EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE, "fake-authority-1")
      .build();
  EquivalentAddressGroup golden3 =
      new EquivalentAddressGroup(new FakeSocketAddress("fake-address-3"), goldenAttrs3);
  List<EquivalentAddressGroup> goldenOobEagList = Arrays.asList(golden1, golden2, golden3);

  String expectedOobAuthority = "fake-authority-1" + NO_USE_AUTHORITY_SUFFIX;
  verify(helper).createOobChannel(eq(goldenOobEagList), eq(expectedOobAuthority));
}
/**
 * Verifies how the balancer RPC is retried after the stream is closed. While the balancer has
 * not responded on a stream, retries are delayed according to the current backoff policy.
 * Once the balancer has sent a response, a closed stream is retried immediately and a fresh
 * backoff policy is obtained from the provider. Every stream closure is also expected to call
 * {@code helper.refreshNameResolution()}.
 */
@Test
public void grpclbBalancerStreamClosedAndRetried() throws Exception {
// The initial request that every (re)started balancer stream is expected to send.
LoadBalanceRequest expectedInitialRequest =
LoadBalanceRequest.newBuilder()
.setInitialRequest(
InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
.build();
InOrder inOrder =
inOrder(mockLbService, backoffPolicyProvider, backoffPolicy1, backoffPolicy2, helper);
// Only a balancer address is resolved; there are no backend addresses.
List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(), grpclbBalancerList);
assertEquals(1, fakeOobChannels.size());
// Drained from the queue only; the channel itself is not inspected in this test.
@SuppressWarnings("unused")
ManagedChannel oobChannel = fakeOobChannels.poll();
// First balancer RPC
inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
assertEquals(1, lbRequestObservers.size());
StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
verify(lbRequestObserver).onNext(eq(expectedInitialRequest));
assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
// Balancer closes it immediately (erroneously)
lbResponseObserver.onCompleted();
// Will start backoff sequence 1 (10ns)
inOrder.verify(backoffPolicyProvider).get();
inOrder.verify(backoffPolicy1).nextBackoffNanos();
assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
inOrder.verify(helper).refreshNameResolution();
// Fast-forward to a moment before the retry
fakeClock.forwardNanos(9);
verifyNoMoreInteractions(mockLbService);
// Then time for retry
fakeClock.forwardNanos(1);
inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
lbResponseObserver = lbResponseObserverCaptor.getValue();
assertEquals(1, lbRequestObservers.size());
lbRequestObserver = lbRequestObservers.poll();
verify(lbRequestObserver).onNext(eq(expectedInitialRequest));
assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
// Balancer closes it with an error.
lbResponseObserver.onError(Status.UNAVAILABLE.asException());
// Will continue the backoff sequence 1 (100ns)
// No new policy is requested from the provider; backoffPolicy1 is asked again.
verifyNoMoreInteractions(backoffPolicyProvider);
inOrder.verify(backoffPolicy1).nextBackoffNanos();
assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
inOrder.verify(helper).refreshNameResolution();
// Fast-forward to a moment before the retry
fakeClock.forwardNanos(100 - 1);
verifyNoMoreInteractions(mockLbService);
// Then time for retry
fakeClock.forwardNanos(1);
inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
lbResponseObserver = lbResponseObserverCaptor.getValue();
assertEquals(1, lbRequestObservers.size());
lbRequestObserver = lbRequestObservers.poll();
verify(lbRequestObserver).onNext(eq(expectedInitialRequest));
assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
// Balancer sends initial response.
lbResponseObserver.onNext(buildInitialResponse());
// Then breaks the RPC
lbResponseObserver.onError(Status.UNAVAILABLE.asException());
// Will reset the retry sequence and retry immediately, because balancer has responded.
inOrder.verify(backoffPolicyProvider).get();
inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
lbResponseObserver = lbResponseObserverCaptor.getValue();
assertEquals(1, lbRequestObservers.size());
lbRequestObserver = lbRequestObservers.poll();
verify(lbRequestObserver).onNext(eq(expectedInitialRequest));
inOrder.verify(helper).refreshNameResolution();
// Fail the retry after spending 4ns
fakeClock.forwardNanos(4);
lbResponseObserver.onError(Status.UNAVAILABLE.asException());
// Will be on the first retry (10ns) of backoff sequence 2.
inOrder.verify(backoffPolicy2).nextBackoffNanos();
assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
inOrder.verify(helper).refreshNameResolution();
// Fast-forward to a moment before the retry, the time spent in the last try is deducted.
fakeClock.forwardNanos(10 - 4 - 1);
verifyNoMoreInteractions(mockLbService);
// Then time for retry
fakeClock.forwardNanos(1);
inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
assertEquals(1, lbRequestObservers.size());
lbRequestObserver = lbRequestObservers.poll();
verify(lbRequestObserver).onNext(eq(expectedInitialRequest));
assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
// Wrapping up
// Totals over the whole test: two policies obtained, two backoffs taken from the first,
// one from the second, and one name-resolution refresh per stream closure (four closures).
verify(backoffPolicyProvider, times(2)).get();
verify(backoffPolicy1, times(2)).nextBackoffNanos();
verify(backoffPolicy2, times(1)).nextBackoffNanos();
verify(helper, times(4)).refreshNameResolution();
}
/**
 * Exercises PICK_FIRST mode with a responsive balancer. A single subchannel carrying all
 * backend addresses is created through {@code helper.createSubchannel}, each state change of
 * that subchannel is reflected in the picker given to the helper, and a later server list
 * updates the addresses of the same subchannel. An IDLE subchannel is expected to request a
 * connection only when it is picked or when the application asks the balancer to connect.
 */
@Test
public void grpclbWorking_pickFirstMode() throws Exception {
InOrder inOrder = inOrder(helper);
// Resolver provides one balancer address, no backend addresses, and a PICK_FIRST config.
List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
deliverResolvedAddresses(
Collections.<EquivalentAddressGroup>emptyList(),
grpclbBalancerList,
GrpclbConfig.create(Mode.PICK_FIRST));
assertEquals(1, fakeOobChannels.size());
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
assertEquals(1, lbRequestObservers.size());
StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
// The stream must open with an initial request naming SERVICE_AUTHORITY.
verify(lbRequestObserver).onNext(
eq(LoadBalanceRequest.newBuilder().setInitialRequest(
InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
.build()));
// Simulate receiving LB response
List<ServerEntry> backends1 = Arrays.asList(
new ServerEntry("127.0.0.1", 2000, "token0001"),
new ServerEntry("127.0.0.1", 2010, "token0002"));
// No balancing state may have been reported before the first server list arrives.
inOrder.verify(helper, never())
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
lbResponseObserver.onNext(buildInitialResponse());
lbResponseObserver.onNext(buildLbResponse(backends1));
inOrder.verify(helper).createSubchannel(createSubchannelArgsCaptor.capture());
CreateSubchannelArgs createSubchannelArgs = createSubchannelArgsCaptor.getValue();
// Both backends go into the one subchannel, each tagged with its own token.
assertThat(createSubchannelArgs.getAddresses())
.containsExactly(
new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")),
new EquivalentAddressGroup(backends1.get(1).addr, eagAttrsWithToken("token0002")));
// Initially IDLE
inOrder.verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture());
RoundRobinPicker picker0 = (RoundRobinPicker) pickerCaptor.getValue();
// Only one subchannel is created
assertThat(mockSubchannels).hasSize(1);
Subchannel subchannel = mockSubchannels.poll();
// The drop list has one slot per server-list entry; null means "not a drop".
assertThat(picker0.dropList).containsExactly(null, null);
assertThat(picker0.pickList).containsExactly(new IdleSubchannelEntry(subchannel, syncContext));
// PICK_FIRST doesn't eagerly connect
verify(subchannel, never()).requestConnection();
// CONNECTING
deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(CONNECTING));
inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker1.dropList).containsExactly(null, null);
assertThat(picker1.pickList).containsExactly(BUFFER_ENTRY);
// TRANSIENT_FAILURE
Status error = Status.UNAVAILABLE.withDescription("Simulated connection error");
deliverSubchannelState(subchannel, ConnectivityStateInfo.forTransientFailure(error));
inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker2.dropList).containsExactly(null, null);
assertThat(picker2.pickList).containsExactly(new ErrorEntry(error));
// READY
deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
RoundRobinPicker picker3 = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker3.dropList).containsExactly(null, null);
assertThat(picker3.pickList).containsExactly(
new BackendEntry(subchannel, new TokenAttachingTracerFactory(getLoadRecorder())));
// New server list with drops
List<ServerEntry> backends2 = Arrays.asList(
new ServerEntry("127.0.0.1", 2000, "token0001"),
new ServerEntry("token0003"), // drop
new ServerEntry("127.0.0.1", 2020, "token0004"));
inOrder.verify(helper, never())
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
lbResponseObserver.onNext(buildLbResponse(backends2));
// new addresses will be updated to the existing subchannel
// createSubchannel() has ever been called only once
verify(helper, times(1)).createSubchannel(any(CreateSubchannelArgs.class));
assertThat(mockSubchannels).isEmpty();
// Only the two entries that have an address are pushed to the subchannel; the drop entry
// (index 1) is not part of the address list.
verify(subchannel).updateAddresses(
eq(Arrays.asList(
new EquivalentAddressGroup(backends2.get(0).addr, eagAttrsWithToken("token0001")),
new EquivalentAddressGroup(backends2.get(2).addr,
eagAttrsWithToken("token0004")))));
inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
RoundRobinPicker picker4 = (RoundRobinPicker) pickerCaptor.getValue();
// The drop list mirrors the server list positions: the middle entry is now a drop.
assertThat(picker4.dropList).containsExactly(
null, new DropEntry(getLoadRecorder(), "token0003"), null);
assertThat(picker4.pickList).containsExactly(
new BackendEntry(subchannel, new TokenAttachingTracerFactory(getLoadRecorder())));
// Subchannel goes IDLE, but PICK_FIRST will not try to reconnect
deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE));
inOrder.verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture());
RoundRobinPicker picker5 = (RoundRobinPicker) pickerCaptor.getValue();
verify(subchannel, never()).requestConnection();
// ... until it's selected
PickSubchannelArgs args = mock(PickSubchannelArgs.class);
PickResult pick = picker5.pickSubchannel(args);
// The pick itself yields no result, but it must have triggered a connection request.
assertThat(pick).isSameInstanceAs(PickResult.withNoResult());
verify(subchannel).requestConnection();
// ... or requested by application
balancer.requestConnection();
verify(subchannel, times(2)).requestConnection();
// PICK_FIRST doesn't use subchannelPool
verify(subchannelPool, never())
.takeOrCreateSubchannel(any(EquivalentAddressGroup.class), any(Attributes.class));
verify(subchannelPool, never())
.returnSubchannel(any(Subchannel.class), any(ConnectivityStateInfo.class));
}
/**
 * Exercises PICK_FIRST mode when the balancer sends an empty server list after backends were
 * already in use. The existing subchannel is expected to be shut down and a TRANSIENT_FAILURE
 * picker with {@code NO_AVAILABLE_BACKENDS_STATUS} reported. A later non-empty server list is
 * expected to create a new subchannel, which can then become READY.
 */
@Test
public void grpclbWorking_pickFirstMode_lbSendsEmptyAddress() throws Exception {
InOrder inOrder = inOrder(helper);
// Resolver provides one balancer address, no backend addresses, and a PICK_FIRST config.
List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
deliverResolvedAddresses(
Collections.<EquivalentAddressGroup>emptyList(),
grpclbBalancerList,
GrpclbConfig.create(Mode.PICK_FIRST));
assertEquals(1, fakeOobChannels.size());
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
assertEquals(1, lbRequestObservers.size());
StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
// The stream must open with an initial request naming SERVICE_AUTHORITY.
verify(lbRequestObserver).onNext(
eq(LoadBalanceRequest.newBuilder().setInitialRequest(
InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
.build()));
// Simulate receiving LB response
List<ServerEntry> backends1 = Arrays.asList(
new ServerEntry("127.0.0.1", 2000, "token0001"),
new ServerEntry("127.0.0.1", 2010, "token0002"));
// No balancing state may have been reported before the first server list arrives.
inOrder.verify(helper, never())
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
lbResponseObserver.onNext(buildInitialResponse());
lbResponseObserver.onNext(buildLbResponse(backends1));
inOrder.verify(helper).createSubchannel(createSubchannelArgsCaptor.capture());
CreateSubchannelArgs createSubchannelArgs = createSubchannelArgsCaptor.getValue();
assertThat(createSubchannelArgs.getAddresses())
.containsExactly(
new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")),
new EquivalentAddressGroup(backends1.get(1).addr, eagAttrsWithToken("token0002")));
// Initially IDLE
inOrder.verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture());
RoundRobinPicker picker0 = (RoundRobinPicker) pickerCaptor.getValue();
// Only one subchannel is created
assertThat(mockSubchannels).hasSize(1);
Subchannel subchannel = mockSubchannels.poll();
assertThat(picker0.dropList).containsExactly(null, null);
assertThat(picker0.pickList).containsExactly(new IdleSubchannelEntry(subchannel, syncContext));
// PICK_FIRST doesn't eagerly connect
verify(subchannel, never()).requestConnection();
// CONNECTING
deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(CONNECTING));
inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker1.dropList).containsExactly(null, null);
assertThat(picker1.pickList).containsExactly(BUFFER_ENTRY);
// TRANSIENT_FAILURE
Status error = Status.UNAVAILABLE.withDescription("Simulated connection error");
deliverSubchannelState(subchannel, ConnectivityStateInfo.forTransientFailure(error));
inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker2.dropList).containsExactly(null, null);
assertThat(picker2.pickList).containsExactly(new ErrorEntry(error));
// READY
deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
RoundRobinPicker picker3 = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker3.dropList).containsExactly(null, null);
assertThat(picker3.pickList).containsExactly(
new BackendEntry(subchannel, new TokenAttachingTracerFactory(getLoadRecorder())));
inOrder.verify(helper, never())
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
// Empty addresses from LB
lbResponseObserver.onNext(buildLbResponse(Collections.<ServerEntry>emptyList()));
// With no addresses left, no new subchannel is created and the existing one is shut down;
// the balancer reports TRANSIENT_FAILURE with NO_AVAILABLE_BACKENDS_STATUS.
inOrder.verify(helper, never()).createSubchannel(any(CreateSubchannelArgs.class));
assertThat(mockSubchannels).isEmpty();
verify(subchannel).shutdown();
inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
RoundRobinPicker errorPicker = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(errorPicker.pickList)
.containsExactly(new ErrorEntry(GrpclbState.NO_AVAILABLE_BACKENDS_STATUS));
// Deliver a second empty server list. The in-order never() check a few lines below runs
// after this call, so it also covers any balancing-state update this response might cause.
lbResponseObserver.onNext(buildLbResponse(Collections.<ServerEntry>emptyList()));
// Test recover from new LB response with addresses
// New server list with drops
List<ServerEntry> backends2 = Arrays.asList(
new ServerEntry("127.0.0.1", 2000, "token0001"),
new ServerEntry("token0003"), // drop
new ServerEntry("127.0.0.1", 2020, "token0004"));
inOrder.verify(helper, never())
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
lbResponseObserver.onNext(buildLbResponse(backends2));
// The previous subchannel was shut down, so a new subchannel is created for the new
// addresses and starts out IDLE.
inOrder.verify(helper, times(1)).createSubchannel(any(CreateSubchannelArgs.class));
inOrder.verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture());
// From here on, "subchannel" refers to the newly created subchannel.
subchannel = mockSubchannels.poll();
// Subchannel became READY
deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(CONNECTING));
deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
RoundRobinPicker picker4 = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker4.pickList).containsExactly(
new BackendEntry(subchannel, new TokenAttachingTracerFactory(getLoadRecorder())));
}
/** Runs the shutdown-without-subchannel scenario with a ROUND_ROBIN configuration. */
@Test
public void shutdownWithoutSubchannel_roundRobin() throws Exception {
  GrpclbConfig roundRobinConfig = GrpclbConfig.create(Mode.ROUND_ROBIN);
  subtestShutdownWithoutSubchannel(roundRobinConfig);
}
/** Runs the shutdown-without-subchannel scenario with a PICK_FIRST configuration. */
@Test
public void shutdownWithoutSubchannel_pickFirst() throws Exception {
  GrpclbConfig pickFirstConfig = GrpclbConfig.create(Mode.PICK_FIRST);
  subtestShutdownWithoutSubchannel(pickFirstConfig);
}
/**
 * Shuts the balancer down while the balancer stream is open and before any server list has
 * been delivered, then checks that the request stream is terminated through {@code onError}
 * with a CANCELLED status rather than through {@code onCompleted}.
 *
 * @param grpclbConfig the grpclb configuration delivered with the resolved addresses
 */
private void subtestShutdownWithoutSubchannel(GrpclbConfig grpclbConfig) {
  // Resolve a single balancer address and no backend addresses.
  List<EquivalentAddressGroup> balancerAddrs = createResolvedBalancerAddresses(1);
  List<EquivalentAddressGroup> noBackends = Collections.<EquivalentAddressGroup>emptyList();
  deliverResolvedAddresses(noBackends, balancerAddrs, grpclbConfig);

  // The balancer stream has been opened; grab its request side.
  verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
  assertEquals(1, lbRequestObservers.size());
  StreamObserver<LoadBalanceRequest> openStream = lbRequestObservers.poll();
  verify(openStream, never()).onCompleted();

  balancer.shutdown();

  // Shutdown must close the request stream with an error whose status is CANCELLED.
  ArgumentCaptor<Throwable> closeCauseCaptor = ArgumentCaptor.forClass(Throwable.class);
  verify(openStream).onError(closeCauseCaptor.capture());
  Throwable closeCause = closeCauseCaptor.getValue();
  Status closeStatus = Status.fromThrowable(closeCause);
  assertThat(closeStatus.getCode()).isEqualTo(Code.CANCELLED);
}
/**
 * Verifies fallback in PICK_FIRST mode. When the fallback timeout elapses without a balancer
 * response, a single subchannel is created for the resolver-provided backend addresses. When
 * the balancer later sends a server list, the addresses of that same subchannel are updated
 * instead of a new subchannel being created.
 */
@Test
public void pickFirstMode_fallback() throws Exception {
InOrder inOrder = inOrder(helper);
// Name resolver returns balancer and backend addresses
List<EquivalentAddressGroup> backendList = createResolvedBackendAddresses(2);
List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
deliverResolvedAddresses(
backendList,
grpclbBalancerList,
GrpclbConfig.create(Mode.PICK_FIRST));
// Attempted to connect to balancer
assertEquals(1, fakeOobChannels.size());
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
assertEquals(1, lbRequestObservers.size());
// Fallback timer expires with no response
fakeClock.forwardTime(GrpclbState.FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS);
// Entering fallback mode
inOrder.verify(helper).createSubchannel(createSubchannelArgsCaptor.capture());
CreateSubchannelArgs createSubchannelArgs = createSubchannelArgsCaptor.getValue();
// The single subchannel is created with the resolver-provided backends as they were given.
assertThat(createSubchannelArgs.getAddresses())
.containsExactly(backendList.get(0), backendList.get(1));
assertThat(mockSubchannels).hasSize(1);
Subchannel subchannel = mockSubchannels.poll();
// Initially IDLE
inOrder.verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture());
RoundRobinPicker picker0 = (RoundRobinPicker) pickerCaptor.getValue();
// READY
deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker1.dropList).containsExactly(null, null);
// While in fallback the expected tracer factory is built with null instead of
// getLoadRecorder(), which is used further down once balancer backends are in use.
assertThat(picker1.pickList).containsExactly(
new BackendEntry(subchannel, new TokenAttachingTracerFactory(null)));
// The picker captured earlier for the IDLE state held an idle entry for the same subchannel.
assertThat(picker0.dropList).containsExactly(null, null);
assertThat(picker0.pickList).containsExactly(new IdleSubchannelEntry(subchannel, syncContext));
// Finally, an LB response, which brings us out of fallback
List<ServerEntry> backends1 = Arrays.asList(
new ServerEntry("127.0.0.1", 2000, "token0001"),
new ServerEntry("127.0.0.1", 2010, "token0002"));
inOrder.verify(helper, never())
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
lbResponseObserver.onNext(buildInitialResponse());
lbResponseObserver.onNext(buildLbResponse(backends1));
// new addresses will be updated to the existing subchannel
// createSubchannel() has ever been called only once
inOrder.verify(helper, never()).createSubchannel(any(CreateSubchannelArgs.class));
assertThat(mockSubchannels).isEmpty();
// The balancer-provided addresses replace the fallback ones, each tagged with its token.
verify(subchannel).updateAddresses(
eq(Arrays.asList(
new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")),
new EquivalentAddressGroup(backends1.get(1).addr,
eagAttrsWithToken("token0002")))));
inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker2.dropList).containsExactly(null, null);
assertThat(picker2.pickList).containsExactly(
new BackendEntry(subchannel, new TokenAttachingTracerFactory(getLoadRecorder())));
// PICK_FIRST doesn't use subchannelPool
verify(subchannelPool, never())
.takeOrCreateSubchannel(any(EquivalentAddressGroup.class), any(Attributes.class));
verify(subchannelPool, never())
.returnSubchannel(any(Subchannel.class), any(ConnectivityStateInfo.class));
}
/**
 * Verifies switching from ROUND_ROBIN to PICK_FIRST through a new resolution result. The old
 * OOB channel is expected to be shut down and the round-robin subchannels returned to
 * {@code subchannelPool}. A new balancer stream is then opened, and the server list it
 * delivers is served by a single subchannel created through {@code helper.createSubchannel}.
 */
@Test
public void switchMode() throws Exception {
InOrder inOrder = inOrder(helper);
// Start in ROUND_ROBIN mode with one balancer address and no backend addresses.
List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
deliverResolvedAddresses(
Collections.<EquivalentAddressGroup>emptyList(),
grpclbBalancerList,
GrpclbConfig.create(Mode.ROUND_ROBIN));
assertEquals(1, fakeOobChannels.size());
ManagedChannel oobChannel = fakeOobChannels.poll();
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
assertEquals(1, lbRequestObservers.size());
StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
verify(lbRequestObserver).onNext(
eq(LoadBalanceRequest.newBuilder().setInitialRequest(
InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
.build()));
// Simulate receiving LB response
List<ServerEntry> backends1 = Arrays.asList(
new ServerEntry("127.0.0.1", 2000, "token0001"),
new ServerEntry("127.0.0.1", 2010, "token0002"));
inOrder.verify(helper, never())
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
lbResponseObserver.onNext(buildInitialResponse());
lbResponseObserver.onNext(buildLbResponse(backends1));
// ROUND_ROBIN: create one subchannel per server
verify(subchannelPool).takeOrCreateSubchannel(
eq(new EquivalentAddressGroup(backends1.get(0).addr, LB_BACKEND_ATTRS)),
any(Attributes.class));
verify(subchannelPool).takeOrCreateSubchannel(
eq(new EquivalentAddressGroup(backends1.get(1).addr, LB_BACKEND_ATTRS)),
any(Attributes.class));
inOrder.verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class));
assertEquals(2, mockSubchannels.size());
Subchannel subchannel1 = mockSubchannels.poll();
Subchannel subchannel2 = mockSubchannels.poll();
// Nothing has been handed back to the pool before the mode switch.
verify(subchannelPool, never())
.returnSubchannel(any(Subchannel.class), any(ConnectivityStateInfo.class));
// Switch to PICK_FIRST
deliverResolvedAddresses(
Collections.<EquivalentAddressGroup>emptyList(),
grpclbBalancerList, GrpclbConfig.create(Mode.PICK_FIRST));
// GrpclbState will be shutdown, and a new one will be created
assertThat(oobChannel.isShutdown()).isTrue();
// Both round-robin subchannels go back to the pool, reported as IDLE.
verify(subchannelPool)
.returnSubchannel(same(subchannel1), eq(ConnectivityStateInfo.forNonError(IDLE)));
verify(subchannelPool)
.returnSubchannel(same(subchannel2), eq(ConnectivityStateInfo.forNonError(IDLE)));
// A new LB stream is created
assertEquals(1, fakeOobChannels.size());
// times(2) counts the first stream plus the new one; getValue() yields the latest observer.
verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture());
lbResponseObserver = lbResponseObserverCaptor.getValue();
assertEquals(1, lbRequestObservers.size());
lbRequestObserver = lbRequestObservers.poll();
verify(lbRequestObserver).onNext(
eq(LoadBalanceRequest.newBuilder().setInitialRequest(
InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
.build()));
// Simulate receiving LB response
inOrder.verify(helper, never())
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
lbResponseObserver.onNext(buildInitialResponse());
lbResponseObserver.onNext(buildLbResponse(backends1));
// PICK_FIRST Subchannel
inOrder.verify(helper).createSubchannel(createSubchannelArgsCaptor.capture());
CreateSubchannelArgs createSubchannelArgs = createSubchannelArgsCaptor.getValue();
assertThat(createSubchannelArgs.getAddresses())
.containsExactly(
new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")),
new EquivalentAddressGroup(backends1.get(1).addr, eagAttrsWithToken("token0002")));
inOrder.verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));
}
/**
 * Builds the address attributes expected on a PICK_FIRST backend: a copy of
 * {@code LB_BACKEND_ATTRS} with {@code GrpclbConstants.TOKEN_ATTRIBUTE_KEY} set to the token.
 *
 * @param token the token to store under {@code TOKEN_ATTRIBUTE_KEY}
 * @return the resulting attributes
 */
private static Attributes eagAttrsWithToken(String token) {
  Attributes.Builder attrsBuilder = LB_BACKEND_ATTRS.toBuilder();
  attrsBuilder.set(GrpclbConstants.TOKEN_ATTRIBUTE_KEY, token);
  return attrsBuilder.build();
}
/**
 * Same scenario as the ROUND_ROBIN to PICK_FIRST mode switch, except that the first
 * resolution result is delivered without an explicit {@code GrpclbConfig}. The assertions
 * that follow expect round-robin behavior (one pooled subchannel per server) for that first
 * phase, and PICK_FIRST behavior after the second resolution result.
 */
@Test
public void switchMode_nullLbPolicy() throws Exception {
InOrder inOrder = inOrder(helper);
final List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
// NOTE(review): the two-argument overload presumably delivers a null/default policy config,
// as the test name suggests. Confirm against deliverResolvedAddresses.
deliverResolvedAddresses(
Collections.<EquivalentAddressGroup>emptyList(),
grpclbBalancerList);
assertEquals(1, fakeOobChannels.size());
ManagedChannel oobChannel = fakeOobChannels.poll();
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
assertEquals(1, lbRequestObservers.size());
StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
verify(lbRequestObserver).onNext(
eq(LoadBalanceRequest.newBuilder().setInitialRequest(
InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
.build()));
// Simulate receiving LB response
List<ServerEntry> backends1 = Arrays.asList(
new ServerEntry("127.0.0.1", 2000, "token0001"),
new ServerEntry("127.0.0.1", 2010, "token0002"));
inOrder.verify(helper, never())
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
lbResponseObserver.onNext(buildInitialResponse());
lbResponseObserver.onNext(buildLbResponse(backends1));
// ROUND_ROBIN: create one subchannel per server
verify(subchannelPool).takeOrCreateSubchannel(
eq(new EquivalentAddressGroup(backends1.get(0).addr, LB_BACKEND_ATTRS)),
any(Attributes.class));
verify(subchannelPool).takeOrCreateSubchannel(
eq(new EquivalentAddressGroup(backends1.get(1).addr, LB_BACKEND_ATTRS)),
any(Attributes.class));
inOrder.verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class));
assertEquals(2, mockSubchannels.size());
Subchannel subchannel1 = mockSubchannels.poll();
Subchannel subchannel2 = mockSubchannels.poll();
// Nothing has been handed back to the pool before the mode switch.
verify(subchannelPool, never())
.returnSubchannel(any(Subchannel.class), any(ConnectivityStateInfo.class));
// Switch to PICK_FIRST
deliverResolvedAddresses(
Collections.<EquivalentAddressGroup>emptyList(),
grpclbBalancerList,
GrpclbConfig.create(Mode.PICK_FIRST));
// GrpclbState will be shutdown, and a new one will be created
assertThat(oobChannel.isShutdown()).isTrue();
// Both round-robin subchannels go back to the pool, reported as IDLE.
verify(subchannelPool)
.returnSubchannel(same(subchannel1), eq(ConnectivityStateInfo.forNonError(IDLE)));
verify(subchannelPool)
.returnSubchannel(same(subchannel2), eq(ConnectivityStateInfo.forNonError(IDLE)));
// A new LB stream is created
assertEquals(1, fakeOobChannels.size());
// times(2) counts the first stream plus the new one; getValue() yields the latest observer.
verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture());
lbResponseObserver = lbResponseObserverCaptor.getValue();
assertEquals(1, lbRequestObservers.size());
lbRequestObserver = lbRequestObservers.poll();
verify(lbRequestObserver).onNext(
eq(LoadBalanceRequest.newBuilder().setInitialRequest(
InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
.build()));
// Simulate receiving LB response
inOrder.verify(helper, never())
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
lbResponseObserver.onNext(buildInitialResponse());
lbResponseObserver.onNext(buildLbResponse(backends1));
// PICK_FIRST Subchannel
inOrder.verify(helper).createSubchannel(createSubchannelArgsCaptor.capture());
CreateSubchannelArgs createSubchannelArgs = createSubchannelArgsCaptor.getValue();
assertThat(createSubchannelArgs.getAddresses())
.containsExactly(
new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")),
new EquivalentAddressGroup(backends1.get(1).addr, eagAttrsWithToken("token0002")));
inOrder.verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));
}
/**
 * Delivers two resolution results whose {@link GrpclbConfig} differ only in service name and
 * checks what happens on the switch: the first OOB channel is shut down, both subchannels are
 * handed back to the pool in IDLE, and a second balancer stream is opened whose initial request
 * carries the new name.
 */
@Test
public void switchServiceName() throws Exception {
  InOrder inOrder = inOrder(helper);

  // First resolution: one balancer address, no backends, explicit service name.
  String initialName = "foo.google.com";
  List<EquivalentAddressGroup> initialBalancers = createResolvedBalancerAddresses(1);
  deliverResolvedAddresses(
      Collections.<EquivalentAddressGroup>emptyList(),
      initialBalancers,
      GrpclbConfig.create(Mode.ROUND_ROBIN, initialName));

  assertEquals(1, fakeOobChannels.size());
  ManagedChannel initialOobChannel = fakeOobChannels.poll();
  verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
  StreamObserver<LoadBalanceResponse> responseObserver = lbResponseObserverCaptor.getValue();
  assertEquals(1, lbRequestObservers.size());
  StreamObserver<LoadBalanceRequest> initialRequestObserver = lbRequestObservers.poll();

  // The initial request on the first stream names the configured service.
  LoadBalanceRequest initialRequest =
      LoadBalanceRequest.newBuilder()
          .setInitialRequest(InitialLoadBalanceRequest.newBuilder().setName(initialName).build())
          .build();
  verify(initialRequestObserver).onNext(eq(initialRequest));

  // Simulate receiving LB response
  ServerEntry serverA = new ServerEntry("127.0.0.1", 2000, "token0001");
  ServerEntry serverB = new ServerEntry("127.0.0.1", 2010, "token0002");
  List<ServerEntry> servers = Arrays.asList(serverA, serverB);
  inOrder.verify(helper, never())
      .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
  responseObserver.onNext(buildInitialResponse());
  responseObserver.onNext(buildLbResponse(servers));

  // ROUND_ROBIN: create one subchannel per server
  for (ServerEntry server : servers) {
    verify(subchannelPool).takeOrCreateSubchannel(
        eq(new EquivalentAddressGroup(server.addr, LB_BACKEND_ATTRS)),
        any(Attributes.class));
  }
  inOrder.verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class));
  assertEquals(2, mockSubchannels.size());
  Subchannel firstSubchannel = mockSubchannels.poll();
  Subchannel secondSubchannel = mockSubchannels.poll();
  verify(subchannelPool, never())
      .returnSubchannel(any(Subchannel.class), any(ConnectivityStateInfo.class));

  // Switch to different serviceName
  String switchedName = "bar.google.com";
  List<EquivalentAddressGroup> switchedBalancers = createResolvedBalancerAddresses(1);
  deliverResolvedAddresses(
      Collections.<EquivalentAddressGroup>emptyList(),
      switchedBalancers,
      GrpclbConfig.create(Mode.ROUND_ROBIN, switchedName));

  // GrpclbState will be shutdown, and a new one will be created
  assertThat(initialOobChannel.isShutdown()).isTrue();
  ConnectivityStateInfo idleInfo = ConnectivityStateInfo.forNonError(IDLE);
  verify(subchannelPool).returnSubchannel(same(firstSubchannel), eq(idleInfo));
  verify(subchannelPool).returnSubchannel(same(secondSubchannel), eq(idleInfo));

  // A replacement OOB channel and a second balanceLoad call must now exist.
  assertEquals(1, fakeOobChannels.size());
  verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture());
  assertEquals(1, lbRequestObservers.size());
  StreamObserver<LoadBalanceRequest> switchedRequestObserver = lbRequestObservers.poll();
  LoadBalanceRequest switchedRequest =
      LoadBalanceRequest.newBuilder()
          .setInitialRequest(InitialLoadBalanceRequest.newBuilder().setName(switchedName).build())
          .build();
  verify(switchedRequestObserver).onNext(eq(switchedRequest));
}
/**
 * Walks a round-robin balancer session through three phases and checks the picker, the
 * subchannel pool interactions and the emitted log lines at each step:
 *
 * <ol>
 *   <li>the balancer stream delivers an initial response and a two-server list, and both
 *       subchannels become READY;
 *   <li>the balancer sends an explicit fallback response, after which the LB-provided
 *       subchannels are returned to the pool and the resolver-provided backends are verified
 *       via {@code fallbackTestVerifyUseOfFallbackBackendLists};
 *   <li>the balancer sends a second server list, and the two new subchannels become READY.
 * </ol>
 */
@Test
public void grpclbWorking_lbSendsFallbackMessage() {
// Ordering is verified across both the helper and the subchannel pool.
InOrder inOrder = inOrder(helper, subchannelPool);
List<EquivalentAddressGroup> backendList = createResolvedBackendAddresses(2);
List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(2);
deliverResolvedAddresses(backendList, grpclbBalancerList);
// Fallback timer is started as soon as the addresses are resolved.
assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
// The OOB channel is created for the balancer addresses with the authority override applied
// (see xattr) and with the balancer authority plus NO_USE_AUTHORITY_SUFFIX.
verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)),
eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX));
assertEquals(1, fakeOobChannels.size());
ManagedChannel oobChannel = fakeOobChannels.poll();
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
assertEquals(1, lbRequestObservers.size());
StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
// The first message on the request stream is the initial request naming SERVICE_AUTHORITY.
verify(lbRequestObserver).onNext(
eq(LoadBalanceRequest.newBuilder()
.setInitialRequest(
InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
.build()));
// Simulate receiving LB response
ServerEntry backend1a = new ServerEntry("127.0.0.1", 2000, "token0001");
ServerEntry backend1b = new ServerEntry("127.0.0.1", 2010, "token0002");
List<ServerEntry> backends1 = Arrays.asList(backend1a, backend1b);
// No balancing state may have been published before any LB response arrives.
inOrder.verify(helper, never())
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
// Clear captured log lines so each assertion below only sees what the next step produced.
logs.clear();
lbResponseObserver.onNext(buildInitialResponse());
assertThat(logs).containsExactly(
"DEBUG: [grpclb-<api.google.com>] Got an LB response: " + buildInitialResponse());
logs.clear();
lbResponseObserver.onNext(buildLbResponse(backends1));
// One subchannel is taken from the pool per server, in server-list order.
inOrder.verify(subchannelPool).takeOrCreateSubchannel(
eq(new EquivalentAddressGroup(backend1a.addr, LB_BACKEND_ATTRS)),
any(Attributes.class));
inOrder.verify(subchannelPool).takeOrCreateSubchannel(
eq(new EquivalentAddressGroup(backend1b.addr, LB_BACKEND_ATTRS)),
any(Attributes.class));
assertEquals(2, mockSubchannels.size());
Subchannel subchannel1 = mockSubchannels.poll();
Subchannel subchannel2 = mockSubchannels.poll();
verify(subchannel1).requestConnection();
verify(subchannel2).requestConnection();
assertEquals(
new EquivalentAddressGroup(backend1a.addr, LB_BACKEND_ATTRS),
subchannel1.getAddresses());
assertEquals(
new EquivalentAddressGroup(backend1b.addr, LB_BACKEND_ATTRS),
subchannel2.getAddresses());
// Both subchannels report CONNECTING; the published picker must hold only BUFFER_ENTRY and
// no further helper/pool interaction is allowed at this point.
deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING));
deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(CONNECTING));
inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
RoundRobinPicker picker0 = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker0.dropList).containsExactly(null, null);
assertThat(picker0.pickList).containsExactly(BUFFER_ENTRY);
inOrder.verifyNoMoreInteractions();
assertThat(logs).containsExactly(
"DEBUG: [grpclb-<api.google.com>] Got an LB response: " + buildLbResponse(backends1),
"INFO: [grpclb-<api.google.com>] Using RR list="
+ "[[[/127.0.0.1:2000]/{io.grpc.grpclb.lbProvidedBackend=true}](token0001),"
+ " [[/127.0.0.1:2010]/{io.grpc.grpclb.lbProvidedBackend=true}](token0002)],"
+ " drop=[null, null]",
"INFO: [grpclb-<api.google.com>] "
+ "Update balancing state to CONNECTING: picks=[BUFFER_ENTRY], drops=[null, null]")
.inOrder();
logs.clear();
// Let subchannels be connected
// subchannel2 goes READY first, so the pick list initially contains only its entry.
deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY));
inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
assertThat(logs).containsExactly(
"INFO: [grpclb-<api.google.com>] Update balancing state to READY: picks="
+ "[[[[[/127.0.0.1:2010]/{io.grpc.grpclb.lbProvidedBackend=true}]](token0002)]],"
+ " drops=[null, null]");
logs.clear();
RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker1.dropList).containsExactly(null, null);
assertThat(picker1.pickList).containsExactly(
new BackendEntry(subchannel2, getLoadRecorder(), "token0002"));
// Once subchannel1 is READY too, both entries appear, ordered as in the server list.
deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY));
inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
assertThat(logs).containsExactly(
"INFO: [grpclb-<api.google.com>] Update balancing state to READY: picks="
+ "[[[[[/127.0.0.1:2000]/{io.grpc.grpclb.lbProvidedBackend=true}]](token0001)],"
+ " [[[[/127.0.0.1:2010]/{io.grpc.grpclb.lbProvidedBackend=true}]](token0002)]],"
+ " drops=[null, null]");
logs.clear();
RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker2.dropList).containsExactly(null, null);
assertThat(picker2.pickList).containsExactly(
new BackendEntry(subchannel1, getLoadRecorder(), "token0001"),
new BackendEntry(subchannel2, getLoadRecorder(), "token0002"))
.inOrder();
// enter fallback mode
lbResponseObserver.onNext(buildLbFallbackResponse());
// existing subchannels must be returned immediately to gracefully shutdown.
verify(subchannelPool)
.returnSubchannel(eq(subchannel1), eq(ConnectivityStateInfo.forNonError(READY)));
verify(subchannelPool)
.returnSubchannel(eq(subchannel2), eq(ConnectivityStateInfo.forNonError(READY)));
// verify fallback
fallbackTestVerifyUseOfFallbackBackendLists(inOrder, backendList);
// Entering fallback on the balancer's request must not close the balancer connection:
// the OOB channel stays up and the request stream is not completed.
assertFalse(oobChannel.isShutdown());
verify(lbRequestObserver, never()).onCompleted();
// exit fall back by providing two new backends
ServerEntry backend2a = new ServerEntry("127.0.0.1", 8000, "token1001");
ServerEntry backend2b = new ServerEntry("127.0.0.1", 8010, "token1002");
List<ServerEntry> backends2 = Arrays.asList(backend2a, backend2b);
inOrder.verify(helper, never())
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
logs.clear();
lbResponseObserver.onNext(buildLbResponse(backends2));
// The new server list again yields one pooled subchannel per server, in order.
inOrder.verify(subchannelPool).takeOrCreateSubchannel(
eq(new EquivalentAddressGroup(backend2a.addr, LB_BACKEND_ATTRS)),
any(Attributes.class));
inOrder.verify(subchannelPool).takeOrCreateSubchannel(
eq(new EquivalentAddressGroup(backend2b.addr, LB_BACKEND_ATTRS)),
any(Attributes.class));
assertEquals(2, mockSubchannels.size());
Subchannel subchannel3 = mockSubchannels.poll();
Subchannel subchannel4 = mockSubchannels.poll();
verify(subchannel3).requestConnection();
verify(subchannel4).requestConnection();
assertEquals(
new EquivalentAddressGroup(backend2a.addr, LB_BACKEND_ATTRS),
subchannel3.getAddresses());
assertEquals(
new EquivalentAddressGroup(backend2b.addr, LB_BACKEND_ATTRS),
subchannel4.getAddresses());
// Same CONNECTING expectations as for the first server list, now for the new subchannels.
deliverSubchannelState(subchannel3, ConnectivityStateInfo.forNonError(CONNECTING));
deliverSubchannelState(subchannel4, ConnectivityStateInfo.forNonError(CONNECTING));
inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
// Note: this picker is captured before picker3 and picker4 below, despite its name.
RoundRobinPicker picker6 = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker6.dropList).containsExactly(null, null);
assertThat(picker6.pickList).containsExactly(BUFFER_ENTRY);
inOrder.verifyNoMoreInteractions();
assertThat(logs).containsExactly(
"DEBUG: [grpclb-<api.google.com>] Got an LB response: " + buildLbResponse(backends2),
"INFO: [grpclb-<api.google.com>] Using RR list="
+ "[[[/127.0.0.1:8000]/{io.grpc.grpclb.lbProvidedBackend=true}](token1001),"
+ " [[/127.0.0.1:8010]/{io.grpc.grpclb.lbProvidedBackend=true}](token1002)],"
+ " drop=[null, null]",
"INFO: [grpclb-<api.google.com>] "
+ "Update balancing state to CONNECTING: picks=[BUFFER_ENTRY], drops=[null, null]")
.inOrder();
logs.clear();
// Let new subchannels be connected
deliverSubchannelState(subchannel3, ConnectivityStateInfo.forNonError(READY));
inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
assertThat(logs).containsExactly(
"INFO: [grpclb-<api.google.com>] Update balancing state to READY: picks="
+ "[[[[[/127.0.0.1:8000]/{io.grpc.grpclb.lbProvidedBackend=true}]](token1001)]],"
+ " drops=[null, null]");
logs.clear();
RoundRobinPicker picker3 = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker3.dropList).containsExactly(null, null);
assertThat(picker3.pickList).containsExactly(
new BackendEntry(subchannel3, getLoadRecorder(), "token1001"));
deliverSubchannelState(subchannel4, ConnectivityStateInfo.forNonError(READY));
inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
assertThat(logs).containsExactly(
"INFO: [grpclb-<api.google.com>] Update balancing state to READY: picks="
+ "[[[[[/127.0.0.1:8000]/{io.grpc.grpclb.lbProvidedBackend=true}]](token1001)],"
+ " [[[[/127.0.0.1:8010]/{io.grpc.grpclb.lbProvidedBackend=true}]](token1002)]],"
+ " drops=[null, null]");
logs.clear();
RoundRobinPicker picker4 = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker4.dropList).containsExactly(null, null);
assertThat(picker4.pickList).containsExactly(
new BackendEntry(subchannel3, getLoadRecorder(), "token1001"),
new BackendEntry(subchannel4, getLoadRecorder(), "token1002"))
.inOrder();
}
/**
 * Pushes {@code newState} to the state listener of {@code subchannel}.
 *
 * @param subchannel must be a {@link FakeSubchannel}; any other type fails the cast
 * @param newState the connectivity state to report
 */
private void deliverSubchannelState(
    final Subchannel subchannel, final ConnectivityStateInfo newState) {
  FakeSubchannel fake = (FakeSubchannel) subchannel;
  fake.updateState(newState);
}
/**
 * Reports {@code error} to the balancer as a name resolution error, from within the
 * synchronization context.
 */
private void deliverNameResolutionError(final Status error) {
  Runnable reportError = new Runnable() {
    @Override
    public void run() {
      balancer.handleNameResolutionError(error);
    }
  };
  syncContext.execute(reportError);
}
/**
 * Convenience overload that delivers the given addresses with a {@code ROUND_ROBIN}
 * {@link GrpclbConfig} created without a service name argument.
 */
private void deliverResolvedAddresses(
    final List<EquivalentAddressGroup> backendAddrs,
    List<EquivalentAddressGroup> balancerAddrs) {
  GrpclbConfig roundRobinConfig = GrpclbConfig.create(Mode.ROUND_ROBIN);
  deliverResolvedAddresses(backendAddrs, balancerAddrs, roundRobinConfig);
}
/**
 * Hands a resolution result to the balancer from within the synchronization context.
 *
 * @param backendAddrs addresses set as the result's address list
 * @param balancerAddrs balancer addresses, attached under {@code GrpclbConstants.ATTR_LB_ADDRS}
 * @param grpclbConfig value set as the load balancing policy config
 */
private void deliverResolvedAddresses(
    final List<EquivalentAddressGroup> backendAddrs,
    List<EquivalentAddressGroup> balancerAddrs,
    final GrpclbConfig grpclbConfig) {
  Attributes.Builder attrsBuilder = Attributes.newBuilder();
  attrsBuilder.set(GrpclbConstants.ATTR_LB_ADDRS, balancerAddrs);
  final Attributes resolutionAttrs = attrsBuilder.build();

  Runnable deliver = new Runnable() {
    @Override
    public void run() {
      // Built inside the runnable so construction happens when the context runs the task.
      ResolvedAddresses resolved =
          ResolvedAddresses.newBuilder()
              .setAddresses(backendAddrs)
              .setAttributes(resolutionAttrs)
              .setLoadBalancingPolicyConfig(grpclbConfig)
              .build();
      balancer.handleResolvedAddresses(resolved);
    }
  };
  syncContext.execute(deliver);
}
/** Returns the load recorder of the balancer's current grpclb state. */
private GrpclbClientLoadRecorder getLoadRecorder() {
  final GrpclbClientLoadRecorder recorder = balancer.getGrpclbState().getLoadRecorder();
  return recorder;
}
/**
 * Builds {@code n} single-address groups without attributes, named {@code fake-address-0}
 * through {@code fake-address-(n-1)}.
 */
private static List<EquivalentAddressGroup> createResolvedBackendAddresses(int n) {
  List<EquivalentAddressGroup> backends = new ArrayList<>();
  int index = 0;
  while (index < n) {
    SocketAddress address = new FakeSocketAddress("fake-address-" + index);
    backends.add(new EquivalentAddressGroup(address));
    index++;
  }
  return backends;
}
/**
 * Builds {@code n} single-address groups named {@code fake-address-0} through
 * {@code fake-address-(n-1)}, each tagged with the attributes produced by
 * {@code lbAttributes(lbAuthority(i))}.
 */
private static List<EquivalentAddressGroup> createResolvedBalancerAddresses(int n) {
  List<EquivalentAddressGroup> balancers = new ArrayList<>();
  for (int index = 0; index < n; index += 1) {
    SocketAddress address = new FakeSocketAddress("fake-address-" + index);
    EquivalentAddressGroup group =
        new EquivalentAddressGroup(address, lbAttributes(lbAuthority(index)));
    balancers.add(group);
  }
  return balancers;
}
/**
 * Returns the balancer authority used by these tests.
 *
 * @param unused ignored; every index currently maps to the same authority
 */
private static String lbAuthority(int unused) {
  // TODO(ejona): Support varying authorities
  final String fixedAuthority = "lb.google.com";
  return fixedAuthority;
}
/**
 * Returns attributes containing only {@code GrpclbConstants.ATTR_LB_ADDR_AUTHORITY}, set to
 * {@code authority}.
 */
private static Attributes lbAttributes(String authority) {
  Attributes.Builder builder = Attributes.newBuilder();
  builder.set(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY, authority);
  return builder.build();
}
/** Builds an initial LB response with a client stats report interval of zero milliseconds. */
private static LoadBalanceResponse buildInitialResponse() {
  final long zeroIntervalMillis = 0;
  return buildInitialResponse(zeroIntervalMillis);
}
/**
 * Builds an initial LB response.
 *
 * @param loadReportIntervalMillis value for the client stats report interval, in milliseconds
 */
private static LoadBalanceResponse buildInitialResponse(long loadReportIntervalMillis) {
  InitialLoadBalanceResponse.Builder initial = InitialLoadBalanceResponse.newBuilder();
  initial.setClientStatsReportInterval(Durations.fromMillis(loadReportIntervalMillis));
  return LoadBalanceResponse.newBuilder().setInitialResponse(initial).build();
}
/** Builds an LB response whose payload is an empty {@link FallbackResponse}. */
private static LoadBalanceResponse buildLbFallbackResponse() {
  FallbackResponse emptyFallback = FallbackResponse.newBuilder().build();
  LoadBalanceResponse.Builder response = LoadBalanceResponse.newBuilder();
  response.setFallbackResponse(emptyFallback);
  return response.build();
}
/**
 * Builds an LB response carrying a server list that mirrors {@code servers}, in order.
 *
 * <p>An entry with an address becomes a server with that IP, port and token; an entry whose
 * {@code addr} is {@code null} becomes a drop entry carrying only the token.
 */
private static LoadBalanceResponse buildLbResponse(List<ServerEntry> servers) {
  ServerList.Builder serverList = ServerList.newBuilder();
  for (ServerEntry entry : servers) {
    Server.Builder server = Server.newBuilder();
    if (entry.addr == null) {
      server.setDrop(true);
    } else {
      server.setIpAddress(ByteString.copyFrom(entry.addr.getAddress().getAddress()));
      server.setPort(entry.addr.getPort());
    }
    // Both kinds of entry carry the load balance token.
    server.setLoadBalanceToken(entry.token);
    serverList.addServers(server.build());
  }
  LoadBalanceResponse.Builder response = LoadBalanceResponse.newBuilder();
  response.setServerList(serverList.build());
  return response.build();
}
/**
 * Returns a new list in which every group of {@code lbAddr} keeps its addresses and attributes
 * and additionally has {@code EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE} set to the value
 * of its {@code GrpclbConstants.ATTR_LB_ADDR_AUTHORITY} attribute.
 */
private List<EquivalentAddressGroup> xattr(List<EquivalentAddressGroup> lbAddr) {
  List<EquivalentAddressGroup> withOverride = new ArrayList<>(lbAddr.size());
  for (EquivalentAddressGroup group : lbAddr) {
    Attributes original = group.getAttributes();
    String authority = original.get(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY);
    Attributes overridden =
        original.toBuilder()
            .set(EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE, authority)
            .build();
    withOverride.add(new EquivalentAddressGroup(group.getAddresses(), overridden));
  }
  return withOverride;
}
/**
 * One entry of a test server list: either a backend (address plus token) or a drop entry
 * (token only, {@code addr} is {@code null}).
 */
private static class ServerEntry {
  /** Backend address, or {@code null} for a drop entry. */
  final InetSocketAddress addr;
  /** Load balance token associated with this entry. */
  final String token;

  /** Creates a drop entry: no address, only a token. */
  ServerEntry(String token) {
    this.token = token;
    this.addr = null;
  }

  /** Creates a backend entry for {@code host:port} with the given token. */
  ServerEntry(String host, int port, String token) {
    this.token = token;
    this.addr = new InetSocketAddress(host, port);
  }
}
/**
 * In-memory {@link Subchannel} for tests. It stores its addresses and attributes, remembers
 * the listener passed to {@link #start}, and lets the test push states to that listener via
 * {@link #updateState}. {@link #shutdown} and {@link #requestConnection} do nothing.
 */
private static class FakeSubchannel extends Subchannel {
  private final Attributes attributes;
  private List<EquivalentAddressGroup> addressGroups;
  private SubchannelStateListener stateListener;

  public FakeSubchannel(List<EquivalentAddressGroup> eags, Attributes attributes) {
    this.addressGroups = Collections.unmodifiableList(eags);
    this.attributes = attributes;
  }

  @Override
  public Attributes getAttributes() {
    return attributes;
  }

  @Override
  public List<EquivalentAddressGroup> getAllAddresses() {
    return addressGroups;
  }

  @Override
  public void updateAddresses(List<EquivalentAddressGroup> addrs) {
    // Replaces the stored list with an unmodifiable view of the new one.
    addressGroups = Collections.unmodifiableList(addrs);
  }

  @Override
  public void start(SubchannelStateListener listener) {
    stateListener = checkNotNull(listener, "listener");
  }

  @Override
  public void requestConnection() {
  }

  @Override
  public void shutdown() {
  }

  /** Forwards {@code newState} to the listener registered through {@link #start}. */
  public void updateState(ConnectivityStateInfo newState) {
    stateListener.onSubchannelState(newState);
  }
}
/**
 * {@link Helper} backed by the enclosing test's fixtures. OOB channels are in-process channels
 * named {@code "fakeLb"} and subchannels are Mockito mocks delegating to a
 * {@link FakeSubchannel}; both are recorded in the test's tracking collections. Balancing state
 * updates only store the picker, and the remaining callbacks do nothing.
 */
private class FakeHelper extends Helper {
  @Override
  public String getAuthority() {
    return SERVICE_AUTHORITY;
  }

  @Override
  public SynchronizationContext getSynchronizationContext() {
    return syncContext;
  }

  @Override
  public ScheduledExecutorService getScheduledExecutorService() {
    return fakeClock.getScheduledExecutorService();
  }

  @Override
  public ChannelLogger getChannelLogger() {
    return channelLogger;
  }

  @Override
  public ManagedChannel createOobChannel(EquivalentAddressGroup eag, String authority) {
    List<EquivalentAddressGroup> single = Collections.singletonList(eag);
    return createOobChannel(single, authority);
  }

  @Override
  public ManagedChannel createOobChannel(List<EquivalentAddressGroup> eag, String authority) {
    // The address list is not used; every OOB channel targets the same in-process server name.
    ManagedChannel oob =
        InProcessChannelBuilder.forName("fakeLb")
            .directExecutor()
            .overrideAuthority(authority)
            .build();
    fakeOobChannels.add(oob);
    oobChannelTracker.add(oob);
    return oob;
  }

  @Override
  public Subchannel createSubchannel(CreateSubchannelArgs args) {
    // A mock that delegates to a real fake, so calls can be both verified and executed.
    FakeSubchannel delegate = new FakeSubchannel(args.getAddresses(), args.getAttributes());
    FakeSubchannel created = mock(FakeSubchannel.class, AdditionalAnswers.delegatesTo(delegate));
    mockSubchannels.add(created);
    unpooledSubchannelTracker.add(created);
    return created;
  }

  @Override
  public void updateBalancingState(
      @Nonnull ConnectivityState newState, @Nonnull SubchannelPicker newPicker) {
    // Only the picker is kept; the state itself is not stored.
    currentPicker = newPicker;
  }

  @Override
  public void refreshNameResolution() {
  }

  @Override
  public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag) {
  }

  @Override
  public void updateOobChannelAddresses(ManagedChannel channel,
      List<EquivalentAddressGroup> eag) {
  }
}
}