package kotlinx.coroutines

import kotlinx.coroutines.testing.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.scheduling.*
import org.junit.*
import org.junit.Test
import java.util.concurrent.*
import kotlin.test.*

/**
 * Checks how coroutines behave when the executor behind their dispatcher refuses a task
 * by throwing [RejectedExecutionException].
 *
 * Every test dispatches through [RejectingExecutor], which accepts only the first
 * [RejectingExecutor.acceptTasks] submissions and rejects all later ones.
 */
class RejectedExecutionTest : TestBase() {
    private val threadName = "RejectedExecutionTest"
    private val executor = RejectingExecutor()

    /** Shuts the executor down after each test and waits up to 10 seconds for it to terminate. */
    @After
    fun tearDown() {
        executor.shutdown()
        executor.awaitTermination(10, TimeUnit.SECONDS)
    }

    /** A rejected `launch` never runs its body and leaves the returned job cancelled. */
    @Test
    fun testRejectOnLaunch() = runTest {
        expect(1)
        val job = launch(executor.asCoroutineDispatcher()) {
            expectUnreached()
        }
        assertEquals(1, executor.submittedTasks)
        assertTrue(job.isCancelled)
        finish(2)
    }

    /**
     * With [CoroutineStart.ATOMIC] the body still runs after the rejection, but with an already
     * cancelled job and on a thread accepted by [assertIoThread] rather than on the executor.
     */
    @Test
    fun testRejectOnLaunchAtomic() = runTest {
        expect(1)
        val job = launch(executor.asCoroutineDispatcher(), start = CoroutineStart.ATOMIC) {
            expect(2)
            assertEquals(true, coroutineContext[Job]?.isCancelled)
            assertIoThread() // was rejected on start, but start was atomic
        }
        assertEquals(1, executor.submittedTasks)
        job.join()
        finish(3)
    }

    /** A rejected `withContext` fails with [CancellationException] without running its block. */
    @Test
    fun testRejectOnWithContext() = runTest {
        expect(1)
        assertFailsWith<CancellationException> {
            withContext(executor.asCoroutineDispatcher()) {
                expectUnreached()
            }
        }
        assertEquals(1, executor.submittedTasks)
        finish(2)
    }

    /**
     * The executor accepts the initial dispatch but rejects the resumption coming back from
     * `Dispatchers.Default`: the outer block is cancelled and its `finally` runs on a thread
     * accepted by [assertIoThread].
     */
    @Test
    fun testRejectOnResumeInContext() = runTest {
        expect(1)
        executor.acceptTasks = 1 // accept one task
        assertFailsWith<CancellationException> {
            withContext(executor.asCoroutineDispatcher()) {
                expect(2)
                assertExecutorThread()
                try {
                    withContext(Dispatchers.Default) {
                        expect(3)
                        assertDefaultDispatcherThread()
                        // We have to wait until the caller's executor thread has already suspended
                        // (i.e. is not running a task), so that when we resume back to it a new task is posted
                        executor.awaitNotRunningTask()
                        expect(4)
                        assertDefaultDispatcherThread()
                    }
                    // cancelled on resume back
                } finally {
                    expect(5)
                    assertIoThread()
                }
                expectUnreached()
            }
        }
        assertEquals(2, executor.submittedTasks)
        finish(6)
    }

    /**
     * The executor accepts the initial dispatch but rejects the task scheduled by `delay`:
     * the coroutine is cancelled in place and stays on the executor thread.
     */
    @Test
    fun testRejectOnDelay() = runTest {
        if (!removeFutureOnCancel(executor)) return@runTest // Skip this test on old JDKs
        expect(1)
        executor.acceptTasks = 1 // accept one task
        assertFailsWith<CancellationException> {
            withContext(executor.asCoroutineDispatcher()) {
                expect(2)
                assertExecutorThread()
                try {
                    delay(10) // cancelled
                } finally {
                    // Since it was cancelled on attempt to delay, it still stays on the same thread
                    assertExecutorThread()
                }
                expectUnreached()
            }
        }
        assertEquals(2, executor.submittedTasks)
        finish(3)
    }

    /**
     * The executor accepts the initial dispatch but rejects the second submission made while
     * entering `withTimeout`: the block is still entered, with an already cancelled job.
     */
    @Test
    fun testRejectWithTimeout() = runTest {
        if (!removeFutureOnCancel(executor)) return@runTest // Skip this test on old JDKs
        expect(1)
        executor.acceptTasks = 1 // accept one task
        assertFailsWith<CancellationException> {
            withContext(executor.asCoroutineDispatcher()) {
                expect(2)
                assertExecutorThread()
                withTimeout(1000) {
                    expect(3) // atomic entry into the block (legacy behavior, it seems to be OK this way)
                    assertEquals(true, coroutineContext[Job]?.isCancelled) // but the job is already cancelled
                }
                expectUnreached()
            }
        }
        assertEquals(2, executor.submittedTasks)
        finish(4)
    }

    /**
     * Single-threaded scheduled executor that accepts only the first [acceptTasks] submissions
     * and throws [RejectedExecutionException] for every later one.
     *
     * Only [schedule] is overridden; the JDK documents `ScheduledThreadPoolExecutor.execute` as
     * equivalent to scheduling with zero delay, so plain dispatches are counted here as well.
     */
    private inner class RejectingExecutor : ScheduledThreadPoolExecutor(1, { r -> Thread(r, threadName) }) {
        /** Number of submissions to accept before rejecting; `0` rejects everything. */
        var acceptTasks = 0

        /** Total number of submission attempts seen so far, rejected ones included. */
        var submittedTasks = 0

        /** `true` while an accepted task is executing on the executor thread. */
        val runningTask = MutableStateFlow(false)

        override fun schedule(command: Runnable, delay: Long, unit: TimeUnit): ScheduledFuture<*> {
            // Count the attempt first so that rejected submissions are visible to the tests too.
            submittedTasks++
            if (submittedTasks > acceptTasks) throw RejectedExecutionException()
            // Wrap the task so that runningTask reflects whether the executor thread is busy.
            val wrapper = Runnable {
                runningTask.value = true
                try {
                    command.run()
                } finally {
                    runningTask.value = false
                }
            }
            return super.schedule(wrapper, delay, unit)
        }

        /** Suspends until no accepted task is running on the executor thread. */
        suspend fun awaitNotRunningTask() = runningTask.first { !it }
    }

    /** Fails unless the current thread's name starts with [threadName]. */
    private fun assertExecutorThread() {
        val thread = Thread.currentThread()
        if (!thread.name.startsWith(threadName)) error("Not an executor thread: $thread")
    }

    /** Fails unless the current thread is a [CoroutineScheduler.Worker] in the `CPU_ACQUIRED` state. */
    private fun assertDefaultDispatcherThread() {
        val thread = Thread.currentThread()
        if (thread !is CoroutineScheduler.Worker) error("Not a thread from Dispatchers.Default: $thread")
        assertEquals(CoroutineScheduler.WorkerState.CPU_ACQUIRED, thread.state)
    }

    /** Fails unless the current thread is a [CoroutineScheduler.Worker] in the `BLOCKING` state. */
    private fun assertIoThread() {
        val thread = Thread.currentThread()
        if (thread !is CoroutineScheduler.Worker) error("Not a thread from Dispatchers.IO: $thread")
        assertEquals(CoroutineScheduler.WorkerState.BLOCKING, thread.state)
    }
}