Guide to reactive streams with coroutines

This guide explains the key differences between Kotlin coroutines and reactive streams and shows how they can be used together for the greater good. Prior familiarity with the basic coroutine concepts that are covered in Guide to kotlinx.coroutines is not required, but is a big plus. If you are familiar with reactive streams, you may find this guide a better introduction to the world of coroutines.

Several modules in the kotlinx.coroutines project are related to reactive streams. The examples in this guide use two of them: kotlinx-coroutines-reactive and kotlinx-coroutines-rx2.

This guide is mostly based on the Reactive Streams specification and uses its Publisher interface, with some examples based on RxJava 2.x, which implements the Reactive Streams specification.

You are welcome to clone the kotlinx.coroutines project from GitHub to your workstation in order to run all the presented examples. They are contained in the reactive/kotlinx-coroutines-rx2/test/guide directory of the project.

Differences between reactive streams and channels

This section outlines key differences between reactive streams and coroutine-based channels.

Basics of iteration

The Channel is a concept somewhat similar to reactive stream classes such as the Reactive Streams Publisher and the RxJava 2.x Flowable, which implements Publisher.

They all describe an asynchronous stream of elements (aka items in Rx), either infinite or finite, and all of them support backpressure.

However, the Channel always represents a hot stream of items, using Rx terminology. Elements are being sent into the channel by producer coroutines and are received from it by consumer coroutines. Every receive invocation consumes an element from the channel. Let us illustrate it with the following example:

fun main() = runBlocking<Unit> {
    // create a channel that produces numbers from 1 to 3 with 200ms delays between them
    val source = produce<Int> {
        println("Begin") // mark the beginning of this coroutine in output
        for (x in 1..3) {
            delay(200) // wait for 200ms
            send(x) // send number x to the channel
        }
    }
    // print elements from the source
    println("Elements:")
    source.consumeEach { // consume elements from it
        println(it)
    }
    // print elements from the source AGAIN
    println("Again:")
    source.consumeEach { // consume elements from it
        println(it)
    }
}

You can get full code here.

This code produces the following output:

Elements:
Begin
1
2
3
Again:

Notice how the “Begin” line was printed just once, because the produce coroutine builder, when it is executed, launches one coroutine to produce a stream of elements. All the produced elements are consumed with ReceiveChannel.consumeEach extension function. There is no way to receive the elements from this channel again. The channel is closed when the producer coroutine is over and an attempt to receive from it again cannot receive anything.

Let us rewrite this code using the publish coroutine builder from kotlinx-coroutines-reactive module instead of produce from kotlinx-coroutines-core module. The code stays the same, but where source used to have the ReceiveChannel type, it now has the reactive streams' Publisher type, and where consumeEach was used to consume elements from the channel, now collect is used to collect elements from the publisher.

fun main() = runBlocking<Unit> {
    // create a publisher that produces numbers from 1 to 3 with 200ms delays between them
    val source = publish<Int> {
    //           ^^^^^^^  <---  Difference from the previous examples is here
        println("Begin") // mark the beginning of this coroutine in output
        for (x in 1..3) {
            delay(200) // wait for 200ms
            send(x) // send number x to the channel
        }
    }
    // print elements from the source
    println("Elements:")
    source.collect { // collect elements from it
        println(it)
    }
    // print elements from the source AGAIN
    println("Again:")
    source.collect { // collect elements from it
        println(it)
    }
}

You can get full code here.

Now the output of this code changes to:

Elements:
Begin
1
2
3
Again:
Begin
1
2
3

This example highlights the key difference between a reactive stream and a channel. A reactive stream is a higher-order functional concept. While the channel is a stream of elements, the reactive stream defines a recipe on how the stream of elements is produced. It becomes the actual stream of elements when collected. Each collector may receive the same or a different stream of elements, depending on how the corresponding implementation of Publisher works.

The publish coroutine builder used in the above example does not launch a coroutine, but every collect invocation does. There are two of them here and that is why we see “Begin” printed twice.
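To see this laziness in isolation, here is a minimal sketch. It assumes the publish and collect functions from the kotlinx-coroutines-reactive module used above; lazyPublishDemo and the events list are illustrative names, not part of the library.

```kotlin
import kotlinx.coroutines.reactive.collect
import kotlinx.coroutines.reactive.publish
import kotlinx.coroutines.runBlocking

// Records what happens, in order, and returns the log for inspection.
fun lazyPublishDemo(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val source = publish<Int> {
        events += "Begin" // runs only once somebody collects
        send(1)
    }
    events += "Created" // the publisher exists, but its body has not started
    source.collect { events += "Got $it" }
    events
}

fun main() {
    lazyPublishDemo().forEach(::println)
}
```

Under these assumptions the recorded order is Created, Begin, Got 1: the body of publish does not start before collect is called.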

In Rx lingo, this kind of publisher is called cold. Many standard Rx operators produce cold streams, too. We can collect them from a coroutine, and every collector gets the same stream of elements.

Note that we can replicate the behaviour that we saw with channels by applying the Rx publish operator and calling the connect method on its result.
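As a rough sketch of that approach, the following code turns a cold Flowable into a connected one that is shared by two subscribers. It uses only RxJava 2.x; hotFlowableDemo and events are illustrative names.

```kotlin
import io.reactivex.Flowable

// Returns the recorded events when two subscribers share one connected source.
fun hotFlowableDemo(): List<String> {
    val events = mutableListOf<String>()
    val cold = Flowable.range(1, 3)
        .doOnSubscribe { events += "Begin" } // fires on each subscription to the upstream
    val hot = cold.publish() // ConnectableFlowable: nothing is emitted until connect()
    hot.subscribe { events += "A$it" }
    hot.subscribe { events += "B$it" }
    hot.connect() // subscribes to `cold` once and starts emitting to both subscribers
    return events
}

fun main() {
    hotFlowableDemo().forEach(::println)
}
```

“Begin” should be recorded only once, even though there are two subscribers, because connect subscribes to the upstream a single time.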

Subscription and cancellation

In the second example from the previous section, source.collect { ... } was used to collect all elements. Instead, we can open a channel using openSubscription and iterate over it. In this way, we can have finer-grained control over our iteration (using break, for example), as shown below:

fun main() = runBlocking<Unit> {
    val source = Flowable.range(1, 5) // a range of five numbers
        .doOnSubscribe { println("OnSubscribe") } // provide some insight
        .doOnComplete { println("OnComplete") }   // ...
        .doFinally { println("Finally") }         // ... into what's going on
    var cnt = 0 
    source.openSubscription().consume { // open channel to the source
        for (x in this) { // iterate over the channel to receive elements from it
            println(x)
            if (++cnt >= 3) break // break when 3 elements are printed
        }
        // Note: `consume` cancels the channel when this block of code is complete
    }
}

You can get full code here.

It produces the following output:

OnSubscribe
1
2
3
Finally

With an explicit openSubscription we should cancel the corresponding subscription to unsubscribe from the source, but there is no need to call cancel explicitly -- consume does that for us under the hood. The installed doFinally listener prints “Finally” to confirm that the subscription is actually being closed. Note that “OnComplete” is never printed because we did not consume all of the elements.
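What consume does for us can be sketched by hand with try and finally. The example below uses a plain channel created with produce rather than a subscription to a Publisher, so that it stays independent of Rx; takeAndCancel is an illustrative helper, not a library function.

```kotlin
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking

// Receives at most `limit` elements, then cancels the channel in `finally`.
suspend fun takeAndCancel(channel: ReceiveChannel<Int>, limit: Int): List<Int> {
    val received = mutableListOf<Int>()
    try {
        for (x in channel) {
            received += x
            if (received.size >= limit) break
        }
    } finally {
        channel.cancel() // the step that `consume` performs on our behalf
    }
    return received
}

fun main() = runBlocking<Unit> {
    val numbers = produce<Int> { for (x in 1..5) send(x) }
    println(takeAndCancel(numbers, 3))
}
```

It prints [1, 2, 3]. Cancelling the channel in finally is what stops the producer; for a channel obtained from openSubscription, the same cancellation is what unsubscribes from the source.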

We do not need to use an explicit cancel either if we collect all the elements:

fun main() = runBlocking<Unit> {
    val source = Flowable.range(1, 5) // a range of five numbers
        .doOnSubscribe { println("OnSubscribe") } // provide some insight
        .doOnComplete { println("OnComplete") }   // ...
        .doFinally { println("Finally") }         // ... into what's going on
    // collect the source fully
    source.collect { println(it) }
}

You can get full code here.

We get the following output:

OnSubscribe
1
2
3
OnComplete
Finally
4
5

Notice how “OnComplete” and “Finally” are printed before the last elements “4” and “5”. This happens because our main function in this example is a coroutine that we start with the runBlocking coroutine builder. Our main coroutine receives from the flowable using the source.collect { ... } expression. The main coroutine is suspended while it waits for the source to emit an item. When Flowable.range(1, 5) emits the last items, it resumes the main coroutine, which gets dispatched onto the main thread to print these last elements at a later point in time, while the source completes and prints “Finally”.

Backpressure

Backpressure is one of the most interesting and complex aspects of reactive streams. Coroutines can suspend and they provide a natural answer to handling backpressure.

In RxJava 2.x, the backpressure-capable class is called Flowable. In the following example, we use the rxFlowable coroutine builder from the kotlinx-coroutines-rx2 module to define a flowable that sends three integers from 1 to 3. It prints a message to the output after each invocation of the suspending send function returns, so that we can study how it operates.

The integers are generated in the context of the main thread, but the subscription is shifted to another thread using Rx observeOn operator with a buffer of size 1. The subscriber is slow. It takes 500 ms to process each item, which is simulated using Thread.sleep.

fun main() = runBlocking<Unit> { 
    // coroutine -- fast producer of elements in the context of the main thread
    val source = rxFlowable {
        for (x in 1..3) {
            send(x) // this is a suspending function
            println("Sent $x") // print after successfully sent item
        }
    }
    // subscribe on another thread with a slow subscriber using Rx
    source
        .observeOn(Schedulers.io(), false, 1) // specify buffer size of 1 item
        .doOnComplete { println("Complete") }
        .subscribe { x ->
            Thread.sleep(500) // 500ms to process each item
            println("Processed $x")
        }
    delay(2000) // suspend the main coroutine for two seconds
}

You can get full code here.

The output of this code nicely illustrates how backpressure works with coroutines:

Sent 1
Processed 1
Sent 2
Processed 2
Sent 3
Processed 3
Complete

We see here how the producer coroutine puts the first element in the buffer and is suspended while trying to send another one. Only after the consumer processes the first item does the producer send the second one and resume, and so on.
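The same suspension-based backpressure can be observed without Rx. The following sketch is a simplification under our own assumptions rather than a translation of the example above: it uses produce with a one-element buffer and a slow consumer, both running in the runBlocking context. backpressureDemo and events are illustrative names.

```kotlin
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Returns the interleaving of producer and consumer events.
fun backpressureDemo(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val source = produce<Int>(capacity = 1) {
        for (x in 1..3) {
            send(x) // suspends while the one-element buffer is full
            events += "Sent $x"
        }
    }
    for (x in source) {
        delay(50) // simulate a slow consumer
        events += "Processed $x"
    }
    events
}

fun main() {
    backpressureDemo().forEach(::println)
}
```

The exact interleaving depends on the buffering, but “Sent 3” cannot be recorded before “Processed 1”: the producer stays suspended in send until the consumer has made room.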

Rx Subject vs BroadcastChannel

RxJava has a concept of Subject which is an object that effectively broadcasts elements to all its subscribers. The matching concept in the coroutines world is called a BroadcastChannel. There is a variety of subjects in Rx with BehaviorSubject being the one used to manage state:

fun main() {
    val subject = BehaviorSubject.create<String>()
    subject.onNext("one")
    subject.onNext("two") // updates the state of BehaviorSubject, "one" value is lost
    // now subscribe to this subject and print everything
    subject.subscribe(System.out::println)
    subject.onNext("three")
    subject.onNext("four")
}

You can get full code here.

This code prints the current state of the subject on subscription and all its further updates:

two
three
four

You can subscribe to subjects from a coroutine just as with any other reactive stream:

fun main() = runBlocking<Unit> {
    val subject = BehaviorSubject.create<String>()
    subject.onNext("one")
    subject.onNext("two")
    // now launch a coroutine to print everything
    GlobalScope.launch(Dispatchers.Unconfined) { // launch coroutine in unconfined context
        subject.collect { println(it) }
    }
    subject.onNext("three")
    subject.onNext("four")
}

You can get full code here.

The result is the same:

two
three
four

Here we use the Dispatchers.Unconfined coroutine context to launch a consuming coroutine with the same behavior as subscription in Rx. It basically means that the launched coroutine is going to be immediately executed in the same thread that is emitting elements. Contexts are covered in more detail in a separate section.

The advantage of coroutines is that it is easy to get conflation behavior for single-threaded UI updates. A typical UI application does not need to react to every state change. Only the most recent state is relevant. A sequence of back-to-back updates to the application state needs to get reflected in UI only once, as soon as the UI thread is free. For the following example we are going to simulate this by launching a consuming coroutine in the context of the main thread and use the yield function to simulate a break in the sequence of updates and to release the main thread:

fun main() = runBlocking<Unit> {
    val subject = BehaviorSubject.create<String>()
    subject.onNext("one")
    subject.onNext("two")
    // now launch a coroutine to print the most recent update
    launch { // use the context of the main thread for a coroutine
        subject.collect { println(it) }
    }
    subject.onNext("three")
    subject.onNext("four")
    yield() // yield the main thread to the launched coroutine <--- HERE
    subject.onComplete() // now complete the subject's sequence to cancel the consumer, too    
}

You can get full code here.

Now the coroutine processes (prints) only the most recent update:

four

The corresponding behavior in the pure coroutines world is implemented by ConflatedBroadcastChannel that provides the same logic on top of coroutine channels directly, without going through the bridge to the reactive streams:

fun main() = runBlocking<Unit> {
    val broadcast = ConflatedBroadcastChannel<String>()
    broadcast.offer("one")
    broadcast.offer("two")
    // now launch a coroutine to print the most recent update
    launch { // use the context of the main thread for a coroutine
        broadcast.consumeEach { println(it) }
    }
    broadcast.offer("three")
    broadcast.offer("four")
    yield() // yield the main thread to the launched coroutine
    broadcast.close() // now close the broadcast channel to cancel the consumer, too    
}

You can get full code here.

It produces the same output as the previous example based on BehaviorSubject:

four

Another implementation of BroadcastChannel is ArrayBroadcastChannel with an array-based buffer of a specified capacity. It can be created with BroadcastChannel(capacity). It delivers every event to every subscriber as soon as their corresponding subscriptions are opened. It corresponds to PublishSubject in Rx. The capacity of the buffer in the constructor of ArrayBroadcastChannel controls the number of elements that can be sent before the sender is suspended waiting for a receiver to receive those elements.
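The Rx side of this correspondence can be illustrated with a short sketch. It uses only PublishSubject from RxJava 2.x; publishSubjectDemo is an illustrative name.

```kotlin
import io.reactivex.subjects.PublishSubject

// Returns what a subscriber sees when it subscribes after the first event.
fun publishSubjectDemo(): List<String> {
    val received = mutableListOf<String>()
    val subject = PublishSubject.create<String>()
    subject.onNext("one") // no subscribers yet, so this event is dropped
    subject.subscribe { received += it }
    subject.onNext("two")
    subject.onNext("three")
    return received
}

fun main() {
    println(publishSubjectDemo())
}
```

The subscriber receives “two” and “three” but not “one”, which was emitted before the subscription was opened. Unlike BehaviorSubject, no current state is replayed.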

Operators

Full-featured reactive stream libraries, like Rx, come with a very large set of operators to create, transform, combine and otherwise process the corresponding streams. Creating your own operators with support for backpressure is notoriously difficult.

Coroutines and channels are designed to provide the opposite experience. There are no built-in operators, but processing streams of elements is extremely simple and backpressure is supported automatically without you having to explicitly think about it.

This section shows a coroutine-based implementation of several reactive stream operators.

Range

Let's roll our own implementation of the range operator for the reactive streams' Publisher interface. The asynchronous clean-slate implementation of this operator for reactive streams is explained in this blog post. It takes a lot of code. Here is the corresponding code with coroutines:

fun CoroutineScope.range(context: CoroutineContext, start: Int, count: Int) = publish<Int>(context) {
    for (x in start until start + count) send(x)
}

Here, CoroutineScope and context are used instead of an Executor and all the backpressure aspects are taken care of by the coroutines machinery. Note that this implementation depends only on the small reactive streams library that defines the Publisher interface and its friends.

Using it from a coroutine is straightforward:

fun main() = runBlocking<Unit> {
    // Range inherits parent job from runBlocking, but overrides dispatcher with Dispatchers.Default
    range(Dispatchers.Default, 1, 5).collect { println(it) }
}

You can get full code here.

The result of this code is quite expected:

1
2
3
4
5

Fused filter-map hybrid

Reactive operators like filter and map are trivial to implement with coroutines. For a bit of challenge and showcase, let us combine them into the single fusedFilterMap operator:

fun <T, R> Publisher<T>.fusedFilterMap(
    context: CoroutineContext,   // the context to execute this coroutine in
    predicate: (T) -> Boolean,   // the filter predicate
    mapper: (T) -> R             // the mapper function
) = publish<R>(context) {
    collect {                    // collect the source stream 
        if (predicate(it))       // filter part
            send(mapper(it))     // map part
    }        
}

Using range from the previous example we can test our fusedFilterMap by filtering for even numbers and mapping them to strings:

fun main() = runBlocking<Unit> {
    range(Dispatchers.Default, 1, 5) // range from the previous example takes a context first
        .fusedFilterMap(Dispatchers.Unconfined, { it % 2 == 0 }, { "$it is even" })
        .collect { println(it) } // print all the resulting strings
}

You can get full code here.

It is not hard to see that the result is going to be:

2 is even
4 is even
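For comparison, here is a sketch of the two operators written separately, in the same style. The names filterWith and mapWith are made up for this illustration; only publish and collect come from kotlinx-coroutines-reactive.

```kotlin
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.reactive.collect
import kotlinx.coroutines.reactive.publish
import kotlinx.coroutines.runBlocking
import org.reactivestreams.Publisher

// Hypothetical standalone filter: re-sends only the elements that match.
fun <T> Publisher<T>.filterWith(context: CoroutineContext, predicate: (T) -> Boolean) =
    publish<T>(context) {
        collect { if (predicate(it)) send(it) }
    }

// Hypothetical standalone map: re-sends every element after transforming it.
fun <T, R> Publisher<T>.mapWith(context: CoroutineContext, mapper: (T) -> R) =
    publish<R>(context) {
        collect { send(mapper(it)) }
    }

// Chains both operators over the numbers 1..5 and gathers the results.
fun evenLabels(): List<String> = runBlocking {
    val labels = mutableListOf<String>()
    publish<Int> { for (x in 1..5) send(x) }
        .filterWith(Dispatchers.Unconfined) { it % 2 == 0 }
        .mapWith(Dispatchers.Unconfined) { "$it is even" }
        .collect { labels += it }
    labels
}

fun main() {
    evenLabels().forEach(::println)
}
```

The chained version yields the same two strings as the fused operator, at the cost of one extra coroutine and one extra hand-off per element.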

Take until

Let's implement our own version of takeUntil operator. It is quite tricky as subscriptions to two streams need to be tracked and managed. We need to relay all the elements from the source stream until the other stream either completes or emits anything. However, we have the select expression to rescue us in the coroutines implementation:

fun <T, U> Publisher<T>.takeUntil(context: CoroutineContext, other: Publisher<U>) = publish<T>(context) {
    this@takeUntil.openSubscription().consume { // explicitly open channel to Publisher<T>
        val current = this
        other.openSubscription().consume { // explicitly open channel to Publisher<U>
            val other = this
            whileSelect {
                other.onReceive { false }            // bail out on any received element from `other`
                current.onReceive { send(it); true } // resend element from this channel and continue
            }
        }
    }
}

This code is using whileSelect as a nicer shortcut to while(select{...}) {} loop and Kotlin's consume expressions to close the channels on exit, which unsubscribes from the corresponding publishers.
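To make the equivalence concrete, here is a sketch of the same relay-until-stopped logic written with a plain while(select { ... }) {} loop over ordinary channels. relayUntil and relayDemo are hypothetical helpers for illustration only.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select

// Collects elements from `source` until `stop` delivers anything.
suspend fun relayUntil(source: ReceiveChannel<Int>, stop: ReceiveChannel<Unit>): List<Int> {
    val relayed = mutableListOf<Int>()
    while (select<Boolean> {
            stop.onReceive { false }                 // any signal ends the loop
            source.onReceive { relayed += it; true } // record the element and continue
        }) { /* the select clauses do all the work */ }
    return relayed
}

// Sends two numbers, then a stop signal, all on the runBlocking thread.
fun relayDemo(): List<Int> = runBlocking {
    val source = Channel<Int>()
    val stop = Channel<Unit>()
    launch {
        source.send(1)
        source.send(2)
        stop.send(Unit)
    }
    relayUntil(source, stop)
}

fun main() {
    println(relayDemo())
}
```

Here relayDemo returns [1, 2]: the loop keeps running while the selected clause returns true and stops as soon as the clause on stop returns false.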

The following hand-written combination of range with interval is used for testing. It is coded using a publish coroutine builder (its pure-Rx implementation is shown in later sections):

fun CoroutineScope.rangeWithInterval(time: Long, start: Int, count: Int) = publish<Int> {
    for (x in start until start + count) { 
        delay(time) // wait before sending each number
        send(x)
    }
}

The following code shows how takeUntil works:

fun main() = runBlocking<Unit> {
    val slowNums = rangeWithInterval(200, 1, 10)         // numbers with 200ms interval
    val stop = rangeWithInterval(500, 1, 10)             // the first one after 500ms
    slowNums.takeUntil(Dispatchers.Unconfined, stop).collect { println(it) } // let's test it
}

You can get full code here.

Producing

1
2

Merge

There are always at least two ways for processing multiple streams of data with coroutines. One way involving select was shown in the previous example. The other way is just to launch multiple coroutines. Let us implement merge operator using the latter approach:

fun <T> Publisher<Publisher<T>>.merge(context: CoroutineContext) = publish<T>(context) {
    collect { pub -> // for each publisher collected
        launch { // launch a child coroutine
            pub.collect { send(it) } // resend all elements from this publisher
        }
    }
}

Notice that all the coroutines that are being launched here are the children of the publish coroutine and will get cancelled when the publish coroutine is cancelled or is otherwise completed. Moreover, since the parent coroutine waits until all the children are complete, this implementation fully merges all the received streams.

For a test, let us start with the rangeWithInterval function from the previous example and write a producer that sends its results twice with some delay:

fun CoroutineScope.testPub() = publish<Publisher<Int>> {
    send(rangeWithInterval(250, 1, 4)) // number 1 at 250ms, 2 at 500ms, 3 at 750ms, 4 at 1000ms 
    delay(100) // wait for 100 ms
    send(rangeWithInterval(500, 11, 3)) // number 11 at 600ms, 12 at 1100ms, 13 at 1600ms
    delay(1100) // wait for 1.1s - done in 1.2 sec after start
}

The test code is to use merge on testPub and to display the results:

fun main() = runBlocking<Unit> {
    testPub().merge(Dispatchers.Unconfined).collect { println(it) } // print the whole stream
}

You can get full code here.

And the results should be:

1
2
11
3
4
12
13

Coroutine context

All the example operators that are shown in the previous section have an explicit CoroutineContext parameter. In the Rx world it roughly corresponds to a Scheduler.

Threads with Rx

The following example shows the basics of threading context management with Rx. Here rangeWithIntervalRx is an implementation of rangeWithInterval function using Rx zip, range, and interval operators.

fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable<Int> = 
    Flowable.zip(
        Flowable.range(start, count),
        Flowable.interval(time, TimeUnit.MILLISECONDS, scheduler),
        BiFunction { x, _ -> x })

fun main() {
    rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
        .subscribe { println("$it on thread ${Thread.currentThread().name}") }
    Thread.sleep(1000)
}

You can get full code here.

We are explicitly passing the Schedulers.computation() scheduler to our rangeWithIntervalRx operator and it is going to be executed in Rx computation thread pool. The output is going to be similar to the following one:

1 on thread RxComputationThreadPool-1
2 on thread RxComputationThreadPool-1
3 on thread RxComputationThreadPool-1

Threads with coroutines

In the world of coroutines Schedulers.computation() roughly corresponds to Dispatchers.Default, so the previous example is similar to the following one:

fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {
    for (x in start until start + count) { 
        delay(time) // wait before sending each number
        send(x)
    }
}

fun main() {
    Flowable.fromPublisher(rangeWithInterval(Dispatchers.Default, 100, 1, 3))
        .subscribe { println("$it on thread ${Thread.currentThread().name}") }
    Thread.sleep(1000)
}

You can get full code here.

The produced output is going to be similar to:

1 on thread ForkJoinPool.commonPool-worker-1
2 on thread ForkJoinPool.commonPool-worker-1
3 on thread ForkJoinPool.commonPool-worker-1

Here we've used the Rx subscribe operator, which does not have its own scheduler and operates on the same thread as the publisher. In this example that is a thread from the default shared pool.

Rx observeOn

In Rx you use special operators to modify the threading context for operations in the chain. You can find some good guides about them, if you are not familiar.

For example, there is observeOn operator. Let us modify the previous example to observe using Schedulers.computation():

fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {
    for (x in start until start + count) { 
        delay(time) // wait before sending each number
        send(x)
    }
}

fun main() {
    Flowable.fromPublisher(rangeWithInterval(Dispatchers.Default, 100, 1, 3))
        .observeOn(Schedulers.computation())                           // <-- THIS LINE IS ADDED
        .subscribe { println("$it on thread ${Thread.currentThread().name}") }
    Thread.sleep(1000)
}

You can get full code here.

Here is the difference in output, notice “RxComputationThreadPool”:

1 on thread RxComputationThreadPool-1
2 on thread RxComputationThreadPool-1
3 on thread RxComputationThreadPool-1

Coroutine context to rule them all

A coroutine is always working in some context. For example, let us start a coroutine in the main thread with runBlocking and iterate over the result of the Rx version of rangeWithIntervalRx operator, instead of using Rx subscribe operator:

fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable<Int> =
    Flowable.zip(
        Flowable.range(start, count),
        Flowable.interval(time, TimeUnit.MILLISECONDS, scheduler),
        BiFunction { x, _ -> x })

fun main() = runBlocking<Unit> {
    rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
        .collect { println("$it on thread ${Thread.currentThread().name}") }
}

You can get full code here.

The resulting messages are going to be printed in the main thread:

1 on thread main
2 on thread main
3 on thread main

Unconfined context

Most Rx operators do not have any specific thread (scheduler) associated with them and are working in whatever thread they happen to be invoked. We've seen it in the example with the subscribe operator in the threads with Rx section.

In the world of coroutines, Dispatchers.Unconfined context serves a similar role. Let us modify our previous example, but instead of iterating over the source Flowable from the runBlocking coroutine that is confined to the main thread, we launch a new coroutine in the Dispatchers.Unconfined context, while the main coroutine simply waits for its completion using Job.join:

fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable<Int> =
    Flowable.zip(
        Flowable.range(start, count),
        Flowable.interval(time, TimeUnit.MILLISECONDS, scheduler),
        BiFunction { x, _ -> x })

fun main() = runBlocking<Unit> {
    val job = launch(Dispatchers.Unconfined) { // launch a new coroutine in Unconfined context (without its own thread pool)
        rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
            .collect { println("$it on thread ${Thread.currentThread().name}") }
    }
    job.join() // wait for our coroutine to complete
}

You can get full code here.

Now, the output shows that the code of the coroutine is executing in the Rx computation thread pool, just like our initial example using the Rx subscribe operator.

1 on thread RxComputationThreadPool-1
2 on thread RxComputationThreadPool-1
3 on thread RxComputationThreadPool-1

Note that the Dispatchers.Unconfined context should be used with care. It may improve the overall performance on certain tests, due to the increased stack-locality of operations and less scheduling overhead, but it also produces deeper stacks and makes it harder to reason about asynchronicity of the code that is using it.

If a coroutine sends an element to a channel, then the thread that invoked the send may start executing the code of the coroutine with the Dispatchers.Unconfined dispatcher. The original producer coroutine that invoked send is paused until the unconfined consumer coroutine hits its next suspension point. This is very similar to a lock-step single-threaded onNext execution in the Rx world in the absence of thread-shifting operators. It is a normal default for Rx, because operators are usually doing very small chunks of work and you have to combine many operators for complex processing. However, this is unusual with coroutines, where you can have arbitrarily complex processing in a coroutine. Usually, you only need to chain stream-processing coroutines for complex pipelines with fan-in and fan-out between multiple worker coroutines.
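The thread-hopping nature of Dispatchers.Unconfined can be observed with a small sketch that does not involve Rx at all. unconfinedThreads is an illustrative name; which thread the coroutine resumes on after delay is an implementation detail, so the code only records it.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Returns the threads the unconfined coroutine ran on before and after suspending.
fun unconfinedThreads(): Pair<Thread?, Thread?> = runBlocking {
    var before: Thread? = null
    var after: Thread? = null
    launch(Dispatchers.Unconfined) {
        before = Thread.currentThread() // starts right away in the caller's thread
        delay(100)
        after = Thread.currentThread()  // resumes in whatever thread completes the delay
    }.join()
    before to after
}

fun main() {
    val (before, after) = unconfinedThreads()
    println("before delay: ${before?.name}")
    println("after delay: ${after?.name}")
}
```

The first recorded thread is the one that called launch. The second is typically a different one, which is exactly what makes unconfined code harder to reason about.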