blob: 9a8e648a9195e6d02e0df4a743308059d982ded9 [file] [log] [blame]
/*
* Copyright 2019 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.ConnectivityState.IDLE;
import static io.grpc.ConnectivityState.SHUTDOWN;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.grpc.Attributes;
import io.grpc.ChannelLogger.ChannelLogLevel;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancerRegistry;
import io.grpc.Status;
import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.ServiceConfigUtil;
import io.grpc.internal.ServiceConfigUtil.LbConfig;
import io.grpc.xds.XdsComms.AdsStreamCallback;
import io.grpc.xds.XdsLbState.SubchannelStore;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
/**
* A {@link LoadBalancer} that uses the XDS protocol.
*/
final class XdsLoadBalancer extends LoadBalancer {
@VisibleForTesting
static final Attributes.Key<AtomicReference<ConnectivityStateInfo>> STATE_INFO =
Attributes.Key.create("io.grpc.xds.XdsLoadBalancer.stateInfo");
private static final LbConfig DEFAULT_FALLBACK_POLICY =
new LbConfig("round_robin", ImmutableMap.<String, Object>of());
private final SubchannelStore subchannelStore;
private final Helper helper;
private final LoadBalancerRegistry lbRegistry;
private final FallbackManager fallbackManager;
private final AdsStreamCallback adsStreamCallback = new AdsStreamCallback() {
@Override
public void onWorking() {
fallbackManager.balancerWorking = true;
fallbackManager.cancelFallback();
}
@Override
public void onError() {
fallbackManager.balancerWorking = false;
fallbackManager.maybeUseFallbackPolicy();
}
};
@Nullable
private XdsLbState xdsLbState;
private LbConfig fallbackPolicy;
XdsLoadBalancer(Helper helper, LoadBalancerRegistry lbRegistry, SubchannelStore subchannelStore) {
this.helper = checkNotNull(helper, "helper");
this.lbRegistry = lbRegistry;
this.subchannelStore = subchannelStore;
fallbackManager = new FallbackManager(helper, subchannelStore, lbRegistry);
}
@Override
public void handleResolvedAddressGroups(
List<EquivalentAddressGroup> servers, Attributes attributes) {
Map<String, Object> newRawLbConfig = checkNotNull(
attributes.get(ATTR_LOAD_BALANCING_CONFIG), "ATTR_LOAD_BALANCING_CONFIG not available");
LbConfig newLbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(newRawLbConfig);
fallbackPolicy = selectFallbackPolicy(newLbConfig, lbRegistry);
fallbackManager.updateFallbackServers(servers, attributes, fallbackPolicy);
fallbackManager.maybeStartFallbackTimer();
handleNewConfig(newLbConfig);
xdsLbState.handleResolvedAddressGroups(servers, attributes);
}
private void handleNewConfig(LbConfig newLbConfig) {
String newBalancerName = ServiceConfigUtil.getBalancerNameFromXdsConfig(newLbConfig);
LbConfig childPolicy = selectChildPolicy(newLbConfig, lbRegistry);
XdsComms xdsComms = null;
if (xdsLbState != null) { // may release and re-use/shutdown xdsComms from current xdsLbState
if (!newBalancerName.equals(xdsLbState.balancerName)) {
xdsComms = xdsLbState.shutdownAndReleaseXdsComms();
if (xdsComms != null) {
xdsComms.shutdownChannel();
fallbackManager.balancerWorking = false;
xdsComms = null;
}
} else if (!Objects.equals(
getPolicyNameOrNull(childPolicy),
getPolicyNameOrNull(xdsLbState.childPolicy))) {
String cancelMessage = "Changing loadbalancing mode";
xdsComms = xdsLbState.shutdownAndReleaseXdsComms();
// close the stream but reuse the channel
if (xdsComms != null) {
xdsComms.shutdownLbRpc(cancelMessage);
fallbackManager.balancerWorking = false;
xdsComms.refreshAdsStream();
}
} else { // effectively no change in policy, keep xdsLbState unchanged
return;
}
}
xdsLbState = new XdsLbState(
newBalancerName, childPolicy, xdsComms, helper, subchannelStore, adsStreamCallback);
}
@Nullable
private static String getPolicyNameOrNull(@Nullable LbConfig config) {
if (config == null) {
return null;
}
return config.getPolicyName();
}
@Nullable
@VisibleForTesting
static LbConfig selectChildPolicy(LbConfig lbConfig, LoadBalancerRegistry lbRegistry) {
List<LbConfig> childConfigs = ServiceConfigUtil.getChildPolicyFromXdsConfig(lbConfig);
return selectSupportedLbPolicy(childConfigs, lbRegistry);
}
@VisibleForTesting
static LbConfig selectFallbackPolicy(LbConfig lbConfig, LoadBalancerRegistry lbRegistry) {
List<LbConfig> fallbackConfigs = ServiceConfigUtil.getFallbackPolicyFromXdsConfig(lbConfig);
LbConfig fallbackPolicy = selectSupportedLbPolicy(fallbackConfigs, lbRegistry);
return fallbackPolicy == null ? DEFAULT_FALLBACK_POLICY : fallbackPolicy;
}
@Nullable
private static LbConfig selectSupportedLbPolicy(
@Nullable List<LbConfig> lbConfigs, LoadBalancerRegistry lbRegistry) {
if (lbConfigs == null) {
return null;
}
for (LbConfig lbConfig : lbConfigs) {
String lbPolicy = lbConfig.getPolicyName();
if (lbRegistry.getProvider(lbPolicy) != null) {
return lbConfig;
}
}
return null;
}
@Override
public void handleNameResolutionError(Status error) {
if (xdsLbState != null) {
if (fallbackManager.fallbackBalancer != null) {
fallbackManager.fallbackBalancer.handleNameResolutionError(error);
} else {
xdsLbState.handleNameResolutionError(error);
}
}
// TODO: impl
// else {
// helper.updateBalancingState(ConnectivityState.TRANSIENT_FAILURE, new FailingPicker(error));
// }
}
@Override
public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState) {
// xdsLbState should never be null here since handleSubchannelState cannot be called while the
// lb is shutdown.
if (newState.getState() == SHUTDOWN) {
return;
}
if (fallbackManager.fallbackBalancer != null) {
fallbackManager.fallbackBalancer.handleSubchannelState(subchannel, newState);
}
if (subchannelStore.hasSubchannel(subchannel)) {
if (newState.getState() == IDLE) {
subchannel.requestConnection();
}
subchannel.getAttributes().get(STATE_INFO).set(newState);
xdsLbState.handleSubchannelState(subchannel, newState);
fallbackManager.maybeUseFallbackPolicy();
}
}
@Override
public void shutdown() {
if (xdsLbState != null) {
XdsComms xdsComms = xdsLbState.shutdownAndReleaseXdsComms();
if (xdsComms != null) {
xdsComms.shutdownChannel();
}
xdsLbState = null;
}
fallbackManager.cancelFallback();
}
@Override
public boolean canHandleEmptyAddressListFromNameResolution() {
return true;
}
@Nullable
XdsLbState getXdsLbStateForTest() {
return xdsLbState;
}
@VisibleForTesting
static final class FallbackManager {
private static final long FALLBACK_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(10); // same as grpclb
private final Helper helper;
private final SubchannelStore subchannelStore;
private final LoadBalancerRegistry lbRegistry;
private LbConfig fallbackPolicy;
// read-only for outer class
private LoadBalancer fallbackBalancer;
// Scheduled only once. Never reset.
@Nullable
private ScheduledHandle fallbackTimer;
private List<EquivalentAddressGroup> fallbackServers = ImmutableList.of();
private Attributes fallbackAttributes;
// allow value write by outer class
private boolean balancerWorking;
FallbackManager(
Helper helper, SubchannelStore subchannelStore, LoadBalancerRegistry lbRegistry) {
this.helper = helper;
this.subchannelStore = subchannelStore;
this.lbRegistry = lbRegistry;
}
void cancelFallback() {
if (fallbackTimer != null) {
fallbackTimer.cancel();
}
if (fallbackBalancer != null) {
fallbackBalancer.shutdown();
fallbackBalancer = null;
}
}
void maybeUseFallbackPolicy() {
if (fallbackBalancer != null) {
return;
}
if (balancerWorking || subchannelStore.hasReadyBackends()) {
return;
}
helper.getChannelLogger().log(
ChannelLogLevel.INFO, "Using fallback policy");
fallbackBalancer = lbRegistry.getProvider(fallbackPolicy.getPolicyName())
.newLoadBalancer(helper);
fallbackBalancer.handleResolvedAddressGroups(fallbackServers, fallbackAttributes);
// TODO: maybe update picker
}
void updateFallbackServers(
List<EquivalentAddressGroup> servers, Attributes attributes,
LbConfig fallbackPolicy) {
this.fallbackServers = servers;
this.fallbackAttributes = Attributes.newBuilder()
.setAll(attributes)
.set(ATTR_LOAD_BALANCING_CONFIG, fallbackPolicy.getRawConfigValue())
.build();
LbConfig currentFallbackPolicy = this.fallbackPolicy;
this.fallbackPolicy = fallbackPolicy;
if (fallbackBalancer != null) {
if (fallbackPolicy.getPolicyName().equals(currentFallbackPolicy.getPolicyName())) {
fallbackBalancer.handleResolvedAddressGroups(fallbackServers, fallbackAttributes);
} else {
fallbackBalancer.shutdown();
fallbackBalancer = null;
maybeUseFallbackPolicy();
}
}
}
void maybeStartFallbackTimer() {
if (fallbackTimer == null) {
class FallbackTask implements Runnable {
@Override
public void run() {
maybeUseFallbackPolicy();
}
}
fallbackTimer = helper.getSynchronizationContext().schedule(
new FallbackTask(), FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS,
helper.getScheduledExecutorService());
}
}
}
}