@file:Suppress("UNCHECKED_CAST") // KT-32203 package kotlinx.coroutines.flow.internal import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.* import kotlinx.coroutines.internal.* private typealias Update = IndexedValue @PublishedApi internal suspend fun FlowCollector.combineInternal( flows: Array>, arrayFactory: () -> Array?, // Array factory is required to workaround array typing on JVM transform: suspend FlowCollector.(Array) -> Unit ): Unit = flowScope { // flow scope so any cancellation within the source flow will cancel the whole scope val size = flows.size if (size == 0) return@flowScope // bail-out for empty input val latestValues = arrayOfNulls(size) latestValues.fill(UNINITIALIZED) // Smaller bytecode & faster than Array(size) { UNINITIALIZED } val resultChannel = Channel(size) val nonClosed = LocalAtomicInt(size) var remainingAbsentValues = size for (i in 0 until size) { // Coroutine per flow that keeps track of its value and sends result to downstream launch { try { flows[i].collect { value -> resultChannel.send(Update(i, value)) yield() // Emulate fairness, giving each flow chance to emit } } finally { // Close the channel when there is no more flows if (nonClosed.decrementAndGet() == 0) { resultChannel.close() } } } } /* * Batch-receive optimization: read updates in batches, but bail-out * as soon as we encountered two values from the same source */ val lastReceivedEpoch = ByteArray(size) var currentEpoch: Byte = 0 while (true) { ++currentEpoch // Start batch // The very first receive in epoch should be suspending var element = resultChannel.receiveCatching().getOrNull() ?: break // Channel is closed, nothing to do here while (true) { val index = element.index // Update values val previous = latestValues[index] latestValues[index] = element.value if (previous === UNINITIALIZED) --remainingAbsentValues // Check epoch // Received the second value from the same flow in the same epoch -- bail out if (lastReceivedEpoch[index] == currentEpoch) break lastReceivedEpoch[index] = currentEpoch element = resultChannel.tryReceive().getOrNull() ?: break } // Process batch result if there is enough data if (remainingAbsentValues == 0) { /* * If arrayFactory returns null, then we can avoid array copy because * it's our own safe transformer that immediately deconstructs the array */ val results = arrayFactory() if (results == null) { transform(latestValues as Array) } else { (latestValues as Array).copyInto(results) transform(results as Array) } } } } internal fun zipImpl(flow: Flow, flow2: Flow, transform: suspend (T1, T2) -> R): Flow = unsafeFlow { coroutineScope { val second = produce { flow2.collect { value -> return@collect channel.send(value ?: NULL) } } /* * This approach only works with rendezvous channel and is required to enforce correctness * in the following scenario: * ``` * val f1 = flow { emit(1); delay(Long.MAX_VALUE) } * val f2 = flowOf(1) * f1.zip(f2) { ... } * ``` * * Invariant: this clause is invoked only when all elements from the channel were processed (=> rendezvous restriction). */ val collectJob = Job() (second as SendChannel<*>).invokeOnClose { // Optimization to avoid AFE allocation when the other flow is done if (collectJob.isActive) collectJob.cancel(AbortFlowException(collectJob)) } try { /* * Non-trivial undispatched (because we are in the right context and there is no structured concurrency) * hierarchy: * -Outer coroutineScope that owns the whole zip process * - First flow is collected by the child of coroutineScope, collectJob. * So it can be safely cancelled as soon as the second flow is done * - **But** the downstream MUST NOT be cancelled when the second flow is done, * so we emit to downstream from coroutineScope job. * Typically, such hierarchy requires coroutine for collector that communicates * with coroutines scope via a channel, but it's way too expensive, so * we are using this trick instead. */ val scopeContext = coroutineContext val cnt = threadContextElements(scopeContext) withContextUndispatched(coroutineContext + collectJob, Unit) { flow.collect { value -> withContextUndispatched(scopeContext, Unit, cnt) { val otherValue = second.receiveCatching().getOrElse { throw it ?: AbortFlowException(collectJob) } emit(transform(value, NULL.unbox(otherValue))) } } } } catch (e: AbortFlowException) { e.checkOwnership(owner = collectJob) } finally { second.cancel() } } }