Gracefully shutdown worker threads when destructing GRPC VHAL
Test: build
Bug: 181371253
Change-Id: I8931293bdcaa975cdda0f9e5b8f4075951b1ce36
diff --git a/hal/vehicle/2.0/GarageModeServerSideHandler.cpp b/hal/vehicle/2.0/GarageModeServerSideHandler.cpp
index 785e668..b19b1df 100644
--- a/hal/vehicle/2.0/GarageModeServerSideHandler.cpp
+++ b/hal/vehicle/2.0/GarageModeServerSideHandler.cpp
@@ -27,6 +27,7 @@
#include <android-base/logging.h>
#include <utils/SystemClock.h>
+#include "Utils.h"
#include "vhal_v2_0/VehicleUtils.h"
namespace android::hardware::automotive::vehicle::V2_0::impl {
@@ -156,8 +157,11 @@
[[maybe_unused]] struct inotify_event& inotifyEvent =
*reinterpret_cast<struct inotify_event*>(inotifyEventBuffer);
+ HandleNewPowerState();
while (!mShuttingDownFlag.load()) {
- HandleNewPowerState();
+ if (!WaitForReadWithTimeout(inotifyFd, kFileStatusCheckPeriod)) {
+ continue;
+ }
auto eventReadLen = read(inotifyFd, inotifyEventBuffer, sizeof(inotifyEventBuffer));
if (eventReadLen < 0) {
@@ -169,6 +173,7 @@
<< sizeof(struct inotify_event) << ", read size: " << eventReadLen;
return;
}
+ HandleNewPowerState();
}
}
diff --git a/hal/vehicle/2.0/GrpcVehicleServer.cpp b/hal/vehicle/2.0/GrpcVehicleServer.cpp
index 3ffa6dd..0a7ec9f 100644
--- a/hal/vehicle/2.0/GrpcVehicleServer.cpp
+++ b/hal/vehicle/2.0/GrpcVehicleServer.cpp
@@ -48,7 +48,13 @@
}
// method from GrpcVehicleServer
- void Start() override;
+ GrpcVehicleServer& Start() override;
+
+ void Wait() override;
+
+ GrpcVehicleServer& Stop() override;
+
+ uint32_t NumOfActivePropertyValueStream() override;
// methods from IVehicleServer
void onPropertyValueFromCar(const VehiclePropValue& value, bool updateStatus) override;
@@ -102,9 +108,11 @@
};
std::string mServiceAddr;
+ std::unique_ptr<::grpc::Server> mServer{nullptr};
VehiclePropValuePool mValueObjectPool;
std::unique_ptr<GarageModeServerSideHandler> mGarageModeHandler;
PowerStateListener mPowerStateListener;
+ std::thread mPowerStateListenerThread{};
mutable std::shared_mutex mConnectionMutex;
mutable std::shared_mutex mWriterMutex;
std::list<ConnectionDescriptor> mValueStreamingConnections;
@@ -121,19 +129,51 @@
return std::make_unique<GrpcVehicleServerImpl>(serverInfo);
}
-void GrpcVehicleServerImpl::Start() {
+GrpcVehicleServer& GrpcVehicleServerImpl::Start() {
+ if (mServer) {
+ LOG(WARNING) << __func__ << ": GrpcVehicleServer has already started.";
+ return *this;
+ }
+
::grpc::ServerBuilder builder;
builder.RegisterService(this);
builder.AddListeningPort(mServiceAddr, getServerCredentials());
- std::unique_ptr<::grpc::Server> server(builder.BuildAndStart());
+ mServer = builder.BuildAndStart();
- CHECK(server) << __func__ << ": failed to create the GRPC server, "
- << "please make sure the configuration and permissions are correct";
+ CHECK(mServer) << __func__ << ": failed to create the GRPC server, "
+ << "please make sure the configuration and permissions are correct";
- std::thread powerStateListenerThread([this]() { mPowerStateListener.Listen(); });
+ mPowerStateListenerThread = std::thread([this]() { mPowerStateListener.Listen(); });
+ return *this;
+}
- server->Wait();
- powerStateListenerThread.join();
+void GrpcVehicleServerImpl::Wait() {
+ if (mServer) {
+ mServer->Wait();
+ }
+
+ if (mPowerStateListenerThread.joinable()) {
+ mPowerStateListenerThread.join();
+ }
+
+ mPowerStateListenerThread = {};
+ mServer.reset();
+}
+
+GrpcVehicleServer& GrpcVehicleServerImpl::Stop() {
+ if (!mServer) {
+ LOG(WARNING) << __func__ << ": GrpcVehicleServer has not started.";
+ return *this;
+ }
+
+ mServer->Shutdown();
+ mPowerStateListener.Stop();
+ return *this;
+}
+
+uint32_t GrpcVehicleServerImpl::NumOfActivePropertyValueStream() {
+ std::shared_lock read_lock(mConnectionMutex);
+ return mValueStreamingConnections.size();
}
void GrpcVehicleServerImpl::onPropertyValueFromCar(const VehiclePropValue& value,
diff --git a/hal/vehicle/2.0/GrpcVehicleServer.h b/hal/vehicle/2.0/GrpcVehicleServer.h
index 9acd5c6..9c8c731 100644
--- a/hal/vehicle/2.0/GrpcVehicleServer.h
+++ b/hal/vehicle/2.0/GrpcVehicleServer.h
@@ -31,8 +31,17 @@
// Connect to the Vehicle Client via GRPC
class GrpcVehicleServer : public VehicleHalServer {
public:
- // Start listening incoming calls, should never return if working normally
- virtual void Start() = 0;
+ // Start listening incoming calls
+ virtual GrpcVehicleServer& Start() = 0;
+
+ // Wait until error or Stop is called
+ virtual void Wait() = 0;
+
+ // Stop the server
+ virtual GrpcVehicleServer& Stop() = 0;
+
+ // Methods for unit tests
+ virtual uint32_t NumOfActivePropertyValueStream() = 0;
};
using GrpcVehicleServerPtr = std::unique_ptr<GrpcVehicleServer>;
diff --git a/hal/vehicle/2.0/PowerStateListener.cpp b/hal/vehicle/2.0/PowerStateListener.cpp
index 7087344..dbb5886 100644
--- a/hal/vehicle/2.0/PowerStateListener.cpp
+++ b/hal/vehicle/2.0/PowerStateListener.cpp
@@ -16,6 +16,8 @@
#include "PowerStateListener.h"
+#include <chrono>
+
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
@@ -25,6 +27,8 @@
#include <android-base/logging.h>
+#include "Utils.h"
+
namespace android::hardware::automotive::vehicle::V2_0::impl {
static bool ForwardSocketToFile(int sockfd, const std::string& filePath) {
@@ -67,10 +71,12 @@
: mSocketPath(socketPath), mPowerStateMarkerFilePath(powerStateMarkerFilePath) {}
void PowerStateListener::Listen() {
+ using std::literals::chrono_literals::operator""s;
+
// Newly created files are not accessible by other users
umask(0077);
- int socketfd = socket(AF_UNIX, SOCK_STREAM, 0);
+ int socketfd = socket(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0);
if (socketfd < 0) {
LOG(ERROR) << __func__ << ": failed to create UNIX socket: " << strerror(errno);
@@ -100,14 +106,35 @@
return;
}
- int fd;
- socklen_t socklen = sizeof(addr);
- while ((fd = accept(socketfd, reinterpret_cast<sockaddr*>(&addr), &socklen)) != -1) {
- if (!ForwardSocketToFile(fd, mPowerStateMarkerFilePath)) {
- return;
+ constexpr auto kSocketCheckPeriod = 1s;
+
+ while (!mShuttingDownFlag.load()) {
+ if (!WaitForReadWithTimeout(socketfd, kSocketCheckPeriod)) {
+ continue;
}
+
+ socklen_t socklen = sizeof(addr);
+ int fd = accept(socketfd, reinterpret_cast<sockaddr*>(&addr), &socklen);
+
+ if (fd == -1) {
+ if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ PLOG(ERROR) << __func__ << ": failed to accept, path: " << mSocketPath;
+ }
+ continue;
+ }
+
+ if (!ForwardSocketToFile(fd, mPowerStateMarkerFilePath)) {
+ LOG(ERROR) << __func__ << ": failed to forward power state, "
+ << "path: " << mPowerStateMarkerFilePath;
+ continue;
+ }
+
close(fd);
}
}
+void PowerStateListener::Stop() {
+ mShuttingDownFlag.store(true);
+}
+
} // namespace android::hardware::automotive::vehicle::V2_0::impl
diff --git a/hal/vehicle/2.0/PowerStateListener.h b/hal/vehicle/2.0/PowerStateListener.h
index c6a48cb..002487f 100644
--- a/hal/vehicle/2.0/PowerStateListener.h
+++ b/hal/vehicle/2.0/PowerStateListener.h
@@ -16,6 +16,7 @@
#pragma once
+#include <atomic>
#include <string>
namespace android::hardware::automotive::vehicle::V2_0::impl {
@@ -31,7 +32,10 @@
void Listen();
+ void Stop();
+
private:
+ std::atomic<bool> mShuttingDownFlag{false};
const std::string mSocketPath;
const std::string mPowerStateMarkerFilePath;
};
diff --git a/hal/vehicle/2.0/Utils.cpp b/hal/vehicle/2.0/Utils.cpp
index dd4ac98..cd4ba81 100644
--- a/hal/vehicle/2.0/Utils.cpp
+++ b/hal/vehicle/2.0/Utils.cpp
@@ -51,6 +51,20 @@
}
#endif
+bool WaitForReadWithTimeout(int fd, struct timeval&& timeout) {
+ fd_set read_fd_set;
+ FD_ZERO(&read_fd_set);
+ FD_SET(fd, &read_fd_set);
+ auto ready = select(FD_SETSIZE, &read_fd_set, nullptr, nullptr, &timeout);
+
+ if (ready < 0) {
+ cerr << __func__ << ": fd: " << fd << ", errno: " << errno << ", " << strerror(errno)
+ << endl;
+ return false;
+ }
+ return ready > 0;
+}
+
static std::optional<unsigned> parseUnsignedIntFromString(const char* optarg, const char* name) {
auto v = strtoul(optarg, nullptr, 0);
if (((v == ULONG_MAX) && (errno == ERANGE)) || (v > UINT_MAX)) {
diff --git a/hal/vehicle/2.0/Utils.h b/hal/vehicle/2.0/Utils.h
index 44d11cb..7b8f15e 100644
--- a/hal/vehicle/2.0/Utils.h
+++ b/hal/vehicle/2.0/Utils.h
@@ -17,6 +17,7 @@
#ifndef android_hardware_automotive_vehicle_V2_0_impl_virtualization_Utils_H_
#define android_hardware_automotive_vehicle_V2_0_impl_virtualization_Utils_H_
+#include <chrono>
#include <optional>
#include <string>
@@ -33,6 +34,28 @@
namespace V2_0 {
namespace impl {
+template <class duration_t>
+constexpr struct timeval TimeValFromChronoDuration(duration_t duration) {
+ using std::micro;
+ using std::chrono::duration_cast;
+ using std::chrono::microseconds;
+ using std::chrono::seconds;
+ return {
+ .tv_sec = static_cast<time_t>(duration_cast<seconds>(duration).count()),
+ .tv_usec = static_cast<suseconds_t>(duration_cast<microseconds>(duration).count() %
+ micro::den),
+ };
+}
+
+// True means fd is ready, False means timeout
+bool WaitForReadWithTimeout(int fd, struct timeval&& timeout);
+
+// True means fd is ready, False means timeout
+template <class duration_t>
+bool WaitForReadWithTimeout(int fd, duration_t timeout) {
+ return WaitForReadWithTimeout(fd, TimeValFromChronoDuration(timeout));
+}
+
struct VirtualizedVhalServerInfo {
#ifdef __BIONIC__
android::hardware::automotive::utils::VsockConnectionInfo vsock;
diff --git a/hal/vehicle/2.0/VirtualizationGrpcServer.cpp b/hal/vehicle/2.0/VirtualizationGrpcServer.cpp
index 2ca5cd7..c988b4b 100644
--- a/hal/vehicle/2.0/VirtualizationGrpcServer.cpp
+++ b/hal/vehicle/2.0/VirtualizationGrpcServer.cpp
@@ -27,6 +27,6 @@
CHECK(serverInfo.has_value()) << err;
auto server = vhal_impl::makeGrpcVehicleServer(*serverInfo);
- server->Start();
+ server->Start().Wait();
return 0;
}