blob: 0820f1f101ee38548e816beea8d47dccce0f8218 [file] [log] [blame]
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.guava
import com.google.common.util.concurrent.*
import com.google.common.util.concurrent.internal.*
import kotlinx.coroutines.*
import java.util.concurrent.*
import java.util.concurrent.CancellationException
import kotlin.coroutines.*
/**
* Starts [block] in a new coroutine and returns a [ListenableFuture] pointing to its result.
*
* The coroutine is started immediately. Passing [CoroutineStart.LAZY] to [start] throws
* [IllegalArgumentException], because Futures don't have a way to start lazily.
*
* When the created coroutine [isCompleted][Job.isCompleted], it will try to
* *synchronously* complete the returned Future with the same outcome. This will
* succeed, barring a race with external cancellation of returned [ListenableFuture].
*
* Cancellation is propagated bidirectionally.
*
* `CoroutineContext` is inherited from this [CoroutineScope]. Additional context elements can be
* added/overlaid by passing [context].
*
* If the context does not have a [CoroutineDispatcher], nor any other [ContinuationInterceptor]
* member, [Dispatchers.Default] is used.
*
* The parent job is inherited from this [CoroutineScope], and can be overridden by passing
* a [Job] in [context].
*
* See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging
* facilities.
*
* Note that the error and cancellation semantics of [future] are _different_ than [async]'s.
* In contrast to [Deferred], [Future] doesn't have an intermediate `Cancelling` state. If
* the returned `Future` is successfully cancelled, and `block` throws afterward, the thrown
* error is dropped, and getting the `Future`'s value will throw a `CancellationException` with
* no cause. This is to match the specification and behavior of
* `java.util.concurrent.FutureTask`.
*
* @param context added overlaying [CoroutineScope.coroutineContext] to form the new context.
* @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
* @param block the code to execute.
*/
public fun <T> CoroutineScope.future(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): ListenableFuture<T> {
require(!start.isLazy) { "$start start is not supported" }
val newContext = newCoroutineContext(context)
val coroutine = ListenableFutureCoroutine<T>(newContext)
coroutine.start(start, coroutine, block)
return coroutine.future
}
/**
* Returns a [Deferred] that is completed or failed by `this` [ListenableFuture].
*
* Completion is non-atomic between the two promises.
*
* Cancellation is propagated bidirectionally.
*
* When `this` `ListenableFuture` completes (either successfully or exceptionally) it will try to
* complete the returned `Deferred` with the same value or exception. This will succeed, barring a
* race with cancellation of the `Deferred`.
*
* When `this` `ListenableFuture` is [successfully cancelled][java.util.concurrent.Future.cancel],
* it will cancel the returned `Deferred`.
*
* When the returned `Deferred` is [cancelled][Deferred.cancel], it will try to propagate the
* cancellation to `this` `ListenableFuture`. Propagation will succeed, barring a race with the
* `ListenableFuture` completing normally. This is the only case in which the returned `Deferred`
* will complete with a different outcome than `this` `ListenableFuture`.
*/
public fun <T> ListenableFuture<T>.asDeferred(): Deferred<T> {
/* This method creates very specific behaviour as it entangles the `Deferred` and
* `ListenableFuture`. This behaviour is the best discovered compromise between the possible
* states and interface contracts of a `Future` and the states of a `Deferred`. The specific
* behaviour is described here.
*
* When `this` `ListenableFuture` is successfully cancelled - meaning
* `ListenableFuture.cancel()` returned `true` - it will synchronously cancel the returned
* `Deferred`. This can only race with cancellation of the returned `Deferred`, so the
* `Deferred` will always be put into its "cancelling" state and (barring uncooperative
* cancellation) _eventually_ reach its "cancelled" state when either promise is successfully
* cancelled.
*
* When the returned `Deferred` is cancelled, `ListenableFuture.cancel()` will be synchronously
* called on `this` `ListenableFuture`. This will attempt to cancel the `Future`, though
* cancellation may not succeed and the `ListenableFuture` may complete in a non-cancelled
* terminal state.
*
* The returned `Deferred` may receive and suppress the `true` return value from
* `ListenableFuture.cancel()` when the task is cancelled via the `Deferred` reference to it.
* This is unavoidable, so make sure no idempotent cancellation work is performed by a
* reference-holder of the `ListenableFuture` task. The idempotent work won't get done if
* cancellation was from the `Deferred` representation of the task.
*
* This is inherently a race. See `Future.cancel()` for a description of `Future` cancellation
* semantics. See `Job` for a description of coroutine cancellation semantics.
*/
// First, try the fast-fast error path for Guava ListenableFutures. This will save allocating an
// Exception by using the same instance the Future created.
if (this is InternalFutureFailureAccess) {
val t: Throwable? = InternalFutures.tryInternalFastPathGetFailure(this)
if (t != null) {
return CompletableDeferred<T>().also {
it.completeExceptionally(t)
}
}
}
// Second, try the fast path for a completed Future. The Future is known to be done, so get()
// will not block, and thus it won't be interrupted. Calling getUninterruptibly() instead of
// getDone() in this known-non-interruptible case saves the volatile read that getDone() uses to
// handle interruption.
if (isDone) {
return try {
CompletableDeferred(Uninterruptibles.getUninterruptibly(this))
} catch (e: CancellationException) {
CompletableDeferred<T>().also { it.cancel(e) }
} catch (e: ExecutionException) {
// ExecutionException is the only kind of exception that can be thrown from a gotten
// Future. Anything else showing up here indicates a very fundamental bug in a
// Future implementation.
CompletableDeferred<T>().also { it.completeExceptionally(e.nonNullCause()) }
}
}
// Finally, if this isn't done yet, attach a Listener that will complete the Deferred.
val deferred = CompletableDeferred<T>()
Futures.addCallback(this, object : FutureCallback<T> {
override fun onSuccess(result: T) {
runCatching { deferred.complete(result) }
.onFailure { handleCoroutineException(EmptyCoroutineContext, it) }
}
override fun onFailure(t: Throwable) {
runCatching { deferred.completeExceptionally(t) }
.onFailure { handleCoroutineException(EmptyCoroutineContext, it) }
}
}, MoreExecutors.directExecutor())
// ... And cancel the Future when the deferred completes. Since the return type of this method
// is Deferred, the only interaction point from the caller is to cancel the Deferred. If this
// completion handler runs before the Future is completed, the Deferred must have been
// cancelled and should propagate its cancellation. If it runs after the Future is completed,
// this is a no-op.
deferred.invokeOnCompletion {
cancel(false)
}
// Return hides the CompletableDeferred. This should prevent casting.
return object : Deferred<T> by deferred {}
}
/**
* Returns the cause from an [ExecutionException] thrown by a [Future.get] or similar.
*
* [ExecutionException] _always_ wraps a non-null cause when Future.get() throws. A Future cannot
* fail without a non-null `cause`, because the only way a Future _can_ fail is an uncaught
* [Exception].
*
* If this !! throws [NullPointerException], a Future is breaking its interface contract and losing
* state - a serious fundamental bug.
*/
private fun ExecutionException.nonNullCause(): Throwable {
return this.cause!!
}
/**
* Returns a [ListenableFuture] that is completed or failed by `this` [Deferred].
*
* Completion is non-atomic between the two promises.
*
* When either promise successfully completes, it will attempt to synchronously complete its
* counterpart with the same value. This will succeed barring a race with cancellation.
*
* When either promise completes with an Exception, it will attempt to synchronously complete its
* counterpart with the same Exception. This will succeed barring a race with cancellation.
*
* Cancellation is propagated bidirectionally.
*
* When the returned [Future] is successfully cancelled - meaning [Future.cancel] returned true -
* [Deferred.cancel] will be synchronously called on `this` [Deferred]. This will attempt to cancel
* the `Deferred`, though cancellation may not succeed and the `Deferred` may complete in a
* non-cancelled terminal state.
*
* When `this` `Deferred` reaches its "cancelled" state with a successful cancellation - meaning it
* completes with [kotlinx.coroutines.CancellationException] - `this` `Deferred` will synchronously
* cancel the returned `Future`. This can only race with cancellation of the returned `Future`, so
* the returned `Future` will always _eventually_ reach its cancelled state when either promise is
* successfully cancelled, for their different meanings of "successfully cancelled".
*
* This is inherently a race. See [Future.cancel] for a description of `Future` cancellation
* semantics. See [Job] for a description of coroutine cancellation semantics. See
* [JobListenableFuture.cancel] for greater detail on the overlapped cancellation semantics and
* corner cases of this method.
*/
public fun <T> Deferred<T>.asListenableFuture(): ListenableFuture<T> {
val listenableFuture = JobListenableFuture<T>(this)
// This invokeOnCompletion completes the JobListenableFuture with the same result as `this` Deferred.
// The JobListenableFuture may have completed earlier if it got cancelled! See JobListenableFuture.cancel().
invokeOnCompletion { throwable ->
if (throwable == null) {
listenableFuture.complete(getCompleted())
} else {
listenableFuture.completeExceptionallyOrCancel(throwable)
}
}
return listenableFuture
}
/**
* Awaits completion of `this` [ListenableFuture] without blocking a thread.
*
* This suspend function is cancellable.
*
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* stops waiting for the future and immediately resumes with [CancellationException][kotlinx.coroutines.CancellationException].
*
* This method is intended to be used with one-shot Futures, so on coroutine cancellation, the Future is cancelled as well.
* If cancelling the given future is undesired, use [Futures.nonCancellationPropagating] or
* [kotlinx.coroutines.NonCancellable].
*/
public suspend fun <T> ListenableFuture<T>.await(): T {
try {
if (isDone) return Uninterruptibles.getUninterruptibly(this)
} catch (e: ExecutionException) {
// ExecutionException is the only kind of exception that can be thrown from a gotten
// Future, other than CancellationException. Cancellation is propagated upward so that
// the coroutine running this suspend function may process it.
// Any other Exception showing up here indicates a very fundamental bug in a
// Future implementation.
throw e.nonNullCause()
}
return suspendCancellableCoroutine { cont: CancellableContinuation<T> ->
addListener(
ToContinuation(this, cont),
MoreExecutors.directExecutor())
cont.invokeOnCancellation {
cancel(false)
}
}
}
/**
* Propagates the outcome of [futureToObserve] to [continuation] on completion.
*
* Cancellation is propagated as cancelling the continuation. If [futureToObserve] completes
* and fails, the cause of the Future will be propagated without a wrapping
* [ExecutionException] when thrown.
*/
private class ToContinuation<T>(
val futureToObserve: ListenableFuture<T>,
val continuation: CancellableContinuation<T>
): Runnable {
override fun run() {
if (futureToObserve.isCancelled) {
continuation.cancel()
} else {
try {
continuation.resume(Uninterruptibles.getUninterruptibly(futureToObserve))
} catch (e: ExecutionException) {
// ExecutionException is the only kind of exception that can be thrown from a gotten
// Future. Anything else showing up here indicates a very fundamental bug in a
// Future implementation.
continuation.resumeWithException(e.nonNullCause())
}
}
}
}
/**
* An [AbstractCoroutine] intended for use directly creating a [ListenableFuture] handle to
* completion.
*
* If [future] is successfully cancelled, cancellation is propagated to `this` `Coroutine`.
* By documented contract, a [Future] has been cancelled if
* and only if its `isCancelled()` method returns true.
*
* Any error that occurs after successfully cancelling a [ListenableFuture] is lost.
* The contract of [Future] does not permit it to return an error after it is successfully cancelled.
* On the other hand, we can't report an unhandled exception to [CoroutineExceptionHandler],
* otherwise [Future.cancel] can lead to an app crash which arguably is a contract violation.
* In contrast to [Future] which can't change its outcome after a successful cancellation,
* cancelling a [Deferred] places that [Deferred] in the cancelling/cancelled states defined by [Job],
* which _can_ show the error.
*
* This may be counterintuitive, but it maintains the error and cancellation contracts of both
* the [Deferred] and [ListenableFuture] types, while permitting both kinds of promise to point
* to the same running task.
*/
private class ListenableFutureCoroutine<T>(
context: CoroutineContext
) : AbstractCoroutine<T>(context, initParentJob = true, active = true) {
// JobListenableFuture propagates external cancellation to `this` coroutine. See JobListenableFuture.
@JvmField
val future = JobListenableFuture<T>(this)
override fun onCompleted(value: T) {
future.complete(value)
}
override fun onCancelled(cause: Throwable, handled: Boolean) {
// Note: if future was cancelled in a race with a cancellation of this
// coroutine, and the future was successfully cancelled first, the cause of coroutine
// cancellation is dropped in this promise. A Future can only be completed once.
//
// This is consistent with FutureTask behaviour. A race between a Future.cancel() and
// a FutureTask.setException() for the same Future will similarly drop the
// cause of a failure-after-cancellation.
future.completeExceptionallyOrCancel(cause)
}
}
/**
* A [ListenableFuture] that delegates to an internal [SettableFuture], collaborating with it.
*
* This setup allows the returned [ListenableFuture] to maintain the following properties:
*
* - Correct implementation of [Future]'s happens-after semantics documented for [get], [isDone]
* and [isCancelled] methods
* - Cancellation propagation both to and from [Deferred]
* - Correct cancellation and completion semantics even when this [ListenableFuture] is combined
* with different concrete implementations of [ListenableFuture]
* - Fully correct cancellation and listener happens-after obeying [Future] and
* [ListenableFuture]'s documented and implicit contracts is surprisingly difficult to achieve.
* The best way to be correct, especially given the fun corner cases from
* [AbstractFuture.setFuture], is to just use an [AbstractFuture].
* - To maintain sanity, this class implements [ListenableFuture] and uses an auxiliary [SettableFuture]
* around coroutine's result as a state engine to establish happens-after-completion. This
* could probably be compressed into one subclass of [AbstractFuture] to save an allocation, at the
* cost of the implementation's readability.
*/
private class JobListenableFuture<T>(private val jobToCancel: Job): ListenableFuture<T> {
/**
* Serves as a state machine for [Future] cancellation.
*
* [AbstractFuture] has a highly-correct atomic implementation of `Future`'s completion and
* cancellation semantics. By using that type, the [JobListenableFuture] can delegate its semantics to
* `auxFuture.get()` the result in such a way that the `Deferred` is always complete when returned.
*
* To preserve Coroutine's [CancellationException], this future points to either `T` or [Cancelled].
*/
private val auxFuture = SettableFuture.create<Any?>()
/**
* `true` if [auxFuture.get][ListenableFuture.get] throws [ExecutionException].
*
* Note: this is eventually consistent with the state of [auxFuture].
*
* Unfortunately, there's no API to figure out if [ListenableFuture] throws [ExecutionException]
* apart from calling [ListenableFuture.get] on it. To avoid unnecessary [ExecutionException] allocation
* we use this field as an optimization.
*/
private var auxFutureIsFailed: Boolean = false
/**
* When the attached coroutine [isCompleted][Job.isCompleted] successfully
* its outcome should be passed to this method.
*
* This should succeed barring a race with external cancellation.
*/
fun complete(result: T): Boolean = auxFuture.set(result)
/**
* When the attached coroutine [isCompleted][Job.isCompleted] [exceptionally][Job.isCancelled]
* its outcome should be passed to this method.
*
* This method will map coroutine's exception into corresponding Future's exception.
*
* This should succeed barring a race with external cancellation.
*/
// CancellationException is wrapped into `Cancelled` to preserve original cause and message.
// All the other exceptions are delegated to SettableFuture.setException.
fun completeExceptionallyOrCancel(t: Throwable): Boolean =
if (t is CancellationException) auxFuture.set(Cancelled(t))
else auxFuture.setException(t).also { if (it) auxFutureIsFailed = true }
/**
* Returns cancellation _in the sense of [Future]_. This is _not_ equivalent to
* [Job.isCancelled].
*
* When done, this Future is cancelled if its [auxFuture] is cancelled, or if [auxFuture]
* contains [CancellationException].
*
* See [cancel].
*/
override fun isCancelled(): Boolean {
// This expression ensures that isCancelled() will *never* return true when isDone() returns false.
// In the case that the deferred has completed with cancellation, completing `this`, its
// reaching the "cancelled" state with a cause of CancellationException is treated as the
// same thing as auxFuture getting cancelled. If the Job is in the "cancelling" state and
// this Future hasn't itself been successfully cancelled, the Future will return
// isCancelled() == false. This is the only discovered way to reconcile the two different
// cancellation contracts.
return auxFuture.isCancelled || isDone && !auxFutureIsFailed && try {
Uninterruptibles.getUninterruptibly(auxFuture) is Cancelled
} catch (e: CancellationException) {
// `auxFuture` got cancelled right after `auxFuture.isCancelled` returned false.
true
} catch (e: ExecutionException) {
// `auxFutureIsFailed` hasn't been updated yet.
auxFutureIsFailed = true
false
}
}
/**
* Waits for [auxFuture] to complete by blocking, then uses its `result`
* to get the `T` value `this` [ListenableFuture] is pointing to or throw a [CancellationException].
* This establishes happens-after ordering for completion of the entangled coroutine.
*
* [SettableFuture.get] can only throw [CancellationException] if it was cancelled externally.
* Otherwise it returns [Cancelled] that encapsulates outcome of the entangled coroutine.
*
* [auxFuture] _must be complete_ in order for the [isDone] and [isCancelled] happens-after
* contract of [Future] to be correctly followed.
*/
override fun get(): T {
return getInternal(auxFuture.get())
}
/** See [get()]. */
override fun get(timeout: Long, unit: TimeUnit): T {
return getInternal(auxFuture.get(timeout, unit))
}
/** See [get()]. */
private fun getInternal(result: Any?): T = if (result is Cancelled) {
throw CancellationException().initCause(result.exception)
} else {
// We know that `auxFuture` can contain either `T` or `Cancelled`.
@Suppress("UNCHECKED_CAST")
result as T
}
override fun addListener(listener: Runnable, executor: Executor) {
auxFuture.addListener(listener, executor)
}
override fun isDone(): Boolean {
return auxFuture.isDone
}
/**
* Tries to cancel [jobToCancel] if `this` future was cancelled. This is fundamentally racy.
*
* The call to `cancel()` will try to cancel [auxFuture]: if and only if cancellation of [auxFuture]
* succeeds, [jobToCancel] will have its [Job.cancel] called.
*
* This arrangement means that [jobToCancel] _might not successfully cancel_, if the race resolves
* in a particular way. [jobToCancel] may also be in its "cancelling" state while this
* ListenableFuture is complete and cancelled.
*/
override fun cancel(mayInterruptIfRunning: Boolean): Boolean {
// TODO: call jobToCancel.cancel() _before_ running the listeners.
// `auxFuture.cancel()` will execute auxFuture's listeners. This delays cancellation of
// `jobToCancel` until after auxFuture's listeners have already run.
// Consider moving `jobToCancel.cancel()` into [AbstractFuture.afterDone] when the API is finalized.
return if (auxFuture.cancel(mayInterruptIfRunning)) {
jobToCancel.cancel()
true
} else {
false
}
}
override fun toString(): String = buildString {
append(super.toString())
append("[status=")
if (isDone) {
try {
when (val result = Uninterruptibles.getUninterruptibly(auxFuture)) {
is Cancelled -> append("CANCELLED, cause=[${result.exception}]")
else -> append("SUCCESS, result=[$result]")
}
} catch (e: CancellationException) {
// `this` future was cancelled by `Future.cancel`. In this case there's no cause or message.
append("CANCELLED")
} catch (e: ExecutionException) {
append("FAILURE, cause=[${e.cause}]")
} catch (t: Throwable) {
// Violation of Future's contract, should never happen.
append("UNKNOWN, cause=[${t.javaClass} thrown from get()]")
}
} else {
append("PENDING, delegate=[$auxFuture]")
}
append(']')
}
}
/**
* A wrapper for `Coroutine`'s [CancellationException].
*
* If the coroutine is _cancelled normally_, we want to show the reason of cancellation to the user. Unfortunately,
* [SettableFuture] can't store the reason of cancellation. To mitigate this, we wrap cancellation exception into this
* class and pass it into [SettableFuture.complete]. See implementation of [JobListenableFuture].
*/
private class Cancelled(@JvmField val exception: CancellationException)