blob: f589e08b0ac1a3a05caee0ffe1c52d28d8d11bef [file] [log] [blame]
/*
* Copyright (C) 2018 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.common.util.concurrent;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.CANCELLED;
import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.NOT_RUN;
import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.STARTED;
import static com.google.common.util.concurrent.Futures.immediateCancelledFuture;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import com.google.common.annotations.Beta;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
/**
* Serializes execution of tasks, somewhat like an "asynchronous {@code synchronized} block." Each
* {@linkplain #submit enqueued} callable will not be submitted to its associated executor until the
* previous callable has returned -- and, if the previous callable was an {@link AsyncCallable}, not
* until the {@code Future} it returned is {@linkplain Future#isDone done} (successful, failed, or
* cancelled).
*
* <p>This class has limited support for cancellation and other "early completion":
*
* <ul>
* <li>While calls to {@code submit} and {@code submitAsync} return a {@code Future} that can be
* cancelled, cancellation never propagates to a task that has started to run -- neither to
* the callable itself nor to any {@code Future} returned by an {@code AsyncCallable}.
* (However, cancellation can prevent an <i>unstarted</i> task from running.) Therefore, the
* next task will wait for any running callable (or pending {@code Future} returned by an
* {@code AsyncCallable}) to complete, without interrupting it (and without calling {@code
* cancel} on the {@code Future}). So beware: <i>Even if you cancel every precededing {@code
* Future} returned by this class, the next task may still have to wait.</i>.
* <li>Once an {@code AsyncCallable} returns a {@code Future}, this class considers that task to
* be "done" as soon as <i>that</i> {@code Future} completes in any way. Notably, a {@code
* Future} is "completed" even if it is cancelled while its underlying work continues on a
* thread, an RPC, etc. The {@code Future} is also "completed" if it fails "early" -- for
* example, if the deadline expires on a {@code Future} returned from {@link
* Futures#withTimeout} while the {@code Future} it wraps continues its underlying work. So
* beware: <i>Your {@code AsyncCallable} should not complete its {@code Future} until it is
* safe for the next task to start.</i>
* </ul>
*
* <p>An additional limitation: this class serializes execution of <i>tasks</i> but not any
* <i>listeners</i> of those tasks.
*
* <p>This class is similar to {@link MoreExecutors#newSequentialExecutor}. This class is different
* in a few ways:
*
* <ul>
* <li>Each task may be associated with a different executor.
* <li>Tasks may be of type {@code AsyncCallable}.
* <li>Running tasks <i>cannot</i> be interrupted. (Note that {@code newSequentialExecutor} does
* not return {@code Future} objects, so it doesn't support interruption directly, either.
* However, utilities that <i>use</i> that executor have the ability to interrupt tasks
* running on it. This class, by contrast, does not expose an {@code Executor} API.)
* </ul>
*
* <p>If you don't need the features of this class, you may prefer {@code newSequentialExecutor} for
* its simplicity and ability to accommodate interruption.
*
* @since 26.0
*/
@Beta
public final class ExecutionSequencer {
private ExecutionSequencer() {}
/** Creates a new instance. */
public static ExecutionSequencer create() {
return new ExecutionSequencer();
}
/** This reference acts as a pointer tracking the head of a linked list of ListenableFutures. */
private final AtomicReference<ListenableFuture<Void>> ref =
new AtomicReference<>(immediateVoidFuture());
private ThreadConfinedTaskQueue latestTaskQueue = new ThreadConfinedTaskQueue();
/**
* This object is unsafely published, but avoids problematic races by relying exclusively on the
* identity equality of its Thread field so that the task field is only accessed by a single
* thread.
*/
private static final class ThreadConfinedTaskQueue {
/**
* This field is only used for identity comparisons with the current thread. Field assignments
* are atomic, but do not provide happens-before ordering; however:
*
* <ul>
* <li>If this field's value == currentThread, we know that it's up to date, because write
* operations in a thread always happen-before subsequent read operations in the same
* thread
* <li>If this field's value == null because of unsafe publication, we know that it isn't the
* object associated with our thread, because if it was the publication wouldn't have been
* unsafe and we'd have seen our thread as the value. This state is also why a new
* ThreadConfinedTaskQueue object must be created for each inline execution, because
* observing a null thread does not mean the object is safe to reuse.
* <li>If this field's value is some other thread object, we know that it's not our thread.
* <li>If this field's value == null because it originally belonged to another thread and that
* thread cleared it, we still know that it's not associated with our thread
* <li>If this field's value == null because it was associated with our thread and was
* cleared, we know that we're not executing inline any more
* </ul>
*
* All the states where thread != currentThread are identical for our purposes, and so even
* though it's racy, we don't care which of those values we get, so no need to synchronize.
*/
Thread thread;
/** Only used by the thread associated with this object */
Runnable nextTask;
/** Only used by the thread associated with this object */
Executor nextExecutor;
}
/**
* Enqueues a task to run when the previous task (if any) completes.
*
* <p>Cancellation does not propagate from the output future to a callable that has begun to
* execute, but if the output future is cancelled before {@link Callable#call()} is invoked,
* {@link Callable#call()} will not be invoked.
*/
public <T> ListenableFuture<T> submit(final Callable<T> callable, Executor executor) {
checkNotNull(callable);
checkNotNull(executor);
return submitAsync(
new AsyncCallable<T>() {
@Override
public ListenableFuture<T> call() throws Exception {
return immediateFuture(callable.call());
}
@Override
public String toString() {
return callable.toString();
}
},
executor);
}
/**
* Enqueues a task to run when the previous task (if any) completes.
*
* <p>Cancellation does not propagate from the output future to the future returned from {@code
* callable} or a callable that has begun to execute, but if the output future is cancelled before
* {@link AsyncCallable#call()} is invoked, {@link AsyncCallable#call()} will not be invoked.
*/
public <T> ListenableFuture<T> submitAsync(
final AsyncCallable<T> callable, final Executor executor) {
checkNotNull(callable);
checkNotNull(executor);
final TaskNonReentrantExecutor taskExecutor = new TaskNonReentrantExecutor(executor, this);
final AsyncCallable<T> task =
new AsyncCallable<T>() {
@Override
public ListenableFuture<T> call() throws Exception {
if (!taskExecutor.trySetStarted()) {
return immediateCancelledFuture();
}
return callable.call();
}
@Override
public String toString() {
return callable.toString();
}
};
/*
* Four futures are at play here:
* taskFuture is the future tracking the result of the callable.
* newFuture is a future that completes after this and all prior tasks are done.
* oldFuture is the previous task's newFuture.
* outputFuture is the future we return to the caller, a nonCancellationPropagating taskFuture.
*
* newFuture is guaranteed to only complete once all tasks previously submitted to this instance
* have completed - namely after oldFuture is done, and taskFuture has either completed or been
* cancelled before the callable started execution.
*/
final SettableFuture<Void> newFuture = SettableFuture.create();
final ListenableFuture<Void> oldFuture = ref.getAndSet(newFuture);
// Invoke our task once the previous future completes.
final TrustedListenableFutureTask<T> taskFuture = TrustedListenableFutureTask.create(task);
oldFuture.addListener(taskFuture, taskExecutor);
final ListenableFuture<T> outputFuture = Futures.nonCancellationPropagating(taskFuture);
// newFuture's lifetime is determined by taskFuture, which can't complete before oldFuture
// unless taskFuture is cancelled, in which case it falls back to oldFuture. This ensures that
// if the future we return is cancelled, we don't begin execution of the next task until after
// oldFuture completes.
Runnable listener =
new Runnable() {
@Override
public void run() {
if (taskFuture.isDone()) {
// Since the value of oldFuture can only ever be immediateFuture(null) or setFuture of
// a future that eventually came from immediateFuture(null), this doesn't leak
// throwables or completion values.
newFuture.setFuture(oldFuture);
} else if (outputFuture.isCancelled() && taskExecutor.trySetCancelled()) {
// If this CAS succeeds, we know that the provided callable will never be invoked,
// so when oldFuture completes it is safe to allow the next submitted task to
// proceed. Doing this immediately here lets the next task run without waiting for
// the cancelled task's executor to run the noop AsyncCallable.
//
// ---
//
// If the CAS fails, the provided callable already started running (or it is about
// to). Our contract promises:
//
// 1. not to execute a new callable until the old one has returned
//
// If we were to cancel taskFuture, that would let the next task start while the old
// one is still running.
//
// Now, maybe we could tweak our implementation to not start the next task until the
// callable actually completes. (We could detect completion in our wrapper
// `AsyncCallable task`.) However, our contract also promises:
//
// 2. not to cancel any Future the user returned from an AsyncCallable
//
// We promise this because, once we cancel that Future, we would no longer be able to
// tell when any underlying work it is doing is done. Thus, we might start a new task
// while that underlying work is still running.
//
// So that is why we cancel only in the case of CAS success.
taskFuture.cancel(false);
}
}
};
// Adding the listener to both futures guarantees that newFuture will aways be set. Adding to
// taskFuture guarantees completion if the callable is invoked, and adding to outputFuture
// propagates cancellation if the callable has not yet been invoked.
outputFuture.addListener(listener, directExecutor());
taskFuture.addListener(listener, directExecutor());
return outputFuture;
}
enum RunningState {
NOT_RUN,
CANCELLED,
STARTED,
}
/**
* This class helps avoid a StackOverflowError when large numbers of tasks are submitted with
* {@link MoreExecutors#directExecutor}. Normally, when the first future completes, all the other
* tasks would be called recursively. Here, we detect that the delegate executor is executing
* inline, and maintain a queue to dispatch tasks iteratively. There is one instance of this class
* per call to submit() or submitAsync(), and each instance supports only one call to execute().
*
* <p>This class would certainly be simpler and easier to reason about if it were built with
* ThreadLocal; however, ThreadLocal is not well optimized for the case where the ThreadLocal is
* non-static, and is initialized/removed frequently - this causes churn in the Thread specific
* hashmaps. Using a static ThreadLocal to avoid that overhead would mean that different
* ExecutionSequencer objects interfere with each other, which would be undesirable, in addition
* to increasing the memory footprint of every thread that interacted with it. In order to release
* entries in thread-specific maps when the ThreadLocal object itself is no longer referenced,
* ThreadLocal is usually implemented with a WeakReference, which can have negative performance
* properties; for example, calling WeakReference.get() on Android will block during an
* otherwise-concurrent GC cycle.
*/
@SuppressWarnings("ShouldNotSubclass") // Saving an allocation here is worth it
private static final class TaskNonReentrantExecutor extends AtomicReference<RunningState>
implements Executor, Runnable {
/**
* Used to update and read the latestTaskQueue field. Set to null once the runnable has been run
* or queued.
*/
ExecutionSequencer sequencer;
/**
* Executor the task was set to run on. Set to null when the task has been queued, run, or
* cancelled.
*/
Executor delegate;
/**
* Set before calling delegate.execute(); set to null once run, so that it can be GCed; this
* object may live on after, if submitAsync returns an incomplete future.
*/
Runnable task;
/** Thread that called execute(). Set in execute, cleared when delegate.execute() returns. */
Thread submitting;
private TaskNonReentrantExecutor(Executor delegate, ExecutionSequencer sequencer) {
super(NOT_RUN);
this.delegate = delegate;
this.sequencer = sequencer;
}
@Override
public void execute(Runnable task) {
// If this operation was successfully cancelled already, calling the runnable will be a noop.
// This also avoids a race where if outputFuture is cancelled, it will call taskFuture.cancel,
// which will call newFuture.setFuture(oldFuture), to allow the next task in the queue to run
// without waiting for the user's executor to run our submitted Runnable. However, this can
// interact poorly with the reentrancy-avoiding behavior of this executor - when the operation
// before the cancelled future completes, it will synchronously complete both the newFuture
// from the cancelled operation and its own. This can cause one runnable to queue two tasks,
// breaking the invariant this method relies on to iteratively run the next task after the
// previous one completes.
if (get() == RunningState.CANCELLED) {
delegate = null;
sequencer = null;
return;
}
submitting = Thread.currentThread();
try {
ThreadConfinedTaskQueue submittingTaskQueue = sequencer.latestTaskQueue;
if (submittingTaskQueue.thread == submitting) {
sequencer = null;
// Submit from inside a reentrant submit. We don't know if this one will be reentrant (and
// can't know without submitting something to the executor) so queue to run iteratively.
// Task must be null, since each execution on this executor can only produce one more
// execution.
checkState(submittingTaskQueue.nextTask == null);
submittingTaskQueue.nextTask = task;
submittingTaskQueue.nextExecutor = delegate;
delegate = null;
} else {
Executor localDelegate = delegate;
delegate = null;
this.task = task;
localDelegate.execute(this);
}
} finally {
// Important to null this out here - if we did *not* execute inline, we might still
// run() on the same thread that called execute() - such as in a thread pool, and think
// that it was happening inline. As a side benefit, avoids holding on to the Thread object
// longer than necessary.
submitting = null;
}
}
@SuppressWarnings("ShortCircuitBoolean")
@Override
public void run() {
Thread currentThread = Thread.currentThread();
if (currentThread != submitting) {
Runnable localTask = task;
task = null;
localTask.run();
return;
}
// Executor called reentrantly! Make sure that further calls don't overflow stack. Further
// reentrant calls will see that their current thread is the same as the one set in
// latestTaskQueue, and queue rather than calling execute() directly.
ThreadConfinedTaskQueue executingTaskQueue = new ThreadConfinedTaskQueue();
executingTaskQueue.thread = currentThread;
// Unconditionally set; there is no risk of throwing away a queued task from another thread,
// because in order for the current task to run on this executor the previous task must have
// already started execution. Because each task on a TaskNonReentrantExecutor can only produce
// one execute() call to another instance from the same ExecutionSequencer, we know by
// induction that the task that launched this one must not have added any other runnables to
// that thread's queue, and thus we cannot be replacing a TaskAndThread object that would
// otherwise have another task queued on to it. Note the exception to this, cancellation, is
// specially handled in execute() - execute() calls triggered by cancellation are no-ops, and
// thus don't count.
sequencer.latestTaskQueue = executingTaskQueue;
sequencer = null;
try {
Runnable localTask = task;
task = null;
localTask.run();
// Now check if our task attempted to reentrantly execute the next task.
Runnable queuedTask;
Executor queuedExecutor;
// Intentionally using non-short-circuit operator
while ((queuedTask = executingTaskQueue.nextTask) != null
& (queuedExecutor = executingTaskQueue.nextExecutor) != null) {
executingTaskQueue.nextTask = null;
executingTaskQueue.nextExecutor = null;
queuedExecutor.execute(queuedTask);
}
} finally {
// Null out the thread field, so that we don't leak a reference to Thread, and so that
// future `thread == currentThread()` calls from this thread don't incorrectly queue instead
// of executing. Don't null out the latestTaskQueue field, because the work done here
// may have scheduled more operations on another thread, and if those operations then
// trigger reentrant calls that thread will have updated the latestTaskQueue field, and
// we'd be interfering with their operation.
executingTaskQueue.thread = null;
}
}
private boolean trySetStarted() {
return compareAndSet(NOT_RUN, STARTED);
}
private boolean trySetCancelled() {
return compareAndSet(NOT_RUN, CANCELLED);
}
}
}