Change RequestCollapsingThrottler to Use a Channel

1000 runs:
https://fusion2.corp.google.com/invocations/22d36137-4c95-49bd-8761-3d3a67012b85

Bug: 328819365
Test: Enabled
Change-Id: I6e2311328272860578cd5236fb295cf1a1022f10
diff --git a/app-inspection/.idea/libraries/kotlin_test.xml b/app-inspection/.idea/libraries/kotlin_test.xml
index 365b751..c67c30e 100644
--- a/app-inspection/.idea/libraries/kotlin_test.xml
+++ b/app-inspection/.idea/libraries/kotlin_test.xml
@@ -2,10 +2,12 @@
   <library name="kotlin-test">
     <CLASSES>
       <root url="jar://$PROJECT_DIR$/../../../prebuilts/tools/common/m2/repository/org/jetbrains/kotlin/kotlin-test/1.9.20/kotlin-test-1.9.20.jar!/" />
+      <root url="jar://$PROJECT_DIR$/../../../prebuilts/tools/common/m2/repository/org/jetbrains/kotlinx/kotlinx-coroutines-test-jvm/1.6.4/kotlinx-coroutines-test-jvm-1.6.4.jar!/" />
     </CLASSES>
     <JAVADOC />
     <SOURCES>
       <root url="jar://$PROJECT_DIR$/../../../prebuilts/tools/common/m2/repository/org/jetbrains/kotlin/kotlin-test/1.9.20/kotlin-test-1.9.20-sources.jar!/" />
+      <root url="jar://$PROJECT_DIR$/../../../prebuilts/tools/common/m2/repository/org/jetbrains/kotlinx/kotlinx-coroutines-test-jvm/1.6.4/kotlinx-coroutines-test-jvm-1.6.4-sources.jar!/" />
     </SOURCES>
   </library>
-</component>
\ No newline at end of file
+</component>
diff --git a/app-inspection/.idea/libraries/kotlinx_coroutines.xml b/app-inspection/.idea/libraries/kotlinx_coroutines.xml
index e041d9f..8292caa 100644
--- a/app-inspection/.idea/libraries/kotlinx_coroutines.xml
+++ b/app-inspection/.idea/libraries/kotlinx_coroutines.xml
@@ -1,10 +1,11 @@
 <component name="libraryTable">
   <library name="kotlinx-coroutines">
     <CLASSES>
-      <root url="jar://$PROJECT_DIR$/../../../prebuilts/tools/common/m2/repository/org/jetbrains/kotlinx/kotlinx-coroutines-core/1.7.3/kotlinx-coroutines-core-1.7.3.jar!/" />
-      <root url="jar://$PROJECT_DIR$/../../../prebuilts/tools/common/m2/repository/org/jetbrains/kotlinx/kotlinx-coroutines-core-jvm/1.7.3/kotlinx-coroutines-core-jvm-1.7.3.jar!/" />
+      <root url="jar://$PROJECT_DIR$/../../../prebuilts/tools/common/m2/repository/org/jetbrains/kotlinx/kotlinx-coroutines-core-jvm/1.6.4/kotlinx-coroutines-core-jvm-1.6.4.jar!/" />
     </CLASSES>
     <JAVADOC />
-    <SOURCES />
+    <SOURCES>
+      <root url="jar://$PROJECT_DIR$/../../../prebuilts/tools/common/m2/repository/org/jetbrains/kotlinx/kotlinx-coroutines-core-jvm/1.6.4/kotlinx-coroutines-core-jvm-1.6.4-sources.jar!/" />
+    </SOURCES>
   </library>
-</component>
\ No newline at end of file
+</component>
diff --git a/app-inspection/inspectors/database/BUILD b/app-inspection/inspectors/database/BUILD
index e93a805..95d2d9f 100644
--- a/app-inspection/inspectors/database/BUILD
+++ b/app-inspection/inspectors/database/BUILD
@@ -18,6 +18,7 @@
     "//tools/base/app-inspection/inspectors/common:app-inspection.inspectors.common",
     "//tools/base/bazel:studio-proto",
     "@maven//:androidx.annotation.annotation",
+    "@maven//:org.jetbrains.kotlinx.kotlinx-coroutines-core-jvm",
 ]
 
 kotlin_library(
@@ -41,6 +42,7 @@
     bundle_srcs = [
         "//tools/base/app-inspection/inspectors/common:app-inspection.inspectors.common",
         "@maven//:org.jetbrains.kotlin.kotlin-stdlib",
+        "@maven//:org.jetbrains.kotlinx.kotlinx-coroutines-core-jvm",
     ],
     d8_flags = [
         "--min-api 26",  # Database inspector is only supported on O+ devices.
@@ -62,7 +64,8 @@
         "//tools/base/testutils:tools.testutils",
         "@maven//:com.google.truth.truth",
         "@maven//:junit.junit",
-        "@maven//:org.jetbrains.kotlinx.kotlinx-coroutines-core",
+        "@maven//:org.jetbrains.kotlinx.kotlinx-coroutines-core-jvm",
+        "@maven//:org.jetbrains.kotlinx.kotlinx-coroutines-test-jvm",
         "@maven//:org.robolectric.robolectric",
     ],
 )
diff --git a/app-inspection/inspectors/database/src/com/android/tools/appinspection/database/RequestCollapsingThrottler.kt b/app-inspection/inspectors/database/src/com/android/tools/appinspection/database/RequestCollapsingThrottler.kt
index 9044f4f..305cd58 100644
--- a/app-inspection/inspectors/database/src/com/android/tools/appinspection/database/RequestCollapsingThrottler.kt
+++ b/app-inspection/inspectors/database/src/com/android/tools/appinspection/database/RequestCollapsingThrottler.kt
@@ -15,9 +15,11 @@
  */
 package com.android.tools.appinspection.database
 
-import androidx.annotation.GuardedBy
-
-private const val NEVER: Long = -1
+import kotlin.coroutines.CoroutineContext
+import kotlin.time.Duration
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.channels.consumeEach
 
 /**
  * Throttler implementation ensuring that events are run not more frequently that specified
@@ -25,61 +27,30 @@
  * executed).
  *
  * Thread safe.
- *
- * TODO(aalbert): This can probably be eliminated by using coroutines with a channel/flow
  */
 internal class RequestCollapsingThrottler(
-  private val minIntervalMs: Long,
+  private val minInterval: Duration,
   private val action: Runnable,
-  private val executor: DeferredExecutor,
+  coroutineContext: CoroutineContext,
 ) {
-  private val lock = Any()
+  private val channel = Channel<Unit>(Channel.CONFLATED)
+  private val supervisor = SupervisorJob()
 
-  @GuardedBy("lock") private var pendingDispatch = false
-
-  @GuardedBy("lock") private var lastSubmitted = NEVER
+  init {
+    CoroutineScope(coroutineContext + supervisor).launch {
+      channel.consumeEach {
+        action.run()
+        delay(minInterval)
+      }
+    }
+  }
 
   fun submitRequest() {
-    synchronized(lock) {
-      if (pendingDispatch) {
-        return
-      }
-      pendingDispatch = true // about to schedule
-    }
-    val delayMs = minIntervalMs - sinceLast() // delayMs < 0 is OK
-    scheduleDispatch(delayMs)
+    channel.trySend(Unit)
   }
 
-  // TODO: switch to ListenableFuture to react on failures
-  private fun scheduleDispatch(delayMs: Long) {
-    executor.schedule(
-      {
-        try {
-          action.run()
-        } finally {
-          synchronized(lock) {
-            lastSubmitted = now()
-            pendingDispatch = false
-          }
-        }
-      },
-      delayMs,
-    )
+  fun dispose() {
+    channel.close()
+    supervisor.cancel()
   }
-
-  private fun sinceLast(): Long {
-    synchronized(lock) {
-      val lastSubmitted: Long = lastSubmitted
-      return if (lastSubmitted == NEVER) (minIntervalMs + 1) // more than minIntervalMs
-      else (now() - lastSubmitted)
-    }
-  }
-
-  internal fun interface DeferredExecutor {
-    fun schedule(command: Runnable, delayMs: Long)
-  }
-}
-
-private fun now(): Long {
-  return System.currentTimeMillis()
 }
diff --git a/app-inspection/inspectors/database/src/com/android/tools/appinspection/database/SqliteInspector.kt b/app-inspection/inspectors/database/src/com/android/tools/appinspection/database/SqliteInspector.kt
index 3af6205..a2b1000 100644
--- a/app-inspection/inspectors/database/src/com/android/tools/appinspection/database/SqliteInspector.kt
+++ b/app-inspection/inspectors/database/src/com/android/tools/appinspection/database/SqliteInspector.kt
@@ -70,7 +70,6 @@
 import androidx.sqlite.inspection.SqliteInspectorProtocol.Table
 import androidx.sqlite.inspection.SqliteInspectorProtocol.TrackDatabasesCommand
 import androidx.sqlite.inspection.SqliteInspectorProtocol.TrackDatabasesResponse
-import com.android.tools.appinspection.database.RequestCollapsingThrottler.DeferredExecutor
 import com.android.tools.appinspection.database.SqliteInspectionExecutors.submit
 import com.android.tools.idea.protobuf.ByteString
 import java.io.File
@@ -80,6 +79,9 @@
 import java.util.WeakHashMap
 import java.util.concurrent.Executor
 import java.util.concurrent.Future
+import kotlin.coroutines.CoroutineContext
+import kotlin.time.Duration.Companion.seconds
+import kotlinx.coroutines.asCoroutineDispatcher
 
 private const val OPEN_DATABASE_COMMAND_SIG_API_11 =
   "openDatabase" +
@@ -112,7 +114,7 @@
 private val SQLITE_STATEMENT_EXECUTE_METHODS_SIGNATURES: List<String> =
   mutableListOf("execute()V", "executeInsert()J", "executeUpdateDelete()I")
 
-private const val INVALIDATION_MIN_INTERVAL_MS = 1000
+private val INVALIDATION_MIN_INTERVAL = 1.seconds
 
 // Note: this only works on API26+ because of pragma_* functions
 // TODO: replace with a resource file
@@ -161,6 +163,8 @@
 internal class SqliteInspector(
   connection: Connection,
   private val environment: InspectorEnvironment,
+  throttlerCoroutineContext: CoroutineContext =
+    environment.executors().io().asCoroutineDispatcher(),
   testMode: Boolean = false,
 ) : Inspector(connection) {
   @VisibleForTesting
@@ -179,6 +183,13 @@
       SqlDelight2Invalidation.create(environment.artTooling()),
     )
 
+  private val throttler =
+    RequestCollapsingThrottler(
+      INVALIDATION_MIN_INTERVAL,
+      ::dispatchDatabasePossiblyChangedEvent,
+      throttlerCoroutineContext,
+    )
+
   override fun onReceiveCommand(data: ByteArray, callback: CommandCallback) {
     try {
       val command = Command.parseFrom(data)
@@ -217,6 +228,7 @@
     super.onDispose()
     databaseRegistry.dispose()
     databaseLockRegistry.dispose()
+    throttler.dispose()
   }
 
   private fun handleTrackDatabases(command: TrackDatabasesCommand, callback: CommandCallback) {
@@ -405,24 +417,9 @@
   }
 
   private fun registerInvalidationHooks(hookRegistry: EntryExitMatchingHookRegistry) {
-    /*
-     * Schedules a task using {@link mScheduledExecutor} and executes it on {@link mIOExecutor}.
-     */
-    val deferredExecutor = DeferredExecutor { command, delayMs ->
-
-      // TODO: handle errors from Future
-      environment.executors().handler().postDelayed({ ioExecutor.execute(command) }, delayMs)
-    }
-    val throttler =
-      RequestCollapsingThrottler(
-        INVALIDATION_MIN_INTERVAL_MS.toLong(),
-        { dispatchDatabasePossiblyChangedEvent() },
-        deferredExecutor,
-      )
-
-    registerInvalidationHooksSqliteStatement(throttler)
-    registerInvalidationHooksTransaction(throttler)
-    registerInvalidationHooksSQLiteCursor(throttler, hookRegistry)
+    registerInvalidationHooksSqliteStatement()
+    registerInvalidationHooksTransaction()
+    registerInvalidationHooksSQLiteCursor(hookRegistry)
   }
 
   /**
@@ -432,7 +429,7 @@
    * TODO: track if transaction committed or rolled back by observing if
    *   [ ][SQLiteDatabase.setTransactionSuccessful] was called
    */
-  private fun registerInvalidationHooksTransaction(throttler: RequestCollapsingThrottler) {
+  private fun registerInvalidationHooksTransaction() {
     environment.artTooling().registerExitHook<Any>(
       SQLiteDatabase::class.java,
       "endTransaction()V",
@@ -448,7 +445,7 @@
    * * [SQLiteStatement.executeInsert]
    * * [SQLiteStatement.executeUpdateDelete]
    */
-  private fun registerInvalidationHooksSqliteStatement(throttler: RequestCollapsingThrottler) {
+  private fun registerInvalidationHooksSqliteStatement() {
     for (method in SQLITE_STATEMENT_EXECUTE_METHODS_SIGNATURES) {
       environment.artTooling().registerExitHook<Any>(SQLiteStatement::class.java, method) { result
         ->
@@ -465,10 +462,7 @@
    * In order to access cursor's query, we also use [SQLiteDatabase.rawQueryWithFactory] which takes
    * a query String and constructs a cursor based on it.
    */
-  private fun registerInvalidationHooksSQLiteCursor(
-    throttler: RequestCollapsingThrottler,
-    hookRegistry: EntryExitMatchingHookRegistry,
-  ) {
+  private fun registerInvalidationHooksSQLiteCursor(hookRegistry: EntryExitMatchingHookRegistry) {
     // TODO: add active pruning via Cursor#close listener
 
     val trackedCursors = Collections.synchronizedMap(WeakHashMap<SQLiteCursor, Void?>())
diff --git a/app-inspection/inspectors/database/testSrc/com/android/tools/appinspection/database/CancellationQueryTest.kt b/app-inspection/inspectors/database/testSrc/com/android/tools/appinspection/database/CancellationQueryTest.kt
index 894a3f2..8f30dbc 100644
--- a/app-inspection/inspectors/database/testSrc/com/android/tools/appinspection/database/CancellationQueryTest.kt
+++ b/app-inspection/inspectors/database/testSrc/com/android/tools/appinspection/database/CancellationQueryTest.kt
@@ -29,6 +29,7 @@
 import com.google.common.truth.Truth.assertThat
 import java.util.concurrent.ExecutorService
 import java.util.concurrent.Executors.newCachedThreadPool
+import kotlin.coroutines.EmptyCoroutineContext
 import kotlinx.coroutines.Dispatchers
 import kotlinx.coroutines.cancelAndJoin
 import kotlinx.coroutines.channels.Channel
@@ -54,7 +55,8 @@
 @SQLiteMode(SQLiteMode.Mode.NATIVE)
 class CancellationQueryTest {
   private val countingExecutorService = CountingDelegatingExecutorService(newCachedThreadPool())
-  private val environment = SqliteInspectorTestEnvironment(countingExecutorService)
+  private val environment =
+    SqliteInspectorTestEnvironment(countingExecutorService, EmptyCoroutineContext)
   private val temporaryFolder = TemporaryFolder()
   private val closeablesRule = CloseablesRule()
 
diff --git a/app-inspection/inspectors/database/testSrc/com/android/tools/appinspection/database/InvalidationTest.kt b/app-inspection/inspectors/database/testSrc/com/android/tools/appinspection/database/InvalidationTest.kt
index 08e4376..f3ba397 100644
--- a/app-inspection/inspectors/database/testSrc/com/android/tools/appinspection/database/InvalidationTest.kt
+++ b/app-inspection/inspectors/database/testSrc/com/android/tools/appinspection/database/InvalidationTest.kt
@@ -25,6 +25,7 @@
 import android.os.Build
 import androidx.inspection.ArtTooling
 import androidx.sqlite.inspection.SqliteInspectorProtocol
+import androidx.sqlite.inspection.SqliteInspectorProtocol.Event
 import androidx.sqlite.inspection.SqliteInspectorProtocol.Event.OneOfCase.DATABASE_POSSIBLY_CHANGED
 import com.android.testutils.CloseablesRule
 import com.android.tools.appinspection.common.testing.LogPrinterRule
@@ -38,8 +39,10 @@
 import com.android.tools.appinspection.database.testing.asExitHook
 import com.android.tools.appinspection.database.testing.createInstance
 import com.google.common.truth.Truth.assertThat
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.delay
 import kotlinx.coroutines.runBlocking
-import org.junit.Ignore
+import kotlinx.coroutines.test.runTest
 import org.junit.Rule
 import org.junit.Test
 import org.junit.rules.RuleChain
@@ -47,7 +50,6 @@
 import org.junit.runner.RunWith
 import org.robolectric.RobolectricTestRunner
 import org.robolectric.RuntimeEnvironment
-import org.robolectric.Shadows.shadowOf
 import org.robolectric.annotation.Config
 import org.robolectric.annotation.SQLiteMode
 import org.robolectric.junit.rules.CloseGuardRule
@@ -72,8 +74,6 @@
       .around(temporaryFolder)
       .around(LogPrinterRule())
 
-  private val shadowLooper = shadowOf(testEnvironment.getLooper())
-
   @Test
   fun test_execute_hook_methods() =
     test_simple_hook_methods("execute()V", SQLiteStatement::class.java)
@@ -101,7 +101,6 @@
 
       testEnvironment.assertNoQueuedEvents()
       hook.first().asExitHook.onExit(null)
-      shadowLooper.runToNextTask()
       testEnvironment.receiveEvent().let { event ->
         assertThat(event.oneOfCase).isEqualTo(DATABASE_POSSIBLY_CHANGED)
         assertThat(event.databasePossiblyChanged)
@@ -111,9 +110,15 @@
     }
   }
 
+  // runTest is out of experimental status, but we are still using and old coroutine-test artifact
+  @OptIn(ExperimentalCoroutinesApi::class)
   @Test
-  @Ignore("b/328819365") // Test was marked as flaky and is also ignored in the original repo
-  fun test_throttling(): Unit = runBlocking {
+  fun test_throttling() = runTest {
+    val testEnvironment =
+      closeablesRule.register(
+        SqliteInspectorTestEnvironment(ioCoroutineContextOverride = coroutineContext)
+      )
+    val events = mutableListOf<Event>()
     // Starting to track databases makes the inspector register hooks
     testEnvironment.sendCommand(MessageFactory.createTrackDatabasesCommand())
 
@@ -128,25 +133,31 @@
 
     // First invalidation triggering event
     hook.onExit(null)
-    shadowLooper.runToEndOfTasks()
-    val event1 = testEnvironment.receiveEvent()
+    events.add(testEnvironment.receiveEvent())
 
     // Shortly followed by many invalidation-triggering events
-    repeat(50) { hook.onExit(null) }
-    shadowLooper.runToEndOfTasks()
-    val event2 = testEnvironment.receiveEvent()
+    repeat(50) {
+      delay(100)
+      hook.onExit(null)
+    }
+    // 50 events with 100ms delay takes 5 seconds. Throttler delay is 1 seconds so we expect 6
+    // events.
+    repeat(6) { events.add(testEnvironment.receiveEvent()) }
 
     // Event validation
-    listOf(event1, event2).forEach { assertThat(it.oneOfCase).isEqualTo(DATABASE_POSSIBLY_CHANGED) }
+    events.forEach { assertThat(it.oneOfCase).isEqualTo(DATABASE_POSSIBLY_CHANGED) }
 
-    // Only two invalidation events received
-    shadowLooper.runToEndOfTasks()
     testEnvironment.assertNoQueuedEvents()
   }
 
+  // runTest is out of experimental status, but we are still using and old coroutine-test artifact
+  @OptIn(ExperimentalCoroutinesApi::class)
   @Test
-  @Ignore("b/328819365") // Test was marked as flaky and is also ignored in the original repo
-  fun test_cursor_methods(): Unit = runBlocking {
+  fun test_cursor_methods(): Unit = runTest {
+    val testEnvironment =
+      closeablesRule.register(
+        SqliteInspectorTestEnvironment(ioCoroutineContextOverride = coroutineContext)
+      )
     // Starting to track databases makes the inspector register hooks
     testEnvironment.sendCommand(MessageFactory.createTrackDatabasesCommand())
 
@@ -188,7 +199,6 @@
         hooks.entryHookFor(closeMethodSignature).onEntry(cursor, emptyList())
 
         if (shouldCauseInvalidation) {
-          shadowLooper.runToEndOfTasks()
           testEnvironment.receiveEvent()
         }
         testEnvironment.assertNoQueuedEvents()
@@ -197,30 +207,8 @@
 
     // no crash for unknown cursor class
     hooks.entryHookFor(rawQueryMethodSignature).onEntry(null, listOf(null, "select * from t1"))
-    hooks
-      .exitHookFor(rawQueryMethodSignature)
-      .onExit(
-        object : AbstractCursor() {
-          override fun getLong(column: Int): Long = 0
-
-          override fun getCount(): Int = 0
-
-          override fun getColumnNames(): Array<String> = emptyArray()
-
-          override fun getShort(column: Int): Short = 0
-
-          override fun getFloat(column: Int): Float = 0f
-
-          override fun getDouble(column: Int): Double = 0.0
-
-          override fun isNull(column: Int): Boolean = false
-
-          override fun getInt(column: Int): Int = 0
-
-          override fun getString(column: Int): String = ""
-        }
-      )
-
+    val unsupportedCursor = closeablesRule.register(UnsupportedCursorType())
+    hooks.exitHookFor(rawQueryMethodSignature).onExit(unsupportedCursor)
     Unit
   }
 
@@ -228,7 +216,7 @@
     val db =
       Database("ignored", Table("t1", Column("c1", "int")))
         .createInstance(closeablesRule, temporaryFolder)
-    val cursor = db.rawQuery(query, null)
+    val cursor = closeablesRule.register(db.rawQuery(query, null))
     val context = RuntimeEnvironment.getApplication()
     context.deleteDatabase(db.path)
     return cursor as SQLiteCursor
@@ -241,4 +229,24 @@
   private fun List<Hook>.exitHookFor(m: String): ArtTooling.ExitHook<Any> =
     this.first { it.originMethod == m && it is Hook.ExitHook }.asExitHook
       as ArtTooling.ExitHook<Any>
+
+  private class UnsupportedCursorType : AbstractCursor() {
+    override fun getLong(column: Int): Long = 0
+
+    override fun getCount(): Int = 0
+
+    override fun getColumnNames(): Array<String> = emptyArray()
+
+    override fun getShort(column: Int): Short = 0
+
+    override fun getFloat(column: Int): Float = 0f
+
+    override fun getDouble(column: Int): Double = 0.0
+
+    override fun isNull(column: Int): Boolean = false
+
+    override fun getInt(column: Int): Int = 0
+
+    override fun getString(column: Int): String = ""
+  }
 }
diff --git a/app-inspection/inspectors/database/testSrc/com/android/tools/appinspection/database/testing/SqliteInspectorTestEnvironment.kt b/app-inspection/inspectors/database/testSrc/com/android/tools/appinspection/database/testing/SqliteInspectorTestEnvironment.kt
index 211cde1..d46e96a 100644
--- a/app-inspection/inspectors/database/testSrc/com/android/tools/appinspection/database/testing/SqliteInspectorTestEnvironment.kt
+++ b/app-inspection/inspectors/database/testSrc/com/android/tools/appinspection/database/testing/SqliteInspectorTestEnvironment.kt
@@ -22,15 +22,21 @@
 import androidx.inspection.testing.DefaultTestInspectorEnvironment
 import androidx.inspection.testing.InspectorTester
 import androidx.inspection.testing.TestInspectorExecutors
-import androidx.sqlite.inspection.SqliteInspectorProtocol.*
+import androidx.sqlite.inspection.SqliteInspectorProtocol.Command
+import androidx.sqlite.inspection.SqliteInspectorProtocol.DatabaseOpenedEvent
+import androidx.sqlite.inspection.SqliteInspectorProtocol.Event
+import androidx.sqlite.inspection.SqliteInspectorProtocol.QueryResponse
+import androidx.sqlite.inspection.SqliteInspectorProtocol.Response
 import androidx.test.platform.app.InstrumentationRegistry
 import com.google.common.truth.Truth.assertThat
 import java.util.concurrent.ExecutorService
 import java.util.concurrent.Executors
 import java.util.concurrent.TimeUnit.SECONDS
+import kotlin.coroutines.CoroutineContext
 import kotlin.test.fail
 import kotlinx.coroutines.ExperimentalCoroutinesApi
 import kotlinx.coroutines.Job
+import kotlinx.coroutines.asCoroutineDispatcher
 import kotlinx.coroutines.cancelAndJoin
 import kotlinx.coroutines.runBlocking
 import org.junit.rules.ExternalResource
@@ -38,13 +44,14 @@
 internal const val SQLITE_INSPECTOR_ID = "androidx.sqlite.inspection"
 
 class SqliteInspectorTestEnvironment(
-  private val ioExecutorOverride: ExecutorService = Executors.newFixedThreadPool(4)
-) : ExternalResource() {
+  private val ioExecutorOverride: ExecutorService = Executors.newFixedThreadPool(4),
+  ioCoroutineContextOverride: CoroutineContext = ioExecutorOverride.asCoroutineDispatcher(),
+) : ExternalResource(), AutoCloseable {
   private val artTooling = FakeArtTooling()
   private val job = Job()
   private val inspectorEnvironment =
     DefaultTestInspectorEnvironment(TestInspectorExecutors(job, ioExecutorOverride), artTooling)
-  private val inspectorFactory = TestInspectorFactory()
+  private val inspectorFactory = TestInspectorFactory(ioCoroutineContextOverride)
   private val inspectorTester: InspectorTester = runBlocking {
     InspectorTester(SQLITE_INSPECTOR_ID, inspectorEnvironment, inspectorFactory)
   }
@@ -60,7 +67,9 @@
     assertThat(ioExecutorOverride.awaitTermination(5, SECONDS)).isTrue()
   }
 
-  fun getLooper() = inspectorEnvironment.executors().handler().looper
+  override fun close() {
+    after()
+  }
 
   @OptIn(ExperimentalCoroutinesApi::class)
   fun assertNoQueuedEvents() {
diff --git a/app-inspection/inspectors/database/testSrc/com/android/tools/appinspection/database/testing/TestInspectorFactory.kt b/app-inspection/inspectors/database/testSrc/com/android/tools/appinspection/database/testing/TestInspectorFactory.kt
index e394f9c..9381a7e 100644
--- a/app-inspection/inspectors/database/testSrc/com/android/tools/appinspection/database/testing/TestInspectorFactory.kt
+++ b/app-inspection/inspectors/database/testSrc/com/android/tools/appinspection/database/testing/TestInspectorFactory.kt
@@ -20,10 +20,12 @@
 import androidx.inspection.InspectorEnvironment
 import androidx.inspection.InspectorFactory
 import com.android.tools.appinspection.database.SqliteInspector
+import kotlin.coroutines.CoroutineContext
 
 /** An [InspectorFactory] that gives access to the created inspector. */
-internal class TestInspectorFactory : InspectorFactory<SqliteInspector>(SQLITE_INSPECTOR_ID) {
+internal class TestInspectorFactory(private val ioCoroutineContextOverride: CoroutineContext) :
+  InspectorFactory<SqliteInspector>(SQLITE_INSPECTOR_ID) {
 
   override fun createInspector(connection: Connection, environment: InspectorEnvironment) =
-    SqliteInspector(connection, environment, testMode = true)
+    SqliteInspector(connection, environment, ioCoroutineContextOverride, testMode = true)
 }