adb: fdevent: move run queue to fdevent_context.

Make the run queue logic reusable between implementations of fdevent by
moving it to the abstract base class.

Test: adb_test
Change-Id: If2f72e3ddc8007304bca63aa75446fa117267b25
diff --git a/adb/fdevent/fdevent.cpp b/adb/fdevent/fdevent.cpp
index e80bb5a..c858f6b 100644
--- a/adb/fdevent/fdevent.cpp
+++ b/adb/fdevent/fdevent.cpp
@@ -49,6 +49,32 @@
                                        state.c_str());
 }
 
+void fdevent_context::Run(std::function<void()> fn) {
+    {
+        std::lock_guard<std::mutex> lock(run_queue_mutex_);
+        run_queue_.push_back(std::move(fn));
+    }
+
+    Interrupt();
+}
+
+void fdevent_context::FlushRunQueue() {
+    // We need to be careful around reentrancy here, since a function we call can queue up another
+    // function.
+    while (true) {
+        std::function<void()> fn;
+        {
+            std::lock_guard<std::mutex> lock(this->run_queue_mutex_);
+            if (this->run_queue_.empty()) {
+                break;
+            }
+            fn = this->run_queue_.front();
+            this->run_queue_.pop_front();
+        }
+        fn();
+    }
+}
+
 static auto& g_ambient_fdevent_context =
         *new std::unique_ptr<fdevent_context>(new fdevent_context_poll());
 
diff --git a/adb/fdevent/fdevent.h b/adb/fdevent/fdevent.h
index b46219c..5a2f2c6 100644
--- a/adb/fdevent/fdevent.h
+++ b/adb/fdevent/fdevent.h
@@ -21,10 +21,14 @@
 #include <stdint.h>
 
 #include <chrono>
+#include <deque>
 #include <functional>
+#include <mutex>
 #include <optional>
 #include <variant>
 
+#include <android-base/thread_annotations.h>
+
 #include "adb_unique_fd.h"
 
 // Events that may be observed
@@ -48,6 +52,7 @@
 std::string dump_fde(const fdevent* fde);
 
 struct fdevent_context {
+  public:
     virtual ~fdevent_context() = default;
 
     // Allocate and initialize a new fdevent object.
@@ -68,17 +73,29 @@
     virtual void SetTimeout(fdevent* fde, std::optional<std::chrono::milliseconds> timeout) = 0;
 
     // Loop forever, handling events.
+    // Implementations should call FlushRunQueue on every iteration.
     virtual void Loop() = 0;
 
     // Assert that the caller is running on the context's main thread.
     virtual void CheckMainThread() = 0;
 
     // Queue an operation to be run on the main thread.
-    virtual void Run(std::function<void()> fn) = 0;
+    void Run(std::function<void()> fn);
 
     // Test-only functionality:
     virtual void TerminateLoop() = 0;
     virtual size_t InstalledCount() = 0;
+
+  protected:
+    // Interrupt the run loop.
+    virtual void Interrupt() = 0;
+
+    // Run all pending functions enqueued via Run().
+    void FlushRunQueue() EXCLUDES(run_queue_mutex_);
+
+  private:
+    std::mutex run_queue_mutex_;
+    std::deque<std::function<void()>> run_queue_ GUARDED_BY(run_queue_mutex_);
 };
 
 struct fdevent {
diff --git a/adb/fdevent/fdevent_poll.cpp b/adb/fdevent/fdevent_poll.cpp
index 6e016f6..7615859 100644
--- a/adb/fdevent/fdevent_poll.cpp
+++ b/adb/fdevent/fdevent_poll.cpp
@@ -50,6 +50,35 @@
 #include "fdevent.h"
 #include "sysdeps/chrono.h"
 
+static void fdevent_interrupt(int fd, unsigned, void*) {
+    char buf[BUFSIZ];
+    ssize_t rc = TEMP_FAILURE_RETRY(adb_read(fd, buf, sizeof(buf)));
+    if (rc == -1) {
+        PLOG(FATAL) << "failed to read from fdevent interrupt fd";
+    }
+}
+
+fdevent_context_poll::fdevent_context_poll() {
+    int s[2];
+    if (adb_socketpair(s) != 0) {
+        PLOG(FATAL) << "failed to create fdevent interrupt socketpair";
+    }
+
+    if (!set_file_block_mode(s[0], false) || !set_file_block_mode(s[1], false)) {
+        PLOG(FATAL) << "failed to make fdevent interrupt socket nonblocking";
+    }
+
+    this->interrupt_fd_.reset(s[0]);
+    fdevent* fde = this->Create(unique_fd(s[1]), fdevent_interrupt, nullptr);
+    CHECK(fde != nullptr);
+    this->Add(fde, FDE_READ);
+}
+
+fdevent_context_poll::~fdevent_context_poll() {
+    main_thread_valid_ = false;
+    this->Destroy(this->interrupt_fde_);
+}
+
 void fdevent_context_poll::CheckMainThread() {
     if (main_thread_valid_) {
         CHECK_EQ(main_thread_id_, android::base::GetThreadId());
@@ -291,79 +320,6 @@
             fde->func);
 }
 
-static void fdevent_run_flush(fdevent_context_poll* ctx) EXCLUDES(ctx->run_queue_mutex_) {
-    // We need to be careful around reentrancy here, since a function we call can queue up another
-    // function.
-    while (true) {
-        std::function<void()> fn;
-        {
-            std::lock_guard<std::mutex> lock(ctx->run_queue_mutex_);
-            if (ctx->run_queue_.empty()) {
-                break;
-            }
-            fn = ctx->run_queue_.front();
-            ctx->run_queue_.pop_front();
-        }
-        fn();
-    }
-}
-
-static void fdevent_run_func(int fd, unsigned ev, void* data) {
-    CHECK_GE(fd, 0);
-    CHECK(ev & FDE_READ);
-
-    bool* run_needs_flush = static_cast<bool*>(data);
-    char buf[1024];
-
-    // Empty the fd.
-    if (adb_read(fd, buf, sizeof(buf)) == -1) {
-        PLOG(FATAL) << "failed to empty run queue notify fd";
-    }
-
-    // Mark that we need to flush, and then run it at the end of fdevent_loop.
-    *run_needs_flush = true;
-}
-
-static void fdevent_run_setup(fdevent_context_poll* ctx) {
-    {
-        std::lock_guard<std::mutex> lock(ctx->run_queue_mutex_);
-        CHECK(ctx->run_queue_notify_fd_.get() == -1);
-        int s[2];
-        if (adb_socketpair(s) != 0) {
-            PLOG(FATAL) << "failed to create run queue notify socketpair";
-        }
-
-        if (!set_file_block_mode(s[0], false) || !set_file_block_mode(s[1], false)) {
-            PLOG(FATAL) << "failed to make run queue notify socket nonblocking";
-        }
-
-        ctx->run_queue_notify_fd_.reset(s[0]);
-        fdevent* fde = ctx->Create(unique_fd(s[1]), fdevent_run_func, &ctx->run_needs_flush_);
-        CHECK(fde != nullptr);
-        ctx->Add(fde, FDE_READ);
-    }
-
-    fdevent_run_flush(ctx);
-}
-
-void fdevent_context_poll::Run(std::function<void()> fn) {
-    std::lock_guard<std::mutex> lock(run_queue_mutex_);
-    run_queue_.push_back(std::move(fn));
-
-    // run_queue_notify_fd could still be -1 if we're called before fdevent has finished setting up.
-    // In that case, rely on the setup code to flush the queue without a notification being needed.
-    if (run_queue_notify_fd_ != -1) {
-        int rc = adb_write(run_queue_notify_fd_.get(), "", 1);
-
-        // It's possible that we get EAGAIN here, if lots of notifications came in while handling.
-        if (rc == 0) {
-            PLOG(FATAL) << "run queue notify fd was closed?";
-        } else if (rc == -1 && errno != EAGAIN) {
-            PLOG(FATAL) << "failed to write to run queue notify fd";
-        }
-    }
-}
-
 static void fdevent_check_spin(fdevent_context_poll* ctx, uint64_t cycle) {
     // Check to see if we're spinning because we forgot about an fdevent
     // by keeping track of how long fdevents have been continuously pending.
@@ -424,7 +380,6 @@
 void fdevent_context_poll::Loop() {
     this->main_thread_id_ = android::base::GetThreadId();
     this->main_thread_valid_ = true;
-    fdevent_run_setup(this);
 
     uint64_t cycle = 0;
     while (true) {
@@ -444,17 +399,27 @@
             fdevent_call_fdfunc(fde);
         }
 
-        if (run_needs_flush_) {
-            fdevent_run_flush(this);
-            run_needs_flush_ = false;
-        }
+        this->FlushRunQueue();
     }
 }
 
 void fdevent_context_poll::TerminateLoop() {
     terminate_loop_ = true;
+    Interrupt();
 }
 
 size_t fdevent_context_poll::InstalledCount() {
-    return poll_node_map_.size();
+    // We always have an installed fde for interrupt.
+    return poll_node_map_.size() - 1;
+}
+
+void fdevent_context_poll::Interrupt() {
+    int rc = adb_write(this->interrupt_fd_, "", 1);
+
+    // It's possible that we get EAGAIN here, if lots of notifications came in while handling.
+    if (rc == 0) {
+        PLOG(FATAL) << "fdevent interrupt fd was closed?";
+    } else if (rc == -1 && errno != EAGAIN) {
+        PLOG(FATAL) << "failed to write to fdevent interrupt fd";
+    }
 }
diff --git a/adb/fdevent/fdevent_poll.h b/adb/fdevent/fdevent_poll.h
index f5720ca..1b505a7 100644
--- a/adb/fdevent/fdevent_poll.h
+++ b/adb/fdevent/fdevent_poll.h
@@ -25,6 +25,7 @@
 
 #include <android-base/thread_annotations.h>
 
+#include "adb_unique_fd.h"
 #include "fdevent.h"
 
 struct PollNode {
@@ -44,7 +45,8 @@
 };
 
 struct fdevent_context_poll : public fdevent_context {
-    virtual ~fdevent_context_poll() = default;
+    fdevent_context_poll();
+    virtual ~fdevent_context_poll();
 
     virtual fdevent* Create(unique_fd fd, std::variant<fd_func, fd_func2> func, void* arg) final;
     virtual unique_fd Destroy(fdevent* fde) final;
@@ -58,11 +60,13 @@
 
     virtual void CheckMainThread() final;
 
-    virtual void Run(std::function<void()> fn) final;
-
     virtual void TerminateLoop() final;
     virtual size_t InstalledCount() final;
 
+  protected:
+    virtual void Interrupt() final;
+
+  public:
     // All operations to fdevent should happen only in the main thread.
     // That's why we don't need a lock for fdevent.
     std::unordered_map<int, PollNode> poll_node_map_;
@@ -71,10 +75,7 @@
     uint64_t main_thread_id_ = 0;
     uint64_t fdevent_id_ = 0;
 
-    bool run_needs_flush_ = false;
-    unique_fd run_queue_notify_fd_;
-    std::mutex run_queue_mutex_;
-    std::deque<std::function<void()>> run_queue_ GUARDED_BY(run_queue_mutex_);
-
+    unique_fd interrupt_fd_;
+    fdevent* interrupt_fde_ = nullptr;
     std::atomic<bool> terminate_loop_ = false;
 };
diff --git a/adb/fdevent/fdevent_test.h b/adb/fdevent/fdevent_test.h
index 24bce59..2139d0f 100644
--- a/adb/fdevent/fdevent_test.h
+++ b/adb/fdevent/fdevent_test.h
@@ -78,8 +78,8 @@
     }
 
     size_t GetAdditionalLocalSocketCount() {
-        // dummy socket installed in PrepareThread() + fdevent_run_on_main_thread socket
-        return 2;
+        // dummy socket installed in PrepareThread()
+        return 1;
     }
 
     void TerminateThread() {