adblib: Add `runAlongOtherScope` utility
It is sometimes useful to run a coroutine function
(i.e. a `suspend` function) that should be cancelled
from 2 possible sources: either the cancellation
can come from the the current scope (or parent),
or the cancellation can come with some other
unrelated scope with an unrelated lifetime.
In the context of `adblib`, this can be useful when
executing a long running coroutine, e.g. collecting
a `StateFlow`, for a long as a device is connected
or for as long as a JdwpProcess is active. By
using the `ConnectedDevice.scope` or
`JdwpProcess.scope` as the `otherScope`
parameter of `runAlongOtherScope`, one can
achieve the intended behavior.
Note that the using the
`outerScope.async { suspend_call() }.await`
pattern does not quite work: cancellation
of `outerScope` indeed ensures that
`suspend_call()` is cancelled, but cancellation
of the parent scope only cancel the `await()`
call, i.e. it does *not* cancel the `suspend_call()`
coroutine.
Test: Included
Bug: n/a
Change-Id: I3982def486b1f9712500001881df5d1e03fa7b32
diff --git a/adblib/src/com/android/adblib/utils/CoroutineUtils.kt b/adblib/src/com/android/adblib/utils/CoroutineUtils.kt
index 6691af2..2948e70 100644
--- a/adblib/src/com/android/adblib/utils/CoroutineUtils.kt
+++ b/adblib/src/com/android/adblib/utils/CoroutineUtils.kt
@@ -18,6 +18,7 @@
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
+import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
@@ -74,6 +75,49 @@
}
/**
+ * Runs [block] as a regular `suspend` function, except that it gets cancelled when [otherScope]
+ * is cancelled.
+ */
+suspend inline fun <R> runAlongOtherScope(
+ otherScope: CoroutineScope,
+ crossinline block: suspend () -> R
+): R {
+ // Attach a completion handler that cancels this coroutine when "otherScope" is cancelled
+ // The completion handler is removed as soon as the execution of `block` ends, so that
+ // we don't cancel the caller at some point later in the execution path.
+ val currentJob = currentCoroutineContext().job
+
+ // Note: As of April 2024, the `invokeOnCompletion` overload that take `onCancelling` parameter
+ // is marked as internal API. The other overload is fully public and supported, but uses the
+ // `onCancelling = false` semantics, which delays the completion handler invocation, which
+ // may lead to deadlocks if using a "single thread" Dispatcher, i.e. `block` may not be
+ // cancelled right when `outerScope` is cancelled, leading to `block` still executing and
+ // using a Dispatcher slot.
+ @OptIn(InternalCoroutinesApi::class)
+ val handler = otherScope.coroutineContext.job.invokeOnCompletion(onCancelling = true) { throwable ->
+ when (throwable) {
+ is CancellationException -> {
+ currentJob.cancel(throwable)
+ }
+
+ null -> {
+ /* Nothing to do */
+ }
+
+ else -> {
+ currentJob.cancel(CancellationException(throwable.message, throwable))
+ }
+ }
+ }
+
+ return try {
+ block()
+ } finally {
+ handler.dispose()
+ }
+}
+
+/**
* Re-entrant version of [Mutex.lock]
*
* See [Phantom of the Coroutine](https://elizarov.medium.com/phantom-of-the-coroutine-afc63b03a131)
diff --git a/adblib/test/src/com/android/adblib/utils/RunAlongOtherScopeTest.kt b/adblib/test/src/com/android/adblib/utils/RunAlongOtherScopeTest.kt
new file mode 100644
index 0000000..985a7e4
--- /dev/null
+++ b/adblib/test/src/com/android/adblib/utils/RunAlongOtherScopeTest.kt
@@ -0,0 +1,241 @@
+/*
+ * Copyright (C) 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.android.adblib.utils
+
+import com.android.adblib.testingutils.CoroutineTestUtils.runBlockingWithTimeout
+import com.android.adblib.testingutils.CoroutineTestUtils.yieldUntil
+import kotlinx.coroutines.CancellationException
+import kotlinx.coroutines.CompletableDeferred
+import kotlinx.coroutines.async
+import kotlinx.coroutines.cancel
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import org.junit.Assert
+import org.junit.Assert.assertEquals
+import org.junit.Assert.assertTrue
+import org.junit.Rule
+import org.junit.Test
+import org.junit.rules.ExpectedException
+
+class RunAlongOtherScopeTest {
+
+ @JvmField
+ @Rule
+ var exceptionRule: ExpectedException = ExpectedException.none()
+
+ @Test
+ fun testSimpleInvocationWorks() = runBlockingWithTimeout {
+ // Prepare
+ val otherScope = createChildScope(isSupervisor = true)
+
+ // Act
+ val foo = runAlongOtherScope(otherScope) {
+ "foo"
+ }
+
+ // Assert
+ assertEquals("foo", foo)
+
+ // (Let test scope terminate)
+ otherScope.cancel("Test ended")
+ }
+
+ @Test
+ fun testSimpleSuspendingInvocationWorks() = runBlockingWithTimeout {
+ // Prepare
+ val otherScope = createChildScope(isSupervisor = true)
+
+ // Act
+ val foo = runAlongOtherScope(otherScope) {
+ delay(10)
+ "foo"
+ }
+ otherScope.cancel("End of test")
+
+ // Assert
+ assertEquals("foo", foo)
+ }
+
+ @Test
+ fun testInvocationIsTransparentToException() = runBlockingWithTimeout {
+ // Prepare
+ val otherScope = createChildScope(isSupervisor = true)
+
+ // Act
+ exceptionRule.expect(Exception::class.java)
+ exceptionRule.expectMessage("foo")
+ runAlongOtherScope(otherScope) {
+ throw Exception("foo")
+ }
+
+ // Assert
+ @Suppress("UNREACHABLE_CODE")
+ Assert.fail("Should not reach")
+ }
+
+ @Test
+ fun testInvocationIsTransparentToCancellation() = runBlockingWithTimeout {
+ // Prepare
+ val otherScope = createChildScope(isSupervisor = true)
+
+ // Act
+ exceptionRule.expect(CancellationException::class.java)
+ exceptionRule.expectMessage("foo")
+ runAlongOtherScope(otherScope) {
+ throw CancellationException("foo")
+ }
+
+ // Assert
+ @Suppress("UNREACHABLE_CODE")
+ Assert.fail("Should not reach")
+ }
+
+ @Test
+ fun testInvocationIsTransparentToCancel() = runBlockingWithTimeout {
+ // Prepare
+ val otherScope = createChildScope(isSupervisor = true)
+
+ // Act
+ exceptionRule.expect(CancellationException::class.java)
+ exceptionRule.expectMessage("foo")
+ coroutineScope {
+ runAlongOtherScope(otherScope) {
+ cancel("foo")
+ }
+ }
+
+ // Assert
+ Assert.fail("Should not reach")
+ }
+
+ @Test
+ fun testInvocationIsCancelledWhenOtherScopeIsCancelledBeforeInvocation() =
+ runBlockingWithTimeout {
+ // Prepare
+ val otherScope = createChildScope(isSupervisor = true)
+
+ // Act
+ exceptionRule.expect(CancellationException::class.java)
+ exceptionRule.expectMessage("foo")
+ otherScope.cancel("foo")
+ runAlongOtherScope(otherScope) {
+ delay(1_000_000)
+ }
+
+ // Assert
+ Assert.fail("Should not reach")
+ }
+
+ @Test
+ fun testInvocationIsCancelledWhenOtherScopeIsCancelledDuringInvocation(): Unit =
+ runBlockingWithTimeout {
+ // Prepare
+ val otherScope = createChildScope(isSupervisor = true)
+
+ // Act
+ val started = CompletableDeferred<Unit>()
+ launch {
+ started.await()
+ otherScope.cancel("foo")
+ }
+ exceptionRule.expect(CancellationException::class.java)
+ exceptionRule.expectMessage("foo")
+ runAlongOtherScope(otherScope) {
+ started.complete(Unit)
+ delay(1_000_000)
+ }
+
+ // Assert
+ Assert.fail("Should not reach")
+ }
+
+ @Test
+ fun testInvocationIsCancelledWhenJobIsCancelled(): Unit = runBlockingWithTimeout {
+ // Prepare
+ val otherScope = createChildScope(isSupervisor = true)
+ val parentScope = createChildScope()
+
+ // Act
+ val started = CompletableDeferred<Unit>()
+ var exception: Throwable? = null
+ val job = parentScope.async {
+ runAlongOtherScope(otherScope) {
+ try {
+ started.complete(Unit)
+ delay(1_000_000)
+ } catch (t: Throwable) {
+ exception = t
+ }
+ }
+ }
+
+ started.await()
+ job.cancel("foo")
+ yieldUntil { exception != null }
+
+ // Assert
+ val result = kotlin.runCatching { job.await() }
+ assertTrue(result.isFailure)
+ assertTrue(result.exceptionOrNull() is CancellationException)
+ assertEquals("foo", result.exceptionOrNull()?.message)
+
+ assertTrue(exception is CancellationException)
+ assertEquals("foo", exception?.message)
+
+ // (Let test scope terminate)
+ parentScope.cancel("Test ended")
+ otherScope.cancel("Test ended")
+ }
+
+ @Test
+ fun testInvocationIsCancelledWhenParentScopeIsCancelled(): Unit = runBlockingWithTimeout {
+ // Prepare
+ val otherScope = createChildScope(isSupervisor = true)
+ val parentScope = createChildScope()
+
+ // Act
+ val started = CompletableDeferred<Unit>()
+ var exception: Throwable? = null
+ val job = parentScope.async {
+ runAlongOtherScope(otherScope) {
+ try {
+ started.complete(Unit)
+ delay(1_000_000)
+ } catch (t: Throwable) {
+ exception = t
+ }
+ }
+ }
+
+ started.await()
+ parentScope.cancel("foo")
+ yieldUntil { exception != null }
+
+ // Assert
+ val result = kotlin.runCatching { job.await() }
+ assertTrue(result.isFailure)
+ assertTrue(result.exceptionOrNull() is CancellationException)
+ assertEquals("foo", result.exceptionOrNull()?.message)
+
+ assertTrue(exception is CancellationException)
+ assertEquals("foo", exception?.message)
+
+ // (Let test scope terminate)
+ parentScope.cancel("Test ended")
+ otherScope.cancel("Test ended")
+ }
+}