blob: f675b3722d4c59e0d00f82e0af76f91c873dd061 [file] [log] [blame]
/*
* Copyright (C) 2019 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.android.build.gradle.internal.tasks
import com.android.builder.tasks.BooleanLatch
import org.junit.Test
import java.util.concurrent.Callable
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.ForkJoinTask
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import kotlin.test.assertTrue
/** Tests for WorkLimiter */
class WorkLimiterTest {
/**
* This test verifies that the work limiter both
* (a) Prevents more than workLimit items from running in parallel.
* (b) Allows workLimit items to run in parallel.
*/
@Test
fun checkConcurrencyIsCappedCorrectly() {
val workLimiter = WorkLimiter(concurrencyLimit)
val workItemStartAttemptCount = AtomicInteger()
val workItemConcurrentRunningCount = AtomicInteger()
val callers = mutableListOf<ForkJoinTask<*>>()
val allAttemptsStarted = BooleanLatch()
val maxRunningInParallel = BooleanLatch()
val callerPool = ForkJoinPool(concurrentAttempts)
try {
// Schedule more work items and ensure that not too many run in parallel.
for (workItemIndex in 1..concurrentAttempts) {
callers.add(
callerPool.submit(
TestRunnable(
workLimiter, workItemIndex, workItemStartAttemptCount,
allAttemptsStarted,
workItemConcurrentRunningCount,
maxRunningInParallel
)
)
)
}
allAttemptsStarted.await()
for (caller in callers) {
caller.get()
}
} finally {
callerPool.shutdown()
}
}
private inner class TestRunnable(
val workLimiter: WorkLimiter,
val id: Int,
val workItemStartAttemptCount: AtomicInteger,
val allAttemptsStarted: BooleanLatch,
val workItemConcurrentRunningCount: AtomicInteger,
val maxRunningInParallel: BooleanLatch
) : Runnable {
override fun run() {
val startAttemptCount = workItemStartAttemptCount.incrementAndGet()
if (startAttemptCount == concurrentAttempts) {
// All the attempts have started, allow them to continue to execute.
allAttemptsStarted.signal()
}
workLimiter.limit(Callable<Void> {
val currentlyRunning = workItemConcurrentRunningCount.incrementAndGet()
try {
when {
currentlyRunning == concurrencyLimit -> {
// We have reached the desired level of concurrency, allow the test to
// continue.
maxRunningInParallel.signal()
}
currentlyRunning > concurrencyLimit -> {
throw AssertionError("Work item $id: Too many concurrent jobs")
}
}
// Block until all items have started, to attempt to prevent the test from
// trivially passing because of previous work items finishing before the next
// are scheduled.
assertTrue(
allAttemptsStarted.await(waitTimeoutNanos),
"Attempt count $id timed out waiting for all work items to be scheduled."
)
// Expectation: At most concurrencyLimit threads were waiting on
// allAttemptsStarted when it is first signalled, as the others will be blocked
// by the workLimiter.
// Block items until sufficient items have started to prevent the test from
// passing if the work limiter is limiting the number of concurrent items
// to below the work limit.
assertTrue(
maxRunningInParallel.await(waitTimeoutNanos),
"Attempt count $id timed out waiting for $concurrencyLimit work items to be scheduled."
)
} finally {
workItemConcurrentRunningCount.decrementAndGet()
}
null
})
}
}
companion object {
// Timeout to avoid the making the rest of the suite time out if something is wrong
val waitTimeoutNanos = TimeUnit.SECONDS.toNanos(600)
const val concurrencyLimit = 3
const val concurrentAttempts = 10
}
}