package kotlinx.coroutines.flow.internal

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.*
import kotlin.coroutines.*

/**
 * Channel-backed operator that applies [transform] to each upstream value, keeping at most one
 * transformation running: before a new value is processed, the coroutine started for the
 * previous value is cancelled and joined.
 *
 * @param T type of the values emitted by the upstream flow.
 * @param R type of the values emitted downstream by [transform].
 */
internal class ChannelFlowTransformLatest<T, R>(
    private val transform: suspend FlowCollector<R>.(value: T) -> Unit,
    flow: Flow<T>,
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = Channel.BUFFERED,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlowOperator<T, R>(flow, context, capacity, onBufferOverflow) {

    /** Creates a copy of this operator with the same [transform] and upstream but new channel settings. */
    override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<R> =
        ChannelFlowTransformLatest(transform, flow, context, capacity, onBufferOverflow)

    /**
     * Collects the upstream flow and, for every value, launches [transform] against [collector]
     * in a fresh child coroutine after cancelling and joining the previous one.
     */
    override suspend fun flowCollect(collector: FlowCollector<R>) {
        assert { collector is SendingCollector } // So cancellation behaviour is not leaking into the downstream
        coroutineScope {
            var previousFlow: Job? = null
            flow.collect { value ->
                // Stop the transformation of the previous value and wait until it has fully completed.
                previousFlow?.apply {
                    cancel(ChildCancelledException())
                    join()
                }
                // Do not pay for dispatch here, it's never necessary
                previousFlow = launch(start = CoroutineStart.UNDISPATCHED) {
                    collector.transform(value)
                }
            }
        }
    }
}

/**
 * Channel-backed merge of a flow of flows: every inner flow is collected in its own coroutine
 * and its values are sent to a shared producer channel. A [Semaphore] created with
 * [concurrency] permits gates how many inner collections hold a permit at the same time.
 *
 * @param T type of the values emitted by the inner flows.
 */
internal class ChannelFlowMerge<T>(
    private val flow: Flow<Flow<T>>,
    private val concurrency: Int,
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = Channel.BUFFERED,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlow<T>(context, capacity, onBufferOverflow) {

    /** Creates a copy of this merge with the same upstream and concurrency but new channel settings. */
    override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> =
        ChannelFlowMerge(flow, concurrency, context, capacity, onBufferOverflow)

    /** Starts a producer in [scope] that runs `collectToFun` with this flow's context and capacity. */
    override fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> {
        return scope.produce(context, capacity, block = collectToFun)
    }

    /**
     * Collects the outer flow; for each inner flow acquires a permit and launches a coroutine in
     * [scope] that collects it into a shared [SendingCollector], releasing the permit when done.
     */
    override suspend fun collectTo(scope: ProducerScope<T>) {
        val semaphore = Semaphore(concurrency)
        val collector = SendingCollector(scope)
        val job: Job? = coroutineContext[Job]
        flow.collect { inner ->
            /*
             * We launch a coroutine on each emitted element and the only potential
             * suspension point in this collector is `semaphore.acquire` that rarely suspends,
             * so we manually check for cancellation to propagate it to the upstream in time.
             */
            job?.ensureActive()
            semaphore.acquire()
            scope.launch {
                try {
                    inner.collect(collector)
                } finally {
                    semaphore.release() // Release concurrency permit
                }
            }
        }
    }

    /** Adds the configured concurrency to this flow's string representation. */
    override fun additionalToStringProps(): String = "concurrency=$concurrency"
}

/**
 * Channel-backed merge of a fixed collection of flows: each flow in [flows] is collected in its
 * own coroutine launched in the producer scope, all sending into the same channel.
 *
 * @param T type of the values emitted by the merged flows.
 */
internal class ChannelLimitedFlowMerge<T>(
    private val flows: Iterable<Flow<T>>,
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = Channel.BUFFERED,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlow<T>(context, capacity, onBufferOverflow) {

    /** Creates a copy of this merge over the same [flows] but with new channel settings. */
    override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> =
        ChannelLimitedFlowMerge(flows, context, capacity, onBufferOverflow)

    /** Starts a producer in [scope] that runs `collectToFun` with this flow's context and capacity. */
    override fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> {
        return scope.produce(context, capacity, block = collectToFun)
    }

    /** Launches one collecting coroutine per flow in [scope], all sharing a single [SendingCollector]. */
    override suspend fun collectTo(scope: ProducerScope<T>) {
        val collector = SendingCollector(scope)
        flows.forEach { flow ->
            scope.launch {
                flow.collect(collector)
            }
        }
    }
}