|  | //! A multi-producer, single-consumer queue for sending values across | 
|  | //! asynchronous tasks. | 
|  | //! | 
|  | //! Similarly to channels in `std`, channel creation provides [`Receiver`] and | 
|  | //! [`Sender`] handles. [`Receiver`] implements [`Stream`] and allows a task to | 
|  | //! read values out of the channel. If there is no message to read from the | 
|  | //! channel, the current task will be notified when a new value is sent. | 
|  | //! [`Sender`] implements the `Sink` trait and allows a task to send messages into | 
|  | //! the channel. If the channel is at capacity, the send will be rejected and | 
|  | //! the task will be notified when additional capacity is available. In other | 
|  | //! words, the channel provides backpressure. | 
|  | //! | 
|  | //! Unbounded channels are also available using the `unbounded` constructor. | 
|  | //! | 
|  | //! # Disconnection | 
|  | //! | 
|  | //! When all [`Sender`] handles have been dropped, it is no longer | 
|  | //! possible to send values into the channel. This is considered the termination | 
|  | //! event of the stream. As such, [`Receiver::poll_next`] | 
|  | //! will return `Poll::Ready(None)`. | 
|  | //! | 
|  | //! If the [`Receiver`] handle is dropped, then messages can no longer | 
|  | //! be read out of the channel. In this case, all further attempts to send will | 
|  | //! result in an error. | 
|  | //! | 
|  | //! # Clean Shutdown | 
|  | //! | 
|  | //! If the [`Receiver`] is simply dropped, then it is possible for | 
|  | //! there to be messages still in the channel that will not be processed. As | 
|  | //! such, it is usually desirable to perform a "clean" shutdown. To do this, the | 
|  | //! receiver will first call `close`, which will prevent any further messages from | 
|  | //! being sent into the channel. Then, the receiver consumes the channel to | 
|  | //! completion, at which point the receiver can be dropped. | 
|  | //! | 
|  | //! [`Sender`]: struct.Sender.html | 
|  | //! [`Receiver`]: struct.Receiver.html | 
|  | //! [`Stream`]: ../../futures_core/stream/trait.Stream.html | 
|  | //! [`Receiver::poll_next`]: | 
|  | //!     ../../futures_core/stream/trait.Stream.html#tymethod.poll_next | 
|  |  | 
|  | // At the core, the channel uses an atomic FIFO queue for message passing. This | 
|  | // queue is used as the primary coordination primitive. In order to enforce | 
|  | // capacity limits and handle back pressure, a secondary FIFO queue is used to | 
|  | // send parked task handles. | 
|  | // | 
|  | // The general idea is that the channel is created with a `buffer` size of `n`. | 
|  | // The channel capacity is `n + num-senders`. Each sender gets one "guaranteed" | 
|  | // slot to hold a message. This allows `Sender` to know for a fact that a send | 
|  | // will succeed *before* starting to do the actual work of sending the value. | 
|  | // Since most of this work is lock-free, once the work starts, it is impossible | 
|  | // to safely revert. | 
|  | // | 
|  | // If the sender is unable to process a send operation, then the current | 
|  | // task is parked and the handle is sent on the parked task queue. | 
|  | // | 
|  | // Note that the implementation guarantees that the channel capacity will never | 
|  | // exceed the configured limit, however there is no *strict* guarantee that the | 
|  | // receiver will wake up a parked task *immediately* when a slot becomes | 
|  | // available. However, it will almost always unpark a task when a slot becomes | 
|  | // available and it is *guaranteed* that a sender will be unparked when the | 
|  | // message that caused the sender to become parked is read out of the channel. | 
|  | // | 
|  | // The steps for sending a message are roughly: | 
|  | // | 
|  | // 1) Increment the channel message count | 
|  | // 2) If the channel is at capacity, push the task handle onto the wait queue | 
|  | // 3) Push the message onto the message queue. | 
|  | // | 
|  | // The steps for receiving a message are roughly: | 
|  | // | 
|  | // 1) Pop a message from the message queue | 
|  | // 2) Pop a task handle from the wait queue | 
|  | // 3) Decrement the channel message count. | 
|  | // | 
|  | // It's important for the order of operations on lock-free structures to happen | 
|  | // in reverse order between the sender and receiver. This makes the message | 
|  | // queue the primary coordination structure and establishes the necessary | 
|  | // happens-before semantics required for the acquire / release semantics used | 
|  | // by the queue structure. | 
|  |  | 
|  | use futures_core::stream::{FusedStream, Stream}; | 
|  | use futures_core::task::__internal::AtomicWaker; | 
|  | use futures_core::task::{Context, Poll, Waker}; | 
|  | use std::fmt; | 
|  | use std::pin::Pin; | 
|  | use std::sync::atomic::AtomicUsize; | 
|  | use std::sync::atomic::Ordering::SeqCst; | 
|  | use std::sync::{Arc, Mutex}; | 
|  | use std::thread; | 
|  |  | 
|  | use crate::mpsc::queue::Queue; | 
|  |  | 
|  | mod queue; | 
|  | #[cfg(feature = "sink")] | 
|  | mod sink_impl; | 
|  |  | 
|  | struct UnboundedSenderInner<T> { | 
|  | // Channel state shared between the sender and receiver. | 
|  | inner: Arc<UnboundedInner<T>>, | 
|  | } | 
|  |  | 
|  | struct BoundedSenderInner<T> { | 
|  | // Channel state shared between the sender and receiver. | 
|  | inner: Arc<BoundedInner<T>>, | 
|  |  | 
|  | // Handle to the task that is blocked on this sender. This handle is sent | 
|  | // to the receiver half in order to be notified when the sender becomes | 
|  | // unblocked. | 
|  | sender_task: Arc<Mutex<SenderTask>>, | 
|  |  | 
|  | // `true` if the sender might be blocked. This is an optimization to avoid | 
|  | // having to lock the mutex most of the time. | 
|  | maybe_parked: bool, | 
|  | } | 
|  |  | 
|  | // We never project Pin<&mut SenderInner> to `Pin<&mut T>` | 
|  | impl<T> Unpin for UnboundedSenderInner<T> {} | 
|  | impl<T> Unpin for BoundedSenderInner<T> {} | 
|  |  | 
|  | /// The transmission end of a bounded mpsc channel. | 
|  | /// | 
|  | /// This value is created by the [`channel`] function. | 
|  | pub struct Sender<T>(Option<BoundedSenderInner<T>>); | 
|  |  | 
|  | /// The transmission end of an unbounded mpsc channel. | 
|  | /// | 
|  | /// This value is created by the [`unbounded`] function. | 
|  | pub struct UnboundedSender<T>(Option<UnboundedSenderInner<T>>); | 
|  |  | 
|  | #[allow(dead_code)] | 
|  | trait AssertKinds: Send + Sync + Clone {} | 
|  | impl AssertKinds for UnboundedSender<u32> {} | 
|  |  | 
|  | /// The receiving end of a bounded mpsc channel. | 
|  | /// | 
|  | /// This value is created by the [`channel`] function. | 
|  | pub struct Receiver<T> { | 
|  | inner: Option<Arc<BoundedInner<T>>>, | 
|  | } | 
|  |  | 
|  | /// The receiving end of an unbounded mpsc channel. | 
|  | /// | 
|  | /// This value is created by the [`unbounded`] function. | 
|  | pub struct UnboundedReceiver<T> { | 
|  | inner: Option<Arc<UnboundedInner<T>>>, | 
|  | } | 
|  |  | 
|  | // `Pin<&mut UnboundedReceiver<T>>` is never projected to `Pin<&mut T>` | 
|  | impl<T> Unpin for UnboundedReceiver<T> {} | 
|  |  | 
|  | /// The error type for [`Sender`s](Sender) used as `Sink`s. | 
|  | #[derive(Clone, Debug, PartialEq, Eq)] | 
|  | pub struct SendError { | 
|  | kind: SendErrorKind, | 
|  | } | 
|  |  | 
|  | /// The error type returned from [`try_send`](Sender::try_send). | 
|  | #[derive(Clone, PartialEq, Eq)] | 
|  | pub struct TrySendError<T> { | 
|  | err: SendError, | 
|  | val: T, | 
|  | } | 
|  |  | 
// Reason carried inside a `SendError`.
#[derive(Clone, Debug, PartialEq, Eq)]
enum SendErrorKind {
    // The channel had no room for the message.
    Full,
    // The channel is closed or the receiver is gone.
    Disconnected,
}
|  |  | 
/// The error type returned from [`try_next`](Receiver::try_next).
pub struct TryRecvError {
    // Private unit field: keeps the type from being constructed outside this
    // module.
    _priv: (),
}
|  |  | 
|  | impl fmt::Display for SendError { | 
|  | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | 
|  | if self.is_full() { | 
|  | write!(f, "send failed because channel is full") | 
|  | } else { | 
|  | write!(f, "send failed because receiver is gone") | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | impl std::error::Error for SendError {} | 
|  |  | 
|  | impl SendError { | 
|  | /// Returns `true` if this error is a result of the channel being full. | 
|  | pub fn is_full(&self) -> bool { | 
|  | match self.kind { | 
|  | SendErrorKind::Full => true, | 
|  | _ => false, | 
|  | } | 
|  | } | 
|  |  | 
|  | /// Returns `true` if this error is a result of the receiver being dropped. | 
|  | pub fn is_disconnected(&self) -> bool { | 
|  | match self.kind { | 
|  | SendErrorKind::Disconnected => true, | 
|  | _ => false, | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | impl<T> fmt::Debug for TrySendError<T> { | 
|  | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | 
|  | f.debug_struct("TrySendError").field("kind", &self.err.kind).finish() | 
|  | } | 
|  | } | 
|  |  | 
|  | impl<T> fmt::Display for TrySendError<T> { | 
|  | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | 
|  | if self.is_full() { | 
|  | write!(f, "send failed because channel is full") | 
|  | } else { | 
|  | write!(f, "send failed because receiver is gone") | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | impl<T: core::any::Any> std::error::Error for TrySendError<T> {} | 
|  |  | 
|  | impl<T> TrySendError<T> { | 
|  | /// Returns `true` if this error is a result of the channel being full. | 
|  | pub fn is_full(&self) -> bool { | 
|  | self.err.is_full() | 
|  | } | 
|  |  | 
|  | /// Returns `true` if this error is a result of the receiver being dropped. | 
|  | pub fn is_disconnected(&self) -> bool { | 
|  | self.err.is_disconnected() | 
|  | } | 
|  |  | 
|  | /// Returns the message that was attempted to be sent but failed. | 
|  | pub fn into_inner(self) -> T { | 
|  | self.val | 
|  | } | 
|  |  | 
|  | /// Drops the message and converts into a `SendError`. | 
|  | pub fn into_send_error(self) -> SendError { | 
|  | self.err | 
|  | } | 
|  | } | 
|  |  | 
|  | impl fmt::Debug for TryRecvError { | 
|  | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | 
|  | f.debug_tuple("TryRecvError").finish() | 
|  | } | 
|  | } | 
|  |  | 
|  | impl fmt::Display for TryRecvError { | 
|  | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | 
|  | write!(f, "receiver channel is empty") | 
|  | } | 
|  | } | 
|  |  | 
|  | impl std::error::Error for TryRecvError {} | 
|  |  | 
|  | struct UnboundedInner<T> { | 
|  | // Internal channel state. Consists of the number of messages stored in the | 
|  | // channel as well as a flag signalling that the channel is closed. | 
|  | state: AtomicUsize, | 
|  |  | 
|  | // Atomic, FIFO queue used to send messages to the receiver | 
|  | message_queue: Queue<T>, | 
|  |  | 
|  | // Number of senders in existence | 
|  | num_senders: AtomicUsize, | 
|  |  | 
|  | // Handle to the receiver's task. | 
|  | recv_task: AtomicWaker, | 
|  | } | 
|  |  | 
|  | struct BoundedInner<T> { | 
|  | // Max buffer size of the channel. If `None` then the channel is unbounded. | 
|  | buffer: usize, | 
|  |  | 
|  | // Internal channel state. Consists of the number of messages stored in the | 
|  | // channel as well as a flag signalling that the channel is closed. | 
|  | state: AtomicUsize, | 
|  |  | 
|  | // Atomic, FIFO queue used to send messages to the receiver | 
|  | message_queue: Queue<T>, | 
|  |  | 
|  | // Atomic, FIFO queue used to send parked task handles to the receiver. | 
|  | parked_queue: Queue<Arc<Mutex<SenderTask>>>, | 
|  |  | 
|  | // Number of senders in existence | 
|  | num_senders: AtomicUsize, | 
|  |  | 
|  | // Handle to the receiver's task. | 
|  | recv_task: AtomicWaker, | 
|  | } | 
|  |  | 
// Decoded, struct form of the packed `Inner::state` word.
#[derive(Clone, Copy)]
struct State {
    // `true` while the channel is open.
    is_open: bool,

    // Number of messages currently in the channel.
    num_messages: usize,
}
|  |  | 
// The `is_open` flag lives in the most significant bit of `Inner::state`;
// every lower bit belongs to the message counter.
const OPEN_MASK: usize = !(usize::MAX >> 1);

// A freshly created channel is open and holds no messages.
const INIT_STATE: usize = OPEN_MASK;

// Largest message count the state word can track: all bits below the open
// flag, i.e. `usize::MAX >> 1`.
const MAX_CAPACITY: usize = usize::MAX >> 1;

// The requested buffer size must stay below the maximum capacity because each
// sender additionally gets one guaranteed slot.
const MAX_BUFFER: usize = MAX_CAPACITY / 2;
|  |  | 
// Handed to the consumer so that it can wake up blocked producers.
struct SenderTask {
    // Waker of the task waiting on the sender, if one has been registered.
    task: Option<Waker>,
    // `true` while the sender is considered parked.
    is_parked: bool,
}

impl SenderTask {
    // Creates a task slot that is not parked and has no waker registered.
    fn new() -> Self {
        SenderTask { is_parked: false, task: None }
    }

    // Marks the sender as no longer parked and wakes the registered waker, if
    // any. The waker is consumed, so a second call without a new registration
    // wakes nothing.
    fn notify(&mut self) {
        self.is_parked = false;

        let registered = self.task.take();
        if let Some(waker) = registered {
            waker.wake();
        }
    }
}
|  |  | 
|  | /// Creates a bounded mpsc channel for communicating between asynchronous tasks. | 
|  | /// | 
|  | /// Being bounded, this channel provides backpressure to ensure that the sender | 
|  | /// outpaces the receiver by only a limited amount. The channel's capacity is | 
|  | /// equal to `buffer + num-senders`. In other words, each sender gets a | 
|  | /// guaranteed slot in the channel capacity, and on top of that there are | 
|  | /// `buffer` "first come, first serve" slots available to all senders. | 
|  | /// | 
|  | /// The [`Receiver`] returned implements the [`Stream`] trait, while [`Sender`] | 
|  | /// implements `Sink`. | 
|  | pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) { | 
|  | // Check that the requested buffer size does not exceed the maximum buffer | 
|  | // size permitted by the system. | 
|  | assert!(buffer < MAX_BUFFER, "requested buffer size too large"); | 
|  |  | 
|  | let inner = Arc::new(BoundedInner { | 
|  | buffer, | 
|  | state: AtomicUsize::new(INIT_STATE), | 
|  | message_queue: Queue::new(), | 
|  | parked_queue: Queue::new(), | 
|  | num_senders: AtomicUsize::new(1), | 
|  | recv_task: AtomicWaker::new(), | 
|  | }); | 
|  |  | 
|  | let tx = BoundedSenderInner { | 
|  | inner: inner.clone(), | 
|  | sender_task: Arc::new(Mutex::new(SenderTask::new())), | 
|  | maybe_parked: false, | 
|  | }; | 
|  |  | 
|  | let rx = Receiver { inner: Some(inner) }; | 
|  |  | 
|  | (Sender(Some(tx)), rx) | 
|  | } | 
|  |  | 
|  | /// Creates an unbounded mpsc channel for communicating between asynchronous | 
|  | /// tasks. | 
|  | /// | 
|  | /// A `send` on this channel will always succeed as long as the receive half has | 
|  | /// not been closed. If the receiver falls behind, messages will be arbitrarily | 
|  | /// buffered. | 
|  | /// | 
|  | /// **Note** that the amount of available system memory is an implicit bound to | 
|  | /// the channel. Using an `unbounded` channel has the ability of causing the | 
|  | /// process to run out of memory. In this case, the process will be aborted. | 
|  | pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) { | 
|  | let inner = Arc::new(UnboundedInner { | 
|  | state: AtomicUsize::new(INIT_STATE), | 
|  | message_queue: Queue::new(), | 
|  | num_senders: AtomicUsize::new(1), | 
|  | recv_task: AtomicWaker::new(), | 
|  | }); | 
|  |  | 
|  | let tx = UnboundedSenderInner { inner: inner.clone() }; | 
|  |  | 
|  | let rx = UnboundedReceiver { inner: Some(inner) }; | 
|  |  | 
|  | (UnboundedSender(Some(tx)), rx) | 
|  | } | 
|  |  | 
|  | /* | 
|  | * | 
|  | * ===== impl Sender ===== | 
|  | * | 
|  | */ | 
|  |  | 
|  | impl<T> UnboundedSenderInner<T> { | 
|  | fn poll_ready_nb(&self) -> Poll<Result<(), SendError>> { | 
|  | let state = decode_state(self.inner.state.load(SeqCst)); | 
|  | if state.is_open { | 
|  | Poll::Ready(Ok(())) | 
|  | } else { | 
|  | Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected })) | 
|  | } | 
|  | } | 
|  |  | 
|  | // Push message to the queue and signal to the receiver | 
|  | fn queue_push_and_signal(&self, msg: T) { | 
|  | // Push the message onto the message queue | 
|  | self.inner.message_queue.push(msg); | 
|  |  | 
|  | // Signal to the receiver that a message has been enqueued. If the | 
|  | // receiver is parked, this will unpark the task. | 
|  | self.inner.recv_task.wake(); | 
|  | } | 
|  |  | 
|  | // Increment the number of queued messages. Returns the resulting number. | 
|  | fn inc_num_messages(&self) -> Option<usize> { | 
|  | let mut curr = self.inner.state.load(SeqCst); | 
|  |  | 
|  | loop { | 
|  | let mut state = decode_state(curr); | 
|  |  | 
|  | // The receiver end closed the channel. | 
|  | if !state.is_open { | 
|  | return None; | 
|  | } | 
|  |  | 
|  | // This probably is never hit? Odds are the process will run out of | 
|  | // memory first. It may be worth to return something else in this | 
|  | // case? | 
|  | assert!( | 
|  | state.num_messages < MAX_CAPACITY, | 
|  | "buffer space \ | 
|  | exhausted; sending this messages would overflow the state" | 
|  | ); | 
|  |  | 
|  | state.num_messages += 1; | 
|  |  | 
|  | let next = encode_state(&state); | 
|  | match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) { | 
|  | Ok(_) => return Some(state.num_messages), | 
|  | Err(actual) => curr = actual, | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | /// Returns whether the senders send to the same receiver. | 
|  | fn same_receiver(&self, other: &Self) -> bool { | 
|  | Arc::ptr_eq(&self.inner, &other.inner) | 
|  | } | 
|  |  | 
|  | /// Returns whether the sender send to this receiver. | 
|  | fn is_connected_to(&self, inner: &Arc<UnboundedInner<T>>) -> bool { | 
|  | Arc::ptr_eq(&self.inner, inner) | 
|  | } | 
|  |  | 
|  | /// Returns pointer to the Arc containing sender | 
|  | /// | 
|  | /// The returned pointer is not referenced and should be only used for hashing! | 
|  | fn ptr(&self) -> *const UnboundedInner<T> { | 
|  | &*self.inner | 
|  | } | 
|  |  | 
|  | /// Returns whether this channel is closed without needing a context. | 
|  | fn is_closed(&self) -> bool { | 
|  | !decode_state(self.inner.state.load(SeqCst)).is_open | 
|  | } | 
|  |  | 
|  | /// Closes this channel from the sender side, preventing any new messages. | 
|  | fn close_channel(&self) { | 
|  | // There's no need to park this sender, its dropping, | 
|  | // and we don't want to check for capacity, so skip | 
|  | // that stuff from `do_send`. | 
|  |  | 
|  | self.inner.set_closed(); | 
|  | self.inner.recv_task.wake(); | 
|  | } | 
|  | } | 
|  |  | 
|  | impl<T> BoundedSenderInner<T> { | 
|  | /// Attempts to send a message on this `Sender`, returning the message | 
|  | /// if there was an error. | 
|  | fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> { | 
|  | // If the sender is currently blocked, reject the message | 
|  | if !self.poll_unparked(None).is_ready() { | 
|  | return Err(TrySendError { err: SendError { kind: SendErrorKind::Full }, val: msg }); | 
|  | } | 
|  |  | 
|  | // The channel has capacity to accept the message, so send it | 
|  | self.do_send_b(msg) | 
|  | } | 
|  |  | 
|  | // Do the send without failing. | 
|  | // Can be called only by bounded sender. | 
|  | fn do_send_b(&mut self, msg: T) -> Result<(), TrySendError<T>> { | 
|  | // Anyone calling do_send *should* make sure there is room first, | 
|  | // but assert here for tests as a sanity check. | 
|  | debug_assert!(self.poll_unparked(None).is_ready()); | 
|  |  | 
|  | // First, increment the number of messages contained by the channel. | 
|  | // This operation will also atomically determine if the sender task | 
|  | // should be parked. | 
|  | // | 
|  | // `None` is returned in the case that the channel has been closed by the | 
|  | // receiver. This happens when `Receiver::close` is called or the | 
|  | // receiver is dropped. | 
|  | let park_self = match self.inc_num_messages() { | 
|  | Some(num_messages) => { | 
|  | // Block if the current number of pending messages has exceeded | 
|  | // the configured buffer size | 
|  | num_messages > self.inner.buffer | 
|  | } | 
|  | None => { | 
|  | return Err(TrySendError { | 
|  | err: SendError { kind: SendErrorKind::Disconnected }, | 
|  | val: msg, | 
|  | }) | 
|  | } | 
|  | }; | 
|  |  | 
|  | // If the channel has reached capacity, then the sender task needs to | 
|  | // be parked. This will send the task handle on the parked task queue. | 
|  | // | 
|  | // However, when `do_send` is called while dropping the `Sender`, | 
|  | // `task::current()` can't be called safely. In this case, in order to | 
|  | // maintain internal consistency, a blank message is pushed onto the | 
|  | // parked task queue. | 
|  | if park_self { | 
|  | self.park(); | 
|  | } | 
|  |  | 
|  | self.queue_push_and_signal(msg); | 
|  |  | 
|  | Ok(()) | 
|  | } | 
|  |  | 
|  | // Push message to the queue and signal to the receiver | 
|  | fn queue_push_and_signal(&self, msg: T) { | 
|  | // Push the message onto the message queue | 
|  | self.inner.message_queue.push(msg); | 
|  |  | 
|  | // Signal to the receiver that a message has been enqueued. If the | 
|  | // receiver is parked, this will unpark the task. | 
|  | self.inner.recv_task.wake(); | 
|  | } | 
|  |  | 
|  | // Increment the number of queued messages. Returns the resulting number. | 
|  | fn inc_num_messages(&self) -> Option<usize> { | 
|  | let mut curr = self.inner.state.load(SeqCst); | 
|  |  | 
|  | loop { | 
|  | let mut state = decode_state(curr); | 
|  |  | 
|  | // The receiver end closed the channel. | 
|  | if !state.is_open { | 
|  | return None; | 
|  | } | 
|  |  | 
|  | // This probably is never hit? Odds are the process will run out of | 
|  | // memory first. It may be worth to return something else in this | 
|  | // case? | 
|  | assert!( | 
|  | state.num_messages < MAX_CAPACITY, | 
|  | "buffer space \ | 
|  | exhausted; sending this messages would overflow the state" | 
|  | ); | 
|  |  | 
|  | state.num_messages += 1; | 
|  |  | 
|  | let next = encode_state(&state); | 
|  | match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) { | 
|  | Ok(_) => return Some(state.num_messages), | 
|  | Err(actual) => curr = actual, | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | fn park(&mut self) { | 
|  | { | 
|  | let mut sender = self.sender_task.lock().unwrap(); | 
|  | sender.task = None; | 
|  | sender.is_parked = true; | 
|  | } | 
|  |  | 
|  | // Send handle over queue | 
|  | let t = self.sender_task.clone(); | 
|  | self.inner.parked_queue.push(t); | 
|  |  | 
|  | // Check to make sure we weren't closed after we sent our task on the | 
|  | // queue | 
|  | let state = decode_state(self.inner.state.load(SeqCst)); | 
|  | self.maybe_parked = state.is_open; | 
|  | } | 
|  |  | 
|  | /// Polls the channel to determine if there is guaranteed capacity to send | 
|  | /// at least one item without waiting. | 
|  | /// | 
|  | /// # Return value | 
|  | /// | 
|  | /// This method returns: | 
|  | /// | 
|  | /// - `Poll::Ready(Ok(_))` if there is sufficient capacity; | 
|  | /// - `Poll::Pending` if the channel may not have | 
|  | ///   capacity, in which case the current task is queued to be notified once | 
|  | ///   capacity is available; | 
|  | /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped. | 
|  | fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> { | 
|  | let state = decode_state(self.inner.state.load(SeqCst)); | 
|  | if !state.is_open { | 
|  | return Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected })); | 
|  | } | 
|  |  | 
|  | self.poll_unparked(Some(cx)).map(Ok) | 
|  | } | 
|  |  | 
|  | /// Returns whether the senders send to the same receiver. | 
|  | fn same_receiver(&self, other: &Self) -> bool { | 
|  | Arc::ptr_eq(&self.inner, &other.inner) | 
|  | } | 
|  |  | 
|  | /// Returns whether the sender send to this receiver. | 
|  | fn is_connected_to(&self, receiver: &Arc<BoundedInner<T>>) -> bool { | 
|  | Arc::ptr_eq(&self.inner, receiver) | 
|  | } | 
|  |  | 
|  | /// Returns pointer to the Arc containing sender | 
|  | /// | 
|  | /// The returned pointer is not referenced and should be only used for hashing! | 
|  | fn ptr(&self) -> *const BoundedInner<T> { | 
|  | &*self.inner | 
|  | } | 
|  |  | 
|  | /// Returns whether this channel is closed without needing a context. | 
|  | fn is_closed(&self) -> bool { | 
|  | !decode_state(self.inner.state.load(SeqCst)).is_open | 
|  | } | 
|  |  | 
|  | /// Closes this channel from the sender side, preventing any new messages. | 
|  | fn close_channel(&self) { | 
|  | // There's no need to park this sender, its dropping, | 
|  | // and we don't want to check for capacity, so skip | 
|  | // that stuff from `do_send`. | 
|  |  | 
|  | self.inner.set_closed(); | 
|  | self.inner.recv_task.wake(); | 
|  | } | 
|  |  | 
|  | fn poll_unparked(&mut self, cx: Option<&mut Context<'_>>) -> Poll<()> { | 
|  | // First check the `maybe_parked` variable. This avoids acquiring the | 
|  | // lock in most cases | 
|  | if self.maybe_parked { | 
|  | // Get a lock on the task handle | 
|  | let mut task = self.sender_task.lock().unwrap(); | 
|  |  | 
|  | if !task.is_parked { | 
|  | self.maybe_parked = false; | 
|  | return Poll::Ready(()); | 
|  | } | 
|  |  | 
|  | // At this point, an unpark request is pending, so there will be an | 
|  | // unpark sometime in the future. We just need to make sure that | 
|  | // the correct task will be notified. | 
|  | // | 
|  | // Update the task in case the `Sender` has been moved to another | 
|  | // task | 
|  | task.task = cx.map(|cx| cx.waker().clone()); | 
|  |  | 
|  | Poll::Pending | 
|  | } else { | 
|  | Poll::Ready(()) | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | impl<T> Sender<T> { | 
|  | /// Attempts to send a message on this `Sender`, returning the message | 
|  | /// if there was an error. | 
|  | pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> { | 
|  | if let Some(inner) = &mut self.0 { | 
|  | inner.try_send(msg) | 
|  | } else { | 
|  | Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg }) | 
|  | } | 
|  | } | 
|  |  | 
|  | /// Send a message on the channel. | 
|  | /// | 
|  | /// This function should only be called after | 
|  | /// [`poll_ready`](Sender::poll_ready) has reported that the channel is | 
|  | /// ready to receive a message. | 
|  | pub fn start_send(&mut self, msg: T) -> Result<(), SendError> { | 
|  | self.try_send(msg).map_err(|e| e.err) | 
|  | } | 
|  |  | 
|  | /// Polls the channel to determine if there is guaranteed capacity to send | 
|  | /// at least one item without waiting. | 
|  | /// | 
|  | /// # Return value | 
|  | /// | 
|  | /// This method returns: | 
|  | /// | 
|  | /// - `Poll::Ready(Ok(_))` if there is sufficient capacity; | 
|  | /// - `Poll::Pending` if the channel may not have | 
|  | ///   capacity, in which case the current task is queued to be notified once | 
|  | ///   capacity is available; | 
|  | /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped. | 
|  | pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> { | 
|  | let inner = self.0.as_mut().ok_or(SendError { kind: SendErrorKind::Disconnected })?; | 
|  | inner.poll_ready(cx) | 
|  | } | 
|  |  | 
|  | /// Returns whether this channel is closed without needing a context. | 
|  | pub fn is_closed(&self) -> bool { | 
|  | self.0.as_ref().map(BoundedSenderInner::is_closed).unwrap_or(true) | 
|  | } | 
|  |  | 
|  | /// Closes this channel from the sender side, preventing any new messages. | 
|  | pub fn close_channel(&mut self) { | 
|  | if let Some(inner) = &mut self.0 { | 
|  | inner.close_channel(); | 
|  | } | 
|  | } | 
|  |  | 
|  | /// Disconnects this sender from the channel, closing it if there are no more senders left. | 
|  | pub fn disconnect(&mut self) { | 
|  | self.0 = None; | 
|  | } | 
|  |  | 
|  | /// Returns whether the senders send to the same receiver. | 
|  | pub fn same_receiver(&self, other: &Self) -> bool { | 
|  | match (&self.0, &other.0) { | 
|  | (Some(inner), Some(other)) => inner.same_receiver(other), | 
|  | _ => false, | 
|  | } | 
|  | } | 
|  |  | 
|  | /// Returns whether the sender send to this receiver. | 
|  | pub fn is_connected_to(&self, receiver: &Receiver<T>) -> bool { | 
|  | match (&self.0, &receiver.inner) { | 
|  | (Some(inner), Some(receiver)) => inner.is_connected_to(receiver), | 
|  | _ => false, | 
|  | } | 
|  | } | 
|  |  | 
|  | /// Hashes the receiver into the provided hasher | 
|  | pub fn hash_receiver<H>(&self, hasher: &mut H) | 
|  | where | 
|  | H: std::hash::Hasher, | 
|  | { | 
|  | use std::hash::Hash; | 
|  |  | 
|  | let ptr = self.0.as_ref().map(|inner| inner.ptr()); | 
|  | ptr.hash(hasher); | 
|  | } | 
|  | } | 
|  |  | 
|  | impl<T> UnboundedSender<T> { | 
|  | /// Check if the channel is ready to receive a message. | 
|  | pub fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), SendError>> { | 
|  | let inner = self.0.as_ref().ok_or(SendError { kind: SendErrorKind::Disconnected })?; | 
|  | inner.poll_ready_nb() | 
|  | } | 
|  |  | 
|  | /// Returns whether this channel is closed without needing a context. | 
|  | pub fn is_closed(&self) -> bool { | 
|  | self.0.as_ref().map(UnboundedSenderInner::is_closed).unwrap_or(true) | 
|  | } | 
|  |  | 
|  | /// Closes this channel from the sender side, preventing any new messages. | 
|  | pub fn close_channel(&self) { | 
|  | if let Some(inner) = &self.0 { | 
|  | inner.close_channel(); | 
|  | } | 
|  | } | 
|  |  | 
|  | /// Disconnects this sender from the channel, closing it if there are no more senders left. | 
|  | pub fn disconnect(&mut self) { | 
|  | self.0 = None; | 
|  | } | 
|  |  | 
|  | // Do the send without parking current task. | 
|  | fn do_send_nb(&self, msg: T) -> Result<(), TrySendError<T>> { | 
|  | if let Some(inner) = &self.0 { | 
|  | if inner.inc_num_messages().is_some() { | 
|  | inner.queue_push_and_signal(msg); | 
|  | return Ok(()); | 
|  | } | 
|  | } | 
|  |  | 
|  | Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg }) | 
|  | } | 
|  |  | 
|  | /// Send a message on the channel. | 
|  | /// | 
|  | /// This method should only be called after `poll_ready` has been used to | 
|  | /// verify that the channel is ready to receive a message. | 
|  | pub fn start_send(&mut self, msg: T) -> Result<(), SendError> { | 
|  | self.do_send_nb(msg).map_err(|e| e.err) | 
|  | } | 
|  |  | 
|  | /// Sends a message along this channel. | 
|  | /// | 
|  | /// This is an unbounded sender, so this function differs from `Sink::send` | 
|  | /// by ensuring the return type reflects that the channel is always ready to | 
|  | /// receive messages. | 
|  | pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>> { | 
|  | self.do_send_nb(msg) | 
|  | } | 
|  |  | 
|  | /// Returns whether the senders send to the same receiver. | 
|  | pub fn same_receiver(&self, other: &Self) -> bool { | 
|  | match (&self.0, &other.0) { | 
|  | (Some(inner), Some(other)) => inner.same_receiver(other), | 
|  | _ => false, | 
|  | } | 
|  | } | 
|  |  | 
|  | /// Returns whether the sender send to this receiver. | 
|  | pub fn is_connected_to(&self, receiver: &UnboundedReceiver<T>) -> bool { | 
|  | match (&self.0, &receiver.inner) { | 
|  | (Some(inner), Some(receiver)) => inner.is_connected_to(receiver), | 
|  | _ => false, | 
|  | } | 
|  | } | 
|  |  | 
|  | /// Hashes the receiver into the provided hasher | 
|  | pub fn hash_receiver<H>(&self, hasher: &mut H) | 
|  | where | 
|  | H: std::hash::Hasher, | 
|  | { | 
|  | use std::hash::Hash; | 
|  |  | 
|  | let ptr = self.0.as_ref().map(|inner| inner.ptr()); | 
|  | ptr.hash(hasher); | 
|  | } | 
|  |  | 
|  | /// Return the number of messages in the queue or 0 if channel is disconnected. | 
|  | pub fn len(&self) -> usize { | 
|  | if let Some(sender) = &self.0 { | 
|  | decode_state(sender.inner.state.load(SeqCst)).num_messages | 
|  | } else { | 
|  | 0 | 
|  | } | 
|  | } | 
|  |  | 
|  | /// Return false is channel has no queued messages, true otherwise. | 
|  | pub fn is_empty(&self) -> bool { | 
|  | self.len() == 0 | 
|  | } | 
|  | } | 
|  |  | 
|  | impl<T> Clone for Sender<T> { | 
|  | fn clone(&self) -> Self { | 
|  | Self(self.0.clone()) | 
|  | } | 
|  | } | 
|  |  | 
|  | impl<T> Clone for UnboundedSender<T> { | 
|  | fn clone(&self) -> Self { | 
|  | Self(self.0.clone()) | 
|  | } | 
|  | } | 
|  |  | 
|  | impl<T> Clone for UnboundedSenderInner<T> { | 
|  | fn clone(&self) -> Self { | 
|  | // Since this atomic op isn't actually guarding any memory and we don't | 
|  | // care about any orderings besides the ordering on the single atomic | 
|  | // variable, a relaxed ordering is acceptable. | 
|  | let mut curr = self.inner.num_senders.load(SeqCst); | 
|  |  | 
|  | loop { | 
|  | // If the maximum number of senders has been reached, then fail | 
|  | if curr == MAX_BUFFER { | 
|  | panic!("cannot clone `Sender` -- too many outstanding senders"); | 
|  | } | 
|  |  | 
|  | debug_assert!(curr < MAX_BUFFER); | 
|  |  | 
|  | let next = curr + 1; | 
|  | match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) { | 
|  | Ok(_) => { | 
|  | // The ABA problem doesn't matter here. We only care that the | 
|  | // number of senders never exceeds the maximum. | 
|  | return Self { inner: self.inner.clone() }; | 
|  | } | 
|  | Err(actual) => curr = actual, | 
|  | } | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | impl<T> Clone for BoundedSenderInner<T> { | 
|  | fn clone(&self) -> Self { | 
|  | // Since this atomic op isn't actually guarding any memory and we don't | 
|  | // care about any orderings besides the ordering on the single atomic | 
|  | // variable, a relaxed ordering is acceptable. | 
|  | let mut curr = self.inner.num_senders.load(SeqCst); | 
|  |  | 
|  | loop { | 
|  | // If the maximum number of senders has been reached, then fail | 
|  | if curr == self.inner.max_senders() { | 
|  | panic!("cannot clone `Sender` -- too many outstanding senders"); | 
|  | } | 
|  |  | 
|  | debug_assert!(curr < self.inner.max_senders()); | 
|  |  | 
|  | let next = curr + 1; | 
|  | match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) { | 
|  | Ok(_) => { | 
|  | // The ABA problem doesn't matter here. We only care that the | 
|  | // number of senders never exceeds the maximum. | 
|  | return Self { | 
|  | inner: self.inner.clone(), | 
|  | sender_task: Arc::new(Mutex::new(SenderTask::new())), | 
|  | maybe_parked: false, | 
|  | }; | 
|  | } | 
|  | Err(actual) => curr = actual, | 
|  | } | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | impl<T> Drop for UnboundedSenderInner<T> { | 
|  | fn drop(&mut self) { | 
|  | // Ordering between variables don't matter here | 
|  | let prev = self.inner.num_senders.fetch_sub(1, SeqCst); | 
|  |  | 
|  | if prev == 1 { | 
|  | self.close_channel(); | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | impl<T> Drop for BoundedSenderInner<T> { | 
|  | fn drop(&mut self) { | 
|  | // Ordering between variables don't matter here | 
|  | let prev = self.inner.num_senders.fetch_sub(1, SeqCst); | 
|  |  | 
|  | if prev == 1 { | 
|  | self.close_channel(); | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | impl<T> fmt::Debug for Sender<T> { | 
|  | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | 
|  | f.debug_struct("Sender").field("closed", &self.is_closed()).finish() | 
|  | } | 
|  | } | 
|  |  | 
|  | impl<T> fmt::Debug for UnboundedSender<T> { | 
|  | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | 
|  | f.debug_struct("UnboundedSender").field("closed", &self.is_closed()).finish() | 
|  | } | 
|  | } | 
|  |  | 
|  | /* | 
|  | * | 
|  | * ===== impl Receiver ===== | 
|  | * | 
|  | */ | 
|  |  | 
|  | impl<T> Receiver<T> { | 
|  | /// Closes the receiving half of a channel, without dropping it. | 
|  | /// | 
|  | /// This prevents any further messages from being sent on the channel while | 
|  | /// still enabling the receiver to drain messages that are buffered. | 
|  | pub fn close(&mut self) { | 
|  | if let Some(inner) = &mut self.inner { | 
|  | inner.set_closed(); | 
|  |  | 
|  | // Wake up any threads waiting as they'll see that we've closed the | 
|  | // channel and will continue on their merry way. | 
|  | while let Some(task) = unsafe { inner.parked_queue.pop_spin() } { | 
|  | task.lock().unwrap().notify(); | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | /// Tries to receive the next message without notifying a context if empty. | 
|  | /// | 
|  | /// It is not recommended to call this function from inside of a future, | 
|  | /// only when you've otherwise arranged to be notified when the channel is | 
|  | /// no longer empty. | 
|  | /// | 
|  | /// This function returns: | 
|  | /// * `Ok(Some(t))` when message is fetched | 
|  | /// * `Ok(None)` when channel is closed and no messages left in the queue | 
|  | /// * `Err(e)` when there are no messages available, but channel is not yet closed | 
|  | pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> { | 
|  | match self.next_message() { | 
|  | Poll::Ready(msg) => Ok(msg), | 
|  | Poll::Pending => Err(TryRecvError { _priv: () }), | 
|  | } | 
|  | } | 
|  |  | 
|  | fn next_message(&mut self) -> Poll<Option<T>> { | 
|  | let inner = match self.inner.as_mut() { | 
|  | None => return Poll::Ready(None), | 
|  | Some(inner) => inner, | 
|  | }; | 
|  | // Pop off a message | 
|  | match unsafe { inner.message_queue.pop_spin() } { | 
|  | Some(msg) => { | 
|  | // If there are any parked task handles in the parked queue, | 
|  | // pop one and unpark it. | 
|  | self.unpark_one(); | 
|  |  | 
|  | // Decrement number of messages | 
|  | self.dec_num_messages(); | 
|  |  | 
|  | Poll::Ready(Some(msg)) | 
|  | } | 
|  | None => { | 
|  | let state = decode_state(inner.state.load(SeqCst)); | 
|  | if state.is_closed() { | 
|  | // If closed flag is set AND there are no pending messages | 
|  | // it means end of stream | 
|  | self.inner = None; | 
|  | Poll::Ready(None) | 
|  | } else { | 
|  | // If queue is open, we need to return Pending | 
|  | // to be woken up when new messages arrive. | 
|  | // If queue is closed but num_messages is non-zero, | 
|  | // it means that senders updated the state, | 
|  | // but didn't put message to queue yet, | 
|  | // so we need to park until sender unparks the task | 
|  | // after queueing the message. | 
|  | Poll::Pending | 
|  | } | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | // Unpark a single task handle if there is one pending in the parked queue | 
|  | fn unpark_one(&mut self) { | 
|  | if let Some(inner) = &mut self.inner { | 
|  | if let Some(task) = unsafe { inner.parked_queue.pop_spin() } { | 
|  | task.lock().unwrap().notify(); | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | fn dec_num_messages(&self) { | 
|  | if let Some(inner) = &self.inner { | 
|  | // OPEN_MASK is highest bit, so it's unaffected by subtraction | 
|  | // unless there's underflow, and we know there's no underflow | 
|  | // because number of messages at this point is always > 0. | 
|  | inner.state.fetch_sub(1, SeqCst); | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | // `Receiver` never takes a `Pin` to the inner `T`, so it is declared `Unpin` for every `T`. | 
|  | impl<T> Unpin for Receiver<T> {} | 
|  |  | 
|  | impl<T> FusedStream for Receiver<T> { | 
|  | fn is_terminated(&self) -> bool { | 
|  | self.inner.is_none() | 
|  | } | 
|  | } | 
|  |  | 
|  | impl<T> Stream for Receiver<T> { | 
|  | type Item = T; | 
|  |  | 
|  | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { | 
|  | // Try to read a message off of the message queue. | 
|  | match self.next_message() { | 
|  | Poll::Ready(msg) => { | 
|  | if msg.is_none() { | 
|  | self.inner = None; | 
|  | } | 
|  | Poll::Ready(msg) | 
|  | } | 
|  | Poll::Pending => { | 
|  | // There are no messages to read, in this case, park. | 
|  | self.inner.as_ref().unwrap().recv_task.register(cx.waker()); | 
|  | // Check queue again after parking to prevent race condition: | 
|  | // a message could be added to the queue after previous `next_message` | 
|  | // before `register` call. | 
|  | self.next_message() | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | fn size_hint(&self) -> (usize, Option<usize>) { | 
|  | if let Some(inner) = &self.inner { | 
|  | decode_state(inner.state.load(SeqCst)).size_hint() | 
|  | } else { | 
|  | (0, Some(0)) | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | impl<T> Drop for Receiver<T> { | 
|  | fn drop(&mut self) { | 
|  | // Drain the channel of all pending messages | 
|  | self.close(); | 
|  | if self.inner.is_some() { | 
|  | loop { | 
|  | match self.next_message() { | 
|  | Poll::Ready(Some(_)) => {} | 
|  | Poll::Ready(None) => break, | 
|  | Poll::Pending => { | 
|  | let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst)); | 
|  |  | 
|  | // If the channel is closed, then there is no need to park. | 
|  | if state.is_closed() { | 
|  | break; | 
|  | } | 
|  |  | 
|  | // TODO: Spinning isn't ideal, it might be worth | 
|  | // investigating using a condvar or some other strategy | 
|  | // here. That said, if this case is hit, then another thread | 
|  | // is about to push the value into the queue and this isn't | 
|  | // the only spinlock in the impl right now. | 
|  | thread::yield_now(); | 
|  | } | 
|  | } | 
|  | } | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | impl<T> fmt::Debug for Receiver<T> { | 
|  | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | 
|  | let closed = if let Some(ref inner) = self.inner { | 
|  | decode_state(inner.state.load(SeqCst)).is_closed() | 
|  | } else { | 
|  | false | 
|  | }; | 
|  |  | 
|  | f.debug_struct("Receiver").field("closed", &closed).finish() | 
|  | } | 
|  | } | 
|  |  | 
|  | impl<T> UnboundedReceiver<T> { | 
|  | /// Closes the receiving half of a channel, without dropping it. | 
|  | /// | 
|  | /// This prevents any further messages from being sent on the channel while | 
|  | /// still enabling the receiver to drain messages that are buffered. | 
|  | pub fn close(&mut self) { | 
|  | if let Some(inner) = &mut self.inner { | 
|  | inner.set_closed(); | 
|  | } | 
|  | } | 
|  |  | 
|  | /// Tries to receive the next message without notifying a context if empty. | 
|  | /// | 
|  | /// It is not recommended to call this function from inside of a future, | 
|  | /// only when you've otherwise arranged to be notified when the channel is | 
|  | /// no longer empty. | 
|  | /// | 
|  | /// This function returns: | 
|  | /// * `Ok(Some(t))` when message is fetched | 
|  | /// * `Ok(None)` when channel is closed and no messages left in the queue | 
|  | /// * `Err(e)` when there are no messages available, but channel is not yet closed | 
|  | pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> { | 
|  | match self.next_message() { | 
|  | Poll::Ready(msg) => Ok(msg), | 
|  | Poll::Pending => Err(TryRecvError { _priv: () }), | 
|  | } | 
|  | } | 
|  |  | 
|  | fn next_message(&mut self) -> Poll<Option<T>> { | 
|  | let inner = match self.inner.as_mut() { | 
|  | None => return Poll::Ready(None), | 
|  | Some(inner) => inner, | 
|  | }; | 
|  | // Pop off a message | 
|  | match unsafe { inner.message_queue.pop_spin() } { | 
|  | Some(msg) => { | 
|  | // Decrement number of messages | 
|  | self.dec_num_messages(); | 
|  |  | 
|  | Poll::Ready(Some(msg)) | 
|  | } | 
|  | None => { | 
|  | let state = decode_state(inner.state.load(SeqCst)); | 
|  | if state.is_closed() { | 
|  | // If closed flag is set AND there are no pending messages | 
|  | // it means end of stream | 
|  | self.inner = None; | 
|  | Poll::Ready(None) | 
|  | } else { | 
|  | // If queue is open, we need to return Pending | 
|  | // to be woken up when new messages arrive. | 
|  | // If queue is closed but num_messages is non-zero, | 
|  | // it means that senders updated the state, | 
|  | // but didn't put message to queue yet, | 
|  | // so we need to park until sender unparks the task | 
|  | // after queueing the message. | 
|  | Poll::Pending | 
|  | } | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | fn dec_num_messages(&self) { | 
|  | if let Some(inner) = &self.inner { | 
|  | // OPEN_MASK is highest bit, so it's unaffected by subtraction | 
|  | // unless there's underflow, and we know there's no underflow | 
|  | // because number of messages at this point is always > 0. | 
|  | inner.state.fetch_sub(1, SeqCst); | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | impl<T> FusedStream for UnboundedReceiver<T> { | 
|  | fn is_terminated(&self) -> bool { | 
|  | self.inner.is_none() | 
|  | } | 
|  | } | 
|  |  | 
|  | impl<T> Stream for UnboundedReceiver<T> { | 
|  | type Item = T; | 
|  |  | 
|  | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { | 
|  | // Try to read a message off of the message queue. | 
|  | match self.next_message() { | 
|  | Poll::Ready(msg) => { | 
|  | if msg.is_none() { | 
|  | self.inner = None; | 
|  | } | 
|  | Poll::Ready(msg) | 
|  | } | 
|  | Poll::Pending => { | 
|  | // There are no messages to read, in this case, park. | 
|  | self.inner.as_ref().unwrap().recv_task.register(cx.waker()); | 
|  | // Check queue again after parking to prevent race condition: | 
|  | // a message could be added to the queue after previous `next_message` | 
|  | // before `register` call. | 
|  | self.next_message() | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | fn size_hint(&self) -> (usize, Option<usize>) { | 
|  | if let Some(inner) = &self.inner { | 
|  | decode_state(inner.state.load(SeqCst)).size_hint() | 
|  | } else { | 
|  | (0, Some(0)) | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | impl<T> Drop for UnboundedReceiver<T> { | 
|  | fn drop(&mut self) { | 
|  | // Drain the channel of all pending messages | 
|  | self.close(); | 
|  | if self.inner.is_some() { | 
|  | loop { | 
|  | match self.next_message() { | 
|  | Poll::Ready(Some(_)) => {} | 
|  | Poll::Ready(None) => break, | 
|  | Poll::Pending => { | 
|  | let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst)); | 
|  |  | 
|  | // If the channel is closed, then there is no need to park. | 
|  | if state.is_closed() { | 
|  | break; | 
|  | } | 
|  |  | 
|  | // TODO: Spinning isn't ideal, it might be worth | 
|  | // investigating using a condvar or some other strategy | 
|  | // here. That said, if this case is hit, then another thread | 
|  | // is about to push the value into the queue and this isn't | 
|  | // the only spinlock in the impl right now. | 
|  | thread::yield_now(); | 
|  | } | 
|  | } | 
|  | } | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | impl<T> fmt::Debug for UnboundedReceiver<T> { | 
|  | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | 
|  | let closed = if let Some(ref inner) = self.inner { | 
|  | decode_state(inner.state.load(SeqCst)).is_closed() | 
|  | } else { | 
|  | false | 
|  | }; | 
|  |  | 
|  | f.debug_struct("Receiver").field("closed", &closed).finish() | 
|  | } | 
|  | } | 
|  |  | 
|  | /* | 
|  | * | 
|  | * ===== impl Inner ===== | 
|  | * | 
|  | */ | 
|  |  | 
|  | impl<T> UnboundedInner<T> { | 
|  | // Clear `open` flag in the state, keep `num_messages` intact. | 
|  | fn set_closed(&self) { | 
|  | let curr = self.state.load(SeqCst); | 
|  | if !decode_state(curr).is_open { | 
|  | return; | 
|  | } | 
|  |  | 
|  | self.state.fetch_and(!OPEN_MASK, SeqCst); | 
|  | } | 
|  | } | 
|  |  | 
|  | impl<T> BoundedInner<T> { | 
|  | // The return value is such that the total number of messages that can be | 
|  | // enqueued into the channel will never exceed MAX_CAPACITY | 
|  | fn max_senders(&self) -> usize { | 
|  | MAX_CAPACITY - self.buffer | 
|  | } | 
|  |  | 
|  | // Clear `open` flag in the state, keep `num_messages` intact. | 
|  | fn set_closed(&self) { | 
|  | let curr = self.state.load(SeqCst); | 
|  | if !decode_state(curr).is_open { | 
|  | return; | 
|  | } | 
|  |  | 
|  | self.state.fetch_and(!OPEN_MASK, SeqCst); | 
|  | } | 
|  | } | 
|  |  | 
|  | unsafe impl<T: Send> Send for UnboundedInner<T> {} | 
|  | unsafe impl<T: Send> Sync for UnboundedInner<T> {} | 
|  |  | 
|  | unsafe impl<T: Send> Send for BoundedInner<T> {} | 
|  | unsafe impl<T: Send> Sync for BoundedInner<T> {} | 
|  |  | 
|  | impl State { | 
|  | fn is_closed(&self) -> bool { | 
|  | !self.is_open && self.num_messages == 0 | 
|  | } | 
|  |  | 
|  | fn size_hint(&self) -> (usize, Option<usize>) { | 
|  | if self.is_open { | 
|  | (self.num_messages, None) | 
|  | } else { | 
|  | (self.num_messages, Some(self.num_messages)) | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | /* | 
|  | * | 
|  | * ===== Helpers ===== | 
|  | * | 
|  | */ | 
|  |  | 
|  | fn decode_state(num: usize) -> State { | 
|  | State { is_open: num & OPEN_MASK == OPEN_MASK, num_messages: num & MAX_CAPACITY } | 
|  | } | 
|  |  | 
|  | fn encode_state(state: &State) -> usize { | 
|  | let mut num = state.num_messages; | 
|  |  | 
|  | if state.is_open { | 
|  | num |= OPEN_MASK; | 
|  | } | 
|  |  | 
|  | num | 
|  | } |