blob: f52f8b5bcf90212263ca3ee6c92955076d302d7c [file] [log] [blame]
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.exceptions
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import org.junit.*
import org.junit.rules.*
import kotlin.coroutines.*
class StackTraceRecoveryChannelsTest : TestBase() {
@get:Rule
val name = TestName()
@Test
fun testReceiveFromChannel() = runTest {
val channel = Channel<Int>()
val job = launch {
expect(2)
channel.close(RecoverableTestException())
}
expect(1)
channelReceive(channel)
expect(3)
job.join()
finish(4)
}
@Test
fun testReceiveFromClosedChannel() = runTest {
val channel = Channel<Int>()
channel.close(RecoverableTestException())
channelReceive(channel)
}
@Test
fun testReceiveOrNullFromClosedChannel() = runTest {
val channel = Channel<Int>()
channel.close(RecoverableTestException())
channelReceiveOrNull(channel)
}
@Test
fun testSendToClosedChannel() = runTest {
val channel = Channel<Int>()
channel.close(RecoverableTestException())
channelSend(channel)
}
@Test
fun testSendToChannel() = runTest {
val channel = Channel<Int>()
val job = launch {
expect(2)
channel.cancel()
}
expect(1)
channelSend(channel)
expect(3)
job.join()
finish(4)
}
private suspend fun channelReceive(channel: Channel<Int>) = channelOp { channel.receive() }
private suspend fun channelReceiveOrNull(channel: Channel<Int>) = channelOp { channel.receiveOrNull() }
private suspend inline fun channelOp(block: () -> Unit) {
try {
yield()
block()
expectUnreached()
} catch (e: RecoverableTestException) {
verifyStackTrace("channels/${name.methodName}", e)
}
}
private suspend fun channelSend(channel: Channel<Int>) {
try {
yield()
channel.send(1)
expectUnreached()
} catch (e: Exception) {
verifyStackTrace("channels/${name.methodName}", e)
}
}
@Test
fun testOfferWithCurrentContext() = runTest {
val channel = Channel<Int>()
channel.close(RecoverableTestException())
try {
channel.sendWithContext(coroutineContext)
} catch (e: RecoverableTestException) {
verifyStackTrace("channels/${name.methodName}", e)
}
}
@Test
fun testOfferWithContextWrapped() = runTest {
val channel = Channel<Int>()
channel.close(RecoverableTestException())
try {
channel.sendWithContext(wrapperDispatcher(coroutineContext))
} catch (e: Exception) {
verifyStackTrace("channels/${name.methodName}", e)
}
}
@Test
fun testOfferFromScope() = runTest {
val channel = Channel<Int>()
channel.close(RecoverableTestException())
try {
channel.sendFromScope()
} catch (e: Exception) {
verifyStackTrace("channels/${name.methodName}", e)
}
}
// Slow path via suspending send
@Test
fun testSendFromScope() = runTest {
val channel = Channel<Int>()
val deferred = async {
try {
expect(1)
channel.sendFromScope()
} catch (e: Exception) {
verifyStackTrace("channels/${name.methodName}", e)
}
}
yield()
expect(2)
// Cancel is an analogue of `produce` failure, just a shorthand
channel.cancel(RecoverableTestCancellationException())
finish(3)
deferred.await()
}
// See https://github.com/Kotlin/kotlinx.coroutines/issues/950
@Test
fun testCancelledOffer() = runTest {
expect(1)
val job = Job()
val actor = actor<Int>(job, Channel.UNLIMITED) {
consumeEach {
expectUnreached() // is cancelled before offer
}
}
job.cancel()
try {
actor.offer(1)
} catch (e: Exception) {
verifyStackTrace("channels/${name.methodName}", e)
finish(2)
}
}
private suspend fun Channel<Int>.sendWithContext(ctx: CoroutineContext) = withContext(ctx) {
sendInChannel()
yield() // TCE
}
private suspend fun Channel<Int>.sendInChannel() {
send(42)
yield() // TCE
}
private suspend fun Channel<Int>.sendFromScope() = coroutineScope {
sendWithContext(wrapperDispatcher(coroutineContext))
}
}