Properly cancel ChannelCoroutine when the channel was closed or cancelled (#2507)

Fixes #2506
diff --git a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
index bb7feef..9721583 100644
--- a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
  */
 
 package kotlinx.coroutines.channels
@@ -642,7 +642,13 @@
         cancelInternal(cause)
 
     final override fun cancel(cause: CancellationException?) {
-        if (isClosedForReceive) return // Do not create an exception if channel is already cancelled
+        /*
+         * Do not create an exception if channel is already cancelled.
+         * Channel is closed for receive when either it is cancelled (then we are free to bail out)
+         * or was closed and elements were received.
+         * Then `onCancelIdempotent` does nothing for all implementations.
+         */
+        if (isClosedForReceive) return
         cancelInternal(cause ?: CancellationException("$classSimpleName was cancelled"))
     }
 
diff --git a/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt b/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt
index 9ceb77d..b2b257d 100644
--- a/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt
+++ b/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
  */
 
 package kotlinx.coroutines.channels
@@ -26,7 +26,7 @@
     }
 
     final override fun cancel(cause: CancellationException?) {
-        if (isClosedForReceive) return // Do not create an exception if channel is already cancelled
+        if (isCancelled) return // Do not create an exception if the coroutine (-> the channel) is already cancelled
         cancelInternal(cause ?: defaultCancellationException())
     }
 
diff --git a/kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementTest.kt b/kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementTest.kt
index 601c238..5513dab 100644
--- a/kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementTest.kt
+++ b/kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementTest.kt
@@ -1,3 +1,7 @@
+/*
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
 package kotlinx.coroutines.channels
 
 import kotlinx.atomicfu.*
@@ -115,4 +119,4 @@
             check(!_cancelled.getAndSet(true)) { "Already cancelled" }
         }
     }
-}
\ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt b/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt
index 6ddde00..194504e 100644
--- a/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt
+++ b/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
  */
 
 package kotlinx.coroutines.channels
@@ -96,6 +96,27 @@
     }
 
     @Test
+    fun testCancelWhenTheChannelIsClosed() = runTest {
+        val channel = produce<Int> {
+            send(1)
+            close()
+            expect(2)
+            launch {
+                expect(3)
+                hang { expect(5) }
+            }
+        }
+
+        expect(1)
+        channel.receive()
+        yield()
+        expect(4)
+        channel.cancel()
+        (channel as Job).join()
+        finish(6)
+    }
+
+    @Test
     fun testAwaitConsumerCancellation() = runTest {
         val parent = Job()
         val channel = produce<Int>(parent) {
diff --git a/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt b/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt
index b115150..31a929b 100644
--- a/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
  */
 
 package kotlinx.coroutines.flow
@@ -194,4 +194,17 @@
         assertEquals(listOf(1), flow.toList())
         finish(3)
     }
+
+    @Test
+    fun testCancelledOnCompletion() = runTest {
+        val myFlow = callbackFlow<Any> {
+            expect(2)
+            close()
+            hang { expect(3) }
+        }
+
+        expect(1)
+        myFlow.collect()
+        finish(4)
+    }
 }