blob: 4b908dc2cc2e08e0423f44f9a1c30ed9aca2386b [file] [log] [blame]
/*
* Copyright (C) 2006 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.common.util.concurrent;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.util.concurrent.Futures.getDone;
import static com.google.common.util.concurrent.MoreExecutors.rejectionPropagatingExecutor;
import com.google.common.annotations.GwtCompatible;
import com.google.common.base.Function;
import com.google.errorprone.annotations.ForOverride;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import org.checkerframework.checker.nullness.compatqual.NullableDecl;
/** Implementations of {@code Futures.transform*}. */
@GwtCompatible
abstract class AbstractTransformFuture<I, O, F, T> extends FluentFuture.TrustedFuture<O>
implements Runnable {
static <I, O> ListenableFuture<O> create(
ListenableFuture<I> input,
AsyncFunction<? super I, ? extends O> function,
Executor executor) {
checkNotNull(executor);
AsyncTransformFuture<I, O> output = new AsyncTransformFuture<>(input, function);
input.addListener(output, rejectionPropagatingExecutor(executor, output));
return output;
}
static <I, O> ListenableFuture<O> create(
ListenableFuture<I> input, Function<? super I, ? extends O> function, Executor executor) {
checkNotNull(function);
TransformFuture<I, O> output = new TransformFuture<>(input, function);
input.addListener(output, rejectionPropagatingExecutor(executor, output));
return output;
}
/*
* In certain circumstances, this field might theoretically not be visible to an afterDone() call
* triggered by cancel(). For details, see the comments on the fields of TimeoutFuture.
*/
@NullableDecl ListenableFuture<? extends I> inputFuture;
@NullableDecl F function;
AbstractTransformFuture(ListenableFuture<? extends I> inputFuture, F function) {
this.inputFuture = checkNotNull(inputFuture);
this.function = checkNotNull(function);
}
@Override
public final void run() {
ListenableFuture<? extends I> localInputFuture = inputFuture;
F localFunction = function;
if (isCancelled() | localInputFuture == null | localFunction == null) {
return;
}
inputFuture = null;
if (localInputFuture.isCancelled()) {
@SuppressWarnings("unchecked")
boolean unused =
setFuture((ListenableFuture<O>) localInputFuture); // Respects cancellation cause setting
return;
}
/*
* Any of the setException() calls below can fail if the output Future is cancelled between now
* and then. This means that we're silently swallowing an exception -- maybe even an Error. But
* this is no worse than what FutureTask does in that situation. Additionally, because the
* Future was cancelled, its listeners have been run, so its consumers will not hang.
*
* Contrast this to the situation we have if setResult() throws, a situation described below.
*/
I sourceResult;
try {
sourceResult = getDone(localInputFuture);
} catch (CancellationException e) {
// TODO(user): verify future behavior - unify logic with getFutureValue in AbstractFuture. This
// code should be unreachable with correctly implemented Futures.
// Cancel this future and return.
// At this point, inputFuture is cancelled and outputFuture doesn't exist, so the value of
// mayInterruptIfRunning is irrelevant.
cancel(false);
return;
} catch (ExecutionException e) {
// Set the cause of the exception as this future's exception.
setException(e.getCause());
return;
} catch (RuntimeException e) {
// Bug in inputFuture.get(). Propagate to the output Future so that its consumers don't hang.
setException(e);
return;
} catch (Error e) {
/*
* StackOverflowError, OutOfMemoryError (e.g., from allocating ExecutionException), or
* something. Try to treat it like a RuntimeException. If we overflow the stack again, the
* resulting Error will propagate upward up to the root call to set().
*/
setException(e);
return;
}
T transformResult;
try {
transformResult = doTransform(localFunction, sourceResult);
} catch (Throwable t) {
// This exception is irrelevant in this thread, but useful for the client.
setException(t);
return;
} finally {
function = null;
}
/*
* If set()/setValue() throws an Error, we let it propagate. Why? The most likely Error is a
* StackOverflowError (from deep transform(..., directExecutor()) nesting), and calling
* setException(stackOverflowError) would fail:
*
* - If the stack overflowed before set()/setValue() could even store the result in the output
* Future, then a call setException() would likely also overflow.
*
* - If the stack overflowed after set()/setValue() stored its result, then a call to
* setException() will be a no-op because the Future is already done.
*
* Both scenarios are bad: The output Future might never complete, or, if it does complete, it
* might not run some of its listeners. The likely result is that the app will hang. (And of
* course stack overflows are bad news in general. For example, we may have overflowed in the
* middle of defining a class. If so, that class will never be loadable in this process.) The
* best we can do (since logging may overflow the stack) is to let the error propagate. Because
* it is an Error, it won't be caught and logged by AbstractFuture.executeListener. Instead, it
* can propagate through many layers of AbstractTransformFuture up to the root call to set().
*
* https://github.com/google/guava/issues/2254
*
* Other kinds of Errors are possible:
*
* - OutOfMemoryError from allocations in setFuture(): The calculus here is similar to
* StackOverflowError: We can't reliably call setException(error).
*
* - Any kind of Error from a listener. Even if we could distinguish that case (by exposing some
* extra state from AbstractFuture), our options are limited: A call to setException() would be
* a no-op. We could log, but if that's what we really want, we should modify
* AbstractFuture.executeListener to do so, since that method would have the ability to continue
* to execute other listeners.
*
* What about RuntimeException? If there is a bug in set()/setValue() that produces one, it will
* propagate, too, but only as far as AbstractFuture.executeListener, which will catch and log
* it.
*/
setResult(transformResult);
}
/** Template method for subtypes to actually run the transform. */
@ForOverride
@NullableDecl
abstract T doTransform(F function, @NullableDecl I result) throws Exception;
/** Template method for subtypes to actually set the result. */
@ForOverride
abstract void setResult(@NullableDecl T result);
@Override
protected final void afterDone() {
maybePropagateCancellationTo(inputFuture);
this.inputFuture = null;
this.function = null;
}
@Override
protected String pendingToString() {
ListenableFuture<? extends I> localInputFuture = inputFuture;
F localFunction = function;
String superString = super.pendingToString();
String resultString = "";
if (localInputFuture != null) {
resultString = "inputFuture=[" + localInputFuture + "], ";
}
if (localFunction != null) {
return resultString + "function=[" + localFunction + "]";
} else if (superString != null) {
return resultString + superString;
}
return null;
}
/**
* An {@link AbstractTransformFuture} that delegates to an {@link AsyncFunction} and {@link
* #setFuture(ListenableFuture)}.
*/
private static final class AsyncTransformFuture<I, O>
extends AbstractTransformFuture<
I, O, AsyncFunction<? super I, ? extends O>, ListenableFuture<? extends O>> {
AsyncTransformFuture(
ListenableFuture<? extends I> inputFuture, AsyncFunction<? super I, ? extends O> function) {
super(inputFuture, function);
}
@Override
ListenableFuture<? extends O> doTransform(
AsyncFunction<? super I, ? extends O> function, @NullableDecl I input) throws Exception {
ListenableFuture<? extends O> outputFuture = function.apply(input);
checkNotNull(
outputFuture,
"AsyncFunction.apply returned null instead of a Future. "
+ "Did you mean to return immediateFuture(null)? %s",
function);
return outputFuture;
}
@Override
void setResult(ListenableFuture<? extends O> result) {
setFuture(result);
}
}
/**
* An {@link AbstractTransformFuture} that delegates to a {@link Function} and {@link
* #set(Object)}.
*/
private static final class TransformFuture<I, O>
extends AbstractTransformFuture<I, O, Function<? super I, ? extends O>, O> {
TransformFuture(
ListenableFuture<? extends I> inputFuture, Function<? super I, ? extends O> function) {
super(inputFuture, function);
}
@Override
@NullableDecl
O doTransform(Function<? super I, ? extends O> function, @NullableDecl I input) {
return function.apply(input);
}
@Override
void setResult(@NullableDecl O result) {
set(result);
}
}
}