Revert^2 "Add spin loop to mutex, overhaul monitor"

This reverts commit d56f7d1086b16f32c0771a41a4afb376b5fd3076.

Reason for revert: PS2 fixes the problems I identified.

PS1 is a straight revert^2 of, and thus identical to, aosp/1111800.

PS2 in addition:
- Reduces the size of the test when it is not modified to run as a
  benchmark.
- Disables the test in slow-running configurations, where it timed out.
- Stops using the mutex recursion count, and instead reintroduces
  one at the monitor level. The plan is to eliminate the one in mutex
  in a future CL.
- Avoids modifying various monitor fields in suspended state.
  MonitorInfo, deflation, etc., may look at the state of a suspended
  thread and expect not to race. I don't think the original code had
  this completely correct either, but PS1 made it worse.
- Documents some aspects of the code that confused me at times.
- Avoids dereferencing the monitor owner Thread* unless the caller holds
  the thread list lock, and thus knows that the thread won't go away.
- Stores a Thread* instead of a thread_id with the monitor owner method
  and dex pc information used for debugging. This is required to avoid
  extra thread list lock acquisitions, since we need to dereference the
  owner to get the thread id.
- Makes the thread list Contains method public, again in order to
  support the above. (This ignores C/C++ restrictions on dangling
  pointer use. We already rely on violating those elsewhere, and
  the committees are trying to get their story straight about this.)
- Causes the spin loop to give up immediately if the process is
  shutting down. This gets us to an actual idle state sooner in that
  case, and should hopefully mitigate the shutdown issues somewhat.
  (We tried not spinning in "suspended" state, but that reintroduced
  some performance issues.)
- Makes runtime shutdown code more defensive: clears fields pointing to
  deallocated objects, and always waits for quiescence AFTER all threads
  are suspended.
- Consistently checks for a runtime that's shutting down or missing
  after waking up from a futex wait, thus avoiding touching deallocated
  memory. I believe this was the cause of b/121302864, which PS1
  managed to aggravate.
- Fixes SleepForever(), which was a very light sleeper that woke up once
  a second, so that the daemon threads we leak on runtime shutdown cost
  us less.
- Removes a data race from the "was the runtime deleted" logic.

Bug: 140590186
Bug: 121302864
Test: Build and boot AOSP.
Test: art/test/testrunner/testrunner.py --host -b -t 1932-monitor-events-misc
Test: art/test/testrunner/testrunner.py --host -b -t 132-daemon-locks-shutdown
Test: art/test/testrunner/testrunner.py --host -b -t 004-ThreadStress

Change-Id: I6667c61beed2ba68c84cd4c0821fb8e21e188bbc
diff --git a/libartbase/base/utils.cc b/libartbase/base/utils.cc
index 7d4dd59..19311b3 100644
--- a/libartbase/base/utils.cc
+++ b/libartbase/base/utils.cc
@@ -309,7 +309,7 @@
 
 void SleepForever() {
   while (true) {
-    usleep(1000000);
+    sleep(100000000);
   }
 }
 
diff --git a/runtime/base/mutex.cc b/runtime/base/mutex.cc
index 0d5ce15..0d1b162 100644
--- a/runtime/base/mutex.cc
+++ b/runtime/base/mutex.cc
@@ -61,6 +61,28 @@
 }
 #endif
 
+#if ART_USE_FUTEXES
+// If we wake up from a futex wake, and the runtime disappeared while we were asleep,
+// it's important to stop in our tracks before we touch deallocated memory.
+static inline void SleepIfRuntimeDeleted(Thread* self) {
+  if (self != nullptr) {
+    JNIEnvExt* const env = self->GetJniEnv();
+    if (UNLIKELY(env != nullptr && env->IsRuntimeDeleted())) {
+      DCHECK(self->IsDaemon());
+      // If the runtime has been deleted, then we cannot proceed. Just sleep forever. This may
+      // occur for user daemon threads that get a spurious wakeup. This occurs for test 132 with
+      // --host and --gdb.
+      // After we wake up, the runtime may have been shut down, which means that this condition may
+      // have been deleted. It is not safe to retry the wait.
+      SleepForever();
+    }
+  }
+}
+#else
+// We should be doing this for pthreads too, but it seems impossible for something
+// like a condition variable wait. Thus we don't bother trying.
+#endif
+
 // Wait for an amount of time that roughly increases in the argument i.
 // Spin for small arguments and yield/sleep for longer ones.
 static void BackOff(uint32_t i) {
@@ -82,6 +104,34 @@
   }
 }
 
+// Wait until pred(testLoc->load(std::memory_order_relaxed)) holds, or until a
+// short time interval, on the order of kernel context-switch time, passes.
+// Return true if the predicate test succeeded, false if we timed out.
+template<typename Pred>
+static inline bool WaitBrieflyFor(AtomicInteger* testLoc, Thread* self, Pred pred) {
+  // TODO: Tune these parameters correctly. BackOff(3) should take on the order of 100 cycles. So
+  // this should result in retrying <= 10 times, usually waiting around 100 cycles each. The
+  // maximum delay should be significantly less than the expected futex() context switch time, so
+  // there should be little danger of this worsening things appreciably. If the lock was only
+  // held briefly by a running thread, this should help immensely.
+  static constexpr uint32_t kMaxBackOff = 3;  // Should probably be <= kSpinMax above.
+  static constexpr uint32_t kMaxIters = 50;
+  JNIEnvExt* const env = self == nullptr ? nullptr : self->GetJniEnv();
+  for (uint32_t i = 1; i <= kMaxIters; ++i) {
+    BackOff(std::min(i, kMaxBackOff));
+    if (pred(testLoc->load(std::memory_order_relaxed))) {
+      return true;
+    }
+    if (UNLIKELY(env != nullptr && env->IsRuntimeDeleted())) {
+      // This returns true once we've started shutting down. We then try to reach a quiescent
+      // state as soon as possible to avoid touching data that may be deallocated by the shutdown
+      // process. It currently relies on a timeout.
+      return false;
+    }
+  }
+  return false;
+}
+
 class ScopedAllMutexesLock final {
  public:
   explicit ScopedAllMutexesLock(const BaseMutex* mutex) : mutex_(mutex) {
@@ -381,24 +431,35 @@
       } else {
         // Failed to acquire, hang up.
         ScopedContentionRecorder scr(this, SafeGetTid(self), GetExclusiveOwnerTid());
-        // Increment contender count. We can't create enough threads for this to overflow.
-        increment_contenders();
-        // Make cur_state again reflect the expected value of state_and_contenders.
-        cur_state += kContenderIncrement;
-        if (UNLIKELY(should_respond_to_empty_checkpoint_request_)) {
-          self->CheckEmptyCheckpointFromMutex();
-        }
-        if (futex(state_and_contenders_.Address(), FUTEX_WAIT_PRIVATE, cur_state,
-                  nullptr, nullptr, 0) != 0) {
-          // We only went to sleep after incrementing and contenders and checking that the lock
-          // is still held by someone else.
-          // EAGAIN and EINTR both indicate a spurious failure, try again from the beginning.
-          // We don't use TEMP_FAILURE_RETRY so we can intentionally retry to acquire the lock.
-          if ((errno != EAGAIN) && (errno != EINTR)) {
-            PLOG(FATAL) << "futex wait failed for " << name_;
+        // Empirically, it appears important to spin again each time through the loop; if we
+        // bother to go to sleep and wake up, we should be fairly persistent in trying for the
+        // lock.
+        if (!WaitBrieflyFor(&state_and_contenders_, self,
+                            [](int32_t v) { return (v & kHeldMask) == 0; })) {
+          // Increment contender count. We can't create enough threads for this to overflow.
+          increment_contenders();
+          // Make cur_state again reflect the expected value of state_and_contenders.
+          cur_state += kContenderIncrement;
+          if (UNLIKELY(should_respond_to_empty_checkpoint_request_)) {
+            self->CheckEmptyCheckpointFromMutex();
           }
+          do {
+            if (futex(state_and_contenders_.Address(), FUTEX_WAIT_PRIVATE, cur_state,
+                      nullptr, nullptr, 0) != 0) {
+              // We only went to sleep after incrementing contenders and checking that the
+              // lock is still held by someone else.  EAGAIN and EINTR both indicate a spurious
+              // failure, try again from the beginning.  We don't use TEMP_FAILURE_RETRY so we can
+              // intentionally retry to acquire the lock.
+              if ((errno != EAGAIN) && (errno != EINTR)) {
+                PLOG(FATAL) << "futex wait failed for " << name_;
+              }
+            }
+            SleepIfRuntimeDeleted(self);
+            // Retry until not held. In heavy contention situations we otherwise get redundant
+            // futex wakeups as a result of repeatedly decrementing and incrementing contenders.
+          } while ((state_and_contenders_.load(std::memory_order_relaxed) & kHeldMask) != 0);
+          decrement_contenders();
         }
-        decrement_contenders();
       }
     } while (!done);
     // Confirm that lock is now held.
@@ -406,7 +467,8 @@
 #else
     CHECK_MUTEX_CALL(pthread_mutex_lock, (&mutex_));
 #endif
-    DCHECK_EQ(GetExclusiveOwnerTid(), 0);
+    DCHECK_EQ(GetExclusiveOwnerTid(), 0) << " my tid = " << SafeGetTid(self)
+                                         << " recursive_ = " << recursive_;
     exclusive_owner_.store(SafeGetTid(self), std::memory_order_relaxed);
     RegisterAsLocked(self);
   }
@@ -459,6 +521,48 @@
   return true;
 }
 
+bool Mutex::ExclusiveTryLockWithSpinning(Thread* self) {
+  // Spin a small number of times, since this affects our ability to respond to suspension
+  // requests. We spin repeatedly only if the mutex repeatedly becomes available and unavailable
+  // in rapid succession, and even then we will typically not spin for the maximal period.
+  const int kMaxSpins = 5;
+  for (int i = 0; i < kMaxSpins; ++i) {
+    if (ExclusiveTryLock(self)) {
+      return true;
+    }
+#if ART_USE_FUTEXES
+    if (!WaitBrieflyFor(&state_and_contenders_, self,
+            [](int32_t v) { return (v & kHeldMask) == 0; })) {
+      return false;
+    }
+#endif
+  }
+  return ExclusiveTryLock(self);
+}
+
+#if ART_USE_FUTEXES
+void Mutex::ExclusiveLockUncontendedFor(Thread* new_owner) {
+  DCHECK_EQ(level_, kMonitorLock);
+  DCHECK(!recursive_);
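+  // Acquisition is uncontended by contract, so plain relaxed stores suffice.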
+  state_and_contenders_.store(kHeldMask, std::memory_order_relaxed);
+  recursion_count_ = 1;
+  exclusive_owner_.store(SafeGetTid(new_owner), std::memory_order_relaxed);
+  // Don't call RegisterAsLocked(). It wouldn't register anything anyway.  And
+  // this happens as we're inflating a monitor, which doesn't logically affect
+  // held "locks"; it effectively just converts a thin lock to a mutex.  By doing
+  // this while the lock is already held, we're delaying the acquisition of a
+  // logically held mutex, which can introduce bogus lock order violations.
+}
+
+void Mutex::ExclusiveUnlockUncontended() {
+  DCHECK_EQ(level_, kMonitorLock);
+  state_and_contenders_.store(0, std::memory_order_relaxed);
+  recursion_count_ = 0;
+  exclusive_owner_.store(0 /* pid */, std::memory_order_relaxed);
+  // Skip RegisterAsUnlocked(), which wouldn't do anything anyway.
+}
+#endif  // ART_USE_FUTEXES
+
 void Mutex::ExclusiveUnlock(Thread* self) {
   if (kIsDebugBuild && self != nullptr && self != Thread::Current()) {
     std::string name1 = "<null>";
@@ -589,18 +693,21 @@
     } else {
       // Failed to acquire, hang up.
       ScopedContentionRecorder scr(this, SafeGetTid(self), GetExclusiveOwnerTid());
-      num_contenders_.fetch_add(1);
-      if (UNLIKELY(should_respond_to_empty_checkpoint_request_)) {
-        self->CheckEmptyCheckpointFromMutex();
-      }
-      if (futex(state_.Address(), FUTEX_WAIT_PRIVATE, cur_state, nullptr, nullptr, 0) != 0) {
-        // EAGAIN and EINTR both indicate a spurious failure, try again from the beginning.
-        // We don't use TEMP_FAILURE_RETRY so we can intentionally retry to acquire the lock.
-        if ((errno != EAGAIN) && (errno != EINTR)) {
-          PLOG(FATAL) << "futex wait failed for " << name_;
+      if (!WaitBrieflyFor(&state_, self, [](int32_t v) { return v == 0; })) {
+        num_contenders_.fetch_add(1);
+        if (UNLIKELY(should_respond_to_empty_checkpoint_request_)) {
+          self->CheckEmptyCheckpointFromMutex();
         }
+        if (futex(state_.Address(), FUTEX_WAIT_PRIVATE, cur_state, nullptr, nullptr, 0) != 0) {
+          // EAGAIN and EINTR both indicate a spurious failure, try again from the beginning.
+          // We don't use TEMP_FAILURE_RETRY so we can intentionally retry to acquire the lock.
+          if ((errno != EAGAIN) && (errno != EINTR)) {
+            PLOG(FATAL) << "futex wait failed for " << name_;
+          }
+        }
+        SleepIfRuntimeDeleted(self);
+        num_contenders_.fetch_sub(1);
       }
-      num_contenders_.fetch_sub(1);
     }
   } while (!done);
   DCHECK_EQ(state_.load(std::memory_order_relaxed), -1);
@@ -665,22 +772,25 @@
         return false;  // Timed out.
       }
       ScopedContentionRecorder scr(this, SafeGetTid(self), GetExclusiveOwnerTid());
-      num_contenders_.fetch_add(1);
-      if (UNLIKELY(should_respond_to_empty_checkpoint_request_)) {
-        self->CheckEmptyCheckpointFromMutex();
-      }
-      if (futex(state_.Address(), FUTEX_WAIT_PRIVATE, cur_state, &rel_ts, nullptr, 0) != 0) {
-        if (errno == ETIMEDOUT) {
-          num_contenders_.fetch_sub(1);
-          return false;  // Timed out.
-        } else if ((errno != EAGAIN) && (errno != EINTR)) {
-          // EAGAIN and EINTR both indicate a spurious failure,
-          // recompute the relative time out from now and try again.
-          // We don't use TEMP_FAILURE_RETRY so we can recompute rel_ts;
-          PLOG(FATAL) << "timed futex wait failed for " << name_;
+      if (!WaitBrieflyFor(&state_, self, [](int32_t v) { return v == 0; })) {
+        num_contenders_.fetch_add(1);
+        if (UNLIKELY(should_respond_to_empty_checkpoint_request_)) {
+          self->CheckEmptyCheckpointFromMutex();
         }
+        if (futex(state_.Address(), FUTEX_WAIT_PRIVATE, cur_state, &rel_ts, nullptr, 0) != 0) {
+          if (errno == ETIMEDOUT) {
+            num_contenders_.fetch_sub(1);
+            return false;  // Timed out.
+          } else if ((errno != EAGAIN) && (errno != EINTR)) {
+            // EAGAIN and EINTR both indicate a spurious failure,
+            // recompute the relative time out from now and try again.
+            // We don't use TEMP_FAILURE_RETRY so we can recompute rel_ts;
+            PLOG(FATAL) << "timed futex wait failed for " << name_;
+          }
+        }
+        SleepIfRuntimeDeleted(self);
+        num_contenders_.fetch_sub(1);
       }
-      num_contenders_.fetch_sub(1);
     }
   } while (!done);
 #else
@@ -706,16 +816,19 @@
 void ReaderWriterMutex::HandleSharedLockContention(Thread* self, int32_t cur_state) {
   // Owner holds it exclusively, hang up.
   ScopedContentionRecorder scr(this, SafeGetTid(self), GetExclusiveOwnerTid());
-  num_contenders_.fetch_add(1);
-  if (UNLIKELY(should_respond_to_empty_checkpoint_request_)) {
-    self->CheckEmptyCheckpointFromMutex();
-  }
-  if (futex(state_.Address(), FUTEX_WAIT_PRIVATE, cur_state, nullptr, nullptr, 0) != 0) {
-    if (errno != EAGAIN && errno != EINTR) {
-      PLOG(FATAL) << "futex wait failed for " << name_;
+  if (!WaitBrieflyFor(&state_, self, [](int32_t v) { return v >= 0; })) {
+    num_contenders_.fetch_add(1);
+    if (UNLIKELY(should_respond_to_empty_checkpoint_request_)) {
+      self->CheckEmptyCheckpointFromMutex();
     }
+    if (futex(state_.Address(), FUTEX_WAIT_PRIVATE, cur_state, nullptr, nullptr, 0) != 0) {
+      if (errno != EAGAIN && errno != EINTR) {
+        PLOG(FATAL) << "futex wait failed for " << name_;
+      }
+    }
+    SleepIfRuntimeDeleted(self);
+    num_contenders_.fetch_sub(1);
   }
-  num_contenders_.fetch_sub(1);
 }
 #endif
 
@@ -895,18 +1008,7 @@
       PLOG(FATAL) << "futex wait failed for " << name_;
     }
   }
-  if (self != nullptr) {
-    JNIEnvExt* const env = self->GetJniEnv();
-    if (UNLIKELY(env != nullptr && env->IsRuntimeDeleted())) {
-      CHECK(self->IsDaemon());
-      // If the runtime has been deleted, then we cannot proceed. Just sleep forever. This may
-      // occur for user daemon threads that get a spurious wakeup. This occurs for test 132 with
-      // --host and --gdb.
-      // After we wake up, the runtime may have been shutdown, which means that this condition may
-      // have been deleted. It is not safe to retry the wait.
-      SleepForever();
-    }
-  }
+  SleepIfRuntimeDeleted(self);
   guard_.ExclusiveLock(self);
   CHECK_GT(num_waiters_, 0);
   num_waiters_--;
@@ -948,6 +1050,7 @@
       PLOG(FATAL) << "timed futex wait failed for " << name_;
     }
   }
+  SleepIfRuntimeDeleted(self);
   guard_.ExclusiveLock(self);
   CHECK_GT(num_waiters_, 0);
   num_waiters_--;
diff --git a/runtime/base/mutex.h b/runtime/base/mutex.h
index 136e17a..33878e6 100644
--- a/runtime/base/mutex.h
+++ b/runtime/base/mutex.h
@@ -57,7 +57,7 @@
 constexpr bool kDebugLocking = kIsDebugBuild;
 
 // Record Log contention information, dumpable via SIGQUIT.
-#ifdef ART_USE_FUTEXES
+#if ART_USE_FUTEXES
 // To enable lock contention logging, set this to true.
 constexpr bool kLogLockContentions = false;
 // FUTEX_WAKE first argument:
@@ -102,7 +102,11 @@
 
   BaseMutex(const char* name, LockLevel level);
   virtual ~BaseMutex();
+
+  // Add this mutex to those owned by self, and perform appropriate checking.
+  // For this call only, self may also be another suspended thread.
   void RegisterAsLocked(Thread* self);
+
   void RegisterAsUnlocked(Thread* self);
   void CheckSafeToWait(Thread* self);
 
@@ -156,8 +160,14 @@
 // -------------------------------------------
 // Free      | Exclusive     | error
 // Exclusive | Block*        | Free
-// * Mutex is not reentrant and so an attempt to ExclusiveLock on the same thread will result in
-//   an error. Being non-reentrant simplifies Waiting on ConditionVariables.
+// * Mutex is not reentrant unless recursive is true. An attempt to ExclusiveLock on a
+// recursive=false Mutex on a thread already owning the Mutex results in an error.
+//
+// TODO(b/140590186): Remove support for recursive == true.
+//
+// Some mutexes, including those associated with Java monitors, may be accessed (in particular
+// acquired) by a thread in suspended state. Suspending all threads does NOT prevent mutex state
+// from changing.
 std::ostream& operator<<(std::ostream& os, const Mutex& mu);
 class LOCKABLE Mutex : public BaseMutex {
  public:
@@ -173,6 +183,8 @@
   // Returns true if acquires exclusive access, false otherwise.
   bool ExclusiveTryLock(Thread* self) TRY_ACQUIRE(true);
   bool TryLock(Thread* self) TRY_ACQUIRE(true) { return ExclusiveTryLock(self); }
+  // Equivalent to ExclusiveTryLock, but retry for a short period before giving up.
+  bool ExclusiveTryLockWithSpinning(Thread* self) TRY_ACQUIRE(true);
 
   // Release exclusive access.
   void ExclusiveUnlock(Thread* self) RELEASE();
@@ -200,7 +212,9 @@
   // whether we hold the lock; any other information may be invalidated before we return.
   pid_t GetExclusiveOwnerTid() const;
 
-  // Returns how many times this Mutex has been locked, it is better to use AssertHeld/NotHeld.
+  // Returns how many times this Mutex has been locked; it is typically better to use
+  // AssertHeld/NotHeld. For a simply held mutex this method returns 1. Should only be called
+  // while holding the mutex or while threads are suspended.
   unsigned int GetDepth() const {
     return recursion_count_;
   }
@@ -212,6 +226,18 @@
 
   void WakeupToRespondToEmptyCheckpoint() override;
 
+#if ART_USE_FUTEXES
+  // Acquire the mutex, possibly on behalf of another thread. Acquisition must be
+  // uncontended. new_owner must be the current thread or a suspended thread.
+  // Mutex must be at level kMonitorLock.
+  // Not implementable for the pthreads version, so we must avoid calling it there.
+  void ExclusiveLockUncontendedFor(Thread* new_owner);
+
+  // Undo the effect of the previous call, setting the mutex back to unheld.
+  // Still assumes no concurrent access.
+  void ExclusiveUnlockUncontended();
+#endif  // ART_USE_FUTEXES
+
  private:
 #if ART_USE_FUTEXES
   // Low order bit: 0 is unheld, 1 is held.
diff --git a/runtime/gc/heap.cc b/runtime/gc/heap.cc
index 0601b8d..ee9e4a8 100644
--- a/runtime/gc/heap.cc
+++ b/runtime/gc/heap.cc
@@ -2307,6 +2307,12 @@
                                 std::memory_order_release);
 }
 
+#pragma clang diagnostic push
+#if !ART_USE_FUTEXES
+// Frame gets too large, perhaps due to Bionic pthread_mutex_lock size. We don't care.
+#  pragma clang diagnostic ignored "-Wframe-larger-than="
+#endif
+// This has a large frame, but shouldn't be run anywhere near the stack limit.
 void Heap::PreZygoteFork() {
   if (!HasZygoteSpace()) {
     // We still want to GC in case there is some unreachable non moving objects that could cause a
@@ -2467,6 +2473,7 @@
     AddRememberedSet(post_zygote_non_moving_space_rem_set);
   }
 }
+#pragma clang diagnostic pop
 
 void Heap::FlushAllocStack() {
   MarkAllocStackAsLive(allocation_stack_.get());
diff --git a/runtime/jni/jni_env_ext.cc b/runtime/jni/jni_env_ext.cc
index e2581dd..4a0a3a5 100644
--- a/runtime/jni/jni_env_ext.cc
+++ b/runtime/jni/jni_env_ext.cc
@@ -90,7 +90,7 @@
 
 void JNIEnvExt::SetFunctionsToRuntimeShutdownFunctions() {
   functions = GetRuntimeShutdownNativeInterface();
-  runtime_deleted_ = true;
+  runtime_deleted_.store(true, std::memory_order_relaxed);
 }
 
 JNIEnvExt::~JNIEnvExt() {
diff --git a/runtime/jni/jni_env_ext.h b/runtime/jni/jni_env_ext.h
index 7aae92f..067389e 100644
--- a/runtime/jni/jni_env_ext.h
+++ b/runtime/jni/jni_env_ext.h
@@ -110,7 +110,7 @@
   }
   JavaVMExt* GetVm() const { return vm_; }
 
-  bool IsRuntimeDeleted() const { return runtime_deleted_; }
+  bool IsRuntimeDeleted() const { return runtime_deleted_.load(std::memory_order_relaxed); }
   bool IsCheckJniEnabled() const { return check_jni_; }
 
 
@@ -206,7 +206,7 @@
   bool check_jni_;
 
   // If we are a JNI env for a daemon thread with a deleted runtime.
-  bool runtime_deleted_;
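+  // Atomic, since daemon threads may read this while the shutdown code is setting it.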
+  std::atomic<bool> runtime_deleted_;
 
   template<bool kEnableIndexIds> friend class JNI;
   friend class ScopedJniEnvLocalRefState;
diff --git a/runtime/lock_word.h b/runtime/lock_word.h
index ac7890c..30559a0 100644
--- a/runtime/lock_word.h
+++ b/runtime/lock_word.h
@@ -42,6 +42,7 @@
  *  |10|9|8|765432109876|5432109876543210|
  *  |00|m|r| lock count |thread id owner |
  *
+ * The lock count is zero, but the owner is nonzero for a simply held lock.
  * When the lock word is in the "fat" state and its bits are formatted as follows:
  *
  *  |33|2|2|2222222211111111110000000000|
@@ -72,7 +73,8 @@
     kMarkBitStateSize = 1,
     // Number of bits to encode the thin lock owner.
     kThinLockOwnerSize = 16,
-    // Remaining bits are the recursive lock count.
+    // Remaining bits are the recursive lock count. Zero means it is locked exactly once
+    // and not recursively.
     kThinLockCountSize = 32 - kThinLockOwnerSize - kStateSize - kReadBarrierStateSize -
         kMarkBitStateSize,
 
@@ -234,7 +236,8 @@
   // Return the owner thin lock thread id.
   uint32_t ThinLockOwner() const;
 
-  // Return the number of times a lock value has been locked.
+  // Return the number of times a lock value has been re-locked. Only valid in thin-locked state.
+  // If the lock is held only once the return value is zero.
   uint32_t ThinLockCount() const;
 
   // Return the Monitor encoded in a fat lock.
diff --git a/runtime/monitor-inl.h b/runtime/monitor-inl.h
index e8ffafa..f7e31a0 100644
--- a/runtime/monitor-inl.h
+++ b/runtime/monitor-inl.h
@@ -29,6 +29,57 @@
   return obj_.Read<kReadBarrierOption>();
 }
 
+// Check for a request to set the lock owner info.
+void Monitor::CheckLockOwnerRequest(Thread* self) {
+  DCHECK(self != nullptr);
+  Thread* request_thread = lock_owner_request_.load(std::memory_order_relaxed);
+  if (request_thread == self) {
+    SetLockingMethod(self);
+    // Only do this the first time after a request.
+    lock_owner_request_.store(nullptr, std::memory_order_relaxed);
+  }
+}
+
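+// Compute a checksum over the lock owner info fields. GetLockOwnerInfo() uses it to detect
+// inconsistent reads of the three fields, which are updated without holding a lock.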
+uintptr_t Monitor::LockOwnerInfoChecksum(ArtMethod* m, uint32_t dex_pc, Thread* t) {
+  uintptr_t dpc_and_thread = static_cast<uintptr_t>(dex_pc << 8) ^ reinterpret_cast<uintptr_t>(t);
+  return reinterpret_cast<uintptr_t>(m) ^ dpc_and_thread
+      ^ (dpc_and_thread << (/* ptr_size / 2 */ (sizeof m) << 2));
+}
+
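+// Publish the owner's method, dex pc, and Thread*, together with a checksum over all three,
+// so that concurrent readers can validate that they saw a consistent triple.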
+void Monitor::SetLockOwnerInfo(ArtMethod* method, uint32_t dex_pc, Thread* t) {
+  lock_owner_method_.store(method, std::memory_order_relaxed);
+  lock_owner_dex_pc_.store(dex_pc, std::memory_order_relaxed);
+  lock_owner_.store(t, std::memory_order_relaxed);
+  uintptr_t sum = LockOwnerInfoChecksum(method, dex_pc, t);
+  lock_owner_sum_.store(sum, std::memory_order_relaxed);
+}
+
+void Monitor::GetLockOwnerInfo(/*out*/ArtMethod** method, /*out*/uint32_t* dex_pc,
+                               Thread* t) {
+  ArtMethod* owners_method;
+  uint32_t owners_dex_pc;
+  Thread* owner;
+  uintptr_t owners_sum;
+  DCHECK(t != nullptr);
+  do {
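+    // Retry until the checksum matches, i.e. until we have read a mutually consistent
+    // (method, dex pc, owner) triple.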
+    owner = lock_owner_.load(std::memory_order_relaxed);
+    if (owner == nullptr) {
+      break;
+    }
+    owners_method = lock_owner_method_.load(std::memory_order_relaxed);
+    owners_dex_pc = lock_owner_dex_pc_.load(std::memory_order_relaxed);
+    owners_sum = lock_owner_sum_.load(std::memory_order_relaxed);
+  } while (owners_sum != LockOwnerInfoChecksum(owners_method, owners_dex_pc, owner));
+  if (owner == t) {
+    *method = owners_method;
+    *dex_pc = owners_dex_pc;
+  } else {
+    *method = nullptr;
+    *dex_pc = 0;
+  }
+}
+
 }  // namespace art
 
 #endif  // ART_RUNTIME_MONITOR_INL_H_
diff --git a/runtime/monitor.cc b/runtime/monitor.cc
index 9d114ed..6b05e28 100644
--- a/runtime/monitor.cc
+++ b/runtime/monitor.cc
@@ -52,7 +52,8 @@
 /*
  * Every Object has a monitor associated with it, but not every Object is actually locked.  Even
  * the ones that are locked do not need a full-fledged monitor until a) there is actual contention
- * or b) wait() is called on the Object.
+ * or b) wait() is called on the Object, or c) we need to lock an object that also has an
+ * identity hashcode.
  *
  * For Android, we have implemented a scheme similar to the one described in Bacon et al.'s
  * "Thin locks: featherweight synchronization for Java" (ACM 1998).  Things are even easier for us,
@@ -91,7 +92,6 @@
 
 Monitor::Monitor(Thread* self, Thread* owner, ObjPtr<mirror::Object> obj, int32_t hash_code)
     : monitor_lock_("a monitor lock", kMonitorLock),
-      monitor_contenders_("monitor contenders", monitor_lock_),
       num_waiters_(0),
       owner_(owner),
       lock_count_(0),
@@ -99,8 +99,8 @@
       wait_set_(nullptr),
       wake_set_(nullptr),
       hash_code_(hash_code),
-      locking_method_(nullptr),
-      locking_dex_pc_(0),
+      lock_owner_method_(nullptr),
+      lock_owner_dex_pc_(0),
       monitor_id_(MonitorPool::ComputeMonitorId(this, self)) {
 #ifdef __LP64__
   DCHECK(false) << "Should not be reached in 64b";
@@ -118,7 +118,6 @@
                  int32_t hash_code,
                  MonitorId id)
     : monitor_lock_("a monitor lock", kMonitorLock),
-      monitor_contenders_("monitor contenders", monitor_lock_),
       num_waiters_(0),
       owner_(owner),
       lock_count_(0),
@@ -126,8 +125,8 @@
       wait_set_(nullptr),
       wake_set_(nullptr),
       hash_code_(hash_code),
-      locking_method_(nullptr),
-      locking_dex_pc_(0),
+      lock_owner_(nullptr),
+      lock_owner_request_(nullptr),
       monitor_id_(id) {
 #ifdef __LP64__
   next_free_ = nullptr;
@@ -150,20 +149,106 @@
   return hc;
 }
 
-bool Monitor::Install(Thread* self) {
-  MutexLock mu(self, monitor_lock_);  // Uncontended mutex acquisition as monitor isn't yet public.
-  CHECK(owner_ == nullptr || owner_ == self || owner_->IsSuspended());
+void Monitor::SetLockingMethod(Thread* owner) {
+  DCHECK(owner == Thread::Current() || owner->IsSuspended());
+  // Do not abort on dex pc errors. This can easily happen when we want to dump a stack trace on
+  // abort.
+  ArtMethod* lock_owner_method;
+  uint32_t lock_owner_dex_pc;
+  lock_owner_method = owner->GetCurrentMethod(&lock_owner_dex_pc, false);
+  if (lock_owner_method != nullptr && UNLIKELY(lock_owner_method->IsProxyMethod())) {
+    // Grab another frame. Proxy methods are not helpful for lock profiling. This should be rare
+    // enough that it's OK to walk the stack twice.
+    struct NextMethodVisitor final : public StackVisitor {
+      explicit NextMethodVisitor(Thread* thread) REQUIRES_SHARED(Locks::mutator_lock_)
+          : StackVisitor(thread,
+                         nullptr,
+                         StackVisitor::StackWalkKind::kIncludeInlinedFrames,
+                         false),
+            count_(0),
+            method_(nullptr),
+            dex_pc_(0) {}
+      bool VisitFrame() override REQUIRES_SHARED(Locks::mutator_lock_) {
+        ArtMethod* m = GetMethod();
+        if (m->IsRuntimeMethod()) {
+          // Continue if this is a runtime method.
+          return true;
+        }
+        count_++;
+        if (count_ == 2u) {
+          method_ = m;
+          dex_pc_ = GetDexPc(false);
+          return false;
+        }
+        return true;
+      }
+      size_t count_;
+      ArtMethod* method_;
+      uint32_t dex_pc_;
+    };
+    NextMethodVisitor nmv(owner_.load(std::memory_order_relaxed));
+    nmv.WalkStack();
+    lock_owner_method = nmv.method_;
+    lock_owner_dex_pc = nmv.dex_pc_;
+  }
+  SetLockOwnerInfo(lock_owner_method, lock_owner_dex_pc, owner);
+  DCHECK(lock_owner_method == nullptr || !lock_owner_method->IsProxyMethod());
+}
+
+void Monitor::SetLockingMethodNoProxy(Thread* owner) {
+  DCHECK(owner == Thread::Current());
+  uint32_t lock_owner_dex_pc;
+  ArtMethod* lock_owner_method = owner->GetCurrentMethod(&lock_owner_dex_pc);
+  // We don't expect a proxy method here.
+  DCHECK(lock_owner_method == nullptr || !lock_owner_method->IsProxyMethod());
+  SetLockOwnerInfo(lock_owner_method, lock_owner_dex_pc, owner);
+}
+
+bool Monitor::Install(Thread* self) NO_THREAD_SAFETY_ANALYSIS {
+  // This may or may not result in acquiring monitor_lock_. Its behavior is much more complicated
+  // than what clang thread safety analysis understands.
+  // Monitor is not yet public.
+  Thread* owner = owner_.load(std::memory_order_relaxed);
+  CHECK(owner == nullptr || owner == self || (ART_USE_FUTEXES && owner->IsSuspended()));
   // Propagate the lock state.
   LockWord lw(GetObject()->GetLockWord(false));
   switch (lw.GetState()) {
     case LockWord::kThinLocked: {
-      CHECK_EQ(owner_->GetThreadId(), lw.ThinLockOwner());
+      DCHECK(owner != nullptr);
+      CHECK_EQ(owner->GetThreadId(), lw.ThinLockOwner());
+      DCHECK_EQ(monitor_lock_.GetExclusiveOwnerTid(), 0) << " my tid = " << SafeGetTid(self);
       lock_count_ = lw.ThinLockCount();
-      break;
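+      // Acquire monitor_lock_ on behalf of the thin-lock owner, which may be a suspended
+      // thread other than self.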
+#if ART_USE_FUTEXES
+      monitor_lock_.ExclusiveLockUncontendedFor(owner);
+#else
+      monitor_lock_.ExclusiveLock(owner);
+#endif
+      DCHECK_EQ(monitor_lock_.GetExclusiveOwnerTid(), owner->GetTid())
+          << " my tid = " << SafeGetTid(self);
+      LockWord fat(this, lw.GCState());
+      // Publish the updated lock word, which may race with other threads.
+      bool success = GetObject()->CasLockWord(lw, fat, CASMode::kWeak, std::memory_order_release);
+      if (success) {
+        if (ATraceEnabled()) {
+          SetLockingMethod(owner);
+        }
+        return true;
+      } else {
+#if ART_USE_FUTEXES
+        monitor_lock_.ExclusiveUnlockUncontended();
+#else
+        for (uint32_t i = 0; i <= lock_count_; ++i) {
+          monitor_lock_.ExclusiveUnlock(owner);
+        }
+#endif
+        return false;
+      }
     }
     case LockWord::kHashCode: {
       CHECK_EQ(hash_code_.load(std::memory_order_relaxed), static_cast<int32_t>(lw.GetHashCode()));
-      break;
+      DCHECK_EQ(monitor_lock_.GetExclusiveOwnerTid(), 0) << " my tid = " << SafeGetTid(self);
+      LockWord fat(this, lw.GCState());
+      return GetObject()->CasLockWord(lw, fat, CASMode::kWeak, std::memory_order_release);
     }
     case LockWord::kFatLocked: {
       // The owner_ is suspended but another thread beat us to install a monitor.
@@ -178,52 +263,6 @@
       UNREACHABLE();
     }
   }
-  LockWord fat(this, lw.GCState());
-  // Publish the updated lock word, which may race with other threads.
-  bool success = GetObject()->CasLockWord(lw, fat, CASMode::kWeak, std::memory_order_release);
-  // Lock profiling.
-  if (success && owner_ != nullptr && lock_profiling_threshold_ != 0) {
-    // Do not abort on dex pc errors. This can easily happen when we want to dump a stack trace on
-    // abort.
-    locking_method_ = owner_->GetCurrentMethod(&locking_dex_pc_, false);
-    if (locking_method_ != nullptr && UNLIKELY(locking_method_->IsProxyMethod())) {
-      // Grab another frame. Proxy methods are not helpful for lock profiling. This should be rare
-      // enough that it's OK to walk the stack twice.
-      struct NextMethodVisitor final : public StackVisitor {
-        explicit NextMethodVisitor(Thread* thread) REQUIRES_SHARED(Locks::mutator_lock_)
-            : StackVisitor(thread,
-                           nullptr,
-                           StackVisitor::StackWalkKind::kIncludeInlinedFrames,
-                           false),
-              count_(0),
-              method_(nullptr),
-              dex_pc_(0) {}
-        bool VisitFrame() override REQUIRES_SHARED(Locks::mutator_lock_) {
-          ArtMethod* m = GetMethod();
-          if (m->IsRuntimeMethod()) {
-            // Continue if this is a runtime method.
-            return true;
-          }
-          count_++;
-          if (count_ == 2u) {
-            method_ = m;
-            dex_pc_ = GetDexPc(false);
-            return false;
-          }
-          return true;
-        }
-        size_t count_;
-        ArtMethod* method_;
-        uint32_t dex_pc_;
-      };
-      NextMethodVisitor nmv(owner_);
-      nmv.WalkStack();
-      locking_method_ = nmv.method_;
-      locking_dex_pc_ = nmv.dex_pc_;
-    }
-    DCHECK(locking_method_ == nullptr || !locking_method_->IsProxyMethod());
-  }
-  return success;
 }
 
 Monitor::~Monitor() {
@@ -371,226 +410,222 @@
   return oss.str();
 }
 
-bool Monitor::TryLockLocked(Thread* self) {
-  if (owner_ == nullptr) {  // Unowned.
-    owner_ = self;
-    CHECK_EQ(lock_count_, 0);
-    // When debugging, save the current monitor holder for future
-    // acquisition failures to use in sampled logging.
-    if (lock_profiling_threshold_ != 0) {
-      locking_method_ = self->GetCurrentMethod(&locking_dex_pc_);
-      // We don't expect a proxy method here.
-      DCHECK(locking_method_ == nullptr || !locking_method_->IsProxyMethod());
-    }
-  } else if (owner_ == self) {  // Recursive.
+bool Monitor::TryLock(Thread* self, bool spin) {
+  Thread* owner = owner_.load(std::memory_order_relaxed);
+  if (owner == self) {
     lock_count_++;
+    CHECK_NE(lock_count_, 0u);  // Abort on overflow.
   } else {
-    return false;
+    bool success = spin ? monitor_lock_.ExclusiveTryLockWithSpinning(self)
+        : monitor_lock_.ExclusiveTryLock(self);
+    if (!success) {
+      return false;
+    }
+    DCHECK(owner_.load(std::memory_order_relaxed) == nullptr);
+    owner_.store(self, std::memory_order_relaxed);
+    CHECK_EQ(lock_count_, 0u);
+    if (ATraceEnabled()) {
+      SetLockingMethodNoProxy(self);
+    }
   }
+  DCHECK(monitor_lock_.IsExclusiveHeld(self));
   AtraceMonitorLock(self, GetObject(), /* is_wait= */ false);
   return true;
 }
 
-bool Monitor::TryLock(Thread* self) {
-  MutexLock mu(self, monitor_lock_);
-  return TryLockLocked(self);
-}
-
-// Asserts that a mutex isn't held when the class comes into and out of scope.
-class ScopedAssertNotHeld {
- public:
-  ScopedAssertNotHeld(Thread* self, Mutex& mu) : self_(self), mu_(mu) {
-    mu_.AssertNotHeld(self_);
-  }
-
-  ~ScopedAssertNotHeld() {
-    mu_.AssertNotHeld(self_);
-  }
-
- private:
-  Thread* const self_;
-  Mutex& mu_;
-  DISALLOW_COPY_AND_ASSIGN(ScopedAssertNotHeld);
-};
-
 template <LockReason reason>
 void Monitor::Lock(Thread* self) {
-  ScopedAssertNotHeld sanh(self, monitor_lock_);
   bool called_monitors_callback = false;
-  monitor_lock_.Lock(self);
-  while (true) {
-    if (TryLockLocked(self)) {
-      break;
+  if (TryLock(self, /*spin=*/ true)) {
+    // TODO: This preserves original behavior. Correct?
+    if (called_monitors_callback) {
+      CHECK(reason == LockReason::kForLock);
+      Runtime::Current()->GetRuntimeCallbacks()->MonitorContendedLocked(this);
     }
-    // Contended.
-    const bool log_contention = (lock_profiling_threshold_ != 0);
-    uint64_t wait_start_ms = log_contention ? MilliTime() : 0;
-    ArtMethod* owners_method = locking_method_;
-    uint32_t owners_dex_pc = locking_dex_pc_;
-    // Do this before releasing the lock so that we don't get deflated.
-    size_t num_waiters = num_waiters_;
-    ++num_waiters_;
-
-    // If systrace logging is enabled, first look at the lock owner. Acquiring the monitor's
-    // lock and then re-acquiring the mutator lock can deadlock.
-    bool started_trace = false;
-    if (ATraceEnabled()) {
-      if (owner_ != nullptr) {  // Did the owner_ give the lock up?
-        std::ostringstream oss;
-        std::string name;
-        owner_->GetThreadName(name);
-        oss << PrettyContentionInfo(name,
-                                    owner_->GetTid(),
-                                    owners_method,
-                                    owners_dex_pc,
-                                    num_waiters);
-        // Add info for contending thread.
-        uint32_t pc;
-        ArtMethod* m = self->GetCurrentMethod(&pc);
-        const char* filename;
-        int32_t line_number;
-        TranslateLocation(m, pc, &filename, &line_number);
-        oss << " blocking from "
-            << ArtMethod::PrettyMethod(m) << "(" << (filename != nullptr ? filename : "null")
-            << ":" << line_number << ")";
-        ATraceBegin(oss.str().c_str());
-        started_trace = true;
-      }
-    }
-
-    monitor_lock_.Unlock(self);  // Let go of locks in order.
-    // Call the contended locking cb once and only once. Also only call it if we are locking for
-    // the first time, not during a Wait wakeup.
-    if (reason == LockReason::kForLock && !called_monitors_callback) {
-      called_monitors_callback = true;
-      Runtime::Current()->GetRuntimeCallbacks()->MonitorContendedLocking(this);
-    }
-    self->SetMonitorEnterObject(GetObject().Ptr());
-    {
-      ScopedThreadSuspension tsc(self, kBlocked);  // Change to blocked and give up mutator_lock_.
-      uint32_t original_owner_thread_id = 0u;
-      {
-        // Reacquire monitor_lock_ without mutator_lock_ for Wait.
-        MutexLock mu2(self, monitor_lock_);
-        if (owner_ != nullptr) {  // Did the owner_ give the lock up?
-          original_owner_thread_id = owner_->GetThreadId();
-          monitor_contenders_.Wait(self);  // Still contended so wait.
-        }
-      }
-      if (original_owner_thread_id != 0u) {
-        // Woken from contention.
-        if (log_contention) {
-          uint64_t wait_ms = MilliTime() - wait_start_ms;
-          uint32_t sample_percent;
-          if (wait_ms >= lock_profiling_threshold_) {
-            sample_percent = 100;
-          } else {
-            sample_percent = 100 * wait_ms / lock_profiling_threshold_;
-          }
-          if (sample_percent != 0 && (static_cast<uint32_t>(rand() % 100) < sample_percent)) {
-            // Reacquire mutator_lock_ for logging.
-            ScopedObjectAccess soa(self);
-
-            bool owner_alive = false;
-            pid_t original_owner_tid = 0;
-            std::string original_owner_name;
-
-            const bool should_dump_stacks = stack_dump_lock_profiling_threshold_ > 0 &&
-                wait_ms > stack_dump_lock_profiling_threshold_;
-            std::string owner_stack_dump;
-
-            // Acquire thread-list lock to find thread and keep it from dying until we've got all
-            // the info we need.
-            {
-              Locks::thread_list_lock_->ExclusiveLock(Thread::Current());
-
-              // Re-find the owner in case the thread got killed.
-              Thread* original_owner = Runtime::Current()->GetThreadList()->FindThreadByThreadId(
-                  original_owner_thread_id);
-
-              if (original_owner != nullptr) {
-                owner_alive = true;
-                original_owner_tid = original_owner->GetTid();
-                original_owner->GetThreadName(original_owner_name);
-
-                if (should_dump_stacks) {
-                  // Very long contention. Dump stacks.
-                  struct CollectStackTrace : public Closure {
-                    void Run(art::Thread* thread) override
-                        REQUIRES_SHARED(art::Locks::mutator_lock_) {
-                      thread->DumpJavaStack(oss);
-                    }
-
-                    std::ostringstream oss;
-                  };
-                  CollectStackTrace owner_trace;
-                  // RequestSynchronousCheckpoint releases the thread_list_lock_ as a part of its
-                  // execution.
-                  original_owner->RequestSynchronousCheckpoint(&owner_trace);
-                  owner_stack_dump = owner_trace.oss.str();
-                } else {
-                  Locks::thread_list_lock_->ExclusiveUnlock(Thread::Current());
-                }
-              } else {
-                Locks::thread_list_lock_->ExclusiveUnlock(Thread::Current());
-              }
-              // This is all the data we need. Now drop the thread-list lock, it's OK for the
-              // owner to go away now.
-            }
-
-            // If we found the owner (and thus have owner data), go and log now.
-            if (owner_alive) {
-              // Give the detailed traces for really long contention.
-              if (should_dump_stacks) {
-                // This must be here (and not above) because we cannot hold the thread-list lock
-                // while running the checkpoint.
-                std::ostringstream self_trace_oss;
-                self->DumpJavaStack(self_trace_oss);
-
-                uint32_t pc;
-                ArtMethod* m = self->GetCurrentMethod(&pc);
-
-                LOG(WARNING) << "Long "
-                    << PrettyContentionInfo(original_owner_name,
-                                            original_owner_tid,
-                                            owners_method,
-                                            owners_dex_pc,
-                                            num_waiters)
-                    << " in " << ArtMethod::PrettyMethod(m) << " for "
-                    << PrettyDuration(MsToNs(wait_ms)) << "\n"
-                    << "Current owner stack:\n" << owner_stack_dump
-                    << "Contender stack:\n" << self_trace_oss.str();
-              } else if (wait_ms > kLongWaitMs && owners_method != nullptr) {
-                uint32_t pc;
-                ArtMethod* m = self->GetCurrentMethod(&pc);
-                // TODO: We should maybe check that original_owner is still a live thread.
-                LOG(WARNING) << "Long "
-                    << PrettyContentionInfo(original_owner_name,
-                                            original_owner_tid,
-                                            owners_method,
-                                            owners_dex_pc,
-                                            num_waiters)
-                    << " in " << ArtMethod::PrettyMethod(m) << " for "
-                    << PrettyDuration(MsToNs(wait_ms));
-              }
-              LogContentionEvent(self,
-                                wait_ms,
-                                sample_percent,
-                                owners_method,
-                                owners_dex_pc);
-            }
-          }
-        }
-      }
-    }
-    if (started_trace) {
-      ATraceEnd();
-    }
-    self->SetMonitorEnterObject(nullptr);
-    monitor_lock_.Lock(self);  // Reacquire locks in order.
-    --num_waiters_;
+    return;
   }
-  monitor_lock_.Unlock(self);
+  // Contended; not reentrant. We hold no locks, so tread carefully.
+  const bool log_contention = (lock_profiling_threshold_ != 0);
+  uint64_t wait_start_ms = log_contention ? MilliTime() : 0;
+
+  Thread* orig_owner = nullptr;
+  ArtMethod* owners_method;
+  uint32_t owners_dex_pc;
+
+  // Do this before releasing the mutator lock so that we don't get deflated.
+  size_t num_waiters = num_waiters_.fetch_add(1, std::memory_order_relaxed);
+
+  bool started_trace = false;
+  if (ATraceEnabled() && owner_.load(std::memory_order_relaxed) != nullptr) {
+    // Acquiring thread_list_lock_ ensures that owner doesn't disappear while
+    // we're looking at it.
+    Locks::thread_list_lock_->ExclusiveLock(self);
+    orig_owner = owner_.load(std::memory_order_relaxed);
+    if (orig_owner != nullptr) {  // Did the owner_ give the lock up?
+      const uint32_t orig_owner_thread_id = orig_owner->GetThreadId();
+      GetLockOwnerInfo(&owners_method, &owners_dex_pc, orig_owner);
+      std::ostringstream oss;
+      std::string name;
+      orig_owner->GetThreadName(name);
+      oss << PrettyContentionInfo(name,
+                                  orig_owner_thread_id,
+                                  owners_method,
+                                  owners_dex_pc,
+                                  num_waiters);
+      Locks::thread_list_lock_->ExclusiveUnlock(self);
+      // Add info for contending thread.
+      uint32_t pc;
+      ArtMethod* m = self->GetCurrentMethod(&pc);
+      const char* filename;
+      int32_t line_number;
+      TranslateLocation(m, pc, &filename, &line_number);
+      oss << " blocking from "
+          << ArtMethod::PrettyMethod(m) << "(" << (filename != nullptr ? filename : "null")
+          << ":" << line_number << ")";
+      ATraceBegin(oss.str().c_str());
+      started_trace = true;
+    } else {
+      Locks::thread_list_lock_->ExclusiveUnlock(self);
+    }
+  }
+  if (log_contention) {
+    // Request the current holder to set lock_owner_info.
+    // Do this even if tracing is enabled, so we semi-consistently get the information
+    // corresponding to MonitorExit.
+    // TODO: Consider optionally obtaining a stack trace here via a checkpoint.  That would allow
+    // us to see what the other thread is doing while we're waiting.
+    orig_owner = owner_.load(std::memory_order_relaxed);
+    lock_owner_request_.store(orig_owner, std::memory_order_relaxed);
+  }
+  // Call the contended locking cb once and only once. Also only call it if we are locking for
+  // the first time, not during a Wait wakeup.
+  if (reason == LockReason::kForLock && !called_monitors_callback) {
+    called_monitors_callback = true;
+    Runtime::Current()->GetRuntimeCallbacks()->MonitorContendedLocking(this);
+  }
+  self->SetMonitorEnterObject(GetObject().Ptr());
+  {
+    ScopedThreadSuspension tsc(self, kBlocked);  // Change to blocked and give up mutator_lock_.
+
+    // Acquire monitor_lock_ without mutator_lock_, expecting to block this time.
+    // We already tried spinning above. The shutdown procedure currently assumes we stop
+    // touching monitors shortly after we suspend, so don't spin again here.
+    monitor_lock_.ExclusiveLock(self);
+
+    if (log_contention && orig_owner != nullptr) {
+      // Woken from contention.
+      uint64_t wait_ms = MilliTime() - wait_start_ms;
+      uint32_t sample_percent;
+      if (wait_ms >= lock_profiling_threshold_) {
+        sample_percent = 100;
+      } else {
+        sample_percent = 100 * wait_ms / lock_profiling_threshold_;
+      }
+      if (sample_percent != 0 && (static_cast<uint32_t>(rand() % 100) < sample_percent)) {
+        // Do this unconditionally for consistency. It's possible another thread
+        // snuck in in the middle, and tracing was enabled. In that case, we may get its
+        // MonitorEnter information. We can live with that.
+        GetLockOwnerInfo(&owners_method, &owners_dex_pc, orig_owner);
+
+        // Reacquire mutator_lock_ for logging.
+        ScopedObjectAccess soa(self);
+
+        const bool should_dump_stacks = stack_dump_lock_profiling_threshold_ > 0 &&
+            wait_ms > stack_dump_lock_profiling_threshold_;
+
+        // Acquire thread-list lock to find thread and keep it from dying until we've got all
+        // the info we need.
+        Locks::thread_list_lock_->ExclusiveLock(self);
+
+        // Is there still a thread at the same address as the original owner?
+        // We tolerate the fact that it may occasionally be the wrong one.
+        if (Runtime::Current()->GetThreadList()->Contains(orig_owner)) {
+          uint32_t original_owner_tid = orig_owner->GetTid();  // System thread id.
+          std::string original_owner_name;
+          orig_owner->GetThreadName(original_owner_name);
+          std::string owner_stack_dump;
+
+          if (should_dump_stacks) {
+            // Very long contention. Dump stacks.
+            struct CollectStackTrace : public Closure {
+              void Run(art::Thread* thread) override
+                  REQUIRES_SHARED(art::Locks::mutator_lock_) {
+                thread->DumpJavaStack(oss);
+              }
+
+              std::ostringstream oss;
+            };
+            CollectStackTrace owner_trace;
+            // RequestSynchronousCheckpoint releases the thread_list_lock_ as a part of its
+            // execution.
+            orig_owner->RequestSynchronousCheckpoint(&owner_trace);
+            owner_stack_dump = owner_trace.oss.str();
+          } else {
+            Locks::thread_list_lock_->ExclusiveUnlock(self);
+          }
+
+          // This is all the data we need. We dropped the thread-list lock; it's OK for the
+          // owner to go away now.
+
+          if (should_dump_stacks) {
+            // Give the detailed traces for really long contention.
+            // This must be here (and not above) because we cannot hold the thread-list lock
+            // while running the checkpoint.
+            std::ostringstream self_trace_oss;
+            self->DumpJavaStack(self_trace_oss);
+
+            uint32_t pc;
+            ArtMethod* m = self->GetCurrentMethod(&pc);
+
+            LOG(WARNING) << "Long "
+                << PrettyContentionInfo(original_owner_name,
+                                        original_owner_tid,
+                                        owners_method,
+                                        owners_dex_pc,
+                                        num_waiters)
+                << " in " << ArtMethod::PrettyMethod(m) << " for "
+                << PrettyDuration(MsToNs(wait_ms)) << "\n"
+                << "Current owner stack:\n" << owner_stack_dump
+                << "Contender stack:\n" << self_trace_oss.str();
+          } else if (wait_ms > kLongWaitMs && owners_method != nullptr) {
+            uint32_t pc;
+            ArtMethod* m = self->GetCurrentMethod(&pc);
+            // TODO: We should maybe check that original_owner is still a live thread.
+            LOG(WARNING) << "Long "
+                << PrettyContentionInfo(original_owner_name,
+                                        original_owner_tid,
+                                        owners_method,
+                                        owners_dex_pc,
+                                        num_waiters)
+                << " in " << ArtMethod::PrettyMethod(m) << " for "
+                << PrettyDuration(MsToNs(wait_ms));
+          }
+          LogContentionEvent(self,
+                            wait_ms,
+                            sample_percent,
+                            owners_method,
+                            owners_dex_pc);
+        } else {
+          Locks::thread_list_lock_->ExclusiveUnlock(self);
+        }
+      }
+    }
+  }
+  // We've successfully acquired monitor_lock_, released thread_list_lock, and are runnable.
+
+  // We avoided touching monitor fields while suspended, so set owner_ here.
+  owner_.store(self, std::memory_order_relaxed);
+  DCHECK_EQ(lock_count_, 0u);
+
+  if (ATraceEnabled()) {
+    SetLockingMethodNoProxy(self);
+  }
+  if (started_trace) {
+    ATraceEnd();
+  }
+  self->SetMonitorEnterObject(nullptr);
+  num_waiters_.fetch_sub(1, std::memory_order_relaxed);
+  DCHECK(monitor_lock_.IsExclusiveHeld(self));
   // We need to pair this with a single contended locking call. NB we match the RI behavior and call
   // this even if MonitorEnter failed.
   if (called_monitors_callback) {
@@ -634,7 +669,6 @@
                            uint32_t expected_owner_thread_id,
                            uint32_t found_owner_thread_id,
                            Monitor* monitor) {
-  // Acquire thread list lock so threads won't disappear from under us.
   std::string current_owner_string;
   std::string expected_owner_string;
   std::string found_owner_string;
@@ -700,39 +734,44 @@
 
 bool Monitor::Unlock(Thread* self) {
   DCHECK(self != nullptr);
-  uint32_t owner_thread_id = 0u;
-  DCHECK(!monitor_lock_.IsExclusiveHeld(self));
-  monitor_lock_.Lock(self);
-  Thread* owner = owner_;
-  if (owner != nullptr) {
-    owner_thread_id = owner->GetThreadId();
-  }
+  Thread* owner = owner_.load(std::memory_order_relaxed);
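+  // If this load returns self, only we can change owner_, so the relaxed read is stable.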
   if (owner == self) {
     // We own the monitor, so nobody else can be in here.
+    CheckLockOwnerRequest(self);
     AtraceMonitorUnlock();
     if (lock_count_ == 0) {
-      owner_ = nullptr;
-      locking_method_ = nullptr;
-      locking_dex_pc_ = 0;
-      SignalContendersAndReleaseMonitorLock(self);
-      return true;
+      owner_.store(nullptr, std::memory_order_relaxed);
+      SignalWaiterAndReleaseMonitorLock(self);
     } else {
       --lock_count_;
-      monitor_lock_.Unlock(self);
-      return true;
+      DCHECK(monitor_lock_.IsExclusiveHeld(self));
+      DCHECK_EQ(owner_.load(std::memory_order_relaxed), self);
+      // Keep monitor_lock_, but pretend we released it.
+      FakeUnlockMonitorLock();
     }
+    return true;
   }
   // We don't own this, so we're not allowed to unlock it.
   // The JNI spec says that we should throw IllegalMonitorStateException in this case.
+  uint32_t owner_thread_id = 0u;
+  {
+    MutexLock mu(self, *Locks::thread_list_lock_);
+    owner = owner_.load(std::memory_order_relaxed);
+    if (owner != nullptr) {
+      owner_thread_id = owner->GetThreadId();
+    }
+  }
   FailedUnlock(GetObject(), self->GetThreadId(), owner_thread_id, this);
-  monitor_lock_.Unlock(self);
+  // Pretend to release monitor_lock_, which we never actually acquired; this appeases
+  // thread safety analysis.
+  FakeUnlockMonitorLock();
   return false;
 }
 
-void Monitor::SignalContendersAndReleaseMonitorLock(Thread* self) {
-  // We want to signal one thread to wake up, to acquire the monitor that
-  // we are releasing. This could either be a Thread waiting on its own
-  // ConditionVariable, or a thread waiting on monitor_contenders_.
+void Monitor::SignalWaiterAndReleaseMonitorLock(Thread* self) {
+  // We want to release the monitor and signal up to one thread that was waiting
+  // but has since been notified.
+  DCHECK_EQ(lock_count_, 0u);
+  DCHECK(monitor_lock_.IsExclusiveHeld(self));
   while (wake_set_ != nullptr) {
     // No risk of waking ourselves here; since monitor_lock_ is not released until we're ready to
     // return, notify can't move the current thread from wait_set_ to wake_set_ until this
@@ -740,6 +779,7 @@
     Thread* thread = wake_set_;
     wake_set_ = thread->GetWaitNext();
     thread->SetWaitNext(nullptr);
+    DCHECK(owner_.load(std::memory_order_relaxed) == nullptr);
 
     // Check to see if the thread is still waiting.
     {
@@ -764,16 +804,14 @@
         // Release the lock, so that a potentially awakened thread will not
         // immediately contend on it. The lock ordering here is:
         // monitor_lock_, self->GetWaitMutex, thread->GetWaitMutex
-        monitor_lock_.Unlock(self);
+        monitor_lock_.Unlock(self);  // Releases contenders.
         thread->GetWaitConditionVariable()->Signal(self);
         return;
       }
     }
   }
-  // If we didn't wake any threads that were originally waiting on us,
-  // wake a contender.
-  monitor_contenders_.Signal(self);
   monitor_lock_.Unlock(self);
+  DCHECK(!monitor_lock_.IsExclusiveHeld(self));
 }
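
The ordering in the signaling path above (release monitor_lock_ before signaling the waiter) is
the standard trick for avoiding a wakeup that immediately blocks on a mutex the signaler still
holds. A generic sketch with standard primitives rather than ART's Mutex/ConditionVariable:

    #include <condition_variable>
    #include <mutex>

    std::mutex mu;
    std::condition_variable cv;
    bool ready = false;

    void HandOff() {
      {
        std::lock_guard<std::mutex> lg(mu);
        ready = true;
      }                 // Drop the mutex first...
      cv.notify_one();  // ...so the woken thread can acquire it immediately.
    }
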
 
 void Monitor::Wait(Thread* self, int64_t ms, int32_t ns,
@@ -781,11 +819,8 @@
   DCHECK(self != nullptr);
   DCHECK(why == kTimedWaiting || why == kWaiting || why == kSleeping);
 
-  monitor_lock_.Lock(self);
-
   // Make sure that we hold the lock.
-  if (owner_ != self) {
-    monitor_lock_.Unlock(self);
+  if (owner_.load(std::memory_order_relaxed) != self) {
     ThrowIllegalMonitorStateExceptionF("object not locked by thread before wait()");
     return;
   }
@@ -798,23 +833,19 @@
 
   // Enforce the timeout range.
   if (ms < 0 || ns < 0 || ns > 999999) {
-    monitor_lock_.Unlock(self);
     self->ThrowNewExceptionF("Ljava/lang/IllegalArgumentException;",
                              "timeout arguments out of range: ms=%" PRId64 " ns=%d", ms, ns);
     return;
   }
 
+  CheckLockOwnerRequest(self);
+
   /*
    * Release our hold - we need to let it go even if we're a few levels
    * deep in a recursive lock, and we need to restore that later.
    */
-  int prev_lock_count = lock_count_;
+  unsigned int prev_lock_count = lock_count_;
   lock_count_ = 0;
-  owner_ = nullptr;
-  ArtMethod* saved_method = locking_method_;
-  locking_method_ = nullptr;
-  uintptr_t saved_dex_pc = locking_dex_pc_;
-  locking_dex_pc_ = 0;
 
   AtraceMonitorUnlock();  // For the implicit Unlock() just above. This will only end the deepest
                           // nesting, but that is enough for the visualization, and corresponds to
@@ -823,6 +854,9 @@
 
   bool was_interrupted = false;
   bool timed_out = false;
+  // Update monitor state now; it's not safe once we're "suspended".
+  owner_.store(nullptr, std::memory_order_relaxed);
+  num_waiters_.fetch_add(1, std::memory_order_relaxed);
   {
     // Update thread state. If the GC wakes up, it'll ignore us, knowing
     // that we won't touch any references in this state, and we'll check
@@ -840,8 +874,6 @@
      * until we've signalled contenders on this monitor.
      */
     AppendToWaitSet(self);
-    ++num_waiters_;
-
 
     // Set wait_monitor_ to the monitor object we will be waiting on. When wait_monitor_ is
     // non-null a notifying or interrupting thread must signal the thread's wait_cond_ to wake it
@@ -850,7 +882,8 @@
     self->SetWaitMonitor(this);
 
     // Release the monitor lock.
-    SignalContendersAndReleaseMonitorLock(self);
+    DCHECK(monitor_lock_.IsExclusiveHeld(self));
+    SignalWaiterAndReleaseMonitorLock(self);
 
     // Handle the case where the thread was interrupted before we called wait().
     if (self->IsInterrupted()) {
@@ -899,30 +932,18 @@
 
   // Re-acquire the monitor and lock.
   Lock<LockReason::kForWait>(self);
-  monitor_lock_.Lock(self);
+  lock_count_ = prev_lock_count;
+  DCHECK(monitor_lock_.IsExclusiveHeld(self));
   self->GetWaitMutex()->AssertNotHeld(self);
 
-  /*
-   * We remove our thread from wait set after restoring the count
-   * and owner fields so the subroutine can check that the calling
-   * thread owns the monitor. Aside from that, the order of member
-   * updates is not order sensitive as we hold the pthread mutex.
-   */
-  owner_ = self;
-  lock_count_ = prev_lock_count;
-  locking_method_ = saved_method;
-  locking_dex_pc_ = saved_dex_pc;
-  --num_waiters_;
+  num_waiters_.fetch_sub(1, std::memory_order_relaxed);
   RemoveFromWaitSet(self);
-
-  monitor_lock_.Unlock(self);
 }
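
Note the overall shape of Wait(): stash the recursion depth, drop to a single hold, let the wait
release and reacquire the mutex, then restore the depth. A compact sketch of that protocol,
assuming standard primitives in place of monitor_lock_ and lock_count_:

    #include <condition_variable>
    #include <mutex>

    std::mutex mu;                // Stands in for monitor_lock_.
    std::condition_variable cv;
    unsigned int lock_count = 0;  // Recursion depth beyond the first hold.

    void MonitorWait(std::unique_lock<std::mutex>& held) {
      unsigned int prev_lock_count = lock_count;  // Stash recursion depth.
      lock_count = 0;                             // Down to a single hold.
      cv.wait(held);                // Atomically: release, sleep, reacquire.
      lock_count = prev_lock_count;               // Restore after reacquiring.
    }
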
 
 void Monitor::Notify(Thread* self) {
   DCHECK(self != nullptr);
-  MutexLock mu(self, monitor_lock_);
   // Make sure that we hold the lock.
-  if (owner_ != self) {
+  if (owner_.load(std::memory_order_relaxed) != self) {
     ThrowIllegalMonitorStateExceptionF("object not locked by thread before notify()");
     return;
   }
@@ -937,9 +958,8 @@
 
 void Monitor::NotifyAll(Thread* self) {
   DCHECK(self != nullptr);
-  MutexLock mu(self, monitor_lock_);
   // Make sure that we hold the lock.
-  if (owner_ != self) {
+  if (owner_.load(std::memory_order_relaxed) != self) {
     ThrowIllegalMonitorStateExceptionF("object not locked by thread before notifyAll()");
     return;
   }
@@ -968,30 +988,18 @@
   if (lw.GetState() == LockWord::kFatLocked) {
     Monitor* monitor = lw.FatLockMonitor();
     DCHECK(monitor != nullptr);
-    MutexLock mu(self, monitor->monitor_lock_);
-    // Can't deflate if we have anybody waiting on the CV.
-    if (monitor->num_waiters_ > 0) {
+    // Can't deflate if we have anybody waiting on the CV or trying to acquire the monitor.
+    if (monitor->num_waiters_.load(std::memory_order_relaxed) > 0) {
       return false;
     }
-    Thread* owner = monitor->owner_;
-    if (owner != nullptr) {
-      // Can't deflate if we are locked and have a hash code.
-      if (monitor->HasHashCode()) {
-        return false;
-      }
-      // Can't deflate if our lock count is too high.
-      if (static_cast<uint32_t>(monitor->lock_count_) > LockWord::kThinLockMaxCount) {
-        return false;
-      }
-      // Deflate to a thin lock.
-      LockWord new_lw = LockWord::FromThinLockId(owner->GetThreadId(),
-                                                 monitor->lock_count_,
-                                                 lw.GCState());
-      // Assume no concurrent read barrier state changes as mutators are suspended.
-      obj->SetLockWord(new_lw, false);
-      VLOG(monitor) << "Deflated " << obj << " to thin lock " << owner->GetTid() << " / "
-          << monitor->lock_count_;
-    } else if (monitor->HasHashCode()) {
+    if (!monitor->monitor_lock_.ExclusiveTryLock(self)) {
+      // We cannot deflate a monitor that's currently held. It's unclear whether we should if
+      // we could.
+      return false;
+    }
+    DCHECK_EQ(monitor->lock_count_, 0u);
+    DCHECK_EQ(monitor->owner_.load(std::memory_order_relaxed), static_cast<Thread*>(nullptr));
+    if (monitor->HasHashCode()) {
       LockWord new_lw = LockWord::FromHashCode(monitor->GetHashCode(), lw.GCState());
       // Assume no concurrent read barrier state changes as mutators are suspended.
       obj->SetLockWord(new_lw, false);
@@ -1003,6 +1011,8 @@
       obj->SetLockWord(new_lw, false);
       VLOG(monitor) << "Deflated" << obj << " to empty lock word";
     }
+    monitor->monitor_lock_.ExclusiveUnlock(self);
+    DCHECK(!(monitor->monitor_lock_.IsExclusiveHeld(self)));
     // The monitor is deflated, mark the object as null so that we know to delete it during the
     // next GC.
     monitor->obj_ = GcRoot<mirror::Object>(nullptr);
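
Deflation is now guarded by a non-blocking acquisition: a positive num_waiters_ or a failed
try-lock means some thread holds or is waiting on the monitor, and we decline to deflate. The
precondition check, sketched with std::mutex in place of ART's Mutex:

    #include <atomic>
    #include <mutex>

    struct DemoMonitor {
      std::mutex lock;                     // Stands in for monitor_lock_.
      std::atomic<size_t> num_waiters{0};
    };

    // Returns true if deflation may proceed; on success the caller must
    // deflate and then call mon.lock.unlock().
    bool TryBeginDeflate(DemoMonitor& mon) {
      if (mon.num_waiters.load(std::memory_order_relaxed) > 0) {
        return false;  // Waiters or contenders still need the fat monitor.
      }
      return mon.lock.try_lock();  // Held by someone => cannot deflate.
    }
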
@@ -1088,6 +1098,10 @@
   size_t contention_count = 0;
   StackHandleScope<1> hs(self);
   Handle<mirror::Object> h_obj(hs.NewHandle(obj));
+#if !ART_USE_FUTEXES
+  // In this case we cannot inflate an unowned monitor, so we sometimes defer inflation.
+  bool should_inflate = false;
+#endif
   while (true) {
     // We initially read the lockword with ordinary Java/relaxed semantics. When stronger
     // semantics are needed, we address it below. Since GetLockWord bottoms out to a relaxed load,
@@ -1098,6 +1112,11 @@
         // No ordering required for preceding lockword read, since we retest.
         LockWord thin_locked(LockWord::FromThinLockId(thread_id, 0, lock_word.GCState()));
         if (h_obj->CasLockWord(lock_word, thin_locked, CASMode::kWeak, std::memory_order_acquire)) {
+#if !ART_USE_FUTEXES
+          if (should_inflate) {
+            InflateThinLocked(self, h_obj, lock_word, 0);
+          }
+#endif
           AtraceMonitorLock(self, h_obj.Get(), /* is_wait= */ false);
           return h_obj.Get();  // Success!
         }
@@ -1152,9 +1171,16 @@
             // of nanoseconds or less.
             sched_yield();
           } else {
+#if ART_USE_FUTEXES
             contention_count = 0;
             // No ordering required for initial lockword read. Install rereads it anyway.
             InflateThinLocked(self, h_obj, lock_word, 0);
+#else
+            // Can't inflate from non-owning thread. Keep waiting. Bad for power, but this code
+            // isn't used on-device.
+            should_inflate = true;
+            usleep(10);
+#endif
           }
         }
         continue;  // Start from the beginning.
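
Without futexes we cannot inflate a thin lock we do not own, so the loop above merely records the
intent and performs the inflation right after winning the thin lock. The shape of that retry
loop, with the thin-lock operations stubbed out as hypothetical helpers:

    #include <unistd.h>

    bool TryThinLock();  // Hypothetical: CAS ourselves into the thin lock word.
    void Inflate();      // Hypothetical: swap in a fat monitor; requires ownership.

    void LockWithDeferredInflation() {
      bool should_inflate = false;
      while (true) {
        if (TryThinLock()) {
          if (should_inflate) {
            Inflate();  // Safe now: this thread owns the lock.
          }
          return;
        }
        should_inflate = true;  // Can't inflate while another thread owns it.
        usleep(10);             // Brief back-off; this path is host-only.
      }
    }
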
@@ -1168,6 +1194,7 @@
           return mon->TryLock(self) ? h_obj.Get() : nullptr;
         } else {
           mon->Lock(self);
+          DCHECK(mon->monitor_lock_.IsExclusiveHeld(self));
           return h_obj.Get();  // Success!
         }
       }
@@ -1531,8 +1558,7 @@
 }
 
 bool Monitor::IsLocked() REQUIRES_SHARED(Locks::mutator_lock_) {
-  MutexLock mu(Thread::Current(), monitor_lock_);
-  return owner_ != nullptr;
+  return GetOwner() != nullptr;
 }
 
 void Monitor::TranslateLocation(ArtMethod* method,
@@ -1553,8 +1579,9 @@
 }
 
 uint32_t Monitor::GetOwnerThreadId() {
-  MutexLock mu(Thread::Current(), monitor_lock_);
-  Thread* owner = owner_;
+  // Make sure owner is not deallocated during access.
+  MutexLock mu(Thread::Current(), *Locks::thread_list_lock_);
+  Thread* owner = GetOwner();
   if (owner != nullptr) {
     return owner->GetThreadId();
   } else {
@@ -1682,14 +1709,16 @@
       break;
     case LockWord::kFatLocked: {
       Monitor* mon = lock_word.FatLockMonitor();
-      owner_ = mon->owner_;
+      owner_ = mon->owner_.load(std::memory_order_relaxed);
       // Here it is okay for the owner to be null since we don't reset the LockWord back to
       // kUnlocked until we get a GC. In cases where this hasn't happened yet we will have a fat
       // lock without an owner.
+      // Neither the monitor's owner_ nor its lock_count_ is modified by threads in "suspended"
+      // state, so we must see consistent values here.
       if (owner_ != nullptr) {
         entry_count_ = 1 + mon->lock_count_;
       } else {
-        DCHECK_EQ(mon->lock_count_, 0) << "Monitor is fat-locked without any owner!";
+        DCHECK_EQ(mon->lock_count_, 0u) << "Monitor is fat-locked without any owner!";
       }
       for (Thread* waiter = mon->wait_set_; waiter != nullptr; waiter = waiter->GetWaitNext()) {
         waiters_.push_back(waiter);
diff --git a/runtime/monitor.h b/runtime/monitor.h
index 4187f27..0714da1 100644
--- a/runtime/monitor.h
+++ b/runtime/monitor.h
@@ -21,6 +21,7 @@
 #include <stdint.h>
 #include <stdlib.h>
 
+#include <atomic>
 #include <iosfwd>
 #include <list>
 #include <vector>
@@ -129,12 +130,14 @@
 
   void SetObject(ObjPtr<mirror::Object> object);
 
-  Thread* GetOwner() const NO_THREAD_SAFETY_ANALYSIS {
-    return owner_;
+  // Provides no memory ordering guarantees.
+  Thread* GetOwner() const {
+    return owner_.load(std::memory_order_relaxed);
   }
 
   int32_t GetHashCode();
 
+  // Is the monitor currently locked? Debug only, provides no memory ordering guarantees.
   bool IsLocked() REQUIRES_SHARED(Locks::mutator_lock_) REQUIRES(!monitor_lock_);
 
   bool HasHashCode() const {
@@ -176,7 +179,7 @@
       REQUIRES_SHARED(Locks::mutator_lock_);
 
   // Install the monitor into its object, may fail if another thread installs a different monitor
-  // first.
+  // first. The monitor remains in the same logical state as before, i.e. it is held the same
+  // number of times.
   bool Install(Thread* self)
       REQUIRES(!monitor_lock_)
       REQUIRES_SHARED(Locks::mutator_lock_);
@@ -189,7 +192,10 @@
   // this routine.
   void RemoveFromWaitSet(Thread* thread) REQUIRES(monitor_lock_);
 
-  void SignalContendersAndReleaseMonitorLock(Thread* self) RELEASE(monitor_lock_);
+  // Release the monitor lock and signal a waiting thread that has been notified and now needs the
+  // lock. Assumes the monitor lock is held exactly once, and the owner_ field has been reset to
+  // null. Caller may be suspended (Wait) or runnable (MonitorExit).
+  void SignalWaiterAndReleaseMonitorLock(Thread* self) RELEASE(monitor_lock_);
 
   // Changes the shape of a monitor from thin to fat, preserving the internal lock state. The
   // calling thread must own the lock or the owner must be suspended. There's a race with other
@@ -210,37 +216,33 @@
                            uint32_t expected_owner_thread_id,
                            uint32_t found_owner_thread_id,
                            Monitor* mon)
-      REQUIRES(!Locks::thread_list_lock_,
-               !monitor_lock_)
+      REQUIRES(!Locks::thread_list_lock_)
       REQUIRES_SHARED(Locks::mutator_lock_);
 
   // Try to lock without blocking, returns true if we acquired the lock.
-  bool TryLock(Thread* self)
-      REQUIRES(!monitor_lock_)
-      REQUIRES_SHARED(Locks::mutator_lock_);
-  // Variant for already holding the monitor lock.
-  bool TryLockLocked(Thread* self)
-      REQUIRES(monitor_lock_)
+  // If spin is true, then we spin for a short period before failing.
+  bool TryLock(Thread* self, bool spin = false)
+      TRY_ACQUIRE(true, monitor_lock_)
       REQUIRES_SHARED(Locks::mutator_lock_);
 
   template<LockReason reason = LockReason::kForLock>
   void Lock(Thread* self)
-      REQUIRES(!monitor_lock_)
+      ACQUIRE(monitor_lock_)
       REQUIRES_SHARED(Locks::mutator_lock_);
 
   bool Unlock(Thread* thread)
-      REQUIRES(!monitor_lock_)
+      RELEASE(monitor_lock_)
       REQUIRES_SHARED(Locks::mutator_lock_);
 
   static void DoNotify(Thread* self, ObjPtr<mirror::Object> obj, bool notify_all)
       REQUIRES_SHARED(Locks::mutator_lock_) NO_THREAD_SAFETY_ANALYSIS;  // For mon->Notify.
 
   void Notify(Thread* self)
-      REQUIRES(!monitor_lock_)
+      REQUIRES(monitor_lock_)
       REQUIRES_SHARED(Locks::mutator_lock_);
 
   void NotifyAll(Thread* self)
-      REQUIRES(!monitor_lock_)
+      REQUIRES(monitor_lock_)
       REQUIRES_SHARED(Locks::mutator_lock_);
 
   static std::string PrettyContentionInfo(const std::string& owner_name,
@@ -270,7 +272,7 @@
   // Since we're allowed to wake up "early", we clamp extremely long durations to return at the end
   // of the 32-bit time epoch.
   void Wait(Thread* self, int64_t msec, int32_t nsec, bool interruptShouldThrow, ThreadState why)
-      REQUIRES(!monitor_lock_)
+      REQUIRES(monitor_lock_)
       REQUIRES_SHARED(Locks::mutator_lock_);
 
   // Translates the provided method and pc into its declaring class' source file and line number.
@@ -279,8 +281,18 @@
                                 int32_t* line_number)
       REQUIRES_SHARED(Locks::mutator_lock_);
 
+  // Provides no memory ordering guarantees.
   uint32_t GetOwnerThreadId() REQUIRES(!monitor_lock_);
 
+  // Set locking_method_ and locking_dex_pc_ corresponding to owner's current stack.
+  // owner is either self or suspended.
+  void SetLockingMethod(Thread* owner) REQUIRES(monitor_lock_)
+      REQUIRES_SHARED(Locks::mutator_lock_);
+
+  // The same, but without checking for a proxy method. Currently requires owner == self.
+  void SetLockingMethodNoProxy(Thread* owner) REQUIRES(monitor_lock_)
+      REQUIRES_SHARED(Locks::mutator_lock_);
+
   // Support for systrace output of monitor operations.
   ALWAYS_INLINE static void AtraceMonitorLock(Thread* self,
                                               ObjPtr<mirror::Object> obj,
@@ -294,19 +306,27 @@
 
   static uint32_t lock_profiling_threshold_;
   static uint32_t stack_dump_lock_profiling_threshold_;
+  static bool capture_method_eagerly_;
 
+  // Holding the monitor N times is represented by holding monitor_lock_ N times as far as the
+  // thread-safety analysis is concerned; the underlying mutex is physically held once, with
+  // lock_count_ tracking the remaining recursion (see FakeUnlockMonitorLock()).
   Mutex monitor_lock_ DEFAULT_MUTEX_ACQUIRED_AFTER;
 
-  ConditionVariable monitor_contenders_ GUARDED_BY(monitor_lock_);
+  // Pretend to unlock the monitor lock, keeping the thread-safety analysis consistent on paths
+  // that intentionally retain (or never acquired) monitor_lock_.
+  void FakeUnlockMonitorLock() RELEASE(monitor_lock_) NO_THREAD_SAFETY_ANALYSIS {}
 
-  // Number of people waiting on the condition.
-  size_t num_waiters_ GUARDED_BY(monitor_lock_);
+  // Number of threads either waiting on the condition or waiting on a contended
+  // monitor acquisition. Prevents deflation.
+  std::atomic<size_t> num_waiters_;
 
-  // Which thread currently owns the lock?
-  Thread* volatile owner_ GUARDED_BY(monitor_lock_);
+  // Which thread currently owns the lock? monitor_lock_ itself only records the owner's tid.
+  // Only set while holding monitor_lock_. Non-locking readers only use it to
+  // compare to self or for debugging.
+  std::atomic<Thread*> owner_;
 
-  // Owner's recursive lock depth.
-  int lock_count_ GUARDED_BY(monitor_lock_);
+  // Owner's recursive lock depth. owner_ non-null and lock_count_ == 0 ==> held once.
+  unsigned int lock_count_ GUARDED_BY(monitor_lock_);
 
   // What object are we part of. This is a weak root. Do not access
   // this directly, use GetObject() to read it so it will be guarded
@@ -322,11 +342,76 @@
   // Stored object hash code, generated lazily by GetHashCode.
   AtomicInteger hash_code_;
 
-  // Method and dex pc where the lock owner acquired the lock, used when lock
-  // sampling is enabled. locking_method_ may be null if the lock is currently
-  // unlocked, or if the lock is acquired by the system when the stack is empty.
-  ArtMethod* locking_method_ GUARDED_BY(monitor_lock_);
-  uint32_t locking_dex_pc_ GUARDED_BY(monitor_lock_);
+  // Data structure used to remember the method and dex pc of a recent holder of the
+  // lock. Used for tracing and contention reporting. Setting these is expensive, since it
+  // involves a partial stack walk. We set them only as follows, to minimize the cost:
+  // - If tracing is enabled, they are needed immediately when we first notice contention, so we
+  //   set them unconditionally when a monitor is acquired.
+  // - If contention reporting is enabled, we use the lock_owner_request_ field to have the
+  //   contending thread request them. The current owner then sets them when releasing the monitor,
+  //   making them available when the contending thread acquires the monitor.
+  // - If both are enabled, we blindly do both. This usually prevents us from switching between
+  //   reporting the end and beginning of critical sections for contention logging when tracing is
+  //   enabled.  We expect that tracing overhead is normally much higher than for contention
+  //   logging, so the added cost should be small. It also minimizes glitches when enabling and
+  //   disabling traces.
+  // We're tolerant of missing information. E.g. when tracing is initially turned on, we may
+  // not have the lock holder information if the holder acquired the lock with tracing off.
+  //
+  // We make this data unconditionally atomic; for contention logging all accesses are in fact
+  // protected by the monitor, but for tracing, reads are not. Writes are always
+  // protected by the monitor.
+  //
+  // The fields are always accessed without memory ordering. We store a checksum, and reread if
+  // the checksum doesn't correspond to the values.  This results in values that are correct with
+  // very high probability, but not certainty.
+  //
+  // If we need lock_owner information for a certain thread for contention logging, we store a
+  // pointer to that thread in lock_owner_request_. To satisfy the request, we store lock_owner_,
+  // lock_owner_method_, and lock_owner_dex_pc_ and the corresponding checksum while holding the
+  // monitor.
+  //
+  // At all times, either lock_owner_ is null, the checksum is valid, or a thread is actively
+  // in the process of establishing one of those states. Only one thread at a time can be actively
+  // establishing such a state, since writes are protected by the monitor.
+  std::atomic<Thread*> lock_owner_;  // *lock_owner_ may no longer exist!
+  std::atomic<ArtMethod*> lock_owner_method_;
+  std::atomic<uint32_t> lock_owner_dex_pc_;
+  std::atomic<uintptr_t> lock_owner_sum_;
+
+  // Request lock owner save method and dex_pc. Written asynchronously.
+  std::atomic<Thread*> lock_owner_request_;
+
+  // Compute method, dex pc, and tid "checksum".
+  uintptr_t LockOwnerInfoChecksum(ArtMethod* m, uint32_t dex_pc, Thread* t);
+
+  // Set owning method, dex pc, and tid. owner_ field is set and points to current thread.
+  void SetLockOwnerInfo(ArtMethod* method, uint32_t dex_pc, Thread* t)
+      REQUIRES(monitor_lock_);
+
+  // Get owning method and dex pc for the given thread, if available.
+  void GetLockOwnerInfo(/*out*/ArtMethod** method, /*out*/uint32_t* dex_pc, Thread* t);
+
+  // Do the same, while holding the monitor. There are no concurrent updates.
+  void GetLockOwnerInfoLocked(/*out*/ArtMethod** method, /*out*/uint32_t* dex_pc,
+                              uint32_t thread_id)
+      REQUIRES(monitor_lock_);
+
+  // We never clear lock_owner method and dex pc. Since it often reflects
+  // ownership when we last detected contention, it may be inconsistent with owner_
+  // and not 100% reliable. For lock contention monitoring, in the absence of tracing,
+  // there is a small risk that the current owner may finish before noticing the request,
+  // or the information will be overwritten by another intervening request and monitor
+  // release, so it's also not 100% reliable. But if we report information at all, it
+  // should generally (modulo accidental checksum matches) pertain to an acquisition of the
+  // right monitor by the right thread, so it's extremely unlikely to be seriously misleading.
+  // Since we track threads by a pointer to the Thread structure, there is a small chance we may
+  // confuse threads allocated at the same exact address, if a contending thread dies before
+  // we inquire about it.
+
+  // Check for and act on a pending lock_owner_request_.
+  void CheckLockOwnerRequest(Thread* self)
+      REQUIRES(monitor_lock_) REQUIRES_SHARED(Locks::mutator_lock_);
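
The checksum discipline described above is essentially an optimistic, seqlock-like snapshot: the
writer, serialized by the monitor, publishes the values plus a checksum, and racy readers re-read
until the checksum matches, accepting a tiny probability of an accidental match. A
self-contained sketch, with an illustrative mixing function (the real checksum is not specified
here):

    #include <atomic>
    #include <cstdint>

    struct OwnerInfo {
      std::atomic<uintptr_t> method{0};
      std::atomic<uintptr_t> dex_pc{0};
      std::atomic<uintptr_t> owner{0};
      std::atomic<uintptr_t> sum{0};
    };

    // Illustrative only; any mixing function with few accidental matches works.
    static uintptr_t Checksum(uintptr_t m, uintptr_t pc, uintptr_t t) {
      return m ^ (pc << 8) ^ (t >> 2);
    }

    // Writer: runs with the monitor held, so writes never race each other.
    void Publish(OwnerInfo& info, uintptr_t m, uintptr_t pc, uintptr_t t) {
      info.method.store(m, std::memory_order_relaxed);
      info.dex_pc.store(pc, std::memory_order_relaxed);
      info.owner.store(t, std::memory_order_relaxed);
      info.sum.store(Checksum(m, pc, t), std::memory_order_relaxed);
    }

    // Racy reader: retries until a (probably) consistent snapshot is seen.
    bool ReadConsistent(OwnerInfo& info,
                        uintptr_t* m, uintptr_t* pc, uintptr_t* t) {
      for (int i = 0; i < 100; ++i) {  // Bounded retries; give up on livelock.
        *m = info.method.load(std::memory_order_relaxed);
        *pc = info.dex_pc.load(std::memory_order_relaxed);
        *t = info.owner.load(std::memory_order_relaxed);
        if (info.sum.load(std::memory_order_relaxed) == Checksum(*m, *pc, *t)) {
          return true;
        }
      }
      return false;
    }
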
 
   // The denser encoded version of this monitor as stored in the lock word.
   MonitorId monitor_id_;
diff --git a/runtime/runtime.cc b/runtime/runtime.cc
index a500d9f..fc589e1 100644
--- a/runtime/runtime.cc
+++ b/runtime/runtime.cc
@@ -416,7 +416,7 @@
   // Shutdown any trace running.
   Trace::Shutdown();
 
-  // Report death. Clients me require a working thread, still, so do it before GC completes and
+  // Report death. Clients may require a working thread, still, so do it before GC completes and
   // all non-daemon threads are done.
   {
     ScopedObjectAccess soa(self);
@@ -439,9 +439,14 @@
 
   // Make sure our internal threads are dead before we start tearing down things they're using.
   GetRuntimeCallbacks()->StopDebugger();
+  // Deletion ordering is tricky. Null out everything we've deleted.
   delete signal_catcher_;
+  signal_catcher_ = nullptr;
 
   // Make sure all other non-daemon threads have terminated, and all daemon threads are suspended.
+  // Also wait for daemon threads to quiesce, so that in addition to being "suspended", they
+  // no longer access monitor and thread list data structures. We leak user daemon threads
+  // themselves, since we have no mechanism for shutting them down.
   {
     ScopedTrace trace2("Delete thread list");
     thread_list_->ShutDown();
@@ -458,7 +463,10 @@
   }
 
   // Finally delete the thread list.
+  // The thread list can still be accessed by "suspended" threads, e.g. in InflateThinLocked.
+  // We assume that by this point, we've waited long enough for things to quiesce.
   delete thread_list_;
+  thread_list_ = nullptr;
 
   // Delete the JIT after thread list to ensure that there is no remaining threads which could be
   // accessing the instrumentation when we delete it.
@@ -473,11 +481,17 @@
 
   ScopedTrace trace2("Delete state");
   delete monitor_list_;
+  monitor_list_ = nullptr;
   delete monitor_pool_;
+  monitor_pool_ = nullptr;
   delete class_linker_;
+  class_linker_ = nullptr;
   delete heap_;
+  heap_ = nullptr;
   delete intern_table_;
+  intern_table_ = nullptr;
   delete oat_file_manager_;
+  oat_file_manager_ = nullptr;
   Thread::Shutdown();
   QuasiAtomic::Shutdown();
   verifier::ClassVerifier::Shutdown();
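
The delete-then-null pattern used throughout this destructor is cheap insurance: a daemon thread
that races with shutdown then faults on (or can test for) a null pointer instead of reading freed
memory. As a sketch of the discipline (ART writes the two statements out inline):

    // Illustrative helper, not ART API.
    template <typename T>
    void DeleteAndNull(T*& field) {
      delete field;
      field = nullptr;  // Stragglers now see null, not a dangling pointer.
    }
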
diff --git a/runtime/thread_list.cc b/runtime/thread_list.cc
index d7b12de..89adda8 100644
--- a/runtime/thread_list.cc
+++ b/runtime/thread_list.cc
@@ -1351,15 +1351,17 @@
       thread->GetJniEnv()->SetFunctionsToRuntimeShutdownFunctions();
     }
   }
-  // If we have any daemons left, wait 200ms to ensure they are not stuck in a place where they
-  // are about to access runtime state and are not in a runnable state. Examples: Monitor code
-  // or waking up from a condition variable. TODO: Try and see if there is a better way to wait
-  // for daemon threads to be in a blocked state.
-  if (daemons_left > 0) {
-    static constexpr size_t kDaemonSleepTime = 200 * 1000;
-    usleep(kDaemonSleepTime);
+  if (daemons_left == 0) {
+    // No daemon threads left; safe to proceed with shutdown.
+    return;
   }
-  // Give the threads a chance to suspend, complaining if they're slow.
+  // If we have any daemons left, wait until they are (a) suspended and (b) no longer stuck in a
+  // place where they are about to access runtime state while not in a runnable state. We attempt
+  // the latter by simply waiting long enough for things to quiesce. Examples: monitor code, or
+  // waking up from a condition variable.
+  // TODO: See if there is a better way to wait for daemon threads to reach a blocked state.
+  // First give the threads a chance to suspend, complaining if they're slow. (a)
   bool have_complained = false;
   static constexpr size_t kTimeoutMicroseconds = 2000 * 1000;
   static constexpr size_t kSleepMicroseconds = 1000;
@@ -1378,6 +1380,9 @@
       }
     }
     if (all_suspended) {
+      // One final wait for all the now "suspended" threads to actually quiesce. (b)
+      static constexpr size_t kDaemonSleepTime = 200 * 1000;
+      usleep(kDaemonSleepTime);
       return;
     }
     usleep(kSleepMicroseconds);
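
The shutdown wait is thus two-phase: poll until every daemon reports suspended, then sleep one
final grace period so threads that were mid-wakeup stop touching runtime structures. The control
flow, schematically (the predicate and constants here are illustrative):

    #include <unistd.h>

    bool AllDaemonsSuspended();  // Hypothetical predicate over the thread list.

    void WaitForDaemonQuiescence() {
      static constexpr size_t kTimeoutMicroseconds = 2000 * 1000;
      static constexpr size_t kSleepMicroseconds = 1000;
      static constexpr size_t kGraceMicroseconds = 200 * 1000;
      for (size_t elapsed = 0; elapsed < kTimeoutMicroseconds;
           elapsed += kSleepMicroseconds) {
        if (AllDaemonsSuspended()) {   // Phase (a): suspension.
          usleep(kGraceMicroseconds);  // Phase (b): quiescence grace period.
          return;
        }
        usleep(kSleepMicroseconds);
      }
      // Timed out: proceed anyway, complaining elsewhere.
    }
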
diff --git a/runtime/thread_list.h b/runtime/thread_list.h
index 4ea0995..7a3c26b 100644
--- a/runtime/thread_list.h
+++ b/runtime/thread_list.h
@@ -104,6 +104,10 @@
   // Find an existing thread (or self) by its thread id (not tid).
   Thread* FindThreadByThreadId(uint32_t thread_id) REQUIRES(Locks::thread_list_lock_);
 
+  // Does the thread list still contain the given thread, or one at the same address?
+  // Used by Monitor to provide (mostly accurate) debugging information.
+  bool Contains(Thread* thread) REQUIRES(Locks::thread_list_lock_);
+
   // Run a checkpoint on threads, running threads are not suspended but run the checkpoint inside
   // of the suspend check. Returns how many checkpoints that are expected to run, including for
   // already suspended threads for b/24191051. Run the callback, if non-null, inside the
@@ -197,7 +201,6 @@
   uint32_t AllocThreadId(Thread* self);
   void ReleaseThreadId(Thread* self, uint32_t id) REQUIRES(!Locks::allocated_thread_ids_lock_);
 
-  bool Contains(Thread* thread) REQUIRES(Locks::thread_list_lock_);
   bool Contains(pid_t tid) REQUIRES(Locks::thread_list_lock_);
   size_t RunCheckpoint(Closure* checkpoint_function, bool includeSuspended)
       REQUIRES(!Locks::thread_list_lock_, !Locks::thread_suspend_count_lock_);
diff --git a/test/2029-contended-monitors/expected.txt b/test/2029-contended-monitors/expected.txt
new file mode 100644
index 0000000..1894ad7
--- /dev/null
+++ b/test/2029-contended-monitors/expected.txt
@@ -0,0 +1,10 @@
+Starting
+Atomic increments
+Hold time 2, shared lock
+Hold time 20, shared lock
+Hold time 200, shared lock
+Hold time 2000, shared lock
+Hold time 20000, shared lock
+Hold time 200000, shared lock
+Hold for 2 msecs while sleeping, shared lock
+Hold for 2 msecs while sleeping, private lock
diff --git a/test/2029-contended-monitors/info.txt b/test/2029-contended-monitors/info.txt
new file mode 100644
index 0000000..f6ccdd3
--- /dev/null
+++ b/test/2029-contended-monitors/info.txt
@@ -0,0 +1,4 @@
+Checks that monitor-protected increments at various granularities are indeed
+atomic. Also checks j.u.c. increments. Can be configured to print execution
+times for contended and uncontended monitor acquisition under different
+circumstances.
diff --git a/test/2029-contended-monitors/src/Main.java b/test/2029-contended-monitors/src/Main.java
new file mode 100644
index 0000000..78d2ae4
--- /dev/null
+++ b/test/2029-contended-monitors/src/Main.java
@@ -0,0 +1,195 @@
+/*
+ * Copyright (C) 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class Main {
+
+  private final boolean PRINT_TIMES = false;  // False for use as run test.
+
+  // Total number of increments performed across all threads. Must be a multiple of the largest
+  // hold time below, times any possible thread count. Finishes much faster when used as a run
+  // test.
+  private final int TOTAL_ITERS = PRINT_TIMES ? 16_000_000 : 1_600_000;
+  private final int MAX_HOLD_TIME = PRINT_TIMES ? 2_000_000 : 200_000;
+
+  private int counter;
+
+  private AtomicInteger atomicCounter = new AtomicInteger();
+
+  private Object lock;
+
+  private int currentThreadCount = 0;
+
+  // A function such that if we repeatedly apply it to -1, the value oscillates
+  // between -1 and 3. Thus the average value is 1.
+  // This is designed to make it hard for the compiler to predict the values in
+  // the sequence.
+  private int nextInt(int x) {
+    if (x < 0) {
+      return x * x + 2;
+    } else {
+      return x - 4;
+    }
+  }
+
+  // Increment counter by n, holding the lock for a time roughly proportional to n.
+  // n must be even.
+  private void holdFor(Object lock, int n) {
+    synchronized(lock) {
+      int y = -1;
+      for (int i = 0; i < n; ++i) {
+        counter += y;
+        y = nextInt(y);
+      }
+    }
+  }
+
+  private class RepeatedLockHolder implements Runnable {
+    RepeatedLockHolder(boolean shared, int n /* even */) {
+      sharedLock = shared;
+      holdTime = n;
+    }
+    @Override
+    public void run() {
+      Object myLock = sharedLock ? lock : new Object();
+      int nIters = TOTAL_ITERS / currentThreadCount / holdTime;
+      for (int i = 0; i < nIters; ++i) {
+        holdFor(myLock, holdTime);
+      }
+    }
+    private boolean sharedLock;
+    private int holdTime;
+  }
+
+  private class SleepyLockHolder implements Runnable {
+    SleepyLockHolder(boolean shared) {
+      sharedLock = shared;
+    }
+    @Override
+    public void run() {
+      Object myLock = sharedLock ? lock : new Object();
+      int nIters = TOTAL_ITERS / currentThreadCount / 10_000;
+      for (int i = 0; i < nIters; ++i) {
+        synchronized(myLock) {
+          try {
+            Thread.sleep(2);
+          } catch(InterruptedException e) {
+            throw new AssertionError("Unexpected interrupt");
+          }
+          counter += 10_000;
+        }
+      }
+    }
+    private boolean sharedLock;
+  }
+
+  // Increment atomicCounter n times, on average by 1 each time.
+  private class RepeatedIncrementer implements Runnable {
+    @Override
+    public void run() {
+      int y = -1;
+      int nIters = TOTAL_ITERS / currentThreadCount;
+      for (int i = 0; i < nIters; ++i) {
+        atomicCounter.addAndGet(y);
+        y = nextInt(y);
+      }
+    }
+  }
+
+  // Run n threads doing work. Return the elapsed time this took, in milliseconds.
+  private long runMultiple(int n, Runnable work) {
+    Thread[] threads = new Thread[n];
+    // Replace lock, so that we start with a clean, uninflated lock each time.
+    lock = new Object();
+    for (int i = 0; i < n; ++i) {
+      threads[i] = new Thread(work);
+    }
+    long startTime = System.currentTimeMillis();
+    for (int i = 0; i < n; ++i) {
+      threads[i].start();
+    }
+    for (int i = 0; i < n; ++i) {
+      try {
+        threads[i].join();
+      } catch(InterruptedException e) {
+        throw new AssertionError("Unexpected interrupt");
+      }
+    }
+    return System.currentTimeMillis() - startTime;
+  }
+
+  // Run on different numbers of threads.
+  private void runAll(Runnable work, Runnable init, Runnable checker) {
+    for (int i = 1; i <= 8; i *= 2) {
+      currentThreadCount = i;
+      init.run();
+      long time = runMultiple(i, work);
+      if (PRINT_TIMES) {
+        System.out.print(time + (i == 8 ? "\n" : "\t"));
+      }
+      checker.run();
+    }
+  }
+
+  private class CheckAtomicCounter implements Runnable {
+    @Override
+    public void run() {
+      if (atomicCounter.get() != TOTAL_ITERS) {
+        throw new AssertionError("Failed atomicCounter postcondition check for "
+            + currentThreadCount + " threads");
+      }
+    }
+  }
+
+  private class CheckCounter implements Runnable {
+    @Override
+    public void run() {
+      if (counter != TOTAL_ITERS) {
+        throw new AssertionError("Failed counter postcondition check for "
+            + currentThreadCount + " threads");
+      }
+    }
+  }
+
+  private void run() {
+    if (PRINT_TIMES) {
+      System.out.println("All times in milliseconds for 1, 2, 4 and 8 threads");
+    }
+    System.out.println("Atomic increments");
+    runAll(new RepeatedIncrementer(), () -> { atomicCounter.set(0); }, new CheckAtomicCounter());
+    for (int i = 2; i <= MAX_HOLD_TIME; i *= 10) {
+      // i * 8 (max thread count) divides TOTAL_ITERS
+      System.out.println("Hold time " + i + ", shared lock");
+      runAll(new RepeatedLockHolder(true, i), () -> { counter = 0; }, new CheckCounter());
+    }
+    if (PRINT_TIMES) {
+      for (int i = 2; i <= MAX_HOLD_TIME; i *= 1000) {
+        // i divides TOTAL_ITERS
+        System.out.println("Hold time " + i + ", private lock");
+        // Since there is no cross-thread mutual exclusion, the final counter value is unpredictable.
+        runAll(new RepeatedLockHolder(false, i), () -> { counter = 0; }, () -> {});
+      }
+    }
+    System.out.println("Hold for 2 msecs while sleeping, shared lock");
+    runAll(new SleepyLockHolder(true), () -> { counter = 0; }, new CheckCounter());
+    System.out.println("Hold for 2 msecs while sleeping, private lock");
+    runAll(new SleepyLockHolder(false), () -> { counter = 0; }, () -> {});
+  }
+
+  public static void main(String[] args) {
+    System.out.println("Starting");
+    new Main().run();
+  }
+}
diff --git a/test/knownfailures.json b/test/knownfailures.json
index 02705bc..559bee7 100644
--- a/test/knownfailures.json
+++ b/test/knownfailures.json
@@ -1289,6 +1289,12 @@
         "tests": ["909-attach-agent", "126-miranda-multidex"],
         "zipapex": true,
         "bug": "b/135507613",
-        "description": ["These tests run dalvikvm multiple times, this can messup the zipapex runner"]
+        "description": ["These tests run dalvikvm multiple times, this can mess up the",
+                        "zipapex runner."]
+    },
+    {
+        "tests": ["2029-contended-monitors"],
+        "variant": "interpreter | interp-ac | gcstress | trace",
+        "description": ["Slow test. Prone to timeouts."]
     }
 ]