@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")

package kotlinx.coroutines.reactor

import kotlinx.coroutines.*
import kotlinx.coroutines.reactive.*
import org.reactivestreams.*
import reactor.core.*
import reactor.core.publisher.*
import kotlin.coroutines.*
import kotlinx.coroutines.internal.*

/**
 * Creates a cold [mono][Mono] that runs a given [block] in a coroutine and emits its result.
 * Every time the returned mono is subscribed, it starts a new coroutine.
 * If the result of [block] is `null`, [MonoSink.success] is invoked without a value.
 * Unsubscribing cancels the running coroutine.
 *
 * Coroutine context can be specified with [context] argument.
 * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
 *
 * @throws IllegalArgumentException if the provided [context] contains a [Job] instance.
 */
public fun <T> mono(
    context: CoroutineContext = EmptyCoroutineContext,
    block: suspend CoroutineScope.() -> T?
): Mono<T> {
    // The lifecycle of the coroutine is driven by the subscription, so an externally supplied Job is rejected.
    require(context[Job] === null) {
        "Mono context cannot contain job in it. " +
            "Its lifecycle should be managed via Disposable handle. Had $context"
    }
    return monoInternal(GlobalScope, context, block)
}

/**
 * Awaits the single value from the given [Mono] without blocking the thread and returns the resulting value, or, if
 * this publisher has produced an error, throws the corresponding exception. If the Mono completed without a value,
 * `null` is returned.
 *
 * The emitted value is buffered and the caller is only resumed once the [Mono] signals completion.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled while the suspending function is waiting, this
 * function immediately cancels its [Subscription] and resumes with [CancellationException].
 */
public suspend fun <T> Mono<T>.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont ->
    injectCoroutineContext(cont.context).subscribe(object : Subscriber<T> {
        // Holds the value received in onNext until onComplete hands it over to the continuation.
        private var value: T? = null

        override fun onSubscribe(s: Subscription) {
            // Propagate coroutine cancellation upstream before requesting anything.
            cont.invokeOnCancellation { s.cancel() }
            s.request(Long.MAX_VALUE)
        }

        override fun onComplete() {
            cont.resume(value)
            // Drop the reference once it has been delivered.
            value = null
        }

        override fun onNext(t: T) {
            // We don't return the value immediately because the process that emitted it may not be finished yet.
            // Resuming now could lead to race conditions between emitter and the awaiting code.
            value = t
        }

        override fun onError(error: Throwable) {
            cont.resumeWithException(error)
        }
    })
}

/**
 * Awaits the single value from the given [Mono] without blocking the thread and returns the resulting value, or,
 * if this Mono has produced an error, throws the corresponding exception.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled while the suspending function is waiting, this
 * function immediately cancels its [Subscription] and resumes with [CancellationException].
 *
 * @throws NoSuchElementException if the Mono does not emit any value
 */
// TODO: consider using https://github.com/Kotlin/kotlinx.coroutines/issues/2607 once that lands
public suspend fun <T> Mono<T>.awaitSingle(): T = awaitSingleOrNull() ?: throw NoSuchElementException()

/**
 * Shared implementation behind the top-level [mono] builder and the deprecated [CoroutineScope.mono].
 *
 * For every subscriber, the supplied [context] is extended with the Reactor context taken from the sink,
 * a fresh coroutine context is derived from [scope], and a [MonoCoroutine] bound to the sink is created,
 * registered as the sink's dispose handler and started with [CoroutineStart.DEFAULT].
 */
private fun <T> monoInternal(
    scope: CoroutineScope, // support for legacy mono in scope
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T?
): Mono<T> = Mono.create { sink ->
    val reactorContext = context.extendReactorContext(sink.currentContext())
    val newContext = scope.newCoroutineContext(context + reactorContext)
    val coroutine = MonoCoroutine(newContext, sink)
    // Register the dispose handler before starting so that a downstream cancellation reaches the coroutine.
    sink.onDispose(coroutine)
    coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
}

/**
 * Coroutine whose outcome is forwarded to a [MonoSink] and which doubles as the sink's [Disposable]:
 * disposing it cancels the coroutine.
 */
// NOTE(review): the positional `false, true` arguments are presumably `initParentJob` and `active` —
// confirm against the AbstractCoroutine constructor.
private class MonoCoroutine<in T>(
    parentContext: CoroutineContext,
    private val sink: MonoSink<T>
) : AbstractCoroutine<T>(parentContext, false, true), Disposable {
    // Set by dispose(); lets onCancelled tell a downstream-initiated cancellation from a genuine failure.
    @Volatile
    private var disposed = false

    override fun onCompleted(value: T) {
        if (value == null) sink.success() else sink.success(value)
    }

    override fun onCancelled(cause: Throwable, handled: Boolean) {
        /** Cancellation exceptions that were caused by [dispose], that is, came from downstream, are not errors. */
        val unwrappedCause = unwrap(cause)
        if (getCancellationException() !== unwrappedCause || !disposed) {
            try {
                /** If [sink] turns out to already be in a terminal state, this exception will be passed through the
                 * [Hooks.onOperatorError] hook, which is the way to signal undeliverable exceptions in Reactor. */
                sink.error(cause)
            } catch (e: Throwable) {
                // In case of improper error implementation or fatal exceptions
                cause.addSuppressed(e)
                handleCoroutineException(context, cause)
            }
        }
    }

    override fun dispose() {
        // The flag must be raised before cancelling so that onCancelled observes it.
        disposed = true
        cancel()
    }

    override fun isDisposed(): Boolean = disposed
}

/**
 * @suppress
 */
@Deprecated(
    message = "CoroutineScope.mono is deprecated in favour of top-level mono",
    level = DeprecationLevel.HIDDEN,
    replaceWith = ReplaceWith("mono(context, block)")
) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0
public fun <T> CoroutineScope.mono(
    context: CoroutineContext = EmptyCoroutineContext,
    block: suspend CoroutineScope.() -> T?
): Mono<T> = monoInternal(this, context, block)

/**
 * This is a lint function that was added already deprecated in order to guard against confusing usages on [Mono].
 * On [Publisher] instances other than [Mono], this function is not deprecated.
 *
 * Both [awaitFirst] and [awaitSingle] await the first value, or throw [NoSuchElementException] if there is none, but
 * the name [Mono.awaitSingle] better reflects the semantics of [Mono].
 *
 * For example, consider this code:
 * ```
 * myDbClient.findById(uniqueId).awaitFirst() // findById returns a `Mono`
 * ```
 * It looks like more than one value could be returned from `findById` and [awaitFirst] discards the extra elements,
 * when in fact, at most a single value can be present.
 *
 * @suppress
 */
@Deprecated(
    message = "Mono produces at most one value, so the semantics of dropping the remaining elements are not useful. " +
        "Please use awaitSingle() instead.",
    level = DeprecationLevel.HIDDEN,
    replaceWith = ReplaceWith("this.awaitSingle()")
) // Warning since 1.5, error in 1.6
public suspend fun <T> Mono<T>.awaitFirst(): T = awaitSingle()

/**
 * This is a lint function that was added already deprecated in order to guard against confusing usages on [Mono].
 * On [Publisher] instances other than [Mono], this function is not deprecated.
 *
 * Both [awaitFirstOrDefault] and [awaitSingleOrNull] await the first value, or return some special value if there
 * is none, but the name [Mono.awaitSingleOrNull] better reflects the semantics of [Mono].
 *
 * For example, consider this code:
 * ```
 * myDbClient.findById(uniqueId).awaitFirstOrDefault(default) // findById returns a `Mono`
 * ```
 * It looks like more than one value could be returned from `findById` and [awaitFirstOrDefault] discards the extra
 * elements, when in fact, at most a single value can be present.
 *
 * @suppress
 */
@Deprecated(
    message = "Mono produces at most one value, so the semantics of dropping the remaining elements are not useful. " +
        "Please use awaitSingleOrNull() instead.",
    level = DeprecationLevel.HIDDEN,
    replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: default")
) // Warning since 1.5, error in 1.6
public suspend fun <T> Mono<T>.awaitFirstOrDefault(default: T): T = awaitSingleOrNull() ?: default

/**
 * This is a lint function that was added already deprecated in order to guard against confusing usages on [Mono].
 * On [Publisher] instances other than [Mono], this function is not deprecated.
 *
 * Both [awaitFirstOrNull] and [awaitSingleOrNull] await the first value, or return some special value if there
 * is none, but the name [Mono.awaitSingleOrNull] better reflects the semantics of [Mono].
 *
 * For example, consider this code:
 * ```
 * myDbClient.findById(uniqueId).awaitFirstOrNull() // findById returns a `Mono`
 * ```
 * It looks like more than one value could be returned from `findById` and [awaitFirstOrNull] discards the extra
 * elements, when in fact, at most a single value can be present.
 *
 * @suppress
 */
@Deprecated(
    message = "Mono produces at most one value, so the semantics of dropping the remaining elements are not useful. " +
        "Please use awaitSingleOrNull() instead.",
    level = DeprecationLevel.HIDDEN,
    replaceWith = ReplaceWith("this.awaitSingleOrNull()")
) // Warning since 1.5, error in 1.6
public suspend fun <T> Mono<T>.awaitFirstOrNull(): T? = awaitSingleOrNull()

/**
 * This is a lint function that was added already deprecated in order to guard against confusing usages on [Mono].
 * On [Publisher] instances other than [Mono], this function is not deprecated.
 *
 * Both [awaitFirstOrElse] and [awaitSingleOrNull] await the first value, or return some special value if there
 * is none, but the name [Mono.awaitSingleOrNull] better reflects the semantics of [Mono].
 *
 * For example, consider this code:
 * ```
 * myDbClient.findById(uniqueId).awaitFirstOrElse(defaultValue) // findById returns a `Mono`
 * ```
 * It looks like more than one value could be returned from `findById` and [awaitFirstOrElse] discards the extra
 * elements, when in fact, at most a single value can be present.
 *
 * @suppress
 */
@Deprecated(
    message = "Mono produces at most one value, so the semantics of dropping the remaining elements are not useful. " +
        "Please use awaitSingleOrNull() instead.",
    level = DeprecationLevel.HIDDEN,
    replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: defaultValue()")
) // Warning since 1.5, error in 1.6
public suspend fun <T> Mono<T>.awaitFirstOrElse(defaultValue: () -> T): T = awaitSingleOrNull() ?: defaultValue()

/**
 * This is a lint function that was added already deprecated in order to guard against confusing usages on [Mono].
 * On [Publisher] instances other than [Mono], this function is not deprecated.
 *
 * Both [awaitLast] and [awaitSingle] await the single value, or throw [NoSuchElementException] if there is none, but
 * the name [Mono.awaitSingle] better reflects the semantics of [Mono].
 *
 * For example, consider this code:
 * ```
 * myDbClient.findById(uniqueId).awaitLast() // findById returns a `Mono`
 * ```
 * It looks like more than one value could be returned from `findById` and [awaitLast] discards the initial elements,
 * when in fact, at most a single value can be present.
 *
 * @suppress
 */
@Deprecated(
    message = "Mono produces at most one value, so the last element is the same as the first. " +
        "Please use awaitSingle() instead.",
    level = DeprecationLevel.HIDDEN,
    replaceWith = ReplaceWith("this.awaitSingle()")
) // Warning since 1.5, error in 1.6
public suspend fun <T> Mono<T>.awaitLast(): T = awaitSingle()