package kotlinx.coroutines.rx2 import kotlinx.coroutines.testing.* import kotlinx.coroutines.* import kotlinx.coroutines.reactive.* import org.junit.* import org.junit.Test import kotlin.test.* class FlowableTest : TestBase() { @Test fun testBasicSuccess() = runBlocking { expect(1) val observable = rxFlowable(currentDispatcher()) { expect(4) send("OK") } expect(2) observable.subscribe { value -> expect(5) assertEquals("OK", value) } expect(3) yield() // to started coroutine finish(6) } @Test fun testBasicFailure() = runBlocking { expect(1) val observable = rxFlowable(currentDispatcher()) { expect(4) throw RuntimeException("OK") } expect(2) observable.subscribe({ expectUnreached() }, { error -> expect(5) assertIs(error) assertEquals("OK", error.message) }) expect(3) yield() // to started coroutine finish(6) } @Test fun testBasicUnsubscribe() = runBlocking { expect(1) val observable = rxFlowable(currentDispatcher()) { expect(4) yield() // back to main, will get cancelled expectUnreached() } expect(2) val sub = observable.subscribe({ expectUnreached() }, { expectUnreached() }) expect(3) yield() // to started coroutine expect(5) sub.dispose() // will cancel coroutine yield() finish(6) } @Test fun testNotifyOnceOnCancellation() = runTest { expect(1) val observable = rxFlowable(currentDispatcher()) { expect(5) send("OK") try { delay(Long.MAX_VALUE) } catch (e: CancellationException) { expect(11) } } .doOnNext { expect(6) assertEquals("OK", it) } .doOnCancel { expect(10) // notified once! } expect(2) val job = launch(start = CoroutineStart.UNDISPATCHED) { expect(3) observable.collect { expect(8) assertEquals("OK", it) } } expect(4) yield() // to observable code expect(7) yield() // to consuming coroutines expect(9) job.cancel() job.join() finish(12) } @Test fun testFailingConsumer() = runTest { val pub = rxFlowable(currentDispatcher()) { repeat(3) { expect(it + 1) // expect(1), expect(2) *should* be invoked send(it) } } try { pub.collect { throw TestException() } } catch (e: TestException) { finish(3) } } }