blob: 19a5b43f31e757b720c37305ff13d8392143e660 [file] [log] [blame]
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.flow
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.internal.*
import kotlin.coroutines.*
/**
* An asynchronous data stream that sequentially emits values and completes normally or with an exception.
*
* _Intermediate operators_ on the flow such as [map], [filter], [take], [zip], etc are functions that are
* applied to the _upstream_ flow or flows and return a _downstream_ flow where further operators can be applied to.
* Intermediate operations do not execute any code in the flow and are not suspending functions themselves.
* They only set up a chain of operations for future execution and quickly return.
* This is known as a _cold flow_ property.
*
* _Terminal operators_ on the flow are either suspending functions such as [collect], [single], [reduce], [toList], etc.
* or [launchIn] operator that starts collection of the flow in the given scope.
* They are applied to the upstream flow and trigger execution of all operations.
* Execution of the flow is also called _collecting the flow_ and is always performed in a suspending manner
* without actual blocking. Terminal operators complete normally or exceptionally depending on successful or failed
* execution of all the flow operations in the upstream. The most basic terminal operator is [collect], for example:
*
* ```
* try {
* flow.collect { value ->
* println("Received $value")
* }
* } catch (e: Exception) {
* println("The flow has thrown an exception: $e")
* }
* ```
*
* By default, flows are _sequential_ and all flow operations are executed sequentially in the same coroutine,
* with an exception for a few operations specifically designed to introduce concurrency into flow
* execution such as [buffer] and [flatMapMerge]. See their documentation for details.
*
* The `Flow` interface does not carry information whether a flow is a _cold_ stream that can be collected repeatedly and
* triggers execution of the same code every time it is collected, or if it is a _hot_ stream that emits different
* values from the same running source on each collection. Usually flows represent _cold_ streams, but
* there is a [SharedFlow] subtype that represents _hot_ streams. In addition to that, any flow can be turned
* into a _hot_ one by the [stateIn] and [shareIn] operators, or by converting the flow into a hot channel
* via the [produceIn] operator.
*
* ### Flow builders
*
* There are the following basic ways to create a flow:
*
* * [flowOf(...)][flowOf] functions to create a flow from a fixed set of values.
* * [asFlow()][asFlow] extension functions on various types to convert them into flows.
* * [flow { ... }][flow] builder function to construct arbitrary flows from
* sequential calls to [emit][FlowCollector.emit] function.
* * [channelFlow { ... }][channelFlow] builder function to construct arbitrary flows from
* potentially concurrent calls to the [send][kotlinx.coroutines.channels.SendChannel.send] function.
* * [MutableStateFlow] and [MutableSharedFlow] define the corresponding constructor functions to create
* a _hot_ flow that can be directly updated.
*
* ### Flow constraints
*
* All implementations of the `Flow` interface must adhere to two key properties described in detail below:
*
* * Context preservation.
* * Exception transparency.
*
* These properties ensure the ability to perform local reasoning about the code with flows and modularize the code
* in such a way that upstream flow emitters can be developed separately from downstream flow collectors.
* A user of a flow does not need to be aware of implementation details of the upstream flows it uses.
*
* ### Context preservation
*
* The flow has a context preservation property: it encapsulates its own execution context and never propagates or leaks
* it downstream, thus making reasoning about the execution context of particular transformations or terminal
* operations trivial.
*
* There is only one way to change the context of a flow: the [flowOn][Flow.flowOn] operator
* that changes the upstream context ("everything above the `flowOn` operator").
* For additional information refer to its documentation.
*
* This reasoning can be demonstrated in practice:
*
* ```
* val flowA = flowOf(1, 2, 3)
* .map { it + 1 } // Will be executed in ctxA
* .flowOn(ctxA) // Changes the upstream context: flowOf and map
*
* // Now we have a context-preserving flow: it is executed somewhere but this information is encapsulated in the flow itself
*
* val filtered = flowA // ctxA is encapsulated in flowA
* .filter { it == 3 } // Pure operator without a context yet
*
* withContext(Dispatchers.Main) {
* // All non-encapsulated operators will be executed in Main: filter and single
* val result = filtered.single()
* myUi.text = result
* }
* ```
*
* From the implementation point of view, it means that all flow implementations should
* only emit from the same coroutine.
* This constraint is efficiently enforced by the default [flow] builder.
* The [flow] builder should be used if flow implementation does not start any coroutines.
* Its implementation prevents most of the development mistakes:
*
* ```
* val myFlow = flow {
* // GlobalScope.launch { // is prohibited
* // launch(Dispatchers.IO) { // is prohibited
* // withContext(CoroutineName("myFlow")) // is prohibited
* emit(1) // OK
* coroutineScope {
* emit(2) // OK -- still the same coroutine
* }
* }
* ```
*
* Use [channelFlow] if the collection and emission of a flow are to be separated into multiple coroutines.
* It encapsulates all the context preservation work and allows you to focus on your
* domain-specific problem, rather than invariant implementation details.
* It is possible to use any combination of coroutine builders from within [channelFlow].
*
* If you are looking for performance and are sure that no concurrent emits and context jumps will happen,
* the [flow] builder can be used alongside a [coroutineScope] or [supervisorScope] instead:
* - Scoped primitive should be used to provide a [CoroutineScope].
* - Changing the context of emission is prohibited, no matter whether it is `withContext(ctx)` or
* a builder argument (e.g. `launch(ctx)`).
* - Collecting another flow from a separate context is allowed, but it has the same effect as
* applying the [flowOn] operator to that flow, which is more efficient.
*
* ### Exception transparency
*
* Flow implementations never catch or handle exceptions that occur in downstream flows. From the implementation standpoint
* it means that calls to [emit][FlowCollector.emit] and [emitAll] shall never be wrapped into
* `try { ... } catch { ... }` blocks. Exception handling in flows shall be performed with
* [catch][Flow.catch] operator and it is designed to only catch exceptions coming from upstream flows while passing
* all downstream exceptions. Similarly, terminal operators like [collect][Flow.collect]
* throw any unhandled exceptions that occur in their code or in upstream flows, for example:
*
* ```
* flow { emitData() }
* .map { computeOne(it) }
* .catch { ... } // catches exceptions in emitData and computeOne
* .map { computeTwo(it) }
* .collect { process(it) } // throws exceptions from process and computeTwo
* ```
* The same reasoning can be applied to the [onCompletion] operator that is a declarative replacement for the `finally` block.
*
* Failure to adhere to the exception transparency requirement can lead to strange behaviors which make
* it hard to reason about the code because an exception in the `collect { ... }` could be somehow "caught"
* by an upstream flow, limiting the ability of local reasoning about the code.
*
* Flow machinery enforces exception transparency at runtime and throws [IllegalStateException] on any attempt to emit a value,
* if an exception has been thrown on previous attempt.
*
* ### Reactive streams
*
* Flow is [Reactive Streams](http://www.reactive-streams.org/) compliant, you can safely interop it with
* reactive streams using [Flow.asPublisher] and [Publisher.asFlow] from `kotlinx-coroutines-reactive` module.
*
* ### Not stable for inheritance
*
* **The `Flow` interface is not stable for inheritance in 3rd party libraries**, as new methods
* might be added to this interface in the future, but is stable for use.
* Use the `flow { ... }` builder function to create an implementation.
*/
public interface Flow<out T> {
/**
* Accepts the given [collector] and [emits][FlowCollector.emit] values into it.
* This method should never be implemented or used directly.
*
* The only way to implement the `Flow` interface directly is to extend [AbstractFlow].
* To collect it into a specific collector, either `collector.emitAll(flow)` or `collect { ... }` extension
* should be used. Such limitation ensures that the context preservation property is not violated and prevents most
* of the developer mistakes related to concurrency, inconsistent flow dispatchers and cancellation.
*/
@InternalCoroutinesApi
public suspend fun collect(collector: FlowCollector<T>)
}
/**
* Base class for stateful implementations of `Flow`.
* It tracks all the properties required for context preservation and throws an [IllegalStateException]
* if any of the properties are violated.
*
* Example of the implementation:
*
* ```
* // list.asFlow() + collect counter
* class CountingListFlow(private val values: List<Int>) : AbstractFlow<Int>() {
* private val collectedCounter = AtomicInteger(0)
*
* override suspend fun collectSafely(collector: FlowCollector<Int>) {
* collectedCounter.incrementAndGet() // Increment collected counter
* values.forEach { // Emit all the values
* collector.emit(it)
* }
* }
*
* fun toDiagnosticString(): String = "Flow with values $values was collected ${collectedCounter.value} times"
* }
* ```
*/
@FlowPreview
public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {
@InternalCoroutinesApi
public final override suspend fun collect(collector: FlowCollector<T>) {
val safeCollector = SafeCollector(collector, coroutineContext)
try {
collectSafely(safeCollector)
} finally {
safeCollector.releaseIntercepted()
}
}
/**
* Accepts the given [collector] and [emits][FlowCollector.emit] values into it.
*
* A valid implementation of this method has the following constraints:
* 1) It should not change the coroutine context (e.g. with `withContext(Dispatchers.IO)`) when emitting values.
* The emission should happen in the context of the [collect] call.
* Please refer to the top-level [Flow] documentation for more details.
* 2) It should serialize calls to [emit][FlowCollector.emit] as [FlowCollector] implementations are not
* thread-safe by default.
* To automatically serialize emissions [channelFlow] builder can be used instead of [flow]
*
* @throws IllegalStateException if any of the invariants are violated.
*/
public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}