blob: 5e30185b0537010a22873a56a5575ee506ac8219 [file] [log] [blame]
use crate::{
device::{
poller::{DeviceEvent, PollEvent, Poller, Waker},
queue::{
self,
direction::{Capture, Output},
dqbuf::DQBuffer,
generic::{GenericBufferHandles, GenericQBuffer},
qbuf::{
get_free::{GetFreeBufferError, GetFreeCaptureBuffer, GetFreeOutputBuffer},
CaptureQueueable, QBuffer,
},
BuffersAllocated, CreateQueueError, FormatBuilder, Queue, QueueInit,
RequestBuffersError,
},
AllocatedQueue, Device, DeviceConfig, DeviceOpenError, Stream, TryDequeue,
},
ioctl::{self, subscribe_event, BufferCapabilities, FormatFlags, StreamOnError},
memory::{BufferHandles, HandlesProvider, PrimitiveBufferHandles},
Format,
};
use std::{
io,
path::Path,
sync::{atomic::AtomicUsize, Arc},
thread::JoinHandle,
};
use thiserror::Error;
// Trait implemented by all states of the decoder.
pub trait DecoderState {}
pub struct Decoder<S: DecoderState> {
device: Arc<Device>,
state: S,
}
pub struct AwaitingOutputFormat {
output_queue: Queue<Output, QueueInit>,
capture_queue: Queue<Capture, QueueInit>,
}
impl DecoderState for AwaitingOutputFormat {}
#[derive(Debug, Error)]
pub enum DecoderOpenError {
#[error("Error while opening device")]
DeviceOpenError(#[from] DeviceOpenError),
#[error("Error while creating queue")]
CreateQueueError(#[from] CreateQueueError),
#[error("Specified device is not a stateful decoder")]
NotAStatefulDecoder,
}
impl Decoder<AwaitingOutputFormat> {
pub fn open(path: &Path) -> Result<Self, DecoderOpenError> {
let config = DeviceConfig::new().non_blocking_dqbuf();
let device = Arc::new(Device::open(path, config)?);
// Check that the device is indeed a stateful decoder.
let capture_queue = Queue::get_capture_mplane_queue(device.clone())?;
let output_queue = Queue::get_output_mplane_queue(device.clone())?;
// On a decoder, the OUTPUT formats are compressed, but the CAPTURE ones are not.
// Return an error if our device does not satisfy these conditions.
output_queue
.format_iter()
.find(|fmt| fmt.flags.contains(FormatFlags::COMPRESSED))
.and(
capture_queue
.format_iter()
.find(|fmt| !fmt.flags.contains(FormatFlags::COMPRESSED)),
)
.ok_or(DecoderOpenError::NotAStatefulDecoder)
.map(|_| ())?;
// A stateful decoder won't expose the requests capability on the OUTPUT
// queue, a stateless one will.
if output_queue
.get_capabilities()
.contains(BufferCapabilities::SUPPORTS_REQUESTS)
{
return Err(DecoderOpenError::NotAStatefulDecoder);
}
Ok(Decoder {
device,
state: AwaitingOutputFormat {
output_queue,
capture_queue,
},
})
}
pub fn set_output_format<F>(mut self, f: F) -> anyhow::Result<Decoder<AwaitingOutputBuffers>>
where
F: FnOnce(FormatBuilder) -> anyhow::Result<()>,
{
let builder = self.state.output_queue.change_format()?;
f(builder)?;
Ok(Decoder {
device: self.device,
state: AwaitingOutputBuffers {
output_queue: self.state.output_queue,
capture_queue: self.state.capture_queue,
},
})
}
}
pub struct AwaitingOutputBuffers {
output_queue: Queue<Output, QueueInit>,
capture_queue: Queue<Capture, QueueInit>,
}
impl DecoderState for AwaitingOutputBuffers {}
impl Decoder<AwaitingOutputBuffers> {
pub fn allocate_output_buffers_generic<OP: BufferHandles>(
self,
memory_type: OP::SupportedMemoryType,
num_buffers: usize,
) -> Result<Decoder<OutputBuffersAllocated<OP>>, RequestBuffersError> {
Ok(Decoder {
device: self.device,
state: OutputBuffersAllocated {
output_queue: self
.state
.output_queue
.request_buffers_generic::<OP>(memory_type, num_buffers as u32)?,
capture_queue: self.state.capture_queue,
poll_wakeups_counter: None,
},
})
}
pub fn allocate_output_buffers<OP: PrimitiveBufferHandles>(
self,
num_output: usize,
) -> Result<Decoder<OutputBuffersAllocated<OP>>, RequestBuffersError> {
self.allocate_output_buffers_generic(OP::MEMORY_TYPE, num_output)
}
}
pub struct OutputBuffersAllocated<OP: BufferHandles> {
output_queue: Queue<Output, BuffersAllocated<OP>>,
capture_queue: Queue<Capture, QueueInit>,
poll_wakeups_counter: Option<Arc<AtomicUsize>>,
}
impl<OP: BufferHandles> DecoderState for OutputBuffersAllocated<OP> {}
#[derive(Debug, Error)]
pub enum StartDecoderError {
#[error("IO error")]
IoError(#[from] io::Error),
#[error("Cannot subscribe to decoder event")]
SubscribeEventError(#[from] ioctl::SubscribeEventError),
#[error("Error while starting the output queue")]
StreamOnError(#[from] StreamOnError),
}
pub trait InputDoneCallback<OP: BufferHandles>: Fn(DQBuffer<Output, OP>) {}
impl<OP, F> InputDoneCallback<OP> for F
where
OP: BufferHandles,
F: Fn(DQBuffer<Output, OP>),
{
}
pub trait OutputReadyCallback<P: HandlesProvider>:
FnMut(DQBuffer<Capture, P::HandleType>) + Send + 'static
{
}
impl<P, F> OutputReadyCallback<P> for F
where
P: HandlesProvider,
F: FnMut(DQBuffer<Capture, P::HandleType>) + Send + 'static,
{
}
pub struct SetCaptureFormatRet<P: HandlesProvider> {
pub provider: P,
pub mem_type: <P::HandleType as BufferHandles>::SupportedMemoryType,
pub num_buffers: usize,
}
pub trait SetCaptureFormatCallback<P: HandlesProvider>:
Fn(FormatBuilder, usize) -> anyhow::Result<SetCaptureFormatRet<P>> + Send + 'static
{
}
impl<P, F> SetCaptureFormatCallback<P> for F
where
P: HandlesProvider,
F: Fn(FormatBuilder, usize) -> anyhow::Result<SetCaptureFormatRet<P>> + Send + 'static,
{
}
impl<OP: BufferHandles> Decoder<OutputBuffersAllocated<OP>> {
pub fn set_poll_counter(mut self, poll_wakeups_counter: Arc<AtomicUsize>) -> Self {
self.state.poll_wakeups_counter = Some(poll_wakeups_counter);
self
}
#[allow(clippy::type_complexity)]
pub fn start<P, InputDoneCb, OutputReadyCb, SetCaptureFormatCb>(
self,
input_done_cb: InputDoneCb,
output_ready_cb: OutputReadyCb,
set_capture_format_cb: SetCaptureFormatCb,
) -> Result<
Decoder<Decoding<OP, P, InputDoneCb, OutputReadyCb, SetCaptureFormatCb>>,
StartDecoderError,
>
where
P: HandlesProvider,
InputDoneCb: InputDoneCallback<OP>,
OutputReadyCb: OutputReadyCallback<P>,
SetCaptureFormatCb: SetCaptureFormatCallback<P>,
for<'a> Queue<Capture, BuffersAllocated<P::HandleType>>:
GetFreeCaptureBuffer<'a, P::HandleType>,
{
// We are interested in all resolution change events.
subscribe_event(
&*self.device,
ioctl::EventType::SourceChange,
ioctl::SubscribeEventFlags::empty(),
)?;
let mut output_poller = Poller::new(Arc::clone(&self.device))?;
output_poller.enable_event(DeviceEvent::OutputReady)?;
let mut decoder_thread = DecoderThread::new(
&self.device,
self.state.capture_queue,
output_ready_cb,
set_capture_format_cb,
)?;
let stop_waker = Arc::clone(&decoder_thread.stop_waker);
if let Some(counter) = &self.state.poll_wakeups_counter {
output_poller.set_poll_counter(Arc::clone(counter));
decoder_thread.set_poll_counter(Arc::clone(counter));
}
let handle = std::thread::Builder::new()
.name("V4L2 Decoder".into())
.spawn(move || decoder_thread.run())?;
self.state.output_queue.stream_on()?;
Ok(Decoder {
device: self.device,
state: Decoding {
output_queue: self.state.output_queue,
input_done_cb,
output_poller,
stop_waker,
handle,
},
})
}
}
pub struct Decoding<OP, P, InputDoneCb, OutputReadyCb, SetCaptureFormatCb>
where
OP: BufferHandles,
P: HandlesProvider,
InputDoneCb: InputDoneCallback<OP>,
OutputReadyCb: OutputReadyCallback<P>,
SetCaptureFormatCb: SetCaptureFormatCallback<P>,
{
output_queue: Queue<Output, BuffersAllocated<OP>>,
input_done_cb: InputDoneCb,
output_poller: Poller,
stop_waker: Arc<Waker>,
handle: JoinHandle<DecoderThread<P, OutputReadyCb, SetCaptureFormatCb>>,
}
impl<OP, P, InputDoneCb, OutputReadyCb, SetCaptureFormatCb> DecoderState
for Decoding<OP, P, InputDoneCb, OutputReadyCb, SetCaptureFormatCb>
where
OP: BufferHandles,
P: HandlesProvider,
InputDoneCb: InputDoneCallback<OP>,
OutputReadyCb: OutputReadyCallback<P>,
SetCaptureFormatCb: SetCaptureFormatCallback<P>,
{
}
#[allow(type_alias_bounds)]
type DequeueOutputBufferError<OP: BufferHandles> = ioctl::DQBufError<DQBuffer<Output, OP>>;
impl<OP, P, InputDoneCb, OutputReadyCb, SetCaptureFormatCb>
Decoder<Decoding<OP, P, InputDoneCb, OutputReadyCb, SetCaptureFormatCb>>
where
OP: BufferHandles,
P: HandlesProvider,
InputDoneCb: InputDoneCallback<OP>,
OutputReadyCb: OutputReadyCallback<P>,
SetCaptureFormatCb: SetCaptureFormatCallback<P>,
{
pub fn num_output_buffers(&self) -> usize {
self.state.output_queue.num_buffers()
}
pub fn get_output_format(&self) -> Result<Format, ioctl::GFmtError> {
self.state.output_queue.get_format()
}
pub fn stop(self) -> Result<(), io::Error> {
self.state.stop_waker.wake()?;
// TODO remove this unwrap. We are throwing the decoding thread away anyway,
// so if the thread panicked we can just return this as our own error.
let _decoding_thread = self.state.handle.join().unwrap();
self.state.output_queue.stream_off().unwrap();
Ok(())
}
/// Attempts to dequeue and release output buffers that the driver is done with.
fn dequeue_output_buffers(&self) -> Result<(), DequeueOutputBufferError<OP>> {
let output_queue = &self.state.output_queue;
while output_queue.num_queued_buffers() > 0 {
match output_queue.try_dequeue() {
Ok(buf) => {
// unwrap() is safe here as we just dequeued the buffer.
(self.state.input_done_cb)(buf);
}
Err(ioctl::DQBufError::NotReady) => break,
// TODO buffers with the error flag set should not result in
// a fatal error!
Err(e) => return Err(e),
}
}
Ok(())
}
// Make this thread sleep until at least one OUTPUT buffer is ready to be
// obtained through `try_get_buffer()`, dequeuing buffers if necessary.
fn wait_for_output_buffer(&mut self) -> Result<(), GetBufferError<OP>> {
for event in self.state.output_poller.poll(None)? {
match event {
PollEvent::Device(DeviceEvent::OutputReady) => {
self.dequeue_output_buffers()?;
}
_ => panic!("Unexpected return from OUTPUT queue poll!"),
}
}
Ok(())
}
}
#[derive(Debug, Error)]
pub enum GetBufferError<OP: BufferHandles> {
#[error("Error while dequeueing buffer")]
DequeueError(#[from] DequeueOutputBufferError<OP>),
#[error("Error during poll")]
PollError(#[from] io::Error),
#[error("Error while obtaining buffer")]
GetFreeBufferError(#[from] GetFreeBufferError),
}
/// Support for primitive plane handles on the OUTPUT queue.
impl<'a, OP, P, InputDoneCb, OutputReadyCb, SetCaptureFormatCb>
GetFreeOutputBuffer<'a, OP, GetBufferError<OP>>
for Decoder<Decoding<OP, P, InputDoneCb, OutputReadyCb, SetCaptureFormatCb>>
where
OP: PrimitiveBufferHandles,
P: HandlesProvider,
InputDoneCb: InputDoneCallback<OP>,
OutputReadyCb: OutputReadyCallback<P>,
SetCaptureFormatCb: SetCaptureFormatCallback<P>,
{
type Queueable = QBuffer<'a, Output, OP, OP>;
/// Returns a V4L2 buffer to be filled with a frame to encode if one
/// is available.
///
/// This method will return None immediately if all the allocated buffers
/// are currently queued.
fn try_get_free_buffer(&'a self) -> Result<Self::Queueable, GetBufferError<OP>> {
self.dequeue_output_buffers()?;
Ok(self.state.output_queue.try_get_free_buffer()?)
}
}
/// Support for dynamic plane handles on the OUTPUT queue.
impl<'a, P, InputDoneCb, OutputReadyCb, SetCaptureFormatCb>
GetFreeOutputBuffer<'a, GenericBufferHandles, GetBufferError<GenericBufferHandles>>
for Decoder<Decoding<GenericBufferHandles, P, InputDoneCb, OutputReadyCb, SetCaptureFormatCb>>
where
P: HandlesProvider,
InputDoneCb: InputDoneCallback<GenericBufferHandles>,
OutputReadyCb: OutputReadyCallback<P>,
SetCaptureFormatCb: SetCaptureFormatCallback<P>,
{
type Queueable = GenericQBuffer<'a, Output>;
/// Returns a V4L2 buffer to be filled with a frame to encode if one
/// is available.
///
/// This method will return None immediately if all the allocated buffers
/// are currently queued.
fn try_get_free_buffer(
&'a self,
) -> Result<Self::Queueable, GetBufferError<GenericBufferHandles>> {
self.dequeue_output_buffers()?;
Ok(self.state.output_queue.try_get_free_buffer()?)
}
}
// If `GetFreeBuffer` is implemented, we can also provide a blocking `get_buffer`
// method.
impl<'a, OP, P, InputDoneCb, OutputReadyCb, SetCaptureFormatCb>
Decoder<Decoding<OP, P, InputDoneCb, OutputReadyCb, SetCaptureFormatCb>>
where
Self: GetFreeOutputBuffer<'a, OP, GetBufferError<OP>>,
OP: BufferHandles,
P: HandlesProvider,
InputDoneCb: InputDoneCallback<OP>,
OutputReadyCb: OutputReadyCallback<P>,
SetCaptureFormatCb: SetCaptureFormatCallback<P>,
{
/// Returns a V4L2 buffer to be filled with a frame to encode, waiting for
/// one to be available if needed.
///
/// Contrary to `try_get_free_buffer(), this method will wait for a buffer
/// to be available if needed.
pub fn get_buffer(
&'a mut self,
) -> Result<
<Self as GetFreeOutputBuffer<'a, OP, GetBufferError<OP>>>::Queueable,
GetBufferError<OP>,
> {
let output_queue = &self.state.output_queue;
// If all our buffers are queued, wait until we can dequeue some.
if output_queue.num_queued_buffers() == output_queue.num_buffers() {
self.wait_for_output_buffer()?;
}
self.try_get_free_buffer()
}
}
// TODO use ::new functions that take the queue and configure the state properly, with
// the poller, wakers, and all.
enum CaptureQueue<P: HandlesProvider> {
AwaitingResolution {
capture_queue: Queue<Capture, QueueInit>,
},
Decoding {
capture_queue: Queue<Capture, BuffersAllocated<P::HandleType>>,
provider: P,
cap_buffer_waker: Arc<Waker>,
},
}
struct DecoderThread<P, OutputReadyCb, SetCaptureFormatCb>
where
P: HandlesProvider,
OutputReadyCb: OutputReadyCallback<P>,
SetCaptureFormatCb: SetCaptureFormatCallback<P>,
{
device: Arc<Device>,
capture_queue: CaptureQueue<P>,
poller: Poller,
stop_waker: Arc<Waker>,
output_ready_cb: OutputReadyCb,
set_capture_format_cb: SetCaptureFormatCb,
}
#[derive(Debug, Error)]
enum UpdateCaptureError {
#[error("Error while obtaining CAPTURE format")]
GFmt(#[from] ioctl::GFmtError),
#[error("Error while setting CAPTURE format")]
SFmt(#[from] ioctl::SFmtError),
#[error("Error while requesting CAPTURE buffers")]
RequestBuffers(#[from] queue::RequestBuffersError),
#[error("Error while streaming CAPTURE queue")]
StreamOn(#[from] ioctl::StreamOnError),
}
const CAPTURE_READY: u32 = 0;
const STOP_DECODING: u32 = 1;
#[derive(Debug, Error)]
enum ProcessEventsError {
#[error("Error while dequeueing event")]
DQEvent(#[from] ioctl::DQEventError),
#[error("Error while requesting buffers")]
RequestBuffers(#[from] queue::RequestBuffersError),
#[error("Error while updating CAPTURE format")]
UpdateCapture(#[from] UpdateCaptureError),
}
impl<P, OutputReadyCb, SetCaptureFormatCb> DecoderThread<P, OutputReadyCb, SetCaptureFormatCb>
where
P: HandlesProvider,
OutputReadyCb: OutputReadyCallback<P>,
for<'a> Queue<Capture, BuffersAllocated<P::HandleType>>:
GetFreeCaptureBuffer<'a, P::HandleType>,
SetCaptureFormatCb: SetCaptureFormatCallback<P>,
{
fn new(
device: &Arc<Device>,
capture_queue: Queue<Capture, QueueInit>,
output_ready_cb: OutputReadyCb,
set_capture_format_cb: SetCaptureFormatCb,
) -> io::Result<Self> {
// Start by only listening to V4L2 events in order to catch the initial
// resolution change, and to the stop waker in case the user had a
// change of heart about decoding something now.
let mut poller = Poller::new(Arc::clone(device))?;
poller.enable_event(DeviceEvent::V4L2Event)?;
let stop_waker = poller.add_waker(STOP_DECODING)?;
let decoder_thread = DecoderThread {
device: Arc::clone(&device),
capture_queue: CaptureQueue::AwaitingResolution { capture_queue },
poller,
stop_waker,
output_ready_cb,
set_capture_format_cb,
};
Ok(decoder_thread)
}
fn set_poll_counter(&mut self, poll_wakeups_counter: Arc<AtomicUsize>) {
self.poller.set_poll_counter(poll_wakeups_counter);
}
fn update_capture_resolution(mut self) -> Result<Self, UpdateCaptureError> {
let mut capture_queue = match self.capture_queue {
// Initial resolution
CaptureQueue::AwaitingResolution { capture_queue } => capture_queue,
// Dynamic resolution change
CaptureQueue::Decoding { capture_queue, .. } => {
self.poller.remove_waker(CAPTURE_READY).unwrap();
// TODO remove unwrap.
// TODO must do complete flush sequence before this...
capture_queue.stream_off().unwrap();
capture_queue.free_buffers().unwrap().queue
}
};
// TODO use the proper control to get the right value.
let min_num_buffers = 4usize;
let SetCaptureFormatRet {
provider,
mem_type,
num_buffers,
} = (self.set_capture_format_cb)(capture_queue.change_format()?, min_num_buffers).unwrap();
let capture_queue =
capture_queue.request_buffers_generic::<P::HandleType>(mem_type, num_buffers as u32)?;
println!("Allocated {} buffers", capture_queue.num_buffers());
// TODO use two closures, one to set the format, another one to decide
// the number of buffers, given the minimum number of buffers for the
// stream (need control support for that).
// Reconfigure poller to listen to capture buffers being ready.
let mut poller = self.poller;
poller.enable_event(DeviceEvent::CaptureReady).unwrap();
poller.disable_event(DeviceEvent::V4L2Event).unwrap();
let cap_buffer_waker = poller.add_waker(CAPTURE_READY).unwrap();
capture_queue.stream_on()?;
let mut new_self = Self {
capture_queue: CaptureQueue::Decoding {
capture_queue,
provider,
cap_buffer_waker,
},
poller,
..self
};
new_self.enqueue_capture_buffers();
Ok(new_self)
}
// A resolution change event will potentially morph the capture queue
// from the Init state to BuffersAllocated - thus we take full ownership
// of self and return a new object.
fn process_events(mut self) -> Result<Self, ProcessEventsError> {
loop {
// TODO what if we used an iterator here?
let event = match ioctl::dqevent(&*self.device) {
Ok(event) => event,
Err(ioctl::DQEventError::NotReady) => break,
Err(e) => return Err(e.into()),
};
match event {
ioctl::Event::SrcChangeEvent(changes) => {
if changes.contains(ioctl::SrcChanges::RESOLUTION) {
println!("Got resolution change event!");
self = self.update_capture_resolution()?;
}
}
}
}
Ok(self)
}
fn process_capture_buffer(&mut self) -> bool {
match &mut self.capture_queue {
CaptureQueue::Decoding {
capture_queue,
cap_buffer_waker,
..
} => {
if let Ok(mut cap_buf) = capture_queue.try_dequeue() {
let is_last = cap_buf.data.flags().contains(ioctl::BufferFlags::LAST);
let is_empty = cap_buf.data.get_first_plane().bytesused() == 0;
// Add a drop callback to the dequeued buffer so we
// re-queue it as soon as it is dropped.
let cap_waker = Arc::clone(&cap_buffer_waker);
cap_buf.add_drop_callback(move |_dqbuf| {
// Intentionally ignore the result here.
let _ = cap_waker.wake();
// TODO how about a way to immediately re-queue the buffer
// in the drop callback? That way we don't need to interrupt
// polling on the device.
// Actually, the buffer is back into the free list when
// we are here! So we can completely do that, provided
// we have a reference to the queue. If we use a sync::Weak
// pointer to the queue we should be able to do it. And
// when buffers are reallocated the Arc to the queue needs
// to be destroyed anyway, so the weak pointer cannot be
// upgraded!
// We already have a weak reference in the fuse, and a weak
// pointer to the device in the dqbuffer, can't we reuse that?
// What we need: a Weak reference to the queue, passed to the callback.
// Then we can call try_get_buffer() from here using the
// buffer index as argument, and requeue the buffer using
// the handles from the dqbuffer!
// Or maybe that won't work. We shouldn't be able to call streamoff while
// we hold a QBuffer, and that would allow this to happen if the destructor
// runs in another thread while we attempt to stop the queue.
// Maybe have a DQBuffer::requeue() method that requeues the
// buffer as is after removing the plane handles and data?
// TODO streamoff and try_get*buffer() should be &mut self to avoid calling
// streamoff while we hold a qbuffer? What happens if we do? -> Nothing since
// the buffer is not queued and we can queue it if the queue is streamed off!
// That's no problem at all.
// But wait - we need to change the poll state when requeuing buffers anyway,
// so we need to wake up from the poll...
});
// Empty buffers do not need to be passed to the client.
if !is_empty {
(self.output_ready_cb)(cap_buf);
}
// Last buffer of the stream? Time for us to terminate.
// TODO but not if there is a resolution change event.
// in this case we need to perform a DRC.
if is_last {
return true;
}
} else {
// TODO we should not crash here.
panic!("Expected a CAPTURE buffer but none available!");
}
}
// TODO replace with something more elegant.
_ => panic!(),
}
false
}
fn run(mut self) -> Self {
'polling: loop {
match &self.capture_queue {
CaptureQueue::AwaitingResolution { .. } => {
// Here we only check for the initial resolution change
// event.
// TODO remove this unwrap.
for event in self.poller.poll(None).unwrap() {
match event {
PollEvent::Device(DeviceEvent::V4L2Event) => {
self = self.process_events().unwrap()
}
// If we are requested to stop, then we just need to
// break the loop since we haven't started producing
// buffers.
PollEvent::Waker(1) => {
break 'polling;
}
_ => panic!("Unexpected event!"),
}
}
}
CaptureQueue::Decoding { capture_queue, .. } => {
// Here we process buffers as usual while looking for the
// LAST buffer and checking if we need to res change (and
// set the boolean if we do.
match capture_queue.num_queued_buffers() {
// If there are no buffers on the CAPTURE queue, poll() will return
// immediately with EPOLLERR and we would loop indefinitely.
// Prevent this by temporarily disabling polling the CAPTURE queue
// in such cases.
0 => {
self.poller
.disable_event(DeviceEvent::CaptureReady)
.unwrap();
}
// If device polling was disabled and we have buffers queued, we
// can reenable it as poll will now wait for a CAPTURE buffer to
// be ready for dequeue.
_ => {
self.poller.enable_event(DeviceEvent::CaptureReady).unwrap();
}
}
// TODO remove this unwrap.
for event in self.poller.poll(None).unwrap() {
match event {
PollEvent::Device(DeviceEvent::CaptureReady) => {
let do_exit = self.process_capture_buffer();
if do_exit {
break 'polling;
}
}
// TODO when doing DRC, it can happen that buffers from the previous
// resolution are released and trigger this. We need to make the
// old waker a no-op (maybe by reinitializing it to a new file?)
// before streaming the CAPTURE queue off. Maybe allocate a new Poller
// as we morph our queue type?
PollEvent::Waker(CAPTURE_READY) => {
// Requeue all available CAPTURE buffers.
self.enqueue_capture_buffers();
}
PollEvent::Waker(STOP_DECODING) => {
// We are already producing buffers, send the STOP command
// and exit the loop once the buffer with the LAST tag is received.
// TODO remove this unwrap.
ioctl::decoder_cmd(&*self.device, ioctl::DecoderCommand::Stop)
.unwrap();
}
_ => panic!("Unexpected event!"),
}
}
}
}
}
// Return the decoder to the awaiting resolution state.
match self.capture_queue {
CaptureQueue::AwaitingResolution { .. } => self,
CaptureQueue::Decoding { capture_queue, .. } => Self {
capture_queue: CaptureQueue::AwaitingResolution {
capture_queue: {
capture_queue.stream_off().unwrap();
capture_queue.free_buffers().unwrap().queue
},
},
poller: {
let mut poller = self.poller;
poller.disable_event(DeviceEvent::CaptureReady).unwrap();
poller.enable_event(DeviceEvent::V4L2Event).unwrap();
poller.remove_waker(CAPTURE_READY).unwrap();
poller
},
..self
},
}
}
fn enqueue_capture_buffers(&mut self) {
if let CaptureQueue::Decoding {
capture_queue,
provider,
..
} = &mut self.capture_queue
{
'enqueue: while let Ok(buffer) = capture_queue.try_get_free_buffer() {
if let Some(handles) = provider.get_handles() {
buffer.queue_with_handles(handles).unwrap();
} else {
break 'enqueue;
}
}
}
}
}