blob: 194504e713c2353d0f61ddabdcaf60b3ad6e1c16 [file] [log] [blame]
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.channels
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.coroutines.*
import kotlin.test.*
class ProduceTest : TestBase() {
@Test
fun testBasic() = runTest {
val c = produce {
expect(2)
send(1)
expect(3)
send(2)
expect(6)
}
expect(1)
check(c.receive() == 1)
expect(4)
check(c.receive() == 2)
expect(5)
check(c.receiveOrNull() == null)
finish(7)
}
@Test
fun testCancelWithoutCause() = runTest {
val c = produce(NonCancellable) {
expect(2)
send(1)
expect(3)
try {
send(2) // will get cancelled
expectUnreached()
} catch (e: Throwable) {
expect(7)
check(e is CancellationException)
throw e
}
expectUnreached()
}
expect(1)
check(c.receive() == 1)
expect(4)
c.cancel()
expect(5)
assertFailsWith<CancellationException> { c.receiveOrNull() }
expect(6)
yield() // to produce
finish(8)
}
@Test
fun testCancelWithCause() = runTest {
val c = produce(NonCancellable) {
expect(2)
send(1)
expect(3)
try {
send(2) // will get cancelled
expectUnreached()
} catch (e: Throwable) {
expect(6)
check(e is TestCancellationException)
throw e
}
expectUnreached()
}
expect(1)
check(c.receive() == 1)
expect(4)
c.cancel(TestCancellationException())
try {
assertNull(c.receiveOrNull())
expectUnreached()
} catch (e: TestCancellationException) {
expect(5)
}
yield() // to produce
finish(7)
}
@Test
fun testCancelOnCompletionUnconfined() = runTest {
cancelOnCompletion(Dispatchers.Unconfined)
}
@Test
fun testCancelOnCompletion() = runTest {
cancelOnCompletion(coroutineContext)
}
@Test
fun testCancelWhenTheChannelIsClosed() = runTest {
val channel = produce<Int> {
send(1)
close()
expect(2)
launch {
expect(3)
hang { expect(5) }
}
}
expect(1)
channel.receive()
yield()
expect(4)
channel.cancel()
(channel as Job).join()
finish(6)
}
@Test
fun testAwaitConsumerCancellation() = runTest {
val parent = Job()
val channel = produce<Int>(parent) {
expect(2)
awaitClose { expect(4) }
}
expect(1)
yield()
expect(3)
channel.cancel()
parent.complete()
parent.join()
finish(5)
}
@Test
fun testAwaitProducerCancellation() = runTest {
val parent = Job()
produce<Int>(parent) {
expect(2)
launch {
expect(3)
this@produce.cancel()
}
awaitClose { expect(4) }
}
expect(1)
parent.complete()
parent.join()
finish(5)
}
@Test
fun testAwaitParentCancellation() = runTest {
val parent = Job()
produce<Int>(parent) {
expect(2)
awaitClose { expect(4) }
}
expect(1)
yield()
expect(3)
parent.cancelAndJoin()
finish(5)
}
@Test
fun testAwaitIllegalState() = runTest {
val channel = produce<Int> { }
assertFailsWith<IllegalStateException> { (channel as ProducerScope<*>).awaitClose() }
callbackFlow<Unit> {
expect(1)
launch {
expect(2)
assertFailsWith<IllegalStateException> {
awaitClose { expectUnreached() }
expectUnreached()
}
}
close()
}.collect()
finish(3)
}
private suspend fun cancelOnCompletion(coroutineContext: CoroutineContext) = CoroutineScope(coroutineContext).apply {
val source = Channel<Int>()
expect(1)
val produced = produce<Int>(coroutineContext, onCompletion = { source.cancelConsumed(it) }) {
expect(2)
source.receive()
}
yield()
expect(3)
produced.cancel()
try {
source.receive()
} catch (e: CancellationException) {
finish(4)
}
}
}