Table of contents

Asynchronous Flow

Suspending functions asynchronously return a single value, but how can you return multiple asynchronously computed values? That is what Kotlin Flows are for.

Representing multiple values

Multiple values can be represented in Kotlin using collections. For example, we can have a function foo() that returns a List of three numbers and print them all using forEach:

fun foo(): List<Int> = listOf(1, 2, 3)
 
fun main() {
    foo().forEach { value -> println(value) } 
}

You can get full code here.

This code outputs:

1
2
3

Sequences

If the numbers are computed with some CPU-consuming blocking code (each computation taking 100ms) then we can represent the numbers using a Sequence:

fun foo(): Sequence<Int> = sequence { // sequence builder
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it
        yield(i) // yield next value
    }
}

fun main() {
    foo().forEach { value -> println(value) } 
}

You can get full code here.

This code outputs the same numbers, but it waits 100ms before printing each one.

Suspending functions

However, this computation blocks the main thread that is running the code. When those values are computed by an asynchronous code we can mark function foo with a suspend modifier, so that it can perform its work without blocking and return the result as a list:

import kotlinx.coroutines.*                 
                           
//sampleStart
suspend fun foo(): List<Int> {
    delay(1000) // pretend we are doing something asynchronous here
    return listOf(1, 2, 3)
}

fun main() = runBlocking<Unit> {
    foo().forEach { value -> println(value) } 
}
//sampleEnd

You can get full code here.

This code prints the numbers after waiting for a second.

Flows

Using List<Int> result type we can only return all the values at once. To represent the stream of values that are being asynchronously computed we can use Flow<Int> type similarly to the Sequence<Int> type for synchronously computed values:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart               
fun foo(): Flow<Int> = flow { // flow builder
    for (i in 1..3) {
        delay(100) // pretend we are doing something useful here
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    // Launch a concurrent coroutine to see that the main thread is not blocked
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            delay(100)
        }
    }
    // Collect the flow
    foo().collect { value -> println(value) } 
}
//sampleEnd

You can get full code here.

This code waits 100ms before printing each number without blocking the main thread. This is verified by printing “I'm not blocked” every 100ms from a separate coroutine that is running in the main thread:

I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3

Notice the following differences of the code with the Flow from the earlier examples:

  • A builder function for Flow type is called flow.
  • Code inside the flow { ... } builder block can suspend.
  • The function foo() is no longer marked with suspend modifier.
  • Values are emitted from the flow using emit function.
  • Values are collected from the flow using collect function.

You can replace delay with Thread.sleep in the body of foo's flow { ... } and see that the main thread is blocked in this case.

Flows are cold

Flows are cold streams similarly to sequences — the code inside a flow builder does not run until the flow is collected. This becomes clear in the following example:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart      
fun foo(): Flow<Int> = flow { 
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    println("Calling foo...")
    val flow = foo()
    println("Calling collect...")
    flow.collect { value -> println(value) } 
    println("Calling collect again...")
    flow.collect { value -> println(value) } 
}
//sampleEnd

You can get full code here.

Which prints:

Calling foo...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3

That is a key reason why the foo() function (which returns a flow) is not marked with suspend modifier. By itself, foo() returns quickly and does not wait for anything. The flow starts every time it is collected, that is why we see that when we call collect again, we get “Flow started” printed again.

Flow cancellation

Flow adheres to general cooperative cancellation of coroutines. However, flow infrastructure does not introduce additional cancellation points. It is fully transparent for cancellation. As usual, flow collection can be cancelled when the flow is suspended in a cancellable suspending function (like delay) and cannot be cancelled otherwise.

The following example shows how the flow gets cancelled on timeout when running in withTimeoutOrNull block and stops executing its code:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart           
fun foo(): Flow<Int> = flow { 
    for (i in 1..3) {
        delay(100)          
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    withTimeoutOrNull(250) { // Timeout after 250ms 
        foo().collect { value -> println(value) } 
    }
    println("Done")
}
//sampleEnd

You can get full code here.

Notice how only two numbers get emitted by the flow in foo() function, producing the following output:

Emitting 1
1
Emitting 2
2
Done

Flow builders

The flow { ... } builder from the previous examples is the most basic one. There are other builders for convenient declaration of flows:

  • flowOf builder that defines a flow emitting a fixed set of values.
  • Various collections and sequences can be converted to flows using .asFlow() extension functions.

Thus, the example that prints numbers from 1 to 3 from a flow can be written as:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    // Convert an integer range to a flow
    (1..3).asFlow().collect { value -> println(value) }
//sampleEnd 
}

You can get full code here.

Intermediate flow operators

Flows can be transformed with operators similarly to collections and sequences. Intermediate operators are applied to an upstream flow and return a downstream flow. These operators are cold, just like flows are. A call to such an operator is not a suspending function itself. It works quickly, returning the definition of a new transformed flow.

The basic operators have familiar names like map and filter. The important difference from sequences is that blocks of code inside those operators can call suspending functions.

For example, a flow of incoming requests can be mapped to results with the map operator even when performing a request is a long-running operation that is implemented by a suspending function:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart           
suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow() // a flow of requests
        .map { request -> performRequest(request) }
        .collect { response -> println(response) }
}
//sampleEnd

You can get full code here.

It produces the following three lines, each line appearing after a second:

response 1
response 2
response 3

Transform operator

Among the flow transformation operators, the most general one is called transform. It can be used to imitate simple transformations like map and filter as well as implement more complex transformations. Using transform operator, you can emit arbitrary values an arbitrary number of times.

For example, using transform we can emit a string before performing a long-running asynchronous request and follow it with a response:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

fun main() = runBlocking<Unit> {
//sampleStart
    (1..3).asFlow() // a flow of requests
        .transform { request ->
            emit("Making request $request") 
            emit(performRequest(request)) 
        }
        .collect { response -> println(response) }
//sampleEnd
}

You can get full code here.

The output of this code is:

Making request 1
response 1
Making request 2
response 2
Making request 3
response 3

Size-limiting operators

Size-limiting intermediate operators like take cancel the execution of the flow when the corresponding limit is reached. Cancellation in coroutines is always performed by throwing an exception so that all the resource-management functions (like try { ... } finally { ... } blocks) operate normally in case of cancellation:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun numbers(): Flow<Int> = flow {
    try {                          
        emit(1)
        emit(2) 
        println("This line will not execute")
        emit(3)    
    } finally {
        println("Finally in numbers")
    }
}

fun main() = runBlocking<Unit> {
    numbers() 
        .take(2) // take only the first two
        .collect { value -> println(value) }
}            
//sampleEnd

You can get full code here.

The output of this code clearly shows that execution of the flow { ... } body in numbers() function had stopped after emitting the second number:

1
2
Finally in numbers

Terminal flow operators

Terminal operators on flows are suspending functions that start a collection of the flow. The collect operator is the most basic one, but there are other terminal operators for convenience:

  • Conversion to various collections like toList and toSet.
  • Operators to get the first value and to ensure that a flow emits a single value.
  • Reducing a flow to a value with reduce and fold.

For example:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart         
    val sum = (1..5).asFlow()
        .map { it * it } // squares of numbers from 1 to 5                           
        .reduce { a, b -> a + b } // sum them (terminal operator)
    println(sum)
//sampleEnd     
}

You can get full code here.

Prints a single number:

55

Flows are sequential

Each individual collection of a flow is performed sequentially unless special operators that operate on multiple flows are used. The collection works directly in the coroutine that calls a terminal operator. No new coroutines are launched by default. Each emitted value is processed by all intermediate operators from upstream to downstream and is delivered to the terminal operator after that.

See the following example that filters even integers and maps them to strings:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart         
    (1..5).asFlow()
        .filter {
            println("Filter $it")
            it % 2 == 0              
        }              
        .map { 
            println("Map $it")
            "string $it"
        }.collect { 
            println("Collect $it")
        }    
//sampleEnd                  
}

You can get full code here.

Producing:

Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5

Flow context

Collection of a flow always happens in the context of the calling coroutine. For example, if there is a foo flow, then the following code runs in the context specified by the author of this code, regardless of implementation details of the foo flow:

withContext(context) {
    foo.collect { value ->
        println(value) // run in the specified context 
    }
}

This property of a flow is called context preservation.

So, by default, code in the flow { ... } builder runs in the context that is provided by a collector of the corresponding flow. For example, consider the implementation of foo that prints the thread it is called on and emits three numbers:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
           
//sampleStart
fun foo(): Flow<Int> = flow {
    log("Started foo flow")
    for (i in 1..3) {
        emit(i)
    }
}  

fun main() = runBlocking<Unit> {
    foo().collect { value -> log("Collected $value") } 
}            
//sampleEnd

You can get full code here.

Running this code produces:

[main @coroutine#1] Started foo flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3

Since foo().collect is called from the main thread, the body of foo's flow is also called in the main thread. This is a perfect default for fast-running or asynchronous code that does not care about the execution context and does not block the caller.

Wrong emission withContext

However, the long-running CPU-consuming code might need to be executed in the context of Dispatchers.Default and UI-updating code might need to be executed in the context of Dispatchers.Main. Usually, withContext is used to change the context in code using Kotlin coroutines, but code in the flow { ... } builder has to honor context preservation property and is not allowed to emit from a different context.

Try running the following code:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
                      
//sampleStart
fun foo(): Flow<Int> = flow {
    // WRONG way to change context for CPU-consuming code in flow builder
    kotlinx.coroutines.withContext(Dispatchers.Default) {
        for (i in 1..3) {
            Thread.sleep(100) // pretend we are computing it in CPU-consuming way
            emit(i) // emit next value
        }
    }
}

fun main() = runBlocking<Unit> {
    foo().collect { value -> println(value) } 
}            
//sampleEnd

You can get full code here.

This code produces the following exception:

Note that we had to use a fully qualified name of kotlinx.coroutines.withContext function in this example to demonstrate this exception. A short name of withContext would have resolved to a special stub function that produces compilation error to prevent us from running into this problem.

flowOn operator

The exception refers to flowOn function that shall be used to change the context of flow emission. The correct way of changing the context of a flow is shown in the below example, which also prints names of the corresponding threads to show how it all works:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
           
//sampleStart
fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it in CPU-consuming way
        log("Emitting $i")
        emit(i) // emit next value
    }
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder

fun main() = runBlocking<Unit> {
    foo().collect { value ->
        log("Collected $value") 
    } 
}            
//sampleEnd

You can get full code here.

Notice how flow { ... } works in the background thread, while collection happens in the main thread:

Another observation here is that flowOn operator had changed the default sequential nature of the flow. Now collection happens in one coroutine (“coroutine#1”) and emission happens in another coroutine (“coroutine#2”) that is running in another thread concurrently with collecting coroutine. The flowOn operator creates another coroutine for an upstream flow when it has to change the CoroutineDispatcher in its context.

Buffering

Running different parts of a flow in different coroutines can be helpful from the standpoint of overall time it takes to collect the flow, especially when long-running asynchronous operations are involved. For example, consider a case when emission by foo() flow is slow, taking 100 ms to produce an element; and collector is also slow, taking 300 ms to process an element. Let us see how long does it take to collect such a flow with three numbers:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

//sampleStart
fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> { 
    val time = measureTimeMillis {
        foo().collect { value -> 
            delay(300) // pretend we are processing it for 300 ms
            println(value) 
        } 
    }   
    println("Collected in $time ms")
}
//sampleEnd

You can get full code here.

It produces something like this, the whole collection taking around 1200 ms (three numbers times 400 ms each):

1
2
3
Collected in 1220 ms

We can use buffer operator on a flow to run emitting code of foo() concurrently with collecting code, as opposed to running them sequentially:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> { 
//sampleStart
    val time = measureTimeMillis {
        foo()
            .buffer() // buffer emissions, don't wait
            .collect { value -> 
                delay(300) // pretend we are processing it for 300 ms
                println(value) 
            } 
    }   
    println("Collected in $time ms")
//sampleEnd
}

You can get full code here.

It produces the same numbers faster, as we have effectively created a processing pipeline, only having to wait 100 ms for the first number and then spending only 300 ms to process each number. This way it takes around 1000 ms to run:

1
2
3
Collected in 1071 ms

Note that flowOn operator uses the same buffering mechanism when it has to change CoroutineDispatcher, but here we explicitly request buffering without changing execution context.

Conflation

When flow represents partial results of some operation or operation status updates, it may not be necessary to process each value, but only to process the most recent ones. In this case, conflate operator can be used to skip intermediate values when a collector is too slow to process them. Building on the previous example:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> { 
//sampleStart
    val time = measureTimeMillis {
        foo()
            .conflate() // conflate emissions, don't process each one
            .collect { value -> 
                delay(300) // pretend we are processing it for 300 ms
                println(value) 
            } 
    }   
    println("Collected in $time ms")
//sampleEnd
}

You can get full code here.

We see that while the first number was being processed the second and the third ones were already produced, so the second one was conflated and only the most recent (the third one) was delivered to the collector:

1
3
Collected in 758 ms

Processing the latest value

Conflation is one way to speed up processing when both emitter and collector are slow. It does that by dropping emitted values. The other way is to cancel slow collector and restart it every time a new value is emitted. There is a family of xxxLatest operators that perform the same essential logic of xxx operator, but cancel the code in their block on a new value. Let us change the previous example from conflate to collectLatest:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> { 
//sampleStart
    val time = measureTimeMillis {
        foo()
            .collectLatest { value -> // cancel & restart on the latest value
                println("Collecting $value") 
                delay(300) // pretend we are processing it for 300 ms
                println("Done $value") 
            } 
    }   
    println("Collected in $time ms")
//sampleEnd
}

You can get full code here.

Since the body of collectLatest takes 300 ms, but new values are emitted every 100 ms, we see that the block is run on every value, but completes only for the last value:

Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 741 ms

Composing multiple flows

There are several ways to compose multiple flows.

Zip

Similarly to Sequence.zip extension function in the Kotlin standard library, flows have zip operator that combines the corresponding values of two flows:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> { 
//sampleStart                                                                           
    val nums = (1..3).asFlow() // numbers 1..3
    val strs = flowOf("one", "two", "three") // strings 
    nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
        .collect { println(it) } // collect and print
//sampleEnd
}

You can get full code here.

This example prints:

1 -> one
2 -> two
3 -> three

Combine

When flow represents the most recent value of some variable or operation (see also a related section on conflation) it might be needed to perform a computation that depends on the most recent values of the corresponding flows and to recompute it whenever any of upstream flows emit a value. The corresponding family of operators is called combine.

For example, if the numbers in the previous example update every 300ms, but strings update every 400 ms, then zipping them using zip operator would still produce the same result, albeit results are going to be printed every 400 ms:

We use onEach intermediate operator in this example to delay each element and thus make the code that emits sample flows more declarative and shorter.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> { 
//sampleStart                                                                           
    val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
    val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
    val startTime = System.currentTimeMillis() // remember the start time 
    nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
        .collect { value -> // collect and print 
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
        } 
//sampleEnd
}

You can get full code here.

However, using combine operator here instead of zip:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> { 
//sampleStart                                                                           
    val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
    val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms          
    val startTime = System.currentTimeMillis() // remember the start time 
    nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
        .collect { value -> // collect and print 
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
        } 
//sampleEnd
}

You can get full code here.

We get quite a different output, where a line is printed at each emission from either nums or strs flows:

1 -> one at 452 ms from start
2 -> one at 651 ms from start
2 -> two at 854 ms from start
3 -> two at 952 ms from start
3 -> three at 1256 ms from start

Flattening flows

Flows represent asynchronously received sequences of values, so it is quite easy to get in a situation where each value triggers a request for another sequence of values. For example, we can have the following function that returns a flow of two strings 500 ms apart:

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First") 
    delay(500) // wait 500 ms
    emit("$i: Second")    
}

Now if we have a flow of three integers and call requestFlow for each of them like this:

(1..3).asFlow().map { requestFlow(it) }

Then we end up with a flow of flows (Flow<Flow<String>>) that needs to be flattened into a single flow for further processing. Collections and sequences have flatten and flatMap operators for this purpose. However, the asynchronous nature of flows calls for different modes of flattening thus there is a family of flattening operators on flows.

flatMapConcat

Concatenating mode is implemented by flatMapConcat and flattenConcat operators. They are the most direct analogues of the corresponding sequence operators. They wait for inner flow to complete before starting to collect the next one as the following example shows:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First") 
    delay(500) // wait 500 ms
    emit("$i: Second")    
}

fun main() = runBlocking<Unit> { 
//sampleStart
    val startTime = System.currentTimeMillis() // remember the start time 
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
        .flatMapConcat { requestFlow(it) }                                                                           
        .collect { value -> // collect and print 
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
        } 
//sampleEnd
}

You can get full code here.

The sequential nature of flatMapConcat is clearly seen in the output:

1: First at 121 ms from start
1: Second at 622 ms from start
2: First at 727 ms from start
2: Second at 1227 ms from start
3: First at 1328 ms from start
3: Second at 1829 ms from start

flatMapMerge

Another flattening mode is to concurrently collect all the incoming flows and merge their values into a single flow so that values are emitted as soon as possible. It is implemented by flatMapMerge and flattenMerge operators. They both accept an optional concurrency parameter that limits the number of concurrent flows that are collected at the same time (it is equal to DEFAULT_CONCURRENCY by default).

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First") 
    delay(500) // wait 500 ms
    emit("$i: Second")    
}

fun main() = runBlocking<Unit> { 
//sampleStart
    val startTime = System.currentTimeMillis() // remember the start time 
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
        .flatMapMerge { requestFlow(it) }                                                                           
        .collect { value -> // collect and print 
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
        } 
//sampleEnd
}

You can get full code here.

The concurrent nature of flatMapMerge is obvious:

1: First at 136 ms from start
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start
2: Second at 732 ms from start
3: Second at 833 ms from start

Note that flatMapMerge call its block of code ({ requestFlow(it) } in this example) sequentially, but collects the resulting flows concurrently, so it is equivalent to performing a sequential map { requestFlow(it) } first and then calling flattenMerge on the result.

flatMapLatest

In a similar way to collectLatest operator that was shown in “Processing the latest value” section, there is the corresponding “Latest” flattening mode where collection of the previous flow is cancelled as soon as new flow is emitted. It is implemented by flatMapLatest operator.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First") 
    delay(500) // wait 500 ms
    emit("$i: Second")    
}

fun main() = runBlocking<Unit> { 
//sampleStart
    val startTime = System.currentTimeMillis() // remember the start time 
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
        .flatMapLatest { requestFlow(it) }                                                                           
        .collect { value -> // collect and print 
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
        } 
//sampleEnd
}

You can get full code here.

The output of this example speaks for the way flatMapLatest works:

1: First at 142 ms from start
2: First at 322 ms from start
3: First at 425 ms from start
3: Second at 931 ms from start

Note that flatMapLatest cancels all the code in its block ({ requestFlow(it) } in this example) on a new value. It makes no difference in this particular example, because the call to requestFlow itself is fast, not-suspending, and cannot be cancelled. However, it would show up if we were to use suspending functions like delay in there.

Flow exceptions

Flow collection can complete with an exception when emitter or any code inside any of the operators throw an exception. There are several ways to handle these exceptions.

Collector try and catch

A collector can use Kotlin's try/catch block to handle exceptions:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    try {
        foo().collect { value ->         
            println(value)
            check(value <= 1) { "Collected $value" }
        }
    } catch (e: Throwable) {
        println("Caught $e")
    } 
}            
//sampleEnd

You can get full code here.

This code successfully catches an exception in collect terminal operator and, as you can see, no more values are emitted after that:

Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2

Everything is caught

The previous example actually catches any exception happening in emitter or in any intermediate or terminal operators. For example, let us change the code so that emitted values are mapped to strings, but the corresponding code produces an exception:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<String> = 
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
    .map { value ->
        check(value <= 1) { "Crashed on $value" }                 
        "string $value"
    }

fun main() = runBlocking<Unit> {
    try {
        foo().collect { value -> println(value) }
    } catch (e: Throwable) {
        println("Caught $e")
    } 
}            
//sampleEnd

You can get full code here.

This exception is still caught and collection is stopped:

Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2

Exception transparency

But how can code of emitter encapsulate its exception handling behavior?

Flows must be transparent to exceptions and it is a violation of exception transparency to emit values in the flow { ... } builder from inside of try/catch block. This guarantees that a collector throwing an exception can always catch it using try/catch as in the previous example.

The emitter can use catch operator that preserves this exception transparency and allows encapsulation of its exception handling. The body of the catch operator can analyze an exception and react to it in different ways depending on which exception was caught:

  • Exceptions can be rethrown using throw.
  • Exceptions can be turned into emission of values using emit from the body of catch.
  • Exceptions can be ignored, logged, or processed by some other code.

For example, let us emit a text on catching an exception:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun foo(): Flow<String> = 
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
    .map { value ->
        check(value <= 1) { "Crashed on $value" }                 
        "string $value"
    }

fun main() = runBlocking<Unit> {
//sampleStart
    foo()
        .catch { e -> emit("Caught $e") } // emit on exception
        .collect { value -> println(value) }
//sampleEnd
}            

You can get full code here.

The output of the example is the same, even though we do not have try/catch around the code anymore.

Transparent catch

The catch intermediate operator, honoring exception transparency, catches only upstream exceptions (that is an exception from all the operators above catch, but not below it). If the block in collect { ... } (placed below catch) throws an exception then it escapes:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    foo()
        .catch { e -> println("Caught $e") } // does not catch downstream exceptions
        .collect { value ->
            check(value <= 1) { "Collected $value" }                 
            println(value) 
        }
}            
//sampleEnd

You can get full code here.

The “Caught ...” message is not printed despite the catch operator:

Catching declaratively

We can combine a declarative nature of catch operator with a desire to handle all exceptions by moving the body of collect operator into onEach and putting it before the catch operator. Collection of this flow must be triggered by a call to collect() without parameters:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
//sampleStart
    foo()
        .onEach { value ->
            check(value <= 1) { "Collected $value" }                 
            println(value) 
        }
        .catch { e -> println("Caught $e") }
        .collect()
//sampleEnd
}            

You can get full code here.

Now we can see that “Caught ...” message is printed and thus we can catch all exceptions without explicitly using a try/catch block:

Flow completion

When flow collection completes (normally or exceptionally) it may be needed to execute some action. As you might have already noticed, it also can be done in two ways: imperative and declarative.

Imperative finally block

In addition to try/catch, a collector can also use finally block to execute an action upon collect completion.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    try {
        foo().collect { value -> println(value) }
    } finally {
        println("Done")
    }
}            
//sampleEnd

You can get full code here.

This code prints three numbers produced by the foo() flow followed by “Done” string:

1
2
3
Done

Declarative handling

For declarative approach, flow has onCompletion intermediate operator that is invoked when the flow is completely collected.

The previous example can be rewritten using onCompletion operator and produces the same output:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun foo(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
//sampleStart
    foo()
        .onCompletion { println("Done") }
        .collect { value -> println(value) }
//sampleEnd
}            

You can get full code here.

The key advantage of onCompletion is a nullable Throwable parameter of the lambda that can be used to determine whether flow collection was completed normally or exceptionally. In the following example foo() flow throws exception after emitting number 1:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException()
}

fun main() = runBlocking<Unit> {
    foo()
        .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
        .catch { cause -> println("Caught exception") }
        .collect { value -> println(value) }
}            
//sampleEnd

You can get full code here.

As you may expect, it prints:

1
Flow completed exceptionally
Caught exception

onCompletion operator, unlike catch, does not handle the exception. As we can see from the above example code, the exception still flows downstream. It will be delivered to further onCompletion operators and can be handled with catch operator.

Upstream exceptions only

Just like catch operator, onCompletion sees only exception coming from upstream and does not see downstream exceptions. For example, run the following code:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    foo()
        .onCompletion { cause -> println("Flow completed with $cause") }
        .collect { value ->
            check(value <= 1) { "Collected $value" }                 
            println(value) 
        }
}
//sampleEnd

You can get full code here.

And you can see the completion cause is null, yet collection failed with exception:

1
Flow completed with null
Exception in thread "main" java.lang.IllegalStateException: Collected 2

Imperative versus declarative

Now we know how to collect flow, handle its completion and exceptions in both imperative and declarative ways. The natural question here is which approach should be preferred and why. As a library, we do not advocate for any particular approach and believe that both options are valid and should be selected according to your own preferences and code style.

Launching flow

It is convenient to use flows to represent asynchronous events that are coming from some source. In this case, we need an analogue of addEventListener function that registers a piece of code with a reaction on incoming events and continues further work. The onEach operator can serve this role. However, onEach is an intermediate operator. We also need a terminal operator to collect the flow. Otherwise, just calling onEach has no effect.

If we use collect terminal operator after onEach, then code after it waits until the flow is collected:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .collect() // <--- Collecting the flow waits
    println("Done")
}            
//sampleEnd

You can get full code here.

As you can see, it prints:

Event: 1
Event: 2
Event: 3
Done

Here launchIn terminal operator comes in handy. Replacing collect with launchIn we can launch collection of the flow in a separate coroutine, so that execution of further code immediately continues:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

//sampleStart
fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .launchIn(this) // <--- Launching the flow in a separate coroutine
    println("Done")
}            
//sampleEnd

You can get full code here.

It prints:

Done
Event: 1
Event: 2
Event: 3

The required parameter to launchIn must specify a CoroutineScope in which the coroutine to collect the flow is launched. In the above example this scope comes from runBlocking coroutine builder, so while the flow is running this runBlocking scope waits for completion of its child coroutine and keeps the main function from returning and terminating this example.

In real applications a scope is going to come from some entity with a limited lifetime. As soon as the lifetime of this entity is terminated the corresponding scope is cancelled, cancelling collection of the corresponding flow. This way the pair of onEach { ... }.launchIn(scope) works like addEventListener. However, there is no need for the corresponding removeEventListener function, as cancellation and structured concurrency serve this purpose.

Note, that launchIn also returns a Job which can be used to cancel the corresponding flow collection coroutine only without cancelling the whole scope or to join it.