blob: 608221e09f9f8c3f569c840fcad2008be48d847d [file] [log] [blame]
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
package kotlinx.coroutines.flow
import kotlinx.coroutines.*
import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
import kotlin.jvm.*
import kotlinx.coroutines.flow.internal.unsafeFlow as flow
* Catches exceptions in the flow completion and calls a specified [action] with
* the caught exception. This operator is *transparent* to exceptions that occur
* in downstream flow and does not catch exceptions that are thrown to cancel the flow.
* For example:
* ```
* flow { emitData() }
* .map { computeOne(it) }
* .catch { ... } // catches exceptions in emitData and computeOne
* .map { computeTwo(it) }
* .collect { process(it) } // throws exceptions from process and computeTwo
* ```
* Conceptually, the action of `catch` operator is similar to wrapping the code of upstream flows with
* `try { ... } catch (e: Throwable) { action(e) }`.
* Any exception in the [action] code itself proceeds downstream where it can be
* caught by further `catch` operators if needed. If a particular exception does not need to be
* caught it can be rethrown from the action of `catch` operator. For example:
* ```
* flow.catch { e ->
* if (e !is IOException) throw e // rethrow all but IOException
* // e is IOException here
* ...
* }
* ```
* The [action] code has [FlowCollector] as a receiver and can [emit][FlowCollector.emit] values downstream.
* For example, caught exception can be replaced with some wrapper value for errors:
* ```
* flow.catch { e -> emit(ErrorWrapperValue(e)) }
* ```
* The [action] can also use [emitAll] to fallback on some other flow in case of an error. However, to
* retry an original flow use [retryWhen] operator that can retry the flow multiple times without
* introducing ever-growing stack of suspending calls.
public fun <T> Flow<T>.catch(action: suspend FlowCollector<T>.(cause: Throwable) -> Unit): Flow<T> =
flow {
val exception = catchImpl(this)
if (exception != null) action(exception)
* @suppress **Deprecated**: Use `(Throwable) -> Boolean` functional type
level = DeprecationLevel.ERROR,
message = "Use (Throwable) -> Boolean functional type",
replaceWith = ReplaceWith("(Throwable) -> Boolean")
public typealias ExceptionPredicate = (Throwable) -> Boolean
* Switches to the [fallback] flow if the original flow throws an exception that matches the [predicate].
* Cancellation exceptions that were caused by the direct [cancel] call are not handled by this operator.
* @suppress **Deprecated**: Use `catch { e -> if (predicate(e)) emitAll(fallback) else throw e }`
level = DeprecationLevel.ERROR,
message = "Use catch { e -> if (predicate(e)) emitAll(fallback) else throw e }",
replaceWith = ReplaceWith("catch { e -> if (predicate(e)) emitAll(fallback) else throw e }")
public fun <T> Flow<T>.onErrorCollect(
fallback: Flow<T>,
predicate: (Throwable) -> Boolean = { true }
): Flow<T> = catch { e ->
if (!predicate(e)) throw e
* Retries collection of the given flow up to [retries] times when an exception that matches the
* given [predicate] occurs in the upstream flow. This operator is *transparent* to exceptions that occur
* in downstream flow and does not retry on exceptions that are thrown to cancel the flow.
* See [catch] for details on how exceptions are caught in flows.
* The default value of [retries] parameter is [Long.MAX_VALUE]. This value effectively means to retry forever.
* This operator is a shorthand for the following code (see [retryWhen]). Note that `attempt` is checked first
* and [predicate] is not called when it reaches the given number of [retries]:
* ```
* retryWhen { cause, attempt -> attempt < retries && predicate(cause) }
* ```
* The [predicate] parameter is always true by default. The [predicate] is a suspending function,
* so it can be also used to introduce delay before retry, for example:
* ```
* flow.retry(3) { e ->
* // retry on any IOException but also introduce delay if retrying
* (e is IOException).also { if (it) delay(1000) }
* }
* ```
* @throws IllegalArgumentException when [retries] is not positive.
public fun <T> Flow<T>.retry(
retries: Long = Long.MAX_VALUE,
predicate: suspend (cause: Throwable) -> Boolean = { true }
): Flow<T> {
require(retries > 0) { "Expected positive amount of retries, but had $retries" }
return retryWhen { cause, attempt -> attempt < retries && predicate(cause) }
@Deprecated(level = DeprecationLevel.HIDDEN, message = "binary compatibility with retries: Int preview version")
public fun <T> Flow<T>.retry(
retries: Int = Int.MAX_VALUE,
predicate: (Throwable) -> Boolean = { true }
): Flow<T> {
require(retries > 0) { "Expected positive amount of retries, but had $retries" }
return retryWhen { cause, attempt -> predicate(cause) && attempt < retries }
* Retries collection of the given flow when an exception occurs in the upstream flow and the
* [predicate] returns true. The predicate also receives an `attempt` number as parameter,
* starting from zero on the initial call. This operator is *transparent* to exceptions that occur
* in downstream flow and does not retry on exceptions that are thrown to cancel the flow.
* For example, the following call retries the flow forever if the error is caused by `IOException`, but
* stops after 3 retries on any other exception:
* ```
* flow.retryWhen { cause, attempt -> cause is IOException || attempt < 3 }
* ```
* To implement a simple retry logic with a limit on the number of retries use [retry] operator.
* Similarly to [catch] operator, the [predicate] code has [FlowCollector] as a receiver and can
* [emit][FlowCollector.emit] values downstream.
* The [predicate] is a suspending function, so it can be used to introduce delay before retry, for example:
* ```
* flow.retryWhen { cause, attempt ->
* if (cause is IOException) { // retry on IOException
* emit(RetryWrapperValue(e))
* delay(1000) // delay for one second before retry
* true
* } else { // do not retry otherwise
* false
* }
* }
* ```
* See [catch] for more details.
public fun <T> Flow<T>.retryWhen(predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long) -> Boolean): Flow<T> =
flow {
var attempt = 0L
var shallRetry: Boolean
do {
shallRetry = false
val cause = catchImpl(this)
if (cause != null) {
if (predicate(cause, attempt)) {
shallRetry = true
} else {
throw cause
} while (shallRetry)
// Return exception from upstream or null
internal suspend fun <T> Flow<T>.catchImpl(
collector: FlowCollector<T>
): Throwable? {
var fromDownstream: Throwable? = null
try {
collect {
try {
} catch (e: Throwable) {
fromDownstream = e
throw e
} catch (e: Throwable) {
* First check ensures that we catch an original exception, not one rethrown by an operator.
* Seconds check ignores cancellation causes, they cannot be caught.
if (e.isSameExceptionAs(fromDownstream) || e.isCancellationCause(coroutineContext)) {
throw e // Rethrow exceptions from downstream and cancellation causes
} else {
return e // not from downstream
return null
private fun Throwable.isCancellationCause(coroutineContext: CoroutineContext): Boolean {
val job = coroutineContext[Job]
if (job == null || !job.isCancelled) return false
return isSameExceptionAs(job.getCancellationException())
private fun Throwable.isSameExceptionAs(other: Throwable?): Boolean =
other != null && unwrap(other) == unwrap(this)