blob: 5beefc3384c709e61b7e22683d910ebb2584deb8 [file] [log] [blame]
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Strings;
import io.grpc.Attributes;
import io.grpc.ClientStreamTracer;
import io.grpc.ClientStreamTracer.StreamInfo;
import io.grpc.ConnectivityState;
import io.grpc.EquivalentAddressGroup;
import io.grpc.InternalLogId;
import io.grpc.LoadBalancer;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.internal.ObjectPool;
import io.grpc.util.ForwardingClientStreamTracer;
import io.grpc.util.ForwardingLoadBalancerHelper;
import io.grpc.util.ForwardingSubchannel;
import io.grpc.xds.ClusterImplLoadBalancerProvider.ClusterImplConfig;
import io.grpc.xds.Endpoints.DropOverload;
import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
import io.grpc.xds.LoadStatsManager2.ClusterDropStats;
import io.grpc.xds.LoadStatsManager2.ClusterLocalityStats;
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import io.grpc.xds.XdsNameResolverProvider.CallCounterProvider;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
import io.grpc.xds.internal.sds.SslContextProviderSupplier;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
/**
* Load balancer for cluster_impl_experimental LB policy. This LB policy is the child LB policy of
* the priority_experimental LB policy and the parent LB policy of the weighted_target_experimental
* LB policy in the xDS load balancing hierarchy. This LB policy applies cluster-level
* configurations to requests sent to the corresponding cluster, such as drop policies, circuit
* breakers.
*/
final class ClusterImplLoadBalancer extends LoadBalancer {
@VisibleForTesting
static final long DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS = 1024L;
@VisibleForTesting
static boolean enableCircuitBreaking =
Strings.isNullOrEmpty(System.getenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING"))
|| Boolean.parseBoolean(System.getenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING"));
@VisibleForTesting
static boolean enableSecurity =
Boolean.parseBoolean(System.getenv("GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT"));
private static final Attributes.Key<ClusterLocalityStats> ATTR_CLUSTER_LOCALITY_STATS =
Attributes.Key.create("io.grpc.xds.ClusterImplLoadBalancer.clusterLocalityStats");
private final XdsLogger logger;
private final Helper helper;
private final ThreadSafeRandom random;
// The following fields are effectively final.
private String cluster;
@Nullable
private String edsServiceName;
private ObjectPool<XdsClient> xdsClientPool;
private XdsClient xdsClient;
private CallCounterProvider callCounterProvider;
private ClusterDropStats dropStats;
private ClusterImplLbHelper childLbHelper;
private LoadBalancer childLb;
ClusterImplLoadBalancer(Helper helper) {
this(helper, ThreadSafeRandomImpl.instance);
}
ClusterImplLoadBalancer(Helper helper, ThreadSafeRandom random) {
this.helper = checkNotNull(helper, "helper");
this.random = checkNotNull(random, "random");
InternalLogId logId = InternalLogId.allocate("cluster-impl-lb", helper.getAuthority());
logger = XdsLogger.withLogId(logId);
logger.log(XdsLogLevel.INFO, "Created");
}
@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
Attributes attributes = resolvedAddresses.getAttributes();
if (xdsClientPool == null) {
xdsClientPool = attributes.get(InternalXdsAttributes.XDS_CLIENT_POOL);
xdsClient = xdsClientPool.getObject();
}
if (callCounterProvider == null) {
callCounterProvider = attributes.get(InternalXdsAttributes.CALL_COUNTER_PROVIDER);
}
ClusterImplConfig config =
(ClusterImplConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
if (cluster == null) {
cluster = config.cluster;
edsServiceName = config.edsServiceName;
childLbHelper = new ClusterImplLbHelper(
callCounterProvider.getOrCreate(config.cluster, config.edsServiceName));
childLb = config.childPolicy.getProvider().newLoadBalancer(childLbHelper);
// Assume load report server does not change throughout cluster lifetime.
if (config.lrsServerName != null) {
if (config.lrsServerName.isEmpty()) {
dropStats = xdsClient.addClusterDropStats(cluster, edsServiceName);
} else {
logger.log(XdsLogLevel.WARNING, "Can only report load to the same management server");
}
}
}
childLbHelper.updateDropPolicies(config.dropCategories);
childLbHelper.updateMaxConcurrentRequests(config.maxConcurrentRequests);
childLbHelper.updateSslContextProviderSupplier(config.tlsContext);
childLb.handleResolvedAddresses(
resolvedAddresses.toBuilder()
.setAttributes(attributes)
.setLoadBalancingPolicyConfig(config.childPolicy.getConfig())
.build());
}
@Override
public void handleNameResolutionError(Status error) {
if (childLb != null) {
childLb.handleNameResolutionError(error);
} else {
helper.updateBalancingState(ConnectivityState.TRANSIENT_FAILURE, new ErrorPicker(error));
}
}
@Override
public void shutdown() {
if (dropStats != null) {
dropStats.release();
}
if (childLb != null) {
childLb.shutdown();
if (childLbHelper != null) {
childLbHelper.updateSslContextProviderSupplier(null);
childLbHelper = null;
}
}
if (xdsClient != null) {
xdsClient = xdsClientPool.returnObject(xdsClient);
}
}
@Override
public boolean canHandleEmptyAddressListFromNameResolution() {
return true;
}
/**
* A decorated {@link LoadBalancer.Helper} that applies configurations for connections
* or requests to endpoints in the cluster.
*/
private final class ClusterImplLbHelper extends ForwardingLoadBalancerHelper {
private final AtomicLong inFlights;
private ConnectivityState currentState = ConnectivityState.IDLE;
private SubchannelPicker currentPicker = BUFFER_PICKER;
private List<DropOverload> dropPolicies = Collections.emptyList();
private long maxConcurrentRequests = DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS;
@Nullable
private SslContextProviderSupplier sslContextProviderSupplier;
private ClusterImplLbHelper(AtomicLong inFlights) {
this.inFlights = checkNotNull(inFlights, "inFlights");
}
@Override
public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
currentState = newState;
currentPicker = newPicker;
SubchannelPicker picker =
new RequestLimitingSubchannelPicker(newPicker, dropPolicies, maxConcurrentRequests);
delegate().updateBalancingState(newState, picker);
}
@Override
public Subchannel createSubchannel(CreateSubchannelArgs args) {
List<EquivalentAddressGroup> addresses = new ArrayList<>();
for (EquivalentAddressGroup eag : args.getAddresses()) {
Attributes.Builder attrBuilder = eag.getAttributes().toBuilder().set(
InternalXdsAttributes.ATTR_CLUSTER_NAME, cluster);
if (enableSecurity && sslContextProviderSupplier != null) {
attrBuilder.set(
InternalXdsAttributes.ATTR_SSL_CONTEXT_PROVIDER_SUPPLIER,
sslContextProviderSupplier);
}
addresses.add(new EquivalentAddressGroup(eag.getAddresses(), attrBuilder.build()));
}
Locality locality = args.getAddresses().get(0).getAttributes().get(
InternalXdsAttributes.ATTR_LOCALITY); // all addresses should be in the same locality
// Endpoint addresses resolved by ClusterResolverLoadBalancer should always contain
// attributes with its locality, including endpoints in LOGICAL_DNS clusters.
// In case of not (which really shouldn't), loads are aggregated under an empty locality.
if (locality == null) {
locality = Locality.create("", "", "");
}
final ClusterLocalityStats localityStats = xdsClient.addClusterLocalityStats(
cluster, edsServiceName, locality);
Attributes attrs = args.getAttributes().toBuilder().set(
ATTR_CLUSTER_LOCALITY_STATS, localityStats).build();
args = args.toBuilder().setAddresses(addresses).setAttributes(attrs).build();
final Subchannel subchannel = delegate().createSubchannel(args);
return new ForwardingSubchannel() {
@Override
public void shutdown() {
localityStats.release();
delegate().shutdown();
}
@Override
protected Subchannel delegate() {
return subchannel;
}
};
}
@Override
protected Helper delegate() {
return helper;
}
private void updateDropPolicies(List<DropOverload> dropOverloads) {
if (!dropPolicies.equals(dropOverloads)) {
dropPolicies = dropOverloads;
updateBalancingState(currentState, currentPicker);
}
}
private void updateMaxConcurrentRequests(@Nullable Long maxConcurrentRequests) {
if (Objects.equals(this.maxConcurrentRequests, maxConcurrentRequests)) {
return;
}
this.maxConcurrentRequests =
maxConcurrentRequests != null
? maxConcurrentRequests
: DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS;
updateBalancingState(currentState, currentPicker);
}
private void updateSslContextProviderSupplier(@Nullable UpstreamTlsContext tlsContext) {
UpstreamTlsContext currentTlsContext =
sslContextProviderSupplier != null
? (UpstreamTlsContext)sslContextProviderSupplier.getTlsContext()
: null;
if (Objects.equals(currentTlsContext, tlsContext)) {
return;
}
if (sslContextProviderSupplier != null) {
sslContextProviderSupplier.close();
}
sslContextProviderSupplier =
tlsContext != null
? new SslContextProviderSupplier(tlsContext, xdsClient.getTlsContextManager())
: null;
}
private class RequestLimitingSubchannelPicker extends SubchannelPicker {
private final SubchannelPicker delegate;
private final List<DropOverload> dropPolicies;
private final long maxConcurrentRequests;
private RequestLimitingSubchannelPicker(SubchannelPicker delegate,
List<DropOverload> dropPolicies, long maxConcurrentRequests) {
this.delegate = delegate;
this.dropPolicies = dropPolicies;
this.maxConcurrentRequests = maxConcurrentRequests;
}
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
for (DropOverload dropOverload : dropPolicies) {
int rand = random.nextInt(1_000_000);
if (rand < dropOverload.dropsPerMillion()) {
logger.log(XdsLogLevel.INFO, "Drop request with category: {0}",
dropOverload.category());
if (dropStats != null) {
dropStats.recordDroppedRequest(dropOverload.category());
}
return PickResult.withDrop(
Status.UNAVAILABLE.withDescription("Dropped: " + dropOverload.category()));
}
}
final PickResult result = delegate.pickSubchannel(args);
if (result.getStatus().isOk() && result.getSubchannel() != null) {
if (enableCircuitBreaking) {
if (inFlights.get() >= maxConcurrentRequests) {
if (dropStats != null) {
dropStats.recordDroppedRequest();
}
return PickResult.withDrop(Status.UNAVAILABLE.withDescription(
"Cluster max concurrent requests limit exceeded"));
}
}
final ClusterLocalityStats stats =
result.getSubchannel().getAttributes().get(ATTR_CLUSTER_LOCALITY_STATS);
ClientStreamTracer.Factory tracerFactory = new CountingStreamTracerFactory(
stats, inFlights, result.getStreamTracerFactory());
return PickResult.withSubchannel(result.getSubchannel(), tracerFactory);
}
return result;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("delegate", delegate).toString();
}
}
}
private static final class CountingStreamTracerFactory extends ClientStreamTracer.Factory {
private ClusterLocalityStats stats;
private final AtomicLong inFlights;
@Nullable
private final ClientStreamTracer.Factory delegate;
private CountingStreamTracerFactory(
ClusterLocalityStats stats, AtomicLong inFlights,
@Nullable ClientStreamTracer.Factory delegate) {
this.stats = checkNotNull(stats, "stats");
this.inFlights = checkNotNull(inFlights, "inFlights");
this.delegate = delegate;
}
@Override
public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
stats.recordCallStarted();
inFlights.incrementAndGet();
if (delegate == null) {
return new ClientStreamTracer() {
@Override
public void streamClosed(Status status) {
stats.recordCallFinished(status);
inFlights.decrementAndGet();
}
};
}
final ClientStreamTracer delegatedTracer = delegate.newClientStreamTracer(info, headers);
return new ForwardingClientStreamTracer() {
@Override
protected ClientStreamTracer delegate() {
return delegatedTracer;
}
@Override
public void streamClosed(Status status) {
stats.recordCallFinished(status);
inFlights.decrementAndGet();
delegate().streamClosed(status);
}
};
}
}
}