blob: 892a2a62d491c795dcf10b5747dd76a9db50332f [file] [log] [blame]
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*
import org.junit.Test
import kotlin.coroutines.*
import kotlin.test.*
class ReusableCancellableContinuationTest : TestBase() {
@Test
fun testReusable() = runTest {
testContinuationsCount(10, 1, ::suspendAtomicCancellableCoroutineReusable)
}
@Test
fun testRegular() = runTest {
testContinuationsCount(10, 10, ::suspendAtomicCancellableCoroutine)
}
private suspend inline fun CoroutineScope.testContinuationsCount(
iterations: Int,
expectedInstances: Int,
suspender: suspend ((CancellableContinuation<Unit>) -> Unit) -> Unit
) {
val result = mutableSetOf<CancellableContinuation<*>>()
val job = coroutineContext[Job]!!
val channel = Channel<Continuation<Unit>>(1)
launch {
channel.consumeEach {
val f = FieldWalker.walk(job)
result.addAll(f.filterIsInstance<CancellableContinuation<*>>())
it.resumeWith(Result.success(Unit))
}
}
repeat(iterations) {
suspender {
assertTrue(channel.offer(it))
}
}
channel.close()
assertEquals(expectedInstances, result.size - 1)
}
@Test
fun testCancelledOnClaimedCancel() = runTest {
expect(1)
try {
suspendAtomicCancellableCoroutineReusable<Unit> {
it.cancel()
}
expectUnreached()
} catch (e: CancellationException) {
finish(2)
}
}
@Test
fun testNotCancelledOnClaimedResume() = runTest({ it is CancellationException }) {
expect(1)
// Bind child at first
var continuation: Continuation<*>? = null
suspendAtomicCancellableCoroutineReusable<Unit> {
expect(2)
continuation = it
launch { // Attach to the parent, avoid fast path
expect(3)
it.resume(Unit)
}
}
expect(4)
ensureActive()
// Verify child was bound
FieldWalker.assertReachableCount(1, coroutineContext[Job]) { it === continuation }
suspendAtomicCancellableCoroutineReusable<Unit> {
expect(5)
coroutineContext[Job]!!.cancel()
it.resume(Unit)
}
assertFalse(isActive)
finish(6)
}
@Test
fun testResumeReusablePreservesReference() = runTest {
expect(1)
var cont: Continuation<Unit>? = null
launch {
cont!!.resumeWith(Result.success(Unit))
}
suspendAtomicCancellableCoroutineReusable<Unit> {
cont = it
}
ensureActive()
assertTrue { FieldWalker.walk(coroutineContext[Job]).contains(cont!!) }
finish(2)
}
@Test
fun testResumeRegularDoesntPreservesReference() = runTest {
expect(1)
var cont: Continuation<Unit>? = null
launch { // Attach to the parent, avoid fast path
cont!!.resumeWith(Result.success(Unit))
}
suspendAtomicCancellableCoroutine<Unit> {
cont = it
}
ensureActive()
FieldWalker.assertReachableCount(0, coroutineContext[Job]) { it === cont }
finish(2)
}
@Test
fun testDetachedOnCancel() = runTest {
expect(1)
var cont: Continuation<*>? = null
try {
suspendAtomicCancellableCoroutineReusable<Unit> {
cont = it
it.cancel()
}
expectUnreached()
} catch (e: CancellationException) {
FieldWalker.assertReachableCount(0, coroutineContext[Job]) { it === cont }
finish(2)
}
}
@Test
fun testPropagatedCancel() = runTest({it is CancellationException}) {
val currentJob = coroutineContext[Job]!!
expect(1)
// Bind child at first
suspendAtomicCancellableCoroutineReusable<Unit> {
expect(2)
// Attach to the parent, avoid fast path
launch {
expect(3)
it.resume(Unit)
}
}
expect(4)
ensureActive()
// Verify child was bound
FieldWalker.assertReachableCount(1, currentJob) { it is CancellableContinuation<*> }
currentJob.cancel()
assertFalse(isActive)
// Child detached
FieldWalker.assertReachableCount(0, currentJob) { it is CancellableContinuation<*> }
suspendAtomicCancellableCoroutineReusable<Unit> { it.resume(Unit) }
suspendAtomicCancellableCoroutineReusable<Unit> { it.resume(Unit) }
FieldWalker.assertReachableCount(0, currentJob) { it is CancellableContinuation<*> }
try {
suspendAtomicCancellableCoroutineReusable<Unit> {}
} catch (e: CancellationException) {
FieldWalker.assertReachableCount(0, currentJob) { it is CancellableContinuation<*> }
finish(5)
}
}
@Test
fun testChannelMemoryLeak() = runTest {
val iterations = 100
val channel = Channel<Unit>()
launch {
repeat(iterations) {
select {
channel.onSend(Unit) {}
}
}
}
val receiver = launch {
repeat(iterations) {
channel.receive()
}
expect(2)
val job = coroutineContext[Job]!!
// 1 for reusable CC, another one for outer joiner
FieldWalker.assertReachableCount(2, job) { it is CancellableContinuation<*> }
}
expect(1)
receiver.join()
// Reference should be claimed at this point
FieldWalker.assertReachableCount(0, receiver) { it is CancellableContinuation<*> }
finish(3)
}
@Test
fun testReusableAndRegularSuspendCancellableCoroutineMemoryLeak() = runTest {
val channel = produce {
repeat(10) {
send(Unit)
}
}
for (value in channel) {
delay(1)
}
FieldWalker.assertReachableCount(1, coroutineContext[Job]) { it is ChildContinuation }
}
}