blob: e46fc5a1f73d9a866c45f411d17c5ec42d926cf7 [file] [log] [blame]
use async_stream::stream;
use futures_core::stream::{FusedStream, Stream};
use futures_util::pin_mut;
use futures_util::stream::StreamExt;
use tokio::sync::mpsc;
use tokio_test::assert_ok;
#[tokio::test]
async fn noop_stream() {
let s = stream! {};
pin_mut!(s);
while let Some(_) = s.next().await {
unreachable!();
}
}
#[tokio::test]
async fn empty_stream() {
let mut ran = false;
{
let r = &mut ran;
let s = stream! {
*r = true;
println!("hello world!");
};
pin_mut!(s);
while let Some(_) = s.next().await {
unreachable!();
}
}
assert!(ran);
}
#[tokio::test]
async fn yield_single_value() {
let s = stream! {
yield "hello";
};
let values: Vec<_> = s.collect().await;
assert_eq!(1, values.len());
assert_eq!("hello", values[0]);
}
#[tokio::test]
async fn fused() {
let s = stream! {
yield "hello";
};
pin_mut!(s);
assert!(!s.is_terminated());
assert_eq!(s.next().await, Some("hello"));
assert_eq!(s.next().await, None);
assert!(s.is_terminated());
// This should return None from now on
assert_eq!(s.next().await, None);
}
#[tokio::test]
async fn yield_multi_value() {
let s = stream! {
yield "hello";
yield "world";
yield "dizzy";
};
let values: Vec<_> = s.collect().await;
assert_eq!(3, values.len());
assert_eq!("hello", values[0]);
assert_eq!("world", values[1]);
assert_eq!("dizzy", values[2]);
}
#[tokio::test]
async fn return_stream() {
fn build_stream() -> impl Stream<Item = u32> {
stream! {
yield 1;
yield 2;
yield 3;
}
}
let s = build_stream();
let values: Vec<_> = s.collect().await;
assert_eq!(3, values.len());
assert_eq!(1, values[0]);
assert_eq!(2, values[1]);
assert_eq!(3, values[2]);
}
#[tokio::test]
async fn consume_channel() {
let (mut tx, mut rx) = mpsc::channel(10);
let s = stream! {
while let Some(v) = rx.recv().await {
yield v;
}
};
pin_mut!(s);
for i in 0..3 {
assert_ok!(tx.send(i).await);
assert_eq!(Some(i), s.next().await);
}
drop(tx);
assert_eq!(None, s.next().await);
}
#[tokio::test]
async fn borrow_self() {
struct Data(String);
impl Data {
fn stream<'a>(&'a self) -> impl Stream<Item = &str> + 'a {
stream! {
yield &self.0[..];
}
}
}
let data = Data("hello".to_string());
let s = data.stream();
pin_mut!(s);
assert_eq!(Some("hello"), s.next().await);
}
#[tokio::test]
async fn stream_in_stream() {
let s = stream! {
let s = stream! {
for i in 0..3 {
yield i;
}
};
pin_mut!(s);
while let Some(v) = s.next().await {
yield v;
}
};
let values: Vec<_> = s.collect().await;
assert_eq!(3, values.len());
}
#[test]
fn test() {
let t = trybuild::TestCases::new();
t.compile_fail("tests/ui/*.rs");
}