@file:Suppress("UNCHECKED_CAST") package kotlinx.coroutines.reactive import kotlinx.coroutines.flow.* import org.junit.Ignore import org.junit.Test import org.reactivestreams.* import org.reactivestreams.tck.* import java.util.concurrent.* import java.util.concurrent.ForkJoinPool.* import kotlin.test.* class IterableFlowTckTest : PublisherVerification(TestEnvironment()) { private fun generate(num: Long): Array { return Array(if (num >= Integer.MAX_VALUE) 1000000 else num.toInt()) { it.toLong() } } override fun createPublisher(elements: Long): Publisher { return generate(elements).asIterable().asFlow().asPublisher() } @Suppress("SubscriberImplementation") override fun createFailedPublisher(): Publisher? { /* * This is a hack for our adapter structure: * Tests assume that calling "collect" is enough for publisher to fail and it is not * true for our implementation */ val pub = { error(42) }.asFlow().asPublisher() return Publisher { subscriber -> pub.subscribe(object : Subscriber by subscriber as Subscriber { override fun onSubscribe(s: Subscription) { subscriber.onSubscribe(s) s.request(1) } }) } } @Test fun testStackOverflowTrampoline() { val latch = CountDownLatch(1) val collected = ArrayList() val toRequest = 1000L val array = generate(toRequest) val publisher = array.asIterable().asFlow().asPublisher() publisher.subscribe(object : Subscriber { private lateinit var s: Subscription override fun onSubscribe(s: Subscription) { this.s = s s.request(1) } override fun onNext(aLong: Long) { collected.add(aLong) s.request(1) } override fun onError(t: Throwable) { } override fun onComplete() { latch.countDown() } }) latch.await(5, TimeUnit.SECONDS) assertEquals(collected, array.toList()) } @Test fun testConcurrentRequest() { val latch = CountDownLatch(1) val collected = ArrayList() val n = 50000L val array = generate(n) val publisher = array.asIterable().asFlow().asPublisher() publisher.subscribe(object : Subscriber { private var s: Subscription? = null override fun onSubscribe(s: Subscription) { this.s = s for (i in 0..n) { commonPool().execute { s.request(1) } } } override fun onNext(aLong: Long) { collected.add(aLong) } override fun onError(t: Throwable) { } override fun onComplete() { latch.countDown() } }) latch.await() assertEquals(array.toList(), collected) } @Ignore override fun required_spec309_requestZeroMustSignalIllegalArgumentException() { } @Ignore override fun required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() { } @Ignore override fun required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling() { // This test has a bug in it } }