Better monitoring for dropped device connections

Rather than look at the devices list, issues an adb shell command to
check uptime.

Bug: 79876086
Test: local boot, boot two devices on gce and see them both connect
Change-Id: I3b243a933d17a0e2fde9fedab7cdf375c94df195
diff --git a/host/libs/adb_connection_maintainer/adb_connection_maintainer.cpp b/host/libs/adb_connection_maintainer/adb_connection_maintainer.cpp
index 525ea7c..0d695b6 100644
--- a/host/libs/adb_connection_maintainer/adb_connection_maintainer.cpp
+++ b/host/libs/adb_connection_maintainer/adb_connection_maintainer.cpp
@@ -13,10 +13,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
+#include <cctype>
 #include <iomanip>
 #include <sstream>
 #include <string>
 #include <memory>
+#include <vector>
 #include <glog/logging.h>
 
 #include <unistd.h>
@@ -27,11 +30,9 @@
 namespace {
 
 std::string MakeMessage(const std::string& user_message) {
-  static constexpr char kPrefix[] = "host:";
-  static constexpr std::size_t kPrefixLen = sizeof kPrefix - 1;
   std::ostringstream ss;
-  ss << std::setfill('0') << std::setw(4) << std::hex
-     << (kPrefixLen + user_message.size()) << kPrefix << user_message;
+  ss << std::setfill('0') << std::setw(4) << std::hex << user_message.size()
+     << user_message;
   return ss.str();
 }
 
@@ -40,11 +41,24 @@
   return kLocalHostPrefix + std::to_string(port);
 }
 
+std::string MakeShellUptimeMessage() {
+  return MakeMessage("shell,raw:cut -d. -f1 /proc/uptime");
+}
+
+std::string MakeTransportMessage(int port) {
+  return MakeMessage("host:transport:" + MakeIPAndPort(port));
+}
+
 std::string MakeConnectMessage(int port) {
-  static constexpr char kConnectPrefix[] = "connect:";
+  static constexpr char kConnectPrefix[] = "host:connect:";
   return MakeMessage(kConnectPrefix + MakeIPAndPort(port));
 }
 
+std::string MakeDisconnectMessage(int port) {
+  static constexpr char kDisonnectPrefix[] = "host:disconnect:";
+  return MakeMessage(kDisonnectPrefix + MakeIPAndPort(port));
+}
+
 // returns true if successfully sent the whole message
 bool SendAll(cvd::SharedFD sock, const std::string& msg) {
   ssize_t total_written{};
@@ -68,6 +82,7 @@
   while (total_read < count) {
     auto just_read = sock->Read(data.get() + total_read, count - total_read);
     if (just_read <= 0) {
+      LOG(WARNING) << "adb daemon socket closed early";
       return {};
     }
     total_read += just_read;
@@ -85,48 +100,119 @@
 
 constexpr int kAdbDaemonPort = 5037;
 
-bool AdbConnect(cvd::SharedFD sock, int port) {
-  if (!SendAll(sock, MakeConnectMessage(port))) {
+bool AdbSendMessage(cvd::SharedFD sock, const std::string& message) {
+  if (!sock->IsOpen()) {
+    LOG(INFO) << "failed to connect to adb daemon";
+    return false;
+  }
+  if (!SendAll(sock, message)) {
+    LOG(WARNING) << "failed to send all bytes to adb daemon";
     return false;
   }
   return RecvAll(sock, kAdbStatusResponseLength) == kAdbOkayStatusResponse;
 }
 
+bool AdbSendMessage(const std::string& message) {
+  auto sock = cvd::SharedFD::SocketLocalClient(kAdbDaemonPort, SOCK_STREAM);
+  return AdbSendMessage(sock, message);
+}
+
+bool AdbConnect(int port) { return AdbSendMessage(MakeConnectMessage(port)); }
+
+bool AdbDisconnect(int port) {
+  return AdbSendMessage(MakeDisconnectMessage(port));
+}
+
+bool IsInteger(const std::string& str) {
+  return !str.empty() && std::all_of(str.begin(), str.end(),
+                                     [](char c) { return std::isdigit(c); });
+}
+
 // assumes the OKAY/FAIL status has already been read
 std::string RecvAdbResponse(cvd::SharedFD sock) {
   auto length_as_hex_str = RecvAll(sock, kAdbMessageLengthLength);
+  if (!IsInteger(length_as_hex_str)) {
+    return {};
+  }
   auto length = std::stoi(length_as_hex_str, nullptr, 16);
   return RecvAll(sock, length);
 }
 
+// Returns a negative value if uptime result couldn't be read for
+// any reason.
+int RecvUptimeResult(cvd::SharedFD sock) {
+  std::vector<char> uptime_vec{};
+  std::vector<char> just_read(16);
+  do {
+    auto count = sock->Read(just_read.data(), just_read.size());
+    if (count < 0) {
+      LOG(INFO) << "couldn't receive adb shell output";
+      return -1;
+    }
+    just_read.resize(count);
+    uptime_vec.insert(uptime_vec.end(), just_read.begin(), just_read.end());
+  } while (!just_read.empty());
+
+  if (uptime_vec.empty()) {
+    LOG(INFO) << "empty adb shell result";
+    return -1;
+  }
+
+  uptime_vec.pop_back();
+
+  auto uptime_str = std::string{uptime_vec.data(), uptime_vec.size()};
+  if (!IsInteger(uptime_str)) {
+    LOG(INFO) << "non-numeric: uptime result: " << uptime_str;
+    return -1;
+  }
+
+  return std::stoi(uptime_str);
+}
+
+// There needs to be a gap between the adb commands, the daemon isn't able to
+// handle the avalanche of requests we would be sending without a sleep. Five
+// seconds is much larger than seems necessary so we should be more than okay.
+static constexpr int kAdbCommandGapTime = 5;
+
 void EstablishConnection(int port) {
   while (true) {
     LOG(INFO) << "Attempting to connect to device on port " << port;
-    auto sock = cvd::SharedFD::SocketLocalClient(kAdbDaemonPort, SOCK_STREAM);
-    if (sock->IsOpen() && AdbConnect(sock, port)) {
-      LOG(INFO) << "connection attempted to device on port " << port;
+    if (AdbConnect(port)) {
+      LOG(DEBUG) << "adb connect message for " << port << " successfully sent";
       break;
     }
-    sleep(2);
+    sleep(kAdbCommandGapTime);
   }
+  sleep(kAdbCommandGapTime);
 }
 
 void WaitForAdbDisconnection(int port) {
+  // adb daemon doesn't seem to handle quick, successive messages well. The
+  // sleeps stabilize the communication.
   LOG(INFO) << "Watching for disconnect on port " << port;
   while (true) {
     auto sock = cvd::SharedFD::SocketLocalClient(kAdbDaemonPort, SOCK_STREAM);
-    if (!SendAll(sock, MakeMessage("devices"))) {
+    if (!AdbSendMessage(sock, MakeTransportMessage(port))) {
+      LOG(INFO) << "transport message failed, response body: "
+                << RecvAdbResponse(sock);
       break;
     }
-    if (RecvAll(sock, 4) != kAdbOkayStatusResponse) {
+    if (!AdbSendMessage(sock, MakeShellUptimeMessage())) {
+      LOG(INFO) << "adb shell uptime message failed";
       break;
     }
-    auto devices_str = RecvAdbResponse(sock);
-    if (devices_str.find(MakeIPAndPort(port)) == std::string::npos) {
+
+    auto uptime = RecvUptimeResult(sock);
+    if (uptime < 0) {
+      LOG(INFO) << "couldn't read uptime result";
       break;
     }
-    sleep(2);
+    LOG(DEBUG) << "device on port " << port << " uptime " << uptime;
+    sleep(kAdbCommandGapTime);
   }
+  LOG(INFO) << "Sending adb disconnect";
+  AdbDisconnect(port);
+  sleep(kAdbCommandGapTime);
 }
 
 }  // namespace