blob: 1c1306c291ddf2d09752e947da32354f41ed3313 [file] [log] [blame]
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
import kotlin.native.concurrent.*
/**
 * Creates a dispatcher that runs coroutines on one dedicated, newly started worker named [name].
 *
 * The returned dispatcher owns its worker and must be closed when no longer needed.
 *
 * @throws IllegalStateException if multithreading is not supported by the current memory model.
 */
@ExperimentalCoroutinesApi
public actual fun newSingleThreadContext(name: String): CloseableCoroutineDispatcher {
    check(multithreadingSupported) { "This API is only supported for experimental K/N memory model" }
    return WorkerDispatcher(name)
}
/**
 * Creates a dispatcher that spreads coroutines over [nThreads] dedicated workers whose names are
 * derived from [name].
 *
 * The memory-model check runs before the argument check, so on an unsupported memory model
 * the [IllegalStateException] is reported regardless of the value of [nThreads].
 *
 * @throws IllegalStateException if multithreading is not supported by the current memory model.
 * @throws IllegalArgumentException if [nThreads] is less than one.
 */
public actual fun newFixedThreadPoolContext(nThreads: Int, name: String): CloseableCoroutineDispatcher {
    check(multithreadingSupported) { "This API is only supported for experimental K/N memory model" }
    if (nThreads < 1) {
        throw IllegalArgumentException("Expected at least one thread, but got: $nThreads")
    }
    return MultiWorkerDispatcher(name, nThreads)
}
/**
 * A closeable dispatcher backed by a single Kotlin/Native [Worker].
 *
 * Besides dispatching blocks to the worker, it implements [Delay] by scheduling delayed
 * jobs on the same worker via [Worker.executeAfter].
 */
internal class WorkerDispatcher(name: String) : CloseableCoroutineDispatcher(), Delay {
    private val worker = Worker.start(name = name)

    /** Enqueues [block] on the worker with no delay. */
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        worker.executeAfter(0L) { block.run() }
    }

    /** Resumes [continuation] on the worker thread once [timeMillis] milliseconds have elapsed. */
    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        worker.executeAfter(timeMillis.toMicrosSafe()) {
            with(continuation) { resumeUndispatched(Unit) }
        }
    }

    /**
     * Schedules [block] to run on the worker after [timeMillis] milliseconds.
     *
     * @return a handle whose disposal turns the scheduled job into a no-op.
     */
    override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
        // Workers don't have an API to cancel sent "executeAfter" block, but we are trying
        // to control the damage and reduce reachable objects by nulling out `block`
        // that may retain a lot of references, and leaving only an empty shell after a timely disposal
        // This is a class and not an object with `block` in a closure because that would defeat the purpose.
        class DisposableBlock(block: Runnable) : DisposableHandle, Function0<Unit> {
            private val disposableHolder = AtomicReference<Runnable?>(block)

            override fun invoke() {
                disposableHolder.value?.run()
            }

            override fun dispose() {
                disposableHolder.value = null
            }
        }

        val disposableBlock = DisposableBlock(block)
        worker.executeAfter(timeMillis.toMicrosSafe(), disposableBlock)
        return disposableBlock
    }

    /** Requests termination of the worker and waits until it has terminated. */
    override fun close() {
        worker.requestTermination().result // Note: calling "result" blocks
    }

    /**
     * Converts milliseconds to microseconds without overflowing.
     *
     * Non-positive values map to `0` (run as soon as possible), and values too large to be
     * represented in microseconds saturate at [Long.MAX_VALUE]. The bound is checked by division
     * because comparing the wrapped product with the receiver misses some overflows and
     * mishandles zero, for which the product is never greater than the receiver.
     */
    private fun Long.toMicrosSafe(): Long = when {
        this <= 0L -> 0L
        this > Long.MAX_VALUE / 1000L -> Long.MAX_VALUE
        else -> this * 1000L
    }
}
/**
 * A closeable dispatcher that distributes tasks over [workersCount] Kotlin/Native workers.
 *
 * Dispatched blocks are placed into a shared unlimited channel; every worker runs a blocking
 * loop that takes tasks from that channel and executes them.
 */
private class MultiWorkerDispatcher(name: String, workersCount: Int) : CloseableCoroutineDispatcher() {
    private val tasksQueue = Channel<Runnable>(Channel.UNLIMITED)
    private val workers = Array(workersCount) { Worker.start(name = "$name-$it") }

    init {
        // Each worker is occupied by its run loop for as long as the queue stays open.
        workers.forEach { w -> w.executeAfter(0L) { workerRunLoop() } }
    }

    /** Executes queued tasks until the queue is closed and fully drained. */
    private fun workerRunLoop() = runBlocking {
        for (task in tasksQueue) {
            // TODO error handling
            task.run()
        }
    }

    /**
     * Enqueues [block] for execution by one of the workers.
     *
     * If the queue no longer accepts tasks (the dispatcher has been closed), the block is not
     * dropped: the job in [context] is cancelled and the block is handed over to
     * [Dispatchers.Default], so that the affected coroutine can still resume and complete.
     */
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        val sendResult = tasksQueue.trySend(block)
        if (sendResult.isSuccess) return
        // The queue is unlimited, so a failed send means it was closed.
        context.cancel(
            CancellationException("The task was rejected, the dispatcher was closed", sendResult.exceptionOrNull())
        )
        Dispatchers.Default.dispatch(context, block)
    }

    /**
     * Closes the task queue and waits for every worker to terminate.
     * Tasks that are already queued are still executed before the run loops exit.
     */
    override fun close() {
        tasksQueue.close()
        workers.forEach { it.requestTermination().result }
    }
}