blob: a22d039e4023cc01d8d12b6397742552ee08da4e [file] [log] [blame]
use futures::channel::oneshot::{self, Sender};
use futures::executor::block_on;
use futures::future::{FutureExt, poll_fn};
use futures::task::{Context, Poll};
use futures_test::task::panic_waker_ref;
use std::sync::mpsc;
use std::thread;
#[test]
fn smoke_poll() {
let (mut tx, rx) = oneshot::channel::<u32>();
let mut rx = Some(rx);
let f = poll_fn(|cx| {
assert!(tx.poll_canceled(cx).is_pending());
assert!(tx.poll_canceled(cx).is_pending());
drop(rx.take());
assert!(tx.poll_canceled(cx).is_ready());
assert!(tx.poll_canceled(cx).is_ready());
Poll::Ready(())
});
block_on(f);
}
#[test]
fn cancel_notifies() {
let (mut tx, rx) = oneshot::channel::<u32>();
let t = thread::spawn(move || {
block_on(tx.cancellation());
});
drop(rx);
t.join().unwrap();
}
#[test]
fn cancel_lots() {
let (tx, rx) = mpsc::channel::<(Sender<_>, mpsc::Sender<_>)>();
let t = thread::spawn(move || {
for (mut tx, tx2) in rx {
block_on(tx.cancellation());
tx2.send(()).unwrap();
}
});
for _ in 0..20000 {
let (otx, orx) = oneshot::channel::<u32>();
let (tx2, rx2) = mpsc::channel();
tx.send((otx, tx2)).unwrap();
drop(orx);
rx2.recv().unwrap();
}
drop(tx);
t.join().unwrap();
}
#[test]
fn cancel_after_sender_drop_doesnt_notify() {
let (mut tx, rx) = oneshot::channel::<u32>();
let mut cx = Context::from_waker(panic_waker_ref());
assert_eq!(tx.poll_canceled(&mut cx), Poll::Pending);
drop(tx);
drop(rx);
}
#[test]
fn close() {
let (mut tx, mut rx) = oneshot::channel::<u32>();
rx.close();
block_on(poll_fn(|cx| {
match rx.poll_unpin(cx) {
Poll::Ready(Err(_)) => {},
_ => panic!(),
};
assert!(tx.poll_canceled(cx).is_ready());
Poll::Ready(())
}));
}
#[test]
fn close_wakes() {
let (mut tx, mut rx) = oneshot::channel::<u32>();
let (tx2, rx2) = mpsc::channel();
let t = thread::spawn(move || {
rx.close();
rx2.recv().unwrap();
});
block_on(tx.cancellation());
tx2.send(()).unwrap();
t.join().unwrap();
}
#[test]
fn is_canceled() {
let (tx, rx) = oneshot::channel::<u32>();
assert!(!tx.is_canceled());
drop(rx);
assert!(tx.is_canceled());
}
#[test]
fn cancel_sends() {
let (tx, rx) = mpsc::channel::<Sender<_>>();
let t = thread::spawn(move || {
for otx in rx {
let _ = otx.send(42);
}
});
for _ in 0..20000 {
let (otx, mut orx) = oneshot::channel::<u32>();
tx.send(otx).unwrap();
orx.close();
let _ = block_on(orx);
}
drop(tx);
t.join().unwrap();
}
// #[test]
// fn spawn_sends_items() {
// let core = local_executor::Core::new();
// let future = ok::<_, ()>(1);
// let rx = spawn(future, &core);
// assert_eq!(core.run(rx).unwrap(), 1);
// }
//
// #[test]
// fn spawn_kill_dead_stream() {
// use std::thread;
// use std::time::Duration;
// use futures::future::Either;
// use futures::sync::oneshot;
//
// // a future which never returns anything (forever accepting incoming
// // connections), but dropping it leads to observable side effects
// // (like closing listening sockets, releasing limited resources,
// // ...)
// #[derive(Debug)]
// struct Dead {
// // when dropped you should get Err(oneshot::Canceled) on the
// // receiving end
// done: oneshot::Sender<()>,
// }
// impl Future for Dead {
// type Item = ();
// type Error = ();
//
// fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// Ok(Poll::Pending)
// }
// }
//
// // need to implement a timeout for the test, as it would hang
// // forever right now
// let (timeout_tx, timeout_rx) = oneshot::channel();
// thread::spawn(move || {
// thread::sleep(Duration::from_millis(1000));
// let _ = timeout_tx.send(());
// });
//
// let core = local_executor::Core::new();
// let (done_tx, done_rx) = oneshot::channel();
// let future = Dead{done: done_tx};
// let rx = spawn(future, &core);
// let res = core.run(
// Ok::<_, ()>(())
// .into_future()
// .then(move |_| {
// // now drop the spawned future: maybe some timeout exceeded,
// // or some connection on this end was closed by the remote
// // end.
// drop(rx);
// // and wait for the spawned future to release its resources
// done_rx
// })
// .select2(timeout_rx)
// );
// match res {
// Err(Either::A((oneshot::Canceled, _))) => (),
// Ok(Either::B(((), _))) => {
// panic!("dead future wasn't canceled (timeout)");
// },
// _ => {
// panic!("dead future wasn't canceled (unexpected result)");
// },
// }
// }
//
// #[test]
// fn spawn_dont_kill_forgot_dead_stream() {
// use std::thread;
// use std::time::Duration;
// use futures::future::Either;
// use futures::sync::oneshot;
//
// // a future which never returns anything (forever accepting incoming
// // connections), but dropping it leads to observable side effects
// // (like closing listening sockets, releasing limited resources,
// // ...)
// #[derive(Debug)]
// struct Dead {
// // when dropped you should get Err(oneshot::Canceled) on the
// // receiving end
// done: oneshot::Sender<()>,
// }
// impl Future for Dead {
// type Item = ();
// type Error = ();
//
// fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// Ok(Poll::Pending)
// }
// }
//
// // need to implement a timeout for the test, as it would hang
// // forever right now
// let (timeout_tx, timeout_rx) = oneshot::channel();
// thread::spawn(move || {
// thread::sleep(Duration::from_millis(1000));
// let _ = timeout_tx.send(());
// });
//
// let core = local_executor::Core::new();
// let (done_tx, done_rx) = oneshot::channel();
// let future = Dead{done: done_tx};
// let rx = spawn(future, &core);
// let res = core.run(
// Ok::<_, ()>(())
// .into_future()
// .then(move |_| {
// // forget the spawned future: should keep running, i.e. hit
// // the timeout below.
// rx.forget();
// // and wait for the spawned future to release its resources
// done_rx
// })
// .select2(timeout_rx)
// );
// match res {
// Err(Either::A((oneshot::Canceled, _))) => {
// panic!("forgotten dead future was canceled");
// },
// Ok(Either::B(((), _))) => (), // reached timeout
// _ => {
// panic!("forgotten dead future was canceled (unexpected result)");
// },
// }
// }