| //! A level-triggered `Poller` for V4L2 devices that allows a user to be notified |
| //! when a CAPTURE or OUTPUT buffer is ready to be dequeued, or when a V4L2 |
| //! event is ready to be dequeued. |
| //! |
| //! It also provides a `Waker` companion that allows other threads to interrupt |
| //! an ongoing (or coming) poll. Useful to implement an event-based loop. |
| |
| use std::{ |
| collections::BTreeMap, |
| fs::File, |
| io::{self, Read, Write}, |
| mem, |
| os::unix::io::{AsRawFd, FromRawFd}, |
| sync::atomic::{AtomicUsize, Ordering}, |
| sync::Arc, |
| task::Wake, |
| }; |
| |
| use log::{error, warn}; |
| use nix::sys::{ |
| epoll::{self, EpollEvent, EpollFlags}, |
| eventfd::{eventfd, EfdFlags}, |
| }; |
| use thiserror::Error; |
| |
| use crate::device::Device; |
| |
/// Kind of readiness a V4L2 device can report through the poller.
///
/// This is a plain tag without payload, so it is `Copy`: callers can pass it
/// by value (e.g. to `Poller::is_event_enabled`) and keep using their own copy.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeviceEvent {
    /// A CAPTURE buffer is ready to be dequeued.
    CaptureReady,
    /// An OUTPUT buffer is ready to be dequeued.
    OutputReady,
    /// A V4L2 event is ready to be dequeued.
    V4L2Event,
}

/// A single event yielded when iterating over the result of a poll.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PollEvent {
    /// The polled device signaled the given kind of readiness.
    Device(DeviceEvent),
    /// The waker registered with this identifier has been signaled.
    Waker(u32),
}
| |
| pub struct PollEvents { |
| events: [EpollEvent; 4], |
| nb_events: usize, |
| cur_event: usize, |
| } |
| |
| impl PollEvents { |
| fn new() -> Self { |
| PollEvents { |
| // Safe because that's the rightful initial state for epoll_event. |
| events: unsafe { mem::zeroed() }, |
| nb_events: 0, |
| cur_event: 0, |
| } |
| } |
| } |
| |
| impl Iterator for PollEvents { |
| type Item = PollEvent; |
| |
| fn next(&mut self) -> Option<Self::Item> { |
| // No more slot to process, end of iterator. |
| if self.cur_event >= self.nb_events { |
| return None; |
| } |
| |
| let slot = &mut self.events[self.cur_event]; |
| match slot.data() { |
| DEVICE_ID => { |
| // Figure out which event to return next, if any for this slot. |
| if slot.events().contains(EpollFlags::EPOLLOUT) { |
| *slot = EpollEvent::new( |
| slot.events().difference(EpollFlags::EPOLLOUT), |
| slot.data(), |
| ); |
| Some(PollEvent::Device(DeviceEvent::OutputReady)) |
| } else if slot.events().contains(EpollFlags::EPOLLIN) { |
| *slot = |
| EpollEvent::new(slot.events().difference(EpollFlags::EPOLLIN), slot.data()); |
| Some(PollEvent::Device(DeviceEvent::CaptureReady)) |
| } else if slot.events().contains(EpollFlags::EPOLLPRI) { |
| *slot = EpollEvent::new( |
| slot.events().difference(EpollFlags::EPOLLPRI), |
| slot.data(), |
| ); |
| Some(PollEvent::Device(DeviceEvent::V4L2Event)) |
| } else { |
| // If no more events for this slot, try the next one. |
| self.cur_event += 1; |
| self.next() |
| } |
| } |
| waker_id @ FIRST_WAKER_ID..=LAST_WAKER_ID => { |
| self.cur_event += 1; |
| Some(PollEvent::Waker(waker_id as u32)) |
| } |
| _ => panic!("Unregistered token returned by epoll_wait!"), |
| } |
| } |
| } |
| |
| pub struct Waker { |
| fd: File, |
| } |
| |
| impl Waker { |
| fn new() -> io::Result<Self> { |
| let fd = eventfd(0, EfdFlags::EFD_CLOEXEC | EfdFlags::EFD_NONBLOCK)?; |
| |
| Ok(Waker { |
| fd: unsafe { File::from_raw_fd(fd) }, |
| }) |
| } |
| |
| /// Users will want to use the `wake()` method on an Arc<Waker>. |
| fn wake_direct(&self) -> io::Result<()> { |
| let buf = 1u64.to_ne_bytes(); |
| // Files support concurrent access at the OS level. The implementation |
| // of Write for &File lets us call the write mutable method even on a |
| // non-mutable File instance. |
| (&self.fd).write(&buf).map(|_| ()) |
| } |
| |
| /// Perform a read on this waker in order to reset its counter to 0. This |
| /// means it will make subsequent calls to `poll()` block until `wake()` is |
| /// called again. |
| fn reset(&self) -> io::Result<()> { |
| let mut buf = 0u64.to_ne_bytes(); |
| match (&self.fd).read(&mut buf).map(|_| ()) { |
| Ok(_) => Ok(()), |
| // If the counter was already zero, it is already reset so this is |
| // not an error. |
| Err(e) if e.kind() == io::ErrorKind::WouldBlock => Ok(()), |
| Err(e) => Err(e), |
| } |
| } |
| } |
| |
| impl Wake for Waker { |
| fn wake(self: Arc<Self>) { |
| self.wake_direct().unwrap_or_else(|e| { |
| error!("Failed to signal Waker: {}", e); |
| }); |
| } |
| } |
| |
| pub struct Poller { |
| device: Arc<Device>, |
| wakers: BTreeMap<u32, Arc<Waker>>, |
| epoll: File, |
| |
| // Whether or not to listen to specific device events. |
| capture_enabled: bool, |
| output_enabled: bool, |
| events_enabled: bool, |
| |
| // If set, incremented every time we wake up from a poll. |
| poll_wakeups_counter: Option<Arc<AtomicUsize>>, |
| } |
| |
/// Epoll token identifying the device. It sits just above the `u32` range,
/// which gives us a comfortable range of 4 billion ids usable for wakers.
const DEVICE_ID: u64 = u32::MAX as u64 + 1;
/// First epoll token of the wakers range; a waker's token is this value plus
/// its id.
const FIRST_WAKER_ID: u64 = 0;
/// Last epoll token of the wakers range (inclusive).
const LAST_WAKER_ID: u64 = DEVICE_ID - 1;
| |
| #[derive(Debug, Error)] |
| pub enum PollError { |
| #[error("Error during call to epoll_wait: {0}")] |
| EPollWait(nix::Error), |
| #[error("Error while resetting the waker: {0}")] |
| WakerReset(io::Error), |
| #[error("V4L2 device returned EPOLLERR")] |
| V4L2Device, |
| } |
| |
| impl Poller { |
| pub fn new(device: Arc<Device>) -> nix::Result<Self> { |
| let epoll = epoll::epoll_create1(epoll::EpollCreateFlags::EPOLL_CLOEXEC) |
| .map(|fd| unsafe { File::from_raw_fd(fd) })?; |
| |
| // Register our device. |
| // There is a bug in some Linux kernels (at least 5.9 and older) where EPOLLIN |
| // and EPOLLOUT events wont be signaled to epoll if the first call to epoll did |
| // not include at least one of EPOLLIN or EPOLLOUT as desired events. |
| // Make sure we don't fall into this trap by registering EPOLLIN first and doing |
| // a dummy poll call. This call will immediately return with an error because the |
| // CAPTURE queue is not streaming, but it will set the right hooks in the kernel |
| // and we can now reconfigure our events to only include EPOLLPRI and have poll |
| // working as expected. |
| epoll::epoll_ctl( |
| epoll.as_raw_fd(), |
| epoll::EpollOp::EpollCtlAdd, |
| device.as_raw_fd(), |
| Some(&mut EpollEvent::new(EpollFlags::EPOLLIN, DEVICE_ID)), |
| )?; |
| // This call should return an EPOLLERR event immediately. But it will |
| // also ensure that the CAPTURE and OUTPUT poll handlers are registered |
| // in the kernel for our device. |
| epoll::epoll_wait(epoll.as_raw_fd(), &mut [EpollEvent::empty()], 10)?; |
| // Now reset our device events. We must keep it registered for the |
| // workaround's effect to persist. |
| epoll::epoll_ctl( |
| epoll.as_raw_fd(), |
| epoll::EpollOp::EpollCtlMod, |
| device.as_raw_fd(), |
| Some(&mut EpollEvent::new(EpollFlags::empty(), DEVICE_ID)), |
| )?; |
| |
| Ok(Poller { |
| device, |
| wakers: BTreeMap::new(), |
| epoll, |
| capture_enabled: false, |
| output_enabled: false, |
| events_enabled: false, |
| poll_wakeups_counter: None, |
| }) |
| } |
| |
| /// Create a `Waker` with identifier `id` and start polling on it. Returns |
| /// the `Waker` if successful, or an error if `id` was already in use or the |
| /// waker could not be polled on. |
| pub fn add_waker(&mut self, id: u32) -> io::Result<Arc<Waker>> { |
| match self.wakers.entry(id) { |
| std::collections::btree_map::Entry::Vacant(entry) => { |
| let waker = Waker::new()?; |
| |
| epoll::epoll_ctl( |
| self.epoll.as_raw_fd(), |
| epoll::EpollOp::EpollCtlAdd, |
| waker.fd.as_raw_fd(), |
| Some(&mut EpollEvent::new( |
| EpollFlags::EPOLLIN, |
| FIRST_WAKER_ID + id as u64, |
| )), |
| )?; |
| |
| let waker = Arc::new(waker); |
| entry.insert(Arc::clone(&waker)); |
| Ok(waker) |
| } |
| std::collections::btree_map::Entry::Occupied(_) => Err(io::Error::new( |
| io::ErrorKind::AlreadyExists, |
| format!("A waker with id {} is already registered", id), |
| )), |
| } |
| } |
| |
| pub fn remove_waker(&mut self, id: u32) -> io::Result<Arc<Waker>> { |
| match self.wakers.entry(id) { |
| std::collections::btree_map::Entry::Vacant(_) => Err(io::Error::new( |
| io::ErrorKind::AlreadyExists, |
| format!("No waker with id {} in this poller", id), |
| )), |
| std::collections::btree_map::Entry::Occupied(entry) => { |
| epoll::epoll_ctl( |
| self.epoll.as_raw_fd(), |
| epoll::EpollOp::EpollCtlDel, |
| entry.get().fd.as_raw_fd(), |
| Some(&mut EpollEvent::new( |
| EpollFlags::EPOLLIN, |
| FIRST_WAKER_ID + id as u64, |
| )), |
| )?; |
| |
| Ok(entry.remove()) |
| } |
| } |
| } |
| |
| pub fn set_poll_counter(&mut self, poll_wakeup_counter: Arc<AtomicUsize>) { |
| self.poll_wakeups_counter = Some(poll_wakeup_counter); |
| } |
| |
| fn update_device_registration(&mut self) -> nix::Result<()> { |
| let mut epoll_flags = EpollFlags::empty(); |
| if self.capture_enabled { |
| epoll_flags.insert(EpollFlags::EPOLLIN); |
| } |
| if self.output_enabled { |
| epoll_flags.insert(EpollFlags::EPOLLOUT); |
| } |
| if self.events_enabled { |
| epoll_flags.insert(EpollFlags::EPOLLPRI); |
| } |
| |
| let mut epoll_event = EpollEvent::new(epoll_flags, DEVICE_ID); |
| |
| epoll::epoll_ctl( |
| self.epoll.as_raw_fd(), |
| epoll::EpollOp::EpollCtlMod, |
| self.device.as_raw_fd(), |
| Some(&mut epoll_event), |
| ) |
| .map(|_| ()) |
| } |
| |
| fn set_event(&mut self, event: DeviceEvent, enable: bool) -> nix::Result<()> { |
| let event = match event { |
| DeviceEvent::CaptureReady => &mut self.capture_enabled, |
| DeviceEvent::OutputReady => &mut self.output_enabled, |
| DeviceEvent::V4L2Event => &mut self.events_enabled, |
| }; |
| |
| // Do not alter event if it was already in the desired state. |
| if *event == enable { |
| return Ok(()); |
| } |
| |
| *event = enable; |
| self.update_device_registration() |
| } |
| |
| /// Enable listening to (and reporting) `event`. |
| pub fn enable_event(&mut self, event: DeviceEvent) -> nix::Result<()> { |
| self.set_event(event, true) |
| } |
| |
| /// Disable listening to (and reporting of) `event`. |
| pub fn disable_event(&mut self, event: DeviceEvent) -> nix::Result<()> { |
| self.set_event(event, false) |
| } |
| |
| /// Returns whether the given event is currently listened to. |
| pub fn is_event_enabled(&self, event: DeviceEvent) -> bool { |
| match event { |
| DeviceEvent::CaptureReady => self.capture_enabled, |
| DeviceEvent::OutputReady => self.output_enabled, |
| DeviceEvent::V4L2Event => self.events_enabled, |
| } |
| } |
| |
| pub fn poll(&mut self, duration: Option<std::time::Duration>) -> Result<PollEvents, PollError> { |
| let mut events = PollEvents::new(); |
| let duration: isize = match duration { |
| None => -1, |
| Some(d) => d.as_millis() as isize, |
| }; |
| |
| events.nb_events = epoll::epoll_wait(self.epoll.as_raw_fd(), &mut events.events, duration) |
| .map_err(PollError::EPollWait)?; |
| |
| // Update our wake up stats |
| if let Some(wakeup_counter) = &self.poll_wakeups_counter { |
| wakeup_counter.fetch_add(1, Ordering::SeqCst); |
| } |
| |
| // Reset all the wakers that have been signaled. |
| for event in &events.events[0..events.nb_events] { |
| if event.data() <= LAST_WAKER_ID { |
| match self.wakers.get(&(event.data() as u32)) { |
| Some(waker) => waker.reset().map_err(PollError::WakerReset)?, |
| None => warn!("Unregistered waker has been signaled."), |
| } |
| } |
| } |
| |
| for event in &events.events[0..events.nb_events] { |
| if event.data() == DEVICE_ID && event.events().contains(EpollFlags::EPOLLERR) { |
| error!("V4L2 device returned EPOLLERR!"); |
| return Err(PollError::V4L2Device); |
| } |
| } |
| |
| Ok(events) |
| } |
| } |
| |
#[cfg(test)]
mod tests {
    use super::{DeviceEvent::*, PollEvent::*, PollEvents};
    use super::{DEVICE_ID, FIRST_WAKER_ID};
    use nix::sys::epoll::{EpollEvent, EpollFlags};

    /// Fills a `PollEvents` with `slots` (epoll flags and token of each slot)
    /// and checks that iterating over it yields exactly `expected`, in order,
    /// followed by `None`.
    fn assert_yields(slots: &[(EpollFlags, u64)], expected: &[super::PollEvent]) {
        let mut poll_events = PollEvents::new();
        assert!(slots.len() <= poll_events.events.len());

        for (slot, &(flags, token)) in poll_events.events.iter_mut().zip(slots) {
            *slot = EpollEvent::new(flags, token);
        }
        poll_events.nb_events = slots.len();

        for event in expected {
            assert_eq!(poll_events.next().as_ref(), Some(event));
        }
        assert_eq!(poll_events.next(), None);
    }

    #[test]
    fn test_pollevents_iterator() {
        let no_flags = EpollFlags::empty();

        // No event at all
        assert_yields(&[], &[]);

        // Single device events
        assert_yields(
            &[(EpollFlags::EPOLLIN, DEVICE_ID)],
            &[Device(CaptureReady)],
        );
        assert_yields(
            &[(EpollFlags::EPOLLOUT, DEVICE_ID)],
            &[Device(OutputReady)],
        );
        assert_yields(
            &[(EpollFlags::EPOLLPRI, DEVICE_ID)],
            &[Device(V4L2Event)],
        );

        // Multiple device events in one event
        assert_yields(
            &[(EpollFlags::EPOLLPRI | EpollFlags::EPOLLOUT, DEVICE_ID)],
            &[Device(OutputReady), Device(V4L2Event)],
        );

        // Separated device events
        assert_yields(
            &[
                (EpollFlags::EPOLLIN, DEVICE_ID),
                (EpollFlags::EPOLLPRI | EpollFlags::EPOLLOUT, DEVICE_ID),
            ],
            &[
                Device(CaptureReady),
                Device(OutputReady),
                Device(V4L2Event),
            ],
        );

        // Single waker event
        assert_yields(&[(no_flags, FIRST_WAKER_ID)], &[Waker(0)]);

        // Multiple waker events
        assert_yields(
            &[
                (no_flags, FIRST_WAKER_ID + 20),
                (no_flags, FIRST_WAKER_ID + 42),
                (no_flags, FIRST_WAKER_ID),
            ],
            &[Waker(20), Waker(42), Waker(0)],
        );

        // Wakers and device events
        assert_yields(
            &[
                (no_flags, FIRST_WAKER_ID + 20),
                (no_flags, FIRST_WAKER_ID + 42),
                (EpollFlags::EPOLLPRI | EpollFlags::EPOLLIN, DEVICE_ID),
                (no_flags, FIRST_WAKER_ID),
            ],
            &[
                Waker(20),
                Waker(42),
                Device(CaptureReady),
                Device(V4L2Event),
                Waker(0),
            ],
        );
    }
}