* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
package kotlinx.coroutines.scheduling
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import java.util.concurrent.atomic.*
internal const val BUFFER_CAPACITY_BASE = 7
internal const val MASK = BUFFER_CAPACITY - 1 // 128 by default
* Tightly coupled with [CoroutineScheduler] queue of pending tasks, but extracted to separate file for simplicity.
* At any moment queue is used only by [CoroutineScheduler.Worker] threads, has only one producer (worker owning this queue)
* and any amount of consumers, other pool workers which are trying to steal work.
* ### Fairness
* [WorkQueue] provides semi-FIFO order, but with priority for most recently submitted task assuming
* that these two (current one and submitted) are communicating and sharing state thus making such communication extremely fast.
* E.g. submitted jobs [1, 2, 3, 4] will be executed in [4, 1, 2, 3] order.
* ### Work offloading
* When the queue is full, half of existing tasks are offloaded to global queue which is regularly polled by other pool workers.
* Offloading occurs in LIFO order for the sake of implementation simplicity: offloads should be extremely rare and occurs only in specific use-cases
* (e.g. when coroutine starts heavy fork-join-like computation), so fairness is not important.
* As an alternative, offloading directly to some [CoroutineScheduler.Worker] may be used, but then the strategy of selecting any idle worker
* should be implemented and implementation should be aware multiple producers.
* @suppress **This is unstable API and it is subject to change.**
internal class WorkQueue {
* We read two independent counter here.
* Producer index is incremented only by owner
* Consumer index is incremented both by owner and external threads
* The only harmful race is:
* [T1] readProducerIndex (1) preemption(2) readConsumerIndex(5)
* [T2] changeProducerIndex (3)
* [T3] changeConsumerIndex (4)
* Which can lead to resulting size bigger than actual size at any moment of time.
* This is in general harmless because steal will be blocked by timer
internal val bufferSize: Int get() = producerIndex.value - consumerIndex.value
// TODO replace with inlined array when atomicfu will support it
private val buffer: AtomicReferenceArray<Task?> = AtomicReferenceArray(BUFFER_CAPACITY)
private val lastScheduledTask = atomic<Task?>(null)
private val producerIndex = atomic(0)
private val consumerIndex = atomic(0)
* Retrieves and removes task from the head of the queue
* Invariant: this method is called only by the owner of the queue ([pollExternal] is not)
fun poll(): Task? =
lastScheduledTask.getAndSet(null) ?: pollExternal()
* Invariant: this method is called only by the owner of the queue
* @param task task to put into local queue
* @param globalQueue fallback queue which is used when the local queue is overflown
* @return true if no offloading happened, false otherwise
fun add(task: Task, globalQueue: GlobalQueue): Boolean {
val previous = lastScheduledTask.getAndSet(task) ?: return true
return addLast(previous, globalQueue)
// Called only by the owner, returns true if no offloading happened, false otherwise
fun addLast(task: Task, globalQueue: GlobalQueue): Boolean {
var noOffloadingHappened = true
* We need the loop here because race possible not only on full queue,
* but also on queue with one element during stealing
while (!tryAddLast(task)) {
noOffloadingHappened = false
return noOffloadingHappened
* Tries stealing from [victim] queue into this queue, using [globalQueue] to offload stolen tasks in case of current queue overflow.
* @return whether any task was stolen
fun trySteal(victim: WorkQueue, globalQueue: GlobalQueue): Boolean {
val time = schedulerTimeSource.nanoTime()
val bufferSize = victim.bufferSize
if (bufferSize == 0) return tryStealLastScheduled(time, victim, globalQueue)
* Invariant: time is monotonically increasing (thanks to nanoTime), so we can stop as soon as we find the first task not satisfying a predicate.
* If queue size is larger than QUEUE_SIZE_OFFLOAD_THRESHOLD then unconditionally steal tasks over this limit to prevent possible queue overflow
var wasStolen = false
repeat(((bufferSize / 2).coerceAtLeast(1))) {
val task = victim.pollExternal { task ->
time - task.submissionTime >= WORK_STEALING_TIME_RESOLUTION_NS || victim.bufferSize > QUEUE_SIZE_OFFLOAD_THRESHOLD
?: return wasStolen // non-local return from trySteal as we're done
wasStolen = true
add(task, globalQueue)
return wasStolen
private fun tryStealLastScheduled(
time: Long,
victim: WorkQueue,
globalQueue: GlobalQueue
): Boolean {
val lastScheduled = victim.lastScheduledTask.value ?: return false
if (time - lastScheduled.submissionTime < WORK_STEALING_TIME_RESOLUTION_NS) {
return false
if (victim.lastScheduledTask.compareAndSet(lastScheduled, null)) {
add(lastScheduled, globalQueue)
return true
return false
internal fun size(): Int = if (lastScheduledTask.value != null) bufferSize + 1 else bufferSize
* Offloads half of the current buffer to [globalQueue]
private fun offloadWork(globalQueue: GlobalQueue) {
repeat((bufferSize / 2).coerceAtLeast(1)) {
val task = pollExternal() ?: return
addToGlobalQueue(globalQueue, task)
private fun addToGlobalQueue(globalQueue: GlobalQueue, task: Task) {
* globalQueue is closed as the very last step in the shutdown sequence when all worker threads had
* been already shutdown (with the only exception of the last worker thread that might be performing
* shutdown procedure itself). As a consistency check we do a [cheap!] check that it is not closed here yet.
check(globalQueue.addLast(task)) { "GlobalQueue could not be closed yet" }
internal fun offloadAllWork(globalQueue: GlobalQueue) {
lastScheduledTask.getAndSet(null)?.let { addToGlobalQueue(globalQueue, it) }
while (true) {
addToGlobalQueue(globalQueue, pollExternal() ?: return)
* [poll] for external (not owning this queue) workers
private inline fun pollExternal(predicate: (Task) -> Boolean = { true }): Task? {
while (true) {
val tailLocal = consumerIndex.value
if (tailLocal - producerIndex.value == 0) return null
val index = tailLocal and MASK
val element = buffer[index] ?: continue
if (!predicate(element)) {
return null
if (consumerIndex.compareAndSet(tailLocal, tailLocal + 1)) {
// 1) Help GC 2) Signal producer that this slot is consumed and may be used
return buffer.getAndSet(index, null)
// Called only by the owner
private fun tryAddLast(task: Task): Boolean {
if (bufferSize == BUFFER_CAPACITY - 1) return false
val headLocal = producerIndex.value
val nextIndex = headLocal and MASK
* If current element is not null then we're racing with consumers for the tail. If we skip this check then
* the consumer can null out current element and it will be lost. If we're racing for tail then
* the queue is close to overflowing => it's fine to offload work to global queue
if (buffer[nextIndex] != null) {
return false
buffer.lazySet(nextIndex, task)
return true