| /* |
| * Copyright 2017 The gRPC Authors |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package io.grpc.grpclb; |
| |
| import static com.google.common.base.Preconditions.checkArgument; |
| import static com.google.common.base.Preconditions.checkNotNull; |
| import static com.google.common.base.Preconditions.checkState; |
| import static io.grpc.ConnectivityState.CONNECTING; |
| import static io.grpc.ConnectivityState.IDLE; |
| import static io.grpc.ConnectivityState.READY; |
| import static io.grpc.ConnectivityState.SHUTDOWN; |
| import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.MoreObjects; |
| import com.google.common.base.Objects; |
| import com.google.common.base.Stopwatch; |
| import com.google.protobuf.util.Durations; |
| import io.grpc.Attributes; |
| import io.grpc.ChannelLogger; |
| import io.grpc.ChannelLogger.ChannelLogLevel; |
| import io.grpc.ConnectivityState; |
| import io.grpc.ConnectivityStateInfo; |
| import io.grpc.EquivalentAddressGroup; |
| import io.grpc.LoadBalancer.CreateSubchannelArgs; |
| import io.grpc.LoadBalancer.Helper; |
| import io.grpc.LoadBalancer.PickResult; |
| import io.grpc.LoadBalancer.PickSubchannelArgs; |
| import io.grpc.LoadBalancer.Subchannel; |
| import io.grpc.LoadBalancer.SubchannelPicker; |
| import io.grpc.LoadBalancer.SubchannelStateListener; |
| import io.grpc.ManagedChannel; |
| import io.grpc.Metadata; |
| import io.grpc.Status; |
| import io.grpc.SynchronizationContext; |
| import io.grpc.SynchronizationContext.ScheduledHandle; |
| import io.grpc.grpclb.SubchannelPool.PooledSubchannelStateListener; |
| import io.grpc.internal.BackoffPolicy; |
| import io.grpc.internal.TimeProvider; |
| import io.grpc.lb.v1.ClientStats; |
| import io.grpc.lb.v1.InitialLoadBalanceRequest; |
| import io.grpc.lb.v1.InitialLoadBalanceResponse; |
| import io.grpc.lb.v1.LoadBalanceRequest; |
| import io.grpc.lb.v1.LoadBalanceResponse; |
| import io.grpc.lb.v1.LoadBalanceResponse.LoadBalanceResponseTypeCase; |
| import io.grpc.lb.v1.LoadBalancerGrpc; |
| import io.grpc.lb.v1.Server; |
| import io.grpc.lb.v1.ServerList; |
| import io.grpc.stub.StreamObserver; |
| import java.net.InetAddress; |
| import java.net.InetSocketAddress; |
| import java.net.UnknownHostException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicReference; |
| import javax.annotation.Nullable; |
| import javax.annotation.concurrent.NotThreadSafe; |
| |
| /** |
| * The states of a GRPCLB working session of {@link GrpclbLoadBalancer}. Created when |
| * GrpclbLoadBalancer switches to GRPCLB mode. Closed and discarded when GrpclbLoadBalancer |
| * switches away from GRPCLB mode. |
| */ |
| @NotThreadSafe |
| final class GrpclbState { |
| static final long FALLBACK_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(10); |
| private static final Attributes LB_PROVIDED_BACKEND_ATTRS = |
| Attributes.newBuilder().set(GrpclbConstants.ATTR_LB_PROVIDED_BACKEND, true).build(); |
| |
| @VisibleForTesting |
| static final PickResult DROP_PICK_RESULT = |
| PickResult.withDrop(Status.UNAVAILABLE.withDescription("Dropped as requested by balancer")); |
| @VisibleForTesting |
| static final Status NO_AVAILABLE_BACKENDS_STATUS = |
| Status.UNAVAILABLE.withDescription("LoadBalancer responded without any backends"); |
| |
| @VisibleForTesting |
| static final RoundRobinEntry BUFFER_ENTRY = new RoundRobinEntry() { |
| @Override |
| public PickResult picked(Metadata headers) { |
| return PickResult.withNoResult(); |
| } |
| |
| @Override |
| public String toString() { |
| return "BUFFER_ENTRY"; |
| } |
| }; |
| @VisibleForTesting |
| static final String NO_USE_AUTHORITY_SUFFIX = "-notIntendedToBeUsed"; |
| |
| enum Mode { |
| ROUND_ROBIN, |
| PICK_FIRST, |
| } |
| |
| private final String serviceName; |
| private final Helper helper; |
| private final SynchronizationContext syncContext; |
| @Nullable |
| private final SubchannelPool subchannelPool; |
| private final TimeProvider time; |
| private final Stopwatch stopwatch; |
| private final ScheduledExecutorService timerService; |
| |
| private static final Attributes.Key<AtomicReference<ConnectivityStateInfo>> STATE_INFO = |
| Attributes.Key.create("io.grpc.grpclb.GrpclbLoadBalancer.stateInfo"); |
| private final BackoffPolicy.Provider backoffPolicyProvider; |
| private final ChannelLogger logger; |
| |
| // Scheduled only once. Never reset. |
| @Nullable |
| private ScheduledHandle fallbackTimer; |
| private List<EquivalentAddressGroup> fallbackBackendList = Collections.emptyList(); |
| private boolean usingFallbackBackends; |
| // True if the current balancer has returned a serverlist. Will be reset to false when lost |
| // connection to a balancer. |
| private boolean balancerWorking; |
| @Nullable |
| private BackoffPolicy lbRpcRetryPolicy; |
| @Nullable |
| private ScheduledHandle lbRpcRetryTimer; |
| |
| @Nullable |
| private ManagedChannel lbCommChannel; |
| private boolean lbSentEmptyBackends = false; |
| |
| @Nullable |
| private LbStream lbStream; |
| private Map<List<EquivalentAddressGroup>, Subchannel> subchannels = Collections.emptyMap(); |
| private final GrpclbConfig config; |
| |
| // Has the same size as the round-robin list from the balancer. |
| // A drop entry from the round-robin list becomes a DropEntry here. |
| // A backend entry from the robin-robin list becomes a null here. |
| private List<DropEntry> dropList = Collections.emptyList(); |
| // Contains only non-drop, i.e., backends from the round-robin list from the balancer. |
| private List<BackendEntry> backendList = Collections.emptyList(); |
| private RoundRobinPicker currentPicker = |
| new RoundRobinPicker(Collections.<DropEntry>emptyList(), Arrays.asList(BUFFER_ENTRY)); |
| private boolean requestConnectionPending; |
| |
| GrpclbState( |
| GrpclbConfig config, |
| Helper helper, |
| SubchannelPool subchannelPool, |
| TimeProvider time, |
| Stopwatch stopwatch, |
| BackoffPolicy.Provider backoffPolicyProvider) { |
| this.config = checkNotNull(config, "config"); |
| this.helper = checkNotNull(helper, "helper"); |
| this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext"); |
| if (config.getMode() == Mode.ROUND_ROBIN) { |
| this.subchannelPool = checkNotNull(subchannelPool, "subchannelPool"); |
| subchannelPool.registerListener( |
| new PooledSubchannelStateListener() { |
| @Override |
| public void onSubchannelState( |
| Subchannel subchannel, ConnectivityStateInfo newState) { |
| handleSubchannelState(subchannel, newState); |
| } |
| }); |
| } else { |
| this.subchannelPool = null; |
| } |
| this.time = checkNotNull(time, "time provider"); |
| this.stopwatch = checkNotNull(stopwatch, "stopwatch"); |
| this.timerService = checkNotNull(helper.getScheduledExecutorService(), "timerService"); |
| this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider"); |
| if (config.getServiceName() != null) { |
| this.serviceName = config.getServiceName(); |
| } else { |
| this.serviceName = checkNotNull(helper.getAuthority(), "helper returns null authority"); |
| } |
| this.logger = checkNotNull(helper.getChannelLogger(), "logger"); |
| logger.log(ChannelLogLevel.INFO, "[grpclb-<{0}>] Created", serviceName); |
| } |
| |
| void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState) { |
| if (newState.getState() == SHUTDOWN || !subchannels.containsValue(subchannel)) { |
| return; |
| } |
| if (config.getMode() == Mode.ROUND_ROBIN && newState.getState() == IDLE) { |
| subchannel.requestConnection(); |
| } |
| AtomicReference<ConnectivityStateInfo> stateInfoRef = |
| subchannel.getAttributes().get(STATE_INFO); |
| // If all RR servers are unhealthy, it's possible that at least one connection is CONNECTING at |
| // every moment which causes RR to stay in CONNECTING. It's better to keep the TRANSIENT_FAILURE |
| // state in that case so that fail-fast RPCs can fail fast. |
| boolean keepState = |
| config.getMode() == Mode.ROUND_ROBIN |
| && stateInfoRef.get().getState() == TRANSIENT_FAILURE |
| && (newState.getState() == CONNECTING || newState.getState() == IDLE); |
| if (!keepState) { |
| stateInfoRef.set(newState); |
| maybeUseFallbackBackends(); |
| maybeUpdatePicker(); |
| } |
| } |
| |
| /** |
| * Handle new addresses of the balancer and backends from the resolver, and create connection if |
| * not yet connected. |
| */ |
| void handleAddresses( |
| List<EquivalentAddressGroup> newLbAddressGroups, |
| List<EquivalentAddressGroup> newBackendServers) { |
| logger.log( |
| ChannelLogLevel.DEBUG, |
| "[grpclb-<{0}>] Resolved addresses: lb addresses {0}, backends: {1}", |
| serviceName, |
| newLbAddressGroups, |
| newBackendServers); |
| if (newLbAddressGroups.isEmpty()) { |
| // No balancer address: close existing balancer connection and enter fallback mode |
| // immediately. |
| shutdownLbComm(); |
| syncContext.execute(new FallbackModeTask()); |
| } else { |
| startLbComm(newLbAddressGroups); |
| // Avoid creating a new RPC just because the addresses were updated, as it can cause a |
| // stampeding herd. The current RPC may be on a connection to an address not present in |
| // newLbAddressGroups, but we're considering that "okay". If we detected the RPC is to an |
| // outdated backend, we could choose to re-create the RPC. |
| if (lbStream == null) { |
| cancelLbRpcRetryTimer(); |
| startLbRpc(); |
| } |
| // Start the fallback timer if it's never started |
| if (fallbackTimer == null) { |
| fallbackTimer = syncContext.schedule( |
| new FallbackModeTask(), FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS, timerService); |
| } |
| } |
| fallbackBackendList = newBackendServers; |
| if (usingFallbackBackends) { |
| // Populate the new fallback backends to round-robin list. |
| useFallbackBackends(); |
| } |
| maybeUpdatePicker(); |
| } |
| |
| void requestConnection() { |
| requestConnectionPending = true; |
| for (RoundRobinEntry entry : currentPicker.pickList) { |
| if (entry instanceof IdleSubchannelEntry) { |
| ((IdleSubchannelEntry) entry).subchannel.requestConnection(); |
| requestConnectionPending = false; |
| } |
| } |
| } |
| |
| private void maybeUseFallbackBackends() { |
| if (balancerWorking) { |
| return; |
| } |
| if (usingFallbackBackends) { |
| return; |
| } |
| for (Subchannel subchannel : subchannels.values()) { |
| if (subchannel.getAttributes().get(STATE_INFO).get().getState() == READY) { |
| return; |
| } |
| } |
| // Fallback conditions met |
| useFallbackBackends(); |
| } |
| |
| /** |
| * Populate the round-robin lists with the fallback backends. |
| */ |
| private void useFallbackBackends() { |
| usingFallbackBackends = true; |
| logger.log(ChannelLogLevel.INFO, "[grpclb-<{0}>] Using fallback backends", serviceName); |
| |
| List<DropEntry> newDropList = new ArrayList<>(); |
| List<BackendAddressGroup> newBackendAddrList = new ArrayList<>(); |
| for (EquivalentAddressGroup eag : fallbackBackendList) { |
| newDropList.add(null); |
| newBackendAddrList.add(new BackendAddressGroup(eag, null)); |
| } |
| useRoundRobinLists(newDropList, newBackendAddrList, null); |
| } |
| |
| private void shutdownLbComm() { |
| if (lbCommChannel != null) { |
| lbCommChannel.shutdown(); |
| lbCommChannel = null; |
| } |
| shutdownLbRpc(); |
| } |
| |
| private void shutdownLbRpc() { |
| if (lbStream != null) { |
| lbStream.close(Status.CANCELLED.withDescription("balancer shutdown").asException()); |
| // lbStream will be set to null in LbStream.cleanup() |
| } |
| } |
| |
| private void startLbComm(List<EquivalentAddressGroup> overrideAuthorityEags) { |
| checkNotNull(overrideAuthorityEags, "overrideAuthorityEags"); |
| assert !overrideAuthorityEags.isEmpty(); |
| String doNotUseAuthority = overrideAuthorityEags.get(0).getAttributes() |
| .get(EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE) + NO_USE_AUTHORITY_SUFFIX; |
| if (lbCommChannel == null) { |
| lbCommChannel = helper.createOobChannel(overrideAuthorityEags, doNotUseAuthority); |
| logger.log( |
| ChannelLogLevel.DEBUG, |
| "[grpclb-<{0}>] Created grpclb channel: EAG={1}", |
| serviceName, |
| overrideAuthorityEags); |
| } else { |
| helper.updateOobChannelAddresses(lbCommChannel, overrideAuthorityEags); |
| } |
| } |
| |
| private void startLbRpc() { |
| checkState(lbStream == null, "previous lbStream has not been cleared yet"); |
| LoadBalancerGrpc.LoadBalancerStub stub = LoadBalancerGrpc.newStub(lbCommChannel); |
| lbStream = new LbStream(stub); |
| lbStream.start(); |
| stopwatch.reset().start(); |
| |
| LoadBalanceRequest initRequest = LoadBalanceRequest.newBuilder() |
| .setInitialRequest(InitialLoadBalanceRequest.newBuilder() |
| .setName(serviceName).build()) |
| .build(); |
| logger.log( |
| ChannelLogLevel.DEBUG, |
| "[grpclb-<{0}>] Sent initial grpclb request {1}", serviceName, initRequest); |
| try { |
| lbStream.lbRequestWriter.onNext(initRequest); |
| } catch (Exception e) { |
| lbStream.close(e); |
| } |
| } |
| |
| private void cancelFallbackTimer() { |
| if (fallbackTimer != null) { |
| fallbackTimer.cancel(); |
| } |
| } |
| |
| private void cancelLbRpcRetryTimer() { |
| if (lbRpcRetryTimer != null) { |
| lbRpcRetryTimer.cancel(); |
| lbRpcRetryTimer = null; |
| } |
| } |
| |
| void shutdown() { |
| logger.log(ChannelLogLevel.INFO, "[grpclb-<{0}>] Shutdown", serviceName); |
| shutdownLbComm(); |
| switch (config.getMode()) { |
| case ROUND_ROBIN: |
| // We close the subchannels through subchannelPool instead of helper just for convenience of |
| // testing. |
| for (Subchannel subchannel : subchannels.values()) { |
| returnSubchannelToPool(subchannel); |
| } |
| subchannelPool.clear(); |
| break; |
| case PICK_FIRST: |
| if (!subchannels.isEmpty()) { |
| checkState(subchannels.size() == 1, "Excessive Subchannels: %s", subchannels); |
| subchannels.values().iterator().next().shutdown(); |
| } |
| break; |
| default: |
| throw new AssertionError("Missing case for " + config.getMode()); |
| } |
| subchannels = Collections.emptyMap(); |
| cancelFallbackTimer(); |
| cancelLbRpcRetryTimer(); |
| } |
| |
| void propagateError(Status status) { |
| logger.log(ChannelLogLevel.DEBUG, "[grpclb-<{0}>] Error: {1}", serviceName, status); |
| if (backendList.isEmpty()) { |
| maybeUpdatePicker( |
| TRANSIENT_FAILURE, new RoundRobinPicker(dropList, Arrays.asList(new ErrorEntry(status)))); |
| } |
| } |
| |
| private void returnSubchannelToPool(Subchannel subchannel) { |
| subchannelPool.returnSubchannel(subchannel, subchannel.getAttributes().get(STATE_INFO).get()); |
| } |
| |
| @VisibleForTesting |
| @Nullable |
| GrpclbClientLoadRecorder getLoadRecorder() { |
| if (lbStream == null) { |
| return null; |
| } |
| return lbStream.loadRecorder; |
| } |
| |
| /** |
| * Populate the round-robin lists with the given values. |
| */ |
| private void useRoundRobinLists( |
| List<DropEntry> newDropList, List<BackendAddressGroup> newBackendAddrList, |
| @Nullable GrpclbClientLoadRecorder loadRecorder) { |
| logger.log( |
| ChannelLogLevel.INFO, |
| "[grpclb-<{0}>] Using RR list={1}, drop={2}", |
| serviceName, |
| newBackendAddrList, |
| newDropList); |
| HashMap<List<EquivalentAddressGroup>, Subchannel> newSubchannelMap = |
| new HashMap<>(); |
| List<BackendEntry> newBackendList = new ArrayList<>(); |
| |
| switch (config.getMode()) { |
| case ROUND_ROBIN: |
| for (BackendAddressGroup backendAddr : newBackendAddrList) { |
| EquivalentAddressGroup eag = backendAddr.getAddresses(); |
| List<EquivalentAddressGroup> eagAsList = Collections.singletonList(eag); |
| Subchannel subchannel = newSubchannelMap.get(eagAsList); |
| if (subchannel == null) { |
| subchannel = subchannels.get(eagAsList); |
| if (subchannel == null) { |
| subchannel = subchannelPool.takeOrCreateSubchannel(eag, createSubchannelAttrs()); |
| subchannel.requestConnection(); |
| } |
| newSubchannelMap.put(eagAsList, subchannel); |
| } |
| BackendEntry entry; |
| // Only picks with tokens are reported to LoadRecorder |
| if (backendAddr.getToken() == null) { |
| entry = new BackendEntry(subchannel); |
| } else { |
| entry = new BackendEntry(subchannel, loadRecorder, backendAddr.getToken()); |
| } |
| newBackendList.add(entry); |
| } |
| // Close Subchannels whose addresses have been delisted |
| for (Map.Entry<List<EquivalentAddressGroup>, Subchannel> entry : subchannels.entrySet()) { |
| List<EquivalentAddressGroup> eagList = entry.getKey(); |
| if (!newSubchannelMap.containsKey(eagList)) { |
| returnSubchannelToPool(entry.getValue()); |
| } |
| } |
| subchannels = Collections.unmodifiableMap(newSubchannelMap); |
| break; |
| case PICK_FIRST: |
| checkState(subchannels.size() <= 1, "Unexpected Subchannel count: %s", subchannels); |
| final Subchannel subchannel; |
| if (newBackendAddrList.isEmpty()) { |
| if (subchannels.size() == 1) { |
| cancelFallbackTimer(); |
| subchannel = subchannels.values().iterator().next(); |
| subchannel.shutdown(); |
| subchannels = Collections.emptyMap(); |
| } |
| break; |
| } |
| List<EquivalentAddressGroup> eagList = new ArrayList<>(); |
| // Because for PICK_FIRST, we create a single Subchannel for all addresses, we have to |
| // attach the tokens to the EAG attributes and use TokenAttachingLoadRecorder to put them on |
| // headers. |
| // |
| // The PICK_FIRST code path doesn't cache Subchannels. |
| for (BackendAddressGroup bag : newBackendAddrList) { |
| EquivalentAddressGroup origEag = bag.getAddresses(); |
| Attributes eagAttrs = origEag.getAttributes(); |
| if (bag.getToken() != null) { |
| eagAttrs = eagAttrs.toBuilder() |
| .set(GrpclbConstants.TOKEN_ATTRIBUTE_KEY, bag.getToken()).build(); |
| } |
| eagList.add(new EquivalentAddressGroup(origEag.getAddresses(), eagAttrs)); |
| } |
| if (subchannels.isEmpty()) { |
| subchannel = |
| helper.createSubchannel( |
| CreateSubchannelArgs.newBuilder() |
| .setAddresses(eagList) |
| .setAttributes(createSubchannelAttrs()) |
| .build()); |
| subchannel.start(new SubchannelStateListener() { |
| @Override |
| public void onSubchannelState(ConnectivityStateInfo newState) { |
| handleSubchannelState(subchannel, newState); |
| } |
| }); |
| if (requestConnectionPending) { |
| subchannel.requestConnection(); |
| requestConnectionPending = false; |
| } |
| } else { |
| subchannel = subchannels.values().iterator().next(); |
| subchannel.updateAddresses(eagList); |
| } |
| subchannels = Collections.singletonMap(eagList, subchannel); |
| newBackendList.add( |
| new BackendEntry(subchannel, new TokenAttachingTracerFactory(loadRecorder))); |
| break; |
| default: |
| throw new AssertionError("Missing case for " + config.getMode()); |
| } |
| |
| dropList = Collections.unmodifiableList(newDropList); |
| backendList = Collections.unmodifiableList(newBackendList); |
| } |
| |
| @VisibleForTesting |
| class FallbackModeTask implements Runnable { |
| @Override |
| public void run() { |
| maybeUseFallbackBackends(); |
| maybeUpdatePicker(); |
| } |
| } |
| |
| @VisibleForTesting |
| class LbRpcRetryTask implements Runnable { |
| @Override |
| public void run() { |
| startLbRpc(); |
| } |
| } |
| |
| @VisibleForTesting |
| static class LoadReportingTask implements Runnable { |
| private final LbStream stream; |
| |
| LoadReportingTask(LbStream stream) { |
| this.stream = stream; |
| } |
| |
| @Override |
| public void run() { |
| stream.loadReportTimer = null; |
| stream.sendLoadReport(); |
| } |
| } |
| |
| private class LbStream implements StreamObserver<LoadBalanceResponse> { |
| final GrpclbClientLoadRecorder loadRecorder; |
| final LoadBalancerGrpc.LoadBalancerStub stub; |
| StreamObserver<LoadBalanceRequest> lbRequestWriter; |
| |
| // These fields are only accessed from helper.runSerialized() |
| boolean initialResponseReceived; |
| boolean closed; |
| long loadReportIntervalMillis = -1; |
| ScheduledHandle loadReportTimer; |
| |
| LbStream(LoadBalancerGrpc.LoadBalancerStub stub) { |
| this.stub = checkNotNull(stub, "stub"); |
| // Stats data only valid for current LbStream. We do not carry over data from previous |
| // stream. |
| loadRecorder = new GrpclbClientLoadRecorder(time); |
| } |
| |
| void start() { |
| lbRequestWriter = stub.withWaitForReady().balanceLoad(this); |
| } |
| |
| @Override public void onNext(final LoadBalanceResponse response) { |
| syncContext.execute(new Runnable() { |
| @Override |
| public void run() { |
| handleResponse(response); |
| } |
| }); |
| } |
| |
| @Override public void onError(final Throwable error) { |
| syncContext.execute(new Runnable() { |
| @Override |
| public void run() { |
| handleStreamClosed(Status.fromThrowable(error) |
| .augmentDescription("Stream to GRPCLB LoadBalancer had an error")); |
| } |
| }); |
| } |
| |
| @Override public void onCompleted() { |
| syncContext.execute(new Runnable() { |
| @Override |
| public void run() { |
| handleStreamClosed( |
| Status.UNAVAILABLE.withDescription("Stream to GRPCLB LoadBalancer was closed")); |
| } |
| }); |
| } |
| |
| // Following methods must be run in helper.runSerialized() |
| |
| private void sendLoadReport() { |
| if (closed) { |
| return; |
| } |
| ClientStats stats = loadRecorder.generateLoadReport(); |
| // TODO(zhangkun83): flow control? |
| try { |
| lbRequestWriter.onNext(LoadBalanceRequest.newBuilder().setClientStats(stats).build()); |
| scheduleNextLoadReport(); |
| } catch (Exception e) { |
| close(e); |
| } |
| } |
| |
| private void scheduleNextLoadReport() { |
| if (loadReportIntervalMillis > 0) { |
| loadReportTimer = syncContext.schedule( |
| new LoadReportingTask(this), loadReportIntervalMillis, TimeUnit.MILLISECONDS, |
| timerService); |
| } |
| } |
| |
| private void handleResponse(LoadBalanceResponse response) { |
| if (closed) { |
| return; |
| } |
| logger.log( |
| ChannelLogLevel.DEBUG, "[grpclb-<{0}>] Got an LB response: {1}", serviceName, response); |
| |
| LoadBalanceResponseTypeCase typeCase = response.getLoadBalanceResponseTypeCase(); |
| if (!initialResponseReceived) { |
| if (typeCase != LoadBalanceResponseTypeCase.INITIAL_RESPONSE) { |
| logger.log( |
| ChannelLogLevel.WARNING, |
| "[grpclb-<{0}>] Received a response without initial response", |
| serviceName); |
| return; |
| } |
| initialResponseReceived = true; |
| InitialLoadBalanceResponse initialResponse = response.getInitialResponse(); |
| loadReportIntervalMillis = |
| Durations.toMillis(initialResponse.getClientStatsReportInterval()); |
| scheduleNextLoadReport(); |
| return; |
| } |
| |
| if (typeCase == LoadBalanceResponseTypeCase.FALLBACK_RESPONSE) { |
| cancelFallbackTimer(); |
| useFallbackBackends(); |
| maybeUpdatePicker(); |
| return; |
| } else if (typeCase != LoadBalanceResponseTypeCase.SERVER_LIST) { |
| logger.log( |
| ChannelLogLevel.WARNING, |
| "[grpclb-<{0}>] Ignoring unexpected response type: {1}", |
| serviceName, |
| typeCase); |
| return; |
| } |
| |
| balancerWorking = true; |
| // TODO(zhangkun83): handle delegate from initialResponse |
| ServerList serverList = response.getServerList(); |
| List<DropEntry> newDropList = new ArrayList<>(); |
| List<BackendAddressGroup> newBackendAddrList = new ArrayList<>(); |
| // Construct the new collections. Create new Subchannels when necessary. |
| for (Server server : serverList.getServersList()) { |
| String token = server.getLoadBalanceToken(); |
| if (server.getDrop()) { |
| newDropList.add(new DropEntry(loadRecorder, token)); |
| } else { |
| newDropList.add(null); |
| InetSocketAddress address; |
| try { |
| address = new InetSocketAddress( |
| InetAddress.getByAddress(server.getIpAddress().toByteArray()), server.getPort()); |
| } catch (UnknownHostException e) { |
| propagateError( |
| Status.UNAVAILABLE |
| .withDescription("Host for server not found: " + server) |
| .withCause(e)); |
| continue; |
| } |
| // ALTS code can use the presence of ATTR_LB_PROVIDED_BACKEND to select ALTS instead of |
| // TLS, with Netty. |
| EquivalentAddressGroup eag = |
| new EquivalentAddressGroup(address, LB_PROVIDED_BACKEND_ATTRS); |
| newBackendAddrList.add(new BackendAddressGroup(eag, token)); |
| } |
| } |
| // Stop using fallback backends as soon as a new server list is received from the balancer. |
| usingFallbackBackends = false; |
| lbSentEmptyBackends = serverList.getServersList().isEmpty(); |
| cancelFallbackTimer(); |
| useRoundRobinLists(newDropList, newBackendAddrList, loadRecorder); |
| maybeUpdatePicker(); |
| } |
| |
| private void handleStreamClosed(Status error) { |
| checkArgument(!error.isOk(), "unexpected OK status"); |
| if (closed) { |
| return; |
| } |
| closed = true; |
| cleanUp(); |
| propagateError(error); |
| balancerWorking = false; |
| maybeUseFallbackBackends(); |
| maybeUpdatePicker(); |
| |
| long delayNanos = 0; |
| if (initialResponseReceived || lbRpcRetryPolicy == null) { |
| // Reset the backoff sequence if balancer has sent the initial response, or backoff sequence |
| // has never been initialized. |
| lbRpcRetryPolicy = backoffPolicyProvider.get(); |
| } |
| // Backoff only when balancer wasn't working previously. |
| if (!initialResponseReceived) { |
| // The back-off policy determines the interval between consecutive RPC upstarts, thus the |
| // actual delay may be smaller than the value from the back-off policy, or even negative, |
| // depending how much time was spent in the previous RPC. |
| delayNanos = |
| lbRpcRetryPolicy.nextBackoffNanos() - stopwatch.elapsed(TimeUnit.NANOSECONDS); |
| } |
| if (delayNanos <= 0) { |
| startLbRpc(); |
| } else { |
| lbRpcRetryTimer = |
| syncContext.schedule(new LbRpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, |
| timerService); |
| } |
| |
| helper.refreshNameResolution(); |
| } |
| |
| void close(Exception error) { |
| if (closed) { |
| return; |
| } |
| closed = true; |
| cleanUp(); |
| lbRequestWriter.onError(error); |
| } |
| |
| private void cleanUp() { |
| if (loadReportTimer != null) { |
| loadReportTimer.cancel(); |
| loadReportTimer = null; |
| } |
| if (lbStream == this) { |
| lbStream = null; |
| } |
| } |
| } |
| |
| /** |
| * Make and use a picker out of the current lists and the states of subchannels if they have |
| * changed since the last picker created. |
| */ |
| private void maybeUpdatePicker() { |
| List<RoundRobinEntry> pickList; |
| ConnectivityState state; |
| switch (config.getMode()) { |
| case ROUND_ROBIN: |
| pickList = new ArrayList<>(backendList.size()); |
| Status error = null; |
| boolean hasIdle = false; |
| for (BackendEntry entry : backendList) { |
| Subchannel subchannel = entry.subchannel; |
| Attributes attrs = subchannel.getAttributes(); |
| ConnectivityStateInfo stateInfo = attrs.get(STATE_INFO).get(); |
| if (stateInfo.getState() == READY) { |
| pickList.add(entry); |
| } else if (stateInfo.getState() == TRANSIENT_FAILURE) { |
| error = stateInfo.getStatus(); |
| } else if (stateInfo.getState() == IDLE) { |
| hasIdle = true; |
| } |
| } |
| if (pickList.isEmpty()) { |
| if (error != null && !hasIdle) { |
| pickList.add(new ErrorEntry(error)); |
| state = TRANSIENT_FAILURE; |
| } else { |
| pickList.add(BUFFER_ENTRY); |
| state = CONNECTING; |
| } |
| } else { |
| state = READY; |
| } |
| break; |
| case PICK_FIRST: |
| if (backendList.isEmpty()) { |
| if (lbSentEmptyBackends) { |
| pickList = |
| Collections.<RoundRobinEntry>singletonList( |
| new ErrorEntry(NO_AVAILABLE_BACKENDS_STATUS)); |
| state = TRANSIENT_FAILURE; |
| } else { |
| pickList = Collections.singletonList(BUFFER_ENTRY); |
| // Have not received server addresses |
| state = CONNECTING; |
| } |
| } else { |
| checkState(backendList.size() == 1, "Excessive backend entries: %s", backendList); |
| BackendEntry onlyEntry = backendList.get(0); |
| ConnectivityStateInfo stateInfo = |
| onlyEntry.subchannel.getAttributes().get(STATE_INFO).get(); |
| state = stateInfo.getState(); |
| switch (state) { |
| case READY: |
| pickList = Collections.<RoundRobinEntry>singletonList(onlyEntry); |
| break; |
| case TRANSIENT_FAILURE: |
| pickList = |
| Collections.<RoundRobinEntry>singletonList(new ErrorEntry(stateInfo.getStatus())); |
| break; |
| case CONNECTING: |
| pickList = Collections.singletonList(BUFFER_ENTRY); |
| break; |
| default: |
| pickList = Collections.<RoundRobinEntry>singletonList( |
| new IdleSubchannelEntry(onlyEntry.subchannel, syncContext)); |
| } |
| } |
| break; |
| default: |
| throw new AssertionError("Missing case for " + config.getMode()); |
| } |
| maybeUpdatePicker(state, new RoundRobinPicker(dropList, pickList)); |
| } |
| |
| /** |
| * Update the given picker to the helper if it's different from the current one. |
| */ |
| private void maybeUpdatePicker(ConnectivityState state, RoundRobinPicker picker) { |
| // Discard the new picker if we are sure it won't make any difference, in order to save |
| // re-processing pending streams, and avoid unnecessary resetting of the pointer in |
| // RoundRobinPicker. |
| if (picker.dropList.equals(currentPicker.dropList) |
| && picker.pickList.equals(currentPicker.pickList)) { |
| return; |
| } |
| currentPicker = picker; |
| logger.log( |
| ChannelLogLevel.INFO, |
| "[grpclb-<{0}>] Update balancing state to {1}: picks={2}, drops={3}", |
| serviceName, |
| state, |
| picker.pickList, |
| picker.dropList); |
| helper.updateBalancingState(state, picker); |
| } |
| |
| private static Attributes createSubchannelAttrs() { |
| return Attributes.newBuilder() |
| .set(STATE_INFO, |
| new AtomicReference<>( |
| ConnectivityStateInfo.forNonError(IDLE))) |
| .build(); |
| } |
| |
| @VisibleForTesting |
| static final class DropEntry { |
| private final GrpclbClientLoadRecorder loadRecorder; |
| private final String token; |
| |
| DropEntry(GrpclbClientLoadRecorder loadRecorder, String token) { |
| this.loadRecorder = checkNotNull(loadRecorder, "loadRecorder"); |
| this.token = checkNotNull(token, "token"); |
| } |
| |
| PickResult picked() { |
| loadRecorder.recordDroppedRequest(token); |
| return DROP_PICK_RESULT; |
| } |
| |
| @Override |
| public String toString() { |
| // This is printed in logs. Only include useful information. |
| return "drop(" + token + ")"; |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hashCode(loadRecorder, token); |
| } |
| |
| @Override |
| public boolean equals(Object other) { |
| if (!(other instanceof DropEntry)) { |
| return false; |
| } |
| DropEntry that = (DropEntry) other; |
| return Objects.equal(loadRecorder, that.loadRecorder) && Objects.equal(token, that.token); |
| } |
| } |
| |
| @VisibleForTesting |
| interface RoundRobinEntry { |
| PickResult picked(Metadata headers); |
| } |
| |
| @VisibleForTesting |
| static final class BackendEntry implements RoundRobinEntry { |
| final Subchannel subchannel; |
| @VisibleForTesting |
| final PickResult result; |
| @Nullable |
| private final String token; |
| |
| /** |
| * For ROUND_ROBIN: creates a BackendEntry whose usage will be reported to load recorder. |
| */ |
| BackendEntry(Subchannel subchannel, GrpclbClientLoadRecorder loadRecorder, String token) { |
| this.subchannel = checkNotNull(subchannel, "subchannel"); |
| this.result = |
| PickResult.withSubchannel(subchannel, checkNotNull(loadRecorder, "loadRecorder")); |
| this.token = checkNotNull(token, "token"); |
| } |
| |
| /** |
| * For ROUND_ROBIN/PICK_FIRST: creates a BackendEntry whose usage will not be reported. |
| */ |
| BackendEntry(Subchannel subchannel) { |
| this.subchannel = checkNotNull(subchannel, "subchannel"); |
| this.result = PickResult.withSubchannel(subchannel); |
| this.token = null; |
| } |
| |
| /** |
| * For PICK_FIRST: creates a BackendEntry that includes all addresses. |
| */ |
| BackendEntry(Subchannel subchannel, TokenAttachingTracerFactory tracerFactory) { |
| this.subchannel = checkNotNull(subchannel, "subchannel"); |
| this.result = |
| PickResult.withSubchannel(subchannel, checkNotNull(tracerFactory, "tracerFactory")); |
| this.token = null; |
| } |
| |
| @Override |
| public PickResult picked(Metadata headers) { |
| headers.discardAll(GrpclbConstants.TOKEN_METADATA_KEY); |
| if (token != null) { |
| headers.put(GrpclbConstants.TOKEN_METADATA_KEY, token); |
| } |
| return result; |
| } |
| |
| @Override |
| public String toString() { |
| // This is printed in logs. Only give out useful information. |
| return "[" + subchannel.getAllAddresses().toString() + "(" + token + ")]"; |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hashCode(result, token); |
| } |
| |
| @Override |
| public boolean equals(Object other) { |
| if (!(other instanceof BackendEntry)) { |
| return false; |
| } |
| BackendEntry that = (BackendEntry) other; |
| return Objects.equal(result, that.result) && Objects.equal(token, that.token); |
| } |
| } |
| |
| @VisibleForTesting |
| static final class IdleSubchannelEntry implements RoundRobinEntry { |
| private final SynchronizationContext syncContext; |
| private final Subchannel subchannel; |
| private final AtomicBoolean connectionRequested = new AtomicBoolean(false); |
| |
| IdleSubchannelEntry(Subchannel subchannel, SynchronizationContext syncContext) { |
| this.subchannel = checkNotNull(subchannel, "subchannel"); |
| this.syncContext = checkNotNull(syncContext, "syncContext"); |
| } |
| |
| @Override |
| public PickResult picked(Metadata headers) { |
| if (connectionRequested.compareAndSet(false, true)) { |
| syncContext.execute(new Runnable() { |
| @Override |
| public void run() { |
| subchannel.requestConnection(); |
| } |
| }); |
| } |
| return PickResult.withNoResult(); |
| } |
| |
| @Override |
| public String toString() { |
| // This is printed in logs. Only give out useful information. |
| return "(idle)[" + subchannel.getAllAddresses().toString() + "]"; |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hashCode(subchannel, syncContext); |
| } |
| |
| @Override |
| public boolean equals(Object other) { |
| if (!(other instanceof IdleSubchannelEntry)) { |
| return false; |
| } |
| IdleSubchannelEntry that = (IdleSubchannelEntry) other; |
| return Objects.equal(subchannel, that.subchannel) |
| && Objects.equal(syncContext, that.syncContext); |
| } |
| } |
| |
| @VisibleForTesting |
| static final class ErrorEntry implements RoundRobinEntry { |
| final PickResult result; |
| |
| ErrorEntry(Status status) { |
| result = PickResult.withError(status); |
| } |
| |
| @Override |
| public PickResult picked(Metadata headers) { |
| return result; |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hashCode(result); |
| } |
| |
| @Override |
| public boolean equals(Object other) { |
| if (!(other instanceof ErrorEntry)) { |
| return false; |
| } |
| return Objects.equal(result, ((ErrorEntry) other).result); |
| } |
| |
| @Override |
| public String toString() { |
| // This is printed in logs. Only include useful information. |
| return result.getStatus().toString(); |
| } |
| } |
| |
| @VisibleForTesting |
| static final class RoundRobinPicker extends SubchannelPicker { |
| @VisibleForTesting |
| final List<DropEntry> dropList; |
| private int dropIndex; |
| |
| @VisibleForTesting |
| final List<? extends RoundRobinEntry> pickList; |
| private int pickIndex; |
| |
| // dropList can be empty, which means no drop. |
| // pickList must not be empty. |
| RoundRobinPicker(List<DropEntry> dropList, List<? extends RoundRobinEntry> pickList) { |
| this.dropList = checkNotNull(dropList, "dropList"); |
| this.pickList = checkNotNull(pickList, "pickList"); |
| checkArgument(!pickList.isEmpty(), "pickList is empty"); |
| } |
| |
| @Override |
| public PickResult pickSubchannel(PickSubchannelArgs args) { |
| synchronized (pickList) { |
| // Two-level round-robin. |
| // First round-robin on dropList. If a drop entry is selected, request will be dropped. If |
| // a non-drop entry is selected, then round-robin on pickList. This makes sure requests are |
| // dropped at the same proportion as the drop entries appear on the round-robin list from |
| // the balancer, while only backends from pickList are selected for the non-drop cases. |
| if (!dropList.isEmpty()) { |
| DropEntry drop = dropList.get(dropIndex); |
| dropIndex++; |
| if (dropIndex == dropList.size()) { |
| dropIndex = 0; |
| } |
| if (drop != null) { |
| return drop.picked(); |
| } |
| } |
| |
| RoundRobinEntry pick = pickList.get(pickIndex); |
| pickIndex++; |
| if (pickIndex == pickList.size()) { |
| pickIndex = 0; |
| } |
| return pick.picked(args.getHeaders()); |
| } |
| } |
| |
| @Override |
| public String toString() { |
| return MoreObjects.toStringHelper(RoundRobinPicker.class) |
| .add("dropList", dropList) |
| .add("pickList", pickList) |
| .toString(); |
| } |
| } |
| } |