bufferpool: Handle pending bufferpool messages

Ensure pending buffer messages are processed even if bufferpool process
is not doing anything.

Bug: 72651719
Change-Id: I13faad1a642413d6ade1211c9c50ad841e61fd6d
diff --git a/codec2/vndk/bufferpool/AccessorImpl.cpp b/codec2/vndk/bufferpool/AccessorImpl.cpp
index dd7eca5..06d49c1 100644
--- a/codec2/vndk/bufferpool/AccessorImpl.cpp
+++ b/codec2/vndk/bufferpool/AccessorImpl.cpp
@@ -484,6 +484,9 @@
         const native_handle_t** handle) {
 
     BufferId bufferId = mSeq++;
+    if (mSeq == Connection::SYNC_BUFFERID) {
+        mSeq = 0;
+    }
     std::unique_ptr<InternalBuffer> buffer =
             std::make_unique<InternalBuffer>(
                     bufferId, alloc, allocSize, params);
diff --git a/codec2/vndk/bufferpool/BufferPoolClient.cpp b/codec2/vndk/bufferpool/BufferPoolClient.cpp
index 148a872..3626004 100644
--- a/codec2/vndk/bufferpool/BufferPoolClient.cpp
+++ b/codec2/vndk/bufferpool/BufferPoolClient.cpp
@@ -79,7 +79,9 @@
             int64_t timestampUs);
 
     bool postReceiveResult(
-            BufferId bufferId, TransactionId transactionId, bool result);
+            BufferId bufferId, TransactionId transactionId, bool result, bool *needsSync);
+
+    void trySyncFromRemote();
 
     bool syncReleased();
 
@@ -93,7 +95,6 @@
             TransactionId transactionId, BufferId bufferId,
             native_handle_t **handle);
 
-
     struct BlockPoolDataDtor;
     struct ClientBuffer;
 
@@ -136,6 +137,11 @@
         std::list<BufferId> mReleasedIds;
         std::unique_ptr<BufferStatusChannel> mStatusChannel;
     } mReleasing;
+
+    // This lock is held during synchronization from remote side.
+    // In order to minimize remote calls and locking durtaion, this lock is held
+    // by best effort approach using try_lock().
+    std::mutex mRemoteSyncLock;
 };
 
 struct BufferPoolClient::Impl::BlockPoolDataDtor {
@@ -412,13 +418,17 @@
             mCache.mCreateCv.wait(lock);
         }
     }
+    bool needsSync = false;
     bool posted = postReceiveResult(bufferId, transactionId,
-                                      *buffer ? true : false);
+                                      *buffer ? true : false, &needsSync);
     ALOGV("client receive %lld - %u : %s (%d)", (long long)mConnectionId, bufferId,
           *buffer ? "ok" : "fail", posted);
     if (mValid && mLocal && mLocalConnection) {
         mLocalConnection->cleanUp(false);
     }
+    if (needsSync && mRemoteConnection) {
+        trySyncFromRemote();
+    }
     if (*buffer) {
         if (!posted) {
             buffer->reset();
@@ -442,6 +452,7 @@
         BufferId bufferId, ConnectionId receiver,
         TransactionId *transactionId, int64_t *timestampUs) {
     bool ret = false;
+    bool needsSync = false;
     {
         std::lock_guard<std::mutex> lock(mReleasing.mLock);
         *timestampUs = getTimestampNow();
@@ -450,10 +461,14 @@
         ret =  mReleasing.mStatusChannel->postBufferStatusMessage(
                 *transactionId, bufferId, BufferStatus::TRANSFER_TO, mConnectionId,
                 receiver, mReleasing.mReleasingIds, mReleasing.mReleasedIds);
+        needsSync = !mLocal && mReleasing.mStatusChannel->needsSync();
     }
     if (mValid && mLocal && mLocalConnection) {
         mLocalConnection->cleanUp(false);
     }
+    if (needsSync && mRemoteConnection) {
+        trySyncFromRemote();
+    }
     return ret;
 }
 
@@ -484,14 +499,38 @@
 }
 
 bool BufferPoolClient::Impl::postReceiveResult(
-        BufferId bufferId, TransactionId transactionId, bool result) {
+        BufferId bufferId, TransactionId transactionId, bool result, bool *needsSync) {
     std::lock_guard<std::mutex> lock(mReleasing.mLock);
     // TODO: retry, add timeout
-    return mReleasing.mStatusChannel->postBufferStatusMessage(
+    bool ret = mReleasing.mStatusChannel->postBufferStatusMessage(
             transactionId, bufferId,
             result ? BufferStatus::TRANSFER_OK : BufferStatus::TRANSFER_ERROR,
             mConnectionId, -1, mReleasing.mReleasingIds,
             mReleasing.mReleasedIds);
+    *needsSync = !mLocal && mReleasing.mStatusChannel->needsSync();
+    return ret;
+}
+
+void BufferPoolClient::Impl::trySyncFromRemote() {
+    if (mRemoteSyncLock.try_lock()) {
+        bool needsSync = false;
+        {
+            std::lock_guard<std::mutex> lock(mReleasing.mLock);
+            needsSync = mReleasing.mStatusChannel->needsSync();
+        }
+        if (needsSync) {
+            TransactionId transactionId = (mConnectionId << 32);
+            BufferId bufferId = Connection::SYNC_BUFFERID;
+            Return<void> transResult = mRemoteConnection->fetch(
+                    transactionId, bufferId,
+                    []
+                    (ResultStatus outStatus, Buffer outBuffer) {
+                        (void) outStatus;
+                        (void) outBuffer;
+                    });
+        }
+        mRemoteSyncLock.unlock();
+    }
 }
 
 // should have mCache.mLock
diff --git a/codec2/vndk/bufferpool/BufferStatus.cpp b/codec2/vndk/bufferpool/BufferStatus.cpp
index 73941f3..596cfa5 100644
--- a/codec2/vndk/bufferpool/BufferStatus.cpp
+++ b/codec2/vndk/bufferpool/BufferStatus.cpp
@@ -38,6 +38,7 @@
 }
 
 static constexpr int kNumElementsInQueue = 1024*16;
+static constexpr int kMinElementsToSyncInQueue = 128;
 
 ResultStatus BufferStatusObserver::open(
         ConnectionId id, const QueueDescriptor** fmqDescPtr) {
@@ -105,6 +106,14 @@
     return mValid;
 }
 
+bool BufferStatusChannel::needsSync() {
+    if (mValid) {
+        size_t avail = mBufferStatusQueue->availableToWrite();
+        return avail + kMinElementsToSyncInQueue < kNumElementsInQueue;
+    }
+    return false;
+}
+
 void BufferStatusChannel::postBufferRelease(
         ConnectionId connectionId,
         std::list<BufferId> &pending, std::list<BufferId> &posted) {
diff --git a/codec2/vndk/bufferpool/BufferStatus.h b/codec2/vndk/bufferpool/BufferStatus.h
index eecfee3..a18a921 100644
--- a/codec2/vndk/bufferpool/BufferStatus.h
+++ b/codec2/vndk/bufferpool/BufferStatus.h
@@ -94,6 +94,9 @@
     /** Returns whether the FMQ is connected successfully. */
     bool isValid();
 
+    /** Returns whether the FMQ needs to be synced from the buffer pool */
+    bool needsSync();
+
     /**
      * Posts a buffer release message to the buffer pool.
      *
diff --git a/codec2/vndk/bufferpool/Connection.cpp b/codec2/vndk/bufferpool/Connection.cpp
index a89b7b3..9e741e7 100644
--- a/codec2/vndk/bufferpool/Connection.cpp
+++ b/codec2/vndk/bufferpool/Connection.cpp
@@ -27,12 +27,16 @@
 Return<void> Connection::fetch(uint64_t transactionId, uint32_t bufferId, fetch_cb _hidl_cb) {
     ResultStatus status = ResultStatus::CRITICAL_ERROR;
     if (mInitialized && mAccessor) {
-        const native_handle_t *handle = NULL;
-        status = mAccessor->fetch(
-                mConnectionId, transactionId, bufferId, &handle);
-        if (status == ResultStatus::OK) {
-            _hidl_cb(status, Buffer{bufferId, handle});
-            return Void();
+        if (bufferId != SYNC_BUFFERID) {
+            const native_handle_t *handle = NULL;
+            status = mAccessor->fetch(
+                    mConnectionId, transactionId, bufferId, &handle);
+            if (status == ResultStatus::OK) {
+                _hidl_cb(status, Buffer{bufferId, handle});
+                return Void();
+            }
+        } else {
+            mAccessor->cleanUp(false);
         }
     }
     _hidl_cb(status, Buffer{0, nullptr});
diff --git a/codec2/vndk/bufferpool/Connection.h b/codec2/vndk/bufferpool/Connection.h
index 3bb93ca..e19cb67 100644
--- a/codec2/vndk/bufferpool/Connection.h
+++ b/codec2/vndk/bufferpool/Connection.h
@@ -83,6 +83,10 @@
      */
     void initialize(const sp<Accessor> &accessor, ConnectionId connectionId);
 
+    enum : uint32_t {
+        SYNC_BUFFERID = UINT32_MAX,
+    };
+
 private:
     bool mInitialized;
     sp<Accessor> mAccessor;