blob: d0b05112baecb79b48c80e1c4f3a927256f611f5 [file] [log] [blame]
/*
* Copyright 2021 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat;
import static io.grpc.xds.XdsServerWrapper.ATTR_SERVER_ROUTING_CONFIG;
import static io.grpc.xds.XdsServerWrapper.RETRY_DELAY_NANOS;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.withSettings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Attributes;
import io.grpc.InsecureChannelCredentials;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.internal.FakeClock;
import io.grpc.testing.TestMethodDescriptors;
import io.grpc.xds.EnvoyServerProtoData.FilterChain;
import io.grpc.xds.Filter.FilterConfig;
import io.grpc.xds.Filter.NamedFilterConfig;
import io.grpc.xds.Filter.ServerInterceptorBuilder;
import io.grpc.xds.FilterChainMatchingProtocolNegotiators.FilterChainMatchingHandler.FilterChainSelector;
import io.grpc.xds.VirtualHost.Route;
import io.grpc.xds.VirtualHost.Route.RouteMatch;
import io.grpc.xds.VirtualHost.Route.RouteMatch.PathMatcher;
import io.grpc.xds.XdsClient.LdsResourceWatcher;
import io.grpc.xds.XdsClient.RdsResourceWatcher;
import io.grpc.xds.XdsClient.RdsUpdate;
import io.grpc.xds.XdsServerBuilder.XdsServingStatusListener;
import io.grpc.xds.XdsServerTestHelper.FakeXdsClient;
import io.grpc.xds.XdsServerTestHelper.FakeXdsClientPoolFactory;
import io.grpc.xds.XdsServerWrapper.ConfigApplyingInterceptor;
import io.grpc.xds.XdsServerWrapper.ServerRoutingConfig;
import io.grpc.xds.internal.Matchers.HeaderMatcher;
import io.grpc.xds.internal.security.CommonTlsContextTestsUtil;
import io.grpc.xds.internal.security.SslContextProviderSupplier;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
@RunWith(JUnit4.class)
public class XdsServerWrapperTest {
private static final int START_WAIT_AFTER_LISTENER_MILLIS = 100;
@Rule
public final MockitoRule mocks = MockitoJUnit.rule();
@Mock
private ServerBuilder<?> mockBuilder;
@Mock
private Server mockServer;
@Mock
private XdsServingStatusListener listener;
private FilterChainSelectorManager selectorManager = new FilterChainSelectorManager();
private FakeClock executor = new FakeClock();
private FakeXdsClient xdsClient = new FakeXdsClient();
private FilterRegistry filterRegistry = FilterRegistry.getDefaultRegistry();
private XdsServerWrapper xdsServerWrapper;
private ServerRoutingConfig noopConfig = ServerRoutingConfig.create(
ImmutableList.<VirtualHost>of(), ImmutableMap.<Route, ServerInterceptor>of());
@Before
public void setup() {
when(mockBuilder.build()).thenReturn(mockServer);
xdsServerWrapper = new XdsServerWrapper("0.0.0.0:1", mockBuilder, listener,
selectorManager, new FakeXdsClientPoolFactory(xdsClient),
filterRegistry, executor.getScheduledExecutorService());
}
@After
public void tearDown() {
xdsServerWrapper.shutdownNow();
}
@Test
public void testBootstrap_notV3() throws Exception {
Bootstrapper.BootstrapInfo b =
Bootstrapper.BootstrapInfo.builder()
.servers(Arrays.asList(
Bootstrapper.ServerInfo.create("uri", InsecureChannelCredentials.create(), false)))
.node(EnvoyProtoData.Node.newBuilder().setId("id").build())
.serverListenerResourceNameTemplate("grpc/server?udpa.resource.listening_address=%s")
.build();
verifyBootstrapFail(b);
}
@Test
public void testBootstrap_noTemplate() throws Exception {
Bootstrapper.BootstrapInfo b =
Bootstrapper.BootstrapInfo.builder()
.servers(Arrays.asList(
Bootstrapper.ServerInfo.create("uri", InsecureChannelCredentials.create(), true)))
.node(EnvoyProtoData.Node.newBuilder().setId("id").build())
.build();
verifyBootstrapFail(b);
}
private void verifyBootstrapFail(Bootstrapper.BootstrapInfo b) throws Exception {
XdsClient xdsClient = mock(XdsClient.class);
when(xdsClient.getBootstrapInfo()).thenReturn(b);
xdsServerWrapper = new XdsServerWrapper("0.0.0.0:1", mockBuilder, listener,
selectorManager, new FakeXdsClientPoolFactory(xdsClient), filterRegistry);
final SettableFuture<Server> start = SettableFuture.create();
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
public void run() {
try {
start.set(xdsServerWrapper.start());
} catch (Exception ex) {
start.setException(ex);
}
}
});
try {
start.get(5000, TimeUnit.MILLISECONDS);
fail("Start should throw exception");
} catch (ExecutionException ex) {
assertThat(ex.getCause()).isInstanceOf(IOException.class);
Throwable cause = ex.getCause().getCause();
assertThat(cause).isInstanceOf(StatusException.class);
assertThat(((StatusException)cause).getStatus().getCode())
.isEqualTo(Status.UNAVAILABLE.getCode());
}
}
@Test
public void testBootstrap_templateWithXdstp() throws Exception {
Bootstrapper.BootstrapInfo b = Bootstrapper.BootstrapInfo.builder()
.servers(Arrays.asList(
Bootstrapper.ServerInfo.create(
"uri", InsecureChannelCredentials.create(), true)))
.node(EnvoyProtoData.Node.newBuilder().setId("id").build())
.serverListenerResourceNameTemplate(
"xdstp://xds.authority.com/envoy.config.listener.v3.Listener/grpc/server/%s")
.build();
XdsClient xdsClient = mock(XdsClient.class);
when(xdsClient.getBootstrapInfo()).thenReturn(b);
xdsServerWrapper = new XdsServerWrapper("[::FFFF:129.144.52.38]:80", mockBuilder, listener,
selectorManager, new FakeXdsClientPoolFactory(xdsClient), filterRegistry);
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
public void run() {
try {
xdsServerWrapper.start();
} catch (IOException ex) {
// ignore
}
}
});
verify(xdsClient, timeout(5000)).watchLdsResource(
eq("xdstp://xds.authority.com/envoy.config.listener.v3.Listener/grpc/server/"
+ "%5B::FFFF:129.144.52.38%5D:80"),
any(LdsResourceWatcher.class));
}
@Test
public void shutdown() throws Exception {
final SettableFuture<Server> start = SettableFuture.create();
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
public void run() {
try {
start.set(xdsServerWrapper.start());
} catch (Exception ex) {
start.setException(ex);
}
}
});
String ldsWatched = xdsClient.ldsResource.get(5, TimeUnit.SECONDS);
assertThat(ldsWatched).isEqualTo("grpc/server?udpa.resource.listening_address=0.0.0.0:1");
HttpConnectionManager hcm_virtual = HttpConnectionManager.forVirtualHosts(
0L, Collections.singletonList(createVirtualHost("virtual-host-0")),
new ArrayList<NamedFilterConfig>());
FilterChain f0 = createFilterChain("filter-chain-0", hcm_virtual);
FilterChain f1 = createFilterChain("filter-chain-1", createRds("rds"));
xdsClient.deliverLdsUpdate(Collections.singletonList(f0), f1);
xdsClient.rdsCount.await(5, TimeUnit.SECONDS);
xdsClient.deliverRdsUpdate("rds",
Collections.singletonList(createVirtualHost("virtual-host-1")));
verify(listener, timeout(5000)).onServing();
start.get(START_WAIT_AFTER_LISTENER_MILLIS, TimeUnit.MILLISECONDS);
verify(mockServer).start();
xdsServerWrapper.shutdown();
assertThat(xdsServerWrapper.isShutdown()).isTrue();
assertThat(xdsClient.ldsResource).isNull();
assertThat(xdsClient.shutdown).isTrue();
verify(mockServer).shutdown();
assertThat(f0.sslContextProviderSupplier().isShutdown()).isTrue();
assertThat(f1.sslContextProviderSupplier().isShutdown()).isTrue();
when(mockServer.isTerminated()).thenReturn(true);
when(mockServer.awaitTermination(anyLong(), any(TimeUnit.class))).thenReturn(true);
assertThat(xdsServerWrapper.awaitTermination(5, TimeUnit.SECONDS)).isTrue();
xdsServerWrapper.awaitTermination();
assertThat(xdsServerWrapper.isTerminated()).isTrue();
assertThat(start.get()).isSameInstanceAs(xdsServerWrapper);
}
@Test
public void shutdown_inflight() throws Exception {
final SettableFuture<Server> start = SettableFuture.create();
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
public void run() {
try {
start.set(xdsServerWrapper.start());
} catch (Exception ex) {
start.setException(ex);
}
}
});
String ldsWatched = xdsClient.ldsResource.get(5, TimeUnit.SECONDS);
assertThat(ldsWatched).isEqualTo("grpc/server?udpa.resource.listening_address=0.0.0.0:1");
HttpConnectionManager hcmVirtual = HttpConnectionManager.forVirtualHosts(
0L, Collections.singletonList(createVirtualHost("virtual-host-0")),
new ArrayList<NamedFilterConfig>());
FilterChain f0 = createFilterChain("filter-chain-0", createRds("rds"));
FilterChain f1 = createFilterChain("filter-chain-1", hcmVirtual);
xdsClient.deliverLdsUpdate(Collections.singletonList(f0), f1);
xdsServerWrapper.shutdown();
when(mockServer.isTerminated()).thenReturn(true);
when(mockServer.awaitTermination(anyLong(), any(TimeUnit.class))).thenReturn(true);
assertThat(xdsServerWrapper.awaitTermination(5, TimeUnit.SECONDS)).isTrue();
xdsServerWrapper.awaitTermination();
assertThat(xdsServerWrapper.isTerminated()).isTrue();
verify(mockServer, never()).start();
assertThat(xdsServerWrapper.isShutdown()).isTrue();
assertThat(xdsClient.ldsResource).isNull();
assertThat(xdsClient.shutdown).isTrue();
verify(mockServer).shutdown();
assertThat(f0.sslContextProviderSupplier().isShutdown()).isTrue();
assertThat(f1.sslContextProviderSupplier().isShutdown()).isTrue();
assertThat(start.isDone()).isFalse(); //shall we set initialStatus when shutdown?
}
@Test
public void shutdown_afterResourceNotExist() throws Exception {
final SettableFuture<Server> start = SettableFuture.create();
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
public void run() {
try {
start.set(xdsServerWrapper.start());
} catch (Exception ex) {
start.setException(ex);
}
}
});
String ldsResource = xdsClient.ldsResource.get(5, TimeUnit.SECONDS);
xdsClient.ldsWatcher.onResourceDoesNotExist(ldsResource);
verify(listener, timeout(5000)).onNotServing(any());
try {
start.get(START_WAIT_AFTER_LISTENER_MILLIS, TimeUnit.MILLISECONDS);
fail("server should not start() successfully.");
} catch (TimeoutException ex) {
// expect to block here.
assertThat(start.isDone()).isFalse();
}
verify(mockBuilder, times(1)).build();
verify(mockServer, never()).start();
verify(mockServer).shutdown();
when(mockServer.isShutdown()).thenReturn(true);
when(mockServer.isTerminated()).thenReturn(true);
verify(listener, times(1)).onNotServing(any(Throwable.class));
xdsServerWrapper.shutdown();
assertThat(xdsServerWrapper.isShutdown()).isTrue();
assertThat(xdsClient.ldsResource).isNull();
assertThat(xdsClient.shutdown).isTrue();
verify(mockBuilder, times(1)).build();
verify(mockServer, times(1)).shutdown();
xdsServerWrapper.awaitTermination(1, TimeUnit.SECONDS);
assertThat(xdsServerWrapper.isTerminated()).isTrue();
}
@Test
public void shutdown_pendingRetry() throws Exception {
final SettableFuture<Server> start = SettableFuture.create();
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
public void run() {
try {
start.set(xdsServerWrapper.start());
} catch (Exception ex) {
start.setException(ex);
}
}
});
xdsClient.ldsResource.get(5, TimeUnit.SECONDS);
when(mockServer.start()).thenThrow(new IOException("error!"));
FilterChain filterChain = createFilterChain("filter-chain-1", createRds("rds"));
SslContextProviderSupplier sslSupplier = filterChain.sslContextProviderSupplier();
xdsClient.deliverLdsUpdate(Collections.singletonList(filterChain), null);
xdsClient.rdsCount.await(5, TimeUnit.SECONDS);
xdsClient.deliverRdsUpdate("rds",
Collections.singletonList(createVirtualHost("virtual-host-1")));
try {
start.get(5000, TimeUnit.MILLISECONDS);
fail("Start should throw exception");
} catch (ExecutionException ex) {
assertThat(ex.getCause()).isInstanceOf(IOException.class);
}
assertThat(executor.getPendingTasks().size()).isEqualTo(1);
verify(mockServer).start();
verify(mockServer, never()).shutdown();
xdsServerWrapper.shutdown();
verify(mockServer).shutdown();
when(mockServer.isTerminated()).thenReturn(true);
assertThat(sslSupplier.isShutdown()).isTrue();
assertThat(executor.getPendingTasks().size()).isEqualTo(0);
verify(listener, never()).onNotServing(any(Throwable.class));
verify(listener, never()).onServing();
xdsServerWrapper.awaitTermination();
assertThat(xdsServerWrapper.isTerminated()).isTrue();
}
@Test
public void shutdownNow_startThreadShouldNotLeak() throws Exception {
final SettableFuture<Server> start = SettableFuture.create();
Executors.newSingleThreadExecutor()
.execute(
new Runnable() {
@Override
public void run() {
try {
start.set(xdsServerWrapper.start());
} catch (Exception ex) {
start.setException(ex);
}
}
});
assertThat(xdsClient.ldsResource.get(5, TimeUnit.SECONDS))
.isEqualTo("grpc/server?udpa.resource.listening_address=0.0.0.0:1");
xdsServerWrapper.shutdownNow();
try {
start.get(5, TimeUnit.SECONDS);
fail("should have thrown but not");
} catch (ExecutionException ex) {
assertThat(ex).hasCauseThat().isInstanceOf(IOException.class);
assertThat(ex).hasCauseThat().hasMessageThat().isEqualTo("server is forcefully shut down");
}
}
@Test
public void initialStartIoException() throws Exception {
final SettableFuture<Server> start = SettableFuture.create();
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
public void run() {
try {
start.set(xdsServerWrapper.start());
} catch (Exception ex) {
start.setException(ex);
}
}
});
when(mockServer.start()).thenThrow(new IOException("error!"));
xdsClient.ldsResource.get(5, TimeUnit.SECONDS);
FilterChain filterChain = createFilterChain("filter-chain-1", createRds("rds"));
xdsClient.deliverLdsUpdate(Collections.singletonList(filterChain), null);
xdsClient.rdsCount.await(5, TimeUnit.SECONDS);
xdsClient.deliverRdsUpdate("rds",
Collections.singletonList(createVirtualHost("virtual-host-1")));
try {
start.get(5000, TimeUnit.MILLISECONDS);
fail("Start should throw exception");
} catch (ExecutionException ex) {
assertThat(ex.getCause()).isInstanceOf(IOException.class);
assertThat(ex.getCause().getMessage()).isEqualTo("error!");
}
}
@Test
public void discoverState_virtualhost() throws Exception {
final SettableFuture<Server> start = SettableFuture.create();
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
public void run() {
try {
start.set(xdsServerWrapper.start());
} catch (Exception ex) {
start.setException(ex);
}
}
});
String ldsWatched = xdsClient.ldsResource.get(5, TimeUnit.SECONDS);
assertThat(ldsWatched).isEqualTo("grpc/server?udpa.resource.listening_address=0.0.0.0:1");
VirtualHost virtualHost =
VirtualHost.create(
"virtual-host", Collections.singletonList("auth"), new ArrayList<Route>(),
ImmutableMap.<String, FilterConfig>of());
HttpConnectionManager httpConnectionManager = HttpConnectionManager.forVirtualHosts(
0L, Collections.singletonList(virtualHost), new ArrayList<NamedFilterConfig>());
EnvoyServerProtoData.FilterChain filterChain = EnvoyServerProtoData.FilterChain.create(
"filter-chain-foo", createMatch(), httpConnectionManager, createTls(),
mock(TlsContextManager.class));
xdsClient.deliverLdsUpdate(Collections.singletonList(filterChain), null);
start.get(5000, TimeUnit.MILLISECONDS);
assertThat(ldsWatched).isEqualTo("grpc/server?udpa.resource.listening_address=0.0.0.0:1");
assertThat(selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().size())
.isEqualTo(1);
ServerRoutingConfig realConfig =
selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().get(filterChain).get();
assertThat(realConfig.virtualHosts()).isEqualTo(httpConnectionManager.virtualHosts());
assertThat(realConfig.interceptors()).isEqualTo(ImmutableMap.of());
verify(listener).onServing();
verify(mockServer).start();
}
@Test
public void discoverState_restart_afterResourceNotExist() throws Exception {
final SettableFuture<Server> start = SettableFuture.create();
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
public void run() {
try {
start.set(xdsServerWrapper.start());
} catch (Exception ex) {
start.setException(ex);
}
}
});
String ldsResource = xdsClient.ldsResource.get(5, TimeUnit.SECONDS);
assertThat(ldsResource).isEqualTo("grpc/server?udpa.resource.listening_address=0.0.0.0:1");
VirtualHost virtualHost =
VirtualHost.create(
"virtual-host", Collections.singletonList("auth"), new ArrayList<Route>(),
ImmutableMap.<String, FilterConfig>of());
HttpConnectionManager httpConnectionManager = HttpConnectionManager.forVirtualHosts(
0L, Collections.singletonList(virtualHost), new ArrayList<NamedFilterConfig>());
EnvoyServerProtoData.FilterChain filterChain = EnvoyServerProtoData.FilterChain.create(
"filter-chain-foo", createMatch(), httpConnectionManager, createTls(),
mock(TlsContextManager.class));
xdsClient.deliverLdsUpdate(Collections.singletonList(filterChain), null);
start.get(5000, TimeUnit.MILLISECONDS);
verify(listener).onServing();
verify(mockServer).start();
// server shutdown after resourceDoesNotExist
xdsClient.ldsWatcher.onResourceDoesNotExist(ldsResource);
verify(mockServer).shutdown();
// re-deliver lds resource
reset(mockServer);
reset(listener);
xdsClient.deliverLdsUpdate(Collections.singletonList(filterChain), null);
verify(listener).onServing();
verify(mockServer).start();
}
@Test
public void discoverState_rds() throws Exception {
final SettableFuture<Server> start = SettableFuture.create();
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
public void run() {
try {
start.set(xdsServerWrapper.start());
} catch (Exception ex) {
start.setException(ex);
}
}
});
String ldsWatched = xdsClient.ldsResource.get(5, TimeUnit.SECONDS);
assertThat(ldsWatched).isEqualTo("grpc/server?udpa.resource.listening_address=0.0.0.0:1");
VirtualHost virtualHost = createVirtualHost("virtual-host-0");
HttpConnectionManager hcmVirtual = HttpConnectionManager.forVirtualHosts(
0L, Collections.singletonList(virtualHost), new ArrayList<NamedFilterConfig>());
EnvoyServerProtoData.FilterChain f0 = createFilterChain("filter-chain-0", hcmVirtual);
EnvoyServerProtoData.FilterChain f1 = createFilterChain("filter-chain-1", createRds("r0"));
xdsClient.rdsCount = new CountDownLatch(3);
xdsClient.deliverLdsUpdate(Arrays.asList(f0, f1), null);
assertThat(start.isDone()).isFalse();
assertThat(selectorManager.getSelectorToUpdateSelector()).isNull();
verify(mockServer, never()).start();
verify(listener, never()).onServing();
EnvoyServerProtoData.FilterChain f2 = createFilterChain("filter-chain-2", createRds("r1"));
EnvoyServerProtoData.FilterChain f3 = createFilterChain("filter-chain-3", createRds("r2"));
xdsClient.deliverLdsUpdate(Arrays.asList(f0, f2), f3);
verify(mockServer, never()).start();
verify(listener, never()).onServing();
xdsClient.rdsCount.await(5, TimeUnit.SECONDS);
xdsClient.deliverRdsUpdate("r1",
Collections.singletonList(createVirtualHost("virtual-host-1")));
verify(mockServer, never()).start();
xdsClient.deliverRdsUpdate("r2",
Collections.singletonList(createVirtualHost("virtual-host-2")));
start.get(5000, TimeUnit.MILLISECONDS);
verify(mockServer).start();
ServerRoutingConfig realConfig =
selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().get(f0).get();
assertThat(realConfig.virtualHosts()).isEqualTo(
Collections.singletonList(createVirtualHost("virtual-host-0")));
assertThat(realConfig.interceptors()).isEqualTo(ImmutableMap.of());
assertThat(selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().size())
.isEqualTo(2);
realConfig = selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().get(f2).get();
assertThat(realConfig.virtualHosts()).isEqualTo(
Collections.singletonList(createVirtualHost("virtual-host-1")));
assertThat(realConfig.interceptors()).isEqualTo(ImmutableMap.of());
realConfig = selectorManager.getSelectorToUpdateSelector().getDefaultRoutingConfig().get();
assertThat(realConfig.virtualHosts()).isEqualTo(
Collections.singletonList(createVirtualHost("virtual-host-2")));
assertThat(selectorManager.getSelectorToUpdateSelector().getDefaultSslContextProviderSupplier())
.isEqualTo(f3.sslContextProviderSupplier());
}
@Test
public void discoverState_oneRdsToMultipleFilterChain() throws Exception {
final SettableFuture<Server> start = SettableFuture.create();
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
public void run() {
try {
start.set(xdsServerWrapper.start());
} catch (Exception ex) {
start.setException(ex);
}
}
});
String ldsWatched = xdsClient.ldsResource.get(5, TimeUnit.SECONDS);
assertThat(ldsWatched).isEqualTo("grpc/server?udpa.resource.listening_address=0.0.0.0:1");
EnvoyServerProtoData.FilterChain f0 = createFilterChain("filter-chain-0", createRds("r0"));
EnvoyServerProtoData.FilterChain f1 = createFilterChain("filter-chain-1", createRds("r0"));
EnvoyServerProtoData.FilterChain f2 = createFilterChain("filter-chain-2", createRds("r0"));
xdsClient.rdsCount = new CountDownLatch(1);
xdsClient.deliverLdsUpdate(Arrays.asList(f0, f1), f2);
assertThat(start.isDone()).isFalse();
assertThat(selectorManager.getSelectorToUpdateSelector()).isNull();
xdsClient.rdsCount.await(5, TimeUnit.SECONDS);
xdsClient.deliverRdsUpdate("r0",
Collections.singletonList(createVirtualHost("virtual-host-0")));
start.get(5000, TimeUnit.MILLISECONDS);
verify(mockServer, times(1)).start();
ServerRoutingConfig realConfig =
selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().get(f0).get();
assertThat(realConfig.virtualHosts()).isEqualTo(
Collections.singletonList(createVirtualHost("virtual-host-0")));
assertThat(realConfig.interceptors()).isEqualTo(ImmutableMap.of());
realConfig = selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().get(f1).get();
assertThat(realConfig.virtualHosts()).isEqualTo(
Collections.singletonList(createVirtualHost("virtual-host-0")));
assertThat(realConfig.interceptors()).isEqualTo(ImmutableMap.of());
realConfig = selectorManager.getSelectorToUpdateSelector().getDefaultRoutingConfig().get();
assertThat(realConfig.virtualHosts()).isEqualTo(
Collections.singletonList(createVirtualHost("virtual-host-0")));
assertThat(realConfig.interceptors()).isEqualTo(ImmutableMap.of());
assertThat(selectorManager.getSelectorToUpdateSelector().getDefaultSslContextProviderSupplier())
.isSameInstanceAs(f2.sslContextProviderSupplier());
EnvoyServerProtoData.FilterChain f3 = createFilterChain("filter-chain-3", createRds("r0"));
EnvoyServerProtoData.FilterChain f4 = createFilterChain("filter-chain-4", createRds("r1"));
EnvoyServerProtoData.FilterChain f5 = createFilterChain("filter-chain-4", createRds("r1"));
xdsClient.rdsCount = new CountDownLatch(1);
xdsClient.deliverLdsUpdate(Arrays.asList(f5, f3), f4);
xdsClient.rdsCount.await(5, TimeUnit.SECONDS);
xdsClient.deliverRdsUpdate("r1",
Collections.singletonList(createVirtualHost("virtual-host-1")));
xdsClient.deliverRdsUpdate("r0",
Collections.singletonList(createVirtualHost("virtual-host-0")));
assertThat(selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().size())
.isEqualTo(2);
realConfig = selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().get(f5).get();
assertThat(realConfig.virtualHosts()).isEqualTo(
Collections.singletonList(createVirtualHost("virtual-host-1")));
assertThat(realConfig.interceptors()).isEqualTo(ImmutableMap.of());
realConfig = selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().get(f3).get();
assertThat(realConfig.virtualHosts()).isEqualTo(
Collections.singletonList(createVirtualHost("virtual-host-0")));
assertThat(realConfig.interceptors()).isEqualTo(ImmutableMap.of());
realConfig = selectorManager.getSelectorToUpdateSelector().getDefaultRoutingConfig().get();
assertThat(realConfig.virtualHosts()).isEqualTo(
Collections.singletonList(createVirtualHost("virtual-host-1")));
assertThat(realConfig.interceptors()).isEqualTo(ImmutableMap.of());
assertThat(selectorManager.getSelectorToUpdateSelector().getDefaultSslContextProviderSupplier())
.isSameInstanceAs(f4.sslContextProviderSupplier());
verify(mockServer, times(1)).start();
xdsServerWrapper.shutdown();
verify(mockServer, times(1)).shutdown();
when(mockServer.isTerminated()).thenReturn(true);
xdsServerWrapper.awaitTermination();
assertThat(xdsServerWrapper.isTerminated()).isTrue();
}
@Test
public void discoverState_rds_onError_and_resourceNotExist() throws Exception {
final SettableFuture<Server> start = SettableFuture.create();
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
public void run() {
try {
start.set(xdsServerWrapper.start());
} catch (Exception ex) {
start.setException(ex);
}
}
});
String ldsWatched = xdsClient.ldsResource.get(5, TimeUnit.SECONDS);
assertThat(ldsWatched).isEqualTo("grpc/server?udpa.resource.listening_address=0.0.0.0:1");
VirtualHost virtualHost = createVirtualHost("virtual-host-0");
HttpConnectionManager hcmVirtual = HttpConnectionManager.forVirtualHosts(
0L, Collections.singletonList(virtualHost), new ArrayList<NamedFilterConfig>());
EnvoyServerProtoData.FilterChain f0 = createFilterChain("filter-chain-0", hcmVirtual);
EnvoyServerProtoData.FilterChain f1 = createFilterChain("filter-chain-1", createRds("r0"));
xdsClient.deliverLdsUpdate(Arrays.asList(f0, f1), null);
xdsClient.rdsCount.await();
xdsClient.rdsWatchers.get("r0").onError(Status.CANCELLED);
start.get(5000, TimeUnit.MILLISECONDS);
assertThat(selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().size())
.isEqualTo(2);
ServerRoutingConfig realConfig =
selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().get(f1).get();
assertThat(realConfig.virtualHosts()).isEmpty();
assertThat(realConfig.interceptors()).isEmpty();
realConfig = selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().get(f0).get();
assertThat(realConfig.virtualHosts()).isEqualTo(hcmVirtual.virtualHosts());
assertThat(realConfig.interceptors()).isEqualTo(ImmutableMap.of());
xdsClient.deliverRdsUpdate("r0",
Collections.singletonList(createVirtualHost("virtual-host-1")));
realConfig = selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().get(f1).get();
assertThat(realConfig.virtualHosts()).isEqualTo(
Collections.singletonList(createVirtualHost("virtual-host-1")));
assertThat(realConfig.interceptors()).isEqualTo(ImmutableMap.of());
xdsClient.rdsWatchers.get("r0").onError(Status.CANCELLED);
realConfig = selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().get(f1).get();
assertThat(realConfig.virtualHosts()).isEqualTo(
Collections.singletonList(createVirtualHost("virtual-host-1")));
assertThat(realConfig.interceptors()).isEqualTo(ImmutableMap.of());
xdsClient.rdsWatchers.get("r0").onResourceDoesNotExist("r0");
realConfig = selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().get(f1).get();
assertThat(realConfig.virtualHosts()).isEmpty();
assertThat(realConfig.interceptors()).isEmpty();
}
@Test
public void error() throws Exception {
final SettableFuture<Server> start = SettableFuture.create();
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
public void run() {
try {
start.set(xdsServerWrapper.start());
} catch (Exception ex) {
start.setException(ex);
}
}
});
String ldsResource = xdsClient.ldsResource.get(5, TimeUnit.SECONDS);
xdsClient.ldsWatcher.onResourceDoesNotExist(ldsResource);
verify(listener, timeout(5000)).onNotServing(any());
try {
start.get(START_WAIT_AFTER_LISTENER_MILLIS, TimeUnit.MILLISECONDS);
fail("server should not start()");
} catch (TimeoutException ex) {
// expect to block here.
assertThat(start.isDone()).isFalse();
}
verify(listener, times(1)).onNotServing(any(StatusException.class));
verify(mockBuilder, times(1)).build();
FilterChain filterChain0 = createFilterChain("filter-chain-0", createRds("rds"));
SslContextProviderSupplier sslSupplier0 = filterChain0.sslContextProviderSupplier();
xdsClient.deliverLdsUpdate(Collections.singletonList(filterChain0), null);
xdsClient.ldsWatcher.onError(Status.INTERNAL);
assertThat(selectorManager.getSelectorToUpdateSelector())
.isSameInstanceAs(FilterChainSelector.NO_FILTER_CHAIN);
RdsResourceWatcher saveRdsWatcher = xdsClient.rdsWatchers.get("rds");
verify(mockBuilder, times(1)).build();
verify(listener, times(2)).onNotServing(any(StatusException.class));
assertThat(sslSupplier0.isShutdown()).isFalse();
when(mockServer.start()).thenThrow(new IOException("error!"))
.thenReturn(mockServer);
FilterChain filterChain1 = createFilterChain("filter-chain-1", createRds("rds"));
SslContextProviderSupplier sslSupplier1 = filterChain1.sslContextProviderSupplier();
xdsClient.deliverLdsUpdate(Collections.singletonList(filterChain1), null);
assertThat(sslSupplier0.isShutdown()).isTrue();
xdsClient.deliverRdsUpdate("rds",
Collections.singletonList(createVirtualHost("virtual-host-1")));
try {
start.get(5000, TimeUnit.MILLISECONDS);
fail("Start should throw exception");
} catch (ExecutionException ex) {
assertThat(ex.getCause()).isInstanceOf(IOException.class);
assertThat(ex.getCause().getMessage()).isEqualTo("error!");
}
assertThat(executor.forwardNanos(RETRY_DELAY_NANOS)).isEqualTo(1);
verify(mockBuilder, times(1)).build();
verify(mockServer, times(2)).start();
verify(listener, times(1)).onServing();
assertThat(selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().size())
.isEqualTo(1);
ServerRoutingConfig realConfig =
selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().get(filterChain1).get();
assertThat(realConfig.virtualHosts()).isEqualTo(
Collections.singletonList(createVirtualHost("virtual-host-1")));
assertThat(realConfig.interceptors()).isEqualTo(ImmutableMap.of());
// xds update after start
xdsClient.deliverRdsUpdate("rds",
Collections.singletonList(createVirtualHost("virtual-host-2")));
assertThat(sslSupplier1.isShutdown()).isFalse();
xdsClient.ldsWatcher.onError(Status.DEADLINE_EXCEEDED);
verify(mockBuilder, times(1)).build();
verify(mockServer, times(2)).start();
verify(listener, times(2)).onNotServing(any(StatusException.class));
assertThat(selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().size())
.isEqualTo(1);
realConfig = selectorManager.getSelectorToUpdateSelector().getRoutingConfigs()
.get(filterChain1).get();
assertThat(realConfig.virtualHosts()).isEqualTo(
Collections.singletonList(createVirtualHost("virtual-host-2")));
assertThat(realConfig.interceptors()).isEqualTo(ImmutableMap.of());
assertThat(sslSupplier1.isShutdown()).isFalse();
// not serving after serving
xdsClient.ldsWatcher.onResourceDoesNotExist(ldsResource);
assertThat(xdsClient.rdsWatchers).isEmpty();
verify(mockServer, times(2)).shutdown();
when(mockServer.isShutdown()).thenReturn(true);
assertThat(selectorManager.getSelectorToUpdateSelector())
.isSameInstanceAs(FilterChainSelector.NO_FILTER_CHAIN);
verify(listener, times(3)).onNotServing(any(StatusException.class));
assertThat(sslSupplier1.isShutdown()).isTrue();
// no op
saveRdsWatcher.onChanged(
new RdsUpdate(Collections.singletonList(createVirtualHost("virtual-host-1"))));
verify(mockBuilder, times(1)).build();
verify(mockServer, times(2)).start();
verify(listener, times(1)).onServing();
// cancel retry
when(mockServer.start()).thenThrow(new IOException("error1!"))
.thenThrow(new IOException("error2!"))
.thenReturn(mockServer);
FilterChain filterChain2 = createFilterChain("filter-chain-2", createRds("rds"));
SslContextProviderSupplier sslSupplier2 = filterChain2.sslContextProviderSupplier();
xdsClient.deliverLdsUpdate(Collections.singletonList(filterChain2), null);
xdsClient.deliverRdsUpdate("rds",
Collections.singletonList(createVirtualHost("virtual-host-1")));
assertThat(sslSupplier1.isShutdown()).isTrue();
verify(mockBuilder, times(2)).build();
when(mockServer.isShutdown()).thenReturn(false);
verify(mockServer, times(3)).start();
verify(listener, times(1)).onServing();
verify(listener, times(3)).onNotServing(any(StatusException.class));
assertThat(selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().size())
.isEqualTo(1);
realConfig = selectorManager.getSelectorToUpdateSelector().getRoutingConfigs()
.get(filterChain2).get();
assertThat(realConfig.virtualHosts()).isEqualTo(
Collections.singletonList(createVirtualHost("virtual-host-1")));
assertThat(realConfig.interceptors()).isEqualTo(ImmutableMap.of());
assertThat(executor.numPendingTasks()).isEqualTo(1);
xdsClient.ldsWatcher.onResourceDoesNotExist(ldsResource);
verify(mockServer, times(3)).shutdown();
verify(listener, times(4)).onNotServing(any(StatusException.class));
verify(listener, times(1)).onNotServing(any(IOException.class));
when(mockServer.isShutdown()).thenReturn(true);
assertThat(executor.numPendingTasks()).isEqualTo(0);
assertThat(sslSupplier2.isShutdown()).isTrue();
// serving after not serving
FilterChain filterChain3 = createFilterChain("filter-chain-2", createRds("rds"));
SslContextProviderSupplier sslSupplier3 = filterChain3.sslContextProviderSupplier();
xdsClient.deliverLdsUpdate(Collections.singletonList(filterChain3), null);
xdsClient.deliverRdsUpdate("rds",
Collections.singletonList(createVirtualHost("virtual-host-1")));
verify(mockBuilder, times(3)).build();
verify(mockServer, times(4)).start();
verify(listener, times(1)).onServing();
when(mockServer.isShutdown()).thenReturn(false);
verify(listener, times(4)).onNotServing(any(StatusException.class));
assertThat(selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().size())
.isEqualTo(1);
realConfig = selectorManager.getSelectorToUpdateSelector().getRoutingConfigs()
.get(filterChain3).get();
assertThat(realConfig.virtualHosts()).isEqualTo(
Collections.singletonList(createVirtualHost("virtual-host-1")));
assertThat(realConfig.interceptors()).isEqualTo(ImmutableMap.of());
xdsServerWrapper.shutdown();
verify(mockServer, times(4)).shutdown();
assertThat(sslSupplier3.isShutdown()).isTrue();
when(mockServer.awaitTermination(anyLong(), any(TimeUnit.class))).thenReturn(true);
assertThat(xdsServerWrapper.awaitTermination(5, TimeUnit.SECONDS)).isTrue();
}
@Test
@SuppressWarnings("unchecked")
public void interceptor_success() throws Exception {
ArgumentCaptor<ConfigApplyingInterceptor> interceptorCaptor =
ArgumentCaptor.forClass(ConfigApplyingInterceptor.class);
final SettableFuture<Server> start = SettableFuture.create();
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
public void run() {
try {
start.set(xdsServerWrapper.start());
} catch (Exception ex) {
start.setException(ex);
}
}
});
xdsClient.ldsResource.get(5, TimeUnit.SECONDS);
verify(mockBuilder).intercept(interceptorCaptor.capture());
ConfigApplyingInterceptor interceptor = interceptorCaptor.getValue();
RouteMatch routeMatch =
RouteMatch.create(
PathMatcher.fromPath("/FooService/barMethod", true),
Collections.<HeaderMatcher>emptyList(), null);
Route route = Route.forAction(routeMatch, null,
ImmutableMap.<String, FilterConfig>of());
VirtualHost virtualHost = VirtualHost.create(
"v1", Collections.singletonList("foo.google.com"), Arrays.asList(route),
ImmutableMap.<String, FilterConfig>of());
final List<Integer> interceptorTrace = new ArrayList<>();
ServerInterceptor interceptor0 = new ServerInterceptor() {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
Metadata headers, ServerCallHandler<ReqT, RespT> next) {
interceptorTrace.add(0);
return next.startCall(call, headers);
}
};
ServerRoutingConfig realConfig = ServerRoutingConfig.create(
ImmutableList.of(virtualHost), ImmutableMap.of(route, interceptor0));
ServerCall<Void, Void> serverCall = mock(ServerCall.class);
when(serverCall.getMethodDescriptor()).thenReturn(createMethod("FooService/barMethod"));
when(serverCall.getAttributes()).thenReturn(
Attributes.newBuilder().set(ATTR_SERVER_ROUTING_CONFIG,
new AtomicReference<>(realConfig)).build());
when(serverCall.getAuthority()).thenReturn("foo.google.com");
ServerCallHandler<Void, Void> next = mock(ServerCallHandler.class);
interceptor.interceptCall(serverCall, new Metadata(), next);
verify(next).startCall(eq(serverCall), any(Metadata.class));
assertThat(interceptorTrace).isEqualTo(Arrays.asList(0));
}
@Test
@SuppressWarnings("unchecked")
public void interceptor_virtualHostNotMatch() throws Exception {
ArgumentCaptor<ConfigApplyingInterceptor> interceptorCaptor =
ArgumentCaptor.forClass(ConfigApplyingInterceptor.class);
final SettableFuture<Server> start = SettableFuture.create();
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
public void run() {
try {
start.set(xdsServerWrapper.start());
} catch (Exception ex) {
start.setException(ex);
}
}
});
xdsClient.ldsResource.get(5, TimeUnit.SECONDS);
verify(mockBuilder).intercept(interceptorCaptor.capture());
ConfigApplyingInterceptor interceptor = interceptorCaptor.getValue();
ServerRoutingConfig routingConfig = createRoutingConfig("/FooService/barMethod",
"foo.google.com", "filter-type-url");
ServerCall<Void, Void> serverCall = mock(ServerCall.class);
when(serverCall.getAttributes()).thenReturn(
Attributes.newBuilder().set(ATTR_SERVER_ROUTING_CONFIG,
new AtomicReference<>(routingConfig)).build());
when(serverCall.getAuthority()).thenReturn("not-match.google.com");
Filter filter = mock(Filter.class);
when(filter.typeUrls()).thenReturn(new String[]{"filter-type-url"});
filterRegistry.register(filter);
ServerCallHandler<Void, Void> next = mock(ServerCallHandler.class);
interceptor.interceptCall(serverCall, new Metadata(), next);
verify(next, never()).startCall(any(ServerCall.class), any(Metadata.class));
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
verify(serverCall).close(statusCaptor.capture(), any(Metadata.class));
Status status = statusCaptor.getValue();
assertThat(status.getCode()).isEqualTo(Status.UNAVAILABLE.getCode());
assertThat(status.getDescription()).isEqualTo("Could not find xDS virtual host matching RPC");
}
@Test
@SuppressWarnings("unchecked")
public void interceptor_routeNotMatch() throws Exception {
ArgumentCaptor<ConfigApplyingInterceptor> interceptorCaptor =
ArgumentCaptor.forClass(ConfigApplyingInterceptor.class);
final SettableFuture<Server> start = SettableFuture.create();
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
public void run() {
try {
start.set(xdsServerWrapper.start());
} catch (Exception ex) {
start.setException(ex);
}
}
});
xdsClient.ldsResource.get(5, TimeUnit.SECONDS);
verify(mockBuilder).intercept(interceptorCaptor.capture());
ConfigApplyingInterceptor interceptor = interceptorCaptor.getValue();
ServerRoutingConfig routingConfig = createRoutingConfig("/FooService/barMethod",
"foo.google.com", "filter-type-url");
ServerCall<Void, Void> serverCall = mock(ServerCall.class);
when(serverCall.getAttributes()).thenReturn(
Attributes.newBuilder()
.set(ATTR_SERVER_ROUTING_CONFIG, new AtomicReference<>(routingConfig)).build());
when(serverCall.getMethodDescriptor()).thenReturn(createMethod("NotMatchMethod"));
when(serverCall.getAuthority()).thenReturn("foo.google.com");
Filter filter = mock(Filter.class);
when(filter.typeUrls()).thenReturn(new String[]{"filter-type-url"});
filterRegistry.register(filter);
ServerCallHandler<Void, Void> next = mock(ServerCallHandler.class);
interceptor.interceptCall(serverCall, new Metadata(), next);
verify(next, never()).startCall(any(ServerCall.class), any(Metadata.class));
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
verify(serverCall).close(statusCaptor.capture(), any(Metadata.class));
Status status = statusCaptor.getValue();
assertThat(status.getCode()).isEqualTo(Status.UNAVAILABLE.getCode());
assertThat(status.getDescription()).isEqualTo("Could not find xDS route matching RPC");
}
@Test
@SuppressWarnings("unchecked")
public void interceptor_invalidRouteAction() throws Exception {
ArgumentCaptor<ConfigApplyingInterceptor> interceptorCaptor =
ArgumentCaptor.forClass(ConfigApplyingInterceptor.class);
final SettableFuture<Server> start = SettableFuture.create();
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
public void run() {
try {
start.set(xdsServerWrapper.start());
} catch (Exception ex) {
start.setException(ex);
}
}
});
xdsClient.ldsResource.get(5, TimeUnit.SECONDS);
verify(mockBuilder).intercept(interceptorCaptor.capture());
ConfigApplyingInterceptor interceptor = interceptorCaptor.getValue();
ServerRoutingConfig routingConfig = createRoutingConfig("/FooService/barMethod",
"foo.google.com", "filter-type-url", Route.RouteAction.forCluster(
"cluster", Collections.<Route.RouteAction.HashPolicy>emptyList(), null, null
));
ServerCall<Void, Void> serverCall = mock(ServerCall.class);
when(serverCall.getAttributes()).thenReturn(
Attributes.newBuilder()
.set(ATTR_SERVER_ROUTING_CONFIG, new AtomicReference<>(routingConfig)).build());
when(serverCall.getMethodDescriptor()).thenReturn(createMethod("FooService/barMethod"));
when(serverCall.getAuthority()).thenReturn("foo.google.com");
Filter filter = mock(Filter.class);
when(filter.typeUrls()).thenReturn(new String[]{"filter-type-url"});
filterRegistry.register(filter);
ServerCallHandler<Void, Void> next = mock(ServerCallHandler.class);
interceptor.interceptCall(serverCall, new Metadata(), next);
verify(next, never()).startCall(any(ServerCall.class), any(Metadata.class));
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
verify(serverCall).close(statusCaptor.capture(), any(Metadata.class));
Status status = statusCaptor.getValue();
assertThat(status.getCode()).isEqualTo(Status.UNAVAILABLE.getCode());
assertThat(status.getDescription()).isEqualTo("Invalid xDS route action for matching "
+ "route: only Route.non_forwarding_action should be allowed.");
}
@Test
@SuppressWarnings("unchecked")
public void interceptor_failingRouterConfig() throws Exception {
ArgumentCaptor<ConfigApplyingInterceptor> interceptorCaptor =
ArgumentCaptor.forClass(ConfigApplyingInterceptor.class);
final SettableFuture<Server> start = SettableFuture.create();
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
public void run() {
try {
start.set(xdsServerWrapper.start());
} catch (Exception ex) {
start.setException(ex);
}
}
});
xdsClient.ldsResource.get(5, TimeUnit.SECONDS);
verify(mockBuilder).intercept(interceptorCaptor.capture());
ConfigApplyingInterceptor interceptor = interceptorCaptor.getValue();
ServerCall<Void, Void> serverCall = mock(ServerCall.class);
when(serverCall.getAttributes()).thenReturn(
Attributes.newBuilder().set(ATTR_SERVER_ROUTING_CONFIG,
new AtomicReference<>(ServerRoutingConfig.FAILING_ROUTING_CONFIG)).build());
ServerCallHandler<Void, Void> next = mock(ServerCallHandler.class);
interceptor.interceptCall(serverCall, new Metadata(), next);
verify(next, never()).startCall(any(ServerCall.class), any(Metadata.class));
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
verify(serverCall).close(statusCaptor.capture(), any(Metadata.class));
Status status = statusCaptor.getValue();
assertThat(status.getCode()).isEqualTo(Status.UNAVAILABLE.getCode());
assertThat(status.getDescription()).isEqualTo(
"Missing or broken xDS routing config: RDS config unavailable.");
}
@Test
@SuppressWarnings("unchecked")
public void buildInterceptor_inline() throws Exception {
final SettableFuture<Server> start = SettableFuture.create();
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
public void run() {
try {
start.set(xdsServerWrapper.start());
} catch (Exception ex) {
start.setException(ex);
}
}
});
xdsClient.ldsResource.get(5, TimeUnit.SECONDS);
RouteMatch routeMatch =
RouteMatch.create(
PathMatcher.fromPath("/FooService/barMethod", true),
Collections.<HeaderMatcher>emptyList(), null);
Filter filter = mock(Filter.class, withSettings()
.extraInterfaces(ServerInterceptorBuilder.class));
when(filter.typeUrls()).thenReturn(new String[]{"filter-type-url"});
filterRegistry.register(filter);
FilterConfig f0 = mock(FilterConfig.class);
FilterConfig f0Override = mock(FilterConfig.class);
when(f0.typeUrl()).thenReturn("filter-type-url");
final List<Integer> interceptorTrace = new ArrayList<>();
ServerInterceptor interceptor0 = new ServerInterceptor() {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
Metadata headers, ServerCallHandler<ReqT, RespT> next) {
interceptorTrace.add(0);
return next.startCall(call, headers);
}
};
ServerInterceptor interceptor1 = new ServerInterceptor() {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
Metadata headers, ServerCallHandler<ReqT, RespT> next) {
interceptorTrace.add(1);
return next.startCall(call, headers);
}
};
when(((ServerInterceptorBuilder)filter).buildServerInterceptor(f0, null))
.thenReturn(interceptor0);
when(((ServerInterceptorBuilder)filter).buildServerInterceptor(f0, f0Override))
.thenReturn(interceptor1);
Route route = Route.forAction(routeMatch, null,
ImmutableMap.<String, FilterConfig>of());
VirtualHost virtualHost = VirtualHost.create(
"v1", Collections.singletonList("foo.google.com"), Arrays.asList(route),
ImmutableMap.of("filter-config-name-0", f0Override));
HttpConnectionManager hcmVirtual = HttpConnectionManager.forVirtualHosts(
0L, Collections.singletonList(virtualHost),
Arrays.asList(new NamedFilterConfig("filter-config-name-0", f0),
new NamedFilterConfig("filter-config-name-1", f0)));
EnvoyServerProtoData.FilterChain filterChain = createFilterChain("filter-chain-0", hcmVirtual);
xdsClient.deliverLdsUpdate(Collections.singletonList(filterChain), null);
start.get(5000, TimeUnit.MILLISECONDS);
verify(mockServer).start();
assertThat(selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().size())
.isEqualTo(1);
ServerInterceptor realInterceptor = selectorManager.getSelectorToUpdateSelector()
.getRoutingConfigs().get(filterChain).get().interceptors().get(route);
assertThat(realInterceptor).isNotNull();
ServerCall<Void, Void> serverCall = mock(ServerCall.class);
ServerCallHandler<Void, Void> mockNext = mock(ServerCallHandler.class);
final ServerCall.Listener<Void> listener = new ServerCall.Listener<Void>() {};
when(mockNext.startCall(any(ServerCall.class), any(Metadata.class))).thenReturn(listener);
realInterceptor.interceptCall(serverCall, new Metadata(), mockNext);
assertThat(interceptorTrace).isEqualTo(Arrays.asList(1, 0));
verify(mockNext).startCall(eq(serverCall), any(Metadata.class));
}
@Test
@SuppressWarnings("unchecked")
public void buildInterceptor_rds() throws Exception {
final SettableFuture<Server> start = SettableFuture.create();
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
public void run() {
try {
start.set(xdsServerWrapper.start());
} catch (Exception ex) {
start.setException(ex);
}
}
});
xdsClient.ldsResource.get(5, TimeUnit.SECONDS);
Filter filter = mock(Filter.class, withSettings()
.extraInterfaces(ServerInterceptorBuilder.class));
when(filter.typeUrls()).thenReturn(new String[]{"filter-type-url"});
filterRegistry.register(filter);
FilterConfig f0 = mock(FilterConfig.class);
FilterConfig f0Override = mock(FilterConfig.class);
when(f0.typeUrl()).thenReturn("filter-type-url");
final List<Integer> interceptorTrace = new ArrayList<>();
ServerInterceptor interceptor0 = new ServerInterceptor() {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
Metadata headers, ServerCallHandler<ReqT, RespT> next) {
interceptorTrace.add(0);
return next.startCall(call, headers);
}
};
ServerInterceptor interceptor1 = new ServerInterceptor() {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
Metadata headers, ServerCallHandler<ReqT, RespT> next) {
interceptorTrace.add(1);
return next.startCall(call, headers);
}
};
when(((ServerInterceptorBuilder)filter).buildServerInterceptor(f0, null))
.thenReturn(interceptor0);
when(((ServerInterceptorBuilder)filter).buildServerInterceptor(f0, f0Override))
.thenReturn(interceptor1);
RouteMatch routeMatch =
RouteMatch.create(
PathMatcher.fromPath("/FooService/barMethod", true),
Collections.<HeaderMatcher>emptyList(), null);
HttpConnectionManager rdsHcm = HttpConnectionManager.forRdsName(0L, "r0",
Arrays.asList(new NamedFilterConfig("filter-config-name-0", f0),
new NamedFilterConfig("filter-config-name-1", f0)));
EnvoyServerProtoData.FilterChain filterChain = createFilterChain("filter-chain-0", rdsHcm);
xdsClient.deliverLdsUpdate(Collections.singletonList(filterChain), null);
Route route = Route.forAction(routeMatch, null,
ImmutableMap.<String, FilterConfig>of());
VirtualHost virtualHost = VirtualHost.create(
"v1", Collections.singletonList("foo.google.com"), Arrays.asList(route),
ImmutableMap.of("filter-config-name-0", f0Override));
xdsClient.rdsCount.await(5, TimeUnit.SECONDS);
xdsClient.deliverRdsUpdate("r0", Collections.singletonList(virtualHost));
start.get(5000, TimeUnit.MILLISECONDS);
verify(mockServer).start();
assertThat(selectorManager.getSelectorToUpdateSelector().getRoutingConfigs().size())
.isEqualTo(1);
ServerInterceptor realInterceptor = selectorManager.getSelectorToUpdateSelector()
.getRoutingConfigs().get(filterChain).get().interceptors().get(route);
assertThat(realInterceptor).isNotNull();
ServerCall<Void, Void> serverCall = mock(ServerCall.class);
ServerCallHandler<Void, Void> mockNext = mock(ServerCallHandler.class);
final ServerCall.Listener<Void> listener = new ServerCall.Listener<Void>() {};
when(mockNext.startCall(any(ServerCall.class), any(Metadata.class))).thenReturn(listener);
realInterceptor.interceptCall(serverCall, new Metadata(), mockNext);
assertThat(interceptorTrace).isEqualTo(Arrays.asList(1, 0));
verify(mockNext).startCall(eq(serverCall), any(Metadata.class));
virtualHost = VirtualHost.create(
"v1", Collections.singletonList("foo.google.com"), Arrays.asList(route),
ImmutableMap.<String, FilterConfig>of());
xdsClient.deliverRdsUpdate("r0", Collections.singletonList(virtualHost));
realInterceptor = selectorManager.getSelectorToUpdateSelector().getRoutingConfigs()
.get(filterChain).get().interceptors().get(route);
assertThat(realInterceptor).isNotNull();
interceptorTrace.clear();
realInterceptor.interceptCall(serverCall, new Metadata(), mockNext);
assertThat(interceptorTrace).isEqualTo(Arrays.asList(0, 0));
verify(mockNext, times(2)).startCall(eq(serverCall), any(Metadata.class));
xdsClient.rdsWatchers.get("r0").onResourceDoesNotExist("r0");
assertThat(selectorManager.getSelectorToUpdateSelector().getRoutingConfigs()
.get(filterChain).get()).isEqualTo(noopConfig);
}
private static FilterChain createFilterChain(String name, HttpConnectionManager hcm) {
return EnvoyServerProtoData.FilterChain.create(name, createMatch(),
hcm, createTls(), mock(TlsContextManager.class));
}
private static VirtualHost createVirtualHost(String name) {
return VirtualHost.create(
name, Collections.singletonList("auth"), new ArrayList<Route>(),
ImmutableMap.<String, FilterConfig>of());
}
private static HttpConnectionManager createRds(String name) {
return createRds(name, null);
}
private static HttpConnectionManager createRds(String name, FilterConfig filterConfig) {
return HttpConnectionManager.forRdsName(0L, name,
Arrays.asList(new NamedFilterConfig("named-config-" + name, filterConfig)));
}
private static EnvoyServerProtoData.FilterChainMatch createMatch() {
return EnvoyServerProtoData.FilterChainMatch.create(
0,
ImmutableList.of(),
ImmutableList.of(),
ImmutableList.of(),
EnvoyServerProtoData.ConnectionSourceType.ANY,
ImmutableList.of(),
ImmutableList.of(),
"");
}
private static ServerRoutingConfig createRoutingConfig(String path, String domain,
String filterType) {
return createRoutingConfig(path, domain, filterType, null);
}
private static ServerRoutingConfig createRoutingConfig(String path, String domain,
String filterType, Route.RouteAction action) {
RouteMatch routeMatch =
RouteMatch.create(
PathMatcher.fromPath(path, true),
Collections.<HeaderMatcher>emptyList(), null);
VirtualHost virtualHost = VirtualHost.create(
"v1", Collections.singletonList(domain),
Arrays.asList(Route.forAction(routeMatch, action,
ImmutableMap.<String, FilterConfig>of())),
Collections.<String, FilterConfig>emptyMap());
FilterConfig f0 = mock(FilterConfig.class);
when(f0.typeUrl()).thenReturn(filterType);
return ServerRoutingConfig.create(ImmutableList.<VirtualHost>of(virtualHost),
ImmutableMap.<Route, ServerInterceptor>of()
);
}
private static MethodDescriptor<Void, Void> createMethod(String path) {
return MethodDescriptor.<Void, Void>newBuilder()
.setType(MethodDescriptor.MethodType.UNKNOWN)
.setFullMethodName(path)
.setRequestMarshaller(TestMethodDescriptors.voidMarshaller())
.setResponseMarshaller(TestMethodDescriptors.voidMarshaller())
.build();
}
private static EnvoyServerProtoData.DownstreamTlsContext createTls() {
return CommonTlsContextTestsUtil.buildTestInternalDownstreamTlsContext("CERT1", "VA1");
}
}