Gracefully shutdown worker threads when destructing GRPC VHAL

Test: build
Bug: 181371253
Change-Id: I8931293bdcaa975cdda0f9e5b8f4075951b1ce36
diff --git a/hal/vehicle/2.0/GarageModeServerSideHandler.cpp b/hal/vehicle/2.0/GarageModeServerSideHandler.cpp
index 785e668..b19b1df 100644
--- a/hal/vehicle/2.0/GarageModeServerSideHandler.cpp
+++ b/hal/vehicle/2.0/GarageModeServerSideHandler.cpp
@@ -27,6 +27,7 @@
 #include <android-base/logging.h>
 #include <utils/SystemClock.h>
 
+#include "Utils.h"
 #include "vhal_v2_0/VehicleUtils.h"
 
 namespace android::hardware::automotive::vehicle::V2_0::impl {
@@ -156,8 +157,11 @@
     [[maybe_unused]] struct inotify_event& inotifyEvent =
             *reinterpret_cast<struct inotify_event*>(inotifyEventBuffer);
 
+    HandleNewPowerState();
     while (!mShuttingDownFlag.load()) {
-        HandleNewPowerState();
+        if (!WaitForReadWithTimeout(inotifyFd, kFileStatusCheckPeriod)) {
+            continue;
+        }
 
         auto eventReadLen = read(inotifyFd, inotifyEventBuffer, sizeof(inotifyEventBuffer));
         if (eventReadLen < 0) {
@@ -169,6 +173,7 @@
                        << sizeof(struct inotify_event) << ", read size: " << eventReadLen;
             return;
         }
+        HandleNewPowerState();
     }
 }
 
diff --git a/hal/vehicle/2.0/GrpcVehicleServer.cpp b/hal/vehicle/2.0/GrpcVehicleServer.cpp
index 3ffa6dd..0a7ec9f 100644
--- a/hal/vehicle/2.0/GrpcVehicleServer.cpp
+++ b/hal/vehicle/2.0/GrpcVehicleServer.cpp
@@ -48,7 +48,13 @@
     }
 
     // method from GrpcVehicleServer
-    void Start() override;
+    GrpcVehicleServer& Start() override;
+
+    void Wait() override;
+
+    GrpcVehicleServer& Stop() override;
+
+    uint32_t NumOfActivePropertyValueStream() override;
 
     // methods from IVehicleServer
     void onPropertyValueFromCar(const VehiclePropValue& value, bool updateStatus) override;
@@ -102,9 +108,11 @@
     };
 
     std::string mServiceAddr;
+    std::unique_ptr<::grpc::Server> mServer{nullptr};
     VehiclePropValuePool mValueObjectPool;
     std::unique_ptr<GarageModeServerSideHandler> mGarageModeHandler;
     PowerStateListener mPowerStateListener;
+    std::thread mPowerStateListenerThread{};
     mutable std::shared_mutex mConnectionMutex;
     mutable std::shared_mutex mWriterMutex;
     std::list<ConnectionDescriptor> mValueStreamingConnections;
@@ -121,19 +129,51 @@
     return std::make_unique<GrpcVehicleServerImpl>(serverInfo);
 }
 
-void GrpcVehicleServerImpl::Start() {
+GrpcVehicleServer& GrpcVehicleServerImpl::Start() {
+    if (mServer) {
+        LOG(WARNING) << __func__ << ": GrpcVehicleServer has already started.";
+        return *this;
+    }
+
     ::grpc::ServerBuilder builder;
     builder.RegisterService(this);
     builder.AddListeningPort(mServiceAddr, getServerCredentials());
-    std::unique_ptr<::grpc::Server> server(builder.BuildAndStart());
+    mServer = builder.BuildAndStart();
 
-    CHECK(server) << __func__ << ": failed to create the GRPC server, "
-                  << "please make sure the configuration and permissions are correct";
+    CHECK(mServer) << __func__ << ": failed to create the GRPC server, "
+                   << "please make sure the configuration and permissions are correct";
 
-    std::thread powerStateListenerThread([this]() { mPowerStateListener.Listen(); });
+    mPowerStateListenerThread = std::thread([this]() { mPowerStateListener.Listen(); });
+    return *this;
+}
 
-    server->Wait();
-    powerStateListenerThread.join();
+void GrpcVehicleServerImpl::Wait() {
+    if (mServer) {
+        mServer->Wait();
+    }
+
+    if (mPowerStateListenerThread.joinable()) {
+        mPowerStateListenerThread.join();
+    }
+
+    mPowerStateListenerThread = {};
+    mServer.reset();
+}
+
+GrpcVehicleServer& GrpcVehicleServerImpl::Stop() {
+    if (!mServer) {
+        LOG(WARNING) << __func__ << ": GrpcVehicleServer has not started.";
+        return *this;
+    }
+
+    mServer->Shutdown();
+    mPowerStateListener.Stop();
+    return *this;
+}
+
+uint32_t GrpcVehicleServerImpl::NumOfActivePropertyValueStream() {
+    std::shared_lock read_lock(mConnectionMutex);
+    return mValueStreamingConnections.size();
 }
 
 void GrpcVehicleServerImpl::onPropertyValueFromCar(const VehiclePropValue& value,
diff --git a/hal/vehicle/2.0/GrpcVehicleServer.h b/hal/vehicle/2.0/GrpcVehicleServer.h
index 9acd5c6..9c8c731 100644
--- a/hal/vehicle/2.0/GrpcVehicleServer.h
+++ b/hal/vehicle/2.0/GrpcVehicleServer.h
@@ -31,8 +31,17 @@
 // Connect to the Vehicle Client via GRPC
 class GrpcVehicleServer : public VehicleHalServer {
   public:
-    // Start listening incoming calls, should never return if working normally
-    virtual void Start() = 0;
+    // Start listening incoming calls
+    virtual GrpcVehicleServer& Start() = 0;
+
+    // Wait until error or Stop is called
+    virtual void Wait() = 0;
+
+    // Stop the server
+    virtual GrpcVehicleServer& Stop() = 0;
+
+    // Methods for unit tests
+    virtual uint32_t NumOfActivePropertyValueStream() = 0;
 };
 
 using GrpcVehicleServerPtr = std::unique_ptr<GrpcVehicleServer>;
diff --git a/hal/vehicle/2.0/PowerStateListener.cpp b/hal/vehicle/2.0/PowerStateListener.cpp
index 7087344..dbb5886 100644
--- a/hal/vehicle/2.0/PowerStateListener.cpp
+++ b/hal/vehicle/2.0/PowerStateListener.cpp
@@ -16,6 +16,8 @@
 
 #include "PowerStateListener.h"
 
+#include <chrono>
+
 #include <sys/socket.h>
 #include <sys/stat.h>
 #include <sys/types.h>
@@ -25,6 +27,8 @@
 
 #include <android-base/logging.h>
 
+#include "Utils.h"
+
 namespace android::hardware::automotive::vehicle::V2_0::impl {
 
 static bool ForwardSocketToFile(int sockfd, const std::string& filePath) {
@@ -67,10 +71,12 @@
     : mSocketPath(socketPath), mPowerStateMarkerFilePath(powerStateMarkerFilePath) {}
 
 void PowerStateListener::Listen() {
+    using std::literals::chrono_literals::operator""s;
+
     // Newly created files are not accessible by other users
     umask(0077);
 
-    int socketfd = socket(AF_UNIX, SOCK_STREAM, 0);
+    int socketfd = socket(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0);
 
     if (socketfd < 0) {
         LOG(ERROR) << __func__ << ": failed to create UNIX socket: " << strerror(errno);
@@ -100,14 +106,35 @@
         return;
     }
 
-    int fd;
-    socklen_t socklen = sizeof(addr);
-    while ((fd = accept(socketfd, reinterpret_cast<sockaddr*>(&addr), &socklen)) != -1) {
-        if (!ForwardSocketToFile(fd, mPowerStateMarkerFilePath)) {
-            return;
+    constexpr auto kSocketCheckPeriod = 1s;
+
+    while (!mShuttingDownFlag.load()) {
+        if (!WaitForReadWithTimeout(socketfd, kSocketCheckPeriod)) {
+            continue;
         }
+
+        socklen_t socklen = sizeof(addr);
+        int fd = accept(socketfd, reinterpret_cast<sockaddr*>(&addr), &socklen);
+
+        if (fd == -1) {
+            if (errno != EAGAIN && errno != EWOULDBLOCK) {
+                PLOG(ERROR) << __func__ << ": failed to accept, path: " << mSocketPath;
+            }
+            continue;
+        }
+
+        if (!ForwardSocketToFile(fd, mPowerStateMarkerFilePath)) {
+            LOG(ERROR) << __func__ << ": failed to forward power state, "
+                       << "path: " << mPowerStateMarkerFilePath;
+            continue;
+        }
+
         close(fd);
     }
 }
 
+void PowerStateListener::Stop() {
+    mShuttingDownFlag.store(true);
+}
+
 }  // namespace android::hardware::automotive::vehicle::V2_0::impl
diff --git a/hal/vehicle/2.0/PowerStateListener.h b/hal/vehicle/2.0/PowerStateListener.h
index c6a48cb..002487f 100644
--- a/hal/vehicle/2.0/PowerStateListener.h
+++ b/hal/vehicle/2.0/PowerStateListener.h
@@ -16,6 +16,7 @@
 
 #pragma once
 
+#include <atomic>
 #include <string>
 
 namespace android::hardware::automotive::vehicle::V2_0::impl {
@@ -31,7 +32,10 @@
 
     void Listen();
 
+    void Stop();
+
   private:
+    std::atomic<bool> mShuttingDownFlag{false};
     const std::string mSocketPath;
     const std::string mPowerStateMarkerFilePath;
 };
diff --git a/hal/vehicle/2.0/Utils.cpp b/hal/vehicle/2.0/Utils.cpp
index dd4ac98..cd4ba81 100644
--- a/hal/vehicle/2.0/Utils.cpp
+++ b/hal/vehicle/2.0/Utils.cpp
@@ -51,6 +51,20 @@
 }
 #endif
 
+bool WaitForReadWithTimeout(int fd, struct timeval&& timeout) {
+    fd_set read_fd_set;
+    FD_ZERO(&read_fd_set);
+    FD_SET(fd, &read_fd_set);
+    auto ready = select(FD_SETSIZE, &read_fd_set, nullptr, nullptr, &timeout);
+
+    if (ready < 0) {
+        cerr << __func__ << ": fd: " << fd << ", errno: " << errno << ", " << strerror(errno)
+             << endl;
+        return false;
+    }
+    return ready > 0;
+}
+
 static std::optional<unsigned> parseUnsignedIntFromString(const char* optarg, const char* name) {
     auto v = strtoul(optarg, nullptr, 0);
     if (((v == ULONG_MAX) && (errno == ERANGE)) || (v > UINT_MAX)) {
diff --git a/hal/vehicle/2.0/Utils.h b/hal/vehicle/2.0/Utils.h
index 44d11cb..7b8f15e 100644
--- a/hal/vehicle/2.0/Utils.h
+++ b/hal/vehicle/2.0/Utils.h
@@ -17,6 +17,7 @@
 #ifndef android_hardware_automotive_vehicle_V2_0_impl_virtualization_Utils_H_
 #define android_hardware_automotive_vehicle_V2_0_impl_virtualization_Utils_H_
 
+#include <chrono>
 #include <optional>
 #include <string>
 
@@ -33,6 +34,28 @@
 namespace V2_0 {
 namespace impl {
 
+template <class duration_t>
+constexpr struct timeval TimeValFromChronoDuration(duration_t duration) {
+    using std::micro;
+    using std::chrono::duration_cast;
+    using std::chrono::microseconds;
+    using std::chrono::seconds;
+    return {
+            .tv_sec = static_cast<time_t>(duration_cast<seconds>(duration).count()),
+            .tv_usec = static_cast<suseconds_t>(duration_cast<microseconds>(duration).count() %
+                                                micro::den),
+    };
+}
+
+// True means fd is ready, False means timeout
+bool WaitForReadWithTimeout(int fd, struct timeval&& timeout);
+
+// True means fd is ready, False means timeout
+template <class duration_t>
+bool WaitForReadWithTimeout(int fd, duration_t timeout) {
+    return WaitForReadWithTimeout(fd, TimeValFromChronoDuration(timeout));
+}
+
 struct VirtualizedVhalServerInfo {
 #ifdef __BIONIC__
     android::hardware::automotive::utils::VsockConnectionInfo vsock;
diff --git a/hal/vehicle/2.0/VirtualizationGrpcServer.cpp b/hal/vehicle/2.0/VirtualizationGrpcServer.cpp
index 2ca5cd7..c988b4b 100644
--- a/hal/vehicle/2.0/VirtualizationGrpcServer.cpp
+++ b/hal/vehicle/2.0/VirtualizationGrpcServer.cpp
@@ -27,6 +27,6 @@
     CHECK(serverInfo.has_value()) << err;
 
     auto server = vhal_impl::makeGrpcVehicleServer(*serverInfo);
-    server->Start();
+    server->Start().Wait();
     return 0;
 }