package kotlinx.coroutines.reactive import kotlinx.coroutines.testing.* import kotlinx.coroutines.* import org.reactivestreams.* import org.reactivestreams.tck.* import org.testng.* import org.testng.annotations.* class ReactiveStreamTckTest : TestBase() { @Factory(dataProvider = "dispatchers") fun createTests(dispatcher: Dispatcher): Array { return arrayOf(ReactiveStreamTckTestSuite(dispatcher)) } @DataProvider(name = "dispatchers") public fun dispatchers(): Array> = Dispatcher.values().map { arrayOf(it) }.toTypedArray() public class ReactiveStreamTckTestSuite( private val dispatcher: Dispatcher ) : PublisherVerification(TestEnvironment(500, 500)) { override fun createPublisher(elements: Long): Publisher = publish(dispatcher.dispatcher) { for (i in 1..elements) send(i) } override fun createFailedPublisher(): Publisher = publish(dispatcher.dispatcher) { throw TestException() } @Test public override fun optional_spec105_emptyStreamMustTerminateBySignallingOnComplete() { throw SkipException("Skipped") } class TestException : Exception() } } enum class Dispatcher(val dispatcher: CoroutineDispatcher) { DEFAULT(Dispatchers.Default), UNCONFINED(Dispatchers.Unconfined) }