@file:JvmMultifileClass @file:JvmName("BuildersKt") @file:OptIn(ExperimentalContracts::class) package kotlinx.coroutines import java.util.concurrent.locks.* import kotlin.contracts.* import kotlin.coroutines.* /** * Runs a new coroutine and **blocks** the current thread _interruptibly_ until its completion. * * It is designed to bridge regular blocking code to libraries that are written in suspending style, to be used in * `main` functions and in tests. * * Calling [runBlocking] from a suspend function is redundant. * For example, the following code is incorrect: * ``` * suspend fun loadConfiguration() { * // DO NOT DO THIS: * val data = runBlocking { // <- redundant and blocks the thread, do not do that * fetchConfigurationData() // suspending function * } * ``` * * Here, instead of releasing the thread on which `loadConfiguration` runs if `fetchConfigurationData` suspends, it will * block, potentially leading to thread starvation issues. * * The default [CoroutineDispatcher] for this builder is an internal implementation of event loop that processes continuations * in this blocked thread until the completion of this coroutine. * See [CoroutineDispatcher] for the other implementations that are provided by `kotlinx.coroutines`. * * When [CoroutineDispatcher] is explicitly specified in the [context], then the new coroutine runs in the context of * the specified dispatcher while the current thread is blocked. If the specified dispatcher is an event loop of another `runBlocking`, * then this invocation uses the outer event loop. * * If this blocked thread is interrupted (see [Thread.interrupt]), then the coroutine job is cancelled and * this `runBlocking` invocation throws [InterruptedException]. * * See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging facilities that are available * for a newly created coroutine. * * @param context the context of the coroutine. The default value is an event loop on the current thread. * @param block the coroutine code. */ @Throws(InterruptedException::class) public actual fun runBlocking(context: CoroutineContext, block: suspend CoroutineScope.() -> T): T { contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) } val currentThread = Thread.currentThread() val contextInterceptor = context[ContinuationInterceptor] val eventLoop: EventLoop? val newContext: CoroutineContext if (contextInterceptor == null) { // create or use private event loop if no dispatcher is specified eventLoop = ThreadLocalEventLoop.eventLoop newContext = GlobalScope.newCoroutineContext(context + eventLoop) } else { // See if context's interceptor is an event loop that we shall use (to support TestContext) // or take an existing thread-local event loop if present to avoid blocking it (but don't create one) eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() } ?: ThreadLocalEventLoop.currentOrNull() newContext = GlobalScope.newCoroutineContext(context) } val coroutine = BlockingCoroutine(newContext, currentThread, eventLoop) coroutine.start(CoroutineStart.DEFAULT, coroutine, block) return coroutine.joinBlocking() } private class BlockingCoroutine( parentContext: CoroutineContext, private val blockedThread: Thread, private val eventLoop: EventLoop? ) : AbstractCoroutine(parentContext, true, true) { override val isScopedCoroutine: Boolean get() = true override fun afterCompletion(state: Any?) { // wake up blocked thread if (Thread.currentThread() != blockedThread) unpark(blockedThread) } @Suppress("UNCHECKED_CAST") fun joinBlocking(): T { registerTimeLoopThread() try { eventLoop?.incrementUseCount() try { while (true) { @Suppress("DEPRECATION") if (Thread.interrupted()) throw InterruptedException().also { cancelCoroutine(it) } val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE // note: process next even may loose unpark flag, so check if completed before parking if (isCompleted) break parkNanos(this, parkNanos) } } finally { // paranoia eventLoop?.decrementUseCount() } } finally { // paranoia unregisterTimeLoopThread() } // now return result val state = this.state.unboxState() (state as? CompletedExceptionally)?.let { throw it.cause } return state as T } }