Use callbacks when adding commands to ProcessMonitor

This allows callers to specify custom handling of exited subprocesses,
which will be needed when running the e2e test from the launcher.

Bug: 118688323
Test: build & run locally

Change-Id: If5d1a73119651eefbbcd66aea769da8a7649660f
diff --git a/host/commands/launch/main.cc b/host/commands/launch/main.cc
index 0b950e5..a94af3e 100644
--- a/host/commands/launch/main.cc
+++ b/host/commands/launch/main.cc
@@ -314,6 +314,14 @@
   return FLAGS_run_adb_connector && AdbTunnelEnabled();
 }
 
+bool OnSubprocessExitCallback(cvd::MonitorEntry* entry) {
+  if (FLAGS_restart_subprocesses) {
+    return cvd::ProcessMonitor::RestartOnExitCb(entry);
+  } else {
+    return cvd::ProcessMonitor::DoNotMonitorCb(entry);
+  }
+}
+
 void LaunchUsbServerIfEnabled(const vsoc::CuttlefishConfig& config,
                               cvd::ProcessMonitor* process_monitor) {
   if (!AdbUsbEnabled()) {
@@ -330,7 +338,7 @@
   cvd::Command usb_server(FLAGS_virtual_usb_manager_binary);
   usb_server.AddParameter("-usb_v1_fd=", usb_v1_server);
   process_monitor->StartSubprocess(std::move(usb_server),
-                                   FLAGS_restart_subprocesses);
+                                   OnSubprocessExitCallback);
 }
 
 cvd::Command GetKernelLogMonitorCommand(const vsoc::CuttlefishConfig& config,
@@ -377,7 +385,7 @@
     cvd::Command adb_connector(FLAGS_adb_connector_binary);
     adb_connector.AddParameter(GetAdbConnectorPortArg());
     process_monitor->StartSubprocess(std::move(adb_connector),
-                                     FLAGS_restart_subprocesses);
+                                     OnSubprocessExitCallback);
   }
 }
 
@@ -387,7 +395,7 @@
     adb_tunnel.AddParameter(GetGuestPortArg());
     adb_tunnel.AddParameter(GetHostPortArg());
     process_monitor->StartSubprocess(std::move(adb_tunnel),
-                                     FLAGS_restart_subprocesses);
+                                     OnSubprocessExitCallback);
   }
 }
 
@@ -398,7 +406,7 @@
     cvd::Command vnc_server(FLAGS_vnc_server_binary);
     vnc_server.AddParameter(port_options);
     process_monitor->StartSubprocess(std::move(vnc_server),
-                                     FLAGS_restart_subprocesses);
+                                     OnSubprocessExitCallback);
   }
 }
 
@@ -1008,10 +1016,11 @@
 
   process_monitor.StartSubprocess(
       GetKernelLogMonitorCommand(*config, boot_events_pipe),
-      FLAGS_restart_subprocesses);
+      OnSubprocessExitCallback);
   LaunchUsbServerIfEnabled(*config, &process_monitor);
-  process_monitor.StartSubprocess(GetIvServerCommand(*config),
-                                  FLAGS_restart_subprocesses);
+  process_monitor.StartSubprocess(
+      GetIvServerCommand(*config),
+      OnSubprocessExitCallback);
 
   // Initialize the regions that require so before the VM starts.
   PreLaunchInitializers::Initialize(*config);
diff --git a/host/commands/launch/process_monitor.cc b/host/commands/launch/process_monitor.cc
index 40393b9..24b50b9 100644
--- a/host/commands/launch/process_monitor.cc
+++ b/host/commands/launch/process_monitor.cc
@@ -51,69 +51,82 @@
   fd->Read(buffer, sizeof(buffer));
 }
 
-void WaitForSubprocessAndRestartIt(cvd::MonitorEntry* monitor_entry) {
-  // In the future we may want to read from the fd, but for now we
-  // assume it just needs restarting
-  LOG(INFO) << "Detected exit of monitored subprocess";
-  // Make sure the subprocess isn't left in a zombie state, and that the
-  // pid is logged
-  int wstatus;
-  auto wait_ret = TEMP_FAILURE_RETRY(monitor_entry->proc->Wait(&wstatus, 0));
-  // None of the error conditions specified on waitpid(2) apply
-  assert(wait_ret > 0);
-  if (WIFEXITED(wstatus)) {
-    LOG(INFO) << "Subprocess " << monitor_entry->cmd->GetShortName() << " ("
-              << wait_ret << ") has exited with exit code "
-              << WEXITSTATUS(wstatus);
-  } else if (WIFSIGNALED(wstatus)) {
-    LOG(ERROR) << "Subprocess " << monitor_entry->cmd->GetShortName() << " ("
-               << wait_ret << ") was interrupted by a signal: "
-               << WTERMSIG(wstatus);
-  } else {
-    LOG(INFO) << "subprocess " << monitor_entry->cmd->GetShortName() << " ("
-               << wait_ret << ") has exited for unknown reasons";
-  }
-  monitor_entry->proc.reset(new Subprocess(monitor_entry->cmd->Start(true)));
-}
-
 }  // namespace
 
 ProcessMonitor::ProcessMonitor() {
   if (!SharedFD::SocketPair(AF_LOCAL, SOCK_STREAM, 0, &thread_comm_main_,
-                            &thread_comm_restarter_)) {
+                            &thread_comm_monitor_)) {
     LOG(ERROR) << "Unable to create restarter communication socket pair: "
                << strerror(errno);
     return;
   }
-  restarter_ = std::thread([this]() { RestarterRoutine(); });
+  monitor_thread_ = std::thread([this]() { MonitorRoutine(); });
 }
 
-void ProcessMonitor::StartSubprocess(Command cmd, bool restart_on_exit) {
+void ProcessMonitor::StartSubprocess(Command cmd, OnSocketReadyCb callback) {
   auto proc = cmd.Start(true);
   if (!proc.Started()) {
     LOG(ERROR) << "Failed to start process";
     return;
   }
-  if (restart_on_exit) {
-    {
-      std::lock_guard<std::mutex> lock(processes_mutex_);
-      monitored_processes_.push_back(MonitorEntry());
-      auto& entry = monitored_processes_.back();
-      entry.cmd.reset(new Command(std::move(cmd)));
-      entry.proc.reset(new Subprocess(std::move(proc)));
-    }
-    // Wake the restarter thread up so that it starts monitoring this subprocess
-    // Do this after releasing the lock so that the restarter thread is free to
-    // begin work as soon as select returns.
-    NotifyThread(thread_comm_main_);
+  {
+    std::lock_guard<std::mutex> lock(processes_mutex_);
+    monitored_processes_.push_back(MonitorEntry());
+    auto& entry = monitored_processes_.back();
+    entry.cmd.reset(new Command(std::move(cmd)));
+    entry.proc.reset(new Subprocess(std::move(proc)));
+    entry.on_control_socket_ready_cb = callback;
   }
+  // Wake the monitor thread up so that it starts monitoring this subprocess.
+  // Do this after releasing the lock so that the monitor thread is free to
+  // begin work as soon as select returns.
+  NotifyThread(thread_comm_main_);
 }
 
-void ProcessMonitor::RestarterRoutine() {
+bool ProcessMonitor::RestartOnExitCb(MonitorEntry* entry) {
+  // Make sure the process actually exited
+  char buffer[16];
+  auto bytes_read = entry->proc->control_socket()->Read(buffer, sizeof(buffer));
+  if (bytes_read > 0) {
+    LOG(WARNING) << "Subprocess " << entry->cmd->GetShortName() << " wrote "
+                 << bytes_read
+                 << " bytes on the control socket, this is unexpected";
+    // The process may not have exited, continue monitoring without restarting
+    return true;
+  }
+
+  LOG(INFO) << "Detected exit of monitored subprocess";
+  // Make sure the subprocess isn't left in a zombie state, and that the
+  // pid is logged
+  int wstatus;
+  auto wait_ret = TEMP_FAILURE_RETRY(entry->proc->Wait(&wstatus, 0));
+  // None of the error conditions specified on waitpid(2) apply
+  assert(wait_ret > 0);
+  if (WIFEXITED(wstatus)) {
+    LOG(INFO) << "Subprocess " << entry->cmd->GetShortName() << " ("
+              << wait_ret << ") has exited with exit code "
+              << WEXITSTATUS(wstatus);
+  } else if (WIFSIGNALED(wstatus)) {
+    LOG(ERROR) << "Subprocess " << entry->cmd->GetShortName() << " ("
+               << wait_ret << ") was interrupted by a signal: "
+               << WTERMSIG(wstatus);
+  } else {
+    LOG(INFO) << "subprocess " << entry->cmd->GetShortName() << " ("
+               << wait_ret << ") has exited for unknown reasons";
+  }
+  entry->proc.reset(new Subprocess(entry->cmd->Start(true)));
+  return true;
+}
+
+bool ProcessMonitor::DoNotMonitorCb(MonitorEntry*) {
+  return false;
+}
+
+void ProcessMonitor::MonitorRoutine() {
   LOG(INFO) << "Started monitoring subprocesses";
   do {
     SharedFDSet read_set;
-    read_set.Set(thread_comm_restarter_);
+    read_set.Set(thread_comm_monitor_);
     {
       std::lock_guard<std::mutex> lock(processes_mutex_);
       for (auto& monitored_process: monitored_processes_) {
@@ -136,22 +149,27 @@
     }
     if (num_fds > 0) {
       // Try the communication fd, it's the most likely to be set
-      if (read_set.IsSet(thread_comm_restarter_)) {
+      if (read_set.IsSet(thread_comm_monitor_)) {
         --num_fds;
-        ConsumeNotifications(thread_comm_restarter_);
+        ConsumeNotifications(thread_comm_monitor_);
       }
     }
     {
       std::lock_guard<std::mutex> lock(processes_mutex_);
       // Keep track of the number of file descriptors ready for read, chances
       // are we don't need to go over the entire list of subprocesses
-      for (size_t idx = 0; idx < monitored_processes_.size() && num_fds > 0;
-           ++idx) {
-        auto monitor_entry = &monitored_processes_[idx];
-        auto control_socket = monitor_entry->proc->control_socket();
+      auto it = monitored_processes_.begin();
+      while (it != monitored_processes_.end()) {
+        auto control_socket = it->proc->control_socket();
+        bool keep_monitoring = true;
         if (read_set.IsSet(control_socket)) {
           --num_fds;
-          WaitForSubprocessAndRestartIt(monitor_entry);
+          keep_monitoring = it->on_control_socket_ready_cb(&(*it));
+        }
+        if (keep_monitoring) {
+          ++it;
+        } else {
+          it = monitored_processes_.erase(it);
         }
       }
     }
diff --git a/host/commands/launch/process_monitor.h b/host/commands/launch/process_monitor.h
index d4c9c95..b78a196 100644
--- a/host/commands/launch/process_monitor.h
+++ b/host/commands/launch/process_monitor.h
@@ -23,24 +23,37 @@
 #include <common/libs/utils/subprocess.h>
 
 namespace cvd {
+
+struct MonitorEntry;
+using OnSocketReadyCb = std::function<bool(MonitorEntry*)>;
+
 struct MonitorEntry {
   std::unique_ptr<Command> cmd;
   std::unique_ptr<Subprocess> proc;
+  OnSocketReadyCb on_control_socket_ready_cb;
 };
 
 // Keeps track of launched subprocesses, restarts them if they unexpectedly exit
 class ProcessMonitor {
  public:
   ProcessMonitor();
-  void StartSubprocess(Command cmd, bool restart_on_exit = false);
+  // Starts a managed subprocess with a controlling socket. The
+  // on_control_socket_ready_cb callback will be called when data is ready to be
+  // read from the socket or the subprocess has ended. No member functions of
+  // the process monitor object should be called from the callback as it may
+  // lead to a deadlock. If the callback returns false the subprocess will no
+  // longer be monitored.
+  void StartSubprocess(Command cmd, OnSocketReadyCb on_control_socket_ready_cb);
+  static bool RestartOnExitCb(MonitorEntry* entry);
+  static bool DoNotMonitorCb(MonitorEntry* entry);
 
  private:
-  void RestarterRoutine();
+  void MonitorRoutine();
 
   std::vector<MonitorEntry> monitored_processes_;
   // Used for communication with the restarter thread
-  cvd::SharedFD thread_comm_main_, thread_comm_restarter_;
-  std::thread restarter_;
+  cvd::SharedFD thread_comm_main_, thread_comm_monitor_;
+  std::thread monitor_thread_;
   // Protects access to the monitored_processes_
   std::mutex processes_mutex_;
 };