blob: c95b4be9407cbe801c0916d07e4f4e050f8e5951 [file] [log] [blame]
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:JvmMultifileClass
@file:JvmName("FlowKt")
package kotlinx.coroutines.flow
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.internal.*
import kotlinx.coroutines.selects.*
import kotlin.jvm.*
import kotlin.time.*
/* Scaffolding for Knit code examples
<!--- TEST_NAME FlowDelayTest -->
<!--- PREFIX .*-duration-.*
@file:OptIn(ExperimentalTime::class)
----- INCLUDE .*-duration-.*
import kotlin.time.*
----- INCLUDE .*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
----- SUFFIX .*
.toList().joinToString().let { println(it) } }
-->
*/
/**
 * Debounces this flow: a value is passed downstream only when no newer value follows it
 * within the given [timeout][timeoutMillis]; values superseded sooner are dropped.
 * The latest value is always emitted.
 *
 * Example:
 *
 * ```kotlin
 * flow {
 *     emit(1)
 *     delay(90)
 *     emit(2)
 *     delay(90)
 *     emit(3)
 *     delay(1010)
 *     emit(4)
 *     delay(1010)
 *     emit(5)
 * }.debounce(1000)
 * ```
 * <!--- KNIT example-delay-01.kt -->
 *
 * produces the following emissions
 *
 * ```text
 * 3, 4, 5
 * ```
 * <!--- TEST -->
 *
 * Note that the resulting flow does not emit anything as long as the original flow emits
 * items faster than every [timeoutMillis] milliseconds.
 *
 * @param timeoutMillis the quiet period in milliseconds; when it is zero this flow is returned as is.
 * @throws IllegalArgumentException if [timeoutMillis] is negative.
 */
@FlowPreview
public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T> {
    require(timeoutMillis >= 0L) { "Debounce timeout should not be negative" }
    return when (timeoutMillis) {
        // Nothing to debounce: hand back the original flow untouched
        0L -> this
        else -> debounceInternal { timeoutMillis }
    }
}
/**
 * Debounces this flow: a value is passed downstream only when no newer value follows it
 * within the timeout computed for that value by [timeoutMillis].
 * The latest value is always emitted.
 *
 * A variation of [debounce] that allows specifying the timeout value dynamically.
 *
 * Example:
 *
 * ```kotlin
 * flow {
 *     emit(1)
 *     delay(90)
 *     emit(2)
 *     delay(90)
 *     emit(3)
 *     delay(1010)
 *     emit(4)
 *     delay(1010)
 *     emit(5)
 * }.debounce {
 *     if (it == 1) {
 *         0L
 *     } else {
 *         1000L
 *     }
 * }
 * ```
 * <!--- KNIT example-delay-02.kt -->
 *
 * produces the following emissions
 *
 * ```text
 * 1, 3, 4, 5
 * ```
 * <!--- TEST -->
 *
 * Note that the resulting flow does not emit anything as long as the original flow emits
 * items faster than the timeouts returned by [timeoutMillis] elapse.
 *
 * @param timeoutMillis receives the emitted value of type [T] and returns its timeout in milliseconds.
 * A zero timeout makes the value be emitted right away; a negative timeout is rejected
 * with an [IllegalArgumentException] when it is computed.
 */
@FlowPreview
@OptIn(kotlin.experimental.ExperimentalTypeInference::class)
@OverloadResolutionByLambdaReturnType
public fun <T> Flow<T>.debounce(timeoutMillis: (T) -> Long): Flow<T> {
    // The selector already yields milliseconds, so it is handed over unchanged
    return debounceInternal(timeoutMillis)
}
/**
 * Debounces this flow: a value is passed downstream only when no newer value follows it
 * within the given [timeout]; values superseded sooner are dropped.
 * The latest value is always emitted.
 *
 * Example:
 *
 * ```kotlin
 * flow {
 *     emit(1)
 *     delay(90.milliseconds)
 *     emit(2)
 *     delay(90.milliseconds)
 *     emit(3)
 *     delay(1010.milliseconds)
 *     emit(4)
 *     delay(1010.milliseconds)
 *     emit(5)
 * }.debounce(1000.milliseconds)
 * ```
 * <!--- KNIT example-delay-duration-01.kt -->
 *
 * produces the following emissions
 *
 * ```text
 * 3, 4, 5
 * ```
 * <!--- TEST -->
 *
 * Note that the resulting flow does not emit anything as long as the original flow emits
 * items faster than once per [timeout].
 *
 * @param timeout the quiet period; it is converted with `toDelayMillis` and passed to the
 * millisecond-based [debounce] overload.
 */
@ExperimentalTime
@FlowPreview
public fun <T> Flow<T>.debounce(timeout: Duration): Flow<T> {
    val timeoutMillis = timeout.toDelayMillis()
    return debounce(timeoutMillis)
}
/**
 * Debounces this flow: a value is passed downstream only when no newer value follows it
 * within the timeout computed for that value by [timeout].
 * The latest value is always emitted.
 *
 * A variation of [debounce] that allows specifying the timeout value dynamically.
 *
 * Example:
 *
 * ```kotlin
 * flow {
 *     emit(1)
 *     delay(90.milliseconds)
 *     emit(2)
 *     delay(90.milliseconds)
 *     emit(3)
 *     delay(1010.milliseconds)
 *     emit(4)
 *     delay(1010.milliseconds)
 *     emit(5)
 * }.debounce {
 *     if (it == 1) {
 *         0.milliseconds
 *     } else {
 *         1000.milliseconds
 *     }
 * }
 * ```
 * <!--- KNIT example-delay-duration-02.kt -->
 *
 * produces the following emissions
 *
 * ```text
 * 1, 3, 4, 5
 * ```
 * <!--- TEST -->
 *
 * Note that the resulting flow does not emit anything as long as the original flow emits
 * items faster than the timeouts returned by [timeout] elapse.
 *
 * @param timeout receives the emitted value of type [T] and returns its timeout as a [Duration].
 */
@ExperimentalTime
@FlowPreview
@JvmName("debounceDuration")
@OptIn(kotlin.experimental.ExperimentalTypeInference::class)
@OverloadResolutionByLambdaReturnType
public fun <T> Flow<T>.debounce(timeout: (T) -> Duration): Flow<T> {
    // Adapt the Duration-based selector to the millisecond-based implementation
    return debounceInternal { value ->
        val duration = timeout(value)
        duration.toDelayMillis()
    }
}
/**
 * Shared implementation behind the public `debounce` overloads.
 *
 * Upstream values are pumped into a channel and consumed in a loop that holds at most one
 * pending value in `lastValue`:
 *  - `null` means there is no pending value,
 *  - [DONE] means the upstream channel has been closed and the loop must stop,
 *  - anything else is the pending value (a `null` element is represented by [NULL]).
 *
 * For every pending value [timeoutMillisSelector] is invoked to obtain its timeout in milliseconds.
 * A zero timeout emits the value immediately; a positive timeout emits it only if no newer value
 * is received before the timeout fires. A pending value is also emitted when the upstream channel
 * is closed.
 *
 * @param timeoutMillisSelector computes the timeout in milliseconds for the given value;
 * a negative result fails the `require` check with an [IllegalArgumentException].
 */
private fun <T> Flow<T>.debounceInternal(timeoutMillisSelector: (T) -> Long) : Flow<T> =
    scopedFlow { downstream ->
        // Produce the values using the default (rendezvous) channel
        // Note: the actual type is Any, KT-30796
        // Nulls are replaced with the NULL marker so that a received null can only mean "channel closed"
        val values = produce<Any?> {
            collect { value -> send(value ?: NULL) }
        }
        // Now consume the values
        var lastValue: Any? = null
        while (lastValue !== DONE) {
            var timeoutMillis = 0L // will be always computed when lastValue != null
            // Compute timeout for this value
            if (lastValue != null) {
                timeoutMillis = timeoutMillisSelector(NULL.unbox(lastValue))
                require(timeoutMillis >= 0L) { "Debounce timeout should not be negative" }
                // Zero timeout: nothing to wait for, emit right away
                if (timeoutMillis == 0L) {
                    downstream.emit(NULL.unbox(lastValue))
                    lastValue = null // Consume the value
                }
            }
            // assert invariant: lastValue != null implies timeoutMillis > 0
            assert { lastValue == null || timeoutMillis > 0 }
            // wait for the next value with timeout
            select<Unit> {
                // Set timeout when lastValue exists and is not consumed yet
                if (lastValue != null) {
                    // No newer value arrived in time: the pending value goes downstream
                    onTimeout(timeoutMillis) {
                        downstream.emit(NULL.unbox(lastValue))
                        lastValue = null // Consume the value
                    }
                }
                // Should be receiveOrClosed when boxing issues are fixed
                values.onReceiveOrNull { value ->
                    if (value == null) {
                        // Channel is closed: flush the pending value (if any) and leave the loop
                        if (lastValue != null) downstream.emit(NULL.unbox(lastValue))
                        lastValue = DONE
                    } else {
                        // A newer value replaces the pending one; its timeout is computed on the next iteration
                        lastValue = value
                    }
                }
            }
        }
    }
/**
 * Samples this flow: on every tick of the sampling [period][periodMillis] the most recent value
 * received since the previous tick (if any) is emitted downstream.
 *
 * Example:
 *
 * ```kotlin
 * flow {
 *     repeat(10) {
 *         emit(it)
 *         delay(110)
 *     }
 * }.sample(200)
 * ```
 * <!--- KNIT example-delay-03.kt -->
 *
 * produces the following emissions
 *
 * ```text
 * 1, 3, 5, 7, 9
 * ```
 * <!--- TEST -->
 *
 * Note that the latest element is not emitted if it does not fit into the sampling window.
 *
 * @param periodMillis the sampling period in milliseconds.
 * @throws IllegalArgumentException if [periodMillis] is not positive.
 */
@FlowPreview
public fun <T> Flow<T>.sample(periodMillis: Long): Flow<T> {
    require(periodMillis > 0) { "Sample period should be positive" }
    return scopedFlow { downstream ->
        // Conflated channel: only the most recent upstream value is kept
        val upstream = produce<Any?>(capacity = Channel.CONFLATED) {
            // Actually Any, KT-30796
            collect { item -> send(item ?: NULL) }
        }
        val ticks = fixedPeriodTicker(periodMillis)
        // null -> nothing to emit on the next tick, DONE -> upstream has completed
        var latest: Any? = null
        while (latest !== DONE) {
            select<Unit> {
                upstream.onReceiveOrNull { received ->
                    if (received != null) {
                        latest = received
                    } else {
                        // Upstream channel is closed: stop the ticker and finish the loop
                        ticks.cancel(ChildCancelledException())
                        latest = DONE
                    }
                }
                // todo: shall we start sampling only when an element arrives, or sample always as here?
                ticks.onReceive {
                    val pending = latest
                    if (pending != null) {
                        latest = null // Consume the value
                        downstream.emit(NULL.unbox(pending))
                    }
                }
            }
        }
    }
}
/*
 * TODO this design (and design of the corresponding operator) depends on #540
 */
/**
 * Creates a rendezvous channel in this scope that delivers [Unit] first after [initialDelayMillis]
 * and then repeatedly, waiting [delayMillis] after each delivered element.
 *
 * @param delayMillis the delay in milliseconds between a delivered element and the next send attempt.
 * @param initialDelayMillis the delay in milliseconds before the first element; defaults to [delayMillis].
 * @throws IllegalArgumentException if either delay is negative.
 */
internal fun CoroutineScope.fixedPeriodTicker(delayMillis: Long, initialDelayMillis: Long = delayMillis): ReceiveChannel<Unit> {
    require(delayMillis >= 0) { "Expected non-negative delay, but has $delayMillis ms" }
    require(initialDelayMillis >= 0) { "Expected non-negative initial delay, but has $initialDelayMillis ms" }
    return produce(capacity = Channel.RENDEZVOUS) {
        delay(initialDelayMillis)
        // Runs until the producer coroutine is cancelled
        do {
            send(Unit)
            delay(delayMillis)
        } while (true)
    }
}
/**
 * Samples this flow: on every tick of the sampling [period] the most recent value
 * received since the previous tick (if any) is emitted downstream.
 *
 * Example:
 *
 * ```kotlin
 * flow {
 *     repeat(10) {
 *         emit(it)
 *         delay(110.milliseconds)
 *     }
 * }.sample(200.milliseconds)
 * ```
 * <!--- KNIT example-delay-duration-03.kt -->
 *
 * produces the following emissions
 *
 * ```text
 * 1, 3, 5, 7, 9
 * ```
 * <!--- TEST -->
 *
 * Note that the latest element is not emitted if it does not fit into the sampling window.
 *
 * @param period the sampling period; it is converted with `toDelayMillis` and passed to the
 * millisecond-based [sample] overload.
 */
@ExperimentalTime
@FlowPreview
public fun <T> Flow<T>.sample(period: Duration): Flow<T> {
    val periodMillis = period.toDelayMillis()
    return sample(periodMillis)
}