| /* |
| * Copyright 2016 The gRPC Authors |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package io.grpc.internal; |
| |
| import static com.google.common.base.Preconditions.checkArgument; |
| import static com.google.common.base.Preconditions.checkNotNull; |
| import static com.google.common.base.Preconditions.checkState; |
| import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED; |
| import static io.grpc.ConnectivityState.IDLE; |
| import static io.grpc.ConnectivityState.SHUTDOWN; |
| import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; |
| import static io.grpc.EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.MoreObjects; |
| import com.google.common.base.Stopwatch; |
| import com.google.common.base.Supplier; |
| import com.google.common.util.concurrent.ListenableFuture; |
| import com.google.common.util.concurrent.SettableFuture; |
| import io.grpc.Attributes; |
| import io.grpc.CallCredentials; |
| import io.grpc.CallOptions; |
| import io.grpc.Channel; |
| import io.grpc.ChannelCredentials; |
| import io.grpc.ChannelLogger; |
| import io.grpc.ChannelLogger.ChannelLogLevel; |
| import io.grpc.ClientCall; |
| import io.grpc.ClientInterceptor; |
| import io.grpc.ClientInterceptors; |
| import io.grpc.ClientStreamTracer; |
| import io.grpc.CompressorRegistry; |
| import io.grpc.ConnectivityState; |
| import io.grpc.ConnectivityStateInfo; |
| import io.grpc.Context; |
| import io.grpc.DecompressorRegistry; |
| import io.grpc.EquivalentAddressGroup; |
| import io.grpc.ForwardingChannelBuilder; |
| import io.grpc.ForwardingClientCall; |
| import io.grpc.Grpc; |
| import io.grpc.InternalChannelz; |
| import io.grpc.InternalChannelz.ChannelStats; |
| import io.grpc.InternalChannelz.ChannelTrace; |
| import io.grpc.InternalConfigSelector; |
| import io.grpc.InternalInstrumented; |
| import io.grpc.InternalLogId; |
| import io.grpc.InternalWithLogId; |
| import io.grpc.LoadBalancer; |
| import io.grpc.LoadBalancer.CreateSubchannelArgs; |
| import io.grpc.LoadBalancer.PickResult; |
| import io.grpc.LoadBalancer.PickSubchannelArgs; |
| import io.grpc.LoadBalancer.ResolvedAddresses; |
| import io.grpc.LoadBalancer.SubchannelPicker; |
| import io.grpc.LoadBalancer.SubchannelStateListener; |
| import io.grpc.ManagedChannel; |
| import io.grpc.ManagedChannelBuilder; |
| import io.grpc.Metadata; |
| import io.grpc.MethodDescriptor; |
| import io.grpc.NameResolver; |
| import io.grpc.NameResolver.ConfigOrError; |
| import io.grpc.NameResolver.ResolutionResult; |
| import io.grpc.NameResolverRegistry; |
| import io.grpc.ProxyDetector; |
| import io.grpc.Status; |
| import io.grpc.SynchronizationContext; |
| import io.grpc.SynchronizationContext.ScheduledHandle; |
| import io.grpc.internal.AutoConfiguredLoadBalancerFactory.AutoConfiguredLoadBalancer; |
| import io.grpc.internal.ClientCallImpl.ClientStreamProvider; |
| import io.grpc.internal.ClientTransportFactory.SwapChannelCredentialsResult; |
| import io.grpc.internal.ManagedChannelImplBuilder.ClientTransportFactoryBuilder; |
| import io.grpc.internal.ManagedChannelImplBuilder.FixedPortProvider; |
| import io.grpc.internal.ManagedChannelServiceConfig.MethodInfo; |
| import io.grpc.internal.ManagedChannelServiceConfig.ServiceConfigConvertedSelector; |
| import io.grpc.internal.RetriableStream.ChannelBufferMeter; |
| import io.grpc.internal.RetriableStream.Throttle; |
| import io.grpc.internal.RetryingNameResolver.ResolutionResultListener; |
| import java.net.URI; |
| import java.net.URISyntaxException; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.LinkedHashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.ScheduledFuture; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.logging.Level; |
| import java.util.logging.Logger; |
| import java.util.regex.Pattern; |
| import javax.annotation.Nullable; |
| import javax.annotation.concurrent.GuardedBy; |
| import javax.annotation.concurrent.ThreadSafe; |
| |
| /** A communication channel for making outgoing RPCs. */ |
| @ThreadSafe |
| final class ManagedChannelImpl extends ManagedChannel implements |
| InternalInstrumented<ChannelStats> { |
| @VisibleForTesting |
| static final Logger logger = Logger.getLogger(ManagedChannelImpl.class.getName()); |
| |
| // Matching this pattern means the target string is a URI target or at least intended to be one. |
| // A URI target must be an absolute hierarchical URI, hence the ":/" required right after the scheme. |
| // From RFC 2396: scheme = alpha *( alpha | digit | "+" | "-" | "." ) |
| @VisibleForTesting |
| static final Pattern URI_PATTERN = Pattern.compile("[a-zA-Z][a-zA-Z0-9+.-]*:/.*"); |
| |
| static final long IDLE_TIMEOUT_MILLIS_DISABLE = -1; /* Sentinel: rescheduleIdleTimer() does nothing when idleTimeoutMillis equals this value. */ |
| |
| static final long SUBCHANNEL_SHUTDOWN_DELAY_SECONDS = 5; /* Per the note in ChannelStreamProvider.getTransport(): an InternalSubchannel is actually shut down this long after Subchannel.shutdown(). */ |
| |
| @VisibleForTesting |
| static final Status SHUTDOWN_NOW_STATUS = |
| Status.UNAVAILABLE.withDescription("Channel shutdownNow invoked"); /* Status handed to subchannels by maybeShutdownNowSubchannels(). */ |
| |
| @VisibleForTesting |
| static final Status SHUTDOWN_STATUS = |
| Status.UNAVAILABLE.withDescription("Channel shutdown invoked"); |
| |
| @VisibleForTesting |
| static final Status SUBCHANNEL_SHUTDOWN_STATUS = |
| Status.UNAVAILABLE.withDescription("Subchannel shutdown invoked"); |
| |
| private static final ManagedChannelServiceConfig EMPTY_SERVICE_CONFIG = |
| ManagedChannelServiceConfig.empty(); /* Initial value of lastServiceConfig. */ |
| private static final InternalConfigSelector INITIAL_PENDING_SELECTOR = /* Selector whose selectConfig() always fails with IllegalStateException. */ |
| new InternalConfigSelector() { |
| @Override |
| public Result selectConfig(PickSubchannelArgs args) { |
| throw new IllegalStateException("Resolution is pending"); |
| } |
| }; |
| |
| private final InternalLogId logId; /* Allocated in the constructor via InternalLogId.allocate("Channel", target). */ |
| private final String target; /* Target string from the builder; resolved through getNameResolver(). */ |
| @Nullable |
| private final String authorityOverride; /* When non-null, getNameResolver() makes the resolver report it as the service authority. */ |
| private final NameResolverRegistry nameResolverRegistry; |
| private final NameResolver.Factory nameResolverFactory; |
| private final NameResolver.Args nameResolverArgs; |
| private final AutoConfiguredLoadBalancerFactory loadBalancerFactory; |
| private final ClientTransportFactory originalTransportFactory; /* The factory exactly as passed to the constructor. */ |
| @Nullable |
| private final ChannelCredentials originalChannelCreds; |
| private final ClientTransportFactory transportFactory; /* The constructor's factory wrapped to apply the builder's call credentials. */ |
| private final ClientTransportFactory oobTransportFactory; /* Same wrapping, but with null call credentials. */ |
| private final RestrictedScheduledExecutor scheduledExecutor; |
| private final Executor executor; /* Obtained from executorPool in the constructor. */ |
| private final ObjectPool<? extends Executor> executorPool; |
| private final ObjectPool<? extends Executor> balancerRpcExecutorPool; |
| private final ExecutorHolder balancerRpcExecutorHolder; |
| private final ExecutorHolder offloadExecutorHolder; |
| private final TimeProvider timeProvider; |
| private final int maxTraceEvents; |
| |
| @VisibleForTesting |
| final SynchronizationContext syncContext = new SynchronizationContext( |
| new Thread.UncaughtExceptionHandler() { |
| @Override |
| public void uncaughtException(Thread t, Throwable e) { |
| logger.log( |
| Level.SEVERE, |
| "[" + getLogId() + "] Uncaught exception in the SynchronizationContext. Panic!", |
| e); |
| panic(e); /* Any exception escaping a syncContext task puts the channel into panic mode. */ |
| } |
| }); |
| |
| private boolean fullStreamDecompression; |
| |
| private final DecompressorRegistry decompressorRegistry; |
| private final CompressorRegistry compressorRegistry; |
| |
| private final Supplier<Stopwatch> stopwatchSupplier; |
| /** The timeout, in milliseconds, before entering idle mode; IDLE_TIMEOUT_MILLIS_DISABLE disables it. */ |
| private final long idleTimeoutMillis; |
| |
| private final ConnectivityStateManager channelStateManager = new ConnectivityStateManager(); |
| private final BackoffPolicy.Provider backoffPolicyProvider; |
| |
| /** |
| * We delegate to this channel, so that we can have interceptors as necessary. If there aren't |
| * any interceptors and the {@link io.grpc.BinaryLog} is {@code null} then this will just be a |
| * {@link RealChannel}. |
| */ |
| private final Channel interceptorChannel; |
| @Nullable private final String userAgent; |
| |
| // Only null after channel is terminated, or after panic(), which also calls shutdownNameResolverAndLoadBalancer(false). Must be assigned from the syncContext. |
| private NameResolver nameResolver; |
| |
| // Must be accessed from the syncContext. Set by exitIdleMode() after nameResolver.start() and cleared by shutdownNameResolverAndLoadBalancer(). |
| private boolean nameResolverStarted; |
| |
| // null when channel is in idle mode. Must be assigned from syncContext. |
| @Nullable |
| private LbHelperImpl lbHelper; |
| |
| // Assigned from updateSubchannelPicker() and reset to null by shutdownNameResolverAndLoadBalancer(), both of which run on syncContext. |
| // null if channel is in idle mode. Volatile because ChannelStreamProvider.getTransport() reads it without entering syncContext. |
| @Nullable |
| private volatile SubchannelPicker subchannelPicker; |
| |
| // Must be accessed from the syncContext. Once set by panic() it is never cleared in the code visible here. |
| private boolean panicMode; |
| |
| // Must be mutated from syncContext |
| // If any monitoring hook to be added later needs to get a snapshot of this Set, we could |
| // switch to a ConcurrentHashMap. |
| private final Set<InternalSubchannel> subchannels = new HashSet<>(16, .75f); |
| |
| // Must be accessed from syncContext |
| @Nullable |
| private Collection<RealChannel.PendingCall<?, ?>> pendingCalls; |
| private final Object pendingCallsInUseObject = new Object(); /* Token that enterIdleMode() checks against inUseStateAggregator. */ |
| |
| // Must be mutated from syncContext |
| private final Set<OobChannel> oobChannels = new HashSet<>(1, .75f); |
| |
| // reprocess() must be run from syncContext |
| private final DelayedClientTransport delayedTransport; |
| private final UncommittedRetriableStreamsRegistry uncommittedRetriableStreamsRegistry |
| = new UncommittedRetriableStreamsRegistry(); |
| |
| // Shutdown states. |
| // |
| // Channel's shutdown process: |
| // 1. shutdown(): stop accepting new calls from applications |
| // 1a shutdown <- true |
| // 1b subchannelPicker <- null |
| // 1c delayedTransport.shutdown() |
| // 2. delayedTransport terminated: stop stream-creation functionality |
| // 2a terminating <- true |
| // 2b loadBalancer.shutdown() |
| // * LoadBalancer will shut down subchannels and OOB channels |
| // 2c loadBalancer <- null |
| // 2d nameResolver.shutdown() |
| // 2e nameResolver <- null |
| // 3. All subchannels and OOB channels terminated: Channel considered terminated |
| |
| private final AtomicBoolean shutdown = new AtomicBoolean(false); /* Flipped exactly once by shutdown() via compareAndSet. */ |
| // Must only be mutated and read from syncContext. Set by the task that shutdownNow() schedules. |
| private boolean shutdownNowed; |
| // Must only be mutated from syncContext |
| private boolean terminating; |
| // Must be mutated from syncContext |
| private volatile boolean terminated; |
| private final CountDownLatch terminatedLatch = new CountDownLatch(1); |
| |
| private final CallTracer.Factory callTracerFactory; |
| private final CallTracer channelCallTracer; |
| private final ChannelTracer channelTracer; |
| private final ChannelLogger channelLogger; |
| private final InternalChannelz channelz; |
| private final RealChannel realChannel; |
| // Must be mutated and read from syncContext |
| // a flag for doing channel tracing when flipped |
| private ResolutionState lastResolutionState = ResolutionState.NO_RESOLUTION; |
| // Must be mutated and read from constructor or syncContext |
| // used for channel tracing when value changed |
| private ManagedChannelServiceConfig lastServiceConfig = EMPTY_SERVICE_CONFIG; |
| |
| @Nullable |
| private final ManagedChannelServiceConfig defaultServiceConfig; |
| // Must be mutated and read from constructor or syncContext |
| private boolean serviceConfigUpdated = false; |
| private final boolean lookUpServiceConfig; |
| |
| // One instance per channel. |
| private final ChannelBufferMeter channelBufferUsed = new ChannelBufferMeter(); |
| |
| private final long perRpcBufferLimit; |
| private final long channelBufferLimit; |
| |
| // Temporary flag: when false, ChannelStreamProvider.newStream() skips the retry code path. |
| private final boolean retryEnabled; |
| |
| // Called from syncContext. The constructor passes it to delayedTransport.start(). |
| private final ManagedClientTransport.Listener delayedTransportListener = |
| new DelayedTransportListener(); |
| |
| /** |
| * Forcibly shuts down every subchannel and the internal subchannel of every OOB channel, but |
| * only once {@code shutdownNowed} has been set. |
| * |
| * <p>Must be called from syncContext. |
| */ |
| private void maybeShutdownNowSubchannels() { |
| if (!shutdownNowed) { |
| // shutdownNow() has not been requested, so there is nothing to force. |
| return; |
| } |
| for (InternalSubchannel internalSubchannel : subchannels) { |
| internalSubchannel.shutdownNow(SHUTDOWN_NOW_STATUS); |
| } |
| for (OobChannel oob : oobChannels) { |
| oob.getInternalSubchannel().shutdownNow(SHUTDOWN_NOW_STATUS); |
| } |
| } |
| |
| // Must be accessed from syncContext. exitIdleMode() and enterIdleMode() query it to decide how to handle the idle timer. |
| @VisibleForTesting |
| final InUseStateAggregator<Object> inUseStateAggregator = new IdleModeStateAggregator(); |
| |
| /** |
| * Returns a future that is completed with a snapshot of this channel's stats. |
| * |
| * <p>The snapshot is assembled by a task handed to syncContext, because the subchannel and OOB |
| * channel sets can only be accessed from there. |
| */ |
| @Override |
| public ListenableFuture<ChannelStats> getStats() { |
| final SettableFuture<ChannelStats> statsFuture = SettableFuture.create(); |
| final class StatsFetcher implements Runnable { |
| @Override |
| public void run() { |
| ChannelStats.Builder statsBuilder = new InternalChannelz.ChannelStats.Builder(); |
| channelCallTracer.updateBuilder(statsBuilder); |
| channelTracer.updateBuilder(statsBuilder); |
| statsBuilder |
| .setTarget(target) |
| .setState(channelStateManager.getState()); |
| // Children are reported as one list: subchannels first, then OOB channels. |
| List<InternalWithLogId> children = new ArrayList<InternalWithLogId>(subchannels); |
| children.addAll(oobChannels); |
| statsBuilder.setSubchannels(children); |
| statsFuture.set(statsBuilder.build()); |
| } |
| } |
| |
| syncContext.execute(new StatsFetcher()); |
| return statsFuture; |
| } |
| |
| /** Returns the log id that was allocated for this channel in the constructor. */ |
| @Override |
| public InternalLogId getLogId() { |
| return this.logId; |
| } |
| |
| /** Task run by the idle timer, from syncContext, to move the channel into idle mode. */ |
| private class IdleModeTimer implements Runnable { |
| |
| @Override |
| public void run() { |
| // Workaround for a timer that was scheduled while already in idle mode, which can happen |
| // from handleNotInUse() after an explicit enterIdleMode() by the user. A null lbHelper |
| // means the channel is already idle, so only enter idle mode when it is set. Guarding here |
| // because the other locations are too subtle to change quickly. See #8714 |
| if (lbHelper != null) { |
| enterIdleMode(); |
| } |
| } |
| } |
| |
| /** |
| * Shuts down the current name resolver and load balancer and clears the subchannel picker. |
| * |
| * <p>Must be called from syncContext. |
| * |
| * @param channelIsActive when true, the resolver must have been started and lbHelper must be |
| * set, and a fresh (unstarted) resolver replaces the old one; when false the resolver field |
| * is nulled out instead |
| */ |
| private void shutdownNameResolverAndLoadBalancer(boolean channelIsActive) { |
| syncContext.throwIfNotInThisSynchronizationContext(); |
| if (channelIsActive) { |
| checkState(nameResolverStarted, "nameResolver is not started"); |
| checkState(lbHelper != null, "lbHelper is null"); |
| } |
| NameResolver oldResolver = nameResolver; |
| if (oldResolver != null) { |
| oldResolver.shutdown(); |
| nameResolverStarted = false; |
| // An active channel keeps a replacement resolver ready for the next exitIdleMode(). |
| nameResolver = channelIsActive |
| ? getNameResolver(target, authorityOverride, nameResolverFactory, nameResolverArgs) |
| : null; |
| } |
| LbHelperImpl oldHelper = lbHelper; |
| if (oldHelper != null) { |
| oldHelper.lb.shutdown(); |
| lbHelper = null; |
| } |
| subchannelPicker = null; |
| } |
| |
| /** |
| * Takes the channel out of idle mode if it is currently idle: creates the load balancer and |
| * starts the name resolver. Does nothing once the channel is shut down or in panic mode. |
| * |
| * <p>Must be called from syncContext |
| */ |
| @VisibleForTesting |
| void exitIdleMode() { |
| syncContext.throwIfNotInThisSynchronizationContext(); |
| if (shutdown.get() || panicMode) { |
| return; |
| } |
| if (!inUseStateAggregator.isInUse()) { |
| // exitIdleMode() may be called outside of inUseStateAggregator.handleNotInUse() while |
| // isInUse() == false, in which case we still need to schedule the timer. |
| rescheduleIdleTimer(); |
| } else { |
| // Cancel the timer now, so that a racing due timer will not put Channel on idleness |
| // when the caller of exitIdleMode() is about to use the returned loadBalancer. |
| cancelIdleTimer(false); |
| } |
| if (lbHelper != null) { |
| // Already out of idle mode. |
| return; |
| } |
| channelLogger.log(ChannelLogLevel.INFO, "Exiting idle mode"); |
| LbHelperImpl newHelper = new LbHelperImpl(); |
| newHelper.lb = loadBalancerFactory.newLoadBalancer(newHelper); |
| // Publish the helper only after it is fully initialized: loadBalancerFactory is user code and |
| // may throw, and our state must stay consistent even if we then enter panic mode. |
| this.lbHelper = newHelper; |
| |
| nameResolver.start(new NameResolverListener(newHelper, nameResolver)); |
| nameResolverStarted = true; |
| } |
| |
| /** |
| * Moves the channel into idle mode: shuts down the name resolver and load balancer, reprocesses |
| * the delayed transport with a null picker and reports the IDLE state. |
| * |
| * <p>Must be run from syncContext. |
| */ |
| private void enterIdleMode() { |
| // nameResolver and loadBalancer are guaranteed to be non-null here. If either were null, then |
| // the idleModeTimer ran twice without exiting idle mode, or the task in shutdown() did not |
| // cancel idleModeTimer, or enterIdle() ran while shutdown or already idle. All of these |
| // are bugs. |
| shutdownNameResolverAndLoadBalancer(true); |
| delayedTransport.reprocess(null); |
| channelLogger.log(ChannelLogLevel.INFO, "Entering IDLE state"); |
| channelStateManager.gotoState(IDLE); |
| // If pending calls are still queued up, or the delayed transport still holds some, leave idle |
| // mode again right away so that those calls get a chance to be processed. |
| boolean callsStillWaiting = |
| inUseStateAggregator.anyObjectInUse(pendingCallsInUseObject, delayedTransport); |
| if (callsStillWaiting) { |
| exitIdleMode(); |
| } |
| } |
| |
| /** |
| * Cancels the idle timer, forwarding {@code permanent} to the underlying Rescheduler. |
| * |
| * <p>Must be run from syncContext. |
| */ |
| private void cancelIdleTimer(boolean permanent) { |
| this.idleTimer.cancel(permanent); |
| } |
| |
| /** |
| * Reschedules the idle timer to fire after {@code idleTimeoutMillis}, unless the idle timeout |
| * is disabled. |
| * |
| * <p>Always run from syncContext. |
| */ |
| private void rescheduleIdleTimer() { |
| boolean idleTimeoutEnabled = idleTimeoutMillis != IDLE_TIMEOUT_MILLIS_DISABLE; |
| if (idleTimeoutEnabled) { |
| idleTimer.reschedule(idleTimeoutMillis, TimeUnit.MILLISECONDS); |
| } |
| } |
| |
| /** |
| * Asks the name resolver to refresh immediately. Has no effect while the resolver has not been |
| * started. Must be run from syncContext. |
| */ |
| private void refreshNameResolution() { |
| syncContext.throwIfNotInThisSynchronizationContext(); |
| if (!nameResolverStarted) { |
| return; |
| } |
| nameResolver.refresh(); |
| } |
| |
| /** |
| * Creates the client streams for calls made on this channel. A transport is chosen through the |
| * current {@link SubchannelPicker}; when retry is enabled the call is wrapped in a |
| * {@link RetriableStream} that creates one substream per attempt. |
| */ |
| private final class ChannelStreamProvider implements ClientStreamProvider { |
| // Handed to every RetryStream created by newStream(). It is not assigned inside this class. |
| volatile Throttle throttle; |
| |
| /** |
| * Chooses the transport for a stream. Falls back to {@code delayedTransport} when the channel |
| * is shut down, when it is idle (after scheduling an exit from idle mode), or when the picker |
| * does not yield a transport. |
| */ |
| private ClientTransport getTransport(PickSubchannelArgs args) { |
| SubchannelPicker pickerCopy = subchannelPicker; |
| if (shutdown.get()) { |
| // If channel is shut down, delayedTransport is also shut down which will fail the stream |
| // properly. |
| return delayedTransport; |
| } |
| if (pickerCopy == null) { |
| final class ExitIdleModeForTransport implements Runnable { |
| @Override |
| public void run() { |
| exitIdleMode(); |
| } |
| } |
| |
| syncContext.execute(new ExitIdleModeForTransport()); |
| return delayedTransport; |
| } |
| // There is no need to reschedule the idle timer here. |
| // |
| // pickerCopy != null, which means idle timer has not expired when this method starts. |
| // Even if idle timer expires right after we grab pickerCopy, and it shuts down LoadBalancer |
| // which calls Subchannel.shutdown(), the InternalSubchannel will be actually shutdown after |
| // SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, which gives the caller time to start RPC on it. |
| // |
| // In most cases the idle timer is scheduled to fire after the transport has created the |
| // stream, which would have reported in-use state to the channel that would have cancelled |
| // the idle timer. |
| PickResult pickResult = pickerCopy.pickSubchannel(args); |
| ClientTransport transport = GrpcUtil.getTransportFromPickResult( |
| pickResult, args.getCallOptions().isWaitForReady()); |
| if (transport != null) { |
| return transport; |
| } |
| return delayedTransport; |
| } |
| |
| /** |
| * Creates the stream for a call. Without retry the stream is created directly on the picked |
| * transport; with retry a {@link RetriableStream} is returned that creates its substreams |
| * lazily. In both cases the transport's {@code newStream} runs with {@code context} attached. |
| */ |
| @Override |
| public ClientStream newStream( |
| final MethodDescriptor<?, ?> method, |
| final CallOptions callOptions, |
| final Metadata headers, |
| final Context context) { |
| if (!retryEnabled) { |
| // Create the tracers and pick the transport BEFORE attaching the context, in the same |
| // order newSubstream() uses. Nothing that can throw may run between attach() and the |
| // try/finally below, otherwise the attached context would never be detached. |
| ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers( |
| callOptions, headers, 0, /* isTransparentRetry= */ false); |
| ClientTransport transport = |
| getTransport(new PickSubchannelArgsImpl(method, headers, callOptions)); |
| Context origContext = context.attach(); |
| try { |
| return transport.newStream(method, headers, callOptions, tracers); |
| } finally { |
| context.detach(origContext); |
| } |
| } else { |
| // Retry and hedging policies come from the per-method config stored in the call options. |
| MethodInfo methodInfo = callOptions.getOption(MethodInfo.KEY); |
| final RetryPolicy retryPolicy = methodInfo == null ? null : methodInfo.retryPolicy; |
| final HedgingPolicy hedgingPolicy = methodInfo == null ? null : methodInfo.hedgingPolicy; |
| final class RetryStream<ReqT> extends RetriableStream<ReqT> { |
| @SuppressWarnings("unchecked") |
| RetryStream() { |
| super( |
| (MethodDescriptor<ReqT, ?>) method, |
| headers, |
| channelBufferUsed, |
| perRpcBufferLimit, |
| channelBufferLimit, |
| getCallExecutor(callOptions), |
| transportFactory.getScheduledExecutorService(), |
| retryPolicy, |
| hedgingPolicy, |
| throttle); |
| } |
| |
| @Override |
| Status prestart() { |
| return uncommittedRetriableStreamsRegistry.add(this); |
| } |
| |
| @Override |
| void postCommit() { |
| uncommittedRetriableStreamsRegistry.remove(this); |
| } |
| |
| @Override |
| ClientStream newSubstream( |
| Metadata newHeaders, ClientStreamTracer.Factory factory, int previousAttempts, |
| boolean isTransparentRetry) { |
| CallOptions newOptions = callOptions.withStreamTracerFactory(factory); |
| ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers( |
| newOptions, newHeaders, previousAttempts, isTransparentRetry); |
| ClientTransport transport = |
| getTransport(new PickSubchannelArgsImpl(method, newHeaders, newOptions)); |
| Context origContext = context.attach(); |
| try { |
| return transport.newStream(method, newHeaders, newOptions, tracers); |
| } finally { |
| context.detach(origContext); |
| } |
| } |
| } |
| |
| return new RetryStream<>(); |
| } |
| } |
| } |
| |
| private final ChannelStreamProvider transportProvider = new ChannelStreamProvider(); /* The one stream provider instance of this channel. */ |
| |
| private final Rescheduler idleTimer; /* Created in the constructor to run IdleModeTimer on syncContext. */ |
| |
| /** |
| * Builds a channel from the builder's settings. |
| * |
| * <p>The statements run in a deliberate order: later initializers read fields assigned by |
| * earlier ones (for example the name resolver args use the scheduled executor, the channel |
| * logger and the offload executor holder created before them). |
| * |
| * @param builder source of the target, executors, registries and retry/idle settings |
| * @param clientTransportFactory transport factory, wrapped here to apply call credentials |
| * @param backoffPolicyProvider stored in the {@code backoffPolicyProvider} field |
| * @param balancerRpcExecutorPool pool backing {@code balancerRpcExecutorHolder} |
| * @param stopwatchSupplier supplies the stopwatch handed to the idle timer |
| * @param interceptors interceptors applied on top of the real (optionally binlog-wrapped) channel |
| * @param timeProvider clock used for channel tracing and call tracers |
| * @throws IllegalStateException if the builder's default service config does not parse |
| * @throws IllegalArgumentException if the idle timeout is enabled but below the minimum, or if |
| * no name resolver can be found for the target |
| */ |
| ManagedChannelImpl( |
| ManagedChannelImplBuilder builder, |
| ClientTransportFactory clientTransportFactory, |
| BackoffPolicy.Provider backoffPolicyProvider, |
| ObjectPool<? extends Executor> balancerRpcExecutorPool, |
| Supplier<Stopwatch> stopwatchSupplier, |
| List<ClientInterceptor> interceptors, |
| final TimeProvider timeProvider) { |
| // Identity, clock and executors. |
| this.target = checkNotNull(builder.target, "target"); |
| this.logId = InternalLogId.allocate("Channel", target); |
| this.timeProvider = checkNotNull(timeProvider, "timeProvider"); |
| this.executorPool = checkNotNull(builder.executorPool, "executorPool"); |
| this.executor = checkNotNull(executorPool.getObject(), "executor"); |
| this.originalChannelCreds = builder.channelCredentials; |
| this.originalTransportFactory = clientTransportFactory; |
| this.offloadExecutorHolder = |
| new ExecutorHolder(checkNotNull(builder.offloadExecutorPool, "offloadExecutorPool")); |
| // Regular transports apply the builder's call credentials; OOB transports apply none. |
| this.transportFactory = new CallCredentialsApplyingTransportFactory( |
| clientTransportFactory, builder.callCredentials, this.offloadExecutorHolder); |
| this.oobTransportFactory = new CallCredentialsApplyingTransportFactory( |
| clientTransportFactory, null, this.offloadExecutorHolder); |
| this.scheduledExecutor = |
| new RestrictedScheduledExecutor(transportFactory.getScheduledExecutorService()); |
| |
| // Tracing and logging. |
| this.maxTraceEvents = builder.maxTraceEvents; |
| this.channelTracer = new ChannelTracer( |
| logId, this.maxTraceEvents, timeProvider.currentTimeNanos(), |
| "Channel for '" + target + "'"); |
| this.channelLogger = new ChannelLoggerImpl(channelTracer, timeProvider); |
| |
| // Name resolution and load balancing. |
| ProxyDetector proxyDetector = builder.proxyDetector; |
| if (proxyDetector == null) { |
| proxyDetector = GrpcUtil.DEFAULT_PROXY_DETECTOR; |
| } |
| this.retryEnabled = builder.retryEnabled; |
| this.loadBalancerFactory = new AutoConfiguredLoadBalancerFactory(builder.defaultLbPolicy); |
| this.nameResolverRegistry = builder.nameResolverRegistry; |
| ScParser serviceConfigParser = |
| new ScParser( |
| retryEnabled, |
| builder.maxRetryAttempts, |
| builder.maxHedgedAttempts, |
| loadBalancerFactory); |
| this.authorityOverride = builder.authorityOverride; |
| this.nameResolverArgs = |
| NameResolver.Args.newBuilder() |
| .setDefaultPort(builder.getDefaultPort()) |
| .setProxyDetector(proxyDetector) |
| .setSynchronizationContext(syncContext) |
| .setScheduledExecutorService(scheduledExecutor) |
| .setServiceConfigParser(serviceConfigParser) |
| .setChannelLogger(channelLogger) |
| .setOffloadExecutor(this.offloadExecutorHolder) |
| .setOverrideAuthority(this.authorityOverride) |
| .build(); |
| this.nameResolverFactory = builder.nameResolverFactory; |
| this.nameResolver = getNameResolver( |
| target, authorityOverride, nameResolverFactory, nameResolverArgs); |
| this.balancerRpcExecutorPool = checkNotNull(balancerRpcExecutorPool, "balancerRpcExecutorPool"); |
| this.balancerRpcExecutorHolder = new ExecutorHolder(balancerRpcExecutorPool); |
| this.delayedTransport = new DelayedClientTransport(this.executor, this.syncContext); |
| this.delayedTransport.start(delayedTransportListener); |
| this.backoffPolicyProvider = backoffPolicyProvider; |
| |
| // Default service config: parsed eagerly so that an invalid one fails construction. |
| if (builder.defaultServiceConfig == null) { |
| this.defaultServiceConfig = null; |
| } else { |
| ConfigOrError parsedDefault = |
| serviceConfigParser.parseServiceConfig(builder.defaultServiceConfig); |
| checkState( |
| parsedDefault.getError() == null, |
| "Default config is invalid: %s", |
| parsedDefault.getError()); |
| this.defaultServiceConfig = (ManagedChannelServiceConfig) parsedDefault.getConfig(); |
| this.lastServiceConfig = this.defaultServiceConfig; |
| } |
| this.lookUpServiceConfig = builder.lookUpServiceConfig; |
| |
| // Channel stack: real channel, then optional binary logging, then interceptors. |
| this.realChannel = new RealChannel(nameResolver.getServiceAuthority()); |
| Channel channel = realChannel; |
| if (builder.binlog != null) { |
| channel = builder.binlog.wrapChannel(channel); |
| } |
| this.interceptorChannel = ClientInterceptors.intercept(channel, interceptors); |
| this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier"); |
| |
| // Idle timeout: only validated against the minimum when it is not disabled. |
| long requestedIdleTimeoutMillis = builder.idleTimeoutMillis; |
| if (requestedIdleTimeoutMillis != IDLE_TIMEOUT_MILLIS_DISABLE) { |
| checkArgument( |
| requestedIdleTimeoutMillis |
| >= ManagedChannelImplBuilder.IDLE_MODE_MIN_TIMEOUT_MILLIS, |
| "invalid idleTimeoutMillis %s", requestedIdleTimeoutMillis); |
| } |
| this.idleTimeoutMillis = requestedIdleTimeoutMillis; |
| |
| this.idleTimer = new Rescheduler( |
| new IdleModeTimer(), |
| syncContext, |
| transportFactory.getScheduledExecutorService(), |
| stopwatchSupplier.get()); |
| this.fullStreamDecompression = builder.fullStreamDecompression; |
| this.decompressorRegistry = checkNotNull(builder.decompressorRegistry, "decompressorRegistry"); |
| this.compressorRegistry = checkNotNull(builder.compressorRegistry, "compressorRegistry"); |
| this.userAgent = builder.userAgent; |
| |
| this.channelBufferLimit = builder.retryBufferSize; |
| this.perRpcBufferLimit = builder.perRpcBufferLimit; |
| final class ChannelCallTracerFactory implements CallTracer.Factory { |
| @Override |
| public CallTracer create() { |
| return new CallTracer(timeProvider); |
| } |
| } |
| |
| this.callTracerFactory = new ChannelCallTracerFactory(); |
| this.channelCallTracer = callTracerFactory.create(); |
| this.channelz = checkNotNull(builder.channelz); |
| channelz.addRootChannel(this); |
| |
| // With look-up disabled the service config counts as updated from the start. |
| if (!lookUpServiceConfig) { |
| if (defaultServiceConfig != null) { |
| channelLogger.log( |
| ChannelLogLevel.INFO, "Service config look-up disabled, using default service config"); |
| } |
| serviceConfigUpdated = true; |
| } |
| } |
| |
| private static NameResolver getNameResolver( |
| String target, NameResolver.Factory nameResolverFactory, NameResolver.Args nameResolverArgs) { |
| // Finding a NameResolver. Try using the target string as the URI. If that fails, try prepending |
| // "dns:///". |
| URI targetUri = null; |
| StringBuilder uriSyntaxErrors = new StringBuilder(); |
| try { |
| targetUri = new URI(target); |
| // For "localhost:8080" this would likely cause newNameResolver to return null, because |
| // "localhost" is parsed as the scheme. Will fall into the next branch and try |
| // "dns:///localhost:8080". |
| } catch (URISyntaxException e) { |
| // Can happen with ip addresses like "[::1]:1234" or 127.0.0.1:1234. |
| uriSyntaxErrors.append(e.getMessage()); |
| } |
| if (targetUri != null) { |
| NameResolver resolver = nameResolverFactory.newNameResolver(targetUri, nameResolverArgs); |
| if (resolver != null) { |
| return resolver; |
| } |
| // "foo.googleapis.com:8080" cause resolver to be null, because "foo.googleapis.com" is an |
| // unmapped scheme. Just fall through and will try "dns:///foo.googleapis.com:8080" |
| } |
| |
| // If we reached here, the targetUri couldn't be used. |
| if (!URI_PATTERN.matcher(target).matches()) { |
| // It doesn't look like a URI target. Maybe it's an authority string. Try with the default |
| // scheme from the factory. |
| try { |
| targetUri = new URI(nameResolverFactory.getDefaultScheme(), "", "/" + target, null); |
| } catch (URISyntaxException e) { |
| // Should not be possible. |
| throw new IllegalArgumentException(e); |
| } |
| NameResolver resolver = nameResolverFactory.newNameResolver(targetUri, nameResolverArgs); |
| if (resolver != null) { |
| return resolver; |
| } |
| } |
| throw new IllegalArgumentException(String.format( |
| "cannot find a NameResolver for %s%s", |
| target, uriSyntaxErrors.length() > 0 ? " (" + uriSyntaxErrors + ")" : "")); |
| } |
| |
| /** |
| * Creates the {@link NameResolver} for {@code target} and prepares it for use by the channel. |
| * |
| * <p>The resolver is always wrapped in a {@link RetryingNameResolver} unless it already is one, |
| * whether or not an authority override is configured. When {@code overrideAuthority} is non-null |
| * the result is wrapped once more so that {@code getServiceAuthority()} returns the override. |
| * |
| * @param target the channel target string |
| * @param overrideAuthority authority to report instead of the resolver's own, or {@code null} |
| * @param nameResolverFactory factory used to create the underlying resolver |
| * @param nameResolverArgs args passed to the factory; they also supply the scheduled executor |
| * and synchronization context used for retry scheduling |
| * @return the resolver the channel should use |
| * @throws IllegalArgumentException if no resolver can be found for {@code target} |
| */ |
| @VisibleForTesting |
| static NameResolver getNameResolver( |
| String target, @Nullable final String overrideAuthority, |
| NameResolver.Factory nameResolverFactory, NameResolver.Args nameResolverArgs) { |
| NameResolver resolver = getNameResolver(target, nameResolverFactory, nameResolverArgs); |
| |
| // If the nameResolver is not already a RetryingNameResolver, then wrap it with it. |
| // This helps guarantee that name resolution retry remains supported even as it has been |
| // removed from ManagedChannelImpl. This must happen before the overrideAuthority check, |
| // otherwise channels without an authority override would lose retry support. |
| // TODO: After a transition period, all NameResolver implementations that need retry should use |
| // RetryingNameResolver directly and this step can be removed. |
| NameResolver usedNameResolver; |
| if (resolver instanceof RetryingNameResolver) { |
| usedNameResolver = resolver; |
| } else { |
| usedNameResolver = new RetryingNameResolver(resolver, |
| new BackoffPolicyRetryScheduler(new ExponentialBackoffPolicy.Provider(), |
| nameResolverArgs.getScheduledExecutorService(), |
| nameResolverArgs.getSynchronizationContext()), |
| nameResolverArgs.getSynchronizationContext()); |
| } |
| |
| if (overrideAuthority == null) { |
| return usedNameResolver; |
| } |
| |
| // Only the reported authority changes; everything else is forwarded to usedNameResolver. |
| return new ForwardingNameResolver(usedNameResolver) { |
| @Override |
| public String getServiceAuthority() { |
| return overrideAuthority; |
| } |
| }; |
| } |
| |
| /** Returns the config selector currently held by the real channel. */ |
| @VisibleForTesting |
| InternalConfigSelector getConfigSelector() { |
| InternalConfigSelector currentSelector = realChannel.configSelector.get(); |
| return currentSelector; |
| } |
| |
| /** |
| * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately |
| * cancelled. |
| */ |
| @Override |
| public ManagedChannelImpl shutdown() { |
| channelLogger.log(ChannelLogLevel.DEBUG, "shutdown() called"); |
| if (!shutdown.compareAndSet(false, true)) { |
| return this; |
| } |
| final class Shutdown implements Runnable { |
| @Override |
| public void run() { |
| channelLogger.log(ChannelLogLevel.INFO, "Entering SHUTDOWN state"); |
| channelStateManager.gotoState(SHUTDOWN); |
| } |
| } |
| |
| syncContext.execute(new Shutdown()); |
| realChannel.shutdown(); |
| final class CancelIdleTimer implements Runnable { |
| @Override |
| public void run() { |
| cancelIdleTimer(/* permanent= */ true); |
| } |
| } |
| |
| syncContext.execute(new CancelIdleTimer()); |
| return this; |
| } |
| |
| /** |
| * Initiates a forceful shutdown in which preexisting and new calls are cancelled. Although |
| * forceful, the shutdown process is still not instantaneous; {@link #isTerminated()} will likely |
| * return {@code false} immediately after this method returns. |
| */ |
| @Override |
| public ManagedChannelImpl shutdownNow() { |
| channelLogger.log(ChannelLogLevel.DEBUG, "shutdownNow() called"); |
| shutdown(); |
| realChannel.shutdownNow(); |
| final class ShutdownNow implements Runnable { |
| @Override |
| public void run() { |
| if (shutdownNowed) { |
| return; |
| } |
| shutdownNowed = true; |
| maybeShutdownNowSubchannels(); |
| } |
| } |
| |
| syncContext.execute(new ShutdownNow()); |
| return this; |
| } |
| |
| // Called from syncContext |
| @VisibleForTesting |
| void panic(final Throwable t) { |
| if (panicMode) { |
| // Preserve the first panic information |
| return; |
| } |
| panicMode = true; |
| cancelIdleTimer(/* permanent= */ true); |
| shutdownNameResolverAndLoadBalancer(false); |
| final class PanicSubchannelPicker extends SubchannelPicker { |
| private final PickResult panicPickResult = |
| PickResult.withDrop( |
| Status.INTERNAL.withDescription("Panic! This is a bug!").withCause(t)); |
| |
| @Override |
| public PickResult pickSubchannel(PickSubchannelArgs args) { |
| return panicPickResult; |
| } |
| |
| @Override |
| public String toString() { |
| return MoreObjects.toStringHelper(PanicSubchannelPicker.class) |
| .add("panicPickResult", panicPickResult) |
| .toString(); |
| } |
| } |
| |
| updateSubchannelPicker(new PanicSubchannelPicker()); |
| realChannel.updateConfigSelector(null); |
| channelLogger.log(ChannelLogLevel.ERROR, "PANIC! Entering TRANSIENT_FAILURE"); |
| channelStateManager.gotoState(TRANSIENT_FAILURE); |
| } |
| |
| @VisibleForTesting |
| boolean isInPanicMode() { |
| return panicMode; |
| } |
| |
| // Called from syncContext |
| private void updateSubchannelPicker(SubchannelPicker newPicker) { |
| subchannelPicker = newPicker; |
| delayedTransport.reprocess(newPicker); |
| } |
| |
| @Override |
| public boolean isShutdown() { |
| return shutdown.get(); |
| } |
| |
| @Override |
| public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { |
| return terminatedLatch.await(timeout, unit); |
| } |
| |
| @Override |
| public boolean isTerminated() { |
| return terminated; |
| } |
| |
| /* |
| * Creates a new outgoing call on the channel. |
| */ |
| @Override |
| public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method, |
| CallOptions callOptions) { |
| return interceptorChannel.newCall(method, callOptions); |
| } |
| |
| @Override |
| public String authority() { |
| return interceptorChannel.authority(); |
| } |
| |
| private Executor getCallExecutor(CallOptions callOptions) { |
| Executor executor = callOptions.getExecutor(); |
| if (executor == null) { |
| executor = this.executor; |
| } |
| return executor; |
| } |
| |
/**
 * Channel that creates calls according to the current config selector. While the selector is
 * still {@code INITIAL_PENDING_SELECTOR}, new calls are handed out as {@link PendingCall}s that
 * are tracked in {@code pendingCalls} and reprocessed once the selector has been updated.
 */
private class RealChannel extends Channel {
  // Reference to null if no config selector is available from resolution result
  // Reference must be set() from syncContext
  private final AtomicReference<InternalConfigSelector> configSelector =
      new AtomicReference<>(INITIAL_PENDING_SELECTOR);
  // Set when the NameResolver is initially created. When we create a new NameResolver for the
  // same target, the new instance must have the same value.
  private final String authority;

  // Channel that builds ClientCallImpl instances directly, without consulting the config
  // selector. It is the channel handed to ConfigSelectingClientCall and used by newClientCall().
  private final Channel clientCallImplChannel = new Channel() {
    @Override
    public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
        MethodDescriptor<RequestT, ResponseT> method, CallOptions callOptions) {
      return new ClientCallImpl<>(
          method,
          getCallExecutor(callOptions),
          callOptions,
          transportProvider,
          // Once the channel is terminated, no scheduled executor is passed to the call.
          terminated ? null : transportFactory.getScheduledExecutorService(),
          channelCallTracer,
          null)
          .setFullStreamDecompression(fullStreamDecompression)
          .setDecompressorRegistry(decompressorRegistry)
          .setCompressorRegistry(compressorRegistry);
    }

    @Override
    public String authority() {
      return authority;
    }
  };

  /**
   * Creates the channel.
   *
   * @param authority the authority this channel reports; must not be null
   */
  private RealChannel(String authority) {
    this.authority = checkNotNull(authority, "authority");
  }

  /**
   * Creates a call. If the config selector is already known the call is created right away;
   * otherwise idle mode is exited and, unless the selector became available or the channel is
   * shut down, a {@link PendingCall} is returned and registered from the sync context.
   */
  @Override
  public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(
      MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
    // Fast path: a config selector (possibly null) has already been set.
    if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
      return newClientCall(method, callOptions);
    }
    // The selector is still pending; make sure the channel leaves idle mode.
    syncContext.execute(new Runnable() {
      @Override
      public void run() {
        exitIdleMode();
      }
    });
    if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
      // This is an optimization for the case (typically with InProcessTransport) when name
      // resolution result is immediately available at this point. Otherwise, some users'
      // tests might observe slight behavior difference from earlier grpc versions.
      return newClientCall(method, callOptions);
    }
    if (shutdown.get()) {
      // Return a failing ClientCall.
      return new ClientCall<ReqT, RespT>() {
        @Override
        public void start(Listener<RespT> responseListener, Metadata headers) {
          responseListener.onClose(SHUTDOWN_STATUS, new Metadata());
        }

        @Override public void request(int numMessages) {}

        @Override public void cancel(@Nullable String message, @Nullable Throwable cause) {}

        @Override public void halfClose() {}

        @Override public void sendMessage(ReqT message) {}
      };
    }
    // Capture the caller's Context so the real call can later be created under it.
    Context context = Context.current();
    final PendingCall<ReqT, RespT> pendingCall = new PendingCall<>(context, method, callOptions);
    syncContext.execute(new Runnable() {
      @Override
      public void run() {
        // Re-check inside the sync context: the selector may have been set in the meantime.
        if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
          if (pendingCalls == null) {
            // First pending call: create the set and mark the pending-calls object as in use.
            pendingCalls = new LinkedHashSet<>();
            inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, true);
          }
          pendingCalls.add(pendingCall);
        } else {
          pendingCall.reprocess();
        }
      }
    });
    return pendingCall;
  }

  /**
   * Stores {@code config} as the current selector and, if the previous value was the initial
   * pending selector, reprocesses every pending call.
   */
  // Must run in SynchronizationContext.
  void updateConfigSelector(@Nullable InternalConfigSelector config) {
    InternalConfigSelector prevConfig = configSelector.get();
    configSelector.set(config);
    if (prevConfig == INITIAL_PENDING_SELECTOR && pendingCalls != null) {
      for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) {
        pendingCall.reprocess();
      }
    }
  }

  /** Replaces a still-pending selector with {@code null}; otherwise does nothing. */
  // Must run in SynchronizationContext.
  void onConfigError() {
    if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
      updateConfigSelector(null);
    }
  }

  /**
   * Schedules the orderly-shutdown handling on the sync context. When no calls are pending, a
   * still-pending selector is cleared and the uncommitted retriable streams registry is told
   * about the shutdown. When calls are pending nothing is done here; {@code PendingCallRemoval}
   * notifies the registry once the last pending call has been removed.
   */
  void shutdown() {
    final class RealChannelShutdown implements Runnable {
      @Override
      public void run() {
        if (pendingCalls == null) {
          if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
            configSelector.set(null);
          }
          uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
        }
      }
    }

    syncContext.execute(new RealChannelShutdown());
  }

  /**
   * Schedules the forceful-shutdown handling on the sync context: a still-pending selector is
   * cleared, every pending call is cancelled, and the uncommitted retriable streams registry is
   * told to shut down now.
   */
  void shutdownNow() {
    final class RealChannelShutdownNow implements Runnable {
      @Override
      public void run() {
        if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
          configSelector.set(null);
        }
        if (pendingCalls != null) {
          for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) {
            pendingCall.cancel("Channel is forcefully shutdown", null);
          }
        }
        uncommittedRetriableStreamsRegistry.onShutdownNow(SHUTDOWN_NOW_STATUS);
      }
    }

    syncContext.execute(new RealChannelShutdownNow());
  }

  @Override
  public String authority() {
    return authority;
  }

  /**
   * A call handed out while the config selector is still pending. It remembers the caller's
   * Context, method and call options so that the real call can be created later by
   * {@link #reprocess()}.
   */
  private final class PendingCall<ReqT, RespT> extends DelayedClientCall<ReqT, RespT> {
    final Context context;
    final MethodDescriptor<ReqT, RespT> method;
    final CallOptions callOptions;

    PendingCall(
        Context context, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
      super(getCallExecutor(callOptions), scheduledExecutor, callOptions.getDeadline());
      this.context = context;
      this.method = method;
      this.callOptions = callOptions;
    }

    /** Called when it's ready to create a real call and reprocess the pending call. */
    void reprocess() {
      ClientCall<ReqT, RespT> realCall;
      // Create the real call under the Context captured when the pending call was created.
      Context previous = context.attach();
      try {
        // Flag the call options so the real call knows name resolution delayed it.
        CallOptions delayResolutionOption = callOptions.withOption(NAME_RESOLUTION_DELAYED, true);
        realCall = newClientCall(method, delayResolutionOption);
      } finally {
        context.detach(previous);
      }
      // NOTE(review): setCall presumably returns the work needed to drain operations buffered
      // on this delayed call, or null when there is none; confirm against DelayedClientCall.
      Runnable toRun = setCall(realCall);
      if (toRun == null) {
        syncContext.execute(new PendingCallRemoval());
      } else {
        // Run the returned work on the call executor, then unregister the pending call.
        getCallExecutor(callOptions).execute(new Runnable() {
          @Override
          public void run() {
            toRun.run();
            syncContext.execute(new PendingCallRemoval());
          }
        });
      }
    }

    @Override
    protected void callCancelled() {
      super.callCancelled();
      // A cancelled pending call no longer needs to be tracked.
      syncContext.execute(new PendingCallRemoval());
    }

    /**
     * Removes the enclosing pending call from {@code pendingCalls}. When the set becomes empty
     * it is released, the in-use marker is cleared, and, if the channel has been shut down, the
     * uncommitted retriable streams registry is notified of the shutdown.
     */
    final class PendingCallRemoval implements Runnable {
      @Override
      public void run() {
        if (pendingCalls != null) {
          pendingCalls.remove(PendingCall.this);
          if (pendingCalls.isEmpty()) {
            inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, false);
            pendingCalls = null;
            if (shutdown.get()) {
              uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
            }
          }
        }
      }
    }
  }

  /**
   * Creates a call based on the current config selector: a plain call when the selector is
   * null, a plain call with the method's {@code MethodInfo} attached when the selector is a
   * {@code ServiceConfigConvertedSelector}, and a {@link ConfigSelectingClientCall} otherwise.
   */
  private <ReqT, RespT> ClientCall<ReqT, RespT> newClientCall(
      MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
    InternalConfigSelector selector = configSelector.get();
    if (selector == null) {
      return clientCallImplChannel.newCall(method, callOptions);
    }
    if (selector instanceof ServiceConfigConvertedSelector) {
      // The method config can be read straight from the wrapped service config, so selection
      // does not have to be deferred to call start.
      MethodInfo methodInfo =
          ((ServiceConfigConvertedSelector) selector).config.getMethodConfig(method);
      if (methodInfo != null) {
        callOptions = callOptions.withOption(MethodInfo.KEY, methodInfo);
      }
      return clientCallImplChannel.newCall(method, callOptions);
    }
    return new ConfigSelectingClientCall<>(
        selector, clientCallImplChannel, executor, method, callOptions);
  }
}
| |
| /** |
| * A client call for a given channel that applies a given config selector when it starts. |
| */ |
| static final class ConfigSelectingClientCall<ReqT, RespT> |
| extends ForwardingClientCall<ReqT, RespT> { |
| |
| private final InternalConfigSelector configSelector; |
| private final Channel channel; |
| private final Executor callExecutor; |
| private final MethodDescriptor<ReqT, RespT> method; |
| private final Context context; |
| private CallOptions callOptions; |
| |
| private ClientCall<ReqT, RespT> delegate; |
| |
| ConfigSelectingClientCall( |
| InternalConfigSelector configSelector, Channel channel, Executor channelExecutor, |
| MethodDescriptor<ReqT, RespT> method, |
| CallOptions callOptions) { |
| this.configSelector = configSelector; |
| this.channel = channel; |
| this.method = method; |
| this.callExecutor = |
| callOptions.getExecutor() == null ? channelExecutor : callOptions.getExecutor(); |
| this.callOptions = callOptions.withExecutor(callExecutor); |
| this.context = Context.current(); |
| } |
| |
| @Override |
| protected ClientCall<ReqT, RespT> delegate() { |
| return delegate; |
| } |
| |
| @SuppressWarnings("unchecked") |
| @Override |
| public void start(Listener<RespT> observer, Metadata headers) { |
| PickSubchannelArgs args = new PickSubchannelArgsImpl(method, headers, callOptions); |
| InternalConfigSelector.Result result = configSelector.selectConfig(args); |
| Status status = result.getStatus(); |
| if (!status.isOk()) { |
| executeCloseObserverInContext(observer, |
| GrpcUtil.replaceInappropriateControlPlaneStatus(status)); |
| delegate = (ClientCall<ReqT, RespT>) NOOP_CALL; |
| return; |
| } |
| ClientInterceptor interceptor = result.getInterceptor(); |
| ManagedChannelServiceConfig config = (ManagedChannelServiceConfig) result.getConfig(); |
| MethodInfo methodInfo = config.getMethodConfig(method); |
| if (methodInfo != null) { |
| callOptions = callOptions.withOption(MethodInfo.KEY, methodInfo); |
| } |
| if (interceptor != null) { |
| delegate = interceptor.interceptCall(method, callOptions, channel); |
| } else { |
| delegate = channel.newCall(method, callOptions); |
| } |
| delegate.start(observer, headers); |
| } |
| |
| private void executeCloseObserverInContext( |
| final Listener<RespT> observer, final Status status) { |
| class CloseInContext extends ContextRunnable { |
| CloseInContext() { |
| super(context); |
| } |
| |
| @Override |
| public void runInContext() { |
| observer.onClose(status, new Metadata()); |
| } |
| } |
| |
| callExecutor.execute(new CloseInContext()); |
| } |
| |
| @Override |
| public void cancel(@Nullable String message, @Nullable Throwable cause) { |
| if (delegate != null) { |
| delegate.cancel(message, cause); |
| } |
| } |
| } |
| |
| private static final ClientCall<Object, Object> NOOP_CALL = new ClientCall<Object, Object>() { |
| @Override |
| public void start(Listener<Object> responseListener, Metadata headers) {} |
| |
| @Override |
| public void request(int numMessages) {} |
| |
| @Override |
| public void cancel(String message, Throwable cause) {} |
| |
| @Override |
| public void halfClose() {} |
| |
| @Override |
| public void sendMessage(Object message) {} |
| |
| // Always returns {@code false}, since this is only used when the startup of the call fails. |
| @Override |
| public boolean isReady() { |
| return false; |
| } |
| }; |
| |
| /** |
| * Terminate the channel if termination conditions are met. |
| */ |
| // Must be run from syncContext |
| private void maybeTerminateChannel() { |
| if (terminated) { |
| return; |
| } |
| if (shutdown.get() && subchannels.isEmpty() && oobChannels.isEmpty()) { |
| channelLogger.log(ChannelLogLevel.INFO, "Terminated"); |
| channelz.removeRootChannel(this); |
| executorPool.returnObject(executor); |
| balancerRpcExecutorHolder.release(); |
| offloadExecutorHolder.release(); |
| // Release the transport factory so that it can deallocate any resources. |
| transportFactory.close(); |
| |
| terminated = true; |
| terminatedLatch.countDown(); |
| } |
| } |
| |
| // Must be called from syncContext |
| private void handleInternalSubchannelState(ConnectivityStateInfo newState) { |
| if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) { |
| refreshNameResolution(); |
| } |
| } |
| |
| @Override |
| @SuppressWarnings("deprecation") |
| public ConnectivityState getState(boolean requestConnection) { |
| ConnectivityState savedChannelState = channelStateManager.getState(); |
| if (requestConnection && savedChannelState == IDLE) { |
| final class RequestConnection implements Runnable { |
| @Override |
| public void run() { |
| exitIdleMode(); |
| if (subchannelPicker != null) { |
| subchannelPicker.requestConnection(); |
| } |
| if (lbHelper != null) { |
| lbHelper.lb.requestConnection(); |
| } |
| } |
| } |
| |
| syncContext.execute(new RequestConnection()); |
| } |
| return savedChannelState; |
| } |
| |
| @Override |
| public void notifyWhenStateChanged(final ConnectivityState source, final Runnable callback) { |
| final class NotifyStateChanged implements Runnable { |
| @Override |
| public void run() { |
| channelStateManager.notifyWhenStateChanged(callback, executor, source); |
| } |
| } |
| |
| syncContext.execute(new NotifyStateChanged()); |
| } |
| |
| @Override |
| public void resetConnectBackoff() { |
| final class ResetConnectBackoff implements Runnable { |
| @Override |
| public void run() { |
| if (shutdown.get()) { |
| return; |
| } |
| if (nameResolverStarted) { |
| refreshNameResolution(); |
| } |
| for (InternalSubchannel subchannel : subchannels) { |
| subchannel.resetConnectBackoff(); |
| } |
| for (OobChannel oobChannel : oobChannels) { |
| oobChannel.resetConnectBackoff(); |
| } |
| } |
| } |
| |
| syncContext.execute(new ResetConnectBackoff()); |
| } |
| |
| @Override |
| public void enterIdle() { |
| final class PrepareToLoseNetworkRunnable implements Runnable { |
| @Override |
| public void run() { |
| if (shutdown.get() || lbHelper == null) { |
| return; |
| } |
| cancelIdleTimer(/* permanent= */ false); |
| enterIdleMode(); |
| } |
| } |
| |
| syncContext.execute(new PrepareToLoseNetworkRunnable()); |
| } |
| |
| /** |
| * A registry that prevents channel shutdown from killing existing retry attempts that are in |
| * backoff. |
| */ |
| private final class UncommittedRetriableStreamsRegistry { |
| // TODO(zdapeng): This means we would acquire a lock for each new retry-able stream, |
| // it's worthwhile to look for a lock-free approach. |
| final Object lock = new Object(); |
| |
| @GuardedBy("lock") |
| Collection<ClientStream> uncommittedRetriableStreams = new HashSet<>(); |
| |
| @GuardedBy("lock") |
| Status shutdownStatus; |
| |
| void onShutdown(Status reason) { |
| boolean shouldShutdownDelayedTransport = false; |
| synchronized (lock) { |
| if (shutdownStatus != null) { |
| return; |
| } |
| shutdownStatus = reason; |
| // Keep the delayedTransport open until there is no more uncommitted streams, b/c those |
| // retriable streams, which may be in backoff and not using any transport, are already |
| // started RPCs. |
| if (uncommittedRetriableStreams.isEmpty()) { |
| shouldShutdownDelayedTransport = true; |
| } |
| } |
| |
| if (shouldShutdownDelayedTransport) { |
| delayedTransport.shutdown(reason); |
| } |
| } |
| |
| void onShutdownNow(Status reason) { |
| onShutdown(reason); |
| Collection<ClientStream> streams; |
| |
| synchronized (lock) { |
| streams = new ArrayList<>(uncommittedRetriableStreams); |
| } |
| |
| for (ClientStream stream : streams) { |
| stream.cancel(reason); |
| } |
| delayedTransport.shutdownNow(reason); |
| } |
| |
| /** |
| * Registers a RetriableStream and return null if not shutdown, otherwise just returns the |
| * shutdown Status. |
| */ |
| @Nullable |
| Status add(RetriableStream<?> retriableStream) { |
| synchronized (lock) { |
| if (shutdownStatus != null) { |
| return shutdownStatus; |
| } |
| uncommittedRetriableStreams.add(retriableStream); |
| return null; |
| } |
| } |
| |
| void remove(RetriableStream<?> retriableStream) { |
| Status shutdownStatusCopy = null; |
| |
| synchronized (lock) { |
| uncommittedRetriableStreams.remove(retriableStream); |
| if (uncommittedRetriableStreams.isEmpty()) { |
| shutdownStatusCopy = shutdownStatus; |
| // Because retriable transport is long-lived, we take this opportunity to down-size the |
| // hashmap. |
| uncommittedRetriableStreams = new HashSet<>(); |
| } |
| } |
| |
| if (shutdownStatusCopy != null) { |
| delayedTransport.shutdown(shutdownStatusCopy); |
| } |
| } |
| } |
| |
/**
 * The {@link LoadBalancer.Helper} handed to the channel's load balancer. It lets the balancer
 * create subchannels and out-of-band (OOB) channels, publish a new picker and connectivity
 * state, request name-resolution refreshes, and read channel-level settings.
 */
private final class LbHelperImpl extends LoadBalancer.Helper {
  // The load balancer associated with this helper; assigned outside this class.
  AutoConfiguredLoadBalancer lb;

  /**
   * Creates a subchannel for {@code args}. Must be called from the sync context.
   *
   * @throws IllegalStateException if the channel is already terminating
   */
  @Override
  public AbstractSubchannel createSubchannel(CreateSubchannelArgs args) {
    syncContext.throwIfNotInThisSynchronizationContext();
    // No new subchannel should be created after load balancer has been shutdown.
    checkState(!terminating, "Channel is being terminated");
    return new SubchannelImpl(args);
  }

  /**
   * Publishes a new picker and, unless {@code newState} is SHUTDOWN, a new channel state. The
   * update is ignored when this helper is no longer the channel's current helper. Must be
   * called from the sync context.
   */
  @Override
  public void updateBalancingState(
      final ConnectivityState newState, final SubchannelPicker newPicker) {
    syncContext.throwIfNotInThisSynchronizationContext();
    checkNotNull(newState, "newState");
    checkNotNull(newPicker, "newPicker");
    final class UpdateBalancingState implements Runnable {
      @Override
      public void run() {
        // Updates from a helper that has been replaced are dropped.
        if (LbHelperImpl.this != lbHelper) {
          return;
        }
        updateSubchannelPicker(newPicker);
        // It's not appropriate to report SHUTDOWN state from lb.
        // Ignore the case of newState == SHUTDOWN for now.
        if (newState != SHUTDOWN) {
          channelLogger.log(
              ChannelLogLevel.INFO, "Entering {0} state with picker: {1}", newState, newPicker);
          channelStateManager.gotoState(newState);
        }
      }
    }

    syncContext.execute(new UpdateBalancingState());
  }

  /** Asks the channel to refresh name resolution. Must be called from the sync context. */
  @Override
  public void refreshNameResolution() {
    syncContext.throwIfNotInThisSynchronizationContext();
    final class LoadBalancerRefreshNameResolution implements Runnable {
      @Override
      public void run() {
        ManagedChannelImpl.this.refreshNameResolution();
      }
    }

    syncContext.execute(new LoadBalancerRefreshNameResolution());
  }

  /** Single-address-group convenience overload of the list-based {@code createOobChannel}. */
  @Override
  public ManagedChannel createOobChannel(EquivalentAddressGroup addressGroup, String authority) {
    return createOobChannel(Collections.singletonList(addressGroup), authority);
  }

  /**
   * Creates an OOB channel backed by a single {@link InternalSubchannel} for the given address
   * groups, registers both with channelz and the channel tracers, and schedules the OOB channel
   * to be tracked by this channel.
   *
   * @throws IllegalStateException if the channel is already terminated
   */
  @Override
  public ManagedChannel createOobChannel(List<EquivalentAddressGroup> addressGroup,
      String authority) {
    // TODO(ejona): can we be even stricter? Like terminating?
    checkState(!terminated, "Channel is terminated");
    // One timestamp is shared by the tracers and trace events created below.
    long oobChannelCreationTime = timeProvider.currentTimeNanos();
    InternalLogId oobLogId = InternalLogId.allocate("OobChannel", /*details=*/ null);
    InternalLogId subchannelLogId =
        InternalLogId.allocate("Subchannel-OOB", /*details=*/ authority);
    ChannelTracer oobChannelTracer =
        new ChannelTracer(
            oobLogId, maxTraceEvents, oobChannelCreationTime,
            "OobChannel for " + addressGroup);
    final OobChannel oobChannel = new OobChannel(
        authority, balancerRpcExecutorPool, oobTransportFactory.getScheduledExecutorService(),
        syncContext, callTracerFactory.create(), oobChannelTracer, channelz, timeProvider);
    // Record the new child channel in the parent channel's trace.
    channelTracer.reportEvent(new ChannelTrace.Event.Builder()
        .setDescription("Child OobChannel created")
        .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
        .setTimestampNanos(oobChannelCreationTime)
        .setChannelRef(oobChannel)
        .build());
    ChannelTracer subchannelTracer =
        new ChannelTracer(subchannelLogId, maxTraceEvents, oobChannelCreationTime,
            "Subchannel for " + addressGroup);
    ChannelLogger subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider);
    // Forwards subchannel lifecycle events to the OOB channel and to the parent channel.
    final class ManagedOobChannelCallback extends InternalSubchannel.Callback {
      @Override
      void onTerminated(InternalSubchannel is) {
        oobChannels.remove(oobChannel);
        channelz.removeSubchannel(is);
        oobChannel.handleSubchannelTerminated();
        // The removed OOB channel may have been the last thing blocking termination.
        maybeTerminateChannel();
      }

      @Override
      void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
        // TODO(chengyuanzhang): change to let LB policies explicitly manage OOB channel's
        //  state and refresh name resolution if necessary.
        handleInternalSubchannelState(newState);
        oobChannel.handleSubchannelStateChange(newState);
      }
    }

    final InternalSubchannel internalSubchannel = new InternalSubchannel(
        addressGroup,
        authority, userAgent, backoffPolicyProvider, oobTransportFactory,
        oobTransportFactory.getScheduledExecutorService(), stopwatchSupplier, syncContext,
        // All callback methods are run from syncContext
        new ManagedOobChannelCallback(),
        channelz,
        callTracerFactory.create(),
        subchannelTracer,
        subchannelLogId,
        subchannelLogger);
    // Record the new subchannel in the OOB channel's trace.
    oobChannelTracer.reportEvent(new ChannelTrace.Event.Builder()
        .setDescription("Child Subchannel created")
        .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
        .setTimestampNanos(oobChannelCreationTime)
        .setSubchannelRef(internalSubchannel)
        .build());
    channelz.addSubchannel(oobChannel);
    channelz.addSubchannel(internalSubchannel);
    oobChannel.setSubchannel(internalSubchannel);
    final class AddOobChannel implements Runnable {
      @Override
      public void run() {
        // A channel that is already terminating shuts the new OOB channel down immediately.
        if (terminating) {
          oobChannel.shutdown();
        }
        if (!terminated) {
          // If channel has not terminated, it will track the subchannel and block termination
          // for it.
          oobChannels.add(oobChannel);
        }
      }
    }

    syncContext.execute(new AddOobChannel());
    return oobChannel;
  }

  /**
   * Legacy overload that uses placeholder channel credentials and overrides the builder's
   * authority with this channel's authority.
   */
  @Deprecated
  @Override
  public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(String target) {
    return createResolvingOobChannelBuilder(target, new DefaultChannelCreds())
        // Override authority to keep the old behavior.
        // createResolvingOobChannelBuilder(String target) will be deleted soon.
        .overrideAuthority(getAuthority());
  }

  /**
   * Returns a builder for a resolving OOB channel to {@code target} that is pre-configured with
   * this channel's name resolver factory, executors, max trace events, proxy detector and user
   * agent.
   *
   * @throws NullPointerException if {@code channelCreds} is null
   * @throws IllegalStateException if the channel is already terminated
   */
  // TODO(creamsoup) prevent main channel to shutdown if oob channel is not terminated
  // TODO(zdapeng) register the channel as a subchannel of the parent channel in channelz.
  @Override
  public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(
      final String target, final ChannelCredentials channelCreds) {
    checkNotNull(channelCreds, "channelCreds");

    // Builder wrapper whose delegate is chosen according to the supplied credentials.
    final class ResolvingOobChannelBuilder
        extends ForwardingChannelBuilder<ResolvingOobChannelBuilder> {
      final ManagedChannelBuilder<?> delegate;

      ResolvingOobChannelBuilder() {
        final ClientTransportFactory transportFactory;
        CallCredentials callCredentials;
        if (channelCreds instanceof DefaultChannelCreds) {
          // Placeholder creds: reuse the original transport factory without call credentials.
          transportFactory = originalTransportFactory;
          callCredentials = null;
        } else {
          SwapChannelCredentialsResult swapResult =
              originalTransportFactory.swapChannelCredentials(channelCreds);
          if (swapResult == null) {
            // No swap result available: fall back to a builder created from scratch.
            delegate = Grpc.newChannelBuilder(target, channelCreds);
            return;
          } else {
            transportFactory = swapResult.transportFactory;
            callCredentials = swapResult.callCredentials;
          }
        }
        // Always hands out the transport factory selected above.
        ClientTransportFactoryBuilder transportFactoryBuilder =
            new ClientTransportFactoryBuilder() {
              @Override
              public ClientTransportFactory buildClientTransportFactory() {
                return transportFactory;
              }
            };
        delegate = new ManagedChannelImplBuilder(
            target,
            channelCreds,
            callCredentials,
            transportFactoryBuilder,
            new FixedPortProvider(nameResolverArgs.getDefaultPort()));
      }

      @Override
      protected ManagedChannelBuilder<?> delegate() {
        return delegate;
      }
    }

    checkState(!terminated, "Channel is terminated");

    @SuppressWarnings("deprecation")
    ResolvingOobChannelBuilder builder = new ResolvingOobChannelBuilder()
        .nameResolverFactory(nameResolverFactory);

    return builder
        // TODO(zdapeng): executors should not outlive the parent channel.
        .executor(executor)
        .offloadExecutor(offloadExecutorHolder.getExecutor())
        .maxTraceEvents(maxTraceEvents)
        .proxyDetector(nameResolverArgs.getProxyDetector())
        .userAgent(userAgent);
  }

  /**
   * Returns the channel's original credentials, or a new placeholder when none were recorded.
   */
  @Override
  public ChannelCredentials getUnsafeChannelCredentials() {
    if (originalChannelCreds == null) {
      return new DefaultChannelCreds();
    }
    return originalChannelCreds;
  }

  /** Single-address-group convenience overload of the list-based method. */
  @Override
  public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag) {
    updateOobChannelAddresses(channel, Collections.singletonList(eag));
  }

  /**
   * Replaces the addresses of an OOB channel.
   *
   * @throws IllegalArgumentException if {@code channel} is not an {@code OobChannel}
   */
  @Override
  public void updateOobChannelAddresses(ManagedChannel channel,
      List<EquivalentAddressGroup> eag) {
    checkArgument(channel instanceof OobChannel,
        "channel must have been returned from createOobChannel");
    ((OobChannel) channel).updateAddresses(eag);
  }

  /** Returns the authority of the enclosing channel. */
  @Override
  public String getAuthority() {
    return ManagedChannelImpl.this.authority();
  }

  /** Returns the enclosing channel's synchronization context. */
  @Override
  public SynchronizationContext getSynchronizationContext() {
    return syncContext;
  }

  /** Returns the enclosing channel's scheduled executor. */
  @Override
  public ScheduledExecutorService getScheduledExecutorService() {
    return scheduledExecutor;
  }

  /** Returns the enclosing channel's logger. */
  @Override
  public ChannelLogger getChannelLogger() {
    return channelLogger;
  }

  /** Returns the enclosing channel's name resolver arguments. */
  @Override
  public NameResolver.Args getNameResolverArgs() {
    return nameResolverArgs;
  }

  /** Returns the enclosing channel's name resolver registry. */
  @Override
  public NameResolverRegistry getNameResolverRegistry() {
    return nameResolverRegistry;
  }

  /**
   * A placeholder for channel creds if user did not specify channel creds for the channel.
   */
  // TODO(zdapeng): get rid of this class and let all ChannelBuilders always provide a non-null
  //     channel creds.
  final class DefaultChannelCreds extends ChannelCredentials {
    @Override
    public ChannelCredentials withoutBearerTokens() {
      return this;
    }
  }
}
| |
| final class NameResolverListener extends NameResolver.Listener2 { |
| final LbHelperImpl helper; |
| final NameResolver resolver; |
| |
| NameResolverListener(LbHelperImpl helperImpl, NameResolver resolver) { |
| this.helper = checkNotNull(helperImpl, "helperImpl"); |
| this.resolver = checkNotNull(resolver, "resolver"); |
| } |
| |
| @Override |
| public void onResult(final ResolutionResult resolutionResult) { |
| final class NamesResolved implements Runnable { |
| |
| @SuppressWarnings("ReferenceEquality") |
| @Override |
| public void run() { |
| if (ManagedChannelImpl.this.nameResolver != resolver) { |
| return; |
| } |
| |
| List<EquivalentAddressGroup> servers = resolutionResult.getAddresses(); |
| channelLogger.log( |
| ChannelLogLevel.DEBUG, |
| "Resolved address: {0}, config={1}", |
| servers, |
| resolutionResult.getAttributes()); |
| |
| if (lastResolutionState != ResolutionState.SUCCESS) { |
| channelLogger.log(ChannelLogLevel.INFO, "Address resolved: {0}", servers); |
| lastResolutionState = ResolutionState.SUCCESS; |
| } |
| |
| ConfigOrError configOrError = resolutionResult.getServiceConfig(); |
| ResolutionResultListener resolutionResultListener = resolutionResult.getAttributes() |
| .get(RetryingNameResolver.RESOLUTION_RESULT_LISTENER_KEY); |
| InternalConfigSelector resolvedConfigSelector = |
| resolutionResult.getAttributes().get(InternalConfigSelector.KEY); |
| ManagedChannelServiceConfig validServiceConfig = |
| configOrError != null && configOrError.getConfig() != null |
| ? (ManagedChannelServiceConfig) configOrError.getConfig() |
| : null; |
| Status serviceConfigError = configOrError != null ? configOrError.getError() : null; |
| |
| ManagedChannelServiceConfig effectiveServiceConfig; |
| if (!lookUpServiceConfig) { |
| if (validServiceConfig != null) { |
| channelLogger.log( |
| ChannelLogLevel.INFO, |
| "Service config from name resolver discarded by channel settings"); |
| } |
| effectiveServiceConfig = |
| defaultServiceConfig == null ? EMPTY_SERVICE_CONFIG : defaultServiceConfig; |
| if (resolvedConfigSelector != null) { |
| channelLogger.log( |
| ChannelLogLevel.INFO, |
| "Config selector from name resolver discarded by channel settings"); |
| } |
| realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector()); |
| } else { |
| // Try to use config if returned from name resolver |
| // Otherwise, try to use the default config if available |
| if (validServiceConfig != null) { |
| effectiveServiceConfig = validServiceConfig; |
| if (resolvedConfigSelector != null) { |
| realChannel.updateConfigSelector(resolvedConfigSelector); |
| if (effectiveServiceConfig.getDefaultConfigSelector() != null) { |
| channelLogger.log( |
| ChannelLogLevel.DEBUG, |
| "Method configs in service config will be discarded due to presence of" |
| + "config-selector"); |
| } |
| } else { |
| realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector()); |
| } |
| } else if (defaultServiceConfig != null) { |
| effectiveServiceConfig = defaultServiceConfig; |
| realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector()); |
| channelLogger.log( |
| ChannelLogLevel.INFO, |
| "Received no service config, using default service config"); |
| } else if (serviceConfigError != null) { |
| if (!serviceConfigUpdated) { |
| // First DNS lookup has invalid service config, and cannot fall back to default |
| channelLogger.log( |
| ChannelLogLevel.INFO, |
| "Fallback to error due to invalid first service config without default config"); |
| // This error could be an "inappropriate" control plane error that should not bleed |
| // through to client code using gRPC. We let them flow through here to the LB as |
| // we later check for these error codes when investigating pick results in |
| // GrpcUtil.getTransportFromPickResult(). |
| onError(configOrError.getError()); |
| if (resolutionResultListener != null) { |
| resolutionResultListener.resolutionAttempted(false); |
| } |
| return; |
| } else { |
| effectiveServiceConfig = lastServiceConfig; |
| } |
| } else { |
| effectiveServiceConfig = EMPTY_SERVICE_CONFIG; |
| realChannel.updateConfigSelector(null); |
| } |
| if (!effectiveServiceConfig.equals(lastServiceConfig)) { |
| channelLogger.log( |
| ChannelLogLevel.INFO, |
| "Service config changed{0}", |
| effectiveServiceConfig == EMPTY_SERVICE_CONFIG ? " to empty" : ""); |
| lastServiceConfig = effectiveServiceConfig; |
| transportProvider.throttle = effectiveServiceConfig.getRetryThrottling(); |
| } |
| |
| try { |
| // TODO(creamsoup): when `servers` is empty and lastResolutionStateCopy == SUCCESS |
| // and lbNeedAddress, it shouldn't call the handleServiceConfigUpdate. But, |
| // lbNeedAddress is not deterministic |
| serviceConfigUpdated = true; |
| } catch (RuntimeException re) { |
| logger.log( |
| Level.WARNING, |
| "[" + getLogId() + "] Unexpected exception from parsing service config", |
| re); |
| } |
| } |
| |
| Attributes effectiveAttrs = resolutionResult.getAttributes(); |
| // Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match. |
| if (NameResolverListener.this.helper == ManagedChannelImpl.this.lbHelper) { |
| Attributes.Builder attrBuilder = |
| effectiveAttrs.toBuilder().discard(InternalConfigSelector.KEY); |
| Map<String, ?> healthCheckingConfig = |
| effectiveServiceConfig.getHealthCheckingConfig(); |
| if (healthCheckingConfig != null) { |
| attrBuilder |
| .set(LoadBalancer.ATTR_HEALTH_CHECKING_CONFIG, healthCheckingConfig) |
| .build(); |
| } |
| Attributes attributes = attrBuilder.build(); |
| |
| boolean lastAddressesAccepted = helper.lb.tryAcceptResolvedAddresses( |
| ResolvedAddresses.newBuilder() |
| .setAddresses(servers) |
| .setAttributes(attributes) |
| .setLoadBalancingPolicyConfig(effectiveServiceConfig.getLoadBalancingConfig()) |
| .build()); |
| // If a listener is provided, let it know if the addresses were accepted. |
| if (resolutionResultListener != null) { |
| resolutionResultListener.resolutionAttempted(lastAddressesAccepted); |
| } |
| } |
| } |
| } |
| |
| syncContext.execute(new NamesResolved()); |
| } |
| |
| @Override |
| public void onError(final Status error) { |
| checkArgument(!error.isOk(), "the error status must not be OK"); |
| final class NameResolverErrorHandler implements Runnable { |
| @Override |
| public void run() { |
| handleErrorInSyncContext(error); |
| } |
| } |
| |
| syncContext.execute(new NameResolverErrorHandler()); |
| } |
| |
| private void handleErrorInSyncContext(Status error) { |
| logger.log(Level.WARNING, "[{0}] Failed to resolve name. status={1}", |
| new Object[] {getLogId(), error}); |
| realChannel.onConfigError(); |
| if (lastResolutionState != ResolutionState.ERROR) { |
| channelLogger.log(ChannelLogLevel.WARNING, "Failed to resolve name: {0}", error); |
| lastResolutionState = ResolutionState.ERROR; |
| } |
| // Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match. |
| if (NameResolverListener.this.helper != ManagedChannelImpl.this.lbHelper) { |
| return; |
| } |
| |
| helper.lb.handleNameResolutionError(error); |
| } |
| } |
| |
/**
 * Subchannel handed out by this channel. It wraps an {@link InternalSubchannel} that is
 * created in {@link #start} and registers/unregisters it with {@code channelz} and the
 * channel's {@code subchannels} collection.
 *
 * <p>{@code start}, {@code shutdown}, {@code requestConnection}, {@code getAllAddresses} and
 * {@code updateAddresses} assert that they run inside {@code syncContext}.
 */
private final class SubchannelImpl extends AbstractSubchannel {
  // Creation args; when authorityOverride is non-null the addresses stored here have had
  // ATTR_AUTHORITY_OVERRIDE stripped (see constructor).
  final CreateSubchannelArgs args;
  final InternalLogId subchannelLogId;
  final ChannelLoggerImpl subchannelLogger;
  final ChannelTracer subchannelTracer;
  // Addresses exactly as supplied by the caller (not stripped); returned by getAllAddresses().
  List<EquivalentAddressGroup> addressGroups;
  // Null until start() has completed.
  InternalSubchannel subchannel;
  boolean started;
  // Note: inside this class the name refers to this boolean, not the enclosing channel's
  // `shutdown` field.
  boolean shutdown;
  // Pending delayed shutdown scheduled by shutdown() while the channel was not terminating.
  ScheduledHandle delayedShutdownTask;

  /**
   * Captures the creation args and sets up the log id, tracer and logger for this
   * subchannel. The underlying {@link InternalSubchannel} is not created here.
   *
   * @param args creation arguments; must not be null
   */
  SubchannelImpl(CreateSubchannelArgs args) {
    checkNotNull(args, "args");
    addressGroups = args.getAddresses();
    if (authorityOverride != null) {
      // Rebuild the args with the per-address authority-override attribute removed; the
      // unstripped list stays in addressGroups.
      List<EquivalentAddressGroup> eagsWithoutOverrideAttr =
          stripOverrideAuthorityAttributes(args.getAddresses());
      args = args.toBuilder().setAddresses(eagsWithoutOverrideAttr).build();
    }
    this.args = args;
    subchannelLogId = InternalLogId.allocate("Subchannel", /*details=*/ authority());
    subchannelTracer = new ChannelTracer(
        subchannelLogId, maxTraceEvents, timeProvider.currentTimeNanos(),
        "Subchannel for " + args.getAddresses());
    subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider);
  }

  /**
   * Creates the underlying {@link InternalSubchannel}, reports a trace event on the channel
   * tracer, and registers the new subchannel with {@code channelz} and {@code subchannels}.
   *
   * @param listener receives connectivity state changes of the underlying subchannel
   * @throws IllegalStateException if already started, already shut down, or the channel is
   *     terminating
   */
  @Override
  public void start(final SubchannelStateListener listener) {
    syncContext.throwIfNotInThisSynchronizationContext();
    checkState(!started, "already started");
    checkState(!shutdown, "already shutdown");
    checkState(!terminating, "Channel is being terminated");
    started = true;
    final class ManagedInternalSubchannelCallback extends InternalSubchannel.Callback {
      // All callbacks are run in syncContext
      @Override
      void onTerminated(InternalSubchannel is) {
        // Unregister, then check whether the channel itself can now terminate.
        subchannels.remove(is);
        channelz.removeSubchannel(is);
        maybeTerminateChannel();
      }

      @Override
      void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
        // The null check on the listener happens here, on first state change, not in start().
        checkState(listener != null, "listener is null");
        listener.onSubchannelState(newState);
      }

      @Override
      void onInUse(InternalSubchannel is) {
        inUseStateAggregator.updateObjectInUse(is, true);
      }

      @Override
      void onNotInUse(InternalSubchannel is) {
        inUseStateAggregator.updateObjectInUse(is, false);
      }
    }

    final InternalSubchannel internalSubchannel = new InternalSubchannel(
        args.getAddresses(),
        authority(),
        userAgent,
        backoffPolicyProvider,
        transportFactory,
        transportFactory.getScheduledExecutorService(),
        stopwatchSupplier,
        syncContext,
        new ManagedInternalSubchannelCallback(),
        channelz,
        callTracerFactory.create(),
        subchannelTracer,
        subchannelLogId,
        subchannelLogger);

    channelTracer.reportEvent(new ChannelTrace.Event.Builder()
        .setDescription("Child Subchannel started")
        .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
        .setTimestampNanos(timeProvider.currentTimeNanos())
        .setSubchannelRef(internalSubchannel)
        .build());

    // The field is assigned only after construction succeeded; shutdown() treats a null
    // field as "start() was not successful".
    this.subchannel = internalSubchannel;
    channelz.addSubchannel(internalSubchannel);
    subchannels.add(internalSubchannel);
  }

  /**
   * Returns the underlying subchannel.
   *
   * @throws IllegalStateException if {@link #start} has not been called
   */
  @Override
  InternalInstrumented<ChannelStats> getInstrumentedInternalSubchannel() {
    checkState(started, "not started");
    return subchannel;
  }

  /**
   * Shuts down this subchannel.
   *
   * <ul>
   *   <li>If the underlying subchannel was never created, only the flag is set.</li>
   *   <li>If the channel is not terminating, the real shutdown is scheduled after
   *       {@code SUBCHANNEL_SHUTDOWN_DELAY_SECONDS}.</li>
   *   <li>If the channel is terminating, the underlying subchannel is shut down immediately;
   *       a previously scheduled delayed shutdown is cancelled first.</li>
   *   <li>A repeated call with no pending delayed task, or while not terminating, is a
   *       no-op.</li>
   * </ul>
   */
  @Override
  public void shutdown() {
    syncContext.throwIfNotInThisSynchronizationContext();
    if (subchannel == null) {
      // start() was not successful
      shutdown = true;
      return;
    }
    if (shutdown) {
      if (terminating && delayedShutdownTask != null) {
        // shutdown() was previously called when terminating == false, thus a delayed shutdown()
        // was scheduled. Now since terminating == true, We should expedite the shutdown.
        delayedShutdownTask.cancel();
        delayedShutdownTask = null;
        // Will fall through to the subchannel.shutdown() at the end.
      } else {
        return;
      }
    } else {
      shutdown = true;
    }
    // Add a delay to shutdown to deal with the race between 1) a transport being picked and
    // newStream() being called on it, and 2) its Subchannel is shut down by LoadBalancer (e.g.,
    // because of address change, or because LoadBalancer is shutdown by Channel entering idle
    // mode). If (2) wins, the app will see a spurious error. We work around this by delaying
    // shutdown of Subchannel for a few seconds here.
    //
    // TODO(zhangkun83): consider a better approach
    // (https://github.com/grpc/grpc-java/issues/2562).
    if (!terminating) {
      final class ShutdownSubchannel implements Runnable {
        @Override
        public void run() {
          subchannel.shutdown(SUBCHANNEL_SHUTDOWN_STATUS);
        }
      }

      delayedShutdownTask = syncContext.schedule(
          new LogExceptionRunnable(new ShutdownSubchannel()),
          SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS,
          transportFactory.getScheduledExecutorService());
      return;
    }
    // When terminating == true, no more real streams will be created. It's safe and also
    // desirable to shutdown timely.
    subchannel.shutdown(SHUTDOWN_STATUS);
  }

  /**
   * Asks the underlying subchannel to obtain an active transport.
   *
   * @throws IllegalStateException if not started
   */
  @Override
  public void requestConnection() {
    syncContext.throwIfNotInThisSynchronizationContext();
    checkState(started, "not started");
    subchannel.obtainActiveTransport();
  }

  /**
   * Returns the address groups as last supplied by the caller (via the constructor args or
   * {@link #updateAddresses}), i.e. without the authority-override attribute stripped.
   *
   * @throws IllegalStateException if not started
   */
  @Override
  public List<EquivalentAddressGroup> getAllAddresses() {
    syncContext.throwIfNotInThisSynchronizationContext();
    checkState(started, "not started");
    return addressGroups;
  }

  /** Returns the attributes carried by the creation args. */
  @Override
  public Attributes getAttributes() {
    return args.getAttributes();
  }

  /** Returns the string form of this subchannel's log id. */
  @Override
  public String toString() {
    return subchannelLogId.toString();
  }

  /**
   * Returns a new {@link SubchannelChannel} over the underlying subchannel on every call.
   *
   * @throws IllegalStateException if not started
   */
  @Override
  public Channel asChannel() {
    checkState(started, "not started");
    return new SubchannelChannel(
        subchannel, balancerRpcExecutorHolder.getExecutor(),
        transportFactory.getScheduledExecutorService(),
        callTracerFactory.create(),
        new AtomicReference<InternalConfigSelector>(null));
  }

  /**
   * Returns the underlying {@link InternalSubchannel} as an opaque object.
   *
   * @throws IllegalStateException if not started
   */
  @Override
  public Object getInternalSubchannel() {
    checkState(started, "Subchannel is not started");
    return subchannel;
  }

  /** Returns the logger that writes to this subchannel's tracer. */
  @Override
  public ChannelLogger getChannelLogger() {
    return subchannelLogger;
  }

  /**
   * Replaces the addresses of this subchannel. The list as given is remembered for
   * {@link #getAllAddresses}; the underlying subchannel receives the list with the
   * authority-override attribute stripped when {@code authorityOverride} is non-null.
   *
   * <p>NOTE(review): there is no started check here, so calling this before {@link #start}
   * would dereference a null {@code subchannel} — confirm callers always start first.
   */
  @Override
  public void updateAddresses(List<EquivalentAddressGroup> addrs) {
    syncContext.throwIfNotInThisSynchronizationContext();
    addressGroups = addrs;
    if (authorityOverride != null) {
      addrs = stripOverrideAuthorityAttributes(addrs);
    }
    subchannel.updateAddresses(addrs);
  }

  /**
   * Returns an unmodifiable list of new address groups that have the same addresses as the
   * input but with {@code ATTR_AUTHORITY_OVERRIDE} discarded from their attributes.
   */
  private List<EquivalentAddressGroup> stripOverrideAuthorityAttributes(
      List<EquivalentAddressGroup> eags) {
    List<EquivalentAddressGroup> eagsWithoutOverrideAttr = new ArrayList<>();
    for (EquivalentAddressGroup eag : eags) {
      EquivalentAddressGroup eagWithoutOverrideAttr = new EquivalentAddressGroup(
          eag.getAddresses(),
          eag.getAttributes().toBuilder().discard(ATTR_AUTHORITY_OVERRIDE).build());
      eagsWithoutOverrideAttr.add(eagWithoutOverrideAttr);
    }
    return Collections.unmodifiableList(eagsWithoutOverrideAttr);
  }
}
| |
| @Override |
| public String toString() { |
| return MoreObjects.toStringHelper(this) |
| .add("logId", logId.getId()) |
| .add("target", target) |
| .toString(); |
| } |
| |
| /** |
| * Called from syncContext. |
| */ |
| private final class DelayedTransportListener implements ManagedClientTransport.Listener { |
| @Override |
| public void transportShutdown(Status s) { |
| checkState(shutdown.get(), "Channel must have been shut down"); |
| } |
| |
| @Override |
| public void transportReady() { |
| // Don't care |
| } |
| |
| @Override |
| public void transportInUse(final boolean inUse) { |
| inUseStateAggregator.updateObjectInUse(delayedTransport, inUse); |
| } |
| |
| @Override |
| public void transportTerminated() { |
| checkState(shutdown.get(), "Channel must have been shut down"); |
| terminating = true; |
| shutdownNameResolverAndLoadBalancer(false); |
| // No need to call channelStateManager since we are already in SHUTDOWN state. |
| // Until LoadBalancer is shutdown, it may still create new subchannels. We catch them |
| // here. |
| maybeShutdownNowSubchannels(); |
| maybeTerminateChannel(); |
| } |
| } |
| |
| /** |
| * Must be accessed from syncContext. |
| */ |
| private final class IdleModeStateAggregator extends InUseStateAggregator<Object> { |
| @Override |
| protected void handleInUse() { |
| exitIdleMode(); |
| } |
| |
| @Override |
| protected void handleNotInUse() { |
| if (shutdown.get()) { |
| return; |
| } |
| rescheduleIdleTimer(); |
| } |
| } |
| |
| /** |
| * Lazily request for Executor from an executor pool. |
| * Also act as an Executor directly to simply run a cmd |
| */ |
| @VisibleForTesting |
| static final class ExecutorHolder implements Executor { |
| private final ObjectPool<? extends Executor> pool; |
| private Executor executor; |
| |
| ExecutorHolder(ObjectPool<? extends Executor> executorPool) { |
| this.pool = checkNotNull(executorPool, "executorPool"); |
| } |
| |
| synchronized Executor getExecutor() { |
| if (executor == null) { |
| executor = checkNotNull(pool.getObject(), "%s.getObject()", executor); |
| } |
| return executor; |
| } |
| |
| synchronized void release() { |
| if (executor != null) { |
| executor = pool.returnObject(executor); |
| } |
| } |
| |
| @Override |
| public void execute(Runnable command) { |
| getExecutor().execute(command); |
| } |
| } |
| |
| private static final class RestrictedScheduledExecutor implements ScheduledExecutorService { |
| final ScheduledExecutorService delegate; |
| |
| private RestrictedScheduledExecutor(ScheduledExecutorService delegate) { |
| this.delegate = checkNotNull(delegate, "delegate"); |
| } |
| |
| @Override |
| public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { |
| return delegate.schedule(callable, delay, unit); |
| } |
| |
| @Override |
| public ScheduledFuture<?> schedule(Runnable cmd, long delay, TimeUnit unit) { |
| return delegate.schedule(cmd, delay, unit); |
| } |
| |
| @Override |
| public ScheduledFuture<?> scheduleAtFixedRate( |
| Runnable command, long initialDelay, long period, TimeUnit unit) { |
| return delegate.scheduleAtFixedRate(command, initialDelay, period, unit); |
| } |
| |
| @Override |
| public ScheduledFuture<?> scheduleWithFixedDelay( |
| Runnable command, long initialDelay, long delay, TimeUnit unit) { |
| return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit); |
| } |
| |
| @Override |
| public boolean awaitTermination(long timeout, TimeUnit unit) |
| throws InterruptedException { |
| return delegate.awaitTermination(timeout, unit); |
| } |
| |
| @Override |
| public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) |
| throws InterruptedException { |
| return delegate.invokeAll(tasks); |
| } |
| |
| @Override |
| public <T> List<Future<T>> invokeAll( |
| Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) |
| throws InterruptedException { |
| return delegate.invokeAll(tasks, timeout, unit); |
| } |
| |
| @Override |
| public <T> T invokeAny(Collection<? extends Callable<T>> tasks) |
| throws InterruptedException, ExecutionException { |
| return delegate.invokeAny(tasks); |
| } |
| |
| @Override |
| public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) |
| throws InterruptedException, ExecutionException, TimeoutException { |
| return delegate.invokeAny(tasks, timeout, unit); |
| } |
| |
| @Override |
| public boolean isShutdown() { |
| return delegate.isShutdown(); |
| } |
| |
| @Override |
| public boolean isTerminated() { |
| return delegate.isTerminated(); |
| } |
| |
| @Override |
| public void shutdown() { |
| throw new UnsupportedOperationException("Restricted: shutdown() is not allowed"); |
| } |
| |
| @Override |
| public List<Runnable> shutdownNow() { |
| throw new UnsupportedOperationException("Restricted: shutdownNow() is not allowed"); |
| } |
| |
| @Override |
| public <T> Future<T> submit(Callable<T> task) { |
| return delegate.submit(task); |
| } |
| |
| @Override |
| public Future<?> submit(Runnable task) { |
| return delegate.submit(task); |
| } |
| |
| @Override |
| public <T> Future<T> submit(Runnable task, T result) { |
| return delegate.submit(task, result); |
| } |
| |
| @Override |
| public void execute(Runnable command) { |
| delegate.execute(command); |
| } |
| } |
| |
/**
 * A ResolutionState indicates the status of last name resolution.
 */
enum ResolutionState {
  // NOTE(review): presumably the state before any result or error has been seen — confirm
  // against the field's initializer.
  NO_RESOLUTION,
  // NOTE(review): presumably recorded when a resolution result is delivered — confirm
  // against the result-handling path.
  SUCCESS,
  // Recorded by NameResolverListener.handleErrorInSyncContext() when the resolver reports
  // an error.
  ERROR
}
| } |