package kotlinx.coroutines.scheduling

import kotlinx.coroutines.testing.*
import kotlinx.coroutines.*
import org.junit.*
import org.junit.Test
import java.util.concurrent.atomic.*
import kotlin.test.*

/**
 * Stress test that checks the implementation correctness of [LimitingDispatcher]
 * by hammering on its particular implementation details.
 */
class BlockingCoroutineDispatcherLivenessStressTest : SchedulerTestBase() {

    /** Number of test coroutines that are inside their body at this moment. */
    private val concurrentWorkers = AtomicInteger(0)

    /**
     * Sets the idle worker keep-alive to [Long.MAX_VALUE] before every test.
     */
    @Before
    fun setUp() {
        // With this setting a starvation problem surfaces as a hanging test
        idleWorkerKeepAliveNs = Long.MAX_VALUE
    }

    /**
     * Repeatedly submits a pair of tasks to `blockingDispatcher(1)` and awaits both.
     *
     * Every task bumps [concurrentWorkers] on entry, asserts that the counter equals 1
     * (i.e. the other task of the pair is not inside its body at the same time),
     * and restores the counter on exit.
     */
    @Test
    fun testAddPollRace() = runBlocking {
        val limitingDispatcher = blockingDispatcher(1)
        val iterations = 25_000 * stressTestMultiplier
        // Stress test for specific case (race #2 from LimitingDispatcher). Shouldn't hang.
        repeat(iterations) {
            val pair = List(2) {
                async(limitingDispatcher) {
                    try {
                        assertEquals(1, concurrentWorkers.incrementAndGet())
                    } finally {
                        // Always undo the increment, even when the assertion above fails
                        concurrentWorkers.decrementAndGet()
                    }
                }
            }
            pair.forEach { deferred -> deferred.await() }
        }
    }

    /**
     * Repeatedly submits a pair of trivial tasks to `dispatcher` (with `corePoolSize`
     * set to `CORES_COUNT`), awaits both, and finally verifies that every submitted
     * task has run exactly once by comparing the completion counter with `2 * iterations`.
     */
    @Test
    fun testPingPongThreadsCount() = runBlocking {
        corePoolSize = CORES_COUNT
        val iterations = 100_000 * stressTestMultiplier
        val completed = AtomicInteger(0)
        repeat(iterations) {
            val pair = List(2) {
                async(dispatcher) {
                    // Useless work
                    concurrentWorkers.incrementAndGet()
                    concurrentWorkers.decrementAndGet()
                    completed.incrementAndGet()
                }
            }
            pair.forEach { deferred -> deferred.await() }
        }
        assertEquals(2 * iterations, completed.get())
    }
}