Modify input gating logic

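Before this change, onInputBufferAvailable() was called during
initialization, and afterwards once for each call to renderOutputBuffer(),
each successful discardBuffer() on an output buffer, and each onWorkDone()
for which handleWork() returned true.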

After this change, CCodecBufferChannel has two conditions, either of which
blocks onInputBufferAvailable() from being called: 1) the number of
received input buffers exceeds the number of work items returned from
the component by a fixed limit (currently kMaxPipelineCapacity = 16); or
2) the number of received input buffers exceeds the number of output
buffers rendered or discarded by a fixed limit (currently
kMinOutputBufferArraySize = kMaxPipelineCapacity + kChannelOutputDelay = 18).

Test: adb shell setprop debug.stagefright.ccodec 2 && \
adb shell killall mediacodec && \
adb shell killall mediaserver
* Then play videos.

Bug: 111415084
Change-Id: I7b075e26c8a5d115bd63985170260d1a52880463
diff --git a/media/sfplugin/CCodecBufferChannel.cpp b/media/sfplugin/CCodecBufferChannel.cpp
index c6720f2..bec9397 100644
--- a/media/sfplugin/CCodecBufferChannel.cpp
+++ b/media/sfplugin/CCodecBufferChannel.cpp
@@ -142,7 +142,7 @@
      * shall retain the internal state so that it will honor index and
      * buffer from previous calls of requestNewBuffer().
      */
-    virtual std::unique_ptr<InputBuffers> toArrayMode() = 0;
+    virtual std::unique_ptr<InputBuffers> toArrayMode(size_t size) = 0;
 
 protected:
     // Pool to obtain blocks for input buffers.
@@ -196,7 +196,7 @@
      * shall retain the internal state so that it will honor index and
      * buffer from previous calls of registerBuffer().
      */
-    virtual std::unique_ptr<OutputBuffers> toArrayMode() = 0;
+    virtual std::unique_ptr<OutputBuffers> toArrayMode(size_t size) = 0;
 
     /**
      * Initialize SkipCutBuffer object.
@@ -267,7 +267,10 @@
 
 // TODO: get this info from component
 const static size_t kMinInputBufferArraySize = 8;
-const static size_t kMinOutputBufferArraySize = 16;
+const static size_t kMaxPipelineCapacity = 16;
+const static size_t kChannelOutputDelay = 2;
+const static size_t kMinOutputBufferArraySize = kMaxPipelineCapacity +
+                                                kChannelOutputDelay;
 const static size_t kLinearBufferSize = 1048576;
 // This can fit 4K RGBA frame, and most likely client won't need more than this.
 const static size_t kMaxLinearBufferSize = 3840 * 2160 * 4;
@@ -644,7 +647,8 @@
 
     bool isArrayMode() const final { return true; }
 
-    std::unique_ptr<CCodecBufferChannel::InputBuffers> toArrayMode() final {
+    std::unique_ptr<CCodecBufferChannel::InputBuffers> toArrayMode(
+            size_t) final {
         return nullptr;
     }
 
@@ -711,7 +715,8 @@
         mImpl.flush();
     }
 
-    std::unique_ptr<CCodecBufferChannel::InputBuffers> toArrayMode() final {
+    std::unique_ptr<CCodecBufferChannel::InputBuffers> toArrayMode(
+            size_t size) final {
         int32_t capacity = kLinearBufferSize;
         (void)mFormat->findInt32(C2_NAME_STREAM_MAX_BUFFER_SIZE_SETTING, &capacity);
 
@@ -721,7 +726,7 @@
         array->setFormat(mFormat);
         array->initialize(
                 mImpl,
-                kMinInputBufferArraySize,
+                size,
                 [this, capacity] () -> sp<Codec2Buffer> { return alloc(capacity); });
         return std::move(array);
     }
@@ -839,7 +844,8 @@
         // track of the flushed work.
     }
 
-    std::unique_ptr<CCodecBufferChannel::InputBuffers> toArrayMode() final {
+    std::unique_ptr<CCodecBufferChannel::InputBuffers> toArrayMode(
+            size_t size) final {
         std::shared_ptr<C2Allocator> alloc;
         c2_status_t err = mStore->fetchAllocator(mPool->getAllocatorId(), &alloc);
         if (err != C2_OK) {
@@ -851,7 +857,7 @@
         array->setFormat(mFormat);
         array->initialize(
                 mImpl,
-                kMinInputBufferArraySize,
+                size,
                 [format = mFormat, alloc]() -> sp<Codec2Buffer> {
                     return new GraphicMetadataBuffer(format, alloc);
                 });
@@ -896,14 +902,15 @@
         // track of the flushed work.
     }
 
-    std::unique_ptr<CCodecBufferChannel::InputBuffers> toArrayMode() final {
+    std::unique_ptr<CCodecBufferChannel::InputBuffers> toArrayMode(
+            size_t size) final {
         std::unique_ptr<InputBuffersArray> array(
                 new InputBuffersArray(mComponentName.c_str(), "2D-BB-Input[N]"));
         array->setPool(mPool);
         array->setFormat(mFormat);
         array->initialize(
                 mImpl,
-                kMinInputBufferArraySize,
+                size,
                 [pool = mPool, format = mFormat, lbp = mLocalBufferPool]() -> sp<Codec2Buffer> {
                     C2MemoryUsage usage = { C2MemoryUsage::CPU_READ, C2MemoryUsage::CPU_WRITE };
                     return AllocateGraphicBuffer(
@@ -934,7 +941,8 @@
     void flush() override {
     }
 
-    std::unique_ptr<CCodecBufferChannel::InputBuffers> toArrayMode() final {
+    std::unique_ptr<CCodecBufferChannel::InputBuffers> toArrayMode(
+            size_t) final {
         return nullptr;
     }
 
@@ -960,7 +968,8 @@
 
     bool isArrayMode() const final { return true; }
 
-    std::unique_ptr<CCodecBufferChannel::OutputBuffers> toArrayMode() final {
+    std::unique_ptr<CCodecBufferChannel::OutputBuffers> toArrayMode(
+            size_t) final {
         return nullptr;
     }
 
@@ -1074,13 +1083,14 @@
         // track of the flushed work.
     }
 
-    std::unique_ptr<CCodecBufferChannel::OutputBuffers> toArrayMode() override {
+    std::unique_ptr<CCodecBufferChannel::OutputBuffers> toArrayMode(
+            size_t size) override {
         std::unique_ptr<OutputBuffersArray> array(new OutputBuffersArray(mComponentName.c_str()));
         array->setFormat(mFormat);
         array->transferSkipCutBuffer(mSkipCutBuffer);
         array->initialize(
                 mImpl,
-                kMinOutputBufferArraySize,
+                size,
                 [this]() { return allocateArrayBuffer(); });
         return std::move(array);
     }
@@ -1245,6 +1255,77 @@
     }
 }
 
+// CCodecBufferChannel::PipelineCapacity
+
+CCodecBufferChannel::PipelineCapacity::PipelineCapacity()
+      : component(0), output(0), mName("<UNKNOWN COMPONENT>") {
+}
+
+void CCodecBufferChannel::PipelineCapacity::initialize(
+        int newComponent,
+        int newOutput,
+        const char* newName,
+        const char* callerTag) {
+    component.store(newComponent, std::memory_order_relaxed);
+    output.store(newOutput, std::memory_order_relaxed);
+    mName = newName;
+    ALOGV("[%s] %s -- PipelineCapacity::initialize(): "
+          "pipeline availability initialized ==> "
+          "component = %d, output = %d",
+            mName, callerTag ? callerTag : "*",
+            newComponent, newOutput);
+}
+
+bool CCodecBufferChannel::PipelineCapacity::allocate(const char* callerTag) {
+    int prevComponent = component.fetch_sub(1, std::memory_order_relaxed);
+    int prevOutput = output.fetch_sub(1, std::memory_order_relaxed);
+    if (prevComponent > 0 && prevOutput > 0) {
+        ALOGV("[%s] %s -- PipelineCapacity::allocate() returns true: "
+              "pipeline availability -1 all ==> component = %d, output = %d",
+                mName, callerTag ? callerTag : "*",
+                prevComponent - 1, prevOutput - 1);
+        return true;
+    }
+    component.fetch_add(1, std::memory_order_relaxed);
+    output.fetch_add(1, std::memory_order_relaxed);
+    ALOGV("[%s] %s -- PipelineCapacity::allocate() returns false: "
+          "pipeline availability unchanged ==> component = %d, output = %d",
+            mName, callerTag ? callerTag : "*",
+            prevComponent, prevOutput);
+    return false;
+}
+
+void CCodecBufferChannel::PipelineCapacity::free(const char* callerTag) {
+    int prevComponent = component.fetch_add(1, std::memory_order_relaxed);
+    int prevOutput = output.fetch_add(1, std::memory_order_relaxed);
+    ALOGV("[%s] %s -- PipelineCapacity::free(): "
+          "pipeline availability +1 all ==> component = %d, output = %d",
+            mName, callerTag ? callerTag : "*",
+            prevComponent + 1, prevOutput + 1);
+}
+
+int CCodecBufferChannel::PipelineCapacity::freeComponentSlot(
+        const char* callerTag) {
+    int prevComponent = component.fetch_add(1, std::memory_order_relaxed);
+    ALOGV("[%s] %s -- PipelineCapacity::freeComponentSlot(): "
+          "pipeline availability +1 component ==> component = %d, output = %d",
+            mName, callerTag ? callerTag : "*",
+            prevComponent + 1, output.load(std::memory_order_relaxed));
+    return prevComponent + 1;
+}
+
+int CCodecBufferChannel::PipelineCapacity::freeOutputSlot(
+        const char* callerTag) {
+    int prevOutput = output.fetch_add(1, std::memory_order_relaxed);
+    ALOGV("[%s] %s -- PipelineCapacity::freeOutputSlot(): "
+          "pipeline availability +1 output ==> component = %d, output = %d",
+            mName, callerTag ? callerTag : "*",
+            component.load(std::memory_order_relaxed), prevOutput + 1);
+    return prevOutput + 1;
+}
+
+// CCodecBufferChannel
+
 CCodecBufferChannel::CCodecBufferChannel(
         const std::shared_ptr<CCodecCallback> &callback)
     : mHeapSeqNum(-1),
@@ -1252,7 +1333,7 @@
       mFrameIndex(0u),
       mFirstValidFrameIndex(0u),
       mMetaMode(MODE_NONE),
-      mPendingFeed(0),
+      mAvailablePipelineCapacity(),
       mInputMetEos(false) {
 }
 
@@ -1471,27 +1552,26 @@
 }
 
 void CCodecBufferChannel::feedInputBufferIfAvailableInternal() {
-    while (!mInputMetEos && mPendingFeed > 0) {
+    while (!mInputMetEos &&
+           mAvailablePipelineCapacity.allocate("feedInputBufferIfAvailable")) {
         sp<MediaCodecBuffer> inBuffer;
         size_t index;
         {
             Mutexed<std::unique_ptr<InputBuffers>>::Locked buffers(mInputBuffers);
             if (!(*buffers)->requestNewBuffer(&index, &inBuffer)) {
                 ALOGV("[%s] no new buffer available", mName);
+                mAvailablePipelineCapacity.free("feedInputBufferIfAvailable");
                 break;
             }
         }
         ALOGV("[%s] new input index = %zu [%p]", mName, index, inBuffer.get());
         mCallback->onInputBufferAvailable(index, inBuffer);
-        ALOGV("[%s] %s: pending feed -1 from %u", mName, __func__, mPendingFeed.load());
-        --mPendingFeed;
     }
 }
 
 status_t CCodecBufferChannel::renderOutputBuffer(
         const sp<MediaCodecBuffer> &buffer, int64_t timestampNs) {
-    ALOGV("[%s] %s: pending feed +1 from %u", mName, __func__, mPendingFeed.load());
-    ++mPendingFeed;
+    mAvailablePipelineCapacity.freeOutputSlot("renderOutputBuffer");
     feedInputBufferIfAvailable();
     std::shared_ptr<C2Buffer> c2Buffer;
     {
@@ -1639,9 +1719,9 @@
     {
         Mutexed<std::unique_ptr<OutputBuffers>>::Locked buffers(mOutputBuffers);
         if (*buffers && (*buffers)->releaseBuffer(buffer, nullptr)) {
+            buffers.unlock();
             released = true;
-            ALOGV("[%s] %s: pending feed +1 from %u", mName, __func__, mPendingFeed.load());
-            ++mPendingFeed;
+            mAvailablePipelineCapacity.freeOutputSlot("discardBuffer");
         }
     }
     feedInputBufferIfAvailable();
@@ -1656,7 +1736,7 @@
     Mutexed<std::unique_ptr<InputBuffers>>::Locked buffers(mInputBuffers);
 
     if (!(*buffers)->isArrayMode()) {
-        *buffers = (*buffers)->toArrayMode();
+        *buffers = (*buffers)->toArrayMode(kMinInputBufferArraySize);
     }
 
     (*buffers)->getArray(array);
@@ -1667,7 +1747,7 @@
     Mutexed<std::unique_ptr<OutputBuffers>>::Locked buffers(mOutputBuffers);
 
     if (!(*buffers)->isArrayMode()) {
-        *buffers = (*buffers)->toArrayMode();
+        *buffers = (*buffers)->toArrayMode(kMinOutputBufferArraySize);
     }
 
     (*buffers)->getArray(array);
@@ -1675,8 +1755,8 @@
 
 status_t CCodecBufferChannel::start(
         const sp<AMessage> &inputFormat, const sp<AMessage> &outputFormat) {
-    C2StreamFormatConfig::input iStreamFormat(0u);
-    C2StreamFormatConfig::output oStreamFormat(0u);
+    C2StreamBufferTypeSetting::input iStreamFormat(0u);
+    C2StreamBufferTypeSetting::output oStreamFormat(0u);
     c2_status_t err = mComponent->query(
             { &iStreamFormat, &oStreamFormat },
             {},
@@ -1686,6 +1766,27 @@
         return UNKNOWN_ERROR;
     }
 
+    // Query delays
+    C2PortRequestedDelayTuning::input inputDelay;
+    C2PortRequestedDelayTuning::output outputDelay;
+    C2RequestedPipelineDelayTuning pipelineDelay;
+#if 0
+    err = mComponent->query(
+            { &inputDelay, &pipelineDelay, &outputDelay },
+            {},
+            C2_DONT_BLOCK,
+            nullptr);
+    mAvailablePipelineCapacity.initialize(
+            inputDelay + pipelineDelay,
+            inputDelay + pipelineDelay + outputDelay,
+            mName);
+#else
+    mAvailablePipelineCapacity.initialize(
+            kMaxPipelineCapacity,
+            kMinOutputBufferArraySize,
+            mName);
+#endif
+
     // TODO: get this from input format
     bool secure = mComponent->getName().find(".secure") != std::string::npos;
 
@@ -1912,14 +2013,13 @@
                 }
                 if (delay || padding) {
                     // We need write access to the buffers..
-                    (*buffers) = (*buffers)->toArrayMode();
+                    (*buffers) = (*buffers)->toArrayMode(kMinOutputBufferArraySize);
                     (*buffers)->initSkipCutBuffer(delay, padding, sampleRate, channelCount);
                 }
             }
         }
     }
 
-    mPendingFeed = 0;
     mInputMetEos = false;
     mSync.start();
     return OK;
@@ -1977,7 +2077,14 @@
                 }
             }
             if (post) {
-                mCallback->onInputBufferAvailable(index, buffer);
+                if (mAvailablePipelineCapacity.allocate(
+                        "requestInitialInputBuffers")) {
+                    mCallback->onInputBufferAvailable(index, buffer);
+                } else {
+                    ALOGD("[%s] pipeline is full while "
+                          "requesting %zu-th input buffer",
+                            mName, i);
+                }
             }
         }
     }
@@ -2029,9 +2136,9 @@
 void CCodecBufferChannel::onWorkDone(
         std::unique_ptr<C2Work> work, const sp<AMessage> &outputFormat,
         const C2StreamInitDataInfo::output *initData) {
+    mAvailablePipelineCapacity.freeComponentSlot("onWorkDone");
     if (handleWork(std::move(work), outputFormat, initData)) {
-        ALOGV("[%s] onWorkDone: pending feed +1 from %u", mName, mPendingFeed.load());
-        ++mPendingFeed;
+        mAvailablePipelineCapacity.freeOutputSlot("onWorkDone");
     }
     feedInputBufferIfAvailable();
 }
diff --git a/media/sfplugin/CCodecBufferChannel.h b/media/sfplugin/CCodecBufferChannel.h
index c043686..f68e119 100644
--- a/media/sfplugin/CCodecBufferChannel.h
+++ b/media/sfplugin/CCodecBufferChannel.h
@@ -241,7 +241,79 @@
     std::shared_ptr<InputSurfaceWrapper> mInputSurface;
 
     MetaMode mMetaMode;
-    std::atomic_int32_t mPendingFeed;
+
+    // PipelineCapacity is used in the input buffer gating logic.
+    //
+    // There are two criteria that need to be met before
+    // onInputBufferAvailable() is called:
+    // 1. The number of work items that have been received by
+    //    CCodecBufferChannel whose outputs have not been returned from the
+    //    component (by calling onWorkDone()) does not exceed a certain limit.
+    //    Let us call this the "component" capacity.
+    // 2. The number of work items that have been received by
+    //    CCodecBufferChannel whose outputs have not been released by the app
+    //    (either by calling discardBuffer() on an output buffer or calling
+    //    renderOutputBuffer()) does not exceed a certain limit. Let us call
+    //    this the "output" capacity.
+    //
+    // These two criteria guarantee that the new input buffer that arrives from
+    // the invocation of onInputBufferAvailable() will not
+    // 1. overload the component; or
+    // 2. overload CCodecBufferChannel's output buffers if the component
+    //    finishes all the pending work right away.
+    //
+    struct PipelineCapacity {
+        // The remaining component capacity (number of free component slots).
+        std::atomic_int component;
+        // The remaining output capacity (number of free output slots).
+        std::atomic_int output;
+
+        PipelineCapacity();
+        // Set the values of component and output.
+        void initialize(int newComponent, int newOutput,
+                        const char* newName = "<UNKNOWN COMPONENT>",
+                        const char* callerTag = nullptr);
+
+        // Return true and decrease component and output by one if they are both
+        // greater than zero; return false otherwise.
+        //
+        // callerTag is used for logging only.
+        //
+        // allocate() is called by CCodecBufferChannel to check whether it can
+        // receive another input buffer. If the return value is true,
+        // onInputBufferAvailable() can (and will) be called afterwards.
+        bool allocate(const char* callerTag = nullptr);
+
+        // Increase component and output by one.
+        //
+        // callerTag is used for logging only.
+        //
+        // free() is called by CCodecBufferChannel after allocate() returns true
+        // but onInputBufferAvailable() cannot be called for any reason.
+        void free(const char* callerTag = nullptr);
+
+        // Increase component by one and return the updated value.
+        //
+        // callerTag is used for logging only.
+        //
+        // freeComponentSlot() is called by CCodecBufferChannel when onWorkDone() is
+        // called.
+        int freeComponentSlot(const char* callerTag = nullptr);
+
+        // Increase output by one and return the updated value.
+        //
+        // callerTag is used for logging only.
+        //
+        // freeOutputSlot() is called by CCodecBufferChannel from renderOutputBuffer(),
+        // discardBuffer() of an output buffer, or onWorkDone() when handleWork() returns true.
+        int freeOutputSlot(const char* callerTag = nullptr);
+
+    private:
+        // Component name. Used for logging.
+        const char* mName;
+    };
+    PipelineCapacity mAvailablePipelineCapacity;
+
     std::atomic_bool mInputMetEos;
 
     inline bool hasCryptoOrDescrambler() {