| /* |
| * Copyright 2017 The gRPC Authors |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package io.grpc.internal; |
| |
| import static com.google.common.base.Preconditions.checkArgument; |
| import static com.google.common.base.Preconditions.checkNotNull; |
| import static com.google.common.base.Preconditions.checkState; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Objects; |
| import io.grpc.Attributes; |
| import io.grpc.ClientStreamTracer; |
| import io.grpc.Compressor; |
| import io.grpc.Deadline; |
| import io.grpc.DecompressorRegistry; |
| import io.grpc.Metadata; |
| import io.grpc.MethodDescriptor; |
| import io.grpc.Status; |
| import io.grpc.SynchronizationContext; |
| import io.grpc.internal.ClientStreamListener.RpcProgress; |
| import java.io.InputStream; |
| import java.lang.Thread.UncaughtExceptionHandler; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Random; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| import javax.annotation.CheckForNull; |
| import javax.annotation.CheckReturnValue; |
| import javax.annotation.Nullable; |
| import javax.annotation.concurrent.GuardedBy; |
| |
| /** A logical {@link ClientStream} that is retriable. */ |
| abstract class RetriableStream<ReqT> implements ClientStream { |
| @VisibleForTesting |
| static final Metadata.Key<String> GRPC_PREVIOUS_RPC_ATTEMPTS = |
| Metadata.Key.of("grpc-previous-rpc-attempts", Metadata.ASCII_STRING_MARSHALLER); |
| |
| @VisibleForTesting |
| static final Metadata.Key<String> GRPC_RETRY_PUSHBACK_MS = |
| Metadata.Key.of("grpc-retry-pushback-ms", Metadata.ASCII_STRING_MARSHALLER); |
| |
| private static final Status CANCELLED_BECAUSE_COMMITTED = |
| Status.CANCELLED.withDescription("Stream thrown away because RetriableStream committed"); |
| |
| private final MethodDescriptor<ReqT, ?> method; |
| private final Executor callExecutor; |
| private final Executor listenerSerializeExecutor = new SynchronizationContext( |
| new UncaughtExceptionHandler() { |
| @Override |
| public void uncaughtException(Thread t, Throwable e) { |
| throw Status.fromThrowable(e) |
| .withDescription("Uncaught exception in the SynchronizationContext. Re-thrown.") |
| .asRuntimeException(); |
| } |
| } |
| ); |
| private final ScheduledExecutorService scheduledExecutorService; |
| // Must not modify it. |
| private final Metadata headers; |
| @Nullable |
| private final RetryPolicy retryPolicy; |
| @Nullable |
| private final HedgingPolicy hedgingPolicy; |
| private final boolean isHedging; |
| |
| /** Must be held when updating state, accessing state.buffer, or certain substream attributes. */ |
| private final Object lock = new Object(); |
| |
| private final ChannelBufferMeter channelBufferUsed; |
| private final long perRpcBufferLimit; |
| private final long channelBufferLimit; |
| @Nullable |
| private final Throttle throttle; |
| @GuardedBy("lock") |
| private final InsightBuilder closedSubstreamsInsight = new InsightBuilder(); |
| |
| private volatile State state = new State( |
| new ArrayList<BufferEntry>(8), Collections.<Substream>emptyList(), null, null, false, false, |
| false, 0); |
| |
| /** |
| * Either transparent retry happened or reached server's application logic. |
| */ |
| private final AtomicBoolean noMoreTransparentRetry = new AtomicBoolean(); |
| |
| // Used for recording the share of buffer used for the current call out of the channel buffer. |
| // This field would not be necessary if there is no channel buffer limit. |
| @GuardedBy("lock") |
| private long perRpcBufferUsed; |
| |
| private ClientStreamListener masterListener; |
| @GuardedBy("lock") |
| private FutureCanceller scheduledRetry; |
| @GuardedBy("lock") |
| private FutureCanceller scheduledHedging; |
| private long nextBackoffIntervalNanos; |
| private Status cancellationStatus; |
| private boolean isClosed; |
| |
| RetriableStream( |
| MethodDescriptor<ReqT, ?> method, Metadata headers, |
| ChannelBufferMeter channelBufferUsed, long perRpcBufferLimit, long channelBufferLimit, |
| Executor callExecutor, ScheduledExecutorService scheduledExecutorService, |
| @Nullable RetryPolicy retryPolicy, @Nullable HedgingPolicy hedgingPolicy, |
| @Nullable Throttle throttle) { |
| this.method = method; |
| this.channelBufferUsed = channelBufferUsed; |
| this.perRpcBufferLimit = perRpcBufferLimit; |
| this.channelBufferLimit = channelBufferLimit; |
| this.callExecutor = callExecutor; |
| this.scheduledExecutorService = scheduledExecutorService; |
| this.headers = headers; |
| this.retryPolicy = retryPolicy; |
| if (retryPolicy != null) { |
| this.nextBackoffIntervalNanos = retryPolicy.initialBackoffNanos; |
| } |
| this.hedgingPolicy = hedgingPolicy; |
| checkArgument( |
| retryPolicy == null || hedgingPolicy == null, |
| "Should not provide both retryPolicy and hedgingPolicy"); |
| this.isHedging = hedgingPolicy != null; |
| this.throttle = throttle; |
| } |
| |
| @SuppressWarnings("GuardedBy") |
| @Nullable // null if already committed |
| @CheckReturnValue |
| private Runnable commit(final Substream winningSubstream) { |
| |
| synchronized (lock) { |
| if (state.winningSubstream != null) { |
| return null; |
| } |
| final Collection<Substream> savedDrainedSubstreams = state.drainedSubstreams; |
| |
| state = state.committed(winningSubstream); |
| |
| // subtract the share of this RPC from channelBufferUsed. |
| channelBufferUsed.addAndGet(-perRpcBufferUsed); |
| |
| final Future<?> retryFuture; |
| if (scheduledRetry != null) { |
| // TODO(b/145386688): This access should be guarded by 'this.scheduledRetry.lock'; instead |
| // found: 'this.lock' |
| retryFuture = scheduledRetry.markCancelled(); |
| scheduledRetry = null; |
| } else { |
| retryFuture = null; |
| } |
| // cancel the scheduled hedging if it is scheduled prior to the commitment |
| final Future<?> hedgingFuture; |
| if (scheduledHedging != null) { |
| // TODO(b/145386688): This access should be guarded by 'this.scheduledHedging.lock'; instead |
| // found: 'this.lock' |
| hedgingFuture = scheduledHedging.markCancelled(); |
| scheduledHedging = null; |
| } else { |
| hedgingFuture = null; |
| } |
| |
| class CommitTask implements Runnable { |
| @Override |
| public void run() { |
| // For hedging only, not needed for normal retry |
| for (Substream substream : savedDrainedSubstreams) { |
| if (substream != winningSubstream) { |
| substream.stream.cancel(CANCELLED_BECAUSE_COMMITTED); |
| } |
| } |
| if (retryFuture != null) { |
| retryFuture.cancel(false); |
| } |
| if (hedgingFuture != null) { |
| hedgingFuture.cancel(false); |
| } |
| |
| postCommit(); |
| } |
| } |
| |
| return new CommitTask(); |
| } |
| } |
| |
| abstract void postCommit(); |
| |
| /** |
| * Calls commit() and if successful runs the post commit task. |
| */ |
| private void commitAndRun(Substream winningSubstream) { |
| Runnable postCommitTask = commit(winningSubstream); |
| |
| if (postCommitTask != null) { |
| postCommitTask.run(); |
| } |
| } |
| |
| private Substream createSubstream(int previousAttemptCount, boolean isTransparentRetry) { |
| Substream sub = new Substream(previousAttemptCount); |
| // one tracer per substream |
| final ClientStreamTracer bufferSizeTracer = new BufferSizeTracer(sub); |
| ClientStreamTracer.Factory tracerFactory = new ClientStreamTracer.InternalLimitedInfoFactory() { |
| @Override |
| public ClientStreamTracer newClientStreamTracer( |
| ClientStreamTracer.StreamInfo info, Metadata headers) { |
| return bufferSizeTracer; |
| } |
| }; |
| |
| Metadata newHeaders = updateHeaders(headers, previousAttemptCount); |
| // NOTICE: This set _must_ be done before stream.start() and it actually is. |
| sub.stream = newSubstream(newHeaders, tracerFactory, previousAttemptCount, isTransparentRetry); |
| return sub; |
| } |
| |
| /** |
| * Creates a new physical ClientStream that represents a retry/hedging attempt. The returned |
| * Client stream is not yet started. |
| */ |
| abstract ClientStream newSubstream( |
| Metadata headers, ClientStreamTracer.Factory tracerFactory, int previousAttempts, |
| boolean isTransparentRetry); |
| |
| /** Adds grpc-previous-rpc-attempts in the headers of a retry/hedging RPC. */ |
| @VisibleForTesting |
| final Metadata updateHeaders( |
| Metadata originalHeaders, int previousAttemptCount) { |
| Metadata newHeaders = new Metadata(); |
| newHeaders.merge(originalHeaders); |
| if (previousAttemptCount > 0) { |
| newHeaders.put(GRPC_PREVIOUS_RPC_ATTEMPTS, String.valueOf(previousAttemptCount)); |
| } |
| return newHeaders; |
| } |
| |
| private void drain(Substream substream) { |
| int index = 0; |
| int chunk = 0x80; |
| List<BufferEntry> list = null; |
| boolean streamStarted = false; |
| Runnable onReadyRunnable = null; |
| |
| while (true) { |
| State savedState; |
| |
| synchronized (lock) { |
| savedState = state; |
| if (streamStarted) { |
| if (savedState.winningSubstream != null && savedState.winningSubstream != substream) { |
| // committed but not me, to be cancelled |
| break; |
| } |
| if (savedState.cancelled) { |
| break; |
| } |
| } |
| if (index == savedState.buffer.size()) { // I'm drained |
| state = savedState.substreamDrained(substream); |
| if (!isReady()) { |
| return; |
| } |
| onReadyRunnable = new Runnable() { |
| @Override |
| public void run() { |
| if (!isClosed) { |
| masterListener.onReady(); |
| } |
| } |
| }; |
| break; |
| } |
| |
| if (substream.closed) { |
| return; |
| } |
| |
| int stop = Math.min(index + chunk, savedState.buffer.size()); |
| if (list == null) { |
| list = new ArrayList<>(savedState.buffer.subList(index, stop)); |
| } else { |
| list.clear(); |
| list.addAll(savedState.buffer.subList(index, stop)); |
| } |
| index = stop; |
| } |
| |
| for (BufferEntry bufferEntry : list) { |
| bufferEntry.runWith(substream); |
| if (bufferEntry instanceof RetriableStream.StartEntry) { |
| streamStarted = true; |
| } |
| if (streamStarted) { |
| savedState = state; |
| if (savedState.winningSubstream != null && savedState.winningSubstream != substream) { |
| // committed but not me, to be cancelled |
| break; |
| } |
| if (savedState.cancelled) { |
| break; |
| } |
| } |
| } |
| } |
| |
| if (onReadyRunnable != null) { |
| listenerSerializeExecutor.execute(onReadyRunnable); |
| return; |
| } |
| |
| substream.stream.cancel( |
| state.winningSubstream == substream ? cancellationStatus : CANCELLED_BECAUSE_COMMITTED); |
| } |
| |
| /** |
| * Runs pre-start tasks. Returns the Status of shutdown if the channel is shutdown. |
| */ |
| @CheckReturnValue |
| @Nullable |
| abstract Status prestart(); |
| |
| class StartEntry implements BufferEntry { |
| @Override |
| public void runWith(Substream substream) { |
| substream.stream.start(new Sublistener(substream)); |
| } |
| } |
| |
| /** Starts the first PRC attempt. */ |
| @Override |
| public final void start(ClientStreamListener listener) { |
| masterListener = listener; |
| |
| Status shutdownStatus = prestart(); |
| |
| if (shutdownStatus != null) { |
| cancel(shutdownStatus); |
| return; |
| } |
| |
| synchronized (lock) { |
| state.buffer.add(new StartEntry()); |
| } |
| |
| Substream substream = createSubstream(0, false); |
| if (isHedging) { |
| FutureCanceller scheduledHedgingRef = null; |
| |
| synchronized (lock) { |
| state = state.addActiveHedge(substream); |
| if (hasPotentialHedging(state) |
| && (throttle == null || throttle.isAboveThreshold())) { |
| scheduledHedging = scheduledHedgingRef = new FutureCanceller(lock); |
| } |
| } |
| |
| if (scheduledHedgingRef != null) { |
| scheduledHedgingRef.setFuture( |
| scheduledExecutorService.schedule( |
| new HedgingRunnable(scheduledHedgingRef), |
| hedgingPolicy.hedgingDelayNanos, |
| TimeUnit.NANOSECONDS)); |
| } |
| } |
| |
| drain(substream); |
| } |
| |
| @SuppressWarnings("GuardedBy") |
| private void pushbackHedging(@Nullable Integer delayMillis) { |
| if (delayMillis == null) { |
| return; |
| } |
| if (delayMillis < 0) { |
| freezeHedging(); |
| return; |
| } |
| |
| // Cancels the current scheduledHedging and reschedules a new one. |
| FutureCanceller future; |
| Future<?> futureToBeCancelled; |
| |
| synchronized (lock) { |
| if (scheduledHedging == null) { |
| return; |
| } |
| |
| // TODO(b/145386688): This access should be guarded by 'this.scheduledHedging.lock'; instead |
| // found: 'this.lock' |
| futureToBeCancelled = scheduledHedging.markCancelled(); |
| scheduledHedging = future = new FutureCanceller(lock); |
| } |
| |
| if (futureToBeCancelled != null) { |
| futureToBeCancelled.cancel(false); |
| } |
| future.setFuture(scheduledExecutorService.schedule( |
| new HedgingRunnable(future), delayMillis, TimeUnit.MILLISECONDS)); |
| } |
| |
| private final class HedgingRunnable implements Runnable { |
| |
| // Need to hold a ref to the FutureCanceller in case RetriableStrea.scheduledHedging is renewed |
| // by a positive push-back just after newSubstream is instantiated, so that we can double check. |
| final FutureCanceller scheduledHedgingRef; |
| |
| HedgingRunnable(FutureCanceller scheduledHedging) { |
| scheduledHedgingRef = scheduledHedging; |
| } |
| |
| @Override |
| public void run() { |
| callExecutor.execute( |
| new Runnable() { |
| @SuppressWarnings("GuardedBy") |
| @Override |
| public void run() { |
| // It's safe to read state.hedgingAttemptCount here. |
| // If this run is not cancelled, the value of state.hedgingAttemptCount won't change |
| // until state.addActiveHedge() is called subsequently, even the state could possibly |
| // change. |
| Substream newSubstream = createSubstream(state.hedgingAttemptCount, false); |
| boolean cancelled = false; |
| FutureCanceller future = null; |
| |
| synchronized (lock) { |
| // TODO(b/145386688): This access should be guarded by |
| // 'HedgingRunnable.this.scheduledHedgingRef.lock'; instead found: |
| // 'RetriableStream.this.lock' |
| if (scheduledHedgingRef.isCancelled()) { |
| cancelled = true; |
| } else { |
| state = state.addActiveHedge(newSubstream); |
| if (hasPotentialHedging(state) |
| && (throttle == null || throttle.isAboveThreshold())) { |
| scheduledHedging = future = new FutureCanceller(lock); |
| } else { |
| state = state.freezeHedging(); |
| scheduledHedging = null; |
| } |
| } |
| } |
| |
| if (cancelled) { |
| newSubstream.stream.cancel(Status.CANCELLED.withDescription("Unneeded hedging")); |
| return; |
| } |
| if (future != null) { |
| future.setFuture( |
| scheduledExecutorService.schedule( |
| new HedgingRunnable(future), |
| hedgingPolicy.hedgingDelayNanos, |
| TimeUnit.NANOSECONDS)); |
| } |
| drain(newSubstream); |
| } |
| }); |
| } |
| } |
| |
| @Override |
| public final void cancel(final Status reason) { |
| Substream noopSubstream = new Substream(0 /* previousAttempts doesn't matter here */); |
| noopSubstream.stream = new NoopClientStream(); |
| Runnable runnable = commit(noopSubstream); |
| |
| if (runnable != null) { |
| runnable.run(); |
| listenerSerializeExecutor.execute( |
| new Runnable() { |
| @Override |
| public void run() { |
| isClosed = true; |
| masterListener.closed(reason, RpcProgress.PROCESSED, new Metadata()); |
| |
| } |
| }); |
| return; |
| } |
| |
| Substream winningSubstreamToCancel = null; |
| synchronized (lock) { |
| if (state.drainedSubstreams.contains(state.winningSubstream)) { |
| winningSubstreamToCancel = state.winningSubstream; |
| } else { // the winningSubstream will be cancelled while draining |
| cancellationStatus = reason; |
| } |
| state = state.cancelled(); |
| } |
| if (winningSubstreamToCancel != null) { |
| winningSubstreamToCancel.stream.cancel(reason); |
| } |
| } |
| |
| private void delayOrExecute(BufferEntry bufferEntry) { |
| Collection<Substream> savedDrainedSubstreams; |
| synchronized (lock) { |
| if (!state.passThrough) { |
| state.buffer.add(bufferEntry); |
| } |
| savedDrainedSubstreams = state.drainedSubstreams; |
| } |
| |
| for (Substream substream : savedDrainedSubstreams) { |
| bufferEntry.runWith(substream); |
| } |
| } |
| |
| /** |
| * Do not use it directly. Use {@link #sendMessage(Object)} instead because we don't use |
| * InputStream for buffering. |
| */ |
| @Override |
| public final void writeMessage(InputStream message) { |
| throw new IllegalStateException("RetriableStream.writeMessage() should not be called directly"); |
| } |
| |
| final void sendMessage(final ReqT message) { |
| State savedState = state; |
| if (savedState.passThrough) { |
| savedState.winningSubstream.stream.writeMessage(method.streamRequest(message)); |
| return; |
| } |
| |
| class SendMessageEntry implements BufferEntry { |
| @Override |
| public void runWith(Substream substream) { |
| substream.stream.writeMessage(method.streamRequest(message)); |
| } |
| } |
| |
| delayOrExecute(new SendMessageEntry()); |
| } |
| |
| @Override |
| public final void request(final int numMessages) { |
| State savedState = state; |
| if (savedState.passThrough) { |
| savedState.winningSubstream.stream.request(numMessages); |
| return; |
| } |
| |
| class RequestEntry implements BufferEntry { |
| @Override |
| public void runWith(Substream substream) { |
| substream.stream.request(numMessages); |
| } |
| } |
| |
| delayOrExecute(new RequestEntry()); |
| } |
| |
| @Override |
| public final void flush() { |
| State savedState = state; |
| if (savedState.passThrough) { |
| savedState.winningSubstream.stream.flush(); |
| return; |
| } |
| |
| class FlushEntry implements BufferEntry { |
| @Override |
| public void runWith(Substream substream) { |
| substream.stream.flush(); |
| } |
| } |
| |
| delayOrExecute(new FlushEntry()); |
| } |
| |
| @Override |
| public final boolean isReady() { |
| for (Substream substream : state.drainedSubstreams) { |
| if (substream.stream.isReady()) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| @Override |
| public void optimizeForDirectExecutor() { |
| class OptimizeDirectEntry implements BufferEntry { |
| @Override |
| public void runWith(Substream substream) { |
| substream.stream.optimizeForDirectExecutor(); |
| } |
| } |
| |
| delayOrExecute(new OptimizeDirectEntry()); |
| } |
| |
| @Override |
| public final void setCompressor(final Compressor compressor) { |
| class CompressorEntry implements BufferEntry { |
| @Override |
| public void runWith(Substream substream) { |
| substream.stream.setCompressor(compressor); |
| } |
| } |
| |
| delayOrExecute(new CompressorEntry()); |
| } |
| |
| @Override |
| public final void setFullStreamDecompression(final boolean fullStreamDecompression) { |
| class FullStreamDecompressionEntry implements BufferEntry { |
| @Override |
| public void runWith(Substream substream) { |
| substream.stream.setFullStreamDecompression(fullStreamDecompression); |
| } |
| } |
| |
| delayOrExecute(new FullStreamDecompressionEntry()); |
| } |
| |
| @Override |
| public final void setMessageCompression(final boolean enable) { |
| class MessageCompressionEntry implements BufferEntry { |
| @Override |
| public void runWith(Substream substream) { |
| substream.stream.setMessageCompression(enable); |
| } |
| } |
| |
| delayOrExecute(new MessageCompressionEntry()); |
| } |
| |
| @Override |
| public final void halfClose() { |
| class HalfCloseEntry implements BufferEntry { |
| @Override |
| public void runWith(Substream substream) { |
| substream.stream.halfClose(); |
| } |
| } |
| |
| delayOrExecute(new HalfCloseEntry()); |
| } |
| |
| @Override |
| public final void setAuthority(final String authority) { |
| class AuthorityEntry implements BufferEntry { |
| @Override |
| public void runWith(Substream substream) { |
| substream.stream.setAuthority(authority); |
| } |
| } |
| |
| delayOrExecute(new AuthorityEntry()); |
| } |
| |
| @Override |
| public final void setDecompressorRegistry(final DecompressorRegistry decompressorRegistry) { |
| class DecompressorRegistryEntry implements BufferEntry { |
| @Override |
| public void runWith(Substream substream) { |
| substream.stream.setDecompressorRegistry(decompressorRegistry); |
| } |
| } |
| |
| delayOrExecute(new DecompressorRegistryEntry()); |
| } |
| |
| @Override |
| public final void setMaxInboundMessageSize(final int maxSize) { |
| class MaxInboundMessageSizeEntry implements BufferEntry { |
| @Override |
| public void runWith(Substream substream) { |
| substream.stream.setMaxInboundMessageSize(maxSize); |
| } |
| } |
| |
| delayOrExecute(new MaxInboundMessageSizeEntry()); |
| } |
| |
| @Override |
| public final void setMaxOutboundMessageSize(final int maxSize) { |
| class MaxOutboundMessageSizeEntry implements BufferEntry { |
| @Override |
| public void runWith(Substream substream) { |
| substream.stream.setMaxOutboundMessageSize(maxSize); |
| } |
| } |
| |
| delayOrExecute(new MaxOutboundMessageSizeEntry()); |
| } |
| |
| @Override |
| public final void setDeadline(final Deadline deadline) { |
| class DeadlineEntry implements BufferEntry { |
| @Override |
| public void runWith(Substream substream) { |
| substream.stream.setDeadline(deadline); |
| } |
| } |
| |
| delayOrExecute(new DeadlineEntry()); |
| } |
| |
| @Override |
| public final Attributes getAttributes() { |
| if (state.winningSubstream != null) { |
| return state.winningSubstream.stream.getAttributes(); |
| } |
| return Attributes.EMPTY; |
| } |
| |
| @Override |
| public void appendTimeoutInsight(InsightBuilder insight) { |
| State currentState; |
| synchronized (lock) { |
| insight.appendKeyValue("closed", closedSubstreamsInsight); |
| currentState = state; |
| } |
| if (currentState.winningSubstream != null) { |
| // TODO(zhangkun83): in this case while other drained substreams have been cancelled in favor |
| // of the winning substream, they may not have received closed() notifications yet, thus they |
| // may be missing from closedSubstreamsInsight. This may be a little confusing to the user. |
| // We need to figure out how to include them. |
| InsightBuilder substreamInsight = new InsightBuilder(); |
| currentState.winningSubstream.stream.appendTimeoutInsight(substreamInsight); |
| insight.appendKeyValue("committed", substreamInsight); |
| } else { |
| InsightBuilder openSubstreamsInsight = new InsightBuilder(); |
| // drainedSubstreams doesn't include all open substreams. Those which have just been created |
| // and are still catching up with buffered requests (in other words, still draining) will not |
| // show up. We think this is benign, because the draining should be typically fast, and it'd |
| // be indistinguishable from the case where those streams are to be created a little late due |
| // to delays in the timer. |
| for (Substream sub : currentState.drainedSubstreams) { |
| InsightBuilder substreamInsight = new InsightBuilder(); |
| sub.stream.appendTimeoutInsight(substreamInsight); |
| openSubstreamsInsight.append(substreamInsight); |
| } |
| insight.appendKeyValue("open", openSubstreamsInsight); |
| } |
| } |
| |
| private static Random random = new Random(); |
| |
| @VisibleForTesting |
| static void setRandom(Random random) { |
| RetriableStream.random = random; |
| } |
| |
| /** |
| * Whether there is any potential hedge at the moment. A false return value implies there is |
| * absolutely no potential hedge. At least one of the hedges will observe a false return value |
| * when calling this method, unless otherwise the rpc is committed. |
| */ |
| // only called when isHedging is true |
| @GuardedBy("lock") |
| private boolean hasPotentialHedging(State state) { |
| return state.winningSubstream == null |
| && state.hedgingAttemptCount < hedgingPolicy.maxAttempts |
| && !state.hedgingFrozen; |
| } |
| |
| @SuppressWarnings("GuardedBy") |
| private void freezeHedging() { |
| Future<?> futureToBeCancelled = null; |
| synchronized (lock) { |
| if (scheduledHedging != null) { |
| // TODO(b/145386688): This access should be guarded by 'this.scheduledHedging.lock'; instead |
| // found: 'this.lock' |
| futureToBeCancelled = scheduledHedging.markCancelled(); |
| scheduledHedging = null; |
| } |
| state = state.freezeHedging(); |
| } |
| |
| if (futureToBeCancelled != null) { |
| futureToBeCancelled.cancel(false); |
| } |
| } |
| |
| private interface BufferEntry { |
| /** Replays the buffer entry with the given stream. */ |
| void runWith(Substream substream); |
| } |
| |
| private final class Sublistener implements ClientStreamListener { |
| final Substream substream; |
| |
| Sublistener(Substream substream) { |
| this.substream = substream; |
| } |
| |
| @Override |
| public void headersRead(final Metadata headers) { |
| commitAndRun(substream); |
| if (state.winningSubstream == substream) { |
| if (throttle != null) { |
| throttle.onSuccess(); |
| } |
| listenerSerializeExecutor.execute( |
| new Runnable() { |
| @Override |
| public void run() { |
| masterListener.headersRead(headers); |
| } |
| }); |
| } |
| } |
| |
| @Override |
| public void closed( |
| final Status status, final RpcProgress rpcProgress, final Metadata trailers) { |
| synchronized (lock) { |
| state = state.substreamClosed(substream); |
| closedSubstreamsInsight.append(status.getCode()); |
| } |
| |
| // handle a race between buffer limit exceeded and closed, when setting |
| // substream.bufferLimitExceeded = true happens before state.substreamClosed(substream). |
| if (substream.bufferLimitExceeded) { |
| commitAndRun(substream); |
| if (state.winningSubstream == substream) { |
| listenerSerializeExecutor.execute( |
| new Runnable() { |
| @Override |
| public void run() { |
| isClosed = true; |
| masterListener.closed(status, rpcProgress, trailers); |
| } |
| }); |
| } |
| return; |
| } |
| |
| if (state.winningSubstream == null) { |
| if (rpcProgress == RpcProgress.REFUSED |
| && noMoreTransparentRetry.compareAndSet(false, true)) { |
| // transparent retry |
| final Substream newSubstream = createSubstream(substream.previousAttemptCount, true); |
| if (isHedging) { |
| boolean commit = false; |
| synchronized (lock) { |
| // Although this operation is not done atomically with |
| // noMoreTransparentRetry.compareAndSet(false, true), it does not change the size() of |
| // activeHedges, so neither does it affect the commitment decision of other threads, |
| // nor do the commitment decision making threads affect itself. |
| state = state.replaceActiveHedge(substream, newSubstream); |
| |
| // optimization for early commit |
| if (!hasPotentialHedging(state) |
| && state.activeHedges.size() == 1) { |
| commit = true; |
| } |
| } |
| if (commit) { |
| commitAndRun(newSubstream); |
| } |
| } else { |
| if (retryPolicy == null || retryPolicy.maxAttempts == 1) { |
| // optimization for early commit |
| commitAndRun(newSubstream); |
| } |
| } |
| callExecutor.execute(new Runnable() { |
| @Override |
| public void run() { |
| drain(newSubstream); |
| } |
| }); |
| return; |
| } else if (rpcProgress == RpcProgress.DROPPED) { |
| // For normal retry, nothing need be done here, will just commit. |
| // For hedging, cancel scheduled hedge that is scheduled prior to the drop |
| if (isHedging) { |
| freezeHedging(); |
| } |
| } else { |
| noMoreTransparentRetry.set(true); |
| |
| if (isHedging) { |
| HedgingPlan hedgingPlan = makeHedgingDecision(status, trailers); |
| if (hedgingPlan.isHedgeable) { |
| pushbackHedging(hedgingPlan.hedgingPushbackMillis); |
| } |
| synchronized (lock) { |
| state = state.removeActiveHedge(substream); |
| // The invariant is whether or not #(Potential Hedge + active hedges) > 0. |
| // Once hasPotentialHedging(state) is false, it will always be false, and then |
| // #(state.activeHedges) will be decreasing. This guarantees that even there may be |
| // multiple concurrent hedges, one of the hedges will end up committed. |
| if (hedgingPlan.isHedgeable) { |
| if (hasPotentialHedging(state) || !state.activeHedges.isEmpty()) { |
| return; |
| } |
| // else, no activeHedges, no new hedges possible, try to commit |
| } // else, isHedgeable is false, try to commit |
| } |
| } else { |
| RetryPlan retryPlan = makeRetryDecision(status, trailers); |
| if (retryPlan.shouldRetry) { |
| // The check state.winningSubstream == null, checking if is not already committed, is |
| // racy, but is still safe b/c the retry will also handle committed/cancellation |
| FutureCanceller scheduledRetryCopy; |
| synchronized (lock) { |
| scheduledRetry = scheduledRetryCopy = new FutureCanceller(lock); |
| } |
| class RetryBackoffRunnable implements Runnable { |
| @Override |
| public void run() { |
| callExecutor.execute( |
| new Runnable() { |
| @Override |
| public void run() { |
| // retry |
| Substream newSubstream = createSubstream( |
| substream.previousAttemptCount + 1, |
| false); |
| drain(newSubstream); |
| } |
| }); |
| } |
| } |
| |
| scheduledRetryCopy.setFuture( |
| scheduledExecutorService.schedule( |
| new RetryBackoffRunnable(), |
| retryPlan.backoffNanos, |
| TimeUnit.NANOSECONDS)); |
| return; |
| } |
| } |
| } |
| } |
| |
| commitAndRun(substream); |
| if (state.winningSubstream == substream) { |
| listenerSerializeExecutor.execute( |
| new Runnable() { |
| @Override |
| public void run() { |
| isClosed = true; |
| masterListener.closed(status, rpcProgress, trailers); |
| } |
| }); |
| } |
| } |
| |
| /** |
| * Decides in current situation whether or not the RPC should retry and if it should retry how |
| * long the backoff should be. The decision does not take the commitment status into account, so |
| * caller should check it separately. It also updates the throttle. It does not change state. |
| */ |
| private RetryPlan makeRetryDecision(Status status, Metadata trailer) { |
| if (retryPolicy == null) { |
| return new RetryPlan(false, 0); |
| } |
| boolean shouldRetry = false; |
| long backoffNanos = 0L; |
| boolean isRetryableStatusCode = retryPolicy.retryableStatusCodes.contains(status.getCode()); |
| Integer pushbackMillis = getPushbackMills(trailer); |
| boolean isThrottled = false; |
| if (throttle != null) { |
| if (isRetryableStatusCode || (pushbackMillis != null && pushbackMillis < 0)) { |
| isThrottled = !throttle.onQualifiedFailureThenCheckIsAboveThreshold(); |
| } |
| } |
| |
| if (retryPolicy.maxAttempts > substream.previousAttemptCount + 1 && !isThrottled) { |
| if (pushbackMillis == null) { |
| if (isRetryableStatusCode) { |
| shouldRetry = true; |
| backoffNanos = (long) (nextBackoffIntervalNanos * random.nextDouble()); |
| nextBackoffIntervalNanos = Math.min( |
| (long) (nextBackoffIntervalNanos * retryPolicy.backoffMultiplier), |
| retryPolicy.maxBackoffNanos); |
| } // else no retry |
| } else if (pushbackMillis >= 0) { |
| shouldRetry = true; |
| backoffNanos = TimeUnit.MILLISECONDS.toNanos(pushbackMillis); |
| nextBackoffIntervalNanos = retryPolicy.initialBackoffNanos; |
| } // else no retry |
| } // else no retry |
| |
| return new RetryPlan(shouldRetry, backoffNanos); |
| } |
| |
| private HedgingPlan makeHedgingDecision(Status status, Metadata trailer) { |
| Integer pushbackMillis = getPushbackMills(trailer); |
| boolean isFatal = !hedgingPolicy.nonFatalStatusCodes.contains(status.getCode()); |
| boolean isThrottled = false; |
| if (throttle != null) { |
| if (!isFatal || (pushbackMillis != null && pushbackMillis < 0)) { |
| isThrottled = !throttle.onQualifiedFailureThenCheckIsAboveThreshold(); |
| } |
| } |
| return new HedgingPlan(!isFatal && !isThrottled, pushbackMillis); |
| } |
| |
| @Nullable |
| private Integer getPushbackMills(Metadata trailer) { |
| String pushbackStr = trailer.get(GRPC_RETRY_PUSHBACK_MS); |
| Integer pushbackMillis = null; |
| if (pushbackStr != null) { |
| try { |
| pushbackMillis = Integer.valueOf(pushbackStr); |
| } catch (NumberFormatException e) { |
| pushbackMillis = -1; |
| } |
| } |
| return pushbackMillis; |
| } |
| |
| @Override |
| public void messagesAvailable(final MessageProducer producer) { |
| State savedState = state; |
| checkState( |
| savedState.winningSubstream != null, "Headers should be received prior to messages."); |
| if (savedState.winningSubstream != substream) { |
| return; |
| } |
| listenerSerializeExecutor.execute( |
| new Runnable() { |
| @Override |
| public void run() { |
| masterListener.messagesAvailable(producer); |
| } |
| }); |
| } |
| |
| @Override |
| public void onReady() { |
| // FIXME(#7089): hedging case is broken. |
| if (!isReady()) { |
| return; |
| } |
| listenerSerializeExecutor.execute( |
| new Runnable() { |
| @Override |
| public void run() { |
| if (!isClosed) { |
| masterListener.onReady(); |
| } |
| } |
| }); |
| } |
| } |
| |
| private static final class State { |
| /** Committed and the winning substream drained. */ |
| final boolean passThrough; |
| |
| /** A list of buffered ClientStream runnables. Set to Null once passThrough. */ |
| @Nullable final List<BufferEntry> buffer; |
| |
| /** |
| * Unmodifiable collection of all the open substreams that are drained. Singleton once |
| * passThrough; Empty if committed but not passTrough. |
| */ |
| final Collection<Substream> drainedSubstreams; |
| |
| /** |
| * Unmodifiable collection of all the active hedging substreams. |
| * |
| * <p>A substream even with the attribute substream.closed being true may be considered still |
| * "active" at the moment as long as it is in this collection. |
| */ |
| final Collection<Substream> activeHedges; // not null once isHedging = true |
| |
| final int hedgingAttemptCount; |
| |
| /** Null until committed. */ |
| @Nullable final Substream winningSubstream; |
| |
| /** Not required to set to true when cancelled, but can short-circuit the draining process. */ |
| final boolean cancelled; |
| |
| /** No more hedging due to events like drop or pushback. */ |
| final boolean hedgingFrozen; |
| |
| State( |
| @Nullable List<BufferEntry> buffer, |
| Collection<Substream> drainedSubstreams, |
| Collection<Substream> activeHedges, |
| @Nullable Substream winningSubstream, |
| boolean cancelled, |
| boolean passThrough, |
| boolean hedgingFrozen, |
| int hedgingAttemptCount) { |
| this.buffer = buffer; |
| this.drainedSubstreams = |
| checkNotNull(drainedSubstreams, "drainedSubstreams"); |
| this.winningSubstream = winningSubstream; |
| this.activeHedges = activeHedges; |
| this.cancelled = cancelled; |
| this.passThrough = passThrough; |
| this.hedgingFrozen = hedgingFrozen; |
| this.hedgingAttemptCount = hedgingAttemptCount; |
| |
| checkState(!passThrough || buffer == null, "passThrough should imply buffer is null"); |
| checkState( |
| !passThrough || winningSubstream != null, |
| "passThrough should imply winningSubstream != null"); |
| checkState( |
| !passThrough |
| || (drainedSubstreams.size() == 1 && drainedSubstreams.contains(winningSubstream)) |
| || (drainedSubstreams.size() == 0 && winningSubstream.closed), |
| "passThrough should imply winningSubstream is drained"); |
| checkState(!cancelled || winningSubstream != null, "cancelled should imply committed"); |
| } |
| |
| @CheckReturnValue |
| // GuardedBy RetriableStream.lock |
| State cancelled() { |
| return new State( |
| buffer, drainedSubstreams, activeHedges, winningSubstream, true, passThrough, |
| hedgingFrozen, hedgingAttemptCount); |
| } |
| |
| /** The given substream is drained. */ |
| @CheckReturnValue |
| // GuardedBy RetriableStream.lock |
| State substreamDrained(Substream substream) { |
| checkState(!passThrough, "Already passThrough"); |
| |
| Collection<Substream> drainedSubstreams; |
| |
| if (substream.closed) { |
| drainedSubstreams = this.drainedSubstreams; |
| } else if (this.drainedSubstreams.isEmpty()) { |
| // optimize for 0-retry, which is most of the cases. |
| drainedSubstreams = Collections.singletonList(substream); |
| } else { |
| drainedSubstreams = new ArrayList<>(this.drainedSubstreams); |
| drainedSubstreams.add(substream); |
| drainedSubstreams = Collections.unmodifiableCollection(drainedSubstreams); |
| } |
| |
| boolean passThrough = winningSubstream != null; |
| |
| List<BufferEntry> buffer = this.buffer; |
| if (passThrough) { |
| checkState( |
| winningSubstream == substream, "Another RPC attempt has already committed"); |
| buffer = null; |
| } |
| |
| return new State( |
| buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough, |
| hedgingFrozen, hedgingAttemptCount); |
| } |
| |
| /** The given substream is closed. */ |
| @CheckReturnValue |
| // GuardedBy RetriableStream.lock |
| State substreamClosed(Substream substream) { |
| substream.closed = true; |
| if (this.drainedSubstreams.contains(substream)) { |
| Collection<Substream> drainedSubstreams = new ArrayList<>(this.drainedSubstreams); |
| drainedSubstreams.remove(substream); |
| drainedSubstreams = Collections.unmodifiableCollection(drainedSubstreams); |
| return new State( |
| buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough, |
| hedgingFrozen, hedgingAttemptCount); |
| } else { |
| return this; |
| } |
| } |
| |
| @CheckReturnValue |
| // GuardedBy RetriableStream.lock |
| State committed(Substream winningSubstream) { |
| checkState(this.winningSubstream == null, "Already committed"); |
| |
| boolean passThrough = false; |
| List<BufferEntry> buffer = this.buffer; |
| Collection<Substream> drainedSubstreams; |
| |
| if (this.drainedSubstreams.contains(winningSubstream)) { |
| passThrough = true; |
| buffer = null; |
| drainedSubstreams = Collections.singleton(winningSubstream); |
| } else { |
| drainedSubstreams = Collections.emptyList(); |
| } |
| |
| return new State( |
| buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough, |
| hedgingFrozen, hedgingAttemptCount); |
| } |
| |
| @CheckReturnValue |
| // GuardedBy RetriableStream.lock |
| State freezeHedging() { |
| if (hedgingFrozen) { |
| return this; |
| } |
| return new State( |
| buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough, |
| true, hedgingAttemptCount); |
| } |
| |
| @CheckReturnValue |
| // GuardedBy RetriableStream.lock |
| // state.hedgingAttemptCount is modified only here. |
| // The method is only called in RetriableStream.start() and HedgingRunnable.run() |
| State addActiveHedge(Substream substream) { |
| // hasPotentialHedging must be true |
| checkState(!hedgingFrozen, "hedging frozen"); |
| checkState(winningSubstream == null, "already committed"); |
| |
| Collection<Substream> activeHedges; |
| if (this.activeHedges == null) { |
| activeHedges = Collections.singleton(substream); |
| } else { |
| activeHedges = new ArrayList<>(this.activeHedges); |
| activeHedges.add(substream); |
| activeHedges = Collections.unmodifiableCollection(activeHedges); |
| } |
| |
| int hedgingAttemptCount = this.hedgingAttemptCount + 1; |
| return new State( |
| buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough, |
| hedgingFrozen, hedgingAttemptCount); |
| } |
| |
| @CheckReturnValue |
| // GuardedBy RetriableStream.lock |
| // The method is only called in Sublistener.closed() |
| State removeActiveHedge(Substream substream) { |
| Collection<Substream> activeHedges = new ArrayList<>(this.activeHedges); |
| activeHedges.remove(substream); |
| activeHedges = Collections.unmodifiableCollection(activeHedges); |
| |
| return new State( |
| buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough, |
| hedgingFrozen, hedgingAttemptCount); |
| } |
| |
| @CheckReturnValue |
| // GuardedBy RetriableStream.lock |
| // The method is only called for transparent retry. |
| State replaceActiveHedge(Substream oldOne, Substream newOne) { |
| Collection<Substream> activeHedges = new ArrayList<>(this.activeHedges); |
| activeHedges.remove(oldOne); |
| activeHedges.add(newOne); |
| activeHedges = Collections.unmodifiableCollection(activeHedges); |
| |
| return new State( |
| buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough, |
| hedgingFrozen, hedgingAttemptCount); |
| } |
| } |
| |
| /** |
| * A wrapper of a physical stream of a retry/hedging attempt, that comes with some useful |
| * attributes. |
| */ |
| private static final class Substream { |
| ClientStream stream; |
| |
| // GuardedBy RetriableStream.lock |
| boolean closed; |
| |
| // setting to true must be GuardedBy RetriableStream.lock |
| boolean bufferLimitExceeded; |
| |
| final int previousAttemptCount; |
| |
| Substream(int previousAttemptCount) { |
| this.previousAttemptCount = previousAttemptCount; |
| } |
| } |
| |
| |
| /** |
| * Traces the buffer used by a substream. |
| */ |
| class BufferSizeTracer extends ClientStreamTracer { |
| // Each buffer size tracer is dedicated to one specific substream. |
| private final Substream substream; |
| |
| @GuardedBy("lock") |
| long bufferNeeded; |
| |
| BufferSizeTracer(Substream substream) { |
| this.substream = substream; |
| } |
| |
| /** |
| * A message is sent to the wire, so its reference would be released if no retry or |
| * hedging were involved. So at this point we have to hold the reference of the message longer |
| * for retry, and we need to increment {@code substream.bufferNeeded}. |
| */ |
| @Override |
| public void outboundWireSize(long bytes) { |
| if (state.winningSubstream != null) { |
| return; |
| } |
| |
| Runnable postCommitTask = null; |
| |
| // TODO(zdapeng): avoid using the same lock for both in-bound and out-bound. |
| synchronized (lock) { |
| if (state.winningSubstream != null || substream.closed) { |
| return; |
| } |
| bufferNeeded += bytes; |
| if (bufferNeeded <= perRpcBufferUsed) { |
| return; |
| } |
| |
| if (bufferNeeded > perRpcBufferLimit) { |
| substream.bufferLimitExceeded = true; |
| } else { |
| // Only update channelBufferUsed when perRpcBufferUsed is not exceeding perRpcBufferLimit. |
| long savedChannelBufferUsed = |
| channelBufferUsed.addAndGet(bufferNeeded - perRpcBufferUsed); |
| perRpcBufferUsed = bufferNeeded; |
| |
| if (savedChannelBufferUsed > channelBufferLimit) { |
| substream.bufferLimitExceeded = true; |
| } |
| } |
| |
| if (substream.bufferLimitExceeded) { |
| postCommitTask = commit(substream); |
| } |
| } |
| |
| if (postCommitTask != null) { |
| postCommitTask.run(); |
| } |
| } |
| } |
| |
| /** |
| * Used to keep track of the total amount of memory used to buffer retryable or hedged RPCs for |
| * the Channel. There should be a single instance of it for each channel. |
| */ |
| static final class ChannelBufferMeter { |
| private final AtomicLong bufferUsed = new AtomicLong(); |
| |
| @VisibleForTesting |
| long addAndGet(long newBytesUsed) { |
| return bufferUsed.addAndGet(newBytesUsed); |
| } |
| } |
| |
| /** |
| * Used for retry throttling. |
| */ |
| static final class Throttle { |
| |
| private static final int THREE_DECIMAL_PLACES_SCALE_UP = 1000; |
| |
| /** |
| * 1000 times the maxTokens field of the retryThrottling policy in service config. |
| * The number of tokens starts at maxTokens. The token_count will always be between 0 and |
| * maxTokens. |
| */ |
| final int maxTokens; |
| |
| /** |
| * Half of {@code maxTokens}. |
| */ |
| final int threshold; |
| |
| /** |
| * 1000 times the tokenRatio field of the retryThrottling policy in service config. |
| */ |
| final int tokenRatio; |
| |
| final AtomicInteger tokenCount = new AtomicInteger(); |
| |
| Throttle(float maxTokens, float tokenRatio) { |
| // tokenRatio is up to 3 decimal places |
| this.tokenRatio = (int) (tokenRatio * THREE_DECIMAL_PLACES_SCALE_UP); |
| this.maxTokens = (int) (maxTokens * THREE_DECIMAL_PLACES_SCALE_UP); |
| this.threshold = this.maxTokens / 2; |
| tokenCount.set(this.maxTokens); |
| } |
| |
| @VisibleForTesting |
| boolean isAboveThreshold() { |
| return tokenCount.get() > threshold; |
| } |
| |
| /** |
| * Counts down the token on qualified failure and checks if it is above the threshold |
| * atomically. Qualified failure is a failure with a retryable or non-fatal status code or with |
| * a not-to-retry pushback. |
| */ |
| @VisibleForTesting |
| boolean onQualifiedFailureThenCheckIsAboveThreshold() { |
| while (true) { |
| int currentCount = tokenCount.get(); |
| if (currentCount == 0) { |
| return false; |
| } |
| int decremented = currentCount - (1 * THREE_DECIMAL_PLACES_SCALE_UP); |
| boolean updated = tokenCount.compareAndSet(currentCount, Math.max(decremented, 0)); |
| if (updated) { |
| return decremented > threshold; |
| } |
| } |
| } |
| |
| @VisibleForTesting |
| void onSuccess() { |
| while (true) { |
| int currentCount = tokenCount.get(); |
| if (currentCount == maxTokens) { |
| break; |
| } |
| int incremented = currentCount + tokenRatio; |
| boolean updated = tokenCount.compareAndSet(currentCount, Math.min(incremented, maxTokens)); |
| if (updated) { |
| break; |
| } |
| } |
| } |
| |
| @Override |
| public boolean equals(Object o) { |
| if (this == o) { |
| return true; |
| } |
| if (!(o instanceof Throttle)) { |
| return false; |
| } |
| Throttle that = (Throttle) o; |
| return maxTokens == that.maxTokens && tokenRatio == that.tokenRatio; |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hashCode(maxTokens, tokenRatio); |
| } |
| } |
| |
| private static final class RetryPlan { |
| final boolean shouldRetry; |
| final long backoffNanos; |
| |
| RetryPlan(boolean shouldRetry, long backoffNanos) { |
| this.shouldRetry = shouldRetry; |
| this.backoffNanos = backoffNanos; |
| } |
| } |
| |
| private static final class HedgingPlan { |
| final boolean isHedgeable; |
| @Nullable |
| final Integer hedgingPushbackMillis; |
| |
| public HedgingPlan( |
| boolean isHedgeable, @Nullable Integer hedgingPushbackMillis) { |
| this.isHedgeable = isHedgeable; |
| this.hedgingPushbackMillis = hedgingPushbackMillis; |
| } |
| } |
| |
| /** Allows cancelling a Future without racing with setting the future. */ |
| private static final class FutureCanceller { |
| |
| final Object lock; |
| @GuardedBy("lock") |
| Future<?> future; |
| @GuardedBy("lock") |
| boolean cancelled; |
| |
| FutureCanceller(Object lock) { |
| this.lock = lock; |
| } |
| |
| void setFuture(Future<?> future) { |
| synchronized (lock) { |
| if (!cancelled) { |
| this.future = future; |
| } |
| } |
| } |
| |
| @GuardedBy("lock") |
| @CheckForNull // Must cancel the returned future if not null. |
| Future<?> markCancelled() { |
| cancelled = true; |
| return future; |
| } |
| |
| @GuardedBy("lock") |
| boolean isCancelled() { |
| return cancelled; |
| } |
| } |
| } |