// Copyright 2021 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
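
//! vhost-user block device backend.
//!
//! Adapts [`BlockAsync`], the regular in-process virtio-block device, to the
//! vhost-user device model so the disk can be served from a separate device
//! process.
//!
//! A minimal launch sketch (hypothetical: assumes `Options` is parsed with
//! argh from the process arguments, as crosvm device subcommands usually are):
//!
//! ```ignore
//! let opts: Options = argh::from_env();
//! run_block_device(opts)?; // Runs the device until the connection is closed.
//! ```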
mod sys;

use std::cell::RefCell;
use std::rc::Rc;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use anyhow::anyhow;
use anyhow::bail;
use anyhow::Context;
use base::warn;
use base::Event;
use base::Timer;
use cros_async::sync::Mutex as AsyncMutex;
use cros_async::AsyncTube;
use cros_async::EventAsync;
use cros_async::Executor;
use cros_async::ExecutorKind;
use cros_async::TimerAsync;
use futures::future::AbortHandle;
use futures::future::Abortable;
use sync::Mutex;
pub use sys::start_device as run_block_device;
pub use sys::Options;
use vm_memory::GuestMemory;
use vmm_vhost::message::*;
use vmm_vhost::VhostUserSlaveReqHandler;
use zerocopy::AsBytes;

use crate::virtio;
use crate::virtio::block::asynchronous::flush_disk;
use crate::virtio::block::asynchronous::handle_queue;
use crate::virtio::block::asynchronous::handle_vhost_user_command_tube;
use crate::virtio::block::asynchronous::BlockAsync;
use crate::virtio::block::DiskState;
use crate::virtio::copy_config;
use crate::virtio::vhost::user::device::handler::sys::Doorbell;
use crate::virtio::vhost::user::device::handler::DeviceRequestHandler;
use crate::virtio::vhost::user::device::handler::Error as DeviceError;
use crate::virtio::vhost::user::device::handler::VhostBackendReqConnection;
use crate::virtio::vhost::user::device::handler::VhostBackendReqConnectionState;
use crate::virtio::vhost::user::device::handler::VhostUserBackend;
use crate::virtio::vhost::user::device::handler::VhostUserPlatformOps;
use crate::virtio::vhost::user::device::handler::WorkerState;
use crate::virtio::vhost::user::device::VhostUserDevice;
use crate::virtio::Queue;
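
/// Maximum number of virtqueues this vhost-user block backend advertises.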
const NUM_QUEUES: u16 = 16;
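
/// vhost-user backend state for a virtio-block device, wrapping a disk opened
/// by [`BlockAsync`] and driven on a `cros_async` [`Executor`].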
struct BlockBackend {
ex: Executor,
disk_state: Rc<AsyncMutex<DiskState>>,
disk_size: Arc<AtomicU64>,
block_size: u32,
seg_max: u32,
avail_features: u64,
acked_features: u64,
acked_protocol_features: VhostUserProtocolFeatures,
    /// Timer used to schedule deferred disk flushes; shared with `flush_disk`.
    flush_timer: Rc<RefCell<TimerAsync>>,
    /// Whether `flush_timer` is currently armed.
    flush_timer_armed: Rc<RefCell<bool>>,
    /// Connection for backend-initiated requests, once established.
    backend_req_conn: Arc<Mutex<VhostBackendReqConnectionState>>,
    /// Per-queue worker tasks; `None` for queues that are not running.
    workers: [Option<WorkerState<Rc<RefCell<Queue>>, ()>>; NUM_QUEUES as usize],
}
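
// Implementing `VhostUserDevice` for `BlockAsync` lets the same disk logic
// back both the in-process device and this vhost-user backend.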
impl VhostUserDevice for BlockAsync {
fn max_queue_num(&self) -> usize {
NUM_QUEUES as usize
}
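
    /// Consumes the device and builds a vhost-user request handler, spawning
    /// the background `flush_disk` task (and, when a control tube is present,
    /// the command tube handler) on `ex`.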
fn into_req_handler(
mut self: Box<Self>,
ops: Box<dyn VhostUserPlatformOps>,
ex: &Executor,
) -> anyhow::Result<Box<dyn VhostUserSlaveReqHandler>> {
let avail_features =
self.avail_features | VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits();
let disk_image = match self.disk_image.take() {
Some(disk_image) => disk_image,
None => bail!("cannot create a vhost-user backend from an empty disk image"),
};
let async_image = disk_image.to_async_disk(ex)?;
let disk_state = Rc::new(AsyncMutex::new(DiskState::new(
async_image,
Arc::clone(&self.disk_size),
self.read_only,
self.sparse,
self.id,
)));
let timer = Timer::new().context("Failed to create a timer")?;
let flush_timer_write = Rc::new(RefCell::new(
TimerAsync::new(
// Call try_clone() to share the same underlying FD with the `flush_disk` task.
timer.try_clone().context("Failed to clone flush_timer")?,
ex,
)
.context("Failed to create an async timer")?,
));
// Create a separate TimerAsync with the same backing kernel timer. This allows the
// `flush_disk` task to borrow its copy waiting for events while the queue handlers can
// still borrow their copy momentarily to set timeouts.
// Call try_clone() to share the same underlying FD with the `flush_disk` task.
let flush_timer_read = timer
.try_clone()
.context("Failed to clone flush_timer")
.and_then(|t| TimerAsync::new(t, ex).context("Failed to create an async timer"))?;
let flush_timer_armed = Rc::new(RefCell::new(false));
ex.spawn_local(flush_disk(
Rc::clone(&disk_state),
flush_timer_read,
Rc::clone(&flush_timer_armed),
))
.detach();
let backend_req_conn = Arc::new(Mutex::new(VhostBackendReqConnectionState::NoConnection));
if let Some(control_tube) = self.control_tube.take() {
let async_tube = AsyncTube::new(ex, control_tube)?;
ex.spawn_local(handle_vhost_user_command_tube(
async_tube,
Arc::clone(&backend_req_conn),
Rc::clone(&disk_state),
))
.detach();
}
let backend = BlockBackend {
ex: ex.clone(),
disk_state,
disk_size: Arc::clone(&self.disk_size),
block_size: self.block_size,
seg_max: self.seg_max,
avail_features,
acked_features: 0,
acked_protocol_features: VhostUserProtocolFeatures::empty(),
flush_timer: flush_timer_write,
backend_req_conn: Arc::clone(&backend_req_conn),
flush_timer_armed,
workers: Default::default(),
};
let handler = DeviceRequestHandler::new(Box::new(backend), ops);
Ok(Box::new(std::sync::Mutex::new(handler)))
}
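
    /// Returns the executor kind configured on the underlying `BlockAsync`.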
fn executor_kind(&self) -> Option<ExecutorKind> {
Some(self.executor_kind)
}
}

impl VhostUserBackend for BlockBackend {
fn max_queue_num(&self) -> usize {
NUM_QUEUES as usize
}
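
    /// Features offered to the front-end; `PROTOCOL_FEATURES` is always set
    /// (see `into_req_handler`).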
fn features(&self) -> u64 {
self.avail_features
}
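
    /// Records the feature bits acked by the front-end, rejecting any bits
    /// that were never offered.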
fn ack_features(&mut self, value: u64) -> anyhow::Result<()> {
let unrequested_features = value & !self.avail_features;
if unrequested_features != 0 {
bail!("invalid features are given: {:#x}", unrequested_features);
}
self.acked_features |= value;
Ok(())
    }

    fn acked_features(&self) -> u64 {
self.acked_features
}
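
    /// Protocol features supported by this backend: device config access,
    /// multi-queue, and the backend (slave) request channel.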
fn protocol_features(&self) -> VhostUserProtocolFeatures {
VhostUserProtocolFeatures::CONFIG
| VhostUserProtocolFeatures::MQ
| VhostUserProtocolFeatures::SLAVE_REQ
}
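
    /// Records the acked protocol features, masking them to the supported set
    /// rather than failing on unsupported bits.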
fn ack_protocol_features(&mut self, features: u64) -> anyhow::Result<()> {
let features = VhostUserProtocolFeatures::from_bits(features)
            .ok_or_else(|| anyhow!("unknown protocol features requested: {:#x}", features))?;
let supported = self.protocol_features();
self.acked_protocol_features = features & supported;
Ok(())
    }

    fn acked_protocol_features(&self) -> u64 {
self.acked_protocol_features.bits()
}
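
    /// Reads from the virtio-blk config space, which is rebuilt on every call
    /// so the `capacity` field (in 512-byte sectors) tracks the current disk
    /// size, e.g. after a resize.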
fn read_config(&self, offset: u64, data: &mut [u8]) {
let config_space = {
let disk_size = self.disk_size.load(Ordering::Relaxed);
BlockAsync::build_config_space(disk_size, self.seg_max, self.block_size, NUM_QUEUES)
};
copy_config(data, 0, config_space.as_bytes(), offset);
}
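
    /// Device reset is not supported by this backend.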
fn reset(&mut self) {
panic!("Unsupported call to reset");
}
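
    /// Spawns a `handle_queue` worker for queue `idx` on the local executor,
    /// first stopping any worker already running for that index.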
fn start_queue(
&mut self,
idx: usize,
queue: virtio::Queue,
mem: GuestMemory,
doorbell: Doorbell,
kick_evt: Event,
) -> anyhow::Result<()> {
if self.workers[idx].is_some() {
warn!("Starting new queue handler without stopping old handler");
self.stop_queue(idx)?;
}
let kick_evt = EventAsync::new(kick_evt, &self.ex)
.context("failed to create EventAsync for kick_evt")?;
let (handle, registration) = AbortHandle::new_pair();
let disk_state = Rc::clone(&self.disk_state);
let timer = Rc::clone(&self.flush_timer);
let timer_armed = Rc::clone(&self.flush_timer_armed);
let queue = Rc::new(RefCell::new(queue));
let queue_task = self.ex.spawn_local(Abortable::new(
handle_queue(
mem,
disk_state,
queue.clone(),
kick_evt,
doorbell,
timer,
timer_armed,
),
registration,
));
self.workers[idx] = Some(WorkerState {
abort_handle: handle,
queue_task,
queue,
});
Ok(())
}
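
    /// Aborts the worker for queue `idx`, waits for its task to exit, and
    /// returns ownership of the queue.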
fn stop_queue(&mut self, idx: usize) -> anyhow::Result<virtio::Queue> {
if let Some(worker) = self.workers.get_mut(idx).and_then(Option::take) {
worker.abort_handle.abort();
// Wait for queue_task to be aborted.
let _ = self.ex.run_until(async { worker.queue_task.await });
let queue = match Rc::try_unwrap(worker.queue) {
Ok(queue_cell) => queue_cell.into_inner(),
Err(_) => panic!("failed to recover queue from worker"),
};
Ok(queue)
} else {
Err(anyhow::Error::new(DeviceError::WorkerNotFound))
}
}
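
    /// Stores the connection for backend-initiated requests (used, for
    /// example, to signal config changes when the disk is resized),
    /// overwriting any existing connection.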
fn set_backend_req_connection(&mut self, conn: VhostBackendReqConnection) {
let mut backend_req_conn = self.backend_req_conn.lock();
if let VhostBackendReqConnectionState::Connected(_) = &*backend_req_conn {
warn!("Backend Request Connection already established. Overwriting");
}
*backend_req_conn = VhostBackendReqConnectionState::Connected(conn);
}
}