blob: b269e7f9bcfb0e5012e2a289d31e5af4e4d8c263 [file] [log] [blame]
/*
* Copyright 2021 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static io.grpc.xds.Bootstrapper.XDSTP_SCHEME;
import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Attributes;
import io.grpc.InternalServerInterceptors;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerCall;
import io.grpc.ServerCall.Listener;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.ServerServiceDefinition;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.SharedResourceHolder;
import io.grpc.xds.EnvoyServerProtoData.FilterChain;
import io.grpc.xds.Filter.FilterConfig;
import io.grpc.xds.Filter.NamedFilterConfig;
import io.grpc.xds.Filter.ServerInterceptorBuilder;
import io.grpc.xds.FilterChainMatchingProtocolNegotiators.FilterChainMatchingHandler.FilterChainSelector;
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
import io.grpc.xds.VirtualHost.Route;
import io.grpc.xds.XdsClient.LdsResourceWatcher;
import io.grpc.xds.XdsClient.LdsUpdate;
import io.grpc.xds.XdsClient.RdsResourceWatcher;
import io.grpc.xds.XdsClient.RdsUpdate;
import io.grpc.xds.XdsNameResolverProvider.XdsClientPoolFactory;
import io.grpc.xds.XdsServerBuilder.XdsServingStatusListener;
import io.grpc.xds.internal.security.SslContextProviderSupplier;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
final class XdsServerWrapper extends Server {
private static final Logger logger = Logger.getLogger(XdsServerWrapper.class.getName());
private final SynchronizationContext syncContext = new SynchronizationContext(
new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
logger.log(Level.SEVERE, "Exception!" + e);
// TODO(chengyuanzhang): implement cleanup.
}
});
public static final Attributes.Key<AtomicReference<ServerRoutingConfig>>
ATTR_SERVER_ROUTING_CONFIG =
Attributes.Key.create("io.grpc.xds.ServerWrapper.serverRoutingConfig");
@VisibleForTesting
static final long RETRY_DELAY_NANOS = TimeUnit.MINUTES.toNanos(1);
private final String listenerAddress;
private final ServerBuilder<?> delegateBuilder;
private boolean sharedTimeService;
private final ScheduledExecutorService timeService;
private final FilterRegistry filterRegistry;
private final ThreadSafeRandom random = ThreadSafeRandomImpl.instance;
private final XdsClientPoolFactory xdsClientPoolFactory;
private final XdsServingStatusListener listener;
private final FilterChainSelectorManager filterChainSelectorManager;
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean shutdown = new AtomicBoolean(false);
private boolean isServing;
private final CountDownLatch internalTerminationLatch = new CountDownLatch(1);
private final SettableFuture<Exception> initialStartFuture = SettableFuture.create();
private boolean initialStarted;
private ScheduledHandle restartTimer;
private ObjectPool<XdsClient> xdsClientPool;
private XdsClient xdsClient;
private DiscoveryState discoveryState;
private volatile Server delegate;
XdsServerWrapper(
String listenerAddress,
ServerBuilder<?> delegateBuilder,
XdsServingStatusListener listener,
FilterChainSelectorManager filterChainSelectorManager,
XdsClientPoolFactory xdsClientPoolFactory,
FilterRegistry filterRegistry) {
this(listenerAddress, delegateBuilder, listener, filterChainSelectorManager,
xdsClientPoolFactory, filterRegistry, SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE));
sharedTimeService = true;
}
@VisibleForTesting
XdsServerWrapper(
String listenerAddress,
ServerBuilder<?> delegateBuilder,
XdsServingStatusListener listener,
FilterChainSelectorManager filterChainSelectorManager,
XdsClientPoolFactory xdsClientPoolFactory,
FilterRegistry filterRegistry,
ScheduledExecutorService timeService) {
this.listenerAddress = checkNotNull(listenerAddress, "listenerAddress");
this.delegateBuilder = checkNotNull(delegateBuilder, "delegateBuilder");
this.delegateBuilder.intercept(new ConfigApplyingInterceptor());
this.listener = checkNotNull(listener, "listener");
this.filterChainSelectorManager
= checkNotNull(filterChainSelectorManager, "filterChainSelectorManager");
this.xdsClientPoolFactory = checkNotNull(xdsClientPoolFactory, "xdsClientPoolFactory");
this.timeService = checkNotNull(timeService, "timeService");
this.filterRegistry = checkNotNull(filterRegistry,"filterRegistry");
this.delegate = delegateBuilder.build();
}
@Override
public Server start() throws IOException {
checkState(started.compareAndSet(false, true), "Already started");
syncContext.execute(new Runnable() {
@Override
public void run() {
internalStart();
}
});
Exception exception;
try {
exception = initialStartFuture.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
if (exception != null) {
throw (exception instanceof IOException) ? (IOException) exception :
new IOException(exception);
}
return this;
}
private void internalStart() {
try {
xdsClientPool = xdsClientPoolFactory.getOrCreate();
} catch (Exception e) {
StatusException statusException = Status.UNAVAILABLE.withDescription(
"Failed to initialize xDS").withCause(e).asException();
listener.onNotServing(statusException);
initialStartFuture.set(statusException);
return;
}
xdsClient = xdsClientPool.getObject();
boolean useProtocolV3 = xdsClient.getBootstrapInfo().servers().get(0).useProtocolV3();
String listenerTemplate = xdsClient.getBootstrapInfo().serverListenerResourceNameTemplate();
if (!useProtocolV3 || listenerTemplate == null) {
StatusException statusException =
Status.UNAVAILABLE.withDescription(
"Can only support xDS v3 with listener resource name template").asException();
listener.onNotServing(statusException);
initialStartFuture.set(statusException);
xdsClient = xdsClientPool.returnObject(xdsClient);
return;
}
String replacement = listenerAddress;
if (listenerTemplate.startsWith(XDSTP_SCHEME)) {
replacement = XdsClient.percentEncodePath(replacement);
}
discoveryState = new DiscoveryState(listenerTemplate.replaceAll("%s", replacement));
}
@Override
public Server shutdown() {
if (!shutdown.compareAndSet(false, true)) {
return this;
}
syncContext.execute(new Runnable() {
@Override
public void run() {
if (!delegate.isShutdown()) {
delegate.shutdown();
}
internalShutdown();
}
});
return this;
}
@Override
public Server shutdownNow() {
if (!shutdown.compareAndSet(false, true)) {
return this;
}
syncContext.execute(new Runnable() {
@Override
public void run() {
if (!delegate.isShutdown()) {
delegate.shutdownNow();
}
internalShutdown();
initialStartFuture.set(new IOException("server is forcefully shut down"));
}
});
return this;
}
// Must run in SynchronizationContext
private void internalShutdown() {
logger.log(Level.FINER, "Shutting down XdsServerWrapper");
if (discoveryState != null) {
discoveryState.shutdown();
}
if (xdsClient != null) {
xdsClient = xdsClientPool.returnObject(xdsClient);
}
if (restartTimer != null) {
restartTimer.cancel();
}
if (sharedTimeService) {
SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, timeService);
}
isServing = false;
internalTerminationLatch.countDown();
}
@Override
public boolean isShutdown() {
return shutdown.get();
}
@Override
public boolean isTerminated() {
return internalTerminationLatch.getCount() == 0 && delegate.isTerminated();
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
long startTime = System.nanoTime();
if (!internalTerminationLatch.await(timeout, unit)) {
return false;
}
long remainingTime = unit.toNanos(timeout) - (System.nanoTime() - startTime);
return delegate.awaitTermination(remainingTime, TimeUnit.NANOSECONDS);
}
@Override
public void awaitTermination() throws InterruptedException {
internalTerminationLatch.await();
delegate.awaitTermination();
}
@Override
public int getPort() {
return delegate.getPort();
}
@Override
public List<? extends SocketAddress> getListenSockets() {
return delegate.getListenSockets();
}
@Override
public List<ServerServiceDefinition> getServices() {
return delegate.getServices();
}
@Override
public List<ServerServiceDefinition> getImmutableServices() {
return delegate.getImmutableServices();
}
@Override
public List<ServerServiceDefinition> getMutableServices() {
return delegate.getMutableServices();
}
// Must run in SynchronizationContext
private void startDelegateServer() {
if (restartTimer != null && restartTimer.isPending()) {
return;
}
if (isServing) {
return;
}
if (delegate.isShutdown()) {
delegate = delegateBuilder.build();
}
try {
delegate.start();
listener.onServing();
isServing = true;
if (!initialStarted) {
initialStarted = true;
initialStartFuture.set(null);
}
logger.log(Level.FINER, "Delegate server started.");
} catch (IOException e) {
logger.log(Level.FINE, "Fail to start delegate server: {0}", e);
if (!initialStarted) {
initialStarted = true;
initialStartFuture.set(e);
} else {
listener.onNotServing(e);
}
restartTimer = syncContext.schedule(
new RestartTask(), RETRY_DELAY_NANOS, TimeUnit.NANOSECONDS, timeService);
}
}
private final class RestartTask implements Runnable {
@Override
public void run() {
startDelegateServer();
}
}
private final class DiscoveryState implements LdsResourceWatcher {
private final String resourceName;
// RDS resource name is the key.
private final Map<String, RouteDiscoveryState> routeDiscoveryStates = new HashMap<>();
// Track pending RDS resources using rds name.
private final Set<String> pendingRds = new HashSet<>();
// Most recently discovered filter chains.
private List<FilterChain> filterChains = new ArrayList<>();
// Most recently discovered default filter chain.
@Nullable
private FilterChain defaultFilterChain;
private boolean stopped;
private final Map<FilterChain, AtomicReference<ServerRoutingConfig>> savedRdsRoutingConfigRef
= new HashMap<>();
private final ServerInterceptor noopInterceptor = new ServerInterceptor() {
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
Metadata headers, ServerCallHandler<ReqT, RespT> next) {
return next.startCall(call, headers);
}
};
private DiscoveryState(String resourceName) {
this.resourceName = checkNotNull(resourceName, "resourceName");
xdsClient.watchLdsResource(resourceName, this);
}
@Override
public void onChanged(final LdsUpdate update) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (stopped) {
return;
}
logger.log(Level.FINEST, "Received Lds update {0}", update);
checkNotNull(update.listener(), "update");
if (!pendingRds.isEmpty()) {
// filter chain state has not yet been applied to filterChainSelectorManager and there
// are two sets of sslContextProviderSuppliers, so we release the old ones.
releaseSuppliersInFlight();
pendingRds.clear();
}
filterChains = update.listener().filterChains();
defaultFilterChain = update.listener().defaultFilterChain();
List<FilterChain> allFilterChains = filterChains;
if (defaultFilterChain != null) {
allFilterChains = new ArrayList<>(filterChains);
allFilterChains.add(defaultFilterChain);
}
Set<String> allRds = new HashSet<>();
for (FilterChain filterChain : allFilterChains) {
HttpConnectionManager hcm = filterChain.httpConnectionManager();
if (hcm.virtualHosts() == null) {
RouteDiscoveryState rdsState = routeDiscoveryStates.get(hcm.rdsName());
if (rdsState == null) {
rdsState = new RouteDiscoveryState(hcm.rdsName());
routeDiscoveryStates.put(hcm.rdsName(), rdsState);
xdsClient.watchRdsResource(hcm.rdsName(), rdsState);
}
if (rdsState.isPending) {
pendingRds.add(hcm.rdsName());
}
allRds.add(hcm.rdsName());
}
}
for (Map.Entry<String, RouteDiscoveryState> entry: routeDiscoveryStates.entrySet()) {
if (!allRds.contains(entry.getKey())) {
xdsClient.cancelRdsResourceWatch(entry.getKey(), entry.getValue());
}
}
routeDiscoveryStates.keySet().retainAll(allRds);
if (pendingRds.isEmpty()) {
updateSelector();
}
}
});
}
@Override
public void onResourceDoesNotExist(final String resourceName) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (stopped) {
return;
}
StatusException statusException = Status.UNAVAILABLE.withDescription(
"Listener " + resourceName + " unavailable").asException();
handleConfigNotFound(statusException);
}
});
}
@Override
public void onError(final Status error) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (stopped) {
return;
}
logger.log(Level.FINE, "Error from XdsClient", error);
if (!isServing) {
listener.onNotServing(error.asException());
}
}
});
}
private void shutdown() {
stopped = true;
cleanUpRouteDiscoveryStates();
logger.log(Level.FINE, "Stop watching LDS resource {0}", resourceName);
xdsClient.cancelLdsResourceWatch(resourceName, this);
List<SslContextProviderSupplier> toRelease = getSuppliersInUse();
filterChainSelectorManager.updateSelector(FilterChainSelector.NO_FILTER_CHAIN);
for (SslContextProviderSupplier s: toRelease) {
s.close();
}
releaseSuppliersInFlight();
}
private void updateSelector() {
Map<FilterChain, AtomicReference<ServerRoutingConfig>> filterChainRouting = new HashMap<>();
savedRdsRoutingConfigRef.clear();
for (FilterChain filterChain: filterChains) {
filterChainRouting.put(filterChain, generateRoutingConfig(filterChain));
}
FilterChainSelector selector = new FilterChainSelector(
Collections.unmodifiableMap(filterChainRouting),
defaultFilterChain == null ? null : defaultFilterChain.sslContextProviderSupplier(),
defaultFilterChain == null ? new AtomicReference<ServerRoutingConfig>() :
generateRoutingConfig(defaultFilterChain));
List<SslContextProviderSupplier> toRelease = getSuppliersInUse();
logger.log(Level.FINEST, "Updating selector {0}", selector);
filterChainSelectorManager.updateSelector(selector);
for (SslContextProviderSupplier e: toRelease) {
e.close();
}
startDelegateServer();
}
private AtomicReference<ServerRoutingConfig> generateRoutingConfig(FilterChain filterChain) {
HttpConnectionManager hcm = filterChain.httpConnectionManager();
if (hcm.virtualHosts() != null) {
ImmutableMap<Route, ServerInterceptor> interceptors = generatePerRouteInterceptors(
hcm.httpFilterConfigs(), hcm.virtualHosts());
return new AtomicReference<>(ServerRoutingConfig.create(hcm.virtualHosts(),interceptors));
} else {
RouteDiscoveryState rds = routeDiscoveryStates.get(hcm.rdsName());
checkNotNull(rds, "rds");
AtomicReference<ServerRoutingConfig> serverRoutingConfigRef = new AtomicReference<>();
if (rds.savedVirtualHosts != null) {
ImmutableMap<Route, ServerInterceptor> interceptors = generatePerRouteInterceptors(
hcm.httpFilterConfigs(), rds.savedVirtualHosts);
ServerRoutingConfig serverRoutingConfig =
ServerRoutingConfig.create(rds.savedVirtualHosts, interceptors);
serverRoutingConfigRef.set(serverRoutingConfig);
} else {
serverRoutingConfigRef.set(ServerRoutingConfig.FAILING_ROUTING_CONFIG);
}
savedRdsRoutingConfigRef.put(filterChain, serverRoutingConfigRef);
return serverRoutingConfigRef;
}
}
private ImmutableMap<Route, ServerInterceptor> generatePerRouteInterceptors(
List<NamedFilterConfig> namedFilterConfigs, List<VirtualHost> virtualHosts) {
ImmutableMap.Builder<Route, ServerInterceptor> perRouteInterceptors =
new ImmutableMap.Builder<>();
for (VirtualHost virtualHost : virtualHosts) {
for (Route route : virtualHost.routes()) {
List<ServerInterceptor> filterInterceptors = new ArrayList<>();
Map<String, FilterConfig> selectedOverrideConfigs =
new HashMap<>(virtualHost.filterConfigOverrides());
selectedOverrideConfigs.putAll(route.filterConfigOverrides());
if (namedFilterConfigs != null) {
for (NamedFilterConfig namedFilterConfig : namedFilterConfigs) {
FilterConfig filterConfig = namedFilterConfig.filterConfig;
Filter filter = filterRegistry.get(filterConfig.typeUrl());
if (filter instanceof ServerInterceptorBuilder) {
ServerInterceptor interceptor =
((ServerInterceptorBuilder) filter).buildServerInterceptor(
filterConfig, selectedOverrideConfigs.get(namedFilterConfig.name));
if (interceptor != null) {
filterInterceptors.add(interceptor);
}
} else {
logger.log(Level.WARNING, "HttpFilterConfig(type URL: "
+ filterConfig.typeUrl() + ") is not supported on server-side. "
+ "Probably a bug at ClientXdsClient verification.");
}
}
}
ServerInterceptor interceptor = combineInterceptors(filterInterceptors);
perRouteInterceptors.put(route, interceptor);
}
}
return perRouteInterceptors.buildOrThrow();
}
private ServerInterceptor combineInterceptors(final List<ServerInterceptor> interceptors) {
if (interceptors.isEmpty()) {
return noopInterceptor;
}
if (interceptors.size() == 1) {
return interceptors.get(0);
}
return new ServerInterceptor() {
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
Metadata headers, ServerCallHandler<ReqT, RespT> next) {
// intercept forward
for (int i = interceptors.size() - 1; i >= 0; i--) {
next = InternalServerInterceptors.interceptCallHandlerCreate(
interceptors.get(i), next);
}
return next.startCall(call, headers);
}
};
}
private void handleConfigNotFound(StatusException exception) {
cleanUpRouteDiscoveryStates();
List<SslContextProviderSupplier> toRelease = getSuppliersInUse();
filterChainSelectorManager.updateSelector(FilterChainSelector.NO_FILTER_CHAIN);
for (SslContextProviderSupplier s: toRelease) {
s.close();
}
if (restartTimer != null) {
restartTimer.cancel();
}
if (!delegate.isShutdown()) {
delegate.shutdown(); // let in-progress calls finish
}
isServing = false;
listener.onNotServing(exception);
}
private void cleanUpRouteDiscoveryStates() {
for (RouteDiscoveryState rdsState : routeDiscoveryStates.values()) {
String rdsName = rdsState.resourceName;
logger.log(Level.FINE, "Stop watching RDS resource {0}", rdsName);
xdsClient.cancelRdsResourceWatch(rdsName, rdsState);
}
routeDiscoveryStates.clear();
savedRdsRoutingConfigRef.clear();
}
private List<SslContextProviderSupplier> getSuppliersInUse() {
List<SslContextProviderSupplier> toRelease = new ArrayList<>();
FilterChainSelector selector = filterChainSelectorManager.getSelectorToUpdateSelector();
if (selector != null) {
for (FilterChain f: selector.getRoutingConfigs().keySet()) {
if (f.sslContextProviderSupplier() != null) {
toRelease.add(f.sslContextProviderSupplier());
}
}
SslContextProviderSupplier defaultSupplier =
selector.getDefaultSslContextProviderSupplier();
if (defaultSupplier != null) {
toRelease.add(defaultSupplier);
}
}
return toRelease;
}
private void releaseSuppliersInFlight() {
SslContextProviderSupplier supplier;
for (FilterChain filterChain : filterChains) {
supplier = filterChain.sslContextProviderSupplier();
if (supplier != null) {
supplier.close();
}
}
if (defaultFilterChain != null
&& (supplier = defaultFilterChain.sslContextProviderSupplier()) != null) {
supplier.close();
}
}
private final class RouteDiscoveryState implements RdsResourceWatcher {
private final String resourceName;
private ImmutableList<VirtualHost> savedVirtualHosts;
private boolean isPending = true;
private RouteDiscoveryState(String resourceName) {
this.resourceName = checkNotNull(resourceName, "resourceName");
}
@Override
public void onChanged(final RdsUpdate update) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (!routeDiscoveryStates.containsKey(resourceName)) {
return;
}
if (savedVirtualHosts == null && !isPending) {
logger.log(Level.WARNING, "Received valid Rds {0} configuration.", resourceName);
}
savedVirtualHosts = ImmutableList.copyOf(update.virtualHosts);
updateRdsRoutingConfig();
maybeUpdateSelector();
}
});
}
@Override
public void onResourceDoesNotExist(final String resourceName) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (!routeDiscoveryStates.containsKey(resourceName)) {
return;
}
logger.log(Level.WARNING, "Rds {0} unavailable", resourceName);
savedVirtualHosts = null;
updateRdsRoutingConfig();
maybeUpdateSelector();
}
});
}
@Override
public void onError(final Status error) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (!routeDiscoveryStates.containsKey(resourceName)) {
return;
}
logger.log(Level.WARNING, "Error loading RDS resource {0} from XdsClient: {1}.",
new Object[]{resourceName, error});
maybeUpdateSelector();
}
});
}
private void updateRdsRoutingConfig() {
for (FilterChain filterChain : savedRdsRoutingConfigRef.keySet()) {
if (resourceName.equals(filterChain.httpConnectionManager().rdsName())) {
ServerRoutingConfig updatedRoutingConfig;
if (savedVirtualHosts == null) {
updatedRoutingConfig = ServerRoutingConfig.FAILING_ROUTING_CONFIG;
} else {
ImmutableMap<Route, ServerInterceptor> updatedInterceptors =
generatePerRouteInterceptors(
filterChain.httpConnectionManager().httpFilterConfigs(),
savedVirtualHosts);
updatedRoutingConfig = ServerRoutingConfig.create(savedVirtualHosts,
updatedInterceptors);
}
logger.log(Level.FINEST, "Updating filter chain {0} rds routing config: {1}",
new Object[]{filterChain.name(), updatedRoutingConfig});
savedRdsRoutingConfigRef.get(filterChain).set(updatedRoutingConfig);
}
}
}
// Update the selector to use the most recently updated configs only after all rds have been
// discovered for the first time. Later changes on rds will be applied through virtual host
// list atomic ref.
private void maybeUpdateSelector() {
isPending = false;
boolean isLastPending = pendingRds.remove(resourceName) && pendingRds.isEmpty();
if (isLastPending) {
updateSelector();
}
}
}
}
@VisibleForTesting
final class ConfigApplyingInterceptor implements ServerInterceptor {
private final ServerInterceptor noopInterceptor = new ServerInterceptor() {
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
Metadata headers, ServerCallHandler<ReqT, RespT> next) {
return next.startCall(call, headers);
}
};
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
Metadata headers, ServerCallHandler<ReqT, RespT> next) {
AtomicReference<ServerRoutingConfig> routingConfigRef =
call.getAttributes().get(ATTR_SERVER_ROUTING_CONFIG);
ServerRoutingConfig routingConfig = routingConfigRef == null ? null :
routingConfigRef.get();
if (routingConfig == null || routingConfig == ServerRoutingConfig.FAILING_ROUTING_CONFIG) {
String errorMsg = "Missing or broken xDS routing config: RDS config unavailable.";
call.close(Status.UNAVAILABLE.withDescription(errorMsg), new Metadata());
return new Listener<ReqT>() {};
}
List<VirtualHost> virtualHosts = routingConfig.virtualHosts();
VirtualHost virtualHost = RoutingUtils.findVirtualHostForHostName(
virtualHosts, call.getAuthority());
if (virtualHost == null) {
call.close(
Status.UNAVAILABLE.withDescription("Could not find xDS virtual host matching RPC"),
new Metadata());
return new Listener<ReqT>() {};
}
Route selectedRoute = null;
MethodDescriptor<ReqT, RespT> method = call.getMethodDescriptor();
for (Route route : virtualHost.routes()) {
if (RoutingUtils.matchRoute(
route.routeMatch(), "/" + method.getFullMethodName(), headers, random)) {
selectedRoute = route;
break;
}
}
if (selectedRoute == null) {
call.close(Status.UNAVAILABLE.withDescription("Could not find xDS route matching RPC"),
new Metadata());
return new ServerCall.Listener<ReqT>() {};
}
if (selectedRoute.routeAction() != null) {
call.close(Status.UNAVAILABLE.withDescription("Invalid xDS route action for matching "
+ "route: only Route.non_forwarding_action should be allowed."), new Metadata());
return new ServerCall.Listener<ReqT>() {};
}
ServerInterceptor routeInterceptor = noopInterceptor;
Map<Route, ServerInterceptor> perRouteInterceptors = routingConfig.interceptors();
if (perRouteInterceptors != null && perRouteInterceptors.get(selectedRoute) != null) {
routeInterceptor = perRouteInterceptors.get(selectedRoute);
}
return routeInterceptor.interceptCall(call, headers, next);
}
}
/**
* The HttpConnectionManager level configuration.
*/
@AutoValue
abstract static class ServerRoutingConfig {
@VisibleForTesting
static final ServerRoutingConfig FAILING_ROUTING_CONFIG = ServerRoutingConfig.create(
ImmutableList.<VirtualHost>of(), ImmutableMap.<Route, ServerInterceptor>of());
abstract ImmutableList<VirtualHost> virtualHosts();
// Prebuilt per route server interceptors from http filter configs.
abstract ImmutableMap<Route, ServerInterceptor> interceptors();
/**
* Server routing configuration.
* */
public static ServerRoutingConfig create(
ImmutableList<VirtualHost> virtualHosts,
ImmutableMap<Route, ServerInterceptor> interceptors) {
checkNotNull(virtualHosts, "virtualHosts");
checkNotNull(interceptors, "interceptors");
return new AutoValue_XdsServerWrapper_ServerRoutingConfig(virtualHosts, interceptors);
}
}
}