bufferpool: Handle pending bufferpool messages
Ensure pending buffer messages are processed even if bufferpool process
is not doing anything.
Bug: 72651719
Change-Id: I13faad1a642413d6ade1211c9c50ad841e61fd6d
diff --git a/codec2/vndk/bufferpool/AccessorImpl.cpp b/codec2/vndk/bufferpool/AccessorImpl.cpp
index dd7eca5..06d49c1 100644
--- a/codec2/vndk/bufferpool/AccessorImpl.cpp
+++ b/codec2/vndk/bufferpool/AccessorImpl.cpp
@@ -484,6 +484,9 @@
const native_handle_t** handle) {
BufferId bufferId = mSeq++;
+ if (mSeq == Connection::SYNC_BUFFERID) {
+ mSeq = 0;
+ }
std::unique_ptr<InternalBuffer> buffer =
std::make_unique<InternalBuffer>(
bufferId, alloc, allocSize, params);
diff --git a/codec2/vndk/bufferpool/BufferPoolClient.cpp b/codec2/vndk/bufferpool/BufferPoolClient.cpp
index 148a872..3626004 100644
--- a/codec2/vndk/bufferpool/BufferPoolClient.cpp
+++ b/codec2/vndk/bufferpool/BufferPoolClient.cpp
@@ -79,7 +79,9 @@
int64_t timestampUs);
bool postReceiveResult(
- BufferId bufferId, TransactionId transactionId, bool result);
+ BufferId bufferId, TransactionId transactionId, bool result, bool *needsSync);
+
+ void trySyncFromRemote();
bool syncReleased();
@@ -93,7 +95,6 @@
TransactionId transactionId, BufferId bufferId,
native_handle_t **handle);
-
struct BlockPoolDataDtor;
struct ClientBuffer;
@@ -136,6 +137,11 @@
std::list<BufferId> mReleasedIds;
std::unique_ptr<BufferStatusChannel> mStatusChannel;
} mReleasing;
+
+ // This lock is held during synchronization from remote side.
+ // In order to minimize remote calls and locking durtaion, this lock is held
+ // by best effort approach using try_lock().
+ std::mutex mRemoteSyncLock;
};
struct BufferPoolClient::Impl::BlockPoolDataDtor {
@@ -412,13 +418,17 @@
mCache.mCreateCv.wait(lock);
}
}
+ bool needsSync = false;
bool posted = postReceiveResult(bufferId, transactionId,
- *buffer ? true : false);
+ *buffer ? true : false, &needsSync);
ALOGV("client receive %lld - %u : %s (%d)", (long long)mConnectionId, bufferId,
*buffer ? "ok" : "fail", posted);
if (mValid && mLocal && mLocalConnection) {
mLocalConnection->cleanUp(false);
}
+ if (needsSync && mRemoteConnection) {
+ trySyncFromRemote();
+ }
if (*buffer) {
if (!posted) {
buffer->reset();
@@ -442,6 +452,7 @@
BufferId bufferId, ConnectionId receiver,
TransactionId *transactionId, int64_t *timestampUs) {
bool ret = false;
+ bool needsSync = false;
{
std::lock_guard<std::mutex> lock(mReleasing.mLock);
*timestampUs = getTimestampNow();
@@ -450,10 +461,14 @@
ret = mReleasing.mStatusChannel->postBufferStatusMessage(
*transactionId, bufferId, BufferStatus::TRANSFER_TO, mConnectionId,
receiver, mReleasing.mReleasingIds, mReleasing.mReleasedIds);
+ needsSync = !mLocal && mReleasing.mStatusChannel->needsSync();
}
if (mValid && mLocal && mLocalConnection) {
mLocalConnection->cleanUp(false);
}
+ if (needsSync && mRemoteConnection) {
+ trySyncFromRemote();
+ }
return ret;
}
@@ -484,14 +499,38 @@
}
bool BufferPoolClient::Impl::postReceiveResult(
- BufferId bufferId, TransactionId transactionId, bool result) {
+ BufferId bufferId, TransactionId transactionId, bool result, bool *needsSync) {
std::lock_guard<std::mutex> lock(mReleasing.mLock);
// TODO: retry, add timeout
- return mReleasing.mStatusChannel->postBufferStatusMessage(
+ bool ret = mReleasing.mStatusChannel->postBufferStatusMessage(
transactionId, bufferId,
result ? BufferStatus::TRANSFER_OK : BufferStatus::TRANSFER_ERROR,
mConnectionId, -1, mReleasing.mReleasingIds,
mReleasing.mReleasedIds);
+ *needsSync = !mLocal && mReleasing.mStatusChannel->needsSync();
+ return ret;
+}
+
+void BufferPoolClient::Impl::trySyncFromRemote() {
+ if (mRemoteSyncLock.try_lock()) {
+ bool needsSync = false;
+ {
+ std::lock_guard<std::mutex> lock(mReleasing.mLock);
+ needsSync = mReleasing.mStatusChannel->needsSync();
+ }
+ if (needsSync) {
+ TransactionId transactionId = (mConnectionId << 32);
+ BufferId bufferId = Connection::SYNC_BUFFERID;
+ Return<void> transResult = mRemoteConnection->fetch(
+ transactionId, bufferId,
+ []
+ (ResultStatus outStatus, Buffer outBuffer) {
+ (void) outStatus;
+ (void) outBuffer;
+ });
+ }
+ mRemoteSyncLock.unlock();
+ }
}
// should have mCache.mLock
diff --git a/codec2/vndk/bufferpool/BufferStatus.cpp b/codec2/vndk/bufferpool/BufferStatus.cpp
index 73941f3..596cfa5 100644
--- a/codec2/vndk/bufferpool/BufferStatus.cpp
+++ b/codec2/vndk/bufferpool/BufferStatus.cpp
@@ -38,6 +38,7 @@
}
static constexpr int kNumElementsInQueue = 1024*16;
+static constexpr int kMinElementsToSyncInQueue = 128;
ResultStatus BufferStatusObserver::open(
ConnectionId id, const QueueDescriptor** fmqDescPtr) {
@@ -105,6 +106,14 @@
return mValid;
}
+bool BufferStatusChannel::needsSync() {
+ if (mValid) {
+ size_t avail = mBufferStatusQueue->availableToWrite();
+ return avail + kMinElementsToSyncInQueue < kNumElementsInQueue;
+ }
+ return false;
+}
+
void BufferStatusChannel::postBufferRelease(
ConnectionId connectionId,
std::list<BufferId> &pending, std::list<BufferId> &posted) {
diff --git a/codec2/vndk/bufferpool/BufferStatus.h b/codec2/vndk/bufferpool/BufferStatus.h
index eecfee3..a18a921 100644
--- a/codec2/vndk/bufferpool/BufferStatus.h
+++ b/codec2/vndk/bufferpool/BufferStatus.h
@@ -94,6 +94,9 @@
/** Returns whether the FMQ is connected successfully. */
bool isValid();
+ /** Returns whether the FMQ needs to be synced from the buffer pool */
+ bool needsSync();
+
/**
* Posts a buffer release message to the buffer pool.
*
diff --git a/codec2/vndk/bufferpool/Connection.cpp b/codec2/vndk/bufferpool/Connection.cpp
index a89b7b3..9e741e7 100644
--- a/codec2/vndk/bufferpool/Connection.cpp
+++ b/codec2/vndk/bufferpool/Connection.cpp
@@ -27,12 +27,16 @@
Return<void> Connection::fetch(uint64_t transactionId, uint32_t bufferId, fetch_cb _hidl_cb) {
ResultStatus status = ResultStatus::CRITICAL_ERROR;
if (mInitialized && mAccessor) {
- const native_handle_t *handle = NULL;
- status = mAccessor->fetch(
- mConnectionId, transactionId, bufferId, &handle);
- if (status == ResultStatus::OK) {
- _hidl_cb(status, Buffer{bufferId, handle});
- return Void();
+ if (bufferId != SYNC_BUFFERID) {
+ const native_handle_t *handle = NULL;
+ status = mAccessor->fetch(
+ mConnectionId, transactionId, bufferId, &handle);
+ if (status == ResultStatus::OK) {
+ _hidl_cb(status, Buffer{bufferId, handle});
+ return Void();
+ }
+ } else {
+ mAccessor->cleanUp(false);
}
}
_hidl_cb(status, Buffer{0, nullptr});
diff --git a/codec2/vndk/bufferpool/Connection.h b/codec2/vndk/bufferpool/Connection.h
index 3bb93ca..e19cb67 100644
--- a/codec2/vndk/bufferpool/Connection.h
+++ b/codec2/vndk/bufferpool/Connection.h
@@ -83,6 +83,10 @@
*/
void initialize(const sp<Accessor> &accessor, ConnectionId connectionId);
+ enum : uint32_t {
+ SYNC_BUFFERID = UINT32_MAX,
+ };
+
private:
bool mInitialized;
sp<Accessor> mAccessor;