blob: b4aa39821d243872516f200d67af7b84d565dd59 [file] [log] [blame]
/*
* Copyright 2021 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.IDLE;
import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.SHUTDOWN;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.grpc.xds.LeastRequestLoadBalancerProvider.DEFAULT_CHOICE_COUNT;
import static io.grpc.xds.LeastRequestLoadBalancerProvider.MAX_CHOICE_COUNT;
import static io.grpc.xds.LeastRequestLoadBalancerProvider.MIN_CHOICE_COUNT;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import io.grpc.Attributes;
import io.grpc.ClientStreamTracer;
import io.grpc.ClientStreamTracer.StreamInfo;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
/**
* A {@link LoadBalancer} that provides least request load balancing based on
* outstanding request counters.
* It works by sampling a number of subchannels and picking the one with the
* fewest outstanding requests.
* The default sampling amount of two is also known as
* the "power of two choices" (P2C).
*/
final class LeastRequestLoadBalancer extends LoadBalancer {
// Subchannel attribute key for the mutable holder of the subchannel's last recorded
// connectivity state (read through getSubchannelStateInfoRef()).
@VisibleForTesting
static final Attributes.Key<Ref<ConnectivityStateInfo>> STATE_INFO =
Attributes.Key.create("state-info");
// Subchannel attribute key for the counter of outstanding requests on that subchannel
// (read through getInFlights()).
@VisibleForTesting
static final Attributes.Key<AtomicInteger> IN_FLIGHTS =
Attributes.Key.create("in-flights");
private final Helper helper;
// Randomness source handed to every ReadyPicker for sampling subchannels.
private final ThreadSafeRandom random;
// Tracked subchannels, keyed by their address group with attributes stripped.
private final Map<EquivalentAddressGroup, Subchannel> subchannels =
new HashMap<>();
// State and picker most recently published through updateBalancingState(state, picker);
// compared against new values to skip redundant updates. currentState stays null until the
// first update is published.
private ConnectivityState currentState;
private LeastRequestPicker currentPicker = new EmptyPicker(EMPTY_OK);
// Number of random samples per pick; replaced whenever a non-null LeastRequestConfig arrives.
private int choiceCount = DEFAULT_CHOICE_COUNT;
/**
 * Creates a balancer that samples subchannels with the shared
 * {@code ThreadSafeRandomImpl.instance}.
 */
LeastRequestLoadBalancer(Helper helper) {
  this(helper, ThreadSafeRandomImpl.instance);
}

/**
 * Creates a balancer with an explicitly supplied randomness source.
 *
 * @param helper channel helper used to create subchannels and publish pickers; must not be null
 * @param random randomness source used when sampling subchannels; must not be null
 */
@VisibleForTesting
LeastRequestLoadBalancer(Helper helper, ThreadSafeRandom random) {
  this.helper = Preconditions.checkNotNull(helper, "helper");
  this.random = Preconditions.checkNotNull(random, "random");
}
/**
 * Reconciles the tracked subchannels with a freshly resolved address list.
 *
 * <p>An empty list is reported through {@link #handleNameResolutionError} and rejected.
 * Otherwise addresses that already have a subchannel get their address group refreshed, new
 * addresses get a subchannel that is started and asked to connect, and subchannels whose
 * address is no longer present are shut down after the picker has been refreshed.
 *
 * @param resolvedAddresses the resolver result; its policy config, when non-null, is cast to
 *     {@link LeastRequestConfig} and supplies the choice count
 * @return {@code false} when the address list is empty, {@code true} otherwise
 */
@Override
public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
  List<EquivalentAddressGroup> servers = resolvedAddresses.getAddresses();
  if (servers.isEmpty()) {
    Status noAddresses = Status.UNAVAILABLE.withDescription(
        "NameResolver returned no usable address. addrs=" + servers
            + ", attrs=" + resolvedAddresses.getAttributes());
    handleNameResolutionError(noAddresses);
    return false;
  }

  // The config may be null if least_request is used outside xDS; the current count is kept then.
  LeastRequestConfig config =
      (LeastRequestConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
  if (config != null) {
    choiceCount = config.choiceCount;
  }

  Map<EquivalentAddressGroup, EquivalentAddressGroup> resolvedByStrippedAddr =
      stripAttrs(servers);
  // Computed up front, before the loop below adds new keys to the subchannel map.
  Set<EquivalentAddressGroup> staleAddrs =
      setsDifference(subchannels.keySet(), resolvedByStrippedAddr.keySet());

  for (Map.Entry<EquivalentAddressGroup, EquivalentAddressGroup> resolvedEntry :
      resolvedByStrippedAddr.entrySet()) {
    EquivalentAddressGroup strippedAddr = resolvedEntry.getKey();
    EquivalentAddressGroup resolvedAddr = resolvedEntry.getValue();
    Subchannel known = subchannels.get(strippedAddr);
    if (known != null) {
      // Same addresses as before, but the EAG's Attributes may have changed.
      known.updateAddresses(Collections.singletonList(resolvedAddr));
    } else {
      // First time this address is seen: create, start and connect a subchannel for it.
      Attributes initialAttrs = Attributes.newBuilder()
          .set(STATE_INFO, new Ref<>(ConnectivityStateInfo.forNonError(IDLE)))
          // Tracks the in-flight requests on this particular subchannel.
          .set(IN_FLIGHTS, new AtomicInteger(0))
          .build();
      CreateSubchannelArgs createArgs = CreateSubchannelArgs.newBuilder()
          .setAddresses(resolvedAddr)
          .setAttributes(initialAttrs)
          .build();
      final Subchannel created = checkNotNull(helper.createSubchannel(createArgs), "subchannel");
      created.start(new SubchannelStateListener() {
        @Override
        public void onSubchannelState(ConnectivityStateInfo state) {
          processSubchannelState(created, state);
        }
      });
      subchannels.put(strippedAddr, created);
      created.requestConnection();
    }
  }

  List<Subchannel> staleSubchannels = new ArrayList<>();
  for (EquivalentAddressGroup staleAddr : staleAddrs) {
    staleSubchannels.add(subchannels.remove(staleAddr));
  }

  // Publish the new picker before shutting anything down, to reduce the chance of a race between
  // picking a subchannel and shutting it down.
  updateBalancingState();

  for (Subchannel stale : staleSubchannels) {
    shutdownSubchannel(stale);
  }
  return true;
}
/**
 * Handles a name resolution failure. Unless the last published state is {@code READY}, the
 * balancer moves to {@code TRANSIENT_FAILURE} with an {@link EmptyPicker} carrying
 * {@code error}; while {@code READY} the error is ignored.
 */
@Override
public void handleNameResolutionError(Status error) {
  if (currentState == READY) {
    return;
  }
  updateBalancingState(TRANSIENT_FAILURE, new EmptyPicker(error));
}
/**
 * Records a connectivity state change reported for {@code subchannel} and refreshes the picker.
 *
 * <p>Reports for subchannels that are no longer the tracked subchannel for their address are
 * ignored. TRANSIENT_FAILURE and IDLE trigger a name resolution refresh, and IDLE additionally
 * triggers a reconnect. A subchannel recorded as TRANSIENT_FAILURE keeps that recorded state
 * when it reports CONNECTING or IDLE.
 */
private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) {
  Subchannel tracked = subchannels.get(stripAttrs(subchannel.getAddresses()));
  if (tracked != subchannel) {
    return;
  }

  ConnectivityState reported = stateInfo.getState();
  boolean reportedIdle = reported == IDLE;
  if (reportedIdle || reported == TRANSIENT_FAILURE) {
    helper.refreshNameResolution();
  }
  if (reportedIdle) {
    subchannel.requestConnection();
  }

  Ref<ConnectivityStateInfo> recorded = getSubchannelStateInfoRef(subchannel);
  boolean keepFailure = recorded.value.getState() == TRANSIENT_FAILURE
      && (reported == CONNECTING || reportedIdle);
  if (keepFailure) {
    // Stay in TRANSIENT_FAILURE: neither the recorded state nor the picker is touched.
    return;
  }
  recorded.value = stateInfo;
  updateBalancingState();
}
/**
 * Shuts {@code subchannel} down and then marks its recorded state as SHUTDOWN.
 */
private void shutdownSubchannel(Subchannel subchannel) {
  subchannel.shutdown();
  Ref<ConnectivityStateInfo> recorded = getSubchannelStateInfoRef(subchannel);
  recorded.value = ConnectivityStateInfo.forNonError(SHUTDOWN);
}
/**
 * Shuts down every tracked subchannel and forgets all of them.
 */
@Override
public void shutdown() {
  for (Subchannel tracked : subchannels.values()) {
    shutdownSubchannel(tracked);
  }
  subchannels.clear();
}
// OK status used for the initial picker and as the starting value when aggregating subchannel
// statuses. updateBalancingState() compares against it by reference to tell "nothing collected
// yet" apart from a status taken from a subchannel.
private static final Status EMPTY_OK = Status.OK.withDescription("no subchannels ready");
/**
 * Recomputes the aggregate state and picker from the recorded subchannel states.
 *
 * <p>If at least one subchannel is READY, a {@link ReadyPicker} over the READY subchannels is
 * published with state READY. Otherwise an {@link EmptyPicker} is published, with state
 * CONNECTING if any subchannel is CONNECTING or IDLE and TRANSIENT_FAILURE if none is.
 */
@SuppressWarnings("ReferenceEquality")
private void updateBalancingState() {
  Collection<Subchannel> tracked = getSubchannels();
  List<Subchannel> ready = filterNonFailingSubchannels(tracked);
  if (!ready.isEmpty()) {
    updateBalancingState(READY, new ReadyPicker(ready, choiceCount, random));
    return;
  }

  // No READY subchannels: determine the aggregate state and error status.
  boolean anyConnecting = false;
  Status aggregateStatus = EMPTY_OK;
  for (Subchannel subchannel : tracked) {
    ConnectivityStateInfo info = getSubchannelStateInfoRef(subchannel).value;
    ConnectivityState state = info.getState();
    // A subchannel being IDLE here is not because of channel IDLE_TIMEOUT, in which case the LB
    // is already shut down. The balancer requests a connection immediately on subchannel IDLE,
    // so IDLE is counted as connecting.
    if (state == CONNECTING || state == IDLE) {
      anyConnecting = true;
    }
    // Keep replacing the status until an OK status taken from a subchannel has been seen. If
    // all subchannels are TRANSIENT_FAILURE, this leaves the status of an arbitrary one;
    // otherwise it leaves OK.
    if (aggregateStatus == EMPTY_OK || !aggregateStatus.isOk()) {
      aggregateStatus = info.getStatus();
    }
  }

  ConnectivityState aggregateState = anyConnecting ? CONNECTING : TRANSIENT_FAILURE;
  updateBalancingState(aggregateState, new EmptyPicker(aggregateStatus));
}
/**
 * Publishes {@code state} and {@code picker} through the helper and remembers them, unless the
 * state is unchanged and the picker is equivalent to the one already published.
 */
private void updateBalancingState(ConnectivityState state, LeastRequestPicker picker) {
  boolean unchanged = state == currentState && picker.isEquivalentTo(currentPicker);
  if (unchanged) {
    return;
  }
  helper.updateBalancingState(state, picker);
  currentState = state;
  currentPicker = picker;
}
/**
 * Returns a new list holding only the subchannels whose recorded state is READY, in the
 * iteration order of the input.
 */
private static List<Subchannel> filterNonFailingSubchannels(
    Collection<Subchannel> subchannels) {
  List<Subchannel> ready = new ArrayList<>(subchannels.size());
  for (Subchannel candidate : subchannels) {
    if (!isReady(candidate)) {
      continue;
    }
    ready.add(candidate);
  }
  return ready;
}
/**
 * Builds a map from each attribute-stripped {@link EquivalentAddressGroup} to the original
 * group it was derived from. If several groups strip to an equal key, the one appearing last
 * in {@code groupList} is kept.
 */
private static Map<EquivalentAddressGroup, EquivalentAddressGroup> stripAttrs(
    List<EquivalentAddressGroup> groupList) {
  int initialCapacity = groupList.size() * 2;
  Map<EquivalentAddressGroup, EquivalentAddressGroup> originalByStripped =
      new HashMap<>(initialCapacity);
  for (EquivalentAddressGroup original : groupList) {
    EquivalentAddressGroup stripped = stripAttrs(original);
    originalByStripped.put(stripped, original);
  }
  return originalByStripped;
}
/**
 * Returns a new address group built from only the addresses of {@code eag}; the attributes of
 * {@code eag} are not passed along.
 */
private static EquivalentAddressGroup stripAttrs(EquivalentAddressGroup eag) {
return new EquivalentAddressGroup(eag.getAddresses());
}
/**
 * Returns the subchannels currently tracked by this balancer. The result is the backing map's
 * own {@code values()} collection, not a copy.
 */
@VisibleForTesting
Collection<Subchannel> getSubchannels() {
return subchannels.values();
}
/**
 * Returns the mutable state holder stored under {@link #STATE_INFO} in the subchannel's
 * attributes, failing if the attribute is missing.
 */
private static Ref<ConnectivityStateInfo> getSubchannelStateInfoRef(Subchannel subchannel) {
  Ref<ConnectivityStateInfo> stateRef = subchannel.getAttributes().get(STATE_INFO);
  return checkNotNull(stateRef, "STATE_INFO");
}
/**
 * Returns the outstanding-request counter stored under {@link #IN_FLIGHTS} in the subchannel's
 * attributes, failing if the attribute is missing.
 */
private static AtomicInteger getInFlights(Subchannel subchannel) {
  AtomicInteger counter = subchannel.getAttributes().get(IN_FLIGHTS);
  return checkNotNull(counter, "IN_FLIGHTS");
}
/**
 * Tells whether the recorded state of {@code subchannel} is READY.
 *
 * <p>Package-private to avoid synthetic access.
 */
static boolean isReady(Subchannel subchannel) {
  ConnectivityStateInfo recorded = getSubchannelStateInfoRef(subchannel).value;
  return READY == recorded.getState();
}
/**
 * Returns a new set with the elements of {@code a} that are not contained in {@code b}.
 * Neither argument is modified.
 */
private static <T> Set<T> setsDifference(Set<T> a, Set<T> b) {
  Set<T> onlyInA = new HashSet<>();
  onlyInA.addAll(a);
  onlyInA.removeAll(b);
  return onlyInA;
}
// Common base of the pickers published by this balancer.
// Only subclasses are ReadyPicker or EmptyPicker
private abstract static class LeastRequestPicker extends SubchannelPicker {
// Tells whether publishing this picker in place of the given one would be redundant; used by
// updateBalancingState(state, picker) to skip helper updates.
abstract boolean isEquivalentTo(LeastRequestPicker picker);
}
/**
 * Picker used while at least one subchannel is READY. Each pick draws {@code choiceCount}
 * random samples (with replacement) from the list and selects the sample with the fewest
 * in-flight requests.
 */
@VisibleForTesting
static final class ReadyPicker extends LeastRequestPicker {
  private final List<Subchannel> list; // non-empty
  private final int choiceCount;
  private final ThreadSafeRandom random;

  /**
   * Creates a picker over the given subchannels.
   *
   * @param list subchannels to pick from; must not be empty, and is stored without copying
   * @param choiceCount number of random samples drawn per pick
   * @param random randomness source; must not be null
   */
  ReadyPicker(List<Subchannel> list, int choiceCount, ThreadSafeRandom random) {
    checkArgument(!list.isEmpty(), "empty list");
    this.list = list;
    this.choiceCount = choiceCount;
    this.random = checkNotNull(random, "random");
  }

  /**
   * Picks a subchannel and attaches a tracer factory bound to that subchannel's in-flight
   * counter.
   */
  @Override
  public PickResult pickSubchannel(PickSubchannelArgs args) {
    Subchannel chosen = nextSubchannel();
    OutstandingRequestsTracingFactory tracerFactory =
        new OutstandingRequestsTracingFactory(getInFlights(chosen));
    return PickResult.withSubchannel(chosen, tracerFactory);
  }

  @Override
  public String toString() {
    return MoreObjects.toStringHelper(ReadyPicker.class)
        .add("list", list)
        .add("choiceCount", choiceCount)
        .toString();
  }

  /**
   * Draws one random subchannel, then {@code choiceCount - 1} further ones, keeping whichever
   * has strictly fewer in-flight requests at the time of comparison.
   */
  private Subchannel nextSubchannel() {
    Subchannel best = list.get(random.nextInt(list.size()));
    for (int remaining = choiceCount - 1; remaining > 0; remaining--) {
      Subchannel challenger = list.get(random.nextInt(list.size()));
      // Both counters are read fresh on every comparison; ties keep the current choice.
      if (getInFlights(challenger).get() < getInFlights(best).get()) {
        best = challenger;
      }
    }
    return best;
  }

  @VisibleForTesting
  List<Subchannel> getList() {
    return list;
  }

  /**
   * Two ready pickers are equivalent when they hold the same subchannels (in any order) and
   * use the same choice count.
   */
  @Override
  boolean isEquivalentTo(LeastRequestPicker picker) {
    if (picker == this) {
      return true;
    }
    if (!(picker instanceof ReadyPicker)) {
      return false;
    }
    ReadyPicker that = (ReadyPicker) picker;
    if (choiceCount != that.choiceCount || list.size() != that.list.size()) {
      return false;
    }
    // The lists cannot contain duplicate subchannels, so equal size plus containment means the
    // same set of subchannels.
    return new HashSet<>(list).containsAll(that.list);
  }
}
/**
 * Picker used while no subchannel is READY. It never selects a subchannel: picks return
 * {@code PickResult.withNoResult()} when the status is OK and {@code PickResult.withError}
 * otherwise.
 */
@VisibleForTesting
static final class EmptyPicker extends LeastRequestPicker {
  private final Status status;

  EmptyPicker(@Nonnull Status status) {
    this.status = checkNotNull(status, "status");
  }

  @Override
  public PickResult pickSubchannel(PickSubchannelArgs args) {
    if (status.isOk()) {
      return PickResult.withNoResult();
    }
    return PickResult.withError(status);
  }

  /**
   * Two empty pickers are equivalent when their statuses are equal or when both are OK.
   */
  @Override
  boolean isEquivalentTo(LeastRequestPicker picker) {
    if (!(picker instanceof EmptyPicker)) {
      return false;
    }
    Status otherStatus = ((EmptyPicker) picker).status;
    if (Objects.equal(status, otherStatus)) {
      return true;
    }
    return status.isOk() && otherStatus.isOk();
  }

  @Override
  public String toString() {
    return MoreObjects.toStringHelper(EmptyPicker.class).add("status", status).toString();
  }
}
/**
* A lighter weight Reference than AtomicReference.
*
* <p>This is a bare mutable holder: {@code value} is a plain (non-final, non-volatile) field
* that callers read and assign directly.
*/
static final class Ref<T> {
T value;
Ref(T value) {
this.value = value;
}
}
/**
 * Tracer factory that keeps a subchannel's outstanding-request counter up to date: each tracer
 * increments the counter when its stream is created and decrements it when that stream closes.
 *
 * <p>A tracer only decrements if it incremented before. Without that guard, a stream reported
 * as closed without ever being reported as created would push the shared counter below zero
 * and bias later picks towards that subchannel.
 */
private static final class OutstandingRequestsTracingFactory extends
    ClientStreamTracer.Factory {
  private final AtomicInteger inFlights;

  private OutstandingRequestsTracingFactory(AtomicInteger inFlights) {
    this.inFlights = checkNotNull(inFlights, "inFlights");
  }

  /**
   * Returns a tracer bound to this factory's counter. Each tracer remembers on its own whether
   * it has contributed to the count.
   */
  @Override
  public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
    return new ClientStreamTracer() {
      // True while this tracer's stream is included in inFlights.
      // NOTE(review): volatile on the assumption that the two callbacks may arrive on different
      // threads - confirm against the ClientStreamTracer threading contract.
      private volatile boolean counted;

      @Override
      public void streamCreated(Attributes transportAttrs, Metadata headers) {
        counted = true;
        inFlights.incrementAndGet();
      }

      @Override
      public void streamClosed(Status status) {
        // Only undo an increment this tracer actually made, so the counter stays balanced when
        // the stream is closed without having been created.
        if (counted) {
          counted = false;
          inFlights.decrementAndGet();
        }
      }
    };
  }
}
static final class LeastRequestConfig {
final int choiceCount;
LeastRequestConfig(int choiceCount) {
checkArgument(choiceCount >= MIN_CHOICE_COUNT, "choiceCount <= 1");
// Even though a choiceCount value larger than 2 is currently considered valid in xDS
// we restrict it to 10 here as specified in "A48: xDS Least Request LB Policy".
this.choiceCount = Math.min(choiceCount, MAX_CHOICE_COUNT);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("choiceCount", choiceCount)
.toString();
}
}
}