| use crate::{ |
| device::{ |
| poller::{DeviceEvent, PollError, PollEvent, Poller, Waker}, |
| queue::{ |
| self, |
| direction::{Capture, Output}, |
| dqbuf::DQBuffer, |
| handles_provider::HandlesProvider, |
| qbuf::{ |
| get_free::{GetFreeBufferError, GetFreeCaptureBuffer, GetFreeOutputBuffer}, |
| get_indexed::GetCaptureBufferByIndex, |
| CaptureQueueable, OutputQueueableProvider, |
| }, |
| BuffersAllocated, CreateQueueError, FormatBuilder, Queue, QueueInit, |
| RequestBuffersError, |
| }, |
| AllocatedQueue, Device, DeviceConfig, DeviceOpenError, Stream, TryDequeue, |
| }, |
| ioctl::{ |
| self, subscribe_event, BufferCapabilities, Fmt, FormatFlags, SelectionTarget, StreamOnError, |
| }, |
| memory::{BufferHandles, PrimitiveBufferHandles}, |
| FormatConversionError, |
| }; |
| |
| use log::{debug, error, info, trace, warn}; |
| use std::{ |
| io, |
| path::Path, |
| sync::{atomic::AtomicUsize, mpsc, Arc}, |
| thread::JoinHandle, |
| }; |
| use thiserror::Error; |
| |
| use super::*; |
| |
| // Trait implemented by all states of the decoder. |
| pub trait DecoderState {} |
| |
| pub struct Decoder<S: DecoderState> { |
| device: Arc<Device>, |
| state: S, |
| } |
| |
| pub struct AwaitingOutputFormat { |
| output_queue: Queue<Output, QueueInit>, |
| capture_queue: Queue<Capture, QueueInit>, |
| } |
| impl DecoderState for AwaitingOutputFormat {} |
| |
| #[derive(Debug, Error)] |
| pub enum DecoderOpenError { |
| #[error("Error while opening device")] |
| DeviceOpenError(#[from] DeviceOpenError), |
| #[error("Error while creating queue")] |
| CreateQueueError(#[from] CreateQueueError), |
| #[error("Specified device is not a stateful decoder")] |
| NotAStatefulDecoder, |
| } |
| |
| impl Decoder<AwaitingOutputFormat> { |
| pub fn open(path: &Path) -> Result<Self, DecoderOpenError> { |
| let config = DeviceConfig::new().non_blocking_dqbuf(); |
| let device = Arc::new(Device::open(path, config)?); |
| |
| // Check that the device is indeed a stateful decoder. |
| let capture_queue = Queue::get_capture_mplane_queue(device.clone())?; |
| let output_queue = Queue::get_output_mplane_queue(device.clone())?; |
| |
| // On a decoder, the OUTPUT formats are compressed, but the CAPTURE ones are not. |
| // Return an error if our device does not satisfy these conditions. |
| output_queue |
| .format_iter() |
| .find(|fmt| fmt.flags.contains(FormatFlags::COMPRESSED)) |
| .and( |
| capture_queue |
| .format_iter() |
| .find(|fmt| !fmt.flags.contains(FormatFlags::COMPRESSED)), |
| ) |
| .ok_or(DecoderOpenError::NotAStatefulDecoder) |
| .map(|_| ())?; |
| |
| // A stateful decoder won't expose the requests capability on the OUTPUT |
| // queue, a stateless one will. |
| if output_queue |
| .get_capabilities() |
| .contains(BufferCapabilities::SUPPORTS_REQUESTS) |
| { |
| return Err(DecoderOpenError::NotAStatefulDecoder); |
| } |
| |
| Ok(Decoder { |
| device, |
| state: AwaitingOutputFormat { |
| output_queue, |
| capture_queue, |
| }, |
| }) |
| } |
| |
| pub fn set_output_format<F>(mut self, f: F) -> anyhow::Result<Decoder<AwaitingOutputBuffers>> |
| where |
| F: FnOnce(FormatBuilder) -> anyhow::Result<()>, |
| { |
| let builder = self.state.output_queue.change_format()?; |
| f(builder)?; |
| |
| Ok(Decoder { |
| device: self.device, |
| state: AwaitingOutputBuffers { |
| output_queue: self.state.output_queue, |
| capture_queue: self.state.capture_queue, |
| }, |
| }) |
| } |
| } |
| |
| pub struct AwaitingOutputBuffers { |
| output_queue: Queue<Output, QueueInit>, |
| capture_queue: Queue<Capture, QueueInit>, |
| } |
| impl DecoderState for AwaitingOutputBuffers {} |
| |
| impl Decoder<AwaitingOutputBuffers> { |
| pub fn allocate_output_buffers_generic<OP: BufferHandles>( |
| self, |
| memory_type: OP::SupportedMemoryType, |
| num_buffers: usize, |
| ) -> Result<Decoder<OutputBuffersAllocated<OP>>, RequestBuffersError> { |
| Ok(Decoder { |
| device: self.device, |
| state: OutputBuffersAllocated { |
| output_queue: self |
| .state |
| .output_queue |
| .request_buffers_generic::<OP>(memory_type, num_buffers as u32)?, |
| capture_queue: self.state.capture_queue, |
| poll_wakeups_counter: None, |
| }, |
| }) |
| } |
| |
| pub fn allocate_output_buffers<OP: PrimitiveBufferHandles>( |
| self, |
| num_output: usize, |
| ) -> Result<Decoder<OutputBuffersAllocated<OP>>, RequestBuffersError> { |
| self.allocate_output_buffers_generic(OP::MEMORY_TYPE, num_output) |
| } |
| } |
| |
| pub struct OutputBuffersAllocated<OP: BufferHandles> { |
| output_queue: Queue<Output, BuffersAllocated<OP>>, |
| capture_queue: Queue<Capture, QueueInit>, |
| poll_wakeups_counter: Option<Arc<AtomicUsize>>, |
| } |
| impl<OP: BufferHandles> DecoderState for OutputBuffersAllocated<OP> {} |
| |
| #[derive(Debug, Error)] |
| pub enum StartDecoderError { |
| #[error("IO error")] |
| IoError(#[from] io::Error), |
| #[error("Cannot subscribe to decoder event")] |
| SubscribeEventError(#[from] ioctl::SubscribeEventError), |
| #[error("Error while starting the output queue")] |
| StreamOnError(#[from] StreamOnError), |
| } |
| |
| impl<OP: BufferHandles> Decoder<OutputBuffersAllocated<OP>> { |
| pub fn set_poll_counter(mut self, poll_wakeups_counter: Arc<AtomicUsize>) -> Self { |
| self.state.poll_wakeups_counter = Some(poll_wakeups_counter); |
| self |
| } |
| |
| #[allow(clippy::type_complexity)] |
| pub fn start<P, InputDoneCb, FrameDecodedCb, FormatChangedCb>( |
| self, |
| input_done_cb: InputDoneCb, |
| output_ready_cb: FrameDecodedCb, |
| set_capture_format_cb: FormatChangedCb, |
| ) -> Result< |
| Decoder<Decoding<OP, P, InputDoneCb, FrameDecodedCb, FormatChangedCb>>, |
| StartDecoderError, |
| > |
| where |
| P: HandlesProvider, |
| InputDoneCb: InputDoneCallback<OP>, |
| FrameDecodedCb: FrameDecodedCallback<P>, |
| FormatChangedCb: FormatChangedCallback<P>, |
| for<'a> Queue<Capture, BuffersAllocated<P::HandleType>>: |
| GetFreeCaptureBuffer<'a, P::HandleType> + GetCaptureBufferByIndex<'a, P::HandleType>, |
| { |
| // We are interested in all resolution change events. |
| subscribe_event( |
| &*self.device, |
| ioctl::EventType::SourceChange, |
| ioctl::SubscribeEventFlags::empty(), |
| )?; |
| |
| let mut output_poller = Poller::new(Arc::clone(&self.device))?; |
| output_poller.enable_event(DeviceEvent::OutputReady)?; |
| |
| let (command_sender, command_receiver) = mpsc::channel::<DecoderCommand>(); |
| let (response_sender, response_receiver) = mpsc::channel::<CaptureThreadResponse>(); |
| |
| let mut decoder_thread = DecoderThread::new( |
| &self.device, |
| self.state.capture_queue, |
| output_ready_cb, |
| set_capture_format_cb, |
| command_receiver, |
| response_sender, |
| )?; |
| |
| let command_waker = Arc::clone(&decoder_thread.command_waker); |
| |
| if let Some(counter) = &self.state.poll_wakeups_counter { |
| output_poller.set_poll_counter(Arc::clone(counter)); |
| decoder_thread.set_poll_counter(Arc::clone(counter)); |
| } |
| |
| let handle = std::thread::Builder::new() |
| .name("V4L2 Decoder".into()) |
| .spawn(move || decoder_thread.run())?; |
| |
| self.state.output_queue.stream_on()?; |
| |
| Ok(Decoder { |
| device: self.device, |
| state: Decoding { |
| output_queue: self.state.output_queue, |
| input_done_cb, |
| output_poller, |
| command_waker, |
| command_sender, |
| response_receiver, |
| handle, |
| }, |
| }) |
| } |
| } |
| |
| #[derive(Debug)] |
| enum DecoderCommand { |
| Drain(bool), |
| Flush, |
| Stop, |
| } |
| |
| #[derive(Debug)] |
| enum CaptureThreadResponse { |
| DrainDone(Result<bool, DrainError>), |
| FlushDone(anyhow::Result<()>), |
| } |
| |
| pub struct Decoding<OP, P, InputDoneCb, FrameDecodedCb, FormatChangedCb> |
| where |
| OP: BufferHandles, |
| P: HandlesProvider, |
| InputDoneCb: InputDoneCallback<OP>, |
| FrameDecodedCb: FrameDecodedCallback<P>, |
| FormatChangedCb: FormatChangedCallback<P>, |
| { |
| output_queue: Queue<Output, BuffersAllocated<OP>>, |
| input_done_cb: InputDoneCb, |
| output_poller: Poller, |
| |
| command_waker: Arc<Waker>, |
| command_sender: mpsc::Sender<DecoderCommand>, |
| response_receiver: mpsc::Receiver<CaptureThreadResponse>, |
| |
| handle: JoinHandle<DecoderThread<P, FrameDecodedCb, FormatChangedCb>>, |
| } |
| impl<OP, P, InputDoneCb, FrameDecodedCb, FormatChangedCb> DecoderState |
| for Decoding<OP, P, InputDoneCb, FrameDecodedCb, FormatChangedCb> |
| where |
| OP: BufferHandles, |
| P: HandlesProvider, |
| InputDoneCb: InputDoneCallback<OP>, |
| FrameDecodedCb: FrameDecodedCallback<P>, |
| FormatChangedCb: FormatChangedCallback<P>, |
| { |
| } |
| |
| #[derive(Debug, Error)] |
| pub enum SendCommandError { |
| #[error("Error while queueing the message")] |
| SendError, |
| #[error("Error while poking the command waker")] |
| WakerError(#[from] io::Error), |
| } |
| |
| #[derive(Debug, Error)] |
| pub enum StopError { |
| #[error("Error while sending the stop command to the capture thread")] |
| SendCommand(#[from] SendCommandError), |
| #[error("Error while waiting for the decoder thread to finish")] |
| Join, |
| #[error("Error while stopping the OUTPUT queue")] |
| Streamoff(#[from] ioctl::StreamOffError), |
| } |
| |
| #[derive(Debug, Error)] |
| pub enum DrainError { |
| #[error("Cannot drain now: output format not yet determined")] |
| TryAgain, |
| #[error("Error while sending the flush command to the capture thread")] |
| SendCommand(#[from] SendCommandError), |
| #[error("Error while waiting for the decoder thread to drain")] |
| RecvError(#[from] mpsc::RecvError), |
| #[error("Error while draining on the capture thread")] |
| CaptureThreadError(anyhow::Error), |
| } |
| |
| #[derive(Debug, Error)] |
| pub enum FlushError { |
| #[error("Error while stopping the OUTPUT queue")] |
| StreamoffError(#[from] ioctl::StreamOffError), |
| #[error("Error while sending the flush command to the capture thread")] |
| SendCommand(#[from] SendCommandError), |
| #[error("Error while waiting for the decoder thread to flush")] |
| RecvError(#[from] mpsc::RecvError), |
| #[error("Error while flushing on the capture thread")] |
| CaptureThreadError(anyhow::Error), |
| #[error("Error while starting the OUTPUT queue")] |
| StreamonError(#[from] ioctl::StreamOnError), |
| } |
| |
| #[allow(type_alias_bounds)] |
| type DequeueOutputBufferError<OP: BufferHandles> = ioctl::DQBufError<DQBuffer<Output, OP>>; |
| #[allow(type_alias_bounds)] |
| type CanceledBuffers<OP: BufferHandles> = |
| Vec<<Queue<Output, BuffersAllocated<OP>> as Stream>::Canceled>; |
| |
| impl<OP, P, InputDoneCb, FrameDecodedCb, FormatChangedCb> |
| Decoder<Decoding<OP, P, InputDoneCb, FrameDecodedCb, FormatChangedCb>> |
| where |
| OP: BufferHandles, |
| P: HandlesProvider, |
| InputDoneCb: InputDoneCallback<OP>, |
| FrameDecodedCb: FrameDecodedCallback<P>, |
| FormatChangedCb: FormatChangedCallback<P>, |
| { |
| pub fn num_output_buffers(&self) -> usize { |
| self.state.output_queue.num_buffers() |
| } |
| |
| /// Send a command to the capture thread. |
| fn send_command(&self, command: DecoderCommand) -> Result<(), SendCommandError> { |
| trace!("Sending command: {:?}", command); |
| |
| self.state |
| .command_sender |
| .send(command) |
| .map_err(|_| SendCommandError::SendError)?; |
| self.state.command_waker.wake()?; |
| |
| Ok(()) |
| } |
| |
| pub fn get_output_format<E: Into<FormatConversionError>, T: Fmt<E>>( |
| &self, |
| ) -> Result<T, ioctl::GFmtError> { |
| self.state.output_queue.get_format() |
| } |
| |
| /// Stop the decoder. |
| /// |
| /// This will stop any pending operation consume the decoder, which cannot |
| /// be used anymore. To make sure all submitted encoded buffers have been |
| /// processed, call the [`drain`] method and wait for the output buffer with |
| /// the LAST flag before calling this method. |
| /// |
| /// TODO potential bug: the LAST buffer could also be the one signaling a |
| /// DRC. We need another way to manage this? Probably a good idea to split |
| /// into two properties of DQBuf. |
| pub fn stop(self) -> Result<CanceledBuffers<OP>, StopError> { |
| debug!("Stop requested"); |
| self.send_command(DecoderCommand::Stop)?; |
| |
| match self.state.handle.join() { |
| Ok(_) => (), |
| Err(_) => return Err(StopError::Join), |
| } |
| |
| Ok(self.state.output_queue.stream_off()?) |
| } |
| |
| /// Drain the decoder, i.e. make sure all its pending work is processed. |
| /// |
| /// The `blocking` parameters decides whether this method is permitted to |
| /// block: if `true`, then all the frames corresponding to the encoded |
| /// buffers queued so far will have been emitted when this function returns. |
| /// In this case the method will always return `true` to signal that drain |
| /// has been completed. |
| /// |
| /// If `false`, then the method may also return `false` to signal that drain |
| /// has not completed yet. When this is the case, the client must look for |
| /// a decoded frame with the LAST flag set, as this flag signals that this |
| /// frame is the last one before the drain has completed. |
| /// |
| /// Note that requesting a blocking drain can be hazardous if the current |
| /// thread is responsible for e.g. submitting handles for decoded frames. It |
| /// is easy to put the decoding pipeline in a deadlock situation. |
| /// |
| /// The client can keep submitting buffers with encoded data as the drain is |
| /// ongoing. They will be processed in order and their frames will come |
| /// after the ones still in the pipeline. For a way to cancel all the |
| /// pending jobs, see the [`flush`] method. |
| pub fn drain(&self, blocking: bool) -> Result<bool, DrainError> { |
| debug!("Drain requested"); |
| self.send_command(DecoderCommand::Drain(blocking))?; |
| |
| match self.state.response_receiver.recv()? { |
| CaptureThreadResponse::DrainDone(response) => match response { |
| Ok(completed) => Ok(completed), |
| Err(e) => { |
| error!("Error while draining on the capture thread: {}", e); |
| Err(e) |
| } |
| }, |
| r => { |
| error!( |
| "Unexpected capture thread response received while draining: {:?}", |
| r |
| ); |
| Err(DrainError::CaptureThreadError(anyhow::anyhow!( |
| "Unexpected response while draining" |
| ))) |
| } |
| } |
| } |
| |
| /// Flush the decoder, i.e. try to cancel all pending work. |
| /// |
| /// The canceled input buffers will be returned as |
| /// `CompletedInputBuffer::Canceled` through the input done callback. |
| /// |
| /// If a [`drain`] operation was in progress, it is also canceled. |
| /// |
| /// This function is blocking. When is returns, the frame decoded callback |
| /// has been called for all pre-flush frames, and the decoder can accept new |
| /// content to decode. |
| pub fn flush(&self) -> Result<(), FlushError> { |
| debug!("Flush requested"); |
| let canceled_buffers = self.state.output_queue.stream_off()?; |
| |
| // Request the decoder to flush itself. |
| self.send_command(DecoderCommand::Flush)?; |
| |
| // Process our canceled input buffers in the meantime. |
| for buffer in canceled_buffers { |
| (self.state.input_done_cb)(CompletedInputBuffer::Canceled(buffer)); |
| } |
| |
| // Wait for the decoder thread to signal it is done with our request. |
| // TODO add timeout? |
| match self.state.response_receiver.recv()? { |
| CaptureThreadResponse::FlushDone(response) => match response { |
| Ok(()) => (), |
| Err(e) => { |
| error!("Error while flushing on the capture thread: {}", e); |
| return Err(FlushError::CaptureThreadError(e)); |
| } |
| }, |
| r => { |
| error!( |
| "Unexpected capture thread response received while flushing: {:?}", |
| r |
| ); |
| return Err(FlushError::CaptureThreadError(anyhow::anyhow!( |
| "Unexpected response while flushing" |
| ))); |
| } |
| } |
| |
| // Resume business. |
| self.state.output_queue.stream_on()?; |
| |
| debug!("Flush complete"); |
| Ok(()) |
| } |
| |
| /// Attempts to dequeue and release output buffers that the driver is done with. |
| fn dequeue_output_buffers(&self) -> Result<(), DequeueOutputBufferError<OP>> { |
| let output_queue = &self.state.output_queue; |
| |
| while output_queue.num_queued_buffers() > 0 { |
| match output_queue.try_dequeue() { |
| Ok(buf) => { |
| (self.state.input_done_cb)(CompletedInputBuffer::Dequeued(buf)); |
| } |
| Err(ioctl::DQBufError::NotReady) => break, |
| // TODO buffers with the error flag set should not result in |
| // a fatal error! |
| Err(e) => return Err(e), |
| } |
| } |
| |
| Ok(()) |
| } |
| |
| // Make this thread sleep until at least one OUTPUT buffer is ready to be |
| // obtained through [`Decoder::try_get_buffer()`]. |
| fn wait_for_output_buffer(&mut self) -> Result<(), GetBufferError<OP>> { |
| for event in self.state.output_poller.poll(None)? { |
| match event { |
| PollEvent::Device(DeviceEvent::OutputReady) => { |
| self.dequeue_output_buffers()?; |
| } |
| _ => panic!("Unexpected return from OUTPUT queue poll!"), |
| } |
| } |
| |
| Ok(()) |
| } |
| } |
| |
| impl<'a, OP, P, InputDoneCb, FrameDecodedCb, FormatChangedCb> OutputQueueableProvider<'a, OP> |
| for Decoder<Decoding<OP, P, InputDoneCb, FrameDecodedCb, FormatChangedCb>> |
| where |
| Queue<Output, BuffersAllocated<OP>>: OutputQueueableProvider<'a, OP>, |
| OP: BufferHandles, |
| P: HandlesProvider, |
| InputDoneCb: InputDoneCallback<OP>, |
| FrameDecodedCb: FrameDecodedCallback<P>, |
| FormatChangedCb: FormatChangedCallback<P>, |
| { |
| type Queueable = |
| <Queue<Output, BuffersAllocated<OP>> as OutputQueueableProvider<'a, OP>>::Queueable; |
| } |
| |
| #[derive(Debug, Error)] |
| pub enum GetBufferError<OP: BufferHandles> { |
| #[error("Error while dequeueing buffer")] |
| DequeueError(#[from] DequeueOutputBufferError<OP>), |
| #[error("Error during poll")] |
| PollError(#[from] PollError), |
| #[error("Error while obtaining buffer")] |
| GetFreeBufferError(#[from] GetFreeBufferError), |
| } |
| |
| /// Let the decoder provide the buffers from the OUTPUT queue. |
| impl<'a, OP, P, InputDoneCb, FrameDecodedCb, FormatChangedCb> |
| GetFreeOutputBuffer<'a, OP, GetBufferError<OP>> |
| for Decoder<Decoding<OP, P, InputDoneCb, FrameDecodedCb, FormatChangedCb>> |
| where |
| Queue<Output, BuffersAllocated<OP>>: GetFreeOutputBuffer<'a, OP>, |
| OP: BufferHandles, |
| P: HandlesProvider, |
| InputDoneCb: InputDoneCallback<OP>, |
| FrameDecodedCb: FrameDecodedCallback<P>, |
| FormatChangedCb: FormatChangedCallback<P>, |
| { |
| /// Returns a V4L2 buffer to be filled with a frame to decode if one |
| /// is available. |
| /// |
| /// This method will return None immediately if all the allocated buffers |
| /// are currently queued. |
| fn try_get_free_buffer(&'a self) -> Result<Self::Queueable, GetBufferError<OP>> { |
| self.dequeue_output_buffers()?; |
| Ok(self.state.output_queue.try_get_free_buffer()?) |
| } |
| } |
| |
| // If [`GetFreeBuffer`] is implemented, we can also provide a blocking `get_buffer` |
| // method. |
| impl<'a, OP, P, InputDoneCb, FrameDecodedCb, FormatChangedCb> |
| Decoder<Decoding<OP, P, InputDoneCb, FrameDecodedCb, FormatChangedCb>> |
| where |
| Self: GetFreeOutputBuffer<'a, OP, GetBufferError<OP>>, |
| OP: BufferHandles, |
| P: HandlesProvider, |
| InputDoneCb: InputDoneCallback<OP>, |
| FrameDecodedCb: FrameDecodedCallback<P>, |
| FormatChangedCb: FormatChangedCallback<P>, |
| { |
| /// Returns a V4L2 buffer to be filled with a frame to encode, waiting for |
| /// one to be available if needed. |
| /// |
| /// Contrary to [`Decoder::try_get_free_buffer()`], this method will wait for a buffer |
| /// to be available if needed. |
| pub fn get_buffer( |
| &'a mut self, |
| ) -> Result<<Self as OutputQueueableProvider<'a, OP>>::Queueable, GetBufferError<OP>> { |
| let output_queue = &self.state.output_queue; |
| |
| // If all our buffers are queued, wait until we can dequeue some. |
| if output_queue.num_queued_buffers() == output_queue.num_buffers() { |
| self.wait_for_output_buffer()?; |
| } |
| |
| self.try_get_free_buffer() |
| } |
| |
| /// Kick the decoder and see if some input buffers fall as a result. |
| /// |
| /// No, really. Completed input buffers are typically checked when calling |
| /// [`Decoder::get_buffer`] (which is also the time when the input done callback is |
| /// invoked), but this mechanism is not foolproof: if the client works with |
| /// a limited set of input buffers and queues them all before an output |
| /// frame can be produced, then the client has no more buffers to fill and |
| /// thus no reason to call [`Decoder::get_buffer`], resulting in the decoding process |
| /// being blocked. |
| /// |
| /// This method mitigates this problem by adding a way to check for |
| /// completed input buffers and calling the input done callback without the |
| /// need for new encoded content. It is suggested to call it from the thread |
| /// that owns the decoder every time a decoded frame is produced. |
| /// That way the client can recycle its input buffers |
| /// and the decoding process does not get stuck. |
| pub fn kick(&mut self) -> Result<(), DequeueOutputBufferError<OP>> { |
| info!("Kick!"); |
| self.dequeue_output_buffers() |
| } |
| } |
| |
| // TODO use ::new functions that take the queue and configure the state properly, with |
| // the poller, wakers, and all. |
| enum CaptureQueue<P: HandlesProvider> { |
| AwaitingResolution { |
| capture_queue: Queue<Capture, QueueInit>, |
| }, |
| Decoding { |
| capture_queue: Queue<Capture, BuffersAllocated<P::HandleType>>, |
| provider: P, |
| cap_buffer_waker: Arc<Waker>, |
| // TODO not super elegant... |
| blocking_drain_in_progress: bool, |
| }, |
| } |
| |
| struct DecoderThread<P, FrameDecodedCb, FormatChangedCb> |
| where |
| P: HandlesProvider, |
| FrameDecodedCb: FrameDecodedCallback<P>, |
| FormatChangedCb: FormatChangedCallback<P>, |
| { |
| device: Arc<Device>, |
| capture_queue: CaptureQueue<P>, |
| poller: Poller, |
| |
| // Switched when we need the capture thread to quit and return the decoder |
| // to its initial state. |
| stop_flag: bool, |
| |
| output_ready_cb: FrameDecodedCb, |
| set_capture_format_cb: FormatChangedCb, |
| |
| // Waker signaled when the main thread has commands pending for us. |
| command_waker: Arc<Waker>, |
| // Receiver we read commands from when `command_waker` is signaled. |
| command_receiver: mpsc::Receiver<DecoderCommand>, |
| // Sender we use to send status messages after receiving commands from the |
| // main thread. |
| response_sender: mpsc::Sender<CaptureThreadResponse>, |
| } |
| |
| #[derive(Debug, Error)] |
| enum UpdateCaptureError { |
| #[error("Error while enabling poller events: {0}")] |
| PollerEvents(io::Error), |
| #[error("Error while removing CAPTURE waker: {0}")] |
| RemoveWaker(io::Error), |
| #[error("Error while stopping CAPTURE queue: {0}")] |
| Streamoff(#[from] ioctl::StreamOffError), |
| #[error("Error while freeing CAPTURE buffers: {0}")] |
| FreeBuffers(#[from] ioctl::ReqbufsError), |
| #[error("Error while obtaining CAPTURE format: {0}")] |
| GFmt(#[from] ioctl::GFmtError), |
| #[error("Error while obtaining selection target from CAPTURE queue: {0}")] |
| GSelection(#[from] ioctl::GSelectionError), |
| #[error("Error while running the CAPTURE format callback: {0}")] |
| Callback(#[from] anyhow::Error), |
| #[error("Error while requesting CAPTURE buffers: {0}")] |
| RequestBuffers(#[from] queue::RequestBuffersError), |
| #[error("Error while adding the CAPTURE buffer waker: {0}")] |
| AddWaker(io::Error), |
| #[error("Error while signaling the CAPTURE buffer waker: {0}")] |
| WakeWaker(io::Error), |
| #[error("Error while streaming CAPTURE queue: {0}")] |
| StreamOn(#[from] ioctl::StreamOnError), |
| } |
| |
| const CAPTURE_READY: u32 = 1; |
| const COMMAND_WAITING: u32 = 2; |
| |
| #[derive(Debug, Error)] |
| enum ProcessEventsError { |
| #[error("Error while dequeueing event")] |
| DQEvent(#[from] ioctl::DQEventError), |
| #[error("Error while requesting buffers")] |
| RequestBuffers(#[from] queue::RequestBuffersError), |
| #[error("Error while updating CAPTURE format")] |
| UpdateCapture(#[from] UpdateCaptureError), |
| } |
| |
| impl<P, FrameDecodedCb, FormatChangedCb> DecoderThread<P, FrameDecodedCb, FormatChangedCb> |
| where |
| P: HandlesProvider, |
| FrameDecodedCb: FrameDecodedCallback<P>, |
| FormatChangedCb: FormatChangedCallback<P>, |
| for<'a> Queue<Capture, BuffersAllocated<P::HandleType>>: |
| GetFreeCaptureBuffer<'a, P::HandleType> + GetCaptureBufferByIndex<'a, P::HandleType>, |
| { |
| fn new( |
| device: &Arc<Device>, |
| capture_queue: Queue<Capture, QueueInit>, |
| output_ready_cb: FrameDecodedCb, |
| set_capture_format_cb: FormatChangedCb, |
| command_receiver: mpsc::Receiver<DecoderCommand>, |
| response_sender: mpsc::Sender<CaptureThreadResponse>, |
| ) -> io::Result<Self> { |
| // Start by only listening to V4L2 events in order to catch the initial |
| // resolution change, and to the stop waker in case the user had a |
| // change of heart about decoding something now. |
| let mut poller = Poller::new(Arc::clone(device))?; |
| poller.enable_event(DeviceEvent::V4L2Event)?; |
| let command_waker = poller.add_waker(COMMAND_WAITING)?; |
| |
| let decoder_thread = DecoderThread { |
| device: Arc::clone(&device), |
| capture_queue: CaptureQueue::AwaitingResolution { capture_queue }, |
| poller, |
| stop_flag: false, |
| output_ready_cb, |
| set_capture_format_cb, |
| command_waker, |
| command_receiver, |
| response_sender, |
| }; |
| |
| Ok(decoder_thread) |
| } |
| |
| fn set_poll_counter(&mut self, poll_wakeups_counter: Arc<AtomicUsize>) { |
| self.poller.set_poll_counter(poll_wakeups_counter); |
| } |
| |
| fn send_response(&self, response: CaptureThreadResponse) { |
| trace!("Sending response: {:?}", response); |
| |
| self.response_sender.send(response).unwrap(); |
| } |
| |
| fn stop(&mut self) { |
| trace!("Processing stop command"); |
| self.stop_flag = true; |
| } |
| |
| fn drain(&mut self, blocking: bool) { |
| trace!("Processing Drain({}) command", blocking); |
| let response = match &mut self.capture_queue { |
| // We cannot initiate the flush sequence before receiving the initial |
| // resolution. |
| CaptureQueue::AwaitingResolution { .. } => { |
| Some(CaptureThreadResponse::DrainDone(Err(DrainError::TryAgain))) |
| } |
| CaptureQueue::Decoding { |
| blocking_drain_in_progress, |
| .. |
| } => { |
| // We can receive the LAST buffer, send the STOP command |
| // and exit the loop once the buffer with the LAST tag is received. |
| ioctl::decoder_cmd(&*self.device, ioctl::DecoderCommand::Stop).unwrap(); |
| if blocking { |
| // If we are blocking, we will send the answer when the drain |
| // is completed. |
| *blocking_drain_in_progress = true; |
| None |
| } else { |
| // If not blocking, send the response now so the client can keep going. |
| Some(CaptureThreadResponse::DrainDone(Ok(false))) |
| } |
| } |
| }; |
| |
| if let Some(response) = response { |
| self.send_response(response); |
| } |
| } |
| |
| fn end_of_drain(&mut self) { |
| match &mut self.capture_queue { |
| CaptureQueue::AwaitingResolution { .. } => (), |
| CaptureQueue::Decoding { |
| capture_queue, |
| blocking_drain_in_progress, |
| .. |
| } => { |
| // We are supposed to be able to run the START command |
| // instead, but with vicodec the CAPTURE queue reports |
| // as ready in subsequent polls() and DQBUF returns |
| // -EPIPE... |
| capture_queue.stream_off().unwrap(); |
| capture_queue.stream_on().unwrap(); |
| if *blocking_drain_in_progress { |
| *blocking_drain_in_progress = false; |
| self.send_response(CaptureThreadResponse::DrainDone(Ok(true))); |
| } |
| } |
| } |
| } |
| |
| fn enqueue_capture_buffers(&mut self) { |
| trace!("Queueing available CAPTURE buffers"); |
| match &mut self.capture_queue { |
| // Capture queue is not set up yet, no buffers to queue. |
| CaptureQueue::AwaitingResolution { .. } => (), |
| CaptureQueue::Decoding { |
| capture_queue, |
| provider, |
| cap_buffer_waker, |
| .. |
| } => { |
| // Requeue all available CAPTURE buffers. |
| 'enqueue: while let Some(handles) = provider.get_handles(&cap_buffer_waker) { |
| // TODO potential problem: the handles will be dropped if no V4L2 buffer |
| // is available. There is no guarantee that the provider will get them back |
| // in this case (e.g. with the C FFI). |
| let buffer = match provider.get_suitable_buffer_for(&handles, capture_queue) { |
| Ok(buffer) => buffer, |
| Err(e) => { |
| error!("Could not find suitable buffer for handles: {}", e); |
| warn!("Handles potentially lost due to no V4L2 buffer being available"); |
| break 'enqueue; |
| } |
| }; |
| match buffer.queue_with_handles(handles) { |
| Ok(()) => (), |
| Err(e) => error!("Error while queueing CAPTURE buffer: {}", e), |
| } |
| } |
| } |
| } |
| } |
| |
| fn process_v4l2_event(mut self) -> Self { |
| trace!("Processing V4L2 event"); |
| match self.capture_queue { |
| CaptureQueue::AwaitingResolution { .. } => { |
| if self.is_drc_event_pending().unwrap() { |
| self = self.update_capture_format().unwrap() |
| } |
| } |
| CaptureQueue::Decoding { .. } => { |
| // TODO normally we shouldn't listen to events while in Decoding |
| // state. We do this as a workaround for virtio-video. Remove |
| // this block once the issue is fixed in that driver and replace |
| // it with unreachable!(). |
| if self.is_drc_event_pending().unwrap() { |
| warn!("Detected DRC during decode."); |
| for _ in 0..16 { |
| self.process_capture_buffer(); |
| } |
| self = self.update_capture_format().unwrap() |
| } |
| } |
| } |
| |
| self |
| } |
| |
| fn update_capture_format(mut self) -> Result<Self, UpdateCaptureError> { |
| debug!("Updating CAPTURE format"); |
| // First reset the capture queue to the `Init` state if needed. |
| let mut capture_queue = match self.capture_queue { |
| // Initial resolution |
| CaptureQueue::AwaitingResolution { capture_queue } => { |
| // Keep listening to DRC event as workaround for virtio-video. |
| /* |
| // Stop listening to V4L2 events. We will check them when we get |
| // a buffer with the LAST flag. |
| self.poller |
| .disable_event(DeviceEvent::V4L2Event) |
| .map_err(UpdateCaptureError::PollerEvents)?; |
| */ |
| // Listen to CAPTURE buffers being ready to dequeue, as we will |
| // be streaming soon. |
| self.poller |
| .enable_event(DeviceEvent::CaptureReady) |
| .map_err(UpdateCaptureError::PollerEvents)?; |
| capture_queue |
| } |
| // Dynamic resolution change |
| CaptureQueue::Decoding { capture_queue, .. } => { |
| // Remove the waker for the previous buffers pool, as we will |
| // get a new set of buffers. |
| self.poller |
| .remove_waker(CAPTURE_READY) |
| .map_err(UpdateCaptureError::RemoveWaker)?; |
| // Deallocate the queue and return it to the `Init` state. Good |
| // as new! |
| capture_queue.stream_off()?; |
| capture_queue.free_buffers()?.queue |
| } |
| }; |
| |
| // Now get the parameters of the new format and build our new CAPTURE |
| // queue. |
| |
| // TODO use the proper control to get the right value. |
| let min_num_buffers = 4usize; |
| debug!("Stream requires {} capture buffers", min_num_buffers); |
| |
| let visible_rect = capture_queue.get_selection(SelectionTarget::Compose)?; |
| debug!( |
| "Visible rectangle: ({}, {}), {}x{}", |
| visible_rect.left, visible_rect.top, visible_rect.width, visible_rect.height |
| ); |
| |
| // Let the client adjust the new format and give us the handles provider. |
| let FormatChangedReply { |
| provider, |
| mem_type, |
| num_buffers, |
| } = (self.set_capture_format_cb)( |
| capture_queue.change_format()?, |
| visible_rect, |
| min_num_buffers, |
| )?; |
| |
| debug!("Client requires {} capture buffers", num_buffers); |
| |
| // Allocate the new CAPTURE buffers and get ourselves a new waker for |
| // returning buffers. |
| let capture_queue = |
| capture_queue.request_buffers_generic::<P::HandleType>(mem_type, num_buffers as u32)?; |
| let cap_buffer_waker = self |
| .poller |
| .add_waker(CAPTURE_READY) |
| .map_err(UpdateCaptureError::AddWaker)?; |
| |
| // Ready to decode - signal the waker so we immediately enqueue buffers |
| // and start streaming. |
| cap_buffer_waker |
| .wake() |
| .map_err(UpdateCaptureError::WakeWaker)?; |
| capture_queue.stream_on()?; |
| |
| Ok(Self { |
| capture_queue: CaptureQueue::Decoding { |
| capture_queue, |
| provider, |
| cap_buffer_waker, |
| blocking_drain_in_progress: false, |
| }, |
| ..self |
| }) |
| } |
| |
| fn dequeue_capture_buffers(mut self) -> Self { |
| trace!("Dequeueing decoded CAPTURE buffers"); |
| match self.capture_queue { |
| CaptureQueue::AwaitingResolution { .. } => unreachable!(), |
| CaptureQueue::Decoding { .. } => { |
| let is_last = self.process_capture_buffer(); |
| if is_last { |
| debug!("CAPTURE buffer marked with LAST flag"); |
| if self.is_drc_event_pending().unwrap() { |
| self = self.update_capture_format().unwrap() |
| } |
| // No DRC event pending, this is the end of the stream. |
| // We need to stop and restart the CAPTURE queue, otherwise |
| // it will keep signaling buffers as ready and dequeueing |
| // them will return `EPIPE`. |
| else { |
| debug!("No DRC event pending, restarting capture queue"); |
| self.end_of_drain(); |
| } |
| } |
| } |
| } |
| self |
| } |
| |
| /// Stream the capture queue off and back on, dropping any queued buffer, |
| /// and making the decoder ready to work again if it was halted. |
| fn restart_capture_queue(&mut self) { |
| match &mut self.capture_queue { |
| CaptureQueue::AwaitingResolution { .. } => {} |
| CaptureQueue::Decoding { |
| capture_queue, |
| blocking_drain_in_progress, |
| .. |
| } => { |
| capture_queue.stream_off().unwrap(); |
| capture_queue.stream_on().unwrap(); |
| *blocking_drain_in_progress = false; |
| } |
| } |
| } |
| |
| fn flush(&mut self) { |
| trace!("Processing flush command"); |
| self.restart_capture_queue(); |
| |
| // We are flushed, let the client know. |
| self.send_response(CaptureThreadResponse::FlushDone(Ok(()))); |
| |
| self.enqueue_capture_buffers() |
| } |
| |
| /// Check if we have a dynamic resolution change event pending. |
| /// |
| /// Dequeues all pending V4L2 events and returns `true` if a |
| /// SRC_CHANGE_EVENT (indicating a format change on the CAPTURE queue) was |
| /// detected. This consumes all the event, meaning that if this method |
| /// returned `true` once it will return `false` until a new resolution |
| /// change happens in the stream. |
| fn is_drc_event_pending(&self) -> Result<bool, ioctl::DQEventError> { |
| let mut drc_pending = false; |
| |
| loop { |
| // TODO what if we used an iterator here? |
| let event = match ioctl::dqevent(&*self.device) { |
| Ok(event) => event, |
| Err(ioctl::DQEventError::NotReady) => return Ok(drc_pending), |
| Err(e) => return Err(e), |
| }; |
| |
| match event { |
| ioctl::Event::SrcChangeEvent(changes) => { |
| if changes.contains(ioctl::SrcChanges::RESOLUTION) { |
| debug!("Received resolution change event"); |
| drc_pending = true; |
| } |
| } |
| } |
| } |
| } |
| |
| /// Try to dequeue and process a single CAPTURE buffer. |
| /// |
| /// Returns `true` if the buffer had the LAST flag set, `false` otherwise. |
| fn process_capture_buffer(&mut self) -> bool { |
| if let CaptureQueue::Decoding { |
| capture_queue, |
| cap_buffer_waker, |
| .. |
| } = &mut self.capture_queue |
| { |
| match capture_queue.try_dequeue() { |
| Ok(mut cap_buf) => { |
| let is_last = cap_buf.data.flags().contains(ioctl::BufferFlags::LAST); |
| |
| // Add a drop callback to the dequeued buffer so we |
| // re-queue it as soon as it is dropped. |
| let cap_waker = Arc::clone(&cap_buffer_waker); |
| cap_buf.add_drop_callback(move |_dqbuf| { |
| // Intentionally ignore the result here. |
| let _ = cap_waker.wake(); |
| }); |
| |
| // Pass buffers to the client |
| (self.output_ready_cb)(cap_buf); |
| is_last |
| } |
| Err(e) => { |
| warn!( |
| "Expected a CAPTURE buffer but none available, possible driver bug: {}", |
| e |
| ); |
| false |
| } |
| } |
| } else { |
| // TODO replace with something more elegant. |
| panic!(); |
| } |
| } |
| |
| fn run(mut self) -> Self { |
| 'mainloop: while !self.stop_flag { |
| if let CaptureQueue::Decoding { capture_queue, .. } = &self.capture_queue { |
| match capture_queue.num_queued_buffers() { |
| // If there are no buffers on the CAPTURE queue, poll() will return |
| // immediately with EPOLLERR and we would loop indefinitely. |
| // Prevent this by temporarily disabling polling the CAPTURE queue |
| // in such cases. |
| 0 => { |
| self.poller |
| .disable_event(DeviceEvent::CaptureReady) |
| .unwrap(); |
| } |
| // If device polling was disabled and we have buffers queued, we |
| // can reenable it as poll will now wait for a CAPTURE buffer to |
| // be ready for dequeue. |
| _ => { |
| self.poller.enable_event(DeviceEvent::CaptureReady).unwrap(); |
| } |
| } |
| } |
| |
| trace!("Polling..."); |
| let events = match self.poller.poll(None) { |
| Ok(events) => events, |
| Err(e) => { |
| error!("Polling failure, exiting capture thread: {}", e); |
| break 'mainloop; |
| } |
| }; |
| for event in events { |
| self = match event { |
| PollEvent::Device(DeviceEvent::V4L2Event) => self.process_v4l2_event(), |
| PollEvent::Device(DeviceEvent::CaptureReady) => self.dequeue_capture_buffers(), |
| PollEvent::Waker(CAPTURE_READY) => { |
| self.enqueue_capture_buffers(); |
| self |
| } |
| PollEvent::Waker(COMMAND_WAITING) => { |
| loop { |
| let command = |
| match self.command_receiver.recv_timeout(Default::default()) { |
| Ok(command) => command, |
| Err(mpsc::RecvTimeoutError::Timeout) => break, |
| Err(e) => { |
| error!("Error while reading decoder command: {}", e); |
| break; |
| } |
| }; |
| match command { |
| DecoderCommand::Drain(blocking) => self.drain(blocking), |
| DecoderCommand::Flush => self.flush(), |
| DecoderCommand::Stop => self.stop(), |
| } |
| } |
| self |
| } |
| _ => panic!("Unexpected event!"), |
| } |
| } |
| } |
| |
| // Return the decoder to the awaiting resolution state. |
| match self.capture_queue { |
| CaptureQueue::AwaitingResolution { .. } => self, |
| CaptureQueue::Decoding { capture_queue, .. } => Self { |
| capture_queue: CaptureQueue::AwaitingResolution { |
| capture_queue: { |
| capture_queue.stream_off().unwrap(); |
| capture_queue.free_buffers().unwrap().queue |
| }, |
| }, |
| poller: { |
| let mut poller = self.poller; |
| poller.disable_event(DeviceEvent::CaptureReady).unwrap(); |
| poller.enable_event(DeviceEvent::V4L2Event).unwrap(); |
| poller.remove_waker(CAPTURE_READY).unwrap(); |
| poller |
| }, |
| ..self |
| }, |
| } |
| } |
| } |