blob: fc4436f418bacef7c344ad4e2f4d498aee2b0f36 [file] [log] [blame]
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:Suppress("UNUSED_VARIABLE")
package kotlinx.coroutines.scheduling
import kotlinx.coroutines.*
import kotlinx.coroutines.internal.*
import org.junit.*
import kotlin.coroutines.*
import kotlin.test.*
abstract class SchedulerTestBase : TestBase() {
companion object {
val CORES_COUNT = AVAILABLE_PROCESSORS
/**
* Asserts that [expectedThreadsCount] pool worker threads were created.
* Note that 'created' doesn't mean 'exists' because pool supports dynamic shrinking
*/
fun checkPoolThreadsCreated(expectedThreadsCount: Int = CORES_COUNT) {
val threadsCount = maxSequenceNumber()!!
assertEquals(expectedThreadsCount, threadsCount, "Expected $expectedThreadsCount pool threads, but has $threadsCount")
}
/**
* Asserts that any number of pool worker threads in [range] were created.
* Note that 'created' doesn't mean 'exists' because pool supports dynamic shrinking
*/
fun checkPoolThreadsCreated(range: IntRange, base: Int = CORES_COUNT) {
val maxSequenceNumber = maxSequenceNumber()!!
val r = (range.first)..(range.last + base)
assertTrue(
maxSequenceNumber in r,
"Expected pool threads to be in interval $r, but has $maxSequenceNumber"
)
}
private fun maxSequenceNumber(): Int? {
return Thread.getAllStackTraces().keys.asSequence().filter { it is CoroutineScheduler.Worker }
.map { sequenceNumber(it.name) }.maxOrNull()
}
private fun sequenceNumber(threadName: String): Int {
val suffix = threadName.substring(threadName.lastIndexOf("-") + 1)
val separatorIndex = suffix.indexOf(' ')
if (separatorIndex == -1) {
return suffix.toInt()
}
return suffix.substring(0, separatorIndex).toInt()
}
suspend fun Iterable<Job>.joinAll() = forEach { it.join() }
}
protected var corePoolSize = CORES_COUNT
protected var maxPoolSize = 1024
protected var idleWorkerKeepAliveNs = IDLE_WORKER_KEEP_ALIVE_NS
private var _dispatcher: SchedulerCoroutineDispatcher? = null
protected val dispatcher: CoroutineDispatcher
get() {
if (_dispatcher == null) {
_dispatcher = SchedulerCoroutineDispatcher(
corePoolSize,
maxPoolSize,
idleWorkerKeepAliveNs
)
}
return _dispatcher!!
}
protected var blockingDispatcher = lazy {
blockingDispatcher(1000)
}
protected fun blockingDispatcher(parallelism: Int): CoroutineDispatcher {
val intitialize = dispatcher
return _dispatcher!!.blocking(parallelism)
}
protected fun view(parallelism: Int): CoroutineDispatcher {
val intitialize = dispatcher
return _dispatcher!!.limitedParallelism(parallelism)
}
@After
fun after() {
runBlocking {
withTimeout(5_000) {
_dispatcher?.close()
}
}
}
}
internal fun SchedulerCoroutineDispatcher.blocking(parallelism: Int = 16): CoroutineDispatcher {
return object : CoroutineDispatcher() {
@InternalCoroutinesApi
override fun dispatchYield(context: CoroutineContext, block: Runnable) {
this@blocking.dispatchWithContext(block, BlockingContext, true)
}
override fun dispatch(context: CoroutineContext, block: Runnable) {
this@blocking.dispatchWithContext(block, BlockingContext, false)
}
}.limitedParallelism(parallelism)
}