// blob: 97b33092b1b06d105de710d416200b4186a715ce [file] [log] [blame]
// Copyright 2024 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use nix::errno::Errno;
use nix::sys::eventfd::EfdFlags;
use nix::sys::eventfd::EventFd;
use thiserror::Error;
use std::collections::VecDeque;
use std::marker::PhantomData;
use std::sync::atomic::AtomicU32;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
use std::thread::JoinHandle;
use std::vec::Vec;
use crate::decoder::StreamInfo;
use crate::video_frame::VideoFrame;
use crate::Fourcc;
pub mod c2_decoder;
pub mod c2_encoder;
#[cfg(feature = "v4l2")]
pub mod c2_v4l2_decoder;
#[cfg(feature = "v4l2")]
pub mod c2_v4l2_encoder;
#[cfg(feature = "vaapi")]
pub mod c2_vaapi_decoder;
#[cfg(feature = "vaapi")]
pub mod c2_vaapi_encoder;
// Drain request attached to a job (see `Job::set_drain` / `Job::get_drain`).
// Each variant carries an explicit integer discriminant.
// NOTE(review): the discriminants presumably mirror the Codec2 drain mode
// values used across the FFI boundary -- confirm before changing them.
#[derive(Debug, Default, PartialEq, Eq, Copy, Clone)]
pub enum DrainMode {
// Not draining. This is the `Default` value.
#[default]
NoDrain = -1,
// Drain the C2 component and signal an EOS. Currently we also change the state to stop.
EOSDrain = 0,
// Drain the C2 component, but keep accepting new jobs in the queue immediately after.
NoEOSDrain = 1,
}
// A unit of decode work: compressed input bytes, the (optional) frame holding
// the decompressed result, and the drain request attached to the job.
// `Default` yields an empty job with no output frame and `DrainMode::NoDrain`.
#[derive(Debug)]
pub struct C2DecodeJob<V: VideoFrame> {
// Compressed input data
// TODO: Use VideoFrame for input too
pub input: Vec<u8>,
// Decompressed output frame. Note that this needs to be reference counted because we may still
// use this frame as a reference frame even while we're displaying it.
pub output: Option<Arc<V>>,
// Drain request for this job; `DrainMode::NoDrain` unless a drain was requested.
pub drain: DrainMode,
// TODO: Add output delay and color aspect support as needed.
}
// `Job` implementation for decode work. The frame type is the job's own `V`,
// and the drain accessors are plain reads/writes of the `drain` field.
impl<V: VideoFrame> Job for C2DecodeJob<V> {
    type Frame = V;

    // Overwrites the drain request stored on this job.
    fn set_drain(&mut self, drain: DrainMode) {
        let slot = &mut self.drain;
        *slot = drain;
    }

    // Returns a copy of the drain request stored on this job.
    fn get_drain(&self) -> DrainMode {
        let Self { drain, .. } = self;
        *drain
    }
}
impl<V: VideoFrame> Default for C2DecodeJob<V> {
fn default() -> Self {
Self { input: vec![], output: None, drain: DrainMode::NoDrain }
}
}
// Common interface of the work items a `C2Wrapper` can queue. Jobs are moved
// to the worker thread, hence the `Send + 'static` supertraits.
pub trait Job: Send + 'static {
    // Frame type this kind of job operates on; it is also the type produced by
    // the wrapper's frame allocation callback.
    type Frame: VideoFrame;

    // Stores the drain request on this job.
    fn set_drain(&mut self, drain: DrainMode);

    // Returns the drain request currently stored on this job.
    fn get_drain(&self) -> DrainMode;
}
// A unit of encode work: an optional input frame, the output byte buffer, and
// the per-job encoding parameters. `Default` yields an empty job with all
// numeric fields set to zero and `DrainMode::NoDrain`.
#[derive(Debug)]
pub struct C2EncodeJob<V: VideoFrame> {
// Input frame for the encoder; `None` in a default-constructed job.
pub input: Option<V>,
// Output bytes for this job.
// TODO: Use VideoFrame for output too
pub output: Vec<u8>,
// In microseconds.
pub timestamp: u64,
// TODO: only support CBR right now, follow up with VBR support.
pub bitrate: u64,
// Framerate is actually negotiated, so the encoder can change this value
// based on the timestamps of the frames it receives.
// Shared (`Arc`) and atomic so that it can be updated through a shared handle.
pub framerate: Arc<AtomicU32>,
// Drain request for this job; `DrainMode::NoDrain` unless a drain was requested.
pub drain: DrainMode,
}
impl<V: VideoFrame> Default for C2EncodeJob<V> {
fn default() -> Self {
Self {
input: None,
output: vec![],
timestamp: 0,
bitrate: 0,
framerate: Arc::new(AtomicU32::new(0)),
drain: DrainMode::NoDrain,
}
}
}
// `Job` implementation for encode work. The frame type is the job's own `V`,
// and the drain accessors are plain reads/writes of the `drain` field.
impl<V: VideoFrame> Job for C2EncodeJob<V> {
    type Frame = V;

    // Overwrites the drain request stored on this job.
    fn set_drain(&mut self, drain: DrainMode) {
        let requested = drain;
        self.drain = requested;
    }

    // Returns a copy of the drain request stored on this job.
    fn get_drain(&self) -> DrainMode {
        let current: DrainMode = self.drain;
        current
    }
}
// Lifecycle state of a `C2Wrapper`. It is stored behind a mutex and shared
// with the worker.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum C2State {
// Set by a successful start(). queue(), flush() and drain() require this state.
C2Running,
// Initial state of a new wrapper and the state after stop(). start()
// requires this state.
C2Stopped,
// Set on the worker thread when the worker cannot be instantiated.
// Note that on state C2Error, stop() must be called before we can start()
// again.
C2Error,
}
// Status code returned by the `C2Wrapper` entry points and passed to the
// error callback.
// This is not a very "Rust-y" way of doing error handling, but it will
// hopefully make the FFI easier to write. Numerical values taken from
// frameworks/av/media/codec2/core/include/C2.h
// TODO: Improve error handling by adding more statuses.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum C2Status {
// The call succeeded.
C2Ok = 0,
// The call is not valid in the wrapper's current `C2State`.
C2BadState = 1, // EPERM
// Reported through the error callback when the worker cannot be instantiated.
C2BadValue = 22, // EINVAL
}
// Interface implemented by the backend that does the actual work for a
// `C2Wrapper`. `C2Wrapper::start` builds the worker with `new` on a freshly
// spawned thread and then calls `process_loop` on it; that thread ends when
// `process_loop` returns.
// J should be either C2DecodeJob or C2EncodeJob.
pub trait C2Worker<J>
where
J: Send + Job + 'static,
{
// Backend-specific configuration. The wrapper clones it on every start() and
// moves the clone into the worker thread.
type Options: Clone + Send + 'static;
// Builds a worker from the handles it shares with the wrapper:
// - `input_fourcc` / `output_fourcc`: the formats given to `C2Wrapper::new`.
// - `awaiting_job_event`: eventfd the wrapper writes to after queueing work,
//   after queueing a drain job, and at the end of stop().
// - `error_cb`: callback that receives a `C2Status`.
// - `work_done_cb`: callback that takes a job by value (presumably invoked
//   with finished jobs -- confirm in the implementations).
// - `work_queue`: queue of pending jobs; the wrapper appends at the back.
// - `state`: shared lifecycle state, written by the wrapper in start()/stop().
// - `framepool_hint_cb`: callback that receives a `StreamInfo` (presumably to
//   size the frame pool -- confirm in the implementations).
// - `alloc_cb`: callback that returns an optional frame of the job's frame type.
// - `options`: the backend-specific configuration.
// Returns `Err` with a message if the worker cannot be created. The wrapper
// then logs the message, sets the state to `C2State::C2Error` and reports
// `C2Status::C2BadValue` through `error_cb`.
fn new(
input_fourcc: Fourcc,
output_fourcc: Fourcc,
awaiting_job_event: Arc<EventFd>,
error_cb: Arc<Mutex<dyn FnMut(C2Status) + Send + 'static>>,
work_done_cb: Arc<Mutex<dyn FnMut(J) + Send + 'static>>,
work_queue: Arc<Mutex<VecDeque<J>>>,
state: Arc<Mutex<C2State>>,
framepool_hint_cb: Arc<Mutex<dyn FnMut(StreamInfo) + Send + 'static>>,
alloc_cb: Arc<Mutex<dyn FnMut() -> Option<<J as Job>::Frame> + Send + 'static>>,
options: Self::Options,
) -> Result<Self, String>
where
Self: Sized;
// Body of the worker thread. NOTE(review): presumably loops while the shared
// state is `C2Running` and returns once it is not -- stop() joins the thread
// after setting `C2Stopped`, so confirm every implementation terminates then.
fn process_loop(&mut self);
}
// Errors that can occur while setting up a `C2Wrapper`.
#[derive(Debug, Error)]
pub enum C2WrapperError {
// Creating the eventfd used to signal pending jobs failed; carries the errno.
#[error("failed to create EventFd for awaiting job event: {0}")]
AwaitingJobEventFd(Errno),
}
// Front end that owns the worker thread for a worker `W` processing jobs `J`.
// Everything that the worker needs is held in an `Arc` so that a clone can be
// handed to the worker thread on each start().
// Note that we do not guarantee thread safety in C2CrosCodecsWrapper.
pub struct C2Wrapper<J, W>
where
J: Send + Default + Job + 'static,
W: C2Worker<J>,
{
// Eventfd written to whenever work or a drain job is queued, and at the end
// of stop().
awaiting_job_event: Arc<EventFd>,
// Formats passed through unchanged to `W::new`.
input_fourcc: Fourcc,
output_fourcc: Fourcc,
// Invoked with `C2BadState` on calls made in the wrong state, and with
// `C2BadValue` when the worker cannot be instantiated.
error_cb: Arc<Mutex<dyn FnMut(C2Status) + Send + 'static>>,
// Handed to the worker; not invoked by the wrapper itself.
work_done_cb: Arc<Mutex<dyn FnMut(J) + Send + 'static>>,
// Pending jobs. queue()/drain() append, flush() and stop() empty it.
work_queue: Arc<Mutex<VecDeque<J>>>,
// Lifecycle state shared with the worker.
state: Arc<Mutex<C2State>>,
// Handed to the worker; not invoked by the wrapper itself.
framepool_hint_cb: Arc<Mutex<dyn FnMut(StreamInfo) + Send + 'static>>,
// Handed to the worker; not invoked by the wrapper itself.
alloc_cb: Arc<Mutex<dyn FnMut() -> Option<<J as Job>::Frame> + Send + 'static>>,
// Worker configuration, cloned for every start().
options: <W as C2Worker<J>>::Options,
// Handle of the thread spawned by start(); taken and joined by stop().
worker_thread: Option<JoinHandle<()>>,
// The instance of W actually lives in the thread creation closure, not
// this struct.
_phantom: PhantomData<W>,
}
impl<J, W> C2Wrapper<J, W>
where
    J: Send + Default + Job + 'static,
    W: C2Worker<J>,
{
    // Creates a wrapper in the C2Stopped state with an empty work queue. No
    // worker thread exists until start() is called.
    //
    // The callbacks and `options` are stored and handed to the worker on every
    // start(); `error_cb` is additionally invoked by the wrapper itself when a
    // call is made in the wrong state.
    //
    // Panics if the eventfd used to signal pending jobs cannot be created.
    pub fn new(
        input_fourcc: Fourcc,
        output_fourcc: Fourcc,
        error_cb: impl FnMut(C2Status) + Send + 'static,
        work_done_cb: impl FnMut(J) + Send + 'static,
        framepool_hint_cb: impl FnMut(StreamInfo) + Send + 'static,
        alloc_cb: impl FnMut() -> Option<<J as Job>::Frame> + Send + 'static,
        options: <W as C2Worker<J>>::Options,
    ) -> Self {
        let awaiting_job_event = EventFd::from_flags(EfdFlags::EFD_SEMAPHORE)
            .map_err(C2WrapperError::AwaitingJobEventFd)
            .unwrap();
        Self {
            awaiting_job_event: Arc::new(awaiting_job_event),
            input_fourcc,
            output_fourcc,
            error_cb: Arc::new(Mutex::new(error_cb)),
            work_done_cb: Arc::new(Mutex::new(work_done_cb)),
            work_queue: Arc::new(Mutex::new(VecDeque::new())),
            state: Arc::new(Mutex::new(C2State::C2Stopped)),
            framepool_hint_cb: Arc::new(Mutex::new(framepool_hint_cb)),
            alloc_cb: Arc::new(Mutex::new(alloc_cb)),
            options,
            worker_thread: None,
            _phantom: PhantomData,
        }
    }

    // This isn't part of C2, but it's convenient to check if our worker thread
    // is still running.
    // Returns false when no worker thread handle is stored.
    pub fn is_alive(&self) -> bool {
        matches!(&self.worker_thread, Some(handle) if !handle.is_finished())
    }

    // Returns Ok(()) when the state is C2Running. Otherwise reports C2BadState
    // through the error callback and returns it as the error.
    fn ensure_running(&self) -> Result<(), C2Status> {
        // Copy the state out so the state lock is released before the
        // callback runs.
        let state = *self.state.lock().unwrap();
        if state == C2State::C2Running {
            return Ok(());
        }
        (*self.error_cb.lock().unwrap())(C2Status::C2BadState);
        Err(C2Status::C2BadState)
    }

    // Start the decoder/encoder.
    // State must be C2Stopped; otherwise C2BadState is reported through the
    // error callback and returned.
    // State will be C2Running after this call.
    // The worker is constructed on the new thread. If that fails, the thread
    // sets the state to C2Error and reports C2BadValue through the error
    // callback; this function has already returned C2Ok by then.
    pub fn start(&mut self) -> C2Status {
        {
            // Check and update under a single lock so the transition is atomic.
            let mut state = self.state.lock().unwrap();
            if *state != C2State::C2Stopped {
                (*self.error_cb.lock().unwrap())(C2Status::C2BadState);
                return C2Status::C2BadState;
            }
            *state = C2State::C2Running;
        }

        // Everything the worker needs is cloned here and moved into the thread.
        let input_fourcc = self.input_fourcc.clone();
        let output_fourcc = self.output_fourcc.clone();
        let error_cb = Arc::clone(&self.error_cb);
        let work_done_cb = Arc::clone(&self.work_done_cb);
        let work_queue = Arc::clone(&self.work_queue);
        let state = Arc::clone(&self.state);
        let options = self.options.clone();
        let awaiting_job_event = Arc::clone(&self.awaiting_job_event);
        let framepool_hint_cb = Arc::clone(&self.framepool_hint_cb);
        let alloc_cb = Arc::clone(&self.alloc_cb);

        // NOTE(review): if a handle is still stored here (state became
        // C2Stopped without stop() being called), it is dropped by this
        // assignment, which detaches that thread -- confirm this is intended.
        self.worker_thread = Some(thread::spawn(move || {
            // The thread keeps its own `error_cb` and `state` handles for the
            // failure path below, so the worker gets clones of them.
            let worker = W::new(
                input_fourcc,
                output_fourcc,
                awaiting_job_event,
                Arc::clone(&error_cb),
                work_done_cb,
                work_queue,
                Arc::clone(&state),
                framepool_hint_cb,
                alloc_cb,
                options,
            );
            match worker {
                Ok(mut worker) => worker.process_loop(),
                Err(msg) => {
                    log::debug!("Error instantiating C2Worker {}", msg);
                    *state.lock().unwrap() = C2State::C2Error;
                    (*error_cb.lock().unwrap())(C2Status::C2BadValue);
                }
            }
        }));

        C2Status::C2Ok
    }

    // Stop the decoder/encoder and abandon in-flight work.
    // C2's reset() function is equivalent for our purposes.
    // Note that in event of error, stop() must be called before we can start()
    // again. This is to ensure we clear out the work queue.
    // State will be C2Stopped after this call.
    // Always returns C2Ok.
    pub fn stop(&mut self) -> C2Status {
        *self.state.lock().unwrap() = C2State::C2Stopped;

        // NOTE(review): the eventfd wake-up below is only written after the
        // join. If a worker can block indefinitely on the eventfd, this order
        // would hang here -- confirm against the process_loop implementations.
        if let Some(worker_thread) = self.worker_thread.take() {
            // A panic on the worker thread shows up as Err here and is ignored.
            let _ = worker_thread.join();
        }

        // Clear the queue only after the worker is gone.
        self.work_queue.lock().unwrap().clear();
        self.awaiting_job_event.write(1).unwrap();

        C2Status::C2Ok
    }

    // Add work to the work queue.
    // State must be C2Running or this function is invalid; in that case
    // C2BadState is reported through the error callback and returned.
    // State will remain C2Running.
    pub fn queue(&mut self, work_items: Vec<J>) -> C2Status {
        if let Err(status) = self.ensure_running() {
            return status;
        }

        self.work_queue.lock().unwrap().extend(work_items);
        // Signal the worker that there is something to pick up.
        self.awaiting_job_event.write(1).unwrap();

        C2Status::C2Ok
    }

    // Flush work from the queue and return it as |flushed_work|.
    // The flushed jobs are appended, in queue order, after whatever
    // |flushed_work| already contains.
    // State must be C2Running; otherwise C2BadState is reported through the
    // error callback and returned.
    // State will not change after this call.
    // TODO: Support different flush modes.
    pub fn flush(&mut self, flushed_work: &mut Vec<J>) -> C2Status {
        if let Err(status) = self.ensure_running() {
            return status;
        }

        // Move the jobs straight into the caller's vector; no temporary needed.
        flushed_work.extend(self.work_queue.lock().unwrap().drain(..));

        C2Status::C2Ok
    }

    // Signal to the decoder/encoder that it does not need to wait for
    // additional work to begin processing. This is an unusual name for this
    // function, but it is the convention that C2 uses.
    // The request is delivered as a default-constructed job carrying |mode|,
    // pushed to the back of the work queue.
    // State must be C2Running or this function is invalid; in that case
    // C2BadState is reported through the error callback and returned.
    // State will remain C2Running until the last frames drain, at which point
    // the state will change to C2Stopped.
    // TODO: Support different drain modes.
    pub fn drain(&mut self, mode: DrainMode) -> C2Status {
        if let Err(status) = self.ensure_running() {
            return status;
        }

        let mut drain_job = J::default();
        drain_job.set_drain(mode);
        self.work_queue.lock().unwrap().push_back(drain_job);
        // Signal the worker that there is something to pick up.
        self.awaiting_job_event.write(1).unwrap();

        C2Status::C2Ok
    }
}
// Instead of C2's release() function, we implement Drop and use RAII to
// accomplish the same thing
impl<J, W> Drop for C2Wrapper<J, W>
where
J: Send + Default + Job + 'static,
W: C2Worker<J>,
{
fn drop(&mut self) {
self.stop();
}
}