blob: 1de1bff479121d558498cef38bd1398c72b5cba7 [file] [log] [blame]
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines
import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
import kotlin.jvm.*
/**
* Non-cancellable dispatch mode.
*
* **DO NOT CHANGE THE CONSTANT VALUE**. It might be inlined into legacy user code that was calling
* inline `suspendAtomicCancellableCoroutine` function and did not support reuse.
*/
internal const val MODE_ATOMIC = 0
/**
* Cancellable dispatch mode. It is used by user-facing [suspendCancellableCoroutine].
* Note, that implementation of cancellability checks mode via [Int.isCancellableMode] extension.
*
* **DO NOT CHANGE THE CONSTANT VALUE**. It is being into the user code from [suspendCancellableCoroutine].
*/
@PublishedApi
internal const val MODE_CANCELLABLE: Int = 1
/**
* Cancellable dispatch mode for [suspendCancellableCoroutineReusable].
* Note, that implementation of cancellability checks mode via [Int.isCancellableMode] extension;
* implementation of reuse checks mode via [Int.isReusableMode] extension.
*/
internal const val MODE_CANCELLABLE_REUSABLE = 2
/**
* Undispatched mode for [CancellableContinuation.resumeUndispatched].
* It is used when the thread is right, but it needs to be marked with the current coroutine.
*/
internal const val MODE_UNDISPATCHED = 4
/**
* Initial mode for [DispatchedContinuation] implementation, should never be used for dispatch, because it is always
* overwritten when continuation is resumed with the actual resume mode.
*/
internal const val MODE_UNINITIALIZED = -1
internal val Int.isCancellableMode get() = this == MODE_CANCELLABLE || this == MODE_CANCELLABLE_REUSABLE
internal val Int.isReusableMode get() = this == MODE_CANCELLABLE_REUSABLE
internal abstract class DispatchedTask<in T>(
@JvmField public var resumeMode: Int
) : SchedulerTask() {
internal abstract val delegate: Continuation<T>
internal abstract fun takeState(): Any?
/**
* Called when this task was cancelled while it was being dispatched.
*/
internal open fun cancelCompletedResult(takenState: Any?, cause: Throwable) {}
/**
* There are two implementations of `DispatchedTask`:
* * [DispatchedContinuation] keeps only simple values as successfully results.
* * [CancellableContinuationImpl] keeps additional data with values and overrides this method to unwrap it.
*/
@Suppress("UNCHECKED_CAST")
internal open fun <T> getSuccessfulResult(state: Any?): T =
state as T
/**
* There are two implementations of `DispatchedTask`:
* * [DispatchedContinuation] is just an intermediate storage that stores the exception that has its stack-trace
* properly recovered and is ready to pass to the [delegate] continuation directly.
* * [CancellableContinuationImpl] stores raw cause of the failure in its state; when it needs to be dispatched
* its stack-trace has to be recovered, so it overrides this method for that purpose.
*/
internal open fun getExceptionalResult(state: Any?): Throwable? =
(state as? CompletedExceptionally)?.cause
public final override fun run() {
assert { resumeMode != MODE_UNINITIALIZED } // should have been set before dispatching
val taskContext = this.taskContext
var fatalException: Throwable? = null
try {
val delegate = delegate as DispatchedContinuation<T>
val continuation = delegate.continuation
withContinuationContext(continuation, delegate.countOrElement) {
val context = continuation.context
val state = takeState() // NOTE: Must take state in any case, even if cancelled
val exception = getExceptionalResult(state)
/*
* Check whether continuation was originally resumed with an exception.
* If so, it dominates cancellation, otherwise the original exception
* will be silently lost.
*/
val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null
if (job != null && !job.isActive) {
val cause = job.getCancellationException()
cancelCompletedResult(state, cause)
continuation.resumeWithStackTrace(cause)
} else {
if (exception != null) {
continuation.resumeWithException(exception)
} else {
continuation.resume(getSuccessfulResult(state))
}
}
}
} catch (e: Throwable) {
// This instead of runCatching to have nicer stacktrace and debug experience
fatalException = e
} finally {
val result = runCatching { taskContext.afterTask() }
handleFatalException(fatalException, result.exceptionOrNull())
}
}
/**
* Machinery that handles fatal exceptions in kotlinx.coroutines.
* There are two kinds of fatal exceptions:
*
* 1) Exceptions from kotlinx.coroutines code. Such exceptions indicate that either
* the library or the compiler has a bug that breaks internal invariants.
* They usually have specific workarounds, but require careful study of the cause and should
* be reported to the maintainers and fixed on the library's side anyway.
*
* 2) Exceptions from [ThreadContextElement.updateThreadContext] and [ThreadContextElement.restoreThreadContext].
* While a user code can trigger such exception by providing an improper implementation of [ThreadContextElement],
* we can't ignore it because it may leave coroutine in the inconsistent state.
* If you encounter such exception, you can either disable this context element or wrap it into
* another context element that catches all exceptions and handles it in the application specific manner.
*
* Fatal exception handling can be intercepted with [CoroutineExceptionHandler] element in the context of
* a failed coroutine, but such exceptions should be reported anyway.
*/
public fun handleFatalException(exception: Throwable?, finallyException: Throwable?) {
if (exception === null && finallyException === null) return
if (exception !== null && finallyException !== null) {
exception.addSuppressedThrowable(finallyException)
}
val cause = exception ?: finallyException
val reason = CoroutinesInternalError("Fatal exception in coroutines machinery for $this. " +
"Please read KDoc to 'handleFatalException' method and report this incident to maintainers", cause!!)
handleCoroutineException(this.delegate.context, reason)
}
}
internal fun <T> DispatchedTask<T>.dispatch(mode: Int) {
assert { mode != MODE_UNINITIALIZED } // invalid mode value for this method
val delegate = this.delegate
val undispatched = mode == MODE_UNDISPATCHED
if (!undispatched && delegate is DispatchedContinuation<*> && mode.isCancellableMode == resumeMode.isCancellableMode) {
// dispatch directly using this instance's Runnable implementation
val dispatcher = delegate.dispatcher
val context = delegate.context
if (dispatcher.isDispatchNeeded(context)) {
dispatcher.dispatch(context, this)
} else {
resumeUnconfined()
}
} else {
// delegate is coming from 3rd-party interceptor implementation (and does not support cancellation)
// or undispatched mode was requested
resume(delegate, undispatched)
}
}
internal fun <T> DispatchedTask<T>.resume(delegate: Continuation<T>, undispatched: Boolean) {
// This resume is never cancellable. The result is always delivered to delegate continuation.
val state = takeState()
val exception = getExceptionalResult(state)
val result = if (exception != null) Result.failure(exception) else Result.success(getSuccessfulResult<T>(state))
when {
undispatched -> (delegate as DispatchedContinuation).resumeUndispatchedWith(result)
else -> delegate.resumeWith(result)
}
}
private fun DispatchedTask<*>.resumeUnconfined() {
val eventLoop = ThreadLocalEventLoop.eventLoop
if (eventLoop.isUnconfinedLoopActive) {
// When unconfined loop is active -- dispatch continuation for execution to avoid stack overflow
eventLoop.dispatchUnconfined(this)
} else {
// Was not active -- run event loop until all unconfined tasks are executed
runUnconfinedEventLoop(eventLoop) {
resume(delegate, undispatched = true)
}
}
}
internal inline fun DispatchedTask<*>.runUnconfinedEventLoop(
eventLoop: EventLoop,
block: () -> Unit
) {
eventLoop.incrementUseCount(unconfined = true)
try {
block()
while (true) {
// break when all unconfined continuations where executed
if (!eventLoop.processUnconfinedEvent()) break
}
} catch (e: Throwable) {
/*
* This exception doesn't happen normally, only if we have a bug in implementation.
* Report it as a fatal exception.
*/
handleFatalException(e, null)
} finally {
eventLoop.decrementUseCount(unconfined = true)
}
}
@Suppress("NOTHING_TO_INLINE")
internal inline fun Continuation<*>.resumeWithStackTrace(exception: Throwable) {
resumeWith(Result.failure(recoverStackTrace(exception, this)))
}