| #![warn(rust_2018_idioms)] |
| #![cfg(feature = "full")] |
| |
| use tokio::runtime::Runtime; |
| use tokio::sync::oneshot; |
| use tokio::time::{timeout, Duration}; |
| use tokio_test::{assert_err, assert_ok}; |
| |
| use std::future::Future; |
| use std::pin::Pin; |
| use std::sync::atomic::{AtomicBool, Ordering}; |
| use std::task::{Context, Poll}; |
| use std::thread; |
| |
| mod support { |
| pub(crate) mod mpsc_stream; |
| } |
| |
/// Checks that a task spawned on the current-thread runtime from `rt()` does
/// not run until the runtime is driven with `block_on`.
///
/// The task is spawned, the test thread sleeps for 50 ms, and the receiver is
/// expected to still be empty. The value must then arrive once `block_on`
/// awaits the receiver.
#[test]
fn spawned_task_does_not_progress_without_block_on() {
    let runtime = rt();
    let (sender, mut receiver) = oneshot::channel();

    // The future is only handed to the runtime here; the test expects it not
    // to be polled before `block_on` is called below.
    let producer = async move {
        assert_ok!(sender.send("hello"));
    };
    runtime.spawn(producer);

    // Leave plenty of wall-clock time in which the task could have run.
    let grace_period = Duration::from_millis(50);
    thread::sleep(grace_period);

    // Nothing may have been sent yet.
    assert_err!(receiver.try_recv());

    // Driving the runtime lets the spawned task send its value.
    let received = runtime.block_on(async { assert_ok!(receiver.await) });
    assert_eq!(received, "hello");
}
| |
/// Verifies how many times a spawned task polls its stream.
///
/// The stream is wrapped in an adapter that counts every `poll_next` call.
/// The counter is checked after each runtime tick: once after the task is
/// spawned, once after a single message is sent, and once after the sender is
/// dropped.
#[test]
fn no_extra_poll() {
    use pin_project_lite::pin_project;
    use std::pin::Pin;
    use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};
    use std::sync::Arc;
    use std::task::{Context, Poll};
    use tokio_stream::{Stream, StreamExt};

    pin_project! {
        // Stream adapter that bumps a shared counter on every `poll_next`
        // call and then delegates to the wrapped stream.
        struct PollCounter<S> {
            polls: Arc<AtomicUsize>,
            #[pin]
            inner: S,
        }
    }

    impl<S: Stream> Stream for PollCounter<S> {
        type Item = S::Item;

        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            let projected = self.project();
            projected.polls.fetch_add(1, SeqCst);
            projected.inner.poll_next(cx)
        }
    }

    // The test keeps one handle to the counter; the adapter owns the other.
    let polls = Arc::new(AtomicUsize::new(0));
    let (tx, rx) = support::mpsc_stream::unbounded_channel_stream::<()>();

    // TODO: the heap pin could probably be avoided, but it is harmless.
    let mut counted = Box::pin(PollCounter {
        polls: Arc::clone(&polls),
        inner: rx,
    });

    let runtime = rt();

    // Drives the runtime through a single `yield_now`.
    let tick = || runtime.block_on(async { tokio::task::yield_now().await });

    runtime.spawn(async move { while counted.next().await.is_some() {} });

    // Exactly one poll so far: the initial one.
    tick();
    assert_eq!(polls.load(SeqCst), 1);

    // Two more polls: one that yields the item, then one that returns Pending.
    tx.send(()).unwrap();
    tick();
    assert_eq!(polls.load(SeqCst), 1 + 2);

    // One last poll: the stream yields None after the sender is gone.
    drop(tx);
    tick();
    assert_eq!(polls.load(SeqCst), 1 + 2 + 1);
}
| |
/// Drops a runtime that still owns three pending tasks chained together by
/// oneshot channels.
///
/// One task awaits a future that never completes and owns the first sender,
/// one awaits the first receiver and owns the second sender, and one awaits
/// the second receiver. After a single tick the runtime is dropped; the test
/// passes if neither `unreachable!()` fires and the drop returns.
///
/// NOTE(review): the name suggests this guards against a problem with lock
/// acquisition while tasks are dropped; confirm against the runtime internals.
#[test]
fn acquire_mutex_in_drop() {
    use futures::future::pending;
    use tokio::task;

    let runtime = rt();

    let (first_tx, first_rx) = oneshot::channel();
    let (second_tx, second_rx) = oneshot::channel();

    // End of the chain: waits on the second channel and must never get past it.
    let waits_on_second = async move {
        let _ = second_rx.await;
        unreachable!();
    };
    runtime.spawn(waits_on_second);

    // Middle of the chain: waits on the first channel and owns the second sender.
    let waits_on_first = async move {
        let _ = first_rx.await;
        second_tx.send(()).unwrap();
        unreachable!();
    };
    runtime.spawn(waits_on_first);

    // Head of the chain: awaits a future that never completes, so the first
    // sender is never used.
    let never_notifies = async move {
        pending::<()>().await;
        first_tx.send(()).unwrap();
    };
    runtime.spawn(never_notifies);

    // Tick the loop once.
    runtime.block_on(async {
        task::yield_now().await;
    });

    // Drop the runtime while all three tasks are still pending.
    drop(runtime);
}
| |
/// Checks that a task still owned by the runtime is dropped while a runtime
/// handle is reachable through `Handle::try_current()`.
///
/// A never-ready future whose destructor records whether `try_current()`
/// succeeds is spawned, the runtime is dropped, and the recorded flag must be
/// set.
#[test]
fn drop_tasks_in_context() {
    // Set by the destructor when it observes a current runtime handle.
    static SAW_CONTEXT: AtomicBool = AtomicBool::new(false);

    // Future that is never ready; only its destructor matters to the test.
    struct ProbeOnDrop;

    impl Drop for ProbeOnDrop {
        fn drop(&mut self) {
            let in_context = tokio::runtime::Handle::try_current().is_ok();
            if in_context {
                SAW_CONTEXT.store(true, Ordering::SeqCst);
            }
        }
    }

    impl Future for ProbeOnDrop {
        type Output = ();

        fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
            Poll::Pending
        }
    }

    let runtime = rt();
    runtime.spawn(ProbeOnDrop);
    // Dropping the runtime is what is expected to drop the spawned future.
    drop(runtime);

    assert!(SAW_CONTEXT.load(Ordering::SeqCst));
}
| |
/// Confirms that using `timeout` on a runtime built without any `enable_*`
/// call panics with the message named in `should_panic`.
#[test]
#[should_panic(
    expected = "A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers."
)]
fn timeout_panics_when_no_time_handle() {
    // Deliberately built without `enable_time` / `enable_all`.
    let mut builder = tokio::runtime::Builder::new_current_thread();
    let runtime = builder.build().unwrap();

    runtime.block_on(async {
        // The sender is bound to a named variable so it stays alive for the
        // whole block instead of being dropped immediately.
        let (_keep_alive, receiver) = oneshot::channel::<()>();
        let _ = timeout(Duration::from_millis(20), receiver).await;
    });
}
| |
| fn rt() -> Runtime { |
| tokio::runtime::Builder::new_current_thread() |
| .enable_all() |
| .build() |
| .unwrap() |
| } |