decoder/stateful: add STOP waker
This allows us to stop the decoder before the initial resolution change
event has been received.
diff --git a/src/decoder/stateful.rs b/src/decoder/stateful.rs
index fe871f9..15f3fad 100644
--- a/src/decoder/stateful.rs
+++ b/src/decoder/stateful.rs
@@ -236,6 +236,8 @@
set_capture_format_cb,
)?;
+ let stop_waker = Arc::clone(&decoder_thread.stop_waker);
+
if let Some(counter) = &self.state.poll_wakeups_counter {
output_poller.set_poll_counter(Arc::clone(counter));
decoder_thread.set_poll_counter(Arc::clone(counter));
@@ -253,6 +255,7 @@
output_queue: self.state.output_queue,
input_done_cb,
output_poller,
+ stop_waker,
handle,
},
})
@@ -270,6 +273,7 @@
output_queue: Queue<Output, BuffersAllocated<OP>>,
input_done_cb: InputDoneCb,
output_poller: Poller,
+ stop_waker: Arc<Waker>,
handle: JoinHandle<DecoderThread<P, OutputReadyCb, SetCaptureFormatCb>>,
}
@@ -304,22 +308,12 @@
self.state.output_queue.get_format()
}
- pub fn stop(self) -> Result<(), ioctl::DecoderCmdError> {
- // TODO if the CAPTURE queue is not running, we cannot dequeue the
- // LAST buffer. In this case we need another way to stop the thread.
- // It's probably better to just signal the decoder thread we want to
- // stop, and let it manage the situation as it sees fit (CMD_STOP or
- // just exit).
- ioctl::decoder_cmd(&*self.device, ioctl::DecoderCommand::Stop)?;
+ pub fn stop(self) -> Result<(), io::Error> {
+ self.state.stop_waker.wake()?;
- let decoding_thread = self.state.handle.join().unwrap();
-
- match &decoding_thread.capture_queue {
- CaptureQueue::Decoding { capture_queue, .. } => {
- capture_queue.stream_off().unwrap();
- }
- _ => todo!(),
- }
+ // TODO remove this unwrap. We are throwing the decoding thread away anyway,
+ // so if the thread panicked we can just return this as our own error.
+ let _decoding_thread = self.state.handle.join().unwrap();
self.state.output_queue.stream_off().unwrap();
@@ -455,6 +449,8 @@
}
}
+// TODO use ::new functions that take the queue and configure the state properly, with
+// the poller, wakers, and all.
enum CaptureQueue<P: HandlesProvider> {
AwaitingResolution(Queue<Capture, QueueInit>),
Decoding {
@@ -472,7 +468,8 @@
device: Arc<Device>,
capture_queue: CaptureQueue<P>,
poller: Poller,
- waker: Arc<Waker>,
+ cap_buffer_waker: Arc<Waker>,
+ stop_waker: Arc<Waker>,
output_ready_cb: OutputReadyCb,
set_capture_format_cb: SetCaptureFormatCb,
}
@@ -489,6 +486,9 @@
StreamOn(#[from] ioctl::StreamOnError),
}
+const CAPTURE_READY: u32 = 0;
+const STOP_DECODING: u32 = 1;
+
#[derive(Debug, Error)]
enum ProcessEventsError {
#[error("Error while dequeueing event")]
@@ -517,13 +517,15 @@
// Start by only listening to V4L2 events in order to catch the initial
// resolution change.
poller.enable_event(DeviceEvent::V4L2Event)?;
- let waker = poller.add_waker(0)?;
+ let cap_buffer_waker = poller.add_waker(CAPTURE_READY)?;
+ let stop_waker = poller.add_waker(STOP_DECODING)?;
let decoder_thread = DecoderThread {
device: Arc::clone(&device),
capture_queue: CaptureQueue::AwaitingResolution(capture_queue),
poller,
- waker,
+ cap_buffer_waker,
+ stop_waker,
output_ready_cb,
set_capture_format_cb,
};
@@ -620,7 +622,7 @@
// Add a drop callback to the dequeued buffer so we
// re-queue it as soon as it is dropped.
- let cap_waker = Arc::clone(&self.waker);
+ let cap_waker = Arc::clone(&self.cap_buffer_waker);
cap_buf.add_drop_callback(move |_dqbuf| {
// Intentionally ignore the result here.
let _ = cap_waker.wake();
@@ -689,6 +691,12 @@
PollEvent::Device(DeviceEvent::V4L2Event) => {
self = self.process_events().unwrap()
}
+ // If we are requested to stop, then we just need to
+ // break the loop since we haven't started producing
+ // buffers.
+ PollEvent::Waker(1) => {
+ break 'polling;
+ }
_ => panic!("Unexpected event!"),
}
}
@@ -730,10 +738,17 @@
// old waker a no-op (maybe by reinitializing it to a new file?)
// before streaming the CAPTURE queue off. Maybe allocate a new Poller
// as we morph our queue type?
- PollEvent::Waker(0) => {
+ PollEvent::Waker(CAPTURE_READY) => {
// Requeue all available CAPTURE buffers.
self.enqueue_capture_buffers();
}
+ PollEvent::Waker(STOP_DECODING) => {
+ // We are already producing buffers, send the STOP command
+ // and exit the loop once the buffer with the LAST tag is received.
+ // TODO remove this unwrap.
+ ioctl::decoder_cmd(&*self.device, ioctl::DecoderCommand::Stop)
+ .unwrap();
+ }
_ => panic!("Unexpected event!"),
}
}
@@ -741,7 +756,17 @@
}
}
- self
+ // Return the decoder to the awaiting resolution state.
+ match self.capture_queue {
+ CaptureQueue::AwaitingResolution(_) => self,
+ CaptureQueue::Decoding { capture_queue, .. } => Self {
+ capture_queue: CaptureQueue::AwaitingResolution({
+ capture_queue.stream_off().unwrap();
+ capture_queue.free_buffers().unwrap().queue
+ }),
+ ..self
+ },
+ }
}
fn enqueue_capture_buffers(&mut self) {