Slimmer rewrite of socket_forward_proxy

Key differences:
 1. No QueueState
 2. No sequence number
 3. No generation number

Instead, the guest-side monitors all queues for new connections and the
host-side keeps track of which queues are allocated

design: https://docs.google.com/document/d/1z43c9LGeEEU6G-ojNtEeQP9-ezK890MU3GP6df3byYs

Change-Id: If0396de2ef8080ed78e7afc36ac0d661f99b6d3c
Bug: 80104636
Bug: 110707067
Test: run local while restarting the host-side process and guest-side
process every ~10 seconds. Connect to a guest-side server that only
sends (doesn't receive) information to the host. Connect several
host-side clients to a guest-side echo server. Restart without having
sent any data in either direction or in only one direction.
diff --git a/common/vsoc/lib/socket_forward_region_view.cpp b/common/vsoc/lib/socket_forward_region_view.cpp
index 6f64b19..2dbd2e8 100644
--- a/common/vsoc/lib/socket_forward_region_view.cpp
+++ b/common/vsoc/lib/socket_forward_region_view.cpp
@@ -24,7 +24,6 @@
 
 using vsoc::layout::socket_forward::Queue;
 using vsoc::layout::socket_forward::QueuePair;
-namespace QueueState = vsoc::layout::socket_forward::QueueState;
 // store the read and write direction as variables to keep the ifdefs and macros
 // in later code to a minimum
 constexpr auto ReadDirection = &QueuePair::
@@ -84,25 +83,6 @@
 
 void SocketForwardRegionView::CleanUpPreviousConnections() {
   data()->Recover();
-  int connection_id = 0;
-  auto current_generation = generation();
-  auto begin_packet = Packet::MakeBegin();
-  begin_packet.set_generation(current_generation);
-  auto end_packet = Packet::MakeEnd();
-  end_packet.set_generation(current_generation);
-  for (auto&& queue_pair : data()->queues_) {
-    std::uint32_t state{};
-    {
-      auto guard = make_lock_guard(&queue_pair.queue_state_lock_);
-      state = (queue_pair.*WriteDirection).queue_state_;
-#ifndef CUTTLEFISH_HOST
-      if (state == QueueState::HOST_CONNECTED) {
-        state = (queue_pair.*WriteDirection).queue_state_ =
-            (queue_pair.*ReadDirection).queue_state_ =
-                QueueState::BOTH_CONNECTED;
-      }
-#endif
-    }
 
   static constexpr auto kRestartPacket = Packet::MakeRestart();
   for (int connection_id = 0; connection_id < kNumQueues; ++connection_id) {
@@ -110,18 +90,11 @@
   }
 }
 
-void SocketForwardRegionView::MarkQueueDisconnected(
-    int connection_id, Queue QueuePair::*direction) {
-  auto& queue_pair = data()->queues_[connection_id];
-  auto& queue = queue_pair.*direction;
-
-#ifdef CUTTLEFISH_HOST
-  // if the host has connected but the guest hasn't seen it yet, wait for the
-  // guest to connect so the protocol can follow the normal state transition.
-  while (queue.queue_state_ == QueueState::HOST_CONNECTED) {
-    LOG(WARNING) << "closing queue[" << connection_id
-                 << "] in HOST_CONNECTED state. waiting";
-    WaitForSignal(&queue.queue_state_, QueueState::HOST_CONNECTED);
+SocketForwardRegionView::ConnectionViewCollection
+SocketForwardRegionView::AllConnections() {
+  SocketForwardRegionView::ConnectionViewCollection all_queues;
+  for (int connection_id = 0; connection_id < kNumQueues; ++connection_id) {
+    all_queues.emplace_back(this, connection_id);
   }
   return all_queues;
 }
@@ -190,20 +163,15 @@
     static constexpr auto kEmptyPacket = Packet::MakeData();
     received_packet_ = kEmptyPacket;
   }
-  ++last_seq_number_;
-  int id = 0;
-  for (auto&& queue_pair : data()->queues_) {
-    LOG(DEBUG) << "locking and checking queue at index " << id;
-    auto guard = make_lock_guard(&queue_pair.queue_state_lock_);
-    if (queue_pair.host_to_guest.queue_state_ == QueueState::HOST_CONNECTED) {
-      CHECK(queue_pair.guest_to_host.queue_state_ ==
-            QueueState::HOST_CONNECTED);
-      LOG(DEBUG) << "found waiting connection at index " << id;
-      queue_pair.host_to_guest.queue_state_ = QueueState::BOTH_CONNECTED;
-      queue_pair.guest_to_host.queue_state_ = QueueState::BOTH_CONNECTED;
-      SendSignal(layout::Sides::Peer, &queue_pair.host_to_guest.queue_state_);
-      SendSignal(layout::Sides::Peer, &queue_pair.guest_to_host.queue_state_);
-      return id;
+  received_packet_free_ = false;
+  receive_thread_data_cv_.notify_one();
+}
+
+void SocketForwardRegionView::ShmConnectionView::Receiver::Start() {
+  while (ExpectMorePackets()) {
+    std::unique_lock<std::mutex> guard(receive_thread_data_lock_);
+    while (!received_packet_free_) {
+      receive_thread_data_cv_.wait(guard);
     }
 
     do {
diff --git a/common/vsoc/shm/socket_forward_layout.h b/common/vsoc/shm/socket_forward_layout.h
index 141aff3..4a9beda 100644
--- a/common/vsoc/shm/socket_forward_layout.h
+++ b/common/vsoc/shm/socket_forward_layout.h
@@ -27,30 +27,12 @@
 constexpr std::size_t kMaxPacketSize = 8192;
 constexpr std::size_t kNumQueues = 16;
 
-<<<<<<< HEAD
-=======
-namespace QueueState {
-constexpr std::uint32_t INACTIVE = 0;
-constexpr std::uint32_t HOST_CONNECTED = 1;
-constexpr std::uint32_t BOTH_CONNECTED = 2;
-constexpr std::uint32_t HOST_CLOSED = 3;
-constexpr std::uint32_t GUEST_CLOSED = 4;
-// If both are closed then the queue goes back to INACTIVE
-// BOTH_CLOSED = 0,
-}  // namespace QueueState
-
->>>>>>> 4d9ddd4... Host will wait on HOST_CONNECTED queues until queue_state changes.
 struct Queue {
   static constexpr size_t layout_size =
       CircularPacketQueue<16, kMaxPacketSize>::layout_size;
 
   CircularPacketQueue<16, kMaxPacketSize> queue;
 
-<<<<<<< HEAD
-=======
-  std::atomic_uint32_t queue_state_;
-
->>>>>>> 4d9ddd4... Host will wait on HOST_CONNECTED queues until queue_state changes.
   bool Recover() { return queue.Recover(); }
 };
 ASSERT_SHM_COMPATIBLE(Queue);
@@ -63,13 +45,6 @@
   // Traffic originating from guest that proceeds towards host.
   Queue guest_to_host;
 
-<<<<<<< HEAD
-=======
-  std::uint32_t port_;
-
-  SpinLock queue_state_lock_;
-
->>>>>>> 4d9ddd4... Host will wait on HOST_CONNECTED queues until queue_state changes.
   bool Recover() {
     bool recovered = false;
     recovered = recovered || host_to_guest.Recover();
@@ -88,20 +63,10 @@
       bool rval = i.Recover();
       recovered = recovered || rval;
     }
-<<<<<<< HEAD
-=======
-    // TODO: consider handling the sequence number here
->>>>>>> 4d9ddd4... Host will wait on HOST_CONNECTED queues until queue_state changes.
     return recovered;
   }
 
   QueuePair queues_[kNumQueues];
-<<<<<<< HEAD
-=======
-  std::atomic_uint32_t seq_num;  // incremented for every new connection
-  std::atomic_uint32_t
-      generation_num;  // incremented for every new socket forward process
->>>>>>> 4d9ddd4... Host will wait on HOST_CONNECTED queues until queue_state changes.
   static const char* region_name;
 };