@file:JvmMultifileClass @file:JvmName("FlowKt") package kotlinx.coroutines.flow import kotlinx.coroutines.* import kotlinx.coroutines.flow.internal.* import kotlin.jvm.* /** * Terminal flow operator that collects the given flow but ignores all emitted values. * If any exception occurs during collect or in the provided flow, this exception is rethrown from this method. * * It is a shorthand for `collect {}`. * * This operator is usually used with [onEach], [onCompletion] and [catch] operators to process all emitted values and * handle an exception that might occur in the upstream flow or during processing, for example: * * ``` * flow * .onEach { value -> process(value) } * .catch { e -> handleException(e) } * .collect() // trigger collection of the flow * ``` */ public suspend fun Flow<*>.collect(): Unit = collect(NopCollector) /** * Terminal flow operator that [launches][launch] the [collection][collect] of the given flow in the [scope]. * It is a shorthand for `scope.launch { flow.collect() }`. * * This operator is usually used with [onEach], [onCompletion] and [catch] operators to process all emitted values * handle an exception that might occur in the upstream flow or during processing, for example: * * ``` * flow * .onEach { value -> updateUi(value) } * .onCompletion { cause -> updateUi(if (cause == null) "Done" else "Failed") } * .catch { cause -> LOG.error("Exception: $cause") } * .launchIn(uiScope) * ``` * * Note that the resulting value of [launchIn] is not used and the provided scope takes care of cancellation. */ public fun Flow.launchIn(scope: CoroutineScope): Job = scope.launch { collect() // tail-call } /** * Terminal flow operator that collects the given flow with a provided [action] that takes the index of an element (zero-based) and the element. * If any exception occurs during collect or in the provided flow, this exception is rethrown from this method. * * See also [collect] and [withIndex]. */ public suspend inline fun Flow.collectIndexed(crossinline action: suspend (index: Int, value: T) -> Unit): Unit = collect(object : FlowCollector { private var index = 0 override suspend fun emit(value: T) = action(checkIndexOverflow(index++), value) }) /** * Terminal flow operator that collects the given flow with a provided [action]. * The crucial difference from [collect] is that when the original flow emits a new value * then the [action] block for the previous value is cancelled. * * It can be demonstrated by the following example: * * ``` * flow { * emit(1) * delay(50) * emit(2) * }.collectLatest { value -> * println("Collecting $value") * delay(100) // Emulate work * println("$value collected") * } * ``` * * prints "Collecting 1, Collecting 2, 2 collected" */ public suspend fun Flow.collectLatest(action: suspend (value: T) -> Unit) { /* * Implementation note: * buffer(0) is inserted here to fulfil user's expectations in sequential usages, e.g.: * ``` * flowOf(1, 2, 3).collectLatest { * delay(1) * println(it) // Expect only 3 to be printed * } * ``` * * It's not the case for intermediate operators which users mostly use for interactive UI, * where performance of dispatch is more important. */ mapLatest(action).buffer(0).collect() } /** * Collects all the values from the given [flow] and emits them to the collector. * It is a shorthand for `flow.collect { value -> emit(value) }`. */ public suspend fun FlowCollector.emitAll(flow: Flow) { ensureActive() flow.collect(this) } /** @suppress */ @Deprecated(level = DeprecationLevel.HIDDEN, message = "Backwards compatibility with JS and K/N") public suspend inline fun Flow.collect(crossinline action: suspend (value: T) -> Unit): Unit = collect(object : FlowCollector { override suspend fun emit(value: T) = action(value) })