@file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED") // KT-21913

package kotlinx.coroutines.selects

import kotlinx.coroutines.testing.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.test.*

/**
 * Tests for `select` clauses (`onSend`, `onReceive`, `onReceiveCatching`) on rendezvous channels.
 *
 * The `expect(n)` / `finish(n)` / `expectUnreached()` helpers come from [TestBase], which is not
 * part of this file; presumably they assert that the numbered steps run in exactly that order.
 */
class SelectRendezvousChannelTest : TestBase() {

    /** A receiver is already waiting when `select` runs, so the `onSend` clause is expected to be selected. */
    @Test
    fun testSelectSendSuccess() = runTest {
        expect(1)
        val channel = Channel<String>(Channel.RENDEZVOUS)
        launch {
            expect(2)
            assertEquals("OK", channel.receive())
            finish(6)
        }
        yield() // to launched coroutine
        expect(3)
        select<Unit> {
            channel.onSend("OK") {
                expect(4)
            }
        }
        expect(5)
    }

    /** Same as [testSelectSendSuccess], but with a `default` clause that must not be chosen. */
    @Test
    fun testSelectSendSuccessWithDefault() = runTest {
        expect(1)
        val channel = Channel<String>(Channel.RENDEZVOUS)
        launch {
            expect(2)
            assertEquals("OK", channel.receive())
            finish(6)
        }
        yield() // to launched coroutine
        expect(3)
        select<Unit> {
            channel.onSend("OK") {
                expect(4)
            }
            default {
                expectUnreached()
            }
        }
        expect(5)
    }

    /**
     * No receiver is waiting, so the `default` clause is expected to be chosen; afterwards a plain
     * `send`/`receive` pair is exercised on the same channel.
     */
    @Test
    fun testSelectSendWaitWithDefault() = runTest {
        expect(1)
        val channel = Channel<String>(Channel.RENDEZVOUS)
        select<Unit> {
            channel.onSend("OK") {
                expectUnreached()
            }
            default {
                expect(2)
            }
        }
        expect(3) // make sure receive blocks (select above is over)
        launch {
            expect(5)
            assertEquals("CHK", channel.receive())
            finish(8)
        }
        expect(4)
        yield()
        expect(6)
        channel.send("CHK")
        expect(7)
    }

    /** The receiver only starts after `select` has been entered, so the `onSend` clause has to wait for it. */
    @Test
    fun testSelectSendWait() = runTest {
        expect(1)
        val channel = Channel<String>(Channel.RENDEZVOUS)
        launch {
            expect(3)
            assertEquals("OK", channel.receive())
            expect(4)
        }
        expect(2)
        select<Unit> {
            channel.onSend("OK") {
                expect(5)
            }
        }
        finish(6)
    }

    /** A sender is already waiting when `select` runs, so the `onReceive` clause is expected to be selected. */
    @Test
    fun testSelectReceiveSuccess() = runTest {
        expect(1)
        val channel = Channel<String>(Channel.RENDEZVOUS)
        launch {
            expect(2)
            channel.send("OK")
            finish(6)
        }
        yield() // to launched coroutine
        expect(3)
        select<Unit> {
            channel.onReceive { v ->
                expect(4)
                assertEquals("OK", v)
            }
        }
        expect(5)
    }

    /** Same as [testSelectReceiveSuccess], but with a `default` clause that must not be chosen. */
    @Test
    fun testSelectReceiveSuccessWithDefault() = runTest {
        expect(1)
        val channel = Channel<String>(Channel.RENDEZVOUS)
        launch {
            expect(2)
            channel.send("OK")
            finish(6)
        }
        yield() // to launched coroutine
        expect(3)
        select<Unit> {
            channel.onReceive { v ->
                expect(4)
                assertEquals("OK", v)
            }
            default {
                expectUnreached()
            }
        }
        expect(5)
    }

    /**
     * No sender is waiting, so the `default` clause is expected to be chosen; afterwards a plain
     * `send`/`receive` pair is exercised on the same channel.
     */
    @Test
    fun testSelectReceiveWaitWithDefault() = runTest {
        expect(1)
        val channel = Channel<String>(Channel.RENDEZVOUS)
        select<Unit> {
            channel.onReceive {
                expectUnreached()
            }
            default {
                expect(2)
            }
        }
        expect(3) // make sure send blocks (select above is over)
        launch {
            expect(5)
            channel.send("CHK")
            finish(8)
        }
        expect(4)
        yield()
        expect(6)
        assertEquals("CHK", channel.receive())
        expect(7)
    }

    /** The sender only starts after `select` has been entered, so the `onReceive` clause has to wait for it. */
    @Test
    fun testSelectReceiveWait() = runTest {
        expect(1)
        val channel = Channel<String>(Channel.RENDEZVOUS)
        launch {
            expect(3)
            channel.send("OK")
            expect(4)
        }
        expect(2)
        select<Unit> {
            channel.onReceive { v ->
                expect(5)
                assertEquals("OK", v)
            }
        }
        finish(6)
    }

    /** `onReceive` on an already closed channel: the test expects a [ClosedReceiveChannelException]. */
    @Test
    fun testSelectReceiveClosed() = runTest(expected = { it is ClosedReceiveChannelException }) {
        expect(1)
        val channel = Channel<String>(Channel.RENDEZVOUS)
        channel.close()
        finish(2)
        select<Unit> {
            channel.onReceive {
                expectUnreached()
            }
        }
        expectUnreached()
    }

    /** The channel is closed while `select` waits in `onReceive`: a [ClosedReceiveChannelException] is expected. */
    @Test
    fun testSelectReceiveWaitClosed() = runTest(expected = { it is ClosedReceiveChannelException }) {
        expect(1)
        val channel = Channel<String>(Channel.RENDEZVOUS)
        launch {
            expect(3)
            channel.close()
            finish(4)
        }
        expect(2)
        select<Unit> {
            channel.onReceive {
                expectUnreached()
            }
        }
        expectUnreached()
    }

    /** Repeats a `select` with `onSend` that always falls through to `default`, `n` times on one channel. */
    @Test
    fun testSelectSendResourceCleanup() = runTest {
        val channel = Channel<Int>(Channel.RENDEZVOUS)
        val n = 1_000
        expect(1)
        repeat(n) { i ->
            select<Unit> {
                channel.onSend(i) {
                    expectUnreached()
                }
                default {
                    expect(i + 2)
                }
            }
        }
        finish(n + 2)
    }

    /** Repeats a `select` with `onReceive` that always falls through to `default`, `n` times on one channel. */
    @Test
    fun testSelectReceiveResourceCleanup() = runTest {
        val channel = Channel<Int>(Channel.RENDEZVOUS)
        val n = 1_000
        expect(1)
        repeat(n) { i ->
            select<Unit> {
                channel.onReceive {
                    expectUnreached()
                }
                default {
                    expect(i + 2)
                }
            }
        }
        finish(n + 2)
    }

    /**
     * Closes the second channel while the block of the already selected `c1.onReceive` clause is
     * suspended in `yield()`, and checks that the `select` still produces "OK".
     */
    @Test
    fun testSelectAtomicFailure() = runTest {
        val c1 = Channel<Int>(Channel.RENDEZVOUS)
        val c2 = Channel<Int>(Channel.RENDEZVOUS)
        expect(1)
        launch {
            expect(3)
            val res = select<String> {
                c1.onReceive { v1 ->
                    expect(4)
                    assertEquals(42, v1)
                    yield() // back to main
                    expect(7)
                    "OK"
                }
                c2.onReceive {
                    "FAIL"
                }
            }
            expect(8)
            assertEquals("OK", res)
        }
        expect(2)
        c1.send(42) // send to coroutine, suspends
        expect(5)
        c2.close() // makes sure that selected expression does not fail!
        expect(6)
        yield() // back
        finish(9)
    }

    /** Sends to a coroutine already waiting in `select`/`onReceive` and checks the step order of the hand-off. */
    @Test
    fun testSelectWaitDispatch() = runTest {
        val c = Channel<Int>(Channel.RENDEZVOUS)
        expect(1)
        launch {
            expect(3)
            val res = select<String> {
                c.onReceive { v ->
                    expect(6)
                    assertEquals(42, v)
                    yield() // back to main
                    expect(8)
                    "OK"
                }
            }
            expect(9)
            assertEquals("OK", res)
        }
        expect(2)
        yield() // to launch
        expect(4)
        c.send(42) // do not suspend
        expect(5)
        yield() // to receive
        expect(7)
        yield() // again
        finish(10)
    }

    /** The channel is closed without a cause while `select` waits in `onReceiveCatching`. */
    @Test
    fun testSelectReceiveCatchingWaitClosed() = runTest {
        expect(1)
        val channel = Channel<String>(Channel.RENDEZVOUS)
        launch {
            expect(3)
            channel.close()
            expect(4)
        }
        expect(2)
        select<Unit> {
            channel.onReceiveCatching {
                expect(5)
                assertTrue(it.isClosed)
                assertNull(it.exceptionOrNull())
            }
        }
        finish(6)
    }

    /** The channel is closed with a `TestException` cause while `select` waits in `onReceiveCatching`. */
    @Test
    fun testSelectReceiveCatchingWaitClosedWithCause() = runTest {
        expect(1)
        val channel = Channel<String>(Channel.RENDEZVOUS)
        launch {
            expect(3)
            channel.close(TestException())
            expect(4)
        }
        expect(2)
        select<Unit> {
            channel.onReceiveCatching {
                expect(5)
                assertTrue(it.isClosed)
                assertIs<TestException>(it.exceptionOrNull())
            }
        }
        finish(6)
    }

    /** `onReceiveCatching` on a channel that was closed before `select` started. */
    @Test
    fun testSelectReceiveCatchingForClosedChannel() = runTest {
        val channel = Channel<Unit>()
        channel.close()
        expect(1)
        select<Unit> {
            expect(2)
            channel.onReceiveCatching {
                assertTrue(it.isClosed)
                assertNull(it.exceptionOrNull())
                finish(3)
            }
        }
    }

    /** Receives `iterations` consecutive values through `onReceiveCatching`, checking each value and the step order. */
    @Test
    fun testSelectReceiveCatching() = runTest {
        val channel = Channel<Int>(Channel.RENDEZVOUS)
        val iterations = 10
        expect(1)
        val job = launch {
            repeat(iterations) {
                select<Unit> {
                    channel.onReceiveCatching { v ->
                        // `it` is the index of the enclosing `repeat`, not the received result.
                        expect(4 + it * 2)
                        assertEquals(it, v.getOrThrow())
                    }
                }
            }
        }

        expect(2)
        repeat(iterations) {
            expect(3 + it * 2)
            channel.send(it)
            yield()
        }

        job.join()
        finish(3 + iterations * 2)
    }

    /** Same scenario as [testSelectWaitDispatch], using `onReceiveCatching` instead of `onReceive`. */
    @Test
    fun testSelectReceiveCatchingDispatch() = runTest {
        val c = Channel<Int>(Channel.RENDEZVOUS)
        expect(1)
        launch {
            expect(3)
            val res = select<String> {
                c.onReceiveCatching { v ->
                    expect(6)
                    assertEquals(42, v.getOrThrow())
                    yield() // back to main
                    expect(8)
                    "OK"
                }
            }
            expect(9)
            assertEquals("OK", res)
        }
        expect(2)
        yield() // to launch
        expect(4)
        c.send(42) // do not suspend
        expect(5)
        yield() // to receive
        expect(7)
        yield() // again
        finish(10)
    }

    /** Enqueues a sender, closes the channel, then expects a `select` with `onSend` to fail. */
    @Test
    fun testSelectSendWhenClosed() = runTest {
        expect(1)
        val c = Channel<Int>(Channel.RENDEZVOUS)
        val sender = launch(start = CoroutineStart.UNDISPATCHED) {
            expect(2)
            c.send(1) // enqueue sender
            expectUnreached()
        }
        c.close() // then close
        assertFailsWith<ClosedSendChannelException> {
            // select sender should fail
            expect(3)
            select<Unit> {
                c.onSend(2) {
                    expectUnreached()
                }
            }
        }
        sender.cancel()
        finish(4)
    }

    // only for debugging
    /** Registers [block] as a zero-delay `onTimeout` clause, used by the tests above as a "default" branch. */
    internal fun <R> SelectBuilder<R>.default(block: suspend () -> R) = onTimeout(0, block)

    /** Registering `onSend` and then `onReceive` on the same channel in one `select` is expected to fail. */
    @Test
    fun testSelectSendAndReceive() = runTest {
        val c = Channel<Int>()
        assertFailsWith<IllegalStateException> {
            select<Unit> {
                c.onSend(1) { expectUnreached() }
                c.onReceive { expectUnreached() }
            }
        }
        checkNotBroken(c)
    }

    /** Registering `onReceive` and then `onSend` on the same channel in one `select` is expected to fail. */
    @Test
    fun testSelectReceiveAndSend() = runTest {
        val c = Channel<Int>()
        assertFailsWith<IllegalStateException> {
            select<Unit> {
                c.onReceive { expectUnreached() }
                c.onSend(1) { expectUnreached() }
            }
        }
        checkNotBroken(c)
    }

    // makes sure the channel is not broken
    /** Sends 42 from a child coroutine and asserts that the same value is received from [c]. */
    private suspend fun checkNotBroken(c: Channel<Int>) {
        coroutineScope {
            launch {
                c.send(42)
            }
            assertEquals(42, c.receive())
        }
    }
}