blob: 08b4914c4c6d148cc8f872b22ae15a77285de905 [file] [log] [blame]
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:Suppress("DeferredResultUnused")
package kotlinx.coroutines.scheduling
import kotlinx.coroutines.*
import org.junit.*
import java.util.concurrent.*
import java.util.concurrent.atomic.*
/**
 * Stress tests for blocking dispatchers obtained from the scheduler under test.
 *
 * Each test floods a blocking dispatcher with tasks and then checks either that the
 * observed parallelism never exceeds the requested limit, or that a barrier which needs
 * every task to be running at the same time can actually be tripped.
 *
 * `corePoolSize`, `CORES_COUNT`, `dispatcher`, `blockingDispatcher(...)`,
 * `stressTestMultiplier` and `checkPoolThreadsCreated(...)` are not declared in this class;
 * presumably they are inherited from [SchedulerTestBase] -- confirm there.
 */
class BlockingCoroutineDispatcherStressTest : SchedulerTestBase() {

    init {
        corePoolSize = CORES_COUNT
    }

    // Concurrency levels seen by the tasks of testLimitParallelism; only the keys are meaningful.
    private val observedConcurrency = ConcurrentHashMap<Int, Boolean>()

    // Number of testLimitParallelism task bodies that are currently between their increment and decrement.
    private val concurrentWorkers = AtomicInteger(0)

    /**
     * Starts a large number of short tasks on a blocking dispatcher created with a limit of
     * `CORES_COUNT` and verifies that no task ever observed more than `CORES_COUNT` tasks
     * running at once.
     */
    @Test
    fun testLimitParallelism() = runBlocking {
        val limited = blockingDispatcher(CORES_COUNT)
        val taskCount = 50_000 * stressTestMultiplier

        val jobs = (1..taskCount).map {
            async(limited) {
                try {
                    val runningNow = concurrentWorkers.incrementAndGet()
                    observedConcurrency[runningNow] = true
                    require(runningNow <= CORES_COUNT)
                } finally {
                    // Always undo the increment so a failed check does not skew later samples.
                    concurrentWorkers.decrementAndGet()
                }
            }
        }

        for (job in jobs) {
            job.await()
        }
        require(jobs.isNotEmpty())

        // No concurrency level above the limit may have been recorded.
        val forbiddenLevels = (CORES_COUNT + 1)..(CORES_COUNT * 2)
        for (level in forbiddenLevels) {
            require(!observedConcurrency.containsKey(level)) { "Unexpected state: $observedConcurrency" }
        }

        checkPoolThreadsCreated(CORES_COUNT..CORES_COUNT + CORES_COUNT * 2)
    }

    /**
     * Repeatedly creates a fresh dispatcher, parks `CORES_COUNT * 3` tasks of its blocking
     * view on a barrier, and checks that `CORES_COUNT` tasks submitted to the dispatcher
     * itself can still all reach their own barrier while the blocking tasks are parked.
     */
    @Test
    fun testCpuTasksStarvation() = runBlocking {
        val rounds = 1000 * stressTestMultiplier

        repeat(rounds) {
            // Create a dispatcher every iteration to increase probability of race
            val scheduler = ExperimentalCoroutineDispatcher(CORES_COUNT)
            val blockingView = scheduler.blocking(100)

            // Each barrier has one extra party for the test thread, which releases it.
            val blockingGate = CyclicBarrier(CORES_COUNT * 3 + 1)
            val cpuGate = CyclicBarrier(CORES_COUNT + 1)

            val cpuJobs = CopyOnWriteArrayList<Deferred<*>>()
            val blockingJobs = CopyOnWriteArrayList<Deferred<*>>()

            for (launcher in 0 until CORES_COUNT) {
                // The launcher's own Deferred is intentionally dropped; its children are tracked in the lists.
                async(scheduler) {
                    // These two will be stolen first
                    blockingJobs += async(blockingView) { blockingGate.await() }
                    blockingJobs += async(blockingView) { blockingGate.await() }

                    // Empty CPU job which should be executed while the blocking tasks hang
                    cpuJobs += async(scheduler) { cpuGate.await() }

                    // Block with next task. Block cores * 3 threads in total
                    blockingJobs += async(blockingView) { blockingGate.await() }
                }
            }

            // The launchers run concurrently with the test thread, so these checks only cover
            // the tasks that have already been registered in the lists at this point.
            for (job in cpuJobs) {
                require(job.isActive)
            }
            cpuGate.await()
            for (job in cpuJobs) {
                job.await()
            }

            for (job in blockingJobs) {
                require(job.isActive)
            }
            blockingGate.await()
            for (job in blockingJobs) {
                job.await()
            }

            scheduler.close()
        }
    }

    /**
     * With `corePoolSize` set to 2, repeatedly launches as many barrier-waiting tasks as
     * the blocking limit allows. The barrier can only trip if all of them are running
     * simultaneously together with the test thread.
     */
    @Test
    fun testBlockingTasksStarvation() = runBlocking {
        corePoolSize = 2 // Easier to reproduce race with unparks
        val rounds = 10_000 * stressTestMultiplier
        val limit = 4 // CORES_COUNT * 3
        val blockingView = blockingDispatcher(limit)

        repeat(rounds) {
            // One party per blocking task plus one for the test thread.
            val gate = CyclicBarrier(limit + 1)

            val parked = (1..limit).map {
                async(blockingView) { gate.await() }
            }
            for (job in parked) {
                require(job.isActive)
            }

            gate.await()
            parked.joinAll()
        }
    }

    /**
     * Same barrier scenario as [testBlockingTasksStarvation] with a limit of `CORES_COUNT * 2`,
     * while `CORES_COUNT` endlessly looping tasks are also submitted to `dispatcher`
     * in every iteration.
     */
    @Test
    fun testBlockingTasksStarvationWithCpuTasks() = runBlocking {
        val rounds = 1000 * stressTestMultiplier
        val limit = CORES_COUNT * 2
        val blockingView = blockingDispatcher(limit)

        repeat(rounds) {
            // Overwhelm global queue with external CPU tasks
            val spinners = (1..CORES_COUNT).map {
                async(dispatcher) { while (true) delay(1) }
            }

            // One party per blocking task plus one for the test thread.
            val gate = CyclicBarrier(limit + 1)
            val parked = (1..limit).map {
                async(blockingView) { gate.await() }
            }
            for (job in parked) {
                require(job.isActive)
            }

            gate.await()
            parked.joinAll()

            // The spinners never finish on their own, so cancel each one and wait for it.
            for (spinner in spinners) {
                spinner.cancelAndJoin()
            }
        }
    }
}