package kotlinx.coroutines.flow import kotlinx.coroutines.testing.* import kotlinx.coroutines.* import org.junit.* import org.junit.Test import java.util.* import java.util.concurrent.atomic.* import kotlin.random.* import kotlin.test.* import kotlin.time.* import kotlin.time.TimeSource class SharingStressTest : TestBase() { private val testDuration = 1000L * stressTestMultiplier private val nSubscribers = 5 private val testStarted = TimeSource.Monotonic.markNow() @get:Rule val emitterDispatcher = ExecutorRule(1) @get:Rule val subscriberDispatcher = ExecutorRule(nSubscribers) @Test public fun testNoReplayLazy() = testStress(0, started = SharingStarted.Lazily) @Test public fun testNoReplayWhileSubscribed() = testStress(0, started = SharingStarted.WhileSubscribed()) @Test public fun testNoReplayWhileSubscribedTimeout() = testStress(0, started = SharingStarted.WhileSubscribed(stopTimeoutMillis = 50L)) @Test public fun testReplay100WhileSubscribed() = testStress(100, started = SharingStarted.WhileSubscribed()) @Test public fun testReplay100WhileSubscribedReset() = testStress(100, started = SharingStarted.WhileSubscribed(replayExpirationMillis = 0L)) @Test public fun testReplay100WhileSubscribedTimeout() = testStress(100, started = SharingStarted.WhileSubscribed(stopTimeoutMillis = 50L)) @Test public fun testStateLazy() = testStress(1, started = SharingStarted.Lazily) @Test public fun testStateWhileSubscribed() = testStress(1, started = SharingStarted.WhileSubscribed()) @Test public fun testStateWhileSubscribedReset() = testStress(1, started = SharingStarted.WhileSubscribed(replayExpirationMillis = 0L)) private fun testStress(replay: Int, started: SharingStarted) = runTest { log("-- Stress with replay=$replay, started=$started") val random = Random(1) val emitIndex = AtomicLong() val cancelledEmits = HashSet() val missingCollects = Collections.synchronizedSet(LinkedHashSet()) // at most one copy of upstream can be running at any time val isRunning = AtomicInteger(0) val upstream = flow { assertEquals(0, isRunning.getAndIncrement()) try { while (true) { val value = emitIndex.getAndIncrement() try { emit(value) } catch (e: CancellationException) { // emission was cancelled -> could be missing cancelledEmits.add(value) throw e } } } finally { assertEquals(1, isRunning.getAndDecrement()) } } val subCount = MutableStateFlow(0) val sharingJob = Job() val sharingScope = this + emitterDispatcher + sharingJob val usingStateFlow = replay == 1 val sharedFlow = if (usingStateFlow) upstream.stateIn(sharingScope, started, 0L) else upstream.shareIn(sharingScope, started, replay) try { val subscribers = ArrayList() withTimeoutOrNull(testDuration) { // start and stop subscribers while (true) { log("Staring $nSubscribers subscribers") repeat(nSubscribers) { subscribers += launchSubscriber(sharedFlow, usingStateFlow, subCount, missingCollects) } // wait until they all subscribed subCount.first { it == nSubscribers } // let them work a bit more & make sure emitter did not hang val fromEmitIndex = emitIndex.get() val waitEmitIndex = fromEmitIndex + 100 // wait until 100 emitted withTimeout(10000) { // wait for at most 10s for something to be emitted do { delay(random.nextLong(50L..100L)) } while (emitIndex.get() < waitEmitIndex) // Ok, enough was emitted, wait more if not } // Stop all subscribers and ensure they collected something log("Stopping subscribers (emitted = ${emitIndex.get() - fromEmitIndex})") subscribers.forEach { it.job.cancelAndJoin() assertTrue { it.count > 0 } // something must be collected too } subscribers.clear() log("Intermission") delay(random.nextLong(10L..100L)) // wait a bit before starting them again } } if (!subscribers.isEmpty()) { log("Stopping subscribers") subscribers.forEach { it.job.cancelAndJoin() } } } finally { log("--- Finally: Cancelling sharing job") sharingJob.cancel() } sharingJob.join() // make sure sharing job did not hang log("Emitter was cancelled ${cancelledEmits.size} times") log("Collectors missed ${missingCollects.size} values") for (value in missingCollects) { assertTrue(value in cancelledEmits, "Value $value is missing for no apparent reason") } } private fun CoroutineScope.launchSubscriber( sharedFlow: SharedFlow, usingStateFlow: Boolean, subCount: MutableStateFlow, missingCollects: MutableSet ): SubJob { val subJob = SubJob() subJob.job = launch(subscriberDispatcher) { var last = -1L sharedFlow .onSubscription { subCount.increment(1) } .onCompletion { subCount.increment(-1) } .collect { j -> subJob.count++ // last must grow sequentially, no jumping or losses if (last == -1L) { last = j } else { val expected = last + 1 if (usingStateFlow) assertTrue(expected <= j) else { if (expected != j) { if (j == expected + 1) { // if missing just one -- could be race with cancelled emit missingCollects.add(expected) } else { // broken otherwise assertEquals(expected, j) } } } last = j } } } return subJob } private class SubJob { lateinit var job: Job var count = 0L } private fun log(msg: String) = println("${testStarted.elapsedNow().inWholeMilliseconds} ms: $msg") private fun MutableStateFlow.increment(delta: Int) { update { it + delta } } }