package kotlinx.coroutines.testing.flow

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.jvm.*
import kotlin.reflect.*

/**
 * A suspending callback that runs with a [CoroutineScope] receiver and takes a single argument of type [T].
 */
public typealias Handler<T> = suspend CoroutineScope.(T) -> Unit

/*
 * Design of this builder is not yet stable, so leaving it as is.
 */
/**
 * Collects the handlers used by [launchIn]: one mandatory `onEach` block, any number of typed
 * `catch` blocks and an optional `finally` block.
 *
 * Registration order is enforced with [check]: `onEach` first, then `catch` blocks, then `finally`.
 * Violating the order throws [IllegalStateException].
 */
public class LaunchFlowBuilder<T> {
    /*
     * NB: this implementation is a temporary ad-hoc (and slightly incorrect)
     * solution until coroutine-builders are ready
     *
     * NB 2: this internal stuff is required to workaround KT-30795
     */
    @PublishedApi
    internal var onEach: Handler<T>? = null

    @PublishedApi
    internal var finally: Handler<Throwable?>? = null

    // LinkedHashMap keeps the handlers in the order in which their classes were first registered.
    @PublishedApi
    internal var exceptionHandlers = LinkedHashMap<KClass<*>, Handler<Throwable>>()

    /**
     * Registers the block invoked for every value emitted by the flow.
     *
     * @throws IllegalStateException if an `onEach` block is already registered, or if any `catch`
     * or `finally` block has been registered before it.
     */
    public fun onEach(action: suspend CoroutineScope.(value: T) -> Unit) {
        check(onEach == null) { "onEach block is already registered" }
        check(exceptionHandlers.isEmpty()) { "onEach block should be registered before exceptionHandlers block" }
        check(finally == null) { "onEach block should be registered before finally block" }
        onEach = action
    }

    /**
     * Registers a handler for exceptions that are instances of [T].
     *
     * The handler is stored under `T::class`, so registering another handler for the same class
     * replaces the previous one.
     *
     * @throws IllegalStateException if `onEach` is not registered yet or `finally` is already registered.
     */
    public inline fun <reified T : Throwable> catch(noinline action: suspend CoroutineScope.(T) -> Unit) {
        check(onEach != null) { "onEach block should be registered first" }
        check(finally == null) { "exceptionHandlers block should be registered before finally block" }
        // The handler is only ever looked up through an isInstance check against T::class,
        // so widening its parameter type to Throwable is safe at the point of invocation.
        @Suppress("UNCHECKED_CAST")
        exceptionHandlers[T::class] = action as Handler<Throwable>
    }

    /**
     * Registers the block invoked once collection is over. It receives the exception that
     * terminated collection, or `null` if there was none.
     *
     * @throws IllegalStateException if a `finally` block is already registered or `onEach` is not registered yet.
     */
    public fun finally(action: suspend CoroutineScope.(cause: Throwable?) -> Unit) {
        check(finally == null) { "Finally block is already registered" }
        check(onEach != null) { "onEach block should be registered before finally block" }
        finally = action
    }

    /**
     * Bundles the registered handlers.
     *
     * @throws IllegalStateException if no `onEach` block was registered.
     */
    internal fun build(): Handlers<T> =
        Handlers(onEach ?: error("onEach is not registered"), exceptionHandlers, finally)
}

/**
 * Holder for the handlers produced by [LaunchFlowBuilder.build].
 */
internal class Handlers<T>(
    @JvmField
    internal var onEach: Handler<T>,
    @JvmField
    internal var exceptionHandlers: Map<KClass<*>, Handler<Throwable>>,
    @JvmField
    internal var finally: Handler<Throwable?>?
)

/**
 * Launches a coroutine in the receiver scope that collects [flow] using the handlers configured by [builder].
 *
 * Every emitted value is passed to the `onEach` handler. If collection fails, the first registered
 * `catch` handler (in registration order) whose class matches the exception is invoked; when none
 * matches, the exception is rethrown. Because the failure is caught as [Throwable], a handler
 * registered for a sufficiently broad type also receives cancellation exceptions.
 *
 * In all cases the launched coroutine is cancelled afterwards and the `finally` handler, if any,
 * is invoked with the exception that terminated collection (or `null`).
 *
 * @return the [Job] of the launched coroutine.
 */
private fun <T> CoroutineScope.launchFlow(
    flow: Flow<T>,
    builder: LaunchFlowBuilder<T>.() -> Unit
): Job {
    val handlers = LaunchFlowBuilder<T>().apply(builder).build()
    return launch {
        var caught: Throwable? = null
        try {
            coroutineScope {
                flow.collect { value ->
                    handlers.onEach(this, value)
                }
            }
        } catch (e: Throwable) {
            // Recorded before any handler runs so that the finally handler still sees the cause
            // even if the exception handler itself throws.
            caught = e
            // Pick only the FIRST matching handler. The previous forEach + return@forEach acted as
            // `continue`, so every matching handler was invoked for a single exception.
            val handler = handlers.exceptionHandlers.entries
                .firstOrNull { (type, _) -> type.isInstance(e) }
                ?.value
                ?: throw e
            handler.invoke(this, e)
        } finally {
            cancel() // TODO discuss
            // The coroutine is cancelled at this point, so the finally handler is given a scope
            // whose context includes NonCancellable.
            handlers.finally?.invoke(CoroutineScope(coroutineContext + NonCancellable), caught)
        }
    }
}

/**
 * Launches collection of this flow in the given [scope], with the `onEach`, `catch` and `finally`
 * handlers configured by [builder].
 *
 * @return the [Job] of the coroutine performing the collection.
 */
public fun <T> Flow<T>.launchIn(
    scope: CoroutineScope,
    builder: LaunchFlowBuilder<T>.() -> Unit
): Job = scope.launchFlow(this, builder)