blob: 2e7d337872b4255c55a45bbe62e9913b80da5331 [file] [log] [blame]
/*
* Copyright 2019 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageOrBuilder;
import com.google.protobuf.Struct;
import com.google.protobuf.Value;
import com.google.protobuf.util.JsonFormat;
import com.google.rpc.Code;
import io.envoyproxy.envoy.api.v2.Cluster;
import io.envoyproxy.envoy.api.v2.Cluster.DiscoveryType;
import io.envoyproxy.envoy.api.v2.Cluster.EdsClusterConfig;
import io.envoyproxy.envoy.api.v2.Cluster.LbPolicy;
import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment;
import io.envoyproxy.envoy.api.v2.DiscoveryRequest;
import io.envoyproxy.envoy.api.v2.DiscoveryResponse;
import io.envoyproxy.envoy.api.v2.Listener;
import io.envoyproxy.envoy.api.v2.RouteConfiguration;
import io.envoyproxy.envoy.api.v2.core.Address;
import io.envoyproxy.envoy.api.v2.core.Node;
import io.envoyproxy.envoy.api.v2.core.SocketAddress;
import io.envoyproxy.envoy.api.v2.listener.FilterChain;
import io.envoyproxy.envoy.api.v2.listener.FilterChainMatch;
import io.envoyproxy.envoy.api.v2.route.Route;
import io.envoyproxy.envoy.api.v2.route.VirtualHost;
import io.envoyproxy.envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager;
import io.envoyproxy.envoy.config.filter.network.http_connection_manager.v2.Rds;
import io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc;
import io.grpc.InternalLogId;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.BackoffPolicy;
import io.grpc.stub.StreamObserver;
import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.EnvoyProtoData.DropOverload;
import io.grpc.xds.EnvoyProtoData.Locality;
import io.grpc.xds.EnvoyProtoData.LocalityLbEndpoints;
import io.grpc.xds.EnvoyProtoData.RouteAction;
import io.grpc.xds.EnvoyProtoData.RouteMatch;
import io.grpc.xds.LoadReportClient.LoadReportCallback;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
final class XdsClientImpl extends XdsClient {
// Longest time to wait, since the subscription to some resource, for concluding its absence.
@VisibleForTesting
static final int INITIAL_RESOURCE_FETCH_TIMEOUT_SEC = 15;
@VisibleForTesting
static final String ADS_TYPE_URL_LDS = "type.googleapis.com/envoy.api.v2.Listener";
@VisibleForTesting
static final String ADS_TYPE_URL_RDS =
"type.googleapis.com/envoy.api.v2.RouteConfiguration";
@VisibleForTesting
static final String ADS_TYPE_URL_CDS = "type.googleapis.com/envoy.api.v2.Cluster";
@VisibleForTesting
static final String ADS_TYPE_URL_EDS =
"type.googleapis.com/envoy.api.v2.ClusterLoadAssignment";
// For now we do not support path matching unless enabled manually.
private static final boolean ENABLE_PATH_MATCHING = Boolean.parseBoolean(
System.getenv("ENABLE_EXPERIMENTAL_PATH_MATCHING"));
private final MessagePrinter respPrinter = new MessagePrinter();
private final InternalLogId logId;
private final XdsLogger logger;
// Name of the target server this gRPC client is trying to talk to.
private final String targetName;
private final ManagedChannel channel;
private final SynchronizationContext syncContext;
private final ScheduledExecutorService timeService;
private final BackoffPolicy.Provider backoffPolicyProvider;
private final Supplier<Stopwatch> stopwatchSupplier;
private final Stopwatch adsStreamRetryStopwatch;
// The node identifier to be included in xDS requests. Management server only requires the
// first request to carry the node identifier on a stream. It should be identical if present
// more than once.
private Node node;
// Cached data for CDS responses, keyed by cluster names.
// Optimization: cache ClusterUpdate, which contains only information needed by gRPC, instead
// of whole Cluster messages to reduce memory usage.
private final Map<String, ClusterUpdate> clusterNamesToClusterUpdates = new HashMap<>();
// Cached CDS resources that are known to be absent.
private final Set<String> absentCdsResources = new HashSet<>();
// Cached data for EDS responses, keyed by cluster names.
// CDS responses indicate absence of clusters and EDS responses indicate presence of clusters.
// Optimization: cache EndpointUpdate, which contains only information needed by gRPC, instead
// of whole ClusterLoadAssignment messages to reduce memory usage.
private final Map<String, EndpointUpdate> clusterNamesToEndpointUpdates = new HashMap<>();
// Cached EDS resources that are known to be absent.
private final Set<String> absentEdsResources = new HashSet<>();
// Cluster watchers waiting for cluster information updates. Multiple cluster watchers
// can watch on information for the same cluster.
private final Map<String, Set<ClusterWatcher>> clusterWatchers = new HashMap<>();
// Endpoint watchers waiting for endpoint updates for each cluster. Multiple endpoint
// watchers can watch endpoints in the same cluster.
private final Map<String, Set<EndpointWatcher>> endpointWatchers = new HashMap<>();
// Resource fetch timers are used to conclude absence of resources. Each timer is activated when
// subscription for the resource starts and disarmed on first update for the resource.
// Timers for concluding CDS resources not found.
private final Map<String, ScheduledHandle> cdsRespTimers = new HashMap<>();
// Timers for concluding EDS resources not found.
private final Map<String, ScheduledHandle> edsRespTimers = new HashMap<>();
// Timer for concluding the currently requesting LDS resource not found.
@Nullable
private ScheduledHandle ldsRespTimer;
// Timer for concluding the currently requesting RDS resource not found.
@Nullable
private ScheduledHandle rdsRespTimer;
@Nullable
private AdsStream adsStream;
@Nullable
private BackoffPolicy retryBackoffPolicy;
@Nullable
private ScheduledHandle rpcRetryTimer;
@Nullable
private LoadReportClient lrsClient;
// Following fields are set only after the ConfigWatcher registered. Once set, they should
// never change. Only a ConfigWatcher or ListenerWatcher can be registered.
@Nullable
private ConfigWatcher configWatcher;
// The "xds:" URI (including port suffix if present) that the gRPC client targets for.
@Nullable
private String ldsResourceName;
// only a ConfigWatcher or ListenerWatcher can be registered.
@Nullable
private ListenerWatcher listenerWatcher;
private int listenerPort = -1;
XdsClientImpl(
String targetName,
List<ServerInfo> servers, // list of management servers
XdsChannelFactory channelFactory,
Node node,
SynchronizationContext syncContext,
ScheduledExecutorService timeService,
BackoffPolicy.Provider backoffPolicyProvider,
Supplier<Stopwatch> stopwatchSupplier) {
this.targetName = checkNotNull(targetName, "targetName");
this.channel =
checkNotNull(channelFactory, "channelFactory")
.createChannel(checkNotNull(servers, "servers"));
this.node = checkNotNull(node, "node");
this.syncContext = checkNotNull(syncContext, "syncContext");
this.timeService = checkNotNull(timeService, "timeService");
this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatch");
adsStreamRetryStopwatch = stopwatchSupplier.get();
logId = InternalLogId.allocate("xds-client", targetName);
logger = XdsLogger.withLogId(logId);
logger.log(XdsLogLevel.INFO, "Created");
}
@Override
void shutdown() {
logger.log(XdsLogLevel.INFO, "Shutting down");
channel.shutdown();
if (adsStream != null) {
adsStream.close(Status.CANCELLED.withDescription("shutdown").asException());
}
cleanUpResources();
if (lrsClient != null) {
lrsClient.stopLoadReporting();
lrsClient = null;
}
if (rpcRetryTimer != null) {
rpcRetryTimer.cancel();
}
}
/**
* Purge cache for resources and cancel resource fetch timers.
*/
private void cleanUpResources() {
clusterNamesToClusterUpdates.clear();
absentCdsResources.clear();
clusterNamesToEndpointUpdates.clear();
absentEdsResources.clear();
if (ldsRespTimer != null) {
ldsRespTimer.cancel();
ldsRespTimer = null;
}
if (rdsRespTimer != null) {
rdsRespTimer.cancel();
rdsRespTimer = null;
}
for (ScheduledHandle handle : cdsRespTimers.values()) {
handle.cancel();
}
cdsRespTimers.clear();
for (ScheduledHandle handle : edsRespTimers.values()) {
handle.cancel();
}
edsRespTimers.clear();
}
@Override
void watchConfigData(String targetAuthority, ConfigWatcher watcher) {
checkState(configWatcher == null, "watcher for %s already registered", targetAuthority);
checkState(listenerWatcher == null, "ListenerWatcher already registered");
ldsResourceName = checkNotNull(targetAuthority, "targetAuthority");
configWatcher = checkNotNull(watcher, "watcher");
logger.log(XdsLogLevel.INFO, "Started watching config {0}", ldsResourceName);
if (rpcRetryTimer != null && rpcRetryTimer.isPending()) {
// Currently in retry backoff.
return;
}
if (adsStream == null) {
startRpcStream();
}
adsStream.sendXdsRequest(ADS_TYPE_URL_LDS, ImmutableList.of(ldsResourceName));
ldsRespTimer =
syncContext
.schedule(
new LdsResourceFetchTimeoutTask(ldsResourceName),
INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS, timeService);
}
@Override
void watchClusterData(String clusterName, ClusterWatcher watcher) {
checkNotNull(clusterName, "clusterName");
checkNotNull(watcher, "watcher");
boolean needRequest = false;
if (!clusterWatchers.containsKey(clusterName)) {
logger.log(XdsLogLevel.INFO, "Start watching cluster {0}", clusterName);
needRequest = true;
clusterWatchers.put(clusterName, new HashSet<ClusterWatcher>());
}
Set<ClusterWatcher> watchers = clusterWatchers.get(clusterName);
checkState(!watchers.contains(watcher), "watcher for %s already registered", clusterName);
watchers.add(watcher);
// If local cache contains cluster information to be watched, notify the watcher immediately.
if (absentCdsResources.contains(clusterName)) {
logger.log(XdsLogLevel.DEBUG, "Cluster resource {0} is known to be absent", clusterName);
watcher.onError(
Status.NOT_FOUND
.withDescription(
"Cluster resource [" + clusterName + "] not found."));
return;
}
if (clusterNamesToClusterUpdates.containsKey(clusterName)) {
logger.log(XdsLogLevel.DEBUG, "Retrieve cluster info {0} from local cache", clusterName);
watcher.onClusterChanged(clusterNamesToClusterUpdates.get(clusterName));
return;
}
if (needRequest) {
if (rpcRetryTimer != null && rpcRetryTimer.isPending()) {
// Currently in retry backoff.
return;
}
if (adsStream == null) {
startRpcStream();
}
adsStream.sendXdsRequest(ADS_TYPE_URL_CDS, clusterWatchers.keySet());
ScheduledHandle timeoutHandle =
syncContext
.schedule(
new CdsResourceFetchTimeoutTask(clusterName),
INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS, timeService);
cdsRespTimers.put(clusterName, timeoutHandle);
}
}
@Override
void cancelClusterDataWatch(String clusterName, ClusterWatcher watcher) {
checkNotNull(watcher, "watcher");
Set<ClusterWatcher> watchers = clusterWatchers.get(clusterName);
checkState(
watchers != null && watchers.contains(watcher),
"watcher for %s was not registered", clusterName);
watchers.remove(watcher);
if (watchers.isEmpty()) {
logger.log(XdsLogLevel.INFO, "Stop watching cluster {0}", clusterName);
clusterWatchers.remove(clusterName);
// Remove the corresponding CDS entry.
absentCdsResources.remove(clusterName);
clusterNamesToClusterUpdates.remove(clusterName);
// Cancel and delete response timer waiting for the corresponding resource.
if (cdsRespTimers.containsKey(clusterName)) {
cdsRespTimers.get(clusterName).cancel();
cdsRespTimers.remove(clusterName);
}
// No longer interested in this cluster, send an updated CDS request to unsubscribe
// this resource.
if (rpcRetryTimer != null && rpcRetryTimer.isPending()) {
// Currently in retry backoff.
return;
}
checkState(adsStream != null,
"Severe bug: ADS stream was not created while an endpoint watcher was registered");
adsStream.sendXdsRequest(ADS_TYPE_URL_CDS, clusterWatchers.keySet());
}
}
@Override
void watchEndpointData(String clusterName, EndpointWatcher watcher) {
checkNotNull(watcher, "watcher");
boolean needRequest = false;
if (!endpointWatchers.containsKey(clusterName)) {
logger.log(XdsLogLevel.INFO, "Start watching endpoints in cluster {0}", clusterName);
needRequest = true;
endpointWatchers.put(clusterName, new HashSet<EndpointWatcher>());
}
Set<EndpointWatcher> watchers = endpointWatchers.get(clusterName);
checkState(!watchers.contains(watcher), "watcher for %s already registered", clusterName);
watchers.add(watcher);
// If local cache contains endpoint information for the cluster to be watched, notify
// the watcher immediately.
if (absentEdsResources.contains(clusterName)) {
logger.log(
XdsLogLevel.DEBUG,
"Endpoint resource for cluster {0} is known to be absent.", clusterName);
watcher.onError(
Status.NOT_FOUND
.withDescription(
"Endpoint resource for cluster " + clusterName + " not found."));
return;
}
if (clusterNamesToEndpointUpdates.containsKey(clusterName)) {
logger.log(
XdsLogLevel.DEBUG,
"Retrieve endpoints info for cluster {0} from local cache.", clusterName);
watcher.onEndpointChanged(clusterNamesToEndpointUpdates.get(clusterName));
return;
}
if (needRequest) {
if (rpcRetryTimer != null && rpcRetryTimer.isPending()) {
// Currently in retry backoff.
return;
}
if (adsStream == null) {
startRpcStream();
}
adsStream.sendXdsRequest(ADS_TYPE_URL_EDS, endpointWatchers.keySet());
ScheduledHandle timeoutHandle =
syncContext
.schedule(
new EdsResourceFetchTimeoutTask(clusterName),
INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS, timeService);
edsRespTimers.put(clusterName, timeoutHandle);
}
}
@Override
void cancelEndpointDataWatch(String clusterName, EndpointWatcher watcher) {
checkNotNull(watcher, "watcher");
Set<EndpointWatcher> watchers = endpointWatchers.get(clusterName);
checkState(
watchers != null && watchers.contains(watcher),
"watcher for %s was not registered", clusterName);
watchers.remove(watcher);
if (watchers.isEmpty()) {
logger.log(XdsLogLevel.INFO, "Stop watching endpoints in cluster {0}", clusterName);
endpointWatchers.remove(clusterName);
// Remove the corresponding EDS cache entry.
absentEdsResources.remove(clusterName);
clusterNamesToEndpointUpdates.remove(clusterName);
// Cancel and delete response timer waiting for the corresponding resource.
if (edsRespTimers.containsKey(clusterName)) {
edsRespTimers.get(clusterName).cancel();
edsRespTimers.remove(clusterName);
}
// No longer interested in this cluster, send an updated EDS request to unsubscribe
// this resource.
if (rpcRetryTimer != null && rpcRetryTimer.isPending()) {
// Currently in retry backoff.
return;
}
adsStream.sendXdsRequest(ADS_TYPE_URL_EDS, endpointWatchers.keySet());
}
}
@Override
void watchListenerData(int port, ListenerWatcher watcher) {
checkState(configWatcher == null,
"ListenerWatcher cannot be set when ConfigWatcher set");
checkState(listenerWatcher == null, "ListenerWatcher already registered");
listenerWatcher = checkNotNull(watcher, "watcher");
checkArgument(port > 0, "port needs to be > 0");
this.listenerPort = port;
logger.log(XdsLogLevel.INFO, "Started watching listener for port {0}", port);
if (rpcRetryTimer != null && rpcRetryTimer.isPending()) {
// Currently in retry backoff.
return;
}
if (adsStream == null) {
startRpcStream();
}
updateNodeMetadataForListenerRequest(port);
adsStream.sendXdsRequest(ADS_TYPE_URL_LDS, ImmutableList.<String>of());
ldsRespTimer =
syncContext
.schedule(
new ListenerResourceFetchTimeoutTask(":" + port),
INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS, timeService);
}
/** In case of Listener watcher metadata to be updated to include port. */
private void updateNodeMetadataForListenerRequest(int port) {
Struct newMetadata = node.getMetadata().toBuilder()
.putFields("TRAFFICDIRECTOR_PROXYLESS",
Value.newBuilder().setStringValue("1").build())
.build();
Address listeningAddress =
Address.newBuilder()
.setSocketAddress(
SocketAddress.newBuilder().setAddress("0.0.0.0").setPortValue(port).build())
.build();
node =
node.toBuilder().setMetadata(newMetadata).addListeningAddresses(listeningAddress).build();
}
@Override
void reportClientStats(
String clusterName, @Nullable String clusterServiceName, LoadStatsStore loadStatsStore) {
if (lrsClient == null) {
lrsClient =
new LoadReportClient(
logId,
targetName,
channel,
node,
syncContext,
timeService,
backoffPolicyProvider,
stopwatchSupplier);
lrsClient.startLoadReporting(new LoadReportCallback() {
@Override
public void onReportResponse(long reportIntervalNano) {}
});
}
logger.log(
XdsLogLevel.INFO,
"Report loads for cluster: {0}, cluster_service: {1}", clusterName, clusterServiceName);
lrsClient.addLoadStatsStore(clusterName, clusterServiceName, loadStatsStore);
}
@Override
void cancelClientStatsReport(String clusterName, @Nullable String clusterServiceName) {
checkState(lrsClient != null, "load reporting was never started");
logger.log(
XdsLogLevel.INFO,
"Stop reporting loads for cluster: {0}, cluster_service: {1}",
clusterName,
clusterServiceName);
lrsClient.removeLoadStatsStore(clusterName, clusterServiceName);
// TODO(chengyuanzhang): can be optimized to stop load reporting if no more loads need
// to be reported.
}
@Override
public String toString() {
return logId.toString();
}
/**
* Establishes the RPC connection by creating a new RPC stream on the given channel for
* xDS protocol communication.
*/
private void startRpcStream() {
checkState(adsStream == null, "Previous adsStream has not been cleared yet");
AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub stub =
AggregatedDiscoveryServiceGrpc.newStub(channel);
adsStream = new AdsStream(stub);
adsStream.start();
logger.log(XdsLogLevel.INFO, "ADS stream started");
adsStreamRetryStopwatch.reset().start();
}
/**
* Calls handleLdsResponseForListener or handleLdsResponseForConfigUpdate based on which watcher
* was set.
*/
private void handleLdsResponse(DiscoveryResponse ldsResponse) {
checkState((configWatcher != null) != (listenerWatcher != null),
"No LDS request was ever sent. Management server is doing something wrong");
if (logger.isLoggable(XdsLogLevel.DEBUG)) {
logger.log(
XdsLogLevel.DEBUG, "Received LDS response:\n{0}", respPrinter.print(ldsResponse));
}
if (listenerWatcher != null) {
handleLdsResponseForListener(ldsResponse);
} else {
handleLdsResponseForConfigUpdate(ldsResponse);
}
}
/**
* Handles LDS response to find the HttpConnectionManager message for the requested resource name.
* Proceed with the resolved RouteConfiguration in HttpConnectionManager message of the requested
* listener, if exists, to find the VirtualHost configuration for the "xds:" URI
* (with the port, if any, stripped off). Or sends an RDS request if configured for dynamic
* resolution. The response is NACKed if contains invalid data for gRPC's usage. Otherwise, an
* ACK request is sent to management server.
*/
private void handleLdsResponseForConfigUpdate(DiscoveryResponse ldsResponse) {
checkState(ldsResourceName != null && configWatcher != null,
"LDS request for ConfigWatcher was never sent!");
// Unpack Listener messages.
List<Listener> listeners = new ArrayList<>(ldsResponse.getResourcesCount());
List<String> listenerNames = new ArrayList<>(ldsResponse.getResourcesCount());
try {
for (com.google.protobuf.Any res : ldsResponse.getResourcesList()) {
Listener listener = res.unpack(Listener.class);
listeners.add(listener);
listenerNames.add(listener.getName());
}
} catch (InvalidProtocolBufferException e) {
logger.log(XdsLogLevel.WARNING, "Failed to unpack Listeners in LDS response {0}", e);
adsStream.sendNackRequest(
ADS_TYPE_URL_LDS, ImmutableList.of(ldsResourceName),
ldsResponse.getVersionInfo(), "Malformed LDS response: " + e);
return;
}
logger.log(XdsLogLevel.INFO, "Received LDS response for resources: {0}", listenerNames);
// Unpack HttpConnectionManager messages.
HttpConnectionManager requestedHttpConnManager = null;
try {
for (Listener listener : listeners) {
HttpConnectionManager hm =
listener.getApiListener().getApiListener().unpack(HttpConnectionManager.class);
if (listener.getName().equals(ldsResourceName)) {
requestedHttpConnManager = hm;
}
}
} catch (InvalidProtocolBufferException e) {
logger.log(
XdsLogLevel.WARNING,
"Failed to unpack HttpConnectionManagers in Listeners of LDS response {0}", e);
adsStream.sendNackRequest(
ADS_TYPE_URL_LDS, ImmutableList.of(ldsResourceName),
ldsResponse.getVersionInfo(), "Malformed LDS response: " + e);
return;
}
String errorMessage = null;
// Routes found in the in-lined RouteConfiguration, if exists.
List<EnvoyProtoData.Route> routes = null;
// RouteConfiguration name to be used as the resource name for RDS request, if exists.
String rdsRouteConfigName = null;
// Process the requested Listener if exists, either extract cluster information from in-lined
// RouteConfiguration message or send an RDS request for dynamic resolution.
if (requestedHttpConnManager != null) {
logger.log(XdsLogLevel.DEBUG, "Found http connection manager");
// The HttpConnectionManager message must either provide the RouteConfiguration directly
// in-line or tell the client to use RDS to obtain it.
// TODO(chengyuanzhang): if both route_config and rds are set, it should be either invalid
// data or one supersedes the other. TBD.
if (requestedHttpConnManager.hasRouteConfig()) {
RouteConfiguration rc = requestedHttpConnManager.getRouteConfig();
routes = findRoutesInRouteConfig(rc, ldsResourceName);
String errorDetail = validateRoutes(routes);
if (errorDetail != null) {
errorMessage =
"Listener " + ldsResourceName + " : cannot find a valid cluster name in any "
+ "virtual hosts inside RouteConfiguration with domains matching: "
+ ldsResourceName
+ " with the reason : " + errorDetail;
}
} else if (requestedHttpConnManager.hasRds()) {
Rds rds = requestedHttpConnManager.getRds();
if (!rds.getConfigSource().hasAds()) {
errorMessage =
"Listener " + ldsResourceName + " : for using RDS, config_source must be "
+ "set to use ADS.";
} else {
rdsRouteConfigName = rds.getRouteConfigName();
}
} else {
errorMessage = "Listener " + ldsResourceName + " : HttpConnectionManager message must "
+ "either provide the RouteConfiguration directly in-line or tell the client to "
+ "use RDS to obtain it.";
}
}
if (errorMessage != null) {
adsStream.sendNackRequest(
ADS_TYPE_URL_LDS, ImmutableList.of(ldsResourceName),
ldsResponse.getVersionInfo(), errorMessage);
return;
}
adsStream.sendAckRequest(ADS_TYPE_URL_LDS, ImmutableList.of(ldsResourceName),
ldsResponse.getVersionInfo());
if (routes != null || rdsRouteConfigName != null) {
if (ldsRespTimer != null) {
ldsRespTimer.cancel();
ldsRespTimer = null;
}
}
if (routes != null) {
// Found clusterName in the in-lined RouteConfiguration.
String clusterName = routes.get(routes.size() - 1).getRouteAction().get().getCluster();
if (!ENABLE_PATH_MATCHING) {
logger.log(
XdsLogLevel.INFO,
"Found cluster name (inlined in route config): {0}", clusterName);
} else {
logger.log(
XdsLogLevel.INFO,
"Found routes (inlined in route config): {0}", routes);
}
ConfigUpdate configUpdate = ConfigUpdate.newBuilder()
.setClusterName(clusterName).addRoutes(routes).build();
configWatcher.onConfigChanged(configUpdate);
} else if (rdsRouteConfigName != null) {
// Send an RDS request if the resource to request has changed.
if (!rdsRouteConfigName.equals(adsStream.rdsResourceName)) {
logger.log(
XdsLogLevel.INFO,
"Use RDS to dynamically resolve route config, resource name: {0}", rdsRouteConfigName);
adsStream.sendXdsRequest(ADS_TYPE_URL_RDS, ImmutableList.of(rdsRouteConfigName));
// Cancel the timer for fetching the previous RDS resource.
if (rdsRespTimer != null) {
rdsRespTimer.cancel();
}
rdsRespTimer =
syncContext
.schedule(
new RdsResourceFetchTimeoutTask(rdsRouteConfigName),
INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS, timeService);
}
} else {
// The requested Listener is removed by management server.
if (ldsRespTimer == null) {
configWatcher.onError(
Status.NOT_FOUND.withDescription(
"Listener resource for listener " + ldsResourceName + " does not exist"));
}
}
}
private void handleLdsResponseForListener(DiscoveryResponse ldsResponse) {
checkState(ldsResourceName == null && listenerPort > 0 && listenerWatcher != null,
"LDS request for ListenerWatcher was never sent!");
// Unpack Listener messages.
Listener requestedListener = null;
logger.log(XdsLogLevel.DEBUG, "Listener count: {0}", ldsResponse.getResourcesCount());
try {
for (com.google.protobuf.Any res : ldsResponse.getResourcesList()) {
Listener listener = res.unpack(Listener.class);
logger.log(XdsLogLevel.DEBUG, "Found listener {0}", listener.toString());
if (isRequestedListener(listener)) {
requestedListener = listener;
logger.log(XdsLogLevel.DEBUG, "Requested listener found: {0}", listener.getName());
}
}
} catch (InvalidProtocolBufferException e) {
logger.log(XdsLogLevel.WARNING, "Failed to unpack Listeners in LDS response {0}", e);
adsStream.sendNackRequest(
ADS_TYPE_URL_LDS, ImmutableList.<String>of(),
ldsResponse.getVersionInfo(), "Malformed LDS response: " + e);
return;
}
adsStream.sendAckRequest(ADS_TYPE_URL_LDS, ImmutableList.<String>of(),
ldsResponse.getVersionInfo());
if (requestedListener != null) {
if (ldsRespTimer != null) {
ldsRespTimer.cancel();
ldsRespTimer = null;
}
ListenerUpdate listenerUpdate = ListenerUpdate.newBuilder()
.setListener(EnvoyServerProtoData.Listener.fromEnvoyProtoListener(requestedListener))
.build();
listenerWatcher.onListenerChanged(listenerUpdate);
} else {
if (ldsRespTimer == null) {
listenerWatcher.onError(Status.NOT_FOUND.withDescription("did not find listener for "
+ listenerPort));
}
}
}
private boolean isRequestedListener(Listener listener) {
// TODO(sanjaypujare): check listener.getName() once we know what xDS server returns
return isAddressMatching(listener.getAddress())
&& hasMatchingFilter(listener.getFilterChainsList());
}
private boolean isAddressMatching(Address address) {
// TODO(sanjaypujare): check IP address once we know xDS server will include it
return address.hasSocketAddress()
&& (address.getSocketAddress().getPortValue() == listenerPort);
}
private boolean hasMatchingFilter(List<FilterChain> filterChainsList) {
// TODO(sanjaypujare): if myIp to be checked against filterChainMatch.getPrefixRangesList()
for (FilterChain filterChain : filterChainsList) {
FilterChainMatch filterChainMatch = filterChain.getFilterChainMatch();
if (listenerPort == filterChainMatch.getDestinationPort().getValue()) {
return true;
}
}
return false;
}
/**
* Handles RDS response to find the RouteConfiguration message for the requested resource name.
* Proceed with the resolved RouteConfiguration if exists to find the VirtualHost configuration
* for the "xds:" URI (with the port, if any, stripped off). The response is NACKed if contains
* invalid data for gRPC's usage. Otherwise, an ACK request is sent to management server.
*/
private void handleRdsResponse(DiscoveryResponse rdsResponse) {
if (logger.isLoggable(XdsLogLevel.DEBUG)) {
logger.log(XdsLogLevel.DEBUG, "Received RDS response:\n{0}", respPrinter.print(rdsResponse));
}
checkState(adsStream.rdsResourceName != null,
"Never requested for RDS resources, management server is doing something wrong");
// Unpack RouteConfiguration messages.
List<String> routeConfigNames = new ArrayList<>(rdsResponse.getResourcesCount());
RouteConfiguration requestedRouteConfig = null;
try {
for (com.google.protobuf.Any res : rdsResponse.getResourcesList()) {
RouteConfiguration rc = res.unpack(RouteConfiguration.class);
routeConfigNames.add(rc.getName());
if (rc.getName().equals(adsStream.rdsResourceName)) {
requestedRouteConfig = rc;
}
}
} catch (InvalidProtocolBufferException e) {
logger.log(
XdsLogLevel.WARNING, "Failed to unpack RouteConfiguration in RDS response {0}", e);
adsStream.sendNackRequest(
ADS_TYPE_URL_RDS, ImmutableList.of(adsStream.rdsResourceName),
rdsResponse.getVersionInfo(), "Malformed RDS response: " + e);
return;
}
logger.log(
XdsLogLevel.INFO, "Received RDS response for resources: {0}", routeConfigNames);
// Resolved cluster name for the requested resource, if exists.
List<EnvoyProtoData.Route> routes = null;
if (requestedRouteConfig != null) {
routes = findRoutesInRouteConfig(requestedRouteConfig, ldsResourceName);
String errorDetail = validateRoutes(routes);
if (errorDetail != null) {
adsStream.sendNackRequest(
ADS_TYPE_URL_RDS, ImmutableList.of(adsStream.rdsResourceName),
rdsResponse.getVersionInfo(),
"RouteConfiguration " + requestedRouteConfig.getName() + ": cannot find a "
+ "valid cluster name in any virtual hosts with domains matching: "
+ ldsResourceName
+ " with the reason: " + errorDetail);
return;
}
}
adsStream.sendAckRequest(ADS_TYPE_URL_RDS, ImmutableList.of(adsStream.rdsResourceName),
rdsResponse.getVersionInfo());
// Notify the ConfigWatcher if this RDS response contains the most recently requested
// RDS resource.
if (routes != null) {
if (rdsRespTimer != null) {
rdsRespTimer.cancel();
rdsRespTimer = null;
}
// Found clusterName in the in-lined RouteConfiguration.
String clusterName = routes.get(routes.size() - 1).getRouteAction().get().getCluster();
if (!ENABLE_PATH_MATCHING) {
logger.log(XdsLogLevel.INFO, "Found cluster name: {0}", clusterName);
} else {
logger.log(XdsLogLevel.INFO, "Found {0} routes", routes.size());
logger.log(XdsLogLevel.DEBUG, "Found routes: {0}", routes);
}
ConfigUpdate configUpdate = ConfigUpdate.newBuilder()
.setClusterName(clusterName).addRoutes(routes).build();
configWatcher.onConfigChanged(configUpdate);
}
}
/**
* Processes a RouteConfiguration message to find the routes that requests for the given host will
* be routed to.
*/
@VisibleForTesting
static List<EnvoyProtoData.Route> findRoutesInRouteConfig(
RouteConfiguration config, String hostName) {
List<VirtualHost> virtualHosts = config.getVirtualHostsList();
// Domain search order:
// 1. Exact domain names: ``www.foo.com``.
// 2. Suffix domain wildcards: ``*.foo.com`` or ``*-bar.foo.com``.
// 3. Prefix domain wildcards: ``foo.*`` or ``foo-*``.
// 4. Special wildcard ``*`` matching any domain.
//
// The longest wildcards match first.
// Assuming only a single virtual host in the entire route configuration can match
// on ``*`` and a domain must be unique across all virtual hosts.
int matchingLen = -1; // longest length of wildcard pattern that matches host name
boolean exactMatchFound = false; // true if a virtual host with exactly matched domain found
VirtualHost targetVirtualHost = null; // target VirtualHost with longest matched domain
for (VirtualHost vHost : virtualHosts) {
for (String domain : vHost.getDomainsList()) {
boolean selected = false;
if (matchHostName(hostName, domain)) { // matching
if (!domain.contains("*")) { // exact matching
exactMatchFound = true;
targetVirtualHost = vHost;
break;
} else if (domain.length() > matchingLen) { // longer matching pattern
selected = true;
} else if (domain.length() == matchingLen && domain.startsWith("*")) { // suffix matching
selected = true;
}
}
if (selected) {
matchingLen = domain.length();
targetVirtualHost = vHost;
}
}
if (exactMatchFound) {
break;
}
}
List<EnvoyProtoData.Route> routes = new ArrayList<>();
// Proceed with the virtual host that has longest wildcard matched domain name with the
// hostname in original "xds:" URI.
// Note we would consider upstream cluster not found if the virtual host is not configured
// correctly for gRPC, even if there exist other virtual hosts with (lower priority)
// matching domains.
if (targetVirtualHost != null) {
List<Route> routesProto = targetVirtualHost.getRoutesList();
for (Route route : routesProto) {
routes.add(EnvoyProtoData.Route.fromEnvoyProtoRoute(route));
}
}
return routes;
}
/**
* Validates the given list of routes and returns error details if there's any error.
*/
@Nullable
private static String validateRoutes(List<EnvoyProtoData.Route> routes) {
if (routes.isEmpty()) {
return "No routes found";
}
// We only validate the default route unless path matching is enabled.
if (!ENABLE_PATH_MATCHING) {
EnvoyProtoData.Route route = routes.get(routes.size() - 1);
RouteMatch routeMatch = route.getRouteMatch();
if (!routeMatch.getPath().isEmpty() || !routeMatch.getPrefix().isEmpty()
|| routeMatch.hasRegex()) {
return "The last route must be the default route";
}
if (!route.getRouteAction().isPresent()) {
return "Route action is not specified for the default route";
}
if (route.getRouteAction().get().getCluster().isEmpty()) {
return "Cluster is not specified for the default route";
}
return null;
}
// We do more validation if path matching is enabled, but whether every single route is required
// to be valid for grpc is TBD.
// For now we consider the whole list invalid if anything invalid for grpc is found.
// TODO(zdapeng): Fix it if the decision is different from current implementation.
// TODO(zdapeng): Add test for validation.
Set<String> prefixMatches = new HashSet<>();
Set<String> pathMatches = new HashSet<>();
for (int i = 0; i < routes.size(); i++) {
EnvoyProtoData.Route route = routes.get(i);
if (!route.getRouteAction().isPresent()) {
return "Route action is not specified for one of the routes";
}
RouteMatch routeMatch = route.getRouteMatch();
String prefix = routeMatch.getPrefix();
String path = routeMatch.getPath();
if (!prefix.isEmpty()) {
if (!prefix.startsWith("/") || !prefix.endsWith("/") || prefix.length() < 3) {
return "Prefix route match must be in the format of '/service/'";
}
if (prefixMatches.contains(prefix)) {
return "Duplicate prefix match found";
}
prefixMatches.add(prefix);
} else if (!path.isEmpty()) {
int lastSlash = path.lastIndexOf('/');
if (!path.startsWith("/") || lastSlash == 0 || lastSlash == path.length() - 1) {
return "Path route match must be in the format of '/service/method'";
}
if (pathMatches.contains(path)) {
return "Duplicate path match found";
}
pathMatches.add(path);
} else if (routeMatch.hasRegex()) {
return "Regex route match not supported";
} else { // Default route match
if (i != routes.size() - 1) {
return "Default route found but is not the last route in the route list";
}
}
if (i == routes.size() - 1) {
if (!prefix.isEmpty() || !path.isEmpty()) {
return "The last route must be the default route";
}
}
RouteAction routeAction = route.getRouteAction().get();
if (routeAction.getCluster().isEmpty() && routeAction.getWeightedCluster().isEmpty()) {
return "Either cluster or weighted cluster route action must be provided";
}
}
return null;
}
/**
* Handles CDS response, which contains a list of Cluster messages with information for a logical
* cluster. The response is NACKed if messages for requested resources contain invalid
* information for gRPC's usage. Otherwise, an ACK request is sent to management server.
* Response data for requested clusters is cached locally, in case of new cluster watchers
* interested in the same clusters are added later.
*/
private void handleCdsResponse(DiscoveryResponse cdsResponse) {
if (logger.isLoggable(XdsLogLevel.DEBUG)) {
logger.log(XdsLogLevel.DEBUG, "Received CDS response:\n{0}", respPrinter.print(cdsResponse));
}
adsStream.cdsRespNonce = cdsResponse.getNonce();
// Unpack Cluster messages.
List<Cluster> clusters = new ArrayList<>(cdsResponse.getResourcesCount());
List<String> clusterNames = new ArrayList<>(cdsResponse.getResourcesCount());
try {
for (com.google.protobuf.Any res : cdsResponse.getResourcesList()) {
Cluster cluster = res.unpack(Cluster.class);
clusters.add(cluster);
clusterNames.add(cluster.getName());
}
} catch (InvalidProtocolBufferException e) {
logger.log(XdsLogLevel.WARNING, "Failed to unpack Clusters in CDS response {0}", e);
adsStream.sendNackRequest(
ADS_TYPE_URL_CDS, clusterWatchers.keySet(),
cdsResponse.getVersionInfo(), "Malformed CDS response: " + e);
return;
}
logger.log(XdsLogLevel.INFO, "Received CDS response for resources: {0}", clusterNames);
String errorMessage = null;
// Cluster information update for requested clusters received in this CDS response.
Map<String, ClusterUpdate> clusterUpdates = new HashMap<>();
// CDS responses represents the state of the world, EDS services not referenced by
// Clusters are those no longer exist.
Set<String> edsServices = new HashSet<>();
for (Cluster cluster : clusters) {
String clusterName = cluster.getName();
// Skip information for clusters not requested.
// Management server is required to always send newly requested resources, even if they
// may have been sent previously (proactively). Thus, client does not need to cache
// unrequested resources.
if (!clusterWatchers.containsKey(clusterName)) {
continue;
}
ClusterUpdate.Builder updateBuilder = ClusterUpdate.newBuilder();
updateBuilder.setClusterName(clusterName);
// The type field must be set to EDS.
if (!cluster.getType().equals(DiscoveryType.EDS)) {
errorMessage = "Cluster " + clusterName + " : only EDS discovery type is supported "
+ "in gRPC.";
break;
}
// In the eds_cluster_config field, the eds_config field must be set to indicate to
// use EDS (must be set to use ADS).
EdsClusterConfig edsClusterConfig = cluster.getEdsClusterConfig();
if (!edsClusterConfig.getEdsConfig().hasAds()) {
errorMessage = "Cluster " + clusterName + " : field eds_cluster_config must be set to "
+ "indicate to use EDS over ADS.";
break;
}
// If the service_name field is set, that value will be used for the EDS request.
if (!edsClusterConfig.getServiceName().isEmpty()) {
updateBuilder.setEdsServiceName(edsClusterConfig.getServiceName());
edsServices.add(edsClusterConfig.getServiceName());
} else {
edsServices.add(clusterName);
}
// The lb_policy field must be set to ROUND_ROBIN.
if (!cluster.getLbPolicy().equals(LbPolicy.ROUND_ROBIN)) {
errorMessage = "Cluster " + clusterName + " : only round robin load balancing policy is "
+ "supported in gRPC.";
break;
}
updateBuilder.setLbPolicy("round_robin");
// If the lrs_server field is set, it must have its self field set, in which case the
// client should use LRS for load reporting. Otherwise (the lrs_server field is not set),
// LRS load reporting will be disabled.
if (cluster.hasLrsServer()) {
if (!cluster.getLrsServer().hasSelf()) {
errorMessage = "Cluster " + clusterName + " : only support enabling LRS for the same "
+ "management server.";
break;
}
updateBuilder.setLrsServerName("");
}
if (cluster.hasTlsContext()) {
updateBuilder.setUpstreamTlsContext(cluster.getTlsContext());
}
clusterUpdates.put(clusterName, updateBuilder.build());
}
if (errorMessage != null) {
adsStream.sendNackRequest(
ADS_TYPE_URL_CDS, clusterWatchers.keySet(), cdsResponse.getVersionInfo(), errorMessage);
return;
}
adsStream.sendAckRequest(ADS_TYPE_URL_CDS, clusterWatchers.keySet(),
cdsResponse.getVersionInfo());
// Update local CDS cache with data in this response.
absentCdsResources.removeAll(clusterUpdates.keySet());
for (String clusterName : clusterNamesToClusterUpdates.keySet()) {
if (!clusterUpdates.containsKey(clusterName)) {
// Some previously existing resource no longer exists.
absentCdsResources.add(clusterName);
}
}
clusterNamesToClusterUpdates.clear();
clusterNamesToClusterUpdates.putAll(clusterUpdates);
// Remove EDS cache entries for ClusterLoadAssignments not referenced by this CDS response.
for (String clusterName : clusterNamesToEndpointUpdates.keySet()) {
if (!edsServices.contains(clusterName)) {
absentEdsResources.add(clusterName);
// Notify EDS resource removal to watchers.
if (endpointWatchers.containsKey(clusterName)) {
Set<EndpointWatcher> watchers = endpointWatchers.get(clusterName);
for (EndpointWatcher watcher : watchers) {
watcher.onError(
Status.NOT_FOUND
.withDescription(
"Endpoint resource for cluster " + clusterName + " is deleted."));
}
}
}
}
clusterNamesToEndpointUpdates.keySet().retainAll(edsServices);
for (String clusterName : clusterUpdates.keySet()) {
if (cdsRespTimers.containsKey(clusterName)) {
cdsRespTimers.get(clusterName).cancel();
cdsRespTimers.remove(clusterName);
}
}
// Notify watchers if clusters interested in present in this CDS response.
for (Map.Entry<String, Set<ClusterWatcher>> entry : clusterWatchers.entrySet()) {
String clusterName = entry.getKey();
if (clusterUpdates.containsKey(clusterName)) {
ClusterUpdate clusterUpdate = clusterUpdates.get(clusterName);
for (ClusterWatcher watcher : entry.getValue()) {
watcher.onClusterChanged(clusterUpdate);
}
} else if (!cdsRespTimers.containsKey(clusterName)) {
// Update for previously present resource being removed.
for (ClusterWatcher watcher : entry.getValue()) {
watcher.onError(
Status.NOT_FOUND
.withDescription("Cluster resource " + clusterName + " not found."));
}
}
}
}
/**
* Handles EDS response, which contains a list of ClusterLoadAssignment messages with
* endpoint load balancing information for each cluster. The response is NACKed if messages
* for requested resources contain invalid information for gRPC's usage. Otherwise,
* an ACK request is sent to management server. Response data for requested clusters is
* cached locally, in case of new endpoint watchers interested in the same clusters
* are added later.
*/
private void handleEdsResponse(DiscoveryResponse edsResponse) {
if (logger.isLoggable(XdsLogLevel.DEBUG)) {
logger.log(XdsLogLevel.DEBUG, "Received EDS response:\n{0}", respPrinter.print(edsResponse));
}
// Unpack ClusterLoadAssignment messages.
List<ClusterLoadAssignment> clusterLoadAssignments =
new ArrayList<>(edsResponse.getResourcesCount());
List<String> claNames = new ArrayList<>(edsResponse.getResourcesCount());
try {
for (com.google.protobuf.Any res : edsResponse.getResourcesList()) {
ClusterLoadAssignment assignment = res.unpack(ClusterLoadAssignment.class);
clusterLoadAssignments.add(assignment);
claNames.add(assignment.getClusterName());
}
} catch (InvalidProtocolBufferException e) {
logger.log(
XdsLogLevel.WARNING, "Failed to unpack ClusterLoadAssignments in EDS response {0}", e);
adsStream.sendNackRequest(
ADS_TYPE_URL_EDS, endpointWatchers.keySet(),
edsResponse.getVersionInfo(), "Malformed EDS response: " + e);
return;
}
logger.log(XdsLogLevel.INFO, "Received EDS response for resources: {0}", claNames);
String errorMessage = null;
// Endpoint information updates for requested clusters received in this EDS response.
Map<String, EndpointUpdate> endpointUpdates = new HashMap<>();
// Walk through each ClusterLoadAssignment message. If any of them for requested clusters
// contain invalid information for gRPC's load balancing usage, the whole response is rejected.
for (ClusterLoadAssignment assignment : clusterLoadAssignments) {
String clusterName = assignment.getClusterName();
// Skip information for clusters not requested.
// Management server is required to always send newly requested resources, even if they
// may have been sent previously (proactively). Thus, client does not need to cache
// unrequested resources.
if (!endpointWatchers.containsKey(clusterName)) {
continue;
}
EndpointUpdate.Builder updateBuilder = EndpointUpdate.newBuilder();
updateBuilder.setClusterName(clusterName);
if (assignment.getEndpointsCount() == 0) {
errorMessage = "ClusterLoadAssignment " + clusterName + " : no locality endpoints.";
break;
}
Set<Integer> priorities = new HashSet<>();
int maxPriority = -1;
for (io.envoyproxy.envoy.api.v2.endpoint.LocalityLbEndpoints localityLbEndpoints
: assignment.getEndpointsList()) {
// Filter out localities without or with 0 weight.
if (!localityLbEndpoints.hasLoadBalancingWeight()
|| localityLbEndpoints.getLoadBalancingWeight().getValue() < 1) {
continue;
}
int localityPriority = localityLbEndpoints.getPriority();
if (localityPriority < 0) {
errorMessage =
"ClusterLoadAssignment " + clusterName + " : locality with negative priority.";
break;
}
maxPriority = Math.max(maxPriority, localityPriority);
priorities.add(localityPriority);
// The endpoint field of each lb_endpoints must be set.
// Inside of it: the address field must be set.
for (io.envoyproxy.envoy.api.v2.endpoint.LbEndpoint lbEndpoint
: localityLbEndpoints.getLbEndpointsList()) {
if (!lbEndpoint.getEndpoint().hasAddress()) {
errorMessage = "ClusterLoadAssignment " + clusterName + " : endpoint with no address.";
break;
}
}
if (errorMessage != null) {
break;
}
// Note endpoints with health status other than UNHEALTHY and UNKNOWN are still
// handed over to watching parties. It is watching parties' responsibility to
// filter out unhealthy endpoints. See EnvoyProtoData.LbEndpoint#isHealthy().
updateBuilder.addLocalityLbEndpoints(
Locality.fromEnvoyProtoLocality(localityLbEndpoints.getLocality()),
LocalityLbEndpoints.fromEnvoyProtoLocalityLbEndpoints(localityLbEndpoints));
}
if (errorMessage != null) {
break;
}
if (priorities.size() != maxPriority + 1) {
errorMessage = "ClusterLoadAssignment " + clusterName + " : sparse priorities.";
break;
}
for (io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy.DropOverload dropOverload
: assignment.getPolicy().getDropOverloadsList()) {
updateBuilder.addDropPolicy(DropOverload.fromEnvoyProtoDropOverload(dropOverload));
}
EndpointUpdate update = updateBuilder.build();
endpointUpdates.put(clusterName, update);
}
if (errorMessage != null) {
adsStream.sendNackRequest(
ADS_TYPE_URL_EDS, endpointWatchers.keySet(), edsResponse.getVersionInfo(), errorMessage);
return;
}
adsStream.sendAckRequest(ADS_TYPE_URL_EDS, endpointWatchers.keySet(),
edsResponse.getVersionInfo());
// Update local EDS cache by inserting updated endpoint information.
clusterNamesToEndpointUpdates.putAll(endpointUpdates);
absentEdsResources.removeAll(endpointUpdates.keySet());
// Notify watchers waiting for updates of endpoint information received in this EDS response.
for (Map.Entry<String, EndpointUpdate> entry : endpointUpdates.entrySet()) {
String clusterName = entry.getKey();
// Cancel and delete response timeout timer.
if (edsRespTimers.containsKey(clusterName)) {
edsRespTimers.get(clusterName).cancel();
edsRespTimers.remove(clusterName);
}
if (endpointWatchers.containsKey(clusterName)) {
for (EndpointWatcher watcher : endpointWatchers.get(clusterName)) {
watcher.onEndpointChanged(entry.getValue());
}
}
}
}
@VisibleForTesting
final class RpcRetryTask implements Runnable {
@Override
public void run() {
startRpcStream();
if (configWatcher != null) {
adsStream.sendXdsRequest(ADS_TYPE_URL_LDS, ImmutableList.of(ldsResourceName));
ldsRespTimer =
syncContext
.schedule(
new LdsResourceFetchTimeoutTask(ldsResourceName),
INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS, timeService);
}
if (listenerWatcher != null) {
adsStream.sendXdsRequest(ADS_TYPE_URL_LDS, ImmutableList.<String>of());
ldsRespTimer =
syncContext
.schedule(
new ListenerResourceFetchTimeoutTask(":" + listenerPort),
INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS, timeService);
}
if (!clusterWatchers.isEmpty()) {
adsStream.sendXdsRequest(ADS_TYPE_URL_CDS, clusterWatchers.keySet());
for (String clusterName : clusterWatchers.keySet()) {
ScheduledHandle timeoutHandle =
syncContext
.schedule(
new CdsResourceFetchTimeoutTask(clusterName),
INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS, timeService);
cdsRespTimers.put(clusterName, timeoutHandle);
}
}
if (!endpointWatchers.isEmpty()) {
adsStream.sendXdsRequest(ADS_TYPE_URL_EDS, endpointWatchers.keySet());
for (String clusterName : endpointWatchers.keySet()) {
ScheduledHandle timeoutHandle =
syncContext
.schedule(
new EdsResourceFetchTimeoutTask(clusterName),
INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS, timeService);
edsRespTimers.put(clusterName, timeoutHandle);
}
}
}
}
private final class AdsStream implements StreamObserver<DiscoveryResponse> {
private final AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub stub;
private StreamObserver<DiscoveryRequest> requestWriter;
private boolean responseReceived;
private boolean closed;
// Last successfully applied version_info for each resource type. Starts with empty string.
// A version_info is used to update management server with client's most recent knowledge of
// resources.
private String ldsVersion = "";
private String rdsVersion = "";
private String cdsVersion = "";
private String edsVersion = "";
// Response nonce for the most recently received discovery responses of each resource type.
// Client initiated requests start response nonce with empty string.
// A nonce is used to indicate the specific DiscoveryResponse each DiscoveryRequest
// corresponds to.
// A nonce becomes stale following a newer nonce being presented to the client in a
// DiscoveryResponse.
private String ldsRespNonce = "";
private String rdsRespNonce = "";
private String cdsRespNonce = "";
private String edsRespNonce = "";
// Most recently requested RDS resource name, which is an intermediate resource name for
// resolving service config.
// LDS request always use the same resource name, which is the "xds:" URI.
// Resource names for EDS requests are always represented by the cluster names that
// watchers are interested in.
@Nullable
private String rdsResourceName;
private AdsStream(AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub stub) {
this.stub = checkNotNull(stub, "stub");
}
private void start() {
requestWriter = stub.withWaitForReady().streamAggregatedResources(this);
}
@Override
public void onNext(final DiscoveryResponse response) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (closed) {
return;
}
responseReceived = true;
String typeUrl = response.getTypeUrl();
// Nonce in each response is echoed back in the following ACK/NACK request. It is
// used for management server to identify which response the client is ACKing/NACking.
// To avoid confusion, client-initiated requests will always use the nonce in
// most recently received responses of each resource type.
if (typeUrl.equals(ADS_TYPE_URL_LDS)) {
ldsRespNonce = response.getNonce();
handleLdsResponse(response);
} else if (typeUrl.equals(ADS_TYPE_URL_RDS)) {
rdsRespNonce = response.getNonce();
handleRdsResponse(response);
} else if (typeUrl.equals(ADS_TYPE_URL_CDS)) {
cdsRespNonce = response.getNonce();
handleCdsResponse(response);
} else if (typeUrl.equals(ADS_TYPE_URL_EDS)) {
edsRespNonce = response.getNonce();
handleEdsResponse(response);
} else {
logger.log(
XdsLogLevel.WARNING,
"Received an unknown type of DiscoveryResponse\n{0}", response);
}
}
});
}
@Override
public void onError(final Throwable t) {
syncContext.execute(new Runnable() {
@Override
public void run() {
handleStreamClosed(Status.fromThrowable(t));
}
});
}
@Override
public void onCompleted() {
syncContext.execute(new Runnable() {
@Override
public void run() {
handleStreamClosed(
Status.UNAVAILABLE.withDescription("Closed by server"));
}
});
}
private void handleStreamClosed(Status error) {
checkArgument(!error.isOk(), "unexpected OK status");
if (closed) {
return;
}
logger.log(
XdsLogLevel.ERROR,
"ADS stream closed with status {0}: {1}. Cause: {2}",
error.getCode(), error.getDescription(), error.getCause());
closed = true;
if (configWatcher != null) {
configWatcher.onError(error);
}
if (listenerWatcher != null) {
listenerWatcher.onError(error);
}
for (Set<ClusterWatcher> watchers : clusterWatchers.values()) {
for (ClusterWatcher watcher : watchers) {
watcher.onError(error);
}
}
for (Set<EndpointWatcher> watchers : endpointWatchers.values()) {
for (EndpointWatcher watcher : watchers) {
watcher.onError(error);
}
}
cleanUp();
cleanUpResources();
if (responseReceived || retryBackoffPolicy == null) {
// Reset the backoff sequence if had received a response, or backoff sequence
// has never been initialized.
retryBackoffPolicy = backoffPolicyProvider.get();
}
long delayNanos = 0;
if (!responseReceived) {
delayNanos =
Math.max(
0,
retryBackoffPolicy.nextBackoffNanos()
- adsStreamRetryStopwatch.elapsed(TimeUnit.NANOSECONDS));
}
logger.log(XdsLogLevel.INFO, "Retry ADS stream in {0} ns", delayNanos);
rpcRetryTimer =
syncContext.schedule(
new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timeService);
}
private void close(Exception error) {
if (closed) {
return;
}
closed = true;
cleanUp();
requestWriter.onError(error);
}
private void cleanUp() {
if (adsStream == this) {
adsStream = null;
}
}
/**
* Sends a DiscoveryRequest for the given resource name to management server. Memories the
* requested resource name (except for LDS as we always request for the singleton Listener)
* as we need it to find resources in responses.
*/
private void sendXdsRequest(String typeUrl, Collection<String> resourceNames) {
checkState(requestWriter != null, "ADS stream has not been started");
String version = "";
String nonce = "";
if (typeUrl.equals(ADS_TYPE_URL_LDS)) {
version = ldsVersion;
nonce = ldsRespNonce;
logger.log(XdsLogLevel.INFO, "Sending LDS request for resources: {0}", resourceNames);
} else if (typeUrl.equals(ADS_TYPE_URL_RDS)) {
checkArgument(resourceNames.size() == 1,
"RDS request requesting for more than one resource");
version = rdsVersion;
nonce = rdsRespNonce;
rdsResourceName = resourceNames.iterator().next();
logger.log(XdsLogLevel.INFO, "Sending RDS request for resources: {0}", resourceNames);
} else if (typeUrl.equals(ADS_TYPE_URL_CDS)) {
version = cdsVersion;
nonce = cdsRespNonce;
logger.log(XdsLogLevel.INFO, "Sending CDS request for resources: {0}", resourceNames);
} else if (typeUrl.equals(ADS_TYPE_URL_EDS)) {
version = edsVersion;
nonce = edsRespNonce;
logger.log(XdsLogLevel.INFO, "Sending EDS request for resources: {0}", resourceNames);
}
DiscoveryRequest request =
DiscoveryRequest
.newBuilder()
.setVersionInfo(version)
.setNode(node)
.addAllResourceNames(resourceNames)
.setTypeUrl(typeUrl)
.setResponseNonce(nonce)
.build();
requestWriter.onNext(request);
logger.log(XdsLogLevel.DEBUG, "Sent DiscoveryRequest\n{0}", request);
}
/**
* Sends a DiscoveryRequest with the given information as an ACK. Updates the latest accepted
* version for the corresponding resource type.
*/
private void sendAckRequest(String typeUrl, Collection<String> resourceNames,
String versionInfo) {
checkState(requestWriter != null, "ADS stream has not been started");
String nonce = "";
if (typeUrl.equals(ADS_TYPE_URL_LDS)) {
ldsVersion = versionInfo;
nonce = ldsRespNonce;
} else if (typeUrl.equals(ADS_TYPE_URL_RDS)) {
rdsVersion = versionInfo;
nonce = rdsRespNonce;
} else if (typeUrl.equals(ADS_TYPE_URL_CDS)) {
cdsVersion = versionInfo;
nonce = cdsRespNonce;
} else if (typeUrl.equals(ADS_TYPE_URL_EDS)) {
edsVersion = versionInfo;
nonce = edsRespNonce;
}
DiscoveryRequest request =
DiscoveryRequest
.newBuilder()
.setVersionInfo(versionInfo)
.setNode(node)
.addAllResourceNames(resourceNames)
.setTypeUrl(typeUrl)
.setResponseNonce(nonce)
.build();
requestWriter.onNext(request);
logger.log(XdsLogLevel.DEBUG, "Sent ACK request\n{0}", request);
}
/**
* Sends a DiscoveryRequest with the given information as an NACK. NACK takes the previous
* accepted version.
*/
private void sendNackRequest(String typeUrl, Collection<String> resourceNames,
String rejectVersion, String message) {
checkState(requestWriter != null, "ADS stream has not been started");
String versionInfo = "";
String nonce = "";
if (typeUrl.equals(ADS_TYPE_URL_LDS)) {
versionInfo = ldsVersion;
nonce = ldsRespNonce;
logger.log(
XdsLogLevel.WARNING,
"Rejecting LDS update, version: {0}, reason: {1}", rejectVersion, message);
} else if (typeUrl.equals(ADS_TYPE_URL_RDS)) {
versionInfo = rdsVersion;
nonce = rdsRespNonce;
logger.log(
XdsLogLevel.WARNING,
"Rejecting RDS update, version: {0}, reason: {1}", rejectVersion, message);
} else if (typeUrl.equals(ADS_TYPE_URL_CDS)) {
versionInfo = cdsVersion;
nonce = cdsRespNonce;
logger.log(
XdsLogLevel.WARNING,
"Rejecting CDS update, version: {0}, reason: {1}", rejectVersion, message);
} else if (typeUrl.equals(ADS_TYPE_URL_EDS)) {
versionInfo = edsVersion;
nonce = edsRespNonce;
logger.log(
XdsLogLevel.WARNING,
"Rejecting EDS update, version: {0}, reason: {1}", rejectVersion, message);
}
DiscoveryRequest request =
DiscoveryRequest
.newBuilder()
.setVersionInfo(versionInfo)
.setNode(node)
.addAllResourceNames(resourceNames)
.setTypeUrl(typeUrl)
.setResponseNonce(nonce)
.setErrorDetail(
com.google.rpc.Status.newBuilder()
.setCode(Code.INVALID_ARGUMENT_VALUE)
.setMessage(message))
.build();
requestWriter.onNext(request);
logger.log(XdsLogLevel.DEBUG, "Sent NACK request\n{0}", request);
}
}
private abstract class ResourceFetchTimeoutTask implements Runnable {
final String resourceName;
ResourceFetchTimeoutTask(String resourceName) {
this.resourceName = resourceName;
}
@Override
public void run() {
logger.log(
XdsLogLevel.WARNING,
"Did not receive resource info {0} after {1} seconds, conclude it absent",
resourceName, INITIAL_RESOURCE_FETCH_TIMEOUT_SEC);
}
}
@VisibleForTesting
final class LdsResourceFetchTimeoutTask extends ResourceFetchTimeoutTask {
LdsResourceFetchTimeoutTask(String resourceName) {
super(resourceName);
}
@Override
public void run() {
super.run();
ldsRespTimer = null;
configWatcher.onError(
Status.NOT_FOUND
.withDescription("Listener resource for listener " + resourceName + " not found."));
}
}
@VisibleForTesting
final class ListenerResourceFetchTimeoutTask extends ResourceFetchTimeoutTask {
ListenerResourceFetchTimeoutTask(String resourceName) {
super(resourceName);
}
@Override
public void run() {
super.run();
ldsRespTimer = null;
listenerWatcher.onError(
Status.NOT_FOUND
.withDescription("Listener resource for port " + resourceName + " not found."));
}
}
@VisibleForTesting
final class RdsResourceFetchTimeoutTask extends ResourceFetchTimeoutTask {
RdsResourceFetchTimeoutTask(String resourceName) {
super(resourceName);
}
@Override
public void run() {
super.run();
rdsRespTimer = null;
configWatcher.onError(Status.NOT_FOUND
.withDescription(
"RouteConfiguration resource for route " + resourceName + " not found."));
}
}
@VisibleForTesting
final class CdsResourceFetchTimeoutTask extends ResourceFetchTimeoutTask {
CdsResourceFetchTimeoutTask(String resourceName) {
super(resourceName);
}
@Override
public void run() {
super.run();
cdsRespTimers.remove(resourceName);
absentCdsResources.add(resourceName);
for (ClusterWatcher wat : clusterWatchers.get(resourceName)) {
wat.onError(
Status.NOT_FOUND
.withDescription("Cluster resource " + resourceName + " not found."));
}
}
}
@VisibleForTesting
final class EdsResourceFetchTimeoutTask extends ResourceFetchTimeoutTask {
EdsResourceFetchTimeoutTask(String resourceName) {
super(resourceName);
}
@Override
public void run() {
super.run();
edsRespTimers.remove(resourceName);
absentEdsResources.add(resourceName);
for (EndpointWatcher wat : endpointWatchers.get(resourceName)) {
wat.onError(
Status.NOT_FOUND
.withDescription(
"Endpoint resource for cluster " + resourceName + " not found."));
}
}
}
/**
* Returns {@code true} iff {@code hostName} matches the domain name {@code pattern} with
* case-insensitive.
*
* <p>Wildcard pattern rules:
* <ol>
* <li>A single asterisk (*) matches any domain.</li>
* <li>Asterisk (*) is only permitted in the left-most or the right-most part of the pattern,
* but not both.</li>
* </ol>
*/
@VisibleForTesting
static boolean matchHostName(String hostName, String pattern) {
checkArgument(hostName.length() != 0 && !hostName.startsWith(".") && !hostName.endsWith("."),
"Invalid host name");
checkArgument(pattern.length() != 0 && !pattern.startsWith(".") && !pattern.endsWith("."),
"Invalid pattern/domain name");
hostName = hostName.toLowerCase(Locale.US);
pattern = pattern.toLowerCase(Locale.US);
// hostName and pattern are now in lower case -- domain names are case-insensitive.
if (!pattern.contains("*")) {
// Not a wildcard pattern -- hostName and pattern must match exactly.
return hostName.equals(pattern);
}
// Wildcard pattern
if (pattern.length() == 1) {
return true;
}
int index = pattern.indexOf('*');
// At most one asterisk (*) is allowed.
if (pattern.indexOf('*', index + 1) != -1) {
return false;
}
// Asterisk can only match prefix or suffix.
if (index != 0 && index != pattern.length() - 1) {
return false;
}
// HostName must be at least as long as the pattern because asterisk has to
// match one or more characters.
if (hostName.length() < pattern.length()) {
return false;
}
if (index == 0 && hostName.endsWith(pattern.substring(1))) {
// Prefix matching fails.
return true;
}
// Pattern matches hostname if suffix matching succeeds.
return index == pattern.length() - 1
&& hostName.startsWith(pattern.substring(0, pattern.length() - 1));
}
/**
* Convert protobuf message to human readable String format. Useful for protobuf messages
* containing {@link com.google.protobuf.Any} fields.
*/
@VisibleForTesting
static final class MessagePrinter {
private final JsonFormat.Printer printer;
@VisibleForTesting
MessagePrinter() {
com.google.protobuf.TypeRegistry registry =
com.google.protobuf.TypeRegistry.newBuilder()
.add(Listener.getDescriptor())
.add(HttpConnectionManager.getDescriptor())
.add(RouteConfiguration.getDescriptor())
.add(Cluster.getDescriptor())
.add(ClusterLoadAssignment.getDescriptor())
.build();
printer = JsonFormat.printer().usingTypeRegistry(registry);
}
@VisibleForTesting
String print(MessageOrBuilder message) {
String res;
try {
res = printer.print(message);
} catch (InvalidProtocolBufferException e) {
res = message + " (failed to pretty-print: " + e + ")";
}
return res;
}
}
}