// blob: 511b1b0322a3355cbb0b6baca44fcd91080e4e47 [file] [log] [blame]
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.guava
import com.google.common.util.concurrent.*
import kotlinx.coroutines.*
import org.junit.*
import org.junit.Ignore
import org.junit.Test
import java.util.concurrent.*
import java.util.concurrent.CancellationException
import java.util.concurrent.atomic.*
import kotlin.test.*
class ListenableFutureTest : TestBase() {
/**
 * Runs before every test and passes the `ForkJoinPool.commonPool-worker-` thread-name prefix to
 * `ignoreLostThreads`.
 */
@Before
fun setup() {
// NOTE(review): presumably lets the base class tolerate common-pool worker threads that outlive a
// test (several tests below submit work to ForkJoinPool.commonPool()) — confirm against TestBase.
ignoreLostThreads("ForkJoinPool.commonPool-worker-")
}
/**
 * Submits a task to a listening decorator over the common pool, awaits it inside a
 * `GlobalScope.future` coroutine, and checks that the concatenated result is "OK".
 */
@Test
fun testSimpleAwait() {
    val pool = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool())
    val combined = GlobalScope.future {
        val submitted = pool.submit(Callable<String> { "O" })
        submitted.await() + "K"
    }
    assertEquals("OK", combined.get())
}
/** A future awaited under `withContext(Dispatchers.Default)` yields the value that is set on it afterwards. */
@Test
fun testAwaitWithContext() = runTest {
    val settable = SettableFuture.create<Int>()
    val awaiter = async {
        withContext(Dispatchers.Default) { settable.await() }
    }
    settable.set(1)
    assertEquals(1, awaiter.await())
}
/**
 * Cancels the awaiting coroutine with a [TestCancellationException]; the test body is expected to
 * fail with that exception type when it awaits the cancelled deferred.
 */
@Test
fun testAwaitWithCancellation() = runTest(expected = { failure -> failure is TestCancellationException }) {
    val neverCompleted = SettableFuture.create<Int>()
    val awaiter = async {
        withContext(Dispatchers.Default) { neverCompleted.await() }
    }
    awaiter.cancel(TestCancellationException())
    awaiter.await() // throws TCE
    expectUnreached()
}
/** Awaiting a future that already holds a value yields that value. */
@Test
fun testCompletedFuture() {
    val completed = SettableFuture.create<String>().apply { set("O") }
    val combined = GlobalScope.future { completed.await() + "K" }
    assertEquals("OK", combined.get())
}
/** The wrapping future is not done while the awaited future is pending, and completes once it is set. */
@Test
fun testWaitForFuture() {
    val pending = SettableFuture.create<String>()
    val combined = GlobalScope.future { pending.await() + "K" }
    assertFalse(combined.isDone)
    pending.set("O")
    assertEquals("OK", combined.get())
}
/**
 * Awaiting a future that already failed rethrows the original [IllegalArgumentException]; its
 * message is used to build the final "OK" result.
 */
@Test
fun testCompletedFutureExceptionally() {
    val failed = SettableFuture.create<String>().apply {
        setException(IllegalArgumentException("O"))
    }
    val combined = GlobalScope.future {
        val prefix = try {
            failed.await()
        } catch (failure: RuntimeException) {
            assertTrue(failure is IllegalArgumentException)
            failure.message!!
        }
        prefix + "K"
    }
    assertEquals("OK", combined.get())
}
/**
 * Same as the already-failed case, but the awaited future fails only after the coroutine has
 * started waiting on it.
 */
@Test
fun testWaitForFutureWithException() {
    val pending = SettableFuture.create<String>()
    val combined = GlobalScope.future {
        val prefix = try {
            pending.await()
        } catch (failure: RuntimeException) {
            assertTrue(failure is IllegalArgumentException)
            failure.message!!
        }
        prefix + "K"
    }
    assertFalse(combined.isDone)
    pending.setException(IllegalArgumentException("O"))
    assertEquals("OK", combined.get())
}
/**
 * An exception thrown inside the `future` coroutine after a successful `await()` is reported by
 * the blocking `get()` as an [ExecutionException] whose cause is the original exception.
 */
@Test
fun testExceptionInsideCoroutine() {
    val service = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool())
    val future = GlobalScope.future {
        if (service.submit(Callable<Boolean> { true }).await()) {
            throw IllegalStateException("OK")
        }
        "fail"
    }
    // assertFailsWith fails the test if `get` returns normally, matching the other tests in this class.
    val failure = assertFailsWith<ExecutionException>("'get' should have thrown an exception") {
        future.get()
    }
    assertTrue(failure.cause is IllegalStateException)
    assertEquals("OK", failure.cause?.message)
}
/** Requesting `CoroutineStart.LAZY` from the `future` builder fails with an [IllegalArgumentException]. */
@Test
fun testFutureLazyStartThrows() {
    expect(1)
    val rejection = assertFailsWith<IllegalArgumentException> {
        GlobalScope.future(start = CoroutineStart.LAZY) {}
    }
    assertEquals("LAZY start is not supported", rejection.message)
    finish(2)
}
/** Converts a deferred that was started undispatched into a listenable future and awaits its value. */
@Test
fun testCompletedDeferredAsListenableFuture() = runBlocking {
    expect(1)
    val completed = async(start = CoroutineStart.UNDISPATCHED) {
        expect(2) // completed right away
        "OK"
    }
    expect(3)
    val asFuture = completed.asListenableFuture()
    assertEquals("OK", asFuture.await())
    finish(4)
}
/** Converts a not-yet-started deferred into a listenable future; awaiting it lets the deferred run. */
@Test
fun testWaitForDeferredAsListenableFuture() = runBlocking {
    expect(1)
    val pending = async {
        expect(3) // will complete later
        "OK"
    }
    expect(2)
    val asFuture = pending.asListenableFuture()
    assertEquals("OK", asFuture.await()) // await yields main thread to deferred coroutine
    finish(4)
}
/**
 * A deferred that fails with a non-`Exception` [Throwable] ([OutOfMemoryError]) surfaces through
 * `asListenableFuture().get()` as an [ExecutionException] carrying that error as its cause.
 */
@Test
fun testAsListenableFutureThrowable() {
    val deferred = GlobalScope.async {
        throw OutOfMemoryError()
    }
    val future = deferred.asListenableFuture()
    // The original try/catch had no failure branch, so a `get()` that returned normally would have
    // let the test pass without checking anything.
    val failure = assertFailsWith<ExecutionException> { future.get() }
    assertTrue(future.isDone)
    assertTrue(failure.cause is OutOfMemoryError)
}
/**
 * Cancels a job that is suspended in `await()` and checks, through the `expect` sequence, that the
 * job observes a [CancellationException] even though the future is completed afterwards.
 */
@Test
fun testCancellableAwait() = runBlocking {
    expect(1)
    val awaited = SettableFuture.create<String>()
    val waiter = launch(start = CoroutineStart.UNDISPATCHED) {
        expect(2)
        try {
            awaited.await() // suspends
        } catch (cancellation: CancellationException) {
            expect(5) // should throw cancellation exception
            throw cancellation
        }
    }
    expect(3)
    waiter.cancel() // cancel the job
    awaited.set("fail") // too late, the waiting job was already cancelled
    expect(4) // job processing of cancellation was scheduled, not executed yet
    yield() // yield main thread to job
    finish(6)
}
/**
 * Cancels (with interruption allowed) a future that a deferred is awaiting and checks that both the
 * future and the deferred end up cancelled.
 */
@Test
fun testFutureAwaitCancellationPropagatingToDeferred() = runTest {
    val gate = CountDownLatch(1)
    val pool = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool())
    val blocked = pool.submit(Callable {
        gate.await()
        42
    })
    val awaiter = async {
        expect(2)
        blocked.await()
    }
    expect(1)
    yield()
    blocked.cancel(/*mayInterruptIfRunning=*/true)
    expect(3)
    gate.countDown()
    awaiter.join()
    assertTrue(blocked.isCancelled)
    assertTrue(awaiter.isCancelled)
    assertFailsWith<CancellationException> { blocked.get() }
    finish(4)
}
/**
 * Cancels (without interruption) a future that a deferred is awaiting and checks that both the
 * future and the deferred end up cancelled.
 */
@Test
fun testFutureAwaitCancellationPropagatingToDeferredNoInterruption() = runTest {
    val gate = CountDownLatch(1)
    val pool = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool())
    val blocked = pool.submit(Callable {
        gate.await()
        42
    })
    val awaiter = async {
        expect(2)
        blocked.await()
    }
    expect(1)
    yield()
    blocked.cancel(/*mayInterruptIfRunning=*/false)
    expect(3)
    gate.countDown()
    awaiter.join()
    assertTrue(blocked.isCancelled)
    assertTrue(awaiter.isCancelled)
    assertFailsWith<CancellationException> { blocked.get() }
    finish(4)
}
/**
 * Cancels (with interruption allowed) the `asListenableFuture()` view of a deferred and checks that
 * the view, the deferred, and the future the deferred was awaiting are all cancelled.
 */
@Test
fun testAsListenableFutureCancellationPropagatingToDeferred() = runTest {
    val gate = CountDownLatch(1)
    val pool = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool())
    val blocked = pool.submit(Callable {
        gate.await()
        42
    })
    val awaiter = async {
        expect(2)
        blocked.await()
    }
    val awaiterAsFuture = awaiter.asListenableFuture()
    expect(1)
    yield()
    awaiterAsFuture.cancel(/*mayInterruptIfRunning=*/true)
    expect(3)
    gate.countDown()
    awaiter.join()
    assertTrue(blocked.isCancelled)
    assertTrue(awaiter.isCancelled)
    assertTrue(awaiterAsFuture.isCancelled)
    assertFailsWith<CancellationException> { blocked.get() }
    finish(4)
}
/**
 * Cancels (without interruption) the `asListenableFuture()` view of a deferred and checks that the
 * view, the deferred, and the future the deferred was awaiting are all cancelled.
 */
@Test
fun testAsListenableFutureCancellationPropagatingToDeferredNoInterruption() = runTest {
    val gate = CountDownLatch(1)
    val pool = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool())
    val blocked = pool.submit(Callable {
        gate.await()
        42
    })
    val awaiter = async {
        expect(2)
        blocked.await()
    }
    val awaiterAsFuture = awaiter.asListenableFuture()
    expect(1)
    yield()
    awaiterAsFuture.cancel(/*mayInterruptIfRunning=*/false)
    expect(3)
    gate.countDown()
    awaiter.join()
    assertFailsWith<CancellationException> { awaiterAsFuture.get() }
    assertTrue(blocked.isCancelled)
    assertTrue(awaiterAsFuture.isCancelled)
    assertTrue(awaiter.isCancelled)
    assertFailsWith<CancellationException> { blocked.get() }
    finish(4)
}
/**
 * Cancels the awaited [SettableFuture] indirectly, by delegating it to an already-cancelled future
 * via `setFuture`, and checks that the cancellation reaches the awaiting deferred and its
 * `asListenableFuture()` view.
 */
@Test
fun testAsListenableFutureCancellationThroughSetFuture() = runTest {
    // No latch is needed here: nothing in this test blocks on one.
    val future = SettableFuture.create<Void>()
    val deferred = async {
        expect(2)
        future.await()
    }
    val asListenableFuture = deferred.asListenableFuture()
    expect(1)
    yield()
    future.setFuture(Futures.immediateCancelledFuture())
    expect(3)
    deferred.join()
    assertFailsWith<CancellationException> { asListenableFuture.get() }
    // Future was not interrupted, but also wasn't blocking, so it will be successfully
    // cancelled by its parent Coroutine.
    assertTrue(future.isCancelled)
    assertTrue(asListenableFuture.isCancelled)
    assertTrue(deferred.isCancelled)
    assertFailsWith<CancellationException> { future.get() }
    finish(4)
}
/**
 * Currently ignored. Encodes the desired ordering: cancelling the `asListenableFuture()` view
 * should cancel the deferred (step 3) before the future's listeners run (step 4).
 */
@Test
@Ignore // TODO: propagate cancellation before running listeners.
fun testAsListenableFuturePropagatesCancellationBeforeRunningListeners() = runTest {
    expect(1)
    val sleeper = async(context = Dispatchers.Unconfined) {
        try {
            delay(Long.MAX_VALUE)
        } finally {
            expect(3) // Cancelled.
        }
    }
    val sleeperAsFuture = sleeper.asListenableFuture()
    sleeperAsFuture.addListener(Runnable { expect(4) }, MoreExecutors.directExecutor())
    assertFalse(sleeperAsFuture.isDone)
    expect(2)
    sleeperAsFuture.cancel(false)
    assertTrue(sleeperAsFuture.isDone)
    assertTrue(sleeperAsFuture.isCancelled)
    assertFailsWith<CancellationException> { sleeper.await() }
    finish(5)
}
/** With the cancellable `await()` path, cancelling the awaiting deferred cancels the future too. */
@Test
fun testFutureCancellation() = runTest {
    val awaited = awaitFutureWithCancel(cancellable = true)
    assertTrue(awaited.isCancelled)
    assertFailsWith<CancellationException> { awaited.get() }
    finish(4)
}
/**
 * A deferred cancelled with a [CancellationException] that has a message and a cause exposes that
 * exception as the cause of the [CancellationException] thrown by `asListenableFuture().get()`.
 */
@Test
fun testAsListenableDeferredCancellationCauseAndMessagePropagate() = runTest {
    val deferred = CompletableDeferred<Int>()
    val inputCancellationException = CancellationException("Foobar")
    inputCancellationException.initCause(OutOfMemoryError("Foobaz"))
    deferred.cancel(inputCancellationException)
    val asFuture = deferred.asListenableFuture()
    val outputCancellationException =
        assertFailsWith<CancellationException> { asFuture.get() }
    val cause = outputCancellationException.cause
    assertNotNull(cause)
    // assertEquals takes (expected, actual); the reversed order would mislabel values on failure.
    assertEquals("Foobar", cause.message)
    assertTrue(cause.cause is OutOfMemoryError)
    assertEquals("Foobaz", cause.cause?.message)
}
/** With the `asDeferred().await()` path, cancelling the awaiting deferred leaves the future running to its value. */
@Test
fun testNoFutureCancellation() = runTest {
    val awaited = awaitFutureWithCancel(cancellable = false)
    assertFalse(awaited.isCancelled)
    @Suppress("BlockingMethodInNonBlockingContext")
    assertEquals(42, awaited.get())
    finish(4)
}
/** Round trip: cancelled future -> deferred -> listenable future stays cancelled and `await()` throws. */
@Test
fun testCancelledDeferredAsListenableFutureAwaitThrowsCancellation() = runTest {
    val cancelled = Futures.immediateCancelledFuture<Int>()
    val roundTripped = cancelled.asDeferred().asListenableFuture()
    assertTrue(roundTripped.isCancelled)
    assertFailsWith<CancellationException> { roundTripped.await() }
}
/** Round trip: cancelled deferred -> listenable future -> deferred keeps the cancellation state and exception. */
@Test
fun testCancelledDeferredAsListenableFutureAsDeferredPassesCancellationAlong() = runTest {
    val cancelled = CompletableDeferred<Int>()
    cancelled.completeExceptionally(CancellationException())
    val roundTripped = cancelled.asListenableFuture().asDeferred()
    assertTrue(roundTripped.isCancelled)
    assertTrue(roundTripped.isCompleted)
    // By documentation, join() shouldn't throw when asDeferred is already complete.
    roundTripped.join()
    assertTrue(roundTripped.getCompletionExceptionOrNull() is CancellationException)
}
/** A cancelled future viewed as a deferred is cancelled, and awaiting it throws [CancellationException]. */
@Test
fun testCancelledFutureAsDeferredAwaitThrowsCancellation() = runTest {
    val cancelledAsDeferred = Futures.immediateCancelledFuture<Int>().asDeferred()
    assertTrue(cancelledAsDeferred.isCancelled)
    assertFailsWith<CancellationException> { cancelledAsDeferred.await() }
}
/** A cancelled future viewed as a deferred can be joined without an exception being thrown. */
@Test
fun testCancelledFutureAsDeferredJoinDoesNotThrow() = runTest {
    val cancelledAsDeferred = Futures.immediateCancelledFuture<Int>().asDeferred()
    assertTrue(cancelledAsDeferred.isCancelled)
    assertTrue(cancelledAsDeferred.isCompleted)
    // By documentation, join() shouldn't throw when asDeferred is already complete.
    cancelledAsDeferred.join()
    assertTrue(cancelledAsDeferred.getCompletionExceptionOrNull() is CancellationException)
}
/** A coroutine awaiting `asDeferred()` resumes with the value once the underlying future is set. */
@Test
fun testCompletedFutureAsDeferred() = runTest {
    val settable = SettableFuture.create<Int>()
    val awaiter = async {
        expect(2)
        assertEquals(42, settable.asDeferred().await())
        expect(4)
    }
    expect(1)
    yield()
    expect(3)
    settable.set(42)
    awaiter.join()
    finish(5)
}
/**
 * A future that already failed with [TestException] is exposed by `asDeferred()` as a deferred
 * that is both cancelled and completed, reports that exception as its completion exception, and
 * rethrows it from `await()`.
 */
@Test
fun testFailedFutureAsDeferred() = runTest {
    val future = SettableFuture.create<Int>().apply {
        setException(TestException())
    }
    val deferred = future.asDeferred()
    assertTrue(deferred.isCancelled && deferred.isCompleted)
    // A null completion exception now fails this assertion instead of raising an NPE via `!!`.
    assertTrue(deferred.getCompletionExceptionOrNull() is TestException)
    // assertFailsWith reports "nothing was thrown" directly; the previous catch-all on Throwable
    // also caught the expectUnreached() failure and re-reported it as a bare assertTrue failure.
    assertFailsWith<TestException> { deferred.await() }
}
/**
 * A future that has already completed with `null` (it is blocked on with `get()` before the
 * conversion) yields `null` from `asDeferred().await()`.
 */
@Test
fun testFutureCompletedWithNullFastPathAsDeferred() = runTest {
    val pool = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool())
    val completed = pool.submit(Callable<Int> { null })
    @Suppress("BlockingMethodInNonBlockingContext")
    completed.get()
    assertNull(completed.asDeferred().await())
}
/**
 * A future that completes with `null` only after the awaiting coroutine has started (the task is
 * held back by a latch) yields `null` from `asDeferred().await()`.
 */
@Test
fun testFutureCompletedWithNullSlowPathAsDeferred() = runTest {
    val gate = CountDownLatch(1)
    val pool = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool())
    val pending = pool.submit(Callable<Int> {
        gate.await()
        null
    })
    val awaiter = async(start = CoroutineStart.UNDISPATCHED) { pending.asDeferred().await() }
    gate.countDown()
    assertNull(awaiter.await())
}
/**
 * A task that throws [TestException] on the executor makes `asDeferred().await()` throw that same
 * exception type.
 */
@Test
fun testThrowingFutureAsDeferred() = runTest {
    val executor = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool())
    val future = executor.submit(Callable { throw TestException() })
    // assertFailsWith reports "nothing was thrown" directly; the previous catch-all on Throwable
    // also caught the expectUnreached() failure and re-reported it as a bare assertTrue failure.
    assertFailsWith<TestException> { future.asDeferred().await() }
}
/** A `future` child that throws fails both the returned future and the enclosing test scope. */
@Test
fun testStructuredException() = runTest(
    expected = { failure -> failure is TestException } // exception propagates to parent with structured concurrency
) {
    val failing = future<Int>(Dispatchers.Unconfined) { throw TestException("FAIL") }
    failing.checkFutureException<TestException>()
}
/** A crashing child launched inside a `future` fails both the returned future and the enclosing test scope. */
@Test
fun testChildException() = runTest(
    expected = { failure -> failure is TestException } // exception propagates to parent with structured concurrency
) {
    val failing = future(Dispatchers.Unconfined) {
        // child crashes
        launch { throw TestException("FAIL") }
        42
    }
    failing.checkFutureException<TestException>()
}
/** Cancelling the returned future from outside delivers a [CancellationException] to the coroutine body. */
@Test
fun testExternalCancellation() = runTest {
    val sleeper = future(Dispatchers.Unconfined) {
        try {
            delay(Long.MAX_VALUE)
            expectUnreached()
        } catch (cancellation: CancellationException) {
            expect(2)
            throw cancellation
        }
    }
    yield()
    expect(1)
    sleeper.cancel(true)
    finish(3)
}
/**
 * When the coroutine body answers an external cancellation by throwing [TestException], the test
 * scope is expected to fail with that exception.
 */
@Test
fun testExceptionOnExternalCancellation() = runTest(expected = { failure -> failure is TestException }) {
    val sleeper = future(Dispatchers.Unconfined) {
        try {
            expect(1)
            delay(Long.MAX_VALUE)
            expectUnreached()
        } catch (cancellation: CancellationException) {
            expect(3)
            throw TestException()
        }
    }
    expect(2)
    sleeper.cancel(true)
    finish(4)
}
/**
 * A parentless (`NonCancellable`) future whose `finally` block throws [TestException] during
 * external cancellation; the test itself still finishes normally.
 */
@Test
fun testUnhandledExceptionOnExternalCancellation() = runTest {
    expect(1)
    // No parent here (NonCancellable), so nowhere to propagate exception
    val orphan = future(NonCancellable + Dispatchers.Unconfined) {
        try {
            delay(Long.MAX_VALUE)
        } finally {
            expect(2)
            throw TestException() // this exception cannot be handled and is set to be lost.
        }
    }
    orphan.cancel(true)
    finish(3)
}
/**
 * Ensures that a [CancellationException] is never passed to [CoroutineExceptionHandler]: the
 * parentless future throws one from `finally` while being cancelled externally.
 */
@Test
fun testCancellationExceptionOnExternalCancellation() = runTest {
    expect(1)
    // No parent here (NonCancellable), so nowhere to propagate exception
    val orphan = future(NonCancellable + Dispatchers.Unconfined) {
        try {
            delay(Long.MAX_VALUE)
        } finally {
            expect(2)
            throw TestCancellationException() // this exception cannot be handled
        }
    }
    assertTrue(orphan.cancel(true))
    finish(3)
}
/**
 * Cancelling the job supplied as the future's context cancels the future; the job's cancellation
 * exception (message and cause) is reachable as the cause of what `get()` throws.
 */
@Test
fun testCancellingFutureContextJobCancelsFuture() = runTest {
    expect(1)
    val contextJob = SupervisorJob()
    val sleeper = future(context = contextJob) {
        expect(2)
        try {
            delay(Long.MAX_VALUE)
            expectUnreached()
        } catch (cancellation: CancellationException) {
            expect(4)
            throw cancellation
        }
    }
    yield()
    expect(3)
    contextJob.cancel(CancellationException("Parent cancelled", TestException()))
    contextJob.join()
    assertTrue(sleeper.isDone)
    assertTrue(sleeper.isCancelled)
    val thrown = assertFailsWith<CancellationException> { sleeper.get() }
    val parentCancellation = thrown.cause
    assertNotNull(parentCancellation)
    assertTrue(parentCancellation is CancellationException)
    assertEquals("Parent cancelled", parentCancellation.message)
    assertTrue(parentCancellation.cause is TestException)
    finish(5)
}
/** A failing `async` child inside a parentless future makes the future fail with the child's exception. */
@Test
fun testFutureChildException() = runTest {
    val failing = future(context = NonCancellable + Dispatchers.Unconfined) {
        val sleeper = async {
            delay(Long.MAX_VALUE)
            42
        }
        val thrower = async<Int> { throw TestException() }
        sleeper.await() + thrower.await()
    }
    failing.checkFutureException<TestException>()
}
/**
 * The blocking `get()` on a failing future returns only after its slow-to-cancel child has
 * completed: the child's completion handler (step 3) runs before `get()` returns (step 4).
 */
@Test
fun testFutureIsDoneAfterChildrenCompleted() = runTest {
    expect(1)
    val expectedFailure = TestException()
    // Don't propagate exception to the test and use different dispatchers as we are going to block test thread.
    val failing = future(context = NonCancellable + Dispatchers.Default) {
        val slowToCancel = async {
            try {
                delay(Long.MAX_VALUE)
                42
            } finally {
                withContext(NonCancellable) { delay(200) }
            }
        }
        slowToCancel.invokeOnCompletion { expect(3) }
        val thrower = async<Int> { throw expectedFailure }
        slowToCancel.await() + thrower.await()
    }
    yield()
    expect(2)
    // Blocking get should succeed after internal coroutine completes.
    val thrown = assertFailsWith<ExecutionException> { failing.get() }
    expect(4)
    assertEquals(expectedFailure, thrown.cause)
    finish(5)
}
/**
 * Currently ignored. Encodes the desired ordering: cancelling the future should cancel its
 * coroutine (step 3) before the future's listeners run (step 4).
 */
@Test
@Ignore // TODO: propagate cancellation before running listeners.
fun testFuturePropagatesCancellationBeforeRunningListeners() = runTest {
    expect(1)
    val sleeper = future(context = Dispatchers.Unconfined) {
        try {
            delay(Long.MAX_VALUE)
        } finally {
            expect(3) // Cancelled.
        }
    }
    sleeper.addListener(Runnable { expect(4) }, MoreExecutors.directExecutor())
    assertFalse(sleeper.isDone)
    expect(2)
    sleeper.cancel(false)
    assertTrue(sleeper.isDone)
    assertTrue(sleeper.isCancelled)
    finish(5)
}
/** A future whose body throws is done but not cancelled, and `get()` wraps the exception in [ExecutionException]. */
@Test
fun testFutureCompletedExceptionally() = runTest {
    val expectedFailure = TestException()
    // NonCancellable to not propagate error to this scope.
    val failing = future(context = NonCancellable) { throw expectedFailure }
    yield()
    assertTrue(failing.isDone)
    assertFalse(failing.isCancelled)
    val thrown = assertFailsWith<ExecutionException> { failing.get() }
    assertEquals(expectedFailure, thrown.cause)
}
/**
 * A deferred completed exceptionally is exposed by `asListenableFuture()` as done but not
 * cancelled, and `get()` wraps the exception in [ExecutionException].
 */
@Test
fun testAsListenableFutureCompletedExceptionally() = runTest {
    val expectedFailure = TestException()
    val failed = CompletableDeferred<String>()
    failed.completeExceptionally(expectedFailure)
    val failedAsFuture = failed.asListenableFuture()
    assertTrue(failedAsFuture.isDone)
    assertFalse(failedAsFuture.isCancelled)
    val thrown = assertFailsWith<ExecutionException> { failedAsFuture.get() }
    assertEquals(expectedFailure, thrown.cause)
}
/**
 * Asserts that blocking on this future fails with an [ExecutionException] whose cause is a [T].
 *
 * A missing or mismatched cause is reported as an assertion failure that names the expected type
 * and the actual cause, rather than as a `NullPointerException` from `!!`.
 */
private inline fun <reified T : Throwable> ListenableFuture<*>.checkFutureException() {
    val failure = assertFailsWith<ExecutionException> { get() }
    val cause = failure.cause
    assertTrue(cause is T, "Expected cause of type ${T::class}, but was: $cause")
}
/**
 * Starts a latch-blocked task on the common pool, awaits it from an `async` child, cancels that
 * child, then releases the latch and returns the task's future.
 *
 * @param cancellable when `true` the child awaits with `future.await()`; otherwise it awaits
 * through `future.asDeferred().await()`.
 * @return the future of the submitted task, for the caller to inspect.
 */
@Suppress("SuspendFunctionOnCoroutineScope")
private suspend fun CoroutineScope.awaitFutureWithCancel(cancellable: Boolean): ListenableFuture<Int> {
    val gate = CountDownLatch(1)
    val pool = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool())
    val blocked = pool.submit(Callable {
        gate.await()
        42
    })
    val awaiter = async {
        expect(2)
        when {
            cancellable -> blocked.await()
            else -> blocked.asDeferred().await()
        }
    }
    expect(1)
    yield()
    awaiter.cancel()
    expect(3)
    gate.countDown()
    return blocked
}
/**
 * Calls the `future` builder with default, ATOMIC and UNDISPATCHED start modes in a scope that has
 * already been cancelled; the default-start body must never run.
 */
@Test
fun testCancelledParent() = runTest({ failure -> failure is CancellationException }) {
    cancel()
    future { expectUnreached() }
    future(start = CoroutineStart.ATOMIC) { }
    future(start = CoroutineStart.UNDISPATCHED) { }
}
/**
 * Attaches 10000 `asDeferred()` awaiters to a single future, completes it, and checks that every
 * awaiter resumes within the timeout.
 */
@Test
fun testStackOverflow() = runTest {
    val shared = SettableFuture.create<Int>()
    val resumed = AtomicLong()
    val awaiterCount = 10000L
    val awaiters: List<Job> = (0 until awaiterCount).map {
        launch(Dispatchers.Default) {
            shared.asDeferred().await()
            resumed.incrementAndGet()
        }
    }
    shared.set(1)
    withTimeout(60_000) {
        for (awaiter in awaiters) {
            awaiter.join()
        }
        assertEquals(awaiterCount, resumed.get())
    }
}
/**
 * An exception thrown from a `NonCancellable` section after the future was cancelled still
 * reaches the parent job, as the cause of the parent's cancellation exception.
 */
@Test
fun testFuturePropagatesExceptionToParentAfterCancellation() = runTest {
    val mayThrow = CompletableDeferred<Boolean>()
    val mayCancel = CompletableDeferred<Boolean>()
    val parentJob = Job()
    val parentScope = CoroutineScope(parentJob)
    val propagated = TestException("propagated to parent")
    val child = parentScope.future {
        mayCancel.complete(true)
        withContext(NonCancellable) {
            mayThrow.await()
            throw propagated
        }
    }
    mayCancel.await()
    child.cancel(true)
    mayThrow.complete(true)
    parentJob.join()
    assertTrue(parentJob.isCancelled)
    assertEquals(propagated, parentJob.getCancellationException().cause)
}
// Stress tests.
/**
 * Races, 1000 times, a failure of the awaited future against cancellation of the awaiting
 * deferred; neither outcome may be reported to the coroutine exception handler.
 */
@Test
fun testFutureDoesNotReportToCoroutineExceptionHandler() = runTest {
    repeat(1000) {
        supervisorScope { // Don't propagate failures in children to parent and other children.
            val inner = SettableFuture.create<Unit>()
            val outer = async { inner.await() }
            withContext(Dispatchers.Default) {
                launch { inner.setException(TestException("can be lost")) }
                launch { outer.cancel() }
                // nothing should be reported to CoroutineExceptionHandler, otherwise `Future.cancel` contract violation.
            }
        }
    }
}
/**
 * Polls `isCancelled` on the `asListenableFuture()` view while another coroutine cancels it,
 * 1000 times over; the property read must never throw.
 */
@Test
fun testJobListenableFutureIsCancelledDoesNotThrow() = runTest {
    repeat(1000) {
        val target = CompletableDeferred<String>()
        val targetAsFuture = target.asListenableFuture()
        // We need two threads to test a race condition.
        withContext(Dispatchers.Default) {
            val canceller = launch { targetAsFuture.cancel(false) }
            while (!canceller.isCompleted) {
                targetAsFuture.isCancelled // Shouldn't throw.
            }
        }
    }
}
}