package kotlinx.coroutines.flow

import kotlinx.coroutines.testing.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.coroutines.*
import kotlin.test.*

/**
 * This test suite for [SharedFlow] has a dense framework that allows testing complex
 * suspend/resume scenarios while keeping the code readable. Each test here is for
 * one specific [SharedFlow] configuration, testing all the various corner cases in its
 * behavior.
 *
 * Every scenario is a script written against [ScenarioDsl]: emitters and collectors are
 * launched as named jobs, and the script asserts, step by step, which of them resumed
 * and what [SharedFlow.replayCache] contains.
 */
class SharedFlowScenarioTest : TestBase() {
    /** Scenario for `MutableSharedFlow(1, 2)`: emitters suspend and queue up once the buffer is full. */
    @Test
    fun testReplay1Extra2() =
        testSharedFlow(MutableSharedFlow<Int>(1, 2)) {
            // total buffer size == 3
            expectReplayOf()
            emitRightNow(1); expectReplayOf(1)
            emitRightNow(2); expectReplayOf(2)
            emitRightNow(3); expectReplayOf(3)
            emitRightNow(4); expectReplayOf(4) // no prob - no subscribers
            val a = subscribe("a"); collect(a, 4)
            emitRightNow(5); expectReplayOf(5)
            emitRightNow(6); expectReplayOf(6)
            emitRightNow(7); expectReplayOf(7)
            // suspend/collect sequentially
            val e8 = emitSuspends(8); collect(a, 5); emitResumes(e8); expectReplayOf(8)
            val e9 = emitSuspends(9); collect(a, 6); emitResumes(e9); expectReplayOf(9)
            // buffer full, but parallel emitters can still suspend (queue up)
            val e10 = emitSuspends(10)
            val e11 = emitSuspends(11)
            val e12 = emitSuspends(12)
            collect(a, 7); emitResumes(e10); expectReplayOf(10) // buffer 8, 9 | 10
            collect(a, 8); emitResumes(e11); expectReplayOf(11) // buffer 9, 10 | 11
            sharedFlow.resetReplayCache(); expectReplayOf() // 9, 10 11 | no replay
            collect(a, 9); emitResumes(e12); expectReplayOf(12)
            collect(a, 10, 11, 12); expectReplayOf(12) // buffer empty | 12
            emitRightNow(13); expectReplayOf(13)
            emitRightNow(14); expectReplayOf(14)
            emitRightNow(15); expectReplayOf(15) // buffer 13, 14 | 15
            val e16 = emitSuspends(16)
            val e17 = emitSuspends(17)
            val e18 = emitSuspends(18)
            cancel(e17); expectReplayOf(15) // cancel in the middle of three emits; buffer 13, 14 | 15
            collect(a, 13); emitResumes(e16); expectReplayOf(16) // buffer 14, 15, | 16
            collect(a, 14); emitResumes(e18); expectReplayOf(18) // buffer 15, 16 | 18
            val e19 = emitSuspends(19)
            val e20 = emitSuspends(20)
            val e21 = emitSuspends(21)
            cancel(e21); expectReplayOf(18) // cancel last emit; buffer 15, 16, 18
            collect(a, 15); emitResumes(e19); expectReplayOf(19) // buffer 16, 18 | 19
            collect(a, 16); emitResumes(e20); expectReplayOf(20) // buffer 18, 19 | 20
            collect(a, 18, 19, 20); expectReplayOf(20) // buffer empty | 20
            emitRightNow(22); expectReplayOf(22)
            emitRightNow(23); expectReplayOf(23)
            emitRightNow(24); expectReplayOf(24) // buffer 22, 23 | 24
            val e25 = emitSuspends(25)
            val e26 = emitSuspends(26)
            val e27 = emitSuspends(27)
            cancel(e25); expectReplayOf(24) // cancel first emit, buffer 22, 23 | 24
            sharedFlow.resetReplayCache(); expectReplayOf() // buffer 22, 23, 24 | no replay
            val b = subscribe("b") // new subscriber
            collect(a, 22); emitResumes(e26); expectReplayOf(26) // buffer 23, 24 | 26
            collect(b, 26)
            collect(a, 23); emitResumes(e27); expectReplayOf(27) // buffer 24, 26 | 27
            collect(a, 24, 26, 27) // buffer empty | 27
            emitRightNow(28); expectReplayOf(28)
            emitRightNow(29); expectReplayOf(29) // buffer 27, 28 | 29
            collect(a, 28, 29) // but b is slow
            val e30 = emitSuspends(30)
            val e31 = emitSuspends(31)
            val e32 = emitSuspends(32)
            val e33 = emitSuspends(33)
            val e34 = emitSuspends(34)
            val e35 = emitSuspends(35)
            val e36 = emitSuspends(36)
            val e37 = emitSuspends(37)
            val e38 = emitSuspends(38)
            val e39 = emitSuspends(39)
            cancel(e31) // cancel emitter in queue
            cancel(b) // cancel slow subscriber -> 3 emitters resume
            emitResumes(e30); emitResumes(e32); emitResumes(e33); expectReplayOf(33) // buffer 30, 32 | 33
            val c = subscribe("c"); collect(c, 33) // replays
            cancel(e34)
            collect(a, 30); emitResumes(e35); expectReplayOf(35) // buffer 32, 33 | 35
            cancel(e37)
            cancel(a); emitResumes(e36); emitResumes(e38); expectReplayOf(38) // buffer 35, 36 | 38
            collect(c, 35); emitResumes(e39); expectReplayOf(39) // buffer 36, 38 | 39
            collect(c, 36, 38, 39); expectReplayOf(39)
            cancel(c); expectReplayOf(39) // replay stays
        }

    /** Scenario for `MutableSharedFlow(1)`, including repeated `resetReplayCache()` calls. */
    @Test
    fun testReplay1() =
        testSharedFlow(MutableSharedFlow<Int>(1)) {
            emitRightNow(0); expectReplayOf(0)
            emitRightNow(1); expectReplayOf(1)
            emitRightNow(2); expectReplayOf(2)
            sharedFlow.resetReplayCache(); expectReplayOf()
            sharedFlow.resetReplayCache(); expectReplayOf()
            emitRightNow(3); expectReplayOf(3)
            emitRightNow(4); expectReplayOf(4)
            val a = subscribe("a"); collect(a, 4)
            emitRightNow(5); expectReplayOf(5); collect(a, 5)
            emitRightNow(6)
            sharedFlow.resetReplayCache(); expectReplayOf()
            sharedFlow.resetReplayCache(); expectReplayOf()
            val e7 = emitSuspends(7)
            val e8 = emitSuspends(8)
            val e9 = emitSuspends(9)
            collect(a, 6); emitResumes(e7); expectReplayOf(7)
            sharedFlow.resetReplayCache(); expectReplayOf()
            sharedFlow.resetReplayCache(); expectReplayOf() // buffer 7 | -- no replay, but still buffered
            val b = subscribe("b")
            collect(a, 7); emitResumes(e8); expectReplayOf(8)
            collect(b, 8) // buffer | 8 -- a is slow
            val e10 = emitSuspends(10)
            val e11 = emitSuspends(11)
            val e12 = emitSuspends(12)
            cancel(e9)
            collect(a, 8); emitResumes(e10); expectReplayOf(10)
            collect(a, 10) // now b's slow
            cancel(e11)
            collect(b, 10); emitResumes(e12); expectReplayOf(12)
            collect(a, 12)
            collect(b, 12)
            sharedFlow.resetReplayCache(); expectReplayOf()
            sharedFlow.resetReplayCache(); expectReplayOf() // nothing is buffered -- both collectors up to date
            emitRightNow(13); expectReplayOf(13)
            collect(b, 13) // a is slow
            val e14 = emitSuspends(14)
            val e15 = emitSuspends(15)
            val e16 = emitSuspends(16)
            cancel(e14)
            cancel(a); emitResumes(e15); expectReplayOf(15) // cancelling slow subscriber
            collect(b, 15); emitResumes(e16); expectReplayOf(16)
            collect(b, 16)
        }

    /** Scenario for `MutableSharedFlow(2, 2, BufferOverflow.DROP_OLDEST)`: every emit completes right away. */
    @Test
    fun testReplay2Extra2DropOldest() =
        testSharedFlow(MutableSharedFlow<Int>(2, 2, BufferOverflow.DROP_OLDEST)) {
            emitRightNow(0); expectReplayOf(0)
            emitRightNow(1); expectReplayOf(0, 1)
            emitRightNow(2); expectReplayOf(1, 2)
            emitRightNow(3); expectReplayOf(2, 3)
            emitRightNow(4); expectReplayOf(3, 4)
            val a = subscribe("a")
            collect(a, 3)
            emitRightNow(5); expectReplayOf(4, 5)
            emitRightNow(6); expectReplayOf(5, 6)
            emitRightNow(7); expectReplayOf(6, 7) // buffer 4, 5 | 6, 7
            emitRightNow(8); expectReplayOf(7, 8) // buffer 5, 6 | 7, 8
            emitRightNow(9); expectReplayOf(8, 9) // buffer 6, 7 | 8, 9
            collect(a, 6, 7)
            val b = subscribe("b")
            collect(b, 8, 9) // buffer | 8, 9
            emitRightNow(10); expectReplayOf(9, 10) // buffer 8 | 9, 10
            collect(a, 8, 9, 10) // buffer | 9, 10, note "b" had not collected 10 yet
            emitRightNow(11); expectReplayOf(10, 11) // buffer | 10, 11
            emitRightNow(12); expectReplayOf(11, 12) // buffer 10 | 11, 12
            emitRightNow(13); expectReplayOf(12, 13) // buffer 10, 11 | 12, 13
            emitRightNow(14); expectReplayOf(13, 14) // buffer 11, 12 | 13, 14, "b" missed 10
            collect(b, 11, 12, 13, 14)
            sharedFlow.resetReplayCache(); expectReplayOf() // buffer 11, 12, 13, 14 |
            sharedFlow.resetReplayCache(); expectReplayOf()
            collect(a, 11, 12, 13, 14)
            emitRightNow(15); expectReplayOf(15)
            collect(a, 15)
            collect(b, 15)
        }

    @Test // https://github.com/Kotlin/kotlinx.coroutines/issues/2320
    fun testResumeFastSubscriberOnResumedEmitter() =
        testSharedFlow(MutableSharedFlow<Int>(1)) {
            // create two subscribers and start collecting
            val s1 = subscribe("s1"); resumeCollecting(s1)
            val s2 = subscribe("s2"); resumeCollecting(s2)
            // now emit 0, make sure it is collected
            emitRightNow(0); expectReplayOf(0)
            awaitCollected(s1, 0)
            awaitCollected(s2, 0)
            // now emit 1, and only first subscriber continues and collects it
            emitRightNow(1); expectReplayOf(1)
            collect(s1, 1)
            // now emit 2, it suspends (s2 is blocking it)
            val e2 = emitSuspends(2)
            resumeCollecting(s1) // resume, but does not collect (e2 is still queued)
            collect(s2, 1) // resume + collect next --> resumes emitter, thus resumes s1
            awaitCollected(s1, 2) // <-- S1 collects value from the newly resumed emitter here !!!
            emitResumes(e2); expectReplayOf(2)
            // now emit 3, it suspends (s2 blocks it)
            val e3 = emitSuspends(3)
            collect(s2, 2)
            emitResumes(e3); expectReplayOf(3)
        }

    /** Cancelling the only (slow) subscriber must resume both queued emitters and keep just one replay value. */
    @Test
    fun testSuspendedConcurrentEmitAndCancelSubscriberReplay1() =
        testSharedFlow(MutableSharedFlow<Int>(1)) {
            val a = subscribe("a")
            emitRightNow(0); expectReplayOf(0)
            collect(a, 0)
            emitRightNow(1); expectReplayOf(1)
            val e2 = emitSuspends(2) // suspends until 1 is collected
            val e3 = emitSuspends(3) // suspends until 1 is collected, too
            cancel(a) // must resume emitters 2 & 3
            emitResumes(e2)
            emitResumes(e3)
            expectReplayOf(3) // but replay size is 1 so only 3 should be kept
            // Note: originally, SharedFlow was in a broken state here with 3 elements in the buffer
            val b = subscribe("b")
            collect(b, 3)
            emitRightNow(4); expectReplayOf(4)
            collect(b, 4)
        }

    /** Same as the replay-1 cancellation scenario, but with one extra buffer slot and three queued emitters. */
    @Test
    fun testSuspendedConcurrentEmitAndCancelSubscriberReplay1ExtraBuffer1() =
        testSharedFlow(MutableSharedFlow<Int>(replay = 1, extraBufferCapacity = 1)) {
            val a = subscribe("a")
            emitRightNow(0); expectReplayOf(0)
            collect(a, 0)
            emitRightNow(1); expectReplayOf(1)
            emitRightNow(2); expectReplayOf(2)
            val e3 = emitSuspends(3) // suspends until 1 is collected
            val e4 = emitSuspends(4) // suspends until 1 is collected, too
            val e5 = emitSuspends(5) // suspends until 1 is collected, too
            cancel(a) // must resume emitters 3, 4, 5
            emitResumes(e3)
            emitResumes(e4)
            emitResumes(e5)
            expectReplayOf(5)
            val b = subscribe("b")
            collect(b, 5)
            emitRightNow(6); expectReplayOf(6)
            collect(b, 6)
        }

    /**
     * Runs [scenario] against [sharedFlow] inside `runTest`.
     *
     * A [ScenarioDsl] is created on the context of an inner `coroutineScope`, the scenario is
     * executed with it as receiver, and the DSL is stopped afterwards. If anything throws,
     * the DSL's log (when the DSL was already created) is printed before the exception is rethrown.
     */
    private fun <T> testSharedFlow(
        sharedFlow: MutableSharedFlow<T>,
        scenario: suspend ScenarioDsl<T>.() -> Unit
    ) = runTest {
        // Kept nullable and outside the scope only so the catch block can reach the log.
        var dsl: ScenarioDsl<T>? = null
        try {
            coroutineScope {
                val scenarioDsl = ScenarioDsl(sharedFlow, coroutineContext)
                dsl = scenarioDsl
                scenarioDsl.scenario()
                scenarioDsl.stop()
            }
        } catch (e: Throwable) {
            dsl?.printLog()
            throw e
        }
    }

    /** A launched emitter or collector [job] with a human-readable [name]; prints as its [name]. */
    private data class TestJob(val job: Job, val name: String) {
        override fun toString(): String = name
    }

    /** Base type of the events recorded by [ScenarioDsl] and awaited by scenario steps. */
    private open class Action

    /** The emitter [job] returned from `emit`. */
    private data class EmitResumes(val job: TestJob) : Action()

    /** The collector [job] received [value]. */
    private data class Collected(val job: TestJob, val value: Any?) : Action()

    /** Permission for the collector [job] to proceed by one step. */
    private data class ResumeCollecting(val job: TestJob) : Action()

    /** The emitter or collector [job] observed a [CancellationException]. */
    private data class Cancelled(val job: TestJob) : Action()

    /**
     * Scenario scripting DSL over [sharedFlow].
     *
     * Emitters and collectors run in a private [scope] built from the supplied context plus a
     * fresh [Job]. They communicate with the script through the [actions] set: a coroutine
     * records an [Action] with [addAction], and the other side consumes it with [awaitAction].
     */
    @OptIn(ExperimentalStdlibApi::class)
    private class ScenarioDsl<T>(
        val sharedFlow: MutableSharedFlow<T>,
        coroutineContext: CoroutineContext
    ) {
        private val log = ArrayList<String>()
        private val timeout = 10000L // passed to withTimeoutOrNull in awaitAction
        private val scope = CoroutineScope(coroutineContext + Job())
        private val actions = HashSet<Action>()
        private val actionWaiters = ArrayDeque<Continuation<Unit>>()
        private var expectedReplay = emptyList<T>()

        /** Asserts that the replay cache still equals the last expectation set by [expectReplayOf]. */
        private fun checkReplay() {
            assertEquals(expectedReplay, sharedFlow.replayCache)
        }

        /** Resumes every waiter that was queued at the moment of the call. */
        private fun wakeupWaiters() {
            // The count is captured up front, so waiters that re-enqueue themselves while
            // being resumed are not resumed again in this pass.
            repeat(actionWaiters.size) {
                actionWaiters.removeFirst().resume(Unit)
            }
        }

        /** Records [action] and wakes all current waiters so they can re-check the set. */
        private fun addAction(action: Action) {
            actions.add(action)
            wakeupWaiters()
        }

        /**
         * Suspends until [action] can be removed from the recorded set, failing with
         * [IllegalStateException] if that does not happen within [timeout].
         */
        private suspend fun awaitAction(action: Action) {
            withTimeoutOrNull(timeout) {
                while (!actions.remove(action)) {
                    suspendCancellableCoroutine<Unit> { actionWaiters.add(it) }
                }
            } ?: error("Timed out waiting for action: $action")
            wakeupWaiters()
        }

        /**
         * Launches (undispatched) a coroutine that emits [a] and records either
         * [EmitResumes] when `emit` returns or [Cancelled] when it is cancelled.
         */
        private fun launchEmit(a: T): TestJob {
            val name = "emit($a)"
            val job = scope.launch(start = CoroutineStart.UNDISPATCHED) {
                val job = TestJob(coroutineContext[Job]!!, name)
                try {
                    log(name)
                    sharedFlow.emit(a)
                    log("$name resumes")
                    addAction(EmitResumes(job))
                } catch (e: CancellationException) {
                    // Deliberately not rethrown: the scenario waits for this marker in cancel().
                    log("$name cancelled")
                    addAction(Cancelled(job))
                }
            }
            return TestJob(job, name)
        }

        /** Sets the expected replay cache contents to [a] and verifies them immediately. */
        fun expectReplayOf(vararg a: T) {
            expectedReplay = a.toList()
            checkReplay()
        }

        /** Emits [a] and asserts that the emitter had already resumed when the launch returned. */
        fun emitRightNow(a: T) {
            val job = launchEmit(a)
            assertTrue(actions.remove(EmitResumes(job)))
        }

        /**
         * Emits [a] and asserts that the emitter has not resumed and the replay cache is unchanged.
         * Returns the emitter job for later [emitResumes] or [cancel].
         */
        fun emitSuspends(a: T): TestJob {
            val job = launchEmit(a)
            assertFalse(EmitResumes(job) in actions)
            checkReplay()
            return job
        }

        /** Waits until the emitter [job] reports that its `emit` call returned. */
        suspend fun emitResumes(job: TestJob) {
            awaitAction(EmitResumes(job))
        }

        /** Cancels [job] and waits until it reports [Cancelled]. */
        suspend fun cancel(job: TestJob) {
            log("cancel(${job.name})")
            job.job.cancel()
            awaitAction(Cancelled(job))
        }

        /**
         * Launches (undispatched) a collector named `collect(id)`.
         *
         * The collector waits for a [ResumeCollecting] before it starts collecting and again
         * after every value it records as [Collected], so the script drives it one step at a time.
         */
        fun subscribe(id: String): TestJob {
            val name = "collect($id)"
            val job = scope.launch(start = CoroutineStart.UNDISPATCHED) {
                val job = TestJob(coroutineContext[Job]!!, name)
                try {
                    awaitAction(ResumeCollecting(job))
                    log("$name start")
                    sharedFlow.collect { value ->
                        log("$name -> $value")
                        addAction(Collected(job, value))
                        awaitAction(ResumeCollecting(job))
                        log("$name -> $value resumes")
                    }
                    error("$name completed")
                } catch (e: CancellationException) {
                    // Deliberately not rethrown: the scenario waits for this marker in cancel().
                    log("$name cancelled")
                    addAction(Cancelled(job))
                }
            }
            return TestJob(job, name)
        }

        // collect ~== resumeCollecting + awaitCollected (for each value)
        /** For each value in [a]: lets the collector [job] proceed and waits until it receives that value. */
        suspend fun collect(job: TestJob, vararg a: T) {
            for (value in a) {
                checkReplay() // should not have changed
                resumeCollecting(job)
                awaitCollected(job, value)
            }
        }

        /** Records a [ResumeCollecting] for the collector [job]. */
        suspend fun resumeCollecting(job: TestJob) {
            addAction(ResumeCollecting(job))
        }

        /** Waits until the collector [job] reports having received [value]. */
        suspend fun awaitCollected(job: TestJob, value: T) {
            awaitAction(Collected(job, value))
        }

        /** Cancels the private scope that the emitters and collectors were launched in. */
        fun stop() {
            log("--- stop")
            scope.cancel()
        }

        /** Appends [text] to the in-memory log. */
        private fun log(text: String) {
            log.add(text)
        }

        /** Prints the last 30 log entries to standard output. */
        fun printLog() {
            println("--- The most recent log entries ---")
            log.takeLast(30).forEach(::println)
            println("--- That's it ---")
        }
    }
}