// blob: ce1f3a3379f59955611a969b3900d41ef9ad6f94 [file] [log] [blame]
use futures::channel::{mpsc, oneshot};
use futures::executor::block_on;
use futures::future::{self, poll_fn, FutureExt};
use futures::sink::SinkExt;
use futures::stream::StreamExt;
use futures::task::{Context, Poll};
use futures::{
join, pending, pin_mut, poll, select, select_biased, stream, stream_select, try_join,
};
use std::mem;
// `pending!()` suspends the enclosing future exactly once: the first `poll!`
// observes `Pending`, the second observes completion.
#[test]
fn poll_and_pending() {
    block_on(async {
        let yields_once = async { pending!() };
        pin_mut!(yields_once);

        let first_poll = poll!(&mut yields_once);
        assert_eq!(Poll::Pending, first_poll);

        let second_poll = poll!(&mut yields_once);
        assert_eq!(Poll::Ready(()), second_poll);
    });
}
// `join!` stays pending until *both* oneshot receivers have a value, then
// yields the results as a tuple in argument order.
#[test]
fn join() {
    let (first_tx, first_rx) = oneshot::channel::<i32>();
    let (second_tx, second_rx) = oneshot::channel::<i32>();

    let joined = async {
        let outcome = join!(first_rx, second_rx);
        assert_eq!((Ok(1), Ok(2)), outcome);
    };

    block_on(async {
        pin_mut!(joined);

        // Nothing has been sent yet.
        assert_eq!(Poll::Pending, poll!(&mut joined));

        // Only one of the two inputs is ready.
        first_tx.send(1).unwrap();
        assert_eq!(Poll::Pending, poll!(&mut joined));

        // Both inputs are ready, so the joined future completes.
        second_tx.send(2).unwrap();
        assert_eq!(Poll::Ready(()), poll!(&mut joined));
    });
}
// `select!` must run the arm whose receiver already holds a value and must
// never run the arm whose sender has not fired.
#[test]
fn select() {
    let (ready_tx, ready_rx) = oneshot::channel::<i32>();
    // The sender is kept alive (but unused) so its receiver stays pending.
    let (_idle_tx, idle_rx) = oneshot::channel::<i32>();
    ready_tx.send(1).unwrap();

    // The winning arm reports that it executed through the select's value.
    let arm_ran = block_on(async {
        select! {
            value = ready_rx.fuse() => {
                assert_eq!(Ok(1), value);
                true
            },
            _ = idle_rx.fuse() => unreachable!(),
        }
    });
    assert!(arm_ran);
}
// Same scenario as the `select` test, driven through `select_biased!`:
// the arm with a value runs, the arm without one is never taken.
#[test]
fn select_biased() {
    let (ready_tx, ready_rx) = oneshot::channel::<i32>();
    // The sender is kept alive (but unused) so its receiver stays pending.
    let (_idle_tx, idle_rx) = oneshot::channel::<i32>();
    ready_tx.send(1).unwrap();

    // The winning arm reports that it executed through the select's value.
    let arm_ran = block_on(async {
        select_biased! {
            value = ready_rx.fuse() => {
                assert_eq!(Ok(1), value);
                true
            },
            _ = idle_rx.fuse() => unreachable!(),
        }
    });
    assert!(arm_ran);
}
// Drives two fused mpsc receivers through every kind of `select!` arm:
// value arms, `default` (nothing ready right now) and `complete`
// (every stream has terminated).
#[test]
fn select_streams() {
    let (mut sender_a, receiver_a) = mpsc::channel::<i32>(1);
    let (mut sender_b, receiver_b) = mpsc::channel::<i32>(1);
    let mut stream_a = receiver_a.fuse();
    let mut stream_b = receiver_b.fuse();
    let mut default_ran = false;
    let mut sum = 0;

    block_on(async {
        // Nothing has been sent yet, so only `default` may fire. It queues
        // one item per channel and parks the senders so they can be dropped
        // later to close the channels.
        let (mut parked_a, mut parked_b) = select! {
            _ = stream_a.next() => panic!(),
            _ = stream_b.next() => panic!(),
            default => {
                sender_a.send(2).await.unwrap();
                sender_b.send(3).await.unwrap();
                (Some(sender_a), Some(sender_b))
            },
            complete => panic!(),
        };

        loop {
            select! {
                // runs first and again after default
                item = stream_a.next() => sum += item.unwrap_or(0),
                // runs second and again after default
                item = stream_b.next() => sum += item.unwrap_or(0),
                // runs third
                default => {
                    assert_eq!(sum, 5);
                    default_ran = true;
                    // Dropping the senders closes both channels.
                    drop(parked_a.take().unwrap());
                    drop(parked_b.take().unwrap());
                },
                // runs last
                complete => break,
            };
        }
    });

    assert!(default_ran);
}
// Inside a `select!` arm, the future that did *not* win can still be moved
// (here: awaited by value), because the macro only borrows named futures.
#[test]
fn select_can_move_uncompleted_futures() {
    let (tx_one, rx_one) = oneshot::channel::<i32>();
    let (tx_two, rx_two) = oneshot::channel::<i32>();
    tx_one.send(1).unwrap();
    tx_two.send(2).unwrap();

    let mut fused_one = rx_one.fuse();
    let mut fused_two = rx_two.fuse();

    // Both receivers are ready, so either arm may win; each arm then awaits
    // the other receiver and reports that it executed.
    let arm_ran = block_on(async {
        select! {
            first = fused_one => {
                assert_eq!(Ok(1), first);
                assert_eq!(Ok(2), fused_two.await);
                true
            },
            second = fused_two => {
                assert_eq!(Ok(2), second);
                assert_eq!(Ok(1), fused_one.await);
                true
            },
        }
    });
    assert!(arm_ran);
}
// A `select!` may appear inside the arm body of another `select!`, and the
// inner arm can use bindings introduced by the outer arm.
#[test]
fn select_nested() {
    let mut outer = future::ready(1);
    let mut inner = future::ready(2);

    let total = block_on(async {
        select! {
            outer_value = outer => {
                let inner_value = select! {
                    value = inner => value,
                };
                outer_value + inner_value
            }
        }
    });

    assert_eq!(total, 3);
}
// Pins the byte size of async blocks whose body is a `select!` over ready
// futures, so that a change in how much state the macro stores shows up as
// a test failure.
// The test is ignored on targets whose pointer width is not 64 bits.
#[cfg_attr(not(target_pointer_width = "64"), ignore)]
#[test]
fn select_size() {
// One arm over a single `Ready<i32>`: expected size is 24 bytes.
let fut = async {
let mut ready = future::ready(0i32);
select! {
_ = ready => {},
}
};
assert_eq!(mem::size_of_val(&fut), 24);
// Two arms over two `Ready<i32>` futures: expected size is 40 bytes.
let fut = async {
let mut ready1 = future::ready(0i32);
let mut ready2 = future::ready(0i32);
select! {
_ = ready1 => {},
_ = ready2 => {},
}
};
assert_eq!(mem::size_of_val(&fut), 40);
}
// `select!` accepts arm expressions that evaluate to `!Unpin` futures, and
// each arm may initialise a variable declared before the macro.
#[test]
fn select_on_non_unpin_expressions() {
    // The future returned by an `async fn` is !Unpin.
    async fn non_unpin_five() -> i32 {
        5
    }

    let outcome = block_on(async {
        let winner;
        select! {
            left = non_unpin_five().fuse() => winner = left,
            right = non_unpin_five().fuse() => winner = right,
        };
        winner
    });

    assert_eq!(outcome, 5);
}
// As `select_on_non_unpin_expressions`, with an additional `default` arm.
// The asserted result is 5, i.e. a future arm is expected to win rather
// than `default` (which would produce 7).
#[test]
fn select_on_non_unpin_expressions_with_default() {
    // The future returned by an `async fn` is !Unpin.
    async fn non_unpin_five() -> i32 {
        5
    }

    let outcome = block_on(async {
        let winner;
        select! {
            left = non_unpin_five().fuse() => winner = left,
            right = non_unpin_five().fuse() => winner = right,
            default => winner = 7,
        };
        winner
    });

    assert_eq!(outcome, 5);
}
// Pins the byte size of an async block whose `select!` arms are built from
// expressions producing !Unpin futures; the expected size is 32 bytes.
// The test is ignored on targets whose pointer width is not 64 bits.
#[cfg_attr(not(target_pointer_width = "64"), ignore)]
#[test]
fn select_on_non_unpin_size() {
// The returned Future is !Unpin
let make_non_unpin_fut = || async { 5 };
// The block is only measured, never polled.
let fut = async {
let select_res;
select! {
value_1 = make_non_unpin_fut().fuse() => select_res = value_1,
value_2 = make_non_unpin_fut().fuse() => select_res = value_2,
};
select_res
};
assert_eq!(32, mem::size_of_val(&fut));
}
// The value of the winning arm becomes the value of the whole `select!`
// expression.
#[test]
fn select_can_be_used_as_expression() {
    block_on(async {
        let outcome = select! {
            first = future::ready(7) => first,
            second = future::ready(3) => second + 1,
        };
        // Both futures are ready, so either arm may have been chosen.
        assert!(matches!(outcome, 7 | 4));
    });
}
// When no future arm is ready, the `default` arm supplies the value of the
// `select!` expression.
#[test]
fn select_with_default_can_be_used_as_expression() {
    // Poll function that never completes.
    fn never_ready<T>(_cx: &mut Context<'_>) -> Poll<T> {
        Poll::Pending
    }

    let outcome = block_on(async {
        select! {
            first = poll_fn(never_ready::<i32>).fuse() => first,
            second = poll_fn(never_ready::<i32>).fuse() => second + 1,
            default => 99,
        }
    });

    assert_eq!(outcome, 99);
}
// With both `default` and `complete` arms present over `future::pending()`
// futures, the assertion pins that the `complete` arm supplies the value.
#[test]
fn select_with_complete_can_be_used_as_expression() {
    let outcome = block_on(async {
        select! {
            first = future::pending::<i32>() => first,
            second = future::pending::<i32>() => second + 1,
            default => 99,
            complete => 237,
        }
    });

    assert_eq!(outcome, 237);
}
// Compile-focused test: one arm's future holds `&mut counter` while another
// arm's body mutates `counter`. There are no runtime assertions; the test
// passes if this builds and runs to completion.
#[test]
#[allow(unused_assignments)]
fn select_on_mutable_borrowing_future_with_same_borrow_in_block() {
    async fn borrows_mutably(_: &mut i32) {}
    async fn does_nothing() {}

    block_on(async {
        let mut counter = 234;
        select! {
            _ = borrows_mutably(&mut counter).fuse() => {},
            _ = does_nothing().fuse() => {
                counter += 5;
            },
        }
    });
}
// Compile-focused test: one arm's future holds `&mut counter` while both
// another arm's body and the `default` arm mutate `counter`. There are no
// runtime assertions; the test passes if this builds and runs to completion.
#[test]
#[allow(unused_assignments)]
fn select_on_mutable_borrowing_future_with_same_borrow_in_block_and_default() {
    async fn borrows_mutably(_: &mut i32) {}
    async fn does_nothing() {}

    block_on(async {
        let mut counter = 234;
        select! {
            _ = borrows_mutably(&mut counter).fuse() => {},
            _ = does_nothing().fuse() => {
                counter += 5;
            },
            default => {
                counter += 27;
            },
        }
    });
}
// Exercises `stream_select!` in three scenarios:
//   1. an endless stream merged with a stream that never yields,
//   2. two finite streams (the merged stream ends once both are exhausted),
//   3. three endless streams, checking that items are drawn roughly evenly.
//
// NOTE(review): the `allow` below is carried over from the original test;
// confirm whether it is still needed.
#[test]
#[allow(unused_assignments)]
fn stream_select() {
    block_on(async {
        // An infinite stream repeating `i`.
        let endless_ints = |i| stream::iter(std::iter::repeat(i));

        let mut endless_ones = stream_select!(endless_ints(1i32), stream::pending());
        assert_eq!(endless_ones.next().await, Some(1));
        assert_eq!(endless_ones.next().await, Some(1));

        let mut finite_list = stream_select!(stream::iter(vec![1]), stream::iter(vec![1]));
        assert_eq!(finite_list.next().await, Some(1));
        assert_eq!(finite_list.next().await, Some(1));
        assert_eq!(finite_list.next().await, None);

        let endless_mixed = stream_select!(endless_ints(1i32), endless_ints(2), endless_ints(3));
        // Take 1000, and assert a somewhat even distribution of values.
        // The fairness is randomized, but over 1000 samples we should be pretty close to even.
        // This test may be a bit flaky. Feel free to adjust the margins as you see fit.
        //
        // `take(1000)` yields exactly 1000 items; the previous hand-rolled
        // `take_while` counter stopped one item short, at 999.
        let results = endless_mixed.take(1000).collect::<Vec<_>>().await;

        let occurrences = |wanted: i32| results.iter().filter(|x| **x == wanted).count();
        assert!(occurrences(1) >= 299);
        assert!(occurrences(2) >= 299);
        assert!(occurrences(3) >= 299);
    });
}
// Pins the byte size of async blocks whose body is a `join!` over ready
// futures, so that a change in how much state the macro stores shows up as
// a test failure.
// NOTE(review): unlike `select_size`, this test is not gated on 64-bit
// targets — confirm the expected sizes hold on other pointer widths.
#[test]
fn join_size() {
// One joined `Ready<i32>`: expected size is 16 bytes.
let fut = async {
let ready = future::ready(0i32);
join!(ready)
};
assert_eq!(mem::size_of_val(&fut), 16);
// Two joined `Ready<i32>` futures: expected size is 28 bytes.
let fut = async {
let ready1 = future::ready(0i32);
let ready2 = future::ready(0i32);
join!(ready1, ready2)
};
assert_eq!(mem::size_of_val(&fut), 28);
}
// Pins the byte size of async blocks whose body is a `try_join!` over ready
// `Result` futures, so that a change in how much state the macro stores
// shows up as a test failure.
// NOTE(review): unlike `select_size`, this test is not gated on 64-bit
// targets — confirm the expected sizes hold on other pointer widths.
#[test]
fn try_join_size() {
// One joined `Ready<Result<i32, i32>>`: expected size is 16 bytes.
let fut = async {
let ready = future::ready(Ok::<i32, i32>(0));
try_join!(ready)
};
assert_eq!(mem::size_of_val(&fut), 16);
// Two joined `Ready<Result<i32, i32>>` futures: expected size is 28 bytes.
let fut = async {
let ready1 = future::ready(Ok::<i32, i32>(0));
let ready2 = future::ready(Ok::<i32, i32>(0));
try_join!(ready1, ready2)
};
assert_eq!(mem::size_of_val(&fut), 28);
}
// Compile-only check: `join!` accepts futures produced by `async` blocks,
// which are !Unpin. The resulting future is built but never polled.
#[test]
fn join_doesnt_require_unpin() {
    let left = async {};
    let right = async {};
    let _joined = async { join!(left, right) };
}
// Compile-only check: `try_join!` accepts futures produced by `async`
// blocks, which are !Unpin, and tolerates a trailing comma. The resulting
// future is built but never polled.
#[test]
fn try_join_doesnt_require_unpin() {
    let left = async { Ok::<(), ()>(()) };
    let right = async { Ok::<(), ()>(()) };
    let _joined = async { try_join!(left, right,) };
}