| #![allow(clippy::unit_arg)] |
| |
| use crate::signal::os::{OsExtraData, OsStorage}; |
| use crate::sync::watch; |
| use crate::util::once_cell::OnceCell; |
| |
| use std::ops; |
| use std::sync::atomic::{AtomicBool, Ordering}; |
| |
| pub(crate) type EventId = usize; |
| |
| /// State for a specific event, whether a notification is pending delivery, |
| /// and what listeners are registered. |
| #[derive(Debug)] |
| pub(crate) struct EventInfo { |
| pending: AtomicBool, |
| tx: watch::Sender<()>, |
| } |
| |
| impl Default for EventInfo { |
| fn default() -> Self { |
| let (tx, _rx) = watch::channel(()); |
| |
| Self { |
| pending: AtomicBool::new(false), |
| tx, |
| } |
| } |
| } |
| |
| /// An interface for retrieving the `EventInfo` for a particular eventId. |
| pub(crate) trait Storage { |
| /// Gets the `EventInfo` for `id` if it exists. |
| fn event_info(&self, id: EventId) -> Option<&EventInfo>; |
| |
| /// Invokes `f` once for each defined `EventInfo` in this storage. |
| fn for_each<'a, F>(&'a self, f: F) |
| where |
| F: FnMut(&'a EventInfo); |
| } |
| |
| impl Storage for Vec<EventInfo> { |
| fn event_info(&self, id: EventId) -> Option<&EventInfo> { |
| self.get(id) |
| } |
| |
| fn for_each<'a, F>(&'a self, f: F) |
| where |
| F: FnMut(&'a EventInfo), |
| { |
| self.iter().for_each(f) |
| } |
| } |
| |
| /// An interface for initializing a type. Useful for situations where we cannot |
| /// inject a configured instance in the constructor of another type. |
| pub(crate) trait Init { |
| fn init() -> Self; |
| } |
| |
| /// Manages and distributes event notifications to any registered listeners. |
| /// |
| /// Generic over the underlying storage to allow for domain specific |
| /// optimizations (e.g. eventIds may or may not be contiguous). |
| #[derive(Debug)] |
| pub(crate) struct Registry<S> { |
| storage: S, |
| } |
| |
| impl<S> Registry<S> { |
| fn new(storage: S) -> Self { |
| Self { storage } |
| } |
| } |
| |
| impl<S: Storage> Registry<S> { |
| /// Registers a new listener for `event_id`. |
| fn register_listener(&self, event_id: EventId) -> watch::Receiver<()> { |
| self.storage |
| .event_info(event_id) |
| .unwrap_or_else(|| panic!("invalid event_id: {}", event_id)) |
| .tx |
| .subscribe() |
| } |
| |
| /// Marks `event_id` as having been delivered, without broadcasting it to |
| /// any listeners. |
| fn record_event(&self, event_id: EventId) { |
| if let Some(event_info) = self.storage.event_info(event_id) { |
| event_info.pending.store(true, Ordering::SeqCst) |
| } |
| } |
| |
| /// Broadcasts all previously recorded events to their respective listeners. |
| /// |
| /// Returns `true` if an event was delivered to at least one listener. |
| fn broadcast(&self) -> bool { |
| let mut did_notify = false; |
| self.storage.for_each(|event_info| { |
| // Any signal of this kind arrived since we checked last? |
| if !event_info.pending.swap(false, Ordering::SeqCst) { |
| return; |
| } |
| |
| // Ignore errors if there are no listeners |
| if event_info.tx.send(()).is_ok() { |
| did_notify = true; |
| } |
| }); |
| |
| did_notify |
| } |
| } |
| |
| pub(crate) struct Globals { |
| extra: OsExtraData, |
| registry: Registry<OsStorage>, |
| } |
| |
| impl ops::Deref for Globals { |
| type Target = OsExtraData; |
| |
| fn deref(&self) -> &Self::Target { |
| &self.extra |
| } |
| } |
| |
| impl Globals { |
| /// Registers a new listener for `event_id`. |
| pub(crate) fn register_listener(&self, event_id: EventId) -> watch::Receiver<()> { |
| self.registry.register_listener(event_id) |
| } |
| |
| /// Marks `event_id` as having been delivered, without broadcasting it to |
| /// any listeners. |
| pub(crate) fn record_event(&self, event_id: EventId) { |
| self.registry.record_event(event_id); |
| } |
| |
| /// Broadcasts all previously recorded events to their respective listeners. |
| /// |
| /// Returns `true` if an event was delivered to at least one listener. |
| pub(crate) fn broadcast(&self) -> bool { |
| self.registry.broadcast() |
| } |
| |
| #[cfg(unix)] |
| pub(crate) fn storage(&self) -> &OsStorage { |
| &self.registry.storage |
| } |
| } |
| |
| fn globals_init() -> Globals |
| where |
| OsExtraData: 'static + Send + Sync + Init, |
| OsStorage: 'static + Send + Sync + Init, |
| { |
| Globals { |
| extra: OsExtraData::init(), |
| registry: Registry::new(OsStorage::init()), |
| } |
| } |
| |
| pub(crate) fn globals() -> &'static Globals |
| where |
| OsExtraData: 'static + Send + Sync + Init, |
| OsStorage: 'static + Send + Sync + Init, |
| { |
| static GLOBALS: OnceCell<Globals> = OnceCell::new(); |
| |
| GLOBALS.get(globals_init) |
| } |
| |
| #[cfg(all(test, not(loom)))] |
| mod tests { |
| use super::*; |
| use crate::runtime::{self, Runtime}; |
| use crate::sync::{oneshot, watch}; |
| |
| use futures::future; |
| |
| #[test] |
| fn smoke() { |
| let rt = rt(); |
| rt.block_on(async move { |
| let registry = Registry::new(vec![ |
| EventInfo::default(), |
| EventInfo::default(), |
| EventInfo::default(), |
| ]); |
| |
| let first = registry.register_listener(0); |
| let second = registry.register_listener(1); |
| let third = registry.register_listener(2); |
| |
| let (fire, wait) = oneshot::channel(); |
| |
| crate::spawn(async { |
| wait.await.expect("wait failed"); |
| |
| // Record some events which should get coalesced |
| registry.record_event(0); |
| registry.record_event(0); |
| registry.record_event(1); |
| registry.record_event(1); |
| registry.broadcast(); |
| |
| // Yield so the previous broadcast can get received |
| // |
| // This yields many times since the block_on task is only polled every 61 |
| // ticks. |
| for _ in 0..100 { |
| crate::task::yield_now().await; |
| } |
| |
| // Send subsequent signal |
| registry.record_event(0); |
| registry.broadcast(); |
| |
| drop(registry); |
| }); |
| |
| let _ = fire.send(()); |
| let all = future::join3(collect(first), collect(second), collect(third)); |
| |
| let (first_results, second_results, third_results) = all.await; |
| assert_eq!(2, first_results.len()); |
| assert_eq!(1, second_results.len()); |
| assert_eq!(0, third_results.len()); |
| }); |
| } |
| |
| #[test] |
| #[should_panic = "invalid event_id: 1"] |
| fn register_panics_on_invalid_input() { |
| let registry = Registry::new(vec![EventInfo::default()]); |
| |
| registry.register_listener(1); |
| } |
| |
| #[test] |
| fn record_invalid_event_does_nothing() { |
| let registry = Registry::new(vec![EventInfo::default()]); |
| registry.record_event(1302); |
| } |
| |
| #[test] |
| fn broadcast_returns_if_at_least_one_event_fired() { |
| let registry = Registry::new(vec![EventInfo::default(), EventInfo::default()]); |
| |
| registry.record_event(0); |
| assert!(!registry.broadcast()); |
| |
| let first = registry.register_listener(0); |
| let second = registry.register_listener(1); |
| |
| registry.record_event(0); |
| assert!(registry.broadcast()); |
| |
| drop(first); |
| registry.record_event(0); |
| assert!(!registry.broadcast()); |
| |
| drop(second); |
| } |
| |
| fn rt() -> Runtime { |
| runtime::Builder::new_current_thread() |
| .enable_time() |
| .build() |
| .unwrap() |
| } |
| |
| async fn collect(mut rx: watch::Receiver<()>) -> Vec<()> { |
| let mut ret = vec![]; |
| |
| while let Ok(v) = rx.changed().await { |
| ret.push(v); |
| } |
| |
| ret |
| } |
| } |