Merge pull request #2337 from Kotlin/version-1.4.0

Version 1.4.0
diff --git a/CHANGES.md b/CHANGES.md
index 513c28f..bce941c 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,5 +1,29 @@
 # Change log for kotlinx.coroutines
 
+## Version 1.4.0
+
+### Improvements
+
+* `StateFlow`, `SharedFlow` and corresponding operators are promoted to stable API (#2316).
+* `Flow.debounce` operator with a timeout selector based on each individual element is added (#1216, thanks to @mkano9!).
+* `CoroutineContext.job` extension property is introduced (#2159).
+* `Flow.combine` operator is reworked:
+    * Complete fairness is maintained for single-threaded dispatchers.
+    * Its performance is improved, depending on the use-case, by at least 50% (#2296).
+    * Quadratic complexity depending on the number of upstream flows is eliminated (#2296).
+    * `crossinline` and `inline`-heavy internals are removed, fixing sporadic SIGSEGV on Mediatek Android devices (#1683, #1743).
+* `Flow.zip` operator performance is improved by 40%.
+* Various APIs have been promoted to stable, or their deprecation level has been raised (#2316).
+
+### Bug fixes
+
+* Suspendable `stateIn` operator propagates the exception to the caller when upstream fails to produce an initial value (#2329).
+* Fix `SharedFlow` with replay for subscribers working at different speeds (#2325).
+* Do not fail debug agent installation when security manager does not provide access to system properties (#2311).
+* Cancelled lazy coroutines are properly cleaned up from debug agent output (#2294).
+* `BlockHound` false-positives are correctly filtered out (#2302, #2190, #2303).
+* Potential crash during a race between cancellation and upstream in `Observable.asFlow` is fixed (#2104, #2299, thanks to @LouisCAD and @drinkthestars).
+
 ## Version 1.4.0-M1
 
 ### Breaking changes
diff --git a/README.md b/README.md
index 1e72cc1..2bc45cf 100644
--- a/README.md
+++ b/README.md
@@ -2,7 +2,7 @@
 
 [![official JetBrains project](https://jb.gg/badges/official.svg)](https://confluence.jetbrains.com/display/ALL/JetBrains+on+GitHub)
 [![GitHub license](https://img.shields.io/badge/license-Apache%20License%202.0-blue.svg?style=flat)](https://www.apache.org/licenses/LICENSE-2.0)
-[![Download](https://api.bintray.com/packages/kotlin/kotlinx/kotlinx.coroutines/images/download.svg?version=1.4.0-M1) ](https://bintray.com/kotlin/kotlinx/kotlinx.coroutines/1.4.0-M1)
+[![Download](https://api.bintray.com/packages/kotlin/kotlinx/kotlinx.coroutines/images/download.svg?version=1.4.0) ](https://bintray.com/kotlin/kotlinx/kotlinx.coroutines/1.4.0)
 [![Kotlin](https://img.shields.io/badge/kotlin-1.4.0-blue.svg?logo=kotlin)](http://kotlinlang.org)
 [![Slack channel](https://img.shields.io/badge/chat-slack-green.svg?logo=slack)](https://kotlinlang.slack.com/messages/coroutines/)
 
@@ -29,7 +29,7 @@
   * [delay] and [yield] top-level suspending functions;
   * [Flow] — cold asynchronous stream with [flow][_flow] builder and comprehensive operator set ([filter], [map], etc);
   * [Channel], [Mutex], and [Semaphore] communication and synchronization primitives;
-  * [coroutineScope], [supervisorScope], [withContext], and [withTimeout] scope builders;
+  * [coroutineScope][_coroutineScope], [supervisorScope][_supervisorScope], [withContext], and [withTimeout] scope builders;
   * [MainScope()] for Android and UI applications;
   * [SupervisorJob()] and [CoroutineExceptionHandler] for supervision of coroutines hierarchies;
   * [select] expression support and more.
@@ -86,7 +86,7 @@
 <dependency>
     <groupId>org.jetbrains.kotlinx</groupId>
     <artifactId>kotlinx-coroutines-core</artifactId>
-    <version>1.4.0-M1</version>
+    <version>1.4.0</version>
 </dependency>
 ```
 
@@ -104,7 +104,7 @@
 
 ```groovy
 dependencies {
-    implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.0-M1'
+    implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.0'
 }
 ```
 
@@ -130,7 +130,7 @@
 
 ```groovy
 dependencies {
-    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.0-M1")
+    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.0")
 }
 ```
 
@@ -152,7 +152,7 @@
 ```groovy
 commonMain {
     dependencies {
-        implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.0-M1")
+        implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.0")
     }
 }
 ```
@@ -163,7 +163,7 @@
 module as dependency when using `kotlinx.coroutines` on Android:
 
 ```groovy
-implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.4.0-M1'
+implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.4.0'
 ```
 
 This gives you access to Android [Dispatchers.Main]
@@ -190,7 +190,7 @@
 ### JS
 
 [Kotlin/JS](https://kotlinlang.org/docs/reference/js-overview.html) version of `kotlinx.coroutines` is published as 
-[`kotlinx-coroutines-core-js`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-js/1.4.0-M1/jar)
+[`kotlinx-coroutines-core-js`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-js/1.4.0/jar)
 (follow the link to get the dependency declaration snippet).
  
 You can also use [`kotlinx-coroutines-core`](https://www.npmjs.com/package/kotlinx-coroutines-core) package via NPM. 
@@ -198,7 +198,7 @@
 ### Native
 
 [Kotlin/Native](https://kotlinlang.org/docs/reference/native-overview.html) version of `kotlinx.coroutines` is published as 
-[`kotlinx-coroutines-core-native`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-native/1.4.0-M1/jar)
+[`kotlinx-coroutines-core-native`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-native/1.4.0/jar)
 (follow the link to get the dependency declaration snippet).
 
 Only single-threaded code (JS-style) on Kotlin/Native is currently supported. 
@@ -227,8 +227,8 @@
 [Dispatchers.Default]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html
 [delay]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/delay.html
 [yield]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/yield.html
-[coroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/coroutine-scope.html
-[supervisorScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/supervisor-scope.html
+[_coroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/coroutine-scope.html
+[_supervisorScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/supervisor-scope.html
 [withContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/with-context.html
 [withTimeout]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/with-timeout.html
 [MainScope()]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-main-scope.html
diff --git a/benchmarks/build.gradle.kts b/benchmarks/build.gradle.kts
index 5da40f2..b60dcbc 100644
--- a/benchmarks/build.gradle.kts
+++ b/benchmarks/build.gradle.kts
@@ -31,38 +31,12 @@
     }
 }
 
-/*
- * Due to a bug in the inliner it sometimes does not remove inlined symbols (that are later renamed) from unused code paths,
- * and it breaks JMH that tries to post-process these symbols and fails because they are renamed.
- */
-val removeRedundantFiles by tasks.registering(Delete::class) {
-    delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$buildHistoOnScore\$1\$\$special\$\$inlined\$filter\$1\$1.class")
-    delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$nBlanks\$1\$\$special\$\$inlined\$map\$1\$1.class")
-    delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$score2\$1\$\$special\$\$inlined\$map\$1\$1.class")
-    delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$bonusForDoubleLetter\$1\$\$special\$\$inlined\$map\$1\$1.class")
-    delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$nBlanks\$1\$\$special\$\$inlined\$map\$1\$2\$1.class")
-    delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$bonusForDoubleLetter\$1\$\$special\$\$inlined\$map\$1\$2\$1.class")
-    delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$score2\$1\$\$special\$\$inlined\$map\$1\$2\$1.class")
-    delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOptKt\$\$special\$\$inlined\$collect\$1\$1.class")
-    delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOptKt\$\$special\$\$inlined\$collect\$2\$1.class")
-    delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$histoOfLetters\$1\$\$special\$\$inlined\$fold\$1\$1.class")
-    delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleBase\$play\$buildHistoOnScore\$1\$\$special\$\$inlined\$filter\$1\$1.class")
-    delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleBase\$play\$histoOfLetters\$1\$\$special\$\$inlined\$fold\$1\$1.class")
-    delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/SaneFlowPlaysScrabble\$play\$buildHistoOnScore\$1\$\$special\$\$inlined\$filter\$1\$1.class")
 
-    // Primes
-    delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/misc/Numbers\$\$special\$\$inlined\$filter\$1\$2\$1.class")
-    delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/misc/Numbers\$\$special\$\$inlined\$filter\$1\$1.class")
-}
-
-tasks.named("jmhRunBytecodeGenerator") {
-    dependsOn(removeRedundantFiles)
-}
 
 // It is better to use the following to run benchmarks, otherwise you may get unexpected errors:
 // ./gradlew --no-daemon cleanJmhJar jmh -Pjmh="MyBenchmark"
 extensions.configure<JMHPluginExtension>("jmh") {
-    jmhVersion = "1.21"
+    jmhVersion = "1.26"
     duplicateClassesStrategy = DuplicatesStrategy.INCLUDE
     failOnError = true
     resultFormat = "CSV"
@@ -80,7 +54,7 @@
 }
 
 dependencies {
-    compile("org.openjdk.jmh:jmh-core:1.21")
+    compile("org.openjdk.jmh:jmh-core:1.26")
     compile("io.projectreactor:reactor-core:${version("reactor")}")
     compile("io.reactivex.rxjava2:rxjava:2.1.9")
     compile("com.github.akarnokd:rxjava2-extensions:0.20.8")
diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ChannelSinkBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/ChannelSinkBenchmark.kt
index 9c7f38a..6c5b623 100644
--- a/benchmarks/src/jmh/kotlin/benchmarks/ChannelSinkBenchmark.kt
+++ b/benchmarks/src/jmh/kotlin/benchmarks/ChannelSinkBenchmark.kt
@@ -50,4 +50,22 @@
         for (i in start until (start + count))
             send(i)
     }
+
+    // Migrated from deprecated operators; they are good only for stressing channels
+
+    private fun <E> ReceiveChannel<E>.filter(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
+        GlobalScope.produce(context, onCompletion = { cancel() }) {
+            for (e in this@filter) {
+                if (predicate(e)) send(e)
+            }
+        }
+
+    private suspend inline fun <E, R> ReceiveChannel<E>.fold(initial: R, operation: (acc: R, E) -> R): R {
+        var accumulator = initial
+        consumeEach {
+            accumulator = operation(accumulator, it)
+        }
+        return accumulator
+    }
 }
+
diff --git a/benchmarks/src/jmh/kotlin/benchmarks/flow/CombineFlowsBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/flow/CombineFlowsBenchmark.kt
new file mode 100644
index 0000000..4725ced
--- /dev/null
+++ b/benchmarks/src/jmh/kotlin/benchmarks/flow/CombineFlowsBenchmark.kt
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package benchmarks.flow
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import org.openjdk.jmh.annotations.*
+import java.util.concurrent.*
+
+@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
+@Fork(value = 1)
+@BenchmarkMode(Mode.Throughput)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@State(Scope.Benchmark)
+open class CombineFlowsBenchmark {
+
+    @Param("10", "100", "1000")
+    private var size = 10
+
+    @Benchmark
+    fun combine() = runBlocking {
+        combine((1 until size).map { flowOf(it) }) { a -> a }.collect()
+    }
+
+    @Benchmark
+    fun combineTransform() = runBlocking {
+        val list = (1 until size).map { flowOf(it) }
+        combineTransform(list) { emit(it) }.collect()
+    }
+}
+
diff --git a/benchmarks/src/jmh/kotlin/benchmarks/flow/CombineTwoFlowsBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/flow/CombineTwoFlowsBenchmark.kt
new file mode 100644
index 0000000..f7fbc6c
--- /dev/null
+++ b/benchmarks/src/jmh/kotlin/benchmarks/flow/CombineTwoFlowsBenchmark.kt
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package benchmarks.flow
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.flow.internal.*
+import org.openjdk.jmh.annotations.*
+import java.util.concurrent.*
+
+@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
+@Fork(value = 1)
+@BenchmarkMode(Mode.Throughput)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@State(Scope.Benchmark)
+open class CombineTwoFlowsBenchmark {
+
+    @Param("100", "100000", "1000000")
+    private var size = 100000
+
+    @Benchmark
+    fun combinePlain() = runBlocking {
+        val flow = (1 until size.toLong()).asFlow()
+        flow.combine(flow) { a, b -> a + b }.collect()
+    }
+
+    @Benchmark
+    fun combineTransform() = runBlocking {
+        val flow = (1 until size.toLong()).asFlow()
+        flow.combineTransform(flow) { a, b -> emit(a + b) }.collect()
+    }
+
+    @Benchmark
+    fun combineVararg() = runBlocking {
+        val flow = (1 until size.toLong()).asFlow()
+        combine(listOf(flow, flow)) { arr -> arr[0] + arr[1] }.collect()
+    }
+
+    @Benchmark
+    fun combineTransformVararg() = runBlocking {
+        val flow = (1 until size.toLong()).asFlow()
+        combineTransform(listOf(flow, flow)) { arr -> emit(arr[0] + arr[1]) }.collect()
+    }
+}
diff --git a/benchmarks/src/jmh/kotlin/benchmarks/flow/NumbersBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/flow/NumbersBenchmark.kt
index 4ebb3d0..8453f5c 100644
--- a/benchmarks/src/jmh/kotlin/benchmarks/flow/NumbersBenchmark.kt
+++ b/benchmarks/src/jmh/kotlin/benchmarks/flow/NumbersBenchmark.kt
@@ -77,14 +77,14 @@
 
     @Benchmark
     fun zipRx() {
-        val numbers = rxNumbers().take(natural.toLong())
+        val numbers = rxNumbers().take(natural)
         val first = numbers
             .filter { it % 2L != 0L }
             .map { it * it }
         val second = numbers
             .filter { it % 2L == 0L }
             .map { it * it }
-        first.zipWith(second, BiFunction<Long, Long, Long> { v1, v2 -> v1 + v2 }).filter { it % 3 == 0L }.count()
+        first.zipWith(second, { v1, v2 -> v1 + v2 }).filter { it % 3 == 0L }.count()
             .blockingGet()
     }
 
@@ -98,7 +98,7 @@
 
     @Benchmark
     fun transformationsRx(): Long {
-       return rxNumbers().take(natural.toLong())
+       return rxNumbers().take(natural)
             .filter { it % 2L != 0L }
             .map { it * it }
             .filter { (it + 1) % 3 == 0L }.count()
diff --git a/buildSrc/build.gradle.kts b/buildSrc/build.gradle.kts
index 96b17a3..adcbd90 100644
--- a/buildSrc/build.gradle.kts
+++ b/buildSrc/build.gradle.kts
@@ -9,6 +9,7 @@
 }
 
 val cacheRedirectorEnabled = System.getenv("CACHE_REDIRECTOR")?.toBoolean() == true
+val buildSnapshotTrain = properties["build_snapshot_train"]?.toString()?.toBoolean() == true
 
 repositories {
     if (cacheRedirectorEnabled) {
@@ -20,6 +21,10 @@
         maven("https://dl.bintray.com/kotlin/kotlin-eap")
         maven("https://dl.bintray.com/kotlin/kotlin-dev")
     }
+
+    if (buildSnapshotTrain) {
+        mavenLocal()
+    }
 }
 
 kotlinDslPluginOptions {
@@ -30,8 +35,14 @@
     file("../gradle.properties").inputStream().use { load(it) }
 }
 
-fun version(target: String): String =
-    props.getProperty("${target}_version")
+fun version(target: String): String {
+    // Intercept reading from properties file
+    if (target == "kotlin") {
+        val snapshotVersion = properties["kotlin_snapshot_version"]
+        if (snapshotVersion != null) return snapshotVersion.toString()
+    }
+    return props.getProperty("${target}_version")
+}
 
 dependencies {
     implementation(kotlin("gradle-plugin", version("kotlin")))
diff --git a/buildSrc/settings.gradle.kts b/buildSrc/settings.gradle.kts
index e5267ea..a6da8fd 100644
--- a/buildSrc/settings.gradle.kts
+++ b/buildSrc/settings.gradle.kts
@@ -1,17 +1,18 @@
 /*
  * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
  */
-
 pluginManagement {
+    val build_snapshot_train: String? by settings
     repositories {
         val cacheRedirectorEnabled = System.getenv("CACHE_REDIRECTOR")?.toBoolean() == true
-
         if (cacheRedirectorEnabled) {
             println("Redirecting repositories for buildSrc buildscript")
-
             maven("https://cache-redirector.jetbrains.com/plugins.gradle.org/m2")
         } else {
             maven("https://plugins.gradle.org/m2")
         }
+        if (build_snapshot_train?.toBoolean() == true) {
+            mavenLocal()
+        }
     }
 }
diff --git a/docs/basics.md b/docs/basics.md
index cb64328..8aca23a 100644
--- a/docs/basics.md
+++ b/docs/basics.md
@@ -235,12 +235,12 @@
 ### Scope builder
 
 In addition to the coroutine scope provided by different builders, it is possible to declare your own scope using the
-[coroutineScope] builder. It creates a coroutine scope and does not complete until all launched children complete. 
+[coroutineScope][_coroutineScope] builder. It creates a coroutine scope and does not complete until all launched children complete. 
 
-[runBlocking] and [coroutineScope] may look similar because they both wait for their body and all its children to complete.
+[runBlocking] and [coroutineScope][_coroutineScope] may look similar because they both wait for their body and all its children to complete.
 The main difference is that the [runBlocking] method _blocks_ the current thread for waiting,
-while [coroutineScope] just suspends, releasing the underlying thread for other usages.
-Because of that difference, [runBlocking] is a regular function and [coroutineScope] is a suspending function.
+while [coroutineScope][_coroutineScope] just suspends, releasing the underlying thread for other usages.
+Because of that difference, [runBlocking] is a regular function and [coroutineScope][_coroutineScope] is a suspending function.
 
 It can be demonstrated by the following example:
 
@@ -281,7 +281,7 @@
 -->
 
 Note that right after the "Task from coroutine scope" message (while waiting for nested launch)
- "Task from runBlocking" is executed and printed — even though the [coroutineScope] is not completed yet. 
+ "Task from runBlocking" is executed and printed — even though the [coroutineScope][_coroutineScope] is not completed yet. 
 
 ### Extract function refactoring
 
@@ -403,7 +403,7 @@
 [runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html
 [Job]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/index.html
 [Job.join]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/join.html
-[coroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/coroutine-scope.html
+[_coroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/coroutine-scope.html
 [CoroutineScope()]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-scope.html
 <!--- END -->
 
diff --git a/docs/composing-suspending-functions.md b/docs/composing-suspending-functions.md
index 013076a..81b6f53 100644
--- a/docs/composing-suspending-functions.md
+++ b/docs/composing-suspending-functions.md
@@ -308,7 +308,7 @@
 Let us take the [Concurrent using async](#concurrent-using-async) example and extract a function that 
 concurrently performs `doSomethingUsefulOne` and `doSomethingUsefulTwo` and returns the sum of their results.
 Because the [async] coroutine builder is defined as an extension on [CoroutineScope], we need to have it in the 
-scope and that is what the [coroutineScope] function provides:
+scope and that is what the [coroutineScope][_coroutineScope] function provides:
 
 <div class="sample" markdown="1" theme="idea" data-highlight-only>
 
@@ -431,5 +431,5 @@
 [Job.start]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/start.html
 [GlobalScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-global-scope/index.html
 [CoroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-scope/index.html
-[coroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/coroutine-scope.html
+[_coroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/coroutine-scope.html
 <!--- END -->
diff --git a/docs/exception-handling.md b/docs/exception-handling.md
index d0b6b51..a307021 100644
--- a/docs/exception-handling.md
+++ b/docs/exception-handling.md
@@ -413,9 +413,9 @@
 
 #### Supervision scope
 
-Instead of [coroutineScope], we can use [supervisorScope] for _scoped_ concurrency. It propagates the cancellation
+Instead of [coroutineScope][_coroutineScope], we can use [supervisorScope][_supervisorScope] for _scoped_ concurrency. It propagates the cancellation
 in one direction only and cancels all its children only if it failed itself. It also waits for all children before completion
-just like [coroutineScope] does.
+just like [coroutineScope][_coroutineScope] does.
 
 <div class="sample" markdown="1" theme="idea" data-highlight-only>
 
@@ -464,7 +464,7 @@
 Another crucial difference between regular and supervisor jobs is exception handling.
 Every child should handle its exceptions by itself via the exception handling mechanism.
 This difference comes from the fact that child's failure does not propagate to the parent.
-It means that coroutines launched directly inside the [supervisorScope] _do_ use the [CoroutineExceptionHandler]
+It means that coroutines launched directly inside the [supervisorScope][_supervisorScope] _do_ use the [CoroutineExceptionHandler]
 that is installed in their scope in the same way as root coroutines do
 (see the [CoroutineExceptionHandler](#coroutineexceptionhandler) section for details). 
 
@@ -517,8 +517,8 @@
 [runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html
 [SupervisorJob()]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-supervisor-job.html
 [Job()]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job.html
-[coroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/coroutine-scope.html
-[supervisorScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/supervisor-scope.html
+[_coroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/coroutine-scope.html
+[_supervisorScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/supervisor-scope.html
 <!--- INDEX kotlinx.coroutines.channels -->
 [actor]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/actor.html
 [produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/produce.html
diff --git a/docs/flow.md b/docs/flow.md
index 2b1dfd5..4374e7a 100644
--- a/docs/flow.md
+++ b/docs/flow.md
@@ -203,7 +203,7 @@
 
 Notice the following differences in the code with the [Flow] from the earlier examples:
 
-* A builder function for [Flow] type is called [flow].
+* A builder function for [Flow] type is called [flow][_flow].
 * Code inside the `flow { ... }` builder block can suspend.
 * The `simple` function  is no longer marked with `suspend` modifier.   
 * Values are _emitted_ from the flow using [emit][FlowCollector.emit] function.
@@ -214,7 +214,7 @@
 
 ### Flows are cold
 
-Flows are _cold_ streams similar to sequences &mdash; the code inside a [flow] builder does not
+Flows are _cold_ streams similar to sequences &mdash; the code inside a [flow][_flow] builder does not
 run until the flow is collected. This becomes clear in the following example:
 
 <div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
@@ -1785,7 +1785,7 @@
  
 ### Flow cancellation checks
 
-For convenience, the [flow] builder performs additional [ensureActive] checks for cancellation on each emitted value. 
+For convenience, the [flow][_flow] builder performs additional [ensureActive] checks for cancellation on each emitted value. 
 It means that a busy loop emitting from a `flow { ... }` is cancellable:
  
 <div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
@@ -1944,7 +1944,7 @@
 [CancellationException]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-cancellation-exception/index.html
 <!--- INDEX kotlinx.coroutines.flow -->
 [Flow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/index.html
-[flow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow.html
+[_flow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow.html
 [FlowCollector.emit]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow-collector/emit.html
 [collect]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/collect.html
 [flowOf]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow-of.html
diff --git a/gradle.properties b/gradle.properties
index 18b9516..75c07d3 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -3,14 +3,14 @@
 #
 
 # Kotlin
-version=1.4.0-M1-SNAPSHOT
+version=1.4.0-SNAPSHOT
 group=org.jetbrains.kotlinx
 kotlin_version=1.4.0
 
 # Dependencies
 junit_version=4.12
 atomicfu_version=0.14.4
-knit_version=0.2.0
+knit_version=0.2.2
 html_version=0.6.8
 lincheck_version=2.7.1
 dokka_version=0.9.16-rdev-2-mpp-hacks
diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
index bb1c0f3..b86076f 100644
--- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
+++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
@@ -394,6 +394,7 @@
 	public static final fun cancelFutureOnCompletion (Lkotlinx/coroutines/Job;Ljava/util/concurrent/Future;)Lkotlinx/coroutines/DisposableHandle;
 	public static final fun ensureActive (Lkotlin/coroutines/CoroutineContext;)V
 	public static final fun ensureActive (Lkotlinx/coroutines/Job;)V
+	public static final fun getJob (Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/Job;
 	public static final fun isActive (Lkotlin/coroutines/CoroutineContext;)Z
 }
 
@@ -941,7 +942,9 @@
 	public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
 	public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
 	public static final fun debounce (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
+	public static final fun debounce (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun debounce-8GFy2Ro (Lkotlinx/coroutines/flow/Flow;D)Lkotlinx/coroutines/flow/Flow;
+	public static final fun debounceDuration (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun delayEach (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
 	public static final fun delayFlow (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
 	public static final fun distinctUntilChanged (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
@@ -989,7 +992,6 @@
 	public static final fun merge (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun merge ([Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun observeOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
-	public static final synthetic fun onCompletion (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun onCompletion (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun onEach (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun onEmpty (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
diff --git a/kotlinx-coroutines-core/common/src/Builders.common.kt b/kotlinx-coroutines-core/common/src/Builders.common.kt
index c0924a0..b7deacc 100644
--- a/kotlinx-coroutines-core/common/src/Builders.common.kt
+++ b/kotlinx-coroutines-core/common/src/Builders.common.kt
@@ -175,7 +175,6 @@
  *
  * This inline function calls [withContext].
  */
-@ExperimentalCoroutinesApi
 public suspend inline operator fun <T> CoroutineDispatcher.invoke(
     noinline block: suspend CoroutineScope.() -> T
 ): T = withContext(this, block)
diff --git a/kotlinx-coroutines-core/common/src/CompletableDeferred.kt b/kotlinx-coroutines-core/common/src/CompletableDeferred.kt
index 0605817..2f00847 100644
--- a/kotlinx-coroutines-core/common/src/CompletableDeferred.kt
+++ b/kotlinx-coroutines-core/common/src/CompletableDeferred.kt
@@ -57,7 +57,6 @@
  * This function transitions this deferred in the same ways described by [CompletableDeferred.complete] and
  * [CompletableDeferred.completeExceptionally].
  */
-@ExperimentalCoroutinesApi // since 1.3.2, tentatively until 1.4.0
 public fun <T> CompletableDeferred<T>.completeWith(result: Result<T>): Boolean =
     result.fold({ complete(it) }, { completeExceptionally(it) })
 
diff --git a/kotlinx-coroutines-core/common/src/CoroutineStart.kt b/kotlinx-coroutines-core/common/src/CoroutineStart.kt
index 05e80e3..d5791c7 100644
--- a/kotlinx-coroutines-core/common/src/CoroutineStart.kt
+++ b/kotlinx-coroutines-core/common/src/CoroutineStart.kt
@@ -55,7 +55,7 @@
      * Cancellability of coroutine at suspension points depends on the particular implementation details of
      * suspending functions as in [DEFAULT].
      */
-    @ExperimentalCoroutinesApi
+    @ExperimentalCoroutinesApi // Since 1.0.0, no ETA on stability
     ATOMIC,
 
     /**
@@ -71,7 +71,7 @@
      *
      * **Note: This is an experimental api.** Execution semantics of coroutines may change in the future when this mode is used.
      */
-    @ExperimentalCoroutinesApi
+    @ExperimentalCoroutinesApi  // Since 1.0.0, no ETA on stability
     UNDISPATCHED;
 
     /**
diff --git a/kotlinx-coroutines-core/common/src/Debug.common.kt b/kotlinx-coroutines-core/common/src/Debug.common.kt
index 013b983..949b05c 100644
--- a/kotlinx-coroutines-core/common/src/Debug.common.kt
+++ b/kotlinx-coroutines-core/common/src/Debug.common.kt
@@ -27,7 +27,7 @@
  * Copy mechanism is used only on JVM, but it might be convenient to implement it in common exceptions,
  * so on JVM their stacktraces will be properly recovered.
  */
-@ExperimentalCoroutinesApi
+@ExperimentalCoroutinesApi // Since 1.2.0, no ETA on stability
 public interface CopyableThrowable<T> where T : Throwable, T : CopyableThrowable<T> {
 
     /**
diff --git a/kotlinx-coroutines-core/common/src/Delay.kt b/kotlinx-coroutines-core/common/src/Delay.kt
index f794844..aae623d 100644
--- a/kotlinx-coroutines-core/common/src/Delay.kt
+++ b/kotlinx-coroutines-core/common/src/Delay.kt
@@ -95,7 +95,6 @@
  * }
  * ```
  */
-@ExperimentalCoroutinesApi
 public suspend fun awaitCancellation(): Nothing = suspendCancellableCoroutine {}
 
 /**
diff --git a/kotlinx-coroutines-core/common/src/Job.kt b/kotlinx-coroutines-core/common/src/Job.kt
index 754fa43..2e05635 100644
--- a/kotlinx-coroutines-core/common/src/Job.kt
+++ b/kotlinx-coroutines-core/common/src/Job.kt
@@ -635,6 +635,15 @@
 public fun CoroutineContext.cancelChildren(): Unit = cancelChildren(null)
 
 /**
+ * Retrieves the current [Job] instance from the given [CoroutineContext] or
+ * throws [IllegalStateException] if no job is present in the context.
+ *
+ * This property is a shortcut for `coroutineContext[Job]!!` and should be used only when it is known in advance that
+ * the context does have an instance of the job in it.
+ */
+public val CoroutineContext.job: Job get() = get(Job) ?: error("Current context doesn't contain Job in it: $this")
+
+/**
  * @suppress This method has bad semantics when cause is not a [CancellationException]. Use [CoroutineContext.cancelChildren].
  */
 @Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x")
diff --git a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
index 53ecf06..8edd2b3 100644
--- a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
@@ -137,14 +137,6 @@
         return sendSuspend(element)
     }
 
-    internal suspend fun sendFair(element: E) {
-        if (offerInternal(element) === OFFER_SUCCESS) {
-            yield() // Works only on fast path to properly work in sequential use-cases
-            return
-        }
-        return sendSuspend(element)
-    }
-
     public final override fun offer(element: E): Boolean {
         val result = offerInternal(element)
         return when {
diff --git a/kotlinx-coroutines-core/common/src/channels/Broadcast.kt b/kotlinx-coroutines-core/common/src/channels/Broadcast.kt
index 790580e..0193ed0 100644
--- a/kotlinx-coroutines-core/common/src/channels/Broadcast.kt
+++ b/kotlinx-coroutines-core/common/src/channels/Broadcast.kt
@@ -47,7 +47,7 @@
     val scope = GlobalScope + Dispatchers.Unconfined + CoroutineExceptionHandler { _, _ -> }
     // We can run this coroutine in the context that ignores all exceptions, because of `onCompletion = consume()`
     // which passes all exceptions upstream to the source ReceiveChannel
-    return scope.broadcast(capacity = capacity, start = start, onCompletion = consumes()) {
+    return scope.broadcast(capacity = capacity, start = start, onCompletion = { cancelConsumed(it) }) {
         for (e in this@broadcast) {
             send(e)
         }
diff --git a/kotlinx-coroutines-core/common/src/channels/BufferOverflow.kt b/kotlinx-coroutines-core/common/src/channels/BufferOverflow.kt
index 99994ea..a89c633 100644
--- a/kotlinx-coroutines-core/common/src/channels/BufferOverflow.kt
+++ b/kotlinx-coroutines-core/common/src/channels/BufferOverflow.kt
@@ -16,7 +16,6 @@
  * * [DROP_LATEST] &mdash; drop **the latest** value that is being added to the buffer right now on buffer overflow
  *   (so that buffer contents stay the same), do not suspend.
  */
-@ExperimentalCoroutinesApi
 public enum class BufferOverflow {
     /**
      * Suspend on buffer overflow.
diff --git a/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt b/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt
index 3f53b48..a75d466 100644
--- a/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt
+++ b/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt
@@ -34,9 +34,4 @@
         _channel.cancel(exception) // cancel the channel
         cancelCoroutine(exception) // cancel the job
     }
-
-    @Suppress("UNCHECKED_CAST")
-    suspend fun sendFair(element: E) {
-        (_channel as AbstractSendChannel<E>).sendFair(element)
-    }
 }
diff --git a/kotlinx-coroutines-core/common/src/channels/Channels.common.kt b/kotlinx-coroutines-core/common/src/channels/Channels.common.kt
index d19028b..398d5ca 100644
--- a/kotlinx-coroutines-core/common/src/channels/Channels.common.kt
+++ b/kotlinx-coroutines-core/common/src/channels/Channels.common.kt
@@ -3,7 +3,7 @@
  */
 @file:JvmMultifileClass
 @file:JvmName("ChannelsKt")
-@file:Suppress("DEPRECATION")
+@file:Suppress("DEPRECATION_ERROR")
 
 package kotlinx.coroutines.channels
 
@@ -52,7 +52,7 @@
  * to find bugs.
  */
 @Suppress("EXTENSION_SHADOWED_BY_MEMBER")
-@ExperimentalCoroutinesApi // since 1.3.0, tentatively stable in 1.4.0
+@ExperimentalCoroutinesApi // since 1.3.0, tentatively stable in 1.4.x
 public suspend fun <E : Any> ReceiveChannel<E>.receiveOrNull(): E? {
     @Suppress("DEPRECATION", "UNCHECKED_CAST")
     return (this as ReceiveChannel<E?>).receiveOrNull()
@@ -68,7 +68,7 @@
  * these extensions do not accidentally confuse `null` value and a normally closed channel, leading to hard
  * to find bugs.
  **/
-@ExperimentalCoroutinesApi // since 1.3.0, tentatively stable in 1.4.0
+@ExperimentalCoroutinesApi // since 1.3.0, tentatively stable in 1.4.x
 public fun <E : Any> ReceiveChannel<E>.onReceiveOrNull(): SelectClause1<E?> {
     @Suppress("DEPRECATION", "UNCHECKED_CAST")
     return (this as ReceiveChannel<E?>).onReceiveOrNull
@@ -102,8 +102,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public fun ReceiveChannel<*>.consumes(): CompletionHandler = { cause: Throwable? ->
     cancelConsumed(cause)
@@ -125,8 +125,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public fun consumesAll(vararg channels: ReceiveChannel<*>): CompletionHandler =
     { cause: Throwable? ->
@@ -150,7 +150,6 @@
  *
  * The operation is _terminal_.
  */
-@ExperimentalCoroutinesApi // since 1.3.0, tentatively graduates in 1.4.0
 public inline fun <E, R> ReceiveChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R {
     var cause: Throwable? = null
     try {
@@ -171,7 +170,6 @@
  * The operation is _terminal_.
  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-@ExperimentalCoroutinesApi // since 1.3.0, tentatively graduates in 1.4.0
 public suspend inline fun <E> ReceiveChannel<E>.consumeEach(action: (E) -> Unit): Unit =
     consume {
         for (e in this) action(e)
@@ -187,8 +185,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E> ReceiveChannel<E>.consumeEachIndexed(action: (IndexedValue<E>) -> Unit) {
     var index = 0
@@ -207,8 +205,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend fun <E> ReceiveChannel<E>.elementAt(index: Int): E =
     elementAtOrElse(index) { throw IndexOutOfBoundsException("ReceiveChannel doesn't contain element at index $index.") }
@@ -223,8 +221,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E> ReceiveChannel<E>.elementAtOrElse(index: Int, defaultValue: (Int) -> E): E =
     consume {
@@ -248,8 +246,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend fun <E> ReceiveChannel<E>.elementAtOrNull(index: Int): E? =
     consume {
@@ -273,8 +271,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E> ReceiveChannel<E>.find(predicate: (E) -> Boolean): E? =
     firstOrNull(predicate)
@@ -289,8 +287,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E> ReceiveChannel<E>.findLast(predicate: (E) -> Boolean): E? =
     lastOrNull(predicate)
@@ -306,8 +304,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend fun <E> ReceiveChannel<E>.first(): E =
     consume {
@@ -328,8 +326,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E> ReceiveChannel<E>.first(predicate: (E) -> Boolean): E {
     consumeEach {
@@ -348,8 +346,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend fun <E> ReceiveChannel<E>.firstOrNull(): E? =
     consume {
@@ -369,8 +367,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E> ReceiveChannel<E>.firstOrNull(predicate: (E) -> Boolean): E? {
     consumeEach {
@@ -389,8 +387,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend fun <E> ReceiveChannel<E>.indexOf(element: E): Int {
     var index = 0
@@ -412,8 +410,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E> ReceiveChannel<E>.indexOfFirst(predicate: (E) -> Boolean): Int {
     var index = 0
@@ -435,8 +433,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E> ReceiveChannel<E>.indexOfLast(predicate: (E) -> Boolean): Int {
     var lastIndex = -1
@@ -460,8 +458,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend fun <E> ReceiveChannel<E>.last(): E =
     consume {
@@ -485,8 +483,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E> ReceiveChannel<E>.last(predicate: (E) -> Boolean): E {
     var last: E? = null
@@ -512,8 +510,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend fun <E> ReceiveChannel<E>.lastIndexOf(element: E): Int {
     var lastIndex = -1
@@ -536,8 +534,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend fun <E> ReceiveChannel<E>.lastOrNull(): E? =
     consume {
@@ -560,8 +558,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E> ReceiveChannel<E>.lastOrNull(predicate: (E) -> Boolean): E? {
     var last: E? = null
@@ -583,8 +581,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend fun <E> ReceiveChannel<E>.single(): E =
     consume {
@@ -607,8 +605,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E> ReceiveChannel<E>.single(predicate: (E) -> Boolean): E {
     var single: E? = null
@@ -635,8 +633,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend fun <E> ReceiveChannel<E>.singleOrNull(): E? =
     consume {
@@ -659,8 +657,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E> ReceiveChannel<E>.singleOrNull(predicate: (E) -> Boolean): E? {
     var single: E? = null
@@ -686,8 +684,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public fun <E> ReceiveChannel<E>.drop(n: Int, context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<E> =
     GlobalScope.produce(context, onCompletion = consumes()) {
@@ -714,8 +712,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public fun <E> ReceiveChannel<E>.dropWhile(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
     GlobalScope.produce(context, onCompletion = consumes()) {
@@ -740,8 +738,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public fun <E> ReceiveChannel<E>.filter(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
     GlobalScope.produce(context, onCompletion = consumes()) {
@@ -762,8 +760,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public fun <E> ReceiveChannel<E>.filterIndexed(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (index: Int, E) -> Boolean): ReceiveChannel<E> =
     GlobalScope.produce(context, onCompletion = consumes()) {
@@ -785,8 +783,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterIndexedTo(destination: C, predicate: (index: Int, E) -> Boolean): C {
     consumeEachIndexed { (index, element) ->
@@ -807,8 +805,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E, C : SendChannel<E>> ReceiveChannel<E>.filterIndexedTo(destination: C, predicate: (index: Int, E) -> Boolean): C {
     consumeEachIndexed { (index, element) ->
@@ -827,8 +825,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public fun <E> ReceiveChannel<E>.filterNot(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
     filter(context) { !predicate(it) }
@@ -843,8 +841,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 @Suppress("UNCHECKED_CAST")
 public fun <E : Any> ReceiveChannel<E?>.filterNotNull(): ReceiveChannel<E> =
@@ -860,8 +858,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend fun <E : Any, C : MutableCollection<in E>> ReceiveChannel<E?>.filterNotNullTo(destination: C): C {
     consumeEach {
@@ -880,8 +878,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend fun <E : Any, C : SendChannel<E>> ReceiveChannel<E?>.filterNotNullTo(destination: C): C {
     consumeEach {
@@ -900,8 +898,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterNotTo(destination: C, predicate: (E) -> Boolean): C {
     consumeEach {
@@ -920,8 +918,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E, C : SendChannel<E>> ReceiveChannel<E>.filterNotTo(destination: C, predicate: (E) -> Boolean): C {
     consumeEach {
@@ -940,8 +938,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterTo(destination: C, predicate: (E) -> Boolean): C {
     consumeEach {
@@ -960,8 +958,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E, C : SendChannel<E>> ReceiveChannel<E>.filterTo(destination: C, predicate: (E) -> Boolean): C {
     consumeEach {
@@ -980,8 +978,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public fun <E> ReceiveChannel<E>.take(n: Int, context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<E> =
     GlobalScope.produce(context, onCompletion = consumes()) {
@@ -1006,8 +1004,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public fun <E> ReceiveChannel<E>.takeWhile(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
     GlobalScope.produce(context, onCompletion = consumes()) {
@@ -1032,8 +1030,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E, K, V> ReceiveChannel<E>.associate(transform: (E) -> Pair<K, V>): Map<K, V> =
     associateTo(LinkedHashMap(), transform)
@@ -1053,8 +1051,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E, K> ReceiveChannel<E>.associateBy(keySelector: (E) -> K): Map<K, E> =
     associateByTo(LinkedHashMap(), keySelector)
@@ -1073,8 +1071,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E, K, V> ReceiveChannel<E>.associateBy(keySelector: (E) -> K, valueTransform: (E) -> V): Map<K, V> =
     associateByTo(LinkedHashMap(), keySelector, valueTransform)
@@ -1093,8 +1091,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E, K, M : MutableMap<in K, in E>> ReceiveChannel<E>.associateByTo(destination: M, keySelector: (E) -> K): M {
     consumeEach {
@@ -1117,8 +1115,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E, K, V, M : MutableMap<in K, in V>> ReceiveChannel<E>.associateByTo(destination: M, keySelector: (E) -> K, valueTransform: (E) -> V): M {
     consumeEach {
@@ -1140,8 +1138,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E, K, V, M : MutableMap<in K, in V>> ReceiveChannel<E>.associateTo(destination: M, transform: (E) -> Pair<K, V>): M {
     consumeEach {
@@ -1161,8 +1159,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend fun <E, C : SendChannel<E>> ReceiveChannel<E>.toChannel(destination: C): C {
     consumeEach {
@@ -1181,8 +1179,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.toCollection(destination: C): C {
     consumeEach {
@@ -1210,8 +1208,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend fun <K, V> ReceiveChannel<Pair<K, V>>.toMap(): Map<K, V> =
     toMap(LinkedHashMap())
@@ -1226,8 +1224,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend fun <K, V, M : MutableMap<in K, in V>> ReceiveChannel<Pair<K, V>>.toMap(destination: M): M {
     consumeEach {
@@ -1246,8 +1244,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend fun <E> ReceiveChannel<E>.toMutableList(): MutableList<E> =
     toCollection(ArrayList())
@@ -1264,8 +1262,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend fun <E> ReceiveChannel<E>.toSet(): Set<E> =
     this.toMutableSet()
@@ -1280,8 +1278,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public fun <E, R> ReceiveChannel<E>.flatMap(context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (E) -> ReceiveChannel<R>): ReceiveChannel<R> =
     GlobalScope.produce(context, onCompletion = consumes()) {
@@ -1303,8 +1301,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E, K> ReceiveChannel<E>.groupBy(keySelector: (E) -> K): Map<K, List<E>> =
     groupByTo(LinkedHashMap(), keySelector)
@@ -1323,8 +1321,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E, K, V> ReceiveChannel<E>.groupBy(keySelector: (E) -> K, valueTransform: (E) -> V): Map<K, List<V>> =
     groupByTo(LinkedHashMap(), keySelector, valueTransform)
@@ -1342,8 +1340,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E, K, M : MutableMap<in K, MutableList<E>>> ReceiveChannel<E>.groupByTo(destination: M, keySelector: (E) -> K): M {
     consumeEach {
@@ -1368,8 +1366,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E, K, V, M : MutableMap<in K, MutableList<V>>> ReceiveChannel<E>.groupByTo(destination: M, keySelector: (E) -> K, valueTransform: (E) -> V): M {
     consumeEach {
@@ -1388,8 +1386,8 @@
  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public fun <E, R> ReceiveChannel<E>.map(context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (E) -> R): ReceiveChannel<R> =
     GlobalScope.produce(context, onCompletion = consumes()) {
@@ -1411,8 +1409,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public fun <E, R> ReceiveChannel<E>.mapIndexed(context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (index: Int, E) -> R): ReceiveChannel<R> =
     GlobalScope.produce(context, onCompletion = consumes()) {
@@ -1435,8 +1433,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public fun <E, R : Any> ReceiveChannel<E>.mapIndexedNotNull(context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (index: Int, E) -> R?): ReceiveChannel<R> =
     mapIndexed(context, transform).filterNotNull()
@@ -1454,8 +1452,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E, R : Any, C : MutableCollection<in R>> ReceiveChannel<E>.mapIndexedNotNullTo(destination: C, transform: (index: Int, E) -> R?): C {
     consumeEachIndexed { (index, element) ->
@@ -1477,8 +1475,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E, R : Any, C : SendChannel<R>> ReceiveChannel<E>.mapIndexedNotNullTo(destination: C, transform: (index: Int, E) -> R?): C {
     consumeEachIndexed { (index, element) ->
@@ -1500,8 +1498,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E, R, C : MutableCollection<in R>> ReceiveChannel<E>.mapIndexedTo(destination: C, transform: (index: Int, E) -> R): C {
     var index = 0
@@ -1524,8 +1522,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E, R, C : SendChannel<R>> ReceiveChannel<E>.mapIndexedTo(destination: C, transform: (index: Int, E) -> R): C {
     var index = 0
@@ -1546,8 +1544,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public fun <E, R : Any> ReceiveChannel<E>.mapNotNull(context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (E) -> R?): ReceiveChannel<R> =
     map(context, transform).filterNotNull()
@@ -1563,8 +1561,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E, R : Any, C : MutableCollection<in R>> ReceiveChannel<E>.mapNotNullTo(destination: C, transform: (E) -> R?): C {
     consumeEach {
@@ -1584,8 +1582,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E, R : Any, C : SendChannel<R>> ReceiveChannel<E>.mapNotNullTo(destination: C, transform: (E) -> R?): C {
     consumeEach {
@@ -1605,8 +1603,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E, R, C : MutableCollection<in R>> ReceiveChannel<E>.mapTo(destination: C, transform: (E) -> R): C {
     consumeEach {
@@ -1626,8 +1624,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E, R, C : SendChannel<R>> ReceiveChannel<E>.mapTo(destination: C, transform: (E) -> R): C {
     consumeEach {
@@ -1646,8 +1644,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public fun <E> ReceiveChannel<E>.withIndex(context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<IndexedValue<E>> =
     GlobalScope.produce(context, onCompletion = consumes()) {
@@ -1669,8 +1667,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public fun <E> ReceiveChannel<E>.distinct(): ReceiveChannel<E> =
     this.distinctBy { it }
@@ -1688,8 +1686,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public fun <E, K> ReceiveChannel<E>.distinctBy(context: CoroutineContext = Dispatchers.Unconfined, selector: suspend (E) -> K): ReceiveChannel<E> =
     GlobalScope.produce(context, onCompletion = consumes()) {
@@ -1715,8 +1713,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend fun <E> ReceiveChannel<E>.toMutableSet(): MutableSet<E> =
     toCollection(LinkedHashSet())
@@ -1731,8 +1729,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E> ReceiveChannel<E>.all(predicate: (E) -> Boolean): Boolean {
     consumeEach {
@@ -1751,8 +1749,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend fun <E> ReceiveChannel<E>.any(): Boolean =
     consume {
@@ -1769,8 +1767,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E> ReceiveChannel<E>.any(predicate: (E) -> Boolean): Boolean {
     consumeEach {
@@ -1789,8 +1787,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend fun <E> ReceiveChannel<E>.count(): Int {
     var count = 0
@@ -1808,8 +1806,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E> ReceiveChannel<E>.count(predicate: (E) -> Boolean): Int {
     var count = 0
@@ -1829,8 +1827,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E, R> ReceiveChannel<E>.fold(initial: R, operation: (acc: R, E) -> R): R {
     var accumulator = initial
@@ -1853,8 +1851,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E, R> ReceiveChannel<E>.foldIndexed(initial: R, operation: (index: Int, acc: R, E) -> R): R {
     var index = 0
@@ -1875,8 +1873,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E, R : Comparable<R>> ReceiveChannel<E>.maxBy(selector: (E) -> R): E? =
     consume {
@@ -1905,8 +1903,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend fun <E> ReceiveChannel<E>.maxWith(comparator: Comparator<in E>): E? =
     consume {
@@ -1930,8 +1928,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E, R : Comparable<R>> ReceiveChannel<E>.minBy(selector: (E) -> R): E? =
     consume {
@@ -1960,8 +1958,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend fun <E> ReceiveChannel<E>.minWith(comparator: Comparator<in E>): E? =
     consume {
@@ -1985,8 +1983,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend fun <E> ReceiveChannel<E>.none(): Boolean =
     consume {
@@ -2003,8 +2001,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E> ReceiveChannel<E>.none(predicate: (E) -> Boolean): Boolean {
     consumeEach {
@@ -2023,8 +2021,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <S, E : S> ReceiveChannel<E>.reduce(operation: (acc: S, E) -> S): S =
     consume {
@@ -2050,8 +2048,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <S, E : S> ReceiveChannel<E>.reduceIndexed(operation: (index: Int, acc: S, E) -> S): S =
     consume {
@@ -2075,8 +2073,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E> ReceiveChannel<E>.sumBy(selector: (E) -> Int): Int {
     var sum = 0
@@ -2096,8 +2094,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E> ReceiveChannel<E>.sumByDouble(selector: (E) -> Double): Double {
     var sum = 0.0
@@ -2117,8 +2115,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public fun <E : Any> ReceiveChannel<E?>.requireNoNulls(): ReceiveChannel<E> =
     map { it ?: throw IllegalArgumentException("null element found in $this.") }
@@ -2135,8 +2133,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public suspend inline fun <E> ReceiveChannel<E>.partition(predicate: (E) -> Boolean): Pair<List<E>, List<E>> {
     val first = ArrayList<E>()
@@ -2162,8 +2160,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public infix fun <E, R> ReceiveChannel<E>.zip(other: ReceiveChannel<R>): ReceiveChannel<Pair<E, R>> =
     zip(other) { t1, t2 -> t1 to t2 }
@@ -2178,8 +2176,8 @@
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  */
 @Deprecated(
-    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
-    level = DeprecationLevel.WARNING
+    message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
+    level = DeprecationLevel.ERROR
 )
 public fun <E, R, V> ReceiveChannel<E>.zip(other: ReceiveChannel<R>, context: CoroutineContext = Dispatchers.Unconfined, transform: (a: E, b: R) -> V): ReceiveChannel<V> =
     GlobalScope.produce(context, onCompletion = consumesAll(this, other)) {
diff --git a/kotlinx-coroutines-core/common/src/flow/Builders.kt b/kotlinx-coroutines-core/common/src/flow/Builders.kt
index 7e47e69..7d84cd2 100644
--- a/kotlinx-coroutines-core/common/src/flow/Builders.kt
+++ b/kotlinx-coroutines-core/common/src/flow/Builders.kt
@@ -204,8 +204,8 @@
 @FlowPreview
 @Deprecated(
     message = "Use channelFlow with awaitClose { } instead of flowViaChannel and invokeOnClose { }.",
-    level = DeprecationLevel.WARNING
-)
+    level = DeprecationLevel.ERROR
+) // To be removed in 1.4.x
 @Suppress("DeprecatedCallableAddReplaceWith")
 public fun <T> flowViaChannel(
     bufferSize: Int = BUFFERED,
diff --git a/kotlinx-coroutines-core/common/src/flow/Migration.kt b/kotlinx-coroutines-core/common/src/flow/Migration.kt
index 59873eb..11969a4 100644
--- a/kotlinx-coroutines-core/common/src/flow/Migration.kt
+++ b/kotlinx-coroutines-core/common/src/flow/Migration.kt
@@ -367,10 +367,10 @@
     message = "Flow analogue of 'combineLatest' is 'combine'",
     replaceWith = ReplaceWith("combine(this, other, other2, transform)")
 )
-public inline fun <T1, T2, T3, R> Flow<T1>.combineLatest(
+public fun <T1, T2, T3, R> Flow<T1>.combineLatest(
     other: Flow<T2>,
     other2: Flow<T3>,
-    crossinline transform: suspend (T1, T2, T3) -> R
+    transform: suspend (T1, T2, T3) -> R
 ) = combine(this, other, other2, transform)
 
 @Deprecated(
@@ -378,11 +378,11 @@
     message = "Flow analogue of 'combineLatest' is 'combine'",
     replaceWith = ReplaceWith("combine(this, other, other2, other3, transform)")
 )
-public inline fun <T1, T2, T3, T4, R> Flow<T1>.combineLatest(
+public fun <T1, T2, T3, T4, R> Flow<T1>.combineLatest(
     other: Flow<T2>,
     other2: Flow<T3>,
     other3: Flow<T4>,
-    crossinline transform: suspend (T1, T2, T3, T4) -> R
+    transform: suspend (T1, T2, T3, T4) -> R
 ) = combine(this, other, other2, other3, transform)
 
 @Deprecated(
@@ -390,12 +390,12 @@
     message = "Flow analogue of 'combineLatest' is 'combine'",
     replaceWith = ReplaceWith("combine(this, other, other2, other3, transform)")
 )
-public inline fun <T1, T2, T3, T4, T5, R> Flow<T1>.combineLatest(
+public fun <T1, T2, T3, T4, T5, R> Flow<T1>.combineLatest(
     other: Flow<T2>,
     other2: Flow<T3>,
     other3: Flow<T4>,
     other4: Flow<T5>,
-    crossinline transform: suspend (T1, T2, T3, T4, T5) -> R
+    transform: suspend (T1, T2, T3, T4, T5) -> R
 ): Flow<R> = combine(this, other, other2, other3, other4, transform)
 
 /**
@@ -434,7 +434,6 @@
     message = "'scanReduce' was renamed to 'runningReduce' to be consistent with Kotlin standard library",
     replaceWith = ReplaceWith("runningReduce(operation)")
 )
-@ExperimentalCoroutinesApi
 public fun <T> Flow<T>.scanReduce(operation: suspend (accumulator: T, value: T) -> T): Flow<T> = runningReduce(operation)
 
 @Deprecated(
@@ -482,4 +481,4 @@
     message = "Flow analogue of 'cache()' is 'shareIn' with unlimited replay and 'started = SharingStared.Lazily' argument'",
     replaceWith = ReplaceWith("this.shareIn(scope, Int.MAX_VALUE, started = SharingStared.Lazily)")
 )
-public fun <T> Flow<T>.cache(): Flow<T> = noImpl()
\ No newline at end of file
+public fun <T> Flow<T>.cache(): Flow<T> = noImpl()
diff --git a/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt b/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt
index 88dc775..427041a 100644
--- a/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt
+++ b/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt
@@ -108,7 +108,6 @@
  * might be added to this interface in the future, but is stable for use.
  * Use the `MutableSharedFlow(replay, ...)` constructor function to create an implementation.
  */
-@ExperimentalCoroutinesApi
 public interface SharedFlow<out T> : Flow<T> {
     /**
      * A snapshot of the replay cache.
@@ -138,7 +137,6 @@
  * might be added to this interface in the future, but is stable for use.
  * Use the `MutableSharedFlow(...)` constructor function to create an implementation.
  */
-@ExperimentalCoroutinesApi
 public interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
     /**
      * Tries to emit a [value] to this shared flow without suspending. It returns `true` if the value was
@@ -202,7 +200,6 @@
  *   supported only when `replay > 0` or `extraBufferCapacity > 0`).
  */
 @Suppress("FunctionName", "UNCHECKED_CAST")
-@ExperimentalCoroutinesApi
 public fun <T> MutableSharedFlow(
     replay: Int = 0,
     extraBufferCapacity: Int = 0,
@@ -326,7 +323,7 @@
         var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
         val emitted = synchronized(this) {
             if (tryEmitLocked(value)) {
-                resumes = findSlotsToResumeLocked()
+                resumes = findSlotsToResumeLocked(resumes)
                 true
             } else {
                 false
@@ -422,7 +419,7 @@
             // recheck buffer under lock again (make sure it is really full)
             if (tryEmitLocked(value)) {
                 cont.resume(Unit)
-                resumes = findSlotsToResumeLocked()
+                resumes = findSlotsToResumeLocked(resumes)
                 return@lock null
             }
             // add suspended emitter to the buffer
@@ -430,7 +427,7 @@
                 enqueueLocked(it)
                 queueSize++ // added to queue of waiting emitters
                 // synchronous shared flow might rendezvous with waiting emitter
-                if (bufferCapacity == 0) resumes = findSlotsToResumeLocked()
+                if (bufferCapacity == 0) resumes = findSlotsToResumeLocked(resumes)
             }
         }
         // outside of the lock: register dispose on cancellation
@@ -512,6 +509,8 @@
         updateBufferLocked(newReplayIndex, newMinCollectorIndex, newBufferEndIndex, newQueueEndIndex)
         // just in case we've moved all buffered emitters and have NO_VALUE's at the tail now
         cleanupTailLocked()
+        // We need to wake up suspended collectors if any emitters were resumed here
+        if (resumes.isNotEmpty()) resumes = findSlotsToResumeLocked(resumes)
         return resumes
     }
 
@@ -598,9 +597,9 @@
         }
     }
 
-    private fun findSlotsToResumeLocked(): Array<Continuation<Unit>?> {
-        var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
-        var resumeCount = 0
+    private fun findSlotsToResumeLocked(resumesIn: Array<Continuation<Unit>?>): Array<Continuation<Unit>?> {
+        var resumes: Array<Continuation<Unit>?> = resumesIn
+        var resumeCount = resumesIn.size
         forEachSlotLocked loop@{ slot ->
             val cont = slot.cont ?: return@loop // only waiting slots
             if (tryPeekLocked(slot) < 0) return@loop // only slots that can peek a value
diff --git a/kotlinx-coroutines-core/common/src/flow/SharingStarted.kt b/kotlinx-coroutines-core/common/src/flow/SharingStarted.kt
index 935efda..19e5fa3 100644
--- a/kotlinx-coroutines-core/common/src/flow/SharingStarted.kt
+++ b/kotlinx-coroutines-core/common/src/flow/SharingStarted.kt
@@ -12,7 +12,6 @@
  * A command emitted by [SharingStarted] implementations to control the sharing coroutine in
  * the [shareIn] and [stateIn] operators.
  */
-@ExperimentalCoroutinesApi
 public enum class SharingCommand {
     /**
      * Starts sharing, launching collection of the upstream flow.
@@ -75,19 +74,16 @@
  * The completion of the `command` flow normally has no effect (the upstream flow keeps running if it was running).
  * The failure of the `command` flow cancels the sharing coroutine and the upstream flow.
  */
-@ExperimentalCoroutinesApi
 public interface SharingStarted {
     public companion object {
         /**
          * Sharing is started immediately and never stops.
          */
-        @ExperimentalCoroutinesApi
         public val Eagerly: SharingStarted = StartedEagerly()
 
         /**
          * Sharing is started when the first subscriber appears and never stops.
          */
-        @ExperimentalCoroutinesApi
         public val Lazily: SharingStarted = StartedLazily()
 
         /**
@@ -108,7 +104,6 @@
          * are negative.
          */
         @Suppress("FunctionName")
-        @ExperimentalCoroutinesApi
         public fun WhileSubscribed(
             stopTimeoutMillis: Long = 0,
             replayExpirationMillis: Long = Long.MAX_VALUE
@@ -143,7 +138,6 @@
  */
 @Suppress("FunctionName")
 @ExperimentalTime
-@ExperimentalCoroutinesApi
 public fun SharingStarted.Companion.WhileSubscribed(
     stopTimeout: Duration = Duration.ZERO,
     replayExpiration: Duration = Duration.INFINITE
diff --git a/kotlinx-coroutines-core/common/src/flow/StateFlow.kt b/kotlinx-coroutines-core/common/src/flow/StateFlow.kt
index 8587606..a9a4ed3 100644
--- a/kotlinx-coroutines-core/common/src/flow/StateFlow.kt
+++ b/kotlinx-coroutines-core/common/src/flow/StateFlow.kt
@@ -135,7 +135,6 @@
  * might be added to this interface in the future, but is stable for use.
  * Use the `MutableStateFlow(value)` constructor function to create an implementation.
  */
-@ExperimentalCoroutinesApi
 public interface StateFlow<out T> : SharedFlow<T> {
     /**
      * The current value of this state flow.
@@ -156,7 +155,6 @@
  * might be added to this interface in the future, but is stable for use.
  * Use the `MutableStateFlow()` constructor function to create an implementation.
  */
-@ExperimentalCoroutinesApi
 public interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> {
     /**
      * The current value of this state flow.
@@ -180,7 +178,6 @@
  * Creates a [MutableStateFlow] with the given initial [value].
  */
 @Suppress("FunctionName")
-@ExperimentalCoroutinesApi
 public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = StateFlowImpl(value ?: NULL)
 
 // ------------------------------------ Implementation ------------------------------------
@@ -380,4 +377,4 @@
         return this
     }
     return fuseSharedFlow(context, capacity, onBufferOverflow)
-}
\ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt
index e53ef35..f3730cc 100644
--- a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt
+++ b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt
@@ -224,19 +224,33 @@
     private val emitRef: suspend (T) -> Unit = { downstream.emit(it) } // allocate suspend function ref once on creation
 
     override suspend fun emit(value: T): Unit =
-        withContextUndispatched(emitContext, countOrElement, emitRef, value)
+        withContextUndispatched(emitContext, value, countOrElement, emitRef)
 }
 
 // Efficiently computes block(value) in the newContext
-private suspend fun <T, V> withContextUndispatched(
+internal suspend fun <T, V> withContextUndispatched(
     newContext: CoroutineContext,
+    value: V,
     countOrElement: Any = threadContextElements(newContext), // can be precomputed for speed
-    block: suspend (V) -> T, value: V
+    block: suspend (V) -> T
 ): T =
     suspendCoroutineUninterceptedOrReturn { uCont ->
         withCoroutineContext(newContext, countOrElement) {
-            block.startCoroutineUninterceptedOrReturn(value, Continuation(newContext) {
-                uCont.resumeWith(it)
-            })
+            block.startCoroutineUninterceptedOrReturn(value, StackFrameContinuation(uCont, newContext))
         }
     }
+
+// Continuation that links the caller with uCont via a walkable CoroutineStackFrame
+private class StackFrameContinuation<T>(
+    private val uCont: Continuation<T>, override val context: CoroutineContext
+) : Continuation<T>, CoroutineStackFrame {
+
+    override val callerFrame: CoroutineStackFrame?
+        get() = uCont as? CoroutineStackFrame
+
+    override fun resumeWith(result: Result<T>) {
+        uCont.resumeWith(result)
+    }
+
+    override fun getStackTraceElement(): StackTraceElement? = null
+}
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt b/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt
index 67da32c..bbdebd0 100644
--- a/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt
+++ b/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt
@@ -9,133 +9,135 @@
 import kotlinx.coroutines.channels.*
 import kotlinx.coroutines.flow.*
 import kotlinx.coroutines.internal.*
-import kotlinx.coroutines.selects.*
+import kotlin.coroutines.*
+import kotlin.coroutines.intrinsics.*
 
-internal fun getNull(): Symbol = NULL // Workaround for JS BE bug
-
-internal suspend fun <T1, T2, R> FlowCollector<R>.combineTransformInternal(
-    first: Flow<T1>, second: Flow<T2>,
-    transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
-) {
-    coroutineScope {
-        val firstChannel = asFairChannel(first)
-        val secondChannel = asFairChannel(second)
-        var firstValue: Any? = null
-        var secondValue: Any? = null
-        var firstIsClosed = false
-        var secondIsClosed = false
-        while (!firstIsClosed || !secondIsClosed) {
-            select<Unit> {
-                onReceive(firstIsClosed, firstChannel, { firstIsClosed = true }) { value ->
-                    firstValue = value
-                    if (secondValue !== null) {
-                        transform(getNull().unbox(firstValue), getNull().unbox(secondValue) as T2)
-                    }
-                }
-
-                onReceive(secondIsClosed, secondChannel, { secondIsClosed = true }) { value ->
-                    secondValue = value
-                    if (firstValue !== null) {
-                        transform(getNull().unbox(firstValue) as T1, getNull().unbox(secondValue) as T2)
-                    }
-                }
-            }
-        }
-    }
-}
+private typealias Update = IndexedValue<Any?>
 
 @PublishedApi
 internal suspend fun <R, T> FlowCollector<R>.combineInternal(
     flows: Array<out Flow<T>>,
-    arrayFactory: () -> Array<T?>,
+    arrayFactory: () -> Array<T?>?, // Array factory is required to workaround array typing on JVM
     transform: suspend FlowCollector<R>.(Array<T>) -> Unit
-): Unit = coroutineScope {
+): Unit = flowScope { // flow scope so any cancellation within the source flow will cancel the whole scope
     val size = flows.size
-    val channels = Array(size) { asFairChannel(flows[it]) }
+    if (size == 0) return@flowScope // bail-out for empty input
     val latestValues = arrayOfNulls<Any?>(size)
-    val isClosed = Array(size) { false }
-    var nonClosed = size
-    var remainingNulls = size
-    // See flow.combine(other) for explanation.
-    while (nonClosed != 0) {
-        select<Unit> {
-            for (i in 0 until size) {
-                onReceive(isClosed[i], channels[i], { isClosed[i] = true; --nonClosed }) { value ->
-                    if (latestValues[i] == null) --remainingNulls
-                    latestValues[i] = value
-                    if (remainingNulls != 0) return@onReceive
-                    val arguments = arrayFactory()
-                    for (index in 0 until size) {
-                        arguments[index] = NULL.unbox(latestValues[index])
+    latestValues.fill(UNINITIALIZED) // Smaller bytecode & faster than Array(size) { UNINITIALIZED }
+    val resultChannel = Channel<Update>(size)
+    val nonClosed = LocalAtomicInt(size)
+    var remainingAbsentValues = size
+    for (i in 0 until size) {
+        // Coroutine per flow that keeps track of its value and sends result to downstream
+        launch {
+            try {
+                flows[i].collect { value ->
+                    resultChannel.send(Update(i, value))
+                    yield() // Emulate fairness, giving each flow chance to emit
+                }
+            } finally {
+                // Close the channel when there are no more flows
+                if (nonClosed.decrementAndGet() == 0) {
+                    resultChannel.close()
+                }
+            }
+        }
+    }
+
+    /*
+     * Batch-receive optimization: read updates in batches, but bail-out
+     * as soon as we encounter two values from the same source
+     */
+    val lastReceivedEpoch = ByteArray(size)
+    var currentEpoch: Byte = 0
+    while (true) {
+        ++currentEpoch
+        // Start batch
+        // The very first receive in epoch should be suspending
+        var element = resultChannel.receiveOrNull() ?: break // Channel is closed, nothing to do here
+        while (true) {
+            val index = element.index
+            // Update values
+            val previous = latestValues[index]
+            latestValues[index] = element.value
+            if (previous === UNINITIALIZED) --remainingAbsentValues
+            // Check epoch
+            // Received the second value from the same flow in the same epoch -- bail out
+            if (lastReceivedEpoch[index] == currentEpoch) break
+            lastReceivedEpoch[index] = currentEpoch
+            element = resultChannel.poll() ?: break
+        }
+
+        // Process batch result if there is enough data
+        if (remainingAbsentValues == 0) {
+            /*
+             * If arrayFactory returns null, then we can avoid array copy because
+             * it's our own safe transformer that immediately deconstructs the array
+             */
+            val results = arrayFactory()
+            if (results == null) {
+                transform(latestValues as Array<T>)
+            } else {
+                (latestValues as Array<T?>).copyInto(results)
+                transform(results as Array<T>)
+            }
+        }
+    }
+}
+
+internal fun <T1, T2, R> zipImpl(flow: Flow<T1>, flow2: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> =
+    unsafeFlow {
+        coroutineScope {
+            val second = produce<Any> {
+                flow2.collect { value ->
+                    return@collect channel.send(value ?: NULL)
+                }
+            }
+
+            /*
+             * This approach only works with rendezvous channel and is required to enforce correctness
+             * in the following scenario:
+             * ```
+             * val f1 = flow { emit(1); delay(Long.MAX_VALUE) }
+             * val f2 = flowOf(1)
+             * f1.zip(f2) { ... }
+             * ```
+             *
+             * Invariant: this clause is invoked only when all elements from the channel were processed (=> rendezvous restriction).
+             */
+            val collectJob = Job()
+            (second as SendChannel<*>).invokeOnClose {
+                // Optimization to avoid AFE allocation when the other flow is done
+                if (collectJob.isActive) collectJob.cancel(AbortFlowException(this@unsafeFlow))
+            }
+
+            try {
+                /*
+                 * Non-trivial undispatched (because we are in the right context and there is no structured concurrency)
+                 * hierarchy:
+                 * - Outer coroutineScope that owns the whole zip process
+                 * - First flow is collected by the child of coroutineScope, collectJob.
+                 *    So it can be safely cancelled as soon as the second flow is done
+                 * - **But** the downstream MUST NOT be cancelled when the second flow is done,
+                 *    so we emit to downstream from coroutineScope job.
+                 * Typically, such hierarchy requires coroutine for collector that communicates
+                 * with the coroutine scope via a channel, but it's way too expensive, so
+                 * we are using this trick instead.
+                 */
+                val scopeContext = coroutineContext
+                val cnt = threadContextElements(scopeContext)
+                withContextUndispatched(coroutineContext + collectJob, Unit) {
+                    flow.collect { value ->
+                        withContextUndispatched(scopeContext, Unit, cnt) {
+                            val otherValue = second.receiveOrNull() ?: throw AbortFlowException(this@unsafeFlow)
+                            emit(transform(value, NULL.unbox(otherValue)))
+                        }
                     }
-                    transform(arguments as Array<T>)
                 }
+            } catch (e: AbortFlowException) {
+                e.checkOwnership(owner = this@unsafeFlow)
+            } finally {
+                if (!second.isClosedForReceive) second.cancel()
             }
         }
     }
-}
-
-private inline fun SelectBuilder<Unit>.onReceive(
-    isClosed: Boolean,
-    channel: ReceiveChannel<Any>,
-    crossinline onClosed: () -> Unit,
-    noinline onReceive: suspend (value: Any) -> Unit
-) {
-    if (isClosed) return
-    @Suppress("DEPRECATION")
-    channel.onReceiveOrNull {
-        // TODO onReceiveOrClosed when boxing issues are fixed
-        if (it === null) onClosed()
-        else onReceive(it)
-    }
-}
-
-// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed
-private fun CoroutineScope.asFairChannel(flow: Flow<*>): ReceiveChannel<Any> = produce {
-    val channel = channel as ChannelCoroutine<Any>
-    flow.collect { value ->
-        return@collect channel.sendFair(value ?: NULL)
-    }
-}
-
-internal fun <T1, T2, R> zipImpl(flow: Flow<T1>, flow2: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = unsafeFlow {
-    coroutineScope {
-        val first = asChannel(flow)
-        val second = asChannel(flow2)
-        /*
-         * This approach only works with rendezvous channel and is required to enforce correctness
-         * in the following scenario:
-         * ```
-         * val f1 = flow { emit(1); delay(Long.MAX_VALUE) }
-         * val f2 = flowOf(1)
-         * f1.zip(f2) { ... }
-         * ```
-         *
-         * Invariant: this clause is invoked only when all elements from the channel were processed (=> rendezvous restriction).
-         */
-        (second as SendChannel<*>).invokeOnClose {
-            if (!first.isClosedForReceive) first.cancel(AbortFlowException(this@unsafeFlow))
-        }
-
-        val otherIterator = second.iterator()
-        try {
-            first.consumeEach { value ->
-                if (!otherIterator.hasNext()) {
-                    return@consumeEach
-                }
-                emit(transform(NULL.unbox(value), NULL.unbox(otherIterator.next())))
-            }
-        } catch (e: AbortFlowException) {
-            e.checkOwnership(owner = this@unsafeFlow)
-        } finally {
-            if (!second.isClosedForReceive) second.cancel(AbortFlowException(this@unsafeFlow))
-        }
-    }
-}
-
-// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed
-private fun CoroutineScope.asChannel(flow: Flow<*>): ReceiveChannel<Any> = produce {
-    flow.collect { value ->
-        return@collect channel.send(value ?: NULL)
-    }
-}
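Reviewer note on the batch-receive loop in the new `combineInternal`: the epoch bookkeeping can be followed in isolation with a minimal, coroutine-free sketch. It assumes the updates are already sitting in a plain queue instead of a `Channel`. The names `drainBatches` and `Update`, and the `ArrayDeque` stand-in, are hypothetical and only illustrate the idea; the patch itself uses a `ByteArray` of epochs and a suspending first receive.

```kotlin
// Hypothetical stand-in for the internal `Update` typealias (IndexedValue<Any?>).
data class Update(val index: Int, val value: Any?)

// Marker for "no value received from this source yet" (assumption for this sketch).
private val UNINITIALIZED = Any()

/**
 * Drains [queue] in batches. A batch ends when the queue is empty or when a
 * second update from the same source arrives within the current epoch.
 * Returns a snapshot of the latest values after every batch in which each
 * source has produced at least one value.
 */
fun drainBatches(size: Int, queue: ArrayDeque<Update>): List<List<Any?>> {
    val latest = arrayOfNulls<Any?>(size).also { it.fill(UNINITIALIZED) }
    val lastReceivedEpoch = IntArray(size) // Int instead of Byte keeps the sketch simple
    var remainingAbsent = size
    var epoch = 0
    val snapshots = mutableListOf<List<Any?>>()
    while (true) {
        epoch++
        // First receive of the epoch: the real code suspends on the channel here.
        var element = queue.removeFirstOrNull() ?: break
        while (true) {
            val previous = latest[element.index]
            latest[element.index] = element.value
            if (previous === UNINITIALIZED) remainingAbsent--
            // Second value from the same source in this epoch: stop batching.
            if (lastReceivedEpoch[element.index] == epoch) break
            lastReceivedEpoch[element.index] = epoch
            // Non-suspending poll: take only what is already queued.
            element = queue.removeFirstOrNull() ?: break
        }
        if (remainingAbsent == 0) snapshots += latest.toList()
    }
    return snapshots
}

fun main() {
    val queue = ArrayDeque(
        listOf(Update(0, "a1"), Update(1, "b1"), Update(0, "a2"), Update(1, "b2"))
    )
    println(drainBatches(2, queue)) // prints [[a2, b1], [a2, b2]]
}
```

In this run the first batch ends at `a2`, the second value from source 0 in the same epoch, so `transform` would see `(a2, b1)` rather than `(a1, b1)`. Intermediate combinations that are already buffered get conflated, which is where the batching saves work.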
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/NullSurrogate.kt b/kotlinx-coroutines-core/common/src/flow/internal/NullSurrogate.kt
index 22e1957..f20deb2 100644
--- a/kotlinx-coroutines-core/common/src/flow/internal/NullSurrogate.kt
+++ b/kotlinx-coroutines-core/common/src/flow/internal/NullSurrogate.kt
@@ -11,11 +11,20 @@
 /**
  * This value is used a a surrogate `null` value when needed.
  * It should never leak to the outside world.
+ * Its usages are typically paired with [Symbol.unbox] usages.
  */
 @JvmField
 @SharedImmutable
 internal val NULL = Symbol("NULL")
 
+/**
+ * Symbol to indicate that the value is not yet initialized.
+ * It should never leak to the outside world.
+ */
+@JvmField
+@SharedImmutable
+internal val UNINITIALIZED = Symbol("UNINITIALIZED")
+
 /*
  * Symbol used to indicate that the flow is complete.
  * It should never leak to the outside world.
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
index aa55fea..c95b4be 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
@@ -64,39 +64,61 @@
  */
 @FlowPreview
 public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T> {
-    require(timeoutMillis > 0) { "Debounce timeout should be positive" }
-    return scopedFlow { downstream ->
-        // Actually Any, KT-30796
-        val values = produce<Any?>(capacity = Channel.CONFLATED) {
-            collect { value -> send(value ?: NULL) }
-        }
-        var lastValue: Any? = null
-        while (lastValue !== DONE) {
-            select<Unit> {
-                // Should be receiveOrClosed when boxing issues are fixed
-                values.onReceiveOrNull {
-                    if (it == null) {
-                        if (lastValue != null) downstream.emit(NULL.unbox(lastValue))
-                        lastValue = DONE
-                    } else {
-                        lastValue = it
-                    }
-                }
-
-                lastValue?.let { value ->
-                    // set timeout when lastValue != null
-                    onTimeout(timeoutMillis) {
-                        lastValue = null // Consume the value
-                        downstream.emit(NULL.unbox(value))
-                    }
-                }
-            }
-        }
-    }
+    require(timeoutMillis >= 0L) { "Debounce timeout should not be negative" }
+    if (timeoutMillis == 0L) return this
+    return debounceInternal { timeoutMillis }
 }
 
 /**
  * Returns a flow that mirrors the original flow, but filters out values
+ * that are followed by the newer values within the given [timeout][timeoutMillis].
+ * The latest value is always emitted.
+ *
+ * A variation of [debounce] that allows specifying the timeout value dynamically.
+ *
+ * Example:
+ *
+ * ```kotlin
+ * flow {
+ *     emit(1)
+ *     delay(90)
+ *     emit(2)
+ *     delay(90)
+ *     emit(3)
+ *     delay(1010)
+ *     emit(4)
+ *     delay(1010)
+ *     emit(5)
+ * }.debounce {
+ *     if (it == 1) {
+ *         0L
+ *     } else {
+ *         1000L
+ *     }
+ * }
+ * ```
+ * <!--- KNIT example-delay-02.kt -->
+ *
+ * produces the following emissions
+ *
+ * ```text
+ * 1, 3, 4, 5
+ * ```
+ * <!--- TEST -->
+ *
+ * Note that the resulting flow does not emit anything as long as the original flow emits
+ * items faster than every [timeoutMillis] milliseconds.
+ *
+ * @param timeoutMillis [T] is the emitted value and the return value is the timeout in milliseconds.
+ */
+@FlowPreview
+@OptIn(kotlin.experimental.ExperimentalTypeInference::class)
+@OverloadResolutionByLambdaReturnType
+public fun <T> Flow<T>.debounce(timeoutMillis: (T) -> Long): Flow<T> =
+    debounceInternal(timeoutMillis)
+
+/**
+ * Returns a flow that mirrors the original flow, but filters out values
  * that are followed by the newer values within the given [timeout].
  * The latest value is always emitted.
  *
@@ -129,7 +151,104 @@
  */
 @ExperimentalTime
 @FlowPreview
-public fun <T> Flow<T>.debounce(timeout: Duration): Flow<T> = debounce(timeout.toDelayMillis())
+public fun <T> Flow<T>.debounce(timeout: Duration): Flow<T> =
+    debounce(timeout.toDelayMillis())
+
+/**
+ * Returns a flow that mirrors the original flow, but filters out values
+ * that are followed by the newer values within the given [timeout].
+ * The latest value is always emitted.
+ *
+ * A variation of [debounce] that allows specifying the timeout value dynamically.
+ *
+ * Example:
+ *
+ * ```kotlin
+ * flow {
+ *     emit(1)
+ *     delay(90.milliseconds)
+ *     emit(2)
+ *     delay(90.milliseconds)
+ *     emit(3)
+ *     delay(1010.milliseconds)
+ *     emit(4)
+ *     delay(1010.milliseconds)
+ *     emit(5)
+ * }.debounce {
+ *     if (it == 1) {
+ *         0.milliseconds
+ *     } else {
+ *         1000.milliseconds
+ *     }
+ * }
+ * ```
+ * <!--- KNIT example-delay-duration-02.kt -->
+ *
+ * produces the following emissions
+ *
+ * ```text
+ * 1, 3, 4, 5
+ * ```
+ * <!--- TEST -->
+ *
+ * Note that the resulting flow does not emit anything as long as the original flow emits
+ * items faster than every [timeout] unit.
+ *
+ * @param timeout [T] is the emitted value and the return value is the timeout as a [Duration].
+ */
+@ExperimentalTime
+@FlowPreview
+@JvmName("debounceDuration")
+@OptIn(kotlin.experimental.ExperimentalTypeInference::class)
+@OverloadResolutionByLambdaReturnType
+public fun <T> Flow<T>.debounce(timeout: (T) -> Duration): Flow<T> =
+    debounceInternal { emittedItem ->
+        timeout(emittedItem).toDelayMillis()
+    }
+
+private fun <T> Flow<T>.debounceInternal(timeoutMillisSelector: (T) -> Long) : Flow<T> =
+    scopedFlow { downstream ->
+        // Produce the values using the default (rendezvous) channel
+        // Note: the actual type is Any, KT-30796
+        val values = produce<Any?> {
+            collect { value -> send(value ?: NULL) }
+        }
+        // Now consume the values
+        var lastValue: Any? = null
+        while (lastValue !== DONE) {
+            var timeoutMillis = 0L // will be always computed when lastValue != null
+            // Compute timeout for this value
+            if (lastValue != null) {
+                timeoutMillis = timeoutMillisSelector(NULL.unbox(lastValue))
+                require(timeoutMillis >= 0L) { "Debounce timeout should not be negative" }
+                if (timeoutMillis == 0L) {
+                    downstream.emit(NULL.unbox(lastValue))
+                    lastValue = null // Consume the value
+                }
+            }
+            // assert invariant: lastValue != null implies timeoutMillis > 0
+            assert { lastValue == null || timeoutMillis > 0 }
+            // wait for the next value with timeout
+            select<Unit> {
+                // Set timeout when lastValue exists and is not consumed yet
+                if (lastValue != null) {
+                    onTimeout(timeoutMillis) {
+                        downstream.emit(NULL.unbox(lastValue))
+                        lastValue = null // Consume the value
+                    }
+                }
+                // Should be receiveOrClosed when boxing issues are fixed
+                values.onReceiveOrNull { value ->
+                    if (value == null) {
+                        if (lastValue != null) downstream.emit(NULL.unbox(lastValue))
+                        lastValue = DONE
+                    } else {
+                        lastValue = value
+                    }
+                }
+            }
+        }
+    }
 
 /**
  * Returns a flow that emits only the latest value emitted by the original flow during the given sampling [period][periodMillis].
@@ -144,7 +263,7 @@
  *     }
  * }.sample(200)
  * ```
- * <!--- KNIT example-delay-02.kt -->
+ * <!--- KNIT example-delay-03.kt -->
  *
  * produces the following emissions
  *
@@ -152,7 +271,7 @@
  * 1, 3, 5, 7, 9
  * ```
  * <!--- TEST -->
- * 
+ *
  * Note that the latest element is not emitted if it does not fit into the sampling window.
  */
 @FlowPreview
@@ -215,7 +334,7 @@
  *     }
  * }.sample(200.milliseconds)
  * ```
- * <!--- KNIT example-delay-duration-02.kt -->
+ * <!--- KNIT example-delay-duration-03.kt -->
  *
  * produces the following emissions
  *
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt b/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt
index 3ffe5fe..244af9a 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt
@@ -71,7 +71,6 @@
  *     .collect { println(it) } // prints Begin, a, b, c
  * ```
  */
-@ExperimentalCoroutinesApi
 public fun <T> Flow<T>.onStart(
     action: suspend FlowCollector<T>.() -> Unit
 ): Flow<T> = unsafeFlow { // Note: unsafe flow is used here, but safe collector is used to invoke start action
@@ -142,7 +141,6 @@
  * In case of failure or cancellation, any attempt to emit additional elements throws the corresponding exception.
  * Use [catch] if you need to suppress failure and replace it with emission of elements.
  */
-@ExperimentalCoroutinesApi
 public fun <T> Flow<T>.onCompletion(
     action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit
 ): Flow<T> = unsafeFlow { // Note: unsafe flow is used here, but safe collector is used to invoke completion action
@@ -158,7 +156,12 @@
         throw e
     }
     // Normal completion
-    SafeCollector(this, currentCoroutineContext()).invokeSafely(action, null)
+    val sc = SafeCollector(this, currentCoroutineContext())
+    try {
+        sc.action(null)
+    } finally {
+        sc.releaseIntercepted()
+    }
 }
 
 /**
@@ -173,7 +176,6 @@
  * }.collect { println(it) } // prints 1, 2
  * ```
  */
-@ExperimentalCoroutinesApi
 public fun <T> Flow<T>.onEmpty(
     action: suspend FlowCollector<T>.() -> Unit
 ): Flow<T> = unsafeFlow {
@@ -198,12 +200,6 @@
     }
 }
 
-// It was only released in 1.3.0-M2, remove in 1.4.0
-/** @suppress */
-@Deprecated(level = DeprecationLevel.HIDDEN, message = "binary compatibility with a version w/o FlowCollector receiver")
-public fun <T> Flow<T>.onCompletion(action: suspend (cause: Throwable?) -> Unit): Flow<T> =
-    onCompletion { action(it) }
-
 private suspend fun <T> FlowCollector<T>.invokeSafely(
     action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit,
     cause: Throwable?
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Share.kt b/kotlinx-coroutines-core/common/src/flow/operators/Share.kt
index 4dd89ee..fe737a5 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Share.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Share.kt
@@ -132,7 +132,6 @@
  * @param started the strategy that controls when sharing is started and stopped.
  * @param replay the number of values replayed to new subscribers (cannot be negative, defaults to zero).
  */
-@ExperimentalCoroutinesApi
 public fun <T> Flow<T>.shareIn(
     scope: CoroutineScope,
     started: SharingStarted,
@@ -297,7 +296,6 @@
  *   This value is also used when the state flow is reset using the [SharingStarted.WhileSubscribed] strategy
  *   with the `replayExpirationMillis` parameter.
  */
-@ExperimentalCoroutinesApi
 public fun <T> Flow<T>.stateIn(
     scope: CoroutineScope,
     started: SharingStarted,
@@ -316,7 +314,6 @@
  *
  * @param scope the coroutine scope in which sharing is started.
  */
-@ExperimentalCoroutinesApi
 public suspend fun <T> Flow<T>.stateIn(scope: CoroutineScope): StateFlow<T> {
     val config = configureSharing(1)
     val result = CompletableDeferred<StateFlow<T>>()
@@ -330,13 +327,20 @@
     result: CompletableDeferred<StateFlow<T>>
 ) {
     launch(context) {
-        var state: MutableStateFlow<T>? = null
-        upstream.collect { value ->
-            state?.let { it.value = value } ?: run {
-                state = MutableStateFlow(value).also {
-                    result.complete(it.asStateFlow())
+        try {
+            var state: MutableStateFlow<T>? = null
+            upstream.collect { value ->
+                state?.let { it.value = value } ?: run {
+                    state = MutableStateFlow(value).also {
+                        result.complete(it.asStateFlow())
+                    }
                 }
             }
+        } catch (e: Throwable) {
+            // Notify the waiter that the flow has failed
+            result.completeExceptionally(e)
+            // But still cancel the scope where state was (not) produced
+            throw e
         }
     }
 }
@@ -346,14 +350,12 @@
 /**
  * Represents this mutable shared flow as a read-only shared flow.
  */
-@ExperimentalCoroutinesApi
 public fun <T> MutableSharedFlow<T>.asSharedFlow(): SharedFlow<T> =
     ReadonlySharedFlow(this)
 
 /**
  * Represents this mutable state flow as a read-only state flow.
  */
-@ExperimentalCoroutinesApi
 public fun <T> MutableStateFlow<T>.asStateFlow(): StateFlow<T> =
     ReadonlyStateFlow(this)
 
@@ -384,7 +386,6 @@
  *
  * The receiver of the [action] is [FlowCollector], so `onSubscription` can emit additional elements.
  */
-@ExperimentalCoroutinesApi
 public fun <T> SharedFlow<T>.onSubscription(action: suspend FlowCollector<T>.() -> Unit): SharedFlow<T> =
     SubscribedSharedFlow(this, action)
 
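Reviewer note on the suspending `stateIn` fix in this file: a minimal sketch of what a caller can now observe when the upstream fails before producing its first value, assuming kotlinx.coroutines 1.4.0 or later. The `SupervisorJob` and the `CoroutineExceptionHandler` are choices made for this sketch, not something the patch requires. They are there because the sharing coroutine rethrows the failure into its scope after completing the deferred result.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Returns the message of the failure seen by the caller of stateIn, or null if none.
suspend fun firstValueFailure(upstream: Flow<Int>): String? {
    // Assumption: a supervisor scope with a handler, so the rethrown failure
    // of the sharing coroutine is logged instead of reaching a default handler.
    val handler = CoroutineExceptionHandler { _, e ->
        println("sharing coroutine failed: ${e.message}")
    }
    val scope = CoroutineScope(SupervisorJob() + handler)
    return try {
        upstream.stateIn(scope) // suspends until the first value or a failure
        null
    } catch (e: IllegalStateException) {
        e.message
    } finally {
        scope.cancel()
    }
}

fun main() = runBlocking {
    val failing = flow<Int> { throw IllegalStateException("no initial value") }
    println("caller observed: ${firstValueFailure(failing)}")
}
```

Before this change the launched coroutine failed without completing the `CompletableDeferred`, so the caller had no failure to catch at the `stateIn` call site.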
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt b/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt
index ec66181..790c089 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt
@@ -8,7 +8,6 @@
 
 package kotlinx.coroutines.flow
 
-import kotlinx.coroutines.*
 import kotlinx.coroutines.flow.internal.*
 import kotlin.jvm.*
 import kotlinx.coroutines.flow.flow as safeFlow
@@ -31,9 +30,7 @@
  */
 @JvmName("flowCombine")
 public fun <T1, T2, R> Flow<T1>.combine(flow: Flow<T2>, transform: suspend (a: T1, b: T2) -> R): Flow<R> = flow {
-    combineTransformInternal(this@combine, flow) { a, b ->
-        emit(transform(a, b))
-    }
+    combineInternal(arrayOf(this@combine, flow), nullArrayFactory(), { emit(transform(it[0] as T1, it[1] as T2)) })
 }
 
 /**
@@ -75,10 +72,11 @@
 public fun <T1, T2, R> Flow<T1>.combineTransform(
     flow: Flow<T2>,
     @BuilderInference transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
-): Flow<R> = safeFlow {
-    combineTransformInternal(this@combineTransform, flow) { a, b ->
-        transform(a, b)
-    }
+): Flow<R> = combineTransformUnsafe(this, flow) { args: Array<*> ->
+    transform(
+        args[0] as T1,
+        args[1] as T2
+    )
 }
 
 /**
@@ -102,7 +100,7 @@
     flow: Flow<T1>,
     flow2: Flow<T2>,
     @BuilderInference transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
-): Flow<R> = combineTransform(flow, flow2) { args: Array<*> ->
+): Flow<R> = combineTransformUnsafe(flow, flow2) { args: Array<*> ->
     transform(
         args[0] as T1,
         args[1] as T2
@@ -113,12 +111,12 @@
  * Returns a [Flow] whose values are generated with [transform] function by combining
  * the most recently emitted values by each flow.
  */
-public inline fun <T1, T2, T3, R> combine(
+public fun <T1, T2, T3, R> combine(
     flow: Flow<T1>,
     flow2: Flow<T2>,
     flow3: Flow<T3>,
-    @BuilderInference crossinline transform: suspend (T1, T2, T3) -> R
-): Flow<R> = combine(flow, flow2, flow3) { args: Array<*> ->
+    @BuilderInference transform: suspend (T1, T2, T3) -> R
+): Flow<R> = combineUnsafe(flow, flow2, flow3) { args: Array<*> ->
     transform(
         args[0] as T1,
         args[1] as T2,
@@ -132,12 +130,12 @@
  * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
  * generic function that may transform emitted element, skip it or emit it multiple times.
  */
-public inline fun <T1, T2, T3, R> combineTransform(
+public fun <T1, T2, T3, R> combineTransform(
     flow: Flow<T1>,
     flow2: Flow<T2>,
     flow3: Flow<T3>,
-    @BuilderInference crossinline transform: suspend FlowCollector<R>.(T1, T2, T3) -> Unit
-): Flow<R> = combineTransform(flow, flow2, flow3) { args: Array<*> ->
+    @BuilderInference transform: suspend FlowCollector<R>.(T1, T2, T3) -> Unit
+): Flow<R> = combineTransformUnsafe(flow, flow2, flow3) { args: Array<*> ->
     transform(
         args[0] as T1,
         args[1] as T2,
@@ -149,12 +147,12 @@
  * Returns a [Flow] whose values are generated with [transform] function by combining
  * the most recently emitted values by each flow.
  */
-public inline fun <T1, T2, T3, T4, R> combine(
+public fun <T1, T2, T3, T4, R> combine(
     flow: Flow<T1>,
     flow2: Flow<T2>,
     flow3: Flow<T3>,
     flow4: Flow<T4>,
-    crossinline transform: suspend (T1, T2, T3, T4) -> R
+    transform: suspend (T1, T2, T3, T4) -> R
 ): Flow<R> = combine(flow, flow2, flow3, flow4) { args: Array<*> ->
     transform(
         args[0] as T1,
@@ -170,13 +168,13 @@
  * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
  * generic function that may transform emitted element, skip it or emit it multiple times.
  */
-public inline fun <T1, T2, T3, T4, R> combineTransform(
+public fun <T1, T2, T3, T4, R> combineTransform(
     flow: Flow<T1>,
     flow2: Flow<T2>,
     flow3: Flow<T3>,
     flow4: Flow<T4>,
-    @BuilderInference crossinline transform: suspend FlowCollector<R>.(T1, T2, T3, T4) -> Unit
-): Flow<R> = combineTransform(flow, flow2, flow3, flow4) { args: Array<*> ->
+    @BuilderInference transform: suspend FlowCollector<R>.(T1, T2, T3, T4) -> Unit
+): Flow<R> = combineTransformUnsafe(flow, flow2, flow3, flow4) { args: Array<*> ->
     transform(
         args[0] as T1,
         args[1] as T2,
@@ -189,14 +187,14 @@
  * Returns a [Flow] whose values are generated with [transform] function by combining
  * the most recently emitted values by each flow.
  */
-public inline fun <T1, T2, T3, T4, T5, R> combine(
+public fun <T1, T2, T3, T4, T5, R> combine(
     flow: Flow<T1>,
     flow2: Flow<T2>,
     flow3: Flow<T3>,
     flow4: Flow<T4>,
     flow5: Flow<T5>,
-    crossinline transform: suspend (T1, T2, T3, T4, T5) -> R
-): Flow<R> = combine(flow, flow2, flow3, flow4, flow5) { args: Array<*> ->
+    transform: suspend (T1, T2, T3, T4, T5) -> R
+): Flow<R> = combineUnsafe(flow, flow2, flow3, flow4, flow5) { args: Array<*> ->
     transform(
         args[0] as T1,
         args[1] as T2,
@@ -212,14 +210,14 @@
  * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
  * generic function that may transform emitted element, skip it or emit it multiple times.
  */
-public inline fun <T1, T2, T3, T4, T5, R> combineTransform(
+public fun <T1, T2, T3, T4, T5, R> combineTransform(
     flow: Flow<T1>,
     flow2: Flow<T2>,
     flow3: Flow<T3>,
     flow4: Flow<T4>,
     flow5: Flow<T5>,
-    @BuilderInference crossinline transform: suspend FlowCollector<R>.(T1, T2, T3, T4, T5) -> Unit
-): Flow<R> = combineTransform(flow, flow2, flow3, flow4, flow5) { args: Array<*> ->
+    @BuilderInference transform: suspend FlowCollector<R>.(T1, T2, T3, T4, T5) -> Unit
+): Flow<R> = combineTransformUnsafe(flow, flow2, flow3, flow4, flow5) { args: Array<*> ->
     transform(
         args[0] as T1,
         args[1] as T2,
@@ -253,6 +251,31 @@
     combineInternal(flows, { arrayOfNulls(flows.size) }, { transform(it) })
 }
 
+/*
+ * Same as combine, but does not copy the array on each emission; the existing array is
+ * deconstructed instead. Used in overloads that accept FunctionN instead of Function<Array<R>>
+ */
+private inline fun <reified T, R> combineUnsafe(
+    vararg flows: Flow<T>,
+    crossinline transform: suspend (Array<T>) -> R
+): Flow<R> = flow {
+    combineInternal(flows, nullArrayFactory(), { emit(transform(it)) })
+}
+
+/*
+ * Same as combineTransform, but does not copy the array on each emission; the existing array is
+ * deconstructed instead. Used in overloads that accept FunctionN instead of Function<Array<R>>
+ */
+private inline fun <reified T, R> combineTransformUnsafe(
+    vararg flows: Flow<T>,
+    @BuilderInference crossinline transform: suspend FlowCollector<R>.(Array<T>) -> Unit
+): Flow<R> = safeFlow {
+    combineInternal(flows, nullArrayFactory(), { transform(it) })
+}
+
+// Saves a bunch of anonymous classes
+private fun <T> nullArrayFactory(): () -> Array<T>? = { null }
+
 /**
  * Returns a [Flow] whose values are generated with [transform] function by combining
  * the most recently emitted values by each flow.
@@ -298,5 +321,11 @@
  *     println(it) // Will print "1a 2b 3c"
  * }
  * ```
+ *
+ * ### Buffering
+ *
+ * The upstream flow is collected sequentially in the same coroutine without any buffering, while the
+ * [other] flow is collected concurrently as if `buffer(0)` were used. See the documentation of the [buffer] operator
+ * for an explanation. You can use additional calls to the [buffer] operator as needed for more concurrency.
  */
 public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = zipImpl(this, other, transform)
diff --git a/kotlinx-coroutines-core/common/src/internal/LocalAtomics.common.kt b/kotlinx-coroutines-core/common/src/internal/LocalAtomics.common.kt
new file mode 100644
index 0000000..bcfb932
--- /dev/null
+++ b/kotlinx-coroutines-core/common/src/internal/LocalAtomics.common.kt
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.internal
+
+/*
+ * These are atomics that are used as local variables
+ * where atomicfu doesn't support its transformations.
+ *
+ * They have a `Local` prefix to avoid AFU clashes during star-imports
+ */
+internal expect class LocalAtomicInt(value: Int) {
+    fun get(): Int
+    fun set(value: Int)
+    fun decrementAndGet(): Int
+}
+
+internal inline var LocalAtomicInt.value
+    get() = get()
+    set(value) = set(value)
diff --git a/kotlinx-coroutines-core/common/test/EnsureActiveTest.kt b/kotlinx-coroutines-core/common/test/JobExtensionsTest.kt
similarity index 81%
rename from kotlinx-coroutines-core/common/test/EnsureActiveTest.kt
rename to kotlinx-coroutines-core/common/test/JobExtensionsTest.kt
index 89e749c..b335926 100644
--- a/kotlinx-coroutines-core/common/test/EnsureActiveTest.kt
+++ b/kotlinx-coroutines-core/common/test/JobExtensionsTest.kt
@@ -4,9 +4,10 @@
 
 package kotlinx.coroutines
 
+import kotlin.coroutines.*
 import kotlin.test.*
 
-class EnsureActiveTest : TestBase() {
+class JobExtensionsTest : TestBase() {
 
     private val job = Job()
     private val scope = CoroutineScope(job + CoroutineExceptionHandler { _, _ ->  })
@@ -81,4 +82,14 @@
         assertTrue(exception is JobCancellationException)
         assertTrue(exception.cause is TestException)
     }
+
+    @Test
+    fun testJobExtension() = runTest {
+        assertSame(coroutineContext[Job]!!, coroutineContext.job)
+        assertSame(NonCancellable, NonCancellable.job)
+        assertSame(job, job.job)
+        assertFailsWith<IllegalStateException> { EmptyCoroutineContext.job }
+        assertFailsWith<IllegalStateException> { Dispatchers.Default.job }
+        assertFailsWith<IllegalStateException> { (Dispatchers.Default + CoroutineName("")).job }
+    }
 }
diff --git a/kotlinx-coroutines-core/common/test/channels/ChannelsTest.kt b/kotlinx-coroutines-core/common/test/channels/ChannelsTest.kt
index ba786d5..fb704c5 100644
--- a/kotlinx-coroutines-core/common/test/channels/ChannelsTest.kt
+++ b/kotlinx-coroutines-core/common/test/channels/ChannelsTest.kt
@@ -51,123 +51,6 @@
     }
 
     @Test
-    fun testAssociate() = runTest {
-        assertEquals(testList.associate { it * 2 to it * 3 },
-            testList.asReceiveChannel().associate { it * 2 to it * 3 }.toMap())
-    }
-
-    @Test
-    fun testAssociateBy() = runTest {
-        assertEquals(testList.associateBy { it % 2 }, testList.asReceiveChannel().associateBy { it % 2 })
-    }
-
-    @Test
-    fun testAssociateBy2() = runTest {
-        assertEquals(testList.associateBy({ it * 2}, { it * 3 }),
-            testList.asReceiveChannel().associateBy({ it * 2}, { it * 3 }).toMap())
-    }
-
-    @Test
-    fun testDistinct() = runTest {
-        assertEquals(testList.map { it % 2 }.distinct(), testList.asReceiveChannel().map { it % 2 }.distinct().toList())
-    }
-
-    @Test
-    fun testDistinctBy() = runTest {
-        assertEquals(testList.distinctBy { it % 2 }.toList(), testList.asReceiveChannel().distinctBy { it % 2 }.toList())
-    }
-
-    @Test
-    fun testToCollection() = runTest {
-        val target = mutableListOf<Int>()
-        testList.asReceiveChannel().toCollection(target)
-        assertEquals(testList, target)
-    }
-
-    @Test
-    fun testDrop() = runTest {
-        for (i in 0..testList.size) {
-            assertEquals(testList.drop(i), testList.asReceiveChannel().drop(i).toList(), "Drop $i")
-        }
-    }
-
-    @Test
-    fun testElementAtOrElse() = runTest {
-        assertEquals(testList.elementAtOrElse(2) { 42 }, testList.asReceiveChannel().elementAtOrElse(2) { 42 })
-        assertEquals(testList.elementAtOrElse(9) { 42 }, testList.asReceiveChannel().elementAtOrElse(9) { 42 })
-    }
-
-    @Test
-    fun testFirst() = runTest {
-        assertEquals(testList.first(), testList.asReceiveChannel().first())
-        for (i in testList) {
-            assertEquals(testList.first { it == i }, testList.asReceiveChannel().first { it == i })
-        }
-        try {
-            testList.asReceiveChannel().first { it == 9 }
-            fail()
-        } catch (nse: NoSuchElementException) {
-        }
-    }
-
-    @Test
-    fun testFirstOrNull() = runTest {
-        assertEquals(testList.firstOrNull(), testList.asReceiveChannel().firstOrNull())
-        assertEquals(testList.firstOrNull { it == 2 }, testList.asReceiveChannel().firstOrNull { it == 2 })
-        assertEquals(testList.firstOrNull { it == 9 }, testList.asReceiveChannel().firstOrNull { it == 9 })
-    }
-
-    @Test
-    fun testFlatMap() = runTest {
-        assertEquals(testList.flatMap { (0..it).toList() }, testList.asReceiveChannel().flatMap { (0..it).asReceiveChannel() }.toList())
-
-    }
-
-    @Test
-    fun testFold() = runTest {
-        assertEquals(testList.fold(mutableListOf(42)) { acc, e -> acc.apply { add(e) } },
-            testList.asReceiveChannel().fold(mutableListOf(42)) { acc, e -> acc.apply { add(e) } }.toList())
-    }
-
-    @Test
-    fun testFoldIndexed() = runTest {
-        assertEquals(testList.foldIndexed(mutableListOf(42)) { index, acc, e -> acc.apply { add(index + e) } },
-            testList.asReceiveChannel().foldIndexed(mutableListOf(42)) { index, acc, e -> acc.apply { add(index + e) } }.toList())
-    }
-
-    @Test
-    fun testGroupBy() = runTest {
-        assertEquals(testList.groupBy { it % 2 }, testList.asReceiveChannel().groupBy { it % 2 })
-    }
-
-    @Test
-    fun testGroupBy2() = runTest {
-        assertEquals(testList.groupBy({ -it }, { it + 100 }), testList.asReceiveChannel().groupBy({ -it }, { it + 100 }).toMap())
-
-    }
-
-    @Test
-    fun testMap() = runTest {
-        assertEquals(testList.map { it + 10 }, testList.asReceiveChannel().map { it + 10 }.toList())
-
-    }
-
-    @Test
-    fun testMapToCollection() = runTest {
-        val c = mutableListOf<Int>()
-        testList.asReceiveChannel().mapTo(c) { it + 10 }
-        assertEquals(testList.map { it + 10 }, c)
-    }
-
-    @Test
-    fun testMapToSendChannel() = runTest {
-        val c = produce<Int> {
-            testList.asReceiveChannel().mapTo(channel) { it + 10 }
-        }
-        assertEquals(testList.map { it + 10 }, c.toList())
-    }
-
-    @Test
     fun testEmptyList() = runTest {
         assertTrue(emptyList<Nothing>().asReceiveChannel().toList().isEmpty())
     }
@@ -178,413 +61,6 @@
 
     }
 
-    @Test
-    fun testEmptySet() = runTest {
-        assertTrue(emptyList<Nothing>().asReceiveChannel().toSet().isEmpty())
-
-    }
-
-    @Test
-    fun testToSet() = runTest {
-        assertEquals(testList.toSet(), testList.asReceiveChannel().toSet())
-    }
-
-    @Test
-    fun testToMutableSet() = runTest {
-        assertEquals(testList.toMutableSet(), testList.asReceiveChannel().toMutableSet())
-    }
-
-    @Test
-    fun testEmptySequence() = runTest {
-        val channel = Channel<Nothing>()
-        channel.close()
-
-        assertEquals(emptyList<Nothing>().asReceiveChannel().count(), 0)
-    }
-
-    @Test
-    fun testEmptyMap() = runTest {
-        val channel = Channel<Pair<Nothing, Nothing>>()
-        channel.close()
-
-        assertTrue(channel.toMap().isEmpty())
-    }
-
-    @Test
-    fun testToMap() = runTest {
-        val values = testList.map { it to it.toString() }
-        assertEquals(values.toMap(), values.asReceiveChannel().toMap())
-    }
-
-    @Test
-    fun testReduce() = runTest {
-        assertEquals(testList.reduce { acc, e -> acc * e },
-            testList.asReceiveChannel().reduce { acc, e -> acc * e })
-    }
-
-    @Test
-    fun testReduceIndexed() = runTest {
-        assertEquals(testList.reduceIndexed { index, acc, e -> index + acc * e },
-            testList.asReceiveChannel().reduceIndexed { index, acc, e -> index + acc * e })
-    }
-
-    @Test
-    fun testTake() = runTest {
-        for (i in 0..testList.size) {
-            assertEquals(testList.take(i), testList.asReceiveChannel().take(i).toList())
-        }
-    }
-
-    @Test
-    fun testPartition() = runTest {
-        assertEquals(testList.partition { it % 2 == 0 }, testList.asReceiveChannel().partition { it % 2 == 0 })
-    }
-
-    @Test
-    fun testZip() = runTest {
-        val other = listOf("a", "b")
-        assertEquals(testList.zip(other), testList.asReceiveChannel().zip(other.asReceiveChannel()).toList())
-    }
-
-    @Test
-    fun testElementAt() = runTest {
-        testList.indices.forEach { i ->
-            assertEquals(testList[i], testList.asReceiveChannel().elementAt(i))
-        }
-    }
-
-    @Test
-    fun testElementAtOrNull() = runTest {
-        testList.indices.forEach { i ->
-            assertEquals(testList[i], testList.asReceiveChannel().elementAtOrNull(i))
-        }
-        assertNull(testList.asReceiveChannel().elementAtOrNull(-1))
-        assertNull(testList.asReceiveChannel().elementAtOrNull(testList.size))
-    }
-
-    @Test
-    fun testFind() = runTest {
-        repeat(3) { mod ->
-            assertEquals(testList.find { it % 2 == mod },
-                testList.asReceiveChannel().find { it % 2 == mod })
-        }
-    }
-
-    @Test
-    fun testFindLast() = runTest {
-        repeat(3) { mod ->
-            assertEquals(testList.findLast { it % 2 == mod }, testList.asReceiveChannel().findLast { it % 2 == mod })
-        }
-    }
-
-    @Test
-    fun testIndexOf() = runTest {
-        repeat(testList.size + 1) { i ->
-            assertEquals(testList.indexOf(i), testList.asReceiveChannel().indexOf(i))
-        }
-    }
-
-    @Test
-    fun testLastIndexOf() = runTest {
-        repeat(testList.size + 1) { i ->
-            assertEquals(testList.lastIndexOf(i), testList.asReceiveChannel().lastIndexOf(i))
-        }
-    }
-
-    @Test
-    fun testIndexOfFirst() = runTest {
-        repeat(3) { mod ->
-            assertEquals(testList.indexOfFirst { it % 2 == mod },
-                testList.asReceiveChannel().indexOfFirst { it % 2 == mod })
-        }
-    }
-
-    @Test
-    fun testIndexOfLast() = runTest {
-        repeat(3) { mod ->
-            assertEquals(testList.indexOfLast { it % 2 != mod },
-                testList.asReceiveChannel().indexOfLast { it % 2 != mod })
-        }
-    }
-
-    @Test
-    fun testLastOrNull() = runTest {
-        assertEquals(testList.lastOrNull(), testList.asReceiveChannel().lastOrNull())
-        assertNull(emptyList<Int>().asReceiveChannel().lastOrNull())
-    }
-
-    @Test
-    fun testSingleOrNull() = runTest {
-        assertEquals(1, listOf(1).asReceiveChannel().singleOrNull())
-        assertNull(listOf(1, 2).asReceiveChannel().singleOrNull())
-        assertNull(emptyList<Int>().asReceiveChannel().singleOrNull())
-        repeat(testList.size + 1) { i ->
-            assertEquals(testList.singleOrNull { it == i },
-                testList.asReceiveChannel().singleOrNull { it == i })
-        }
-        repeat(3) { mod ->
-            assertEquals(testList.singleOrNull { it % 2 == mod },
-                testList.asReceiveChannel().singleOrNull { it % 2 == mod })
-        }
-    }
-
-    @Test
-    fun testDropWhile() = runTest {
-        repeat(3) { mod ->
-            assertEquals(testList.dropWhile { it % 2 == mod },
-                testList.asReceiveChannel().dropWhile { it % 2 == mod }.toList())
-        }
-    }
-
-    @Test
-    fun testFilter() = runTest {
-        repeat(3) { mod ->
-            assertEquals(testList.filter { it % 2 == mod },
-                testList.asReceiveChannel().filter { it % 2 == mod }.toList())
-        }
-    }
-
-    @Test
-    fun testFilterToCollection() = runTest {
-        repeat(3) { mod ->
-            val c = mutableListOf<Int>()
-            testList.asReceiveChannel().filterTo(c) { it % 2 == mod }
-            assertEquals(testList.filter { it % 2 == mod }, c)
-        }
-    }
-
-    @Test
-    fun testFilterToSendChannel() = runTest {
-        repeat(3) { mod ->
-            val c = produce<Int> {
-                testList.asReceiveChannel().filterTo(channel) { it % 2 == mod }
-            }
-            assertEquals(testList.filter { it % 2 == mod }, c.toList())
-        }
-    }
-
-    @Test
-    fun testFilterNot() = runTest {
-        repeat(3) { mod ->
-            assertEquals(testList.filterNot { it % 2 == mod },
-                testList.asReceiveChannel().filterNot { it % 2 == mod }.toList())
-        }
-    }
-
-    @Test
-    fun testFilterNotToCollection() = runTest {
-        repeat(3) { mod ->
-            val c = mutableListOf<Int>()
-            testList.asReceiveChannel().filterNotTo(c) { it % 2 == mod }
-            assertEquals(testList.filterNot { it % 2 == mod }, c)
-        }
-    }
-
-    @Test
-    fun testFilterNotToSendChannel() = runTest {
-        repeat(3) { mod ->
-            val c = produce<Int> {
-                testList.asReceiveChannel().filterNotTo(channel) { it % 2 == mod }
-            }
-            assertEquals(testList.filterNot { it % 2 == mod }, c.toList())
-        }
-    }
-
-    @Test
-    fun testFilterNotNull() = runTest {
-        repeat(3) { mod ->
-            assertEquals(
-                testList.mapNotNull { it.takeIf { it % 2 == mod } },
-                testList.asReceiveChannel().map { it.takeIf { it % 2 == mod } }.filterNotNull().toList())
-        }
-    }
-
-    @Test
-    fun testFilterNotNullToCollection() = runTest {
-        repeat(3) { mod ->
-            val c = mutableListOf<Int>()
-            testList.asReceiveChannel().map { it.takeIf { it % 2 == mod } }.filterNotNullTo(c)
-            assertEquals(testList.mapNotNull { it.takeIf { it % 2 == mod } }, c)
-        }
-    }
-
-    @Test
-    fun testFilterNotNullToSendChannel() = runTest {
-        repeat(3) { mod ->
-            val c = produce<Int> {
-                testList.asReceiveChannel().map { it.takeIf { it % 2 == mod } }.filterNotNullTo(channel)
-            }
-            assertEquals(testList.mapNotNull { it.takeIf { it % 2 == mod } }, c.toList())
-        }
-    }
-
-    @Test
-    fun testFilterIndexed() = runTest {
-        repeat(3) { mod ->
-            assertEquals(testList.filterIndexed { index, _ ->  index % 2 == mod },
-                testList.asReceiveChannel().filterIndexed { index, _ ->  index % 2 == mod }.toList())
-        }
-    }
-
-    @Test
-    fun testFilterIndexedToCollection() = runTest {
-        repeat(3) { mod ->
-            val c = mutableListOf<Int>()
-            testList.asReceiveChannel().filterIndexedTo(c) { index, _ ->  index % 2 == mod }
-            assertEquals(testList.filterIndexed { index, _ ->  index % 2 == mod }, c)
-        }
-    }
-
-    @Test
-    fun testFilterIndexedToChannel() = runTest {
-        repeat(3) { mod ->
-            val c = produce<Int> {
-                testList.asReceiveChannel().filterIndexedTo(channel) { index, _ -> index % 2 == mod }
-            }
-            assertEquals(testList.filterIndexed { index, _ ->  index % 2 == mod }, c.toList())
-        }
-    }
-
-    @Test
-    fun testTakeWhile() = runTest {
-        repeat(3) { mod ->
-            assertEquals(testList.takeWhile { it % 2 != mod },
-                testList.asReceiveChannel().takeWhile { it % 2 != mod }.toList())
-        }
-    }
-
-    @Test
-    fun testToChannel() = runTest {
-        val c = produce<Int> {
-            testList.asReceiveChannel().toChannel(channel)
-        }
-        assertEquals(testList, c.toList())
-    }
-
-    @Test
-    fun testMapIndexed() = runTest {
-        assertEquals(testList.mapIndexed { index, i -> index + i },
-            testList.asReceiveChannel().mapIndexed { index, i -> index + i }.toList())
-    }
-
-    @Test
-    fun testMapIndexedToCollection() = runTest {
-        val c = mutableListOf<Int>()
-        testList.asReceiveChannel().mapIndexedTo(c) { index, i -> index + i }
-        assertEquals(testList.mapIndexed { index, i -> index + i }, c)
-    }
-
-    @Test
-    fun testMapIndexedToSendChannel() = runTest {
-        val c = produce<Int> {
-            testList.asReceiveChannel().mapIndexedTo(channel) { index, i -> index + i }
-        }
-        assertEquals(testList.mapIndexed { index, i -> index + i }, c.toList())
-    }
-
-    @Test
-    fun testMapNotNull() = runTest {
-        repeat(3) { mod ->
-            assertEquals(testList.mapNotNull { i -> i.takeIf { i % 2 == mod } },
-                testList.asReceiveChannel().mapNotNull { i -> i.takeIf { i % 2 == mod } }.toList())
-        }
-    }
-
-    @Test
-    fun testMapNotNullToCollection() = runTest {
-        repeat(3) { mod ->
-            val c = mutableListOf<Int>()
-            testList.asReceiveChannel().mapNotNullTo(c) { i -> i.takeIf { i % 2 == mod } }
-            assertEquals(testList.mapNotNull { i -> i.takeIf { i % 2 == mod } }, c)
-        }
-    }
-
-    @Test
-    fun testMapNotNullToSendChannel() = runTest {
-        repeat(3) { mod ->
-            val c = produce<Int> {
-                testList.asReceiveChannel().mapNotNullTo(channel) { i -> i.takeIf { i % 2 == mod } }
-            }
-            assertEquals(testList.mapNotNull { i -> i.takeIf { i % 2 == mod } }, c.toList())
-        }
-    }
-
-    @Test
-    fun testMapIndexedNotNull() = runTest {
-        repeat(3) { mod ->
-            assertEquals(testList.mapIndexedNotNull { index, i -> index.takeIf { i % 2 == mod } },
-                testList.asReceiveChannel().mapIndexedNotNull { index, i -> index.takeIf { i % 2 == mod } }.toList())
-        }
-    }
-
-    @Test
-    fun testMapIndexedNotNullToCollection() = runTest {
-        repeat(3) { mod ->
-            val c = mutableListOf<Int>()
-            testList.asReceiveChannel().mapIndexedNotNullTo(c) { index, i -> index.takeIf { i % 2 == mod } }
-            assertEquals(testList.mapIndexedNotNull { index, i -> index.takeIf { i % 2 == mod } }, c)
-        }
-    }
-
-    @Test
-    fun testMapIndexedNotNullToSendChannel() = runTest {
-        repeat(3) { mod ->
-            val c = produce<Int> {
-                testList.asReceiveChannel().mapIndexedNotNullTo(channel) { index, i -> index.takeIf { i % 2 == mod } }
-            }
-            assertEquals(testList.mapIndexedNotNull { index, i -> index.takeIf { i % 2 == mod } }, c.toList())
-        }
-    }
-
-    @Test
-    fun testWithIndex() = runTest {
-        assertEquals(testList.withIndex().toList(), testList.asReceiveChannel().withIndex().toList())
-    }
-
-    @Test
-    fun testMaxBy() = runTest {
-        assertEquals(testList.maxBy { 10 - abs(it - 2) },
-            testList.asReceiveChannel().maxBy { 10 - abs(it - 2) })
-    }
-
-    @Test
-    fun testMaxWith() = runTest {
-        val cmp = compareBy<Int> { 10 - abs(it - 2) }
-        assertEquals(testList.maxWith(cmp),
-            testList.asReceiveChannel().maxWith(cmp))
-    }
-
-    @Test
-    fun testMinBy() = runTest {
-        assertEquals(testList.minBy { abs(it - 2) },
-            testList.asReceiveChannel().minBy { abs(it - 2) })
-    }
-
-    @Test
-    fun testMinWith() = runTest {
-        val cmp = compareBy<Int> { abs(it - 2) }
-        assertEquals(testList.minWith(cmp),
-            testList.asReceiveChannel().minWith(cmp))
-    }
-
-    @Test
-    fun testSumBy() = runTest {
-        assertEquals(testList.sumBy { it * 3 },
-            testList.asReceiveChannel().sumBy { it * 3 })
-    }
-
-    @Test
-    fun testSumByDouble() = runTest {
-        val expected = testList.sumByDouble { it * 3.0 }
-        val actual = testList.asReceiveChannel().sumByDouble { it * 3.0 }
-        assertEquals(expected, actual)
-    }
-
-    @Test
-    fun testRequireNoNulls() = runTest {
-        assertEquals(testList.requireNoNulls(), testList.asReceiveChannel().requireNoNulls().toList())
-    }
-
     private fun <E> Iterable<E>.asReceiveChannel(context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<E> =
         GlobalScope.produce(context) {
             for (element in this@asReceiveChannel)
diff --git a/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt b/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt
index 885f1d6..6ddde00 100644
--- a/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt
+++ b/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt
@@ -163,7 +163,7 @@
     private suspend fun cancelOnCompletion(coroutineContext: CoroutineContext) = CoroutineScope(coroutineContext).apply {
         val source = Channel<Int>()
         expect(1)
-        val produced = produce<Int>(coroutineContext, onCompletion = source.consumes()) {
+        val produced = produce<Int>(coroutineContext, onCompletion = { source.cancelConsumed(it) }) {
             expect(2)
             source.receive()
         }
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/CombineParametersTestBase.kt b/kotlinx-coroutines-core/common/test/flow/operators/CombineParametersTestBase.kt
index b51197e..8c65ea4 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/CombineParametersTestBase.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/CombineParametersTestBase.kt
@@ -161,4 +161,33 @@
         }.singleOrNull()
         assertNull(value)
     }
+
+    @Test
+    fun testFairnessInVariousConfigurations() = runTest {
+        // Test various configurations
+        for (flowsCount in 2..5) {
+            for (flowSize in 1..5) {
+                val flows = List(flowsCount) { (1..flowSize).asFlow() }
+                val combined = combine(flows) { it.joinToString(separator = "") }.toList()
+                val expected = List(flowSize) { (it +  1).toString().repeat(flowsCount) }
+                assertEquals(expected, combined, "Count: $flowsCount, size: $flowSize")
+            }
+        }
+    }
+
+    @Test
+    fun testEpochOverflow() = runTest {
+        val flow = (0..1023).asFlow()
+        val result = flow.combine(flow) { a, b -> a + b }.toList()
+        assertEquals(List(1024) { it * 2 } , result)
+    }
+
+    @Test
+    fun testArrayType() = runTest {
+        val arr = flowOf(1)
+        combine(listOf(arr, arr)) {
+            println(it[0])
+            it[0]
+        }.toList().also { println(it) }
+    }
 }
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/CombineTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/CombineTest.kt
index 2893321..5e2926d 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/CombineTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/CombineTest.kt
@@ -1,10 +1,11 @@
 /*
  * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
  */
-
-package kotlinx.coroutines.flow
+@file:Suppress("UNCHECKED_CAST")
+package kotlinx.coroutines.flow.operators
 
 import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
 import kotlin.test.*
 import kotlinx.coroutines.flow.combine as combineOriginal
 import kotlinx.coroutines.flow.combineTransform as combineTransformOriginal
@@ -21,28 +22,28 @@
         val flow = flowOf("a", "b", "c")
         val flow2 = flowOf(1, 2, 3)
         val list = flow.combineLatest(flow2) { i, j -> i + j }.toList()
-        assertEquals(listOf("a1", "b1", "b2", "c2", "c3"), list)
+        assertEquals(listOf("a1", "b2", "c3"), list)
     }
 
     @Test
     fun testNulls() = runTest {
         val flow = flowOf("a", null, null)
         val flow2 = flowOf(1, 2, 3)
-        val list = flow.combineLatest(flow2, { i, j -> i + j }).toList()
-        assertEquals(listOf("a1", "null1", "null2", "null2", "null3"), list)
+        val list = flow.combineLatest(flow2) { i, j -> i + j }.toList()
+        assertEquals(listOf("a1", "null2", "null3"), list)
     }
 
     @Test
     fun testNullsOther() = runTest {
         val flow = flowOf("a", "b", "c")
         val flow2 = flowOf(null, 2, null)
-        val list = flow.combineLatest(flow2, { i, j -> i + j }).toList()
-        assertEquals(listOf("anull", "bnull", "b2", "c2", "cnull"), list)
+        val list = flow.combineLatest(flow2) { i, j -> i + j }.toList()
+        assertEquals(listOf("anull", "b2", "cnull"), list)
     }
 
     @Test
     fun testEmptyFlow() = runTest {
-        val flow = emptyFlow<String>().combineLatest(emptyFlow<Int>(), { i, j -> i + j })
+        val flow = emptyFlow<String>().combineLatest(emptyFlow<Int>()) { i, j -> i + j }
         assertNull(flow.singleOrNull())
     }
 
@@ -208,12 +209,12 @@
         }
         val f2 = flow {
             emit(1)
-            hang { expect(3) }
+            expectUnreached()
         }
 
-        val flow = f1.combineLatest(f2, { _, _ -> 1 }).onEach { expect(2) }
+        val flow = f1.combineLatest(f2) { _, _ -> 1 }.onEach { expect(2) }
         assertFailsWith<CancellationException>(flow)
-        finish(4)
+        finish(3)
     }
 
     @Test
@@ -229,7 +230,7 @@
             hang { expect(6) }
         }
 
-        val flow = f1.combineLatest(f2, { _, _ -> 1 }).onEach {
+        val flow = f1.combineLatest(f2) { _, _ -> 1 }.onEach {
             expect(1)
             yield()
             expect(4)
@@ -248,7 +249,7 @@
                 emit(Unit) // emit
             }
             cancel() // cancel the scope
-            flow.combineLatest(flow) { u, _ -> u }.collect {
+            flow.combineLatest(flow) { _, _ ->  }.collect {
                 // should not be reached, because cancelled before it runs
                 expectUnreached()
             }
@@ -265,15 +266,26 @@
         emit(transform(a, b))
     }
 }
+// Array null-out is an additional test for our array elimination optimization
 
 class CombineVarargAdapterTest : CombineTestBase() {
     override fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> =
-        combineOriginal(this, other) { args: Array<Any?> -> transform(args[0] as T1, args[1] as T2) }
+        combineOriginal(this, other) { args: Array<Any?> ->
+            transform(args[0] as T1, args[1] as T2).also {
+                args[0] = null
+                args[1] = null
+            }
+        }
 }
 
 class CombineIterableTest : CombineTestBase() {
     override fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> =
-        combineOriginal(listOf(this, other)) { args -> transform(args[0] as T1, args[1] as T2) }
+        combineOriginal(listOf(this, other)) { args ->
+            transform(args[0] as T1, args[1] as T2).also {
+                args[0] = null
+                args[1] = null
+            }
+        }
 }
 
 class CombineTransformAdapterTest : CombineTestBase() {
@@ -283,11 +295,20 @@
 
 class CombineTransformVarargAdapterTest : CombineTestBase() {
     override fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> =
-        combineTransformOriginal(this, other) { args: Array<Any?> -> emit(transform(args[0] as T1, args[1] as T2)) }
+        combineTransformOriginal(this, other) { args: Array<Any?> ->
+            emit(transform(args[0] as T1, args[1] as T2))   // Deliberately mess up the array
+            args[0] = null
+            args[1] = null
+        }
 }
 
 class CombineTransformIterableTest : CombineTestBase() {
     override fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> =
-        combineTransformOriginal(listOf(this, other)) { args -> emit(transform(args[0] as T1, args[1] as T2)) }
+        combineTransformOriginal(listOf(this, other)) { args ->
+            emit(transform(args[0] as T1, args[1] as T2))
+            // Deliberately mess up the array
+            args[0] = null
+            args[1] = null
+        }
 }
 
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/DebounceTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/DebounceTest.kt
index 4065671..ce75e59 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/DebounceTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/DebounceTest.kt
@@ -11,7 +11,7 @@
 
 class DebounceTest : TestBase() {
     @Test
-    public fun testBasic() = withVirtualTime {
+    fun testBasic() = withVirtualTime {
         expect(1)
         val flow = flow {
             expect(3)
@@ -159,7 +159,7 @@
             expect(2)
             throw TestException()
         }.flowOn(NamedDispatchers("source")).debounce(Long.MAX_VALUE).map {
-                expectUnreached()
+            expectUnreached()
         }
         assertFailsWith<TestException>(flow)
         finish(3)
@@ -175,7 +175,6 @@
             expect(2)
             yield()
             throw TestException()
-            it
         }
 
         assertFailsWith<TestException>(flow)
@@ -193,7 +192,6 @@
             expect(2)
             yield()
             throw TestException()
-            it
         }
 
         assertFailsWith<TestException>(flow)
@@ -202,7 +200,7 @@
 
     @ExperimentalTime
     @Test
-    public fun testDurationBasic() = withVirtualTime {
+    fun testDurationBasic() = withVirtualTime {
         expect(1)
         val flow = flow {
             expect(3)
@@ -223,4 +221,102 @@
         assertEquals(listOf("A", "D", "E"), result)
         finish(5)
     }
+
+    @ExperimentalTime
+    @Test
+    fun testDebounceSelectorBasic() = withVirtualTime {
+        expect(1)
+        val flow = flow {
+            expect(3)
+            emit(1)
+            delay(90)
+            emit(2)
+            delay(90)
+            emit(3)
+            delay(1010)
+            emit(4)
+            delay(1010)
+            emit(5)
+            expect(4)
+        }
+
+        expect(2)
+        val result = flow.debounce {
+            if (it == 1) {
+                0
+            } else {
+                1000
+            }
+        }.toList()
+
+        assertEquals(listOf(1, 3, 4, 5), result)
+        finish(5)
+    }
+
+    @Test
+    fun testZeroDebounceTime() = withVirtualTime {
+        expect(1)
+        val flow = flow {
+            expect(3)
+            emit("A")
+            emit("B")
+            emit("C")
+            expect(4)
+        }
+
+        expect(2)
+        val result = flow.debounce(0).toList()
+
+        assertEquals(listOf("A", "B", "C"), result)
+        finish(5)
+    }
+
+    @ExperimentalTime
+    @Test
+    fun testZeroDebounceTimeSelector() = withVirtualTime {
+        expect(1)
+        val flow = flow {
+            expect(3)
+            emit("A")
+            emit("B")
+            expect(4)
+        }
+
+        expect(2)
+        val result = flow.debounce { 0 }.toList()
+
+        assertEquals(listOf("A", "B"), result)
+        finish(5)
+    }
+
+    @ExperimentalTime
+    @Test
+    fun testDebounceDurationSelectorBasic() = withVirtualTime {
+        expect(1)
+        val flow = flow {
+            expect(3)
+            emit("A")
+            delay(1500.milliseconds)
+            emit("B")
+            delay(500.milliseconds)
+            emit("C")
+            delay(250.milliseconds)
+            emit("D")
+            delay(2000.milliseconds)
+            emit("E")
+            expect(4)
+        }
+
+        expect(2)
+        val result = flow.debounce {
+            if (it == "C") {
+                0.milliseconds
+            } else {
+                1000.milliseconds
+            }
+        }.toList()
+
+        assertEquals(listOf("A", "C", "D", "E"), result)
+        finish(5)
+    }
 }
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/ZipTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/ZipTest.kt
index 5f2b5a7..02dbfc4 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/ZipTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/ZipTest.kt
@@ -23,7 +23,7 @@
     fun testUnevenZip() = runTest {
         val f1 = flowOf("a", "b", "c", "d", "e")
         val f2 = flowOf(1, 2, 3)
-        assertEquals(listOf("a1", "b2", "c3"), f1.zip(f2, { i, j -> i + j }).toList())
+        assertEquals(listOf("a1", "b2", "c3"), f1.zip(f2) { i, j -> i + j }.toList())
         assertEquals(listOf("a1", "b2", "c3"), f2.zip(f1) { i, j -> j + i }.toList())
     }
 
@@ -67,12 +67,33 @@
         val f1 = flow<String> {
             emit("1")
             emit("2")
-            expectUnreached() // the above emit will get cancelled because f2 ends
+        }
+
+        val f2 = flow<String> {
+            emit("a")
+            emit("b")
+            expectUnreached()
+        }
+        assertEquals(listOf("1a", "2b"), f1.zip(f2) { s1, s2 -> s1 + s2 }.toList())
+        finish(1)
+    }
+
+    @Test
+    fun testCancelWhenFlowIsDone2() = runTest {
+        val f1 = flow<String> {
+            emit("1")
+            emit("2")
+            try {
+                emit("3")
+                expectUnreached()
+            } finally {
+                expect(1)
+            }
         }
 
         val f2 = flowOf("a", "b")
         assertEquals(listOf("1a", "2b"), f1.zip(f2) { s1, s2 -> s1 + s2 }.toList())
-        finish(1)
+        finish(2)
     }
 
     @Test
@@ -85,7 +106,12 @@
             }
         }
 
-        val f2 = flowOf("a", "b")
+        val f2 = flow<String> {
+            emit("a")
+            emit("b")
+            yield()
+        }
+
         assertEquals(listOf("a1", "b2"), f2.zip(f1) { s1, s2 -> s1 + s2 }.toList())
         finish(2)
     }
@@ -95,19 +121,19 @@
         val f1 = flow {
             emit("a")
             assertEquals("first", NamedDispatchers.name())
-            expect(1)
+            expect(3)
         }.flowOn(NamedDispatchers("first")).onEach {
             assertEquals("with", NamedDispatchers.name())
-            expect(2)
+            expect(4)
         }.flowOn(NamedDispatchers("with"))
 
         val f2 = flow {
             emit(1)
             assertEquals("second", NamedDispatchers.name())
-            expect(3)
+            expect(1)
         }.flowOn(NamedDispatchers("second")).onEach {
             assertEquals("nested", NamedDispatchers.name())
-            expect(4)
+            expect(2)
         }.flowOn(NamedDispatchers("nested"))
 
         val value = withContext(NamedDispatchers("main")) {
@@ -127,14 +153,14 @@
         val f1 = flow {
             emit("a")
             hang {
-                expect(2)
+                expect(3)
             }
         }.flowOn(NamedDispatchers("first"))
 
         val f2 = flow {
             emit(1)
             hang {
-                expect(3)
+                expect(2)
             }
         }.flowOn(NamedDispatchers("second"))
 
@@ -174,19 +200,18 @@
         val f1 = flow {
             expect(1)
             emit(1)
-            yield()
-            expect(4)
+            expect(5)
             throw CancellationException("")
         }
 
         val f2 = flow {
             expect(2)
             emit(1)
-            expect(5)
+            expect(3)
             hang { expect(6) }
         }
 
-        val flow = f1.zip(f2, { _, _ -> 1 }).onEach { expect(3) }
+        val flow = f1.zip(f2) { _, _ -> 1 }.onEach { expect(4) }
         assertFailsWith<CancellationException>(flow)
         finish(7)
     }
@@ -196,24 +221,37 @@
         val f1 = flow {
             expect(1)
             emit(1)
-            yield()
-            expect(4)
-            hang { expect(6) }
+            expectUnreached() // Will throw CE
         }
 
         val f2 = flow {
             expect(2)
             emit(1)
-            expect(5)
-            hang { expect(7) }
+            expect(3)
+            hang { expect(5) }
         }
 
         val flow = f1.zip(f2, { _, _ -> 1 }).onEach {
-            expect(3)
+            expect(4)
             yield()
             throw CancellationException("")
         }
         assertFailsWith<CancellationException>(flow)
-        finish(8)
+        finish(6)
+    }
+
+    @Test
+    fun testCancellationOfCollector() = runTest {
+        val f1 = flow {
+            emit("1")
+            awaitCancellation()
+        }
+
+        val f2 = flow {
+            emit("2")
+            yield()
+        }
+
+        f1.zip(f2) { a, b -> a + b }.collect { }
     }
 }
diff --git a/kotlinx-coroutines-core/common/test/flow/sharing/SharedFlowScenarioTest.kt b/kotlinx-coroutines-core/common/test/flow/sharing/SharedFlowScenarioTest.kt
index f716389..c3eb2da 100644
--- a/kotlinx-coroutines-core/common/test/flow/sharing/SharedFlowScenarioTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/sharing/SharedFlowScenarioTest.kt
@@ -176,6 +176,31 @@
             collect(b, 15)
         }
 
+    @Test // https://github.com/Kotlin/kotlinx.coroutines/issues/2320
+    fun testResumeFastSubscriberOnResumedEmitter() =
+        testSharedFlow<Int>(MutableSharedFlow(1)) {
+            // create two subscribers and start collecting
+            val s1 = subscribe("s1"); resumeCollecting(s1)
+            val s2 = subscribe("s2"); resumeCollecting(s2)
+            // now emit 0, make sure it is collected
+            emitRightNow(0); expectReplayOf(0)
+            awaitCollected(s1, 0)
+            awaitCollected(s2, 0)
+            // now emit 1, and only first subscriber continues and collects it
+            emitRightNow(1); expectReplayOf(1)
+            collect(s1, 1)
+            // now emit 2, it suspends (s2 is blocking it)
+            val e2 = emitSuspends(2)
+            resumeCollecting(s1) // resume, but does not collect (e2 is still queued)
+            collect(s2, 1) // resume + collect next --> resumes emitter, thus resumes s1
+            awaitCollected(s1, 2) // <-- S1 collects value from the newly resumed emitter here !!!
+            emitResumes(e2); expectReplayOf(2)
+            // now emit 3, it suspends (s2 blocks it)
+            val e3 = emitSuspends(3)
+            collect(s2, 2)
+            emitResumes(e3); expectReplayOf(3)
+        }
+
     private fun <T> testSharedFlow(
         sharedFlow: MutableSharedFlow<T>,
         scenario: suspend ScenarioDsl<T>.() -> Unit
@@ -305,14 +330,23 @@
             return TestJob(job, name)
         }
 
+        // collect ~== resumeCollecting + awaitCollected (for each value)
         suspend fun collect(job: TestJob, vararg a: T) {
             for (value in a) {
                 checkReplay() // should not have changed
-                addAction(ResumeCollecting(job))
-                awaitAction(Collected(job, value))
+                resumeCollecting(job)
+                awaitCollected(job, value)
             }
         }
 
+        suspend fun resumeCollecting(job: TestJob) {
+            addAction(ResumeCollecting(job))
+        }
+
+        suspend fun awaitCollected(job: TestJob, value: T) {
+            awaitAction(Collected(job, value))
+        }
+
         fun stop() {
             log("--- stop")
             scope.cancel()
diff --git a/kotlinx-coroutines-core/common/test/flow/sharing/StateInTest.kt b/kotlinx-coroutines-core/common/test/flow/sharing/StateInTest.kt
index 2a613af..d0e76c4 100644
--- a/kotlinx-coroutines-core/common/test/flow/sharing/StateInTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/sharing/StateInTest.kt
@@ -30,21 +30,21 @@
 
     @Test
     fun testUpstreamCompletedNoInitialValue() =
-        testUpstreamCompletedOrFailedReset(failed = false, iv = false)
+        testUpstreamCompletedOrFailedReset(failed = false, withInitialValue = false)
 
     @Test
     fun testUpstreamFailedNoInitialValue() =
-        testUpstreamCompletedOrFailedReset(failed = true, iv = false)
+        testUpstreamCompletedOrFailedReset(failed = true, withInitialValue = false)
 
     @Test
     fun testUpstreamCompletedWithInitialValue() =
-        testUpstreamCompletedOrFailedReset(failed = false, iv = true)
+        testUpstreamCompletedOrFailedReset(failed = false, withInitialValue = true)
 
     @Test
     fun testUpstreamFailedWithInitialValue() =
-        testUpstreamCompletedOrFailedReset(failed = true, iv = true)
+        testUpstreamCompletedOrFailedReset(failed = true, withInitialValue = true)
 
-    private fun testUpstreamCompletedOrFailedReset(failed: Boolean, iv: Boolean) = runTest {
+    private fun testUpstreamCompletedOrFailedReset(failed: Boolean, withInitialValue: Boolean) = runTest {
         val emitted = Job()
         val terminate = Job()
         val sharingJob = CompletableDeferred<Unit>()
@@ -56,7 +56,7 @@
         }
         val scope = this + sharingJob
         val shared: StateFlow<String?>
-        if (iv) {
+        if (withInitialValue) {
             shared = upstream.stateIn(scope, SharingStarted.Eagerly, null)
             assertEquals(null, shared.value)
         } else {
@@ -75,4 +75,15 @@
             assertNull(sharingJob.getCompletionExceptionOrNull())
         }
     }
-}
\ No newline at end of file
+
+    @Test
+    fun testUpstreamFailedImmediatelyWithInitialValue() = runTest {
+        val ceh = CoroutineExceptionHandler { _, _ -> expect(2) }
+        val flow = flow<Int> {
+            expect(1)
+            throw TestException()
+        }
+        assertFailsWith<TestException> { flow.stateIn(CoroutineScope(currentCoroutineContext() + Job() + ceh)) }
+        finish(3)
+    }
+}
diff --git a/kotlinx-coroutines-core/js/src/internal/LocalAtomics.kt b/kotlinx-coroutines-core/js/src/internal/LocalAtomics.kt
new file mode 100644
index 0000000..fffd76c
--- /dev/null
+++ b/kotlinx-coroutines-core/js/src/internal/LocalAtomics.kt
@@ -0,0 +1,15 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.internal
+
+internal actual class LocalAtomicInt actual constructor(private var value: Int) {
+    actual fun set(value: Int) {
+        this.value = value
+    }
+
+    actual fun get(): Int = value
+
+    actual fun decrementAndGet(): Int = --value
+}
diff --git a/kotlinx-coroutines-core/jvm/src/Interruptible.kt b/kotlinx-coroutines-core/jvm/src/Interruptible.kt
index f50e093..070aa62 100644
--- a/kotlinx-coroutines-core/jvm/src/Interruptible.kt
+++ b/kotlinx-coroutines-core/jvm/src/Interruptible.kt
@@ -40,8 +40,7 @@
 
 private fun <T> runInterruptibleInExpectedContext(coroutineContext: CoroutineContext, block: () -> T): T {
     try {
-        val job = coroutineContext[Job]!! // withContext always creates a job
-        val threadState = ThreadState(job)
+        val threadState = ThreadState(coroutineContext.job)
         threadState.setup()
         try {
             return block()
diff --git a/kotlinx-coroutines-core/jvm/src/debug/AgentPremain.kt b/kotlinx-coroutines-core/jvm/src/debug/AgentPremain.kt
index df10501..5a1a1ed 100644
--- a/kotlinx-coroutines-core/jvm/src/debug/AgentPremain.kt
+++ b/kotlinx-coroutines-core/jvm/src/debug/AgentPremain.kt
@@ -21,9 +21,9 @@
 
     public var isInstalledStatically = false
 
-    private val enableCreationStackTraces =
+    private val enableCreationStackTraces = runCatching {
         System.getProperty("kotlinx.coroutines.debug.enable.creation.stack.trace")?.toBoolean()
-            ?: DebugProbesImpl.enableCreationStackTraces
+    }.getOrNull() ?: DebugProbesImpl.enableCreationStackTraces
 
     @JvmStatic
     public fun premain(args: String?, instrumentation: Instrumentation) {
diff --git a/kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbesImpl.kt b/kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbesImpl.kt
index 9dd6c5a..83bc02c 100644
--- a/kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbesImpl.kt
+++ b/kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbesImpl.kt
@@ -111,7 +111,7 @@
         check(isInstalled) { "Debug probes are not installed" }
         val jobToStack = capturedCoroutines
             .filter { it.delegate.context[Job] != null }
-            .associateBy({ it.delegate.context[Job]!! }, { it.info })
+            .associateBy({ it.delegate.context.job }, { it.info })
         return buildString {
             job.build(jobToStack, this, "")
         }
@@ -156,7 +156,11 @@
                 // Stable ordering of coroutines by their sequence number
                 .sortedBy { it.info.sequenceNumber }
                 // Leave in the dump only the coroutines that were not collected while we were dumping them
-                .mapNotNull { owner -> owner.info.context?.let { context -> create(owner, context) } }
+                .mapNotNull { owner ->
+                    // Fuse map and filter into one operation to save an inline
+                    if (owner.isFinished()) null
+                    else owner.info.context?.let { context -> create(owner, context) }
+                }
         }
 
     /*
@@ -183,10 +187,27 @@
         dumpCoroutinesSynchronized(out)
     }
 
+    /*
+     * Filters out coroutines that have completed without calling probeCoroutineCompleted
+     * and have not been garbage collected yet.
+     *
+     * Typically, we intercept completion of the coroutine so it invokes "probeCoroutineCompleted",
+     * but it's not the case for lazy coroutines that get cancelled before start.
+     */
+    private fun CoroutineOwner<*>.isFinished(): Boolean {
+        // Guarded by lock
+        val job = info.context?.get(Job) ?: return false
+        if (!job.isCompleted) return false
+        capturedCoroutinesMap.remove(this) // Clean it up by the way
+        return true
+    }
+
     private fun dumpCoroutinesSynchronized(out: PrintStream): Unit = coroutineStateLock.write {
         check(isInstalled) { "Debug probes are not installed" }
         out.print("Coroutines dump ${dateFormat.format(System.currentTimeMillis())}")
         capturedCoroutines
+            .asSequence()
+            .filter { !it.isFinished() }
             .sortedBy { it.info.sequenceNumber }
             .forEach { owner ->
                 val info = owner.info
diff --git a/kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollector.kt b/kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollector.kt
index a8e04f0..ab42b63 100644
--- a/kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollector.kt
+++ b/kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollector.kt
@@ -21,7 +21,11 @@
 internal actual class SafeCollector<T> actual constructor(
     @JvmField internal actual val collector: FlowCollector<T>,
     @JvmField internal actual val collectContext: CoroutineContext
-) : FlowCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext) {
+) : FlowCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext), CoroutineStackFrame {
+
+    override val callerFrame: CoroutineStackFrame? get() = completion as? CoroutineStackFrame
+
+    override fun getStackTraceElement(): StackTraceElement? = null
 
     @JvmField // Note, it is non-capturing lambda, so no extra allocation during init of SafeCollector
     internal actual val collectContextSize = collectContext.fold(0) { count, _ -> count + 1 }
diff --git a/kotlinx-coroutines-core/jvm/src/internal/LocalAtomics.kt b/kotlinx-coroutines-core/jvm/src/internal/LocalAtomics.kt
new file mode 100644
index 0000000..f508749
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/src/internal/LocalAtomics.kt
@@ -0,0 +1,8 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.internal
+
+@Suppress("ACTUAL_WITHOUT_EXPECT")
+internal actual typealias LocalAtomicInt = java.util.concurrent.atomic.AtomicInteger
diff --git a/kotlinx-coroutines-core/jvm/test/RejectedExecutionTest.kt b/kotlinx-coroutines-core/jvm/test/RejectedExecutionTest.kt
index a6f4dd6..7f6d6b6 100644
--- a/kotlinx-coroutines-core/jvm/test/RejectedExecutionTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/RejectedExecutionTest.kt
@@ -5,6 +5,7 @@
 package kotlinx.coroutines
 
 import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.internal.*
 import kotlinx.coroutines.scheduling.*
 import org.junit.*
 import org.junit.Test
@@ -89,6 +90,7 @@
 
     @Test
     fun testRejectOnDelay() = runTest {
+        if (!removeFutureOnCancel(executor)) return@runTest // Skip this test on old JDKs
         expect(1)
         executor.acceptTasks = 1 // accept one task
         assertFailsWith<CancellationException> {
@@ -110,6 +112,7 @@
 
     @Test
     fun testRejectWithTimeout() = runTest {
+        if (!removeFutureOnCancel(executor)) return@runTest // Skip this test on old JDKs
         expect(1)
         executor.acceptTasks = 1 // accept one task
         assertFailsWith<CancellationException> {
diff --git a/kotlinx-coroutines-core/jvm/test/channels/ChannelsConsumeTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ChannelsConsumeTest.kt
deleted file mode 100644
index cb19b36..0000000
--- a/kotlinx-coroutines-core/jvm/test/channels/ChannelsConsumeTest.kt
+++ /dev/null
@@ -1,908 +0,0 @@
-/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
- */
-
-@file:Suppress("DEPRECATION")
-
-package kotlinx.coroutines.channels
-
-import kotlinx.coroutines.*
-import kotlin.coroutines.*
-import kotlin.test.*
-
-/**
- * Tests that various operators on channels properly consume (close) their source channels.
- */
-class ChannelsConsumeTest : TestBase() {
-    private val sourceList = (1..10).toList()
-
-    // test source with numbers 1..10
-    private fun CoroutineScope.testSource() = produce(NonCancellable) {
-        for (i in sourceList) {
-            send(i)
-        }
-    }
-
-    @Test
-    fun testConsume() {
-        checkTerminal {
-            consume {
-                assertEquals(1, receive())
-            }
-        }
-    }
-
-    @Test
-    fun testConsumeEach() {
-        checkTerminal {
-            var sum = 0
-            consumeEach { sum += it }
-            assertEquals(55, sum)
-        }
-    }
-
-    @Test
-    fun testConsumeEachIndexed() {
-        checkTerminal {
-            var sum = 0
-            consumeEachIndexed { (index, i) -> sum += index * i }
-            assertEquals(330, sum)
-        }
-    }
-
-    @Test
-    fun testElementAt() {
-        checkTerminal {
-            assertEquals(2, elementAt(1))
-        }
-        checkTerminal(expected = { it is IndexOutOfBoundsException }) {
-            elementAt(10)
-        }
-    }
-
-    @Test
-    fun testElementAtOrElse() {
-        checkTerminal {
-            assertEquals(3, elementAtOrElse(2) { error("Cannot happen") })
-        }
-        checkTerminal {
-            assertEquals(-23, elementAtOrElse(10) { -23 })
-        }
-    }
-
-    @Test
-    fun testElementOrNull() {
-        checkTerminal {
-            assertEquals(4, elementAtOrNull(3))
-        }
-        checkTerminal {
-            assertNull(elementAtOrNull(10))
-        }
-    }
-
-    @Test
-    fun testFind() {
-        checkTerminal {
-            assertEquals(3, find { it % 3 == 0 })
-        }
-    }
-
-    @Test
-    fun testFindLast() {
-        checkTerminal {
-            assertEquals(9, findLast { it % 3 == 0 })
-        }
-    }
-
-    @Test
-    fun testFirst() {
-        checkTerminal {
-            assertEquals(1, first())
-        }
-    }
-
-    @Test
-    fun testFirstPredicate() {
-        checkTerminal {
-            assertEquals(3, first { it % 3 == 0 })
-        }
-        checkTerminal(expected = { it is NoSuchElementException }) {
-            first { it > 10 }
-        }
-    }
-
-    @Test
-    fun testFirstOrNull() {
-        checkTerminal {
-            assertEquals(1, firstOrNull())
-        }
-    }
-
-    @Test
-    fun testFirstOrNullPredicate() {
-        checkTerminal {
-            assertEquals(3, firstOrNull { it % 3 == 0 })
-        }
-        checkTerminal {
-            assertNull(firstOrNull { it > 10 })
-        }
-    }
-
-    @Test
-    fun testIndexOf() {
-        checkTerminal {
-            assertEquals(2, indexOf(3))
-        }
-        checkTerminal {
-            assertEquals(-1, indexOf(11))
-        }
-    }
-
-    @Test
-    fun testIndexOfFirst() {
-        checkTerminal {
-            assertEquals(2, indexOfFirst { it % 3 == 0 })
-        }
-        checkTerminal {
-            assertEquals(-1, indexOfFirst { it > 10 })
-        }
-    }
-
-    @Test
-    fun testIndexOfLast() {
-        checkTerminal {
-            assertEquals(8, indexOfLast { it % 3 == 0 })
-        }
-        checkTerminal {
-            assertEquals(-1, indexOfLast { it > 10 })
-        }
-    }
-
-    @Test
-    fun testLast() {
-        checkTerminal {
-            assertEquals(10, last())
-        }
-    }
-
-    @Test
-    fun testLastPredicate() {
-        checkTerminal {
-            assertEquals(9, last { it % 3 == 0 })
-        }
-        checkTerminal(expected = { it is NoSuchElementException }) {
-            last { it > 10 }
-        }
-    }
-
-    @Test
-    fun testLastIndexOf() {
-        checkTerminal {
-            assertEquals(8, lastIndexOf(9))
-        }
-    }
-
-    @Test
-    fun testLastOrNull() {
-        checkTerminal {
-            assertEquals(10, lastOrNull())
-        }
-    }
-
-    @Test
-    fun testLastOrNullPredicate() {
-        checkTerminal {
-            assertEquals(9, lastOrNull { it % 3 == 0 })
-        }
-        checkTerminal {
-            assertNull(lastOrNull { it > 10 })
-        }
-    }
-
-    @Test
-    fun testSingle() {
-        checkTerminal(expected = { it is IllegalArgumentException }) {
-            single()
-        }
-    }
-
-    @Test
-    fun testSinglePredicate() {
-        checkTerminal {
-            assertEquals(7, single { it % 7 == 0 })
-        }
-        checkTerminal(expected = { it is IllegalArgumentException }) {
-            single { it % 3 == 0 }
-        }
-        checkTerminal(expected = { it is NoSuchElementException }) {
-            single { it > 10 }
-        }
-    }
-
-    @Test
-    fun testSingleOrNull() {
-        checkTerminal {
-            assertNull(singleOrNull())
-        }
-    }
-
-    @Test
-    fun testSingleOrNullPredicate() {
-        checkTerminal {
-            assertEquals(7, singleOrNull { it % 7 == 0 })
-        }
-        checkTerminal {
-            assertNull(singleOrNull { it % 3 == 0 })
-        }
-        checkTerminal {
-            assertNull(singleOrNull { it > 10 })
-        }
-    }
-
-    @Test
-    fun testDrop() {
-        checkTransform(sourceList.drop(3)) {
-            drop(3)
-        }
-    }
-
-    @Test
-    fun testDropWhile() {
-        checkTransform(sourceList.dropWhile { it < 4}) {
-            dropWhile { it < 4 }
-        }
-    }
-
-    @Test
-    fun testFilter() {
-        checkTransform(sourceList.filter { it % 2 == 0 }) {
-            filter { it % 2 == 0 }
-        }
-    }
-
-    @Test
-    fun testFilterIndexed() {
-        checkTransform(sourceList.filterIndexed { index, _ -> index % 2 == 0 }) {
-            filterIndexed { index, _ -> index % 2 == 0 }
-        }
-    }
-
-    @Test
-    fun testFilterIndexedToCollection() {
-        checkTerminal {
-            val list = mutableListOf<Int>()
-            filterIndexedTo(list) { index, _ -> index % 2 == 0 }
-            assertEquals(listOf(1, 3, 5, 7, 9), list)
-        }
-    }
-
-    @Test
-    fun testFilterIndexedToChannel() {
-        checkTerminal {
-            val channel = Channel<Int>()
-            val result = GlobalScope.async { channel.toList() }
-            filterIndexedTo(channel) { index, _ -> index % 2 == 0 }
-            channel.close()
-            assertEquals(listOf(1, 3, 5, 7, 9), result.await())
-        }
-    }
-
-    @Test
-    fun testFilterNot() {
-        checkTransform(sourceList.filterNot { it % 2 == 0 }) {
-            filterNot { it % 2 == 0 }
-        }
-    }
-
-    @Test
-    fun testFilterNotNullToCollection() {
-        checkTerminal {
-            val list = mutableListOf<Int>()
-            filterNotNullTo(list)
-            assertEquals((1..10).toList(), list)
-        }
-    }
-
-    @Test
-    fun testFilterNotNullToChannel() {
-        checkTerminal {
-            val channel = Channel<Int>()
-            val result = GlobalScope.async { channel.toList() }
-            filterNotNullTo(channel)
-            channel.close()
-            assertEquals((1..10).toList(), result.await())
-        }
-    }
-
-    @Test
-    fun testFilterNotToCollection() {
-        checkTerminal {
-            val list = mutableListOf<Int>()
-            filterNotTo(list) { it % 2 == 0 }
-            assertEquals(listOf(1, 3, 5, 7, 9), list)
-        }
-    }
-
-    @Test
-    fun testFilterNotToChannel() {
-        checkTerminal {
-            val channel = Channel<Int>()
-            val result = GlobalScope.async { channel.toList() }
-            filterNotTo(channel) { it % 2 == 0 }
-            channel.close()
-            assertEquals(listOf(1, 3, 5, 7, 9), result.await())
-        }
-    }
-
-    @Test
-    fun testFilterToCollection() {
-        checkTerminal {
-            val list = mutableListOf<Int>()
-            filterTo(list) { it % 2 == 0 }
-            assertEquals(listOf(2, 4, 6, 8, 10), list)
-        }
-    }
-
-    @Test
-    fun testFilterToChannel() {
-        checkTerminal {
-            val channel = Channel<Int>()
-            val result = GlobalScope.async { channel.toList() }
-            filterTo(channel) { it % 2 == 0 }
-            channel.close()
-            assertEquals(listOf(2, 4, 6, 8, 10), result.await())
-        }
-    }
-
-    @Test
-    fun testTake() {
-        checkTransform(sourceList.take(3)) {
-            take(3)
-        }
-    }
-
-    @Test
-    fun testTakeWhile() {
-        checkTransform(sourceList.takeWhile { it < 4 }) {
-            takeWhile { it < 4 }
-        }
-    }
-
-    @Test
-    fun testAssociate() {
-        checkTerminal {
-            assertEquals(sourceList.associate { it to it.toString() }, associate { it to it.toString() })
-        }
-    }
-
-    @Test
-    fun testAssociateBy() {
-        checkTerminal {
-            assertEquals(sourceList.associateBy { it.toString() }, associateBy { it.toString() })
-        }
-    }
-
-    @Test
-    fun testAssociateByTwo() {
-        checkTerminal {
-            assertEquals(sourceList.associateBy({ it.toString() }, { it + 1}), associateBy({ it.toString() }, { it + 1}))
-        }
-    }
-
-    @Test
-    fun testAssociateByToMap() {
-        checkTerminal {
-            val map = mutableMapOf<String, Int>()
-            associateByTo(map) { it.toString() }
-            assertEquals(sourceList.associateBy { it.toString() }, map)
-        }
-    }
-
-    @Test
-    fun testAssociateByTwoToMap() {
-        checkTerminal {
-            val map = mutableMapOf<String, Int>()
-            associateByTo(map, { it.toString() }, { it + 1})
-            assertEquals(sourceList.associateBy({ it.toString() }, { it + 1}), map)
-        }
-    }
-
-    @Test
-    fun testAssociateToMap() {
-        checkTerminal {
-            val map = mutableMapOf<Int, String>()
-            associateTo(map) { it to it.toString() }
-            assertEquals(sourceList.associate { it to it.toString() }, map)
-        }
-    }
-
-    @Test
-    fun testToChannel() {
-        checkTerminal {
-            val channel = Channel<Int>()
-            val result = GlobalScope.async { channel.toList() }
-            toChannel(channel)
-            channel.close()
-            assertEquals(sourceList, result.await())
-        }
-    }
-
-    @Test
-    fun testToCollection() {
-        checkTerminal {
-            val list = mutableListOf<Int>()
-            toCollection(list)
-            assertEquals(sourceList, list)
-        }
-    }
-
-    @Test
-    fun testToList() {
-        checkTerminal {
-            val list = toList()
-            assertEquals(sourceList, list)
-        }
-    }
-
-    @Test
-    fun testToMap() {
-        checkTerminal {
-            val map = map { it to it.toString() }.toMap()
-            assertEquals(sourceList.map { it to it.toString() }.toMap(), map)
-        }
-    }
-
-    @Test
-    fun testToMapWithMap() {
-        checkTerminal {
-            val map = mutableMapOf<Int, String>()
-            map { it to it.toString() }.toMap(map)
-            assertEquals(sourceList.map { it to it.toString() }.toMap(), map)
-        }
-    }
-
-    @Test
-    fun testToMutableList() {
-        checkTerminal {
-            val list = toMutableList()
-            assertEquals(sourceList, list)
-        }
-    }
-
-    @Test
-    fun testToSet() {
-        checkTerminal {
-            val set = toSet()
-            assertEquals(sourceList.toSet(), set)
-        }
-    }
-
-    @Test
-    fun testFlatMap() {
-        checkTransform(sourceList.flatMap { listOf("A$it", "B$it") }) {
-            flatMap {
-                GlobalScope.produce(coroutineContext) {
-                    send("A$it")
-                    send("B$it")
-                }
-            }
-        }
-    }
-
-    @Test
-    fun testGroupBy() {
-        checkTerminal {
-            val map = groupBy { it % 2 }
-            assertEquals(sourceList.groupBy { it % 2 }, map)
-        }
-    }
-
-    @Test
-    fun testGroupByTwo() {
-        checkTerminal {
-            val map = groupBy({ it % 2 }, { it.toString() })
-            assertEquals(sourceList.groupBy({ it % 2 }, { it.toString() }), map)
-        }
-    }
-
-    @Test
-    fun testGroupByTo() {
-        checkTerminal {
-            val map = mutableMapOf<Int, MutableList<Int>>()
-            groupByTo(map) { it % 2 }
-            assertEquals(sourceList.groupBy { it % 2 }, map)
-        }
-    }
-
-    @Test
-    fun testGroupByToTwo() {
-        checkTerminal {
-            val map = mutableMapOf<Int, MutableList<String>>()
-            groupByTo(map, { it % 2 }, { it.toString() })
-            assertEquals(sourceList.groupBy({ it % 2 }, { it.toString() }), map)
-        }
-    }
-
-    @Test
-    fun testMap() {
-        checkTransform(sourceList.map { it.toString() }) {
-            map { it.toString() }
-        }
-    }
-
-    @Test
-    fun testMapIndexed() {
-        checkTransform(sourceList.mapIndexed { index, v -> "$index$v" }) {
-            mapIndexed { index, v -> "$index$v" }
-        }
-    }
-
-    @Test
-    fun testMapIndexedNotNull() {
-        checkTransform(sourceList.mapIndexedNotNull { index, v -> "$index$v".takeIf { v % 2 == 0 } }) {
-            mapIndexedNotNull { index, v -> "$index$v".takeIf { v % 2 == 0 } }
-        }
-    }
-
-    @Test
-    fun testMapIndexedNotNullToCollection() {
-        checkTerminal {
-            val list = mutableListOf<String>()
-            mapIndexedNotNullTo(list) { index, v -> "$index$v".takeIf { v % 2 == 0 } }
-            assertEquals(sourceList.mapIndexedNotNull { index, v -> "$index$v".takeIf { v % 2 == 0 } }, list)
-        }
-    }
-
-    @Test
-    fun testMapIndexedNotNullToChannel() {
-        checkTerminal {
-            val channel = Channel<String>()
-            val result = GlobalScope.async { channel.toList() }
-            mapIndexedNotNullTo(channel) { index, v -> "$index$v".takeIf { v % 2 == 0 } }
-            channel.close()
-            assertEquals(sourceList.mapIndexedNotNull { index, v -> "$index$v".takeIf { v % 2 == 0 } }, result.await())
-        }
-    }
-
-    @Test
-    fun testMapIndexedToCollection() {
-        checkTerminal {
-            val list = mutableListOf<String>()
-            mapIndexedTo(list) { index, v -> "$index$v" }
-            assertEquals(sourceList.mapIndexed { index, v -> "$index$v" }, list)
-        }
-    }
-
-    @Test
-    fun testMapIndexedToChannel() {
-        checkTerminal {
-            val channel = Channel<String>()
-            val result = GlobalScope.async { channel.toList() }
-            mapIndexedTo(channel) { index, v -> "$index$v" }
-            channel.close()
-            assertEquals(sourceList.mapIndexed { index, v -> "$index$v" }, result.await())
-        }
-    }
-
-    @Test
-    fun testMapNotNull() {
-        checkTransform(sourceList.mapNotNull { (it + 3).takeIf { it % 2 == 0 } }) {
-            mapNotNull { (it + 3).takeIf { it % 2 == 0 } }
-        }
-    }
-
-    @Test
-    fun testMapNotNullToCollection() {
-        checkTerminal {
-            val list = mutableListOf<Int>()
-            mapNotNullTo(list) { (it + 3).takeIf { it % 2 == 0 } }
-            assertEquals(sourceList.mapNotNull { (it + 3).takeIf { it % 2 == 0 } }, list)
-        }
-    }
-
-    @Test
-    fun testMapNotNullToChannel() {
-        checkTerminal {
-            val channel = Channel<Int>()
-            val result = GlobalScope.async { channel.toList() }
-            mapNotNullTo(channel) { (it + 3).takeIf { it % 2 == 0 } }
-            channel.close()
-            assertEquals(sourceList.mapNotNull { (it + 3).takeIf { it % 2 == 0 } }, result.await())
-        }
-    }
-
-    @Test
-    fun testMapToCollection() {
-        checkTerminal {
-            val list = mutableListOf<Int>()
-            mapTo(list) { it + 3 }
-            assertEquals(sourceList.map { it + 3 }, list)
-        }
-    }
-
-    @Test
-    fun testMapToChannel() {
-        checkTerminal {
-            val channel = Channel<Int>()
-            val result = GlobalScope.async { channel.toList() }
-            mapTo(channel) { it + 3 }
-            channel.close()
-            assertEquals(sourceList.map { it + 3 }, result.await())
-        }
-    }
-
-    @Test
-    fun testWithIndex() {
-        checkTransform(sourceList.asSequence().withIndex().toList()) {
-            withIndex()
-        }
-    }
-
-    @Test
-    fun testDistinctBy() {
-        checkTransform(sourceList.distinctBy { it / 2 }) {
-            distinctBy { it / 2 }
-        }
-    }
-
-    @Test
-    fun testToMutableSet() {
-        checkTerminal {
-            val set = toMutableSet()
-            assertEquals(sourceList.toSet(), set)
-        }
-    }
-
-    @Test
-    fun testAll() {
-        checkTerminal {
-            val all = all { it < 11 }
-            assertEquals(sourceList.all { it < 11 }, all)
-        }
-    }
-
-    @Test
-    fun testAny() {
-        checkTerminal {
-            val any = any()
-            assertEquals(sourceList.any(), any)
-        }
-    }
-
-    @Test
-    fun testAnyPredicate() {
-        checkTerminal {
-            val any = any { it % 3 == 0 }
-            assertEquals(sourceList.any { it % 3 == 0 }, any)
-        }
-    }
-    
-    @Test
-    fun testCount() {
-        checkTerminal {
-            val c = count()
-            assertEquals(sourceList.count(), c)
-        }
-    }
-
-    @Test
-    fun testCountPredicate() {
-        checkTerminal {
-            val c = count { it % 3 == 0 }
-            assertEquals(sourceList.count { it % 3 == 0 }, c)
-        }
-    }
-
-    @Test
-    fun testFold() {
-        checkTerminal {
-            val c = fold(1) { a, b -> a + b }
-            assertEquals(sourceList.fold(1) { a, b -> a + b }, c)
-        }
-    }
-
-    @Test
-    fun testFoldIndexed() {
-        checkTerminal {
-            val c = foldIndexed(1) { i, a, b -> i * a + b }
-            assertEquals(sourceList.foldIndexed(1) { i, a, b -> i * a + b }, c)
-        }
-    }
-
-    @Test
-    fun testMaxBy() {
-        checkTerminal {
-            val c = maxBy { it % 3 }
-            assertEquals(sourceList.maxBy { it % 3 }, c)
-        }
-    }
-
-    @Test
-    fun testMaxWith() {
-        checkTerminal {
-            val c = maxWith(compareBy { it % 3 })
-            assertEquals(sourceList.maxWith(compareBy { it % 3 }), c)
-        }
-    }
-
-    @Test
-    fun testMinBy() {
-        checkTerminal {
-            val c = maxBy { it % 3 }
-            assertEquals(sourceList.maxBy { it % 3 }, c)
-        }
-    }
-
-    @Test
-    fun testMinWith() {
-        checkTerminal {
-            val c = maxWith(compareBy { it % 3 })
-            assertEquals(sourceList.maxWith(compareBy { it % 3 }), c)
-        }
-    }
-
-    @Test
-    fun testNone() {
-        checkTerminal {
-            val none = none()
-            assertEquals(sourceList.none(), none)
-        }
-    }
-
-    @Test
-    fun testNonePredicate() {
-        checkTerminal {
-            val none = none { it > 10 }
-            assertEquals(sourceList.none { it > 10 }, none)
-        }
-    }
-
-    @Test
-    fun testReduce() {
-        checkTerminal {
-            val c = reduce { a, b -> a + b }
-            assertEquals(sourceList.reduce { a, b -> a + b }, c)
-        }
-    }
-
-    @Test
-    fun testReduceIndexed() {
-        checkTerminal {
-            val c = reduceIndexed { i, a, b -> i * a + b }
-            assertEquals(sourceList.reduceIndexed { i, a, b -> i * a + b }, c)
-        }
-    }
-
-    @Test
-    fun testSubBy() {
-        checkTerminal {
-            val c = sumBy { it }
-            assertEquals(sourceList.sumBy { it }, c)
-        }
-    }
-
-    @Test
-    fun testSubByDouble() {
-        checkTerminal {
-            val c = sumByDouble { it.toDouble() }
-            assertEquals(sourceList.sumByDouble { it.toDouble() }, c)
-        }
-    }
-
-    @Test
-    fun testPartition() {
-        checkTerminal {
-            val pair = partition { it % 2 == 0 }
-            assertEquals(sourceList.partition { it % 2 == 0 }, pair)
-        }
-    }
-
-    @Test
-    fun testZip() {
-        val expect = sourceList.zip(sourceList) { a, b -> a + 2 * b }
-        checkTransform(expect) {
-            with(CoroutineScope(coroutineContext)) {
-                zip(testSource()) { a, b -> a + 2*b }
-            }
-        }
-        checkTransform(expect) {
-            with(CoroutineScope(coroutineContext)) {
-                testSource().zip(this@checkTransform) { a, b -> a + 2*b }
-            }
-        }
-    }
-
-    // ------------------
-    
-    private fun checkTerminal(
-        expected: ((Throwable?) -> Unit)? = null,
-        terminal: suspend ReceiveChannel<Int>.() -> Unit
-    ) {
-        checkTerminalCompletion(expected, terminal)
-        checkTerminalCancellation(expected, terminal)
-    }
-
-    private fun checkTerminalCompletion(
-        expected: ((Throwable?) -> Unit)? = null,
-        terminal: suspend ReceiveChannel<Int>.() -> Unit
-    ) {
-        val src = runBlocking {
-            val src = testSource()
-            try {
-                // terminal operation
-                terminal(src)
-                // source must be cancelled at the end of terminal op
-                if (expected != null) error("Exception was expected")
-            } catch (e: Throwable) {
-                if (expected == null) throw e
-                expected(e)
-            }
-            src
-        }
-        assertTrue(src.isClosedForReceive, "Source must be closed")
-    }
-
-    private fun checkTerminalCancellation(
-        expected: ((Throwable?) -> Unit)? = null,
-        terminal: suspend ReceiveChannel<Int>.() -> Unit
-    ) {
-        val src = runBlocking {
-            val src = testSource()
-            // terminal operation in a separate async context started until the first suspension
-            val d = async(NonCancellable, start = CoroutineStart.UNDISPATCHED) {
-                terminal(src)
-            }
-            // then cancel it
-            d.cancel()
-            // and try to get it's result
-            try {
-                d.await()
-            } catch (e: CancellationException) {
-                // ok -- was cancelled
-            } catch (e: Throwable) {
-                // if threw a different exception -- must be an expected one
-                if (expected == null) throw e
-                expected(e)
-            }
-            src
-        }
-        // source must be cancelled at the end of terminal op even if it was cancelled while in process
-        assertTrue(src.isClosedForReceive, "Source must be closed")
-    }
-
-    private fun <R> checkTransform(
-        expect: List<R>,
-        transform: suspend ReceiveChannel<Int>.() -> ReceiveChannel<R>
-    ) {
-        // check for varying number of received elements from the channel
-        for (nReceive in 0..expect.size) {
-            checkTransform(nReceive, expect, transform)
-        }
-    }
-
-    private fun <R> checkTransform(
-        nReceive: Int,
-        expect: List<R>,
-        transform: suspend ReceiveChannel<Int>.() -> ReceiveChannel<R>
-    ) {
-        val src = runBlocking {
-            val src = testSource()
-            // transform
-            val res = transform(src)
-            // receive nReceive elements from the result
-            repeat(nReceive) { i ->
-                assertEquals(expect[i], res.receive())
-            }
-            if (nReceive < expect.size) {
-                // then cancel
-                res.cancel()
-            } else {
-                // then check that result is closed
-                assertNull(res.receiveOrNull(), "Result has unexpected values")
-            }
-            src
-        }
-        // source must be cancelled when runBlocking processes all the scheduled stuff
-        assertTrue(src.isClosedForReceive, "Source must be closed")
-    }
-}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/jvm/test/channels/ChannelsJvmTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ChannelsJvmTest.kt
index 7613f04..da20f0c 100644
--- a/kotlinx-coroutines-core/jvm/test/channels/ChannelsJvmTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/channels/ChannelsJvmTest.kt
@@ -14,7 +14,9 @@
     fun testBlocking() {
         val ch = Channel<Int>()
         val sum = GlobalScope.async {
-            ch.sumBy { it }
+            var sum = 0
+            ch.consumeEach { sum += it }
+            sum
         }
         repeat(10) {
             ch.sendBlocking(it)
diff --git a/kotlinx-coroutines-core/jvm/test/examples/example-delay-02.kt b/kotlinx-coroutines-core/jvm/test/examples/example-delay-02.kt
index 1b6b12f..f74422e 100644
--- a/kotlinx-coroutines-core/jvm/test/examples/example-delay-02.kt
+++ b/kotlinx-coroutines-core/jvm/test/examples/example-delay-02.kt
@@ -11,9 +11,20 @@
 fun main() = runBlocking {
 
 flow {
-    repeat(10) {
-        emit(it)
-        delay(110)
+    emit(1)
+    delay(90)
+    emit(2)
+    delay(90)
+    emit(3)
+    delay(1010)
+    emit(4)
+    delay(1010)
+    emit(5)
+}.debounce {
+    if (it == 1) {
+        0L
+    } else {
+        1000L
     }
-}.sample(200)
+}
 .toList().joinToString().let { println(it) } }
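
The updated example is expected to print `1, 3, 4, 5`. The following is a minimal sketch of why, using plain Kotlin without coroutines. `Timed` and `simulateDebounce` are hypothetical helpers written only for this illustration and are not part of kotlinx.coroutines. The emission timestamps are derived from the `delay` calls above, assuming that emitting itself takes no time. A value arriving exactly at the deadline is treated here as replacing the pending one, which is also an assumption.

```kotlin
// Hypothetical model of a timestamped emission; not a library type.
data class Timed(val atMillis: Long, val value: Int)

// Simplified, non-concurrent model of debounce with a per-element timeout:
// a value survives if it is the last one, if its timeout is zero, or if the
// next value arrives only after its timeout has elapsed.
fun simulateDebounce(events: List<Timed>, timeoutMillis: (Int) -> Long): List<Int> {
    val emitted = mutableListOf<Int>()
    for ((index, event) in events.withIndex()) {
        val next = events.getOrNull(index + 1)
        val timeout = timeoutMillis(event.value)
        val survives = next == null ||
            timeout == 0L ||
            next.atMillis - event.atMillis > timeout
        if (survives) emitted.add(event.value)
    }
    return emitted
}

fun main() {
    // Timestamps follow the delays in the example: 0, +90, +90, +1010, +1010.
    val events = listOf(
        Timed(0, 1), Timed(90, 2), Timed(180, 3), Timed(1190, 4), Timed(2200, 5)
    )
    val result = simulateDebounce(events) { if (it == 1) 0L else 1000L }
    println(result.joinToString()) // 1, 3, 4, 5
}
```

Value `2` is dropped because `3` arrives 90 ms later, well inside its 1000 ms timeout; `3` and `4` each get 1010 ms of silence, and `5` is the final value.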
diff --git a/kotlinx-coroutines-core/jvm/test/examples/example-delay-03.kt b/kotlinx-coroutines-core/jvm/test/examples/example-delay-03.kt
new file mode 100644
index 0000000..edaea74
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/examples/example-delay-03.kt
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+// This file was automatically generated from Delay.kt by Knit tool. Do not edit.
+package kotlinx.coroutines.examples.exampleDelay03
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+
+fun main() = runBlocking {
+
+flow {
+    repeat(10) {
+        emit(it)
+        delay(110)
+    }
+}.sample(200)
+.toList().joinToString().let { println(it) } }
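
The `sample(200)` example is expected to print `1, 3, 5, 7, 9`. As a rough illustration, the sketch below models sampling in plain Kotlin. `simulateSample` is a hypothetical helper, not a kotlinx.coroutines API, and it assumes that emissions happen at exact multiples of 110 ms and that the upstream completes at 1100 ms, after the final `delay`.

```kotlin
// Simplified, non-concurrent model of periodic sampling: at every tick,
// emit the latest value seen since the previous tick, if there is one.
fun simulateSample(
    events: List<Pair<Long, Int>>, // (timestamp in ms, value), sorted by time
    periodMillis: Long,
    endMillis: Long                // assumed completion time of the upstream
): List<Int> {
    val emitted = mutableListOf<Int>()
    var index = 0
    var tick = periodMillis
    while (tick <= endMillis) {
        var latest: Int? = null
        // Consume every value that arrived up to this tick, keeping the last.
        while (index < events.size && events[index].first <= tick) {
            latest = events[index].second
            index++
        }
        latest?.let { emitted.add(it) }
        tick += periodMillis
    }
    return emitted
}

fun main() {
    // repeat(10) { emit(it); delay(110) } gives values 0..9 at 0, 110, ..., 990 ms.
    val events = (0 until 10).map { it * 110L to it }
    val result = simulateSample(events, periodMillis = 200, endMillis = 1100)
    println(result.joinToString()) // 1, 3, 5, 7, 9
}
```

Each 200 ms window contains two emissions, so only the second of each pair is observed at the tick.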
diff --git a/kotlinx-coroutines-core/jvm/test/examples/example-delay-duration-02.kt b/kotlinx-coroutines-core/jvm/test/examples/example-delay-duration-02.kt
index e43dfd1..10ba88a 100644
--- a/kotlinx-coroutines-core/jvm/test/examples/example-delay-duration-02.kt
+++ b/kotlinx-coroutines-core/jvm/test/examples/example-delay-duration-02.kt
@@ -13,9 +13,20 @@
 fun main() = runBlocking {
 
 flow {
-    repeat(10) {
-        emit(it)
-        delay(110.milliseconds)
+    emit(1)
+    delay(90.milliseconds)
+    emit(2)
+    delay(90.milliseconds)
+    emit(3)
+    delay(1010.milliseconds)
+    emit(4)
+    delay(1010.milliseconds)
+    emit(5)
+}.debounce {
+    if (it == 1) {
+        0.milliseconds
+    } else {
+        1000.milliseconds
     }
-}.sample(200.milliseconds)
+}
 .toList().joinToString().let { println(it) } }
diff --git a/kotlinx-coroutines-core/jvm/test/examples/example-delay-duration-03.kt b/kotlinx-coroutines-core/jvm/test/examples/example-delay-duration-03.kt
new file mode 100644
index 0000000..5fa980a
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/examples/example-delay-duration-03.kt
@@ -0,0 +1,21 @@
+@file:OptIn(ExperimentalTime::class)
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+// This file was automatically generated from Delay.kt by Knit tool. Do not edit.
+package kotlinx.coroutines.examples.exampleDelayDuration03
+
+import kotlin.time.*
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+
+fun main() = runBlocking {
+
+flow {
+    repeat(10) {
+        emit(it)
+        delay(110.milliseconds)
+    }
+}.sample(200.milliseconds)
+.toList().joinToString().let { println(it) } }
diff --git a/kotlinx-coroutines-core/jvm/test/examples/test/FlowDelayTest.kt b/kotlinx-coroutines-core/jvm/test/examples/test/FlowDelayTest.kt
index 226d31c..99e72eb 100644
--- a/kotlinx-coroutines-core/jvm/test/examples/test/FlowDelayTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/examples/test/FlowDelayTest.kt
@@ -17,6 +17,13 @@
     }
 
     @Test
+    fun testExampleDelay02() {
+        test("ExampleDelay02") { kotlinx.coroutines.examples.exampleDelay02.main() }.verifyLines(
+            "1, 3, 4, 5"
+        )
+    }
+
+    @Test
     fun testExampleDelayDuration01() {
         test("ExampleDelayDuration01") { kotlinx.coroutines.examples.exampleDelayDuration01.main() }.verifyLines(
             "3, 4, 5"
@@ -24,15 +31,22 @@
     }
 
     @Test
-    fun testExampleDelay02() {
-        test("ExampleDelay02") { kotlinx.coroutines.examples.exampleDelay02.main() }.verifyLines(
+    fun testExampleDelayDuration02() {
+        test("ExampleDelayDuration02") { kotlinx.coroutines.examples.exampleDelayDuration02.main() }.verifyLines(
+            "1, 3, 4, 5"
+        )
+    }
+
+    @Test
+    fun testExampleDelay03() {
+        test("ExampleDelay03") { kotlinx.coroutines.examples.exampleDelay03.main() }.verifyLines(
             "1, 3, 5, 7, 9"
         )
     }
 
     @Test
-    fun testExampleDelayDuration02() {
-        test("ExampleDelayDuration02") { kotlinx.coroutines.examples.exampleDelayDuration02.main() }.verifyLines(
+    fun testExampleDelayDuration03() {
+        test("ExampleDelayDuration03") { kotlinx.coroutines.examples.exampleDelayDuration03.main() }.verifyLines(
             "1, 3, 5, 7, 9"
         )
     }
diff --git a/kotlinx-coroutines-core/jvm/test/flow/OnCompletionInterceptedReleaseTest.kt b/kotlinx-coroutines-core/jvm/test/flow/OnCompletionInterceptedReleaseTest.kt
new file mode 100644
index 0000000..a6268b5
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/flow/OnCompletionInterceptedReleaseTest.kt
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow
+
+import kotlinx.coroutines.*
+import org.junit.Test
+import kotlin.coroutines.*
+import kotlin.test.*
+
+class OnCompletionInterceptedReleaseTest : TestBase() {
+    @Test
+    fun testLeak() = runTest {
+        expect(1)
+        var cont: Continuation<Unit>? = null
+        val interceptor = CountingInterceptor()
+        val job = launch(interceptor, start = CoroutineStart.UNDISPATCHED) {
+            emptyFlow<Int>()
+                .onCompletion { emit(1) }
+                .collect { value ->
+                    expect(2)
+                    assertEquals(1, value)
+                    suspendCoroutine { cont = it }
+                }
+        }
+        cont!!.resume(Unit)
+        assertTrue(job.isCompleted)
+        assertEquals(interceptor.intercepted, interceptor.released)
+        finish(3)
+    }
+
+    class CountingInterceptor : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
+        var intercepted = 0
+        var released = 0
+        override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
+            intercepted++
+            return Continuation(continuation.context) { continuation.resumeWith(it) }
+        }
+
+        override fun releaseInterceptedContinuation(continuation: Continuation<*>) {
+            released++
+        }
+    }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/native/src/internal/LocalAtomics.kt b/kotlinx-coroutines-core/native/src/internal/LocalAtomics.kt
new file mode 100644
index 0000000..398cb63
--- /dev/null
+++ b/kotlinx-coroutines-core/native/src/internal/LocalAtomics.kt
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.internal
+
+import kotlinx.atomicfu.*
+
+internal actual class LocalAtomicInt actual constructor(value: Int) {
+
+    private val iRef = atomic(value)
+
+    actual fun set(value: Int) {
+        iRef.value = value
+    }
+
+    actual fun get(): Int = iRef.value
+
+    actual fun decrementAndGet(): Int = iRef.decrementAndGet()
+}
diff --git a/kotlinx-coroutines-debug/README.md b/kotlinx-coroutines-debug/README.md
index 5518e00..0c01400 100644
--- a/kotlinx-coroutines-debug/README.md
+++ b/kotlinx-coroutines-debug/README.md
@@ -23,7 +23,7 @@
 Add `kotlinx-coroutines-debug` to your project test dependencies:
 ```
 dependencies {
-    testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-debug:1.4.0-M1'
+    testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-debug:1.4.0'
 }
 ```
 
@@ -61,7 +61,7 @@
 ### Using as JVM agent
 
 Debug module can also be used as a standalone JVM agent to enable debug probes on the application startup.
-You can run your application with an additional argument: `-javaagent:kotlinx-coroutines-debug-1.4.0-M1.jar`.
+You can run your application with an additional argument: `-javaagent:kotlinx-coroutines-debug-1.4.0.jar`.
 Additionally, on Linux and Mac OS X you can use `kill -5 $pid` command in order to force your application to print all alive coroutines.
 When used as Java agent, `"kotlinx.coroutines.debug.enable.creation.stack.trace"` system property can be used to control 
 [DebugProbes.enableCreationStackTraces] along with agent startup.
diff --git a/kotlinx-coroutines-debug/src/CoroutinesBlockHoundIntegration.kt b/kotlinx-coroutines-debug/src/CoroutinesBlockHoundIntegration.kt
index f89d2be..091e8eb 100644
--- a/kotlinx-coroutines-debug/src/CoroutinesBlockHoundIntegration.kt
+++ b/kotlinx-coroutines-debug/src/CoroutinesBlockHoundIntegration.kt
@@ -1,16 +1,168 @@
 @file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
+
 package kotlinx.coroutines.debug
 
-import reactor.blockhound.BlockHound
 import kotlinx.coroutines.scheduling.*
+import reactor.blockhound.*
 import reactor.blockhound.integration.*
 
 @Suppress("UNUSED")
-public class CoroutinesBlockHoundIntegration: BlockHoundIntegration {
+public class CoroutinesBlockHoundIntegration : BlockHoundIntegration {
 
-    override fun applyTo(builder: BlockHound.Builder) {
-        builder.addDynamicThreadPredicate { isSchedulerWorker(it) }
-        builder.nonBlockingThreadPredicate { p -> p.or { mayNotBlock(it) } }
+    override fun applyTo(builder: BlockHound.Builder): Unit = with(builder) {
+        allowBlockingCallsInPrimitiveImplementations()
+        allowBlockingWhenEnqueuingTasks()
+        allowServiceLoaderInvocationsOnInit()
+        allowBlockingCallsInReflectionImpl()
+        /* These predicates make BlockHound report blocking calls only from threads that belong to the
+        coroutine thread pool and are currently executing a CPU-bound coroutine computation. */
+        addDynamicThreadPredicate { isSchedulerWorker(it) }
+        nonBlockingThreadPredicate { p -> p.or { mayNotBlock(it) } }
+    }
+
+    /**
+     * Allows blocking calls in various coroutine structures, such as flows and channels.
+     *
+     * They use locks in implementations, though only for protecting short pieces of fast and well-understood code, so
+     * locking in such places doesn't affect the program liveness.
+     */
+    private fun BlockHound.Builder.allowBlockingCallsInPrimitiveImplementations() {
+        allowBlockingCallsInJobSupport()
+        allowBlockingCallsInThreadSafeHeap()
+        allowBlockingCallsInFlow()
+        allowBlockingCallsInChannels()
+    }
+
+    /**
+     * Allows blocking inside [kotlinx.coroutines.JobSupport].
+     */
+    private fun BlockHound.Builder.allowBlockingCallsInJobSupport() {
+        for (method in listOf("finalizeFinishingState", "invokeOnCompletion", "makeCancelling",
+            "tryMakeCompleting"))
+        {
+            allowBlockingCallsInside("kotlinx.coroutines.JobSupport", method)
+        }
+    }
+
+    /**
+     * Allows blocking inside [kotlinx.coroutines.internal.ThreadSafeHeap].
+     */
+    private fun BlockHound.Builder.allowBlockingCallsInThreadSafeHeap() {
+        for (method in listOf("clear", "peek", "removeFirstOrNull", "addLast")) {
+            allowBlockingCallsInside("kotlinx.coroutines.internal.ThreadSafeHeap", method)
+        }
+        // [addLastIf] is only used in [EventLoop.common]. Users of [removeFirstIf]:
+        allowBlockingCallsInside("kotlinx.coroutines.test.TestCoroutineDispatcher", "doActionsUntil")
+        allowBlockingCallsInside("kotlinx.coroutines.test.TestCoroutineContext", "triggerActions")
+    }
+
+    private fun BlockHound.Builder.allowBlockingCallsInFlow() {
+        allowBlockingCallsInsideStateFlow()
+        allowBlockingCallsInsideSharedFlow()
+    }
+
+    /**
+     * Allows blocking inside the implementation of [kotlinx.coroutines.flow.StateFlow].
+     */
+    private fun BlockHound.Builder.allowBlockingCallsInsideStateFlow() {
+        allowBlockingCallsInside("kotlinx.coroutines.flow.StateFlowImpl", "updateState")
+    }
+
+    /**
+     * Allows blocking inside the implementation of [kotlinx.coroutines.flow.SharedFlow].
+     */
+    private fun BlockHound.Builder.allowBlockingCallsInsideSharedFlow() {
+        for (method in listOf("emitSuspend", "awaitValue", "getReplayCache", "tryEmit", "cancelEmitter",
+            "tryTakeValue", "resetReplayCache"))
+        {
+            allowBlockingCallsInside("kotlinx.coroutines.flow.SharedFlowImpl", method)
+        }
+        for (method in listOf("getSubscriptionCount", "allocateSlot", "freeSlot")) {
+            allowBlockingCallsInside("kotlinx.coroutines.flow.internal.AbstractSharedFlow", method)
+        }
+    }
+
+    private fun BlockHound.Builder.allowBlockingCallsInChannels() {
+        allowBlockingCallsInArrayChannel()
+        allowBlockingCallsInBroadcastChannel()
+        allowBlockingCallsInConflatedChannel()
+    }
+
+    /**
+     * Allows blocking inside [kotlinx.coroutines.channels.ArrayChannel].
+     */
+    private fun BlockHound.Builder.allowBlockingCallsInArrayChannel() {
+        for (method in listOf(
+            "pollInternal", "isEmpty", "isFull", "isClosedForReceive", "offerInternal", "offerSelectInternal",
+            "enqueueSend", "pollSelectInternal", "enqueueReceiveInternal", "onCancelIdempotent"))
+        {
+            allowBlockingCallsInside("kotlinx.coroutines.channels.ArrayChannel", method)
+        }
+    }
+
+    /**
+     * Allows blocking inside [kotlinx.coroutines.channels.ArrayBroadcastChannel].
+     */
+    private fun BlockHound.Builder.allowBlockingCallsInBroadcastChannel() {
+        for (method in listOf("offerInternal", "offerSelectInternal", "updateHead")) {
+            allowBlockingCallsInside("kotlinx.coroutines.channels.ArrayBroadcastChannel", method)
+        }
+        for (method in listOf("checkOffer", "pollInternal", "pollSelectInternal")) {
+            allowBlockingCallsInside("kotlinx.coroutines.channels.ArrayBroadcastChannel\$Subscriber", method)
+        }
+    }
+
+    /**
+     * Allows blocking inside [kotlinx.coroutines.channels.ConflatedChannel].
+     */
+    private fun BlockHound.Builder.allowBlockingCallsInConflatedChannel() {
+        for (method in listOf("offerInternal", "offerSelectInternal", "pollInternal", "pollSelectInternal",
+            "onCancelIdempotent"))
+        {
+            allowBlockingCallsInside("kotlinx.coroutines.channels.ConflatedChannel", method)
+        }
+    }
+
+    /**
+     * Allows blocking when enqueuing tasks into a thread pool.
+     *
+     * Without this, the following code breaks:
+     * ```
+     * withContext(Dispatchers.Default) {
+     *     withContext(newSingleThreadContext("singleThreadedContext")) {
+     *     }
+     * }
+     * ```
+     */
+    private fun BlockHound.Builder.allowBlockingWhenEnqueuingTasks() {
+        /* This method may block as part of its implementation, but is probably safe. */
+        allowBlockingCallsInside("java.util.concurrent.ScheduledThreadPoolExecutor", "execute")
+    }
+
+    /**
+     * Allows specific invocations of [java.util.ServiceLoader] during class initialization.
+     *
+     * Each instance is listed separately; another approach could be to generally allow the operations performed by
+     * service loaders, as they can generally be considered safe. This was not done here because ServiceLoader has a
+     * large API surface, with some methods being hidden as implementation details (in particular, the implementation of
+     * its iterator is completely opaque). Relying on particular names being used in ServiceLoader's implementation
+     * would be brittle, so here we only provide clearance rules for some specific instances.
+     */
+    private fun BlockHound.Builder.allowServiceLoaderInvocationsOnInit() {
+        allowBlockingCallsInside("kotlinx.coroutines.reactive.ReactiveFlowKt", "<clinit>")
+        allowBlockingCallsInside("kotlinx.coroutines.CoroutineExceptionHandlerImplKt", "<clinit>")
+        // not part of the coroutines library, but it would be nice if reflection also wasn't considered blocking
+        allowBlockingCallsInside("kotlin.reflect.jvm.internal.impl.resolve.OverridingUtil", "<clinit>")
+    }
+
+    /**
+     * Allows some blocking calls from the reflection API.
+     *
+     * The API is big, so surely some other blocking calls will show up, but with these rules in place, at least some
+     * simple examples work without problems.
+     */
+    private fun BlockHound.Builder.allowBlockingCallsInReflectionImpl() {
+        allowBlockingCallsInside("kotlin.reflect.jvm.internal.impl.builtins.jvm.JvmBuiltInsPackageFragmentProvider", "findPackage")
     }
 
 }
diff --git a/kotlinx-coroutines-debug/test/BlockHoundTest.kt b/kotlinx-coroutines-debug/test/BlockHoundTest.kt
index ff5c95c..571daca 100644
--- a/kotlinx-coroutines-debug/test/BlockHoundTest.kt
+++ b/kotlinx-coroutines-debug/test/BlockHoundTest.kt
@@ -1,5 +1,6 @@
 package kotlinx.coroutines.debug
 import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
 import org.junit.*
 import reactor.blockhound.*
 
@@ -52,6 +53,27 @@
         }
     }
 
+    @Test
+    fun testChannelsNotBeingConsideredBlocking() = runTest {
+        withContext(Dispatchers.Default) {
+            // Copy of kotlinx.coroutines.channels.ArrayChannelTest.testSimple
+            val q = Channel<Int>(1)
+            check(q.isEmpty)
+            check(!q.isClosedForReceive)
+            check(!q.isClosedForSend)
+            val sender = launch {
+                q.send(1)
+                q.send(2)
+            }
+            val receiver = launch {
+                check(q.receive() == 1)
+                check(q.receive() == 2)
+            }
+            sender.join()
+            receiver.join()
+        }
+    }
+
     @Test(expected = BlockingOperationError::class)
     fun testReusingThreadsFailure() = runTest {
         val n = 100
diff --git a/kotlinx-coroutines-debug/test/LazyCoroutineTest.kt b/kotlinx-coroutines-debug/test/LazyCoroutineTest.kt
new file mode 100644
index 0000000..c872b6a
--- /dev/null
+++ b/kotlinx-coroutines-debug/test/LazyCoroutineTest.kt
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+package kotlinx.coroutines.debug
+
+import kotlinx.coroutines.*
+import org.junit.Test
+import kotlin.test.*
+
+class LazyCoroutineTest : DebugTestBase() {
+
+    @Test
+    fun testLazyCompletedCoroutine() = runTest {
+        val job = launch(start = CoroutineStart.LAZY) {}
+        job.invokeOnCompletion { expect(2) }
+        expect(1)
+        job.cancelAndJoin()
+        expect(3)
+        assertEquals(1, DebugProbes.dumpCoroutinesInfo().size) // Outer runBlocking
+        verifyPartialDump(1, "BlockingCoroutine{Active}")
+        finish(4)
+    }
+}
diff --git a/kotlinx-coroutines-debug/test/WithContextUndispatchedTest.kt b/kotlinx-coroutines-debug/test/WithContextUndispatchedTest.kt
new file mode 100644
index 0000000..e803c98
--- /dev/null
+++ b/kotlinx-coroutines-debug/test/WithContextUndispatchedTest.kt
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+package kotlinx.coroutines.debug
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import org.junit.*
+
+// Test for our internal optimization "withContextUndispatched"
+class WithContextUndispatchedTest : DebugTestBase() {
+
+    @Test
+    fun testZip() = runTest {
+        val f1 = flowOf("a")
+        val f2 = flow {
+            nestedEmit()
+            yield()
+        }
+        f1.zip(f2) { i, j -> i + j }.collect {
+            bar(false)
+        }
+    }
+
+    private suspend fun FlowCollector<Int>.nestedEmit() {
+        emit(1)
+        emit(2)
+    }
+
+    @Test
+    fun testUndispatchedFlowOn() = runTest {
+        val flow = flowOf(1, 2, 3).flowOn(CoroutineName("..."))
+        flow.collect {
+            bar(true)
+        }
+    }
+
+    @Test
+    fun testUndispatchedFlowOnWithNestedCaller() = runTest {
+        val flow = flow {
+            nestedEmit()
+        }.flowOn(CoroutineName("..."))
+        flow.collect {
+            bar(true)
+        }
+    }
+
+    private suspend fun bar(forFlowOn: Boolean) {
+        yield()
+        if (forFlowOn) {
+            verifyFlowOn()
+        } else {
+            verifyZip()
+        }
+        yield()
+    }
+
+    private suspend fun verifyFlowOn() {
+        yield() // suspend
+        verifyPartialDump(1, "verifyFlowOn", "bar")
+    }
+
+    private suspend fun verifyZip() {
+        yield() // suspend
+        verifyPartialDump(2, "verifyZip", "bar", "nestedEmit")
+    }
+}
diff --git a/kotlinx-coroutines-test/README.md b/kotlinx-coroutines-test/README.md
index b82fe85..0b1c239 100644
--- a/kotlinx-coroutines-test/README.md
+++ b/kotlinx-coroutines-test/README.md
@@ -9,7 +9,7 @@
 Add `kotlinx-coroutines-test` to your project test dependencies:
 ```
 dependencies {
-    testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-test:1.4.0-M1'
+    testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-test:1.4.0'
 }
 ```
 
diff --git a/reactive/kotlinx-coroutines-jdk9/src/Publish.kt b/reactive/kotlinx-coroutines-jdk9/src/Publish.kt
index d274083..6fd9a5e 100644
--- a/reactive/kotlinx-coroutines-jdk9/src/Publish.kt
+++ b/reactive/kotlinx-coroutines-jdk9/src/Publish.kt
@@ -28,7 +28,7 @@
  * **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect
  *        to cancellation and error handling may change in the future.
  */
-@ExperimentalCoroutinesApi
+@ExperimentalCoroutinesApi // Since 1.3.x
 public fun <T> flowPublish(
     context: CoroutineContext = EmptyCoroutineContext,
     @BuilderInference block: suspend ProducerScope<T>.() -> Unit
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
index cf73ef2..41c82ed 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
@@ -81,7 +81,13 @@
     val observer = object : Observer<T> {
         override fun onComplete() { close() }
         override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() }
-        override fun onNext(t: T) { sendBlocking(t) }
+        override fun onNext(t: T) {
+            try {
+                sendBlocking(t)
+            } catch (ignored: Throwable) { // TODO: Replace when this issue is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/974
+                // Is handled by the downstream flow
+            }
+        }
         override fun onError(e: Throwable) { close(e) }
     }
 
diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableSourceAsFlowStressTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableSourceAsFlowStressTest.kt
new file mode 100644
index 0000000..159f372
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx2/test/ObservableSourceAsFlowStressTest.kt
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx2
+
+import io.reactivex.*
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.flow.*
+import org.junit.*
+import java.util.concurrent.*
+
+class ObservableSourceAsFlowStressTest : TestBase() {
+
+    private val iterations = 100 * stressTestMultiplierSqrt
+
+    @Before
+    fun setup() {
+        ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
+    }
+
+    @Test
+    fun testAsFlowCancellation() = runTest {
+        repeat(iterations) {
+            val latch = Channel<Unit>(1)
+            var i = 0
+            val observable = Observable.interval(100L, TimeUnit.MICROSECONDS)
+                .doOnNext { if (++i > 100) latch.offer(Unit) }
+            val job = observable.asFlow().launchIn(CoroutineScope(Dispatchers.Default))
+            latch.receive()
+            job.cancelAndJoin()
+        }
+    }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt
index 9bb38c0..0978423 100644
--- a/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt
+++ b/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt
@@ -81,7 +81,13 @@
     val observer = object : Observer<T> {
         override fun onComplete() { close() }
         override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() }
-        override fun onNext(t: T) { sendBlocking(t) }
+        override fun onNext(t: T) {
+            try {
+                sendBlocking(t)
+            } catch (ignored: Throwable) { // TODO: Replace when this issue is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/974
+                // Is handled by the downstream flow
+            }
+        }
         override fun onError(e: Throwable) { close(e) }
     }
 
diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableSourceAsFlowStressTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableSourceAsFlowStressTest.kt
new file mode 100644
index 0000000..431a7a7
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/ObservableSourceAsFlowStressTest.kt
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.*
+import io.reactivex.rxjava3.exceptions.*
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.flow.*
+import org.junit.*
+import java.util.concurrent.*
+
+class ObservableSourceAsFlowStressTest : TestBase() {
+
+    private val iterations = 100 * stressTestMultiplierSqrt
+
+    @Before
+    fun setup() {
+        ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
+    }
+
+    @Test
+    fun testAsFlowCancellation() = runTest {
+        repeat(iterations) {
+            val latch = Channel<Unit>(1)
+            var i = 0
+            val observable = Observable.interval(100L, TimeUnit.MICROSECONDS)
+                .doOnNext { if (++i > 100) latch.offer(Unit) }
+            val job = observable.asFlow().launchIn(CoroutineScope(Dispatchers.Default))
+            latch.receive()
+            job.cancelAndJoin()
+        }
+    }
+}
diff --git a/settings.gradle b/settings.gradle
index d22d65f..3a07891 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -8,7 +8,7 @@
 
         // JMH
         id "net.ltgt.apt" version "0.21"
-        id "me.champeau.gradle.jmh" version "0.5.0"
+        id "me.champeau.gradle.jmh" version "0.5.2"
     }
 }
 
diff --git a/ui/coroutines-guide-ui.md b/ui/coroutines-guide-ui.md
index 5df8d7f..297b1fb 100644
--- a/ui/coroutines-guide-ui.md
+++ b/ui/coroutines-guide-ui.md
@@ -110,7 +110,7 @@
 `app/build.gradle` file:
 
 ```groovy
-implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.4.0-M1"
+implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.4.0"
 ```
 
 You can clone [kotlinx.coroutines](https://github.com/Kotlin/kotlinx.coroutines) project from GitHub onto your 
diff --git a/ui/kotlinx-coroutines-android/animation-app/gradle.properties b/ui/kotlinx-coroutines-android/animation-app/gradle.properties
index cd34eb1..5ee7794 100644
--- a/ui/kotlinx-coroutines-android/animation-app/gradle.properties
+++ b/ui/kotlinx-coroutines-android/animation-app/gradle.properties
@@ -21,7 +21,7 @@
 # org.gradle.parallel=true
 
 kotlin_version=1.4.0
-coroutines_version=1.4.0-M1
+coroutines_version=1.4.0
 
 android.useAndroidX=true
 android.enableJetifier=true
diff --git a/ui/kotlinx-coroutines-android/example-app/gradle.properties b/ui/kotlinx-coroutines-android/example-app/gradle.properties
index cd34eb1..5ee7794 100644
--- a/ui/kotlinx-coroutines-android/example-app/gradle.properties
+++ b/ui/kotlinx-coroutines-android/example-app/gradle.properties
@@ -21,7 +21,7 @@
 # org.gradle.parallel=true
 
 kotlin_version=1.4.0
-coroutines_version=1.4.0-M1
+coroutines_version=1.4.0
 
 android.useAndroidX=true
 android.enableJetifier=true
diff --git a/ui/kotlinx-coroutines-javafx/src/JavaFxConvert.kt b/ui/kotlinx-coroutines-javafx/src/JavaFxConvert.kt
index 903b60a..c7fcb1c 100644
--- a/ui/kotlinx-coroutines-javafx/src/JavaFxConvert.kt
+++ b/ui/kotlinx-coroutines-javafx/src/JavaFxConvert.kt
@@ -24,7 +24,7 @@
  * Adjacent applications of [flowOn], [buffer], [conflate], and [produceIn] to the result of `asFlow` are fused.
  * [conflate] has no effect, as this flow is already conflated; one can use [buffer] to change that instead.
  */
-@ExperimentalCoroutinesApi
+@ExperimentalCoroutinesApi // Since 1.3.x
 public fun <T> ObservableValue<T>.asFlow(): Flow<T> = callbackFlow<T> {
     val listener = ChangeListener<T> { _, _, newValue ->
         try {