// blob: d156e554f4dff57d3965943321a39c307f63f321
// Copyright 2024 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::collections::BTreeSet;
use std::collections::VecDeque;
use thiserror::Error;
use crate::encoder::CodedBitstreamBuffer;
use crate::encoder::EncodeError;
use crate::encoder::EncodeResult;
use crate::encoder::FrameMetadata;
use crate::encoder::Tunings;
use crate::encoder::VideoEncoder;
pub mod h264;
pub mod h265;
pub mod vp8;
pub mod vp9;
/// Errors reported by a [`StatefulVideoEncoderBackend`] implementation.
#[derive(Debug, Error)]
pub enum StatefulBackendError {
/// Internal state was found to be invalid. As the message says, this most likely points at a
/// bug rather than at a condition the caller can recover from.
#[error("invalid internal state. This is likely a bug.")]
InvalidInternalState,
/// Any other failure, wrapped in an [`anyhow::Error`] and displayed transparently. Thanks to
/// `#[from]`, the `?` operator converts an [`anyhow::Error`] into this variant.
#[error(transparent)]
Other(#[from] anyhow::Error),
}
/// Convenience alias for results whose error type is [`StatefulBackendError`].
pub type StatefulBackendResult<T> = Result<T, StatefulBackendError>;
/// Unique identifier of a [`BackendRequest`].
///
/// A transparent wrapper around `usize`. [`StatefulEncoder`] generates one per encoded frame
/// from a wrapping counter and uses it to match every [`BackendOutput`] to the request that
/// produced it.
#[repr(transparent)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct BackendRequestId(usize);
/// Request package that is offered to [`StatefulVideoEncoderBackend`] for processing.
///
/// [`StatefulEncoder`] builds one for every frame handed to its `encode` method.
pub struct BackendRequest<Handle> {
/// Request's unique identifier
pub request_id: BackendRequestId,
/// Frame's metadata
pub meta: FrameMetadata,
/// Frame's handle
pub handle: Handle,
/// Tunings set for the request; a clone of the encoder's tunings taken when the request was
/// created
pub tunings: Tunings,
}
/// Result package that a [`StatefulVideoEncoderBackend`] yields once it has finished processing
/// a [`BackendRequest`].
pub struct BackendOutput {
/// Request's unique identifier corresponding to [`BackendRequest`]
pub request_id: BackendRequestId,
/// Result of the request. [`CodedBitstreamBuffer`] containing encoded frame
pub buffer: CodedBitstreamBuffer,
}
/// Generic trait for stateful encoder backends.
///
/// `Handle` is the type of the frame handle carried by each [`BackendRequest`].
pub trait StatefulVideoEncoderBackend<Handle> {
/// Try to submit an encode request to the backend. The backend may not be able to accept the
/// request, e.g. if there are not enough available resources or the backend desires to finish
/// a previous request first. The function shall not be blocking.
///
/// If the backend accepts the request for processing, it shall take the `request` (take
/// ownership of the [`BackendRequest`]) and set the mutable reference to [`None`]. If it does
/// not accept it, the [`Option`] shall be left as `Some` so that the caller keeps the request.
fn consume_request(
&mut self,
request: &mut Option<BackendRequest<Handle>>,
) -> StatefulBackendResult<()>;
/// The function shall block until the backend can accept a request with [`consume_request`]
/// or has finished processing of some [`BackendRequest`], so that [`poll`] can be used to
/// fetch its result.
///
/// [`consume_request`]: StatefulVideoEncoderBackend::consume_request
/// [`poll`]: StatefulVideoEncoderBackend::poll
fn sync(&mut self) -> StatefulBackendResult<()>;
/// Blocking function that returns once the backend has finished processing every
/// [`BackendRequest`] it has accepted. The outputs of all those requests are returned.
fn drain(&mut self) -> StatefulBackendResult<Vec<BackendOutput>>;
/// If the processing of any [`BackendRequest`] previously accepted through
/// [`consume_request`] is finished, then the function should yield its corresponding
/// [`BackendOutput`]; otherwise `Ok(None)` is expected.
///
/// [`consume_request`]: StatefulVideoEncoderBackend::consume_request
fn poll(&mut self) -> StatefulBackendResult<Option<BackendOutput>>;
}
/// Encoder front end that queues incoming frames, hands them to a
/// [`StatefulVideoEncoderBackend`] and collects the resulting coded buffers for the client.
pub struct StatefulEncoder<Handle, Backend>
where
Backend: StatefulVideoEncoderBackend<Handle>,
{
/// Pending queue of frames to be encoded by the backend
queue: VecDeque<BackendRequest<Handle>>,
/// Counter from which the next unique request identifier is generated
request_counter: usize,
/// Latest [`Tunings`], that will be cloned into each new request
tunings: Tunings,
/// Processed encoded bitstream queue for client to poll
coded_queue: VecDeque<CodedBitstreamBuffer>,
/// Identifiers of the requests currently processed by the backend
processing: BTreeSet<BackendRequestId>,
/// [`StatefulVideoEncoderBackend`] instance to delegate [`BackendRequest`] to
backend: Backend,
}
impl<Handle, Backend> StatefulEncoder<Handle, Backend>
where
    Backend: StatefulVideoEncoderBackend<Handle>,
{
    /// Utility function that creates a new [`StatefulEncoder`] with [`Tunings`] and
    /// [`StatefulVideoEncoderBackend`] instance.
    ///
    /// The encoder starts with empty queues, no request in flight and the request counter at
    /// zero.
    // NOTE(review): presumably only called from the codec submodules, which is why dead code is
    // allowed here -- confirm.
    #[allow(dead_code)]
    fn create(tunings: Tunings, backend: Backend) -> Self {
        Self {
            queue: Default::default(),
            request_counter: 0,
            tunings,
            coded_queue: Default::default(),
            processing: Default::default(),
            backend,
        }
    }

    /// Handles the [`BackendOutput`] from the backend, i.e. adds its buffer to the queue for the
    /// client to poll.
    ///
    /// The request id is removed from the set of in-flight requests. An output whose id is not
    /// in that set is only reported with a warning; its buffer is still queued.
    fn handle_output(&mut self, output: BackendOutput) -> EncodeResult<()> {
        log::debug!(
            "Backend yieled output buffer for request id={:?} timestamp={} bytes={}",
            output.request_id,
            output.buffer.metadata.timestamp,
            output.buffer.bitstream.len()
        );

        // `remove` returns false when the id was not being tracked.
        if !self.processing.remove(&output.request_id) {
            log::warn!("Coded buffer returned for non existing or already processed request id={:?} timestamp={}",
                output.request_id,
                output.buffer.metadata.timestamp,
            );
        }

        self.coded_queue.push_back(output.buffer);
        Ok(())
    }

    /// Polls the backend for outputs until it has none left and handles each of them.
    fn poll_backend(&mut self) -> EncodeResult<()> {
        while let Some(output) = self.backend.poll()? {
            self.handle_output(output)?;
        }
        Ok(())
    }

    /// Performs essential processing. Polls the backend for outputs (only when requests are in
    /// flight) and then tries to submit the queued requests to the backend in FIFO order.
    ///
    /// Submission stops at the first request the backend declines; that request is put back at
    /// the front of the queue so ordering is preserved.
    fn process(&mut self) -> EncodeResult<()> {
        log::debug!(
            "Pending requests: {}, currently processed: {:?}, pending coded buffer: {}",
            self.queue.len(),
            self.processing,
            self.coded_queue.len()
        );

        if !self.processing.is_empty() {
            self.poll_backend()?;
        }

        while let Some(request) = self.queue.pop_front() {
            // Copy out what is needed for logging and bookkeeping before the backend may take
            // ownership of the request.
            let request_id = request.request_id;
            let timestamp = request.meta.timestamp;
            let mut request = Some(request);

            log::trace!("Passing request to backend id={request_id:?} timestamp={timestamp}");
            self.backend.consume_request(&mut request)?;

            if let Some(request) = request {
                // The backend left the request in place, i.e. it cannot accept it right now.
                log::trace!("Backend stalled request id={request_id:?} timestamp={timestamp}");
                self.queue.push_front(request);
                break;
            } else {
                log::debug!("Backend consumed request id={request_id:?} timestamp={timestamp}");
                self.processing.insert(request_id);
            }
        }

        Ok(())
    }

    /// [`StatefulVideoEncoderBackend`]'s instance
    ///
    /// Only a shared reference is handed out, so a shared borrow of the encoder is sufficient.
    pub fn backend(&self) -> &Backend {
        &self.backend
    }
}
impl<Handle, Backend> VideoEncoder<Handle> for StatefulEncoder<Handle, Backend>
where
    Backend: StatefulVideoEncoderBackend<Handle>,
{
    /// Replaces the stored [`Tunings`]. Requests created by later `encode` calls clone the new
    /// value; requests that were already created keep the tunings they were built with.
    fn tune(&mut self, tunings: Tunings) -> EncodeResult<()> {
        self.tunings = tunings;
        Ok(())
    }

    /// Wraps the frame into a [`BackendRequest`] with a fresh [`BackendRequestId`], appends it
    /// to the pending queue and runs one processing step.
    fn encode(&mut self, meta: FrameMetadata, handle: Handle) -> Result<(), EncodeError> {
        let request_id = BackendRequestId(self.request_counter);
        // Wrapping addition: the counter rolls over instead of overflowing.
        self.request_counter = self.request_counter.wrapping_add(1);

        log::trace!("Got new request id={request_id:?} timestamp={}", meta.timestamp);

        let request = BackendRequest { request_id, meta, handle, tunings: self.tunings.clone() };
        self.queue.push_back(request);
        self.process()?;

        Ok(())
    }

    /// Returns the oldest coded buffer that is ready, or `None` if there is none.
    ///
    /// A processing step is run first whenever requests are still queued or in flight, so that
    /// freshly finished outputs are picked up.
    fn poll(&mut self) -> EncodeResult<Option<CodedBitstreamBuffer>> {
        if !self.queue.is_empty() || !self.processing.is_empty() {
            self.process()?;
        }

        if let Some(buffer) = self.coded_queue.pop_front() {
            log::debug!("Returning coded buffer timestamp={}", buffer.metadata.timestamp);
            return Ok(Some(buffer));
        }

        Ok(None)
    }

    /// Submits every pending request to the backend and waits until the backend has finished
    /// all of them. The produced buffers are queued for `poll`.
    fn drain(&mut self) -> EncodeResult<()> {
        log::debug!(
            "Got drain request. Pending in queue: {}. Currently processed: {:?}",
            self.queue.len(),
            self.processing
        );

        // Push the whole pending queue into the backend, blocking on `sync` whenever the
        // backend declines to take more requests.
        while !self.queue.is_empty() {
            self.process()?;

            if !self.queue.is_empty() {
                self.backend.sync()?;
            }
        }

        // An id stays in `processing` until its output has been handled, so an empty set means
        // the backend has nothing left to return and the drain request can really be skipped.
        if self.processing.is_empty() {
            log::debug!("Skipping drain request to backend, everything is drained");
            return Ok(());
        }

        log::debug!("Sending drain request to backend");
        for output in self.backend.drain()? {
            self.handle_output(output)?;
        }

        Ok(())
    }
}