package kotlinx.coroutines.scheduling

import kotlinx.coroutines.testing.*
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.internal.*
import org.junit.*
import org.junit.Test
import java.util.concurrent.*
import java.util.concurrent.atomic.*
import kotlin.coroutines.*
import kotlin.test.*

/**
 * Stress tests that push [tasksNum] [ValidatingRunnable]s through a [SchedulerCoroutineDispatcher]
 * and verify that every task ran exactly once and that the per-thread execution counters
 * add up to the number of submitted tasks.
 */
class CoroutineSchedulerStressTest : TestBase() {
    private val dispatcher: SchedulerCoroutineDispatcher = SchedulerCoroutineDispatcher()

    /** Number of tasks executed by each thread that ran at least one task. */
    private val observedThreads = ConcurrentHashMap<Thread, Long>()

    /** Total number of tasks submitted by each test. */
    private val tasksNum = 500_000 * stressMemoryMultiplier()

    /** Scales the workload up when the suite runs in stress mode. */
    private fun stressMemoryMultiplier(): Int =
        if (isStressTest) AVAILABLE_PROCESSORS * 4 else 1

    /** Number of tasks that have finished so far. */
    private val processed = AtomicInteger(0)

    /** Released by the task that brings [processed] up to [tasksNum]. */
    private val finishLatch = CountDownLatch(1)

    @After
    fun tearDown() {
        dispatcher.close()
    }

    @Test
    fun testInternalTasksSubmissionProgress() {
        /*
         * Run a lot of tasks and validate that
         * 1) All of them are completed successfully
         * 2) Every thread executed task at least once
         */
        dispatcher.dispatch(EmptyCoroutineContext, Runnable {
            repeat(tasksNum) {
                dispatcher.dispatch(EmptyCoroutineContext, ValidatingRunnable())
            }
        })

        finishLatch.await()
        val observed = observedThreads.size
        // on slow machines not all threads can be observed
        assertTrue(
            observed in (AVAILABLE_PROCESSORS - 1)..(AVAILABLE_PROCESSORS + 1),
            "Observed $observed threads with $AVAILABLE_PROCESSORS available processors"
        )
        validateResults()
    }

    @Test
    fun testStealingFromNonProgressing() {
        /*
         * Work-stealing stress test,
         * one thread submits tasks one by one, spinning whenever more than 100 of them are still
         * unprocessed (to avoid work offloading), thus never executing its own tasks
         * and relying only on work stealing.
         */
        var blockingThread: Thread? = null
        dispatcher.dispatch(EmptyCoroutineContext, Runnable {
            // Remember the submitting thread, then submit tasksNum tasks from it
            blockingThread = Thread.currentThread()
            var submittedTasks = 0
            while (submittedTasks < tasksNum) {
                ++submittedTasks
                dispatcher.dispatch(EmptyCoroutineContext, ValidatingRunnable())
                // Throttle: keep at most 100 tasks in flight
                while (submittedTasks - processed.get() > 100) {
                    Thread.yield()
                }
            }
            // Block current thread
            finishLatch.await()
        })

        finishLatch.await()
        val submitter = checkNotNull(blockingThread) { "Submitting runnable has never recorded its thread" }
        // The submitting thread must not have executed any of the tasks itself
        assertFalse(observedThreads.containsKey(submitter))
        validateResults()
    }

    /**
     * Records one executed task for the current thread and releases [finishLatch]
     * once [processed] reaches [tasksNum].
     */
    private fun processTask() {
        observedThreads.merge(Thread.currentThread(), 1L) { count, increment -> count + increment }

        if (processed.incrementAndGet() == tasksNum) {
            finishLatch.countDown()
        }
    }

    /** Asserts that the per-thread counters sum up to [tasksNum]. */
    private fun validateResults() {
        val result = observedThreads.values.sum()
        assertEquals(tasksNum.toLong(), result)
    }

    /** Task that fails if it is ever run more than once; otherwise delegates to [processTask]. */
    private inner class ValidatingRunnable : Runnable {
        private val invoked = atomic(false)

        override fun run() {
            if (!invoked.compareAndSet(false, true)) error("The same runnable was invoked twice")
            processTask()
        }
    }
}