Release input buffers when codec holds no refs

Codec2Client::Component and CCodecBufferChannel hold input buffers until
they return from the component in onWorkDone() or flush(). This may
cause some input buffers to never be freed and reused if the component
loses references without calling onWorkDone(). This CL adds a mechanism
to handle this situation.

Test: YouTube and Camera apps work as usual

Bug: 67591367
Bug: 80279018
Bug: 111415084
Change-Id: I778a1fa928ae14a392f0a1ab630a539f258eae0b
diff --git a/codec2/hidl/1.0/utils/Component.cpp b/codec2/hidl/1.0/utils/Component.cpp
index 23d97cd..4465504 100644
--- a/codec2/hidl/1.0/utils/Component.cpp
+++ b/codec2/hidl/1.0/utils/Component.cpp
@@ -16,7 +16,7 @@
 
 //#define LOG_NDEBUG 0
 #define LOG_TAG "Codec2-Component"
-#include <log/log.h>
+#include <android-base/logging.h>
 
 #include <C2PlatformSupport.h>
 #include <codec2/hidl/1.0/Component.h>
@@ -24,10 +24,15 @@
 #include <codec2/hidl/1.0/types.h>
 
 #include <hidl/HidlBinderSupport.h>
+#include <utils/Timers.h>
 
 #include <C2BqBufferPriv.h>
+#include <C2Debug.h>
 #include <C2PlatformSupport.h>
 
+#include <chrono>
+#include <thread>
+
 namespace hardware {
 namespace google {
 namespace media {
@@ -84,6 +89,221 @@
 
 } // unnamed namespace
 
+// InputBufferManager
+// ==================
+//
+// InputBufferManager presents a way to track and untrack input buffers in this
+// (codec) process and send a notification to a listener, possibly in a
+// different process, when a tracked buffer no longer has any references in this
+// process. (In fact, this class would work for listeners in the same process
+// too, but the optimization discussed below will not be beneficial.)
+//
+// InputBufferManager holds a collection of records representing tracked buffers
+// and their callback listeners. Conceptually, one record is a triple (listener,
+// frameIndex, bufferIndex) where
+//
+// - (frameIndex, bufferIndex) is a pair of indices used to identify the buffer.
+// - listener is of type IComponentListener. Its onFramesRendered() function
+//   will be called after the associated buffer dies. The argument of
+//   onFramesRendered() is a list of RenderedFrame objects, each of which has
+//   the following members:
+//
+//     uint64_t bufferQueueId
+//     int32_t  slotId
+//     int64_t  timestampNs
+//
+// When a tracked buffer associated to the triple (listener, frameIndex,
+// bufferIndex) goes out of scope, listener->onFramesRendered() will be called
+// with a RenderedFrame object whose members are set as follows:
+//
+//     bufferQueueId = frameIndex
+//     slotId        = ~bufferIndex
+//     timestampNs   = systemTime() at the time of notification
+//
+// The reason for the bitwise negation of bufferIndex is that onFramesRendered()
+// may be used for a different purpose when slotId is non-negative (which is a
+// more general use case).
+//
+// IPC Optimization
+// ----------------
+//
+// Since onFramesRendered() generally is an IPC call, InputBufferManager tries
+// not to call it too often. There is a mechanism to guarantee that any two
+// calls to the same listener are at least kNotificationPeriodNs nanoseconds
+// apart.
+//
+struct InputBufferManager {
+    // The minimum time period between IPC calls to notify the client about the
+    // destruction of input buffers.
+    static constexpr nsecs_t kNotificationPeriodNs = 1000000;
+
+    // Track all buffers in a C2FrameData object.
+    //
+    // input (C2FrameData) has the following two members that are of interest:
+    //
+    //   C2WorkOrdinal                ordinal
+    //   vector<shared_ptr<C2Buffer>> buffers
+    //
+    // Calling registerFrameData(listener, input) will register multiple
+    // triples (, frameIndex, bufferIndex) where frameIndex is equal to
+    // input.ordinal.frameIndex and bufferIndex runs through the indices of
+    // input.buffers such that input.buffers[bufferIndex] is not null.
+    //
+    // This should be called from queue().
+    static void registerFrameData(
+            const sp<IComponentListener>& listener,
+            const C2FrameData& input);
+
+    // Untrack all buffers in a C2FrameData object.
+    //
+    // Calling unregisterFrameData(listener, input) will unregister and remove
+    // pending notifications for all triples (l, fi, bufferIndex) such that
+    // l = listener and fi = input.ordinal.frameIndex.
+    //
+    // This should be called from onWorkDone() and flush().
+    static void unregisterFrameData(
+            const wp<IComponentListener>& listener,
+            const C2FrameData& input);
+
+    // Untrack all buffers associated to a given listener.
+    //
+    // Calling unregisterFrameData(listener) will unregister and remove
+    // pending notifications for all triples (l, frameIndex, bufferIndex) such
+    // that l = listener.
+    //
+    // This should be called when the component cleans up all input buffers,
+    // i.e., when reset(), release(), stop() or ~Component() is called.
+    static void unregisterFrameData(
+            const wp<IComponentListener>& listener);
+
+private:
+    void _registerFrameData(
+            const sp<IComponentListener>& listener,
+            const C2FrameData& input);
+    void _unregisterFrameData(
+            const wp<IComponentListener>& listener,
+            const C2FrameData& input);
+    void _unregisterFrameData(
+            const wp<IComponentListener>& listener);
+
+    // The callback function tied to C2Buffer objects.
+    //
+    // Note: This function assumes that sInstance is the only instance of this
+    //       class.
+    static void onBufferDestroyed(const C2Buffer* buf, void* arg);
+    void _onBufferDestroyed(const C2Buffer* buf, void* arg);
+
+    // Comparison operator for weak pointers.
+    struct CompareWeakComponentListener {
+        constexpr bool operator()(
+                const wp<IComponentListener>& x,
+                const wp<IComponentListener>& y) const {
+            return x.get_refs() < y.get_refs();
+        }
+    };
+
+    // Persistent data to be passed as "arg" in onBufferDestroyed().
+    // This is essentially the triple (listener, frameIndex, bufferIndex) plus a
+    // weak pointer to the C2Buffer object.
+    //
+    // Note that the "key" is bufferIndex according to operator<(). This is
+    // designed to work with TrackedBuffersMap defined below.
+    struct TrackedBuffer {
+        wp<IComponentListener> listener;
+        uint64_t frameIndex;
+        size_t bufferIndex;
+        std::weak_ptr<C2Buffer> buffer;
+        TrackedBuffer(const wp<IComponentListener>& listener,
+                      uint64_t frameIndex,
+                      size_t bufferIndex,
+                      const std::shared_ptr<C2Buffer>& buffer)
+              : listener(listener),
+                frameIndex(frameIndex),
+                bufferIndex(bufferIndex),
+                buffer(buffer) {}
+        TrackedBuffer(const TrackedBuffer&) = default;
+        bool operator<(const TrackedBuffer& other) const {
+            return bufferIndex < other.bufferIndex;
+        }
+    };
+
+    // Map: listener -> frameIndex -> set<TrackedBuffer>.
+    // Essentially, this is used to store triples (listener, frameIndex,
+    // bufferIndex) that's searchable by listener and (listener, frameIndex).
+    // However, the value of the innermost map is TrackedBuffer, which also
+    // contains an extra copy of listener and frameIndex. This is needed
+    // because onBufferDestroyed() needs to know listener and frameIndex too.
+    typedef std::map<wp<IComponentListener>,
+                     std::map<uint64_t,
+                              std::set<TrackedBuffer>>,
+                     CompareWeakComponentListener> TrackedBuffersMap;
+
+    // Storage for pending (unsent) death notifications for one listener.
+    // Each pair in member named "indices" are (frameIndex, bufferIndex) from
+    // the (listener, frameIndex, bufferIndex) triple.
+    struct DeathNotifications {
+
+        // The number of pending notifications for this listener.
+        // count may be 0, in which case the DeathNotifications object will
+        // remain valid for only a small period (kNotificationPeriodNs
+        // nanoseconds).
+        size_t count;
+
+        // The timestamp of the most recent callback on this listener. This is
+        // used to guarantee that callbacks do not occur too frequently, and
+        // also to trigger expiration of a DeathNotifications object that has
+        // count = 0.
+        nsecs_t lastSentNs;
+
+        // Map: frameIndex -> vector of bufferIndices
+        // This is essentially a collection of (framdeIndex, bufferIndex).
+        std::map<uint64_t, std::vector<size_t>> indices;
+
+        DeathNotifications()
+              : count(0),
+                lastSentNs(systemTime() - kNotificationPeriodNs),
+                indices() {}
+    };
+
+    // Mutex for the management of all input buffers.
+    std::mutex mMutex;
+
+    // Tracked input buffers.
+    TrackedBuffersMap mTrackedBuffersMap;
+
+    // Death notifications to be sent.
+    //
+    // A DeathNotifications object is associated to each listener. An entry in
+    // this map will be removed if its associated DeathNotifications has count =
+    // 0 and lastSentNs < systemTime() - kNotificationPeriodNs.
+    std::map<wp<IComponentListener>, DeathNotifications> mDeathNotifications;
+
+    // Condition variable signaled when an entry is added to mDeathNotifications.
+    std::condition_variable mOnBufferDestroyed;
+
+    // Notify the clients about buffer destructions.
+    // Return false if all destructions have been notified.
+    // Return true and set timeToRetry to the duration to wait for before
+    // retrying if some destructions have not been notified.
+    bool processNotifications(nsecs_t* timeToRetryNs);
+
+    // Main function for the input buffer manager thread.
+    void main();
+
+    // The thread that manages notifications.
+    //
+    // Note: This variable is declared last so its initialization will happen
+    // after all other member variables have been initialized.
+    std::thread mMainThread;
+
+    // Private constructor.
+    InputBufferManager();
+
+    // The only instance of this class.
+    static InputBufferManager& getInstance();
+
+};
+
 // ComponentInterface
 ComponentInterface::ComponentInterface(
         const std::shared_ptr<C2ComponentInterface>& intf,
@@ -148,6 +368,18 @@
             std::weak_ptr<C2Component> /* c2component */,
             std::list<std::unique_ptr<C2Work>> c2workItems) override {
         ALOGV("onWorkDone");
+        for (const std::unique_ptr<C2Work>& work : c2workItems) {
+            if (work) {
+                if (work->worklets.empty()
+                        || !work->worklets.back()
+                        || (work->worklets.back()->output.flags &
+                            C2FrameData::FLAG_INCOMPLETE) == 0) {
+                    InputBufferManager::
+                            unregisterFrameData(mListener, work->input);
+                }
+            }
+        }
+
         sp<IComponentListener> listener = mListener.promote();
         if (listener) {
             WorkBundle workBundle;
@@ -201,11 +433,19 @@
     ALOGV("queue -- converting input");
     std::list<std::unique_ptr<C2Work>> c2works;
 
-    // TODO: Connect with bufferpool API for buffer transfers
     if (objcpy(&c2works, workBundle) != C2_OK) {
         ALOGV("queue -- corrupted");
         return Status::CORRUPTED;
     }
+
+    // Register input buffers.
+    for (const std::unique_ptr<C2Work>& work : c2works) {
+        if (work) {
+            InputBufferManager::
+                    registerFrameData(mListener, work->input);
+        }
+    }
+
     ALOGV("queue -- calling");
     return static_cast<Status>(mComponent->queue_nb(&c2works));
 }
@@ -216,8 +456,21 @@
     c2_status_t c2res = mComponent->flush_sm(
             C2Component::FLUSH_COMPONENT,
             &c2flushedWorks);
-    WorkBundle flushedWorkBundle;
 
+    // Unregister input buffers.
+    for (const std::unique_ptr<C2Work>& work : c2flushedWorks) {
+        if (work) {
+            if (work->worklets.empty()
+                    || !work->worklets.back()
+                    || (work->worklets.back()->output.flags &
+                        C2FrameData::FLAG_INCOMPLETE) == 0) {
+                InputBufferManager::
+                        unregisterFrameData(mListener, work->input);
+            }
+        }
+    }
+
+    WorkBundle flushedWorkBundle;
     Status res = static_cast<Status>(c2res);
     if (c2res == C2_OK) {
         ALOGV("flush -- converting output");
@@ -366,6 +619,7 @@
 
 Return<Status> Component::stop() {
     ALOGV("stop");
+    InputBufferManager::unregisterFrameData(mListener);
     return static_cast<Status>(mComponent->stop());
 }
 
@@ -376,6 +630,7 @@
         std::lock_guard<std::mutex> lock(mBlockPoolsMutex);
         mBlockPools.clear();
     }
+    InputBufferManager::unregisterFrameData(mListener);
     return status;
 }
 
@@ -386,6 +641,7 @@
         std::lock_guard<std::mutex> lock(mBlockPoolsMutex);
         mBlockPools.clear();
     }
+    InputBufferManager::unregisterFrameData(mListener);
     return status;
 }
 
@@ -403,6 +659,7 @@
 }
 
 Component::~Component() {
+    InputBufferManager::unregisterFrameData(mListener);
     mStore->reportComponentDeath(mLocalId);
 }
 
@@ -415,6 +672,385 @@
     }
 }
 
+// InputBufferManager implementation
+
+constexpr nsecs_t InputBufferManager::kNotificationPeriodNs;
+
+void InputBufferManager::registerFrameData(
+        const sp<IComponentListener>& listener,
+        const C2FrameData& input) {
+    getInstance()._registerFrameData(listener, input);
+}
+
+void InputBufferManager::unregisterFrameData(
+        const wp<IComponentListener>& listener,
+        const C2FrameData& input) {
+    getInstance()._unregisterFrameData(listener, input);
+}
+
+void InputBufferManager::unregisterFrameData(
+        const wp<IComponentListener>& listener) {
+    getInstance()._unregisterFrameData(listener);
+}
+
+void InputBufferManager::_registerFrameData(
+        const sp<IComponentListener>& listener,
+        const C2FrameData& input) {
+    uint64_t frameIndex = input.ordinal.frameIndex.peeku();
+    ALOGV("InputBufferManager::_registerFrameData called "
+          "(listener @ %p, frameIndex = %llu)",
+          listener.get(),
+          static_cast<long long unsigned>(frameIndex));
+    std::lock_guard<std::mutex> lock(mMutex);
+
+    std::set<TrackedBuffer> &bufferIds =
+            mTrackedBuffersMap[listener][frameIndex];
+
+    for (size_t i = 0; i < input.buffers.size(); ++i) {
+        if (!input.buffers[i]) {
+            ALOGV("InputBufferManager::_registerFrameData: "
+                  "Input buffer at index %zu is null", i);
+            continue;
+        }
+        const TrackedBuffer &bufferId =
+                *bufferIds.emplace(listener, frameIndex, i, input.buffers[i]).
+                first;
+
+        c2_status_t status = input.buffers[i]->registerOnDestroyNotify(
+                onBufferDestroyed,
+                const_cast<void*>(reinterpret_cast<const void*>(&bufferId)));
+        if (status != C2_OK) {
+            ALOGD("InputBufferManager: registerOnDestroyNotify failed "
+                  "(listener @ %p, frameIndex = %llu, bufferIndex = %zu) "
+                  "=> %s (%d)",
+                  listener.get(),
+                  static_cast<unsigned long long>(frameIndex),
+                  i,
+                  asString(status), static_cast<int>(status));
+        }
+    }
+
+    mDeathNotifications.emplace(listener, DeathNotifications());
+}
+
+// Remove a pair (listener, frameIndex) from mTrackedBuffersMap and
+// mDeathNotifications. This implies all bufferIndices are removed.
+//
+// This is called from onWorkDone() and flush().
+void InputBufferManager::_unregisterFrameData(
+        const wp<IComponentListener>& listener,
+        const C2FrameData& input) {
+    uint64_t frameIndex = input.ordinal.frameIndex.peeku();
+    ALOGV("InputBufferManager::_unregisterFrameData called "
+          "(listener @ %p, frameIndex = %llu)",
+          listener.unsafe_get(),
+          static_cast<long long unsigned>(frameIndex));
+    std::lock_guard<std::mutex> lock(mMutex);
+
+    auto findListener = mTrackedBuffersMap.find(listener);
+    if (findListener != mTrackedBuffersMap.end()) {
+        std::map<uint64_t, std::set<TrackedBuffer>> &frameIndex2BufferIds
+                = findListener->second;
+        auto findFrameIndex = frameIndex2BufferIds.find(frameIndex);
+        if (findFrameIndex != frameIndex2BufferIds.end()) {
+            std::set<TrackedBuffer> &bufferIds = findFrameIndex->second;
+            for (const TrackedBuffer& bufferId : bufferIds) {
+                std::shared_ptr<C2Buffer> buffer = bufferId.buffer.lock();
+                if (buffer) {
+                    c2_status_t status = buffer->unregisterOnDestroyNotify(
+                            onBufferDestroyed,
+                            const_cast<void*>(
+                            reinterpret_cast<const void*>(&bufferId)));
+                    if (status != C2_OK) {
+                        ALOGD("InputBufferManager: "
+                              "unregisterOnDestroyNotify failed "
+                              "(listener @ %p, "
+                              "frameIndex = %llu, "
+                              "bufferIndex = %zu) "
+                              "=> %s (%d)",
+                              bufferId.listener.unsafe_get(),
+                              static_cast<unsigned long long>(
+                                  bufferId.frameIndex),
+                              bufferId.bufferIndex,
+                              asString(status), static_cast<int>(status));
+                    }
+                }
+            }
+
+            frameIndex2BufferIds.erase(findFrameIndex);
+            if (frameIndex2BufferIds.empty()) {
+                mTrackedBuffersMap.erase(findListener);
+            }
+        }
+    }
+
+    auto findListenerD = mDeathNotifications.find(listener);
+    if (findListenerD != mDeathNotifications.end()) {
+        DeathNotifications &deathNotifications = findListenerD->second;
+        auto findFrameIndex = deathNotifications.indices.find(frameIndex);
+        if (findFrameIndex != deathNotifications.indices.end()) {
+            std::vector<size_t> &bufferIndices = findFrameIndex->second;
+            deathNotifications.count -= bufferIndices.size();
+            deathNotifications.indices.erase(findFrameIndex);
+        }
+    }
+}
+
+// Remove listener from mTrackedBuffersMap and mDeathNotifications. This implies
+// all frameIndices and bufferIndices are removed.
+//
+// This is called when the component cleans up all input buffers, i.e., when
+// reset(), release(), stop() or ~Component() is called.
+void InputBufferManager::_unregisterFrameData(
+        const wp<IComponentListener>& listener) {
+    ALOGV("InputBufferManager::_unregisterFrameData called (listener @ %p)",
+            listener.unsafe_get());
+    std::lock_guard<std::mutex> lock(mMutex);
+
+    auto findListener = mTrackedBuffersMap.find(listener);
+    if (findListener != mTrackedBuffersMap.end()) {
+        std::map<uint64_t, std::set<TrackedBuffer>> &frameIndex2BufferIds =
+                findListener->second;
+        for (auto findFrameIndex = frameIndex2BufferIds.begin();
+                findFrameIndex != frameIndex2BufferIds.end();
+                ++findFrameIndex) {
+            std::set<TrackedBuffer> &bufferIds = findFrameIndex->second;
+            for (const TrackedBuffer& bufferId : bufferIds) {
+                std::shared_ptr<C2Buffer> buffer = bufferId.buffer.lock();
+                if (buffer) {
+                    c2_status_t status = buffer->unregisterOnDestroyNotify(
+                            onBufferDestroyed,
+                            const_cast<void*>(
+                            reinterpret_cast<const void*>(&bufferId)));
+                    if (status != C2_OK) {
+                        ALOGD("InputBufferManager: "
+                              "unregisterOnDestroyNotify failed "
+                              "(listener @ %p, "
+                              "frameIndex = %llu, "
+                              "bufferIndex = %zu) "
+                              "=> %s (%d)",
+                              bufferId.listener.unsafe_get(),
+                              static_cast<unsigned long long>(bufferId.frameIndex),
+                              bufferId.bufferIndex,
+                              asString(status), static_cast<int>(status));
+                    }
+                }
+            }
+        }
+        mTrackedBuffersMap.erase(findListener);
+    }
+
+    mDeathNotifications.erase(listener);
+}
+
+// Move a buffer from mTrackedBuffersMap to mDeathNotifications.
+// This is called when a registered C2Buffer object is destroyed.
+void InputBufferManager::onBufferDestroyed(const C2Buffer* buf, void* arg) {
+    getInstance()._onBufferDestroyed(buf, arg);
+}
+
+void InputBufferManager::_onBufferDestroyed(const C2Buffer* buf, void* arg) {
+    if (!buf || !arg) {
+        ALOGW("InputBufferManager::_onBufferDestroyed called "
+              "with null argument(s) (buf @ %p, arg @ %p)",
+              buf, arg);
+        return;
+    }
+    TrackedBuffer id(*reinterpret_cast<TrackedBuffer*>(arg));
+    ALOGV("InputBufferManager::_onBufferDestroyed called "
+          "(listener @ %p, frameIndex = %llu, bufferIndex = %zu)",
+          id.listener.unsafe_get(),
+          static_cast<unsigned long long>(id.frameIndex),
+          id.bufferIndex);
+
+    std::lock_guard<std::mutex> lock(mMutex);
+
+    auto findListener = mTrackedBuffersMap.find(id.listener);
+    if (findListener == mTrackedBuffersMap.end()) {
+        ALOGD("InputBufferManager::_onBufferDestroyed received "
+              "invalid listener "
+              "(listener @ %p, frameIndex = %llu, bufferIndex = %zu)",
+              id.listener.unsafe_get(),
+              static_cast<unsigned long long>(id.frameIndex),
+              id.bufferIndex);
+        return;
+    }
+
+    std::map<uint64_t, std::set<TrackedBuffer>> &frameIndex2BufferIds
+            = findListener->second;
+    auto findFrameIndex = frameIndex2BufferIds.find(id.frameIndex);
+    if (findFrameIndex == frameIndex2BufferIds.end()) {
+        ALOGD("InputBufferManager::_onBufferDestroyed received "
+              "invalid frame index "
+              "(listener @ %p, frameIndex = %llu, bufferIndex = %zu)",
+              id.listener.unsafe_get(),
+              static_cast<unsigned long long>(id.frameIndex),
+              id.bufferIndex);
+        return;
+    }
+
+    std::set<TrackedBuffer> &bufferIds = findFrameIndex->second;
+    auto findBufferId = bufferIds.find(id);
+    if (findBufferId == bufferIds.end()) {
+        ALOGD("InputBufferManager::_onBufferDestroyed received "
+              "invalid buffer index: "
+              "(listener @ %p, frameIndex = %llu, bufferIndex = %zu)",
+              id.listener.unsafe_get(),
+              static_cast<unsigned long long>(id.frameIndex),
+              id.bufferIndex);
+    }
+
+    bufferIds.erase(findBufferId);
+    if (bufferIds.empty()) {
+        frameIndex2BufferIds.erase(findFrameIndex);
+        if (frameIndex2BufferIds.empty()) {
+            mTrackedBuffersMap.erase(findListener);
+        }
+    }
+
+    DeathNotifications &deathNotifications = mDeathNotifications[id.listener];
+    deathNotifications.indices[id.frameIndex].emplace_back(id.bufferIndex);
+    ++deathNotifications.count;
+    mOnBufferDestroyed.notify_one();
+}
+
+// Notify the clients about buffer destructions.
+// Return false if all destructions have been notified.
+// Return true and set timeToRetry to the time point to wait for before
+// retrying if some destructions have not been notified.
+bool InputBufferManager::processNotifications(nsecs_t* timeToRetryNs) {
+
+    struct Notification {
+        sp<IComponentListener> listener;
+        hidl_vec<IComponentListener::RenderedFrame> renderedFrames;
+        Notification(const sp<IComponentListener>& l, size_t s)
+              : listener(l), renderedFrames(s) {}
+    };
+    std::list<Notification> notifications;
+
+    bool retry = false;
+    {
+        std::lock_guard<std::mutex> lock(mMutex);
+        *timeToRetryNs = kNotificationPeriodNs;
+        nsecs_t timeNowNs = systemTime();
+        for (auto it = mDeathNotifications.begin();
+                it != mDeathNotifications.end(); ) {
+            sp<IComponentListener> listener = it->first.promote();
+            if (!listener) {
+                ++it;
+                continue;
+            }
+            DeathNotifications &deathNotifications = it->second;
+
+            nsecs_t timeSinceLastNotifiedNs =
+                    timeNowNs - deathNotifications.lastSentNs;
+            // If not enough time has passed since the last callback, leave the
+            // notifications for this listener untouched for now and retry
+            // later.
+            if (timeSinceLastNotifiedNs < kNotificationPeriodNs) {
+                retry = true;
+                *timeToRetryNs = std::min(*timeToRetryNs,
+                        kNotificationPeriodNs - timeSinceLastNotifiedNs);
+                ALOGV("InputBufferManager: Notifications for "
+                      "listener @ %p will be postponed.",
+                      listener.get());
+                ++it;
+                continue;
+            }
+
+            // If enough time has passed since the last notification to this
+            // listener but there are currently no pending notifications, the
+            // listener can be removed from mDeathNotifications---there is no
+            // need to keep track of the last notification time anymore.
+            if (deathNotifications.count == 0) {
+                it = mDeathNotifications.erase(it);
+                continue;
+            }
+
+            // Create the argument for the callback.
+            notifications.emplace_back(listener, deathNotifications.count);
+            hidl_vec<IComponentListener::RenderedFrame>& renderedFrames =
+                    notifications.back().renderedFrames;
+            size_t i = 0;
+            for (std::pair<const uint64_t, std::vector<size_t>>& p :
+                    deathNotifications.indices) {
+                uint64_t frameIndex = p.first;
+                const std::vector<size_t> &bufferIndices = p.second;
+                for (const size_t& bufferIndex : bufferIndices) {
+                    IComponentListener::RenderedFrame &renderedFrame
+                            = renderedFrames[i++];
+                    renderedFrame.slotId = ~bufferIndex;
+                    renderedFrame.bufferQueueId = frameIndex;
+                    renderedFrame.timestampNs = timeNowNs;
+                    ALOGV("InputBufferManager: "
+                          "Sending death notification (listener @ %p, "
+                          "frameIndex = %llu, bufferIndex = %zu)",
+                          listener.get(),
+                          static_cast<long long unsigned>(frameIndex),
+                          bufferIndex);
+                }
+            }
+
+            // Clear deathNotifications for this listener and set retry to true
+            // so processNotifications will be called again. This will
+            // guarantee that a listener with no pending notifications will
+            // eventually be removed from mDeathNotifications after
+            // kNotificationPeriodNs nanoseconds has passed.
+            retry = true;
+            deathNotifications.indices.clear();
+            deathNotifications.count = 0;
+            deathNotifications.lastSentNs = timeNowNs;
+            ++it;
+        }
+    }
+
+    // Call onFramesRendered outside the lock to avoid deadlock.
+    for (const Notification& notification : notifications) {
+        if (!notification.listener->onFramesRendered(
+                notification.renderedFrames).isOk()) {
+            // This may trigger if the client has died.
+            ALOGD("InputBufferManager: onFramesRendered transaction failed "
+                  "(listener @ %p)",
+                  notification.listener.get());
+        }
+    }
+    if (retry) {
+        ALOGV("InputBufferManager: Pending death notifications"
+              "will be sent in %lldns.",
+              static_cast<long long>(*timeToRetryNs));
+    }
+    return retry;
+}
+
+void InputBufferManager::main() {
+    ALOGV("InputBufferManager: Starting main thread");
+    nsecs_t timeToRetryNs;
+    while (true) {
+        std::unique_lock<std::mutex> lock(mMutex);
+        while (mDeathNotifications.empty()) {
+            ALOGV("InputBufferManager: Waiting for buffer deaths");
+            mOnBufferDestroyed.wait(lock);
+        }
+        lock.unlock();
+        ALOGV("InputBufferManager: Sending buffer death notifications");
+        while (processNotifications(&timeToRetryNs)) {
+            std::this_thread::sleep_for(
+                    std::chrono::nanoseconds(timeToRetryNs));
+            ALOGV("InputBufferManager: Sending pending death notifications");
+        }
+        ALOGV("InputBufferManager: No pending death notifications");
+    }
+}
+
+InputBufferManager::InputBufferManager()
+      : mMainThread(&InputBufferManager::main, this) {
+}
+
+InputBufferManager& InputBufferManager::getInstance() {
+    static InputBufferManager instance{};
+    return instance;
+}
+
 }  // namespace utils
 }  // namespace V1_0
 }  // namespace c2
diff --git a/codec2/hidl/client/client.cpp b/codec2/hidl/client/client.cpp
index 9a82712..33e6cf2 100644
--- a/codec2/hidl/client/client.cpp
+++ b/codec2/hidl/client/client.cpp
@@ -327,8 +327,111 @@
     return status;
 }
 
-// Codec2Client
+// Codec2Client::Component::HidlListener
+struct Codec2Client::Component::HidlListener : public IComponentListener {
+    std::weak_ptr<Component> component;
+    std::weak_ptr<Listener> base;
 
+    virtual Return<void> onWorkDone(const WorkBundle& workBundle) override {
+        std::list<std::unique_ptr<C2Work>> workItems;
+        c2_status_t status = objcpy(&workItems, workBundle);
+        if (status != C2_OK) {
+            ALOGI("onWorkDone -- received corrupted WorkBundle. "
+                    "status = %d.", static_cast<int>(status));
+            return Void();
+        }
+        // release input buffers potentially held by the component from queue
+        std::shared_ptr<Codec2Client::Component> strongComponent = component.lock();
+        if (strongComponent) {
+            strongComponent->handleOnWorkDone(workItems);
+        }
+        if (std::shared_ptr<Codec2Client::Listener> listener = base.lock()) {
+            listener->onWorkDone(component, workItems);
+        } else {
+            ALOGD("onWorkDone -- listener died.");
+        }
+        return Void();
+    }
+
+    virtual Return<void> onTripped(
+            const hidl_vec<SettingResult>& settingResults) override {
+        std::vector<std::shared_ptr<C2SettingResult>> c2SettingResults(
+                settingResults.size());
+        c2_status_t status;
+        for (size_t i = 0; i < settingResults.size(); ++i) {
+            std::unique_ptr<C2SettingResult> c2SettingResult;
+            status = objcpy(&c2SettingResult, settingResults[i]);
+            if (status != C2_OK) {
+                ALOGI("onTripped -- received corrupted SettingResult. "
+                        "status = %d.", static_cast<int>(status));
+                return Void();
+            }
+            c2SettingResults[i] = std::move(c2SettingResult);
+        }
+        if (std::shared_ptr<Codec2Client::Listener> listener = base.lock()) {
+            listener->onTripped(component, c2SettingResults);
+        } else {
+            ALOGD("onTripped -- listener died.");
+        }
+        return Void();
+    }
+
+    virtual Return<void> onError(Status s, uint32_t errorCode) override {
+        ALOGD("onError -- status = %d, errorCode = %u.",
+                static_cast<int>(s),
+                static_cast<unsigned>(errorCode));
+        if (std::shared_ptr<Listener> listener = base.lock()) {
+            listener->onError(component, s == Status::OK ?
+                    errorCode : static_cast<c2_status_t>(s));
+        } else {
+            ALOGD("onError -- listener died.");
+        }
+        return Void();
+    }
+
+    virtual Return<void> onFramesRendered(
+            const hidl_vec<RenderedFrame>& renderedFrames) override {
+        std::shared_ptr<Listener> listener = base.lock();
+        std::vector<Codec2Client::Listener::RenderedFrame> rfs;
+        rfs.reserve(renderedFrames.size());
+        for (const RenderedFrame& rf : renderedFrames) {
+            if (rf.slotId >= 0) {
+                if (listener) {
+                    rfs.emplace_back(rf.bufferQueueId,
+                                     rf.slotId,
+                                     rf.timestampNs);
+                }
+            } else {
+                std::shared_ptr<Codec2Client::Component> strongComponent =
+                        component.lock();
+                if (strongComponent) {
+                    uint64_t frameIndex = rf.bufferQueueId;
+                    size_t bufferIndex = static_cast<size_t>(~rf.slotId);
+                    ALOGV("Received death notification of input buffer: "
+                          "frameIndex = %llu, bufferIndex = %zu.",
+                          static_cast<long long unsigned>(frameIndex),
+                          bufferIndex);
+                    std::shared_ptr<C2Buffer> buffer =
+                            strongComponent->freeInputBuffer(
+                                frameIndex, bufferIndex);
+                    if (buffer) {
+                        listener->onInputBufferDone(buffer);
+                    }
+                }
+            }
+        }
+        if (!rfs.empty()) {
+            if (listener) {
+                listener->onFramesRendered(rfs);
+            } else {
+                ALOGD("onFramesRendered -- listener died.");
+            }
+        }
+        return Void();
+    }
+};
+
+// Codec2Client
 Codec2Client::Base* Codec2Client::base() const {
     return static_cast<Base*>(mBase.get());
 }
@@ -350,91 +453,9 @@
 
     // TODO: Add support for Bufferpool
 
-    struct HidlListener : public IComponentListener {
-        std::weak_ptr<Component> component;
-        std::weak_ptr<Listener> base;
-
-        virtual Return<void> onWorkDone(const WorkBundle& workBundle) override {
-            std::list<std::unique_ptr<C2Work>> workItems;
-            c2_status_t status = objcpy(&workItems, workBundle);
-            if (status != C2_OK) {
-                ALOGE("onWorkDone -- received corrupted WorkBundle. "
-                        "status = %d.", static_cast<int>(status));
-                return Void();
-            }
-            // release input buffers potentially held by the component from queue
-            std::shared_ptr<Codec2Client::Component> strongComponent = component.lock();
-            if (strongComponent) {
-                strongComponent->handleOnWorkDone(workItems);
-            }
-            if (std::shared_ptr<Codec2Client::Listener> listener = base.lock()) {
-                listener->onWorkDone(component, workItems);
-            } else {
-                ALOGW("onWorkDone -- listener died.");
-            }
-            return Void();
-        }
-
-        virtual Return<void> onTripped(
-                const hidl_vec<SettingResult>& settingResults) override {
-            std::vector<std::shared_ptr<C2SettingResult>> c2SettingResults(
-                    settingResults.size());
-            c2_status_t status;
-            for (size_t i = 0; i < settingResults.size(); ++i) {
-                std::unique_ptr<C2SettingResult> c2SettingResult;
-                status = objcpy(&c2SettingResult, settingResults[i]);
-                if (status != C2_OK) {
-                    ALOGE("onTripped -- received corrupted SettingResult. "
-                            "status = %d.", static_cast<int>(status));
-                    return Void();
-                }
-                c2SettingResults[i] = std::move(c2SettingResult);
-            }
-            if (std::shared_ptr<Codec2Client::Listener> listener = base.lock()) {
-                listener->onTripped(component, c2SettingResults);
-            } else {
-                ALOGW("onTripped -- listener died.");
-            }
-            return Void();
-        }
-
-        virtual Return<void> onError(Status s, uint32_t errorCode) override {
-            ALOGE("onError -- status = %d, errorCode = %u.",
-                    static_cast<int>(s),
-                    static_cast<unsigned>(errorCode));
-            if (std::shared_ptr<Listener> listener = base.lock()) {
-                listener->onError(component, s == Status::OK ?
-                        errorCode : static_cast<c2_status_t>(s));
-            } else {
-                ALOGW("onError -- listener died.");
-            }
-            return Void();
-        }
-
-        virtual Return<void> onFramesRendered(
-                const hidl_vec<RenderedFrame>& renderedFrames) override {
-            if (std::shared_ptr<Listener> listener = base.lock()) {
-                std::vector<Codec2Client::Listener::RenderedFrame>
-                        rfs(renderedFrames.size());
-                for (size_t i = 0; i < rfs.size(); ++i) {
-                    rfs[i].bufferQueueId = static_cast<uint64_t>(
-                            renderedFrames[i].bufferQueueId);
-                    rfs[i].slotId = static_cast<int32_t>(
-                            renderedFrames[i].slotId);
-                    rfs[i].timestampNs = static_cast<int64_t>(
-                            renderedFrames[i].timestampNs);
-                }
-                listener->onFramesRendered(rfs);
-            } else {
-                ALOGW("onFramesRendered -- listener died.");
-            }
-            return Void();
-        }
-
-    };
 
     c2_status_t status;
-    sp<HidlListener> hidlListener = new HidlListener();
+    sp<Component::HidlListener> hidlListener = new Component::HidlListener();
     hidlListener->base = listener;
     Return<void> transStatus = base()->createComponent(
             name,
@@ -839,11 +860,15 @@
         for (uint64_t inputIndex : inputDone) {
             auto it = mInputBuffers.find(inputIndex);
             if (it == mInputBuffers.end()) {
-                ALOGI("unknown input index %llu in onWorkDone", (long long)inputIndex);
+                ALOGV("onWorkDone -- returned consumed/unknown "
+                      "input frame: index %llu",
+                        (long long)inputIndex);
             } else {
-                ALOGV("done with input index %llu with %zu buffers",
+                ALOGV("onWorkDone -- processed input frame: "
+                      "index %llu (containing %zu buffers)",
                         (long long)inputIndex, it->second.size());
                 mInputBuffers.erase(it);
+                mInputBufferCount.erase(inputIndex);
             }
         }
     }
@@ -860,6 +885,38 @@
     }
 }
 
+std::shared_ptr<C2Buffer> Codec2Client::Component::freeInputBuffer(
+        uint64_t frameIndex,
+        size_t bufferIndex) {
+    std::shared_ptr<C2Buffer> buffer;
+    std::lock_guard<std::mutex> lock(mInputBuffersMutex);
+    auto it = mInputBuffers.find(frameIndex);
+    if (it == mInputBuffers.end()) {
+        ALOGI("freeInputBuffer -- Unrecognized input frame index %llu.",
+              static_cast<long long unsigned>(frameIndex));
+        return nullptr;
+    }
+    if (bufferIndex >= it->second.size()) {
+        ALOGI("freeInputBuffer -- Input buffer no. %zu is invalid in "
+              "input frame index %llu.",
+              bufferIndex, static_cast<long long unsigned>(frameIndex));
+        return nullptr;
+    }
+    buffer = it->second[bufferIndex];
+    if (!buffer) {
+        ALOGI("freeInputBuffer -- Input buffer no. %zu in "
+              "input frame index %llu has already been freed.",
+              bufferIndex, static_cast<long long unsigned>(frameIndex));
+        return nullptr;
+    }
+    it->second[bufferIndex] = nullptr;
+    if (--mInputBufferCount[frameIndex] == 0) {
+        mInputBuffers.erase(it);
+        mInputBufferCount.erase(frameIndex);
+    }
+    return buffer;
+}
+
 c2_status_t Codec2Client::Component::queue(
         std::list<std::unique_ptr<C2Work>>* const items) {
     // remember input buffers queued to hold reference to them
@@ -869,15 +926,22 @@
             if (!work) {
                 continue;
             }
+            if (work->input.buffers.size() == 0) {
+                continue;
+            }
 
             uint64_t inputIndex = work->input.ordinal.frameIndex.peeku();
             auto res = mInputBuffers.emplace(inputIndex, work->input.buffers);
             if (!res.second) {
-                ALOGI("duplicate input index %llu in queue", (long long)inputIndex);
                 // TODO: append? - for now we are replacing
                 res.first->second = work->input.buffers;
+                ALOGI("queue -- duplicate input frame: index %llu. "
+                      "Discarding the old input frame...",
+                        (long long)inputIndex);
             }
-            ALOGV("qeueing input index %llu with %zu buffers",
+            mInputBufferCount[inputIndex] = work->input.buffers.size();
+            ALOGV("queue -- queueing input frame: "
+                  "index %llu (containing %zu buffers)",
                     (long long)inputIndex, work->input.buffers.size());
         }
     }
@@ -927,7 +991,14 @@
     std::vector<uint64_t> flushedIndices;
     for (const std::unique_ptr<C2Work> &work : *flushedWork) {
         if (work) {
-            flushedIndices.emplace_back(work->input.ordinal.frameIndex.peeku());
+            if (work->worklets.empty()
+                    || !work->worklets.back()
+                    || (work->worklets.back()->output.flags &
+                        C2FrameData::FLAG_INCOMPLETE) == 0) {
+                // input is complete
+                flushedIndices.emplace_back(
+                        work->input.ordinal.frameIndex.peeku());
+            }
         }
     }
 
@@ -936,11 +1007,15 @@
         std::lock_guard<std::mutex> lock(mInputBuffersMutex);
         auto it = mInputBuffers.find(flushedIndex);
         if (it == mInputBuffers.end()) {
-            ALOGI("unknown input index %llu in flush", (long long)flushedIndex);
+            ALOGV("flush -- returned consumed/unknown input frame: "
+                  "index %llu",
+                    (long long)flushedIndex);
         } else {
-            ALOGV("flushed input index %llu with %zu buffers",
-                    (long long)flushedIndex, it->second.size());
+            ALOGV("flush -- returned unprocessed input frame: "
+                  "index %llu (containing %zu buffers)",
+                    (long long)flushedIndex, mInputBufferCount[flushedIndex]);
             mInputBuffers.erase(it);
+            mInputBufferCount.erase(flushedIndex);
         }
     }
 
@@ -1003,6 +1078,7 @@
     }
     mInputBuffersMutex.lock();
     mInputBuffers.clear();
+    mInputBufferCount.clear();
     mInputBuffersMutex.unlock();
     return status;
 }
@@ -1021,6 +1097,7 @@
     }
     mInputBuffersMutex.lock();
     mInputBuffers.clear();
+    mInputBufferCount.clear();
     mInputBuffersMutex.unlock();
     return status;
 }
@@ -1039,6 +1116,7 @@
     }
     mInputBuffersMutex.lock();
     mInputBuffers.clear();
+    mInputBufferCount.clear();
     mInputBuffersMutex.unlock();
     return status;
 }
diff --git a/codec2/hidl/client/include/codec2/hidl/client.h b/codec2/hidl/client/include/codec2/hidl/client.h
index 3f38a9a..01da733 100644
--- a/codec2/hidl/client/include/codec2/hidl/client.h
+++ b/codec2/hidl/client/include/codec2/hidl/client.h
@@ -259,10 +259,29 @@
     virtual void onDeath(
             const std::weak_ptr<Component>& comp) = 0;
 
+    // This is called when an input buffer is no longer in use by the codec.
+    // Input buffers that have been returned by onWorkDone() or flush() will not
+    // trigger a call to this function.
+    virtual void onInputBufferDone(
+            const std::shared_ptr<C2Buffer>& buffer) = 0;
+
+    // This structure is used for transporting onFramesRendered() event to the
+    // client in the case where the output buffers are obtained from a
+    // bufferqueue.
     struct RenderedFrame {
+        // The id of the bufferqueue.
         uint64_t bufferQueueId;
+        // The slot of the buffer inside the bufferqueue.
         int32_t slotId;
+        // The timestamp.
         int64_t timestampNs;
+
+        RenderedFrame(uint64_t bufferQueueId, int32_t slotId,
+                      int64_t timestampNs)
+              : bufferQueueId(bufferQueueId),
+                slotId(slotId),
+                timestampNs(timestampNs) {}
+        RenderedFrame(const RenderedFrame&) = default;
     };
 
     virtual void onFramesRendered(
@@ -347,8 +366,6 @@
 
     c2_status_t disconnectFromInputSurface();
 
-    void handleOnWorkDone(const std::list<std::unique_ptr<C2Work>> &workItems);
-
     // base cannot be null.
     Component(const sp<Base>& base);
 
@@ -357,10 +374,24 @@
 protected:
     Base* base() const;
 
+    // Mutex for mInputBuffers and mInputBufferCount.
     mutable std::mutex mInputBuffersMutex;
+
+    // Map: frameIndex -> vector of bufferIndices
+    //
+    // mInputBuffers[frameIndex][bufferIndex] may be null if the buffer in that
+    // slot has been freed.
     mutable std::map<uint64_t, std::vector<std::shared_ptr<C2Buffer>>>
             mInputBuffers;
 
+    // Map: frameIndex -> number of bufferIndices that have not been freed
+    //
+    // mInputBufferCount[frameIndex] keeps track of the number of non-null
+    // elements in mInputBuffers[frameIndex]. When mInputBufferCount[frameIndex]
+    // decreases to 0, frameIndex can be removed from both mInputBuffers and
+    // mInputBufferCount.
+    mutable std::map<uint64_t, size_t> mInputBufferCount;
+
     ::hardware::google::media::c2::V1_0::utils::DefaultBufferPoolSender
             mBufferPoolSender;
 
@@ -375,6 +406,12 @@
     sp<::android::hardware::hidl_death_recipient> mDeathRecipient;
 
     friend struct Codec2Client;
+
+    struct HidlListener;
+    void handleOnWorkDone(const std::list<std::unique_ptr<C2Work>> &workItems);
+    // Remove an input buffer from mInputBuffers and return it.
+    std::shared_ptr<C2Buffer> freeInputBuffer(uint64_t frameIndex, size_t bufferIndex);
+
 };
 
 struct Codec2Client::InputSurface {
diff --git a/media/codecs/base/SimpleC2Component.cpp b/media/codecs/base/SimpleC2Component.cpp
index 0393466..117a22b 100644
--- a/media/codecs/base/SimpleC2Component.cpp
+++ b/media/codecs/base/SimpleC2Component.cpp
@@ -451,6 +451,7 @@
         listener->onWorkDone_nb(shared_from_this(), vec(work));
     } else {
         ALOGV("queue pending work");
+        work->input.buffers.clear();
         std::unique_ptr<C2Work> unexpected;
         {
             Mutexed<PendingWork>::Locked pending(mPendingWork);
diff --git a/media/sfplugin/CCodec.cpp b/media/sfplugin/CCodec.cpp
index 04df714..e2c8421 100644
--- a/media/sfplugin/CCodec.cpp
+++ b/media/sfplugin/CCodec.cpp
@@ -479,6 +479,14 @@
         (void)renderedFrames;
     }
 
+    virtual void onInputBufferDone(
+            const std::shared_ptr<C2Buffer>& buffer) override {
+        sp<CCodec> codec(mCodec.promote());
+        if (codec) {
+            codec->onInputBufferDone(buffer);
+        }
+    }
+
 private:
     wp<CCodec> mCodec;
 };
@@ -1386,6 +1394,10 @@
     (new AMessage(kWhatWorkDone, this))->post();
 }
 
+void CCodec::onInputBufferDone(const std::shared_ptr<C2Buffer>& buffer) {
+    mChannel->onInputBufferDone(buffer);
+}
+
 void CCodec::onMessageReceived(const sp<AMessage> &msg) {
     TimePoint now = std::chrono::steady_clock::now();
     CCodecWatchdog::getInstance()->watch(this);
diff --git a/media/sfplugin/CCodec.h b/media/sfplugin/CCodec.h
index 252ccef..a02963c 100644
--- a/media/sfplugin/CCodec.h
+++ b/media/sfplugin/CCodec.h
@@ -67,6 +67,7 @@
 
     void initiateReleaseIfStuck();
     void onWorkDone(std::list<std::unique_ptr<C2Work>> &workItems);
+    void onInputBufferDone(const std::shared_ptr<C2Buffer>& buffer);
 
 protected:
     virtual ~CCodec();
diff --git a/media/sfplugin/CCodecBufferChannel.cpp b/media/sfplugin/CCodecBufferChannel.cpp
index bec9397..6bae83e 100644
--- a/media/sfplugin/CCodecBufferChannel.cpp
+++ b/media/sfplugin/CCodecBufferChannel.cpp
@@ -132,6 +132,13 @@
             const sp<MediaCodecBuffer> &buffer, std::shared_ptr<C2Buffer> *c2buffer) = 0;
 
     /**
+     * Release the buffer that is no longer used by the codec process. Return
+     * true if and only if the buffer was on file and released successfully.
+     */
+    virtual bool expireComponentBuffer(
+            const std::shared_ptr<C2Buffer> &c2buffer) = 0;
+
+    /**
      * Flush internal state. After this call, no index or buffer previously
      * returned from requestNewBuffer() is valid.
      */
@@ -267,8 +274,8 @@
 
 // TODO: get this info from component
 const static size_t kMinInputBufferArraySize = 8;
-const static size_t kMaxPipelineCapacity = 16;
-const static size_t kChannelOutputDelay = 2;
+const static size_t kMaxPipelineCapacity = 18;
+const static size_t kChannelOutputDelay = 0;
 const static size_t kMinOutputBufferArraySize = kMaxPipelineCapacity +
                                                 kChannelOutputDelay;
 const static size_t kLinearBufferSize = 1048576;
@@ -476,6 +483,21 @@
         return true;
     }
 
+    bool expireComponentBuffer(const std::shared_ptr<C2Buffer> &c2buffer) {
+        for (size_t i = 0; i < mBuffers.size(); ++i) {
+            std::shared_ptr<C2Buffer> compBuffer =
+                    mBuffers[i].compBuffer.lock();
+            if (!compBuffer || compBuffer != c2buffer) {
+                continue;
+            }
+            mBuffers[i].clientBuffer = nullptr;
+            mBuffers[i].compBuffer.reset();
+            return true;
+        }
+        ALOGV("[%s] codec released an unknown buffer", mName);
+        return false;
+    }
+
     void flush() {
         ALOGV("[%s] buffers are flushed %zu", mName, mBuffers.size());
         mBuffers.clear();
@@ -599,6 +621,28 @@
         return true;
     }
 
+    bool expireComponentBuffer(const std::shared_ptr<C2Buffer> &c2buffer) {
+        for (size_t i = 0; i < mBuffers.size(); ++i) {
+            std::shared_ptr<C2Buffer> compBuffer =
+                    mBuffers[i].compBuffer.lock();
+            if (!compBuffer) {
+                continue;
+            }
+            if (c2buffer == compBuffer) {
+                if (mBuffers[i].ownedByClient) {
+                    // This should not happen.
+                    ALOGD("[%s] codec released a buffer owned by client "
+                          "(index %zu)", mName, i);
+                    mBuffers[i].ownedByClient = false;
+                }
+                mBuffers[i].compBuffer.reset();
+                return true;
+            }
+        }
+        ALOGV("[%s] codec released an unknown buffer (array mode)", mName);
+        return false;
+    }
+
     /**
      * Populate |array| with the underlying buffer array.
      *
@@ -672,6 +716,11 @@
         return mImpl.returnBuffer(buffer, c2buffer);
     }
 
+    bool expireComponentBuffer(
+            const std::shared_ptr<C2Buffer> &c2buffer) override {
+        return mImpl.expireComponentBuffer(c2buffer);
+    }
+
     void flush() override {
         mImpl.flush();
     }
@@ -709,6 +758,11 @@
         return mImpl.releaseSlot(buffer, c2buffer);
     }
 
+    bool expireComponentBuffer(
+            const std::shared_ptr<C2Buffer> &c2buffer) override {
+        return mImpl.expireComponentBuffer(c2buffer);
+    }
+
     void flush() override {
         // This is no-op by default unless we're in array mode where we need to keep
         // track of the flushed work.
@@ -839,6 +893,11 @@
         return mImpl.releaseSlot(buffer, c2buffer);
     }
 
+    bool expireComponentBuffer(
+            const std::shared_ptr<C2Buffer> &c2buffer) override {
+        return mImpl.expireComponentBuffer(c2buffer);
+    }
+
     void flush() override {
         // This is no-op by default unless we're in array mode where we need to keep
         // track of the flushed work.
@@ -897,6 +956,10 @@
         return mImpl.releaseSlot(buffer, c2buffer);
     }
 
+    bool expireComponentBuffer(
+            const std::shared_ptr<C2Buffer> &c2buffer) override {
+        return mImpl.expireComponentBuffer(c2buffer);
+    }
     void flush() override {
         // This is no-op by default unless we're in array mode where we need to keep
         // track of the flushed work.
@@ -938,6 +1001,9 @@
         return false;
     }
 
+    bool expireComponentBuffer(const std::shared_ptr<C2Buffer> &) override {
+        return false;
+    }
     void flush() override {
     }
 
@@ -2143,6 +2209,18 @@
     feedInputBufferIfAvailable();
 }
 
+void CCodecBufferChannel::onInputBufferDone(
+        const std::shared_ptr<C2Buffer>& buffer) {
+    bool newInputSlotAvailable;
+    {
+        Mutexed<std::unique_ptr<InputBuffers>>::Locked buffers(mInputBuffers);
+        newInputSlotAvailable = (*buffers)->expireComponentBuffer(buffer);
+    }
+    if (newInputSlotAvailable) {
+        feedInputBufferIfAvailable();
+    }
+}
+
 bool CCodecBufferChannel::handleWork(
         std::unique_ptr<C2Work> work,
         const sp<AMessage> &outputFormat,
diff --git a/media/sfplugin/CCodecBufferChannel.h b/media/sfplugin/CCodecBufferChannel.h
index f68e119..e9652ce 100644
--- a/media/sfplugin/CCodecBufferChannel.h
+++ b/media/sfplugin/CCodecBufferChannel.h
@@ -131,6 +131,14 @@
             std::unique_ptr<C2Work> work, const sp<AMessage> &outputFormat,
             const C2StreamInitDataInfo::output *initData);
 
+    /**
+     * Make an input buffer available for the client as it is no longer needed
+     * by the codec.
+     *
+     * @param buffer The buffer that becomes unused.
+     */
+    void onInputBufferDone(const std::shared_ptr<C2Buffer>& buffer);
+
     enum MetaMode {
         MODE_NONE,
         MODE_ANW,