Use callbacks when adding commands to ProcessMonitor
This allows to specify custom handling of exited subprocesses, which
will be needed when running the e2e test from the launcher.
Bug: 118688323
Test: build & run locally
Change-Id: If5d1a73119651eefbbcd66aea769da8a7649660f
diff --git a/host/commands/launch/main.cc b/host/commands/launch/main.cc
index 0b950e5..a94af3e 100644
--- a/host/commands/launch/main.cc
+++ b/host/commands/launch/main.cc
@@ -314,6 +314,14 @@
return FLAGS_run_adb_connector && AdbTunnelEnabled();
}
+bool OnSubprocessExitCallback(cvd::MonitorEntry* entry) {
+ if (FLAGS_restart_subprocesses) {
+ return cvd::ProcessMonitor::RestartOnExitCb(entry);
+ } else {
+ return cvd::ProcessMonitor::DoNotMonitorCb(entry);
+ }
+}
+
void LaunchUsbServerIfEnabled(const vsoc::CuttlefishConfig& config,
cvd::ProcessMonitor* process_monitor) {
if (!AdbUsbEnabled()) {
@@ -330,7 +338,7 @@
cvd::Command usb_server(FLAGS_virtual_usb_manager_binary);
usb_server.AddParameter("-usb_v1_fd=", usb_v1_server);
process_monitor->StartSubprocess(std::move(usb_server),
- FLAGS_restart_subprocesses);
+ OnSubprocessExitCallback);
}
cvd::Command GetKernelLogMonitorCommand(const vsoc::CuttlefishConfig& config,
@@ -377,7 +385,7 @@
cvd::Command adb_connector(FLAGS_adb_connector_binary);
adb_connector.AddParameter(GetAdbConnectorPortArg());
process_monitor->StartSubprocess(std::move(adb_connector),
- FLAGS_restart_subprocesses);
+ OnSubprocessExitCallback);
}
}
@@ -387,7 +395,7 @@
adb_tunnel.AddParameter(GetGuestPortArg());
adb_tunnel.AddParameter(GetHostPortArg());
process_monitor->StartSubprocess(std::move(adb_tunnel),
- FLAGS_restart_subprocesses);
+ OnSubprocessExitCallback);
}
}
@@ -398,7 +406,7 @@
cvd::Command vnc_server(FLAGS_vnc_server_binary);
vnc_server.AddParameter(port_options);
process_monitor->StartSubprocess(std::move(vnc_server),
- FLAGS_restart_subprocesses);
+ OnSubprocessExitCallback);
}
}
@@ -1008,10 +1016,11 @@
process_monitor.StartSubprocess(
GetKernelLogMonitorCommand(*config, boot_events_pipe),
- FLAGS_restart_subprocesses);
+ OnSubprocessExitCallback);
LaunchUsbServerIfEnabled(*config, &process_monitor);
- process_monitor.StartSubprocess(GetIvServerCommand(*config),
- FLAGS_restart_subprocesses);
+ process_monitor.StartSubprocess(
+ GetIvServerCommand(*config),
+ OnSubprocessExitCallback);
// Initialize the regions that require so before the VM starts.
PreLaunchInitializers::Initialize(*config);
diff --git a/host/commands/launch/process_monitor.cc b/host/commands/launch/process_monitor.cc
index 40393b9..24b50b9 100644
--- a/host/commands/launch/process_monitor.cc
+++ b/host/commands/launch/process_monitor.cc
@@ -51,69 +51,82 @@
fd->Read(buffer, sizeof(buffer));
}
-void WaitForSubprocessAndRestartIt(cvd::MonitorEntry* monitor_entry) {
- // In the future we may want to read from the fd, but for now we
- // assume it just needs restarting
- LOG(INFO) << "Detected exit of monitored subprocess";
- // Make sure the subprocess isn't left in a zombie state, and that the
- // pid is logged
- int wstatus;
- auto wait_ret = TEMP_FAILURE_RETRY(monitor_entry->proc->Wait(&wstatus, 0));
- // None of the error conditions specified on waitpid(2) apply
- assert(wait_ret > 0);
- if (WIFEXITED(wstatus)) {
- LOG(INFO) << "Subprocess " << monitor_entry->cmd->GetShortName() << " ("
- << wait_ret << ") has exited with exit code "
- << WEXITSTATUS(wstatus);
- } else if (WIFSIGNALED(wstatus)) {
- LOG(ERROR) << "Subprocess " << monitor_entry->cmd->GetShortName() << " ("
- << wait_ret << ") was interrupted by a signal: "
- << WTERMSIG(wstatus);
- } else {
- LOG(INFO) << "subprocess " << monitor_entry->cmd->GetShortName() << " ("
- << wait_ret << ") has exited for unknown reasons";
- }
- monitor_entry->proc.reset(new Subprocess(monitor_entry->cmd->Start(true)));
-}
-
} // namespace
ProcessMonitor::ProcessMonitor() {
if (!SharedFD::SocketPair(AF_LOCAL, SOCK_STREAM, 0, &thread_comm_main_,
- &thread_comm_restarter_)) {
+ &thread_comm_monitor_)) {
LOG(ERROR) << "Unable to create restarter communication socket pair: "
<< strerror(errno);
return;
}
- restarter_ = std::thread([this]() { RestarterRoutine(); });
+ monitor_thread_ = std::thread([this]() { MonitorRoutine(); });
}
-void ProcessMonitor::StartSubprocess(Command cmd, bool restart_on_exit) {
+void ProcessMonitor::StartSubprocess(Command cmd, OnSocketReadyCb callback) {
auto proc = cmd.Start(true);
if (!proc.Started()) {
LOG(ERROR) << "Failed to start process";
return;
}
- if (restart_on_exit) {
- {
- std::lock_guard<std::mutex> lock(processes_mutex_);
- monitored_processes_.push_back(MonitorEntry());
- auto& entry = monitored_processes_.back();
- entry.cmd.reset(new Command(std::move(cmd)));
- entry.proc.reset(new Subprocess(std::move(proc)));
- }
- // Wake the restarter thread up so that it starts monitoring this subprocess
- // Do this after releasing the lock so that the restarter thread is free to
- // begin work as soon as select returns.
- NotifyThread(thread_comm_main_);
+ {
+ std::lock_guard<std::mutex> lock(processes_mutex_);
+ monitored_processes_.push_back(MonitorEntry());
+ auto& entry = monitored_processes_.back();
+ entry.cmd.reset(new Command(std::move(cmd)));
+ entry.proc.reset(new Subprocess(std::move(proc)));
+ entry.on_control_socket_ready_cb = callback;
}
+ // Wake the restarter thread up so that it starts monitoring this subprocess
+ // Do this after releasing the lock so that the restarter thread is free to
+ // begin work as soon as select returns.
+ NotifyThread(thread_comm_main_);
}
-void ProcessMonitor::RestarterRoutine() {
+bool ProcessMonitor::RestartOnExitCb(MonitorEntry* entry) {
+ // Make sure the process actually exited
+ char buffer[16];
+ auto bytes_read = entry->proc->control_socket()->Read(buffer, sizeof(buffer));
+ if (bytes_read > 0) {
+ LOG(WARNING) << "Subprocess " << entry->cmd->GetShortName() << " wrote "
+ << bytes_read
+ << " bytes on the control socket, this is unexpected";
+ // The process may not have exited, continue monitoring without restarting
+ return true;
+ }
+
+ LOG(INFO) << "Detected exit of monitored subprocess";
+ // Make sure the subprocess isn't left in a zombie state, and that the
+ // pid is logged
+ int wstatus;
+ auto wait_ret = TEMP_FAILURE_RETRY(entry->proc->Wait(&wstatus, 0));
+ // None of the error conditions specified on waitpid(2) apply
+ assert(wait_ret > 0);
+ if (WIFEXITED(wstatus)) {
+ LOG(INFO) << "Subprocess " << entry->cmd->GetShortName() << " ("
+ << wait_ret << ") has exited with exit code "
+ << WEXITSTATUS(wstatus);
+ } else if (WIFSIGNALED(wstatus)) {
+ LOG(ERROR) << "Subprocess " << entry->cmd->GetShortName() << " ("
+ << wait_ret << ") was interrupted by a signal: "
+ << WTERMSIG(wstatus);
+ } else {
+ LOG(INFO) << "subprocess " << entry->cmd->GetShortName() << " ("
+ << wait_ret << ") has exited for unknown reasons";
+ }
+ entry->proc.reset(new Subprocess(entry->cmd->Start(true)));
+ return true;
+}
+
+bool ProcessMonitor::DoNotMonitorCb(MonitorEntry*) {
+ return false;
+}
+
+void ProcessMonitor::MonitorRoutine() {
LOG(INFO) << "Started monitoring subprocesses";
do {
SharedFDSet read_set;
- read_set.Set(thread_comm_restarter_);
+ read_set.Set(thread_comm_monitor_);
{
std::lock_guard<std::mutex> lock(processes_mutex_);
for (auto& monitored_process: monitored_processes_) {
@@ -136,22 +149,27 @@
}
if (num_fds > 0) {
// Try the communication fd, it's the most likely to be set
- if (read_set.IsSet(thread_comm_restarter_)) {
+ if (read_set.IsSet(thread_comm_monitor_)) {
--num_fds;
- ConsumeNotifications(thread_comm_restarter_);
+ ConsumeNotifications(thread_comm_monitor_);
}
}
{
std::lock_guard<std::mutex> lock(processes_mutex_);
// Keep track of the number of file descriptors ready for read, chances
// are we don't need to go over the entire list of subprocesses
- for (size_t idx = 0; idx < monitored_processes_.size() && num_fds > 0;
- ++idx) {
- auto monitor_entry = &monitored_processes_[idx];
- auto control_socket = monitor_entry->proc->control_socket();
+ auto it = monitored_processes_.begin();
+ while (it != monitored_processes_.end()) {
+ auto control_socket = it->proc->control_socket();
+ bool keep_monitoring = true;
if (read_set.IsSet(control_socket)) {
--num_fds;
- WaitForSubprocessAndRestartIt(monitor_entry);
+ keep_monitoring = it->on_control_socket_ready_cb(&(*it));
+ }
+ if (keep_monitoring) {
+ ++it;
+ } else {
+ it = monitored_processes_.erase(it);
}
}
}
diff --git a/host/commands/launch/process_monitor.h b/host/commands/launch/process_monitor.h
index d4c9c95..b78a196 100644
--- a/host/commands/launch/process_monitor.h
+++ b/host/commands/launch/process_monitor.h
@@ -23,24 +23,37 @@
#include <common/libs/utils/subprocess.h>
namespace cvd {
+
+struct MonitorEntry;
+using OnSocketReadyCb = std::function<bool(MonitorEntry*)>;
+
struct MonitorEntry {
std::unique_ptr<Command> cmd;
std::unique_ptr<Subprocess> proc;
+ OnSocketReadyCb on_control_socket_ready_cb;
};
// Keeps track of launched subprocesses, restarts them if they unexpectedly exit
class ProcessMonitor {
public:
ProcessMonitor();
- void StartSubprocess(Command cmd, bool restart_on_exit = false);
+ // Starts a managed subprocess with a controlling socket. The
+ // on_control_socket_ready_cb callback will be called when data is ready to be
+ // read from the socket or the subprocess has ended. No member functions of
+ // the process monitor object should be called from the callback as it may
+ // lead to a dealock. If the callback returns false the subprocess will no
+ // longer be monitored
+ void StartSubprocess(Command cmd, OnSocketReadyCb on_control_socket_ready_cb);
+ static bool RestartOnExitCb(MonitorEntry* entry);
+ static bool DoNotMonitorCb(MonitorEntry* entry);
private:
- void RestarterRoutine();
+ void MonitorRoutine();
std::vector<MonitorEntry> monitored_processes_;
// Used for communication with the restarter thread
- cvd::SharedFD thread_comm_main_, thread_comm_restarter_;
- std::thread restarter_;
+ cvd::SharedFD thread_comm_main_, thread_comm_monitor_;
+ std::thread monitor_thread_;
// Protects access to the monitored_processes_
std::mutex processes_mutex_;
};