blob: 2b1145d1c4b9e96f3c783b36bc5b3fbf59a85c0e [file] [log] [blame]
/*
* Copyright 2015 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.internal;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.CallOptions;
import io.grpc.ClientStreamTracer;
import io.grpc.Context;
import io.grpc.InternalChannelz.SocketStats;
import io.grpc.InternalLogId;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.internal.ClientStreamListener.RpcProgress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.concurrent.Executor;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
/**
* A client transport that queues requests before a real transport is available. When {@link
* #reprocess} is called, this class applies the provided {@link SubchannelPicker} to pick a
* transport for each pending stream.
*
* <p>This transport owns every stream that it has created until a real transport has been picked
* for that stream, at which point the ownership of the stream is transferred to the real transport,
* thus the delayed transport stops owning the stream.
*/
final class DelayedClientTransport implements ManagedClientTransport {
// lazily allocated, since it is infrequently used.
private final InternalLogId logId =
InternalLogId.allocate(DelayedClientTransport.class, /*details=*/ null);
private final Object lock = new Object();
private final Executor defaultAppExecutor;
private final SynchronizationContext syncContext;
private Runnable reportTransportInUse;
private Runnable reportTransportNotInUse;
private Runnable reportTransportTerminated;
private Listener listener;
@Nonnull
@GuardedBy("lock")
private Collection<PendingStream> pendingStreams = new LinkedHashSet<>();
/**
* When {@code shutdownStatus != null && !hasPendingStreams()}, then the transport is considered
* terminated.
*/
@GuardedBy("lock")
private Status shutdownStatus;
/**
* The last picker that {@link #reprocess} has used. May be set to null when the channel has moved
* to idle.
*/
@GuardedBy("lock")
@Nullable
private SubchannelPicker lastPicker;
@GuardedBy("lock")
private long lastPickerVersion;
/**
* Creates a new delayed transport.
*
* @param defaultAppExecutor pending streams will create real streams and run bufferred operations
* in an application executor, which will be this executor, unless there is on provided in
* {@link CallOptions}.
* @param syncContext all listener callbacks of the delayed transport will be run from this
* SynchronizationContext.
*/
DelayedClientTransport(Executor defaultAppExecutor, SynchronizationContext syncContext) {
this.defaultAppExecutor = defaultAppExecutor;
this.syncContext = syncContext;
}
@Override
public final Runnable start(final Listener listener) {
this.listener = listener;
reportTransportInUse = new Runnable() {
@Override
public void run() {
listener.transportInUse(true);
}
};
reportTransportNotInUse = new Runnable() {
@Override
public void run() {
listener.transportInUse(false);
}
};
reportTransportTerminated = new Runnable() {
@Override
public void run() {
listener.transportTerminated();
}
};
return null;
}
/**
* If a {@link SubchannelPicker} is being, or has been provided via {@link #reprocess}, the last
* picker will be consulted.
*
* <p>Otherwise, if the delayed transport is not shutdown, then a {@link PendingStream} is
* returned; if the transport is shutdown, then a {@link FailingClientStream} is returned.
*/
@Override
public final ClientStream newStream(
MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
ClientStreamTracer[] tracers) {
try {
PickSubchannelArgs args = new PickSubchannelArgsImpl(method, headers, callOptions);
SubchannelPicker picker = null;
long pickerVersion = -1;
while (true) {
synchronized (lock) {
if (shutdownStatus != null) {
return new FailingClientStream(shutdownStatus, tracers);
}
if (lastPicker == null) {
return createPendingStream(args, tracers);
}
// Check for second time through the loop, and whether anything changed
if (picker != null && pickerVersion == lastPickerVersion) {
return createPendingStream(args, tracers);
}
picker = lastPicker;
pickerVersion = lastPickerVersion;
}
PickResult pickResult = picker.pickSubchannel(args);
ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
callOptions.isWaitForReady());
if (transport != null) {
return transport.newStream(
args.getMethodDescriptor(), args.getHeaders(), args.getCallOptions(),
tracers);
}
// This picker's conclusion is "buffer". If there hasn't been a newer picker set (possible
// race with reprocess()), we will buffer it. Otherwise, will try with the new picker.
}
} finally {
syncContext.drain();
}
}
/**
* Caller must call {@code syncContext.drain()} outside of lock because this method may
* schedule tasks on syncContext.
*/
@GuardedBy("lock")
private PendingStream createPendingStream(
PickSubchannelArgs args, ClientStreamTracer[] tracers) {
PendingStream pendingStream = new PendingStream(args, tracers);
pendingStreams.add(pendingStream);
if (getPendingStreamsCount() == 1) {
syncContext.executeLater(reportTransportInUse);
}
return pendingStream;
}
@Override
public final void ping(final PingCallback callback, Executor executor) {
throw new UnsupportedOperationException("This method is not expected to be called");
}
@Override
public ListenableFuture<SocketStats> getStats() {
SettableFuture<SocketStats> ret = SettableFuture.create();
ret.set(null);
return ret;
}
/**
* Prevents creating any new streams. Buffered streams are not failed and may still proceed
* when {@link #reprocess} is called. The delayed transport will be terminated when there is no
* more buffered streams.
*/
@Override
public final void shutdown(final Status status) {
synchronized (lock) {
if (shutdownStatus != null) {
return;
}
shutdownStatus = status;
syncContext.executeLater(new Runnable() {
@Override
public void run() {
listener.transportShutdown(status);
}
});
if (!hasPendingStreams() && reportTransportTerminated != null) {
syncContext.executeLater(reportTransportTerminated);
reportTransportTerminated = null;
}
}
syncContext.drain();
}
/**
* Shuts down this transport and cancels all streams that it owns, hence immediately terminates
* this transport.
*/
@Override
public final void shutdownNow(Status status) {
shutdown(status);
Collection<PendingStream> savedPendingStreams;
Runnable savedReportTransportTerminated;
synchronized (lock) {
savedPendingStreams = pendingStreams;
savedReportTransportTerminated = reportTransportTerminated;
reportTransportTerminated = null;
if (!pendingStreams.isEmpty()) {
pendingStreams = Collections.emptyList();
}
}
if (savedReportTransportTerminated != null) {
for (PendingStream stream : savedPendingStreams) {
Runnable runnable = stream.setStream(
new FailingClientStream(status, RpcProgress.REFUSED, stream.tracers));
if (runnable != null) {
// Drain in-line instead of using an executor as failing stream just throws everything
// away. This is essentially the same behavior as DelayedStream.cancel() but can be done
// before stream.start().
runnable.run();
}
}
syncContext.execute(savedReportTransportTerminated);
}
// If savedReportTransportTerminated == null, transportTerminated() has already been called in
// shutdown().
}
public final boolean hasPendingStreams() {
synchronized (lock) {
return !pendingStreams.isEmpty();
}
}
@VisibleForTesting
final int getPendingStreamsCount() {
synchronized (lock) {
return pendingStreams.size();
}
}
/**
* Use the picker to try picking a transport for every pending stream, proceed the stream if the
* pick is successful, otherwise keep it pending.
*
* <p>This method may be called concurrently with {@code newStream()}, and it's safe. All pending
* streams will be served by the latest picker (if a same picker is given more than once, they are
* considered different pickers) as soon as possible.
*
* <p>This method <strong>must not</strong> be called concurrently with itself.
*/
final void reprocess(@Nullable SubchannelPicker picker) {
ArrayList<PendingStream> toProcess;
synchronized (lock) {
lastPicker = picker;
lastPickerVersion++;
if (picker == null || !hasPendingStreams()) {
return;
}
toProcess = new ArrayList<>(pendingStreams);
}
ArrayList<PendingStream> toRemove = new ArrayList<>();
for (final PendingStream stream : toProcess) {
PickResult pickResult = picker.pickSubchannel(stream.args);
CallOptions callOptions = stream.args.getCallOptions();
final ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
callOptions.isWaitForReady());
if (transport != null) {
Executor executor = defaultAppExecutor;
// createRealStream may be expensive. It will start real streams on the transport. If
// there are pending requests, they will be serialized too, which may be expensive. Since
// we are now on transport thread, we need to offload the work to an executor.
if (callOptions.getExecutor() != null) {
executor = callOptions.getExecutor();
}
Runnable runnable = stream.createRealStream(transport);
if (runnable != null) {
executor.execute(runnable);
}
toRemove.add(stream);
} // else: stay pending
}
synchronized (lock) {
// Between this synchronized and the previous one:
// - Streams may have been cancelled, which may turn pendingStreams into emptiness.
// - shutdown() may be called, which may turn pendingStreams into null.
if (!hasPendingStreams()) {
return;
}
pendingStreams.removeAll(toRemove);
// Because delayed transport is long-lived, we take this opportunity to down-size the
// hashmap.
if (pendingStreams.isEmpty()) {
pendingStreams = new LinkedHashSet<>();
}
if (!hasPendingStreams()) {
// There may be a brief gap between delayed transport clearing in-use state, and first real
// transport starting streams and setting in-use state. During the gap the whole channel's
// in-use state may be false. However, it shouldn't cause spurious switching to idleness
// (which would shutdown the transports and LoadBalancer) because the gap should be shorter
// than IDLE_MODE_DEFAULT_TIMEOUT_MILLIS (1 second).
syncContext.executeLater(reportTransportNotInUse);
if (shutdownStatus != null && reportTransportTerminated != null) {
syncContext.executeLater(reportTransportTerminated);
reportTransportTerminated = null;
}
}
}
syncContext.drain();
}
@Override
public InternalLogId getLogId() {
return logId;
}
private class PendingStream extends DelayedStream {
private final PickSubchannelArgs args;
private final Context context = Context.current();
private final ClientStreamTracer[] tracers;
private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers) {
this.args = args;
this.tracers = tracers;
}
/** Runnable may be null. */
private Runnable createRealStream(ClientTransport transport) {
ClientStream realStream;
Context origContext = context.attach();
try {
realStream = transport.newStream(
args.getMethodDescriptor(), args.getHeaders(), args.getCallOptions(),
tracers);
} finally {
context.detach(origContext);
}
return setStream(realStream);
}
@Override
public void cancel(Status reason) {
super.cancel(reason);
synchronized (lock) {
if (reportTransportTerminated != null) {
boolean justRemovedAnElement = pendingStreams.remove(this);
if (!hasPendingStreams() && justRemovedAnElement) {
syncContext.executeLater(reportTransportNotInUse);
if (shutdownStatus != null) {
syncContext.executeLater(reportTransportTerminated);
reportTransportTerminated = null;
}
}
}
}
syncContext.drain();
}
@Override
protected void onEarlyCancellation(Status reason) {
for (ClientStreamTracer tracer : tracers) {
tracer.streamClosed(reason);
}
}
@Override
public void appendTimeoutInsight(InsightBuilder insight) {
if (args.getCallOptions().isWaitForReady()) {
insight.append("wait_for_ready");
}
super.appendTimeoutInsight(insight);
}
}
}