CCodec: add reordering logic

Bug: 111877607
Test: atest --test-mapping frameworks/av/media/libstagefright:postsubmit
Merged-In: I54437645af17b9aee914b2783546e0f50d9dea9c
Change-Id: I54437645af17b9aee914b2783546e0f50d9dea9c
diff --git a/media/sfplugin/CCodecBufferChannel.cpp b/media/sfplugin/CCodecBufferChannel.cpp
index 6b2c2a0..defdcde 100644
--- a/media/sfplugin/CCodecBufferChannel.cpp
+++ b/media/sfplugin/CCodecBufferChannel.cpp
@@ -1050,7 +1050,7 @@
                     return clientBuffer->canCopy(buffer);
                 });
         if (err != OK) {
-            ALOGD("[%s] grabBuffer failed: %d", mName, err);
+            ALOGV("[%s] grabBuffer failed: %d", mName, err);
             return false;
         }
         c2Buffer->setFormat(mFormat);
@@ -1330,65 +1330,57 @@
 // CCodecBufferChannel::PipelineCapacity
 
 CCodecBufferChannel::PipelineCapacity::PipelineCapacity()
-      : input(0), component(0), output(0),
+      : input(0), component(0),
         mName("<UNKNOWN COMPONENT>") {
 }
 
 void CCodecBufferChannel::PipelineCapacity::initialize(
         int newInput,
         int newComponent,
-        int newOutput,
         const char* newName,
         const char* callerTag) {
     input.store(newInput, std::memory_order_relaxed);
     component.store(newComponent, std::memory_order_relaxed);
-    output.store(newOutput, std::memory_order_relaxed);
     mName = newName;
     ALOGV("[%s] %s -- PipelineCapacity::initialize(): "
           "pipeline availability initialized ==> "
-          "input = %d, component = %d, output = %d",
+          "input = %d, component = %d",
             mName, callerTag ? callerTag : "*",
-            newInput, newComponent, newOutput);
+            newInput, newComponent);
 }
 
 bool CCodecBufferChannel::PipelineCapacity::allocate(const char* callerTag) {
     int prevInput = input.fetch_sub(1, std::memory_order_relaxed);
     int prevComponent = component.fetch_sub(1, std::memory_order_relaxed);
-    int prevOutput = output.fetch_sub(1, std::memory_order_relaxed);
-    if (prevInput > 0 && prevComponent > 0 && prevOutput > 0) {
+    if (prevInput > 0 && prevComponent > 0) {
         ALOGV("[%s] %s -- PipelineCapacity::allocate() returns true: "
               "pipeline availability -1 all ==> "
-              "input = %d, component = %d, output = %d",
+              "input = %d, component = %d",
                 mName, callerTag ? callerTag : "*",
                 prevInput - 1,
-                prevComponent - 1,
-                prevOutput - 1);
+                prevComponent - 1);
         return true;
     }
     input.fetch_add(1, std::memory_order_relaxed);
     component.fetch_add(1, std::memory_order_relaxed);
-    output.fetch_add(1, std::memory_order_relaxed);
     ALOGV("[%s] %s -- PipelineCapacity::allocate() returns false: "
           "pipeline availability unchanged ==> "
-          "input = %d, component = %d, output = %d",
+          "input = %d, component = %d",
             mName, callerTag ? callerTag : "*",
             prevInput,
-            prevComponent,
-            prevOutput);
+            prevComponent);
     return false;
 }
 
 void CCodecBufferChannel::PipelineCapacity::free(const char* callerTag) {
     int prevInput = input.fetch_add(1, std::memory_order_relaxed);
     int prevComponent = component.fetch_add(1, std::memory_order_relaxed);
-    int prevOutput = output.fetch_add(1, std::memory_order_relaxed);
     ALOGV("[%s] %s -- PipelineCapacity::free(): "
           "pipeline availability +1 all ==> "
-          "input = %d, component = %d, output = %d",
+          "input = %d, component = %d",
             mName, callerTag ? callerTag : "*",
             prevInput + 1,
-            prevComponent + 1,
-            prevOutput + 1);
+            prevComponent + 1);
 }
 
 int CCodecBufferChannel::PipelineCapacity::freeInputSlots(
@@ -1398,13 +1390,12 @@
                                     std::memory_order_relaxed);
     ALOGV("[%s] %s -- PipelineCapacity::freeInputSlots(%zu): "
           "pipeline availability +%zu input ==> "
-          "input = %d, component = %d, output = %d",
+          "input = %d, component = %d",
             mName, callerTag ? callerTag : "*",
             numDiscardedInputBuffers,
             numDiscardedInputBuffers,
             prevInput + static_cast<int>(numDiscardedInputBuffers),
-            component.load(std::memory_order_relaxed),
-            output.load(std::memory_order_relaxed));
+            component.load(std::memory_order_relaxed));
     return prevInput + static_cast<int>(numDiscardedInputBuffers);
 }
 
@@ -1413,38 +1404,84 @@
     int prevComponent = component.fetch_add(1, std::memory_order_relaxed);
     ALOGV("[%s] %s -- PipelineCapacity::freeComponentSlot(): "
           "pipeline availability +1 component ==> "
-          "input = %d, component = %d, output = %d",
+          "input = %d, component = %d",
             mName, callerTag ? callerTag : "*",
             input.load(std::memory_order_relaxed),
-            prevComponent + 1,
-            output.load(std::memory_order_relaxed));
+            prevComponent + 1);
     return prevComponent + 1;
 }
 
-int CCodecBufferChannel::PipelineCapacity::freeOutputSlot(
-        const char* callerTag) {
-    int prevOutput = output.fetch_add(1, std::memory_order_relaxed);
-    ALOGV("[%s] %s -- PipelineCapacity::freeOutputSlot(): "
-          "pipeline availability +1 output ==> "
-          "input = %d, component = %d, output = %d",
-            mName, callerTag ? callerTag : "*",
-            input.load(std::memory_order_relaxed),
-            component.load(std::memory_order_relaxed),
-            prevOutput + 1);
-    return prevOutput + 1;
+// CCodecBufferChannel::ReorderStash
+
+CCodecBufferChannel::ReorderStash::ReorderStash() {
+    clear();  // start out empty, with depth 0 and the ORDINAL key (see clear())
 }
 
-int CCodecBufferChannel::PipelineCapacity::forceAllocateOutputSlot(
-        const char* callerTag) {
-    int prevOutput = output.fetch_sub(1, std::memory_order_relaxed);
-    ALOGV("[%s] %s -- PipelineCapacity::forceAllocateOutputSlot(): "
-          "pipeline availability -1 output ==> "
-          "input = %d, component = %d, output = %d",
-            mName, callerTag ? callerTag : "*",
-            input.load(std::memory_order_relaxed),
-            component.load(std::memory_order_relaxed),
-            prevOutput - 1);
-    return prevOutput - 1;
+void CCodecBufferChannel::ReorderStash::clear() {
+    mPending.clear();          // drop entries waiting to be popped
+    mStash.clear();            // drop entries still held back for reordering
+    mDepth = 0;                // with depth 0, emplace() moves entries straight to pending
+    mKey = C2Config::ORDINAL;  // less() compares frameIndex for this key
+}
+
+void CCodecBufferChannel::ReorderStash::setDepth(uint32_t depth) {
+    mPending.splice(mPending.end(), mStash);  // release every stashed entry, in stash order
+    mDepth = depth;                           // limit used by later emplace() calls
+}
+void CCodecBufferChannel::ReorderStash::setKey(C2Config::ordinal_key_t key) {
+    mPending.splice(mPending.end(), mStash);  // release every stashed entry, in stash order
+    mKey = key;                               // ordering used by later emplace() calls
+}
+
+// Copies the oldest pending entry into |entry|, removes it from the pending
+// list and returns true; returns false, leaving |entry| untouched, if none.
+bool CCodecBufferChannel::ReorderStash::pop(Entry *entry) {
+    const bool available = !mPending.empty();
+    if (available) {
+        // Memberwise copy: buffer, timestamp, flags and ordinal.
+        *entry = mPending.front();
+        mPending.pop_front();
+    }
+    return available;
+}
+
+// Insert the entry into mStash in key order, then release whatever exceeds
+// mDepth to mPending. The depth is enforced on every insertion path.
+void CCodecBufferChannel::ReorderStash::emplace(
+        const std::shared_ptr<C2Buffer> &buffer,
+        int64_t timestamp,
+        int32_t flags,
+        const C2WorkOrdinalStruct &ordinal) {
+    auto it = mStash.begin();
+    while (it != mStash.end() && !less(ordinal, it->ordinal)) {
+        ++it;
+    }
+    mStash.emplace(it, buffer, timestamp, flags, ordinal);
+    while (mStash.size() > mDepth) {
+        mPending.push_back(mStash.front());
+        mStash.pop_front();
+    }
+}
+
+void CCodecBufferChannel::ReorderStash::defer(
+        const CCodecBufferChannel::ReorderStash::Entry &entry) {
+    mPending.push_front(entry);  // back at the head, so the next pop() returns it again
+}
+
+bool CCodecBufferChannel::ReorderStash::hasPending() const {
+    return !mPending.empty();  // looks at released entries only; mStash is not consulted
+}
+
+bool CCodecBufferChannel::ReorderStash::less(
+        const C2WorkOrdinalStruct &o1, const C2WorkOrdinalStruct &o2) {
+    switch (mKey) {  // true when o1 sorts strictly before o2 under mKey
+        case C2Config::ORDINAL:   return o1.frameIndex < o2.frameIndex;
+        case C2Config::TIMESTAMP: return o1.timestamp < o2.timestamp;
+        case C2Config::CUSTOM:    return o1.customOrdinal < o2.customOrdinal;
+        default:  // unknown key: compare frame indices, same as ORDINAL
+            ALOGD("Unrecognized key; default to ordinal");
+            return o1.frameIndex < o2.frameIndex;
+    }
 }
 
 // CCodecBufferChannel
@@ -1686,6 +1723,7 @@
 
 void CCodecBufferChannel::feedInputBufferIfAvailableInternal() {
     while (!mInputMetEos &&
+           !mReorderStash.lock()->hasPending() &&
            mAvailablePipelineCapacity.allocate("feedInputBufferIfAvailable")) {
         sp<MediaCodecBuffer> inBuffer;
         size_t index;
@@ -1704,8 +1742,6 @@
 
 status_t CCodecBufferChannel::renderOutputBuffer(
         const sp<MediaCodecBuffer> &buffer, int64_t timestampNs) {
-    mAvailablePipelineCapacity.freeOutputSlot("renderOutputBuffer");
-    feedInputBufferIfAvailable();
     std::shared_ptr<C2Buffer> c2Buffer;
     {
         Mutexed<std::unique_ptr<OutputBuffers>>::Locked buffers(mOutputBuffers);
@@ -1716,6 +1752,7 @@
     if (!c2Buffer) {
         return INVALID_OPERATION;
     }
+    sendOutputBuffers();
 
 #if 0
     const std::vector<std::shared_ptr<const C2Info>> infoParams = c2Buffer->info();
@@ -1856,11 +1893,12 @@
         if (*buffers && (*buffers)->releaseBuffer(buffer, nullptr)) {
             buffers.unlock();
             released = true;
-            mAvailablePipelineCapacity.freeOutputSlot("discardBuffer");
         }
     }
-    feedInputBufferIfAvailable();
-    if (!released) {
+    if (released) {
+        feedInputBufferIfAvailable();
+        sendOutputBuffers();
+    } else {
         ALOGD("[%s] MediaCodec discarded an unknown buffer", mName);
     }
     return OK;
@@ -1892,12 +1930,31 @@
         const sp<AMessage> &inputFormat, const sp<AMessage> &outputFormat) {
     C2StreamBufferTypeSetting::input iStreamFormat(0u);
     C2StreamBufferTypeSetting::output oStreamFormat(0u);
+    C2PortReorderBufferDepthTuning::output reorderDepth;
+    C2PortReorderKeySetting::output reorderKey;
     c2_status_t err = mComponent->query(
-            { &iStreamFormat, &oStreamFormat },
+            {
+                &iStreamFormat,
+                &oStreamFormat,
+                &reorderDepth,
+                &reorderKey,
+            },
             {},
             C2_DONT_BLOCK,
             nullptr);
-    if (err != C2_OK) {
+    if (err == C2_BAD_INDEX) {
+        if (!iStreamFormat || !oStreamFormat) {
+            return UNKNOWN_ERROR;
+        }
+        Mutexed<ReorderStash>::Locked reorder(mReorderStash);
+        reorder->clear();
+        if (reorderDepth) {
+            reorder->setDepth(reorderDepth.value);
+        }
+        if (reorderKey) {
+            reorder->setKey(reorderKey.value);
+        }
+    } else if (err != C2_OK) {
         return UNKNOWN_ERROR;
     }
 
@@ -2164,7 +2221,6 @@
     mAvailablePipelineCapacity.initialize(
             kMinInputBufferArraySize,
             kMaxPipelineCapacity,
-            kMinOutputBufferArraySize,
             mName);
 #endif
 
@@ -2242,7 +2298,6 @@
     for (const sp<MediaCodecBuffer> &buffer : toBeQueued) {
         if (queueInputBufferInternal(buffer) != OK) {
             mAvailablePipelineCapacity.freeComponentSlot("requestInitialInputBuffers");
-            mAvailablePipelineCapacity.freeOutputSlot("requestInitialInputBuffers");
         }
     }
     return OK;
@@ -2326,11 +2381,14 @@
         return false;
     }
 
-    mAvailablePipelineCapacity.freeComponentSlot("handleWork");
+    if (work->worklets.size() != 1u
+            || !work->worklets.front()
+            || !(work->worklets.front()->output.flags & C2FrameData::FLAG_INCOMPLETE)) {
+        mAvailablePipelineCapacity.freeComponentSlot("handleWork");
+    }
 
     if (work->result == C2_NOT_FOUND) {
         ALOGD("[%s] flushed work; ignored.", mName);
-        mAvailablePipelineCapacity.freeOutputSlot("handleWork (flushed)");
         return true;
     }
 
@@ -2364,6 +2422,40 @@
         }
     }
 
+    while (!worklet->output.configUpdate.empty()) {
+        std::unique_ptr<C2Param> param;
+        worklet->output.configUpdate.back().swap(param);
+        worklet->output.configUpdate.pop_back();
+        switch (param->coreIndex().coreIndex()) {
+            case C2PortReorderBufferDepthTuning::CORE_INDEX: {
+                C2PortReorderBufferDepthTuning::output reorderDepth;
+                if (reorderDepth.updateFrom(*param)) {
+                    mReorderStash.lock()->setDepth(reorderDepth.value);
+                    ALOGV("[%s] onWorkDone: updated reorder depth to %u",
+                          mName, reorderDepth.value);
+                } else {
+                    ALOGD("[%s] onWorkDone: failed to read reorder depth", mName);
+                }
+                break;
+            }
+            case C2PortReorderKeySetting::CORE_INDEX: {
+                C2PortReorderKeySetting::output reorderKey;
+                if (reorderKey.updateFrom(*param)) {
+                    mReorderStash.lock()->setKey(reorderKey.value);
+                    ALOGV("[%s] onWorkDone: updated reorder key to %u",
+                          mName, reorderKey.value);
+                } else {
+                    ALOGD("[%s] onWorkDone: failed to read reorder key", mName);
+                }
+                break;
+            }
+            default:
+                ALOGV("[%s] onWorkDone: unrecognized config update (%08X)",
+                      mName, param->index());
+                break;
+        }
+    }
+
     if (outputFormat != nullptr) {
         Mutexed<std::unique_ptr<OutputBuffers>>::Locked buffers(mOutputBuffers);
         ALOGD("[%s] onWorkDone: output format changed to %s",
@@ -2388,7 +2480,6 @@
         ALOGV("[%s] onWorkDone: output EOS", mName);
     }
 
-    bool csdReported = false;
     sp<MediaCodecBuffer> outBuffer;
     size_t index;
 
@@ -2418,7 +2509,6 @@
             buffers.unlock();
             mCallback->onOutputBufferAvailable(index, outBuffer);
             buffers.lock();
-            csdReported = true;
         } else {
             ALOGD("[%s] onWorkDone: unable to register csd", mName);
             buffers.unlock();
@@ -2431,11 +2521,6 @@
     if (!buffer && !flags) {
         ALOGV("[%s] onWorkDone: Not reporting output buffer (%lld)",
               mName, work->input.ordinal.frameIndex.peekull());
-        if (!csdReported) {
-            // onOutputBufferAvailable() has not been and will not be called for
-            // this work item.
-            mAvailablePipelineCapacity.freeOutputSlot("handleWork (no csd)");
-        }
         return true;
     }
 
@@ -2455,28 +2540,49 @@
     }
 
     {
-        Mutexed<std::unique_ptr<OutputBuffers>>::Locked buffers(mOutputBuffers);
-        if (!(*buffers)->registerBuffer(buffer, &index, &outBuffer)) {
-            ALOGD("[%s] onWorkDone: unable to register output buffer", mName);
-            // TODO
-            // buffers.unlock();
-            // mCCodecCallback->onError(UNKNOWN_ERROR, ACTION_CODE_FATAL);
-            // buffers.lock();
-            return false;
+        Mutexed<ReorderStash>::Locked reorder(mReorderStash);
+        reorder->emplace(buffer, timestamp.peek(), flags, worklet->output.ordinal);
+        if (flags & MediaCodec::BUFFER_FLAG_EOS) {
+            // Flush reorder stash
+            reorder->setDepth(0);
         }
     }
-
-    outBuffer->meta()->setInt64("timeUs", timestamp.peek());
-    outBuffer->meta()->setInt32("flags", flags);
-    ALOGV("[%s] onWorkDone: out buffer index = %zu [%p] => %p + %zu",
-            mName, index, outBuffer.get(), outBuffer->data(), outBuffer->size());
-    if (csdReported) {
-        mAvailablePipelineCapacity.forceAllocateOutputSlot("handleWork");
-    }
-    mCallback->onOutputBufferAvailable(index, outBuffer);
+    sendOutputBuffers();
     return true;
 }
 
+// Drains the reorder stash's pending list: each entry is registered with the
+// output buffers and announced through onOutputBufferAvailable(). An entry
+// that cannot be registered is put back at the front and draining stops.
+void CCodecBufferChannel::sendOutputBuffers() {
+    ReorderStash::Entry pending;
+    sp<MediaCodecBuffer> clientBuffer;
+    size_t slot;
+
+    for (;;) {
+        {
+            Mutexed<ReorderStash>::Locked reorder(mReorderStash);
+            if (!reorder->pop(&pending)) {
+                return;
+            }
+        }
+        Mutexed<std::unique_ptr<OutputBuffers>>::Locked buffers(mOutputBuffers);
+        if (!(*buffers)->registerBuffer(pending.buffer, &slot, &clientBuffer)) {
+            ALOGV("[%s] sendOutputBuffers: unable to register output buffer", mName);
+            buffers.unlock();
+            mReorderStash.lock()->defer(pending);
+            return;
+        }
+        buffers.unlock();
+
+        clientBuffer->meta()->setInt64("timeUs", pending.timestamp);
+        clientBuffer->meta()->setInt32("flags", pending.flags);
+        ALOGV("[%s] sendOutputBuffers: out buffer index = %zu [%p] => %p + %zu",
+                mName, slot, clientBuffer.get(), clientBuffer->data(), clientBuffer->size());
+        mCallback->onOutputBufferAvailable(slot, clientBuffer);
+    }
+}
+
 status_t CCodecBufferChannel::setSurface(const sp<Surface> &newSurface) {
     static std::atomic_uint32_t surfaceGeneration{0};
     uint32_t generation = (getpid() << 10) |
diff --git a/media/sfplugin/CCodecBufferChannel.h b/media/sfplugin/CCodecBufferChannel.h
index 3c23c81..3ef62bc 100644
--- a/media/sfplugin/CCodecBufferChannel.h
+++ b/media/sfplugin/CCodecBufferChannel.h
@@ -217,6 +217,7 @@
     bool handleWork(
             std::unique_ptr<C2Work> work, const sp<AMessage> &outputFormat,
             const C2StreamInitDataInfo::output *initData);
+    void sendOutputBuffers();
 
     QueueSync mSync;
     sp<MemoryDealer> mDealer;
@@ -271,34 +272,25 @@
     //    CCodecBufferChannel whose outputs have not been returned from the
     //    component (by calling onWorkDone()) does not exceed a certain limit.
     //    (Let us call this the "component" capacity.)
-    // 3. The number of work items that have been received by
-    //    CCodecBufferChannel whose outputs have not been released by the app
-    //    (either by calling discardBuffer() on an output buffer or calling
-    //    renderOutputBuffer()) does not exceed a certain limit. (Let us call
-    //    this the "output" capacity.)
     //
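-    // These three criteria guarantee that a new input buffer that arrives from
+    // These two criteria guarantee that a new input buffer that arrives from
     // the invocation of onInputBufferAvailable() will not
-    // 1. overload CCodecBufferChannel's input buffers;
-    // 2. overload the component; or
+    // 1. overload CCodecBufferChannel's input buffers; or
+    // 2. overload the component.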
-    // 3. overload CCodecBufferChannel's output buffers if the component
-    //    finishes all the pending work right away.
     //
     struct PipelineCapacity {
         // The number of available input capacity.
         std::atomic_int input;
         // The number of available component capacity.
         std::atomic_int component;
-        // The number of available output capacity.
-        std::atomic_int output;
 
         PipelineCapacity();
-        // Set the values of #component and #output.
-        void initialize(int newInput, int newComponent, int newOutput,
+        // Set the values of #input and #component.
+        void initialize(int newInput, int newComponent,
                         const char* newName = "<UNKNOWN COMPONENT>",
                         const char* callerTag = nullptr);
 
-        // Return true and decrease #input, #component and #output by one if
+        // Return true and decrease #input and #component by one if
         // they are all greater than zero; return false otherwise.
         //
         // callerTag is used for logging only.
@@ -309,7 +301,7 @@
         // afterwards.
         bool allocate(const char* callerTag = nullptr);
 
-        // Increase #input, #component and #output by one.
+        // Increase #input and #component by one.
         //
         // callerTag is used for logging only.
         //
@@ -336,30 +328,52 @@
         // onWorkDone() is called.
         int freeComponentSlot(const char* callerTag = nullptr);
 
-        // Increase #output by one and return the updated value.
-        //
-        // callerTag is used for logging only.
-        //
-        // freeOutputSlot() is called by CCodecBufferChannel when
-        // discardBuffer() is called on an output buffer or when
-        // renderOutputBuffer() is called.
-        int freeOutputSlot(const char* callerTag = nullptr);
-
-        // Force-allocate an output slot.
-        //
-        // callerTag is used for logging only.
-        //
-        // forceAllocateOutputSlot() is called by CCodecBufferChannel to report
-        // a configuration change. #output may become negative when
-        // forceAllocateOutputSlot() returns.
-        int forceAllocateOutputSlot(const char* callerTag = nullptr);
-
     private:
         // Component name. Used for logging.
         const char* mName;
     };
     PipelineCapacity mAvailablePipelineCapacity;
 
+    class ReorderStash {  // holds output entries back in mStash, releases them via mPending
+    public:
+        struct Entry {  // one output buffer plus the metadata stored alongside it
+            inline Entry() : buffer(nullptr), timestamp(0), flags(0), ordinal({0, 0, 0}) {}
+            inline Entry(
+                    const std::shared_ptr<C2Buffer> &b,
+                    int64_t t,
+                    int32_t f,
+                    const C2WorkOrdinalStruct &o)
+                : buffer(b), timestamp(t), flags(f), ordinal(o) {}
+            std::shared_ptr<C2Buffer> buffer;
+            int64_t timestamp;  // written to the "timeUs" meta key by sendOutputBuffers()
+            int32_t flags;      // written to the "flags" meta key by sendOutputBuffers()
+            C2WorkOrdinalStruct ordinal;  // compared by less() to order the stash
+        };
+
+        ReorderStash();
+
+        void clear();  // empties both lists; depth becomes 0, key becomes ORDINAL
+        void setDepth(uint32_t depth);  // releases the stash, then stores the depth
+        void setKey(C2Config::ordinal_key_t key);  // releases the stash, then stores the key
+        bool pop(Entry *entry);  // takes the front pending entry; false if none
+        void emplace(  // inserts a new entry into the stash in key order
+                const std::shared_ptr<C2Buffer> &buffer,
+                int64_t timestamp,
+                int32_t flags,
+                const C2WorkOrdinalStruct &ordinal);
+        void defer(const Entry &entry);  // puts an entry back at the front of pending
+        bool hasPending() const;  // true when pop() would succeed
+
+    private:
+        std::list<Entry> mPending;  // released entries, in the order pop() hands them out
+        std::list<Entry> mStash;    // entries still held back for reordering
+        uint32_t mDepth;            // stash size above which emplace() releases entries
+        C2Config::ordinal_key_t mKey;  // selects the ordinal field less() compares
+
+        bool less(const C2WorkOrdinalStruct &o1, const C2WorkOrdinalStruct &o2);
+    };
+    Mutexed<ReorderStash> mReorderStash;
+
     std::atomic_bool mInputMetEos;
 
     inline bool hasCryptoOrDescrambler() {