blob: 718757966648b54295175c0b424102023146ee13 [file] [log] [blame]
//! A multi-producer, single-consumer, futures-aware, FIFO queue with back pressure.
//!
//! A channel can be used as a communication primitive between tasks running on
//! `futures-rs` executors. Channel creation provides `Receiver` and `Sender`
//! handles. `Receiver` implements `Stream` and allows a task to read values
//! out of the channel. If there is no message to read from the channel, the
//! current task will be notified when a new value is sent. `Sender` implements
//! the `Sink` trait and allows a task to send messages into the channel. If
//! the channel is at capacity, then send will be rejected and the task will be
//! notified when additional capacity is available.
//!
//! # Disconnection
//!
//! When all `Sender` handles have been dropped, it is no longer possible to
//! send values into the channel. This is considered the termination event of
//! the stream. As such, `Sender::poll` will return `Ok(Ready(None))`.
//!
//! If the receiver handle is dropped, then messages can no longer be read out
//! of the channel. In this case, a `send` will result in an error.
//!
//! # Clean Shutdown
//!
//! If the `Receiver` is simply dropped, then it is possible for there to be
//! messages still in the channel that will not be processed. As such, it is
//! usually desirable to perform a "clean" shutdown. To do this, the receiver
//! will first call `close`, which will prevent any further messages to be sent
//! into the channel. Then, the receiver consumes the channel to completion, at
//! which point the receiver can be dropped.
// At the core, the channel uses an atomic FIFO queue for message passing. This
// queue is used as the primary coordination primitive. In order to enforce
// capacity limits and handle back pressure, a secondary FIFO queue is used to
// send parked task handles.
//
// The general idea is that the channel is created with a `buffer` size of `n`.
// The channel capacity is `n + num-senders`. Each sender gets one "guaranteed"
// slot to hold a message. This allows `Sender` to know for a fact that a send
// will succeed *before* starting to do the actual work of sending the value.
// Since most of this work is lock-free, once the work starts, it is impossible
// to safely revert.
//
// If the sender is unable to process a send operation, then the current
// task is parked and the handle is sent on the parked task queue.
//
// Note that the implementation guarantees that the channel capacity will never
// exceed the configured limit, however there is no *strict* guarantee that the
// receiver will wake up a parked task *immediately* when a slot becomes
// available. However, it will almost always unpark a task when a slot becomes
// available and it is *guaranteed* that a sender will be unparked when the
// message that caused the sender to become parked is read out of the channel.
//
// The steps for sending a message are roughly:
//
// 1) Increment the channel message count
// 2) If the channel is at capacity, push the task handle onto the wait queue
// 3) Push the message onto the message queue.
//
// The steps for receiving a message are roughly:
//
// 1) Pop a message from the message queue
// 2) Pop a task handle from the wait queue
// 3) Decrement the channel message count.
//
// It's important for the order of operations on lock-free structures to happen
// in reverse order between the sender and receiver. This makes the message
// queue the primary coordination structure and establishes the necessary
// happens-before semantics required for the acquire / release semantics used
// by the queue structure.
use std::fmt;
use std::error::Error;
use std::any::Any;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::{Arc, Mutex};
use std::thread;
use std::usize;
use sync::mpsc::queue::{Queue, PopResult};
use sync::oneshot;
use task::{self, Task};
use future::Executor;
use sink::SendAll;
use resultstream::{self, Results};
use {Async, AsyncSink, Future, Poll, StartSend, Sink, Stream};
mod queue;
/// The transmission end of a channel which is used to send values.
///
/// This is created by the `channel` method.
#[derive(Debug)]
pub struct Sender<T> {
// Channel state shared between the sender and receiver.
inner: Arc<Inner<T>>,
// Handle to the task that is blocked on this sender. This handle is sent
// to the receiver half in order to be notified when the sender becomes
// unblocked.
sender_task: Arc<Mutex<SenderTask>>,
// True if the sender might be blocked. This is an optimization to avoid
// having to lock the mutex most of the time.
maybe_parked: bool,
}
/// The transmission end of a channel which is used to send values.
///
/// This is created by the `unbounded` method.
#[derive(Debug)]
pub struct UnboundedSender<T>(Sender<T>);
trait AssertKinds: Send + Sync + Clone {}
impl AssertKinds for UnboundedSender<u32> {}
/// The receiving end of a channel which implements the `Stream` trait.
///
/// This is a concrete implementation of a stream which can be used to represent
/// a stream of values being computed elsewhere. This is created by the
/// `channel` method.
#[derive(Debug)]
pub struct Receiver<T> {
inner: Arc<Inner<T>>,
}
/// The receiving end of a channel which implements the `Stream` trait.
///
/// This is a concrete implementation of a stream which can be used to represent
/// a stream of values being computed elsewhere. This is created by the
/// `unbounded` method.
#[derive(Debug)]
pub struct UnboundedReceiver<T>(Receiver<T>);
/// Error type for sending, used when the receiving end of a channel is
/// dropped
#[derive(Clone, PartialEq, Eq)]
pub struct SendError<T>(T);
/// Error type returned from `try_send`
#[derive(Clone, PartialEq, Eq)]
pub struct TrySendError<T> {
kind: TrySendErrorKind<T>,
}
#[derive(Clone, PartialEq, Eq)]
enum TrySendErrorKind<T> {
Full(T),
Disconnected(T),
}
impl<T> fmt::Debug for SendError<T> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_tuple("SendError")
.field(&"...")
.finish()
}
}
impl<T> fmt::Display for SendError<T> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "send failed because receiver is gone")
}
}
impl<T: Any> Error for SendError<T>
{
fn description(&self) -> &str {
"send failed because receiver is gone"
}
}
impl<T> SendError<T> {
/// Returns the message that was attempted to be sent but failed.
pub fn into_inner(self) -> T {
self.0
}
}
impl<T> fmt::Debug for TrySendError<T> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_tuple("TrySendError")
.field(&"...")
.finish()
}
}
impl<T> fmt::Display for TrySendError<T> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
if self.is_full() {
write!(fmt, "send failed because channel is full")
} else {
write!(fmt, "send failed because receiver is gone")
}
}
}
impl<T: Any> Error for TrySendError<T> {
fn description(&self) -> &str {
if self.is_full() {
"send failed because channel is full"
} else {
"send failed because receiver is gone"
}
}
}
impl<T> TrySendError<T> {
/// Returns true if this error is a result of the channel being full
pub fn is_full(&self) -> bool {
use self::TrySendErrorKind::*;
match self.kind {
Full(_) => true,
_ => false,
}
}
/// Returns true if this error is a result of the receiver being dropped
pub fn is_disconnected(&self) -> bool {
use self::TrySendErrorKind::*;
match self.kind {
Disconnected(_) => true,
_ => false,
}
}
/// Returns the message that was attempted to be sent but failed.
pub fn into_inner(self) -> T {
use self::TrySendErrorKind::*;
match self.kind {
Full(v) | Disconnected(v) => v,
}
}
}
#[derive(Debug)]
struct Inner<T> {
// Max buffer size of the channel. If `None` then the channel is unbounded.
buffer: Option<usize>,
// Internal channel state. Consists of the number of messages stored in the
// channel as well as a flag signalling that the channel is closed.
state: AtomicUsize,
// Atomic, FIFO queue used to send messages to the receiver
message_queue: Queue<Option<T>>,
// Atomic, FIFO queue used to send parked task handles to the receiver.
parked_queue: Queue<Arc<Mutex<SenderTask>>>,
// Number of senders in existence
num_senders: AtomicUsize,
// Handle to the receiver's task.
recv_task: Mutex<ReceiverTask>,
}
// Struct representation of `Inner::state`.
#[derive(Debug, Clone, Copy)]
struct State {
// `true` when the channel is open
is_open: bool,
// Number of messages in the channel
num_messages: usize,
}
#[derive(Debug)]
struct ReceiverTask {
unparked: bool,
task: Option<Task>,
}
// Returned from Receiver::try_park()
enum TryPark {
Parked,
Closed,
NotEmpty,
}
// The `is_open` flag is stored in the left-most bit of `Inner::state`
const OPEN_MASK: usize = usize::MAX - (usize::MAX >> 1);
// When a new channel is created, it is created in the open state with no
// pending messages.
const INIT_STATE: usize = OPEN_MASK;
// The maximum number of messages that a channel can track is `usize::MAX >> 1`
const MAX_CAPACITY: usize = !(OPEN_MASK);
// The maximum requested buffer size must be less than the maximum capacity of
// a channel. This is because each sender gets a guaranteed slot.
const MAX_BUFFER: usize = MAX_CAPACITY >> 1;
// Sent to the consumer to wake up blocked producers
#[derive(Debug)]
struct SenderTask {
task: Option<Task>,
is_parked: bool,
}
impl SenderTask {
fn new() -> Self {
SenderTask {
task: None,
is_parked: false,
}
}
fn notify(&mut self) {
self.is_parked = false;
if let Some(task) = self.task.take() {
task.notify();
}
}
}
/// Creates an in-memory channel implementation of the `Stream` trait with
/// bounded capacity.
///
/// This method creates a concrete implementation of the `Stream` trait which
/// can be used to send values across threads in a streaming fashion. This
/// channel is unique in that it implements back pressure to ensure that the
/// sender never outpaces the receiver. The channel capacity is equal to
/// `buffer + num-senders`. In other words, each sender gets a guaranteed slot
/// in the channel capacity, and on top of that there are `buffer` "first come,
/// first serve" slots available to all senders.
///
/// The `Receiver` returned implements the `Stream` trait and has access to any
/// number of the associated combinators for transforming the result.
pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
// Check that the requested buffer size does not exceed the maximum buffer
// size permitted by the system.
assert!(buffer < MAX_BUFFER, "requested buffer size too large");
channel2(Some(buffer))
}
/// Creates an in-memory channel implementation of the `Stream` trait with
/// unbounded capacity.
///
/// This method creates a concrete implementation of the `Stream` trait which
/// can be used to send values across threads in a streaming fashion. A `send`
/// on this channel will always succeed as long as the receive half has not
/// been closed. If the receiver falls behind, messages will be buffered
/// internally.
///
/// **Note** that the amount of available system memory is an implicit bound to
/// the channel. Using an `unbounded` channel has the ability of causing the
/// process to run out of memory. In this case, the process will be aborted.
pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
let (tx, rx) = channel2(None);
(UnboundedSender(tx), UnboundedReceiver(rx))
}
fn channel2<T>(buffer: Option<usize>) -> (Sender<T>, Receiver<T>) {
let inner = Arc::new(Inner {
buffer: buffer,
state: AtomicUsize::new(INIT_STATE),
message_queue: Queue::new(),
parked_queue: Queue::new(),
num_senders: AtomicUsize::new(1),
recv_task: Mutex::new(ReceiverTask {
unparked: false,
task: None,
}),
});
let tx = Sender {
inner: inner.clone(),
sender_task: Arc::new(Mutex::new(SenderTask::new())),
maybe_parked: false,
};
let rx = Receiver {
inner: inner,
};
(tx, rx)
}
/*
*
* ===== impl Sender =====
*
*/
impl<T> Sender<T> {
/// Attempts to send a message on this `Sender<T>` without blocking.
///
/// This function, unlike `start_send`, is safe to call whether it's being
/// called on a task or not. Note that this function, however, will *not*
/// attempt to block the current task if the message cannot be sent.
///
/// It is not recommended to call this function from inside of a future,
/// only from an external thread where you've otherwise arranged to be
/// notified when the channel is no longer full.
pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
// If the sender is currently blocked, reject the message
if !self.poll_unparked(false).is_ready() {
return Err(TrySendError {
kind: TrySendErrorKind::Full(msg),
});
}
// The channel has capacity to accept the message, so send it
self.do_send(Some(msg), false)
.map_err(|SendError(v)| {
TrySendError {
kind: TrySendErrorKind::Disconnected(v),
}
})
}
// Do the send without failing
// None means close
fn do_send(&mut self, msg: Option<T>, do_park: bool) -> Result<(), SendError<T>> {
// First, increment the number of messages contained by the channel.
// This operation will also atomically determine if the sender task
// should be parked.
//
// None is returned in the case that the channel has been closed by the
// receiver. This happens when `Receiver::close` is called or the
// receiver is dropped.
let park_self = match self.inc_num_messages(msg.is_none()) {
Some(park_self) => park_self,
None => {
// The receiver has closed the channel. Only abort if actually
// sending a message. It is important that the stream
// termination (None) is always sent. This technically means
// that it is possible for the queue to contain the following
// number of messages:
//
// num-senders + buffer + 1
//
if let Some(msg) = msg {
return Err(SendError(msg));
} else {
return Ok(());
}
}
};
// If the channel has reached capacity, then the sender task needs to
// be parked. This will send the task handle on the parked task queue.
//
// However, when `do_send` is called while dropping the `Sender`,
// `task::current()` can't be called safely. In this case, in order to
// maintain internal consistency, a blank message is pushed onto the
// parked task queue.
if park_self {
self.park(do_park);
}
self.queue_push_and_signal(msg);
Ok(())
}
// Do the send without parking current task.
//
// To be called from unbounded sender.
fn do_send_nb(&self, msg: T) -> Result<(), SendError<T>> {
match self.inc_num_messages(false) {
Some(park_self) => assert!(!park_self),
None => return Err(SendError(msg)),
};
self.queue_push_and_signal(Some(msg));
Ok(())
}
// Push message to the queue and signal to the receiver
fn queue_push_and_signal(&self, msg: Option<T>) {
// Push the message onto the message queue
self.inner.message_queue.push(msg);
// Signal to the receiver that a message has been enqueued. If the
// receiver is parked, this will unpark the task.
self.signal();
}
// Increment the number of queued messages. Returns if the sender should
// block.
fn inc_num_messages(&self, close: bool) -> Option<bool> {
let mut curr = self.inner.state.load(SeqCst);
loop {
let mut state = decode_state(curr);
// The receiver end closed the channel.
if !state.is_open {
return None;
}
// This probably is never hit? Odds are the process will run out of
// memory first. It may be worth to return something else in this
// case?
assert!(state.num_messages < MAX_CAPACITY, "buffer space exhausted; \
sending this messages would overflow the state");
state.num_messages += 1;
// The channel is closed by all sender handles being dropped.
if close {
state.is_open = false;
}
let next = encode_state(&state);
match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
Ok(_) => {
// Block if the current number of pending messages has exceeded
// the configured buffer size
let park_self = match self.inner.buffer {
Some(buffer) => state.num_messages > buffer,
None => false,
};
return Some(park_self)
}
Err(actual) => curr = actual,
}
}
}
// Signal to the receiver task that a message has been enqueued
fn signal(&self) {
// TODO
// This logic can probably be improved by guarding the lock with an
// atomic.
//
// Do this step first so that the lock is dropped when
// `unpark` is called
let task = {
let mut recv_task = self.inner.recv_task.lock().unwrap();
// If the receiver has already been unparked, then there is nothing
// more to do
if recv_task.unparked {
return;
}
// Setting this flag enables the receiving end to detect that
// an unpark event happened in order to avoid unnecessarily
// parking.
recv_task.unparked = true;
recv_task.task.take()
};
if let Some(task) = task {
task.notify();
}
}
fn park(&mut self, can_park: bool) {
// TODO: clean up internal state if the task::current will fail
let task = if can_park {
Some(task::current())
} else {
None
};
{
let mut sender = self.sender_task.lock().unwrap();
sender.task = task;
sender.is_parked = true;
}
// Send handle over queue
let t = self.sender_task.clone();
self.inner.parked_queue.push(t);
// Check to make sure we weren't closed after we sent our task on the
// queue
let state = decode_state(self.inner.state.load(SeqCst));
self.maybe_parked = state.is_open;
}
/// Polls the channel to determine if there is guaranteed to be capacity to send at least one
/// item without waiting.
///
/// Returns `Ok(Async::Ready(_))` if there is sufficient capacity, or returns
/// `Ok(Async::NotReady)` if the channel is not guaranteed to have capacity. Returns
/// `Err(SendError(_))` if the receiver has been dropped.
///
/// # Panics
///
/// This method will panic if called from outside the context of a task or future.
pub fn poll_ready(&mut self) -> Poll<(), SendError<()>> {
let state = decode_state(self.inner.state.load(SeqCst));
if !state.is_open {
return Err(SendError(()));
}
Ok(self.poll_unparked(true))
}
fn poll_unparked(&mut self, do_park: bool) -> Async<()> {
// First check the `maybe_parked` variable. This avoids acquiring the
// lock in most cases
if self.maybe_parked {
// Get a lock on the task handle
let mut task = self.sender_task.lock().unwrap();
if !task.is_parked {
self.maybe_parked = false;
return Async::Ready(())
}
// At this point, an unpark request is pending, so there will be an
// unpark sometime in the future. We just need to make sure that
// the correct task will be notified.
//
// Update the task in case the `Sender` has been moved to another
// task
task.task = if do_park {
Some(task::current())
} else {
None
};
Async::NotReady
} else {
Async::Ready(())
}
}
}
impl<T> Sink for Sender<T> {
type SinkItem = T;
type SinkError = SendError<T>;
fn start_send(&mut self, msg: T) -> StartSend<T, SendError<T>> {
// If the sender is currently blocked, reject the message before doing
// any work.
if !self.poll_unparked(true).is_ready() {
return Ok(AsyncSink::NotReady(msg));
}
// The channel has capacity to accept the message, so send it.
self.do_send(Some(msg), true)?;
Ok(AsyncSink::Ready)
}
fn poll_complete(&mut self) -> Poll<(), SendError<T>> {
Ok(Async::Ready(()))
}
fn close(&mut self) -> Poll<(), SendError<T>> {
Ok(Async::Ready(()))
}
}
impl<T> UnboundedSender<T> {
/// Sends the provided message along this channel.
///
/// This is an unbounded sender, so this function differs from `Sink::send`
/// by ensuring the return type reflects that the channel is always ready to
/// receive messages.
#[deprecated(note = "renamed to `unbounded_send`")]
#[doc(hidden)]
pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
self.unbounded_send(msg)
}
/// Sends the provided message along this channel.
///
/// This is an unbounded sender, so this function differs from `Sink::send`
/// by ensuring the return type reflects that the channel is always ready to
/// receive messages.
pub fn unbounded_send(&self, msg: T) -> Result<(), SendError<T>> {
self.0.do_send_nb(msg)
}
}
impl<T> Sink for UnboundedSender<T> {
type SinkItem = T;
type SinkError = SendError<T>;
fn start_send(&mut self, msg: T) -> StartSend<T, SendError<T>> {
self.0.start_send(msg)
}
fn poll_complete(&mut self) -> Poll<(), SendError<T>> {
self.0.poll_complete()
}
fn close(&mut self) -> Poll<(), SendError<T>> {
Ok(Async::Ready(()))
}
}
impl<'a, T> Sink for &'a UnboundedSender<T> {
type SinkItem = T;
type SinkError = SendError<T>;
fn start_send(&mut self, msg: T) -> StartSend<T, SendError<T>> {
self.0.do_send_nb(msg)?;
Ok(AsyncSink::Ready)
}
fn poll_complete(&mut self) -> Poll<(), SendError<T>> {
Ok(Async::Ready(()))
}
fn close(&mut self) -> Poll<(), SendError<T>> {
Ok(Async::Ready(()))
}
}
impl<T> Clone for UnboundedSender<T> {
fn clone(&self) -> UnboundedSender<T> {
UnboundedSender(self.0.clone())
}
}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Sender<T> {
// Since this atomic op isn't actually guarding any memory and we don't
// care about any orderings besides the ordering on the single atomic
// variable, a relaxed ordering is acceptable.
let mut curr = self.inner.num_senders.load(SeqCst);
loop {
// If the maximum number of senders has been reached, then fail
if curr == self.inner.max_senders() {
panic!("cannot clone `Sender` -- too many outstanding senders");
}
debug_assert!(curr < self.inner.max_senders());
let next = curr + 1;
let actual = self.inner.num_senders.compare_and_swap(curr, next, SeqCst);
// The ABA problem doesn't matter here. We only care that the
// number of senders never exceeds the maximum.
if actual == curr {
return Sender {
inner: self.inner.clone(),
sender_task: Arc::new(Mutex::new(SenderTask::new())),
maybe_parked: false,
};
}
curr = actual;
}
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
// Ordering between variables don't matter here
let prev = self.inner.num_senders.fetch_sub(1, SeqCst);
if prev == 1 {
let _ = self.do_send(None, false);
}
}
}
/*
*
* ===== impl Receiver =====
*
*/
impl<T> Receiver<T> {
/// Closes the receiving half
///
/// This prevents any further messages from being sent on the channel while
/// still enabling the receiver to drain messages that are buffered.
pub fn close(&mut self) {
let mut curr = self.inner.state.load(SeqCst);
loop {
let mut state = decode_state(curr);
if !state.is_open {
break
}
state.is_open = false;
let next = encode_state(&state);
match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
Ok(_) => break,
Err(actual) => curr = actual,
}
}
// Wake up any threads waiting as they'll see that we've closed the
// channel and will continue on their merry way.
loop {
match unsafe { self.inner.parked_queue.pop() } {
PopResult::Data(task) => {
task.lock().unwrap().notify();
}
PopResult::Empty => break,
PopResult::Inconsistent => thread::yield_now(),
}
}
}
fn next_message(&mut self) -> Async<Option<T>> {
// Pop off a message
loop {
match unsafe { self.inner.message_queue.pop() } {
PopResult::Data(msg) => {
return Async::Ready(msg);
}
PopResult::Empty => {
// The queue is empty, return NotReady
return Async::NotReady;
}
PopResult::Inconsistent => {
// Inconsistent means that there will be a message to pop
// in a short time. This branch can only be reached if
// values are being produced from another thread, so there
// are a few ways that we can deal with this:
//
// 1) Spin
// 2) thread::yield_now()
// 3) task::current().unwrap() & return NotReady
//
// For now, thread::yield_now() is used, but it would
// probably be better to spin a few times then yield.
thread::yield_now();
}
}
}
}
// Unpark a single task handle if there is one pending in the parked queue
fn unpark_one(&mut self) {
loop {
match unsafe { self.inner.parked_queue.pop() } {
PopResult::Data(task) => {
task.lock().unwrap().notify();
return;
}
PopResult::Empty => {
// Queue empty, no task to wake up.
return;
}
PopResult::Inconsistent => {
// Same as above
thread::yield_now();
}
}
}
}
// Try to park the receiver task
fn try_park(&self) -> TryPark {
let curr = self.inner.state.load(SeqCst);
let state = decode_state(curr);
// If the channel is closed, then there is no need to park.
if !state.is_open && state.num_messages == 0 {
return TryPark::Closed;
}
// First, track the task in the `recv_task` slot
let mut recv_task = self.inner.recv_task.lock().unwrap();
if recv_task.unparked {
// Consume the `unpark` signal without actually parking
recv_task.unparked = false;
return TryPark::NotEmpty;
}
recv_task.task = Some(task::current());
TryPark::Parked
}
fn dec_num_messages(&self) {
let mut curr = self.inner.state.load(SeqCst);
loop {
let mut state = decode_state(curr);
state.num_messages -= 1;
let next = encode_state(&state);
match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
Ok(_) => break,
Err(actual) => curr = actual,
}
}
}
}
impl<T> Stream for Receiver<T> {
type Item = T;
type Error = ();
fn poll(&mut self) -> Poll<Option<T>, ()> {
loop {
// Try to read a message off of the message queue.
let msg = match self.next_message() {
Async::Ready(msg) => msg,
Async::NotReady => {
// There are no messages to read, in this case, attempt to
// park. The act of parking will verify that the channel is
// still empty after the park operation has completed.
match self.try_park() {
TryPark::Parked => {
// The task was parked, and the channel is still
// empty, return NotReady.
return Ok(Async::NotReady);
}
TryPark::Closed => {
// The channel is closed, there will be no further
// messages.
return Ok(Async::Ready(None));
}
TryPark::NotEmpty => {
// A message has been sent while attempting to
// park. Loop again, the next iteration is
// guaranteed to get the message.
continue;
}
}
}
};
// If there are any parked task handles in the parked queue, pop
// one and unpark it.
self.unpark_one();
// Decrement number of messages
self.dec_num_messages();
// Return the message
return Ok(Async::Ready(msg));
}
}
}
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
// Drain the channel of all pending messages
self.close();
while self.next_message().is_ready() {
// ...
}
}
}
impl<T> UnboundedReceiver<T> {
/// Closes the receiving half
///
/// This prevents any further messages from being sent on the channel while
/// still enabling the receiver to drain messages that are buffered.
pub fn close(&mut self) {
self.0.close();
}
}
impl<T> Stream for UnboundedReceiver<T> {
type Item = T;
type Error = ();
fn poll(&mut self) -> Poll<Option<T>, ()> {
self.0.poll()
}
}
/// Handle returned from the `spawn` function.
///
/// This handle is a stream that proxies a stream on a separate `Executor`.
/// Created through the `mpsc::spawn` function, this handle will produce
/// the same values as the proxied stream, as they are produced in the executor,
/// and uses a limited buffer to exert back-pressure on the remote stream.
///
/// If this handle is dropped, then the stream will no longer be polled and is
/// scheduled to be dropped.
pub struct SpawnHandle<Item, Error> {
rx: Receiver<Result<Item, Error>>,
_cancel_tx: oneshot::Sender<()>,
}
/// Type of future which `Executor` instances must be able to execute for `spawn`.
pub struct Execute<S: Stream> {
inner: SendAll<Sender<Result<S::Item, S::Error>>, Results<S, SendError<Result<S::Item, S::Error>>>>,
cancel_rx: oneshot::Receiver<()>,
}
/// Spawns a `stream` onto the instance of `Executor` provided, `executor`,
/// returning a handle representing the remote stream.
///
/// The `stream` will be canceled if the `SpawnHandle` is dropped.
///
/// The `SpawnHandle` returned is a stream that is a proxy for `stream` itself.
/// When `stream` has additional items available, then the `SpawnHandle`
/// will have those same items available.
///
/// At most `buffer + 1` elements will be buffered at a time. If the buffer
/// is full, then `stream` will stop progressing until more space is available.
/// This allows the `SpawnHandle` to exert backpressure on the `stream`.
///
/// # Panics
///
/// This function will panic if `executor` is unable spawn a `Future` containing
/// the entirety of the `stream`.
pub fn spawn<S, E>(stream: S, executor: &E, buffer: usize) -> SpawnHandle<S::Item, S::Error>
where S: Stream,
E: Executor<Execute<S>>
{
let (cancel_tx, cancel_rx) = oneshot::channel();
let (tx, rx) = channel(buffer);
executor.execute(Execute {
inner: tx.send_all(resultstream::new(stream)),
cancel_rx: cancel_rx,
}).expect("failed to spawn stream");
SpawnHandle {
rx: rx,
_cancel_tx: cancel_tx,
}
}
/// Spawns a `stream` onto the instance of `Executor` provided, `executor`,
/// returning a handle representing the remote stream, with unbounded buffering.
///
/// The `stream` will be canceled if the `SpawnHandle` is dropped.
///
/// The `SpawnHandle` returned is a stream that is a proxy for `stream` itself.
/// When `stream` has additional items available, then the `SpawnHandle`
/// will have those same items available.
///
/// An unbounded buffer is used, which means that values will be buffered as
/// fast as `stream` can produce them, without any backpressure. Therefore, if
/// `stream` is an infinite stream, it can use an unbounded amount of memory, and
/// potentially hog CPU resources.
///
/// # Panics
///
/// This function will panic if `executor` is unable spawn a `Future` containing
/// the entirety of the `stream`.
pub fn spawn_unbounded<S, E>(stream: S, executor: &E) -> SpawnHandle<S::Item, S::Error>
where S: Stream,
E: Executor<Execute<S>>
{
let (cancel_tx, cancel_rx) = oneshot::channel();
let (tx, rx) = channel2(None);
executor.execute(Execute {
inner: tx.send_all(resultstream::new(stream)),
cancel_rx: cancel_rx,
}).expect("failed to spawn stream");
SpawnHandle {
rx: rx,
_cancel_tx: cancel_tx,
}
}
impl<I, E> Stream for SpawnHandle<I, E> {
type Item = I;
type Error = E;
fn poll(&mut self) -> Poll<Option<I>, E> {
match self.rx.poll() {
Ok(Async::Ready(Some(Ok(t)))) => Ok(Async::Ready(Some(t.into()))),
Ok(Async::Ready(Some(Err(e)))) => Err(e),
Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(_) => unreachable!("mpsc::Receiver should never return Err"),
}
}
}
impl<I, E> fmt::Debug for SpawnHandle<I, E> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("SpawnHandle")
.finish()
}
}
impl<S: Stream> Future for Execute<S> {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
match self.cancel_rx.poll() {
Ok(Async::NotReady) => (),
_ => return Ok(Async::Ready(())),
}
match self.inner.poll() {
Ok(Async::NotReady) => Ok(Async::NotReady),
_ => Ok(Async::Ready(()))
}
}
}
impl<S: Stream> fmt::Debug for Execute<S> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Execute")
.finish()
}
}
/*
*
* ===== impl Inner =====
*
*/
impl<T> Inner<T> {
// The return value is such that the total number of messages that can be
// enqueued into the channel will never exceed MAX_CAPACITY
fn max_senders(&self) -> usize {
match self.buffer {
Some(buffer) => MAX_CAPACITY - buffer,
None => MAX_BUFFER,
}
}
}
unsafe impl<T: Send> Send for Inner<T> {}
unsafe impl<T: Send> Sync for Inner<T> {}
/*
*
* ===== Helpers =====
*
*/
fn decode_state(num: usize) -> State {
State {
is_open: num & OPEN_MASK == OPEN_MASK,
num_messages: num & MAX_CAPACITY,
}
}
fn encode_state(state: &State) -> usize {
let mut num = state.num_messages;
if state.is_open {
num |= OPEN_MASK;
}
num
}