package kotlinx.coroutines.internal import kotlinx.coroutines.testing.* import kotlinx.atomicfu.* import kotlinx.coroutines.* import kotlinx.coroutines.debug.internal.* import org.junit.Test import kotlin.concurrent.* import kotlin.test.* /** * Concurrent test for [ConcurrentWeakMap] that tests put/get/remove from concurrent threads and is * arranged so that concurrent rehashing is also happening. */ class ConcurrentWeakMapOperationStressTest : TestBase() { private val nThreads = 10 private val batchSize = 1000 private val nSeconds = 3 * stressTestMultiplier private val count = atomic(0L) private val stop = atomic(false) private data class Key(val i: Long) @Test fun testOperations() { // We don't create queue here, because concurrent operations are enough to make it clean itself val m = ConcurrentWeakMap() val threads = Array(nThreads) { index -> thread(start = false, name = "ConcurrentWeakMapOperationStressTest-$index") { var generationOffset = 0L while (!stop.value) { val kvs = (generationOffset + batchSize * index until generationOffset + batchSize * (index + 1)) .associateBy({ Key(it) }, { it * it }) generationOffset += batchSize * nThreads for ((k, v) in kvs) { assertEquals(null, m.put(k, v)) } for ((k, v) in kvs) { assertEquals(v, m[k]) } for ((k, v) in kvs) { assertEquals(v, m.remove(k)) } for ((k, _) in kvs) { assertEquals(null, m[k]) } count.incrementAndGet() } } } val uncaughtExceptionHandler = Thread.UncaughtExceptionHandler { t, ex -> ex.printStackTrace() error("Error in thread $t", ex) } threads.forEach { it.uncaughtExceptionHandler = uncaughtExceptionHandler } threads.forEach { it.start() } var lastCount = -1L for (sec in 1..nSeconds) { Thread.sleep(1000) val count = count.value println("$sec: done $count batches") assertTrue(count > lastCount) // ensure progress lastCount = count } stop.value = true threads.forEach { it.join() } assertEquals(0, m.size, "Unexpected map state: $m") } }