|  | // Copyright 2019 Intel Corporation. All Rights Reserved. | 
|  | // Copyright 2019-2021 Alibaba Cloud Computing. All rights reserved. | 
|  | // | 
|  | // SPDX-License-Identifier: Apache-2.0 | 
|  |  | 
|  | //! A simple framework to run a vhost-user backend service. | 
|  |  | 
|  | #[macro_use] | 
|  | extern crate log; | 
|  |  | 
|  | use std::fmt::{Display, Formatter}; | 
|  | use std::sync::{Arc, Mutex}; | 
|  | use std::thread; | 
|  |  | 
|  | use vhost::vhost_user::{Error as VhostUserError, Listener, SlaveListener, SlaveReqHandler}; | 
|  | use vm_memory::bitmap::Bitmap; | 
|  | use vm_memory::mmap::NewBitmap; | 
|  | use vm_memory::{GuestMemoryAtomic, GuestMemoryMmap}; | 
|  |  | 
|  | use self::handler::VhostUserHandler; | 
|  |  | 
|  | mod backend; | 
|  | pub use self::backend::{VhostUserBackend, VhostUserBackendMut}; | 
|  |  | 
|  | mod event_loop; | 
|  | pub use self::event_loop::VringEpollHandler; | 
|  |  | 
|  | mod handler; | 
|  | pub use self::handler::VhostUserHandlerError; | 
|  |  | 
|  | mod vring; | 
|  | pub use self::vring::{ | 
|  | VringMutex, VringRwLock, VringState, VringStateGuard, VringStateMutGuard, VringT, | 
|  | }; | 
|  |  | 
|  | /// An alias for `GuestMemoryAtomic<GuestMemoryMmap<B>>` to simplify code. | 
|  | type GM<B> = GuestMemoryAtomic<GuestMemoryMmap<B>>; | 
|  |  | 
|  | #[derive(Debug)] | 
|  | /// Errors related to vhost-user daemon. | 
|  | pub enum Error { | 
|  | /// Failed to create a new vhost-user handler. | 
|  | NewVhostUserHandler(VhostUserHandlerError), | 
|  | /// Failed creating vhost-user slave listener. | 
|  | CreateSlaveListener(VhostUserError), | 
|  | /// Failed creating vhost-user slave handler. | 
|  | CreateSlaveReqHandler(VhostUserError), | 
|  | /// Failed starting daemon thread. | 
|  | StartDaemon(std::io::Error), | 
|  | /// Failed waiting for daemon thread. | 
|  | WaitDaemon(std::boxed::Box<dyn std::any::Any + std::marker::Send>), | 
|  | /// Failed handling a vhost-user request. | 
|  | HandleRequest(VhostUserError), | 
|  | } | 
|  |  | 
|  | impl Display for Error { | 
|  | fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { | 
|  | match self { | 
|  | Error::NewVhostUserHandler(e) => write!(f, "cannot create vhost user handler: {}", e), | 
|  | Error::CreateSlaveListener(e) => write!(f, "cannot create slave listener: {}", e), | 
|  | Error::CreateSlaveReqHandler(e) => write!(f, "cannot create slave req handler: {}", e), | 
|  | Error::StartDaemon(e) => write!(f, "failed to start daemon: {}", e), | 
|  | Error::WaitDaemon(_e) => write!(f, "failed to wait for daemon exit"), | 
|  | Error::HandleRequest(e) => write!(f, "failed to handle request: {}", e), | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | /// Result of vhost-user daemon operations. | 
|  | pub type Result<T> = std::result::Result<T, Error>; | 
|  |  | 
|  | /// Implement a simple framework to run a vhost-user service daemon. | 
|  | /// | 
|  | /// This structure is the public API the backend is allowed to interact with in order to run | 
|  | /// a fully functional vhost-user daemon. | 
|  | pub struct VhostUserDaemon<S, V, B: Bitmap + 'static = ()> { | 
|  | name: String, | 
|  | handler: Arc<Mutex<VhostUserHandler<S, V, B>>>, | 
|  | main_thread: Option<thread::JoinHandle<Result<()>>>, | 
|  | } | 
|  |  | 
|  | impl<S, V, B> VhostUserDaemon<S, V, B> | 
|  | where | 
|  | S: VhostUserBackend<V, B> + Clone + 'static, | 
|  | V: VringT<GM<B>> + Clone + Send + Sync + 'static, | 
|  | B: NewBitmap + Clone + Send + Sync, | 
|  | { | 
|  | /// Create the daemon instance, providing the backend implementation of `VhostUserBackend`. | 
|  | /// | 
|  | /// Under the hood, this will start a dedicated thread responsible for listening onto | 
|  | /// registered event. Those events can be vring events or custom events from the backend, | 
|  | /// but they get to be registered later during the sequence. | 
|  | pub fn new( | 
|  | name: String, | 
|  | backend: S, | 
|  | atomic_mem: GuestMemoryAtomic<GuestMemoryMmap<B>>, | 
|  | ) -> Result<Self> { | 
|  | let handler = Arc::new(Mutex::new( | 
|  | VhostUserHandler::new(backend, atomic_mem).map_err(Error::NewVhostUserHandler)?, | 
|  | )); | 
|  |  | 
|  | Ok(VhostUserDaemon { | 
|  | name, | 
|  | handler, | 
|  | main_thread: None, | 
|  | }) | 
|  | } | 
|  |  | 
|  | /// Run a dedicated thread handling all requests coming through the socket. | 
|  | /// This runs in an infinite loop that should be terminating once the other | 
|  | /// end of the socket (the VMM) hangs up. | 
|  | /// | 
|  | /// This function is the common code for starting a new daemon, no matter if | 
|  | /// it acts as a client or a server. | 
|  | fn start_daemon( | 
|  | &mut self, | 
|  | mut handler: SlaveReqHandler<Mutex<VhostUserHandler<S, V, B>>>, | 
|  | ) -> Result<()> { | 
|  | let handle = thread::Builder::new() | 
|  | .name(self.name.clone()) | 
|  | .spawn(move || loop { | 
|  | handler.handle_request().map_err(Error::HandleRequest)?; | 
|  | }) | 
|  | .map_err(Error::StartDaemon)?; | 
|  |  | 
|  | self.main_thread = Some(handle); | 
|  |  | 
|  | Ok(()) | 
|  | } | 
|  |  | 
|  | /// Connect to the vhost-user socket and run a dedicated thread handling | 
|  | /// all requests coming through this socket. This runs in an infinite loop | 
|  | /// that should be terminating once the other end of the socket (the VMM) | 
|  | /// hangs up. | 
|  | pub fn start_client(&mut self, socket_path: &str) -> Result<()> { | 
|  | let slave_handler = SlaveReqHandler::connect(socket_path, self.handler.clone()) | 
|  | .map_err(Error::CreateSlaveReqHandler)?; | 
|  | self.start_daemon(slave_handler) | 
|  | } | 
|  |  | 
|  | /// Listen to the vhost-user socket and run a dedicated thread handling all requests coming | 
|  | /// through this socket. | 
|  | /// | 
|  | /// This runs in an infinite loop that should be terminating once the other end of the socket | 
|  | /// (the VMM) disconnects. | 
|  | // TODO: the current implementation has limitations that only one incoming connection will be | 
|  | // handled from the listener. Should it be enhanced to support reconnection? | 
|  | pub fn start(&mut self, listener: Listener) -> Result<()> { | 
|  | let mut slave_listener = SlaveListener::new(listener, self.handler.clone()) | 
|  | .map_err(Error::CreateSlaveListener)?; | 
|  | let slave_handler = self.accept(&mut slave_listener)?; | 
|  | self.start_daemon(slave_handler) | 
|  | } | 
|  |  | 
|  | fn accept( | 
|  | &self, | 
|  | slave_listener: &mut SlaveListener<Mutex<VhostUserHandler<S, V, B>>>, | 
|  | ) -> Result<SlaveReqHandler<Mutex<VhostUserHandler<S, V, B>>>> { | 
|  | loop { | 
|  | match slave_listener.accept() { | 
|  | Err(e) => return Err(Error::CreateSlaveListener(e)), | 
|  | Ok(Some(v)) => return Ok(v), | 
|  | Ok(None) => continue, | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | /// Wait for the thread handling the vhost-user socket connection to terminate. | 
|  | pub fn wait(&mut self) -> Result<()> { | 
|  | if let Some(handle) = self.main_thread.take() { | 
|  | match handle.join().map_err(Error::WaitDaemon)? { | 
|  | Ok(()) => Ok(()), | 
|  | Err(Error::HandleRequest(VhostUserError::SocketBroken(_))) => Ok(()), | 
|  | Err(e) => Err(e), | 
|  | } | 
|  | } else { | 
|  | Ok(()) | 
|  | } | 
|  | } | 
|  |  | 
|  | /// Retrieve the vring epoll handler. | 
|  | /// | 
|  | /// This is necessary to perform further actions like registering and unregistering some extra | 
|  | /// event file descriptors. | 
|  | pub fn get_epoll_handlers(&self) -> Vec<Arc<VringEpollHandler<S, V, B>>> { | 
|  | // Do not expect poisoned lock. | 
|  | self.handler.lock().unwrap().get_epoll_handlers() | 
|  | } | 
|  | } | 
|  |  | 
|  | #[cfg(test)] | 
|  | mod tests { | 
|  | use super::backend::tests::MockVhostBackend; | 
|  | use super::*; | 
|  | use std::os::unix::net::{UnixListener, UnixStream}; | 
|  | use std::sync::Barrier; | 
|  | use vm_memory::{GuestAddress, GuestMemoryAtomic, GuestMemoryMmap}; | 
|  |  | 
|  | #[test] | 
|  | fn test_new_daemon() { | 
|  | let mem = GuestMemoryAtomic::new( | 
|  | GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0x100000), 0x10000)]).unwrap(), | 
|  | ); | 
|  | let backend = Arc::new(Mutex::new(MockVhostBackend::new())); | 
|  | let mut daemon = VhostUserDaemon::new("test".to_owned(), backend, mem).unwrap(); | 
|  |  | 
|  | let handlers = daemon.get_epoll_handlers(); | 
|  | assert_eq!(handlers.len(), 2); | 
|  |  | 
|  | let barrier = Arc::new(Barrier::new(2)); | 
|  | let tmpdir = tempfile::tempdir().unwrap(); | 
|  | let mut path = tmpdir.path().to_path_buf(); | 
|  | path.push("socket"); | 
|  |  | 
|  | let barrier2 = barrier.clone(); | 
|  | let path1 = path.clone(); | 
|  | let thread = thread::spawn(move || { | 
|  | barrier2.wait(); | 
|  | let socket = UnixStream::connect(&path1).unwrap(); | 
|  | barrier2.wait(); | 
|  | drop(socket) | 
|  | }); | 
|  |  | 
|  | let listener = Listener::new(&path, false).unwrap(); | 
|  | barrier.wait(); | 
|  | daemon.start(listener).unwrap(); | 
|  | barrier.wait(); | 
|  | // Above process generates a `HandleRequest(PartialMessage)` error. | 
|  | daemon.wait().unwrap_err(); | 
|  | daemon.wait().unwrap(); | 
|  | thread.join().unwrap(); | 
|  | } | 
|  |  | 
|  | #[test] | 
|  | fn test_new_daemon_client() { | 
|  | let mem = GuestMemoryAtomic::new( | 
|  | GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0x100000), 0x10000)]).unwrap(), | 
|  | ); | 
|  | let backend = Arc::new(Mutex::new(MockVhostBackend::new())); | 
|  | let mut daemon = VhostUserDaemon::new("test".to_owned(), backend, mem).unwrap(); | 
|  |  | 
|  | let handlers = daemon.get_epoll_handlers(); | 
|  | assert_eq!(handlers.len(), 2); | 
|  |  | 
|  | let barrier = Arc::new(Barrier::new(2)); | 
|  | let tmpdir = tempfile::tempdir().unwrap(); | 
|  | let mut path = tmpdir.path().to_path_buf(); | 
|  | path.push("socket"); | 
|  |  | 
|  | let barrier2 = barrier.clone(); | 
|  | let path1 = path.clone(); | 
|  | let thread = thread::spawn(move || { | 
|  | let listener = UnixListener::bind(&path1).unwrap(); | 
|  | barrier2.wait(); | 
|  | let (stream, _) = listener.accept().unwrap(); | 
|  | barrier2.wait(); | 
|  | drop(stream) | 
|  | }); | 
|  |  | 
|  | barrier.wait(); | 
|  | daemon | 
|  | .start_client(path.as_path().to_str().unwrap()) | 
|  | .unwrap(); | 
|  | barrier.wait(); | 
|  | // Above process generates a `HandleRequest(PartialMessage)` error. | 
|  | daemon.wait().unwrap_err(); | 
|  | daemon.wait().unwrap(); | 
|  | thread.join().unwrap(); | 
|  | } | 
|  | } |