blob: 0abef014f8a3557b015566b24818f6bbdb76fc38 [file] [log] [blame]
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.rls;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import io.grpc.ChannelLogger.ChannelLogLevel;
import io.grpc.ConnectivityState;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.Subchannel;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry;
import io.grpc.NameResolver.ConfigOrError;
import io.grpc.internal.ObjectPool;
import io.grpc.rls.ChildLoadBalancerHelper.ChildLoadBalancerHelperProvider;
import io.grpc.rls.RlsProtoData.RouteLookupConfig;
import io.grpc.util.ForwardingLoadBalancerHelper;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
/** Configuration for RLS load balancing policy. */
final class LbPolicyConfiguration {
private final RouteLookupConfig routeLookupConfig;
@Nullable
private final Map<String, ?> routeLookupChannelServiceConfig;
private final ChildLoadBalancingPolicy policy;
LbPolicyConfiguration(
RouteLookupConfig routeLookupConfig, @Nullable Map<String, ?> routeLookupChannelServiceConfig,
ChildLoadBalancingPolicy policy) {
this.routeLookupConfig = checkNotNull(routeLookupConfig, "routeLookupConfig");
this.routeLookupChannelServiceConfig = routeLookupChannelServiceConfig;
this.policy = checkNotNull(policy, "policy");
}
RouteLookupConfig getRouteLookupConfig() {
return routeLookupConfig;
}
@Nullable
Map<String, ?> getRouteLookupChannelServiceConfig() {
return routeLookupChannelServiceConfig;
}
ChildLoadBalancingPolicy getLoadBalancingPolicy() {
return policy;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
LbPolicyConfiguration that = (LbPolicyConfiguration) o;
return Objects.equals(routeLookupConfig, that.routeLookupConfig)
&& Objects.equals(routeLookupChannelServiceConfig, that.routeLookupChannelServiceConfig)
&& Objects.equals(policy, that.policy);
}
@Override
public int hashCode() {
return Objects.hash(routeLookupConfig, routeLookupChannelServiceConfig, policy);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("routeLookupConfig", routeLookupConfig)
.add("routeLookupChannelServiceConfig", routeLookupChannelServiceConfig)
.add("policy", policy)
.toString();
}
/** ChildLoadBalancingPolicy is an elected child policy to delegate requests. */
static final class ChildLoadBalancingPolicy {
private final Map<String, Object> effectiveRawChildPolicy;
private final LoadBalancerProvider effectiveLbProvider;
private final String targetFieldName;
@VisibleForTesting
ChildLoadBalancingPolicy(
String targetFieldName,
Map<String, Object> effectiveRawChildPolicy,
LoadBalancerProvider effectiveLbProvider) {
checkArgument(
targetFieldName != null && !targetFieldName.isEmpty(),
"targetFieldName cannot be empty or null");
this.targetFieldName = targetFieldName;
this.effectiveRawChildPolicy =
checkNotNull(effectiveRawChildPolicy, "effectiveRawChildPolicy");
this.effectiveLbProvider = checkNotNull(effectiveLbProvider, "effectiveLbProvider");
}
/** Creates ChildLoadBalancingPolicy. */
@SuppressWarnings("unchecked")
static ChildLoadBalancingPolicy create(
String childPolicyConfigTargetFieldName, List<Map<String, ?>> childPolicies)
throws InvalidChildPolicyConfigException {
Map<String, Object> effectiveChildPolicy = null;
LoadBalancerProvider effectiveLbProvider = null;
List<String> policyTried = new ArrayList<>();
LoadBalancerRegistry lbRegistry = LoadBalancerRegistry.getDefaultRegistry();
for (Map<String, ?> childPolicy : childPolicies) {
if (childPolicy.isEmpty()) {
continue;
}
if (childPolicy.size() != 1) {
throw
new InvalidChildPolicyConfigException(
"childPolicy should have exactly one loadbalancing policy");
}
String policyName = childPolicy.keySet().iterator().next();
LoadBalancerProvider provider = lbRegistry.getProvider(policyName);
if (provider != null) {
effectiveLbProvider = provider;
effectiveChildPolicy = Collections.unmodifiableMap(childPolicy);
break;
}
policyTried.add(policyName);
}
if (effectiveChildPolicy == null) {
throw
new InvalidChildPolicyConfigException(
String.format("no valid childPolicy found, policy tried: %s", policyTried));
}
return
new ChildLoadBalancingPolicy(
childPolicyConfigTargetFieldName,
(Map<String, Object>) effectiveChildPolicy.values().iterator().next(),
effectiveLbProvider);
}
/** Creates a child load balancer config for given target from elected raw child policy. */
Map<String, ?> getEffectiveChildPolicy(String target) {
Map<String, Object> childPolicy = new HashMap<>(effectiveRawChildPolicy);
childPolicy.put(targetFieldName, target);
return childPolicy;
}
/** Returns the elected child {@link LoadBalancerProvider}. */
LoadBalancerProvider getEffectiveLbProvider() {
return effectiveLbProvider;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ChildLoadBalancingPolicy that = (ChildLoadBalancingPolicy) o;
return Objects.equals(effectiveRawChildPolicy, that.effectiveRawChildPolicy)
&& Objects.equals(effectiveLbProvider, that.effectiveLbProvider)
&& Objects.equals(targetFieldName, that.targetFieldName);
}
@Override
public int hashCode() {
return Objects.hash(effectiveRawChildPolicy, effectiveLbProvider, targetFieldName);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("effectiveRawChildPolicy", effectiveRawChildPolicy)
.add("effectiveLbProvider", effectiveLbProvider)
.add("childPolicyConfigTargetFieldName", targetFieldName)
.toString();
}
}
/** Factory for {@link ChildPolicyWrapper}. */
static final class RefCountedChildPolicyWrapperFactory {
// GuardedBy CachingRlsLbClient.lock
@VisibleForTesting
final Map<String /* target */, RefCountedChildPolicyWrapper> childPolicyMap =
new HashMap<>();
private final ChildLoadBalancerHelperProvider childLbHelperProvider;
private final ChildLbStatusListener childLbStatusListener;
private final ChildLoadBalancingPolicy childPolicy;
private final ResolvedAddressFactory childLbResolvedAddressFactory;
public RefCountedChildPolicyWrapperFactory(
ChildLoadBalancingPolicy childPolicy,
ResolvedAddressFactory childLbResolvedAddressFactory,
ChildLoadBalancerHelperProvider childLbHelperProvider,
ChildLbStatusListener childLbStatusListener) {
this.childPolicy = checkNotNull(childPolicy, "childPolicy");
this.childLbResolvedAddressFactory =
checkNotNull(childLbResolvedAddressFactory, "childLbResolvedAddressFactory");
this.childLbHelperProvider = checkNotNull(childLbHelperProvider, "childLbHelperProvider");
this.childLbStatusListener = checkNotNull(childLbStatusListener, "childLbStatusListener");
}
// GuardedBy CachingRlsLbClient.lock
ChildPolicyWrapper createOrGet(String target) {
// TODO(creamsoup) check if the target is valid or not
RefCountedChildPolicyWrapper pooledChildPolicyWrapper = childPolicyMap.get(target);
if (pooledChildPolicyWrapper == null) {
ChildPolicyWrapper childPolicyWrapper = new ChildPolicyWrapper(
target, childPolicy, childLbResolvedAddressFactory, childLbHelperProvider,
childLbStatusListener);
pooledChildPolicyWrapper = RefCountedChildPolicyWrapper.of(childPolicyWrapper);
childPolicyMap.put(target, pooledChildPolicyWrapper);
return pooledChildPolicyWrapper.getObject();
} else {
ChildPolicyWrapper childPolicyWrapper = pooledChildPolicyWrapper.getObject();
if (childPolicyWrapper.getPicker() != null) {
childPolicyWrapper.refreshState();
}
return childPolicyWrapper;
}
}
// GuardedBy CachingRlsLbClient.lock
List<ChildPolicyWrapper> createOrGet(List<String> targets) {
List<ChildPolicyWrapper> retVal = new ArrayList<>();
for (String target : targets) {
retVal.add(createOrGet(target));
}
return retVal;
}
// GuardedBy CachingRlsLbClient.lock
void release(ChildPolicyWrapper childPolicyWrapper) {
checkNotNull(childPolicyWrapper, "childPolicyWrapper");
String target = childPolicyWrapper.getTarget();
RefCountedChildPolicyWrapper existing = childPolicyMap.get(target);
checkState(existing != null, "Cannot access already released object");
existing.returnObject(childPolicyWrapper);
if (existing.isReleased()) {
childPolicyMap.remove(target);
}
}
}
/**
* ChildPolicyWrapper is a wrapper class for child load balancing policy with associated helper /
* utility classes to manage the child policy.
*/
static final class ChildPolicyWrapper {
private final String target;
private final ChildPolicyReportingHelper helper;
private final LoadBalancer lb;
private volatile SubchannelPicker picker;
private ConnectivityState state;
public ChildPolicyWrapper(
String target,
ChildLoadBalancingPolicy childPolicy,
final ResolvedAddressFactory childLbResolvedAddressFactory,
ChildLoadBalancerHelperProvider childLbHelperProvider,
ChildLbStatusListener childLbStatusListener) {
this.target = target;
this.helper =
new ChildPolicyReportingHelper(childLbHelperProvider, childLbStatusListener);
LoadBalancerProvider lbProvider = childPolicy.getEffectiveLbProvider();
final ConfigOrError lbConfig =
lbProvider
.parseLoadBalancingPolicyConfig(
childPolicy.getEffectiveChildPolicy(target));
this.lb = lbProvider.newLoadBalancer(helper);
helper.getChannelLogger().log(
ChannelLogLevel.DEBUG, "RLS child lb created. config: {0}", lbConfig.getConfig());
helper.getSynchronizationContext().execute(
new Runnable() {
@Override
public void run() {
lb.handleResolvedAddresses(
childLbResolvedAddressFactory.create(lbConfig.getConfig()));
lb.requestConnection();
}
});
}
String getTarget() {
return target;
}
SubchannelPicker getPicker() {
return picker;
}
ChildPolicyReportingHelper getHelper() {
return helper;
}
public ConnectivityState getState() {
return state;
}
void refreshState() {
helper.getSynchronizationContext().execute(
new Runnable() {
@Override
public void run() {
helper.updateBalancingState(state, picker);
}
}
);
}
void shutdown() {
helper.getSynchronizationContext().execute(
new Runnable() {
@Override
public void run() {
lb.shutdown();
}
}
);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("target", target)
.add("picker", picker)
.add("state", state)
.toString();
}
/**
* A delegating {@link io.grpc.LoadBalancer.Helper} maintains status of {@link
* ChildPolicyWrapper} when {@link Subchannel} status changed. This helper is used between child
* policy and parent load-balancer where each picker in child policy is governed by a governing
* picker (RlsPicker). The governing picker will be reported back to the parent load-balancer.
*/
final class ChildPolicyReportingHelper extends ForwardingLoadBalancerHelper {
private final ChildLoadBalancerHelper delegate;
private final ChildLbStatusListener listener;
ChildPolicyReportingHelper(
ChildLoadBalancerHelperProvider childHelperProvider,
ChildLbStatusListener listener) {
checkNotNull(childHelperProvider, "childHelperProvider");
this.delegate = childHelperProvider.forTarget(getTarget());
this.listener = checkNotNull(listener, "listener");
}
@Override
protected Helper delegate() {
return delegate;
}
@Override
public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
picker = newPicker;
state = newState;
super.updateBalancingState(newState, newPicker);
listener.onStatusChanged(newState);
}
}
}
/** Listener for child lb status change events. */
interface ChildLbStatusListener {
/** Notifies when child lb status changes. */
void onStatusChanged(ConnectivityState newState);
}
private static final class RefCountedChildPolicyWrapper
implements ObjectPool<ChildPolicyWrapper> {
private final AtomicLong refCnt = new AtomicLong();
@Nullable
private ChildPolicyWrapper childPolicyWrapper;
private RefCountedChildPolicyWrapper(ChildPolicyWrapper childPolicyWrapper) {
this.childPolicyWrapper = checkNotNull(childPolicyWrapper, "childPolicyWrapper");
}
@Override
public ChildPolicyWrapper getObject() {
checkState(!isReleased(), "ChildPolicyWrapper is already released");
refCnt.getAndIncrement();
return childPolicyWrapper;
}
@Override
@Nullable
public ChildPolicyWrapper returnObject(Object object) {
checkState(
!isReleased(),
"cannot return already released ChildPolicyWrapper, this is possibly a bug.");
checkState(
childPolicyWrapper == object,
"returned object doesn't match the pooled childPolicyWrapper");
long newCnt = refCnt.decrementAndGet();
checkState(newCnt != -1, "Cannot return never pooled childPolicyWrapper");
if (newCnt == 0) {
childPolicyWrapper.shutdown();
childPolicyWrapper = null;
}
return null;
}
boolean isReleased() {
return childPolicyWrapper == null;
}
static RefCountedChildPolicyWrapper of(ChildPolicyWrapper childPolicyWrapper) {
return new RefCountedChildPolicyWrapper(childPolicyWrapper);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("object", childPolicyWrapper)
.add("refCnt", refCnt.get())
.toString();
}
}
/** Exception thrown when attempting to parse child policy encountered parsing issue. */
static final class InvalidChildPolicyConfigException extends Exception {
private static final long serialVersionUID = 0L;
InvalidChildPolicyConfigException(String message) {
super(message);
}
@Override
public synchronized Throwable fillInStackTrace() {
// no stack trace above this point
return this;
}
}
}