@file:JvmMultifileClass @file:JvmName("ChannelsKt") @file:Suppress("unused") package kotlinx.coroutines.channels import kotlinx.coroutines.* import kotlin.coroutines.* import kotlin.jvm.* /** * Opens subscription to this [BroadcastChannel] and makes sure that the given [block] consumes all elements * from it by always invoking [cancel][ReceiveChannel.cancel] after the execution of the block. * * **Note: This API is obsolete since 1.5.0 and deprecated for removal since 1.7.0** * It is replaced with [SharedFlow][kotlinx.coroutines.flow.SharedFlow]. * * Safe to remove in 1.9.0 as was inline before. */ @ObsoleteCoroutinesApi @Suppress("DEPRECATION") @Deprecated(level = DeprecationLevel.ERROR, message = "BroadcastChannel is deprecated in the favour of SharedFlow and is no longer supported") public inline fun BroadcastChannel.consume(block: ReceiveChannel.() -> R): R { val channel = openSubscription() try { return channel.block() } finally { channel.cancel() } } /** * Subscribes to this [BroadcastChannel] and performs the specified action for each received element. * * **Note: This API is obsolete since 1.5.0 and deprecated for removal since 1.7.0** */ @Deprecated(level = DeprecationLevel.ERROR, message = "BroadcastChannel is deprecated in the favour of SharedFlow and is no longer supported") @Suppress("DEPRECATION", "DEPRECATION_ERROR") public suspend inline fun BroadcastChannel.consumeEach(action: (E) -> Unit): Unit = consume { for (element in this) action(element) } /** @suppress **/ @PublishedApi // Binary compatibility internal fun consumesAll(vararg channels: ReceiveChannel<*>): CompletionHandler = { cause: Throwable? -> var exception: Throwable? = null for (channel in channels) try { channel.cancelConsumed(cause) } catch (e: Throwable) { if (exception == null) { exception = e } else { exception.addSuppressed(e) } } exception?.let { throw it } } /** @suppress **/ @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) public suspend fun ReceiveChannel.elementAt(index: Int): E = consume { if (index < 0) throw IndexOutOfBoundsException("ReceiveChannel doesn't contain element at index $index.") var count = 0 for (element in this) { @Suppress("UNUSED_CHANGED_VALUE") // KT-47628 if (index == count++) return element } throw IndexOutOfBoundsException("ReceiveChannel doesn't contain element at index $index.") } /** @suppress **/ @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) public suspend fun ReceiveChannel.elementAtOrNull(index: Int): E? = consume { if (index < 0) return null var count = 0 for (element in this) { if (index == count++) return element } return null } /** @suppress **/ @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) public suspend fun ReceiveChannel.first(): E = consume { val iterator = iterator() if (!iterator.hasNext()) throw NoSuchElementException("ReceiveChannel is empty.") return iterator.next() } /** @suppress **/ @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) public suspend fun ReceiveChannel.firstOrNull(): E? = consume { val iterator = iterator() if (!iterator.hasNext()) return null return iterator.next() } /** @suppress **/ @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) public suspend fun ReceiveChannel.indexOf(element: E): Int { var index = 0 consumeEach { if (element == it) return index index++ } return -1 } /** @suppress **/ @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) public suspend fun ReceiveChannel.last(): E = consume { val iterator = iterator() if (!iterator.hasNext()) throw NoSuchElementException("ReceiveChannel is empty.") var last = iterator.next() while (iterator.hasNext()) last = iterator.next() return last } /** @suppress **/ @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) public suspend fun ReceiveChannel.lastIndexOf(element: E): Int { var lastIndex = -1 var index = 0 consumeEach { if (element == it) lastIndex = index index++ } return lastIndex } /** @suppress **/ @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) public suspend fun ReceiveChannel.lastOrNull(): E? = consume { val iterator = iterator() if (!iterator.hasNext()) return null var last = iterator.next() while (iterator.hasNext()) last = iterator.next() return last } /** @suppress **/ @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) public suspend fun ReceiveChannel.single(): E = consume { val iterator = iterator() if (!iterator.hasNext()) throw NoSuchElementException("ReceiveChannel is empty.") val single = iterator.next() if (iterator.hasNext()) throw IllegalArgumentException("ReceiveChannel has more than one element.") return single } /** @suppress **/ @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) public suspend fun ReceiveChannel.singleOrNull(): E? = consume { val iterator = iterator() if (!iterator.hasNext()) return null val single = iterator.next() if (iterator.hasNext()) return null return single } /** @suppress **/ @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) public fun ReceiveChannel.drop(n: Int, context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel = GlobalScope.produce(context, onCompletion = consumes()) { require(n >= 0) { "Requested element count $n is less than zero." } var remaining: Int = n if (remaining > 0) for (e in this@drop) { remaining-- if (remaining == 0) break } for (e in this@drop) { send(e) } } /** @suppress **/ @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) public fun ReceiveChannel.dropWhile( context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (E) -> Boolean ): ReceiveChannel = GlobalScope.produce(context, onCompletion = consumes()) { for (e in this@dropWhile) { if (!predicate(e)) { send(e) break } } for (e in this@dropWhile) { send(e) } } @PublishedApi internal fun ReceiveChannel.filter( context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (E) -> Boolean ): ReceiveChannel = GlobalScope.produce(context, onCompletion = consumes()) { for (e in this@filter) { if (predicate(e)) send(e) } } /** @suppress **/ @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) public fun ReceiveChannel.filterIndexed( context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (index: Int, E) -> Boolean ): ReceiveChannel = GlobalScope.produce(context, onCompletion = consumes()) { var index = 0 for (e in this@filterIndexed) { if (predicate(index++, e)) send(e) } } /** @suppress **/ @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) public fun ReceiveChannel.filterNot( context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (E) -> Boolean ): ReceiveChannel = filter(context) { !predicate(it) } @PublishedApi @Suppress("UNCHECKED_CAST") internal fun ReceiveChannel.filterNotNull(): ReceiveChannel = filter { it != null } as ReceiveChannel /** @suppress **/ @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) public suspend fun > ReceiveChannel.filterNotNullTo(destination: C): C { consumeEach { if (it != null) destination.add(it) } return destination } /** @suppress **/ @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) public suspend fun > ReceiveChannel.filterNotNullTo(destination: C): C { consumeEach { if (it != null) destination.send(it) } return destination } /** @suppress **/ @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) public fun ReceiveChannel.take(n: Int, context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel = GlobalScope.produce(context, onCompletion = consumes()) { if (n == 0) return@produce require(n >= 0) { "Requested element count $n is less than zero." } var remaining: Int = n for (e in this@take) { send(e) remaining-- if (remaining == 0) return@produce } } /** @suppress **/ @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) public fun ReceiveChannel.takeWhile( context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (E) -> Boolean ): ReceiveChannel = GlobalScope.produce(context, onCompletion = consumes()) { for (e in this@takeWhile) { if (!predicate(e)) return@produce send(e) } } @PublishedApi internal suspend fun > ReceiveChannel.toChannel(destination: C): C { consumeEach { destination.send(it) } return destination } @PublishedApi internal suspend fun > ReceiveChannel.toCollection(destination: C): C { consumeEach { destination.add(it) } return destination } /** @suppress **/ @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) public suspend fun ReceiveChannel>.toMap(): Map = toMap(LinkedHashMap()) @PublishedApi internal suspend fun > ReceiveChannel>.toMap(destination: M): M { consumeEach { destination += it } return destination } /** @suppress **/ @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) public suspend fun ReceiveChannel.toMutableList(): MutableList = toCollection(ArrayList()) /** @suppress **/ @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) public suspend fun ReceiveChannel.toSet(): Set = this.toMutableSet() /** @suppress **/ @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) public fun ReceiveChannel.flatMap( context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (E) -> ReceiveChannel ): ReceiveChannel = GlobalScope.produce(context, onCompletion = consumes()) { for (e in this@flatMap) { transform(e).toChannel(this) } } @PublishedApi internal fun ReceiveChannel.map( context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (E) -> R ): ReceiveChannel = GlobalScope.produce(context, onCompletion = consumes()) { consumeEach { send(transform(it)) } } @PublishedApi internal fun ReceiveChannel.mapIndexed( context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (index: Int, E) -> R ): ReceiveChannel = GlobalScope.produce(context, onCompletion = consumes()) { var index = 0 for (e in this@mapIndexed) { send(transform(index++, e)) } } /** @suppress **/ @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) public fun ReceiveChannel.mapIndexedNotNull( context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (index: Int, E) -> R? ): ReceiveChannel = mapIndexed(context, transform).filterNotNull() /** @suppress **/ @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) public fun ReceiveChannel.mapNotNull( context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (E) -> R? ): ReceiveChannel = map(context, transform).filterNotNull() /** @suppress **/ @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) public fun ReceiveChannel.withIndex(context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel> = GlobalScope.produce(context, onCompletion = consumes()) { var index = 0 for (e in this@withIndex) { send(IndexedValue(index++, e)) } } /** @suppress **/ @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) public fun ReceiveChannel.distinct(): ReceiveChannel = this.distinctBy { it } @PublishedApi internal fun ReceiveChannel.distinctBy( context: CoroutineContext = Dispatchers.Unconfined, selector: suspend (E) -> K ): ReceiveChannel = GlobalScope.produce(context, onCompletion = consumes()) { val keys = HashSet() for (e in this@distinctBy) { val k = selector(e) if (k !in keys) { send(e) keys += k } } } @PublishedApi internal suspend fun ReceiveChannel.toMutableSet(): MutableSet = toCollection(LinkedHashSet()) /** @suppress **/ @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) public suspend fun ReceiveChannel.any(): Boolean = consume { return iterator().hasNext() } /** @suppress **/ @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) public suspend fun ReceiveChannel.count(): Int { var count = 0 consumeEach { count++ } return count } /** @suppress **/ @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) public suspend fun ReceiveChannel.maxWith(comparator: Comparator): E? = consume { val iterator = iterator() if (!iterator.hasNext()) return null var max = iterator.next() while (iterator.hasNext()) { val e = iterator.next() if (comparator.compare(max, e) < 0) max = e } return max } /** @suppress **/ @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) public suspend fun ReceiveChannel.minWith(comparator: Comparator): E? = consume { val iterator = iterator() if (!iterator.hasNext()) return null var min = iterator.next() while (iterator.hasNext()) { val e = iterator.next() if (comparator.compare(min, e) > 0) min = e } return min } /** @suppress **/ @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) public suspend fun ReceiveChannel.none(): Boolean = consume { return !iterator().hasNext() } /** @suppress **/ @Deprecated(message = "Left for binary compatibility", level = DeprecationLevel.HIDDEN) public fun ReceiveChannel.requireNoNulls(): ReceiveChannel = map { it ?: throw IllegalArgumentException("null element found in $this.") } /** @suppress **/ @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) public infix fun ReceiveChannel.zip(other: ReceiveChannel): ReceiveChannel> = zip(other) { t1, t2 -> t1 to t2 } @PublishedApi // Binary compatibility internal fun ReceiveChannel.zip( other: ReceiveChannel, context: CoroutineContext = Dispatchers.Unconfined, transform: (a: E, b: R) -> V ): ReceiveChannel = GlobalScope.produce(context, onCompletion = consumesAll(this, other)) { val otherIterator = other.iterator() this@zip.consumeEach { element1 -> if (!otherIterator.hasNext()) return@consumeEach val element2 = otherIterator.next() send(transform(element1, element2)) } } @PublishedApi // Binary compatibility internal fun ReceiveChannel<*>.consumes(): CompletionHandler = { cause: Throwable? -> cancelConsumed(cause) }