blob: bf70aac25e72c6347eebc918e87e66c3fcf2676f [file] [log] [blame]
/*
* Copyright (C) 2014 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.common.util.concurrent;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import org.checkerframework.checker.nullness.compatqual.NullableDecl;
/** Utilities for the AbstractFutureBenchmarks */
final class AbstractFutureBenchmarks {
private AbstractFutureBenchmarks() {}
interface Facade<T> extends ListenableFuture<T> {
@CanIgnoreReturnValue
boolean set(T t);
@CanIgnoreReturnValue
boolean setException(Throwable t);
}
private static class NewAbstractFutureFacade<T> extends AbstractFuture<T> implements Facade<T> {
@CanIgnoreReturnValue
@Override
public boolean set(T t) {
return super.set(t);
}
@CanIgnoreReturnValue
@Override
public boolean setException(Throwable t) {
return super.setException(t);
}
}
private static class OldAbstractFutureFacade<T> extends OldAbstractFuture<T>
implements Facade<T> {
@CanIgnoreReturnValue
@Override
public boolean set(T t) {
return super.set(t);
}
@CanIgnoreReturnValue
@Override
public boolean setException(Throwable t) {
return super.setException(t);
}
}
enum Impl {
NEW {
@Override
<T> Facade<T> newFacade() {
return new NewAbstractFutureFacade<T>();
}
},
OLD {
@Override
<T> Facade<T> newFacade() {
return new OldAbstractFutureFacade<T>();
}
};
abstract <T> Facade<T> newFacade();
}
static void awaitWaiting(Thread t) {
while (true) {
Thread.State state = t.getState();
switch (state) {
case RUNNABLE:
case BLOCKED:
Thread.yield();
break;
case WAITING:
return;
default:
throw new AssertionError("unexpected state: " + state);
}
}
}
abstract static class OldAbstractFuture<V> implements ListenableFuture<V> {
/** Synchronization control for AbstractFutures. */
private final Sync<V> sync = new Sync<V>();
// The execution list to hold our executors.
private final ExecutionList executionList = new ExecutionList();
/** Constructor for use by subclasses. */
protected OldAbstractFuture() {}
/*
* Improve the documentation of when InterruptedException is thrown. Our
* behavior matches the JDK's, but the JDK's documentation is misleading.
*/
/**
* {@inheritDoc}
*
* <p>The default {@link AbstractFuture} implementation throws {@code InterruptedException} if
* the current thread is interrupted before or during the call, even if the value is already
* available.
*
* @throws InterruptedException if the current thread was interrupted before or during the call
* (optional but recommended).
* @throws CancellationException {@inheritDoc}
*/
@CanIgnoreReturnValue
@Override
public V get(long timeout, TimeUnit unit)
throws InterruptedException, TimeoutException, ExecutionException {
return sync.get(unit.toNanos(timeout));
}
/*
* Improve the documentation of when InterruptedException is thrown. Our
* behavior matches the JDK's, but the JDK's documentation is misleading.
*/
/**
* {@inheritDoc}
*
* <p>The default {@link AbstractFuture} implementation throws {@code InterruptedException} if
* the current thread is interrupted before or during the call, even if the value is already
* available.
*
* @throws InterruptedException if the current thread was interrupted before or during the call
* (optional but recommended).
* @throws CancellationException {@inheritDoc}
*/
@CanIgnoreReturnValue
@Override
public V get() throws InterruptedException, ExecutionException {
return sync.get();
}
@Override
public boolean isDone() {
return sync.isDone();
}
@Override
public boolean isCancelled() {
return sync.isCancelled();
}
@CanIgnoreReturnValue
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (!sync.cancel(mayInterruptIfRunning)) {
return false;
}
executionList.execute();
if (mayInterruptIfRunning) {
interruptTask();
}
return true;
}
/**
* Subclasses can override this method to implement interruption of the future's computation.
* The method is invoked automatically by a successful call to {@link #cancel(boolean)
* cancel(true)}.
*
* <p>The default implementation does nothing.
*
* @since 10.0
*/
protected void interruptTask() {}
/**
* Returns true if this future was cancelled with {@code mayInterruptIfRunning} set to {@code
* true}.
*
* @since 14.0
*/
protected final boolean wasInterrupted() {
return sync.wasInterrupted();
}
/**
* {@inheritDoc}
*
* @since 10.0
*/
@Override
public void addListener(Runnable listener, Executor exec) {
executionList.add(listener, exec);
}
/**
* Subclasses should invoke this method to set the result of the computation to {@code value}.
* This will set the state of the future to {@link OldAbstractFuture.Sync#COMPLETED} and invoke
* the listeners if the state was successfully changed.
*
* @param value the value that was the result of the task.
* @return true if the state was successfully changed.
*/
@CanIgnoreReturnValue
protected boolean set(@NullableDecl V value) {
boolean result = sync.set(value);
if (result) {
executionList.execute();
}
return result;
}
/**
* Subclasses should invoke this method to set the result of the computation to an error, {@code
* throwable}. This will set the state of the future to {@link OldAbstractFuture.Sync#COMPLETED}
* and invoke the listeners if the state was successfully changed.
*
* @param throwable the exception that the task failed with.
* @return true if the state was successfully changed.
*/
@CanIgnoreReturnValue
protected boolean setException(Throwable throwable) {
boolean result = sync.setException(checkNotNull(throwable));
if (result) {
executionList.execute();
}
return result;
}
/**
* Following the contract of {@link AbstractQueuedSynchronizer} we create a private subclass to
* hold the synchronizer. This synchronizer is used to implement the blocking and waiting calls
* as well as to handle state changes in a thread-safe manner. The current state of the future
* is held in the Sync state, and the lock is released whenever the state changes to {@link
* #COMPLETED}, {@link #CANCELLED}, or {@link #INTERRUPTED}
*
* <p>To avoid races between threads doing release and acquire, we transition to the final state
* in two steps. One thread will successfully CAS from RUNNING to COMPLETING, that thread will
* then set the result of the computation, and only then transition to COMPLETED, CANCELLED, or
* INTERRUPTED.
*
* <p>We don't use the integer argument passed between acquire methods so we pass around a -1
* everywhere.
*/
static final class Sync<V> extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 0L;
/* Valid states. */
static final int RUNNING = 0;
static final int COMPLETING = 1;
static final int COMPLETED = 2;
static final int CANCELLED = 4;
static final int INTERRUPTED = 8;
private V value;
private Throwable exception;
/*
* Acquisition succeeds if the future is done, otherwise it fails.
*/
@Override
protected int tryAcquireShared(int ignored) {
if (isDone()) {
return 1;
}
return -1;
}
/*
* We always allow a release to go through, this means the state has been
* successfully changed and the result is available.
*/
@Override
protected boolean tryReleaseShared(int finalState) {
setState(finalState);
return true;
}
/**
* Blocks until the task is complete or the timeout expires. Throws a {@link TimeoutException}
* if the timer expires, otherwise behaves like {@link #get()}.
*/
V get(long nanos)
throws TimeoutException, CancellationException, ExecutionException, InterruptedException {
// Attempt to acquire the shared lock with a timeout.
if (!tryAcquireSharedNanos(-1, nanos)) {
throw new TimeoutException("Timeout waiting for task.");
}
return getValue();
}
/**
* Blocks until {@link #complete(Object, Throwable, int)} has been successfully called. Throws
* a {@link CancellationException} if the task was cancelled, or a {@link ExecutionException}
* if the task completed with an error.
*/
V get() throws CancellationException, ExecutionException, InterruptedException {
// Acquire the shared lock allowing interruption.
acquireSharedInterruptibly(-1);
return getValue();
}
/**
* Implementation of the actual value retrieval. Will return the value on success, an
* exception on failure, a cancellation on cancellation, or an illegal state if the
* synchronizer is in an invalid state.
*/
private V getValue() throws CancellationException, ExecutionException {
int state = getState();
switch (state) {
case COMPLETED:
if (exception != null) {
throw new ExecutionException(exception);
} else {
return value;
}
case CANCELLED:
case INTERRUPTED:
throw cancellationExceptionWithCause("Task was cancelled.", exception);
default:
throw new IllegalStateException("Error, synchronizer in invalid state: " + state);
}
}
/** Checks if the state is {@link #COMPLETED}, {@link #CANCELLED}, or {@link #INTERRUPTED}. */
boolean isDone() {
return (getState() & (COMPLETED | CANCELLED | INTERRUPTED)) != 0;
}
/** Checks if the state is {@link #CANCELLED} or {@link #INTERRUPTED}. */
boolean isCancelled() {
return (getState() & (CANCELLED | INTERRUPTED)) != 0;
}
/** Checks if the state is {@link #INTERRUPTED}. */
boolean wasInterrupted() {
return getState() == INTERRUPTED;
}
/** Transition to the COMPLETED state and set the value. */
boolean set(@NullableDecl V v) {
return complete(v, null, COMPLETED);
}
/** Transition to the COMPLETED state and set the exception. */
boolean setException(Throwable t) {
return complete(null, t, COMPLETED);
}
/** Transition to the CANCELLED or INTERRUPTED state. */
boolean cancel(boolean interrupt) {
return complete(null, null, interrupt ? INTERRUPTED : CANCELLED);
}
/**
* Implementation of completing a task. Either {@code v} or {@code t} will be set but not
* both. The {@code finalState} is the state to change to from {@link #RUNNING}. If the state
* is not in the RUNNING state we return {@code false} after waiting for the state to be set
* to a valid final state ({@link #COMPLETED}, {@link #CANCELLED}, or {@link #INTERRUPTED}).
*
* @param v the value to set as the result of the computation.
* @param t the exception to set as the result of the computation.
* @param finalState the state to transition to.
*/
private boolean complete(@NullableDecl V v, @NullableDecl Throwable t, int finalState) {
boolean doCompletion = compareAndSetState(RUNNING, COMPLETING);
if (doCompletion) {
// If this thread successfully transitioned to COMPLETING, set the value
// and exception and then release to the final state.
this.value = v;
// Don't actually construct a CancellationException until necessary.
this.exception =
((finalState & (CANCELLED | INTERRUPTED)) != 0)
? new CancellationException("Future.cancel() was called.")
: t;
releaseShared(finalState);
} else if (getState() == COMPLETING) {
// If some other thread is currently completing the future, block until
// they are done so we can guarantee completion.
acquireShared(-1);
}
return doCompletion;
}
}
static final CancellationException cancellationExceptionWithCause(
@NullableDecl String message, @NullableDecl Throwable cause) {
CancellationException exception = new CancellationException(message);
exception.initCause(cause);
return exception;
}
}
}