| /* |
| * Copyright (C) 2017 The Guava Authors |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package com.google.common.util.concurrent; |
| |
| import static com.google.common.base.Preconditions.checkState; |
| import static com.google.common.collect.Lists.asList; |
| import static com.google.common.truth.Truth.assertThat; |
| import static com.google.common.truth.Truth.assertWithMessage; |
| import static com.google.common.util.concurrent.Futures.immediateCancelledFuture; |
| import static com.google.common.util.concurrent.Futures.immediateFailedFuture; |
| import static com.google.common.util.concurrent.Futures.immediateFuture; |
| import static com.google.common.util.concurrent.MoreExecutors.directExecutor; |
| import static com.google.common.util.concurrent.MoreExecutors.shutdownAndAwaitTermination; |
| import static com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly; |
| import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly; |
| import static java.util.Arrays.asList; |
| import static java.util.concurrent.Executors.newSingleThreadExecutor; |
| import static java.util.concurrent.TimeUnit.SECONDS; |
| import static org.mockito.Mockito.doThrow; |
| import static org.mockito.Mockito.timeout; |
| import static org.mockito.Mockito.verify; |
| |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.reflect.Reflection; |
| import com.google.common.truth.FailureStrategy; |
| import com.google.common.truth.StandardSubjectBuilder; |
| import com.google.common.util.concurrent.ClosingFuture.AsyncClosingFunction; |
| import com.google.common.util.concurrent.ClosingFuture.ClosingCallable; |
| import com.google.common.util.concurrent.ClosingFuture.ClosingFunction; |
| import com.google.common.util.concurrent.ClosingFuture.Combiner; |
| import com.google.common.util.concurrent.ClosingFuture.Combiner.AsyncCombiningCallable; |
| import com.google.common.util.concurrent.ClosingFuture.Combiner.CombiningCallable; |
| import com.google.common.util.concurrent.ClosingFuture.Combiner2.AsyncClosingFunction2; |
| import com.google.common.util.concurrent.ClosingFuture.Combiner2.ClosingFunction2; |
| import com.google.common.util.concurrent.ClosingFuture.Combiner3.ClosingFunction3; |
| import com.google.common.util.concurrent.ClosingFuture.Combiner4.ClosingFunction4; |
| import com.google.common.util.concurrent.ClosingFuture.Combiner5.ClosingFunction5; |
| import com.google.common.util.concurrent.ClosingFuture.DeferredCloser; |
| import com.google.common.util.concurrent.ClosingFuture.Peeker; |
| import com.google.common.util.concurrent.ClosingFuture.ValueAndCloser; |
| import com.google.common.util.concurrent.ClosingFuture.ValueAndCloserConsumer; |
| import java.io.Closeable; |
| import java.io.IOException; |
| import java.io.PrintWriter; |
| import java.io.StringWriter; |
| import java.lang.reflect.InvocationHandler; |
| import java.lang.reflect.InvocationTargetException; |
| import java.lang.reflect.Method; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.CancellationException; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.RejectedExecutionException; |
| import java.util.concurrent.atomic.AtomicReference; |
| import junit.framework.TestCase; |
| import org.mockito.Mockito; |
| |
| /** |
| * Tests for {@link ClosingFuture}. Subclasses exercise either the {@link |
| * ClosingFuture#finishToFuture()} or {@link |
| * ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)} paths to complete a |
| * {@link ClosingFuture} pipeline. |
| */ |
| public abstract class ClosingFutureTest extends TestCase { |
| // TODO(dpb): Use Expect once that supports JUnit 3, or we can use JUnit 4. |
| final List<AssertionError> failures = new ArrayList<>(); |
| final StandardSubjectBuilder expect = |
| StandardSubjectBuilder.forCustomFailureStrategy( |
| new FailureStrategy() { |
| @Override |
| public void fail(AssertionError failure) { |
| failures.add(failure); |
| } |
| }); |
| |
| final ListeningExecutorService executor = |
| MoreExecutors.listeningDecorator(newSingleThreadExecutor()); |
| final ExecutorService closingExecutor = newSingleThreadExecutor(); |
| |
| final TestCloseable closeable1 = new TestCloseable("closeable1"); |
| final TestCloseable closeable2 = new TestCloseable("closeable2"); |
| final TestCloseable closeable3 = new TestCloseable("closeable3"); |
| final TestCloseable closeable4 = new TestCloseable("closeable4"); |
| |
| final Waiter waiter = new Waiter(); |
| final CountDownLatch futureCancelled = new CountDownLatch(1); |
| final Exception exception = new Exception(); |
| final Closeable mockCloseable = Mockito.mock(Closeable.class); |
| |
| @Override |
| protected void tearDown() throws Exception { |
| assertNoExpectedFailures(); |
| super.tearDown(); |
| } |
| |
| public void testFrom() throws Exception { |
| ClosingFuture<String> closingFuture = |
| ClosingFuture.from(executor.submit(Callables.returning(closeable1))) |
| .transform( |
| new ClosingFunction<TestCloseable, String>() { |
| @Override |
| public String apply(DeferredCloser closer, TestCloseable v) throws Exception { |
| assertThat(v).isSameInstanceAs(closeable1); |
| return "value"; |
| } |
| }, |
| executor); |
| assertThat(getFinalValue(closingFuture)).isEqualTo("value"); |
| waitUntilClosed(closingFuture); |
| assertStillOpen(closeable1); |
| } |
| |
| public void testFrom_failedInput() throws Exception { |
| assertFinallyFailsWithException(failedClosingFuture()); |
| } |
| |
| public void testFrom_cancelledInput() throws Exception { |
| assertBecomesCanceled(ClosingFuture.from(immediateCancelledFuture())); |
| } |
| |
| public void testEventuallyClosing() throws Exception { |
| ClosingFuture<String> closingFuture = |
| ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor) |
| .transform( |
| new ClosingFunction<TestCloseable, String>() { |
| @Override |
| public String apply(DeferredCloser closer, TestCloseable v) throws Exception { |
| assertThat(v).isSameInstanceAs(closeable1); |
| assertStillOpen(closeable1); |
| return "value"; |
| } |
| }, |
| executor); |
| assertThat(getFinalValue(closingFuture)).isEqualTo("value"); |
| waitUntilClosed(closingFuture); |
| assertClosed(closeable1); |
| } |
| |
| public void testEventuallyClosing_failedInput() throws Exception { |
| assertFinallyFailsWithException( |
| ClosingFuture.eventuallyClosing( |
| Futures.<Closeable>immediateFailedFuture(exception), closingExecutor)); |
| } |
| |
| public void testEventuallyClosing_cancelledInput() throws Exception { |
| assertBecomesCanceled( |
| ClosingFuture.eventuallyClosing( |
| Futures.<Closeable>immediateCancelledFuture(), closingExecutor)); |
| } |
| |
| public void testEventuallyClosing_cancelledPipeline() throws Exception { |
| ClosingFuture<TestCloseable> closingFuture = |
| ClosingFuture.eventuallyClosing( |
| executor.submit( |
| waiter.waitFor( |
| new Callable<TestCloseable>() { |
| @Override |
| public TestCloseable call() throws InterruptedException { |
| awaitUninterruptibly(futureCancelled); |
| return closeable1; |
| } |
| })), |
| closingExecutor); |
| waiter.awaitStarted(); |
| cancelFinalStepAndWait(closingFuture); |
| // not closed until the callable returns |
| assertStillOpen(closeable1); |
| waiter.awaitReturned(); |
| assertClosed(closeable1); |
| } |
| |
| public void testEventuallyClosing_throws() throws Exception { |
| assertFinallyFailsWithException( |
| ClosingFuture.eventuallyClosing( |
| executor.submit( |
| new Callable<TestCloseable>() { |
| @Override |
| public TestCloseable call() throws Exception { |
| throw exception; |
| } |
| }), |
| closingExecutor)); |
| } |
| |
| public void testSubmit() throws Exception { |
| ClosingFuture<String> closingFuture = |
| ClosingFuture.submit( |
| new ClosingCallable<TestCloseable>() { |
| @Override |
| public TestCloseable call(DeferredCloser closer) throws Exception { |
| closer.eventuallyClose(closeable1, closingExecutor); |
| closer.eventuallyClose(closeable2, closingExecutor); |
| return closeable3; |
| } |
| }, |
| executor) |
| .transform( |
| new ClosingFunction<TestCloseable, String>() { |
| @Override |
| public String apply(DeferredCloser closer, TestCloseable v) throws Exception { |
| assertThat(v).isSameInstanceAs(closeable3); |
| assertStillOpen(closeable1, closeable2, closeable3); |
| return "value"; |
| } |
| }, |
| executor); |
| assertThat(getFinalValue(closingFuture)).isEqualTo("value"); |
| waitUntilClosed(closingFuture); |
| assertClosed(closeable1, closeable2); |
| assertStillOpen(closeable3); |
| } |
| |
| public void testSubmit_cancelledPipeline() throws Exception { |
| ClosingFuture<TestCloseable> closingFuture = |
| ClosingFuture.submit( |
| waiter.waitFor( |
| new ClosingCallable<TestCloseable>() { |
| @Override |
| public TestCloseable call(DeferredCloser closer) throws Exception { |
| awaitUninterruptibly(futureCancelled); |
| closer.eventuallyClose(closeable1, closingExecutor); |
| closer.eventuallyClose(closeable2, closingExecutor); |
| return closeable3; |
| } |
| }), |
| executor); |
| waiter.awaitStarted(); |
| cancelFinalStepAndWait(closingFuture); |
| waiter.awaitReturned(); |
| assertClosed(closeable1, closeable2); |
| assertStillOpen(closeable3); |
| } |
| |
| public void testSubmit_throws() throws Exception { |
| ClosingFuture<Object> closingFuture = |
| ClosingFuture.submit( |
| new ClosingCallable<Object>() { |
| @Override |
| public Object call(DeferredCloser closer) throws Exception { |
| closer.eventuallyClose(closeable1, closingExecutor); |
| closer.eventuallyClose(closeable2, closingExecutor); |
| throw exception; |
| } |
| }, |
| executor); |
| assertFinallyFailsWithException(closingFuture); |
| waitUntilClosed(closingFuture); |
| assertClosed(closeable1, closeable2); |
| } |
| |
| public void testStatusFuture() throws Exception { |
| ClosingFuture<String> closingFuture = |
| ClosingFuture.submit( |
| waiter.waitFor( |
| new ClosingCallable<String>() { |
| @Override |
| public String call(DeferredCloser closer) throws Exception { |
| return "value"; |
| } |
| }), |
| executor); |
| ListenableFuture<?> statusFuture = closingFuture.statusFuture(); |
| waiter.awaitStarted(); |
| assertThat(statusFuture.isDone()).isFalse(); |
| waiter.awaitReturned(); |
| assertThat(getUninterruptibly(statusFuture)).isNull(); |
| } |
| |
| public void testStatusFuture_failure() throws Exception { |
| ClosingFuture<String> closingFuture = |
| ClosingFuture.submit( |
| waiter.waitFor( |
| new ClosingCallable<String>() { |
| @Override |
| public String call(DeferredCloser closer) throws Exception { |
| throw exception; |
| } |
| }), |
| executor); |
| ListenableFuture<?> statusFuture = closingFuture.statusFuture(); |
| waiter.awaitStarted(); |
| assertThat(statusFuture.isDone()).isFalse(); |
| waiter.awaitReturned(); |
| assertThatFutureFailsWithException(statusFuture); |
| } |
| |
| public void testStatusFuture_cancelDoesNothing() throws Exception { |
| ClosingFuture<String> closingFuture = |
| ClosingFuture.submit( |
| waiter.waitFor( |
| new ClosingCallable<String>() { |
| @Override |
| public String call(DeferredCloser closer) throws Exception { |
| return "value"; |
| } |
| }), |
| executor); |
| ListenableFuture<?> statusFuture = closingFuture.statusFuture(); |
| waiter.awaitStarted(); |
| assertThat(statusFuture.isDone()).isFalse(); |
| statusFuture.cancel(true); |
| assertThat(statusFuture.isCancelled()).isTrue(); |
| waiter.awaitReturned(); |
| assertThat(getFinalValue(closingFuture)).isEqualTo("value"); |
| } |
| |
| public void testCancel_caught() throws Exception { |
| ClosingFuture<String> step0 = ClosingFuture.from(immediateFuture("value 0")); |
| ClosingFuture<String> step1 = |
| step0.transform( |
| new ClosingFunction<String, String>() { |
| @Override |
| public String apply(DeferredCloser closer, String v) throws Exception { |
| closer.eventuallyClose(closeable1, closingExecutor); |
| return "value 1"; |
| } |
| }, |
| executor); |
| Waiter step2Waiter = new Waiter(); |
| ClosingFuture<String> step2 = |
| step1.transform( |
| step2Waiter.waitFor( |
| new ClosingFunction<String, String>() { |
| @Override |
| public String apply(DeferredCloser closer, String v) throws Exception { |
| closer.eventuallyClose(closeable2, closingExecutor); |
| return "value 2"; |
| } |
| }), |
| executor); |
| ClosingFuture<String> step3 = |
| step2.transform( |
| new ClosingFunction<String, String>() { |
| @Override |
| public String apply(DeferredCloser closer, String input) throws Exception { |
| closer.eventuallyClose(closeable3, closingExecutor); |
| return "value 3"; |
| } |
| }, |
| executor); |
| Waiter step4Waiter = new Waiter(); |
| ClosingFuture<String> step4 = |
| step3.catching( |
| CancellationException.class, |
| step4Waiter.waitFor( |
| new ClosingFunction<CancellationException, String>() { |
| @Override |
| public String apply(DeferredCloser closer, CancellationException input) |
| throws Exception { |
| closer.eventuallyClose(closeable4, closingExecutor); |
| return "value 4"; |
| } |
| }), |
| executor); |
| |
| // Pause in step 2. |
| step2Waiter.awaitStarted(); |
| |
| // Everything should still be open. |
| assertStillOpen(closeable1, closeable2, closeable3, closeable4); |
| |
| // Cancel step 3, resume step 2, and pause in step 4. |
| assertWithMessage("step3.cancel()").that(step3.cancel(false)).isTrue(); |
| step2Waiter.awaitReturned(); |
| step4Waiter.awaitStarted(); |
| |
| // Step 1 is not cancelled because it was done. |
| assertWithMessage("step1.statusFuture().isCancelled()") |
| .that(step1.statusFuture().isCancelled()) |
| .isFalse(); |
| // But its closeable is closed. |
| assertClosed(closeable1); |
| |
| // Step 2 is cancelled because it wasn't complete. |
| assertWithMessage("step2.statusFuture().isCancelled()") |
| .that(step2.statusFuture().isCancelled()) |
| .isTrue(); |
| // Its closeable is closed. |
| assertClosed(closeable2); |
| |
| // Step 3 was cancelled before it began |
| assertWithMessage("step3.statusFuture().isCancelled()") |
| .that(step3.statusFuture().isCancelled()) |
| .isTrue(); |
| // Its closeable is still open. |
| assertStillOpen(closeable3); |
| |
| // Step 4 is not cancelled, because it caught the cancellation. |
| assertWithMessage("step4.statusFuture().isCancelled()") |
| .that(step4.statusFuture().isCancelled()) |
| .isFalse(); |
| // Its closeable isn't closed yet. |
| assertStillOpen(closeable4); |
| |
| // Resume step 4 and complete. |
| step4Waiter.awaitReturned(); |
| assertThat(getFinalValue(step4)).isEqualTo("value 4"); |
| |
| // Step 4's closeable is now closed. |
| assertClosed(closeable4); |
| // Step 3 still never ran, so its closeable should still be open. |
| assertStillOpen(closeable3); |
| } |
| |
| public void testTransform() throws Exception { |
| ClosingFuture<String> closingFuture = |
| ClosingFuture.from(immediateFuture("value")) |
| .transform( |
| new ClosingFunction<String, TestCloseable>() { |
| @Override |
| public TestCloseable apply(DeferredCloser closer, String v) throws Exception { |
| closer.eventuallyClose(closeable1, closingExecutor); |
| closer.eventuallyClose(closeable2, closingExecutor); |
| return closeable3; |
| } |
| }, |
| executor) |
| .transform( |
| new ClosingFunction<TestCloseable, String>() { |
| @Override |
| public String apply(DeferredCloser closer, TestCloseable v) throws Exception { |
| assertThat(v).isSameInstanceAs(closeable3); |
| assertStillOpen(closeable1, closeable2, closeable3); |
| return "value"; |
| } |
| }, |
| executor); |
| assertThat(getFinalValue(closingFuture)).isEqualTo("value"); |
| waitUntilClosed(closingFuture); |
| assertClosed(closeable1, closeable2); |
| assertStillOpen(closeable3); |
| } |
| |
| public void testTransform_cancelledPipeline() throws Exception { |
| String value = "value"; |
| ClosingFuture<TestCloseable> closingFuture = |
| ClosingFuture.from(immediateFuture(value)) |
| .transform( |
| new ClosingFunction<String, TestCloseable>() { |
| @Override |
| public TestCloseable apply(DeferredCloser closer, String v) throws Exception { |
| return closer.eventuallyClose(closeable1, closingExecutor); |
| } |
| }, |
| executor) |
| .transform( |
| waiter.waitFor( |
| new ClosingFunction<TestCloseable, TestCloseable>() { |
| @Override |
| public TestCloseable apply(DeferredCloser closer, TestCloseable v) |
| throws Exception { |
| awaitUninterruptibly(futureCancelled); |
| closer.eventuallyClose(closeable2, closingExecutor); |
| closer.eventuallyClose(closeable3, closingExecutor); |
| return closeable4; |
| } |
| }), |
| executor); |
| waiter.awaitStarted(); |
| cancelFinalStepAndWait(closingFuture); |
| waiter.awaitReturned(); |
| assertClosed(closeable1, closeable2, closeable3); |
| assertStillOpen(closeable4); |
| } |
| |
| public void testTransform_throws() throws Exception { |
| ClosingFuture<Object> closingFuture = |
| ClosingFuture.from(immediateFuture("value")) |
| .transform( |
| new ClosingFunction<String, Object>() { |
| @Override |
| public Object apply(DeferredCloser closer, String v) throws Exception { |
| closer.eventuallyClose(closeable1, closingExecutor); |
| closer.eventuallyClose(closeable2, closingExecutor); |
| throw exception; |
| } |
| }, |
| executor); |
| assertFinallyFailsWithException(closingFuture); |
| waitUntilClosed(closingFuture); |
| assertClosed(closeable1, closeable2); |
| } |
| |
| public void testTransformAsync() throws Exception { |
| ClosingFuture<String> closingFuture = |
| ClosingFuture.from(immediateFuture("value")) |
| .transformAsync( |
| new AsyncClosingFunction<String, TestCloseable>() { |
| @Override |
| public ClosingFuture<TestCloseable> apply(DeferredCloser closer, String v) |
| throws Exception { |
| closer.eventuallyClose(closeable1, closingExecutor); |
| closer.eventuallyClose(closeable2, closingExecutor); |
| return ClosingFuture.eventuallyClosing( |
| immediateFuture(closeable3), closingExecutor); |
| } |
| }, |
| executor) |
| .transform( |
| new ClosingFunction<TestCloseable, String>() { |
| @Override |
| public String apply(DeferredCloser closer, TestCloseable v) throws Exception { |
| assertThat(v).isSameInstanceAs(closeable3); |
| assertStillOpen(closeable1, closeable2, closeable3); |
| return "value"; |
| } |
| }, |
| executor); |
| assertThat(getFinalValue(closingFuture)).isEqualTo("value"); |
| waitUntilClosed(closingFuture); |
| assertClosed(closeable1, closeable2, closeable3); |
| } |
| |
| public void testTransformAsync_cancelledPipeline() throws Exception { |
| ClosingFuture<TestCloseable> closingFuture = |
| ClosingFuture.from(immediateFuture("value")) |
| .transformAsync( |
| waiter.waitFor( |
| new AsyncClosingFunction<String, TestCloseable>() { |
| @Override |
| public ClosingFuture<TestCloseable> apply(DeferredCloser closer, String v) |
| throws Exception { |
| awaitUninterruptibly(futureCancelled); |
| closer.eventuallyClose(closeable1, closingExecutor); |
| closer.eventuallyClose(closeable2, closingExecutor); |
| return ClosingFuture.eventuallyClosing( |
| immediateFuture(closeable3), closingExecutor); |
| } |
| }), |
| executor); |
| waiter.awaitStarted(); |
| cancelFinalStepAndWait(closingFuture); |
| // not closed until the function returns |
| assertStillOpen(closeable1, closeable2, closeable3); |
| waiter.awaitReturned(); |
| assertClosed(closeable1, closeable2, closeable3); |
| } |
| |
| public void testTransformAsync_throws() throws Exception { |
| ClosingFuture<Object> closingFuture = |
| ClosingFuture.from(immediateFuture("value")) |
| .transformAsync( |
| new AsyncClosingFunction<String, Object>() { |
| @Override |
| public ClosingFuture<Object> apply(DeferredCloser closer, String v) |
| throws Exception { |
| closer.eventuallyClose(closeable1, closingExecutor); |
| closer.eventuallyClose(closeable2, closingExecutor); |
| throw exception; |
| } |
| }, |
| executor); |
| assertFinallyFailsWithException(closingFuture); |
| waitUntilClosed(closingFuture); |
| assertClosed(closeable1, closeable2); |
| } |
| |
| public void testTransformAsync_failed() throws Exception { |
| ClosingFuture<Object> closingFuture = |
| ClosingFuture.from(immediateFuture("value")) |
| .transformAsync( |
| new AsyncClosingFunction<String, Object>() { |
| @Override |
| public ClosingFuture<Object> apply(DeferredCloser closer, String v) |
| throws Exception { |
| closer.eventuallyClose(closeable1, closingExecutor); |
| closer.eventuallyClose(closeable2, closingExecutor); |
| return failedClosingFuture(); |
| } |
| }, |
| executor); |
| assertFinallyFailsWithException(closingFuture); |
| waitUntilClosed(closingFuture); |
| assertClosed(closeable1, closeable2); |
| } |
| |
| public void testTransformAsync_withoutCloser() throws Exception { |
| ClosingFuture<String> closingFuture = |
| ClosingFuture.submit( |
| new ClosingCallable<TestCloseable>() { |
| @Override |
| public TestCloseable call(DeferredCloser closer) throws Exception { |
| return closer.eventuallyClose(closeable1, closingExecutor); |
| } |
| }, |
| executor) |
| .transformAsync( |
| ClosingFuture.withoutCloser( |
| new AsyncFunction<TestCloseable, String>() { |
| @Override |
| public ListenableFuture<String> apply(TestCloseable v) throws Exception { |
| assertThat(v).isSameInstanceAs(closeable1); |
| assertStillOpen(closeable1); |
| return immediateFuture("value"); |
| } |
| }), |
| executor); |
| assertThat(getFinalValue(closingFuture)).isEqualTo("value"); |
| waitUntilClosed(closingFuture); |
| assertClosed(closeable1); |
| } |
| |
| public void testWhenAllComplete_call() throws Exception { |
| final ClosingFuture<String> input1 = ClosingFuture.from(immediateFuture("value1")); |
| final ClosingFuture<Object> input2Failed = failedClosingFuture(); |
| final ClosingFuture<String> nonInput = ClosingFuture.from(immediateFuture("value3")); |
| final AtomicReference<ClosingFuture.Peeker> capturedPeeker = new AtomicReference<>(); |
| ClosingFuture<TestCloseable> closingFuture = |
| ClosingFuture.whenAllComplete(ImmutableList.of(input1, input2Failed)) |
| .call( |
| new CombiningCallable<TestCloseable>() { |
| @Override |
| public TestCloseable call(DeferredCloser closer, Peeker peeker) throws Exception { |
| closer.eventuallyClose(closeable1, closingExecutor); |
| assertThat(peeker.getDone(input1)).isSameInstanceAs("value1"); |
| try { |
| peeker.getDone(input2Failed); |
| fail("Peeker.getDone() should fail for failed inputs"); |
| } catch (ExecutionException expected) { |
| } |
| try { |
| peeker.getDone(nonInput); |
| fail("Peeker should not be able to peek into non-input ClosingFuture."); |
| } catch (IllegalArgumentException expected) { |
| } |
| capturedPeeker.set(peeker); |
| return closeable2; |
| } |
| }, |
| executor); |
| assertThat(getFinalValue(closingFuture)).isSameInstanceAs(closeable2); |
| waitUntilClosed(closingFuture); |
| assertStillOpen(closeable2); |
| assertClosed(closeable1); |
| try { |
| capturedPeeker.get().getDone(input1); |
| fail("Peeker should not be able to peek except during call."); |
| } catch (IllegalStateException expected) { |
| } |
| } |
| |
| public void testWhenAllComplete_call_cancelledPipeline() throws Exception { |
| ClosingFuture<TestCloseable> closingFuture = |
| ClosingFuture.whenAllComplete( |
| ImmutableList.of( |
| ClosingFuture.from(immediateFuture(closeable1)), |
| ClosingFuture.eventuallyClosing(immediateFuture(closeable2), closingExecutor))) |
| .call( |
| waiter.waitFor( |
| new CombiningCallable<TestCloseable>() { |
| @Override |
| public TestCloseable call(DeferredCloser closer, Peeker peeker) |
| throws Exception { |
| awaitUninterruptibly(futureCancelled); |
| closer.eventuallyClose(closeable1, closingExecutor); |
| return closeable3; |
| } |
| }), |
| executor); |
| waiter.awaitStarted(); |
| cancelFinalStepAndWait(closingFuture); |
| waiter.awaitReturned(); |
| assertClosed(closeable1, closeable2); |
| assertStillOpen(closeable3); |
| } |
| |
| public void testWhenAllComplete_call_throws() throws Exception { |
| ClosingFuture<Object> closingFuture = |
| ClosingFuture.whenAllComplete( |
| ImmutableList.of( |
| ClosingFuture.from(immediateFuture(closeable1)), |
| ClosingFuture.eventuallyClosing(immediateFuture(closeable2), closingExecutor))) |
| .call( |
| new CombiningCallable<Object>() { |
| @Override |
| public Object call(DeferredCloser closer, Peeker peeker) throws Exception { |
| closer.eventuallyClose(closeable3, closingExecutor); |
| throw exception; |
| } |
| }, |
| executor); |
| assertFinallyFailsWithException(closingFuture); |
| waitUntilClosed(closingFuture); |
| assertStillOpen(closeable1); |
| assertClosed(closeable2, closeable3); |
| } |
| |
| public void testWhenAllComplete_callAsync() throws Exception { |
| final ClosingFuture<String> input1 = ClosingFuture.from(immediateFuture("value1")); |
| final ClosingFuture<Object> input2Failed = failedClosingFuture(); |
| final ClosingFuture<String> nonInput = ClosingFuture.from(immediateFuture("value3")); |
| final AtomicReference<ClosingFuture.Peeker> capturedPeeker = new AtomicReference<>(); |
| ClosingFuture<TestCloseable> closingFuture = |
| ClosingFuture.whenAllComplete(ImmutableList.of(input1, input2Failed)) |
| .callAsync( |
| new AsyncCombiningCallable<TestCloseable>() { |
| @Override |
| public ClosingFuture<TestCloseable> call(DeferredCloser closer, Peeker peeker) |
| throws Exception { |
| closer.eventuallyClose(closeable1, closingExecutor); |
| assertThat(peeker.getDone(input1)).isSameInstanceAs("value1"); |
| try { |
| peeker.getDone(input2Failed); |
| fail("Peeker should fail for failed inputs"); |
| } catch (ExecutionException expected) { |
| } |
| try { |
| peeker.getDone(nonInput); |
| fail("Peeker should not be able to peek into non-input ClosingFuture."); |
| } catch (IllegalArgumentException expected) { |
| } |
| capturedPeeker.set(peeker); |
| return ClosingFuture.eventuallyClosing( |
| immediateFuture(closeable2), closingExecutor); |
| } |
| }, |
| executor); |
| assertThat(getFinalValue(closingFuture)).isSameInstanceAs(closeable2); |
| waitUntilClosed(closingFuture); |
| assertClosed(closeable1, closeable2); |
| try { |
| capturedPeeker.get().getDone(input1); |
| fail("Peeker should not be able to peek except during call."); |
| } catch (IllegalStateException expected) { |
| } |
| } |
| |
| public void testWhenAllComplete_callAsync_cancelledPipeline() throws Exception { |
| ClosingFuture<TestCloseable> closingFuture = |
| ClosingFuture.whenAllComplete( |
| ImmutableList.of( |
| ClosingFuture.from(immediateFuture(closeable1)), |
| ClosingFuture.eventuallyClosing(immediateFuture(closeable2), closingExecutor))) |
| .callAsync( |
| waiter.waitFor( |
| new AsyncCombiningCallable<TestCloseable>() { |
| @Override |
| public ClosingFuture<TestCloseable> call(DeferredCloser closer, Peeker peeker) |
| throws Exception { |
| awaitUninterruptibly(futureCancelled); |
| closer.eventuallyClose(closeable1, closingExecutor); |
| return ClosingFuture.eventuallyClosing( |
| immediateFuture(closeable3), closingExecutor); |
| } |
| }), |
| executor); |
| waiter.awaitStarted(); |
| cancelFinalStepAndWait(closingFuture); |
| waiter.awaitReturned(); |
| assertClosed(closeable1, closeable2, closeable3); |
| } |
| |
| public void testWhenAllComplete_callAsync_throws() throws Exception { |
| ClosingFuture<Object> closingFuture = |
| ClosingFuture.whenAllComplete( |
| ImmutableList.of( |
| ClosingFuture.from(immediateFuture(closeable1)), |
| ClosingFuture.eventuallyClosing(immediateFuture(closeable2), closingExecutor))) |
| .callAsync( |
| new AsyncCombiningCallable<Object>() { |
| @Override |
| public ClosingFuture<Object> call(DeferredCloser closer, Peeker peeker) |
| throws Exception { |
| closer.eventuallyClose(closeable3, closingExecutor); |
| throw exception; |
| } |
| }, |
| executor); |
| assertFinallyFailsWithException(closingFuture); |
| waitUntilClosed(closingFuture); |
| assertStillOpen(closeable1); |
| assertClosed(closeable2, closeable3); |
| } |
| |
| // We don't need to test the happy case for SuccessfulCombiner.call(Async) because it's the same |
| // as Combiner. |
| |
| public void testWhenAllSucceed_call_failedInput() throws Exception { |
| assertFinallyFailsWithException( |
| ClosingFuture.whenAllSucceed( |
| ImmutableList.of( |
| ClosingFuture.from(immediateFuture("value")), failedClosingFuture())) |
| .call( |
| new CombiningCallable<Object>() { |
| @Override |
| public Object call(DeferredCloser closer, Peeker peeker) throws Exception { |
| expect.fail(); |
| throw new AssertionError(); |
| } |
| }, |
| executor)); |
| } |
| |
| public void testWhenAllSucceed_callAsync_failedInput() throws Exception { |
| assertFinallyFailsWithException( |
| ClosingFuture.whenAllSucceed( |
| ImmutableList.of( |
| ClosingFuture.from(immediateFuture("value")), failedClosingFuture())) |
| .callAsync( |
| new AsyncCombiningCallable<Object>() { |
| @Override |
| public ClosingFuture<Object> call(DeferredCloser closer, Peeker peeker) |
| throws Exception { |
| expect.fail(); |
| throw new AssertionError(); |
| } |
| }, |
| executor)); |
| } |
| |
| public void testWhenAllSucceed2_call() throws ExecutionException, IOException { |
| ClosingFuture<TestCloseable> closingFuture = |
| ClosingFuture.whenAllSucceed( |
| ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor), |
| ClosingFuture.from(immediateFuture("value1"))) |
| .call( |
| new ClosingFunction2<TestCloseable, String, TestCloseable>() { |
| @Override |
| public TestCloseable apply(DeferredCloser closer, TestCloseable v1, String v2) |
| throws Exception { |
| assertThat(v1).isEqualTo(closeable1); |
| assertThat(v2).isEqualTo("value1"); |
| assertStillOpen(closeable1); |
| closer.eventuallyClose(closeable2, closingExecutor); |
| return closeable2; |
| } |
| }, |
| executor); |
| assertThat(getFinalValue(closingFuture)).isSameInstanceAs(closeable2); |
| waitUntilClosed(closingFuture); |
| assertClosed(closeable1, closeable2); |
| } |
| |
| public void testWhenAllSucceed2_call_failedInput() throws ExecutionException, IOException { |
| ClosingFuture<Object> closingFuture = |
| ClosingFuture.whenAllSucceed( |
| ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor), |
| failedClosingFuture()) |
| .call( |
| new ClosingFunction2<TestCloseable, Object, Object>() { |
| @Override |
| public Object apply(DeferredCloser closer, TestCloseable v1, Object v2) |
| throws Exception { |
| expect.fail(); |
| throw new AssertionError(); |
| } |
| }, |
| executor); |
| assertFinallyFailsWithException(closingFuture); |
| waitUntilClosed(closingFuture); |
| assertClosed(closeable1); |
| } |
| |
| public void testWhenAllSucceed2_call_cancelledPipeline() throws Exception { |
| ClosingFuture<TestCloseable> closingFuture = |
| ClosingFuture.whenAllSucceed( |
| ClosingFuture.from(immediateFuture(closeable1)), |
| ClosingFuture.from(immediateFuture(closeable2))) |
| .call( |
| waiter.waitFor( |
| new ClosingFunction2<TestCloseable, TestCloseable, TestCloseable>() { |
| @Override |
| public TestCloseable apply( |
| DeferredCloser closer, TestCloseable v1, TestCloseable v2) |
| throws Exception { |
| awaitUninterruptibly(futureCancelled); |
| closer.eventuallyClose(closeable1, closingExecutor); |
| closer.eventuallyClose(closeable2, closingExecutor); |
| return closeable3; |
| } |
| }), |
| executor); |
| waiter.awaitStarted(); |
| cancelFinalStepAndWait(closingFuture); |
| // not closed until the function returns |
| assertStillOpen(closeable1, closeable2); |
| waiter.awaitReturned(); |
| assertClosed(closeable1, closeable2); |
| assertStillOpen(closeable3); |
| } |
| |
| public void testWhenAllSucceed2_call_throws() throws Exception { |
| ClosingFuture<Object> closingFuture = |
| ClosingFuture.whenAllSucceed( |
| ClosingFuture.from(immediateFuture(closeable1)), |
| ClosingFuture.eventuallyClosing(immediateFuture(closeable2), closingExecutor)) |
| .call( |
| new ClosingFunction2<TestCloseable, TestCloseable, Object>() { |
| @Override |
| public Object apply(DeferredCloser closer, TestCloseable v1, TestCloseable v2) |
| throws Exception { |
| closer.eventuallyClose(closeable3, closingExecutor); |
| throw exception; |
| } |
| }, |
| executor); |
| assertFinallyFailsWithException(closingFuture); |
| waitUntilClosed(closingFuture); |
| assertStillOpen(closeable1); |
| assertClosed(closeable2, closeable3); |
| } |
| |
| public void testWhenAllSucceed2_callAsync() throws ExecutionException, IOException { |
| ClosingFuture<TestCloseable> closingFuture = |
| ClosingFuture.whenAllSucceed( |
| ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor), |
| ClosingFuture.from(immediateFuture("value1"))) |
| .callAsync( |
| new AsyncClosingFunction2<TestCloseable, String, TestCloseable>() { |
| @Override |
| public ClosingFuture<TestCloseable> apply( |
| DeferredCloser closer, TestCloseable v1, String v2) throws Exception { |
| assertThat(v1).isEqualTo(closeable1); |
| assertThat(v2).isEqualTo("value1"); |
| assertStillOpen(closeable1); |
| closer.eventuallyClose(closeable2, closingExecutor); |
| return ClosingFuture.eventuallyClosing( |
| immediateFuture(closeable3), closingExecutor); |
| } |
| }, |
| executor); |
| assertThat(getFinalValue(closingFuture)).isSameInstanceAs(closeable3); |
| waitUntilClosed(closingFuture); |
| assertClosed(closeable1, closeable2, closeable3); |
| } |
| |
| public void testWhenAllSucceed2_callAsync_failedInput() throws ExecutionException, IOException { |
| ClosingFuture<Object> closingFuture = |
| ClosingFuture.whenAllSucceed( |
| ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor), |
| failedClosingFuture()) |
| .callAsync( |
| new AsyncClosingFunction2<TestCloseable, Object, Object>() { |
| @Override |
| public ClosingFuture<Object> apply( |
| DeferredCloser closer, TestCloseable v1, Object v2) throws Exception { |
| expect.fail(); |
| throw new AssertionError(); |
| } |
| }, |
| executor); |
| assertFinallyFailsWithException(closingFuture); |
| waitUntilClosed(closingFuture); |
| assertClosed(closeable1); |
| } |
| |
| public void testWhenAllSucceed2_callAsync_cancelledPipeline() throws Exception { |
| ClosingFuture<TestCloseable> closingFuture = |
| ClosingFuture.whenAllSucceed( |
| ClosingFuture.from(immediateFuture(closeable1)), |
| ClosingFuture.from(immediateFuture(closeable2))) |
| .callAsync( |
| waiter.waitFor( |
| new AsyncClosingFunction2<TestCloseable, TestCloseable, TestCloseable>() { |
| @Override |
| public ClosingFuture<TestCloseable> apply( |
| DeferredCloser closer, TestCloseable v1, TestCloseable v2) |
| throws Exception { |
| awaitUninterruptibly(futureCancelled); |
| closer.eventuallyClose(closeable1, closingExecutor); |
| closer.eventuallyClose(closeable2, closingExecutor); |
| return ClosingFuture.eventuallyClosing( |
| immediateFuture(closeable3), closingExecutor); |
| } |
| }), |
| executor); |
| waiter.awaitStarted(); |
| cancelFinalStepAndWait(closingFuture); |
| // not closed until the function returns |
| assertStillOpen(closeable1, closeable2, closeable3); |
| waiter.awaitReturned(); |
| assertClosed(closeable1, closeable2, closeable3); |
| } |
| |
| public void testWhenAllSucceed2_callAsync_throws() throws Exception { |
| ClosingFuture<Object> closingFuture = |
| ClosingFuture.whenAllSucceed( |
| ClosingFuture.from(immediateFuture(closeable1)), |
| ClosingFuture.eventuallyClosing(immediateFuture(closeable2), closingExecutor)) |
| .callAsync( |
| new AsyncClosingFunction2<TestCloseable, TestCloseable, Object>() { |
| @Override |
| public ClosingFuture<Object> apply( |
| DeferredCloser closer, TestCloseable v1, TestCloseable v2) throws Exception { |
| closer.eventuallyClose(closeable3, closingExecutor); |
| throw exception; |
| } |
| }, |
| executor); |
| assertFinallyFailsWithException(closingFuture); |
| waitUntilClosed(closingFuture); |
| assertStillOpen(closeable1); |
| assertClosed(closeable2, closeable3); |
| } |
| |
| public void testWhenAllSucceed3_call() throws ExecutionException, IOException { |
| ClosingFuture<TestCloseable> closingFuture = |
| ClosingFuture.whenAllSucceed( |
| ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor), |
| ClosingFuture.from(immediateFuture("value2")), |
| ClosingFuture.from(immediateFuture("value3"))) |
| .call( |
| new ClosingFunction3<TestCloseable, String, String, TestCloseable>() { |
| @Override |
| public TestCloseable apply( |
| DeferredCloser closer, TestCloseable v1, String v2, String v3) |
| throws Exception { |
| assertThat(v1).isEqualTo(closeable1); |
| assertThat(v2).isEqualTo("value2"); |
| assertThat(v3).isEqualTo("value3"); |
| assertStillOpen(closeable1); |
| closer.eventuallyClose(closeable2, closingExecutor); |
| return closeable2; |
| } |
| }, |
| executor); |
| assertThat(getFinalValue(closingFuture)).isSameInstanceAs(closeable2); |
| waitUntilClosed(closingFuture); |
| assertClosed(closeable1, closeable2); |
| } |
| |
| public void testWhenAllSucceed3_call_failedInput() throws ExecutionException, IOException { |
| ClosingFuture<Object> closingFuture = |
| ClosingFuture.whenAllSucceed( |
| ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor), |
| failedClosingFuture(), |
| ClosingFuture.from(immediateFuture("value3"))) |
| .call( |
| new ClosingFunction3<TestCloseable, Object, String, Object>() { |
| @Override |
| public Object apply(DeferredCloser closer, TestCloseable v1, Object v2, String v3) |
| throws Exception { |
| expect.fail(); |
| throw new AssertionError(); |
| } |
| }, |
| executor); |
| assertFinallyFailsWithException(closingFuture); |
| waitUntilClosed(closingFuture); |
| assertClosed(closeable1); |
| } |
| |
| public void testWhenAllSucceed3_call_cancelledPipeline() throws Exception { |
| ClosingFuture<TestCloseable> closingFuture = |
| ClosingFuture.whenAllSucceed( |
| ClosingFuture.from(immediateFuture(closeable1)), |
| ClosingFuture.from(immediateFuture(closeable2)), |
| ClosingFuture.from(immediateFuture("value3"))) |
| .call( |
| waiter.waitFor( |
| new ClosingFunction3<TestCloseable, TestCloseable, String, TestCloseable>() { |
| @Override |
| public TestCloseable apply( |
| DeferredCloser closer, TestCloseable v1, TestCloseable v2, String v3) |
| throws Exception { |
| awaitUninterruptibly(futureCancelled); |
| closer.eventuallyClose(closeable1, closingExecutor); |
| closer.eventuallyClose(closeable2, closingExecutor); |
| return closeable3; |
| } |
| }), |
| executor); |
| waiter.awaitStarted(); |
| cancelFinalStepAndWait(closingFuture); |
| // not closed until the function returns |
| assertStillOpen(closeable1, closeable2); |
| waiter.awaitReturned(); |
| assertClosed(closeable1, closeable2); |
| assertStillOpen(closeable3); |
| } |
| |
| public void testWhenAllSucceed3_call_throws() throws Exception { |
| ClosingFuture<Object> closingFuture = |
| ClosingFuture.whenAllSucceed( |
| ClosingFuture.from(immediateFuture(closeable1)), |
| ClosingFuture.eventuallyClosing(immediateFuture(closeable2), closingExecutor), |
| ClosingFuture.from(immediateFuture("value3"))) |
| .call( |
| new ClosingFunction3<TestCloseable, TestCloseable, String, Object>() { |
| @Override |
| public Object apply( |
| DeferredCloser closer, TestCloseable v1, TestCloseable v2, String v3) |
| throws Exception { |
| closer.eventuallyClose(closeable3, closingExecutor); |
| throw exception; |
| } |
| }, |
| executor); |
| assertFinallyFailsWithException(closingFuture); |
| waitUntilClosed(closingFuture); |
| assertStillOpen(closeable1); |
| assertClosed(closeable2, closeable3); |
| } |
| |
| public void testWhenAllSucceed4_call() throws ExecutionException, IOException { |
| ClosingFuture<TestCloseable> closingFuture = |
| ClosingFuture.whenAllSucceed( |
| ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor), |
| ClosingFuture.from(immediateFuture("value2")), |
| ClosingFuture.from(immediateFuture("value3")), |
| ClosingFuture.from(immediateFuture("value4"))) |
| .call( |
| new ClosingFunction4<TestCloseable, String, String, String, TestCloseable>() { |
| @Override |
| public TestCloseable apply( |
| DeferredCloser closer, TestCloseable v1, String v2, String v3, String v4) |
| throws Exception { |
| assertThat(v1).isEqualTo(closeable1); |
| assertThat(v2).isEqualTo("value2"); |
| assertThat(v3).isEqualTo("value3"); |
| assertThat(v4).isEqualTo("value4"); |
| assertStillOpen(closeable1); |
| closer.eventuallyClose(closeable2, closingExecutor); |
| return closeable2; |
| } |
| }, |
| executor); |
| assertThat(getFinalValue(closingFuture)).isSameInstanceAs(closeable2); |
| waitUntilClosed(closingFuture); |
| assertClosed(closeable1, closeable2); |
| } |
| |
| public void testWhenAllSucceed4_call_failedInput() throws ExecutionException, IOException { |
| ClosingFuture<Object> closingFuture = |
| ClosingFuture.whenAllSucceed( |
| ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor), |
| failedClosingFuture(), |
| ClosingFuture.from(immediateFuture("value3")), |
| ClosingFuture.from(immediateFuture("value4"))) |
| .call( |
| new ClosingFunction4<TestCloseable, Object, String, String, Object>() { |
| @Override |
| public Object apply( |
| DeferredCloser closer, TestCloseable v1, Object v2, String v3, String v4) |
| throws Exception { |
| expect.fail(); |
| throw new AssertionError(); |
| } |
| }, |
| executor); |
| assertFinallyFailsWithException(closingFuture); |
| waitUntilClosed(closingFuture); |
| assertClosed(closeable1); |
| } |
| |
| public void testWhenAllSucceed4_call_cancelledPipeline() throws Exception { |
| ClosingFuture<TestCloseable> closingFuture = |
| ClosingFuture.whenAllSucceed( |
| ClosingFuture.from(immediateFuture(closeable1)), |
| ClosingFuture.from(immediateFuture(closeable2)), |
| ClosingFuture.from(immediateFuture("value3")), |
| ClosingFuture.from(immediateFuture("value4"))) |
| .call( |
| waiter.waitFor( |
| new ClosingFunction4< |
| TestCloseable, TestCloseable, String, String, TestCloseable>() { |
| @Override |
| public TestCloseable apply( |
| DeferredCloser closer, |
| TestCloseable v1, |
| TestCloseable v2, |
| String v3, |
| String v4) |
| throws Exception { |
| awaitUninterruptibly(futureCancelled); |
| closer.eventuallyClose(closeable1, closingExecutor); |
| closer.eventuallyClose(closeable2, closingExecutor); |
| return closeable3; |
| } |
| }), |
| executor); |
| waiter.awaitStarted(); |
| cancelFinalStepAndWait(closingFuture); |
| // not closed until the function returns |
| assertStillOpen(closeable1, closeable2); |
| waiter.awaitReturned(); |
| assertClosed(closeable1, closeable2); |
| assertStillOpen(closeable3); |
| } |
| |
| public void testWhenAllSucceed4_call_throws() throws Exception { |
| ClosingFuture<Object> closingFuture = |
| ClosingFuture.whenAllSucceed( |
| ClosingFuture.from(immediateFuture(closeable1)), |
| ClosingFuture.eventuallyClosing(immediateFuture(closeable2), closingExecutor), |
| ClosingFuture.from(immediateFuture("value3")), |
| ClosingFuture.from(immediateFuture("value4"))) |
| .call( |
| new ClosingFunction4<TestCloseable, TestCloseable, String, String, Object>() { |
| @Override |
| public Object apply( |
| DeferredCloser closer, |
| TestCloseable v1, |
| TestCloseable v2, |
| String v3, |
| String v4) |
| throws Exception { |
| closer.eventuallyClose(closeable3, closingExecutor); |
| throw exception; |
| } |
| }, |
| executor); |
| assertFinallyFailsWithException(closingFuture); |
| waitUntilClosed(closingFuture); |
| assertStillOpen(closeable1); |
| assertClosed(closeable2, closeable3); |
| } |
| |
| public void testWhenAllSucceed5_call() throws ExecutionException, IOException { |
| ClosingFuture<TestCloseable> closingFuture = |
| ClosingFuture.whenAllSucceed( |
| ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor), |
| ClosingFuture.from(immediateFuture("value2")), |
| ClosingFuture.from(immediateFuture("value3")), |
| ClosingFuture.from(immediateFuture("value4")), |
| ClosingFuture.from(immediateFuture("value5"))) |
| .call( |
| new ClosingFunction5< |
| TestCloseable, String, String, String, String, TestCloseable>() { |
| @Override |
| public TestCloseable apply( |
| DeferredCloser closer, |
| TestCloseable v1, |
| String v2, |
| String v3, |
| String v4, |
| String v5) |
| throws Exception { |
| assertThat(v1).isEqualTo(closeable1); |
| assertThat(v2).isEqualTo("value2"); |
| assertThat(v3).isEqualTo("value3"); |
| assertThat(v4).isEqualTo("value4"); |
| assertThat(v5).isEqualTo("value5"); |
| assertStillOpen(closeable1); |
| closer.eventuallyClose(closeable2, closingExecutor); |
| return closeable2; |
| } |
| }, |
| executor); |
| assertThat(getFinalValue(closingFuture)).isSameInstanceAs(closeable2); |
| waitUntilClosed(closingFuture); |
| assertClosed(closeable1, closeable2); |
| } |
| |
| public void testWhenAllSucceed5_call_failedInput() throws ExecutionException, IOException { |
| ClosingFuture<Object> closingFuture = |
| ClosingFuture.whenAllSucceed( |
| ClosingFuture.eventuallyClosing(immediateFuture(closeable1), closingExecutor), |
| failedClosingFuture(), |
| ClosingFuture.from(immediateFuture("value3")), |
| ClosingFuture.from(immediateFuture("value4")), |
| ClosingFuture.from(immediateFuture("value5"))) |
| .call( |
| new ClosingFunction5<TestCloseable, Object, String, String, String, Object>() { |
| @Override |
| public Object apply( |
| DeferredCloser closer, |
| TestCloseable v1, |
| Object v2, |
| String v3, |
| String v4, |
| String v5) |
| throws Exception { |
| expect.fail(); |
| throw new AssertionError(); |
| } |
| }, |
| executor); |
| assertFinallyFailsWithException(closingFuture); |
| waitUntilClosed(closingFuture); |
| assertClosed(closeable1); |
| } |
| |
| public void testWhenAllSucceed5_call_cancelledPipeline() throws Exception { |
| ClosingFuture<TestCloseable> closingFuture = |
| ClosingFuture.whenAllSucceed( |
| ClosingFuture.from(immediateFuture(closeable1)), |
| ClosingFuture.from(immediateFuture(closeable2)), |
| ClosingFuture.from(immediateFuture("value3")), |
| ClosingFuture.from(immediateFuture("value4")), |
| ClosingFuture.from(immediateFuture("value5"))) |
| .call( |
| waiter.waitFor( |
| new ClosingFunction5< |
| TestCloseable, TestCloseable, String, String, String, TestCloseable>() { |
| @Override |
| public TestCloseable apply( |
| DeferredCloser closer, |
| TestCloseable v1, |
| TestCloseable v2, |
| String v3, |
| String v4, |
| String v5) |
| throws Exception { |
| awaitUninterruptibly(futureCancelled); |
| closer.eventuallyClose(closeable1, closingExecutor); |
| closer.eventuallyClose(closeable2, closingExecutor); |
| return closeable3; |
| } |
| }), |
| executor); |
| waiter.awaitStarted(); |
| cancelFinalStepAndWait(closingFuture); |
| // not closed until the function returns |
| assertStillOpen(closeable1, closeable2); |
| waiter.awaitReturned(); |
| assertClosed(closeable1, closeable2); |
| assertStillOpen(closeable3); |
| } |
| |
| public void testWhenAllSucceed5_call_throws() throws Exception { |
| ClosingFuture<Object> closingFuture = |
| ClosingFuture.whenAllSucceed( |
| ClosingFuture.from(immediateFuture(closeable1)), |
| ClosingFuture.eventuallyClosing(immediateFuture(closeable2), closingExecutor), |
| ClosingFuture.from(immediateFuture("value3")), |
| ClosingFuture.from(immediateFuture("value4")), |
| ClosingFuture.from(immediateFuture("value5"))) |
| .call( |
| new ClosingFunction5< |
| TestCloseable, TestCloseable, String, String, String, Object>() { |
| @Override |
| public Object apply( |
| DeferredCloser closer, |
| TestCloseable v1, |
| TestCloseable v2, |
| String v3, |
| String v4, |
| String v5) |
| throws Exception { |
| closer.eventuallyClose(closeable3, closingExecutor); |
| throw exception; |
| } |
| }, |
| executor); |
| assertFinallyFailsWithException(closingFuture); |
| waitUntilClosed(closingFuture); |
| assertStillOpen(closeable1); |
| assertClosed(closeable2, closeable3); |
| } |
| |
| public void testTransform_preventsFurtherOperations() { |
| ClosingFuture<String> closingFuture = ClosingFuture.from(immediateFuture("value1")); |
| ClosingFuture<String> unused = |
| closingFuture.transform( |
| new ClosingFunction<String, String>() { |
| @Override |
| public String apply(DeferredCloser closer, String v) throws Exception { |
| return "value2"; |
| } |
| }, |
| executor); |
| assertDerivingThrowsIllegalStateException(closingFuture); |
| assertFinalStepThrowsIllegalStateException(closingFuture); |
| } |
| |
| public void testTransformAsync_preventsFurtherOperations() { |
| ClosingFuture<String> closingFuture = ClosingFuture.from(immediateFuture("value1")); |
| ClosingFuture<String> unused = |
| closingFuture.transformAsync( |
| new AsyncClosingFunction<String, String>() { |
| @Override |
| public ClosingFuture<String> apply(DeferredCloser closer, String v) throws Exception { |
| return ClosingFuture.from(immediateFuture("value2")); |
| } |
| }, |
| executor); |
| assertDerivingThrowsIllegalStateException(closingFuture); |
| assertFinalStepThrowsIllegalStateException(closingFuture); |
| } |
| |
| public void testCatching_preventsFurtherOperations() { |
| ClosingFuture<String> closingFuture = ClosingFuture.from(immediateFuture("value1")); |
| ClosingFuture<String> unused = |
| closingFuture.catching( |
| Exception.class, |
| new ClosingFunction<Exception, String>() { |
| @Override |
| public String apply(DeferredCloser closer, Exception x) throws Exception { |
| return "value2"; |
| } |
| }, |
| executor); |
| assertDerivingThrowsIllegalStateException(closingFuture); |
| assertFinalStepThrowsIllegalStateException(closingFuture); |
| } |
| |
| public void testCatchingAsync_preventsFurtherOperations() { |
| ClosingFuture<String> closingFuture = ClosingFuture.from(immediateFuture("value1")); |
| ClosingFuture<String> unused = |
| closingFuture.catchingAsync( |
| Exception.class, |
| ClosingFuture.withoutCloser( |
| new AsyncFunction<Exception, String>() { |
| @Override |
| public ListenableFuture<String> apply(Exception x) throws Exception { |
| return immediateFuture("value2"); |
| } |
| }), |
| executor); |
| assertDerivingThrowsIllegalStateException(closingFuture); |
| assertFinalStepThrowsIllegalStateException(closingFuture); |
| } |
| |
| public void testWhenAllComplete_preventsFurtherOperations() { |
| ClosingFuture<String> closingFuture = ClosingFuture.from(immediateFuture("value1")); |
| Combiner unused = ClosingFuture.whenAllComplete(asList(closingFuture)); |
| assertDerivingThrowsIllegalStateException(closingFuture); |
| assertFinalStepThrowsIllegalStateException(closingFuture); |
| } |
| |
| public void testWhenAllSucceed_preventsFurtherOperations() { |
| ClosingFuture<String> closingFuture = ClosingFuture.from(immediateFuture("value1")); |
| Combiner unused = ClosingFuture.whenAllSucceed(asList(closingFuture)); |
| assertDerivingThrowsIllegalStateException(closingFuture); |
| assertFinalStepThrowsIllegalStateException(closingFuture); |
| } |
| |
| protected final void assertDerivingThrowsIllegalStateException( |
| ClosingFuture<String> closingFuture) { |
| try { |
| closingFuture.transform( |
| new ClosingFunction<String, String>() { |
| @Override |
| public String apply(DeferredCloser closer3, String v1) throws Exception { |
| return "value3"; |
| } |
| }, |
| executor); |
| fail(); |
| } catch (IllegalStateException expected5) { |
| } |
| try { |
| closingFuture.transformAsync( |
| new AsyncClosingFunction<String, String>() { |
| @Override |
| public ClosingFuture<String> apply(DeferredCloser closer2, String v) throws Exception { |
| return ClosingFuture.from(immediateFuture("value3")); |
| } |
| }, |
| executor); |
| fail(); |
| } catch (IllegalStateException expected4) { |
| } |
| try { |
| closingFuture.catching( |
| Exception.class, |
| new ClosingFunction<Exception, String>() { |
| @Override |
| public String apply(DeferredCloser closer1, Exception x1) throws Exception { |
| return "value3"; |
| } |
| }, |
| executor); |
| fail(); |
| } catch (IllegalStateException expected3) { |
| } |
| try { |
| closingFuture.catchingAsync( |
| Exception.class, |
| new AsyncClosingFunction<Exception, String>() { |
| @Override |
| public ClosingFuture<String> apply(DeferredCloser closer, Exception x) |
| throws Exception { |
| return ClosingFuture.from(immediateFuture("value3")); |
| } |
| }, |
| executor); |
| fail(); |
| } catch (IllegalStateException expected2) { |
| } |
| try { |
| ClosingFuture.whenAllComplete(asList(closingFuture)); |
| fail(); |
| } catch (IllegalStateException expected1) { |
| } |
| try { |
| ClosingFuture.whenAllSucceed(asList(closingFuture)); |
| fail(); |
| } catch (IllegalStateException expected) { |
| } |
| } |
| |
| /** Asserts that marking this step a final step throws {@link IllegalStateException}. */ |
| protected void assertFinalStepThrowsIllegalStateException(ClosingFuture<?> closingFuture) { |
| try { |
| closingFuture.finishToFuture(); |
| fail(); |
| } catch (IllegalStateException expected) { |
| } |
| try { |
| closingFuture.finishToValueAndCloser(new NoOpValueAndCloserConsumer<>(), executor); |
| fail(); |
| } catch (IllegalStateException expected) { |
| } |
| } |
| |
| // Avoid infinite recursion if a closeable's close() method throws RejectedExecutionException and |
| // is closed using the direct executor. |
| public void testCloseThrowsRejectedExecutionException() throws Exception { |
| doThrow(new RejectedExecutionException()).when(mockCloseable).close(); |
| ClosingFuture<Closeable> closingFuture = |
| ClosingFuture.submit( |
| new ClosingCallable<Closeable>() { |
| @Override |
| public Closeable call(DeferredCloser closer) throws Exception { |
| return closer.eventuallyClose(mockCloseable, directExecutor()); |
| } |
| }, |
| executor); |
| assertThat(getFinalValue(closingFuture)).isEqualTo(mockCloseable); |
| waitUntilClosed(closingFuture); |
| verify(mockCloseable, timeout(1000)).close(); |
| } |
| |
| /** |
| * Marks the given step final, waits for it to be finished, and returns the value. |
| * |
| * @throws ExecutionException if the step failed |
| * @throws CancellationException if the step was cancelled |
| */ |
| abstract <T> T getFinalValue(ClosingFuture<T> closingFuture) throws ExecutionException; |
| |
| /** Marks the given step final, cancels it, and waits for the cancellation to happen. */ |
| abstract void cancelFinalStepAndWait(ClosingFuture<TestCloseable> closingFuture); |
| |
| /** |
| * Marks the given step final and waits for it to fail. Expects the failure exception to match |
| * {@link ClosingFutureTest#exception}. |
| */ |
| abstract void assertFinallyFailsWithException(ClosingFuture<?> closingFuture); |
| |
| /** Waits for the given step to be canceled. */ |
| abstract void assertBecomesCanceled(ClosingFuture<?> closingFuture) throws ExecutionException; |
| |
| /** Waits for the given step's closeables to be closed. */ |
| void waitUntilClosed(ClosingFuture<?> closingFuture) { |
| assertTrue(awaitUninterruptibly(closingFuture.whenClosedCountDown(), 1, SECONDS)); |
| } |
| |
| /** Tests for {@link ClosingFuture} that exercise {@link ClosingFuture#finishToFuture()}. */ |
| |
| public static class FinishToFutureTest extends ClosingFutureTest { |
| |
| public void testFinishToFuture_throwsIfCalledTwice() throws Exception { |
| ClosingFuture<Closeable> closingFuture = |
| ClosingFuture.submit( |
| new ClosingCallable<Closeable>() { |
| @Override |
| public Closeable call(DeferredCloser closer) throws Exception { |
| return closer.eventuallyClose(mockCloseable, executor); |
| } |
| }, |
| executor); |
| FluentFuture<Closeable> unused = closingFuture.finishToFuture(); |
| try { |
| FluentFuture<Closeable> unused2 = closingFuture.finishToFuture(); |
| fail("should have thrown"); |
| } catch (IllegalStateException expected) { |
| } |
| } |
| |
| public void testFinishToFuture_throwsAfterCallingFinishToValueAndCloser() throws Exception { |
| ClosingFuture<Closeable> closingFuture = |
| ClosingFuture.submit( |
| new ClosingCallable<Closeable>() { |
| @Override |
| public Closeable call(DeferredCloser closer) throws Exception { |
| return closer.eventuallyClose(mockCloseable, executor); |
| } |
| }, |
| executor); |
| closingFuture.finishToValueAndCloser(new NoOpValueAndCloserConsumer<>(), directExecutor()); |
| try { |
| FluentFuture<Closeable> unused = closingFuture.finishToFuture(); |
| fail("should have thrown"); |
| } catch (IllegalStateException expected) { |
| } |
| } |
| |
| public void testFinishToFuture_preventsFurtherDerivation() { |
| ClosingFuture<String> closingFuture = ClosingFuture.from(immediateFuture("value1")); |
| FluentFuture<String> unused = closingFuture.finishToFuture(); |
| assertDerivingThrowsIllegalStateException(closingFuture); |
| } |
| |
| @Override |
| <T> T getFinalValue(ClosingFuture<T> closingFuture) throws ExecutionException { |
| return getUninterruptibly(closingFuture.finishToFuture()); |
| } |
| |
| @Override |
| void assertFinallyFailsWithException(ClosingFuture<?> closingFuture) { |
| assertThatFutureFailsWithException(closingFuture.finishToFuture()); |
| } |
| |
| @Override |
| void assertBecomesCanceled(ClosingFuture<?> closingFuture) throws ExecutionException { |
| assertThatFutureBecomesCancelled(closingFuture.finishToFuture()); |
| } |
| |
| @Override |
| void cancelFinalStepAndWait(ClosingFuture<TestCloseable> closingFuture) { |
| assertThat(closingFuture.finishToFuture().cancel(false)).isTrue(); |
| waitUntilClosed(closingFuture); |
| futureCancelled.countDown(); |
| } |
| } |
| |
| /** |
| * Tests for {@link ClosingFuture} that exercise {@link |
| * ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)}. |
| */ |
| |
| public static class FinishToValueAndCloserTest extends ClosingFutureTest { |
| |
| private final ExecutorService finishToValueAndCloserExecutor = newSingleThreadExecutor(); |
| private volatile ValueAndCloser<?> valueAndCloser; |
| |
| @Override |
| protected void tearDown() throws Exception { |
| super.tearDown(); |
| assertWithMessage("finishToValueAndCloserExecutor was shut down") |
| .that(shutdownAndAwaitTermination(finishToValueAndCloserExecutor, 10, SECONDS)) |
| .isTrue(); |
| } |
| |
| public void testFinishToValueAndCloser_throwsIfCalledTwice() throws Exception { |
| ClosingFuture<Closeable> closingFuture = |
| ClosingFuture.submit( |
| new ClosingCallable<Closeable>() { |
| @Override |
| public Closeable call(DeferredCloser closer) throws Exception { |
| return closer.eventuallyClose(mockCloseable, executor); |
| } |
| }, |
| executor); |
| closingFuture.finishToValueAndCloser( |
| new NoOpValueAndCloserConsumer<>(), finishToValueAndCloserExecutor); |
| try { |
| closingFuture.finishToValueAndCloser( |
| new NoOpValueAndCloserConsumer<>(), finishToValueAndCloserExecutor); |
| fail("should have thrown"); |
| } catch (IllegalStateException expected) { |
| } |
| } |
| |
| public void testFinishToValueAndCloser_throwsAfterCallingFinishToFuture() throws Exception { |
| ClosingFuture<Closeable> closingFuture = |
| ClosingFuture.submit( |
| new ClosingCallable<Closeable>() { |
| @Override |
| public Closeable call(DeferredCloser closer) throws Exception { |
| return closer.eventuallyClose(mockCloseable, executor); |
| } |
| }, |
| executor); |
| FluentFuture<Closeable> unused = closingFuture.finishToFuture(); |
| try { |
| closingFuture.finishToValueAndCloser( |
| new NoOpValueAndCloserConsumer<>(), finishToValueAndCloserExecutor); |
| fail("should have thrown"); |
| } catch (IllegalStateException expected) { |
| } |
| } |
| |
| @Override |
| <T> T getFinalValue(ClosingFuture<T> closingFuture) throws ExecutionException { |
| return finishToValueAndCloser(closingFuture).get(); |
| } |
| |
| @Override |
| void assertFinallyFailsWithException(ClosingFuture<?> closingFuture) { |
| assertThatFutureFailsWithException(closingFuture.statusFuture()); |
| ValueAndCloser<?> valueAndCloser = finishToValueAndCloser(closingFuture); |
| try { |
| valueAndCloser.get(); |
| fail(); |
| } catch (ExecutionException expected) { |
| assertThat(expected).hasCauseThat().isSameInstanceAs(exception); |
| } |
| valueAndCloser.closeAsync(); |
| } |
| |
| @Override |
| void assertBecomesCanceled(ClosingFuture<?> closingFuture) throws ExecutionException { |
| assertThatFutureBecomesCancelled(closingFuture.statusFuture()); |
| } |
| |
| @Override |
| void waitUntilClosed(ClosingFuture<?> closingFuture) { |
| if (valueAndCloser != null) { |
| valueAndCloser.closeAsync(); |
| } |
| super.waitUntilClosed(closingFuture); |
| } |
| |
| @Override |
| void cancelFinalStepAndWait(ClosingFuture<TestCloseable> closingFuture) { |
| assertThat(closingFuture.cancel(false)).isTrue(); |
| ValueAndCloser<?> unused = finishToValueAndCloser(closingFuture); |
| waitUntilClosed(closingFuture); |
| futureCancelled.countDown(); |
| } |
| |
| private <V> ValueAndCloser<V> finishToValueAndCloser(ClosingFuture<V> closingFuture) { |
| final CountDownLatch valueAndCloserSet = new CountDownLatch(1); |
| closingFuture.finishToValueAndCloser( |
| new ValueAndCloserConsumer<V>() { |
| @Override |
| public void accept(ValueAndCloser<V> valueAndCloser) { |
| FinishToValueAndCloserTest.this.valueAndCloser = valueAndCloser; |
| valueAndCloserSet.countDown(); |
| } |
| }, |
| finishToValueAndCloserExecutor); |
| assertWithMessage("valueAndCloser was set") |
| .that(awaitUninterruptibly(valueAndCloserSet, 10, SECONDS)) |
| .isTrue(); |
| @SuppressWarnings("unchecked") |
| ValueAndCloser<V> valueAndCloserWithType = (ValueAndCloser<V>) valueAndCloser; |
| return valueAndCloserWithType; |
| } |
| } |
| |
| void assertThatFutureFailsWithException(Future<?> future) { |
| try { |
| getUninterruptibly(future); |
| fail("Expected future to fail: " + future); |
| } catch (ExecutionException e) { |
| assertThat(e).hasCauseThat().isSameInstanceAs(exception); |
| } |
| } |
| |
| private static void assertThatFutureBecomesCancelled(Future<?> future) throws ExecutionException { |
| try { |
| getUninterruptibly(future); |
| fail("Expected future to be canceled: " + future); |
| } catch (CancellationException expected) { |
| } |
| } |
| |
| private static void assertStillOpen(TestCloseable closeable1, TestCloseable... moreCloseables) |
| throws IOException { |
| for (TestCloseable closeable : asList(closeable1, moreCloseables)) { |
| assertWithMessage("%s.stillOpen()", closeable).that(closeable.stillOpen()).isTrue(); |
| } |
| } |
| |
| static void assertClosed(TestCloseable closeable1, TestCloseable... moreCloseables) |
| throws IOException { |
| for (TestCloseable closeable : asList(closeable1, moreCloseables)) { |
| assertWithMessage("%s.isClosed()", closeable).that(closeable.awaitClosed()).isTrue(); |
| } |
| } |
| |
| private ClosingFuture<Object> failedClosingFuture() { |
| return ClosingFuture.from(immediateFailedFuture(exception)); |
| } |
| |
| private void assertNoExpectedFailures() { |
| assertWithMessage("executor was shut down") |
| .that(shutdownAndAwaitTermination(executor, 10, SECONDS)) |
| .isTrue(); |
| assertWithMessage("closingExecutor was shut down") |
| .that(shutdownAndAwaitTermination(closingExecutor, 10, SECONDS)) |
| .isTrue(); |
| if (!failures.isEmpty()) { |
| StringWriter message = new StringWriter(); |
| PrintWriter writer = new PrintWriter(message); |
| writer.println("Expected no failures, but found:"); |
| for (AssertionError failure : failures) { |
| failure.printStackTrace(writer); |
| } |
| failures.clear(); |
| assertWithMessage(message.toString()).fail(); |
| } |
| } |
| |
| static final class TestCloseable implements Closeable { |
| private final CountDownLatch latch = new CountDownLatch(1); |
| private final String name; |
| |
| TestCloseable(String name) { |
| this.name = name; |
| } |
| |
| @Override |
| public void close() throws IOException { |
| latch.countDown(); |
| } |
| |
| boolean awaitClosed() { |
| return awaitUninterruptibly(latch, 10, SECONDS); |
| } |
| |
| boolean stillOpen() { |
| return !awaitUninterruptibly(latch, 1, SECONDS); |
| } |
| |
| @Override |
| public String toString() { |
| return name; |
| } |
| } |
| |
| static final class Waiter { |
| private final CountDownLatch started = new CountDownLatch(1); |
| private final CountDownLatch canReturn = new CountDownLatch(1); |
| private final CountDownLatch returned = new CountDownLatch(1); |
| private Object proxy; |
| |
| <V> Callable<V> waitFor(Callable<V> callable) { |
| return waitFor(callable, Callable.class); |
| } |
| |
| <V> ClosingCallable<V> waitFor(ClosingCallable<V> closingCallable) { |
| return waitFor(closingCallable, ClosingCallable.class); |
| } |
| |
| <T, U> ClosingFunction<T, U> waitFor(ClosingFunction<T, U> closingFunction) { |
| return waitFor(closingFunction, ClosingFunction.class); |
| } |
| |
| <T, U> AsyncClosingFunction<T, U> waitFor(AsyncClosingFunction<T, U> asyncClosingFunction) { |
| return waitFor(asyncClosingFunction, AsyncClosingFunction.class); |
| } |
| |
| <V> CombiningCallable<V> waitFor(CombiningCallable<V> combiningCallable) { |
| return waitFor(combiningCallable, CombiningCallable.class); |
| } |
| |
| <V> AsyncCombiningCallable<V> waitFor(AsyncCombiningCallable<V> asyncCombiningCallable) { |
| return waitFor(asyncCombiningCallable, AsyncCombiningCallable.class); |
| } |
| |
| <V1, V2, U> ClosingFunction2<V1, V2, U> waitFor(ClosingFunction2<V1, V2, U> closingFunction2) { |
| return waitFor(closingFunction2, ClosingFunction2.class); |
| } |
| |
| <V1, V2, U> AsyncClosingFunction2<V1, V2, U> waitFor( |
| AsyncClosingFunction2<V1, V2, U> asyncClosingFunction2) { |
| return waitFor(asyncClosingFunction2, AsyncClosingFunction2.class); |
| } |
| |
| <V1, V2, V3, U> ClosingFunction3<V1, V2, V3, U> waitFor( |
| ClosingFunction3<V1, V2, V3, U> closingFunction3) { |
| return waitFor(closingFunction3, ClosingFunction3.class); |
| } |
| |
| <V1, V2, V3, V4, U> ClosingFunction4<V1, V2, V3, V4, U> waitFor( |
| ClosingFunction4<V1, V2, V3, V4, U> closingFunction4) { |
| return waitFor(closingFunction4, ClosingFunction4.class); |
| } |
| |
| <V1, V2, V3, V4, V5, U> ClosingFunction5<V1, V2, V3, V4, V5, U> waitFor( |
| ClosingFunction5<V1, V2, V3, V4, V5, U> closingFunction5) { |
| return waitFor(closingFunction5, ClosingFunction5.class); |
| } |
| |
| <T> T waitFor(final T delegate, final Class<T> type) { |
| checkState(proxy == null); |
| T proxyObject = |
| Reflection.newProxy( |
| type, |
| new InvocationHandler() { |
| @Override |
| public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { |
| if (!method.getDeclaringClass().equals(type)) { |
| return method.invoke(delegate, args); |
| } |
| checkState(started.getCount() == 1); |
| started.countDown(); |
| try { |
| return method.invoke(delegate, args); |
| } catch (InvocationTargetException e) { |
| throw e.getCause(); |
| } finally { |
| awaitUninterruptibly(canReturn); |
| returned.countDown(); |
| } |
| } |
| }); |
| this.proxy = proxyObject; |
| return proxyObject; |
| } |
| |
| void awaitStarted() { |
| assertTrue(awaitUninterruptibly(started, 10, SECONDS)); |
| } |
| |
| void awaitReturned() { |
| canReturn.countDown(); |
| assertTrue(awaitUninterruptibly(returned, 10, SECONDS)); |
| } |
| } |
| |
| private static final class NoOpValueAndCloserConsumer<V> implements ValueAndCloserConsumer<V> { |
| @Override |
| public void accept(ValueAndCloser<V> valueAndCloser) {} |
| } |
| } |