package kotlinx.coroutines import kotlinx.coroutines.testing.* import kotlinx.atomicfu.* import kotlinx.coroutines.channels.* import org.junit.Test import java.util.concurrent.locks.* import kotlin.concurrent.* import kotlin.test.* /** * Tests event loops integration. * See [https://github.com/Kotlin/kotlinx.coroutines/issues/860]. */ class EventLoopsTest : TestBase() { @Test fun testNestedRunBlocking() { runBlocking { // outer event loop // Produce string "OK" val ch = produce { send("OK") } // try receive this string in a blocking way: assertEquals("OK", runBlocking { ch.receive() }) // it should not hang here } } @Test fun testUnconfinedInRunBlocking() { var completed = false runBlocking { launch(Dispatchers.Unconfined) { completed = true } // should not go into runBlocking loop, but complete here assertTrue(completed) } } @Test fun testNestedUnconfined() { expect(1) GlobalScope.launch(Dispatchers.Unconfined) { expect(2) GlobalScope.launch(Dispatchers.Unconfined) { // this gets scheduled into outer unconfined loop expect(4) } expect(3) // ^^ executed before the above unconfined } finish(5) } @Test fun testEventLoopInDefaultExecutor() = runTest { expect(1) withContext(Dispatchers.Unconfined) { delay(1) assertTrue(Thread.currentThread().name.startsWith(DefaultExecutor.THREAD_NAME)) expect(2) // now runBlocking inside default executor thread --> should use outer event loop DefaultExecutor.enqueue(Runnable { expect(4) // will execute when runBlocking runs loop }) expect(3) runBlocking { expect(5) } } finish(6) } /** * Simple test for [processNextEventInCurrentThread] API use-case. */ @Test fun testProcessNextEventInCurrentThreadSimple() = runTest { expect(1) val event = EventSync() // this coroutine fires event launch { expect(3) event.fireEvent() } // main coroutine waits for event (same thread!) expect(2) event.blockingAwait() finish(4) } @Test fun testSecondThreadRunBlocking() = runTest { val testThread = Thread.currentThread() val testContext = coroutineContext val event = EventSync() // will signal completion var thread = thread { runBlocking { // outer event loop // Produce string "OK" val ch = produce { send("OK") } // try receive this string in a blocking way using test context (another thread) assertEquals("OK", runBlocking(testContext) { assertEquals(testThread, Thread.currentThread()) ch.receive() // it should not hang here }) } event.fireEvent() // done thread } event.blockingAwait() // wait for thread to complete thread.join() // it is safe to join thread now } /** * Test for [processNextEventInCurrentThread] API use-case with delay. */ @Test fun testProcessNextEventInCurrentThreadDelay() = runTest { expect(1) val event = EventSync() // this coroutine fires event launch { expect(3) delay(100) event.fireEvent() } // main coroutine waits for event (same thread!) expect(2) event.blockingAwait() finish(4) } class EventSync { private val waitingThread = atomic(null) private val fired = atomic(false) fun fireEvent() { fired.value = true waitingThread.value?.let { LockSupport.unpark(it) } } fun blockingAwait() { check(waitingThread.getAndSet(Thread.currentThread()) == null) while (!fired.getAndSet(false)) { val time = processNextEventInCurrentThread() LockSupport.parkNanos(time) } waitingThread.value = null } } }