Host socket_forward_proxy waits for guest

If a host-side process connects then disconnects before the guest-side
process sees it: the host-side sfp queue will be marked as
HOST_DISCONNECTED so the guest-side sfp will never see acquire it. The
host-side sfp thread will wait forever for the host to send an END
marker through the shm region.

host-side sfp instead waits for the guest to connect before marking its
side as disconnected.

This sort of thing will happen when running adb connect repeatedly.

Bug: 79702418
Test: boot locally, run adb connect repeatedly, quickly, watch the
connections work themselves out.

Change-Id: I3e86bd848c1770075515efd79f88d477e478563b
diff --git a/common/vsoc/lib/socket_forward_region_view.cpp b/common/vsoc/lib/socket_forward_region_view.cpp
index 8315b3e..0282fd6 100644
--- a/common/vsoc/lib/socket_forward_region_view.cpp
+++ b/common/vsoc/lib/socket_forward_region_view.cpp
@@ -22,6 +22,7 @@
 #include "common/vsoc/shm/lock.h"
 #include "common/vsoc/shm/socket_forward_layout.h"
 
+using vsoc::layout::socket_forward::Queue;
 using vsoc::layout::socket_forward::QueuePair;
 using vsoc::layout::socket_forward::QueueState;
 // store the read and write direction as variables to keep the ifdefs and macros
@@ -114,7 +115,8 @@
     switch (state) {
       case QueueState::HOST_CONNECTED:
       case kOtherSideClosed:
-        LOG(DEBUG) << "host_connected or other side is closed, marking inactive";
+        LOG(DEBUG)
+            << "host_connected or other side is closed, marking inactive";
         state = QueueState::INACTIVE;
         break;
 
@@ -147,10 +149,9 @@
       state = (queue_pair.*WriteDirection).queue_state_;
 #ifndef CUTTLEFISH_HOST
       if (state == QueueState::HOST_CONNECTED) {
-        state =
-            (queue_pair.*WriteDirection).queue_state_ =
+        state = (queue_pair.*WriteDirection).queue_state_ =
             (queue_pair.*ReadDirection).queue_state_ =
-            QueueState::BOTH_CONNECTED;
+                QueueState::BOTH_CONNECTED;
       }
 #endif
     }
@@ -169,24 +170,38 @@
   }
   ++data()->generation_num;
 }
-// TODO merge these two into a helper since the only difference is one
-// Read/Write
-void SocketForwardRegionView::MarkSendQueueDisconnected(int connection_id) {
+
+void SocketForwardRegionView::MarkQueueDisconnected(
+    int connection_id, Queue QueuePair::*direction) {
   auto& queue_pair = data()->queues_[connection_id];
+  auto& queue = queue_pair.*direction;
+
+#ifdef CUTTLEFISH_HOST
+  // if the host has connected but the guest hasn't seen it yet, wait for the
+  // guest to connect so the protocol can follow the normal state transition.
+  while (true) {
+    auto guard = make_lock_guard(&queue_pair.queue_state_lock_);
+    if (queue.queue_state_ != QueueState::HOST_CONNECTED) {
+      break;
+    }
+    LOG(WARNING) << "closing queue in HOST_CONNECTED state. waiting";
+    sleep(1);
+  }
+#endif
+
   auto guard = make_lock_guard(&queue_pair.queue_state_lock_);
-  auto& queue = queue_pair.*WriteDirection;
+
   queue.queue_state_ = queue.queue_state_ == kOtherSideClosed
                            ? QueueState::INACTIVE
                            : kThisSideClosed;
 }
 
+void SocketForwardRegionView::MarkSendQueueDisconnected(int connection_id) {
+  MarkQueueDisconnected(connection_id, WriteDirection);
+}
+
 void SocketForwardRegionView::MarkRecvQueueDisconnected(int connection_id) {
-  auto& queue_pair = data()->queues_[connection_id];
-  auto guard = make_lock_guard(&queue_pair.queue_state_lock_);
-  auto& queue = queue_pair.*ReadDirection;
-  queue.queue_state_ = queue.queue_state_ == kOtherSideClosed
-                           ? QueueState::INACTIVE
-                           : kThisSideClosed;
+  MarkQueueDisconnected(connection_id, ReadDirection);
 }
 
 int SocketForwardRegionView::port(int connection_id) {
@@ -199,25 +214,26 @@
 
 #ifdef CUTTLEFISH_HOST
 int SocketForwardRegionView::AcquireConnectionID(int port) {
-  int id = 0;
-  for (auto&& queue_pair : data()->queues_) {
-    LOG(DEBUG) << "locking and checking queue at index " << id;
-    auto guard = make_lock_guard(&queue_pair.queue_state_lock_);
-    if (queue_pair.host_to_guest.queue_state_ == QueueState::INACTIVE &&
-        queue_pair.guest_to_host.queue_state_ == QueueState::INACTIVE) {
-      queue_pair.port_ = port;
-      queue_pair.host_to_guest.queue_state_ = QueueState::HOST_CONNECTED;
-      queue_pair.guest_to_host.queue_state_ = QueueState::HOST_CONNECTED;
-      LOG(DEBUG) << "acquired queue " << id
-                 << ". current seq_num: " << data()->seq_num;
-      ++data()->seq_num;
-      return id;
+  while (true) {
+    int id = 0;
+    for (auto&& queue_pair : data()->queues_) {
+      LOG(DEBUG) << "locking and checking queue at index " << id;
+      auto guard = make_lock_guard(&queue_pair.queue_state_lock_);
+      if (queue_pair.host_to_guest.queue_state_ == QueueState::INACTIVE &&
+          queue_pair.guest_to_host.queue_state_ == QueueState::INACTIVE) {
+        queue_pair.port_ = port;
+        queue_pair.host_to_guest.queue_state_ = QueueState::HOST_CONNECTED;
+        queue_pair.guest_to_host.queue_state_ = QueueState::HOST_CONNECTED;
+        LOG(DEBUG) << "acquired queue " << id
+                   << ". current seq_num: " << data()->seq_num;
+        ++data()->seq_num;
+        return id;
+      }
+      ++id;
     }
-    ++id;
+    LOG(ERROR) << "no remaining shm queues for connection, sleeping.";
+    sleep(10);
   }
-  // TODO(haining) handle all queues being used
-  LOG(FATAL) << "no remaining shm queues for connection";
-  return -1;
 }
 
 std::pair<SocketForwardRegionView::Sender, SocketForwardRegionView::Receiver>
diff --git a/common/vsoc/lib/socket_forward_region_view.h b/common/vsoc/lib/socket_forward_region_view.h
index 4aeee77..ce6958a 100644
--- a/common/vsoc/lib/socket_forward_region_view.h
+++ b/common/vsoc/lib/socket_forward_region_view.h
@@ -120,6 +120,10 @@
 
   void ResetQueueStates(layout::socket_forward::QueuePair* queue_pair);
 
+  void MarkQueueDisconnected(int connection_id,
+                             layout::socket_forward::Queue
+                                 layout::socket_forward::QueuePair::*direction);
+
  public:
   // Helper class that will send a ConnectionBegin marker when constructed and a
   // ConnectionEnd marker when destroyed.
@@ -167,7 +171,6 @@
     int connection_id_{};
   };
 
-  // Helper class that will wait for a ConnectionBegin marker when constructed
   class Receiver {
    public:
     explicit Receiver(SocketForwardRegionView* view, int connection_id,
diff --git a/host/libs/adb_connection_maintainer/adb_connection_maintainer.cpp b/host/libs/adb_connection_maintainer/adb_connection_maintainer.cpp
index 22db608..525ea7c 100644
--- a/host/libs/adb_connection_maintainer/adb_connection_maintainer.cpp
+++ b/host/libs/adb_connection_maintainer/adb_connection_maintainer.cpp
@@ -104,7 +104,7 @@
     LOG(INFO) << "Attempting to connect to device on port " << port;
     auto sock = cvd::SharedFD::SocketLocalClient(kAdbDaemonPort, SOCK_STREAM);
     if (sock->IsOpen() && AdbConnect(sock, port)) {
-      LOG(INFO) << "connected to device on port " << port << '\n';
+      LOG(INFO) << "connection attempted to device on port " << port;
       break;
     }
     sleep(2);