blob: 25e87d661b411878d35b72086a7e4a962476f2a9 [file] [log] [blame]
/*
* Copyright 2020 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@file:OptIn(InternalComposeApi::class)
package androidx.compose.runtime.snapshots
import androidx.compose.runtime.AtomicReference
import androidx.compose.runtime.Composable
import androidx.compose.runtime.DisallowComposableCalls
import androidx.compose.runtime.ExperimentalComposeApi
import androidx.compose.runtime.InternalComposeApi
import androidx.compose.runtime.SnapshotThreadLocal
import androidx.compose.runtime.synchronized
import kotlin.contracts.ExperimentalContracts
import kotlin.contracts.InvocationKind
import kotlin.contracts.contract
import androidx.compose.runtime.internal.JvmDefaultWithCompatibility
/**
* A snapshot of the values return by mutable states and other state objects. All state object
* will have the same value in the snapshot as they had when the snapshot was created unless they
* are explicitly changed in the snapshot.
*
* To enter a snapshot call [enter]. The snapshot is the current snapshot as returned by
* [currentSnapshot] until the control returns from the lambda (or until a nested [enter] is
* called). All state objects will return the values associated with this snapshot, locally in the
* thread, until [enter] returns. All other threads are unaffected.
*
* Snapshots can be nested by calling [takeNestedSnapshot].
*
* @see takeSnapshot
* @see takeMutableSnapshot
* @see androidx.compose.runtime.mutableStateOf
* @see androidx.compose.runtime.mutableStateListOf
* @see androidx.compose.runtime.mutableStateMapOf
*/
sealed class Snapshot(
id: Int,
/**
* A set of all the snapshots that should be treated as invalid.
*/
internal open var invalid: SnapshotIdSet
) {
/**
* The snapshot id of the snapshot. This is a unique number from a monotonically increasing
* value for each snapshot taken.
*/
open var id: Int = id
internal set
/**
* The root snapshot for this snapshot. For non-nested snapshots this is always `this`. For
* nested snapshot it is the parent's [root].
*/
abstract val root: Snapshot
/**
* True if any change to a state object in this snapshot will throw.
*/
abstract val readOnly: Boolean
/**
* Dispose the snapshot. Neglecting to dispose a snapshot will result in difficult to
* diagnose memory leaks as it indirectly causes all state objects to maintain its value for
* the un-disposed snapshot.
*/
open fun dispose() {
disposed = true
sync {
releasePinnedSnapshotLocked()
}
}
/**
* Take a snapshot of the state values in this snapshot. The resulting [Snapshot] is read-only.
* All nested snapshots need to be disposed by calling [dispose] before resources associated
* with this snapshot can be collected. Nested snapshots are still valid after the parent has
* been disposed.
*/
abstract fun takeNestedSnapshot(readObserver: ((Any) -> Unit)? = null): Snapshot
/**
* Whether there are any pending changes in this snapshot. These changes are not visible
* until the snapshot is applied.
*/
abstract fun hasPendingChanges(): Boolean
/**
* Enter the snapshot. In [block] all state objects have the value associated with this
* snapshot. The value of [currentSnapshot] will be this snapshot until this [block] returns
* or a nested call to [enter] is called. When [block] returns, the previous current snapshot
* is restored if there was one.
*
* All changes to state objects inside [block] are isolated to this snapshot and are not
* visible to other snapshot or as global state. If this is a [readOnly] snapshot, any
* changes to state objects will throw an [IllegalStateException].
*
* For a [MutableSnapshot], changes made to a snapshot inside [block] can be applied
* atomically to the global state (or to its parent snapshot if it is a nested snapshot) by
* calling [MutableSnapshot.apply].
*
* @see androidx.compose.runtime.mutableStateOf
* @see androidx.compose.runtime.mutableStateListOf
* @see androidx.compose.runtime.mutableStateMapOf
*/
inline fun <T> enter(block: () -> T): T {
val previous = makeCurrent()
try {
return block()
} finally {
restoreCurrent(previous)
}
}
@PublishedApi
internal open fun makeCurrent(): Snapshot? {
val previous = threadSnapshot.get()
threadSnapshot.set(this)
return previous
}
@PublishedApi
internal open fun restoreCurrent(snapshot: Snapshot?) {
threadSnapshot.set(snapshot)
}
/**
* Enter the snapshot, returning the previous [Snapshot] for leaving this snapshot later
* using [unsafeLeave]. Prefer [enter] or [asContextElement] instead of using [unsafeEnter]
* directly to prevent mismatched [unsafeEnter]/[unsafeLeave] calls.
*
* After returning all state objects have the value associated with this snapshot.
* The value of [currentSnapshot] will be this snapshot until [unsafeLeave] is called
* with the returned [Snapshot] or another call to [unsafeEnter] or [enter]
* is made.
*
* All changes to state objects until another snapshot is entered or this snapshot is left
* are isolated to this snapshot and are not visible to other snapshot or as global state.
* If this is a [readOnly] snapshot, any changes to state objects will throw an
* [IllegalStateException].
*
* For a [MutableSnapshot], changes made to a snapshot can be applied
* atomically to the global state (or to its parent snapshot if it is a nested snapshot) by
* calling [MutableSnapshot.apply].
*/
@ExperimentalComposeApi
fun unsafeEnter(): Snapshot? = makeCurrent()
/**
* Leave the snapshot, restoring the [oldSnapshot] before returning.
* See [unsafeEnter].
*/
@ExperimentalComposeApi
fun unsafeLeave(oldSnapshot: Snapshot?) {
check(threadSnapshot.get() === this) {
"Cannot leave snapshot; $this is not the current snapshot"
}
restoreCurrent(oldSnapshot)
}
internal var disposed = false
/*
* Handle to use when unpinning this snapshot. -1 if this snapshot has been unpinned.
*/
private var pinningTrackingHandle =
if (id != INVALID_SNAPSHOT) trackPinning(id, invalid) else -1
internal inline val isPinned get() = pinningTrackingHandle >= 0
/*
* The read observer for the snapshot if there is one.
*/
internal abstract val readObserver: ((Any) -> Unit)?
/**
* The write observer for the snapshot if there is one.
*/
internal abstract val writeObserver: ((Any) -> Unit)?
/**
* Called when a nested snapshot of this snapshot is activated
*/
internal abstract fun nestedActivated(snapshot: Snapshot)
/**
* Called when a nested snapshot of this snapshot is deactivated
*/
internal abstract fun nestedDeactivated(snapshot: Snapshot)
/**
* Record that state was modified in the snapshot.
*/
internal abstract fun recordModified(state: StateObject)
/**
* The set of state objects that have been modified in this snapshot.
*/
internal abstract val modified: MutableSet<StateObject>?
/**
* Notify the snapshot that all objects created in this snapshot to this point should be
* considered initialized. If any state object is modified after this point it will
* appear as modified in the snapshot. Any applicable snapshot write observer will be
* called for the object and the object will be part of the a set of mutated objects sent to
* any applicable snapshot apply observer.
*
* Unless [notifyObjectsInitialized] is called, state objects created in a snapshot are not
* considered modified by the snapshot even if they are modified after construction.
*/
internal abstract fun notifyObjectsInitialized()
/**
* Closes the snapshot by removing the snapshot id (an any previous id's) from the list of
* open snapshots and unpinning snapshots that no longer are referenced by this snapshot.
*/
internal fun closeAndReleasePinning() {
sync {
closeLocked()
releasePinnedSnapshotsForCloseLocked()
}
}
/**
* Closes the snapshot by removing the snapshot id (and any previous ids) from the list of
* open snapshots. Does not release pinned snapshots. See [releasePinnedSnapshotsForCloseLocked]
* for the second half of [closeAndReleasePinning].
*
* Call while holding a `sync {}` lock.
*/
internal open fun closeLocked() {
openSnapshots = openSnapshots.clear(id)
}
/**
* Releases all pinned snapshots required to perform a clean [closeAndReleasePinning].
*
* Call while holding a `sync {}` lock.
*
* See [closeAndReleasePinning], [closeLocked].
*/
internal open fun releasePinnedSnapshotsForCloseLocked() {
releasePinnedSnapshotLocked()
}
internal fun validateNotDisposed() {
require(!disposed) { "Cannot use a disposed snapshot" }
}
internal fun releasePinnedSnapshotLocked() {
if (pinningTrackingHandle >= 0) {
releasePinningLocked(pinningTrackingHandle)
pinningTrackingHandle = -1
}
}
internal fun takeoverPinnedSnapshot(): Int =
pinningTrackingHandle.also { pinningTrackingHandle = -1 }
companion object {
/**
* Return the thread's active snapshot. If no thread snapshot is active then the current
* global snapshot is used.
*/
val current get() = currentSnapshot()
/**
* Take a snapshot of the current value of all state objects. The values are preserved until
* [Snapshot.dispose] is called on the result.
*
* The [readObserver] parameter can be used to track when all state objects are read when in
* [Snapshot.enter]. A snapshot apply observer can be registered using
* [Snapshot.registerApplyObserver] to observe modification of state objects.
*
* An active snapshot (after it is created but before [Snapshot.dispose] is called) requires
* resources to track the values in the snapshot. Once a snapshot is no longer needed it
* should disposed by calling [Snapshot.dispose].
*
* Leaving a snapshot active could cause hard to diagnose memory leaks values as are
* maintained by state objects for these unneeded snapshots. Take care to always call
* [Snapshot.dispose] on all snapshots when they are no longer needed.
*
* Composition uses both of these to implicitly subscribe to changes to state object and
* automatically update the composition when state objects read during composition change.
*
* A nested snapshot can be taken of a snapshot which is an independent read-only copy of
* the snapshot and can be disposed independently. This is used by [takeSnapshot] when in
* a read-only snapshot for API consistency allowing the result of [takeSnapshot] to be
* disposed leaving the parent snapshot active.
*
* @param readObserver called when any state object is read in the lambda passed to
* [Snapshot.enter] or in the [Snapshot.enter] of any nested snapshot.
*
* @see Snapshot
* @see Snapshot.registerApplyObserver
*/
fun takeSnapshot(
readObserver: ((Any) -> Unit)? = null
): Snapshot = currentSnapshot().takeNestedSnapshot(readObserver)
/**
* Take a snapshot of the current value of all state objects that also allows the state
* to be changed and later atomically applied when [MutableSnapshot.apply] is called. The
* values are preserved until [Snapshot.dispose] is called on the result. The global
* state will either see all the changes made as one atomic change, when [MutableSnapshot
* .apply] is called, or none of the changes if the mutable state object is disposed
* before being applied.
*
* The values in a snapshot can be modified by calling [Snapshot.enter] and then, in its
* lambda, modify any state object. The new values of the state objects will only become
* visible to the global state when [MutableSnapshot.apply] is called.
*
* An active snapshot (after it is created but before [Snapshot.dispose] is called) requires
* resources to track the values in the snapshot. Once a snapshot is no longer needed it
* should disposed by calling [Snapshot.dispose].
*
* Leaving a snapshot active could cause hard to diagnose memory leaks as values are
* maintained by state objects for these unneeded snapshots. Take care to always call
* [Snapshot.dispose] on all snapshots when they are no longer needed.
*
* A nested snapshot can be taken by calling [Snapshot.takeNestedSnapshot], for a read-only
* snapshot, or [MutableSnapshot.takeNestedMutableSnapshot] for a snapshot that can be
* changed. Nested mutable snapshots are applied to the this, the parent snapshot, when
* their [MutableSnapshot.apply] is called. Their applied changes will be visible to in
* this snapshot but will not be visible other snapshots (including other nested
* snapshots) or the global state until this snapshot is applied by calling
* [MutableSnapshot.apply].
*
* Once [MutableSnapshot.apply] is called on this, the parent snapshot, all calls to
* [MutableSnapshot.apply] on an active nested snapshot will fail.
*
* Changes to a mutable snapshot are isolated, using snapshot isolation, from all other
* snapshots. Their changes are only visible as global state or to new snapshots once
* [MutableSnapshot.apply] is called.
*
* Applying a snapshot can fail if currently visible changes to the state object
* conflicts with a change made in the snapshot.
*
* When in a mutable snapshot, [takeMutableSnapshot] creates a nested snapshot of the
* current mutable snapshot. If the current snapshot is read-only, an exception is thrown.
* The current snapshot is the result of calling [currentSnapshot] which is updated by
* calling [Snapshot.enter] which makes the [Snapshot] the current snapshot while in its
* lambda.
*
* Composition uses mutable snapshots to allow changes made in a [Composable] functions
* to be temporarily isolated from the global state and is later applied to the global
* state when the composition is applied. If [MutableSnapshot.apply] fails applying this
* snapshot, the snapshot and the changes calculated during composition are disposed and
* a new composition is scheduled to be calculated again.
*
* @param readObserver called when any state object is read in the lambda passed to
* [Snapshot.enter] or in the [Snapshot.enter] of any nested snapshots.
*
* Composition, layout and draw use [readObserver] to implicitly subscribe to changes to
* state objects to know when to update.
*
* @param writeObserver called when a state object is created or just before it is
* written to the first time in the snapshot or a nested mutable snapshot. This might be
* called several times for the same object if nested mutable snapshots are created.
*
* Composition uses [writeObserver] to track when a state object is modified during
* composition in order to invalidate the reads that have not yet occurred. This allows a
* single pass of composition for state objects that are written to before they are read
* (such as modifying the value of a dynamic ambient provider).
*
* @see Snapshot.takeSnapshot
* @see Snapshot
* @see MutableSnapshot
*/
fun takeMutableSnapshot(
readObserver: ((Any) -> Unit)? = null,
writeObserver: ((Any) -> Unit)? = null
): MutableSnapshot =
(currentSnapshot() as? MutableSnapshot)?.takeNestedMutableSnapshot(
readObserver,
writeObserver
) ?: error("Cannot create a mutable snapshot of an read-only snapshot")
/**
* Escape the current snapshot, if there is one. All state objects will have the value
* associated with the global while the [block] lambda is executing.
*
* @return the result of [block]
*/
inline fun <T> global(block: () -> T): T {
val previous = removeCurrent()
return block().also { restoreCurrent(previous) }
}
/**
* Take a [MutableSnapshot] and run [block] within it. When [block] returns successfully,
* attempt to [MutableSnapshot.apply] the snapshot. Returns the result of [block] or throws
* [SnapshotApplyConflictException] if snapshot changes attempted by [block] could not be
* applied.
*
* Prior to returning, any changes made to snapshot state (e.g. state holders returned by
* [androidx.compose.runtime.mutableStateOf] are not visible to other threads. When
* [withMutableSnapshot] returns successfully those changes will be made visible to other
* threads and any snapshot observers (e.g. [androidx.compose.runtime.snapshotFlow]) will
* be notified of changes.
*
* [block] must not suspend if [withMutableSnapshot] is called from a suspend function.
*/
// TODO: determine a good way to prevent/discourage suspending in an inlined [block]
inline fun <R> withMutableSnapshot(
block: () -> R
): R = takeMutableSnapshot().run {
try {
enter(block).also { apply().check() }
} finally {
dispose()
}
}
/**
* Observe reads and or write of state objects in the current thread.
*
* This only affects the current snapshot (if any) and any new snapshots create from
* [Snapshot.takeSnapshot] and [takeMutableSnapshot]. It will not affect any snapshots
* previous created even if [Snapshot.enter] is called in [block].
*
* @param readObserver called when any state object is read.
* @param writeObserver called when a state object is created or just before it is
* written to the first time in the snapshot or a nested mutable snapshot. This might be
* called several times for the same object if nested mutable snapshots are created.
*/
fun <T> observe(
readObserver: ((Any) -> Unit)? = null,
writeObserver: ((Any) -> Unit)? = null,
block: () -> T
): T {
if (readObserver != null || writeObserver != null) {
val currentSnapshot = threadSnapshot.get()
val snapshot =
if (currentSnapshot == null || currentSnapshot is MutableSnapshot)
TransparentObserverMutableSnapshot(
previousSnapshot = currentSnapshot as? MutableSnapshot,
specifiedReadObserver = readObserver,
specifiedWriteObserver = writeObserver,
mergeParentObservers = true,
ownsPreviousSnapshot = false
)
else if (readObserver == null) return block()
else currentSnapshot.takeNestedSnapshot(readObserver)
try {
return snapshot.enter(block)
} finally {
snapshot.dispose()
}
} else return block()
}
@PublishedApi
internal fun createNonObservableSnapshot(): Snapshot =
createTransparentSnapshotWithNoParentReadObserver(
previousSnapshot = threadSnapshot.get()
)
/**
* Passed [block] will be run with all the currently set snapshot read observers disabled.
*/
@Suppress("BanInlineOptIn") // Treat Kotlin Contracts as non-experimental.
@OptIn(ExperimentalContracts::class)
inline fun <T> withoutReadObservation(block: @DisallowComposableCalls () -> T): T {
contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
val snapshot = createNonObservableSnapshot()
try {
return snapshot.enter(block)
} finally {
snapshot.dispose()
}
}
/**
* Register an apply listener that is called back when snapshots are applied to the
* global state.
*
* @return [ObserverHandle] to unregister [observer].
*/
fun registerApplyObserver(observer: (Set<Any>, Snapshot) -> Unit): ObserverHandle {
// Ensure observer does not see changes before this call.
advanceGlobalSnapshot(emptyLambda)
sync {
applyObservers.add(observer)
}
return ObserverHandle {
sync {
applyObservers.remove(observer)
}
}
}
/**
* Register an observer of the first write to the global state of a global state object
* since the last call to [sendApplyNotifications].
*
* Composition uses this to schedule a new composition whenever a state object that
* was read in composition is modified.
*
* State objects can be sent to the apply observer that have not been sent to global write
* observers. This happens for state objects inside [MutableSnapshot] that is later
* applied by calling [MutableSnapshot.apply].
*
* This should only be used to determine if a call to [sendApplyNotifications] should be
* scheduled to be called.
*
* @return [ObserverHandle] to unregister [observer].
*/
fun registerGlobalWriteObserver(observer: ((Any) -> Unit)): ObserverHandle {
sync {
globalWriteObservers.add(observer)
}
advanceGlobalSnapshot()
return ObserverHandle {
sync {
globalWriteObservers.remove(observer)
}
advanceGlobalSnapshot()
}
}
/**
* Notify the snapshot that all objects created in this snapshot to this point should be
* considered initialized. If any state object is are modified passed this point it will
* appear as modified in the snapshot and any applicable snapshot write observer will be
* called for the object and the object will be part of the a set of mutated objects sent to
* any applicable snapshot apply observer.
*
* Unless [notifyObjectsInitialized] is called, state objects created in a snapshot are not
* considered modified by the snapshot even if they are modified after construction.
*
* Compose uses this between phases of composition to allow observing changes to state
* objects create in a previous phase.
*/
fun notifyObjectsInitialized() = currentSnapshot().notifyObjectsInitialized()
/**
* Send any pending apply notifications for state objects changed outside a snapshot.
*
* Apply notifications for state objects modified outside snapshot are deferred until method
* is called. This method is implicitly called whenever a non-nested [MutableSnapshot]
* is applied making its changes visible to all new, non-nested snapshots.
*
* Composition schedules this to be called after changes to state objects are
* detected an observer registered with [registerGlobalWriteObserver].
*/
fun sendApplyNotifications() {
val changes = sync {
currentGlobalSnapshot.get().modified?.isNotEmpty() == true
}
if (changes)
advanceGlobalSnapshot()
}
@InternalComposeApi
fun openSnapshotCount() = openSnapshots.toList().size
@PublishedApi
internal fun removeCurrent(): Snapshot? {
val previous = threadSnapshot.get()
if (previous != null) threadSnapshot.set(null)
return previous
}
@PublishedApi
internal fun restoreCurrent(previous: Snapshot?) {
if (previous != null) threadSnapshot.set(previous)
}
}
}
/**
* Pin the snapshot and invalid set.
*
* @return returns a handle that should be passed to [releasePinningLocked] when the snapshot closes or
* is disposed.
*/
internal fun trackPinning(id: Int, invalid: SnapshotIdSet): Int {
val pinned = invalid.lowest(id)
return sync {
pinningTable.add(pinned)
}
}
/**
* Release the [handle] returned by [trackPinning]
*/
internal fun releasePinningLocked(handle: Int) {
pinningTable.remove(handle)
}
/**
* A snapshot of the values return by mutable states and other state objects. All state object
* will have the same value in the snapshot as they had when the snapshot was created unless they
* are explicitly changed in the snapshot.
* To enter a snapshot call [enter]. The snapshot is the current snapshot as returned by
* [currentSnapshot] until the control returns from the lambda (or until a nested [enter] is
* called. All state objects will return the values associated with this snapshot, locally in the
* thread, until [enter] returns. All other threads are unaffected.
*
* All changes made in a [MutableSnapshot] are snapshot isolated from all other snapshots and
* their changes can only be seen globally, or by new shots, after [MutableSnapshot.apply] as been
* called.
*
* Snapshots can be nested by calling [takeNestedSnapshot] or
* [MutableSnapshot.takeNestedMutableSnapshot].
*
* @see Snapshot.takeMutableSnapshot
* @see androidx.compose.runtime.mutableStateOf
* @see androidx.compose.runtime.mutableStateListOf
* @see androidx.compose.runtime.mutableStateMapOf
*/
open class MutableSnapshot internal constructor(
id: Int,
invalid: SnapshotIdSet,
override val readObserver: ((Any) -> Unit)?,
override val writeObserver: ((Any) -> Unit)?
) : Snapshot(id, invalid) {
/**
* Whether there are any pending changes in this snapshot. These changes are not visible
* until the snapshot is applied.
*/
override fun hasPendingChanges(): Boolean = modified?.isNotEmpty() == true
/**
* Take a mutable snapshot of the state values in this snapshot. Entering this snapshot by
* calling [enter] allows state objects to be modified that are not visible to the this, the
* parent snapshot, until the [apply] is called.
*
* Applying a nested snapshot, by calling [apply], applies its change to, this, the parent
* snapshot. For a change to be visible globally, all the parent snapshots need to be applied
* until the root snapshot is applied to the global state.
*
* All nested snapshots need to be disposed by calling [dispose] before resources associated
* with this snapshot can be collected. Nested active snapshots are still valid after the parent
* has been disposed but calling [apply] will fail.
*/
open fun takeNestedMutableSnapshot(
readObserver: ((Any) -> Unit)? = null,
writeObserver: ((Any) -> Unit)? = null
): MutableSnapshot {
validateNotDisposed()
validateNotAppliedOrPinned()
return advance {
sync {
val newId = nextSnapshotId++
openSnapshots = openSnapshots.set(newId)
val currentInvalid = invalid
this.invalid = currentInvalid.set(newId)
NestedMutableSnapshot(
newId,
currentInvalid.addRange(id + 1, newId),
mergedReadObserver(readObserver, this.readObserver),
mergedWriteObserver(writeObserver, this.writeObserver),
this
)
}
}
}
/**
* Apply the changes made to state objects in this snapshot to the global state, or to the
* parent snapshot if this is a nested mutable snapshot.
*
* Once this method returns all changes made to this snapshot are atomically visible as the
* global state of the state object or to the parent snapshot.
*
* While a snapshot is active (after it is created but before [apply] or [dispose] is called)
* requires resources to track the values in the snapshot. Once a snapshot is no longer
* needed it should be either applied by calling [apply] or disposed by calling [dispose]. A
* snapshot that has been had is [apply] called can also have [dispose] called on it. However,
* calling [apply] after calling [dispose] will throw an exception.
*
* Leaving a snapshot active could cause hard to diagnose memory leaks values are maintained
* by state objects for unneeded snapshots. Take care to always call [dispose] on any snapshot.
*/
open fun apply(): SnapshotApplyResult {
// NOTE: the this algorithm is currently does not guarantee serializable snapshots as it
// doesn't prevent crossing writes as described here https://arxiv.org/pdf/1412.2324.pdf
// Just removing the snapshot from the active snapshot set is enough to make it part of the
// next snapshot, however, this should only be done after first determining that there are no
// colliding writes are being applied.
// A write is considered colliding if any write occurred in a state object in a snapshot
// applied since the snapshot was taken.
val modified = modified
val optimisticMerges = if (modified != null) optimisticMerges(
currentGlobalSnapshot.get(),
this,
openSnapshots.clear(currentGlobalSnapshot.get().id)
) else null
val (observers, globalModified) = sync {
validateOpen(this)
if (modified == null || modified.size == 0) {
closeLocked()
val previousGlobalSnapshot = currentGlobalSnapshot.get()
takeNewGlobalSnapshot(previousGlobalSnapshot, emptyLambda)
val globalModified = previousGlobalSnapshot.modified
if (globalModified != null && globalModified.isNotEmpty())
applyObservers.toMutableList() to globalModified
else
emptyList<(Set<Any>, Snapshot) -> Unit>() to null
} else {
val previousGlobalSnapshot = currentGlobalSnapshot.get()
val result = innerApplyLocked(
nextSnapshotId,
optimisticMerges,
openSnapshots.clear(previousGlobalSnapshot.id)
)
if (result != SnapshotApplyResult.Success) return result
closeLocked()
// Take a new global snapshot that includes this one.
takeNewGlobalSnapshot(previousGlobalSnapshot, emptyLambda)
val globalModified = previousGlobalSnapshot.modified
this.modified = null
previousGlobalSnapshot.modified = null
applyObservers.toMutableList() to globalModified
}
}
// Mark as applied
applied = true
// Notify any apply observers that changes applied were seen
if (globalModified != null && globalModified.isNotEmpty()) {
observers.fastForEach {
it(globalModified, this)
}
}
if (modified != null && modified.isNotEmpty()) {
observers.fastForEach {
it(modified, this)
}
}
// Wait to release pinned snapshots until after running observers.
// This permits observers to safely take a nested snapshot of the one that was just applied
// before unpinning records that need to be retained in this case.
sync {
releasePinnedSnapshotsForCloseLocked()
}
return SnapshotApplyResult.Success
}
override val readOnly: Boolean get() = false
override val root: Snapshot get() = this
override fun dispose() {
if (!disposed) {
super.dispose()
nestedDeactivated(this)
}
}
override fun takeNestedSnapshot(readObserver: ((Any) -> Unit)?): Snapshot {
validateNotDisposed()
validateNotAppliedOrPinned()
val previousId = id
return advance {
sync {
val readonlyId = nextSnapshotId++
openSnapshots = openSnapshots.set(readonlyId)
NestedReadonlySnapshot(
readonlyId,
invalid.addRange(previousId + 1, readonlyId),
readObserver,
this
)
}
}
}
override fun nestedActivated(snapshot: Snapshot) { snapshots++ }
override fun nestedDeactivated(snapshot: Snapshot) {
require(snapshots > 0)
if (--snapshots == 0) {
if (!applied) {
abandon()
}
}
}
override fun notifyObjectsInitialized() {
if (applied || disposed) return
advance()
}
override fun closeLocked() {
// Remove itself and previous ids from the open set.
openSnapshots = openSnapshots.clear(id).andNot(previousIds)
}
override fun releasePinnedSnapshotsForCloseLocked() {
releasePreviouslyPinnedSnapshotsLocked()
super.releasePinnedSnapshotsForCloseLocked()
}
internal fun validateNotApplied() {
check(!applied) {
"Unsupported operation on a snapshot that has been applied"
}
}
internal fun validateNotAppliedOrPinned() {
check(!applied || isPinned) {
"Unsupported operation on a disposed or applied snapshot"
}
}
/**
* Abandon the snapshot. This does NOT [closeAndReleasePinning], which must be done
* as an additional step by callers.
*/
private fun abandon() {
val modified = modified
if (modified != null) {
validateNotApplied()
// Mark all state records created in this snapshot as invalid. This allows the snapshot
// id to be forgotten as no state records will refer to it.
this.modified = null
val id = id
for (state in modified) {
var current: StateRecord? = state.firstStateRecord
while (current != null) {
if (current.snapshotId == id || current.snapshotId in previousIds) {
current.snapshotId = INVALID_SNAPSHOT
}
current = current.next
}
}
}
// The snapshot can now be closed.
closeAndReleasePinning()
}
internal fun innerApplyLocked(
snapshotId: Int,
optimisticMerges: Map<StateRecord, StateRecord>?,
invalidSnapshots: SnapshotIdSet
): SnapshotApplyResult {
// This must be called in a synchronized block
// If there are modifications we need to ensure none of the modifications have
// collisions.
// A record is guaranteed not collide if no other write was performed to the record
// by an applied snapshot since this snapshot was taken. No writes to a state object
// occurred if, ignoring this snapshot, the readable records for the snapshots are
// the same. If they are different then there is a potential collision and the state
// object is asked if it can resolve the collision. If it can the updated state record
// is for the apply.
var mergedRecords: MutableList<Pair<StateObject, StateRecord>>? = null
val start = this.invalid.set(id).or(this.previousIds)
val modified = modified!!
var statesToRemove: MutableList<StateObject>? = null
for (state in modified) {
val first = state.firstStateRecord
// If either current or previous cannot be calculated the object was created
// in a nested snapshot that was committed then changed.
val current = readable(first, snapshotId, invalidSnapshots) ?: continue
val previous = readable(first, id, start) ?: continue
if (current != previous) {
val applied = readable(first, id, this.invalid) ?: readError()
val merged = optimisticMerges?.get(current) ?: run {
state.mergeRecords(previous, current, applied)
}
when (merged) {
null -> return SnapshotApplyResult.Failure(this)
applied -> {
// Nothing to do the merge policy says that the current changes
// obscure the current value so ignore the conflict
}
current -> {
(
mergedRecords ?: mutableListOf<Pair<StateObject, StateRecord>>().also {
mergedRecords = it
}
).add(state to current.create())
// If we revert to current then the state is no longer modified.
(
statesToRemove ?: mutableListOf<StateObject>().also {
statesToRemove = it
}
).add(state)
}
else -> {
(
mergedRecords ?: mutableListOf<Pair<StateObject, StateRecord>>().also {
mergedRecords = it
}
).add(
if (merged != previous) state to merged
else state to previous.create()
)
}
}
}
}
mergedRecords?.let {
// Ensure we have a new snapshot id
advance()
// Update all the merged records to have the new id.
it.fastForEach { merged ->
val (state, stateRecord) = merged
stateRecord.snapshotId = id
sync {
stateRecord.next = state.firstStateRecord
state.prependStateRecord(stateRecord)
}
}
}
statesToRemove?.let {
// Remove from modified any state objects that have reverted to the parent value.
modified.removeAll(it)
}
return SnapshotApplyResult.Success
}
internal inline fun <T> advance(block: () -> T): T {
recordPrevious(id)
return block().also {
// Only advance this snapshot if it's possible for it to be applied later,
// otherwise we don't need to bother.
// This simplifies tracking of open snapshots when an apply observer takes
// a nested snapshot of the snapshot that was just applied.
if (!applied && !disposed) {
val previousId = id
sync {
id = nextSnapshotId++
openSnapshots = openSnapshots.set(id)
}
invalid = invalid.addRange(previousId + 1, id)
}
}
}
internal fun advance(): Unit = advance { }
internal fun recordPrevious(id: Int) {
sync {
previousIds = previousIds.set(id)
}
}
internal fun recordPreviousPinnedSnapshot(id: Int) {
if (id >= 0)
previousPinnedSnapshots = previousPinnedSnapshots + id
}
internal fun recordPreviousPinnedSnapshots(handles: IntArray) {
// Avoid unnecessary copies implied by the `+` below.
if (handles.isEmpty()) return
val pinned = previousPinnedSnapshots
if (pinned.isEmpty()) previousPinnedSnapshots = handles
else previousPinnedSnapshots = pinned + handles
}
internal fun releasePreviouslyPinnedSnapshotsLocked() {
for (index in previousPinnedSnapshots.indices) {
releasePinningLocked(previousPinnedSnapshots[index])
}
}
internal fun recordPreviousList(snapshots: SnapshotIdSet) {
sync {
previousIds = previousIds.or(snapshots)
}
}
override fun recordModified(state: StateObject) {
(modified ?: HashSet<StateObject>().also { modified = it }).add(state)
}
override var modified: MutableSet<StateObject>? = null
/**
* A set of the id's previously associated with this snapshot. When this snapshot closes
* then these ids must be removed from the global as well.
*/
internal var previousIds: SnapshotIdSet = SnapshotIdSet.EMPTY
/**
* A list of the pinned snapshots handles that must be released by this snapshot
*/
internal var previousPinnedSnapshots: IntArray = IntArray(0)
/**
* The number of pending nested snapshots of this snapshot. To simplify the code, this
* snapshot it, itself, counted as its own nested snapshot.
*/
private var snapshots = 1
/**
* Tracks whether the snapshot has been applied.
*/
internal var applied = false
}
/**
* The result of a applying a mutable snapshot. [Success] indicates that the snapshot was
* successfully applied and is now visible as the global state of the state object (or visible
* in the parent snapshot for a nested snapshot). [Failure] indicates one or more state objects
* were modified by both this snapshot and in the global (or parent) snapshot, and the changes from
* this snapshot are **not** visible in the global or parent snapshot.
*/
sealed class SnapshotApplyResult {
/**
* Check the result of an apply. If the result is [Success] then this does does nothing. If
* the result is [Failure] then a [SnapshotApplyConflictException] exception is thrown. Once
* [check] as been called the snapshot is disposed.
*/
abstract fun check()
/**
* True if the result is [Success].
*/
abstract val succeeded: Boolean
object Success : SnapshotApplyResult() {
/**
* Check the result of a snapshot apply. Calling [check] on a [Success] result is a noop.
*/
override fun check() { }
override val succeeded: Boolean get() = true
}
class Failure(val snapshot: Snapshot) : SnapshotApplyResult() {
/**
* Check the result of a snapshot apply. Calling [check] on a [Failure] result throws a
* [SnapshotApplyConflictException] exception.
*/
override fun check() {
snapshot.dispose()
throw SnapshotApplyConflictException(snapshot)
}
override val succeeded: Boolean get() = false
}
}
/**
* The type returned by observer registration methods that unregisters the observer when it is
* disposed.
*/
@Suppress("CallbackName")
fun interface ObserverHandle {
/**
* Dispose the observer causing it to be unregistered from the snapshot system.
*/
fun dispose()
}
/**
* Return the thread's active snapshot. If no thread snapshot is active then the current global
* snapshot is used.
*/
internal fun currentSnapshot(): Snapshot =
threadSnapshot.get() ?: currentGlobalSnapshot.get()
/**
* An exception that is thrown when [SnapshotApplyResult.check] is called on a result of a
* [MutableSnapshot.apply] that fails to apply.
*/
class SnapshotApplyConflictException(
@Suppress("unused") val snapshot: Snapshot
) : Exception()
/**
* Snapshot local value of a state object.
*/
abstract class StateRecord {
/**
* The snapshot id of the snapshot in which the record was created.
*/
internal var snapshotId: Int = currentSnapshot().id
/**
* Reference of the next state record. State records are stored in a linked list.
*
* Changes to [next] must preserve all existing records to all threads even during
* intermediately changes. For example, it is safe to add the beginning or end of the list
* but adding to the middle requires care. First the new record must have its [next] updated
* then the [next] of its new predecessor can then be set to point to it. This implies that
* records that are already in the list cannot be moved in the list as this the change must
* be atomic to all threads that cannot happen without a lock which this list cannot afford.
*
* It is unsafe to remove a record as it might be in the process of being reused (see [used]).
* If a record is removed care must be taken to ensure that it is not being claimed by some
* other thread. This would require changes to [used].
*/
internal var next: StateRecord? = null
/**
* Copy the value into this state record from another for the same state object.
*/
abstract fun assign(value: StateRecord)
/**
* Create a new state record for the same state object.
*/
abstract fun create(): StateRecord
}
/**
* Interface implemented by all snapshot aware state objects. Used by this module to maintain the
* state records of a state object.
*/
@JvmDefaultWithCompatibility
interface StateObject {
/**
* The first state record in a linked list of state records.
*/
val firstStateRecord: StateRecord
/**
* Add a new state record to the beginning of a list. After this call [firstStateRecord] should
* be [value].
*/
fun prependStateRecord(value: StateRecord)
/**
* Produce a merged state based on the conflicting state changes.
*
* This method must not modify any of the records received and should treat the state records
* as immutable, even the [applied] record.
*
* @param previous the state record that was used to create the [applied] record and is a state
* that also (though indirectly) produced the [current] record.
*
* @param current the state record of the parent snapshot or global state.
*
* @param applied the state record that is being applied of the parent snapshot or global
* state.
*
* @return the modified state or `null` if the values cannot be merged. If the states cannot
* be merged the current apply will fail. Any of the parameters can be returned as a result.
* If it is not one of the parameter values then it *must* be a new value that is created by
* calling [StateRecord.create] on one of the records passed and then can be modified
* to have the merged value before being returned. If a new record is returned
* [MutableSnapshot.apply] will update the internal snapshot id and call
* [prependStateRecord] if the record is used.
*/
fun mergeRecords(
previous: StateRecord,
current: StateRecord,
applied: StateRecord
): StateRecord? = null
}
/**
* A snapshot whose state objects cannot be modified. If a state object is modified when in a
* read-only snapshot a [IllegalStateException] is thrown.
*/
internal class ReadonlySnapshot internal constructor(
id: Int,
invalid: SnapshotIdSet,
override val readObserver: ((Any) -> Unit)?
) : Snapshot(id, invalid) {
/**
* The number of nested snapshots that are active. To simplify the code, this snapshot counts
* itself as a nested snapshot.
*/
private var snapshots = 1
override val readOnly: Boolean get() = true
override val root: Snapshot get() = this
override fun hasPendingChanges(): Boolean = false
override val writeObserver: ((Any) -> Unit)? get() = null
override var modified: HashSet<StateObject>?
get() = null
@Suppress("UNUSED_PARAMETER")
set(value) = unsupported()
override fun takeNestedSnapshot(readObserver: ((Any) -> Unit)?): Snapshot {
validateOpen(this)
return NestedReadonlySnapshot(id, invalid, readObserver, this)
}
override fun notifyObjectsInitialized() {
// Nothing to do for read-only snapshots
}
override fun dispose() {
if (!disposed) {
nestedDeactivated(this)
super.dispose()
}
}
override fun nestedActivated(snapshot: Snapshot) { snapshots++ }
override fun nestedDeactivated(snapshot: Snapshot) {
if (--snapshots == 0) {
// A read-only snapshot can be just be closed as it has no modifications.
closeAndReleasePinning()
}
}
override fun recordModified(state: StateObject) {
reportReadonlySnapshotWrite()
}
}
internal class NestedReadonlySnapshot(
id: Int,
invalid: SnapshotIdSet,
readObserver: ((Any) -> Unit)?,
val parent: Snapshot
) : Snapshot(id, invalid) {
init { parent.nestedActivated(this) }
override val readOnly get() = true
override val root: Snapshot get() = parent.root
override fun takeNestedSnapshot(readObserver: ((Any) -> Unit)?) =
NestedReadonlySnapshot(id, invalid, readObserver, parent)
override fun notifyObjectsInitialized() {
// Nothing to do for read-only snapshots
}
override fun hasPendingChanges(): Boolean = false
override val readObserver: ((Any) -> Unit)? =
// Merge the read observers if necessary
readObserver?.let {
parent.readObserver?.let {
{ state: Any ->
readObserver(state)
it(state)
}
} ?: readObserver
} ?: parent.readObserver
override fun dispose() {
if (!disposed) {
if (id != parent.id) {
closeAndReleasePinning()
}
parent.nestedDeactivated(this)
super.dispose()
}
}
override val modified: HashSet<StateObject>? get() = null
override val writeObserver: ((Any) -> Unit)? get() = null
override fun recordModified(state: StateObject) = reportReadonlySnapshotWrite()
override fun nestedDeactivated(snapshot: Snapshot) = unsupported()
override fun nestedActivated(snapshot: Snapshot) = unsupported()
}
private val emptyLambda: (invalid: SnapshotIdSet) -> Unit = { }
/**
* A snapshot object that simplifies the code by treating the global state as a mutable snapshot.
*/
internal class GlobalSnapshot(id: Int, invalid: SnapshotIdSet) :
MutableSnapshot(
id, invalid, null,
sync {
// Take a defensive copy of the globalWriteObservers list. This then avoids having to
// synchronized access to writerObserver in places it is called and allows the list to
// change while notifications are being dispatched. Changes to globalWriteObservers force
// a new global snapshot to be created.
(
if (globalWriteObservers.isNotEmpty()) {
globalWriteObservers.toMutableList()
} else null
)?.let {
it.singleOrNull() ?: { state: Any ->
it.fastForEach { it(state) }
}
}
}
) {
override fun takeNestedSnapshot(readObserver: ((Any) -> Unit)?): Snapshot =
takeNewSnapshot { invalid ->
ReadonlySnapshot(
id = sync { nextSnapshotId++ },
invalid = invalid,
readObserver = readObserver
)
}
override fun takeNestedMutableSnapshot(
readObserver: ((Any) -> Unit)?,
writeObserver: ((Any) -> Unit)?
): MutableSnapshot = takeNewSnapshot { invalid ->
MutableSnapshot(
id = sync { nextSnapshotId++ },
invalid = invalid,
// It is intentional that the global read observers are not merged with mutable
// snapshots read observers.
readObserver = readObserver,
// It is intentional that global write observers are not merged with mutable
// snapshots write observers.
writeObserver = writeObserver
)
}
override fun notifyObjectsInitialized() {
advanceGlobalSnapshot()
}
override fun nestedDeactivated(snapshot: Snapshot) = unsupported()
override fun nestedActivated(snapshot: Snapshot) = unsupported()
override fun apply(): SnapshotApplyResult =
error("Cannot apply the global snapshot directly. Call Snapshot.advanceGlobalSnapshot")
override fun dispose() {
sync {
releasePinnedSnapshotLocked()
}
}
}
/**
* A nested mutable snapshot created by [MutableSnapshot.takeNestedMutableSnapshot].
*/
internal class NestedMutableSnapshot(
id: Int,
invalid: SnapshotIdSet,
readObserver: ((Any) -> Unit)?,
writeObserver: ((Any) -> Unit)?,
val parent: MutableSnapshot
) : MutableSnapshot(id, invalid, readObserver, writeObserver) {
private var deactivated = false
init { parent.nestedActivated(this) }
override val root: Snapshot get() = parent.root
override fun dispose() {
if (!disposed) {
super.dispose()
deactivate()
}
}
override fun apply(): SnapshotApplyResult {
if (parent.applied || parent.disposed) return SnapshotApplyResult.Failure(this)
// Applying a nested mutable snapshot applies its changes to the parent snapshot.
// See MutableSnapshot.apply() for implantation notes.
// The apply observer notification are for applying to the global scope so it is elided
// here making this code a bit simpler than MutableSnapshot.apply.
val modified = modified
val id = id
val optimisticMerges = if (modified != null)
optimisticMerges(parent, this, parent.invalid)
else null
sync {
validateOpen(this)
if (modified == null || modified.size == 0) {
closeAndReleasePinning()
} else {
val result = innerApplyLocked(parent.id, optimisticMerges, parent.invalid)
if (result != SnapshotApplyResult.Success) return result
// Add all modified objects in this set to the parent
(
parent.modified ?: HashSet<StateObject>().also {
parent.modified = it
}
).addAll(modified)
}
// Ensure the parent is newer than the current snapshot
if (parent.id < id) {
parent.advance()
}
// Make the snapshot visible in the parent snapshot
parent.invalid = parent.invalid.clear(id).andNot(previousIds)
// Ensure the ids associated with this snapshot are also applied by the parent.
parent.recordPrevious(id)
parent.recordPreviousPinnedSnapshot(takeoverPinnedSnapshot())
parent.recordPreviousList(previousIds)
parent.recordPreviousPinnedSnapshots(previousPinnedSnapshots)
}
applied = true
deactivate()
return SnapshotApplyResult.Success
}
private fun deactivate() {
if (!deactivated) {
deactivated = true
parent.nestedDeactivated(this)
}
}
}
/**
* A pseudo snapshot that doesn't introduce isolation but does introduce observers.
*/
internal class TransparentObserverMutableSnapshot(
private val previousSnapshot: MutableSnapshot?,
internal val specifiedReadObserver: ((Any) -> Unit)?,
internal val specifiedWriteObserver: ((Any) -> Unit)?,
private val mergeParentObservers: Boolean,
private val ownsPreviousSnapshot: Boolean
) : MutableSnapshot(
INVALID_SNAPSHOT,
SnapshotIdSet.EMPTY,
mergedReadObserver(
specifiedReadObserver,
previousSnapshot?.readObserver ?: currentGlobalSnapshot.get().readObserver,
mergeParentObservers
),
mergedWriteObserver(
specifiedWriteObserver,
previousSnapshot?.writeObserver ?: currentGlobalSnapshot.get().writeObserver
)
) {
private val currentSnapshot: MutableSnapshot
get() = previousSnapshot ?: currentGlobalSnapshot.get()
override fun dispose() {
// Explicitly don't call super.dispose()
disposed = true
if (ownsPreviousSnapshot) {
previousSnapshot?.dispose()
}
}
override var id: Int
get() = currentSnapshot.id
@Suppress("UNUSED_PARAMETER")
set(value) { unsupported() }
override var invalid get() = currentSnapshot.invalid
@Suppress("UNUSED_PARAMETER")
set(value) = unsupported()
override fun hasPendingChanges(): Boolean = currentSnapshot.hasPendingChanges()
override var modified: MutableSet<StateObject>?
get() = currentSnapshot.modified
@Suppress("UNUSED_PARAMETER")
set(value) = unsupported()
override val readOnly: Boolean
get() = currentSnapshot.readOnly
override fun apply(): SnapshotApplyResult =
currentSnapshot.apply()
override fun recordModified(state: StateObject) =
currentSnapshot.recordModified(state)
override fun takeNestedSnapshot(readObserver: ((Any) -> Unit)?): Snapshot {
val mergedReadObserver = mergedReadObserver(readObserver, this.readObserver)
return if (!mergeParentObservers) {
createTransparentSnapshotWithNoParentReadObserver(
previousSnapshot = currentSnapshot.takeNestedSnapshot(null),
readObserver = mergedReadObserver,
ownsPreviousSnapshot = true
)
} else {
currentSnapshot.takeNestedSnapshot(mergedReadObserver)
}
}
override fun takeNestedMutableSnapshot(
readObserver: ((Any) -> Unit)?,
writeObserver: ((Any) -> Unit)?
): MutableSnapshot {
val mergedReadObserver = mergedReadObserver(readObserver, this.readObserver)
val mergedWriteObserver = mergedWriteObserver(writeObserver, this.writeObserver)
return if (!mergeParentObservers) {
val nestedSnapshot = currentSnapshot.takeNestedMutableSnapshot(
readObserver = null,
writeObserver = mergedWriteObserver
)
TransparentObserverMutableSnapshot(
previousSnapshot = nestedSnapshot,
specifiedReadObserver = mergedReadObserver,
specifiedWriteObserver = mergedWriteObserver,
mergeParentObservers = false,
ownsPreviousSnapshot = true
)
} else {
currentSnapshot.takeNestedMutableSnapshot(
mergedReadObserver,
mergedWriteObserver
)
}
}
override fun notifyObjectsInitialized() = currentSnapshot.notifyObjectsInitialized()
/** Should never be called. */
override fun nestedActivated(snapshot: Snapshot) = unsupported()
override fun nestedDeactivated(snapshot: Snapshot) = unsupported()
}
/**
* A pseudo snapshot that doesn't introduce isolation but does introduce observers.
*/
internal class TransparentObserverSnapshot(
private val previousSnapshot: Snapshot?,
specifiedReadObserver: ((Any) -> Unit)?,
private val mergeParentObservers: Boolean,
private val ownsPreviousSnapshot: Boolean
) : Snapshot(
INVALID_SNAPSHOT,
SnapshotIdSet.EMPTY,
) {
override val readObserver: ((Any) -> Unit)? = mergedReadObserver(
specifiedReadObserver,
previousSnapshot?.readObserver ?: currentGlobalSnapshot.get().readObserver,
mergeParentObservers
)
override val writeObserver: ((Any) -> Unit)? = null
override val root: Snapshot = this
private val currentSnapshot: Snapshot
get() = previousSnapshot ?: currentGlobalSnapshot.get()
override fun dispose() {
// Explicitly don't call super.dispose()
disposed = true
if (ownsPreviousSnapshot) {
previousSnapshot?.dispose()
}
}
override var id: Int
get() = currentSnapshot.id
@Suppress("UNUSED_PARAMETER")
set(value) { unsupported() }
override var invalid get() = currentSnapshot.invalid
@Suppress("UNUSED_PARAMETER")
set(value) = unsupported()
override fun hasPendingChanges(): Boolean = currentSnapshot.hasPendingChanges()
override var modified: MutableSet<StateObject>?
get() = currentSnapshot.modified
@Suppress("UNUSED_PARAMETER")
set(value) = unsupported()
override val readOnly: Boolean
get() = currentSnapshot.readOnly
override fun recordModified(state: StateObject) =
currentSnapshot.recordModified(state)
override fun takeNestedSnapshot(readObserver: ((Any) -> Unit)?): Snapshot {
val mergedReadObserver = mergedReadObserver(readObserver, this.readObserver)
return if (!mergeParentObservers) {
createTransparentSnapshotWithNoParentReadObserver(
currentSnapshot.takeNestedSnapshot(null),
mergedReadObserver,
ownsPreviousSnapshot = true
)
} else {
currentSnapshot.takeNestedSnapshot(mergedReadObserver)
}
}
override fun notifyObjectsInitialized() = currentSnapshot.notifyObjectsInitialized()
/** Should never be called. */
override fun nestedActivated(snapshot: Snapshot) = unsupported()
override fun nestedDeactivated(snapshot: Snapshot) = unsupported()
}
private fun createTransparentSnapshotWithNoParentReadObserver(
previousSnapshot: Snapshot?,
readObserver: ((Any) -> Unit)? = null,
ownsPreviousSnapshot: Boolean = false
): Snapshot = if (previousSnapshot is MutableSnapshot || previousSnapshot == null) {
TransparentObserverMutableSnapshot(
previousSnapshot = previousSnapshot as? MutableSnapshot,
specifiedReadObserver = readObserver,
specifiedWriteObserver = null,
mergeParentObservers = false,
ownsPreviousSnapshot = ownsPreviousSnapshot
)
} else {
TransparentObserverSnapshot(
previousSnapshot = previousSnapshot,
specifiedReadObserver = readObserver,
mergeParentObservers = false,
ownsPreviousSnapshot = ownsPreviousSnapshot
)
}
private fun mergedReadObserver(
readObserver: ((Any) -> Unit)?,
parentObserver: ((Any) -> Unit)?,
mergeReadObserver: Boolean = true
): ((Any) -> Unit)? {
@Suppress("NAME_SHADOWING")
val parentObserver = if (mergeReadObserver) parentObserver else null
return if (readObserver != null && parentObserver != null && readObserver != parentObserver) {
{ state: Any ->
readObserver(state)
parentObserver(state)
}
} else readObserver ?: parentObserver
}
private fun mergedWriteObserver(
writeObserver: ((Any) -> Unit)?,
parentObserver: ((Any) -> Unit)?
): ((Any) -> Unit)? =
if (writeObserver != null && parentObserver != null && writeObserver != parentObserver) {
{ state: Any ->
writeObserver(state)
parentObserver(state)
}
} else writeObserver ?: parentObserver
/**
* Snapshot id of `0` is reserved as invalid and no state record with snapshot `0` is considered
* valid.
*
* The value `0` was chosen as it is the default value of the Int snapshot id type and records
* initially created will naturally have a snapshot id of 0. If this wasn't considered invalid
* adding such a record to a state object will make the state record immediately visible to the
* snapshots instead of being born invalid. Using `0` ensures all state records are created invalid
* and must be explicitly marked as valid in to be visible in a snapshot.
*/
private const val INVALID_SNAPSHOT = 0
/**
* Current thread snapshot
*/
private val threadSnapshot = SnapshotThreadLocal<Snapshot>()
/**
* A global synchronization object. This synchronization object should be taken before modifying any
* of the fields below.
*/
@PublishedApi
internal val lock = Any()
@PublishedApi
internal inline fun <T> sync(block: () -> T): T = synchronized(lock, block)
// The following variables should only be written when sync is taken
/**
* A set of snapshots that are currently open and should be considered invalid for new snapshots.
*/
private var openSnapshots = SnapshotIdSet.EMPTY
/** The first snapshot created must be at least on more than the INVALID_SNAPSHOT */
private var nextSnapshotId = INVALID_SNAPSHOT + 1
/**
* A tracking table for pinned snapshots. A pinned snapshot is the lowest snapshot id that the
* snapshot is ignoring by considering them invalid. This is used to calculate when a snapshot
* record can be reused.
*/
private val pinningTable = SnapshotDoubleIndexHeap()
/** A list of apply observers */
private val applyObservers = mutableListOf<(Set<Any>, Snapshot) -> Unit>()
/** A list of observers of writes to the global state. */
private val globalWriteObservers = mutableListOf<((Any) -> Unit)>()
private val currentGlobalSnapshot = AtomicReference(
GlobalSnapshot(
id = nextSnapshotId++,
invalid = SnapshotIdSet.EMPTY
).also {
openSnapshots = openSnapshots.set(it.id)
}
)
/**
* A value to use to initialize the snapshot local variable of writable below. The value of this
* doesn't matter as it is just used to initialize the local that is immediately overwritten by
* Snapshot.current. This is done to avoid a compiler error complaining that the var has not been
* initialized. This can be removed once contracts are out of experimental; then we can mark sync
* with the correct contracts so the compiler would be able to figure out that the variable is
* initialized.
*/
@PublishedApi
internal val snapshotInitializer: Snapshot = currentGlobalSnapshot.get()
private fun <T> takeNewGlobalSnapshot(
previousGlobalSnapshot: Snapshot,
block: (invalid: SnapshotIdSet) -> T
): T {
// Deactivate global snapshot. It is safe to just deactivate it because it cannot have
// any conflicting writes as it is always closed before another snapshot is taken.
val result = block(openSnapshots.clear(previousGlobalSnapshot.id))
sync {
val globalId = nextSnapshotId++
openSnapshots = openSnapshots.clear(previousGlobalSnapshot.id)
currentGlobalSnapshot.set(
GlobalSnapshot(
id = globalId,
invalid = openSnapshots
)
)
previousGlobalSnapshot.dispose()
openSnapshots = openSnapshots.set(globalId)
}
return result
}
private fun <T> advanceGlobalSnapshot(block: (invalid: SnapshotIdSet) -> T): T {
var previousGlobalSnapshot = snapshotInitializer as GlobalSnapshot
val result = sync {
previousGlobalSnapshot = currentGlobalSnapshot.get()
takeNewGlobalSnapshot(previousGlobalSnapshot, block)
}
// If the previous global snapshot had any modified states then notify the registered apply
// observers.
val modified = previousGlobalSnapshot.modified
if (modified != null) {
val observers: List<(Set<Any>, Snapshot) -> Unit> = sync { applyObservers.toMutableList() }
observers.fastForEach { observer ->
observer(modified, previousGlobalSnapshot)
}
}
return result
}
private fun advanceGlobalSnapshot() = advanceGlobalSnapshot { }
private fun <T : Snapshot> takeNewSnapshot(block: (invalid: SnapshotIdSet) -> T): T =
advanceGlobalSnapshot { invalid ->
val result = block(invalid)
sync {
openSnapshots = openSnapshots.set(result.id)
}
result
}
private fun validateOpen(snapshot: Snapshot) {
if (!openSnapshots.get(snapshot.id)) error("Snapshot is not open")
}
/**
* A candidate snapshot is valid if the it is less than or equal to the current snapshot
* and it wasn't specifically marked as invalid when the snapshot started.
*
* All snapshot active at when the snapshot was taken considered invalid for the snapshot
* (they have not been applied and therefore are considered invalid).
*
* All snapshots taken after the current snapshot are considered invalid since they where taken
* after the current snapshot was taken.
*
* INVALID_SNAPSHOT is reserved as an invalid snapshot id.
*/
private fun valid(currentSnapshot: Int, candidateSnapshot: Int, invalid: SnapshotIdSet): Boolean {
return candidateSnapshot != INVALID_SNAPSHOT && candidateSnapshot <= currentSnapshot &&
!invalid.get(candidateSnapshot)
}
// Determine if the given data is valid for the snapshot.
private fun valid(data: StateRecord, snapshot: Int, invalid: SnapshotIdSet): Boolean {
return valid(snapshot, data.snapshotId, invalid)
}
private fun <T : StateRecord> readable(r: T, id: Int, invalid: SnapshotIdSet): T? {
// The readable record is the valid record with the highest snapshotId
var current: StateRecord? = r
var candidate: StateRecord? = null
while (current != null) {
if (valid(current, id, invalid)) {
candidate = if (candidate == null) current
else if (candidate.snapshotId < current.snapshotId) current else candidate
}
current = current.next
}
if (candidate != null) {
@Suppress("UNCHECKED_CAST")
return candidate as T
}
return null
}
/**
* Return the current readable state record for the current snapshot. It is assumed that [this]
* is the first record of [state]
*/
fun <T : StateRecord> T.readable(state: StateObject): T {
val snapshot = Snapshot.current
snapshot.readObserver?.invoke(state)
return readable(this, snapshot.id, snapshot.invalid) ?: sync {
// Readable can return null when the global snapshot has been advanced by another thread
// and state written to the object was overwritten while this thread was paused. Repeating
// the read is valid here as either this will return the same result as the previous call
// or will find a valid record. Being in a sync block prevents other threads from writing
// to this state object until the read completes.
val syncSnapshot = Snapshot.current
readable(this, syncSnapshot.id, syncSnapshot.invalid)
} ?: readError()
}
/**
* Return the current readable state record for the [snapshot]. It is assumed that [this]
* is the first record of [state]
*/
fun <T : StateRecord> T.readable(state: StateObject, snapshot: Snapshot): T {
// invoke the observer associated with the current snapshot.
snapshot.readObserver?.invoke(state)
return readable(this, snapshot.id, snapshot.invalid) ?: readError()
}
private fun readError(): Nothing {
error(
"Reading a state that was created after the snapshot was taken or in a snapshot that " +
"has not yet been applied"
)
}
/**
* A record can be reused if no other snapshot will see it as valid. This is always true for a
* record created in an abandoned snapshot. It is also true if the record is valid in the
* previous snapshot and is obscured by another record also valid in the previous state record.
*/
private fun used(state: StateObject): StateRecord? {
var current: StateRecord? = state.firstStateRecord
var validRecord: StateRecord? = null
val reuseLimit = pinningTable.lowestOrDefault(nextSnapshotId) - 1
val invalid = SnapshotIdSet.EMPTY
while (current != null) {
val currentId = current.snapshotId
if (currentId == INVALID_SNAPSHOT) {
// Any records that were marked invalid by an abandoned snapshot can be used
// immediately.
return current
}
if (valid(current, reuseLimit, invalid)) {
if (validRecord == null) {
validRecord = current
} else {
// If we have two valid records one must obscure the other. Return the
// record with the lowest id
return if (current.snapshotId < validRecord.snapshotId) current else validRecord
}
}
current = current.next
}
return null
}
@PublishedApi
internal fun <T : StateRecord> T.writableRecord(state: StateObject, snapshot: Snapshot): T {
if (snapshot.readOnly) {
// If the snapshot is read-only, use the snapshot recordModified to report it.
snapshot.recordModified(state)
}
val id = snapshot.id
val readData = readable(this, id, snapshot.invalid) ?: readError()
// If the readable data was born in this snapshot, it is writable.
if (readData.snapshotId == snapshot.id) return readData
// Otherwise, make a copy of the readable data and mark it as born in this snapshot, making it
// writable.
val newData = readData.newWritableRecord(state, snapshot)
snapshot.recordModified(state)
return newData
}
internal fun <T : StateRecord> T.overwritableRecord(
state: StateObject,
snapshot: Snapshot,
candidate: T
): T {
if (snapshot.readOnly) {
// If the snapshot is read-only, use the snapshot recordModified to report it.
snapshot.recordModified(state)
}
val id = snapshot.id
if (candidate.snapshotId == id) return candidate
val newData = newOverwritableRecord(state)
newData.snapshotId = id
snapshot.recordModified(state)
return newData
}
internal fun <T : StateRecord> T.newWritableRecord(state: StateObject, snapshot: Snapshot): T {
// Calling used() on a state object might return the same record for each thread calling
// used() therefore selecting the record to reuse should be guarded.
// Note: setting the snapshotId to Int.MAX_VALUE will make it invalid for all snapshots.
// This means the lock can be released as used() will no longer select it. Using id could
// also be used but it puts the object into a state where the reused value appears to be
// the current valid value for the snapshot. This is not an issue if the snapshot is only
// being read from a single thread but using Int.MAX_VALUE allows multiple readers,
// single writer, of a snapshot. Note that threads reading a mutating snapshot should not
// cache the result of readable() as the mutating thread calls to writable() can change the
// result of readable().
@Suppress("UNCHECKED_CAST")
val newData = newOverwritableRecord(state)
newData.assign(this)
newData.snapshotId = snapshot.id
return newData
}
internal fun <T : StateRecord> T.newOverwritableRecord(state: StateObject): T {
// Calling used() on a state object might return the same record for each thread calling
// used() therefore selecting the record to reuse should be guarded.
// Note: setting the snapshotId to Int.MAX_VALUE will make it invalid for all snapshots.
// This means the lock can be released as used() will no longer select it. Using id could
// also be used but it puts the object into a state where the reused value appears to be
// the current valid value for the snapshot. This is not an issue if the snapshot is only
// being read from a single thread but using Int.MAX_VALUE allows multiple readers,
// single writer, of a snapshot. Note that threads reading a mutating snapshot should not
// cache the result of readable() as the mutating thread calls to writable() can change the
// result of readable().
@Suppress("UNCHECKED_CAST")
return (used(state) as T?)?.apply {
snapshotId = Int.MAX_VALUE
} ?: create().apply {
snapshotId = Int.MAX_VALUE
this.next = state.firstStateRecord
state.prependStateRecord(this as T)
} as T
}
@PublishedApi
internal fun notifyWrite(snapshot: Snapshot, state: StateObject) {
snapshot.writeObserver?.invoke(state)
}
/**
* Call [block] with a writable state record for [snapshot] of the given record. It is
* assumed that this is called for the first state record in a state object. If the snapshot is
* read-only calling this will throw.
*/
inline fun <T : StateRecord, R> T.writable(
state: StateObject,
snapshot: Snapshot,
block: T.() -> R
): R {
// A writable record will always be the readable record (as all newer records are invalid it
// must be the newest valid record). This means that if the readable record is not from the
// current snapshot, a new record must be created. To create a new writable record, a record
// can be reused, if possible, and the readable record is applied to it. If a record cannot
// be reused, a new record is created and the readable record is applied to it. Once the
// values are correct the record is made live by giving it the current snapshot id.
// Writes need to be in a `sync` block as all writes in flight must be completed before a new
// snapshot is take. Writing in a sync block ensures this is the case because new snapshots
// are also in a sync block.
return sync {
this.writableRecord(state, snapshot).block()
}.also { notifyWrite(snapshot, state) }
}
/**
* Call [block] with a writable state record for the given record. It is assumed that this is
* called for the first state record in a state object. A record is writable if it was created in
* the current mutable snapshot.
*/
inline fun <T : StateRecord, R> T.writable(state: StateObject, block: T.() -> R): R {
var snapshot: Snapshot = snapshotInitializer
return sync {
snapshot = Snapshot.current
this.writableRecord(state, snapshot).block()
}.also {
notifyWrite(snapshot, state)
}
}
/**
* Call [block] with a writable state record for the given record. It is assumed that this is
* called for the first state record in a state object. A record is writable if it was created in
* the current mutable snapshot. This should only be used when the record will be overwritten in
* its entirety (such as having only one field and that field is written to).
*
* WARNING: If the caller doesn't overwrite all the fields in the state record the object will be
* inconsistent and the fields not written are almost guaranteed to be incorrect. If it is
* possible that [block] will not write to all the fields use [writable] instead.
*
* @param state The object that has this record in its record list.
* @param candidate The current for the snapshot record returned by [withCurrent]
* @param block The block that will mutate all the field of the record.
*/
internal inline fun <T : StateRecord, R> T.overwritable(
state: StateObject,
candidate: T,
block: T.() -> R
): R {
var snapshot: Snapshot = snapshotInitializer
return sync {
snapshot = Snapshot.current
this.overwritableRecord(state, snapshot, candidate).block()
}.also {
notifyWrite(snapshot, state)
}
}
/**
* Produce a set of optimistic merges of the state records, this is performed outside the
* a synchronization block to reduce the amount of time taken in the synchronization block
* reducing the thread contention of merging state values.
*/
private fun optimisticMerges(
currentSnapshot: MutableSnapshot,
applyingSnapshot: MutableSnapshot,
invalidSnapshots: SnapshotIdSet
): Map<StateRecord, StateRecord>? {
val modified = applyingSnapshot.modified
val id = currentSnapshot.id
if (modified == null) return null
val start = applyingSnapshot.invalid.set(applyingSnapshot.id).or(applyingSnapshot.previousIds)
var result: MutableMap<StateRecord, StateRecord>? = null
for (state in modified) {
val first = state.firstStateRecord
val current = readable(first, id, invalidSnapshots) ?: continue
val previous = readable(first, id, start) ?: continue
if (current != previous) {
// Try to produce a merged state record
val applied = readable(first, applyingSnapshot.id, applyingSnapshot.invalid)
?: readError()
val merged = state.mergeRecords(previous, current, applied)
if (merged != null) {
(
result ?: hashMapOf<StateRecord, StateRecord>().also {
result = it
}
)[current] = merged
} else {
// If one fails don't bother calculating the others as they are likely not going
// to be used. There is an unlikely case that a optimistic merge cannot be
// produced but the snapshot will apply because, once the synchronization is taken,
// the current state can be merge. This routine errors on the side of reduced
// overall work by not performing work that is likely to be ignored.
return null
}
}
}
return result
}
private fun reportReadonlySnapshotWrite(): Nothing {
error("Cannot modify a state object in a read-only snapshot")
}
/**
* Returns the current record without notifying any read observers.
*/
@PublishedApi
internal fun <T : StateRecord> current(r: T, snapshot: Snapshot) =
readable(r, snapshot.id, snapshot.invalid) ?: readError()
@PublishedApi
internal fun <T : StateRecord> current(r: T) =
Snapshot.current.let { snapshot ->
readable(r, snapshot.id, snapshot.invalid) ?: sync {
Snapshot.current.let { syncSnapshot ->
readable(r, syncSnapshot.id, syncSnapshot.invalid)
}
} ?: readError()
}
/**
* Provides a [block] with the current record, without notifying any read observers.
*
* @see readable
*/
inline fun <T : StateRecord, R> T.withCurrent(block: (r: T) -> R): R =
block(current(this))
/**
* Helper routine to add a range of values ot a snapshot set
*/
internal fun SnapshotIdSet.addRange(from: Int, until: Int): SnapshotIdSet {
var result = this
for (invalidId in from until until)
result = result.set(invalidId)
return result
}