decoder/stateful: add STOP waker

This allows us to stop the decoder before the initial resolution change
event has been received.
diff --git a/src/decoder/stateful.rs b/src/decoder/stateful.rs
index fe871f9..15f3fad 100644
--- a/src/decoder/stateful.rs
+++ b/src/decoder/stateful.rs
@@ -236,6 +236,8 @@
             set_capture_format_cb,
         )?;
 
+        let stop_waker = Arc::clone(&decoder_thread.stop_waker);
+
         if let Some(counter) = &self.state.poll_wakeups_counter {
             output_poller.set_poll_counter(Arc::clone(counter));
             decoder_thread.set_poll_counter(Arc::clone(counter));
@@ -253,6 +255,7 @@
                 output_queue: self.state.output_queue,
                 input_done_cb,
                 output_poller,
+                stop_waker,
                 handle,
             },
         })
@@ -270,6 +273,7 @@
     output_queue: Queue<Output, BuffersAllocated<OP>>,
     input_done_cb: InputDoneCb,
     output_poller: Poller,
+    stop_waker: Arc<Waker>,
 
     handle: JoinHandle<DecoderThread<P, OutputReadyCb, SetCaptureFormatCb>>,
 }
@@ -304,22 +308,12 @@
         self.state.output_queue.get_format()
     }
 
-    pub fn stop(self) -> Result<(), ioctl::DecoderCmdError> {
-        // TODO if the CAPTURE queue is not running, we cannot dequeue the
-        // LAST buffer. In this case we need another way to stop the thread.
-        // It's probably better to just signal the decoder thread we want to
-        // stop, and let it manage the situation as it sees fit (CMD_STOP or
-        // just exit).
-        ioctl::decoder_cmd(&*self.device, ioctl::DecoderCommand::Stop)?;
+    pub fn stop(self) -> Result<(), io::Error> {
+        self.state.stop_waker.wake()?;
 
-        let decoding_thread = self.state.handle.join().unwrap();
-
-        match &decoding_thread.capture_queue {
-            CaptureQueue::Decoding { capture_queue, .. } => {
-                capture_queue.stream_off().unwrap();
-            }
-            _ => todo!(),
-        }
+        // TODO remove this unwrap. We are throwing the decoding thread away anyway,
+        // so if the thread panicked we can just return this as our own error.
+        let _decoding_thread = self.state.handle.join().unwrap();
 
         self.state.output_queue.stream_off().unwrap();
 
@@ -455,6 +449,8 @@
     }
 }
 
+// TODO use ::new functions that take the queue and configure the state properly, with
+// the poller, wakers, and all.
 enum CaptureQueue<P: HandlesProvider> {
     AwaitingResolution(Queue<Capture, QueueInit>),
     Decoding {
@@ -472,7 +468,8 @@
     device: Arc<Device>,
     capture_queue: CaptureQueue<P>,
     poller: Poller,
-    waker: Arc<Waker>,
+    cap_buffer_waker: Arc<Waker>,
+    stop_waker: Arc<Waker>,
     output_ready_cb: OutputReadyCb,
     set_capture_format_cb: SetCaptureFormatCb,
 }
@@ -489,6 +486,9 @@
     StreamOn(#[from] ioctl::StreamOnError),
 }
 
+const CAPTURE_READY: u32 = 0;
+const STOP_DECODING: u32 = 1;
+
 #[derive(Debug, Error)]
 enum ProcessEventsError {
     #[error("Error while dequeueing event")]
@@ -517,13 +517,15 @@
         // Start by only listening to V4L2 events in order to catch the initial
         // resolution change.
         poller.enable_event(DeviceEvent::V4L2Event)?;
-        let waker = poller.add_waker(0)?;
+        let cap_buffer_waker = poller.add_waker(CAPTURE_READY)?;
+        let stop_waker = poller.add_waker(STOP_DECODING)?;
 
         let decoder_thread = DecoderThread {
             device: Arc::clone(&device),
             capture_queue: CaptureQueue::AwaitingResolution(capture_queue),
             poller,
-            waker,
+            cap_buffer_waker,
+            stop_waker,
             output_ready_cb,
             set_capture_format_cb,
         };
@@ -620,7 +622,7 @@
 
                     // Add a drop callback to the dequeued buffer so we
                     // re-queue it as soon as it is dropped.
-                    let cap_waker = Arc::clone(&self.waker);
+                    let cap_waker = Arc::clone(&self.cap_buffer_waker);
                     cap_buf.add_drop_callback(move |_dqbuf| {
                         // Intentionally ignore the result here.
                         let _ = cap_waker.wake();
@@ -689,6 +691,12 @@
                             PollEvent::Device(DeviceEvent::V4L2Event) => {
                                 self = self.process_events().unwrap()
                             }
+                            // If we are requested to stop, then we just need to
+                            // break the loop since we haven't started producing
+                            // buffers.
+                            PollEvent::Waker(1) => {
+                                break 'polling;
+                            }
                             _ => panic!("Unexpected event!"),
                         }
                     }
@@ -730,10 +738,17 @@
                             // old waker a no-op (maybe by reinitializing it to a new file?)
                             // before streaming the CAPTURE queue off. Maybe allocate a new Poller
                             // as we morph our queue type?
-                            PollEvent::Waker(0) => {
+                            PollEvent::Waker(CAPTURE_READY) => {
                                 // Requeue all available CAPTURE buffers.
                                 self.enqueue_capture_buffers();
                             }
+                            PollEvent::Waker(STOP_DECODING) => {
+                                // We are already producing buffers, send the STOP command
+                                // and exit the loop once the buffer with the LAST tag is received.
+                                // TODO remove this unwrap.
+                                ioctl::decoder_cmd(&*self.device, ioctl::DecoderCommand::Stop)
+                                    .unwrap();
+                            }
                             _ => panic!("Unexpected event!"),
                         }
                     }
@@ -741,7 +756,17 @@
             }
         }
 
-        self
+        // Return the decoder to the awaiting resolution state.
+        match self.capture_queue {
+            CaptureQueue::AwaitingResolution(_) => self,
+            CaptureQueue::Decoding { capture_queue, .. } => Self {
+                capture_queue: CaptureQueue::AwaitingResolution({
+                    capture_queue.stream_off().unwrap();
+                    capture_queue.free_buffers().unwrap().queue
+                }),
+                ..self
+            },
+        }
     }
 
     fn enqueue_capture_buffers(&mut self) {