blob: 56041751c91c928fd22fb5c065b409c80a3792a4 [file] [log] [blame]
/*
* Copyright (C) 2008 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.common.util.concurrent;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.util.concurrent.SequentialExecutor.WorkerRunningState.IDLE;
import static com.google.common.util.concurrent.SequentialExecutor.WorkerRunningState.QUEUED;
import static com.google.common.util.concurrent.SequentialExecutor.WorkerRunningState.QUEUING;
import static com.google.common.util.concurrent.SequentialExecutor.WorkerRunningState.RUNNING;
import static java.lang.System.identityHashCode;
import com.google.common.annotations.GwtIncompatible;
import com.google.common.base.Preconditions;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.j2objc.annotations.RetainedWith;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Executor ensuring that all Runnables submitted are executed in order, using the provided
* Executor, and sequentially such that no two will ever be running at the same time.
*
* <p>Tasks submitted to {@link #execute(Runnable)} are executed in FIFO order.
*
* <p>The execution of tasks is done by one thread as long as there are tasks left in the queue.
* When a task is {@linkplain Thread#interrupt interrupted}, execution of subsequent tasks
* continues. See {@link QueueWorker#workOnQueue} for details.
*
* <p>{@code RuntimeException}s thrown by tasks are simply logged and the executor keeps trucking.
* If an {@code Error} is thrown, the error will propagate and execution will stop until it is
* restarted by a call to {@link #execute}.
*/
@GwtIncompatible
final class SequentialExecutor implements Executor {
private static final Logger log = Logger.getLogger(SequentialExecutor.class.getName());
enum WorkerRunningState {
/** Runnable is not running and not queued for execution */
IDLE,
/** Runnable is not running, but is being queued for execution */
QUEUING,
/** runnable has been submitted but has not yet begun execution */
QUEUED,
RUNNING,
}
/** Underlying executor that all submitted Runnable objects are run on. */
private final Executor executor;
@GuardedBy("queue")
private final Deque<Runnable> queue = new ArrayDeque<>();
/** see {@link WorkerRunningState} */
@GuardedBy("queue")
private WorkerRunningState workerRunningState = IDLE;
/**
* This counter prevents an ABA issue where a thread may successfully schedule the worker, the
* worker runs and exhausts the queue, another thread enqueues a task and fails to schedule the
* worker, and then the first thread's call to delegate.execute() returns. Without this counter,
* it would observe the QUEUING state and set it to QUEUED, and the worker would never be
* scheduled again for future submissions.
*/
@GuardedBy("queue")
private long workerRunCount = 0;
@RetainedWith private final QueueWorker worker = new QueueWorker();
/** Use {@link MoreExecutors#newSequentialExecutor} */
SequentialExecutor(Executor executor) {
this.executor = Preconditions.checkNotNull(executor);
}
/**
* Adds a task to the queue and makes sure a worker thread is running.
*
* <p>If this method throws, e.g. a {@code RejectedExecutionException} from the delegate executor,
* execution of tasks will stop until a call to this method or to {@link #resume()} is made.
*/
@Override
public void execute(final Runnable task) {
checkNotNull(task);
final Runnable submittedTask;
final long oldRunCount;
synchronized (queue) {
// If the worker is already running (or execute() on the delegate returned successfully, and
// the worker has yet to start) then we don't need to start the worker.
if (workerRunningState == RUNNING || workerRunningState == QUEUED) {
queue.add(task);
return;
}
oldRunCount = workerRunCount;
// If the worker is not yet running, the delegate Executor might reject our attempt to start
// it. To preserve FIFO order and failure atomicity of rejected execution when the same
// Runnable is executed more than once, allocate a wrapper that we know is safe to remove by
// object identity.
// A data structure that returned a removal handle from add() would allow eliminating this
// allocation.
submittedTask =
new Runnable() {
@Override
public void run() {
task.run();
}
@Override
public String toString() {
return task.toString();
}
};
queue.add(submittedTask);
workerRunningState = QUEUING;
}
try {
executor.execute(worker);
} catch (RuntimeException | Error t) {
synchronized (queue) {
boolean removed =
(workerRunningState == IDLE || workerRunningState == QUEUING)
&& queue.removeLastOccurrence(submittedTask);
// If the delegate is directExecutor(), the submitted runnable could have thrown a REE. But
// that's handled by the log check that catches RuntimeExceptions in the queue worker.
if (!(t instanceof RejectedExecutionException) || removed) {
throw t;
}
}
return;
}
/*
* This is an unsynchronized read! After the read, the function returns immediately or acquires
* the lock to check again. Since an IDLE state was observed inside the preceding synchronized
* block, and reference field assignment is atomic, this may save reacquiring the lock when
* another thread or the worker task has cleared the count and set the state.
*
* <p>When {@link #executor} is a directExecutor(), the value written to
* {@code workerRunningState} will be available synchronously, and behaviour will be
* deterministic.
*/
@SuppressWarnings("GuardedBy")
boolean alreadyMarkedQueued = workerRunningState != QUEUING;
if (alreadyMarkedQueued) {
return;
}
synchronized (queue) {
if (workerRunCount == oldRunCount && workerRunningState == QUEUING) {
workerRunningState = QUEUED;
}
}
}
/** Worker that runs tasks from {@link #queue} until it is empty. */
private final class QueueWorker implements Runnable {
Runnable task;
@Override
public void run() {
try {
workOnQueue();
} catch (Error e) {
synchronized (queue) {
workerRunningState = IDLE;
}
throw e;
// The execution of a task has ended abnormally.
// We could have tasks left in the queue, so should perhaps try to restart a worker,
// but then the Error will get delayed if we are using a direct (same thread) executor.
}
}
/**
* Continues executing tasks from {@link #queue} until it is empty.
*
* <p>The thread's interrupt bit is cleared before execution of each task.
*
* <p>If the Thread in use is interrupted before or during execution of the tasks in {@link
* #queue}, the Executor will complete its tasks, and then restore the interruption. This means
* that once the Thread returns to the Executor that this Executor composes, the interruption
* will still be present. If the composed Executor is an ExecutorService, it can respond to
* shutdown() by returning tasks queued on that Thread after {@link #worker} drains the queue.
*/
private void workOnQueue() {
boolean interruptedDuringTask = false;
boolean hasSetRunning = false;
try {
while (true) {
synchronized (queue) {
// Choose whether this thread will run or not after acquiring the lock on the first
// iteration
if (!hasSetRunning) {
if (workerRunningState == RUNNING) {
// Don't want to have two workers pulling from the queue.
return;
} else {
// Increment the run counter to avoid the ABA problem of a submitter marking the
// thread as QUEUED after it already ran and exhausted the queue before returning
// from execute().
workerRunCount++;
workerRunningState = RUNNING;
hasSetRunning = true;
}
}
task = queue.poll();
if (task == null) {
workerRunningState = IDLE;
return;
}
}
// Remove the interrupt bit before each task. The interrupt is for the "current task" when
// it is sent, so subsequent tasks in the queue should not be caused to be interrupted
// by a previous one in the queue being interrupted.
interruptedDuringTask |= Thread.interrupted();
try {
task.run();
} catch (RuntimeException e) {
log.log(Level.SEVERE, "Exception while executing runnable " + task, e);
} finally {
task = null;
}
}
} finally {
// Ensure that if the thread was interrupted at all while processing the task queue, it
// is returned to the delegate Executor interrupted so that it may handle the
// interruption if it likes.
if (interruptedDuringTask) {
Thread.currentThread().interrupt();
}
}
}
@SuppressWarnings("GuardedBy")
@Override
public String toString() {
Runnable currentlyRunning = task;
if (currentlyRunning != null) {
return "SequentialExecutorWorker{running=" + currentlyRunning + "}";
}
return "SequentialExecutorWorker{state=" + workerRunningState + "}";
}
}
@Override
public String toString() {
return "SequentialExecutor@" + identityHashCode(this) + "{" + executor + "}";
}
}