package kotlinx.coroutines import kotlinx.coroutines.internal.* import kotlin.coroutines.* import kotlin.jvm.* /** * Non-cancellable dispatch mode. * * **DO NOT CHANGE THE CONSTANT VALUE**. It might be inlined into legacy user code that was calling * inline `suspendAtomicCancellableCoroutine` function and did not support reuse. */ internal const val MODE_ATOMIC = 0 /** * Cancellable dispatch mode. It is used by user-facing [suspendCancellableCoroutine]. * Note, that implementation of cancellability checks mode via [Int.isCancellableMode] extension. * * **DO NOT CHANGE THE CONSTANT VALUE**. It is being into the user code from [suspendCancellableCoroutine]. */ @PublishedApi internal const val MODE_CANCELLABLE: Int = 1 /** * Cancellable dispatch mode for [suspendCancellableCoroutineReusable]. * Note, that implementation of cancellability checks mode via [Int.isCancellableMode] extension; * implementation of reuse checks mode via [Int.isReusableMode] extension. */ internal const val MODE_CANCELLABLE_REUSABLE = 2 /** * Undispatched mode for [CancellableContinuation.resumeUndispatched]. * It is used when the thread is right, but it needs to be marked with the current coroutine. */ internal const val MODE_UNDISPATCHED = 4 /** * Initial mode for [DispatchedContinuation] implementation, should never be used for dispatch, because it is always * overwritten when continuation is resumed with the actual resume mode. */ internal const val MODE_UNINITIALIZED = -1 internal val Int.isCancellableMode get() = this == MODE_CANCELLABLE || this == MODE_CANCELLABLE_REUSABLE internal val Int.isReusableMode get() = this == MODE_CANCELLABLE_REUSABLE @PublishedApi internal abstract class DispatchedTask internal constructor( // Used by the IDEA debugger via reflection and must be kept binary-compatible, see KTIJ-24102 @JvmField public var resumeMode: Int ) : SchedulerTask() { internal abstract val delegate: Continuation internal abstract fun takeState(): Any? /** * Called when this task was cancelled while it was being dispatched. */ internal open fun cancelCompletedResult(takenState: Any?, cause: Throwable) {} /** * There are two implementations of `DispatchedTask`: * - [DispatchedContinuation] keeps only simple values as successfully results. * - [CancellableContinuationImpl] keeps additional data with values and overrides this method to unwrap it. */ @Suppress("UNCHECKED_CAST") internal open fun getSuccessfulResult(state: Any?): T = state as T /** * There are two implementations of `DispatchedTask`: * - [DispatchedContinuation] is just an intermediate storage that stores the exception that has its stack-trace * properly recovered and is ready to pass to the [delegate] continuation directly. * - [CancellableContinuationImpl] stores raw cause of the failure in its state; when it needs to be dispatched * its stack-trace has to be recovered, so it overrides this method for that purpose. */ internal open fun getExceptionalResult(state: Any?): Throwable? = (state as? CompletedExceptionally)?.cause final override fun run() { assert { resumeMode != MODE_UNINITIALIZED } // should have been set before dispatching val taskContext = this.taskContext var fatalException: Throwable? = null try { val delegate = delegate as DispatchedContinuation val continuation = delegate.continuation withContinuationContext(continuation, delegate.countOrElement) { val context = continuation.context val state = takeState() // NOTE: Must take state in any case, even if cancelled val exception = getExceptionalResult(state) /* * Check whether continuation was originally resumed with an exception. * If so, it dominates cancellation, otherwise the original exception * will be silently lost. */ val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null if (job != null && !job.isActive) { val cause = job.getCancellationException() cancelCompletedResult(state, cause) continuation.resumeWithStackTrace(cause) } else { if (exception != null) { continuation.resumeWithException(exception) } else { continuation.resume(getSuccessfulResult(state)) } } } } catch (e: Throwable) { // This instead of runCatching to have nicer stacktrace and debug experience fatalException = e } finally { val result = runCatching { taskContext.afterTask() } handleFatalException(fatalException, result.exceptionOrNull()) } } /** * Machinery that handles fatal exceptions in kotlinx.coroutines. * There are two kinds of fatal exceptions: * * 1) Exceptions from kotlinx.coroutines code. Such exceptions indicate that either * the library or the compiler has a bug that breaks internal invariants. * They usually have specific workarounds, but require careful study of the cause and should * be reported to the maintainers and fixed on the library's side anyway. * * 2) Exceptions from [ThreadContextElement.updateThreadContext] and [ThreadContextElement.restoreThreadContext]. * While a user code can trigger such exception by providing an improper implementation of [ThreadContextElement], * we can't ignore it because it may leave coroutine in the inconsistent state. * If you encounter such exception, you can either disable this context element or wrap it into * another context element that catches all exceptions and handles it in the application specific manner. * * Fatal exception handling can be intercepted with [CoroutineExceptionHandler] element in the context of * a failed coroutine, but such exceptions should be reported anyway. */ internal fun handleFatalException(exception: Throwable?, finallyException: Throwable?) { if (exception === null && finallyException === null) return if (exception !== null && finallyException !== null) { exception.addSuppressed(finallyException) } val cause = exception ?: finallyException val reason = CoroutinesInternalError("Fatal exception in coroutines machinery for $this. " + "Please read KDoc to 'handleFatalException' method and report this incident to maintainers", cause!!) handleCoroutineException(this.delegate.context, reason) } } internal fun DispatchedTask.dispatch(mode: Int) { assert { mode != MODE_UNINITIALIZED } // invalid mode value for this method val delegate = this.delegate val undispatched = mode == MODE_UNDISPATCHED if (!undispatched && delegate is DispatchedContinuation<*> && mode.isCancellableMode == resumeMode.isCancellableMode) { // dispatch directly using this instance's Runnable implementation val dispatcher = delegate.dispatcher val context = delegate.context if (dispatcher.isDispatchNeeded(context)) { dispatcher.dispatch(context, this) } else { resumeUnconfined() } } else { // delegate is coming from 3rd-party interceptor implementation (and does not support cancellation) // or undispatched mode was requested resume(delegate, undispatched) } } internal fun DispatchedTask.resume(delegate: Continuation, undispatched: Boolean) { // This resume is never cancellable. The result is always delivered to delegate continuation. val state = takeState() val exception = getExceptionalResult(state) val result = if (exception != null) Result.failure(exception) else Result.success(getSuccessfulResult(state)) when { undispatched -> (delegate as DispatchedContinuation).resumeUndispatchedWith(result) else -> delegate.resumeWith(result) } } private fun DispatchedTask<*>.resumeUnconfined() { val eventLoop = ThreadLocalEventLoop.eventLoop if (eventLoop.isUnconfinedLoopActive) { // When unconfined loop is active -- dispatch continuation for execution to avoid stack overflow eventLoop.dispatchUnconfined(this) } else { // Was not active -- run event loop until all unconfined tasks are executed runUnconfinedEventLoop(eventLoop) { resume(delegate, undispatched = true) } } } internal inline fun DispatchedTask<*>.runUnconfinedEventLoop( eventLoop: EventLoop, block: () -> Unit ) { eventLoop.incrementUseCount(unconfined = true) try { block() while (true) { // break when all unconfined continuations where executed if (!eventLoop.processUnconfinedEvent()) break } } catch (e: Throwable) { /* * This exception doesn't happen normally, only if we have a bug in implementation. * Report it as a fatal exception. */ handleFatalException(e, null) } finally { eventLoop.decrementUseCount(unconfined = true) } } @Suppress("NOTHING_TO_INLINE") internal inline fun Continuation<*>.resumeWithStackTrace(exception: Throwable) { resumeWith(Result.failure(recoverStackTrace(exception, this))) }