@file:Suppress("FunctionName") package kotlinx.coroutines.channels import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel.Factory.BUFFERED import kotlinx.coroutines.channels.Channel.Factory.CHANNEL_DEFAULT_CAPACITY import kotlinx.coroutines.channels.Channel.Factory.CONFLATED import kotlinx.coroutines.channels.Channel.Factory.RENDEZVOUS import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED import kotlinx.coroutines.internal.* import kotlinx.coroutines.selects.* import kotlin.contracts.* import kotlin.internal.* import kotlin.jvm.* /** * Sender's interface to [Channel]. */ public interface SendChannel { /** * Returns `true` if this channel was closed by an invocation of [close] or its receiving side was [cancelled][ReceiveChannel.cancel]. * This means that calling [send] will result in an exception. * * Note that if this property returns `false`, it does not guarantee that consecutive call to [send] will succeed, as the * channel can be concurrently closed right after the check. For such scenarios, it is recommended to use [trySend] instead. * * @see SendChannel.trySend * @see SendChannel.close * @see ReceiveChannel.cancel */ @DelicateCoroutinesApi public val isClosedForSend: Boolean /** * Sends the specified [element] to this channel, suspending the caller while the buffer of this channel is full * or if it does not exist, or throws an exception if the channel [is closed for `send`][isClosedForSend] (see [close] for details). * * [Closing][close] a channel _after_ this function has suspended does not cause this suspended [send] invocation * to abort, because closing a channel is conceptually like sending a special "close token" over this channel. * All elements sent over the channel are delivered in first-in first-out order. The sent element * will be delivered to receivers before the close token. * * This suspending function is cancellable: if the [Job] of the current coroutine is cancelled while this * suspending function is waiting, this function immediately resumes with [CancellationException]. * There is a **prompt cancellation guarantee**: even if [send] managed to send the element, but was cancelled * while suspended, [CancellationException] will be thrown. See [suspendCancellableCoroutine] for low-level details. * * Because of the prompt cancellation guarantee, an exception does not always mean a failure to deliver the element. * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements. * * Note that this function does not check for cancellation when it is not suspended. * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed. * * This function can be used in [select] invocations with the [onSend] clause. * Use [trySend] to try sending to this channel without waiting. */ public suspend fun send(element: E) /** * Clause for the [select] expression of the [send] suspending function that selects when the element that is specified * as the parameter is sent to the channel. When the clause is selected, the reference to this channel * is passed into the corresponding block. * * The [select] invocation fails with an exception if the channel [is closed for `send`][isClosedForSend] (see [close] for details). */ public val onSend: SelectClause2> /** * Immediately adds the specified [element] to this channel, if this doesn't violate its capacity restrictions, * and returns the successful result. Otherwise, returns failed or closed result. * This is synchronous variant of [send], which backs off in situations when `send` suspends or throws. * * When `trySend` call returns a non-successful result, it guarantees that the element was not delivered to the consumer, and * it does not call `onUndeliveredElement` that was installed for this channel. * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements. */ public fun trySend(element: E): ChannelResult /** * Closes this channel. * This is an idempotent operation — subsequent invocations of this function have no effect and return `false`. * Conceptually, it sends a special "close token" over this channel. * * Immediately after invocation of this function, * [isClosedForSend] starts returning `true`. However, [isClosedForReceive][ReceiveChannel.isClosedForReceive] * on the side of [ReceiveChannel] starts returning `true` only after all previously sent elements * are received. * * A channel that was closed without a [cause] throws a [ClosedSendChannelException] on attempts to [send] * and [ClosedReceiveChannelException] on attempts to [receive][ReceiveChannel.receive]. * A channel that was closed with non-null [cause] is called a _failed_ channel. Attempts to send or * receive on a failed channel throw the specified [cause] exception. */ public fun close(cause: Throwable? = null): Boolean /** * Registers a [handler] which is synchronously invoked once the channel is [closed][close] * or the receiving side of this channel is [cancelled][ReceiveChannel.cancel]. * Only one handler can be attached to a channel during its lifetime. * The `handler` is invoked when [isClosedForSend] starts to return `true`. * If the channel is closed already, the handler is invoked immediately. * * The meaning of `cause` that is passed to the handler: * - `null` if the channel was closed normally without the corresponding argument. * - Instance of [CancellationException] if the channel was cancelled normally without the corresponding argument. * - The cause of `close` or `cancel` otherwise. * * ### Execution context and exception safety * * The [handler] is executed as part of the closing or cancelling operation, and only after the channel reaches its final state. * This means that if the handler throws an exception or hangs, the channel will still be successfully closed or cancelled. * Unhandled exceptions from [handler] are propagated to the closing or cancelling operation's caller. * * Example of usage: * ``` * val events = Channel(UNLIMITED) * callbackBasedApi.registerCallback { event -> * events.trySend(event) * .onClosed { /* channel is already closed, but the callback hasn't stopped yet */ } * } * * val uiUpdater = uiScope.launch(Dispatchers.Main) { * events.consume { /* handle events */ } * } * // Stop the callback after the channel is closed or cancelled * events.invokeOnClose { callbackBasedApi.stop() } * ``` * * **Stability note.** This function constitutes a stable API surface, with the only exception being * that an [IllegalStateException] is thrown when multiple handlers are registered. * This restriction could be lifted in the future. * * @throws UnsupportedOperationException if the underlying channel does not support [invokeOnClose]. * Implementation note: currently, [invokeOnClose] is unsupported only by Rx-like integrations * * @throws IllegalStateException if another handler was already registered */ public fun invokeOnClose(handler: (cause: Throwable?) -> Unit) /** * **Deprecated** offer method. * * This method was deprecated in the favour of [trySend]. * It has proven itself as the most error-prone method in Channel API: * * - `Boolean` return type creates the false sense of security, implying that `false` * is returned instead of throwing an exception. * - It was used mostly from non-suspending APIs where CancellationException triggered * internal failures in the application (the most common source of bugs). * - Due to signature and explicit `if (ch.offer(...))` checks it was easy to * oversee such error during code review. * - Its name was not aligned with the rest of the API and tried to mimic Java's queue instead. * * **NB** Automatic migration provides best-effort for the user experience, but requires removal * or adjusting of the code that relied on the exception handling. * The complete replacement has a more verbose form: * ``` * channel.trySend(element) * .onClosed { throw it ?: ClosedSendChannelException("Channel was closed normally") } * .isSuccess * ``` * * See https://github.com/Kotlin/kotlinx.coroutines/issues/974 for more context. * * @suppress **Deprecated**. */ @Deprecated( level = DeprecationLevel.ERROR, message = "Deprecated in the favour of 'trySend' method", replaceWith = ReplaceWith("trySend(element).isSuccess") ) // Warning since 1.5.0, error since 1.6.0, not hidden until 1.8+ because API is quite widespread public fun offer(element: E): Boolean { val result = trySend(element) if (result.isSuccess) return true throw recoverStackTrace(result.exceptionOrNull() ?: return false) } } /** * Receiver's interface to [Channel]. */ public interface ReceiveChannel { /** * Returns `true` if this channel was closed by invocation of [close][SendChannel.close] on the [SendChannel] * side and all previously sent items were already received, or if the receiving side was [cancelled][ReceiveChannel.cancel]. * * This means that calling [receive] will result in a [ClosedReceiveChannelException] or a corresponding cancellation cause. * If the channel was closed because of an exception, it is considered closed, too, but is called a _failed_ channel. * All suspending attempts to receive an element from a failed channel throw the original [close][SendChannel.close] cause exception. * * Note that if this property returns `false`, it does not guarantee that consecutive call to [receive] will succeed, as the * channel can be concurrently closed right after the check. For such scenarios, it is recommended to use [receiveCatching] instead. * * @see ReceiveChannel.receiveCatching * @see ReceiveChannel.cancel * @see SendChannel.close */ @DelicateCoroutinesApi public val isClosedForReceive: Boolean /** * Returns `true` if the channel is empty (contains no elements), which means that an attempt to [receive] will suspend. * This function returns `false` if the channel [is closed for `receive`][isClosedForReceive]. */ @ExperimentalCoroutinesApi public val isEmpty: Boolean /** * Retrieves and removes an element from this channel if it's not empty, or suspends the caller while the channel is empty, * or throws a [ClosedReceiveChannelException] if the channel [is closed for `receive`][isClosedForReceive]. * If the channel was closed because of an exception, it is called a _failed_ channel and this function * will throw the original [close][SendChannel.close] cause exception. * * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled while this * function is suspended, this function immediately resumes with a [CancellationException]. * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was * suspended, it will not resume successfully. The `receive` call can retrieve the element from the channel, * but then throw [CancellationException], thus failing to deliver the element. * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements. * * This suspending function is cancellable: if the [Job] of the current coroutine is cancelled while this * suspending function is waiting, this function immediately resumes with [CancellationException]. * There is a **prompt cancellation guarantee**: even if [receive] managed to retrieve the element from the channel, * but was cancelled while suspended, [CancellationException] will be thrown. * See [suspendCancellableCoroutine] for low-level details. * * Because of the prompt cancellation guarantee, some values retrieved from the channel can become lost. * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements. * * Note that this function does not check for cancellation when it is not suspended. * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed. * * This function can be used in [select] invocations with the [onReceive] clause. * Use [tryReceive] to try receiving from this channel without waiting. */ public suspend fun receive(): E /** * Clause for the [select] expression of the [receive] suspending function that selects with the element * received from the channel. * The [select] invocation fails with an exception if the channel * [is closed for `receive`][isClosedForReceive] (see [close][SendChannel.close] for details). */ public val onReceive: SelectClause1 /** * Retrieves and removes an element from this channel if it's not empty, or suspends the caller while this channel is empty. * This method returns [ChannelResult] with the value of an element successfully retrieved from the channel * or the close cause if the channel was closed. Closed cause may be `null` if the channel was closed normally. * The result cannot be [failed][ChannelResult.isFailure] without being [closed][ChannelResult.isClosed]. * * This suspending function is cancellable: if the [Job] of the current coroutine is cancelled while this * suspending function is waiting, this function immediately resumes with [CancellationException]. * There is a **prompt cancellation guarantee**: even if [receiveCatching] managed to retrieve the element from the * channel, but was cancelled while suspended, [CancellationException] will be thrown. * See [suspendCancellableCoroutine] for low-level details. * * Because of the prompt cancellation guarantee, some values retrieved from the channel can become lost. * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements. * * Note that this function does not check for cancellation when it is not suspended. * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed. * * This function can be used in [select] invocations with the [onReceiveCatching] clause. * Use [tryReceive] to try receiving from this channel without waiting. */ public suspend fun receiveCatching(): ChannelResult /** * Clause for the [select] expression of the [onReceiveCatching] suspending function that selects with the [ChannelResult] with a value * that is received from the channel or with a close cause if the channel * [is closed for `receive`][isClosedForReceive]. */ public val onReceiveCatching: SelectClause1> /** * Retrieves and removes an element from this channel if it's not empty, returning a [successful][ChannelResult.success] * result, returns [failed][ChannelResult.failed] result if the channel is empty, and [closed][ChannelResult.closed] * result if the channel is closed. */ public fun tryReceive(): ChannelResult /** * Returns a new iterator to receive elements from this channel using a `for` loop. * Iteration completes normally when the channel [is closed for `receive`][isClosedForReceive] without a cause and * throws the original [close][SendChannel.close] cause exception if the channel has _failed_. */ public operator fun iterator(): ChannelIterator /** * Cancels reception of remaining elements from this channel with an optional [cause]. * This function closes the channel and removes all buffered sent elements from it. * * A cause can be used to specify an error message or to provide other details on * the cancellation reason for debugging purposes. * If the cause is not specified, then an instance of [CancellationException] with a * default message is created to [close][SendChannel.close] the channel. * * Immediately after invocation of this function [isClosedForReceive] and * [isClosedForSend][SendChannel.isClosedForSend] * on the side of [SendChannel] start returning `true`. Any attempt to send to or receive from this channel * will lead to a [CancellationException]. */ public fun cancel(cause: CancellationException? = null) /** * @suppress This method implements old version of JVM ABI. Use [cancel]. */ @Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x") public fun cancel(): Unit = cancel(null) /** * @suppress This method has bad semantics when cause is not a [CancellationException]. Use [cancel]. */ @Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x") public fun cancel(cause: Throwable? = null): Boolean /** * **Deprecated** poll method. * * This method was deprecated in the favour of [tryReceive]. * It has proven itself as error-prone method in Channel API: * * - Nullable return type creates the false sense of security, implying that `null` * is returned instead of throwing an exception. * - It was used mostly from non-suspending APIs where CancellationException triggered * internal failures in the application (the most common source of bugs). * - Its name was not aligned with the rest of the API and tried to mimic Java's queue instead. * * See https://github.com/Kotlin/kotlinx.coroutines/issues/974 for more context. * * ### Replacement note * * The replacement `tryReceive().getOrNull()` is a default that ignores all close exceptions and * proceeds with `null`, while `poll` throws an exception if the channel was closed with an exception. * Replacement with the very same 'poll' semantics is `tryReceive().onClosed { if (it != null) throw it }.getOrNull()` * * @suppress **Deprecated**. */ @Deprecated( level = DeprecationLevel.ERROR, message = "Deprecated in the favour of 'tryReceive'. " + "Please note that the provided replacement does not rethrow channel's close cause as 'poll' did, " + "for the precise replacement please refer to the 'poll' documentation", replaceWith = ReplaceWith("tryReceive().getOrNull()") ) // Warning since 1.5.0, error since 1.6.0, not hidden until 1.8+ because API is quite widespread public fun poll(): E? { val result = tryReceive() if (result.isSuccess) return result.getOrThrow() throw recoverStackTrace(result.exceptionOrNull() ?: return null) } /** * This function was deprecated since 1.3.0 and is no longer recommended to use * or to implement in subclasses. * * It had the following pitfalls: * - Didn't allow to distinguish 'null' as "closed channel" from "null as a value" * - Was throwing if the channel has failed even though its signature may suggest it returns 'null' * - It didn't really belong to core channel API and can be exposed as an extension instead. * * ### Replacement note * * The replacement `receiveCatching().getOrNull()` is a safe default that ignores all close exceptions and * proceeds with `null`, while `receiveOrNull` throws an exception if the channel was closed with an exception. * Replacement with the very same `receiveOrNull` semantics is `receiveCatching().onClosed { if (it != null) throw it }.getOrNull()`. * * @suppress **Deprecated** */ @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") @LowPriorityInOverloadResolution @Deprecated( message = "Deprecated in favor of 'receiveCatching'. " + "Please note that the provided replacement does not rethrow channel's close cause as 'receiveOrNull' did, " + "for the detailed replacement please refer to the 'receiveOrNull' documentation", level = DeprecationLevel.ERROR, replaceWith = ReplaceWith("receiveCatching().getOrNull()") ) // Warning since 1.3.0, error in 1.5.0, cannot be hidden due to deprecated extensions public suspend fun receiveOrNull(): E? = receiveCatching().getOrNull() /** * This function was deprecated since 1.3.0 and is no longer recommended to use * or to implement in subclasses. * See [receiveOrNull] documentation. * * @suppress **Deprecated**: in favor of onReceiveCatching extension. */ @Suppress("DEPRECATION_ERROR") @Deprecated( message = "Deprecated in favor of onReceiveCatching extension", level = DeprecationLevel.ERROR, replaceWith = ReplaceWith("onReceiveCatching") ) // Warning since 1.3.0, error in 1.5.0, will be hidden or removed in 1.7.0 public val onReceiveOrNull: SelectClause1 get() = (this as BufferedChannel).onReceiveOrNull } /** * A discriminated union of channel operation result. * It encapsulates the successful or failed result of a channel operation or a failed operation to a closed channel with * an optional cause. * * The successful result represents a successful operation with a value of type [T], for example, * the result of [Channel.receiveCatching] operation or a successfully sent element as a result of [Channel.trySend]. * * The failed result represents a failed operation attempt to a channel, but it doesn't necessarily indicate that the channel is failed. * E.g. when the channel is full, [Channel.trySend] returns failed result, but the channel itself is not in the failed state. * * The closed result represents an operation attempt to a closed channel and also implies that the operation has failed. * It is guaranteed that if the result is _closed_, then the target channel is either [closed for send][Channel.isClosedForSend] * or is [closed for receive][Channel.isClosedForReceive] depending on whether the failed operation was sending or receiving. */ @JvmInline public value class ChannelResult @PublishedApi internal constructor(@PublishedApi internal val holder: Any?) { /** * Returns `true` if this instance represents a successful * operation outcome. * * In this case [isFailure] and [isClosed] return `false`. */ public val isSuccess: Boolean get() = holder !is Failed /** * Returns `true` if this instance represents unsuccessful operation. * * In this case [isSuccess] returns false, but it does not imply * that the channel is failed or closed. * * Example of a failed operation without an exception and channel being closed * is [Channel.trySend] attempt to a channel that is full. */ public val isFailure: Boolean get() = holder is Failed /** * Returns `true` if this instance represents unsuccessful operation * to a closed or cancelled channel. * * In this case [isSuccess] returns `false`, [isFailure] returns `true`, but it does not imply * that [exceptionOrNull] returns non-null value. * * It can happen if the channel was [closed][Channel.close] normally without an exception. */ public val isClosed: Boolean get() = holder is Closed /** * Returns the encapsulated value if this instance represents success or `null` if it represents failed result. */ @Suppress("UNCHECKED_CAST") public fun getOrNull(): T? = if (holder !is Failed) holder as T else null /** * Returns the encapsulated value if this instance represents success or throws an exception if it is closed or failed. */ public fun getOrThrow(): T { @Suppress("UNCHECKED_CAST") if (holder !is Failed) return holder as T if (holder is Closed && holder.cause != null) throw holder.cause error("Trying to call 'getOrThrow' on a failed channel result: $holder") } /** * Returns the encapsulated exception if this instance represents failure or `null` if it is success * or unsuccessful operation to closed channel. */ public fun exceptionOrNull(): Throwable? = (holder as? Closed)?.cause internal open class Failed { override fun toString(): String = "Failed" } internal class Closed(@JvmField val cause: Throwable?): Failed() { override fun equals(other: Any?): Boolean = other is Closed && cause == other.cause override fun hashCode(): Int = cause.hashCode() override fun toString(): String = "Closed($cause)" } @InternalCoroutinesApi public companion object { private val failed = Failed() @InternalCoroutinesApi public fun success(value: E): ChannelResult = ChannelResult(value) @InternalCoroutinesApi public fun failure(): ChannelResult = ChannelResult(failed) @InternalCoroutinesApi public fun closed(cause: Throwable?): ChannelResult = ChannelResult(Closed(cause)) } public override fun toString(): String = when (holder) { is Closed -> holder.toString() else -> "Value($holder)" } } /** * Returns the encapsulated value if this instance represents [success][ChannelResult.isSuccess] or the * result of [onFailure] function for the encapsulated [Throwable] exception if it is failed or closed * result. */ @OptIn(ExperimentalContracts::class) public inline fun ChannelResult.getOrElse(onFailure: (exception: Throwable?) -> T): T { contract { callsInPlace(onFailure, InvocationKind.AT_MOST_ONCE) } @Suppress("UNCHECKED_CAST") return if (holder is ChannelResult.Failed) onFailure(exceptionOrNull()) else holder as T } /** * Performs the given [action] on the encapsulated value if this instance represents [success][ChannelResult.isSuccess]. * Returns the original `ChannelResult` unchanged. */ @OptIn(ExperimentalContracts::class) public inline fun ChannelResult.onSuccess(action: (value: T) -> Unit): ChannelResult { contract { callsInPlace(action, InvocationKind.AT_MOST_ONCE) } @Suppress("UNCHECKED_CAST") if (holder !is ChannelResult.Failed) action(holder as T) return this } /** * Performs the given [action] on the encapsulated [Throwable] exception if this instance represents [failure][ChannelResult.isFailure]. * The result of [ChannelResult.exceptionOrNull] is passed to the [action] parameter. * * Returns the original `ChannelResult` unchanged. */ @OptIn(ExperimentalContracts::class) public inline fun ChannelResult.onFailure(action: (exception: Throwable?) -> Unit): ChannelResult { contract { callsInPlace(action, InvocationKind.AT_MOST_ONCE) } if (holder is ChannelResult.Failed) action(exceptionOrNull()) return this } /** * Performs the given [action] on the encapsulated [Throwable] exception if this instance represents [failure][ChannelResult.isFailure] * due to channel being [closed][Channel.close]. * The result of [ChannelResult.exceptionOrNull] is passed to the [action] parameter. * It is guaranteed that if action is invoked, then the channel is either [closed for send][Channel.isClosedForSend] * or is [closed for receive][Channel.isClosedForReceive] depending on the failed operation. * * Returns the original `ChannelResult` unchanged. */ @OptIn(ExperimentalContracts::class) public inline fun ChannelResult.onClosed(action: (exception: Throwable?) -> Unit): ChannelResult { contract { callsInPlace(action, InvocationKind.AT_MOST_ONCE) } if (holder is ChannelResult.Closed) action(exceptionOrNull()) return this } /** * Iterator for [ReceiveChannel]. Instances of this interface are *not thread-safe* and shall not be used * from concurrent coroutines. */ public interface ChannelIterator { /** * Returns `true` if the channel has more elements, suspending the caller while this channel is empty, * or returns `false` if the channel [is closed for `receive`][ReceiveChannel.isClosedForReceive] without a cause. * It throws the original [close][SendChannel.close] cause exception if the channel has _failed_. * * This function retrieves and removes an element from this channel for the subsequent invocation * of [next]. * * This suspending function is cancellable: if the [Job] of the current coroutine is cancelled while this * suspending function is waiting, this function immediately resumes with [CancellationException]. * There is a **prompt cancellation guarantee**: even if [hasNext] retrieves the element from the channel during * its operation, but was cancelled while suspended, [CancellationException] will be thrown. * See [suspendCancellableCoroutine] for low-level details. * * Because of the prompt cancellation guarantee, some values retrieved from the channel can become lost. * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements. * * Note that this function does not check for cancellation when it is not suspended. * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed. */ public suspend operator fun hasNext(): Boolean @Deprecated(message = "Since 1.3.0, binary compatibility with versions <= 1.2.x", level = DeprecationLevel.HIDDEN) @Suppress("INAPPLICABLE_JVM_NAME") @JvmName("next") public suspend fun next0(): E { /* * Before 1.3.0 the "next()" could have been used without invoking "hasNext" first and there were code samples * demonstrating this behavior, so we preserve this logic for full binary backwards compatibility with previously * compiled code. */ if (!hasNext()) throw ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE) return next() } /** * Retrieves the element removed from the channel by a preceding call to [hasNext], or * throws an [IllegalStateException] if [hasNext] was not invoked. * This method should only be used in pair with [hasNext]: * ``` * while (iterator.hasNext()) { * val element = iterator.next() * // ... handle element ... * } * ``` * * This method throws a [ClosedReceiveChannelException] if the channel [is closed for `receive`][ReceiveChannel.isClosedForReceive] without a cause. * It throws the original [close][SendChannel.close] cause exception if the channel has _failed_. */ public operator fun next(): E } /** * Channel is a non-blocking primitive for communication between a sender (via [SendChannel]) and a receiver (via [ReceiveChannel]). * Conceptually, a channel is similar to Java's [BlockingQueue][java.util.concurrent.BlockingQueue], * but it has suspending operations instead of blocking ones and can be [closed][SendChannel.close]. * * ### Creating channels * * The `Channel(capacity)` factory function is used to create channels of different kinds depending on * the value of the `capacity` integer: * * - When `capacity` is 0 — it creates a _rendezvous_ channel. * This channel does not have any buffer at all. An element is transferred from the sender * to the receiver only when [send] and [receive] invocations meet in time (rendezvous), so [send] suspends * until another coroutine invokes [receive], and [receive] suspends until another coroutine invokes [send]. * * - When `capacity` is [Channel.UNLIMITED] — it creates a channel with effectively unlimited buffer. * This channel has a linked-list buffer of unlimited capacity (limited only by available memory). * [Sending][send] to this channel never suspends, and [trySend] always succeeds. * * - When `capacity` is [Channel.CONFLATED] — it creates a _conflated_ channel * This channel buffers at most one element and conflates all subsequent `send` and `trySend` invocations, * so that the receiver always gets the last element sent. * Back-to-back sent elements are conflated — only the last sent element is received, * while previously sent elements **are lost**. * [Sending][send] to this channel never suspends, and [trySend] always succeeds. * * - When `capacity` is positive but less than [UNLIMITED] — it creates an array-based channel with the specified capacity. * This channel has an array buffer of a fixed `capacity`. * [Sending][send] suspends only when the buffer is full, and [receiving][receive] suspends only when the buffer is empty. * * Buffered channels can be configured with an additional [`onBufferOverflow`][BufferOverflow] parameter. It controls the behaviour * of the channel's [send][Channel.send] function on buffer overflow: * * - [SUSPEND][BufferOverflow.SUSPEND] — the default, suspend `send` on buffer overflow until there is * free space in the buffer. * - [DROP_OLDEST][BufferOverflow.DROP_OLDEST] — do not suspend the `send`, add the latest value to the buffer, * drop the oldest one from the buffer. * A channel with `capacity = 1` and `onBufferOverflow = DROP_OLDEST` is a _conflated_ channel. * - [DROP_LATEST][BufferOverflow.DROP_LATEST] — do not suspend the `send`, drop the value that is being sent, * keep the buffer contents intact. * * A non-default `onBufferOverflow` implicitly creates a channel with at least one buffered element and * is ignored for a channel with unlimited buffer. It cannot be specified for `capacity = CONFLATED`, which * is a shortcut by itself. * * ### Prompt cancellation guarantee * * All suspending functions with channels provide **prompt cancellation guarantee**. * If the job was cancelled while send or receive function was suspended, it will not resume successfully, even if it * already changed the channel's state, but throws a [CancellationException]. * With a single-threaded [dispatcher][CoroutineDispatcher] like [Dispatchers.Main], this gives a * guarantee that the coroutine promptly reacts to the cancellation of its [Job] and does not resume its execution. * * > **Prompt cancellation guarantee** for channel operations was added since `kotlinx.coroutines` version `1.4.0` * > and had replaced a channel-specific atomic-cancellation that was not consistent with other suspending functions. * > The low-level mechanics of prompt cancellation are explained in [suspendCancellableCoroutine] function. * * ### Undelivered elements * * As a result of the prompt cancellation guarantee, when a closeable resource * (like open file or a handle to another native resource) is transferred via a channel from one coroutine to another, * it can fail to be delivered and will be lost if the receiving operation is cancelled in transit. * * A `Channel()` constructor function has an `onUndeliveredElement` optional parameter. * When `onUndeliveredElement` parameter is set, the corresponding function is called once for each element * that was sent to the channel with the call to the [send][SendChannel.send] function but failed to be delivered, * which can happen in the following cases: * * - When [send][SendChannel.send] operation throws an exception because it was cancelled before it had a chance to actually * send the element or because the channel was [closed][SendChannel.close] or [cancelled][ReceiveChannel.cancel]. * - When [receive][ReceiveChannel.receive], [receiveOrNull][ReceiveChannel.receiveOrNull], or [hasNext][ChannelIterator.hasNext] * operation throws an exception when it had retrieved the element from the * channel but was cancelled before the code following the receive call resumed. * - The channel was [cancelled][ReceiveChannel.cancel], in which case `onUndeliveredElement` is called on every * remaining element in the channel's buffer. * * Note, that `onUndeliveredElement` function is called synchronously in an arbitrary context. It should be fast, non-blocking, * and should not throw exceptions. Any exception thrown by `onUndeliveredElement` is wrapped into an internal runtime * exception which is either rethrown from the caller method or handed off to the exception handler in the current context * (see [CoroutineExceptionHandler]) when one is available. * * A typical usage for `onUndeliveredElement` is to close a resource that is being transferred via the channel. The * following code pattern guarantees that opened resources are closed even if producer, consumer, and/or channel * are cancelled. Resources are never lost. * * ``` * // Create the channel with onUndeliveredElement block that closes a resource * val channel = Channel(capacity) { resource -> resource.close() } * * // Producer code * val resourceToSend = openResource() * channel.send(resourceToSend) * * // Consumer code * val resourceReceived = channel.receive() * try { * // work with received resource * } finally { * resourceReceived.close() * } * ``` * * > Note, that if you do any kind of work in between `openResource()` and `channel.send(...)`, then you should * > ensure that resource gets closed in case this additional code fails. */ public interface Channel : SendChannel, ReceiveChannel { /** * Constants for the channel factory function `Channel()`. */ public companion object Factory { /** * Requests a channel with an unlimited capacity buffer in the `Channel(...)` factory function. */ public const val UNLIMITED: Int = Int.MAX_VALUE /** * Requests a rendezvous channel in the `Channel(...)` factory function — a channel that does not have a buffer. */ public const val RENDEZVOUS: Int = 0 /** * Requests a conflated channel in the `Channel(...)` factory function. This is a shortcut to creating * a channel with [`onBufferOverflow = DROP_OLDEST`][BufferOverflow.DROP_OLDEST]. */ public const val CONFLATED: Int = -1 /** * Requests a buffered channel with the default buffer capacity in the `Channel(...)` factory function. * The default capacity for a channel that [suspends][BufferOverflow.SUSPEND] on overflow * is 64 and can be overridden by setting [DEFAULT_BUFFER_PROPERTY_NAME] on JVM. * For non-suspending channels, a buffer of capacity 1 is used. */ public const val BUFFERED: Int = -2 // only for internal use, cannot be used with Channel(...) internal const val OPTIONAL_CHANNEL = -3 /** * Name of the property that defines the default channel capacity when * [BUFFERED] is used as parameter in `Channel(...)` factory function. */ public const val DEFAULT_BUFFER_PROPERTY_NAME: String = "kotlinx.coroutines.channels.defaultBuffer" internal val CHANNEL_DEFAULT_CAPACITY = systemProp(DEFAULT_BUFFER_PROPERTY_NAME, 64, 1, UNLIMITED - 1 ) } } /** * Creates a channel with the specified buffer capacity (or without a buffer by default). * See [Channel] interface documentation for details. * * @param capacity either a positive channel capacity or one of the constants defined in [Channel.Factory]. * @param onBufferOverflow configures an action on buffer overflow (optional, defaults to * a [suspending][BufferOverflow.SUSPEND] attempt to [send][Channel.send] a value, * supported only when `capacity >= 0` or `capacity == Channel.BUFFERED`, * implicitly creates a channel with at least one buffered element). * @param onUndeliveredElement an optional function that is called when element was sent but was not delivered to the consumer. * See "Undelivered elements" section in [Channel] documentation. * @throws IllegalArgumentException when [capacity] < -2 */ public fun Channel( capacity: Int = RENDEZVOUS, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND, onUndeliveredElement: ((E) -> Unit)? = null ): Channel = when (capacity) { RENDEZVOUS -> { if (onBufferOverflow == BufferOverflow.SUSPEND) BufferedChannel(RENDEZVOUS, onUndeliveredElement) // an efficient implementation of rendezvous channel else ConflatedBufferedChannel(1, onBufferOverflow, onUndeliveredElement) // support buffer overflow with buffered channel } CONFLATED -> { require(onBufferOverflow == BufferOverflow.SUSPEND) { "CONFLATED capacity cannot be used with non-default onBufferOverflow" } ConflatedBufferedChannel(1, BufferOverflow.DROP_OLDEST, onUndeliveredElement) } UNLIMITED -> BufferedChannel(UNLIMITED, onUndeliveredElement) // ignores onBufferOverflow: it has buffer, but it never overflows BUFFERED -> { // uses default capacity with SUSPEND if (onBufferOverflow == BufferOverflow.SUSPEND) BufferedChannel(CHANNEL_DEFAULT_CAPACITY, onUndeliveredElement) else ConflatedBufferedChannel(1, onBufferOverflow, onUndeliveredElement) } else -> { if (onBufferOverflow === BufferOverflow.SUSPEND) BufferedChannel(capacity, onUndeliveredElement) else ConflatedBufferedChannel(capacity, onBufferOverflow, onUndeliveredElement) } } @Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.4.0, binary compatibility with earlier versions") public fun Channel(capacity: Int = RENDEZVOUS): Channel = Channel(capacity) /** * Indicates an attempt to [send][SendChannel.send] to a [isClosedForSend][SendChannel.isClosedForSend] channel * that was closed without a cause. A _failed_ channel rethrows the original [close][SendChannel.close] cause * exception on send attempts. * * This exception is a subclass of [IllegalStateException], because, conceptually, it is the sender's responsibility * to close the channel and not try to send anything thereafter. Attempts to * send to a closed channel indicate a logical error in the sender's code. */ public class ClosedSendChannelException(message: String?) : IllegalStateException(message) /** * Indicates an attempt to [receive][ReceiveChannel.receive] from a [isClosedForReceive][ReceiveChannel.isClosedForReceive] * channel that was closed without a cause. A _failed_ channel rethrows the original [close][SendChannel.close] cause * exception on receive attempts. * * This exception is a subclass of [NoSuchElementException] to be consistent with plain collections. */ public class ClosedReceiveChannelException(message: String?) : NoSuchElementException(message)