RESTRICT AUTOMERGE Gate input buffers from input surface

Test: Record videos with Camera app

Bug: 123848562
Change-Id: I50e7d945a62d59d6df774051e557238f1219b32d
(cherry picked from commit 520e1d07a7870f4256434d33e3ef9a21e6796d41)
diff --git a/media/sfplugin/C2OMXNode.cpp b/media/sfplugin/C2OMXNode.cpp
index 73cbbc6..92b86b6 100644
--- a/media/sfplugin/C2OMXNode.cpp
+++ b/media/sfplugin/C2OMXNode.cpp
@@ -49,8 +49,10 @@
 
 }  // namespace
 
-C2OMXNode::C2OMXNode(const std::shared_ptr<Codec2Client::Component> &comp)
-    : mComp(comp), mFrameIndex(0), mWidth(0), mHeight(0),
+C2OMXNode::C2OMXNode(const std::shared_ptr<Codec2Client::Component> &comp,
+                     const std::shared_ptr<InputGater> &inputGater)
+    : mComp(comp), mInputGater(inputGater),
+      mFrameIndex(0), mWidth(0), mHeight(0),
       mAdjustTimestampGapUs(0), mFirstInputFrame(true) {
     // TODO: read from intf()
     if (!strncmp(comp->getName().c_str(), "c2.android.", 11)) {
@@ -212,6 +214,7 @@
         sp<Fence> fence = new Fence(fenceFd);
         fence->waitForever(LOG_TAG);
     }
+
     std::shared_ptr<Codec2Client::Component> comp = mComp.lock();
     if (!comp) {
         return NO_INIT;
@@ -287,6 +290,11 @@
     std::list<std::unique_ptr<C2Work>> items;
     items.push_back(std::move(work));
 
+    std::shared_ptr<InputGater> inputGater = mInputGater.lock();
+    if (!inputGater || !inputGater->canQueue()) {
+        return OK;
+    }
+
     c2_status_t err = comp->queue(&items);
     if (err != C2_OK) {
         return UNKNOWN_ERROR;
diff --git a/media/sfplugin/C2OMXNode.h b/media/sfplugin/C2OMXNode.h
index b5a815e..a81a993 100644
--- a/media/sfplugin/C2OMXNode.h
+++ b/media/sfplugin/C2OMXNode.h
@@ -24,6 +24,8 @@
 #include <media/OMXBuffer.h>
 #include <codec2/hidl/client.h>
 
+#include "InputSurfaceWrapper.h"
+
 namespace android {
 
 /**
@@ -33,7 +35,12 @@
  * to work in any other usage than IGraphicBufferSource.
  */
 struct C2OMXNode : public BnOMXNode {
-    explicit C2OMXNode(const std::shared_ptr<Codec2Client::Component> &comp);
+
+    using InputGater = InputSurfaceWrapper::InputGater;
+
+    explicit C2OMXNode(
+            const std::shared_ptr<Codec2Client::Component> &comp,
+            const std::shared_ptr<InputGater> &inputGater);
     ~C2OMXNode() override = default;
 
     // IOMXNode
@@ -80,6 +87,7 @@
 
 private:
     std::weak_ptr<Codec2Client::Component> mComp;
+    std::weak_ptr<InputGater> mInputGater;
     sp<IOMXBufferSource> mBufferSource;
     std::shared_ptr<C2Allocator> mAllocator;
     std::atomic_uint64_t mFrameIndex;
diff --git a/media/sfplugin/CCodec.cpp b/media/sfplugin/CCodec.cpp
index 20c3de3..e370edd 100644
--- a/media/sfplugin/CCodec.cpp
+++ b/media/sfplugin/CCodec.cpp
@@ -139,7 +139,8 @@
 
     ~C2InputSurfaceWrapper() override = default;
 
-    status_t connect(const std::shared_ptr<Codec2Client::Component> &comp) override {
+    status_t connect(const std::shared_ptr<Codec2Client::Component> &comp,
+                     const std::shared_ptr<InputGater>& /*inputGater*/) override {
         if (mConnection != nullptr) {
             return ALREADY_EXISTS;
         }
@@ -191,8 +192,9 @@
     }
     ~GraphicBufferSourceWrapper() override = default;
 
-    status_t connect(const std::shared_ptr<Codec2Client::Component> &comp) override {
-        mNode = new C2OMXNode(comp);
+    status_t connect(const std::shared_ptr<Codec2Client::Component> &comp,
+                     const std::shared_ptr<InputGater> &inputGater) override {
+        mNode = new C2OMXNode(comp, inputGater);
         mNode->setFrameSize(mWidth, mHeight);
 
         // NOTE: we do not use/pass through color aspects from GraphicBufferSource as we
@@ -373,6 +375,7 @@
     sp<C2OMXNode> mNode;
     uint32_t mWidth;
     uint32_t mHeight;
+    std::shared_ptr<C2OMXNode::InputGater> mInputGater;
     Config mConfig;
 };
 
diff --git a/media/sfplugin/CCodecBufferChannel.cpp b/media/sfplugin/CCodecBufferChannel.cpp
index 2cdea6e..d6d6d68 100644
--- a/media/sfplugin/CCodecBufferChannel.cpp
+++ b/media/sfplugin/CCodecBufferChannel.cpp
@@ -1428,6 +1428,31 @@
     return false;
 }
 
+bool CCodecBufferChannel::PipelineCapacity::allocateOutput(const char* callerTag) {
+    int prevComponent = component.fetch_sub(1, std::memory_order_relaxed);
+    int prevOutput = output.fetch_sub(1, std::memory_order_relaxed);
+    if (prevComponent > 0 && prevOutput > 1) { // One output reserved for csd.
+        ALOGV("[%s] %s -- PipelineCapacity::allocateOutput() returns true: "
+              "pipeline availability -1 output and -1 component ==> "
+              "input = %d, component = %d, output = %d",
+                mName, callerTag ? callerTag : "*",
+                input.load(std::memory_order_relaxed),
+                prevComponent - 1,
+                prevOutput - 1);
+        return true;
+    }
+    component.fetch_add(1, std::memory_order_relaxed);
+    output.fetch_add(1, std::memory_order_relaxed);
+    ALOGV("[%s] %s -- PipelineCapacity::allocateOutput() returns false: "
+          "pipeline availability unchanged ==> "
+          "input = %d, component = %d, output = %d",
+            mName, callerTag ? callerTag : "*",
+            input.load(std::memory_order_relaxed),
+            prevComponent,
+            prevOutput);
+    return false;
+}
+
 void CCodecBufferChannel::PipelineCapacity::free(const char* callerTag) {
     int prevInput = input.fetch_add(1, std::memory_order_relaxed);
     int prevComponent = component.fetch_add(1, std::memory_order_relaxed);
@@ -1484,6 +1509,23 @@
     return prevOutput + 1;
 }
 
+// InputGater
+struct CCodecBufferChannel::InputGater : public InputSurfaceWrapper::InputGater {
+    InputGater(const std::shared_ptr<CCodecBufferChannel>& owner)
+          : mOwner(owner) {}
+    virtual bool canQueue() override {
+        std::shared_ptr<CCodecBufferChannel> owner = mOwner.lock();
+        if (!owner) {
+            return false;
+        }
+        QueueGuard guard(owner->mSync);
+        return guard.isRunning() &&
+                owner->mAvailablePipelineCapacity.allocateOutput("InputGater");
+    }
+private:
+    std::weak_ptr<CCodecBufferChannel> mOwner;
+};
+
 // CCodecBufferChannel
 
 CCodecBufferChannel::CCodecBufferChannel(
@@ -1517,7 +1559,8 @@
         const std::shared_ptr<InputSurfaceWrapper> &surface) {
     ALOGV("[%s] setInputSurface", mName);
     mInputSurface = surface;
-    return mInputSurface->connect(mComponent);
+    mInputGater = std::make_shared<InputGater>(shared_from_this());
+    return mInputSurface->connect(mComponent, mInputGater);
 }
 
 status_t CCodecBufferChannel::signalEndOfInputStream() {
@@ -2318,6 +2361,7 @@
     if (mInputSurface != nullptr) {
         mInputSurface.reset();
     }
+    mInputGater.reset();
 }
 
 void CCodecBufferChannel::flush(const std::list<std::unique_ptr<C2Work>> &flushedWork) {
diff --git a/media/sfplugin/CCodecBufferChannel.h b/media/sfplugin/CCodecBufferChannel.h
index 86739ab..3ef58be 100644
--- a/media/sfplugin/CCodecBufferChannel.h
+++ b/media/sfplugin/CCodecBufferChannel.h
@@ -308,6 +308,18 @@
         // onInputBufferAvailable() can (and will) be called afterwards.
         bool allocate(const char* callerTag = nullptr);
 
+        // Return true and decrease #component and #output by one if they are
+        // all greater than zero; return false otherwise.
+        //
+        // callerTag is used for logging only.
+        //
+        // allocateOutput() is called by CCodecBufferChannel::InputGater to
+        // check whether the component can accept a queue operation. This is
+        // used when the input comes from an input surface rather than from
+        // queueInputBuffer(). Calling allocateOutput() is similar to calling
+        // allocate() when the input capacity is infinite.
+        bool allocateOutput(const char* callerTag = nullptr);
+
         // Increase #input, #component and #output by one.
         //
         // callerTag is used for logging only.
@@ -356,6 +368,9 @@
     inline bool hasCryptoOrDescrambler() {
         return mCrypto != NULL || mDescrambler != NULL;
     }
+
+    struct InputGater;
+    std::shared_ptr<InputGater> mInputGater;
 };
 
 // Conversion of a c2_status_t value to a status_t value may depend on the
diff --git a/media/sfplugin/InputSurfaceWrapper.h b/media/sfplugin/InputSurfaceWrapper.h
index d9c4eec..12e58b2 100644
--- a/media/sfplugin/InputSurfaceWrapper.h
+++ b/media/sfplugin/InputSurfaceWrapper.h
@@ -33,15 +33,33 @@
 
     virtual ~InputSurfaceWrapper() = default;
 
+    struct InputGater {
+        /**
+         * Try to reserve an input in the pipeline. If this function returns
+         * true, the pipeline slot is reserved. An implementation of
+         * InputSurfaceWrapper should not call Component::queue() if canQueue()
+         * returns false.
+         *
+         * \return true if the input can be reserved; false otherwise.
+         */
+        virtual bool canQueue() = 0;
+        virtual ~InputGater() = default;
+    };
+
     /**
      * Connect the surface with |comp|. A surface can
      * connect to at most one component at a time.
      *
+     * `inputGater->canQueue()` will be called before queuing a buffer. If it
+     * returns false, the buffer will not be queued to `comp` and simply
+     * dropped.
+     *
      * \return OK               successfully connected to |comp|
      * \return ALREADY_EXISTS   already connected to another component.
      */
     virtual status_t connect(
-            const std::shared_ptr<Codec2Client::Component> &comp) = 0;
+            const std::shared_ptr<Codec2Client::Component> &comp,
+            const std::shared_ptr<InputGater> &inputGater) = 0;
 
     /**
      * Disconnect the surface from the component if any.