| #![cfg(feature = "macros")] |
| #![allow(clippy::disallowed_names)] |
| |
| #[cfg(tokio_wasm_not_wasi)] |
| use wasm_bindgen_test::wasm_bindgen_test as maybe_tokio_test; |
| |
| #[cfg(not(tokio_wasm_not_wasi))] |
| use tokio::test as maybe_tokio_test; |
| |
| use tokio::sync::oneshot; |
| use tokio_test::{assert_ok, assert_pending, assert_ready}; |
| |
| use futures::future::poll_fn; |
| use std::task::Poll::Ready; |
| |
| #[maybe_tokio_test] |
| async fn sync_one_lit_expr_comma() { |
| let foo = tokio::select! { |
| foo = async { 1 } => foo, |
| }; |
| |
| assert_eq!(foo, 1); |
| } |
| |
| #[maybe_tokio_test] |
| async fn nested_one() { |
| let foo = tokio::select! { |
| foo = async { 1 } => tokio::select! { |
| bar = async { foo } => bar, |
| }, |
| }; |
| |
| assert_eq!(foo, 1); |
| } |
| |
| #[maybe_tokio_test] |
| async fn sync_one_lit_expr_no_comma() { |
| let foo = tokio::select! { |
| foo = async { 1 } => foo |
| }; |
| |
| assert_eq!(foo, 1); |
| } |
| |
| #[maybe_tokio_test] |
| async fn sync_one_lit_expr_block() { |
| let foo = tokio::select! { |
| foo = async { 1 } => { foo } |
| }; |
| |
| assert_eq!(foo, 1); |
| } |
| |
| #[maybe_tokio_test] |
| async fn sync_one_await() { |
| let foo = tokio::select! { |
| foo = one() => foo, |
| }; |
| |
| assert_eq!(foo, 1); |
| } |
| |
| #[maybe_tokio_test] |
| async fn sync_one_ident() { |
| let one = one(); |
| |
| let foo = tokio::select! { |
| foo = one => foo, |
| }; |
| |
| assert_eq!(foo, 1); |
| } |
| |
| #[maybe_tokio_test] |
| async fn sync_two() { |
| use std::cell::Cell; |
| |
| let cnt = Cell::new(0); |
| |
| let res = tokio::select! { |
| foo = async { |
| cnt.set(cnt.get() + 1); |
| 1 |
| } => foo, |
| bar = async { |
| cnt.set(cnt.get() + 1); |
| 2 |
| } => bar, |
| }; |
| |
| assert_eq!(1, cnt.get()); |
| assert!(res == 1 || res == 2); |
| } |
| |
| #[maybe_tokio_test] |
| async fn drop_in_fut() { |
| let s = "hello".to_string(); |
| |
| let res = tokio::select! { |
| foo = async { |
| let v = one().await; |
| drop(s); |
| v |
| } => foo |
| }; |
| |
| assert_eq!(res, 1); |
| } |
| |
| #[maybe_tokio_test] |
| #[cfg(feature = "full")] |
| async fn one_ready() { |
| let (tx1, rx1) = oneshot::channel::<i32>(); |
| let (_tx2, rx2) = oneshot::channel::<i32>(); |
| |
| tx1.send(1).unwrap(); |
| |
| let v = tokio::select! { |
| res = rx1 => { |
| assert_ok!(res) |
| }, |
| _ = rx2 => unreachable!(), |
| }; |
| |
| assert_eq!(1, v); |
| } |
| |
| #[maybe_tokio_test] |
| #[cfg(feature = "full")] |
| async fn select_streams() { |
| use tokio::sync::mpsc; |
| |
| let (tx1, mut rx1) = mpsc::unbounded_channel::<i32>(); |
| let (tx2, mut rx2) = mpsc::unbounded_channel::<i32>(); |
| |
| tokio::spawn(async move { |
| assert_ok!(tx2.send(1)); |
| tokio::task::yield_now().await; |
| |
| assert_ok!(tx1.send(2)); |
| tokio::task::yield_now().await; |
| |
| assert_ok!(tx2.send(3)); |
| tokio::task::yield_now().await; |
| |
| drop((tx1, tx2)); |
| }); |
| |
| let mut rem = true; |
| let mut msgs = vec![]; |
| |
| while rem { |
| tokio::select! { |
| Some(x) = rx1.recv() => { |
| msgs.push(x); |
| } |
| Some(y) = rx2.recv() => { |
| msgs.push(y); |
| } |
| else => { |
| rem = false; |
| } |
| } |
| } |
| |
| msgs.sort_unstable(); |
| assert_eq!(&msgs[..], &[1, 2, 3]); |
| } |
| |
| #[maybe_tokio_test] |
| async fn move_uncompleted_futures() { |
| let (tx1, mut rx1) = oneshot::channel::<i32>(); |
| let (tx2, mut rx2) = oneshot::channel::<i32>(); |
| |
| tx1.send(1).unwrap(); |
| tx2.send(2).unwrap(); |
| |
| let ran; |
| |
| tokio::select! { |
| res = &mut rx1 => { |
| assert_eq!(1, assert_ok!(res)); |
| assert_eq!(2, assert_ok!(rx2.await)); |
| ran = true; |
| }, |
| res = &mut rx2 => { |
| assert_eq!(2, assert_ok!(res)); |
| assert_eq!(1, assert_ok!(rx1.await)); |
| ran = true; |
| }, |
| } |
| |
| assert!(ran); |
| } |
| |
| #[maybe_tokio_test] |
| async fn nested() { |
| let res = tokio::select! { |
| x = async { 1 } => { |
| tokio::select! { |
| y = async { 2 } => x + y, |
| } |
| } |
| }; |
| |
| assert_eq!(res, 3); |
| } |
| |
| #[maybe_tokio_test] |
| #[cfg(target_pointer_width = "64")] |
| async fn struct_size() { |
| use futures::future; |
| use std::mem; |
| |
| let fut = async { |
| let ready = future::ready(0i32); |
| |
| tokio::select! { |
| _ = ready => {}, |
| } |
| }; |
| |
| assert_eq!(mem::size_of_val(&fut), 40); |
| |
| let fut = async { |
| let ready1 = future::ready(0i32); |
| let ready2 = future::ready(0i32); |
| |
| tokio::select! { |
| _ = ready1 => {}, |
| _ = ready2 => {}, |
| } |
| }; |
| |
| assert_eq!(mem::size_of_val(&fut), 48); |
| |
| let fut = async { |
| let ready1 = future::ready(0i32); |
| let ready2 = future::ready(0i32); |
| let ready3 = future::ready(0i32); |
| |
| tokio::select! { |
| _ = ready1 => {}, |
| _ = ready2 => {}, |
| _ = ready3 => {}, |
| } |
| }; |
| |
| assert_eq!(mem::size_of_val(&fut), 56); |
| } |
| |
| #[maybe_tokio_test] |
| async fn mutable_borrowing_future_with_same_borrow_in_block() { |
| let mut value = 234; |
| |
| tokio::select! { |
| _ = require_mutable(&mut value) => { }, |
| _ = async_noop() => { |
| value += 5; |
| }, |
| } |
| |
| assert!(value >= 234); |
| } |
| |
| #[maybe_tokio_test] |
| async fn mutable_borrowing_future_with_same_borrow_in_block_and_else() { |
| let mut value = 234; |
| |
| tokio::select! { |
| _ = require_mutable(&mut value) => { }, |
| _ = async_noop() => { |
| value += 5; |
| }, |
| else => { |
| value += 27; |
| }, |
| } |
| |
| assert!(value >= 234); |
| } |
| |
| #[maybe_tokio_test] |
| async fn future_panics_after_poll() { |
| use tokio_test::task; |
| |
| let (tx, rx) = oneshot::channel(); |
| |
| let mut polled = false; |
| |
| let f = poll_fn(|_| { |
| assert!(!polled); |
| polled = true; |
| Ready(None::<()>) |
| }); |
| |
| let mut f = task::spawn(async { |
| tokio::select! { |
| Some(_) = f => unreachable!(), |
| ret = rx => ret.unwrap(), |
| } |
| }); |
| |
| assert_pending!(f.poll()); |
| assert_pending!(f.poll()); |
| |
| assert_ok!(tx.send(1)); |
| |
| let res = assert_ready!(f.poll()); |
| assert_eq!(1, res); |
| } |
| |
| #[maybe_tokio_test] |
| async fn disable_with_if() { |
| use tokio_test::task; |
| |
| let f = poll_fn(|_| panic!()); |
| let (tx, rx) = oneshot::channel(); |
| |
| let mut f = task::spawn(async { |
| tokio::select! { |
| _ = f, if false => unreachable!(), |
| _ = rx => (), |
| } |
| }); |
| |
| assert_pending!(f.poll()); |
| |
| assert_ok!(tx.send(())); |
| assert!(f.is_woken()); |
| |
| assert_ready!(f.poll()); |
| } |
| |
| #[maybe_tokio_test] |
| async fn join_with_select() { |
| use tokio_test::task; |
| |
| let (tx1, mut rx1) = oneshot::channel(); |
| let (tx2, mut rx2) = oneshot::channel(); |
| |
| let mut f = task::spawn(async { |
| let mut a = None; |
| let mut b = None; |
| |
| while a.is_none() || b.is_none() { |
| tokio::select! { |
| v1 = &mut rx1, if a.is_none() => a = Some(assert_ok!(v1)), |
| v2 = &mut rx2, if b.is_none() => b = Some(assert_ok!(v2)) |
| } |
| } |
| |
| (a.unwrap(), b.unwrap()) |
| }); |
| |
| assert_pending!(f.poll()); |
| |
| assert_ok!(tx1.send(123)); |
| assert!(f.is_woken()); |
| assert_pending!(f.poll()); |
| |
| assert_ok!(tx2.send(456)); |
| assert!(f.is_woken()); |
| let (a, b) = assert_ready!(f.poll()); |
| |
| assert_eq!(a, 123); |
| assert_eq!(b, 456); |
| } |
| |
| #[tokio::test] |
| #[cfg(feature = "full")] |
| async fn use_future_in_if_condition() { |
| use tokio::time::{self, Duration}; |
| |
| tokio::select! { |
| _ = time::sleep(Duration::from_millis(10)), if false => { |
| panic!("if condition ignored") |
| } |
| _ = async { 1u32 } => { |
| } |
| } |
| } |
| |
| #[tokio::test] |
| #[cfg(feature = "full")] |
| async fn use_future_in_if_condition_biased() { |
| use tokio::time::{self, Duration}; |
| |
| tokio::select! { |
| biased; |
| _ = time::sleep(Duration::from_millis(10)), if false => { |
| panic!("if condition ignored") |
| } |
| _ = async { 1u32 } => { |
| } |
| } |
| } |
| |
| #[maybe_tokio_test] |
| async fn many_branches() { |
| let num = tokio::select! { |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| x = async { 1 } => x, |
| }; |
| |
| assert_eq!(1, num); |
| } |
| |
| #[maybe_tokio_test] |
| async fn never_branch_no_warnings() { |
| let t = tokio::select! { |
| _ = async_never() => 0, |
| one_async_ready = one() => one_async_ready, |
| }; |
| assert_eq!(t, 1); |
| } |
| |
| async fn one() -> usize { |
| 1 |
| } |
| |
| async fn require_mutable(_: &mut i32) {} |
| async fn async_noop() {} |
| |
| async fn async_never() -> ! { |
| futures::future::pending().await |
| } |
| |
| // From https://github.com/tokio-rs/tokio/issues/2857 |
| #[maybe_tokio_test] |
| async fn mut_on_left_hand_side() { |
| let v = async move { |
| let ok = async { 1 }; |
| tokio::pin!(ok); |
| tokio::select! { |
| mut a = &mut ok => { |
| a += 1; |
| a |
| } |
| } |
| } |
| .await; |
| assert_eq!(v, 2); |
| } |
| |
| #[maybe_tokio_test] |
| async fn biased_one_not_ready() { |
| let (_tx1, rx1) = oneshot::channel::<i32>(); |
| let (tx2, rx2) = oneshot::channel::<i32>(); |
| let (tx3, rx3) = oneshot::channel::<i32>(); |
| |
| tx2.send(2).unwrap(); |
| tx3.send(3).unwrap(); |
| |
| let v = tokio::select! { |
| biased; |
| |
| _ = rx1 => unreachable!(), |
| res = rx2 => { |
| assert_ok!(res) |
| }, |
| _ = rx3 => { |
| panic!("This branch should never be activated because `rx2` should be polled before `rx3` due to `biased;`.") |
| } |
| }; |
| |
| assert_eq!(2, v); |
| } |
| |
| #[maybe_tokio_test] |
| #[cfg(feature = "full")] |
| async fn biased_eventually_ready() { |
| use tokio::task::yield_now; |
| |
| let one = async {}; |
| let two = async { yield_now().await }; |
| let three = async { yield_now().await }; |
| |
| let mut count = 0u8; |
| |
| tokio::pin!(one, two, three); |
| |
| loop { |
| tokio::select! { |
| biased; |
| |
| _ = &mut two, if count < 2 => { |
| count += 1; |
| assert_eq!(count, 2); |
| } |
| _ = &mut three, if count < 3 => { |
| count += 1; |
| assert_eq!(count, 3); |
| } |
| _ = &mut one, if count < 1 => { |
| count += 1; |
| assert_eq!(count, 1); |
| } |
| else => break, |
| } |
| } |
| |
| assert_eq!(count, 3); |
| } |
| |
| // https://github.com/tokio-rs/tokio/issues/3830 |
| // https://github.com/rust-lang/rust-clippy/issues/7304 |
| #[warn(clippy::default_numeric_fallback)] |
| pub async fn default_numeric_fallback() { |
| tokio::select! { |
| _ = async {} => (), |
| else => (), |
| } |
| } |
| |
| // https://github.com/tokio-rs/tokio/issues/4182 |
| #[maybe_tokio_test] |
| async fn mut_ref_patterns() { |
| tokio::select! { |
| Some(mut foo) = async { Some("1".to_string()) } => { |
| assert_eq!(foo, "1"); |
| foo = "2".to_string(); |
| assert_eq!(foo, "2"); |
| }, |
| }; |
| |
| tokio::select! { |
| Some(ref foo) = async { Some("1".to_string()) } => { |
| assert_eq!(*foo, "1"); |
| }, |
| }; |
| |
| tokio::select! { |
| Some(ref mut foo) = async { Some("1".to_string()) } => { |
| assert_eq!(*foo, "1"); |
| *foo = "2".to_string(); |
| assert_eq!(*foo, "2"); |
| }, |
| }; |
| } |
| |
| #[cfg(tokio_unstable)] |
| mod unstable { |
| use tokio::runtime::RngSeed; |
| |
| #[test] |
| fn deterministic_select_current_thread() { |
| let seed = b"bytes used to generate seed"; |
| let rt1 = tokio::runtime::Builder::new_current_thread() |
| .rng_seed(RngSeed::from_bytes(seed)) |
| .build() |
| .unwrap(); |
| let rt1_values = rt1.block_on(async { (select_0_to_9().await, select_0_to_9().await) }); |
| |
| let rt2 = tokio::runtime::Builder::new_current_thread() |
| .rng_seed(RngSeed::from_bytes(seed)) |
| .build() |
| .unwrap(); |
| let rt2_values = rt2.block_on(async { (select_0_to_9().await, select_0_to_9().await) }); |
| |
| assert_eq!(rt1_values, rt2_values); |
| } |
| |
| #[test] |
| #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] |
| fn deterministic_select_multi_thread() { |
| let seed = b"bytes used to generate seed"; |
| let rt1 = tokio::runtime::Builder::new_multi_thread() |
| .worker_threads(1) |
| .rng_seed(RngSeed::from_bytes(seed)) |
| .build() |
| .unwrap(); |
| let rt1_values = rt1.block_on(async { |
| let _ = tokio::spawn(async { (select_0_to_9().await, select_0_to_9().await) }).await; |
| }); |
| |
| let rt2 = tokio::runtime::Builder::new_multi_thread() |
| .worker_threads(1) |
| .rng_seed(RngSeed::from_bytes(seed)) |
| .build() |
| .unwrap(); |
| let rt2_values = rt2.block_on(async { |
| let _ = tokio::spawn(async { (select_0_to_9().await, select_0_to_9().await) }).await; |
| }); |
| |
| assert_eq!(rt1_values, rt2_values); |
| } |
| |
| async fn select_0_to_9() -> u32 { |
| tokio::select!( |
| x = async { 0 } => x, |
| x = async { 1 } => x, |
| x = async { 2 } => x, |
| x = async { 3 } => x, |
| x = async { 4 } => x, |
| x = async { 5 } => x, |
| x = async { 6 } => x, |
| x = async { 7 } => x, |
| x = async { 8 } => x, |
| x = async { 9 } => x, |
| ) |
| } |
| } |