/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package benchmarks
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.selects.select
import org.openjdk.jmh.annotations.*
import org.openjdk.jmh.infra.Blackhole
import java.lang.Integer.max
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.Phaser
import java.util.concurrent.ThreadLocalRandom
import java.util.concurrent.TimeUnit
/**
 * Benchmark that measures channel algorithm performance in terms of average time per `send-receive` pair;
 * in practice, it measures the time for a batch of such operations split among the specified number of producers and consumers.
 * It uses different channels (rendezvous, buffered, unlimited; see [ChannelCreator]) and different dispatchers
 * (see [DispatcherCreator]). If the [_3_withSelect] property is set, `send` and `receive` are invoked via [select],
 * simultaneously waiting on a local dummy channel that simulates a "cancellation" channel.
 *
 * Please be patient: this benchmark takes quite a lot of time to complete.
 */
@Warmup(iterations = 3, time = 500, timeUnit = TimeUnit.MICROSECONDS)
@Measurement(iterations = 10, time = 500, timeUnit = TimeUnit.MICROSECONDS)
@Fork(value = 3)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
open class ChannelProducerConsumerBenchmark {
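// Numeric prefixes keep the JMH parameter columns in a fixed order (JMH sorts parameters by field name).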
@Param
private var _0_dispatcher: DispatcherCreator = DispatcherCreator.FORK_JOIN
@Param
private var _1_channel: ChannelCreator = ChannelCreator.RENDEZVOUS
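// 0 means the number of producers/consumers is derived from the parallelism level rather than from a fixed coroutine count.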
@Param("0", "1000")
private var _2_coroutines: Int = 0
@Param("false", "true")
private var _3_withSelect: Boolean = false
@Param("1", "2", "4") // local machine
// @Param("1", "2", "4", "8", "12") // local machine
// @Param("1", "2", "4", "8", "16", "32", "64", "128", "144") // dasquad
// @Param("1", "2", "4", "8", "16", "32", "64", "96") // Google Cloud
private var _4_parallelism: Int = 0
private lateinit var dispatcher: CoroutineDispatcher
private lateinit var channel: Channel<Int>
@InternalCoroutinesApi
@Setup
fun setup() {
dispatcher = _0_dispatcher.create(_4_parallelism)
channel = _1_channel.create()
}
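// One consumer with all remaining workers producing; runs only when no explicit coroutine count is requested.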
@Benchmark
fun spmc() {
if (_2_coroutines != 0) return
val producers = max(1, _4_parallelism - 1)
val consumers = 1
run(producers, consumers)
}
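// Equal numbers of producers and consumers, taken from either the parallelism level or the requested coroutine count.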
@Benchmark
fun mpmc() {
val producers = if (_2_coroutines == 0) (_4_parallelism + 1) / 2 else _2_coroutines / 2
val consumers = producers
run(producers, consumers)
}
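// Starts the producers and consumers on the benchmark dispatcher and uses a Phaser as a barrier to wait for the whole batch to complete.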
private fun run(producers: Int, consumers: Int) {
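// Round the batch size down to a multiple of the number of producers so the work splits evenly.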
val n = APPROX_BATCH_SIZE / producers * producers
val phaser = Phaser(producers + consumers + 1)
// Run producers
repeat(producers) {
GlobalScope.launch(dispatcher) {
val dummy = if (_3_withSelect) _1_channel.create() else null
repeat(n / producers) {
produce(it, dummy)
}
phaser.arrive()
}
}
// Run consumers
repeat(consumers) {
GlobalScope.launch(dispatcher) {
val dummy = if (_3_withSelect) _1_channel.create() else null
repeat(n / consumers) {
consume(dummy)
}
phaser.arrive()
}
}
// Wait until work is done
phaser.arriveAndAwaitAdvance()
}
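// Sends a single element; in select mode, the send races against a receive on a dummy channel
// that never delivers anything, so only the overhead of the two-clause select is measured.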
private suspend fun produce(element: Int, dummy: Channel<Int>?) {
if (_3_withSelect) {
select<Unit> {
channel.onSend(element) {}
dummy!!.onReceive {}
}
} else {
channel.send(element)
}
doWork()
}
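// Receives a single element, optionally through a two-clause select on the same dummy-channel pattern.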
private suspend fun consume(dummy: Channel<Int>?) {
if (_3_withSelect) {
select<Unit> {
channel.onReceive {}
dummy!!.onReceive {}
}
} else {
channel.receive()
}
doWork()
}
}
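// Dispatcher variants used as a benchmark parameter; currently only a ForkJoinPool-backed dispatcher is measured.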
enum class DispatcherCreator(val create: (parallelism: Int) -> CoroutineDispatcher) {
FORK_JOIN({ parallelism -> ForkJoinPool(parallelism).asCoroutineDispatcher() })
}
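// Channel capacities used as a benchmark parameter; the commented-out entries can be enabled for more fine-grained capacity sweeps.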
enum class ChannelCreator(private val capacity: Int) {
RENDEZVOUS(Channel.RENDEZVOUS),
// BUFFERED_1(1),
BUFFERED_2(2),
// BUFFERED_4(4),
BUFFERED_32(32),
BUFFERED_128(128),
BUFFERED_UNLIMITED(Channel.UNLIMITED);
fun create(): Channel<Int> = Channel(capacity)
}
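// Emulates per-element work by burning a random number of consumeCPU tokens in [WORK_MIN, WORK_MAX).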
private fun doWork(): Unit = Blackhole.consumeCPU(ThreadLocalRandom.current().nextLong(WORK_MIN, WORK_MAX))
private const val WORK_MIN = 50L
private const val WORK_MAX = 100L
private const val APPROX_BATCH_SIZE = 100000