adblib: Add `runAlongOtherScope` utility

It is sometimes useful to run a coroutine function
(i.e. a `suspend` function) that should be cancelled
from 2 possible sources: either the cancellation
can come from the the current scope (or parent),
or the cancellation can come with some other
unrelated scope with an unrelated lifetime.

In the context of `adblib`, this can be useful when
executing a long running coroutine, e.g. collecting
a `StateFlow`, for a long as a device is connected
or for as long as a JdwpProcess is active. By
using the `ConnectedDevice.scope` or
`JdwpProcess.scope` as the `otherScope`
parameter of `runAlongOtherScope`, one can
achieve the intended behavior.

Note that the using the
`outerScope.async { suspend_call() }.await`
pattern does not quite work: cancellation
of `outerScope` indeed ensures that
`suspend_call()` is cancelled, but cancellation
of the parent scope only cancel the `await()`
call, i.e. it does *not* cancel the `suspend_call()`
coroutine.

Test: Included
Bug: n/a
Change-Id: I3982def486b1f9712500001881df5d1e03fa7b32
diff --git a/adblib/src/com/android/adblib/utils/CoroutineUtils.kt b/adblib/src/com/android/adblib/utils/CoroutineUtils.kt
index 6691af2..2948e70 100644
--- a/adblib/src/com/android/adblib/utils/CoroutineUtils.kt
+++ b/adblib/src/com/android/adblib/utils/CoroutineUtils.kt
@@ -18,6 +18,7 @@
 import kotlinx.coroutines.CancellationException
 import kotlinx.coroutines.CoroutineScope
 import kotlinx.coroutines.CoroutineStart
+import kotlinx.coroutines.InternalCoroutinesApi
 import kotlinx.coroutines.Job
 import kotlinx.coroutines.SupervisorJob
 import kotlinx.coroutines.cancel
@@ -74,6 +75,49 @@
 }
 
 /**
+ * Runs [block] as a regular `suspend` function, except that it gets cancelled when [otherScope]
+ * is cancelled.
+ */
+suspend inline fun <R> runAlongOtherScope(
+    otherScope: CoroutineScope,
+    crossinline block: suspend () -> R
+): R {
+    // Attach a completion handler that cancels this coroutine when "otherScope" is cancelled
+    // The completion handler is removed as soon as the execution of `block` ends, so that
+    // we don't cancel the caller at some point later in the execution path.
+    val currentJob = currentCoroutineContext().job
+
+    // Note: As of April 2024, the `invokeOnCompletion` overload that take `onCancelling` parameter
+    // is marked as internal API. The other overload is fully public and supported, but uses the
+    // `onCancelling = false` semantics, which delays the completion handler invocation, which
+    // may lead to deadlocks if using a "single thread" Dispatcher, i.e. `block` may not be
+    // cancelled right when `outerScope` is cancelled, leading to `block` still executing and
+    // using a Dispatcher slot.
+    @OptIn(InternalCoroutinesApi::class)
+    val handler = otherScope.coroutineContext.job.invokeOnCompletion(onCancelling = true) { throwable ->
+        when (throwable) {
+            is CancellationException -> {
+                currentJob.cancel(throwable)
+            }
+
+            null -> {
+                /* Nothing to do */
+            }
+
+            else -> {
+                currentJob.cancel(CancellationException(throwable.message, throwable))
+            }
+        }
+    }
+
+    return try {
+        block()
+    } finally {
+        handler.dispose()
+    }
+}
+
+/**
  * Re-entrant version of [Mutex.lock]
  *
  * See [Phantom of the Coroutine](https://elizarov.medium.com/phantom-of-the-coroutine-afc63b03a131)
diff --git a/adblib/test/src/com/android/adblib/utils/RunAlongOtherScopeTest.kt b/adblib/test/src/com/android/adblib/utils/RunAlongOtherScopeTest.kt
new file mode 100644
index 0000000..985a7e4
--- /dev/null
+++ b/adblib/test/src/com/android/adblib/utils/RunAlongOtherScopeTest.kt
@@ -0,0 +1,241 @@
+/*
+ * Copyright (C) 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.android.adblib.utils
+
+import com.android.adblib.testingutils.CoroutineTestUtils.runBlockingWithTimeout
+import com.android.adblib.testingutils.CoroutineTestUtils.yieldUntil
+import kotlinx.coroutines.CancellationException
+import kotlinx.coroutines.CompletableDeferred
+import kotlinx.coroutines.async
+import kotlinx.coroutines.cancel
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import org.junit.Assert
+import org.junit.Assert.assertEquals
+import org.junit.Assert.assertTrue
+import org.junit.Rule
+import org.junit.Test
+import org.junit.rules.ExpectedException
+
+class RunAlongOtherScopeTest {
+
+    @JvmField
+    @Rule
+    var exceptionRule: ExpectedException = ExpectedException.none()
+
+    @Test
+    fun testSimpleInvocationWorks() = runBlockingWithTimeout {
+        // Prepare
+        val otherScope = createChildScope(isSupervisor = true)
+
+        // Act
+        val foo = runAlongOtherScope(otherScope) {
+            "foo"
+        }
+
+        // Assert
+        assertEquals("foo", foo)
+
+        // (Let test scope terminate)
+        otherScope.cancel("Test ended")
+    }
+
+    @Test
+    fun testSimpleSuspendingInvocationWorks() = runBlockingWithTimeout {
+        // Prepare
+        val otherScope = createChildScope(isSupervisor = true)
+
+        // Act
+        val foo = runAlongOtherScope(otherScope) {
+            delay(10)
+            "foo"
+        }
+        otherScope.cancel("End of test")
+
+        // Assert
+        assertEquals("foo", foo)
+    }
+
+    @Test
+    fun testInvocationIsTransparentToException() = runBlockingWithTimeout {
+        // Prepare
+        val otherScope = createChildScope(isSupervisor = true)
+
+        // Act
+        exceptionRule.expect(Exception::class.java)
+        exceptionRule.expectMessage("foo")
+        runAlongOtherScope(otherScope) {
+            throw Exception("foo")
+        }
+
+        // Assert
+        @Suppress("UNREACHABLE_CODE")
+        Assert.fail("Should not reach")
+    }
+
+    @Test
+    fun testInvocationIsTransparentToCancellation() = runBlockingWithTimeout {
+        // Prepare
+        val otherScope = createChildScope(isSupervisor = true)
+
+        // Act
+        exceptionRule.expect(CancellationException::class.java)
+        exceptionRule.expectMessage("foo")
+        runAlongOtherScope(otherScope) {
+            throw CancellationException("foo")
+        }
+
+        // Assert
+        @Suppress("UNREACHABLE_CODE")
+        Assert.fail("Should not reach")
+    }
+
+    @Test
+    fun testInvocationIsTransparentToCancel() = runBlockingWithTimeout {
+        // Prepare
+        val otherScope = createChildScope(isSupervisor = true)
+
+        // Act
+        exceptionRule.expect(CancellationException::class.java)
+        exceptionRule.expectMessage("foo")
+        coroutineScope {
+            runAlongOtherScope(otherScope) {
+                cancel("foo")
+            }
+        }
+
+        // Assert
+        Assert.fail("Should not reach")
+    }
+
+    @Test
+    fun testInvocationIsCancelledWhenOtherScopeIsCancelledBeforeInvocation() =
+        runBlockingWithTimeout {
+            // Prepare
+            val otherScope = createChildScope(isSupervisor = true)
+
+            // Act
+            exceptionRule.expect(CancellationException::class.java)
+            exceptionRule.expectMessage("foo")
+            otherScope.cancel("foo")
+            runAlongOtherScope(otherScope) {
+                delay(1_000_000)
+            }
+
+            // Assert
+            Assert.fail("Should not reach")
+        }
+
+    @Test
+    fun testInvocationIsCancelledWhenOtherScopeIsCancelledDuringInvocation(): Unit =
+        runBlockingWithTimeout {
+            // Prepare
+            val otherScope = createChildScope(isSupervisor = true)
+
+            // Act
+            val started = CompletableDeferred<Unit>()
+            launch {
+                started.await()
+                otherScope.cancel("foo")
+            }
+            exceptionRule.expect(CancellationException::class.java)
+            exceptionRule.expectMessage("foo")
+            runAlongOtherScope(otherScope) {
+                started.complete(Unit)
+                delay(1_000_000)
+            }
+
+            // Assert
+            Assert.fail("Should not reach")
+        }
+
+    @Test
+    fun testInvocationIsCancelledWhenJobIsCancelled(): Unit = runBlockingWithTimeout {
+        // Prepare
+        val otherScope = createChildScope(isSupervisor = true)
+        val parentScope = createChildScope()
+
+        // Act
+        val started = CompletableDeferred<Unit>()
+        var exception: Throwable? = null
+        val job = parentScope.async {
+            runAlongOtherScope(otherScope) {
+                try {
+                    started.complete(Unit)
+                    delay(1_000_000)
+                } catch (t: Throwable) {
+                    exception = t
+                }
+            }
+        }
+
+        started.await()
+        job.cancel("foo")
+        yieldUntil { exception != null }
+
+        // Assert
+        val result = kotlin.runCatching { job.await() }
+        assertTrue(result.isFailure)
+        assertTrue(result.exceptionOrNull() is CancellationException)
+        assertEquals("foo", result.exceptionOrNull()?.message)
+
+        assertTrue(exception is CancellationException)
+        assertEquals("foo", exception?.message)
+
+        // (Let test scope terminate)
+        parentScope.cancel("Test ended")
+        otherScope.cancel("Test ended")
+    }
+
+    @Test
+    fun testInvocationIsCancelledWhenParentScopeIsCancelled(): Unit = runBlockingWithTimeout {
+        // Prepare
+        val otherScope = createChildScope(isSupervisor = true)
+        val parentScope = createChildScope()
+
+        // Act
+        val started = CompletableDeferred<Unit>()
+        var exception: Throwable? = null
+        val job = parentScope.async {
+            runAlongOtherScope(otherScope) {
+                try {
+                    started.complete(Unit)
+                    delay(1_000_000)
+                } catch (t: Throwable) {
+                    exception = t
+                }
+            }
+        }
+
+        started.await()
+        parentScope.cancel("foo")
+        yieldUntil { exception != null }
+
+        // Assert
+        val result = kotlin.runCatching { job.await() }
+        assertTrue(result.isFailure)
+        assertTrue(result.exceptionOrNull() is CancellationException)
+        assertEquals("foo", result.exceptionOrNull()?.message)
+
+        assertTrue(exception is CancellationException)
+        assertEquals("foo", exception?.message)
+
+        // (Let test scope terminate)
+        parentScope.cancel("Test ended")
+        otherScope.cancel("Test ended")
+    }
+}