blob: 3c04abdd9952cd3f0150d8d5991ccc3e6fb4020e [file] [log] [blame]
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.flow.operators
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlin.test.*
import kotlin.time.*
import kotlin.time.Duration.Companion.milliseconds
class SampleTest : TestBase() {
@Test
public fun testBasic() = withVirtualTime {
expect(1)
val flow = flow {
expect(3)
emit("A")
delay(1500)
emit("B")
delay(500)
emit("C")
delay(250)
emit("D")
delay(2000)
emit("E")
expect(4)
}
expect(2)
val result = flow.sample(1000).toList()
assertEquals(listOf("A", "B", "D"), result)
finish(5)
}
@Test
fun testDelayedFirst() = withVirtualTime {
val flow = flow {
delay(60)
emit(1)
delay(60)
expect(1)
}.sample(100)
assertEquals(1, flow.singleOrNull())
finish(2)
}
@Test
fun testBasic2() = withVirtualTime {
expect(1)
val flow = flow {
expect(3)
emit(1)
emit(2)
delay(501)
emit(3)
delay(100)
emit(4)
delay(100)
emit(5)
emit(6)
delay(301)
emit(7)
delay(501)
expect(4)
}
expect(2)
val result = flow.sample(500).toList()
assertEquals(listOf(2, 6, 7), result)
finish(5)
}
@Test
fun testFixedDelay() = withVirtualTime {
val flow = flow {
emit("A")
delay(150)
emit("B")
expect(1)
}.sample(100)
assertEquals("A", flow.single())
finish(2)
}
@Test
fun testSingleNull() = withVirtualTime {
val flow = flow<Int?> {
emit(null)
delay(2)
expect(1)
}.sample(1)
assertNull(flow.single())
finish(2)
}
@Test
fun testBasicWithNulls() = withVirtualTime {
expect(1)
val flow = flow {
expect(3)
emit("A")
delay(1500)
emit(null)
delay(500)
emit("C")
delay(250)
emit(null)
delay(2000)
emit("E")
expect(4)
}
expect(2)
val result = flow.sample(1000).toList()
assertEquals(listOf("A", null, null), result)
finish(5)
}
@Test
fun testEmpty() = runTest {
val flow = emptyFlow<Int>().sample(Long.MAX_VALUE)
assertNull(flow.singleOrNull())
}
@Test
fun testScalar() = runTest {
val flow = flowOf(1, 2, 3).sample(Long.MAX_VALUE)
assertNull(flow.singleOrNull())
}
@Test
// note that this test depends on the sampling strategy -- when sampling time starts on a quiescent flow that suddenly emits
fun testLongWait() = withVirtualTime {
expect(1)
val flow = flow {
expect(2)
emit("A")
delay(3500) // long delay -- multiple sampling intervals
emit("B")
delay(900) // crosses time = 4000 barrier
emit("C")
delay(3000) // long wait again
}
val result = flow.sample(1000).toList()
assertEquals(listOf("A", "B", "C"), result)
finish(3)
}
@Test
fun testPace() = withVirtualTime {
val flow = flow {
expect(1)
repeat(4) {
emit(-it)
delay(50)
}
repeat(4) {
emit(it)
delay(100)
}
expect(2)
}.sample(100)
assertEquals(listOf(-1, -3, 0, 1, 2, 3), flow.toList())
finish(3)
}
@Test
fun testUpstreamError() = testUpstreamError(TestException())
@Test
fun testUpstreamErrorCancellationException() = testUpstreamError(CancellationException(""))
private inline fun <reified T: Throwable> testUpstreamError(cause: T) = runTest {
val latch = Channel<Unit>()
val flow = flow {
expect(1)
emit(1)
expect(2)
latch.receive()
throw cause
}.sample(1).map {
latch.send(Unit)
hang { expect(3) }
}
assertFailsWith<T>(flow)
finish(4)
}
@Test
fun testUpstreamErrorIsolatedContext() = runTest {
val latch = Channel<Unit>()
val flow = flow {
assertEquals("upstream", NamedDispatchers.name())
expect(1)
emit(1)
expect(2)
latch.receive()
throw TestException()
}.flowOn(NamedDispatchers("upstream")).sample(1).map {
latch.send(Unit)
hang { expect(3) }
}
assertFailsWith<TestException>(flow)
finish(4)
}
@Test
fun testUpstreamErrorSampleNotTriggered() = runTest {
val flow = flow {
expect(1)
emit(1)
expect(2)
throw TestException()
}.sample(Long.MAX_VALUE).map {
expectUnreached()
}
assertFailsWith<TestException>(flow)
finish(3)
}
@Test
fun testUpstreamErrorSampleNotTriggeredInIsolatedContext() = runTest {
val flow = flow {
expect(1)
emit(1)
expect(2)
throw TestException()
}.flowOn(NamedDispatchers("unused")).sample(Long.MAX_VALUE).map {
expectUnreached()
}
assertFailsWith<TestException>(flow)
finish(3)
}
@Test
fun testDownstreamError() = runTest {
val flow = flow {
expect(1)
emit(1)
hang { expect(3) }
}.sample(100).map {
expect(2)
yield()
throw TestException()
}
assertFailsWith<TestException>(flow)
finish(4)
}
@Test
fun testDownstreamErrorIsolatedContext() = runTest {
val flow = flow {
assertEquals("upstream", NamedDispatchers.name())
expect(1)
emit(1)
hang { expect(3) }
}.flowOn(NamedDispatchers("upstream")).sample(100).map {
expect(2)
yield()
throw TestException()
}
assertFailsWith<TestException>(flow)
finish(4)
}
@Test
fun testDurationBasic() = withVirtualTime {
expect(1)
val flow = flow {
expect(3)
emit("A")
delay(1500.milliseconds)
emit("B")
delay(500.milliseconds)
emit("C")
delay(250.milliseconds)
emit("D")
delay(2000.milliseconds)
emit("E")
expect(4)
}
expect(2)
val result = flow.sample(1000.milliseconds).toList()
assertEquals(listOf("A", "B", "D"), result)
finish(5)
}
}