blob: 288ad2fb92e52cd68bb4c62c726403e4f38dca9e [file] [log] [blame]
/*
 * Copyright 2019 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.grpc.xds;
import static com.google.common.base.Preconditions.checkArgument;

import com.google.common.base.Preconditions;
// TODO(sanjaypujare): remove dependency on envoy data types.
import io.envoyproxy.envoy.api.v2.auth.UpstreamTlsContext;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Status;
import io.grpc.alts.GoogleDefaultChannelBuilder;
import io.grpc.internal.ObjectPool;
import io.grpc.xds.Bootstrapper.ChannelCreds;
import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.EnvoyProtoData.DropOverload;
import io.grpc.xds.EnvoyProtoData.Locality;
import io.grpc.xds.EnvoyProtoData.LocalityLbEndpoints;
import io.grpc.xds.EnvoyServerProtoData.Listener;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;

/**
 * An {@link XdsClient} instance encapsulates all of the logic for communicating with the xDS
 * server. It may create multiple RPC streams (or a single ADS stream) for a series of xDS
 * protocols (e.g., LDS, RDS, VHDS, CDS and EDS) over a single channel. Watch-based interfaces
 * are provided for each set of data needed by gRPC.
 */
abstract class XdsClient {
* Data class containing the results of performing a series of resource discovery RPCs via
* LDS/RDS/VHDS protocols. The results may include configurations for path/host rewriting,
* traffic mirroring, retry or hedging, default timeouts and load balancing policy that will
* be used to generate a service config.
static final class ConfigUpdate {
private final String clusterName;
private final Listener listener;
private ConfigUpdate(String clusterName, @Nullable Listener listener) {
this.clusterName = clusterName;
this.listener = listener;
String getClusterName() {
return clusterName;
public Listener getListener() {
return listener;
static Builder newBuilder() {
return new Builder();
static final class Builder {
private String clusterName;
@Nullable private Listener listener;
// Use ConfigUpdate.newBuilder().
private Builder() {
Builder setClusterName(String clusterName) {
this.clusterName = clusterName;
return this;
Builder setListener(Listener listener) {
this.listener = listener;
return this;
ConfigUpdate build() {
Preconditions.checkState(clusterName != null, "clusterName is not set");
return new ConfigUpdate(clusterName, listener);
* Data class containing the results of performing a resource discovery RPC via CDS protocol.
* The results include configurations for a single upstream cluster, such as endpoint discovery
* type, load balancing policy, connection timeout and etc.
static final class ClusterUpdate {
private final String clusterName;
private final String edsServiceName;
private final String lbPolicy;
private final boolean enableLrs;
private final String lrsServerName;
private final UpstreamTlsContext upstreamTlsContext;
private ClusterUpdate(String clusterName, String edsServiceName, String lbPolicy,
boolean enableLrs, @Nullable String lrsServerName,
@Nullable UpstreamTlsContext upstreamTlsContext) {
this.clusterName = clusterName;
this.edsServiceName = edsServiceName;
this.lbPolicy = lbPolicy;
this.enableLrs = enableLrs;
this.lrsServerName = lrsServerName;
this.upstreamTlsContext = upstreamTlsContext;
String getClusterName() {
return clusterName;
* Returns the resource name for EDS requests.
String getEdsServiceName() {
return edsServiceName;
* Returns the policy of balancing loads to endpoints. Only "round_robin" is supported
* as of now.
String getLbPolicy() {
return lbPolicy;
* Returns true if LRS is enabled.
boolean isEnableLrs() {
return enableLrs;
* Returns the server name to send client load reports to if LRS is enabled. {@code null} if
* {@link #isEnableLrs()} returns {@code false}.
String getLrsServerName() {
return lrsServerName;
/** Returns the {@link UpstreamTlsContext} for this cluster if present, else null. */
UpstreamTlsContext getUpstreamTlsContext() {
return upstreamTlsContext;
static Builder newBuilder() {
return new Builder();
static final class Builder {
private String clusterName;
private String edsServiceName;
private String lbPolicy;
private boolean enableLrs;
private String lrsServerName;
private UpstreamTlsContext upstreamTlsContext;
// Use ClusterUpdate.newBuilder().
private Builder() {
Builder setClusterName(String clusterName) {
this.clusterName = clusterName;
return this;
Builder setEdsServiceName(String edsServiceName) {
this.edsServiceName = edsServiceName;
return this;
Builder setLbPolicy(String lbPolicy) {
this.lbPolicy = lbPolicy;
return this;
Builder setEnableLrs(boolean enableLrs) {
this.enableLrs = enableLrs;
return this;
Builder setLrsServerName(String lrsServerName) {
this.lrsServerName = lrsServerName;
return this;
Builder setUpstreamTlsContext(UpstreamTlsContext upstreamTlsContext) {
this.upstreamTlsContext = upstreamTlsContext;
return this;
ClusterUpdate build() {
Preconditions.checkState(clusterName != null, "clusterName is not set");
Preconditions.checkState(lbPolicy != null, "lbPolicy is not set");
(enableLrs && lrsServerName != null) || (!enableLrs && lrsServerName == null),
"lrsServerName is not set while LRS is enabled "
+ "OR lrsServerName is set while LRS is not enabled");
new ClusterUpdate(clusterName, edsServiceName == null ? clusterName : edsServiceName,
lbPolicy, enableLrs, lrsServerName, upstreamTlsContext);
* Data class containing the results of performing a resource discovery RPC via EDS protocol.
* The results include endpoint addresses running the requested service, as well as
* configurations for traffic control such as drop overloads, inter-cluster load balancing
* policy and etc.
static final class EndpointUpdate {
private final String clusterName;
private final Map<Locality, LocalityLbEndpoints> localityLbEndpointsMap;
private final List<DropOverload> dropPolicies;
private EndpointUpdate(
String clusterName,
Map<Locality, LocalityLbEndpoints> localityLbEndpoints,
List<DropOverload> dropPolicies) {
this.clusterName = clusterName;
this.localityLbEndpointsMap = localityLbEndpoints;
this.dropPolicies = dropPolicies;
static Builder newBuilder() {
return new Builder();
String getClusterName() {
return clusterName;
* Returns a map of localities with endpoints load balancing information in each locality.
Map<Locality, LocalityLbEndpoints> getLocalityLbEndpointsMap() {
return Collections.unmodifiableMap(localityLbEndpointsMap);
* Returns a list of drop policies to be applied to outgoing requests.
List<DropOverload> getDropPolicies() {
return Collections.unmodifiableList(dropPolicies);
public boolean equals(Object o) {
if (this == o) {
return true;
if (o == null || getClass() != o.getClass()) {
return false;
EndpointUpdate that = (EndpointUpdate) o;
return clusterName.equals(that.clusterName)
&& localityLbEndpointsMap.equals(that.localityLbEndpointsMap)
&& dropPolicies.equals(that.dropPolicies);
public int hashCode() {
return Objects.hash(clusterName, localityLbEndpointsMap, dropPolicies);
static final class Builder {
private String clusterName;
private Map<Locality, LocalityLbEndpoints> localityLbEndpointsMap = new LinkedHashMap<>();
private List<DropOverload> dropPolicies = new ArrayList<>();
// Use EndpointUpdate.newBuilder().
private Builder() {
Builder setClusterName(String clusterName) {
this.clusterName = clusterName;
return this;
Builder addLocalityLbEndpoints(Locality locality, LocalityLbEndpoints info) {
localityLbEndpointsMap.put(locality, info);
return this;
Builder addDropPolicy(DropOverload policy) {
return this;
EndpointUpdate build() {
Preconditions.checkState(clusterName != null, "clusterName is not set");
new EndpointUpdate(
* Config watcher interface. To be implemented by the xDS resolver.
interface ConfigWatcher {
* Called when receiving an update on virtual host configurations.
void onConfigChanged(ConfigUpdate update);
void onError(Status error);
* Cluster watcher interface.
interface ClusterWatcher {
void onClusterChanged(ClusterUpdate update);
void onError(Status error);
* Endpoint watcher interface.
interface EndpointWatcher {
void onEndpointChanged(EndpointUpdate update);
void onError(Status error);
* Shutdown this {@link XdsClient} and release resources.
abstract void shutdown();
* Registers a watcher to receive {@link ConfigUpdate} for service with the given hostname and
* port.
* <p>Unlike watchers for cluster data and endpoint data, at most one ConfigWatcher can be
* registered. Once it is registered, it cannot be unregistered.
* @param hostName the host name part of the "xds:" URI for the server name that the gRPC client
* targets for. Must NOT contain port.
* @param port the port part of the "xds:" URI for the server name that the gRPC client targets
* for. -1 if not specified.
* @param watcher the {@link ConfigWatcher} to receive {@link ConfigUpdate}.
void watchConfigData(String hostName, int port, ConfigWatcher watcher) {
* Registers a data watcher for the given cluster.
* <p>Adding the same watcher for the same cluster more than once is a no-op.
void watchClusterData(String clusterName, ClusterWatcher watcher) {
* Unregisters the given cluster watcher, which was registered to receive updates for the
* given cluster.
* <p>Cancelling a watcher that was not registered for the given cluster is a no-op.
void cancelClusterDataWatch(String clusterName, ClusterWatcher watcher) {
* Registers a data watcher for endpoints in the given cluster.
* <p>Adding the same watcher for the same cluster more than once is a no-op.
void watchEndpointData(String clusterName, EndpointWatcher watcher) {
* Unregisters the given endpoints watcher, which was registered to receive updates for
* endpoints information in the given cluster.
* <p>Cancelling a watcher that was not registered for the given cluster is a no-op.
void cancelEndpointDataWatch(String clusterName, EndpointWatcher watcher) {
* Starts reporting client load stats to a remote server for the given cluster.
LoadReportClient reportClientStats(String clusterName, String serverUri) {
throw new UnsupportedOperationException();
* Stops reporting client load stats to the remote server for the given cluster.
void cancelClientStatsReport(String clusterName) {
abstract static class XdsClientFactory {
abstract XdsClient createXdsClient();
* An {@link ObjectPool} holding reference and ref-count of an {@link XdsClient} instance.
* Initially the instance is null and the ref-count is zero. {@link #getObject()} will create a
* new XdsClient instance if the ref-count is zero when calling the method. {@code #getObject()}
* increments the ref-count and {@link #returnObject(Object)} decrements it. Anytime when the
* ref-count gets back to zero, the XdsClient instance will be shutdown and de-referenced.
static final class RefCountedXdsClientObjectPool implements ObjectPool<XdsClient> {
private final XdsClientFactory xdsClientFactory;
XdsClient xdsClient;
private int refCount;
RefCountedXdsClientObjectPool(XdsClientFactory xdsClientFactory) {
this.xdsClientFactory = Preconditions.checkNotNull(xdsClientFactory, "xdsClientFactory");
* See {@link RefCountedXdsClientObjectPool}.
public synchronized XdsClient getObject() {
if (xdsClient == null) {
refCount == 0,
"Bug: refCount should be zero while xdsClient is null");
xdsClient = xdsClientFactory.createXdsClient();
return xdsClient;
* See {@link RefCountedXdsClientObjectPool}.
public synchronized XdsClient returnObject(Object object) {
object == xdsClient,
"Bug: the returned object '%s' does not match current XdsClient '%s'",
Preconditions.checkState(refCount >= 0, "Bug: refCount of XdsClient less than 0");
if (refCount == 0) {
xdsClient = null;
return null;
* Factory for creating channels to xDS severs.
abstract static class XdsChannelFactory {
private static final XdsChannelFactory DEFAULT_INSTANCE = new XdsChannelFactory() {
* Creates a channel to the first server in the given list.
ManagedChannel createChannel(List<ServerInfo> servers) {
checkArgument(!servers.isEmpty(), "No management server provided.");
ServerInfo serverInfo = servers.get(0);
String serverUri = serverInfo.getServerUri();
List<ChannelCreds> channelCredsList = serverInfo.getChannelCredentials();
ManagedChannelBuilder<?> channelBuilder = null;
// Use the first supported channel credentials configuration.
// Currently, only "google_default" is supported.
for (ChannelCreds creds : channelCredsList) {
if (creds.getType().equals("google_default")) {
channelBuilder = GoogleDefaultChannelBuilder.forTarget(serverUri);
if (channelBuilder == null) {
channelBuilder = ManagedChannelBuilder.forTarget(serverUri);
return channelBuilder
.keepAliveTime(5, TimeUnit.MINUTES)
static XdsChannelFactory getInstance() {
* Creates a channel to one of the provided management servers.
abstract ManagedChannel createChannel(List<ServerInfo> servers);