| #![warn(rust_2018_idioms)] |
| #![cfg(feature = "sync")] |
| |
| #[cfg(tokio_wasm_not_wasi)] |
| use wasm_bindgen_test::wasm_bindgen_test as test; |
| #[cfg(tokio_wasm_not_wasi)] |
| use wasm_bindgen_test::wasm_bindgen_test as maybe_tokio_test; |
| |
| #[cfg(not(tokio_wasm_not_wasi))] |
| use tokio::test as maybe_tokio_test; |
| |
| use tokio::sync::oneshot; |
| use tokio::sync::oneshot::error::TryRecvError; |
| use tokio_test::*; |
| |
| use std::future::Future; |
| use std::pin::Pin; |
| use std::task::{Context, Poll}; |
| |
| trait AssertSend: Send {} |
| impl AssertSend for oneshot::Sender<i32> {} |
| impl AssertSend for oneshot::Receiver<i32> {} |
| |
| trait SenderExt { |
| fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()>; |
| } |
| impl<T> SenderExt for oneshot::Sender<T> { |
| fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> { |
| tokio::pin! { |
| let fut = self.closed(); |
| } |
| fut.poll(cx) |
| } |
| } |
| |
| #[test] |
| fn send_recv() { |
| let (tx, rx) = oneshot::channel(); |
| let mut rx = task::spawn(rx); |
| |
| assert_pending!(rx.poll()); |
| |
| assert_ok!(tx.send(1)); |
| |
| assert!(rx.is_woken()); |
| |
| let val = assert_ready_ok!(rx.poll()); |
| assert_eq!(val, 1); |
| } |
| |
| #[maybe_tokio_test] |
| async fn async_send_recv() { |
| let (tx, rx) = oneshot::channel(); |
| |
| assert_ok!(tx.send(1)); |
| assert_eq!(1, assert_ok!(rx.await)); |
| } |
| |
| #[test] |
| fn close_tx() { |
| let (tx, rx) = oneshot::channel::<i32>(); |
| let mut rx = task::spawn(rx); |
| |
| assert_pending!(rx.poll()); |
| |
| drop(tx); |
| |
| assert!(rx.is_woken()); |
| assert_ready_err!(rx.poll()); |
| } |
| |
| #[test] |
| fn close_rx() { |
| // First, without checking poll_closed() |
| // |
| let (tx, _) = oneshot::channel(); |
| |
| assert_err!(tx.send(1)); |
| |
| // Second, via poll_closed(); |
| |
| let (tx, rx) = oneshot::channel(); |
| let mut tx = task::spawn(tx); |
| |
| assert_pending!(tx.enter(|cx, mut tx| tx.poll_closed(cx))); |
| |
| drop(rx); |
| |
| assert!(tx.is_woken()); |
| assert!(tx.is_closed()); |
| assert_ready!(tx.enter(|cx, mut tx| tx.poll_closed(cx))); |
| |
| assert_err!(tx.into_inner().send(1)); |
| } |
| |
| #[tokio::test] |
| #[cfg(feature = "full")] |
| async fn async_rx_closed() { |
| let (mut tx, rx) = oneshot::channel::<()>(); |
| |
| tokio::spawn(async move { |
| drop(rx); |
| }); |
| |
| tx.closed().await; |
| } |
| |
| #[test] |
| fn explicit_close_poll() { |
| // First, with message sent |
| let (tx, rx) = oneshot::channel(); |
| let mut rx = task::spawn(rx); |
| |
| assert_ok!(tx.send(1)); |
| |
| rx.close(); |
| |
| let value = assert_ready_ok!(rx.poll()); |
| assert_eq!(value, 1); |
| |
| // Second, without the message sent |
| let (tx, rx) = oneshot::channel::<i32>(); |
| let mut tx = task::spawn(tx); |
| let mut rx = task::spawn(rx); |
| |
| assert_pending!(tx.enter(|cx, mut tx| tx.poll_closed(cx))); |
| |
| rx.close(); |
| |
| assert!(tx.is_woken()); |
| assert!(tx.is_closed()); |
| assert_ready!(tx.enter(|cx, mut tx| tx.poll_closed(cx))); |
| |
| assert_err!(tx.into_inner().send(1)); |
| assert_ready_err!(rx.poll()); |
| |
| // Again, but without sending the value this time |
| let (tx, rx) = oneshot::channel::<i32>(); |
| let mut tx = task::spawn(tx); |
| let mut rx = task::spawn(rx); |
| |
| assert_pending!(tx.enter(|cx, mut tx| tx.poll_closed(cx))); |
| |
| rx.close(); |
| |
| assert!(tx.is_woken()); |
| assert!(tx.is_closed()); |
| assert_ready!(tx.enter(|cx, mut tx| tx.poll_closed(cx))); |
| |
| assert_ready_err!(rx.poll()); |
| } |
| |
| #[test] |
| fn explicit_close_try_recv() { |
| // First, with message sent |
| let (tx, mut rx) = oneshot::channel(); |
| |
| assert_ok!(tx.send(1)); |
| |
| rx.close(); |
| |
| let val = assert_ok!(rx.try_recv()); |
| assert_eq!(1, val); |
| |
| // Second, without the message sent |
| let (tx, mut rx) = oneshot::channel::<i32>(); |
| let mut tx = task::spawn(tx); |
| |
| assert_pending!(tx.enter(|cx, mut tx| tx.poll_closed(cx))); |
| |
| rx.close(); |
| |
| assert!(tx.is_woken()); |
| assert!(tx.is_closed()); |
| assert_ready!(tx.enter(|cx, mut tx| tx.poll_closed(cx))); |
| |
| assert_err!(rx.try_recv()); |
| } |
| |
| #[test] |
| #[should_panic] |
| #[cfg(not(tokio_wasm))] // wasm currently doesn't support unwinding |
| fn close_try_recv_poll() { |
| let (_tx, rx) = oneshot::channel::<i32>(); |
| let mut rx = task::spawn(rx); |
| |
| rx.close(); |
| |
| assert_err!(rx.try_recv()); |
| |
| let _ = rx.poll(); |
| } |
| |
| #[test] |
| fn close_after_recv() { |
| let (tx, mut rx) = oneshot::channel::<i32>(); |
| |
| tx.send(17).unwrap(); |
| |
| assert_eq!(17, rx.try_recv().unwrap()); |
| rx.close(); |
| } |
| |
| #[test] |
| fn try_recv_after_completion() { |
| let (tx, mut rx) = oneshot::channel::<i32>(); |
| |
| tx.send(17).unwrap(); |
| |
| assert_eq!(17, rx.try_recv().unwrap()); |
| assert_eq!(Err(TryRecvError::Closed), rx.try_recv()); |
| rx.close(); |
| } |
| |
| #[test] |
| fn try_recv_after_completion_await() { |
| let (tx, rx) = oneshot::channel::<i32>(); |
| let mut rx = task::spawn(rx); |
| |
| tx.send(17).unwrap(); |
| |
| assert_eq!(Ok(17), assert_ready!(rx.poll())); |
| assert_eq!(Err(TryRecvError::Closed), rx.try_recv()); |
| rx.close(); |
| } |
| |
| #[test] |
| fn drops_tasks() { |
| let (mut tx, mut rx) = oneshot::channel::<i32>(); |
| let mut tx_task = task::spawn(()); |
| let mut rx_task = task::spawn(()); |
| |
| assert_pending!(tx_task.enter(|cx, _| tx.poll_closed(cx))); |
| assert_pending!(rx_task.enter(|cx, _| Pin::new(&mut rx).poll(cx))); |
| |
| drop(tx); |
| drop(rx); |
| |
| assert_eq!(1, tx_task.waker_ref_count()); |
| assert_eq!(1, rx_task.waker_ref_count()); |
| } |
| |
| #[test] |
| fn receiver_changes_task() { |
| let (tx, mut rx) = oneshot::channel(); |
| |
| let mut task1 = task::spawn(()); |
| let mut task2 = task::spawn(()); |
| |
| assert_pending!(task1.enter(|cx, _| Pin::new(&mut rx).poll(cx))); |
| |
| assert_eq!(2, task1.waker_ref_count()); |
| assert_eq!(1, task2.waker_ref_count()); |
| |
| assert_pending!(task2.enter(|cx, _| Pin::new(&mut rx).poll(cx))); |
| |
| assert_eq!(1, task1.waker_ref_count()); |
| assert_eq!(2, task2.waker_ref_count()); |
| |
| assert_ok!(tx.send(1)); |
| |
| assert!(!task1.is_woken()); |
| assert!(task2.is_woken()); |
| |
| assert_ready_ok!(task2.enter(|cx, _| Pin::new(&mut rx).poll(cx))); |
| } |
| |
| #[test] |
| fn sender_changes_task() { |
| let (mut tx, rx) = oneshot::channel::<i32>(); |
| |
| let mut task1 = task::spawn(()); |
| let mut task2 = task::spawn(()); |
| |
| assert_pending!(task1.enter(|cx, _| tx.poll_closed(cx))); |
| |
| assert_eq!(2, task1.waker_ref_count()); |
| assert_eq!(1, task2.waker_ref_count()); |
| |
| assert_pending!(task2.enter(|cx, _| tx.poll_closed(cx))); |
| |
| assert_eq!(1, task1.waker_ref_count()); |
| assert_eq!(2, task2.waker_ref_count()); |
| |
| drop(rx); |
| |
| assert!(!task1.is_woken()); |
| assert!(task2.is_woken()); |
| |
| assert_ready!(task2.enter(|cx, _| tx.poll_closed(cx))); |
| } |