Slimmer rewrite of socket_forward_proxy
Key differences:
1. No QueueState
2. No sequence number
3. No generation number
Instead, the guest-side monitors all queues for new connections and the
host-side keeps track of which queues are allocated
design: https://docs.google.com/document/d/1z43c9LGeEEU6G-ojNtEeQP9-ezK890MU3GP6df3byYs
Change-Id: If0396de2ef8080ed78e7afc36ac0d661f99b6d3c
Bug: 80104636
Bug: 110707067
Test: run local while restarting the host-side process and guest-side
process every ~10 seconds. Connect to a guest-side server that only
sends (doesn't receive) information to the host. Connect several
host-side clients to a guest-side echo server. Restart without having
sent any data in either direction or in only one direction.
diff --git a/common/vsoc/lib/socket_forward_region_view.cpp b/common/vsoc/lib/socket_forward_region_view.cpp
index 6f64b19..2dbd2e8 100644
--- a/common/vsoc/lib/socket_forward_region_view.cpp
+++ b/common/vsoc/lib/socket_forward_region_view.cpp
@@ -24,7 +24,6 @@
using vsoc::layout::socket_forward::Queue;
using vsoc::layout::socket_forward::QueuePair;
-namespace QueueState = vsoc::layout::socket_forward::QueueState;
// store the read and write direction as variables to keep the ifdefs and macros
// in later code to a minimum
constexpr auto ReadDirection = &QueuePair::
@@ -84,25 +83,6 @@
void SocketForwardRegionView::CleanUpPreviousConnections() {
data()->Recover();
- int connection_id = 0;
- auto current_generation = generation();
- auto begin_packet = Packet::MakeBegin();
- begin_packet.set_generation(current_generation);
- auto end_packet = Packet::MakeEnd();
- end_packet.set_generation(current_generation);
- for (auto&& queue_pair : data()->queues_) {
- std::uint32_t state{};
- {
- auto guard = make_lock_guard(&queue_pair.queue_state_lock_);
- state = (queue_pair.*WriteDirection).queue_state_;
-#ifndef CUTTLEFISH_HOST
- if (state == QueueState::HOST_CONNECTED) {
- state = (queue_pair.*WriteDirection).queue_state_ =
- (queue_pair.*ReadDirection).queue_state_ =
- QueueState::BOTH_CONNECTED;
- }
-#endif
- }
static constexpr auto kRestartPacket = Packet::MakeRestart();
for (int connection_id = 0; connection_id < kNumQueues; ++connection_id) {
@@ -110,18 +90,11 @@
}
}
-void SocketForwardRegionView::MarkQueueDisconnected(
- int connection_id, Queue QueuePair::*direction) {
- auto& queue_pair = data()->queues_[connection_id];
- auto& queue = queue_pair.*direction;
-
-#ifdef CUTTLEFISH_HOST
- // if the host has connected but the guest hasn't seen it yet, wait for the
- // guest to connect so the protocol can follow the normal state transition.
- while (queue.queue_state_ == QueueState::HOST_CONNECTED) {
- LOG(WARNING) << "closing queue[" << connection_id
- << "] in HOST_CONNECTED state. waiting";
- WaitForSignal(&queue.queue_state_, QueueState::HOST_CONNECTED);
+SocketForwardRegionView::ConnectionViewCollection
+SocketForwardRegionView::AllConnections() {
+ SocketForwardRegionView::ConnectionViewCollection all_queues;
+ for (int connection_id = 0; connection_id < kNumQueues; ++connection_id) {
+ all_queues.emplace_back(this, connection_id);
}
return all_queues;
}
@@ -190,20 +163,15 @@
static constexpr auto kEmptyPacket = Packet::MakeData();
received_packet_ = kEmptyPacket;
}
- ++last_seq_number_;
- int id = 0;
- for (auto&& queue_pair : data()->queues_) {
- LOG(DEBUG) << "locking and checking queue at index " << id;
- auto guard = make_lock_guard(&queue_pair.queue_state_lock_);
- if (queue_pair.host_to_guest.queue_state_ == QueueState::HOST_CONNECTED) {
- CHECK(queue_pair.guest_to_host.queue_state_ ==
- QueueState::HOST_CONNECTED);
- LOG(DEBUG) << "found waiting connection at index " << id;
- queue_pair.host_to_guest.queue_state_ = QueueState::BOTH_CONNECTED;
- queue_pair.guest_to_host.queue_state_ = QueueState::BOTH_CONNECTED;
- SendSignal(layout::Sides::Peer, &queue_pair.host_to_guest.queue_state_);
- SendSignal(layout::Sides::Peer, &queue_pair.guest_to_host.queue_state_);
- return id;
+ received_packet_free_ = false;
+ receive_thread_data_cv_.notify_one();
+}
+
+void SocketForwardRegionView::ShmConnectionView::Receiver::Start() {
+ while (ExpectMorePackets()) {
+ std::unique_lock<std::mutex> guard(receive_thread_data_lock_);
+ while (!received_packet_free_) {
+ receive_thread_data_cv_.wait(guard);
}
do {
diff --git a/common/vsoc/shm/socket_forward_layout.h b/common/vsoc/shm/socket_forward_layout.h
index 141aff3..4a9beda 100644
--- a/common/vsoc/shm/socket_forward_layout.h
+++ b/common/vsoc/shm/socket_forward_layout.h
@@ -27,30 +27,12 @@
constexpr std::size_t kMaxPacketSize = 8192;
constexpr std::size_t kNumQueues = 16;
-<<<<<<< HEAD
-=======
-namespace QueueState {
-constexpr std::uint32_t INACTIVE = 0;
-constexpr std::uint32_t HOST_CONNECTED = 1;
-constexpr std::uint32_t BOTH_CONNECTED = 2;
-constexpr std::uint32_t HOST_CLOSED = 3;
-constexpr std::uint32_t GUEST_CLOSED = 4;
-// If both are closed then the queue goes back to INACTIVE
-// BOTH_CLOSED = 0,
-} // namespace QueueState
-
->>>>>>> 4d9ddd4... Host will wait on HOST_CONNECTED queues until queue_state changes.
struct Queue {
static constexpr size_t layout_size =
CircularPacketQueue<16, kMaxPacketSize>::layout_size;
CircularPacketQueue<16, kMaxPacketSize> queue;
-<<<<<<< HEAD
-=======
- std::atomic_uint32_t queue_state_;
-
->>>>>>> 4d9ddd4... Host will wait on HOST_CONNECTED queues until queue_state changes.
bool Recover() { return queue.Recover(); }
};
ASSERT_SHM_COMPATIBLE(Queue);
@@ -63,13 +45,6 @@
// Traffic originating from guest that proceeds towards host.
Queue guest_to_host;
-<<<<<<< HEAD
-=======
- std::uint32_t port_;
-
- SpinLock queue_state_lock_;
-
->>>>>>> 4d9ddd4... Host will wait on HOST_CONNECTED queues until queue_state changes.
bool Recover() {
bool recovered = false;
recovered = recovered || host_to_guest.Recover();
@@ -88,20 +63,10 @@
bool rval = i.Recover();
recovered = recovered || rval;
}
-<<<<<<< HEAD
-=======
- // TODO: consider handling the sequence number here
->>>>>>> 4d9ddd4... Host will wait on HOST_CONNECTED queues until queue_state changes.
return recovered;
}
QueuePair queues_[kNumQueues];
-<<<<<<< HEAD
-=======
- std::atomic_uint32_t seq_num; // incremented for every new connection
- std::atomic_uint32_t
- generation_num; // incremented for every new socket forward process
->>>>>>> 4d9ddd4... Host will wait on HOST_CONNECTED queues until queue_state changes.
static const char* region_name;
};