| /* |
| * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. |
| */ |
| |
| package kotlinx.coroutines.guava |
| |
| import com.google.common.util.concurrent.* |
| import kotlinx.coroutines.* |
| import org.junit.* |
| import org.junit.Ignore |
| import org.junit.Test |
| import java.util.concurrent.* |
| import java.util.concurrent.CancellationException |
| import java.util.concurrent.atomic.* |
| import kotlin.test.* |
| |
| class ListenableFutureTest : TestBase() { |
| @Before |
| fun setup() { |
| ignoreLostThreads("ForkJoinPool.commonPool-worker-") |
| } |
| |
| @Test |
| fun testSimpleAwait() { |
| val service = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool()) |
| val future = GlobalScope.future { |
| service.submit(Callable<String> { |
| "O" |
| }).await() + "K" |
| } |
| assertEquals("OK", future.get()) |
| } |
| |
| @Test |
| fun testAwaitWithContext() = runTest { |
| val future = SettableFuture.create<Int>() |
| val deferred = async { |
| withContext(Dispatchers.Default) { |
| future.await() |
| } |
| } |
| |
| future.set(1) |
| assertEquals(1, deferred.await()) |
| } |
| |
| @Test |
| fun testAwaitWithCancellation() = runTest(expected = {it is TestCancellationException}) { |
| val future = SettableFuture.create<Int>() |
| val deferred = async { |
| withContext(Dispatchers.Default) { |
| future.await() |
| } |
| } |
| |
| deferred.cancel(TestCancellationException()) |
| deferred.await() // throws TCE |
| expectUnreached() |
| } |
| |
| @Test |
| fun testCompletedFuture() { |
| val toAwait = SettableFuture.create<String>() |
| toAwait.set("O") |
| val future = GlobalScope.future { |
| toAwait.await() + "K" |
| } |
| assertEquals("OK", future.get()) |
| } |
| |
| @Test |
| fun testWaitForFuture() { |
| val toAwait = SettableFuture.create<String>() |
| val future = GlobalScope.future { |
| toAwait.await() + "K" |
| } |
| assertFalse(future.isDone) |
| toAwait.set("O") |
| assertEquals("OK", future.get()) |
| } |
| |
| @Test |
| fun testCompletedFutureExceptionally() { |
| val toAwait = SettableFuture.create<String>() |
| toAwait.setException(IllegalArgumentException("O")) |
| val future = GlobalScope.future { |
| try { |
| toAwait.await() |
| } catch (e: RuntimeException) { |
| assertTrue(e is IllegalArgumentException) |
| e.message!! |
| } + "K" |
| } |
| assertEquals("OK", future.get()) |
| } |
| |
| @Test |
| fun testWaitForFutureWithException() { |
| val toAwait = SettableFuture.create<String>() |
| val future = GlobalScope.future { |
| try { |
| toAwait.await() |
| } catch (e: RuntimeException) { |
| assertTrue(e is IllegalArgumentException) |
| e.message!! |
| } + "K" |
| } |
| assertFalse(future.isDone) |
| toAwait.setException(IllegalArgumentException("O")) |
| assertEquals("OK", future.get()) |
| } |
| |
| @Test |
| fun testExceptionInsideCoroutine() { |
| val service = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool()) |
| val future = GlobalScope.future { |
| if (service.submit(Callable<Boolean> { true }).await()) { |
| throw IllegalStateException("OK") |
| } |
| "fail" |
| } |
| try { |
| future.get() |
| fail("'get' should've throw an exception") |
| } catch (e: ExecutionException) { |
| assertTrue(e.cause is IllegalStateException) |
| assertEquals("OK", e.cause!!.message) |
| } |
| } |
| |
| @Test |
| fun testFutureLazyStartThrows() { |
| expect(1) |
| val e = assertFailsWith<IllegalArgumentException> { |
| GlobalScope.future(start = CoroutineStart.LAZY) {} |
| } |
| |
| assertEquals("LAZY start is not supported", e.message) |
| finish(2) |
| } |
| |
| @Test |
| fun testCompletedDeferredAsListenableFuture() = runBlocking { |
| expect(1) |
| val deferred = async(start = CoroutineStart.UNDISPATCHED) { |
| expect(2) // completed right away |
| "OK" |
| } |
| expect(3) |
| val future = deferred.asListenableFuture() |
| assertEquals("OK", future.await()) |
| finish(4) |
| } |
| |
| @Test |
| fun testWaitForDeferredAsListenableFuture() = runBlocking { |
| expect(1) |
| val deferred = async { |
| expect(3) // will complete later |
| "OK" |
| } |
| expect(2) |
| val future = deferred.asListenableFuture() |
| assertEquals("OK", future.await()) // await yields main thread to deferred coroutine |
| finish(4) |
| } |
| |
| @Test |
| fun testAsListenableFutureThrowable() { |
| val deferred = GlobalScope.async { |
| throw OutOfMemoryError() |
| } |
| |
| val future = deferred.asListenableFuture() |
| try { |
| future.get() |
| } catch (e: ExecutionException) { |
| assertTrue(future.isDone) |
| assertTrue(e.cause is OutOfMemoryError) |
| } |
| } |
| |
| @Test |
| fun testCancellableAwait() = runBlocking { |
| expect(1) |
| val toAwait = SettableFuture.create<String>() |
| val job = launch(start = CoroutineStart.UNDISPATCHED) { |
| expect(2) |
| try { |
| toAwait.await() // suspends |
| } catch (e: CancellationException) { |
| expect(5) // should throw cancellation exception |
| throw e |
| } |
| } |
| expect(3) |
| job.cancel() // cancel the job |
| toAwait.set("fail") // too late, the waiting job was already cancelled |
| expect(4) // job processing of cancellation was scheduled, not executed yet |
| yield() // yield main thread to job |
| finish(6) |
| } |
| |
| @Test |
| fun testFutureAwaitCancellationPropagatingToDeferred() = runTest { |
| |
| val latch = CountDownLatch(1) |
| val executor = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool()) |
| val future = executor.submit(Callable { latch.await(); 42 }) |
| val deferred = async { |
| expect(2) |
| future.await() |
| } |
| expect(1) |
| yield() |
| future.cancel(/*mayInterruptIfRunning=*/true) |
| expect(3) |
| latch.countDown() |
| deferred.join() |
| assertTrue(future.isCancelled) |
| assertTrue(deferred.isCancelled) |
| assertFailsWith<CancellationException> { future.get() } |
| finish(4) |
| } |
| |
| @Test |
| fun testFutureAwaitCancellationPropagatingToDeferredNoInterruption() = runTest { |
| |
| val latch = CountDownLatch(1) |
| val executor = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool()) |
| val future = executor.submit(Callable { latch.await(); 42 }) |
| val deferred = async { |
| expect(2) |
| future.await() |
| } |
| expect(1) |
| yield() |
| future.cancel(/*mayInterruptIfRunning=*/false) |
| expect(3) |
| latch.countDown() |
| deferred.join() |
| assertTrue(future.isCancelled) |
| assertTrue(deferred.isCancelled) |
| assertFailsWith<CancellationException> { future.get() } |
| finish(4) |
| } |
| |
| @Test |
| fun testAsListenableFutureCancellationPropagatingToDeferred() = runTest { |
| val latch = CountDownLatch(1) |
| val executor = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool()) |
| val future = executor.submit(Callable { latch.await(); 42 }) |
| val deferred = async { |
| expect(2) |
| future.await() |
| } |
| val asListenableFuture = deferred.asListenableFuture() |
| expect(1) |
| yield() |
| asListenableFuture.cancel(/*mayInterruptIfRunning=*/true) |
| expect(3) |
| latch.countDown() |
| deferred.join() |
| assertTrue(future.isCancelled) |
| assertTrue(deferred.isCancelled) |
| assertTrue(asListenableFuture.isCancelled) |
| assertFailsWith<CancellationException> { future.get() } |
| finish(4) |
| } |
| |
| @Test |
| fun testAsListenableFutureCancellationPropagatingToDeferredNoInterruption() = runTest { |
| val latch = CountDownLatch(1) |
| val executor = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool()) |
| val future = executor.submit(Callable { latch.await(); 42 }) |
| val deferred = async { |
| expect(2) |
| future.await() |
| } |
| val asListenableFuture = deferred.asListenableFuture() |
| expect(1) |
| yield() |
| asListenableFuture.cancel(/*mayInterruptIfRunning=*/false) |
| expect(3) |
| latch.countDown() |
| deferred.join() |
| assertFailsWith<CancellationException> { asListenableFuture.get() } |
| assertTrue(future.isCancelled) |
| assertTrue(asListenableFuture.isCancelled) |
| assertTrue(deferred.isCancelled) |
| assertFailsWith<CancellationException> { future.get() } |
| finish(4) |
| } |
| |
| @Test |
| fun testAsListenableFutureCancellationThroughSetFuture() = runTest { |
| val latch = CountDownLatch(1) |
| val future = SettableFuture.create<Void>() |
| val deferred = async { |
| expect(2) |
| future.await() |
| } |
| val asListenableFuture = deferred.asListenableFuture() |
| expect(1) |
| yield() |
| future.setFuture(Futures.immediateCancelledFuture()) |
| expect(3) |
| latch.countDown() |
| deferred.join() |
| assertFailsWith<CancellationException> { asListenableFuture.get() } |
| // Future was not interrupted, but also wasn't blocking, so it will be successfully |
| // cancelled by its parent Coroutine. |
| assertTrue(future.isCancelled) |
| assertTrue(asListenableFuture.isCancelled) |
| assertTrue(deferred.isCancelled) |
| assertFailsWith<CancellationException> { future.get() } |
| finish(4) |
| } |
| |
| @Test |
| @Ignore // TODO: propagate cancellation before running listeners. |
| fun testAsListenableFuturePropagatesCancellationBeforeRunningListeners() = runTest { |
| expect(1) |
| val deferred = async(context = Dispatchers.Unconfined) { |
| try { |
| delay(Long.MAX_VALUE) |
| } finally { |
| expect(3) // Cancelled. |
| } |
| } |
| val asFuture = deferred.asListenableFuture() |
| asFuture.addListener(Runnable { expect(4) }, MoreExecutors.directExecutor()) |
| assertFalse(asFuture.isDone) |
| expect(2) |
| asFuture.cancel(false) |
| assertTrue(asFuture.isDone) |
| assertTrue(asFuture.isCancelled) |
| assertFailsWith<CancellationException> { deferred.await() } |
| finish(5) |
| } |
| |
| @Test |
| fun testFutureCancellation() = runTest { |
| val future = awaitFutureWithCancel(true) |
| assertTrue(future.isCancelled) |
| assertFailsWith<CancellationException> { future.get() } |
| finish(4) |
| } |
| |
| @Test |
| fun testAsListenableDeferredCancellationCauseAndMessagePropagate() = runTest { |
| val deferred = CompletableDeferred<Int>() |
| val inputCancellationException = CancellationException("Foobar") |
| inputCancellationException.initCause(OutOfMemoryError("Foobaz")) |
| deferred.cancel(inputCancellationException) |
| val asFuture = deferred.asListenableFuture() |
| |
| val outputCancellationException = |
| assertFailsWith<CancellationException> { asFuture.get() } |
| val cause = outputCancellationException.cause |
| assertNotNull(cause) |
| assertEquals(cause.message, "Foobar") |
| assertTrue(cause.cause is OutOfMemoryError) |
| assertEquals(cause.cause?.message, "Foobaz") |
| } |
| |
| @Test |
| fun testNoFutureCancellation() = runTest { |
| val future = awaitFutureWithCancel(false) |
| assertFalse(future.isCancelled) |
| @Suppress("BlockingMethodInNonBlockingContext") |
| assertEquals(42, future.get()) |
| finish(4) |
| } |
| |
| @Test |
| fun testCancelledDeferredAsListenableFutureAwaitThrowsCancellation() = runTest { |
| val future = Futures.immediateCancelledFuture<Int>() |
| val asDeferred = future.asDeferred() |
| val asDeferredAsFuture = asDeferred.asListenableFuture() |
| |
| assertTrue(asDeferredAsFuture.isCancelled) |
| assertFailsWith<CancellationException> { |
| asDeferredAsFuture.await() |
| } |
| } |
| |
| @Test |
| fun testCancelledDeferredAsListenableFutureAsDeferredPassesCancellationAlong() = runTest { |
| val deferred = CompletableDeferred<Int>() |
| deferred.completeExceptionally(CancellationException()) |
| val asFuture = deferred.asListenableFuture() |
| val asFutureAsDeferred = asFuture.asDeferred() |
| |
| assertTrue(asFutureAsDeferred.isCancelled) |
| assertTrue(asFutureAsDeferred.isCompleted) |
| // By documentation, join() shouldn't throw when asDeferred is already complete. |
| asFutureAsDeferred.join() |
| assertTrue(asFutureAsDeferred.getCompletionExceptionOrNull() is CancellationException) |
| } |
| |
| @Test |
| fun testCancelledFutureAsDeferredAwaitThrowsCancellation() = runTest { |
| val future = Futures.immediateCancelledFuture<Int>() |
| val asDeferred = future.asDeferred() |
| |
| assertTrue(asDeferred.isCancelled) |
| assertFailsWith<CancellationException> { |
| asDeferred.await() |
| } |
| } |
| |
| @Test |
| fun testCancelledFutureAsDeferredJoinDoesNotThrow() = runTest { |
| val future = Futures.immediateCancelledFuture<Int>() |
| val asDeferred = future.asDeferred() |
| |
| assertTrue(asDeferred.isCancelled) |
| assertTrue(asDeferred.isCompleted) |
| // By documentation, join() shouldn't throw when asDeferred is already complete. |
| asDeferred.join() |
| assertTrue(asDeferred.getCompletionExceptionOrNull() is CancellationException) |
| } |
| |
| @Test |
| fun testCompletedFutureAsDeferred() = runTest { |
| val future = SettableFuture.create<Int>() |
| val task = async { |
| expect(2) |
| assertEquals(42, future.asDeferred().await()) |
| expect(4) |
| } |
| |
| expect(1) |
| yield() |
| expect(3) |
| future.set(42) |
| task.join() |
| finish(5) |
| } |
| |
| @Test |
| fun testFailedFutureAsDeferred() = runTest { |
| val future = SettableFuture.create<Int>().apply { |
| setException(TestException()) |
| } |
| val deferred = future.asDeferred() |
| assertTrue(deferred.isCancelled && deferred.isCompleted) |
| val completionException = deferred.getCompletionExceptionOrNull()!! |
| assertTrue(completionException is TestException) |
| |
| try { |
| deferred.await() |
| expectUnreached() |
| } catch (e: Throwable) { |
| assertTrue(e is TestException) |
| } |
| } |
| |
| @Test |
| fun testFutureCompletedWithNullFastPathAsDeferred() = runTest { |
| val executor = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool()) |
| val future = executor.submit(Callable<Int> { null }).also { |
| @Suppress("BlockingMethodInNonBlockingContext") |
| it.get() |
| } |
| assertNull(future.asDeferred().await()) |
| } |
| |
| @Test |
| fun testFutureCompletedWithNullSlowPathAsDeferred() = runTest { |
| val latch = CountDownLatch(1) |
| val executor = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool()) |
| |
| val future = executor.submit(Callable<Int> { |
| latch.await() |
| null |
| }) |
| |
| val awaiter = async(start = CoroutineStart.UNDISPATCHED) { |
| future.asDeferred().await() |
| } |
| |
| latch.countDown() |
| assertNull(awaiter.await()) |
| } |
| |
| @Test |
| fun testThrowingFutureAsDeferred() = runTest { |
| val executor = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool()) |
| val future = executor.submit(Callable { throw TestException() }) |
| try { |
| future.asDeferred().await() |
| expectUnreached() |
| } catch (e: Throwable) { |
| assertTrue(e is TestException) |
| } |
| } |
| |
| @Test |
| fun testStructuredException() = runTest( |
| expected = { it is TestException } // exception propagates to parent with structured concurrency |
| ) { |
| val result = future<Int>(Dispatchers.Unconfined) { |
| throw TestException("FAIL") |
| } |
| result.checkFutureException<TestException>() |
| } |
| |
| @Test |
| fun testChildException() = runTest( |
| expected = { it is TestException } // exception propagates to parent with structured concurrency |
| ) { |
| val result = future(Dispatchers.Unconfined) { |
| // child crashes |
| launch { throw TestException("FAIL") } |
| 42 |
| } |
| result.checkFutureException<TestException>() |
| } |
| |
| @Test |
| fun testExternalCancellation() = runTest { |
| val future = future(Dispatchers.Unconfined) { |
| try { |
| delay(Long.MAX_VALUE) |
| expectUnreached() |
| } catch (e: CancellationException) { |
| expect(2) |
| throw e |
| } |
| } |
| |
| yield() |
| expect(1) |
| future.cancel(true) |
| finish(3) |
| } |
| |
| @Test |
| fun testExceptionOnExternalCancellation() = runTest(expected = {it is TestException}) { |
| val result = future(Dispatchers.Unconfined) { |
| try { |
| expect(1) |
| delay(Long.MAX_VALUE) |
| expectUnreached() |
| } catch (e: CancellationException) { |
| expect(3) |
| throw TestException() |
| } |
| } |
| expect(2) |
| result.cancel(true) |
| finish(4) |
| } |
| |
| @Test |
| fun testUnhandledExceptionOnExternalCancellation() = runTest { |
| expect(1) |
| // No parent here (NonCancellable), so nowhere to propagate exception |
| val result = future(NonCancellable + Dispatchers.Unconfined) { |
| try { |
| delay(Long.MAX_VALUE) |
| } finally { |
| expect(2) |
| throw TestException() // this exception cannot be handled and is set to be lost. |
| } |
| } |
| result.cancel(true) |
| finish(3) |
| } |
| |
| /** This test ensures that we never pass [CancellationException] to [CoroutineExceptionHandler]. */ |
| @Test |
| fun testCancellationExceptionOnExternalCancellation() = runTest { |
| expect(1) |
| // No parent here (NonCancellable), so nowhere to propagate exception |
| val result = future(NonCancellable + Dispatchers.Unconfined) { |
| try { |
| delay(Long.MAX_VALUE) |
| } finally { |
| expect(2) |
| throw TestCancellationException() // this exception cannot be handled |
| } |
| } |
| assertTrue(result.cancel(true)) |
| finish(3) |
| } |
| |
| @Test |
| fun testCancellingFutureContextJobCancelsFuture() = runTest { |
| expect(1) |
| val supervisorJob = SupervisorJob() |
| val future = future(context = supervisorJob) { |
| expect(2) |
| try { |
| delay(Long.MAX_VALUE) |
| expectUnreached() |
| } catch (e: CancellationException) { |
| expect(4) |
| throw e |
| } |
| } |
| yield() |
| expect(3) |
| supervisorJob.cancel(CancellationException("Parent cancelled", TestException())) |
| supervisorJob.join() |
| assertTrue(future.isDone) |
| assertTrue(future.isCancelled) |
| val thrown = assertFailsWith<CancellationException> { future.get() } |
| val cause = thrown.cause |
| assertNotNull(cause) |
| assertTrue(cause is CancellationException) |
| assertEquals("Parent cancelled", cause.message) |
| assertTrue(cause.cause is TestException) |
| finish(5) |
| } |
| |
| @Test |
| fun testFutureChildException() = runTest { |
| val future = future(context = NonCancellable + Dispatchers.Unconfined) { |
| val foo = async { delay(Long.MAX_VALUE); 42 } |
| val bar = async<Int> { throw TestException() } |
| foo.await() + bar.await() |
| } |
| future.checkFutureException<TestException>() |
| } |
| |
| @Test |
| fun testFutureIsDoneAfterChildrenCompleted() = runTest { |
| expect(1) |
| val testException = TestException() |
| // Don't propagate exception to the test and use different dispatchers as we are going to block test thread. |
| val future = future(context = NonCancellable + Dispatchers.Default) { |
| val foo = async { |
| try { |
| delay(Long.MAX_VALUE) |
| 42 |
| } finally { |
| withContext(NonCancellable) { |
| delay(200) |
| } |
| } |
| } |
| foo.invokeOnCompletion { |
| expect(3) |
| } |
| val bar = async<Int> { throw testException } |
| foo.await() + bar.await() |
| } |
| yield() |
| expect(2) |
| // Blocking get should succeed after internal coroutine completes. |
| val thrown = assertFailsWith<ExecutionException> { future.get() } |
| expect(4) |
| assertEquals(testException, thrown.cause) |
| finish(5) |
| } |
| |
| @Test |
| @Ignore // TODO: propagate cancellation before running listeners. |
| fun testFuturePropagatesCancellationBeforeRunningListeners() = runTest { |
| expect(1) |
| val future = future(context = Dispatchers.Unconfined) { |
| try { |
| delay(Long.MAX_VALUE) |
| } finally { |
| expect(3) // Cancelled. |
| } |
| } |
| future.addListener(Runnable { expect(4) }, MoreExecutors.directExecutor()) |
| assertFalse(future.isDone) |
| expect(2) |
| future.cancel(false) |
| assertTrue(future.isDone) |
| assertTrue(future.isCancelled) |
| finish(5) |
| } |
| |
| @Test |
| fun testFutureCompletedExceptionally() = runTest { |
| val testException = TestException() |
| // NonCancellable to not propagate error to this scope. |
| val future = future(context = NonCancellable) { |
| throw testException |
| } |
| yield() |
| assertTrue(future.isDone) |
| assertFalse(future.isCancelled) |
| val thrown = assertFailsWith<ExecutionException> { future.get() } |
| assertEquals(testException, thrown.cause) |
| } |
| |
| @Test |
| fun testAsListenableFutureCompletedExceptionally() = runTest { |
| val testException = TestException() |
| val deferred = CompletableDeferred<String>().apply { |
| completeExceptionally(testException) |
| } |
| val asListenableFuture = deferred.asListenableFuture() |
| assertTrue(asListenableFuture.isDone) |
| assertFalse(asListenableFuture.isCancelled) |
| val thrown = assertFailsWith<ExecutionException> { asListenableFuture.get() } |
| assertEquals(testException, thrown.cause) |
| } |
| |
| private inline fun <reified T: Throwable> ListenableFuture<*>.checkFutureException() { |
| val e = assertFailsWith<ExecutionException> { get() } |
| val cause = e.cause!! |
| assertTrue(cause is T) |
| } |
| |
| @Suppress("SuspendFunctionOnCoroutineScope") |
| private suspend fun CoroutineScope.awaitFutureWithCancel(cancellable: Boolean): ListenableFuture<Int> { |
| val latch = CountDownLatch(1) |
| val executor = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool()) |
| val future = executor.submit(Callable { latch.await(); 42 }) |
| val deferred = async { |
| expect(2) |
| if (cancellable) future.await() |
| else future.asDeferred().await() |
| } |
| expect(1) |
| yield() |
| deferred.cancel() |
| expect(3) |
| latch.countDown() |
| return future |
| } |
| |
| @Test |
| fun testCancelledParent() = runTest({ it is CancellationException }) { |
| cancel() |
| future { expectUnreached() } |
| future(start = CoroutineStart.ATOMIC) { } |
| future(start = CoroutineStart.UNDISPATCHED) { } |
| } |
| |
| @Test |
| fun testStackOverflow() = runTest { |
| val future = SettableFuture.create<Int>() |
| val completed = AtomicLong() |
| val count = 10000L |
| val children = ArrayList<Job>() |
| for (i in 0 until count) { |
| children += launch(Dispatchers.Default) { |
| future.asDeferred().await() |
| completed.incrementAndGet() |
| } |
| } |
| future.set(1) |
| withTimeout(60_000) { |
| children.forEach { it.join() } |
| assertEquals(count, completed.get()) |
| } |
| } |
| |
| @Test |
| fun testFuturePropagatesExceptionToParentAfterCancellation() = runTest { |
| val throwLatch = CompletableDeferred<Boolean>() |
| val cancelLatch = CompletableDeferred<Boolean>() |
| val parent = Job() |
| val scope = CoroutineScope(parent) |
| val exception = TestException("propagated to parent") |
| val future = scope.future { |
| cancelLatch.complete(true) |
| withContext(NonCancellable) { |
| throwLatch.await() |
| throw exception |
| } |
| } |
| cancelLatch.await() |
| future.cancel(true) |
| throwLatch.complete(true) |
| parent.join() |
| assertTrue(parent.isCancelled) |
| assertEquals(exception, parent.getCancellationException().cause) |
| } |
| |
| // Stress tests. |
| |
| @Test |
| fun testFutureDoesNotReportToCoroutineExceptionHandler() = runTest { |
| repeat(1000) { |
| supervisorScope { // Don't propagate failures in children to parent and other children. |
| val innerFuture = SettableFuture.create<Unit>() |
| val outerFuture = async { innerFuture.await() } |
| |
| withContext(Dispatchers.Default) { |
| launch { innerFuture.setException(TestException("can be lost")) } |
| launch { outerFuture.cancel() } |
| // nothing should be reported to CoroutineExceptionHandler, otherwise `Future.cancel` contract violation. |
| } |
| } |
| } |
| } |
| |
| @Test |
| fun testJobListenableFutureIsCancelledDoesNotThrow() = runTest { |
| repeat(1000) { |
| val deferred = CompletableDeferred<String>() |
| val asListenableFuture = deferred.asListenableFuture() |
| // We heed two threads to test a race condition. |
| withContext(Dispatchers.Default) { |
| val cancellationJob = launch { |
| asListenableFuture.cancel(false) |
| } |
| while (!cancellationJob.isCompleted) { |
| asListenableFuture.isCancelled // Shouldn't throw. |
| } |
| } |
| } |
| } |
| } |