blob: d2c78f9ad39345e628afb5534b16247804b8d1cd [file] [log] [blame]
#![warn(
missing_debug_implementations,
missing_docs,
rust_2018_idioms,
unreachable_pub
)]
#![doc(test(no_crate_inject, attr(deny(rust_2018_idioms))))]
//! Asynchronous stream of elements.
//!
//! Provides two macros, `stream!` and `try_stream!`, allowing the caller to
//! define asynchronous streams of elements. These are implemented using `async`
//! & `await` notation. This crate works without unstable features.
//!
//! The `stream!` macro returns an anonymous type implementing the [`Stream`]
//! trait. The `Item` associated type is the type of the values yielded from the
//! stream. The `try_stream!` also returns an anonymous type implementing the
//! [`Stream`] trait, but the `Item` associated type is `Result<T, Error>`. The
//! `try_stream!` macro supports using `?` notiation as part of the
//! implementation.
//!
//! # Usage
//!
//! A basic stream yielding numbers. Values are yielded using the `yield`
//! keyword. The stream block must return `()`.
//!
//! ```rust
//! use async_stream::stream;
//!
//! use futures_util::pin_mut;
//! use futures_util::stream::StreamExt;
//!
//! #[tokio::main]
//! async fn main() {
//! let s = stream! {
//! for i in 0..3 {
//! yield i;
//! }
//! };
//!
//! pin_mut!(s); // needed for iteration
//!
//! while let Some(value) = s.next().await {
//! println!("got {}", value);
//! }
//! }
//! ```
//!
//! Streams may be returned by using `impl Stream<Item = T>`:
//!
//! ```rust
//! use async_stream::stream;
//!
//! use futures_core::stream::Stream;
//! use futures_util::pin_mut;
//! use futures_util::stream::StreamExt;
//!
//! fn zero_to_three() -> impl Stream<Item = u32> {
//! stream! {
//! for i in 0..3 {
//! yield i;
//! }
//! }
//! }
//!
//! #[tokio::main]
//! async fn main() {
//! let s = zero_to_three();
//! pin_mut!(s); // needed for iteration
//!
//! while let Some(value) = s.next().await {
//! println!("got {}", value);
//! }
//! }
//! ```
//!
//! Streams may be implemented in terms of other streams - `async-stream` provides `for await`
//! syntax to assist with this:
//!
//! ```rust
//! use async_stream::stream;
//!
//! use futures_core::stream::Stream;
//! use futures_util::pin_mut;
//! use futures_util::stream::StreamExt;
//!
//! fn zero_to_three() -> impl Stream<Item = u32> {
//! stream! {
//! for i in 0..3 {
//! yield i;
//! }
//! }
//! }
//!
//! fn double<S: Stream<Item = u32>>(input: S)
//! -> impl Stream<Item = u32>
//! {
//! stream! {
//! for await value in input {
//! yield value * 2;
//! }
//! }
//! }
//!
//! #[tokio::main]
//! async fn main() {
//! let s = double(zero_to_three());
//! pin_mut!(s); // needed for iteration
//!
//! while let Some(value) = s.next().await {
//! println!("got {}", value);
//! }
//! }
//! ```
//!
//! Rust try notation (`?`) can be used with the `try_stream!` macro. The `Item`
//! of the returned stream is `Result` with `Ok` being the value yielded and
//! `Err` the error type returned by `?`.
//!
//! ```rust
//! use tokio::net::{TcpListener, TcpStream};
//!
//! use async_stream::try_stream;
//! use futures_core::stream::Stream;
//!
//! use std::io;
//! use std::net::SocketAddr;
//!
//! fn bind_and_accept(addr: SocketAddr)
//! -> impl Stream<Item = io::Result<TcpStream>>
//! {
//! try_stream! {
//! let mut listener = TcpListener::bind(addr).await?;
//!
//! loop {
//! let (stream, addr) = listener.accept().await?;
//! println!("received on {:?}", addr);
//! yield stream;
//! }
//! }
//! }
//! ```
//!
//! # Implementation
//!
//! The `stream!` and `try_stream!` macros are implemented using proc macros.
//! The macro searches the syntax tree for instances of `sender.send($expr)` and
//! transforms them into `sender.send($expr).await`.
//!
//! The stream uses a lightweight sender to send values from the stream
//! implementation to the caller. When entering the stream, an `Option<T>` is
//! stored on the stack. A pointer to the cell is stored in a thread local and
//! `poll` is called on the async block. When `poll` returns.
//! `sender.send(value)` stores the value that cell and yields back to the
//! caller.
//!
//! [`Stream`]: https://docs.rs/futures-core/*/futures_core/stream/trait.Stream.html
mod async_stream;
mod next;
#[doc(hidden)]
pub mod yielder;
// Used by the macro, but not intended to be accessed publicly.
#[doc(hidden)]
pub use crate::async_stream::AsyncStream;
#[doc(hidden)]
pub use async_stream_impl;
/// Asynchronous stream
///
/// See [crate](index.html) documentation for more details.
///
/// # Examples
///
/// ```
/// use async_stream::stream;
///
/// use futures_util::pin_mut;
/// use futures_util::stream::StreamExt;
///
/// #[tokio::main]
/// async fn main() {
/// let s = stream! {
/// for i in 0..3 {
/// yield i;
/// }
/// };
///
/// pin_mut!(s); // needed for iteration
///
/// while let Some(value) = s.next().await {
/// println!("got {}", value);
/// }
/// }
/// ```
#[macro_export]
macro_rules! stream {
($($tt:tt)*) => {
$crate::async_stream_impl::stream_inner!(($crate) $($tt)*)
}
}
/// Asynchronous fallible stream
///
/// See [crate](index.html) documentation for more details.
///
/// # Examples
///
/// ```
/// use tokio::net::{TcpListener, TcpStream};
///
/// use async_stream::try_stream;
/// use futures_core::stream::Stream;
///
/// use std::io;
/// use std::net::SocketAddr;
///
/// fn bind_and_accept(addr: SocketAddr)
/// -> impl Stream<Item = io::Result<TcpStream>>
/// {
/// try_stream! {
/// let mut listener = TcpListener::bind(addr).await?;
///
/// loop {
/// let (stream, addr) = listener.accept().await?;
/// println!("received on {:?}", addr);
/// yield stream;
/// }
/// }
/// }
/// ```
#[macro_export]
macro_rules! try_stream {
($($tt:tt)*) => {
$crate::async_stream_impl::try_stream_inner!(($crate) $($tt)*)
}
}
#[doc(hidden)]
pub mod reexport {
#[doc(hidden)]
pub use crate::next::next;
}