// blob: 91d941b32ce8a547f789da965c201fdabf557901 [file] [log] [blame]
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.channels
import kotlinx.coroutines.*
import kotlin.test.*
/**
 * Runs a set of basic channel scenarios (send/receive, offer and send after close,
 * receive variants after close, `invokeOnClose` handlers, iteration) against every
 * value of [TestChannelKind].
 */
class BasicOperationsTest : TestBase() {

    /** Pushes 20 elements through each kind of channel and checks what the consumer sees. */
    @Test
    fun testSimpleSendReceive() = runTest {
        // Hand-rolled parametrization over channel kinds: parametrized common test :(
        for (kind in TestChannelKind.values()) {
            testSendReceive(kind, 20)
        }
    }

    /** `offer` on a closed channel must throw [ClosedSendChannelException] for each kind. */
    @Test
    fun testOfferAfterClose() = runTest {
        for (kind in TestChannelKind.values()) {
            testOffer(kind)
        }
    }

    /** `send` on a closed channel must surface [ClosedSendChannelException] for each kind. */
    @Test
    fun testSendAfterClose() = runTest {
        for (kind in TestChannelKind.values()) {
            testSendAfterClose(kind)
        }
    }

    /** `receiveOrNull` and `poll` must yield `null` after a normal close, for each kind. */
    @Test
    fun testReceiveOrNullAfterClose() = runTest {
        for (kind in TestChannelKind.values()) {
            testReceiveOrNull(kind)
        }
    }

    /** `receiveOrNull` and `poll` must rethrow the close cause, for each kind. */
    @Test
    fun testReceiveOrNullAfterCloseWithException() = runTest {
        for (kind in TestChannelKind.values()) {
            testReceiveOrNullException(kind)
        }
    }

    /** `receiveOrClosed` must report both a delivered value and a closed channel, for each kind. */
    @Test
    fun testReceiveOrClosed() = runTest {
        for (kind in TestChannelKind.values()) {
            testReceiveOrClosed(kind)
        }
    }

    /**
     * A handler registered before `close` is expected to fire as step 3, i.e. after the
     * offer (steps 1-2) and before the test finishes (step 4), and only when the cause it
     * receives is the [AssertionError] passed to `close`.
     */
    @Test
    fun testInvokeOnClose() {
        for (kind in TestChannelKind.values()) {
            reset()
            val channel = kind.create<Int>()
            channel.invokeOnClose { cause ->
                if (cause is AssertionError) expect(3)
            }
            expect(1)
            channel.offer(42)
            expect(2)
            channel.close(AssertionError())
            finish(4)
        }
    }

    /**
     * A handler registered on an already closed channel is expected to have fired (step 2)
     * before the test finishes (step 3); registering a second handler must fail with
     * [IllegalStateException].
     */
    @Test
    fun testInvokeOnClosed() {
        for (kind in TestChannelKind.values()) {
            reset()
            expect(1)
            val channel = kind.create<Int>()
            channel.close()
            channel.invokeOnClose { expect(2) }
            assertFailsWith<IllegalStateException> {
                channel.invokeOnClose { expect(3) }
            }
            finish(3)
        }
    }

    /**
     * Only one handler may be registered: the second registration must fail with
     * [IllegalStateException], while the first one is expected to fire on `close` (step 3).
     */
    @Test
    fun testMultipleInvokeOnClose() {
        for (kind in TestChannelKind.values()) {
            reset()
            val channel = kind.create<Int>()
            channel.invokeOnClose { expect(3) }
            expect(1)
            assertFailsWith<IllegalStateException> {
                channel.invokeOnClose { expect(4) }
            }
            expect(2)
            channel.close()
            finish(4)
        }
    }

    /**
     * Calling `next()` without a preceding `hasNext()` must fail with [IllegalStateException],
     * both before and after the channel is closed; `hasNext()` must be `false` once closed.
     */
    @Test
    fun testIterator() = runTest {
        for (kind in TestChannelKind.values()) {
            val channel = kind.create<Int>()
            val elements = channel.iterator()
            assertFailsWith<IllegalStateException> { elements.next() }
            channel.close()
            assertFailsWith<IllegalStateException> { elements.next() }
            assertFalse(elements.hasNext())
        }
    }

    /**
     * Starts a receiver, closes the channel without a cause and checks that:
     * the channel reports closed-for-receive, `receiveOrNull`/`poll` return `null`, and the
     * started receiver ends with a [ClosedReceiveChannelException] as its cancellation cause.
     */
    private suspend fun testReceiveOrNull(kind: TestChannelKind) = coroutineScope {
        val channel = kind.create<Int>()
        val pendingReceive = async(NonCancellable) { channel.receive() }
        // Give the receiver a chance to run before the channel is closed.
        yield()
        channel.close()
        assertTrue(channel.isClosedForReceive)
        assertNull(channel.receiveOrNull())
        assertNull(channel.poll())
        pendingReceive.join()
        val cancellation = pendingReceive.getCancellationException()
        assertTrue(cancellation.cause is ClosedReceiveChannelException)
    }

    /**
     * Same scenario as [testReceiveOrNull], but the channel is closed with a [TestException]:
     * `poll` and `receiveOrNull` must throw it, and it must be the cancellation cause of the
     * started receiver.
     */
    private suspend fun testReceiveOrNullException(kind: TestChannelKind) = coroutineScope {
        val channel = kind.create<Int>()
        val pendingReceive = async(NonCancellable) { channel.receive() }
        // Give the receiver a chance to run before the channel is closed.
        yield()
        channel.close(TestException())
        assertTrue(channel.isClosedForReceive)
        assertFailsWith<TestException> { channel.poll() }
        try {
            channel.receiveOrNull()
            fail()
        } catch (expectedFailure: TestException) {
            // This is the outcome the test is looking for.
        }
        pendingReceive.join()
        val cancellation = pendingReceive.getCancellationException()
        assertTrue(cancellation.cause is TestException)
    }

    /**
     * Receives one value and then a close token through `receiveOrClosed`, checking the
     * accessors of both results and their equality with freshly built [ValueOrClosed]
     * instances. The numbered `expect` steps pin the interleaving with the two launched
     * coroutines.
     */
    @Suppress("ReplaceAssertBooleanWithAssertEquality")
    private suspend fun testReceiveOrClosed(kind: TestChannelKind) = coroutineScope {
        reset()
        val channel = kind.create<Int>()
        launch {
            expect(2)
            channel.send(1)
        }
        expect(1)
        val received = channel.receiveOrClosed()
        assertEquals(1, received.value)
        assertEquals(1, received.valueOrNull)
        assertTrue(ValueOrClosed.value(1) == received)
        expect(3)
        launch {
            expect(4)
            channel.close()
        }
        val closedResult = channel.receiveOrClosed()
        expect(5)
        assertNull(closedResult.valueOrNull)
        assertTrue(closedResult.isClosed)
        assertNull(closedResult.closeCause)
        assertTrue(ValueOrClosed.closed<Int>(closedResult.closeCause) == closedResult)
        finish(6)
    }

    /**
     * Starts a sender of `42`, closes the channel and checks that `offer` then throws
     * [ClosedSendChannelException]. For non-conflated kinds the previously sent `42` must
     * still be receivable afterwards.
     */
    private suspend fun testOffer(kind: TestChannelKind) = coroutineScope {
        val channel = kind.create<Int>()
        val sender = async { channel.send(42) }
        // Give the sender a chance to run before the channel is closed.
        yield()
        channel.close()
        assertTrue(channel.isClosedForSend)
        try {
            channel.offer(2)
            fail()
        } catch (expectedFailure: ClosedSendChannelException) {
            if (!kind.isConflated) {
                assertEquals(42, channel.receive())
            }
        }
        sender.await()
    }

    /**
     * [ClosedSendChannelException] should not be eaten: a `send` performed by a child
     * coroutine on a closed channel must make the enclosing `coroutineScope` fail with it.
     *
     * See https://github.com/Kotlin/kotlinx.coroutines/issues/957
     */
    private suspend fun testSendAfterClose(kind: TestChannelKind) {
        assertFailsWith<ClosedSendChannelException> {
            coroutineScope {
                val channel = kind.create<Int>()
                channel.close()
                launch { channel.send(1) }
            }
        }
    }

    /**
     * Sends `0 until iterations` from a child coroutine, closes the channel and consumes it
     * with a `for` loop.
     *
     * Non-conflated kinds must deliver every value in order, [iterations] values in total.
     * For conflated kinds only "never goes backwards" is checked (presumably because such
     * channels may drop intermediate values).
     */
    private suspend fun testSendReceive(kind: TestChannelKind, iterations: Int) = coroutineScope {
        val channel = kind.create<Int>()
        launch {
            for (value in 0 until iterations) {
                channel.send(value)
            }
            channel.close()
        }
        var nextExpected = 0
        for (element in channel) {
            if (kind.isConflated) {
                assertTrue(element >= nextExpected)
                nextExpected = element + 1
            } else {
                assertEquals(nextExpected, element)
                nextExpected++
            }
        }
        if (!kind.isConflated) {
            assertEquals(iterations, nextExpected)
        }
    }
}