Revert^4 "Add spin loop to mutex, overhaul monitor"

This reverts commit ead89ef62768faa06fedd279b86bccbed1464673.

PS1 is identical to Revert^2.

This fixes a shutdown issue in the earlier version of this change:
threads could hang prematurely when blocking to acquire system mutexes
during shutdown.

We now refuse to wake up only after shutdown is well under way.

Bug: 140590186
Bug: 121302864
Test: Build and boot AOSP.
Test: art/test/testrunner/testrunner.py --host -b -t 1932-monitor-events-misc
Test: art/test/testrunner/testrunner.py --host -b -t 004-ThreadStress
Test: art/test/testrunner/testrunner.py --host -b -t 132-daemon-locks-shutdown
Test: 132-daemon-locks-shutdown repeated with increased thread counts and multiple
concurrent tests.

Change-Id: Ic19d32652a2a05c1ca843b3e9c6e29e6770262da
diff --git a/libartbase/base/utils.cc b/libartbase/base/utils.cc
index 7d4dd59..19311b3 100644
--- a/libartbase/base/utils.cc
+++ b/libartbase/base/utils.cc
@@ -309,7 +309,7 @@
 
 void SleepForever() {
   while (true) {
-    usleep(1000000);
+    sleep(100000000);
   }
 }
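For context on the new constant: each pass through the loop now sleeps 100,000,000 seconds (about 3.2 years) instead of one second, so a parked daemon thread effectively never wakes; the while (true) only re-arms the sleep after early returns such as EINTR. A minimal standalone sketch of the idiom (ParkForever is an illustrative name, not part of the change):

    #include <unistd.h>

    // sleep(3) may return early on a signal; the loop simply re-arms the
    // multi-year sleep, so the thread never resumes real work.
    [[noreturn]] static void ParkForever() {
      while (true) {
        sleep(100000000);  // ~3.2 years per call.
      }
    }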
 
diff --git a/runtime/base/mutex.cc b/runtime/base/mutex.cc
index 0d5ce15..0d1b162 100644
--- a/runtime/base/mutex.cc
+++ b/runtime/base/mutex.cc
@@ -61,6 +61,28 @@
 }
 #endif
 
+#if ART_USE_FUTEXES
+// If we wake up from a futex wait and the runtime disappeared while we were asleep,
+// it's important to stop in our tracks before we touch deallocated memory.
+static inline void SleepIfRuntimeDeleted(Thread* self) {
+  if (self != nullptr) {
+    JNIEnvExt* const env = self->GetJniEnv();
+    if (UNLIKELY(env != nullptr && env->IsRuntimeDeleted())) {
+      DCHECK(self->IsDaemon());
+      // If the runtime has been deleted, then we cannot proceed. Just sleep forever. This may
+      // occur for user daemon threads that get a spurious wakeup. This occurs for test 132 with
+      // --host and --gdb.
+      // After we wake up, the runtime may have been shutdown, which means that this condition may
+      // have been deleted. It is not safe to retry the wait.
+      SleepForever();
+    }
+  }
+}
+#else
+// We should be doing this for pthreads too, but it seems to be impossible for something
+// like a condition variable wait. Thus we don't bother trying.
+#endif
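Every futex wake-up path changed below follows the same shape: wait, tolerate spurious wake-ups, then check whether the runtime vanished before touching anything else. A condensed sketch of that pattern, using the futex wrapper and error handling seen throughout this file (addr and expected stand in for the real fields):

    // Condensed wake-up pattern applied at each blocking site below.
    if (futex(addr, FUTEX_WAIT_PRIVATE, expected, nullptr, nullptr, 0) != 0) {
      if (errno != EAGAIN && errno != EINTR) {  // Anything else is a real error.
        PLOG(FATAL) << "futex wait failed";
      }
    }
    SleepIfRuntimeDeleted(self);  // Park forever rather than touch freed state.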
+
 // Wait for an amount of time that roughly increases in the argument i.
 // Spin for small arguments and yield/sleep for longer ones.
 static void BackOff(uint32_t i) {
@@ -82,6 +104,34 @@
   }
 }
 
+// Wait until pred(testLoc->load(std::memory_order_relaxed)) holds, or until a
+// short time interval, on the order of kernel context-switch time, passes.
+// Return true if the predicate test succeeded, false if we timed out.
+template<typename Pred>
+static inline bool WaitBrieflyFor(AtomicInteger* testLoc, Thread* self, Pred pred) {
+  // TODO: Tune these parameters correctly. BackOff(3) should take on the order of 100 cycles. So
+  // this should result in retrying <= 50 times, usually waiting around 100 cycles each. The
+  // maximum delay should be significantly less than the expected futex() context switch time, so
+  // there should be little danger of this worsening things appreciably. If the lock was only
+  // held briefly by a running thread, this should help immensely.
+  static constexpr uint32_t kMaxBackOff = 3;  // Should probably be <= kSpinMax above.
+  static constexpr uint32_t kMaxIters = 50;
+  JNIEnvExt* const env = self == nullptr ? nullptr : self->GetJniEnv();
+  for (uint32_t i = 1; i <= kMaxIters; ++i) {
+    BackOff(std::min(i, kMaxBackOff));
+    if (pred(testLoc->load(std::memory_order_relaxed))) {
+      return true;
+    }
+    if (UNLIKELY(env != nullptr && env->IsRuntimeDeleted())) {
+      // This returns true once we've started shutting down. We then try to reach a quiescent
+      // state as soon as possible to avoid touching data that may be deallocated by the shutdown
+      // process. It currently relies on a timeout.
+      return false;
+    }
+  }
+  return false;
+}
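Rough arithmetic behind the TODO above, taking its cycle estimates at face value: at most 50 iterations of roughly 100 cycles each bounds the spin at about 5,000 cycles, well below a typical futex sleep/wake round trip, so a failed spin costs only a small, bounded amount:

    // Sanity arithmetic for the spin budget; the cycle figure is the TODO's
    // own rough estimate, not a measured value.
    constexpr uint32_t kApproxCyclesPerBackOff = 100;
    constexpr uint32_t kApproxMaxSpinCycles = 50 * kApproxCyclesPerBackOff;
    static_assert(kApproxMaxSpinCycles == 5000, "worst case on the order of 5k cycles");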
+
 class ScopedAllMutexesLock final {
  public:
   explicit ScopedAllMutexesLock(const BaseMutex* mutex) : mutex_(mutex) {
@@ -381,24 +431,35 @@
       } else {
         // Failed to acquire, hang up.
         ScopedContentionRecorder scr(this, SafeGetTid(self), GetExclusiveOwnerTid());
-        // Increment contender count. We can't create enough threads for this to overflow.
-        increment_contenders();
-        // Make cur_state again reflect the expected value of state_and_contenders.
-        cur_state += kContenderIncrement;
-        if (UNLIKELY(should_respond_to_empty_checkpoint_request_)) {
-          self->CheckEmptyCheckpointFromMutex();
-        }
-        if (futex(state_and_contenders_.Address(), FUTEX_WAIT_PRIVATE, cur_state,
-                  nullptr, nullptr, 0) != 0) {
-          // We only went to sleep after incrementing and contenders and checking that the lock
-          // is still held by someone else.
-          // EAGAIN and EINTR both indicate a spurious failure, try again from the beginning.
-          // We don't use TEMP_FAILURE_RETRY so we can intentionally retry to acquire the lock.
-          if ((errno != EAGAIN) && (errno != EINTR)) {
-            PLOG(FATAL) << "futex wait failed for " << name_;
+        // Empirically, it appears important to spin again each time through the loop; if we
+        // bother to go to sleep and wake up, we should be fairly persistent in trying for the
+        // lock.
+        if (!WaitBrieflyFor(&state_and_contenders_, self,
+                            [](int32_t v) { return (v & kHeldMask) == 0; })) {
+          // Increment contender count. We can't create enough threads for this to overflow.
+          increment_contenders();
+          // Make cur_state again reflect the expected value of state_and_contenders.
+          cur_state += kContenderIncrement;
+          if (UNLIKELY(should_respond_to_empty_checkpoint_request_)) {
+            self->CheckEmptyCheckpointFromMutex();
           }
+          do {
+            if (futex(state_and_contenders_.Address(), FUTEX_WAIT_PRIVATE, cur_state,
+                      nullptr, nullptr, 0) != 0) {
+              // We only went to sleep after incrementing contenders and checking that the
+              // lock is still held by someone else.  EAGAIN and EINTR both indicate a spurious
+              // failure, try again from the beginning.  We don't use TEMP_FAILURE_RETRY so we can
+              // intentionally retry to acquire the lock.
+              if ((errno != EAGAIN) && (errno != EINTR)) {
+                PLOG(FATAL) << "futex wait failed for " << name_;
+              }
+            }
+            SleepIfRuntimeDeleted(self);
+            // Retry until not held. In heavy contention situations we otherwise get redundant
+            // futex wakeups as a result of repeatedly decrementing and incrementing contenders.
+          } while ((state_and_contenders_.load(std::memory_order_relaxed) & kHeldMask) != 0);
+          decrement_contenders();
         }
-        decrement_contenders();
       }
     } while (!done);
     // Confirm that lock is now held.
@@ -406,7 +467,8 @@
 #else
     CHECK_MUTEX_CALL(pthread_mutex_lock, (&mutex_));
 #endif
-    DCHECK_EQ(GetExclusiveOwnerTid(), 0);
+    DCHECK_EQ(GetExclusiveOwnerTid(), 0) << " my tid = " << SafeGetTid(self)
+                                         << " recursive_ = " << recursive_;
     exclusive_owner_.store(SafeGetTid(self), std::memory_order_relaxed);
     RegisterAsLocked(self);
   }
@@ -459,6 +521,48 @@
   return true;
 }
 
+bool Mutex::ExclusiveTryLockWithSpinning(Thread* self) {
+  // Spin a small number of times, since this affects our ability to respond to suspension
+  // requests. We spin repeatedly only if the mutex repeatedly becomes available and unavailable
+  // in rapid succession, and then we will typically not spin for the maximal period.
+  const int kMaxSpins = 5;
+  for (int i = 0; i < kMaxSpins; ++i) {
+    if (ExclusiveTryLock(self)) {
+      return true;
+    }
+#if ART_USE_FUTEXES
+    if (!WaitBrieflyFor(&state_and_contenders_, self,
+            [](int32_t v) { return (v & kHeldMask) == 0; })) {
+      return false;
+    }
+#endif
+  }
+  return ExclusiveTryLock(self);
+}
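A hedged usage sketch of the new entry point (mu names an arbitrary Mutex and the caller shape is illustrative; Monitor::TryLock below uses it in exactly this way when asked to spin):

    // Prefer a brief spin before giving up on the fast path.
    if (mu.ExclusiveTryLockWithSpinning(self)) {
      // ... short critical section ...
      mu.ExclusiveUnlock(self);
    } else {
      // Still unavailable after spinning: take a slow path or bail out.
    }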
+
+#if ART_USE_FUTEXES
+void Mutex::ExclusiveLockUncontendedFor(Thread* new_owner) {
+  DCHECK_EQ(level_, kMonitorLock);
+  DCHECK(!recursive_);
+  state_and_contenders_.store(kHeldMask, std::memory_order_relaxed);
+  recursion_count_ = 1;
+  exclusive_owner_.store(SafeGetTid(new_owner), std::memory_order_relaxed);
+  // Don't call RegisterAsLocked(). It wouldn't register anything anyway.  And
+  // this happens as we're inflating a monitor, which doesn't logically affect
+  // held "locks"; it effectively just converts a thin lock to a mutex.  By doing
+  // this while the lock is already held, we're delaying the acquisition of a
+  // logically held mutex, which can introduce bogus lock order violations.
+}
+
+void Mutex::ExclusiveUnlockUncontended() {
+  DCHECK_EQ(level_, kMonitorLock);
+  state_and_contenders_.store(0, std::memory_order_relaxed);
+  recursion_count_ = 0;
+  exclusive_owner_.store(0 /* pid */, std::memory_order_relaxed);
+  // Skip RegisterAsUnlocked(), which wouldn't do anything anyway.
+}
+#endif  // ART_USE_FUTEXES
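These two uncontended entry points exist for monitor inflation; a sketch of the handoff they enable, where owner is the (current or suspended) thread receiving the lock and PublishFatLockWord is a hypothetical stand-in for the CasLockWord publication performed by Monitor::Install below:

    // Hand the freshly created monitor_lock_ to `owner` without contention,
    // then roll back cheaply if publishing the fat lock word loses a race.
    monitor_lock_.ExclusiveLockUncontendedFor(owner);
    if (!PublishFatLockWord()) {                   // Hypothetical helper.
      monitor_lock_.ExclusiveUnlockUncontended();  // Still uncontended; undo.
    }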
+
 void Mutex::ExclusiveUnlock(Thread* self) {
   if (kIsDebugBuild && self != nullptr && self != Thread::Current()) {
     std::string name1 = "<null>";
@@ -589,18 +693,21 @@
     } else {
       // Failed to acquire, hang up.
       ScopedContentionRecorder scr(this, SafeGetTid(self), GetExclusiveOwnerTid());
-      num_contenders_.fetch_add(1);
-      if (UNLIKELY(should_respond_to_empty_checkpoint_request_)) {
-        self->CheckEmptyCheckpointFromMutex();
-      }
-      if (futex(state_.Address(), FUTEX_WAIT_PRIVATE, cur_state, nullptr, nullptr, 0) != 0) {
-        // EAGAIN and EINTR both indicate a spurious failure, try again from the beginning.
-        // We don't use TEMP_FAILURE_RETRY so we can intentionally retry to acquire the lock.
-        if ((errno != EAGAIN) && (errno != EINTR)) {
-          PLOG(FATAL) << "futex wait failed for " << name_;
+      if (!WaitBrieflyFor(&state_, self, [](int32_t v) { return v == 0; })) {
+        num_contenders_.fetch_add(1);
+        if (UNLIKELY(should_respond_to_empty_checkpoint_request_)) {
+          self->CheckEmptyCheckpointFromMutex();
         }
+        if (futex(state_.Address(), FUTEX_WAIT_PRIVATE, cur_state, nullptr, nullptr, 0) != 0) {
+          // EAGAIN and EINTR both indicate a spurious failure, try again from the beginning.
+          // We don't use TEMP_FAILURE_RETRY so we can intentionally retry to acquire the lock.
+          if ((errno != EAGAIN) && (errno != EINTR)) {
+            PLOG(FATAL) << "futex wait failed for " << name_;
+          }
+        }
+        SleepIfRuntimeDeleted(self);
+        num_contenders_.fetch_sub(1);
       }
-      num_contenders_.fetch_sub(1);
     }
   } while (!done);
   DCHECK_EQ(state_.load(std::memory_order_relaxed), -1);
@@ -665,22 +772,25 @@
         return false;  // Timed out.
       }
       ScopedContentionRecorder scr(this, SafeGetTid(self), GetExclusiveOwnerTid());
-      num_contenders_.fetch_add(1);
-      if (UNLIKELY(should_respond_to_empty_checkpoint_request_)) {
-        self->CheckEmptyCheckpointFromMutex();
-      }
-      if (futex(state_.Address(), FUTEX_WAIT_PRIVATE, cur_state, &rel_ts, nullptr, 0) != 0) {
-        if (errno == ETIMEDOUT) {
-          num_contenders_.fetch_sub(1);
-          return false;  // Timed out.
-        } else if ((errno != EAGAIN) && (errno != EINTR)) {
-          // EAGAIN and EINTR both indicate a spurious failure,
-          // recompute the relative time out from now and try again.
-          // We don't use TEMP_FAILURE_RETRY so we can recompute rel_ts;
-          PLOG(FATAL) << "timed futex wait failed for " << name_;
+      if (!WaitBrieflyFor(&state_, self, [](int32_t v) { return v == 0; })) {
+        num_contenders_.fetch_add(1);
+        if (UNLIKELY(should_respond_to_empty_checkpoint_request_)) {
+          self->CheckEmptyCheckpointFromMutex();
         }
+        if (futex(state_.Address(), FUTEX_WAIT_PRIVATE, cur_state, &rel_ts, nullptr, 0) != 0) {
+          if (errno == ETIMEDOUT) {
+            num_contenders_.fetch_sub(1);
+            return false;  // Timed out.
+          } else if ((errno != EAGAIN) && (errno != EINTR)) {
+            // EAGAIN and EINTR both indicate a spurious failure,
+            // recompute the relative time out from now and try again.
+            // We don't use TEMP_FAILURE_RETRY so we can recompute rel_ts;
+            PLOG(FATAL) << "timed futex wait failed for " << name_;
+          }
+        }
+        SleepIfRuntimeDeleted(self);
+        num_contenders_.fetch_sub(1);
       }
-      num_contenders_.fetch_sub(1);
     }
   } while (!done);
 #else
@@ -706,16 +816,19 @@
 void ReaderWriterMutex::HandleSharedLockContention(Thread* self, int32_t cur_state) {
   // Owner holds it exclusively, hang up.
   ScopedContentionRecorder scr(this, SafeGetTid(self), GetExclusiveOwnerTid());
-  num_contenders_.fetch_add(1);
-  if (UNLIKELY(should_respond_to_empty_checkpoint_request_)) {
-    self->CheckEmptyCheckpointFromMutex();
-  }
-  if (futex(state_.Address(), FUTEX_WAIT_PRIVATE, cur_state, nullptr, nullptr, 0) != 0) {
-    if (errno != EAGAIN && errno != EINTR) {
-      PLOG(FATAL) << "futex wait failed for " << name_;
+  if (!WaitBrieflyFor(&state_, self, [](int32_t v) { return v >= 0; })) {
+    num_contenders_.fetch_add(1);
+    if (UNLIKELY(should_respond_to_empty_checkpoint_request_)) {
+      self->CheckEmptyCheckpointFromMutex();
     }
+    if (futex(state_.Address(), FUTEX_WAIT_PRIVATE, cur_state, nullptr, nullptr, 0) != 0) {
+      if (errno != EAGAIN && errno != EINTR) {
+        PLOG(FATAL) << "futex wait failed for " << name_;
+      }
+    }
+    SleepIfRuntimeDeleted(self);
+    num_contenders_.fetch_sub(1);
   }
-  num_contenders_.fetch_sub(1);
 }
 #endif
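The shared-lock predicate differs from the exclusive ones because ReaderWriterMutex encodes its state as a signed count; a sketch of the convention implied by the surrounding checks (exclusive acquisition asserts -1, and shared waiters may proceed once the value is non-negative):

    // Assumed state_ convention for ReaderWriterMutex:
    //   -1   held exclusively (writer)
    //    0   free
    //   n>0  held shared by n readers
    inline bool CanTryAcquireShared(int32_t state) { return state >= 0; }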
 
@@ -895,18 +1008,7 @@
       PLOG(FATAL) << "futex wait failed for " << name_;
     }
   }
-  if (self != nullptr) {
-    JNIEnvExt* const env = self->GetJniEnv();
-    if (UNLIKELY(env != nullptr && env->IsRuntimeDeleted())) {
-      CHECK(self->IsDaemon());
-      // If the runtime has been deleted, then we cannot proceed. Just sleep forever. This may
-      // occur for user daemon threads that get a spurious wakeup. This occurs for test 132 with
-      // --host and --gdb.
-      // After we wake up, the runtime may have been shutdown, which means that this condition may
-      // have been deleted. It is not safe to retry the wait.
-      SleepForever();
-    }
-  }
+  SleepIfRuntimeDeleted(self);
   guard_.ExclusiveLock(self);
   CHECK_GT(num_waiters_, 0);
   num_waiters_--;
@@ -948,6 +1050,7 @@
       PLOG(FATAL) << "timed futex wait failed for " << name_;
     }
   }
+  SleepIfRuntimeDeleted(self);
   guard_.ExclusiveLock(self);
   CHECK_GT(num_waiters_, 0);
   num_waiters_--;
diff --git a/runtime/base/mutex.h b/runtime/base/mutex.h
index 136e17a..33878e6 100644
--- a/runtime/base/mutex.h
+++ b/runtime/base/mutex.h
@@ -57,7 +57,7 @@
 constexpr bool kDebugLocking = kIsDebugBuild;
 
 // Record Log contention information, dumpable via SIGQUIT.
-#ifdef ART_USE_FUTEXES
+#if ART_USE_FUTEXES
 // To enable lock contention logging, set this to true.
 constexpr bool kLogLockContentions = false;
 // FUTEX_WAKE first argument:
@@ -102,7 +102,11 @@
 
   BaseMutex(const char* name, LockLevel level);
   virtual ~BaseMutex();
+
+  // Add this mutex to those owned by self, and perform appropriate checking.
+  // For this call only, self may also be another suspended thread.
   void RegisterAsLocked(Thread* self);
+
   void RegisterAsUnlocked(Thread* self);
   void CheckSafeToWait(Thread* self);
 
@@ -156,8 +160,14 @@
 // -------------------------------------------
 // Free      | Exclusive     | error
 // Exclusive | Block*        | Free
-// * Mutex is not reentrant and so an attempt to ExclusiveLock on the same thread will result in
-//   an error. Being non-reentrant simplifies Waiting on ConditionVariables.
+// * Mutex is not reentrant unless recursive is true. An attempt to ExclusiveLock on a
+// recursive=false Mutex on a thread already owning the Mutex results in an error.
+//
+// TODO(b/140590186): Remove support for recursive == true.
+//
+// Some mutexes, including those associated with Java monitors, may be accessed (in particular
+// acquired) by a thread in a suspended state. Suspending all threads does NOT prevent mutex state
+// from changing.
 std::ostream& operator<<(std::ostream& os, const Mutex& mu);
 class LOCKABLE Mutex : public BaseMutex {
  public:
@@ -173,6 +183,8 @@
   // Returns true if acquires exclusive access, false otherwise.
   bool ExclusiveTryLock(Thread* self) TRY_ACQUIRE(true);
   bool TryLock(Thread* self) TRY_ACQUIRE(true) { return ExclusiveTryLock(self); }
+  // Equivalent to ExclusiveTryLock, but retries for a short period before giving up.
+  bool ExclusiveTryLockWithSpinning(Thread* self) TRY_ACQUIRE(true);
 
   // Release exclusive access.
   void ExclusiveUnlock(Thread* self) RELEASE();
@@ -200,7 +212,9 @@
   // whether we hold the lock; any other information may be invalidated before we return.
   pid_t GetExclusiveOwnerTid() const;
 
-  // Returns how many times this Mutex has been locked, it is better to use AssertHeld/NotHeld.
+  // Returns how many times this Mutex has been locked; it is typically better to use
+  // AssertHeld/NotHeld. For a simply held mutex this method returns 1. Should only be called
+  // while holding the mutex or while all threads are suspended.
   unsigned int GetDepth() const {
     return recursion_count_;
   }
@@ -212,6 +226,18 @@
 
   void WakeupToRespondToEmptyCheckpoint() override;
 
+#if ART_USE_FUTEXES
+  // Acquire the mutex, possibly on behalf of another thread. Acquisition must be
+  // uncontended. new_owner must be the current thread or suspended.
+  // Mutex must be at level kMonitorLock.
+  // Not implementable for the pthreads version, so we must avoid calling it there.
+  void ExclusiveLockUncontendedFor(Thread* new_owner);
+
+  // Undo the effect of the previous call, setting the mutex back to unheld.
+  // Still assumes no concurrent access.
+  void ExclusiveUnlockUncontended();
+#endif  // ART_USE_FUTEXES
+
  private:
 #if ART_USE_FUTEXES
   // Low order bit: 0 is unheld, 1 is held.
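A sketch of the packed encoding described above, assuming (consistently with the kHeldMask and kContenderIncrement uses in mutex.cc) that the held flag is bit 0 and the contender count occupies the remaining bits:

    // Hypothetical helpers decoding state_and_contenders_.
    constexpr int32_t kHeldMask = 1;
    constexpr int32_t kContenderShift = 1;                         // Assumed.
    constexpr int32_t kContenderIncrement = 1 << kContenderShift;  // == 2.
    inline bool IsHeld(int32_t s) { return (s & kHeldMask) != 0; }
    inline int32_t Contenders(int32_t s) { return s >> kContenderShift; }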
diff --git a/runtime/gc/heap.cc b/runtime/gc/heap.cc
index 0601b8d..ee9e4a8 100644
--- a/runtime/gc/heap.cc
+++ b/runtime/gc/heap.cc
@@ -2307,6 +2307,12 @@
                                 std::memory_order_release);
 }
 
+#pragma clang diagnostic push
+#if !ART_USE_FUTEXES
+// Frame gets too large, perhaps due to Bionic pthread_mutex_lock size. We don't care.
+#  pragma clang diagnostic ignored "-Wframe-larger-than="
+#endif
+// This has a large frame, but shouldn't be run anywhere near the stack limit.
 void Heap::PreZygoteFork() {
   if (!HasZygoteSpace()) {
     // We still want to GC in case there is some unreachable non moving objects that could cause a
@@ -2467,6 +2473,7 @@
     AddRememberedSet(post_zygote_non_moving_space_rem_set);
   }
 }
+#pragma clang diagnostic pop
 
 void Heap::FlushAllocStack() {
   MarkAllocStackAsLive(allocation_stack_.get());
diff --git a/runtime/jni/jni_env_ext.cc b/runtime/jni/jni_env_ext.cc
index e2581dd..cf6a22c 100644
--- a/runtime/jni/jni_env_ext.cc
+++ b/runtime/jni/jni_env_ext.cc
@@ -90,7 +90,6 @@
 
 void JNIEnvExt::SetFunctionsToRuntimeShutdownFunctions() {
   functions = GetRuntimeShutdownNativeInterface();
-  runtime_deleted_ = true;
 }
 
 JNIEnvExt::~JNIEnvExt() {
diff --git a/runtime/jni/jni_env_ext.h b/runtime/jni/jni_env_ext.h
index 7aae92f..2c7ba3b 100644
--- a/runtime/jni/jni_env_ext.h
+++ b/runtime/jni/jni_env_ext.h
@@ -110,7 +110,8 @@
   }
   JavaVMExt* GetVm() const { return vm_; }
 
-  bool IsRuntimeDeleted() const { return runtime_deleted_; }
+  void SetRuntimeDeleted() { runtime_deleted_.store(true, std::memory_order_relaxed); }
+  bool IsRuntimeDeleted() const { return runtime_deleted_.load(std::memory_order_relaxed); }
   bool IsCheckJniEnabled() const { return check_jni_; }
 
 
@@ -206,7 +207,7 @@
   bool check_jni_;
 
   // If we are a JNI env for a daemon thread with a deleted runtime.
-  bool runtime_deleted_;
+  std::atomic<bool> runtime_deleted_;
 
   template<bool kEnableIndexIds> friend class JNI;
   friend class ScopedJniEnvLocalRefState;
diff --git a/runtime/lock_word.h b/runtime/lock_word.h
index ac7890c..30559a0 100644
--- a/runtime/lock_word.h
+++ b/runtime/lock_word.h
@@ -42,6 +42,7 @@
  *  |10|9|8|765432109876|5432109876543210|
  *  |00|m|r| lock count |thread id owner |
  *
+ * The lock count is zero, but the owner is nonzero for a simply held lock.
  * When the lock word is in the "fat" state and its bits are formatted as follows:
  *
  *  |33|2|2|2222222211111111110000000000|
@@ -72,7 +73,8 @@
     kMarkBitStateSize = 1,
     // Number of bits to encode the thin lock owner.
     kThinLockOwnerSize = 16,
-    // Remaining bits are the recursive lock count.
+    // Remaining bits are the recursive lock count. Zero means it is locked exactly once
+    // and not recursively.
     kThinLockCountSize = 32 - kThinLockOwnerSize - kStateSize - kReadBarrierStateSize -
         kMarkBitStateSize,
 
@@ -234,7 +236,8 @@
   // Return the owner thin lock thread id.
   uint32_t ThinLockOwner() const;
 
-  // Return the number of times a lock value has been locked.
+  // Return the number of times a lock value has been re-locked. Only valid in thin-locked state.
+  // If the lock is held only once the return value is zero.
   uint32_t ThinLockCount() const;
 
   // Return the Monitor encoded in a fat lock.
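A sketch of decoding a thin lock word under the layout documented above: 16 owner bits at the bottom, then the 12-bit recursion count, with the state, read-barrier, and mark bits at the top:

    // Hypothetical decoding; widths follow the kThinLock*Size constants above.
    constexpr uint32_t kOwnerBits = 16;
    constexpr uint32_t kCountBits = 12;
    inline uint32_t ThinOwner(uint32_t w) { return w & ((1u << kOwnerBits) - 1); }
    inline uint32_t ThinCount(uint32_t w) {  // 0 means locked exactly once.
      return (w >> kOwnerBits) & ((1u << kCountBits) - 1);
    }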
diff --git a/runtime/monitor-inl.h b/runtime/monitor-inl.h
index e8ffafa..f7e31a0 100644
--- a/runtime/monitor-inl.h
+++ b/runtime/monitor-inl.h
@@ -29,6 +29,57 @@
   return obj_.Read<kReadBarrierOption>();
 }
 
+// Check for request to set lock owner info.
+void Monitor::CheckLockOwnerRequest(Thread* self) {
+  DCHECK(self != nullptr);
+  Thread* request_thread = lock_owner_request_.load(std::memory_order_relaxed);
+  if (request_thread == self) {
+    SetLockingMethod(self);
+    // Only do this the first time after a request.
+    lock_owner_request_.store(nullptr, std::memory_order_relaxed);
+  }
+}
+
+uintptr_t Monitor::LockOwnerInfoChecksum(ArtMethod* m, uint32_t dex_pc, Thread* t) {
+  uintptr_t dpc_and_thread = static_cast<uintptr_t>(dex_pc << 8) ^ reinterpret_cast<uintptr_t>(t);
+  return reinterpret_cast<uintptr_t>(m) ^ dpc_and_thread
+      ^ (dpc_and_thread << (/* ptr_size / 2 */ (sizeof m) << 2));
+}
+
+void Monitor::SetLockOwnerInfo(ArtMethod* method, uint32_t dex_pc, Thread* t) {
+  lock_owner_method_.store(method, std::memory_order_relaxed);
+  lock_owner_dex_pc_.store(dex_pc, std::memory_order_relaxed);
+  lock_owner_.store(t, std::memory_order_relaxed);
+  uintptr_t sum = LockOwnerInfoChecksum(method, dex_pc, t);
+  lock_owner_sum_.store(sum, std::memory_order_relaxed);
+}
+
+void Monitor::GetLockOwnerInfo(/*out*/ArtMethod** method, /*out*/uint32_t* dex_pc,
+                               Thread* t) {
+  ArtMethod* owners_method;
+  uint32_t owners_dex_pc;
+  Thread* owner;
+  uintptr_t owners_sum;
+  DCHECK(t != nullptr);
+  do {
+    owner = lock_owner_.load(std::memory_order_relaxed);
+    if (owner == nullptr) {
+      break;
+    }
+    owners_method = lock_owner_method_.load(std::memory_order_relaxed);
+    owners_dex_pc = lock_owner_dex_pc_.load(std::memory_order_relaxed);
+    owners_sum = lock_owner_sum_.load(std::memory_order_relaxed);
+  } while (owners_sum != LockOwnerInfoChecksum(owners_method, owners_dex_pc, owner));
+  if (owner == t) {
+    *method = owners_method;
+    *dex_pc = owners_dex_pc;
+  } else {
+    *method = nullptr;
+    *dex_pc = 0;
+  }
+}
+
 }  // namespace art
 
 #endif  // ART_RUNTIME_MONITOR_INL_H_
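The routines above implement a checksum-validated racy read: the writer stores the fields and their checksum with relaxed ordering, and the reader loops until the values it observed hash to the checksum it observed, so a torn read is (probabilistically) detected rather than prevented. A generic standalone sketch of the same pattern, not the ART code:

    #include <atomic>

    std::atomic<int> a{0}, b{0}, sum{0};

    void Publish(int x, int y) {
      a.store(x, std::memory_order_relaxed);
      b.store(y, std::memory_order_relaxed);
      sum.store(x ^ y, std::memory_order_relaxed);
    }

    void ReadValidated(int* x, int* y) {
      do {  // Retry until the relaxed loads are mutually consistent.
        *x = a.load(std::memory_order_relaxed);
        *y = b.load(std::memory_order_relaxed);
      } while (sum.load(std::memory_order_relaxed) != (*x ^ *y));
    }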
diff --git a/runtime/monitor.cc b/runtime/monitor.cc
index 9d114ed..6b05e28 100644
--- a/runtime/monitor.cc
+++ b/runtime/monitor.cc
@@ -52,7 +52,8 @@
 /*
  * Every Object has a monitor associated with it, but not every Object is actually locked.  Even
  * the ones that are locked do not need a full-fledged monitor until a) there is actual contention
- * or b) wait() is called on the Object.
+ * or b) wait() is called on the Object, or c) we need to lock an object that also has an
+ * identity hashcode.
  *
  * For Android, we have implemented a scheme similar to the one described in Bacon et al.'s
  * "Thin locks: featherweight synchronization for Java" (ACM 1998).  Things are even easier for us,
@@ -91,7 +92,6 @@
 
 Monitor::Monitor(Thread* self, Thread* owner, ObjPtr<mirror::Object> obj, int32_t hash_code)
     : monitor_lock_("a monitor lock", kMonitorLock),
-      monitor_contenders_("monitor contenders", monitor_lock_),
       num_waiters_(0),
       owner_(owner),
       lock_count_(0),
@@ -99,8 +99,8 @@
       wait_set_(nullptr),
       wake_set_(nullptr),
       hash_code_(hash_code),
-      locking_method_(nullptr),
-      locking_dex_pc_(0),
+      lock_owner_method_(nullptr),
+      lock_owner_dex_pc_(0),
       monitor_id_(MonitorPool::ComputeMonitorId(this, self)) {
 #ifdef __LP64__
   DCHECK(false) << "Should not be reached in 64b";
@@ -118,7 +118,6 @@
                  int32_t hash_code,
                  MonitorId id)
     : monitor_lock_("a monitor lock", kMonitorLock),
-      monitor_contenders_("monitor contenders", monitor_lock_),
       num_waiters_(0),
       owner_(owner),
       lock_count_(0),
@@ -126,8 +125,8 @@
       wait_set_(nullptr),
       wake_set_(nullptr),
       hash_code_(hash_code),
-      locking_method_(nullptr),
-      locking_dex_pc_(0),
+      lock_owner_(nullptr),
+      lock_owner_request_(nullptr),
       monitor_id_(id) {
 #ifdef __LP64__
   next_free_ = nullptr;
@@ -150,20 +149,106 @@
   return hc;
 }
 
-bool Monitor::Install(Thread* self) {
-  MutexLock mu(self, monitor_lock_);  // Uncontended mutex acquisition as monitor isn't yet public.
-  CHECK(owner_ == nullptr || owner_ == self || owner_->IsSuspended());
+void Monitor::SetLockingMethod(Thread* owner) {
+  DCHECK(owner == Thread::Current() || owner->IsSuspended());
+  // Do not abort on dex pc errors. This can easily happen when we want to dump a stack trace on
+  // abort.
+  ArtMethod* lock_owner_method;
+  uint32_t lock_owner_dex_pc;
+  lock_owner_method = owner->GetCurrentMethod(&lock_owner_dex_pc, false);
+  if (lock_owner_method != nullptr && UNLIKELY(lock_owner_method->IsProxyMethod())) {
+    // Grab another frame. Proxy methods are not helpful for lock profiling. This should be rare
+    // enough that it's OK to walk the stack twice.
+    struct NextMethodVisitor final : public StackVisitor {
+      explicit NextMethodVisitor(Thread* thread) REQUIRES_SHARED(Locks::mutator_lock_)
+          : StackVisitor(thread,
+                         nullptr,
+                         StackVisitor::StackWalkKind::kIncludeInlinedFrames,
+                         false),
+            count_(0),
+            method_(nullptr),
+            dex_pc_(0) {}
+      bool VisitFrame() override REQUIRES_SHARED(Locks::mutator_lock_) {
+        ArtMethod* m = GetMethod();
+        if (m->IsRuntimeMethod()) {
+          // Continue if this is a runtime method.
+          return true;
+        }
+        count_++;
+        if (count_ == 2u) {
+          method_ = m;
+          dex_pc_ = GetDexPc(false);
+          return false;
+        }
+        return true;
+      }
+      size_t count_;
+      ArtMethod* method_;
+      uint32_t dex_pc_;
+    };
+    NextMethodVisitor nmv(owner);  // Walk the owner passed in, not a racy reload of owner_.
+    nmv.WalkStack();
+    lock_owner_method = nmv.method_;
+    lock_owner_dex_pc = nmv.dex_pc_;
+  }
+  SetLockOwnerInfo(lock_owner_method, lock_owner_dex_pc, owner);
+  DCHECK(lock_owner_method == nullptr || !lock_owner_method->IsProxyMethod());
+}
+
+void Monitor::SetLockingMethodNoProxy(Thread* owner) {
+  DCHECK(owner == Thread::Current());
+  uint32_t lock_owner_dex_pc;
+  ArtMethod* lock_owner_method = owner->GetCurrentMethod(&lock_owner_dex_pc);
+  // We don't expect a proxy method here.
+  DCHECK(lock_owner_method == nullptr || !lock_owner_method->IsProxyMethod());
+  SetLockOwnerInfo(lock_owner_method, lock_owner_dex_pc, owner);
+}
+
+bool Monitor::Install(Thread* self) NO_THREAD_SAFETY_ANALYSIS {
+  // This may or may not result in acquiring monitor_lock_. Its behavior is much more complicated
+  // than what clang thread safety analysis understands.
+  // Monitor is not yet public.
+  Thread* owner = owner_.load(std::memory_order_relaxed);
+  CHECK(owner == nullptr || owner == self || (ART_USE_FUTEXES && owner->IsSuspended()));
   // Propagate the lock state.
   LockWord lw(GetObject()->GetLockWord(false));
   switch (lw.GetState()) {
     case LockWord::kThinLocked: {
-      CHECK_EQ(owner_->GetThreadId(), lw.ThinLockOwner());
+      DCHECK(owner != nullptr);
+      CHECK_EQ(owner->GetThreadId(), lw.ThinLockOwner());
+      DCHECK_EQ(monitor_lock_.GetExclusiveOwnerTid(), 0) << " my tid = " << SafeGetTid(self);
       lock_count_ = lw.ThinLockCount();
-      break;
+#if ART_USE_FUTEXES
+      monitor_lock_.ExclusiveLockUncontendedFor(owner);
+#else
+      monitor_lock_.ExclusiveLock(owner);
+#endif
+      DCHECK_EQ(monitor_lock_.GetExclusiveOwnerTid(), owner->GetTid())
+          << " my tid = " << SafeGetTid(self);
+      LockWord fat(this, lw.GCState());
+      // Publish the updated lock word, which may race with other threads.
+      bool success = GetObject()->CasLockWord(lw, fat, CASMode::kWeak, std::memory_order_release);
+      if (success) {
+        if (ATraceEnabled()) {
+          SetLockingMethod(owner);
+        }
+        return true;
+      } else {
+#if ART_USE_FUTEXES
+        monitor_lock_.ExclusiveUnlockUncontended();
+#else
+        for (uint32_t i = 0; i <= lock_count_; ++i) {
+          monitor_lock_.ExclusiveUnlock(owner);
+        }
+#endif
+        return false;
+      }
     }
     case LockWord::kHashCode: {
       CHECK_EQ(hash_code_.load(std::memory_order_relaxed), static_cast<int32_t>(lw.GetHashCode()));
-      break;
+      DCHECK_EQ(monitor_lock_.GetExclusiveOwnerTid(), 0) << " my tid = " << SafeGetTid(self);
+      LockWord fat(this, lw.GCState());
+      return GetObject()->CasLockWord(lw, fat, CASMode::kWeak, std::memory_order_release);
     }
     case LockWord::kFatLocked: {
       // The owner_ is suspended but another thread beat us to install a monitor.
@@ -178,52 +263,6 @@
       UNREACHABLE();
     }
   }
-  LockWord fat(this, lw.GCState());
-  // Publish the updated lock word, which may race with other threads.
-  bool success = GetObject()->CasLockWord(lw, fat, CASMode::kWeak, std::memory_order_release);
-  // Lock profiling.
-  if (success && owner_ != nullptr && lock_profiling_threshold_ != 0) {
-    // Do not abort on dex pc errors. This can easily happen when we want to dump a stack trace on
-    // abort.
-    locking_method_ = owner_->GetCurrentMethod(&locking_dex_pc_, false);
-    if (locking_method_ != nullptr && UNLIKELY(locking_method_->IsProxyMethod())) {
-      // Grab another frame. Proxy methods are not helpful for lock profiling. This should be rare
-      // enough that it's OK to walk the stack twice.
-      struct NextMethodVisitor final : public StackVisitor {
-        explicit NextMethodVisitor(Thread* thread) REQUIRES_SHARED(Locks::mutator_lock_)
-            : StackVisitor(thread,
-                           nullptr,
-                           StackVisitor::StackWalkKind::kIncludeInlinedFrames,
-                           false),
-              count_(0),
-              method_(nullptr),
-              dex_pc_(0) {}
-        bool VisitFrame() override REQUIRES_SHARED(Locks::mutator_lock_) {
-          ArtMethod* m = GetMethod();
-          if (m->IsRuntimeMethod()) {
-            // Continue if this is a runtime method.
-            return true;
-          }
-          count_++;
-          if (count_ == 2u) {
-            method_ = m;
-            dex_pc_ = GetDexPc(false);
-            return false;
-          }
-          return true;
-        }
-        size_t count_;
-        ArtMethod* method_;
-        uint32_t dex_pc_;
-      };
-      NextMethodVisitor nmv(owner_);
-      nmv.WalkStack();
-      locking_method_ = nmv.method_;
-      locking_dex_pc_ = nmv.dex_pc_;
-    }
-    DCHECK(locking_method_ == nullptr || !locking_method_->IsProxyMethod());
-  }
-  return success;
 }
 
 Monitor::~Monitor() {
@@ -371,226 +410,222 @@
   return oss.str();
 }
 
-bool Monitor::TryLockLocked(Thread* self) {
-  if (owner_ == nullptr) {  // Unowned.
-    owner_ = self;
-    CHECK_EQ(lock_count_, 0);
-    // When debugging, save the current monitor holder for future
-    // acquisition failures to use in sampled logging.
-    if (lock_profiling_threshold_ != 0) {
-      locking_method_ = self->GetCurrentMethod(&locking_dex_pc_);
-      // We don't expect a proxy method here.
-      DCHECK(locking_method_ == nullptr || !locking_method_->IsProxyMethod());
-    }
-  } else if (owner_ == self) {  // Recursive.
+bool Monitor::TryLock(Thread* self, bool spin) {
+  Thread* owner = owner_.load(std::memory_order_relaxed);
+  if (owner == self) {
     lock_count_++;
+    CHECK_NE(lock_count_, 0u);  // Abort on overflow.
   } else {
-    return false;
+    bool success = spin ? monitor_lock_.ExclusiveTryLockWithSpinning(self)
+        : monitor_lock_.ExclusiveTryLock(self);
+    if (!success) {
+      return false;
+    }
+    DCHECK(owner_.load(std::memory_order_relaxed) == nullptr);
+    owner_.store(self, std::memory_order_relaxed);
+    CHECK_EQ(lock_count_, 0u);
+    if (ATraceEnabled()) {
+      SetLockingMethodNoProxy(self);
+    }
   }
+  DCHECK(monitor_lock_.IsExclusiveHeld(self));
   AtraceMonitorLock(self, GetObject(), /* is_wait= */ false);
   return true;
 }
 
-bool Monitor::TryLock(Thread* self) {
-  MutexLock mu(self, monitor_lock_);
-  return TryLockLocked(self);
-}
-
-// Asserts that a mutex isn't held when the class comes into and out of scope.
-class ScopedAssertNotHeld {
- public:
-  ScopedAssertNotHeld(Thread* self, Mutex& mu) : self_(self), mu_(mu) {
-    mu_.AssertNotHeld(self_);
-  }
-
-  ~ScopedAssertNotHeld() {
-    mu_.AssertNotHeld(self_);
-  }
-
- private:
-  Thread* const self_;
-  Mutex& mu_;
-  DISALLOW_COPY_AND_ASSIGN(ScopedAssertNotHeld);
-};
-
 template <LockReason reason>
 void Monitor::Lock(Thread* self) {
-  ScopedAssertNotHeld sanh(self, monitor_lock_);
   bool called_monitors_callback = false;
-  monitor_lock_.Lock(self);
-  while (true) {
-    if (TryLockLocked(self)) {
-      break;
+  if (TryLock(self, /*spin=*/ true)) {
+    // TODO: This preserves original behavior. Correct?
+    if (called_monitors_callback) {
+      CHECK(reason == LockReason::kForLock);
+      Runtime::Current()->GetRuntimeCallbacks()->MonitorContendedLocked(this);
     }
-    // Contended.
-    const bool log_contention = (lock_profiling_threshold_ != 0);
-    uint64_t wait_start_ms = log_contention ? MilliTime() : 0;
-    ArtMethod* owners_method = locking_method_;
-    uint32_t owners_dex_pc = locking_dex_pc_;
-    // Do this before releasing the lock so that we don't get deflated.
-    size_t num_waiters = num_waiters_;
-    ++num_waiters_;
-
-    // If systrace logging is enabled, first look at the lock owner. Acquiring the monitor's
-    // lock and then re-acquiring the mutator lock can deadlock.
-    bool started_trace = false;
-    if (ATraceEnabled()) {
-      if (owner_ != nullptr) {  // Did the owner_ give the lock up?
-        std::ostringstream oss;
-        std::string name;
-        owner_->GetThreadName(name);
-        oss << PrettyContentionInfo(name,
-                                    owner_->GetTid(),
-                                    owners_method,
-                                    owners_dex_pc,
-                                    num_waiters);
-        // Add info for contending thread.
-        uint32_t pc;
-        ArtMethod* m = self->GetCurrentMethod(&pc);
-        const char* filename;
-        int32_t line_number;
-        TranslateLocation(m, pc, &filename, &line_number);
-        oss << " blocking from "
-            << ArtMethod::PrettyMethod(m) << "(" << (filename != nullptr ? filename : "null")
-            << ":" << line_number << ")";
-        ATraceBegin(oss.str().c_str());
-        started_trace = true;
-      }
-    }
-
-    monitor_lock_.Unlock(self);  // Let go of locks in order.
-    // Call the contended locking cb once and only once. Also only call it if we are locking for
-    // the first time, not during a Wait wakeup.
-    if (reason == LockReason::kForLock && !called_monitors_callback) {
-      called_monitors_callback = true;
-      Runtime::Current()->GetRuntimeCallbacks()->MonitorContendedLocking(this);
-    }
-    self->SetMonitorEnterObject(GetObject().Ptr());
-    {
-      ScopedThreadSuspension tsc(self, kBlocked);  // Change to blocked and give up mutator_lock_.
-      uint32_t original_owner_thread_id = 0u;
-      {
-        // Reacquire monitor_lock_ without mutator_lock_ for Wait.
-        MutexLock mu2(self, monitor_lock_);
-        if (owner_ != nullptr) {  // Did the owner_ give the lock up?
-          original_owner_thread_id = owner_->GetThreadId();
-          monitor_contenders_.Wait(self);  // Still contended so wait.
-        }
-      }
-      if (original_owner_thread_id != 0u) {
-        // Woken from contention.
-        if (log_contention) {
-          uint64_t wait_ms = MilliTime() - wait_start_ms;
-          uint32_t sample_percent;
-          if (wait_ms >= lock_profiling_threshold_) {
-            sample_percent = 100;
-          } else {
-            sample_percent = 100 * wait_ms / lock_profiling_threshold_;
-          }
-          if (sample_percent != 0 && (static_cast<uint32_t>(rand() % 100) < sample_percent)) {
-            // Reacquire mutator_lock_ for logging.
-            ScopedObjectAccess soa(self);
-
-            bool owner_alive = false;
-            pid_t original_owner_tid = 0;
-            std::string original_owner_name;
-
-            const bool should_dump_stacks = stack_dump_lock_profiling_threshold_ > 0 &&
-                wait_ms > stack_dump_lock_profiling_threshold_;
-            std::string owner_stack_dump;
-
-            // Acquire thread-list lock to find thread and keep it from dying until we've got all
-            // the info we need.
-            {
-              Locks::thread_list_lock_->ExclusiveLock(Thread::Current());
-
-              // Re-find the owner in case the thread got killed.
-              Thread* original_owner = Runtime::Current()->GetThreadList()->FindThreadByThreadId(
-                  original_owner_thread_id);
-
-              if (original_owner != nullptr) {
-                owner_alive = true;
-                original_owner_tid = original_owner->GetTid();
-                original_owner->GetThreadName(original_owner_name);
-
-                if (should_dump_stacks) {
-                  // Very long contention. Dump stacks.
-                  struct CollectStackTrace : public Closure {
-                    void Run(art::Thread* thread) override
-                        REQUIRES_SHARED(art::Locks::mutator_lock_) {
-                      thread->DumpJavaStack(oss);
-                    }
-
-                    std::ostringstream oss;
-                  };
-                  CollectStackTrace owner_trace;
-                  // RequestSynchronousCheckpoint releases the thread_list_lock_ as a part of its
-                  // execution.
-                  original_owner->RequestSynchronousCheckpoint(&owner_trace);
-                  owner_stack_dump = owner_trace.oss.str();
-                } else {
-                  Locks::thread_list_lock_->ExclusiveUnlock(Thread::Current());
-                }
-              } else {
-                Locks::thread_list_lock_->ExclusiveUnlock(Thread::Current());
-              }
-              // This is all the data we need. Now drop the thread-list lock, it's OK for the
-              // owner to go away now.
-            }
-
-            // If we found the owner (and thus have owner data), go and log now.
-            if (owner_alive) {
-              // Give the detailed traces for really long contention.
-              if (should_dump_stacks) {
-                // This must be here (and not above) because we cannot hold the thread-list lock
-                // while running the checkpoint.
-                std::ostringstream self_trace_oss;
-                self->DumpJavaStack(self_trace_oss);
-
-                uint32_t pc;
-                ArtMethod* m = self->GetCurrentMethod(&pc);
-
-                LOG(WARNING) << "Long "
-                    << PrettyContentionInfo(original_owner_name,
-                                            original_owner_tid,
-                                            owners_method,
-                                            owners_dex_pc,
-                                            num_waiters)
-                    << " in " << ArtMethod::PrettyMethod(m) << " for "
-                    << PrettyDuration(MsToNs(wait_ms)) << "\n"
-                    << "Current owner stack:\n" << owner_stack_dump
-                    << "Contender stack:\n" << self_trace_oss.str();
-              } else if (wait_ms > kLongWaitMs && owners_method != nullptr) {
-                uint32_t pc;
-                ArtMethod* m = self->GetCurrentMethod(&pc);
-                // TODO: We should maybe check that original_owner is still a live thread.
-                LOG(WARNING) << "Long "
-                    << PrettyContentionInfo(original_owner_name,
-                                            original_owner_tid,
-                                            owners_method,
-                                            owners_dex_pc,
-                                            num_waiters)
-                    << " in " << ArtMethod::PrettyMethod(m) << " for "
-                    << PrettyDuration(MsToNs(wait_ms));
-              }
-              LogContentionEvent(self,
-                                wait_ms,
-                                sample_percent,
-                                owners_method,
-                                owners_dex_pc);
-            }
-          }
-        }
-      }
-    }
-    if (started_trace) {
-      ATraceEnd();
-    }
-    self->SetMonitorEnterObject(nullptr);
-    monitor_lock_.Lock(self);  // Reacquire locks in order.
-    --num_waiters_;
+    return;
   }
-  monitor_lock_.Unlock(self);
+  // Contended; not reentrant. We hold no locks, so tread carefully.
+  const bool log_contention = (lock_profiling_threshold_ != 0);
+  uint64_t wait_start_ms = log_contention ? MilliTime() : 0;
+
+  Thread* orig_owner = nullptr;
+  ArtMethod* owners_method;
+  uint32_t owners_dex_pc;
+
+  // Do this before releasing the mutator lock so that we don't get deflated.
+  size_t num_waiters = num_waiters_.fetch_add(1, std::memory_order_relaxed);
+
+  bool started_trace = false;
+  if (ATraceEnabled() && owner_.load(std::memory_order_relaxed) != nullptr) {
+    // Acquiring thread_list_lock_ ensures that owner doesn't disappear while
+    // we're looking at it.
+    Locks::thread_list_lock_->ExclusiveLock(self);
+    orig_owner = owner_.load(std::memory_order_relaxed);
+    if (orig_owner != nullptr) {  // Did the owner_ give the lock up?
+      const uint32_t orig_owner_thread_id = orig_owner->GetThreadId();
+      GetLockOwnerInfo(&owners_method, &owners_dex_pc, orig_owner);
+      std::ostringstream oss;
+      std::string name;
+      orig_owner->GetThreadName(name);
+      oss << PrettyContentionInfo(name,
+                                  orig_owner_thread_id,
+                                  owners_method,
+                                  owners_dex_pc,
+                                  num_waiters);
+      Locks::thread_list_lock_->ExclusiveUnlock(self);
+      // Add info for contending thread.
+      uint32_t pc;
+      ArtMethod* m = self->GetCurrentMethod(&pc);
+      const char* filename;
+      int32_t line_number;
+      TranslateLocation(m, pc, &filename, &line_number);
+      oss << " blocking from "
+          << ArtMethod::PrettyMethod(m) << "(" << (filename != nullptr ? filename : "null")
+          << ":" << line_number << ")";
+      ATraceBegin(oss.str().c_str());
+      started_trace = true;
+    } else {
+      Locks::thread_list_lock_->ExclusiveUnlock(self);
+    }
+  }
+  if (log_contention) {
+    // Request the current holder to set lock_owner_info.
+    // Do this even if tracing is enabled, so we semi-consistently get the information
+    // corresponding to MonitorExit.
+    // TODO: Consider optionally obtaining a stack trace here via a checkpoint.  That would allow
+    // us to see what the other thread is doing while we're waiting.
+    orig_owner = owner_.load(std::memory_order_relaxed);
+    lock_owner_request_.store(orig_owner, std::memory_order_relaxed);
+  }
+  // Call the contended locking cb once and only once. Also only call it if we are locking for
+  // the first time, not during a Wait wakeup.
+  if (reason == LockReason::kForLock && !called_monitors_callback) {
+    called_monitors_callback = true;
+    Runtime::Current()->GetRuntimeCallbacks()->MonitorContendedLocking(this);
+  }
+  self->SetMonitorEnterObject(GetObject().Ptr());
+  {
+    ScopedThreadSuspension tsc(self, kBlocked);  // Change to blocked and give up mutator_lock_.
+
+    // Acquire monitor_lock_ without mutator_lock_, expecting to block this time.
+    // We already tried spinning above. The shutdown procedure currently assumes we stop
+    // touching monitors shortly after we suspend, so don't spin again here.
+    monitor_lock_.ExclusiveLock(self);
+
+    if (log_contention && orig_owner != nullptr) {
+      // Woken from contention.
+      uint64_t wait_ms = MilliTime() - wait_start_ms;
+      uint32_t sample_percent;
+      if (wait_ms >= lock_profiling_threshold_) {
+        sample_percent = 100;
+      } else {
+        sample_percent = 100 * wait_ms / lock_profiling_threshold_;
+      }
+      if (sample_percent != 0 && (static_cast<uint32_t>(rand() % 100) < sample_percent)) {
+        // Do this unconditionally for consistency. It's possible another thread
+        // snuck in in the middle, and tracing was enabled. In that case, we may get its
+        // MonitorEnter information. We can live with that.
+        GetLockOwnerInfo(&owners_method, &owners_dex_pc, orig_owner);
+
+        // Reacquire mutator_lock_ for logging.
+        ScopedObjectAccess soa(self);
+
+        const bool should_dump_stacks = stack_dump_lock_profiling_threshold_ > 0 &&
+            wait_ms > stack_dump_lock_profiling_threshold_;
+
+        // Acquire thread-list lock to find thread and keep it from dying until we've got all
+        // the info we need.
+        Locks::thread_list_lock_->ExclusiveLock(self);
+
+        // Is there still a thread at the same address as the original owner?
+        // We tolerate the fact that it may occasionally be the wrong one.
+        if (Runtime::Current()->GetThreadList()->Contains(orig_owner)) {
+          uint32_t original_owner_tid = orig_owner->GetTid();  // System thread id.
+          std::string original_owner_name;
+          orig_owner->GetThreadName(original_owner_name);
+          std::string owner_stack_dump;
+
+          if (should_dump_stacks) {
+            // Very long contention. Dump stacks.
+            struct CollectStackTrace : public Closure {
+              void Run(art::Thread* thread) override
+                  REQUIRES_SHARED(art::Locks::mutator_lock_) {
+                thread->DumpJavaStack(oss);
+              }
+
+              std::ostringstream oss;
+            };
+            CollectStackTrace owner_trace;
+            // RequestSynchronousCheckpoint releases the thread_list_lock_ as a part of its
+            // execution.
+            orig_owner->RequestSynchronousCheckpoint(&owner_trace);
+            owner_stack_dump = owner_trace.oss.str();
+          } else {
+            Locks::thread_list_lock_->ExclusiveUnlock(self);
+          }
+
+          // This is all the data we need. We dropped the thread-list lock, it's OK for the
+          // owner to go away now.
+
+          if (should_dump_stacks) {
+            // Give the detailed traces for really long contention.
+            // This must be here (and not above) because we cannot hold the thread-list lock
+            // while running the checkpoint.
+            std::ostringstream self_trace_oss;
+            self->DumpJavaStack(self_trace_oss);
+
+            uint32_t pc;
+            ArtMethod* m = self->GetCurrentMethod(&pc);
+
+            LOG(WARNING) << "Long "
+                << PrettyContentionInfo(original_owner_name,
+                                        original_owner_tid,
+                                        owners_method,
+                                        owners_dex_pc,
+                                        num_waiters)
+                << " in " << ArtMethod::PrettyMethod(m) << " for "
+                << PrettyDuration(MsToNs(wait_ms)) << "\n"
+                << "Current owner stack:\n" << owner_stack_dump
+                << "Contender stack:\n" << self_trace_oss.str();
+          } else if (wait_ms > kLongWaitMs && owners_method != nullptr) {
+            uint32_t pc;
+            ArtMethod* m = self->GetCurrentMethod(&pc);
+            // TODO: We should maybe check that original_owner is still a live thread.
+            LOG(WARNING) << "Long "
+                << PrettyContentionInfo(original_owner_name,
+                                        original_owner_tid,
+                                        owners_method,
+                                        owners_dex_pc,
+                                        num_waiters)
+                << " in " << ArtMethod::PrettyMethod(m) << " for "
+                << PrettyDuration(MsToNs(wait_ms));
+          }
+          LogContentionEvent(self,
+                            wait_ms,
+                            sample_percent,
+                            owners_method,
+                            owners_dex_pc);
+        } else {
+          Locks::thread_list_lock_->ExclusiveUnlock(self);
+        }
+      }
+    }
+  }
+  // We've successfully acquired monitor_lock_, released thread_list_lock_, and are runnable.
+
+  // We avoided touching monitor fields while suspended, so set owner_ here.
+  owner_.store(self, std::memory_order_relaxed);
+  DCHECK_EQ(lock_count_, 0u);
+
+  if (ATraceEnabled()) {
+    SetLockingMethodNoProxy(self);
+  }
+  if (started_trace) {
+    ATraceEnd();
+  }
+  self->SetMonitorEnterObject(nullptr);
+  num_waiters_.fetch_sub(1, std::memory_order_relaxed);
+  DCHECK(monitor_lock_.IsExclusiveHeld(self));
   // We need to pair this with a single contended locking call. NB we match the RI behavior and call
   // this even if MonitorEnter failed.
   if (called_monitors_callback) {
@@ -634,7 +669,6 @@
                            uint32_t expected_owner_thread_id,
                            uint32_t found_owner_thread_id,
                            Monitor* monitor) {
-  // Acquire thread list lock so threads won't disappear from under us.
   std::string current_owner_string;
   std::string expected_owner_string;
   std::string found_owner_string;
@@ -700,39 +734,44 @@
 
 bool Monitor::Unlock(Thread* self) {
   DCHECK(self != nullptr);
-  uint32_t owner_thread_id = 0u;
-  DCHECK(!monitor_lock_.IsExclusiveHeld(self));
-  monitor_lock_.Lock(self);
-  Thread* owner = owner_;
-  if (owner != nullptr) {
-    owner_thread_id = owner->GetThreadId();
-  }
+  Thread* owner = owner_.load(std::memory_order_relaxed);
   if (owner == self) {
     // We own the monitor, so nobody else can be in here.
+    CheckLockOwnerRequest(self);
     AtraceMonitorUnlock();
     if (lock_count_ == 0) {
-      owner_ = nullptr;
-      locking_method_ = nullptr;
-      locking_dex_pc_ = 0;
-      SignalContendersAndReleaseMonitorLock(self);
-      return true;
+      owner_.store(nullptr, std::memory_order_relaxed);
+      SignalWaiterAndReleaseMonitorLock(self);
     } else {
       --lock_count_;
-      monitor_lock_.Unlock(self);
-      return true;
+      DCHECK(monitor_lock_.IsExclusiveHeld(self));
+      DCHECK_EQ(owner_.load(std::memory_order_relaxed), self);
+      // Keep monitor_lock_, but pretend we released it.
+      FakeUnlockMonitorLock();
     }
+    return true;
   }
   // We don't own this, so we're not allowed to unlock it.
   // The JNI spec says that we should throw IllegalMonitorStateException in this case.
+  uint32_t owner_thread_id = 0u;
+  {
+    MutexLock mu(self, *Locks::thread_list_lock_);
+    owner = owner_.load(std::memory_order_relaxed);
+    if (owner != nullptr) {
+      owner_thread_id = owner->GetThreadId();
+    }
+  }
   FailedUnlock(GetObject(), self->GetThreadId(), owner_thread_id, this);
-  monitor_lock_.Unlock(self);
+  // Pretend to release monitor_lock_ for thread safety analysis; we never actually acquired it.
+  FakeUnlockMonitorLock();
   return false;
 }
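FakeUnlockMonitorLock(), presumably defined in monitor.h outside this excerpt, exists only to keep clang thread safety analysis consistent when a function annotated as releasing monitor_lock_ deliberately keeps it, or never took it. A hedged sketch of the idiom:

    // Annotation-only release; the body is intentionally empty. This tells
    // the analysis the lock was released without actually touching it.
    void FakeUnlockMonitorLock() RELEASE(monitor_lock_) NO_THREAD_SAFETY_ANALYSIS {}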
 
-void Monitor::SignalContendersAndReleaseMonitorLock(Thread* self) {
-  // We want to signal one thread to wake up, to acquire the monitor that
-  // we are releasing. This could either be a Thread waiting on its own
-  // ConditionVariable, or a thread waiting on monitor_contenders_.
+void Monitor::SignalWaiterAndReleaseMonitorLock(Thread* self) {
+  // We want to release the monitor and signal up to one thread that was waiting
+  // but has since been notified.
+  DCHECK_EQ(lock_count_, 0u);
+  DCHECK(monitor_lock_.IsExclusiveHeld(self));
   while (wake_set_ != nullptr) {
     // No risk of waking ourselves here; since monitor_lock_ is not released until we're ready to
     // return, notify can't move the current thread from wait_set_ to wake_set_ until this
@@ -740,6 +779,7 @@
     Thread* thread = wake_set_;
     wake_set_ = thread->GetWaitNext();
     thread->SetWaitNext(nullptr);
+    DCHECK(owner_.load(std::memory_order_relaxed) == nullptr);
 
     // Check to see if the thread is still waiting.
     {
@@ -764,16 +804,14 @@
         // Release the lock, so that a potentially awakened thread will not
         // immediately contend on it. The lock ordering here is:
         // monitor_lock_, self->GetWaitMutex, thread->GetWaitMutex
-        monitor_lock_.Unlock(self);
+        monitor_lock_.Unlock(self);  // Releases contenders.
         thread->GetWaitConditionVariable()->Signal(self);
         return;
       }
     }
   }
-  // If we didn't wake any threads that were originally waiting on us,
-  // wake a contender.
-  monitor_contenders_.Signal(self);
   monitor_lock_.Unlock(self);
+  DCHECK(!monitor_lock_.IsExclusiveHeld(self));
 }
 
 void Monitor::Wait(Thread* self, int64_t ms, int32_t ns,
@@ -781,11 +819,8 @@
   DCHECK(self != nullptr);
   DCHECK(why == kTimedWaiting || why == kWaiting || why == kSleeping);
 
-  monitor_lock_.Lock(self);
-
   // Make sure that we hold the lock.
-  if (owner_ != self) {
-    monitor_lock_.Unlock(self);
+  if (owner_.load(std::memory_order_relaxed) != self) {
     ThrowIllegalMonitorStateExceptionF("object not locked by thread before wait()");
     return;
   }
@@ -798,23 +833,19 @@
 
   // Enforce the timeout range.
   if (ms < 0 || ns < 0 || ns > 999999) {
-    monitor_lock_.Unlock(self);
     self->ThrowNewExceptionF("Ljava/lang/IllegalArgumentException;",
                              "timeout arguments out of range: ms=%" PRId64 " ns=%d", ms, ns);
     return;
   }
 
+  CheckLockOwnerRequest(self);
+
   /*
    * Release our hold - we need to let it go even if we're a few levels
    * deep in a recursive lock, and we need to restore that later.
    */
-  int prev_lock_count = lock_count_;
+  unsigned int prev_lock_count = lock_count_;
   lock_count_ = 0;
-  owner_ = nullptr;
-  ArtMethod* saved_method = locking_method_;
-  locking_method_ = nullptr;
-  uintptr_t saved_dex_pc = locking_dex_pc_;
-  locking_dex_pc_ = 0;
 
   AtraceMonitorUnlock();  // For the implicit Unlock() just above. This will only end the deepest
                           // nesting, but that is enough for the visualization, and corresponds to
@@ -823,6 +854,9 @@
 
   bool was_interrupted = false;
   bool timed_out = false;
+  // Update monitor state now; it's not safe once we're "suspended".
+  owner_.store(nullptr, std::memory_order_relaxed);
+  num_waiters_.fetch_add(1, std::memory_order_relaxed);
   {
     // Update thread state. If the GC wakes up, it'll ignore us, knowing
     // that we won't touch any references in this state, and we'll check
@@ -840,8 +874,6 @@
      * until we've signalled contenders on this monitor.
      */
     AppendToWaitSet(self);
-    ++num_waiters_;
-
 
     // Set wait_monitor_ to the monitor object we will be waiting on. When wait_monitor_ is
     // non-null a notifying or interrupting thread must signal the thread's wait_cond_ to wake it
@@ -850,7 +882,8 @@
     self->SetWaitMonitor(this);
 
     // Release the monitor lock.
-    SignalContendersAndReleaseMonitorLock(self);
+    DCHECK(monitor_lock_.IsExclusiveHeld(self));
+    SignalWaiterAndReleaseMonitorLock(self);
 
     // Handle the case where the thread was interrupted before we called wait().
     if (self->IsInterrupted()) {
@@ -899,30 +932,18 @@
 
   // Re-acquire the monitor and lock.
   Lock<LockReason::kForWait>(self);
-  monitor_lock_.Lock(self);
+  lock_count_ = prev_lock_count;
+  DCHECK(monitor_lock_.IsExclusiveHeld(self));
   self->GetWaitMutex()->AssertNotHeld(self);
 
-  /*
-   * We remove our thread from wait set after restoring the count
-   * and owner fields so the subroutine can check that the calling
-   * thread owns the monitor. Aside from that, the order of member
-   * updates is not order sensitive as we hold the pthread mutex.
-   */
-  owner_ = self;
-  lock_count_ = prev_lock_count;
-  locking_method_ = saved_method;
-  locking_dex_pc_ = saved_dex_pc;
-  --num_waiters_;
+  num_waiters_.fetch_sub(1, std::memory_order_relaxed);
   RemoveFromWaitSet(self);
-
-  monitor_lock_.Unlock(self);
 }
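
To make the save/zero/restore protocol concrete, here is a toy recursive monitor (a sketch under invented names, not ART code). As in the patched Wait(), the recursion depth is saved and zeroed, ownership is cleared before blocking, and both are restored after the lock is reacquired.

#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>

class ToyMonitor {
 public:
  void Enter() {
    if (owner_.load(std::memory_order_relaxed) == std::this_thread::get_id()) {
      ++count_;  // Recursive acquisition: bump the depth; mu_ stays held.
      return;
    }
    mu_.lock();
    owner_.store(std::this_thread::get_id(), std::memory_order_relaxed);
    count_ = 0;  // Depth 0 == held exactly once, as in the patched Monitor.
  }
  void Exit() {
    if (count_ > 0) { --count_; return; }
    owner_.store(std::thread::id(), std::memory_order_relaxed);
    mu_.unlock();
  }
  void Wait() {
    unsigned saved = count_;                 // Save recursion depth...
    count_ = 0;
    owner_.store(std::thread::id(), std::memory_order_relaxed);
    std::unique_lock<std::mutex> lk(mu_, std::adopt_lock);
    cv_.wait(lk);                            // ...release fully while blocked...
    lk.release();                            // We own mu_ again; keep holding it.
    owner_.store(std::this_thread::get_id(), std::memory_order_relaxed);
    count_ = saved;                          // ...and restore the depth afterwards.
  }
  void NotifyOne() { cv_.notify_one(); }
 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::atomic<std::thread::id> owner_{std::thread::id()};
  unsigned count_ = 0;  // Excess holds beyond the first.
};
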
 
 void Monitor::Notify(Thread* self) {
   DCHECK(self != nullptr);
-  MutexLock mu(self, monitor_lock_);
   // Make sure that we hold the lock.
-  if (owner_ != self) {
+  if (owner_.load(std::memory_order_relaxed) != self) {
     ThrowIllegalMonitorStateExceptionF("object not locked by thread before notify()");
     return;
   }
@@ -937,9 +958,8 @@
 
 void Monitor::NotifyAll(Thread* self) {
   DCHECK(self != nullptr);
-  MutexLock mu(self, monitor_lock_);
   // Make sure that we hold the lock.
-  if (owner_ != self) {
+  if (owner_.load(std::memory_order_relaxed) != self) {
     ThrowIllegalMonitorStateExceptionF("object not locked by thread before notifyAll()");
     return;
   }
@@ -968,30 +988,18 @@
   if (lw.GetState() == LockWord::kFatLocked) {
     Monitor* monitor = lw.FatLockMonitor();
     DCHECK(monitor != nullptr);
-    MutexLock mu(self, monitor->monitor_lock_);
-    // Can't deflate if we have anybody waiting on the CV.
-    if (monitor->num_waiters_ > 0) {
+    // Can't deflate if we have anybody waiting on the CV or trying to acquire the monitor.
+    if (monitor->num_waiters_.load(std::memory_order_relaxed) > 0) {
       return false;
     }
-    Thread* owner = monitor->owner_;
-    if (owner != nullptr) {
-      // Can't deflate if we are locked and have a hash code.
-      if (monitor->HasHashCode()) {
-        return false;
-      }
-      // Can't deflate if our lock count is too high.
-      if (static_cast<uint32_t>(monitor->lock_count_) > LockWord::kThinLockMaxCount) {
-        return false;
-      }
-      // Deflate to a thin lock.
-      LockWord new_lw = LockWord::FromThinLockId(owner->GetThreadId(),
-                                                 monitor->lock_count_,
-                                                 lw.GCState());
-      // Assume no concurrent read barrier state changes as mutators are suspended.
-      obj->SetLockWord(new_lw, false);
-      VLOG(monitor) << "Deflated " << obj << " to thin lock " << owner->GetTid() << " / "
-          << monitor->lock_count_;
-    } else if (monitor->HasHashCode()) {
+    if (!monitor->monitor_lock_.ExclusiveTryLock(self)) {
+      // We cannot deflate a monitor that's currently held. It's unclear whether we should if
+      // we could.
+      return false;
+    }
+    DCHECK_EQ(monitor->lock_count_, 0u);
+    DCHECK_EQ(monitor->owner_.load(std::memory_order_relaxed), static_cast<Thread*>(nullptr));
+    if (monitor->HasHashCode()) {
       LockWord new_lw = LockWord::FromHashCode(monitor->GetHashCode(), lw.GCState());
       // Assume no concurrent read barrier state changes as mutators are suspended.
       obj->SetLockWord(new_lw, false);
@@ -1003,6 +1011,8 @@
       obj->SetLockWord(new_lw, false);
      VLOG(monitor) << "Deflated " << obj << " to empty lock word";
     }
+    monitor->monitor_lock_.ExclusiveUnlock(self);
+    DCHECK(!(monitor->monitor_lock_.IsExclusiveHeld(self)));
     // The monitor is deflated, mark the object as null so that we know to delete it during the
     // next GC.
     monitor->obj_ = GcRoot<mirror::Object>(nullptr);
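
The deflation gate above reduces to: no waiters or contenders, and the internal lock acquirable without blocking. A hedged sketch of that shape, assuming only the standard library; the types and names are invented.

#include <atomic>
#include <cstddef>
#include <mutex>

struct FatLock {
  std::mutex internal_lock;
  std::atomic<size_t> num_waiters{0};
};

// Refuse if anyone is waiting or contending, and only proceed if the internal
// lock can be taken without blocking, which, with mutators otherwise
// suspended, proves the monitor is unheld.
bool TryDeflate(FatLock& m) {
  if (m.num_waiters.load(std::memory_order_relaxed) > 0) {
    return false;  // Waiters or contenders would lose their queue state.
  }
  if (!m.internal_lock.try_lock()) {
    return false;  // Currently held; we do not deflate held monitors.
  }
  // ... rewrite the object's lock word to a thin/hash/empty word here ...
  m.internal_lock.unlock();
  return true;
}
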
@@ -1088,6 +1098,10 @@
   size_t contention_count = 0;
   StackHandleScope<1> hs(self);
   Handle<mirror::Object> h_obj(hs.NewHandle(obj));
+#if !ART_USE_FUTEXES
+  // In this case we cannot inflate an unowned monitor, so we sometimes defer inflation.
+  bool should_inflate = false;
+#endif
   while (true) {
     // We initially read the lockword with ordinary Java/relaxed semantics. When stronger
     // semantics are needed, we address it below. Since GetLockWord bottoms out to a relaxed load,
@@ -1098,6 +1112,11 @@
         // No ordering required for preceding lockword read, since we retest.
         LockWord thin_locked(LockWord::FromThinLockId(thread_id, 0, lock_word.GCState()));
         if (h_obj->CasLockWord(lock_word, thin_locked, CASMode::kWeak, std::memory_order_acquire)) {
+#if !ART_USE_FUTEXES
+          if (should_inflate) {
+            InflateThinLocked(self, h_obj, lock_word, 0);
+          }
+#endif
           AtraceMonitorLock(self, h_obj.Get(), /* is_wait= */ false);
           return h_obj.Get();  // Success!
         }
@@ -1152,9 +1171,16 @@
             // of nanoseconds or less.
             sched_yield();
           } else {
+#if ART_USE_FUTEXES
             contention_count = 0;
             // No ordering required for initial lockword read. Install rereads it anyway.
             InflateThinLocked(self, h_obj, lock_word, 0);
+#else
+            // Can't inflate from non-owning thread. Keep waiting. Bad for power, but this code
+            // isn't used on-device.
+            should_inflate = true;
+            usleep(10);
+#endif
           }
         }
         continue;  // Start from the beginning.
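
A sketch of this acquisition loop, including the non-futex deferred-inflation fallback. The single-word lock encoding and all names are simplifying assumptions, not ART's LockWord format.

#include <atomic>
#include <chrono>
#include <cstdint>
#include <thread>

// Toy lock word: 0 = unlocked, otherwise the owner's thread id.
std::atomic<uint32_t> lock_word{0};

void ToyMonitorEnter(uint32_t self_id, bool futexes_available) {
  bool should_inflate = false;
  while (true) {
    uint32_t expected = 0;
    if (lock_word.compare_exchange_weak(expected, self_id,
                                        std::memory_order_acquire,
                                        std::memory_order_relaxed)) {
      if (should_inflate) {
        // Deferred path: we own the thin lock now, so inflation is legal.
        // The analogue of InflateThinLocked() would go here.
      }
      return;  // Success.
    }
    if (futexes_available) {
      // Inflate the contended lock from this non-owning thread. (Elided.)
    } else {
      should_inflate = true;  // Remember to inflate once we own it.
      std::this_thread::sleep_for(std::chrono::microseconds(10));
    }
  }
}
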
@@ -1168,6 +1194,7 @@
           return mon->TryLock(self) ? h_obj.Get() : nullptr;
         } else {
           mon->Lock(self);
+          DCHECK(mon->monitor_lock_.IsExclusiveHeld(self));
           return h_obj.Get();  // Success!
         }
       }
@@ -1531,8 +1558,7 @@
 }
 
 bool Monitor::IsLocked() REQUIRES_SHARED(Locks::mutator_lock_) {
-  MutexLock mu(Thread::Current(), monitor_lock_);
-  return owner_ != nullptr;
+  return GetOwner() != nullptr;
 }
 
 void Monitor::TranslateLocation(ArtMethod* method,
@@ -1553,8 +1579,9 @@
 }
 
 uint32_t Monitor::GetOwnerThreadId() {
-  MutexLock mu(Thread::Current(), monitor_lock_);
-  Thread* owner = owner_;
+  // Make sure owner is not deallocated during access.
+  MutexLock mu(Thread::Current(), *Locks::thread_list_lock_);
+  Thread* owner = GetOwner();
   if (owner != nullptr) {
     return owner->GetThreadId();
   } else {
@@ -1682,14 +1709,16 @@
       break;
     case LockWord::kFatLocked: {
       Monitor* mon = lock_word.FatLockMonitor();
-      owner_ = mon->owner_;
+      owner_ = mon->owner_.load(std::memory_order_relaxed);
       // Here it is okay for the owner to be null since we don't reset the LockWord back to
       // kUnlocked until we get a GC. In cases where this hasn't happened yet we will have a fat
       // lock without an owner.
+      // Neither owner_ nor entry_count_ is touched by threads in "suspended" state, so
+      // we must see consistent values.
       if (owner_ != nullptr) {
         entry_count_ = 1 + mon->lock_count_;
       } else {
-        DCHECK_EQ(mon->lock_count_, 0) << "Monitor is fat-locked without any owner!";
+        DCHECK_EQ(mon->lock_count_, 0u) << "Monitor is fat-locked without any owner!";
       }
       for (Thread* waiter = mon->wait_set_; waiter != nullptr; waiter = waiter->GetWaitNext()) {
         waiters_.push_back(waiter);
diff --git a/runtime/monitor.h b/runtime/monitor.h
index 4187f27..0714da1 100644
--- a/runtime/monitor.h
+++ b/runtime/monitor.h
@@ -21,6 +21,7 @@
 #include <stdint.h>
 #include <stdlib.h>
 
+#include <atomic>
 #include <iosfwd>
 #include <list>
 #include <vector>
@@ -129,12 +130,14 @@
 
   void SetObject(ObjPtr<mirror::Object> object);
 
-  Thread* GetOwner() const NO_THREAD_SAFETY_ANALYSIS {
-    return owner_;
+  // Provides no memory ordering guarantees.
+  Thread* GetOwner() const {
+    return owner_.load(std::memory_order_relaxed);
   }
 
   int32_t GetHashCode();
 
+  // Is the monitor currently locked? Debug only, provides no memory ordering guarantees.
   bool IsLocked() REQUIRES_SHARED(Locks::mutator_lock_) REQUIRES(!monitor_lock_);
 
   bool HasHashCode() const {
@@ -176,7 +179,7 @@
       REQUIRES_SHARED(Locks::mutator_lock_);
 
   // Install the monitor into its object, may fail if another thread installs a different monitor
-  // first.
+  // first. The monitor remains in the same logical state as before, i.e. held the same number of
+  // times.
   bool Install(Thread* self)
       REQUIRES(!monitor_lock_)
       REQUIRES_SHARED(Locks::mutator_lock_);
@@ -189,7 +192,10 @@
   // this routine.
   void RemoveFromWaitSet(Thread* thread) REQUIRES(monitor_lock_);
 
-  void SignalContendersAndReleaseMonitorLock(Thread* self) RELEASE(monitor_lock_);
+  // Release the monitor lock and signal a waiting thread that has been notified and now needs the
+  // lock. Assumes the monitor lock is held exactly once, and the owner_ field has been reset to
+  // null. Caller may be suspended (Wait) or runnable (MonitorExit).
+  void SignalWaiterAndReleaseMonitorLock(Thread* self) RELEASE(monitor_lock_);
 
   // Changes the shape of a monitor from thin to fat, preserving the internal lock state. The
   // calling thread must own the lock or the owner must be suspended. There's a race with other
@@ -210,37 +216,33 @@
                            uint32_t expected_owner_thread_id,
                            uint32_t found_owner_thread_id,
                            Monitor* mon)
-      REQUIRES(!Locks::thread_list_lock_,
-               !monitor_lock_)
+      REQUIRES(!Locks::thread_list_lock_)
       REQUIRES_SHARED(Locks::mutator_lock_);
 
   // Try to lock without blocking, returns true if we acquired the lock.
-  bool TryLock(Thread* self)
-      REQUIRES(!monitor_lock_)
-      REQUIRES_SHARED(Locks::mutator_lock_);
-  // Variant for already holding the monitor lock.
-  bool TryLockLocked(Thread* self)
-      REQUIRES(monitor_lock_)
+  // If spin is true, then we spin for a short period before failing.
+  bool TryLock(Thread* self, bool spin = false)
+      TRY_ACQUIRE(true, monitor_lock_)
       REQUIRES_SHARED(Locks::mutator_lock_);
 
   template<LockReason reason = LockReason::kForLock>
   void Lock(Thread* self)
-      REQUIRES(!monitor_lock_)
+      ACQUIRE(monitor_lock_)
       REQUIRES_SHARED(Locks::mutator_lock_);
 
   bool Unlock(Thread* thread)
-      REQUIRES(!monitor_lock_)
+      RELEASE(monitor_lock_)
       REQUIRES_SHARED(Locks::mutator_lock_);
 
   static void DoNotify(Thread* self, ObjPtr<mirror::Object> obj, bool notify_all)
       REQUIRES_SHARED(Locks::mutator_lock_) NO_THREAD_SAFETY_ANALYSIS;  // For mon->Notify.
 
   void Notify(Thread* self)
-      REQUIRES(!monitor_lock_)
+      REQUIRES(monitor_lock_)
       REQUIRES_SHARED(Locks::mutator_lock_);
 
   void NotifyAll(Thread* self)
-      REQUIRES(!monitor_lock_)
+      REQUIRES(monitor_lock_)
       REQUIRES_SHARED(Locks::mutator_lock_);
 
   static std::string PrettyContentionInfo(const std::string& owner_name,
@@ -270,7 +272,7 @@
   // Since we're allowed to wake up "early", we clamp extremely long durations to return at the end
   // of the 32-bit time epoch.
   void Wait(Thread* self, int64_t msec, int32_t nsec, bool interruptShouldThrow, ThreadState why)
-      REQUIRES(!monitor_lock_)
+      REQUIRES(monitor_lock_)
       REQUIRES_SHARED(Locks::mutator_lock_);
 
   // Translates the provided method and pc into its declaring class' source file and line number.
@@ -279,8 +281,18 @@
                                 int32_t* line_number)
       REQUIRES_SHARED(Locks::mutator_lock_);
 
+  // Provides no memory ordering guarantees.
   uint32_t GetOwnerThreadId() REQUIRES(!monitor_lock_);
 
+  // Set lock_owner_method_ and lock_owner_dex_pc_ corresponding to owner's current stack.
+  // owner is either self or suspended.
+  void SetLockingMethod(Thread* owner) REQUIRES(monitor_lock_)
+      REQUIRES_SHARED(Locks::mutator_lock_);
+
+  // The same, but without checking for a proxy method. Currently requires owner == self.
+  void SetLockingMethodNoProxy(Thread* owner) REQUIRES(monitor_lock_)
+      REQUIRES_SHARED(Locks::mutator_lock_);
+
   // Support for systrace output of monitor operations.
   ALWAYS_INLINE static void AtraceMonitorLock(Thread* self,
                                               ObjPtr<mirror::Object> obj,
@@ -294,19 +306,27 @@
 
   static uint32_t lock_profiling_threshold_;
   static uint32_t stack_dump_lock_profiling_threshold_;
+  static bool capture_method_eagerly_;
 
+  // Holding the monitor N times is represented by holding monitor_lock_ N times.
   Mutex monitor_lock_ DEFAULT_MUTEX_ACQUIRED_AFTER;
 
-  ConditionVariable monitor_contenders_ GUARDED_BY(monitor_lock_);
+  // Pretend to release monitor_lock_, for the benefit of thread-safety analysis; a no-op at
+  // runtime.
+  void FakeUnlockMonitorLock() RELEASE(monitor_lock_) NO_THREAD_SAFETY_ANALYSIS {}
 
-  // Number of people waiting on the condition.
-  size_t num_waiters_ GUARDED_BY(monitor_lock_);
+  // Number of threads either waiting on the condition or waiting on a contended
+  // monitor acquisition. Prevents deflation.
+  std::atomic<size_t> num_waiters_;
 
-  // Which thread currently owns the lock?
-  Thread* volatile owner_ GUARDED_BY(monitor_lock_);
+  // Which thread currently owns the lock? monitor_lock_ itself records only the owner's tid.
+  // Only set while holding monitor_lock_. Non-locking readers only use it to
+  // compare to self or for debugging.
+  std::atomic<Thread*> owner_;
 
-  // Owner's recursive lock depth.
-  int lock_count_ GUARDED_BY(monitor_lock_);
+  // Owner's recursive lock depth. owner_ non-null and lock_count_ == 0 ==> held once.
+  unsigned int lock_count_ GUARDED_BY(monitor_lock_);
 
   // What object are we part of. This is a weak root. Do not access
   // this directly, use GetObject() to read it so it will be guarded
@@ -322,11 +342,76 @@
   // Stored object hash code, generated lazily by GetHashCode.
   AtomicInteger hash_code_;
 
-  // Method and dex pc where the lock owner acquired the lock, used when lock
-  // sampling is enabled. locking_method_ may be null if the lock is currently
-  // unlocked, or if the lock is acquired by the system when the stack is empty.
-  ArtMethod* locking_method_ GUARDED_BY(monitor_lock_);
-  uint32_t locking_dex_pc_ GUARDED_BY(monitor_lock_);
+  // Data structure used to remember the method and dex pc of a recent holder of the
+  // lock. Used for tracing and contention reporting. Setting these is expensive, since it
+  // involves a partial stack walk. We set them only as follows, to minimize the cost:
+  // - If tracing is enabled, they are needed immediately when we first notice contention, so we
+  //   set them unconditionally when a monitor is acquired.
+  // - If contention reporting is enabled, we use the lock_owner_request_ field to have the
+  //   contending thread request them. The current owner then sets them when releasing the monitor,
+  //   making them available when the contending thread acquires the monitor.
+  // - If both are enabled, we blindly do both. This usually prevents us from switching between
+  //   reporting the end and beginning of critical sections for contention logging when tracing is
+  //   enabled.  We expect that tracing overhead is normally much higher than for contention
+  //   logging, so the added cost should be small. It also minimizes glitches when enabling and
+  //   disabling traces.
+  // We're tolerant of missing information. E.g. when tracing is initially turned on, we may
+  // not have the lock holder information if the holder acquired the lock with tracing off.
+  //
+  // We make this data unconditionally atomic; for contention logging all accesses are in fact
+  // protected by the monitor, but for tracing, reads are not. Writes are always
+  // protected by the monitor.
+  //
+  // The fields are always accessed without memory ordering. We store a checksum, and reread if
+  // the checksum doesn't correspond to the values.  This results in values that are correct with
+  // very high probability, but not certainty.
+  //
+  // If we need lock_owner information for a certain thread for contention logging, we store its
+  // tid in lock_owner_request_. To satisfy the request, we store lock_owner_tid_,
+  // lock_owner_method_, and lock_owner_dex_pc_ and the corresponding checksum while holding the
+  // monitor.
+  //
+  // At all times, either lock_owner_ is zero, the checksum is valid, or a thread is actively
+  // in the process of establishing one of those states. Only one thread at a time can be actively
+  // establishing such a state, since writes are protected by the monitor.
+  std::atomic<Thread*> lock_owner_;  // *lock_owner_ may no longer exist!
+  std::atomic<ArtMethod*> lock_owner_method_;
+  std::atomic<uint32_t> lock_owner_dex_pc_;
+  std::atomic<uintptr_t> lock_owner_sum_;
+
+  // Request lock owner save method and dex_pc. Written asynchronously.
+  std::atomic<Thread*> lock_owner_request_;
+
+  // Compute method, dex pc, and tid "checksum".
+  uintptr_t LockOwnerInfoChecksum(ArtMethod* m, uint32_t dex_pc, Thread* t);
+
+  // Set owning method, dex pc, and tid. owner_ field is set and points to current thread.
+  void SetLockOwnerInfo(ArtMethod* method, uint32_t dex_pc, Thread* t)
+      REQUIRES(monitor_lock_);
+
+  // Get owning method and dex pc for the given thread, if available.
+  void GetLockOwnerInfo(/*out*/ArtMethod** method, /*out*/uint32_t* dex_pc, Thread* t);
+
+  // Do the same, while holding the monitor. There are no concurrent updates.
+  void GetLockOwnerInfoLocked(/*out*/ArtMethod** method, /*out*/uint32_t* dex_pc,
+                              uint32_t thread_id)
+      REQUIRES(monitor_lock_);
+
+  // We never clear lock_owner method and dex pc. Since it often reflects
+  // ownership when we last detected contention, it may be inconsistent with owner_
+  // and not 100% reliable. For lock contention monitoring, in the absence of tracing,
+  // there is a small risk that the current owner may finish before noticing the request,
+  // or the information will be overwritten by another intervening request and monitor
+  // release, so it's also not 100% reliable. But if we report information at all, it
+  // should generally (modulo accidental checksum matches) pertain to an acquisition of the
+  // right monitor by the right thread, so it's extremely unlikely to be seriously misleading.
+  // Since we track threads by a pointer to the Thread structure, there is a small chance we may
+  // confuse threads allocated at the same exact address, if a contending thread dies before
+  // we inquire about it.
+
+  // Check for and act on a pending lock_owner_request_.
+  void CheckLockOwnerRequest(Thread* self)
+      REQUIRES(monitor_lock_) REQUIRES_SHARED(Locks::mutator_lock_);
 
   // The denser encoded version of this monitor as stored in the lock word.
   MonitorId monitor_id_;
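
The checksum scheme described in the comment block above can be sketched as follows. This is a minimal illustration with an invented Checksum() over two fields; the real code covers method, dex pc, and tid.

#include <atomic>
#include <cstdint>

// Writers (holding the monitor) store the fields plus a checksum; lock-free
// readers reread until the checksum matches, yielding values that are
// consistent with very high probability, but not certainty.
std::atomic<uintptr_t> owner_tid{0};
std::atomic<uintptr_t> owner_pc{0};
std::atomic<uintptr_t> owner_sum{0};

uintptr_t Checksum(uintptr_t tid, uintptr_t pc) { return tid ^ (pc << 1); }

void WriteOwnerInfo(uintptr_t tid, uintptr_t pc) {  // Caller holds the monitor.
  owner_tid.store(tid, std::memory_order_relaxed);
  owner_pc.store(pc, std::memory_order_relaxed);
  owner_sum.store(Checksum(tid, pc), std::memory_order_relaxed);
}

bool ReadOwnerInfo(uintptr_t* tid, uintptr_t* pc) {  // May run without the monitor.
  for (int i = 0; i < 100; ++i) {  // Bounded retries; may racily fail.
    uintptr_t t = owner_tid.load(std::memory_order_relaxed);
    uintptr_t p = owner_pc.load(std::memory_order_relaxed);
    if (owner_sum.load(std::memory_order_relaxed) == Checksum(t, p)) {
      *tid = t;
      *pc = p;
      return true;
    }
  }
  return false;
}
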
diff --git a/runtime/runtime.cc b/runtime/runtime.cc
index a500d9f..fc589e1 100644
--- a/runtime/runtime.cc
+++ b/runtime/runtime.cc
@@ -416,7 +416,7 @@
   // Shutdown any trace running.
   Trace::Shutdown();
 
-  // Report death. Clients me require a working thread, still, so do it before GC completes and
+  // Report death. Clients may require a working thread, still, so do it before GC completes and
   // all non-daemon threads are done.
   {
     ScopedObjectAccess soa(self);
@@ -439,9 +439,14 @@
 
   // Make sure our internal threads are dead before we start tearing down things they're using.
   GetRuntimeCallbacks()->StopDebugger();
+  // Deletion ordering is tricky. Null out everything we've deleted.
   delete signal_catcher_;
+  signal_catcher_ = nullptr;
 
   // Make sure all other non-daemon threads have terminated, and all daemon threads are suspended.
+  // Also wait for daemon threads to quiesce, so that in addition to being "suspended", they
+  // no longer access monitor and thread list data structures. We leak user daemon threads
+  // themselves, since we have no mechanism for shutting them down.
   {
     ScopedTrace trace2("Delete thread list");
     thread_list_->ShutDown();
@@ -458,7 +463,10 @@
   }
 
   // Finally delete the thread list.
+  // Thread_list_ can be accessed by "suspended" threads, e.g. in InflateThinLocked.
+  // We assume that by this point, we've waited long enough for things to quiesce.
   delete thread_list_;
+  thread_list_ = nullptr;
 
   // Delete the JIT after thread list to ensure that there is no remaining threads which could be
   // accessing the instrumentation when we delete it.
@@ -473,11 +481,17 @@
 
   ScopedTrace trace2("Delete state");
   delete monitor_list_;
+  monitor_list_ = nullptr;
   delete monitor_pool_;
+  monitor_pool_ = nullptr;
   delete class_linker_;
+  class_linker_ = nullptr;
   delete heap_;
+  heap_ = nullptr;
   delete intern_table_;
+  intern_table_ = nullptr;
   delete oat_file_manager_;
+  oat_file_manager_ = nullptr;
   Thread::Shutdown();
   QuasiAtomic::Shutdown();
   verifier::ClassVerifier::Shutdown();
diff --git a/runtime/thread_list.cc b/runtime/thread_list.cc
index d7b12de..8917560 100644
--- a/runtime/thread_list.cc
+++ b/runtime/thread_list.cc
@@ -1351,15 +1351,21 @@
       thread->GetJniEnv()->SetFunctionsToRuntimeShutdownFunctions();
     }
   }
-  // If we have any daemons left, wait 200ms to ensure they are not stuck in a place where they
-  // are about to access runtime state and are not in a runnable state. Examples: Monitor code
-  // or waking up from a condition variable. TODO: Try and see if there is a better way to wait
-  // for daemon threads to be in a blocked state.
-  if (daemons_left > 0) {
-    static constexpr size_t kDaemonSleepTime = 200 * 1000;
-    usleep(kDaemonSleepTime);
+  if (daemons_left == 0) {
+    // No threads left; safe to shut down.
+    return;
   }
-  // Give the threads a chance to suspend, complaining if they're slow.
+  // There is not a clean way to shut down if we have daemons left. We have no mechanism for
+  // killing them and reclaiming thread stacks. We also have no mechanism for waiting until they
+  // have truly finished touching the memory we are about to deallocate. We do the best we can with
+  // timeouts.
+  //
+  // If we have any daemons left, wait until they are (a) suspended and (b) no longer stuck
+  // in a place where they are about to access runtime state while not in a runnable state.
+  // We attempt to do the latter by just waiting long enough for things to
+  // quiesce. Examples: Monitor code or waking up from a condition variable.
+  //
+  // Give the threads a chance to suspend, complaining if they're slow. (a)
   bool have_complained = false;
   static constexpr size_t kTimeoutMicroseconds = 2000 * 1000;
   static constexpr size_t kSleepMicroseconds = 1000;
@@ -1378,6 +1384,26 @@
       }
     }
     if (all_suspended) {
+      // Wait again for all the now "suspended" threads to actually quiesce. (b)
+      static constexpr size_t kDaemonSleepTime = 200 * 1000;
+      usleep(kDaemonSleepTime);
+      std::list<Thread*> list_copy;
+      {
+        MutexLock mu(self, *Locks::thread_list_lock_);
+        // Half-way through the wait, set the "runtime deleted" flag, causing any newly awoken
+        // threads to immediately go back to sleep without touching memory. This prevents us from
+        // touching deallocated memory, but it also prevents mutexes from getting released. Thus we
+        // only do this once we're reasonably sure that no system mutexes are still held.
+        for (const auto& thread : list_) {
+          DCHECK(thread == self || thread->GetState() != kRunnable);
+          thread->GetJniEnv()->SetRuntimeDeleted();
+          // Possibly contended Mutex acquisitions are unsafe after this.
+          // Releasing thread_list_lock_ is OK, since it can't block.
+        }
+      }
+      // Finally, wait for any threads that woke up before we set the "runtime deleted" flags to
+      // finish touching memory.
+      usleep(kDaemonSleepTime);
       return;
     }
     usleep(kSleepMicroseconds);
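
The two-stage quiesce above can be pictured with a toy analogue. This is a sketch; the single global flag stands in for the per-JNIEnv "runtime deleted" flags, and the timeouts mirror kDaemonSleepTime.

#include <atomic>
#include <chrono>
#include <thread>

std::atomic<bool> runtime_deleted{false};  // Stand-in for the per-JNIEnv flag.

void DaemonLoop() {
  while (true) {
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
    if (runtime_deleted.load(std::memory_order_relaxed)) {
      // Analogue of SleepIfRuntimeDeleted(): never touch runtime state again.
      while (true) { std::this_thread::sleep_for(std::chrono::hours(24)); }
    }
    // ... the daemon would touch runtime state here ...
  }
}

void WaitForDaemonsToQuiesce() {
  constexpr auto kDaemonSleepTime = std::chrono::milliseconds(200);
  std::this_thread::sleep_for(kDaemonSleepTime);  // (1) Let held mutexes drain.
  runtime_deleted.store(true, std::memory_order_relaxed);
  std::this_thread::sleep_for(kDaemonSleepTime);  // (2) Let earlier wakeups finish.
  // Runtime data structures may now be torn down.
}
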
diff --git a/runtime/thread_list.h b/runtime/thread_list.h
index 4ea0995..7a3c26b 100644
--- a/runtime/thread_list.h
+++ b/runtime/thread_list.h
@@ -104,6 +104,10 @@
   // Find an existing thread (or self) by its thread id (not tid).
   Thread* FindThreadByThreadId(uint32_t thread_id) REQUIRES(Locks::thread_list_lock_);
 
+  // Does the thread list still contain the given thread, or one at the same address?
+  // Used by Monitor to provide (mostly accurate) debugging information.
+  bool Contains(Thread* thread) REQUIRES(Locks::thread_list_lock_);
+
   // Run a checkpoint on threads, running threads are not suspended but run the checkpoint inside
   // of the suspend check. Returns how many checkpoints that are expected to run, including for
   // already suspended threads for b/24191051. Run the callback, if non-null, inside the
@@ -197,7 +201,6 @@
   uint32_t AllocThreadId(Thread* self);
   void ReleaseThreadId(Thread* self, uint32_t id) REQUIRES(!Locks::allocated_thread_ids_lock_);
 
-  bool Contains(Thread* thread) REQUIRES(Locks::thread_list_lock_);
   bool Contains(pid_t tid) REQUIRES(Locks::thread_list_lock_);
   size_t RunCheckpoint(Closure* checkpoint_function, bool includeSuspended)
       REQUIRES(!Locks::thread_list_lock_, !Locks::thread_suspend_count_lock_);
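
The newly public Contains() supports validating a possibly-stale Thread* before dereferencing it, as GetOwnerThreadId() now does under thread_list_lock_. Roughly, as a sketch with invented names:

#include <algorithm>
#include <cstdint>
#include <list>
#include <mutex>

struct Thread { uint32_t thread_id; };

std::mutex thread_list_lock;   // Stand-in for Locks::thread_list_lock_.
std::list<Thread*> thread_list;

// Returns the id behind a potentially stale Thread*, or 0 if no registered
// thread lives at that address; holding the list lock keeps the match valid.
uint32_t SafeGetThreadId(Thread* maybe_stale) {
  std::lock_guard<std::mutex> lg(thread_list_lock);
  bool contains = std::find(thread_list.begin(), thread_list.end(), maybe_stale)
                  != thread_list.end();
  return contains ? maybe_stale->thread_id : 0;
}
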
diff --git a/test/2029-contended-monitors/expected.txt b/test/2029-contended-monitors/expected.txt
new file mode 100644
index 0000000..1894ad7
--- /dev/null
+++ b/test/2029-contended-monitors/expected.txt
@@ -0,0 +1,10 @@
+Starting
+Atomic increments
+Hold time 2, shared lock
+Hold time 20, shared lock
+Hold time 200, shared lock
+Hold time 2000, shared lock
+Hold time 20000, shared lock
+Hold time 200000, shared lock
+Hold for 2 msecs while sleeping, shared lock
+Hold for 2 msecs while sleeping, private lock
diff --git a/test/2029-contended-monitors/info.txt b/test/2029-contended-monitors/info.txt
new file mode 100644
index 0000000..f6ccdd3
--- /dev/null
+++ b/test/2029-contended-monitors/info.txt
@@ -0,0 +1,4 @@
+Checks that monitor-protected increments at various granularities are indeed
+atomic. Also checks j.u.c. increments. Can be configured to print execution
+times for contended and uncontended monitor acquisition under different
+circumstances.
diff --git a/test/2029-contended-monitors/src/Main.java b/test/2029-contended-monitors/src/Main.java
new file mode 100644
index 0000000..78d2ae4
--- /dev/null
+++ b/test/2029-contended-monitors/src/Main.java
@@ -0,0 +1,195 @@
+/*
+ * Copyright (C) 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class Main {
+
+  private final boolean PRINT_TIMES = false;  // False for use as run test.
+
+  // Number of increments done by each thread.  Must be multiple of largest hold time below,
+  // times any possible thread count. Finishes much faster when used as run test.
+  private final int TOTAL_ITERS = PRINT_TIMES ? 16_000_000 : 1_600_000;
+  private final int MAX_HOLD_TIME = PRINT_TIMES ? 2_000_000 : 200_000;
+
+  private int counter;
+
+  private AtomicInteger atomicCounter = new AtomicInteger();
+
+  private Object lock;
+
+  private int currentThreadCount = 0;
+
+  // A function such that if we repeatedly apply it to -1, the value oscillates
+  // between -1 and 3. Thus the average value is 1.
+  // This is designed to make it hard for the compiler to predict the values in
+  // the sequence.
+  private int nextInt(int x) {
+    if (x < 0) {
+      return x * x + 2;
+    } else {
+      return x - 4;
+    }
+  }
+
+  // Increment counter by n, holding the lock for a time roughly proportional to n.
+  // n must be even.
+  private void holdFor(Object lock, int n) {
+    synchronized(lock) {
+      int y = -1;
+      for (int i = 0; i < n; ++i) {
+        counter += y;
+        y = nextInt(y);
+      }
+    }
+  }
+
+  private class RepeatedLockHolder implements Runnable {
+    RepeatedLockHolder(boolean shared, int n /* even */) {
+      sharedLock = shared;
+      holdTime = n;
+    }
+    @Override
+    public void run() {
+      Object myLock = sharedLock ? lock : new Object();
+      int nIters = TOTAL_ITERS / currentThreadCount / holdTime;
+      for (int i = 0; i < nIters; ++i) {
+        holdFor(myLock, holdTime);
+      }
+    }
+    private boolean sharedLock;
+    private int holdTime;
+  }
+
+  private class SleepyLockHolder implements Runnable {
+    SleepyLockHolder(boolean shared) {
+      sharedLock = shared;
+    }
+    @Override
+    public void run() {
+      Object myLock = sharedLock ? lock : new Object();
+      int nIters = TOTAL_ITERS / currentThreadCount / 10_000;
+      for (int i = 0; i < nIters; ++i) {
+        synchronized(myLock) {
+          try {
+            Thread.sleep(2);
+          } catch(InterruptedException e) {
+            throw new AssertionError("Unexpected interrupt");
+          }
+          counter += 10_000;
+        }
+      }
+    }
+    private boolean sharedLock;
+  }
+
+  // Increment atomicCounter n times, on average by 1 each time.
+  private class RepeatedIncrementer implements Runnable {
+    @Override
+    public void run() {
+      int y = -1;
+      int nIters = TOTAL_ITERS / currentThreadCount;
+      for (int i = 0; i < nIters; ++i) {
+        atomicCounter.addAndGet(y);
+        y = nextInt(y);
+      }
+    }
+  }
+
+  // Run n threads doing work. Return the elapsed time this took, in milliseconds.
+  private long runMultiple(int n, Runnable work) {
+    Thread[] threads = new Thread[n];
+    // Replace lock, so that we start with a clean, uninflated lock each time.
+    lock = new Object();
+    for (int i = 0; i < n; ++i) {
+      threads[i] = new Thread(work);
+    }
+    long startTime = System.currentTimeMillis();
+    for (int i = 0; i < n; ++i) {
+      threads[i].start();
+    }
+    for (int i = 0; i < n; ++i) {
+      try {
+        threads[i].join();
+      } catch(InterruptedException e) {
+        throw new AssertionError("Unexpected interrupt");
+      }
+    }
+    return System.currentTimeMillis() - startTime;
+  }
+
+  // Run on different numbers of threads.
+  private void runAll(Runnable work, Runnable init, Runnable checker) {
+    for (int i = 1; i <= 8; i *= 2) {
+      currentThreadCount = i;
+      init.run();
+      long time = runMultiple(i, work);
+      if (PRINT_TIMES) {
+        System.out.print(time + (i == 8 ? "\n" : "\t"));
+      }
+      checker.run();
+    }
+  }
+
+  private class CheckAtomicCounter implements Runnable {
+    @Override
+    public void run() {
+      if (atomicCounter.get() != TOTAL_ITERS) {
+        throw new AssertionError("Failed atomicCounter postcondition check for "
+            + currentThreadCount + " threads");
+      }
+    }
+  }
+
+  private class CheckCounter implements Runnable {
+    @Override
+    public void run() {
+      if (counter != TOTAL_ITERS) {
+        throw new AssertionError("Failed counter postcondition check for "
+            + currentThreadCount + " threads");
+      }
+    }
+  }
+
+  private void run() {
+    if (PRINT_TIMES) {
+      System.out.println("All times in milliseconds for 1, 2, 4 and 8 threads");
+    }
+    System.out.println("Atomic increments");
+    runAll(new RepeatedIncrementer(), () -> { atomicCounter.set(0); }, new CheckAtomicCounter());
+    for (int i = 2; i <= MAX_HOLD_TIME; i *= 10) {
+      // i * 8 (max thread count) divides TOTAL_ITERS
+      System.out.println("Hold time " + i + ", shared lock");
+      runAll(new RepeatedLockHolder(true, i), () -> { counter = 0; }, new CheckCounter());
+    }
+    if (PRINT_TIMES) {
+      for (int i = 2; i <= MAX_HOLD_TIME; i *= 1000) {
+        // i divides TOTAL_ITERS
+        System.out.println("Hold time " + i + ", private lock");
+        // Since there is no mutual exclusion, the final counter value is unpredictable.
+        runAll(new RepeatedLockHolder(false, i), () -> { counter = 0; }, () -> {});
+      }
+    }
+    System.out.println("Hold for 2 msecs while sleeping, shared lock");
+    runAll(new SleepyLockHolder(true), () -> { counter = 0; }, new CheckCounter());
+    System.out.println("Hold for 2 msecs while sleeping, private lock");
+    runAll(new SleepyLockHolder(false), () -> { counter = 0; }, () -> {});
+  }
+
+  public static void main(String[] args) {
+    System.out.println("Starting");
+    new Main().run();
+  }
+}
diff --git a/test/knownfailures.json b/test/knownfailures.json
index 02705bc..559bee7 100644
--- a/test/knownfailures.json
+++ b/test/knownfailures.json
@@ -1289,6 +1289,12 @@
         "tests": ["909-attach-agent", "126-miranda-multidex"],
         "zipapex": true,
         "bug": "b/135507613",
-        "description": ["These tests run dalvikvm multiple times, this can messup the zipapex runner"]
+        "description": ["These tests run dalvikvm multiple times, this can mess up the",
+                        "zipapex runner."]
+    },
+    {
+        "tests": ["2029-contended-monitors"],
+        "variant": "interpreter | interp-ac | gcstress | trace",
+        "description": ["Slow test. Prone to timeouts."]
     }
 ]