blob: 88929b6cedbbefbdb8432d8ab9a1b76f41b1dfa8 [file] [log] [blame]
/*
* Copyright (C) 2018 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.common.util.concurrent;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.CANCELLED;
import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.NOT_RUN;
import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.STARTED;
import static com.google.common.util.concurrent.Futures.immediateCancelledFuture;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import com.google.common.annotations.Beta;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
/**
* Serializes execution of a set of operations. This class guarantees that a submitted callable will
* not be called before previously submitted callables (and any {@code Future}s returned from them)
* have completed.
*
* <p>This class implements a superset of the behavior of {@link
* MoreExecutors#newSequentialExecutor}. If your tasks all run on the same underlying executor and
* don't need to wait for {@code Future}s returned from {@code AsyncCallable}s, use it instead.
*
* @since 26.0
*/
@Beta
public final class ExecutionSequencer {
private ExecutionSequencer() {}
/** Creates a new instance. */
public static ExecutionSequencer create() {
return new ExecutionSequencer();
}
enum RunningState {
NOT_RUN,
CANCELLED,
STARTED,
}
/** This reference acts as a pointer tracking the head of a linked list of ListenableFutures. */
private final AtomicReference<ListenableFuture<Object>> ref =
new AtomicReference<>(immediateFuture(null));
/**
* Enqueues a task to run when the previous task (if any) completes.
*
* <p>Cancellation does not propagate from the output future to a callable that has begun to
* execute, but if the output future is cancelled before {@link Callable#call()} is invoked,
* {@link Callable#call()} will not be invoked.
*/
public <T> ListenableFuture<T> submit(final Callable<T> callable, Executor executor) {
checkNotNull(callable);
return submitAsync(
new AsyncCallable<T>() {
@Override
public ListenableFuture<T> call() throws Exception {
return immediateFuture(callable.call());
}
@Override
public String toString() {
return callable.toString();
}
},
executor);
}
/**
* Enqueues a task to run when the previous task (if any) completes.
*
* <p>Cancellation does not propagate from the output future to the future returned from {@code
* callable} or a callable that has begun to execute, but if the output future is cancelled before
* {@link AsyncCallable#call()} is invoked, {@link AsyncCallable#call()} will not be invoked.
*/
public <T> ListenableFuture<T> submitAsync(
final AsyncCallable<T> callable, final Executor executor) {
checkNotNull(callable);
final AtomicReference<RunningState> runningState = new AtomicReference<>(NOT_RUN);
final AsyncCallable<T> task =
new AsyncCallable<T>() {
@Override
public ListenableFuture<T> call() throws Exception {
if (!runningState.compareAndSet(NOT_RUN, STARTED)) {
return immediateCancelledFuture();
}
return callable.call();
}
@Override
public String toString() {
return callable.toString();
}
};
/*
* Four futures are at play here:
* taskFuture is the future tracking the result of the callable.
* newFuture is a future that completes after this and all prior tasks are done.
* oldFuture is the previous task's newFuture.
* outputFuture is the future we return to the caller, a nonCancellationPropagating taskFuture.
*
* newFuture is guaranteed to only complete once all tasks previously submitted to this instance
* have completed - namely after oldFuture is done, and taskFuture has either completed or been
* cancelled before the callable started execution.
*/
final SettableFuture<Object> newFuture = SettableFuture.create();
final ListenableFuture<?> oldFuture = ref.getAndSet(newFuture);
// Invoke our task once the previous future completes.
final ListenableFuture<T> taskFuture =
Futures.submitAsync(
task,
new Executor() {
@Override
public void execute(Runnable runnable) {
oldFuture.addListener(runnable, executor);
}
});
final ListenableFuture<T> outputFuture = Futures.nonCancellationPropagating(taskFuture);
// newFuture's lifetime is determined by taskFuture, which can't complete before oldFuture
// unless taskFuture is cancelled, in which case it falls back to oldFuture. This ensures that
// if the future we return is cancelled, we don't begin execution of the next task until after
// oldFuture completes.
Runnable listener =
new Runnable() {
@Override
public void run() {
if (taskFuture.isDone()
// If this CAS succeeds, we know that the provided callable will never be invoked,
// so when oldFuture completes it is safe to allow the next submitted task to
// proceed.
|| (outputFuture.isCancelled() && runningState.compareAndSet(NOT_RUN, CANCELLED))) {
// Since the value of oldFuture can only ever be immediateFuture(null) or setFuture of
// a future that eventually came from immediateFuture(null), this doesn't leak
// throwables or completion values.
newFuture.setFuture(oldFuture);
}
}
};
// Adding the listener to both futures guarantees that newFuture will aways be set. Adding to
// taskFuture guarantees completion if the callable is invoked, and adding to outputFuture
// propagates cancellation if the callable has not yet been invoked.
outputFuture.addListener(listener, directExecutor());
taskFuture.addListener(listener, directExecutor());
return outputFuture;
}
}