Handle OnStart timeouts gracefully

The current behaviour is that we require all tracing sessions with the
oome data source to successfully connect before we flush the data.
This is not ideal because a session might be configured to reject the
trigger based on the producer name (or timeout for any other reason). We
still want to flush to connected data sources.

Flag: EXEMPT bugfix
Test: atest HeapprofdJavaCtsTest, manual test CTS (concurrent with tracing session using producer filters)
Bug: 368365125
Change-Id: I999b81293d0992128fa1bfe47377fe11f5cc04ed
diff --git a/perfetto_hprof/perfetto_hprof.cc b/perfetto_hprof/perfetto_hprof.cc
index 0602437..f4ea0de 100644
--- a/perfetto_hprof/perfetto_hprof.cc
+++ b/perfetto_hprof/perfetto_hprof.cc
@@ -91,7 +91,7 @@
 static int requested_tracing_session_id = 0;
 static State g_state = State::kUninitialized;
 static bool g_oome_triggered = false;
-static uint32_t g_oome_sessions_pending = 0;
+static uint32_t g_oome_sessions_started = 0;
 
 // Pipe to signal from the signal handler into a worker thread that handles the
 // dump requests.
@@ -187,10 +187,11 @@
   constexpr static perfetto::BufferExhaustedPolicy kBufferExhaustedPolicy =
     perfetto::BufferExhaustedPolicy::kStall;
 
-  explicit JavaHprofDataSource(bool is_oome_heap) : is_oome_heap_(is_oome_heap) {}
+  explicit JavaHprofDataSource(uint32_t oome_sessions_pending)
+      : oome_sessions_pending_(oome_sessions_pending) {}
 
   void OnSetup(const SetupArgs& args) override {
-    if (!is_oome_heap_) {
+    if (!IsOome()) {
       uint64_t normalized_tracing_session_id =
         args.config->tracing_session_id() % std::numeric_limits<int32_t>::max();
       if (requested_tracing_session_id < 0) {
@@ -214,8 +215,7 @@
     }
     // This tracing session ID matches the requesting tracing session ID, so we know heapprofd
     // has verified it targets this process.
-    enabled_ =
-        !is_oome_heap_ || (IsOomeHeapDumpAllowed(*args.config) && IsOomeDumpEnabled(*cfg.get()));
+    enabled_ = !IsOome() || (IsOomeHeapDumpAllowed(*args.config) && IsOomeDumpEnabled(*cfg.get()));
   }
 
   bool dump_smaps() { return dump_smaps_; }
@@ -225,17 +225,17 @@
 
   void OnStart(const StartArgs&) override {
     art::MutexLock lk(art_thread(), GetStateMutex());
-    // In case there are multiple tracing sessions waiting for an OOME error,
-    // there will be a data source instance for each of them. Before the
-    // transition to kStart and signaling the dumping thread, we need to make
-    // sure all the data sources are ready.
-    if (is_oome_heap_ && g_oome_sessions_pending > 0) {
-      --g_oome_sessions_pending;
+    if (IsOome()) {
+      // In case there are multiple tracing sessions waiting for an OOME error,
+      // there will be a data source instance for each of them. Before the
+      // transition to kStart and signaling the dumping thread, we need to make
+      // sure all the data sources are ready.
+      ++g_oome_sessions_started;
     }
     if (g_state == State::kWaitForStart) {
-      // WriteHeapPackets is responsible for checking whether the DataSource is\
+      // WriteHeapPackets is responsible for checking whether the DataSource is
       // actually enabled.
-      if (!is_oome_heap_ || g_oome_sessions_pending == 0) {
+      if (!IsOome() || g_oome_sessions_started == oome_sessions_pending_) {
         g_state = State::kStart;
         GetStateCV().Broadcast(art_thread());
       }
@@ -279,6 +279,8 @@
   }
 
  private:
+  bool IsOome() const { return oome_sessions_pending_ > 0; }
+
   static bool IsOomeDumpEnabled(const perfetto::protos::pbzero::JavaHprofConfig::Decoder& cfg) {
     std::string cmdline;
     if (!android::base::ReadFileToString("/proc/self/cmdline", &cmdline)) {
@@ -295,7 +297,7 @@
     return false;
   }
 
-  bool is_oome_heap_ = false;
+  uint32_t oome_sessions_pending_ = 0;
   bool enabled_ = false;
   bool dump_smaps_ = false;
   std::vector<std::string> ignored_types_;
@@ -306,7 +308,7 @@
   std::function<void()> async_stop_;
 };
 
-void SetupDataSource(const std::string& ds_name, bool is_oome_heap) {
+void SetupDataSource(const std::string& ds_name, uint32_t oome_sessions_pending) {
   perfetto::TracingInitArgs args;
   args.backends = perfetto::BackendType::kSystemBackend;
   perfetto::Tracing::Initialize(args);
@@ -314,7 +316,7 @@
   perfetto::DataSourceDescriptor dsd;
   dsd.set_name(ds_name);
   dsd.set_will_notify_on_stop(true);
-  JavaHprofDataSource::Register(dsd, is_oome_heap);
+  JavaHprofDataSource::Register(dsd, oome_sessions_pending);
 }
 
 // Waits for the data source OnStart
@@ -1118,7 +1120,7 @@
       // Make sure that this is the first thing we do after forking, so if anything
       // below hangs, the fork will go away from the watchdog.
       ArmWatchdogOrDie();
-      SetupDataSource("android.java_hprof", false);
+      SetupDataSource("android.java_hprof", /* oome_sessions_pending= */ 0);
       WaitForDataSource(self);
       WriteHeapPackets(dumped_pid, timestamp);
       LOG(INFO) << "finished dumping heap for " << dumped_pid;
@@ -1146,7 +1148,6 @@
       return;
     }
     g_oome_triggered = true;
-    g_oome_sessions_pending = session_cnt;
   }
 
   art::ScopedThreadSuspension sts(self, art::ThreadState::kSuspended);
@@ -1172,21 +1173,25 @@
       BusyWaitpid(child, kWatchdogTimeoutSec * 1000);
     },
     // child process
-    [self](pid_t dumped_pid, uint64_t timestamp) {
+    [self, session_cnt](pid_t dumped_pid, uint64_t timestamp) {
       ArmWatchdogOrDie();
       art::SetThreadName("perfetto_oome_hprof");
       art::ScopedTrace trace("perfetto_hprof oome");
-      SetupDataSource("android.java_hprof.oom", true);
-      perfetto::Tracing::ActivateTriggers({"com.android.telemetry.art-outofmemory"}, 500);
+      SetupDataSource("android.java_hprof.oom", session_cnt);
+      perfetto::Tracing::ActivateTriggers({"com.android.telemetry.art-outofmemory"}, 1000);
 
-      // A pre-armed tracing session might not exist, so we should wait for a
-      // limited amount of time before we decide to let the execution continue.
-      if (!TimedWaitForDataSource(self, 1000)) {
-        LOG(INFO) << "OOME hprof timeout (state " << g_state << ")";
-        return;
+      // We know that there are > 0 tracing sessions waiting for the oome data source.
+      // However they could be configured to filter the trigger (e.g. based on the producer regex)
+      // We should wait for a limited amount of time and still flush to established sessions.
+      TimedWaitForDataSource(self, 1000);
+      if (g_oome_sessions_started > 0) {
+        WriteHeapPackets(dumped_pid, timestamp);
+        LOG(INFO) << "OOME hprof complete for " << dumped_pid << ", written to "
+                  << g_oome_sessions_started << "/" << session_cnt << " sessions";
+      } else {
+        LOG(INFO) << "OOME hprof ds setup timeout for " << dumped_pid << "(g_state: "
+          << g_state << ", g_oome_sessions_started: " << g_oome_sessions_started << ")";
       }
-      WriteHeapPackets(dumped_pid, timestamp);
-      LOG(INFO) << "OOME hprof complete for " << dumped_pid;
     });
 }