blob: 6c1052f9f492f9e04a57ea4073bda2a990eada70 [file] [log] [blame]
// Copyright 2017 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
mod sys;
use std::collections::VecDeque;
use std::sync::Arc;
use anyhow::anyhow;
use anyhow::Context;
use balloon_control::BalloonStats;
use balloon_control::BalloonTubeCommand;
use balloon_control::BalloonTubeResult;
use base::error;
use base::warn;
use base::AsRawDescriptor;
use base::Event;
use base::RawDescriptor;
use base::SendTube;
use base::Tube;
use base::WorkerThread;
use cros_async::block_on;
use cros_async::select9;
use cros_async::sync::Mutex as AsyncMutex;
use cros_async::AsyncTube;
use cros_async::EventAsync;
use cros_async::Executor;
use cros_async::SendTubeAsync;
use data_model::Le16;
use data_model::Le32;
use data_model::Le64;
use futures::channel::mpsc;
use futures::pin_mut;
use futures::FutureExt;
use futures::StreamExt;
use remain::sorted;
use thiserror::Error as ThisError;
use vm_control::RegisteredEvent;
use vm_memory::GuestAddress;
use vm_memory::GuestMemory;
use zerocopy::AsBytes;
use zerocopy::FromBytes;
use super::async_utils;
use super::copy_config;
use super::descriptor_utils;
use super::DescriptorChain;
use super::DeviceType;
use super::Interrupt;
use super::Queue;
use super::Reader;
use super::SignalableInterrupt;
use super::VirtioDevice;
use crate::Suspendable;
use crate::UnpinRequest;
use crate::UnpinResponse;
/// Errors reported by the balloon device setup and its async worker tasks.
#[sorted]
#[derive(ThisError, Debug)]
pub enum BalloonError {
    /// Failed an async await
    #[error("failed async await: {0}")]
    AsyncAwait(cros_async::AsyncError),
    /// Failed to create event.
    #[error("failed to create event: {0}")]
    CreatingEvent(base::Error),
    /// Failed to create async message receiver.
    #[error("failed to create async message receiver: {0}")]
    CreatingMessageReceiver(base::TubeError),
    /// Failed to receive command message.
    #[error("failed to receive command message: {0}")]
    ReceivingCommand(base::TubeError),
    /// Failed to send command response.
    #[error("failed to send command response: {0}")]
    SendResponse(base::TubeError),
    /// Failed to write config event.
    #[error("failed to write config event: {0}")]
    WritingConfigEvent(base::Error),
}
/// Result type used throughout the balloon device, with [`BalloonError`] as the error type.
pub type Result<T> = std::result::Result<T, BalloonError>;
// The balloon exposes up to five virtqueues, in this order: inflate, deflate, stats (if
// VIRTIO_BALLOON_F_STATS_VQ), page reporting (if VIRTIO_BALLOON_F_PAGE_REPORTING) and events
// (if VIRTIO_BALLOON_F_EVENTS_VQ). The transport offers one queue per entry in QUEUE_SIZES, so
// there must be an entry for every queue that can be negotiated simultaneously; with only four
// entries, a guest acking all three optional queues could never activate the device.
const QUEUE_SIZE: u16 = 128;
const QUEUE_SIZES: &[u16] = &[QUEUE_SIZE, QUEUE_SIZE, QUEUE_SIZE, QUEUE_SIZE, QUEUE_SIZE];
// Balloon pages are always 4 KiB, independent of the guest's page size.
const VIRTIO_BALLOON_PFN_SHIFT: u32 = 12;
const VIRTIO_BALLOON_PF_SIZE: u64 = 1 << VIRTIO_BALLOON_PFN_SHIFT;
// The feature bitmap for virtio balloon (bit numbers, not masks).
const VIRTIO_BALLOON_F_MUST_TELL_HOST: u32 = 0; // Tell before reclaiming pages
const VIRTIO_BALLOON_F_STATS_VQ: u32 = 1; // Stats reporting enabled
const VIRTIO_BALLOON_F_DEFLATE_ON_OOM: u32 = 2; // Deflate balloon on OOM
const VIRTIO_BALLOON_F_PAGE_REPORTING: u32 = 5; // Page reporting virtqueue
/// Optional balloon features, identified by their virtio feature bit number (not a mask).
///
/// NOTE(review): presumably used by callers to build the `enabled_features` mask passed to
/// `Balloon::new` (as `1 << feature as u32`) — confirm against callers.
#[derive(Copy, Clone)]
#[repr(u32)]
pub enum BalloonFeatures {
    /// Page reporting: enables the page reporting virtqueue (bit 5).
    PageReporting = VIRTIO_BALLOON_F_PAGE_REPORTING,
}
// These feature bits are part of the proposal:
// https://lists.oasis-open.org/archives/virtio-comment/202201/msg00139.html
// They are not part of the published virtio spec; bit numbers are as used by this device.
const VIRTIO_BALLOON_F_RESPONSIVE_DEVICE: u32 = 6; // Device actively watching guest memory
const VIRTIO_BALLOON_F_EVENTS_VQ: u32 = 7; // Event vq is enabled
// virtio_balloon_config is the balloon device configuration space defined by the virtio spec.
// `num_pages` is the host's target balloon size and `actual` is the guest-reported current size,
// both counted in 4 KiB balloon pages and stored little-endian.
#[derive(Copy, Clone, Debug, Default, AsBytes, FromBytes)]
#[repr(C)]
struct virtio_balloon_config {
    num_pages: Le32,
    actual: Le32,
}
// BalloonState is shared by the worker and device thread.
#[derive(Default)]
struct BalloonState {
    // Target balloon size in 4 KiB pages, as requested by the host; exposed to the guest as the
    // config-space `num_pages` field.
    num_pages: u32,
    // Current balloon size in 4 KiB pages, as last written by the guest to the config-space
    // `actual` field.
    actual_pages: u32,
    // Flag indicating that the balloon is in the process of a failable update. This
    // is set by an Adjust command that has allow_failure set, and is cleared when the
    // Adjusted success/failure response is sent.
    failable_update: bool,
    // Page counts of `Adjusted` responses queued by `write_config` that still have to be sent
    // on the command tube by the worker.
    pending_adjusted_responses: VecDeque<u32>,
}
// The constants defining stats types (the `tag` field) in virtio_balloon_stat.
const VIRTIO_BALLOON_S_SWAP_IN: u16 = 0;
const VIRTIO_BALLOON_S_SWAP_OUT: u16 = 1;
const VIRTIO_BALLOON_S_MAJFLT: u16 = 2;
const VIRTIO_BALLOON_S_MINFLT: u16 = 3;
const VIRTIO_BALLOON_S_MEMFREE: u16 = 4;
const VIRTIO_BALLOON_S_MEMTOT: u16 = 5;
const VIRTIO_BALLOON_S_AVAIL: u16 = 6;
const VIRTIO_BALLOON_S_CACHES: u16 = 7;
const VIRTIO_BALLOON_S_HTLB_PGALLOC: u16 = 8;
const VIRTIO_BALLOON_S_HTLB_PGFAIL: u16 = 9;
// NOTE(review): judging by their names, these two tags are non-standard extensions taken from
// the top of the 16-bit tag space — confirm with the guest driver that emits them.
const VIRTIO_BALLOON_S_NONSTANDARD_SHMEM: u16 = 65534;
const VIRTIO_BALLOON_S_NONSTANDARD_UNEVICTABLE: u16 = 65535;
// BalloonStat is used to deserialize stats from the stats_queue. Each entry is a 16-bit tag
// (one of the VIRTIO_BALLOON_S_* constants) immediately followed by a 64-bit value, both
// little-endian; `packed` removes the padding that would otherwise follow `tag`.
#[derive(Copy, Clone, FromBytes, AsBytes)]
#[repr(C, packed)]
struct BalloonStat {
    tag: Le16,
    val: Le64,
}
impl BalloonStat {
    /// Records this stat's value in the field of `stats` selected by its tag.
    ///
    /// Entries with an unrecognized tag are skipped and leave `stats` untouched.
    fn update_stats(&self, stats: &mut BalloonStats) {
        let field = match self.tag.to_native() {
            VIRTIO_BALLOON_S_SWAP_IN => &mut stats.swap_in,
            VIRTIO_BALLOON_S_SWAP_OUT => &mut stats.swap_out,
            VIRTIO_BALLOON_S_MAJFLT => &mut stats.major_faults,
            VIRTIO_BALLOON_S_MINFLT => &mut stats.minor_faults,
            VIRTIO_BALLOON_S_MEMFREE => &mut stats.free_memory,
            VIRTIO_BALLOON_S_MEMTOT => &mut stats.total_memory,
            VIRTIO_BALLOON_S_AVAIL => &mut stats.available_memory,
            VIRTIO_BALLOON_S_CACHES => &mut stats.disk_caches,
            VIRTIO_BALLOON_S_HTLB_PGALLOC => &mut stats.hugetlb_allocations,
            VIRTIO_BALLOON_S_HTLB_PGFAIL => &mut stats.hugetlb_failures,
            VIRTIO_BALLOON_S_NONSTANDARD_SHMEM => &mut stats.shared_memory,
            VIRTIO_BALLOON_S_NONSTANDARD_UNEVICTABLE => &mut stats.unevictable_memory,
            _ => return,
        };
        *field = Some(self.val.to_native());
    }
}
// Event types carried in `virtio_balloon_event_header::evt_type` on the events queue.
const VIRTIO_BALLOON_EVENT_PRESSURE: u32 = 1;
const VIRTIO_BALLOON_EVENT_PUFF_FAILURE: u32 = 2;
/// Header read from the start of each buffer on the events queue; `evt_type` holds one of the
/// `VIRTIO_BALLOON_EVENT_*` values.
#[repr(C)]
#[derive(Copy, Clone, Default, AsBytes, FromBytes)]
struct virtio_balloon_event_header {
    evt_type: Le32,
}
// Calls `desc_handler` once for each `(start, len)` range, in order, with the start converted to
// a `GuestAddress`.
fn invoke_desc_handler<F>(ranges: Vec<(u64, u64)>, desc_handler: &mut F)
where
    F: FnMut(GuestAddress, u64),
{
    ranges
        .into_iter()
        .for_each(|(start, len)| desc_handler(GuestAddress(start), len));
}
// Release a list of guest memory ranges back to the host system.
// Unpin requests for each inflate range will be sent via `release_memory_tube`
// if provided, and then `desc_handler` will be called for each inflate range.
fn release_ranges<F>(
release_memory_tube: &Option<Tube>,
inflate_ranges: Vec<(u64, u64)>,
desc_handler: &mut F,
) -> descriptor_utils::Result<()>
where
F: FnMut(GuestAddress, u64),
{
if let Some(tube) = release_memory_tube {
let unpin_ranges = inflate_ranges
.iter()
.map(|v| {
(
v.0 >> VIRTIO_BALLOON_PFN_SHIFT,
v.1 / VIRTIO_BALLOON_PF_SIZE,
)
})
.collect();
let req = UnpinRequest {
ranges: unpin_ranges,
};
if let Err(e) = tube.send(&req) {
error!("failed to send unpin request: {}", e);
} else {
match tube.recv() {
Ok(resp) => match resp {
UnpinResponse::Success => invoke_desc_handler(inflate_ranges, desc_handler),
UnpinResponse::Failed => error!("failed to handle unpin request"),
},
Err(e) => error!("failed to handle get unpin response: {}", e),
}
}
} else {
invoke_desc_handler(inflate_ranges, desc_handler);
}
Ok(())
}
// Processes one inflate/deflate message: a list of little-endian 32-bit page frame numbers.
//
// Pages that extend the current run upwards or downwards by exactly one page are coalesced into
// a single `(start, size)` range; any other page closes the current run and starts a new one.
// Reading stops at the first malformed entry (which is logged), and the ranges gathered so far
// are still passed to `release_ranges`.
fn handle_address_chain<F>(
    release_memory_tube: &Option<Tube>,
    avail_desc: DescriptorChain,
    mem: &GuestMemory,
    desc_handler: &mut F,
) -> descriptor_utils::Result<()>
where
    F: FnMut(GuestAddress, u64),
{
    // In a long-running system, there is no reason to expect that
    // a significant number of freed pages are consecutive. However,
    // batching is relatively simple and can result in significant
    // gains in a newly booted system, so it's worth attempting.
    let mut run_start: u64 = 0;
    let mut run_len: u64 = 0; // zero means no run has been started yet
    let mut reader = Reader::new(mem.clone(), avail_desc)?;
    let mut ranges: Vec<(u64, u64)> = Vec::new();
    for entry in reader.iter::<Le32>() {
        let pfn = match entry {
            Ok(pfn) => pfn,
            Err(e) => {
                error!("error while reading unused pages: {}", e);
                break;
            }
        };
        let addr = u64::from(pfn.to_native()) << VIRTIO_BALLOON_PFN_SHIFT;
        if addr == run_start + run_len {
            // Page directly follows the current run.
            run_len += VIRTIO_BALLOON_PF_SIZE;
        } else if addr + VIRTIO_BALLOON_PF_SIZE == run_start {
            // Page directly precedes the current run.
            run_start = addr;
            run_len += VIRTIO_BALLOON_PF_SIZE;
        } else {
            // Discontinuity: flush the current run (if any) and start a new one here.
            if run_len != 0 {
                ranges.push((run_start, run_len));
            }
            run_start = addr;
            run_len = VIRTIO_BALLOON_PF_SIZE;
        }
    }
    if run_len != 0 {
        ranges.push((run_start, run_len));
    }
    release_ranges(release_memory_tube, ranges, desc_handler)
}
// Async task that handles the main balloon inflate and deflate queues.
//
// Each available descriptor chain is decoded by `handle_address_chain` (which calls
// `desc_handler` for the page ranges), then returned to the guest with a used length of 0 and an
// interrupt. Processing errors are logged and do not stop the task; it returns only when reading
// the next descriptor fails.
async fn handle_queue<F>(
    mem: &GuestMemory,
    mut queue: Queue,
    mut queue_event: EventAsync,
    release_memory_tube: &Option<Tube>,
    interrupt: Interrupt,
    mut desc_handler: F,
) where
    F: FnMut(GuestAddress, u64),
{
    loop {
        let chain = match queue.next_async(mem, &mut queue_event).await {
            Ok(chain) => chain,
            Err(e) => {
                error!("Failed to read descriptor {}", e);
                return;
            }
        };
        let desc_index = chain.index;
        let outcome = handle_address_chain(release_memory_tube, chain, mem, &mut desc_handler);
        if let Err(e) = outcome {
            error!("balloon: failed to process inflate addresses: {}", e);
        }
        queue.add_used(mem, desc_index, 0);
        queue.trigger_interrupt(mem, &interrupt);
    }
}
// Processes one page-reporting descriptor chain: every guest memory region referenced by the
// chain's descriptors is collected as a `(guest address, length)` range, and the ranges are
// handed to `release_ranges`.
fn handle_reported_buffer<F>(
    release_memory_tube: &Option<Tube>,
    avail_desc: DescriptorChain,
    desc_handler: &mut F,
) -> descriptor_utils::Result<()>
where
    F: FnMut(GuestAddress, u64),
{
    let mut ranges: Vec<(u64, u64)> = Vec::new();
    for desc in avail_desc.into_iter() {
        let (regions, _exported) = desc.into_mem_regions();
        ranges.extend(
            regions
                .into_iter()
                .map(|region| (region.gpa.offset(), region.len)),
        );
    }
    release_ranges(release_memory_tube, ranges, desc_handler)
}
// Async task that handles the page reporting queue.
//
// The memory described by each available descriptor chain is released through
// `handle_reported_buffer`. The chain is then returned to the guest with a used length of 0 and
// an interrupt. Processing errors are logged; the task returns only when reading the next
// descriptor fails.
async fn handle_reporting_queue<F>(
    mem: &GuestMemory,
    mut queue: Queue,
    mut queue_event: EventAsync,
    release_memory_tube: &Option<Tube>,
    interrupt: Interrupt,
    mut desc_handler: F,
) where
    F: FnMut(GuestAddress, u64),
{
    loop {
        let chain = match queue.next_async(mem, &mut queue_event).await {
            Ok(chain) => chain,
            Err(e) => {
                error!("Failed to read descriptor {}", e);
                return;
            }
        };
        let desc_index = chain.index;
        let outcome = handle_reported_buffer(release_memory_tube, chain, &mut desc_handler);
        if let Err(e) = outcome {
            error!("balloon: failed to process reported buffer: {}", e);
        }
        queue.add_used(mem, desc_index, 0);
        queue.trigger_interrupt(mem, &interrupt);
    }
}
fn parse_balloon_stats(reader: &mut Reader) -> BalloonStats {
let mut stats: BalloonStats = Default::default();
for res in reader.iter::<BalloonStat>() {
match res {
Ok(stat) => stat.update_stats(&mut stats),
Err(e) => {
error!("error while reading stats: {}", e);
break;
}
};
}
stats
}
// Async task that handles the stats queue. Note that the cadence of this is driven by requests for
// balloon stats from the control pipe.
// The guest queues an initial buffer on boot, which is read and then this future will block until
// signaled from the command socket that stats should be collected again.
//
// Protocol: the task keeps the index of the most recently received stats descriptor. When a
// request id arrives on `stats_rx`, that descriptor is returned to the guest (add_used +
// interrupt) to request a fresh stats buffer. The next descriptor is then parsed, and a
// `BalloonTubeResult::Stats` tagged with the request id is sent on `command_tube`.
// The task ends when `stats_rx` is closed or reading a descriptor fails.
async fn handle_stats_queue(
    mem: &GuestMemory,
    mut queue: Queue,
    mut queue_event: EventAsync,
    mut stats_rx: mpsc::Receiver<u64>,
    command_tube: &AsyncTube,
    state: Arc<AsyncMutex<BalloonState>>,
    interrupt: Interrupt,
) {
    // Consume the first stats buffer sent from the guest at startup. It was not
    // requested by anyone, and the stats are stale.
    let mut index = match queue.next_async(mem, &mut queue_event).await {
        Err(e) => {
            error!("Failed to read descriptor {}", e);
            return;
        }
        Ok(d) => d.index,
    };
    loop {
        // Wait for a request to read the stats.
        let id = match stats_rx.next().await {
            Some(id) => id,
            None => {
                error!("stats signal tube was closed");
                break;
            }
        };
        // Request a new stats_desc to the guest.
        queue.add_used(mem, index, 0);
        queue.trigger_interrupt(mem, &interrupt);
        let stats_desc = match queue.next_async(mem, &mut queue_event).await {
            Err(e) => {
                error!("Failed to read descriptor {}", e);
                return;
            }
            Ok(d) => d,
        };
        // Remember this descriptor; it is handed back to the guest on the next request.
        index = stats_desc.index;
        let mut reader = match Reader::new(mem.clone(), stats_desc) {
            Ok(r) => r,
            Err(e) => {
                // NOTE(review): no Stats result is sent for request `id` on this path, so the
                // requester gets no reply for it.
                error!("balloon: failed to CREATE Reader: {}", e);
                continue;
            }
        };
        let stats = parse_balloon_stats(&mut reader);
        let actual_pages = state.lock().await.actual_pages as u64;
        let result = BalloonTubeResult::Stats {
            // Current balloon size converted from 4 KiB pages to bytes.
            balloon_actual: actual_pages << VIRTIO_BALLOON_PFN_SHIFT,
            stats,
            id,
        };
        let send_result = command_tube.send(result).await;
        if let Err(e) = send_result {
            error!("failed to send stats result: {}", e);
        }
    }
}
// Sends a `BalloonTubeResult::Adjusted` reply on `tube` that reports `num_pages` 4 KiB pages as a
// byte count.
async fn send_adjusted_response(
    tube: &AsyncTube,
    num_pages: u32,
) -> std::result::Result<(), base::TubeError> {
    let reply = BalloonTubeResult::Adjusted {
        num_bytes: u64::from(num_pages) << VIRTIO_BALLOON_PFN_SHIFT,
    };
    tube.send(reply).await
}
async fn handle_event(
state: Arc<AsyncMutex<BalloonState>>,
interrupt: Interrupt,
r: &mut Reader,
command_tube: &AsyncTube,
) -> Result<()> {
match r.read_obj::<virtio_balloon_event_header>() {
Ok(hdr) => match hdr.evt_type.to_native() {
VIRTIO_BALLOON_EVENT_PRESSURE => {
// TODO(b/213962590): See how this can be integrated this into memory rebalancing
}
VIRTIO_BALLOON_EVENT_PUFF_FAILURE => {
let mut state = state.lock().await;
if state.failable_update {
state.num_pages = state.actual_pages;
interrupt.signal_config_changed();
state.failable_update = false;
send_adjusted_response(command_tube, state.actual_pages)
.await
.map_err(BalloonError::SendResponse)?;
}
}
_ => {
warn!("Unknown event {}", hdr.evt_type.to_native());
}
},
Err(e) => error!("Failed to parse event header {:?}", e),
}
Ok(())
}
// Async task that handles the events queue.
//
// Each available descriptor chain is parsed as a single event by `handle_event`. The chain is
// then returned to the guest with a used length of 0 and an interrupt. Returns an error if
// reading a descriptor or handling an event fails; a chain that cannot be wrapped in a `Reader`
// is logged and still returned to the guest.
async fn handle_events_queue(
    mem: &GuestMemory,
    mut queue: Queue,
    mut queue_event: EventAsync,
    state: Arc<AsyncMutex<BalloonState>>,
    interrupt: Interrupt,
    command_tube: &AsyncTube,
) -> Result<()> {
    loop {
        let chain = queue
            .next_async(mem, &mut queue_event)
            .await
            .map_err(BalloonError::AsyncAwait)?;
        let desc_index = chain.index;
        match Reader::new(mem.clone(), chain) {
            Err(e) => error!("balloon: failed to CREATE Reader: {}", e),
            Ok(mut reader) => {
                handle_event(state.clone(), interrupt.clone(), &mut reader, command_tube).await?;
            }
        }
        queue.add_used(mem, desc_index, 0);
        queue.trigger_interrupt(mem, &interrupt);
    }
}
// Async task that handles the command socket. The command socket handles messages from the host
// requesting that the guest balloon be adjusted or to report guest memory statistics.
async fn handle_command_tube(
command_tube: &AsyncTube,
interrupt: Interrupt,
state: Arc<AsyncMutex<BalloonState>>,
mut stats_tx: mpsc::Sender<u64>,
registered_evt_q: Option<SendTubeAsync>,
) -> Result<()> {
loop {
match command_tube.next().await {
Ok(command) => match command {
BalloonTubeCommand::Adjust {
num_bytes,
allow_failure,
} => {
let num_pages = (num_bytes >> VIRTIO_BALLOON_PFN_SHIFT) as u32;
let mut state = state.lock().await;
state.num_pages = num_pages;
interrupt.signal_config_changed();
if allow_failure {
if num_pages == state.actual_pages {
send_adjusted_response(command_tube, num_pages)
.await
.map_err(BalloonError::SendResponse)?;
} else {
state.failable_update = true;
}
}
if let Some(registered_evt_q) = &registered_evt_q {
if let Err(e) = registered_evt_q
.send(&RegisteredEvent::VirtioBalloonResize)
.await
{
error!("failed to send VirtioBalloonResize event: {}", e);
}
}
}
BalloonTubeCommand::Stats { id } => {
if let Err(e) = stats_tx.try_send(id) {
error!("failed to signal the stat handler: {}", e);
}
}
},
Err(e) => {
return Err(BalloonError::ReceivingCommand(e));
}
}
}
}
// Async task that sends the `Adjusted` responses queued in
// `BalloonState::pending_adjusted_responses` on the command tube. It wakes each time
// `pending_adjusted_response_event` is signaled and drains the whole queue.
//
// Returns an error if waiting on the event or sending a response fails.
async fn handle_pending_adjusted_responses(
    pending_adjusted_response_event: EventAsync,
    command_tube: &AsyncTube,
    state: Arc<AsyncMutex<BalloonState>>,
) -> Result<()> {
    loop {
        pending_adjusted_response_event
            .next_val()
            .await
            .map_err(BalloonError::AsyncAwait)?;
        loop {
            // Pop in a standalone statement so the state lock is released before the send is
            // awaited. In a `while let` scrutinee, the guard temporary would live for the whole
            // loop body and keep the lock held across the tube send.
            let next = state.lock().await.pending_adjusted_responses.pop_front();
            let num_pages = match next {
                Some(num_pages) => num_pages,
                None => break,
            };
            send_adjusted_response(command_tube, num_pages)
                .await
                .map_err(BalloonError::SendResponse)?;
        }
    }
}
// The main worker thread. Initializes the asynchronous worker tasks and passes them to the
// executor to be processed.
//
// Optional queues that were not negotiated are replaced with never-completing futures, so every
// task slot in `select9` is always filled. NOTE(review): `select9` presumably resolves as soon as
// any one task completes (e.g. the kill event fires or a task returns an error) — confirm in
// cros_async. The tubes are returned so that `Balloon::reset` can store them for reuse.
fn run_worker(
    inflate_queue: (Queue, Event),
    deflate_queue: (Queue, Event),
    stats_queue: Option<(Queue, Event)>,
    reporting_queue: Option<(Queue, Event)>,
    events_queue: Option<(Queue, Event)>,
    command_tube: Tube,
    #[cfg(windows)] dynamic_mapping_tube: Tube,
    release_memory_tube: Option<Tube>,
    interrupt: Interrupt,
    kill_evt: Event,
    pending_adjusted_response_event: Event,
    mem: GuestMemory,
    state: Arc<AsyncMutex<BalloonState>>,
    registered_evt_q: Option<SendTube>,
) -> (Option<Tube>, Tube, Option<SendTube>) {
    let ex = Executor::new().unwrap();
    let command_tube = AsyncTube::new(&ex, command_tube).unwrap();
    // The async wrapper gets a clone so the original SendTube can be returned to the device.
    let registered_evt_q_async = registered_evt_q
        .as_ref()
        .map(|q| SendTubeAsync::new(q.try_clone().unwrap(), &ex).unwrap());
    // We need a block to release all references to command_tube at the end before returning it.
    {
        // The first queue is used for inflate messages
        let inflate = handle_queue(
            &mem,
            inflate_queue.0,
            EventAsync::new(inflate_queue.1, &ex).expect("failed to create async event"),
            &release_memory_tube,
            interrupt.clone(),
            |guest_address, len| {
                sys::free_memory(
                    &guest_address,
                    len,
                    #[cfg(windows)]
                    &dynamic_mapping_tube,
                    #[cfg(unix)]
                    &mem,
                )
            },
        );
        pin_mut!(inflate);
        // The second queue is used for deflate messages
        // (no unpin step: deflate passes `&None` as the release tube).
        let deflate = handle_queue(
            &mem,
            deflate_queue.0,
            EventAsync::new(deflate_queue.1, &ex).expect("failed to create async event"),
            &None,
            interrupt.clone(),
            |guest_address, len| {
                sys::reclaim_memory(
                    &guest_address,
                    len,
                    #[cfg(windows)]
                    &dynamic_mapping_tube,
                )
            },
        );
        pin_mut!(deflate);
        // The next queue is used for stats messages if VIRTIO_BALLOON_F_STATS_VQ is negotiated.
        // The message type is the id of the stats request, so we can detect if there are any stale
        // stats results that were queued during an error condition.
        let (stats_tx, stats_rx) = mpsc::channel::<u64>(1);
        let stats = if let Some((stats_queue, stats_queue_evt)) = stats_queue {
            handle_stats_queue(
                &mem,
                stats_queue,
                EventAsync::new(stats_queue_evt, &ex).expect("failed to create async event"),
                stats_rx,
                &command_tube,
                state.clone(),
                interrupt.clone(),
            )
            .left_future()
        } else {
            std::future::pending().right_future()
        };
        pin_mut!(stats);
        // The next queue is used for reporting messages
        let reporting = if let Some((reporting_queue, reporting_queue_evt)) = reporting_queue {
            handle_reporting_queue(
                &mem,
                reporting_queue,
                EventAsync::new(reporting_queue_evt, &ex).expect("failed to create async event"),
                &release_memory_tube,
                interrupt.clone(),
                |guest_address, len| {
                    sys::free_memory(
                        &guest_address,
                        len,
                        #[cfg(windows)]
                        &dynamic_mapping_tube,
                        #[cfg(unix)]
                        &mem,
                    )
                },
            )
            .left_future()
        } else {
            std::future::pending().right_future()
        };
        pin_mut!(reporting);
        // Future to handle command messages that resize the balloon.
        let command = handle_command_tube(
            &command_tube,
            interrupt.clone(),
            state.clone(),
            stats_tx,
            registered_evt_q_async,
        );
        pin_mut!(command);
        // Process any requests to resample the irq value.
        let resample = async_utils::handle_irq_resample(&ex, interrupt.clone());
        pin_mut!(resample);
        // Exit if the kill event is triggered.
        let kill = async_utils::await_and_exit(&ex, kill_evt);
        pin_mut!(kill);
        // The next queue is used for events if VIRTIO_BALLOON_F_EVENTS_VQ is negotiated.
        let events = if let Some((events_queue, events_queue_evt)) = events_queue {
            handle_events_queue(
                &mem,
                events_queue,
                EventAsync::new(events_queue_evt, &ex).expect("failed to create async event"),
                state.clone(),
                interrupt,
                &command_tube,
            )
            .left_future()
        } else {
            std::future::pending().right_future()
        };
        pin_mut!(events);
        // Sends the Adjusted responses that write_config queues in the shared state.
        let pending_adjusted = handle_pending_adjusted_responses(
            EventAsync::new(pending_adjusted_response_event, &ex)
                .expect("failed to create async event"),
            &command_tube,
            state,
        );
        pin_mut!(pending_adjusted);
        if let Err(e) = ex
            .run_until(select9(
                inflate,
                deflate,
                stats,
                reporting,
                command,
                resample,
                kill,
                events,
                pending_adjusted,
            ))
            .map(|_| ())
        {
            error!("error happened in executor: {}", e);
        }
    }
    (release_memory_tube, command_tube.into(), registered_evt_q)
}
/// Virtio device for memory balloon inflation/deflation.
pub struct Balloon {
    // Host control tube. Moved into the worker by `activate` and restored by `reset`.
    command_tube: Option<Tube>,
    // NOTE(review): moved into the worker by `activate` but not returned by it, so this stays
    // `None` after `reset`, and a second `activate` would panic on `take().unwrap()`.
    #[cfg(windows)]
    dynamic_mapping_tube: Option<Tube>,
    // Optional tube for unpinning inflated ranges (CoIOMMU) before they are released.
    release_memory_tube: Option<Tube>,
    // Signaled by `write_config` when an `Adjusted` response has been queued for the worker.
    pending_adjusted_response_event: Event,
    // State shared between the device and its worker tasks.
    state: Arc<AsyncMutex<BalloonState>>,
    // Feature bits offered to the guest.
    features: u64,
    // Feature bits acknowledged by the guest.
    acked_features: u64,
    // Running worker; its result hands back the tubes it took ownership of.
    worker_thread: Option<WorkerThread<(Option<Tube>, Tube, Option<SendTube>)>>,
    // Optional queue on which `VirtioBalloonResize` events are published.
    registered_evt_q: Option<SendTube>,
}
/// Operation mode of the balloon.
///
/// Selects which of the mutually exclusive features `Balloon::new` advertises:
/// `Relaxed` offers DEFLATE_ON_OOM and `Strict` offers RESPONSIVE_DEVICE.
#[derive(PartialEq, Eq)]
pub enum BalloonMode {
    /// The driver can access pages in the balloon (i.e. F_DEFLATE_ON_OOM)
    Relaxed,
    /// The driver cannot access pages in the balloon. Implies F_RESPONSIVE_DEVICE.
    Strict,
}
impl Balloon {
    /// Creates a new virtio balloon device.
    ///
    /// The advertised feature set is `base_features | enabled_features` plus MUST_TELL_HOST,
    /// STATS_VQ and EVENTS_VQ, and either RESPONSIVE_DEVICE (`BalloonMode::Strict`) or
    /// DEFLATE_ON_OOM (`BalloonMode::Relaxed`).
    ///
    /// `init_balloon_size` is the initial target size in bytes. It is converted to 4 KiB balloon
    /// pages and saturates at `u32::MAX` pages, the largest value the 32-bit config field can hold.
    ///
    /// Memory pinned by CoIOMMU must be unpinned before the balloon can release it to the host.
    /// To do this, `release_memory_tube` (when present) is used to send the inflate ranges to
    /// CoIOMMU as `UnpinRequest`/`UnpinResponse` messages, so that memory in those ranges is
    /// unpinned first.
    ///
    /// # Errors
    /// Returns `BalloonError::CreatingEvent` if the internal pending-response event cannot be
    /// created.
    pub fn new(
        base_features: u64,
        command_tube: Tube,
        #[cfg(windows)] dynamic_mapping_tube: Tube,
        release_memory_tube: Option<Tube>,
        init_balloon_size: u64,
        mode: BalloonMode,
        enabled_features: u64,
        registered_evt_q: Option<SendTube>,
    ) -> Result<Balloon> {
        let features = base_features
            | 1 << VIRTIO_BALLOON_F_MUST_TELL_HOST
            | 1 << VIRTIO_BALLOON_F_STATS_VQ
            | 1 << VIRTIO_BALLOON_F_EVENTS_VQ
            | enabled_features
            | if mode == BalloonMode::Strict {
                1 << VIRTIO_BALLOON_F_RESPONSIVE_DEVICE
            } else {
                1 << VIRTIO_BALLOON_F_DEFLATE_ON_OOM
            };
        // Saturate instead of wrapping: an oversized request must not become a small target.
        let init_pages =
            (init_balloon_size >> VIRTIO_BALLOON_PFN_SHIFT).min(u64::from(u32::MAX)) as u32;
        Ok(Balloon {
            command_tube: Some(command_tube),
            #[cfg(windows)]
            dynamic_mapping_tube: Some(dynamic_mapping_tube),
            release_memory_tube,
            pending_adjusted_response_event: Event::new().map_err(BalloonError::CreatingEvent)?,
            state: Arc::new(AsyncMutex::new(BalloonState {
                num_pages: init_pages,
                actual_pages: 0,
                failable_update: false,
                pending_adjusted_responses: VecDeque::new(),
            })),
            worker_thread: None,
            features,
            acked_features: 0,
            registered_evt_q,
        })
    }

    /// Builds a snapshot of the config space from the shared state (blocks on the state lock).
    fn get_config(&self) -> virtio_balloon_config {
        let state = block_on(self.state.lock());
        virtio_balloon_config {
            num_pages: state.num_pages.into(),
            actual: state.actual_pages.into(),
        }
    }

    /// Number of virtqueues the guest must provide for the given acknowledged features.
    fn num_expected_queues(acked_features: u64) -> usize {
        // mandatory inflate and deflate queues plus any optional ack'ed queues
        let queue_bits = (1 << VIRTIO_BALLOON_F_STATS_VQ)
            | (1 << VIRTIO_BALLOON_F_EVENTS_VQ)
            | (1 << VIRTIO_BALLOON_F_PAGE_REPORTING);
        2 + (acked_features & queue_bits as u64).count_ones() as usize
    }
}
impl VirtioDevice for Balloon {
    // Returns the raw descriptors owned by this device (tubes still held plus the pending-response
    // event). NOTE(review): presumably so they stay open when other descriptors are closed during
    // sandboxing — confirm against the caller.
    fn keep_rds(&self) -> Vec<RawDescriptor> {
        let mut rds = Vec::new();
        if let Some(command_tube) = &self.command_tube {
            rds.push(command_tube.as_raw_descriptor());
        }
        if let Some(release_memory_tube) = &self.release_memory_tube {
            rds.push(release_memory_tube.as_raw_descriptor());
        }
        rds.push(self.pending_adjusted_response_event.as_raw_descriptor());
        rds
    }
    fn device_type(&self) -> DeviceType {
        DeviceType::Balloon
    }
    fn queue_max_sizes(&self) -> &[u16] {
        QUEUE_SIZES
    }
    // Copies the requested slice of the current config space into `data`.
    fn read_config(&self, offset: u64, data: &mut [u8]) {
        copy_config(data, 0, self.get_config().as_bytes(), offset);
    }
    // Applies a guest config write. Only the `actual` field is taken from the written config;
    // a guest write to `num_pages` is discarded. When a failable update is in progress and the
    // guest has reached the target, an `Adjusted` response is queued and the worker is signaled.
    fn write_config(&mut self, offset: u64, data: &[u8]) {
        let mut config = self.get_config();
        copy_config(config.as_bytes_mut(), offset, data, 0);
        let mut state = block_on(self.state.lock());
        state.actual_pages = config.actual.to_native();
        if state.failable_update && state.actual_pages == state.num_pages {
            state.failable_update = false;
            let num_pages = state.num_pages;
            state.pending_adjusted_responses.push_back(num_pages);
            let _ = self.pending_adjusted_response_event.signal();
        }
    }
    fn features(&self) -> u64 {
        self.features
    }
    // Records acknowledged features; bits that were never offered are dropped with a warning.
    // Acknowledged bits accumulate across calls.
    fn ack_features(&mut self, mut value: u64) {
        if value & !self.features != 0 {
            warn!("virtio_balloon got unknown feature ack {:x}", value);
            value &= self.features;
        }
        self.acked_features |= value;
    }
    // Starts the worker thread. Queues are assigned in order: inflate, deflate, then stats,
    // reporting and events for each optional feature that was acknowledged. The count must match
    // `num_expected_queues` exactly.
    fn activate(
        &mut self,
        mem: GuestMemory,
        interrupt: Interrupt,
        mut queues: Vec<(Queue, Event)>,
    ) -> anyhow::Result<()> {
        let expected_queues = Balloon::num_expected_queues(self.acked_features);
        if queues.len() != expected_queues {
            return Err(anyhow!(
                "expected {} queues, got {}",
                expected_queues,
                queues.len()
            ));
        }
        let inflate_queue = queues.remove(0);
        let deflate_queue = queues.remove(0);
        let stats_queue = if self.acked_features & (1 << VIRTIO_BALLOON_F_STATS_VQ) != 0 {
            Some(queues.remove(0))
        } else {
            None
        };
        let reporting_queue = if self.acked_features & (1 << VIRTIO_BALLOON_F_PAGE_REPORTING) != 0 {
            Some(queues.remove(0))
        } else {
            None
        };
        let events_queue = if self.acked_features & (1 << VIRTIO_BALLOON_F_EVENTS_VQ) != 0 {
            Some(queues.remove(0))
        } else {
            None
        };
        let state = self.state.clone();
        // Panics if called again without an intervening `reset` (the tube was already taken).
        let command_tube = self.command_tube.take().unwrap();
        #[cfg(windows)]
        let mapping_tube = self.dynamic_mapping_tube.take().unwrap();
        let release_memory_tube = self.release_memory_tube.take();
        let registered_evt_q = self.registered_evt_q.take();
        let pending_adjusted_response_event = self
            .pending_adjusted_response_event
            .try_clone()
            .context("failed to clone Event")?;
        self.worker_thread = Some(WorkerThread::start("v_balloon", move |kill_evt| {
            run_worker(
                inflate_queue,
                deflate_queue,
                stats_queue,
                reporting_queue,
                events_queue,
                command_tube,
                #[cfg(windows)]
                mapping_tube,
                release_memory_tube,
                interrupt,
                kill_evt,
                pending_adjusted_response_event,
                mem,
                state,
                registered_evt_q,
            )
        }));
        Ok(())
    }
    // Stops the worker, if one is running, and takes back the tubes it returns.
    // Returns true if a worker was stopped.
    fn reset(&mut self) -> bool {
        if let Some(worker_thread) = self.worker_thread.take() {
            let (release_memory_tube, command_tube, registered_evt_q) = worker_thread.stop();
            self.release_memory_tube = release_memory_tube;
            self.command_tube = Some(command_tube);
            self.registered_evt_q = registered_evt_q;
            return true;
        }
        false
    }
}
// Uses the trait's default method implementations; no balloon-specific overrides.
impl Suspendable for Balloon {}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::virtio::descriptor_utils::create_descriptor_chain;
    use crate::virtio::descriptor_utils::DescriptorType;
    #[test]
    fn desc_parsing_inflate() {
        // Check that the memory addresses are parsed correctly by 'handle_address_chain' and passed
        // to the closure.
        let memory_start_addr = GuestAddress(0x0);
        let memory = GuestMemory::new(&[(memory_start_addr, 0x10000)]).unwrap();
        // Two non-adjacent PFNs, so they must be reported as two separate ranges.
        memory
            .write_obj_at_addr(0x10u32, GuestAddress(0x100))
            .unwrap();
        memory
            .write_obj_at_addr(0xaa55aa55u32, GuestAddress(0x104))
            .unwrap();
        let chain = create_descriptor_chain(
            &memory,
            GuestAddress(0x0),
            GuestAddress(0x100),
            vec![(DescriptorType::Readable, 8)],
            0,
        )
        .expect("create_descriptor_chain failed");
        let mut addrs = Vec::new();
        let res = handle_address_chain(&None, chain, &memory, &mut |guest_address, len| {
            addrs.push((guest_address, len));
        });
        assert!(res.is_ok());
        assert_eq!(addrs.len(), 2);
        assert_eq!(
            addrs[0].0,
            GuestAddress(0x10u64 << VIRTIO_BALLOON_PFN_SHIFT)
        );
        assert_eq!(
            addrs[1].0,
            GuestAddress(0xaa55aa55u64 << VIRTIO_BALLOON_PFN_SHIFT)
        );
    }
    #[test]
    fn num_expected_queues() {
        // Builds a feature mask from a list of feature bit numbers.
        let to_feature_bits =
            |features: &[u32]| -> u64 { features.iter().fold(0, |acc, f| acc | (1_u64 << f)) };
        assert_eq!(2, Balloon::num_expected_queues(0));
        // Features that do not add a queue must not change the count.
        assert_eq!(
            2,
            Balloon::num_expected_queues(to_feature_bits(&[VIRTIO_BALLOON_F_MUST_TELL_HOST]))
        );
        assert_eq!(
            3,
            Balloon::num_expected_queues(to_feature_bits(&[VIRTIO_BALLOON_F_STATS_VQ]))
        );
        assert_eq!(
            5,
            Balloon::num_expected_queues(to_feature_bits(&[
                VIRTIO_BALLOON_F_STATS_VQ,
                VIRTIO_BALLOON_F_EVENTS_VQ,
                VIRTIO_BALLOON_F_PAGE_REPORTING
            ]))
        );
    }
}