Limit the number of live input buffers in CCodec

Test: m cts && cts-tradefed run cts -m CtsMediaTestCases \
-t android.media.cts.AdaptivePlaybackTest

Bug: 113292710
Change-Id: Ic31162345daa6d1052f4064854eaf4b52a3929d6
Merged-In: Ic31162345daa6d1052f4064854eaf4b52a3929d6
(cherry picked from commit 100f6fb5ab164c3d118c88c931e2a70aba85b796)
diff --git a/media/sfplugin/CCodecBufferChannel.cpp b/media/sfplugin/CCodecBufferChannel.cpp
index 5049914..90b33de 100644
--- a/media/sfplugin/CCodecBufferChannel.cpp
+++ b/media/sfplugin/CCodecBufferChannel.cpp
@@ -1330,59 +1330,124 @@
 // CCodecBufferChannel::PipelineCapacity
 
 CCodecBufferChannel::PipelineCapacity::PipelineCapacity()
-      : component(0), output(0), mName("<UNKNOWN COMPONENT>") {
+      : input(0), lentInput(0), component(0), output(0),
+        mName("<UNKNOWN COMPONENT>") {
 }
 
 void CCodecBufferChannel::PipelineCapacity::initialize(
+        int newInput,
         int newComponent,
         int newOutput,
         const char* newName,
         const char* callerTag) {
+    input.store(newInput, std::memory_order_relaxed);
+    lentInput.store(0, std::memory_order_relaxed);
     component.store(newComponent, std::memory_order_relaxed);
     output.store(newOutput, std::memory_order_relaxed);
     mName = newName;
     ALOGV("[%s] %s -- PipelineCapacity::initialize(): "
           "pipeline availability initialized ==> "
-          "component = %d, output = %d",
+          "input = %d (-%d), component = %d, output = %d",
             mName, callerTag ? callerTag : "*",
-            newComponent, newOutput);
+            newInput, 0, newComponent, newOutput);
 }
 
 bool CCodecBufferChannel::PipelineCapacity::allocate(const char* callerTag) {
+    int prevInput = input.fetch_sub(1, std::memory_order_relaxed);
     int prevComponent = component.fetch_sub(1, std::memory_order_relaxed);
     int prevOutput = output.fetch_sub(1, std::memory_order_relaxed);
-    if (prevComponent > 0 && prevOutput > 0) {
+    if (prevInput > 0 && prevComponent > 0 && prevOutput > 0) {
         ALOGV("[%s] %s -- PipelineCapacity::allocate() returns true: "
-              "pipeline availability -1 all ==> component = %d, output = %d",
+              "pipeline availability -1 all ==> "
+              "input = %d (-%d), component = %d, output = %d",
                 mName, callerTag ? callerTag : "*",
-                prevComponent - 1, prevOutput - 1);
+                prevInput - 1,
+                lentInput.load(std::memory_order_relaxed),
+                prevComponent - 1,
+                prevOutput - 1);
         return true;
     }
+    input.fetch_add(1, std::memory_order_relaxed);
     component.fetch_add(1, std::memory_order_relaxed);
     output.fetch_add(1, std::memory_order_relaxed);
     ALOGV("[%s] %s -- PipelineCapacity::allocate() returns false: "
-          "pipeline availability unchanged ==> component = %d, output = %d",
+          "pipeline availability unchanged ==> "
+          "input = %d (-%d), component = %d, output = %d",
             mName, callerTag ? callerTag : "*",
-            prevComponent, prevOutput);
+            prevInput,
+            lentInput.load(std::memory_order_relaxed),
+            prevComponent,
+            prevOutput);
     return false;
 }
 
 void CCodecBufferChannel::PipelineCapacity::free(const char* callerTag) {
+    int prevInput = input.fetch_add(1, std::memory_order_relaxed);
     int prevComponent = component.fetch_add(1, std::memory_order_relaxed);
     int prevOutput = output.fetch_add(1, std::memory_order_relaxed);
     ALOGV("[%s] %s -- PipelineCapacity::free(): "
-          "pipeline availability +1 all ==> component = %d, output = %d",
+          "pipeline availability +1 all ==> "
+          "input = %d (-%d), component = %d, output = %d",
             mName, callerTag ? callerTag : "*",
-            prevComponent + 1, prevOutput + 1);
+            prevInput + 1,
+            lentInput.load(std::memory_order_relaxed),
+            prevComponent + 1,
+            prevOutput + 1);
+}
+
+int CCodecBufferChannel::PipelineCapacity::lendInputSlot(
+        const char* callerTag) {
+    int prevInput = input.fetch_add(1, std::memory_order_relaxed);
+    int prevLentInput = lentInput.fetch_add(1, std::memory_order_relaxed);
+    ALOGV("[%s] %s -- PipelineCapacity::lendInputSlot(): "
+          "pipeline availability +1 (-1) input ==> "
+          "input = %d (-%d), component = %d, output = %d",
+            mName, callerTag ? callerTag : "*",
+            prevInput + 1,
+            prevLentInput + 1,
+            component.load(std::memory_order_relaxed),
+            output.load(std::memory_order_relaxed));
+    return prevInput + 1;
+}
+
+int CCodecBufferChannel::PipelineCapacity::freeInputSlot(
+        const char* callerTag) {
+    int prevLentInput = lentInput.fetch_sub(1, std::memory_order_relaxed);
+    if (prevLentInput > 0) {
+        ALOGV("[%s] %s -- PipelineCapacity::freeInputSlot(): "
+              "pipeline availability +0 (+1) input ==> "
+              "input = %d (-%d), component = %d, output = %d",
+                mName, callerTag ? callerTag : "*",
+                input.load(std::memory_order_relaxed),
+                prevLentInput - 1,
+                component.load(std::memory_order_relaxed),
+                output.load(std::memory_order_relaxed));
+        return input.load(std::memory_order_relaxed);
+    }
+    lentInput.fetch_add(1, std::memory_order_relaxed);
+    int prevInput = input.fetch_add(1, std::memory_order_relaxed);
+    ALOGV("[%s] %s -- PipelineCapacity::freeInputSlot(): "
+          "pipeline availability +1 (+0) input ==> "
+          "input = %d (-%d), component = %d, output = %d",
+            mName, callerTag ? callerTag : "*",
+            prevInput + 1,
+            prevLentInput,
+            component.load(std::memory_order_relaxed),
+            output.load(std::memory_order_relaxed));
+    return prevInput + 1;
 }
 
 int CCodecBufferChannel::PipelineCapacity::freeComponentSlot(
         const char* callerTag) {
     int prevComponent = component.fetch_add(1, std::memory_order_relaxed);
     ALOGV("[%s] %s -- PipelineCapacity::freeComponentSlot(): "
-          "pipeline availability +1 component ==> component = %d, output = %d",
+          "pipeline availability +1 component ==> "
+          "input = %d (-%d), component = %d, output = %d",
             mName, callerTag ? callerTag : "*",
-            prevComponent + 1, output.load(std::memory_order_relaxed));
+            input.load(std::memory_order_relaxed),
+            lentInput.load(std::memory_order_relaxed),
+            prevComponent + 1,
+            output.load(std::memory_order_relaxed));
     return prevComponent + 1;
 }
 
@@ -1390,9 +1455,13 @@
         const char* callerTag) {
     int prevOutput = output.fetch_add(1, std::memory_order_relaxed);
     ALOGV("[%s] %s -- PipelineCapacity::freeOutputSlot(): "
-          "pipeline availability +1 output ==> component = %d, output = %d",
+          "pipeline availability +1 output ==> "
+          "input = %d (-%d), component = %d, output = %d",
             mName, callerTag ? callerTag : "*",
-            component.load(std::memory_order_relaxed), prevOutput + 1);
+            input.load(std::memory_order_relaxed),
+            lentInput.load(std::memory_order_relaxed),
+            component.load(std::memory_order_relaxed),
+            prevOutput + 1);
     return prevOutput + 1;
 }
 
@@ -1855,11 +1924,13 @@
             C2_DONT_BLOCK,
             nullptr);
     mAvailablePipelineCapacity.initialize(
+            inputDelay,
             inputDelay + pipelineDelay,
             inputDelay + pipelineDelay + outputDelay,
             mName);
 #else
     mAvailablePipelineCapacity.initialize(
+            kMinInputBufferArraySize,
             kMaxPipelineCapacity,
             kMinOutputBufferArraySize,
             mName);
@@ -2213,6 +2284,7 @@
 void CCodecBufferChannel::onWorkDone(
         std::unique_ptr<C2Work> work, const sp<AMessage> &outputFormat,
         const C2StreamInitDataInfo::output *initData) {
+    mAvailablePipelineCapacity.freeInputSlot("onWorkDone");
     mAvailablePipelineCapacity.freeComponentSlot("onWorkDone");
     if (handleWork(std::move(work), outputFormat, initData)) {
         mAvailablePipelineCapacity.freeOutputSlot("onWorkDone");
@@ -2228,6 +2300,7 @@
         newInputSlotAvailable = (*buffers)->expireComponentBuffer(buffer);
     }
     if (newInputSlotAvailable) {
+        mAvailablePipelineCapacity.lendInputSlot("onInputBufferDone");
         feedInputBufferIfAvailable();
     }
 }
diff --git a/media/sfplugin/CCodecBufferChannel.h b/media/sfplugin/CCodecBufferChannel.h
index b4877a8..f9e086f 100644
--- a/media/sfplugin/CCodecBufferChannel.h
+++ b/media/sfplugin/CCodecBufferChannel.h
@@ -257,38 +257,50 @@
 
     // PipelineCapacity is used in the input buffer gating logic.
     //
-    // There are two criteria that need to be met before
+    // There are three criteria that need to be met before
     // onInputBufferAvailable() is called:
-    // 1. The number of work items that have been received by
+    // 1. The number of input buffers that have been received by
+    //    CCodecBufferChannel but not returned via onWorkDone() or
+    //    onInputBufferDone() does not exceed a certain limit. (Let us call this
+    //    number the "input" capacity.)
+    // 2. The number of work items that have been received by
     //    CCodecBufferChannel whose outputs have not been returned from the
     //    component (by calling onWorkDone()) does not exceed a certain limit.
-    //    Let us call this the "component" capacity.
-    // 2. The number of work items that have been received by
+    //    (Let us call this the "component" capacity.)
+    // 3. The number of work items that have been received by
     //    CCodecBufferChannel whose outputs have not been released by the app
     //    (either by calling discardBuffer() on an output buffer or calling
-    //    renderOutputBuffer()) does not exceed a certain limit. Let us call
-    //    this the "output" capacity.
+    //    renderOutputBuffer()) does not exceed a certain limit. (Let us call
+    //    this the "output" capacity.)
     //
-    // These two criteria guarantee that the new input buffer that arrives from
+    // These three criteria guarantee that a new input buffer that arrives from
     // the invocation of onInputBufferAvailable() will not
-    // 1. overload the component; or
-    // 2. overload CCodecBufferChannel's output buffers if the component
+    // 1. overload CCodecBufferChannel's input buffers;
+    // 2. overload the component; or
+    // 3. overload CCodecBufferChannel's output buffers if the component
     //    finishes all the pending work right away.
     //
     struct PipelineCapacity {
+        // The number of available input capacity.
+        std::atomic_int input;
+        // The number of input buffers that have been released by
+        // onInputBufferDone() but not onWorkDone(). Once onWorkDone() is
+        // called, this number will decrease unless it is already zero; in
+        // which case #input will increase instead.
+        std::atomic_int lentInput;
         // The number of available component capacity.
         std::atomic_int component;
         // The number of available output capacity.
         std::atomic_int output;
 
         PipelineCapacity();
-        // Set the values of component and output.
-        void initialize(int newComponent, int newOutput,
+        // Set the values of #component and #output.
+        void initialize(int newInput, int newComponent, int newOutput,
                         const char* newName = "<UNKNOWN COMPONENT>",
                         const char* callerTag = nullptr);
 
-        // Return true and decrease component and output by one if they are both
-        // greater than zero; return false otherwise.
+        // Return true and decrease #input, #component and #output by one if
+        // they are all greater than zero; return false otherwise.
         //
         // callerTag is used for logging only.
         //
@@ -297,15 +309,37 @@
         // onInputBufferAvailable() can (and will) be called afterwards.
         bool allocate(const char* callerTag = nullptr);
 
-        // Increase component and output by one.
+        // Increase #input, #component and #output by one.
         //
         // callerTag is used for logging only.
         //
         // free() is called by CCodecBufferChannel after allocate() returns true
-        // but onInputBufferAvailable() cannot be called for any reasons.
+        // but onInputBufferAvailable() cannot be called for any reasons. It
+        // essentially undoes an allocate() call.
         void free(const char* callerTag = nullptr);
 
-        // Increase component by one and return the updated value.
+        // Increase #input and #lentInput by 1.
+        //
+        // callerTag is used for logging only.
+        //
+        // lendInputSlot() is called by CCodecBufferChannel when
+        // onInputBufferDone() is called. This means an input buffer has been
+        // freed, but a subsequent call to onWorkDone() will not free an input
+        // buffer.
+        int lendInputSlot(const char* callerTag = nullptr);
+
+        // Increase #input by one if #lentInput is 0; otherwise, decrease
+        // #lentInput by 1.
+        //
+        // callerTag is used for logging only.
+        //
+        // freeInputSlot() is called by CCodecBufferChannel when onWorkDone() is
+        // called. If #lentInput is not zero, that means the input buffer for
+        // the returned work has already been freed, so #input will not
+        // increase.
+        int freeInputSlot(const char* callerTag = nullptr);
+
+        // Increase #component by one and return the updated value.
         //
         // callerTag is used for logging only.
         //
@@ -313,7 +347,7 @@
         // called.
         int freeComponentSlot(const char* callerTag = nullptr);
 
-        // Increase output by one and return the updated value.
+        // Increase #output by one and return the updated value.
         //
         // callerTag is used for logging only.
         //