| // Copyright 2024 The ChromiumOS Authors |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use nix::errno::Errno; |
| use nix::sys::eventfd::EfdFlags; |
| use nix::sys::eventfd::EventFd; |
| |
| use thiserror::Error; |
| |
| use std::collections::VecDeque; |
| use std::marker::PhantomData; |
| use std::sync::atomic::AtomicU32; |
| use std::sync::Arc; |
| use std::sync::Condvar; |
| use std::sync::Mutex; |
| use std::thread; |
| use std::thread::JoinHandle; |
| use std::vec::Vec; |
| |
| use crate::decoder::StreamInfo; |
| use crate::video_frame::VideoFrame; |
| use crate::Fourcc; |
| |
| pub mod c2_decoder; |
| pub mod c2_encoder; |
| #[cfg(feature = "v4l2")] |
| pub mod c2_v4l2_decoder; |
| #[cfg(feature = "v4l2")] |
| pub mod c2_v4l2_encoder; |
| #[cfg(feature = "vaapi")] |
| pub mod c2_vaapi_decoder; |
| #[cfg(feature = "vaapi")] |
| pub mod c2_vaapi_encoder; |
| |
/// The kind of drain, if any, that a job carries.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum DrainMode {
    /// Not draining.
    #[default]
    NoDrain = -1,
    /// Drain the C2 component and signal an EOS. Currently we also change the state to stop.
    EOSDrain = 0,
    /// Drain the C2 component, but keep accepting new jobs in the queue immediately after.
    NoEOSDrain = 1,
    /// Drain signal coming from a drain() or flush() call. These are distinct because we should
    /// not return C2Work items for these.
    SyntheticDrain = 2,
}
| |
| #[derive(Debug)] |
| pub struct C2DecodeJob<V: VideoFrame> { |
| // Compressed input data |
| // TODO: Use VideoFrame for input too |
| pub input: Vec<u8>, |
| // Decompressed output frame. Note that this needs to be reference counted because we may still |
| // use this frame as a reference frame even while we're displaying it. |
| pub output: Option<Arc<V>>, |
| pub timestamp: u64, |
| pub drain: DrainMode, |
| // Keeps track of whether or not the corresponding C2Work item contains actual frame data. |
| // Codec2 will expect the C2Work item back regardless, so we will need to send back an empty |
| // C2Work with the correct timestamp if this is false. Note that this field is populated by the |
| // C2DecoderWorker based on parsing, so we don't actually need to populate this at the HAL |
| // level. |
| pub contains_visible_frame: bool, |
| pub codec_specific_data: bool, |
| // TODO: Add output delay and color aspect support as needed. |
| } |
| |
| impl<V> Job for C2DecodeJob<V> |
| where |
| V: VideoFrame, |
| { |
| type Frame = V; |
| |
| fn set_drain(&mut self, drain: DrainMode) { |
| self.drain = drain; |
| } |
| |
| fn get_drain(&self) -> DrainMode { |
| self.drain |
| } |
| } |
| |
| impl<V: VideoFrame> Default for C2DecodeJob<V> { |
| fn default() -> Self { |
| Self { |
| input: vec![], |
| output: None, |
| timestamp: 0, |
| contains_visible_frame: false, |
| drain: DrainMode::NoDrain, |
| codec_specific_data: false, |
| } |
| } |
| } |
| |
| pub trait Job: Send + 'static { |
| type Frame: VideoFrame; |
| |
| fn set_drain(&mut self, drain: DrainMode) -> (); |
| fn get_drain(&self) -> DrainMode; |
| } |
| |
| #[derive(Debug)] |
| pub struct C2EncodeJob<V: VideoFrame> { |
| pub input: Option<V>, |
| // TODO: Use VideoFrame for output too |
| pub output: Vec<u8>, |
| // Codec-specific data to output as part of this C2EncodeJob. This is intended to be used in the |
| // H.264 case as "initialization data" for the client: the first C2EncodeJob to be sent to the |
| // client should contain the concatenation of the SPS and PPS NALUs separated by start codes as |
| // follows: |
| // |
| // [0x0, 0x0, 0x0, 0x1, ... SPS ..., 0x0, 0x0, 0x0, 0x1, ... PPS ...] |
| // |
| // On the C++ HAL side, this CSD, if not empty, is expected to get copied to a |
| // C2StreamInitDataInfo::output parameter. |
| pub csd: Vec<u8>, |
| // This is an input/output member: |
| // |
| // - When `Self` is the input, this member indicates whether to request a "sync frame" from the |
| // underlying encoder. |
| // |
| // - When `Self` is the output (i.e., on job completion), this member indicates whether |
| // `Self::output` compresses a "sync frame." |
| // |
| // Refer to `CodedBitstreamBuffer::is_sync_frame` for details on what a "sync frame" is. |
| pub is_sync_frame: bool, |
| // In microseconds. |
| pub timestamp: u64, |
| // TODO: only support CBR right now, follow up with VBR support. |
| pub bitrate: u64, |
| // Framerate is actually negotiated, so the encoder can change this value |
| // based on the timestamps of the frames it receives. |
| pub framerate: Arc<AtomicU32>, |
| pub drain: DrainMode, |
| } |
| |
| impl<V: VideoFrame> Default for C2EncodeJob<V> { |
| fn default() -> Self { |
| Self { |
| input: None, |
| output: vec![], |
| csd: vec![], |
| is_sync_frame: false, |
| timestamp: 0, |
| bitrate: 0, |
| framerate: Arc::new(AtomicU32::new(0)), |
| drain: DrainMode::NoDrain, |
| } |
| } |
| } |
| |
| impl<V> Job for C2EncodeJob<V> |
| where |
| V: VideoFrame, |
| { |
| type Frame = V; |
| |
| fn set_drain(&mut self, drain: DrainMode) { |
| self.drain = drain; |
| } |
| |
| fn get_drain(&self) -> DrainMode { |
| self.drain |
| } |
| } |
| |
/// Lifecycle state shared between the wrapper and its worker thread.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum C2State {
    C2Running,
    C2Stopping,
    C2Stopped,
    /// Note that once in state C2Error, stop() must be called before we can start() again.
    C2Error,
    C2Release,
}
| |
/// Status codes returned by the wrapper's control methods.
///
/// This is not a very "Rust-y" way of doing error handling, but it will hopefully make the FFI
/// easier to write. Numerical values taken from
/// frameworks/av/media/codec2/core/include/C2.h
// TODO: Improve error handling by adding more statuses.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum C2Status {
    C2Ok = 0,
    /// EPERM
    C2BadState = 1,
    /// EINVAL
    C2BadValue = 22,
}
| |
| // J should be either C2DecodeJob or C2EncodeJob. |
| pub trait C2Worker<J> |
| where |
| J: Send + Job + 'static, |
| { |
| type Options: Clone + Send + 'static; |
| |
| fn new( |
| input_fourcc: Fourcc, |
| output_fourcc: Fourcc, |
| awaiting_job_event: Arc<EventFd>, |
| error_cb: Arc<Mutex<dyn FnMut(C2Status) + Send + 'static>>, |
| work_done_cb: Arc<Mutex<dyn FnMut(J) + Send + 'static>>, |
| work_queue: Arc<Mutex<VecDeque<J>>>, |
| state: Arc<(Mutex<C2State>, Condvar)>, |
| framepool_hint_cb: Arc< |
| Mutex< |
| dyn FnMut( |
| StreamInfo, |
| ) -> ( |
| /*client_linear_modifier: */ bool, |
| /*needs_bit_depth_subsampling: */ bool, |
| ) + Send |
| + 'static, |
| >, |
| >, |
| alloc_cb: Arc<Mutex<dyn FnMut() -> Option<<J as Job>::Frame> + Send + 'static>>, |
| options: Self::Options, |
| ) -> Result<Self, String> |
| where |
| Self: Sized; |
| |
| fn process_loop(&mut self); |
| } |
| |
| #[derive(Debug, Error)] |
| pub enum C2WrapperError { |
| #[error("failed to create EventFd for awaiting job event: {0}")] |
| AwaitingJobEventFd(Errno), |
| } |
| |
| // Note that we do not guarantee thread safety in C2CrosCodecsWrapper. |
| pub struct C2Wrapper<J, W> |
| where |
| J: Send + Default + Job + 'static, |
| W: C2Worker<J>, |
| { |
| awaiting_job_event: Arc<EventFd>, |
| work_queue: Arc<Mutex<VecDeque<J>>>, |
| state: Arc<(Mutex<C2State>, Condvar)>, |
| // This isn't actually optional, but we want to join this handle in drop(), but because drop() |
| // takes an &mut self, we can't actually take ownership of this variable. So we workaround this |
| // by just making it an optional and swapping it with None in drop(). |
| worker_thread: Option<JoinHandle<()>>, |
| // The instance of W actually lives in the thread creation closure, not |
| // this struct. We use "fn() -> W" for this type signature instead of just regular "W" as a |
| // workaround to make sure this PhantomData doesn't affect the Send and Sync properties of the |
| // overall C2Wrapper. |
| _phantom: PhantomData<fn() -> W>, |
| } |
| |
| impl<J, W> C2Wrapper<J, W> |
| where |
| J: Send + Default + Job + 'static, |
| W: C2Worker<J>, |
| { |
| pub fn new( |
| input_fourcc: Fourcc, |
| output_fourcc: Fourcc, |
| error_cb: impl FnMut(C2Status) + Send + 'static, |
| work_done_cb: impl FnMut(J) + Send + 'static, |
| framepool_hint_cb: impl FnMut( |
| StreamInfo, |
| ) -> ( |
| /*client_linear_modifier: */ bool, |
| /*needs_bit_depth_subsampling: */ bool, |
| ) + Send |
| + 'static, |
| alloc_cb: impl FnMut() -> Option<<J as Job>::Frame> + Send + 'static, |
| options: <W as C2Worker<J>>::Options, |
| ) -> Self { |
| let awaiting_job_event = Arc::new( |
| EventFd::from_flags(EfdFlags::EFD_SEMAPHORE) |
| .map_err(C2WrapperError::AwaitingJobEventFd) |
| .unwrap(), |
| ); |
| let awaiting_job_event_clone = awaiting_job_event.clone(); |
| let error_cb = Arc::new(Mutex::new(error_cb)); |
| let work_done_cb = Arc::new(Mutex::new(work_done_cb)); |
| let work_queue: Arc<Mutex<VecDeque<J>>> = Arc::new(Mutex::new(VecDeque::new())); |
| let work_queue_clone = work_queue.clone(); |
| let state = Arc::new((Mutex::new(C2State::C2Stopped), Condvar::new())); |
| let state_clone = state.clone(); |
| let framepool_hint_cb = Arc::new(Mutex::new(framepool_hint_cb)); |
| let alloc_cb = Arc::new(Mutex::new(alloc_cb)); |
| let worker_thread = Some(thread::spawn(move || { |
| let (state_lock, state_cvar) = &*state_clone; |
| let mut state = state_lock.lock().expect("Could not lock state"); |
| while *state != C2State::C2Release { |
| if *state == C2State::C2Running { |
| // Otherwise we will just hold the lock during the processing loop, which will |
| // cause a deadlock. |
| drop(state); |
| |
| let worker = W::new( |
| input_fourcc.clone(), |
| output_fourcc.clone(), |
| awaiting_job_event_clone.clone(), |
| error_cb.clone(), |
| work_done_cb.clone(), |
| work_queue_clone.clone(), |
| state_clone.clone(), |
| framepool_hint_cb.clone(), |
| alloc_cb.clone(), |
| options.clone(), |
| ); |
| match worker { |
| Ok(mut worker) => { |
| worker.process_loop(); |
| |
| // Note that we only lock the state again after the process loop exits. |
| state = state_lock.lock().expect("Could not lock state"); |
| *state = C2State::C2Stopped; |
| state_cvar.notify_one(); |
| } |
| Err(msg) => { |
| log::debug!("Error instantiating C2Worker {}", msg); |
| state = state_lock.lock().expect("Could not lock state"); |
| *state = C2State::C2Error; |
| state_cvar.notify_one(); |
| (*error_cb.lock().unwrap())(C2Status::C2BadValue); |
| } |
| }; |
| } else { |
| // This is needed to handle the circumstance in which we call reset() after an |
| // error. The state will be C2Error, not C2Running, so we can't rely on the |
| // above logic to process the stop request. |
| if *state == C2State::C2Stopping { |
| *state = C2State::C2Stopped; |
| state_cvar.notify_one(); |
| } |
| |
| // It's important that this wait() call goes here, after the check for |
| // C2Running. Otherwise the call to start() might be executed before we fully |
| // initialize this thread. Because notify_one() doesn't do any kind of |
| // buffering, we can miss our "wake-up call" and just wait indefinitely. |
| state = state_cvar.wait(state).unwrap(); |
| } |
| } |
| })); |
| |
| Self { |
| awaiting_job_event: awaiting_job_event, |
| work_queue, |
| state, |
| worker_thread, |
| _phantom: Default::default(), |
| } |
| } |
| |
| // Start the decoder/encoder. |
| // State will be C2Running after this call. |
| pub fn start(&mut self) -> C2Status { |
| let (state_lock, state_cvar) = &*self.state; |
| { |
| let mut state = state_lock.lock().expect("Could not lock state"); |
| if *state != C2State::C2Stopped { |
| return C2Status::C2BadState; |
| } |
| *state = C2State::C2Running; |
| state_cvar.notify_one(); |
| } |
| |
| C2Status::C2Ok |
| } |
| |
| // Helper method for stop() and reset() to re-use code: if `is_reset` is |
| // true, no state validation is performed (suitable for reset()), otherwise |
| // we validate that we're in the C2Running state (suitable for stop()). This |
| // is necessary to abide by the C2Component API. |
| fn stop_internal(&mut self, is_reset: bool) -> C2Status { |
| let (state_lock, state_cvar) = &*self.state; |
| { |
| let mut state = state_lock.lock().expect("Could not lock state"); |
| if !is_reset && *state != C2State::C2Running { |
| return C2Status::C2BadState; |
| } |
| *state = C2State::C2Stopping; |
| state_cvar.notify_one(); |
| } |
| |
| self.work_queue.lock().unwrap().drain(..); |
| |
| self.awaiting_job_event.write(1).unwrap(); |
| |
| let mut state = state_lock.lock().expect("Could not lock state"); |
| while *state == C2State::C2Stopping { |
| state = state_cvar.wait(state).unwrap(); |
| } |
| |
| C2Status::C2Ok |
| } |
| |
| // Stop the decoder/encoder and abandon in-flight work. |
| // Note that in event of error, stop() must be called before we can start() |
| // again. This is to ensure we clear out the work queue. |
| // State will be C2Stopped after this call. |
| pub fn stop(&mut self) -> C2Status { |
| self.stop_internal(/*is_reset=*/ false) |
| } |
| |
| // Reset the decoder/encoder and abandon in-flight work. |
| // For our purposes, this is equivalent to stop() except for the fact that |
| // this method doesn't fail if the state is already C2Stopped. |
| // State will be C2Stopped after this call. |
| pub fn reset(&mut self) -> C2Status { |
| self.stop_internal(/*is_reset=*/ true) |
| } |
| |
| // Add work to the work queue. |
| // State must be C2Running or this function is invalid. |
| // State will remain C2Running. |
| pub fn queue(&mut self, work_items: Vec<J>) -> C2Status { |
| if *self.state.0.lock().expect("Could not lock state") != C2State::C2Running { |
| return C2Status::C2BadState; |
| } |
| |
| self.work_queue.lock().unwrap().extend(work_items.into_iter()); |
| |
| self.awaiting_job_event.write(1).unwrap(); |
| |
| C2Status::C2Ok |
| } |
| |
| // Flush work from the queue and return it as |flushed_work|. |
| // State will not change after this call. |
| // TODO: Support different flush modes. |
| pub fn flush(&mut self, flushed_work: &mut Vec<J>) -> C2Status { |
| if *self.state.0.lock().expect("Could not lock state") != C2State::C2Running { |
| return C2Status::C2BadState; |
| } |
| |
| { |
| let mut work_queue = self.work_queue.lock().unwrap(); |
| let mut tmp = work_queue.drain(..).collect::<Vec<J>>(); |
| flushed_work.append(&mut tmp); |
| |
| // Note that we don't just call drain() because we want to guarantee atomicity with respect |
| // to the work_queue eviction. |
| let mut drain_job: J = Default::default(); |
| drain_job.set_drain(DrainMode::SyntheticDrain); |
| work_queue.push_back(drain_job); |
| } |
| |
| self.awaiting_job_event.write(1).unwrap(); |
| |
| C2Status::C2Ok |
| } |
| |
| // Signal to the decoder/encoder that it does not need to wait for |
| // additional work to begin processing. This is an unusual name for this |
| // function, but it is the convention that C2 uses. |
| // State must be C2Running or this function is invalid. |
| // State will remain C2Running after the drain is complete. |
| // |
| // TODO(b/389993558): The "state will remain C2Running after the drain is complete" behavior |
| // still needs to be implemented for the encoder. |
| // |
| // TODO: Support different drain modes. |
| pub fn drain(&mut self, _mode: DrainMode) -> C2Status { |
| if *self.state.0.lock().expect("Could not lock state") != C2State::C2Running { |
| return C2Status::C2BadState; |
| } |
| |
| let mut drain_job: J = Default::default(); |
| drain_job.set_drain(DrainMode::SyntheticDrain); |
| self.work_queue.lock().unwrap().push_back(drain_job); |
| |
| self.awaiting_job_event.write(1).unwrap(); |
| |
| C2Status::C2Ok |
| } |
| } |
| |
| // Instead of C2's release() function, we implement Drop and use RAII to |
| // accomplish the same thing |
| impl<J, W> Drop for C2Wrapper<J, W> |
| where |
| J: Send + Default + Job + 'static, |
| W: C2Worker<J>, |
| { |
| fn drop(&mut self) { |
| // Note: we call reset() instead of stop() so that it doesn't matter if |
| // we're already in the C2Stopped state. |
| self.reset(); |
| |
| let (state_lock, state_cvar) = &*self.state; |
| *state_lock.lock().expect("Could not lock state") = C2State::C2Release; |
| state_cvar.notify_one(); |
| self.worker_thread.take().unwrap().join(); |
| } |
| } |