Fix BlockHound false positives (#2331)
Fixes https://github.com/Kotlin/kotlinx.coroutines/issues/2302
Fixes https://github.com/Kotlin/kotlinx.coroutines/issues/2190
Partially fixes https://github.com/Kotlin/kotlinx.coroutines/issues/2303
diff --git a/kotlinx-coroutines-debug/src/CoroutinesBlockHoundIntegration.kt b/kotlinx-coroutines-debug/src/CoroutinesBlockHoundIntegration.kt
index f89d2be..091e8eb 100644
--- a/kotlinx-coroutines-debug/src/CoroutinesBlockHoundIntegration.kt
+++ b/kotlinx-coroutines-debug/src/CoroutinesBlockHoundIntegration.kt
@@ -1,16 +1,168 @@
@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
+
package kotlinx.coroutines.debug
-import reactor.blockhound.BlockHound
import kotlinx.coroutines.scheduling.*
+import reactor.blockhound.*
import reactor.blockhound.integration.*
@Suppress("UNUSED")
-public class CoroutinesBlockHoundIntegration: BlockHoundIntegration {
+public class CoroutinesBlockHoundIntegration : BlockHoundIntegration {
- override fun applyTo(builder: BlockHound.Builder) {
- builder.addDynamicThreadPredicate { isSchedulerWorker(it) }
- builder.nonBlockingThreadPredicate { p -> p.or { mayNotBlock(it) } }
+ override fun applyTo(builder: BlockHound.Builder): Unit = with(builder) {
+ allowBlockingCallsInPrimitiveImplementations()
+ allowBlockingWhenEnqueuingTasks()
+ allowServiceLoaderInvocationsOnInit()
+ allowBlockingCallsInReflectionImpl()
+ /* The predicates that define that BlockHound should only report blocking calls from threads that are part of
+ the coroutine thread pool and currently execute a CPU-bound coroutine computation. */
+ addDynamicThreadPredicate { isSchedulerWorker(it) }
+ nonBlockingThreadPredicate { p -> p.or { mayNotBlock(it) } }
+ }
+
+ /**
+ * Allows blocking calls in various coroutine structures, such as flows and channels.
+ *
+ * They use locks in implementations, though only for protecting short pieces of fast and well-understood code, so
+ * locking in such places doesn't affect the program liveness.
+ */
+ private fun BlockHound.Builder.allowBlockingCallsInPrimitiveImplementations() {
+ allowBlockingCallsInJobSupport()
+ allowBlockingCallsInThreadSafeHeap()
+ allowBlockingCallsInFlow()
+ allowBlockingCallsInChannels()
+ }
+
+ /**
+ * Allows blocking inside [kotlinx.coroutines.JobSupport].
+ */
+ private fun BlockHound.Builder.allowBlockingCallsInJobSupport() {
+ for (method in listOf("finalizeFinishingState", "invokeOnCompletion", "makeCancelling",
+ "tryMakeCompleting"))
+ {
+ allowBlockingCallsInside("kotlinx.coroutines.JobSupport", method)
+ }
+ }
+
+ /**
+ * Allows blocking inside [kotlinx.coroutines.internal.ThreadSafeHeap].
+ */
+ private fun BlockHound.Builder.allowBlockingCallsInThreadSafeHeap() {
+ for (method in listOf("clear", "peek", "removeFirstOrNull", "addLast")) {
+ allowBlockingCallsInside("kotlinx.coroutines.internal.ThreadSafeHeap", method)
+ }
+ // [addLastIf] is only used in [EventLoop.common]. Users of [removeFirstIf]:
+ allowBlockingCallsInside("kotlinx.coroutines.test.TestCoroutineDispatcher", "doActionsUntil")
+ allowBlockingCallsInside("kotlinx.coroutines.test.TestCoroutineContext", "triggerActions")
+ }
+
+ private fun BlockHound.Builder.allowBlockingCallsInFlow() {
+ allowBlockingCallsInsideStateFlow()
+ allowBlockingCallsInsideSharedFlow()
+ }
+
+ /**
+ * Allows blocking inside the implementation of [kotlinx.coroutines.flow.StateFlow].
+ */
+ private fun BlockHound.Builder.allowBlockingCallsInsideStateFlow() {
+ allowBlockingCallsInside("kotlinx.coroutines.flow.StateFlowImpl", "updateState")
+ }
+
+ /**
+ * Allows blocking inside the implementation of [kotlinx.coroutines.flow.SharedFlow].
+ */
+ private fun BlockHound.Builder.allowBlockingCallsInsideSharedFlow() {
+ for (method in listOf("emitSuspend", "awaitValue", "getReplayCache", "tryEmit", "cancelEmitter",
+ "tryTakeValue", "resetReplayCache"))
+ {
+ allowBlockingCallsInside("kotlinx.coroutines.flow.SharedFlowImpl", method)
+ }
+ for (method in listOf("getSubscriptionCount", "allocateSlot", "freeSlot")) {
+ allowBlockingCallsInside("kotlinx.coroutines.flow.internal.AbstractSharedFlow", method)
+ }
+ }
+
+ private fun BlockHound.Builder.allowBlockingCallsInChannels() {
+ allowBlockingCallsInArrayChannel()
+ allowBlockingCallsInBroadcastChannel()
+ allowBlockingCallsInConflatedChannel()
+ }
+
+ /**
+ * Allows blocking inside [kotlinx.coroutines.channels.ArrayChannel].
+ */
+ private fun BlockHound.Builder.allowBlockingCallsInArrayChannel() {
+ for (method in listOf(
+ "pollInternal", "isEmpty", "isFull", "isClosedForReceive", "offerInternal", "offerSelectInternal",
+ "enqueueSend", "pollInternal", "pollSelectInternal", "enqueueReceiveInternal", "onCancelIdempotent"))
+ {
+ allowBlockingCallsInside("kotlinx.coroutines.channels.ArrayChannel", method)
+ }
+ }
+
+ /**
+ * Allows blocking inside [kotlinx.coroutines.channels.ArrayBroadcastChannel].
+ */
+ private fun BlockHound.Builder.allowBlockingCallsInBroadcastChannel() {
+ for (method in listOf("offerInternal", "offerSelectInternal", "updateHead")) {
+ allowBlockingCallsInside("kotlinx.coroutines.channels.ArrayBroadcastChannel", method)
+ }
+ for (method in listOf("checkOffer", "pollInternal", "pollSelectInternal")) {
+ allowBlockingCallsInside("kotlinx.coroutines.channels.ArrayBroadcastChannel\$Subscriber", method)
+ }
+ }
+
+ /**
+ * Allows blocking inside [kotlinx.coroutines.channels.ConflatedChannel].
+ */
+ private fun BlockHound.Builder.allowBlockingCallsInConflatedChannel() {
+ for (method in listOf("offerInternal", "offerSelectInternal", "pollInternal", "pollSelectInternal",
+ "onCancelIdempotent"))
+ {
+ allowBlockingCallsInside("kotlinx.coroutines.channels.ConflatedChannel", method)
+ }
+ }
+
+ /**
+ * Allows blocking when enqueuing tasks into a thread pool.
+ *
+ * Without this, the following code breaks:
+ * ```
+ * withContext(Dispatchers.Default) {
+ * withContext(newSingleThreadContext("singleThreadedContext")) {
+ * }
+ * }
+ * ```
+ */
+ private fun BlockHound.Builder.allowBlockingWhenEnqueuingTasks() {
+ /* This method may block as part of its implementation, but is probably safe. */
+ allowBlockingCallsInside("java.util.concurrent.ScheduledThreadPoolExecutor", "execute")
+ }
+
+ /**
+ * Allows instances of [java.util.ServiceLoader] being called.
+ *
+ * Each instance is listed separately; another approach could be to generally allow the operations performed by
+ * service loaders, as they can generally be considered safe. This was not done here because ServiceLoader has a
+ * large API surface, with some methods being hidden as implementation details (in particular, the implementation of
+ * its iterator is completely opaque). Relying on particular names being used in ServiceLoader's implementation
+ * would be brittle, so here we only provide clearance rules for some specific instances.
+ */
+ private fun BlockHound.Builder.allowServiceLoaderInvocationsOnInit() {
+ allowBlockingCallsInside("kotlinx.coroutines.reactive.ReactiveFlowKt", "<clinit>")
+ allowBlockingCallsInside("kotlinx.coroutines.CoroutineExceptionHandlerImplKt", "<clinit>")
+ // not part of the coroutines library, but it would be nice if reflection also wasn't considered blocking
+ allowBlockingCallsInside("kotlin.reflect.jvm.internal.impl.resolve.OverridingUtil", "<clinit>")
+ }
+
+ /**
+ * Allows some blocking calls from the reflection API.
+ *
+ * The API is big, so surely some other blocking calls will show up, but with these rules in place, at least some
+ * simple examples work without problems.
+ */
+ private fun BlockHound.Builder.allowBlockingCallsInReflectionImpl() {
+ allowBlockingCallsInside("kotlin.reflect.jvm.internal.impl.builtins.jvm.JvmBuiltInsPackageFragmentProvider", "findPackage")
}
}
diff --git a/kotlinx-coroutines-debug/test/BlockHoundTest.kt b/kotlinx-coroutines-debug/test/BlockHoundTest.kt
index ff5c95c..571daca 100644
--- a/kotlinx-coroutines-debug/test/BlockHoundTest.kt
+++ b/kotlinx-coroutines-debug/test/BlockHoundTest.kt
@@ -1,5 +1,6 @@
package kotlinx.coroutines.debug
import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
import org.junit.*
import reactor.blockhound.*
@@ -52,6 +53,27 @@
}
}
+ @Test
+ fun testChannelsNotBeingConsideredBlocking() = runTest {
+ withContext(Dispatchers.Default) {
+ // Copy of kotlinx.coroutines.channels.ArrayChannelTest.testSimple
+ val q = Channel<Int>(1)
+ check(q.isEmpty)
+ check(!q.isClosedForReceive)
+ check(!q.isClosedForSend)
+ val sender = launch {
+ q.send(1)
+ q.send(2)
+ }
+ val receiver = launch {
+ q.receive() == 1
+ q.receive() == 2
+ }
+ sender.join()
+ receiver.join()
+ }
+ }
+
@Test(expected = BlockingOperationError::class)
fun testReusingThreadsFailure() = runTest {
val n = 100