blob: 02dbfc40d926212dcb2fce5b868d47ffa7821fc0 [file] [log] [blame]
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.flow
import kotlinx.coroutines.*
import kotlin.test.*
/*
* Replace: { i, j -> i + j } -> ::sum as soon as KT-30991 is fixed
*/
class ZipTest : TestBase() {
@Test
fun testZip() = runTest {
val f1 = flowOf("a", "b", "c")
val f2 = flowOf(1, 2, 3)
assertEquals(listOf("a1", "b2", "c3"), f1.zip(f2, { i, j -> i + j }).toList())
}
@Test
fun testUnevenZip() = runTest {
val f1 = flowOf("a", "b", "c", "d", "e")
val f2 = flowOf(1, 2, 3)
assertEquals(listOf("a1", "b2", "c3"), f1.zip(f2) { i, j -> i + j }.toList())
assertEquals(listOf("a1", "b2", "c3"), f2.zip(f1) { i, j -> j + i }.toList())
}
@Test
fun testEmptyFlows() = runTest {
val f1 = emptyFlow<String>()
val f2 = emptyFlow<Int>()
assertEquals(emptyList(), f1.zip(f2, { i, j -> i + j }).toList())
}
@Test
fun testEmpty() = runTest {
val f1 = emptyFlow<String>()
val f2 = flowOf(1)
assertEquals(emptyList(), f1.zip(f2, { i, j -> i + j }).toList())
}
@Test
fun testEmptyOther() = runTest {
val f1 = flowOf("a")
val f2 = emptyFlow<Int>()
assertEquals(emptyList(), f1.zip(f2, { i, j -> i + j }).toList())
}
@Test
fun testNulls() = runTest {
val f1 = flowOf("a", null, null, "d")
val f2 = flowOf(1, 2, 3)
assertEquals(listOf("a1", "null2", "null3"), f1.zip(f2, { i, j -> i + j }).toList())
}
@Test
fun testNullsOther() = runTest {
val f1 = flowOf("a", "b", "c")
val f2 = flowOf(1, null, null, 2)
assertEquals(listOf("a1", "bnull", "cnull"), f1.zip(f2, { i, j -> i + j }).toList())
}
@Test
fun testCancelWhenFlowIsDone() = runTest {
val f1 = flow<String> {
emit("1")
emit("2")
}
val f2 = flow<String> {
emit("a")
emit("b")
expectUnreached()
}
assertEquals(listOf("1a", "2b"), f1.zip(f2) { s1, s2 -> s1 + s2 }.toList())
finish(1)
}
@Test
fun testCancelWhenFlowIsDone2() = runTest {
val f1 = flow<String> {
emit("1")
emit("2")
try {
emit("3")
expectUnreached()
} finally {
expect(1)
}
}
val f2 = flowOf("a", "b")
assertEquals(listOf("1a", "2b"), f1.zip(f2) { s1, s2 -> s1 + s2 }.toList())
finish(2)
}
@Test
fun testCancelWhenFlowIsDoneReversed() = runTest {
val f1 = flow<String> {
emit("1")
emit("2")
hang {
expect(1)
}
}
val f2 = flow<String> {
emit("a")
emit("b")
yield()
}
assertEquals(listOf("a1", "b2"), f2.zip(f1) { s1, s2 -> s1 + s2 }.toList())
finish(2)
}
@Test
fun testContextIsIsolatedReversed() = runTest {
val f1 = flow {
emit("a")
assertEquals("first", NamedDispatchers.name())
expect(3)
}.flowOn(NamedDispatchers("first")).onEach {
assertEquals("with", NamedDispatchers.name())
expect(4)
}.flowOn(NamedDispatchers("with"))
val f2 = flow {
emit(1)
assertEquals("second", NamedDispatchers.name())
expect(1)
}.flowOn(NamedDispatchers("second")).onEach {
assertEquals("nested", NamedDispatchers.name())
expect(2)
}.flowOn(NamedDispatchers("nested"))
val value = withContext(NamedDispatchers("main")) {
f1.zip(f2) { i, j ->
assertEquals("main", NamedDispatchers.name())
expect(5)
i + j
}.single()
}
assertEquals("a1", value)
finish(6)
}
@Test
fun testErrorInDownstreamCancelsUpstream() = runTest {
val f1 = flow {
emit("a")
hang {
expect(3)
}
}.flowOn(NamedDispatchers("first"))
val f2 = flow {
emit(1)
hang {
expect(2)
}
}.flowOn(NamedDispatchers("second"))
val flow = f1.zip(f2) { i, j ->
assertEquals("zip", NamedDispatchers.name())
expect(1)
i + j
}.flowOn(NamedDispatchers("zip")).onEach {
throw TestException()
}
assertFailsWith<TestException>(flow)
finish(4)
}
@Test
fun testErrorCancelsSibling() = runTest {
val f1 = flow {
emit("a")
hang {
expect(1)
}
}.flowOn(NamedDispatchers("first"))
val f2 = flow {
emit(1)
throw TestException()
}.flowOn(NamedDispatchers("second"))
val flow = f1.zip(f2) { _, _ -> 1 }
assertFailsWith<TestException>(flow)
finish(2)
}
@Test
fun testCancellationUpstream() = runTest {
val f1 = flow {
expect(1)
emit(1)
expect(5)
throw CancellationException("")
}
val f2 = flow {
expect(2)
emit(1)
expect(3)
hang { expect(6) }
}
val flow = f1.zip(f2) { _, _ -> 1 }.onEach { expect(4) }
assertFailsWith<CancellationException>(flow)
finish(7)
}
@Test
fun testCancellationDownstream() = runTest {
val f1 = flow {
expect(1)
emit(1)
expectUnreached() // Will throw CE
}
val f2 = flow {
expect(2)
emit(1)
expect(3)
hang { expect(5) }
}
val flow = f1.zip(f2, { _, _ -> 1 }).onEach {
expect(4)
yield()
throw CancellationException("")
}
assertFailsWith<CancellationException>(flow)
finish(6)
}
@Test
fun testCancellationOfCollector() = runTest {
val f1 = flow {
emit("1")
awaitCancellation()
}
val f2 = flow {
emit("2")
yield()
}
f1.zip(f2) { a, b -> a + b }.collect { }
}
}