blob: e266a6a092a3e0c434f24303c01b90f5160aa218 [file] [log] [blame]
/*
* Copyright 2020 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@file:Suppress("NOTHING_TO_INLINE")
package androidx.camera.camera2.pipe.impl
import androidx.annotation.GuardedBy
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.atomicfu.atomic
import java.io.Closeable
import java.util.ArrayDeque
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.math.min
import kotlin.io.use
/**
* Provides fair access to a resources by acquiring and releasing variable sized [Token] objects.
*
* A [TokenLock] has a fixed maximum size that it will issue [Token] objects for. Additional
* requests beyond the maximum capacity of the lock will wait until enough of the outstanding
* tokens have been closed to fulfill the next request in the queue.
*
* This object behaves like a lock or mutex, which means that it's possible to deadlock if a
* function or sequence attempts to acquire or hold multiple tokens. For this reason, it's
* recommended to request tokens using a range of values in a single call if multiple tokens are
* needed instead of incrementally acquiring tokens.
*
* Access the methods and properties of the [TokenLock] are ThreadSafe, and closing this object
* multiple times has no effect.
*/
interface TokenLock : AutoCloseable, Closeable {
val capacity: Long
val available: Long
val size: Long
/**
* Acquire a token or suspend until a token is available. Canceling the request will remove the
* request from the queue, and closing the [TokenLock] will cause this method to throw a
* CancellationException.
*
* @throws CancellationException if the TokenLock is closed while this function is suspended.
* @throws IllegalArgumentException if min > capacity
*/
suspend fun acquire(min: Long, max: Long): Token
/**
* Immediately acquire a token or return null if a token cannot be immediately acquired.
*
* @throws IllegalArgumentException if min > capacity
*/
fun acquireOrNull(min: Long, max: Long): Token?
/**
* Tokens are Thread-safe objects that hold onto the acquired value. Closing this object returns
* its value to the parent TokenLock.
*
* Closing this object multiple times has no effect.
*/
interface Token : AutoCloseable, Closeable {
val value: Long
/**
* Close this token and return true if this call successfully released the value to the
* parent [TokenLock]. This method call is atomic, and can be used to guard shutdown calls
* that must only be run once.
*/
fun release(): Boolean
}
}
/** Shorthand for "acquire(value, value)" */
suspend inline fun TokenLock.acquire(value: Long): TokenLock.Token = this.acquire(value, value)
/** Shorthand for "acquireOrNull(value, value)" */
inline fun TokenLock.acquireOrNull(value: Long): TokenLock.Token? = this.acquireOrNull(
value,
value
)
/**
* Executes the given action while holding a token.
*/
suspend inline fun <T> TokenLock.withToken(
value: Long,
crossinline action: (token: TokenLock.Token) -> T
): T {
this.acquire(value).use {
return action(it)
}
}
/**
* Executes the given action while holding a token.
*/
suspend inline fun <T> TokenLock.withToken(
min: Long,
max: Long,
crossinline action: (token: TokenLock.Token) -> T
): T {
this.acquire(min, max).use {
return action(it)
}
}
class TokenLockImpl(override val capacity: Long) : TokenLock {
companion object {
val closedException = CancellationException()
}
private val pending = ArrayDeque<TokenRequest>()
@GuardedBy("pending")
private var closed = false
@GuardedBy("pending")
private var _available: Long = capacity
override val available: Long
get() = synchronized(pending) {
return if (closed || pending.isNotEmpty()) {
0
} else {
_available
}
}
override val size: Long
get() = synchronized(pending) {
return if (closed || pending.isNotEmpty()) {
capacity
} else {
capacity - _available
}
}
override fun acquireOrNull(min: Long, max: Long): TokenLock.Token? {
if (min > capacity)
throw IllegalArgumentException("Attempted to acquire $min / $capacity")
synchronized(pending) {
if (closed) return null
if (pending.isEmpty()) {
val value = min(_available, max)
if (value >= min) {
_available -= value
return TokenImpl(value)
}
}
}
return null
}
override suspend fun acquire(min: Long, max: Long): TokenLock.Token =
suspendCancellableCoroutine { continuation ->
if (min > capacity) {
continuation.resumeWithException(
IllegalArgumentException("Attempted to acquire $min / $capacity")
)
return@suspendCancellableCoroutine
}
synchronized(pending) {
if (closed) throw closedException
if (pending.isEmpty()) {
val value = min(_available, max)
if (value >= min) {
_available -= value
continuation.resume(TokenImpl(value))
return@suspendCancellableCoroutine
}
}
pending.add(TokenRequest(continuation, min, max))
}
// WARNING: This may invoke the release method **synchronously** if the continuation
// was canceled while this method was executing.
continuation.invokeOnCancellation { release(0) }
}
override fun close() {
synchronized(pending) {
if (closed) {
return
}
closed = true
}
// Make sure all suspended functions that are waiting for a token are canceled, then clear
// the list. This access is safe because all other interactions with the pending list occur
// within a synchronized block that's guarded by a closed check.
pending.forEach {
it.continuation.cancel()
}
pending.clear()
}
/**
* WARNING: This is an internal function to avoid creating synthetic accessors but it should
* ONLY be called by TokenImpl.close()
*/
internal fun release(qty: Long) {
var requestsToComplete: List<TokenRequest>? = null
synchronized(pending) {
if (closed) return
_available += qty
// Slower path: If we have pending requests, then we need figure out which ones we
// should complete, in order, and to update the internal state.
//
// The CompletableDeferred is _ONLY_ completed outside of the synchronized block to
// avoid reentrant behavior.
if (!pending.isEmpty()) {
val requests = mutableListOf<TokenRequest>()
// Loop through the pending queue. If we can fulfil the pending request without
// going over capacity, update the capacity and add the request to a list of
// requests that must be completed.
while (!pending.isEmpty()) {
// This will always be safe since we never insert non-null values and because
// the loop checks that the pending queue is not empty.
val next = pending.peek()!!
if (next.continuation.isCancelled || next.continuation.isCompleted) {
pending.remove()
} else {
val value = min(_available, next.max)
if (value >= next.min) {
_available -= value
next.token = TokenImpl(value)
requests.add(pending.remove())
} else {
break
}
}
}
// If we fulfilled 1 or more requests, then create and pass tokens to the
// continuation outside of the syncronized block.
if (requests.isNotEmpty()) {
requestsToComplete = requests
}
}
}
requestsToComplete?.forEach {
it.continuation.resume(it.token!!)
}
}
private class TokenRequest(
val continuation: CancellableContinuation<TokenLock.Token>,
val min: Long,
val max: Long,
var token: TokenImpl? = null
)
inner class TokenImpl(override val value: Long) : TokenLock.Token {
private val closed = atomic(false)
override fun close() {
release()
}
override fun release(): Boolean {
if (closed.compareAndSet(expect = false, update = true)) {
release(value)
return true
}
return false
}
}
}