@file:JvmMultifileClass @file:JvmName("ChannelsKt") @file:OptIn(ExperimentalContracts::class) package kotlinx.coroutines.channels import kotlinx.coroutines.* import kotlinx.coroutines.selects.* import kotlin.contracts.* import kotlin.jvm.* internal const val DEFAULT_CLOSE_MESSAGE = "Channel was closed" // -------- Operations on BroadcastChannel -------- /** * This function is deprecated in the favour of [ReceiveChannel.receiveCatching]. * * This function is considered error-prone for the following reasons; * - Is throwing if the channel has failed even though its signature may suggest it returns 'null' * - It is easy to forget that exception handling still have to be explicit * - During code reviews and code reading, intentions of the code are frequently unclear: * are potential exceptions ignored deliberately or not? * * @suppress doc */ @Deprecated( "Deprecated in the favour of 'receiveCatching'", ReplaceWith("receiveCatching().getOrNull()"), DeprecationLevel.HIDDEN ) // Warning since 1.5.0, ERROR in 1.6.0, HIDDEN in 1.7.0 @Suppress("EXTENSION_SHADOWED_BY_MEMBER", "DEPRECATION_ERROR") public suspend fun ReceiveChannel.receiveOrNull(): E? { @Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE") return (this as ReceiveChannel).receiveOrNull() } /** * This function is deprecated in the favour of [ReceiveChannel.onReceiveCatching] */ @Deprecated( "Deprecated in the favour of 'onReceiveCatching'", level = DeprecationLevel.HIDDEN ) // Warning since 1.5.0, ERROR in 1.6.0, HIDDEN in 1.7.0 @Suppress("DEPRECATION_ERROR") public fun ReceiveChannel.onReceiveOrNull(): SelectClause1 { return (this as ReceiveChannel).onReceiveOrNull } /** * Makes sure that the given [block] consumes all elements from the given channel * by always invoking [cancel][ReceiveChannel.cancel] after the execution of the block. * * The operation is _terminal_. */ public inline fun ReceiveChannel.consume(block: ReceiveChannel.() -> R): R { contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) } var cause: Throwable? = null try { return block() } catch (e: Throwable) { cause = e throw e } finally { cancelConsumed(cause) } } /** * Performs the given [action] for each received element and [cancels][ReceiveChannel.cancel] * the channel after the execution of the block. * If you need to iterate over the channel without consuming it, a regular `for` loop should be used instead. * * The operation is _terminal_. * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. */ public suspend inline fun ReceiveChannel.consumeEach(action: (E) -> Unit): Unit = consume { for (e in this) action(e) } /** * Returns a [List] containing all elements. * * The operation is _terminal_. * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. */ public suspend fun ReceiveChannel.toList(): List = buildList { consumeEach { add(it) } } @PublishedApi internal fun ReceiveChannel<*>.cancelConsumed(cause: Throwable?) { cancel(cause?.let { it as? CancellationException ?: CancellationException("Channel was consumed, consumer had failed", it) }) }