blob: 2283d45afcbef9bb881fc6dc5b08ddd7e59e50b8 [file] [log] [blame]
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.reactor
import kotlinx.coroutines.*
import kotlinx.coroutines.reactive.*
import org.hamcrest.core.*
import org.junit.*
import org.junit.Assert.*
import org.reactivestreams.*
import reactor.core.publisher.*
import java.time.Duration.*
/**
 * Tests for the `mono { ... }` coroutine builder and for awaiting Reactor publishers
 * (`awaitSingle`, `awaitFirst`, `awaitLast`) from inside coroutines.
 *
 * Ordering-sensitive tests are written with numbered `expect(n)` / `finish(n)` checkpoints and
 * `expectUnreached()` markers. Those helpers, together with `currentDispatcher()`, `runTest`,
 * `checkMonoValue` and `checkErroneous`, are defined outside this file.
 */
class MonoTest : TestBase() {
    /**
     * Registers the "timer-" and "parallel-" thread name prefixes with `ignoreLostThreads`
     * (presumably Reactor scheduler threads that may outlive a test -- confirm against TestBase).
     */
    @Before
    fun setup() {
        ignoreLostThreads("timer-", "parallel-")
    }

    /** A mono whose block returns a value delivers that value to the subscriber. */
    @Test
    fun testBasicSuccess() = runBlocking {
        expect(1)
        val mono = mono(currentDispatcher()) {
            expect(4)
            "OK"
        }
        expect(2)
        mono.subscribe { value ->
            expect(5)
            assertThat(value, IsEqual("OK"))
        }
        expect(3)
        yield() // give the started coroutine a chance to run
        finish(6)
    }

    /** A mono whose block throws delivers the exception to the subscriber's error callback. */
    @Test
    fun testBasicFailure() = runBlocking {
        expect(1)
        val mono = mono(currentDispatcher()) {
            expect(4)
            throw RuntimeException("OK")
        }
        expect(2)
        mono.subscribe({
            expectUnreached()
        }, { error ->
            expect(5)
            assertThat(error, IsInstanceOf(RuntimeException::class.java))
            assertThat(error.message, IsEqual("OK"))
        })
        expect(3)
        yield() // give the started coroutine a chance to run
        finish(6)
    }

    /** A mono whose block returns `null` invokes only the completion callback. */
    @Test
    fun testBasicEmpty() = runBlocking {
        expect(1)
        val mono = mono(currentDispatcher()) {
            expect(4)
            null
        }
        expect(2)
        // An error signal is rethrown so that it cannot go unnoticed.
        mono.subscribe({}, { throw it }, {
            expect(5)
        })
        expect(3)
        yield() // give the started coroutine a chance to run
        finish(6)
    }

    /** Disposing the subscription cancels the coroutine; neither callback is invoked. */
    @Test
    fun testBasicUnsubscribe() = runBlocking {
        expect(1)
        val mono = mono(currentDispatcher()) {
            expect(4)
            yield() // back to main, will get cancelled
            expectUnreached()
        }
        expect(2)
        // nothing is called on a disposed mono
        val sub = mono.subscribe({
            expectUnreached()
        }, {
            expectUnreached()
        })
        expect(3)
        yield() // give the started coroutine a chance to run
        expect(5)
        sub.dispose() // will cancel coroutine
        yield()
        finish(6)
    }

    /** A mono that returns a constant without suspending yields that constant. */
    @Test
    fun testMonoNoWait() {
        val mono = mono {
            "OK"
        }
        checkMonoValue(mono) {
            assertEquals("OK", it)
        }
    }

    /** `awaitSingle` on a `Mono.just` returns the emitted value. */
    @Test
    fun testMonoAwait() = runBlocking {
        assertEquals("OK", Mono.just("O").awaitSingle() + "K")
    }

    /** A mono block may itself await another mono and build its result from it. */
    @Test
    fun testMonoEmitAndAwait() {
        val mono = mono {
            Mono.just("O").awaitSingle() + "K"
        }
        checkMonoValue(mono) {
            assertEquals("OK", it)
        }
    }

    /** Awaiting a flux whose single element is delayed by 50 ms still produces the value. */
    @Test
    fun testMonoWithDelay() {
        val mono = mono {
            Flux.just("O").delayElements(ofMillis(50)).awaitSingle() + "K"
        }
        checkMonoValue(mono) {
            assertEquals("OK", it)
        }
    }

    /** `awaitSingle` on a two-element flux is expected to fail with [IllegalArgumentException]. */
    @Test
    fun testMonoException() {
        val mono = mono {
            Flux.just("O", "K").awaitSingle() + "K"
        }
        checkErroneous(mono) {
            // JUnit's assertTrue always checks; Kotlin's assert() is skipped unless the JVM runs with -ea.
            assertTrue(it is IllegalArgumentException)
        }
    }

    /** `awaitFirst` returns the first element of a multi-element flux. */
    @Test
    fun testAwaitFirst() {
        val mono = mono {
            Flux.just("O", "#").awaitFirst() + "K"
        }
        checkMonoValue(mono) {
            assertEquals("OK", it)
        }
    }

    /** `awaitLast` returns the last element of a multi-element flux. */
    @Test
    fun testAwaitLast() {
        val mono = mono {
            Flux.just("#", "O").awaitLast() + "K"
        }
        checkMonoValue(mono) {
            assertEquals("OK", it)
        }
    }

    /** An error signalled by an awaited flux can be caught inside the mono block and recovered from. */
    @Test
    fun testExceptionFromFlux() {
        val mono = mono {
            try {
                Flux.error<String>(RuntimeException("O")).awaitFirst()
            } catch (e: RuntimeException) {
                // The message is set just above, hence the non-null assertion.
                Flux.just(e.message!!).awaitLast() + "K"
            }
        }
        checkMonoValue(mono) {
            assertEquals("OK", it)
        }
    }

    /** An exception thrown by the mono block itself is reported with its type and message intact. */
    @Test
    fun testExceptionFromCoroutine() {
        val mono = mono<String> {
            throw IllegalStateException(Flux.just("O").awaitSingle() + "K")
        }
        checkErroneous(mono) {
            // JUnit's assertTrue always checks; Kotlin's assert() is skipped unless the JVM runs with -ea.
            assertTrue(it is IllegalStateException)
            assertEquals("OK", it.message)
        }
    }

    /**
     * When a child coroutine fails with `TestException` and the parent then throws
     * `TestException2` during cleanup, the awaiting side is expected to see `TestException`
     * with `TestException2` as its first suppressed exception.
     */
    @Test
    fun testSuppressedException() = runTest {
        val mono = mono(currentDispatcher()) {
            launch(start = CoroutineStart.ATOMIC) {
                throw TestException() // child coroutine fails
            }
            try {
                delay(Long.MAX_VALUE)
            } finally {
                throw TestException2() // but parent throws another exception while cleaning up
            }
        }
        try {
            mono.awaitSingle()
            expectUnreached()
        } catch (e: TestException) {
            assertTrue(e.suppressed[0] is TestException2)
        }
    }

    /**
     * An exception thrown after the subscription has been cancelled is expected to reach the
     * [CoroutineExceptionHandler] from the context; no subscriber callback may fire.
     */
    @Test
    fun testUnhandledException() = runTest {
        expect(1)
        var subscription: Subscription? = null
        val mono = mono(currentDispatcher() + CoroutineExceptionHandler { _, t ->
            assertTrue(t is TestException)
            expect(5)
        }) {
            expect(4)
            // onSubscribe (checkpoint 2) has stored the subscription before this block runs
            // (checkpoint 4), so the non-null assertion is expected to hold.
            subscription!!.cancel() // cancel our own subscription, so that delay will get cancelled
            try {
                delay(Long.MAX_VALUE)
            } finally {
                throw TestException() // would not be able to handle it since mono is disposed
            }
        }
        mono.subscribe(object : Subscriber<Unit> {
            override fun onSubscribe(s: Subscription) {
                expect(2)
                subscription = s
            }
            override fun onNext(t: Unit?) { expectUnreached() }
            override fun onComplete() { expectUnreached() }
            override fun onError(t: Throwable) { expectUnreached() }
        })
        expect(3)
        yield() // run coroutine
        finish(6)
    }

    /** Passing a `Job` as the context of `mono` is expected to fail with [IllegalArgumentException]. */
    @Test
    fun testIllegalArgumentException() {
        assertFailsWith<IllegalArgumentException> { mono(Job()) { } }
    }
}