package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.internal.*
import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
import kotlin.jvm.*

/**
 * A _hot_ [Flow] that shares emitted values among all its collectors in a broadcast fashion, so that all collectors
 * get all emitted values. A shared flow is called _hot_ because its active instance exists independently of the
 * presence of collectors. This is opposed to a regular [Flow], such as defined by the [`flow { ... }`][flow] function,
 * which is _cold_ and is started separately for each collector.
 *
 * **Shared flow never completes**. A call to [Flow.collect] on a shared flow never completes normally, and
 * neither does a coroutine started by the [Flow.launchIn] function. An active collector of a shared flow is called a _subscriber_.
 *
 * A subscriber of a shared flow can be cancelled. This usually happens when the scope in which the coroutine is running
 * is cancelled. A subscriber to a shared flow is always [cancellable][Flow.cancellable], and checks for
 * cancellation before each emission. Note that most terminal operators like [Flow.toList] would also not complete,
 * when applied to a shared flow, but flow-truncating operators like [Flow.take] and [Flow.takeWhile] can be used on a
 * shared flow to turn it into a completing one.
 *
 * A [mutable shared flow][MutableSharedFlow] is created using the [MutableSharedFlow(...)] constructor function.
 * Its state can be updated by [emitting][MutableSharedFlow.emit] values to it and performing other operations.
 * See the [MutableSharedFlow] documentation for details.
 *
 * [SharedFlow] is useful for broadcasting events that happen inside an application to subscribers that can come and go.
 * For example, the following class encapsulates an event bus that distributes events to all subscribers
 * in a _rendezvous_ manner, suspending until all subscribers receive the emitted event:
 *
 * ```
 * class EventBus {
 *     private val _events = MutableSharedFlow<Event>() // private mutable shared flow
 *     val events = _events.asSharedFlow() // publicly exposed as read-only shared flow
 *
 *     suspend fun produceEvent(event: Event) {
 *         _events.emit(event) // suspends until all subscribers receive it
 *     }
 * }
 * ```
 *
 * As an alternative to the above usage with the `MutableSharedFlow(...)` constructor function,
 * any _cold_ [Flow] can be converted to a shared flow using the [shareIn] operator.
 *
 * There is a specialized implementation of shared flow for the case where the most recent state value needs
 * to be shared. See [StateFlow] for details.
 *
 * ### Replay cache and buffer
 *
 * A shared flow keeps a specific number of the most recent values in its _replay cache_. Every new subscriber first
 * gets the values from the replay cache and then gets new emitted values. The maximum size of the replay cache is
 * specified when the shared flow is created by the `replay` parameter. A snapshot of the current replay cache
 * is available via the [replayCache] property and it can be reset with the [MutableSharedFlow.resetReplayCache] function.
 *
 * A replay cache also provides a buffer for emissions to the shared flow, allowing slow subscribers to
 * get values from the buffer without suspending emitters. The buffer space determines how far slow subscribers
 * can lag behind the fast ones. When creating a shared flow, additional buffer capacity beyond replay can be reserved
 * using the `extraBufferCapacity` parameter.
 *
 * A shared flow with a buffer can be configured to avoid suspension of emitters on buffer overflow using
 * the `onBufferOverflow` parameter, which is equal to one of the entries of the [BufferOverflow] enum.
 * When a strategy other than [SUSPEND][BufferOverflow.SUSPEND] is configured, emissions to the shared flow never suspend.
 *
 * **Buffer overflow condition can happen only when there is at least one subscriber that is not ready to accept
 * the new value.** In the absence of subscribers only the most recent `replay` values are stored and the buffer
 * overflow behavior is never triggered and has no effect. In particular, in the absence of subscribers an emitter never
 * suspends despite the [BufferOverflow.SUSPEND] option, and the [BufferOverflow.DROP_LATEST] option does not have any effect either.
 * Essentially, the behavior in the absence of subscribers is always similar to [BufferOverflow.DROP_OLDEST],
 * but the buffer is just of `replay` size (without any `extraBufferCapacity`).
 *
 * ### Unbuffered shared flow
 *
 * A default implementation of a shared flow that is created with the `MutableSharedFlow()` constructor function
 * without parameters has neither a replay cache nor an additional buffer.
 * An [emit][MutableSharedFlow.emit] call to such a shared flow suspends until all subscribers receive the emitted value
 * and returns immediately if there are no subscribers.
 * Thus, a [tryEmit][MutableSharedFlow.tryEmit] call succeeds and returns `true` only if
 * there are no subscribers (in which case the emitted value is immediately lost).
 *
 * ### SharedFlow vs BroadcastChannel
 *
 * Conceptually, a shared flow is similar to [BroadcastChannel]
 * and is designed to completely replace it.
 * It has the following important differences:
 *
 * - `SharedFlow` is simpler, because it does not have to implement all the [Channel] APIs, which allows
 *   for faster and simpler implementation.
 * - `SharedFlow` supports configurable replay and buffer overflow strategy.
 * - `SharedFlow` has a clear separation into a read-only `SharedFlow` interface and a [MutableSharedFlow].
 * - `SharedFlow` cannot be closed like `BroadcastChannel` and can never represent a failure.
 *   All errors and completion signals should be explicitly _materialized_ if needed.
 *
 * To migrate [BroadcastChannel] usage to [SharedFlow], start by replacing usages of the `BroadcastChannel(capacity)`
 * constructor with `MutableSharedFlow(0, extraBufferCapacity=capacity)` (broadcast channel does not replay
 * values to new subscribers). Replace [send][BroadcastChannel.send] and [trySend][BroadcastChannel.trySend] calls
 * with [emit][MutableSharedFlow.emit] and [tryEmit][MutableSharedFlow.tryEmit], and convert subscribers' code to flow operators.
 *
 * ### Concurrency
 *
 * All methods of shared flow are **thread-safe** and can be safely invoked from concurrent coroutines without
 * external synchronization.
 *
 * ### Operator fusion
 *
 * Application of [flowOn][Flow.flowOn], [buffer] with [RENDEZVOUS][Channel.RENDEZVOUS] capacity,
 * or [cancellable] operators to a shared flow has no effect.
 *
 * ### Implementation notes
 *
 * Shared flow implementation uses a lock to ensure thread-safety, but suspending collector and emitter coroutines are
 * resumed outside of this lock to avoid deadlocks when using unconfined coroutines. Adding new subscribers
 * has `O(1)` amortized cost, but emitting has `O(N)` cost, where `N` is the number of subscribers.
 *
 * ### Not stable for inheritance
 *
 * **The `SharedFlow` interface is not stable for inheritance in 3rd party libraries**, as new methods
 * might be added to this interface in the future, but is stable for use.
 * Use the `MutableSharedFlow(replay, ...)` constructor function to create an implementation.
 */
public interface SharedFlow<out T> : Flow<T> {
    /**
     * A snapshot of the replay cache.
     */
    public val replayCache: List<T>

    /**
     * Accepts the given [collector] and [emits][FlowCollector.emit] values into it.
     * To emit values from a shared flow into a specific collector, either `collector.emitAll(flow)` or `collect { ... }`
     * SAM-conversion can be used.
     *
     * **A shared flow never completes**.
     * A call to [Flow.collect] or any other terminal operator
     * on a shared flow never completes normally.
     *
     * It is guaranteed that, by the time the first suspension happens, [collect] has already subscribed to the
     * [SharedFlow] and is eligible for receiving emissions. In particular, the following code will always print `1`:
     * ```
     * val flow = MutableSharedFlow<Int>()
     * launch(start = CoroutineStart.UNDISPATCHED) {
     *   flow.collect { println(1) }
     * }
     * flow.emit(1)
     * ```
     *
     * @see [Flow.collect] for implementation and inheritance details.
     */
    override suspend fun collect(collector: FlowCollector<T>): Nothing
}

/**
 * A mutable [SharedFlow] that provides functions to [emit] values to the flow.
 * An instance of `MutableSharedFlow` with the given configuration parameters can be created using `MutableSharedFlow(...)`
 * constructor function.
 *
 * See the [SharedFlow] documentation for details on shared flows.
 *
 * `MutableSharedFlow` is a [SharedFlow] that also provides the abilities to [emit] a value,
 * to [tryEmit] without suspension if possible, to track the [subscriptionCount],
 * and to [resetReplayCache].
 *
 * ### Concurrency
 *
 * All methods of shared flow are **thread-safe** and can be safely invoked from concurrent coroutines without
 * external synchronization.
 *
 * ### Not stable for inheritance
 *
 * **The `MutableSharedFlow` interface is not stable for inheritance in 3rd party libraries**, as new methods
 * might be added to this interface in the future, but is stable for use.
 * Use the `MutableSharedFlow(...)` constructor function to create an implementation.
 */
public interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
    /**
     * Emits a [value] to this shared flow, suspending on buffer overflow.
     *
     * This call can suspend only when the [BufferOverflow] strategy is
     * [SUSPEND][BufferOverflow.SUSPEND] **and** there are subscribers collecting this shared flow.
     *
     * If there are no subscribers, the buffer is not used.
     * Instead, the most recently emitted value is simply stored into
     * the replay cache if one was configured, displacing the older elements there,
     * or dropped if no replay cache was configured.
     *
     * See [tryEmit] for a non-suspending variant of this function.
     *
     * This method is **thread-safe** and can be safely invoked from concurrent coroutines without
     * external synchronization.
     */
    override suspend fun emit(value: T)

    /**
     * Tries to emit a [value] to this shared flow without suspending. It returns `true` if the value was
     * emitted successfully (see below). When this function returns `false`, it means that a call to a plain [emit]
     * function would suspend until there is buffer space available.
     *
     * This call can return `false` only when the [BufferOverflow] strategy is
     * [SUSPEND][BufferOverflow.SUSPEND] **and** there are subscribers collecting this shared flow.
     *
     * If there are no subscribers, the buffer is not used.
     * Instead, the most recently emitted value is simply stored into
     * the replay cache if one was configured, displacing the older elements there,
     * or dropped if no replay cache was configured. In any case, `tryEmit` returns `true`.
     *
     * This method is **thread-safe** and can be safely invoked from concurrent coroutines without
     * external synchronization.
     */
    public fun tryEmit(value: T): Boolean

    /**
     * The number of subscribers (active collectors) to this shared flow.
     *
     * The integer in the resulting [StateFlow] is not negative and starts with zero for a freshly created
     * shared flow.
     *
     * This state can be used to react to changes in the number of subscriptions to this shared flow.
     * For example, if you need to call `onActive` when the first subscriber appears and `onInactive`
     * when the last one disappears, you can set it up like this:
     *
     * ```
     * sharedFlow.subscriptionCount
     *     .map { count -> count > 0 } // map count into active/inactive flag
     *     .distinctUntilChanged() // only react to true<->false changes
     *     .onEach { isActive -> // configure an action
     *         if (isActive) onActive() else onInactive()
     *     }
     *     .launchIn(scope) // launch it
     * ```
     *
     * Usually, [StateFlow] conflates values, but [subscriptionCount] is not conflated.
     * This is done so that any subscribers that need to be notified when subscribers appear do
     * reliably observe it. With conflation, if a single subscriber appeared and immediately left, those
     * collecting [subscriptionCount] could fail to notice it due to `0` immediately conflating the
     * subscription count.
     */
    public val subscriptionCount: StateFlow<Int>

    /**
     * Resets the [replayCache] of this shared flow to an empty state.
     * New subscribers will be receiving only the values that were emitted after this call,
     * while old subscribers will still be receiving previously buffered values.
     * To reset a shared flow to an initial value, emit the value after this call.
     *
     * On a [MutableStateFlow], which always contains a single value, this function is not
     * supported, and throws an [UnsupportedOperationException]. To reset a [MutableStateFlow]
     * to an initial value, just update its [value][MutableStateFlow.value].
     *
     * This method is **thread-safe** and can be safely invoked from concurrent coroutines without
     * external synchronization.
     *
     * **Note: This is an experimental api.** This function may be removed or renamed in the future.
     */
    @ExperimentalCoroutinesApi
    public fun resetReplayCache()
}

/**
 * Creates a [MutableSharedFlow] with the given configuration parameters.
 *
 * This function throws [IllegalArgumentException] on unsupported values of parameters or combinations thereof.
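 *
 * As a minimal sketch of how these parameters behave (an illustration derived from the rules documented here,
 * not an official sample; the `main` wrapper and the value names are assumptions made for the example),
 * the following program emits without subscribers and then tries a rejected parameter combination:
 *
 * ```kotlin
 * import kotlinx.coroutines.channels.BufferOverflow
 * import kotlinx.coroutines.flow.MutableSharedFlow
 *
 * fun main() {
 *     val flow = MutableSharedFlow<Int>(replay = 2)
 *     // Without subscribers tryEmit always succeeds and only the last `replay` values are kept.
 *     check(flow.tryEmit(1) && flow.tryEmit(2) && flow.tryEmit(3))
 *     println(flow.replayCache) // [2, 3]
 *
 *     // A strategy other than SUSPEND needs replay > 0 or extraBufferCapacity > 0.
 *     val failure = runCatching { MutableSharedFlow<Int>(onBufferOverflow = BufferOverflow.DROP_OLDEST) }
 *     println(failure.exceptionOrNull() is IllegalArgumentException) // true
 * }
 * ```
 *
 * The sketch uses only [tryEmit][MutableSharedFlow.tryEmit] and [replayCache][SharedFlow.replayCache], so it does not
 * need a coroutine scope.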
 *
 * @param replay the number of values replayed to new subscribers (cannot be negative, defaults to zero).
 * @param extraBufferCapacity the number of values buffered in addition to `replay`.
 *   [emit][MutableSharedFlow.emit] does not suspend while there is a buffer space remaining (optional, cannot be negative, defaults to zero).
 * @param onBufferOverflow configures an [emit][MutableSharedFlow.emit] action on buffer overflow. Optional, defaults to
 *   [suspending][BufferOverflow.SUSPEND] attempts to emit a value.
 *   Values other than [BufferOverflow.SUSPEND] are supported only when `replay > 0` or `extraBufferCapacity > 0`.
 *   **Buffer overflow can happen only when there is at least one subscriber that is not ready to accept
 *   the new value.** In the absence of subscribers only the most recent [replay] values are stored and
 *   the buffer overflow behavior is never triggered and has no effect.
 */
@Suppress("FunctionName", "UNCHECKED_CAST")
public fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {
    require(replay >= 0) { "replay cannot be negative, but was $replay" }
    require(extraBufferCapacity >= 0) { "extraBufferCapacity cannot be negative, but was $extraBufferCapacity" }
    require(replay > 0 || extraBufferCapacity > 0 || onBufferOverflow == BufferOverflow.SUSPEND) {
        "replay or extraBufferCapacity must be positive with non-default onBufferOverflow strategy $onBufferOverflow"
    }
    val bufferCapacity0 = replay + extraBufferCapacity
    val bufferCapacity = if (bufferCapacity0 < 0) Int.MAX_VALUE else bufferCapacity0 // coerce to MAX_VALUE on overflow
    return SharedFlowImpl(replay, bufferCapacity, onBufferOverflow)
}

// ------------------------------------ Implementation ------------------------------------

internal class SharedFlowSlot : AbstractSharedFlowSlot<SharedFlowImpl<*>>() {
    @JvmField
    var index = -1L // current "to-be-emitted" index, -1 means the slot is free now

    @JvmField
    var cont: Continuation<Unit>? = null // collector waiting for new value

    override fun allocateLocked(flow: SharedFlowImpl<*>): Boolean {
        if (index >= 0) return false // not free
        index = flow.updateNewCollectorIndexLocked()
        return true
    }

    override fun freeLocked(flow: SharedFlowImpl<*>): Array<Continuation<Unit>?> {
        assert { index >= 0 }
        val oldIndex = index
        index = -1L
        cont = null // cleanup continuation reference
        return flow.updateCollectorIndexLocked(oldIndex)
    }
}

internal open class SharedFlowImpl<T>(
    private val replay: Int,
    private val bufferCapacity: Int,
    private val onBufferOverflow: BufferOverflow
) : AbstractSharedFlow<SharedFlowSlot>(), MutableSharedFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
    /*
        Logical structure of the buffer

                  buffered values
             /-----------------------\
                          replayCache      queued emitters
                          /----------\/----------------------\
         +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
         |   | 1 | 2 | 3 | 4 | 5 | 6 | E | E | E | E | E | E |   |   |   |
         +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
               ^           ^           ^                      ^
               |           |           |                      |
              head         |      head + bufferSize     head + totalSize
               |           |           |
     index of the slowest  |    index of the fastest
      possible collector   |     possible collector
               |           |
               |     replayIndex == new collector's index
               \---------------------- /
          range of possible minCollectorIndex

          head == minOf(minCollectorIndex, replayIndex) // by definition
          totalSize == bufferSize + queueSize // by definition

       INVARIANTS:
          minCollectorIndex = activeSlots.minOf { it.index } ?: (head + bufferSize)
          replayIndex <= head + bufferSize
     */

    // Stored state
    private var buffer: Array<Any?>? = null // allocated when needed, allocated size always power of two
    private var replayIndex = 0L // minimal index from which new collector gets values
    private var minCollectorIndex = 0L // minimal index of active collectors, equal to replayIndex if there are none
    private var bufferSize = 0 // number of buffered values
    private var queueSize = 0 // number of queued emitters

    // Computed state
    private val head: Long get() = minOf(minCollectorIndex, replayIndex)
    private val replaySize: Int get() = (head + bufferSize - replayIndex).toInt()
    private val totalSize: Int get() = bufferSize + queueSize
    private val bufferEndIndex: Long get() = head + bufferSize
    private val queueEndIndex: Long get() = head + bufferSize + queueSize

    override val replayCache: List<T>
        get() = synchronized(this) {
            val replaySize = this.replaySize
            if (replaySize == 0) return emptyList()
            val result = ArrayList<T>(replaySize)
            val buffer = buffer!! // must be allocated, because replaySize > 0
            @Suppress("UNCHECKED_CAST")
            for (i in 0 until replaySize) result += buffer.getBufferAt(replayIndex + i) as T
            result
        }

    /*
     * A tweak for SubscriptionCountStateFlow to get the latest value.
     */
    @Suppress("UNCHECKED_CAST")
    protected val lastReplayedLocked: T
        get() = buffer!!.getBufferAt(replayIndex + replaySize - 1) as T

    @Suppress("UNCHECKED_CAST")
    override suspend fun collect(collector: FlowCollector<T>): Nothing {
        val slot = allocateSlot()
        try {
            if (collector is SubscribedFlowCollector) collector.onSubscription()
            val collectorJob = currentCoroutineContext()[Job]
            while (true) {
                var newValue: Any?
                while (true) {
                    newValue = tryTakeValue(slot) // attempt no-suspend fast path first
                    if (newValue !== NO_VALUE) break
                    awaitValue(slot) // await signal that the new value is available
                }
                collectorJob?.ensureActive()
                collector.emit(newValue as T)
            }
        } finally {
            freeSlot(slot)
        }
    }

    override fun tryEmit(value: T): Boolean {
        var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
        val emitted = synchronized(this) {
            if (tryEmitLocked(value)) {
                resumes = findSlotsToResumeLocked(resumes)
                true
            } else {
                false
            }
        }
        for (cont in resumes) cont?.resume(Unit)
        return emitted
    }

    override suspend fun emit(value: T) {
        if (tryEmit(value)) return // fast-path
        emitSuspend(value)
    }

    @Suppress("UNCHECKED_CAST")
    private fun tryEmitLocked(value: T): Boolean {
        // Fast path without collectors -> no buffering
        if (nCollectors == 0) return tryEmitNoCollectorsLocked(value) // always returns true
        // With collectors we'll have to buffer
        // cannot emit now if buffer is full & blocked by slow collectors
        if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex) {
            when (onBufferOverflow) {
                BufferOverflow.SUSPEND -> return false // will suspend
                BufferOverflow.DROP_LATEST -> return true // just drop incoming
                BufferOverflow.DROP_OLDEST -> {} // force enqueue & drop oldest instead
            }
        }
        enqueueLocked(value)
        bufferSize++ // value was added to buffer
        // drop oldest from the buffer if it became more than bufferCapacity
        if (bufferSize > bufferCapacity) dropOldestLocked()
        // keep replaySize not larger than needed
        if (replaySize > replay) { // increment replayIndex by one
            updateBufferLocked(replayIndex + 1, minCollectorIndex, bufferEndIndex, queueEndIndex)
        }
        return true
    }

    private fun tryEmitNoCollectorsLocked(value: T): Boolean {
        assert { nCollectors == 0 }
        if (replay == 0) return true // no need to replay, just forget it now
        enqueueLocked(value) // enqueue to replayCache
        bufferSize++ // value was added to buffer
        // drop oldest from the buffer if it became more than replay
        if (bufferSize > replay) dropOldestLocked()
        minCollectorIndex = head + bufferSize // a default value (max allowed)
        return true
    }

    private fun dropOldestLocked() {
        buffer!!.setBufferAt(head, null)
        bufferSize--
        val newHead = head + 1
        if (replayIndex < newHead) replayIndex = newHead
        if (minCollectorIndex < newHead) correctCollectorIndexesOnDropOldest(newHead)
        assert { head == newHead } // since head = minOf(minCollectorIndex, replayIndex) it should have updated
    }

    private fun correctCollectorIndexesOnDropOldest(newHead: Long) {
        forEachSlotLocked { slot ->
            @Suppress("ConvertTwoComparisonsToRangeCheck") // Bug in JS backend
            if (slot.index >= 0 && slot.index < newHead) {
                slot.index = newHead // force move it up (this collector was too slow and missed the value at its index)
            }
        }
        minCollectorIndex = newHead
    }

    // enqueues item to buffer array, caller shall increment either bufferSize or queueSize
    private fun enqueueLocked(item: Any?) {
        val curSize = totalSize
        val buffer = when (val curBuffer = buffer) {
            null -> growBuffer(null, 0, 2)
            else -> if (curSize >= curBuffer.size) growBuffer(curBuffer, curSize, curBuffer.size * 2) else curBuffer
        }
        buffer.setBufferAt(head + curSize, item)
    }

    private fun growBuffer(curBuffer: Array<Any?>?, curSize: Int, newSize: Int): Array<Any?> {
        check(newSize > 0) { "Buffer size overflow" }
        val newBuffer = arrayOfNulls<Any?>(newSize).also { buffer = it }
        if (curBuffer == null) return newBuffer
        val head = head
        for (i in 0 until curSize) {
            newBuffer.setBufferAt(head + i, curBuffer.getBufferAt(head + i))
        }
        return newBuffer
    }

    private suspend fun emitSuspend(value: T) = suspendCancellableCoroutine<Unit> sc@{ cont ->
        var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
        val emitter = synchronized(this) lock@{
            // recheck buffer under lock again (make sure it is really full)
            if (tryEmitLocked(value)) {
                cont.resume(Unit)
                resumes = findSlotsToResumeLocked(resumes)
                return@lock null
            }
            // add suspended emitter to the buffer
            Emitter(this, head + totalSize, value, cont).also {
                enqueueLocked(it)
                queueSize++ // added to queue of waiting emitters
                // synchronous shared flow might rendezvous with waiting emitter
                if (bufferCapacity == 0) resumes = findSlotsToResumeLocked(resumes)
            }
        }
        // outside of the lock: register dispose on cancellation
        emitter?.let { cont.disposeOnCancellation(it) }
        // outside of the lock: resume slots if needed
        for (r in resumes) r?.resume(Unit)
    }

    private fun cancelEmitter(emitter: Emitter) = synchronized(this) {
        if (emitter.index < head) return // already skipped past this index
        val buffer = buffer!!
        if (buffer.getBufferAt(emitter.index) !== emitter) return // already resumed
        buffer.setBufferAt(emitter.index, NO_VALUE)
        cleanupTailLocked()
    }

    internal fun updateNewCollectorIndexLocked(): Long {
        val index = replayIndex
        if (index < minCollectorIndex) minCollectorIndex = index
        return index
    }

    // Is called when a collector disappears or changes index, returns a list of continuations to resume after lock
    internal fun updateCollectorIndexLocked(oldIndex: Long): Array<Continuation<Unit>?> {
        assert { oldIndex >= minCollectorIndex }
        if (oldIndex > minCollectorIndex) return EMPTY_RESUMES // nothing changes, it was not min
        // start computing new minimal index of active collectors
        val head = head
        var newMinCollectorIndex = head + bufferSize
        // take into account a special case of sync shared flow that can go past 1st queued emitter
        if (bufferCapacity == 0 && queueSize > 0) newMinCollectorIndex++
        forEachSlotLocked { slot ->
            @Suppress("ConvertTwoComparisonsToRangeCheck") // Bug in JS backend
            if (slot.index >= 0 && slot.index < newMinCollectorIndex) newMinCollectorIndex = slot.index
        }
        assert { newMinCollectorIndex >= minCollectorIndex } // can only grow
        if (newMinCollectorIndex <= minCollectorIndex) return EMPTY_RESUMES // nothing changes
        // Compute new buffer size if we drop items we no longer need and no emitter is resumed:
        // We must keep all the items from newMinIndex to the end of buffer
        var newBufferEndIndex = bufferEndIndex // var to grow when waiters are resumed
        val maxResumeCount = if (nCollectors > 0) {
            // If we have collectors we can resume up to maxResumeCount waiting emitters
            // a) queueSize -> that's how many waiting emitters we have
            // b) bufferCapacity - newBufferSize0 -> that's how many we can afford to resume to add w/o exceeding bufferCapacity
            val newBufferSize0 = (newBufferEndIndex - newMinCollectorIndex).toInt()
            minOf(queueSize, bufferCapacity - newBufferSize0)
        } else {
            // If we don't have collectors anymore we must resume all waiting emitters
            queueSize // that's how many waiting emitters we have (at most)
        }
        var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
        val newQueueEndIndex = newBufferEndIndex + queueSize
        if (maxResumeCount > 0) { // collect emitters to resume if we have them
            resumes = arrayOfNulls(maxResumeCount)
            var resumeCount = 0
            val buffer = buffer!!
            for (curEmitterIndex in newBufferEndIndex until newQueueEndIndex) {
                val emitter = buffer.getBufferAt(curEmitterIndex)
                if (emitter !== NO_VALUE) {
                    emitter as Emitter // must have Emitter class
                    resumes[resumeCount++] = emitter.cont
                    buffer.setBufferAt(curEmitterIndex, NO_VALUE) // mark as canceled if we moved ahead
                    buffer.setBufferAt(newBufferEndIndex, emitter.value)
                    newBufferEndIndex++
                    if (resumeCount >= maxResumeCount) break // enough resumed, done
                }
            }
        }
        // Compute new buffer size -> how many values we now actually have after resume
        val newBufferSize1 = (newBufferEndIndex - head).toInt()
        // Note: When nCollectors == 0 we resume ALL queued emitters and we might have resumed more than bufferCapacity,
        // and newMinCollectorIndex might be pointing to the wrong place because of that. The easiest way to fix it is by
        // forcing newMinCollectorIndex = newBufferEndIndex. We do not need to update newBufferSize1 (which could be
        // too big), because the only use of newBufferSize1 in the below code is in the minOf(replay, newBufferSize1)
        // expression, which coerces values that are too big anyway.
        if (nCollectors == 0) newMinCollectorIndex = newBufferEndIndex
        // Compute new replay size -> limit to replay the number of items we need, take into account that it can only grow
        var newReplayIndex = maxOf(replayIndex, newBufferEndIndex - minOf(replay, newBufferSize1))
        // adjustment for synchronous case with cancelled emitter (NO_VALUE)
        if (bufferCapacity == 0 && newReplayIndex < newQueueEndIndex && buffer!!.getBufferAt(newReplayIndex) == NO_VALUE) {
            newBufferEndIndex++
            newReplayIndex++
        }
        // Update buffer state
        updateBufferLocked(newReplayIndex, newMinCollectorIndex, newBufferEndIndex, newQueueEndIndex)
        // just in case we've moved all buffered emitters and have NO_VALUE's at the tail now
        cleanupTailLocked()
        // We need to wake up suspended collectors if any emitters were resumed here
        if (resumes.isNotEmpty()) resumes = findSlotsToResumeLocked(resumes)
        return resumes
    }

    private fun updateBufferLocked(
        newReplayIndex: Long,
        newMinCollectorIndex: Long,
        newBufferEndIndex: Long,
        newQueueEndIndex: Long
    ) {
        // Compute new head value
        val newHead = minOf(newMinCollectorIndex, newReplayIndex)
        assert { newHead >= head }
        // cleanup items we don't have to buffer anymore (because head is about to move)
        for (index in head until newHead) buffer!!.setBufferAt(index, null)
        // update all state variables to newly computed values
        replayIndex = newReplayIndex
        minCollectorIndex = newMinCollectorIndex
        bufferSize = (newBufferEndIndex - newHead).toInt()
        queueSize = (newQueueEndIndex - newBufferEndIndex).toInt()
        // check our key invariants (just in case)
        assert { bufferSize >= 0 }
        assert { queueSize >= 0 }
        assert { replayIndex <= this.head + bufferSize }
    }

    // Removes all the NO_VALUE items from the end of the queue and reduces its size
    private fun cleanupTailLocked() {
        // If we have synchronous case, then keep one emitter queued
        if (bufferCapacity == 0 && queueSize <= 1) return // return, don't clear it
        val buffer = buffer!!
        while (queueSize > 0 && buffer.getBufferAt(head + totalSize - 1) === NO_VALUE) {
            queueSize--
            buffer.setBufferAt(head + totalSize, null)
        }
    }

    // returns NO_VALUE if cannot take value without suspension
    private fun tryTakeValue(slot: SharedFlowSlot): Any? {
        var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
        val value = synchronized(this) {
            val index = tryPeekLocked(slot)
            if (index < 0) {
                NO_VALUE
            } else {
                val oldIndex = slot.index
                val newValue = getPeekedValueLockedAt(index)
                slot.index = index + 1 // points to the next index after peeked one
                resumes = updateCollectorIndexLocked(oldIndex)
                newValue
            }
        }
        for (resume in resumes) resume?.resume(Unit)
        return value
    }

    // returns -1 if cannot peek value without suspension
    private fun tryPeekLocked(slot: SharedFlowSlot): Long {
        // return buffered value if possible
        val index = slot.index
        if (index < bufferEndIndex) return index
        if (bufferCapacity > 0) return -1L // if there's a buffer, never try to rendezvous with emitters
        // Synchronous shared flow (bufferCapacity == 0) tries to rendezvous
        if (index > head) return -1L // ... but only with the first emitter (never look forward)
        if (queueSize == 0) return -1L // nothing there to rendezvous with
        return index // rendezvous with the first emitter
    }

    private fun getPeekedValueLockedAt(index: Long): Any? =
        when (val item = buffer!!.getBufferAt(index)) {
            is Emitter -> item.value
            else -> item
        }

    private suspend fun awaitValue(slot: SharedFlowSlot): Unit = suspendCancellableCoroutine { cont ->
        synchronized(this) lock@{
            val index = tryPeekLocked(slot) // recheck under this lock
            if (index < 0) {
                slot.cont = cont // Ok -- suspending
            } else {
                cont.resume(Unit) // has value, no need to suspend
                return@lock
            }
            slot.cont = cont // suspend, waiting
        }
    }

    private fun findSlotsToResumeLocked(resumesIn: Array<Continuation<Unit>?>): Array<Continuation<Unit>?> {
        var resumes: Array<Continuation<Unit>?> = resumesIn
        var resumeCount = resumesIn.size
        forEachSlotLocked loop@{ slot ->
            val cont = slot.cont ?: return@loop // only waiting slots
            if (tryPeekLocked(slot) < 0) return@loop // only slots that can peek a value
            if (resumeCount >= resumes.size) resumes = resumes.copyOf(maxOf(2, 2 * resumes.size))
            resumes[resumeCount++] = cont
            slot.cont = null // not waiting anymore
        }
        return resumes
    }

    override fun createSlot() = SharedFlowSlot()
    override fun createSlotArray(size: Int): Array<SharedFlowSlot?> = arrayOfNulls(size)

    override fun resetReplayCache() = synchronized(this) {
        // Update buffer state
        updateBufferLocked(
            newReplayIndex = bufferEndIndex,
            newMinCollectorIndex = minCollectorIndex,
            newBufferEndIndex = bufferEndIndex,
            newQueueEndIndex = queueEndIndex
        )
    }

    override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow) =
        fuseSharedFlow(context, capacity, onBufferOverflow)

    private class Emitter(
        @JvmField val flow: SharedFlowImpl<*>,
        @JvmField var index: Long,
        @JvmField val value: Any?,
        @JvmField val cont: Continuation<Unit>
    ) : DisposableHandle {
        override fun dispose() = flow.cancelEmitter(this)
    }
}

@JvmField
internal val NO_VALUE = Symbol("NO_VALUE")

private fun Array<Any?>.getBufferAt(index: Long) = get(index.toInt() and (size - 1))
private fun Array<Any?>.setBufferAt(index: Long, item: Any?) = set(index.toInt() and (size - 1), item)

internal fun <T> SharedFlow<T>.fuseSharedFlow(
    context: CoroutineContext,
    capacity: Int,
    onBufferOverflow: BufferOverflow
): Flow<T> {
    // context is irrelevant for shared flow and making additional rendezvous is meaningless
    // however, additional non-trivial buffering after shared flow could make sense for very slow subscribers
    if ((capacity == Channel.RENDEZVOUS || capacity == Channel.OPTIONAL_CHANNEL) && onBufferOverflow == BufferOverflow.SUSPEND) {
        return this
    }
    // Apply channel flow operator as usual
    return ChannelFlowOperatorImpl(this, context, capacity, onBufferOverflow)
}