Merge changes from topic "cherrypicker-L17700000962261813:N11400001392188571" into main
* changes:
Support mainline s/w codec
AIDL BufferPool implementation (HIDL -> AIDL)
AIDL bufferpool implementation (just copy from HIDL impl)
bufferpool2: Support mainline s/w codec
bufferpool2: add sync() method and etc
diff --git a/common/fmq/aidl/Android.bp b/common/fmq/aidl/Android.bp
index 40ceb32..148c63c 100644
--- a/common/fmq/aidl/Android.bp
+++ b/common/fmq/aidl/Android.bp
@@ -33,6 +33,7 @@
apex_available: [
"//apex_available:platform",
"com.android.btservices",
+ "com.android.media.swcodec",
],
min_sdk_version: "29",
},
diff --git a/media/bufferpool/aidl/Android.bp b/media/bufferpool/aidl/Android.bp
index 5ea2948..b01cdbe 100644
--- a/media/bufferpool/aidl/Android.bp
+++ b/media/bufferpool/aidl/Android.bp
@@ -40,6 +40,11 @@
},
ndk: {
enabled: true,
+ apex_available: [
+ "//apex_available:platform",
+ "com.android.media.swcodec",
+ ],
+ min_sdk_version: "29",
},
},
}
diff --git a/media/bufferpool/aidl/aidl_api/android.hardware.media.bufferpool2/current/android/hardware/media/bufferpool2/IClientManager.aidl b/media/bufferpool/aidl/aidl_api/android.hardware.media.bufferpool2/current/android/hardware/media/bufferpool2/IClientManager.aidl
index 54896d4..5899a40 100644
--- a/media/bufferpool/aidl/aidl_api/android.hardware.media.bufferpool2/current/android/hardware/media/bufferpool2/IClientManager.aidl
+++ b/media/bufferpool/aidl/aidl_api/android.hardware.media.bufferpool2/current/android/hardware/media/bufferpool2/IClientManager.aidl
@@ -34,5 +34,10 @@
package android.hardware.media.bufferpool2;
@VintfStability
interface IClientManager {
- long registerSender(in android.hardware.media.bufferpool2.IAccessor bufferPool);
+ android.hardware.media.bufferpool2.IClientManager.Registration registerSender(in android.hardware.media.bufferpool2.IAccessor bufferPool);
+ @VintfStability
+ parcelable Registration {
+ long connectionId;
+ boolean isNew = true;
+ }
}
diff --git a/media/bufferpool/aidl/aidl_api/android.hardware.media.bufferpool2/current/android/hardware/media/bufferpool2/IConnection.aidl b/media/bufferpool/aidl/aidl_api/android.hardware.media.bufferpool2/current/android/hardware/media/bufferpool2/IConnection.aidl
index 300fcba..844e920 100644
--- a/media/bufferpool/aidl/aidl_api/android.hardware.media.bufferpool2/current/android/hardware/media/bufferpool2/IConnection.aidl
+++ b/media/bufferpool/aidl/aidl_api/android.hardware.media.bufferpool2/current/android/hardware/media/bufferpool2/IConnection.aidl
@@ -35,12 +35,13 @@
@VintfStability
interface IConnection {
android.hardware.media.bufferpool2.IConnection.FetchResult[] fetch(in android.hardware.media.bufferpool2.IConnection.FetchInfo[] fetchInfos);
+ void sync();
parcelable FetchInfo {
long transactionId;
int bufferId;
}
union FetchResult {
android.hardware.media.bufferpool2.Buffer buffer;
- android.hardware.media.bufferpool2.ResultStatus failure;
+ int failure;
}
}
diff --git a/media/bufferpool/aidl/aidl_api/android.hardware.media.bufferpool2/current/android/hardware/media/bufferpool2/ResultStatus.aidl b/media/bufferpool/aidl/aidl_api/android.hardware.media.bufferpool2/current/android/hardware/media/bufferpool2/ResultStatus.aidl
index 7370998..4bc3889 100644
--- a/media/bufferpool/aidl/aidl_api/android.hardware.media.bufferpool2/current/android/hardware/media/bufferpool2/ResultStatus.aidl
+++ b/media/bufferpool/aidl/aidl_api/android.hardware.media.bufferpool2/current/android/hardware/media/bufferpool2/ResultStatus.aidl
@@ -34,7 +34,6 @@
package android.hardware.media.bufferpool2;
@VintfStability
parcelable ResultStatus {
- int resultStatus;
const int OK = 0;
const int NO_MEMORY = 1;
const int ALREADY_EXISTS = 2;
diff --git a/media/bufferpool/aidl/android/hardware/media/bufferpool2/IClientManager.aidl b/media/bufferpool/aidl/android/hardware/media/bufferpool2/IClientManager.aidl
index bf36e25..a3054cb 100644
--- a/media/bufferpool/aidl/android/hardware/media/bufferpool2/IClientManager.aidl
+++ b/media/bufferpool/aidl/android/hardware/media/bufferpool2/IClientManager.aidl
@@ -28,6 +28,16 @@
@VintfStability
interface IClientManager {
/**
+ * Result of registerSender.
+ */
+ @VintfStability
+ parcelable Registration {
+ /** registered connection id */
+ long connectionId;
+ /** true when the connection is new */
+ boolean isNew = true;
+ }
+ /**
* Sets up a buffer receiving communication node for the specified
* buffer pool. A manager must create a IConnection to the buffer
* pool if it does not already have a connection.
@@ -39,8 +49,7 @@
* sent to that connection during transfers.
* @throws ServiceSpecificException with one of the following values:
* ResultStatus::NO_MEMORY - Memory allocation failure occurred.
- * ResultStatus::ALREADY_EXISTS - A sender was registered already.
* ResultStatus::CRITICAL_ERROR - Other errors.
*/
- long registerSender(in IAccessor bufferPool);
+ Registration registerSender(in IAccessor bufferPool);
}
diff --git a/media/bufferpool/aidl/android/hardware/media/bufferpool2/IConnection.aidl b/media/bufferpool/aidl/android/hardware/media/bufferpool2/IConnection.aidl
index d869f47..68367c7 100644
--- a/media/bufferpool/aidl/android/hardware/media/bufferpool2/IConnection.aidl
+++ b/media/bufferpool/aidl/android/hardware/media/bufferpool2/IConnection.aidl
@@ -49,7 +49,7 @@
* ResultStatus::NOT_FOUND - A buffer was not found due to invalidation.
* ResultStatus::CRITICAL_ERROR - Other errors.
*/
- ResultStatus failure;
+ int failure;
}
/**
@@ -70,4 +70,12 @@
* ResultStatus::CRITICAL_ERROR - Other errors.
*/
FetchResult[] fetch(in FetchInfo[] fetchInfos);
+
+ /**
+ * Enforce processing of unprocessed bufferpool messages.
+ *
+ * The BufferPool implementation optimizes message processing with a piggy-backing approach.
+ * This method ensures that pending bufferpool messages are processed in a timely manner.
+ */
+ void sync();
}
diff --git a/media/bufferpool/aidl/android/hardware/media/bufferpool2/ResultStatus.aidl b/media/bufferpool/aidl/android/hardware/media/bufferpool2/ResultStatus.aidl
index 162f9a7..003d147 100644
--- a/media/bufferpool/aidl/android/hardware/media/bufferpool2/ResultStatus.aidl
+++ b/media/bufferpool/aidl/android/hardware/media/bufferpool2/ResultStatus.aidl
@@ -23,6 +23,4 @@
const int ALREADY_EXISTS = 2;
const int NOT_FOUND = 3;
const int CRITICAL_ERROR = 4;
-
- int resultStatus;
}
diff --git a/media/bufferpool/aidl/default/Accessor.cpp b/media/bufferpool/aidl/default/Accessor.cpp
new file mode 100644
index 0000000..3d206ac
--- /dev/null
+++ b/media/bufferpool/aidl/default/Accessor.cpp
@@ -0,0 +1,509 @@
+/*
+ * Copyright (C) 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#define LOG_TAG "AidlBufferPoolAcc"
+//#define LOG_NDEBUG 0
+
+#include <sys/types.h>
+#include <stdint.h>
+#include <time.h>
+#include <unistd.h>
+#include <utils/Log.h>
+#include <thread>
+
+#include "Accessor.h"
+#include "Connection.h"
+#include "DataHelper.h"
+
+namespace aidl::android::hardware::media::bufferpool2::implementation {
+
+namespace {
+ static constexpr nsecs_t kEvictGranularityNs = 1000000000; // 1 sec
+ static constexpr nsecs_t kEvictDurationNs = 5000000000; // 5 secs
+}
+
+#ifdef __ANDROID_VNDK__
+static constexpr uint32_t kSeqIdVndkBit = 1U << 31;
+#else
+static constexpr uint32_t kSeqIdVndkBit = 0;
+#endif
+
+static constexpr uint32_t kSeqIdMax = 0x7fffffff;
+uint32_t Accessor::sSeqId = time(nullptr) & kSeqIdMax;
+
+namespace {
+// anonymous namespace
+static std::shared_ptr<ConnectionDeathRecipient> sConnectionDeathRecipient =
+ std::make_shared<ConnectionDeathRecipient>();
+
+void serviceDied(void *cookie) {
+ if (sConnectionDeathRecipient) {
+ sConnectionDeathRecipient->onDead(cookie);
+ }
+}
+}
+
+std::shared_ptr<ConnectionDeathRecipient> Accessor::getConnectionDeathRecipient() {
+ return sConnectionDeathRecipient;
+}
+
+ConnectionDeathRecipient::ConnectionDeathRecipient() {
+ mDeathRecipient = ndk::ScopedAIBinder_DeathRecipient(
+ AIBinder_DeathRecipient_new(serviceDied));
+}
+
+void ConnectionDeathRecipient::add(
+ int64_t connectionId,
+ const std::shared_ptr<Accessor> &accessor) {
+ std::lock_guard<std::mutex> lock(mLock);
+ if (mAccessors.find(connectionId) == mAccessors.end()) {
+ mAccessors.insert(std::make_pair(connectionId, accessor));
+ }
+}
+
+void ConnectionDeathRecipient::remove(int64_t connectionId) {
+ std::lock_guard<std::mutex> lock(mLock);
+ mAccessors.erase(connectionId);
+ auto it = mConnectionToCookie.find(connectionId);
+ if (it != mConnectionToCookie.end()) {
+ void * cookie = it->second;
+ mConnectionToCookie.erase(it);
+ auto cit = mCookieToConnections.find(cookie);
+ if (cit != mCookieToConnections.end()) {
+ cit->second.erase(connectionId);
+ if (cit->second.size() == 0) {
+ mCookieToConnections.erase(cit);
+ }
+ }
+ }
+}
+
+void ConnectionDeathRecipient::addCookieToConnection(
+ void *cookie,
+ int64_t connectionId) {
+ std::lock_guard<std::mutex> lock(mLock);
+ if (mAccessors.find(connectionId) == mAccessors.end()) {
+ return;
+ }
+ mConnectionToCookie.insert(std::make_pair(connectionId, cookie));
+ auto it = mCookieToConnections.find(cookie);
+ if (it != mCookieToConnections.end()) {
+ it->second.insert(connectionId);
+ } else {
+ mCookieToConnections.insert(std::make_pair(
+ cookie, std::set<int64_t>{connectionId}));
+ }
+}
+
+void ConnectionDeathRecipient::onDead(void *cookie) {
+ std::map<int64_t, const std::weak_ptr<Accessor>> connectionsToClose;
+ {
+ std::lock_guard<std::mutex> lock(mLock);
+
+ auto it = mCookieToConnections.find(cookie);
+ if (it != mCookieToConnections.end()) {
+ for (auto conIt = it->second.begin(); conIt != it->second.end(); ++conIt) {
+ auto accessorIt = mAccessors.find(*conIt);
+ if (accessorIt != mAccessors.end()) {
+ connectionsToClose.insert(std::make_pair(*conIt, accessorIt->second));
+ mAccessors.erase(accessorIt);
+ }
+ mConnectionToCookie.erase(*conIt);
+ }
+ mCookieToConnections.erase(it);
+ }
+ }
+
+ if (connectionsToClose.size() > 0) {
+ std::shared_ptr<Accessor> accessor;
+ for (auto it = connectionsToClose.begin(); it != connectionsToClose.end(); ++it) {
+ accessor = it->second.lock();
+
+ if (accessor) {
+ accessor->close(it->first);
+ ALOGD("connection %lld closed on death", (long long)it->first);
+ }
+ }
+ }
+}
+
+AIBinder_DeathRecipient *ConnectionDeathRecipient::getRecipient() {
+ return mDeathRecipient.get();
+}
+
+::ndk::ScopedAStatus Accessor::connect(const std::shared_ptr<::aidl::android::hardware::media::bufferpool2::IObserver>& in_observer, ::aidl::android::hardware::media::bufferpool2::IAccessor::ConnectionInfo* _aidl_return) {
+ std::shared_ptr<Connection> connection;
+ ConnectionId connectionId;
+ uint32_t msgId;
+ StatusDescriptor statusDesc;
+ InvalidationDescriptor invDesc;
+ BufferPoolStatus status = connect(
+ in_observer, false, &connection, &connectionId, &msgId, &statusDesc, &invDesc);
+ if (status == ResultStatus::OK) {
+ _aidl_return->connection = connection;
+ _aidl_return->connectionId = connectionId;
+ _aidl_return->msgId = msgId;
+ _aidl_return->toFmqDesc = std::move(statusDesc);
+ _aidl_return->fromFmqDesc = std::move(invDesc);
+ return ::ndk::ScopedAStatus::ok();
+ }
+ return ::ndk::ScopedAStatus::fromServiceSpecificError(status);
+}
+
+Accessor::Accessor(const std::shared_ptr<BufferPoolAllocator> &allocator)
+ : mAllocator(allocator), mScheduleEvictTs(0) {}
+
+Accessor::~Accessor() {
+}
+
+bool Accessor::isValid() {
+ return mBufferPool.isValid();
+}
+
+BufferPoolStatus Accessor::flush() {
+ std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
+ mBufferPool.processStatusMessages();
+ mBufferPool.flush(ref<Accessor>());
+ return ResultStatus::OK;
+}
+
+BufferPoolStatus Accessor::allocate(
+        ConnectionId connectionId,
+        const std::vector<uint8_t> &params,
+        BufferId *bufferId, const native_handle_t** handle) {
+    std::unique_lock<std::mutex> lock(mBufferPool.mMutex);
+    mBufferPool.processStatusMessages();
+    BufferPoolStatus status = ResultStatus::OK;
+    if (!mBufferPool.getFreeBuffer(mAllocator, params, bufferId, handle)) {  // no reusable buffer
+        lock.unlock();  // the allocator call below runs without the pool mutex held
+        std::shared_ptr<BufferPoolAllocation> alloc;
+        size_t allocSize;
+        status = mAllocator->allocate(params, &alloc, &allocSize);
+        lock.lock();  // re-acquire before touching mBufferPool again
+        if (status == ResultStatus::OK) {
+            status = mBufferPool.addNewBuffer(alloc, allocSize, params, bufferId, handle);
+        }
+        ALOGV("create a buffer %d : %u %p",
+              status == ResultStatus::OK, *bufferId, *handle);
+    }
+    if (status == ResultStatus::OK) {
+        // TODO: handle ownBuffer failure
+        mBufferPool.handleOwnBuffer(connectionId, *bufferId);
+    }
+    mBufferPool.cleanUp();  // runs on both the success and the failure path
+    scheduleEvictIfNeeded();
+    return status;
+}
+
+BufferPoolStatus Accessor::fetch(
+ ConnectionId connectionId, TransactionId transactionId,
+ BufferId bufferId, const native_handle_t** handle) {
+ std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
+ mBufferPool.processStatusMessages();
+ auto found = mBufferPool.mTransactions.find(transactionId);
+ if (found != mBufferPool.mTransactions.end() &&
+ contains(&mBufferPool.mPendingTransactions,
+ connectionId, transactionId)) {
+ if (found->second->mSenderValidated &&
+ found->second->mStatus == BufferStatus::TRANSFER_FROM &&
+ found->second->mBufferId == bufferId) {
+ found->second->mStatus = BufferStatus::TRANSFER_FETCH;
+ auto bufferIt = mBufferPool.mBuffers.find(bufferId);
+ if (bufferIt != mBufferPool.mBuffers.end()) {
+ mBufferPool.mStats.onBufferFetched();
+ *handle = bufferIt->second->handle();
+ return ResultStatus::OK;
+ }
+ }
+ }
+ mBufferPool.cleanUp();
+ scheduleEvictIfNeeded();
+ return ResultStatus::CRITICAL_ERROR;
+}
+
+BufferPoolStatus Accessor::connect(
+ const std::shared_ptr<IObserver> &observer, bool local,
+ std::shared_ptr<Connection> *connection, ConnectionId *pConnectionId,
+ uint32_t *pMsgId,
+ StatusDescriptor* statusDescPtr,
+ InvalidationDescriptor* invDescPtr) {
+ std::shared_ptr<Connection> newConnection = ::ndk::SharedRefBase::make<Connection>();
+ BufferPoolStatus status = ResultStatus::CRITICAL_ERROR;
+ {
+ std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
+ if (newConnection) {
+ int32_t pid = getpid();
+ ConnectionId id = (int64_t)pid << 32 | sSeqId | kSeqIdVndkBit;
+ status = mBufferPool.mObserver.open(id, statusDescPtr);
+ if (status == ResultStatus::OK) {
+ newConnection->initialize(ref<Accessor>(), id);
+ *connection = newConnection;
+ *pConnectionId = id;
+ *pMsgId = mBufferPool.mInvalidation.mInvalidationId;
+ mBufferPool.mConnectionIds.insert(id);
+ mBufferPool.mInvalidationChannel.getDesc(invDescPtr);
+ mBufferPool.mInvalidation.onConnect(id, observer);
+ if (sSeqId == kSeqIdMax) {
+ sSeqId = 0;
+ } else {
+ ++sSeqId;
+ }
+ }
+
+ }
+ mBufferPool.processStatusMessages();
+ mBufferPool.cleanUp();
+ scheduleEvictIfNeeded();
+ }
+ if (!local && status == ResultStatus::OK) {
+ std::shared_ptr<Accessor> accessor(ref<Accessor>());
+ sConnectionDeathRecipient->add(*pConnectionId, accessor);
+ }
+ return status;
+}
+
+BufferPoolStatus Accessor::close(ConnectionId connectionId) {
+ {
+ std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
+ ALOGV("connection close %lld: %u", (long long)connectionId, mBufferPool.mInvalidation.mId);
+ mBufferPool.processStatusMessages();
+ mBufferPool.handleClose(connectionId);
+ mBufferPool.mObserver.close(connectionId);
+ mBufferPool.mInvalidation.onClose(connectionId);
+ // Since close# will be called after all works are finished, it is OK to
+ // evict unused buffers.
+ mBufferPool.cleanUp(true);
+ scheduleEvictIfNeeded();
+ }
+ sConnectionDeathRecipient->remove(connectionId);
+ return ResultStatus::OK;
+}
+
+void Accessor::cleanUp(bool clearCache) {
+ // transaction timeout, buffer caching TTL handling
+ std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
+ mBufferPool.processStatusMessages();
+ mBufferPool.cleanUp(clearCache);
+}
+
+void Accessor::handleInvalidateAck() {
+ std::map<ConnectionId, const std::shared_ptr<IObserver>> observers;
+ uint32_t invalidationId;
+ {
+ std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
+ mBufferPool.processStatusMessages();
+ mBufferPool.mInvalidation.onHandleAck(&observers, &invalidationId);
+ }
+ // Do not hold lock for send invalidations
+ size_t deadClients = 0;
+ for (auto it = observers.begin(); it != observers.end(); ++it) {
+ const std::shared_ptr<IObserver> observer = it->second;
+ if (observer) {
+ ::ndk::ScopedAStatus status = observer->onMessage(it->first, invalidationId);
+ if (!status.isOk()) {
+ ++deadClients;
+ }
+ }
+ }
+ if (deadClients > 0) {
+ ALOGD("During invalidation found %zu dead clients", deadClients);
+ }
+}
+
+void Accessor::invalidatorThread(
+        std::map<uint32_t, const std::weak_ptr<Accessor>> &accessors,
+        std::mutex &mutex,
+        std::condition_variable &cv,
+        bool &ready) {
+    constexpr uint32_t NUM_SPIN_TO_INCREASE_SLEEP = 1024;
+    constexpr uint32_t NUM_SPIN_TO_LOG = 1024*8;
+    constexpr useconds_t MAX_SLEEP_US = 10000;
+    uint32_t numSpin = 0;
+    useconds_t sleepUs = 1;
+    // Loop forever: wait until `ready` is set, then poll every registered accessor.
+    while(true) {
+        std::map<uint32_t, const std::weak_ptr<Accessor>> copied;
+        {
+            std::unique_lock<std::mutex> lock(mutex);
+            while (!ready) {
+                numSpin = 0;  // reset the sleep backoff while idle
+                sleepUs = 1;
+                cv.wait(lock);
+            }
+            copied.insert(accessors.begin(), accessors.end());  // snapshot taken under the mutex
+        }
+        std::list<uint32_t> erased;  // keys (same type as the map key) whose weak_ptr has expired
+        for (auto it = copied.begin(); it != copied.end(); ++it) {
+            const std::shared_ptr<Accessor> acc = it->second.lock();
+            if (!acc) {
+                erased.push_back(it->first);
+            } else {
+                acc->handleInvalidateAck();  // called without holding `mutex`
+            }
+        }
+        {
+            std::unique_lock<std::mutex> lock(mutex);
+            for (auto it = erased.begin(); it != erased.end(); ++it) {
+                accessors.erase(*it);
+            }
+            if (accessors.size() == 0) {
+                ready = false;  // nothing left to poll; block on the cv next iteration
+            } else {
+                // N.B. Since there is no efficient way to wait on an FMQ,
+                // poll it, sleeping between polls with an interval that grows
+                // up to MAX_SLEEP_US, to avoid burning CPU.
+                lock.unlock();
+                ++numSpin;
+                if (numSpin % NUM_SPIN_TO_INCREASE_SLEEP == 0 &&
+                    sleepUs < MAX_SLEEP_US) {
+                    sleepUs *= 10;
+                }
+                if (numSpin % NUM_SPIN_TO_LOG == 0) {
+                    ALOGW("invalidator thread spinning");
+                }
+                ::usleep(sleepUs);
+            }
+        }
+    }
+}
+
+Accessor::AccessorInvalidator::AccessorInvalidator() : mReady(false) {
+ std::thread invalidator(
+ invalidatorThread,
+ std::ref(mAccessors),
+ std::ref(mMutex),
+ std::ref(mCv),
+ std::ref(mReady));
+ invalidator.detach();
+}
+
+void Accessor::AccessorInvalidator::addAccessor(
+ uint32_t accessorId, const std::weak_ptr<Accessor> &accessor) {
+ bool notify = false;
+ std::unique_lock<std::mutex> lock(mMutex);
+ if (mAccessors.find(accessorId) == mAccessors.end()) {
+ if (!mReady) {
+ mReady = true;
+ notify = true;
+ }
+ mAccessors.emplace(accessorId, accessor);
+ ALOGV("buffer invalidation added bp:%u %d", accessorId, notify);
+ }
+ lock.unlock();
+ if (notify) {
+ mCv.notify_one();
+ }
+}
+
+void Accessor::AccessorInvalidator::delAccessor(uint32_t accessorId) {
+ std::lock_guard<std::mutex> lock(mMutex);
+ mAccessors.erase(accessorId);
+ ALOGV("buffer invalidation deleted bp:%u", accessorId);
+ if (mAccessors.size() == 0) {
+ mReady = false;
+ }
+}
+
+std::unique_ptr<Accessor::AccessorInvalidator> Accessor::sInvalidator;
+
+void Accessor::createInvalidator() {
+ if (!sInvalidator) {
+ sInvalidator = std::make_unique<Accessor::AccessorInvalidator>();
+ }
+}
+
+void Accessor::evictorThread(
+ std::map<const std::weak_ptr<Accessor>, nsecs_t, std::owner_less<>> &accessors,
+ std::mutex &mutex,
+ std::condition_variable &cv) {
+ std::list<const std::weak_ptr<Accessor>> evictList;
+ while (true) {
+ int expired = 0;
+ int evicted = 0;
+ {
+ nsecs_t now = systemTime();
+ std::unique_lock<std::mutex> lock(mutex);
+ while (accessors.size() == 0) {
+ cv.wait(lock);
+ }
+ auto it = accessors.begin();
+ while (it != accessors.end()) {
+ if (now > (it->second + kEvictDurationNs)) {
+ ++expired;
+ evictList.push_back(it->first);
+ it = accessors.erase(it);
+ } else {
+ ++it;
+ }
+ }
+ }
+ // evict idle accessors;
+ for (auto it = evictList.begin(); it != evictList.end(); ++it) {
+ const std::shared_ptr<Accessor> accessor = it->lock();
+ if (accessor) {
+ accessor->cleanUp(true);
+ ++evicted;
+ }
+ }
+ if (expired > 0) {
+ ALOGD("evictor expired: %d, evicted: %d", expired, evicted);
+ }
+ evictList.clear();
+ ::usleep(kEvictGranularityNs / 1000);
+ }
+}
+
+Accessor::AccessorEvictor::AccessorEvictor() {
+ std::thread evictor(
+ evictorThread,
+ std::ref(mAccessors),
+ std::ref(mMutex),
+ std::ref(mCv));
+ evictor.detach();
+}
+
+void Accessor::AccessorEvictor::addAccessor(
+ const std::weak_ptr<Accessor> &accessor, nsecs_t ts) {
+ std::lock_guard<std::mutex> lock(mMutex);
+ bool notify = mAccessors.empty();
+ auto it = mAccessors.find(accessor);
+ if (it == mAccessors.end()) {
+ mAccessors.emplace(accessor, ts);
+ } else {
+ it->second = ts;
+ }
+ if (notify) {
+ mCv.notify_one();
+ }
+}
+
+std::unique_ptr<Accessor::AccessorEvictor> Accessor::sEvictor;
+
+void Accessor::createEvictor() {
+ if (!sEvictor) {
+ sEvictor = std::make_unique<Accessor::AccessorEvictor>();
+ }
+}
+
+void Accessor::scheduleEvictIfNeeded() {
+ nsecs_t now = systemTime();
+
+ if (now > (mScheduleEvictTs + kEvictGranularityNs)) {
+ mScheduleEvictTs = now;
+ sEvictor->addAccessor(ref<Accessor>(), now);
+ }
+}
+
+} // namespace aidl::android::hardware::media::bufferpool2::implementation
diff --git a/media/bufferpool/aidl/default/Accessor.h b/media/bufferpool/aidl/default/Accessor.h
new file mode 100644
index 0000000..85e2fa7
--- /dev/null
+++ b/media/bufferpool/aidl/default/Accessor.h
@@ -0,0 +1,238 @@
+/*
+ * Copyright (C) 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <aidl/android/hardware/media/bufferpool2/BnAccessor.h>
+#include <aidl/android/hardware/media/bufferpool2/IObserver.h>
+#include <bufferpool2/BufferPoolTypes.h>
+
+#include <memory>
+#include <map>
+#include <set>
+#include <condition_variable>
+
+#include "BufferPool.h"
+
+namespace aidl::android::hardware::media::bufferpool2::implementation {
+
+struct Connection;
+using ::aidl::android::hardware::media::bufferpool2::IObserver;
+using ::aidl::android::hardware::media::bufferpool2::IAccessor;
+
+/**
+ * Receives death notifications from remote connections.
+ * On death notifications, the connections are closed and used resources
+ * are released.
+ */
+struct ConnectionDeathRecipient {
+ ConnectionDeathRecipient();
+ /**
+ * Registers a newly connected connection from remote processes.
+ */
+ void add(int64_t connectionId, const std::shared_ptr<Accessor> &accessor);
+
+ /**
+ * Removes a connection.
+ */
+ void remove(int64_t connectionId);
+
+ void addCookieToConnection(void *cookie, int64_t connectionId);
+
+ void onDead(void *cookie);
+
+ AIBinder_DeathRecipient *getRecipient();
+
+private:
+ ::ndk::ScopedAIBinder_DeathRecipient mDeathRecipient;
+
+ std::mutex mLock;
+ std::map<void *, std::set<int64_t>> mCookieToConnections;
+ std::map<int64_t, void *> mConnectionToCookie;
+ std::map<int64_t, const std::weak_ptr<Accessor>> mAccessors;
+};
+
+/**
+ * A buffer pool accessor which enables a buffer pool to communicate with buffer
+ * pool clients. 1:1 correspondence holds between a buffer pool and an accessor.
+ */
+struct Accessor : public BnAccessor {
+ // Methods from ::aidl::android::hardware::media::bufferpool2::IAccessor.
+ ::ndk::ScopedAStatus connect(const std::shared_ptr<IObserver>& in_observer,
+ IAccessor::ConnectionInfo* _aidl_return) override;
+
+ /**
+ * Creates a buffer pool accessor which uses the specified allocator.
+ *
+ * @param allocator buffer allocator.
+ */
+ explicit Accessor(const std::shared_ptr<BufferPoolAllocator> &allocator);
+
+ /** Destructs a buffer pool accessor. */
+ ~Accessor();
+
+ /** Returns whether the accessor is valid. */
+ bool isValid();
+
+ /** Invalidates all buffers which are owned by bufferpool */
+ BufferPoolStatus flush();
+
+ /** Allocates a buffer from a buffer pool.
+ *
+ * @param connectionId the connection id of the client.
+ * @param params the allocation parameters.
+ * @param bufferId the id of the allocated buffer.
+ * @param handle the native handle of the allocated buffer.
+ *
+ * @return OK when a buffer is successfully allocated.
+ * NO_MEMORY when there is no memory.
+ * CRITICAL_ERROR otherwise.
+ */
+ BufferPoolStatus allocate(
+ ConnectionId connectionId,
+ const std::vector<uint8_t>& params,
+ BufferId *bufferId,
+ const native_handle_t** handle);
+
+ /**
+ * Fetches a buffer for the specified transaction.
+ *
+ * @param connectionId the id of receiving connection(client).
+ * @param transactionId the id of the transfer transaction.
+ * @param bufferId the id of the buffer to be fetched.
+ * @param handle the native handle of the fetched buffer.
+ *
+ * @return OK when a buffer is successfully fetched.
+ * NO_MEMORY when there is no memory.
+ * CRITICAL_ERROR otherwise.
+ */
+ BufferPoolStatus fetch(
+ ConnectionId connectionId,
+ TransactionId transactionId,
+ BufferId bufferId,
+ const native_handle_t** handle);
+
+ /**
+ * Makes a connection to the buffer pool. The buffer pool client uses the
+ * created connection in order to communicate with the buffer pool. An
+ * FMQ for buffer status message is also created for the client.
+ *
+ * @param observer client observer for buffer invalidation
+ * @param local true when a connection request comes from local process,
+ * false otherwise.
+ * @param connection created connection
+ * @param pConnectionId the id of the created connection
+ * @param pMsgId the id of the recent buffer pool message
+ * @param statusDescPtr FMQ descriptor for shared buffer status message
+ * queue between a buffer pool and the client.
+ * @param invDescPtr FMQ descriptor for buffer invalidation message
+ * queue from a buffer pool to the client.
+ *
+ * @return OK when a connection is successfully made.
+ * NO_MEMORY when there is no memory.
+ * CRITICAL_ERROR otherwise.
+ */
+ BufferPoolStatus connect(
+ const std::shared_ptr<IObserver>& observer,
+ bool local,
+ std::shared_ptr<Connection> *connection, ConnectionId *pConnectionId,
+ uint32_t *pMsgId,
+ StatusDescriptor* statusDescPtr,
+ InvalidationDescriptor* invDescPtr);
+
+ /**
+ * Closes the specified connection to the client.
+ *
+ * @param connectionId the id of the connection.
+ *
+ * @return OK when the connection is closed.
+ * CRITICAL_ERROR otherwise.
+ */
+ BufferPoolStatus close(ConnectionId connectionId);
+
+ /**
+ * Processes pending buffer status messages and performs periodic cache
+ * cleaning.
+ *
+ * @param clearCache if clearCache is true, it frees all buffers waiting
+ * to be recycled.
+ */
+ void cleanUp(bool clearCache);
+
+ /**
+ * ACK on buffer invalidation messages
+ */
+ void handleInvalidateAck();
+
+ /**
+ * Gets a death_recipient for remote connection death.
+ */
+ static std::shared_ptr<ConnectionDeathRecipient> getConnectionDeathRecipient();
+
+ static void createInvalidator();
+
+ static void createEvictor();
+
+private:
+ // ConnectionId = pid : (timestamp_created + seqId)
+ // in order to guarantee uniqueness for each connection
+ static uint32_t sSeqId;
+
+ const std::shared_ptr<BufferPoolAllocator> mAllocator;
+ nsecs_t mScheduleEvictTs;
+ BufferPool mBufferPool;
+
+ struct AccessorInvalidator {
+ std::map<uint32_t, const std::weak_ptr<Accessor>> mAccessors;
+ std::mutex mMutex;
+ std::condition_variable mCv;
+ bool mReady;
+
+ AccessorInvalidator();
+ void addAccessor(uint32_t accessorId, const std::weak_ptr<Accessor> &accessor);
+ void delAccessor(uint32_t accessorId);
+ };
+
+ static std::unique_ptr<AccessorInvalidator> sInvalidator;
+
+ static void invalidatorThread(
+ std::map<uint32_t, const std::weak_ptr<Accessor>> &accessors,
+ std::mutex &mutex,
+ std::condition_variable &cv,
+ bool &ready);
+
+ struct AccessorEvictor {
+ std::map<const std::weak_ptr<Accessor>, nsecs_t, std::owner_less<>> mAccessors;
+ std::mutex mMutex;
+ std::condition_variable mCv;
+
+ AccessorEvictor();
+ void addAccessor(const std::weak_ptr<Accessor> &accessor, nsecs_t ts);
+ };
+
+ static std::unique_ptr<AccessorEvictor> sEvictor;
+
+ static void evictorThread(
+ std::map<const std::weak_ptr<Accessor>, nsecs_t, std::owner_less<>> &accessors,
+ std::mutex &mutex,
+ std::condition_variable &cv);
+
+ void scheduleEvictIfNeeded();
+
+ friend struct BufferPool;
+};
+
+} // namespace aidl::android::hardware::media::bufferpool2::implementation
diff --git a/media/bufferpool/aidl/default/Android.bp b/media/bufferpool/aidl/default/Android.bp
new file mode 100644
index 0000000..11a6163
--- /dev/null
+++ b/media/bufferpool/aidl/default/Android.bp
@@ -0,0 +1,50 @@
+package {
+ // See: http://go/android-license-faq
+ // A large-scale-change added 'default_applicable_licenses' to import
+    // all of the 'license_kinds' from "hardware_interfaces_license"
+ // to get the below license kinds:
+ // SPDX-license-identifier-Apache-2.0
+ default_applicable_licenses: ["hardware_interfaces_license"],
+}
+
+cc_library {
+ name: "libstagefright_aidl_bufferpool2",
+ vendor_available: true,
+ min_sdk_version: "29",
+ apex_available: [
+ "//apex_available:platform",
+ "com.android.media.swcodec",
+ "test_com.android.media.swcodec",
+ ],
+ srcs: [
+ "Accessor.cpp",
+ "BufferPool.cpp",
+ "BufferPoolClient.cpp",
+ "BufferStatus.cpp",
+ "ClientManager.cpp",
+ "Connection.cpp",
+ "Observer.cpp",
+ ],
+ export_include_dirs: [
+ "include",
+ ],
+ shared_libs: [
+ "libbinder_ndk",
+ "libcutils",
+ "libfmq",
+ "liblog",
+ "libutils",
+ "android.hardware.media.bufferpool2-V1-ndk",
+ ],
+ static_libs: [
+ "libaidlcommonsupport",
+ ],
+ export_shared_lib_headers: [
+ "libfmq",
+ "android.hardware.media.bufferpool2-V1-ndk",
+ ],
+ double_loadable: true,
+ cflags: [
+ "-DBUFFERPOOL_CLONE_HANDLES",
+ ],
+}
diff --git a/media/bufferpool/aidl/default/BufferPool.cpp b/media/bufferpool/aidl/default/BufferPool.cpp
new file mode 100644
index 0000000..ed4574f
--- /dev/null
+++ b/media/bufferpool/aidl/default/BufferPool.cpp
@@ -0,0 +1,540 @@
+/*
+ * Copyright (C) 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define LOG_TAG "AidlBufferPool"
+//#define LOG_NDEBUG 0
+
+#include <sys/types.h>
+#include <stdint.h>
+#include <time.h>
+#include <unistd.h>
+#include <utils/Log.h>
+#include <thread>
+#include "Accessor.h"
+#include "BufferPool.h"
+#include "Connection.h"
+#include "DataHelper.h"
+
+namespace aidl::android::hardware::media::bufferpool2::implementation {
+
+namespace {
+    // Tunables of BufferPool::cleanUp(). Everything in an unnamed namespace
+    // already has internal linkage.
+
+    // Minimum time between two periodic clean-up passes.
+    constexpr int64_t kCleanUpDurationMs = 500;   // 0.5 sec
+    // Minimum time between two periodic statistics log lines.
+    constexpr int64_t kLogDurationMs = 5000;      // 5 secs
+
+    // Below either of these thresholds the cache is considered small enough
+    // that free buffers are kept (provided the unused count is at the target).
+    constexpr size_t kMinAllocBytesForEviction = 15 * 1024 * 1024;
+    constexpr size_t kMinBufferCountForEviction = 25;
+    // More unused buffers than this forces a clean-up pass.
+    constexpr size_t kMaxUnusedBufferCount = 64;
+    // Unused-buffer count the clean-up pass tries to get down to.
+    constexpr size_t kUnusedBufferCountTarget = kMaxUnusedBufferCount - 16;
+}
+
+BufferPool::BufferPool()
+ : mTimestampMs(::android::elapsedRealtime()),
+ mLastCleanUpMs(mTimestampMs),
+ mLastLogMs(mTimestampMs),
+ mSeq(0),
+ mStartSeq(0) {
+ mValid = mInvalidationChannel.isValid();
+}
+
+
+// Statistics helper: returns base/total as a percentage rounded to the
+// nearest integer, or 0 when total is zero.
+template<typename T, typename S>
+int percentage(T base, S total) {
+    if (!total) {
+        return 0;
+    }
+    const double rounded = 0.5 + 100. * static_cast<S>(base) / total;
+    return int(rounded);
+}
+
+// Counter shared by all Invalidation instances; each instance takes its mId
+// from it with fetch_add(1) in the Invalidation constructor.
+std::atomic<std::uint32_t> BufferPool::Invalidation::sInvSeqId(0);
+
+// Logs the final cache/allocation/transfer statistics while holding mMutex.
+BufferPool::~BufferPool() {
+    std::lock_guard<std::mutex> lock(mMutex);
+    const int inUsePercent = percentage(mStats.mBuffersInUse, mStats.mBuffersCached);
+    const int recycledPercent =
+            percentage(mStats.mTotalRecycles, mStats.mTotalAllocations);
+    const int unfetchedPercent = percentage(
+            mStats.mTotalTransfers - mStats.mTotalFetches, mStats.mTotalTransfers);
+    ALOGD("Destruction - bufferpool2 %p "
+          "cached: %zu/%zuM, %zu/%d%% in use; "
+          "allocs: %zu, %d%% recycled; "
+          "transfers: %zu, %d%% unfetched",
+          this, mStats.mBuffersCached, mStats.mSizeCached >> 20,
+          mStats.mBuffersInUse, inUsePercent,
+          mStats.mTotalAllocations, recycledPercent,
+          mStats.mTotalTransfers, unfetchedPercent);
+}
+
+// Registers a connection: its acknowledged id starts at the current
+// invalidation id, and its observer is recorded unless one already exists.
+void BufferPool::Invalidation::onConnect(
+        ConnectionId conId, const std::shared_ptr<IObserver>& observer) {
+    // A new connection has nothing older than the current id to acknowledge.
+    mAcks[conId] = mInvalidationId;
+    mObservers.emplace(conId, observer);
+}
+
+// Forgets both the observer and the acknowledgement state of a connection.
+void BufferPool::Invalidation::onClose(ConnectionId conId) {
+    mObservers.erase(conId);
+    mAcks.erase(conId);
+}
+
+// Records an invalidation acknowledgement from a connection. Only an id that
+// isMessageLater() ranks after the stored one replaces it.
+void BufferPool::Invalidation::onAck(
+        ConnectionId conId,
+        uint32_t msgId) {
+    const auto ack = mAcks.find(conId);
+    if (ack == mAcks.end()) {
+        ALOGW("ACK from inconsistent connection! %lld", (long long)conId);
+        return;
+    }
+    if (!isMessageLater(msgId, ack->second)) {
+        return;
+    }
+    ack->second = msgId;
+}
+
+// Notifies every pending invalidation that a buffer is gone. A pending
+// request for which isInvalidated() reports completion is posted to the
+// channel and removed from the list.
+void BufferPool::Invalidation::onBufferInvalidated(
+        BufferId bufferId,
+        BufferInvalidationChannel &channel) {
+    auto pending = mPendings.begin();
+    while (pending != mPendings.end()) {
+        // isInvalidated() also decrements the pending request's remaining
+        // count, so it must be evaluated exactly once per request.
+        if (!pending->isInvalidated(bufferId)) {
+            ++pending;
+            continue;
+        }
+        uint32_t msgId = 0;
+        if (pending->mNeedsAck) {
+            msgId = ++mInvalidationId;
+            if (msgId == 0) {
+                // wrap happens
+                msgId = ++mInvalidationId;
+            }
+        }
+        channel.postInvalidation(msgId, pending->mFrom, pending->mTo);
+        pending = mPendings.erase(pending);
+    }
+}
+
+// Handles a request to invalidate the buffer id range [from, to).
+//
+// @param needsAck  whether a non-zero invalidation id is allocated so that
+//                  connections can acknowledge the message.
+// @param from,to   buffer id range of the request.
+// @param left      number of buffers in the range that are still alive; when
+//                  zero the invalidation is posted right away, otherwise it
+//                  is queued until onBufferInvalidated() has counted it down.
+// @param channel   channel the invalidation message is posted to.
+// @param impl      accessor registered with the invalidator for this pool.
+void BufferPool::Invalidation::onInvalidationRequest(
+        bool needsAck,
+        uint32_t from,
+        uint32_t to,
+        size_t left,
+        BufferInvalidationChannel &channel,
+        const std::shared_ptr<Accessor> &impl) {
+    uint32_t msgId = 0;
+    if (needsAck) {
+        msgId = ++mInvalidationId;
+        if (msgId == 0) {
+            // wrap happens
+            msgId = ++mInvalidationId;
+        }
+    }
+    ALOGV("bufferpool2 invalidation requested and queued");
+    if (left == 0) {
+        channel.postInvalidation(msgId, from, to);
+    } else {
+        ALOGV("bufferpool2 invalidation requested and pending");
+        // Construct in place; Pending has a const member and cannot be moved.
+        mPendings.emplace_back(needsAck, from, to, left, impl);
+    }
+    Accessor::sInvalidator->addAccessor(mId, impl);
+}
+
+// Collects the observers of all connections whose acknowledged invalidation
+// id differs from the current one, and drops connections with no observer.
+//
+// @param observers       out: connection id -> observer to be notified.
+// @param invalidationId  out: current invalidation id; written only when at
+//                        least one invalidation id has been issued.
+void BufferPool::Invalidation::onHandleAck(
+        std::map<ConnectionId, const std::shared_ptr<IObserver>> *observers,
+        uint32_t *invalidationId) {
+    if (mInvalidationId != 0) {
+        *invalidationId = mInvalidationId;
+        // Use the maps' own key type so a connection id is never narrowed
+        // before it is handed to onClose().
+        std::set<ConnectionId> deads;
+        for (auto &[conId, ackedId] : mAcks) {
+            if (ackedId == mInvalidationId) {
+                continue;
+            }
+            // find() instead of operator[]: a missing observer must not
+            // create a null entry in mObservers.
+            const auto observerIt = mObservers.find(conId);
+            if (observerIt != mObservers.end() && observerIt->second) {
+                observers->emplace(conId, observerIt->second);
+                ALOGV("connection %lld will call observer (%u: %u)",
+                      (long long)conId, ackedId, mInvalidationId);
+                // N.B: onMessage will be called later. ignore possibility of
+                // onMessage# oneway call being lost.
+                ackedId = mInvalidationId;
+            } else {
+                ALOGV("bufferpool2 observer died %lld", (long long)conId);
+                deads.insert(conId);
+            }
+        }
+        // Closed after the loop because onClose() erases from mAcks.
+        for (const ConnectionId conId : deads) {
+            onClose(conId);
+        }
+    }
+    if (mPendings.size() == 0) {
+        // All invalidation Ids are synced and no more pending invalidations.
+        Accessor::sInvalidator->delAccessor(mId);
+    }
+}
+
+// Records that a connection owns a buffer.
+//
+// @param connectionId  the owning connection.
+// @param bufferId      the buffer being owned.
+// @return true when the ownership was newly recorded; false when it already
+//         existed or the buffer id is not known to the pool.
+bool BufferPool::handleOwnBuffer(
+        ConnectionId connectionId, BufferId bufferId) {
+    // Validate first: dereferencing mBuffers.end() is undefined behavior, and
+    // ownership records must not be created for buffers that do not exist.
+    const auto bufferIter = mBuffers.find(bufferId);
+    if (bufferIter == mBuffers.end()) {
+        ALOGW("bufferpool2 %p cannot own unknown buffer %u", this, bufferId);
+        return false;
+    }
+    const bool added = insert(&mUsingBuffers, connectionId, bufferId);
+    if (added) {
+        bufferIter->second->mOwnerCount++;
+    }
+    insert(&mUsingConnections, bufferId, connectionId);
+    return added;
+}
+
+// Drops a connection's ownership of a buffer. Once a buffer has neither
+// owners nor in-flight transactions it is either returned to the free list
+// or, if it was invalidated, destroyed and reported to the invalidation
+// bookkeeping.
+//
+// @return true when the ownership record existed and was removed.
+bool BufferPool::handleReleaseBuffer(
+        ConnectionId connectionId, BufferId bufferId) {
+    const bool deleted = erase(&mUsingBuffers, connectionId, bufferId);
+    if (deleted) {
+        auto iter = mBuffers.find(bufferId);
+        const auto &buffer = iter->second;
+        buffer->mOwnerCount--;
+        const bool unused =
+                buffer->mOwnerCount == 0 && buffer->mTransactionCount == 0;
+        if (unused) {
+            // Both outcomes first take the buffer out of the in-use stats.
+            mStats.onBufferUnused(buffer->mAllocSize);
+            if (buffer->mInvalidated) {
+                mStats.onBufferEvicted(buffer->mAllocSize);
+                // 'buffer' dangles after this erase; it is not used again.
+                mBuffers.erase(iter);
+                mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
+            } else {
+                mFreeBuffers.insert(bufferId);
+            }
+        }
+    }
+    erase(&mUsingConnections, bufferId, connectionId);
+    ALOGV("release buffer %u : %d", bufferId, deleted);
+    return deleted;
+}
+
+// Handles the sender-side TRANSFER_TO message of a transfer transaction.
+//
+// Returns true when the message is accepted: the transaction had already
+// completed, had already been opened by the receiver, or is newly recorded
+// here. Returns false when the buffer is unknown or not owned by the sender,
+// or when the target connection is not registered.
+bool BufferPool::handleTransferTo(const BufferStatusMessage &message) {
+ auto completed = mCompletedTransactions.find(
+ message.transactionId);
+ if (completed != mCompletedTransactions.end()) {
+ // already completed
+ mCompletedTransactions.erase(completed);
+ return true;
+ }
+ // the buffer should exist and be owned.
+ auto bufferIter = mBuffers.find(message.bufferId);
+ if (bufferIter == mBuffers.end() ||
+ !contains(&mUsingBuffers, message.connectionId, FromAidl(message.bufferId))) {
+ return false;
+ }
+ auto found = mTransactions.find(message.transactionId);
+ if (found != mTransactions.end()) {
+ // transfer_from was received earlier.
+ // Only the sender is filled in; the transaction was already counted.
+ found->second->mSender = message.connectionId;
+ found->second->mSenderValidated = true;
+ return true;
+ }
+ if (mConnectionIds.find(message.targetConnectionId) == mConnectionIds.end()) {
+ // N.B: it could be fake or receive connection already closed.
+ ALOGD("bufferpool2 %p receiver connection %lld is no longer valid",
+ this, (long long)message.targetConnectionId);
+ return false;
+ }
+ // New transaction: record it as pending on the receiving connection and
+ // count it on the buffer.
+ mStats.onBufferSent();
+ mTransactions.insert(std::make_pair(
+ message.transactionId,
+ std::make_unique<TransactionStatus>(message, mTimestampMs)));
+ insert(&mPendingTransactions, message.targetConnectionId,
+ FromAidl(message.transactionId));
+ bufferIter->second->mTransactionCount++;
+ return true;
+}
+
+// Handles the receiver-side TRANSFER_FROM message of a transfer transaction.
+//
+// When the transaction is not known yet (TRANSFER_TO has not arrived), it is
+// recorded as pending on the receiving connection. Otherwise the transaction
+// status is advanced if the message comes from the recorded receiver.
+//
+// @param message  the buffer status message of the transaction.
+// @return true when the message is accepted; false when it opens a new
+//         transaction for a buffer the pool does not know.
+bool BufferPool::handleTransferFrom(const BufferStatusMessage &message) {
+    auto found = mTransactions.find(message.transactionId);
+    if (found != mTransactions.end()) {
+        if (message.connectionId == found->second->mReceiver) {
+            found->second->mStatus = BufferStatus::TRANSFER_FROM;
+        }
+        return true;
+    }
+    // The buffer id comes from a client message; make sure it refers to an
+    // existing buffer before any state is recorded, since dereferencing
+    // mBuffers.end() is undefined behavior.
+    auto bufferIter = mBuffers.find(message.bufferId);
+    if (bufferIter == mBuffers.end()) {
+        return false;
+    }
+    // TODO: is it feasible to check ownership here?
+    mStats.onBufferSent();
+    mTransactions.insert(std::make_pair(
+            message.transactionId,
+            std::make_unique<TransactionStatus>(message, mTimestampMs)));
+    insert(&mPendingTransactions, message.connectionId,
+           FromAidl(message.transactionId));
+    bufferIter->second->mTransactionCount++;
+    return true;
+}
+
+// Handles the receiver-side result (TRANSFER_OK / TRANSFER_ERROR) of a
+// transfer transaction and finishes the transaction.
+//
+// @param message  the buffer status message of the transaction.
+// @return true when a transaction pending on the reporting connection was
+//         finished; false when the transaction is unknown or not pending on
+//         that connection.
+bool BufferPool::handleTransferResult(const BufferStatusMessage &message) {
+    auto found = mTransactions.find(message.transactionId);
+    if (found == mTransactions.end()) {
+        ALOGV("transfer not found %llu %u", (unsigned long long)message.transactionId,
+              message.bufferId);
+        return false;
+    }
+    const bool deleted = erase(&mPendingTransactions, message.connectionId,
+                               FromAidl(message.transactionId));
+    if (deleted) {
+        if (!found->second->mSenderValidated) {
+            // TRANSFER_TO has not arrived yet; remember the id so that the
+            // late message is recognized as already completed.
+            mCompletedTransactions.insert(message.transactionId);
+        }
+        // The buffer id comes from the client message, so it is validated
+        // before use: dereferencing mBuffers.end() is undefined behavior.
+        // NOTE(review): the id is not cross-checked against the buffer id
+        // recorded in the transaction - confirm whether it should be.
+        auto bufferIter = mBuffers.find(message.bufferId);
+        if (bufferIter == mBuffers.end()) {
+            ALOGW("bufferpool2 %p transfer result for unknown buffer %u",
+                  this, static_cast<unsigned>(message.bufferId));
+        } else {
+            if (message.status == BufferStatus::TRANSFER_OK) {
+                handleOwnBuffer(message.connectionId, message.bufferId);
+            }
+            bufferIter->second->mTransactionCount--;
+            if (bufferIter->second->mOwnerCount == 0
+                    && bufferIter->second->mTransactionCount == 0) {
+                mStats.onBufferUnused(bufferIter->second->mAllocSize);
+                if (!bufferIter->second->mInvalidated) {
+                    mFreeBuffers.insert(message.bufferId);
+                } else {
+                    mStats.onBufferEvicted(bufferIter->second->mAllocSize);
+                    mBuffers.erase(bufferIter);
+                    mInvalidation.onBufferInvalidated(
+                            message.bufferId, mInvalidationChannel);
+                }
+            }
+        }
+        mTransactions.erase(found);
+    }
+    ALOGV("transfer finished %llu %u - %d", (unsigned long long)message.transactionId,
+          message.bufferId, deleted);
+    return deleted;
+}
+
+// Drains the buffer status messages collected by mObserver, refreshes
+// mTimestampMs, and dispatches each message to the matching handler.
+// A handler result of false is only logged; processing continues with the
+// next message.
+void BufferPool::processStatusMessages() {
+ std::vector<BufferStatusMessage> messages;
+ mObserver.getBufferStatusChanges(messages);
+ mTimestampMs = ::android::elapsedRealtime();
+ for (BufferStatusMessage& message: messages) {
+ // Statuses without a handler below leave ret == false and are
+ // therefore reported as processing failures.
+ bool ret = false;
+ switch (message.status) {
+ case BufferStatus::NOT_USED:
+ ret = handleReleaseBuffer(
+ message.connectionId, message.bufferId);
+ break;
+ case BufferStatus::USED:
+ // not happening
+ break;
+ case BufferStatus::TRANSFER_TO:
+ ret = handleTransferTo(message);
+ break;
+ case BufferStatus::TRANSFER_FROM:
+ ret = handleTransferFrom(message);
+ break;
+ case BufferStatus::TRANSFER_TIMEOUT:
+ // TODO
+ break;
+ case BufferStatus::TRANSFER_LOST:
+ // TODO
+ break;
+ case BufferStatus::TRANSFER_FETCH:
+ // not happening
+ break;
+ case BufferStatus::TRANSFER_OK:
+ case BufferStatus::TRANSFER_ERROR:
+ ret = handleTransferResult(message);
+ break;
+ case BufferStatus::INVALIDATION_ACK:
+ // For an ACK the message's bufferId field is passed on as the
+ // acknowledged invalidation message id.
+ mInvalidation.onAck(message.connectionId, message.bufferId);
+ ret = true;
+ break;
+ }
+ if (ret == false) {
+ ALOGW("buffer status message processing failure - message : %d connection : %lld",
+ message.status, (long long)message.connectionId);
+ }
+ }
+ messages.clear();
+}
+
+// Cleans up everything a closing connection holds: its buffer ownerships,
+// the transactions pending on it, and its registration.
+//
+// @param connectionId  the connection being closed.
+// @return always true.
+bool BufferPool::handleClose(ConnectionId connectionId) {
+    // Shared tail of both clean-up loops: once a buffer has no owner and no
+    // in-flight transaction it goes back to the free list, or - if it was
+    // invalidated - is destroyed and reported to the invalidation bookkeeping.
+    const auto recycleOrEvictIfUnused = [this](auto bufferIter, BufferId bufferId) {
+        const auto &buffer = bufferIter->second;
+        if (buffer->mOwnerCount != 0 || buffer->mTransactionCount != 0) {
+            return;
+        }
+        mStats.onBufferUnused(buffer->mAllocSize);
+        // TODO: handle freebuffer insert fail
+        if (!buffer->mInvalidated) {
+            mFreeBuffers.insert(bufferId);
+            return;
+        }
+        mStats.onBufferEvicted(buffer->mAllocSize);
+        // 'buffer' dangles after this erase; it is not used again.
+        mBuffers.erase(bufferIter);
+        mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
+    };
+
+    // Cleaning buffers
+    auto buffers = mUsingBuffers.find(connectionId);
+    if (buffers != mUsingBuffers.end()) {
+        for (const BufferId& bufferId : buffers->second) {
+            if (!erase(&mUsingConnections, bufferId, connectionId)) {
+                continue;
+            }
+            auto bufferIter = mBuffers.find(bufferId);
+            if (bufferIter == mBuffers.end()) {
+                ALOGW("bufferpool2 inconsistent!");
+                continue;
+            }
+            bufferIter->second->mOwnerCount--;
+            recycleOrEvictIfUnused(bufferIter, bufferId);
+        }
+        mUsingBuffers.erase(buffers);
+    }
+
+    // Cleaning transactions
+    auto pending = mPendingTransactions.find(connectionId);
+    if (pending != mPendingTransactions.end()) {
+        for (const TransactionId& transactionId : pending->second) {
+            auto iter = mTransactions.find(transactionId);
+            if (iter == mTransactions.end()) {
+                continue;
+            }
+            if (!iter->second->mSenderValidated) {
+                mCompletedTransactions.insert(transactionId);
+            }
+            const BufferId bufferId = iter->second->mBufferId;
+            auto bufferIter = mBuffers.find(bufferId);
+            if (bufferIter != mBuffers.end()) {
+                bufferIter->second->mTransactionCount--;
+                recycleOrEvictIfUnused(bufferIter, bufferId);
+            } else {
+                ALOGW("bufferpool2 inconsistent!");
+            }
+            mTransactions.erase(iter);
+        }
+        // Drop the per-connection set as well, mirroring mUsingBuffers above;
+        // otherwise it stays behind for every closed connection.
+        mPendingTransactions.erase(pending);
+    }
+    mConnectionIds.erase(connectionId);
+    return true;
+}
+
+// Recycles the first free buffer whose configuration the allocator reports
+// as compatible with the requested parameters.
+//
+// @param allocator  allocator used for the compatibility check.
+// @param params     the allocation parameters.
+// @param pId        out: id of the recycled buffer.
+// @param handle     out: native handle of the recycled buffer.
+// @return true when a buffer was recycled (outputs written), false otherwise
+//         (outputs untouched).
+bool BufferPool::getFreeBuffer(
+        const std::shared_ptr<BufferPoolAllocator> &allocator,
+        const std::vector<uint8_t> &params, BufferId *pId,
+        const native_handle_t** handle) {
+    for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end(); ++freeIt) {
+        // One find() per candidate: operator[] would insert a null entry for
+        // an id missing from mBuffers and then dereference it.
+        const auto bufferIt = mBuffers.find(*freeIt);
+        if (bufferIt == mBuffers.end() || !bufferIt->second) {
+            ALOGW("bufferpool2 inconsistent!");
+            continue;
+        }
+        const auto &buffer = bufferIt->second;
+        if (!allocator->compatible(params, buffer->mConfig)) {
+            continue;
+        }
+        const BufferId id = *freeIt;
+        mFreeBuffers.erase(freeIt);
+        mStats.onBufferRecycled(buffer->mAllocSize);
+        *handle = buffer->handle();
+        *pId = id;
+        ALOGV("recycle a buffer %u %p", id, *handle);
+        return true;
+    }
+    return false;
+}
+
+// Registers a freshly allocated buffer under the next buffer id.
+//
+// @param alloc      the new allocation.
+// @param allocSize  size of the allocation, used for statistics.
+// @param params     the allocation parameters stored with the buffer.
+// @param pId        out: id assigned to the buffer.
+// @param handle     out: native handle of the allocation.
+// @return ResultStatus::OK on success (outputs written);
+//         ResultStatus::NO_MEMORY when the id is still occupied in mBuffers.
+BufferPoolStatus BufferPool::addNewBuffer(
+        const std::shared_ptr<BufferPoolAllocation> &alloc,
+        const size_t allocSize,
+        const std::vector<uint8_t> &params,
+        BufferId *pId,
+        const native_handle_t** handle) {
+
+    const BufferId bufferId = mSeq++;
+    // Connection::SYNC_BUFFERID is never handed out as a buffer id; the
+    // sequence wraps to zero just before reaching it.
+    if (mSeq == Connection::SYNC_BUFFERID) {
+        mSeq = 0;
+    }
+    // std::make_unique never yields a null pointer, so no null check is
+    // needed before inserting.
+    const auto res = mBuffers.insert(std::make_pair(
+            bufferId,
+            std::make_unique<InternalBuffer>(bufferId, alloc, allocSize, params)));
+    if (!res.second) {
+        return ResultStatus::NO_MEMORY;
+    }
+    mStats.onBufferAllocated(allocSize);
+    *handle = alloc->handle();
+    *pId = bufferId;
+    return ResultStatus::OK;
+}
+
+// Evicts free buffers from the cache.
+//
+// A pass runs when clearCache is set, when more than kCleanUpDurationMs have
+// elapsed since the last pass, or when the number of unused buffers exceeds
+// kMaxUnusedBufferCount. With clearCache every free buffer is evicted;
+// otherwise eviction stops once the unused count is at or below
+// kUnusedBufferCountTarget and the cache is below either the byte or the
+// buffer-count eviction threshold.
+void BufferPool::cleanUp(bool clearCache) {
+ if (clearCache || mTimestampMs > mLastCleanUpMs + kCleanUpDurationMs ||
+ mStats.buffersNotInUse() > kMaxUnusedBufferCount) {
+ mLastCleanUpMs = mTimestampMs;
+ // Statistics are logged at most every kLogDurationMs, or whenever the
+ // unused-buffer limit is exceeded.
+ if (mTimestampMs > mLastLogMs + kLogDurationMs ||
+ mStats.buffersNotInUse() > kMaxUnusedBufferCount) {
+ mLastLogMs = mTimestampMs;
+ ALOGD("bufferpool2 %p : %zu(%zu size) total buffers - "
+ "%zu(%zu size) used buffers - %zu/%zu (recycle/alloc) - "
+ "%zu/%zu (fetch/transfer)",
+ this, mStats.mBuffersCached, mStats.mSizeCached,
+ mStats.mBuffersInUse, mStats.mSizeInUse,
+ mStats.mTotalRecycles, mStats.mTotalAllocations,
+ mStats.mTotalFetches, mStats.mTotalTransfers);
+ }
+ for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) {
+ // Re-evaluated on every iteration because each eviction changes the
+ // statistics the stop condition reads.
+ if (!clearCache && mStats.buffersNotInUse() <= kUnusedBufferCountTarget &&
+ (mStats.mSizeCached < kMinAllocBytesForEviction ||
+ mBuffers.size() < kMinBufferCountForEviction)) {
+ break;
+ }
+ auto it = mBuffers.find(*freeIt);
+ if (it != mBuffers.end() &&
+ it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) {
+ mStats.onBufferEvicted(it->second->mAllocSize);
+ mBuffers.erase(it);
+ freeIt = mFreeBuffers.erase(freeIt);
+ } else {
+ // A free-list entry that is unknown or still referenced is left
+ // in place and only reported.
+ ++freeIt;
+ ALOGW("bufferpool2 inconsistent!");
+ }
+ }
+ }
+}
+
+// Invalidates the buffers whose id isBufferInRange() places in (from, to).
+// Free buffers in the range are destroyed immediately; the remaining ones
+// are marked invalidated and counted, and the count is handed to the
+// invalidation bookkeeping together with the request.
+void BufferPool::invalidate(
+        bool needsAck, BufferId from, BufferId to,
+        const std::shared_ptr<Accessor> &impl) {
+    auto freeIt = mFreeBuffers.begin();
+    while (freeIt != mFreeBuffers.end()) {
+        if (!isBufferInRange(from, to, *freeIt)) {
+            ++freeIt;
+            continue;
+        }
+        auto found = mBuffers.find(*freeIt);
+        const bool evictable = found != mBuffers.end() &&
+                found->second->mOwnerCount == 0 &&
+                found->second->mTransactionCount == 0;
+        if (!evictable) {
+            // Unknown or still referenced: keep the entry, only report it.
+            ALOGW("bufferpool2 inconsistent!");
+            ++freeIt;
+            continue;
+        }
+        mStats.onBufferEvicted(found->second->mAllocSize);
+        mBuffers.erase(found);
+        freeIt = mFreeBuffers.erase(freeIt);
+    }
+
+    // Whatever is still in the range gets marked and counted.
+    size_t left = 0;
+    for (auto &[id, buffer] : mBuffers) {
+        if (!isBufferInRange(from, to, id)) {
+            continue;
+        }
+        buffer->invalidate();
+        ++left;
+    }
+    mInvalidation.onInvalidationRequest(needsAck, from, to, left, mInvalidationChannel, impl);
+}
+
+// Requests invalidation of every buffer id handed out since the previous
+// flush, i.e. the range from mStartSeq to mSeq, and starts a new range.
+void BufferPool::flush(const std::shared_ptr<Accessor> &impl) {
+    const BufferId rangeBegin = mStartSeq;
+    const BufferId rangeEnd = mSeq;
+    mStartSeq = rangeEnd;
+    // TODO: needsAck params
+    ALOGV("buffer invalidation request bp:%u %u %u", mInvalidation.mId, rangeBegin, rangeEnd);
+    if (rangeBegin == rangeEnd) {
+        // No buffer was added since the last flush.
+        return;
+    }
+    invalidate(true, rangeBegin, rangeEnd, impl);
+}
+
+} // namespace aidl::android::hardware::media::bufferpool2::implementation
diff --git a/media/bufferpool/aidl/default/BufferPool.h b/media/bufferpool/aidl/default/BufferPool.h
new file mode 100644
index 0000000..1529a53
--- /dev/null
+++ b/media/bufferpool/aidl/default/BufferPool.h
@@ -0,0 +1,337 @@
+/*
+ * Copyright (C) 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <map>
+#include <set>
+#include <vector>
+#include <mutex>
+#include <condition_variable>
+#include <utils/Timers.h>
+
+#include "BufferStatus.h"
+
+namespace aidl::android::hardware::media::bufferpool2::implementation {
+
+using BufferStatus = aidl::android::hardware::media::bufferpool2::BufferStatus;
+using BufferStatusMessage = aidl::android::hardware::media::bufferpool2::BufferStatusMessage;
+
+struct Accessor;
+struct InternalBuffer;
+struct TransactionStatus;
+
+/**
+ * Buffer pool implementation.
+ *
+ * Handles buffer status messages. Handles buffer allocation/recycling.
+ * Handles buffer transfer between buffer pool clients.
+ */
+struct BufferPool {
+private:
+    std::mutex mMutex;
+    /// Time of the last status message processing; used as "now" by cleanUp().
+    int64_t mTimestampMs;
+    int64_t mLastCleanUpMs;
+    int64_t mLastLogMs;
+    /// Next buffer id to hand out.
+    BufferId mSeq;
+    /// First buffer id handed out since the last flush().
+    BufferId mStartSeq;
+    bool mValid;
+    BufferStatusObserver mObserver;
+    BufferInvalidationChannel mInvalidationChannel;
+
+    /// Buffers owned per connection, and the reverse mapping.
+    std::map<ConnectionId, std::set<BufferId>> mUsingBuffers;
+    std::map<BufferId, std::set<ConnectionId>> mUsingConnections;
+
+    std::map<ConnectionId, std::set<TransactionId>> mPendingTransactions;
+    // Transactions completed before TRANSFER_TO message arrival.
+    // Fetch does not occur for the transactions.
+    // Only transaction id is kept for the transactions in short duration.
+    std::set<TransactionId> mCompletedTransactions;
+    // Currently active(pending) transactions' status & information.
+    std::map<TransactionId, std::unique_ptr<TransactionStatus>>
+            mTransactions;
+
+    std::map<BufferId, std::unique_ptr<InternalBuffer>> mBuffers;
+    std::set<BufferId> mFreeBuffers;
+    std::set<ConnectionId> mConnectionIds;
+
+    /// Bookkeeping of buffer invalidation requests and their acknowledgement.
+    struct Invalidation {
+        /// Source of the per-instance mId values.
+        static std::atomic<std::uint32_t> sInvSeqId;
+
+        /// An invalidation request that waits for in-use buffers to go away.
+        struct Pending {
+            bool mNeedsAck;
+            uint32_t mFrom;
+            uint32_t mTo;
+            /// Number of buffers in the range that are still alive.
+            size_t mLeft;
+            const std::weak_ptr<Accessor> mImpl;
+            Pending(bool needsAck, uint32_t from, uint32_t to, size_t left,
+                    const std::shared_ptr<Accessor> &impl)
+                : mNeedsAck(needsAck),
+                  mFrom(from),
+                  mTo(to),
+                  mLeft(left),
+                  mImpl(impl)
+            {}
+
+            /// Counts down mLeft when bufferId is in range (side effect) and
+            /// returns true once the last buffer of the range is gone.
+            bool isInvalidated(uint32_t bufferId) {
+                return isBufferInRange(mFrom, mTo, bufferId) && --mLeft == 0;
+            }
+        };
+
+        std::list<Pending> mPendings;
+        /// Last invalidation id acknowledged per connection.
+        std::map<ConnectionId, uint32_t> mAcks;
+        std::map<ConnectionId, const std::shared_ptr<IObserver>> mObservers;
+        /// Most recently issued invalidation id; 0 means none issued yet.
+        uint32_t mInvalidationId;
+        uint32_t mId;
+
+        Invalidation() : mInvalidationId(0), mId(sInvSeqId.fetch_add(1)) {}
+
+        void onConnect(ConnectionId conId, const std::shared_ptr<IObserver> &observer);
+
+        void onClose(ConnectionId conId);
+
+        void onAck(ConnectionId conId, uint32_t msgId);
+
+        void onBufferInvalidated(
+                BufferId bufferId,
+                BufferInvalidationChannel &channel);
+
+        void onInvalidationRequest(
+                bool needsAck, uint32_t from, uint32_t to, size_t left,
+                BufferInvalidationChannel &channel,
+                const std::shared_ptr<Accessor> &impl);
+
+        void onHandleAck(
+                std::map<ConnectionId, const std::shared_ptr<IObserver>> *observers,
+                uint32_t *invalidationId);
+    } mInvalidation;
+    /// Buffer pool statistics which tracks allocation and transfer statistics.
+    struct Stats {
+        /// Total size of allocations which are used or available to use.
+        /// (bytes or pixels)
+        size_t mSizeCached;
+        /// # of cached buffers which are used or available to use.
+        size_t mBuffersCached;
+        /// Total size of allocations which are currently used. (bytes or pixels)
+        size_t mSizeInUse;
+        /// # of currently used buffers
+        size_t mBuffersInUse;
+
+        /// # of allocations called on bufferpool. (# of fetched from BlockPool)
+        size_t mTotalAllocations;
+        /// # of allocations that were served from the cache.
+        /// (# of allocator alloc prevented)
+        size_t mTotalRecycles;
+        /// # of buffer transfers initiated.
+        size_t mTotalTransfers;
+        /// # of transfers that had to be fetched.
+        size_t mTotalFetches;
+
+        Stats()
+            : mSizeCached(0), mBuffersCached(0), mSizeInUse(0), mBuffersInUse(0),
+              mTotalAllocations(0), mTotalRecycles(0), mTotalTransfers(0), mTotalFetches(0) {}
+
+        /// # of currently unused buffers
+        size_t buffersNotInUse() const {
+            ALOG_ASSERT(mBuffersCached >= mBuffersInUse);
+            return mBuffersCached - mBuffersInUse;
+        }
+
+        /// A new buffer is allocated on an allocation request.
+        void onBufferAllocated(size_t allocSize) {
+            mSizeCached += allocSize;
+            mBuffersCached++;
+
+            mSizeInUse += allocSize;
+            mBuffersInUse++;
+
+            mTotalAllocations++;
+        }
+
+        /// A buffer is evicted and destroyed.
+        void onBufferEvicted(size_t allocSize) {
+            mSizeCached -= allocSize;
+            mBuffersCached--;
+        }
+
+        /// A buffer is recycled on an allocation request.
+        void onBufferRecycled(size_t allocSize) {
+            mSizeInUse += allocSize;
+            mBuffersInUse++;
+
+            mTotalAllocations++;
+            mTotalRecycles++;
+        }
+
+        /// A buffer is available to be recycled.
+        void onBufferUnused(size_t allocSize) {
+            mSizeInUse -= allocSize;
+            mBuffersInUse--;
+        }
+
+        /// A buffer transfer is initiated.
+        void onBufferSent() {
+            mTotalTransfers++;
+        }
+
+        /// A buffer fetch is invoked by a buffer transfer.
+        void onBufferFetched() {
+            mTotalFetches++;
+        }
+    } mStats;
+
+    bool isValid() {
+        return mValid;
+    }
+
+    /**
+     * Invalidates the buffers of the given id range. Free buffers are
+     * destroyed right away; buffers still in use are marked invalidated.
+     */
+    void invalidate(bool needsAck, BufferId from, BufferId to,
+                    const std::shared_ptr<Accessor> &impl);
+
+    static void createInvalidator();
+
+public:
+    /** Creates a buffer pool. */
+    BufferPool();
+
+    /** Destroys a buffer pool. */
+    ~BufferPool();
+
+    /**
+     * Processes all pending buffer status messages.
+     * Each status message is handled by methods with 'handle' prefix.
+     */
+    void processStatusMessages();
+
+    /**
+     * Handles a buffer being owned by a connection.
+     *
+     * @param connectionId  the id of the buffer owning connection.
+     * @param bufferId      the id of the buffer.
+     *
+     * @return {@code true} when the buffer is owned,
+     *         {@code false} otherwise.
+     */
+    bool handleOwnBuffer(ConnectionId connectionId, BufferId bufferId);
+
+    /**
+     * Handles a buffer being released by a connection.
+     *
+     * @param connectionId  the id of the buffer owning connection.
+     * @param bufferId      the id of the buffer.
+     *
+     * @return {@code true} when the buffer ownership is released,
+     *         {@code false} otherwise.
+     */
+    bool handleReleaseBuffer(ConnectionId connectionId, BufferId bufferId);
+
+    /**
+     * Handles a transfer transaction start message from the sender.
+     *
+     * @param message   a buffer status message for the transaction.
+     *
+     * @result {@code true} when transfer_to message is acknowledged,
+     *         {@code false} otherwise.
+     */
+    bool handleTransferTo(const BufferStatusMessage &message);
+
+    /**
+     * Handles a transfer transaction being acked by the receiver.
+     *
+     * @param message   a buffer status message for the transaction.
+     *
+     * @result {@code true} when transfer_from message is acknowledged,
+     *         {@code false} otherwise.
+     */
+    bool handleTransferFrom(const BufferStatusMessage &message);
+
+    /**
+     * Handles a transfer transaction result message from the receiver.
+     *
+     * @param message   a buffer status message for the transaction.
+     *
+     * @result {@code true} when the existing transaction is finished,
+     *         {@code false} otherwise.
+     */
+    bool handleTransferResult(const BufferStatusMessage &message);
+
+    /**
+     * Handles a connection being closed. All the buffers and transactions
+     * owned by the connection will be cleaned up.
+     *
+     * @param connectionId  the id of the connection.
+     *
+     * @result {@code true}; the implementation currently reports success
+     *         whether or not the connection existed.
+     */
+    bool handleClose(ConnectionId connectionId);
+
+    /**
+     * Recycles an existing free buffer if it is possible.
+     *
+     * @param allocator the buffer allocator
+     * @param params    the allocation parameters.
+     * @param pId       the id of the recycled buffer.
+     * @param handle    the native handle of the recycled buffer.
+     *
+     * @return {@code true} when a buffer is recycled, {@code false}
+     *         otherwise.
+     */
+    bool getFreeBuffer(
+            const std::shared_ptr<BufferPoolAllocator> &allocator,
+            const std::vector<uint8_t> &params,
+            BufferId *pId, const native_handle_t **handle);
+
+    /**
+     * Adds a newly allocated buffer to bufferpool.
+     *
+     * @param alloc     the newly allocated buffer.
+     * @param allocSize the size of the newly allocated buffer.
+     * @param params    the allocation parameters.
+     * @param pId       the buffer id for the newly allocated buffer.
+     * @param handle    the native handle for the newly allocated buffer.
+     *
+     * @return OK when the buffer is successfully added.
+     *         NO_MEMORY when the buffer could not be added.
+     */
+    BufferPoolStatus addNewBuffer(
+            const std::shared_ptr<BufferPoolAllocation> &alloc,
+            const size_t allocSize,
+            const std::vector<uint8_t> &params,
+            BufferId *pId,
+            const native_handle_t **handle);
+
+    /**
+     * Performs periodic cache cleaning. Pending buffer status messages are
+     * not processed here; call processStatusMessages() for that.
+     *
+     * @param clearCache    if clearCache is true, it frees all buffers
+     *                      waiting to be recycled.
+     */
+    void cleanUp(bool clearCache = false);
+
+    /**
+     * Invalidates all buffers allocated since the previous flush. Current
+     * free buffers are destroyed; active buffers are invalidated after being
+     * inactive. Pending buffer status messages are not processed here.
+     */
+    void flush(const std::shared_ptr<Accessor> &impl);
+
+    friend struct Accessor;
+};
+
+
+} // namespace aidl::android::hardware::media::bufferpool2::implementation
diff --git a/media/bufferpool/aidl/default/BufferPoolClient.cpp b/media/bufferpool/aidl/default/BufferPoolClient.cpp
new file mode 100644
index 0000000..e9777d8
--- /dev/null
+++ b/media/bufferpool/aidl/default/BufferPoolClient.cpp
@@ -0,0 +1,858 @@
+/*
+ * Copyright (C) 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define LOG_TAG "AidlBufferPoolCli"
+//#define LOG_NDEBUG 0
+
+#include <thread>
+#include <aidlcommonsupport/NativeHandle.h>
+#include <utils/Log.h>
+#include "BufferPoolClient.h"
+#include "Accessor.h"
+#include "Connection.h"
+
+namespace aidl::android::hardware::media::bufferpool2::implementation {
+
+using aidl::android::hardware::media::bufferpool2::IConnection;
+using aidl::android::hardware::media::bufferpool2::ResultStatus;
+using FetchInfo = aidl::android::hardware::media::bufferpool2::IConnection::FetchInfo;
+using FetchResult = aidl::android::hardware::media::bufferpool2::IConnection::FetchResult;
+
+ static constexpr int64_t kReceiveTimeoutMs = 2000; // 2s; added to a non-zero sender timestamp in receive() to form the deadline checked by postReceive()
+ static constexpr int kPostMaxRetry = 3; // maximum number of attempts postReceive() makes to post TRANSFER_FROM
+ static constexpr int kCacheTtlMs = 1000; // expiry delay of an idle cached buffer; also the interval between periodic evictCaches() passes
+ static constexpr size_t kMaxCachedBufferCount = 64; // evictCaches() runs immediately once the idle cached-buffer count exceeds this
+ static constexpr size_t kCachedBufferCountTarget = kMaxCachedBufferCount - 16; // during a pass, idle buffers are dropped while the idle count exceeds this
+
+ /**
+  * Private implementation of BufferPoolClient.
+  *
+  * Holds the connection to a buffer pool (local or remote), a cache of the
+  * buffers this client knows about (mCache) and the bookkeeping used to
+  * report released buffers and invalidation ACKs back to the pool
+  * (mReleasing).
+  */
+ class BufferPoolClient::Impl
+         : public std::enable_shared_from_this<BufferPoolClient::Impl> {
+ public:
+     /** Creates a client connected to a local buffer pool. */
+     explicit Impl(const std::shared_ptr<Accessor> &accessor,
+                   const std::shared_ptr<IObserver> &observer);
+
+     /** Creates a client connected to a remote buffer pool. */
+     explicit Impl(const std::shared_ptr<IAccessor> &accessor,
+                   const std::shared_ptr<IObserver> &observer);
+
+     /** @return whether the connection and both message queues were set up. */
+     bool isValid() {
+         return mValid;
+     }
+
+     /** @return whether this is a valid client of a local buffer pool. */
+     bool isLocal() {
+         return mValid && mLocal;
+     }
+
+     ConnectionId getConnectionId() {
+         return mConnectionId;
+     }
+
+     std::shared_ptr<IAccessor> &getAccessor() {
+         return mAccessor;
+     }
+
+     /**
+      * Synchronizes released buffers, evicts caches and reports activity.
+      *
+      * @param lastTransactionMs receives the time of the last change of the
+      *                          active buffer count.
+      * @param clearCache        when true, all idle cached buffers are dropped.
+      */
+     bool isActive(int64_t *lastTransactionMs, bool clearCache);
+
+     void receiveInvalidation(uint32_t msgID);
+
+     BufferPoolStatus flush();
+
+     BufferPoolStatus allocate(const std::vector<uint8_t> &params,
+                               native_handle_t **handle,
+                               std::shared_ptr<BufferPoolData> *buffer);
+
+     BufferPoolStatus receive(
+             TransactionId transactionId, BufferId bufferId,
+             int64_t timestampMs,
+             native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer);
+
+     void postBufferRelease(BufferId bufferId);
+
+     bool postSend(
+             BufferId bufferId, ConnectionId receiver,
+             TransactionId *transactionId, int64_t *timestampMs);
+ private:
+
+     bool postReceive(
+             BufferId bufferId, TransactionId transactionId,
+             int64_t timestampMs);
+
+     bool postReceiveResult(
+             BufferId bufferId, TransactionId transactionId, bool result, bool *needsSync);
+
+     void trySyncFromRemote();
+
+     // The four methods below expect the caller to hold mCache.mLock.
+     bool syncReleased(uint32_t msgId = 0);
+
+     void evictCaches(bool clearCache = false);
+
+     void invalidateBuffer(BufferId id);
+
+     void invalidateRange(BufferId from, BufferId to);
+
+     BufferPoolStatus allocateBufferHandle(
+             const std::vector<uint8_t>& params, BufferId *bufferId,
+             native_handle_t **handle);
+
+     BufferPoolStatus fetchBufferHandle(
+             TransactionId transactionId, BufferId bufferId,
+             native_handle_t **handle);
+
+     struct BlockPoolDataDtor;
+     struct ClientBuffer;
+
+     bool mLocal;
+     bool mValid;
+     std::shared_ptr<IAccessor> mAccessor;
+     std::shared_ptr<Connection> mLocalConnection;
+     std::shared_ptr<IConnection> mRemoteConnection;
+     uint32_t mSeqId;
+     ConnectionId mConnectionId;
+     int64_t mLastEvictCacheMs;
+     std::unique_ptr<BufferInvalidationListener> mInvalidationListener;
+
+     // CachedBuffers
+     struct BufferCache {
+         std::mutex mLock;
+         bool mCreating;
+         std::condition_variable mCreateCv;
+         std::map<BufferId, std::unique_ptr<ClientBuffer>> mBuffers;
+         int mActive;
+         int64_t mLastChangeMs;
+
+         BufferCache() : mCreating(false), mActive(0),
+                 mLastChangeMs(::android::elapsedRealtime()) {}
+
+         // Both counters update mLastChangeMs so isActive() can report the
+         // time of the last activity.
+         void incActive_l() {
+             ++mActive;
+             mLastChangeMs = ::android::elapsedRealtime();
+         }
+
+         void decActive_l() {
+             --mActive;
+             mLastChangeMs = ::android::elapsedRealtime();
+         }
+
+         // Number of cached buffers that are not currently active.
+         int cachedBufferCount() const {
+             return mBuffers.size() - mActive;
+         }
+     } mCache;
+
+     // FMQ - release notifier
+     struct ReleaseCache {
+         std::mutex mLock;
+         std::list<BufferId> mReleasingIds;
+         std::list<BufferId> mReleasedIds;
+         uint32_t mInvalidateId; // TODO: invalidation ACK to bufferpool
+         bool mInvalidateAck;
+         std::unique_ptr<BufferStatusChannel> mStatusChannel;
+
+         ReleaseCache() : mInvalidateId(0), mInvalidateAck(true) {}
+     } mReleasing;
+
+     // This lock is held during synchronization from remote side.
+     // In order to minimize remote calls and locking duration, this lock is held
+     // by best effort approach using try_lock().
+     std::mutex mRemoteSyncLock;
+ };
+
+ /**
+  * Deleter attached to every BufferPoolData handed out by the client.
+  *
+  * Destroys the BufferPoolData and, if the owning client still exists and is
+  * valid, posts a release message for the buffer id. Only a weak reference
+  * to the client is kept.
+  */
+ struct BufferPoolClient::Impl::BlockPoolDataDtor {
+     BlockPoolDataDtor(const std::shared_ptr<BufferPoolClient::Impl> &impl)
+             : mImpl(impl) {}
+
+     void operator()(BufferPoolData *buffer) {
+         // The id must be read before the object is destroyed.
+         const BufferId releasedId = buffer->mId;
+         delete buffer;
+
+         if (auto owner = mImpl.lock(); owner && owner->isValid()) {
+             owner->postBufferRelease(releasedId);
+         }
+     }
+
+     const std::weak_ptr<BufferPoolClient::Impl> mImpl;
+ };
+
+ /**
+  * Client-side record of one buffer.
+  *
+  * Owns the native handle (closed and deleted in the destructor) and tracks
+  * whether a BufferPoolData reference ("cache") has been handed out for it.
+  * The expiry time is refreshed on construction and whenever the handed-out
+  * reference is released.
+  */
+ struct BufferPoolClient::Impl::ClientBuffer {
+ private:
+     int64_t mExpireMs;
+     bool mHasCache;
+     ConnectionId mConnectionId;
+     BufferId mId;
+     native_handle_t *mHandle;
+     std::weak_ptr<BufferPoolData> mCache;
+
+     void updateExpire() {
+         mExpireMs = ::android::elapsedRealtime() + kCacheTtlMs;
+     }
+
+ public:
+     ClientBuffer(
+             ConnectionId connectionId, BufferId id, native_handle_t *handle)
+             : mHasCache(false), mConnectionId(connectionId),
+               mId(id), mHandle(handle) {
+         updateExpire();
+     }
+
+     // The object owns mHandle; a copy would close and delete it twice.
+     ClientBuffer(const ClientBuffer &) = delete;
+     ClientBuffer &operator=(const ClientBuffer &) = delete;
+
+     ~ClientBuffer() {
+         if (mHandle) {
+             native_handle_close(mHandle);
+             native_handle_delete(mHandle);
+         }
+     }
+
+     BufferId id() const {
+         return mId;
+     }
+
+     /** @return true once the expiry time has been reached. */
+     bool expire() const {
+         int64_t now = ::android::elapsedRealtime();
+         return now >= mExpireMs;
+     }
+
+     /** @return true while a handed-out reference has not been released. */
+     bool hasCache() const {
+         return mHasCache;
+     }
+
+     /**
+      * Returns the already handed-out reference, if it is still alive.
+      *
+      * @param pHandle set to the owned native handle only on success.
+      * @return the live reference, or nullptr when none was handed out or
+      *         the last user reference is already gone.
+      */
+     std::shared_ptr<BufferPoolData> fetchCache(native_handle_t **pHandle) {
+         if (mHasCache) {
+             std::shared_ptr<BufferPoolData> cache = mCache.lock();
+             if (cache) {
+                 *pHandle = mHandle;
+             }
+             return cache;
+         }
+         return nullptr;
+     }
+
+     /**
+      * Hands out a new reference for this buffer.
+      *
+      * @param impl    the owning client; the reference's deleter posts the
+      *                buffer release through it.
+      * @param pHandle set to the owned native handle on success.
+      * @return the new reference, or nullptr when one is already handed out.
+      */
+     std::shared_ptr<BufferPoolData> createCache(
+             const std::shared_ptr<BufferPoolClient::Impl> &impl,
+             native_handle_t **pHandle) {
+         if (mHasCache) {
+             return nullptr;
+         }
+         // A plain new-expression never yields nullptr and the shared_ptr
+         // takes ownership right away, so no separate null checks or manual
+         // delete are needed.
+         std::shared_ptr<BufferPoolData> cache(
+                 new BufferPoolData(mConnectionId, mId), BlockPoolDataDtor(impl));
+         mCache = cache;
+         mHasCache = true;
+         *pHandle = mHandle;
+         return cache;
+     }
+
+     /**
+      * Marks the handed-out reference as released and restarts the expiry
+      * timer.
+      *
+      * @return false when no reference was handed out.
+      */
+     bool onCacheRelease() {
+         if (mHasCache) {
+             // TODO: verify mCache is not valid;
+             updateExpire();
+             mHasCache = false;
+             return true;
+         }
+         return false;
+     }
+ };
+
+ /**
+  * Connects to a local buffer pool through its Accessor.
+  *
+  * The client becomes valid only when the connect call succeeds and both
+  * the status channel and the invalidation listener report valid queues.
+  */
+ BufferPoolClient::Impl::Impl(const std::shared_ptr<Accessor> &accessor,
+                              const std::shared_ptr<IObserver> &observer)
+         : mLocal(true), mValid(false), mAccessor(accessor), mSeqId(0),
+           mLastEvictCacheMs(::android::elapsedRealtime()) {
+     StatusDescriptor statusQueueDesc;
+     InvalidationDescriptor invalidationQueueDesc;
+     const BufferPoolStatus connectStatus = accessor->connect(
+             observer, true,
+             &mLocalConnection, &mConnectionId, &mReleasing.mInvalidateId,
+             &statusQueueDesc, &invalidationQueueDesc);
+     if (connectStatus != ResultStatus::OK) {
+         return;  // mValid stays false
+     }
+     mReleasing.mStatusChannel =
+             std::make_unique<BufferStatusChannel>(statusQueueDesc);
+     mInvalidationListener =
+             std::make_unique<BufferInvalidationListener>(invalidationQueueDesc);
+     const bool channelOk =
+             mReleasing.mStatusChannel && mReleasing.mStatusChannel->isValid();
+     mValid = channelOk &&
+             mInvalidationListener &&
+             mInvalidationListener->isValid();
+ }
+
+ /**
+  * Connects to a remote buffer pool through its IAccessor interface.
+  *
+  * The client stays invalid when the accessor is null, the connect call
+  * fails, or either message queue built from the returned descriptors is
+  * not valid.
+  */
+ BufferPoolClient::Impl::Impl(const std::shared_ptr<IAccessor> &accessor,
+                              const std::shared_ptr<IObserver> &observer)
+         : mLocal(false), mValid(false), mAccessor(accessor), mSeqId(0),
+           mLastEvictCacheMs(::android::elapsedRealtime()) {
+     if (!accessor) {
+         // Nothing to connect to; leave the client invalid instead of
+         // dereferencing a null accessor.
+         return;
+     }
+     IAccessor::ConnectionInfo conInfo;
+     if (!accessor->connect(observer, &conInfo).isOk()) {
+         return;
+     }
+     auto channel = std::make_unique<BufferStatusChannel>(conInfo.toFmqDesc);
+     // Named "listener" so it does not shadow the "observer" parameter.
+     auto listener = std::make_unique<BufferInvalidationListener>(conInfo.fromFmqDesc);
+
+     if (channel && channel->isValid()
+             && listener && listener->isValid()) {
+         mRemoteConnection = conInfo.connection;
+         mConnectionId = conInfo.connectionId;
+         mReleasing.mInvalidateId = conInfo.msgId;
+         mReleasing.mStatusChannel = std::move(channel);
+         mInvalidationListener = std::move(listener);
+         mValid = true;
+     }
+ }
+
+ /**
+  * Synchronizes release state, evicts caches and reports whether the client
+  * is active.
+  *
+  * @param lastTransactionMs receives the time of the last change of the
+  *                          active buffer count.
+  * @param clearCache        passed on to the cache eviction and to the local
+  *                          connection clean-up.
+  * @return always true for a valid local client (after cleaning up the
+  *         local connection); otherwise whether any buffer is active.
+  */
+ bool BufferPoolClient::Impl::isActive(int64_t *lastTransactionMs, bool clearCache) {
+     bool hasActiveBuffers = false;
+     {
+         std::lock_guard<std::mutex> guard(mCache.mLock);
+         syncReleased();
+         evictCaches(clearCache);
+         *lastTransactionMs = mCache.mLastChangeMs;
+         hasActiveBuffers = (mCache.mActive > 0);
+     }
+     const bool hasLocalConnection = mValid && mLocal && mLocalConnection;
+     if (!hasLocalConnection) {
+         return hasActiveBuffers;
+     }
+     // Clean-up happens outside of mCache.mLock.
+     mLocalConnection->cleanUp(clearCache);
+     return true;
+ }
+
+ void BufferPoolClient::Impl::receiveInvalidation(uint32_t messageId) { // Handles an invalidation notification carrying the given message id.
+ std::lock_guard<std::mutex> lock(mCache.mLock); // syncReleased() expects mCache.mLock to be held
+ syncReleased(messageId); // messageId is used only when no queued invalidation carries a non-zero message id
+ // TODO: evict cache required?
+ }
+
+ /**
+  * Synchronizes release state, evicts expired caches and flushes the local
+  * connection.
+  *
+  * @return CRITICAL_ERROR when the client is not a valid local client,
+  *         otherwise the result of the local connection's flush().
+  */
+ BufferPoolStatus BufferPoolClient::Impl::flush() {
+     const bool usable = mLocal && mLocalConnection && mValid;
+     if (!usable) {
+         return ResultStatus::CRITICAL_ERROR;
+     }
+     // The connection flush runs while mCache.mLock is held.
+     std::unique_lock<std::mutex> guard(mCache.mLock);
+     syncReleased();
+     evictCaches();
+     return mLocalConnection->flush();
+ }
+
+ /**
+  * Allocates a buffer from the local buffer pool and registers it in the
+  * client cache.
+  *
+  * @param params  the allocation parameters.
+  * @param pHandle receives the native handle owned by the cache entry.
+  * @param buffer  receives the reference to the allocated buffer; reset to
+  *                null on failure.
+  *
+  * @return OK on success.
+  *         CRITICAL_ERROR when the client is not a valid local client.
+  *         NO_MEMORY when the pool allocation succeeded but no client
+  *         cache entry could be created; the buffer is released again.
+  *         Otherwise the status of the pool allocation.
+  */
+ BufferPoolStatus BufferPoolClient::Impl::allocate(
+         const std::vector<uint8_t> &params,
+         native_handle_t **pHandle,
+         std::shared_ptr<BufferPoolData> *buffer) {
+     if (!mLocal || !mLocalConnection || !mValid) {
+         return ResultStatus::CRITICAL_ERROR;
+     }
+     BufferId bufferId;
+     native_handle_t *handle = nullptr;
+     buffer->reset();
+     BufferPoolStatus status = allocateBufferHandle(params, &bufferId, &handle);
+     if (status == ResultStatus::OK) {
+         if (handle) {
+             std::unique_lock<std::mutex> lock(mCache.mLock);
+             syncReleased();
+             evictCaches();
+             auto cacheIt = mCache.mBuffers.find(bufferId);
+             if (cacheIt != mCache.mBuffers.end()) {
+                 // TODO: verify it is recycled. (not having active ref)
+                 mCache.mBuffers.erase(cacheIt);
+             }
+             auto clientBuffer = std::make_unique<ClientBuffer>(
+                     mConnectionId, bufferId, handle);
+             if (clientBuffer) {
+                 auto result = mCache.mBuffers.insert(std::make_pair(
+                         bufferId, std::move(clientBuffer)));
+                 if (result.second) {
+                     *buffer = result.first->second->createCache(
+                             shared_from_this(), pHandle);
+                     if (*buffer) {
+                         mCache.incActive_l();
+                     }
+                 }
+             }
+         }
+         if (!*buffer) {
+             // Either the handle clone or the cache registration failed:
+             // give the buffer back to the pool.
+             ALOGV("client cache creation failure %d: %lld",
+                   handle != nullptr, (long long)mConnectionId);
+             status = ResultStatus::NO_MEMORY;
+             postBufferRelease(bufferId);
+         }
+     }
+     return status;
+ }
+
+ BufferPoolStatus BufferPoolClient::Impl::receive( // Receives a buffer sent under transactionId; returns it from the client cache or fetches its handle over the connection.
+ TransactionId transactionId, BufferId bufferId, int64_t timestampMs,
+ native_handle_t **pHandle,
+ std::shared_ptr<BufferPoolData> *buffer) {
+ if (!mValid) {
+ return ResultStatus::CRITICAL_ERROR;
+ }
+ if (timestampMs != 0) {
+ timestampMs += kReceiveTimeoutMs; // becomes the deadline checked by postReceive(); 0 means no deadline
+ }
+ if (!postReceive(bufferId, transactionId, timestampMs)) {
+ return ResultStatus::CRITICAL_ERROR;
+ }
+ BufferPoolStatus status = ResultStatus::CRITICAL_ERROR;
+ buffer->reset();
+ while(1) {
+ std::unique_lock<std::mutex> lock(mCache.mLock);
+ syncReleased();
+ evictCaches();
+ auto cacheIt = mCache.mBuffers.find(bufferId);
+ if (cacheIt != mCache.mBuffers.end()) {
+ if (cacheIt->second->hasCache()) {
+ *buffer = cacheIt->second->fetchCache(pHandle);
+ if (!*buffer) {
+ // TODO: check transfer time_out. The previous reference is gone but its release is not processed yet; retry without a timeout.
+ lock.unlock();
+ std::this_thread::yield();
+ continue;
+ }
+ ALOGV("client receive from reference %lld", (long long)mConnectionId);
+ break;
+ } else {
+ *buffer = cacheIt->second->createCache(shared_from_this(), pHandle);
+ if (*buffer) {
+ mCache.incActive_l();
+ }
+ ALOGV("client receive from cache %lld", (long long)mConnectionId);
+ break;
+ }
+ } else {
+ if (!mCache.mCreating) {
+ mCache.mCreating = true; // only one thread at a time fetches a handle; others wait on mCreateCv
+ lock.unlock(); // the fetch runs without mCache.mLock held
+ native_handle_t* handle = nullptr;
+ status = fetchBufferHandle(transactionId, bufferId, &handle);
+ lock.lock();
+ if (status == ResultStatus::OK) {
+ if (handle) {
+ auto clientBuffer = std::make_unique<ClientBuffer>(
+ mConnectionId, bufferId, handle);
+ if (clientBuffer) {
+ auto result = mCache.mBuffers.insert(
+ std::make_pair(bufferId, std::move(
+ clientBuffer)));
+ if (result.second) {
+ *buffer = result.first->second->createCache(
+ shared_from_this(), pHandle);
+ if (*buffer) {
+ mCache.incActive_l();
+ }
+ }
+ }
+ }
+ if (!*buffer) {
+ status = ResultStatus::NO_MEMORY;
+ }
+ }
+ mCache.mCreating = false;
+ lock.unlock();
+ mCache.mCreateCv.notify_all();
+ break;
+ }
+ mCache.mCreateCv.wait(lock); // another thread is fetching; re-check the cache after it finishes
+ }
+ }
+ bool needsSync = false;
+ bool posted = postReceiveResult(bufferId, transactionId,
+ *buffer ? true : false, &needsSync); // reports TRANSFER_OK or TRANSFER_ERROR
+ ALOGV("client receive %lld - %u : %s (%d)", (long long)mConnectionId, bufferId,
+ *buffer ? "ok" : "fail", posted);
+ if (mValid && mLocal && mLocalConnection) {
+ mLocalConnection->cleanUp(false);
+ }
+ if (needsSync && mRemoteConnection) {
+ trySyncFromRemote();
+ }
+ if (*buffer) {
+ if (!posted) {
+ buffer->reset(); // the result could not be reported, so the received buffer is dropped
+ return ResultStatus::CRITICAL_ERROR;
+ }
+ return ResultStatus::OK;
+ }
+ return status;
+ }
+
+
+ /**
+  * Queues a buffer id for release and posts the pending releases on the
+  * status channel.
+  *
+  * @param bufferId the id of the buffer that is no longer referenced.
+  */
+ void BufferPoolClient::Impl::postBufferRelease(BufferId bufferId) {
+     std::lock_guard<std::mutex> guard(mReleasing.mLock);
+     auto &pendingIds = mReleasing.mReleasingIds;
+     pendingIds.push_back(bufferId);
+     mReleasing.mStatusChannel->postBufferRelease(
+             mConnectionId, pendingIds, mReleasing.mReleasedIds);
+ }
+
+ // TODO: revise ad-hoc posting data structure
+ /**
+  * Posts a TRANSFER_TO status message announcing that a buffer is sent to
+  * another connection.
+  *
+  * @param bufferId      the id of the buffer being sent.
+  * @param receiver      the connection id of the receiver.
+  * @param transactionId receives the id generated for this transfer.
+  * @param timestampMs   receives the time at which the message was posted.
+  *
+  * @return whether the status message was posted.
+  */
+ bool BufferPoolClient::Impl::postSend(
+         BufferId bufferId, ConnectionId receiver,
+         TransactionId *transactionId, int64_t *timestampMs) {
+     {
+         // TODO: don't need to call syncReleased every time
+         std::lock_guard<std::mutex> lock(mCache.mLock);
+         syncReleased();
+     }
+     bool ret = false;
+     bool needsSync = false;
+     {
+         std::lock_guard<std::mutex> lock(mReleasing.mLock);
+         *timestampMs = ::android::elapsedRealtime();
+         // Shift in the unsigned domain: left-shifting the signed connection
+         // id by 32 bits overflows, which is undefined before C++20.
+         *transactionId =
+                 (static_cast<uint64_t>(mConnectionId) << 32) | mSeqId++;
+         // TODO: retry, add timeout, target?
+         ret = mReleasing.mStatusChannel->postBufferStatusMessage(
+                 *transactionId, bufferId, BufferStatus::TRANSFER_TO, mConnectionId,
+                 receiver, mReleasing.mReleasingIds, mReleasing.mReleasedIds);
+         needsSync = !mLocal && mReleasing.mStatusChannel->needsSync();
+     }
+     if (mValid && mLocal && mLocalConnection) {
+         mLocalConnection->cleanUp(false);
+     }
+     if (needsSync && mRemoteConnection) {
+         trySyncFromRemote();
+     }
+     return ret;
+ }
+
+ /**
+  * Posts a TRANSFER_FROM status message, retrying up to kPostMaxRetry times.
+  *
+  * @param bufferId      the id of the buffer being received.
+  * @param transactionId the id of the transfer.
+  * @param timestampMs   deadline for posting; 0 disables the deadline.
+  *
+  * @return true once the message is posted; false when the deadline has
+  *         passed (a TRANSFER_TIMEOUT message is posted instead) or all
+  *         attempts failed.
+  */
+ bool BufferPoolClient::Impl::postReceive(
+         BufferId bufferId, TransactionId transactionId, int64_t timestampMs) {
+     for (int attempt = 0; attempt < kPostMaxRetry; ++attempt) {
+         std::unique_lock<std::mutex> guard(mReleasing.mLock);
+         const int64_t nowMs = ::android::elapsedRealtime();
+         const bool deadlinePassed = (timestampMs != 0) && (nowMs >= timestampMs);
+         if (deadlinePassed) {
+             mReleasing.mStatusChannel->postBufferStatusMessage(
+                     transactionId, bufferId, BufferStatus::TRANSFER_TIMEOUT,
+                     mConnectionId, -1, mReleasing.mReleasingIds,
+                     mReleasing.mReleasedIds);
+             return false;
+         }
+         const bool posted = mReleasing.mStatusChannel->postBufferStatusMessage(
+                 transactionId, bufferId, BufferStatus::TRANSFER_FROM,
+                 mConnectionId, -1, mReleasing.mReleasingIds,
+                 mReleasing.mReleasedIds);
+         if (posted) {
+             return true;
+         }
+         // Give other threads a chance before the next attempt.
+         guard.unlock();
+         std::this_thread::yield();
+     }
+     return false;
+ }
+
+ /**
+  * Posts the outcome of a receive as TRANSFER_OK or TRANSFER_ERROR.
+  *
+  * @param bufferId      the id of the buffer.
+  * @param transactionId the id of the transfer.
+  * @param result        whether the buffer was received.
+  * @param needsSync     set to true for a remote client whose status
+  *                      channel reports that a sync is needed.
+  *
+  * @return whether the status message was posted.
+  */
+ bool BufferPoolClient::Impl::postReceiveResult(
+         BufferId bufferId, TransactionId transactionId, bool result, bool *needsSync) {
+     std::lock_guard<std::mutex> guard(mReleasing.mLock);
+     const auto outcome =
+             result ? BufferStatus::TRANSFER_OK : BufferStatus::TRANSFER_ERROR;
+     // TODO: retry, add timeout
+     const bool posted = mReleasing.mStatusChannel->postBufferStatusMessage(
+             transactionId, bufferId, outcome,
+             mConnectionId, -1, mReleasing.mReleasingIds,
+             mReleasing.mReleasedIds);
+     *needsSync = !mLocal && mReleasing.mStatusChannel->needsSync();
+     return posted;
+ }
+
+ /**
+  * Asks the remote connection to sync, on a best-effort basis.
+  *
+  * Does nothing when there is no remote connection, when another thread
+  * already holds mRemoteSyncLock, or when the status channel no longer
+  * reports that a sync is needed.
+  */
+ void BufferPoolClient::Impl::trySyncFromRemote() {
+     if (!mRemoteConnection) {
+         return;
+     }
+     // RAII guard: the lock is released on every exit path.
+     std::unique_lock<std::mutex> syncGuard(mRemoteSyncLock, std::try_to_lock);
+     if (!syncGuard.owns_lock()) {
+         return;
+     }
+     bool needsSync = false;
+     {
+         std::lock_guard<std::mutex> lock(mReleasing.mLock);
+         needsSync = mReleasing.mStatusChannel->needsSync();
+     }
+     if (needsSync) {
+         if (!mRemoteConnection->sync().isOk()) {
+             ALOGD("sync from client %lld failed: bufferpool process died.",
+                   (long long)mConnectionId);
+         }
+     }
+ }
+
+ // Caller must hold mCache.mLock. Posts pending releases, applies released ids and queued invalidations to the cache, and posts an invalidation ACK when one is outstanding.
+ bool BufferPoolClient::Impl::syncReleased(uint32_t messageId) { // returns true when at least one released id was processed
+ bool cleared = false;
+ {
+ std::lock_guard<std::mutex> lock(mReleasing.mLock);
+ if (mReleasing.mReleasingIds.size() > 0) {
+ mReleasing.mStatusChannel->postBufferRelease( // presumably moves the posted ids into mReleasedIds; confirm in BufferStatusChannel
+ mConnectionId, mReleasing.mReleasingIds,
+ mReleasing.mReleasedIds);
+ }
+ if (mReleasing.mReleasedIds.size() > 0) {
+ for (BufferId& id: mReleasing.mReleasedIds) {
+ ALOGV("client release buffer %lld - %u", (long long)mConnectionId, id);
+ auto found = mCache.mBuffers.find(id);
+ if (found != mCache.mBuffers.end()) {
+ if (found->second->onCacheRelease()) { // marks the entry idle and restarts its expiry timer
+ mCache.decActive_l();
+ } else {
+ // should not happen!
+ ALOGW("client %lld cache release status inconsistent!",
+ (long long)mConnectionId);
+ }
+ } else {
+ // should not happen!
+ ALOGW("client %lld cache status inconsistent!", (long long)mConnectionId);
+ }
+ }
+ mReleasing.mReleasedIds.clear();
+ cleared = true;
+ }
+ }
+ std::vector<BufferInvalidationMessage> invalidations;
+ mInvalidationListener->getInvalidations(invalidations); // read outside of mReleasing.mLock
+ uint32_t lastMsgId = 0;
+ if (invalidations.size() > 0) {
+ for (auto it = invalidations.begin(); it != invalidations.end(); ++it) {
+ if (it->messageId != 0) {
+ lastMsgId = it->messageId; // remember the last non-zero message id seen
+ }
+ if (it->fromBufferId == it->toBufferId) {
+ // TODO: handle fromBufferId = UINT32_MAX
+ invalidateBuffer(it->fromBufferId);
+ } else {
+ invalidateRange(it->fromBufferId, it->toBufferId);
+ }
+ }
+ }
+ {
+ std::lock_guard<std::mutex> lock(mReleasing.mLock);
+ if (lastMsgId != 0) {
+ if (isMessageLater(lastMsgId, mReleasing.mInvalidateId)) {
+ mReleasing.mInvalidateId = lastMsgId;
+ mReleasing.mInvalidateAck = false; // an ACK for the newer id still has to be posted
+ }
+ } else if (messageId != 0) {
+ // messages are drained.
+ if (isMessageLater(messageId, mReleasing.mInvalidateId)) {
+ mReleasing.mInvalidateId = messageId;
+ mReleasing.mInvalidateAck = true; // no ACK is posted for this id
+ }
+ }
+ if (!mReleasing.mInvalidateAck) {
+ // post ACK
+ mReleasing.mStatusChannel->postBufferInvalidateAck(
+ mConnectionId,
+ mReleasing.mInvalidateId, &mReleasing.mInvalidateAck);
+ ALOGV("client %lld invalidateion ack (%d) %u",
+ (long long)mConnectionId,
+ mReleasing.mInvalidateAck, mReleasing.mInvalidateId);
+ }
+ }
+ return cleared;
+ }
+
+ // should have mCache.mLock
+ /**
+  * Drops idle cached buffers.
+  *
+  * A pass runs when kCacheTtlMs has elapsed since the last pass, when
+  * clearCache is set, or when the idle buffer count exceeds
+  * kMaxCachedBufferCount. During a pass an idle buffer is dropped when it
+  * has expired, when clearCache is set, or while the idle count is above
+  * kCachedBufferCountTarget.
+  *
+  * @param clearCache when true, every idle buffer is dropped.
+  */
+ void BufferPoolClient::Impl::evictCaches(bool clearCache) {
+     const int64_t nowMs = ::android::elapsedRealtime();
+     const bool passDue = nowMs >= mLastEvictCacheMs + kCacheTtlMs ||
+             clearCache || mCache.cachedBufferCount() > kMaxCachedBufferCount;
+     if (!passDue) {
+         return;
+     }
+     size_t evicted = 0;
+     auto cursor = mCache.mBuffers.begin();
+     while (cursor != mCache.mBuffers.end()) {
+         // The idle count is re-read on every step because it shrinks as
+         // entries are erased.
+         const bool drop = !cursor->second->hasCache() &&
+                 (cursor->second->expire() || clearCache ||
+                  mCache.cachedBufferCount() > kCachedBufferCountTarget);
+         if (drop) {
+             cursor = mCache.mBuffers.erase(cursor);
+             ++evicted;
+             continue;
+         }
+         ++cursor;
+     }
+     ALOGV("cache count %lld : total %zu, active %d, evicted %zu",
+           (long long)mConnectionId, mCache.mBuffers.size(), mCache.mActive, evicted);
+     mLastEvictCacheMs = nowMs;
+ }
+
+ // should have mCache.mLock
+ /**
+  * Removes a single idle buffer from the client cache.
+  *
+  * A buffer with a handed-out reference is left in place and a warning is
+  * logged.
+  *
+  * @param id the id of the buffer to invalidate.
+  */
+ void BufferPoolClient::Impl::invalidateBuffer(BufferId id) {
+     // Entries are inserted under their own buffer id, so a keyed lookup
+     // finds the same entry the former linear scan did.
+     auto found = mCache.mBuffers.find(id);
+     if (found == mCache.mBuffers.end()) {
+         return;
+     }
+     if (!found->second->hasCache()) {
+         mCache.mBuffers.erase(found);
+         ALOGV("cache invalidated %lld : buffer %u",
+               (long long)mConnectionId, id);
+     } else {
+         ALOGW("Inconsistent invalidation %lld : activer buffer!! %u",
+               (long long)mConnectionId, (unsigned int)id);
+     }
+ }
+
+ // should have mCache.mLock
+ /**
+  * Removes every idle buffer whose id lies in [from, to) from the client
+  * cache. When from is not smaller than to, the range wraps around: ids at
+  * or above from, or below to, match.
+  *
+  * @param from the first id of the range (inclusive).
+  * @param to   the end of the range (exclusive).
+  */
+ void BufferPoolClient::Impl::invalidateRange(BufferId from, BufferId to) {
+     size_t invalidated = 0;
+     auto cursor = mCache.mBuffers.begin();
+     while (cursor != mCache.mBuffers.end()) {
+         bool inRange = false;
+         if (!cursor->second->hasCache()) {
+             const BufferId bid = cursor->second->id();
+             inRange = (from < to) ? (from <= bid && bid < to)
+                                   : (from <= bid || bid < to);
+         }
+         if (inRange) {
+             ++invalidated;
+             cursor = mCache.mBuffers.erase(cursor);
+         } else {
+             ++cursor;
+         }
+     }
+     ALOGV("cache invalidated %lld : # of invalidated %zu",
+           (long long)mConnectionId, invalidated);
+ }
+
+ /**
+  * Allocates a buffer through the local connection and clones its handle.
+  *
+  * @param params   the allocation parameters.
+  * @param bufferId receives the id of the allocated buffer.
+  * @param handle   receives a clone of the allocated native handle on
+  *                 success; it is also read for logging, so the caller
+  *                 must initialize it.
+  *
+  * @return CRITICAL_ERROR without a local connection, otherwise the status
+  *         of the connection's allocate().
+  */
+ BufferPoolStatus BufferPoolClient::Impl::allocateBufferHandle(
+         const std::vector<uint8_t>& params, BufferId *bufferId,
+         native_handle_t** handle) {
+     if (!mLocalConnection) {
+         return ResultStatus::CRITICAL_ERROR;
+     }
+     const native_handle_t* poolHandle = nullptr;
+     const BufferPoolStatus result = mLocalConnection->allocate(
+             params, bufferId, &poolHandle);
+     const bool succeeded = (result == ResultStatus::OK);
+     if (succeeded) {
+         *handle = native_handle_clone(poolHandle);
+     }
+     ALOGV("client allocate result %lld %d : %u clone %p",
+           (long long)mConnectionId, succeeded,
+           *handle ? *bufferId : 0 , *handle);
+     return result;
+ }
+
+ /**
+  * Fetches the native handle of a buffer through the local or the remote
+  * connection.
+  *
+  * @param transactionId the id of the transfer.
+  * @param bufferId      the id of the buffer.
+  * @param handle        receives a duplicate of the fetched handle.
+  *
+  * @return OK when the connection returned a buffer.
+  *         The service-specific error of a failed call, or CRITICAL_ERROR
+  *         when the call fails without one.
+  *         CRITICAL_ERROR when there is no connection or the call returned
+  *         no result entry.
+  *         Otherwise the failure status carried in the result.
+  */
+ BufferPoolStatus BufferPoolClient::Impl::fetchBufferHandle(
+         TransactionId transactionId, BufferId bufferId,
+         native_handle_t **handle) {
+     std::shared_ptr<IConnection> connection;
+     if (mLocal) {
+         connection = mLocalConnection;
+     } else {
+         connection = mRemoteConnection;
+     }
+     if (!connection) {
+         return ResultStatus::CRITICAL_ERROR;
+     }
+     std::vector<FetchInfo> infos;
+     std::vector<FetchResult> results;
+     infos.emplace_back(FetchInfo{ToAidl(transactionId), ToAidl(bufferId)});
+     ndk::ScopedAStatus status = connection->fetch(infos, &results);
+     if (!status.isOk()) {
+         BufferPoolStatus svcSpecific = status.getServiceSpecificError();
+         return svcSpecific ? svcSpecific : ResultStatus::CRITICAL_ERROR;
+     }
+     if (results.empty()) {
+         // One info was sent, so one result is expected; never index into an
+         // empty reply.
+         return ResultStatus::CRITICAL_ERROR;
+     }
+     const FetchResult &fetched = results.front();
+     if (fetched.getTag() == FetchResult::buffer) {
+         *handle = ::android::dupFromAidl(fetched.get<FetchResult::buffer>().buffer);
+         return ResultStatus::OK;
+     }
+     return fetched.get<FetchResult::failure>();
+ }
+
+
+ /** Creates the implementation for a local buffer pool. */
+ BufferPoolClient::BufferPoolClient(const std::shared_ptr<Accessor> &accessor,
+                                    const std::shared_ptr<IObserver> &observer)
+         : mImpl(std::make_shared<Impl>(accessor, observer)) {}
+
+ /** Creates the implementation for a remote buffer pool. */
+ BufferPoolClient::BufferPoolClient(const std::shared_ptr<IAccessor> &accessor,
+                                    const std::shared_ptr<IObserver> &observer)
+         : mImpl(std::make_shared<Impl>(accessor, observer)) {}
+
+ BufferPoolClient::~BufferPoolClient() {
+ // TODO: how to handle orphaned buffers? Nothing is released explicitly here; buffer deleters hold only a weak reference to the implementation.
+ }
+
+ /** @return whether an implementation exists and reports itself valid. */
+ bool BufferPoolClient::isValid() {
+     if (!mImpl) {
+         return false;
+     }
+     return mImpl->isValid();
+ }
+
+ /** @return whether an implementation exists and is a valid local client. */
+ bool BufferPoolClient::isLocal() {
+     if (!mImpl) {
+         return false;
+     }
+     return mImpl->isLocal();
+ }
+
+ /**
+  * Forwards to the implementation when the client is valid.
+  *
+  * @param lastTransactionMs set to 0 when the client is not valid.
+  * @param clearCache        forwarded to the implementation.
+  * @return false when the client is not valid.
+  */
+ bool BufferPoolClient::isActive(int64_t *lastTransactionMs, bool clearCache) {
+     if (isValid()) {
+         return mImpl->isActive(lastTransactionMs, clearCache);
+     }
+     *lastTransactionMs = 0;
+     return false;
+ }
+
+ /** @return the connection id, or -1 when the client is not valid. */
+ ConnectionId BufferPoolClient::getConnectionId() {
+     if (!isValid()) {
+         return -1;
+     }
+     return mImpl->getConnectionId();
+ }
+
+ /**
+  * Copies the accessor of the connected buffer pool.
+  *
+  * @param accessor receives the accessor; untouched on failure.
+  * @return OK, or CRITICAL_ERROR when the client is not valid.
+  */
+ BufferPoolStatus BufferPoolClient::getAccessor(std::shared_ptr<IAccessor> *accessor) {
+     if (!isValid()) {
+         return ResultStatus::CRITICAL_ERROR;
+     }
+     *accessor = mImpl->getAccessor();
+     return ResultStatus::OK;
+ }
+
+ /** Logs the message id and forwards it to a valid implementation. */
+ void BufferPoolClient::receiveInvalidation(uint32_t msgId) {
+     ALOGV("bufferpool2 client recv inv %u", msgId);
+     if (!isValid()) {
+         return;
+     }
+     mImpl->receiveInvalidation(msgId);
+ }
+
+ /** @return the implementation's flush() result, or CRITICAL_ERROR when the client is not valid. */
+ BufferPoolStatus BufferPoolClient::flush() {
+     if (!isValid()) {
+         return ResultStatus::CRITICAL_ERROR;
+     }
+     return mImpl->flush();
+ }
+
+ /**
+  * Allocates a buffer through the implementation.
+  *
+  * @param params the allocation parameters.
+  * @param handle receives the native handle of the allocated buffer.
+  * @param buffer receives the reference to the allocated buffer.
+  * @return the implementation's result, or CRITICAL_ERROR when the client
+  *         is not valid.
+  */
+ BufferPoolStatus BufferPoolClient::allocate(
+         const std::vector<uint8_t> &params,
+         native_handle_t **handle,
+         std::shared_ptr<BufferPoolData> *buffer) {
+     if (isValid()) {
+         return mImpl->allocate(params, handle, buffer);
+     }
+     return ResultStatus::CRITICAL_ERROR;
+ }
+
+ /**
+  * Receives a buffer through the implementation.
+  *
+  * @return the implementation's result, or CRITICAL_ERROR when the client
+  *         is not valid.
+  */
+ BufferPoolStatus BufferPoolClient::receive(
+         TransactionId transactionId, BufferId bufferId, int64_t timestampMs,
+         native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) {
+     if (!isValid()) {
+         return ResultStatus::CRITICAL_ERROR;
+     }
+     return mImpl->receive(transactionId, bufferId, timestampMs, handle, buffer);
+ }
+
+ /**
+  * Announces that a buffer is being sent to another connection.
+  *
+  * @param receiverId    the connection id of the receiver.
+  * @param buffer        the buffer to send.
+  * @param transactionId receives the id generated for this transfer.
+  * @param timestampMs   receives the time at which the message was posted.
+  * @return OK when the message was posted; CRITICAL_ERROR when the client
+  *         is not valid, the buffer is null, or posting failed.
+  */
+ BufferPoolStatus BufferPoolClient::postSend(
+         ConnectionId receiverId,
+         const std::shared_ptr<BufferPoolData> &buffer,
+         TransactionId *transactionId,
+         int64_t *timestampMs) {
+     // A null buffer has no id to post; fail instead of dereferencing it.
+     if (isValid() && buffer) {
+         bool result = mImpl->postSend(
+                 buffer->mId, receiverId, transactionId, timestampMs);
+         return result ? ResultStatus::OK : ResultStatus::CRITICAL_ERROR;
+     }
+     return ResultStatus::CRITICAL_ERROR;
+ }
+
+} // namespace aidl::android::hardware::media::bufferpool2::implementation
diff --git a/media/bufferpool/aidl/default/BufferPoolClient.h b/media/bufferpool/aidl/default/BufferPoolClient.h
new file mode 100644
index 0000000..80fd43e
--- /dev/null
+++ b/media/bufferpool/aidl/default/BufferPoolClient.h
@@ -0,0 +1,94 @@
+/*
+ * Copyright (C) 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <aidl/android/hardware/media/bufferpool2/IAccessor.h>
+#include <aidl/android/hardware/media/bufferpool2/IObserver.h>
+#include <bufferpool2/BufferPoolTypes.h>
+
+namespace aidl::android::hardware::media::bufferpool2::implementation {
+
+using aidl::android::hardware::media::bufferpool2::IAccessor;
+using aidl::android::hardware::media::bufferpool2::IObserver;
+
+struct Accessor;
+
+/**
+ * A buffer pool client for a buffer pool. For a specific buffer pool, at most
+ * one buffer pool client exists per process. This class will not be exposed
+ * outside. A buffer pool client will be used via ClientManager.
+ */
+class BufferPoolClient {
+public:
+ /**
+ * Creates a buffer pool client from a local buffer pool
+ * (via ClientManager#create).
+ */
+ explicit BufferPoolClient(const std::shared_ptr<Accessor> &accessor,
+ const std::shared_ptr<IObserver> &observer);
+
+ /**
+ * Creates a buffer pool client from a remote buffer pool
+ * (via ClientManager#registerSender).
+ * Note: A buffer pool client created with remote buffer pool cannot
+ * allocate a buffer.
+ */
+ explicit BufferPoolClient(const std::shared_ptr<IAccessor> &accessor,
+ const std::shared_ptr<IObserver> &observer);
+
+ /** Destructs a buffer pool client. */
+ ~BufferPoolClient();
+
+private:
+ bool isValid();
+
+ bool isLocal();
+
+ bool isActive(int64_t *lastTransactionMs, bool clearCache);
+
+ ConnectionId getConnectionId();
+
+ BufferPoolStatus getAccessor(std::shared_ptr<IAccessor> *accessor);
+
+ void receiveInvalidation(uint32_t msgId);
+
+ BufferPoolStatus flush();
+
+ BufferPoolStatus allocate(const std::vector<uint8_t> ¶ms,
+ native_handle_t **handle,
+ std::shared_ptr<BufferPoolData> *buffer);
+
+ BufferPoolStatus receive(TransactionId transactionId,
+ BufferId bufferId,
+ int64_t timestampMs,
+ native_handle_t **handle,
+ std::shared_ptr<BufferPoolData> *buffer);
+
+ BufferPoolStatus postSend(ConnectionId receiver,
+ const std::shared_ptr<BufferPoolData> &buffer,
+ TransactionId *transactionId,
+ int64_t *timestampMs);
+
+ class Impl;
+ std::shared_ptr<Impl> mImpl;
+
+ friend struct ClientManager;
+ friend struct Observer;
+};
+
+} // namespace aidl::android::hardware::media::bufferpool2::implementation
diff --git a/media/bufferpool/aidl/default/BufferStatus.cpp b/media/bufferpool/aidl/default/BufferStatus.cpp
new file mode 100644
index 0000000..19caa1e
--- /dev/null
+++ b/media/bufferpool/aidl/default/BufferStatus.cpp
@@ -0,0 +1,281 @@
+/*
+ * Copyright (C) 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define LOG_TAG "AidlBufferPoolStatus"
+//#define LOG_NDEBUG 0
+
+#include <thread>
+#include <time.h>
+#include <aidl/android/hardware/media/bufferpool2/BufferStatus.h>
+#include "BufferStatus.h"
+
+namespace aidl::android::hardware::media::bufferpool2::implementation {
+
+using aidl::android::hardware::media::bufferpool2::BufferStatus;
+
+bool isMessageLater(uint32_t curMsgId, uint32_t prevMsgId) {  // wrap-around aware "cur is newer"
+    return (curMsgId - prevMsgId) != 0 && (curMsgId - prevMsgId) < (prevMsgId - curMsgId);
+}
+
+bool isBufferInRange(BufferId from, BufferId to, BufferId bufferId) {
+    // Membership in the half-open interval [from, to) over ids that may wrap around.
+    const bool atOrAfterStart = from <= bufferId;
+    const bool beforeEnd = bufferId < to;
+    // When from >= to the interval wraps, so satisfying either bound is enough.
+    return (from < to) ? (atOrAfterStart && beforeEnd) : (atOrAfterStart || beforeEnd);
+}
+
+static constexpr int kNumElementsInQueue = 1024*16;  // element count requested for every FMQ created in this file
+static constexpr int kMinElementsToSyncInQueue = 128;  // needsSync() turns true once free write slots < capacity - this
+
+BufferPoolStatus BufferStatusObserver::open(
+        ConnectionId id, StatusDescriptor* fmqDescPtr) {
+    // Each connection owns exactly one status queue; a duplicate id is treated as fatal.
+    if (mBufferStatusQueues.count(id) != 0) {
+        ALOGE("connection id collision %lld", (unsigned long long)id);
+        return ResultStatus::CRITICAL_ERROR;
+    }
+    std::unique_ptr<BufferStatusQueue> statusQueue =
+            std::make_unique<BufferStatusQueue>(kNumElementsInQueue);
+    if (!statusQueue || !statusQueue->isValid()) {
+        return ResultStatus::NO_MEMORY;
+    }
+    *fmqDescPtr = statusQueue->dupeDesc();  // taken before the queue is moved into the map
+    if (!mBufferStatusQueues.insert(std::make_pair(id, std::move(statusQueue))).second) {
+        return ResultStatus::NO_MEMORY;
+    }
+    return ResultStatus::OK;
+}
+
+BufferPoolStatus BufferStatusObserver::close(ConnectionId id) {
+    // erase() reports how many entries were removed; zero means the id was not open.
+    if (mBufferStatusQueues.erase(id) == 0) {
+        return ResultStatus::CRITICAL_ERROR;
+    }
+    return ResultStatus::OK;
+}
+
+void BufferStatusObserver::getBufferStatusChanges(std::vector<BufferStatusMessage> &messages) {
+    // Drain every per-connection queue, stamping each message with its source connection.
+    for (auto &entry : mBufferStatusQueues) {
+        const ConnectionId sourceId = entry.first;
+        BufferStatusQueue &queue = *entry.second;
+        BufferStatusMessage message;
+        for (size_t remaining = queue.availableToRead(); remaining > 0; --remaining) {
+            if (!queue.read(&message, 1)) {
+                // The readable count was taken just above, so a failed read is unexpected.
+                // TODO: error handling (spurious client?)
+                ALOGW("FMQ message cannot be read from %lld", (long long)sourceId);
+                return;
+            }
+            message.connectionId = sourceId;
+            messages.push_back(message);
+        }
+    }
+}
+
+BufferStatusChannel::BufferStatusChannel(
+        const StatusDescriptor &fmqDesc) {
+    // Attach to the queue described by fmqDesc; keep it only when it reports itself valid.
+    std::unique_ptr<BufferStatusQueue> attached =
+            std::make_unique<BufferStatusQueue>(fmqDesc);
+    mValid = attached && attached->isValid();
+    if (mValid) {
+        mBufferStatusQueue = std::move(attached);
+    }
+}
+
+bool BufferStatusChannel::isValid() {  // reports the flag the constructor computed when attaching the queue
+ return mValid;
+}
+
+bool BufferStatusChannel::needsSync() {  // true once free write space runs low on a valid queue
+    if (!mValid) {
+        return false;
+    }
+    const size_t freeSlots = mBufferStatusQueue->availableToWrite();
+    return freeSlots + kMinElementsToSyncInQueue < kNumElementsInQueue;
+}
+
+void BufferStatusChannel::postBufferRelease(
+        ConnectionId connectionId,
+        std::list<BufferId> &pending, std::list<BufferId> &posted) {
+    if (!mValid || pending.empty()) {
+        return;
+    }
+    // Post as many releases as the queue has room for; the rest stay in |pending|.
+    const size_t batch = std::min(mBufferStatusQueue->availableToWrite(), pending.size());
+    BufferStatusMessage release;
+    release.status = BufferStatus::NOT_USED;
+    release.connectionId = connectionId;
+    for (size_t sent = 0; sent < batch; ++sent) {
+        const BufferId releasedId = pending.front();
+        release.bufferId = releasedId;
+        if (!mBufferStatusQueue->write(&release, 1)) {
+            // Room was verified above, so this is not expected.
+            // TODO: error handling?
+            ALOGW("FMQ message cannot be sent from %lld", (long long)connectionId);
+            return;
+        }
+        pending.pop_front();
+        posted.push_back(releasedId);
+    }
+}
+
+void BufferStatusChannel::postBufferInvalidateAck(
+        ConnectionId connectionId,
+        uint32_t invalidateId,
+        bool *invalidated) {
+    // Nothing to do without a usable queue or when the ack was already delivered.
+    if (!mValid || *invalidated) {
+        return;
+    }
+    if (mBufferStatusQueue->availableToWrite() == 0) {
+        return;  // No room right now; *invalidated stays false.
+    }
+    BufferStatusMessage ack;
+    ack.status = BufferStatus::INVALIDATION_ACK;
+    ack.bufferId = invalidateId;
+    ack.connectionId = connectionId;
+    if (!mBufferStatusQueue->write(&ack, 1)) {
+        // Room was verified above, so this is not expected. TODO: error handling?
+        ALOGW("FMQ message cannot be sent from %lld", (long long)connectionId);
+        return;
+    }
+    *invalidated = true;
+}
+
+bool BufferStatusChannel::postBufferStatusMessage(
+        TransactionId transactionId, BufferId bufferId,
+        BufferStatus status, ConnectionId connectionId, ConnectionId targetId,
+        std::list<BufferId> &pending, std::list<BufferId> &posted) {
+    if (!mValid) {
+        return false;
+    }
+    // All pending releases plus the status message itself must fit, or nothing is written.
+    const size_t numPending = pending.size();
+    if (mBufferStatusQueue->availableToWrite() < numPending + 1) {
+        return false;
+    }
+    BufferStatusMessage release;
+    release.status = BufferStatus::NOT_USED;
+    release.connectionId = connectionId;
+    for (size_t sent = 0; sent < numPending; ++sent) {
+        const BufferId releasedId = pending.front();
+        release.bufferId = releasedId;
+        if (!mBufferStatusQueue->write(&release, 1)) {
+            // Room was verified above, so this is not expected.
+            // TODO: error handling?
+            ALOGW("FMQ message cannot be sent from %lld", (long long)connectionId);
+            return false;
+        }
+        pending.pop_front();
+        posted.push_back(releasedId);
+    }
+    BufferStatusMessage message;
+    message.transactionId = transactionId;
+    message.bufferId = bufferId;
+    message.status = status;
+    message.connectionId = connectionId;
+    message.targetConnectionId = targetId;
+    // TODO: timestamp
+    message.timestampUs = 0;
+    if (!mBufferStatusQueue->write(&message, 1)) {
+        // Room was verified above, so this is not expected.
+        ALOGW("FMQ message cannot be sent from %lld", (long long)connectionId);
+        return false;
+    }
+    return true;
+}
+
+BufferInvalidationListener::BufferInvalidationListener(
+        const InvalidationDescriptor &fmqDesc) {
+    auto attached = std::make_unique<BufferInvalidationQueue>(fmqDesc);
+    mValid = attached && attached->isValid();
+    if (!mValid) {
+        return;
+    }
+    mBufferInvalidationQueue = std::move(attached);
+    // Drain messages that were already queued when this listener attached.
+    // The result of the read is intentionally not inspected.
+    const size_t stale = std::min(
+            mBufferInvalidationQueue->availableToRead(), (size_t) kNumElementsInQueue);
+    std::vector<BufferInvalidationMessage> discarded(stale);
+    if (stale == 0) {
+        return;
+    }
+    mBufferInvalidationQueue->read(discarded.data(), stale);
+}
+
+void BufferInvalidationListener::getInvalidations(
+        std::vector<BufferInvalidationMessage> &messages) {
+    // Appends readable invalidation messages to |messages|. A listener whose
+    // constructor failed to attach holds no queue, so there is nothing to read.
+    if (!mValid) {
+        return;
+    }
+    // Try twice in case of overflow: a failed read is retried once.
+    // TODO: handling overflow though it may not happen.
+    for (int attempt = 0; attempt < 2; ++attempt) {
+        const size_t avail = std::min(
+                mBufferInvalidationQueue->availableToRead(), (size_t) kNumElementsInQueue);
+        if (avail == 0) {
+            return;
+        }
+        std::vector<BufferInvalidationMessage> batch(avail);
+        if (mBufferInvalidationQueue->read(batch.data(), avail)) {
+            messages.insert(messages.end(), batch.begin(), batch.end());
+            return;
+        }
+    }
+}
+
+bool BufferInvalidationListener::isValid() {  // reports the flag the constructor computed when attaching the queue
+ return mValid;
+}
+
+BufferInvalidationChannel::BufferInvalidationChannel()
+    : mValid(false),
+      mBufferInvalidationQueue(
+              std::make_unique<BufferInvalidationQueue>(kNumElementsInQueue, true)) {
+    // Valid only when the freshly created queue reports itself usable;
+    // a failed creation leaves the channel flagged invalid.
+    mValid = mBufferInvalidationQueue && mBufferInvalidationQueue->isValid();
+}
+
+bool BufferInvalidationChannel::isValid() {  // reports whether the queue created by the constructor is usable
+ return mValid;
+}
+
+void BufferInvalidationChannel::getDesc(InvalidationDescriptor *fmqDescPtr) {
+    if (!mValid) {
+        return;  // TODO: writing invalid descriptor?
+    }
+    *fmqDescPtr = mBufferInvalidationQueue->dupeDesc();
+}
+
+void BufferInvalidationChannel::postInvalidation(
+        uint32_t msgId, BufferId fromId, BufferId toId) {
+    if (!mValid) return;  // Same guard as getDesc(): never touch a queue that failed validation.
+    BufferInvalidationMessage message;
+    message.messageId = msgId;
+    message.fromBufferId = fromId;
+    message.toBufferId = toId;
+    // TODO: handle failure (it does not happen normally.)
+    mBufferInvalidationQueue->write(&message);
+}
+
+} // namespace ::aidl::android::hardware::media::bufferpool2::implementation
+
diff --git a/media/bufferpool/aidl/default/BufferStatus.h b/media/bufferpool/aidl/default/BufferStatus.h
new file mode 100644
index 0000000..3dd92f4
--- /dev/null
+++ b/media/bufferpool/aidl/default/BufferStatus.h
@@ -0,0 +1,211 @@
+/*
+ * Copyright (C) 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <bufferpool2/BufferPoolTypes.h>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <vector>
+#include <list>
+
+namespace aidl::android::hardware::media::bufferpool2::implementation {
+
+bool isMessageLater(uint32_t curMsgId, uint32_t prevMsgId);
+
+bool isBufferInRange(BufferId from, BufferId to, BufferId bufferId);
+
+/**
+ * A collection of buffer status message FMQ for a buffer pool. buffer
+ * ownership/status change messages are sent via the FMQs from the clients.
+ */
+class BufferStatusObserver {
+private:
+ std::map<ConnectionId, std::unique_ptr<BufferStatusQueue>>
+ mBufferStatusQueues;  // one status FMQ per open connection, keyed by connection Id
+
+public:
+ /** Creates a buffer status message FMQ for the specified
+ * connection(client).
+ *
+ * @param id connection Id of the specified client.
+ * @param fmqDescPtr ptr of created FMQ's descriptor.
+ *
+ * @return OK if FMQ is created successfully.
+ * NO_MEMORY when the FMQ cannot be created or stored.
+ * CRITICAL_ERROR when the connection Id is already in use.
+ */
+ BufferPoolStatus open(ConnectionId id, StatusDescriptor* _Nonnull fmqDescPtr);
+
+ /** Closes a buffer status message FMQ for the specified
+ * connection(client).
+ *
+ * @param id connection Id of the specified client.
+ *
+ * @return OK if the specified connection is closed successfully.
+ * CRITICAL_ERROR when no FMQ is open for the connection Id.
+ */
+ BufferPoolStatus close(ConnectionId id);
+
+ /** Retrieves all pending FMQ buffer status messages from clients.
+ *
+ * @param messages retrieved pending messages are appended to this vector.
+ */
+ void getBufferStatusChanges(std::vector<BufferStatusMessage> &messages);
+};
+
+/**
+ * A buffer status message FMQ for a buffer pool client. Buffer ownership/status
+ * change messages are sent via the fmq to the buffer pool.
+ */
+class BufferStatusChannel {
+private:
+ bool mValid;  // set once by the constructor
+ std::unique_ptr<BufferStatusQueue> mBufferStatusQueue;  // stays empty when attaching failed
+
+public:
+ /**
+ * Connects to a buffer status message FMQ from a descriptor of
+ * the created FMQ.
+ *
+ * @param fmqDesc Descriptor of the created FMQ.
+ */
+ BufferStatusChannel(const StatusDescriptor &fmqDesc);
+
+ /** Returns whether the FMQ is connected successfully. */
+ bool isValid();
+
+ /** Returns whether the FMQ is running low on write space and should be synced by the buffer pool. */
+ bool needsSync();
+
+ /**
+ * Posts buffer release messages to the buffer pool, as many as currently fit in the FMQ.
+ *
+ * @param connectionId connection Id of the client.
+ * @param pending buffer Ids waiting to be released; posted ones are removed.
+ * @param posted buffer Ids whose release was posted are appended here.
+ */
+ void postBufferRelease(
+ ConnectionId connectionId,
+ std::list<BufferId> &pending, std::list<BufferId> &posted);
+
+ /**
+ * Posts a buffer status message regarding the specified buffer
+ * transfer transaction.
+ *
+ * @param transactionId Id of the specified transaction.
+ * @param bufferId buffer Id of the specified transaction.
+ * @param status new status of the buffer.
+ * @param connectionId connection Id of the client.
+ * @param targetId connection Id of the receiver(only when the sender
+ * posts a status message).
+ * @param pending currently pending buffer release messages.
+ * @param posted posted buffer release messages.
+ *
+ * @return {@code true} when the specified message is posted,
+ * {@code false} otherwise.
+ */
+ bool postBufferStatusMessage(
+ TransactionId transactionId,
+ BufferId bufferId,
+ BufferStatus status,
+ ConnectionId connectionId,
+ ConnectionId targetId,
+ std::list<BufferId> &pending, std::list<BufferId> &posted);
+
+ /**
+ * Posts a buffer invalidation ack message to the buffer pool.
+ *
+ * @param connectionId connection Id of the client.
+ * @param invalidateId invalidation ack to the buffer pool.
+ * NOTE(review): documented as "not posted when zero", but the
+ * implementation does not check for zero; confirm callers do.
+ * @param invalidated sets {@code true} only when the invalidation ack is
+ * posted; nothing is posted when it is already {@code true}.
+ */
+ void postBufferInvalidateAck(
+ ConnectionId connectionId,
+ uint32_t invalidateId,
+ bool* _Nonnull invalidated);
+};
+
+/**
+ * A buffer invalidation FMQ for a buffer pool client. Buffer invalidation
+ * messages are received via the fmq from the buffer pool. Buffer invalidation
+ * messages are handled as soon as possible.
+ */
+class BufferInvalidationListener {
+private:
+ bool mValid;  // set once by the constructor
+ std::unique_ptr<BufferInvalidationQueue> mBufferInvalidationQueue;  // stays empty when attaching failed
+
+public:
+ /**
+ * Connects to a buffer invalidation FMQ from a descriptor of the created FMQ.
+ * Messages already readable at connection time are drained and discarded.
+ * @param fmqDesc Descriptor of the created FMQ.
+ */
+ BufferInvalidationListener(const InvalidationDescriptor &fmqDesc);
+
+ /** Retrieves all pending buffer invalidation messages from the buffer pool.
+ *
+ * @param messages retrieved pending messages are appended to this vector.
+ */
+ void getInvalidations(std::vector<BufferInvalidationMessage> &messages);
+
+ /** Returns whether the FMQ is connected successfully. */
+ bool isValid();
+};
+
+/**
+ * A buffer invalidation FMQ for a buffer pool. A buffer pool will send buffer
+ * invalidation messages to the clients via the FMQ. The FMQ is shared among
+ * buffer pool clients.
+ */
+class BufferInvalidationChannel {
+private:
+ bool mValid;  // set once by the constructor
+ std::unique_ptr<BufferInvalidationQueue> mBufferInvalidationQueue;
+
+public:
+ /**
+ * Creates a buffer invalidation FMQ for a buffer pool.
+ */
+ BufferInvalidationChannel();
+
+ /** Returns whether the FMQ is created successfully. */
+ bool isValid();
+
+ /**
+ * Retrieves the descriptor of a buffer invalidation FMQ. the descriptor may
+ * be passed to the client for buffer invalidation handling.
+ * The output is left untouched when the channel is not valid.
+ * @param fmqDescPtr ptr of created FMQ's descriptor.
+ */
+ void getDesc(InvalidationDescriptor* _Nonnull fmqDescPtr);
+
+ /** Posts a buffer invalidation for invalidated buffers.
+ *
+ * @param msgId Invalidation message id which is used when clients send
+ * acks back via BufferStatusMessage
+ * @param fromId The start bufferid of the invalidated buffers(inclusive)
+ * @param toId The end bufferId of the invalidated buffers(inclusive) - NOTE(review): isBufferInRange() excludes `to`; confirm
+ */
+ void postInvalidation(uint32_t msgId, BufferId fromId, BufferId toId);
+};
+
+} // namespace aidl::android::hardware::media::bufferpool2::implementation
diff --git a/media/bufferpool/aidl/default/ClientManager.cpp b/media/bufferpool/aidl/default/ClientManager.cpp
new file mode 100644
index 0000000..de1db50
--- /dev/null
+++ b/media/bufferpool/aidl/default/ClientManager.cpp
@@ -0,0 +1,515 @@
+/*
+ * Copyright (C) 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#define LOG_TAG "AidlBufferPoolMgr"
+//#define LOG_NDEBUG 0
+
+#include <aidl/android/hardware/media/bufferpool2/ResultStatus.h>
+#include <bufferpool2/ClientManager.h>
+
+#include <sys/types.h>
+#include <utils/SystemClock.h>
+#include <unistd.h>
+#include <utils/Log.h>
+
+#include <chrono>
+
+#include "BufferPoolClient.h"
+#include "Observer.h"
+#include "Accessor.h"
+
+namespace aidl::android::hardware::media::bufferpool2::implementation {
+
+using namespace std::chrono_literals;
+
+using Registration = aidl::android::hardware::media::bufferpool2::IClientManager::Registration;
+using aidl::android::hardware::media::bufferpool2::ResultStatus;
+
+static constexpr int64_t kRegisterTimeoutMs = 500; // 0.5 sec; overall deadline and per-wait timeout in registerSender()
+static constexpr int64_t kCleanUpDurationMs = 1000; // TODO: 1 sec tune; minimum interval between non-forced cleanUp() passes
+static constexpr int64_t kClientTimeoutMs = 5000; // TODO: 5 secs tune; inactive clients idle longer than this are dropped by cleanUp()
+
+class ClientManager::Impl {
+public:
+    Impl();
+
+    // BnRegisterSender: receiver side, invoked through the binder interface.
+    BufferPoolStatus registerSender(const std::shared_ptr<IAccessor> &accessor,
+                                    Registration *pRegistration);
+
+    // BpRegisterSender: sender side, calls registerSender() on |receiver|.
+    BufferPoolStatus registerSender(const std::shared_ptr<IClientManager> &receiver,
+                                    ConnectionId senderId,
+                                    ConnectionId *receiverId,
+                                    bool *isNew);
+
+    BufferPoolStatus create(const std::shared_ptr<BufferPoolAllocator> &allocator,
+                            ConnectionId *pConnectionId);
+
+    BufferPoolStatus close(ConnectionId connectionId);
+
+    BufferPoolStatus flush(ConnectionId connectionId);
+
+    BufferPoolStatus allocate(ConnectionId connectionId,
+                              const std::vector<uint8_t> &params,
+                              native_handle_t **handle,
+                              std::shared_ptr<BufferPoolData> *buffer);
+
+    BufferPoolStatus receive(ConnectionId connectionId,
+                             TransactionId transactionId,
+                             BufferId bufferId,
+                             int64_t timestampMs,
+                             native_handle_t **handle,
+                             std::shared_ptr<BufferPoolData> *buffer);
+
+    BufferPoolStatus postSend(ConnectionId receiverId,
+                              const std::shared_ptr<BufferPoolData> &buffer,
+                              TransactionId *transactionId,
+                              int64_t *timestampMs);
+
+    BufferPoolStatus getAccessor(ConnectionId connectionId,
+                                 std::shared_ptr<IAccessor> *accessor);
+
+    void cleanUp(bool clearCache = false);
+
+private:
+    // In order to prevent deadlock between multiple locks,
+    // always lock mCache.mMutex before locking mActive.mMutex.
+    struct ClientCache {
+        // This lock is held for brief duration.
+        // Blocking operation is not performed while holding the lock.
+        std::mutex mMutex;
+        std::list<std::pair<const std::weak_ptr<IAccessor>, const std::weak_ptr<BufferPoolClient>>>
+                mClients;
+        std::condition_variable mConnectCv;  // notified when an in-flight connect finishes
+        bool mConnecting;                    // true while one registerSender() builds a client
+        int64_t mLastCleanUpMs;              // elapsedRealtime() of the last cleanUp() pass
+
+        ClientCache() : mConnecting(false), mLastCleanUpMs(::android::elapsedRealtime()) {}
+    } mCache;
+
+    // Active clients which can be retrieved via ConnectionId
+    struct ActiveClients {
+        // This lock is held for brief duration.
+        // Blocking operation is not performed holding the lock.
+        std::mutex mMutex;
+        std::map<ConnectionId, const std::shared_ptr<BufferPoolClient>>
+                mClients;
+    } mActive;
+
+    std::shared_ptr<Observer> mObserver;
+};
+
+ClientManager::Impl::Impl()  // creates the single Observer that is handed to every BufferPoolClient built here
+ : mObserver(::ndk::SharedRefBase::make<Observer>()) {}
+
+BufferPoolStatus ClientManager::Impl::registerSender(  // receiver side: reuse or create the client for |accessor|
+ const std::shared_ptr<IAccessor> &accessor, Registration *pRegistration) {
+ cleanUp();  // non-forced pass; rate limited inside cleanUp()
+ int64_t timeoutMs = ::android::elapsedRealtime() + kRegisterTimeoutMs;  // absolute deadline for the loop below
+ do {
+ std::unique_lock<std::mutex> lock(mCache.mMutex);
+ for (auto it = mCache.mClients.begin(); it != mCache.mClients.end(); ++it) {  // look for a cached client of this accessor
+ std::shared_ptr<IAccessor> sAccessor = it->first.lock();
+ if (sAccessor && sAccessor.get() == accessor.get()) {  // identity is decided by raw pointer equality
+ const std::shared_ptr<BufferPoolClient> client = it->second.lock();
+ if (client) {
+ std::lock_guard<std::mutex> lock(mActive.mMutex);  // shadows the outer lock; cache mutex is already held
+ pRegistration->connectionId = client->getConnectionId();
+ if (mActive.mClients.find(pRegistration->connectionId)
+ != mActive.mClients.end()) {
+ ALOGV("register existing connection %lld",
+ (long long)pRegistration->connectionId);
+ pRegistration->isNew = false;
+ return ResultStatus::OK;
+ }
+ }
+ mCache.mClients.erase(it);  // stale entry: client expired or no longer active; reconnect below
+ break;
+ }
+ }
+ if (!mCache.mConnecting) {  // only one caller at a time constructs a client
+ mCache.mConnecting = true;
+ lock.unlock();  // the client is constructed with the cache lock released
+ BufferPoolStatus result = ResultStatus::OK;
+ const std::shared_ptr<BufferPoolClient> client =
+ std::make_shared<BufferPoolClient>(accessor, mObserver);
+ lock.lock();
+ if (!client) {
+ result = ResultStatus::NO_MEMORY;
+ } else if (!client->isValid()) {
+ result = ResultStatus::CRITICAL_ERROR;
+ }
+ if (result == ResultStatus::OK) {
+ // TODO: handle insert fail. (malloc fail)
+ const std::weak_ptr<BufferPoolClient> wclient = client;
+ mCache.mClients.push_back(std::make_pair(accessor, wclient));
+ ConnectionId conId = client->getConnectionId();
+ mObserver->addClient(conId, wclient);
+ {
+ std::lock_guard<std::mutex> lock(mActive.mMutex);  // the active map keeps the owning reference
+ mActive.mClients.insert(std::make_pair(conId, client));
+ }
+ pRegistration->connectionId = conId;
+ pRegistration->isNew = true;
+ ALOGV("register new connection %lld", (long long)conId);
+ }
+ mCache.mConnecting = false;
+ lock.unlock();
+ mCache.mConnectCv.notify_all();  // wake callers waiting below so they re-scan the cache
+ return result;
+ }
+ mCache.mConnectCv.wait_for(lock, kRegisterTimeoutMs*1ms);  // another caller is connecting; wait, then retry
+ } while (::android::elapsedRealtime() < timeoutMs);
+ // TODO: return timeout error
+ return ResultStatus::CRITICAL_ERROR;  // deadline passed while another caller was still connecting
+}
+
+BufferPoolStatus ClientManager::Impl::registerSender(
+        const std::shared_ptr<IClientManager> &receiver,
+        ConnectionId senderId, ConnectionId *receiverId, bool *isNew) {
+    // Registers the pool behind connection |senderId| with |receiver|; outputs its registration.
+    std::shared_ptr<IAccessor> accessor;
+    bool local = false;
+    {
+        std::lock_guard<std::mutex> lock(mActive.mMutex);
+        auto it = mActive.mClients.find(senderId);
+        if (it == mActive.mClients.end()) {
+            return ResultStatus::NOT_FOUND;
+        }
+        it->second->getAccessor(&accessor);
+        local = it->second->isLocal();
+    }
+    if (!accessor) {
+        return ResultStatus::CRITICAL_ERROR;
+    }
+    Registration registration;
+    ::ndk::ScopedAStatus status = receiver->registerSender(accessor, &registration);
+    if (!status.isOk()) {
+        return ResultStatus::CRITICAL_ERROR;
+    }
+    // Publish the outputs before the death-recipient bookkeeping, which reads *receiverId.
+    *receiverId = registration.connectionId;
+    *isNew = registration.isNew;
+    if (local) {
+        auto recipient = Accessor::getConnectionDeathRecipient();
+        if (recipient) {
+            ALOGV("client death recipient registered %lld", (long long)*receiverId);
+            recipient->addCookieToConnection(receiver->asBinder().get(), *receiverId);
+            AIBinder_linkToDeath(receiver->asBinder().get(), recipient->getRecipient(),
+                                 receiver->asBinder().get());
+        }
+    }
+    return ResultStatus::OK;
+}
+
+BufferPoolStatus ClientManager::Impl::create(
+        const std::shared_ptr<BufferPoolAllocator> &allocator,
+        ConnectionId *pConnectionId) {
+    // A brand-new pool: build its accessor first, then a client bound to that accessor.
+    const std::shared_ptr<Accessor> accessor = ::ndk::SharedRefBase::make<Accessor>(allocator);
+    if (!accessor || !accessor->isValid()) {
+        return ResultStatus::CRITICAL_ERROR;
+    }
+    // TODO: observer is local. use direct call instead of binder call.
+    const std::shared_ptr<BufferPoolClient> poolClient =
+            std::make_shared<BufferPoolClient>(accessor, mObserver);
+    if (!poolClient || !poolClient->isValid()) {
+        return ResultStatus::CRITICAL_ERROR;
+    }
+    // Since a new bufferpool is created, evict memories which are used by
+    // existing bufferpools and clients.
+    cleanUp(true);
+    // TODO: handle insert fail. (malloc fail)
+    // Lock order: the cache mutex first, then the active-clients mutex.
+    std::lock_guard<std::mutex> cacheLock(mCache.mMutex);
+    const std::weak_ptr<BufferPoolClient> weakClient = poolClient;
+    mCache.mClients.push_back(std::make_pair(accessor, weakClient));
+    const ConnectionId newId = poolClient->getConnectionId();
+    mObserver->addClient(newId, weakClient);
+    {
+        std::lock_guard<std::mutex> activeLock(mActive.mMutex);
+        mActive.mClients.insert(std::make_pair(newId, poolClient));
+    }
+    *pConnectionId = newId;
+    ALOGV("create new connection %lld", (long long)*pConnectionId);
+    return ResultStatus::OK;
+}
+
+BufferPoolStatus ClientManager::Impl::close(ConnectionId connectionId) {
+    std::unique_lock<std::mutex> cacheLock(mCache.mMutex);
+    std::unique_lock<std::mutex> activeLock(mActive.mMutex);
+    auto found = mActive.mClients.find(connectionId);
+    if (found == mActive.mClients.end()) {
+        return ResultStatus::NOT_FOUND;
+    }
+    const std::shared_ptr<BufferPoolClient> closing = found->second;  // outlives the map entry
+    std::shared_ptr<IAccessor> accessor;
+    closing->getAccessor(&accessor);
+    mActive.mClients.erase(found);
+    // Drop cache entries whose accessor is gone or is the one being closed.
+    for (auto cached = mCache.mClients.begin(); cached != mCache.mClients.end();) {
+        const std::shared_ptr<IAccessor> cachedAccessor = cached->first.lock();
+        if (!cachedAccessor || (accessor && cachedAccessor.get() == accessor.get())) {
+            cached = mCache.mClients.erase(cached);
+        } else {
+            ++cached;
+        }
+    }
+    activeLock.unlock();
+    cacheLock.unlock();
+    closing->flush();  // performed after both locks have been released
+    return ResultStatus::OK;
+}
+
+BufferPoolStatus ClientManager::Impl::flush(ConnectionId connectionId) {
+    std::shared_ptr<BufferPoolClient> target;
+    {   // Hold the active-clients lock only for the lookup, not for the flush itself.
+        std::lock_guard<std::mutex> guard(mActive.mMutex);
+        const auto found = mActive.mClients.find(connectionId);
+        if (found == mActive.mClients.end()) {
+            return ResultStatus::NOT_FOUND;
+        }
+        target = found->second;
+    }
+    return target->flush();
+}
+
+BufferPoolStatus ClientManager::Impl::allocate(  // allocates through the client registered as connectionId
+        ConnectionId connectionId, const std::vector<uint8_t> &params,
+        native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) {
+    std::shared_ptr<BufferPoolClient> client;
+    {   // Look the client up under the lock; the allocation itself runs unlocked.
+        std::lock_guard<std::mutex> lock(mActive.mMutex);
+        auto it = mActive.mClients.find(connectionId);
+        if (it == mActive.mClients.end()) {
+            return ResultStatus::NOT_FOUND;
+        }
+        client = it->second;
+    }
+#ifdef BUFFERPOOL_CLONE_HANDLES  // callers get a clone of the handle instead of the original
+    native_handle_t *origHandle = nullptr;
+    BufferPoolStatus res = client->allocate(params, &origHandle, buffer);
+    if (res != ResultStatus::OK) {
+        return res;
+    }
+    *handle = native_handle_clone(origHandle);
+    if (*handle == NULL) {  // test the clone result, not the out-parameter pointer itself
+        buffer->reset();
+        return ResultStatus::NO_MEMORY;
+    }
+    return ResultStatus::OK;
+#else
+    return client->allocate(params, handle, buffer);
+#endif
+}
+
+BufferPoolStatus ClientManager::Impl::receive(  // receives through the client registered as connectionId
+        ConnectionId connectionId, TransactionId transactionId,
+        BufferId bufferId, int64_t timestampMs,
+        native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) {
+    std::shared_ptr<BufferPoolClient> client;
+    {   // Look the client up under the lock; the receive itself runs unlocked.
+        std::lock_guard<std::mutex> lock(mActive.mMutex);
+        auto it = mActive.mClients.find(connectionId);
+        if (it == mActive.mClients.end()) {
+            return ResultStatus::NOT_FOUND;
+        }
+        client = it->second;
+    }
+#ifdef BUFFERPOOL_CLONE_HANDLES  // callers get a clone of the handle instead of the original
+    native_handle_t *origHandle = nullptr;
+    BufferPoolStatus res = client->receive(
+            transactionId, bufferId, timestampMs, &origHandle, buffer);
+    if (res != ResultStatus::OK) {
+        return res;
+    }
+    *handle = native_handle_clone(origHandle);
+    if (*handle == NULL) {  // test the clone result, not the out-parameter pointer itself
+        buffer->reset();
+        return ResultStatus::NO_MEMORY;
+    }
+    return ResultStatus::OK;
+#else
+    return client->receive(transactionId, bufferId, timestampMs, handle, buffer);
+#endif
+}
+
+BufferPoolStatus ClientManager::Impl::postSend(
+        ConnectionId receiverId, const std::shared_ptr<BufferPoolData> &buffer,
+        TransactionId *transactionId, int64_t *timestampMs) {
+    const ConnectionId senderId = buffer->mConnectionId;  // the sending client is the buffer's connection
+    std::shared_ptr<BufferPoolClient> sender;
+    {   // Hold the active-clients lock only for the lookup.
+        std::lock_guard<std::mutex> guard(mActive.mMutex);
+        const auto found = mActive.mClients.find(senderId);
+        if (found == mActive.mClients.end()) {
+            return ResultStatus::NOT_FOUND;
+        }
+        sender = found->second;
+    }
+    return sender->postSend(receiverId, buffer, transactionId, timestampMs);
+}
+
+BufferPoolStatus ClientManager::Impl::getAccessor(
+        ConnectionId connectionId, std::shared_ptr<IAccessor> *accessor) {
+    std::shared_ptr<BufferPoolClient> target;
+    {   // Hold the active-clients lock only for the lookup.
+        std::lock_guard<std::mutex> guard(mActive.mMutex);
+        const auto found = mActive.mClients.find(connectionId);
+        if (found == mActive.mClients.end()) {
+            return ResultStatus::NOT_FOUND;
+        }
+        target = found->second;
+    }
+    return target->getAccessor(accessor);
+}
+
+void ClientManager::Impl::cleanUp(bool clearCache) {  // drops idle clients and dead cache entries
+ int64_t now = ::android::elapsedRealtime();  // sampled before either lock is taken
+ int64_t lastTransactionMs;  // NOTE(review): assumed to be written by isActive() even when it returns false; confirm
+ std::lock_guard<std::mutex> lock1(mCache.mMutex);
+ if (clearCache || mCache.mLastCleanUpMs + kCleanUpDurationMs < now) {  // forced, or the cleanup interval elapsed
+ std::lock_guard<std::mutex> lock2(mActive.mMutex);
+ int cleaned = 0;
+ for (auto it = mActive.mClients.begin(); it != mActive.mClients.end();) {
+ if (!it->second->isActive(&lastTransactionMs, clearCache)) {
+ if (lastTransactionMs + kClientTimeoutMs < now) {  // inactive and idle past the client timeout
+ std::shared_ptr<IAccessor> accessor;  // NOTE(review): fetched but not used afterwards
+ it->second->getAccessor(&accessor);
+ it = mActive.mClients.erase(it);
+ ++cleaned;
+ continue;
+ }
+ }
+ ++it;
+ }
+ for (auto cit = mCache.mClients.begin(); cit != mCache.mClients.end();) {
+ // clean up dead client caches
+ std::shared_ptr<IAccessor> cAccessor = cit->first.lock();
+ if (!cAccessor) {
+ cit = mCache.mClients.erase(cit);
+ } else {
+ ++cit;
+ }
+ }
+ ALOGV("# of cleaned connections: %d", cleaned);
+ mCache.mLastCleanUpMs = now;  // restarts the interval for non-forced passes
+ }
+}
+
+::ndk::ScopedAStatus ClientManager::registerSender(
+        const std::shared_ptr<IAccessor>& in_bufferPool, Registration* _aidl_return) {
+    // Binder entry point: a missing implementation is reported as CRITICAL_ERROR.
+    const BufferPoolStatus result = mImpl
+            ? mImpl->registerSender(in_bufferPool, _aidl_return)
+            : static_cast<BufferPoolStatus>(ResultStatus::CRITICAL_ERROR);
+    if (result == ResultStatus::OK) {
+        return ::ndk::ScopedAStatus::ok();
+    }
+    return ::ndk::ScopedAStatus::fromServiceSpecificError(result);
+}
+
+// Methods for local use.
+std::shared_ptr<ClientManager> ClientManager::sInstance;  // created on first getInstance() call
+std::mutex ClientManager::sInstanceLock;  // held for the whole body of getInstance()
+
+std::shared_ptr<ClientManager> ClientManager::getInstance() {
+    std::lock_guard<std::mutex> guard(sInstanceLock);
+    if (sInstance == nullptr) {
+        // TODO: configure thread count for threadpool properly
+        // after b/261652496 is resolved.
+        sInstance = ::ndk::SharedRefBase::make<ClientManager>();
+    }
+    Accessor::createInvalidator();  // both calls run on every getInstance(), not only the first
+    Accessor::createEvictor();
+    return sInstance;
+}
+
+ClientManager::ClientManager() : mImpl(new Impl()) {}  // the public methods below forward to this Impl
+
+ClientManager::~ClientManager() {  // no explicit teardown is performed here
+}
+
+BufferPoolStatus ClientManager::create(
+        const std::shared_ptr<BufferPoolAllocator> &allocator,
+        ConnectionId *pConnectionId) {
+    if (!mImpl) {
+        return ResultStatus::CRITICAL_ERROR;  // no implementation to forward to
+    }
+    return mImpl->create(allocator, pConnectionId);
+}
+
+BufferPoolStatus ClientManager::registerSender(
+        const std::shared_ptr<IClientManager> &receiver,
+        ConnectionId senderId,
+        ConnectionId *receiverId,
+        bool *isNew) {
+    if (!mImpl) {
+        return ResultStatus::CRITICAL_ERROR;  // no implementation to forward to
+    }
+    return mImpl->registerSender(receiver, senderId, receiverId, isNew);
+}
+
+BufferPoolStatus ClientManager::close(ConnectionId connectionId) {
+    if (!mImpl) {
+        return ResultStatus::CRITICAL_ERROR;  // no implementation to forward to
+    }
+    return mImpl->close(connectionId);
+}
+
+BufferPoolStatus ClientManager::flush(ConnectionId connectionId) {
+    if (!mImpl) {
+        return ResultStatus::CRITICAL_ERROR;  // no implementation to forward to
+    }
+    return mImpl->flush(connectionId);
+}
+
+BufferPoolStatus ClientManager::allocate(
+        ConnectionId connectionId, const std::vector<uint8_t> &params,
+        native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) {
+    if (mImpl) {  // forwards to the implementation; CRITICAL_ERROR when it is missing
+        return mImpl->allocate(connectionId, params, handle, buffer);
+    }
+    return ResultStatus::CRITICAL_ERROR;
+}
+
+BufferPoolStatus ClientManager::receive(
+        ConnectionId connectionId, TransactionId transactionId,
+        BufferId bufferId, int64_t timestampMs,
+        native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) {
+    if (!mImpl) {
+        return ResultStatus::CRITICAL_ERROR;  // no implementation to forward to
+    }
+    return mImpl->receive(connectionId, transactionId, bufferId,
+                          timestampMs, handle, buffer);
+}
+
+BufferPoolStatus ClientManager::postSend(
+        ConnectionId receiverId, const std::shared_ptr<BufferPoolData> &buffer,
+        TransactionId *transactionId, int64_t* timestampMs) {
+    if (!mImpl || !buffer) {
+        return ResultStatus::CRITICAL_ERROR;  // a null buffer is rejected before forwarding
+    }
+    return mImpl->postSend(receiverId, buffer, transactionId, timestampMs);
+}
+
+void ClientManager::cleanUp() {  // public entry point: always requests a forced pass
+ if (mImpl) {
+ mImpl->cleanUp(true);  // clearCache=true bypasses the cleanup-interval check in Impl::cleanUp
+ }
+}
+
+} // namespace ::aidl::android::hardware::media::bufferpool2::implementation
diff --git a/media/bufferpool/aidl/default/Connection.cpp b/media/bufferpool/aidl/default/Connection.cpp
new file mode 100644
index 0000000..53d350d
--- /dev/null
+++ b/media/bufferpool/aidl/default/Connection.cpp
@@ -0,0 +1,114 @@
+/*
+ * Copyright (C) 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#define LOG_TAG "AidlBufferPoolCon"
+//#define LOG_NDEBUG 0
+
+#include <aidlcommonsupport/NativeHandle.h>
+
+#include "Connection.h"
+#include "Accessor.h"
+
+namespace aidl::android::hardware::media::bufferpool2::implementation {
+
+using aidl::android::hardware::media::bufferpool2::ResultStatus;
+using Buffer = aidl::android::hardware::media::bufferpool2::Buffer;
+using FetchInfo = aidl::android::hardware::media::bufferpool2::IConnection::FetchInfo;
+using FetchResult = aidl::android::hardware::media::bufferpool2::IConnection::FetchResult;
+
+// IConnection::fetch: resolves every requested transaction into a result entry.
+::ndk::ScopedAStatus Connection::fetch(const std::vector<FetchInfo>& in_fetchInfos,
+                           std::vector<FetchResult>* _aidl_return) {
+    // An uninitialized connection cannot serve any request.
+    if (!mInitialized || !mAccessor) {
+        return ::ndk::ScopedAStatus::fromServiceSpecificError(ResultStatus::CRITICAL_ERROR);
+    }
+    int fetched = 0;
+    int failed = 0;
+    for (const FetchInfo &info : in_fetchInfos) {
+        const bool ok = fetch(info.transactionId, info.bufferId, _aidl_return);
+        ++(ok ? fetched : failed);
+    }
+    // Individual failures are reported per entry; the call itself still succeeds.
+    if (failed > 0) {
+        ALOGD("total fetch %d, failure %d", fetched + failed, failed);
+    }
+    return ::ndk::ScopedAStatus::ok();
+}
+
+// IConnection::sync: calls cleanUp(false) on the accessor when usable; always ok().
+::ndk::ScopedAStatus Connection::sync() {
+    const bool usable = mInitialized && mAccessor;
+    if (usable) mAccessor->cleanUp(false);
+    return ::ndk::ScopedAStatus::ok();
+}
+
+
+// Fetches one buffer from the accessor and appends the outcome to |result|.
+bool Connection::fetch(TransactionId transactionId, BufferId bufferId,
+        std::vector<FetchResult> *result) {
+    const native_handle_t *handle = nullptr;
+    const BufferPoolStatus status = mAccessor->fetch(mConnectionId, transactionId, bufferId, &handle);
+    if (status != ResultStatus::OK) {
+        result->emplace_back(FetchResult::make<FetchResult::failure>(status));
+        return false;
+    }
+    result->emplace_back(FetchResult::make<FetchResult::buffer>());
+    auto &fetched = result->back().get<FetchResult::buffer>();
+    fetched.id = bufferId;
+    fetched.buffer = ::android::dupToAidl(handle);
+    return true;
+}
+
+Connection::Connection() : mInitialized(false), mConnectionId(-1LL) {} // Unusable until initialize() runs.
+
+// Closes this connection on the accessor, if one was ever attached.
+Connection::~Connection() {
+    if (!mInitialized || !mAccessor) return;
+    mAccessor->close(mConnectionId);
+}
+
+// One-shot setup: binds the accessor and the id; later calls are ignored.
+void Connection::initialize(
+        const std::shared_ptr<Accessor>& accessor, ConnectionId connectionId) {
+    if (mInitialized) return;
+    mAccessor = accessor;
+    mConnectionId = connectionId;
+    mInitialized = true;
+}
+
+// Forwards flush() to the accessor; CRITICAL_ERROR when not yet usable.
+BufferPoolStatus Connection::flush() {
+    const bool usable = mInitialized && mAccessor;
+    if (!usable) return ResultStatus::CRITICAL_ERROR;
+    return mAccessor->flush();
+}
+
+// Allocates a buffer for this connection's id through the accessor.
+BufferPoolStatus Connection::allocate(
+        const std::vector<uint8_t> &params, BufferId *bufferId,
+        const native_handle_t **handle) {
+    // An uninitialized connection has no pool to allocate from.
+    if (!mInitialized || !mAccessor) return ResultStatus::CRITICAL_ERROR;
+    return mAccessor->allocate(mConnectionId, params, bufferId, handle);
+}
+
+// Passes |clearCache| straight through to the accessor when usable.
+void Connection::cleanUp(bool clearCache) {
+    if (!mInitialized || !mAccessor) return;
+    mAccessor->cleanUp(clearCache);
+}
+
+} // namespace ::aidl::android::hardware::media::bufferpool2::implementation
diff --git a/media/bufferpool/aidl/default/Connection.h b/media/bufferpool/aidl/default/Connection.h
new file mode 100644
index 0000000..d8298af
--- /dev/null
+++ b/media/bufferpool/aidl/default/Connection.h
@@ -0,0 +1,96 @@
+/*
+ * Copyright (C) 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include <aidl/android/hardware/media/bufferpool2/BnConnection.h>
+#include <bufferpool2/BufferPoolTypes.h>
+
+namespace aidl::android::hardware::media::bufferpool2::implementation {
+
+struct Accessor;
+
+struct Connection : public BnConnection {  // IConnection implementation backed by an Accessor.
+    // IConnection::fetch: produces one FetchResult entry per requested FetchInfo.
+    ::ndk::ScopedAStatus fetch(const std::vector<::aidl::android::hardware::media::bufferpool2::IConnection::FetchInfo>& in_fetchInfos, std::vector<::aidl::android::hardware::media::bufferpool2::IConnection::FetchResult>* _aidl_return) override;
+
+    // IConnection::sync: runs cleanUp(false) on the buffer pool when initialized.
+    ::ndk::ScopedAStatus sync() override;
+
+    /**
+     * Invalidates all buffers which are active and/or are ready to be recycled.
+     */
+    BufferPoolStatus flush();
+
+    /**
+     * Allocates a buffer using the specified parameters. Recycles a buffer if
+     * it is possible. The returned buffer can be transferred to other remote
+     * clients(Connection).
+     *
+     * @param params    allocation parameters.
+     * @param bufferId  Id of the allocated buffer.
+     * @param handle    native handle of the allocated buffer.
+     *
+     * @return OK if a buffer is successfully allocated.
+     *         NO_MEMORY when there is no memory.
+     *         CRITICAL_ERROR otherwise.
+     */
+    BufferPoolStatus allocate(const std::vector<uint8_t> &params,
+                              BufferId *bufferId, const native_handle_t **handle);
+
+    /**
+     * Processes pending buffer status messages and performs periodic cache cleaning
+     * from bufferpool.
+     *
+     * @param clearCache  if clearCache is true, bufferpool frees all buffers
+     *                    waiting to be recycled.
+     */
+    void cleanUp(bool clearCache);
+
+    /** Destructs a connection. */
+    ~Connection();
+
+    /** Creates a connection. */
+    Connection();
+
+    /**
+     * Initializes with the specified buffer pool and the connection id.
+     * The connection id should be unique in the whole system.
+     *
+     * @param accessor      the specified buffer pool.
+     * @param connectionId  Id.
+     */
+    void initialize(const std::shared_ptr<Accessor> &accessor, ConnectionId connectionId);
+
+    enum : uint32_t {
+        SYNC_BUFFERID = UINT32_MAX,
+    };
+
+private:
+    bool mInitialized;
+    std::shared_ptr<Accessor> mAccessor;
+    ConnectionId mConnectionId;
+
+    bool fetch(  // appends exactly one entry to |result|; true when the fetch succeeded
+            uint64_t transactionId,
+            uint32_t bufferId,
+            std::vector<::aidl::android::hardware::media::bufferpool2::IConnection::FetchResult>
+                    *result);
+};
+
+} // namespace aidl::android::hardware::media::bufferpool2::implementation
diff --git a/media/bufferpool/aidl/default/DataHelper.h b/media/bufferpool/aidl/default/DataHelper.h
new file mode 100644
index 0000000..a90b3c7
--- /dev/null
+++ b/media/bufferpool/aidl/default/DataHelper.h
@@ -0,0 +1,124 @@
+/*
+ * Copyright (C) 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <aidl/android/hardware/media/bufferpool2/BufferStatusMessage.h>
+#include <bufferpool2/BufferPoolTypes.h>
+
+#include <map>
+#include <set>
+
+namespace aidl::android::hardware::media::bufferpool2::implementation {
+
+// Helper for a map of sets: adds |value| to the set stored under |key|.
+// Returns true when |value| was newly added, false when already present.
+template<class T, class U>
+bool insert(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
+    auto entry = mapOfSet->find(key);
+    if (entry != mapOfSet->end()) {
+        // std::set::insert reports through .second whether it added anything.
+        const bool added = entry->second.insert(value).second;
+        return added;
+    }
+    // First value for this key: create the set already holding |value|.
+    mapOfSet->insert(std::make_pair(key, std::set<U>{value}));
+    return true;
+}
+
+// Removes |value| from |key|'s set, dropping a set left empty; true if removed.
+template<class T, class U>
+bool erase(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
+    auto entry = mapOfSet->find(key);
+    if (entry == mapOfSet->end()) {
+        return false;
+    }
+    std::set<U> &values = entry->second;
+    const bool removed = values.erase(value) != 0;
+    // The emptiness check runs even when nothing was removed.
+    if (values.empty()) {
+        mapOfSet->erase(entry);
+    }
+    return removed;
+}
+
+// Helper for a map of sets: true when |value| is in the set stored under |key|.
+template<class T, class U>
+bool contains(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
+    const auto entry = mapOfSet->find(key);
+    // A missing key means the value cannot be present.
+    if (entry == mapOfSet->end()) {
+        return false;
+    }
+    return entry->second.count(value) != 0;
+}
+
+// Buffer data structure for internal BufferPool use (storage/fetching).
+struct InternalBuffer {
+ BufferId mId;
+ size_t mOwnerCount; // initialized to 0 by the constructor
+ size_t mTransactionCount; // initialized to 0 by the constructor
+ const std::shared_ptr<BufferPoolAllocation> mAllocation;
+ const size_t mAllocSize;
+ const std::vector<uint8_t> mConfig; // copy of the allocConfig passed at construction
+ bool mInvalidated; // false until invalidate() is called
+
+ InternalBuffer(
+ BufferId id,
+ const std::shared_ptr<BufferPoolAllocation> &alloc,
+ const size_t allocSize,
+ const std::vector<uint8_t> &allocConfig)
+ : mId(id), mOwnerCount(0), mTransactionCount(0),
+ mAllocation(alloc), mAllocSize(allocSize), mConfig(allocConfig),
+ mInvalidated(false) {}
+
+ const native_handle_t *handle() { // native handle of the wrapped allocation
+ return mAllocation->handle();
+ }
+
+ void invalidate() { // one-way: nothing in this struct resets the flag
+ mInvalidated = true;
+ }
+};
+
+// Buffer transaction status/message data structure for internal BufferPool use.
+struct TransactionStatus {
+    TransactionId mId;
+    BufferId mBufferId;
+    ConnectionId mSender;
+    ConnectionId mReceiver;
+    BufferStatus mStatus;
+    int64_t mTimestampMs;
+    bool mSenderValidated;
+
+    // Builds the record from |message|. A TRANSFER_TO message names both
+    // endpoints; for any other status the message's connection is the receiver.
+    TransactionStatus(const BufferStatusMessage &message, int64_t timestampMs)
+          : mId(message.transactionId), mBufferId(message.bufferId),
+            mStatus(message.status), mTimestampMs(timestampMs) {
+        const bool isTransfer = (mStatus == BufferStatus::TRANSFER_TO);
+        mSenderValidated = isTransfer;
+        if (!isTransfer) {
+            mSender = -1LL;
+            mReceiver = message.connectionId;
+            return;
+        }
+        mSender = message.connectionId;
+        mReceiver = message.targetConnectionId;
+    }
+};
+
+} // namespace aidl::android::hardware::media::bufferpool2::implementation
diff --git a/media/bufferpool/aidl/default/Observer.cpp b/media/bufferpool/aidl/default/Observer.cpp
new file mode 100644
index 0000000..a22e825
--- /dev/null
+++ b/media/bufferpool/aidl/default/Observer.cpp
@@ -0,0 +1,63 @@
+/*
+ * Copyright (C) 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Observer.h"
+#include "BufferPoolClient.h"
+
+namespace aidl::android::hardware::media::bufferpool2::implementation {
+
+Observer::Observer() { // Nothing to set up: members are default-constructed.
+}
+
+Observer::~Observer() { // Nothing to release explicitly.
+}
+
+// Routes an invalidation message to the client registered for the connection.
+::ndk::ScopedAStatus Observer::onMessage(int64_t in_connectionId, int32_t in_msgId) {
+    std::shared_ptr<BufferPoolClient> target;
+    {
+        std::lock_guard<std::mutex> guard(mLock);
+        const auto entry = mClients.find(in_connectionId);
+        if (entry != mClients.end()) {
+            target = entry->second.lock();
+            if (!target) mClients.erase(entry);  // the client is gone; drop its entry
+        }
+    }
+    if (target) target->receiveInvalidation(in_msgId);  // delivered with mLock released
+    return ::ndk::ScopedAStatus::ok();
+}
+
+// Registers |wclient| under |connectionId|, pruning stale entries first.
+void Observer::addClient(ConnectionId connectionId,
+        const std::weak_ptr<BufferPoolClient> &wclient) {
+    std::lock_guard<std::mutex> lock(mLock);
+    auto cursor = mClients.begin();
+    while (cursor != mClients.end()) {
+        // Stale: the client is gone, or the id is about to be re-registered.
+        const bool stale = !cursor->second.lock() || cursor->first == connectionId;
+        if (stale) cursor = mClients.erase(cursor);
+        else ++cursor;
+    }
+    mClients.emplace(connectionId, wclient);
+}
+
+void Observer::delClient(ConnectionId connectionId) { // Unregisters the client of connectionId, if any.
+ std::lock_guard<std::mutex> lock(mLock); // Observer.cpp only touches mClients under mLock
+ mClients.erase(connectionId);
+}
+
+
+} // namespace aidl::android::hardware::media::bufferpool2::implementation
diff --git a/media/bufferpool/aidl/default/Observer.h b/media/bufferpool/aidl/default/Observer.h
new file mode 100644
index 0000000..febb21b
--- /dev/null
+++ b/media/bufferpool/aidl/default/Observer.h
@@ -0,0 +1,49 @@
+/*
+ * Copyright (C) 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <map>
+#include <memory>
+#include <mutex>
+#include <aidl/android/hardware/media/bufferpool2/BnObserver.h>
+#include <bufferpool2/BufferPoolTypes.h>
+
+namespace aidl::android::hardware::media::bufferpool2::implementation {
+
+class BufferPoolClient;
+
+struct Observer : public BnObserver { // Dispatches observer messages to registered BufferPoolClients.
+ ::ndk::ScopedAStatus onMessage(int64_t in_connectionId, int32_t in_msgId) override; // forwards in_msgId to the matching client
+
+ ~Observer();
+
+ void addClient(ConnectionId connectionId,
+ const std::weak_ptr<BufferPoolClient> &wclient); // replaces any entry already stored for connectionId
+
+ void delClient(ConnectionId connectionId); // removes the entry for connectionId
+
+private:
+ Observer(); // private: only the befriended ::ndk::SharedRefBase can construct it
+
+ friend class ::ndk::SharedRefBase;
+
+ std::mutex mLock; // guards mClients
+ std::map<ConnectionId, const std::weak_ptr<BufferPoolClient>> mClients; // weak refs: clients are not kept alive here
+};
+
+} // namespace aidl::android::hardware::media::bufferpool2::implementation
+
diff --git a/media/bufferpool/aidl/default/include/bufferpool2/BufferPoolTypes.h b/media/bufferpool/aidl/default/include/bufferpool2/BufferPoolTypes.h
new file mode 100644
index 0000000..b833362
--- /dev/null
+++ b/media/bufferpool/aidl/default/include/bufferpool2/BufferPoolTypes.h
@@ -0,0 +1,126 @@
+/*
+ * Copyright (C) 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cutils/native_handle.h>
+#include <fmq/AidlMessageQueue.h>
+#include <aidl/android/hardware/media/bufferpool2/BufferStatusMessage.h>
+#include <aidl/android/hardware/media/bufferpool2/BufferInvalidationMessage.h>
+#include <aidl/android/hardware/media/bufferpool2/ResultStatus.h>
+
+namespace aidl::android::hardware::media::bufferpool2 {
+
+struct BufferPoolData {
+ // For local use, to specify a bufferpool (client connection) for buffers.
+ // Retrieved from returned info of IAccessor#connect(android.hardware.media.bufferpool2).
+ int64_t mConnectionId;
+ // Id of the buffer (same width as implementation::BufferId, uint32_t).
+ uint32_t mId;
+
+ BufferPoolData() : mConnectionId(0), mId(0) {} // both ids default to 0
+
+ BufferPoolData(
+ int64_t connectionId, uint32_t id)
+ : mConnectionId(connectionId), mId(id) {}
+
+ ~BufferPoolData() {}
+};
+
+namespace implementation {
+
+using aidl::android::hardware::common::fmq::SynchronizedReadWrite;
+using aidl::android::hardware::common::fmq::UnsynchronizedWrite;
+
+using aidl::android::hardware::media::bufferpool2::BufferStatusMessage;
+using aidl::android::hardware::media::bufferpool2::BufferInvalidationMessage;
+
+using BufferId = uint32_t;
+using TransactionId = uint64_t;
+using ConnectionId = int64_t;
+using BufferPoolStatus = int32_t;
+
+// AIDL has no unsigned integer types, so ids cross the interface as signed values.
+static inline int32_t ToAidl(BufferId id) { return static_cast<int32_t>(id); }
+static inline int64_t ToAidl(TransactionId id) { return static_cast<int64_t>(id); }
+
+static inline BufferId FromAidl(int32_t id) { return static_cast<BufferId>(id); }
+static inline TransactionId FromAidl(int64_t id) { return static_cast<TransactionId>(id); }
+
+enum : ConnectionId {
+ INVALID_CONNECTIONID = 0, // the value 0 is reserved to denote an invalid connection id
+};
+
+// Message queue type for BufferStatusMessage, and the matching MQDescriptor type.
+using BufferStatusQueue = ::android::AidlMessageQueue<BufferStatusMessage, SynchronizedReadWrite>;
+using StatusDescriptor = aidl::android::hardware::common::fmq::MQDescriptor<BufferStatusMessage, SynchronizedReadWrite>;
+
+// Message queue type for BufferInvalidationMessage, and the matching MQDescriptor type.
+using BufferInvalidationQueue =
+        ::android::AidlMessageQueue<BufferInvalidationMessage, UnsynchronizedWrite>;
+using InvalidationDescriptor = aidl::android::hardware::common::fmq::MQDescriptor<BufferInvalidationMessage, UnsynchronizedWrite>;
+
+/**
+ * Allocation wrapper class for buffer pool.
+ */
+struct BufferPoolAllocation {
+ const native_handle_t *mHandle; // stored as given; this struct never closes or frees it
+
+ const native_handle_t *handle() {
+ return mHandle;
+ }
+
+ BufferPoolAllocation(const native_handle_t *handle) : mHandle(handle) {}
+
+ ~BufferPoolAllocation() {}; // empty: the handle is not released here
+};
+
+/**
+ * Allocator wrapper class for buffer pool.
+ */
+class BufferPoolAllocator {
+public:
+
+    /**
+     * Allocate an allocation(buffer) for buffer pool.
+     *
+     * @param params    allocation parameters
+     * @param alloc     created allocation
+     * @param allocSize size of created allocation
+     *
+     * @return OK when an allocation is created successfully.
+     */
+    virtual BufferPoolStatus allocate(
+            const std::vector<uint8_t> &params,
+            std::shared_ptr<BufferPoolAllocation> *alloc,
+            size_t *allocSize) = 0;
+
+    /**
+     * Returns whether allocation parameters of an old allocation are
+     * compatible with new allocation parameters.
+     */
+    virtual bool compatible(const std::vector<uint8_t> &newParams,
+                            const std::vector<uint8_t> &oldParams) = 0;
+
+protected:
+    BufferPoolAllocator() = default;
+
+    virtual ~BufferPoolAllocator() = default;
+};
+
+} // namespace implementation
+} // namespace aidl::android::hardware::media::bufferpool2
+
diff --git a/media/bufferpool/aidl/default/include/bufferpool2/ClientManager.h b/media/bufferpool/aidl/default/include/bufferpool2/ClientManager.h
new file mode 100644
index 0000000..bff75ba
--- /dev/null
+++ b/media/bufferpool/aidl/default/include/bufferpool2/ClientManager.h
@@ -0,0 +1,182 @@
+/*
+ * Copyright (C) 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <aidl/android/hardware/media/bufferpool2/IAccessor.h>
+#include <aidl/android/hardware/media/bufferpool2/BnClientManager.h>
+#include <memory>
+#include "BufferPoolTypes.h"
+
+namespace aidl::android::hardware::media::bufferpool2::implementation {
+
+using aidl::android::hardware::media::bufferpool2::BnClientManager;
+using aidl::android::hardware::media::bufferpool2::IClientManager;
+using aidl::android::hardware::media::bufferpool2::IAccessor;
+
+struct ClientManager : public BnClientManager {
+    // Methods from ::aidl::android::hardware::media::bufferpool2::IClientManager follow.
+    ::ndk::ScopedAStatus registerSender(
+        const std::shared_ptr<IAccessor>& in_bufferPool,
+        ::aidl::android::hardware::media::bufferpool2::IClientManager::Registration* _aidl_return)
+        override;
+
+    /** Gets an instance. */
+    static std::shared_ptr<ClientManager> getInstance();
+
+    /**
+     * Creates a local connection with a newly created buffer pool.
+     *
+     * @param allocator     for new buffer allocation.
+     * @param pConnectionId Id of the created connection. This is
+     *                      system-wide unique.
+     *
+     * @return OK when a buffer pool and a local connection are successfully
+     *         created.
+     *         ResultStatus::NO_MEMORY when there is no memory.
+     *         CRITICAL_ERROR otherwise.
+     */
+    BufferPoolStatus create(const std::shared_ptr<BufferPoolAllocator> &allocator,
+                            ConnectionId *pConnectionId);
+
+    /**
+     * Register a created connection as sender for remote process.
+     *
+     * @param receiver      The remote receiving process.
+     * @param senderId      A local connection which will send buffers.
+     * @param receiverId    Id of the created receiving connection on the receiver
+     *                      process.
+     * @param isNew         @true when the receiving connection is newly created.
+     *
+     * @return OK when the receiving connection is successfully created on the
+     *         receiver process.
+     *         NOT_FOUND when the sender connection was not found.
+     *         CRITICAL_ERROR otherwise.
+     */
+    BufferPoolStatus registerSender(const std::shared_ptr<IClientManager> &receiver,
+                                    ConnectionId senderId,
+                                    ConnectionId *receiverId,
+                                    bool *isNew);
+
+    /**
+     * Closes the specified connection.
+     *
+     * @param connectionId  The id of the connection.
+     *
+     * @return OK when the connection is closed.
+     *         NOT_FOUND when the specified connection was not found.
+     *         CRITICAL_ERROR otherwise.
+     */
+    BufferPoolStatus close(ConnectionId connectionId);
+
+    /**
+     * Evicts cached allocations. If it's local connection, release the
+     * previous allocations and do not recycle current active allocations.
+     *
+     * @param connectionId The id of the connection.
+     *
+     * @return OK when the connection is reset.
+     *         NOT_FOUND when the specified connection was not found.
+     *         CRITICAL_ERROR otherwise.
+     */
+    BufferPoolStatus flush(ConnectionId connectionId);
+
+    /**
+     * Allocates a buffer from the specified connection. The output parameter
+     * handle is cloned from the internal handle. So it is safe to use directly,
+     * and it should be deleted and destroyed after use.
+     *
+     * @param connectionId  The id of the connection.
+     * @param params        The allocation parameters.
+     * @param handle        The native handle to the allocated buffer. handle
+     *                      should be cloned before use.
+     * @param buffer        The allocated buffer.
+     *
+     * @return OK when a buffer was allocated successfully.
+     *         NOT_FOUND when the specified connection was not found.
+     *         NO_MEMORY when there is no memory.
+     *         CRITICAL_ERROR otherwise.
+     */
+    BufferPoolStatus allocate(ConnectionId connectionId,
+                              const std::vector<uint8_t> &params,
+                              native_handle_t **handle,
+                              std::shared_ptr<BufferPoolData> *buffer);
+
+    /**
+     * Receives a buffer for the transaction. The output parameter handle is
+     * cloned from the internal handle. So it is safe to use directly, and it
+     * should be deleted and destroyed after use.
+     *
+     * @param connectionId  The id of the receiving connection.
+     * @param transactionId The id for the transaction.
+     * @param bufferId      The id for the buffer.
+     * @param timestampMs   The timestamp of the buffer is being sent.
+     * @param handle        The native handle to the allocated buffer. handle
+     *                      should be cloned before use.
+     * @param buffer        The received buffer.
+     *
+     * @return OK when a buffer was received successfully.
+     *         NOT_FOUND when the specified connection was not found.
+     *         NO_MEMORY when there is no memory.
+     *         CRITICAL_ERROR otherwise.
+     */
+    BufferPoolStatus receive(ConnectionId connectionId,
+                             TransactionId transactionId,
+                             BufferId bufferId,
+                             int64_t timestampMs,
+                             native_handle_t **handle,
+                             std::shared_ptr<BufferPoolData> *buffer);
+
+    /**
+     * Posts a buffer transfer transaction to the buffer pool. Sends a buffer
+     * to other remote clients(connection) after this call has succeeded.
+     *
+     * @param receiverId    The id of the receiving connection.
+     * @param buffer        to transfer
+     * @param transactionId Id of the transfer transaction.
+     * @param timestampMs   The timestamp of the buffer transaction is being
+     *                      posted.
+     *
+     * @return OK when a buffer transaction was posted successfully.
+     *         NOT_FOUND when the sending connection was not found.
+     *         CRITICAL_ERROR otherwise.
+     */
+    BufferPoolStatus postSend(ConnectionId receiverId,
+                              const std::shared_ptr<BufferPoolData> &buffer,
+                              TransactionId *transactionId,
+                              int64_t *timestampMs);
+
+    /**
+     *  Time out inactive lingering connections and close.
+     */
+    void cleanUp();
+
+    /** Destructs the manager of buffer pool clients.  */
+    ~ClientManager();
+private:
+    static std::shared_ptr<ClientManager> sInstance;
+    static std::mutex sInstanceLock;
+
+    class Impl;
+    const std::unique_ptr<Impl> mImpl;
+
+    friend class ::ndk::SharedRefBase;
+
+    ClientManager();
+};
+
+} // namespace aidl::android::hardware::media::bufferpool2::implementation
+
diff --git a/media/bufferpool/aidl/default/tests/Android.bp b/media/bufferpool/aidl/default/tests/Android.bp
new file mode 100644
index 0000000..549af57
--- /dev/null
+++ b/media/bufferpool/aidl/default/tests/Android.bp
@@ -0,0 +1,93 @@
+/*
+ * Copyright (C) 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package {
+ // See: http://go/android-license-faq
+ // A large-scale-change added 'default_applicable_licenses' to import
+ // all of the 'license_kinds' from "hardware_interfaces_license"
+ // to get the below license kinds:
+ // SPDX-license-identifier-Apache-2.0
+ default_applicable_licenses: ["hardware_interfaces_license"],
+}
+
+cc_test { // Test binary built from allocator.cpp and single.cpp.
+ name: "VtsVndkAidlBufferpool2V1_0TargetSingleTest",
+ test_suites: ["device-tests"],
+ defaults: ["VtsHalTargetTestDefaults"],
+ srcs: [
+ "allocator.cpp",
+ "single.cpp",
+ ],
+ shared_libs: [
+ "libbinder_ndk",
+ "libcutils",
+ "libfmq",
+ "liblog",
+ "libutils",
+ "android.hardware.media.bufferpool2-V1-ndk",
+ ],
+ static_libs: [
+ "libaidlcommonsupport",
+ "libstagefright_aidl_bufferpool2"
+ ],
+ compile_multilib: "both", // build both the 32-bit and the 64-bit variant
+}
+
+cc_test { // Test binary built from allocator.cpp and multi.cpp.
+ name: "VtsVndkAidlBufferpool2V1_0TargetMultiTest",
+ test_suites: ["device-tests"],
+ defaults: ["VtsHalTargetTestDefaults"],
+ srcs: [
+ "allocator.cpp",
+ "multi.cpp",
+ ],
+ shared_libs: [
+ "libbinder_ndk",
+ "libcutils",
+ "libfmq",
+ "liblog",
+ "libutils",
+ "android.hardware.media.bufferpool2-V1-ndk",
+ ],
+ static_libs: [
+ "libaidlcommonsupport",
+ "libstagefright_aidl_bufferpool2"
+ ],
+ compile_multilib: "both", // build both the 32-bit and the 64-bit variant
+}
+
+cc_test { // Test binary built from allocator.cpp and cond.cpp.
+ name: "VtsVndkAidlBufferpool2V1_0TargetCondTest",
+ test_suites: ["device-tests"],
+ defaults: ["VtsHalTargetTestDefaults"],
+ srcs: [
+ "allocator.cpp",
+ "cond.cpp",
+ ],
+ shared_libs: [
+ "libbinder_ndk",
+ "libcutils",
+ "libfmq",
+ "liblog",
+ "libutils",
+ "android.hardware.media.bufferpool2-V1-ndk",
+ ],
+ static_libs: [
+ "libaidlcommonsupport",
+ "libstagefright_aidl_bufferpool2"
+ ],
+ compile_multilib: "both", // build both the 32-bit and the 64-bit variant
+}
diff --git a/media/bufferpool/aidl/default/tests/allocator.cpp b/media/bufferpool/aidl/default/tests/allocator.cpp
new file mode 100644
index 0000000..16b33a6
--- /dev/null
+++ b/media/bufferpool/aidl/default/tests/allocator.cpp
@@ -0,0 +1,251 @@
+/*
+ * Copyright (C) 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cutils/ashmem.h>
+#include <sys/mman.h>
+#include "allocator.h"
+
+union Params { // allocation parameters, viewable either as fields or as raw bytes
+ struct {
+ uint32_t capacity; // requested size; ends up as the ashmem_create_region size argument
+ } data;
+ uint8_t array[0]; // zero-length array: byte view starting at the union's first byte
+ Params() : data{0} {}
+ Params(uint32_t size)
+ : data{size} {}
+};
+
+
+namespace {
+
+struct HandleAshmem : public native_handle_t { // native_handle_t carrying one ashmem fd plus size and magic ints
+ HandleAshmem(int ashmemFd, size_t size)
+ : native_handle_t(cHeader),
+ mFds{ ashmemFd },
+ mInts{ int (size & 0xFFFFFFFF), int((uint64_t(size) >> 32) & 0xFFFFFFFF), kMagic } {} // size is split into low/high 32-bit halves
+
+ int ashmemFd() const { return mFds.mAshmem; }
+ size_t size() const { // reassembles the size from its two 32-bit halves
+ return size_t(unsigned(mInts.mSizeLo))
+ | size_t(uint64_t(unsigned(mInts.mSizeHi)) << 32);
+ }
+
+ static bool isValid(const native_handle_t * const o);
+
+protected:
+ struct {
+ int mAshmem;
+ } mFds;
+ struct {
+ int mSizeLo;
+ int mSizeHi;
+ int mMagic;
+ } mInts;
+
+private:
+ enum {
+ kMagic = 'ahm\x00', // multi-character constant used as a type tag (checked by isValid)
+ numFds = sizeof(mFds) / sizeof(int),
+ numInts = sizeof(mInts) / sizeof(int),
+ version = sizeof(native_handle_t)
+ };
+ const static native_handle_t cHeader; // header template: version, numFds, numInts
+};
+
+const native_handle_t HandleAshmem::cHeader = { // copied into every HandleAshmem and compared by isValid()
+ HandleAshmem::version,
+ HandleAshmem::numFds,
+ HandleAshmem::numInts,
+ {}
+};
+
+// True when |o| carries the exact ashmem header layout and the magic marker.
+bool HandleAshmem::isValid(const native_handle_t * const o) {
+    // Reject null handles and any header that differs from cHeader.
+    const bool headerMatches = o && memcmp(o, &cHeader, sizeof(cHeader)) == 0;
+    if (!headerMatches) return false;
+    return static_cast<const HandleAshmem*>(o)->mInts.mMagic == kMagic;
+}
+
+class AllocationAshmem { // owns one ashmem fd, wrapped in a HandleAshmem
+private:
+ AllocationAshmem(int ashmemFd, size_t capacity, bool res)
+ : mHandle(ashmemFd, capacity),
+ mInit(res) {}
+
+public:
+ static AllocationAshmem *Alloc(size_t size) { // always returns an object; a failed region is recorded in mInit
+ constexpr static const char *kAllocationTag = "bufferpool_test";
+ int ashmemFd = ashmem_create_region(kAllocationTag, size);
+ return new AllocationAshmem(ashmemFd, size, ashmemFd >= 0);
+ }
+
+ ~AllocationAshmem() {
+ if (mInit) { // only close the fd when region creation succeeded
+ native_handle_close(&mHandle);
+ }
+ }
+
+ const HandleAshmem *handle() {
+ return &mHandle;
+ }
+
+private:
+ HandleAshmem mHandle;
+ bool mInit; // true when ashmem_create_region returned a non-negative fd
+ // TODO: mapping and map fd
+};
+
+struct AllocationDtor { // shared_ptr deleter that also holds a reference to the backing AllocationAshmem
+ AllocationDtor(const std::shared_ptr<AllocationAshmem> &alloc)
+ : mAlloc(alloc) {}
+
+ void operator()(BufferPoolAllocation *poolAlloc) { delete poolAlloc; }
+
+ const std::shared_ptr<AllocationAshmem> mAlloc; // released when the deleter itself is destroyed
+};
+
+}
+
+// Initializes |lock| and |cond| with the PTHREAD_PROCESS_SHARED attribute.
+void IpcMutex::init() {
+    pthread_mutexattr_t lockAttr;
+    pthread_mutexattr_init(&lockAttr);
+    pthread_mutexattr_setpshared(&lockAttr, PTHREAD_PROCESS_SHARED);
+    pthread_mutex_init(&lock, &lockAttr);
+    pthread_mutexattr_destroy(&lockAttr);
+    pthread_condattr_t condAttr;
+    pthread_condattr_init(&condAttr);
+    pthread_condattr_setpshared(&condAttr, PTHREAD_PROCESS_SHARED);
+    pthread_cond_init(&cond, &condAttr);
+    pthread_condattr_destroy(&condAttr);
+}
+
+IpcMutex *IpcMutex::Import(void *pMutex) {
+    return static_cast<IpcMutex *>(pMutex);  // views |pMutex| as an IpcMutex; nothing is constructed
+}
+
+
+// Test allocator entry point: backs each allocation with an ashmem region.
+BufferPoolStatus TestBufferPoolAllocator::allocate(
+        const std::vector<uint8_t> &params,
+        std::shared_ptr<BufferPoolAllocation> *alloc,
+        size_t *allocSize) {
+    // Decode as many bytes as |params| provides; the rest keep the zero default.
+    Params ashmemParams;
+    if (!params.empty()) {  // memcpy must not be handed a null source pointer
+        memcpy(&ashmemParams, params.data(), std::min(sizeof(Params), params.size()));
+    }
+    std::shared_ptr<AllocationAshmem> ashmemAlloc(
+            AllocationAshmem::Alloc(ashmemParams.data.capacity));
+    if (!ashmemAlloc) {
+        return ResultStatus::CRITICAL_ERROR;
+    }
+
+    // The deleter holds |ashmemAlloc|, so the region outlives the wrapper.
+    // Ownership of the new wrapper passes to the shared_ptr immediately, so
+    // there is no raw pointer left over that could be deleted a second time.
+    *alloc = std::shared_ptr<BufferPoolAllocation>(
+            new BufferPoolAllocation(ashmemAlloc->handle()), AllocationDtor(ashmemAlloc));
+    *allocSize = ashmemParams.data.capacity;
+    return ResultStatus::OK;
+}
+
+// Reports whether an allocation made with |oldParams| can serve |newParams|.
+// For this test allocator that means the two blobs are byte-for-byte equal.
+bool TestBufferPoolAllocator::compatible(const std::vector<uint8_t> &newParams,
+                                         const std::vector<uint8_t> &oldParams) {
+    const size_t newSize = newParams.size();
+    // Different lengths can never match, so settle that first.
+    if (newSize != oldParams.size()) {
+        return false;
+    }
+
+    // Equal lengths: vector equality compares the elements pairwise.
+    const bool sameBytes = (newParams == oldParams);
+    return sameBytes;
+}
+
+// Maps the ashmem region behind |handle| and sets every byte to |val|.
+bool TestBufferPoolAllocator::Fill(const native_handle_t *handle, const unsigned char val) {
+    if (!HandleAshmem::isValid(handle)) {
+        return false;
+    }
+    const HandleAshmem *ashmem = static_cast<const HandleAshmem*>(handle);
+    const size_t length = ashmem->size();
+    void *mapping = mmap(
+            nullptr, length, PROT_READ|PROT_WRITE, MAP_SHARED, ashmem->ashmemFd(), 0);
+    if (mapping == MAP_FAILED) {
+        return false;
+    }
+    // Write the pattern, then drop the temporary mapping again.
+    memset(mapping, val, length);
+    munmap(mapping, length);
+    return true;
+}
+
+// Maps the region behind |handle| read-only and checks every byte equals |val|.
+bool TestBufferPoolAllocator::Verify(const native_handle_t *handle, const unsigned char val) {
+    if (!HandleAshmem::isValid(handle)) {
+        return false;
+    }
+    const HandleAshmem *ashmem = static_cast<const HandleAshmem*>(handle);
+    const size_t length = ashmem->size();
+    void *mapping = mmap(nullptr, length, PROT_READ, MAP_SHARED, ashmem->ashmemFd(), 0);
+    if (mapping == MAP_FAILED) {
+        return false;
+    }
+
+    // Scan until the first byte that differs from the expected value.
+    const unsigned char *bytes = static_cast<const unsigned char *>(mapping);
+    size_t matched = 0;
+    while (matched < length && bytes[matched] == val) {
+        ++matched;
+    }
+    munmap(mapping, length);
+    return matched == length;
+}
+
+// Maps the region behind |handle| read/write and returns its address in |*mem|.
+bool TestBufferPoolAllocator::MapMemoryForMutex(const native_handle_t *handle, void **mem) {
+    if (!HandleAshmem::isValid(handle)) {
+        return false;
+    }
+    const HandleAshmem *ashmem = static_cast<const HandleAshmem*>(handle);
+    void *mapping = mmap(
+            nullptr, ashmem->size(), PROT_READ|PROT_WRITE, MAP_SHARED, ashmem->ashmemFd(), 0);
+    // |*mem| receives the raw mmap result even when the mapping failed.
+    *mem = mapping;
+    return mapping != MAP_FAILED && mapping != nullptr;
+}
+
+bool TestBufferPoolAllocator::UnmapMemoryForMutex(void *mem) { // Unmaps sizeof(IpcMutex) bytes starting at mem.
+ munmap(mem, sizeof(IpcMutex)); // result ignored
+ return true; // always reports success
+}
+
+// Serializes the default test allocation request (capacity 10 * 1024) into |params|.
+void getTestAllocatorParams(std::vector<uint8_t> *params) {
+    constexpr static int kAllocationSize = 1024 * 10;
+    const Params request(kAllocationSize);
+    params->assign(request.array, request.array + sizeof(request));
+}
+
+// Serializes a request whose capacity is sizeof(IpcMutex) into |params|.
+void getIpcMutexParams(std::vector<uint8_t> *params) {
+    const Params request(sizeof(IpcMutex));
+    params->assign(request.array, request.array + sizeof(request));
+}
diff --git a/media/bufferpool/aidl/default/tests/allocator.h b/media/bufferpool/aidl/default/tests/allocator.h
new file mode 100644
index 0000000..7e7203f
--- /dev/null
+++ b/media/bufferpool/aidl/default/tests/allocator.h
@@ -0,0 +1,67 @@
+/*
+ * Copyright (C) 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <pthread.h>
+#include <bufferpool2/BufferPoolTypes.h>
+
+using aidl::android::hardware::media::bufferpool2::implementation::
+ BufferPoolStatus;
+using aidl::android::hardware::media::bufferpool2::implementation::
+ BufferPoolAllocation;
+using aidl::android::hardware::media::bufferpool2::implementation::
+ BufferPoolAllocator;
+using aidl::android::hardware::media::bufferpool2::ResultStatus;
+
+struct IpcMutex {  // lock + condition variable pair constructed inside mapped buffer memory
+ pthread_mutex_t lock;  // held while counter/signalled are accessed
+ pthread_cond_t cond;  // waited on until signalled becomes true
+ int counter = 0;  // value the waiter reads back after wake-up
+ bool signalled = false;  // wait predicate, set by the signalling side
+
+ void init();  // called right after placement-new; implementation not shown in this header
+
+ static IpcMutex *Import(void *mem);  // obtains an IpcMutex* for already-mapped memory
+};
+
+// Buffer allocator for the tests, plus static helpers that map the ashmem behind a handle.
+class TestBufferPoolAllocator : public BufferPoolAllocator {
+  public:
+    TestBufferPoolAllocator() {}
+
+    ~TestBufferPoolAllocator() override {}
+
+    BufferPoolStatus allocate(const std::vector<uint8_t> &params,
+                              std::shared_ptr<BufferPoolAllocation> *alloc,
+                              size_t *allocSize) override;
+
+    bool compatible(const std::vector<uint8_t> &newParams,
+                    const std::vector<uint8_t> &oldParams) override;
+    // Writes |val| to every byte of the mapped buffer; false if the mapping fails.
+    static bool Fill(const native_handle_t *handle, const unsigned char val);
+    // True only if every byte of the mapped buffer equals |val|.
+    static bool Verify(const native_handle_t *handle, const unsigned char val);
+    // Maps the buffer read/write and stores the address in |*mem|; false on failure.
+    static bool MapMemoryForMutex(const native_handle_t *handle, void **mem);
+    // Unmaps sizeof(IpcMutex) bytes starting at |mem|.
+    static bool UnmapMemoryForMutex(void *mem);
+};
+
+// Fills |params| with the serialized allocator parameters for a 10 KiB test buffer.
+void getTestAllocatorParams(std::vector<uint8_t> *params);
+// Fills |params| with serialized allocator parameters sized for one IpcMutex.
+void getIpcMutexParams(std::vector<uint8_t> *params);
diff --git a/media/bufferpool/aidl/default/tests/cond.cpp b/media/bufferpool/aidl/default/tests/cond.cpp
new file mode 100644
index 0000000..6d469ce
--- /dev/null
+++ b/media/bufferpool/aidl/default/tests/cond.cpp
@@ -0,0 +1,280 @@
+/*
+ * Copyright (C) 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define LOG_TAG "buffferpool_unit_test"
+
+#include <gtest/gtest.h>
+
+#include <android/binder_manager.h>
+#include <android/binder_process.h>
+#include <android/binder_stability.h>
+#include <android-base/logging.h>
+#include <bufferpool2/ClientManager.h>
+
+#include <errno.h>
+#include <signal.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+#include <unistd.h>
+
+#include <iostream>
+#include <memory>
+#include <vector>
+
+#include "allocator.h"
+
+using aidl::android::hardware::media::bufferpool2::IClientManager;
+using aidl::android::hardware::media::bufferpool2::ResultStatus;
+using aidl::android::hardware::media::bufferpool2::implementation::BufferId;
+using aidl::android::hardware::media::bufferpool2::implementation::ClientManager;
+using aidl::android::hardware::media::bufferpool2::implementation::ConnectionId;
+using aidl::android::hardware::media::bufferpool2::implementation::TransactionId;
+using aidl::android::hardware::media::bufferpool2::BufferPoolData;
+
+namespace {
+// Service name the receiver registers its ClientManager under and the sender waits for.
+const std::string testInstance = std::string() + ClientManager::descriptor + "/condtest";
+
+// Command codes carried in PipeMessage::data.command between the sender and receiver processes.
+enum PipeCommand : int32_t {
+ INIT_OK = 0,  // receiver -> sender: service registered
+ INIT_ERROR,  // receiver -> sender: setup failed
+ SEND,  // sender -> receiver: a buffer was posted
+ RECEIVE_OK,  // receiver -> sender: buffer received and checked
+ RECEIVE_ERROR,  // receiver -> sender: receive or check failed
+};
+
+// Fixed-size message exchanged over the test pipes; `array` gives read()/write() a byte view.
+union PipeMessage {
+ struct {
+ int32_t command;  // a PipeCommand value
+ BufferId bufferId;  // the id fields and timestamp are forwarded to ClientManager::receive()
+ ConnectionId connectionId;
+ TransactionId transactionId;
+ int64_t timestampUs;
+ } data;
+ char array[0];  // zero-length array marking the start of the union's storage
+};
+// Value the sender stores in IpcMutex::counter; the receiver reports RECEIVE_OK only on reading it.
+constexpr int kSignalInt = 200;
+
+// media.bufferpool test fixture: forks a receiver process and opens the sender-side connection.
+class BufferpoolMultiTest : public ::testing::Test {
+  public:
+    void SetUp() override {
+        BufferPoolStatus status;
+        mReceiverPid = -1;
+        mConnectionValid = false;
+
+        ASSERT_TRUE(pipe(mCommandPipeFds) == 0);
+        ASSERT_TRUE(pipe(mResultPipeFds) == 0);
+
+        mReceiverPid = fork();
+        ASSERT_TRUE(mReceiverPid >= 0);
+
+        if (mReceiverPid == 0) {
+            doReceiver();
+            // The child must not fall through into the sender-side gtest code, so it
+            // parks here (even across interrupted pause() calls) until TearDown() kills it.
+            while (true) pause();
+        }
+
+        mManager = ClientManager::getInstance();
+        ASSERT_NE(mManager, nullptr);
+
+        mAllocator = std::make_shared<TestBufferPoolAllocator>();
+        ASSERT_TRUE((bool)mAllocator);
+
+        status = mManager->create(mAllocator, &mConnectionId);
+        ASSERT_TRUE(status == ResultStatus::OK);
+        mConnectionValid = true;
+    }
+
+    void TearDown() override {
+        if (mReceiverPid > 0) {
+            kill(mReceiverPid, SIGKILL);
+            int wstatus;
+            waitpid(mReceiverPid, &wstatus, 0);  // reap exactly the receiver forked in SetUp()
+        }
+
+        if (mConnectionValid) {
+            mManager->close(mConnectionId);
+        }
+    }
+
+  protected:
+    static void description(const std::string& description) {
+        RecordProperty("description", description);
+    }
+
+    std::shared_ptr<ClientManager> mManager;
+    std::shared_ptr<BufferPoolAllocator> mAllocator;
+    bool mConnectionValid;
+    ConnectionId mConnectionId;
+    pid_t mReceiverPid;
+    int mCommandPipeFds[2];  // sender -> receiver
+    int mResultPipeFds[2];   // receiver -> sender
+    // Writes one whole PipeMessage to pipes[1]; false on error or short write.
+    bool sendMessage(int *pipes, const PipeMessage &message) {
+        const ssize_t written = write(pipes[1], message.array, sizeof(PipeMessage));
+        return written == static_cast<ssize_t>(sizeof(PipeMessage));
+    }
+    // Reads one whole PipeMessage from pipes[0]; false on error, EOF or short read.
+    bool receiveMessage(int *pipes, PipeMessage *message) {
+        const ssize_t got = read(pipes[0], message->array, sizeof(PipeMessage));
+        return got == static_cast<ssize_t>(sizeof(PipeMessage));
+    }
+    // Runs in the forked child: registers the service, receives the buffer, waits on its condvar.
+    void doReceiver() {
+        ABinderProcess_setThreadPoolMaxThreadCount(1);
+        ABinderProcess_startThreadPool();
+        PipeMessage message;
+        mManager = ClientManager::getInstance();
+        if (!mManager) {
+            message.data.command = PipeCommand::INIT_ERROR;
+            sendMessage(mResultPipeFds, message);
+            return;
+        }
+        auto binder = mManager->asBinder();
+        AIBinder_forceDowngradeToSystemStability(binder.get());
+        const binder_status_t addStatus =
+                AServiceManager_addService(binder.get(), testInstance.c_str());
+        // Do not abort on failure: the sender is blocked reading the result pipe for this reply.
+        if (addStatus != STATUS_OK) {
+            message.data.command = PipeCommand::INIT_ERROR;
+            sendMessage(mResultPipeFds, message);
+            return;
+        }
+        message.data.command = PipeCommand::INIT_OK;
+        sendMessage(mResultPipeFds, message);
+
+        int val = 0;
+        receiveMessage(mCommandPipeFds, &message);
+        {
+            native_handle_t *rhandle = nullptr;
+            std::shared_ptr<BufferPoolData> rbuffer;
+            void *mem = nullptr;
+            IpcMutex *mutex = nullptr;
+            BufferPoolStatus status = mManager->receive(
+                    message.data.connectionId, message.data.transactionId,
+                    message.data.bufferId, message.data.timestampUs, &rhandle, &rbuffer);
+            mManager->close(message.data.connectionId);
+            if (status != ResultStatus::OK) {
+                message.data.command = PipeCommand::RECEIVE_ERROR;
+                sendMessage(mResultPipeFds, message);
+                return;
+            }
+            if (!TestBufferPoolAllocator::MapMemoryForMutex(rhandle, &mem)) {
+                message.data.command = PipeCommand::RECEIVE_ERROR;
+                sendMessage(mResultPipeFds, message);
+                return;
+            }
+            mutex = IpcMutex::Import(mem);
+            pthread_mutex_lock(&(mutex->lock));
+            while (mutex->signalled != true) {
+                pthread_cond_wait(&(mutex->cond), &(mutex->lock));
+            }
+            val = mutex->counter;
+            pthread_mutex_unlock(&(mutex->lock));
+
+            (void)TestBufferPoolAllocator::UnmapMemoryForMutex(mem);
+            if (rhandle) {
+                native_handle_close(rhandle);
+                native_handle_delete(rhandle);
+            }
+        }
+        if (val == kSignalInt) {
+            message.data.command = PipeCommand::RECEIVE_OK;
+        } else {
+            message.data.command = PipeCommand::RECEIVE_ERROR;
+        }
+        sendMessage(mResultPipeFds, message);
+    }
+};
+
+// Cross-process test: signal a condition variable that lives in a buffer sent to the receiver.
+TEST_F(BufferpoolMultiTest, TransferBuffer) {
+    BufferPoolStatus status;
+    PipeMessage message;
+
+    ASSERT_TRUE(receiveMessage(mResultPipeFds, &message));
+    ASSERT_TRUE(message.data.command == PipeCommand::INIT_OK);  // receiver setup succeeded
+    ABinderProcess_setThreadPoolMaxThreadCount(1);
+    ABinderProcess_startThreadPool();
+
+    std::shared_ptr<IClientManager> receiver =
+            IClientManager::fromBinder(
+                    ndk::SpAIBinder(AServiceManager_waitForService(testInstance.c_str())));
+    ASSERT_NE(receiver, nullptr);
+    ConnectionId receiverId;
+
+    bool isNew = true;
+    status = mManager->registerSender(receiver, mConnectionId, &receiverId, &isNew);
+    ASSERT_TRUE(status == ResultStatus::OK);
+    {
+        native_handle_t *shandle = nullptr;
+        std::shared_ptr<BufferPoolData> sbuffer;
+        TransactionId transactionId;
+        int64_t postUs;
+        std::vector<uint8_t> vecParams;
+        void *mem = nullptr;
+        IpcMutex *mutex = nullptr;
+
+        getIpcMutexParams(&vecParams);
+        status = mManager->allocate(mConnectionId, vecParams, &shandle, &sbuffer);
+        ASSERT_TRUE(status == ResultStatus::OK);
+
+        ASSERT_TRUE(TestBufferPoolAllocator::MapMemoryForMutex(shandle, &mem));
+        // Construct and initialize the mutex/condvar inside the buffer before posting it.
+        mutex = new(mem) IpcMutex();
+        mutex->init();
+
+        status = mManager->postSend(receiverId, sbuffer, &transactionId, &postUs);
+        ASSERT_TRUE(status == ResultStatus::OK);
+
+        message.data.command = PipeCommand::SEND;
+        message.data.bufferId = sbuffer->mId;
+        message.data.connectionId = receiverId;
+        message.data.transactionId = transactionId;
+        message.data.timestampUs = postUs;
+        ASSERT_TRUE(sendMessage(mCommandPipeFds, message));
+        // Best effort: give the receiver time to block in pthread_cond_wait() first. An empty
+        // counting loop can be optimized away, so sleep instead; the receiver re-checks
+        // |signalled| under the lock, so the hand-off is correct whichever side runs first.
+        usleep(100 * 1000);
+        pthread_mutex_lock(&(mutex->lock));
+        mutex->counter = kSignalInt;
+        mutex->signalled = true;
+        pthread_cond_signal(&(mutex->cond));
+        pthread_mutex_unlock(&(mutex->lock));
+        (void)TestBufferPoolAllocator::UnmapMemoryForMutex(mem);
+        if (shandle) {
+            native_handle_close(shandle);
+            native_handle_delete(shandle);
+        }
+    }
+    EXPECT_TRUE(receiveMessage(mResultPipeFds, &message));
+    EXPECT_TRUE(message.data.command == PipeCommand::RECEIVE_OK);
+}
+
+} // anonymous namespace
+
+int main(int argc, char** argv) {
+    ::testing::InitGoogleTest(&argc, argv);
+    const int result = RUN_ALL_TESTS();  // also becomes the process exit code
+    LOG(INFO) << "Test result = " << result;
+    return result;
+}
diff --git a/media/bufferpool/aidl/default/tests/multi.cpp b/media/bufferpool/aidl/default/tests/multi.cpp
new file mode 100644
index 0000000..8806eb0
--- /dev/null
+++ b/media/bufferpool/aidl/default/tests/multi.cpp
@@ -0,0 +1,243 @@
+/*
+ * Copyright (C) 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define LOG_TAG "buffferpool_unit_test"
+
+#include <gtest/gtest.h>
+
+#include <android/binder_manager.h>
+#include <android/binder_process.h>
+#include <android/binder_stability.h>
+#include <android-base/logging.h>
+#include <bufferpool2/ClientManager.h>
+
+#include <signal.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+#include <unistd.h>
+
+#include <iostream>
+#include <memory>
+#include <vector>
+
+#include "allocator.h"
+
+using aidl::android::hardware::media::bufferpool2::IClientManager;
+using aidl::android::hardware::media::bufferpool2::ResultStatus;
+using aidl::android::hardware::media::bufferpool2::implementation::BufferId;
+using aidl::android::hardware::media::bufferpool2::implementation::ClientManager;
+using aidl::android::hardware::media::bufferpool2::implementation::ConnectionId;
+using aidl::android::hardware::media::bufferpool2::implementation::TransactionId;
+using aidl::android::hardware::media::bufferpool2::BufferPoolData;
+
+namespace {
+// Service name the receiver registers its ClientManager under and the sender waits for.
+const std::string testInstance = std::string() + ClientManager::descriptor + "/multitest";
+
+// Command codes carried in PipeMessage::data.command between the sender and receiver processes.
+enum PipeCommand : int32_t {
+ INIT_OK = 0,  // receiver -> sender: service registered
+ INIT_ERROR,  // receiver -> sender: setup failed
+ SEND,  // sender -> receiver: a buffer was posted
+ RECEIVE_OK,  // receiver -> sender: buffer received and verified
+ RECEIVE_ERROR,  // receiver -> sender: receive or verification failed
+};
+
+// Fixed-size message exchanged over the test pipes; `array` gives read()/write() a byte view.
+union PipeMessage {
+ struct {
+ int32_t command;  // a PipeCommand value
+ BufferId bufferId;  // the id fields and timestamp are forwarded to ClientManager::receive()
+ ConnectionId connectionId;
+ TransactionId transactionId;
+ int64_t timestampUs;
+ } data;
+ char array[0];  // zero-length array marking the start of the union's storage
+};
+
+// media.bufferpool test fixture: forks a receiver process and opens the sender-side connection.
+class BufferpoolMultiTest : public ::testing::Test {
+  public:
+    void SetUp() override {
+        BufferPoolStatus status;
+        mReceiverPid = -1;
+        mConnectionValid = false;
+
+        ASSERT_TRUE(pipe(mCommandPipeFds) == 0);
+        ASSERT_TRUE(pipe(mResultPipeFds) == 0);
+
+        mReceiverPid = fork();
+        ASSERT_TRUE(mReceiverPid >= 0);
+
+        if (mReceiverPid == 0) {
+            doReceiver();
+            // The child must not fall through into the sender-side gtest code, so it
+            // parks here (even across interrupted pause() calls) until TearDown() kills it.
+            while (true) pause();
+        }
+        mManager = ClientManager::getInstance();
+        ASSERT_NE(mManager, nullptr);
+
+        mAllocator = std::make_shared<TestBufferPoolAllocator>();
+        ASSERT_TRUE((bool)mAllocator);
+
+        status = mManager->create(mAllocator, &mConnectionId);
+        ASSERT_TRUE(status == ResultStatus::OK);
+        mConnectionValid = true;
+    }
+
+    void TearDown() override {
+        if (mReceiverPid > 0) {
+            kill(mReceiverPid, SIGKILL);
+            int wstatus;
+            waitpid(mReceiverPid, &wstatus, 0);  // reap exactly the receiver forked in SetUp()
+        }
+
+        if (mConnectionValid) {
+            mManager->close(mConnectionId);
+        }
+    }
+
+  protected:
+    static void description(const std::string& description) {
+        RecordProperty("description", description);
+    }
+
+    std::shared_ptr<ClientManager> mManager;
+    std::shared_ptr<BufferPoolAllocator> mAllocator;
+    bool mConnectionValid;
+    ConnectionId mConnectionId;
+    pid_t mReceiverPid;
+    int mCommandPipeFds[2];  // sender -> receiver
+    int mResultPipeFds[2];   // receiver -> sender
+    // Writes one whole PipeMessage to pipes[1]; false on error or short write.
+    bool sendMessage(int *pipes, const PipeMessage &message) {
+        const ssize_t written = write(pipes[1], message.array, sizeof(PipeMessage));
+        return written == static_cast<ssize_t>(sizeof(PipeMessage));
+    }
+    // Reads one whole PipeMessage from pipes[0]; false on error, EOF or short read.
+    bool receiveMessage(int *pipes, PipeMessage *message) {
+        const ssize_t got = read(pipes[0], message->array, sizeof(PipeMessage));
+        return got == static_cast<ssize_t>(sizeof(PipeMessage));
+    }
+    // Runs in the forked child: registers the service, receives one buffer and verifies it.
+    void doReceiver() {
+        ABinderProcess_setThreadPoolMaxThreadCount(1);
+        ABinderProcess_startThreadPool();
+        PipeMessage message;
+        mManager = ClientManager::getInstance();
+        if (!mManager) {
+            message.data.command = PipeCommand::INIT_ERROR;
+            sendMessage(mResultPipeFds, message);
+            return;
+        }
+        auto binder = mManager->asBinder();
+        AIBinder_forceDowngradeToSystemStability(binder.get());
+        const binder_status_t addStatus =
+                AServiceManager_addService(binder.get(), testInstance.c_str());
+        // Do not abort on failure: the sender is blocked reading the result pipe for this reply.
+        if (addStatus != STATUS_OK) {
+            message.data.command = PipeCommand::INIT_ERROR;
+            sendMessage(mResultPipeFds, message);
+            return;
+        }
+        message.data.command = PipeCommand::INIT_OK;
+        sendMessage(mResultPipeFds, message);
+
+        receiveMessage(mCommandPipeFds, &message);
+        {
+            native_handle_t *rhandle = nullptr;
+            std::shared_ptr<BufferPoolData> rbuffer;
+            BufferPoolStatus status = mManager->receive(
+                    message.data.connectionId, message.data.transactionId,
+                    message.data.bufferId, message.data.timestampUs, &rhandle, &rbuffer);
+            mManager->close(message.data.connectionId);
+            if (status != ResultStatus::OK) {
+                message.data.command = PipeCommand::RECEIVE_ERROR;
+                sendMessage(mResultPipeFds, message);
+                return;
+            }
+            if (!TestBufferPoolAllocator::Verify(rhandle, 0x77)) {
+                message.data.command = PipeCommand::RECEIVE_ERROR;
+                sendMessage(mResultPipeFds, message);
+                return;
+            }
+            if (rhandle) {
+                native_handle_close(rhandle);
+                native_handle_delete(rhandle);
+            }
+        }
+        message.data.command = PipeCommand::RECEIVE_OK;
+        sendMessage(mResultPipeFds, message);
+    }
+};
+
+// Cross-process test: a buffer filled with 0x77 is posted to the forked receiver for verification.
+TEST_F(BufferpoolMultiTest, TransferBuffer) {
+    BufferPoolStatus status;
+    PipeMessage message;
+    ASSERT_TRUE(receiveMessage(mResultPipeFds, &message));
+    ASSERT_TRUE(message.data.command == PipeCommand::INIT_OK);  // receiver setup succeeded
+    ABinderProcess_setThreadPoolMaxThreadCount(1);
+    ABinderProcess_startThreadPool();
+
+    std::shared_ptr<IClientManager> receiver = IClientManager::fromBinder(ndk::SpAIBinder(
+            AServiceManager_waitForService(testInstance.c_str())));
+    ASSERT_NE(receiver, nullptr);
+    ConnectionId receiverId;
+
+    bool isNew = true;
+    status = mManager->registerSender(receiver, mConnectionId, &receiverId, &isNew);
+    ASSERT_TRUE(status == ResultStatus::OK);
+    {
+        native_handle_t *shandle = nullptr;
+        std::shared_ptr<BufferPoolData> sbuffer;
+        TransactionId transactionId;
+        int64_t postUs;
+        std::vector<uint8_t> vecParams;
+
+        getTestAllocatorParams(&vecParams);
+        status = mManager->allocate(mConnectionId, vecParams, &shandle, &sbuffer);
+        ASSERT_TRUE(status == ResultStatus::OK);
+
+        ASSERT_TRUE(TestBufferPoolAllocator::Fill(shandle, 0x77));
+        if (shandle) {
+            native_handle_close(shandle);
+            native_handle_delete(shandle);
+        }
+
+        status = mManager->postSend(receiverId, sbuffer, &transactionId, &postUs);
+        ASSERT_TRUE(status == ResultStatus::OK);
+
+        message.data.command = PipeCommand::SEND;
+        message.data.bufferId = sbuffer->mId;
+        message.data.connectionId = receiverId;
+        message.data.transactionId = transactionId;
+        message.data.timestampUs = postUs;
+        ASSERT_TRUE(sendMessage(mCommandPipeFds, message));  // receiver waits for this command
+    }
+    EXPECT_TRUE(receiveMessage(mResultPipeFds, &message));
+    EXPECT_TRUE(message.data.command == PipeCommand::RECEIVE_OK);
+}
+
+} // anonymous namespace
+
+int main(int argc, char** argv) {
+    ::testing::InitGoogleTest(&argc, argv);
+    const int result = RUN_ALL_TESTS();  // also becomes the process exit code
+    LOG(INFO) << "Test result = " << result;
+    return result;
+}
diff --git a/media/bufferpool/aidl/default/tests/single.cpp b/media/bufferpool/aidl/default/tests/single.cpp
new file mode 100644
index 0000000..66aa5e9
--- /dev/null
+++ b/media/bufferpool/aidl/default/tests/single.cpp
@@ -0,0 +1,179 @@
+/*
+ * Copyright (C) 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define LOG_TAG "buffferpool_unit_test"
+
+#include <gtest/gtest.h>
+
+#include <android-base/logging.h>
+#include <binder/ProcessState.h>
+#include <bufferpool2/ClientManager.h>
+#include <unistd.h>
+#include <iostream>
+#include <memory>
+#include <vector>
+#include "allocator.h"
+
+using aidl::android::hardware::media::bufferpool2::implementation::BufferId;
+using aidl::android::hardware::media::bufferpool2::implementation::BufferPoolStatus;
+using aidl::android::hardware::media::bufferpool2::implementation::ClientManager;
+using aidl::android::hardware::media::bufferpool2::implementation::ConnectionId;
+using aidl::android::hardware::media::bufferpool2::implementation::TransactionId;
+using aidl::android::hardware::media::bufferpool2::BufferPoolData;
+
+namespace {
+
+// Number of iterations for the buffer allocation test.
+constexpr static int kNumAllocationTest = 3;
+
+// Number of iterations for the buffer recycling test.
+constexpr static int kNumRecycleTest = 3;
+
+// Test fixture: one local bufferpool connection that is also registered as its own receiver.
+class BufferpoolSingleTest : public ::testing::Test {
+  public:
+    void SetUp() override {
+        mConnectionValid = false;
+
+        mManager = ClientManager::getInstance();
+        ASSERT_NE(mManager, nullptr);
+
+        mAllocator = std::make_shared<TestBufferPoolAllocator>();
+        ASSERT_TRUE((bool)mAllocator);
+
+        BufferPoolStatus status = mManager->create(mAllocator, &mConnectionId);
+        ASSERT_TRUE(status == ResultStatus::OK);
+        mConnectionValid = true;
+
+        // Register the manager with itself; the assertion below requires that this
+        // reports isNew == false and hands back the connection created above.
+        bool isNew = true;
+        status = mManager->registerSender(mManager, mConnectionId, &mReceiverId, &isNew);
+        ASSERT_TRUE(status == ResultStatus::OK && isNew == false &&
+                    mReceiverId == mConnectionId);
+    }
+
+    void TearDown() override {
+        if (mConnectionValid) {
+            mManager->close(mConnectionId);
+        }
+    }
+
+  protected:
+    static void description(const std::string& description) {
+        RecordProperty("description", description);
+    }
+
+    std::shared_ptr<ClientManager> mManager;
+    std::shared_ptr<BufferPoolAllocator> mAllocator;
+    bool mConnectionValid;
+    ConnectionId mConnectionId;
+    ConnectionId mReceiverId;
+
+};
+
+// Buffer allocation test.
+// Allocates kNumAllocationTest buffers that stay alive together and checks that
+// every allocation succeeds and that all buffer ids are pairwise distinct.
+TEST_F(BufferpoolSingleTest, AllocateBuffer) {
+    std::vector<uint8_t> params;
+    getTestAllocatorParams(&params);
+
+    std::shared_ptr<BufferPoolData> buffers[kNumAllocationTest];
+    native_handle_t *handle = nullptr;
+    for (auto &slot : buffers) {
+        BufferPoolStatus status = mManager->allocate(mConnectionId, params, &handle, &slot);
+        ASSERT_TRUE(status == ResultStatus::OK);
+        // Only the BufferPoolData is compared below; release the native handle right away.
+        if (handle) {
+            native_handle_close(handle);
+            native_handle_delete(handle);
+        }
+    }
+    for (int first = 0; first < kNumAllocationTest; ++first) {
+        for (int second = first + 1; second < kNumAllocationTest; ++second) {
+            ASSERT_TRUE(buffers[first]->mId != buffers[second]->mId);
+        }
+    }
+    EXPECT_TRUE(kNumAllocationTest > 1);
+}
+
+// Buffer recycle test.
+// Each buffer is dropped before the next allocation; all rounds must report the same buffer id.
+TEST_F(BufferpoolSingleTest, RecycleBuffer) {
+    std::vector<uint8_t> params;
+    getTestAllocatorParams(&params);
+
+    BufferId ids[kNumRecycleTest];
+    for (int round = 0; round < kNumRecycleTest; ++round) {
+        // |buffer| goes out of scope at the end of every round.
+        std::shared_ptr<BufferPoolData> buffer;
+        native_handle_t *handle = nullptr;
+        BufferPoolStatus status = mManager->allocate(mConnectionId, params, &handle, &buffer);
+        ASSERT_TRUE(status == ResultStatus::OK);
+        ids[round] = buffer->mId;
+        if (handle) {
+            native_handle_close(handle);
+            native_handle_delete(handle);
+        }
+    }
+    for (int round = 1; round < kNumRecycleTest; ++round) {
+        ASSERT_TRUE(ids[round - 1] == ids[round]);
+    }
+    EXPECT_TRUE(kNumRecycleTest > 1);
+}
+
+// Buffer transfer test.
+// Posts a buffer filled with 0x77 to the receiver connection and verifies it after receive().
+TEST_F(BufferpoolSingleTest, TransferBuffer) {
+    std::vector<uint8_t> params;
+    getTestAllocatorParams(&params);
+
+    std::shared_ptr<BufferPoolData> sent;
+    std::shared_ptr<BufferPoolData> received;
+    native_handle_t *sendHandle = nullptr;
+    native_handle_t *recvHandle = nullptr;
+    TransactionId transactionId;
+    int64_t postUs;
+
+    BufferPoolStatus status = mManager->allocate(mConnectionId, params, &sendHandle, &sent);
+    ASSERT_TRUE(status == ResultStatus::OK);
+    ASSERT_TRUE(TestBufferPoolAllocator::Fill(sendHandle, 0x77));
+    status = mManager->postSend(mReceiverId, sent, &transactionId, &postUs);
+    ASSERT_TRUE(status == ResultStatus::OK);
+    status = mManager->receive(mReceiverId, transactionId, sent->mId, postUs,
+                               &recvHandle, &received);
+    EXPECT_TRUE(status == ResultStatus::OK);
+    ASSERT_TRUE(TestBufferPoolAllocator::Verify(recvHandle, 0x77));
+
+    if (sendHandle) {
+        native_handle_close(sendHandle);
+        native_handle_delete(sendHandle);
+    }
+    if (recvHandle) {
+        native_handle_close(recvHandle);
+        native_handle_delete(recvHandle);
+    }
+}
+
+} // anonymous namespace
+
+int main(int argc, char** argv) {
+    ::testing::InitGoogleTest(&argc, argv);
+    const int result = RUN_ALL_TESTS();  // also becomes the process exit code
+    LOG(INFO) << "Test result = " << result;
+    return result;
+}