blob: 596a5ae353f2f7590c3bbf46c113d2e08d865dbc [file] [log] [blame]
/*
* Copyright (C) 2006 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.common.util.concurrent;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.AggregateFuture.ReleaseResourcesReason.ALL_INPUT_FUTURES_PROCESSED;
import static com.google.common.util.concurrent.AggregateFuture.ReleaseResourcesReason.OUTPUT_FUTURE_DONE;
import static com.google.common.util.concurrent.Futures.getDone;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static java.util.logging.Level.SEVERE;
import com.google.common.annotations.GwtCompatible;
import com.google.common.collect.ImmutableCollection;
import com.google.errorprone.annotations.ForOverride;
import com.google.errorprone.annotations.OverridingMethodsMustInvokeSuper;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.logging.Logger;
import org.checkerframework.checker.nullness.qual.Nullable;
/**
* A future whose value is derived from a collection of input futures.
*
* @param <InputT> the type of the individual inputs
* @param <OutputT> the type of the output (i.e. this) future
*/
@GwtCompatible
abstract class AggregateFuture<InputT, OutputT> extends AggregateFutureState<OutputT> {
private static final Logger logger = Logger.getLogger(AggregateFuture.class.getName());
/**
* The input futures. After {@link #init}, this field is read only by {@link #afterDone()} (to
* propagate cancellation) and {@link #toString()}. To access the futures' <i>values</i>, {@code
* AggregateFuture} attaches listeners that hold references to one or more inputs. And in the case
* of {@link CombinedFuture}, the user-supplied callback usually has its own references to inputs.
*/
/*
* In certain circumstances, this field might theoretically not be visible to an afterDone() call
* triggered by cancel(). For details, see the comments on the fields of TimeoutFuture.
*/
private @Nullable ImmutableCollection<? extends ListenableFuture<? extends InputT>> futures;
private final boolean allMustSucceed;
private final boolean collectsValues;
AggregateFuture(
ImmutableCollection<? extends ListenableFuture<? extends InputT>> futures,
boolean allMustSucceed,
boolean collectsValues) {
super(futures.size());
this.futures = checkNotNull(futures);
this.allMustSucceed = allMustSucceed;
this.collectsValues = collectsValues;
}
@Override
protected final void afterDone() {
super.afterDone();
ImmutableCollection<? extends Future<?>> localFutures = futures;
releaseResources(OUTPUT_FUTURE_DONE); // nulls out `futures`
if (isCancelled() & localFutures != null) {
boolean wasInterrupted = wasInterrupted();
for (Future<?> future : localFutures) {
future.cancel(wasInterrupted);
}
}
/*
* We don't call clearSeenExceptions() until processCompleted(). Prior to that, it may be needed
* again if some outstanding input fails.
*/
}
@Override
protected final String pendingToString() {
ImmutableCollection<? extends Future<?>> localFutures = futures;
if (localFutures != null) {
return "futures=" + localFutures;
}
return super.pendingToString();
}
/**
* Must be called at the end of each subclass's constructor. This method performs the "real"
* initialization; we can't put this in the constructor because, in the case where futures are
* already complete, we would not initialize the subclass before calling {@link
* #collectValueFromNonCancelledFuture}. As this is called after the subclass is constructed,
* we're guaranteed to have properly initialized the subclass.
*/
final void init() {
// Corner case: List is empty.
if (futures.isEmpty()) {
handleAllCompleted();
return;
}
// NOTE: If we ever want to use a custom executor here, have a look at CombinedFuture as we'll
// need to handle RejectedExecutionException
if (allMustSucceed) {
// We need fail fast, so we have to keep track of which future failed so we can propagate
// the exception immediately
// Register a listener on each Future in the list to update the state of this future.
// Note that if all the futures on the list are done prior to completing this loop, the last
// call to addListener() will callback to setOneValue(), transitively call our cleanup
// listener, and set this.futures to null.
// This is not actually a problem, since the foreach only needs this.futures to be non-null
// at the beginning of the loop.
int i = 0;
for (final ListenableFuture<? extends InputT> future : futures) {
final int index = i++;
future.addListener(
new Runnable() {
@Override
public void run() {
try {
if (future.isCancelled()) {
// Clear futures prior to cancelling children. This sets our own state but lets
// the input futures keep running, as some of them may be used elsewhere.
futures = null;
cancel(false);
} else {
collectValueFromNonCancelledFuture(index, future);
}
} finally {
/*
* "null" means: There is no need to access `futures` again during
* `processCompleted` because we're reading each value during a call to
* handleOneInputDone.
*/
decrementCountAndMaybeComplete(null);
}
}
},
directExecutor());
}
} else {
/*
* We'll call the user callback or collect the values only when all inputs complete,
* regardless of whether some failed. This lets us avoid calling expensive methods like
* Future.get() when we don't need to (specifically, for whenAllComplete().call*()), and it
* lets all futures share the same listener.
*
* We store `localFutures` inside the listener because `this.futures` might be nulled out by
* the time the listener runs for the final future -- at which point we need to check all
* inputs for exceptions *if* we're collecting values. If we're not, then the listener doesn't
* need access to the futures again, so we can just pass `null`.
*
* TODO(b/112550045): Allocating a single, cheaper listener is (I think) only an optimization.
* If we make some other optimizations, this one will no longer be necessary. The optimization
* could actually hurt in some cases, as it forces us to keep all inputs in memory until the
* final input completes.
*/
final ImmutableCollection<? extends Future<? extends InputT>> localFutures =
collectsValues ? futures : null;
Runnable listener =
new Runnable() {
@Override
public void run() {
decrementCountAndMaybeComplete(localFutures);
}
};
for (ListenableFuture<? extends InputT> future : futures) {
future.addListener(listener, directExecutor());
}
}
}
/**
* Fails this future with the given Throwable if {@link #allMustSucceed} is true. Also, logs the
* throwable if it is an {@link Error} or if {@link #allMustSucceed} is {@code true}, the
* throwable did not cause this future to fail, and it is the first time we've seen that
* particular Throwable.
*/
private void handleException(Throwable throwable) {
checkNotNull(throwable);
if (allMustSucceed) {
// As soon as the first one fails, make that failure the result of the output future.
// The results of all other inputs are then ignored (except for logging any failures).
boolean completedWithFailure = setException(throwable);
if (!completedWithFailure) {
// Go up the causal chain to see if we've already seen this cause; if we have, even if
// it's wrapped by a different exception, don't log it.
boolean firstTimeSeeingThisException = addCausalChain(getOrInitSeenExceptions(), throwable);
if (firstTimeSeeingThisException) {
log(throwable);
return;
}
}
}
/*
* TODO(cpovirk): Should whenAllComplete().call*() log errors, too? Currently, it doesn't call
* handleException() at all.
*/
if (throwable instanceof Error) {
/*
* TODO(cpovirk): Do we really want to log this if we called setException(throwable) and it
* returned true? This was intentional (CL 46470009), but it seems odd compared to how we
* normally handle Error.
*
* Similarly, do we really want to log the same Error more than once?
*/
log(throwable);
}
}
private static void log(Throwable throwable) {
String message =
(throwable instanceof Error)
? "Input Future failed with Error"
: "Got more than one input Future failure. Logging failures after the first";
logger.log(SEVERE, message, throwable);
}
@Override
final void addInitialException(Set<Throwable> seen) {
checkNotNull(seen);
if (!isCancelled()) {
// TODO(cpovirk): Think about whether we could/should use Verify to check this.
boolean unused = addCausalChain(seen, tryInternalFastPathGetFailure());
}
}
/**
* Collects the result (success or failure) of one input future. The input must not have been
* cancelled. For details on when this is called, see {@link #collectOneValue}.
*/
private void collectValueFromNonCancelledFuture(int index, Future<? extends InputT> future) {
try {
// We get the result, even if collectOneValue is a no-op, so that we can fail fast.
collectOneValue(index, getDone(future));
} catch (ExecutionException e) {
handleException(e.getCause());
} catch (Throwable t) {
handleException(t);
}
}
private void decrementCountAndMaybeComplete(
@Nullable
ImmutableCollection<? extends Future<? extends InputT>>
futuresIfNeedToCollectAtCompletion) {
int newRemaining = decrementRemainingAndGet();
checkState(newRemaining >= 0, "Less than 0 remaining futures");
if (newRemaining == 0) {
processCompleted(futuresIfNeedToCollectAtCompletion);
}
}
private void processCompleted(
@Nullable
ImmutableCollection<? extends Future<? extends InputT>>
futuresIfNeedToCollectAtCompletion) {
if (futuresIfNeedToCollectAtCompletion != null) {
int i = 0;
for (Future<? extends InputT> future : futuresIfNeedToCollectAtCompletion) {
if (!future.isCancelled()) {
collectValueFromNonCancelledFuture(i, future);
}
i++;
}
}
clearSeenExceptions();
handleAllCompleted();
/*
* Null out fields, including some used in handleAllCompleted() above (like
* `CollectionFuture.values`). This might be a no-op: If this future completed during
* handleAllCompleted(), they will already have been nulled out. But in the case of
* whenAll*().call*(), this future may be pending until the callback runs -- or even longer in
* the case of callAsync(), which waits for the callback's returned future to complete.
*/
releaseResources(ALL_INPUT_FUTURES_PROCESSED);
}
/**
* Clears fields that are no longer needed after this future has completed -- or at least all its
* inputs have completed (more precisely, after {@link #handleAllCompleted()} has been called).
* Often called multiple times (that is, both when the inputs complete and when the output
* completes).
*
* <p>This is similar to our proposed {@code afterCommit} method but not quite the same. See the
* description of CL 265462958.
*/
// TODO(user): Write more tests for memory retention.
@ForOverride
@OverridingMethodsMustInvokeSuper
void releaseResources(ReleaseResourcesReason reason) {
checkNotNull(reason);
/*
* All elements of `futures` are completed, or this future has already completed and read
* `futures` into a local variable (in preparation for propagating cancellation to them). In
* either case, no one needs to read `futures` for cancellation purposes later. (And
* cancellation purposes are the main reason to access `futures`, as discussed in its docs.)
*/
this.futures = null;
}
enum ReleaseResourcesReason {
OUTPUT_FUTURE_DONE,
ALL_INPUT_FUTURES_PROCESSED,
}
/**
* If {@code allMustSucceed} is true, called as each future completes; otherwise, if {@code
* collectsValues} is true, called for each future when all futures complete.
*/
abstract void collectOneValue(int index, @Nullable InputT returnValue);
abstract void handleAllCompleted();
/** Adds the chain to the seen set, and returns whether all the chain was new to us. */
private static boolean addCausalChain(Set<Throwable> seen, Throwable t) {
for (; t != null; t = t.getCause()) {
boolean firstTimeSeen = seen.add(t);
if (!firstTimeSeen) {
/*
* We've seen this, so we've seen its causes, too. No need to re-add them. (There's one case
* where this isn't true, but we ignore it: If we record an exception, then someone calls
* initCause() on it, and then we examine it again, we'll conclude that we've seen the whole
* chain before when it fact we haven't. But this should be rare.)
*/
return false;
}
}
return true;
}
}