libbinder: RPC read connection info on 2nd thread
Don't allow the main server thread to be blocked when reading a new ID.
Gotta be available for reading new clients.
Future consideration will be timeouts for reads/writes or trying to
detect DOS here in general, but this CL is what is needed for the
fuzzer.
Bug: 185167543
Bug: 182938024
Test: binderRpcTest
Change-Id: I8ba4a6ce6d1d07f3c36ac554ccb6cc403a15db2b
diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp
index c0cdcd6..3a98cad 100644
--- a/libs/binder/RpcServer.cpp
+++ b/libs/binder/RpcServer.cpp
@@ -132,38 +132,12 @@
}
LOG_RPC_DETAIL("accept4 on fd %d yields fd %d", mServer.get(), clientFd.get());
- // TODO(b/183988761): cannot trust this simple ID, should not block this
- // thread
- LOG_ALWAYS_FATAL_IF(!mAgreedExperimental, "no!");
- int32_t id;
- if (sizeof(id) != read(clientFd.get(), &id, sizeof(id))) {
- ALOGE("Could not read ID from fd %d", clientFd.get());
- continue;
- }
-
{
std::lock_guard<std::mutex> _l(mLock);
-
- sp<RpcSession> session;
- if (id == RPC_SESSION_ID_NEW) {
- // new client!
- LOG_ALWAYS_FATAL_IF(mSessionIdCounter >= INT32_MAX, "Out of session IDs");
- mSessionIdCounter++;
-
- session = RpcSession::make();
- session->setForServer(wp<RpcServer>::fromExisting(this), mSessionIdCounter);
-
- mSessions[mSessionIdCounter] = session;
- } else {
- auto it = mSessions.find(id);
- if (it == mSessions.end()) {
- ALOGE("Cannot add thread, no record of session with ID %d", id);
- continue;
- }
- session = it->second;
- }
-
- session->startThread(std::move(clientFd));
+ std::thread thread =
+ std::thread(&RpcServer::establishConnection, this,
+ std::move(sp<RpcServer>::fromExisting(this)), std::move(clientFd));
+ mConnectingThreads[thread.get_id()] = std::move(thread);
}
}
}
@@ -178,6 +152,56 @@
return sessions;
}
+void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clientFd) {
+ LOG_ALWAYS_FATAL_IF(this != server.get(), "Must pass same ownership object");
+
+ // TODO(b/183988761): cannot trust this simple ID
+ LOG_ALWAYS_FATAL_IF(!mAgreedExperimental, "no!");
+ int32_t id;
+ if (sizeof(id) != read(clientFd.get(), &id, sizeof(id))) {
+ ALOGE("Could not read ID from fd %d", clientFd.get());
+ return;
+ }
+
+ std::thread thisThread;
+ sp<RpcSession> session;
+ {
+ std::lock_guard<std::mutex> _l(mLock);
+
+ auto threadId = mConnectingThreads.find(std::this_thread::get_id());
+ LOG_ALWAYS_FATAL_IF(threadId == mConnectingThreads.end(),
+ "Must establish connection on owned thread");
+ thisThread = std::move(threadId->second);
+ mConnectingThreads.erase(threadId);
+
+ if (id == RPC_SESSION_ID_NEW) {
+ LOG_ALWAYS_FATAL_IF(mSessionIdCounter >= INT32_MAX, "Out of session IDs");
+ mSessionIdCounter++;
+
+ session = RpcSession::make();
+ session->setForServer(wp<RpcServer>::fromExisting(this), mSessionIdCounter);
+
+ mSessions[mSessionIdCounter] = session;
+ } else {
+ auto it = mSessions.find(id);
+ if (it == mSessions.end()) {
+ ALOGE("Cannot add thread, no record of session with ID %d", id);
+ return;
+ }
+ session = it->second;
+ }
+ }
+
+ // avoid strong cycle
+ server = nullptr;
+ //
+ //
+ // DO NOT ACCESS MEMBER VARIABLES BELOW
+ //
+
+ session->join(std::move(thisThread), std::move(clientFd));
+}
+
bool RpcServer::setupSocketServer(const RpcSocketAddress& addr) {
LOG_RPC_DETAIL("Setting up socket server %s", addr.toString().c_str());
diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp
index f38135b..f32aa7a 100644
--- a/libs/binder/RpcSession.cpp
+++ b/libs/binder/RpcSession.cpp
@@ -131,24 +131,14 @@
return OK;
}
-void RpcSession::startThread(unique_fd client) {
- std::lock_guard<std::mutex> _l(mMutex);
- sp<RpcSession> holdThis = sp<RpcSession>::fromExisting(this);
- int fd = client.release();
- auto thread = std::thread([=] {
- holdThis->join(unique_fd(fd));
- {
- std::lock_guard<std::mutex> _l(holdThis->mMutex);
- auto it = mThreads.find(std::this_thread::get_id());
- LOG_ALWAYS_FATAL_IF(it == mThreads.end());
- it->second.detach();
- mThreads.erase(it);
- }
- });
- mThreads[thread.get_id()] = std::move(thread);
-}
+void RpcSession::join(std::thread thread, unique_fd client) {
+ LOG_ALWAYS_FATAL_IF(thread.get_id() != std::this_thread::get_id(), "Must own this thread");
-void RpcSession::join(unique_fd client) {
+ {
+ std::lock_guard<std::mutex> _l(mMutex);
+ mThreads[thread.get_id()] = std::move(thread);
+ }
+
// must be registered to allow arbitrary client code executing commands to
// be able to do nested calls (we can't only read from it)
sp<RpcConnection> connection = assignServerToThisThread(std::move(client));
@@ -165,6 +155,14 @@
LOG_ALWAYS_FATAL_IF(!removeServerConnection(connection),
"bad state: connection object guaranteed to be in list");
+
+ {
+ std::lock_guard<std::mutex> _l(mMutex);
+ auto it = mThreads.find(std::this_thread::get_id());
+ LOG_ALWAYS_FATAL_IF(it == mThreads.end());
+ it->second.detach();
+ mThreads.erase(it);
+ }
}
void RpcSession::terminateLocked() {
diff --git a/libs/binder/include/binder/RpcServer.h b/libs/binder/include/binder/RpcServer.h
index 3534d51..b1e5519 100644
--- a/libs/binder/include/binder/RpcServer.h
+++ b/libs/binder/include/binder/RpcServer.h
@@ -22,6 +22,7 @@
#include <utils/RefBase.h>
#include <mutex>
+#include <thread>
// WARNING: This is a feature which is still in development, and it is subject
// to radical change. Any production use of this may subject your code to any
@@ -115,6 +116,7 @@
friend sp<RpcServer>;
RpcServer();
+ void establishConnection(sp<RpcServer>&& session, base::unique_fd clientFd);
bool setupSocketServer(const RpcSocketAddress& address);
bool mAgreedExperimental = false;
@@ -123,6 +125,7 @@
base::unique_fd mServer; // socket we are accepting sessions on
std::mutex mLock; // for below
+ std::map<std::thread::id, std::thread> mConnectingThreads;
sp<IBinder> mRootObject;
std::map<int32_t, sp<RpcSession>> mSessions;
int32_t mSessionIdCounter = 0;
diff --git a/libs/binder/include/binder/RpcSession.h b/libs/binder/include/binder/RpcSession.h
index c8ab9e4..92ee100 100644
--- a/libs/binder/include/binder/RpcSession.h
+++ b/libs/binder/include/binder/RpcSession.h
@@ -114,8 +114,7 @@
status_t readId();
- void startThread(base::unique_fd client);
- void join(base::unique_fd client);
+ void join(std::thread thread, base::unique_fd client);
void terminateLocked();
struct RpcConnection : public RefBase {