package kotlinx.coroutines.flow import kotlinx.coroutines.testing.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlin.random.* import kotlin.test.* /** * This test suite contains some basic tests for [SharedFlow]. There are some scenarios here written * using [expect] and they are not very readable. See [SharedFlowScenarioTest] for a better * behavioral test-suit. */ class SharedFlowTest : TestBase() { @Test fun testRendezvousSharedFlowBasic() = runTest { expect(1) val sh = MutableSharedFlow() assertTrue(sh.replayCache.isEmpty()) assertEquals(0, sh.subscriptionCount.value) sh.emit(1) // no suspend assertTrue(sh.replayCache.isEmpty()) assertEquals(0, sh.subscriptionCount.value) expect(2) // one collector val job1 = launch(start = CoroutineStart.UNDISPATCHED) { expect(3) sh.collect { when(it) { 4 -> expect(5) 6 -> expect(7) 10 -> expect(11) 13 -> expect(14) else -> expectUnreached() } } expectUnreached() // does not complete normally } expect(4) assertEquals(1, sh.subscriptionCount.value) sh.emit(4) assertTrue(sh.replayCache.isEmpty()) expect(6) sh.emit(6) expect(8) // one more collector val job2 = launch(start = CoroutineStart.UNDISPATCHED) { expect(9) sh.collect { when(it) { 10 -> expect(12) 13 -> expect(15) 17 -> expect(18) null -> expect(20) 21 -> expect(22) else -> expectUnreached() } } expectUnreached() // does not complete normally } expect(10) assertEquals(2, sh.subscriptionCount.value) sh.emit(10) // to both collectors now! assertTrue(sh.replayCache.isEmpty()) expect(13) sh.emit(13) expect(16) job1.cancel() // cancel the first collector yield() assertEquals(1, sh.subscriptionCount.value) expect(17) sh.emit(17) // only to second collector expect(19) sh.emit(null) // emit null to the second collector expect(21) sh.emit(21) // non-null again expect(23) job2.cancel() // cancel the second collector yield() assertEquals(0, sh.subscriptionCount.value) expect(24) sh.emit(24) // does not go anywhere assertEquals(0, sh.subscriptionCount.value) assertTrue(sh.replayCache.isEmpty()) finish(25) } @Test fun testRendezvousSharedFlowReset() = runTest { expect(1) val sh = MutableSharedFlow() val barrier = Channel(1) val job = launch(start = CoroutineStart.UNDISPATCHED) { expect(2) sh.collect { when (it) { 3 -> { expect(4) barrier.receive() // hold on before collecting next one } 6 -> expect(10) else -> expectUnreached() } } expectUnreached() // does not complete normally } expect(3) sh.emit(3) // rendezvous expect(5) assertFalse(sh.tryEmit(5)) // collector is not ready now launch(start = CoroutineStart.UNDISPATCHED) { expect(6) sh.emit(6) // suspends expect(12) } expect(7) yield() // no wakeup -> all suspended expect(8) // now reset cache -> nothing happens, there is no cache sh.resetReplayCache() yield() expect(9) // now resume collector barrier.send(Unit) yield() // to collector expect(11) yield() // to emitter expect(13) assertFalse(sh.tryEmit(13)) // rendezvous does not work this way job.cancel() finish(14) } @Test fun testReplay1SharedFlowBasic() = runTest { expect(1) val sh = MutableSharedFlow(1) assertTrue(sh.replayCache.isEmpty()) assertEquals(0, sh.subscriptionCount.value) sh.emit(1) // no suspend assertEquals(listOf(1), sh.replayCache) assertEquals(0, sh.subscriptionCount.value) expect(2) sh.emit(2) // no suspend assertEquals(listOf(2), sh.replayCache) expect(3) // one collector val job1 = launch(start = CoroutineStart.UNDISPATCHED) { expect(4) sh.collect { when(it) { 2 -> expect(5) // got it immediately from replay cache 6 -> expect(8) null -> expect(14) 17 -> expect(18) else -> expectUnreached() } } expectUnreached() // does not complete normally } expect(6) assertEquals(1, sh.subscriptionCount.value) sh.emit(6) // does not suspend, but buffers assertEquals(listOf(6), sh.replayCache) expect(7) yield() expect(9) // one more collector val job2 = launch(start = CoroutineStart.UNDISPATCHED) { expect(10) sh.collect { when(it) { 6 -> expect(11) // from replay cache null -> expect(15) else -> expectUnreached() } } expectUnreached() // does not complete normally } expect(12) assertEquals(2, sh.subscriptionCount.value) sh.emit(null) expect(13) assertEquals(listOf(null), sh.replayCache) yield() assertEquals(listOf(null), sh.replayCache) expect(16) job2.cancel() yield() assertEquals(1, sh.subscriptionCount.value) expect(17) sh.emit(17) assertEquals(listOf(17), sh.replayCache) yield() expect(19) job1.cancel() yield() assertEquals(0, sh.subscriptionCount.value) assertEquals(listOf(17), sh.replayCache) finish(20) } @Test fun testReplay1() = runTest { expect(1) val sh = MutableSharedFlow(1) assertEquals(listOf(), sh.replayCache) val barrier = Channel(1) val job = launch(start = CoroutineStart.UNDISPATCHED) { expect(2) sh.collect { when (it) { 3 -> { expect(4) barrier.receive() // collector waits } 5 -> expect(10) 6 -> expect(11) else -> expectUnreached() } } expectUnreached() // does not complete normally } expect(3) assertTrue(sh.tryEmit(3)) // buffered assertEquals(listOf(3), sh.replayCache) yield() // to collector expect(5) assertTrue(sh.tryEmit(5)) // buffered assertEquals(listOf(5), sh.replayCache) launch(start = CoroutineStart.UNDISPATCHED) { expect(6) sh.emit(6) // buffer full, suspended expect(13) } expect(7) assertEquals(listOf(5), sh.replayCache) sh.resetReplayCache() // clear cache assertEquals(listOf(), sh.replayCache) expect(8) yield() // emitter still suspended expect(9) assertEquals(listOf(), sh.replayCache) assertFalse(sh.tryEmit(10)) // still no buffer space assertEquals(listOf(), sh.replayCache) barrier.send(Unit) // resume collector yield() // to collector expect(12) yield() // to emitter, that should have resumed expect(14) job.cancel() assertEquals(listOf(6), sh.replayCache) finish(15) } @Test fun testReplay2Extra1() = runTest { expect(1) val sh = MutableSharedFlow( replay = 2, extraBufferCapacity = 1 ) assertEquals(listOf(), sh.replayCache) assertTrue(sh.tryEmit(0)) assertEquals(listOf(0), sh.replayCache) val job = launch(start = CoroutineStart.UNDISPATCHED) { expect(2) var cnt = 0 sh.collect { when (it) { 0 -> when (cnt++) { 0 -> expect(3) 1 -> expect(14) else -> expectUnreached() } 1 -> expect(6) 2 -> expect(7) 3 -> expect(8) 4 -> expect(12) 5 -> expect(13) 16 -> expect(17) else -> expectUnreached() } } expectUnreached() // does not complete normally } expect(4) assertTrue(sh.tryEmit(1)) // buffered assertEquals(listOf(0, 1), sh.replayCache) assertTrue(sh.tryEmit(2)) // buffered assertEquals(listOf(1, 2), sh.replayCache) assertTrue(sh.tryEmit(3)) // buffered (buffer size is 3) assertEquals(listOf(2, 3), sh.replayCache) expect(5) yield() // to collector expect(9) assertEquals(listOf(2, 3), sh.replayCache) assertTrue(sh.tryEmit(4)) // can buffer now assertEquals(listOf(3, 4), sh.replayCache) assertTrue(sh.tryEmit(5)) // can buffer now assertEquals(listOf(4, 5), sh.replayCache) assertTrue(sh.tryEmit(0)) // can buffer one more, let it be zero again assertEquals(listOf(5, 0), sh.replayCache) expect(10) assertFalse(sh.tryEmit(10)) // cannot buffer anymore! sh.resetReplayCache() // replay cache assertEquals(listOf(), sh.replayCache) // empty assertFalse(sh.tryEmit(0)) // still cannot buffer anymore (reset does not help) assertEquals(listOf(), sh.replayCache) // empty expect(11) yield() // resume collector, will get next values expect(15) sh.resetReplayCache() // reset again, nothing happens assertEquals(listOf(), sh.replayCache) // empty yield() // collector gets nothing -- no change expect(16) assertTrue(sh.tryEmit(16)) assertEquals(listOf(16), sh.replayCache) yield() // gets it expect(18) job.cancel() finish(19) } @Test fun testBufferNoReplayCancelWhileBuffering() = runTest { val n = 123 val sh = MutableSharedFlow(replay = 0, extraBufferCapacity = n) repeat(3) { val m = n / 2 // collect half, then suspend val barrier = Channel(1) val collectorJob = sh .onSubscription { barrier.send(1) } .onEach { value -> if (value == m) { barrier.send(2) delay(Long.MAX_VALUE) } } .launchIn(this) assertEquals(1, barrier.receive()) // make sure it subscribes launch(start = CoroutineStart.UNDISPATCHED) { for (i in 0 until n + m) sh.emit(i) // these emits should go Ok barrier.send(3) sh.emit(n + 4) // this emit will suspend on buffer overflow barrier.send(4) } assertEquals(2, barrier.receive()) // wait until m collected assertEquals(3, barrier.receive()) // wait until all are emitted collectorJob.cancel() // cancelling collector job must clear buffer and resume emitter assertEquals(4, barrier.receive()) // verify that emitter resumes } } @Test fun testRepeatedResetWithReplay() = runTest { val n = 10 val sh = MutableSharedFlow(n) var i = 0 repeat(3) { // collector is slow val collector = sh.onEach { delay(Long.MAX_VALUE) }.launchIn(this) val emitter = launch { repeat(3 * n) { sh.emit(i); i++ } } repeat(3) { yield() } // enough to run it to suspension assertEquals((i - n until i).toList(), sh.replayCache) sh.resetReplayCache() assertEquals(emptyList(), sh.replayCache) repeat(3) { yield() } // enough to run it to suspension assertEquals(emptyList(), sh.replayCache) // still blocked collector.cancel() emitter.cancel() repeat(3) { yield() } // enough to run it to suspension } } @Test fun testSynchronousSharedFlowEmitterCancel() = runTest { expect(1) val sh = MutableSharedFlow() val barrier1 = Job() val barrier2 = Job() val barrier3 = Job() val collector1 = sh.onEach { when (it) { 1 -> expect(3) 2 -> { expect(6) barrier2.complete() } 3 -> { expect(9) barrier3.complete() } else -> expectUnreached() } }.launchIn(this) val collector2 = sh.onEach { when (it) { 1 -> { expect(4) barrier1.complete() delay(Long.MAX_VALUE) } else -> expectUnreached() } }.launchIn(this) repeat(2) { yield() } // launch both subscribers val emitter = launch(start = CoroutineStart.UNDISPATCHED) { expect(2) sh.emit(1) barrier1.join() expect(5) sh.emit(2) // suspends because of slow collector2 expectUnreached() // will be cancelled } barrier2.join() // wait expect(7) // Now cancel the emitter! emitter.cancel() yield() // Cancel slow collector collector2.cancel() yield() // emit to fast collector1 expect(8) sh.emit(3) barrier3.join() expect(10) // cancel it, too collector1.cancel() finish(11) } @Test fun testDifferentBufferedFlowCapacities() = runTest { if (isBoundByJsTestTimeout) return@runTest // Too slow for JS, bounded by 2 sec. default JS timeout for (replay in 0..10) { for (extraBufferCapacity in 0..5) { if (replay == 0 && extraBufferCapacity == 0) continue // test only buffered shared flows try { val sh = MutableSharedFlow(replay, extraBufferCapacity) // repeat the whole test a few times to make sure it works correctly when slots are reused repeat(3) { testBufferedFlow(sh, replay) } } catch (e: Throwable) { error("Failed for replay=$replay, extraBufferCapacity=$extraBufferCapacity", e) } } } } private suspend fun testBufferedFlow(sh: MutableSharedFlow, replay: Int) = withContext(Job()) { reset() expect(1) val n = 100 // initially emitted to fill buffer for (i in 1..n) assertTrue(sh.tryEmit(i)) // initial expected replayCache val rcStart = n - replay + 1 val rcRange = rcStart..n val rcSize = n - rcStart + 1 assertEquals(rcRange.toList(), sh.replayCache) // create collectors val m = 10 // collectors created var ofs = 0 val k = 42 // emissions to collectors val ecRange = n + 1..n + k val jobs = List(m) { jobIndex -> launch(start = CoroutineStart.UNDISPATCHED) { sh.collect { i -> when (i) { in rcRange -> expect(2 + i - rcStart + jobIndex * rcSize) in ecRange -> expect(2 + ofs + jobIndex) else -> expectUnreached() } } expectUnreached() // does not complete normally } } ofs = rcSize * m + 2 expect(ofs) // emit to all k times for (p in ecRange) { sh.emit(p) expect(1 + ofs) // buffered, no suspend yield() ofs += 2 + m expect(ofs) } assertEquals(ecRange.toList().takeLast(replay), sh.replayCache) // cancel all collectors jobs.forEach { it.cancel() } yield() // replay cache is still there assertEquals(ecRange.toList().takeLast(replay), sh.replayCache) finish(1 + ofs) } @Test fun testDropLatest() = testDropLatestOrOldest(BufferOverflow.DROP_LATEST) @Test fun testDropOldest() = testDropLatestOrOldest(BufferOverflow.DROP_OLDEST) private fun testDropLatestOrOldest(bufferOverflow: BufferOverflow) = runTest { reset() expect(1) val sh = MutableSharedFlow(1, onBufferOverflow = bufferOverflow) sh.emit(1) sh.emit(2) // always keeps last w/o collectors assertEquals(listOf(2), sh.replayCache) assertEquals(0, sh.subscriptionCount.value) // one collector val valueAfterOverflow = when (bufferOverflow) { BufferOverflow.DROP_OLDEST -> 5 BufferOverflow.DROP_LATEST -> 4 else -> error("not supported in this test: $bufferOverflow") } val job = launch(start = CoroutineStart.UNDISPATCHED) { expect(2) sh.collect { when(it) { 2 -> { // replayed expect(3) yield() // and suspends, busy waiting } valueAfterOverflow -> expect(7) 8 -> expect(9) else -> expectUnreached() } } expectUnreached() // does not complete normally } expect(4) assertEquals(1, sh.subscriptionCount.value) assertEquals(listOf(2), sh.replayCache) sh.emit(4) // buffering, collector is busy assertEquals(listOf(4), sh.replayCache) expect(5) sh.emit(5) // Buffer overflow here, will not suspend assertEquals(listOf(valueAfterOverflow), sh.replayCache) expect(6) yield() // to the job expect(8) sh.emit(8) // not busy now assertEquals(listOf(8), sh.replayCache) // buffered yield() // to process expect(10) job.cancel() // cancel the job yield() assertEquals(0, sh.subscriptionCount.value) finish(11) } @Test public fun testOnSubscription() = runTest { expect(1) val sh = MutableSharedFlow() fun share(s: String) { launch(start = CoroutineStart.UNDISPATCHED) { sh.emit(s) } } sh .onSubscription { emit("collector->A") share("share->A") } .onSubscription { emit("collector->B") share("share->B") } .onStart { emit("collector->C") share("share->C") // get's lost, no subscribers yet } .onStart { emit("collector->D") share("share->D") // get's lost, no subscribers yet } .onEach { when (it) { "collector->D" -> expect(2) "collector->C" -> expect(3) "collector->A" -> expect(4) "collector->B" -> expect(5) "share->A" -> expect(6) "share->B" -> { expect(7) currentCoroutineContext().cancel() } else -> expectUnreached() } } .launchIn(this) .join() finish(8) } @Test @Suppress("DEPRECATION") // 'catch' fun onSubscriptionThrows() = runTest { expect(1) val sh = MutableSharedFlow(1) sh.tryEmit("OK") // buffer a string assertEquals(listOf("OK"), sh.replayCache) sh .onSubscription { expect(2) throw TestException() } .catch { e -> assertIs(e) expect(3) } .collect { // onSubscription throw before replay is emitted, so no value is collected if it throws expectUnreached() } assertEquals(0, sh.subscriptionCount.value) finish(4) } @Test fun testBigReplayManySubscribers() = testManySubscribers(true) @Test fun testBigBufferManySubscribers() = testManySubscribers(false) private fun testManySubscribers(replay: Boolean) = runTest { val n = 100 val rnd = Random(replay.hashCode()) val sh = MutableSharedFlow( replay = if (replay) n else 0, extraBufferCapacity = if (replay) 0 else n ) val subs = ArrayList() for (i in 1..n) { sh.emit(i) val subBarrier = Channel() val subJob = SubJob() subs += subJob // will receive all starting from replay or from new emissions only subJob.lastReceived = if (replay) 0 else i subJob.job = sh .onSubscription { subBarrier.send(Unit) // signal subscribed } .onEach { value -> assertEquals(subJob.lastReceived + 1, value) subJob.lastReceived = value } .launchIn(this) subBarrier.receive() // wait until subscribed // must have also receive all from the replay buffer directly after being subscribed assertEquals(subJob.lastReceived, i) // 50% of time cancel one subscriber if (i % 2 == 0) { val victim = subs.removeAt(rnd.nextInt(subs.size)) yield() // make sure victim processed all emissions assertEquals(victim.lastReceived, i) victim.job.cancel() } } yield() // make sure the last emission is processed for (subJob in subs) { assertEquals(subJob.lastReceived, n) subJob.job.cancel() } } private class SubJob { lateinit var job: Job var lastReceived = 0 } @Test fun testStateFlowModel() = runTest { if (isBoundByJsTestTimeout) return@runTest // Too slow for JS, bounded by 2 sec. default JS timeout val stateFlow = MutableStateFlow(null) val expect = modelLog(stateFlow) val sharedFlow = MutableSharedFlow( replay = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST ) sharedFlow.tryEmit(null) // initial value val actual = modelLog(sharedFlow) { distinctUntilChanged() } for (i in 0 until minOf(expect.size, actual.size)) { if (actual[i] != expect[i]) { for (j in maxOf(0, i - 10)..i) println("Actual log item #$j: ${actual[j]}") assertEquals(expect[i], actual[i], "Log item #$i") } } assertEquals(expect.size, actual.size) } private suspend fun modelLog( sh: MutableSharedFlow, op: Flow.() -> Flow = { this } ): List = coroutineScope { val rnd = Random(1) val result = ArrayList() val job = launch { sh.op().collect { value -> result.add("Collect: $value") repeat(rnd.nextInt(0..2)) { result.add("Collect: yield") yield() } } } repeat(1000) { val value = if (rnd.nextBoolean()) null else rnd.nextData() if (rnd.nextInt(20) == 0) { result.add("resetReplayCache & emit: $value") if (sh !is StateFlow<*>) sh.resetReplayCache() assertTrue(sh.tryEmit(value)) } else { result.add("Emit: $value") sh.emit(value) } repeat(rnd.nextInt(0..2)) { result.add("Emit: yield") yield() } } result.add("main: cancel") job.cancel() result.add("main: yield") yield() result.add("main: join") job.join() result } data class Data(val x: Int) private val dataCache = (1..5).associateWith { Data(it) } // Note that we test proper null support here, too private fun Random.nextData(): Data? { val x = nextInt(0..5) if (x == 0) return null // randomly reuse ref or create a new instance return if(nextBoolean()) dataCache[x] else Data(x) } @Test fun testOperatorFusion() { val sh = MutableSharedFlow() assertSame(sh, (sh as Flow<*>).cancellable()) assertSame(sh, (sh as Flow<*>).flowOn(Dispatchers.Default)) assertSame(sh, sh.buffer(Channel.RENDEZVOUS)) } @Test fun testIllegalArgumentException() { assertFailsWith { MutableSharedFlow(-1) } assertFailsWith { MutableSharedFlow(0, extraBufferCapacity = -1) } assertFailsWith { MutableSharedFlow(0, onBufferOverflow = BufferOverflow.DROP_LATEST) } assertFailsWith { MutableSharedFlow(0, onBufferOverflow = BufferOverflow.DROP_OLDEST) } } @Test public fun testReplayCancellability() = testCancellability(fromReplay = true) @Test public fun testEmitCancellability() = testCancellability(fromReplay = false) private fun testCancellability(fromReplay: Boolean) = runTest { expect(1) val sh = MutableSharedFlow(5) fun emitTestData() { for (i in 1..5) assertTrue(sh.tryEmit(i)) } if (fromReplay) emitTestData() // fill in replay first var subscribed = true val job = sh .onSubscription { subscribed = true } .onEach { i -> when (i) { 1 -> expect(2) 2 -> expect(3) 3 -> { expect(4) currentCoroutineContext().cancel() } else -> expectUnreached() // shall check for cancellation } } .launchIn(this) yield() assertTrue(subscribed) // yielding in enough if (!fromReplay) emitTestData() // emit after subscription job.join() finish(5) } @Test fun testSubscriptionCount() = runTest { val flow = MutableSharedFlow() fun startSubscriber() = launch(start = CoroutineStart.UNDISPATCHED) { flow.collect() } assertEquals(0, flow.subscriptionCount.first()) val j1 = startSubscriber() assertEquals(1, flow.subscriptionCount.first()) val j2 = startSubscriber() assertEquals(2, flow.subscriptionCount.first()) j1.cancelAndJoin() assertEquals(1, flow.subscriptionCount.first()) j2.cancelAndJoin() assertEquals(0, flow.subscriptionCount.first()) } @Test fun testSubscriptionByFirstSuspensionInSharedFlow() = runTest { testSubscriptionByFirstSuspensionInCollect(MutableSharedFlow()) { emit(it) } } } /** * Check that, by the time [SharedFlow.collect] suspends for the first time, its subscription is already active. */ inline fun> CoroutineScope.testSubscriptionByFirstSuspensionInCollect(flow: T, emit: T.(Int) -> Unit) { var received = 0 val job = launch(start = CoroutineStart.UNDISPATCHED) { flow.collect { received = it } } flow.emit(1) assertEquals(1, received) job.cancel() }