blob: 7fc212f59fe9cc60dfa1b1a8fd00e1dd0cd2192b [file] [log] [blame]
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.scheduling
import kotlinx.coroutines.*
import org.junit.*
import org.junit.Test
import java.util.concurrent.atomic.*
import kotlin.test.*
/**
* Test that ensures implementation correctness of [LimitingDispatcher] and
* designed to stress its particular implementation details.
*/
class BlockingCoroutineDispatcherLivenessStressTest : SchedulerTestBase() {
private val concurrentWorkers = AtomicInteger(0)
@Before
fun setUp() {
// In case of starvation test will hang
idleWorkerKeepAliveNs = Long.MAX_VALUE
}
@Test
fun testAddPollRace() = runBlocking {
val limitingDispatcher = blockingDispatcher(1)
val iterations = 25_000 * stressTestMultiplier
// Stress test for specific case (race #2 from LimitingDispatcher). Shouldn't hang.
for (i in 1..iterations) {
val tasks = (1..2).map {
async(limitingDispatcher) {
try {
val currentlyExecuting = concurrentWorkers.incrementAndGet()
assertEquals(1, currentlyExecuting)
} finally {
concurrentWorkers.decrementAndGet()
}
}
}
tasks.forEach { it.await() }
}
}
@Test
fun testPingPongThreadsCount() = runBlocking {
corePoolSize = CORES_COUNT
val iterations = 100_000 * stressTestMultiplier
val completed = AtomicInteger(0)
for (i in 1..iterations) {
val tasks = (1..2).map {
async(dispatcher) {
// Useless work
concurrentWorkers.incrementAndGet()
concurrentWorkers.decrementAndGet()
completed.incrementAndGet()
}
}
tasks.forEach { it.await() }
}
assertEquals(2 * iterations, completed.get())
}
}