blob: 9e093b77ec84e0bca948a4db8b92d9c95af85654 [file] [log] [blame]
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.channels
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.intrinsics.*
import kotlinx.coroutines.selects.*
import kotlin.jvm.*
import kotlin.native.concurrent.*
/**
* Broadcasts the most recently sent element (aka [value]) to all [openSubscription] subscribers.
*
* Back-to-send sent elements are _conflated_ -- only the the most recently sent value is received,
* while previously sent elements **are lost**.
* Every subscriber immediately receives the most recently sent element.
* Sender to this broadcast channel never suspends and [offer] always returns `true`.
*
* A secondary constructor can be used to create an instance of this class that already holds a value.
* This channel is also created by `BroadcastChannel(Channel.CONFLATED)` factory function invocation.
*
* This implementation is fully lock-free. In this implementation
* [opening][openSubscription] and [closing][ReceiveChannel.cancel] subscription takes O(N) time, where N is the
* number of subscribers.
*
* **Note: This API is experimental.** It may be changed in the future updates.
*/
@ExperimentalCoroutinesApi
public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
/**
* Creates an instance of this class that already holds a value.
*
* It is as a shortcut to creating an instance with a default constructor and
* immediately sending an element: `ConflatedBroadcastChannel().apply { offer(value) }`.
*/
public constructor(value: E) : this() {
_state.lazySet(State<E>(value, null))
}
private val _state = atomic<Any>(INITIAL_STATE) // State | Closed
private val _updating = atomic(0)
// State transitions: null -> handler -> HANDLER_INVOKED
private val onCloseHandler = atomic<Any?>(null)
private companion object {
@SharedImmutable
private val CLOSED = Closed(null)
@SharedImmutable
private val UNDEFINED = Symbol("UNDEFINED")
private val INITIAL_STATE = State<Any?>(UNDEFINED, null)
}
private class State<E>(
@JvmField val value: Any?, // UNDEFINED | E
@JvmField val subscribers: Array<Subscriber<E>>?
)
private class Closed(@JvmField val closeCause: Throwable?) {
val sendException: Throwable get() = closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE)
val valueException: Throwable get() = closeCause ?: IllegalStateException(DEFAULT_CLOSE_MESSAGE)
}
/**
* The most recently sent element to this channel.
*
* Access to this property throws [IllegalStateException] when this class is constructed without
* initial value and no value was sent yet or if it was [closed][close] without a cause.
* It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
*/
@Suppress("UNCHECKED_CAST")
public val value: E get() {
_state.loop { state ->
when (state) {
is Closed -> throw state.valueException
is State<*> -> {
if (state.value === UNDEFINED) throw IllegalStateException("No value")
return state.value as E
}
else -> error("Invalid state $state")
}
}
}
/**
* The most recently sent element to this channel or `null` when this class is constructed without
* initial value and no value was sent yet or if it was [closed][close].
*/
public val valueOrNull: E? get() = when (val state = _state.value) {
is Closed -> null
is State<*> -> UNDEFINED.unbox<E?>(state.value)
else -> error("Invalid state $state")
}
public override val isClosedForSend: Boolean get() = _state.value is Closed
public override val isFull: Boolean get() = false
@Suppress("UNCHECKED_CAST")
public override fun openSubscription(): ReceiveChannel<E> {
val subscriber = Subscriber(this)
_state.loop { state ->
when (state) {
is Closed -> {
subscriber.close(state.closeCause)
return subscriber
}
is State<*> -> {
if (state.value !== UNDEFINED)
subscriber.offerInternal(state.value as E)
val update = State(state.value, addSubscriber((state as State<E>).subscribers, subscriber))
if (_state.compareAndSet(state, update))
return subscriber
}
else -> error("Invalid state $state")
}
}
}
@Suppress("UNCHECKED_CAST")
private fun closeSubscriber(subscriber: Subscriber<E>) {
_state.loop { state ->
when (state) {
is Closed -> return
is State<*> -> {
val update = State(state.value, removeSubscriber((state as State<E>).subscribers!!, subscriber))
if (_state.compareAndSet(state, update))
return
}
else -> error("Invalid state $state")
}
}
}
private fun addSubscriber(list: Array<Subscriber<E>>?, subscriber: Subscriber<E>): Array<Subscriber<E>> {
if (list == null) return Array(1) { subscriber }
return list + subscriber
}
@Suppress("UNCHECKED_CAST")
private fun removeSubscriber(list: Array<Subscriber<E>>, subscriber: Subscriber<E>): Array<Subscriber<E>>? {
val n = list.size
val i = list.indexOf(subscriber)
assert { i >= 0 }
if (n == 1) return null
val update = arrayOfNulls<Subscriber<E>>(n - 1)
list.copyInto(
destination = update,
endIndex = i
)
list.copyInto(
destination = update,
destinationOffset = i,
startIndex = i + 1
)
return update as Array<Subscriber<E>>
}
@Suppress("UNCHECKED_CAST")
public override fun close(cause: Throwable?): Boolean {
_state.loop { state ->
when (state) {
is Closed -> return false
is State<*> -> {
val update = if (cause == null) CLOSED else Closed(cause)
if (_state.compareAndSet(state, update)) {
(state as State<E>).subscribers?.forEach { it.close(cause) }
invokeOnCloseHandler(cause)
return true
}
}
else -> error("Invalid state $state")
}
}
}
private fun invokeOnCloseHandler(cause: Throwable?) {
val handler = onCloseHandler.value
if (handler !== null && handler !== HANDLER_INVOKED
&& onCloseHandler.compareAndSet(handler, HANDLER_INVOKED)) {
@Suppress("UNCHECKED_CAST")
(handler as Handler)(cause)
}
}
override fun invokeOnClose(handler: Handler) {
// Intricate dance for concurrent invokeOnClose and close
if (!onCloseHandler.compareAndSet(null, handler)) {
val value = onCloseHandler.value
if (value === HANDLER_INVOKED) {
throw IllegalStateException("Another handler was already registered and successfully invoked")
} else {
throw IllegalStateException("Another handler was already registered: $value")
}
} else {
val state = _state.value
if (state is Closed && onCloseHandler.compareAndSet(handler, HANDLER_INVOKED)) {
(handler)(state.closeCause)
}
}
}
/**
* @suppress This method has bad semantics when cause is not a [CancellationException]. Use [cancel].
*/
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x")
public override fun cancel(cause: Throwable?): Boolean = close(cause)
/**
* Cancels this conflated broadcast channel with an optional cause, same as [close].
* This function closes the channel with
* the specified cause (unless it was already closed),
* and [cancels][ReceiveChannel.cancel] all open subscriptions.
* A cause can be used to specify an error message or to provide other details on
* a cancellation reason for debugging purposes.
*/
public override fun cancel(cause: CancellationException?) {
close(cause)
}
/**
* Sends the value to all subscribed receives and stores this value as the most recent state for
* future subscribers. This implementation never suspends.
* It throws exception if the channel [isClosedForSend] (see [close] for details).
*/
public override suspend fun send(element: E) {
offerInternal(element)?.let { throw it.sendException }
}
/**
* Sends the value to all subscribed receives and stores this value as the most recent state for
* future subscribers. This implementation always returns `true`.
* It throws exception if the channel [isClosedForSend] (see [close] for details).
*/
public override fun offer(element: E): Boolean {
offerInternal(element)?.let { throw it.sendException }
return true
}
@Suppress("UNCHECKED_CAST")
private fun offerInternal(element: E): Closed? {
// If some other thread is updating the state in its offer operation we assume that our offer had linearized
// before that offer (we lost) and that offer overwrote us and conflated our offer.
if (!_updating.compareAndSet(0, 1)) return null
try {
_state.loop { state ->
when (state) {
is Closed -> return state
is State<*> -> {
val update = State(element, (state as State<E>).subscribers)
if (_state.compareAndSet(state, update)) {
// Note: Using offerInternal here to ignore the case when this subscriber was
// already concurrently closed (assume the close had conflated our offer for this
// particular subscriber).
state.subscribers?.forEach { it.offerInternal(element) }
return null
}
}
else -> error("Invalid state $state")
}
}
} finally {
_updating.value = 0 // reset the updating flag to zero even when something goes wrong
}
}
public override val onSend: SelectClause2<E, SendChannel<E>>
get() = object : SelectClause2<E, SendChannel<E>> {
override fun <R> registerSelectClause2(select: SelectInstance<R>, param: E, block: suspend (SendChannel<E>) -> R) {
registerSelectSend(select, param, block)
}
}
private fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend (SendChannel<E>) -> R) {
if (!select.trySelect()) return
offerInternal(element)?.let {
select.resumeSelectWithException(it.sendException)
return
}
block.startCoroutineUnintercepted(receiver = this, completion = select.completion)
}
private class Subscriber<E>(
private val broadcastChannel: ConflatedBroadcastChannel<E>
) : ConflatedChannel<E>(), ReceiveChannel<E> {
override fun onCancelIdempotent(wasClosed: Boolean) {
if (wasClosed) {
broadcastChannel.closeSubscriber(this)
}
}
public override fun offerInternal(element: E): Any = super.offerInternal(element)
}
}