blob: 91b5473c419a44328fdac5951df5e11077f0f625 [file] [log] [blame]
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.channels
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.selects.*
/**
* Broadcast channel with array buffer of a fixed [capacity].
* Sender suspends only when buffer is full due to one of the receives being slow to consume and
* receiver suspends only when buffer is empty.
*
* **Note**, that elements that are sent to this channel while there are no
* [openSubscription] subscribers are immediately lost.
*
* This channel is created by `BroadcastChannel(capacity)` factory function invocation.
*
* This implementation uses lock to protect the buffer, which is held only during very short buffer-update operations.
* The lock at each subscription is also used to manage concurrent attempts to receive from the same subscriber.
* The lists of suspended senders or receivers are lock-free.
*/
internal class ArrayBroadcastChannel<E>(
/**
* Buffer capacity.
*/
val capacity: Int
) : AbstractSendChannel<E>(null), BroadcastChannel<E> {
init {
require(capacity >= 1) { "ArrayBroadcastChannel capacity must be at least 1, but $capacity was specified" }
}
/*
* Writes to buffer are guarded by bufferLock, but reads from buffer are concurrent with writes
* - Write element to buffer then write "tail" (volatile)
* - Read "tail" (volatile), then read element from buffer
* So read/writes to buffer need not be volatile
*/
private val bufferLock = ReentrantLock()
private val buffer = arrayOfNulls<Any?>(capacity)
// head & tail are Long (64 bits) and we assume that they never wrap around
// head, tail, and size are guarded by bufferLock
private val _head = atomic(0L)
private var head: Long // do modulo on use of head
get() = _head.value
set(value) { _head.value = value }
private val _tail = atomic(0L)
private var tail: Long // do modulo on use of tail
get() = _tail.value
set(value) { _tail.value = value }
private val _size = atomic(0)
private var size: Int
get() = _size.value
set(value) { _size.value = value }
private val subscribers = subscriberList<Subscriber<E>>()
override val isBufferAlwaysFull: Boolean get() = false
override val isBufferFull: Boolean get() = size >= capacity
public override fun openSubscription(): ReceiveChannel<E> =
Subscriber(this).also {
updateHead(addSub = it)
}
public override fun close(cause: Throwable?): Boolean {
if (!super.close(cause)) return false
checkSubOffers()
return true
}
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x")
override fun cancel(cause: Throwable?): Boolean =
cancelInternal(cause)
override fun cancel(cause: CancellationException?) {
cancelInternal(cause)
}
private fun cancelInternal(cause: Throwable?): Boolean =
close(cause).also {
for (sub in subscribers) sub.cancelInternal(cause)
}
// result is `OFFER_SUCCESS | OFFER_FAILED | Closed`
override fun offerInternal(element: E): Any {
bufferLock.withLock {
// check if closed for send (under lock, so size cannot change)
closedForSend?.let { return it }
val size = this.size
if (size >= capacity) return OFFER_FAILED
val tail = this.tail
buffer[(tail % capacity).toInt()] = element
this.size = size + 1
this.tail = tail + 1
}
// if offered successfully, then check subscribers outside of lock
checkSubOffers()
return OFFER_SUCCESS
}
// result is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed`
override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
bufferLock.withLock {
// check if closed for send (under lock, so size cannot change)
closedForSend?.let { return it }
val size = this.size
if (size >= capacity) return OFFER_FAILED
// let's try to select sending this element to buffer
if (!select.trySelect()) { // :todo: move trySelect completion outside of lock
return ALREADY_SELECTED
}
val tail = this.tail
buffer[(tail % capacity).toInt()] = element
this.size = size + 1
this.tail = tail + 1
}
// if offered successfully, then check subscribers outside of lock
checkSubOffers()
return OFFER_SUCCESS
}
private fun checkSubOffers() {
var updated = false
var hasSubs = false
@Suppress("LoopToCallChain") // must invoke `checkOffer` on every sub
for (sub in subscribers) {
hasSubs = true
if (sub.checkOffer()) updated = true
}
if (updated || !hasSubs)
updateHead()
}
// updates head if needed and optionally adds / removes subscriber under the same lock
private tailrec fun updateHead(addSub: Subscriber<E>? = null, removeSub: Subscriber<E>? = null) {
// update head in a tail rec loop
var send: Send? = null
bufferLock.withLock {
if (addSub != null) {
addSub.subHead = tail // start from last element
val wasEmpty = subscribers.isEmpty()
subscribers.add(addSub)
if (!wasEmpty) return // no need to update when adding second and etc sub
}
if (removeSub != null) {
subscribers.remove(removeSub)
if (head != removeSub.subHead) return // no need to update
}
val minHead = computeMinHead()
val tail = this.tail
var head = this.head
val targetHead = minHead.coerceAtMost(tail)
if (targetHead <= head) return // nothing to do -- head was already moved
var size = this.size
// clean up removed (on not need if we don't have any subscribers anymore)
while (head < targetHead) {
buffer[(head % capacity).toInt()] = null
val wasFull = size >= capacity
// update the size before checking queue (no more senders can queue up)
this.head = ++head
this.size = --size
if (wasFull) {
while (true) {
send = takeFirstSendOrPeekClosed() ?: break // when when no sender
if (send is Closed<*>) break // break when closed for send
val token = send!!.tryResumeSend(null)
if (token != null) {
assert { token === RESUME_TOKEN }
// put sent element to the buffer
buffer[(tail % capacity).toInt()] = (send as Send).pollResult
this.size = size + 1
this.tail = tail + 1
return@withLock // go out of lock to wakeup this sender
}
// Too late, already cancelled, but we removed it from the queue and need to release resources.
// However, ArrayBroadcastChannel does not support onUndeliveredElement, so nothing to do
}
}
}
return // done updating here -> return
}
// we only get out of the lock normally when there is a sender to resume
send!!.completeResumeSend()
// since we've just sent an element, we might need to resume some receivers
checkSubOffers()
// tailrec call to recheck
updateHead()
}
private fun computeMinHead(): Long {
var minHead = Long.MAX_VALUE
for (sub in subscribers)
minHead = minHead.coerceAtMost(sub.subHead) // volatile (atomic) reads of subHead
return minHead
}
@Suppress("UNCHECKED_CAST")
private fun elementAt(index: Long): E = buffer[(index % capacity).toInt()] as E
private class Subscriber<E>(
private val broadcastChannel: ArrayBroadcastChannel<E>
) : AbstractChannel<E>(null), ReceiveChannel<E> {
private val subLock = ReentrantLock()
private val _subHead = atomic(0L)
var subHead: Long // guarded by subLock
get() = _subHead.value
set(value) { _subHead.value = value }
override val isBufferAlwaysEmpty: Boolean get() = false
override val isBufferEmpty: Boolean get() = subHead >= broadcastChannel.tail
override val isBufferAlwaysFull: Boolean get() = error("Should not be used")
override val isBufferFull: Boolean get() = error("Should not be used")
override fun close(cause: Throwable?): Boolean {
val wasClosed = super.close(cause)
if (wasClosed) {
broadcastChannel.updateHead(removeSub = this)
subLock.withLock {
subHead = broadcastChannel.tail
}
}
return wasClosed
}
// returns true if subHead was updated and broadcast channel's head must be checked
// this method is lock-free (it never waits on lock)
@Suppress("UNCHECKED_CAST")
fun checkOffer(): Boolean {
var updated = false
var closed: Closed<*>? = null
loop@
while (needsToCheckOfferWithoutLock()) {
// just use `tryLock` here and break when some other thread is checking under lock
// it means that `checkOffer` must be retried after every `unlock`
if (!subLock.tryLock()) break
val receive: ReceiveOrClosed<E>?
var result: Any?
try {
result = peekUnderLock()
when {
result === POLL_FAILED -> continue@loop // must retest `needsToCheckOfferWithoutLock` outside of the lock
result is Closed<*> -> {
closed = result
break@loop // was closed
}
}
// find a receiver for an element
receive = takeFirstReceiveOrPeekClosed() ?: break // break when no one's receiving
if (receive is Closed<*>) break // noting more to do if this sub already closed
val token = receive.tryResumeReceive(result as E, null) ?: continue
assert { token === RESUME_TOKEN }
val subHead = this.subHead
this.subHead = subHead + 1 // retrieved element for this subscriber
updated = true
} finally {
subLock.unlock()
}
receive!!.completeResumeReceive(result as E)
}
// do close outside of lock if needed
closed?.also { close(cause = it.closeCause) }
return updated
}
// result is `E | POLL_FAILED | Closed`
override fun pollInternal(): Any? {
var updated = false
val result = subLock.withLock {
val result = peekUnderLock()
when {
result is Closed<*> -> { /* just bail out of lock */ }
result === POLL_FAILED -> { /* just bail out of lock */ }
else -> {
// update subHead after retrieiving element from buffer
val subHead = this.subHead
this.subHead = subHead + 1
updated = true
}
}
result
}
// do close outside of lock
(result as? Closed<*>)?.also { close(cause = it.closeCause) }
// there could have been checkOffer attempt while we were holding lock
// now outside the lock recheck if anything else to offer
if (checkOffer())
updated = true
// and finally update broadcast's channel head if needed
if (updated)
broadcastChannel.updateHead()
return result
}
// result is `ALREADY_SELECTED | E | POLL_FAILED | Closed`
override fun pollSelectInternal(select: SelectInstance<*>): Any? {
var updated = false
val result = subLock.withLock {
var result = peekUnderLock()
when {
result is Closed<*> -> { /* just bail out of lock */ }
result === POLL_FAILED -> { /* just bail out of lock */ }
else -> {
// let's try to select receiving this element from buffer
if (!select.trySelect()) { // :todo: move trySelect completion outside of lock
result = ALREADY_SELECTED
} else {
// update subHead after retrieiving element from buffer
val subHead = this.subHead
this.subHead = subHead + 1
updated = true
}
}
}
result
}
// do close outside of lock
(result as? Closed<*>)?.also { close(cause = it.closeCause) }
// there could have been checkOffer attempt while we were holding lock
// now outside the lock recheck if anything else to offer
if (checkOffer())
updated = true
// and finally update broadcast's channel head if needed
if (updated)
broadcastChannel.updateHead()
return result
}
// Must invoke this check this after lock, because offer's invocation of `checkOffer` might have failed
// to `tryLock` just before the lock was about to unlocked, thus loosing notification to this
// subscription about an element that was just offered
private fun needsToCheckOfferWithoutLock(): Boolean {
if (closedForReceive != null)
return false // already closed -> nothing to do
if (isBufferEmpty && broadcastChannel.closedForReceive == null)
return false // no data for us && broadcast channel was not closed yet -> nothing to do
return true // check otherwise
}
// guarded by lock, returns:
// E - the element from the buffer at subHead
// Closed<*> when closed;
// POLL_FAILED when there seems to be no data, but must retest `needsToCheckOfferWithoutLock` outside of lock
private fun peekUnderLock(): Any? {
val subHead = this.subHead // guarded read (can be non-volatile read)
// note: from the broadcastChannel we must read closed token first, then read its tail
// because it is Ok if tail moves in between the reads (we make decision based on tail first)
val closedBroadcast = broadcastChannel.closedForReceive // unguarded volatile read
val tail = broadcastChannel.tail // unguarded volatile read
if (subHead >= tail) {
// no elements to poll from the queue -- check if closed broads & closed this sub
// must retest `needsToCheckOfferWithoutLock` outside of the lock
return closedBroadcast ?: this.closedForReceive ?: POLL_FAILED
}
// Get tentative result. This result may be wrong (completely invalid value, including null),
// because this subscription might get closed, moving channel's head past this subscription's head.
val result = broadcastChannel.elementAt(subHead)
// now check if this subscription was closed
val closedSub = this.closedForReceive
if (closedSub != null) return closedSub
// we know the subscription was not closed, so this tentative result is Ok to return
return result
}
}
// ------ debug ------
override val bufferDebugString: String
get() = "(buffer:capacity=${buffer.size},size=$size)"
}