Use NotifyDatasourceStopped for perfetto_hprof.

This takes the guesswork out of how long we need to run the trace for.
Now, we can have a short trace duration (but long enough for the fork +
datasource init to happen), and a long datasource_stop_timeout. This
way, the trace will only take as long as it needs to.

Bug: 159594916
Change-Id: I0fa4219f454e91476833297342a11009e0e27435
diff --git a/perfetto_hprof/perfetto_hprof.cc b/perfetto_hprof/perfetto_hprof.cc
index 7fa1c33..c2fe1f5 100644
--- a/perfetto_hprof/perfetto_hprof.cc
+++ b/perfetto_hprof/perfetto_hprof.cc
@@ -247,7 +247,20 @@
     }
   }
 
-  void OnStop(const StopArgs&) override {}
+  // This datasource can be used with a trace config with a short duration_ms
+  // but a long datasource_stop_timeout_ms. In that case, OnStop is called (in
+  // general) before the dump is done. In that case, we handle the stop
+  // asynchronously, and notify the tracing service once we are done.
+  // In case OnStop is called after the dump is done (but before the process)
+  // has exited, we just acknowledge the request.
+  void OnStop(const StopArgs& a) override {
+    art::MutexLock lk(art_thread(), finish_mutex_);
+    if (is_finished_) {
+      return;
+    }
+    is_stopped_ = true;
+    async_stop_ = std::move(a.HandleStopAsynchronously());
+  }
 
   static art::Thread* art_thread() {
     // TODO(fmayer): Attach the Perfetto producer thread to ART and give it a name. This is
@@ -261,11 +274,25 @@
 
   std::vector<std::string> ignored_types() { return ignored_types_; }
 
+  void Finish() {
+    art::MutexLock lk(art_thread(), finish_mutex_);
+    if (is_stopped_) {
+      async_stop_();
+    } else {
+      is_finished_ = true;
+    }
+  }
+
  private:
   bool enabled_ = false;
   bool dump_smaps_ = false;
   std::vector<std::string> ignored_types_;
   static art::Thread* self_;
+
+  art::Mutex finish_mutex_{"perfetto_hprof_ds_mutex", art::LockLevel::kGenericBottomLock};
+  bool is_finished_ = false;
+  bool is_stopped_ = false;
+  std::function<void()> async_stop_;
 };
 
 art::Thread* JavaHprofDataSource::self_ = nullptr;
@@ -278,6 +305,7 @@
 
   perfetto::DataSourceDescriptor dsd;
   dsd.set_name("android.java_hprof");
+  dsd.set_will_notify_on_stop(true);
   JavaHprofDataSource::Register(dsd);
 
   LOG(INFO) << "waiting for data source";
@@ -585,6 +613,7 @@
             {
               auto ds = ctx.GetDataSourceLocked();
               if (!ds || !ds->enabled()) {
+                if (ds) ds->Finish();
                 LOG(INFO) << "skipping irrelevant data source.";
                 return;
               }
@@ -757,7 +786,6 @@
             }
 
             writer.Finalize();
-
             ctx.Flush([] {
               {
                 art::MutexLock lk(JavaHprofDataSource::art_thread(), GetStateMutex());
@@ -765,12 +793,24 @@
                 GetStateCV().Broadcast(JavaHprofDataSource::art_thread());
               }
             });
+            // Wait for the Flush that will happen on the Perfetto thread.
+            {
+              art::MutexLock lk(JavaHprofDataSource::art_thread(), GetStateMutex());
+              while (g_state != State::kEnd) {
+                GetStateCV().Wait(JavaHprofDataSource::art_thread());
+              }
+            }
+            {
+              auto ds = ctx.GetDataSourceLocked();
+              if (ds) {
+                ds->Finish();
+              } else {
+                LOG(ERROR) << "datasource timed out (duration_ms + datasource_stop_timeout_ms) "
+                              "before dump finished";
+              }
+            }
           });
 
-  art::MutexLock lk(self, GetStateMutex());
-  while (g_state != State::kEnd) {
-    GetStateCV().Wait(self);
-  }
   LOG(INFO) << "finished dumping heap for " << parent_pid;
   // Prevent the atexit handlers to run. We do not want to call cleanup
   // functions the parent process has registered.