Grpc Graph - fix use after free

GrpcGraph initializes StreamSetObserver - which triggers a thread to
notify GrpcGraph of termination. If GrpcGraph is destroyed, this will
result in use after free. Fix this by enforcing that GrpcGraph object is
not destroyed before StreamSetObserver.

Bug: 170407229
Test: build, unit test and fuzz test
Change-Id: I8b24f72af60c441ebd7a31939769c46589431cfe
Merged-In: I8b24f72af60c441ebd7a31939769c46589431cfe
(cherry picked from commit 3f53bf0525bd5c21ff6cc3235b93cb26a7866ea8)
diff --git a/computepipe/runner/graph/GrpcGraph.cpp b/computepipe/runner/graph/GrpcGraph.cpp
index 6967230..efd56c9 100644
--- a/computepipe/runner/graph/GrpcGraph.cpp
+++ b/computepipe/runner/graph/GrpcGraph.cpp
@@ -65,6 +65,10 @@
 
 }  // namespace
 
+GrpcGraph::~GrpcGraph() {
+    mStreamSetObserver.reset();
+}
+
 PrebuiltGraphState GrpcGraph::GetGraphState() const {
     std::lock_guard lock(mLock);
     return mGraphState;
diff --git a/computepipe/runner/graph/GrpcGraph.h b/computepipe/runner/graph/GrpcGraph.h
index d833dd2..0805e35 100644
--- a/computepipe/runner/graph/GrpcGraph.h
+++ b/computepipe/runner/graph/GrpcGraph.h
@@ -45,7 +45,7 @@
   public:
     GrpcGraph() {}
 
-    virtual ~GrpcGraph() {}
+    virtual ~GrpcGraph();
 
     Status initialize(const std::string& address,
                       std::weak_ptr<PrebuiltEngineInterface> engineInterface);
diff --git a/computepipe/runner/graph/StreamSetObserver.cpp b/computepipe/runner/graph/StreamSetObserver.cpp
index 30e3b54..2586b51 100644
--- a/computepipe/runner/graph/StreamSetObserver.cpp
+++ b/computepipe/runner/graph/StreamSetObserver.cpp
@@ -157,10 +157,9 @@
     std::unique_lock lock(mLock);
     if (mStopped) {
         // Separate thread is necessary here to avoid recursive locking.
-        std::thread t([streamGraphInterface(mStreamGraphInterface)]() {
-            streamGraphInterface->dispatchGraphTerminationMessage(Status::SUCCESS, "");
+        mGraphTerminationThread = std::thread([this]() {
+            this->mStreamGraphInterface->dispatchGraphTerminationMessage(Status::SUCCESS, "");
         });
-        t.detach();
         return;
     }
 
@@ -184,10 +183,16 @@
     if (mStreamObservers.empty()) {
         mStopped = true;
         mStoppedCv.notify_one();
-        std::thread t([streamGraphInterface(mStreamGraphInterface)]() {
+        mGraphTerminationThread = std::thread([streamGraphInterface(mStreamGraphInterface)]() {
             streamGraphInterface->dispatchGraphTerminationMessage(Status::SUCCESS, "");
         });
-        t.detach();
+    }
+}
+
+StreamSetObserver::~StreamSetObserver() {
+    std::unique_lock lock(mLock);
+    if (mGraphTerminationThread.joinable()) {
+        mGraphTerminationThread.join();
     }
 }
 
diff --git a/computepipe/runner/graph/StreamSetObserver.h b/computepipe/runner/graph/StreamSetObserver.h
index 7999f18..fe3f40f 100644
--- a/computepipe/runner/graph/StreamSetObserver.h
+++ b/computepipe/runner/graph/StreamSetObserver.h
@@ -78,6 +78,8 @@
 
 class StreamSetObserver : public EndOfStreamReporter {
   public:
+    virtual ~StreamSetObserver();
+
     StreamSetObserver(const runner::ClientConfig& clientConfig,
                       StreamGraphInterface* streamGraphInterface);
 
@@ -92,6 +94,7 @@
     std::map<int, std::unique_ptr<SingleStreamObserver>> mStreamObservers;
     std::mutex mLock;
     std::condition_variable mStoppedCv;
+    std::thread mGraphTerminationThread;
     bool mStopped = true;
 };