Limit the number of live input buffers in CCodec
Test: m cts && cts-tradefed run cts -m CtsMediaTestCases \
-t android.media.cts.AdaptivePlaybackTest
Bug: 113292710
Change-Id: Ic31162345daa6d1052f4064854eaf4b52a3929d6
Merged-In: Ic31162345daa6d1052f4064854eaf4b52a3929d6
(cherry picked from commit 100f6fb5ab164c3d118c88c931e2a70aba85b796)
diff --git a/media/sfplugin/CCodecBufferChannel.cpp b/media/sfplugin/CCodecBufferChannel.cpp
index 5049914..90b33de 100644
--- a/media/sfplugin/CCodecBufferChannel.cpp
+++ b/media/sfplugin/CCodecBufferChannel.cpp
@@ -1330,59 +1330,124 @@
// CCodecBufferChannel::PipelineCapacity
CCodecBufferChannel::PipelineCapacity::PipelineCapacity()
- : component(0), output(0), mName("<UNKNOWN COMPONENT>") {
+ : input(0), lentInput(0), component(0), output(0),
+ mName("<UNKNOWN COMPONENT>") {
}
void CCodecBufferChannel::PipelineCapacity::initialize(
+ int newInput,
int newComponent,
int newOutput,
const char* newName,
const char* callerTag) {
+ input.store(newInput, std::memory_order_relaxed);
+ lentInput.store(0, std::memory_order_relaxed);
component.store(newComponent, std::memory_order_relaxed);
output.store(newOutput, std::memory_order_relaxed);
mName = newName;
ALOGV("[%s] %s -- PipelineCapacity::initialize(): "
"pipeline availability initialized ==> "
- "component = %d, output = %d",
+ "input = %d (-%d), component = %d, output = %d",
mName, callerTag ? callerTag : "*",
- newComponent, newOutput);
+ newInput, 0, newComponent, newOutput);
}
bool CCodecBufferChannel::PipelineCapacity::allocate(const char* callerTag) {
+ int prevInput = input.fetch_sub(1, std::memory_order_relaxed);
int prevComponent = component.fetch_sub(1, std::memory_order_relaxed);
int prevOutput = output.fetch_sub(1, std::memory_order_relaxed);
- if (prevComponent > 0 && prevOutput > 0) {
+ if (prevInput > 0 && prevComponent > 0 && prevOutput > 0) {
ALOGV("[%s] %s -- PipelineCapacity::allocate() returns true: "
- "pipeline availability -1 all ==> component = %d, output = %d",
+ "pipeline availability -1 all ==> "
+ "input = %d (-%d), component = %d, output = %d",
mName, callerTag ? callerTag : "*",
- prevComponent - 1, prevOutput - 1);
+ prevInput - 1,
+ lentInput.load(std::memory_order_relaxed),
+ prevComponent - 1,
+ prevOutput - 1);
return true;
}
+ input.fetch_add(1, std::memory_order_relaxed);
component.fetch_add(1, std::memory_order_relaxed);
output.fetch_add(1, std::memory_order_relaxed);
ALOGV("[%s] %s -- PipelineCapacity::allocate() returns false: "
- "pipeline availability unchanged ==> component = %d, output = %d",
+ "pipeline availability unchanged ==> "
+ "input = %d (-%d), component = %d, output = %d",
mName, callerTag ? callerTag : "*",
- prevComponent, prevOutput);
+ prevInput,
+ lentInput.load(std::memory_order_relaxed),
+ prevComponent,
+ prevOutput);
return false;
}
void CCodecBufferChannel::PipelineCapacity::free(const char* callerTag) {
+ int prevInput = input.fetch_add(1, std::memory_order_relaxed);
int prevComponent = component.fetch_add(1, std::memory_order_relaxed);
int prevOutput = output.fetch_add(1, std::memory_order_relaxed);
ALOGV("[%s] %s -- PipelineCapacity::free(): "
- "pipeline availability +1 all ==> component = %d, output = %d",
+ "pipeline availability +1 all ==> "
+ "input = %d (-%d), component = %d, output = %d",
mName, callerTag ? callerTag : "*",
- prevComponent + 1, prevOutput + 1);
+ prevInput + 1,
+ lentInput.load(std::memory_order_relaxed),
+ prevComponent + 1,
+ prevOutput + 1);
+}
+
+int CCodecBufferChannel::PipelineCapacity::lendInputSlot(
+ const char* callerTag) {
+ int prevInput = input.fetch_add(1, std::memory_order_relaxed);
+ int prevLentInput = lentInput.fetch_add(1, std::memory_order_relaxed);
+ ALOGV("[%s] %s -- PipelineCapacity::lendInputSlot(): "
+ "pipeline availability +1 (-1) input ==> "
+ "input = %d (-%d), component = %d, output = %d",
+ mName, callerTag ? callerTag : "*",
+ prevInput + 1,
+ prevLentInput + 1,
+ component.load(std::memory_order_relaxed),
+ output.load(std::memory_order_relaxed));
+ return prevInput + 1;
+}
+
+int CCodecBufferChannel::PipelineCapacity::freeInputSlot(
+ const char* callerTag) {
+ int prevLentInput = lentInput.fetch_sub(1, std::memory_order_relaxed);
+ if (prevLentInput > 0) {
+ ALOGV("[%s] %s -- PipelineCapacity::freeInputSlot(): "
+ "pipeline availability +0 (+1) input ==> "
+ "input = %d (-%d), component = %d, output = %d",
+ mName, callerTag ? callerTag : "*",
+ input.load(std::memory_order_relaxed),
+ prevLentInput - 1,
+ component.load(std::memory_order_relaxed),
+ output.load(std::memory_order_relaxed));
+ return input.load(std::memory_order_relaxed);
+ }
+ lentInput.fetch_add(1, std::memory_order_relaxed);
+ int prevInput = input.fetch_add(1, std::memory_order_relaxed);
+ ALOGV("[%s] %s -- PipelineCapacity::freeInputSlot(): "
+ "pipeline availability +1 (+0) input ==> "
+ "input = %d (-%d), component = %d, output = %d",
+ mName, callerTag ? callerTag : "*",
+ prevInput + 1,
+ prevLentInput,
+ component.load(std::memory_order_relaxed),
+ output.load(std::memory_order_relaxed));
+ return prevInput + 1;
}
int CCodecBufferChannel::PipelineCapacity::freeComponentSlot(
const char* callerTag) {
int prevComponent = component.fetch_add(1, std::memory_order_relaxed);
ALOGV("[%s] %s -- PipelineCapacity::freeComponentSlot(): "
- "pipeline availability +1 component ==> component = %d, output = %d",
+ "pipeline availability +1 component ==> "
+ "input = %d (-%d), component = %d, output = %d",
mName, callerTag ? callerTag : "*",
- prevComponent + 1, output.load(std::memory_order_relaxed));
+ input.load(std::memory_order_relaxed),
+ lentInput.load(std::memory_order_relaxed),
+ prevComponent + 1,
+ output.load(std::memory_order_relaxed));
return prevComponent + 1;
}
@@ -1390,9 +1455,13 @@
const char* callerTag) {
int prevOutput = output.fetch_add(1, std::memory_order_relaxed);
ALOGV("[%s] %s -- PipelineCapacity::freeOutputSlot(): "
- "pipeline availability +1 output ==> component = %d, output = %d",
+ "pipeline availability +1 output ==> "
+ "input = %d (-%d), component = %d, output = %d",
mName, callerTag ? callerTag : "*",
- component.load(std::memory_order_relaxed), prevOutput + 1);
+ input.load(std::memory_order_relaxed),
+ lentInput.load(std::memory_order_relaxed),
+ component.load(std::memory_order_relaxed),
+ prevOutput + 1);
return prevOutput + 1;
}
@@ -1855,11 +1924,13 @@
C2_DONT_BLOCK,
nullptr);
mAvailablePipelineCapacity.initialize(
+ inputDelay,
inputDelay + pipelineDelay,
inputDelay + pipelineDelay + outputDelay,
mName);
#else
mAvailablePipelineCapacity.initialize(
+ kMinInputBufferArraySize,
kMaxPipelineCapacity,
kMinOutputBufferArraySize,
mName);
@@ -2213,6 +2284,7 @@
void CCodecBufferChannel::onWorkDone(
std::unique_ptr<C2Work> work, const sp<AMessage> &outputFormat,
const C2StreamInitDataInfo::output *initData) {
+ mAvailablePipelineCapacity.freeInputSlot("onWorkDone");
mAvailablePipelineCapacity.freeComponentSlot("onWorkDone");
if (handleWork(std::move(work), outputFormat, initData)) {
mAvailablePipelineCapacity.freeOutputSlot("onWorkDone");
@@ -2228,6 +2300,7 @@
newInputSlotAvailable = (*buffers)->expireComponentBuffer(buffer);
}
if (newInputSlotAvailable) {
+ mAvailablePipelineCapacity.lendInputSlot("onInputBufferDone");
feedInputBufferIfAvailable();
}
}
diff --git a/media/sfplugin/CCodecBufferChannel.h b/media/sfplugin/CCodecBufferChannel.h
index b4877a8..f9e086f 100644
--- a/media/sfplugin/CCodecBufferChannel.h
+++ b/media/sfplugin/CCodecBufferChannel.h
@@ -257,38 +257,50 @@
// PipelineCapacity is used in the input buffer gating logic.
//
- // There are two criteria that need to be met before
+ // There are three criteria that need to be met before
// onInputBufferAvailable() is called:
- // 1. The number of work items that have been received by
+ // 1. The number of input buffers that have been received by
+ // CCodecBufferChannel but not returned via onWorkDone() or
+ // onInputBufferDone() does not exceed a certain limit. (Let us call this
+ // number the "input" capacity.)
+ // 2. The number of work items that have been received by
// CCodecBufferChannel whose outputs have not been returned from the
// component (by calling onWorkDone()) does not exceed a certain limit.
- // Let us call this the "component" capacity.
- // 2. The number of work items that have been received by
+ // (Let us call this the "component" capacity.)
+ // 3. The number of work items that have been received by
// CCodecBufferChannel whose outputs have not been released by the app
// (either by calling discardBuffer() on an output buffer or calling
- // renderOutputBuffer()) does not exceed a certain limit. Let us call
- // this the "output" capacity.
+ // renderOutputBuffer()) does not exceed a certain limit. (Let us call
+ // this the "output" capacity.)
//
- // These two criteria guarantee that the new input buffer that arrives from
+ // These three criteria guarantee that a new input buffer that arrives from
// the invocation of onInputBufferAvailable() will not
- // 1. overload the component; or
- // 2. overload CCodecBufferChannel's output buffers if the component
+ // 1. overload CCodecBufferChannel's input buffers;
+ // 2. overload the component; or
+ // 3. overload CCodecBufferChannel's output buffers if the component
// finishes all the pending work right away.
//
struct PipelineCapacity {
+ // The number of available input capacity.
+ std::atomic_int input;
+ // The number of input buffers that have been released by
+ // onInputBufferDone() but not onWorkDone(). Once onWorkDone() is
+ // called, this number will decrease unless it is already zero; in
+ // which case #input will increase instead.
+ std::atomic_int lentInput;
// The number of available component capacity.
std::atomic_int component;
// The number of available output capacity.
std::atomic_int output;
PipelineCapacity();
- // Set the values of component and output.
- void initialize(int newComponent, int newOutput,
+ // Set the values of #component and #output.
+ void initialize(int newInput, int newComponent, int newOutput,
const char* newName = "<UNKNOWN COMPONENT>",
const char* callerTag = nullptr);
- // Return true and decrease component and output by one if they are both
- // greater than zero; return false otherwise.
+ // Return true and decrease #input, #component and #output by one if
+ // they are all greater than zero; return false otherwise.
//
// callerTag is used for logging only.
//
@@ -297,15 +309,37 @@
// onInputBufferAvailable() can (and will) be called afterwards.
bool allocate(const char* callerTag = nullptr);
- // Increase component and output by one.
+ // Increase #input, #component and #output by one.
//
// callerTag is used for logging only.
//
// free() is called by CCodecBufferChannel after allocate() returns true
- // but onInputBufferAvailable() cannot be called for any reasons.
+ // but onInputBufferAvailable() cannot be called for any reasons. It
+ // essentially undoes an allocate() call.
void free(const char* callerTag = nullptr);
- // Increase component by one and return the updated value.
+ // Increase #input and #lentInput by 1.
+ //
+ // callerTag is used for logging only.
+ //
+ // lendInputSlot() is called by CCodecBufferChannel when
+ // onInputBufferDone() is called. This means an input buffer has been
+ // freed, but a subsequent call to onWorkDone() will not free an input
+ // buffer.
+ int lendInputSlot(const char* callerTag = nullptr);
+
+ // Increase #input by one if #lentInput is 0; otherwise, decrease
+ // #lentInput by 1.
+ //
+ // callerTag is used for logging only.
+ //
+ // freeInputSlot() is called by CCodecBufferChannel when onWorkDone() is
+ // called. If #lentInput is not zero, that means the input buffer for
+ // the returned work has already been freed, so #input will not
+ // increase.
+ int freeInputSlot(const char* callerTag = nullptr);
+
+ // Increase #component by one and return the updated value.
//
// callerTag is used for logging only.
//
@@ -313,7 +347,7 @@
// called.
int freeComponentSlot(const char* callerTag = nullptr);
- // Increase output by one and return the updated value.
+ // Increase #output by one and return the updated value.
//
// callerTag is used for logging only.
//