blob: a90d1ae6ea715a6f1e58139bcf980171bc87f5e7 [file] [log] [blame]
/*
* Copyright 2019 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package androidx.camera.core.impl.utils.executor;
import static androidx.camera.core.impl.utils.executor.SequentialExecutor.WorkerRunningState.IDLE;
import static androidx.camera.core.impl.utils.executor.SequentialExecutor.WorkerRunningState.QUEUED;
import static androidx.camera.core.impl.utils.executor.SequentialExecutor.WorkerRunningState.QUEUING;
import static androidx.camera.core.impl.utils.executor.SequentialExecutor.WorkerRunningState.RUNNING;
import android.util.Log;
import androidx.annotation.GuardedBy;
import androidx.core.util.Preconditions;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
/**
* Executor ensuring that all Runnables submitted are executed in order, using the provided
* Executor, and sequentially such that no two will ever be running at the same time.
*
* <p>Tasks submitted to {@link #execute(Runnable)} are executed in FIFO order.
*
* <p>The execution of tasks is done by one thread as long as there are tasks left in the queue.
* When a task is {@linkplain Thread#interrupt interrupted}, execution of subsequent tasks
* continues. See {@link QueueWorker#workOnQueue} for details.
*
* <p>{@code RuntimeException}s thrown by tasks are simply logged and the executor keeps trucking.
* If an {@code Error} is thrown, the error will propagate and execution will stop until it is
* restarted by a call to {@link #execute}.
*
* <p>Copied and adapted from Guava.
*/
final class SequentialExecutor implements Executor {
private static final String TAG = "SequentialExecutor";
@SuppressWarnings("WeakerAccess") /* synthetic accessor */
@GuardedBy("mQueue")
final Deque<Runnable> mQueue = new ArrayDeque<>();
/** Underlying executor that all submitted Runnable objects are run on. */
private final Executor mExecutor;
private final QueueWorker mWorker = new QueueWorker();
/** see {@link WorkerRunningState} */
@SuppressWarnings("WeakerAccess") /* synthetic accessor */
@GuardedBy("mQueue")
WorkerRunningState mWorkerRunningState = IDLE;
/**
* This counter prevents an ABA issue where a thread may successfully schedule the worker, the
* worker runs and exhausts the queue, another thread enqueues a task and fails to schedule the
* worker, and then the first thread's call to delegate.execute() returns. Without this counter,
* it would observe the QUEUING state and set it to QUEUED, and the worker would never be
* scheduled again for future submissions.
*/
@SuppressWarnings("WeakerAccess") /* synthetic accessor */
@GuardedBy("mQueue")
long mWorkerRunCount = 0;
/** Use {@link CameraXExecutors#newSequentialExecutor} */
SequentialExecutor(Executor executor) {
mExecutor = Preconditions.checkNotNull(executor);
}
/**
* Adds a task to the queue and makes sure a worker thread is running.
*
* <p>If this method throws, e.g. a {@code RejectedExecutionException} from the delegate
* executor, execution of tasks will stop until a call to this method is made.
*/
@Override
public void execute(final Runnable task) {
Preconditions.checkNotNull(task);
final Runnable submittedTask;
final long oldRunCount;
synchronized (mQueue) {
// If the worker is already running (or execute() on the delegate returned
// successfully, and
// the worker has yet to start) then we don't need to start the worker.
if (mWorkerRunningState == RUNNING || mWorkerRunningState == QUEUED) {
mQueue.add(task);
return;
}
oldRunCount = mWorkerRunCount;
// If the worker is not yet running, the delegate Executor might reject our attempt
// to start it. To preserve FIFO order and failure atomicity of rejected execution when
// the same Runnable is executed more than once, allocate a wrapper that we know is safe
// to remove by object identity. A data structure that returned a removal handle from
// add() would allow eliminating this allocation.
submittedTask =
new Runnable() {
@Override
public void run() {
task.run();
}
};
mQueue.add(submittedTask);
mWorkerRunningState = QUEUING;
}
try {
mExecutor.execute(mWorker);
} catch (RuntimeException | Error t) {
synchronized (mQueue) {
boolean removed =
(mWorkerRunningState == IDLE || mWorkerRunningState == QUEUING)
&& mQueue.removeLastOccurrence(submittedTask);
// If the delegate is directExecutor(), the submitted runnable could have thrown
// a REE. But that's handled by the log check that catches RuntimeExceptions in the
// queue worker.
if (!(t instanceof RejectedExecutionException) || removed) {
throw t;
}
}
return;
}
/*
* This is an unsynchronized read! After the read, the function returns immediately or
* acquires the lock to check again. Since an IDLE state was observed inside the preceding
* synchronized block, and reference field assignment is atomic, this may save reacquiring
* the lock when another thread or the worker task has cleared the count and set the state.
*
* <p>When {@link #executor} is a directExecutor(), the value written to
* {@code mWorkerRunningState} will be available synchronously, and behaviour will be
* deterministic.
*/
@SuppressWarnings("GuardedBy")
boolean alreadyMarkedQueued = mWorkerRunningState != QUEUING;
if (alreadyMarkedQueued) {
return;
}
synchronized (mQueue) {
if (mWorkerRunCount == oldRunCount && mWorkerRunningState == QUEUING) {
mWorkerRunningState = QUEUED;
}
}
}
enum WorkerRunningState {
/** Runnable is not running and not queued for execution */
IDLE,
/** Runnable is not running, but is being queued for execution */
QUEUING,
/** Runnable has been submitted but has not yet begun execution */
QUEUED,
/** Runnable is running */
RUNNING,
}
/** Worker that runs tasks from {@link #mQueue} until it is empty. */
final class QueueWorker implements Runnable {
@Override
public void run() {
try {
workOnQueue();
} catch (Error e) {
synchronized (mQueue) {
mWorkerRunningState = IDLE;
}
throw e;
// The execution of a task has ended abnormally.
// We could have tasks left in the queue, so should perhaps try to restart a worker,
// but then the Error will get delayed if we are using a direct (same thread)
// executor.
}
}
/**
* Continues executing tasks from {@link #mQueue} until it is empty.
*
* <p>The thread's interrupt bit is cleared before execution of each task.
*
* <p>If the Thread in use is interrupted before or during execution of the tasks in {@link
* #mQueue}, the Executor will complete its tasks, and then restore the interruption.
* This means that once the Thread returns to the Executor that this Executor composes, the
* interruption will still be present. If the composed Executor is an ExecutorService, it
* can respond to shutdown() by returning tasks queued on that Thread after {@link #mWorker}
* drains the queue.
*/
private void workOnQueue() {
boolean interruptedDuringTask = false;
boolean hasSetRunning = false;
try {
while (true) {
Runnable task;
synchronized (mQueue) {
// Choose whether this thread will run or not after acquiring the lock on
// the first iteration
if (!hasSetRunning) {
if (mWorkerRunningState == RUNNING) {
// Don't want to have two workers pulling from the queue.
return;
} else {
// Increment the run counter to avoid the ABA problem of a submitter
// marking the thread as QUEUED after it already ran and exhausted
// the queue before returning from execute().
mWorkerRunCount++;
mWorkerRunningState = RUNNING;
hasSetRunning = true;
}
}
task = mQueue.poll();
if (task == null) {
mWorkerRunningState = IDLE;
return;
}
}
// Remove the interrupt bit before each task. The interrupt is for the
// "current task" when it is sent, so subsequent tasks in the queue should not
// be caused to be interrupted by a previous one in the queue being interrupted.
interruptedDuringTask |= Thread.interrupted();
try {
task.run();
} catch (RuntimeException e) {
Log.e(TAG, "Exception while executing runnable " + task, e);
}
}
} finally {
// Ensure that if the thread was interrupted at all while processing the task
// queue, it is returned to the delegate Executor interrupted so that it may handle
// the interruption if it likes.
if (interruptedDuringTask) {
Thread.currentThread().interrupt();
}
}
}
}
}