| #![allow(clippy::needless_range_loop)] |
| #![warn(rust_2018_idioms)] |
| #![cfg(feature = "full")] |
| |
| // Tests to run on both current-thread & multi-thread runtime variants. |
| |
// Generates one copy of the test suite per scheduler flavor. Each generated
// module provides the two items the tests rely on:
// - `NUM_WORKERS`: how many worker threads the runtime flavor uses, and
// - `rt()`: a constructor returning that flavor of runtime wrapped in `Arc`.
//
// Consistency fix: use `target_os = "wasi"` like the rest of this file
// instead of the mixed-in `tokio_wasi` predicate.
macro_rules! rt_test {
    ($($t:tt)*) => {
        mod current_thread_scheduler {
            $($t)*

            // cfg-gated so the const is not reported unused on Wasi, where
            // the tests that read it are compiled out.
            #[cfg(not(target_os="wasi"))]
            const NUM_WORKERS: usize = 1;

            fn rt() -> Arc<Runtime> {
                tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .unwrap()
                    .into()
            }
        }

        #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
        mod threaded_scheduler_4_threads {
            $($t)*

            const NUM_WORKERS: usize = 4;

            fn rt() -> Arc<Runtime> {
                tokio::runtime::Builder::new_multi_thread()
                    .worker_threads(4)
                    .enable_all()
                    .build()
                    .unwrap()
                    .into()
            }
        }

        #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
        mod threaded_scheduler_1_thread {
            $($t)*

            const NUM_WORKERS: usize = 1;

            fn rt() -> Arc<Runtime> {
                tokio::runtime::Builder::new_multi_thread()
                    .worker_threads(1)
                    .enable_all()
                    .build()
                    .unwrap()
                    .into()
            }
        }
    }
}
| |
#[test]
fn send_sync_bound() {
    // Compile-time check: `Runtime` must be shareable across threads.
    use tokio::runtime::Runtime;
    fn require_send_sync<T: Send + Sync>() {}

    require_send_sync::<Runtime>();
}
| |
| rt_test! { |
| #[cfg(not(target_os="wasi"))] |
| use tokio::net::{TcpListener, TcpStream}; |
| #[cfg(not(target_os="wasi"))] |
| use tokio::io::{AsyncReadExt, AsyncWriteExt}; |
| |
| use tokio::runtime::Runtime; |
| use tokio::sync::oneshot; |
| use tokio::{task, time}; |
| |
| #[cfg(not(target_os="wasi"))] |
| use tokio_test::assert_err; |
| use tokio_test::assert_ok; |
| |
| use futures::future::poll_fn; |
| use std::future::Future; |
| use std::pin::Pin; |
| |
| #[cfg(not(target_os="wasi"))] |
| use std::sync::mpsc; |
| |
| use std::sync::Arc; |
| use std::task::{Context, Poll}; |
| |
| #[cfg(not(target_os="wasi"))] |
| use std::thread; |
| use std::time::{Duration, Instant}; |
| |
| #[test] |
| fn block_on_sync() { |
| let rt = rt(); |
| |
| let mut win = false; |
| rt.block_on(async { |
| win = true; |
| }); |
| |
| assert!(win); |
| } |
| |
| |
    // `block_on` must be able to wait for a future that is completed from a
    // separate OS thread (the driver keeps parking/unparking correctly).
    #[cfg(not(target_os="wasi"))]
    #[test]
    fn block_on_async() {
        let rt = rt();

        let out = rt.block_on(async {
            let (tx, rx) = oneshot::channel();

            // Complete the oneshot from outside the runtime after a delay.
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(50));
                tx.send("ZOMG").unwrap();
            });

            assert_ok!(rx.await)
        });

        assert_eq!(out, "ZOMG");
    }
| |
| #[test] |
| fn spawn_one_bg() { |
| let rt = rt(); |
| |
| let out = rt.block_on(async { |
| let (tx, rx) = oneshot::channel(); |
| |
| tokio::spawn(async move { |
| tx.send("ZOMG").unwrap(); |
| }); |
| |
| assert_ok!(rx.await) |
| }); |
| |
| assert_eq!(out, "ZOMG"); |
| } |
| |
| #[test] |
| fn spawn_one_join() { |
| let rt = rt(); |
| |
| let out = rt.block_on(async { |
| let (tx, rx) = oneshot::channel(); |
| |
| let handle = tokio::spawn(async move { |
| tx.send("ZOMG").unwrap(); |
| "DONE" |
| }); |
| |
| let msg = assert_ok!(rx.await); |
| |
| let out = assert_ok!(handle.await); |
| assert_eq!(out, "DONE"); |
| |
| msg |
| }); |
| |
| assert_eq!(out, "ZOMG"); |
| } |
| |
| #[test] |
| fn spawn_two() { |
| let rt = rt(); |
| |
| let out = rt.block_on(async { |
| let (tx1, rx1) = oneshot::channel(); |
| let (tx2, rx2) = oneshot::channel(); |
| |
| tokio::spawn(async move { |
| assert_ok!(tx1.send("ZOMG")); |
| }); |
| |
| tokio::spawn(async move { |
| let msg = assert_ok!(rx1.await); |
| assert_ok!(tx2.send(msg)); |
| }); |
| |
| assert_ok!(rx2.await) |
| }); |
| |
| assert_eq!(out, "ZOMG"); |
| } |
| |
    // Stress: spawn `ITER` tasks from the `block_on` future and complete them
    // from a foreign thread; every message must arrive exactly once and in
    // the expected value range.
    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn spawn_many_from_block_on() {
        use tokio::sync::mpsc;

        const ITER: usize = 200;

        let rt = rt();

        let out = rt.block_on(async {
            let (done_tx, mut done_rx) = mpsc::unbounded_channel();

            let mut txs = (0..ITER)
                .map(|i| {
                    let (tx, rx) = oneshot::channel();
                    let done_tx = done_tx.clone();

                    tokio::spawn(async move {
                        let msg = assert_ok!(rx.await);
                        assert_eq!(i, msg);
                        assert_ok!(done_tx.send(msg));
                    });

                    tx
                })
                .collect::<Vec<_>>();

            // Drop the original sender so `recv()` returns `None` once every
            // task has finished and dropped its clone.
            drop(done_tx);

            thread::spawn(move || {
                for (i, tx) in txs.drain(..).enumerate() {
                    assert_ok!(tx.send(i));
                }
            });

            let mut out = vec![];
            while let Some(i) = done_rx.recv().await {
                out.push(i);
            }

            out.sort_unstable();
            out
        });

        assert_eq!(ITER, out.len());

        for i in 0..ITER {
            assert_eq!(i, out[i]);
        }
    }
| |
    // Same as `spawn_many_from_block_on`, but the spawning happens from
    // inside a spawned task instead of the `block_on` future.
    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn spawn_many_from_task() {
        use tokio::sync::mpsc;

        const ITER: usize = 500;

        let rt = rt();

        let out = rt.block_on(async {
            tokio::spawn(async move {
                let (done_tx, mut done_rx) = mpsc::unbounded_channel();

                let mut txs = (0..ITER)
                    .map(|i| {
                        let (tx, rx) = oneshot::channel();
                        let done_tx = done_tx.clone();

                        tokio::spawn(async move {
                            let msg = assert_ok!(rx.await);
                            assert_eq!(i, msg);
                            assert_ok!(done_tx.send(msg));
                        });

                        tx
                    })
                    .collect::<Vec<_>>();

                // Closing the original sender lets `recv()` return `None`
                // once all task-held clones are gone.
                drop(done_tx);

                thread::spawn(move || {
                    for (i, tx) in txs.drain(..).enumerate() {
                        assert_ok!(tx.send(i));
                    }
                });

                let mut out = vec![];
                while let Some(i) = done_rx.recv().await {
                    out.push(i);
                }

                out.sort_unstable();
                out
            }).await.unwrap()
        });

        assert_eq!(ITER, out.len());

        for i in 0..ITER {
            assert_eq!(i, out[i]);
        }
    }
| |
    // Spawning via `Handle::block_on` must work; the returned `JoinHandle`
    // is then awaited from the runtime's own `block_on`.
    #[test]
    fn spawn_one_from_block_on_called_on_handle() {
        let rt = rt();
        let (tx, rx) = oneshot::channel();

        #[allow(clippy::async_yields_async)]
        let handle = rt.handle().block_on(async {
            tokio::spawn(async move {
                tx.send("ZOMG").unwrap();
                "DONE"
            })
        });

        let out = rt.block_on(async {
            let msg = assert_ok!(rx.await);

            let out = assert_ok!(handle.await);
            assert_eq!(out, "DONE");

            msg
        });

        assert_eq!(out, "ZOMG");
    }
| |
| #[test] |
| fn spawn_await_chain() { |
| let rt = rt(); |
| |
| let out = rt.block_on(async { |
| assert_ok!(tokio::spawn(async { |
| assert_ok!(tokio::spawn(async { |
| "hello" |
| }).await) |
| }).await) |
| }); |
| |
| assert_eq!(out, "hello"); |
| } |
| |
    // Dropping the runtime must drop still-pending tasks, releasing whatever
    // they own (observed via the Arc's strong count).
    #[test]
    fn outstanding_tasks_dropped() {
        let rt = rt();

        let cnt = Arc::new(());

        rt.block_on(async {
            let cnt = cnt.clone();

            // This task never completes; only runtime shutdown can free it.
            tokio::spawn(poll_fn(move |_| {
                assert_eq!(2, Arc::strong_count(&cnt));
                Poll::<()>::Pending
            }));
        });

        assert_eq!(2, Arc::strong_count(&cnt));

        drop(rt);

        assert_eq!(1, Arc::strong_count(&cnt));
    }
| |
| #[test] |
| #[should_panic] |
| fn nested_rt() { |
| let rt1 = rt(); |
| let rt2 = rt(); |
| |
| rt1.block_on(async { rt2.block_on(async { "hello" }) }); |
| } |
| |
| #[test] |
| fn create_rt_in_block_on() { |
| let rt1 = rt(); |
| let rt2 = rt1.block_on(async { rt() }); |
| let out = rt2.block_on(async { "ZOMG" }); |
| |
| assert_eq!(out, "ZOMG"); |
| } |
| |
    // `block_on` must still complete while a spawned task keeps the
    // scheduler permanently busy (no starvation of the root future).
    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn complete_block_on_under_load() {
        let rt = rt();

        rt.block_on(async {
            let (tx, rx) = oneshot::channel();

            // Spin hard
            tokio::spawn(async {
                loop {
                    yield_once().await;
                }
            });

            thread::spawn(move || {
                thread::sleep(Duration::from_millis(50));
                assert_ok!(tx.send(()));
            });

            assert_ok!(rx.await);
        });
    }
| |
    // A spawned task must still complete while another task keeps the
    // scheduler permanently busy.
    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn complete_task_under_load() {
        let rt = rt();

        rt.block_on(async {
            let (tx1, rx1) = oneshot::channel();
            let (tx2, rx2) = oneshot::channel();

            // Spin hard
            tokio::spawn(async {
                loop {
                    yield_once().await;
                }
            });

            thread::spawn(move || {
                thread::sleep(Duration::from_millis(50));
                assert_ok!(tx1.send(()));
            });

            tokio::spawn(async move {
                assert_ok!(rx1.await);
                assert_ok!(tx2.send(()));
            });

            assert_ok!(rx2.await);
        });
    }
| |
    // Spawning onto an idle (parked) runtime from another thread must wake
    // the runtime so the task runs.
    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn spawn_from_other_thread_idle() {
        let rt = rt();
        let handle = rt.clone();

        let (tx, rx) = oneshot::channel();

        thread::spawn(move || {
            // Give the runtime time to park before spawning onto it.
            thread::sleep(Duration::from_millis(50));

            handle.spawn(async move {
                assert_ok!(tx.send(()));
            });
        });

        rt.block_on(async move {
            assert_ok!(rx.await);
        });
    }
| |
    // Spawning from a foreign thread must also succeed while the runtime is
    // saturated by a busy-looping task.
    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn spawn_from_other_thread_under_load() {
        let rt = rt();
        let handle = rt.clone();

        let (tx, rx) = oneshot::channel();

        thread::spawn(move || {
            handle.spawn(async move {
                assert_ok!(tx.send(()));
            });
        });

        rt.block_on(async move {
            // Spin hard
            tokio::spawn(async {
                loop {
                    yield_once().await;
                }
            });

            assert_ok!(rx.await);
        });
    }
| |
| #[test] |
| fn sleep_at_root() { |
| let rt = rt(); |
| |
| let now = Instant::now(); |
| let dur = Duration::from_millis(50); |
| |
| rt.block_on(async move { |
| time::sleep(dur).await; |
| }); |
| |
| assert!(now.elapsed() >= dur); |
| } |
| |
| #[test] |
| fn sleep_in_spawn() { |
| let rt = rt(); |
| |
| let now = Instant::now(); |
| let dur = Duration::from_millis(50); |
| |
| rt.block_on(async move { |
| let (tx, rx) = oneshot::channel(); |
| |
| tokio::spawn(async move { |
| time::sleep(dur).await; |
| assert_ok!(tx.send(())); |
| }); |
| |
| assert_ok!(rx.await); |
| }); |
| |
| assert!(now.elapsed() >= dur); |
| } |
| |
    // Basic TCP accept/connect must work when driven from `block_on`.
    #[cfg(not(target_os="wasi"))] // Wasi does not support bind
    #[test]
    fn block_on_socket() {
        let rt = rt();

        rt.block_on(async move {
            let (tx, rx) = oneshot::channel();

            // Port 0: let the OS pick a free port.
            let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
            let addr = listener.local_addr().unwrap();

            tokio::spawn(async move {
                let _ = listener.accept().await;
                tx.send(()).unwrap();
            });

            TcpStream::connect(&addr).await.unwrap();
            rx.await.unwrap();
        });
    }
| |
| #[cfg(not(target_os="wasi"))] // Wasi does not support threads |
| #[test] |
| fn spawn_from_blocking() { |
| let rt = rt(); |
| |
| let out = rt.block_on(async move { |
| let inner = assert_ok!(tokio::task::spawn_blocking(|| { |
| tokio::spawn(async move { "hello" }) |
| }).await); |
| |
| assert_ok!(inner.await) |
| }); |
| |
| assert_eq!(out, "hello") |
| } |
| |
| #[cfg(not(target_os="wasi"))] // Wasi does not support threads |
| #[test] |
| fn spawn_blocking_from_blocking() { |
| let rt = rt(); |
| |
| let out = rt.block_on(async move { |
| let inner = assert_ok!(tokio::task::spawn_blocking(|| { |
| tokio::task::spawn_blocking(|| "hello") |
| }).await); |
| |
| assert_ok!(inner.await) |
| }); |
| |
| assert_eq!(out, "hello") |
| } |
| |
    // A Tokio timer must still fire when the sleep future is polled by a
    // foreign (non-Tokio) executor running on a `spawn_blocking` thread.
    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn sleep_from_blocking() {
        let rt = rt();

        rt.block_on(async move {
            assert_ok!(tokio::task::spawn_blocking(|| {
                let now = std::time::Instant::now();
                let dur = Duration::from_millis(1);

                // use the futures' block_on fn to make sure we aren't setting
                // any Tokio context
                futures::executor::block_on(async {
                    tokio::time::sleep(dur).await;
                });

                assert!(now.elapsed() >= dur);
            }).await);
        });
    }
| |
    // A TCP connect must succeed when driven by a foreign executor on a
    // `spawn_blocking` thread.
    #[cfg(not(target_os="wasi"))] // Wasi does not support bind
    #[test]
    fn socket_from_blocking() {
        let rt = rt();

        rt.block_on(async move {
            let listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
            let addr = assert_ok!(listener.local_addr());

            let peer = tokio::task::spawn_blocking(move || {
                // use the futures' block_on fn to make sure we aren't setting
                // any Tokio context
                futures::executor::block_on(async {
                    assert_ok!(TcpStream::connect(addr).await);
                });
            });

            // Wait for the client to connect
            let _ = assert_ok!(listener.accept().await);

            assert_ok!(peer.await);
        });
    }
| |
    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn always_active_parker() {
        // This test is to show that we will always have
        // an active parker even if we call block_on concurrently.
        // Two threads ping-pong through the oneshots with timers in between;
        // if no thread were left driving the timers this would hang.

        let rt = rt();
        let rt2 = rt.clone();

        let (tx1, rx1) = oneshot::channel();
        let (tx2, rx2) = oneshot::channel();

        let jh1 = thread::spawn(move || {
            rt.block_on(async move {
                rx2.await.unwrap();
                time::sleep(Duration::from_millis(5)).await;
                tx1.send(()).unwrap();
            });
        });

        let jh2 = thread::spawn(move || {
            rt2.block_on(async move {
                tx2.send(()).unwrap();
                time::sleep(Duration::from_millis(5)).await;
                rx1.await.unwrap();
                time::sleep(Duration::from_millis(5)).await;
            });
        });

        jh1.join().unwrap();
        jh2.join().unwrap();
    }
| |
    #[test]
    // IOCP requires setting the "max thread" concurrency value. The sane,
    // default, is to set this to the number of cores. Threads that poll I/O
    // become associated with the IOCP handle. Once those threads sleep for any
    // reason (mutex), they yield their ownership.
    //
    // This test hits an edge case on windows where more threads than cores are
    // created, none of those threads ever yield due to being at capacity, so
    // IOCP gets "starved".
    //
    // For now, this is a very edge case that is probably not a real production
    // concern. There also isn't a great/obvious solution to take. For now, the
    // test is disabled on Windows via the `cfg(not(windows))` below.
    #[cfg(not(windows))]
    #[cfg(not(target_os="wasi"))] // Wasi does not support bind or threads
    fn io_driver_called_when_under_load() {
        let rt = rt();

        // Create a lot of constant load. The scheduler will always be busy.
        for _ in 0..100 {
            rt.spawn(async {
                loop {
                    // Don't use Tokio's `yield_now()` to avoid special defer
                    // logic.
                    futures::future::poll_fn::<(), _>(|cx| {
                        cx.waker().wake_by_ref();
                        std::task::Poll::Pending
                    }).await;
                }
            });
        }

        // Do some I/O work
        rt.block_on(async {
            let listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
            let addr = assert_ok!(listener.local_addr());

            let srv = tokio::spawn(async move {
                let (mut stream, _) = assert_ok!(listener.accept().await);
                assert_ok!(stream.write_all(b"hello world").await);
            });

            let cli = tokio::spawn(async move {
                let mut stream = assert_ok!(TcpStream::connect(addr).await);
                let mut dst = vec![0; 11];

                assert_ok!(stream.read_exact(&mut dst).await);
                assert_eq!(dst, b"hello world");
            });

            assert_ok!(srv.await);
            assert_ok!(cli.await);
        });
    }
| |
    /// Tests that yielded tasks are not scheduled until **after** resource
    /// drivers are polled.
    ///
    /// Note: we may have to delete this test as it is not necessarily reliable.
    /// The OS does not guarantee when I/O events are delivered, so there may be
    /// more yields than anticipated.
    #[test]
    #[cfg(not(target_os="wasi"))]
    fn yield_defers_until_park() {
        use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
        use std::sync::Barrier;

        let rt = rt();

        let flag = Arc::new(AtomicBool::new(false));
        let barrier = Arc::new(Barrier::new(NUM_WORKERS));

        rt.block_on(async {
            // Make sure other workers cannot steal tasks
            #[allow(clippy::reversed_empty_ranges)]
            for _ in 0..(NUM_WORKERS-1) {
                let flag = flag.clone();
                let barrier = barrier.clone();

                tokio::spawn(async move {
                    barrier.wait();

                    while !flag.load(SeqCst) {
                        std::thread::sleep(std::time::Duration::from_millis(1));
                    }
                });
            }

            barrier.wait();

            tokio::spawn(async move {
                // Create a TCP listener
                let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
                let addr = listener.local_addr().unwrap();

                tokio::join!(
                    async {
                        // Connect in a blocking (std) manner intentionally;
                        // the async half below observes it via the driver.
                        let _socket = std::net::TcpStream::connect(addr).unwrap();

                        // Yield until connected
                        let mut cnt = 0;
                        while !flag.load(SeqCst){
                            tokio::task::yield_now().await;
                            cnt += 1;

                            if cnt >= 10 {
                                panic!("yielded too many times; TODO: delete this test?");
                            }
                        }
                    },
                    async {
                        let _ = listener.accept().await.unwrap();
                        flag.store(true, SeqCst);
                    }
                );
            }).await.unwrap();
        });
    }
| |
    // End-to-end TCP client/server exchange driven from `block_on`; exactly
    // one completion message must be observed.
    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn client_server_block_on() {
        let rt = rt();
        let (tx, rx) = mpsc::channel();

        rt.block_on(async move { client_server(tx).await });

        // Exactly one message: present on the first read, absent afterwards.
        assert_ok!(rx.try_recv());
        assert_err!(rx.try_recv());
    }
| |
    // A panic inside a task must abort only that task: the task's future is
    // dropped (observed via `Boom`'s `Drop` impl) and the runtime keeps
    // running, so `block_on(rx)` still completes.
    #[cfg_attr(tokio_wasi, ignore = "Wasi does not support threads or panic recovery")]
    #[test]
    #[cfg(not(target_os = "android"))]
    fn panic_in_task() {
        let rt = rt();
        let (tx, rx) = oneshot::channel();

        struct Boom(Option<oneshot::Sender<()>>);

        impl Future for Boom {
            type Output = ();

            fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
                panic!();
            }
        }

        impl Drop for Boom {
            fn drop(&mut self) {
                // The future must be dropped as part of panic unwinding.
                assert!(std::thread::panicking());
                self.0.take().unwrap().send(()).unwrap();
            }
        }

        rt.spawn(Boom(Some(tx)));
        assert_ok!(rt.block_on(rx));
    }
| |
    // A panic inside the `block_on` future must propagate to the caller.
    #[test]
    #[should_panic]
    #[cfg_attr(tokio_wasi, ignore = "Wasi does not support panic recovery")]
    fn panic_in_block_on() {
        let rt = rt();
        rt.block_on(async { panic!() });
    }
| |
| #[cfg(not(target_os="wasi"))] // Wasi does not support threads |
| async fn yield_once() { |
| let mut yielded = false; |
| poll_fn(|cx| { |
| if yielded { |
| Poll::Ready(()) |
| } else { |
| yielded = true; |
| cx.waker().wake_by_ref(); |
| Poll::Pending |
| } |
| }) |
| .await |
| } |
| |
| #[test] |
| fn enter_and_spawn() { |
| let rt = rt(); |
| let handle = { |
| let _enter = rt.enter(); |
| tokio::spawn(async {}) |
| }; |
| |
| assert_ok!(rt.block_on(handle)); |
| } |
| |
    // On shutdown, the runtime must drop pending futures promptly (observed
    // via `Never`'s `Drop` handler) rather than leaking them.
    #[test]
    fn eagerly_drops_futures_on_shutdown() {
        use std::sync::mpsc;

        struct Never {
            drop_tx: mpsc::Sender<()>,
        }

        impl Future for Never {
            type Output = ();

            fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
                Poll::Pending
            }
        }

        impl Drop for Never {
            fn drop(&mut self) {
                self.drop_tx.send(()).unwrap();
            }
        }

        let rt = rt();

        let (drop_tx, drop_rx) = mpsc::channel();
        let (run_tx, run_rx) = oneshot::channel();

        rt.block_on(async move {
            tokio::spawn(async move {
                assert_ok!(run_tx.send(()));

                Never { drop_tx }.await
            });

            // Make sure the task has been polled before shutting down.
            assert_ok!(run_rx.await);
        });

        drop(rt);

        assert_ok!(drop_rx.recv());
    }
| |
    // Waking tasks (and even spawning new ones) while the runtime is being
    // dropped must not deadlock or panic, and the late spawn must still be
    // observed (checked via the `drop_triggered` flag).
    #[test]
    fn wake_while_rt_is_dropping() {
        use tokio::sync::Barrier;
        use core::sync::atomic::{AtomicBool, Ordering};

        let drop_triggered = Arc::new(AtomicBool::new(false));
        let set_drop_triggered = drop_triggered.clone();

        // Runs the wrapped closure when dropped.
        struct OnDrop<F: FnMut()>(F);

        impl<F: FnMut()> Drop for OnDrop<F> {
            fn drop(&mut self) {
                (self.0)()
            }
        }

        let (tx1, rx1) = oneshot::channel();
        let (tx2, rx2) = oneshot::channel();
        let (tx3, rx3) = oneshot::channel();

        let barrier = Arc::new(Barrier::new(4));
        let barrier1 = barrier.clone();
        let barrier2 = barrier.clone();
        let barrier3 = barrier.clone();

        let rt = rt();

        rt.spawn(async move {
            // Ensure a waker gets stored in oneshot 1.
            let _ = tokio::join!(rx1, barrier1.wait());
            tx3.send(()).unwrap();
        });

        rt.spawn(async move {
            let h1 = tokio::runtime::Handle::current();
            // When this task is dropped, we'll be "closing remotes".
            // We spawn a new task that owns the `tx1`, to move its Drop
            // out of here.
            //
            // Importantly, the oneshot 1 has a waker already stored, so
            // the eventual drop here will try to re-schedule again.
            let mut opt_tx1 = Some(tx1);
            let _d = OnDrop(move || {
                let tx1 = opt_tx1.take().unwrap();
                h1.spawn(async move {
                    tx1.send(()).unwrap();
                });
                // Just a sanity check that this entire thing actually happened
                set_drop_triggered.store(true, Ordering::Relaxed);
            });
            let _ = tokio::join!(rx2, barrier2.wait());
        });

        rt.spawn(async move {
            let _ = tokio::join!(rx3, barrier3.wait());
            // We'll never get here, but once task 3 drops, this will
            // force task 2 to re-schedule since it's waiting on oneshot 2.
            tx2.send(()).unwrap();
        });

        // Wait until every oneshot channel has been polled.
        rt.block_on(barrier.wait());

        // Drop the rt
        drop(rt);

        // Make sure that the spawn actually happened
        assert!(drop_triggered.load(Ordering::Relaxed));
    }
| |
    // I/O readiness events arriving while the runtime shuts down must not
    // cause panics or hangs; loop over several runtimes to improve the odds
    // of hitting the shutdown window.
    #[cfg(not(target_os="wasi"))] // Wasi doesn't support UDP or bind()
    #[test]
    fn io_notify_while_shutting_down() {
        use tokio::net::UdpSocket;
        use std::sync::Arc;

        for _ in 1..10 {
            let runtime = rt();

            runtime.block_on(async {
                let socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
                let addr = socket.local_addr().unwrap();
                let send_half = Arc::new(socket);
                let recv_half = send_half.clone();

                tokio::spawn(async move {
                    let mut buf = [0];
                    loop {
                        recv_half.recv_from(&mut buf).await.unwrap();
                        std::thread::sleep(Duration::from_millis(2));
                    }
                });

                tokio::spawn(async move {
                    let buf = [0];
                    loop {
                        send_half.send_to(&buf, &addr).await.unwrap();
                        tokio::time::sleep(Duration::from_millis(1)).await;
                    }
                });

                // Let the ping-pong run briefly, then return so the runtime
                // is dropped while I/O is still in flight.
                tokio::time::sleep(Duration::from_millis(5)).await;
            });
        }
    }
| |
    // `shutdown_timeout` must return once the timeout elapses even though a
    // blocking task is still sleeping.
    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn shutdown_timeout() {
        let (tx, rx) = oneshot::channel();
        let runtime = rt();

        runtime.block_on(async move {
            task::spawn_blocking(move || {
                tx.send(()).unwrap();
                thread::sleep(Duration::from_secs(10_000));
            });

            // Wait until the blocking task is definitely running.
            rx.await.unwrap();
        });

        Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_millis(100));
    }
| |
    // A zero-duration `shutdown_timeout` must return (almost) immediately,
    // not wait for the sleeping blocking task.
    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn shutdown_timeout_0() {
        let runtime = rt();

        runtime.block_on(async move {
            task::spawn_blocking(move || {
                thread::sleep(Duration::from_secs(10_000));
            });
        });

        let now = Instant::now();
        Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_nanos(0));
        assert!(now.elapsed().as_secs() < 1);
    }
| |
| #[test] |
| fn shutdown_wakeup_time() { |
| let runtime = rt(); |
| |
| runtime.block_on(async move { |
| tokio::time::sleep(std::time::Duration::from_millis(100)).await; |
| }); |
| |
| Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_secs(10_000)); |
| } |
| |
    // This test is currently ignored on Windows because of a
    // rust-lang issue in thread local storage destructors.
    // See https://github.com/rust-lang/rust/issues/74875
    //
    // Storing a runtime in a thread local must not break building/dropping
    // other runtimes on the same thread.
    #[test]
    #[cfg(not(windows))]
    #[cfg_attr(tokio_wasi, ignore = "Wasi does not support threads")]
    fn runtime_in_thread_local() {
        use std::cell::RefCell;
        use std::thread;

        thread_local!(
            static R: RefCell<Option<Runtime>> = RefCell::new(None);
        );

        thread::spawn(|| {
            R.with(|cell| {
                let rt = rt();
                let rt = Arc::try_unwrap(rt).unwrap();
                *cell.borrow_mut() = Some(rt);
            });

            // Build a second runtime on the same thread; it must not conflict
            // with the one parked in the thread local.
            let _rt = rt();
        }).join().unwrap();
    }
| |
    // Helper: one TCP round-trip. A spawned server task writes "hello"; the
    // caller-side client reads it back, then signals completion on `tx`.
    #[cfg(not(target_os="wasi"))] // Wasi does not support bind
    async fn client_server(tx: mpsc::Sender<()>) {
        let server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);

        // Get the assigned address
        let addr = assert_ok!(server.local_addr());

        // Spawn the server
        tokio::spawn(async move {
            // Accept a socket
            let (mut socket, _) = server.accept().await.unwrap();

            // Write some data
            socket.write_all(b"hello").await.unwrap();
        });

        let mut client = TcpStream::connect(&addr).await.unwrap();

        let mut buf = vec![];
        client.read_to_end(&mut buf).await.unwrap();

        assert_eq!(buf, b"hello");
        tx.send(()).unwrap();
    }
| |
| #[cfg(not(tokio_wasi))] // Wasi does not support bind |
| #[test] |
| fn local_set_block_on_socket() { |
| let rt = rt(); |
| let local = task::LocalSet::new(); |
| |
| local.block_on(&rt, async move { |
| let (tx, rx) = oneshot::channel(); |
| |
| let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); |
| let addr = listener.local_addr().unwrap(); |
| |
| task::spawn_local(async move { |
| let _ = listener.accept().await; |
| tx.send(()).unwrap(); |
| }); |
| |
| TcpStream::connect(&addr).await.unwrap(); |
| rx.await.unwrap(); |
| }); |
| } |
| |
| #[cfg(not(tokio_wasi))] // Wasi does not support bind |
| #[test] |
| fn local_set_client_server_block_on() { |
| let rt = rt(); |
| let (tx, rx) = mpsc::channel(); |
| |
| let local = task::LocalSet::new(); |
| |
| local.block_on(&rt, async move { client_server_local(tx).await }); |
| |
| assert_ok!(rx.try_recv()); |
| assert_err!(rx.try_recv()); |
| } |
| |
| #[cfg(not(tokio_wasi))] // Wasi does not support bind |
| async fn client_server_local(tx: mpsc::Sender<()>) { |
| let server = assert_ok!(TcpListener::bind("127.0.0.1:0").await); |
| |
| // Get the assigned address |
| let addr = assert_ok!(server.local_addr()); |
| |
| // Spawn the server |
| task::spawn_local(async move { |
| // Accept a socket |
| let (mut socket, _) = server.accept().await.unwrap(); |
| |
| // Write some data |
| socket.write_all(b"hello").await.unwrap(); |
| }); |
| |
| let mut client = TcpStream::connect(&addr).await.unwrap(); |
| |
| let mut buf = vec![]; |
| client.read_to_end(&mut buf).await.unwrap(); |
| |
| assert_eq!(buf, b"hello"); |
| tx.send(()).unwrap(); |
| } |
| |
    // Cooperative scheduling: after enough consecutive successful polls the
    // coop budget is exhausted, so a channel with queued messages must report
    // `Pending` at least once to force a yield.
    #[test]
    fn coop() {
        use std::task::Poll::Ready;
        use tokio::sync::mpsc;

        let rt = rt();

        rt.block_on(async {
            let (send, mut recv) = mpsc::unbounded_channel();

            // Send a bunch of messages.
            for _ in 0..1_000 {
                send.send(()).unwrap();
            }

            poll_fn(|cx| {
                // At least one response should return pending.
                for _ in 0..1_000 {
                    if recv.poll_recv(cx).is_pending() {
                        return Ready(());
                    }
                }

                panic!("did not yield");
            }).await;
        });
    }
| |
    // `task::unconstrained` opts the future out of the coop budget: every
    // poll of the non-empty channel must be `Ready`.
    #[test]
    fn coop_unconstrained() {
        use std::task::Poll::Ready;
        use tokio::sync::mpsc;

        let rt = rt();

        rt.block_on(async {
            let (send, mut recv) = mpsc::unbounded_channel();

            // Send a bunch of messages.
            for _ in 0..1_000 {
                send.send(()).unwrap();
            }

            tokio::task::unconstrained(poll_fn(|cx| {
                // All the responses should be ready.
                for _ in 0..1_000 {
                    assert_eq!(recv.poll_recv(cx), Poll::Ready(Some(())));
                }

                Ready(())
            })).await;
        });
    }
| |
    // `task::consume_budget` must force the worker future to yield before the
    // loop finishes all 1000 iterations.
    #[cfg(tokio_unstable)]
    #[test]
    fn coop_consume_budget() {
        let rt = rt();

        rt.block_on(async {
            poll_fn(|cx| {
                let counter = Arc::new(std::sync::Mutex::new(0));
                let counter_clone = Arc::clone(&counter);
                let mut worker = Box::pin(async move {
                    // Consume the budget until a yield happens
                    for _ in 0..1000 {
                        *counter.lock().unwrap() += 1;
                        task::consume_budget().await
                    }
                });
                // Assert that the worker was yielded and it didn't manage
                // to finish the whole work (assuming the total budget of 128)
                assert!(Pin::new(&mut worker).poll(cx).is_pending());
                assert!(*counter_clone.lock().unwrap() < 1000);
                std::task::Poll::Ready(())
            }).await;
        });
    }
| |
    // Tests that the "next task" scheduler optimization is not able to starve
    // other tasks.
    #[test]
    fn ping_pong_saturation() {
        use std::sync::atomic::{Ordering, AtomicBool};
        use tokio::sync::mpsc;

        const NUM: usize = 100;

        let rt = rt();

        // Cleared at the end to let the ping-pong pairs wind down.
        let running = Arc::new(AtomicBool::new(true));

        rt.block_on(async {
            let (spawned_tx, mut spawned_rx) = mpsc::unbounded_channel();

            let mut tasks = vec![];
            // Spawn a bunch of tasks that ping-pong between each other to
            // saturate the runtime.
            for _ in 0..NUM {
                let (tx1, mut rx1) = mpsc::unbounded_channel();
                let (tx2, mut rx2) = mpsc::unbounded_channel();
                let spawned_tx = spawned_tx.clone();
                let running = running.clone();
                tasks.push(task::spawn(async move {
                    spawned_tx.send(()).unwrap();

                    while running.load(Ordering::Relaxed) {
                        tx1.send(()).unwrap();
                        rx2.recv().await.unwrap();
                    }

                    // Close the channel and wait for the other task to exit.
                    drop(tx1);
                    assert!(rx2.recv().await.is_none());
                }));

                tasks.push(task::spawn(async move {
                    while rx1.recv().await.is_some() {
                        tx2.send(()).unwrap();
                    }
                }));
            }

            // Wait until every ping-pong pair is actually running.
            for _ in 0..NUM {
                spawned_rx.recv().await.unwrap();
            }

            // spawn another task and wait for it to complete
            let handle = task::spawn(async {
                for _ in 0..5 {
                    // Yielding forces it back into the local queue.
                    task::yield_now().await;
                }
            });
            handle.await.unwrap();
            running.store(false, Ordering::Relaxed);
            for t in tasks {
                t.await.unwrap();
            }
        });
    }
| } |