blob: 724a5909b950850aab88b3963e94077b175169a8 [file] [log] [blame]
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package benchmarks.scheduler
import benchmarks.*
import kotlinx.coroutines.*
import org.openjdk.jmh.annotations.*
import java.util.concurrent.*
/*
* Comparison of fork-join tasks using specific FJP API and classic [async] jobs.
* FJP job is organized in perfectly balanced binary tree, every leaf node computes
* FPU-heavy sum over its data and intermediate nodes sum results.
*
* Fine-grained batch size (8192 * 1024 tasks, 128 in sequential batch)
* ForkJoinBenchmark.asyncExperimental avgt 10 681.512 ± 32.069 ms/op
* ForkJoinBenchmark.asyncFjp avgt 10 845.386 ± 73.204 ms/op
* ForkJoinBenchmark.fjpRecursiveTask avgt 10 692.120 ± 26.224 ms/op
* ForkJoinBenchmark.fjpTask avgt 10 791.087 ± 66.544 ms/op
*
* Too small tasks (8192 * 1024 tasks, 128 batch, 16 in sequential batch)
* Benchmark Mode Cnt Score Error Units
* ForkJoinBenchmark.asyncExperimental avgt 10 1273.271 ± 190.372 ms/op
* ForkJoinBenchmark.asyncFjp avgt 10 1406.102 ± 216.793 ms/op
* ForkJoinBenchmark.fjpRecursiveTask avgt 10 849.941 ± 141.254 ms/op
* ForkJoinBenchmark.fjpTask avgt 10 831.554 ± 57.276 ms/op
*/
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Fork(value = 2)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
open class ForkJoinBenchmark : ParametrizedDispatcherBase() {
companion object {
/*
* Change task size to control global granularity of benchmark
* Change batch size to control affinity/work stealing/scheduling overhead effects
*/
const val TASK_SIZE = 8192 * 1024
const val BATCH_SIZE = 32 * 8192
}
lateinit var coefficients: LongArray
override var dispatcher: String = "scheduler"
@Setup
override fun setup() {
super.setup()
coefficients = LongArray(TASK_SIZE) { ThreadLocalRandom.current().nextLong(0, 1024 * 1024) }
}
@Benchmark
fun asyncFjp() = runBlocking {
CoroutineScope(ForkJoinPool.commonPool().asCoroutineDispatcher()).startAsync(coefficients, 0, coefficients.size).await()
}
@Benchmark
fun asyncExperimental() = runBlocking {
startAsync(coefficients, 0, coefficients.size).await()
}
@Benchmark
fun fjpRecursiveTask(): Double {
val task = RecursiveAction(coefficients, 0, coefficients.size)
return ForkJoinPool.commonPool().submit(task).join()
}
@Benchmark
fun fjpTask(): Double {
val task = Task(coefficients, 0, coefficients.size)
return ForkJoinPool.commonPool().submit(task).join()
}
suspend fun CoroutineScope.startAsync(coefficients: LongArray, start: Int, end: Int): Deferred<Double> = async {
if (end - start <= BATCH_SIZE) {
compute(coefficients, start, end)
} else {
val first = startAsync(coefficients, start, start + (end - start) / 2)
val second = startAsync(coefficients, start + (end - start) / 2, end)
first.await() + second.await()
}
}
class Task(val coefficients: LongArray, val start: Int, val end: Int) : RecursiveTask<Double>() {
override fun compute(): Double {
if (end - start <= BATCH_SIZE) {
return compute(coefficients, start, end)
}
val first = Task(coefficients, start, start + (end - start) / 2).fork()
val second = Task(coefficients, start + (end - start) / 2, end).fork()
var result = 0.0
result += first.join()
result += second.join()
return result
}
private fun compute(coefficients: LongArray, start: Int, end: Int): Double {
var result = 0.0
for (i in start until end) {
result += Math.sin(Math.pow(coefficients[i].toDouble(), 1.1)) + 1e-8
}
return result
}
}
class RecursiveAction(val coefficients: LongArray, val start: Int, val end: Int, @Volatile var result: Double = 0.0,
parent: RecursiveAction? = null) : CountedCompleter<Double>(parent) {
private var first: ForkJoinTask<Double>? = null
private var second: ForkJoinTask<Double>? = null
override fun getRawResult(): Double {
return result
}
override fun setRawResult(t: Double) {
result = t
}
override fun compute() {
if (end - start <= BATCH_SIZE) {
rawResult = compute(coefficients, start, end)
} else {
pendingCount = 2
// One may fork only once here and executing second task here with looping over firstComplete to be even more efficient
first = RecursiveAction(
coefficients,
start,
start + (end - start) / 2,
parent = this
).fork()
second = RecursiveAction(
coefficients,
start + (end - start) / 2,
end,
parent = this
).fork()
}
tryComplete()
}
override fun onCompletion(caller: CountedCompleter<*>?) {
if (caller !== this) {
rawResult = first!!.rawResult + second!!.rawResult
}
super.onCompletion(caller)
}
}
}
private fun compute(coefficients: LongArray, start: Int, end: Int): Double {
var result = 0.0
for (i in start until end) {
result += Math.sin(Math.pow(coefficients[i].toDouble(), 1.1)) + 1e-8
}
return result
}