| use std::fmt; |
| use std::io; |
| use std::net::{SocketAddr, TcpListener as StdTcpListener}; |
| use std::time::{Duration, Instant}; |
| |
| use futures::{Async, Future, Poll, Stream}; |
| use tokio_reactor::Handle; |
| use tokio_tcp::TcpListener; |
| use tokio_timer::Delay; |
| |
| pub use self::addr_stream::AddrStream; |
| |
| /// A stream of connections from binding to an address. |
| #[must_use = "streams do nothing unless polled"] |
| pub struct AddrIncoming { |
| addr: SocketAddr, |
| listener: TcpListener, |
| sleep_on_errors: bool, |
| tcp_keepalive_timeout: Option<Duration>, |
| tcp_nodelay: bool, |
| timeout: Option<Delay>, |
| } |
| |
| impl AddrIncoming { |
| pub(super) fn new(addr: &SocketAddr, handle: Option<&Handle>) -> ::Result<Self> { |
| let std_listener = StdTcpListener::bind(addr) |
| .map_err(::Error::new_listen)?; |
| |
| if let Some(handle) = handle { |
| AddrIncoming::from_std(std_listener, handle) |
| } else { |
| let handle = Handle::default(); |
| AddrIncoming::from_std(std_listener, &handle) |
| } |
| } |
| |
| pub(super) fn from_std(std_listener: StdTcpListener, handle: &Handle) -> ::Result<Self> { |
| let listener = TcpListener::from_std(std_listener, &handle) |
| .map_err(::Error::new_listen)?; |
| let addr = listener.local_addr().map_err(::Error::new_listen)?; |
| Ok(AddrIncoming { |
| listener, |
| addr: addr, |
| sleep_on_errors: true, |
| tcp_keepalive_timeout: None, |
| tcp_nodelay: false, |
| timeout: None, |
| }) |
| } |
| |
| /// Creates a new `AddrIncoming` binding to provided socket address. |
| pub fn bind(addr: &SocketAddr) -> ::Result<Self> { |
| AddrIncoming::new(addr, None) |
| } |
| |
| /// Get the local address bound to this listener. |
| pub fn local_addr(&self) -> SocketAddr { |
| self.addr |
| } |
| |
| /// Set whether TCP keepalive messages are enabled on accepted connections. |
| /// |
| /// If `None` is specified, keepalive is disabled, otherwise the duration |
| /// specified will be the time to remain idle before sending TCP keepalive |
| /// probes. |
| pub fn set_keepalive(&mut self, keepalive: Option<Duration>) -> &mut Self { |
| self.tcp_keepalive_timeout = keepalive; |
| self |
| } |
| |
| /// Set the value of `TCP_NODELAY` option for accepted connections. |
| pub fn set_nodelay(&mut self, enabled: bool) -> &mut Self { |
| self.tcp_nodelay = enabled; |
| self |
| } |
| |
| /// Set whether to sleep on accept errors. |
| /// |
| /// A possible scenario is that the process has hit the max open files |
| /// allowed, and so trying to accept a new connection will fail with |
| /// `EMFILE`. In some cases, it's preferable to just wait for some time, if |
| /// the application will likely close some files (or connections), and try |
| /// to accept the connection again. If this option is `true`, the error |
| /// will be logged at the `error` level, since it is still a big deal, |
| /// and then the listener will sleep for 1 second. |
| /// |
| /// In other cases, hitting the max open files should be treat similarly |
| /// to being out-of-memory, and simply error (and shutdown). Setting |
| /// this option to `false` will allow that. |
| /// |
| /// Default is `true`. |
| pub fn set_sleep_on_errors(&mut self, val: bool) { |
| self.sleep_on_errors = val; |
| } |
| } |
| |
| impl Stream for AddrIncoming { |
| // currently unnameable... |
| type Item = AddrStream; |
| type Error = ::std::io::Error; |
| |
| fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { |
| // Check if a previous timeout is active that was set by IO errors. |
| if let Some(ref mut to) = self.timeout { |
| match to.poll() { |
| Ok(Async::Ready(())) => {} |
| Ok(Async::NotReady) => return Ok(Async::NotReady), |
| Err(err) => { |
| error!("sleep timer error: {}", err); |
| } |
| } |
| } |
| self.timeout = None; |
| loop { |
| match self.listener.poll_accept() { |
| Ok(Async::Ready((socket, addr))) => { |
| if let Some(dur) = self.tcp_keepalive_timeout { |
| if let Err(e) = socket.set_keepalive(Some(dur)) { |
| trace!("error trying to set TCP keepalive: {}", e); |
| } |
| } |
| if let Err(e) = socket.set_nodelay(self.tcp_nodelay) { |
| trace!("error trying to set TCP nodelay: {}", e); |
| } |
| return Ok(Async::Ready(Some(AddrStream::new(socket, addr)))); |
| }, |
| Ok(Async::NotReady) => return Ok(Async::NotReady), |
| Err(e) => { |
| // Connection errors can be ignored directly, continue by |
| // accepting the next request. |
| if is_connection_error(&e) { |
| debug!("accepted connection already errored: {}", e); |
| continue; |
| } |
| |
| if self.sleep_on_errors { |
| // Sleep 1s. |
| let delay = Instant::now() + Duration::from_secs(1); |
| let mut timeout = Delay::new(delay); |
| |
| match timeout.poll() { |
| Ok(Async::Ready(())) => { |
| // Wow, it's been a second already? Ok then... |
| error!("accept error: {}", e); |
| continue |
| }, |
| Ok(Async::NotReady) => { |
| error!("accept error: {}", e); |
| self.timeout = Some(timeout); |
| return Ok(Async::NotReady); |
| }, |
| Err(timer_err) => { |
| error!("couldn't sleep on error, timer error: {}", timer_err); |
| return Err(e); |
| } |
| } |
| } else { |
| return Err(e); |
| } |
| }, |
| } |
| } |
| } |
| } |
| |
| /// This function defines errors that are per-connection. Which basically |
| /// means that if we get this error from `accept()` system call it means |
| /// next connection might be ready to be accepted. |
| /// |
| /// All other errors will incur a timeout before next `accept()` is performed. |
| /// The timeout is useful to handle resource exhaustion errors like ENFILE |
| /// and EMFILE. Otherwise, could enter into tight loop. |
| fn is_connection_error(e: &io::Error) -> bool { |
| match e.kind() { |
| io::ErrorKind::ConnectionRefused | |
| io::ErrorKind::ConnectionAborted | |
| io::ErrorKind::ConnectionReset => true, |
| _ => false, |
| } |
| } |
| |
| impl fmt::Debug for AddrIncoming { |
| fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
| f.debug_struct("AddrIncoming") |
| .field("addr", &self.addr) |
| .field("sleep_on_errors", &self.sleep_on_errors) |
| .field("tcp_keepalive_timeout", &self.tcp_keepalive_timeout) |
| .field("tcp_nodelay", &self.tcp_nodelay) |
| .finish() |
| } |
| } |
| |
| mod addr_stream { |
| use std::io::{self, Read, Write}; |
| use std::net::SocketAddr; |
| use bytes::{Buf, BufMut}; |
| use futures::Poll; |
| use tokio_tcp::TcpStream; |
| use tokio_io::{AsyncRead, AsyncWrite}; |
| |
| |
| /// A transport returned yieled by `AddrIncoming`. |
| #[derive(Debug)] |
| pub struct AddrStream { |
| inner: TcpStream, |
| pub(super) remote_addr: SocketAddr, |
| } |
| |
| impl AddrStream { |
| pub(super) fn new(tcp: TcpStream, addr: SocketAddr) -> AddrStream { |
| AddrStream { |
| inner: tcp, |
| remote_addr: addr, |
| } |
| } |
| |
| /// Returns the remote (peer) address of this connection. |
| #[inline] |
| pub fn remote_addr(&self) -> SocketAddr { |
| self.remote_addr |
| } |
| |
| /// Consumes the AddrStream and returns the underlying IO object |
| #[inline] |
| pub fn into_inner(self) -> TcpStream { |
| self.inner |
| } |
| } |
| |
| impl Read for AddrStream { |
| #[inline] |
| fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { |
| self.inner.read(buf) |
| } |
| } |
| |
| impl Write for AddrStream { |
| #[inline] |
| fn write(&mut self, buf: &[u8]) -> io::Result<usize> { |
| self.inner.write(buf) |
| } |
| |
| #[inline] |
| fn flush(&mut self) -> io::Result<()> { |
| // TcpStream::flush is a noop, so skip calling it... |
| Ok(()) |
| } |
| } |
| |
| impl AsyncRead for AddrStream { |
| #[inline] |
| unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { |
| self.inner.prepare_uninitialized_buffer(buf) |
| } |
| |
| #[inline] |
| fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> { |
| self.inner.read_buf(buf) |
| } |
| } |
| |
| impl AsyncWrite for AddrStream { |
| #[inline] |
| fn shutdown(&mut self) -> Poll<(), io::Error> { |
| AsyncWrite::shutdown(&mut self.inner) |
| } |
| |
| #[inline] |
| fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> { |
| self.inner.write_buf(buf) |
| } |
| } |
| } |