/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:JvmMultifileClass
@file:JvmName("FlowKt")
@file:Suppress("unused")
package kotlinx.coroutines.flow
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.internal.*
import kotlinx.coroutines.internal.*
import kotlin.jvm.*
import kotlinx.coroutines.flow.internal.unsafeFlow as flow
/**
* Name of the property that defines the value of [DEFAULT_CONCURRENCY].
*/
@FlowPreview
public const val DEFAULT_CONCURRENCY_PROPERTY_NAME: String = "kotlinx.coroutines.flow.defaultConcurrency"
/**
* Default concurrency limit that is used by the [flattenMerge] and [flatMapMerge] operators.
* It is 16 by default and can be changed on the JVM using the [DEFAULT_CONCURRENCY_PROPERTY_NAME] system property.
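*
* For example, it can be overridden at JVM startup with `-Dkotlinx.coroutines.flow.defaultConcurrency=64`
* (the value `64` here is purely illustrative).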
*/
@FlowPreview
public val DEFAULT_CONCURRENCY: Int = systemProp(DEFAULT_CONCURRENCY_PROPERTY_NAME,
    16, 1, Int.MAX_VALUE
)
/**
* Transforms elements emitted by the original flow by applying [transform], which returns another flow,
* and then concatenating and flattening these flows.
*
* This method is a shortcut for `map(transform).flattenConcat()`. See [flattenConcat].
*
* Note that even though this operator looks very familiar, we discourage its usage in regular application-specific flows.
* Most likely, a suspending operation in the [map] operator will be sufficient, and linear transformations are much easier to reason about.
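*
* For example, a simple usage sketch:
* ```
* flowOf(1, 2).flatMapConcat { value ->
*     flowOf("${value}a", "${value}b")
* }
* ```
* produces `1a 1b 2a 2b`.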
*/
@FlowPreview
public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R> =
    map(transform).flattenConcat()
/**
* Transforms elements emitted by the original flow by applying [transform], which returns another flow,
* and then merging and flattening these flows.
*
* This operator calls [transform] *sequentially* and then merges the resulting flows with a [concurrency]
* limit on the number of concurrently collected flows.
* It is a shortcut for `map(transform).flattenMerge(concurrency)`.
* See [flattenMerge] for details.
*
* Note that even though this operator looks very familiar, we discourage its usage in regular application-specific flows.
* Most likely, a suspending operation in the [map] operator will be sufficient, and linear transformations are much easier to reason about.
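*
* For example, a usage sketch (the resulting order of merged elements is not guaranteed):
* ```
* flowOf(1, 2, 3).flatMapMerge(concurrency = 2) { value ->
*     flow {
*         delay(100) // some asynchronous work per value, purely illustrative
*         emit(value)
*     }
* }
* ```
* collects at most two inner flows at a time and emits their values as they become available.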
*
* ### Operator fusion
*
* Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with
* its concurrent merging so that only one properly configured channel is used for execution of merging logic.
*
* @param concurrency controls the number of in-flight flows: at most [concurrency] flows are collected
* at the same time. By default, it is equal to [DEFAULT_CONCURRENCY].
*/
@FlowPreview
public fun <T, R> Flow<T>.flatMapMerge(
    concurrency: Int = DEFAULT_CONCURRENCY,
    transform: suspend (value: T) -> Flow<R>
): Flow<R> =
    map(transform).flattenMerge(concurrency)
/**
* Flattens the given flow of flows into a single flow in a sequential manner, without interleaving nested flows.
* This method is conceptually identical to `flattenMerge(concurrency = 1)` but has a faster implementation.
*
* Inner flows are collected by this operator *sequentially*.
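*
* For example, a simple usage sketch:
* ```
* flowOf(flowOf(1, 2), flowOf(3, 4)).flattenConcat()
* ```
* produces `1 2 3 4`.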
*/
@FlowPreview
public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> = flow {
    collect { value -> emitAll(value) }
}
/**
* Merges the given flows into a single flow without preserving the order of elements.
* All flows are merged concurrently, without a limit on the number of simultaneously collected flows.
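*
* For example, a usage sketch:
* ```
* listOf(flowOf(1, 2), flowOf(3, 4)).merge()
* ```
* emits all four values; each flow's own order is preserved, while the interleaving between flows is unspecified.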
*
* ### Operator fusion
*
* Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with
* its concurrent merging so that only one properly configured channel is used for execution of merging logic.
*/
@ExperimentalCoroutinesApi
public fun <T> Iterable<Flow<T>>.merge(): Flow<T> {
    /*
     * This is a fuseable implementation of the following operator:
     * channelFlow {
     *     forEach { flow ->
     *         launch {
     *             flow.collect { send(it) }
     *         }
     *     }
     * }
     */
    return ChannelLimitedFlowMerge(this)
}
/**
* Merges the given flows into a single flow without preserving the order of elements.
* All flows are merged concurrently, without a limit on the number of simultaneously collected flows.
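*
* For example, a usage sketch:
* ```
* merge(flowOf(1, 2), flowOf(3, 4))
* ```
* emits all four values, with the same ordering guarantees as the `Iterable<Flow<T>>.merge()` overload above.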
*
* ### Operator fusion
*
* Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with
* its concurrent merging so that only one properly configured channel is used for execution of merging logic.
*/
@ExperimentalCoroutinesApi
public fun <T> merge(vararg flows: Flow<T>): Flow<T> = flows.asIterable().merge()
/**
* Flattens the given flow of flows into a single flow with a [concurrency] limit on the number of
* concurrently collected flows.
*
* If [concurrency] is more than 1, then inner flows are collected by this operator *concurrently*.
* With `concurrency == 1` this operator is identical to [flattenConcat].
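*
* For example, a usage sketch:
* ```
* flowOf(flowOf(1, 2), flowOf(3, 4)).flattenMerge(concurrency = 2)
* ```
* collects both inner flows concurrently and emits their values as they become available.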
*
* ### Operator fusion
*
* Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with
* its concurrent merging so that only one properly configured channel is used for execution of merging logic.
*
* @param concurrency controls the number of in-flight flows: at most [concurrency] flows are collected
* at the same time. By default, it is equal to [DEFAULT_CONCURRENCY].
*/
@FlowPreview
public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = DEFAULT_CONCURRENCY): Flow<T> {
    require(concurrency > 0) { "Expected positive concurrency level, but had $concurrency" }
    return if (concurrency == 1) flattenConcat() else ChannelFlowMerge(this, concurrency)
}
/**
* Returns a flow that produces elements by the [transform] function every time the original flow emits a value.
* When the original flow emits a new value, the previous `transform` block is cancelled, hence the name `transformLatest`.
*
* For example, the following flow:
* ```
* flow {
*     emit("a")
*     delay(100)
*     emit("b")
* }.transformLatest { value ->
*     emit(value)
*     delay(200)
*     emit(value + "_last")
* }
* ```
* produces `a b b_last`.
*
* This operator is [buffered][buffer] by default
* and the size of its output buffer can be changed by applying a subsequent [buffer] operator.
*/
@ExperimentalCoroutinesApi
public fun <T, R> Flow<T>.transformLatest(@BuilderInference transform: suspend FlowCollector<R>.(value: T) -> Unit): Flow<R> =
    ChannelFlowTransformLatest(transform, this)
/**
* Returns a flow that switches to a new flow produced by the [transform] function every time the original flow emits a value.
* When the original flow emits a new value, the previous flow produced by the `transform` block is cancelled.
*
* For example, the following flow:
* ```
* flow {
*     emit("a")
*     delay(100)
*     emit("b")
* }.flatMapLatest { value ->
*     flow {
*         emit(value)
*         delay(200)
*         emit(value + "_last")
*     }
* }
* ```
* produces `a b b_last`.
*
* This operator is [buffered][buffer] by default and the size of its output buffer can be changed by applying a subsequent [buffer] operator.
*/
@ExperimentalCoroutinesApi
public inline fun <T, R> Flow<T>.flatMapLatest(@BuilderInference crossinline transform: suspend (value: T) -> Flow<R>): Flow<R> =
    transformLatest { emitAll(transform(it)) }
/**
* Returns a flow that emits elements from the original flow transformed by the [transform] function.
* When the original flow emits a new value, the computation of the [transform] block for the previous value is cancelled.
*
* For example, the following flow:
* ```
* flow {
*     emit("a")
*     delay(100)
*     emit("b")
* }.mapLatest { value ->
*     println("Started computing $value")
*     delay(200)
*     "Computed $value"
* }
* ```
* will print "Started computing a" and "Started computing b", but the resulting flow will contain only the "Computed b" value.
*
* This operator is [buffered][buffer] by default and the size of its output buffer can be changed by applying a subsequent [buffer] operator.
*/
@ExperimentalCoroutinesApi
public fun <T, R> Flow<T>.mapLatest(@BuilderInference transform: suspend (value: T) -> R): Flow<R> =
    transformLatest { emit(transform(it)) }