apexd: Activates apexes in parallel

Activates apexes in parallel on multi-core devices to reduce the boot
time.

- add a mutex for mounted_apexes_
- add a mutex for getting free loopback device
- seperate activatePackageImpl() from ActivateApexPackages()
- run activatePackageImpl() asynchronously

Test: build pass
Test: boot on device and cuttlefish
Test: atest ApexTestCases passed on Cuttlefish
Signed-off-by: Suchang Woo <suchang.woo@samsung.com>
Change-Id: I80ccdd5f13c160b996132761e947bf62fdf64e44
diff --git a/apexd/apex_database.cpp b/apexd/apex_database.cpp
index 0f9502c..fe777ab 100644
--- a/apexd/apex_database.cpp
+++ b/apexd/apex_database.cpp
@@ -250,13 +250,15 @@
 // By synchronizing the mounts info with Database on startup,
 // Apexd serves the correct package list even on the devices
 // which are not ro.apex.updatable.
-void MountedApexDatabase::PopulateFromMounts() {
+void MountedApexDatabase::PopulateFromMounts()
+    REQUIRES(!mounted_apexes_mutex_) {
   LOG(INFO) << "Populating APEX database from mounts...";
 
   std::unordered_map<std::string, int> active_versions;
 
   std::ifstream mounts("/proc/mounts");
   std::string line;
+  std::lock_guard lock(mounted_apexes_mutex_);
   while (std::getline(mounts, line)) {
     auto [block, mount_point] = ParseMountInfo(line);
     // TODO(b/158469914): distinguish between temp and non-temp mounts
@@ -274,12 +276,12 @@
     }
 
     auto [package, version] = ParseMountPoint(mount_point);
-    AddMountedApex(package, false, *mount_data);
+    AddMountedApexLocked(package, false, *mount_data);
 
     auto active = active_versions[package] < version;
     if (active) {
       active_versions[package] = version;
-      SetLatest(package, mount_data->full_path);
+      SetLatestLocked(package, mount_data->full_path);
     }
     LOG(INFO) << "Found " << mount_point << " backed by"
               << (mount_data->deleted ? " deleted " : " ") << "file "
diff --git a/apexd/apex_database.h b/apexd/apex_database.h
index 9342995..d00c5fa 100644
--- a/apexd/apex_database.h
+++ b/apexd/apex_database.h
@@ -18,11 +18,13 @@
 #define ANDROID_APEXD_APEX_DATABASE_H_
 
 #include <map>
+#include <mutex>
 #include <string>
 #include <unordered_set>
 
 #include <android-base/logging.h>
 #include <android-base/result.h>
+#include <android-base/thread_annotations.h>
 
 using android::base::Error;
 using android::base::Result;
@@ -90,8 +92,9 @@
   };
 
   template <typename... Args>
-  inline void AddMountedApex(const std::string& package, bool latest,
-                             Args&&... args) {
+  inline void AddMountedApexLocked(const std::string& package, bool latest,
+                                   Args&&... args)
+      REQUIRES(mounted_apexes_mutex_) {
     auto it = mounted_apexes_.find(package);
     if (it == mounted_apexes_.end()) {
       auto insert_it =
@@ -108,9 +111,18 @@
     CheckUniqueLoopDm();
   }
 
+  template <typename... Args>
+  inline void AddMountedApex(const std::string& package, bool latest,
+                             Args&&... args) REQUIRES(!mounted_apexes_mutex_) {
+    std::lock_guard lock(mounted_apexes_mutex_);
+    AddMountedApexLocked(package, latest, args...);
+  }
+
   inline void RemoveMountedApex(const std::string& package,
                                 const std::string& full_path,
-                                bool match_temp_mounts = false) {
+                                bool match_temp_mounts = false)
+      REQUIRES(!mounted_apexes_mutex_) {
+    std::lock_guard lock(mounted_apexes_mutex_);
     auto it = mounted_apexes_.find(package);
     if (it == mounted_apexes_.end()) {
       return;
@@ -128,7 +140,15 @@
   }
 
   inline void SetLatest(const std::string& package,
-                        const std::string& full_path) {
+                        const std::string& full_path)
+      REQUIRES(!mounted_apexes_mutex_) {
+    std::lock_guard lock(mounted_apexes_mutex_);
+    SetLatestLocked(package, full_path);
+  }
+
+  inline void SetLatestLocked(const std::string& package,
+                              const std::string& full_path)
+      REQUIRES(mounted_apexes_mutex_) {
     auto it = mounted_apexes_.find(package);
     CHECK(it != mounted_apexes_.end());
 
@@ -152,7 +172,9 @@
 
   template <typename T>
   inline void ForallMountedApexes(const std::string& package, const T& handler,
-                                  bool match_temp_mounts = false) const {
+                                  bool match_temp_mounts = false) const
+      REQUIRES(!mounted_apexes_mutex_) {
+    std::lock_guard lock(mounted_apexes_mutex_);
     auto it = mounted_apexes_.find(package);
     if (it == mounted_apexes_.end()) {
       return;
@@ -166,7 +188,9 @@
 
   template <typename T>
   inline void ForallMountedApexes(const T& handler,
-                                  bool match_temp_mounts = false) const {
+                                  bool match_temp_mounts = false) const
+      REQUIRES(!mounted_apexes_mutex_) {
+    std::lock_guard lock(mounted_apexes_mutex_);
     for (const auto& pkg : mounted_apexes_) {
       for (const auto& pair : pkg.second) {
         if (pair.first.is_temp_mount == match_temp_mounts) {
@@ -185,9 +209,18 @@
   //         b) do not have to const_cast (over std::set)
   // TODO(b/158467745): This structure (and functions) need to be guarded by
   //   locks.
-  std::map<std::string, std::map<MountedApexData, bool>> mounted_apexes_;
+  std::map<std::string, std::map<MountedApexData, bool>> mounted_apexes_
+      GUARDED_BY(mounted_apexes_mutex_);
 
-  inline void CheckAtMostOneLatest() {
+  // To fix thread safety negative capability warning
+  class Mutex : public std::mutex {
+   public:
+    // for negative capabilities
+    const Mutex& operator!() const { return *this; }
+  };
+  mutable Mutex mounted_apexes_mutex_;
+
+  inline void CheckAtMostOneLatest() REQUIRES(mounted_apexes_mutex_) {
     for (const auto& apex_set : mounted_apexes_) {
       size_t count = 0;
       for (const auto& pair : apex_set.second) {
@@ -199,7 +232,7 @@
     }
   }
 
-  inline void CheckUniqueLoopDm() {
+  inline void CheckUniqueLoopDm() REQUIRES(mounted_apexes_mutex_) {
     std::unordered_set<std::string> loop_devices;
     std::unordered_set<std::string> dm_devices;
     for (const auto& apex_set : mounted_apexes_) {
diff --git a/apexd/apexd.cpp b/apexd/apexd.cpp
index 98a0d07..a65736d 100644
--- a/apexd/apexd.cpp
+++ b/apexd/apexd.cpp
@@ -59,6 +59,7 @@
 #include <sys/ioctl.h>
 #include <sys/mount.h>
 #include <sys/stat.h>
+#include <sys/sysinfo.h>
 #include <sys/types.h>
 #include <unistd.h>
 
@@ -68,10 +69,13 @@
 #include <cstdlib>
 #include <filesystem>
 #include <fstream>
+#include <future>
 #include <iomanip>
 #include <iterator>
 #include <memory>
+#include <mutex>
 #include <optional>
+#include <queue>
 #include <string>
 #include <thread>
 #include <unordered_map>
@@ -1432,11 +1436,36 @@
   return ret;
 }
 
+std::vector<Result<void>> ActivateApexWorker(
+    std::queue<const ApexFile*>& apex_queue, std::mutex& mutex) {
+  std::vector<Result<void>> ret;
+
+  while (true) {
+    const ApexFile* apex;
+    {
+      std::lock_guard lock(mutex);
+      if (apex_queue.empty()) break;
+      apex = apex_queue.front();
+      apex_queue.pop();
+    }
+
+    if (auto res = ActivatePackageImpl(*apex); !res.ok()) {
+      ret.push_back(Error() << "Failed to activate " << apex->GetPath() << " : "
+                            << res.error());
+    } else {
+      ret.push_back({});
+    }
+  }
+
+  return ret;
+}
+
 Result<void> ActivateApexPackages(const std::vector<ApexFile>& apexes) {
   const auto& packages_with_code = GetActivePackagesMap();
-  size_t failed_cnt = 0;
   size_t skipped_cnt = 0;
-  size_t activated_cnt = 0;
+  std::queue<const ApexFile*> apex_queue;
+  std::mutex apex_queue_mutex;
+
   for (const auto& apex : apexes) {
     if (!apex.GetManifest().providesharedapexlibs()) {
       uint64_t new_version =
@@ -1451,14 +1480,33 @@
       }
     }
 
-    if (auto res = ActivatePackageImpl(apex); !res.ok()) {
-      LOG(ERROR) << "Failed to activate " << apex.GetPath() << " : "
-                 << res.error();
-      failed_cnt++;
-    } else {
-      activated_cnt++;
+    apex_queue.emplace(&apex);
+  }
+
+  // Creates threads as many as half number of cores for the performance.
+  size_t worker_num = std::max(get_nprocs_conf() >> 1, 1);
+  worker_num = std::min(apex_queue.size(), worker_num);
+
+  std::vector<std::future<std::vector<Result<void>>>> futures;
+  futures.reserve(worker_num);
+  for (size_t i = 0; i < worker_num; i++)
+    futures.push_back(std::async(std::launch::async, ActivateApexWorker,
+                                 std::ref(apex_queue),
+                                 std::ref(apex_queue_mutex)));
+
+  size_t activated_cnt = 0;
+  size_t failed_cnt = 0;
+  for (size_t i = 0; i < futures.size(); i++) {
+    for (const auto& res : futures[i].get()) {
+      if (res.ok()) {
+        ++activated_cnt;
+      } else {
+        ++failed_cnt;
+        LOG(ERROR) << res.error();
+      }
     }
   }
+
   if (failed_cnt > 0) {
     return Error() << "Failed to activate " << failed_cnt << " APEX packages";
   }
diff --git a/apexd/apexd_loop.cpp b/apexd/apexd_loop.cpp
index 8a23fbf..5641f10 100644
--- a/apexd/apexd_loop.cpp
+++ b/apexd/apexd_loop.cpp
@@ -306,6 +306,8 @@
     return ErrnoError() << "Failed to open loop-control";
   }
 
+  static std::mutex mlock;
+  std::lock_guard lock(mlock);
   int num = ioctl(ctl_fd.get(), LOOP_CTL_GET_FREE);
   if (num == -1) {
     return ErrnoError() << "Failed LOOP_CTL_GET_FREE";