blob: 368e2d4f2e6ee6ca40256eb8bbd49d614d649b3f [file] [log] [blame]
/*
* Copyright (C) 2011 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.common.util.concurrent;
import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.Truth.assertWithMessage;
import com.google.common.collect.Iterables;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.internal.InternalFutureFailureAccess;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import junit.framework.AssertionFailedError;
import junit.framework.TestCase;
/**
* Tests for {@link AbstractFuture}.
*
* @author Brian Stoler
*/
public class AbstractFutureTest extends TestCase {
public void testSuccess() throws ExecutionException, InterruptedException {
final Object value = new Object();
assertSame(
value,
new AbstractFuture<Object>() {
{
set(value);
}
}.get());
}
public void testException() throws InterruptedException {
final Throwable failure = new Throwable();
AbstractFuture<String> future =
new AbstractFuture<String>() {
{
setException(failure);
}
};
ExecutionException ee1 = getExpectingExecutionException(future);
ExecutionException ee2 = getExpectingExecutionException(future);
// Ensure we get a unique execution exception on each get
assertNotSame(ee1, ee2);
assertThat(ee1).hasCauseThat().isSameAs(failure);
assertThat(ee2).hasCauseThat().isSameAs(failure);
checkStackTrace(ee1);
checkStackTrace(ee2);
}
public void testCancel_notDoneNoInterrupt() throws Exception {
InterruptibleFuture future = new InterruptibleFuture();
assertTrue(future.cancel(false));
assertTrue(future.isCancelled());
assertTrue(future.isDone());
assertFalse(future.wasInterrupted());
assertFalse(future.interruptTaskWasCalled);
try {
future.get();
fail("Expected CancellationException");
} catch (CancellationException e) {
// See AbstractFutureCancellationCauseTest for how to set causes
assertThat(e).hasCauseThat().isNull();
}
}
public void testCancel_notDoneInterrupt() throws Exception {
InterruptibleFuture future = new InterruptibleFuture();
assertTrue(future.cancel(true));
assertTrue(future.isCancelled());
assertTrue(future.isDone());
assertTrue(future.wasInterrupted());
assertTrue(future.interruptTaskWasCalled);
try {
future.get();
fail("Expected CancellationException");
} catch (CancellationException e) {
// See AbstractFutureCancellationCauseTest for how to set causes
assertThat(e).hasCauseThat().isNull();
}
}
public void testCancel_done() throws Exception {
AbstractFuture<String> future =
new AbstractFuture<String>() {
{
set("foo");
}
};
assertFalse(future.cancel(true));
assertFalse(future.isCancelled());
assertTrue(future.isDone());
}
public void testGetWithTimeoutDoneFuture() throws Exception {
AbstractFuture<String> future =
new AbstractFuture<String>() {
{
set("foo");
}
};
assertEquals("foo", future.get(0, TimeUnit.SECONDS));
}
public void testEvilFuture_setFuture() throws Exception {
final RuntimeException exception = new RuntimeException("you didn't say the magic word!");
AbstractFuture<String> evilFuture =
new AbstractFuture<String>() {
@Override
public void addListener(Runnable r, Executor e) {
throw exception;
}
};
AbstractFuture<String> normalFuture = new AbstractFuture<String>() {};
normalFuture.setFuture(evilFuture);
assertTrue(normalFuture.isDone());
try {
normalFuture.get();
fail();
} catch (ExecutionException e) {
assertThat(e).hasCauseThat().isSameAs(exception);
}
}
public void testRemoveWaiter_interruption() throws Exception {
final AbstractFuture<String> future = new AbstractFuture<String>() {};
WaiterThread waiter1 = new WaiterThread(future);
waiter1.start();
waiter1.awaitWaiting();
WaiterThread waiter2 = new WaiterThread(future);
waiter2.start();
waiter2.awaitWaiting();
// The waiter queue should be waiter2->waiter1
// This should wake up waiter1 and cause the waiter1 node to be removed.
waiter1.interrupt();
waiter1.join();
waiter2.awaitWaiting(); // should still be blocked
LockSupport.unpark(waiter2); // spurious wakeup
waiter2.awaitWaiting(); // should eventually re-park
future.set(null);
waiter2.join();
}
public void testRemoveWaiter_polling() throws Exception {
final AbstractFuture<String> future = new AbstractFuture<String>() {};
WaiterThread waiter = new WaiterThread(future);
waiter.start();
waiter.awaitWaiting();
PollingThread poller = new PollingThread(future);
poller.start();
PollingThread poller2 = new PollingThread(future);
poller2.start();
PollingThread poller3 = new PollingThread(future);
poller3.start();
poller.awaitInLoop();
poller2.awaitInLoop();
poller3.awaitInLoop();
// The waiter queue should be {poller x 3}->waiter1
waiter.interrupt();
// This should wake up waiter1 and cause the waiter1 node to be removed.
waiter.join();
future.set(null);
poller.join();
}
public void testToString_allUnique() throws Exception {
// Two futures should not have the same toString, to avoid people asserting on it
assertThat(SettableFuture.create().toString()).isNotEqualTo(SettableFuture.create().toString());
}
public void testToString_notDone() throws Exception {
AbstractFuture<Object> testFuture =
new AbstractFuture<Object>() {
@Override
public String pendingToString() {
return "cause=[Because this test isn't done]";
}
};
assertThat(testFuture.toString())
.matches(
"[^\\[]+\\[status=PENDING, info=\\[cause=\\[Because this test isn't done\\]\\]\\]");
try {
testFuture.get(1, TimeUnit.NANOSECONDS);
fail();
} catch (TimeoutException e) {
assertThat(e.getMessage()).contains("1 nanoseconds");
assertThat(e.getMessage()).contains("Because this test isn't done");
}
}
/**
* This test attempts to cause a future to wait for longer than it was requested to from a timed
* get() call. As measurements of time are prone to flakiness, it tries to assert based on ranges
* derived from observing how much time actually passed for various operations.
*/
@SuppressWarnings({"DeprecatedThreadMethods", "ThreadPriorityCheck"})
public void testToString_delayedTimeout() throws Exception {
TimedWaiterThread thread =
new TimedWaiterThread(new AbstractFuture<Object>() {}, 2, TimeUnit.SECONDS);
thread.start();
thread.awaitWaiting();
thread.suspend();
// Sleep for enough time to add 1500 milliseconds of overwait to the get() call.
long toWaitMillis = 3500 - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - thread.startTime);
Thread.sleep(toWaitMillis);
thread.setPriority(Thread.MAX_PRIORITY);
thread.resume();
thread.join();
// It's possible to race and suspend the thread just before the park call actually takes effect,
// causing the thread to be suspended for 3.5 seconds, and then park itself for 2 seconds after
// being resumed. To avoid a flake in this scenario, calculate how long that thread actually
// waited and assert based on that time. Empirically, the race where the thread ends up waiting
// for 5.5 seconds happens about 2% of the time.
boolean longWait = TimeUnit.NANOSECONDS.toSeconds(thread.timeSpentBlocked) >= 5;
// Count how long it actually took to return; we'll accept any number between the expected delay
// and the approximate actual delay, to be robust to variance in thread scheduling.
char overWaitNanosFirstDigit =
Long.toString(
thread.timeSpentBlocked - TimeUnit.MILLISECONDS.toNanos(longWait ? 5000 : 3000))
.charAt(0);
if (overWaitNanosFirstDigit < '4') {
overWaitNanosFirstDigit = '9';
}
String nanosRegex = "[4-" + overWaitNanosFirstDigit + "][0-9]+";
assertWithMessage(
"Spent " + thread.timeSpentBlocked + " ns blocked; slept for " + toWaitMillis + " ms")
.that(thread.exception)
.hasMessageThat()
.matches(
"Waited 2 seconds \\(plus "
+ (longWait ? "3" : "1")
+ " seconds, "
+ nanosRegex
+ " nanoseconds delay\\).*");
}
public void testToString_completed() throws Exception {
AbstractFuture<Object> testFuture2 =
new AbstractFuture<Object>() {
@Override
public String pendingToString() {
return "cause=[Someday...]";
}
};
AbstractFuture<Object> testFuture3 = new AbstractFuture<Object>() {};
testFuture3.setFuture(testFuture2);
assertThat(testFuture3.toString())
.matches(
"[^\\[]+\\[status=PENDING, info=\\[setFuture="
+ "\\[[^\\[]+\\[status=PENDING, info=\\[cause=\\[Someday...\\]\\]\\]\\]\\]\\]");
testFuture2.set("result string");
assertThat(testFuture3.toString())
.matches("[^\\[]+\\[status=SUCCESS, result=\\[result string\\]\\]");
}
public void testToString_cancelled() throws Exception {
assertThat(Futures.immediateCancelledFuture().toString())
.matches("[^\\[]+\\[status=CANCELLED\\]");
}
public void testToString_failed() {
assertThat(Futures.immediateFailedFuture(new RuntimeException("foo")).toString())
.matches("[^\\[]+\\[status=FAILURE, cause=\\[java.lang.RuntimeException: foo\\]\\]");
}
public void testToString_misbehaving() throws Exception {
assertThat(
new AbstractFuture<Object>() {
@Override
public String pendingToString() {
throw new RuntimeException("I'm a misbehaving implementation");
}
}.toString())
.matches(
"[^\\[]+\\[status=PENDING, info=\\[Exception thrown from implementation: "
+ "class java.lang.RuntimeException\\]\\]");
}
public void testCompletionFinishesWithDone() {
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 50000; i++) {
final AbstractFuture<String> future = new AbstractFuture<String>() {};
final AtomicReference<String> errorMessage = Atomics.newReference();
executor.execute(
new Runnable() {
@Override
public void run() {
future.set("success");
if (!future.isDone()) {
errorMessage.set("Set call exited before future was complete.");
}
}
});
executor.execute(
new Runnable() {
@Override
public void run() {
future.setException(new IllegalArgumentException("failure"));
if (!future.isDone()) {
errorMessage.set("SetException call exited before future was complete.");
}
}
});
executor.execute(
new Runnable() {
@Override
public void run() {
future.cancel(true);
if (!future.isDone()) {
errorMessage.set("Cancel call exited before future was complete.");
}
}
});
try {
future.get();
} catch (Throwable t) {
// Ignore, we just wanted to block.
}
String error = errorMessage.get();
assertNull(error, error);
}
executor.shutdown();
}
/**
* He did the bash, he did the future bash The future bash, it was a concurrency smash He did the
* bash, it caught on in a flash He did the bash, he did the future bash
*/
public void testFutureBash() {
final CyclicBarrier barrier =
new CyclicBarrier(
6 // for the setter threads
+ 50 // for the listeners
+ 50 // for the blocking get threads,
+ 1); // for the main thread
final ExecutorService executor = Executors.newFixedThreadPool(barrier.getParties());
final AtomicReference<AbstractFuture<String>> currentFuture = Atomics.newReference();
final AtomicInteger numSuccessfulSetCalls = new AtomicInteger();
Callable<Void> completeSucessFullyRunnable =
new Callable<Void>() {
@Override
public Void call() {
if (currentFuture.get().set("set")) {
numSuccessfulSetCalls.incrementAndGet();
}
awaitUnchecked(barrier);
return null;
}
};
Callable<Void> completeExceptionallyRunnable =
new Callable<Void>() {
Exception failureCause = new Exception("setException");
@Override
public Void call() {
if (currentFuture.get().setException(failureCause)) {
numSuccessfulSetCalls.incrementAndGet();
}
awaitUnchecked(barrier);
return null;
}
};
Callable<Void> cancelRunnable =
new Callable<Void>() {
@Override
public Void call() {
if (currentFuture.get().cancel(true)) {
numSuccessfulSetCalls.incrementAndGet();
}
awaitUnchecked(barrier);
return null;
}
};
Callable<Void> setFutureCompleteSucessFullyRunnable =
new Callable<Void>() {
ListenableFuture<String> future = Futures.immediateFuture("setFuture");
@Override
public Void call() {
if (currentFuture.get().setFuture(future)) {
numSuccessfulSetCalls.incrementAndGet();
}
awaitUnchecked(barrier);
return null;
}
};
Callable<Void> setFutureCompleteExceptionallyRunnable =
new Callable<Void>() {
ListenableFuture<String> future =
Futures.immediateFailedFuture(new Exception("setFuture"));
@Override
public Void call() {
if (currentFuture.get().setFuture(future)) {
numSuccessfulSetCalls.incrementAndGet();
}
awaitUnchecked(barrier);
return null;
}
};
Callable<Void> setFutureCancelRunnable =
new Callable<Void>() {
ListenableFuture<String> future = Futures.immediateCancelledFuture();
@Override
public Void call() {
if (currentFuture.get().setFuture(future)) {
numSuccessfulSetCalls.incrementAndGet();
}
awaitUnchecked(barrier);
return null;
}
};
final Set<Object> finalResults = Collections.synchronizedSet(Sets.newIdentityHashSet());
Runnable collectResultsRunnable =
new Runnable() {
@Override
public void run() {
try {
String result = Uninterruptibles.getUninterruptibly(currentFuture.get());
finalResults.add(result);
} catch (ExecutionException e) {
finalResults.add(e.getCause());
} catch (CancellationException e) {
finalResults.add(CancellationException.class);
} finally {
awaitUnchecked(barrier);
}
}
};
Runnable collectResultsTimedGetRunnable =
new Runnable() {
@Override
public void run() {
Future<String> future = currentFuture.get();
while (true) {
try {
String result = Uninterruptibles.getUninterruptibly(future, 0, TimeUnit.SECONDS);
finalResults.add(result);
break;
} catch (ExecutionException e) {
finalResults.add(e.getCause());
break;
} catch (CancellationException e) {
finalResults.add(CancellationException.class);
break;
} catch (TimeoutException e) {
// loop
}
}
awaitUnchecked(barrier);
}
};
List<Callable<?>> allTasks = new ArrayList<>();
allTasks.add(completeSucessFullyRunnable);
allTasks.add(completeExceptionallyRunnable);
allTasks.add(cancelRunnable);
allTasks.add(setFutureCompleteSucessFullyRunnable);
allTasks.add(setFutureCompleteExceptionallyRunnable);
allTasks.add(setFutureCancelRunnable);
for (int k = 0; k < 50; k++) {
// For each listener we add a task that submits it to the executor directly for the blocking
// get usecase and another task that adds it as a listener to the future to exercise both
// racing addListener calls and addListener calls completing after the future completes.
final Runnable listener =
k % 2 == 0 ? collectResultsRunnable : collectResultsTimedGetRunnable;
allTasks.add(Executors.callable(listener));
allTasks.add(
new Callable<Void>() {
@Override
public Void call() throws Exception {
currentFuture.get().addListener(listener, executor);
return null;
}
});
}
assertEquals(allTasks.size() + 1, barrier.getParties());
for (int i = 0; i < 1000; i++) {
Collections.shuffle(allTasks);
final AbstractFuture<String> future = new AbstractFuture<String>() {};
currentFuture.set(future);
for (Callable<?> task : allTasks) {
@SuppressWarnings("unused") // go/futurereturn-lsc
Future<?> possiblyIgnoredError = executor.submit(task);
}
awaitUnchecked(barrier);
assertThat(future.isDone()).isTrue();
// inspect state and ensure it is correct!
// asserts that all get calling threads received the same value
Object result = Iterables.getOnlyElement(finalResults);
if (result == CancellationException.class) {
assertTrue(future.isCancelled());
if (future.wasInterrupted()) {
// We were cancelled, it is possible that setFuture could have succeeded too.
assertThat(numSuccessfulSetCalls.get()).isIn(Range.closed(1, 2));
} else {
assertThat(numSuccessfulSetCalls.get()).isEqualTo(1);
}
} else {
assertThat(numSuccessfulSetCalls.get()).isEqualTo(1);
}
// reset for next iteration
numSuccessfulSetCalls.set(0);
finalResults.clear();
}
executor.shutdown();
}
// setFuture and cancel() interact in more complicated ways than the other setters.
public void testSetFutureCancelBash() {
final int size = 50;
final CyclicBarrier barrier =
new CyclicBarrier(
2 // for the setter threads
+ size // for the listeners
+ size // for the get threads,
+ 1); // for the main thread
final ExecutorService executor = Executors.newFixedThreadPool(barrier.getParties());
final AtomicReference<AbstractFuture<String>> currentFuture = Atomics.newReference();
final AtomicReference<AbstractFuture<String>> setFutureFuture = Atomics.newReference();
final AtomicBoolean setFutureSetSucess = new AtomicBoolean();
final AtomicBoolean setFutureCompletionSucess = new AtomicBoolean();
final AtomicBoolean cancellationSucess = new AtomicBoolean();
Runnable cancelRunnable =
new Runnable() {
@Override
public void run() {
cancellationSucess.set(currentFuture.get().cancel(true));
awaitUnchecked(barrier);
}
};
Runnable setFutureCompleteSucessFullyRunnable =
new Runnable() {
@Override
public void run() {
AbstractFuture<String> future = setFutureFuture.get();
setFutureSetSucess.set(currentFuture.get().setFuture(future));
setFutureCompletionSucess.set(future.set("hello-async-world"));
awaitUnchecked(barrier);
}
};
final Set<Object> finalResults = Collections.synchronizedSet(Sets.newIdentityHashSet());
Runnable collectResultsRunnable =
new Runnable() {
@Override
public void run() {
try {
String result = Uninterruptibles.getUninterruptibly(currentFuture.get());
finalResults.add(result);
} catch (ExecutionException e) {
finalResults.add(e.getCause());
} catch (CancellationException e) {
finalResults.add(CancellationException.class);
} finally {
awaitUnchecked(barrier);
}
}
};
Runnable collectResultsTimedGetRunnable =
new Runnable() {
@Override
public void run() {
Future<String> future = currentFuture.get();
while (true) {
try {
String result = Uninterruptibles.getUninterruptibly(future, 0, TimeUnit.SECONDS);
finalResults.add(result);
break;
} catch (ExecutionException e) {
finalResults.add(e.getCause());
break;
} catch (CancellationException e) {
finalResults.add(CancellationException.class);
break;
} catch (TimeoutException e) {
// loop
}
}
awaitUnchecked(barrier);
}
};
List<Runnable> allTasks = new ArrayList<>();
allTasks.add(cancelRunnable);
allTasks.add(setFutureCompleteSucessFullyRunnable);
for (int k = 0; k < size; k++) {
// For each listener we add a task that submits it to the executor directly for the blocking
// get usecase and another task that adds it as a listener to the future to exercise both
// racing addListener calls and addListener calls completing after the future completes.
final Runnable listener =
k % 2 == 0 ? collectResultsRunnable : collectResultsTimedGetRunnable;
allTasks.add(listener);
allTasks.add(
new Runnable() {
@Override
public void run() {
currentFuture.get().addListener(listener, executor);
}
});
}
assertEquals(allTasks.size() + 1, barrier.getParties()); // sanity check
for (int i = 0; i < 1000; i++) {
Collections.shuffle(allTasks);
final AbstractFuture<String> future = new AbstractFuture<String>() {};
final AbstractFuture<String> setFuture = new AbstractFuture<String>() {};
currentFuture.set(future);
setFutureFuture.set(setFuture);
for (Runnable task : allTasks) {
executor.execute(task);
}
awaitUnchecked(barrier);
assertThat(future.isDone()).isTrue();
// inspect state and ensure it is correct!
// asserts that all get calling threads received the same value
Object result = Iterables.getOnlyElement(finalResults);
if (result == CancellationException.class) {
assertTrue(future.isCancelled());
assertTrue(cancellationSucess.get());
// cancellation can interleave in 3 ways
// 1. prior to setFuture
// 2. after setFuture before set() on the future assigned
// 3. after setFuture and set() are called but before the listener completes.
if (!setFutureSetSucess.get() || !setFutureCompletionSucess.get()) {
// If setFuture fails or set on the future fails then it must be because that future was
// cancelled
assertTrue(setFuture.isCancelled());
assertTrue(setFuture.wasInterrupted()); // we only call cancel(true)
}
} else {
// set on the future completed
assertFalse(cancellationSucess.get());
assertTrue(setFutureSetSucess.get());
assertTrue(setFutureCompletionSucess.get());
}
// reset for next iteration
setFutureSetSucess.set(false);
setFutureCompletionSucess.set(false);
cancellationSucess.set(false);
finalResults.clear();
}
executor.shutdown();
}
// Test to ensure that when calling setFuture with a done future only setFuture or cancel can
// return true.
public void testSetFutureCancelBash_withDoneFuture() {
final CyclicBarrier barrier =
new CyclicBarrier(
2 // for the setter threads
+ 1 // for the blocking get thread,
+ 1); // for the main thread
final ExecutorService executor = Executors.newFixedThreadPool(barrier.getParties());
final AtomicReference<AbstractFuture<String>> currentFuture = Atomics.newReference();
final AtomicBoolean setFutureSuccess = new AtomicBoolean();
final AtomicBoolean cancellationSucess = new AtomicBoolean();
Callable<Void> cancelRunnable =
new Callable<Void>() {
@Override
public Void call() {
cancellationSucess.set(currentFuture.get().cancel(true));
awaitUnchecked(barrier);
return null;
}
};
Callable<Void> setFutureCompleteSucessFullyRunnable =
new Callable<Void>() {
final ListenableFuture<String> future = Futures.immediateFuture("hello");
@Override
public Void call() {
setFutureSuccess.set(currentFuture.get().setFuture(future));
awaitUnchecked(barrier);
return null;
}
};
final Set<Object> finalResults = Collections.synchronizedSet(Sets.newIdentityHashSet());
final Runnable collectResultsRunnable =
new Runnable() {
@Override
public void run() {
try {
String result = Uninterruptibles.getUninterruptibly(currentFuture.get());
finalResults.add(result);
} catch (ExecutionException e) {
finalResults.add(e.getCause());
} catch (CancellationException e) {
finalResults.add(CancellationException.class);
} finally {
awaitUnchecked(barrier);
}
}
};
List<Callable<?>> allTasks = new ArrayList<>();
allTasks.add(cancelRunnable);
allTasks.add(setFutureCompleteSucessFullyRunnable);
allTasks.add(Executors.callable(collectResultsRunnable));
assertEquals(allTasks.size() + 1, barrier.getParties()); // sanity check
for (int i = 0; i < 1000; i++) {
Collections.shuffle(allTasks);
final AbstractFuture<String> future = new AbstractFuture<String>() {};
currentFuture.set(future);
for (Callable<?> task : allTasks) {
@SuppressWarnings("unused") // go/futurereturn-lsc
Future<?> possiblyIgnoredError = executor.submit(task);
}
awaitUnchecked(barrier);
assertThat(future.isDone()).isTrue();
// inspect state and ensure it is correct!
// asserts that all get calling threads received the same value
Object result = Iterables.getOnlyElement(finalResults);
if (result == CancellationException.class) {
assertTrue(future.isCancelled());
assertTrue(cancellationSucess.get());
assertFalse(setFutureSuccess.get());
} else {
assertTrue(setFutureSuccess.get());
assertFalse(cancellationSucess.get());
}
// reset for next iteration
setFutureSuccess.set(false);
cancellationSucess.set(false);
finalResults.clear();
}
executor.shutdown();
}
// In a previous implementation this would cause a stack overflow after ~2000 futures chained
// together. Now it should only be limited by available memory (and time)
public void testSetFuture_stackOverflow() {
SettableFuture<String> orig = SettableFuture.create();
SettableFuture<String> prev = orig;
for (int i = 0; i < 100000; i++) {
SettableFuture<String> curr = SettableFuture.create();
prev.setFuture(curr);
prev = curr;
}
// prev represents the 'innermost' future
prev.set("done");
assertTrue(orig.isDone());
}
public void testSetFuture_misbehavingFutureThrows() throws Exception {
SettableFuture<String> future = SettableFuture.create();
ListenableFuture<String> badFuture =
new ListenableFuture<String>() {
@Override
public boolean cancel(boolean interrupt) {
return false;
}
@Override
public boolean isDone() {
return true;
}
@Override
public boolean isCancelled() {
return false; // BAD!!
}
@Override
public String get() {
throw new CancellationException(); // BAD!!
}
@Override
public String get(long time, TimeUnit unit) {
throw new CancellationException(); // BAD!!
}
@Override
public void addListener(Runnable runnable, Executor executor) {
executor.execute(runnable);
}
};
future.setFuture(badFuture);
ExecutionException expected = getExpectingExecutionException(future);
assertThat(expected).hasCauseThat().isInstanceOf(IllegalArgumentException.class);
assertThat(expected).hasCauseThat().hasMessageThat().contains(badFuture.toString());
}
public void testSetFuture_misbehavingFutureDoesNotThrow() throws Exception {
SettableFuture<String> future = SettableFuture.create();
ListenableFuture<String> badFuture =
new ListenableFuture<String>() {
@Override
public boolean cancel(boolean interrupt) {
return false;
}
@Override
public boolean isDone() {
return true;
}
@Override
public boolean isCancelled() {
return true; // BAD!!
}
@Override
public String get() {
return "foo"; // BAD!!
}
@Override
public String get(long time, TimeUnit unit) {
return "foo"; // BAD!!
}
@Override
public void addListener(Runnable runnable, Executor executor) {
executor.execute(runnable);
}
};
future.setFuture(badFuture);
assertThat(future.isCancelled()).isTrue();
}
public void testCancel_stackOverflow() {
SettableFuture<String> orig = SettableFuture.create();
SettableFuture<String> prev = orig;
for (int i = 0; i < 100000; i++) {
SettableFuture<String> curr = SettableFuture.create();
prev.setFuture(curr);
prev = curr;
}
// orig is the 'outermost future', this should propagate fully down the stack of futures.
orig.cancel(true);
assertTrue(orig.isCancelled());
assertTrue(prev.isCancelled());
assertTrue(prev.wasInterrupted());
}
public void testSetFutureSelf_cancel() {
SettableFuture<String> orig = SettableFuture.create();
orig.setFuture(orig);
orig.cancel(true);
assertTrue(orig.isCancelled());
}
public void testSetFutureSelf_toString() {
SettableFuture<String> orig = SettableFuture.create();
orig.setFuture(orig);
assertThat(orig.toString()).contains("[status=PENDING, info=[setFuture=[this future]]]");
}
public void testSetSelf_toString() {
SettableFuture<Object> orig = SettableFuture.create();
orig.set(orig);
assertThat(orig.toString()).contains("[status=SUCCESS, result=[this future]]");
}
public void testSetIndirectSelf_toString() {
final SettableFuture<Object> orig = SettableFuture.create();
// unlike the above this indirection defeats the trivial cycle detection and causes a SOE
orig.set(
new Object() {
@Override
public String toString() {
return orig.toString();
}
});
try {
orig.toString();
fail();
} catch (StackOverflowError expected) {
}
}
// Regression test for a case where we would fail to execute listeners immediately on done futures
// this would be observable from an afterDone callback
public void testListenersExecuteImmediately_fromAfterDone() {
AbstractFuture<String> f =
new AbstractFuture<String>() {
@Override
protected void afterDone() {
final AtomicBoolean ranImmediately = new AtomicBoolean();
addListener(
new Runnable() {
@Override
public void run() {
ranImmediately.set(true);
}
},
MoreExecutors.directExecutor());
assertThat(ranImmediately.get()).isTrue();
}
};
f.set("foo");
}
// Regression test for a case where we would fail to execute listeners immediately on done futures
// this would be observable from a waiter that was just unblocked.
public void testListenersExecuteImmediately_afterWaiterWakesUp() throws Exception {
final AbstractFuture<String> f =
new AbstractFuture<String>() {
@Override
protected void afterDone() {
// this simply delays executing listeners
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(10));
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt(); // preserve status
}
}
};
Thread t =
new Thread() {
@Override
public void run() {
f.set("foo");
}
};
t.start();
f.get();
final AtomicBoolean ranImmediately = new AtomicBoolean();
f.addListener(
new Runnable() {
@Override
public void run() {
ranImmediately.set(true);
}
},
MoreExecutors.directExecutor());
assertThat(ranImmediately.get()).isTrue();
t.interrupt();
t.join();
}
public void testTrustedGetFailure_Completed() {
SettableFuture<String> future = SettableFuture.create();
future.set("261");
assertThat(future.tryInternalFastPathGetFailure()).isNull();
}
public void testTrustedGetFailure_Failed() {
SettableFuture<String> future = SettableFuture.create();
Throwable failure = new Throwable();
future.setException(failure);
assertThat(future.tryInternalFastPathGetFailure()).isEqualTo(failure);
}
public void testTrustedGetFailure_NotCompleted() {
SettableFuture<String> future = SettableFuture.create();
assertThat(future.isDone()).isFalse();
assertThat(future.tryInternalFastPathGetFailure()).isNull();
}
public void testTrustedGetFailure_CanceledNoCause() {
SettableFuture<String> future = SettableFuture.create();
future.cancel(false);
assertThat(future.tryInternalFastPathGetFailure()).isNull();
}
public void testGetFailure_Completed() {
AbstractFuture<String> future = new AbstractFuture<String>() {};
future.set("261");
assertThat(future.tryInternalFastPathGetFailure()).isNull();
}
public void testGetFailure_Failed() {
AbstractFuture<String> future = new AbstractFuture<String>() {};
final Throwable failure = new Throwable();
future.setException(failure);
assertThat(future.tryInternalFastPathGetFailure()).isNull();
}
public void testGetFailure_NotCompleted() {
AbstractFuture<String> future = new AbstractFuture<String>() {};
assertThat(future.isDone()).isFalse();
assertThat(future.tryInternalFastPathGetFailure()).isNull();
}
public void testGetFailure_CanceledNoCause() {
AbstractFuture<String> future = new AbstractFuture<String>() {};
future.cancel(false);
assertThat(future.tryInternalFastPathGetFailure()).isNull();
}
public void testForwardExceptionFastPath() throws Exception {
class FailFuture extends InternalFutureFailureAccess implements ListenableFuture<String> {
Throwable failure;
FailFuture(Throwable throwable) {
failure = throwable;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
throw new AssertionFailedError("cancel shouldn't be called on this object");
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return true;
}
@Override
public String get() throws InterruptedException, ExecutionException {
throw new AssertionFailedError("get() shouldn't be called on this object");
}
@Override
public String get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return get();
}
@Override
protected Throwable tryInternalFastPathGetFailure() {
return failure;
}
@Override
public void addListener(Runnable listener, Executor executor) {
throw new AssertionFailedError("addListener() shouldn't be called on this object");
}
}
final RuntimeException exception = new RuntimeException("you still didn't say the magic word!");
SettableFuture<String> normalFuture = SettableFuture.create();
normalFuture.setFuture(new FailFuture(exception));
assertTrue(normalFuture.isDone());
try {
normalFuture.get();
fail();
} catch (ExecutionException e) {
assertSame(exception, e.getCause());
}
}
private static void awaitUnchecked(final CyclicBarrier barrier) {
try {
barrier.await();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private void checkStackTrace(ExecutionException e) {
// Our call site for get() should be in the trace.
int index = findStackFrame(e, getClass().getName(), "getExpectingExecutionException");
assertThat(index).isNotEqualTo(0);
// Above our method should be the call to get(). Don't assert on the class
// because it could be some superclass.
assertThat(e.getStackTrace()[index - 1].getMethodName()).isEqualTo("get");
}
private static int findStackFrame(ExecutionException e, String clazz, String method) {
StackTraceElement[] elements = e.getStackTrace();
for (int i = 0; i < elements.length; i++) {
StackTraceElement element = elements[i];
if (element.getClassName().equals(clazz) && element.getMethodName().equals(method)) {
return i;
}
}
AssertionFailedError failure =
new AssertionFailedError(
"Expected element " + clazz + "." + method + " not found in stack trace");
failure.initCause(e);
throw failure;
}
private ExecutionException getExpectingExecutionException(AbstractFuture<String> future)
throws InterruptedException {
try {
String got = future.get();
fail("Expected exception but got " + got);
} catch (ExecutionException e) {
return e;
}
// unreachable, but compiler doesn't know that fail() always throws
return null;
}
private static final class WaiterThread extends Thread {
private final AbstractFuture<?> future;
private WaiterThread(AbstractFuture<?> future) {
this.future = future;
}
@Override
public void run() {
try {
future.get();
} catch (Exception e) {
// nothing
}
}
void awaitWaiting() {
while (!isBlocked()) {
if (getState() == State.TERMINATED) {
throw new RuntimeException("Thread exited");
}
Thread.yield();
}
}
private boolean isBlocked() {
return getState() == Thread.State.WAITING && LockSupport.getBlocker(this) == future;
}
}
static final class TimedWaiterThread extends Thread {
private final AbstractFuture<?> future;
private final long timeout;
private final TimeUnit unit;
private Exception exception;
private volatile long startTime;
private long timeSpentBlocked;
TimedWaiterThread(AbstractFuture<?> future, long timeout, TimeUnit unit) {
this.future = future;
this.timeout = timeout;
this.unit = unit;
}
@Override
public void run() {
startTime = System.nanoTime();
try {
future.get(timeout, unit);
} catch (Exception e) {
// nothing
exception = e;
} finally {
timeSpentBlocked = System.nanoTime() - startTime;
}
}
void awaitWaiting() {
while (!isBlocked()) {
if (getState() == State.TERMINATED) {
throw new RuntimeException("Thread exited");
}
Thread.yield();
}
}
private boolean isBlocked() {
return getState() == Thread.State.TIMED_WAITING && LockSupport.getBlocker(this) == future;
}
}
private static final class PollingThread extends Thread {
private final AbstractFuture<?> future;
private final CountDownLatch completedIteration = new CountDownLatch(10);
private PollingThread(AbstractFuture<?> future) {
this.future = future;
}
@Override
public void run() {
while (true) {
try {
future.get(0, TimeUnit.SECONDS);
return;
} catch (InterruptedException | ExecutionException e) {
return;
} catch (TimeoutException e) {
// do nothing
} finally {
completedIteration.countDown();
}
}
}
void awaitInLoop() {
Uninterruptibles.awaitUninterruptibly(completedIteration);
}
}
private static final class InterruptibleFuture extends AbstractFuture<String> {
boolean interruptTaskWasCalled;
@Override
protected void interruptTask() {
assertFalse(interruptTaskWasCalled);
interruptTaskWasCalled = true;
}
}
}