blob: 8d9eed21bcc75cf1fb762420e33e984fba3ee0f9 [file] [log] [blame]
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines
import kotlinx.atomicfu.*
import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
import kotlin.jvm.*
/**
* Extended by [CoroutineDispatcher] implementations that have event loop inside and can
* be asked to process next event from their event queue.
*
* It may optionally implement [Delay] interface and support time-scheduled tasks.
* It is created or pigged back onto (see [ThreadLocalEventLoop])
* by `runBlocking` and by [Dispatchers.Unconfined].
*
* @suppress **This an internal API and should not be used from general code.**
*/
internal abstract class EventLoop : CoroutineDispatcher() {
/**
* Counts the number of nested `runBlocking` and [Dispatchers.Unconfined] that use this event loop.
*/
private var useCount = 0L
/**
* Set to true on any use by `runBlocking`, because it potentially leaks this loop to other threads, so
* this instance must be properly shutdown. We don't need to shutdown event loop that was used solely
* by [Dispatchers.Unconfined] -- it can be left as [ThreadLocalEventLoop] and reused next time.
*/
private var shared = false
/**
* Queue used by [Dispatchers.Unconfined] tasks.
* These tasks are thread-local for performance and take precedence over the rest of the queue.
*/
private var unconfinedQueue: ArrayDeque<DispatchedTask<*>>? = null
/**
* Processes next event in this event loop.
*
* The result of this function is to be interpreted like this:
* * `<= 0` -- there are potentially more events for immediate processing;
* * `> 0` -- a number of nanoseconds to wait for next scheduled event;
* * [Long.MAX_VALUE] -- no more events.
*
* **NOTE**: Must be invoked only from the event loop's thread
* (no check for performance reasons, may be added in the future).
*/
open fun processNextEvent(): Long {
if (!processUnconfinedEvent()) return Long.MAX_VALUE
return 0
}
protected open val isEmpty: Boolean get() = isUnconfinedQueueEmpty
protected open val nextTime: Long
get() {
val queue = unconfinedQueue ?: return Long.MAX_VALUE
return if (queue.isEmpty()) Long.MAX_VALUE else 0L
}
fun processUnconfinedEvent(): Boolean {
val queue = unconfinedQueue ?: return false
val task = queue.removeFirstOrNull() ?: return false
task.run()
return true
}
/**
* Returns `true` if the invoking `runBlocking(context) { ... }` that was passed this event loop in its context
* parameter should call [processNextEvent] for this event loop (otherwise, it will process thread-local one).
* By default, event loop implementation is thread-local and should not processed in the context
* (current thread's event loop should be processed instead).
*/
open fun shouldBeProcessedFromContext(): Boolean = false
/**
* Dispatches task whose dispatcher returned `false` from [CoroutineDispatcher.isDispatchNeeded]
* into the current event loop.
*/
fun dispatchUnconfined(task: DispatchedTask<*>) {
val queue = unconfinedQueue ?:
ArrayDeque<DispatchedTask<*>>().also { unconfinedQueue = it }
queue.addLast(task)
}
val isActive: Boolean
get() = useCount > 0
val isUnconfinedLoopActive: Boolean
get() = useCount >= delta(unconfined = true)
// May only be used from the event loop's thread
val isUnconfinedQueueEmpty: Boolean
get() = unconfinedQueue?.isEmpty() ?: true
private fun delta(unconfined: Boolean) =
if (unconfined) (1L shl 32) else 1L
fun incrementUseCount(unconfined: Boolean = false) {
useCount += delta(unconfined)
if (!unconfined) shared = true
}
fun decrementUseCount(unconfined: Boolean = false) {
useCount -= delta(unconfined)
if (useCount > 0) return
assert { useCount == 0L } // "Extra decrementUseCount"
if (shared) {
// shut it down and remove from ThreadLocalEventLoop
shutdown()
}
}
final override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
parallelism.checkParallelism()
return this
}
open fun shutdown() {}
}
internal object ThreadLocalEventLoop {
private val ref = commonThreadLocal<EventLoop?>(Symbol("ThreadLocalEventLoop"))
internal val eventLoop: EventLoop
get() = ref.get() ?: createEventLoop().also { ref.set(it) }
internal fun currentOrNull(): EventLoop? =
ref.get()
internal fun resetEventLoop() {
ref.set(null)
}
internal fun setEventLoop(eventLoop: EventLoop) {
ref.set(eventLoop)
}
}
private val DISPOSED_TASK = Symbol("REMOVED_TASK")
// results for scheduleImpl
private const val SCHEDULE_OK = 0
private const val SCHEDULE_COMPLETED = 1
private const val SCHEDULE_DISPOSED = 2
private const val MS_TO_NS = 1_000_000L
private const val MAX_MS = Long.MAX_VALUE / MS_TO_NS
/**
* First-line overflow protection -- limit maximal delay.
* Delays longer than this one (~146 years) are considered to be delayed "forever".
*/
private const val MAX_DELAY_NS = Long.MAX_VALUE / 2
internal fun delayToNanos(timeMillis: Long): Long = when {
timeMillis <= 0 -> 0L
timeMillis >= MAX_MS -> Long.MAX_VALUE
else -> timeMillis * MS_TO_NS
}
internal fun delayNanosToMillis(timeNanos: Long): Long =
timeNanos / MS_TO_NS
private val CLOSED_EMPTY = Symbol("CLOSED_EMPTY")
private typealias Queue<T> = LockFreeTaskQueueCore<T>
internal expect abstract class EventLoopImplPlatform() : EventLoop {
// Called to unpark this event loop's thread
protected fun unpark()
// Called to reschedule to DefaultExecutor when this event loop is complete
protected fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask)
}
internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
// null | CLOSED_EMPTY | task | Queue<Runnable>
private val _queue = atomic<Any?>(null)
// Allocated only only once
private val _delayed = atomic<DelayedTaskQueue?>(null)
private val _isCompleted = atomic(false)
private var isCompleted
get() = _isCompleted.value
set(value) { _isCompleted.value = value }
override val isEmpty: Boolean get() {
if (!isUnconfinedQueueEmpty) return false
val delayed = _delayed.value
if (delayed != null && !delayed.isEmpty) return false
return when (val queue = _queue.value) {
null -> true
is Queue<*> -> queue.isEmpty
else -> queue === CLOSED_EMPTY
}
}
override val nextTime: Long
get() {
if (super.nextTime == 0L) return 0L
val queue = _queue.value
when {
queue === null -> {} // empty queue -- proceed
queue is Queue<*> -> if (!queue.isEmpty) return 0 // non-empty queue
queue === CLOSED_EMPTY -> return Long.MAX_VALUE // no more events -- closed
else -> return 0 // non-empty queue
}
val nextDelayedTask = _delayed.value?.peek() ?: return Long.MAX_VALUE
return (nextDelayedTask.nanoTime - nanoTime()).coerceAtLeast(0)
}
override fun shutdown() {
// Clean up thread-local reference here -- this event loop is shutting down
ThreadLocalEventLoop.resetEventLoop()
// We should signal that this event loop should not accept any more tasks
// and process queued events (that could have been added after last processNextEvent)
isCompleted = true
closeQueue()
// complete processing of all queued tasks
while (processNextEvent() <= 0) { /* spin */ }
// reschedule the rest of delayed tasks
rescheduleAllDelayed()
}
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val timeNanos = delayToNanos(timeMillis)
if (timeNanos < MAX_DELAY_NS) {
val now = nanoTime()
DelayedResumeTask(now + timeNanos, continuation).also { task ->
/*
* Order is important here: first we schedule the heap and only then
* publish it to continuation. Otherwise, `DelayedResumeTask` would
* have to know how to be disposed of even when it wasn't scheduled yet.
*/
schedule(now, task)
continuation.disposeOnCancellation(task)
}
}
}
protected fun scheduleInvokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
val timeNanos = delayToNanos(timeMillis)
return if (timeNanos < MAX_DELAY_NS) {
val now = nanoTime()
DelayedRunnableTask(now + timeNanos, block).also { task ->
schedule(now, task)
}
} else {
NonDisposableHandle
}
}
override fun processNextEvent(): Long {
// unconfined events take priority
if (processUnconfinedEvent()) return 0
// queue all delayed tasks that are due to be executed
val delayed = _delayed.value
if (delayed != null && !delayed.isEmpty) {
val now = nanoTime()
while (true) {
// make sure that moving from delayed to queue removes from delayed only after it is added to queue
// to make sure that 'isEmpty' and `nextTime` that check both of them
// do not transiently report that both delayed and queue are empty during move
delayed.removeFirstIf {
if (it.timeToExecute(now)) {
enqueueImpl(it)
} else
false
} ?: break // quit loop when nothing more to remove or enqueueImpl returns false on "isComplete"
}
}
// then process one event from queue
val task = dequeue()
if (task != null) {
platformAutoreleasePool { task.run() }
return 0
}
return nextTime
}
final override fun dispatch(context: CoroutineContext, block: Runnable) = enqueue(block)
open fun enqueue(task: Runnable) {
if (enqueueImpl(task)) {
// todo: we should unpark only when this delayed task became first in the queue
unpark()
} else {
DefaultExecutor.enqueue(task)
}
}
@Suppress("UNCHECKED_CAST")
private fun enqueueImpl(task: Runnable): Boolean {
_queue.loop { queue ->
if (isCompleted) return false // fail fast if already completed, may still add, but queues will close
when (queue) {
null -> if (_queue.compareAndSet(null, task)) return true
is Queue<*> -> {
when ((queue as Queue<Runnable>).addLast(task)) {
Queue.ADD_SUCCESS -> return true
Queue.ADD_CLOSED -> return false
Queue.ADD_FROZEN -> _queue.compareAndSet(queue, queue.next())
}
}
else -> when {
queue === CLOSED_EMPTY -> return false
else -> {
// update to full-blown queue to add one more
val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY, singleConsumer = true)
newQueue.addLast(queue as Runnable)
newQueue.addLast(task)
if (_queue.compareAndSet(queue, newQueue)) return true
}
}
}
}
}
@Suppress("UNCHECKED_CAST")
private fun dequeue(): Runnable? {
_queue.loop { queue ->
when (queue) {
null -> return null
is Queue<*> -> {
val result = (queue as Queue<Runnable>).removeFirstOrNull()
if (result !== Queue.REMOVE_FROZEN) return result as Runnable?
_queue.compareAndSet(queue, queue.next())
}
else -> when {
queue === CLOSED_EMPTY -> return null
else -> if (_queue.compareAndSet(queue, null)) return queue as Runnable
}
}
}
}
private fun closeQueue() {
assert { isCompleted }
_queue.loop { queue ->
when (queue) {
null -> if (_queue.compareAndSet(null, CLOSED_EMPTY)) return
is Queue<*> -> {
queue.close()
return
}
else -> when {
queue === CLOSED_EMPTY -> return
else -> {
// update to full-blown queue to close
val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY, singleConsumer = true)
newQueue.addLast(queue as Runnable)
if (_queue.compareAndSet(queue, newQueue)) return
}
}
}
}
}
fun schedule(now: Long, delayedTask: DelayedTask) {
when (scheduleImpl(now, delayedTask)) {
SCHEDULE_OK -> if (shouldUnpark(delayedTask)) unpark()
SCHEDULE_COMPLETED -> reschedule(now, delayedTask)
SCHEDULE_DISPOSED -> {} // do nothing -- task was already disposed
else -> error("unexpected result")
}
}
private fun shouldUnpark(task: DelayedTask): Boolean = _delayed.value?.peek() === task
private fun scheduleImpl(now: Long, delayedTask: DelayedTask): Int {
if (isCompleted) return SCHEDULE_COMPLETED
val delayedQueue = _delayed.value ?: run {
_delayed.compareAndSet(null, DelayedTaskQueue(now))
_delayed.value!!
}
return delayedTask.scheduleTask(now, delayedQueue, this)
}
// It performs "hard" shutdown for test cleanup purposes
protected fun resetAll() {
_queue.value = null
_delayed.value = null
}
// This is a "soft" (normal) shutdown
private fun rescheduleAllDelayed() {
val now = nanoTime()
while (true) {
/*
* `removeFirstOrNull` below is the only operation on DelayedTask & ThreadSafeHeap that is not
* synchronized on DelayedTask itself. All other operation are synchronized both on
* DelayedTask & ThreadSafeHeap instances (in this order). It is still safe, because `dispose`
* first removes DelayedTask from the heap (under synchronization) then
* assign "_heap = DISPOSED_TASK", so there cannot be ever a race to _heap reference update.
*/
val delayedTask = _delayed.value?.removeFirstOrNull() ?: break
reschedule(now, delayedTask)
}
}
internal abstract class DelayedTask(
/**
* This field can be only modified in [scheduleTask] before putting this DelayedTask
* into heap to avoid overflow and corruption of heap data structure.
*/
@JvmField var nanoTime: Long
) : Runnable, Comparable<DelayedTask>, DisposableHandle, ThreadSafeHeapNode, SynchronizedObject() {
@Volatile
private var _heap: Any? = null // null | ThreadSafeHeap | DISPOSED_TASK
override var heap: ThreadSafeHeap<*>?
get() = _heap as? ThreadSafeHeap<*>
set(value) {
require(_heap !== DISPOSED_TASK) // this can never happen, it is always checked before adding/removing
_heap = value
}
override var index: Int = -1
override fun compareTo(other: DelayedTask): Int {
val dTime = nanoTime - other.nanoTime
return when {
dTime > 0 -> 1
dTime < 0 -> -1
else -> 0
}
}
fun timeToExecute(now: Long): Boolean = now - nanoTime >= 0L
fun scheduleTask(now: Long, delayed: DelayedTaskQueue, eventLoop: EventLoopImplBase): Int = synchronized<Int>(this) {
if (_heap === DISPOSED_TASK) return SCHEDULE_DISPOSED // don't add -- was already disposed
delayed.addLastIf(this) { firstTask ->
if (eventLoop.isCompleted) return SCHEDULE_COMPLETED // non-local return from scheduleTask
/**
* We are about to add new task and we have to make sure that [DelayedTaskQueue]
* invariant is maintained. The code in this lambda is additionally executed under
* the lock of [DelayedTaskQueue] and working with [DelayedTaskQueue.timeNow] here is thread-safe.
*/
if (firstTask == null) {
/**
* When adding the first delayed task we simply update queue's [DelayedTaskQueue.timeNow] to
* the current now time even if that means "going backwards in time". This makes the structure
* self-correcting in spite of wild jumps in `nanoTime()` measurements once all delayed tasks
* are removed from the delayed queue for execution.
*/
delayed.timeNow = now
} else {
/**
* Carefully update [DelayedTaskQueue.timeNow] so that it does not sweep past first's tasks time
* and only goes forward in time. We cannot let it go backwards in time or invariant can be
* violated for tasks that were already scheduled.
*/
val firstTime = firstTask.nanoTime
// compute min(now, firstTime) using a wrap-safe check
val minTime = if (firstTime - now >= 0) now else firstTime
// update timeNow only when going forward in time
if (minTime - delayed.timeNow > 0) delayed.timeNow = minTime
}
/**
* Here [DelayedTaskQueue.timeNow] was already modified and we have to double-check that newly added
* task does not violate [DelayedTaskQueue] invariant because of that. Note also that this scheduleTask
* function can be called to reschedule from one queue to another and this might be another reason
* where new task's time might now violate invariant.
* We correct invariant violation (if any) by simply changing this task's time to now.
*/
if (nanoTime - delayed.timeNow < 0) nanoTime = delayed.timeNow
true
}
return SCHEDULE_OK
}
final override fun dispose(): Unit = synchronized(this) {
val heap = _heap
if (heap === DISPOSED_TASK) return // already disposed
(heap as? DelayedTaskQueue)?.remove(this) // remove if it is in heap (first)
_heap = DISPOSED_TASK // never add again to any heap
}
override fun toString(): String = "Delayed[nanos=$nanoTime]"
}
private inner class DelayedResumeTask(
nanoTime: Long,
private val cont: CancellableContinuation<Unit>
) : DelayedTask(nanoTime) {
override fun run() { with(cont) { resumeUndispatched(Unit) } }
override fun toString(): String = super.toString() + cont.toString()
}
private class DelayedRunnableTask(
nanoTime: Long,
private val block: Runnable
) : DelayedTask(nanoTime) {
override fun run() { block.run() }
override fun toString(): String = super.toString() + block.toString()
}
/**
* Delayed task queue maintains stable time-comparision invariant despite potential wraparounds in
* long nano time measurements by maintaining last observed [timeNow]. It protects the integrity of the
* heap data structure in spite of potential non-monotonicity of `nanoTime()` source.
* The invariant is that for every scheduled [DelayedTask]:
*
* ```
* delayedTask.nanoTime - timeNow >= 0
* ```
*
* So the comparison of scheduled tasks via [DelayedTask.compareTo] is always stable as
* scheduled [DelayedTask.nanoTime] can be at most [Long.MAX_VALUE] apart. This invariant is maintained when
* new tasks are added by [DelayedTask.scheduleTask] function and it cannot be violated when tasks are removed
* (so there is nothing special to do there).
*/
internal class DelayedTaskQueue(
@JvmField var timeNow: Long
) : ThreadSafeHeap<DelayedTask>()
}
internal expect fun createEventLoop(): EventLoop
internal expect fun nanoTime(): Long
internal expect object DefaultExecutor {
fun enqueue(task: Runnable)
}
/**
* Used by Darwin targets to wrap a [Runnable.run] call in an Objective-C Autorelease Pool. It is a no-op on JVM, JS and
* non-Darwin native targets.
*
* Coroutines on Darwin targets can call into the Objective-C world, where a callee may push a to-be-returned object to
* the Autorelease Pool, so as to avoid a premature ARC release before it reaches the caller. This means the pool must
* be eventually drained to avoid leaks. Since Kotlin Coroutines does not use [NSRunLoop], which provides automatic
* pool management, it must manage the pool creation and pool drainage manually.
*/
internal expect inline fun platformAutoreleasePool(crossinline block: () -> Unit)