| /* |
| * Copyright (C) 2008 The Guava Authors |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package com.google.common.util.concurrent; |
| |
| import static com.google.common.base.Functions.constant; |
| import static com.google.common.base.Functions.identity; |
| import static com.google.common.base.Throwables.propagateIfInstanceOf; |
| import static com.google.common.collect.Iterables.getOnlyElement; |
| import static com.google.common.collect.Lists.newArrayList; |
| import static com.google.common.collect.Sets.intersection; |
| import static com.google.common.truth.Truth.assertThat; |
| import static com.google.common.truth.Truth.assertWithMessage; |
| import static com.google.common.util.concurrent.Futures.allAsList; |
| import static com.google.common.util.concurrent.Futures.catching; |
| import static com.google.common.util.concurrent.Futures.catchingAsync; |
| import static com.google.common.util.concurrent.Futures.getDone; |
| import static com.google.common.util.concurrent.Futures.immediateCancelledFuture; |
| import static com.google.common.util.concurrent.Futures.immediateFailedFuture; |
| import static com.google.common.util.concurrent.Futures.immediateFuture; |
| import static com.google.common.util.concurrent.Futures.immediateVoidFuture; |
| import static com.google.common.util.concurrent.Futures.inCompletionOrder; |
| import static com.google.common.util.concurrent.Futures.lazyTransform; |
| import static com.google.common.util.concurrent.Futures.nonCancellationPropagating; |
| import static com.google.common.util.concurrent.Futures.scheduleAsync; |
| import static com.google.common.util.concurrent.Futures.submit; |
| import static com.google.common.util.concurrent.Futures.submitAsync; |
| import static com.google.common.util.concurrent.Futures.successfulAsList; |
| import static com.google.common.util.concurrent.Futures.transform; |
| import static com.google.common.util.concurrent.Futures.transformAsync; |
| import static com.google.common.util.concurrent.Futures.whenAllComplete; |
| import static com.google.common.util.concurrent.Futures.whenAllSucceed; |
| import static com.google.common.util.concurrent.MoreExecutors.directExecutor; |
| import static com.google.common.util.concurrent.TestPlatform.clearInterrupt; |
| import static com.google.common.util.concurrent.TestPlatform.getDoneFromTimeoutOverload; |
| import static com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly; |
| import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly; |
| import static com.google.common.util.concurrent.testing.TestingExecutors.noOpScheduledExecutor; |
| import static java.util.Arrays.asList; |
| import static java.util.concurrent.Executors.newSingleThreadExecutor; |
| import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; |
| import static java.util.concurrent.TimeUnit.MILLISECONDS; |
| import static java.util.concurrent.TimeUnit.SECONDS; |
| |
| import com.google.common.annotations.GwtCompatible; |
| import com.google.common.annotations.GwtIncompatible; |
| import com.google.common.base.Function; |
| import com.google.common.base.Joiner; |
| import com.google.common.base.Predicate; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableSet; |
| import com.google.common.collect.Iterables; |
| import com.google.common.testing.ClassSanityTester; |
| import com.google.common.testing.GcFinalization; |
| import com.google.common.testing.TestLogHandler; |
| import com.google.errorprone.annotations.CanIgnoreReturnValue; |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.lang.ref.WeakReference; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.CancellationException; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.RejectedExecutionException; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.logging.LogRecord; |
| import java.util.logging.Logger; |
| import junit.framework.AssertionFailedError; |
| import junit.framework.TestCase; |
| import org.checkerframework.checker.nullness.qual.Nullable; |
| |
| /** |
| * Unit tests for {@link Futures}. |
| * |
| * @author Nishant Thakkar |
| */ |
| @GwtCompatible(emulated = true) |
| public class FuturesTest extends TestCase { |
| private static final Logger aggregateFutureLogger = |
| Logger.getLogger(AggregateFuture.class.getName()); |
| private final TestLogHandler aggregateFutureLogHandler = new TestLogHandler(); |
| |
| private static final String DATA1 = "data"; |
| private static final String DATA2 = "more data"; |
| private static final String DATA3 = "most data"; |
| |
| @Override |
| public void setUp() throws Exception { |
| super.setUp(); |
| aggregateFutureLogger.addHandler(aggregateFutureLogHandler); |
| } |
| |
| @Override |
| public void tearDown() throws Exception { |
| /* |
| * Clear interrupt for future tests. |
| * |
| * (Ideally we would perform interrupts only in threads that we create, but |
| * it's hard to imagine that anything will break in practice.) |
| */ |
| clearInterrupt(); |
| aggregateFutureLogger.removeHandler(aggregateFutureLogHandler); |
| super.tearDown(); |
| } |
| |
| /* |
| * TODO(cpovirk): Use FutureSubject once it's part of core Truth. But be wary of using it when I'm |
| * really testing a Future implementation (e.g., in the case of immediate*Future()). But it's OK |
| * to use in the case of the majority of Futures that are AbstractFutures. |
| */ |
| |
| public void testImmediateFuture() throws Exception { |
| ListenableFuture<String> future = immediateFuture(DATA1); |
| |
| assertSame(DATA1, getDone(future)); |
| assertSame(DATA1, getDoneFromTimeoutOverload(future)); |
| assertThat(future.toString()).contains("[status=SUCCESS, result=[" + DATA1 + "]]"); |
| } |
| |
| public void testImmediateVoidFuture() throws Exception { |
| ListenableFuture<Void> voidFuture = immediateVoidFuture(); |
| |
| assertThat(getDone(voidFuture)).isNull(); |
| assertThat(getDoneFromTimeoutOverload(voidFuture)).isNull(); |
| assertThat(voidFuture.toString()).contains("[status=SUCCESS, result=[null]]"); |
| } |
| |
| public void testImmediateFailedFuture() throws Exception { |
| Exception exception = new Exception(); |
| ListenableFuture<String> future = immediateFailedFuture(exception); |
| assertThat(future.toString()).endsWith("[status=FAILURE, cause=[" + exception + "]]"); |
| |
| try { |
| getDone(future); |
| fail(); |
| } catch (ExecutionException expected) { |
| assertSame(exception, expected.getCause()); |
| } |
| |
| try { |
| getDoneFromTimeoutOverload(future); |
| fail(); |
| } catch (ExecutionException expected) { |
| assertSame(exception, expected.getCause()); |
| } |
| } |
| |
| public void testImmediateFailedFuture_cancellationException() throws Exception { |
| CancellationException exception = new CancellationException(); |
| ListenableFuture<String> future = immediateFailedFuture(exception); |
| assertFalse(future.isCancelled()); |
| assertThat(future.toString()).endsWith("[status=FAILURE, cause=[" + exception + "]]"); |
| |
| try { |
| getDone(future); |
| fail(); |
| } catch (ExecutionException expected) { |
| assertSame(exception, expected.getCause()); |
| } |
| |
| try { |
| getDoneFromTimeoutOverload(future); |
| fail(); |
| } catch (ExecutionException expected) { |
| assertSame(exception, expected.getCause()); |
| } |
| } |
| |
| public void testImmediateCancelledFutureBasic() throws Exception { |
| ListenableFuture<String> future = CallerClass1.makeImmediateCancelledFuture(); |
| assertTrue(future.isCancelled()); |
| } |
| |
| @GwtIncompatible |
| public void testImmediateCancelledFutureStack() throws Exception { |
| ListenableFuture<String> future = CallerClass1.makeImmediateCancelledFuture(); |
| assertTrue(future.isCancelled()); |
| |
| try { |
| CallerClass2.get(future); |
| fail(); |
| } catch (CancellationException expected) { |
| // There should be two CancellationException chained together. The outer one should have the |
| // stack trace of where the get() call was made, and the inner should have the stack trace of |
| // where the immediateCancelledFuture() call was made. |
| List<StackTraceElement> stackTrace = ImmutableList.copyOf(expected.getStackTrace()); |
| assertFalse(Iterables.any(stackTrace, hasClassName(CallerClass1.class))); |
| assertTrue(Iterables.any(stackTrace, hasClassName(CallerClass2.class))); |
| |
| // See AbstractFutureCancellationCauseTest for how to set causes. |
| assertThat(expected.getCause()).isNull(); |
| } |
| } |
| |
| @GwtIncompatible // used only in GwtIncompatible tests |
| private static Predicate<StackTraceElement> hasClassName(final Class<?> clazz) { |
| return new Predicate<StackTraceElement>() { |
| @Override |
| public boolean apply(StackTraceElement element) { |
| return element.getClassName().equals(clazz.getName()); |
| } |
| }; |
| } |
| |
| private static final class CallerClass1 { |
| static ListenableFuture<String> makeImmediateCancelledFuture() { |
| return immediateCancelledFuture(); |
| } |
| } |
| |
| private static final class CallerClass2 { |
| @CanIgnoreReturnValue |
| static <V> V get(ListenableFuture<V> future) throws ExecutionException, InterruptedException { |
| return getDone(future); |
| } |
| } |
| |
| private static class MyException extends Exception {} |
| |
| // Class hierarchy for generics sanity checks |
| private static class Foo {} |
| |
| private static class FooChild extends Foo {} |
| |
| private static class Bar {} |
| |
| private static class BarChild extends Bar {} |
| |
| public void testTransform_genericsNull() throws Exception { |
| ListenableFuture<?> nullFuture = immediateFuture(null); |
| ListenableFuture<?> transformedFuture = transform(nullFuture, constant(null), directExecutor()); |
| assertNull(getDone(transformedFuture)); |
| } |
| |
| public void testTransform_genericsHierarchy() throws Exception { |
| ListenableFuture<FooChild> future = immediateFuture(null); |
| final BarChild barChild = new BarChild(); |
| Function<Foo, BarChild> function = |
| new Function<Foo, BarChild>() { |
| @Override |
| public BarChild apply(Foo unused) { |
| return barChild; |
| } |
| }; |
| Bar bar = getDone(transform(future, function, directExecutor())); |
| assertSame(barChild, bar); |
| } |
| |
| /* |
| * Android does not handle this stack overflow gracefully... though somehow some other |
| * stack-overflow tests work. It must depend on the exact place the error occurs. |
| */ |
| @AndroidIncompatible |
| @GwtIncompatible // StackOverflowError |
| public void testTransform_StackOverflow() throws Exception { |
| { |
| /* |
| * Initialize all relevant classes before running the test, which may otherwise poison any |
| * classes it is trying to load during its stack overflow. |
| */ |
| SettableFuture<Object> root = SettableFuture.create(); |
| ListenableFuture<Object> unused = transform(root, identity(), directExecutor()); |
| root.set("foo"); |
| } |
| |
| SettableFuture<Object> root = SettableFuture.create(); |
| ListenableFuture<Object> output = root; |
| for (int i = 0; i < 10000; i++) { |
| output = transform(output, identity(), directExecutor()); |
| } |
| try { |
| root.set("foo"); |
| fail(); |
| } catch (StackOverflowError expected) { |
| } |
| } |
| |
| public void testTransform_ErrorAfterCancellation() throws Exception { |
| class Transformer implements Function<Object, Object> { |
| ListenableFuture<Object> output; |
| |
| @Override |
| public Object apply(Object input) { |
| output.cancel(false); |
| throw new MyError(); |
| } |
| } |
| Transformer transformer = new Transformer(); |
| SettableFuture<Object> input = SettableFuture.create(); |
| |
| ListenableFuture<Object> output = transform(input, transformer, directExecutor()); |
| transformer.output = output; |
| |
| input.set("foo"); |
| assertTrue(output.isCancelled()); |
| } |
| |
| public void testTransform_ExceptionAfterCancellation() throws Exception { |
| class Transformer implements Function<Object, Object> { |
| ListenableFuture<Object> output; |
| |
| @Override |
| public Object apply(Object input) { |
| output.cancel(false); |
| throw new MyRuntimeException(); |
| } |
| } |
| Transformer transformer = new Transformer(); |
| SettableFuture<Object> input = SettableFuture.create(); |
| |
| ListenableFuture<Object> output = transform(input, transformer, directExecutor()); |
| transformer.output = output; |
| |
| input.set("foo"); |
| assertTrue(output.isCancelled()); |
| } |
| |
| public void testTransform_getThrowsRuntimeException() throws Exception { |
| ListenableFuture<Object> input = |
| UncheckedThrowingFuture.throwingRuntimeException(new MyRuntimeException()); |
| |
| ListenableFuture<Object> output = transform(input, identity(), directExecutor()); |
| try { |
| getDone(output); |
| fail(); |
| } catch (ExecutionException expected) { |
| assertThat(expected.getCause()).isInstanceOf(MyRuntimeException.class); |
| } |
| } |
| |
| public void testTransform_getThrowsError() throws Exception { |
| ListenableFuture<Object> input = UncheckedThrowingFuture.throwingError(new MyError()); |
| |
| ListenableFuture<Object> output = transform(input, identity(), directExecutor()); |
| try { |
| getDone(output); |
| fail(); |
| } catch (ExecutionException expected) { |
| assertThat(expected.getCause()).isInstanceOf(MyError.class); |
| } |
| } |
| |
| public void testTransform_listenerThrowsError() throws Exception { |
| SettableFuture<Object> input = SettableFuture.create(); |
| ListenableFuture<Object> output = transform(input, identity(), directExecutor()); |
| |
| output.addListener( |
| new Runnable() { |
| @Override |
| public void run() { |
| throw new MyError(); |
| } |
| }, |
| directExecutor()); |
| try { |
| input.set("foo"); |
| fail(); |
| } catch (MyError expected) { |
| } |
| } |
| |
| public void testTransformAsync_cancelPropagatesToInput() throws Exception { |
| SettableFuture<Foo> input = SettableFuture.create(); |
| AsyncFunction<Foo, Bar> function = |
| new AsyncFunction<Foo, Bar>() { |
| @Override |
| public ListenableFuture<Bar> apply(Foo unused) { |
| throw new AssertionFailedError("Unexpeted call to apply."); |
| } |
| }; |
| assertTrue(transformAsync(input, function, directExecutor()).cancel(false)); |
| assertTrue(input.isCancelled()); |
| assertFalse(input.wasInterrupted()); |
| } |
| |
| public void testTransformAsync_interruptPropagatesToInput() throws Exception { |
| SettableFuture<Foo> input = SettableFuture.create(); |
| AsyncFunction<Foo, Bar> function = |
| new AsyncFunction<Foo, Bar>() { |
| @Override |
| public ListenableFuture<Bar> apply(Foo unused) { |
| throw new AssertionFailedError("Unexpeted call to apply."); |
| } |
| }; |
| assertTrue(transformAsync(input, function, directExecutor()).cancel(true)); |
| assertTrue(input.isCancelled()); |
| assertTrue(input.wasInterrupted()); |
| } |
| |
| @GwtIncompatible // threads |
| |
| public void testTransformAsync_interruptPropagatesToTransformingThread() throws Exception { |
| SettableFuture<String> input = SettableFuture.create(); |
| final CountDownLatch inFunction = new CountDownLatch(1); |
| final CountDownLatch shouldCompleteFunction = new CountDownLatch(1); |
| final CountDownLatch gotException = new CountDownLatch(1); |
| AsyncFunction<String, String> function = |
| new AsyncFunction<String, String>() { |
| @Override |
| public ListenableFuture<String> apply(String s) throws Exception { |
| inFunction.countDown(); |
| try { |
| shouldCompleteFunction.await(); |
| } catch (InterruptedException expected) { |
| gotException.countDown(); |
| throw expected; |
| } |
| return immediateFuture("a"); |
| } |
| }; |
| |
| ListenableFuture<String> futureResult = |
| transformAsync(input, function, newSingleThreadExecutor()); |
| |
| input.set("value"); |
| inFunction.await(); |
| futureResult.cancel(true); |
| shouldCompleteFunction.countDown(); |
| try { |
| futureResult.get(); |
| fail(); |
| } catch (CancellationException expected) { |
| } |
| // TODO(cpovirk): implement interruption, updating this test: |
| // https://github.com/google/guava/issues/1989 |
| assertEquals(1, gotException.getCount()); |
| // gotException.await(); |
| } |
| |
| public void testTransformAsync_cancelPropagatesToAsyncOutput() throws Exception { |
| ListenableFuture<Foo> immediate = immediateFuture(new Foo()); |
| final SettableFuture<Bar> secondary = SettableFuture.create(); |
| AsyncFunction<Foo, Bar> function = |
| new AsyncFunction<Foo, Bar>() { |
| @Override |
| public ListenableFuture<Bar> apply(Foo unused) { |
| return secondary; |
| } |
| }; |
| assertTrue(transformAsync(immediate, function, directExecutor()).cancel(false)); |
| assertTrue(secondary.isCancelled()); |
| assertFalse(secondary.wasInterrupted()); |
| } |
| |
| public void testTransformAsync_interruptPropagatesToAsyncOutput() throws Exception { |
| ListenableFuture<Foo> immediate = immediateFuture(new Foo()); |
| final SettableFuture<Bar> secondary = SettableFuture.create(); |
| AsyncFunction<Foo, Bar> function = |
| new AsyncFunction<Foo, Bar>() { |
| @Override |
| public ListenableFuture<Bar> apply(Foo unused) { |
| return secondary; |
| } |
| }; |
| assertTrue(transformAsync(immediate, function, directExecutor()).cancel(true)); |
| assertTrue(secondary.isCancelled()); |
| assertTrue(secondary.wasInterrupted()); |
| } |
| |
| public void testTransformAsync_inputCancelButNotInterruptPropagatesToOutput() throws Exception { |
| SettableFuture<Foo> f1 = SettableFuture.create(); |
| final SettableFuture<Bar> secondary = SettableFuture.create(); |
| AsyncFunction<Foo, Bar> function = |
| new AsyncFunction<Foo, Bar>() { |
| @Override |
| public ListenableFuture<Bar> apply(Foo unused) { |
| return secondary; |
| } |
| }; |
| ListenableFuture<Bar> f2 = transformAsync(f1, function, directExecutor()); |
| f1.cancel(true); |
| assertTrue(f2.isCancelled()); |
| /* |
| * We might like to propagate interruption, too, but it's not clear that it matters. For now, we |
| * test for the behavior that we have today. |
| */ |
| assertFalse(((AbstractFuture<?>) f2).wasInterrupted()); |
| } |
| |
| /* |
| * Android does not handle this stack overflow gracefully... though somehow some other |
| * stack-overflow tests work. It must depend on the exact place the error occurs. |
| */ |
| @AndroidIncompatible |
| @GwtIncompatible // StackOverflowError |
| public void testTransformAsync_StackOverflow() throws Exception { |
| { |
| /* |
| * Initialize all relevant classes before running the test, which may otherwise poison any |
| * classes it is trying to load during its stack overflow. |
| */ |
| SettableFuture<Object> root = SettableFuture.create(); |
| ListenableFuture<Object> unused = transformAsync(root, asyncIdentity(), directExecutor()); |
| root.set("foo"); |
| } |
| |
| SettableFuture<Object> root = SettableFuture.create(); |
| ListenableFuture<Object> output = root; |
| for (int i = 0; i < 10000; i++) { |
| output = transformAsync(output, asyncIdentity(), directExecutor()); |
| } |
| try { |
| root.set("foo"); |
| fail(); |
| } catch (StackOverflowError expected) { |
| } |
| } |
| |
| public void testTransformAsync_ErrorAfterCancellation() throws Exception { |
| class Transformer implements AsyncFunction<Object, Object> { |
| ListenableFuture<Object> output; |
| |
| @Override |
| public ListenableFuture<Object> apply(Object input) { |
| output.cancel(false); |
| throw new MyError(); |
| } |
| } |
| Transformer transformer = new Transformer(); |
| SettableFuture<Object> input = SettableFuture.create(); |
| |
| ListenableFuture<Object> output = transformAsync(input, transformer, directExecutor()); |
| transformer.output = output; |
| |
| input.set("foo"); |
| assertTrue(output.isCancelled()); |
| } |
| |
| public void testTransformAsync_ExceptionAfterCancellation() throws Exception { |
| class Transformer implements AsyncFunction<Object, Object> { |
| ListenableFuture<Object> output; |
| |
| @Override |
| public ListenableFuture<Object> apply(Object input) { |
| output.cancel(false); |
| throw new MyRuntimeException(); |
| } |
| } |
| Transformer transformer = new Transformer(); |
| SettableFuture<Object> input = SettableFuture.create(); |
| |
| ListenableFuture<Object> output = transformAsync(input, transformer, directExecutor()); |
| transformer.output = output; |
| |
| input.set("foo"); |
| assertTrue(output.isCancelled()); |
| } |
| |
| public void testTransformAsync_getThrowsRuntimeException() throws Exception { |
| ListenableFuture<Object> input = |
| UncheckedThrowingFuture.throwingRuntimeException(new MyRuntimeException()); |
| |
| ListenableFuture<Object> output = transformAsync(input, asyncIdentity(), directExecutor()); |
| try { |
| getDone(output); |
| fail(); |
| } catch (ExecutionException expected) { |
| assertThat(expected.getCause()).isInstanceOf(MyRuntimeException.class); |
| } |
| } |
| |
| public void testTransformAsync_getThrowsError() throws Exception { |
| ListenableFuture<Object> input = UncheckedThrowingFuture.throwingError(new MyError()); |
| |
| ListenableFuture<Object> output = transformAsync(input, asyncIdentity(), directExecutor()); |
| try { |
| getDone(output); |
| fail(); |
| } catch (ExecutionException expected) { |
| assertThat(expected.getCause()).isInstanceOf(MyError.class); |
| } |
| } |
| |
| public void testTransformAsync_listenerThrowsError() throws Exception { |
| SettableFuture<Object> input = SettableFuture.create(); |
| ListenableFuture<Object> output = transformAsync(input, asyncIdentity(), directExecutor()); |
| |
| output.addListener( |
| new Runnable() { |
| @Override |
| public void run() { |
| throw new MyError(); |
| } |
| }, |
| directExecutor()); |
| try { |
| input.set("foo"); |
| fail(); |
| } catch (MyError expected) { |
| } |
| } |
| |
| public void testTransform_rejectionPropagatesToOutput() throws Exception { |
| SettableFuture<Foo> input = SettableFuture.create(); |
| Function<Foo, Foo> identity = identity(); |
| ListenableFuture<Foo> transformed = transform(input, identity, REJECTING_EXECUTOR); |
| input.set(new Foo()); |
| try { |
| getDone(transformed); |
| fail(); |
| } catch (ExecutionException expected) { |
| assertThat(expected.getCause()).isInstanceOf(RejectedExecutionException.class); |
| } |
| } |
| |
| public void testTransformAsync_rejectionPropagatesToOutput() throws Exception { |
| SettableFuture<Foo> input = SettableFuture.create(); |
| AsyncFunction<Foo, Foo> asyncIdentity = asyncIdentity(); |
| ListenableFuture<Foo> transformed = transformAsync(input, asyncIdentity, REJECTING_EXECUTOR); |
| input.set(new Foo()); |
| try { |
| getDone(transformed); |
| fail(); |
| } catch (ExecutionException expected) { |
| assertThat(expected.getCause()).isInstanceOf(RejectedExecutionException.class); |
| } |
| } |
| |
| /** Tests that the function is invoked only once, even if it throws an exception. */ |
| public void testTransformValueRemainsMemoized() throws Exception { |
| class Holder { |
| |
| int value = 2; |
| } |
| final Holder holder = new Holder(); |
| |
| // This function adds the holder's value to the input value. |
| Function<Integer, Integer> adder = |
| new Function<Integer, Integer>() { |
| @Override |
| public Integer apply(Integer from) { |
| return from + holder.value; |
| } |
| }; |
| |
| // Since holder.value is 2, applying 4 should yield 6. |
| assertEquals(6, adder.apply(4).intValue()); |
| |
| ListenableFuture<Integer> immediateFuture = immediateFuture(4); |
| Future<Integer> transformedFuture = transform(immediateFuture, adder, directExecutor()); |
| |
| // The composed future also yields 6. |
| assertEquals(6, getDone(transformedFuture).intValue()); |
| |
| // Repeated calls yield the same value even though the function's behavior |
| // changes |
| holder.value = 3; |
| assertEquals(6, getDone(transformedFuture).intValue()); |
| assertEquals(7, adder.apply(4).intValue()); |
| |
| // Once more, with feeling. |
| holder.value = 4; |
| assertEquals(6, getDone(transformedFuture).intValue()); |
| assertEquals(8, adder.apply(4).intValue()); |
| |
| // Memoized get also retains the value. |
| assertEquals(6, getDoneFromTimeoutOverload(transformedFuture).intValue()); |
| |
| // Unsurprisingly, recomposing the future will return an updated value. |
| assertEquals(8, getDone(transform(immediateFuture, adder, directExecutor())).intValue()); |
| |
| // Repeating, with the timeout version |
| assertEquals( |
| 8, |
| getDoneFromTimeoutOverload(transform(immediateFuture, adder, directExecutor())).intValue()); |
| } |
| |
| static class MyError extends Error {} |
| |
| static class MyRuntimeException extends RuntimeException {} |
| |
| /** |
| * Test that the function is invoked only once, even if it throws an exception. Also, test that |
| * that function's result is wrapped in an ExecutionException. |
| */ |
| @GwtIncompatible // reflection |
| public void testTransformExceptionRemainsMemoized() throws Throwable { |
| // We need to test with two input futures since ExecutionList.execute |
| // doesn't catch Errors and we cannot depend on the order that our |
| // transformations run. (So it is possible that the Error being thrown |
| // could prevent our second transformations from running). |
| SettableFuture<Integer> exceptionInput = SettableFuture.create(); |
| ListenableFuture<Integer> exceptionComposedFuture = |
| transform(exceptionInput, newOneTimeExceptionThrower(), directExecutor()); |
| exceptionInput.set(0); |
| runGetIdempotencyTest(exceptionComposedFuture, MyRuntimeException.class); |
| |
| SettableFuture<Integer> errorInput = SettableFuture.create(); |
| ListenableFuture<Integer> errorComposedFuture = |
| transform(errorInput, newOneTimeErrorThrower(), directExecutor()); |
| errorInput.set(0); |
| |
| runGetIdempotencyTest(errorComposedFuture, MyError.class); |
| |
| /* |
| * Try again when the input's value is already filled in, since the flow is |
| * slightly different in that case. |
| */ |
| exceptionComposedFuture = |
| transform(exceptionInput, newOneTimeExceptionThrower(), directExecutor()); |
| runGetIdempotencyTest(exceptionComposedFuture, MyRuntimeException.class); |
| |
| runGetIdempotencyTest( |
| transform(errorInput, newOneTimeErrorThrower(), directExecutor()), MyError.class); |
| runGetIdempotencyTest(errorComposedFuture, MyError.class); |
| } |
| |
| @GwtIncompatible // reflection |
| private static void runGetIdempotencyTest( |
| Future<Integer> transformedFuture, Class<? extends Throwable> expectedExceptionClass) |
| throws Throwable { |
| for (int i = 0; i < 5; i++) { |
| try { |
| getDone(transformedFuture); |
| fail(); |
| } catch (ExecutionException expected) { |
| if (!expectedExceptionClass.isInstance(expected.getCause())) { |
| throw expected.getCause(); |
| } |
| } |
| } |
| } |
| |
| @GwtIncompatible // used only in GwtIncompatible tests |
| private static Function<Integer, Integer> newOneTimeExceptionThrower() { |
| return new Function<Integer, Integer>() { |
| int calls = 0; |
| |
| @Override |
| public Integer apply(Integer from) { |
| if (++calls > 1) { |
| fail(); |
| } |
| throw new MyRuntimeException(); |
| } |
| }; |
| } |
| |
| @GwtIncompatible // used only in GwtIncompatible tests |
| private static Function<Integer, Integer> newOneTimeErrorThrower() { |
| return new Function<Integer, Integer>() { |
| int calls = 0; |
| |
| @Override |
| public Integer apply(Integer from) { |
| if (++calls > 1) { |
| fail(); |
| } |
| throw new MyError(); |
| } |
| }; |
| } |
| |
| // TODO(cpovirk): top-level class? |
| static class ExecutorSpy implements Executor { |
| |
| Executor delegate; |
| boolean wasExecuted; |
| |
| public ExecutorSpy(Executor delegate) { |
| this.delegate = delegate; |
| } |
| |
| @Override |
| public void execute(Runnable command) { |
| delegate.execute(command); |
| wasExecuted = true; |
| } |
| } |
| |
| public void testTransform_Executor() throws Exception { |
| Object value = new Object(); |
| ExecutorSpy spy = new ExecutorSpy(directExecutor()); |
| |
| assertFalse(spy.wasExecuted); |
| |
| ListenableFuture<Object> future = transform(immediateFuture(value), identity(), spy); |
| |
| assertSame(value, getDone(future)); |
| assertTrue(spy.wasExecuted); |
| } |
| |
| @GwtIncompatible // Threads |
| |
| public void testTransformAsync_functionToString() throws Exception { |
| final CountDownLatch functionCalled = new CountDownLatch(1); |
| final CountDownLatch functionBlocking = new CountDownLatch(1); |
| AsyncFunction<Object, Object> function = |
| tagged( |
| "Called my toString", |
| new AsyncFunction<Object, Object>() { |
| @Override |
| public ListenableFuture<Object> apply(Object input) throws Exception { |
| functionCalled.countDown(); |
| functionBlocking.await(); |
| return immediateFuture(null); |
| } |
| }); |
| |
| ExecutorService executor = Executors.newSingleThreadExecutor(); |
| try { |
| ListenableFuture<?> output = |
| Futures.transformAsync(immediateFuture(null), function, executor); |
| functionCalled.await(); |
| assertThat(output.toString()).contains(function.toString()); |
| } finally { |
| functionBlocking.countDown(); |
| executor.shutdown(); |
| } |
| } |
| |
| @GwtIncompatible // lazyTransform |
| public void testLazyTransform() throws Exception { |
| FunctionSpy<Object, String> spy = new FunctionSpy<>(constant("bar")); |
| Future<String> input = immediateFuture("foo"); |
| Future<String> transformed = lazyTransform(input, spy); |
| spy.verifyCallCount(0); |
| assertEquals("bar", getDone(transformed)); |
| spy.verifyCallCount(1); |
| assertEquals("bar", getDone(transformed)); |
| spy.verifyCallCount(2); |
| } |
| |
| @GwtIncompatible // lazyTransform |
| public void testLazyTransform_exception() throws Exception { |
| final RuntimeException exception = new RuntimeException("deliberate"); |
| Function<Integer, String> function = |
| new Function<Integer, String>() { |
| @Override |
| public String apply(Integer input) { |
| throw exception; |
| } |
| }; |
| Future<String> transformed = lazyTransform(immediateFuture(1), function); |
| try { |
| getDone(transformed); |
| fail(); |
| } catch (ExecutionException expected) { |
| assertSame(exception, expected.getCause()); |
| } |
| try { |
| getDoneFromTimeoutOverload(transformed); |
| fail(); |
| } catch (ExecutionException expected) { |
| assertSame(exception, expected.getCause()); |
| } |
| } |
| |
| private static class FunctionSpy<I, O> implements Function<I, O> { |
| private int applyCount; |
| private final Function<I, O> delegate; |
| |
| public FunctionSpy(Function<I, O> delegate) { |
| this.delegate = delegate; |
| } |
| |
| @Override |
| public O apply(I input) { |
| applyCount++; |
| return delegate.apply(input); |
| } |
| |
| void verifyCallCount(int expected) { |
| assertThat(applyCount).isEqualTo(expected); |
| } |
| } |
| |
| private static <X extends Throwable, V> Function<X, V> unexpectedFunction() { |
| return new Function<X, V>() { |
| @Override |
| public V apply(X t) { |
| throw newAssertionError("Unexpected fallback", t); |
| } |
| }; |
| } |
| |
| private static class AsyncFunctionSpy<X extends Throwable, V> implements AsyncFunction<X, V> { |
| private int count; |
| private final AsyncFunction<X, V> delegate; |
| |
| public AsyncFunctionSpy(AsyncFunction<X, V> delegate) { |
| this.delegate = delegate; |
| } |
| |
| @Override |
| public final ListenableFuture<V> apply(X t) throws Exception { |
| count++; |
| return delegate.apply(t); |
| } |
| |
| void verifyCallCount(int expected) { |
| assertThat(count).isEqualTo(expected); |
| } |
| } |
| |
| private static <I, O> FunctionSpy<I, O> spy(Function<I, O> delegate) { |
| return new FunctionSpy<>(delegate); |
| } |
| |
| private static <X extends Throwable, V> AsyncFunctionSpy<X, V> spy(AsyncFunction<X, V> delegate) { |
| return new AsyncFunctionSpy<>(delegate); |
| } |
| |
| private static <X extends Throwable, V> AsyncFunction<X, V> unexpectedAsyncFunction() { |
| return new AsyncFunction<X, V>() { |
| @Override |
| public ListenableFuture<V> apply(X t) { |
| throw newAssertionError("Unexpected fallback", t); |
| } |
| }; |
| } |
| |
| /** Alternative to AssertionError(String, Throwable), which doesn't exist in GWT 2.6.1. */ |
| private static AssertionError newAssertionError(String message, Throwable cause) { |
| AssertionError e = new AssertionError(message); |
| e.initCause(cause); |
| return e; |
| } |
| |
| // catchingAsync tests cloned from the old withFallback tests: |
| |
| public void testCatchingAsync_inputDoesNotRaiseException() throws Exception { |
| AsyncFunction<Throwable, Integer> fallback = unexpectedAsyncFunction(); |
| ListenableFuture<Integer> originalFuture = immediateFuture(7); |
| ListenableFuture<Integer> faultTolerantFuture = |
| catchingAsync(originalFuture, Throwable.class, fallback, directExecutor()); |
| assertEquals(7, getDone(faultTolerantFuture).intValue()); |
| } |
| |
| public void testCatchingAsync_inputRaisesException() throws Exception { |
| final RuntimeException raisedException = new RuntimeException(); |
| AsyncFunctionSpy<Throwable, Integer> fallback = |
| spy( |
| new AsyncFunction<Throwable, Integer>() { |
| @Override |
| public ListenableFuture<Integer> apply(Throwable t) throws Exception { |
| assertThat(t).isSameInstanceAs(raisedException); |
| return immediateFuture(20); |
| } |
| }); |
| ListenableFuture<Integer> failingFuture = immediateFailedFuture(raisedException); |
| ListenableFuture<Integer> faultTolerantFuture = |
| catchingAsync(failingFuture, Throwable.class, fallback, directExecutor()); |
| assertEquals(20, getDone(faultTolerantFuture).intValue()); |
| fallback.verifyCallCount(1); |
| } |
| |
| @GwtIncompatible // non-Throwable exceptionType |
| public void testCatchingAsync_inputCancelledWithoutFallback() throws Exception { |
| AsyncFunction<Throwable, Integer> fallback = unexpectedAsyncFunction(); |
| ListenableFuture<Integer> originalFuture = immediateCancelledFuture(); |
| ListenableFuture<Integer> faultTolerantFuture = |
| catchingAsync(originalFuture, IOException.class, fallback, directExecutor()); |
| assertTrue(faultTolerantFuture.isCancelled()); |
| } |
| |
| public void testCatchingAsync_fallbackGeneratesRuntimeException() throws Exception { |
| RuntimeException expectedException = new RuntimeException(); |
| runExpectedExceptionCatchingAsyncTest(expectedException, false); |
| } |
| |
| public void testCatchingAsync_fallbackGeneratesCheckedException() throws Exception { |
| Exception expectedException = new Exception() {}; |
| runExpectedExceptionCatchingAsyncTest(expectedException, false); |
| } |
| |
| public void testCatchingAsync_fallbackGeneratesError() throws Exception { |
| final Error error = new Error("deliberate"); |
| AsyncFunction<Throwable, Integer> fallback = |
| new AsyncFunction<Throwable, Integer>() { |
| @Override |
| public ListenableFuture<Integer> apply(Throwable t) throws Exception { |
| throw error; |
| } |
| }; |
| ListenableFuture<Integer> failingFuture = immediateFailedFuture(new RuntimeException()); |
| try { |
| getDone(catchingAsync(failingFuture, Throwable.class, fallback, directExecutor())); |
| fail(); |
| } catch (ExecutionException expected) { |
| assertSame(error, expected.getCause()); |
| } |
| } |
| |
| public void testCatchingAsync_fallbackReturnsRuntimeException() throws Exception { |
| RuntimeException expectedException = new RuntimeException(); |
| runExpectedExceptionCatchingAsyncTest(expectedException, true); |
| } |
| |
| public void testCatchingAsync_fallbackReturnsCheckedException() throws Exception { |
| Exception expectedException = new Exception() {}; |
| runExpectedExceptionCatchingAsyncTest(expectedException, true); |
| } |
| |
| private void runExpectedExceptionCatchingAsyncTest( |
| final Exception expectedException, final boolean wrapInFuture) throws Exception { |
| AsyncFunctionSpy<Throwable, Integer> fallback = |
| spy( |
| new AsyncFunction<Throwable, Integer>() { |
| @Override |
| public ListenableFuture<Integer> apply(Throwable t) throws Exception { |
| if (!wrapInFuture) { |
| throw expectedException; |
| } else { |
| return immediateFailedFuture(expectedException); |
| } |
| } |
| }); |
| |
| ListenableFuture<Integer> failingFuture = immediateFailedFuture(new RuntimeException()); |
| |
| ListenableFuture<Integer> faultTolerantFuture = |
| catchingAsync(failingFuture, Throwable.class, fallback, directExecutor()); |
| try { |
| getDone(faultTolerantFuture); |
| fail(); |
| } catch (ExecutionException expected) { |
| assertSame(expectedException, expected.getCause()); |
| } |
| fallback.verifyCallCount(1); |
| } |
| |
| public void testCatchingAsync_fallbackNotReady() throws Exception { |
| ListenableFuture<Integer> primary = immediateFailedFuture(new Exception()); |
| final SettableFuture<Integer> secondary = SettableFuture.create(); |
| AsyncFunction<Throwable, Integer> fallback = |
| new AsyncFunction<Throwable, Integer>() { |
| @Override |
| public ListenableFuture<Integer> apply(Throwable t) { |
| return secondary; |
| } |
| }; |
| ListenableFuture<Integer> derived = |
| catchingAsync(primary, Throwable.class, fallback, directExecutor()); |
| secondary.set(1); |
| assertEquals(1, (int) getDone(derived)); |
| } |
| |
| public void testCatchingAsync_resultInterruptedBeforeFallback() throws Exception { |
| SettableFuture<Integer> primary = SettableFuture.create(); |
| AsyncFunction<Throwable, Integer> fallback = unexpectedAsyncFunction(); |
| ListenableFuture<Integer> derived = |
| catchingAsync(primary, Throwable.class, fallback, directExecutor()); |
| derived.cancel(true); |
| assertTrue(primary.isCancelled()); |
| assertTrue(primary.wasInterrupted()); |
| } |
| |
| public void testCatchingAsync_resultCancelledBeforeFallback() throws Exception { |
| SettableFuture<Integer> primary = SettableFuture.create(); |
| AsyncFunction<Throwable, Integer> fallback = unexpectedAsyncFunction(); |
| ListenableFuture<Integer> derived = |
| catchingAsync(primary, Throwable.class, fallback, directExecutor()); |
| derived.cancel(false); |
| assertTrue(primary.isCancelled()); |
| assertFalse(primary.wasInterrupted()); |
| } |
| |
| @GwtIncompatible // mocks |
| // TODO(cpovirk): eliminate use of mocks |
| @SuppressWarnings("unchecked") |
| public void testCatchingAsync_resultCancelledAfterFallback() throws Exception { |
| final SettableFuture<Integer> secondary = SettableFuture.create(); |
| final RuntimeException raisedException = new RuntimeException(); |
| AsyncFunctionSpy<Throwable, Integer> fallback = |
| spy( |
| new AsyncFunction<Throwable, Integer>() { |
| @Override |
| public ListenableFuture<Integer> apply(Throwable t) throws Exception { |
| assertThat(t).isSameInstanceAs(raisedException); |
| return secondary; |
| } |
| }); |
| |
| ListenableFuture<Integer> failingFuture = immediateFailedFuture(raisedException); |
| |
| ListenableFuture<Integer> derived = |
| catchingAsync(failingFuture, Throwable.class, fallback, directExecutor()); |
| derived.cancel(false); |
| assertTrue(secondary.isCancelled()); |
| assertFalse(secondary.wasInterrupted()); |
| fallback.verifyCallCount(1); |
| } |
| |
| public void testCatchingAsync_nullInsteadOfFuture() throws Exception { |
| ListenableFuture<Integer> inputFuture = immediateFailedFuture(new Exception()); |
| ListenableFuture<?> chainedFuture = |
| catchingAsync( |
| inputFuture, |
| Throwable.class, |
| new AsyncFunction<Throwable, Integer>() { |
| @Override |
| @SuppressWarnings("AsyncFunctionReturnsNull") |
| public ListenableFuture<Integer> apply(Throwable t) { |
| return null; |
| } |
| }, |
| directExecutor()); |
| try { |
| getDone(chainedFuture); |
| fail(); |
| } catch (ExecutionException expected) { |
| NullPointerException cause = (NullPointerException) expected.getCause(); |
| assertThat(cause) |
| .hasMessageThat() |
| .contains( |
| "AsyncFunction.apply returned null instead of a Future. " |
| + "Did you mean to return immediateFuture(null)?"); |
| } |
| } |
| |
| @GwtIncompatible // threads |
| |
| public void testCatchingAsync_interruptPropagatesToTransformingThread() throws Exception { |
| SettableFuture<String> input = SettableFuture.create(); |
| final CountDownLatch inFunction = new CountDownLatch(1); |
| final CountDownLatch shouldCompleteFunction = new CountDownLatch(1); |
| final CountDownLatch gotException = new CountDownLatch(1); |
| AsyncFunction<Throwable, String> function = |
| new AsyncFunction<Throwable, String>() { |
| @Override |
| public ListenableFuture<String> apply(Throwable t) throws Exception { |
| inFunction.countDown(); |
| try { |
| shouldCompleteFunction.await(); |
| } catch (InterruptedException expected) { |
| gotException.countDown(); |
| throw expected; |
| } |
| return immediateFuture("a"); |
| } |
| }; |
| |
| ListenableFuture<String> futureResult = |
| catchingAsync(input, Exception.class, function, newSingleThreadExecutor()); |
| |
| input.setException(new Exception()); |
| inFunction.await(); |
| futureResult.cancel(true); |
| shouldCompleteFunction.countDown(); |
| try { |
| futureResult.get(); |
| fail(); |
| } catch (CancellationException expected) { |
| } |
| // TODO(cpovirk): implement interruption, updating this test: |
| // https://github.com/google/guava/issues/1989 |
| assertEquals(1, gotException.getCount()); |
| // gotException.await(); |
| } |
| |
| @GwtIncompatible // Threads |
| |
| public void testCatchingAsync_functionToString() throws Exception { |
| final CountDownLatch functionCalled = new CountDownLatch(1); |
| final CountDownLatch functionBlocking = new CountDownLatch(1); |
| AsyncFunction<Object, Object> function = |
| tagged( |
| "Called my toString", |
| new AsyncFunction<Object, Object>() { |
| @Override |
| public ListenableFuture<Object> apply(Object input) throws Exception { |
| functionCalled.countDown(); |
| functionBlocking.await(); |
| return immediateFuture(null); |
| } |
| }); |
| |
| ExecutorService executor = Executors.newSingleThreadExecutor(); |
| try { |
| ListenableFuture<?> output = |
| Futures.catchingAsync( |
| immediateFailedFuture(new RuntimeException()), Throwable.class, function, executor); |
| functionCalled.await(); |
| assertThat(output.toString()).contains(function.toString()); |
| } finally { |
| functionBlocking.countDown(); |
| executor.shutdown(); |
| } |
| } |
| |
| public void testCatchingAsync_futureToString() throws Exception { |
| final SettableFuture<Object> toReturn = SettableFuture.create(); |
| AsyncFunction<Object, Object> function = |
| tagged( |
| "Called my toString", |
| new AsyncFunction<Object, Object>() { |
| @Override |
| public ListenableFuture<Object> apply(Object input) throws Exception { |
| return toReturn; |
| } |
| }); |
| |
| ListenableFuture<?> output = |
| Futures.catchingAsync( |
| immediateFailedFuture(new RuntimeException()), |
| Throwable.class, |
| function, |
| directExecutor()); |
| assertThat(output.toString()).contains(toReturn.toString()); |
| } |
| |
| // catching tests cloned from the old withFallback tests: |
| |
| public void testCatching_inputDoesNotRaiseException() throws Exception { |
| Function<Throwable, Integer> fallback = unexpectedFunction(); |
| ListenableFuture<Integer> originalFuture = immediateFuture(7); |
| ListenableFuture<Integer> faultTolerantFuture = |
| catching(originalFuture, Throwable.class, fallback, directExecutor()); |
| assertEquals(7, getDone(faultTolerantFuture).intValue()); |
| } |
| |
| public void testCatching_inputRaisesException() throws Exception { |
| final RuntimeException raisedException = new RuntimeException(); |
| FunctionSpy<Throwable, Integer> fallback = |
| spy( |
| new Function<Throwable, Integer>() { |
| @Override |
| public Integer apply(Throwable t) { |
| assertThat(t).isSameInstanceAs(raisedException); |
| return 20; |
| } |
| }); |
| ListenableFuture<Integer> failingFuture = immediateFailedFuture(raisedException); |
| ListenableFuture<Integer> faultTolerantFuture = |
| catching(failingFuture, Throwable.class, fallback, directExecutor()); |
| assertEquals(20, getDone(faultTolerantFuture).intValue()); |
| fallback.verifyCallCount(1); |
| } |
| |
| @GwtIncompatible // non-Throwable exceptionType |
| public void testCatching_inputCancelledWithoutFallback() throws Exception { |
| Function<IOException, Integer> fallback = unexpectedFunction(); |
| ListenableFuture<Integer> originalFuture = immediateCancelledFuture(); |
| ListenableFuture<Integer> faultTolerantFuture = |
| catching(originalFuture, IOException.class, fallback, directExecutor()); |
| assertTrue(faultTolerantFuture.isCancelled()); |
| } |
| |
| public void testCatching_fallbackGeneratesRuntimeException() throws Exception { |
| RuntimeException expectedException = new RuntimeException(); |
| runExpectedExceptionCatchingTest(expectedException); |
| } |
| |
| /* |
| * catching() uses a plain Function, so there's no |
| * testCatching_fallbackGeneratesCheckedException(). |
| */ |
| |
| public void testCatching_fallbackGeneratesError() throws Exception { |
| final Error error = new Error("deliberate"); |
| Function<Throwable, Integer> fallback = |
| new Function<Throwable, Integer>() { |
| @Override |
| public Integer apply(Throwable t) { |
| throw error; |
| } |
| }; |
| ListenableFuture<Integer> failingFuture = immediateFailedFuture(new RuntimeException()); |
| try { |
| getDone(catching(failingFuture, Throwable.class, fallback, directExecutor())); |
| fail(); |
| } catch (ExecutionException expected) { |
| assertSame(error, expected.getCause()); |
| } |
| } |
| |
| /* |
| * catching() uses a plain Function, so there's no testCatching_fallbackReturnsRuntimeException() |
| * or testCatching_fallbackReturnsCheckedException(). |
| */ |
| |
| private void runExpectedExceptionCatchingTest(final RuntimeException expectedException) |
| throws Exception { |
| FunctionSpy<Throwable, Integer> fallback = |
| spy( |
| new Function<Throwable, Integer>() { |
| @Override |
| public Integer apply(Throwable t) { |
| throw expectedException; |
| } |
| }); |
| |
| ListenableFuture<Integer> failingFuture = immediateFailedFuture(new RuntimeException()); |
| |
| ListenableFuture<Integer> faultTolerantFuture = |
| catching(failingFuture, Throwable.class, fallback, directExecutor()); |
| try { |
| getDone(faultTolerantFuture); |
| fail(); |
| } catch (ExecutionException expected) { |
| assertSame(expectedException, expected.getCause()); |
| } |
| fallback.verifyCallCount(1); |
| } |
| |
| // catching() uses a plain Function, so there's no testCatching_fallbackNotReady(). |
| |
| public void testCatching_resultInterruptedBeforeFallback() throws Exception { |
| SettableFuture<Integer> primary = SettableFuture.create(); |
| Function<Throwable, Integer> fallback = unexpectedFunction(); |
| ListenableFuture<Integer> derived = |
| catching(primary, Throwable.class, fallback, directExecutor()); |
| derived.cancel(true); |
| assertTrue(primary.isCancelled()); |
| assertTrue(primary.wasInterrupted()); |
| } |
| |
| public void testCatching_resultCancelledBeforeFallback() throws Exception { |
| SettableFuture<Integer> primary = SettableFuture.create(); |
| Function<Throwable, Integer> fallback = unexpectedFunction(); |
| ListenableFuture<Integer> derived = |
| catching(primary, Throwable.class, fallback, directExecutor()); |
| derived.cancel(false); |
| assertTrue(primary.isCancelled()); |
| assertFalse(primary.wasInterrupted()); |
| } |
| |
| // catching() uses a plain Function, so there's no testCatching_resultCancelledAfterFallback(). |
| |
| // catching() uses a plain Function, so there's no testCatching_nullInsteadOfFuture(). |
| |
| // Some tests of the exceptionType parameter: |
| |
| public void testCatching_Throwable() throws Exception { |
| Function<Throwable, Integer> fallback = functionReturningOne(); |
| ListenableFuture<Integer> originalFuture = immediateFailedFuture(new IOException()); |
| ListenableFuture<Integer> faultTolerantFuture = |
| catching(originalFuture, Throwable.class, fallback, directExecutor()); |
| assertEquals(1, (int) getDone(faultTolerantFuture)); |
| } |
| |
| @GwtIncompatible // non-Throwable exceptionType |
| public void testCatching_customTypeMatch() throws Exception { |
| Function<IOException, Integer> fallback = functionReturningOne(); |
| ListenableFuture<Integer> originalFuture = immediateFailedFuture(new FileNotFoundException()); |
| ListenableFuture<Integer> faultTolerantFuture = |
| catching(originalFuture, IOException.class, fallback, directExecutor()); |
| assertEquals(1, (int) getDone(faultTolerantFuture)); |
| } |
| |
| @GwtIncompatible // non-Throwable exceptionType |
| public void testCatching_customTypeNoMatch() throws Exception { |
| Function<IOException, Integer> fallback = functionReturningOne(); |
| ListenableFuture<Integer> originalFuture = immediateFailedFuture(new RuntimeException()); |
| ListenableFuture<Integer> faultTolerantFuture = |
| catching(originalFuture, IOException.class, fallback, directExecutor()); |
| try { |
| getDone(faultTolerantFuture); |
| fail(); |
| } catch (ExecutionException expected) { |
| assertThat(expected.getCause()).isInstanceOf(RuntimeException.class); |
| } |
| } |
| |
| @GwtIncompatible // StackOverflowError |
| public void testCatching_StackOverflow() throws Exception { |
| { |
| /* |
| * Initialize all relevant classes before running the test, which may otherwise poison any |
| * classes it is trying to load during its stack overflow. |
| */ |
| SettableFuture<Object> root = SettableFuture.create(); |
| ListenableFuture<Object> unused = |
| catching(root, MyException.class, identity(), directExecutor()); |
| root.setException(new MyException()); |
| } |
| |
| SettableFuture<Object> root = SettableFuture.create(); |
| ListenableFuture<Object> output = root; |
| for (int i = 0; i < 10000; i++) { |
| output = catching(output, MyException.class, identity(), directExecutor()); |
| } |
| try { |
| root.setException(new MyException()); |
| fail(); |
| } catch (StackOverflowError expected) { |
| } |
| } |
| |
| public void testCatching_ErrorAfterCancellation() throws Exception { |
| class Fallback implements Function<Throwable, Object> { |
| ListenableFuture<Object> output; |
| |
| @Override |
| public Object apply(Throwable input) { |
| output.cancel(false); |
| throw new MyError(); |
| } |
| } |
| Fallback fallback = new Fallback(); |
| SettableFuture<Object> input = SettableFuture.create(); |
| |
| ListenableFuture<Object> output = catching(input, Throwable.class, fallback, directExecutor()); |
| fallback.output = output; |
| |
| input.setException(new MyException()); |
| assertTrue(output.isCancelled()); |
| } |
| |
| public void testCatching_ExceptionAfterCancellation() throws Exception { |
| class Fallback implements Function<Throwable, Object> { |
| ListenableFuture<Object> output; |
| |
| @Override |
| public Object apply(Throwable input) { |
| output.cancel(false); |
| throw new MyRuntimeException(); |
| } |
| } |
| Fallback fallback = new Fallback(); |
| SettableFuture<Object> input = SettableFuture.create(); |
| |
| ListenableFuture<Object> output = catching(input, Throwable.class, fallback, directExecutor()); |
| fallback.output = output; |
| |
| input.setException(new MyException()); |
| assertTrue(output.isCancelled()); |
| } |
| |
| public void testCatching_getThrowsRuntimeException() throws Exception { |
| ListenableFuture<Object> input = |
| UncheckedThrowingFuture.throwingRuntimeException(new MyRuntimeException()); |
| |
| // We'd catch only MyRuntimeException.class here, but then the test won't compile under GWT. |
| ListenableFuture<Object> output = |
| catching(input, Throwable.class, identity(), directExecutor()); |
| assertThat(getDone(output)).isInstanceOf(MyRuntimeException.class); |
| } |
| |
| public void testCatching_getThrowsError() throws Exception { |
| ListenableFuture<Object> input = UncheckedThrowingFuture.throwingError(new MyError()); |
| |
| // We'd catch only MyError.class here, but then the test won't compile under GWT. |
| ListenableFuture<Object> output = |
| catching(input, Throwable.class, identity(), directExecutor()); |
| assertThat(getDone(output)).isInstanceOf(MyError.class); |
| } |
| |
| public void testCatching_listenerThrowsError() throws Exception { |
| SettableFuture<Object> input = SettableFuture.create(); |
| ListenableFuture<Object> output = |
| catching(input, Throwable.class, identity(), directExecutor()); |
| |
| output.addListener( |
| new Runnable() { |
| @Override |
| public void run() { |
| throw new MyError(); |
| } |
| }, |
| directExecutor()); |
| try { |
| input.setException(new MyException()); |
| fail(); |
| } catch (MyError expected) { |
| } |
| } |
| |
| public void testCatchingAsync_Throwable() throws Exception { |
| AsyncFunction<Throwable, Integer> fallback = asyncFunctionReturningOne(); |
| ListenableFuture<Integer> originalFuture = immediateFailedFuture(new IOException()); |
| ListenableFuture<Integer> faultTolerantFuture = |
| catchingAsync(originalFuture, Throwable.class, fallback, directExecutor()); |
| assertEquals(1, (int) getDone(faultTolerantFuture)); |
| } |
| |
| @GwtIncompatible // non-Throwable exceptionType |
| public void testCatchingAsync_customTypeMatch() throws Exception { |
| AsyncFunction<IOException, Integer> fallback = asyncFunctionReturningOne(); |
| ListenableFuture<Integer> originalFuture = immediateFailedFuture(new FileNotFoundException()); |
| ListenableFuture<Integer> faultTolerantFuture = |
| catchingAsync(originalFuture, IOException.class, fallback, directExecutor()); |
| assertEquals(1, (int) getDone(faultTolerantFuture)); |
| } |
| |
| @GwtIncompatible // non-Throwable exceptionType |
| public void testCatchingAsync_customTypeNoMatch() throws Exception { |
| AsyncFunction<IOException, Integer> fallback = asyncFunctionReturningOne(); |
| ListenableFuture<Integer> originalFuture = immediateFailedFuture(new RuntimeException()); |
| ListenableFuture<Integer> faultTolerantFuture = |
| catchingAsync(originalFuture, IOException.class, fallback, directExecutor()); |
| try { |
| getDone(faultTolerantFuture); |
| fail(); |
| } catch (ExecutionException expected) { |
| assertThat(expected.getCause()).isInstanceOf(RuntimeException.class); |
| } |
| } |
| |
| @GwtIncompatible // StackOverflowError |
| public void testCatchingAsync_StackOverflow() throws Exception { |
| { |
| /* |
| * Initialize all relevant classes before running the test, which may otherwise poison any |
| * classes it is trying to load during its stack overflow. |
| */ |
| SettableFuture<Object> root = SettableFuture.create(); |
| ListenableFuture<Object> unused = |
| catchingAsync(root, MyException.class, asyncIdentity(), directExecutor()); |
| root.setException(new MyException()); |
| } |
| |
| SettableFuture<Object> root = SettableFuture.create(); |
| ListenableFuture<Object> output = root; |
| for (int i = 0; i < 10000; i++) { |
| output = catchingAsync(output, MyException.class, asyncIdentity(), directExecutor()); |
| } |
| try { |
| root.setException(new MyException()); |
| fail(); |
| } catch (StackOverflowError expected) { |
| } |
| } |
| |
| public void testCatchingAsync_ErrorAfterCancellation() throws Exception { |
| class Fallback implements AsyncFunction<Throwable, Object> { |
| ListenableFuture<Object> output; |
| |
| @Override |
| public ListenableFuture<Object> apply(Throwable input) { |
| output.cancel(false); |
| throw new MyError(); |
| } |
| } |
| Fallback fallback = new Fallback(); |
| SettableFuture<Object> input = SettableFuture.create(); |
| |
| ListenableFuture<Object> output = |
| catchingAsync(input, Throwable.class, fallback, directExecutor()); |
| fallback.output = output; |
| |
| input.setException(new MyException()); |
| assertTrue(output.isCancelled()); |
| } |
| |
| public void testCatchingAsync_ExceptionAfterCancellation() throws Exception { |
| class Fallback implements AsyncFunction<Throwable, Object> { |
| ListenableFuture<Object> output; |
| |
| @Override |
| public ListenableFuture<Object> apply(Throwable input) { |
| output.cancel(false); |
| throw new MyRuntimeException(); |
| } |
| } |
| Fallback fallback = new Fallback(); |
| SettableFuture<Object> input = SettableFuture.create(); |
| |
| ListenableFuture<Object> output = |
| catchingAsync(input, Throwable.class, fallback, directExecutor()); |
| fallback.output = output; |
| |
| input.setException(new MyException()); |
| assertTrue(output.isCancelled()); |
| } |
| |
| public void testCatchingAsync_getThrowsRuntimeException() throws Exception { |
| ListenableFuture<Object> input = |
| UncheckedThrowingFuture.throwingRuntimeException(new MyRuntimeException()); |
| |
| // We'd catch only MyRuntimeException.class here, but then the test won't compile under GWT. |
| ListenableFuture<Object> output = |
| catchingAsync(input, Throwable.class, asyncIdentity(), directExecutor()); |
| assertThat(getDone(output)).isInstanceOf(MyRuntimeException.class); |
| } |
| |
| public void testCatchingAsync_getThrowsError() throws Exception { |
| ListenableFuture<Object> input = UncheckedThrowingFuture.throwingError(new MyError()); |
| |
| // We'd catch only MyError.class here, but then the test won't compile under GWT. |
| ListenableFuture<Object> output = |
| catchingAsync(input, Throwable.class, asyncIdentity(), directExecutor()); |
| assertThat(getDone(output)).isInstanceOf(MyError.class); |
| } |
| |
| public void testCatchingAsync_listenerThrowsError() throws Exception { |
| SettableFuture<Object> input = SettableFuture.create(); |
| ListenableFuture<Object> output = |
| catchingAsync(input, Throwable.class, asyncIdentity(), directExecutor()); |
| |
| output.addListener( |
| new Runnable() { |
| @Override |
| public void run() { |
| throw new MyError(); |
| } |
| }, |
| directExecutor()); |
| try { |
| input.setException(new MyException()); |
| fail(); |
| } catch (MyError expected) { |
| } |
| } |
| |
| public void testCatching_rejectionPropagatesToOutput() throws Exception { |
| SettableFuture<String> input = SettableFuture.create(); |
| ListenableFuture<String> transformed = |
| catching(input, Throwable.class, constant("foo"), REJECTING_EXECUTOR); |
| input.setException(new Exception()); |
| try { |
| getDone(transformed); |
| fail(); |
| } catch (ExecutionException expected) { |
| assertThat(expected.getCause()).isInstanceOf(RejectedExecutionException.class); |
| } |
| } |
| |
| public void testCatchingAsync_rejectionPropagatesToOutput() throws Exception { |
| SettableFuture<String> input = SettableFuture.create(); |
| ListenableFuture<String> transformed = |
| catchingAsync( |
| input, |
| Throwable.class, |
| constantAsyncFunction(immediateFuture("foo")), |
| REJECTING_EXECUTOR); |
| input.setException(new Exception()); |
| try { |
| getDone(transformed); |
| fail(); |
| } catch (ExecutionException expected) { |
| assertThat(expected.getCause()).isInstanceOf(RejectedExecutionException.class); |
| } |
| } |
| |
| private <X extends Throwable> Function<X, Integer> functionReturningOne() { |
| return new Function<X, Integer>() { |
| @Override |
| public Integer apply(X t) { |
| return 1; |
| } |
| }; |
| } |
| |
| private <X extends Throwable> AsyncFunction<X, Integer> asyncFunctionReturningOne() { |
| return new AsyncFunction<X, Integer>() { |
| @Override |
| public ListenableFuture<Integer> apply(X t) { |
| return immediateFuture(1); |
| } |
| }; |
| } |
| |
| private static <I, O> AsyncFunction<I, O> constantAsyncFunction( |
| final ListenableFuture<O> output) { |
| return new AsyncFunction<I, O>() { |
| @Override |
| public ListenableFuture<O> apply(I input) { |
| return output; |
| } |
| }; |
| } |
| |
| public void testTransformAsync_genericsWildcard_AsyncFunction() throws Exception { |
| ListenableFuture<?> nullFuture = immediateFuture(null); |
| ListenableFuture<?> chainedFuture = |
| transformAsync(nullFuture, constantAsyncFunction(nullFuture), directExecutor()); |
| assertNull(getDone(chainedFuture)); |
| } |
| |
| public void testTransformAsync_genericsHierarchy_AsyncFunction() throws Exception { |
| ListenableFuture<FooChild> future = immediateFuture(null); |
| final BarChild barChild = new BarChild(); |
| AsyncFunction<Foo, BarChild> function = |
| new AsyncFunction<Foo, BarChild>() { |
| @Override |
| public AbstractFuture<BarChild> apply(Foo unused) { |
| AbstractFuture<BarChild> future = new AbstractFuture<BarChild>() {}; |
| future.set(barChild); |
| return future; |
| } |
| }; |
| Bar bar = getDone(transformAsync(future, function, directExecutor())); |
| assertSame(barChild, bar); |
| } |
| |
| @GwtIncompatible // get() timeout |
| public void testTransformAsync_asyncFunction_timeout() |
| throws InterruptedException, ExecutionException { |
| AsyncFunction<String, Integer> function = constantAsyncFunction(immediateFuture(1)); |
| ListenableFuture<Integer> future = |
| transformAsync(SettableFuture.<String>create(), function, directExecutor()); |
| try { |
| future.get(1, MILLISECONDS); |
| fail(); |
| } catch (TimeoutException expected) { |
| } |
| } |
| |
| public void testTransformAsync_asyncFunction_error() throws InterruptedException { |
| final Error error = new Error("deliberate"); |
| AsyncFunction<String, Integer> function = |
| new AsyncFunction<String, Integer>() { |
| @Override |
| public ListenableFuture<Integer> apply(String input) { |
| throw error; |
| } |
| }; |
| SettableFuture<String> inputFuture = SettableFuture.create(); |
| ListenableFuture<Integer> outputFuture = |
| transformAsync(inputFuture, function, directExecutor()); |
| inputFuture.set("value"); |
| try { |
| getDone(outputFuture); |
| fail(); |
| } catch (ExecutionException expected) { |
| assertSame(error, expected.getCause()); |
| } |
| } |
| |
| public void testTransformAsync_asyncFunction_nullInsteadOfFuture() throws Exception { |
| ListenableFuture<?> inputFuture = immediateFuture("a"); |
| ListenableFuture<?> chainedFuture = |
| transformAsync(inputFuture, constantAsyncFunction(null), directExecutor()); |
| try { |
| getDone(chainedFuture); |
| fail(); |
| } catch (ExecutionException expected) { |
| NullPointerException cause = (NullPointerException) expected.getCause(); |
| assertThat(cause) |
| .hasMessageThat() |
| .contains( |
| "AsyncFunction.apply returned null instead of a Future. " |
| + "Did you mean to return immediateFuture(null)?"); |
| } |
| } |
| |
| @GwtIncompatible // threads |
| |
| public void testTransformAsync_asyncFunction_cancelledWhileApplyingFunction() |
| throws InterruptedException, ExecutionException { |
| final CountDownLatch inFunction = new CountDownLatch(1); |
| final CountDownLatch functionDone = new CountDownLatch(1); |
| final SettableFuture<Integer> resultFuture = SettableFuture.create(); |
| AsyncFunction<String, Integer> function = |
| new AsyncFunction<String, Integer>() { |
| @Override |
| public ListenableFuture<Integer> apply(String input) throws Exception { |
| inFunction.countDown(); |
| functionDone.await(); |
| return resultFuture; |
| } |
| }; |
| SettableFuture<String> inputFuture = SettableFuture.create(); |
| ListenableFuture<Integer> future = |
| transformAsync(inputFuture, function, newSingleThreadExecutor()); |
| inputFuture.set("value"); |
| inFunction.await(); |
| future.cancel(false); |
| functionDone.countDown(); |
| try { |
| future.get(); |
| fail(); |
| } catch (CancellationException expected) { |
| } |
| try { |
| resultFuture.get(); |
| fail(); |
| } catch (CancellationException expected) { |
| } |
| } |
| |
| @GwtIncompatible // threads |
| |
| public void testTransformAsync_asyncFunction_cancelledBeforeApplyingFunction() |
| throws InterruptedException { |
| final AtomicBoolean functionCalled = new AtomicBoolean(); |
| AsyncFunction<String, Integer> function = |
| new AsyncFunction<String, Integer>() { |
| @Override |
| public ListenableFuture<Integer> apply(String input) throws Exception { |
| functionCalled.set(true); |
| return immediateFuture(1); |
| } |
| }; |
| SettableFuture<String> inputFuture = SettableFuture.create(); |
| ExecutorService executor = newSingleThreadExecutor(); |
| ListenableFuture<Integer> future = transformAsync(inputFuture, function, executor); |
| |
| // Pause the executor. |
| final CountDownLatch beforeFunction = new CountDownLatch(1); |
| executor.execute( |
| new Runnable() { |
| @Override |
| public void run() { |
| awaitUninterruptibly(beforeFunction); |
| } |
| }); |
| |
| // Cancel the future after making input available. |
| inputFuture.set("value"); |
| future.cancel(false); |
| |
| // Unpause the executor. |
| beforeFunction.countDown(); |
| executor.shutdown(); |
| assertTrue(executor.awaitTermination(5, SECONDS)); |
| |
| assertFalse(functionCalled.get()); |
| } |
| |
| public void testSubmitAsync_asyncCallable_error() throws InterruptedException { |
| final Error error = new Error("deliberate"); |
| AsyncCallable<Integer> callable = |
| new AsyncCallable<Integer>() { |
| @Override |
| public ListenableFuture<Integer> call() { |
| throw error; |
| } |
| }; |
| SettableFuture<String> inputFuture = SettableFuture.create(); |
| ListenableFuture<Integer> outputFuture = submitAsync(callable, directExecutor()); |
| inputFuture.set("value"); |
| try { |
| getDone(outputFuture); |
| fail(); |
| } catch (ExecutionException expected) { |
| assertSame(error, expected.getCause()); |
| } |
| } |
| |
| public void testSubmitAsync_asyncCallable_nullInsteadOfFuture() throws Exception { |
| ListenableFuture<?> chainedFuture = submitAsync(constantAsyncCallable(null), directExecutor()); |
| try { |
| getDone(chainedFuture); |
| fail(); |
| } catch (ExecutionException expected) { |
| NullPointerException cause = (NullPointerException) expected.getCause(); |
| assertThat(cause) |
| .hasMessageThat() |
| .contains( |
| "AsyncCallable.call returned null instead of a Future. " |
| + "Did you mean to return immediateFuture(null)?"); |
| } |
| } |
| |
| @GwtIncompatible // threads |
| |
| public void testSubmitAsync_asyncCallable_cancelledWhileApplyingFunction() |
| throws InterruptedException, ExecutionException { |
| final CountDownLatch inFunction = new CountDownLatch(1); |
| final CountDownLatch callableDone = new CountDownLatch(1); |
| final SettableFuture<Integer> resultFuture = SettableFuture.create(); |
| AsyncCallable<Integer> callable = |
| new AsyncCallable<Integer>() { |
| @Override |
| public ListenableFuture<Integer> call() throws InterruptedException { |
| inFunction.countDown(); |
| callableDone.await(); |
| return resultFuture; |
| } |
| }; |
| SettableFuture<String> inputFuture = SettableFuture.create(); |
| ListenableFuture<Integer> future = submitAsync(callable, newSingleThreadExecutor()); |
| inputFuture.set("value"); |
| inFunction.await(); |
| future.cancel(false); |
| callableDone.countDown(); |
| try { |
| future.get(); |
| fail(); |
| } catch (CancellationException expected) { |
| } |
| try { |
| resultFuture.get(); |
| fail(); |
| } catch (CancellationException expected) { |
| } |
| } |
| |
| @GwtIncompatible // threads |
| |
| public void testSubmitAsync_asyncCallable_cancelledBeforeApplyingFunction() |
| throws InterruptedException { |
| final AtomicBoolean callableCalled = new AtomicBoolean(); |
| AsyncCallable<Integer> callable = |
| new AsyncCallable<Integer>() { |
| @Override |
| public ListenableFuture<Integer> call() { |
| callableCalled.set(true); |
| return immediateFuture(1); |
| } |
| }; |
| ExecutorService executor = newSingleThreadExecutor(); |
| // Pause the executor. |
| final CountDownLatch beforeFunction = new CountDownLatch(1); |
| executor.execute( |
| new Runnable() { |
| @Override |
| public void run() { |
| awaitUninterruptibly(beforeFunction); |
| } |
| }); |
| ListenableFuture<Integer> future = submitAsync(callable, executor); |
| future.cancel(false); |
| |
| // Unpause the executor. |
| beforeFunction.countDown(); |
| executor.shutdown(); |
| assertTrue(executor.awaitTermination(5, SECONDS)); |
| |
| assertFalse(callableCalled.get()); |
| } |
| |
| @GwtIncompatible // threads |
| |
| public void testSubmitAsync_asyncCallable_returnsInterruptedFuture() throws InterruptedException { |
| assertThat(Thread.interrupted()).isFalse(); |
| SettableFuture<Integer> cancelledFuture = SettableFuture.create(); |
| cancelledFuture.cancel(true); |
| assertThat(Thread.interrupted()).isFalse(); |
| ListenableFuture<Integer> future = |
| submitAsync(constantAsyncCallable(cancelledFuture), directExecutor()); |
| assertThat(future.isDone()).isTrue(); |
| assertThat(Thread.interrupted()).isFalse(); |
| } |
| |
| public void testSubmit_callable_returnsValue() throws Exception { |
| Callable<Integer> callable = |
| new Callable<Integer>() { |
| @Override |
| public Integer call() { |
| return 42; |
| } |
| }; |
| ListenableFuture<Integer> future = submit(callable, directExecutor()); |
| assertThat(future.isDone()).isTrue(); |
| assertThat(getDone(future)).isEqualTo(42); |
| } |
| |
| public void testSubmit_callable_throwsException() { |
| final Exception exception = new Exception("Exception for testing"); |
| Callable<Integer> callable = |
| new Callable<Integer>() { |
| @Override |
| public Integer call() throws Exception { |
| throw exception; |
| } |
| }; |
| ListenableFuture<Integer> future = submit(callable, directExecutor()); |
| try { |
| getDone(future); |
| fail(); |
| } catch (ExecutionException expected) { |
| assertThat(expected).hasCauseThat().isSameInstanceAs(exception); |
| } |
| } |
| |
| public void testSubmit_runnable_completesAfterRun() throws Exception { |
| final List<Runnable> pendingRunnables = newArrayList(); |
| final List<Runnable> executedRunnables = newArrayList(); |
| Runnable runnable = |
| new Runnable() { |
| @Override |
| public void run() { |
| executedRunnables.add(this); |
| } |
| }; |
| Executor executor = |
| new Executor() { |
| @Override |
| public void execute(Runnable runnable) { |
| pendingRunnables.add(runnable); |
| } |
| }; |
| ListenableFuture<Void> future = submit(runnable, executor); |
| assertThat(future.isDone()).isFalse(); |
| assertThat(executedRunnables).isEmpty(); |
| assertThat(pendingRunnables).hasSize(1); |
| pendingRunnables.remove(0).run(); |
| assertThat(future.isDone()).isTrue(); |
| assertThat(executedRunnables).containsExactly(runnable); |
| assertThat(pendingRunnables).isEmpty(); |
| } |
| |
| public void testSubmit_runnable_throwsException() throws Exception { |
| final RuntimeException exception = new RuntimeException("Exception for testing"); |
| Runnable runnable = |
| new Runnable() { |
| @Override |
| public void run() { |
| throw exception; |
| } |
| }; |
| ListenableFuture<Void> future = submit(runnable, directExecutor()); |
| try { |
| getDone(future); |
| fail(); |
| } catch (ExecutionException expected) { |
| assertThat(expected).hasCauseThat().isSameInstanceAs(exception); |
| } |
| } |
| |
| @GwtIncompatible // threads |
| |
| public void testScheduleAsync_asyncCallable_error() throws InterruptedException { |
| final Error error = new Error("deliberate"); |
| AsyncCallable<Integer> callable = |
| new AsyncCallable<Integer>() { |
| @Override |
| public ListenableFuture<Integer> call() { |
| throw error; |
| } |
| }; |
| SettableFuture<String> inputFuture = SettableFuture.create(); |
| ListenableFuture<Integer> outputFuture = submitAsync(callable, directExecutor()); |
| inputFuture.set("value"); |
| try { |
| getDone(outputFuture); |
| fail(); |
| } catch (ExecutionException expected) { |
| assertSame(error, expected.getCause()); |
| } |
| } |
| |
| @GwtIncompatible // threads |
| |
| public void testScheduleAsync_asyncCallable_nullInsteadOfFuture() throws Exception { |
| ListenableFuture<?> chainedFuture = |
| scheduleAsync( |
| constantAsyncCallable(null), |
| 1, |
| TimeUnit.NANOSECONDS, |
| newSingleThreadScheduledExecutor()); |
| try { |
| chainedFuture.get(); |
| fail(); |
| } catch (ExecutionException expected) { |
| NullPointerException cause = (NullPointerException) expected.getCause(); |
| assertThat(cause) |
| .hasMessageThat() |
| .contains( |
| "AsyncCallable.call returned null instead of a Future. " |
| + "Did you mean to return immediateFuture(null)?"); |
| } |
| } |
| |
| @GwtIncompatible // threads |
| |
| public void testScheduleAsync_asyncCallable_cancelledWhileApplyingFunction() |
| throws InterruptedException, ExecutionException { |
| final CountDownLatch inFunction = new CountDownLatch(1); |
| final CountDownLatch callableDone = new CountDownLatch(1); |
| final SettableFuture<Integer> resultFuture = SettableFuture.create(); |
| AsyncCallable<Integer> callable = |
| new AsyncCallable<Integer>() { |
| @Override |
| public ListenableFuture<Integer> call() throws InterruptedException { |
| inFunction.countDown(); |
| callableDone.await(); |
| return resultFuture; |
| } |
| }; |
| ListenableFuture<Integer> future = |
| scheduleAsync(callable, 1, TimeUnit.NANOSECONDS, newSingleThreadScheduledExecutor()); |
| inFunction.await(); |
| future.cancel(false); |
| callableDone.countDown(); |
| try { |
| future.get(); |
| fail(); |
| } catch (CancellationException expected) { |
| } |
| try { |
| resultFuture.get(); |
| fail(); |
| } catch (CancellationException expected) { |
| } |
| } |
| |
| @GwtIncompatible // threads |
| |
| public void testScheduleAsync_asyncCallable_cancelledBeforeCallingFunction() |
| throws InterruptedException { |
| final AtomicBoolean callableCalled = new AtomicBoolean(); |
| AsyncCallable<Integer> callable = |
| new AsyncCallable<Integer>() { |
| @Override |
| public ListenableFuture<Integer> call() { |
| callableCalled.set(true); |
| return immediateFuture(1); |
| } |
| }; |
| ScheduledExecutorService executor = newSingleThreadScheduledExecutor(); |
| // Pause the executor. |
| final CountDownLatch beforeFunction = new CountDownLatch(1); |
| executor.execute( |
| new Runnable() { |
| @Override |
| public void run() { |
| awaitUninterruptibly(beforeFunction); |
| } |
| }); |
| ListenableFuture<Integer> future = scheduleAsync(callable, 1, TimeUnit.NANOSECONDS, executor); |
| future.cancel(false); |
| |
| // Unpause the executor. |
| beforeFunction.countDown(); |
| executor.shutdown(); |
| assertTrue(executor.awaitTermination(5, SECONDS)); |
| |
| assertFalse(callableCalled.get()); |
| } |
| |
| private static <T> AsyncCallable<T> constantAsyncCallable(final ListenableFuture<T> returnValue) { |
| return new AsyncCallable<T>() { |
| @Override |
| public ListenableFuture<T> call() { |
| return returnValue; |
| } |
| }; |
| } |
| |
| /** Runnable which can be called a single time, and only after {@link #expectCall} is called. */ |
| // TODO(cpovirk): top-level class? |
| private static class SingleCallListener implements Runnable { |
| |
| private boolean expectCall = false; |
| private final AtomicBoolean called = new AtomicBoolean(); |
| |
| @Override |
| public void run() { |
| assertTrue("Listener called before it was expected", expectCall); |
| assertFalse("Listener called more than once", wasCalled()); |
| called.set(true); |
| } |
| |
| public void expectCall() { |
| assertFalse("expectCall is already true", expectCall); |
| expectCall = true; |
| } |
| |
| public boolean wasCalled() { |
| return called.get(); |
| } |
| } |
| |
| public void testAllAsList() throws Exception { |
| // Create input and output |
| SettableFuture<String> future1 = SettableFuture.create(); |
| SettableFuture<String> future2 = SettableFuture.create(); |
| SettableFuture<String> future3 = SettableFuture.create(); |
| @SuppressWarnings("unchecked") // array is never modified |
| ListenableFuture<List<String>> compound = allAsList(future1, future2, future3); |
| |
| // Attach a listener |
| SingleCallListener listener = new SingleCallListener(); |
| compound.addListener(listener, directExecutor()); |
| |
| // Satisfy each input and check the output |
| assertFalse(compound.isDone()); |
| future1.set(DATA1); |
| assertFalse(compound.isDone()); |
| future2.set(DATA2); |
| assertFalse(compound.isDone()); |
| listener.expectCall(); |
| future3.set(DATA3); |
| assertTrue(listener.wasCalled()); |
| |
| List<String> results = getDone(compound); |
| assertThat(results).containsExactly(DATA1, DATA2, DATA3).inOrder(); |
| } |
| |
| public void testAllAsList_emptyList() throws Exception { |
| SingleCallListener listener = new SingleCallListener(); |
| listener.expectCall(); |
| List<ListenableFuture<String>> futures = ImmutableList.of(); |
| ListenableFuture<List<String>> compound = allAsList(futures); |
| compound.addListener(listener, directExecutor()); |
| assertThat(getDone(compound)).isEmpty(); |
| assertTrue(listener.wasCalled()); |
| } |
| |
| public void testAllAsList_emptyArray() throws Exception { |
| SingleCallListener listener = new SingleCallListener(); |
| listener.expectCall(); |
| @SuppressWarnings("unchecked") // array is never modified |
| ListenableFuture<List<String>> compound = allAsList(); |
| compound.addListener(listener, directExecutor()); |
| assertThat(getDone(compound)).isEmpty(); |
| assertTrue(listener.wasCalled()); |
| } |
| |
| public void testAllAsList_failure() throws Exception { |
| SingleCallListener listener = new SingleCallListener(); |
| SettableFuture<String> future1 = SettableFuture.create(); |
| SettableFuture<String> future2 = SettableFuture.create(); |
| @SuppressWarnings("unchecked") // array is never modified |
| ListenableFuture<List<String>> compound = allAsList(future1, future2); |
| compound.addListener(listener, directExecutor()); |
| |
| listener.expectCall(); |
| Throwable exception = new Throwable("failed1"); |
| future1.setException(exception); |
| assertTrue(compound.isDone()); |
| assertTrue(listener.wasCalled()); |
| assertFalse(future2.isDone()); |
| |
| try { |
| getDone(compound); |
| fail(); |
| } catch (ExecutionException expected) { |
| assertSame(exception, expected.getCause()); |
| } |
| } |
| |
| public void testAllAsList_singleFailure() throws Exception { |
| Throwable exception = new Throwable("failed"); |
| ListenableFuture<String> future = immediateFailedFuture(exception); |
| ListenableFuture<List<String>> compound = allAsList(ImmutableList.of(future)); |
| |
| try { |
| getDone(compound); |
| fail(); |
| } catch (ExecutionException expected) { |
| assertSame(exception, expected.getCause()); |
| } |
| } |
| |
| public void testAllAsList_immediateFailure() throws Exception { |
| Throwable exception = new Throwable("failed"); |
| ListenableFuture<String> future1 = immediateFailedFuture(exception); |
| ListenableFuture<String> future2 = immediateFuture("results"); |
| ListenableFuture<List<String>> compound = allAsList(ImmutableList.of(future1, future2)); |
| |
| try { |
| getDone(compound); |
| fail(); |
| } catch (ExecutionException expected) { |
| assertSame(exception, expected.getCause()); |
| } |
| } |
| |
| public void testAllAsList_error() throws Exception { |
| Error error = new Error("deliberate"); |
| SettableFuture<String> future1 = SettableFuture.create(); |
| ListenableFuture<String> future2 = immediateFuture("results"); |
| ListenableFuture<List<String>> compound = allAsList(ImmutableList.of(future1, future2)); |
| |
| future1.setException(error); |
| try { |
| getDone(compound); |
| fail(); |
| } catch (ExecutionException expected) { |
| assertSame(error, expected.getCause()); |
| } |
| } |
| |
| public void testAllAsList_cancelled() throws Exception { |
| SingleCallListener listener = new SingleCallListener(); |
| SettableFuture<String> future1 = SettableFuture.create(); |
| SettableFuture<String> future2 = SettableFuture.create(); |
| @SuppressWarnings("unchecked") // array is never modified |
| ListenableFuture<List<String>> compound = allAsList(future1, future2); |
| compound.addListener(listener, directExecutor()); |
| |
| listener.expectCall(); |
| future1.cancel(true); |
| assertTrue(compound.isDone()); |
| assertTrue(listener.wasCalled()); |
| assertFalse(future2.isDone()); |
| |
| try { |
| getDone(compound); |
| fail(); |
| } catch (CancellationException expected) { |
| } |
| } |
| |
| public void testAllAsList_resultCancelled() throws Exception { |
| SettableFuture<String> future1 = SettableFuture.create(); |
| SettableFuture<String> future2 = SettableFuture.create(); |
| @SuppressWarnings("unchecked") // array is never modified |
| ListenableFuture<List<String>> compound = allAsList(future1, future2); |
| |
| future2.set(DATA2); |
| assertFalse(compound.isDone()); |
| assertTrue(compound.cancel(false)); |
| assertTrue(compound.isCancelled()); |
| assertTrue(future1.isCancelled()); |
| assertFalse(future1.wasInterrupted()); |
| } |
| |
| public void testAllAsList_resultCancelledInterrupted_withSecondaryListFuture() throws Exception { |
| SettableFuture<String> future1 = SettableFuture.create(); |
| SettableFuture<String> future2 = SettableFuture.create(); |
| ListenableFuture<List<String>> compound = allAsList(future1, future2); |
| // There was a bug where the event listener for the combined future would |
| // result in the sub-futures being cancelled without being interrupted. |
| ListenableFuture<List<String>> otherCompound = allAsList(future1, future2); |
| |
| assertTrue(compound.cancel(true)); |
| assertTrue(future1.isCancelled()); |
| assertTrue(future1.wasInterrupted()); |
| assertTrue(future2.isCancelled()); |
| assertTrue(future2.wasInterrupted()); |
| assertTrue(otherCompound.isCancelled()); |
| } |
| |
| public void testAllAsList_resultCancelled_withSecondaryListFuture() throws Exception { |
| SettableFuture<String> future1 = SettableFuture.create(); |
| SettableFuture<String> future2 = SettableFuture.create(); |
| ListenableFuture<List<String>> compound = allAsList(future1, future2); |
| // This next call is "unused," but it is an important part of the test. Don't remove it! |
| ListenableFuture<List<String>> unused = allAsList(future1, future2); |
| |
| assertTrue(compound.cancel(false)); |
| assertTrue(future1.isCancelled()); |
| assertFalse(future1.wasInterrupted()); |
| assertTrue(future2.isCancelled()); |
| assertFalse(future2.wasInterrupted()); |
| } |
| |
| public void testAllAsList_resultInterrupted() throws Exception { |
| SettableFuture<String> future1 = SettableFuture.create(); |
| SettableFuture<String> future2 = SettableFuture.create(); |
| @SuppressWarnings("unchecked") // array is never modified |
| ListenableFuture<List<String>> compound = allAsList(future1, future2); |
| |
| future2.set(DATA2); |
| assertFalse(compound.isDone()); |
| assertTrue(compound.cancel(true)); |
| assertTrue(compound.isCancelled()); |
| assertTrue(future1.isCancelled()); |
| assertTrue(future1.wasInterrupted()); |
| } |
| |
| /** |
| * Test the case where the futures are fulfilled prior to constructing the ListFuture. There was a |
| * bug where the loop that connects a Listener to each of the futures would die on the last |
| * loop-check as done() on ListFuture nulled out the variable being looped over (the list of |
| * futures). |
| */ |
| public void testAllAsList_doneFutures() throws Exception { |
| // Create input and output |
| SettableFuture<String> future1 = SettableFuture.create(); |
| SettableFuture<String> future2 = SettableFuture.create(); |
| SettableFuture<String> future3 = SettableFuture.create(); |
| |
| // Satisfy each input prior to creating compound and check the output |
| future1.set(DATA1); |
| future2.set(DATA2); |
| future3.set(DATA3); |
| |
| @SuppressWarnings("unchecked") // array is never modified |
| ListenableFuture<List<String>> compound = allAsList(future1, future2, future3); |
| |
| // Attach a listener |
| SingleCallListener listener = new SingleCallListener(); |
| listener.expectCall(); |
| compound.addListener(listener, directExecutor()); |
| |
| assertTrue(listener.wasCalled()); |
| |
| List<String> results = getDone(compound); |
| assertThat(results).containsExactly(DATA1, DATA2, DATA3).inOrder(); |
| } |
| |
| /** A single non-error failure is not logged because it is reported via the output future. */ |
| @SuppressWarnings("unchecked") |
| public void testAllAsList_logging_exception() throws Exception { |
| try { |
| getDone(allAsList(immediateFailedFuture(new MyException()))); |
| fail(); |
| } catch (ExecutionException expected) { |
| assertThat(expected.getCause()).isInstanceOf(MyException.class); |
| assertEquals( |
| "Nothing should be logged", 0, aggregateFutureLogHandler.getStoredLogRecords().size()); |
| } |
| } |
| |
| /** Ensure that errors are always logged. */ |
| @SuppressWarnings("unchecked") |
| public void testAllAsList_logging_error() throws Exception { |
| try { |
| getDone(allAsList(immediateFailedFuture(new MyError()))); |
| fail(); |
| } catch (ExecutionException expected) { |
| assertThat(expected.getCause()).isInstanceOf(MyError.class); |
| List<LogRecord> logged = aggregateFutureLogHandler.getStoredLogRecords(); |
| assertThat(logged).hasSize(1); // errors are always logged |
| assertThat(logged.get(0).getThrown()).isInstanceOf(MyError.class); |
| } |
| } |
| |
| /** All as list will log extra exceptions that have already occurred. */ |
| @SuppressWarnings("unchecked") |
| public void testAllAsList_logging_multipleExceptions_alreadyDone() throws Exception { |
| try { |
| getDone( |
| allAsList( |
| immediateFailedFuture(new MyException()), immediateFailedFuture(new MyException()))); |
| fail(); |
| } catch (ExecutionException expected) { |
| assertThat(expected.getCause()).isInstanceOf(MyException.class); |
| List<LogRecord> logged = aggregateFutureLogHandler.getStoredLogRecords(); |
| assertThat(logged).hasSize(1); // the second failure is logged |
| assertThat(logged.get(0).getThrown()).isInstanceOf(MyException.class); |
| } |
| } |
| |
| /** All as list will log extra exceptions that occur later. */ |
| @SuppressWarnings("unchecked") |
| public void testAllAsList_logging_multipleExceptions_doneLater() throws Exception { |
| SettableFuture<Object> future1 = SettableFuture.create(); |
| SettableFuture<Object> future2 = SettableFuture.create(); |
| SettableFuture<Object> future3 = SettableFuture.create(); |
| ListenableFuture<List<Object>> all = allAsList(future1, future2, future3); |
| |
| future1.setException(new MyException()); |
| future2.setException(new MyException()); |
| future3.setException(new MyException()); |
| |
| try { |
| getDone(all); |
| fail(); |
| } catch (ExecutionException expected) { |
| List<LogRecord> logged = aggregateFutureLogHandler.getStoredLogRecords(); |
| assertThat(logged).hasSize(2); // failures after the first are logged |
| assertThat(logged.get(0).getThrown()).isInstanceOf(MyException.class); |
| assertThat(logged.get(1).getThrown()).isInstanceOf(MyException.class); |
| } |
| } |
| |
| /** The same exception happening on multiple futures should not be logged. */ |
| @SuppressWarnings("unchecked") |
| public void testAllAsList_logging_same_exception() throws Exception { |
| try { |
| MyException sameInstance = new MyException(); |
| getDone(allAsList(immediateFailedFuture(sameInstance), immediateFailedFuture(sameInstance))); |
| fail(); |
| } catch (ExecutionException expected) { |
| assertThat(expected.getCause()).isInstanceOf(MyException.class); |
| assertEquals( |
| "Nothing should be logged", 0, aggregateFutureLogHandler.getStoredLogRecords().size()); |
| } |
| } |
| |
| public void testAllAsList_logging_seenExceptionUpdateRace() throws Exception { |
| final MyException sameInstance = new MyException(); |
| SettableFuture<Object> firstFuture = SettableFuture.create(); |
| final SettableFuture<Object> secondFuture = SettableFuture.create(); |
| ListenableFuture<List<Object>> bulkFuture = allAsList(firstFuture, secondFuture); |
| |
| bulkFuture.addListener( |
| new Runnable() { |
| @Override |
| public void run() { |
| /* |
| * firstFuture just completed, but AggregateFuture hasn't yet had time to record the |
| * exception in seenExceptions. When we complete secondFuture with the same exception, |
| * we want for AggregateFuture to still detect that it's been previously seen. |
| */ |
| secondFuture.setException(sameInstance); |
| } |
| }, |
| directExecutor()); |
| firstFuture.setException(sameInstance); |
| |
| try { |
| getDone(bulkFuture); |
| fail(); |
| } catch (ExecutionException expected) { |
| assertThat(expected.getCause()).isInstanceOf(MyException.class); |
| assertThat(aggregateFutureLogHandler.getStoredLogRecords()).isEmpty(); |
| } |
| } |
| |
| public void testAllAsList_logging_seenExceptionUpdateCancelRace() throws Exception { |
| final MyException subsequentFailure = new MyException(); |
| SettableFuture<Object> firstFuture = SettableFuture.create(); |
| final SettableFuture<Object> secondFuture = SettableFuture.create(); |
| ListenableFuture<List<Object>> bulkFuture = allAsList(firstFuture, secondFuture); |
| |
| bulkFuture.addListener( |
| new Runnable() { |
| @Override |
| public void run() { |
| /* |
| * This is similar to the above test, but this time we're making sure that we recognize |
| * that the output Future is done early not because of an exception but because of a |
| * cancellation. |
| */ |
| secondFuture.setException(subsequentFailure); |
| } |
| }, |
| directExecutor()); |
| firstFuture.cancel(false); |
| |
| try { |
| getDone(bulkFuture); |
| fail(); |
| } catch (CancellationException expected) { |
| assertThat(getOnlyElement(aggregateFutureLogHandler.getStoredLogRecords()).getThrown()) |
| .isSameInstanceAs(subsequentFailure); |
| } |
| } |
| |
| /** |
| * Different exceptions happening on multiple futures with the same cause should not be logged. |
| */ |
| @SuppressWarnings("unchecked") |
| public void testAllAsList_logging_same_cause() throws Exception { |
| try { |
| MyException exception1 = new MyException(); |
| MyException exception2 = new MyException(); |
| MyException exception3 = new MyException(); |
| |
| MyException sameInstance = new MyException(); |
| exception1.initCause(sameInstance); |
| exception2.initCause(sameInstance); |
| exception3.initCause(exception2); |
| getDone(allAsList(immediateFailedFuture(exception1), immediateFailedFuture(exception3))); |
| fail(); |
| } catch (ExecutionException expected) { |
| assertThat(expected.getCause()).isInstanceOf(MyException.class); |
| assertEquals( |
| "Nothing should be logged", 0, aggregateFutureLogHandler.getStoredLogRecords().size()); |
| } |
| } |
| |
| private static String createCombinedResult(Integer i, Boolean b) { |
| return "-" + i + "-" + b; |
| } |
| |
| @GwtIncompatible // threads |
| |
| public void testWhenAllComplete_noLeakInterruption() throws Exception { |
| final SettableFuture<String> stringFuture = SettableFuture.create(); |
| AsyncCallable<String> combiner = |
| new AsyncCallable<String>() { |
| @Override |
| public ListenableFuture<String> call() throws Exception { |
| return stringFuture; |
| } |
| }; |
| |
| ListenableFuture<String> futureResult = whenAllComplete().callAsync(combiner, directExecutor()); |
| |
| assertThat(Thread.interrupted()).isFalse(); |
| futureResult.cancel(true); |
| assertThat(Thread.interrupted()).isFalse(); |
| } |
| |
| public void testWhenAllComplete_wildcard() throws Exception { |
| ListenableFuture<?> futureA = immediateFuture("a"); |
| ListenableFuture<?> futureB = immediateFuture("b"); |
| ListenableFuture<?>[] futures = new ListenableFuture<?>[0]; |
| Callable<String> combiner = |
| new Callable<String>() { |
| @Override |
| public String call() throws Exception { |
| return "hi"; |
| } |
| }; |
| |
| // We'd like for all the following to compile. |
| ListenableFuture<String> unused; |
| |
| // Compiles: |
| unused = whenAllComplete(futureA, futureB).call(combiner, directExecutor()); |
| |
| // Does not compile: |
| // unused = whenAllComplete(futures).call(combiner); |
| |
| // Workaround for the above: |
| unused = whenAllComplete(asList(futures)).call(combiner, directExecutor()); |
| } |
| |
| @GwtIncompatible // threads |
| |
| public void testWhenAllComplete_asyncResult() throws Exception { |
| SettableFuture<Integer> futureInteger = SettableFuture.create(); |
| SettableFuture<Boolean> futureBoolean = SettableFuture.create(); |
| |
| final ExecutorService executor = newSingleThreadExecutor(); |
| final CountDownLatch callableBlocking = new CountDownLatch(1); |
| final SettableFuture<String> resultOfCombiner = SettableFuture.create(); |
| AsyncCallable<String> combiner = |
| tagged( |
| "Called my toString", |
| new AsyncCallable<String>() { |
| @Override |
| public ListenableFuture<String> call() throws Exception { |
| // Make this executor terminate after this task so that the test can tell when |
| // futureResult has received resultOfCombiner. |
| executor.shutdown(); |
| callableBlocking.await(); |
| return resultOfCombiner; |
| } |
| }); |
| |
| ListenableFuture<String> futureResult = |
| whenAllComplete(futureInteger, futureBoolean).callAsync(combiner, executor); |
| |
| // Waiting on backing futures |
| assertThat(futureResult.toString()) |
| .matches( |
| "CombinedFuture@\\w+\\[status=PENDING," |
| + " info=\\[futures=\\[SettableFuture@\\w+\\[status=PENDING]," |
| + " SettableFuture@\\w+\\[status=PENDING]]]]"); |
| Integer integerPartial = 1; |
| futureInteger.set(integerPartial); |
| assertThat(futureResult.toString()) |
| .matches( |
| "CombinedFuture@\\w+\\[status=PENDING," |
| + " info=\\[futures=\\[SettableFuture@\\w+\\[status=SUCCESS," |
| + " result=\\[java.lang.Integer@\\w+]], SettableFuture@\\w+\\[status=PENDING]]]]"); |
| |
| // Backing futures complete |
| Boolean booleanPartial = true; |
| futureBoolean.set(booleanPartial); |
| // Once the backing futures are done there's a (brief) moment where we know nothing |
| assertThat(futureResult.toString()).matches("CombinedFuture@\\w+\\[status=PENDING]"); |
| callableBlocking.countDown(); |
| // Need to wait for resultFuture to be returned. |
| assertTrue(executor.awaitTermination(10, SECONDS)); |
| // But once the async function has returned a future we can include that in the toString |
| assertThat(futureResult.toString()) |
| .matches( |
| "CombinedFuture@\\w+\\[status=PENDING," |
| + " setFuture=\\[SettableFuture@\\w+\\[status=PENDING]]]"); |
| |
| // Future complete |
| resultOfCombiner.set(createCombinedResult(getDone(futureInteger), getDone(futureBoolean))); |
| String expectedResult = createCombinedResult(integerPartial, booleanPartial); |
| assertEquals(expectedResult, futureResult.get()); |
| assertThat(futureResult.toString()) |
| .matches("CombinedFuture@\\w+\\[status=SUCCESS, result=\\[java.lang.String@\\w+]]"); |
| } |
| |
| public void testWhenAllComplete_asyncError() throws Exception { |
| final Exception thrown = new RuntimeException("test"); |
| |
| final SettableFuture<Integer> futureInteger = SettableFuture.create(); |
| final SettableFuture<Boolean> futureBoolean = SettableFuture.create(); |
| AsyncCallable<String> combiner = |
| new AsyncCallable<String>() { |
| @Override |
| public ListenableFuture<String> call() throws Exception { |
| assertTrue(futureInteger.isDone()); |
| assertTrue(futureBoolean.isDone()); |
| return immediateFailedFuture(thrown); |
| } |
| }; |
| |
| ListenableFuture<String> futureResult = |
| whenAllComplete(futureInteger, futureBoolean).callAsync(combiner, directExecutor()); |
| Integer integerPartial = 1; |
| futureInteger.set(integerPartial); |
| Boolean booleanPartial = true; |
| futureBoolean.set(booleanPartial); |
| |
| try { |
| getDone(futureResult); |
| fail(); |
| } catch (ExecutionException expected) { |
| assertSame(thrown, expected.getCause()); |
| } |
| } |
| |
| @GwtIncompatible // threads |
| |
| public void testWhenAllComplete_cancelledNotInterrupted() throws Exception { |
| SettableFuture<String> stringFuture = SettableFuture.create(); |
| SettableFuture<Boolean> booleanFuture = SettableFuture.create(); |
| final CountDownLatch inFunction = new CountDownLatch(1); |
| final CountDownLatch shouldCompleteFunction = new CountDownLatch(1); |
| final SettableFuture<String> resultFuture = SettableFuture.create(); |
| AsyncCallable<String> combiner = |
| new AsyncCallable<String>() { |
| @Override |
| public ListenableFuture<String> call() throws Exception { |
| inFunction.countDown(); |
| shouldCompleteFunction.await(); |
| return resultFuture; |
| } |
| }; |
| |
| ListenableFuture<String> futureResult = |
| whenAllComplete(stringFuture, booleanFuture).callAsync(combiner, newSingleThreadExecutor()); |
| |
| stringFuture.set("value"); |
| booleanFuture.set(true); |
| inFunction.await(); |
| futureResult.cancel(false); |
| shouldCompleteFunction.countDown(); |
| try { |
| futureResult.get(); |
| fail(); |
| } catch (CancellationException expected) { |
| } |
| |
| try { |
| resultFuture.get(); |
| fail(); |
| } catch (CancellationException expected) { |
| } |
| } |
| |
| @GwtIncompatible // threads |
| |
| public void testWhenAllComplete_interrupted() throws Exception { |
| SettableFuture<String> stringFuture = SettableFuture.create(); |
| SettableFuture<Boolean> booleanFuture = SettableFuture.create(); |
| final CountDownLatch inFunction = new CountDownLatch(1); |
| final CountDownLatch gotException = new CountDownLatch(1); |
| AsyncCallable<String> combiner = |
| new AsyncCallable<String>() { |
| @Override |
| public ListenableFuture<String> call() throws Exception { |
| inFunction.countDown(); |
| try { |
| new CountDownLatch(1).await(); // wait for interrupt |
| } catch (InterruptedException expected) { |
| gotException.countDown(); |
| throw expected; |
| } |
| return immediateFuture("a"); |
| } |
| }; |
| |
| ListenableFuture<String> futureResult = |
| whenAllComplete(stringFuture, booleanFuture).callAsync(combiner, newSingleThreadExecutor()); |
| |
| stringFuture.set("value"); |
| booleanFuture.set(true); |
| inFunction.await(); |
| futureResult.cancel(true); |
| try { |
| futureResult.get(); |
| fail(); |
| } catch (CancellationException expected) { |
| } |
| gotException.await(); |
| } |
| |
| public void testWhenAllComplete_runnableResult() throws Exception { |
| final SettableFuture<Integer> futureInteger = SettableFuture.create(); |
| final SettableFuture<Boolean> futureBoolean = SettableFuture.create(); |
| final String[] result = new String[1]; |
| Runnable combiner = |
| new Runnable() { |
| @Override |
| public void run() { |
| assertTrue(futureInteger.isDone()); |
| assertTrue(futureBoolean.isDone()); |
| result[0] = |
| createCombinedResult( |
| Futures.getUnchecked(futureInteger), Futures.getUnchecked(futureBoolean)); |
| } |
| }; |
| |
| ListenableFuture<?> futureResult = |
| whenAllComplete(futureInteger, futureBoolean).run(combiner, directExecutor()); |
| Integer integerPartial = 1; |
| futureInteger.set(integerPartial); |
| Boolean booleanPartial = true; |
| futureBoolean.set(booleanPartial); |
| futureResult.get(); |
| assertEquals(createCombinedResult(integerPartial, booleanPartial), result[0]); |
| } |
| |
| public void testWhenAllComplete_runnableError() throws Exception { |
| final RuntimeException thrown = new RuntimeException("test"); |
| |
| final SettableFuture<Integer> futureInteger = SettableFuture.create(); |
| final SettableFuture<Boolean> futureBoolean = SettableFuture.create(); |
| Runnable combiner = |
| new Runnable() { |
| @Override |
| public void run() { |
| assertTrue(futureInteger.isDone()); |
| assertTrue(futureBoolean.isDone()); |
| throw thrown; |
| } |
| }; |
| |
| ListenableFuture<?> futureResult = |
| whenAllComplete(futureInteger, futureBoolean).run(combiner, directExecutor()); |
| Integer integerPartial = 1; |
| futureInteger.set(integerPartial); |
| Boolean booleanPartial = true; |
| futureBoolean.set(booleanPartial); |
| |
| try { |
| getDone(futureResult); |
| fail(); |
| } catch (ExecutionException expected) { |
| assertSame(thrown, expected.getCause()); |
| } |
| } |
| |
| @GwtIncompatible // threads |
| |
| public void testWhenAllCompleteRunnable_resultCanceledWithoutInterrupt_doesNotInterruptRunnable() |
| throws Exception { |
| SettableFuture<String> stringFuture = SettableFuture.create(); |
| SettableFuture<Boolean> booleanFuture = SettableFuture.create(); |
| final CountDownLatch inFunction = new CountDownLatch(1); |
| final CountDownLatch shouldCompleteFunction = new CountDownLatch(1); |
| final CountDownLatch combinerCompletedWithoutInterrupt = new CountDownLatch(1); |
| Runnable combiner = |
| new Runnable() { |
| @Override |
| public void run() { |
| inFunction.countDown(); |
| try { |
| shouldCompleteFunction.await(); |
| combinerCompletedWithoutInterrupt.countDown(); |
| } catch (InterruptedException e) { |
| // Ensure the thread's interrupt status is preserved. |
| Thread.currentThread().interrupt(); |
| throw new RuntimeException(e); |
| } |
| } |
| }; |
| |
| ListenableFuture<?> futureResult = |
| whenAllComplete(stringFuture, booleanFuture).run(combiner, newSingleThreadExecutor()); |
| |
| stringFuture.set("value"); |
| booleanFuture.set(true); |
| inFunction.await(); |
| futureResult.cancel(false); |
| shouldCompleteFunction.countDown(); |
| try { |
| futureResult.get(); |
| fail(); |
| } catch (CancellationException expected) { |
| } |
| combinerCompletedWithoutInterrupt.await(); |
| } |
| |
| @GwtIncompatible // threads |
| |
| public void testWhenAllCompleteRunnable_resultCanceledWithInterrupt_InterruptsRunnable() |
| throws Exception { |
| SettableFuture<String> stringFuture = SettableFuture.create(); |
| SettableFuture<Boolean> booleanFuture = SettableFuture.create(); |
| final CountDownLatch inFunction = new CountDownLatch(1); |
| final CountDownLatch gotException = new CountDownLatch(1); |
| Runnable combiner = |
| new Runnable() { |
| @Override |
| public void run() { |
| inFunction.countDown(); |
| try { |
| new CountDownLatch(1).await(); // wait for interrupt |
| } catch (InterruptedException expected) { |
| // Ensure the thread's interrupt status is preserved. |
| Thread.currentThread().interrupt(); |
| gotException.countDown(); |
| } |
| } |
| }; |
| |
| ListenableFuture<?> futureResult = |
| whenAllComplete(stringFuture, booleanFuture).run(combiner, newSingleThreadExecutor()); |
| |
| stringFuture.set("value"); |
| booleanFuture.set(true); |
| inFunction.await(); |
| futureResult.cancel(true); |
| try { |
| futureResult.get(); |
| fail(); |
| } catch (CancellationException expected) { |
| } |
| gotException.await(); |
| } |
| |
| public void testWhenAllSucceed() throws Exception { |
| class PartialResultException extends Exception {} |
| |
| final SettableFuture<Integer> futureInteger = SettableFuture.create(); |
| final SettableFuture<Boolean> futureBoolean = SettableFuture.create(); |
| AsyncCallable<String> combiner = |
| new AsyncCallable<String>() { |
| @Override |
| public ListenableFuture<String> call() throws Exception { |
| throw new AssertionFailedError("AsyncCallable should not have been called."); |
| } |
| }; |
| |
| ListenableFuture<String> futureResult = |
| whenAllSucceed(futureInteger, futureBoolean).callAsync(combiner, directExecutor()); |
| PartialResultException partialResultException = new PartialResultException(); |
| futureInteger.setException(partialResultException); |
| Boolean booleanPartial = true; |
| futureBoolean.set(booleanPartial); |
| try { |
| getDone(futureResult); |
| fail(); |
| } catch (ExecutionException expected) { |
| assertSame(partialResultException, expected.getCause()); |
| } |
| } |
| |
| @AndroidIncompatible |
| @GwtIncompatible |
| public void testWhenAllSucceed_releasesInputFuturesUponSubmission() throws Exception { |
| SettableFuture<Long> future1 = SettableFuture.create(); |
| SettableFuture<Long> future2 = SettableFuture.create(); |
| WeakReference<SettableFuture<Long>> future1Ref = new WeakReference<>(future1); |
| WeakReference<SettableFuture<Long>> future2Ref = new WeakReference<>(future2); |
| |
| Callable<Long> combiner = |
| new Callable<Long>() { |
| @Override |
| public Long call() { |
| throw new AssertionError(); |
| } |
| }; |
| |
| ListenableFuture<Long> unused = |
| whenAllSucceed(future1, future2).call(combiner, noOpScheduledExecutor()); |
| |
| future1.set(1L); |
| future1 = null; |
| future2.set(2L); |
| future2 = null; |
| |
| /* |
| * Futures should be collected even if combiner never runs. This is kind of a silly test, since |
| * the combiner is almost certain to hold its own reference to the futures, and a real app would |
| * hold a reference to the executor and thus to the combiner. What we really care about is that |
| * the futures are released once the combiner is done running. But we happen to provide this |
| * earlier cleanup at the moment, so we're testing it. |
| */ |
| GcFinalization.awaitClear(future1Ref); |
| GcFinalization.awaitClear(future2Ref); |
| } |
| |
| @AndroidIncompatible |
| @GwtIncompatible |
| public void testWhenAllComplete_releasesInputFuturesUponCancellation() throws Exception { |
| SettableFuture<Long> future = SettableFuture.create(); |
| WeakReference<SettableFuture<Long>> futureRef = new WeakReference<>(future); |
| |
| Callable<Long> combiner = |
| new Callable<Long>() { |
| @Override |
| public Long call() { |
| throw new AssertionError(); |
| } |
| }; |
| |
| ListenableFuture<Long> unused = whenAllComplete(future).call(combiner, noOpScheduledExecutor()); |
| |
| unused.cancel(false); |
| future = null; |
| |
| // Future should be collected because whenAll*Complete* doesn't need to look at its result. |
| GcFinalization.awaitClear(futureRef); |
| } |
| |
| @AndroidIncompatible |
| @GwtIncompatible |
| public void testWhenAllSucceed_releasesCallable() throws Exception { |
| AsyncCallable<Long> combiner = |
| new AsyncCallable<Long>() { |
| @Override |
| public ListenableFuture<Long> call() { |
| return SettableFuture.create(); |
| } |
| }; |
| WeakReference<AsyncCallable<Long>> combinerRef = new WeakReference<>(combiner); |
| |
| ListenableFuture<Long> unused = |
| whenAllSucceed(immediateFuture(1L)).callAsync(combiner, directExecutor()); |
| |
| combiner = null; |
| // combiner should be collected even if the future it returns never completes. |
| GcFinalization.awaitClear(combinerRef); |
| } |
| |
| /* |
| * TODO(cpovirk): maybe pass around TestFuture instances instead of |
| * ListenableFuture instances |
| */ |
| |
| /** |
| * A future in {@link TestFutureBatch} that also has a name for debugging purposes and a {@code |
| * finisher}, a task that will complete the future in some fashion when it is called, allowing for |
| * testing both before and after the completion of the future. |
| */ |
| @GwtIncompatible // used only in GwtIncompatible tests |
| private static final class TestFuture { |
| |
| final ListenableFuture<String> future; |
| final String name; |
| final Runnable finisher; |
| |
| TestFuture(ListenableFuture<String> future, String name, Runnable finisher) { |
| this.future = future; |
| this.name = name; |
| this.finisher = finisher; |
| } |
| } |
| |
| /** |
| * A collection of several futures, covering cancellation, success, and failure (both {@link |
| * ExecutionException} and {@link RuntimeException}), both immediate and delayed. We use each |
| * possible pair of these futures in {@link FuturesTest#runExtensiveMergerTest}. |
| * |
| * <p>Each test requires a new {@link TestFutureBatch} because we need new delayed futures each |
| * time, as the old delayed futures were completed as part of the old test. |
| */ |
| @GwtIncompatible // used only in GwtIncompatible tests |
| private static final class TestFutureBatch { |
| |
| final ListenableFuture<String> doneSuccess = immediateFuture("a"); |
| final ListenableFuture<String> doneFailed = immediateFailedFuture(new Exception()); |
| final SettableFuture<String> doneCancelled = SettableFuture.create(); |
| |
| { |
| doneCancelled.cancel(true); |
| } |
| |
| final ListenableFuture<String> doneRuntimeException = |
| new ForwardingListenableFuture<String>() { |
| final ListenableFuture<String> delegate = immediateFuture("Should never be seen"); |
| |
| @Override |
| protected ListenableFuture<String> delegate() { |
| return delegate; |
| } |
| |
| @Override |
| public String get() { |
| throw new RuntimeException(); |
| } |
| |
| @Override |
| public String get(long timeout, TimeUnit unit) { |
| throw new RuntimeException(); |
| } |
| }; |
| |
| final SettableFuture<String> delayedSuccess = SettableFuture.create(); |
| final SettableFuture<String> delayedFailed = SettableFuture.create(); |
| final SettableFuture<String> delayedCancelled = SettableFuture.create(); |
| |
| final SettableFuture<String> delegateForDelayedRuntimeException = SettableFuture.create(); |
| final ListenableFuture<String> delayedRuntimeException = |
| new ForwardingListenableFuture<String>() { |
| @Override |
| protected ListenableFuture<String> delegate() { |
| return delegateForDelayedRuntimeException; |
| } |
| |
| @Override |
| public String get() throws ExecutionException, InterruptedException { |
| delegateForDelayedRuntimeException.get(); |
| throw new RuntimeException(); |
| } |
| |
| @Override |
| public String get(long timeout, TimeUnit unit) |
| throws ExecutionException, InterruptedException, TimeoutException { |
| delegateForDelayedRuntimeException.get(timeout, unit); |
| throw new RuntimeException(); |
| } |
| }; |
| |
| final Runnable doNothing = |
| new Runnable() { |
| @Override |
| public void run() {} |
| }; |
| final Runnable finishSuccess = |
| new Runnable() { |
| @Override |
| public void run() { |
| delayedSuccess.set("b"); |
| } |
| }; |
| final Runnable finishFailure = |
| new Runnable() { |
| @Override |
| public void run() { |
| delayedFailed.setException(new Exception()); |
| } |
| }; |
| final Runnable finishCancelled = |
| new Runnable() { |
| @Override |
| public void run() { |
| delayedCancelled.cancel(true); |
| } |
| }; |
| final Runnable finishRuntimeException = |
| new Runnable() { |
| @Override |
| public void run() { |
| delegateForDelayedRuntimeException.set("Should never be seen"); |
| } |
| }; |
| |
| /** All the futures, together with human-readable names for use by {@link #smartToString}. */ |
| final ImmutableList<TestFuture> allFutures = |
| ImmutableList.of( |
| new TestFuture(doneSuccess, "doneSuccess", doNothing), |
| new TestFuture(doneFailed, "doneFailed", doNothing), |
| new TestFuture(doneCancelled, "doneCancelled", doNothing), |
| new TestFuture(doneRuntimeException, "doneRuntimeException", doNothing), |
| new TestFuture(delayedSuccess, "delayedSuccess", finishSuccess), |
| new TestFuture(delayedFailed, "delayedFailed", finishFailure), |
| new TestFuture(delayedCancelled, "delayedCancelled", finishCancelled), |
| new TestFuture( |
| delayedRuntimeException, "delayedRuntimeException", finishRuntimeException)); |
| |
| final Function<ListenableFuture<String>, String> nameGetter = |
| new Function<ListenableFuture<String>, String>() { |
| @Override |
| public String apply(ListenableFuture<String> input) { |
| for (TestFuture future : allFutures) { |
| if (future.future == input) { |
| return future.name; |
| } |
| } |
| throw new IllegalArgumentException(input.toString()); |
| } |
| }; |
| |
| static boolean intersect(Set<?> a, Set<?> b) { |
| return !intersection(a, b).isEmpty(); |
| } |
| |
| /** |
| * Like {@code inputs.toString()}, but with the nonsense {@code toString} representations |
| * replaced with the name of each future from {@link #allFutures}. |
| */ |
| String smartToString(ImmutableSet<ListenableFuture<String>> inputs) { |
| Iterable<String> inputNames = Iterables.transform(inputs, nameGetter); |
| return Joiner.on(", ").join(inputNames); |
| } |
| |
| void smartAssertTrue( |
| ImmutableSet<ListenableFuture<String>> inputs, Exception cause, boolean expression) { |
| if (!expression) { |
| throw failureWithCause(cause, smartToString(inputs)); |
| } |
| } |
| |
| boolean hasDelayed(ListenableFuture<String> a, ListenableFuture<String> b) { |
| ImmutableSet<ListenableFuture<String>> inputs = ImmutableSet.of(a, b); |
| return intersect( |
| inputs, |
| ImmutableSet.of( |
| delayedSuccess, delayedFailed, delayedCancelled, delayedRuntimeException)); |
| } |
| |
| void assertHasDelayed(ListenableFuture<String> a, ListenableFuture<String> b, Exception e) { |
| ImmutableSet<ListenableFuture<String>> inputs = ImmutableSet.of(a, b); |
| smartAssertTrue(inputs, e, hasDelayed(a, b)); |
| } |
| |
| void assertHasFailure(ListenableFuture<String> a, ListenableFuture<String> b, Exception e) { |
| ImmutableSet<ListenableFuture<String>> inputs = ImmutableSet.of(a, b); |
| smartAssertTrue( |
| inputs, |
| e, |
| intersect( |
| inputs, |
| ImmutableSet.of( |
| doneFailed, doneRuntimeException, delayedFailed, delayedRuntimeException))); |
| } |
| |
| void assertHasCancel(ListenableFuture<String> a, ListenableFuture<String> b, Exception e) { |
| ImmutableSet<ListenableFuture<String>> inputs = ImmutableSet.of(a, b); |
| smartAssertTrue( |
| inputs, e, intersect(inputs, ImmutableSet.of(doneCancelled, delayedCancelled))); |
| } |
| |
| void assertHasImmediateFailure( |
| ListenableFuture<String> a, ListenableFuture<String> b, Exception e) { |
| ImmutableSet<ListenableFuture<String>> inputs = ImmutableSet.of(a, b); |
| smartAssertTrue( |
| inputs, e, intersect(inputs, ImmutableSet.of(doneFailed, doneRuntimeException))); |
| } |
| |
| void assertHasImmediateCancel( |
| ListenableFuture<String> a, ListenableFuture<String> b, Exception e) { |
| ImmutableSet<ListenableFuture<String>> inputs = ImmutableSet.of(a, b); |
| smartAssertTrue(inputs, e, intersect(inputs, ImmutableSet.of(doneCancelled))); |
| } |
| } |
| |
| /** |
| * {@link Futures#allAsList(Iterable)} or {@link Futures#successfulAsList(Iterable)}, hidden |
| * behind a common interface for testing. |
| */ |
| @GwtIncompatible // used only in GwtIncompatible tests |
| private interface Merger { |
| |
| ListenableFuture<List<String>> merged(ListenableFuture<String> a, ListenableFuture<String> b); |
| |
| Merger allMerger = |
| new Merger() { |
| @Override |
| public ListenableFuture<List<String>> merged( |
| ListenableFuture<String> a, ListenableFuture<String> b) { |
| return allAsList(ImmutableSet.of(a, b)); |
| } |
| }; |
| Merger successMerger = |
| new Merger() { |
| @Override |
| public ListenableFuture<List<String>> merged( |
| ListenableFuture<String> a, ListenableFuture<String> b) { |
| return successfulAsList(ImmutableSet.of(a, b)); |
| } |
| }; |
| } |
| |
| /** |
| * Very rough equivalent of a timed get, produced by calling the no-arg get method in another |
| * thread and waiting a short time for it. |
| * |
| * <p>We need this to test the behavior of no-arg get methods without hanging the main test thread |
| * forever in the case of failure. |
| */ |
| @CanIgnoreReturnValue |
| @GwtIncompatible // threads |
| static <V> V pseudoTimedGetUninterruptibly(final Future<V> input, long timeout, TimeUnit unit) |
| throws ExecutionException, TimeoutException { |
| ExecutorService executor = newSingleThreadExecutor(); |
| Future<V> waiter = |
| executor.submit( |
| new Callable<V>() { |
| @Override |
| public V call() throws Exception { |
| return input.get(); |
| } |
| }); |
| |
| try { |
| return getUninterruptibly(waiter, timeout, unit); |
| } catch (ExecutionException e) { |
| propagateIfInstanceOf(e.getCause(), ExecutionException.class); |
| propagateIfInstanceOf(e.getCause(), CancellationException.class); |
| throw failureWithCause(e, "Unexpected exception"); |
| } finally { |
| executor.shutdownNow(); |
| // TODO(cpovirk): assertTrue(awaitTerminationUninterruptibly(executor, 10, SECONDS)); |
| } |
| } |
| |
| /** |
| * For each possible pair of futures from {@link TestFutureBatch}, for each possible completion |
| * order of those futures, test that various get calls (timed before future completion, untimed |
| * before future completion, and untimed after future completion) return or throw the proper |
| * values. |
| */ |
| @GwtIncompatible // used only in GwtIncompatible tests |
| private static void runExtensiveMergerTest(Merger merger) throws InterruptedException { |
| int inputCount = new TestFutureBatch().allFutures.size(); |
| |
| for (int i = 0; i < inputCount; i++) { |
| for (int j = 0; j < inputCount; j++) { |
| for (boolean iBeforeJ : new boolean[] {true, false}) { |
| TestFutureBatch inputs = new TestFutureBatch(); |
| ListenableFuture<String> iFuture = inputs.allFutures.get(i).future; |
| ListenableFuture<String> jFuture = inputs.allFutures.get(j).future; |
| ListenableFuture<List<String>> future = merger.merged(iFuture, jFuture); |
| |
| // Test timed get before we've completed any delayed futures. |
| try { |
| List<String> result = future.get(0, MILLISECONDS); |
| assertTrue("Got " + result, asList("a", null).containsAll(result)); |
| } catch (CancellationException e) { |
| assertTrue(merger == Merger.allMerger); |
| inputs.assertHasImmediateCancel(iFuture, jFuture, e); |
| } catch (ExecutionException e) { |
| assertTrue(merger == Merger.allMerger); |
| inputs.assertHasImmediateFailure(iFuture, jFuture, e); |
| } catch (TimeoutException e) { |
| inputs.assertHasDelayed(iFuture, jFuture, e); |
| } |
| |
| // Same tests with pseudoTimedGet. |
| try { |
| List<String> result = |
| conditionalPseudoTimedGetUninterruptibly( |
| inputs, iFuture, jFuture, future, 20, MILLISECONDS); |
| assertTrue("Got " + result, asList("a", null).containsAll(result)); |
| } catch (CancellationException e) { |
| assertTrue(merger == Merger.allMerger); |
| inputs.assertHasImmediateCancel(iFuture, jFuture, e); |
| } catch (ExecutionException e) { |
| assertTrue(merger == Merger.allMerger); |
| inputs.assertHasImmediateFailure(iFuture, jFuture, e); |
| } catch (TimeoutException e) { |
| inputs.assertHasDelayed(iFuture, jFuture, e); |
| } |
| |
| // Finish the two futures in the currently specified order: |
| inputs.allFutures.get(iBeforeJ ? i : j).finisher.run(); |
| inputs.allFutures.get(iBeforeJ ? j : i).finisher.run(); |
| |
| // Test untimed get now that we've completed any delayed futures. |
| try { |
| List<String> result = getDone(future); |
| assertTrue("Got " + result, asList("a", "b", null).containsAll(result)); |
| } catch (CancellationException e) { |
| assertTrue(merger == Merger.allMerger); |
| inputs.assertHasCancel(iFuture, jFuture, e); |
| } catch (ExecutionException e) { |
| assertTrue(merger == Merger.allMerger); |
| inputs.assertHasFailure(iFuture, jFuture, e); |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * Call the non-timed {@link Future#get()} in a way that allows us to abort if it's expected to |
| * hang forever. More precisely, if it's expected to return, we simply call it[*], but if it's |
| * expected to hang (because one of the input futures that we know makes it up isn't done yet), |
| * then we call it in a separate thread (using pseudoTimedGet). The result is that we wait as long |
| * as necessary when the method is expected to return (at the cost of hanging forever if there is |
| * a bug in the class under test) but that we time out fairly promptly when the method is expected |
| * to hang (possibly too quickly, but too-quick failures should be very unlikely, given that we |
| * used to bail after 20ms during the expected-successful tests, and there we saw a failure rate |
| * of ~1/5000, meaning that the other thread's get() call nearly always completes within 20ms if |
| * it's going to complete at all). |
| * |
| * <p>[*] To avoid hangs, I've disabled the in-thread calls. This makes the test take (very |
| * roughly) 2.5s longer. (2.5s is also the maximum length of time we will wait for a timed get |
| * that is expected to succeed; the fact that the numbers match is only a coincidence.) See the |
| * comment below for how to restore the fast but hang-y version. |
| */ |
| @GwtIncompatible // used only in GwtIncompatible tests |
| private static List<String> conditionalPseudoTimedGetUninterruptibly( |
| TestFutureBatch inputs, |
| ListenableFuture<String> iFuture, |
| ListenableFuture<String> jFuture, |
| ListenableFuture<List<String>> future, |
| int timeout, |
| TimeUnit unit) |
| throws ExecutionException, TimeoutException { |
| /* |
| * For faster tests (that may hang indefinitely if the class under test has |
| * a bug!), switch the second branch to call untimed future.get() instead of |
| * pseudoTimedGet. |
| */ |
| return (inputs.hasDelayed(iFuture, jFuture)) |
| ? pseudoTimedGetUninterruptibly(future, timeout, unit) |
| : pseudoTimedGetUninterruptibly(future, 2500, MILLISECONDS); |
| } |
| |
| @GwtIncompatible // threads |
| public void testAllAsList_extensive() throws InterruptedException { |
| runExtensiveMergerTest(Merger.allMerger); |
| } |
| |
| @GwtIncompatible // threads |
| public void testSuccessfulAsList_extensive() throws InterruptedException { |
| runExtensiveMergerTest(Merger.successMerger); |
| } |
| |
| public void testSuccessfulAsList() throws Exception { |
| // Create input and output |
| SettableFuture<String> future1 = SettableFuture.create(); |
| SettableFuture<String> future2 = SettableFuture.create(); |
| SettableFuture<String> future3 = SettableFuture.create(); |
| @SuppressWarnings("unchecked") // array is never modified |
| ListenableFuture<List<String>> compound = successfulAsList(future1, future2, future3); |
| |
| // Attach a listener |
| SingleCallListener listener = new SingleCallListener(); |
| compound.addListener(listener, directExecutor()); |
| |
| // Satisfy each input and check the output |
| assertFalse(compound.isDone()); |
| future1.set(DATA1); |
| assertFalse(compound.isDone()); |
| future2.set(DATA2); |
| assertFalse(compound.isDone()); |
| listener.expectCall(); |
| future3.set(DATA3); |
| assertTrue(listener.wasCalled()); |
| |
| List<String> results = getDone(compound); |
| assertThat(results).containsExactly(DATA1, DATA2, DATA3).inOrder(); |
| } |
| |
| public void testSuccessfulAsList_emptyList() throws Exception { |
| SingleCallListener listener = new SingleCallListener(); |
| listener.expectCall(); |
| List<ListenableFuture<String>> futures = ImmutableList.of(); |
| ListenableFuture<List<String>> compound = successfulAsList(futures); |
| compound.addListener(listener, directExecutor()); |
| assertThat(getDone(compound)).isEmpty(); |
| assertTrue(listener.wasCalled()); |
| } |
| |
| public void testSuccessfulAsList_emptyArray() throws Exception { |
| SingleCallListener listener = new SingleCallListener(); |
| listener.expectCall(); |
| @SuppressWarnings("unchecked") // array is never modified |
| ListenableFuture<List<String>> compound = successfulAsList(); |
| compound.addListener(listener, directExecutor()); |
| assertThat(getDone(compound)).isEmpty(); |
| assertTrue(listener.wasCalled()); |
| } |
| |
| public void testSuccessfulAsList_partialFailure() throws Exception { |
| SingleCallListener listener = new SingleCallListener(); |
| SettableFuture<String> future1 = SettableFuture.create(); |
| SettableFuture<String> future2 = SettableFuture.create(); |
| @SuppressWarnings("unchecked") // array is never modified |
| ListenableFuture<List<String>> compound = successfulAsList(future1, future2); |
| compound.addListener(listener, directExecutor()); |
| |
| assertFalse(compound.isDone()); |
| future1.setException(new Throwable("failed1")); |
| assertFalse(compound.isDone()); |
| listener.expectCall(); |
| future2.set(DATA2); |
| assertTrue(listener.wasCalled()); |
| |
| List<String> results = getDone(compound); |
| assertThat(results).containsExactly(null, DATA2).inOrder(); |
| } |
| |
| public void testSuccessfulAsList_totalFailure() throws Exception { |
| SingleCallListener listener = new SingleCallListener(); |
| SettableFuture<String> future1 = SettableFuture.create(); |
| SettableFuture<String> future2 = SettableFuture.create(); |
| @SuppressWarnings("unchecked") // array is never modified |
| ListenableFuture<List<String>> compound = successfulAsList(future1, future2); |
| compound.addListener(listener, directExecutor()); |
| |
| assertFalse(compound.isDone()); |
| future1.setException(new Throwable("failed1")); |
| assertFalse(compound.isDone()); |
| listener.expectCall(); |
| future2.setException(new Throwable("failed2")); |
| assertTrue(listener.wasCalled()); |
| |
| List<String> results = getDone(compound); |
| assertThat(results).containsExactly(null, null).inOrder(); |
| } |
| |
| public void testSuccessfulAsList_cancelled() throws Exception { |
| SingleCallListener listener = new SingleCallListener(); |
| SettableFuture<String> future1 = SettableFuture.create(); |
| SettableFuture<String> future2 = SettableFuture.create(); |
| @SuppressWarnings("unchecked") // array is never modified |
| ListenableFuture<List<String>> compound = successfulAsList(future1, future2); |
| compound.addListener(listener, directExecutor()); |
| |
| assertFalse(compound.isDone()); |
| future1.cancel(true); |
| assertFalse(compound.isDone()); |
| listener.expectCall(); |
| future2.set(DATA2); |
| assertTrue(listener.wasCalled()); |
| |
| List<String> results = getDone(compound); |
| assertThat(results).containsExactly(null, DATA2).inOrder(); |
| } |
| |
| public void testSuccessfulAsList_resultCancelled() throws Exception { |
| SettableFuture<String> future1 = SettableFuture.create(); |
| SettableFuture<String> future2 = SettableFuture.create(); |
| @SuppressWarnings("unchecked") // array is never modified |
| ListenableFuture<List<String>> compound = successfulAsList(future1, future2); |
| |
| future2.set(DATA2); |
| assertFalse(compound.isDone()); |
| assertTrue(compound.cancel(false)); |
| assertTrue(compound.isCancelled()); |
| assertTrue(future1.isCancelled()); |
| assertFalse(future1.wasInterrupted()); |
| } |
| |
| public void testSuccessfulAsList_resultCancelledRacingInputDone() throws Exception { |
| TestLogHandler listenerLoggerHandler = new TestLogHandler(); |
| Logger exceptionLogger = Logger.getLogger(AbstractFuture.class.getName()); |
| exceptionLogger.addHandler(listenerLoggerHandler); |
| try { |
| doTestSuccessfulAsList_resultCancelledRacingInputDone(); |
| |
| assertWithMessage("Nothing should be logged") |
| .that(listenerLoggerHandler.getStoredLogRecords()) |
| .isEmpty(); |
| } finally { |
| exceptionLogger.removeHandler(listenerLoggerHandler); |
| } |
| } |
| |
| private static void doTestSuccessfulAsList_resultCancelledRacingInputDone() throws Exception { |
| // Simple (combined.cancel -> input.cancel -> setOneValue): |
| successfulAsList(ImmutableList.of(SettableFuture.create())).cancel(true); |
| |
| /* |
| * Complex (combined.cancel -> input.cancel -> other.set -> setOneValue), |
| * to show that this isn't just about problems with the input future we just |
| * cancelled: |
| */ |
| final SettableFuture<String> future1 = SettableFuture.create(); |
| final SettableFuture<String> future2 = SettableFuture.create(); |
| @SuppressWarnings("unchecked") // array is never modified |
| ListenableFuture<List<String>> compound = successfulAsList(future1, future2); |
| |
| future1.addListener( |
| new Runnable() { |
| @Override |
| public void run() { |
| assertTrue(future1.isCancelled()); |
| /* |
| * This test relies on behavior that's unspecified but currently |
| * guaranteed by the implementation: Cancellation of inputs is |
| * performed in the order they were provided to the constructor. Verify |
| * that as a sanity check: |
| */ |
| assertFalse(future2.isCancelled()); |
| // Now attempt to trigger the exception: |
| future2.set(DATA2); |
| } |
| }, |
| directExecutor()); |
| assertTrue(compound.cancel(false)); |
| assertTrue(compound.isCancelled()); |
| assertTrue(future1.isCancelled()); |
| assertFalse(future2.isCancelled()); |
| |
| try { |
| getDone(compound); |
| fail(); |
| } catch (CancellationException expected) { |
| } |
| } |
| |
| public void testSuccessfulAsList_resultInterrupted() throws Exception { |
| SettableFuture<String> future1 = SettableFuture.create(); |
| SettableFuture<String> future2 = SettableFuture.create(); |
| @SuppressWarnings("unchecked") // array is never modified |
| ListenableFuture<List<String>> compound = successfulAsList(future1, future2); |
| |
| future2.set(DATA2); |
| assertFalse(compound.isDone()); |
| assertTrue(compound.cancel(true)); |
| assertTrue(compound.isCancelled()); |
| assertTrue(future1.isCancelled()); |
| assertTrue(future1.wasInterrupted()); |
| } |
| |
| public void testSuccessfulAsList_mixed() throws Exception { |
| SingleCallListener listener = new SingleCallListener(); |
| SettableFuture<String> future1 = SettableFuture.create(); |
| SettableFuture<String> future2 = SettableFuture.create(); |
| SettableFuture<String> future3 = SettableFuture.create(); |
| @SuppressWarnings("unchecked") // array is never modified |
| ListenableFuture<List<String>> compound = successfulAsList(future1, future2, future3); |
| compound.addListener(listener, directExecutor()); |
| |
| // First is cancelled, second fails, third succeeds |
| assertFalse(compound.isDone()); |
| future1.cancel(true); |
| assertFalse(compound.isDone()); |
| future2.setException(new Throwable("failed2")); |
| assertFalse(compound.isDone()); |
| listener.expectCall(); |
| future3.set(DATA3); |
| assertTrue(listener.wasCalled()); |
| |
| List<String> results = getDone(compound); |
| assertThat(results).containsExactly(null, null, DATA3).inOrder(); |
| } |
| |
| /** Non-Error exceptions are never logged. */ |
| @SuppressWarnings("unchecked") |
| public void testSuccessfulAsList_logging_exception() throws Exception { |
| assertEquals( |
| newArrayList((Object) null), |
| getDone(successfulAsList(immediateFailedFuture(new MyException())))); |
| assertWithMessage("Nothing should be logged") |
| .that(aggregateFutureLogHandler.getStoredLogRecords()) |
| .isEmpty(); |
| |
| // Not even if there are a bunch of failures. |
| assertEquals( |
| newArrayList(null, null, null), |
| getDone( |
| successfulAsList( |
| immediateFailedFuture(new MyException()), |
| immediateFailedFuture(new MyException()), |
| immediateFailedFuture(new MyException())))); |
| assertWithMessage("Nothing should be logged") |
| .that(aggregateFutureLogHandler.getStoredLogRecords()) |
| .isEmpty(); |
| } |
| |
| /** Ensure that errors are always logged. */ |
| @SuppressWarnings("unchecked") |
| public void testSuccessfulAsList_logging_error() throws Exception { |
| assertEquals( |
| newArrayList((Object) null), |
| getDone(successfulAsList(immediateFailedFuture(new MyError())))); |
| List<LogRecord> logged = aggregateFutureLogHandler.getStoredLogRecords(); |
| assertThat(logged).hasSize(1); // errors are always logged |
| assertThat(logged.get(0).getThrown()).isInstanceOf(MyError.class); |
| } |
| |
| public void testSuccessfulAsList_failureLoggedEvenAfterOutputCancelled() throws Exception { |
| ListenableFuture<String> input = new CancelPanickingFuture<>(); |
| ListenableFuture<List<String>> output = successfulAsList(input); |
| output.cancel(false); |
| |
| List<LogRecord> logged = aggregateFutureLogHandler.getStoredLogRecords(); |
| assertThat(logged).hasSize(1); |
| assertThat(logged.get(0).getThrown()).hasMessageThat().isEqualTo("You can't fire me, I quit."); |
| } |
| |
| private static final class CancelPanickingFuture<V> extends AbstractFuture<V> { |
| @Override |
| public boolean cancel(boolean mayInterruptIfRunning) { |
| setException(new Error("You can't fire me, I quit.")); |
| return false; |
| } |
| } |
| |
| public void testNonCancellationPropagating_successful() throws Exception { |
| SettableFuture<Foo> input = SettableFuture.create(); |
| ListenableFuture<Foo> wrapper = nonCancellationPropagating(input); |
| Foo foo = new Foo(); |
| |
| assertFalse(wrapper.isDone()); |
| input.set(foo); |
| assertTrue(wrapper.isDone()); |
| assertSame(foo, getDone(wrapper)); |
| } |
| |
| public void testNonCancellationPropagating_failure() throws Exception { |
| SettableFuture<Foo> input = SettableFuture.create(); |
| ListenableFuture<Foo> wrapper = nonCancellationPropagating(input); |
| Throwable failure = new Throwable("thrown"); |
| |
| assertFalse(wrapper.isDone()); |
| input.setException(failure); |
| try { |
| getDone(wrapper); |
| fail(); |
| } catch (ExecutionException expected) { |
| assertSame(failure, expected.getCause()); |
| } |
| } |
| |
| public void testNonCancellationPropagating_delegateCancelled() throws Exception { |
| SettableFuture<Foo> input = SettableFuture.create(); |
| ListenableFuture<Foo> wrapper = nonCancellationPropagating(input); |
| |
| assertFalse(wrapper.isDone()); |
| assertTrue(input.cancel(false)); |
| assertTrue(wrapper.isCancelled()); |
| } |
| |
| public void testNonCancellationPropagating_doesNotPropagate() throws Exception { |
| SettableFuture<Foo> input = SettableFuture.create(); |
| ListenableFuture<Foo> wrapper = nonCancellationPropagating(input); |
| |
| assertTrue(wrapper.cancel(true)); |
| assertTrue(wrapper.isCancelled()); |
| assertTrue(wrapper.isDone()); |
| assertFalse(input.isCancelled()); |
| assertFalse(input.isDone()); |
| } |
| |
| @GwtIncompatible // used only in GwtIncompatible tests |
| private static class TestException extends Exception { |
| |
| TestException(@Nullable Throwable cause) { |
| super(cause); |
| } |
| } |
| |
| @GwtIncompatible // used only in GwtIncompatible tests |
| private interface MapperFunction extends Function<Throwable, Exception> {} |
| |
| public void testCompletionOrder() throws Exception { |
| SettableFuture<Long> future1 = SettableFuture.create(); |
| SettableFuture<Long> future2 = SettableFuture.create(); |
| SettableFuture<Long> future3 = SettableFuture.create(); |
| SettableFuture<Long> future4 = SettableFuture.create(); |
| SettableFuture<Long> future5 = SettableFuture.create(); |
| |
| ImmutableList<ListenableFuture<Long>> futures = |
| inCompletionOrder( |
| ImmutableList.<ListenableFuture<Long>>of(future1, future2, future3, future4, future5)); |
| future2.set(1L); |
| future5.set(2L); |
| future1.set(3L); |
| future3.set(4L); |
| future4.set(5L); |
| |
| long expectedResult = 1L; |
| for (ListenableFuture<Long> future : futures) { |
| assertEquals((Long) expectedResult, getDone(future)); |
| expectedResult++; |
| } |
| } |
| |
| public void testCompletionOrderExceptionThrown() throws Exception { |
| SettableFuture<Long> future1 = SettableFuture.create(); |
| SettableFuture<Long> future2 = SettableFuture.create(); |
| SettableFuture<Long> future3 = SettableFuture.create(); |
| SettableFuture<Long> future4 = SettableFuture.create(); |
| SettableFuture<Long> future5 = SettableFuture.create(); |
| |
| ImmutableList<ListenableFuture<Long>> futures = |
| inCompletionOrder( |
| ImmutableList.<ListenableFuture<Long>>of(future1, future2, future3, future4, future5)); |
| future2.set(1L); |
| future5.setException(new IllegalStateException("2L")); |
| future1.set(3L); |
| future3.set(4L); |
| future4.set(5L); |
| |
| long expectedResult = 1L; |
| for (ListenableFuture<Long> future : futures) { |
| if (expectedResult != 2) { |
| assertEquals((Long) expectedResult, getDone(future)); |
| } else { |
| try { |
| getDone(future); |
| fail(); |
| } catch (ExecutionException expected) { |
| assertThat(expected).hasCauseThat().hasMessageThat().isEqualTo("2L"); |
| } |
| } |
| expectedResult++; |
| } |
| } |
| |
| public void testCompletionOrderFutureCancelled() throws Exception { |
| SettableFuture<Long> future1 = SettableFuture.create(); |
| SettableFuture<Long> future2 = SettableFuture.create(); |
| SettableFuture<Long> future3 = SettableFuture.create(); |
| SettableFuture<Long> future4 = SettableFuture.create(); |
| SettableFuture<Long> future5 = SettableFuture.create(); |
| |
| ImmutableList<ListenableFuture<Long>> futures = |
| inCompletionOrder( |
| ImmutableList.<ListenableFuture<Long>>of(future1, future2, future3, future4, future5)); |
| future2.set(1L); |
| future5.set(2L); |
| future1.set(3L); |
| future3.cancel(true); |
| future4.set(5L); |
| |
| long expectedResult = 1L; |
| for (ListenableFuture<Long> future : futures) { |
| if (expectedResult != 4) { |
| assertEquals((Long) expectedResult, getDone(future)); |
| } else { |
| try { |
| getDone(future); |
| fail(); |
| } catch (CancellationException expected) { |
| } |
| } |
| expectedResult++; |
| } |
| } |
| |
| public void testCompletionOrderFutureInterruption() throws Exception { |
| SettableFuture<Long> future1 = SettableFuture.create(); |
| SettableFuture<Long> future2 = SettableFuture.create(); |
| SettableFuture<Long> future3 = SettableFuture.create(); |
| |
| ImmutableList<ListenableFuture<Long>> futures = |
| inCompletionOrder(ImmutableList.<ListenableFuture<Long>>of(future1, future2, future3)); |
| future2.set(1L); |
| |
| futures.get(1).cancel(true); |
| futures.get(2).cancel(false); |
| |
| assertTrue(future1.isCancelled()); |
| assertFalse(future1.wasInterrupted()); |
| assertTrue(future3.isCancelled()); |
| assertFalse(future3.wasInterrupted()); |
| } |
| |
| public void testCancellingADelegatePropagates() throws Exception { |
| SettableFuture<Long> future1 = SettableFuture.create(); |
| SettableFuture<Long> future2 = SettableFuture.create(); |
| SettableFuture<Long> future3 = SettableFuture.create(); |
| |
| ImmutableList<ListenableFuture<Long>> delegates = |
| inCompletionOrder(ImmutableList.<ListenableFuture<Long>>of(future1, future2, future3)); |
| |
| future1.set(1L); |
| // Cannot cancel a complete delegate |
| assertFalse(delegates.get(0).cancel(true)); |
| // Cancel the delegate before the input future is done |
| assertTrue(delegates.get(1).cancel(true)); |
| // Setting the future still works since cancellation didn't propagate |
| assertTrue(future2.set(2L)); |
| // Second check to ensure the input future was not cancelled |
| assertEquals((Long) 2L, getDone(future2)); |
| |
| // All futures are now complete; outstanding inputs are cancelled |
| assertTrue(future3.isCancelled()); |
| assertTrue(future3.wasInterrupted()); |
| } |
| |
| @AndroidIncompatible // runs out of memory under some versions of the emulator |
| public void testCancellingAllDelegatesIsNotQuadratic() throws Exception { |
| ImmutableList.Builder<SettableFuture<Long>> builder = ImmutableList.builder(); |
| for (int i = 0; i < 500_000; i++) { |
| builder.add(SettableFuture.<Long>create()); |
| } |
| ImmutableList<SettableFuture<Long>> inputs = builder.build(); |
| ImmutableList<ListenableFuture<Long>> delegates = inCompletionOrder(inputs); |
| |
| for (ListenableFuture<?> delegate : delegates) { |
| delegate.cancel(true); |
| } |
| |
| for (ListenableFuture<?> input : inputs) { |
| assertTrue(input.isDone()); |
| } |
| } |
| |
| @AndroidIncompatible // reference is never cleared under some versions of the emulator |
| @GwtIncompatible |
| public void testInputGCedIfUnreferenced() throws Exception { |
| SettableFuture<Long> future1 = SettableFuture.create(); |
| SettableFuture<Long> future2 = SettableFuture.create(); |
| WeakReference<SettableFuture<Long>> future1Ref = new WeakReference<>(future1); |
| WeakReference<SettableFuture<Long>> future2Ref = new WeakReference<>(future2); |
| |
| ImmutableList<ListenableFuture<Long>> delegates = |
| inCompletionOrder(ImmutableList.<ListenableFuture<Long>>of(future1, future2)); |
| |
| future1.set(1L); |
| |
| future1 = null; |
| // First future is complete, should be unreferenced |
| GcFinalization.awaitClear(future1Ref); |
| ListenableFuture<Long> outputFuture1 = delegates.get(0); |
| delegates = null; |
| future2 = null; |
| // No references to list or other output future, second future should be unreferenced |
| GcFinalization.awaitClear(future2Ref); |
| outputFuture1.get(); |
| } |
| |
| // Mostly an example of how it would look like to use a list of mixed types |
| public void testCompletionOrderMixedBagOTypes() throws Exception { |
| SettableFuture<Long> future1 = SettableFuture.create(); |
| SettableFuture<String> future2 = SettableFuture.create(); |
| SettableFuture<Integer> future3 = SettableFuture.create(); |
| |
| ImmutableList<? extends ListenableFuture<?>> inputs = |
| ImmutableList.<ListenableFuture<?>>of(future1, future2, future3); |
| ImmutableList<ListenableFuture<Object>> futures = inCompletionOrder(inputs); |
| future2.set("1L"); |
| future1.set(2L); |
| future3.set(3); |
| |
| ImmutableList<?> expected = ImmutableList.of("1L", 2L, 3); |
| for (int i = 0; i < expected.size(); i++) { |
| assertEquals(expected.get(i), getDone(futures.get(i))); |
| } |
| } |
| |
| @GwtIncompatible // ClassSanityTester |
| public void testFutures_nullChecks() throws Exception { |
| new ClassSanityTester() |
| .forAllPublicStaticMethods(Futures.class) |
| .thatReturn(Future.class) |
| .testNulls(); |
| } |
| |
| static AssertionFailedError failureWithCause(Throwable cause, String message) { |
| AssertionFailedError failure = new AssertionFailedError(message); |
| failure.initCause(cause); |
| return failure; |
| } |
| |
| // This test covers a bug where an Error thrown from a callback could cause the TimeoutFuture to |
| // never complete when timing out. Notably, nothing would get logged since the Error would get |
| // stuck in the ScheduledFuture inside of TimeoutFuture and nothing ever calls get on it. |
| |
| // Simulate a timeout that fires before the call the SES.schedule returns but the future is |
| // already completed. |
| |
| // This test covers a bug where an Error thrown from a callback could cause the TimeoutFuture to |
| // never complete when timing out. Notably, nothing would get logged since the Error would get |
| // stuck in the ScheduledFuture inside of TimeoutFuture and nothing ever calls get on it. |
| |
| private static final Executor REJECTING_EXECUTOR = |
| new Executor() { |
| @Override |
| public void execute(Runnable runnable) { |
| throw new RejectedExecutionException(); |
| } |
| }; |
| |
| private static <V> AsyncFunction<V, V> asyncIdentity() { |
| return new AsyncFunction<V, V>() { |
| @Override |
| public ListenableFuture<V> apply(V input) { |
| return immediateFuture(input); |
| } |
| }; |
| } |
| |
| private static <I, O> AsyncFunction<I, O> tagged( |
| final String toString, final AsyncFunction<I, O> function) { |
| return new AsyncFunction<I, O>() { |
| @Override |
| public ListenableFuture<O> apply(I input) throws Exception { |
| return function.apply(input); |
| } |
| |
| @Override |
| public String toString() { |
| return toString; |
| } |
| }; |
| } |
| |
| private static <V> AsyncCallable<V> tagged( |
| final String toString, final AsyncCallable<V> callable) { |
| return new AsyncCallable<V>() { |
| @Override |
| public ListenableFuture<V> call() throws Exception { |
| return callable.call(); |
| } |
| |
| @Override |
| public String toString() { |
| return toString; |
| } |
| }; |
| } |
| } |