blob: 551988b8147af1f58483e3b6a67895c450f64a47 [file] [log] [blame]
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.reactor
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
import org.junit.*
import org.junit.Test
import org.reactivestreams.*
import reactor.core.publisher.*
import reactor.util.context.*
import java.time.*
import java.time.Duration.*
import java.util.function.*
import kotlin.test.*
class MonoTest : TestBase() {
@Before
fun setup() {
ignoreLostThreads("timer-", "parallel-")
}
@Test
fun testBasicSuccess() = runBlocking {
expect(1)
val mono = mono(currentDispatcher()) {
expect(4)
"OK"
}
expect(2)
mono.subscribe { value ->
expect(5)
assertEquals("OK", value)
}
expect(3)
yield() // to started coroutine
finish(6)
}
@Test
fun testBasicFailure() = runBlocking {
expect(1)
val mono = mono(currentDispatcher()) {
expect(4)
throw RuntimeException("OK")
}
expect(2)
mono.subscribe({
expectUnreached()
}, { error ->
expect(5)
assertTrue(error is RuntimeException)
assertEquals("OK", error.message)
})
expect(3)
yield() // to started coroutine
finish(6)
}
@Test
fun testBasicEmpty() = runBlocking {
expect(1)
val mono = mono(currentDispatcher()) {
expect(4)
null
}
expect(2)
mono.subscribe({}, { throw it }, {
expect(5)
})
expect(3)
yield() // to started coroutine
finish(6)
}
@Test
fun testBasicUnsubscribe() = runBlocking {
expect(1)
val mono = mono(currentDispatcher()) {
expect(4)
yield() // back to main, will get cancelled
expectUnreached()
}
expect(2)
// nothing is called on a disposed mono
val sub = mono.subscribe({
expectUnreached()
}, {
expectUnreached()
})
expect(3)
yield() // to started coroutine
expect(5)
sub.dispose() // will cancel coroutine
yield()
finish(6)
}
@Test
fun testMonoNoWait() {
val mono = mono {
"OK"
}
checkMonoValue(mono) {
assertEquals("OK", it)
}
}
@Test
fun testMonoAwait() = runBlocking {
assertEquals("OK", Mono.just("O").awaitSingle() + "K")
}
@Test
fun testMonoEmitAndAwait() {
val mono = mono {
Mono.just("O").awaitSingle() + "K"
}
checkMonoValue(mono) {
assertEquals("OK", it)
}
}
@Test
fun testMonoWithDelay() {
val mono = mono {
Flux.just("O").delayElements(ofMillis(50)).awaitSingle() + "K"
}
checkMonoValue(mono) {
assertEquals("OK", it)
}
}
@Test
fun testMonoException() {
val mono = mono {
Flux.just("O", "K").awaitSingle() + "K"
}
checkErroneous(mono) {
assert(it is IllegalArgumentException)
}
}
@Test
fun testAwaitFirst() {
val mono = mono {
Flux.just("O", "#").awaitFirst() + "K"
}
checkMonoValue(mono) {
assertEquals("OK", it)
}
}
@Test
fun testAwaitLast() {
val mono = mono {
Flux.just("#", "O").awaitLast() + "K"
}
checkMonoValue(mono) {
assertEquals("OK", it)
}
}
@Test
fun testExceptionFromFlux() {
val mono = mono {
try {
Flux.error<String>(RuntimeException("O")).awaitFirst()
} catch (e: RuntimeException) {
Flux.just(e.message!!).awaitLast() + "K"
}
}
checkMonoValue(mono) {
assertEquals("OK", it)
}
}
@Test
fun testExceptionFromCoroutine() {
val mono = mono<String> {
throw IllegalStateException(Flux.just("O").awaitSingle() + "K")
}
checkErroneous(mono) {
assert(it is IllegalStateException)
assertEquals("OK", it.message)
}
}
@Test
fun testSuppressedException() = runTest {
val mono = mono(currentDispatcher()) {
launch(start = CoroutineStart.ATOMIC) {
throw TestException() // child coroutine fails
}
try {
delay(Long.MAX_VALUE)
} finally {
throw TestException2() // but parent throws another exception while cleaning up
}
}
try {
mono.awaitSingle()
expectUnreached()
} catch (e: TestException) {
assertTrue(e.suppressed[0] is TestException2)
}
}
@Test
fun testUnhandledException() = runTest {
expect(1)
var subscription: Subscription? = null
val handler = BiFunction<Throwable, Any?, Throwable> { t, _ ->
assertTrue(t is TestException)
expect(5)
t
}
val mono = mono(currentDispatcher()) {
expect(4)
subscription!!.cancel() // cancel our own subscription, so that delay will get cancelled
try {
delay(Long.MAX_VALUE)
} finally {
throw TestException() // would not be able to handle it since mono is disposed
}
}.subscriberContext { Context.of("reactor.onOperatorError.local", handler) }
mono.subscribe(object : Subscriber<Unit> {
override fun onSubscribe(s: Subscription) {
expect(2)
subscription = s
}
override fun onNext(t: Unit?) { expectUnreached() }
override fun onComplete() { expectUnreached() }
override fun onError(t: Throwable) { expectUnreached() }
})
expect(3)
yield() // run coroutine
finish(6)
}
@Test
fun testIllegalArgumentException() {
assertFailsWith<IllegalArgumentException> { mono(Job()) { } }
}
@Test
fun testExceptionAfterCancellation() = runTest {
// Test exception is not reported to global handler
Flux
.interval(ofMillis(1))
.switchMap {
mono(coroutineContext) {
timeBomb().awaitFirst()
}
}
.onErrorReturn({
expect(1)
true
}, 42)
.blockLast()
finish(2)
}
private fun timeBomb() = Mono.delay(Duration.ofMillis(1)).doOnSuccess { throw Exception("something went wrong") }
@Test
fun testLeakedException() = runBlocking {
// Test exception is not reported to global handler
val flow = mono<Unit> { throw TestException() }.toFlux().asFlow()
repeat(10000) {
combine(flow, flow) { _, _ -> Unit }
.catch {}
.collect { }
}
}
}