Coroutines debugger should keep weak references to running coroutines (#2129)

It should not prevent garbage-collection of coroutines that were otherwise lost, which included the following practically-useful cases:
* Synchronous coroutines (iterator/sequence).
* Lazy coroutines that were not started.
* Abandoned coroutines that suspend forever without strong references to them in GlobalScope.

Two kinds of tests cover this functionality:
* A test via FieldWalker ensures that debugger impl does not keep a strong reference. This tests works fast and provides good diagnostics if anything goes wrong, but it is fragile, as futures changes to debugger my introduce static references to running coroutines elsewhere.
* A stress-test that ensures that no OOM indeed happens when you run a lot of such lost coroutines. Longer-running, more stable to code change, but fragile in a difference sense as it may accidentally start passing in the future if lots of memory get allocated for tests.

Fixes #2117
diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
index 2f1fd15..4609c68 100644
--- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
+++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
@@ -840,10 +840,34 @@
 	public final synthetic fun unbox-impl ()Ljava/lang/Object;
 }
 
+public final class kotlinx/coroutines/debug/internal/DebugCoroutineInfo {
+	public fun <init> (Lkotlinx/coroutines/debug/internal/DebugCoroutineInfoImpl;Lkotlin/coroutines/CoroutineContext;)V
+	public final fun getContext ()Lkotlin/coroutines/CoroutineContext;
+	public final fun getCreationStackBottom ()Lkotlin/coroutines/jvm/internal/CoroutineStackFrame;
+	public final fun getCreationStackTrace ()Ljava/util/List;
+	public final fun getLastObservedFrame ()Lkotlin/coroutines/jvm/internal/CoroutineStackFrame;
+	public final fun getLastObservedThread ()Ljava/lang/Thread;
+	public final fun getSequenceNumber ()J
+	public final fun getState ()Ljava/lang/String;
+	public final fun lastObservedStackTrace ()Ljava/util/List;
+}
+
 public synthetic class kotlinx/coroutines/debug/internal/DebugProbesImplSequenceNumberRefVolatile {
 	public fun <init> (J)V
 }
 
+public final class kotlinx/coroutines/debug/internal/DebuggerInfo : java/io/Serializable {
+	public fun <init> (Lkotlinx/coroutines/debug/internal/DebugCoroutineInfoImpl;Lkotlin/coroutines/CoroutineContext;)V
+	public final fun getCoroutineId ()Ljava/lang/Long;
+	public final fun getDispatcher ()Ljava/lang/String;
+	public final fun getLastObservedStackTrace ()Ljava/util/List;
+	public final fun getLastObservedThreadName ()Ljava/lang/String;
+	public final fun getLastObservedThreadState ()Ljava/lang/String;
+	public final fun getName ()Ljava/lang/String;
+	public final fun getSequenceNumber ()J
+	public final fun getState ()Ljava/lang/String;
+}
+
 public abstract class kotlinx/coroutines/flow/AbstractFlow : kotlinx/coroutines/flow/Flow {
 	public fun <init> ()V
 	public final fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
diff --git a/kotlinx-coroutines-core/jvm/src/debug/internal/ConcurrentWeakMap.kt b/kotlinx-coroutines-core/jvm/src/debug/internal/ConcurrentWeakMap.kt
new file mode 100644
index 0000000..79f024c
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/src/debug/internal/ConcurrentWeakMap.kt
@@ -0,0 +1,284 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.debug.internal
+
+import kotlinx.atomicfu.*
+import kotlinx.coroutines.internal.*
+import java.lang.ref.*
+
+// This is very limited implementation, not suitable as a generic map replacement.
+// It has lock-free get and put with synchronized rehash for simplicity (and better CPU usage on contention)
+@OptIn(ExperimentalStdlibApi::class)
+@Suppress("UNCHECKED_CAST")
+internal class ConcurrentWeakMap<K : Any, V: Any>(
+    /**
+     * Weak reference queue is needed when a small key is mapped to a large value and we need to promptly release a
+     * reference to the value when the key was already disposed.
+     */
+    weakRefQueue: Boolean = false
+) : AbstractMutableMap<K, V>() {
+    private val _size = atomic(0)
+    private val core = atomic(Core(MIN_CAPACITY))
+    private val weakRefQueue: ReferenceQueue<K>? = if (weakRefQueue) ReferenceQueue() else null
+
+    override val size: Int
+        get() = _size.value
+
+    private fun decrementSize() { _size.decrementAndGet() }
+
+    override fun get(key: K): V? = core.value.getImpl(key)
+
+    override fun put(key: K, value: V): V? {
+        var oldValue = core.value.putImpl(key, value)
+        if (oldValue === REHASH) oldValue = putSynchronized(key, value)
+        if (oldValue == null) _size.incrementAndGet()
+        return oldValue as V?
+    }
+
+    override fun remove(key: K): V? {
+        var oldValue = core.value.putImpl(key, null)
+        if (oldValue === REHASH) oldValue = putSynchronized(key, null)
+        if (oldValue != null) _size.decrementAndGet()
+        return oldValue as V?
+    }
+
+    @Synchronized
+    private fun putSynchronized(key: K, value: V?): V? {
+        // Note: concurrent put leaves chance that we fail to put even after rehash, we retry until successful
+        var curCore = core.value
+        while (true) {
+            val oldValue = curCore.putImpl(key, value)
+            if (oldValue !== REHASH) return oldValue as V?
+            curCore = curCore.rehash()
+            core.value = curCore
+        }
+    }
+
+    override val keys: MutableSet<K>
+        get() = KeyValueSet { k, _ -> k }
+
+    override val entries: MutableSet<MutableMap.MutableEntry<K, V>>
+        get() = KeyValueSet { k, v -> Entry(k, v) }
+
+    // We don't care much about clear's efficiency
+    override fun clear() {
+        for (k in keys) remove(k)
+    }
+
+    fun runWeakRefQueueCleaningLoopUntilInterrupted() {
+        check(weakRefQueue != null) { "Must be created with weakRefQueue = true" }
+        try {
+            while (true) {
+                cleanWeakRef(weakRefQueue.remove() as HashedWeakRef<*>)
+            }
+        } catch(e: InterruptedException) {
+            Thread.currentThread().interrupt()
+        }
+    }
+
+    private fun cleanWeakRef(w: HashedWeakRef<*>) {
+        core.value.cleanWeakRef(w)
+    }
+
+    @Suppress("UNCHECKED_CAST")
+    private inner class Core(private val allocated: Int) {
+        private val shift = allocated.countLeadingZeroBits() + 1
+        private val threshold = 2 * allocated / 3 // max fill factor at 66% to ensure speedy lookups
+        private val load = atomic(0) // counts how many slots are occupied in this core
+        private val keys = atomicArrayOfNulls<HashedWeakRef<K>?>(allocated)
+        private val values = atomicArrayOfNulls<Any?>(allocated)
+
+        private fun index(hash: Int) = (hash * MAGIC) ushr shift
+
+        // get is always lock-free, unwraps the value that was marked by concurrent rehash
+        fun getImpl(key: K): V? {
+            var index = index(key.hashCode())
+            while (true) {
+                val w = keys[index].value ?: return null // not found
+                val k = w.get()
+                if (key == k) {
+                    val value = values[index].value
+                    return (if (value is Marked) value.ref else value) as V?
+                }
+                if (k == null) removeCleanedAt(index) // weak ref was here, but collected
+                if (index == 0) index = allocated
+                index--
+            }
+        }
+
+        private fun removeCleanedAt(index: Int) {
+            while (true) {
+                val oldValue = values[index].value ?: return // return when already removed
+                if (oldValue is Marked) return // cannot remove marked (rehash is working on it, will not copy)
+                if (values[index].compareAndSet(oldValue, null)) { // removed
+                    decrementSize()
+                    return
+                }
+            }
+        }
+
+        // returns REHASH when rehash is needed (the value was not put)
+        fun putImpl(key: K, value: V?, weakKey0: HashedWeakRef<K>? = null): Any? {
+            var index = index(key.hashCode())
+            var loadIncremented = false
+            var weakKey: HashedWeakRef<K>? = weakKey0
+            while (true) {
+                val w = keys[index].value
+                if (w == null) { // slot empty => not found => try reserving slot
+                    if (value == null) return null // removing missing value, nothing to do here
+                    if (!loadIncremented) {
+                        // We must increment load before we even try to occupy a slot to avoid overfill during concurrent put
+                        load.update { n ->
+                            if (n >= threshold) return REHASH // the load is already too big -- rehash
+                            n + 1 // otherwise increment
+                        }
+                        loadIncremented = true
+                    }
+                    if (weakKey == null) weakKey = HashedWeakRef(key, weakRefQueue)
+                    if (keys[index].compareAndSet(null, weakKey)) break // slot reserved !!!
+                    continue // retry at this slot on CAS failure (somebody already reserved this slot)
+                }
+                val k = w.get()
+                if (key == k) { // found already reserved slot at index
+                    if (loadIncremented) load.decrementAndGet() // undo increment, because found a slot
+                    break
+                }
+                if (k == null) removeCleanedAt(index) // weak ref was here, but collected
+                if (index == 0) index = allocated
+                index--
+            }
+            // update value
+            var oldValue: Any?
+            while (true) {
+                oldValue = values[index].value
+                if (oldValue is Marked) return REHASH // rehash started, cannot work here
+                if (values[index].compareAndSet(oldValue, value)) break
+            }
+            return oldValue as V?
+        }
+
+        // only one thread can rehash, but may have concurrent puts/gets
+        fun rehash(): Core {
+            // use size to approximate new required capacity to have at least 25-50% fill factor,
+            // may fail due to concurrent modification, will retry
+            retry@while (true) {
+                val newCapacity = size.coerceAtLeast(MIN_CAPACITY / 4).takeHighestOneBit() * 4
+                val newCore = Core(newCapacity)
+                for (index in 0 until allocated) {
+                    // load the key
+                    val w = keys[index].value
+                    val k = w?.get()
+                    if (w != null && k == null) removeCleanedAt(index) // weak ref was here, but collected
+                    // mark value so that it cannot be changed while we rehash to new core
+                    var value: Any?
+                    while (true) {
+                        value = values[index].value
+                        if (value is Marked) { // already marked -- good
+                            value = value.ref
+                            break
+                        }
+                        // try mark
+                        if (values[index].compareAndSet(value, value.mark())) break
+                    }
+                    if (k != null && value != null) {
+                        val oldValue = newCore.putImpl(k, value as V, w)
+                        if (oldValue === REHASH) continue@retry // retry if we underestimated capacity
+                        assert(oldValue == null)
+                    }
+                }
+                return newCore // rehashed everything successfully
+            }
+        }
+
+        fun cleanWeakRef(weakRef: HashedWeakRef<*>) {
+            var index = index(weakRef.hash)
+            while (true) {
+                val w = keys[index].value ?: return // return when slots are over
+                if (w === weakRef) { // found
+                    removeCleanedAt(index)
+                    return
+                }
+                if (index == 0) index = allocated
+                index--
+            }
+        }
+
+        fun <E> keyValueIterator(factory: (K, V) -> E): MutableIterator<E> = KeyValueIterator(factory)
+
+        private inner class KeyValueIterator<E>(private val factory: (K, V) -> E) : MutableIterator<E> {
+            private var index = -1
+            private lateinit var key: K
+            private lateinit var value: V
+
+            init { findNext() }
+
+            private fun findNext() {
+                while (++index < allocated) {
+                    key = keys[index].value?.get() ?: continue
+                    var value = values[index].value
+                    if (value is Marked) value = value.ref
+                    if (value != null) {
+                        this.value = value as V
+                        return
+                    }
+                }
+            }
+
+            override fun hasNext(): Boolean = index < allocated
+
+            override fun next(): E {
+                if (index >= allocated) throw NoSuchElementException()
+                return factory(key, value).also { findNext() }
+            }
+
+            override fun remove() = noImpl()
+        }
+    }
+
+    private class Entry<K, V>(override val key: K, override val value: V) : MutableMap.MutableEntry<K, V> {
+        override fun setValue(newValue: V): V = noImpl()
+    }
+
+    private inner class KeyValueSet<E>(
+        private val factory: (K, V) -> E
+    ) : AbstractMutableSet<E>() {
+        override val size: Int get() = this@ConcurrentWeakMap.size
+        override fun add(element: E): Boolean = noImpl()
+        override fun iterator(): MutableIterator<E> = core.value.keyValueIterator(factory)
+    }
+}
+
+private const val MAGIC = 2654435769L.toInt() // golden ratio
+private const val MIN_CAPACITY = 16
+private val REHASH = Symbol("REHASH")
+private val MARKED_NULL = Marked(null)
+private val MARKED_TRUE = Marked(true) // When using map as set "true" used as value, optimize its mark allocation
+
+/**
+ * Weak reference that stores the original hash code so that we can use reference queue to promptly clean them up
+ * from the hashtable even in the absence of ongoing modifications.
+ */
+internal class HashedWeakRef<T>(
+    ref: T, queue: ReferenceQueue<T>?
+) : WeakReference<T>(ref, queue) {
+    @JvmField
+    val hash = ref.hashCode()
+}
+
+/**
+ * Marked values cannot be modified. The marking is performed when rehash has started to ensure that concurrent
+ * modifications (that are lock-free) cannot perform any changes and are forced to synchronize with ongoing rehash.
+ */
+private class Marked(@JvmField val ref: Any?)
+
+private fun Any?.mark(): Marked = when(this) {
+    null -> MARKED_NULL
+    true -> MARKED_TRUE
+    else -> Marked(this)
+}
+
+private fun noImpl(): Nothing {
+    throw UnsupportedOperationException("not implemented")
+}
diff --git a/kotlinx-coroutines-core/jvm/src/debug/internal/DebugCoroutineInfo.kt b/kotlinx-coroutines-core/jvm/src/debug/internal/DebugCoroutineInfo.kt
index 82e18ea..9d9fa3f 100644
--- a/kotlinx-coroutines-core/jvm/src/debug/internal/DebugCoroutineInfo.kt
+++ b/kotlinx-coroutines-core/jvm/src/debug/internal/DebugCoroutineInfo.kt
@@ -7,81 +7,24 @@
 import kotlin.coroutines.*
 import kotlin.coroutines.jvm.internal.*
 
-internal const val CREATED = "CREATED"
-internal const val RUNNING = "RUNNING"
-internal const val SUSPENDED = "SUSPENDED"
-
+/**
+ * This class represents the data required by IDEA debugger.
+ * IDEA debugger either directly reads data from the corresponding JVM fields of this class or calls the getters,
+ * so we keep both for maximal flexibility for now.
+ * **DO NOT MAKE BINARY-INCOMPATIBLE CHANGES TO THIS CLASS**.
+ */
+@Suppress("unused")
+@PublishedApi
 internal class DebugCoroutineInfo(
-    public val context: CoroutineContext,
-    public val creationStackBottom: CoroutineStackFrame?,
-    @JvmField internal val sequenceNumber: Long
+    source: DebugCoroutineInfoImpl,
+    public val context: CoroutineContext // field is used as of 1.4-M3
 ) {
-
-    public val creationStackTrace: List<StackTraceElement> get() = creationStackTrace()
-
-    /**
-     * Last observed state of the coroutine.
-     * Can be CREATED, RUNNING, SUSPENDED.
-     */
-    public val state: String get() = _state
-    private var _state: String = CREATED
-
-    @JvmField
-    internal var lastObservedThread: Thread? = null
-    @JvmField
-    internal var lastObservedFrame: CoroutineStackFrame? = null
-
-    public fun copy(): DebugCoroutineInfo = DebugCoroutineInfo(
-        context,
-        creationStackBottom,
-        sequenceNumber
-    ).also {
-        it._state = _state
-        it.lastObservedFrame = lastObservedFrame
-        it.lastObservedThread = lastObservedThread
-    }
-
-    /**
-     * Last observed stacktrace of the coroutine captured on its suspension or resumption point.
-     * It means that for [running][State.RUNNING] coroutines resulting stacktrace is inaccurate and
-     * reflects stacktrace of the resumption point, not the actual current stacktrace.
-     */
-    public fun lastObservedStackTrace(): List<StackTraceElement> {
-        var frame: CoroutineStackFrame? = lastObservedFrame ?: return emptyList()
-        val result = ArrayList<StackTraceElement>()
-        while (frame != null) {
-            frame.getStackTraceElement()?.let { result.add(it) }
-            frame = frame.callerFrame
-        }
-        return result
-    }
-
-    private fun creationStackTrace(): List<StackTraceElement> {
-        val bottom = creationStackBottom ?: return emptyList()
-        // Skip "Coroutine creation stacktrace" frame
-        return sequence<StackTraceElement> { yieldFrames(bottom.callerFrame) }.toList()
-    }
-
-    private tailrec suspend fun SequenceScope<StackTraceElement>.yieldFrames(frame: CoroutineStackFrame?) {
-        if (frame == null) return
-        frame.getStackTraceElement()?.let { yield(it) }
-        val caller = frame.callerFrame
-        if (caller != null) {
-            yieldFrames(caller)
-        }
-    }
-
-    internal fun updateState(state: String, frame: Continuation<*>) {
-        // Propagate only duplicating transitions to running for KT-29997
-        if (_state == state && state == SUSPENDED && lastObservedFrame != null) return
-        _state = state
-        lastObservedFrame = frame as? CoroutineStackFrame
-        lastObservedThread = if (state == RUNNING) {
-            Thread.currentThread()
-        } else {
-            null
-        }
-    }
-
-    override fun toString(): String = "DebugCoroutineInfo(state=$state,context=$context)"
-}
+    public val creationStackBottom: CoroutineStackFrame? = source.creationStackBottom // field is used as of 1.4-M3
+    public val sequenceNumber: Long = source.sequenceNumber // field is used as of 1.4-M3
+    public val creationStackTrace = source.creationStackTrace // getter is used as of 1.4-M3
+    public val state: String = source.state // getter is used as of 1.4-M3
+    public val lastObservedThread: Thread? = source.lastObservedThread // field is used as of 1.4-M3
+    public val lastObservedFrame: CoroutineStackFrame? = source.lastObservedFrame // field is used as of 1.4-M3
+    @get:JvmName("lastObservedStackTrace") // method with this name is used as of 1.4-M3
+    public val lastObservedStackTrace: List<StackTraceElement> = source.lastObservedStackTrace()
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/jvm/src/debug/internal/DebugCoroutineInfoImpl.kt b/kotlinx-coroutines-core/jvm/src/debug/internal/DebugCoroutineInfoImpl.kt
new file mode 100644
index 0000000..cf007bb
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/src/debug/internal/DebugCoroutineInfoImpl.kt
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.debug.internal
+
+import java.lang.ref.*
+import kotlin.coroutines.*
+import kotlin.coroutines.jvm.internal.*
+
+internal const val CREATED = "CREATED"
+internal const val RUNNING = "RUNNING"
+internal const val SUSPENDED = "SUSPENDED"
+
+/**
+ * Internal implementation class where debugger tracks details it knows about each coroutine.
+ */
+internal class DebugCoroutineInfoImpl(
+    context: CoroutineContext?,
+    /**
+     * A reference to a stack-trace that is converted to a [StackTraceFrame] which implements [CoroutineStackFrame].
+     * The actual reference to the coroutine is not stored here, so we keep a strong reference.
+     */
+    public val creationStackBottom: StackTraceFrame?,
+    @JvmField internal val sequenceNumber: Long
+) {
+    /**
+     * We cannot keep a strong reference to the context, because with the [Job] in the context it will indirectly
+     * keep a reference to the last frame of an abandoned coroutine which the debugger should not be preventing
+     * garbage-collection of. The reference to context will not disappear as long as the coroutine itself is not lost.
+     */
+    private val _context = WeakReference(context)
+    public val context: CoroutineContext? // can be null when the coroutine was already garbage-collected
+        get() = _context.get()
+
+    public val creationStackTrace: List<StackTraceElement> get() = creationStackTrace()
+
+    /**
+     * Last observed state of the coroutine.
+     * Can be CREATED, RUNNING, SUSPENDED.
+     */
+    public val state: String get() = _state
+    private var _state: String = CREATED
+
+    @JvmField
+    internal var lastObservedThread: Thread? = null
+
+    /**
+     * We cannot keep a strong reference to the last observed frame of the coroutine, because this will
+     * prevent garbage-collection of a coroutine that was lost.
+     */
+    private var _lastObservedFrame: WeakReference<CoroutineStackFrame>? = null
+    internal var lastObservedFrame: CoroutineStackFrame?
+        get() = _lastObservedFrame?.get()
+        set(value) { _lastObservedFrame = value?.let { WeakReference(it) } }
+
+    /**
+     * Last observed stacktrace of the coroutine captured on its suspension or resumption point.
+     * It means that for [running][State.RUNNING] coroutines resulting stacktrace is inaccurate and
+     * reflects stacktrace of the resumption point, not the actual current stacktrace.
+     */
+    public fun lastObservedStackTrace(): List<StackTraceElement> {
+        var frame: CoroutineStackFrame? = lastObservedFrame ?: return emptyList()
+        val result = ArrayList<StackTraceElement>()
+        while (frame != null) {
+            frame.getStackTraceElement()?.let { result.add(it) }
+            frame = frame.callerFrame
+        }
+        return result
+    }
+
+    private fun creationStackTrace(): List<StackTraceElement> {
+        val bottom = creationStackBottom ?: return emptyList()
+        // Skip "Coroutine creation stacktrace" frame
+        return sequence<StackTraceElement> { yieldFrames(bottom.callerFrame) }.toList()
+    }
+
+    private tailrec suspend fun SequenceScope<StackTraceElement>.yieldFrames(frame: CoroutineStackFrame?) {
+        if (frame == null) return
+        frame.getStackTraceElement()?.let { yield(it) }
+        val caller = frame.callerFrame
+        if (caller != null) {
+            yieldFrames(caller)
+        }
+    }
+
+    internal fun updateState(state: String, frame: Continuation<*>) {
+        // Propagate only duplicating transitions to running for KT-29997
+        if (_state == state && state == SUSPENDED && lastObservedFrame != null) return
+        _state = state
+        lastObservedFrame = frame as? CoroutineStackFrame
+        lastObservedThread = if (state == RUNNING) {
+            Thread.currentThread()
+        } else {
+            null
+        }
+    }
+
+    override fun toString(): String = "DebugCoroutineInfo(state=$state,context=$context)"
+}
diff --git a/kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbesImpl.kt b/kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbesImpl.kt
index 6ff51f3..9dd6c5a 100644
--- a/kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbesImpl.kt
+++ b/kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbesImpl.kt
@@ -7,24 +7,38 @@
 import kotlinx.atomicfu.*
 import kotlinx.coroutines.*
 import kotlinx.coroutines.debug.*
+import kotlinx.coroutines.internal.*
+import kotlinx.coroutines.internal.ScopeCoroutine
 import java.io.*
+import java.lang.StackTraceElement
 import java.text.*
-import java.util.*
-import java.util.concurrent.*
 import java.util.concurrent.locks.*
 import kotlin.collections.ArrayList
 import kotlin.concurrent.*
 import kotlin.coroutines.*
-import kotlin.coroutines.jvm.internal.*
+import kotlin.coroutines.jvm.internal.CoroutineStackFrame
+import kotlin.synchronized
 import kotlinx.coroutines.internal.artificialFrame as createArtificialFrame // IDEA bug workaround
 
 internal object DebugProbesImpl {
     private const val ARTIFICIAL_FRAME_MESSAGE = "Coroutine creation stacktrace"
     private val dateFormat = SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
-    private val capturedCoroutines = Collections.newSetFromMap(ConcurrentHashMap<CoroutineOwner<*>, Boolean>())
+
+    private var weakRefCleanerThread: Thread? = null
+
+    // Values are boolean, so this map does not need to use a weak reference queue
+    private val capturedCoroutinesMap = ConcurrentWeakMap<CoroutineOwner<*>, Boolean>()
+    private val capturedCoroutines: Set<CoroutineOwner<*>> get() = capturedCoroutinesMap.keys
+
     @Volatile
     private var installations = 0
+
+    /**
+     * This internal method is used by IDEA debugger under the JVM name of
+     * "isInstalled$kotlinx_coroutines_debug".
+     */
     internal val isInstalled: Boolean get() = installations > 0
+
     // To sort coroutines by creation order, used as unique id
     private val sequenceNumber = atomic(0L)
     /*
@@ -52,7 +66,6 @@
         ctor.newInstance() as Function1<Boolean, Unit>
     }.getOrNull()
 
-
     /*
      * This is an optimization in the face of KT-29997:
      * Consider suspending call stack a()->b()->c() and c() completes its execution and every call is
@@ -60,11 +73,15 @@
      *
      * Then at least three RUNNING -> RUNNING transitions will occur consecutively and complexity of each is O(depth).
      * To avoid that quadratic complexity, we are caching lookup result for such chains in this map and update it incrementally.
+     *
+     * [DebugCoroutineInfoImpl] keeps a lot of auxiliary information about a coroutine, so we use a weak reference queue
+     * to promptly release the corresponding memory when the reference to the coroutine itself was already collected.
      */
-    private val callerInfoCache = ConcurrentHashMap<CoroutineStackFrame, DebugCoroutineInfo>()
+    private val callerInfoCache = ConcurrentWeakMap<CoroutineStackFrame, DebugCoroutineInfoImpl>(weakRefQueue = true)
 
     public fun install(): Unit = coroutineStateLock.write {
         if (++installations > 1) return
+        startWeakRefCleanerThread()
         if (AgentPremain.isInstalledStatically) return
         dynamicAttach?.invoke(true) // attach
     }
@@ -72,12 +89,24 @@
     public fun uninstall(): Unit = coroutineStateLock.write {
         check(isInstalled) { "Agent was not installed" }
         if (--installations != 0) return
-        capturedCoroutines.clear()
+        stopWeakRefCleanerThread()
+        capturedCoroutinesMap.clear()
         callerInfoCache.clear()
         if (AgentPremain.isInstalledStatically) return
         dynamicAttach?.invoke(false) // detach
     }
 
+    private fun startWeakRefCleanerThread() {
+        weakRefCleanerThread = thread(isDaemon = true, name = "Coroutines Debugger Cleaner") {
+            callerInfoCache.runWeakRefQueueCleaningLoopUntilInterrupted()
+        }
+    }
+
+    private fun stopWeakRefCleanerThread() {
+        weakRefCleanerThread?.interrupt()
+        weakRefCleanerThread = null
+    }
+
     public fun hierarchyToString(job: Job): String = coroutineStateLock.write {
         check(isInstalled) { "Debug probes are not installed" }
         val jobToStack = capturedCoroutines
@@ -88,13 +117,13 @@
         }
     }
 
-    private fun Job.build(map: Map<Job, DebugCoroutineInfo>, builder: StringBuilder, indent: String) {
+    private fun Job.build(map: Map<Job, DebugCoroutineInfoImpl>, builder: StringBuilder, indent: String) {
         val info = map[this]
         val newIndent: String
         if (info == null) { // Append coroutine without stacktrace
             // Do not print scoped coroutines and do not increase indentation level
             @Suppress("INVISIBLE_REFERENCE")
-            if (this !is kotlinx.coroutines.internal.ScopeCoroutine<*>) {
+            if (this !is ScopeCoroutine<*>) {
                 builder.append("$indent$debugString\n")
                 newIndent = indent + "\t"
             } else {
@@ -116,19 +145,32 @@
     @Suppress("DEPRECATION_ERROR") // JobSupport
     private val Job.debugString: String get() = if (this is JobSupport) toDebugString() else toString()
 
-    public fun dumpCoroutinesInfo(): List<DebugCoroutineInfo> = coroutineStateLock.write {
-        check(isInstalled) { "Debug probes are not installed" }
-        return capturedCoroutines.asSequence()
-            .map { it.info.copy() } // Copy as CoroutineInfo can be mutated concurrently by DebugProbes
-            .sortedBy { it.sequenceNumber }
-            .toList()
-    }
+    /**
+     * Private method that dumps coroutines so that different public-facing method can use
+     * to produce different result types.
+     */
+    private inline fun <R : Any> dumpCoroutinesInfoImpl(create: (CoroutineOwner<*>, CoroutineContext) -> R): List<R> =
+        coroutineStateLock.write {
+            check(isInstalled) { "Debug probes are not installed" }
+            capturedCoroutines
+                // Stable ordering of coroutines by their sequence number
+                .sortedBy { it.info.sequenceNumber }
+                // Leave in the dump only the coroutines that were not collected while we were dumping them
+                .mapNotNull { owner -> owner.info.context?.let { context -> create(owner, context) } }
+        }
 
     /*
-     * Internal (JVM-public) method used by IDEA debugger.
-     * It is equivalent to dumpCoroutines, but returns serializable (and thus less typed) objects.
+     * Internal (JVM-public) method used by IDEA debugger as of 1.4-M3.
      */
-    public fun dumpDebuggerInfo() = dumpCoroutinesInfo().map { DebuggerInfo(it) }
+    public fun dumpCoroutinesInfo(): List<DebugCoroutineInfo> =
+        dumpCoroutinesInfoImpl { owner, context -> DebugCoroutineInfo(owner.info, context) }
+
+    /*
+     * Internal (JVM-public) method to be used by IDEA debugger in the future (not used as of 1.4-M3).
+     * It is equivalent to [dumpCoroutinesInfo], but returns serializable (and thus less typed) objects.
+     */
+    public fun dumpDebuggerInfo(): List<DebuggerInfo> =
+        dumpCoroutinesInfoImpl { owner, context -> DebuggerInfo(owner.info, context) }
 
     public fun dumpCoroutines(out: PrintStream): Unit = synchronized(out) {
         /*
@@ -145,17 +187,15 @@
         check(isInstalled) { "Debug probes are not installed" }
         out.print("Coroutines dump ${dateFormat.format(System.currentTimeMillis())}")
         capturedCoroutines
-            .asSequence()
             .sortedBy { it.info.sequenceNumber }
             .forEach { owner ->
                 val info = owner.info
                 val observedStackTrace = info.lastObservedStackTrace()
-                val enhancedStackTrace = enhanceStackTraceWithThreadDump(info, observedStackTrace)
+                val enhancedStackTrace = enhanceStackTraceWithThreadDumpImpl(info.state, info.lastObservedThread, observedStackTrace)
                 val state = if (info.state == RUNNING && enhancedStackTrace === observedStackTrace)
                     "${info.state} (Last suspension stacktrace, not an actual stacktrace)"
                 else
-                    info.state.toString()
-
+                    info.state
                 out.print("\n\nCoroutine ${owner.delegate}, state: $state")
                 if (observedStackTrace.isEmpty()) {
                     out.print("\n\tat ${createArtificialFrame(ARTIFICIAL_FRAME_MESSAGE)}")
@@ -172,18 +212,29 @@
         }
     }
 
+    /*
+     * Internal (JVM-public) method used by IDEA debugger as of 1.4-M3.
+     * It is similar to [enhanceStackTraceWithThreadDumpImpl], but uses debugger-facing [DebugCoroutineInfo] type.
+     */
+    @Suppress("unused")
+    public fun enhanceStackTraceWithThreadDump(
+        info: DebugCoroutineInfo,
+        coroutineTrace: List<StackTraceElement>
+    ): List<StackTraceElement> =
+        enhanceStackTraceWithThreadDumpImpl(info.state, info.lastObservedThread, coroutineTrace)
+
     /**
-     * Tries to enhance [coroutineTrace] (obtained by call to [DebugCoroutineInfo.lastObservedStackTrace]) with
-     * thread dump of [DebugCoroutineInfo.lastObservedThread].
+     * Tries to enhance [coroutineTrace] (obtained by call to [DebugCoroutineInfoImpl.lastObservedStackTrace]) with
+     * thread dump of [DebugCoroutineInfoImpl.lastObservedThread].
      *
      * Returns [coroutineTrace] if enhancement was unsuccessful or the enhancement result.
      */
-    private fun enhanceStackTraceWithThreadDump(
-        info: DebugCoroutineInfo,
+    private fun enhanceStackTraceWithThreadDumpImpl(
+        state: String,
+        thread: Thread?,
         coroutineTrace: List<StackTraceElement>
     ): List<StackTraceElement> {
-        val thread = info.lastObservedThread
-        if (info.state != RUNNING || thread == null) return coroutineTrace
+        if (state != RUNNING || thread == null) return coroutineTrace
         // Avoid security manager issues
         val actualTrace = runCatching { thread.stackTrace }.getOrNull()
             ?: return coroutineTrace
@@ -289,7 +340,7 @@
     private fun updateRunningState(frame: CoroutineStackFrame, state: String): Unit = coroutineStateLock.read {
         if (!isInstalled) return
         // Lookup coroutine info in cache or by traversing stack frame
-        val info: DebugCoroutineInfo
+        val info: DebugCoroutineInfoImpl
         val cached = callerInfoCache.remove(frame)
         if (cached != null) {
             info = cached
@@ -331,39 +382,36 @@
         val owner = completion.owner()
         if (owner != null) return completion
         /*
-         * Here we replace completion with a sequence of CoroutineStackFrame objects
+         * Here we replace completion with a sequence of StackTraceFrame objects
          * which represents creation stacktrace, thus making stacktrace recovery mechanism
          * even more verbose (it will attach coroutine creation stacktrace to all exceptions),
          * and then using CoroutineOwner completion as unique identifier of coroutineSuspended/resumed calls.
          */
-
         val frame = if (enableCreationStackTraces) {
-            val stacktrace = sanitizeStackTrace(Exception())
-            stacktrace.foldRight<StackTraceElement, CoroutineStackFrame?>(null) { frame, acc ->
-                object : CoroutineStackFrame {
-                    override val callerFrame: CoroutineStackFrame? = acc
-                    override fun getStackTraceElement(): StackTraceElement = frame
-                }
-            }
+            sanitizeStackTrace(Exception()).toStackTraceFrame()
         } else {
             null
         }
-
         return createOwner(completion, frame)
     }
 
-    private fun <T> createOwner(completion: Continuation<T>, frame: CoroutineStackFrame?): Continuation<T> {
+    private fun List<StackTraceElement>.toStackTraceFrame(): StackTraceFrame? =
+        foldRight<StackTraceElement, StackTraceFrame?>(null) { frame, acc ->
+            StackTraceFrame(acc, frame)
+        }
+
+    private fun <T> createOwner(completion: Continuation<T>, frame: StackTraceFrame?): Continuation<T> {
         if (!isInstalled) return completion
-        val info = DebugCoroutineInfo(completion.context, frame, sequenceNumber.incrementAndGet())
+        val info = DebugCoroutineInfoImpl(completion.context, frame, sequenceNumber.incrementAndGet())
         val owner = CoroutineOwner(completion, info, frame)
-        capturedCoroutines += owner
-        if (!isInstalled) capturedCoroutines.clear()
+        capturedCoroutinesMap[owner] = true
+        if (!isInstalled) capturedCoroutinesMap.clear()
         return owner
     }
 
     // Not guarded by the lock at all, does not really affect consistency
     private fun probeCoroutineCompleted(owner: CoroutineOwner<*>) {
-        capturedCoroutines.remove(owner)
+        capturedCoroutinesMap.remove(owner)
         /*
          * This removal is a guard against improperly implemented CoroutineStackFrame
          * and bugs in the compiler.
@@ -378,7 +426,7 @@
      */
     private class CoroutineOwner<T>(
         @JvmField val delegate: Continuation<T>,
-        @JvmField val info: DebugCoroutineInfo,
+        @JvmField val info: DebugCoroutineInfoImpl,
         private val frame: CoroutineStackFrame?
     ) : Continuation<T> by delegate, CoroutineStackFrame {
 
diff --git a/kotlinx-coroutines-core/jvm/src/debug/internal/DebuggerInfo.kt b/kotlinx-coroutines-core/jvm/src/debug/internal/DebuggerInfo.kt
index 4b95af9..3e9533b 100644
--- a/kotlinx-coroutines-core/jvm/src/debug/internal/DebuggerInfo.kt
+++ b/kotlinx-coroutines-core/jvm/src/debug/internal/DebuggerInfo.kt
@@ -12,12 +12,14 @@
 
 /*
  * This class represents all the data required by IDEA debugger.
- * It is serializable in order to speedup JDWP interactions
+ * It is serializable in order to speedup JDWP interactions.
+ * **DO NOT MAKE BINARY-INCOMPATIBLE CHANGES TO THIS CLASS**.
  */
-internal class DebuggerInfo(source: DebugCoroutineInfo) : Serializable {
-    public val coroutineId: Long? = source.context[CoroutineId]?.id
-    public val dispatcher: String? = source.context[ContinuationInterceptor].toString()
-    public val name: String? = source.context[CoroutineName]?.name
+@PublishedApi
+internal class DebuggerInfo(source: DebugCoroutineInfoImpl, context: CoroutineContext) : Serializable {
+    public val coroutineId: Long? = context[CoroutineId]?.id
+    public val dispatcher: String? = context[ContinuationInterceptor]?.toString()
+    public val name: String? = context[CoroutineName]?.name
     public val state: String = source.state
     public val lastObservedThreadState: String? = source.lastObservedThread?.state?.toString()
     public val lastObservedThreadName = source.lastObservedThread?.name
diff --git a/kotlinx-coroutines-core/jvm/src/debug/internal/StackTraceFrame.kt b/kotlinx-coroutines-core/jvm/src/debug/internal/StackTraceFrame.kt
new file mode 100644
index 0000000..37c60ee
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/src/debug/internal/StackTraceFrame.kt
@@ -0,0 +1,17 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.debug.internal
+
+import kotlin.coroutines.jvm.internal.*
+
+/**
+ * A stack-trace represented as [CoroutineStackFrame].
+ */
+internal class StackTraceFrame(
+    override val callerFrame: CoroutineStackFrame?,
+    private val stackTraceElement: StackTraceElement
+) : CoroutineStackFrame {
+    override fun getStackTraceElement(): StackTraceElement = stackTraceElement
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/jvm/test/FieldWalker.kt b/kotlinx-coroutines-core/jvm/test/FieldWalker.kt
index 12fd4de..e8079eb 100644
--- a/kotlinx-coroutines-core/jvm/test/FieldWalker.kt
+++ b/kotlinx-coroutines-core/jvm/test/FieldWalker.kt
@@ -4,10 +4,13 @@
 
 package kotlinx.coroutines
 
+import java.lang.ref.*
 import java.lang.reflect.*
+import java.text.*
 import java.util.*
 import java.util.Collections.*
 import java.util.concurrent.atomic.*
+import java.util.concurrent.locks.*
 import kotlin.collections.ArrayList
 import kotlin.test.*
 
@@ -22,7 +25,11 @@
 
     init {
         // excluded/terminal classes (don't walk them)
-        fieldsCache += listOf(Any::class, String::class, Thread::class, Throwable::class)
+        fieldsCache += listOf(
+            Any::class, String::class, Thread::class, Throwable::class, StackTraceElement::class,
+            WeakReference::class, ReferenceQueue::class, AbstractMap::class,
+            ReentrantReadWriteLock::class, SimpleDateFormat::class
+        )
             .map { it.java }
             .associateWith { emptyList<Field>() }
     }
@@ -31,10 +38,10 @@
      * Reflectively starts to walk through object graph and returns identity set of all reachable objects.
      * Use [walkRefs] if you need a path from root for debugging.
      */
-    public fun walk(root: Any?): Set<Any> = walkRefs(root).keys
+    public fun walk(root: Any?): Set<Any> = walkRefs(root, false).keys
 
-    public fun assertReachableCount(expected: Int, root: Any?, predicate: (Any) -> Boolean) {
-        val visited = walkRefs(root)
+    public fun assertReachableCount(expected: Int, root: Any?, rootStatics: Boolean = false, predicate: (Any) -> Boolean) {
+        val visited = walkRefs(root, rootStatics)
         val actual = visited.keys.filter(predicate)
         if (actual.size != expected) {
             val textDump = actual.joinToString("") { "\n\t" + showPath(it, visited) }
@@ -49,16 +56,18 @@
      * Reflectively starts to walk through object graph and map to all the reached object to their path
      * in from root. Use [showPath] do display a path if needed.
      */
-    private fun walkRefs(root: Any?): Map<Any, Ref> {
+    private fun walkRefs(root: Any?, rootStatics: Boolean): Map<Any, Ref> {
         val visited = IdentityHashMap<Any, Ref>()
         if (root == null) return visited
         visited[root] = Ref.RootRef
         val stack = ArrayDeque<Any>()
         stack.addLast(root)
+        var statics = rootStatics
         while (stack.isNotEmpty()) {
             val element = stack.removeLast()
             try {
-                visit(element, visited, stack)
+                visit(element, visited, stack, statics)
+                statics = false // only scan root static when asked
             } catch (e: Exception) {
                 error("Failed to visit element ${showPath(element, visited)}: $e")
             }
@@ -75,7 +84,7 @@
             when (ref) {
                 is Ref.FieldRef -> {
                     cur = ref.parent
-                    path += ".${ref.name}"
+                    path += "|${ref.parent.javaClass.simpleName}::${ref.name}"
                 }
                 is Ref.ArrayRef -> {
                     cur = ref.parent
@@ -87,7 +96,7 @@
         return path.joinToString("")
     }
 
-    private fun visit(element: Any, visited: IdentityHashMap<Any, Ref>, stack: ArrayDeque<Any>) {
+    private fun visit(element: Any, visited: IdentityHashMap<Any, Ref>, stack: ArrayDeque<Any>, statics: Boolean) {
         val type = element.javaClass
         when {
             // Special code for arrays
@@ -111,8 +120,16 @@
             element is AtomicReference<*> -> {
                 push(element.get(), visited, stack) { Ref.FieldRef(element, "value") }
             }
+            element is AtomicReferenceArray<*> -> {
+                for (index in 0 until element.length()) {
+                    push(element[index], visited, stack) { Ref.ArrayRef(element, index) }
+                }
+            }
+            element is AtomicLongFieldUpdater<*> -> {
+                /* filter it out here to suppress its subclasses too */
+            }
             // All the other classes are reflectively scanned
-            else -> fields(type).forEach { field ->
+            else -> fields(type, statics).forEach { field ->
                 push(field.get(element), visited, stack) { Ref.FieldRef(element, field.name) }
                 // special case to scan Throwable cause (cannot get it reflectively)
                 if (element is Throwable) {
@@ -129,19 +146,21 @@
         }
     }
 
-    private fun fields(type0: Class<*>): List<Field> {
+    private fun fields(type0: Class<*>, rootStatics: Boolean): List<Field> {
         fieldsCache[type0]?.let { return it }
         val result = ArrayList<Field>()
         var type = type0
+        var statics = rootStatics
         while (true) {
             val fields = type.declaredFields.filter {
                 !it.type.isPrimitive
-                        && !Modifier.isStatic(it.modifiers)
+                        && (statics || !Modifier.isStatic(it.modifiers))
                         && !(it.type.isArray && it.type.componentType.isPrimitive)
             }
             fields.forEach { it.isAccessible = true } // make them all accessible
             result.addAll(fields)
             type = type.superclass
+            statics = false
             val superFields = fieldsCache[type] // will stop at Any anyway
             if (superFields != null) {
                 result.addAll(superFields)
diff --git a/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt
index 5f5620c..892a2a6 100644
--- a/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt
@@ -203,6 +203,6 @@
         for (value in channel) {
             delay(1)
         }
-        FieldWalker.assertReachableCount(1, coroutineContext[Job], { it is ChildContinuation })
+        FieldWalker.assertReachableCount(1, coroutineContext[Job]) { it is ChildContinuation }
     }
 }
diff --git a/kotlinx-coroutines-core/jvm/test/internal/ConcurrentWeakMapCollectionStressTest.kt b/kotlinx-coroutines-core/jvm/test/internal/ConcurrentWeakMapCollectionStressTest.kt
new file mode 100644
index 0000000..d9a2a96
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/internal/ConcurrentWeakMapCollectionStressTest.kt
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.internal
+
+import junit.framework.Assert.*
+import kotlinx.coroutines.*
+import kotlinx.coroutines.debug.internal.*
+import org.junit.*
+import kotlin.concurrent.*
+
+class ConcurrentWeakMapCollectionStressTest : TestBase() {
+    private data class Key(val i: Int)
+    private val nElements = 100_000 * stressTestMultiplier
+    private val size = 100_000
+    
+    @Test
+    fun testCollected() {
+        // use very big arrays as values, we'll need a queue and a cleaner thread to handle them
+        val m = ConcurrentWeakMap<Key, ByteArray>(weakRefQueue = true)
+        val cleaner = thread(name = "ConcurrentWeakMapCollectionStressTest-Cleaner") {
+            m.runWeakRefQueueCleaningLoopUntilInterrupted()
+        }
+        for (i in 1..nElements) {
+            m.put(Key(i), ByteArray(size))
+        }
+        assertTrue(m.size < nElements) // some of it was collected for sure
+        cleaner.interrupt()
+        cleaner.join()
+    }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/jvm/test/internal/ConcurrentWeakMapOperationStressTest.kt b/kotlinx-coroutines-core/jvm/test/internal/ConcurrentWeakMapOperationStressTest.kt
new file mode 100644
index 0000000..49e6ccc
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/internal/ConcurrentWeakMapOperationStressTest.kt
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.internal
+
+import kotlinx.atomicfu.*
+import kotlinx.coroutines.*
+import kotlinx.coroutines.debug.internal.*
+import org.junit.Test
+import kotlin.concurrent.*
+import kotlin.test.*
+
+/**
+ * Concurrent test for [ConcurrentWeakMap] that tests put/get/remove from concurrent threads and is
+ * arranged so that concurrent rehashing is also happening.
+ */
+class ConcurrentWeakMapOperationStressTest : TestBase() {
+    private val nThreads = 10
+    private val batchSize = 1000
+    private val nSeconds = 3 * stressTestMultiplier
+
+    private val count = atomic(0L)
+    private val stop = atomic(false)
+
+    private data class Key(val i: Long)
+
+    @Test
+    fun testOperations() {
+        // We don't create queue here, because concurrent operations are enough to make it clean itself
+        val m = ConcurrentWeakMap<Key, Long>()
+        val threads = Array(nThreads) { index ->
+            thread(start = false, name = "ConcurrentWeakMapOperationStressTest-$index") {
+                var generationOffset = 0L
+                while (!stop.value) {
+                    val kvs = (generationOffset + batchSize * index until generationOffset + batchSize * (index + 1))
+                        .associateBy({ Key(it) }, {  it * it })
+                    generationOffset += batchSize * nThreads
+                    for ((k, v) in kvs) {
+                        assertEquals(null, m.put(k, v))
+                    }
+                    for ((k, v) in kvs) {
+                        assertEquals(v, m[k])
+                    }
+                    for ((k, v) in kvs) {
+                        assertEquals(v, m.remove(k))
+                    }
+                    for ((k, v) in kvs) {
+                        assertEquals(null, m.get(k))
+                    }
+                    count.incrementAndGet()
+                }
+            }
+        }
+        val uncaughtExceptionHandler = Thread.UncaughtExceptionHandler { t, ex ->
+            ex.printStackTrace()
+            error("Error in thread $t", ex)
+        }
+        threads.forEach { it.uncaughtExceptionHandler = uncaughtExceptionHandler }
+        threads.forEach { it.start() }
+        var lastCount = -1L
+        for (sec in 1..nSeconds) {
+            Thread.sleep(1000)
+            val count = count.value
+            println("$sec: done $count batches")
+            assertTrue(count > lastCount) // ensure progress
+            lastCount = count
+        }
+        stop.value = true
+        threads.forEach { it.join() }
+        assertEquals(0, m.size)
+    }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/jvm/test/internal/ConcurrentWeakMapTest.kt b/kotlinx-coroutines-core/jvm/test/internal/ConcurrentWeakMapTest.kt
new file mode 100644
index 0000000..ae4b5fc
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/internal/ConcurrentWeakMapTest.kt
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.internal
+
+import junit.framework.Assert.*
+import kotlinx.coroutines.*
+import kotlinx.coroutines.debug.internal.*
+import org.junit.*
+
+class ConcurrentWeakMapTest : TestBase() {
+    @Test
+    fun testSimple() {
+        val expect = (1..1000).associateWith { it.toString() }
+        val m = ConcurrentWeakMap<Int, String>()
+        // repeat adding/removing a few times
+        repeat(5) {
+            assertEquals(0, m.size)
+            assertEquals(emptySet<Int>(), m.keys)
+            assertEquals(emptyList<String>(), m.values.toList())
+            assertEquals(emptySet<Map.Entry<Int, String>>(), m.entries)
+            for ((k, v) in expect) {
+                assertNull(m.put(k, v))
+            }
+            assertEquals(expect.size, m.size)
+            assertEquals(expect.keys, m.keys)
+            assertEquals(expect.entries, m.entries)
+            for ((k, v) in expect) {
+                assertEquals(v, m.get(k))
+            }
+            assertEquals(expect.size, m.size)
+            if (it % 2 == 0) {
+                for ((k, v) in expect) {
+                    assertEquals(v, m.remove(k))
+                }
+            } else {
+                m.clear()
+            }
+            assertEquals(0, m.size)
+            for ((k, v) in expect) {
+                assertNull(m.get(k))
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-debug/src/CoroutineInfo.kt b/kotlinx-coroutines-debug/src/CoroutineInfo.kt
index 11224f5..ce1478a 100644
--- a/kotlinx-coroutines-debug/src/CoroutineInfo.kt
+++ b/kotlinx-coroutines-debug/src/CoroutineInfo.kt
@@ -18,10 +18,12 @@
      * [Coroutine context][coroutineContext] of the coroutine
      */
     public val context: CoroutineContext = delegate.context
+
     /**
      * Last observed state of the coroutine
      */
     public val state: State = State.valueOf(delegate.state)
+
     private val creationStackBottom: CoroutineStackFrame? = delegate.creationStackBottom
 
     /**
diff --git a/kotlinx-coroutines-debug/test/DebugLeaksStressTest.kt b/kotlinx-coroutines-debug/test/DebugLeaksStressTest.kt
new file mode 100644
index 0000000..bf34917
--- /dev/null
+++ b/kotlinx-coroutines-debug/test/DebugLeaksStressTest.kt
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.debug.*
+import org.junit.*
+
+/**
+ * This stress tests ensure that no actual [OutOfMemoryError] occurs when lots of coroutines are created and
+ * leaked in various ways under debugger. A faster but more fragile version of this test is in [DebugLeaksTest].
+ */
+class DebugLeaksStressTest : DebugTestBase() {
+    private val nRepeat = 100_000 * stressTestMultiplier
+    private val nBytes = 100_000
+
+    @Test
+    fun testIteratorLeak() {
+        repeat(nRepeat) {
+            val bytes = ByteArray(nBytes)
+            iterator { yield(bytes) }
+        }
+    }
+
+    @Test
+    fun testLazyGlobalCoroutineLeak() {
+        repeat(nRepeat) {
+            val bytes = ByteArray(nBytes)
+            GlobalScope.launch(start = CoroutineStart.LAZY) { println(bytes) }
+        }
+    }
+
+    @Test
+    fun testLazyCancelledChildCoroutineLeak() = runTest {
+        coroutineScope {
+            repeat(nRepeat) {
+                val bytes = ByteArray(nBytes)
+                val child = launch(start = CoroutineStart.LAZY) { println(bytes) }
+                child.cancel()
+            }
+        }
+    }
+
+    @Test
+    fun testAbandonedGlobalCoroutineLeak() {
+        repeat(nRepeat) {
+            val bytes = ByteArray(nBytes)
+            GlobalScope.launch {
+                suspendForever()
+                println(bytes)
+            }
+        }
+    }
+
+    private suspend fun suspendForever() = suspendCancellableCoroutine<Unit> {  }
+}
diff --git a/kotlinx-coroutines-debug/test/DebugLeaksTest.kt b/kotlinx-coroutines-debug/test/DebugLeaksTest.kt
new file mode 100644
index 0000000..a43b33b
--- /dev/null
+++ b/kotlinx-coroutines-debug/test/DebugLeaksTest.kt
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.debug.*
+import kotlinx.coroutines.debug.internal.*
+import org.junit.*
+
+/**
+ * This is fast but fragile version of [DebugLeaksStressTest] that check reachability of a captured object
+ * in [DebugProbesImpl] via [FieldWalker].
+ */
+@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
+class DebugLeaksTest : DebugTestBase() {
+    private class Captured
+
+    @Test
+    fun testIteratorLeak() {
+        val captured = Captured()
+        iterator { yield(captured) }
+        assertNoCapturedReference()
+    }
+
+    @Test
+    fun testLazyGlobalCoroutineLeak() {
+        val captured = Captured()
+        GlobalScope.launch(start = CoroutineStart.LAZY) { println(captured) }
+        assertNoCapturedReference()
+    }
+
+    @Test
+    fun testLazyCancelledChildCoroutineLeak() = runTest {
+        val captured = Captured()
+        coroutineScope {
+            val child = launch(start = CoroutineStart.LAZY) { println(captured) }
+            child.cancel()
+        }
+        assertNoCapturedReference()
+    }
+
+    @Test
+    fun testAbandonedGlobalCoroutineLeak() {
+        val captured = Captured()
+        GlobalScope.launch {
+            suspendForever()
+            println(captured)
+        }
+        assertNoCapturedReference()
+    }
+
+    private suspend fun suspendForever() = suspendCancellableCoroutine<Unit> {  }
+
+    private fun assertNoCapturedReference() {
+        FieldWalker.assertReachableCount(0, DebugProbesImpl, rootStatics = true) { it is Captured }
+    }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-debug/test/DumpWithoutCreationStackTraceTest.kt b/kotlinx-coroutines-debug/test/DumpWithoutCreationStackTraceTest.kt
index 89782e4..6e405ca 100644
--- a/kotlinx-coroutines-debug/test/DumpWithoutCreationStackTraceTest.kt
+++ b/kotlinx-coroutines-debug/test/DumpWithoutCreationStackTraceTest.kt
@@ -21,13 +21,13 @@
         yield()
         verifyDump(
             "Coroutine \"coroutine#1\":BlockingCoroutine{Active}@70d1cb56, state: RUNNING\n" +
-                    "\tat java.lang.Thread.getStackTrace(Thread.java:1559)\n" +
-                    "\tat kotlinx.coroutines.debug.internal.DebugProbesImpl.enhanceStackTraceWithThreadDump(DebugProbesImpl.kt:188)\n" +
-                    "\tat kotlinx.coroutines.debug.internal.DebugProbesImpl.dumpCoroutinesSynchronized(DebugProbesImpl.kt:153)\n" +
-                    "\tat kotlinx.coroutines.debug.internal.DebugProbesImpl.dumpCoroutines(DebugProbesImpl.kt:141)",
+                    "\tat java.lang.Thread.getStackTrace(Thread.java)\n" +
+                    "\tat kotlinx.coroutines.debug.internal.DebugProbesImpl.enhanceStackTraceWithThreadDumpImpl(DebugProbesImpl.kt)\n" +
+                    "\tat kotlinx.coroutines.debug.internal.DebugProbesImpl.dumpCoroutinesSynchronized(DebugProbesImpl.kt)\n" +
+                    "\tat kotlinx.coroutines.debug.internal.DebugProbesImpl.dumpCoroutines(DebugProbesImpl.kt)",
 
             "Coroutine \"coroutine#2\":DeferredCoroutine{Active}@383fa309, state: SUSPENDED\n" +
-                    "\tat kotlinx.coroutines.debug.DumpWithoutCreationStackTraceTest\$createActiveDeferred\$1.invokeSuspend(DumpWithoutCreationStackTraceTest.kt:63)"
+                    "\tat kotlinx.coroutines.debug.DumpWithoutCreationStackTraceTest\$createActiveDeferred\$1.invokeSuspend(DumpWithoutCreationStackTraceTest.kt)"
         )
         deferred.cancelAndJoin()
     }
diff --git a/kotlinx-coroutines-debug/test/RunningThreadStackMergeTest.kt b/kotlinx-coroutines-debug/test/RunningThreadStackMergeTest.kt
index 4c13f5e..e7fdeed 100644
--- a/kotlinx-coroutines-debug/test/RunningThreadStackMergeTest.kt
+++ b/kotlinx-coroutines-debug/test/RunningThreadStackMergeTest.kt
@@ -150,16 +150,16 @@
     @Test
     fun testRunBlocking() = runBlocking {
         verifyDump("Coroutine \"coroutine#1\":BlockingCoroutine{Active}@4bcd176c, state: RUNNING\n" +
-                "\tat java.lang.Thread.getStackTrace(Thread.java:1552)\n" +
-                "\tat kotlinx.coroutines.debug.internal.DebugProbesImpl.enhanceStackTraceWithThreadDump(DebugProbesImpl.kt:147)\n" +
-                "\tat kotlinx.coroutines.debug.internal.DebugProbesImpl.dumpCoroutinesSynchronized(DebugProbesImpl.kt:122)\n" +
-                "\tat kotlinx.coroutines.debug.internal.DebugProbesImpl.dumpCoroutines(DebugProbesImpl.kt:109)\n" +
-                "\tat kotlinx.coroutines.debug.DebugProbes.dumpCoroutines(DebugProbes.kt:122)\n" +
-                "\tat kotlinx.coroutines.debug.StracktraceUtilsKt.verifyDump(StracktraceUtils.kt)\n" +
-                "\tat kotlinx.coroutines.debug.StracktraceUtilsKt.verifyDump\$default(StracktraceUtils.kt)\n" +
-                "\tat kotlinx.coroutines.debug.RunningThreadStackMergeTest\$testRunBlocking\$1.invokeSuspend(RunningThreadStackMergeTest.kt:112)\n" +
+                "\tat java.lang.Thread.getStackTrace(Thread.java)\n" +
+                "\tat kotlinx.coroutines.debug.internal.DebugProbesImpl.enhanceStackTraceWithThreadDumpImpl(DebugProbesImpl.kt)\n" +
+                "\tat kotlinx.coroutines.debug.internal.DebugProbesImpl.dumpCoroutinesSynchronized(DebugProbesImpl.kt)\n" +
+                "\tat kotlinx.coroutines.debug.internal.DebugProbesImpl.dumpCoroutines(DebugProbesImpl.kt)\n" +
+                "\tat kotlinx.coroutines.debug.DebugProbes.dumpCoroutines(DebugProbes.kt)\n" +
+                "\tat kotlinx.coroutines.debug.StacktraceUtilsKt.verifyDump(StacktraceUtils.kt)\n" +
+                "\tat kotlinx.coroutines.debug.StacktraceUtilsKt.verifyDump\$default(StacktraceUtils.kt)\n" +
+                "\tat kotlinx.coroutines.debug.RunningThreadStackMergeTest\$testRunBlocking\$1.invokeSuspend(RunningThreadStackMergeTest.kt)\n" +
                 "\t(Coroutine creation stacktrace)\n" +
-                "\tat kotlin.coroutines.intrinsics.IntrinsicsKt__IntrinsicsJvmKt.createCoroutineUnintercepted(IntrinsicsJvm.kt:116)\n")
+                "\tat kotlin.coroutines.intrinsics.IntrinsicsKt__IntrinsicsJvmKt.createCoroutineUnintercepted(IntrinsicsJvm.kt)\n")
     }
 
 
diff --git a/kotlinx-coroutines-debug/test/StracktraceUtils.kt b/kotlinx-coroutines-debug/test/StacktraceUtils.kt
similarity index 100%
rename from kotlinx-coroutines-debug/test/StracktraceUtils.kt
rename to kotlinx-coroutines-debug/test/StacktraceUtils.kt