blob: b8bc3f00be3d31addf2fe67289c0d4bed4d86df2 [file] [log] [blame]
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines
import kotlinx.atomicfu.*
import org.junit.*
import java.util.concurrent.*
import kotlin.test.*
import kotlin.test.Test
class CancellableContinuationResumeCloseStressTest : TestBase() {
@get:Rule
public val dispatcher = ExecutorRule(2)
private val startBarrier = CyclicBarrier(3)
private val doneBarrier = CyclicBarrier(2)
private val nRepeats = 1_000 * stressTestMultiplier
private val closed = atomic(false)
private var returnedOk = false
@Test
@Suppress("BlockingMethodInNonBlockingContext")
fun testStress() = runTest {
repeat(nRepeats) {
closed.value = false
returnedOk = false
val job = testJob()
startBarrier.await()
job.cancel() // (1) cancel job
job.join()
// check consistency
doneBarrier.await()
if (returnedOk) {
assertFalse(closed.value, "should not have closed resource -- returned Ok")
} else {
assertTrue(closed.value, "should have closed resource -- was cancelled")
}
}
}
private fun CoroutineScope.testJob(): Job = launch(dispatcher, start = CoroutineStart.ATOMIC) {
val ok = resumeClose() // might be cancelled
assertEquals("OK", ok)
returnedOk = true
}
private suspend fun resumeClose() = suspendCancellableCoroutine<String> { cont ->
dispatcher.executor.execute {
startBarrier.await() // (2) resume at the same time
cont.resume("OK") {
close()
}
doneBarrier.await()
}
startBarrier.await() // (3) return at the same time
}
fun close() {
assertFalse(closed.getAndSet(true))
}
}