blob: ba47799084fa411d386205926d1d5325c3d2ad89 [file] [log] [blame]
//! An extension trait for Futures that provides a variety of convenient adapters.
mod with_cancellation_token;
use with_cancellation_token::{WithCancellationTokenFuture, WithCancellationTokenFutureOwned};
use std::future::Future;
use crate::sync::CancellationToken;
/// A trait which contains a variety of convenient adapters and utilities for `Future`s.
pub trait FutureExt: Future {
cfg_time! {
/// A wrapper around [`tokio::time::timeout`], with the advantage that it is easier to write
/// fluent call chains.
///
/// # Examples
///
/// ```rust
/// use tokio::{sync::oneshot, time::Duration};
/// use tokio_util::future::FutureExt;
///
/// # async fn dox() {
/// let (_tx, rx) = oneshot::channel::<()>();
///
/// let res = rx.timeout(Duration::from_millis(10)).await;
/// assert!(res.is_err());
/// # }
/// ```
fn timeout(self, timeout: std::time::Duration) -> tokio::time::Timeout<Self>
where
Self: Sized,
{
tokio::time::timeout(timeout, self)
}
/// A wrapper around [`tokio::time::timeout_at`], with the advantage that it is easier to write
/// fluent call chains.
///
/// # Examples
///
/// ```rust
/// use tokio::{sync::oneshot, time::{Duration, Instant}};
/// use tokio_util::future::FutureExt;
///
/// # async fn dox() {
/// let (_tx, rx) = oneshot::channel::<()>();
/// let deadline = Instant::now() + Duration::from_millis(10);
///
/// let res = rx.timeout_at(deadline).await;
/// assert!(res.is_err());
/// # }
/// ```
fn timeout_at(self, deadline: tokio::time::Instant) -> tokio::time::Timeout<Self>
where
Self: Sized,
{
tokio::time::timeout_at(deadline, self)
}
}
/// Similar to [`CancellationToken::run_until_cancelled`],
/// but with the advantage that it is easier to write fluent call chains,
/// and biased towards waiting for [`CancellationToken`] to complete.
///
/// # Fairness
///
/// Calling this on an already-cancelled token directly returns `None`.
/// For all subsequent polls, in case of concurrent completion and
/// cancellation, this is biased towards the future completion.
///
/// # Examples
///
/// ```rust
/// use tokio::sync::oneshot;
/// use tokio_util::future::FutureExt;
/// use tokio_util::sync::CancellationToken;
///
/// # async fn dox() {
/// let (_tx, rx) = oneshot::channel::<()>();
/// let token = CancellationToken::new();
/// let token_clone = token.clone();
/// tokio::spawn(async move {
/// tokio::time::sleep(std::time::Duration::from_millis(10)).await;
/// token.cancel();
/// });
/// assert!(rx.with_cancellation_token(&token_clone).await.is_none())
/// # }
/// ```
fn with_cancellation_token(
self,
cancellation_token: &CancellationToken,
) -> WithCancellationTokenFuture<'_, Self>
where
Self: Sized,
{
WithCancellationTokenFuture::new(cancellation_token, self)
}
/// Similar to [`CancellationToken::run_until_cancelled_owned`],
/// but with the advantage that it is easier to write fluent call chains,
/// and biased towards waiting for [`CancellationToken`] to complete.
///
/// # Fairness
///
/// Calling this on an already-cancelled token directly returns `None`.
/// For all subsequent polls, in case of concurrent completion and
/// cancellation, this is biased towards the future completion.
///
/// # Examples
///
/// ```rust
/// use tokio::sync::oneshot;
/// use tokio_util::future::FutureExt;
/// use tokio_util::sync::CancellationToken;
///
/// # async fn dox() {
/// let (_tx, rx) = oneshot::channel::<()>();
/// let token = CancellationToken::new();
/// let token_clone = token.clone();
/// tokio::spawn(async move {
/// tokio::time::sleep(std::time::Duration::from_millis(10)).await;
/// token.cancel();
/// });
/// assert!(rx.with_cancellation_token_owned(token_clone).await.is_none())
/// # }
/// ```
fn with_cancellation_token_owned(
self,
cancellation_token: CancellationToken,
) -> WithCancellationTokenFutureOwned<Self>
where
Self: Sized,
{
WithCancellationTokenFutureOwned::new(cancellation_token, self)
}
}
impl<T: Future + ?Sized> FutureExt for T {}