package kotlinx.coroutines.scheduling

import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.internal.*
import java.io.*
import java.util.concurrent.*
import java.util.concurrent.locks.*
import kotlin.jvm.internal.Ref.ObjectRef
import kotlin.math.*

/**
 * Coroutine scheduler (pool of shared threads) whose primary target is to distribute dispatched coroutines
 * over worker threads, including both CPU-intensive and blocking tasks, in the most efficient manner.
 *
 * The current scheduler implementation has two optimization targets:
 * - Efficiency in the face of communication patterns (e.g. actors communicating via channels).
 * - Dynamic resizing to support blocking calls without re-dispatching coroutines to a separate "blocking" thread pool.
 *
 * ### Structural overview
 *
 * The scheduler consists of [corePoolSize] worker threads to execute CPU-bound tasks and up to
 * [maxPoolSize] lazily created threads to execute blocking tasks.
 * Every worker has a local queue in addition to a global scheduler queue,
 * and the global queue has priority over local queues to avoid starvation of externally-submitted
 * (e.g. from Android UI thread) tasks.
 * Work-stealing is implemented on top of these queues to provide
 * even load distribution and the illusion of a centralized run queue.
 *
 * ### Scheduling policy
 *
 * When a coroutine is dispatched from within a scheduler worker, it's placed into the head of the worker's run queue.
 * If the head is not empty, the task from the head is moved to the tail. Though it is an unfair scheduling policy,
 * it effectively couples communicating coroutines into one and eliminates the scheduling latency
 * that arises from placing tasks at the end of the queue.
 * Placing the former head at the tail is necessary to provide semi-FIFO order; otherwise, the queue degenerates into a stack.
 * When a coroutine is dispatched from an external thread, it's put into the global queue.
 * The original idea with a single-slot LIFO buffer comes from the Golang runtime scheduler by D. Vyukov.
 * It was proven to be "fair enough", performant and generally well accepted, and initially was a significant
 * source of inspiration for the coroutine scheduler.
 *
 * ### Work stealing and affinity
 *
 * To provide an even task distribution, a worker tries to steal tasks from other workers' queues
 * before parking when its local queue is empty.
 * A non-standard solution is implemented to provide task affinity: a task from the FIFO buffer may be stolen
 * only if it is stale enough based on the value of [WORK_STEALING_TIME_RESOLUTION_NS].
 * For this purpose, a monotonic global clock is used, and every task has its submission time associated with it.
 * This approach shows outstanding results when coroutines are cooperative,
 * but as a downside, the scheduler now depends on a high-resolution global clock,
 * which may limit scalability on NUMA machines. Tasks from the LIFO buffer can be stolen on a regular basis.
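 *
 * For illustration of the staleness rule above, a hedged sketch (not the actual [WorkQueue] logic;
 * `canStealFromFifo` is a hypothetical helper):
 * ```
 * fun canStealFromFifo(task: Task, nowNs: Long): Boolean =
 *     nowNs - task.submissionTime >= WORK_STEALING_TIME_RESOLUTION_NS // stale enough to give up affinity
 * ```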
 *
 * ### Thread management
 *
 * One of the hardest parts of the scheduler is decentralized management of the threads with progress guarantees
 * similar to the regular centralized executors.
 * The state of the threads consists of the [controlState] and [parkedWorkersStack] fields.
 * The former field incorporates the number of created threads, CPU tokens and blocking tasks
 * that require thread compensation,
 * while the latter represents an intrusive versioned Treiber stack of idle workers.
 * When a worker cannot find any work, it first adds itself to the stack,
 * then re-scans the queue to avoid missing signals, and then attempts to park
 * with an additional rendezvous against unnecessary parking.
 * If a worker finds a task that it cannot yet steal due to time constraints, it stores this fact in its state
 * (to be uncounted when additional work is signalled) and parks for that duration.
 *
 * When a new task arrives in the scheduler (whether to a local or the global queue),
 * either an idle worker is signalled or the creation of a new worker is attempted.
 * (Only [corePoolSize] workers can be created for regular CPU tasks.)
 *
 * ### Support for blocking tasks
 *
 * The scheduler also supports the notion of [blocking][TASK_PROBABLY_BLOCKING] tasks.
 * When executing or enqueuing blocking tasks, the scheduler notifies or creates one more worker in
 * addition to the core pool size, so at any given moment, it has [corePoolSize] threads (potentially not yet created)
 * to serve CPU-bound tasks. To properly guarantee liveness, the scheduler maintains
 * "CPU permits" -- [corePoolSize] special tokens that permit an arbitrary worker to execute and steal CPU-bound tasks.
 * When a worker encounters a blocking task, it basically hands off its permit to another thread (not directly, though) to
 * keep the invariant "the scheduler always has at least min(pending CPU tasks, core pool size)
 * and at most core pool size threads to execute CPU tasks".
 * To avoid overprovision, workers without a CPU permit are allowed to scan [globalBlockingQueue]
 * and steal **only** blocking tasks from other workers.
 *
 * The scheduler does not limit the count of pending blocking tasks, potentially creating up to [maxPoolSize] threads.
 * End users do not have access to the scheduler directly and can dispatch blocking tasks only with
 * [LimitingDispatcher] that controls the concurrency level with its own mechanism.
 */
@Suppress("NOTHING_TO_INLINE")
internal class CoroutineScheduler(
    @JvmField val corePoolSize: Int,
    @JvmField val maxPoolSize: Int,
    @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {

    init {
        require(corePoolSize >= MIN_SUPPORTED_POOL_SIZE) {
            "Core pool size $corePoolSize should be at least $MIN_SUPPORTED_POOL_SIZE"
        }
        require(maxPoolSize >= corePoolSize) {
            "Max pool size $maxPoolSize should be greater than or equal to core pool size $corePoolSize"
        }
        require(maxPoolSize <= MAX_SUPPORTED_POOL_SIZE) {
            "Max pool size $maxPoolSize should not exceed maximal supported number of threads $MAX_SUPPORTED_POOL_SIZE"
        }
        require(idleWorkerKeepAliveNs > 0) {
            "Idle worker keep alive time $idleWorkerKeepAliveNs must be positive"
        }
    }

    @JvmField
    val globalCpuQueue = GlobalQueue()
    @JvmField
    val globalBlockingQueue = GlobalQueue()

    private fun addToGlobalQueue(task: Task): Boolean {
        return if (task.isBlocking) {
            globalBlockingQueue.addLast(task)
        } else {
            globalCpuQueue.addLast(task)
        }
    }
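    /*
     * For illustration only: the scheduler is internal, but it implements [Executor] and [Closeable],
     * so it can be exercised directly. A hedged sketch with hypothetical parameter values:
     * ```
     * val scheduler = CoroutineScheduler(corePoolSize = 4, maxPoolSize = 128)
     * scheduler.execute(Runnable { println("runs on ${Thread.currentThread().name}") }) // Executor path
     * scheduler.close()                                                                 // Closeable path, see [shutdown]
     * ```
     */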
    /**
     * The stack of parked workers.
     * Every worker registers itself in the stack before parking (if it was not previously registered),
     * so it can be signalled when new tasks arrive.
     * This is a form of intrusive garbage-free Treiber stack where [Worker] also is a stack node.
     *
     * The stack is better than a queue (even with the contention on top) because it unparks threads
     * in most-recently used order, improving both performance and locality.
     * Moreover, it decreases thread thrashing: if the pool has n threads when only n / 2 are required,
     * the latter half will never be unparked and will terminate itself after [IDLE_WORKER_KEEP_ALIVE_NS].
     *
     * This `Long` value consists of version bits with the [PARKED_VERSION_MASK]
     * and the index bits of the top worker thread with the [PARKED_INDEX_MASK].
     */
    private val parkedWorkersStack = atomic(0L)

    /**
     * Updates the index of the worker at the top of [parkedWorkersStack].
     * It always updates the version to ensure interference with a [parkedWorkersStackPop] operation
     * that might have already decided to put this index at the top.
     *
     * Note: [newIndex] can be zero for the worker that is being terminated (removed from [workers]).
     */
    fun parkedWorkersStackTopUpdate(worker: Worker, oldIndex: Int, newIndex: Int) {
        parkedWorkersStack.loop { top ->
            val index = (top and PARKED_INDEX_MASK).toInt()
            val updVersion = (top + PARKED_VERSION_INC) and PARKED_VERSION_MASK
            val updIndex = if (index == oldIndex) {
                if (newIndex == 0) {
                    parkedWorkersStackNextIndex(worker)
                } else {
                    newIndex
                }
            } else {
                index // no change to index, but update version
            }
            if (updIndex < 0) return@loop // retry
            if (parkedWorkersStack.compareAndSet(top, updVersion or updIndex.toLong())) return
        }
    }

    /**
     * Pushes a worker into [parkedWorkersStack].
     * It does nothing if this worker is already physically linked to the stack.
     * This method is invoked only from the worker thread itself.
     * This invocation always precedes [LockSupport.parkNanos].
     * See [Worker.tryPark].
     *
     * Returns `true` if the worker was added to the stack by this invocation, `false` if it was already
     * registered in the stack.
     */
    fun parkedWorkersStackPush(worker: Worker): Boolean {
        if (worker.nextParkedWorker !== NOT_IN_STACK) return false // already in stack, bail out
        /*
         * The below loop can be entered only if this worker was not in the stack and, since no other thread
         * can add it to the stack (only the worker itself), this invariant holds while this loop executes.
         */
        parkedWorkersStack.loop { top ->
            val index = (top and PARKED_INDEX_MASK).toInt()
            val updVersion = (top + PARKED_VERSION_INC) and PARKED_VERSION_MASK
            val updIndex = worker.indexInArray
            assert { updIndex != 0 } // only this worker can push itself, cannot be terminated
            worker.nextParkedWorker = workers[index]
            /*
             * Another thread can be changing this worker's index at this point, but it
             * also invokes parkedWorkersStackTopUpdate which updates the version to make the next CAS fail.
             * A successful CAS of the stack top completes a successful push.
             */
            if (parkedWorkersStack.compareAndSet(top, updVersion or updIndex.toLong())) return true
        }
    }
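    /*
     * For illustration only: the version-counter idea that protects the stack top from ABA.
     * A hedged, self-contained sketch (hypothetical `top`/`casTop`, layout mirroring the masks below):
     * ```
     * val top = atomic(0L)              // layout: [ version | index (21 bits) ]
     * val INDEX_MASK = (1L shl 21) - 1
     * val VERSION_INC = 1L shl 21
     *
     * fun casTop(expected: Long, newIndex: Long): Boolean {
     *     // Bump the version on every update so that a concurrent operation that read the
     *     // old `expected` word fails its CAS even if the index bits match again.
     *     val upd = ((expected + VERSION_INC) and INDEX_MASK.inv()) or newIndex
     *     return top.compareAndSet(expected, upd)
     * }
     * ```
     */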
    /**
     * Pops a worker from [parkedWorkersStack].
     * It can be invoked concurrently from any thread that is looking for help and needs to unpark some worker.
     * This invocation is always followed by an attempt to [LockSupport.unpark] the resulting worker.
     * See [tryUnpark].
     */
    private fun parkedWorkersStackPop(): Worker? {
        parkedWorkersStack.loop { top ->
            val index = (top and PARKED_INDEX_MASK).toInt()
            val worker = workers[index] ?: return null // stack is empty
            val updVersion = (top + PARKED_VERSION_INC) and PARKED_VERSION_MASK
            val updIndex = parkedWorkersStackNextIndex(worker)
            if (updIndex < 0) return@loop // retry
            /*
             * Another thread can be changing this worker's index at this point, but it
             * also invokes parkedWorkersStackTopUpdate which updates the version to make the next CAS fail.
             * A successful CAS of the stack top completes a successful pop.
             */
            if (parkedWorkersStack.compareAndSet(top, updVersion or updIndex.toLong())) {
                /*
                 * We've just taken a worker out of the stack, but its nextParkedWorker is not reset yet, so if a worker is
                 * currently invoking parkedWorkersStackPush it would think it is in the stack and bail out without
                 * adding itself again. It does not matter, since we are going to invoke unpark on the thread
                 * that was popped out of parkedWorkersStack anyway.
                 */
                worker.nextParkedWorker = NOT_IN_STACK
                return worker
            }
        }
    }

    /**
     * Finds the next usable index for [parkedWorkersStack]. The problem is that workers can
     * be terminated and their [Worker.indexInArray] becomes zero, so they cannot be
     * put at the top of the stack. In that case, we look for the next one.
     *
     * Returns `index >= 0` or `-1` for retry.
     */
    private fun parkedWorkersStackNextIndex(worker: Worker): Int {
        var next = worker.nextParkedWorker
        findNext@ while (true) {
            when {
                next === NOT_IN_STACK -> return -1 // we are too late -- other thread popped this element, retry
                next === null -> return 0 // stack becomes empty
                else -> {
                    val nextWorker = next as Worker
                    val updIndex = nextWorker.indexInArray
                    if (updIndex != 0) return updIndex // found good index for next worker
                    // Otherwise, this worker was terminated and we cannot put it at the top anymore, check the next one
                    next = nextWorker.nextParkedWorker
                }
            }
        }
    }

    /**
     * State of worker threads.
     * [workers] is a dynamic array of lazily created workers up to [maxPoolSize] workers.
     * [createdWorkers] is the count of already created workers (a worker with an index less than [createdWorkers] exists).
     * [blockingTasks] is the count of pending (either in the queue or being executed) blocking tasks.
     *
     * The workers array is also used as a lock for the workers' creation and termination sequence.
     *
     * **NOTE**: `workers[0]` is always `null` (never used, works as a sentinel value), so
     * workers are 1-indexed, the code path in [Worker.trySteal] is a bit faster and the index swap during termination
     * works properly.
     *
     * Initial size is `Dispatchers.Default` size * 2 to prevent unnecessary resizes for slightly or steadily loaded
     * applications.
     */
    @JvmField
    val workers = ResizableAtomicArray<Worker>((corePoolSize + 1) * 2)
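    /*
     * For illustration only: how three counters fit into the single Long below.
     * A hedged sketch with hypothetical values, mirroring CREATED_MASK/BLOCKING_SHIFT/CPU_PERMITS_SHIFT:
     * ```
     * val created = 3L; val blocking = 1L; val cpuPermits = 4L
     * val state = created or (blocking shl 21) or (cpuPermits shl 42)
     * check((state and ((1L shl 21) - 1)) == created)           // created-threads bits
     * check((state shr 21 and ((1L shl 21) - 1)) == blocking)   // blocking-tasks bits
     * check((state shr 42 and ((1L shl 21) - 1)) == cpuPermits) // CPU-permit bits
     * ```
     */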
    /**
     * The `Long` value describing the state of workers in this pool.
     * Currently includes the number of created, CPU-acquired, and blocking workers, each occupying [BLOCKING_SHIFT] bits.
     *
     * State layout (highest to lowest bits; the sign bit is unused):
     * | --- number of CPU permits, 21 bits --- | --- number of blocking tasks, 21 bits --- | --- number of created threads, 21 bits --- |
     */
    private val controlState = atomic(corePoolSize.toLong() shl CPU_PERMITS_SHIFT)

    private val createdWorkers: Int inline get() = (controlState.value and CREATED_MASK).toInt()
    private val availableCpuPermits: Int inline get() = availableCpuPermits(controlState.value)

    private inline fun createdWorkers(state: Long): Int = (state and CREATED_MASK).toInt()
    private inline fun blockingTasks(state: Long): Int = (state and BLOCKING_MASK shr BLOCKING_SHIFT).toInt()
    inline fun availableCpuPermits(state: Long): Int = (state and CPU_PERMITS_MASK shr CPU_PERMITS_SHIFT).toInt()

    // Guarded by synchronization
    private inline fun incrementCreatedWorkers(): Int = createdWorkers(controlState.incrementAndGet())
    private inline fun decrementCreatedWorkers(): Int = createdWorkers(controlState.getAndDecrement())

    private inline fun incrementBlockingTasks() = controlState.addAndGet(1L shl BLOCKING_SHIFT)

    private inline fun decrementBlockingTasks() {
        controlState.addAndGet(-(1L shl BLOCKING_SHIFT))
    }

    private inline fun tryAcquireCpuPermit(): Boolean = controlState.loop { state ->
        val available = availableCpuPermits(state)
        if (available == 0) return false
        val update = state - (1L shl CPU_PERMITS_SHIFT)
        if (controlState.compareAndSet(state, update)) return true
    }

    private inline fun releaseCpuPermit() = controlState.addAndGet(1L shl CPU_PERMITS_SHIFT)

    // This is used as a "stop signal" for close and shutdown functions
    private val _isTerminated = atomic(false)
    val isTerminated: Boolean get() = _isTerminated.value

    companion object {
        // A symbol to mark workers that are not in parkedWorkersStack
        @JvmField
        val NOT_IN_STACK = Symbol("NOT_IN_STACK")

        // Worker ctl states
        private const val PARKED = -1
        private const val CLAIMED = 0
        private const val TERMINATED = 1

        // Masks of control state
        private const val BLOCKING_SHIFT = 21 // 2M threads max
        private const val CREATED_MASK: Long = (1L shl BLOCKING_SHIFT) - 1
        private const val BLOCKING_MASK: Long = CREATED_MASK shl BLOCKING_SHIFT
        private const val CPU_PERMITS_SHIFT = BLOCKING_SHIFT * 2
        private const val CPU_PERMITS_MASK = CREATED_MASK shl CPU_PERMITS_SHIFT

        internal const val MIN_SUPPORTED_POOL_SIZE = 1 // we support 1 for test purposes, but it is not usually used
        internal const val MAX_SUPPORTED_POOL_SIZE = (1 shl BLOCKING_SHIFT) - 2

        // Masks of parkedWorkersStack
        private const val PARKED_INDEX_MASK = CREATED_MASK
        private const val PARKED_VERSION_MASK = CREATED_MASK.inv()
        private const val PARKED_VERSION_INC = 1L shl BLOCKING_SHIFT
    }

    override fun execute(command: Runnable) = dispatch(command)

    override fun close() = shutdown(10_000L)
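    /*
     * For illustration only: [tryAcquireCpuPermit] above follows the standard atomicfu
     * read-check-CAS loop. A hedged, standalone sketch of the same idiom:
     * ```
     * val permits = atomic(4)
     * fun tryAcquire(): Boolean {
     *     permits.loop { cur ->
     *         if (cur == 0) return false                           // nothing to acquire
     *         if (permits.compareAndSet(cur, cur - 1)) return true // claimed one permit
     *     }
     * }
     * ```
     */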
    // Shuts down the current scheduler and waits until all work is done and all threads are stopped.
    fun shutdown(timeout: Long) {
        // atomically set the termination flag which is checked when workers are added or removed
        if (!_isTerminated.compareAndSet(false, true)) return
        // make sure we are not waiting for the current thread
        val currentWorker = currentWorker()
        // Capture # of created workers that cannot change anymore (mind the synchronized block!)
        val created = synchronized(workers) { createdWorkers }
        // Shut down all workers with the only exception of the current thread
        for (i in 1..created) {
            val worker = workers[i]!!
            if (worker !== currentWorker) {
                // Note: this is java.lang.Thread.getState() of type java.lang.Thread.State
                while (worker.getState() != Thread.State.TERMINATED) {
                    LockSupport.unpark(worker)
                    worker.join(timeout)
                }
                // Note: this is CoroutineScheduler.Worker.state of type CoroutineScheduler.WorkerState
                assert { worker.state === WorkerState.TERMINATED } // Expected TERMINATED state
                worker.localQueue.offloadAllWorkTo(globalBlockingQueue) // Doesn't actually matter which queue to use
            }
        }
        // Make sure no more work is added to the GlobalQueue from anywhere
        globalBlockingQueue.close()
        globalCpuQueue.close()
        // Finish processing tasks from the globalQueue and/or from this worker's local queue
        while (true) {
            val task = currentWorker?.findTask(true)
                ?: globalCpuQueue.removeFirstOrNull()
                ?: globalBlockingQueue.removeFirstOrNull()
                ?: break
            runSafely(task)
        }
        // Shut down the current thread
        currentWorker?.tryReleaseCpu(WorkerState.TERMINATED)
        // check & cleanup state
        assert { availableCpuPermits == corePoolSize }
        parkedWorkersStack.value = 0L
        controlState.value = 0L
    }

    /**
     * Dispatches execution of a runnable [block] with a hint to the scheduler whether
     * this [block] may execute blocking operations (IO, system calls, locking primitives etc.)
     *
     * [taskContext] -- concurrency context of the given [block].
     * [tailDispatch] -- whether this [dispatch] call is the last action the (presumably) worker thread does in its current task.
     * If `true`, then the task will be dispatched in a FIFO manner and no additional workers will be requested,
     * but only if the current thread is a corresponding worker thread.
     * Note that the caller cannot be sure that it is being executed on a worker thread, for the following reasons:
     * - [CoroutineStart.UNDISPATCHED]
     * - A concurrent [close] that effectively shuts down the worker thread
     */
    fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
        trackTask() // this is needed for virtual time support
        val task = createTask(block, taskContext)
        val isBlockingTask = task.isBlocking
        // Invariant: we increment the counter **before** publishing the task,
        // so the executing thread can safely decrement the number of blocking tasks
        val stateSnapshot = if (isBlockingTask) incrementBlockingTasks() else 0
        // try to submit the task to the local queue and act depending on the result
        val currentWorker = currentWorker()
        val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
        if (notAdded != null) {
            if (!addToGlobalQueue(notAdded)) {
                // Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
                throw RejectedExecutionException("$schedulerName was terminated")
            }
        }
        val skipUnpark = tailDispatch && currentWorker != null
        // Checking 'task' instead of 'notAdded' is completely okay
        if (isBlockingTask) {
            // Use the state snapshot to better estimate the number of running threads
            signalBlockingWork(stateSnapshot, skipUnpark = skipUnpark)
        } else {
            if (skipUnpark) return
            signalCpuWork()
        }
    }

    fun createTask(block: Runnable, taskContext: TaskContext): Task {
        val nanoTime = schedulerTimeSource.nanoTime()
        if (block is Task) {
            block.submissionTime = nanoTime
            block.taskContext = taskContext
            return block
        }
        return TaskImpl(block, nanoTime, taskContext)
    }
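    /*
     * For illustration only: how [dispatch] parameters affect behavior. A hedged sketch;
     * the values are hypothetical, and NonBlockingContext is this package's default [TaskContext]:
     * ```
     * val scheduler = CoroutineScheduler(corePoolSize = 2, maxPoolSize = 64)
     * // CPU-bound task: goes to the local queue when called from a worker, to the global CPU queue otherwise
     * scheduler.dispatch(Runnable { println("cpu-bound") })
     * // Tail dispatch from a worker: FIFO ordering, no extra worker is signalled
     * scheduler.dispatch(Runnable { println("next step") }, tailDispatch = true)
     * ```
     */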
    // NB: should only be called from the 'dispatch' method due to the blocking tasks increment
    private fun signalBlockingWork(stateSnapshot: Long, skipUnpark: Boolean) {
        if (skipUnpark) return
        if (tryUnpark()) return
        // Use the state snapshot to avoid accidental thread overprovision
        if (tryCreateWorker(stateSnapshot)) return
        tryUnpark() // Try unpark again in case there was a race between permit release and parking
    }

    fun signalCpuWork() {
        if (tryUnpark()) return
        if (tryCreateWorker()) return
        tryUnpark()
    }

    private fun tryCreateWorker(state: Long = controlState.value): Boolean {
        val created = createdWorkers(state)
        val blocking = blockingTasks(state)
        val cpuWorkers = (created - blocking).coerceAtLeast(0)
        /*
         * We check how many threads are there to handle non-blocking work,
         * and create one more if there are not enough of them.
         */
        if (cpuWorkers < corePoolSize) {
            val newCpuWorkers = createNewWorker()
            // If we've created the first cpu worker and corePoolSize > 1 then create
            // one more (second) cpu worker, so that stealing between them is operational
            if (newCpuWorkers == 1 && corePoolSize > 1) createNewWorker()
            if (newCpuWorkers > 0) return true
        }
        return false
    }

    private fun tryUnpark(): Boolean {
        while (true) {
            val worker = parkedWorkersStackPop() ?: return false
            if (worker.workerCtl.compareAndSet(PARKED, CLAIMED)) {
                LockSupport.unpark(worker)
                return true
            }
        }
    }

    /**
     * Returns the number of CPU workers after this function (including the new worker),
     * `0` if no worker was created, or `-1` if the scheduler is terminated.
     */
    private fun createNewWorker(): Int {
        val worker: Worker
        return synchronized(workers) {
            // Make sure we're not trying to resurrect a terminated scheduler
            if (isTerminated) return -1
            val state = controlState.value
            val created = createdWorkers(state)
            val blocking = blockingTasks(state)
            val cpuWorkers = (created - blocking).coerceAtLeast(0)
            // Double check for overprovision
            if (cpuWorkers >= corePoolSize) return 0
            if (created >= maxPoolSize) return 0
            // start & register new worker, commit index only after successful creation
            val newIndex = createdWorkers + 1
            require(newIndex > 0 && workers[newIndex] == null)
            /*
             * 1) Claim the slot (under a lock) by the newly created worker
             * 2) Make it observable by incrementing the created workers count
             * 3) Only then start the worker, otherwise it may miss its own creation
             */
            worker = Worker(newIndex)
            workers.setSynchronized(newIndex, worker)
            require(newIndex == incrementCreatedWorkers())
            cpuWorkers + 1
        }.also { worker.start() } // Start the worker when the lock is released to reduce contention, see #3652
    }

    /**
     * Returns `null` if the task was successfully added or an instance of the
     * task that was not added or replaced (thus should be added to the global queue).
     */
    private fun Worker?.submitToLocalQueue(task: Task, tailDispatch: Boolean): Task? {
        if (this == null) return task
        /*
         * This worker could have been already terminated from this thread by close/shutdown and it should not
         * accept any more tasks into its local queue.
         */
        if (state === WorkerState.TERMINATED) return task
        // Do not add a CPU task to the local queue if we are not able to execute it
        if (task.mode == TASK_NON_BLOCKING && state === WorkerState.BLOCKING) {
            return task
        }
        mayHaveLocalTasks = true
        return localQueue.add(task, fair = tailDispatch)
    }

    private fun currentWorker(): Worker? = (Thread.currentThread() as? Worker)?.takeIf { it.scheduler == this }
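    /*
     * For illustration only: the CPU-worker estimate used by [tryCreateWorker] and [createNewWorker].
     * A hedged sketch with hypothetical numbers:
     * ```
     * val created = 6; val blocking = 4; val corePoolSize = 4
     * val cpuWorkers = (created - blocking).coerceAtLeast(0) // 2 threads are free for CPU work
     * check(cpuWorkers == 2 && cpuWorkers < corePoolSize)    // -> one more worker will be created
     * ```
     */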
    /**
     * Returns a string identifying the state of this scheduler for nicer debugging.
     * Note that this method is not atomic and represents a rough state of the pool.
     *
     * State of the queues:
     * b for blocking, c for CPU, d for dormant.
     * E.g. [1b, 1b, 2c, 1d] means that the pool has
     * two blocking workers with queue size 1, one worker with a CPU permit and queue size 2,
     * and one dormant (executing its local queue before parking) worker with queue size 1.
     */
    override fun toString(): String {
        var parkedWorkers = 0
        var blockingWorkers = 0
        var cpuWorkers = 0
        var dormant = 0
        var terminated = 0
        val queueSizes = arrayListOf<String>()
        for (index in 1 until workers.currentLength()) {
            val worker = workers[index] ?: continue
            val queueSize = worker.localQueue.size
            when (worker.state) {
                WorkerState.PARKING -> ++parkedWorkers
                WorkerState.BLOCKING -> {
                    ++blockingWorkers
                    queueSizes += queueSize.toString() + "b" // Blocking
                }
                WorkerState.CPU_ACQUIRED -> {
                    ++cpuWorkers
                    queueSizes += queueSize.toString() + "c" // CPU
                }
                WorkerState.DORMANT -> {
                    ++dormant
                    if (queueSize > 0) queueSizes += queueSize.toString() + "d" // Dormant
                }
                WorkerState.TERMINATED -> ++terminated
            }
        }
        val state = controlState.value
        return "$schedulerName@$hexAddress[" +
            "Pool Size {" +
            "core = $corePoolSize, " +
            "max = $maxPoolSize}, " +
            "Worker States {" +
            "CPU = $cpuWorkers, " +
            "blocking = $blockingWorkers, " +
            "parked = $parkedWorkers, " +
            "dormant = $dormant, " +
            "terminated = $terminated}, " +
            "running workers queues = $queueSizes, " +
            "global CPU queue size = ${globalCpuQueue.size}, " +
            "global blocking queue size = ${globalBlockingQueue.size}, " +
            "Control State {" +
            "created workers = ${createdWorkers(state)}, " +
            "blocking tasks = ${blockingTasks(state)}, " +
            "CPUs acquired = ${corePoolSize - availableCpuPermits(state)}" +
            "}]"
    }

    fun runSafely(task: Task) {
        try {
            task.run()
        } catch (e: Throwable) {
            val thread = Thread.currentThread()
            thread.uncaughtExceptionHandler.uncaughtException(thread, e)
        } finally {
            unTrackTask()
        }
    }

    internal inner class Worker private constructor() : Thread() {
        init {
            isDaemon = true
            /*
             * `Dispatchers.Default` is used as *the* dispatcher in containerized environments,
             * isolated by their own classloaders. Workers are populated lazily, thus we are inheriting
             * the `Dispatchers.Default` context class loader here instead of the parent thread's one
             * in order not to accidentally capture a temporary application classloader.
             */
            contextClassLoader = this@CoroutineScheduler.javaClass.classLoader
        }

        // guarded by scheduler lock, index in the workers array, 0 when not in the array (terminated)
        @Volatile // volatile for push/pop operations on parkedWorkersStack
        var indexInArray = 0
            set(index) {
                name = "$schedulerName-worker-${if (index == 0) "TERMINATED" else index.toString()}"
                field = index
            }

        constructor(index: Int) : this() {
            indexInArray = index
        }

        inline val scheduler get() = this@CoroutineScheduler

        @JvmField
        val localQueue: WorkQueue = WorkQueue()

        /**
         * Slot that is used to steal tasks into, to avoid re-adding them
         * to the local queue. See [trySteal].
         */
        private val stolenTask: ObjectRef<Task?> = ObjectRef()

        /**
         * Worker state. **Updated only by this worker thread**.
         * By default, a worker is in the DORMANT state in the case when it was created but all CPU tokens or tasks were taken.
         * It is used locally by the worker to maintain its own invariants.
         */
        @JvmField
        var state = WorkerState.DORMANT

        /**
         * Worker control state responsible for worker claiming, parking and termination.
         * List of states:
         * [PARKED] -- worker is parked and can self-terminate after a termination deadline.
         * [CLAIMED] -- worker is claimed by an external submitter.
         * [TERMINATED] -- worker is terminated and no longer usable.
         */
        val workerCtl = atomic(CLAIMED)

        /**
         * It is set to the termination deadline when the worker starts [park]ing and is reset
         * when a task is found. It serves as protection against spurious wakeups of parkNanos.
         */
        private var terminationDeadline = 0L
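        /*
         * For illustration only: why [stolenTask] is an out-parameter rather than a return value.
         * Writing the result into a pre-allocated ref lets [trySteal] return a primitive status/delay
         * without allocating a (status, task) pair per attempt. A hedged, standalone sketch:
         * ```
         * val out = ObjectRef<String?>()
         * fun tryProduce(into: ObjectRef<String?>): Boolean {
         *     into.element = "value" // write the payload into the slot
         *     return true            // the primitive result carries the status
         * }
         * if (tryProduce(out)) println(out.element)
         * ```
         */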
        /**
         * Reference to the next worker in the [parkedWorkersStack].
         * It may be `null` if there is no next parked worker.
         * This reference is set to [NOT_IN_STACK] when the worker is physically not in the stack.
         */
        @Volatile
        var nextParkedWorker: Any? = NOT_IN_STACK

        /*
         * The delay until at least one task in another worker's queue will become stealable.
         */
        private var minDelayUntilStealableTaskNs = 0L

        /**
         * The state of the embedded Marsaglia xorshift random number generator, used for work-stealing purposes.
         * It is initialized with a seed.
         */
        private var rngState: Int = run {
            // This could've been Random.nextInt(), but we are shaving off an extra initialization cost, see #4051
            val seed = System.nanoTime().toInt()
            // rngState shouldn't be zero, as required by the xorshift algorithm
            if (seed != 0) return@run seed
            42
        }

        /**
         * Tries to acquire a CPU token if the worker doesn't have one.
         * @return whether the worker acquired (or already had) a CPU token
         */
        private fun tryAcquireCpuPermit(): Boolean = when {
            state == WorkerState.CPU_ACQUIRED -> true
            this@CoroutineScheduler.tryAcquireCpuPermit() -> {
                state = WorkerState.CPU_ACQUIRED
                true
            }
            else -> false
        }

        /**
         * Releases the CPU token if the worker has any and changes state to [newState].
         * Returns `true` if the CPU permit was returned to the pool.
         */
        fun tryReleaseCpu(newState: WorkerState): Boolean {
            val previousState = state
            val hadCpu = previousState == WorkerState.CPU_ACQUIRED
            if (hadCpu) releaseCpuPermit()
            if (previousState != newState) state = newState
            return hadCpu
        }

        override fun run() = runWorker()

        @JvmField
        var mayHaveLocalTasks = false

        private fun runWorker() {
            var rescanned = false
            while (!isTerminated && state != WorkerState.TERMINATED) {
                val task = findTask(mayHaveLocalTasks)
                // Task found. Execute and repeat
                if (task != null) {
                    rescanned = false
                    minDelayUntilStealableTaskNs = 0L
                    executeTask(task)
                    continue
                } else {
                    mayHaveLocalTasks = false
                }
                /*
                 * No tasks were found:
                 * 1) Either at least one of the workers has a stealable task in its FIFO-buffer with a stealing deadline.
                 *    Then its deadline is stored in [minDelayUntilStealableTaskNs].
                 * // '2)' can be found below
                 *
                 * Then just park for that duration (ditto re-scanning).
                 * While it could potentially lead to short (up to WORK_STEALING_TIME_RESOLUTION_NS ns) starvations,
                 * excess unparks and managing the "one unpark per signalling" invariant become infeasible;
                 * instead we are going to resolve it with a "spinning via scans" mechanism.
                 * NB: this short potential parking does not interfere with `tryUnpark`
                 */
                if (minDelayUntilStealableTaskNs != 0L) {
                    if (!rescanned) {
                        rescanned = true
                    } else {
                        rescanned = false
                        tryReleaseCpu(WorkerState.PARKING)
                        interrupted()
                        LockSupport.parkNanos(minDelayUntilStealableTaskNs)
                        minDelayUntilStealableTaskNs = 0L
                    }
                    continue
                }
                /*
                 * 2) Or no tasks are available: time to park and, potentially, shut down the thread.
                 *    The worker adds itself to the stack of parked workers, re-scans all the queues
                 *    to avoid missing a wake-up (see [signalCpuWork]) and either starts executing discovered tasks
                 *    or parks itself awaiting new tasks.
                 */
                tryPark()
            }
            tryReleaseCpu(WorkerState.TERMINATED)
        }

        /**
         * See [runSingleTaskFromCurrentSystemDispatcher] for the rationale and details.
         * This is a fine-tailored method for a specific use-case not expected to be used widely.
         */
        fun runSingleTask(): Long {
            val stateSnapshot = state
            val isCpuThread = state == WorkerState.CPU_ACQUIRED
            val task = if (isCpuThread) {
                findCpuTask()
            } else {
                findBlockingTask()
            }
            if (task == null) {
                if (minDelayUntilStealableTaskNs == 0L) return -1L
                return minDelayUntilStealableTaskNs
            }
            runSafely(task)
            if (!isCpuThread) decrementBlockingTasks()
            assert { state == stateSnapshot }
            return 0L
        }

        fun isIo() = state == WorkerState.BLOCKING

        // Counterpart to "tryUnpark"
        private fun tryPark() {
            if (!inStack()) {
                parkedWorkersStackPush(this)
                return
            }
            workerCtl.value = PARKED // Update value once
            /*
             * inStack() prevents spurious wakeups, while workerCtl.value == PARKED
             * prevents the following race:
             *
             * - T2 scans the queue, adds itself to the stack, goes to rescan
             * - T2 suspends in 'workerCtl.value = PARKED' line
             * - T1 pops T2 from the stack, claims workerCtl, suspends
             * - T2 fails 'while (inStack())' check, goes to full rescan
             * - T2 adds itself to the stack, parks
             * - T1 unparks T2, bails out with success
             * - T2 unparks and loops in 'while (inStack())'
             */
            while (inStack() && workerCtl.value == PARKED) { // Prevent spurious wakeups
                if (isTerminated || state == WorkerState.TERMINATED) break
                tryReleaseCpu(WorkerState.PARKING)
                interrupted() // Cleanup interruptions
                park()
            }
        }

        private fun inStack(): Boolean = nextParkedWorker !== NOT_IN_STACK

        private fun executeTask(task: Task) {
            val taskMode = task.mode
            idleReset(taskMode)
            beforeTask(taskMode)
            runSafely(task)
            afterTask(taskMode)
        }

        private fun beforeTask(taskMode: Int) {
            if (taskMode == TASK_NON_BLOCKING) return
            // Always notify about new work when releasing a CPU permit to execute some blocking task
            if (tryReleaseCpu(WorkerState.BLOCKING)) {
                signalCpuWork()
            }
        }

        private fun afterTask(taskMode: Int) {
            if (taskMode == TASK_NON_BLOCKING) return
            decrementBlockingTasks()
            val currentState = state
            // Shutdown sequence of the blocking dispatcher
            if (currentState !== WorkerState.TERMINATED) {
                assert { currentState == WorkerState.BLOCKING } // "Expected BLOCKING state, but has $currentState"
                state = WorkerState.DORMANT
            }
        }

        /*
         * Marsaglia xorshift RNG with period 2^32-1 for work-stealing purposes.
         * ThreadLocalRandom cannot be used to support Android, and ThreadLocal is up to 15% slower on Ktor benchmarks.
         */
        fun nextInt(upperBound: Int): Int {
            var r = rngState
            r = r xor (r shl 13)
            r = r xor (r shr 17)
            r = r xor (r shl 5)
            rngState = r
            val mask = upperBound - 1
            // Fast path for power of two bound
            if (mask and upperBound == 0) {
                return r and mask
            }
            return (r and Int.MAX_VALUE) % upperBound
        }

        private fun park() {
            // set the termination deadline the first time we are here (it is reset in idleReset)
            if (terminationDeadline == 0L) terminationDeadline = System.nanoTime() + idleWorkerKeepAliveNs
            // actually park
            LockSupport.parkNanos(idleWorkerKeepAliveNs)
            // try to terminate when we are idle past the termination deadline
            // note that the comparison is written like this to protect against potential nanoTime wraparound
            if (System.nanoTime() - terminationDeadline >= 0) {
                terminationDeadline = 0L // if the attempt to terminate the worker fails, we'd extend the deadline again
                tryTerminateWorker()
            }
        }
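        /*
         * For illustration only: the power-of-two fast path in [nextInt] above relies on
         * `mask and upperBound == 0` detecting powers of two. A hedged, standalone sketch:
         * ```
         * fun isPowerOfTwo(n: Int): Boolean = n > 0 && (n and (n - 1)) == 0
         * check(isPowerOfTwo(8))   // 0b1000 and 0b0111 == 0 -> masking gives a uniform 0..7
         * check(!isPowerOfTwo(12)) // falls back to the modulo path
         * ```
         */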
        /**
         * Stops execution of the current thread and removes it from [createdWorkers].
         */
        private fun tryTerminateWorker() {
            synchronized(workers) {
                // Make sure we're not racing with the termination of the scheduler
                if (isTerminated) return
                // Someone else terminated, bail out
                if (createdWorkers <= corePoolSize) return
                /*
                 * See tryUnpark for state reasoning.
                 * If this CAS fails, then we were successfully unparked by another worker and cannot terminate.
                 */
                if (!workerCtl.compareAndSet(PARKED, TERMINATED)) return
                /*
                 * At this point this thread is no longer considered usable for scheduling.
                 * We need a multi-step choreography to reindex workers.
                 *
                 * 1) Read the current worker's index and reset it to zero.
                 */
                val oldIndex = indexInArray
                indexInArray = 0
                /*
                 * Now this worker cannot become the top of parkedWorkersStack, but it can
                 * still be at the stack top via oldIndex.
                 *
                 * 2) Update the top of the stack if it was pointing to oldIndex and make sure no
                 *    pending push/pop operation that might have already retrieved oldIndex could complete.
                 */
                parkedWorkersStackTopUpdate(this, oldIndex, 0)
                /*
                 * 3) Move the last worker into the index in the array that was previously occupied by this worker,
                 *    if the last worker was a different one (sic!).
                 */
                val lastIndex = decrementCreatedWorkers()
                if (lastIndex != oldIndex) {
                    val lastWorker = workers[lastIndex]!!
                    workers.setSynchronized(oldIndex, lastWorker)
                    lastWorker.indexInArray = oldIndex
                    /*
                     * Now lastWorker is available at both indices in the array, but it can
                     * still be at the stack top via its lastIndex.
                     *
                     * 4) Update the top of the stack lastIndex -> oldIndex and make sure no
                     *    pending push/pop operation that might have already retrieved lastIndex could complete.
                     */
                    parkedWorkersStackTopUpdate(lastWorker, lastIndex, oldIndex)
                }
                /*
                 * 5) It is safe to clear the reference from the workers array now.
                 */
                workers.setSynchronized(lastIndex, null)
            }
            state = WorkerState.TERMINATED
        }

        // It is invoked by this worker when it finds a task
        private fun idleReset(mode: Int) {
            terminationDeadline = 0L // reset the deadline for termination
            if (state == WorkerState.PARKING) {
                assert { mode == TASK_PROBABLY_BLOCKING }
                state = WorkerState.BLOCKING
            }
        }

        fun findTask(mayHaveLocalTasks: Boolean): Task? {
            if (tryAcquireCpuPermit()) return findAnyTask(mayHaveLocalTasks)
            /*
             * If we can't acquire a CPU permit, attempt to find a blocking task:
             * - Check if our queue has one (maybe mixed in with CPU tasks)
             * - Poll the global queue and try to steal
             */
            return findBlockingTask()
        }

        // NB: ONLY for runSingleTask method
        private fun findBlockingTask(): Task? {
            return localQueue.pollBlocking()
                ?: globalBlockingQueue.removeFirstOrNull()
                ?: trySteal(STEAL_BLOCKING_ONLY)
        }

        // NB: ONLY for runSingleTask method
        private fun findCpuTask(): Task? {
            return localQueue.pollCpu()
                ?: globalCpuQueue.removeFirstOrNull()
                ?: trySteal(STEAL_CPU_ONLY)
        }

        private fun findAnyTask(scanLocalQueue: Boolean): Task? {
            /*
             * Anti-starvation mechanism: probabilistically poll either the local
             * or the global queue to ensure progress for both external and internal tasks.
             */
            if (scanLocalQueue) {
                val globalFirst = nextInt(2 * corePoolSize) == 0
                if (globalFirst) pollGlobalQueues()?.let { return it }
                localQueue.poll()?.let { return it }
                if (!globalFirst) pollGlobalQueues()?.let { return it }
            } else {
                pollGlobalQueues()?.let { return it }
            }
            return trySteal(STEAL_ANY)
        }

        private fun pollGlobalQueues(): Task? {
            if (nextInt(2) == 0) {
                globalCpuQueue.removeFirstOrNull()?.let { return it }
                return globalBlockingQueue.removeFirstOrNull()
            } else {
                globalBlockingQueue.removeFirstOrNull()?.let { return it }
                return globalCpuQueue.removeFirstOrNull()
            }
        }
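        /*
         * For illustration only: the random-start circular scan used by [trySteal] below.
         * Starting at a random victim spreads contention; visiting each slot once bounds the scan.
         * A hedged sketch over a plain list:
         * ```
         * fun <T : Any> scanFromRandomStart(items: List<T?>, start: Int): T? {
         *     var i = start
         *     repeat(items.size) {
         *         items[i]?.let { return it } // found a victim with work
         *         i = (i + 1) % items.size    // wrap around, visit every slot exactly once
         *     }
         *     return null
         * }
         * ```
         */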
        private fun trySteal(stealingMode: StealingMode): Task? {
            val created = createdWorkers
            // 0 to await an initialization and 1 to avoid excess stealing on single-core machines
            if (created < 2) {
                return null
            }
            var currentIndex = nextInt(created)
            var minDelay = Long.MAX_VALUE
            repeat(created) {
                ++currentIndex
                if (currentIndex > created) currentIndex = 1
                val worker = workers[currentIndex]
                if (worker !== null && worker !== this) {
                    val stealResult = worker.localQueue.trySteal(stealingMode, stolenTask)
                    if (stealResult == TASK_STOLEN) {
                        val result = stolenTask.element
                        stolenTask.element = null
                        return result
                    } else if (stealResult > 0) {
                        minDelay = min(minDelay, stealResult)
                    }
                }
            }
            minDelayUntilStealableTaskNs = if (minDelay != Long.MAX_VALUE) minDelay else 0
            return null
        }
    }

    enum class WorkerState {
        /**
         * Has a CPU token and either executes a [TASK_NON_BLOCKING] task or tries to find one.
         */
        CPU_ACQUIRED,

        /**
         * Executing a task with the [TASK_PROBABLY_BLOCKING] mode.
         */
        BLOCKING,

        /**
         * Currently parked.
         */
        PARKING,

        /**
         * Tries to execute its local work and then goes to infinite sleep as a no longer needed worker.
         */
        DORMANT,

        /**
         * Terminal state, will no longer be used.
         */
        TERMINATED
    }
}

/**
 * Checks if the thread is part of a thread pool that supports coroutines.
 * This function is needed for integration with BlockHound.
 */
@JvmName("isSchedulerWorker")
internal fun isSchedulerWorker(thread: Thread) = thread is CoroutineScheduler.Worker

/**
 * Checks if the thread is running a CPU-bound task.
 * This function is needed for integration with BlockHound.
 */
@JvmName("mayNotBlock")
internal fun mayNotBlock(thread: Thread) = thread is CoroutineScheduler.Worker &&
    thread.state == CoroutineScheduler.WorkerState.CPU_ACQUIRED
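/*
 * For illustration only: how a blocking-call detector (e.g. BlockHound) might use the
 * predicates above. A hedged sketch with a hypothetical callback:
 * ```
 * fun onBlockingCall(thread: Thread) {
 *     if (mayNotBlock(thread)) {
 *         error("Blocking call on a CPU-bound scheduler worker: ${thread.name}")
 *     }
 * }
 * ```
 */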