package kotlinx.coroutines.rx2

import io.reactivex.*
import io.reactivex.disposables.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
import org.reactivestreams.*
import java.util.concurrent.atomic.*
import kotlin.coroutines.*

/**
 * Converts this job to the hot reactive completable that signals
 * with [onComplete][CompletableObserver.onComplete] when the corresponding job completes.
 *
 * Every subscriber gets the signal at the same time.
 * Unsubscribing from the resulting completable **does not** affect the original job in any way.
 *
 * **Note: This is an experimental api.** Conversion of coroutines primitives to reactive entities may change
 * in the future to account for the concept of structured concurrency.
 *
 * @param context -- the coroutine context from which the resulting completable is going to be signalled
 */
public fun Job.asCompletable(context: CoroutineContext): Completable = rxCompletable(context) {
    this@asCompletable.join()
}

/**
 * Converts this deferred value to the hot reactive maybe that signals
 * [onComplete][MaybeEmitter.onComplete], [onSuccess][MaybeEmitter.onSuccess] or [onError][MaybeEmitter.onError].
 *
 * Every subscriber gets the same completion value.
 * Unsubscribing from the resulting maybe **does not** affect the original deferred value in any way.
 *
 * **Note: This is an experimental api.** Conversion of coroutines primitives to reactive entities may change
 * in the future to account for the concept of structured concurrency.
 *
 * @param context -- the coroutine context from which the resulting maybe is going to be signalled
 */
public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T> = rxMaybe(context) {
    this@asMaybe.await()
}

/**
 * Converts this deferred value to the hot reactive single that signals either
 * [onSuccess][SingleObserver.onSuccess] or [onError][SingleObserver.onError].
 *
 * Every subscriber gets the same completion value.
 * Unsubscribing from the resulting single **does not** affect the original deferred value in any way.
 *
 * **Note: This is an experimental api.** Conversion of coroutines primitives to reactive entities may change
 * in the future to account for the concept of structured concurrency.
 *
 * @param context -- the coroutine context from which the resulting single is going to be signalled
 */
public fun <T : Any> Deferred<T>.asSingle(context: CoroutineContext): Single<T> = rxSingle(context) {
    this@asSingle.await()
}

/**
 * Transforms given cold [ObservableSource] into cold [Flow].
 *
 * The resulting flow is _cold_, which means that [ObservableSource.subscribe] is called every time a terminal operator
 * is applied to the resulting flow.
 *
 * A channel with the [default][Channel.BUFFERED] buffer size is used. Use the [buffer] operator on the
 * resulting flow to specify a user-defined value and to control what happens when data is produced faster
 * than consumed, i.e. to control the back-pressure behavior. Check [callbackFlow] for more details.
 */
public fun <T : Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {
    // Holds the upstream disposable; swapped for an already-disposed sentinel once the flow is closed.
    val disposableRef = AtomicReference<Disposable>()
    val observer = object : Observer<T> {
        override fun onComplete() {
            close()
        }

        override fun onSubscribe(d: Disposable) {
            // If the slot is already occupied (e.g. by the disposed sentinel installed in awaitClose),
            // this disposable arrived too late to be tracked, so dispose it right away.
            if (!disposableRef.compareAndSet(null, d)) d.dispose()
        }

        override fun onNext(t: T) {
            /*
             * Channel was closed by the downstream, so the exception (if any)
             * also was handled by the same downstream
             */
            try {
                trySendBlocking(t)
            } catch (e: InterruptedException) {
                // RxJava interrupts the source
            }
        }

        override fun onError(e: Throwable) {
            close(e)
        }
    }

    subscribe(observer)
    // On close/cancellation dispose whatever was subscribed and leave a disposed sentinel behind.
    awaitClose { disposableRef.getAndSet(Disposables.disposed())?.dispose() }
}

/**
 * Converts the given flow to a cold observable.
 * The original flow is cancelled when the observable subscriber is disposed.
 *
 * An optional [context] can be specified to control the execution context of calls to [Observer] methods.
 * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to
 * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
 * is used, so calls are performed from an arbitrary thread.
 *
 * @param context -- additional coroutine context added on top of [Dispatchers.Unconfined] for the collecting coroutine
 */
public fun <T : Any> Flow<T>.asObservable(
    context: CoroutineContext = EmptyCoroutineContext
): Observable<T> = Observable.create { emitter ->
    /*
     * ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if
     * asObservable is already invoked from unconfined
     */
    val job = GlobalScope.launch(Dispatchers.Unconfined + context, start = CoroutineStart.ATOMIC) {
        try {
            collect { value -> emitter.onNext(value) }
            emitter.onComplete()
        } catch (e: Throwable) {
            // 'create' provides safe emitter, so we can unconditionally call on* here if exception occurs in `onComplete`
            if (e !is CancellationException) {
                // The emitter refused the error, so hand it to the undeliverable-exception handler instead.
                if (!emitter.tryOnError(e)) {
                    handleUndeliverableException(e, coroutineContext)
                }
            } else {
                // Cancellation is reported to the emitter as a normal completion.
                emitter.onComplete()
            }
        }
    }
    // Disposing the observable cancels the collecting coroutine.
    emitter.setCancellable(RxCancellable(job))
}

/**
 * Converts the given flow to a cold flowable.
 * The original flow is cancelled when the flowable subscriber is disposed.
 *
 * An optional [context] can be specified to control the execution context of calls to [Subscriber] methods.
 * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to
 * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
 * is used, so calls are performed from an arbitrary thread.
 *
 * @param context -- the coroutine context passed through to [asPublisher]
 */
public fun <T : Any> Flow<T>.asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
    Flowable.fromPublisher(asPublisher(context))

/**
 * Re-emits every element received from this channel through an observable built with [rxObservable].
 *
 * @param context -- the coroutine context passed to [rxObservable]
 */
@Deprecated(
    message = "Deprecated in the favour of Flow",
    level = DeprecationLevel.HIDDEN,
    replaceWith = ReplaceWith("this.consumeAsFlow().asObservable(context)", "kotlinx.coroutines.flow.consumeAsFlow")
) // Deprecated since 1.4.0
public fun <T : Any> ReceiveChannel<T>.asObservable(context: CoroutineContext): Observable<T> = rxObservable(context) {
    for (t in this@asObservable)
        send(t)
}

/** @suppress **/
@Suppress("UNUSED") // KT-42513
@JvmOverloads // binary compatibility
@JvmName("from")
@Deprecated(level = DeprecationLevel.HIDDEN, message = "") // Since 1.4, was experimental prior to that
public fun <T : Any> Flow<T>._asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
    asFlowable(context)

/** @suppress **/
@Suppress("UNUSED") // KT-42513
@JvmOverloads // binary compatibility
@JvmName("from")
@Deprecated(level = DeprecationLevel.HIDDEN, message = "") // Since 1.4, was experimental prior to that
public fun <T : Any> Flow<T>._asObservable(context: CoroutineContext = EmptyCoroutineContext): Observable<T> =
    asObservable(context)