CCodec: add reordering logic
Bug: 111877607
Test: atest --test-mapping frameworks/av/media/libstagefright:postsubmit
Merged-In: I54437645af17b9aee914b2783546e0f50d9dea9c
Change-Id: I54437645af17b9aee914b2783546e0f50d9dea9c
diff --git a/media/sfplugin/CCodecBufferChannel.cpp b/media/sfplugin/CCodecBufferChannel.cpp
index 6b2c2a0..defdcde 100644
--- a/media/sfplugin/CCodecBufferChannel.cpp
+++ b/media/sfplugin/CCodecBufferChannel.cpp
@@ -1050,7 +1050,7 @@
return clientBuffer->canCopy(buffer);
});
if (err != OK) {
- ALOGD("[%s] grabBuffer failed: %d", mName, err);
+ ALOGV("[%s] grabBuffer failed: %d", mName, err);
return false;
}
c2Buffer->setFormat(mFormat);
@@ -1330,65 +1330,57 @@
// CCodecBufferChannel::PipelineCapacity
CCodecBufferChannel::PipelineCapacity::PipelineCapacity()
- : input(0), component(0), output(0),
+ : input(0), component(0),
mName("<UNKNOWN COMPONENT>") {
}
void CCodecBufferChannel::PipelineCapacity::initialize(
int newInput,
int newComponent,
- int newOutput,
const char* newName,
const char* callerTag) {
input.store(newInput, std::memory_order_relaxed);
component.store(newComponent, std::memory_order_relaxed);
- output.store(newOutput, std::memory_order_relaxed);
mName = newName;
ALOGV("[%s] %s -- PipelineCapacity::initialize(): "
"pipeline availability initialized ==> "
- "input = %d, component = %d, output = %d",
+ "input = %d, component = %d",
mName, callerTag ? callerTag : "*",
- newInput, newComponent, newOutput);
+ newInput, newComponent);
}
bool CCodecBufferChannel::PipelineCapacity::allocate(const char* callerTag) {
int prevInput = input.fetch_sub(1, std::memory_order_relaxed);
int prevComponent = component.fetch_sub(1, std::memory_order_relaxed);
- int prevOutput = output.fetch_sub(1, std::memory_order_relaxed);
- if (prevInput > 0 && prevComponent > 0 && prevOutput > 0) {
+ if (prevInput > 0 && prevComponent > 0) {
ALOGV("[%s] %s -- PipelineCapacity::allocate() returns true: "
"pipeline availability -1 all ==> "
- "input = %d, component = %d, output = %d",
+ "input = %d, component = %d",
mName, callerTag ? callerTag : "*",
prevInput - 1,
- prevComponent - 1,
- prevOutput - 1);
+ prevComponent - 1);
return true;
}
input.fetch_add(1, std::memory_order_relaxed);
component.fetch_add(1, std::memory_order_relaxed);
- output.fetch_add(1, std::memory_order_relaxed);
ALOGV("[%s] %s -- PipelineCapacity::allocate() returns false: "
"pipeline availability unchanged ==> "
- "input = %d, component = %d, output = %d",
+ "input = %d, component = %d",
mName, callerTag ? callerTag : "*",
prevInput,
- prevComponent,
- prevOutput);
+ prevComponent);
return false;
}
void CCodecBufferChannel::PipelineCapacity::free(const char* callerTag) {
int prevInput = input.fetch_add(1, std::memory_order_relaxed);
int prevComponent = component.fetch_add(1, std::memory_order_relaxed);
- int prevOutput = output.fetch_add(1, std::memory_order_relaxed);
ALOGV("[%s] %s -- PipelineCapacity::free(): "
"pipeline availability +1 all ==> "
- "input = %d, component = %d, output = %d",
+ "input = %d, component = %d",
mName, callerTag ? callerTag : "*",
prevInput + 1,
- prevComponent + 1,
- prevOutput + 1);
+ prevComponent + 1);
}
int CCodecBufferChannel::PipelineCapacity::freeInputSlots(
@@ -1398,13 +1390,12 @@
std::memory_order_relaxed);
ALOGV("[%s] %s -- PipelineCapacity::freeInputSlots(%zu): "
"pipeline availability +%zu input ==> "
- "input = %d, component = %d, output = %d",
+ "input = %d, component = %d",
mName, callerTag ? callerTag : "*",
numDiscardedInputBuffers,
numDiscardedInputBuffers,
prevInput + static_cast<int>(numDiscardedInputBuffers),
- component.load(std::memory_order_relaxed),
- output.load(std::memory_order_relaxed));
+ component.load(std::memory_order_relaxed));
return prevInput + static_cast<int>(numDiscardedInputBuffers);
}
@@ -1413,38 +1404,84 @@
int prevComponent = component.fetch_add(1, std::memory_order_relaxed);
ALOGV("[%s] %s -- PipelineCapacity::freeComponentSlot(): "
"pipeline availability +1 component ==> "
- "input = %d, component = %d, output = %d",
+ "input = %d, component = %d",
mName, callerTag ? callerTag : "*",
input.load(std::memory_order_relaxed),
- prevComponent + 1,
- output.load(std::memory_order_relaxed));
+ prevComponent + 1);
return prevComponent + 1;
}
-int CCodecBufferChannel::PipelineCapacity::freeOutputSlot(
- const char* callerTag) {
- int prevOutput = output.fetch_add(1, std::memory_order_relaxed);
- ALOGV("[%s] %s -- PipelineCapacity::freeOutputSlot(): "
- "pipeline availability +1 output ==> "
- "input = %d, component = %d, output = %d",
- mName, callerTag ? callerTag : "*",
- input.load(std::memory_order_relaxed),
- component.load(std::memory_order_relaxed),
- prevOutput + 1);
- return prevOutput + 1;
+// CCodecBufferChannel::ReorderStash
+
+CCodecBufferChannel::ReorderStash::ReorderStash() {
+ clear();
}
-int CCodecBufferChannel::PipelineCapacity::forceAllocateOutputSlot(
- const char* callerTag) {
- int prevOutput = output.fetch_sub(1, std::memory_order_relaxed);
- ALOGV("[%s] %s -- PipelineCapacity::forceAllocateOutputSlot(): "
- "pipeline availability -1 output ==> "
- "input = %d, component = %d, output = %d",
- mName, callerTag ? callerTag : "*",
- input.load(std::memory_order_relaxed),
- component.load(std::memory_order_relaxed),
- prevOutput - 1);
- return prevOutput - 1;
+void CCodecBufferChannel::ReorderStash::clear() {
+ mPending.clear();
+ mStash.clear();
+ mDepth = 0;
+ mKey = C2Config::ORDINAL;
+}
+
+void CCodecBufferChannel::ReorderStash::setDepth(uint32_t depth) {
+ mPending.splice(mPending.end(), mStash);
+ mDepth = depth;
+}
+void CCodecBufferChannel::ReorderStash::setKey(C2Config::ordinal_key_t key) {
+ mPending.splice(mPending.end(), mStash);
+ mKey = key;
+}
+
+bool CCodecBufferChannel::ReorderStash::pop(Entry *entry) {
+ if (mPending.empty()) {
+ return false;
+ }
+ entry->buffer = mPending.front().buffer;
+ entry->timestamp = mPending.front().timestamp;
+ entry->flags = mPending.front().flags;
+ entry->ordinal = mPending.front().ordinal;
+ mPending.pop_front();
+ return true;
+}
+
+void CCodecBufferChannel::ReorderStash::emplace(
+ const std::shared_ptr<C2Buffer> &buffer,
+ int64_t timestamp,
+ int32_t flags,
+ const C2WorkOrdinalStruct &ordinal) {
+ for (auto it = mStash.begin(); it != mStash.end(); ++it) {
+ if (less(ordinal, it->ordinal)) {
+ mStash.emplace(it, buffer, timestamp, flags, ordinal);
+ return;
+ }
+ }
+ mStash.emplace_back(buffer, timestamp, flags, ordinal);
+ while (!mStash.empty() && mStash.size() > mDepth) {
+ mPending.push_back(mStash.front());
+ mStash.pop_front();
+ }
+}
+
+void CCodecBufferChannel::ReorderStash::defer(
+ const CCodecBufferChannel::ReorderStash::Entry &entry) {
+ mPending.push_front(entry);
+}
+
+bool CCodecBufferChannel::ReorderStash::hasPending() const {
+ return !mPending.empty();
+}
+
+bool CCodecBufferChannel::ReorderStash::less(
+ const C2WorkOrdinalStruct &o1, const C2WorkOrdinalStruct &o2) {
+ switch (mKey) {
+ case C2Config::ORDINAL: return o1.frameIndex < o2.frameIndex;
+ case C2Config::TIMESTAMP: return o1.timestamp < o2.timestamp;
+ case C2Config::CUSTOM: return o1.customOrdinal < o2.customOrdinal;
+ default:
+ ALOGD("Unrecognized key; default to timestamp");
+ return o1.frameIndex < o2.frameIndex;
+ }
}
// CCodecBufferChannel
@@ -1686,6 +1723,7 @@
void CCodecBufferChannel::feedInputBufferIfAvailableInternal() {
while (!mInputMetEos &&
+ !mReorderStash.lock()->hasPending() &&
mAvailablePipelineCapacity.allocate("feedInputBufferIfAvailable")) {
sp<MediaCodecBuffer> inBuffer;
size_t index;
@@ -1704,8 +1742,6 @@
status_t CCodecBufferChannel::renderOutputBuffer(
const sp<MediaCodecBuffer> &buffer, int64_t timestampNs) {
- mAvailablePipelineCapacity.freeOutputSlot("renderOutputBuffer");
- feedInputBufferIfAvailable();
std::shared_ptr<C2Buffer> c2Buffer;
{
Mutexed<std::unique_ptr<OutputBuffers>>::Locked buffers(mOutputBuffers);
@@ -1716,6 +1752,7 @@
if (!c2Buffer) {
return INVALID_OPERATION;
}
+ sendOutputBuffers();
#if 0
const std::vector<std::shared_ptr<const C2Info>> infoParams = c2Buffer->info();
@@ -1856,11 +1893,12 @@
if (*buffers && (*buffers)->releaseBuffer(buffer, nullptr)) {
buffers.unlock();
released = true;
- mAvailablePipelineCapacity.freeOutputSlot("discardBuffer");
}
}
- feedInputBufferIfAvailable();
- if (!released) {
+ if (released) {
+ feedInputBufferIfAvailable();
+ sendOutputBuffers();
+ } else {
ALOGD("[%s] MediaCodec discarded an unknown buffer", mName);
}
return OK;
@@ -1892,12 +1930,31 @@
const sp<AMessage> &inputFormat, const sp<AMessage> &outputFormat) {
C2StreamBufferTypeSetting::input iStreamFormat(0u);
C2StreamBufferTypeSetting::output oStreamFormat(0u);
+ C2PortReorderBufferDepthTuning::output reorderDepth;
+ C2PortReorderKeySetting::output reorderKey;
c2_status_t err = mComponent->query(
- { &iStreamFormat, &oStreamFormat },
+ {
+ &iStreamFormat,
+ &oStreamFormat,
+ &reorderDepth,
+ &reorderKey,
+ },
{},
C2_DONT_BLOCK,
nullptr);
- if (err != C2_OK) {
+ if (err == C2_BAD_INDEX) {
+ if (!iStreamFormat || !oStreamFormat) {
+ return UNKNOWN_ERROR;
+ }
+ Mutexed<ReorderStash>::Locked reorder(mReorderStash);
+ reorder->clear();
+ if (reorderDepth) {
+ reorder->setDepth(reorderDepth.value);
+ }
+ if (reorderKey) {
+ reorder->setKey(reorderKey.value);
+ }
+ } else if (err != C2_OK) {
return UNKNOWN_ERROR;
}
@@ -2164,7 +2221,6 @@
mAvailablePipelineCapacity.initialize(
kMinInputBufferArraySize,
kMaxPipelineCapacity,
- kMinOutputBufferArraySize,
mName);
#endif
@@ -2242,7 +2298,6 @@
for (const sp<MediaCodecBuffer> &buffer : toBeQueued) {
if (queueInputBufferInternal(buffer) != OK) {
mAvailablePipelineCapacity.freeComponentSlot("requestInitialInputBuffers");
- mAvailablePipelineCapacity.freeOutputSlot("requestInitialInputBuffers");
}
}
return OK;
@@ -2326,11 +2381,14 @@
return false;
}
- mAvailablePipelineCapacity.freeComponentSlot("handleWork");
+ if (work->worklets.size() != 1u
+ || !work->worklets.front()
+ || !(work->worklets.front()->output.flags & C2FrameData::FLAG_INCOMPLETE)) {
+ mAvailablePipelineCapacity.freeComponentSlot("handleWork");
+ }
if (work->result == C2_NOT_FOUND) {
ALOGD("[%s] flushed work; ignored.", mName);
- mAvailablePipelineCapacity.freeOutputSlot("handleWork (flushed)");
return true;
}
@@ -2364,6 +2422,40 @@
}
}
+ while (!worklet->output.configUpdate.empty()) {
+ std::unique_ptr<C2Param> param;
+ worklet->output.configUpdate.back().swap(param);
+ worklet->output.configUpdate.pop_back();
+ switch (param->coreIndex().coreIndex()) {
+ case C2PortReorderBufferDepthTuning::CORE_INDEX: {
+ C2PortReorderBufferDepthTuning::output reorderDepth;
+ if (reorderDepth.updateFrom(*param)) {
+ mReorderStash.lock()->setDepth(reorderDepth.value);
+ ALOGV("[%s] onWorkDone: updated reorder depth to %u",
+ mName, reorderDepth.value);
+ } else {
+ ALOGD("[%s] onWorkDone: failed to read reorder depth", mName);
+ }
+ break;
+ }
+ case C2PortReorderKeySetting::CORE_INDEX: {
+ C2PortReorderKeySetting::output reorderKey;
+ if (reorderKey.updateFrom(*param)) {
+ mReorderStash.lock()->setKey(reorderKey.value);
+ ALOGV("[%s] onWorkDone: updated reorder key to %u",
+ mName, reorderKey.value);
+ } else {
+ ALOGD("[%s] onWorkDone: failed to read reorder key", mName);
+ }
+ break;
+ }
+ default:
+ ALOGV("[%s] onWorkDone: unrecognized config update (%08X)",
+ mName, param->index());
+ break;
+ }
+ }
+
if (outputFormat != nullptr) {
Mutexed<std::unique_ptr<OutputBuffers>>::Locked buffers(mOutputBuffers);
ALOGD("[%s] onWorkDone: output format changed to %s",
@@ -2388,7 +2480,6 @@
ALOGV("[%s] onWorkDone: output EOS", mName);
}
- bool csdReported = false;
sp<MediaCodecBuffer> outBuffer;
size_t index;
@@ -2418,7 +2509,6 @@
buffers.unlock();
mCallback->onOutputBufferAvailable(index, outBuffer);
buffers.lock();
- csdReported = true;
} else {
ALOGD("[%s] onWorkDone: unable to register csd", mName);
buffers.unlock();
@@ -2431,11 +2521,6 @@
if (!buffer && !flags) {
ALOGV("[%s] onWorkDone: Not reporting output buffer (%lld)",
mName, work->input.ordinal.frameIndex.peekull());
- if (!csdReported) {
- // onOutputBufferAvailable() has not been and will not be called for
- // this work item.
- mAvailablePipelineCapacity.freeOutputSlot("handleWork (no csd)");
- }
return true;
}
@@ -2455,28 +2540,49 @@
}
{
- Mutexed<std::unique_ptr<OutputBuffers>>::Locked buffers(mOutputBuffers);
- if (!(*buffers)->registerBuffer(buffer, &index, &outBuffer)) {
- ALOGD("[%s] onWorkDone: unable to register output buffer", mName);
- // TODO
- // buffers.unlock();
- // mCCodecCallback->onError(UNKNOWN_ERROR, ACTION_CODE_FATAL);
- // buffers.lock();
- return false;
+ Mutexed<ReorderStash>::Locked reorder(mReorderStash);
+ reorder->emplace(buffer, timestamp.peek(), flags, worklet->output.ordinal);
+ if (flags & MediaCodec::BUFFER_FLAG_EOS) {
+ // Flush reorder stash
+ reorder->setDepth(0);
}
}
-
- outBuffer->meta()->setInt64("timeUs", timestamp.peek());
- outBuffer->meta()->setInt32("flags", flags);
- ALOGV("[%s] onWorkDone: out buffer index = %zu [%p] => %p + %zu",
- mName, index, outBuffer.get(), outBuffer->data(), outBuffer->size());
- if (csdReported) {
- mAvailablePipelineCapacity.forceAllocateOutputSlot("handleWork");
- }
- mCallback->onOutputBufferAvailable(index, outBuffer);
+ sendOutputBuffers();
return true;
}
+void CCodecBufferChannel::sendOutputBuffers() {
+ ReorderStash::Entry entry;
+ sp<MediaCodecBuffer> outBuffer;
+ size_t index;
+
+ while (true) {
+ {
+ Mutexed<ReorderStash>::Locked reorder(mReorderStash);
+ if (!reorder->hasPending()) {
+ break;
+ }
+ if (!reorder->pop(&entry)) {
+ break;
+ }
+ }
+ Mutexed<std::unique_ptr<OutputBuffers>>::Locked buffers(mOutputBuffers);
+ if (!(*buffers)->registerBuffer(entry.buffer, &index, &outBuffer)) {
+ ALOGV("[%s] sendOutputBuffers: unable to register output buffer", mName);
+ buffers.unlock();
+ mReorderStash.lock()->defer(entry);
+ return;
+ }
+ buffers.unlock();
+
+ outBuffer->meta()->setInt64("timeUs", entry.timestamp);
+ outBuffer->meta()->setInt32("flags", entry.flags);
+ ALOGV("[%s] sendOutputBuffers: out buffer index = %zu [%p] => %p + %zu",
+ mName, index, outBuffer.get(), outBuffer->data(), outBuffer->size());
+ mCallback->onOutputBufferAvailable(index, outBuffer);
+ }
+}
+
status_t CCodecBufferChannel::setSurface(const sp<Surface> &newSurface) {
static std::atomic_uint32_t surfaceGeneration{0};
uint32_t generation = (getpid() << 10) |
diff --git a/media/sfplugin/CCodecBufferChannel.h b/media/sfplugin/CCodecBufferChannel.h
index 3c23c81..3ef62bc 100644
--- a/media/sfplugin/CCodecBufferChannel.h
+++ b/media/sfplugin/CCodecBufferChannel.h
@@ -217,6 +217,7 @@
bool handleWork(
std::unique_ptr<C2Work> work, const sp<AMessage> &outputFormat,
const C2StreamInitDataInfo::output *initData);
+ void sendOutputBuffers();
QueueSync mSync;
sp<MemoryDealer> mDealer;
@@ -271,34 +272,25 @@
// CCodecBufferChannel whose outputs have not been returned from the
// component (by calling onWorkDone()) does not exceed a certain limit.
// (Let us call this the "component" capacity.)
- // 3. The number of work items that have been received by
- // CCodecBufferChannel whose outputs have not been released by the app
- // (either by calling discardBuffer() on an output buffer or calling
- // renderOutputBuffer()) does not exceed a certain limit. (Let us call
- // this the "output" capacity.)
//
// These three criteria guarantee that a new input buffer that arrives from
// the invocation of onInputBufferAvailable() will not
// 1. overload CCodecBufferChannel's input buffers;
// 2. overload the component; or
- // 3. overload CCodecBufferChannel's output buffers if the component
- // finishes all the pending work right away.
//
struct PipelineCapacity {
// The number of available input capacity.
std::atomic_int input;
// The number of available component capacity.
std::atomic_int component;
- // The number of available output capacity.
- std::atomic_int output;
PipelineCapacity();
- // Set the values of #component and #output.
- void initialize(int newInput, int newComponent, int newOutput,
+ // Set the values of #input and #component.
+ void initialize(int newInput, int newComponent,
const char* newName = "<UNKNOWN COMPONENT>",
const char* callerTag = nullptr);
- // Return true and decrease #input, #component and #output by one if
+ // Return true and decrease #input and #component by one if
// they are all greater than zero; return false otherwise.
//
// callerTag is used for logging only.
@@ -309,7 +301,7 @@
// afterwards.
bool allocate(const char* callerTag = nullptr);
- // Increase #input, #component and #output by one.
+ // Increase #input and #component by one.
//
// callerTag is used for logging only.
//
@@ -336,30 +328,52 @@
// onWorkDone() is called.
int freeComponentSlot(const char* callerTag = nullptr);
- // Increase #output by one and return the updated value.
- //
- // callerTag is used for logging only.
- //
- // freeOutputSlot() is called by CCodecBufferChannel when
- // discardBuffer() is called on an output buffer or when
- // renderOutputBuffer() is called.
- int freeOutputSlot(const char* callerTag = nullptr);
-
- // Force-allocate an output slot.
- //
- // callerTag is used for logging only.
- //
- // forceAllocateOutputSlot() is called by CCodecBufferChannel to report
- // a configuration change. #output may become negative when
- // forceAllocateOutputSlot() returns.
- int forceAllocateOutputSlot(const char* callerTag = nullptr);
-
private:
// Component name. Used for logging.
const char* mName;
};
PipelineCapacity mAvailablePipelineCapacity;
+ class ReorderStash {
+ public:
+ struct Entry {
+ inline Entry() : buffer(nullptr), timestamp(0), flags(0), ordinal({0, 0, 0}) {}
+ inline Entry(
+ const std::shared_ptr<C2Buffer> &b,
+ int64_t t,
+ int32_t f,
+ const C2WorkOrdinalStruct &o)
+ : buffer(b), timestamp(t), flags(f), ordinal(o) {}
+ std::shared_ptr<C2Buffer> buffer;
+ int64_t timestamp;
+ int32_t flags;
+ C2WorkOrdinalStruct ordinal;
+ };
+
+ ReorderStash();
+
+ void clear();
+ void setDepth(uint32_t depth);
+ void setKey(C2Config::ordinal_key_t key);
+ bool pop(Entry *entry);
+ void emplace(
+ const std::shared_ptr<C2Buffer> &buffer,
+ int64_t timestamp,
+ int32_t flags,
+ const C2WorkOrdinalStruct &ordinal);
+ void defer(const Entry &entry);
+ bool hasPending() const;
+
+ private:
+ std::list<Entry> mPending;
+ std::list<Entry> mStash;
+ uint32_t mDepth;
+ C2Config::ordinal_key_t mKey;
+
+ bool less(const C2WorkOrdinalStruct &o1, const C2WorkOrdinalStruct &o2);
+ };
+ Mutexed<ReorderStash> mReorderStash;
+
std::atomic_bool mInputMetEos;
inline bool hasCryptoOrDescrambler() {