package kotlinx.coroutines.flow import kotlinx.coroutines.testing.* import kotlinx.coroutines.* import kotlin.random.* import kotlin.test.* // A simplified version of StateFlowStressTest class StateFlowCommonStressTest : TestBase() { private val state = MutableStateFlow(0) @Test fun testSingleEmitterAndCollector() = runTest { var collected = 0L val collector = launch(Dispatchers.Default) { // collect, but abort and collect again after every 1000 values to stress allocation/deallocation do { val batchSize = Random.nextInt(1..1000) var index = 0 val cnt = state.onEach { value -> // the first value in batch is allowed to repeat, but cannot go back val ok = if (index++ == 0) value >= collected else value > collected check(ok) { "Values must be monotonic, but $value is not, was $collected" } collected = value }.take(batchSize).map { 1 }.sum() } while (cnt == batchSize) } var current = 1L val emitter = launch { while (true) { state.value = current++ if (current % 1000 == 0L) yield() // make it cancellable } } delay(3000) emitter.cancelAndJoin() collector.cancelAndJoin() assertTrue { current >= collected / 2 } } }