libbinder: RPC read connection info on 2nd thread

Don't allow the main server thread to be blocked when reading a new ID.
Gotta be available for reading new clients.

Future consideration will be timeouts for reads/writes or trying to
detect DOS here in general, but this CL is what is needed for the
fuzzer.

Bug: 185167543
Bug: 182938024
Test: binderRpcTest
Change-Id: I8ba4a6ce6d1d07f3c36ac554ccb6cc403a15db2b
diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp
index c0cdcd6..3a98cad 100644
--- a/libs/binder/RpcServer.cpp
+++ b/libs/binder/RpcServer.cpp
@@ -132,38 +132,12 @@
         }
         LOG_RPC_DETAIL("accept4 on fd %d yields fd %d", mServer.get(), clientFd.get());
 
-        // TODO(b/183988761): cannot trust this simple ID, should not block this
-        // thread
-        LOG_ALWAYS_FATAL_IF(!mAgreedExperimental, "no!");
-        int32_t id;
-        if (sizeof(id) != read(clientFd.get(), &id, sizeof(id))) {
-            ALOGE("Could not read ID from fd %d", clientFd.get());
-            continue;
-        }
-
         {
             std::lock_guard<std::mutex> _l(mLock);
-
-            sp<RpcSession> session;
-            if (id == RPC_SESSION_ID_NEW) {
-                // new client!
-                LOG_ALWAYS_FATAL_IF(mSessionIdCounter >= INT32_MAX, "Out of session IDs");
-                mSessionIdCounter++;
-
-                session = RpcSession::make();
-                session->setForServer(wp<RpcServer>::fromExisting(this), mSessionIdCounter);
-
-                mSessions[mSessionIdCounter] = session;
-            } else {
-                auto it = mSessions.find(id);
-                if (it == mSessions.end()) {
-                    ALOGE("Cannot add thread, no record of session with ID %d", id);
-                    continue;
-                }
-                session = it->second;
-            }
-
-            session->startThread(std::move(clientFd));
+            std::thread thread =
+                    std::thread(&RpcServer::establishConnection, this,
+                                std::move(sp<RpcServer>::fromExisting(this)), std::move(clientFd));
+            mConnectingThreads[thread.get_id()] = std::move(thread);
         }
     }
 }
@@ -178,6 +152,56 @@
     return sessions;
 }
 
+void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clientFd) {
+    LOG_ALWAYS_FATAL_IF(this != server.get(), "Must pass same ownership object");
+
+    // TODO(b/183988761): cannot trust this simple ID
+    LOG_ALWAYS_FATAL_IF(!mAgreedExperimental, "no!");
+    int32_t id;
+    if (sizeof(id) != read(clientFd.get(), &id, sizeof(id))) {
+        ALOGE("Could not read ID from fd %d", clientFd.get());
+        return;
+    }
+
+    std::thread thisThread;
+    sp<RpcSession> session;
+    {
+        std::lock_guard<std::mutex> _l(mLock);
+
+        auto threadId = mConnectingThreads.find(std::this_thread::get_id());
+        LOG_ALWAYS_FATAL_IF(threadId == mConnectingThreads.end(),
+                            "Must establish connection on owned thread");
+        thisThread = std::move(threadId->second);
+        mConnectingThreads.erase(threadId);
+
+        if (id == RPC_SESSION_ID_NEW) {
+            LOG_ALWAYS_FATAL_IF(mSessionIdCounter >= INT32_MAX, "Out of session IDs");
+            mSessionIdCounter++;
+
+            session = RpcSession::make();
+            session->setForServer(wp<RpcServer>::fromExisting(this), mSessionIdCounter);
+
+            mSessions[mSessionIdCounter] = session;
+        } else {
+            auto it = mSessions.find(id);
+            if (it == mSessions.end()) {
+                ALOGE("Cannot add thread, no record of session with ID %d", id);
+                return;
+            }
+            session = it->second;
+        }
+    }
+
+    // avoid strong cycle
+    server = nullptr;
+    //
+    //
+    // DO NOT ACCESS MEMBER VARIABLES BELOW
+    //
+
+    session->join(std::move(thisThread), std::move(clientFd));
+}
+
 bool RpcServer::setupSocketServer(const RpcSocketAddress& addr) {
     LOG_RPC_DETAIL("Setting up socket server %s", addr.toString().c_str());
 
diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp
index f38135b..f32aa7a 100644
--- a/libs/binder/RpcSession.cpp
+++ b/libs/binder/RpcSession.cpp
@@ -131,24 +131,14 @@
     return OK;
 }
 
-void RpcSession::startThread(unique_fd client) {
-    std::lock_guard<std::mutex> _l(mMutex);
-    sp<RpcSession> holdThis = sp<RpcSession>::fromExisting(this);
-    int fd = client.release();
-    auto thread = std::thread([=] {
-        holdThis->join(unique_fd(fd));
-        {
-            std::lock_guard<std::mutex> _l(holdThis->mMutex);
-            auto it = mThreads.find(std::this_thread::get_id());
-            LOG_ALWAYS_FATAL_IF(it == mThreads.end());
-            it->second.detach();
-            mThreads.erase(it);
-        }
-    });
-    mThreads[thread.get_id()] = std::move(thread);
-}
+void RpcSession::join(std::thread thread, unique_fd client) {
+    LOG_ALWAYS_FATAL_IF(thread.get_id() != std::this_thread::get_id(), "Must own this thread");
 
-void RpcSession::join(unique_fd client) {
+    {
+        std::lock_guard<std::mutex> _l(mMutex);
+        mThreads[thread.get_id()] = std::move(thread);
+    }
+
     // must be registered to allow arbitrary client code executing commands to
     // be able to do nested calls (we can't only read from it)
     sp<RpcConnection> connection = assignServerToThisThread(std::move(client));
@@ -165,6 +155,14 @@
 
     LOG_ALWAYS_FATAL_IF(!removeServerConnection(connection),
                         "bad state: connection object guaranteed to be in list");
+
+    {
+        std::lock_guard<std::mutex> _l(mMutex);
+        auto it = mThreads.find(std::this_thread::get_id());
+        LOG_ALWAYS_FATAL_IF(it == mThreads.end());
+        it->second.detach();
+        mThreads.erase(it);
+    }
 }
 
 void RpcSession::terminateLocked() {
diff --git a/libs/binder/include/binder/RpcServer.h b/libs/binder/include/binder/RpcServer.h
index 3534d51..b1e5519 100644
--- a/libs/binder/include/binder/RpcServer.h
+++ b/libs/binder/include/binder/RpcServer.h
@@ -22,6 +22,7 @@
 #include <utils/RefBase.h>
 
 #include <mutex>
+#include <thread>
 
 // WARNING: This is a feature which is still in development, and it is subject
 // to radical change. Any production use of this may subject your code to any
@@ -115,6 +116,7 @@
     friend sp<RpcServer>;
     RpcServer();
 
+    void establishConnection(sp<RpcServer>&& session, base::unique_fd clientFd);
     bool setupSocketServer(const RpcSocketAddress& address);
 
     bool mAgreedExperimental = false;
@@ -123,6 +125,7 @@
     base::unique_fd mServer; // socket we are accepting sessions on
 
     std::mutex mLock; // for below
+    std::map<std::thread::id, std::thread> mConnectingThreads;
     sp<IBinder> mRootObject;
     std::map<int32_t, sp<RpcSession>> mSessions;
     int32_t mSessionIdCounter = 0;
diff --git a/libs/binder/include/binder/RpcSession.h b/libs/binder/include/binder/RpcSession.h
index c8ab9e4..92ee100 100644
--- a/libs/binder/include/binder/RpcSession.h
+++ b/libs/binder/include/binder/RpcSession.h
@@ -114,8 +114,7 @@
 
     status_t readId();
 
-    void startThread(base::unique_fd client);
-    void join(base::unique_fd client);
+    void join(std::thread thread, base::unique_fd client);
     void terminateLocked();
 
     struct RpcConnection : public RefBase {