blob: 2e5b5c517486303ee0e43d2fb468b465b9b1818a [file] [log] [blame]
/*
* Copyright (C) 2017 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.common.util.concurrent;
import static com.google.common.base.Functions.constant;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Lists.asList;
import static com.google.common.util.concurrent.ClosingFuture.State.CLOSED;
import static com.google.common.util.concurrent.ClosingFuture.State.CLOSING;
import static com.google.common.util.concurrent.ClosingFuture.State.OPEN;
import static com.google.common.util.concurrent.ClosingFuture.State.SUBSUMED;
import static com.google.common.util.concurrent.ClosingFuture.State.WILL_CLOSE;
import static com.google.common.util.concurrent.ClosingFuture.State.WILL_CREATE_VALUE_AND_CLOSER;
import static com.google.common.util.concurrent.Futures.getDone;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static java.util.logging.Level.FINER;
import static java.util.logging.Level.SEVERE;
import static java.util.logging.Level.WARNING;
import com.google.common.annotations.Beta;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ClosingFuture.Combiner.AsyncCombiningCallable;
import com.google.common.util.concurrent.ClosingFuture.Combiner.CombiningCallable;
import com.google.common.util.concurrent.Futures.FutureCombiner;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import com.google.errorprone.annotations.DoNotMock;
import com.google.j2objc.annotations.RetainedWith;
import java.io.Closeable;
import java.io.IOException;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;
import org.checkerframework.checker.nullness.compatqual.NullableDecl;
/**
* A step in a pipeline of an asynchronous computation. When the last step in the computation is
* complete, some objects captured during the computation are closed.
*
* <p>A pipeline of {@code ClosingFuture}s is a tree of steps. Each step represents either an
* asynchronously-computed intermediate value, or else an exception that indicates the failure or
* cancellation of the operation so far. The only way to extract the value or exception from a step
* is by declaring that step to be the last step of the pipeline. Nevertheless, we refer to the
* "value" of a successful step or the "result" (value or exception) of any step.
*
* <ol>
* <li>A pipeline starts at its leaf step (or steps), which is created from either a callable
* block or a {@link ListenableFuture}.
* <li>Each other step is derived from one or more input steps. At each step, zero or more objects
* can be captured for later closing.
* <li>There is one last step (the root of the tree), from which you can extract the final result
* of the computation. After that result is available (or the computation fails), all objects
* captured by any of the steps in the pipeline are closed.
* </ol>
*
* <h3>Starting a pipeline</h3>
*
* Start a {@code ClosingFuture} pipeline {@linkplain #submit(ClosingCallable, Executor) from a
* callable block} that may capture objects for later closing. To start a pipeline from a {@link
* ListenableFuture} that doesn't create resources that should be closed later, you can use {@link
* #from(ListenableFuture)} instead.
*
* <h3>Derived steps</h3>
*
* A {@code ClosingFuture} step can be derived from one or more input {@code ClosingFuture} steps in
* ways similar to {@link FluentFuture}s:
*
* <ul>
* <li>by transforming the value from a successful input step,
* <li>by catching the exception from a failed input step, or
* <li>by combining the results of several input steps.
* </ul>
*
* Each derivation can capture the next value or any intermediate objects for later closing.
*
* <p>A step can be the input to at most one derived step. Once you transform its value, catch its
* exception, or combine it with others, you cannot do anything else with it, including declare it
* to be the last step of the pipeline.
*
* <h4>Transforming</h4>
*
* To derive the next step by asynchronously applying a function to an input step's value, call
* {@link #transform(ClosingFunction, Executor)} or {@link #transformAsync(AsyncClosingFunction,
* Executor)} on the input step.
*
* <h4>Catching</h4>
*
* To derive the next step from a failed input step, call {@link #catching(Class, ClosingFunction,
* Executor)} or {@link #catchingAsync(Class, AsyncClosingFunction, Executor)} on the input step.
*
* <h4>Combining</h4>
*
* To derive a {@code ClosingFuture} from two or more input steps, pass the input steps to {@link
* #whenAllComplete(Iterable)}, {@link #whenAllSucceed(Iterable)}, or one of their overloads.
*
* <h3>Cancelling</h3>
*
* Any step in a pipeline can be {@linkplain #cancel(boolean) cancelled}, even after another step
* has been derived, with the same semantics as cancelling a {@link Future}. In addition, a
* successfully cancelled step will immediately start closing all objects captured for later closing
* by it and by its input steps.
*
* <h3>Ending a pipeline</h3>
*
* Each {@code ClosingFuture} pipeline must be ended. To end a pipeline, decide whether you want to
* close the captured objects automatically or manually.
*
* <h4>Automatically closing</h4>
*
* You can extract a {@link Future} that represents the result of the last step in the pipeline by
* calling {@link #finishToFuture()}. When that final {@link Future} is done, all objects captured
* by all steps in the pipeline will be closed.
*
* <pre>{@code
* FluentFuture<UserName> userName =
* ClosingFuture.submit(
* closer -> closer.eventuallyClose(database.newTransaction(), closingExecutor),
* executor)
* .transformAsync((closer, transaction) -> transaction.queryClosingFuture("..."), executor)
* .transform((closer, result) -> result.get("userName"), directExecutor())
* .catching(DBException.class, (closer, e) -> "no user", directExecutor())
* .finishToFuture();
* }</pre>
*
* In this example, when the {@code userName} {@link Future} is done, the transaction and the query
* result cursor will both be closed, even if the operation is cancelled or fails.
*
* <h4>Manually closing</h4>
*
* If you want to close the captured objects manually, after you've used the final result, call
* {@link #finishToValueAndCloser(ValueAndCloserConsumer, Executor)} to get an object that holds the
* final result. You then call {@link ValueAndCloser#closeAsync()} to close the captured objects.
*
* <pre>{@code
* ClosingFuture.submit(
* closer -> closer.eventuallyClose(database.newTransaction(), closingExecutor),
* executor)
* .transformAsync((closer, transaction) -> transaction.queryClosingFuture("..."), executor)
* .transform((closer, result) -> result.get("userName"), directExecutor())
* .catching(DBException.class, (closer, e) -> "no user", directExecutor())
* .finishToValueAndCloser(
* valueAndCloser -> this.userNameValueAndCloser = valueAndCloser, executor);
*
* // later
* try { // get() will throw if the operation failed or was cancelled.
* UserName userName = userNameValueAndCloser.get();
* // do something with userName
* } finally {
* userNameValueAndCloser.closeAsync();
* }
* }</pre>
*
* In this example, when {@code userNameValueAndCloser.closeAsync()} is called, the transaction and
* the query result cursor will both be closed, even if the operation is cancelled or fails.
*
* <p>Note that if you don't call {@code closeAsync()}, the captured objects will not be closed. The
* automatic-closing approach described above is safer.
*
* @param <V> the type of the value of this step
* @since 30.0
*/
// TODO(dpb): Consider reusing one CloseableList for the entire pipeline, modulo combinations.
@Beta // @Beta for one release.
@DoNotMock("Use ClosingFuture.from(Futures.immediate*Future)")
// TODO(dpb): GWT compatibility.
public final class ClosingFuture<V> {
// Logger shared by all instances, named after this class.
private static final Logger logger = Logger.getLogger(ClosingFuture.class.getName());
/**
* An object that can capture objects to be closed later, when a {@link ClosingFuture} pipeline is
* done.
*/
public static final class DeferredCloser {
  /** Receives every captured object together with the executor it should be closed on. */
  @RetainedWith private final CloseableList list;

  DeferredCloser(CloseableList list) {
    this.list = list;
  }

  /**
   * Registers an object so that it is closed when a {@link ClosingFuture} pipeline is done.
   *
   * <p>For users of the {@code -jre} flavor of Guava, the object can be any {@code
   * AutoCloseable}. For users of the {@code -android} flavor, the object must be a {@code
   * Closeable}. (For more about the flavors, see <a
   * href="https://github.com/google/guava#adding-guava-to-your-build">Adding Guava to your
   * build</a>.)
   *
   * <p>Be careful when targeting an older SDK than you are building against (most commonly when
   * building for Android): Ensure that any object you pass implements the interface not just in
   * your current SDK version but also at the oldest version you support. For example, <a
   * href="https://developer.android.com/sdk/api_diff/16/">API Level 16</a> is the first version
   * in which {@code Cursor} is {@code Closeable}. To support older versions, pass a wrapper
   * {@code Closeable} with a method reference like {@code cursor::close}.
   *
   * <p>Note that this method is still binary-compatible between flavors because the erasure of
   * its parameter type is {@code Object}, not {@code AutoCloseable} or {@code Closeable}.
   *
   * <p>A {@code null} {@code closeable} is not registered; it is simply returned.
   *
   * @param closeable the object to be closed (see notes above); may be {@code null}
   * @param closingExecutor the object will be closed on this executor; must not be {@code null}
   * @return the first argument
   */
  @CanIgnoreReturnValue
  @NullableDecl
  // TODO(b/163345357): Widen bound to AutoCloseable once we require API Level 19.
  public <C extends Object & Closeable> C eventuallyClose(
      @NullableDecl C closeable, Executor closingExecutor) {
    // The executor is validated even when there turns out to be nothing to register.
    checkNotNull(closingExecutor);
    if (closeable == null) {
      return null;
    }
    list.add(closeable, closingExecutor);
    return closeable;
  }
}
/**
* An operation that computes a result.
*
* @param <V> the type of the result
*/
public interface ClosingCallable<V extends Object> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor)
* closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but
* not before this method completes), even if this method throws or the pipeline is cancelled.
*
* @param closer captures objects to be closed when the pipeline is done
* @return the computed result, which may be {@code null}
* @throws Exception if unable to compute a result
*/
@NullableDecl
V call(DeferredCloser closer) throws Exception;
}
/**
* A function from an input to a result.
*
* @param <T> the type of the input to the function
* @param <U> the type of the result of the function
*/
public interface ClosingFunction<T extends Object, U extends Object> {
/**
* Applies this function to an input, or throws an exception if unable to do so.
*
* <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor)
* closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but
* not before this method completes), even if this method throws or the pipeline is cancelled.
*
* @param closer captures objects to be closed when the pipeline is done
* @param input the input to the function, which may be {@code null}
* @return the result of the function, which may be {@code null}
* @throws Exception if unable to produce a result
*/
@NullableDecl
U apply(DeferredCloser closer, @NullableDecl T input) throws Exception;
}
/**
* A function from an input to a {@link ClosingFuture} of a result.
*
* @param <T> the type of the input to the function
* @param <U> the type of the result of the function
*/
public interface AsyncClosingFunction<T extends Object, U extends Object> {
/**
* Applies this function to an input, or throws an exception if unable to do so.
*
* <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor)
* closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but
* not before this method completes), even if this method throws or the pipeline is cancelled.
*
* @param closer captures objects to be closed when the pipeline is done
* @param input the input to the function, which may be {@code null}
* @return a {@link ClosingFuture} of the result
* @throws Exception if unable to produce a result
*/
ClosingFuture<U> apply(DeferredCloser closer, @NullableDecl T input) throws Exception;
}
/**
* An object that holds the final result of an asynchronous {@link ClosingFuture} operation and
* allows the user to close all the closeable objects that were captured during it for later
* closing.
*
* <p>The asynchronous operation will have completed before this object is created.
*
* @param <V> the type of the value of a successful operation
* @see ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)
*/
public static final class ValueAndCloser<V> {
  /** The last step of the pipeline whose result and captured objects this object exposes. */
  private final ClosingFuture<? extends V> closingFuture;

  ValueAndCloser(ClosingFuture<? extends V> closingFuture) {
    checkNotNull(closingFuture);
    this.closingFuture = closingFuture;
  }

  /**
   * Returns the final value of the associated {@link ClosingFuture}, or throws an exception as
   * {@link Future#get()} would.
   *
   * <p>The asynchronous operation has already completed by the time this object exists, so this
   * call is synchronous and returns immediately.
   *
   * @return the value of the successful operation, which may be {@code null}
   * @throws CancellationException if the computation was cancelled
   * @throws ExecutionException if the computation threw an exception
   */
  @NullableDecl
  public V get() throws ExecutionException {
    return getDone(closingFuture.future);
  }

  /**
   * Starts closing all closeable objects captured during the {@link ClosingFuture}'s asynchronous
   * operation on the {@link Executor}s specified by calls to {@link
   * DeferredCloser#eventuallyClose(Closeable, Executor)}.
   *
   * <p>If any such calls specified {@link MoreExecutors#directExecutor()}, those objects will be
   * closed synchronously.
   *
   * <p>Idempotent: objects will be closed at most once.
   */
  public void closeAsync() {
    closingFuture.close();
  }
}
/**
* Represents an operation that accepts a {@link ValueAndCloser} for the last step in a {@link
* ClosingFuture} pipeline.
*
* @param <V> the type of the final value of a successful pipeline
* @see ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)
*/
public interface ValueAndCloserConsumer<V> {
/**
* Accepts a {@link ValueAndCloser} for the last step in a {@link ClosingFuture} pipeline.
*
* @param valueAndCloser holds the final result of the pipeline and allows its captured objects
* to be closed
*/
void accept(ValueAndCloser<V> valueAndCloser);
}
/**
* Starts a {@link ClosingFuture} pipeline by submitting a callable block to an executor.
*
* @throws java.util.concurrent.RejectedExecutionException if the task cannot be scheduled for
* execution
*/
public static <V> ClosingFuture<V> submit(ClosingCallable<V> callable, Executor executor) {
  // Constructing the step is what hands the callable to the executor.
  ClosingFuture<V> firstStep = new ClosingFuture<>(callable, executor);
  return firstStep;
}
// TODO(dpb, cpovirk): Do we need submitAsync?
/**
* Starts a {@link ClosingFuture} pipeline with a {@link ListenableFuture}.
*
* <p>{@code future}'s value will not be closed when the pipeline is done even if {@code V}
* implements {@link Closeable}. In order to start a pipeline with a value that will be closed
* when the pipeline is done, use {@link #submit(ClosingCallable, Executor)} instead.
*/
public static <V> ClosingFuture<V> from(ListenableFuture<V> future) {
  // Nothing is captured for closing here; the step simply wraps the given future.
  ClosingFuture<V> firstStep = new ClosingFuture<>(future);
  return firstStep;
}
/**
* Starts a {@link ClosingFuture} pipeline with a {@link ListenableFuture}.
*
* <p>If {@code future} succeeds, its value will be closed (using {@code closingExecutor}) when
* the pipeline is done, even if the pipeline is canceled or fails.
*
* <p>Cancelling the pipeline will not cancel {@code future}, so that the pipeline can access its
* value in order to close it.
*
* @param future the future to create the {@code ClosingFuture} from. For discussion of the
* future's result type {@code C}, see {@link DeferredCloser#eventuallyClose(Closeable,
* Executor)}.
* @param closingExecutor the future's result will be closed on this executor
* @deprecated Creating {@link Future}s of closeable types is dangerous in general because the
* underlying value may never be closed if the {@link Future} is canceled after its operation
* begins. Consider replacing code that creates {@link ListenableFuture}s of closeable types,
* including those that pass them to this method, with {@link #submit(ClosingCallable,
* Executor)} in order to ensure that resources do not leak. Or, to start a pipeline with a
* {@link ListenableFuture} that doesn't create values that should be closed, use {@link
* ClosingFuture#from}.
*/
@Deprecated
// TODO(b/163345357): Widen bound to AutoCloseable once we require API Level 19.
public static <C extends Object & Closeable> ClosingFuture<C> eventuallyClosing(
    ListenableFuture<C> future, final Executor closingExecutor) {
  checkNotNull(closingExecutor);
  // The step wraps a non-cancellation-propagating view of the input future.
  final ClosingFuture<C> pipeline = new ClosingFuture<>(nonCancellationPropagating(future));
  FutureCallback<Closeable> captureResult =
      new FutureCallback<Closeable>() {
        @Override
        public void onFailure(Throwable t) {
          // Intentionally empty: only a successful result is registered for closing.
        }

        @Override
        public void onSuccess(@NullableDecl Closeable result) {
          pipeline.closeables.closer.eventuallyClose(result, closingExecutor);
        }
      };
  // The callback listens on the original future, not on the wrapped view.
  Futures.addCallback(future, captureResult, directExecutor());
  return pipeline;
}
/**
* Starts specifying how to combine {@link ClosingFuture}s into a single pipeline.
*
* @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
* the {@code futures}, or if any has already been {@linkplain #finishToFuture() finished}
*/
public static Combiner whenAllComplete(Iterable<? extends ClosingFuture<?>> futures) {
  // The flag is false here; whenAllSucceed(Iterable) passes true instead.
  Combiner combiner = new Combiner(false, futures);
  return combiner;
}
/**
* Starts specifying how to combine {@link ClosingFuture}s into a single pipeline.
*
* @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
* the arguments, or if any has already been {@linkplain #finishToFuture() finished}
*/
public static Combiner whenAllComplete(
    ClosingFuture<?> future1, ClosingFuture<?>... moreFutures) {
  // Present the first argument and the varargs as one sequence, then use the Iterable overload.
  Iterable<? extends ClosingFuture<?>> allFutures = asList(future1, moreFutures);
  return whenAllComplete(allFutures);
}
/**
* Starts specifying how to combine {@link ClosingFuture}s into a single pipeline, assuming they
* all succeed. If any fail, the resulting pipeline will fail.
*
* @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
* the {@code futures}, or if any has already been {@linkplain #finishToFuture() finished}
*/
public static Combiner whenAllSucceed(Iterable<? extends ClosingFuture<?>> futures) {
  // The flag is true here; whenAllComplete(Iterable) passes false instead.
  Combiner combiner = new Combiner(true, futures);
  return combiner;
}
/**
* Starts specifying how to combine two {@link ClosingFuture}s into a single pipeline, assuming
* they all succeed. If any fail, the resulting pipeline will fail.
*
* <p>Calling this method allows you to use lambdas or method references typed with the types of
* the input {@link ClosingFuture}s.
*
* @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
* the arguments, or if any has already been {@linkplain #finishToFuture() finished}
*/
public static <V1, V2> Combiner2<V1, V2> whenAllSucceed(
    ClosingFuture<V1> future1, ClosingFuture<V2> future2) {
  // The typed combiner keeps each input's value type.
  Combiner2<V1, V2> combiner = new Combiner2<>(future1, future2);
  return combiner;
}
/**
* Starts specifying how to combine three {@link ClosingFuture}s into a single pipeline, assuming
* they all succeed. If any fail, the resulting pipeline will fail.
*
* <p>Calling this method allows you to use lambdas or method references typed with the types of
* the input {@link ClosingFuture}s.
*
* @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
* the arguments, or if any has already been {@linkplain #finishToFuture() finished}
*/
public static <V1, V2, V3> Combiner3<V1, V2, V3> whenAllSucceed(
    ClosingFuture<V1> future1, ClosingFuture<V2> future2, ClosingFuture<V3> future3) {
  // The typed combiner keeps each input's value type.
  Combiner3<V1, V2, V3> combiner = new Combiner3<>(future1, future2, future3);
  return combiner;
}
/**
* Starts specifying how to combine four {@link ClosingFuture}s into a single pipeline, assuming
* they all succeed. If any fail, the resulting pipeline will fail.
*
* <p>Calling this method allows you to use lambdas or method references typed with the types of
* the input {@link ClosingFuture}s.
*
* @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
* the arguments, or if any has already been {@linkplain #finishToFuture() finished}
*/
public static <V1, V2, V3, V4> Combiner4<V1, V2, V3, V4> whenAllSucceed(
    ClosingFuture<V1> future1,
    ClosingFuture<V2> future2,
    ClosingFuture<V3> future3,
    ClosingFuture<V4> future4) {
  // The typed combiner keeps each input's value type.
  Combiner4<V1, V2, V3, V4> combiner = new Combiner4<>(future1, future2, future3, future4);
  return combiner;
}
/**
* Starts specifying how to combine five {@link ClosingFuture}s into a single pipeline, assuming
* they all succeed. If any fail, the resulting pipeline will fail.
*
* <p>Calling this method allows you to use lambdas or method references typed with the types of
* the input {@link ClosingFuture}s.
*
* @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
* the arguments, or if any has already been {@linkplain #finishToFuture() finished}
*/
public static <V1, V2, V3, V4, V5> Combiner5<V1, V2, V3, V4, V5> whenAllSucceed(
    ClosingFuture<V1> future1,
    ClosingFuture<V2> future2,
    ClosingFuture<V3> future3,
    ClosingFuture<V4> future4,
    ClosingFuture<V5> future5) {
  // The typed combiner keeps each input's value type.
  Combiner5<V1, V2, V3, V4, V5> combiner =
      new Combiner5<>(future1, future2, future3, future4, future5);
  return combiner;
}
/**
* Starts specifying how to combine {@link ClosingFuture}s into a single pipeline, assuming they
* all succeed. If any fail, the resulting pipeline will fail.
*
* @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
* the arguments, or if any has already been {@linkplain #finishToFuture() finished}
*/
public static Combiner whenAllSucceed(
    ClosingFuture<?> future1,
    ClosingFuture<?> future2,
    ClosingFuture<?> future3,
    ClosingFuture<?> future4,
    ClosingFuture<?> future5,
    ClosingFuture<?> future6,
    ClosingFuture<?>... moreFutures) {
  // Six or more inputs have no typed combiner, so they are gathered into one sequence and
  // handed to the Iterable overload.
  Iterable<? extends ClosingFuture<?>> allFutures =
      FluentIterable.of(future1, future2, future3, future4, future5, future6)
          .append(moreFutures);
  return whenAllSucceed(allFutures);
}
// Current state of this step; every step starts out OPEN.
private final AtomicReference<State> state = new AtomicReference<>(OPEN);
// Objects captured by this step for later closing are registered through closeables.closer.
private final CloseableList closeables = new CloseableList();
// The future that carries this step's value or exception.
private final FluentFuture<V> future;
private ClosingFuture(ListenableFuture<V> future) {
  // Stored as a FluentFuture, the declared type of the field.
  FluentFuture<V> fluent = FluentFuture.from(future);
  this.future = fluent;
}
private ClosingFuture(final ClosingCallable<V> callable, Executor executor) {
  checkNotNull(callable);
  // Adapts the ClosingCallable to a plain Callable by supplying this step's DeferredCloser.
  // toString() is forwarded so the adapter describes itself as the wrapped callable does.
  Callable<V> withCloser =
      new Callable<V>() {
        @Override
        public String toString() {
          return callable.toString();
        }

        @Override
        public V call() throws Exception {
          return callable.call(closeables.closer);
        }
      };
  TrustedListenableFutureTask<V> task = TrustedListenableFutureTask.create(withCloser);
  // The task is handed to the executor before the field is assigned, as in the original order.
  executor.execute(task);
  this.future = task;
}
/**
* Returns a future that finishes when this step does. Calling {@code get()} on the returned
* future returns {@code null} if the step is successful or throws the same exception that would
* be thrown by calling {@code finishToFuture().get()} if this were the last step. Calling {@code
* cancel()} on the returned future has no effect on the {@code ClosingFuture} pipeline.
*
* <p>{@code statusFuture} differs from most methods on {@code ClosingFuture}: You can make calls
* to {@code statusFuture} <i>in addition to</i> the call you make to {@link #finishToFuture()} or
* a derivation method <i>on the same instance</i>. This is important because calling {@code
* statusFuture} alone does not provide a way to close the pipeline.
*/
public ListenableFuture<?> statusFuture() {
  // First replace a successful value with null so the step's value is not exposed...
  ListenableFuture<?> nullOnSuccess = future.transform(constant(null), directExecutor());
  // ...then return a view whose cancellation does not propagate back to the pipeline.
  return nonCancellationPropagating(nullOnSuccess);
}
/**
* Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function
* to its value. The function can use a {@link DeferredCloser} to capture objects to be closed
* when the pipeline is done.
*
* <p>If this {@code ClosingFuture} fails, the function will not be called, and the derived {@code
* ClosingFuture} will be equivalent to this one.
*
* <p>If the function throws an exception, that exception is used as the result of the derived
* {@code ClosingFuture}.
*
* <p>Example usage:
*
* <pre>{@code
* ClosingFuture<List<Row>> rowsFuture =
* queryFuture.transform((closer, result) -> result.getRows(), executor);
* }</pre>
*
* <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
* the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings
* about heavyweight listeners are also applicable to heavyweight functions passed to this method.
*
* <p>After calling this method, you may not call {@link #finishToFuture()}, {@link
* #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on
* this {@code ClosingFuture}.
*
* @param function transforms the value of this step to the value of the derived step
* @param executor executor to run the function in
* @return the derived step
* @throws IllegalStateException if a {@code ClosingFuture} has already been derived from this
* one, or if this {@code ClosingFuture} has already been {@linkplain #finishToFuture()
* finished}
*/
public <U> ClosingFuture<U> transform(
    final ClosingFunction<? super V, U> function, Executor executor) {
  checkNotNull(function);
  // Adapter that applies the caller's function through this step's closeables. toString() is
  // forwarded so the adapter describes itself as the wrapped function does.
  AsyncFunction<V, U> closingAdapter =
      new AsyncFunction<V, U>() {
        @Override
        public String toString() {
          return function.toString();
        }

        @Override
        public ListenableFuture<U> apply(V value) throws Exception {
          return closeables.applyClosingFunction(function, value);
        }
      };
  // TODO(dpb): Switch to future.transformSync when that exists (passing a throwing function).
  return derive(future.transformAsync(closingAdapter, executor));
}
/**
* Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function
* that returns a {@code ClosingFuture} to its value. The function can use a {@link
* DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
* captured by the returned {@link ClosingFuture}).
*
* <p>If this {@code ClosingFuture} succeeds, the derived one will be equivalent to the one
* returned by the function.
*
* <p>If this {@code ClosingFuture} fails, the function will not be called, and the derived {@code
* ClosingFuture} will be equivalent to this one.
*
* <p>If the function throws an exception, that exception is used as the result of the derived
* {@code ClosingFuture}. But if the exception is thrown after the function creates a {@code
* ClosingFuture}, then none of the closeable objects in that {@code ClosingFuture} will be
* closed.
*
* <p>Usage guidelines for this method:
*
* <ul>
* <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
* {@code ClosingFuture}. If possible, prefer calling {@link #transform(ClosingFunction,
* Executor)} instead, with a function that returns the next value directly.
* <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) closer.eventuallyClose()}
* for every closeable object this step creates in order to capture it for later closing.
* <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
* ClosingFuture} call {@link #from(ListenableFuture)}.
* <li>In case this step doesn't create new closeables, you can adapt an API that returns a
* {@link ListenableFuture} to return a {@code ClosingFuture} by wrapping it with a call to
* {@link #withoutCloser(AsyncFunction)}
* </ul>
*
* <p>Example usage:
*
* <pre>{@code
* // Result.getRowsClosingFuture() returns a ClosingFuture.
* ClosingFuture<List<Row>> rowsFuture =
* queryFuture.transformAsync((closer, result) -> result.getRowsClosingFuture(), executor);
*
* // Result.writeRowsToOutputStreamFuture() returns a ListenableFuture that resolves to the
* // number of written rows. openOutputFile() returns a FileOutputStream (which implements
* // Closeable).
* ClosingFuture<Integer> rowsFuture2 =
* queryFuture.transformAsync(
* (closer, result) -> {
* FileOutputStream fos = closer.eventuallyClose(openOutputFile(), closingExecutor);
* return ClosingFuture.from(result.writeRowsToOutputStreamFuture(fos));
* },
* executor);
*
* // Result.getRowsFuture() returns a ListenableFuture (no new closeables are created).
* ClosingFuture<List<Row>> rowsFuture3 =
* queryFuture.transformAsync(withoutCloser(Result::getRowsFuture), executor);
*
* }</pre>
*
* <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
* the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings
* about heavyweight listeners are also applicable to heavyweight functions passed to this method.
* (Specifically, {@code directExecutor} functions should avoid heavyweight operations inside
* {@code AsyncClosingFunction.apply}. Any heavyweight operations should occur in other threads
* responsible for completing the returned {@code ClosingFuture}.)
*
* <p>After calling this method, you may not call {@link #finishToFuture()}, {@link
* #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on
* this {@code ClosingFuture}.
*
* @param function transforms the value of this step to a {@code ClosingFuture} with the value of
* the derived step
* @param executor executor to run the function in
* @return the derived step
* @throws IllegalStateException if a {@code ClosingFuture} has already been derived from this
* one, or if this {@code ClosingFuture} has already been {@linkplain #finishToFuture()
* finished}
*/
public <U> ClosingFuture<U> transformAsync(
    final AsyncClosingFunction<? super V, U> function, Executor executor) {
  checkNotNull(function);
  // Adapter that applies the caller's async function through this step's closeables.
  // toString() is forwarded so the adapter describes itself as the wrapped function does.
  AsyncFunction<V, U> closingAdapter =
      new AsyncFunction<V, U>() {
        @Override
        public String toString() {
          return function.toString();
        }

        @Override
        public ListenableFuture<U> apply(V value) throws Exception {
          return closeables.applyAsyncClosingFunction(function, value);
        }
      };
  return derive(future.transformAsync(closingAdapter, executor));
}
/**
* Returns an {@link AsyncClosingFunction} that applies an {@link AsyncFunction} to an input,
* ignoring the DeferredCloser and returning a {@code ClosingFuture} derived from the returned
* {@link ListenableFuture}.
*
* <p>Use this method to pass a transformation to {@link #transformAsync(AsyncClosingFunction,
* Executor)} or to {@link #catchingAsync(Class, AsyncClosingFunction, Executor)} as long as it
* meets these conditions:
*
* <ul>
* <li>It does not need to capture any {@link Closeable} objects by calling {@link
* DeferredCloser#eventuallyClose(Closeable, Executor)}.
* <li>It returns a {@link ListenableFuture}.
* </ul>
*
* <p>Example usage:
*
* <pre>{@code
* // Result.getRowsFuture() returns a ListenableFuture.
* ClosingFuture<List<Row>> rowsFuture =
* queryFuture.transformAsync(withoutCloser(Result::getRowsFuture), executor);
* }</pre>
*
* @param function transforms the value of a {@code ClosingFuture} step to a {@link
* ListenableFuture} with the value of a derived step
*/
public static <V, U> AsyncClosingFunction<V, U> withoutCloser(
    final AsyncFunction<V, U> function) {
  checkNotNull(function);
  AsyncClosingFunction<V, U> ignoringCloser =
      new AsyncClosingFunction<V, U>() {
        @Override
        public ClosingFuture<U> apply(DeferredCloser closer, V input) throws Exception {
          // The closer is deliberately unused: the wrapped function captures nothing.
          return ClosingFuture.from(function.apply(input));
        }
      };
  return ignoringCloser;
}
/**
* Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function
* to its exception if it is an instance of a given exception type. The function can use a {@link
* DeferredCloser} to capture objects to be closed when the pipeline is done.
*
* <p>If this {@code ClosingFuture} succeeds or fails with a different exception type, the
* function will not be called, and the derived {@code ClosingFuture} will be equivalent to this
* one.
*
* <p>If the function throws an exception, that exception is used as the result of the derived
* {@code ClosingFuture}.
*
* <p>Example usage:
*
* <pre>{@code
* ClosingFuture<QueryResult> queryFuture =
* queryFuture.catching(
* QueryException.class, (closer, x) -> Query.emptyQueryResult(), executor);
* }</pre>
*
* <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
* the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings
* about heavyweight listeners are also applicable to heavyweight functions passed to this method.
*
* <p>After calling this method, you may not call {@link #finishToFuture()}, {@link
* #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on
* this {@code ClosingFuture}.
*
* @param exceptionType the exception type that triggers use of {@code fallback}. The exception
* type is matched against this step's exception. "This step's exception" means the cause of
* the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future}
* underlying this step or, if {@code get()} throws a different kind of exception, that
* exception itself. To avoid hiding bugs and other unrecoverable errors, callers should
* prefer more specific types, avoiding {@code Throwable.class} in particular.
* @param fallback the function to be called if this step fails with the expected exception type.
* The function's argument is this step's exception. "This step's exception" means the cause
* of the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future}
* underlying this step or, if {@code get()} throws a different kind of exception, that
* exception itself.
* @param executor the executor that runs {@code fallback} if the input fails
*/
public <X extends Throwable> ClosingFuture<V> catching(
Class<X> exceptionType, ClosingFunction<? super X, ? extends V> fallback, Executor executor) {
// Delegate to a helper whose named type parameter (W extends V) stands in for the
// "? extends V" wildcard of the fallback's result type.
return catchingMoreGeneric(exceptionType, fallback, executor);
}
// Avoids generic type capture inconsistency problems where |? extends V| is incompatible with V.
private <X extends Throwable, W extends V> ClosingFuture<V> catchingMoreGeneric(
    Class<X> exceptionType, final ClosingFunction<? super X, W> fallback, Executor executor) {
  // Reject a null fallback immediately rather than when a failure is eventually handled.
  checkNotNull(fallback);
  // TODO(dpb): Switch to future.catchingSync when that exists (passing a throwing function).
  return derive(
      future.catchingAsync(
          exceptionType,
          // Adapter that applies the fallback through this step's closeables and reports the
          // user's fallback in toString().
          new AsyncFunction<X, W>() {
            @Override
            public String toString() {
              return fallback.toString();
            }

            @Override
            public ListenableFuture<W> apply(X exception) throws Exception {
              return closeables.applyClosingFunction(fallback, exception);
            }
          },
          executor));
}
/**
* Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function
* that returns a {@code ClosingFuture} to its exception if it is an instance of a given exception
* type. The function can use a {@link DeferredCloser} to capture objects to be closed when the
* pipeline is done (other than those captured by the returned {@link ClosingFuture}).
*
* <p>If this {@code ClosingFuture} fails with an exception of the given type, the derived {@code
* ClosingFuture} will be equivalent to the one returned by the function.
*
* <p>If this {@code ClosingFuture} succeeds or fails with a different exception type, the
* function will not be called, and the derived {@code ClosingFuture} will be equivalent to this
* one.
*
* <p>If the function throws an exception, that exception is used as the result of the derived
* {@code ClosingFuture}. But if the exception is thrown after the function creates a {@code
* ClosingFuture}, then none of the closeable objects in that {@code ClosingFuture} will be
* closed.
*
* <p>Usage guidelines for this method:
*
* <ul>
* <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
* {@code ClosingFuture}. If possible, prefer calling {@link #catching(Class,
* ClosingFunction, Executor)} instead, with a function that returns the next value
* directly.
* <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) closer.eventuallyClose()}
* for every closeable object this step creates in order to capture it for later closing.
* <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
* ClosingFuture} call {@link #from(ListenableFuture)}.
* <li>In case this step doesn't create new closeables, you can adapt an API that returns a
* {@link ListenableFuture} to return a {@code ClosingFuture} by wrapping it with a call to
* {@link #withoutCloser(AsyncFunction)}
* </ul>
*
* <p>Example usage:
*
* <pre>{@code
* // Fall back to a secondary input stream in case of IOException.
* ClosingFuture<InputStream> inputFuture =
* firstInputFuture.catchingAsync(
* IOException.class, (closer, x) -> secondaryInputStreamClosingFuture(), executor);
* }</pre>
*
* <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
* the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings
* about heavyweight listeners are also applicable to heavyweight functions passed to this method.
* (Specifically, {@code directExecutor} functions should avoid heavyweight operations inside
* {@code AsyncClosingFunction.apply}. Any heavyweight operations should occur in other threads
* responsible for completing the returned {@code ClosingFuture}.)
*
* <p>After calling this method, you may not call {@link #finishToFuture()}, {@link
* #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on
* this {@code ClosingFuture}.
*
* @param exceptionType the exception type that triggers use of {@code fallback}. The exception
* type is matched against this step's exception. "This step's exception" means the cause of
* the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future}
* underlying this step or, if {@code get()} throws a different kind of exception, that
* exception itself. To avoid hiding bugs and other unrecoverable errors, callers should
* prefer more specific types, avoiding {@code Throwable.class} in particular.
* @param fallback the function to be called if this step fails with the expected exception type.
* The function's argument is this step's exception. "This step's exception" means the cause
* of the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future}
* underlying this step or, if {@code get()} throws a different kind of exception, that
* exception itself.
* @param executor the executor that runs {@code fallback} if the input fails
*/
// TODO(dpb): Should this do something special if the function throws CancellationException or
// ExecutionException?
public <X extends Throwable> ClosingFuture<V> catchingAsync(
Class<X> exceptionType,
AsyncClosingFunction<? super X, ? extends V> fallback,
Executor executor) {
// Delegate to a helper whose named type parameter (W extends V) stands in for the
// "? extends V" wildcard of the fallback's result type.
return catchingAsyncMoreGeneric(exceptionType, fallback, executor);
}
// Avoids generic type capture inconsistency problems where |? extends V| is incompatible with V.
private <X extends Throwable, W extends V> ClosingFuture<V> catchingAsyncMoreGeneric(
    Class<X> exceptionType,
    final AsyncClosingFunction<? super X, W> fallback,
    Executor executor) {
  checkNotNull(fallback);
  // Adapter that applies the fallback through this step's closeables and reports the user's
  // fallback in toString().
  AsyncFunction<X, W> fallbackAdapter =
      new AsyncFunction<X, W>() {
        @Override
        public String toString() {
          return fallback.toString();
        }

        @Override
        public ListenableFuture<W> apply(X exception) throws Exception {
          return closeables.applyAsyncClosingFunction(fallback, exception);
        }
      };
  FluentFuture<V> recovered = future.catchingAsync(exceptionType, fallbackAdapter, executor);
  return derive(recovered);
}
/**
* Marks this step as the last step in the {@code ClosingFuture} pipeline. When the returned
* {@link Future} is done, all objects captured for closing during the pipeline's computation will
* be closed.
*
* <p>After calling this method, you may not call {@link
* #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, this method, or any other
* derivation method on this {@code ClosingFuture}.
*
* @return a {@link Future} that represents the final value or exception of the pipeline
*/
public FluentFuture<V> finishToFuture() {
  if (!compareAndUpdateState(OPEN, WILL_CLOSE)) {
    // This step was already consumed; explain how, based on the state it is in now.
    switch (state.get()) {
      case OPEN:
        throw new AssertionError();
      case WILL_CLOSE:
      case CLOSING:
      case CLOSED:
        throw new IllegalStateException("Cannot call finishToFuture() twice");
      case WILL_CREATE_VALUE_AND_CLOSER:
        throw new IllegalStateException(
            "Cannot call finishToFuture() after calling finishToValueAndCloser()");
      case SUBSUMED:
        throw new IllegalStateException(
            "Cannot call finishToFuture() after deriving another step");
    }
    return future;
  }

  logger.log(FINER, "will close {0}", this);
  // When the underlying future completes, walk the state machine through CLOSING to CLOSED
  // while closing everything captured by the pipeline.
  Runnable closeWhenDone =
      new Runnable() {
        @Override
        public void run() {
          checkAndUpdateState(WILL_CLOSE, CLOSING);
          close();
          checkAndUpdateState(CLOSING, CLOSED);
        }
      };
  future.addListener(closeWhenDone, directExecutor());
  return future;
}
/**
* Marks this step as the last step in the {@code ClosingFuture} pipeline. When this step is done,
* {@code consumer} will be called with an object that contains the result of the operation. The
* consumer can store the {@link ValueAndCloser} outside the callback for later synchronous use.
*
* <p>After calling this method, you may not call {@link #finishToFuture()}, this method again, or
* any other derivation method on this {@code ClosingFuture}.
*
* @param consumer a callback whose method will be called (using {@code executor}) when this
* operation is done
*/
public void finishToValueAndCloser(
    final ValueAndCloserConsumer<? super V> consumer, Executor executor) {
  checkNotNull(consumer);
  boolean claimed = compareAndUpdateState(OPEN, WILL_CREATE_VALUE_AND_CLOSER);
  if (!claimed) {
    // This step was already consumed; explain how, based on the state it is in now.
    switch (state.get()) {
      case OPEN:
        break;
      case WILL_CREATE_VALUE_AND_CLOSER:
        throw new IllegalStateException("Cannot call finishToValueAndCloser() twice");
      case WILL_CLOSE:
      case CLOSING:
      case CLOSED:
        throw new IllegalStateException(
            "Cannot call finishToValueAndCloser() after calling finishToFuture()");
      case SUBSUMED:
        throw new IllegalStateException(
            "Cannot call finishToValueAndCloser() after deriving another step");
    }
    throw new AssertionError(state);
  }

  // Hand the result to the consumer, on the given executor, once this step is done.
  Runnable deliverResult =
      new Runnable() {
        @Override
        public void run() {
          provideValueAndCloser(consumer, ClosingFuture.this);
        }
      };
  future.addListener(deliverResult, executor);
}
private static <C, V extends C> void provideValueAndCloser(
    ValueAndCloserConsumer<C> consumer, ClosingFuture<V> closingFuture) {
  // Wrap the finished step at the consumer's (possibly wider) value type, then hand it over.
  ValueAndCloser<C> valueAndCloser = new ValueAndCloser<C>(closingFuture);
  consumer.accept(valueAndCloser);
}
/**
* Attempts to cancel execution of this step. This attempt will fail if the step has already
* completed, has already been cancelled, or could not be cancelled for some other reason. If
* successful, and this step has not started when {@code cancel} is called, this step should never
* run.
*
* <p>If successful, causes the objects captured by this step (if already started) and its input
* step(s) for later closing to be closed on their respective {@link Executor}s. If any such calls
* specified {@link MoreExecutors#directExecutor()}, those objects will be closed synchronously.
*
* @param mayInterruptIfRunning {@code true} if the thread executing this task should be
* interrupted; otherwise, in-progress tasks are allowed to complete, but the step will be
* cancelled regardless
* @return {@code false} if the step could not be cancelled, typically because it has already
* completed normally; {@code true} otherwise
*/
@CanIgnoreReturnValue
public boolean cancel(boolean mayInterruptIfRunning) {
  logger.log(FINER, "cancelling {0}", this);
  if (!future.cancel(mayInterruptIfRunning)) {
    // Nothing was cancelled, so captured objects are left for the normal close path.
    return false;
  }
  close();
  return true;
}
// Logs at FINER and then closes this step's captured closeables via its CloseableList.
private void close() {
logger.log(FINER, "closing {0}", this);
closeables.close();
}
private <U> ClosingFuture<U> derive(FluentFuture<U> future) {
  // Note: the parameter shadows this step's own "future" field; the new step wraps the argument.
  ClosingFuture<U> nextStep = new ClosingFuture<U>(future);
  // Hand this step's closeables to the new step and mark this step as no longer usable.
  this.becomeSubsumedInto(nextStep.closeables);
  return nextStep;
}
// Transitions this step from OPEN to SUBSUMED and registers this step's closeables with
// otherCloseables (using directExecutor()).
// NOTE(review): checkAndUpdateState is defined outside this view; presumably it fails if this
// step is not OPEN, which is what enforces "at most one derivation" — confirm.
private void becomeSubsumedInto(CloseableList otherCloseables) {
checkAndUpdateState(OPEN, SUBSUMED);
otherCloseables.add(closeables, directExecutor());
}
/**
* An object that can return the value of the {@link ClosingFuture}s that are passed to {@link
* #whenAllComplete(Iterable)} or {@link #whenAllSucceed(Iterable)}.
*
* <p>Only for use by a {@link CombiningCallable} or {@link AsyncCombiningCallable} object.
*/
public static final class Peeker {
// The input steps whose values may be read through this peeker.
private final ImmutableList<ClosingFuture<?>> futures;
// True only while call()/callAsync() is running the combiner; gates getDone().
private volatile boolean beingCalled;
private Peeker(ImmutableList<ClosingFuture<?>> futures) {
this.futures = checkNotNull(futures);
}
/**
* Returns the value of {@code closingFuture}.
*
* @param closingFuture one of the input steps this peeker was created for
* @return the value of {@code closingFuture}'s underlying future
* @throws ExecutionException if {@code closingFuture} is a failed step
* @throws CancellationException if the {@code closingFuture}'s future was cancelled
* @throws IllegalArgumentException if {@code closingFuture} is not one of the futures passed to
* {@link #whenAllComplete(Iterable)} or {@link #whenAllSucceed(Iterable)}
* @throws IllegalStateException if called outside of a call to {@link
* CombiningCallable#call(DeferredCloser, Peeker)} or {@link
* AsyncCombiningCallable#call(DeferredCloser, Peeker)}
*/
@NullableDecl
public final <D extends Object> D getDone(ClosingFuture<D> closingFuture)
throws ExecutionException {
checkState(beingCalled);
checkArgument(futures.contains(closingFuture));
return Futures.getDone(closingFuture.future);
}
// Runs the synchronous combiner with a fresh CloseableList. Whether the combiner returns or
// throws, the fresh list is added to the given closeables and the beingCalled flag is cleared.
@NullableDecl
private <V extends Object> V call(CombiningCallable<V> combiner, CloseableList closeables)
throws Exception {
beingCalled = true;
CloseableList newCloseables = new CloseableList();
try {
return combiner.call(newCloseables.closer, this);
} finally {
// Runs on both success and failure, so anything captured by the combiner is still tracked.
closeables.add(newCloseables, directExecutor());
beingCalled = false;
}
}
// Runs the asynchronous combiner with a fresh CloseableList, subsumes the ClosingFuture it
// returns into the given closeables, and returns that step's underlying future. The finally
// block mirrors call(): it always registers the fresh list and clears beingCalled.
private <V extends Object> FluentFuture<V> callAsync(
AsyncCombiningCallable<V> combiner, CloseableList closeables) throws Exception {
beingCalled = true;
CloseableList newCloseables = new CloseableList();
try {
ClosingFuture<V> closingFuture = combiner.call(newCloseables.closer, this);
closingFuture.becomeSubsumedInto(closeables);
return closingFuture.future;
} finally {
closeables.add(newCloseables, directExecutor());
beingCalled = false;
}
}
}
/**
* A builder of a {@link ClosingFuture} step that is derived from more than one input step.
*
* <p>See {@link #whenAllComplete(Iterable)} and {@link #whenAllSucceed(Iterable)} for how to
* instantiate this class.
*
* <p>Example:
*
* <pre>{@code
* final ClosingFuture<BufferedReader> file1ReaderFuture = ...;
* final ClosingFuture<BufferedReader> file2ReaderFuture = ...;
* ListenableFuture<Integer> numberOfDifferentLines =
* ClosingFuture.whenAllSucceed(file1ReaderFuture, file2ReaderFuture)
* .call(
* (closer, peeker) -> {
* BufferedReader file1Reader = peeker.getDone(file1ReaderFuture);
* BufferedReader file2Reader = peeker.getDone(file2ReaderFuture);
* return countDifferentLines(file1Reader, file2Reader);
* },
* executor)
* .finishToFuture();
* }</pre>
*/
// TODO(cpovirk): Use simple name instead of fully qualified after we stop building with JDK 8.
@com.google.errorprone.annotations.DoNotMock(
"Use ClosingFuture.whenAllSucceed() or .whenAllComplete() instead.")
public static class Combiner {
private final CloseableList closeables = new CloseableList();
/**
* An operation that returns a result and may throw an exception.
*
* @param <V> the type of the result
*/
public interface CombiningCallable<V extends Object> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
* Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
* is done (but not before this method completes), even if this method throws or the pipeline
* is cancelled.
*
* @param closer used to capture objects to be closed when the pipeline is done
* @param peeker used to get the value of any of the input futures
* @return the computed result
* @throws Exception if unable to compute a result
*/
@NullableDecl
V call(DeferredCloser closer, Peeker peeker) throws Exception;
}
/**
* An operation that returns a {@link ClosingFuture} result and may throw an exception.
*
* @param <V> the type of the result
*/
public interface AsyncCombiningCallable<V extends Object> {
/**
* Computes a {@link ClosingFuture} result, or throws an exception if unable to do so.
*
* <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
* Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
* is done (but not before this method completes), even if this method throws or the pipeline
* is cancelled.
*
* @param closer used to capture objects to be closed when the pipeline is done
* @param peeker used to get the value of any of the input futures
* @return a {@link ClosingFuture} that will provide the result
* @throws Exception if unable to compute a result
*/
ClosingFuture<V> call(DeferredCloser closer, Peeker peeker) throws Exception;
}
private final boolean allMustSucceed;
protected final ImmutableList<ClosingFuture<?>> inputs;
/**
 * Creates a combiner over the given input steps and subsumes each of them, so that their
 * captured closeables are tracked by this combiner's {@code closeables}.
 *
 * @param allMustSucceed whether the combined step is built with {@code Futures.whenAllSucceed}
 *     (true) or {@code Futures.whenAllComplete} (false)
 * @param inputs the input steps; copied once into an immutable list
 */
private Combiner(boolean allMustSucceed, Iterable<? extends ClosingFuture<?>> inputs) {
  this.allMustSucceed = allMustSucceed;
  this.inputs = ImmutableList.copyOf(inputs);
  // Iterate the immutable snapshot rather than the caller's Iterable: a one-shot or concurrently
  // modified Iterable could otherwise yield a different set of steps on the second pass, leaving
  // some stored inputs un-subsumed (or subsuming steps that were never stored).
  for (ClosingFuture<?> input : this.inputs) {
    input.becomeSubsumedInto(closeables);
  }
}
/**
* Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
* combining function to their values. The function can use a {@link DeferredCloser} to capture
* objects to be closed when the pipeline is done.
*
* <p>If this combiner was returned by a {@link #whenAllSucceed} method and any of the inputs
* fail, so will the returned step.
*
* <p>If the combiningCallable throws a {@code CancellationException}, the pipeline will be
* cancelled.
*
* <p>If the combiningCallable throws an {@code ExecutionException}, the cause of the thrown
* {@code ExecutionException} will be extracted and used as the failure of the derived step.
*/
public <V> ClosingFuture<V> call(
    final CombiningCallable<V> combiningCallable, Executor executor) {
  // Adapter that gives the user's callable a fresh Peeker over the inputs and reports the
  // user's callable in toString().
  Callable<V> peekingCallable =
      new Callable<V>() {
        @Override
        public String toString() {
          return combiningCallable.toString();
        }

        @Override
        public V call() throws Exception {
          Peeker peeker = new Peeker(inputs);
          return peeker.call(combiningCallable, closeables);
        }
      };
  ClosingFuture<V> combined =
      new ClosingFuture<>(futureCombiner().call(peekingCallable, executor));
  // The combined step takes over everything this combiner has captured so far.
  combined.closeables.add(closeables, directExecutor());
  return combined;
}
/**
* Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
* {@code ClosingFuture}-returning function to their values. The function can use a {@link
* DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
* captured by the returned {@link ClosingFuture}).
*
* <p>If this combiner was returned by a {@link #whenAllSucceed} method and any of the inputs
* fail, so will the returned step.
*
* <p>If the combiningCallable throws a {@code CancellationException}, the pipeline will be
* cancelled.
*
* <p>If the combiningCallable throws an {@code ExecutionException}, the cause of the thrown
* {@code ExecutionException} will be extracted and used as the failure of the derived step.
*
* <p>If the combiningCallable throws any other exception, it will be used as the failure of the
* derived step.
*
* <p>If an exception is thrown after the combiningCallable creates a {@code ClosingFuture},
* then none of the closeable objects in that {@code ClosingFuture} will be closed.
*
* <p>Usage guidelines for this method:
*
* <ul>
* <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
* {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable,
* Executor)} instead, with a function that returns the next value directly.
* <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor)
* closer.eventuallyClose()} for every closeable object this step creates in order to
* capture it for later closing.
* <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
* ClosingFuture} call {@link #from(ListenableFuture)}.
* </ul>
*
* <p>The same warnings about doing heavyweight operations within {@link
* ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here.
*/
public <V> ClosingFuture<V> callAsync(
    final AsyncCombiningCallable<V> combiningCallable, Executor executor) {
  // Adapter that gives the user's callable a fresh Peeker over the inputs and reports the
  // user's callable in toString().
  AsyncCallable<V> peekingCallable =
      new AsyncCallable<V>() {
        @Override
        public String toString() {
          return combiningCallable.toString();
        }

        @Override
        public ListenableFuture<V> call() throws Exception {
          Peeker peeker = new Peeker(inputs);
          return peeker.callAsync(combiningCallable, closeables);
        }
      };
  ClosingFuture<V> combined =
      new ClosingFuture<>(futureCombiner().callAsync(peekingCallable, executor));
  // The combined step takes over everything this combiner has captured so far.
  combined.closeables.add(closeables, directExecutor());
  return combined;
}
// Picks the Futures combiner matching this Combiner's failure policy.
private FutureCombiner<Object> futureCombiner() {
  if (allMustSucceed) {
    return Futures.whenAllSucceed(inputFutures());
  } else {
    return Futures.whenAllComplete(inputFutures());
  }
}
// Maps a ClosingFuture step to the FluentFuture it wraps (its "future" field).
private static final Function<ClosingFuture<?>, FluentFuture<?>> INNER_FUTURE =
new Function<ClosingFuture<?>, FluentFuture<?>>() {
@Override
public FluentFuture<?> apply(ClosingFuture<?> future) {
return future.future;
}
};
// Returns the underlying futures of all input steps, in input order.
private ImmutableList<FluentFuture<?>> inputFutures() {
  FluentIterable<ClosingFuture<?>> steps = FluentIterable.from(inputs);
  FluentIterable<FluentFuture<?>> underlying = steps.transform(INNER_FUTURE);
  return underlying.toList();
}
}
/**
* A generic {@link Combiner} that lets you use a lambda or method reference to combine two {@link
* ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} to start this
* combination.
*
* @param <V1> the type returned by the first future
* @param <V2> the type returned by the second future
*/
public static final class Combiner2<V1 extends Object, V2 extends Object> extends Combiner {
/**
* A function that returns a value when applied to the values of the two futures passed to
* {@link #whenAllSucceed(ClosingFuture, ClosingFuture)}.
*
* @param <V1> the type returned by the first future
* @param <V2> the type returned by the second future
* @param <U> the type returned by the function
*/
public interface ClosingFunction2<V1 extends Object, V2 extends Object, U extends Object> {
/**
* Applies this function to two inputs, or throws an exception if unable to do so.
*
* <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
* Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
* is done (but not before this method completes), even if this method throws or the pipeline
* is cancelled.
*
* @param closer used to capture objects to be closed when the pipeline is done
* @param value1 the value of the first future
* @param value2 the value of the second future
* @return the result of applying this function
* @throws Exception if unable to compute a result
*/
@NullableDecl
U apply(DeferredCloser closer, @NullableDecl V1 value1, @NullableDecl V2 value2)
throws Exception;
}
/**
* A function that returns a {@link ClosingFuture} when applied to the values of the two futures
* passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture)}.
*
* @param <V1> the type returned by the first future
* @param <V2> the type returned by the second future
* @param <U> the type returned by the function
*/
public interface AsyncClosingFunction2<V1 extends Object, V2 extends Object, U extends Object> {
/**
* Applies this function to two inputs, or throws an exception if unable to do so.
*
* <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
* Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
* is done (but not before this method completes), even if this method throws or the pipeline
* is cancelled.
*
* @param closer used to capture objects to be closed when the pipeline is done
* @param value1 the value of the first future
* @param value2 the value of the second future
* @return a {@link ClosingFuture} that will provide the result
* @throws Exception if unable to compute a result
*/
ClosingFuture<U> apply(
DeferredCloser closer, @NullableDecl V1 value1, @NullableDecl V2 value2) throws Exception;
}
private final ClosingFuture<V1> future1;
private final ClosingFuture<V2> future2;
// Builds a two-input combiner. Passing true as allMustSucceed makes the base Combiner use the
// "all must succeed" policy; the typed references are kept so call()/callAsync() can read each
// input's value through the Peeker.
private Combiner2(ClosingFuture<V1> future1, ClosingFuture<V2> future2) {
super(true, ImmutableList.of(future1, future2));
this.future1 = future1;
this.future2 = future2;
}
/**
* Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
* combining function to their values. The function can use a {@link DeferredCloser} to capture
* objects to be closed when the pipeline is done.
*
* <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} and
* any of the inputs fail, so will the returned step.
*
* <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
*
* <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
* ExecutionException} will be extracted and used as the failure of the derived step.
*/
public <U extends Object> ClosingFuture<U> call(
    final ClosingFunction2<V1, V2, U> function, Executor executor) {
  // Adapter that reads both input values through the Peeker and reports the user's function
  // in toString().
  CombiningCallable<U> adapter =
      new CombiningCallable<U>() {
        @Override
        public String toString() {
          return function.toString();
        }

        @Override
        @NullableDecl
        public U call(DeferredCloser closer, Peeker peeker) throws Exception {
          V1 value1 = peeker.getDone(future1);
          V2 value2 = peeker.getDone(future2);
          return function.apply(closer, value1, value2);
        }
      };
  return call(adapter, executor);
}
/**
* Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
* {@code ClosingFuture}-returning function to their values. The function can use a {@link
* DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
* captured by the returned {@link ClosingFuture}).
*
* <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} and
* any of the inputs fail, so will the returned step.
*
* <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
*
* <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
* ExecutionException} will be extracted and used as the failure of the derived step.
*
* <p>If the function throws any other exception, it will be used as the failure of the derived
* step.
*
* <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of
* the closeable objects in that {@code ClosingFuture} will be closed.
*
* <p>Usage guidelines for this method:
*
* <ul>
* <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
* {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable,
* Executor)} instead, with a function that returns the next value directly.
* <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor)
* closer.eventuallyClose()} for every closeable object this step creates in order to
* capture it for later closing.
* <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
* ClosingFuture} call {@link #from(ListenableFuture)}.
* </ul>
*
* <p>The same warnings about doing heavyweight operations within {@link
* ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here.
*/
public <U extends Object> ClosingFuture<U> callAsync(
    final AsyncClosingFunction2<V1, V2, U> function, Executor executor) {
  // Adapter that reads both input values through the Peeker and reports the user's function
  // in toString().
  AsyncCombiningCallable<U> adapter =
      new AsyncCombiningCallable<U>() {
        @Override
        public String toString() {
          return function.toString();
        }

        @Override
        public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception {
          V1 value1 = peeker.getDone(future1);
          V2 value2 = peeker.getDone(future2);
          return function.apply(closer, value1, value2);
        }
      };
  return callAsync(adapter, executor);
}
}
/**
* A generic {@link Combiner} that lets you use a lambda or method reference to combine three
* {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
* ClosingFuture)} to start this combination.
*
* @param <V1> the type returned by the first future
* @param <V2> the type returned by the second future
* @param <V3> the type returned by the third future
*/
public static final class Combiner3<V1 extends Object, V2 extends Object, V3 extends Object>
extends Combiner {
/**
* A function that returns a value when applied to the values of the three futures passed to
* {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture)}.
*
* @param <V1> the type returned by the first future
* @param <V2> the type returned by the second future
* @param <V3> the type returned by the third future
* @param <U> the type returned by the function
*/
public interface ClosingFunction3<
    V1 extends Object, V2 extends Object, V3 extends Object, U extends Object> {
  /**
   * Applies this function to three inputs, or throws an exception if unable to do so.
   *
   * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
   * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
   * is done (but not before this method completes), even if this method throws or the pipeline
   * is cancelled.
   *
   * @param closer used to capture objects to be closed when the pipeline is done
   * @param value1 the value of the first future
   * @param value2 the value of the second future
   * @param value3 the value of the third future
   * @return the result of applying this function
   * @throws Exception if unable to compute a result
   */
  @NullableDecl
  U apply(
      DeferredCloser closer,
      @NullableDecl V1 value1,
      @NullableDecl V2 value2,
      @NullableDecl V3 value3)
      throws Exception;
}
/**
* A function that returns a {@link ClosingFuture} when applied to the values of the three
* futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture)}.
*
* @param <V1> the type returned by the first future
* @param <V2> the type returned by the second future
* @param <V3> the type returned by the third future
* @param <U> the type returned by the function
*/
public interface AsyncClosingFunction3<
V1 extends Object, V2 extends Object, V3 extends Object, U extends Object> {
/**
* Applies this function to three inputs, or throws an exception if unable to do so.
*
* <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
* Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
* is done (but not before this method completes), even if this method throws or the pipeline
* is cancelled.
*
* @param closer used to capture objects to be closed when the pipeline is done
* @param value1 the value of the first future
* @param value2 the value of the second future
* @param value3 the value of the third future
* @return a {@link ClosingFuture} that will provide the result
* @throws Exception if unable to compute a result
*/
ClosingFuture<U> apply(
DeferredCloser closer,
@NullableDecl V1 value1,
@NullableDecl V2 value2,
@NullableDecl V3 value3)
throws Exception;
}
private final ClosingFuture<V1> future1;
private final ClosingFuture<V2> future2;
private final ClosingFuture<V3> future3;
// Builds a three-input combiner. Passing true as allMustSucceed makes the base Combiner use the
// "all must succeed" policy; the typed references are kept so call()/callAsync() can read each
// input's value through the Peeker.
private Combiner3(
ClosingFuture<V1> future1, ClosingFuture<V2> future2, ClosingFuture<V3> future3) {
super(true, ImmutableList.of(future1, future2, future3));
this.future1 = future1;
this.future2 = future2;
this.future3 = future3;
}
/**
* Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
* combining function to their values. The function can use a {@link DeferredCloser} to capture
* objects to be closed when the pipeline is done.
*
* <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
* ClosingFuture)} and any of the inputs fail, so will the returned step.
*
* <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
*
* <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
* ExecutionException} will be extracted and used as the failure of the derived step.
*/
public <U extends Object> ClosingFuture<U> call(
    final ClosingFunction3<V1, V2, V3, U> function, Executor executor) {
  // Adapter that reads all three input values through the Peeker and reports the user's
  // function in toString().
  CombiningCallable<U> adapter =
      new CombiningCallable<U>() {
        @Override
        public String toString() {
          return function.toString();
        }

        @Override
        @NullableDecl
        public U call(DeferredCloser closer, Peeker peeker) throws Exception {
          V1 value1 = peeker.getDone(future1);
          V2 value2 = peeker.getDone(future2);
          V3 value3 = peeker.getDone(future3);
          return function.apply(closer, value1, value2, value3);
        }
      };
  return call(adapter, executor);
}
/**
 * Creates the next {@code ClosingFuture} step of the pipeline by feeding the values of the
 * three input steps to a function that itself returns a {@code ClosingFuture}. Objects that
 * the function registers with its {@link DeferredCloser} are captured so that they are closed
 * when the pipeline is done (in addition to those captured by the returned {@link
 * ClosingFuture}).
 *
 * <p>When this combiner came from {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
 * ClosingFuture)}, a failure of any input makes the returned step fail as well.
 *
 * <p>A {@code CancellationException} thrown by the function cancels the pipeline.
 *
 * <p>An {@code ExecutionException} thrown by the function is unwrapped: its cause is used as
 * the failure of the derived step.
 *
 * <p>Any other exception thrown by the function is used directly as the failure of the derived
 * step.
 *
 * <p>If an exception is thrown after the function has created a {@code ClosingFuture}, none of
 * the closeable objects in that {@code ClosingFuture} will be closed.
 *
 * <p>Usage guidelines for this method:
 *
 * <ul>
 *   <li>Reserve this method for calls into an API that returns a {@link ListenableFuture} or a
 *       {@code ClosingFuture}. Where possible, prefer {@link #call(CombiningCallable,
 *       Executor)} with a function that returns the next value directly.
 *   <li>For every closeable object this step creates, call {@link
 *       DeferredCloser#eventuallyClose(Closeable, Executor) closer.eventuallyClose()} so that
 *       it is captured for later closing.
 *   <li>Return a {@code ClosingFuture}. A {@link ListenableFuture} can be turned into one with
 *       {@link #from(ListenableFuture)}.
 * </ul>
 *
 * <p>The warnings about heavyweight operations given for {@link
 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here too.
 *
 * @param function maps the three input values to the {@code ClosingFuture} of the derived step
 * @param executor handed, together with the adapted function, to {@code
 *     callAsync(AsyncCombiningCallable, Executor)}
 * @return the derived pipeline step
 */
public <U extends Object> ClosingFuture<U> callAsync(
    final AsyncClosingFunction3<V1, V2, V3, U> function, Executor executor) {
  // Adapt the three-argument function to the generic async callable understood by Combiner.
  AsyncCombiningCallable<U> combiningCallable =
      new AsyncCombiningCallable<U>() {
        @Override
        public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception {
          V1 value1 = peeker.getDone(future1);
          V2 value2 = peeker.getDone(future2);
          V3 value3 = peeker.getDone(future3);
          return function.apply(closer, value1, value2, value3);
        }

        @Override
        public String toString() {
          // Describe the adapter as the user's function.
          return function.toString();
        }
      };
  return callAsync(combiningCallable, executor);
}
}
/**
 * A {@link Combiner} for exactly four {@link ClosingFuture}s whose combining function can be
 * written as a lambda or method reference taking four typed values. Start the combination with
 * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture)}.
 *
 * @param <V1> the type returned by the first future
 * @param <V2> the type returned by the second future
 * @param <V3> the type returned by the third future
 * @param <V4> the type returned by the fourth future
 */
public static final class Combiner4<
        V1 extends Object, V2 extends Object, V3 extends Object, V4 extends Object>
    extends Combiner {

  /**
   * A value-returning function over the values of the four futures passed to {@link
   * #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture)}.
   *
   * @param <V1> the type returned by the first future
   * @param <V2> the type returned by the second future
   * @param <V3> the type returned by the third future
   * @param <V4> the type returned by the fourth future
   * @param <U> the type returned by the function
   */
  public interface ClosingFunction4<
      V1 extends Object,
      V2 extends Object,
      V3 extends Object,
      V4 extends Object,
      U extends Object> {
    /**
     * Applies this function to four inputs, or throws an exception if it cannot.
     *
     * <p>Every object passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor)
     * closer.eventuallyClose()} is closed when the {@link ClosingFuture} pipeline is done (but
     * not before this method completes), even if this method throws or the pipeline is
     * cancelled.
     */
    @NullableDecl
    U apply(
        DeferredCloser closer,
        @NullableDecl V1 value1,
        @NullableDecl V2 value2,
        @NullableDecl V3 value3,
        @NullableDecl V4 value4)
        throws Exception;
  }

  /**
   * A {@link ClosingFuture}-returning function over the values of the four futures passed to
   * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture)}.
   *
   * @param <V1> the type returned by the first future
   * @param <V2> the type returned by the second future
   * @param <V3> the type returned by the third future
   * @param <V4> the type returned by the fourth future
   * @param <U> the type returned by the function
   */
  public interface AsyncClosingFunction4<
      V1 extends Object,
      V2 extends Object,
      V3 extends Object,
      V4 extends Object,
      U extends Object> {
    /**
     * Applies this function to four inputs, or throws an exception if it cannot.
     *
     * <p>Every object passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor)
     * closer.eventuallyClose()} is closed when the {@link ClosingFuture} pipeline is done (but
     * not before this method completes), even if this method throws or the pipeline is
     * cancelled.
     */
    ClosingFuture<U> apply(
        DeferredCloser closer,
        @NullableDecl V1 value1,
        @NullableDecl V2 value2,
        @NullableDecl V3 value3,
        @NullableDecl V4 value4)
        throws Exception;
  }

  /** First input step. */
  private final ClosingFuture<V1> future1;

  /** Second input step. */
  private final ClosingFuture<V2> future2;

  /** Third input step. */
  private final ClosingFuture<V3> future3;

  /** Fourth input step. */
  private final ClosingFuture<V4> future4;

  /**
   * Builds a combiner over four input steps. The inputs go to the superclass as an {@code
   * ImmutableList} in argument order along with the flag {@code true}, and are also kept in
   * typed fields so their values can later be read with their static types.
   */
  private Combiner4(
      ClosingFuture<V1> future1,
      ClosingFuture<V2> future2,
      ClosingFuture<V3> future3,
      ClosingFuture<V4> future4) {
    super(true, ImmutableList.of(future1, future2, future3, future4));
    this.future1 = future1;
    this.future2 = future2;
    this.future3 = future3;
    this.future4 = future4;
  }

  /**
   * Creates the next {@code ClosingFuture} step of the pipeline by feeding the values of the
   * four input steps to a combining function. Objects that the function registers with its
   * {@link DeferredCloser} are captured so that they are closed when the pipeline is done.
   *
   * <p>When this combiner came from {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
   * ClosingFuture, ClosingFuture)}, a failure of any input makes the returned step fail as
   * well.
   *
   * <p>A {@code CancellationException} thrown by the function cancels the pipeline.
   *
   * <p>An {@code ExecutionException} thrown by the function is unwrapped: its cause is used as
   * the failure of the derived step.
   *
   * @param function combines the four input values into the value of the derived step
   * @param executor handed, together with the adapted function, to {@link
   *     #call(CombiningCallable, Executor)}
   * @return the derived pipeline step
   */
  public <U extends Object> ClosingFuture<U> call(
      final ClosingFunction4<V1, V2, V3, V4, U> function, Executor executor) {
    // Adapt the four-argument function to the generic callable understood by Combiner.
    CombiningCallable<U> combiningCallable =
        new CombiningCallable<U>() {
          @Override
          @NullableDecl
          public U call(DeferredCloser closer, Peeker peeker) throws Exception {
            V1 value1 = peeker.getDone(future1);
            V2 value2 = peeker.getDone(future2);
            V3 value3 = peeker.getDone(future3);
            V4 value4 = peeker.getDone(future4);
            return function.apply(closer, value1, value2, value3, value4);
          }

          @Override
          public String toString() {
            // Describe the adapter as the user's function.
            return function.toString();
          }
        };
    return call(combiningCallable, executor);
  }

  /**
   * Creates the next {@code ClosingFuture} step of the pipeline by feeding the values of the
   * four input steps to a function that itself returns a {@code ClosingFuture}. Objects that
   * the function registers with its {@link DeferredCloser} are captured so that they are
   * closed when the pipeline is done (in addition to those captured by the returned {@link
   * ClosingFuture}).
   *
   * <p>When this combiner came from {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
   * ClosingFuture, ClosingFuture)}, a failure of any input makes the returned step fail as
   * well.
   *
   * <p>A {@code CancellationException} thrown by the function cancels the pipeline.
   *
   * <p>An {@code ExecutionException} thrown by the function is unwrapped: its cause is used as
   * the failure of the derived step.
   *
   * <p>Any other exception thrown by the function is used directly as the failure of the
   * derived step.
   *
   * <p>If an exception is thrown after the function has created a {@code ClosingFuture}, none
   * of the closeable objects in that {@code ClosingFuture} will be closed.
   *
   * <p>Usage guidelines for this method:
   *
   * <ul>
   *   <li>Reserve this method for calls into an API that returns a {@link ListenableFuture} or
   *       a {@code ClosingFuture}. Where possible, prefer {@link #call(CombiningCallable,
   *       Executor)} with a function that returns the next value directly.
   *   <li>For every closeable object this step creates, call {@link
   *       DeferredCloser#eventuallyClose(Closeable, Executor) closer.eventuallyClose()} so
   *       that it is captured for later closing.
   *   <li>Return a {@code ClosingFuture}. A {@link ListenableFuture} can be turned into one
   *       with {@link #from(ListenableFuture)}.
   * </ul>
   *
   * <p>The warnings about heavyweight operations given for {@link
   * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here too.
   *
   * @param function maps the four input values to the {@code ClosingFuture} of the derived
   *     step
   * @param executor handed, together with the adapted function, to {@code
   *     callAsync(AsyncCombiningCallable, Executor)}
   * @return the derived pipeline step
   */
  public <U extends Object> ClosingFuture<U> callAsync(
      final AsyncClosingFunction4<V1, V2, V3, V4, U> function, Executor executor) {
    // Adapt the four-argument function to the generic async callable understood by Combiner.
    AsyncCombiningCallable<U> combiningCallable =
        new AsyncCombiningCallable<U>() {
          @Override
          public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception {
            V1 value1 = peeker.getDone(future1);
            V2 value2 = peeker.getDone(future2);
            V3 value3 = peeker.getDone(future3);
            V4 value4 = peeker.getDone(future4);
            return function.apply(closer, value1, value2, value3, value4);
          }

          @Override
          public String toString() {
            // Describe the adapter as the user's function.
            return function.toString();
          }
        };
    return callAsync(combiningCallable, executor);
  }
}
/**
 * A {@link Combiner} for exactly five {@link ClosingFuture}s whose combining function can be
 * written as a lambda or method reference taking five typed values. Start the combination with
 * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture,
 * ClosingFuture)}.
 *
 * @param <V1> the type returned by the first future
 * @param <V2> the type returned by the second future
 * @param <V3> the type returned by the third future
 * @param <V4> the type returned by the fourth future
 * @param <V5> the type returned by the fifth future
 */
public static final class Combiner5<
        V1 extends Object,
        V2 extends Object,
        V3 extends Object,
        V4 extends Object,
        V5 extends Object>
    extends Combiner {

  /**
   * A value-returning function over the values of the five futures passed to {@link
   * #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture,
   * ClosingFuture)}.
   *
   * @param <V1> the type returned by the first future
   * @param <V2> the type returned by the second future
   * @param <V3> the type returned by the third future
   * @param <V4> the type returned by the fourth future
   * @param <V5> the type returned by the fifth future
   * @param <U> the type returned by the function
   */
  public interface ClosingFunction5<
      V1 extends Object,
      V2 extends Object,
      V3 extends Object,
      V4 extends Object,
      V5 extends Object,
      U extends Object> {
    /**
     * Applies this function to five inputs, or throws an exception if it cannot.
     *
     * <p>Every object passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor)
     * closer.eventuallyClose()} is closed when the {@link ClosingFuture} pipeline is done (but
     * not before this method completes), even if this method throws or the pipeline is
     * cancelled.
     */
    @NullableDecl
    U apply(
        DeferredCloser closer,
        @NullableDecl V1 value1,
        @NullableDecl V2 value2,
        @NullableDecl V3 value3,
        @NullableDecl V4 value4,
        @NullableDecl V5 value5)
        throws Exception;
  }

  /**
   * A {@link ClosingFuture}-returning function over the values of the five futures passed to
   * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture,
   * ClosingFuture)}.
   *
   * @param <V1> the type returned by the first future
   * @param <V2> the type returned by the second future
   * @param <V3> the type returned by the third future
   * @param <V4> the type returned by the fourth future
   * @param <V5> the type returned by the fifth future
   * @param <U> the type returned by the function
   */
  public interface AsyncClosingFunction5<
      V1 extends Object,
      V2 extends Object,
      V3 extends Object,
      V4 extends Object,
      V5 extends Object,
      U extends Object> {
    /**
     * Applies this function to five inputs, or throws an exception if it cannot.
     *
     * <p>Every object passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor)
     * closer.eventuallyClose()} is closed when the {@link ClosingFuture} pipeline is done (but
     * not before this method completes), even if this method throws or the pipeline is
     * cancelled.
     */
    ClosingFuture<U> apply(
        DeferredCloser closer,
        @NullableDecl V1 value1,
        @NullableDecl V2 value2,
        @NullableDecl V3 value3,
        @NullableDecl V4 value4,
        @NullableDecl V5 value5)
        throws Exception;
  }

  /** First input step. */
  private final ClosingFuture<V1> future1;

  /** Second input step. */
  private final ClosingFuture<V2> future2;

  /** Third input step. */
  private final ClosingFuture<V3> future3;

  /** Fourth input step. */
  private final ClosingFuture<V4> future4;

  /** Fifth input step. */
  private final ClosingFuture<V5> future5;

  /**
   * Builds a combiner over five input steps. The inputs go to the superclass as an {@code
   * ImmutableList} in argument order along with the flag {@code true}, and are also kept in
   * typed fields so their values can later be read with their static types.
   */
  private Combiner5(
      ClosingFuture<V1> future1,
      ClosingFuture<V2> future2,
      ClosingFuture<V3> future3,
      ClosingFuture<V4> future4,
      ClosingFuture<V5> future5) {
    super(true, ImmutableList.of(future1, future2, future3, future4, future5));
    this.future1 = future1;
    this.future2 = future2;
    this.future3 = future3;
    this.future4 = future4;
    this.future5 = future5;
  }

  /**
   * Creates the next {@code ClosingFuture} step of the pipeline by feeding the values of the
   * five input steps to a combining function. Objects that the function registers with its
   * {@link DeferredCloser} are captured so that they are closed when the pipeline is done.
   *
   * <p>When this combiner came from {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
   * ClosingFuture, ClosingFuture, ClosingFuture)}, a failure of any input makes the returned
   * step fail as well.
   *
   * <p>A {@code CancellationException} thrown by the function cancels the pipeline.
   *
   * <p>An {@code ExecutionException} thrown by the function is unwrapped: its cause is used as
   * the failure of the derived step.
   *
   * @param function combines the five input values into the value of the derived step
   * @param executor handed, together with the adapted function, to {@link
   *     #call(CombiningCallable, Executor)}
   * @return the derived pipeline step
   */
  public <U extends Object> ClosingFuture<U> call(
      final ClosingFunction5<V1, V2, V3, V4, V5, U> function, Executor executor) {
    // Adapt the five-argument function to the generic callable understood by Combiner.
    CombiningCallable<U> combiningCallable =
        new CombiningCallable<U>() {
          @Override
          @NullableDecl
          public U call(DeferredCloser closer, Peeker peeker) throws Exception {
            V1 value1 = peeker.getDone(future1);
            V2 value2 = peeker.getDone(future2);
            V3 value3 = peeker.getDone(future3);
            V4 value4 = peeker.getDone(future4);
            V5 value5 = peeker.getDone(future5);
            return function.apply(closer, value1, value2, value3, value4, value5);
          }

          @Override
          public String toString() {
            // Describe the adapter as the user's function.
            return function.toString();
          }
        };
    return call(combiningCallable, executor);
  }

  /**
   * Creates the next {@code ClosingFuture} step of the pipeline by feeding the values of the
   * five input steps to a function that itself returns a {@code ClosingFuture}. Objects that
   * the function registers with its {@link DeferredCloser} are captured so that they are
   * closed when the pipeline is done (in addition to those captured by the returned {@link
   * ClosingFuture}).
   *
   * <p>When this combiner came from {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
   * ClosingFuture, ClosingFuture, ClosingFuture)}, a failure of any input makes the returned
   * step fail as well.
   *
   * <p>A {@code CancellationException} thrown by the function cancels the pipeline.
   *
   * <p>An {@code ExecutionException} thrown by the function is unwrapped: its cause is used as
   * the failure of the derived step.
   *
   * <p>Any other exception thrown by the function is used directly as the failure of the
   * derived step.
   *
   * <p>If an exception is thrown after the function has created a {@code ClosingFuture}, none
   * of the closeable objects in that {@code ClosingFuture} will be closed.
   *
   * <p>Usage guidelines for this method:
   *
   * <ul>
   *   <li>Reserve this method for calls into an API that returns a {@link ListenableFuture} or
   *       a {@code ClosingFuture}. Where possible, prefer {@link #call(CombiningCallable,
   *       Executor)} with a function that returns the next value directly.
   *   <li>For every closeable object this step creates, call {@link
   *       DeferredCloser#eventuallyClose(Closeable, Executor) closer.eventuallyClose()} so
   *       that it is captured for later closing.
   *   <li>Return a {@code ClosingFuture}. A {@link ListenableFuture} can be turned into one
   *       with {@link #from(ListenableFuture)}.
   * </ul>
   *
   * <p>The warnings about heavyweight operations given for {@link
   * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here too.
   *
   * @param function maps the five input values to the {@code ClosingFuture} of the derived
   *     step
   * @param executor handed, together with the adapted function, to {@code
   *     callAsync(AsyncCombiningCallable, Executor)}
   * @return the derived pipeline step
   */
  public <U extends Object> ClosingFuture<U> callAsync(
      final AsyncClosingFunction5<V1, V2, V3, V4, V5, U> function, Executor executor) {
    // Adapt the five-argument function to the generic async callable understood by Combiner.
    AsyncCombiningCallable<U> combiningCallable =
        new AsyncCombiningCallable<U>() {
          @Override
          public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception {
            V1 value1 = peeker.getDone(future1);
            V2 value2 = peeker.getDone(future2);
            V3 value3 = peeker.getDone(future3);
            V4 value4 = peeker.getDone(future4);
            V5 value5 = peeker.getDone(future5);
            return function.apply(closer, value1, value2, value3, value4, value5);
          }

          @Override
          public String toString() {
            // Describe the adapter as the user's function.
            return function.toString();
          }
        };
    return callAsync(combiningCallable, executor);
  }
}
/** Renders this step as its current {@code state} followed by the underlying {@code future}. */
@Override
public String toString() {
  // TODO(dpb): Better toString, in the style of Futures.transform etc.
  State currentState = state.get();
  return toStringHelper(this).add("state", currentState).addValue(future).toString();
}
/**
 * Safety net for leaked pipelines: if this step is still {@code OPEN} when it is finalized, the
 * leak is logged at {@code SEVERE} and {@link #finishToFuture()} is called.
 */
@Override
protected void finalize() {
  boolean stillOpen = state.get().equals(OPEN);
  if (!stillOpen) {
    return;
  }
  logger.log(SEVERE, "Uh oh! An open ClosingFuture has leaked and will close: {0}", this);
  // The returned future is deliberately ignored; only the call itself is wanted here.
  FluentFuture<V> unused = finishToFuture();
}
/**
 * Closes {@code closeable} on {@code executor}, logging failures instead of propagating them.
 *
 * <p>A {@code null} closeable is ignored. An {@link IOException} or {@link RuntimeException}
 * thrown by {@code close()} is logged at {@code WARNING}. If {@code executor} rejects the task,
 * the rejection is logged (when {@code WARNING} is loggable) and the close is attempted again
 * through {@code directExecutor()}.
 *
 * @param closeable the object to close; may be {@code null}
 * @param executor the executor the close task is submitted to
 */
private static void closeQuietly(final Closeable closeable, Executor executor) {
  if (closeable == null) {
    return;
  }
  Runnable closeTask =
      new Runnable() {
        @Override
        public void run() {
          try {
            closeable.close();
          } catch (IOException | RuntimeException e) {
            logger.log(WARNING, "thrown by close()", e);
          }
        }
      };
  try {
    executor.execute(closeTask);
  } catch (RejectedExecutionException rejected) {
    if (logger.isLoggable(WARNING)) {
      String message =
          String.format("while submitting close to %s; will close inline", executor);
      logger.log(WARNING, message, rejected);
    }
    // Fall back to closing via the direct executor.
    closeQuietly(closeable, directExecutor());
  }
}
/**
 * Atomically moves {@code state} from {@code oldState} to {@code newState}, failing if the
 * current state is not {@code oldState}.
 *
 * @param oldState the state this step is required to be in
 * @param newState the state to move to
 * @throws IllegalStateException if the compare-and-set did not succeed
 */
private void checkAndUpdateState(State oldState, State newState) {
  if (compareAndUpdateState(oldState, newState)) {
    return;
  }
  // Report the state that is actually present rather than the state we were trying to move to.
  // The value is re-read after the failed compare-and-set, so under concurrent updates it may
  // be newer than the value that caused the failure.
  checkState(false, "Expected state to be %s, but it was %s", oldState, state.get());
}
/**
 * Attempts to move {@code state} from {@code oldState} to {@code newState} with a single
 * {@code compareAndSet}.
 *
 * @return the result of the {@code compareAndSet} call
 */
private boolean compareAndUpdateState(State oldState, State newState) {
  boolean updated = state.compareAndSet(oldState, newState);
  return updated;
}
// TODO(dpb): Should we use a pair of ArrayLists instead of an IdentityHashMap?
/**
 * An identity-keyed map from each captured {@link Closeable} to the {@link Executor} it is to be
 * closed on. The list is itself {@link Closeable}: {@link #close()} closes every entry.
 *
 * <p>{@code closed} is only written, and entries are only added, while synchronized on this
 * object; see {@link #add} and {@link #close()}.
 */
private static final class CloseableList extends IdentityHashMap<Closeable, Executor>
implements Closeable {
// Closer constructed over this list. Presumably it registers closeables through add() --
// confirm in DeferredCloser.
private final DeferredCloser closer = new DeferredCloser(this);
// Set to true by close() under the lock and never reset in this class; volatile so that it can
// also be read without taking the lock.
private volatile boolean closed;
// Null until whenClosedCountDown() creates it; counted down at the end of close().
private volatile CountDownLatch whenClosed;
/**
 * Applies {@code transformation} to {@code input} with the closer of a fresh {@code
 * CloseableList} and wraps the result in an immediate future.
 *
 * <p>The fresh list is added to this list (with {@code directExecutor()}) in a {@code finally}
 * block, so this happens whether or not the transformation throws.
 */
<V, U> ListenableFuture<U> applyClosingFunction(
ClosingFunction<? super V, U> transformation, V input) throws Exception {
// TODO(dpb): Consider ways to defer closing without creating a separate CloseableList.
CloseableList newCloseables = new CloseableList();
try {
return immediateFuture(transformation.apply(newCloseables.closer, input));
} finally {
// Runs on both the normal and the exceptional path.
add(newCloseables, directExecutor());
}
}
/**
 * Applies {@code transformation} to {@code input} with the closer of a fresh {@code
 * CloseableList}, calls {@code becomeSubsumedInto} on the returned {@code ClosingFuture} with
 * that fresh list, and returns the step's underlying {@code future}.
 *
 * <p>The fresh list is added to this list (with {@code directExecutor()}) in a {@code finally}
 * block, so this happens whether or not the transformation or the subsumption throws.
 */
<V, U> FluentFuture<U> applyAsyncClosingFunction(
AsyncClosingFunction<V, U> transformation, V input) throws Exception {
// TODO(dpb): Consider ways to defer closing without creating a separate CloseableList.
CloseableList newCloseables = new CloseableList();
try {
ClosingFuture<U> closingFuture = transformation.apply(newCloseables.closer, input);
closingFuture.becomeSubsumedInto(newCloseables);
return closingFuture.future;
} finally {
// Runs on both the normal and the exceptional path.
add(newCloseables, directExecutor());
}
}
/**
 * Closes every entry via {@code closeQuietly} on its associated executor, then empties the map
 * and counts down the {@code whenClosed} latch if one was created. Calls made after the list
 * has been marked closed return immediately.
 */
@Override
public void close() {
// Check without the lock first.
if (closed) {
return;
}
synchronized (this) {
// Re-check under the lock so that only one caller proceeds past this point.
if (closed) {
return;
}
closed = true;
}
// Entries are closed outside the lock. Once closed is true, add() no longer calls put() and
// instead closes late arrivals directly.
for (Map.Entry<Closeable, Executor> entry : entrySet()) {
closeQuietly(entry.getKey(), entry.getValue());
}
clear();
if (whenClosed != null) {
whenClosed.countDown();
}
}
/**
 * Registers {@code closeable} to be closed on {@code executor} when this list is closed, or
 * closes it right away via {@code closeQuietly} if this list is already closed.
 *
 * <p>{@code executor} is null-checked first; a {@code null} closeable is then ignored.
 */
void add(@NullableDecl Closeable closeable, Executor executor) {
checkNotNull(executor);
if (closeable == null) {
return;
}
synchronized (this) {
if (!closed) {
put(closeable, executor);
return;
}
}
// Already closed: close immediately, outside the lock.
closeQuietly(closeable, executor);
}
/**
 * Returns a latch that reaches zero when this object's deferred closeables have been closed.
 *
 * <p>If this list is already closed, a latch with count zero is returned. Otherwise a latch
 * with count one is created and stored; the {@code checkState} below fails if a latch has
 * already been stored.
 */
CountDownLatch whenClosedCountDown() {
// Check without the lock first.
if (closed) {
return new CountDownLatch(0);
}
synchronized (this) {
if (closed) {
return new CountDownLatch(0);
}
checkState(whenClosed == null);
return whenClosed = new CountDownLatch(1);
}
}
}
/**
 * Returns a latch that can be awaited until each of this object's deferred closeables has had a
 * {@link Runnable} that closes it submitted to that closeable's closing {@link Executor}.
 * Delegates to the {@code closeables} list.
 */
@VisibleForTesting
CountDownLatch whenClosedCountDown() {
  CountDownLatch latch = closeables.whenClosedCountDown();
  return latch;
}
/**
 * The state of a {@link CloseableList}.
 *
 * <p>The constants are statically imported at the top of this file, so they appear unqualified
 * (for example {@code OPEN}) in the surrounding code.
 */
enum State {
/** The {@link CloseableList} has not been subsumed or closed. */
OPEN,
/**
 * The {@link CloseableList} has been subsumed into another. It may not be closed or subsumed
 * into any other.
 */
SUBSUMED,
/**
 * Some {@link ListenableFuture} has a callback attached that will close the {@link
 * CloseableList}, but the callback has not yet run. The {@link CloseableList} may not be
 * subsumed.
 */
WILL_CLOSE,
/**
 * The callback that closes the {@link CloseableList} is running but has not completed. The
 * {@link CloseableList} may not be subsumed.
 */
CLOSING,
/** The {@link CloseableList} has been closed. It may not be further subsumed. */
CLOSED,
/**
 * {@link ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)} has been
 * called. The step may not be further subsumed, nor may {@link #finishToFuture()} be called.
 */
WILL_CREATE_VALUE_AND_CLOSER,
}
}