blob: f829573c87982932e471afbe47b3799bbd9ee416 [file] [log] [blame]
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
package benchmarks.scheduler
import benchmarks.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import org.openjdk.jmh.annotations.*
import java.util.concurrent.*
* Benchmark which launches multiple async jobs each with either own private or global shared state,
* each job iterates over its state multiple times and suspends after every iteration.
* Benchmark is intended to indicate pros and cons of coroutines affinity (assuming threads are rarely migrated)
* and comparison with single thread and ForkJoinPool
* Benchmark (dispatcher) (jobsCount) Mode Cnt Score Error Units
* StatefulAsyncBenchmark.dependentStateAsync fjp 1 avgt 10 42.147 ± 11.563 us/op
* StatefulAsyncBenchmark.dependentStateAsync fjp 8 avgt 10 111.053 ± 40.097 us/op
* StatefulAsyncBenchmark.dependentStateAsync fjp 16 avgt 10 239.992 ± 52.839 us/op
* StatefulAsyncBenchmark.dependentStateAsync ftp_1 1 avgt 10 32.851 ± 11.385 us/op
* StatefulAsyncBenchmark.dependentStateAsync ftp_1 8 avgt 10 51.692 ± 0.961 us/op
* StatefulAsyncBenchmark.dependentStateAsync ftp_1 16 avgt 10 101.511 ± 3.060 us/op
* StatefulAsyncBenchmark.dependentStateAsync ftp_8 1 avgt 10 31.549 ± 1.014 us/op
* StatefulAsyncBenchmark.dependentStateAsync ftp_8 8 avgt 10 103.990 ± 1.588 us/op
* StatefulAsyncBenchmark.dependentStateAsync ftp_8 16 avgt 10 156.384 ± 2.914 us/op
* StatefulAsyncBenchmark.independentStateAsync fjp 1 avgt 10 32.503 ± 0.721 us/op
* StatefulAsyncBenchmark.independentStateAsync fjp 8 avgt 10 73.000 ± 1.686 us/op
* StatefulAsyncBenchmark.independentStateAsync fjp 16 avgt 10 98.629 ± 7.541 us/op
* StatefulAsyncBenchmark.independentStateAsync ftp_1 1 avgt 10 26.111 ± 0.814 us/op
* StatefulAsyncBenchmark.independentStateAsync ftp_1 8 avgt 10 54.644 ± 1.261 us/op
* StatefulAsyncBenchmark.independentStateAsync ftp_1 16 avgt 10 104.871 ± 1.599 us/op
* StatefulAsyncBenchmark.independentStateAsync ftp_8 1 avgt 10 31.929 ± 0.698 us/op
* StatefulAsyncBenchmark.independentStateAsync ftp_8 8 avgt 10 108.959 ± 1.029 us/op
* StatefulAsyncBenchmark.independentStateAsync ftp_8 16 avgt 10 159.593 ± 5.262 us/op
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Fork(value = 2)
open class StatefulAsyncBenchmark : ParametrizedDispatcherBase() {
private val stateSize = 2048
private val jobSuspensions = 2 // multiplicative factor for throughput
// it's useful to have more jobs than cores so run queue always will be non empty
@Param("1", "8", "16")
var jobsCount = 1
@Param("fjp", "ftp_1", "dispatcher")
override var dispatcher: String = "fjp"
private var state: Array<LongArray>? = null
override fun setup() {
state = Array(Runtime.getRuntime().availableProcessors() * 4) { LongArray(stateSize) { ThreadLocalRandom.current().nextLong() } }
fun independentStateAsync() = runBlocking {
val broadcastChannel = BroadcastChannel<Int>(1)
val subscriptionChannel = Channel<Int>(jobsCount)
val jobs= (0 until jobsCount).map { launchJob(it, broadcastChannel, subscriptionChannel) }.toList()
repeat(jobsCount) {
subscriptionChannel.receive() // await all jobs to start
// Fire barrier to start execution
jobs.forEach { it.await() }
fun dependentStateAsync() = runBlocking {
val broadcastChannel = BroadcastChannel<Int>(1)
val subscriptionChannel = Channel<Int>(jobsCount)
val jobs= (0 until jobsCount).map { launchJob(0, broadcastChannel, subscriptionChannel) }.toList()
repeat(jobsCount) {
subscriptionChannel.receive() // await all jobs to start
// Fire barrier to start execution
jobs.forEach { it.await() }
private fun launchJob(
stateNum: Int,
channel: BroadcastChannel<Int>,
subscriptionChannel: Channel<Int>
): Deferred<Long> =
async {
val subscription = channel.openSubscription()
var sum = 0L
repeat(jobSuspensions) {
val arr = state!![stateNum]
for (i in 0 until stateSize) {
sum += arr[i]