Merge "Bump the version of the gradle lint checks library" into androidx-main
diff --git a/camera/camera-camera2-pipe-integration/src/main/java/androidx/camera/camera2/pipe/integration/adapter/CameraControlAdapter.kt b/camera/camera-camera2-pipe-integration/src/main/java/androidx/camera/camera2/pipe/integration/adapter/CameraControlAdapter.kt
index b6e55c4..eace746 100644
--- a/camera/camera-camera2-pipe-integration/src/main/java/androidx/camera/camera2/pipe/integration/adapter/CameraControlAdapter.kt
+++ b/camera/camera-camera2-pipe-integration/src/main/java/androidx/camera/camera2/pipe/integration/adapter/CameraControlAdapter.kt
@@ -30,7 +30,6 @@
 import androidx.camera.camera2.pipe.integration.impl.StillCaptureRequestControl
 import androidx.camera.camera2.pipe.integration.impl.TorchControl
 import androidx.camera.camera2.pipe.integration.impl.UseCaseCamera
-import androidx.camera.camera2.pipe.integration.impl.UseCaseThreads
 import androidx.camera.camera2.pipe.integration.impl.ZoomControl
 import androidx.camera.camera2.pipe.integration.interop.Camera2CameraControl
 import androidx.camera.camera2.pipe.integration.interop.CaptureRequestOptions
@@ -49,8 +48,8 @@
 import androidx.camera.core.impl.utils.futures.Futures
 import com.google.common.util.concurrent.ListenableFuture
 import javax.inject.Inject
+import kotlinx.coroutines.CompletableDeferred
 import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.async
 
 /**
  * Adapt the [CameraControlInternal] interface to [CameraPipe].
@@ -71,7 +70,6 @@
     private val focusMeteringControl: FocusMeteringControl,
     private val stillCaptureRequestControl: StillCaptureRequestControl,
     private val torchControl: TorchControl,
-    private val threads: UseCaseThreads,
     private val zoomControl: ZoomControl,
     private val zslControl: ZslControl,
     public val camera2cameraControl: Camera2CameraControl,
@@ -112,11 +110,10 @@
 
     override fun cancelFocusAndMetering(): ListenableFuture<Void> {
         return Futures.nonCancellationPropagating(
-            threads.sequentialScope
-                .async {
-                    focusMeteringControl.cancelFocusAndMeteringAsync().join()
+            CompletableDeferred<Void?>()
+                .also {
                     // Convert to null once the task is done, ignore the results.
-                    return@async null
+                    focusMeteringControl.cancelFocusAndMeteringAsync().propagateTo(it) { null }
                 }
                 .asListenableFuture()
         )
diff --git a/camera/camera-camera2-pipe-integration/src/main/java/androidx/camera/camera2/pipe/integration/adapter/CoroutineAdapters.kt b/camera/camera-camera2-pipe-integration/src/main/java/androidx/camera/camera2/pipe/integration/adapter/CoroutineAdapters.kt
index 76d7012..5353105 100644
--- a/camera/camera-camera2-pipe-integration/src/main/java/androidx/camera/camera2/pipe/integration/adapter/CoroutineAdapters.kt
+++ b/camera/camera-camera2-pipe-integration/src/main/java/androidx/camera/camera2/pipe/integration/adapter/CoroutineAdapters.kt
@@ -74,21 +74,48 @@
     return CallbackToFutureAdapter.getFuture(resolver)
 }
 
+/**
+ * Propagates the result of this to `destination` parameter when this deferred is completed.
+ *
+ * Cancelling the destination is no-op returned from this function does not cancel the `Deferred`
+ * returned by `block`.
+ */
 public fun <T> Deferred<T>.propagateTo(destination: CompletableDeferred<T>) {
-    invokeOnCompletion { propagateOnceTo(destination, it) }
+    invokeOnCompletion { propagateCompletion(destination, it) }
 }
 
-@OptIn(ExperimentalCoroutinesApi::class)
-public fun <T> Deferred<T>.propagateOnceTo(
-    destination: CompletableDeferred<T>,
-    throwable: Throwable?,
+/**
+ * Propagates the result of this to `destination` parameter when this deferred is completed.
+ *
+ * Cancelling the destination is no-op returned from this function does not cancel the `Deferred`
+ * returned by `block`.
+ *
+ * @param destination The destination [CompletableDeferred] to which result is propagated to.
+ * @param transform Transformation function to convert the result during propagation.
+ */
+public fun <T, R> Deferred<T>.propagateTo(
+    destination: CompletableDeferred<R>,
+    transform: (T) -> R,
 ) {
-    if (throwable != null) {
-        if (throwable is CancellationException) {
-            destination.cancel(throwable)
-        } else {
-            destination.completeExceptionally(throwable)
-        }
+    invokeOnCompletion { propagateCompletion(destination, it, transform) }
+}
+
+/**
+ * Propagates the result of this to `destination` parameter immediately.
+ *
+ * This function assumes that [Deferred.invokeOnCompletion] has already been invoked.
+ *
+ * @param destination The destination `Deferred` to which result is propagated to.
+ * @param completionCause The `Throwable` cause of completion that was passed in
+ *   `Deferred.invokeOnCompletion`.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+public fun <T> Deferred<T>.propagateCompletion(
+    destination: CompletableDeferred<T>,
+    completionCause: Throwable?,
+) {
+    if (completionCause != null) {
+        destination.completeFailing(completionCause)
     } else {
         // Ignore exceptions - This should never throw in this situation.
         destination.complete(getCompleted())
@@ -96,6 +123,46 @@
 }
 
 /**
+ * Propagates the result of this to `destination` parameter immediately.
+ *
+ * This function assumes that [Deferred.invokeOnCompletion] has already been invoked.
+ *
+ * @param destination The destination `Deferred` to which result is propagated to.
+ * @param completionCause The `Throwable` cause of completion that was passed in
+ *   `Deferred.invokeOnCompletion`.
+ * @param transform Transformation function to convert the result during propagation.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+public fun <T, R> Deferred<T>.propagateCompletion(
+    destination: CompletableDeferred<R>,
+    completionCause: Throwable?,
+    transform: (T) -> R,
+) {
+    if (completionCause != null) {
+        destination.completeFailing(completionCause)
+    } else {
+        // Ignore exceptions - This should never throw in this situation.
+        destination.complete(transform(getCompleted()))
+    }
+}
+
+/**
+ * Completes this `Deferred` as failure based on the provided `cause`.
+ *
+ * @param cause If it's an instance of [CancellationException], [Deferred.cancel] is invoked for
+ *   this, otherwise, [CompletableDeferred.completeExceptionally] is invoked.
+ */
+public fun <T> CompletableDeferred<T>.completeFailing(
+    cause: Throwable,
+) {
+    if (cause is CancellationException) {
+        cancel(cause)
+    } else {
+        completeExceptionally(cause)
+    }
+}
+
+/**
  * Waits for [Deferred.await] to be completed until the given timeout.
  *
  * @return true if `Deferred.await` had completed, false otherwise.
diff --git a/camera/camera-camera2-pipe-integration/src/main/java/androidx/camera/camera2/pipe/integration/impl/MeteringRepeating.kt b/camera/camera-camera2-pipe-integration/src/main/java/androidx/camera/camera2/pipe/integration/impl/MeteringRepeating.kt
index 82da040..2080cb8 100644
--- a/camera/camera-camera2-pipe-integration/src/main/java/androidx/camera/camera2/pipe/integration/impl/MeteringRepeating.kt
+++ b/camera/camera-camera2-pipe-integration/src/main/java/androidx/camera/camera2/pipe/integration/impl/MeteringRepeating.kt
@@ -45,6 +45,7 @@
 import androidx.camera.core.impl.UseCaseConfigFactory
 import androidx.camera.core.impl.UseCaseConfigFactory.CaptureType
 import androidx.camera.core.impl.utils.executor.CameraXExecutors
+import androidx.camera.core.internal.TargetConfig.OPTION_TARGET_NAME
 import kotlin.math.min
 
 private val DEFAULT_PREVIEW_SIZE = Size(0, 0)
@@ -203,6 +204,7 @@
                     OPTION_SESSION_CONFIG_UNPACKER,
                     CameraUseCaseAdapter.DefaultSessionOptionsUnpacker
                 )
+                insertOption(OPTION_TARGET_NAME, "MeteringRepeating")
                 insertOption(OPTION_CAPTURE_TYPE, CaptureType.METERING_REPEATING)
             }
 
diff --git a/camera/camera-camera2-pipe-integration/src/main/java/androidx/camera/camera2/pipe/integration/impl/StillCaptureRequestControl.kt b/camera/camera-camera2-pipe-integration/src/main/java/androidx/camera/camera2/pipe/integration/impl/StillCaptureRequestControl.kt
index 7ffef5d..94f03ea 100644
--- a/camera/camera-camera2-pipe-integration/src/main/java/androidx/camera/camera2/pipe/integration/impl/StillCaptureRequestControl.kt
+++ b/camera/camera-camera2-pipe-integration/src/main/java/androidx/camera/camera2/pipe/integration/impl/StillCaptureRequestControl.kt
@@ -19,7 +19,7 @@
 import androidx.annotation.GuardedBy
 import androidx.camera.camera2.pipe.core.Log.debug
 import androidx.camera.camera2.pipe.integration.adapter.asListenableFuture
-import androidx.camera.camera2.pipe.integration.adapter.propagateOnceTo
+import androidx.camera.camera2.pipe.integration.adapter.propagateCompletion
 import androidx.camera.camera2.pipe.integration.config.CameraScope
 import androidx.camera.core.ImageCapture
 import androidx.camera.core.ImageCaptureException
@@ -197,7 +197,7 @@
                     }
                 }
             } else {
-                propagateOnceTo(submittedRequest.result, cause)
+                propagateCompletion(submittedRequest.result, cause)
             }
         }
     }
diff --git a/camera/camera-camera2-pipe-integration/src/main/java/androidx/camera/camera2/pipe/integration/impl/UseCaseManager.kt b/camera/camera-camera2-pipe-integration/src/main/java/androidx/camera/camera2/pipe/integration/impl/UseCaseManager.kt
index 54adf25..cedc7c4 100644
--- a/camera/camera-camera2-pipe-integration/src/main/java/androidx/camera/camera2/pipe/integration/impl/UseCaseManager.kt
+++ b/camera/camera-camera2-pipe-integration/src/main/java/androidx/camera/camera2/pipe/integration/impl/UseCaseManager.kt
@@ -17,6 +17,7 @@
 package androidx.camera.camera2.pipe.integration.impl
 
 import android.content.Context
+import android.graphics.ImageFormat
 import android.hardware.camera2.CameraCharacteristics
 import android.hardware.camera2.CameraDevice.TEMPLATE_PREVIEW
 import android.hardware.camera2.CaptureRequest
@@ -55,23 +56,30 @@
 import androidx.camera.camera2.pipe.integration.config.UseCaseCameraComponent
 import androidx.camera.camera2.pipe.integration.config.UseCaseCameraConfig
 import androidx.camera.camera2.pipe.integration.config.UseCaseGraphConfig
+import androidx.camera.camera2.pipe.integration.internal.DynamicRangeResolver
 import androidx.camera.camera2.pipe.integration.interop.Camera2CameraControl
 import androidx.camera.camera2.pipe.integration.interop.ExperimentalCamera2Interop
 import androidx.camera.core.DynamicRange
+import androidx.camera.core.ImageCapture
 import androidx.camera.core.MirrorMode
+import androidx.camera.core.Preview
 import androidx.camera.core.UseCase
+import androidx.camera.core.impl.AttachedSurfaceInfo
 import androidx.camera.core.impl.CameraControlInternal
 import androidx.camera.core.impl.CameraInfoInternal
 import androidx.camera.core.impl.CameraInternal
 import androidx.camera.core.impl.CameraMode
 import androidx.camera.core.impl.CaptureConfig
 import androidx.camera.core.impl.DeferrableSurface
-import androidx.camera.core.impl.PreviewConfig
+import androidx.camera.core.impl.MutableOptionsBundle
 import androidx.camera.core.impl.SessionConfig
 import androidx.camera.core.impl.SessionConfig.OutputConfig.SURFACE_GROUP_ID_NONE
 import androidx.camera.core.impl.SessionConfig.ValidatingBuilder
 import androidx.camera.core.impl.SessionProcessor
+import androidx.camera.core.impl.SurfaceConfig
 import androidx.camera.core.impl.stabilization.StabilizationMode
+import androidx.camera.core.streamsharing.StreamSharing
+import androidx.camera.core.streamsharing.StreamSharingConfig
 import javax.inject.Inject
 import javax.inject.Provider
 import kotlinx.coroutines.Deferred
@@ -171,6 +179,8 @@
         )
     }
 
+    private val dynamicRangeResolver = DynamicRangeResolver(cameraProperties.metadata)
+
     @Volatile private var _activeComponent: UseCaseCameraComponent? = null
     public val camera: UseCaseCamera?
         get() = _activeComponent?.getUseCaseCamera()
@@ -602,7 +612,7 @@
             return activeSurfaces > 0 &&
                 with(attachedUseCases.withoutMetering()) {
                     (onlyVideoCapture() || requireMeteringRepeating()) &&
-                        supportMeteringCombination()
+                        isMeteringCombinationSupported()
                 }
         }
         return false
@@ -624,7 +634,7 @@
             return activeSurfaces == 0 ||
                 with(attachedUseCases.withoutMetering()) {
                     !(onlyVideoCapture() || requireMeteringRepeating()) ||
-                        !supportMeteringCombination()
+                        !isMeteringCombinationSupported()
                 }
         }
         return false
@@ -664,46 +674,133 @@
             }
     }
 
-    private fun Collection<UseCase>.supportMeteringCombination(): Boolean {
-        val useCases = this.toMutableList().apply { add(meteringRepeating) }
+    private fun Collection<UseCase>.isMeteringCombinationSupported(): Boolean {
         if (meteringRepeating.attachedSurfaceResolution == null) {
             meteringRepeating.setupSession()
         }
-        return isCombinationSupported(useCases).also {
-            Log.debug { "Combination of $useCases is supported: $it" }
+
+        val attachedSurfaceInfoList = getAttachedSurfaceInfoList()
+
+        if (attachedSurfaceInfoList.isEmpty()) {
+            return false
         }
+
+        val sessionSurfacesConfigs = getSessionSurfacesConfigs()
+
+        return supportedSurfaceCombination
+            .checkSupported(
+                SupportedSurfaceCombination.FeatureSettings(
+                    CameraMode.DEFAULT,
+                    getRequiredMaxBitDepth(attachedSurfaceInfoList),
+                    isPreviewStabilizationOn(),
+                    isUltraHdrOn()
+                ),
+                mutableListOf<SurfaceConfig>().apply {
+                    addAll(sessionSurfacesConfigs)
+                    add(createMeteringRepeatingSurfaceConfig())
+                }
+            )
+            .also {
+                Log.debug {
+                    "Combination of $sessionSurfacesConfigs + $meteringRepeating is supported: $it"
+                }
+            }
     }
 
-    private fun isCombinationSupported(currentUseCases: Collection<UseCase>): Boolean {
-        val surfaceConfigs =
-            currentUseCases.map { useCase ->
-                // TODO: Test with correct Camera Mode when concurrent mode / ultra high resolution
-                // is
-                //  implemented.
-                supportedSurfaceCombination.transformSurfaceConfig(
-                    CameraMode.DEFAULT,
-                    useCase.imageFormat,
-                    useCase.attachedSurfaceResolution!!
+    private fun getRequiredMaxBitDepth(attachedSurfaceInfoList: List<AttachedSurfaceInfo>): Int {
+        if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) {
+            dynamicRangeResolver
+                .resolveAndValidateDynamicRanges(
+                    attachedSurfaceInfoList,
+                    listOf(meteringRepeating.currentConfig),
+                    listOf(0)
+                )
+                .forEach { (_, u) ->
+                    if (u.bitDepth == DynamicRange.BIT_DEPTH_10_BIT) {
+                        return DynamicRange.BIT_DEPTH_10_BIT
+                    }
+                }
+        }
+
+        return DynamicRange.BIT_DEPTH_8_BIT
+    }
+
+    private fun Collection<UseCase>.getAttachedSurfaceInfoList(): List<AttachedSurfaceInfo> =
+        mutableListOf<AttachedSurfaceInfo>().apply {
+            this@getAttachedSurfaceInfoList.forEach { useCase ->
+                val surfaceResolution = useCase.attachedSurfaceResolution
+                val streamSpec = useCase.attachedStreamSpec
+
+                // When collecting the info, the UseCases might be unbound to make these info
+                // become null.
+                if (surfaceResolution == null || streamSpec == null) {
+                    Log.warn { "Invalid surface resolution or stream spec is found." }
+                    clear()
+                    return@apply
+                }
+
+                val surfaceConfig =
+                    supportedSurfaceCombination.transformSurfaceConfig(
+                        // TODO: Test with correct Camera Mode when concurrent mode / ultra high
+                        // resolution is implemented.
+                        CameraMode.DEFAULT,
+                        useCase.currentConfig.inputFormat,
+                        surfaceResolution
+                    )
+                add(
+                    AttachedSurfaceInfo.create(
+                        surfaceConfig,
+                        useCase.currentConfig.inputFormat,
+                        surfaceResolution,
+                        streamSpec.dynamicRange,
+                        useCase.getCaptureTypes(),
+                        streamSpec.implementationOptions ?: MutableOptionsBundle.create(),
+                        useCase.currentConfig.getTargetFrameRate(null)
+                    )
                 )
             }
+        }
 
-        var isPreviewStabilizationOn = false
-        for (useCase in currentUseCases) {
-            if (useCase.currentConfig is PreviewConfig) {
-                isPreviewStabilizationOn =
-                    useCase.currentConfig.previewStabilizationMode == StabilizationMode.ON
+    private fun UseCase.getCaptureTypes() =
+        if (this is StreamSharing) {
+            (currentConfig as StreamSharingConfig).captureTypes
+        } else {
+            listOf(currentConfig.captureType)
+        }
+
+    private fun Collection<UseCase>.isPreviewStabilizationOn() =
+        filterIsInstance<Preview>().firstOrNull()?.currentConfig?.previewStabilizationMode ==
+            StabilizationMode.ON
+
+    private fun Collection<UseCase>.isUltraHdrOn() =
+        filterIsInstance<ImageCapture>().firstOrNull()?.currentConfig?.inputFormat ==
+            ImageFormat.JPEG_R
+
+    private fun Collection<UseCase>.getSessionSurfacesConfigs(): List<SurfaceConfig> =
+        mutableListOf<SurfaceConfig>().apply {
+            this@getSessionSurfacesConfigs.forEach { useCase ->
+                useCase.sessionConfig.surfaces.forEach { deferrableSurface ->
+                    add(
+                        supportedSurfaceCombination.transformSurfaceConfig(
+                            // TODO: Test with correct Camera Mode when concurrent mode / ultra high
+                            // resolution is implemented.
+                            CameraMode.DEFAULT,
+                            useCase.currentConfig.inputFormat,
+                            deferrableSurface.prescribedSize
+                        )
+                    )
+                }
             }
         }
 
-        return supportedSurfaceCombination.checkSupported(
-            SupportedSurfaceCombination.FeatureSettings(
-                CameraMode.DEFAULT,
-                DynamicRange.BIT_DEPTH_8_BIT,
-                isPreviewStabilizationOn
-            ),
-            surfaceConfigs
+    private fun createMeteringRepeatingSurfaceConfig() =
+        supportedSurfaceCombination.transformSurfaceConfig(
+            // TODO: Test with correct Camera Mode when concurrent mode / ultra high resolution is
+            // implemented.
+            CameraMode.DEFAULT,
+            meteringRepeating.imageFormat,
+            meteringRepeating.attachedSurfaceResolution!!
         )
-    }
 
     private fun Collection<UseCase>.surfaceCount(): Int =
         ValidatingBuilder().let { validatingBuilder ->
diff --git a/camera/camera-camera2-pipe-integration/src/test/java/androidx/camera/camera2/pipe/integration/adapter/CoroutineAdapterTest.kt b/camera/camera-camera2-pipe-integration/src/test/java/androidx/camera/camera2/pipe/integration/adapter/CoroutineAdapterTest.kt
index 9b27a11..28b47b6 100644
--- a/camera/camera-camera2-pipe-integration/src/test/java/androidx/camera/camera2/pipe/integration/adapter/CoroutineAdapterTest.kt
+++ b/camera/camera-camera2-pipe-integration/src/test/java/androidx/camera/camera2/pipe/integration/adapter/CoroutineAdapterTest.kt
@@ -46,6 +46,23 @@
     }
 
     @Test
+    fun propagateTransformedCompleteResult(): Unit = runBlocking {
+        // Arrange.
+        val resultValue = 123
+        val resultValueTransformed = resultValue.toString()
+
+        val sourceDeferred = CompletableDeferred<Int>()
+        val resultDeferred = CompletableDeferred<String>()
+        sourceDeferred.propagateTo(resultDeferred) { res -> res.toString() }
+
+        // Act.
+        sourceDeferred.complete(resultValue)
+
+        // Assert.
+        assertThat(resultDeferred.await()).isEqualTo(resultValueTransformed)
+    }
+
+    @Test
     fun propagateCancelResult() {
         // Arrange.
         val sourceDeferred = CompletableDeferred<Unit>()
@@ -59,6 +76,20 @@
         assertThat(resultDeferred.isCancelled).isTrue()
     }
 
+    @Test
+    fun propagateCancelResult_whenTransformFunctionIsUsed() {
+        // Arrange.
+        val sourceDeferred = CompletableDeferred<Unit>()
+        val resultDeferred = CompletableDeferred<Unit>()
+        sourceDeferred.propagateTo(resultDeferred) { res -> res.toString() }
+
+        // Act.
+        sourceDeferred.cancel()
+
+        // Assert.
+        assertThat(resultDeferred.isCancelled).isTrue()
+    }
+
     @OptIn(ExperimentalCoroutinesApi::class)
     @Test
     fun propagateExceptionResult() {
@@ -74,4 +105,20 @@
         // Assert.
         assertThat(resultDeferred.getCompletionExceptionOrNull()).isSameInstanceAs(testThrowable)
     }
+
+    @OptIn(ExperimentalCoroutinesApi::class)
+    @Test
+    fun propagateExceptionResult_whenTransformFunctionIsUsed() {
+        // Arrange.
+        val sourceDeferred = CompletableDeferred<Unit>()
+        val resultDeferred = CompletableDeferred<Unit>()
+        sourceDeferred.propagateTo(resultDeferred) { res -> res.toString() }
+        val testThrowable = Throwable()
+
+        // Act.
+        sourceDeferred.completeExceptionally(testThrowable)
+
+        // Assert.
+        assertThat(resultDeferred.getCompletionExceptionOrNull()).isSameInstanceAs(testThrowable)
+    }
 }
diff --git a/camera/camera-camera2-pipe-integration/src/test/java/androidx/camera/camera2/pipe/integration/impl/UseCaseManagerTest.kt b/camera/camera-camera2-pipe-integration/src/test/java/androidx/camera/camera2/pipe/integration/impl/UseCaseManagerTest.kt
index 37d015b..89e2eb6 100644
--- a/camera/camera-camera2-pipe-integration/src/test/java/androidx/camera/camera2/pipe/integration/impl/UseCaseManagerTest.kt
+++ b/camera/camera-camera2-pipe-integration/src/test/java/androidx/camera/camera2/pipe/integration/impl/UseCaseManagerTest.kt
@@ -228,6 +228,67 @@
     }
 
     @Test
+    fun meteringRepeatingEnabled_whenPreviewEnabledWithNoSurfaceProvider() = runTest {
+        // Arrange
+        initializeUseCaseThreads(this)
+        val useCaseManager = createUseCaseManager()
+        val preview = createPreview(/* withSurfaceProvider= */ false)
+        val imageCapture = createImageCapture()
+        useCaseManager.attach(listOf(preview, imageCapture))
+
+        // Act
+        useCaseManager.activate(preview)
+        useCaseManager.activate(imageCapture)
+
+        // Assert
+        val enabledUseCaseClasses =
+            useCaseManager.getRunningUseCasesForTest().map { it::class.java }
+        assertThat(enabledUseCaseClasses)
+            .containsExactly(
+                Preview::class.java,
+                ImageCapture::class.java,
+                MeteringRepeating::class.java
+            )
+    }
+
+    @Test
+    fun meteringRepeatingNotEnabled_whenImageAnalysisAndPreviewWithNoSurfaceProvider() = runTest {
+        // Arrange
+        initializeUseCaseThreads(this)
+        val useCaseManager = createUseCaseManager()
+        val preview = createPreview(/* withSurfaceProvider= */ false)
+        val imageAnalysis =
+            ImageAnalysis.Builder().build().apply {
+                setAnalyzer(useCaseThreads.backgroundExecutor) { image -> image.close() }
+            }
+        useCaseManager.attach(listOf(preview, imageAnalysis))
+
+        // Act
+        useCaseManager.activate(preview)
+        useCaseManager.activate(imageAnalysis)
+
+        // Assert
+        val enabledUseCases = useCaseManager.getRunningUseCasesForTest()
+        assertThat(enabledUseCases).containsExactly(preview, imageAnalysis)
+    }
+
+    @Test
+    fun meteringRepeatingNotEnabled_whenOnlyPreviewWithNoSurfaceProvider() = runTest {
+        // Arrange
+        initializeUseCaseThreads(this)
+        val useCaseManager = createUseCaseManager()
+        val preview = createPreview(/* withSurfaceProvider= */ false)
+        useCaseManager.attach(listOf(preview))
+
+        // Act
+        useCaseManager.activate(preview)
+
+        // Assert
+        val enabledUseCases = useCaseManager.getRunningUseCasesForTest()
+        assertThat(enabledUseCases).containsExactly(preview)
+    }
+
+    @Test
     fun meteringRepeatingEnabled_whenOnlyImageCaptureEnabled() = runTest {
         // Arrange
         initializeUseCaseThreads(this)
@@ -736,16 +797,18 @@
                 useCaseList.add(it)
             }
 
-    private fun createPreview(): Preview =
+    private fun createPreview(withSurfaceProvider: Boolean = true): Preview =
         Preview.Builder()
             .setCaptureOptionUnpacker(CameraUseCaseAdapter.DefaultCaptureOptionsUnpacker.INSTANCE)
             .setSessionOptionUnpacker(CameraUseCaseAdapter.DefaultSessionOptionsUnpacker)
             .build()
             .apply {
-                setSurfaceProvider(
-                    CameraXExecutors.mainThreadExecutor(),
-                    SurfaceTextureProvider.createSurfaceTextureProvider()
-                )
+                if (withSurfaceProvider) {
+                    setSurfaceProvider(
+                        CameraXExecutors.mainThreadExecutor(),
+                        SurfaceTextureProvider.createSurfaceTextureProvider()
+                    )
+                }
             }
             .also {
                 it.simulateActivation()
diff --git a/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/CameraBackend.kt b/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/CameraBackend.kt
index 9ed683c..5d5ee33 100644
--- a/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/CameraBackend.kt
+++ b/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/CameraBackend.kt
@@ -40,7 +40,7 @@
         }
 
         public class CameraAvailable(public val cameraId: CameraId) : CameraStatus() {
-            override fun toString(): String = "CameraAvailable(camera=$cameraId"
+            override fun toString(): String = "CameraAvailable(camera=$cameraId)"
         }
     }
 }
diff --git a/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/compat/Camera2CameraController.kt b/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/compat/Camera2CameraController.kt
index 16a26fd..efe9f68 100644
--- a/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/compat/Camera2CameraController.kt
+++ b/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/compat/Camera2CameraController.kt
@@ -176,7 +176,7 @@
                 ControllerState.ERROR ->
                     if (
                         cameraStatus is CameraStatus.CameraAvailable &&
-                            lastCameraError == CameraError.ERROR_CAMERA_DEVICE
+                            lastCameraError != CameraError.ERROR_GRAPH_CONFIG
                     ) {
                         shouldRestart = true
                     }
diff --git a/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/compat/Exceptions.kt b/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/compat/Exceptions.kt
index 8039abf..f7532e5 100644
--- a/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/compat/Exceptions.kt
+++ b/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/compat/Exceptions.kt
@@ -55,9 +55,19 @@
     } catch (e: Exception) {
         Log.warn { "Unexpected error: " + e.message }
         when (e) {
+            is CameraAccessException -> {
+                cameraErrorListener.onCameraError(
+                    cameraId,
+                    CameraError.from(e),
+                    // CameraAccessException indicates the task failed because the camera is
+                    // unavailable, such as when the camera is in use or disconnected. Such errors
+                    // can be recovered when the camera becomes available.
+                    willAttemptRetry = true,
+                )
+                return null
+            }
             is IllegalArgumentException,
             is IllegalStateException,
-            is CameraAccessException,
             is SecurityException,
             is UnsupportedOperationException,
             is NullPointerException -> {
diff --git a/camera/integration-tests/coretestapp/src/androidTest/java/androidx/camera/integration/core/ResolutionSelectorDeviceTest.kt b/camera/integration-tests/coretestapp/src/androidTest/java/androidx/camera/integration/core/ResolutionSelectorDeviceTest.kt
new file mode 100644
index 0000000..9e9562c
--- /dev/null
+++ b/camera/integration-tests/coretestapp/src/androidTest/java/androidx/camera/integration/core/ResolutionSelectorDeviceTest.kt
@@ -0,0 +1,692 @@
+/*
+ * Copyright 2024 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.camera.integration.core
+
+import android.Manifest
+import android.content.Context
+import android.graphics.ImageFormat
+import android.graphics.Point
+import android.graphics.SurfaceTexture
+import android.hardware.camera2.CameraCharacteristics
+import android.hardware.camera2.CameraCharacteristics.CONTROL_AE_AVAILABLE_TARGET_FPS_RANGES
+import android.util.Log
+import android.util.Range
+import android.util.Rational
+import android.util.Size
+import androidx.camera.camera2.Camera2Config
+import androidx.camera.camera2.internal.DisplayInfoManager
+import androidx.camera.camera2.pipe.integration.CameraPipeConfig
+import androidx.camera.core.AspectRatio.RATIO_16_9
+import androidx.camera.core.AspectRatio.RATIO_4_3
+import androidx.camera.core.Camera
+import androidx.camera.core.CameraSelector
+import androidx.camera.core.CameraXConfig
+import androidx.camera.core.ImageAnalysis
+import androidx.camera.core.ImageCapture
+import androidx.camera.core.Preview
+import androidx.camera.core.UseCase
+import androidx.camera.core.impl.CameraInfoInternal
+import androidx.camera.core.impl.ImageFormatConstants
+import androidx.camera.core.impl.ImageOutputConfig
+import androidx.camera.core.impl.Quirk
+import androidx.camera.core.impl.RestrictedCameraControl
+import androidx.camera.core.impl.utils.AspectRatioUtil
+import androidx.camera.core.impl.utils.AspectRatioUtil.ASPECT_RATIO_16_9
+import androidx.camera.core.impl.utils.AspectRatioUtil.ASPECT_RATIO_4_3
+import androidx.camera.core.impl.utils.AspectRatioUtil.hasMatchingAspectRatio
+import androidx.camera.core.impl.utils.CompareSizesByArea
+import androidx.camera.core.internal.utils.SizeUtil
+import androidx.camera.core.internal.utils.SizeUtil.RESOLUTION_1080P
+import androidx.camera.core.resolutionselector.AspectRatioStrategy
+import androidx.camera.core.resolutionselector.AspectRatioStrategy.FALLBACK_RULE_AUTO
+import androidx.camera.core.resolutionselector.ResolutionFilter
+import androidx.camera.core.resolutionselector.ResolutionSelector
+import androidx.camera.core.resolutionselector.ResolutionSelector.PREFER_CAPTURE_RATE_OVER_HIGHER_RESOLUTION
+import androidx.camera.core.resolutionselector.ResolutionSelector.PREFER_HIGHER_RESOLUTION_OVER_CAPTURE_RATE
+import androidx.camera.core.resolutionselector.ResolutionStrategy
+import androidx.camera.core.resolutionselector.ResolutionStrategy.FALLBACK_RULE_CLOSEST_HIGHER_THEN_LOWER
+import androidx.camera.lifecycle.ProcessCameraProvider
+import androidx.camera.testing.impl.CameraPipeConfigTestRule
+import androidx.camera.testing.impl.CameraUtil
+import androidx.camera.testing.impl.fakes.FakeLifecycleOwner
+import androidx.test.core.app.ApplicationProvider
+import androidx.test.filters.LargeTest
+import androidx.test.filters.SdkSuppress
+import androidx.test.platform.app.InstrumentationRegistry
+import androidx.test.rule.GrantPermissionRule
+import androidx.testutils.fail
+import com.google.common.truth.Truth.assertThat
+import java.util.concurrent.TimeUnit
+import org.junit.After
+import org.junit.Assume.assumeFalse
+import org.junit.Assume.assumeTrue
+import org.junit.Before
+import org.junit.Rule
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+/**
+ * ResolutionSelector related test on the real device.
+ *
+ * Make the ResolutionSelectorDeviceTest focus on the generic ResolutionSelector selection results
+ * for all the normal devices. Skips the tests when the devices have any of the quirks that might
+ * affect the selected resolution.
+ */
+@LargeTest
+@RunWith(Parameterized::class)
+@SdkSuppress(minSdkVersion = 21)
+class ResolutionSelectorDeviceTest(
+    private val implName: String,
+    private var cameraSelector: CameraSelector,
+    private val cameraConfig: CameraXConfig,
+) {
+    @get:Rule
+    val cameraPipeConfigTestRule =
+        CameraPipeConfigTestRule(
+            active = implName.contains(CameraPipeConfig::class.simpleName!!),
+        )
+
+    @get:Rule
+    val cameraRule =
+        CameraUtil.grantCameraPermissionAndPreTestAndPostTest(
+            CameraUtil.PreTestCameraIdList(cameraConfig)
+        )
+
+    @get:Rule
+    val permissionRule: GrantPermissionRule =
+        GrantPermissionRule.grant(Manifest.permission.RECORD_AUDIO)
+
+    private val useCaseFormatMap =
+        mapOf(
+            Pair(Preview::class.java, ImageFormatConstants.INTERNAL_DEFINED_IMAGE_FORMAT_PRIVATE),
+            Pair(ImageCapture::class.java, ImageFormat.JPEG),
+            Pair(ImageAnalysis::class.java, ImageFormat.YUV_420_888)
+        )
+
+    companion object {
+        @JvmStatic
+        @Parameterized.Parameters(name = "{0}")
+        fun data() =
+            listOf(
+                arrayOf(
+                    "back+" + Camera2Config::class.simpleName,
+                    CameraSelector.DEFAULT_BACK_CAMERA,
+                    Camera2Config.defaultConfig(),
+                ),
+                arrayOf(
+                    "front+" + Camera2Config::class.simpleName,
+                    CameraSelector.DEFAULT_FRONT_CAMERA,
+                    Camera2Config.defaultConfig(),
+                ),
+                arrayOf(
+                    "back+" + CameraPipeConfig::class.simpleName,
+                    CameraSelector.DEFAULT_BACK_CAMERA,
+                    CameraPipeConfig.defaultConfig(),
+                ),
+                arrayOf(
+                    "front+" + CameraPipeConfig::class.simpleName,
+                    CameraSelector.DEFAULT_FRONT_CAMERA,
+                    CameraPipeConfig.defaultConfig(),
+                ),
+            )
+    }
+
+    private val instrumentation = InstrumentationRegistry.getInstrumentation()
+    private val context: Context = ApplicationProvider.getApplicationContext()
+    private lateinit var cameraProvider: ProcessCameraProvider
+    private lateinit var lifecycleOwner: FakeLifecycleOwner
+    private lateinit var camera: Camera
+    private lateinit var cameraInfoInternal: CameraInfoInternal
+
+    @Before
+    fun initializeCameraX() {
+        assumeTrue(CameraUtil.hasCameraWithLensFacing(cameraSelector.lensFacing!!))
+        ProcessCameraProvider.configureInstance(cameraConfig)
+        cameraProvider = ProcessCameraProvider.getInstance(context)[10, TimeUnit.SECONDS]
+
+        instrumentation.runOnMainSync {
+            lifecycleOwner = FakeLifecycleOwner()
+            lifecycleOwner.startAndResume()
+
+            camera = cameraProvider.bindToLifecycle(lifecycleOwner, cameraSelector)
+            cameraInfoInternal = camera.cameraInfo as CameraInfoInternal
+        }
+
+        assumeNotAspectRatioQuirkDevice()
+        assumeNotOutputSizeQuirkDevice()
+    }
+
+    @After
+    fun shutdownCameraX() {
+        if (::cameraProvider.isInitialized) {
+            cameraProvider.shutdownAsync()[10, TimeUnit.SECONDS]
+        }
+    }
+
+    @Test
+    fun canSelect4x3ResolutionForPreviewImageCaptureAndImageAnalysis() {
+        canSelectTargetAspectRatioResolutionForPreviewImageCaptureAndImageAnalysis(RATIO_4_3)
+    }
+
+    @Test
+    fun canSelect16x9ResolutionForPreviewImageCaptureAndImageAnalysis() {
+        canSelectTargetAspectRatioResolutionForPreviewImageCaptureAndImageAnalysis(RATIO_16_9)
+    }
+
+    private fun canSelectTargetAspectRatioResolutionForPreviewImageCaptureAndImageAnalysis(
+        targetAspectRatio: Int
+    ) {
+        val preview = createUseCaseWithResolutionSelector(Preview::class.java, targetAspectRatio)
+        val imageCapture =
+            createUseCaseWithResolutionSelector(ImageCapture::class.java, targetAspectRatio)
+        val imageAnalysis =
+            createUseCaseWithResolutionSelector(ImageAnalysis::class.java, targetAspectRatio)
+        instrumentation.runOnMainSync {
+            cameraProvider.bindToLifecycle(
+                lifecycleOwner,
+                cameraSelector,
+                preview,
+                imageCapture,
+                imageAnalysis
+            )
+        }
+        assertThat(isResolutionAspectRatioBestMatched(preview, targetAspectRatio)).isTrue()
+        assertThat(isResolutionAspectRatioBestMatched(imageCapture, targetAspectRatio)).isTrue()
+        assertThat(isResolutionAspectRatioBestMatched(imageAnalysis, targetAspectRatio)).isTrue()
+    }
+
+    private fun isResolutionAspectRatioBestMatched(
+        useCase: UseCase,
+        targetAspectRatio: Int
+    ): Boolean {
+        val isMatched =
+            hasMatchingAspectRatio(
+                useCase.attachedSurfaceResolution!!,
+                aspectRatioToRational(targetAspectRatio)
+            )
+
+        if (isMatched) {
+            return true
+        }
+
+        // PRIV/PREVIEW + YUV/PREVIEW + JPEG/MAXIMUM will be used to select resolutions for the
+        // combination of Preview + ImageAnalysis + ImageCapture
+        val closestAspectRatioSizes =
+            if (useCase is Preview || useCase is ImageAnalysis) {
+                getClosestAspectRatioSizesUnderPreviewSize(targetAspectRatio, useCase.javaClass)
+            } else {
+                getClosestAspectRatioSizes(targetAspectRatio, useCase.javaClass)
+            }
+
+        Log.d(
+            "ResolutionSelectorDeviceTest",
+            "The selected resolution (${useCase.attachedSurfaceResolution!!}) does not exactly" +
+                " match the target aspect ratio. It is selected from the closest aspect ratio" +
+                " sizes: $closestAspectRatioSizes"
+        )
+
+        return closestAspectRatioSizes.contains(useCase.attachedSurfaceResolution!!)
+    }
+
+    @Test
+    fun canSelect4x3ResolutionForPreviewByResolutionStrategy() =
+        canSelectResolutionByResolutionStrategy(Preview::class.java, RATIO_4_3)
+
+    @Test
+    fun canSelect16x9ResolutionForPreviewByResolutionStrategy() =
+        canSelectResolutionByResolutionStrategy(Preview::class.java, RATIO_16_9)
+
+    @Test
+    fun canSelect4x3ResolutionForImageCaptureByResolutionStrategy() =
+        canSelectResolutionByResolutionStrategy(ImageCapture::class.java, RATIO_4_3)
+
+    @Test
+    fun canSelect16x9ResolutionForImageCaptureByResolutionStrategy() =
+        canSelectResolutionByResolutionStrategy(ImageCapture::class.java, RATIO_16_9)
+
+    @Test
+    fun canSelect4x3ResolutionForImageAnalysisByResolutionStrategy() =
+        canSelectResolutionByResolutionStrategy(ImageAnalysis::class.java, RATIO_4_3)
+
+    @Test
+    fun canSelect16x9ResolutionForImageAnalysisByResolutionStrategy() =
+        canSelectResolutionByResolutionStrategy(ImageAnalysis::class.java, RATIO_16_9)
+
+    private fun <T : UseCase> canSelectResolutionByResolutionStrategy(
+        useCaseClass: Class<T>,
+        ratio: Int
+    ) {
+        // Filters the output sizes matching the target aspect ratio
+        cameraInfoInternal
+            .getSupportedResolutions(useCaseFormatMap[useCaseClass]!!)
+            .filter { size -> hasMatchingAspectRatio(size, aspectRatioToRational(ratio)) }
+            .let {
+                // Picks the item in the middle of the list to run the test
+                it.elementAtOrNull(it.size / 2)?.let { boundSize ->
+                    {
+                        val useCase =
+                            createUseCaseWithResolutionSelector(
+                                useCaseClass,
+                                aspectRatio = ratio,
+                                aspectRatioStrategyFallbackRule = FALLBACK_RULE_AUTO,
+                                boundSize = boundSize,
+                                resolutionStrategyFallbackRule =
+                                    FALLBACK_RULE_CLOSEST_HIGHER_THEN_LOWER
+                            )
+                        instrumentation.runOnMainSync {
+                            cameraProvider.unbindAll()
+                            cameraProvider.bindToLifecycle(lifecycleOwner, cameraSelector, useCase)
+                        }
+                        assertThat(useCase.attachedSurfaceResolution).isEqualTo(boundSize)
+                    }
+                }
+            }
+    }
+
+    @Test
+    fun canSelectAnyResolutionForPreviewByResolutionFilter() =
+        canSelectAnyResolutionByResolutionFilter(
+            Preview::class.java,
+            // For Preview, need to override resolution strategy so that the output sizes larger
+            // than PREVIEW size can be selected.
+            cameraInfoInternal
+                .getSupportedResolutions(useCaseFormatMap[Preview::class.java]!!)
+                .maxWithOrNull(CompareSizesByArea())
+        )
+
+    @Test
+    fun canSelectAnyHighResolutionForPreviewByResolutionFilter() =
+        canSelectAnyHighResolutionByResolutionFilter(
+            Preview::class.java,
+            // For Preview, need to override resolution strategy so that the output sizes larger
+            // than PREVIEW size can be selected.
+            cameraInfoInternal
+                .getSupportedHighResolutions(useCaseFormatMap[Preview::class.java]!!)
+                .maxWithOrNull(CompareSizesByArea())
+        )
+
+    @Test
+    fun canSelectAnyResolutionForImageCaptureByResolutionFilter() =
+        canSelectAnyResolutionByResolutionFilter(ImageCapture::class.java)
+
+    @Test
+    fun canSelectAnyHighResolutionForImageCaptureByResolutionFilter() =
+        canSelectAnyHighResolutionByResolutionFilter(ImageCapture::class.java)
+
+    @Test
+    fun canSelectAnyResolutionForImageAnalysisByResolutionFilter() =
+        canSelectAnyResolutionByResolutionFilter(ImageAnalysis::class.java)
+
+    @Test
+    fun canSelectAnyHighResolutionForImageAnalysisByResolutionFilter() =
+        canSelectAnyHighResolutionByResolutionFilter(ImageAnalysis::class.java)
+
+    private fun <T : UseCase> canSelectAnyResolutionByResolutionFilter(
+        useCaseClass: Class<T>,
+        boundSize: Size? = null,
+        resolutionStrategyFallbackRule: Int = FALLBACK_RULE_CLOSEST_HIGHER_THEN_LOWER
+    ) =
+        canSelectAnyResolutionByResolutionFilter(
+            useCaseClass,
+            cameraInfoInternal.getSupportedResolutions(useCaseFormatMap[useCaseClass]!!),
+            boundSize,
+            resolutionStrategyFallbackRule
+        )
+
+    private fun <T : UseCase> canSelectAnyHighResolutionByResolutionFilter(
+        useCaseClass: Class<T>,
+        boundSize: Size? = null,
+        resolutionStrategyFallbackRule: Int = FALLBACK_RULE_CLOSEST_HIGHER_THEN_LOWER
+    ) =
+        canSelectAnyResolutionByResolutionFilter(
+            useCaseClass,
+            cameraInfoInternal.getSupportedHighResolutions(useCaseFormatMap[useCaseClass]!!),
+            boundSize,
+            resolutionStrategyFallbackRule,
+            PREFER_HIGHER_RESOLUTION_OVER_CAPTURE_RATE
+        )
+
+    private fun <T : UseCase> canSelectAnyResolutionByResolutionFilter(
+        useCaseClass: Class<T>,
+        outputSizes: List<Size>,
+        boundSize: Size? = null,
+        resolutionStrategyFallbackRule: Int = FALLBACK_RULE_CLOSEST_HIGHER_THEN_LOWER,
+        allowedResolutionMode: Int = PREFER_CAPTURE_RATE_OVER_HIGHER_RESOLUTION
+    ) {
+        outputSizes.forEach { targetResolution ->
+            val useCase =
+                createUseCaseWithResolutionSelector(
+                    useCaseClass,
+                    boundSize = boundSize,
+                    resolutionStrategyFallbackRule = resolutionStrategyFallbackRule,
+                    resolutionFilter = { _, _ -> mutableListOf(targetResolution) },
+                    allowedResolutionMode = allowedResolutionMode
+                )
+            instrumentation.runOnMainSync {
+                cameraProvider.unbindAll()
+                cameraProvider.bindToLifecycle(lifecycleOwner, cameraSelector, useCase)
+            }
+            assertThat(useCase.attachedSurfaceResolution).isEqualTo(targetResolution)
+        }
+    }
+
+    @Test
+    fun canSelectResolutionForSixtyFpsPreview() {
+        assumeTrue(isSixtyFpsSupported())
+
+        val preview = Preview.Builder().setTargetFrameRate(Range.create(60, 60)).build()
+        val imageCapture = ImageCapture.Builder().build()
+
+        instrumentation.runOnMainSync {
+            cameraProvider.bindToLifecycle(lifecycleOwner, cameraSelector, preview, imageCapture)
+        }
+
+        assertThat(getMaxFrameRate(preview.attachedSurfaceResolution!!)).isEqualTo(60)
+    }
+
+    private fun isSixtyFpsSupported() =
+        CameraUtil.getCameraCharacteristics(cameraSelector.lensFacing!!)
+            ?.get(CONTROL_AE_AVAILABLE_TARGET_FPS_RANGES)
+            ?.any { range -> range.upper == 60 } ?: false
+
+    private fun getMaxFrameRate(size: Size) =
+        (1_000_000_000.0 /
+                CameraUtil.getCameraCharacteristics(cameraSelector.lensFacing!!)!!.get(
+                        CameraCharacteristics.SCALER_STREAM_CONFIGURATION_MAP
+                    )!!
+                    .getOutputMinFrameDuration(SurfaceTexture::class.java, size) + 0.5)
+            .toInt()
+
+    private fun <T : UseCase> createUseCaseWithResolutionSelector(
+        useCaseClass: Class<T>,
+        aspectRatio: Int? = null,
+        aspectRatioStrategyFallbackRule: Int = FALLBACK_RULE_AUTO,
+        boundSize: Size? = null,
+        resolutionStrategyFallbackRule: Int = FALLBACK_RULE_CLOSEST_HIGHER_THEN_LOWER,
+        resolutionFilter: ResolutionFilter? = null,
+        allowedResolutionMode: Int = PREFER_CAPTURE_RATE_OVER_HIGHER_RESOLUTION
+    ): UseCase {
+        val builder =
+            when (useCaseClass) {
+                Preview::class.java -> Preview.Builder()
+                ImageCapture::class.java -> ImageCapture.Builder()
+                ImageAnalysis::class.java -> ImageAnalysis.Builder()
+                else -> throw IllegalArgumentException("Unsupported class type!!")
+            }
+
+        (builder as ImageOutputConfig.Builder<*>).setResolutionSelector(
+            createResolutionSelector(
+                aspectRatio,
+                aspectRatioStrategyFallbackRule,
+                boundSize,
+                resolutionStrategyFallbackRule,
+                resolutionFilter,
+                allowedResolutionMode
+            )
+        )
+
+        return builder.build()
+    }
+
+    private fun createResolutionSelector(
+        aspectRatio: Int? = null,
+        aspectRatioFallbackRule: Int = FALLBACK_RULE_AUTO,
+        boundSize: Size? = null,
+        resolutionFallbackRule: Int = FALLBACK_RULE_CLOSEST_HIGHER_THEN_LOWER,
+        resolutionFilter: ResolutionFilter? = null,
+        allowedResolutionMode: Int = PREFER_CAPTURE_RATE_OVER_HIGHER_RESOLUTION
+    ) =
+        ResolutionSelector.Builder()
+            .apply {
+                aspectRatio?.let {
+                    setAspectRatioStrategy(
+                        AspectRatioStrategy(aspectRatio, aspectRatioFallbackRule)
+                    )
+                }
+                boundSize?.let {
+                    setResolutionStrategy(ResolutionStrategy(boundSize, resolutionFallbackRule))
+                }
+                resolutionFilter?.let { setResolutionFilter(resolutionFilter) }
+                setAllowedResolutionMode(allowedResolutionMode)
+            }
+            .build()
+
+    private fun aspectRatioToRational(ratio: Int) =
+        if (ratio == RATIO_16_9) {
+            ASPECT_RATIO_16_9
+        } else {
+            ASPECT_RATIO_4_3
+        }
+
+    private fun <T : UseCase> getClosestAspectRatioSizesUnderPreviewSize(
+        targetAspectRatio: Int,
+        useCaseClass: Class<T>
+    ): List<Size> {
+        val outputSizes =
+            cameraInfoInternal.getSupportedResolutions(useCaseFormatMap[useCaseClass]!!)
+        return outputSizes
+            .getSmallerThanOrEqualToPreviewScaleSizeSublist()
+            .getClosestAspectRatioSublist(targetAspectRatio)
+    }
+
+    private fun <T : UseCase> getClosestAspectRatioSizes(
+        targetAspectRatio: Int,
+        useCaseClass: Class<T>
+    ): List<Size> {
+        val outputSizes =
+            cameraInfoInternal.getSupportedResolutions(useCaseFormatMap[useCaseClass]!!)
+        return outputSizes.getClosestAspectRatioSublist(targetAspectRatio)
+    }
+
+    private fun List<Size>.getSmallerThanOrEqualToPreviewScaleSizeSublist() = filter { size ->
+        SizeUtil.getArea(size) <= SizeUtil.getArea(getPreviewScaleSize())
+    }
+
+    @Suppress("DEPRECATION")
+    private fun getPreviewScaleSize(): Size {
+        val point = Point()
+        DisplayInfoManager.getInstance(context).getMaxSizeDisplay(false).getRealSize(point)
+        val displaySize = Size(point.x, point.y)
+        return if (SizeUtil.isSmallerByArea(RESOLUTION_1080P, displaySize)) {
+            RESOLUTION_1080P
+        } else {
+            displaySize
+        }
+    }
+
+    private fun List<Size>.getClosestAspectRatioSublist(targetAspectRatio: Int): List<Size> {
+        val sensorRect = (camera.cameraControl as RestrictedCameraControl).sensorRect
+        val aspectRatios = getResolutionListGroupingAspectRatioKeys(this)
+        val sortedAspectRatios =
+            aspectRatios.sortedWith(
+                AspectRatioUtil.CompareAspectRatiosByMappingAreaInFullFovAspectRatioSpace(
+                    aspectRatioToRational(targetAspectRatio),
+                    Rational(sensorRect.width(), sensorRect.height())
+                )
+            )
+        val groupedRatioToSizesMap = groupSizesByAspectRatio(this)
+
+        for (ratio in sortedAspectRatios) {
+            groupedRatioToSizesMap[ratio]?.let {
+                if (it.isNotEmpty()) {
+                    return it
+                }
+            }
+        }
+
+        fail("There should have one non-empty size list returned.")
+    }
+
+    /**
+     * Returns the grouping aspect ratio keys of the input resolution list.
+     *
+     * Some sizes might be mod16 case. When grouping, those sizes will be grouped into an existing
+     * aspect ratio group if the aspect ratio can match by the mod16 rule.
+     */
+    private fun getResolutionListGroupingAspectRatioKeys(
+        resolutionCandidateList: List<Size>
+    ): List<Rational> {
+        val aspectRatios = mutableListOf<Rational>()
+
+        // Adds the default 4:3 and 16:9 items first to avoid their mod16 sizes to create
+        // additional items.
+        aspectRatios.add(ASPECT_RATIO_4_3)
+        aspectRatios.add(ASPECT_RATIO_16_9)
+
+        // Tries to find the aspect ratio which the target size belongs to.
+        for (size in resolutionCandidateList) {
+            val newRatio = Rational(size.width, size.height)
+            val aspectRatioFound = aspectRatios.contains(newRatio)
+
+            // The checking size might be a mod16 size which can be mapped to an existing aspect
+            // ratio group.
+            if (!aspectRatioFound) {
+                var hasMatchingAspectRatio = false
+                for (aspectRatio in aspectRatios) {
+                    if (hasMatchingAspectRatio(size, aspectRatio)) {
+                        hasMatchingAspectRatio = true
+                        break
+                    }
+                }
+                if (!hasMatchingAspectRatio) {
+                    aspectRatios.add(newRatio)
+                }
+            }
+        }
+
+        return aspectRatios
+    }
+
+    /** Groups the input sizes into an aspect ratio to size list map. */
+    private fun groupSizesByAspectRatio(sizes: List<Size>): Map<Rational, MutableList<Size>> {
+        val aspectRatioSizeListMap = mutableMapOf<Rational, MutableList<Size>>()
+        val aspectRatioKeys = getResolutionListGroupingAspectRatioKeys(sizes)
+
+        for (aspectRatio in aspectRatioKeys) {
+            aspectRatioSizeListMap[aspectRatio] = mutableListOf()
+        }
+
+        for (outputSize in sizes) {
+            for (key in aspectRatioSizeListMap.keys) {
+                // Put the size into all groups that is matched in mod16 condition since a size
+                // may match multiple aspect ratio in mod16 algorithm.
+                if (hasMatchingAspectRatio(outputSize, key)) {
+                    aspectRatioSizeListMap[key]!!.add(outputSize)
+                }
+            }
+        }
+
+        return aspectRatioSizeListMap
+    }
+
+    // Skips the tests when the devices have any of the quirks that might affect the selected
+    // resolution.
+    private fun assumeNotAspectRatioQuirkDevice() {
+        assumeFalse(hasAspectRatioLegacyApi21Quirk())
+        assumeFalse(hasNexus4AndroidLTargetAspectRatioQuirk())
+        assumeFalse(hasExtraCroppingQuirk())
+    }
+
+    // Checks whether it is the device for AspectRatioLegacyApi21Quirk
+    private fun hasAspectRatioLegacyApi21Quirk(): Boolean {
+        val quirks = cameraInfoInternal.cameraQuirks
+
+        return if (implName == CameraPipeConfig::class.simpleName) {
+            quirks.contains(
+                androidx.camera.camera2.pipe.integration.compat.quirk
+                        .AspectRatioLegacyApi21Quirk::class
+                    .java
+            )
+        } else {
+            quirks.contains(
+                androidx.camera.camera2.internal.compat.quirk.AspectRatioLegacyApi21Quirk::class
+                    .java
+            )
+        }
+    }
+
+    // Checks whether it is the device for Nexus4AndroidLTargetAspectRatioQuirk
+    private fun hasNexus4AndroidLTargetAspectRatioQuirk() =
+        if (implName == CameraPipeConfig::class.simpleName) {
+            hasDeviceQuirk(
+                androidx.camera.camera2.pipe.integration.compat.quirk
+                        .Nexus4AndroidLTargetAspectRatioQuirk::class
+                    .java
+            )
+        } else {
+            hasDeviceQuirk(
+                androidx.camera.camera2.internal.compat.quirk
+                        .Nexus4AndroidLTargetAspectRatioQuirk::class
+                    .java
+            )
+        }
+
+    // Checks whether it is the device for ExtraCroppingQuirk
+    private fun hasExtraCroppingQuirk() =
+        if (implName == CameraPipeConfig::class.simpleName) {
+            hasDeviceQuirk(
+                androidx.camera.camera2.pipe.integration.compat.quirk.ExtraCroppingQuirk::class.java
+            )
+        } else {
+            hasDeviceQuirk(
+                androidx.camera.camera2.internal.compat.quirk.ExtraCroppingQuirk::class.java
+            )
+        }
+
+    // Skips the tests when the devices have any of the quirks that might affect the selected
+    // resolution.
+    private fun assumeNotOutputSizeQuirkDevice() {
+        assumeFalse(hasExcludedSupportedSizesQuirk())
+        assumeFalse(hasExtraSupportedOutputSizeQuirk())
+    }
+
+    private fun hasExcludedSupportedSizesQuirk() =
+        if (implName == CameraPipeConfig::class.simpleName) {
+            hasDeviceQuirk(
+                androidx.camera.camera2.pipe.integration.compat.quirk
+                        .ExcludedSupportedSizesQuirk::class
+                    .java
+            )
+        } else {
+            hasDeviceQuirk(
+                androidx.camera.camera2.internal.compat.quirk.ExcludedSupportedSizesQuirk::class
+                    .java
+            )
+        }
+
+    private fun hasExtraSupportedOutputSizeQuirk() =
+        if (implName == CameraPipeConfig::class.simpleName) {
+            hasDeviceQuirk(
+                androidx.camera.camera2.pipe.integration.compat.quirk
+                        .ExtraSupportedOutputSizeQuirk::class
+                    .java
+            )
+        } else {
+            hasDeviceQuirk(
+                androidx.camera.camera2.internal.compat.quirk.ExtraSupportedOutputSizeQuirk::class
+                    .java
+            )
+        }
+
+    private fun <T : Quirk?> hasDeviceQuirk(quirkClass: Class<T>) =
+        if (implName == CameraPipeConfig::class.simpleName) {
+            androidx.camera.camera2.pipe.integration.compat.quirk.DeviceQuirks.get(quirkClass)
+        } else {
+            androidx.camera.camera2.internal.compat.quirk.DeviceQuirks.get(quirkClass)
+        } != null
+}
diff --git a/compose/runtime/runtime-test-utils/src/commonMain/kotlin/androidx/compose/runtime/mock/ViewApplier.kt b/compose/runtime/runtime-test-utils/src/commonMain/kotlin/androidx/compose/runtime/mock/ViewApplier.kt
index 4b1b684..e92c777 100644
--- a/compose/runtime/runtime-test-utils/src/commonMain/kotlin/androidx/compose/runtime/mock/ViewApplier.kt
+++ b/compose/runtime/runtime-test-utils/src/commonMain/kotlin/androidx/compose/runtime/mock/ViewApplier.kt
@@ -20,6 +20,8 @@
 
 @Suppress("EXTENSION_SHADOWED_BY_MEMBER")
 class ViewApplier(root: View) : AbstractApplier<View>(root) {
+    var called = false
+
     var onBeginChangesCalled = 0
         private set
 
@@ -28,29 +30,53 @@
 
     override fun insertTopDown(index: Int, instance: View) {
         // Ignored as the tree is built bottom-up.
+        called = true
     }
 
     override fun insertBottomUp(index: Int, instance: View) {
         current.addAt(index, instance)
+        called = true
     }
 
     override fun remove(index: Int, count: Int) {
         current.removeAt(index, count)
+        called = true
     }
 
     override fun move(from: Int, to: Int, count: Int) {
         current.moveAt(from, to, count)
+        called = true
     }
 
     override fun onClear() {
         root.removeAllChildren()
+        called = true
     }
 
     override fun onBeginChanges() {
         onBeginChangesCalled++
+        called = true
     }
 
     override fun onEndChanges() {
         onEndChangesCalled++
+        called = true
+    }
+
+    override var current: View
+        get() = super.current.also { if (it != root) called = true }
+        set(value) {
+            super.current = value
+            called = true
+        }
+
+    override fun down(node: View) {
+        super.down(node)
+        called = true
+    }
+
+    override fun up() {
+        super.up()
+        called = true
     }
 }
diff --git a/compose/runtime/runtime/api/current.ignore b/compose/runtime/runtime/api/current.ignore
index fc9bb64..f02ef5a 100644
--- a/compose/runtime/runtime/api/current.ignore
+++ b/compose/runtime/runtime/api/current.ignore
@@ -1,19 +1,3 @@
 // Baseline format: 1.0
-AddedAbstractMethod: androidx.compose.runtime.Composer#endReplaceGroup():
-    Added method androidx.compose.runtime.Composer.endReplaceGroup()
-AddedAbstractMethod: androidx.compose.runtime.Composer#startReplaceGroup(int):
-    Added method androidx.compose.runtime.Composer.startReplaceGroup(int)
-AddedAbstractMethod: androidx.compose.runtime.ControlledComposition#abandonChanges():
-    Added method androidx.compose.runtime.ControlledComposition.abandonChanges()
-
-
-InvalidNullConversion: androidx.compose.runtime.ComposablesKt#key(Object[], kotlin.jvm.functions.Function0<? extends T>) parameter #0:
-    Attempted to change parameter from @Nullable to @NonNull: incompatible change for parameter keys in androidx.compose.runtime.ComposablesKt.key(Object[] keys, kotlin.jvm.functions.Function0<? extends T> block)
-InvalidNullConversion: androidx.compose.runtime.ComposablesKt#remember(Object[], kotlin.jvm.functions.Function0<? extends T>) parameter #0:
-    Attempted to change parameter from @Nullable to @NonNull: incompatible change for parameter keys in androidx.compose.runtime.ComposablesKt.remember(Object[] keys, kotlin.jvm.functions.Function0<? extends T> calculation)
-InvalidNullConversion: androidx.compose.runtime.EffectsKt#DisposableEffect(Object[], kotlin.jvm.functions.Function1<? super androidx.compose.runtime.DisposableEffectScope,? extends androidx.compose.runtime.DisposableEffectResult>) parameter #0:
-    Attempted to change parameter from @Nullable to @NonNull: incompatible change for parameter keys in androidx.compose.runtime.EffectsKt.DisposableEffect(Object[] keys, kotlin.jvm.functions.Function1<? super androidx.compose.runtime.DisposableEffectScope,? extends androidx.compose.runtime.DisposableEffectResult> effect)
-InvalidNullConversion: androidx.compose.runtime.EffectsKt#LaunchedEffect(Object[], kotlin.jvm.functions.Function2<? super kotlinx.coroutines.CoroutineScope,? super kotlin.coroutines.Continuation<? super kotlin.Unit>,?>) parameter #0:
-    Attempted to change parameter from @Nullable to @NonNull: incompatible change for parameter keys in androidx.compose.runtime.EffectsKt.LaunchedEffect(Object[] keys, kotlin.jvm.functions.Function2<? super kotlinx.coroutines.CoroutineScope,? super kotlin.coroutines.Continuation<? super kotlin.Unit>,?> block)
-InvalidNullConversion: androidx.compose.runtime.SnapshotStateKt#produceState(T, Object[], kotlin.jvm.functions.Function2<? super androidx.compose.runtime.ProduceStateScope<T>,? super kotlin.coroutines.Continuation<? super kotlin.Unit>,?>) parameter #1:
-    Attempted to change parameter from @Nullable to @NonNull: incompatible change for parameter keys in androidx.compose.runtime.SnapshotStateKt.produceState(T initialValue, Object[] keys, kotlin.jvm.functions.Function2<? super androidx.compose.runtime.ProduceStateScope<T>,? super kotlin.coroutines.Continuation<? super kotlin.Unit>,?> producer)
+AddedAbstractMethod: androidx.compose.runtime.ControlledComposition#setShouldPauseCallback(kotlin.jvm.functions.Function0<java.lang.Boolean>):
+    Added method androidx.compose.runtime.ControlledComposition.setShouldPauseCallback(kotlin.jvm.functions.Function0<java.lang.Boolean>)
diff --git a/compose/runtime/runtime/api/current.txt b/compose/runtime/runtime/api/current.txt
index 110569b..9212ade 100644
--- a/compose/runtime/runtime/api/current.txt
+++ b/compose/runtime/runtime/api/current.txt
@@ -22,6 +22,7 @@
   }
 
   @kotlin.jvm.JvmDefaultWithCompatibility public interface Applier<N> {
+    method public default void apply(kotlin.jvm.functions.Function2<? super N,java.lang.Object?,kotlin.Unit> block, Object? value);
     method public void clear();
     method public void down(N node);
     method public N getCurrent();
@@ -31,6 +32,7 @@
     method public default void onBeginChanges();
     method public default void onEndChanges();
     method public void remove(int index, int count);
+    method public default void reuse();
     method public void up();
     property public abstract N current;
   }
@@ -286,6 +288,7 @@
     method public void recordModificationsOf(java.util.Set<?> values);
     method public void recordReadOf(Object value);
     method public void recordWriteOf(Object value);
+    method public kotlin.jvm.functions.Function0<java.lang.Boolean>? setShouldPauseCallback(kotlin.jvm.functions.Function0<java.lang.Boolean>? shouldPause);
     method @SuppressCompatibility @androidx.compose.runtime.InternalComposeApi public void verifyConsistent();
     property public abstract boolean hasPendingChanges;
     property public abstract boolean isComposing;
@@ -459,6 +462,15 @@
   @kotlin.annotation.Retention(kotlin.annotation.AnnotationRetention.SOURCE) @kotlin.annotation.Target(allowedTargets={kotlin.annotation.AnnotationTarget.FUNCTION, kotlin.annotation.AnnotationTarget.PROPERTY_GETTER}) public @interface NonSkippableComposable {
   }
 
+  public interface PausableComposition extends androidx.compose.runtime.ReusableComposition {
+    method public androidx.compose.runtime.PausedComposition setPausableContent(kotlin.jvm.functions.Function0<kotlin.Unit> content);
+    method public androidx.compose.runtime.PausedComposition setPausableContentWithReuse(kotlin.jvm.functions.Function0<kotlin.Unit> content);
+  }
+
+  public final class PausableCompositionKt {
+    method public static androidx.compose.runtime.PausableComposition PausableComposition(androidx.compose.runtime.Applier<? extends java.lang.Object?> applier, androidx.compose.runtime.CompositionContext parent);
+  }
+
   public final class PausableMonotonicFrameClock implements androidx.compose.runtime.MonotonicFrameClock {
     ctor public PausableMonotonicFrameClock(androidx.compose.runtime.MonotonicFrameClock frameClock);
     method public boolean isPaused();
@@ -468,6 +480,14 @@
     property public final boolean isPaused;
   }
 
+  public interface PausedComposition {
+    method public void apply();
+    method public void cancel();
+    method public boolean isComplete();
+    method public boolean resume(kotlin.jvm.functions.Function0<java.lang.Boolean> shouldPause);
+    property public abstract boolean isComplete;
+  }
+
   public final class PrimitiveSnapshotStateKt {
     method public static inline operator float getValue(androidx.compose.runtime.FloatState, Object? thisObj, kotlin.reflect.KProperty<? extends java.lang.Object?> property);
     method @androidx.compose.runtime.snapshots.StateFactoryMarker public static androidx.compose.runtime.MutableFloatState mutableFloatStateOf(float value);
diff --git a/compose/runtime/runtime/api/restricted_current.ignore b/compose/runtime/runtime/api/restricted_current.ignore
index 5826792..f02ef5a 100644
--- a/compose/runtime/runtime/api/restricted_current.ignore
+++ b/compose/runtime/runtime/api/restricted_current.ignore
@@ -1,23 +1,3 @@
 // Baseline format: 1.0
-AddedAbstractMethod: androidx.compose.runtime.Composer#endReplaceGroup():
-    Added method androidx.compose.runtime.Composer.endReplaceGroup()
-AddedAbstractMethod: androidx.compose.runtime.Composer#startReplaceGroup(int):
-    Added method androidx.compose.runtime.Composer.startReplaceGroup(int)
-AddedAbstractMethod: androidx.compose.runtime.ControlledComposition#abandonChanges():
-    Added method androidx.compose.runtime.ControlledComposition.abandonChanges()
-
-
-InvalidNullConversion: androidx.compose.runtime.ComposablesKt#key(Object[], kotlin.jvm.functions.Function0<? extends T>) parameter #0:
-    Attempted to change parameter from @Nullable to @NonNull: incompatible change for parameter keys in androidx.compose.runtime.ComposablesKt.key(Object[] keys, kotlin.jvm.functions.Function0<? extends T> block)
-InvalidNullConversion: androidx.compose.runtime.ComposablesKt#remember(Object[], kotlin.jvm.functions.Function0<? extends T>) parameter #0:
-    Attempted to change parameter from @Nullable to @NonNull: incompatible change for parameter keys in androidx.compose.runtime.ComposablesKt.remember(Object[] keys, kotlin.jvm.functions.Function0<? extends T> calculation)
-InvalidNullConversion: androidx.compose.runtime.EffectsKt#DisposableEffect(Object[], kotlin.jvm.functions.Function1<? super androidx.compose.runtime.DisposableEffectScope,? extends androidx.compose.runtime.DisposableEffectResult>) parameter #0:
-    Attempted to change parameter from @Nullable to @NonNull: incompatible change for parameter keys in androidx.compose.runtime.EffectsKt.DisposableEffect(Object[] keys, kotlin.jvm.functions.Function1<? super androidx.compose.runtime.DisposableEffectScope,? extends androidx.compose.runtime.DisposableEffectResult> effect)
-InvalidNullConversion: androidx.compose.runtime.EffectsKt#LaunchedEffect(Object[], kotlin.jvm.functions.Function2<? super kotlinx.coroutines.CoroutineScope,? super kotlin.coroutines.Continuation<? super kotlin.Unit>,?>) parameter #0:
-    Attempted to change parameter from @Nullable to @NonNull: incompatible change for parameter keys in androidx.compose.runtime.EffectsKt.LaunchedEffect(Object[] keys, kotlin.jvm.functions.Function2<? super kotlinx.coroutines.CoroutineScope,? super kotlin.coroutines.Continuation<? super kotlin.Unit>,?> block)
-InvalidNullConversion: androidx.compose.runtime.SnapshotStateKt#produceState(T, Object[], kotlin.jvm.functions.Function2<? super androidx.compose.runtime.ProduceStateScope<T>,? super kotlin.coroutines.Continuation<? super kotlin.Unit>,?>) parameter #1:
-    Attempted to change parameter from @Nullable to @NonNull: incompatible change for parameter keys in androidx.compose.runtime.SnapshotStateKt.produceState(T initialValue, Object[] keys, kotlin.jvm.functions.Function2<? super androidx.compose.runtime.ProduceStateScope<T>,? super kotlin.coroutines.Continuation<? super kotlin.Unit>,?> producer)
-
-
-RemovedClass: androidx.compose.runtime.ExpectKt:
-    Removed class androidx.compose.runtime.ExpectKt
+AddedAbstractMethod: androidx.compose.runtime.ControlledComposition#setShouldPauseCallback(kotlin.jvm.functions.Function0<java.lang.Boolean>):
+    Added method androidx.compose.runtime.ControlledComposition.setShouldPauseCallback(kotlin.jvm.functions.Function0<java.lang.Boolean>)
diff --git a/compose/runtime/runtime/api/restricted_current.txt b/compose/runtime/runtime/api/restricted_current.txt
index 186ecc4..9580ec3 100644
--- a/compose/runtime/runtime/api/restricted_current.txt
+++ b/compose/runtime/runtime/api/restricted_current.txt
@@ -26,6 +26,7 @@
   }
 
   @kotlin.jvm.JvmDefaultWithCompatibility public interface Applier<N> {
+    method public default void apply(kotlin.jvm.functions.Function2<? super N,java.lang.Object?,kotlin.Unit> block, Object? value);
     method public void clear();
     method public void down(N node);
     method public N getCurrent();
@@ -35,6 +36,7 @@
     method public default void onBeginChanges();
     method public default void onEndChanges();
     method public void remove(int index, int count);
+    method public default void reuse();
     method public void up();
     property public abstract N current;
   }
@@ -313,6 +315,7 @@
     method public void recordModificationsOf(java.util.Set<?> values);
     method public void recordReadOf(Object value);
     method public void recordWriteOf(Object value);
+    method public kotlin.jvm.functions.Function0<java.lang.Boolean>? setShouldPauseCallback(kotlin.jvm.functions.Function0<java.lang.Boolean>? shouldPause);
     method @SuppressCompatibility @androidx.compose.runtime.InternalComposeApi public void verifyConsistent();
     property public abstract boolean hasPendingChanges;
     property public abstract boolean isComposing;
@@ -487,6 +490,15 @@
   @kotlin.annotation.Retention(kotlin.annotation.AnnotationRetention.SOURCE) @kotlin.annotation.Target(allowedTargets={kotlin.annotation.AnnotationTarget.FUNCTION, kotlin.annotation.AnnotationTarget.PROPERTY_GETTER}) public @interface NonSkippableComposable {
   }
 
+  public interface PausableComposition extends androidx.compose.runtime.ReusableComposition {
+    method public androidx.compose.runtime.PausedComposition setPausableContent(kotlin.jvm.functions.Function0<kotlin.Unit> content);
+    method public androidx.compose.runtime.PausedComposition setPausableContentWithReuse(kotlin.jvm.functions.Function0<kotlin.Unit> content);
+  }
+
+  public final class PausableCompositionKt {
+    method public static androidx.compose.runtime.PausableComposition PausableComposition(androidx.compose.runtime.Applier<? extends java.lang.Object?> applier, androidx.compose.runtime.CompositionContext parent);
+  }
+
   public final class PausableMonotonicFrameClock implements androidx.compose.runtime.MonotonicFrameClock {
     ctor public PausableMonotonicFrameClock(androidx.compose.runtime.MonotonicFrameClock frameClock);
     method public boolean isPaused();
@@ -496,6 +508,14 @@
     property public final boolean isPaused;
   }
 
+  public interface PausedComposition {
+    method public void apply();
+    method public void cancel();
+    method public boolean isComplete();
+    method public boolean resume(kotlin.jvm.functions.Function0<java.lang.Boolean> shouldPause);
+    property public abstract boolean isComplete;
+  }
+
   public final class PrimitiveSnapshotStateKt {
     method public static inline operator float getValue(androidx.compose.runtime.FloatState, Object? thisObj, kotlin.reflect.KProperty<? extends java.lang.Object?> property);
     method @androidx.compose.runtime.snapshots.StateFactoryMarker public static androidx.compose.runtime.MutableFloatState mutableFloatStateOf(float value);
diff --git a/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/Applier.kt b/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/Applier.kt
index e1cd951..5b33661 100644
--- a/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/Applier.kt
+++ b/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/Applier.kt
@@ -174,6 +174,16 @@
      * root to be used as the target of a new composition in the future.
      */
     fun clear()
+
+    /** Apply a change to the current node. */
+    fun apply(block: N.(Any?) -> Unit, value: Any?) {
+        current.block(value)
+    }
+
+    /** Notify [current] is is being reused in reusable content. */
+    fun reuse() {
+        (current as? ComposeNodeLifecycleCallback)?.onReuse()
+    }
 }
 
 /**
diff --git a/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/Composer.kt b/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/Composer.kt
index 28f2850..172e582 100644
--- a/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/Composer.kt
+++ b/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/Composer.kt
@@ -101,6 +101,15 @@
         priority: Int,
         endRelativeAfter: Int
     )
+
+    /** The restart scope is pausing */
+    fun rememberPausingScope(scope: RecomposeScopeImpl)
+
+    /** The restart scope is resuming */
+    fun startResumingScope(scope: RecomposeScopeImpl)
+
+    /** The restart scope is finished resuming */
+    fun endResumingScope(scope: RecomposeScopeImpl)
 }
 
 /**
@@ -1356,6 +1365,9 @@
     private var insertAnchor: Anchor = insertTable.read { it.anchor(0) }
     private var insertFixups = FixupList()
 
+    private var pausable: Boolean = false
+    private var shouldPauseCallback: (() -> Boolean)? = null
+
     override val applyCoroutineContext: CoroutineContext
         @TestOnly get() = parentContext.effectCoroutineContext
 
@@ -2726,7 +2738,10 @@
                 providerCache = null
 
                 // Invoke the scope's composition function
+                val shouldRestartReusing = !reusing && firstInRange.scope.reusing
+                if (shouldRestartReusing) reusing = true
                 firstInRange.scope.compose(this)
+                if (shouldRestartReusing) reusing = false
 
                 // We could have moved out of a provider so the provider cache is invalid.
                 providerCache = null
@@ -3038,8 +3053,34 @@
     }
 
     @ComposeCompilerApi
-    @Suppress("UNUSED")
     override fun shouldExecute(parametersChanged: Boolean, flags: Int): Boolean {
+        // We only want to pause when we are not resuming and only when inserting new content or
+        // when reusing content. This 0 bit of `flags` is only 1 if this function was restarted by
+        // the restart lambda. The other bits of this flags are currently all 0's and are reserved
+        // for future use.
+        if (((flags and 1) == 0) && (inserting || reusing)) {
+            val callback = shouldPauseCallback ?: return true
+            val scope = currentRecomposeScope ?: return true
+            val pausing = callback()
+            if (pausing) {
+                scope.used = true
+                // Force the composer back into the reusing state when this scope restarts.
+                scope.reusing = reusing
+                scope.paused = true
+                // Remember a place-holder object to ensure all remembers are sent in the correct
+                // order. The remember manager will record the remember callback for the resumed
+                // content into a place-holder to ensure that, when the remember callbacks are
+                // dispatched, the callbacks for the resumed content are dispatched in the same
+                // order they would have been had the content not paused.
+                changeListWriter.rememberPausingScope(scope)
+                parentContext.reportPausedScope(scope)
+                return false
+            }
+            return true
+        }
+
+        // Otherwise we should execute the function if the parameters have changed or when
+        // skipping is disabled.
         return parametersChanged || !skipping
     }
 
@@ -3118,6 +3159,11 @@
                     }
             invalidateStack.push(scope)
             scope.start(compositionToken)
+            if (scope.paused) {
+                scope.paused = false
+                scope.resuming = true
+                changeListWriter.startResumingScope(scope)
+            }
         }
     }
 
@@ -3133,8 +3179,16 @@
         // exception stack unwinding that might have not called the doneJoin/endRestartGroup in the
         // the correct order.
         val scope = if (invalidateStack.isNotEmpty()) invalidateStack.pop() else null
-        scope?.requiresRecompose = false
-        scope?.end(compositionToken)?.let { changeListWriter.endCompositionScope(it, composition) }
+        if (scope != null) {
+            scope.requiresRecompose = false
+            scope.end(compositionToken)?.let {
+                changeListWriter.endCompositionScope(it, composition)
+            }
+            if (scope.resuming) {
+                scope.resuming = false
+                changeListWriter.endResumingScope(scope)
+            }
+        }
         val result =
             if (scope != null && !scope.skipped && (scope.used || forceRecomposeScopes)) {
                 if (scope.anchor == null) {
@@ -3438,10 +3492,16 @@
      */
     internal fun composeContent(
         invalidationsRequested: ScopeMap<RecomposeScopeImpl, Any>,
-        content: @Composable () -> Unit
+        content: @Composable () -> Unit,
+        shouldPause: (() -> Boolean)?
     ) {
         runtimeCheck(changes.isEmpty()) { "Expected applyChanges() to have been called" }
-        doCompose(invalidationsRequested, content)
+        this.shouldPauseCallback = shouldPause
+        try {
+            doCompose(invalidationsRequested, content)
+        } finally {
+            this.shouldPauseCallback = null
+        }
     }
 
     internal fun prepareCompose(block: () -> Unit) {
@@ -3460,6 +3520,7 @@
      */
     internal fun recompose(
         invalidationsRequested: ScopeMap<RecomposeScopeImpl, Any>,
+        shouldPause: (() -> Boolean)?
     ): Boolean {
         runtimeCheck(changes.isEmpty()) { "Expected applyChanges() to have been called" }
         // even if invalidationsRequested is empty we still need to recompose if the Composer has
@@ -3467,7 +3528,12 @@
         // there were a change for a state which was used by the child composition. such changes
         // will be tracked and added into `invalidations` list.
         if (invalidationsRequested.size > 0 || invalidations.isNotEmpty() || forciblyRecompose) {
-            doCompose(invalidationsRequested, null)
+            shouldPauseCallback = shouldPause
+            try {
+                doCompose(invalidationsRequested, null)
+            } finally {
+                shouldPauseCallback = null
+            }
             return changes.isNotEmpty()
         }
         return false
@@ -3786,6 +3852,10 @@
             parentContext.unregisterComposition(composition)
         }
 
+        override fun reportPausedScope(scope: RecomposeScopeImpl) {
+            parentContext.reportPausedScope(scope)
+        }
+
         override val effectCoroutineContext: CoroutineContext
             get() = parentContext.effectCoroutineContext
 
@@ -3802,6 +3872,20 @@
             parentContext.composeInitial(composition, content)
         }
 
+        override fun composeInitialPaused(
+            composition: ControlledComposition,
+            shouldPause: () -> Boolean,
+            content: @Composable () -> Unit
+        ): ScatterSet<RecomposeScopeImpl> =
+            parentContext.composeInitialPaused(composition, shouldPause, content)
+
+        override fun recomposePaused(
+            composition: ControlledComposition,
+            shouldPause: () -> Boolean,
+            invalidScopes: ScatterSet<RecomposeScopeImpl>
+        ): ScatterSet<RecomposeScopeImpl> =
+            parentContext.recomposePaused(composition, shouldPause, invalidScopes)
+
         override fun invalidate(composition: ControlledComposition) {
             // Invalidate ourselves with our parent before we invalidate a child composer.
             // This ensures that when we are scheduling recompositions, parents always
diff --git a/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/Composition.kt b/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/Composition.kt
index e2efd82..f674dbe 100644
--- a/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/Composition.kt
+++ b/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/Composition.kt
@@ -18,13 +18,12 @@
 
 package androidx.compose.runtime
 
-import androidx.collection.MutableIntList
 import androidx.collection.MutableScatterSet
-import androidx.collection.mutableScatterSetOf
 import androidx.compose.runtime.changelist.ChangeList
 import androidx.compose.runtime.collection.ScopeMap
 import androidx.compose.runtime.collection.fastForEach
 import androidx.compose.runtime.internal.AtomicReference
+import androidx.compose.runtime.internal.RememberEventDispatcher
 import androidx.compose.runtime.internal.trace
 import androidx.compose.runtime.snapshots.ReaderKind
 import androidx.compose.runtime.snapshots.StateObjectImpl
@@ -289,6 +288,29 @@
      * used to compose as if the scopes have already been changed.
      */
     fun <R> delegateInvalidations(to: ControlledComposition?, groupIndex: Int, block: () -> R): R
+
+    /**
+     * Sets the [shouldPause] callback allowing a composition to be pausable if it is not `null`.
+     * Setting the callback to `null` disables pausing.
+     *
+     * @return the previous value of the callback which will be restored once the callback is no
+     *   longer needed.
+     * @see PausableComposition
+     */
+    fun setShouldPauseCallback(shouldPause: (() -> Boolean)?): (() -> Boolean)?
+}
+
+/** Utility function to set and restore a should pause callback. */
+internal inline fun <R> ControlledComposition.pausable(
+    noinline shouldPause: () -> Boolean,
+    block: () -> R
+): R {
+    val previous = setShouldPauseCallback(shouldPause)
+    return try {
+        block()
+    } finally {
+        setShouldPauseCallback(previous)
+    }
 }
 
 /**
@@ -409,7 +431,12 @@
     /** The applier to use to update the tree managed by the composition. */
     private val applier: Applier<*>,
     recomposeContext: CoroutineContext? = null
-) : ControlledComposition, ReusableComposition, RecomposeScopeOwner, CompositionServices {
+) :
+    ControlledComposition,
+    ReusableComposition,
+    RecomposeScopeOwner,
+    CompositionServices,
+    PausableComposition {
     /**
      * `null` if a composition isn't pending to apply. `Set<Any>` or `Array<Set<Any>>` if there are
      * modifications to record [PendingApplyNoModifications] if a composition is pending to apply,
@@ -520,6 +547,14 @@
     @Suppress("MemberVisibilityCanBePrivate") // published as internal
     internal var pendingInvalidScopes = false
 
+    /**
+     * If the [shouldPause] callback is set the composition is pausable and should pause whenever
+     * the [shouldPause] callback returns `true`.
+     */
+    private var shouldPause: (() -> Boolean)? = null
+
+    private var pendingPausedComposition: PausedCompositionImpl? = null
+
     private var invalidationDelegate: CompositionImpl? = null
 
     private var invalidationDelegateGroup: Int = 0
@@ -572,10 +607,16 @@
         get() = synchronized(lock) { composer.hasPendingChanges }
 
     override fun setContent(content: @Composable () -> Unit) {
+        checkPrecondition(pendingPausedComposition == null) {
+            "A pausable composition is in progress"
+        }
         composeInitial(content)
     }
 
     override fun setContentWithReuse(content: @Composable () -> Unit) {
+        checkPrecondition(pendingPausedComposition == null) {
+            "A pausable composition is in progress"
+        }
         composer.startReuseFromRoot()
 
         composeInitial(content)
@@ -583,6 +624,50 @@
         composer.endReuseFromRoot()
     }
 
+    override fun setPausableContent(content: @Composable () -> Unit): PausedComposition {
+        checkPrecondition(!disposed) { "The composition is disposed" }
+        checkPrecondition(pendingPausedComposition == null) {
+            "A pausable composition is in progress"
+        }
+        val pausedComposition =
+            PausedCompositionImpl(
+                composition = this,
+                context = parent,
+                composer = composer,
+                content = content,
+                reusable = false,
+                abandonSet = abandonSet,
+                applier = applier,
+                lock = lock,
+            )
+        pendingPausedComposition = pausedComposition
+        return pausedComposition
+    }
+
+    override fun setPausableContentWithReuse(content: @Composable () -> Unit): PausedComposition {
+        checkPrecondition(!disposed) { "The composition is disposed" }
+        checkPrecondition(pendingPausedComposition == null) {
+            "A pausable composition is in progress"
+        }
+        val pausedComposition =
+            PausedCompositionImpl(
+                composition = this,
+                context = parent,
+                composer = composer,
+                content = content,
+                reusable = true,
+                abandonSet = abandonSet,
+                applier = applier,
+                lock = lock,
+            )
+        pendingPausedComposition = pausedComposition
+        return pausedComposition
+    }
+
+    internal fun pausedCompositionFinished() {
+        pendingPausedComposition = null
+    }
+
     private fun composeInitial(content: @Composable () -> Unit) {
         checkPrecondition(!disposed) { "The composition is disposed" }
         this.composable = content
@@ -701,7 +786,7 @@
                             invalidations.asMap() as Map<RecomposeScope, Set<Any>>
                         )
                     }
-                    composer.composeContent(invalidations, content)
+                    composer.composeContent(invalidations, content, shouldPause)
                     observer?.onEndComposition(this)
                 }
             }
@@ -911,7 +996,7 @@
                         this,
                         invalidations.asMap() as Map<RecomposeScope, Set<Any>>
                     )
-                    composer.recompose(invalidations).also { shouldDrain ->
+                    composer.recompose(invalidations, shouldPause).also { shouldDrain ->
                         // Apply would normally do this for us; do it now if apply shouldn't happen.
                         if (!shouldDrain) drainPendingModificationsLocked()
                         observer?.onEndComposition(this)
@@ -939,11 +1024,13 @@
         try {
             if (changes.isEmpty()) return
             trace("Compose:applyChanges") {
+                val applier = pendingPausedComposition?.pausableApplier ?: applier
+                val rememberManager = pendingPausedComposition?.rememberManager ?: manager
                 applier.onBeginChanges()
 
                 // Apply all changes
                 slotTable.write { slots ->
-                    changes.executeAndFlushAllPendingChanges(applier, slots, manager)
+                    changes.executeAndFlushAllPendingChanges(applier, slots, rememberManager)
                 }
                 applier.onEndChanges()
             }
@@ -962,9 +1049,12 @@
                 }
             }
         } finally {
-            // Only dispatch abandons if we do not have any late changes. The instances in the
-            // abandon set can be remembered in the late changes.
-            if (this.lateChanges.isEmpty()) manager.dispatchAbandons()
+            // Only dispatch abandons if we do not have any late changes or pending paused
+            // compositions. The instances in the abandon set can be remembered in the late changes
+            // or when the paused composition is applied.
+            if (this.lateChanges.isEmpty() && pendingPausedComposition == null) {
+                manager.dispatchAbandons()
+            }
         }
     }
 
@@ -1062,6 +1152,12 @@
         } else block()
     }
 
+    override fun setShouldPauseCallback(shouldPause: (() -> Boolean)?): (() -> Boolean)? {
+        val previous = this.shouldPause
+        this.shouldPause = shouldPause
+        return previous
+    }
+
     override fun invalidate(scope: RecomposeScopeImpl, instance: Any?): InvalidationResult {
         if (scope.defaultsInScope) {
             scope.defaultsInvalid = true
@@ -1241,218 +1337,6 @@
 
     // This is only used in tests to ensure the stacks do not silently leak.
     internal fun composerStacksSizes(): Int = composer.stacksSize()
-
-    /** Helper for collecting remember observers for later strictly ordered dispatch. */
-    private class RememberEventDispatcher(private val abandoning: MutableSet<RememberObserver>) :
-        RememberManager {
-        private val remembering = mutableListOf<RememberObserver>()
-        private val leaving = mutableListOf<Any>()
-        private val sideEffects = mutableListOf<() -> Unit>()
-        private var releasing: MutableScatterSet<ComposeNodeLifecycleCallback>? = null
-        private val pending = mutableListOf<Any>()
-        private val priorities = MutableIntList()
-        private val afters = MutableIntList()
-
-        override fun remembering(instance: RememberObserver) {
-            remembering.add(instance)
-        }
-
-        override fun forgetting(
-            instance: RememberObserver,
-            endRelativeOrder: Int,
-            priority: Int,
-            endRelativeAfter: Int
-        ) {
-            recordLeaving(instance, endRelativeOrder, priority, endRelativeAfter)
-        }
-
-        override fun sideEffect(effect: () -> Unit) {
-            sideEffects += effect
-        }
-
-        override fun deactivating(
-            instance: ComposeNodeLifecycleCallback,
-            endRelativeOrder: Int,
-            priority: Int,
-            endRelativeAfter: Int
-        ) {
-            recordLeaving(instance, endRelativeOrder, priority, endRelativeAfter)
-        }
-
-        override fun releasing(
-            instance: ComposeNodeLifecycleCallback,
-            endRelativeOrder: Int,
-            priority: Int,
-            endRelativeAfter: Int
-        ) {
-            val releasing =
-                releasing
-                    ?: mutableScatterSetOf<ComposeNodeLifecycleCallback>().also { releasing = it }
-
-            releasing += instance
-            recordLeaving(instance, endRelativeOrder, priority, endRelativeAfter)
-        }
-
-        fun dispatchRememberObservers() {
-            // Add any pending out-of-order forgotten objects
-            processPendingLeaving(Int.MIN_VALUE)
-
-            // Send forgets and node callbacks
-            if (leaving.isNotEmpty()) {
-                trace("Compose:onForgotten") {
-                    val releasing = releasing
-                    for (i in leaving.size - 1 downTo 0) {
-                        val instance = leaving[i]
-                        if (instance is RememberObserver) {
-                            abandoning.remove(instance)
-                            instance.onForgotten()
-                        }
-                        if (instance is ComposeNodeLifecycleCallback) {
-                            // node callbacks are in the same queue as forgets to ensure ordering
-                            if (releasing != null && instance in releasing) {
-                                instance.onRelease()
-                            } else {
-                                instance.onDeactivate()
-                            }
-                        }
-                    }
-                }
-            }
-
-            // Send remembers
-            if (remembering.isNotEmpty()) {
-                trace("Compose:onRemembered") {
-                    remembering.fastForEach { instance ->
-                        abandoning.remove(instance)
-                        instance.onRemembered()
-                    }
-                }
-            }
-        }
-
-        fun dispatchSideEffects() {
-            if (sideEffects.isNotEmpty()) {
-                trace("Compose:sideeffects") {
-                    sideEffects.fastForEach { sideEffect -> sideEffect() }
-                    sideEffects.clear()
-                }
-            }
-        }
-
-        fun dispatchAbandons() {
-            if (abandoning.isNotEmpty()) {
-                trace("Compose:abandons") {
-                    val iterator = abandoning.iterator()
-                    // remove elements one by one to ensure that abandons will not be dispatched
-                    // second time in case [onAbandoned] throws.
-                    while (iterator.hasNext()) {
-                        val instance = iterator.next()
-                        iterator.remove()
-                        instance.onAbandoned()
-                    }
-                }
-            }
-        }
-
-        private fun recordLeaving(
-            instance: Any,
-            endRelativeOrder: Int,
-            priority: Int,
-            endRelativeAfter: Int
-        ) {
-            processPendingLeaving(endRelativeOrder)
-            if (endRelativeAfter in 0 until endRelativeOrder) {
-                pending.add(instance)
-                priorities.add(priority)
-                afters.add(endRelativeAfter)
-            } else {
-                leaving.add(instance)
-            }
-        }
-
-        private fun processPendingLeaving(endRelativeOrder: Int) {
-            if (pending.isNotEmpty()) {
-                var index = 0
-                var toAdd: MutableList<Any>? = null
-                var toAddAfter: MutableIntList? = null
-                var toAddPriority: MutableIntList? = null
-                while (index < afters.size) {
-                    if (endRelativeOrder <= afters[index]) {
-                        val instance = pending.removeAt(index)
-                        val endRelativeAfter = afters.removeAt(index)
-                        val priority = priorities.removeAt(index)
-
-                        if (toAdd == null) {
-                            toAdd = mutableListOf(instance)
-                            toAddAfter = MutableIntList().also { it.add(endRelativeAfter) }
-                            toAddPriority = MutableIntList().also { it.add(priority) }
-                        } else {
-                            toAddPriority as MutableIntList
-                            toAddAfter as MutableIntList
-                            toAdd.add(instance)
-                            toAddAfter.add(endRelativeAfter)
-                            toAddPriority.add(priority)
-                        }
-                    } else {
-                        index++
-                    }
-                }
-                if (toAdd != null) {
-                    toAddPriority as MutableIntList
-                    toAddAfter as MutableIntList
-
-                    // Sort the list into [after, -priority] order where it is ordered by after
-                    // in ascending order as the primary key and priority in descending order as
-                    // secondary key.
-
-                    // For example if remember occurs after a child group it must be added after
-                    // all the remembers of the child. This is reported with an after which is the
-                    // slot index of the child's last slot. As this slot might be at the same
-                    // location as where its parents ends this would be ambiguous which should
-                    // first if both the two groups request a slot to be after the same slot.
-                    // Priority is used to break the tie here which is the group index of the group
-                    // which is leaving. Groups that are lower must be added before the parent's
-                    // remember when they have the same after.
-
-                    // The sort must be stable as as consecutive remembers in the same group after
-                    // the same child will have the same after and priority.
-
-                    // A selection sort is used here because it is stable and the groups are
-                    // typically very short so this quickly exit list of one and not loop for
-                    // for sizes of 2. As the information is split between three lists, to
-                    // reduce allocations, [MutableList.sort] cannot be used as it doesn't have
-                    // an option to supply a custom swap.
-                    for (i in 0 until toAdd.size - 1) {
-                        for (j in i + 1 until toAdd.size) {
-                            val iAfter = toAddAfter[i]
-                            val jAfter = toAddAfter[j]
-                            if (
-                                iAfter < jAfter ||
-                                    (jAfter == iAfter && toAddPriority[i] < toAddPriority[j])
-                            ) {
-                                toAdd.swap(i, j)
-                                toAddPriority.swap(i, j)
-                                toAddAfter.swap(i, j)
-                            }
-                        }
-                    }
-                    leaving.addAll(toAdd)
-                }
-            }
-        }
-    }
-}
-
-private fun <T> MutableList<T>.swap(a: Int, b: Int) {
-    val item = this[a]
-    this[a] = this[b]
-    this[b] = item
-}
-
-private fun MutableIntList.swap(a: Int, b: Int) {
-    val item = this[a]
-    this[a] = this[b]
-    this[b] = item
 }
 
 internal object ScopeInvalidated
diff --git a/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/CompositionContext.kt b/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/CompositionContext.kt
index 561890c..e5b6d6b 100644
--- a/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/CompositionContext.kt
+++ b/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/CompositionContext.kt
@@ -16,6 +16,7 @@
 
 package androidx.compose.runtime
 
+import androidx.collection.ScatterSet
 import androidx.compose.runtime.internal.persistentCompositionLocalHashMapOf
 import androidx.compose.runtime.tooling.CompositionData
 import kotlin.coroutines.CoroutineContext
@@ -52,6 +53,20 @@
         content: @Composable () -> Unit
     )
 
+    internal abstract fun composeInitialPaused(
+        composition: ControlledComposition,
+        shouldPause: () -> Boolean,
+        content: @Composable () -> Unit
+    ): ScatterSet<RecomposeScopeImpl>
+
+    internal abstract fun recomposePaused(
+        composition: ControlledComposition,
+        shouldPause: () -> Boolean,
+        invalidScopes: ScatterSet<RecomposeScopeImpl>
+    ): ScatterSet<RecomposeScopeImpl>
+
+    internal abstract fun reportPausedScope(scope: RecomposeScopeImpl)
+
     internal abstract fun invalidate(composition: ControlledComposition)
 
     internal abstract fun invalidateScope(scope: RecomposeScopeImpl)
diff --git a/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/PausableComposition.kt b/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/PausableComposition.kt
new file mode 100644
index 0000000..0eff8d6
--- /dev/null
+++ b/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/PausableComposition.kt
@@ -0,0 +1,381 @@
+/*
+ * Copyright 2024 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+@file:OptIn(InternalComposeApi::class)
+
+package androidx.compose.runtime
+
+import androidx.collection.emptyScatterSet
+import androidx.collection.mutableIntListOf
+import androidx.collection.mutableObjectListOf
+import androidx.compose.runtime.internal.RememberEventDispatcher
+
+/**
+ * A [PausableComposition] is a sub-composition that can be composed incrementally as it supports
+ * being paused and resumed.
+ *
+ * Pausable sub-composition can be used between frames to prepare a sub-composition before it is
+ * required by the main composition. For example, this is used in lazy lists to prepare list items
+ * in between frames to that are likely to be scrolled in. The composition is paused when the start
+ * of the next frame is near allowing composition to be spread across multiple frames without
+ * delaying the production of the next frame.
+ *
+ * The result of the composition should not be used (e.g. the nodes should not added to a layout
+ * tree or placed in layout) until [PausedComposition.isComplete] is `true` and
+ * [PausedComposition.apply] has been called. The composition is incomplete and will not
+ * automatically recompose until after [PausedComposition.apply] is called.
+ *
+ * A [PausableComposition] is a [ReusableComposition] but [setPausableContent] should be used
+ * instead of [ReusableComposition.setContentWithReuse] to create a paused composition.
+ *
+ * If [Composition.setContent] or [ReusableComposition.setContentWithReuse] are used then the
+ * composition behaves as if it wasn't pausable. If there is a [PausedComposition] that has not yet
+ * been applied, an exception is thrown.
+ *
+ * @see Composition
+ * @see ReusableComposition
+ */
+interface PausableComposition : ReusableComposition {
+    /**
+     * Set the content of the composition. A [PausedComposition] that is currently paused. No
+     * composition is performed until [PausedComposition.resume] is called.
+     * [PausedComposition.resume] should be called until [PausedComposition.isComplete] is `true`.
+     * The composition should not be used until [PausedComposition.isComplete] is `true` and
+     * [PausedComposition.apply] has been called.
+     *
+     * @see Composition.setContent
+     * @see ReusableComposition.setContentWithReuse
+     */
+    fun setPausableContent(content: @Composable () -> Unit): PausedComposition
+
+    /**
+     * Set the content of a resuable composition. A [PausedComposition] that is currently paused. No
+     * composition is performed until [PausedComposition.resume] is called.
+     * [PausedComposition.resume] should be called until [PausedComposition.isComplete] is `true`.
+     * The composition should not be used until [PausedComposition.isComplete] is `true` and
+     * [PausedComposition.apply] has been called.
+     *
+     * @see Composition.setContent
+     * @see ReusableComposition.setContentWithReuse
+     */
+    fun setPausableContentWithReuse(content: @Composable () -> Unit): PausedComposition
+}
+
+/**
+ * [PausedComposition] is the result of calling [PausableComposition.setContent] or
+ * [PausableComposition.setContentWithReuse]. It is used to drive the paused composition to
+ * completion. A [PausedComposition] should not be used until [isComplete] is `true` and [apply] has
+ * been called.
+ *
+ * A [PausedComposition] is created paused and will only compose the `content` parameter when
+ * [resume] is called the first time.
+ */
+interface PausedComposition {
+    /**
+     * Returns `true` when the [PausedComposition] is complete. [isComplete] matches the last value
+     * returned from [resume]. Once a [PausedComposition] is [isComplete] the [apply] method should
+     * be called.
+     */
+    val isComplete: Boolean
+
+    /**
+     * Resume the composition that has been paused. This method should be called until [resume]
+     * returns `true` or [isComplete] is `true` which has the same result as the last result of
+     * calling [resume]. The [shouldPause] parameter is a lambda that returns whether the
+     * composition should be paused. For example, in lazy lists this returns `false` until just
+     * prior to the next frame starting in which it returns `true`
+     *
+     * Calling [resume] after it returns `true` or when `isComplete` is true will throw an
+     * exception.
+     *
+     * @param shouldPause A lambda that is used to determine if the composition should be paused.
+     *   This lambda is called often so should be a very simple calculation. Returning `true` does
+     *   not guarantee the composition will pause, it should only be considered a request to pause
+     *   the composition. Not all composable functions are pausable and only pausable composition
+     *   functions will pause.
+     * @return `true` if the composition is complete and `false` if one or more calls to `resume`
+     *   are required to complete composition.
+     */
+    fun resume(shouldPause: () -> Boolean): Boolean
+
+    /**
+     * Apply the composition. This is the last step of a paused composition and is required to be
+     * called prior to the composition is usable.
+     */
+    fun apply()
+
+    /**
+     * Cancels the paused composition. This should only be used if the composition is going to be
+     * disposed and the entire composition is not going to be used.
+     */
+    fun cancel()
+}
+
+/**
+ * Create a [PausableComposition]. A [PausableComposition] can create a [PausedComposition] which
+ * allows pausing and resuming the composition.
+ *
+ * @param applier The [Applier] instance to be used in the composition.
+ * @param parent The parent [CompositionContext].
+ * @see Applier
+ * @see CompositionContext
+ * @see PausableComposition
+ */
+fun PausableComposition(applier: Applier<*>, parent: CompositionContext): PausableComposition =
+    CompositionImpl(parent, applier)
+
+internal enum class PausedCompositionState {
+    Invalid,
+    Cancelled,
+    InitialPending,
+    RecomposePending,
+    ApplyPending,
+    Applied,
+}
+
+internal class PausedCompositionImpl(
+    val composition: CompositionImpl,
+    val context: CompositionContext,
+    val composer: ComposerImpl,
+    abandonSet: MutableSet<RememberObserver>,
+    val content: @Composable () -> Unit,
+    val reusable: Boolean,
+    val applier: Applier<*>,
+    val lock: SynchronizedObject,
+) : PausedComposition {
+    private var state = PausedCompositionState.InitialPending
+    private var invalidScopes = emptyScatterSet<RecomposeScopeImpl>()
+    internal val rememberManager = RememberEventDispatcher(abandonSet)
+    internal val pausableApplier = RecordingApplier(applier.current)
+
+    override val isComplete: Boolean
+        get() = state >= PausedCompositionState.ApplyPending
+
+    override fun resume(shouldPause: () -> Boolean): Boolean {
+        try {
+            when (state) {
+                PausedCompositionState.InitialPending -> {
+                    if (reusable) composer.startReuseFromRoot()
+                    try {
+                        invalidScopes =
+                            context.composeInitialPaused(composition, shouldPause, content)
+                    } finally {
+                        if (reusable) composer.endReuseFromRoot()
+                    }
+                    state = PausedCompositionState.RecomposePending
+                    if (invalidScopes.isEmpty()) markComplete()
+                }
+                PausedCompositionState.RecomposePending -> {
+                    invalidScopes = context.recomposePaused(composition, shouldPause, invalidScopes)
+                    if (invalidScopes.isEmpty()) markComplete()
+                }
+                PausedCompositionState.ApplyPending ->
+                    error("Pausable composition is complete and apply() should be applied")
+                PausedCompositionState.Applied -> error("The paused composition has been applied")
+                PausedCompositionState.Cancelled ->
+                    error("The paused composition has been cancelled")
+                PausedCompositionState.Invalid ->
+                    error("The paused composition is invalid because of a previous exception")
+            }
+        } catch (e: Exception) {
+            state = PausedCompositionState.Invalid
+        }
+        return isComplete
+    }
+
+    override fun apply() {
+        try {
+            when (state) {
+                PausedCompositionState.InitialPending,
+                PausedCompositionState.RecomposePending ->
+                    error("The paused composition has not completed yet")
+                PausedCompositionState.ApplyPending -> {
+                    applyChanges()
+                    state = PausedCompositionState.Applied
+                }
+                PausedCompositionState.Applied ->
+                    error("The paused composition has already been applied")
+                PausedCompositionState.Cancelled ->
+                    error("The paused composition has been cancelled")
+                PausedCompositionState.Invalid ->
+                    error("The paused composition is invalid because of a previous exception")
+            }
+        } catch (e: Exception) {
+            state = PausedCompositionState.Invalid
+            throw e
+        }
+    }
+
+    override fun cancel() {
+        state = PausedCompositionState.Cancelled
+        rememberManager.dispatchAbandons()
+        composition.pausedCompositionFinished()
+    }
+
+    private fun markComplete() {
+        state = PausedCompositionState.ApplyPending
+    }
+
+    private fun applyChanges() {
+        synchronized(lock) {
+            @Suppress("UNCHECKED_CAST")
+            try {
+                pausableApplier.playTo(applier as Applier<Any?>)
+                rememberManager.dispatchRememberObservers()
+                rememberManager.dispatchSideEffects()
+            } finally {
+                rememberManager.dispatchAbandons()
+                composition.pausedCompositionFinished()
+            }
+        }
+    }
+}
+
+internal class RecordingApplier<N>(root: N) : Applier<N> {
+    private val stack = mutableObjectListOf<N>()
+    private val operations = mutableIntListOf()
+    private val instances = mutableObjectListOf<Any?>()
+
+    override var current: N = root
+
+    override fun down(node: N) {
+        operations.add(DOWN)
+        instances.add(node)
+        stack.add(current)
+        current = node
+    }
+
+    override fun up() {
+        operations.add(UP)
+        current = stack.removeAt(stack.size - 1)
+    }
+
+    override fun remove(index: Int, count: Int) {
+        operations.add(REMOVE)
+        operations.add(index)
+        operations.add(count)
+    }
+
+    override fun move(from: Int, to: Int, count: Int) {
+        operations.add(MOVE)
+        operations.add(from)
+        operations.add(to)
+        operations.add(count)
+    }
+
+    override fun clear() {
+        operations.add(CLEAR)
+    }
+
+    override fun insertBottomUp(index: Int, instance: N) {
+        operations.add(INSERT_BOTTOM_UP)
+        operations.add(index)
+        instances.add(instance)
+    }
+
+    override fun insertTopDown(index: Int, instance: N) {
+        operations.add(INSERT_TOP_DOWN)
+        operations.add(index)
+        instances.add(instance)
+    }
+
+    override fun apply(block: N.(Any?) -> Unit, value: Any?) {
+        operations.add(APPLY)
+        instances.add(block)
+        instances.add(value)
+    }
+
+    override fun reuse() {
+        operations.add(REUSE)
+    }
+
+    fun playTo(applier: Applier<N>) {
+        var currentOperation = 0
+        var currentInstance = 0
+        val operations = operations
+        val size = operations.size
+        val instances = instances
+        applier.onBeginChanges()
+        try {
+            while (currentOperation < size) {
+                val operation = operations[currentOperation++]
+                when (operation) {
+                    UP -> {
+                        applier.up()
+                    }
+                    DOWN -> {
+                        @Suppress("UNCHECKED_CAST") val node = instances[currentInstance++] as N
+                        applier.down(node)
+                    }
+                    REMOVE -> {
+                        val index = operations[currentOperation++]
+                        val count = operations[currentOperation++]
+                        applier.remove(index, count)
+                    }
+                    MOVE -> {
+                        val from = operations[currentOperation++]
+                        val to = operations[currentOperation++]
+                        val count = operations[currentOperation++]
+                        applier.move(from, to, count)
+                    }
+                    CLEAR -> {
+                        applier.clear()
+                    }
+                    INSERT_TOP_DOWN -> {
+                        val index = operations[currentOperation++]
+
+                        @Suppress("UNCHECKED_CAST") val instance = instances[currentInstance++] as N
+                        applier.insertTopDown(index, instance)
+                    }
+                    INSERT_BOTTOM_UP -> {
+                        val index = operations[currentOperation++]
+
+                        @Suppress("UNCHECKED_CAST") val instance = instances[currentInstance++] as N
+                        applier.insertBottomUp(index, instance)
+                    }
+                    APPLY -> {
+                        @Suppress("UNCHECKED_CAST")
+                        val block = instances[currentInstance++] as Any?.(Any?) -> Unit
+                        val value = instances[currentInstance++]
+                        applier.apply(block, value)
+                    }
+                    REUSE -> {
+                        applier.reuse()
+                    }
+                }
+            }
+            runtimeCheck(currentInstance == instances.size) { "Applier operation size mismatch" }
+            instances.clear()
+            operations.clear()
+        } finally {
+            applier.onEndChanges()
+        }
+    }
+
+    // These commands need to be an integer, not just a enum value, as they are stored along side
+    // the commands integer parameters, so the values are explicitly set.
+    companion object {
+        const val UP = 0
+        const val DOWN = UP + 1
+        const val REMOVE = DOWN + 1
+        const val MOVE = REMOVE + 1
+        const val CLEAR = MOVE + 1
+        const val INSERT_BOTTOM_UP = CLEAR + 1
+        const val INSERT_TOP_DOWN = INSERT_BOTTOM_UP + 1
+        const val APPLY = INSERT_TOP_DOWN + 1
+        const val REUSE = APPLY + 1
+    }
+}
diff --git a/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/RecomposeScopeImpl.kt b/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/RecomposeScopeImpl.kt
index 88245a7..48657d5 100644
--- a/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/RecomposeScopeImpl.kt
+++ b/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/RecomposeScopeImpl.kt
@@ -55,13 +55,16 @@
         ((lowBits shl 1) and highBits))
 }
 
-private const val UsedFlag = 0x01
-private const val DefaultsInScopeFlag = 0x02
-private const val DefaultsInvalidFlag = 0x04
-private const val RequiresRecomposeFlag = 0x08
-private const val SkippedFlag = 0x10
-private const val RereadingFlag = 0x20
-private const val ForcedRecomposeFlag = 0x40
+private const val UsedFlag = 0x001
+private const val DefaultsInScopeFlag = 0x002
+private const val DefaultsInvalidFlag = 0x004
+private const val RequiresRecomposeFlag = 0x008
+private const val SkippedFlag = 0x010
+private const val RereadingFlag = 0x020
+private const val ForcedRecomposeFlag = 0x040
+private const val ForceReusing = 0x080
+private const val Paused = 0x100
+private const val Resuming = 0x200
 
 internal interface RecomposeScopeOwner {
     fun invalidate(scope: RecomposeScopeImpl, instance: Any?): InvalidationResult
@@ -110,11 +113,51 @@
     var used: Boolean
         get() = flags and UsedFlag != 0
         set(value) {
-            if (value) {
-                flags = flags or UsedFlag
-            } else {
-                flags = flags and UsedFlag.inv()
-            }
+            flags =
+                if (value) {
+                    flags or UsedFlag
+                } else {
+                    flags and UsedFlag.inv()
+                }
+        }
+
+    /**
+     * Used to force a scope to the reusing state when a composition is paused while reusing
+     * content.
+     */
+    var reusing: Boolean
+        get() = flags and ForceReusing != 0
+        set(value) {
+            flags =
+                if (value) {
+                    flags or ForceReusing
+                } else {
+                    flags and ForceReusing.inv()
+                }
+        }
+
+    /** Used to flag a scope as paused for pausable compositions */
+    var paused: Boolean
+        get() = flags and Paused != 0
+        set(value) {
+            flags =
+                if (value) {
+                    flags or Paused
+                } else {
+                    flags and Paused.inv()
+                }
+        }
+
+    /** Used to flag a scope as paused for pausable compositions */
+    var resuming: Boolean
+        get() = flags and Resuming != 0
+        set(value) {
+            flags =
+                if (value) {
+                    flags or Resuming
+                } else {
+                    flags and Resuming.inv()
+                }
         }
 
     /**
@@ -299,7 +342,9 @@
     }
 
     fun scopeSkipped() {
-        skipped = true
+        if (!reusing) {
+            skipped = true
+        }
     }
 
     /**
diff --git a/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/Recomposer.kt b/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/Recomposer.kt
index 67d0d8a..e347076 100644
--- a/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/Recomposer.kt
+++ b/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/Recomposer.kt
@@ -17,12 +17,15 @@
 package androidx.compose.runtime
 
 import androidx.collection.MutableScatterSet
+import androidx.collection.ScatterSet
+import androidx.collection.emptyScatterSet
 import androidx.collection.mutableScatterSetOf
 import androidx.compose.runtime.collection.fastForEach
 import androidx.compose.runtime.collection.mutableVectorOf
 import androidx.compose.runtime.collection.wrapIntoSet
 import androidx.compose.runtime.external.kotlinx.collections.immutable.persistentSetOf
 import androidx.compose.runtime.internal.AtomicReference
+import androidx.compose.runtime.internal.SnapshotThreadLocal
 import androidx.compose.runtime.internal.logError
 import androidx.compose.runtime.internal.trace
 import androidx.compose.runtime.snapshots.MutableSnapshot
@@ -232,6 +235,7 @@
     // End properties guarded by stateLock
 
     private val _state = MutableStateFlow(State.Inactive)
+    private val pausedScopes = SnapshotThreadLocal<MutableScatterSet<RecomposeScopeImpl>?>()
 
     /**
      * A [Job] used as a parent of any effects created by this [Recomposer]'s compositions. Its
@@ -1116,6 +1120,54 @@
         }
     }
 
+    internal override fun composeInitialPaused(
+        composition: ControlledComposition,
+        shouldPause: () -> Boolean,
+        content: @Composable () -> Unit
+    ): ScatterSet<RecomposeScopeImpl> {
+        return try {
+            composition.pausable(shouldPause) {
+                composeInitial(composition, content)
+                pausedScopes.get() ?: emptyScatterSet()
+            }
+        } finally {
+            pausedScopes.set(null)
+        }
+    }
+
+    internal override fun recomposePaused(
+        composition: ControlledComposition,
+        shouldPause: () -> Boolean,
+        invalidScopes: ScatterSet<RecomposeScopeImpl>
+    ): ScatterSet<RecomposeScopeImpl> {
+        return try {
+            recordComposerModifications()
+            composition.recordModificationsOf(invalidScopes.wrapIntoSet())
+            composition.pausable(shouldPause) {
+                val needsApply = performRecompose(composition, null)
+                if (needsApply != null) {
+                    performInitialMovableContentInserts(composition)
+                    needsApply.applyChanges()
+                    needsApply.applyLateChanges()
+                }
+                pausedScopes.get() ?: emptyScatterSet()
+            }
+        } finally {
+            pausedScopes.set(null)
+        }
+    }
+
+    override fun reportPausedScope(scope: RecomposeScopeImpl) {
+        val scopes =
+            pausedScopes.get()
+                ?: run {
+                    val newScopes = mutableScatterSetOf<RecomposeScopeImpl>()
+                    pausedScopes.set(newScopes)
+                    newScopes
+                }
+        scopes.add(scope)
+    }
+
     private fun performInitialMovableContentInserts(composition: ControlledComposition) {
         synchronized(stateLock) {
             if (!compositionValuesAwaitingInsert.fastAny { it.composition == composition }) return
diff --git a/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/changelist/ChangeList.kt b/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/changelist/ChangeList.kt
index 4780cdb..e2eaa76 100644
--- a/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/changelist/ChangeList.kt
+++ b/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/changelist/ChangeList.kt
@@ -25,6 +25,7 @@
 import androidx.compose.runtime.InternalComposeApi
 import androidx.compose.runtime.MovableContentState
 import androidx.compose.runtime.MovableContentStateReference
+import androidx.compose.runtime.RecomposeScopeImpl
 import androidx.compose.runtime.RememberManager
 import androidx.compose.runtime.RememberObserver
 import androidx.compose.runtime.SlotTable
@@ -40,6 +41,7 @@
 import androidx.compose.runtime.changelist.Operation.EndCompositionScope
 import androidx.compose.runtime.changelist.Operation.EndCurrentGroup
 import androidx.compose.runtime.changelist.Operation.EndMovableContentPlacement
+import androidx.compose.runtime.changelist.Operation.EndResumingScope
 import androidx.compose.runtime.changelist.Operation.EnsureGroupStarted
 import androidx.compose.runtime.changelist.Operation.EnsureRootGroupStarted
 import androidx.compose.runtime.changelist.Operation.InsertSlots
@@ -48,11 +50,13 @@
 import androidx.compose.runtime.changelist.Operation.MoveNode
 import androidx.compose.runtime.changelist.Operation.ReleaseMovableGroupAtCurrent
 import androidx.compose.runtime.changelist.Operation.Remember
+import androidx.compose.runtime.changelist.Operation.RememberPausingScope
 import androidx.compose.runtime.changelist.Operation.RemoveCurrentGroup
 import androidx.compose.runtime.changelist.Operation.RemoveNode
 import androidx.compose.runtime.changelist.Operation.ResetSlots
 import androidx.compose.runtime.changelist.Operation.SideEffect
 import androidx.compose.runtime.changelist.Operation.SkipToEndOfCurrentGroup
+import androidx.compose.runtime.changelist.Operation.StartResumingScope
 import androidx.compose.runtime.changelist.Operation.TrimParentValues
 import androidx.compose.runtime.changelist.Operation.UpdateAnchoredValue
 import androidx.compose.runtime.changelist.Operation.UpdateAuxData
@@ -87,6 +91,18 @@
         operations.push(Remember) { setObject(Remember.Value, value) }
     }
 
+    fun pushRememberPausingScope(scope: RecomposeScopeImpl) {
+        operations.push(RememberPausingScope) { setObject(RememberPausingScope.Scope, scope) }
+    }
+
+    fun pushStartResumingScope(scope: RecomposeScopeImpl) {
+        operations.push(StartResumingScope) { setObject(StartResumingScope.Scope, scope) }
+    }
+
+    fun pushEndResumingScope(scope: RecomposeScopeImpl) {
+        operations.push(EndResumingScope) { setObject(EndResumingScope.Scope, scope) }
+    }
+
     fun pushUpdateValue(value: Any?, groupSlotIndex: Int) {
         operations.push(UpdateValue) {
             setObject(UpdateValue.Value, value)
diff --git a/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/changelist/ComposerChangeListWriter.kt b/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/changelist/ComposerChangeListWriter.kt
index 74c7146..9b87d45 100644
--- a/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/changelist/ComposerChangeListWriter.kt
+++ b/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/changelist/ComposerChangeListWriter.kt
@@ -25,6 +25,7 @@
 import androidx.compose.runtime.InternalComposeApi
 import androidx.compose.runtime.MovableContentState
 import androidx.compose.runtime.MovableContentStateReference
+import androidx.compose.runtime.RecomposeScopeImpl
 import androidx.compose.runtime.RememberObserver
 import androidx.compose.runtime.SlotReader
 import androidx.compose.runtime.SlotTable
@@ -192,6 +193,18 @@
         changeList.pushRemember(value)
     }
 
+    fun rememberPausingScope(scope: RecomposeScopeImpl) {
+        changeList.pushRememberPausingScope(scope)
+    }
+
+    fun startResumingScope(scope: RecomposeScopeImpl) {
+        changeList.pushStartResumingScope(scope)
+    }
+
+    fun endResumingScope(scope: RecomposeScopeImpl) {
+        changeList.pushEndResumingScope(scope)
+    }
+
     fun updateValue(value: Any?, groupSlotIndex: Int) {
         pushSlotTableOperationPreamble(useParentSlot = true)
         changeList.pushUpdateValue(value, groupSlotIndex)
diff --git a/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/changelist/Operation.kt b/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/changelist/Operation.kt
index 894f3d47..15aa234 100644
--- a/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/changelist/Operation.kt
+++ b/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/changelist/Operation.kt
@@ -18,7 +18,6 @@
 
 import androidx.compose.runtime.Anchor
 import androidx.compose.runtime.Applier
-import androidx.compose.runtime.ComposeNodeLifecycleCallback
 import androidx.compose.runtime.Composition
 import androidx.compose.runtime.CompositionContext
 import androidx.compose.runtime.ControlledComposition
@@ -171,6 +170,66 @@
         }
     }
 
+    object RememberPausingScope : Operation(objects = 1) {
+        inline val Scope
+            get() = ObjectParameter<RecomposeScopeImpl>(0)
+
+        override fun objectParamName(parameter: ObjectParameter<*>): String =
+            when (parameter) {
+                Scope -> "scope"
+                else -> super.objectParamName(parameter)
+            }
+
+        override fun OperationArgContainer.execute(
+            applier: Applier<*>,
+            slots: SlotWriter,
+            rememberManager: RememberManager
+        ) {
+            val scope = getObject(Scope)
+            rememberManager.rememberPausingScope(scope)
+        }
+    }
+
+    object StartResumingScope : Operation(objects = 1) {
+        inline val Scope
+            get() = ObjectParameter<RecomposeScopeImpl>(0)
+
+        override fun objectParamName(parameter: ObjectParameter<*>): String =
+            when (parameter) {
+                Scope -> "scope"
+                else -> super.objectParamName(parameter)
+            }
+
+        override fun OperationArgContainer.execute(
+            applier: Applier<*>,
+            slots: SlotWriter,
+            rememberManager: RememberManager
+        ) {
+            val scope = getObject(Scope)
+            rememberManager.startResumingScope(scope)
+        }
+    }
+
+    object EndResumingScope : Operation(objects = 1) {
+        inline val Scope
+            get() = ObjectParameter<RecomposeScopeImpl>(0)
+
+        override fun objectParamName(parameter: ObjectParameter<*>): String =
+            when (parameter) {
+                Scope -> "scope"
+                else -> super.objectParamName(parameter)
+            }
+
+        override fun OperationArgContainer.execute(
+            applier: Applier<*>,
+            slots: SlotWriter,
+            rememberManager: RememberManager
+        ) {
+            val scope = getObject(Scope)
+            rememberManager.endResumingScope(scope)
+        }
+    }
+
     object AppendValue : Operation(objects = 2) {
         inline val Anchor
             get() = ObjectParameter<Anchor>(0)
@@ -467,7 +526,7 @@
             slots: SlotWriter,
             rememberManager: RememberManager
         ) {
-            (applier.current as ComposeNodeLifecycleCallback).onReuse()
+            applier.reuse()
         }
     }
 
@@ -492,7 +551,7 @@
         ) {
             val value = getObject(Value)
             val block = getObject(Block)
-            applier.current.block(value)
+            applier.apply(block, value)
         }
     }
 
diff --git a/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/internal/RememberEventDispatcher.kt b/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/internal/RememberEventDispatcher.kt
new file mode 100644
index 0000000..d9d78ea
--- /dev/null
+++ b/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/internal/RememberEventDispatcher.kt
@@ -0,0 +1,301 @@
+/*
+ * Copyright 2024 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.compose.runtime.internal
+
+import androidx.collection.MutableIntList
+import androidx.collection.MutableScatterMap
+import androidx.collection.MutableScatterSet
+import androidx.collection.mutableScatterMapOf
+import androidx.collection.mutableScatterSetOf
+import androidx.compose.runtime.ComposeNodeLifecycleCallback
+import androidx.compose.runtime.RecomposeScopeImpl
+import androidx.compose.runtime.RememberManager
+import androidx.compose.runtime.RememberObserver
+import androidx.compose.runtime.Stack
+import androidx.compose.runtime.snapshots.fastForEach
+
+/**
+ * Used as a placeholder for paused compositions to ensure the remembers are dispatch in the correct
+ * order. While the paused composition is resuming all remembered objects are placed into the this
+ * classes list instead of the main list. As remembers are dispatched, this will dispatch remembers
+ * to the object remembered in the paused composition's content in the order that they would have
+ * been dispatched had the composition not been paused.
+ */
+internal class PausedCompositionRemembers(private val abandoning: MutableSet<RememberObserver>) :
+    RememberObserver {
+    val pausedRemembers = mutableListOf<RememberObserver>()
+
+    override fun onRemembered() {
+        pausedRemembers.fastForEach {
+            abandoning.remove(it)
+            it.onRemembered()
+        }
+    }
+
+    // These are never called
+    override fun onForgotten() {}
+
+    override fun onAbandoned() {}
+}
+
+/** Helper for collecting remember observers for later strictly ordered dispatch. */
+internal class RememberEventDispatcher(private val abandoning: MutableSet<RememberObserver>) :
+    RememberManager {
+    private val remembering = mutableListOf<RememberObserver>()
+    private var currentRememberingList = remembering
+    private val leaving = mutableListOf<Any>()
+    private val sideEffects = mutableListOf<() -> Unit>()
+    private var releasing: MutableScatterSet<ComposeNodeLifecycleCallback>? = null
+    private var pausedPlaceholders:
+        MutableScatterMap<RecomposeScopeImpl, PausedCompositionRemembers>? =
+        null
+    private val pending = mutableListOf<Any>()
+    private val priorities = MutableIntList()
+    private val afters = MutableIntList()
+    private var nestedRemembersLists: Stack<MutableList<RememberObserver>>? = null
+
+    override fun remembering(instance: RememberObserver) {
+        currentRememberingList.add(instance)
+    }
+
+    override fun forgetting(
+        instance: RememberObserver,
+        endRelativeOrder: Int,
+        priority: Int,
+        endRelativeAfter: Int
+    ) {
+        recordLeaving(instance, endRelativeOrder, priority, endRelativeAfter)
+    }
+
+    override fun sideEffect(effect: () -> Unit) {
+        sideEffects += effect
+    }
+
+    override fun deactivating(
+        instance: ComposeNodeLifecycleCallback,
+        endRelativeOrder: Int,
+        priority: Int,
+        endRelativeAfter: Int
+    ) {
+        recordLeaving(instance, endRelativeOrder, priority, endRelativeAfter)
+    }
+
+    override fun releasing(
+        instance: ComposeNodeLifecycleCallback,
+        endRelativeOrder: Int,
+        priority: Int,
+        endRelativeAfter: Int
+    ) {
+        val releasing =
+            releasing ?: mutableScatterSetOf<ComposeNodeLifecycleCallback>().also { releasing = it }
+
+        releasing += instance
+        recordLeaving(instance, endRelativeOrder, priority, endRelativeAfter)
+    }
+
+    override fun rememberPausingScope(scope: RecomposeScopeImpl) {
+        val pausedPlaceholder = PausedCompositionRemembers(abandoning)
+        (pausedPlaceholders
+            ?: mutableScatterMapOf<RecomposeScopeImpl, PausedCompositionRemembers>().also {
+                pausedPlaceholders = it
+            })[scope] = pausedPlaceholder
+        this.currentRememberingList.add(pausedPlaceholder)
+    }
+
+    override fun startResumingScope(scope: RecomposeScopeImpl) {
+        val placeholder = pausedPlaceholders?.get(scope)
+        if (placeholder != null) {
+            (nestedRemembersLists
+                    ?: Stack<MutableList<RememberObserver>>().also { nestedRemembersLists = it })
+                .push(currentRememberingList)
+            currentRememberingList = placeholder.pausedRemembers
+        }
+    }
+
+    override fun endResumingScope(scope: RecomposeScopeImpl) {
+        val pausedPlaceholders = pausedPlaceholders
+        if (pausedPlaceholders != null) {
+            val placeholder = pausedPlaceholders[scope]
+            if (placeholder != null) {
+                nestedRemembersLists?.pop()?.let { currentRememberingList = it }
+                pausedPlaceholders.remove(scope)
+            }
+        }
+    }
+
+    fun dispatchRememberObservers() {
+        // Add any pending out-of-order forgotten objects
+        processPendingLeaving(Int.MIN_VALUE)
+
+        // Send forgets and node callbacks
+        if (leaving.isNotEmpty()) {
+            trace("Compose:onForgotten") {
+                val releasing = releasing
+                for (i in leaving.size - 1 downTo 0) {
+                    val instance = leaving[i]
+                    if (instance is RememberObserver) {
+                        abandoning.remove(instance)
+                        instance.onForgotten()
+                    }
+                    if (instance is ComposeNodeLifecycleCallback) {
+                        // node callbacks are in the same queue as forgets to ensure ordering
+                        if (releasing != null && instance in releasing) {
+                            instance.onRelease()
+                        } else {
+                            instance.onDeactivate()
+                        }
+                    }
+                }
+            }
+        }
+
+        // Send remembers
+        if (remembering.isNotEmpty()) {
+            trace("Compose:onRemembered") { dispatchRememberList(remembering) }
+        }
+    }
+
+    private fun dispatchRememberList(list: List<RememberObserver>) {
+        list.fastForEach { instance ->
+            abandoning.remove(instance)
+            instance.onRemembered()
+        }
+    }
+
+    fun dispatchSideEffects() {
+        if (sideEffects.isNotEmpty()) {
+            trace("Compose:sideeffects") {
+                sideEffects.fastForEach { sideEffect -> sideEffect() }
+                sideEffects.clear()
+            }
+        }
+    }
+
+    fun dispatchAbandons() {
+        if (abandoning.isNotEmpty()) {
+            trace("Compose:abandons") {
+                val iterator = abandoning.iterator()
+                // remove elements one by one to ensure that abandons will not be dispatched
+                // second time in case [onAbandoned] throws.
+                while (iterator.hasNext()) {
+                    val instance = iterator.next()
+                    iterator.remove()
+                    instance.onAbandoned()
+                }
+            }
+        }
+    }
+
+    private fun recordLeaving(
+        instance: Any,
+        endRelativeOrder: Int,
+        priority: Int,
+        endRelativeAfter: Int
+    ) {
+        processPendingLeaving(endRelativeOrder)
+        if (endRelativeAfter in 0 until endRelativeOrder) {
+            pending.add(instance)
+            priorities.add(priority)
+            afters.add(endRelativeAfter)
+        } else {
+            leaving.add(instance)
+        }
+    }
+
+    private fun processPendingLeaving(endRelativeOrder: Int) {
+        if (pending.isNotEmpty()) {
+            var index = 0
+            var toAdd: MutableList<Any>? = null
+            var toAddAfter: MutableIntList? = null
+            var toAddPriority: MutableIntList? = null
+            while (index < afters.size) {
+                if (endRelativeOrder <= afters[index]) {
+                    val instance = pending.removeAt(index)
+                    val endRelativeAfter = afters.removeAt(index)
+                    val priority = priorities.removeAt(index)
+
+                    if (toAdd == null) {
+                        toAdd = mutableListOf(instance)
+                        toAddAfter = MutableIntList().also { it.add(endRelativeAfter) }
+                        toAddPriority = MutableIntList().also { it.add(priority) }
+                    } else {
+                        toAddPriority as MutableIntList
+                        toAddAfter as MutableIntList
+                        toAdd.add(instance)
+                        toAddAfter.add(endRelativeAfter)
+                        toAddPriority.add(priority)
+                    }
+                } else {
+                    index++
+                }
+            }
+            if (toAdd != null) {
+                toAddPriority as MutableIntList
+                toAddAfter as MutableIntList
+
+                // Sort the list into [after, -priority] order where it is ordered by after
+                // in ascending order as the primary key and priority in descending order as
+                // secondary key.
+
+                // For example if remember occurs after a child group it must be added after
+                // all the remembers of the child. This is reported with an after which is the
+                // slot index of the child's last slot. As this slot might be at the same
+                // location as where its parents ends this would be ambiguous which should
+                // first if both the two groups request a slot to be after the same slot.
+                // Priority is used to break the tie here which is the group index of the group
+                // which is leaving. Groups that are lower must be added before the parent's
+                // remember when they have the same after.
+
+                // The sort must be stable as as consecutive remembers in the same group after
+                // the same child will have the same after and priority.
+
+                // A selection sort is used here because it is stable and the groups are
+                // typically very short so this quickly exit list of one and not loop for
+                // for sizes of 2. As the information is split between three lists, to
+                // reduce allocations, [MutableList.sort] cannot be used as it doesn't have
+                // an option to supply a custom swap.
+                for (i in 0 until toAdd.size - 1) {
+                    for (j in i + 1 until toAdd.size) {
+                        val iAfter = toAddAfter[i]
+                        val jAfter = toAddAfter[j]
+                        if (
+                            iAfter < jAfter ||
+                                (jAfter == iAfter && toAddPriority[i] < toAddPriority[j])
+                        ) {
+                            toAdd.swap(i, j)
+                            toAddPriority.swap(i, j)
+                            toAddAfter.swap(i, j)
+                        }
+                    }
+                }
+                leaving.addAll(toAdd)
+            }
+        }
+    }
+}
+
+private fun <T> MutableList<T>.swap(a: Int, b: Int) {
+    val item = this[a]
+    this[a] = this[b]
+    this[b] = item
+}
+
+private fun MutableIntList.swap(a: Int, b: Int) {
+    val item = this[a]
+    this[a] = this[b]
+    this[b] = item
+}
diff --git a/compose/runtime/runtime/src/nonEmulatorCommonTest/kotlin/androidx/compose/runtime/CompositionTests.kt b/compose/runtime/runtime/src/nonEmulatorCommonTest/kotlin/androidx/compose/runtime/CompositionTests.kt
index c2560dd..9328a61 100644
--- a/compose/runtime/runtime/src/nonEmulatorCommonTest/kotlin/androidx/compose/runtime/CompositionTests.kt
+++ b/compose/runtime/runtime/src/nonEmulatorCommonTest/kotlin/androidx/compose/runtime/CompositionTests.kt
@@ -4800,16 +4800,16 @@
 private val rob_reports_to_alice = Report("Rob", "Alice")
 private val clark_reports_to_lois = Report("Clark", "Lois")
 
-private interface Counted {
+internal interface Counted {
     val count: Int
 }
 
-private interface Ordered {
+internal interface Ordered {
     val rememberOrder: Int
     val forgetOrder: Int
 }
 
-private interface Named {
+internal interface Named {
     val name: String
 }
 
diff --git a/compose/runtime/runtime/src/nonEmulatorCommonTest/kotlin/androidx/compose/runtime/PausableCompositionTests.kt b/compose/runtime/runtime/src/nonEmulatorCommonTest/kotlin/androidx/compose/runtime/PausableCompositionTests.kt
new file mode 100644
index 0000000..36a9747
--- /dev/null
+++ b/compose/runtime/runtime/src/nonEmulatorCommonTest/kotlin/androidx/compose/runtime/PausableCompositionTests.kt
@@ -0,0 +1,606 @@
+/*
+ * Copyright 2024 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.compose.runtime
+
+import androidx.compose.runtime.mock.EmptyApplier
+import androidx.compose.runtime.mock.Linear
+import androidx.compose.runtime.mock.MockViewValidator
+import androidx.compose.runtime.mock.Text
+import androidx.compose.runtime.mock.View
+import androidx.compose.runtime.mock.ViewApplier
+import androidx.compose.runtime.mock.compositionTest
+import androidx.compose.runtime.mock.validate
+import androidx.compose.runtime.mock.view
+import kotlin.coroutines.resume
+import kotlin.test.Ignore
+import kotlin.test.Test
+import kotlin.test.assertEquals
+import kotlin.test.assertFalse
+import kotlin.test.assertTrue
+import kotlinx.coroutines.CancellableContinuation
+import kotlinx.coroutines.suspendCancellableCoroutine
+import kotlinx.coroutines.test.runTest
+
+@Stable
+class PausableCompositionTests {
+    @Test
+    fun canCreateARootPausableComposition() = runTest {
+        val recomposer = Recomposer(coroutineContext)
+        val pausableComposition = PausableComposition(EmptyApplier(), recomposer)
+        pausableComposition.dispose()
+        recomposer.cancel()
+        recomposer.close()
+    }
+
+    @Test
+    fun canCreateANestedPausableComposition() = compositionTest {
+        compose {
+            val parent = rememberCompositionContext()
+            DisposableEffect(Unit) {
+                val pausableComposition = PausableComposition(EmptyApplier(), parent)
+                onDispose { pausableComposition.dispose() }
+            }
+        }
+    }
+
+    @Test
+    fun canRecordAComposition() = compositionTest {
+        // This just tests the recording mechanism used in the tests below.
+        val recording = recordTest {
+            compose { A() }
+
+            validate { this.A() }
+        }
+
+        // Legend for the recording:
+        //  +N: Enter N for functions A, B, C, D, (where A:1 is the first lambda in A())
+        //  -N: Exit N
+        //  *N: Calling N (e.g *B is recorded before B() is called).
+        //  ^n: calling remember for some value
+
+        // Here we expect the normal, synchronous, execution as the recorded composition is not
+        // pausable. That is if we see a *B that should immediately followed by a B+ its content and
+        // a B-.
+        assertEquals(
+            recording,
+            "+A, ^z, ^Y, *B, +B, *Linear, +A:1, *C, +C, ^x, *Text, -C, *D, +D, +D:1, *C, +C, " +
+                "^x, *Text, -C, *C, +C, ^x, *Text, -C, *C, +C, ^x, *Text, -C, -D:1, -D, -A:1, " +
+                "-B, -A"
+        )
+    }
+
+    @Test
+    @Ignore // Requires compiler support
+    fun canPauseContent() = compositionTest {
+        val awaiter = Awaiter()
+        var receivedIteration = 0
+        val recording = recordTest {
+            compose {
+                PausableContent(
+                    normalWorkflow {
+                        receivedIteration = iteration
+                        awaiter.done()
+                    }
+                ) {
+                    A()
+                }
+            }
+            awaiter.await()
+        }
+        validate { this.PausableContent { this.A() } }
+        assertEquals(10, receivedIteration)
+
+        // Same Legend as canRecordAComposition
+        // Here we expect all functions to exit before the content of the function is executed
+        // because the above will pause at every pause point. If we see a B* we should not receive
+        // a B+ until after the caller finishes. (e.g. A-).
+        assertEquals(
+            recording,
+            "+A, ^z, ^Y, *B, -A, +B, *Linear, -B, +A:1, *C, *D, -A:1, +C, " +
+                "^x, *Text, -C, +D, -D, +D:1, *C, *C, *C, -D:1, +C, ^x, *Text, -C, +C, ^x, *Text, " +
+                "-C, +C, ^x, *Text, -C"
+        )
+    }
+
+    @Test
+    @Ignore // Requires compiler support
+    fun canPauseReusableContent() = compositionTest {
+        val awaiter = Awaiter()
+        var receivedIteration = 0
+        val recording = recordTest {
+            compose {
+                PausableContent(
+                    reuseWorkflow {
+                        receivedIteration = iteration
+                        awaiter.done()
+                    }
+                ) {
+                    A()
+                }
+            }
+            awaiter.await()
+        }
+        validate { this.PausableContent { this.A() } }
+        assertEquals(10, receivedIteration)
+        // Same Legend as canRecordAComposition
+        // Here we expect the result to be the same as if we were inserting new content as in
+        // canPauseContent
+        assertEquals(
+            "+A, ^z, ^Y, *B, -A, +B, *Linear, -B, +A:1, *C, *D, -A:1, +C, " +
+                "^x, *Text, -C, +D, -D, +D:1, *C, *C, *C, -D:1, +C, ^x, *Text, -C, +C, ^x, *Text, " +
+                "-C, +C, ^x, *Text, -C",
+            recording
+        )
+    }
+
+    @Test
+    @Ignore // Requires compiler support
+    fun canPauseReusingContent() = compositionTest {
+        val awaiter = Awaiter()
+        var recording = ""
+        val workflow: Workflow = {
+            // Create the content
+            setContentWithReuse()
+            resumeTillComplete { false }
+            apply()
+
+            // Reuse the content
+            recording = recordTest {
+                setContentWithReuse()
+                resumeTillComplete { true }
+                apply()
+            }
+            awaiter.done()
+        }
+
+        compose { PausableContent(workflow) { A() } }
+        awaiter.await()
+        // Same Legend as canRecordAComposition
+        // Here we expect the result to be the same as if we were inserting new content as in
+        // canPauseContent
+        assertArrayEquals(
+            ("+A, ^z, ^Y, *B, -A, +B, *Linear, -B, +A:1, *C, *D, -A:1, +C, " +
+                    "^x, *Text, -C, +D, -D, +D:1, *C, *C, *C, -D:1, +C, ^x, *Text, -C, +C, ^x, *Text, " +
+                    "-C, +C, ^x, *Text, -C")
+                .splitRecording(),
+            recording.splitRecording()
+        )
+    }
+
+    @Test
+    fun applierOnlyCalledInApply() = compositionTest {
+        val awaiter = Awaiter()
+        var applier: ViewApplier? = null
+
+        val workflow = workflow {
+            setContent()
+
+            assertFalse(applier?.called == true, "Applier was called during set content")
+
+            resumeTillComplete { false }
+
+            assertFalse(applier?.called == true, "Applier was called during resume")
+
+            apply()
+
+            assertTrue(applier?.called == true, "Applier wasn't called")
+
+            awaiter.done()
+        }
+
+        compose {
+            PausableContent(workflow, { view -> ViewApplier(view).also { applier = it } }) { A() }
+        }
+        awaiter.await()
+    }
+
+    @Test
+    @Ignore // Requires compiler support
+    fun rememberOnlyCalledInApply() = compositionTest {
+        val awaiter = Awaiter()
+        var onRememberCalled = false
+
+        val workflow = workflow {
+            setContent()
+            assertFalse(onRememberCalled, "onRemember called during set content")
+
+            resumeTillComplete {
+                assertFalse(onRememberCalled, "onRemember called during resume")
+                true
+            }
+            assertFalse(onRememberCalled, "onRemember called before resume returned")
+
+            apply()
+
+            assertTrue(onRememberCalled, "onRemember was not called in apply")
+
+            awaiter.done()
+        }
+
+        fun rememberedObject(name: String) =
+            object : RememberObserver {
+                val name = name
+
+                override fun onRemembered() {
+                    onRememberCalled = true
+                    report("+$name")
+                }
+
+                override fun onForgotten() {
+                    report("-$name")
+                }
+
+                override fun onAbandoned() {
+                    report("!$name")
+                }
+            }
+
+        val recording = recordTest {
+            compose {
+                PausableContent(workflow) {
+                    val a = remember { rememberedObject("a") }
+                    report("C(${a.name})")
+                    B {
+                        val b = remember { rememberedObject("b") }
+                        report("C(${b.name})")
+                        B {
+                            val c = remember { rememberedObject("c") }
+                            report("C(${c.name})")
+                            C()
+                            val d = remember { rememberedObject("d") }
+                            report("C(${d.name})")
+                            D()
+                        }
+                    }
+                }
+            }
+
+            awaiter.await()
+        }
+        // Same Legend as canRecordAComposition except the addition of the C(N) added above and
+        // +a, +b, etc. which records when the remembered object are sent the on-remember. This
+        // ensures that all onRemember calls are made after the composition has completed.
+        assertEquals(
+            "C(a), +B, *Linear, -B, C(b), +B, *Linear, -B, C(c), C(d), +C, ^x, *Text, -C, +D, " +
+                "-D, +D:1, *C, *C, *C, -D:1, +C, ^x, *Text, -C, +C, ^x, *Text, -C, +C, ^x, *Text, " +
+                "-C, +a, +b, +c, +d",
+            recording
+        )
+    }
+
+    @Suppress("ListIterator")
+    @Test
+    fun pausable_testRemember_RememberForgetOrder() = compositionTest {
+        var order = 0
+        val objects = mutableListOf<Any>()
+        val newRememberObject = { name: String ->
+            object : RememberObserver, Counted, Ordered, Named {
+                    override var name = name
+                    override var count = 0
+                    override var rememberOrder = -1
+                    override var forgetOrder = -1
+
+                    override fun onRemembered() {
+                        assertEquals(-1, rememberOrder, "Only one call to onRemembered expected")
+                        rememberOrder = order++
+                        count++
+                    }
+
+                    override fun onForgotten() {
+                        assertEquals(-1, forgetOrder, "Only one call to onForgotten expected")
+                        forgetOrder = order++
+                        count--
+                    }
+
+                    override fun onAbandoned() {
+                        assertEquals(0, count, "onAbandoned called after onRemembered")
+                    }
+                }
+                .also { objects.add(it) }
+        }
+
+        @Suppress("UNUSED_PARAMETER") fun used(v: Any) {}
+
+        @Composable
+        fun Tree() {
+            used(remember { newRememberObject("L0B") })
+            Linear {
+                used(remember { newRememberObject("L1B") })
+                Linear {
+                    used(remember { newRememberObject("L2B") })
+                    Linear {
+                        used(remember { newRememberObject("L3B") })
+                        Linear { used(remember { newRememberObject("Leaf") }) }
+                        used(remember { newRememberObject("L3A") })
+                    }
+                    used(remember { newRememberObject("L2A") })
+                }
+                used(remember { newRememberObject("L1A") })
+            }
+            used(remember { newRememberObject("L0A") })
+        }
+
+        val awaiter = Awaiter()
+        val workFlow = normalWorkflow { awaiter.done() }
+
+        compose { PausableContent(workFlow) { Tree() } }
+        awaiter.await()
+
+        // Legend:
+        //   L<N><B|A>: where N is the nesting level and B is before the children and
+        //     A is after the children.
+        //   Leaf: the object remembered in the middle.
+        // This is asserting that the remember order is the same as it would have been had the
+        //   above composition was not paused.
+        assertEquals(
+            "L0B, L1B, L2B, L3B, Leaf, L3A, L2A, L1A, L0A",
+            objects
+                .mapNotNull { it as? Ordered }
+                .sortedBy { it.rememberOrder }
+                .joinToString { (it as Named).name },
+            "Expected enter order",
+        )
+    }
+}
+
+fun String.splitRecording() = split(", ")
+
+typealias Workflow = suspend PausableContentWorkflowScope.() -> Unit
+
+fun workflow(workflow: Workflow): Workflow = workflow
+
+fun reuseWorkflow(done: Workflow = {}) = workflow {
+    setContentWithReuse()
+    resumeTillComplete { true }
+    apply()
+    done()
+}
+
+fun normalWorkflow(done: Workflow = {}) = workflow {
+    setContent()
+    resumeTillComplete { true }
+    apply()
+    done()
+}
+
+private interface TestRecorder {
+    fun log(message: String)
+
+    fun logs(): String
+
+    fun clear()
+}
+
+private var recorder: TestRecorder =
+    object : TestRecorder {
+        override fun log(message: String) {}
+
+        override fun logs(): String = ""
+
+        override fun clear() {}
+    }
+
+private inline fun recordTest(block: () -> Unit): String {
+    val result = mutableListOf<String>()
+    val oldRecorder = recorder
+    recorder =
+        object : TestRecorder {
+            override fun log(message: String) {
+                result.add(message)
+            }
+
+            override fun logs() = result.joinToString()
+
+            override fun clear() {
+                result.clear()
+            }
+        }
+    block()
+    recorder = oldRecorder
+    return result.joinToString()
+}
+
+private fun report(message: String) {
+    synchronized(recorder) { recorder.log(message) }
+}
+
+private inline fun report(message: String, block: () -> Unit) {
+    report("+$message")
+    block()
+    report("-$message")
+}
+
+@Composable
+private fun A() {
+    report("A") {
+        report("^z")
+        val z = remember { 0 }
+        report("^Y")
+        val y = remember { 1 }
+        Text("A: $z $y")
+        report("*B")
+        B {
+            report("A:1") {
+                report("*C")
+                C()
+                report("*D")
+                D()
+            }
+        }
+    }
+}
+
+private fun MockViewValidator.PausableContent(content: MockViewValidator.() -> Unit) {
+    this.view("PausableContentHost") { this.view("PausableContent", content) }
+}
+
+private fun MockViewValidator.A() {
+    Text("A: 0 1")
+    this.B {
+        this.C()
+        this.D()
+    }
+}
+
+@Composable
+private fun B(content: @Composable () -> Unit) {
+    report("B") {
+        report("*Linear")
+        Linear(content)
+    }
+}
+
+private fun MockViewValidator.B(content: MockViewValidator.() -> Unit) {
+    this.Linear(content)
+}
+
+@Composable
+private fun C() {
+    report("C") {
+        report("^x")
+        val x = remember { 3 }
+        report("*Text")
+        Text("C: $x")
+    }
+}
+
+private fun MockViewValidator.C() {
+    this.Text("C: 3")
+}
+
+@Composable
+private fun D() {
+    report("D") {
+        Linear {
+            report("D:1") {
+                repeat(3) {
+                    report("*C")
+                    C()
+                }
+            }
+        }
+    }
+}
+
+private fun MockViewValidator.D() {
+    this.Linear { repeat(3) { this.C() } }
+}
+
+interface PausableContentWorkflowScope {
+    val iteration: Int
+    val applied: Boolean
+
+    fun setContent(): PausedComposition
+
+    fun setContentWithReuse(): PausedComposition
+
+    fun resumeTillComplete(shouldPause: () -> Boolean)
+
+    fun apply()
+}
+
+fun PausableContentWorkflowScope.run(shouldPause: () -> Boolean = { true }) {
+    setContent()
+    resumeTillComplete(shouldPause)
+    apply()
+}
+
+class PausableContentWorkflowDriver(
+    private val composition: PausableComposition,
+    private val content: @Composable () -> Unit,
+    private var host: View?,
+    private var contentView: View?
+) : PausableContentWorkflowScope {
+    private var pausedComposition: PausedComposition? = null
+    override var iteration = 0
+    override val applied: Boolean
+        get() = host == null && pausedComposition == null
+
+    override fun setContent(): PausedComposition {
+        checkPrecondition(pausedComposition == null)
+        return composition.setPausableContent(content).also { pausedComposition = it }
+    }
+
+    override fun setContentWithReuse(): PausedComposition {
+        checkPrecondition(pausedComposition == null)
+        return composition.setPausableContentWithReuse(content).also { pausedComposition = it }
+    }
+
+    override fun resumeTillComplete(shouldPause: () -> Boolean) {
+        val pausedComposition = pausedComposition
+        checkPrecondition(pausedComposition != null)
+        while (!pausedComposition.isComplete) {
+            pausedComposition.resume(shouldPause)
+            iteration++
+        }
+    }
+
+    override fun apply() {
+        val pausedComposition = pausedComposition
+        checkPrecondition(pausedComposition != null && pausedComposition.isComplete)
+        pausedComposition.apply()
+        this.pausedComposition = null
+        val host = host
+        val contentView = contentView
+        if (host != null && contentView != null) {
+            host.children.add(contentView)
+            this.host = null
+            this.contentView = null
+        }
+    }
+}
+
+@Composable
+private fun PausableContent(
+    workflow: suspend PausableContentWorkflowScope.() -> Unit = { run() },
+    createApplier: (view: View) -> Applier<View> = { ViewApplier(it) },
+    content: @Composable () -> Unit
+) {
+    val host = View().also { it.name = "PausableContentHost" }
+    val pausableContent = View().also { it.name = "PausableContent" }
+    ComposeNode<View, ViewApplier>(factory = { host }, update = {})
+    val parent = rememberCompositionContext()
+    val composition =
+        remember(parent) { PausableComposition(createApplier(pausableContent), parent) }
+    LaunchedEffect(content as Any) {
+        val scope = PausableContentWorkflowDriver(composition, content, host, pausableContent)
+        scope.workflow()
+    }
+    DisposableEffect(Unit) { onDispose { composition.dispose() } }
+}
+
+private class Awaiter {
+    private var continuation: CancellableContinuation<Unit>? = null
+    private var done = false
+
+    suspend fun await() {
+        if (!done) {
+            suspendCancellableCoroutine { continuation = it }
+        }
+    }
+
+    fun resume() {
+        val current = continuation
+        continuation = null
+        current?.resume(Unit)
+    }
+
+    fun done() {
+        done = true
+        resume()
+    }
+}
diff --git a/compose/ui/ui/src/androidInstrumentedTest/kotlin/androidx/compose/ui/node/ModifierNodeVisitChildrenTest.kt b/compose/ui/ui/src/androidInstrumentedTest/kotlin/androidx/compose/ui/node/ModifierNodeVisitChildrenTest.kt
index 2273b6a..3ea0bed 100644
--- a/compose/ui/ui/src/androidInstrumentedTest/kotlin/androidx/compose/ui/node/ModifierNodeVisitChildrenTest.kt
+++ b/compose/ui/ui/src/androidInstrumentedTest/kotlin/androidx/compose/ui/node/ModifierNodeVisitChildrenTest.kt
@@ -19,6 +19,7 @@
 import androidx.compose.foundation.layout.Box
 import androidx.compose.ui.Modifier
 import androidx.compose.ui.test.junit4.createComposeRule
+import androidx.compose.ui.zIndex
 import androidx.test.ext.junit.runners.AndroidJUnit4
 import androidx.test.filters.SmallTest
 import com.google.common.truth.Truth.assertThat
@@ -104,6 +105,52 @@
         assertThat(visitedChildren).containsExactly(child1, child2, child3).inOrder()
     }
 
+    @Test
+    fun visitChildrenInOtherLayoutNodesInDrawOrder_zIndex() {
+        // Arrange.
+        abstract class TrackedNode : Modifier.Node()
+        val (node, child1, child2, child3) = List(5) { object : TrackedNode() {} }
+        val visitedChildren = mutableListOf<Modifier.Node>()
+        rule.setContent {
+            Box(Modifier.elementOf(node)) {
+                Box(Modifier.elementOf(child1).zIndex(10f))
+                Box(Modifier.elementOf(child2).zIndex(-10f))
+                Box(Modifier.elementOf(child3))
+            }
+        }
+
+        // Act.
+        rule.runOnIdle {
+            node.visitChildren(Nodes.Any, zOrder = true) {
+                @Suppress("KotlinConstantConditions") if (it is TrackedNode) visitedChildren.add(it)
+            }
+        }
+
+        // Assert.
+        assertThat(visitedChildren).containsExactly(child2, child3, child1).inOrder()
+    }
+
+    @Test
+    fun visitChildrenInOtherLayoutNodesInDrawOrder_subcompose() {
+        // Arrange.
+        val (node, child1, child2, child3) = List(5) { object : Modifier.Node() {} }
+        val visitedChildren = mutableListOf<Modifier.Node>()
+        rule.setContent {
+            ReverseMeasureLayout(
+                Modifier.elementOf(node),
+                { Box(Modifier.elementOf(child1)) },
+                { Box(Modifier.elementOf(child2)) },
+                { Box(Modifier.elementOf(child3)) }
+            )
+        }
+
+        // Act.
+        rule.runOnIdle { node.visitChildren(Nodes.Any, zOrder = true) { visitedChildren.add(it) } }
+
+        // Assert.
+        assertThat(visitedChildren).containsExactly(child1, child2, child3).inOrder()
+    }
+
     @Ignore("b/278765590")
     @Test
     fun skipsUnattachedLocalChild() {
diff --git a/compose/ui/ui/src/androidInstrumentedTest/kotlin/androidx/compose/ui/node/ModifierNodeVisitSubtreeIfTest.kt b/compose/ui/ui/src/androidInstrumentedTest/kotlin/androidx/compose/ui/node/ModifierNodeVisitSubtreeIfTest.kt
index 63f82d9..6f1e610 100644
--- a/compose/ui/ui/src/androidInstrumentedTest/kotlin/androidx/compose/ui/node/ModifierNodeVisitSubtreeIfTest.kt
+++ b/compose/ui/ui/src/androidInstrumentedTest/kotlin/androidx/compose/ui/node/ModifierNodeVisitSubtreeIfTest.kt
@@ -19,6 +19,7 @@
 import androidx.compose.foundation.layout.Box
 import androidx.compose.ui.Modifier
 import androidx.compose.ui.test.junit4.createComposeRule
+import androidx.compose.ui.zIndex
 import androidx.test.ext.junit.runners.AndroidJUnit4
 import androidx.test.filters.SmallTest
 import com.google.common.truth.Truth.assertThat
@@ -134,6 +135,58 @@
             .inOrder()
     }
 
+    @Test
+    fun visitsItemsAcrossLayoutNodesInDrawOrder_zIndex() {
+        // Arrange.
+        abstract class TrackedNode : Modifier.Node()
+        val (node, child1, child2, child3) = List(5) { object : TrackedNode() {} }
+        val visitedChildren = mutableListOf<Modifier.Node>()
+        rule.setContent {
+            Box(Modifier.elementOf(node)) {
+                Box(Modifier.elementOf(child1).zIndex(10f))
+                Box(Modifier.elementOf(child2).zIndex(-10f))
+                Box(Modifier.elementOf(child3))
+            }
+        }
+
+        // Act.
+        rule.runOnIdle {
+            node.visitSubtreeIf(Nodes.Any, zOrder = true) {
+                @Suppress("KotlinConstantConditions") if (it is TrackedNode) visitedChildren.add(it)
+                true
+            }
+        }
+
+        // Assert.
+        assertThat(visitedChildren).containsExactly(child2, child3, child1).inOrder()
+    }
+
+    @Test
+    fun visitsItemsAcrossLayoutNodesInDrawOrder_subcompose() {
+        // Arrange.
+        val (node, child1, child2, child3) = List(5) { object : Modifier.Node() {} }
+        val visitedChildren = mutableListOf<Modifier.Node>()
+        rule.setContent {
+            ReverseMeasureLayout(
+                Modifier.elementOf(node),
+                { Box(Modifier.elementOf(child1)) },
+                { Box(Modifier.elementOf(child2)) },
+                { Box(Modifier.elementOf(child3)) }
+            )
+        }
+
+        // Act.
+        rule.runOnIdle {
+            node.visitSubtreeIf(Nodes.Any, zOrder = true) {
+                visitedChildren.add(it)
+                true
+            }
+        }
+
+        // Assert.
+        assertThat(visitedChildren).containsExactly(child1, child2, child3).inOrder()
+    }
+
     @Ignore("b/278765590")
     @Test
     fun skipsUnattachedItems() {
diff --git a/compose/ui/ui/src/androidInstrumentedTest/kotlin/androidx/compose/ui/node/ModifierNodeVisitSubtreeTest.kt b/compose/ui/ui/src/androidInstrumentedTest/kotlin/androidx/compose/ui/node/ModifierNodeVisitSubtreeTest.kt
index 59d8aed..c2b585b5 100644
--- a/compose/ui/ui/src/androidInstrumentedTest/kotlin/androidx/compose/ui/node/ModifierNodeVisitSubtreeTest.kt
+++ b/compose/ui/ui/src/androidInstrumentedTest/kotlin/androidx/compose/ui/node/ModifierNodeVisitSubtreeTest.kt
@@ -19,6 +19,7 @@
 import androidx.compose.foundation.layout.Box
 import androidx.compose.ui.Modifier
 import androidx.compose.ui.test.junit4.createComposeRule
+import androidx.compose.ui.zIndex
 import androidx.test.ext.junit.runners.AndroidJUnit4
 import androidx.test.filters.SmallTest
 import com.google.common.truth.Truth.assertThat
@@ -67,9 +68,6 @@
         assertThat(visitedChildren).containsExactly(localChild1, localChild2).inOrder()
     }
 
-    // TODO(ralu): I feel that this order of visiting children is incorrect, and we should
-    //  visit children in the order of composition. So instead of a stack, we probably need
-    //  to use a queue to hold the intermediate nodes.
     @Test
     fun differentLayoutNodes() {
         // Arrange.
@@ -79,10 +77,10 @@
         val visitedChildren = mutableListOf<Modifier.Node>()
         rule.setContent {
             Box(Modifier.elementOf(node).elementOf(child1).elementOf(child2)) {
-                Box(Modifier.elementOf(child5).elementOf(child6)) {
-                    Box(Modifier.elementOf(child7).elementOf(child8))
+                Box(Modifier.elementOf(child3).elementOf(child4)) {
+                    Box(Modifier.elementOf(child5).elementOf(child6))
                 }
-                Box { Box(Modifier.elementOf(child3).elementOf(child4)) }
+                Box { Box(Modifier.elementOf(child7).elementOf(child8)) }
             }
         }
 
@@ -95,6 +93,54 @@
             .inOrder()
     }
 
+    @Test
+    fun differentLayoutNodesInDrawOrder_zIndex() {
+        // Arrange.
+        abstract class TrackedNode : Modifier.Node()
+        val (node, child1, child2, child3, child4) = List(5) { object : TrackedNode() {} }
+        val visitedChildren = mutableListOf<Modifier.Node>()
+        rule.setContent {
+            Box(Modifier.elementOf(node)) {
+                Box(Modifier.elementOf(child1))
+                Box(Modifier.elementOf(child2).zIndex(10f)) {
+                    Box(Modifier.elementOf(child3).zIndex(-10f))
+                }
+                Box { Box(Modifier.elementOf(child4)) }
+            }
+        }
+
+        // Act.
+        rule.runOnIdle {
+            node.visitSubtree(Nodes.Any, zOrder = true) {
+                @Suppress("KotlinConstantConditions") if (it is TrackedNode) visitedChildren.add(it)
+            }
+        }
+
+        // Assert.
+        assertThat(visitedChildren).containsExactly(child1, child4, child2, child3).inOrder()
+    }
+
+    @Test
+    fun differentLayoutNodesInDrawOrder_subcompose() {
+        // Arrange.
+        val (node, child1, child2, child3, child4) = List(5) { object : Modifier.Node() {} }
+        val visitedChildren = mutableListOf<Modifier.Node>()
+        rule.setContent {
+            ReverseMeasureLayout(
+                Modifier.elementOf(node),
+                { Box(Modifier.elementOf(child1)) },
+                { Box(Modifier.elementOf(child2)) { Box(Modifier.elementOf(child3)) } },
+                { Box { Box(Modifier.elementOf(child4)) } }
+            )
+        }
+
+        // Act.
+        rule.runOnIdle { node.visitSubtree(Nodes.Any, zOrder = true) { visitedChildren.add(it) } }
+
+        // Assert.
+        assertThat(visitedChildren).containsExactly(child1, child2, child3, child4).inOrder()
+    }
+
     @Ignore("b/278765590")
     @Test
     fun skipsUnattached() {
diff --git a/compose/ui/ui/src/androidInstrumentedTest/kotlin/androidx/compose/ui/node/NestedVectorStackTests.kt b/compose/ui/ui/src/androidInstrumentedTest/kotlin/androidx/compose/ui/node/NestedVectorStackTests.kt
deleted file mode 100644
index 74a3c0e..0000000
--- a/compose/ui/ui/src/androidInstrumentedTest/kotlin/androidx/compose/ui/node/NestedVectorStackTests.kt
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Copyright 2022 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package androidx.compose.ui.node
-
-import androidx.compose.runtime.collection.mutableVectorOf
-import org.junit.Assert
-import org.junit.Test
-
-class NestedVectorStackTests {
-
-    @Test
-    fun testPushPopOrder() {
-        val stack = NestedVectorStack<Int>()
-        stack.push(mutableVectorOf(1, 2, 3))
-        stack.push(mutableVectorOf(4, 5, 6))
-        stack.push(mutableVectorOf())
-        stack.push(mutableVectorOf(7))
-        stack.push(mutableVectorOf(8, 9))
-        val result = buildString {
-            while (stack.isNotEmpty()) {
-                append(stack.pop())
-            }
-        }
-        Assert.assertEquals("987654321", result)
-    }
-
-    @Test
-    fun testPopInBetweenPushes() {
-        val stack = NestedVectorStack<Int>()
-        stack.push(mutableVectorOf(1, 2, 3, 4))
-        stack.pop()
-        stack.push(mutableVectorOf(4, 5, 6))
-        stack.pop()
-        stack.pop()
-        stack.push(mutableVectorOf())
-        stack.push(mutableVectorOf(5, 6, 7))
-        stack.push(mutableVectorOf(8, 9))
-        val result = buildString {
-            while (stack.isNotEmpty()) {
-                append(stack.pop())
-            }
-        }
-        Assert.assertEquals("987654321", result)
-    }
-}
diff --git a/compose/ui/ui/src/androidInstrumentedTest/kotlin/androidx/compose/ui/node/NodeUtils.kt b/compose/ui/ui/src/androidInstrumentedTest/kotlin/androidx/compose/ui/node/NodeUtils.kt
index 29b9c0a..e77e8d6 100644
--- a/compose/ui/ui/src/androidInstrumentedTest/kotlin/androidx/compose/ui/node/NodeUtils.kt
+++ b/compose/ui/ui/src/androidInstrumentedTest/kotlin/androidx/compose/ui/node/NodeUtils.kt
@@ -16,7 +16,10 @@
 
 package androidx.compose.ui.node
 
+import androidx.compose.runtime.Composable
 import androidx.compose.ui.Modifier
+import androidx.compose.ui.layout.Placeable
+import androidx.compose.ui.layout.SubcomposeLayout
 import androidx.compose.ui.platform.InspectorInfo
 
 /**
@@ -38,3 +41,30 @@
         name = "testNode"
     }
 }
+
+@Composable
+internal fun ReverseMeasureLayout(modifier: Modifier, vararg contents: @Composable () -> Unit) =
+    SubcomposeLayout(modifier) { constraints ->
+        var layoutWidth = constraints.minWidth
+        var layoutHeight = constraints.minHeight
+        val subcomposes = mutableListOf<List<Placeable>>()
+
+        // Measure in reverse order
+        contents.reversed().forEachIndexed { index, content ->
+            subcomposes.add(
+                0,
+                subcompose(index, content).map {
+                    it.measure(constraints).also { placeable ->
+                        layoutWidth = maxOf(layoutWidth, placeable.width)
+                        layoutHeight = maxOf(layoutHeight, placeable.height)
+                    }
+                }
+            )
+        }
+
+        layout(layoutWidth, layoutHeight) {
+
+            // But place in direct order - it sets direct draw order
+            subcomposes.forEach { placeables -> placeables.forEach { it.place(0, 0) } }
+        }
+    }
diff --git a/compose/ui/ui/src/androidMain/kotlin/androidx/compose/ui/node/UiApplier.android.kt b/compose/ui/ui/src/androidMain/kotlin/androidx/compose/ui/node/UiApplier.android.kt
index ef4b85e..2a1655b 100644
--- a/compose/ui/ui/src/androidMain/kotlin/androidx/compose/ui/node/UiApplier.android.kt
+++ b/compose/ui/ui/src/androidMain/kotlin/androidx/compose/ui/node/UiApplier.android.kt
@@ -45,4 +45,8 @@
         super.onEndChanges()
         root.owner?.onEndApplyChanges()
     }
+
+    override fun reuse() {
+        current.onReuse()
+    }
 }
diff --git a/compose/ui/ui/src/androidUnitTest/kotlin/androidx/compose/ui/node/DelegatingNodeTest.kt b/compose/ui/ui/src/androidUnitTest/kotlin/androidx/compose/ui/node/DelegatingNodeTest.kt
index c1363c7..0328f65 100644
--- a/compose/ui/ui/src/androidUnitTest/kotlin/androidx/compose/ui/node/DelegatingNodeTest.kt
+++ b/compose/ui/ui/src/androidUnitTest/kotlin/androidx/compose/ui/node/DelegatingNodeTest.kt
@@ -209,14 +209,14 @@
             layout(d)
         }
         val recorder = Recorder()
-        x.visitSubtree(Nodes.Draw, recorder)
+        x.visitSubtree(Nodes.Draw, block = recorder)
         assertThat(recorder.recorded)
             .isEqualTo(
                 listOf(
                     a.wrapped,
                     b,
-                    d,
                     c.wrapped,
+                    d,
                 )
             )
     }
diff --git a/compose/ui/ui/src/androidUnitTest/kotlin/androidx/compose/ui/node/NestedVectorStackTest.kt b/compose/ui/ui/src/androidUnitTest/kotlin/androidx/compose/ui/node/NestedVectorStackTest.kt
deleted file mode 100644
index 951cb3d..0000000
--- a/compose/ui/ui/src/androidUnitTest/kotlin/androidx/compose/ui/node/NestedVectorStackTest.kt
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Copyright 2021 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package androidx.compose.ui.node
-
-import androidx.compose.runtime.collection.mutableVectorOf
-import com.google.common.truth.Truth
-import org.junit.Test
-import org.junit.runner.RunWith
-import org.junit.runners.JUnit4
-
-@RunWith(JUnit4::class)
-class NestedVectorStackTest {
-
-    @Test
-    fun testEnumerationOrder() {
-        val stack = NestedVectorStack<Int>()
-        stack.push(mutableVectorOf(1, 2, 3))
-        stack.push(mutableVectorOf(4, 5, 6))
-
-        Truth.assertThat(stack.enumerate()).isEqualTo(listOf(6, 5, 4, 3, 2, 1))
-    }
-
-    @Test
-    fun testEnumerationOrderPartiallyPoppingMiddleVectors() {
-        val stack = NestedVectorStack<Int>()
-        stack.push(mutableVectorOf(1, 2, 3))
-
-        Truth.assertThat(stack.pop()).isEqualTo(3)
-
-        stack.push(mutableVectorOf(4, 5, 6))
-
-        Truth.assertThat(stack.pop()).isEqualTo(6)
-
-        Truth.assertThat(stack.enumerate()).isEqualTo(listOf(5, 4, 2, 1))
-    }
-
-    @Test
-    fun testEnumerationOrderFullyPoppingMiddleVectors() {
-        val stack = NestedVectorStack<Int>()
-        stack.push(mutableVectorOf(1, 2, 3))
-
-        Truth.assertThat(stack.pop()).isEqualTo(3)
-        Truth.assertThat(stack.pop()).isEqualTo(2)
-        Truth.assertThat(stack.pop()).isEqualTo(1)
-
-        stack.push(mutableVectorOf(4, 5, 6))
-
-        Truth.assertThat(stack.pop()).isEqualTo(6)
-
-        Truth.assertThat(stack.enumerate()).isEqualTo(listOf(5, 4))
-    }
-}
-
-internal fun <T> NestedVectorStack<T>.enumerate(): List<T> {
-    val result = mutableListOf<T>()
-    var item: T? = pop()
-    while (item != null) {
-        result.add(item)
-        item = if (isNotEmpty()) pop() else null
-    }
-    return result
-}
diff --git a/compose/ui/ui/src/commonMain/kotlin/androidx/compose/ui/node/DelegatableNode.kt b/compose/ui/ui/src/commonMain/kotlin/androidx/compose/ui/node/DelegatableNode.kt
index ffc88f5..f967cd8 100644
--- a/compose/ui/ui/src/commonMain/kotlin/androidx/compose/ui/node/DelegatableNode.kt
+++ b/compose/ui/ui/src/commonMain/kotlin/androidx/compose/ui/node/DelegatableNode.kt
@@ -99,48 +99,33 @@
     return null
 }
 
-internal inline fun DelegatableNode.visitSubtree(mask: Int, block: (Modifier.Node) -> Unit) {
-    // TODO(lmr): we might want to add some safety wheels to prevent this from being called
-    //  while one of the chains is being diffed / updated.
-    checkPrecondition(node.isAttached) { "visitSubtree called on an unattached node" }
-    var node: Modifier.Node? = node.child
-    var layout: LayoutNode? = requireLayoutNode()
-    // we use this bespoke data structure here specifically for traversing children. In the
-    // depth first traversal you would typically do a `stack.addAll(node.children)` type
-    // call, but to avoid enumerating the vector and moving into our stack, we simply keep
-    // a stack of vectors and keep track of where we are in each
-    val nodes = NestedVectorStack<LayoutNode>()
-    while (layout != null) {
-        // NOTE: the ?: is important here for the starting condition, since we are starting
-        // at THIS node, and not the head of this node chain.
-        node = node ?: layout.nodes.head
-        if (node.aggregateChildKindSet and mask != 0) {
-            while (node != null) {
-                if (node.kindSet and mask != 0) {
-                    block(node)
-                }
-                node = node.child
-            }
-        }
-        node = null
-        nodes.push(layout._children)
-        layout = if (nodes.isNotEmpty()) nodes.pop() else null
+private fun LayoutNode.getChildren(zOrder: Boolean) =
+    if (zOrder) {
+        zSortedChildren
+    } else {
+        _children
     }
+
+private fun MutableVector<Modifier.Node>.addLayoutNodeChildren(
+    node: Modifier.Node,
+    zOrder: Boolean,
+) {
+    node.requireLayoutNode().getChildren(zOrder).forEachReversed { add(it.nodes.head) }
 }
 
-private fun MutableVector<Modifier.Node>.addLayoutNodeChildren(node: Modifier.Node) {
-    node.requireLayoutNode()._children.forEachReversed { add(it.nodes.head) }
-}
-
-internal inline fun DelegatableNode.visitChildren(mask: Int, block: (Modifier.Node) -> Unit) {
+internal inline fun DelegatableNode.visitChildren(
+    mask: Int,
+    zOrder: Boolean,
+    block: (Modifier.Node) -> Unit
+) {
     check(node.isAttached) { "visitChildren called on an unattached node" }
     val branches = mutableVectorOf<Modifier.Node>()
     val child = node.child
-    if (child == null) branches.addLayoutNodeChildren(node) else branches.add(child)
+    if (child == null) branches.addLayoutNodeChildren(node, zOrder) else branches.add(child)
     while (branches.isNotEmpty()) {
         val branch = branches.removeAt(branches.lastIndex)
         if (branch.aggregateChildKindSet and mask == 0) {
-            branches.addLayoutNodeChildren(branch)
+            branches.addLayoutNodeChildren(branch, zOrder)
             // none of these nodes match the mask, so don't bother traversing them
             continue
         }
@@ -159,11 +144,15 @@
  * visit the shallow tree of children of a given mask, but if block returns true, we will continue
  * traversing below it
  */
-internal inline fun DelegatableNode.visitSubtreeIf(mask: Int, block: (Modifier.Node) -> Boolean) {
+internal inline fun DelegatableNode.visitSubtreeIf(
+    mask: Int,
+    zOrder: Boolean,
+    block: (Modifier.Node) -> Boolean
+) {
     checkPrecondition(node.isAttached) { "visitSubtreeIf called on an unattached node" }
     val branches = mutableVectorOf<Modifier.Node>()
     val child = node.child
-    if (child == null) branches.addLayoutNodeChildren(node) else branches.add(child)
+    if (child == null) branches.addLayoutNodeChildren(node, zOrder) else branches.add(child)
     outer@ while (branches.isNotEmpty()) {
         val branch = branches.removeAt(branches.size - 1)
         if (branch.aggregateChildKindSet and mask != 0) {
@@ -176,7 +165,7 @@
                 node = node.child
             }
         }
-        branches.addLayoutNodeChildren(branch)
+        branches.addLayoutNodeChildren(branch, zOrder)
     }
 }
 
@@ -264,33 +253,41 @@
     return null
 }
 
-internal inline fun <reified T> DelegatableNode.visitSubtree(
-    type: NodeKind<T>,
-    block: (T) -> Unit
-) = visitSubtree(type.mask) { it.dispatchForKind(type, block) }
-
 internal inline fun <reified T> DelegatableNode.visitChildren(
     type: NodeKind<T>,
+    zOrder: Boolean = false,
     block: (T) -> Unit
-) = visitChildren(type.mask) { it.dispatchForKind(type, block) }
+) = visitChildren(type.mask, zOrder) { it.dispatchForKind(type, block) }
 
 internal inline fun <reified T> DelegatableNode.visitSelfAndChildren(
     type: NodeKind<T>,
+    zOrder: Boolean = false,
     block: (T) -> Unit
 ) {
     node.dispatchForKind(type, block)
-    visitChildren(type.mask) { it.dispatchForKind(type, block) }
+    visitChildren(type.mask, zOrder) { it.dispatchForKind(type, block) }
 }
 
 internal inline fun <reified T> DelegatableNode.visitSubtreeIf(
     type: NodeKind<T>,
+    zOrder: Boolean = false,
     block: (T) -> Boolean
 ) =
-    visitSubtreeIf(type.mask) foo@{ node ->
+    visitSubtreeIf(type.mask, zOrder) foo@{ node ->
         node.dispatchForKind(type) { if (!block(it)) return@foo false }
         true
     }
 
+internal inline fun <reified T> DelegatableNode.visitSubtree(
+    type: NodeKind<T>,
+    zOrder: Boolean = false,
+    block: (T) -> Unit
+) =
+    visitSubtreeIf(type.mask, zOrder) {
+        it.dispatchForKind(type, block)
+        true
+    }
+
 internal fun DelegatableNode.has(type: NodeKind<*>): Boolean =
     node.aggregateChildKindSet and type.mask != 0
 
diff --git a/compose/ui/ui/src/commonMain/kotlin/androidx/compose/ui/node/NestedVectorStack.kt b/compose/ui/ui/src/commonMain/kotlin/androidx/compose/ui/node/NestedVectorStack.kt
deleted file mode 100644
index 7f93d07..0000000
--- a/compose/ui/ui/src/commonMain/kotlin/androidx/compose/ui/node/NestedVectorStack.kt
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Copyright 2022 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package androidx.compose.ui.node
-
-import androidx.compose.runtime.collection.MutableVector
-
-internal class NestedVectorStack<T> {
-    // number of vectors in the stack
-    private var size = 0
-    // holds the current "top" index for each vector
-    private var currentIndexes = IntArray(16)
-    private var vectors = arrayOfNulls<MutableVector<T>>(16)
-
-    fun isNotEmpty(): Boolean {
-        return size > 0 && currentIndexes[size - 1] >= 0
-    }
-
-    fun pop(): T {
-        check(size > 0) { "Cannot call pop() on an empty stack. Guard with a call to isNotEmpty()" }
-        val indexOfVector = size - 1
-        val indexOfItem = currentIndexes[indexOfVector]
-        val vector = vectors[indexOfVector]!!
-        if (indexOfItem > 0) currentIndexes[indexOfVector]--
-        else if (indexOfItem == 0) {
-            vectors[indexOfVector] = null
-            size--
-        }
-        return vector[indexOfItem]
-    }
-
-    fun push(vector: MutableVector<T>) {
-        // if the vector is empty there is no reason for us to add it
-        if (vector.isEmpty()) return
-        val nextIndex = size
-        // check to see that we have capacity to add another vector
-        if (nextIndex >= currentIndexes.size) {
-            currentIndexes = currentIndexes.copyOf(currentIndexes.size * 2)
-            vectors = vectors.copyOf(vectors.size * 2)
-        }
-        currentIndexes[nextIndex] = vector.size - 1
-        vectors[nextIndex] = vector
-        size++
-    }
-}
diff --git a/graphics/graphics-core/src/main/java/androidx/graphics/lowlatency/CanvasFrontBufferedRenderer.kt b/graphics/graphics-core/src/main/java/androidx/graphics/lowlatency/CanvasFrontBufferedRenderer.kt
index 598c67e..2110acd 100644
--- a/graphics/graphics-core/src/main/java/androidx/graphics/lowlatency/CanvasFrontBufferedRenderer.kt
+++ b/graphics/graphics-core/src/main/java/androidx/graphics/lowlatency/CanvasFrontBufferedRenderer.kt
@@ -70,13 +70,15 @@
 @JvmOverloads
 constructor(
     surfaceView: SurfaceView,
-    private val callback: Callback<T>,
+    callback: Callback<T>,
     @HardwareBufferFormat val bufferFormat: Int = HardwareBuffer.RGBA_8888
 ) {
 
     /** Target SurfaceView for rendering */
     private var mSurfaceView: SurfaceView? = null
 
+    private var mCallback: Callback<T>? = null
+
     /**
      * Executor used to deliver callbacks for rendering as well as issuing surface control
      * transactions
@@ -185,6 +187,7 @@
 
     init {
         mSurfaceView = surfaceView
+        mCallback = callback
         surfaceView.holder.addCallback(mHolderCallback)
         with(surfaceView.holder) {
             if (surface != null && surface.isValid) {
@@ -253,7 +256,7 @@
                                     }
                                     canvas.drawColor(Color.BLACK, BlendMode.CLEAR)
                                 }
-                                callback.onDrawFrontBufferedLayer(canvas, width, height, param)
+                                mCallback?.onDrawFrontBufferedLayer(canvas, width, height, param)
                             }
 
                             @SuppressLint("WrongConstant")
@@ -293,7 +296,7 @@
                                             transformHint
                                         )
                                     }
-                                    callback.onFrontBufferedLayerRenderComplete(
+                                    mCallback?.onFrontBufferedLayerRenderComplete(
                                         frontBufferSurfaceControl,
                                         transaction
                                     )
@@ -460,7 +463,7 @@
             if (transform != BufferTransformHintResolver.UNKNOWN_TRANSFORM) {
                 transaction.setBufferTransform(parentSurfaceControl, transform)
             }
-            callback.onMultiBufferedLayerRenderComplete(
+            mCallback?.onMultiBufferedLayerRenderComplete(
                 frontBufferSurfaceControl,
                 parentSurfaceControl,
                 transaction
@@ -566,7 +569,7 @@
                     with(multiBufferedRenderer) {
                         mMultiBufferedRenderNode?.let { renderNode ->
                             val canvas = renderNode.beginRecording()
-                            callback.onDrawMultiBufferedLayer(canvas, width, height, params)
+                            mCallback?.onDrawMultiBufferedLayer(canvas, width, height, params)
                             renderNode.endRecording()
                         }
 
@@ -671,6 +674,7 @@
             mSurfaceView?.holder?.removeCallback(mHolderCallback)
             mSurfaceView = null
             releaseInternal(cancelPending) {
+                mCallback = null
                 onReleaseComplete?.invoke()
                 mHandlerThread.quit()
             }
diff --git a/health/connect/connect-client/build.gradle b/health/connect/connect-client/build.gradle
index 9b1779f..b212af5 100644
--- a/health/connect/connect-client/build.gradle
+++ b/health/connect/connect-client/build.gradle
@@ -77,8 +77,7 @@
     }
     testOptions.unitTests.includeAndroidResources = true
     namespace "androidx.health.connect.client"
-    compileSdk = 34
-    compileSdkExtension = 10
+    compileSdk = 35
     // TODO(b/352609562): Typedef with `toLong()`
     experimentalProperties["android.lint.useK2Uast"] = false
 }
diff --git a/health/connect/connect-client/samples/build.gradle b/health/connect/connect-client/samples/build.gradle
index 08b7839..fb7bcef 100644
--- a/health/connect/connect-client/samples/build.gradle
+++ b/health/connect/connect-client/samples/build.gradle
@@ -51,4 +51,5 @@
     defaultConfig {
         minSdkVersion 26
     }
+    compileSdk = 35
 }
diff --git a/health/connect/connect-client/src/androidTest/java/androidx/health/connect/client/impl/HealthConnectClientUpsideDownImplTest.kt b/health/connect/connect-client/src/androidTest/java/androidx/health/connect/client/impl/HealthConnectClientUpsideDownImplTest.kt
index 14f303c..ee854d9 100644
--- a/health/connect/connect-client/src/androidTest/java/androidx/health/connect/client/impl/HealthConnectClientUpsideDownImplTest.kt
+++ b/health/connect/connect-client/src/androidTest/java/androidx/health/connect/client/impl/HealthConnectClientUpsideDownImplTest.kt
@@ -95,7 +95,7 @@
                 context.packageName,
                 PackageManager.PackageInfoFlags.of(PackageManager.GET_PERMISSIONS.toLong())
             )
-            .requestedPermissions
+            .requestedPermissions!!
             .filter { it.startsWith(PERMISSION_PREFIX) }
             .toTypedArray()
 
diff --git a/health/connect/connect-client/src/androidTest/java/androidx/health/connect/client/impl/platform/aggregate/HealthConnectClientAggregationExtensionsTest.kt b/health/connect/connect-client/src/androidTest/java/androidx/health/connect/client/impl/platform/aggregate/HealthConnectClientAggregationExtensionsTest.kt
index 05fe60c..653ea0c 100644
--- a/health/connect/connect-client/src/androidTest/java/androidx/health/connect/client/impl/platform/aggregate/HealthConnectClientAggregationExtensionsTest.kt
+++ b/health/connect/connect-client/src/androidTest/java/androidx/health/connect/client/impl/platform/aggregate/HealthConnectClientAggregationExtensionsTest.kt
@@ -78,7 +78,7 @@
                 context.packageName,
                 PackageManager.PackageInfoFlags.of(PackageManager.GET_PERMISSIONS.toLong())
             )
-            .requestedPermissions
+            .requestedPermissions!!
             .filter { it.startsWith(PERMISSION_PREFIX) }
             .toTypedArray()
 
diff --git a/health/connect/connect-client/src/test/java/androidx/health/connect/client/HealthConnectClientTest.kt b/health/connect/connect-client/src/test/java/androidx/health/connect/client/HealthConnectClientTest.kt
index a03c963..e38be6e 100644
--- a/health/connect/connect-client/src/test/java/androidx/health/connect/client/HealthConnectClientTest.kt
+++ b/health/connect/connect-client/src/test/java/androidx/health/connect/client/HealthConnectClientTest.kt
@@ -312,7 +312,7 @@
         @Suppress("Deprecation")
         packageInfo.versionCode = versionCode
         packageInfo.applicationInfo = ApplicationInfo()
-        packageInfo.applicationInfo.enabled = enabled
+        packageInfo.applicationInfo!!.enabled = enabled
         val packageManager = context.packageManager
         Shadows.shadowOf(packageManager).installPackage(packageInfo)
     }
diff --git a/health/connect/connect-client/src/test/java/androidx/health/connect/client/feature/HealthConnectApkImplTest.kt b/health/connect/connect-client/src/test/java/androidx/health/connect/client/feature/HealthConnectApkImplTest.kt
index dea6c6b..47d6fb9 100644
--- a/health/connect/connect-client/src/test/java/androidx/health/connect/client/feature/HealthConnectApkImplTest.kt
+++ b/health/connect/connect-client/src/test/java/androidx/health/connect/client/feature/HealthConnectApkImplTest.kt
@@ -121,7 +121,7 @@
         @Suppress("Deprecation")
         packageInfo.versionCode = versionCode
         packageInfo.applicationInfo = ApplicationInfo()
-        packageInfo.applicationInfo.enabled = true
+        packageInfo.applicationInfo!!.enabled = true
         val packageManager = context.packageManager
         Shadows.shadowOf(packageManager).installPackage(packageInfo)
     }
diff --git a/health/connect/connect-client/src/test/java/androidx/health/connect/client/impl/HealthConnectClientImplTest.kt b/health/connect/connect-client/src/test/java/androidx/health/connect/client/impl/HealthConnectClientImplTest.kt
index f1480f8..2bb9f0c 100644
--- a/health/connect/connect-client/src/test/java/androidx/health/connect/client/impl/HealthConnectClientImplTest.kt
+++ b/health/connect/connect-client/src/test/java/androidx/health/connect/client/impl/HealthConnectClientImplTest.kt
@@ -841,7 +841,7 @@
         val packageInfo = PackageInfo()
         packageInfo.packageName = packageName
         packageInfo.applicationInfo = ApplicationInfo()
-        packageInfo.applicationInfo.enabled = enabled
+        packageInfo.applicationInfo!!.enabled = enabled
         val packageManager = context.packageManager
         Shadows.shadowOf(packageManager).installPackage(packageInfo)
     }
diff --git a/health/connect/connect-client/src/test/java/androidx/health/platform/client/impl/ServiceBackedHealthDataClientTest.kt b/health/connect/connect-client/src/test/java/androidx/health/platform/client/impl/ServiceBackedHealthDataClientTest.kt
index 609c83a5..ff2ea5c 100644
--- a/health/connect/connect-client/src/test/java/androidx/health/platform/client/impl/ServiceBackedHealthDataClientTest.kt
+++ b/health/connect/connect-client/src/test/java/androidx/health/platform/client/impl/ServiceBackedHealthDataClientTest.kt
@@ -186,7 +186,7 @@
         val packageInfo = PackageInfo()
         packageInfo.packageName = packageName
         packageInfo.applicationInfo = ApplicationInfo()
-        packageInfo.applicationInfo.enabled = enabled
+        packageInfo.applicationInfo!!.enabled = enabled
         val packageManager = context.packageManager
         shadowOf(packageManager).installPackage(packageInfo)
     }
diff --git a/health/connect/connect-testing/build.gradle b/health/connect/connect-testing/build.gradle
index 9439dd3..1c1c2b5 100644
--- a/health/connect/connect-testing/build.gradle
+++ b/health/connect/connect-testing/build.gradle
@@ -47,8 +47,7 @@
     }
     namespace "androidx.health.connect.testing"
     testOptions.unitTests.includeAndroidResources = true
-    compileSdk = 34
-    compileSdkExtension = 10
+    compileSdk = 35
 }
 
 androidx {
diff --git a/health/connect/connect-testing/samples/build.gradle b/health/connect/connect-testing/samples/build.gradle
index c93f7cc..a9cb0e5 100644
--- a/health/connect/connect-testing/samples/build.gradle
+++ b/health/connect/connect-testing/samples/build.gradle
@@ -51,6 +51,7 @@
     defaultConfig {
         minSdkVersion 26
     }
+    compileSdk = 35
 }
 
 tasks.withType(KotlinCompile).configureEach {
diff --git a/libraryversions.toml b/libraryversions.toml
index 184139a..e1109b6 100644
--- a/libraryversions.toml
+++ b/libraryversions.toml
@@ -1,6 +1,6 @@
 [versions]
 ACTIVITY = "1.10.0-alpha02"
-ANNOTATION = "1.9.0-alpha02"
+ANNOTATION = "1.9.0-alpha03"
 ANNOTATION_EXPERIMENTAL = "1.5.0-alpha01"
 APPCOMPAT = "1.8.0-alpha01"
 APPSEARCH = "1.1.0-alpha05"
diff --git a/pdf/pdf-viewer-fragment/src/main/java/androidx/pdf/viewer/fragment/PdfViewerFragment.kt b/pdf/pdf-viewer-fragment/src/main/java/androidx/pdf/viewer/fragment/PdfViewerFragment.kt
index d31fa8e..db1406d 100644
--- a/pdf/pdf-viewer-fragment/src/main/java/androidx/pdf/viewer/fragment/PdfViewerFragment.kt
+++ b/pdf/pdf-viewer-fragment/src/main/java/androidx/pdf/viewer/fragment/PdfViewerFragment.kt
@@ -66,6 +66,7 @@
 import androidx.pdf.widget.ZoomView
 import androidx.pdf.widget.ZoomView.ZoomScroll
 import com.google.android.material.floatingactionbutton.FloatingActionButton
+import java.io.IOException
 import kotlinx.coroutines.launch
 
 /**
@@ -298,7 +299,7 @@
                         }
                     }
                 },
-                onDocumentLoadFailure = { thrown -> onLoadDocumentError(thrown) }
+                onDocumentLoadFailure = { thrown -> showLoadingErrorView(thrown) }
             )
 
         setUpEditFab()
@@ -630,7 +631,7 @@
                 // app that owns it has been killed by the system. We will still recover,
                 // but log this.
                 viewState.set(ViewState.ERROR)
-                onLoadDocumentError(e)
+                showLoadingErrorView(e)
             }
         }
     }
@@ -737,6 +738,13 @@
         )
     }
 
+    private fun showLoadingErrorView(error: Throwable) {
+        context?.resources?.getString(R.string.error_cannot_open_pdf)?.let {
+            loadingView?.showErrorView(it)
+        }
+        onLoadDocumentError(error)
+    }
+
     private fun loadFile(fileUri: Uri) {
         Preconditions.checkNotNull(fileUri)
         Preconditions.checkArgument(
@@ -759,8 +767,13 @@
         try {
             validateFileUri(fileUri)
             fetchFile(fileUri)
-        } catch (e: SecurityException) {
-            onLoadDocumentError(e)
+        } catch (error: Exception) {
+            when (error) {
+                is IOException,
+                is SecurityException,
+                is NullPointerException -> showLoadingErrorView(error)
+                else -> throw error
+            }
         }
         if (localUri != null && localUri != fileUri) {
             annotationButton?.hide()
@@ -787,7 +800,7 @@
                 }
 
                 override fun failed(thrown: Throwable) {
-                    onLoadDocumentError(thrown)
+                    showLoadingErrorView(thrown)
                 }
 
                 override fun progress(progress: Float) {}
diff --git a/pdf/pdf-viewer/src/main/java/androidx/pdf/viewer/loader/PdfLoaderCallbacksImpl.kt b/pdf/pdf-viewer/src/main/java/androidx/pdf/viewer/loader/PdfLoaderCallbacksImpl.kt
index b5fbe27..6599830 100644
--- a/pdf/pdf-viewer/src/main/java/androidx/pdf/viewer/loader/PdfLoaderCallbacksImpl.kt
+++ b/pdf/pdf-viewer/src/main/java/androidx/pdf/viewer/loader/PdfLoaderCallbacksImpl.kt
@@ -228,9 +228,6 @@
                         "Document not loaded but status " + status.number
                     )
                 PdfStatus.PDF_ERROR -> {
-                    loadingView.showErrorView(
-                        context.resources.getString(R.string.error_cannot_open_pdf)
-                    )
                     handleError(status)
                 }
                 PdfStatus.FILE_ERROR,
diff --git a/wear/compose/compose-foundation/api/current.txt b/wear/compose/compose-foundation/api/current.txt
index ea38e1a..596925c 100644
--- a/wear/compose/compose-foundation/api/current.txt
+++ b/wear/compose/compose-foundation/api/current.txt
@@ -388,8 +388,10 @@
 
   public sealed interface LazyColumnLayoutInfo {
     method public int getTotalItemsCount();
+    method public long getViewportSize();
     method public java.util.List<androidx.wear.compose.foundation.lazy.LazyColumnVisibleItemInfo> getVisibleItems();
     property public abstract int totalItemsCount;
+    property public abstract long viewportSize;
     property public abstract java.util.List<androidx.wear.compose.foundation.lazy.LazyColumnVisibleItemInfo> visibleItems;
   }
 
diff --git a/wear/compose/compose-foundation/api/restricted_current.txt b/wear/compose/compose-foundation/api/restricted_current.txt
index ea38e1a..596925c 100644
--- a/wear/compose/compose-foundation/api/restricted_current.txt
+++ b/wear/compose/compose-foundation/api/restricted_current.txt
@@ -388,8 +388,10 @@
 
   public sealed interface LazyColumnLayoutInfo {
     method public int getTotalItemsCount();
+    method public long getViewportSize();
     method public java.util.List<androidx.wear.compose.foundation.lazy.LazyColumnVisibleItemInfo> getVisibleItems();
     property public abstract int totalItemsCount;
+    property public abstract long viewportSize;
     property public abstract java.util.List<androidx.wear.compose.foundation.lazy.LazyColumnVisibleItemInfo> visibleItems;
   }
 
diff --git a/wear/compose/compose-foundation/src/androidTest/kotlin/androidx/wear/compose/foundation/lazy/LazyColumnLayoutInfoTest.kt b/wear/compose/compose-foundation/src/androidTest/kotlin/androidx/wear/compose/foundation/lazy/LazyColumnLayoutInfoTest.kt
new file mode 100644
index 0000000..f07a6b7
--- /dev/null
+++ b/wear/compose/compose-foundation/src/androidTest/kotlin/androidx/wear/compose/foundation/lazy/LazyColumnLayoutInfoTest.kt
@@ -0,0 +1,196 @@
+/*
+ * Copyright 2024 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.wear.compose.foundation.lazy
+
+import androidx.compose.foundation.layout.Arrangement
+import androidx.compose.foundation.layout.Box
+import androidx.compose.foundation.layout.height
+import androidx.compose.foundation.layout.requiredSize
+import androidx.compose.foundation.layout.width
+import androidx.compose.runtime.Composable
+import androidx.compose.runtime.getValue
+import androidx.compose.runtime.mutableStateOf
+import androidx.compose.runtime.setValue
+import androidx.compose.ui.Modifier
+import androidx.compose.ui.test.junit4.createComposeRule
+import androidx.compose.ui.unit.Dp
+import androidx.compose.ui.unit.IntSize
+import androidx.compose.ui.unit.dp
+import androidx.test.ext.junit.runners.AndroidJUnit4
+import androidx.test.filters.MediumTest
+import com.google.common.truth.Truth.assertThat
+import com.google.common.truth.Truth.assertWithMessage
+import org.junit.Before
+import org.junit.Rule
+import org.junit.Test
+import org.junit.runner.RunWith
+
+@MediumTest
+@RunWith(AndroidJUnit4::class)
+class LazyColumnLayoutInfoTest {
+    @get:Rule val rule = createComposeRule()
+
+    private var itemSizePx: Int = 50
+    private var itemSizeDp: Dp = Dp.Infinity
+
+    @Before
+    fun before() {
+        with(rule.density) { itemSizeDp = itemSizePx.toDp() }
+    }
+
+    @Test
+    fun visibleItemsAreCorrect() {
+        lateinit var state: LazyColumnState
+
+        rule.setContent {
+            LazyColumn(
+                state = rememberLazyColumnState().also { state = it },
+                // Viewport take 4 items, item 0 is exactly above the center and there is space for
+                // two more items below the center line.
+                modifier = Modifier.requiredSize(itemSizeDp * 4f),
+                verticalArrangement = Arrangement.spacedBy(0.dp)
+            ) {
+                items((0..5).toList()) { Box(Modifier.requiredSize(itemSizeDp)) }
+            }
+        }
+
+        rule.runOnIdle {
+            assertThat(state.layoutInfo.viewportSize.height).isEqualTo(itemSizePx * 4)
+            // Start offset compensates for the layout where the first item is exactly above the
+            // center line.
+            state.layoutInfo.assertVisibleItems(count = 3, startOffset = itemSizePx)
+        }
+    }
+
+    @Test
+    fun visibleItemsAreCorrectWithSpacing() {
+        lateinit var state: LazyColumnState
+
+        rule.setContent {
+            LazyColumn(
+                state = rememberLazyColumnState().also { state = it },
+                // Viewport take 4 items, item 0 is exactly above the center and there is space for
+                // two more items below the center line.
+                modifier = Modifier.requiredSize(itemSizeDp * 4f),
+                verticalArrangement = Arrangement.spacedBy(itemSizeDp),
+            ) {
+                items((0..5).toList()) { Box(Modifier.requiredSize(itemSizeDp)) }
+            }
+        }
+
+        rule.runOnIdle {
+            assertThat(state.layoutInfo.viewportSize.height).isEqualTo(itemSizePx * 4)
+            // Start offset compensates for the layout where the first item is exactly above the
+            // center line.
+            state.layoutInfo.assertVisibleItems(
+                count = 2,
+                spacing = itemSizePx,
+                startOffset = itemSizePx
+            )
+        }
+    }
+
+    @Test
+    fun visibleItemsAreObservableWhenResize() {
+        lateinit var state: LazyColumnState
+        var size by mutableStateOf(itemSizeDp * 2)
+        var currentInfo: LazyColumnLayoutInfo? = null
+        @Composable
+        fun observingFun() {
+            currentInfo = state.layoutInfo
+        }
+        rule.setContent {
+            LazyColumn(
+                state = rememberLazyColumnState().also { state = it },
+                modifier = Modifier.requiredSize(itemSizeDp * 4f)
+            ) {
+                item { Box(Modifier.requiredSize(size)) }
+            }
+            observingFun()
+        }
+
+        rule.runOnIdle {
+            assertThat(currentInfo).isNotNull()
+            currentInfo!!.assertVisibleItems(count = 1, expectedSize = itemSizePx * 2)
+            currentInfo = null
+            size = itemSizeDp
+        }
+
+        rule.runOnIdle {
+            assertThat(currentInfo).isNotNull()
+            currentInfo!!.assertVisibleItems(count = 1, expectedSize = itemSizePx)
+        }
+    }
+
+    @Test
+    fun totalCountIsCorrect() {
+        var count by mutableStateOf(10)
+        lateinit var state: LazyColumnState
+        rule.setContent {
+            LazyColumn(state = rememberLazyColumnState().also { state = it }) {
+                items((0 until count).toList()) { Box(Modifier.requiredSize(10.dp)) }
+            }
+        }
+
+        rule.runOnIdle {
+            assertThat(state.layoutInfo.totalItemsCount).isEqualTo(10)
+            count = 20
+        }
+
+        rule.runOnIdle { assertThat(state.layoutInfo.totalItemsCount).isEqualTo(20) }
+    }
+
+    @Test
+    fun viewportOffsetsAndSizeAreCorrect() {
+        val sizePx = 45
+        val sizeDp = with(rule.density) { sizePx.toDp() }
+        lateinit var state: LazyColumnState
+        rule.setContent {
+            LazyColumn(
+                Modifier.height(sizeDp).width(sizeDp * 2),
+                state = rememberLazyColumnState().also { state = it }
+            ) {
+                items((0..3).toList()) { Box(Modifier.requiredSize(sizeDp)) }
+            }
+        }
+
+        rule.runOnIdle {
+            assertThat(state.layoutInfo.viewportSize).isEqualTo(IntSize(sizePx * 2, sizePx))
+        }
+    }
+
+    private fun LazyColumnLayoutInfo.assertVisibleItems(
+        count: Int,
+        startIndex: Int = 0,
+        startOffset: Int = 0,
+        expectedSize: Int = itemSizePx,
+        spacing: Int = 0
+    ) {
+        assertThat(visibleItems.size).isEqualTo(count)
+        var currentIndex = startIndex
+        var currentOffset = startOffset
+        visibleItems.forEach {
+            assertThat(it.index).isEqualTo(currentIndex)
+            assertWithMessage("Offset of item $currentIndex")
+                .that(it.offset)
+                .isEqualTo(currentOffset)
+            assertThat(it.height).isEqualTo(expectedSize)
+            currentIndex++
+            currentOffset += it.height + spacing
+        }
+    }
+}
diff --git a/wear/compose/compose-foundation/src/main/java/androidx/wear/compose/foundation/lazy/LazyColumnLayoutInfo.kt b/wear/compose/compose-foundation/src/main/java/androidx/wear/compose/foundation/lazy/LazyColumnLayoutInfo.kt
index c5002a0..14fc4c4 100644
--- a/wear/compose/compose-foundation/src/main/java/androidx/wear/compose/foundation/lazy/LazyColumnLayoutInfo.kt
+++ b/wear/compose/compose-foundation/src/main/java/androidx/wear/compose/foundation/lazy/LazyColumnLayoutInfo.kt
@@ -16,6 +16,8 @@
 
 package androidx.wear.compose.foundation.lazy
 
+import androidx.compose.ui.unit.IntSize
+
 /**
  * Scroll progress of an item in a [LazyColumn] before any modifications to the item's height are
  * applied (using [LazyColumnItemScope.transformedHeight] modifier).
@@ -44,21 +46,26 @@
 sealed interface LazyColumnVisibleItemInfo {
     /** The index of the item in the underlying data source. */
     val index: Int
+
     /** The offset of the item from the start of the visible area. */
     val offset: Int
+
     /** The height of the item after applying any height changes. */
     val height: Int
+
     /** The scroll progress of the item, indicating its position within the visible area. */
     val scrollProgress: LazyColumnItemScrollProgress
 }
 
 /** Holds the layout information for a [LazyColumn]. */
 sealed interface LazyColumnLayoutInfo {
+
     /** A list of [LazyColumnVisibleItemInfo] objects representing the visible items in the list. */
     val visibleItems: List<LazyColumnVisibleItemInfo>
 
     /** The total count of items passed to [LazyColumn]. */
     val totalItemsCount: Int
 
-    // TODO: b/352686661 - Expose more properties related to layout.
+    /** The size of the viewport in pixels. */
+    val viewportSize: IntSize
 }
diff --git a/wear/compose/compose-foundation/src/main/java/androidx/wear/compose/foundation/lazy/LazyColumnMeasureResult.kt b/wear/compose/compose-foundation/src/main/java/androidx/wear/compose/foundation/lazy/LazyColumnMeasureResult.kt
index 3f316f2..1bd6f3e 100644
--- a/wear/compose/compose-foundation/src/main/java/androidx/wear/compose/foundation/lazy/LazyColumnMeasureResult.kt
+++ b/wear/compose/compose-foundation/src/main/java/androidx/wear/compose/foundation/lazy/LazyColumnMeasureResult.kt
@@ -17,6 +17,7 @@
 package androidx.wear.compose.foundation.lazy
 
 import androidx.compose.ui.layout.MeasureResult
+import androidx.compose.ui.unit.IntSize
 
 /** The result of the measure pass of the [LazyColumn]. */
 internal class LazyColumnMeasureResult(
@@ -32,4 +33,8 @@
     override val visibleItems: List<LazyColumnVisibleItemInfo>,
     /** see [LazyColumnLayoutInfo.totalItemsCount] */
     override val totalItemsCount: Int,
-) : LazyColumnLayoutInfo, MeasureResult by measureResult
+) : LazyColumnLayoutInfo, MeasureResult by measureResult {
+    /** see [LazyColumnLayoutInfo.viewportSize] */
+    override val viewportSize: IntSize
+        get() = IntSize(width = width, height = height)
+}
diff --git a/wear/compose/compose-material3/src/androidTest/kotlin/androidx/wear/compose/material3/AlertDialogScreenshotTest.kt b/wear/compose/compose-material3/src/androidTest/kotlin/androidx/wear/compose/material3/AlertDialogScreenshotTest.kt
index e049442..7074d90 100644
--- a/wear/compose/compose-material3/src/androidTest/kotlin/androidx/wear/compose/material3/AlertDialogScreenshotTest.kt
+++ b/wear/compose/compose-material3/src/androidTest/kotlin/androidx/wear/compose/material3/AlertDialogScreenshotTest.kt
@@ -64,12 +64,12 @@
             testName = testName,
             screenshotRule = screenshotRule,
             showIcon = false,
-            showText = false,
             showContent = false,
             showTwoButtons = false,
             scrollToBottom = false,
             screenSize = screenSize,
-            titleText = "Network error"
+            titleText = "Network error",
+            messageText = null
         )
 
     @Test
@@ -78,12 +78,12 @@
             testName = testName,
             screenshotRule = screenshotRule,
             showIcon = false,
-            showText = false,
             showContent = false,
             showTwoButtons = true,
             scrollToBottom = false,
             screenSize = screenSize,
-            titleText = "Network error"
+            titleText = "Network error",
+            messageText = null
         )
 
     @Test
@@ -92,11 +92,11 @@
             testName = testName,
             screenshotRule = screenshotRule,
             showIcon = false,
-            showText = false,
             showContent = false,
             showTwoButtons = false,
             scrollToBottom = false,
             screenSize = screenSize,
+            messageText = null,
         )
 
     @Test
@@ -105,11 +105,11 @@
             testName = testName,
             screenshotRule = screenshotRule,
             showIcon = false,
-            showText = false,
             showContent = false,
             showTwoButtons = true,
             scrollToBottom = false,
-            screenSize = screenSize
+            screenSize = screenSize,
+            messageText = null
         )
 
     @Test
@@ -118,11 +118,11 @@
             testName = testName,
             screenshotRule = screenshotRule,
             showIcon = true,
-            showText = false,
             showContent = false,
             showTwoButtons = false,
             scrollToBottom = false,
-            screenSize = screenSize
+            screenSize = screenSize,
+            messageText = null
         )
     }
 
@@ -132,11 +132,11 @@
             testName = testName,
             screenshotRule = screenshotRule,
             showIcon = true,
-            showText = false,
             showContent = false,
             showTwoButtons = true,
             scrollToBottom = false,
-            screenSize = screenSize
+            screenSize = screenSize,
+            messageText = null
         )
     }
 
@@ -146,7 +146,6 @@
             testName = testName,
             screenshotRule = screenshotRule,
             showIcon = true,
-            showText = true,
             showContent = false,
             showTwoButtons = false,
             scrollToBottom = false,
@@ -162,7 +161,6 @@
             testName = testName,
             screenshotRule = screenshotRule,
             showIcon = true,
-            showText = true,
             showContent = true,
             showTwoButtons = false,
             scrollToBottom = false,
@@ -178,7 +176,6 @@
             testName = testName,
             screenshotRule = screenshotRule,
             showIcon = true,
-            showText = true,
             showContent = true,
             showTwoButtons = false,
             scrollToBottom = true,
@@ -194,7 +191,6 @@
             testName = testName,
             screenshotRule = screenshotRule,
             showIcon = true,
-            showText = true,
             showContent = true,
             showTwoButtons = true,
             scrollToBottom = true,
@@ -202,15 +198,43 @@
         )
     }
 
+    @Test
+    fun alert_title_longMessageText_bottomButton(@TestParameter screenSize: ScreenSize) {
+        rule.verifyAlertDialogScreenshot(
+            testName = testName,
+            screenshotRule = screenshotRule,
+            showIcon = false,
+            showContent = false,
+            showTwoButtons = false,
+            scrollToBottom = false,
+            screenSize = screenSize,
+            messageText = longMessageText
+        )
+    }
+
+    @Test
+    fun alert_title_longMessageText_confirmDismissButtons(@TestParameter screenSize: ScreenSize) {
+        rule.verifyAlertDialogScreenshot(
+            testName = testName,
+            screenshotRule = screenshotRule,
+            showIcon = false,
+            showContent = false,
+            showTwoButtons = true,
+            scrollToBottom = false,
+            screenSize = screenSize,
+            messageText = longMessageText
+        )
+    }
+
     private fun ComposeContentTestRule.verifyAlertDialogScreenshot(
         testName: TestName,
         screenshotRule: AndroidXScreenshotTestRule,
         showIcon: Boolean,
-        showText: Boolean,
         showContent: Boolean,
         showTwoButtons: Boolean,
         scrollToBottom: Boolean,
         screenSize: ScreenSize,
+        messageText: String? = "Your battery is low. Turn on battery saver.",
         titleText: String = "Mobile network is not currently available"
     ) {
         setContentWithTheme() {
@@ -239,8 +263,8 @@
                         } else null,
                     showTwoButtons = showTwoButtons,
                     text =
-                        if (showText) {
-                            { Text("Your battery is low. Turn on battery saver.") }
+                        if (messageText != null) {
+                            { Text(messageText) }
                         } else null,
                     content =
                         if (showContent) {
@@ -308,3 +332,6 @@
         )
     }
 }
+
+internal const val longMessageText =
+    "Allow Map to access your location even when you're not using the app? Your location is used to automatically map places to activities."
diff --git a/wear/compose/compose-material3/src/main/java/androidx/wear/compose/material3/AlertDialog.kt b/wear/compose/compose-material3/src/main/java/androidx/wear/compose/material3/AlertDialog.kt
index f0cc000..552cc94 100644
--- a/wear/compose/compose-material3/src/main/java/androidx/wear/compose/material3/AlertDialog.kt
+++ b/wear/compose/compose-material3/src/main/java/androidx/wear/compose/material3/AlertDialog.kt
@@ -338,7 +338,7 @@
     alertButtonsParams: AlertButtonsParams,
     content: (ScalingLazyListScope.() -> Unit)?
 ) {
-    val state = rememberScalingLazyListState()
+    val state = rememberScalingLazyListState(initialCenterItemIndex = 0)
 
     Dialog(
         showDialog = show,
diff --git a/window/window-core/api/current.txt b/window/window-core/api/current.txt
index 2c8a4d2..8b80f00 100644
--- a/window/window-core/api/current.txt
+++ b/window/window-core/api/current.txt
@@ -32,9 +32,9 @@
     method public int getMinWidthDp();
     method @Deprecated public androidx.window.core.layout.WindowHeightSizeClass getWindowHeightSizeClass();
     method @Deprecated public androidx.window.core.layout.WindowWidthSizeClass getWindowWidthSizeClass();
-    method public boolean isAtLeast(int widthDp, int heightDp);
-    method public boolean isHeightAtLeast(int heightDp);
-    method public boolean isWidthAtLeast(int widthDp);
+    method public boolean isAtLeastBreakpoint(int widthBreakpointDp, int heightBreakpointDp);
+    method public boolean isHeightAtLeastBreakpoint(int heightBreakpointDp);
+    method public boolean isWidthAtLeastBreakpoint(int widthBreakpointDp);
     property public final int minHeightDp;
     property public final int minWidthDp;
     property @Deprecated public final androidx.window.core.layout.WindowHeightSizeClass windowHeightSizeClass;
diff --git a/window/window-core/api/restricted_current.txt b/window/window-core/api/restricted_current.txt
index 2c8a4d2..8b80f00 100644
--- a/window/window-core/api/restricted_current.txt
+++ b/window/window-core/api/restricted_current.txt
@@ -32,9 +32,9 @@
     method public int getMinWidthDp();
     method @Deprecated public androidx.window.core.layout.WindowHeightSizeClass getWindowHeightSizeClass();
     method @Deprecated public androidx.window.core.layout.WindowWidthSizeClass getWindowWidthSizeClass();
-    method public boolean isAtLeast(int widthDp, int heightDp);
-    method public boolean isHeightAtLeast(int heightDp);
-    method public boolean isWidthAtLeast(int widthDp);
+    method public boolean isAtLeastBreakpoint(int widthBreakpointDp, int heightBreakpointDp);
+    method public boolean isHeightAtLeastBreakpoint(int heightBreakpointDp);
+    method public boolean isWidthAtLeastBreakpoint(int widthBreakpointDp);
     property public final int minHeightDp;
     property public final int minWidthDp;
     property @Deprecated public final androidx.window.core.layout.WindowHeightSizeClass windowHeightSizeClass;
diff --git a/window/window-core/src/commonMain/kotlin/androidx/window/core/layout/WindowSizeClass.kt b/window/window-core/src/commonMain/kotlin/androidx/window/core/layout/WindowSizeClass.kt
index 43bef20..847ea73 100644
--- a/window/window-core/src/commonMain/kotlin/androidx/window/core/layout/WindowSizeClass.kt
+++ b/window/window-core/src/commonMain/kotlin/androidx/window/core/layout/WindowSizeClass.kt
@@ -80,25 +80,28 @@
         get() = WindowHeightSizeClass.compute(minHeightDp.toFloat())
 
     /**
-     * Returns `true` when [widthDp] is greater than or equal to [minWidthDp], `false` otherwise.
+     * Returns `true` when [minWidthDp] is greater than or equal to [widthBreakpointDp], `false`
+     * otherwise.
      */
-    fun isWidthAtLeast(widthDp: Int): Boolean {
-        return widthDp >= minWidthDp
+    fun isWidthAtLeastBreakpoint(widthBreakpointDp: Int): Boolean {
+        return minWidthDp >= widthBreakpointDp
     }
 
     /**
-     * Returns `true` when [heightDp] is greater than or equal to [minHeightDp], `false` otherwise.
+     * Returns `true` when [minHeightDp] is greater than or equal to [heightBreakpointDp], `false`
+     * otherwise.
      */
-    fun isHeightAtLeast(heightDp: Int): Boolean {
-        return heightDp >= minHeightDp
+    fun isHeightAtLeastBreakpoint(heightBreakpointDp: Int): Boolean {
+        return minHeightDp >= heightBreakpointDp
     }
 
     /**
-     * Returns `true` when [widthDp] is greater than or equal to [minWidthDp] and [heightDp] is
-     * greater than or equal to [minHeightDp], `false` otherwise.
+     * Returns `true` when [widthBreakpointDp] is greater than or equal to [minWidthDp] and
+     * [heightBreakpointDp] is greater than or equal to [minHeightDp], `false` otherwise.
      */
-    fun isAtLeast(widthDp: Int, heightDp: Int): Boolean {
-        return isWidthAtLeast(widthDp) && isHeightAtLeast(heightDp)
+    fun isAtLeastBreakpoint(widthBreakpointDp: Int, heightBreakpointDp: Int): Boolean {
+        return isWidthAtLeastBreakpoint(widthBreakpointDp) &&
+            isHeightAtLeastBreakpoint(heightBreakpointDp)
     }
 
     override fun equals(other: Any?): Boolean {
diff --git a/window/window-core/src/commonTest/kotlin/androidx/window/core/layout/WindowSizeClassTest.kt b/window/window-core/src/commonTest/kotlin/androidx/window/core/layout/WindowSizeClassTest.kt
index 816e40c..e9a0aba 100644
--- a/window/window-core/src/commonTest/kotlin/androidx/window/core/layout/WindowSizeClassTest.kt
+++ b/window/window-core/src/commonTest/kotlin/androidx/window/core/layout/WindowSizeClassTest.kt
@@ -16,6 +16,10 @@
 
 package androidx.window.core.layout
 
+import androidx.window.core.layout.WindowSizeClass.Companion.HEIGHT_DP_EXPANDED_LOWER_BOUND
+import androidx.window.core.layout.WindowSizeClass.Companion.HEIGHT_DP_MEDIUM_LOWER_BOUND
+import androidx.window.core.layout.WindowSizeClass.Companion.WIDTH_DP_EXPANDED_LOWER_BOUND
+import androidx.window.core.layout.WindowSizeClass.Companion.WIDTH_DP_MEDIUM_LOWER_BOUND
 import kotlin.test.Test
 import kotlin.test.assertEquals
 import kotlin.test.assertFailsWith
@@ -115,85 +119,252 @@
     }
 
     @Test
-    fun is_width_at_least_returns_true_when_input_is_greater() {
+    fun is_width_at_least_breakpoint_returns_false_when_breakpoint_is_greater() {
         val width = 200
         val height = 100
         val sizeClass = WindowSizeClass(width, height)
 
-        assertTrue(sizeClass.isWidthAtLeast(width + 1))
+        assertFalse(sizeClass.isWidthAtLeastBreakpoint(width + 1))
     }
 
     @Test
-    fun is_width_at_least_returns_true_when_input_is_equal() {
+    fun is_width_at_least_breakpoint_returns_true_when_breakpoint_is_equal() {
         val width = 200
         val height = 100
         val sizeClass = WindowSizeClass(width, height)
 
-        assertTrue(sizeClass.isWidthAtLeast(width))
+        assertTrue(sizeClass.isWidthAtLeastBreakpoint(width))
     }
 
     @Test
-    fun is_width_at_least_returns_false_when_input_is_smaller() {
+    fun is_width_at_least_breakpoint_returns_true_when_breakpoint_is_smaller() {
         val width = 200
         val height = 100
         val sizeClass = WindowSizeClass(width, height)
 
-        assertFalse(sizeClass.isWidthAtLeast(width - 1))
+        assertTrue(sizeClass.isWidthAtLeastBreakpoint(width - 1))
+    }
+
+    /**
+     * Tests that the width breakpoint logic works as expected. The following sample shows what the
+     * dev use site should be
+     *
+     * WIDTH_DP_MEDIUM_LOWER_BOUND = 600 WIDTH_DP_EXPANDED_LOWER_BOUND = 840
+     *
+     * fun process(sizeClass: WindowSizeClass) { when {
+     * sizeClass.isWidthAtLeast(WIDTH_DP_EXPANDED_LOWER_BOUND) -> doExpanded()
+     * sizeClass.isWidthAtLeast(WIDTH_DP_MEDIUM_LOWER_BOUND) -> doMedium() else -> doCompact() } }
+     *
+     * val belowMediumBreakpoint = WindowSizeClass(minWidthDp = 300, minHeightDp = 0) val
+     * equalMediumBreakpoint = WindowSizeClass(minWidthDp = 600, minHeightDp = 0) val
+     * expandedBreakpoint = WindowSizeClass(minWidthDp = 840, minHeightDp = 0)
+     *
+     * process(belowBreakpoint) -> doSomethingCompact() process(equalMediumBreakpoint) ->
+     * doSomethingMedium() process(expandedBreakpoint) -> doSomethingExpanded()
+     *
+     * So the following must be true
+     *
+     * expandedBreakpoint WindowSizeClass(840, 0).isWidthAtLeast(WIDTH_DP_EXPANDED_LOWER_BOUND) ==
+     * true WindowSizeClass(840, 0).isWidthAtLeast(WIDTH_DP_MEDIUM_LOWER_BOUND) == true
+     *
+     * equalMediumBreakpoint WindowSizeClass(600, 0).isWidthAtLeast(WIDTH_DP_EXPANDED_LOWER_BOUND)
+     * == false WindowSizeClass(600, 0).isWidthAtLeast(WIDTH_DP_MEDIUM_LOWER_BOUND) == true
+     *
+     * belowBreakpoint WindowSizeClass(0, 0).isWidthAtLeast(WIDTH_DP_EXPANDED_LOWER_BOUND) == false
+     * WindowSizeClass(0, 0).isWidthAtLeast(WIDTH_DP_MEDIUM_LOWER_BOUND) == false
+     */
+    @Test
+    fun is_width_at_least_bounds_checks() {
+        // expandedBreakpoint
+        assertTrue(
+            WindowSizeClass(WIDTH_DP_EXPANDED_LOWER_BOUND, 0)
+                .isWidthAtLeastBreakpoint(WIDTH_DP_EXPANDED_LOWER_BOUND)
+        )
+        assertTrue(
+            WindowSizeClass(WIDTH_DP_EXPANDED_LOWER_BOUND, 0)
+                .isWidthAtLeastBreakpoint(WIDTH_DP_MEDIUM_LOWER_BOUND)
+        )
+
+        // equalMediumBreakpoint
+        assertFalse(
+            WindowSizeClass(WIDTH_DP_MEDIUM_LOWER_BOUND, 0)
+                .isWidthAtLeastBreakpoint(WIDTH_DP_EXPANDED_LOWER_BOUND)
+        )
+        assertTrue(
+            WindowSizeClass(WIDTH_DP_MEDIUM_LOWER_BOUND, 0)
+                .isWidthAtLeastBreakpoint(WIDTH_DP_MEDIUM_LOWER_BOUND)
+        )
+
+        // belowBreakpoint
+        assertFalse(WindowSizeClass(0, 0).isWidthAtLeastBreakpoint(WIDTH_DP_EXPANDED_LOWER_BOUND))
+        assertFalse(WindowSizeClass(0, 0).isWidthAtLeastBreakpoint(WIDTH_DP_MEDIUM_LOWER_BOUND))
+    }
+
+    /**
+     * Tests that the width breakpoint logic works as expected. The following sample shows what the
+     * dev use site should be
+     *
+     * HEIGHT_DP_MEDIUM_LOWER_BOUND = 480 HEIGHT_DP_EXPANDED_LOWER_BOUND = 900
+     *
+     * fun process(sizeClass: WindowSizeClass) { when {
+     * sizeClass.isHeightAtLeast(HEIGHT_DP_EXPANDED_LOWER_BOUND) -> doExpanded()
+     * sizeClass.isHeightAtLeast(HEIGHT_DP_MEDIUM_LOWER_BOUND) -> doMedium() else -> doCompact() } }
+     *
+     * val belowMediumBreakpoint = WindowSizeClass(minWidthDp = 0, minHeightDp = 0) val
+     * equalMediumBreakpoint = WindowSizeClass(minWidthDp = 0, minHeightDp = 480) val
+     * expandedBreakpoint = WindowSizeClass(minWidthDp = 0, minHeightDp = 900)
+     *
+     * process(belowBreakpoint) -> doSomethingCompact() process(equalMediumBreakpoint) ->
+     * doSomethingMedium() process(expandedBreakpoint) -> doSomethingExpanded()
+     *
+     * So the following must be true
+     *
+     * expandedBreakpoint WindowSizeClass(0, 900).isWidthAtLeast(HEIGHT_DP_EXPANDED_LOWER_BOUND) ==
+     * true WindowSizeClass(0, 900).isWidthAtLeast(HEIGHT_DP_MEDIUM_LOWER_BOUND) == true
+     *
+     * equalMediumBreakpoint WindowSizeClass(0, 480).isWidthAtLeast(HEIGHT_DP_EXPANDED_LOWER_BOUND)
+     * == false WindowSizeClass(0, 480).isWidthAtLeast(HEIGHT_DP_MEDIUM_LOWER_BOUND) == true
+     *
+     * belowBreakpoint WindowSizeClass(0, 0).isWidthAtLeast(HEIGHT_DP_EXPANDED_LOWER_BOUND) == false
+     * WindowSizeClass(0, 0).isWidthAtLeast(HEIGHT_DP_MEDIUM_LOWER_BOUND) == false
+     */
+    @Test
+    fun is_height_at_least_bounds_checks() {
+        // expandedBreakpoint
+        assertTrue(
+            WindowSizeClass(0, HEIGHT_DP_EXPANDED_LOWER_BOUND)
+                .isHeightAtLeastBreakpoint(HEIGHT_DP_EXPANDED_LOWER_BOUND)
+        )
+        assertTrue(
+            WindowSizeClass(0, HEIGHT_DP_EXPANDED_LOWER_BOUND)
+                .isHeightAtLeastBreakpoint(HEIGHT_DP_MEDIUM_LOWER_BOUND)
+        )
+
+        // equalMediumBreakpoint
+        assertFalse(
+            WindowSizeClass(0, HEIGHT_DP_MEDIUM_LOWER_BOUND)
+                .isHeightAtLeastBreakpoint(HEIGHT_DP_EXPANDED_LOWER_BOUND)
+        )
+        assertTrue(
+            WindowSizeClass(0, HEIGHT_DP_MEDIUM_LOWER_BOUND)
+                .isHeightAtLeastBreakpoint(HEIGHT_DP_MEDIUM_LOWER_BOUND)
+        )
+
+        // belowBreakpoint
+        assertFalse(WindowSizeClass(0, 0).isHeightAtLeastBreakpoint(HEIGHT_DP_EXPANDED_LOWER_BOUND))
+        assertFalse(WindowSizeClass(0, 0).isHeightAtLeastBreakpoint(HEIGHT_DP_MEDIUM_LOWER_BOUND))
+    }
+
+    /**
+     * Tests that the width breakpoint logic works as expected. The following sample shows what the
+     * dev use site should be
+     *
+     * DIAGONAL_BOUND_MEDIUM = 600, 600 DIAGONAL_BOUND_EXPANDED = 900, 900
+     *
+     * fun process(sizeClass: WindowSizeClass) { when { sizeClass.isAtLeast(DIAGONAL_BOUND_EXPANDED,
+     * DIAGONAL_BOUND_EXPANDED) -> doExpanded() sizeClass.isAtLeast(DIAGONAL_BOUND_MEDIUM,
+     * DIAGONAL_BOUND_MEDIUM) -> doMedium() else -> doCompact() } }
+     *
+     * val belowMediumBreakpoint = WindowSizeClass(minWidthDp = 0, minHeightDp = 0) val
+     * equalMediumBreakpoint = WindowSizeClass(minWidthDp = 600, minHeightDp = 600) val
+     * expandedBreakpoint = WindowSizeClass(minWidthDp = 900, minHeightDp = 900)
+     *
+     * process(belowBreakpoint) -> doSomethingCompact() process(equalMediumBreakpoint) ->
+     * doSomethingMedium() process(expandedBreakpoint) -> doSomethingExpanded()
+     *
+     * So the following must be true
+     *
+     * expandedBreakpoint WindowSizeClass(900, 900).isWidthAtLeast(WIDTH_DP_EXPANDED_LOWER_BOUND) ==
+     * true WindowSizeClass(900, 900).isWidthAtLeast(WIDTH_DP_MEDIUM_LOWER_BOUND) == true
+     *
+     * equalMediumBreakpoint WindowSizeClass(600, 600).isWidthAtLeast(WIDTH_DP_EXPANDED_LOWER_BOUND)
+     * == false WindowSizeClass(600, 600).isWidthAtLeast(WIDTH_DP_MEDIUM_LOWER_BOUND) == true
+     *
+     * belowBreakpoint WindowSizeClass(0, 0).isWidthAtLeast(WIDTH_DP_EXPANDED_LOWER_BOUND) == false
+     * WindowSizeClass(0, 0).isWidthAtLeast(WIDTH_DP_MEDIUM_LOWER_BOUND) == false
+     */
+    @Test
+    fun is_area_at_least_bounds_checks() {
+        val diagonalMedium = 600
+        val diagonalExpanded = 900
+        // expandedBreakpoint
+        assertTrue(
+            WindowSizeClass(diagonalExpanded, diagonalExpanded)
+                .isAtLeastBreakpoint(diagonalExpanded, diagonalExpanded)
+        )
+        assertTrue(
+            WindowSizeClass(diagonalExpanded, diagonalExpanded)
+                .isAtLeastBreakpoint(diagonalMedium, diagonalMedium)
+        )
+
+        // equalMediumBreakpoint
+        assertFalse(
+            WindowSizeClass(diagonalMedium, diagonalMedium)
+                .isAtLeastBreakpoint(diagonalExpanded, diagonalExpanded)
+        )
+        assertTrue(
+            WindowSizeClass(diagonalMedium, diagonalMedium)
+                .isAtLeastBreakpoint(diagonalMedium, diagonalMedium)
+        )
+
+        // belowBreakpoint
+        assertFalse(WindowSizeClass(0, 0).isAtLeastBreakpoint(diagonalExpanded, diagonalExpanded))
+        assertFalse(WindowSizeClass(0, 0).isAtLeastBreakpoint(diagonalMedium, diagonalMedium))
     }
 
     @Test
-    fun is_height_at_least_returns_true_when_input_is_greater() {
+    fun is_height_at_least_breakpoint_returns_false_when_breakpoint_is_greater() {
         val width = 200
         val height = 100
         val sizeClass = WindowSizeClass(width, height)
 
-        assertTrue(sizeClass.isHeightAtLeast(height + 1))
+        assertFalse(sizeClass.isHeightAtLeastBreakpoint(height + 1))
     }
 
     @Test
-    fun is_height_at_least_returns_true_when_input_is_equal() {
+    fun is_height_at_least_breakpoint_returns_true_when_breakpoint_is_equal() {
         val width = 200
         val height = 100
         val sizeClass = WindowSizeClass(width, height)
 
-        assertTrue(sizeClass.isHeightAtLeast(height))
+        assertTrue(sizeClass.isHeightAtLeastBreakpoint(height))
     }
 
     @Test
-    fun is_height_at_least_returns_false_when_input_is_smaller() {
+    fun is_height_at_least_breakpoint_returns_true_when_breakpoint_is_smaller() {
         val width = 200
         val height = 100
         val sizeClass = WindowSizeClass(width, height)
 
-        assertFalse(sizeClass.isHeightAtLeast(height - 1))
+        assertTrue(sizeClass.isHeightAtLeastBreakpoint(height - 1))
     }
 
     @Test
-    fun is_at_least_returns_true_when_input_is_greater() {
+    fun is_at_least_breakpoint_returns_false_when_breakpoint_is_greater() {
         val width = 200
         val height = 100
         val sizeClass = WindowSizeClass(width, height)
 
-        assertTrue(sizeClass.isAtLeast(width, height + 1))
-        assertTrue(sizeClass.isAtLeast(width + 1, height))
+        assertFalse(sizeClass.isAtLeastBreakpoint(width, height + 1))
+        assertFalse(sizeClass.isAtLeastBreakpoint(width + 1, height))
     }
 
     @Test
-    fun is_at_least_returns_true_when_input_is_equal() {
+    fun is_at_least_breakpoint_returns_true_when_breakpoint_is_equal() {
         val width = 200
         val height = 100
         val sizeClass = WindowSizeClass(width, height)
 
-        assertTrue(sizeClass.isAtLeast(width, height))
+        assertTrue(sizeClass.isAtLeastBreakpoint(width, height))
     }
 
     @Test
-    fun is_at_least_returns_false_when_input_is_smaller() {
+    fun is_at_least_breakpoint_returns_true_when_breakpoint_is_smaller() {
         val width = 200
         val height = 100
         val sizeClass = WindowSizeClass(width, height)
 
-        assertFalse(sizeClass.isAtLeast(width, height - 1))
-        assertFalse(sizeClass.isAtLeast(width - 1, height))
+        assertTrue(sizeClass.isAtLeastBreakpoint(width, height - 1))
+        assertTrue(sizeClass.isAtLeastBreakpoint(width - 1, height))
     }
 }