blob: 288ad2fb92e52cd68bb4c62c726403e4f38dca9e [file] [log] [blame]
/*
* Copyright 2019 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
// TODO(sanjaypujare): remove dependency on envoy data types.
import io.envoyproxy.envoy.api.v2.auth.UpstreamTlsContext;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Status;
import io.grpc.alts.GoogleDefaultChannelBuilder;
import io.grpc.internal.ObjectPool;
import io.grpc.xds.Bootstrapper.ChannelCreds;
import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.EnvoyProtoData.DropOverload;
import io.grpc.xds.EnvoyProtoData.Locality;
import io.grpc.xds.EnvoyProtoData.LocalityLbEndpoints;
import io.grpc.xds.EnvoyServerProtoData.Listener;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
/**
* An {@link XdsClient} instance encapsulates all of the logic for communicating with the xDS
* server. It may create multiple RPC streams (or a single ADS stream) for a series of xDS
* protocols (e.g., LDS, RDS, VHDS, CDS and EDS) over a single channel. Watch-based interfaces
* are provided for each set of data needed by gRPC.
*/
abstract class XdsClient {
/**
* Data class containing the results of performing a series of resource discovery RPCs via
* LDS/RDS/VHDS protocols. The results may include configurations for path/host rewriting,
* traffic mirroring, retry or hedging, default timeouts and load balancing policy that will
* be used to generate a service config.
*/
static final class ConfigUpdate {
private final String clusterName;
private final Listener listener;
private ConfigUpdate(String clusterName, @Nullable Listener listener) {
this.clusterName = clusterName;
this.listener = listener;
}
String getClusterName() {
return clusterName;
}
@Nullable
public Listener getListener() {
return listener;
}
static Builder newBuilder() {
return new Builder();
}
static final class Builder {
private String clusterName;
@Nullable private Listener listener;
// Use ConfigUpdate.newBuilder().
private Builder() {
}
Builder setClusterName(String clusterName) {
this.clusterName = clusterName;
return this;
}
Builder setListener(Listener listener) {
this.listener = listener;
return this;
}
ConfigUpdate build() {
Preconditions.checkState(clusterName != null, "clusterName is not set");
return new ConfigUpdate(clusterName, listener);
}
}
}
/**
* Data class containing the results of performing a resource discovery RPC via CDS protocol.
* The results include configurations for a single upstream cluster, such as endpoint discovery
* type, load balancing policy, connection timeout and etc.
*/
static final class ClusterUpdate {
private final String clusterName;
private final String edsServiceName;
private final String lbPolicy;
private final boolean enableLrs;
private final String lrsServerName;
private final UpstreamTlsContext upstreamTlsContext;
private ClusterUpdate(String clusterName, String edsServiceName, String lbPolicy,
boolean enableLrs, @Nullable String lrsServerName,
@Nullable UpstreamTlsContext upstreamTlsContext) {
this.clusterName = clusterName;
this.edsServiceName = edsServiceName;
this.lbPolicy = lbPolicy;
this.enableLrs = enableLrs;
this.lrsServerName = lrsServerName;
this.upstreamTlsContext = upstreamTlsContext;
}
String getClusterName() {
return clusterName;
}
/**
* Returns the resource name for EDS requests.
*/
String getEdsServiceName() {
return edsServiceName;
}
/**
* Returns the policy of balancing loads to endpoints. Only "round_robin" is supported
* as of now.
*/
String getLbPolicy() {
return lbPolicy;
}
/**
* Returns true if LRS is enabled.
*/
boolean isEnableLrs() {
return enableLrs;
}
/**
* Returns the server name to send client load reports to if LRS is enabled. {@code null} if
* {@link #isEnableLrs()} returns {@code false}.
*/
@Nullable
String getLrsServerName() {
return lrsServerName;
}
/** Returns the {@link UpstreamTlsContext} for this cluster if present, else null. */
@Nullable
UpstreamTlsContext getUpstreamTlsContext() {
return upstreamTlsContext;
}
static Builder newBuilder() {
return new Builder();
}
static final class Builder {
private String clusterName;
private String edsServiceName;
private String lbPolicy;
private boolean enableLrs;
@Nullable
private String lrsServerName;
@Nullable
private UpstreamTlsContext upstreamTlsContext;
// Use ClusterUpdate.newBuilder().
private Builder() {
}
Builder setClusterName(String clusterName) {
this.clusterName = clusterName;
return this;
}
Builder setEdsServiceName(String edsServiceName) {
this.edsServiceName = edsServiceName;
return this;
}
Builder setLbPolicy(String lbPolicy) {
this.lbPolicy = lbPolicy;
return this;
}
Builder setEnableLrs(boolean enableLrs) {
this.enableLrs = enableLrs;
return this;
}
Builder setLrsServerName(String lrsServerName) {
this.lrsServerName = lrsServerName;
return this;
}
Builder setUpstreamTlsContext(UpstreamTlsContext upstreamTlsContext) {
this.upstreamTlsContext = upstreamTlsContext;
return this;
}
ClusterUpdate build() {
Preconditions.checkState(clusterName != null, "clusterName is not set");
Preconditions.checkState(lbPolicy != null, "lbPolicy is not set");
Preconditions.checkState(
(enableLrs && lrsServerName != null) || (!enableLrs && lrsServerName == null),
"lrsServerName is not set while LRS is enabled "
+ "OR lrsServerName is set while LRS is not enabled");
return
new ClusterUpdate(clusterName, edsServiceName == null ? clusterName : edsServiceName,
lbPolicy, enableLrs, lrsServerName, upstreamTlsContext);
}
}
}
/**
* Data class containing the results of performing a resource discovery RPC via EDS protocol.
* The results include endpoint addresses running the requested service, as well as
* configurations for traffic control such as drop overloads, inter-cluster load balancing
* policy and etc.
*/
static final class EndpointUpdate {
private final String clusterName;
private final Map<Locality, LocalityLbEndpoints> localityLbEndpointsMap;
private final List<DropOverload> dropPolicies;
private EndpointUpdate(
String clusterName,
Map<Locality, LocalityLbEndpoints> localityLbEndpoints,
List<DropOverload> dropPolicies) {
this.clusterName = clusterName;
this.localityLbEndpointsMap = localityLbEndpoints;
this.dropPolicies = dropPolicies;
}
static Builder newBuilder() {
return new Builder();
}
String getClusterName() {
return clusterName;
}
/**
* Returns a map of localities with endpoints load balancing information in each locality.
*/
Map<Locality, LocalityLbEndpoints> getLocalityLbEndpointsMap() {
return Collections.unmodifiableMap(localityLbEndpointsMap);
}
/**
* Returns a list of drop policies to be applied to outgoing requests.
*/
List<DropOverload> getDropPolicies() {
return Collections.unmodifiableList(dropPolicies);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
EndpointUpdate that = (EndpointUpdate) o;
return clusterName.equals(that.clusterName)
&& localityLbEndpointsMap.equals(that.localityLbEndpointsMap)
&& dropPolicies.equals(that.dropPolicies);
}
@Override
public int hashCode() {
return Objects.hash(clusterName, localityLbEndpointsMap, dropPolicies);
}
static final class Builder {
private String clusterName;
private Map<Locality, LocalityLbEndpoints> localityLbEndpointsMap = new LinkedHashMap<>();
private List<DropOverload> dropPolicies = new ArrayList<>();
// Use EndpointUpdate.newBuilder().
private Builder() {
}
Builder setClusterName(String clusterName) {
this.clusterName = clusterName;
return this;
}
Builder addLocalityLbEndpoints(Locality locality, LocalityLbEndpoints info) {
localityLbEndpointsMap.put(locality, info);
return this;
}
Builder addDropPolicy(DropOverload policy) {
dropPolicies.add(policy);
return this;
}
EndpointUpdate build() {
Preconditions.checkState(clusterName != null, "clusterName is not set");
return
new EndpointUpdate(
clusterName,
ImmutableMap.copyOf(localityLbEndpointsMap),
ImmutableList.copyOf(dropPolicies));
}
}
}
/**
* Config watcher interface. To be implemented by the xDS resolver.
*/
interface ConfigWatcher {
/**
* Called when receiving an update on virtual host configurations.
*/
void onConfigChanged(ConfigUpdate update);
void onError(Status error);
}
/**
* Cluster watcher interface.
*/
interface ClusterWatcher {
void onClusterChanged(ClusterUpdate update);
void onError(Status error);
}
/**
* Endpoint watcher interface.
*/
interface EndpointWatcher {
void onEndpointChanged(EndpointUpdate update);
void onError(Status error);
}
/**
* Shutdown this {@link XdsClient} and release resources.
*/
abstract void shutdown();
/**
* Registers a watcher to receive {@link ConfigUpdate} for service with the given hostname and
* port.
*
* <p>Unlike watchers for cluster data and endpoint data, at most one ConfigWatcher can be
* registered. Once it is registered, it cannot be unregistered.
*
* @param hostName the host name part of the "xds:" URI for the server name that the gRPC client
* targets for. Must NOT contain port.
* @param port the port part of the "xds:" URI for the server name that the gRPC client targets
* for. -1 if not specified.
* @param watcher the {@link ConfigWatcher} to receive {@link ConfigUpdate}.
*/
void watchConfigData(String hostName, int port, ConfigWatcher watcher) {
}
/**
* Registers a data watcher for the given cluster.
*
* <p>Adding the same watcher for the same cluster more than once is a no-op.
*/
void watchClusterData(String clusterName, ClusterWatcher watcher) {
}
/**
* Unregisters the given cluster watcher, which was registered to receive updates for the
* given cluster.
*
* <p>Cancelling a watcher that was not registered for the given cluster is a no-op.
*/
void cancelClusterDataWatch(String clusterName, ClusterWatcher watcher) {
}
/**
* Registers a data watcher for endpoints in the given cluster.
*
* <p>Adding the same watcher for the same cluster more than once is a no-op.
*/
void watchEndpointData(String clusterName, EndpointWatcher watcher) {
}
/**
* Unregisters the given endpoints watcher, which was registered to receive updates for
* endpoints information in the given cluster.
*
* <p>Cancelling a watcher that was not registered for the given cluster is a no-op.
*/
void cancelEndpointDataWatch(String clusterName, EndpointWatcher watcher) {
}
/**
* Starts reporting client load stats to a remote server for the given cluster.
*/
LoadReportClient reportClientStats(String clusterName, String serverUri) {
throw new UnsupportedOperationException();
}
/**
* Stops reporting client load stats to the remote server for the given cluster.
*/
void cancelClientStatsReport(String clusterName) {
}
abstract static class XdsClientFactory {
abstract XdsClient createXdsClient();
}
/**
* An {@link ObjectPool} holding reference and ref-count of an {@link XdsClient} instance.
* Initially the instance is null and the ref-count is zero. {@link #getObject()} will create a
* new XdsClient instance if the ref-count is zero when calling the method. {@code #getObject()}
* increments the ref-count and {@link #returnObject(Object)} decrements it. Anytime when the
* ref-count gets back to zero, the XdsClient instance will be shutdown and de-referenced.
*/
static final class RefCountedXdsClientObjectPool implements ObjectPool<XdsClient> {
private final XdsClientFactory xdsClientFactory;
@VisibleForTesting
@Nullable
XdsClient xdsClient;
private int refCount;
RefCountedXdsClientObjectPool(XdsClientFactory xdsClientFactory) {
this.xdsClientFactory = Preconditions.checkNotNull(xdsClientFactory, "xdsClientFactory");
}
/**
* See {@link RefCountedXdsClientObjectPool}.
*/
@Override
public synchronized XdsClient getObject() {
if (xdsClient == null) {
Preconditions.checkState(
refCount == 0,
"Bug: refCount should be zero while xdsClient is null");
xdsClient = xdsClientFactory.createXdsClient();
}
refCount++;
return xdsClient;
}
/**
* See {@link RefCountedXdsClientObjectPool}.
*/
@Override
public synchronized XdsClient returnObject(Object object) {
Preconditions.checkState(
object == xdsClient,
"Bug: the returned object '%s' does not match current XdsClient '%s'",
object,
xdsClient);
refCount--;
Preconditions.checkState(refCount >= 0, "Bug: refCount of XdsClient less than 0");
if (refCount == 0) {
xdsClient.shutdown();
xdsClient = null;
}
return null;
}
}
/**
* Factory for creating channels to xDS severs.
*/
abstract static class XdsChannelFactory {
private static final XdsChannelFactory DEFAULT_INSTANCE = new XdsChannelFactory() {
/**
* Creates a channel to the first server in the given list.
*/
@Override
ManagedChannel createChannel(List<ServerInfo> servers) {
checkArgument(!servers.isEmpty(), "No management server provided.");
ServerInfo serverInfo = servers.get(0);
String serverUri = serverInfo.getServerUri();
List<ChannelCreds> channelCredsList = serverInfo.getChannelCredentials();
ManagedChannelBuilder<?> channelBuilder = null;
// Use the first supported channel credentials configuration.
// Currently, only "google_default" is supported.
for (ChannelCreds creds : channelCredsList) {
if (creds.getType().equals("google_default")) {
channelBuilder = GoogleDefaultChannelBuilder.forTarget(serverUri);
break;
}
}
if (channelBuilder == null) {
channelBuilder = ManagedChannelBuilder.forTarget(serverUri);
}
return channelBuilder
.keepAliveTime(5, TimeUnit.MINUTES)
.build();
}
};
static XdsChannelFactory getInstance() {
return DEFAULT_INSTANCE;
}
/**
* Creates a channel to one of the provided management servers.
*/
abstract ManagedChannel createChannel(List<ServerInfo> servers);
}
}