// blob: d0ced5faaed16e92b133ca068f0de2a795d54a64 [file] [log] [blame]
/*
* Copyright 2016 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.internal;
import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.common.collect.Lists;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.EquivalentAddressGroup;
import io.grpc.IntegerMarshaller;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.Subchannel;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType;
import io.grpc.NameResolver;
import io.grpc.Status;
import io.grpc.StringMarshaller;
import io.grpc.internal.FakeClock.ScheduledTask;
import io.grpc.internal.TestUtils.MockClientTransportInfo;
import java.net.SocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
/**
* Unit tests for {@link ManagedChannelImpl}'s idle mode.
*/
@RunWith(JUnit4.class)
public class ManagedChannelImplIdlenessTest {
// ---- Constants shared by all tests ----
private static final String AUTHORITY = "fakeauthority";
private static final String USER_AGENT = "fakeagent";
// Not referenced by any test in this class.
private static final ProxyParameters NO_PROXY = null;
private static final long IDLE_TIMEOUT_SECONDS = 30;

// ---- Fake clocks; tests advance or drain them explicitly ----
// Backs the scheduled executor that setUp() stubs on the mock transport factory, and the
// stopwatch supplier handed to the channel.
private final FakeClock timer = new FakeClock();
// Backs executorPool, which setUp() installs as the builder's executor pool.
private final FakeClock executor = new FakeClock();
// Backs oobExecutorPool, which setUp() passes to the ManagedChannelImpl constructor.
private final FakeClock oobExecutor = new FakeClock();

// The pools must be declared after the clocks they wrap (field initializers run in order).
private final ObjectPool<Executor> executorPool =
    new FixedObjectPool<Executor>(executor.getScheduledExecutorService());
private final ObjectPool<Executor> oobExecutorPool =
    new FixedObjectPool<Executor>(oobExecutor.getScheduledExecutorService());

// ---- Call fixtures ----
// Method descriptor used for every call issued by the tests.
private final MethodDescriptor<String, Integer> method =
    MethodDescriptor.<String, Integer>newBuilder()
        .setFullMethodName("service/method")
        .setType(MethodType.UNKNOWN)
        .setRequestMarshaller(new StringMarshaller())
        .setResponseMarshaller(new IntegerMarshaller())
        .build();
// Filled by setUp() with fake address groups.
private final List<EquivalentAddressGroup> servers = Lists.newArrayList();

// ---- Mockito-managed collaborators (initialized by initMocks in setUp()) ----
@Mock
private ClientTransportFactory mockTransportFactory;
@Mock
private LoadBalancer mockLoadBalancer;
@Mock
private LoadBalancer.Factory mockLoadBalancerFactory;
@Mock
private NameResolver mockNameResolver;
@Mock
private NameResolver.Factory mockNameResolverFactory;
@Mock
private ClientCall.Listener<Integer> mockCallListener;
@Mock
private ClientCall.Listener<Integer> mockCallListener2;
@Captor
private ArgumentCaptor<NameResolver.Listener> nameResolverListenerCaptor;

// ---- State assigned in setUp() ----
// The channel under test.
private ManagedChannelImpl channel;
// Transports captured from mockTransportFactory via TestUtils.captureTransports.
private BlockingQueue<MockClientTransportInfo> newTransports;
/**
 * Builds the channel under test against mocked name resolver, load balancer and transport
 * factories, populates {@code servers} with two address groups of two fake addresses each,
 * and checks that the freshly built channel has not yet created a load balancer, started the
 * name resolver or opened a transport.
 */
@Before
public void setUp() {
  // Minimal concrete builder used only to carry configuration into ManagedChannelImpl; the
  // overridden hooks throw if they are ever invoked.
  class Builder extends AbstractManagedChannelImplBuilder<Builder> {
    Builder(String target) {
      super(target);
    }

    @Override
    protected ClientTransportFactory buildTransportFactory() {
      throw new UnsupportedOperationException();
    }

    @Override
    public Builder usePlaintext() {
      throw new UnsupportedOperationException();
    }
  }

  MockitoAnnotations.initMocks(this);

  // Stub the collaborators the channel consults.
  when(mockTransportFactory.getScheduledExecutorService())
      .thenReturn(timer.getScheduledExecutorService());
  when(mockNameResolverFactory.newNameResolver(any(URI.class), any(Attributes.class)))
      .thenReturn(mockNameResolver);
  when(mockNameResolver.getServiceAuthority()).thenReturn(AUTHORITY);
  when(mockLoadBalancerFactory.newLoadBalancer(any(Helper.class)))
      .thenReturn(mockLoadBalancer);

  Builder channelBuilder = new Builder("fake://target")
      .nameResolverFactory(mockNameResolverFactory)
      .loadBalancerFactory(mockLoadBalancerFactory)
      .idleTimeout(IDLE_TIMEOUT_SECONDS, TimeUnit.SECONDS)
      .userAgent(USER_AGENT);
  channelBuilder.executorPool = executorPool;

  channel = new ManagedChannelImpl(
      channelBuilder,
      mockTransportFactory,
      new FakeBackoffPolicyProvider(),
      oobExecutorPool,
      timer.getStopwatchSupplier(),
      Collections.<ClientInterceptor>emptyList(),
      TimeProvider.SYSTEM_TIME_PROVIDER);
  newTransports = TestUtils.captureTransports(mockTransportFactory);

  // Two address groups, each holding two fake addresses.
  for (int group = 0; group < 2; group++) {
    List<SocketAddress> groupAddresses = new ArrayList<>();
    for (int server = 0; server < 2; server++) {
      groupAddresses.add(new FakeSocketAddress("servergroup" + group + "server" + server));
    }
    servers.add(new EquivalentAddressGroup(groupAddresses));
  }

  verify(mockNameResolverFactory).newNameResolver(any(URI.class), any(Attributes.class));

  // Verify the initial idleness
  verify(mockLoadBalancerFactory, never()).newLoadBalancer(any(Helper.class));
  verify(mockTransportFactory, never()).newClientTransport(
      any(SocketAddress.class),
      any(ClientTransportFactory.ClientTransportOptions.class));
  verify(mockNameResolver, never()).start(any(NameResolver.Listener.class));
}
/**
 * Post-test sanity check: no task still pending on the timer may be an enabled
 * {@link Rescheduler}, and the app executor must have no pending tasks.
 */
@After
public void allPendingTasksAreRun() {
  Collection<ScheduledTask> leftoverTimerTasks = timer.getPendingTasks();
  for (ScheduledTask leftover : leftoverTimerTasks) {
    boolean stillEnabled = Rescheduler.isEnabled(leftover.command);
    assertFalse(stillEnabled);
  }

  String failureMessage = executor.getPendingTasks() + " should be empty";
  assertEquals(failureMessage, 0, executor.numPendingTasks());
}
/**
 * Starting a call must create the load balancer and start the name resolver, and addresses
 * delivered to the resolver listener must be forwarded to the load balancer.
 */
@Test
public void newCallExitsIdleness() throws Exception {
  channel.newCall(method, CallOptions.DEFAULT).start(mockCallListener, new Metadata());

  verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class));
  verify(mockNameResolver).start(nameResolverListenerCaptor.capture());

  // Simulate new address resolved to make sure the LoadBalancer is correctly linked to
  // the NameResolver.
  NameResolver.Listener resolverListener = nameResolverListenerCaptor.getValue();
  resolverListener.onAddresses(servers, Attributes.EMPTY);
  verify(mockLoadBalancer).handleResolvedAddressGroups(servers, Attributes.EMPTY);
}
/**
 * A second call issued shortly before the idle timeout must restart the countdown: the load
 * balancer is shut down only once a full timeout has elapsed since that second call.
 */
@Test
public void newCallRefreshesIdlenessTimer() throws Exception {
  final long justBeforeTimeout = IDLE_TIMEOUT_SECONDS - 1;

  // First call to exit the initial idleness, then immediately cancel the call.
  ClientCall<String, Integer> firstCall = channel.newCall(method, CallOptions.DEFAULT);
  firstCall.start(mockCallListener, new Metadata());
  firstCall.cancel("For testing", null);

  // Verify that we have exited the idle mode
  verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class));
  assertFalse(channel.inUseStateAggregator.isInUse());

  // Move closer to idleness, but not yet.
  timer.forwardTime(justBeforeTimeout, TimeUnit.SECONDS);
  verify(mockLoadBalancer, never()).shutdown();
  assertFalse(channel.inUseStateAggregator.isInUse());

  // A new call would refresh the timer
  ClientCall<String, Integer> secondCall = channel.newCall(method, CallOptions.DEFAULT);
  secondCall.start(mockCallListener, new Metadata());
  secondCall.cancel("For testing", null);
  assertFalse(channel.inUseStateAggregator.isInUse());

  // ... so that passing the same length of time will not trigger idle mode
  timer.forwardTime(justBeforeTimeout, TimeUnit.SECONDS);
  verify(mockLoadBalancer, never()).shutdown();
  assertFalse(channel.inUseStateAggregator.isInUse());

  // ... until the time since last call has reached the timeout
  timer.forwardTime(1, TimeUnit.SECONDS);
  verify(mockLoadBalancer).shutdown();
  assertFalse(channel.inUseStateAggregator.isInUse());

  // Drain the app executor, which runs the call listeners
  verify(mockCallListener, never()).onClose(any(Status.class), any(Metadata.class));
  int listenerTasksRun = executor.runDueTasks();
  assertEquals(2, listenerTasksRun);
  verify(mockCallListener, times(2)).onClose(any(Status.class), any(Metadata.class));
}
/**
 * While a started call is still pending the channel stays in-use past the idle timeout;
 * after the call is cancelled the channel goes idle one full timeout later.
 */
@Test
public void delayedTransportHoldsOffIdleness() throws Exception {
  ClientCall<String, Integer> pendingCall = channel.newCall(method, CallOptions.DEFAULT);
  pendingCall.start(mockCallListener, new Metadata());
  assertTrue(channel.inUseStateAggregator.isInUse());

  // As long as the delayed transport is in-use (by the pending RPC), the channel won't go idle.
  timer.forwardTime(IDLE_TIMEOUT_SECONDS * 2, TimeUnit.SECONDS);
  assertTrue(channel.inUseStateAggregator.isInUse());

  // Cancelling the only RPC will reset the in-use state.
  assertEquals(0, executor.numPendingTasks());
  pendingCall.cancel("In test", null);
  int tasksRunAfterCancel = executor.runDueTasks();
  assertEquals(1, tasksRunAfterCancel);
  assertFalse(channel.inUseStateAggregator.isInUse());

  // And allow the channel to go idle.
  timer.forwardTime(IDLE_TIMEOUT_SECONDS - 1, TimeUnit.SECONDS);
  verify(mockLoadBalancer, never()).shutdown();
  timer.forwardTime(1, TimeUnit.SECONDS);
  verify(mockLoadBalancer).shutdown();
}
/**
 * A real transport that reports itself in-use keeps the channel out of idle mode; once the
 * transport reports not-in-use, the channel goes idle after a full idle timeout.
 */
@Test
public void realTransportsHoldsOffIdleness() throws Exception {
  final EquivalentAddressGroup addressGroup = servers.get(1);

  // Start a call, which goes to delayed transport
  ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
  call.start(mockCallListener, new Metadata());

  // Verify that we have exited the idle mode
  // Pass the real class token rather than null to ArgumentCaptor.forClass.
  ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class);
  verify(mockLoadBalancerFactory).newLoadBalancer(helperCaptor.capture());
  Helper helper = helperCaptor.getValue();
  assertTrue(channel.inUseStateAggregator.isInUse());

  // Assume LoadBalancer has received an address, then create a subchannel.
  Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
  subchannel.requestConnection();
  MockClientTransportInfo t0 = newTransports.poll();
  // Fail with a readable message instead of a NullPointerException if nothing was created.
  assertNotNull("requestConnection() should have created a transport", t0);
  t0.listener.transportReady();

  SubchannelPicker mockPicker = mock(SubchannelPicker.class);
  when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
      .thenReturn(PickResult.withSubchannel(subchannel));
  helper.updateBalancingState(READY, mockPicker);
  // Delayed transport creates real streams in the app executor
  executor.runDueTasks();

  // Delayed transport exits in-use, while real transport has not entered in-use yet.
  assertFalse(channel.inUseStateAggregator.isInUse());

  // Now it's in-use
  t0.listener.transportInUse(true);
  assertTrue(channel.inUseStateAggregator.isInUse());

  // As long as the transport is in-use, the channel won't go idle.
  timer.forwardTime(IDLE_TIMEOUT_SECONDS * 2, TimeUnit.SECONDS);
  assertTrue(channel.inUseStateAggregator.isInUse());

  t0.listener.transportInUse(false);
  assertFalse(channel.inUseStateAggregator.isInUse());

  // And allow the channel to go idle.
  timer.forwardTime(IDLE_TIMEOUT_SECONDS - 1, TimeUnit.SECONDS);
  verify(mockLoadBalancer, never()).shutdown();
  timer.forwardTime(1, TimeUnit.SECONDS);
  verify(mockLoadBalancer).shutdown();
}
/**
 * After a subchannel's addresses are replaced with a different address group, requesting a
 * connection must create a new transport.
 */
@Test
public void updateSubchannelAddresses_newAddressConnects() {
  ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
  call.start(mockCallListener, new Metadata()); // Create LB

  // Pass the real class token rather than null to ArgumentCaptor.forClass.
  ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class);
  verify(mockLoadBalancerFactory).newLoadBalancer(helperCaptor.capture());
  Helper helper = helperCaptor.getValue();

  Subchannel subchannel = helper.createSubchannel(servers.get(0), Attributes.EMPTY);
  subchannel.requestConnection();
  MockClientTransportInfo t0 = newTransports.poll();
  assertNotNull("requestConnection() should have created the initial transport", t0);
  t0.listener.transportReady();

  helper.updateSubchannelAddresses(subchannel, servers.get(1));
  subchannel.requestConnection();
  MockClientTransportInfo t1 = newTransports.poll();
  // This is the actual expectation of the test; make it an explicit assertion rather than
  // relying on a NullPointerException from the dereference below.
  assertNotNull("a new transport should be created for the updated address group", t1);
  t1.listener.transportReady();
}
/**
 * When the updated address group still contains the subchannel's original addresses (plus one
 * extra), requesting a connection must not create another transport.
 */
@Test
public void updateSubchannelAddresses_existingAddressDoesNotConnect() {
  ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
  call.start(mockCallListener, new Metadata()); // Create LB

  // Pass the real class token rather than null to ArgumentCaptor.forClass.
  ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class);
  verify(mockLoadBalancerFactory).newLoadBalancer(helperCaptor.capture());
  Helper helper = helperCaptor.getValue();

  Subchannel subchannel = helper.createSubchannel(servers.get(0), Attributes.EMPTY);
  subchannel.requestConnection();
  MockClientTransportInfo t0 = newTransports.poll();
  // Fail with a readable message instead of a NullPointerException if nothing was created.
  assertNotNull("requestConnection() should have created the initial transport", t0);
  t0.listener.transportReady();

  // Same addresses as before, with one additional address appended.
  List<SocketAddress> changedList = new ArrayList<>(servers.get(0).getAddresses());
  changedList.add(new FakeSocketAddress("aDifferentServer"));
  helper.updateSubchannelAddresses(subchannel, new EquivalentAddressGroup(changedList));

  subchannel.requestConnection();
  assertNull(newTransports.poll());
}
/**
 * A transport belonging to an out-of-band channel may report itself in-use without preventing
 * the parent channel from going idle after the idle timeout.
 */
@Test
public void oobTransportDoesNotAffectIdleness() {
  // Start a call, which goes to delayed transport
  ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
  call.start(mockCallListener, new Metadata());

  // Verify that we have exited the idle mode
  // Pass the real class token rather than null to ArgumentCaptor.forClass.
  ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class);
  verify(mockLoadBalancerFactory).newLoadBalancer(helperCaptor.capture());
  Helper helper = helperCaptor.getValue();

  // Fail the RPC
  SubchannelPicker failingPicker = mock(SubchannelPicker.class);
  when(failingPicker.pickSubchannel(any(PickSubchannelArgs.class)))
      .thenReturn(PickResult.withError(Status.UNAVAILABLE));
  helper.updateBalancingState(TRANSIENT_FAILURE, failingPicker);
  executor.runDueTasks();
  verify(mockCallListener).onClose(same(Status.UNAVAILABLE), any(Metadata.class));

  // ... so that the channel resets its in-use state
  assertFalse(channel.inUseStateAggregator.isInUse());

  // Now make an RPC on an OOB channel
  ManagedChannel oob = helper.createOobChannel(servers.get(0), "oobauthority");
  // Creating the OOB channel alone must not open a transport yet.
  verify(mockTransportFactory, never())
      .newClientTransport(
          any(SocketAddress.class),
          eq(new ClientTransportFactory.ClientTransportOptions()
              .setAuthority("oobauthority")
              .setUserAgent(USER_AGENT)));
  ClientCall<String, Integer> oobCall = oob.newCall(method, CallOptions.DEFAULT);
  oobCall.start(mockCallListener2, new Metadata());
  verify(mockTransportFactory)
      .newClientTransport(
          any(SocketAddress.class),
          eq(new ClientTransportFactory.ClientTransportOptions()
              .setAuthority("oobauthority")
              .setUserAgent(USER_AGENT)));
  MockClientTransportInfo oobTransportInfo = newTransports.poll();
  // Fail with a readable message instead of a NullPointerException if nothing was captured.
  assertNotNull("the OOB call should have created a transport", oobTransportInfo);
  assertEquals(0, newTransports.size());

  // The OOB transport reports in-use state
  oobTransportInfo.listener.transportInUse(true);

  // But it won't stop the channel from going idle
  verify(mockLoadBalancer, never()).shutdown();
  timer.forwardTime(IDLE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
  verify(mockLoadBalancer).shutdown();
}
/**
 * After an OOB channel's addresses are replaced with a different address group, the next call
 * on that channel must create a new transport.
 */
@Test
public void updateOobChannelAddresses_newAddressConnects() {
  ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
  call.start(mockCallListener, new Metadata()); // Create LB

  // Pass the real class token rather than null to ArgumentCaptor.forClass.
  ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class);
  verify(mockLoadBalancerFactory).newLoadBalancer(helperCaptor.capture());
  Helper helper = helperCaptor.getValue();

  ManagedChannel oobChannel = helper.createOobChannel(servers.get(0), "localhost");
  oobChannel.newCall(method, CallOptions.DEFAULT).start(mockCallListener, new Metadata());
  MockClientTransportInfo t0 = newTransports.poll();
  assertNotNull("the first OOB call should have created a transport", t0);
  t0.listener.transportReady();

  helper.updateOobChannelAddresses(oobChannel, servers.get(1));
  oobChannel.newCall(method, CallOptions.DEFAULT).start(mockCallListener, new Metadata());
  MockClientTransportInfo t1 = newTransports.poll();
  // This is the actual expectation of the test; make it an explicit assertion rather than
  // relying on a NullPointerException from the dereference below.
  assertNotNull("a new transport should be created for the updated address group", t1);
  t1.listener.transportReady();
}
/**
 * When the updated address group still contains the OOB channel's original addresses (plus
 * one extra), a further call on that channel must not create another transport.
 */
@Test
public void updateOobChannelAddresses_existingAddressDoesNotConnect() {
  ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
  call.start(mockCallListener, new Metadata()); // Create LB

  // Pass the real class token rather than null to ArgumentCaptor.forClass.
  ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class);
  verify(mockLoadBalancerFactory).newLoadBalancer(helperCaptor.capture());
  Helper helper = helperCaptor.getValue();

  ManagedChannel oobChannel = helper.createOobChannel(servers.get(0), "localhost");
  oobChannel.newCall(method, CallOptions.DEFAULT).start(mockCallListener, new Metadata());
  MockClientTransportInfo t0 = newTransports.poll();
  // Fail with a readable message instead of a NullPointerException if nothing was created.
  assertNotNull("the first OOB call should have created a transport", t0);
  t0.listener.transportReady();

  // Same addresses as before, with one additional address appended.
  List<SocketAddress> changedList = new ArrayList<>(servers.get(0).getAddresses());
  changedList.add(new FakeSocketAddress("aDifferentServer"));
  helper.updateOobChannelAddresses(oobChannel, new EquivalentAddressGroup(changedList));

  oobChannel.newCall(method, CallOptions.DEFAULT).start(mockCallListener, new Metadata());
  assertNull(newTransports.poll());
}
/**
 * Backoff policy provider for tests: every call to {@link #get()} hands out a new policy whose
 * backoff is always 1 nanosecond.
 */
private static class FakeBackoffPolicyProvider implements BackoffPolicy.Provider {
  @Override
  public BackoffPolicy get() {
    BackoffPolicy constantPolicy = new BackoffPolicy() {
      @Override
      public long nextBackoffNanos() {
        return 1;
      }
    };
    return constantPolicy;
  }
}
/**
 * Named stand-in for a socket address. It does not override {@code equals}/{@code hashCode};
 * the name only shows up in {@link #toString()}.
 */
private static class FakeSocketAddress extends SocketAddress {
  final String name;

  FakeSocketAddress(String name) {
    this.name = name;
  }

  /** Returns the name prefixed with {@code "FakeSocketAddress-"}. */
  @Override
  public String toString() {
    return new StringBuilder("FakeSocketAddress-").append(name).toString();
  }
}
}