blob: 89f71605fb7183a68161daeacc9962c9b44edf63 [file] [log] [blame]
/*
* Copyright 2023 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package androidx.datastore.core
import androidx.datastore.core.handlers.NoOpCorruptionHandler
import kotlin.contracts.ExperimentalContracts
import kotlin.contracts.InvocationKind
import kotlin.contracts.contract
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
import kotlin.time.Duration
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.completeWith
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.WhileSubscribed
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.dropWhile
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext
/**
* Multi process implementation of DataStore. It is multi-process safe.
*/
internal class DataStoreImpl<T>(
private val storage: Storage<T>,
/**
* The list of initialization tasks to perform. These tasks will be completed before any data
* is published to the [data] flow and before any read-modify-writes execute in [updateData]. If
* any of the tasks fail, the tasks will be run again the next time [data] is collected or
* [updateData] is called. Init tasks should not wait on results from [data] - this will
* result in deadlock.
*/
initTasksList: List<suspend (api: InitializerApi<T>) -> Unit> = emptyList(),
/**
* The handler of [CorruptionException]s when they are thrown during reads or writes. It
* produces the new data to replace the corrupted data on disk. By default it is a no-op which
* simply throws the exception and does not produce new data.
*/
private val corruptionHandler: CorruptionHandler<T> = NoOpCorruptionHandler(),
private val scope: CoroutineScope = CoroutineScope(ioDispatcher() + SupervisorJob())
) : DataStore<T> {
/**
 * Shared stream that reacts to change notifications from the [InterProcessCoordinator].
 *
 * The flow never emits; it exists only for its side effect of refreshing the in-memory cache
 * whenever a notification arrives. It is shared in [scope] with a `WhileSubscribed` policy, and
 * [data] subscribes to it for as long as a downstream collection is running.
 */
private val updateCollection = flow<Unit> {
    // `coordinator` is first touched here rather than at construction time, which keeps its
    // creation lazy. Notifications are not observed until initialization has completed.
    readAndInit.awaitComplete()
    val notifications = coordinator.updateNotifications.conflate()
    notifications.collect {
        // Once the state is Final there is nothing left to refresh. Otherwise, reads that are
        // triggered by an update notification always wait for the lock.
        if (inMemoryCache.currentState !is Final) {
            readDataAndUpdateCache(requireLock = true)
        }
    }
}.shareIn(
    scope,
    SharingStarted.WhileSubscribed(stopTimeout = Duration.ZERO, replayExpiration = Duration.ZERO),
    replay = 0
)
/**
* The actual values of DataStore. This is exposed in the API via [data] to be able to combine
* its lifetime with IPC update collection ([updateCollection]).
*
* Every collection first obtains a starting state through [readState] (without requiring the
* lock), emits or throws according to that state, and then forwards later states taken from the
* in-memory cache.
*/
private val internalDataFlow: Flow<T> = flow {
/**
* If downstream flow is UnInitialized, no data has been read yet, we need to trigger a new
* read then start emitting values once we have seen a new value (or exception).
*
* If downstream flow has a ReadException, there was an exception last time we tried to read
* data. We need to trigger a new read then start emitting values once we have seen a new
* value (or exception).
*
* If downstream flow has Data, we should start emitting from downstream flow as long as its
* version is not stale compared to the version read from the shared counter when we enter
* the flow.
*
* If Downstream flow is Final, the scope has been cancelled so the data store is no
* longer usable. We should just propagate this exception.
*
* State always starts at null. null can transition to ReadException, Data or
* Final. ReadException can transition to another ReadException, Data or Final.
* Data can transition to another Data or Final. Final will not change.
*
* NOTE(review): "null" above presumably refers to the UnInitialized state handled below -
* confirm against DataStoreInMemoryCache.
*/
// The first read should not be blocked by ongoing writes, so it can be a dirty read. If it is
// an unlocked read, the same value might be emitted to the flow again.
val startState = readState(requireLock = false)
when (startState) {
is Data<T> -> emit(startState.value)
// readState is expected to have moved the state past UnInitialized before returning.
is UnInitialized -> error(BUG_MESSAGE)
is ReadException<T> -> throw startState.readException
// TODO(b/273990827): decide the contract of accessing when state is Final
is Final -> return@flow
}
emitAll(
inMemoryCache.flow.takeWhile {
// end the flow if we reach the final value
it !is Final
}.dropWhile {
// skip Data states that are not newer than the state handled above
it is Data && it.version <= startState.version
}.map {
when (it) {
is ReadException<T> -> throw it.readException
is Data<T> -> it.value
// Final is filtered out by takeWhile above; UnInitialized is not expected after the
// initial read, so both are treated as internal bugs.
is Final<T>, is UnInitialized -> error(
BUG_MESSAGE
)
}
}
)
}
/**
 * Public stream of values. While a collector is active it also keeps [updateCollection]
 * subscribed, so that change notifications are observed for the lifetime of the collection.
 */
override val data: Flow<T> = channelFlow {
    // Created lazily so the subscription only begins once the value stream actually starts.
    val notificationObserver = launch(start = CoroutineStart.LAZY) {
        updateCollection.collect {
            // Nothing to do per element: staying subscribed is the whole point.
        }
    }
    internalDataFlow
        .onStart { notificationObserver.start() }
        .onCompletion { notificationObserver.cancel() }
        .collect { value -> send(value) }
}
/**
 * Hands [transform] to the write actor and suspends until the update has been acknowledged.
 *
 * An [UpdatingDataContextElement] is installed in the coroutine context for the duration of the
 * call; if one for this same instance is already present, `checkNotUpdating` fails the call.
 *
 * @param transform computes the new value from the current one.
 * @return the value the acknowledgement completes with.
 */
override suspend fun updateData(transform: suspend (t: T) -> T): T {
    val enclosingUpdate = coroutineContext[UpdatingDataContextElement.Companion.Key]
    enclosingUpdate?.checkNotUpdating(this)
    val updateMarker = UpdatingDataContextElement(parent = enclosingUpdate, instance = this)
    return withContext(updateMarker) {
        val completion = CompletableDeferred<T>()
        // The state observed at submission time travels with the message; handleUpdate compares
        // it against the state it sees when the message is processed.
        val request = Message.Update(
            transform,
            completion,
            inMemoryCache.currentState,
            coroutineContext
        )
        writeActor.offer(request)
        completion.await()
    }
}
// The cache is only set by reads that hold the file lock, so it always contains stable data.
private val inMemoryCache = DataStoreInMemoryCache<T>()
// Runs the initial read (and any init tasks) once; see InitDataStore.
private val readAndInit = InitDataStore(initTasksList)
// The delegate is kept as its own property so that callers can ask isInitialized() without
// forcing the connection to be created (see writeActor's onComplete).
private val storageConnectionDelegate = lazy {
storage.createConnection()
}
// TODO(b/269772127): make this private after we allow multiple instances of DataStore on the
// same file
internal val storageConnection by storageConnectionDelegate
// Lazily obtained from the storage connection, so first use also creates the connection.
private val coordinator: InterProcessCoordinator by lazy { storageConnection.coordinator }
// Actor that receives update requests; every delivered message is processed by handleUpdate.
private val writeActor = SimpleActor<Message.Update<T>>(
    scope = scope,
    onComplete = { cause ->
        // A non-null cause is expected; a null one is deliberately left as a no-op.
        if (cause != null) {
            inMemoryCache.tryUpdate(Final(cause))
        }
        // Do not force creation of the storage connection just to close it.
        if (storageConnectionDelegate.isInitialized()) {
            storageConnection.close()
        }
    },
    onUndeliveredElement = { msg, ex ->
        // Make sure the caller waiting on the ack is released with a failure.
        val failure = ex ?: CancellationException(
            "DataStore scope was cancelled before updateData could complete"
        )
        msg.ack.completeExceptionally(failure)
    }
) { msg -> handleUpdate(msg) }
/**
 * Produces the state a new reader should start from, running in the DataStore [scope]'s context.
 *
 * @param requireLock forwarded to [readDataAndUpdateCache].
 * @return the cached state if it is Final; a `ReadException` (version -1) if initialization
 * failed; otherwise the result of [readDataAndUpdateCache].
 */
private suspend fun readState(requireLock: Boolean): State<T> =
    withContext(scope.coroutineContext) {
        // Final is terminal, so hand it back untouched.
        if (inMemoryCache.currentState is Final) {
            return@withContext inMemoryCache.currentState
        }
        // Initialization must have run before the file is read.
        try {
            readAndInitOrPropagateAndThrowFailure()
        } catch (initFailure: Throwable) {
            // The failure has already been recorded in the cache by the callee; only report it.
            return@withContext ReadException(initFailure, -1)
        }
        // Read again after init. If init just ran for this call, the cached value is reused
        // instead of re-reading the file, so this is cheap.
        readDataAndUpdateCache(requireLock)
    }
/**
 * Processes one update request and completes its ack with either the new value or the failure.
 *
 * @param update the request; `lastState` is the state the caller observed when it submitted it.
 */
private suspend fun handleUpdate(update: Message.Update<T>) {
    val outcome = runCatching {
        when (val currentState = inMemoryCache.currentState) {
            // Already initialized: just perform the update.
            is Data -> transformAndWrite(update.transform, update.callerContext)
            is ReadException, is UnInitialized -> {
                if (currentState !== update.lastState) {
                    // Someone else attempted a read since this request was submitted and it
                    // failed too, so surface that failure to the waiting writer. The cast is
                    // safe because the state cannot be UnInitialized once it has changed.
                    throw (currentState as ReadException).readException
                }
                // Nobody has retried yet: read again, then perform the update.
                readAndInitOrPropagateAndThrowFailure()
                transformAndWrite(update.transform, update.callerContext)
            }
            is Final -> throw currentState.finalException // won't happen
        }
    }
    update.ack.completeWith(outcome)
}
/**
 * Runs initialization if it has not completed yet. On failure, the error is stored in the cache
 * as a `ReadException` tagged with the version observed before the attempt, then rethrown.
 */
private suspend fun readAndInitOrPropagateAndThrowFailure() {
    val versionBeforeInit = coordinator.getVersion()
    try {
        readAndInit.runIfNeeded()
    } catch (failure: Throwable) {
        val failedState = ReadException<T>(failure, versionBeforeInit)
        inMemoryCache.tryUpdate(failedState)
        throw failure
    }
}
/**
* Reads the file and updates the cache unless current cached value is Data and its version is
* equal to the latest version, or it is unable to get lock.
*
* Calling this method when state is UnInitialized is a bug and this method will throw if that
* happens.
*
* @param requireLock when true the read waits for the coordinator lock; when false the lock is
* only attempted and the read proceeds either way.
* @return the cached state when it is already up to date, otherwise the freshly read state (a
* ReadException state if the read threw).
*/
private suspend fun readDataAndUpdateCache(requireLock: Boolean): State<T> {
// Check if the cached version matches with shared memory counter
val currentState = inMemoryCache.currentState
// should not call this without initialization first running
check(currentState !is UnInitialized) {
BUG_MESSAGE
}
val latestVersion = coordinator.getVersion()
// -1 stands in for "no cached data version" when the cache does not hold Data
val cachedVersion = if (currentState is Data) currentState.version else -1
// Return cached value if cached version is latest
if (currentState is Data && latestVersion == cachedVersion) {
return currentState
}
// Each branch yields the new state paired with whether the lock was actually held.
val (newState, acquiredLock) =
if (requireLock) {
coordinator.lock {
try {
readDataOrHandleCorruption(hasWriteFileLock = true)
} catch (ex: Throwable) {
// failures are captured as a state rather than thrown to the caller
ReadException<T>(ex, coordinator.getVersion())
} to true
}
} else {
coordinator.tryLock { locked ->
try {
readDataOrHandleCorruption(locked)
} catch (ex: Throwable) {
// without the lock, fall back to the cached version for the failure state
ReadException<T>(
ex, if (locked) coordinator.getVersion() else cachedVersion
)
} to locked
}
}
// Only results obtained while holding the lock are published to the cache.
if (acquiredLock) {
inMemoryCache.tryUpdate(newState)
}
return newState
}
// Reads straight from the storage connection, without consulting the shared version counter.
// The caller is responsible for acquiring (or attempting to acquire) the file lock first. The
// serializer default value is returned if the file is not found.
private suspend fun readDataFromFileOrDefault(): T = storageConnection.readData()
/**
 * Applies [transform] to the current value while holding the coordinator lock and persists the
 * result only when it differs from the current value.
 *
 * @param transform computes the new value from the current one.
 * @param callerContext context in which [transform] is executed (the context captured from the
 * `updateData` caller).
 * @return the value produced by [transform].
 */
private suspend fun transformAndWrite(
    transform: suspend (t: T) -> T,
    callerContext: CoroutineContext
): T = coordinator.lock {
    val curData = readDataOrHandleCorruption(hasWriteFileLock = true)
    val newData = withContext(callerContext) { transform(curData.value) }
    // Check that curData has not changed...
    curData.checkHashCode()
    // Compare the payload, not the Data wrapper. A Data<T> is never equal to a bare T, so
    // comparing the wrapper would make every update rewrite the file and bump the version even
    // when the transform returned an unchanged value.
    if (curData.value != newData) {
        writeData(newData, updateCache = true)
    }
    newData
}
// Persists [newData] through the storage connection and returns the version assigned to the
// write. When [updateCache] is true the in-memory cache is updated with the written value too.
internal suspend fun writeData(newData: T, updateCache: Boolean): Int {
    var assignedVersion = 0
    // `writeScope` runs its block synchronously, so `assignedVersion` is set before we return.
    storageConnection.writeScope {
        // The version is bumped BEFORE the file write. If it were bumped afterwards, a crash
        // between the write and the bump would leave the version unchanged and readers could
        // skip re-reading the file forever.
        assignedVersion = coordinator.incrementAndGetVersion()
        writeData(newData)
        if (updateCache) {
            val written = Data(newData, newData.hashCode(), assignedVersion)
            inMemoryCache.tryUpdate(written)
        }
    }
    return assignedVersion
}
/**
* Reads the current value and wraps it in a Data state. If the read throws a
* [CorruptionException], the [corruptionHandler] supplies replacement data which is written back
* under the write lock, unless a second read shows the file is no longer corrupted.
*
* @param hasWriteFileLock whether the caller already holds the coordinator lock.
* @return the Data state that was read, or the state describing the recovered data.
* @throws CorruptionException the original exception, with the secondary failure added as
* suppressed, when recovering from corruption fails.
*/
private suspend fun readDataOrHandleCorruption(hasWriteFileLock: Boolean): Data<T> {
try {
return if (hasWriteFileLock) {
val data = readDataFromFileOrDefault()
Data(data, data.hashCode(), version = coordinator.getVersion())
} else {
// Remember the version seen before attempting the lock; it is used when the lock
// could not be obtained.
val preLockVersion = coordinator.getVersion()
coordinator.tryLock { locked ->
val data = readDataFromFileOrDefault()
val version = if (locked) coordinator.getVersion() else preLockVersion
Data(
data,
data.hashCode(),
version
)
}
}
} catch (ex: CorruptionException) {
// Ask the handler for replacement data; it may itself throw (the default handler does).
var newData: T = corruptionHandler.handleCorruption(ex)
var version: Int // initialized inside the try block
try {
doWithWriteFileLock(hasWriteFileLock) {
// Confirms the file is still corrupted before overriding
try {
// If this read succeeds, the file has been repaired in the meantime: use its content.
newData = readDataFromFileOrDefault()
version = coordinator.getVersion()
} catch (ignoredEx: CorruptionException) {
// Still corrupted: replace it with the handler's data.
version = writeData(newData, updateCache = true)
}
}
} catch (writeEx: Throwable) {
// If we fail to write the handled data, add the new exception as a suppressed
// exception.
ex.addSuppressed(writeEx)
throw ex
}
// If we reach this point, the data on disk is readable again: either we replaced it with
// newData, or the re-read above succeeded.
return Data(newData, newData.hashCode(), version)
}
}
/**
 * Runs [block] exactly once: directly when the caller already holds the write lock, otherwise
 * inside `coordinator.lock`.
 *
 * @param hasWriteFileLock whether the caller already holds the coordinator lock.
 * @param block the work to perform under the lock.
 * @return whatever [block] returns.
 */
@OptIn(ExperimentalContracts::class)
private suspend fun <R> doWithWriteFileLock(
    hasWriteFileLock: Boolean,
    block: suspend () -> R
): R {
    // The contract lets callers assign variables inside `block` and read them afterwards.
    contract {
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    }
    if (hasWriteFileLock) {
        return block()
    }
    return coordinator.lock { block() }
}
/**
 * One-time initialization of the DataStore: performs the first read and, if any were supplied,
 * runs the init tasks under the coordinator lock. The resulting state is published to the
 * in-memory cache. Retry-on-failure behavior comes from [RunOnce].
 *
 * @param initTasksList tasks to run during initialization; a defensive copy is kept.
 */
private inner class InitDataStore(
    initTasksList: List<suspend (api: InitializerApi<T>) -> Unit>
) : RunOnce() {
    // Set to null once the tasks have run successfully so they can be released.
    private var initTasks: List<suspend (api: InitializerApi<T>) -> Unit>? =
        initTasksList.toList()

    override suspend fun doRun() {
        // Snapshot the mutable property once so it can be null-checked without `!!`.
        val pendingTasks = initTasks
        val initData = if (pendingTasks.isNullOrEmpty()) {
            // With no init tasks we can read directly.
            readDataOrHandleCorruption(hasWriteFileLock = false)
        } else {
            // With init tasks we hold the lock for the whole run so that migrations execute
            // as one chunk.
            coordinator.lock {
                val updateLock = Mutex()
                var initializationComplete = false
                var currentData = readDataOrHandleCorruption(hasWriteFileLock = true).value
                val api = object : InitializerApi<T> {
                    override suspend fun updateData(transform: suspend (t: T) -> T): T {
                        return updateLock.withLock {
                            check(!initializationComplete) {
                                "InitializerApi.updateData should not be called after " +
                                    "initialization is complete."
                            }
                            val newData = transform(currentData)
                            // Only touch the disk when the task actually changed the value.
                            if (newData != currentData) {
                                writeData(newData, updateCache = false)
                                currentData = newData
                            }
                            currentData
                        }
                    }
                }
                pendingTasks.forEach { it(api) }
                // Init tasks have run successfully, we don't need them anymore.
                initTasks = null
                // Taken under the same mutex as updateData so that a late call from a leaked
                // api reference observes the flag and fails the check above.
                updateLock.withLock {
                    initializationComplete = true
                }
                Data(
                    value = currentData,
                    hashCode = currentData.hashCode(),
                    version = coordinator.getVersion()
                )
            }
        }
        inMemoryCache.tryUpdate(initData)
    }
}
companion object {
// Message used when an internal invariant is violated, i.e. a state is reached that the
// implementation considers unreachable (see the error()/check() call sites).
private const val BUG_MESSAGE = "This is a bug in DataStore. Please file a bug at: " +
"https://issuetracker.google.com/issues/new?component=907884&template=1466542"
}
}
/**
 * Base class for work that must finish successfully at most once.
 *
 * [runIfNeeded] executes [doRun] unless a previous execution already completed. If [doRun]
 * throws, completion is not recorded and the next [runIfNeeded] call tries again.
 * [awaitComplete] suspends until some execution has completed.
 */
internal abstract class RunOnce {
    // Guards doRun so that only one attempt is in flight at a time.
    private val gate = Mutex()

    // Completed exactly when a doRun attempt has finished without throwing.
    private val completion = CompletableDeferred<Unit>()

    protected abstract suspend fun doRun()

    suspend fun awaitComplete() = completion.await()

    suspend fun runIfNeeded() {
        // Fast path without taking the mutex.
        if (completion.isCompleted) return
        gate.withLock {
            // Re-check under the mutex: another caller may have finished while we waited.
            if (!completion.isCompleted) {
                doRun()
                completion.complete(Unit)
            }
        }
    }
}
/**
 * [CoroutineContext.Element] installed while [DataStore.updateData] is running, used to detect
 * nested calls on the same instance. b/241760537 (see: [DataStoreImpl.updateData])
 *
 * Nesting updateData calls across different DataStore instances is fine; nesting on the same
 * instance is not. Each element therefore links to its [parent] element, forming a chain of the
 * instances that are currently updating.
 */
internal class UpdatingDataContextElement(
    private val parent: UpdatingDataContextElement?,
    private val instance: DataStoreImpl<*>
) : CoroutineContext.Element {

    override val key: CoroutineContext.Key<*>
        get() = Key

    /**
     * Fails with [NESTED_UPDATE_ERROR_MESSAGE] if [candidate] is the instance of this element or
     * of any element further up the parent chain.
     */
    fun checkNotUpdating(candidate: DataStore<*>) {
        // Walk from this element up through its parents.
        var element: UpdatingDataContextElement? = this
        while (element != null) {
            if (element.instance === candidate) {
                error(NESTED_UPDATE_ERROR_MESSAGE)
            }
            element = element.parent
        }
    }

    companion object {
        internal val NESTED_UPDATE_ERROR_MESSAGE = """
Calling updateData inside updateData on the same DataStore instance is not supported
since updates made in the parent updateData call will not be visible to the nested
updateData call. See https://issuetracker.google.com/issues/241760537 for details.
""".trimIndent()

        internal object Key : CoroutineContext.Key<UpdatingDataContextElement>
    }
}