blob: bbdebd08b959ee7d2219a4feca8e4cc98cace861 [file] [log] [blame]
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:Suppress("UNCHECKED_CAST", "NON_APPLICABLE_CALL_FOR_BUILDER_INFERENCE") // KT-32203
package kotlinx.coroutines.flow.internal
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
private typealias Update = IndexedValue<Any?>
@PublishedApi
internal suspend fun <R, T> FlowCollector<R>.combineInternal(
flows: Array<out Flow<T>>,
arrayFactory: () -> Array<T?>?, // Array factory is required to workaround array typing on JVM
transform: suspend FlowCollector<R>.(Array<T>) -> Unit
): Unit = flowScope { // flow scope so any cancellation within the source flow will cancel the whole scope
val size = flows.size
if (size == 0) return@flowScope // bail-out for empty input
val latestValues = arrayOfNulls<Any?>(size)
latestValues.fill(UNINITIALIZED) // Smaller bytecode & faster that Array(size) { UNINITIALIZED }
val resultChannel = Channel<Update>(size)
val nonClosed = LocalAtomicInt(size)
var remainingAbsentValues = size
for (i in 0 until size) {
// Coroutine per flow that keeps track of its value and sends result to downstream
launch {
try {
flows[i].collect { value ->
resultChannel.send(Update(i, value))
yield() // Emulate fairness, giving each flow chance to emit
}
} finally {
// Close the channel when there is no more flows
if (nonClosed.decrementAndGet() == 0) {
resultChannel.close()
}
}
}
}
/*
* Batch-receive optimization: read updates in batches, but bail-out
* as soon as we encountered two values from the same source
*/
val lastReceivedEpoch = ByteArray(size)
var currentEpoch: Byte = 0
while (true) {
++currentEpoch
// Start batch
// The very first receive in epoch should be suspending
var element = resultChannel.receiveOrNull() ?: break // Channel is closed, nothing to do here
while (true) {
val index = element.index
// Update values
val previous = latestValues[index]
latestValues[index] = element.value
if (previous === UNINITIALIZED) --remainingAbsentValues
// Check epoch
// Received the second value from the same flow in the same epoch -- bail out
if (lastReceivedEpoch[index] == currentEpoch) break
lastReceivedEpoch[index] = currentEpoch
element = resultChannel.poll() ?: break
}
// Process batch result if there is enough data
if (remainingAbsentValues == 0) {
/*
* If arrayFactory returns null, then we can avoid array copy because
* it's our own safe transformer that immediately deconstructs the array
*/
val results = arrayFactory()
if (results == null) {
transform(latestValues as Array<T>)
} else {
(latestValues as Array<T?>).copyInto(results)
transform(results as Array<T>)
}
}
}
}
internal fun <T1, T2, R> zipImpl(flow: Flow<T1>, flow2: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> =
unsafeFlow {
coroutineScope {
val second = produce<Any> {
flow2.collect { value ->
return@collect channel.send(value ?: NULL)
}
}
/*
* This approach only works with rendezvous channel and is required to enforce correctness
* in the following scenario:
* ```
* val f1 = flow { emit(1); delay(Long.MAX_VALUE) }
* val f2 = flowOf(1)
* f1.zip(f2) { ... }
* ```
*
* Invariant: this clause is invoked only when all elements from the channel were processed (=> rendezvous restriction).
*/
val collectJob = Job()
(second as SendChannel<*>).invokeOnClose {
// Optimization to avoid AFE allocation when the other flow is done
if (collectJob.isActive) collectJob.cancel(AbortFlowException(this@unsafeFlow))
}
try {
/*
* Non-trivial undispatched (because we are in the right context and there is no structured concurrency)
* hierarchy:
* -Outer coroutineScope that owns the whole zip process
* - First flow is collected by the child of coroutineScope, collectJob.
* So it can be safely cancelled as soon as the second flow is done
* - **But** the downstream MUST NOT be cancelled when the second flow is done,
* so we emit to downstream from coroutineScope job.
* Typically, such hierarchy requires coroutine for collector that communicates
* with coroutines scope via a channel, but it's way too expensive, so
* we are using this trick instead.
*/
val scopeContext = coroutineContext
val cnt = threadContextElements(scopeContext)
withContextUndispatched(coroutineContext + collectJob, Unit) {
flow.collect { value ->
withContextUndispatched(scopeContext, Unit, cnt) {
val otherValue = second.receiveOrNull() ?: throw AbortFlowException(this@unsafeFlow)
emit(transform(value, NULL.unbox(otherValue)))
}
}
}
} catch (e: AbortFlowException) {
e.checkOwnership(owner = this@unsafeFlow)
} finally {
if (!second.isClosedForReceive) second.cancel()
}
}
}