sync: Don't transfer waiters from Condvar -> Mutex

A performance optimization should never change the observable behavior,
and yet that's what this one did. Canceling a `cv.wait()` call after
the waiter was already transferred to the Mutex's wait list should
still result in waking up the next waiter in the Condvar's wait list.
Instead, the `cancel_after_transfer` test was checking for the opposite
behavior.
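
As a minimal sketch of the expected semantics, reusing the `dec` helper
and harness from the unit tests below (an illustration, not new test
code):

    // Both futures poll to Pending and join the Condvar's wait list.
    let mut fut1 = Box::pin(dec(mu.clone(), cv.clone()));
    let mut fut2 = Box::pin(dec(mu.clone(), cv.clone()));
    // ... poll fut1 and fut2 once ...

    let count = block_on(mu.lock());
    cv.notify_one(); // Selects fut1; with the optimization this also
                     // transferred it to the Mutex's wait list.
    mem::drop(count);
    mem::drop(fut1); // Cancel fut1's wait.

    // The notification must not be lost: fut2 should now be able to
    // complete without a second call to cv.notify_one().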

Additionally, the transfer was racy with concurrent cancellation.
Consider the following sequence of events:

Thread A                            Thread B
--------                            --------

drop WaitFuture                     cv.notify_all()
waiter.cancel.lock()                raw_mutex.transfer_waiters()
c = cancel.c
data = cancel.data
waiter.cancel.unlock()
                                    waiter.cancel.lock()
                                    cancel.c = mu_cancel_waiter
                                    cancel.data = mutex_ptr
                                    waiter.cancel.unlock()
                                    waiter.is_waiting_for = Mutex
                                    mu.unlock_slow()
                                    get_wake_list()
                                    waiter.is_waiting_for = None
                                    waiter.wake()
c(data, waiter, false)
cancel_waiter(cv, waiter, false)
waiter.is_waiting_for == None
get_wake_list()

There are two issues in the above sequence:

1. Thread A has stale information about the state of the waiter.  Since
   the waiter was already woken, the cancel function needs to be called
   with `wake_next` set to true, but Thread A incorrectly calls it with
   false.  By itself this isn't that big of an issue because the cancel
   function also checks whether the waiter was already removed from the
   wait list (i.e., it was woken up), but that check is problematic
   because of the next issue.
2. The Condvar's cancel function can detect when a waiter has been moved
   to the Mutex's wait list (waiter.is_waiting_for == Mutex) and can
   request to retry the cancellation.  However, when
   waiter.is_waiting_for == None (which means the waiter was removed
   from some wait list), it doesn't know whether the waiter was woken up
   from the Mutex's wait list or the Condvar's wait list.  It
   incorrectly assumes that the waiter was in the Condvar's wait list
   and does not retry the cancel.  As a result, the Mutex's cancel
   function is never called, which means any waiters still in the
   Mutex's wait list will never get woken up.  The sketch below
   illustrates the ambiguity.
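
In pseudocode, the ambiguity in issue 2 looks roughly like this (a
simplified sketch, not the actual cancel_waiter() in cv.rs, which also
manages the spin lock and wait lists):

    #[derive(Clone, Copy, PartialEq)]
    enum WaitingFor {
        Condvar,
        Mutex,
        None,
    }

    // Returns true if the cancellation should be retried so that the
    // Mutex's cancel function runs instead.
    fn needs_retry(waiting_for: WaitingFor) -> bool {
        match waiting_for {
            // Mid-transfer or already transferred: retry against the
            // Mutex.
            WaitingFor::Mutex => true,
            // Removed from *a* wait list, but this state can't tell us
            // whether it was the Condvar's or the Mutex's.  Assuming the
            // Condvar's (as the code did) means the Mutex's cancel
            // function never runs and its remaining waiters never wake.
            WaitingFor::None => false,
            // Still linked in the Condvar's wait list: unlink it there.
            WaitingFor::Condvar => false,
        }
    }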

I haven't been able to come up with a way to fix these issues without
making everything way more complicated, so for now let's just drop the
transfer optimization.

The initial motivation for this optimization was to avoid having to make
a FUTEX_WAKE syscall for every thread that needs to be woken up and to
avoid a thundering herd problem where the newly woken threads all
contend for the mutex at once.  However, waking up a future tends to be
cheaper than waking up a whole thread.  If no executor threads are
blocked, it doesn't even involve making a syscall: the executor simply
adds the future to its ready list.  Additionally, multi-threaded
executors are unlikely to have more threads than the number of CPUs on
the system, which should also reduce contention on the mutex.
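
For comparison, here is a rough sketch of why waking a future is cheap.
The ReadyList type and its internals are hypothetical, for illustration
only; they are not the actual cros_async executor:

    use std::collections::VecDeque;
    use std::sync::{Condvar as StdCondvar, Mutex as StdMutex};

    struct ReadyList {
        queue: StdMutex<VecDeque<usize>>, // task ids stand in for futures
        parked: StdCondvar,
    }

    impl ReadyList {
        fn wake(&self, task: usize) {
            // Waking a future is just a push onto the ready list.
            self.queue.lock().unwrap().push_back(task);
            // Only a parked executor thread forces a FUTEX_WAKE-style
            // syscall; if nothing is parked, notify_one() is cheap.
            self.parked.notify_one();
        }
    }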

If this code starts showing up as a hotspot in perf traces then we
should consider figuring out a way to re-enable this optimization.

BUG=chromium:1157860
TEST=unit tests.  Also ran the tests in a loop for an hour on a kukui
     and verified that they didn't hang

Cq-Depend: chromium:2793844
Change-Id: Iee3861a40c8d9a45d3a01863d804efc82d4467ac
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/2804867
Tested-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Dylan Reid <dgreid@chromium.org>
Reviewed-by: Daniel Verkamp <dverkamp@chromium.org>
Commit-Queue: Chirantan Ekbote <chirantan@chromium.org>
diff --git a/cros_async/src/sync/cv.rs b/cros_async/src/sync/cv.rs
index 3a335da..830df82 100644
--- a/cros_async/src/sync/cv.rs
+++ b/cros_async/src/sync/cv.rs
@@ -229,10 +229,7 @@
         // Safe because the spin lock guarantees exclusive access and the reference does not escape
         // this function.
         let waiters = unsafe { &mut *self.waiters.get() };
-        let (mut wake_list, all_readers) = get_wake_list(waiters);
-
-        // Safe because the spin lock guarantees exclusive access.
-        let muptr = unsafe { (*self.mu.get()) as *const RawMutex };
+        let wake_list = get_wake_list(waiters);
 
         let newstate = if waiters.is_empty() {
             // Also clear the mutex associated with this Condvar since there are no longer any
@@ -247,17 +244,10 @@
             HAS_WAITERS
         };
 
-        // Try to transfer waiters before releasing the spin lock.
-        if !wake_list.is_empty() {
-            // Safe because there was a waiter in the queue and the thread that owns the waiter also
-            // owns a reference to the Mutex, guaranteeing that the pointer is valid.
-            unsafe { (*muptr).transfer_waiters(&mut wake_list, all_readers) };
-        }
-
         // Release the spin lock.
         self.state.store(newstate, Ordering::Release);
 
-        // Now wake any waiters still left in the wake list.
+        // Now wake any waiters in the wake list.
         for w in wake_list {
             w.wake();
         }
@@ -295,22 +285,12 @@
         }
 
         // Safe because the spin lock guarantees exclusive access to `self.waiters`.
-        let mut wake_list = unsafe { (*self.waiters.get()).take() };
-
-        // Safe because the spin lock guarantees exclusive access.
-        let muptr = unsafe { (*self.mu.get()) as *const RawMutex };
+        let wake_list = unsafe { (*self.waiters.get()).take() };
 
         // Clear the mutex associated with this Condvar since there are no longer any waiters. Safe
         // because we the spin lock guarantees exclusive access.
         unsafe { *self.mu.get() = 0 };
 
-        // Try to transfer waiters before releasing the spin lock.
-        if !wake_list.is_empty() {
-            // Safe because there was a waiter in the queue and the thread that owns the waiter also
-            // owns a reference to the Mutex, guaranteeing that the pointer is valid.
-            unsafe { (*muptr).transfer_waiters(&mut wake_list, false) };
-        }
-
         // Mark any waiters left as no longer waiting for the Condvar.
         for w in &wake_list {
             w.set_waiting_for(WaitingFor::None);
@@ -319,7 +299,7 @@
         // Release the spin lock.  We can clear all bits in the state since we took all the waiters.
         self.state.store(0, Ordering::Release);
 
-        // Now wake any waiters still left in the wake list.
+        // Now wake any waiters in the wake list.
         for w in wake_list {
             w.wake();
         }
@@ -373,24 +353,14 @@
                 None
             };
 
-            let (mut wake_list, all_readers) = if wake_next || waiting_for == WaitingFor::None {
+            let wake_list = if wake_next || waiting_for == WaitingFor::None {
                 // Either the waiter was already woken or it's been removed from the condvar's waiter
                 // list and is going to be woken. Either way, we need to wake up another thread.
                 get_wake_list(waiters)
             } else {
-                (WaiterList::new(WaiterAdapter::new()), false)
+                WaiterList::new(WaiterAdapter::new())
             };
 
-            // Safe because the spin lock guarantees exclusive access.
-            let muptr = unsafe { (*self.mu.get()) as *const RawMutex };
-
-            // Try to transfer waiters before releasing the spin lock.
-            if !wake_list.is_empty() {
-                // Safe because there was a waiter in the queue and the thread that owns the waiter also
-                // owns a reference to the Mutex, guaranteeing that the pointer is valid.
-                unsafe { (*muptr).transfer_waiters(&mut wake_list, all_readers) };
-            }
-
             let set_on_release = if waiters.is_empty() {
                 // Clear the mutex associated with this Condvar since there are no longer any waiters. Safe
                 // because we the spin lock guarantees exclusive access.
@@ -423,15 +393,14 @@
     }
 }
 
-// Scan `waiters` and return all waiters that should be woken up. If all waiters in the returned
-// wait list are readers then the returned bool will be true.
+// Scan `waiters` and return all waiters that should be woken up.
 //
 // If the first waiter is trying to acquire a shared lock, then all waiters in the list that are
 // waiting for a shared lock are also woken up. In addition one writer is woken up, if possible.
 //
 // If the first waiter is trying to acquire an exclusive lock, then only that waiter is returned and
 // the rest of the list is not scanned.
-fn get_wake_list(waiters: &mut WaiterList) -> (WaiterList, bool) {
+fn get_wake_list(waiters: &mut WaiterList) -> WaiterList {
     let mut to_wake = WaiterList::new(WaiterAdapter::new());
     let mut cursor = waiters.front_mut();
 
@@ -445,7 +414,6 @@
                 let waiter = cursor.remove().unwrap();
                 waiter.set_waiting_for(WaitingFor::None);
                 to_wake.push_back(waiter);
-                all_readers = false;
                 break;
             }
 
@@ -475,7 +443,7 @@
         }
     }
 
-    (to_wake, all_readers)
+    to_wake
 }
 
 fn cancel_waiter(cv: usize, waiter: &Waiter, wake_next: bool) -> bool {
@@ -1058,7 +1026,7 @@
     }
 
     #[test]
-    fn cancel_after_notify() {
+    fn cancel_after_notify_one() {
         async fn dec(mu: Arc<Mutex<usize>>, cv: Arc<Condvar>) {
             let mut count = mu.lock().await;
 
@@ -1102,7 +1070,7 @@
     }
 
     #[test]
-    fn cancel_after_transfer() {
+    fn cancel_after_notify_all() {
         async fn dec(mu: Arc<Mutex<usize>>, cv: Arc<Condvar>) {
             let mut count = mu.lock().await;
 
@@ -1134,63 +1102,7 @@
         let mut count = block_on(mu.lock());
         *count = 2;
 
-        // Notify the cv while holding the lock. Only transfer one waiter.
-        cv.notify_one();
-        assert_eq!(cv.state.load(Ordering::Relaxed) & HAS_WAITERS, HAS_WAITERS);
-
-        // Drop the lock and then the future. This should not cause fut2 to become runnable as it
-        // should still be in the Condvar's wait queue.
-        mem::drop(count);
-        mem::drop(fut1);
-
-        if let Poll::Ready(()) = fut2.as_mut().poll(&mut cx) {
-            panic!("future unexpectedly ready");
-        }
-
-        // Now wake up fut2.  Since the lock isn't held, it should wake up immediately.
-        cv.notify_one();
-        if fut2.as_mut().poll(&mut cx).is_pending() {
-            panic!("future unable to complete");
-        }
-
-        assert_eq!(*block_on(mu.lock()), 1);
-    }
-
-    #[test]
-    fn cancel_after_transfer_and_wake() {
-        async fn dec(mu: Arc<Mutex<usize>>, cv: Arc<Condvar>) {
-            let mut count = mu.lock().await;
-
-            while *count == 0 {
-                count = cv.wait(count).await;
-            }
-
-            *count -= 1;
-        }
-
-        let mu = Arc::new(Mutex::new(0));
-        let cv = Arc::new(Condvar::new());
-
-        let arc_waker = Arc::new(TestWaker);
-        let waker = waker_ref(&arc_waker);
-        let mut cx = Context::from_waker(&waker);
-
-        let mut fut1 = Box::pin(dec(mu.clone(), cv.clone()));
-        let mut fut2 = Box::pin(dec(mu.clone(), cv.clone()));
-
-        if let Poll::Ready(()) = fut1.as_mut().poll(&mut cx) {
-            panic!("future unexpectedly ready");
-        }
-        if let Poll::Ready(()) = fut2.as_mut().poll(&mut cx) {
-            panic!("future unexpectedly ready");
-        }
-        assert_eq!(cv.state.load(Ordering::Relaxed) & HAS_WAITERS, HAS_WAITERS);
-
-        let mut count = block_on(mu.lock());
-        *count = 2;
-
-        // Notify the cv while holding the lock. This should transfer both waiters to the mutex's
-        // wait queue.
+        // Notify the cv while holding the lock. This should wake up both waiters.
         cv.notify_all();
         assert_eq!(cv.state.load(Ordering::Relaxed), 0);
 
diff --git a/cros_async/src/sync/mu.rs b/cros_async/src/sync/mu.rs
index 6bf827a..bf35644 100644
--- a/cros_async/src/sync/mu.rs
+++ b/cros_async/src/sync/mu.rs
@@ -36,8 +36,6 @@
 const READ_LOCK: usize = 1 << 8;
 // Mask used for checking if any threads currently hold a shared lock.
 const READ_MASK: usize = !0xff;
-// Mask used to check if the lock is held in either shared or exclusive mode.
-const ANY_LOCK: usize = LOCKED | READ_MASK;
 
 // The number of times the thread should just spin and attempt to re-acquire the lock.
 const SPIN_THRESHOLD: usize = 7;
@@ -507,105 +505,6 @@
         }
     }
 
-    // Transfer waiters from the `Condvar` wait list to the `Mutex` wait list. `all_readers` may
-    // be set to true if all waiters are waiting to acquire a shared lock but should not be true if
-    // there is even one waiter waiting on an exclusive lock.
-    //
-    // This is similar to what the `FUTEX_CMP_REQUEUE` flag does on linux.
-    pub fn transfer_waiters(&self, new_waiters: &mut WaiterList, all_readers: bool) {
-        if new_waiters.is_empty() {
-            return;
-        }
-
-        let mut oldstate = self.state.load(Ordering::Relaxed);
-        let can_acquire_read_lock = (oldstate & Shared::zero_to_acquire()) == 0;
-
-        // The lock needs to be held in some mode or else the waiters we transfer now may never get
-        // woken up. Additionally, if all the new waiters are readers and can acquire the lock now
-        // then we can just wake them up.
-        if (oldstate & ANY_LOCK) == 0 || (all_readers && can_acquire_read_lock) {
-            // Nothing to do here. The Condvar will wake up all the waiters left in `new_waiters`.
-            return;
-        }
-
-        if (oldstate & SPINLOCK) == 0
-            && self
-                .state
-                .compare_exchange_weak(
-                    oldstate,
-                    oldstate | SPINLOCK | HAS_WAITERS,
-                    Ordering::Acquire,
-                    Ordering::Relaxed,
-                )
-                .is_ok()
-        {
-            let mut transferred_writer = false;
-
-            // Safe because the spin lock guarantees exclusive access and the reference does not
-            // escape this function.
-            let waiters = unsafe { &mut *self.waiters.get() };
-
-            let mut current = new_waiters.front_mut();
-            while let Some(w) = current.get() {
-                match w.kind() {
-                    WaiterKind::Shared => {
-                        if can_acquire_read_lock {
-                            current.move_next();
-                        } else {
-                            // We need to update the cancellation function since we're moving this
-                            // waiter into our queue. Also update the waiting to indicate that it is
-                            // now in the Mutex's waiter list.
-                            let w = current.remove().unwrap();
-                            w.set_cancel(cancel_waiter, self as *const RawMutex as usize);
-                            w.set_waiting_for(WaitingFor::Mutex);
-                            waiters.push_back(w);
-                        }
-                    }
-                    WaiterKind::Exclusive => {
-                        transferred_writer = true;
-                        // We need to update the cancellation function since we're moving this
-                        // waiter into our queue. Also update the waiting to indicate that it is
-                        // now in the Mutex's waiter list.
-                        let w = current.remove().unwrap();
-                        w.set_cancel(cancel_waiter, self as *const RawMutex as usize);
-                        w.set_waiting_for(WaitingFor::Mutex);
-                        waiters.push_back(w);
-                    }
-                }
-            }
-
-            let set_on_release = if transferred_writer {
-                WRITER_WAITING
-            } else {
-                0
-            };
-
-            // If we didn't actually transfer any waiters, clear the HAS_WAITERS bit that we set
-            // earlier when we acquired the spin lock.
-            let clear = if waiters.is_empty() {
-                SPINLOCK | HAS_WAITERS
-            } else {
-                SPINLOCK
-            };
-
-            while self
-                .state
-                .compare_exchange_weak(
-                    oldstate,
-                    (oldstate | set_on_release) & !clear,
-                    Ordering::Release,
-                    Ordering::Relaxed,
-                )
-                .is_err()
-            {
-                spin_loop_hint();
-                oldstate = self.state.load(Ordering::Relaxed);
-            }
-        }
-
-        // The Condvar will wake up any waiters still left in the queue.
-    }
-
     fn cancel_waiter(&self, waiter: &Waiter, wake_next: bool) -> bool {
         let mut oldstate = self.state.load(Ordering::Relaxed);
         while oldstate & SPINLOCK != 0
@@ -849,11 +748,7 @@
         }
     }
 
-    // Called from `Condvar::wait` when the thread wants to reacquire the lock. Since we may
-    // directly transfer waiters from the `Condvar` wait list to the `Mutex` wait list (see
-    // `transfer_all` below), we cannot call `Mutex::lock` as we also need to clear the
-    // `DESIGNATED_WAKER` bit when acquiring the lock. Not doing so will prevent us from waking up
-    // any other threads in the wait list.
+    // Called from `Condvar::wait` when the thread wants to reacquire the lock.
     #[inline]
     pub(crate) async fn lock_from_cv(&self) -> MutexGuard<'_, T> {
         self.raw.lock_slow::<Exclusive>(DESIGNATED_WAKER, 0).await;
@@ -869,24 +764,7 @@
     #[inline]
     pub(crate) async fn read_lock_from_cv(&self) -> MutexReadGuard<'_, T> {
         // Threads that have waited in the Condvar's waiter list don't have to care if there is a
-        // writer waiting. This also prevents a deadlock in the following case:
-        //
-        //  * Thread A holds a write lock.
-        //  * Thread B is in the mutex's waiter list, also waiting on a write lock.
-        //  * Threads C, D, and E are in the condvar's waiter list. C and D want a read lock; E
-        //    wants a write lock.
-        //  * A calls `cv.notify_all()` while still holding the lock, which transfers C, D, and E
-        //    onto the mutex's wait list.
-        //  * A releases the lock, which wakes up B.
-        //  * B acquires the lock, does some work, and releases the lock. This wakes up C and D.
-        //    However, when iterating through the waiter list we find E, which is waiting for a
-        //    write lock so we set the WRITER_WAITING bit.
-        //  * C and D go through this function to acquire the lock. If we didn't clear the
-        //    WRITER_WAITING bit from the zero_to_acquire set then it would prevent C and D from
-        //    acquiring the lock and they would add themselves back into the waiter list.
-        //  * Now C, D, and E will sit in the waiter list indefinitely unless some other thread
-        //    comes along and acquires the lock. On release, it would wake up E and everything would
-        //    go back to normal.
+        // writer waiting since they have already waited once.
         self.raw
             .lock_slow::<Shared>(DESIGNATED_WAKER, WRITER_WAITING)
             .await;
@@ -2093,7 +1971,7 @@
     }
 
     #[test]
-    fn transfer_notify_one() {
+    fn notify_one() {
         async fn read(mu: Arc<Mutex<usize>>, cv: Arc<Condvar>) {
             let mut count = mu.read_lock().await;
             while *count == 0 {
@@ -2137,9 +2015,20 @@
         let mut count = block_on(mu.lock());
         *count = 1;
 
-        // This should transfer all readers + one writer to the waiter queue.
+        // This should wake all readers + one writer.
         cv.notify_one();
 
+        // Poll the readers and the writer so they add themselves to the mutex's waiter list.
+        for r in &mut readers {
+            if r.as_mut().poll(&mut cx).is_ready() {
+                panic!("reader unexpectedly ready");
+            }
+        }
+
+        if writer.as_mut().poll(&mut cx).is_ready() {
+            panic!("writer unexpectedly ready");
+        }
+
         assert_eq!(
             mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS,
             HAS_WAITERS
@@ -2170,7 +2059,7 @@
     }
 
     #[test]
-    fn transfer_waiters_when_unlocked() {
+    fn notify_when_unlocked() {
         async fn dec(mu: Arc<Mutex<usize>>, cv: Arc<Condvar>) {
             let mut count = mu.lock().await;
 
@@ -2204,8 +2093,7 @@
         *block_on(mu.lock()) = futures.len();
         cv.notify_all();
 
-        // Since the lock is not held, instead of transferring the waiters to the waiter list we
-        // should just wake them all up.
+        // Since we haven't polled `futures` yet, the mutex should not have any waiters.
         assert_eq!(mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS, 0);
 
         for f in &mut futures {
@@ -2217,7 +2105,7 @@
     }
 
     #[test]
-    fn transfer_reader_writer() {
+    fn notify_reader_writer() {
         async fn read(mu: Arc<Mutex<usize>>, cv: Arc<Condvar>) {
             let mut count = mu.read_lock().await;
             while *count == 0 {
@@ -2282,8 +2170,7 @@
             HAS_WAITERS | WRITER_WAITING
         );
 
-        // Wake up waiters while holding the lock. This should end with them transferred to the
-        // mutex's waiter list.
+        // Wake up waiters while holding the lock.
         cv.notify_all();
 
         // Drop the lock.  This should wake up the lock function.
@@ -2293,10 +2180,8 @@
             panic!("lock() unable to complete");
         }
 
-        assert_eq!(
-            mu.raw.state.load(Ordering::Relaxed) & (HAS_WAITERS | WRITER_WAITING),
-            HAS_WAITERS | WRITER_WAITING
-        );
+        // Since we haven't polled `futures` yet, the mutex state should now be empty.
+        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
 
         // Poll everything again. The readers should be able to make progress (but not complete) but
         // the writer should be blocked.
@@ -2338,7 +2223,7 @@
     }
 
     #[test]
-    fn transfer_readers_with_read_lock() {
+    fn notify_readers_with_read_lock() {
         async fn read(mu: Arc<Mutex<usize>>, cv: Arc<Condvar>) {
             let mut count = mu.read_lock().await;
             while *count == 0 {
@@ -2374,23 +2259,24 @@
 
         let g = block_on(mu.read_lock());
 
-        // Notify the condvar while holding the read lock. This should wake up all the waiters
-        // rather than just transferring them.
+        // Notify the condvar while holding the read lock. This should wake up all the waiters.
         cv.notify_all();
-        assert_eq!(mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS, 0);
 
-        mem::drop(g);
-
+        // Since the lock is held in shared mode, all the readers should immediately be able to
+        // acquire the read lock.
         for f in &mut futures {
             if let Poll::Ready(()) = f.as_mut().poll(&mut cx) {
                 panic!("future unexpectedly ready");
             }
         }
+        assert_eq!(mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS, 0);
         assert_eq!(
-            mu.raw.state.load(Ordering::Relaxed),
-            READ_LOCK * futures.len()
+            mu.raw.state.load(Ordering::Relaxed) & READ_MASK,
+            READ_LOCK * (futures.len() + 1)
         );
 
+        mem::drop(g);
+
         for f in &mut futures {
             if f.as_mut().poll(&mut cx).is_pending() {
                 panic!("future unable to complete");
diff --git a/cros_async/src/sync/waiter.rs b/cros_async/src/sync/waiter.rs
index 48335c7..d9e0fa0 100644
--- a/cros_async/src/sync/waiter.rs
+++ b/cros_async/src/sync/waiter.rs
@@ -189,18 +189,6 @@
         self.waiting_for.store(waiting_for as u8, Ordering::Release);
     }
 
-    // Change the cancellation function that this `Waiter` should use. This will panic if called
-    // when the `Waiter` is still linked into a waiter list.
-    pub fn set_cancel(&self, c: fn(usize, &Waiter, bool) -> bool, data: usize) {
-        debug_assert!(
-            !self.is_linked(),
-            "Cannot change cancellation function while linked"
-        );
-        let mut cancel = self.cancel.lock();
-        cancel.c = c;
-        cancel.data = data;
-    }
-
     // Reset the Waiter back to its initial state. Panics if this `Waiter` is still linked into a
     // waiter list.
     pub fn reset(&self, waiting_for: WaitingFor) {