package kotlinx.coroutines.reactor import kotlinx.coroutines.* import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.reactive.FlowSubscription import org.reactivestreams.* import reactor.core.CoreSubscriber import reactor.core.publisher.Flux import kotlin.coroutines.* /** * Converts the given flow to a cold flux. * The original flow is cancelled when the flux subscriber is disposed. * * This function is integrated with [ReactorContext], see its documentation for additional details. * * An optional [context] can be specified to control the execution context of calls to [Subscriber] methods. * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher * is used, so calls are performed from an arbitrary thread. */ @JvmOverloads // binary compatibility public fun Flow.asFlux(context: CoroutineContext = EmptyCoroutineContext): Flux = FlowAsFlux(this, Dispatchers.Unconfined + context) private class FlowAsFlux( private val flow: Flow, private val context: CoroutineContext ) : Flux() { override fun subscribe(subscriber: CoreSubscriber) { val hasContext = !subscriber.currentContext().isEmpty val source = if (hasContext) flow.flowOn(subscriber.currentContext().asCoroutineContext()) else flow subscriber.onSubscribe(FlowSubscription(source, subscriber, context)) } }