Fix CastSocket e2e tests

This patch fixes the Mac and ARM64 cast socket end to end tests. Turns
out this was actually a bug in the SocketHandleWaiter, where it
would only ever process the first ready handle in its list of handles,
completely starving any other handles in its list.

The fix is to store the last updated time along with the socket
subscriber, and continously process the last updated handles until
either (1) a timeout is reached, or (2) all handles have been updated.

Bug=b/155214294
Change-Id: Icf967a80036654520a3d13661a596f38730951c4
Reviewed-on: https://chromium-review.googlesource.com/c/openscreen/+/2173961
Reviewed-by: Jordan Bayles <jophba@chromium.org>
Reviewed-by: Brandon Tolsch <btolsch@chromium.org>
Commit-Queue: Jordan Bayles <jophba@chromium.org>
diff --git a/platform/impl/socket_handle_waiter.cc b/platform/impl/socket_handle_waiter.cc
index 1560aa6..bcf36e0 100644
--- a/platform/impl/socket_handle_waiter.cc
+++ b/platform/impl/socket_handle_waiter.cc
@@ -19,7 +19,7 @@
                                    SocketHandleRef handle) {
   std::lock_guard<std::mutex> lock(mutex_);
   if (handle_mappings_.find(handle) == handle_mappings_.end()) {
-    handle_mappings_.emplace(handle, subscriber);
+    handle_mappings_.emplace(handle, SocketSubscription{subscriber});
   }
 }
 
@@ -35,7 +35,7 @@
 void SocketHandleWaiter::UnsubscribeAll(Subscriber* subscriber) {
   std::lock_guard<std::mutex> lock(mutex_);
   for (auto it = handle_mappings_.begin(); it != handle_mappings_.end();) {
-    if (it->second == subscriber) {
+    if (it->second.subscriber == subscriber) {
       it = handle_mappings_.erase(it);
     } else {
       it++;
@@ -69,26 +69,40 @@
 }
 
 void SocketHandleWaiter::ProcessReadyHandles(
-    const std::vector<HandleWithSubscriber>& handles,
+    std::vector<HandleWithSubscription>* handles,
     Clock::duration timeout) {
-  Clock::time_point start_time = now_function_();
-  bool processed_one = false;
-  // TODO(btolsch): Track explicit or implicit time since last handled on each
-  // watched handle so we can sort by it here for better fairness.
-  for (const HandleWithSubscriber& handle : handles) {
-    Clock::time_point current_time = now_function_();
-    if (processed_one && (current_time - start_time) > timeout) {
-      return;
-    }
-
-    processed_one = true;
-    handle.subscriber->ProcessReadyHandle(handle.handle);
-
-    current_time = now_function_();
-    if ((current_time - start_time) > timeout) {
-      return;
-    }
+  if (handles->empty()) {
+    return;
   }
+
+  Clock::time_point start_time = now_function_();
+  // Process the stalest handles one by one until we hit our timeout.
+  do {
+    Clock::time_point oldest_time = Clock::time_point::max();
+    HandleWithSubscription& oldest_handle = handles->at(0);
+    for (HandleWithSubscription& handle : *handles) {
+      // Skip already processed handles.
+      if (handle.subscription->last_updated >= start_time) {
+        continue;
+      }
+
+      // Select the oldest handle.
+      if (handle.subscription->last_updated < oldest_time) {
+        oldest_time = handle.subscription->last_updated;
+        oldest_handle = handle;
+      }
+    }
+
+    // Already processed all handles.
+    if (oldest_time == Clock::time_point::max()) {
+      return;
+    }
+
+    // Process the oldest handle.
+    oldest_handle.subscription->last_updated = now_function_();
+    oldest_handle.subscription->subscriber->ProcessReadyHandle(
+        oldest_handle.handle);
+  } while (now_function_() - start_time <= timeout);
 }
 
 Error SocketHandleWaiter::ProcessHandles(Clock::duration timeout) {
@@ -109,7 +123,7 @@
   ErrorOr<std::vector<SocketHandleRef>> changed_handles =
       AwaitSocketsReadable(handles, remaining_timeout);
 
-  std::vector<HandleWithSubscriber> ready_handles;
+  std::vector<HandleWithSubscription> ready_handles;
   {
     std::lock_guard<std::mutex> lock(mutex_);
     handles_being_deleted_.clear();
@@ -121,7 +135,7 @@
         auto mapping_it = handle_mappings_.find(handle);
         if (mapping_it != handle_mappings_.end()) {
           ready_handles.push_back(
-              HandleWithSubscriber{handle, mapping_it->second});
+              HandleWithSubscription{handle, &(mapping_it->second)});
         }
       }
     }
@@ -132,7 +146,7 @@
 
     current_time = now_function_();
     remaining_timeout = timeout - (current_time - start_time);
-    ProcessReadyHandles(ready_handles, remaining_timeout);
+    ProcessReadyHandles(&ready_handles, remaining_timeout);
   }
   return Error::None();
 }
diff --git a/platform/impl/socket_handle_waiter.h b/platform/impl/socket_handle_waiter.h
index 1ad8152..c2e5693 100644
--- a/platform/impl/socket_handle_waiter.h
+++ b/platform/impl/socket_handle_waiter.h
@@ -72,14 +72,21 @@
       const Clock::duration& timeout) = 0;
 
  private:
-  struct HandleWithSubscriber {
+  struct SocketSubscription {
+    Subscriber* subscriber = nullptr;
+    Clock::time_point last_updated = Clock::time_point::min();
+  };
+
+  struct HandleWithSubscription {
     SocketHandleRef handle;
-    Subscriber* subscriber;
+    // Reference to the original subscription in the unordered map, so
+    // we can keep track of when we updated this socket handle.
+    SocketSubscription* subscription;
   };
 
   // Call the subscriber associated with each changed handle.  Handles are only
   // processed until |timeout| is exceeded.  Must be called with |mutex_| held.
-  void ProcessReadyHandles(const std::vector<HandleWithSubscriber>& handles,
+  void ProcessReadyHandles(std::vector<HandleWithSubscription>* handles,
                            Clock::duration timeout);
 
   // Guards against concurrent access to all other class data members.
@@ -94,7 +101,7 @@
 
   // Set of all socket handles currently being watched, mapped to the subscriber
   // that is watching them.
-  std::unordered_map<SocketHandleRef, Subscriber*, SocketHandleHash>
+  std::unordered_map<SocketHandleRef, SocketSubscription, SocketHandleHash>
       handle_mappings_;
 
   const ClockNowFunctionPtr now_function_;