Fix ABA problem when iterating epoll events.

Original patch contributed by andrey.semashev@gmail.com.

In PhysicalSocketServer::WaitEpoll(), the loop verifies that the
signalled dispatcher is in dispatchers_ set. It does so by looking up
the dispatcher pointer in the set. This is vulnerable to the ABA
problem because one dispatcher may be removed and destroyed and another
created and added with the same address before epoll reports an event
for the old dispatcher. The same issue exists for other Wait
implementations, if a dispatcher is removed and a new one added with
the same socket handle is the old.

This is avoided by using a 64-bit key for looking up the dispatcher
in the set. The key is set from a running counter which gets incremented
when a dispatcher is added to the set, so even if the same dispatcher
pointer is added, removed and added again, the key value will be
different.

This changes the storage of dispatchers_ from a set to a flat_hash_map,
which uses a bit more memory but has faster lookup (O(1) as opposed to
O(log n)).

Bug: webrtc:11124
Change-Id: I6d206e1a367b58ba971edca9b48af7664384b797
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/181027
Commit-Queue: Taylor <deadbeef@webrtc.org>
Reviewed-by: Karl Wiberg <kwiberg@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#32019}
diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn
index a39a9b6..61886a6 100644
--- a/rtc_base/BUILD.gn
+++ b/rtc_base/BUILD.gn
@@ -802,6 +802,7 @@
   ]
   absl_deps = [
     "//third_party/abseil-cpp/absl/algorithm:container",
+    "//third_party/abseil-cpp/absl/container:flat_hash_map",
     "//third_party/abseil-cpp/absl/memory",
     "//third_party/abseil-cpp/absl/strings",
     "//third_party/abseil-cpp/absl/types:optional",
diff --git a/rtc_base/physical_socket_server.cc b/rtc_base/physical_socket_server.cc
index 05b3255..cf6e792 100644
--- a/rtc_base/physical_socket_server.cc
+++ b/rtc_base/physical_socket_server.cc
@@ -103,6 +103,20 @@
 #endif
 #endif
 
+namespace {
+class ScopedSetTrue {
+ public:
+  ScopedSetTrue(bool* value) : value_(value) {
+    RTC_DCHECK(!*value_);
+    *value_ = true;
+  }
+  ~ScopedSetTrue() { *value_ = false; }
+
+ private:
+  bool* value_;
+};
+}  // namespace
+
 namespace rtc {
 
 std::unique_ptr<SocketServer> SocketServer::CreateDefault() {
@@ -835,7 +849,7 @@
 
 #if defined(WEBRTC_USE_EPOLL)
 
-static int GetEpollEvents(uint32_t ff) {
+inline static int GetEpollEvents(uint32_t ff) {
   int events = 0;
   if (ff & (DE_READ | DE_ACCEPT)) {
     events |= EPOLLIN;
@@ -1061,7 +1075,8 @@
     close(epoll_fd_);
   }
 #endif
-  RTC_DCHECK(dispatchers_.empty());
+  RTC_DCHECK(dispatcher_by_key_.empty());
+  RTC_DCHECK(key_by_dispatcher_.empty());
 }
 
 void PhysicalSocketServer::WakeUp() {
@@ -1100,45 +1115,32 @@
 
 void PhysicalSocketServer::Add(Dispatcher* pdispatcher) {
   CritScope cs(&crit_);
-  if (processing_dispatchers_) {
-    // A dispatcher is being added while a "Wait" call is processing the
-    // list of socket events.
-    // Defer adding to "dispatchers_" set until processing is done to avoid
-    // invalidating the iterator in "Wait".
-    pending_remove_dispatchers_.erase(pdispatcher);
-    pending_add_dispatchers_.insert(pdispatcher);
-  } else {
-    dispatchers_.insert(pdispatcher);
+  if (key_by_dispatcher_.count(pdispatcher)) {
+    RTC_LOG(LS_WARNING)
+        << "PhysicalSocketServer asked to add a duplicate dispatcher.";
+    return;
   }
+  uint64_t key = next_dispatcher_key_++;
+  dispatcher_by_key_.emplace(key, pdispatcher);
+  key_by_dispatcher_.emplace(pdispatcher, key);
 #if defined(WEBRTC_USE_EPOLL)
   if (epoll_fd_ != INVALID_SOCKET) {
-    AddEpoll(pdispatcher);
+    AddEpoll(pdispatcher, key);
   }
 #endif  // WEBRTC_USE_EPOLL
 }
 
 void PhysicalSocketServer::Remove(Dispatcher* pdispatcher) {
   CritScope cs(&crit_);
-  if (processing_dispatchers_) {
-    // A dispatcher is being removed while a "Wait" call is processing the
-    // list of socket events.
-    // Defer removal from "dispatchers_" set until processing is done to avoid
-    // invalidating the iterator in "Wait".
-    if (!pending_add_dispatchers_.erase(pdispatcher) &&
-        dispatchers_.find(pdispatcher) == dispatchers_.end()) {
-      RTC_LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown "
-                             "dispatcher, potentially from a duplicate call to "
-                             "Add.";
-      return;
-    }
-
-    pending_remove_dispatchers_.insert(pdispatcher);
-  } else if (!dispatchers_.erase(pdispatcher)) {
+  if (!key_by_dispatcher_.count(pdispatcher)) {
     RTC_LOG(LS_WARNING)
         << "PhysicalSocketServer asked to remove a unknown "
            "dispatcher, potentially from a duplicate call to Add.";
     return;
   }
+  uint64_t key = key_by_dispatcher_.at(pdispatcher);
+  key_by_dispatcher_.erase(pdispatcher);
+  dispatcher_by_key_.erase(key);
 #if defined(WEBRTC_USE_EPOLL)
   if (epoll_fd_ != INVALID_SOCKET) {
     RemoveEpoll(pdispatcher);
@@ -1152,34 +1154,22 @@
     return;
   }
 
+  // Don't update dispatchers that haven't yet been added.
   CritScope cs(&crit_);
-  if (dispatchers_.find(pdispatcher) == dispatchers_.end()) {
+  if (!key_by_dispatcher_.count(pdispatcher)) {
     return;
   }
 
-  UpdateEpoll(pdispatcher);
+  UpdateEpoll(pdispatcher, key_by_dispatcher_.at(pdispatcher));
 #endif
 }
 
-void PhysicalSocketServer::AddRemovePendingDispatchers() {
-  if (!pending_add_dispatchers_.empty()) {
-    for (Dispatcher* pdispatcher : pending_add_dispatchers_) {
-      dispatchers_.insert(pdispatcher);
-    }
-    pending_add_dispatchers_.clear();
-  }
-
-  if (!pending_remove_dispatchers_.empty()) {
-    for (Dispatcher* pdispatcher : pending_remove_dispatchers_) {
-      dispatchers_.erase(pdispatcher);
-    }
-    pending_remove_dispatchers_.clear();
-  }
-}
-
 #if defined(WEBRTC_POSIX)
 
 bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
+  // We don't support reentrant waiting.
+  RTC_DCHECK(!waiting_);
+  ScopedSetTrue s(&waiting_);
 #if defined(WEBRTC_USE_EPOLL)
   // We don't keep a dedicated "epoll" descriptor containing only the non-IO
   // (i.e. signaling) dispatcher, so "poll" will be used instead of the default
@@ -1205,6 +1195,9 @@
                  &len);
   }
 
+  // Most often the socket is writable or readable or both, so make a single
+  // virtual call to get requested events
+  const uint32_t requested_events = dispatcher->GetRequestedEvents();
   uint32_t ff = 0;
 
   // Check readable descriptors. If we're waiting on an accept, signal
@@ -1212,7 +1205,7 @@
   // readable or really closed.
   // TODO(pthatcher): Only peek at TCP descriptors.
   if (readable) {
-    if (dispatcher->GetRequestedEvents() & DE_ACCEPT) {
+    if (requested_events & DE_ACCEPT) {
       ff |= DE_ACCEPT;
     } else if (errcode || dispatcher->IsDescriptorClosed()) {
       ff |= DE_CLOSE;
@@ -1224,7 +1217,7 @@
   // Check writable descriptors. If we're waiting on a connect, detect
   // success versus failure by the reaped error code.
   if (writable) {
-    if (dispatcher->GetRequestedEvents() & DE_CONNECT) {
+    if (requested_events & DE_CONNECT) {
       if (!errcode) {
         ff |= DE_CONNECT;
       } else {
@@ -1258,13 +1251,9 @@
     stop_us = rtc::TimeMicros() + cmsWait * 1000;
   }
 
-  // Zero all fd_sets. Don't need to do this inside the loop since
-  // select() zeros the descriptors not signaled
 
   fd_set fdsRead;
-  FD_ZERO(&fdsRead);
   fd_set fdsWrite;
-  FD_ZERO(&fdsWrite);
 // Explicitly unpoison these FDs on MemorySanitizer which doesn't handle the
 // inline assembly in FD_ZERO.
 // http://crbug.com/344505
@@ -1276,16 +1265,22 @@
   fWait_ = true;
 
   while (fWait_) {
+    // Zero all fd_sets. Although select() zeros the descriptors not signaled,
+    // we may need to do this for dispatchers that were deleted while
+    // iterating.
+    FD_ZERO(&fdsRead);
+    FD_ZERO(&fdsWrite);
     int fdmax = -1;
     {
       CritScope cr(&crit_);
-      // TODO(jbauch): Support re-entrant waiting.
-      RTC_DCHECK(!processing_dispatchers_);
-      for (Dispatcher* pdispatcher : dispatchers_) {
+      current_dispatcher_keys_.clear();
+      for (auto const& kv : dispatcher_by_key_) {
+        uint64_t key = kv.first;
+        Dispatcher* pdispatcher = kv.second;
         // Query dispatchers for read and write wait state
-        RTC_DCHECK(pdispatcher);
         if (!process_io && (pdispatcher != signal_wakeup_))
           continue;
+        current_dispatcher_keys_.push_back(key);
         int fd = pdispatcher->GetDescriptor();
         // "select"ing a file descriptor that is equal to or larger than
         // FD_SETSIZE will result in undefined behavior.
@@ -1323,8 +1318,14 @@
     } else {
       // We have signaled descriptors
       CritScope cr(&crit_);
-      processing_dispatchers_ = true;
-      for (Dispatcher* pdispatcher : dispatchers_) {
+      // Iterate only on the dispatchers whose sockets were passed into
+      // WSAEventSelect; this avoids the ABA problem (a socket being
+      // destroyed and a new one created with the same file descriptor).
+      for (uint64_t key : current_dispatcher_keys_) {
+        if (!dispatcher_by_key_.count(key))
+          continue;
+        Dispatcher* pdispatcher = dispatcher_by_key_.at(key);
+
         int fd = pdispatcher->GetDescriptor();
 
         bool readable = FD_ISSET(fd, &fdsRead);
@@ -1340,11 +1341,6 @@
         // The error code can be signaled through reads or writes.
         ProcessEvents(pdispatcher, readable, writable, readable || writable);
       }
-
-      processing_dispatchers_ = false;
-      // Process deferred dispatchers that have been added/removed while the
-      // events were handled above.
-      AddRemovePendingDispatchers();
     }
 
     // Recalc the time remaining to wait. Doing it here means it doesn't get
@@ -1365,7 +1361,7 @@
 
 #if defined(WEBRTC_USE_EPOLL)
 
-void PhysicalSocketServer::AddEpoll(Dispatcher* pdispatcher) {
+void PhysicalSocketServer::AddEpoll(Dispatcher* pdispatcher, uint64_t key) {
   RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
   int fd = pdispatcher->GetDescriptor();
   RTC_DCHECK(fd != INVALID_SOCKET);
@@ -1375,7 +1371,7 @@
 
   struct epoll_event event = {0};
   event.events = GetEpollEvents(pdispatcher->GetRequestedEvents());
-  event.data.ptr = pdispatcher;
+  event.data.u64 = key;
   int err = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event);
   RTC_DCHECK_EQ(err, 0);
   if (err == -1) {
@@ -1404,7 +1400,7 @@
   }
 }
 
-void PhysicalSocketServer::UpdateEpoll(Dispatcher* pdispatcher) {
+void PhysicalSocketServer::UpdateEpoll(Dispatcher* pdispatcher, uint64_t key) {
   RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
   int fd = pdispatcher->GetDescriptor();
   RTC_DCHECK(fd != INVALID_SOCKET);
@@ -1414,7 +1410,7 @@
 
   struct epoll_event event = {0};
   event.events = GetEpollEvents(pdispatcher->GetRequestedEvents());
-  event.data.ptr = pdispatcher;
+  event.data.u64 = key;
   int err = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd, &event);
   RTC_DCHECK_EQ(err, 0);
   if (err == -1) {
@@ -1456,11 +1452,12 @@
       CritScope cr(&crit_);
       for (int i = 0; i < n; ++i) {
         const epoll_event& event = epoll_events_[i];
-        Dispatcher* pdispatcher = static_cast<Dispatcher*>(event.data.ptr);
-        if (dispatchers_.find(pdispatcher) == dispatchers_.end()) {
+        uint64_t key = event.data.u64;
+        if (!dispatcher_by_key_.count(key)) {
           // The dispatcher for this socket no longer exists.
           continue;
         }
+        Dispatcher* pdispatcher = dispatcher_by_key_.at(key);
 
         bool readable = (event.events & (EPOLLIN | EPOLLPRI));
         bool writable = (event.events & EPOLLOUT);
@@ -1472,7 +1469,7 @@
 
     if (cmsWait != kForever) {
       tvWait = TimeDiff(tvStop, TimeMillis());
-      if (tvWait < 0) {
+      if (tvWait <= 0) {
         // Return success on timeout.
         return true;
       }
@@ -1555,6 +1552,10 @@
 
 #if defined(WEBRTC_WIN)
 bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
+  // We don't support reentrant waiting.
+  RTC_DCHECK(!waiting_);
+  ScopedSetTrue s(&waiting_);
+
   int64_t cmsTotal = cmsWait;
   int64_t cmsElapsed = 0;
   int64_t msStart = Time();
@@ -1562,37 +1563,40 @@
   fWait_ = true;
   while (fWait_) {
     std::vector<WSAEVENT> events;
-    std::vector<Dispatcher*> event_owners;
+    std::vector<uint64_t> event_owners;
 
     events.push_back(socket_ev_);
 
     {
       CritScope cr(&crit_);
-      // TODO(jbauch): Support re-entrant waiting.
-      RTC_DCHECK(!processing_dispatchers_);
-
-      // Calling "CheckSignalClose" might remove a closed dispatcher from the
-      // set. This must be deferred to prevent invalidating the iterator.
-      processing_dispatchers_ = true;
-      for (Dispatcher* disp : dispatchers_) {
+      // Get a snapshot of all current dispatchers; this is used to avoid the
+      // ABA problem (see later comment) and avoids the dispatcher_by_key_
+      // iterator being invalidated by calling CheckSignalClose, which may
+      // remove the dispatcher from the list.
+      current_dispatcher_keys_.clear();
+      for (auto const& kv : dispatcher_by_key_) {
+        current_dispatcher_keys_.push_back(kv.first);
+      }
+      for (uint64_t key : current_dispatcher_keys_) {
+        if (!dispatcher_by_key_.count(key)) {
+          continue;
+        }
+        Dispatcher* disp = dispatcher_by_key_.at(key);
+        if (!disp)
+          continue;
         if (!process_io && (disp != signal_wakeup_))
           continue;
         SOCKET s = disp->GetSocket();
         if (disp->CheckSignalClose()) {
-          // We just signalled close, don't poll this socket
+          // We just signalled close, don't poll this socket.
         } else if (s != INVALID_SOCKET) {
           WSAEventSelect(s, events[0],
                          FlagsToEvents(disp->GetRequestedEvents()));
         } else {
           events.push_back(disp->GetWSAEvent());
-          event_owners.push_back(disp);
+          event_owners.push_back(key);
         }
       }
-
-      processing_dispatchers_ = false;
-      // Process deferred dispatchers that have been added/removed while the
-      // events were handled above.
-      AddRemovePendingDispatchers();
     }
 
     // Which is shorter, the delay wait or the asked wait?
@@ -1624,15 +1628,23 @@
       int index = dw - WSA_WAIT_EVENT_0;
       if (index > 0) {
         --index;  // The first event is the socket event
-        Dispatcher* disp = event_owners[index];
-        // The dispatcher could have been removed while waiting for events.
-        if (dispatchers_.find(disp) != dispatchers_.end()) {
-          disp->OnPreEvent(0);
-          disp->OnEvent(0, 0);
+        uint64_t key = event_owners[index];
+        if (!dispatcher_by_key_.count(key)) {
+          // The dispatcher could have been removed while waiting for events.
+          continue;
         }
+        Dispatcher* disp = dispatcher_by_key_.at(key);
+        disp->OnPreEvent(0);
+        disp->OnEvent(0, 0);
       } else if (process_io) {
-        processing_dispatchers_ = true;
-        for (Dispatcher* disp : dispatchers_) {
+        // Iterate only on the dispatchers whose sockets were passed into
+        // WSAEventSelect; this avoids the ABA problem (a socket being
+        // destroyed and a new one created with the same SOCKET handle).
+        for (uint64_t key : current_dispatcher_keys_) {
+          if (!dispatcher_by_key_.count(key)) {
+            continue;
+          }
+          Dispatcher* disp = dispatcher_by_key_.at(key);
           SOCKET s = disp->GetSocket();
           if (s == INVALID_SOCKET)
             continue;
@@ -1698,11 +1710,6 @@
             }
           }
         }
-
-        processing_dispatchers_ = false;
-        // Process deferred dispatchers that have been added/removed while the
-        // events were handled above.
-        AddRemovePendingDispatchers();
       }
 
       // Reset the network event until new activity occurs
diff --git a/rtc_base/physical_socket_server.h b/rtc_base/physical_socket_server.h
index 7eaf590..cc21a67 100644
--- a/rtc_base/physical_socket_server.h
+++ b/rtc_base/physical_socket_server.h
@@ -18,7 +18,7 @@
 
 #include <array>
 #include <memory>
-#include <set>
+#include <unordered_map>
 #include <vector>
 
 #include "rtc_base/deprecated/recursive_critical_section.h"
@@ -85,17 +85,13 @@
   // The number of events to process with one call to "epoll_wait".
   static constexpr size_t kNumEpollEvents = 128;
 
-  typedef std::set<Dispatcher*> DispatcherSet;
-
-  void AddRemovePendingDispatchers() RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_);
-
 #if defined(WEBRTC_POSIX)
   bool WaitSelect(int cms, bool process_io);
 #endif  // WEBRTC_POSIX
 #if defined(WEBRTC_USE_EPOLL)
-  void AddEpoll(Dispatcher* dispatcher);
+  void AddEpoll(Dispatcher* dispatcher, uint64_t key);
   void RemoveEpoll(Dispatcher* dispatcher);
-  void UpdateEpoll(Dispatcher* dispatcher);
+  void UpdateEpoll(Dispatcher* dispatcher, uint64_t key);
   bool WaitEpoll(int cms);
   bool WaitPoll(int cms, Dispatcher* dispatcher);
 
@@ -106,16 +102,31 @@
   std::array<epoll_event, kNumEpollEvents> epoll_events_;
   const int epoll_fd_ = INVALID_SOCKET;
 #endif  // WEBRTC_USE_EPOLL
-  DispatcherSet dispatchers_ RTC_GUARDED_BY(crit_);
-  DispatcherSet pending_add_dispatchers_ RTC_GUARDED_BY(crit_);
-  DispatcherSet pending_remove_dispatchers_ RTC_GUARDED_BY(crit_);
-  bool processing_dispatchers_ RTC_GUARDED_BY(crit_) = false;
+  // uint64_t keys are used to uniquely identify a dispatcher in order to avoid
+  // the ABA problem during the epoll loop (a dispatcher being destroyed and
+  // replaced by one with the same address).
+  uint64_t next_dispatcher_key_ RTC_GUARDED_BY(crit_) = 0;
+  std::unordered_map<uint64_t, Dispatcher*> dispatcher_by_key_
+      RTC_GUARDED_BY(crit_);
+  // Reverse lookup necessary for removals/updates.
+  std::unordered_map<Dispatcher*, uint64_t> key_by_dispatcher_
+      RTC_GUARDED_BY(crit_);
+  // A list of dispatcher keys that we're interested in for the current
+  // select() or WSAWaitForMultipleEvents() loop. Again, used to avoid the ABA
+  // problem (a socket being destroyed and a new one created with the same
+  // handle, erroneously receiving the events from the destroyed socket).
+  //
+  // Kept as a member variable just for efficiency.
+  std::vector<uint64_t> current_dispatcher_keys_;
   Signaler* signal_wakeup_;  // Assigned in constructor only
   RecursiveCriticalSection crit_;
 #if defined(WEBRTC_WIN)
   const WSAEVENT socket_ev_;
 #endif
   bool fWait_;
+  // Are we currently in a select()/epoll()/WSAWaitForMultipleEvents loop?
+  // Used for a DCHECK, because we don't support reentrant waiting.
+  bool waiting_ = false;
 };
 
 class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> {
diff --git a/rtc_base/physical_socket_server_unittest.cc b/rtc_base/physical_socket_server_unittest.cc
index 586b9db..648f397 100644
--- a/rtc_base/physical_socket_server_unittest.cc
+++ b/rtc_base/physical_socket_server_unittest.cc
@@ -381,6 +381,15 @@
   SocketTest::TestCloseInClosedCallbackIPv6();
 }
 
+TEST_F(PhysicalSocketTest, TestDeleteInReadCallbackIPv4) {
+  MAYBE_SKIP_IPV4;
+  SocketTest::TestDeleteInReadCallbackIPv4();
+}
+
+TEST_F(PhysicalSocketTest, TestDeleteInReadCallbackIPv6) {
+  SocketTest::TestDeleteInReadCallbackIPv6();
+}
+
 TEST_F(PhysicalSocketTest, TestSocketServerWaitIPv4) {
   MAYBE_SKIP_IPV4;
   SocketTest::TestSocketServerWaitIPv4();
diff --git a/rtc_base/socket_unittest.cc b/rtc_base/socket_unittest.cc
index 6ea4b47..ac85099 100644
--- a/rtc_base/socket_unittest.cc
+++ b/rtc_base/socket_unittest.cc
@@ -149,6 +149,15 @@
   CloseInClosedCallbackInternal(kIPv6Loopback);
 }
 
+void SocketTest::TestDeleteInReadCallbackIPv4() {
+  DeleteInReadCallbackInternal(kIPv4Loopback);
+}
+
+void SocketTest::TestDeleteInReadCallbackIPv6() {
+  MAYBE_SKIP_IPV6;
+  DeleteInReadCallbackInternal(kIPv6Loopback);
+}
+
 void SocketTest::TestSocketServerWaitIPv4() {
   SocketServerWaitInternal(kIPv4Loopback);
 }
@@ -651,6 +660,42 @@
   EXPECT_TRUE(Socket::CS_CLOSED == client->GetState());
 }
 
+// Helper class specifically for the test below.
+class SocketDeleter : public sigslot::has_slots<> {
+ public:
+  explicit SocketDeleter(std::unique_ptr<AsyncSocket> socket)
+      : socket_(std::move(socket)) {}
+
+  void Delete(AsyncSocket* other) { socket_.reset(); }
+
+  bool deleted() const { return socket_ == nullptr; }
+
+ private:
+  std::unique_ptr<AsyncSocket> socket_;
+};
+
+// Tested deleting a socket within another socket's read callback. A previous
+// iteration of the select loop failed in this situation, if both sockets
+// became readable at the same time.
+void SocketTest::DeleteInReadCallbackInternal(const IPAddress& loopback) {
+  std::unique_ptr<AsyncSocket> socket1(
+      ss_->CreateAsyncSocket(loopback.family(), SOCK_DGRAM));
+  std::unique_ptr<AsyncSocket> socket2(
+      ss_->CreateAsyncSocket(loopback.family(), SOCK_DGRAM));
+  EXPECT_EQ(0, socket1->Bind(SocketAddress(loopback, 0)));
+  EXPECT_EQ(0, socket2->Bind(SocketAddress(loopback, 0)));
+  EXPECT_EQ(3, socket1->SendTo("foo", 3, socket1->GetLocalAddress()));
+  EXPECT_EQ(3, socket2->SendTo("bar", 3, socket1->GetLocalAddress()));
+  // Sleep a while to ensure sends are both completed at the same time.
+  Thread::SleepMs(1000);
+
+  // Configure the helper class to delete socket 2 when socket 1 has a read
+  // event.
+  SocketDeleter deleter(std::move(socket2));
+  socket1->SignalReadEvent.connect(&deleter, &SocketDeleter::Delete);
+  EXPECT_TRUE_WAIT(deleter.deleted(), kTimeout);
+}
+
 class Sleeper : public MessageHandler {
  public:
   void OnMessage(Message* msg) override { Thread::Current()->SleepMs(500); }
diff --git a/rtc_base/socket_unittest.h b/rtc_base/socket_unittest.h
index 5197ccd..91ef39c 100644
--- a/rtc_base/socket_unittest.h
+++ b/rtc_base/socket_unittest.h
@@ -46,6 +46,8 @@
   void TestServerCloseIPv6();
   void TestCloseInClosedCallbackIPv4();
   void TestCloseInClosedCallbackIPv6();
+  void TestDeleteInReadCallbackIPv4();
+  void TestDeleteInReadCallbackIPv6();
   void TestSocketServerWaitIPv4();
   void TestSocketServerWaitIPv6();
   void TestTcpIPv4();
@@ -83,6 +85,7 @@
   void ClientCloseDuringConnectInternal(const IPAddress& loopback);
   void ServerCloseInternal(const IPAddress& loopback);
   void CloseInClosedCallbackInternal(const IPAddress& loopback);
+  void DeleteInReadCallbackInternal(const IPAddress& loopback);
   void SocketServerWaitInternal(const IPAddress& loopback);
   void SingleFlowControlCallbackInternal(const IPAddress& loopback);
   void UdpInternal(const IPAddress& loopback);