SharedFlow: Fix scenario with concurrent emitters and cancellation of subscriber (#2359)

* Added a specific test for a problematic scenario.
* Added stress test with concurrent emitters and subscribers that come and go.

Fixes #2356
diff --git a/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt b/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt
index 427041a..feb2749 100644
--- a/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt
+++ b/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt
@@ -498,6 +498,12 @@
         }
         // Compute new buffer size -> how many values we now actually have after resume
         val newBufferSize1 = (newBufferEndIndex - head).toInt()
+        // Note: When nCollectors == 0 we resume ALL queued emitters and we might have resumed more than bufferCapacity,
+        // and newMinCollectorIndex might pointing the wrong place because of that. The easiest way to fix it is by
+        // forcing newMinCollectorIndex = newBufferEndIndex. We do not needed to update newBufferSize1 (which could be
+        // too big), because the only use of newBufferSize1 in the below code is in the minOf(replay, newBufferSize1)
+        // expression, which coerces values that are too big anyway.
+        if (nCollectors == 0) newMinCollectorIndex = newBufferEndIndex
         // Compute new replay size -> limit to replay the number of items we need, take into account that it can only grow
         var newReplayIndex = maxOf(replayIndex, newBufferEndIndex - minOf(replay, newBufferSize1))
         // adjustment for synchronous case with cancelled emitter (NO_VALUE)
diff --git a/kotlinx-coroutines-core/common/test/flow/sharing/SharedFlowScenarioTest.kt b/kotlinx-coroutines-core/common/test/flow/sharing/SharedFlowScenarioTest.kt
index c3eb2da..794553b 100644
--- a/kotlinx-coroutines-core/common/test/flow/sharing/SharedFlowScenarioTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/sharing/SharedFlowScenarioTest.kt
@@ -201,6 +201,48 @@
             emitResumes(e3); expectReplayOf(3)
         }
 
+    @Test
+    fun testSuspendedConcurrentEmitAndCancelSubscriberReplay1() =
+        testSharedFlow<Int>(MutableSharedFlow(1)) {
+            val a = subscribe("a");
+            emitRightNow(0); expectReplayOf(0)
+            collect(a, 0)
+            emitRightNow(1); expectReplayOf(1)
+            val e2 = emitSuspends(2) // suspends until 1 is collected
+            val e3 = emitSuspends(3) // suspends until 1 is collected, too
+            cancel(a) // must resume emitters 2 & 3
+            emitResumes(e2)
+            emitResumes(e3)
+            expectReplayOf(3) // but replay size is 1 so only 3 should be kept
+            // Note: originally, SharedFlow was in a broken state here with 3 elements in the buffer
+            val b = subscribe("b")
+            collect(b, 3)
+            emitRightNow(4); expectReplayOf(4)
+            collect(b, 4)
+        }
+
+    @Test
+    fun testSuspendedConcurrentEmitAndCancelSubscriberReplay1ExtraBuffer1() =
+        testSharedFlow<Int>(MutableSharedFlow( replay = 1, extraBufferCapacity = 1)) {
+            val a = subscribe("a");
+            emitRightNow(0); expectReplayOf(0)
+            collect(a, 0)
+            emitRightNow(1); expectReplayOf(1)
+            emitRightNow(2); expectReplayOf(2)
+            val e3 = emitSuspends(3) // suspends until 1 is collected
+            val e4 = emitSuspends(4) // suspends until 1 is collected, too
+            val e5 = emitSuspends(5) // suspends until 1 is collected, too
+            cancel(a) // must resume emitters 3, 4, 5
+            emitResumes(e3)
+            emitResumes(e4)
+            emitResumes(e5)
+            expectReplayOf(5)
+            val b = subscribe("b")
+            collect(b, 5)
+            emitRightNow(6); expectReplayOf(6)
+            collect(b, 6)
+        }
+
     private fun <T> testSharedFlow(
         sharedFlow: MutableSharedFlow<T>,
         scenario: suspend ScenarioDsl<T>.() -> Unit
diff --git a/kotlinx-coroutines-core/jvm/test/flow/SharedFlowStressTest.kt b/kotlinx-coroutines-core/jvm/test/flow/SharedFlowStressTest.kt
new file mode 100644
index 0000000..349b7c8
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/flow/SharedFlowStressTest.kt
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow
+
+import kotlinx.atomicfu.*
+import kotlinx.coroutines.*
+import org.junit.*
+import org.junit.Test
+import kotlin.collections.ArrayList
+import kotlin.test.*
+import kotlin.time.*
+
+@ExperimentalTime
+class SharedFlowStressTest : TestBase() {
+    private val nProducers = 5
+    private val nConsumers = 3
+    private val nSeconds = 3 * stressTestMultiplier
+
+    private lateinit var sf: MutableSharedFlow<Long>
+    private lateinit var view: SharedFlow<Long>
+
+    @get:Rule
+    val producerDispatcher = ExecutorRule(nProducers)
+    @get:Rule
+    val consumerDispatcher = ExecutorRule(nConsumers)
+
+    private val totalProduced = atomic(0L)
+    private val totalConsumed = atomic(0L)
+
+    @Test
+    fun testStressReplay1() =
+        testStress(1, 0)
+
+    @Test
+    fun testStressReplay1ExtraBuffer1() =
+        testStress(1, 1)
+
+    @Test
+    fun testStressReplay2ExtraBuffer1() =
+        testStress(2, 1)
+
+    private fun testStress(replay: Int, extraBufferCapacity: Int) = runTest {
+        sf = MutableSharedFlow(replay, extraBufferCapacity)
+        view = sf.asSharedFlow()
+        val jobs = ArrayList<Job>()
+        jobs += List(nProducers) { producerIndex ->
+            launch(producerDispatcher) {
+                var cur = producerIndex.toLong()
+                while (isActive) {
+                    sf.emit(cur)
+                    totalProduced.incrementAndGet()
+                    cur += nProducers
+                }
+            }
+        }
+        jobs += List(nConsumers) { consumerIndex ->
+            launch(consumerDispatcher) {
+                while (isActive) {
+                    view
+                        .dropWhile { it % nConsumers != consumerIndex.toLong() }
+                        .take(1)
+                        .collect {
+                            check(it % nConsumers == consumerIndex.toLong())
+                            totalConsumed.incrementAndGet()
+                        }
+                }
+            }
+        }
+        var lastProduced = 0L
+        var lastConsumed = 0L
+        for (sec in 1..nSeconds) {
+            delay(1.seconds)
+            val produced = totalProduced.value
+            val consumed = totalConsumed.value
+            println("$sec sec: produced = $produced; consumed = $consumed")
+            assertNotEquals(lastProduced, produced)
+            assertNotEquals(lastConsumed, consumed)
+            lastProduced = produced
+            lastConsumed = consumed
+        }
+        jobs.forEach { it.cancel() }
+        jobs.forEach { it.join() }
+        println("total: produced = ${totalProduced.value}; consumed = ${totalConsumed.value}")
+    }
+}
\ No newline at end of file