| use std::future::Future; | 
 | use std::pin::Pin; | 
 | use std::sync::atomic::{AtomicUsize, Ordering}; | 
 | use std::task::{Context, Poll}; | 
 | use std::thread; | 
 | use std::time::Duration; | 
 |  | 
 | use async_task::Runnable; | 
 | use atomic_waker::AtomicWaker; | 
 | use easy_parallel::Parallel; | 
 |  | 
 | // Creates a future with event counters. | 
 | // | 
 | // Usage: `future!(f, get_waker, POLL, DROP)` | 
 | // | 
 | // The future `f` always sleeps for 200 ms and returns `Poll::Pending`. | 
 | // When it gets polled, `POLL` is incremented. | 
 | // When it gets dropped, `DROP` is incremented. | 
 | // | 
 | // Every time the future is run, it stores the waker into a global variable. | 
 | // This waker can be extracted using the `get_waker()` function. | 
 | macro_rules! future { | 
 |     ($name:pat, $get_waker:pat, $poll:ident, $drop:ident) => { | 
 |         static $poll: AtomicUsize = AtomicUsize::new(0); | 
 |         static $drop: AtomicUsize = AtomicUsize::new(0); | 
 |         static WAKER: AtomicWaker = AtomicWaker::new(); | 
 |  | 
 |         let ($name, $get_waker) = { | 
 |             struct Fut(Box<i32>); | 
 |  | 
 |             impl Future for Fut { | 
 |                 type Output = (); | 
 |  | 
 |                 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | 
 |                     WAKER.register(cx.waker()); | 
 |                     $poll.fetch_add(1, Ordering::SeqCst); | 
 |                     thread::sleep(ms(400)); | 
 |                     Poll::Pending | 
 |                 } | 
 |             } | 
 |  | 
 |             impl Drop for Fut { | 
 |                 fn drop(&mut self) { | 
 |                     $drop.fetch_add(1, Ordering::SeqCst); | 
 |                 } | 
 |             } | 
 |  | 
 |             (Fut(Box::new(0)), || WAKER.take().unwrap()) | 
 |         }; | 
 |     }; | 
 | } | 
 |  | 
 | // Creates a schedule function with event counters. | 
 | // | 
 | // Usage: `schedule!(s, chan, SCHED, DROP)` | 
 | // | 
 | // The schedule function `s` pushes the task into `chan`. | 
 | // When it gets invoked, `SCHED` is incremented. | 
 | // When it gets dropped, `DROP` is incremented. | 
 | // | 
 | // Receiver `chan` extracts the task when it is scheduled. | 
 | macro_rules! schedule { | 
 |     ($name:pat, $chan:pat, $sched:ident, $drop:ident) => { | 
 |         static $drop: AtomicUsize = AtomicUsize::new(0); | 
 |         static $sched: AtomicUsize = AtomicUsize::new(0); | 
 |  | 
 |         let ($name, $chan) = { | 
 |             let (s, r) = flume::unbounded(); | 
 |  | 
 |             struct Guard(Box<i32>); | 
 |  | 
 |             impl Drop for Guard { | 
 |                 fn drop(&mut self) { | 
 |                     $drop.fetch_add(1, Ordering::SeqCst); | 
 |                 } | 
 |             } | 
 |  | 
 |             let guard = Guard(Box::new(0)); | 
 |             let sched = move |runnable: Runnable| { | 
 |                 let _ = &guard; | 
 |                 $sched.fetch_add(1, Ordering::SeqCst); | 
 |                 s.send(runnable).unwrap(); | 
 |             }; | 
 |  | 
 |             (sched, r) | 
 |         }; | 
 |     }; | 
 | } | 
 |  | 
 | fn ms(ms: u64) -> Duration { | 
 |     Duration::from_millis(ms) | 
 | } | 
 |  | 
 | #[test] | 
 | fn wake_during_run() { | 
 |     future!(f, get_waker, POLL, DROP_F); | 
 |     schedule!(s, chan, SCHEDULE, DROP_S); | 
 |     let (runnable, _task) = async_task::spawn(f, s); | 
 |  | 
 |     runnable.run(); | 
 |     let waker = get_waker(); | 
 |     waker.wake_by_ref(); | 
 |     let runnable = chan.recv().unwrap(); | 
 |  | 
 |     Parallel::new() | 
 |         .add(|| { | 
 |             runnable.run(); | 
 |             assert_eq!(POLL.load(Ordering::SeqCst), 2); | 
 |             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 2); | 
 |             assert_eq!(DROP_F.load(Ordering::SeqCst), 0); | 
 |             assert_eq!(DROP_S.load(Ordering::SeqCst), 0); | 
 |             assert_eq!(chan.len(), 1); | 
 |         }) | 
 |         .add(|| { | 
 |             thread::sleep(ms(200)); | 
 |  | 
 |             waker.wake_by_ref(); | 
 |             assert_eq!(POLL.load(Ordering::SeqCst), 2); | 
 |             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); | 
 |             assert_eq!(DROP_F.load(Ordering::SeqCst), 0); | 
 |             assert_eq!(DROP_S.load(Ordering::SeqCst), 0); | 
 |             assert_eq!(chan.len(), 0); | 
 |  | 
 |             thread::sleep(ms(400)); | 
 |  | 
 |             assert_eq!(POLL.load(Ordering::SeqCst), 2); | 
 |             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 2); | 
 |             assert_eq!(DROP_F.load(Ordering::SeqCst), 0); | 
 |             assert_eq!(DROP_S.load(Ordering::SeqCst), 0); | 
 |             assert_eq!(chan.len(), 1); | 
 |         }) | 
 |         .run(); | 
 |  | 
 |     chan.recv().unwrap(); | 
 |     drop(get_waker()); | 
 | } | 
 |  | 
 | #[test] | 
 | fn cancel_during_run() { | 
 |     future!(f, get_waker, POLL, DROP_F); | 
 |     schedule!(s, chan, SCHEDULE, DROP_S); | 
 |     let (runnable, task) = async_task::spawn(f, s); | 
 |  | 
 |     runnable.run(); | 
 |     let waker = get_waker(); | 
 |     waker.wake(); | 
 |     let runnable = chan.recv().unwrap(); | 
 |  | 
 |     Parallel::new() | 
 |         .add(|| { | 
 |             runnable.run(); | 
 |             drop(get_waker()); | 
 |             assert_eq!(POLL.load(Ordering::SeqCst), 2); | 
 |             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); | 
 |             assert_eq!(DROP_F.load(Ordering::SeqCst), 1); | 
 |             assert_eq!(DROP_S.load(Ordering::SeqCst), 1); | 
 |             assert_eq!(chan.len(), 0); | 
 |         }) | 
 |         .add(|| { | 
 |             thread::sleep(ms(200)); | 
 |  | 
 |             drop(task); | 
 |             assert_eq!(POLL.load(Ordering::SeqCst), 2); | 
 |             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); | 
 |             assert_eq!(DROP_F.load(Ordering::SeqCst), 0); | 
 |             assert_eq!(DROP_S.load(Ordering::SeqCst), 0); | 
 |             assert_eq!(chan.len(), 0); | 
 |  | 
 |             thread::sleep(ms(400)); | 
 |  | 
 |             assert_eq!(POLL.load(Ordering::SeqCst), 2); | 
 |             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); | 
 |             assert_eq!(DROP_F.load(Ordering::SeqCst), 1); | 
 |             assert_eq!(DROP_S.load(Ordering::SeqCst), 1); | 
 |             assert_eq!(chan.len(), 0); | 
 |         }) | 
 |         .run(); | 
 | } | 
 |  | 
 | #[test] | 
 | fn wake_and_cancel_during_run() { | 
 |     future!(f, get_waker, POLL, DROP_F); | 
 |     schedule!(s, chan, SCHEDULE, DROP_S); | 
 |     let (runnable, task) = async_task::spawn(f, s); | 
 |  | 
 |     runnable.run(); | 
 |     let waker = get_waker(); | 
 |     waker.wake_by_ref(); | 
 |     let runnable = chan.recv().unwrap(); | 
 |  | 
 |     Parallel::new() | 
 |         .add(|| { | 
 |             runnable.run(); | 
 |             drop(get_waker()); | 
 |             assert_eq!(POLL.load(Ordering::SeqCst), 2); | 
 |             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); | 
 |             assert_eq!(DROP_F.load(Ordering::SeqCst), 1); | 
 |             assert_eq!(DROP_S.load(Ordering::SeqCst), 1); | 
 |             assert_eq!(chan.len(), 0); | 
 |         }) | 
 |         .add(|| { | 
 |             thread::sleep(ms(200)); | 
 |  | 
 |             waker.wake(); | 
 |             assert_eq!(POLL.load(Ordering::SeqCst), 2); | 
 |             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); | 
 |             assert_eq!(DROP_F.load(Ordering::SeqCst), 0); | 
 |             assert_eq!(DROP_S.load(Ordering::SeqCst), 0); | 
 |             assert_eq!(chan.len(), 0); | 
 |  | 
 |             drop(task); | 
 |             assert_eq!(POLL.load(Ordering::SeqCst), 2); | 
 |             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); | 
 |             assert_eq!(DROP_F.load(Ordering::SeqCst), 0); | 
 |             assert_eq!(DROP_S.load(Ordering::SeqCst), 0); | 
 |             assert_eq!(chan.len(), 0); | 
 |  | 
 |             thread::sleep(ms(400)); | 
 |  | 
 |             assert_eq!(POLL.load(Ordering::SeqCst), 2); | 
 |             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); | 
 |             assert_eq!(DROP_F.load(Ordering::SeqCst), 1); | 
 |             assert_eq!(DROP_S.load(Ordering::SeqCst), 1); | 
 |             assert_eq!(chan.len(), 0); | 
 |         }) | 
 |         .run(); | 
 | } | 
 |  | 
 | #[test] | 
 | fn cancel_and_wake_during_run() { | 
 |     future!(f, get_waker, POLL, DROP_F); | 
 |     schedule!(s, chan, SCHEDULE, DROP_S); | 
 |     let (runnable, task) = async_task::spawn(f, s); | 
 |  | 
 |     runnable.run(); | 
 |     let waker = get_waker(); | 
 |     waker.wake_by_ref(); | 
 |     let runnable = chan.recv().unwrap(); | 
 |  | 
 |     Parallel::new() | 
 |         .add(|| { | 
 |             runnable.run(); | 
 |             drop(get_waker()); | 
 |             assert_eq!(POLL.load(Ordering::SeqCst), 2); | 
 |             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); | 
 |             assert_eq!(DROP_F.load(Ordering::SeqCst), 1); | 
 |             assert_eq!(DROP_S.load(Ordering::SeqCst), 1); | 
 |             assert_eq!(chan.len(), 0); | 
 |         }) | 
 |         .add(|| { | 
 |             thread::sleep(ms(200)); | 
 |  | 
 |             drop(task); | 
 |             assert_eq!(POLL.load(Ordering::SeqCst), 2); | 
 |             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); | 
 |             assert_eq!(DROP_F.load(Ordering::SeqCst), 0); | 
 |             assert_eq!(DROP_S.load(Ordering::SeqCst), 0); | 
 |             assert_eq!(chan.len(), 0); | 
 |  | 
 |             waker.wake(); | 
 |             assert_eq!(POLL.load(Ordering::SeqCst), 2); | 
 |             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); | 
 |             assert_eq!(DROP_F.load(Ordering::SeqCst), 0); | 
 |             assert_eq!(DROP_S.load(Ordering::SeqCst), 0); | 
 |             assert_eq!(chan.len(), 0); | 
 |  | 
 |             thread::sleep(ms(400)); | 
 |  | 
 |             assert_eq!(POLL.load(Ordering::SeqCst), 2); | 
 |             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); | 
 |             assert_eq!(DROP_F.load(Ordering::SeqCst), 1); | 
 |             assert_eq!(DROP_S.load(Ordering::SeqCst), 1); | 
 |             assert_eq!(chan.len(), 0); | 
 |         }) | 
 |         .run(); | 
 | } | 
 |  | 
 | #[test] | 
 | fn drop_last_waker() { | 
 |     future!(f, get_waker, POLL, DROP_F); | 
 |     schedule!(s, chan, SCHEDULE, DROP_S); | 
 |     let (runnable, task) = async_task::spawn(f, s); | 
 |  | 
 |     runnable.run(); | 
 |     let waker = get_waker(); | 
 |  | 
 |     task.detach(); | 
 |     assert_eq!(POLL.load(Ordering::SeqCst), 1); | 
 |     assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); | 
 |     assert_eq!(DROP_F.load(Ordering::SeqCst), 0); | 
 |     assert_eq!(DROP_S.load(Ordering::SeqCst), 0); | 
 |     assert_eq!(chan.len(), 0); | 
 |  | 
 |     drop(waker); | 
 |     assert_eq!(POLL.load(Ordering::SeqCst), 1); | 
 |     assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); | 
 |     assert_eq!(DROP_F.load(Ordering::SeqCst), 0); | 
 |     assert_eq!(DROP_S.load(Ordering::SeqCst), 0); | 
 |     assert_eq!(chan.len(), 1); | 
 |  | 
 |     chan.recv().unwrap().run(); | 
 |     assert_eq!(POLL.load(Ordering::SeqCst), 1); | 
 |     assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); | 
 |     assert_eq!(DROP_F.load(Ordering::SeqCst), 1); | 
 |     assert_eq!(DROP_S.load(Ordering::SeqCst), 1); | 
 |     assert_eq!(chan.len(), 0); | 
 | } | 
 |  | 
 | #[test] | 
 | fn cancel_last_task() { | 
 |     future!(f, get_waker, POLL, DROP_F); | 
 |     schedule!(s, chan, SCHEDULE, DROP_S); | 
 |     let (runnable, task) = async_task::spawn(f, s); | 
 |  | 
 |     runnable.run(); | 
 |     drop(get_waker()); | 
 |     assert_eq!(POLL.load(Ordering::SeqCst), 1); | 
 |     assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); | 
 |     assert_eq!(DROP_F.load(Ordering::SeqCst), 0); | 
 |     assert_eq!(DROP_S.load(Ordering::SeqCst), 0); | 
 |     assert_eq!(chan.len(), 0); | 
 |  | 
 |     drop(task); | 
 |     assert_eq!(POLL.load(Ordering::SeqCst), 1); | 
 |     assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); | 
 |     assert_eq!(DROP_F.load(Ordering::SeqCst), 0); | 
 |     assert_eq!(DROP_S.load(Ordering::SeqCst), 0); | 
 |     assert_eq!(chan.len(), 1); | 
 |  | 
 |     chan.recv().unwrap().run(); | 
 |     assert_eq!(POLL.load(Ordering::SeqCst), 1); | 
 |     assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); | 
 |     assert_eq!(DROP_F.load(Ordering::SeqCst), 1); | 
 |     assert_eq!(DROP_S.load(Ordering::SeqCst), 1); | 
 |     assert_eq!(chan.len(), 0); | 
 | } | 
 |  | 
 | #[test] | 
 | fn drop_last_task() { | 
 |     future!(f, get_waker, POLL, DROP_F); | 
 |     schedule!(s, chan, SCHEDULE, DROP_S); | 
 |     let (runnable, task) = async_task::spawn(f, s); | 
 |  | 
 |     runnable.run(); | 
 |     drop(get_waker()); | 
 |     assert_eq!(POLL.load(Ordering::SeqCst), 1); | 
 |     assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); | 
 |     assert_eq!(DROP_F.load(Ordering::SeqCst), 0); | 
 |     assert_eq!(DROP_S.load(Ordering::SeqCst), 0); | 
 |     assert_eq!(chan.len(), 0); | 
 |  | 
 |     task.detach(); | 
 |     assert_eq!(POLL.load(Ordering::SeqCst), 1); | 
 |     assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); | 
 |     assert_eq!(DROP_F.load(Ordering::SeqCst), 0); | 
 |     assert_eq!(DROP_S.load(Ordering::SeqCst), 0); | 
 |     assert_eq!(chan.len(), 1); | 
 |  | 
 |     chan.recv().unwrap().run(); | 
 |     assert_eq!(POLL.load(Ordering::SeqCst), 1); | 
 |     assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); | 
 |     assert_eq!(DROP_F.load(Ordering::SeqCst), 1); | 
 |     assert_eq!(DROP_S.load(Ordering::SeqCst), 1); | 
 |     assert_eq!(chan.len(), 0); | 
 | } |