blob: 9a4da6cf445610fe5f75c225fd1104fdd2cab146 [file] [log] [blame]
// Copyright 2019 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::fs::File;
use std::io;
use std::sync::Arc;
use base::{error, Event, PollToken, WaitContext};
use fuse::filesystem::{FileSystem, ZeroCopyReader, ZeroCopyWriter};
use vm_memory::GuestMemory;
use crate::virtio::fs::{Error, Result};
use crate::virtio::{Interrupt, Queue, Reader, Writer};
impl fuse::Reader for Reader {}
impl fuse::Writer for Writer {
fn write_at<F>(&mut self, offset: usize, f: F) -> io::Result<usize>
where
F: Fn(&mut Self) -> io::Result<usize>,
{
let mut writer = Writer::split_at(self, offset);
f(&mut writer)
}
fn has_sufficient_buffer(&self, size: u32) -> bool {
self.available_bytes() >= size as usize
}
}
impl ZeroCopyReader for Reader {
fn read_to(&mut self, f: &mut File, count: usize, off: u64) -> io::Result<usize> {
self.read_to_at(f, count, off)
}
}
impl ZeroCopyWriter for Writer {
fn write_from(&mut self, f: &mut File, count: usize, off: u64) -> io::Result<usize> {
self.write_from_at(f, count, off)
}
}
pub struct Worker<F: FileSystem + Sync> {
mem: GuestMemory,
queue: Queue,
server: Arc<fuse::Server<F>>,
irq: Arc<Interrupt>,
}
impl<F: FileSystem + Sync> Worker<F> {
pub fn new(
mem: GuestMemory,
queue: Queue,
server: Arc<fuse::Server<F>>,
irq: Arc<Interrupt>,
) -> Worker<F> {
Worker {
mem,
queue,
server,
irq,
}
}
fn process_queue(&mut self) -> Result<()> {
let mut needs_interrupt = false;
while let Some(avail_desc) = self.queue.pop(&self.mem) {
let reader = Reader::new(self.mem.clone(), avail_desc.clone())
.map_err(Error::InvalidDescriptorChain)?;
let writer = Writer::new(self.mem.clone(), avail_desc.clone())
.map_err(Error::InvalidDescriptorChain)?;
let total = self.server.handle_message(reader, writer)?;
self.queue
.add_used(&self.mem, avail_desc.index, total as u32);
needs_interrupt = true;
}
if needs_interrupt {
self.irq.signal_used_queue(self.queue.vector);
}
Ok(())
}
pub fn run(
&mut self,
queue_evt: Event,
kill_evt: Event,
watch_resample_event: bool,
) -> Result<()> {
#[derive(PollToken)]
enum Token {
// A request is ready on the queue.
QueueReady,
// Check if any interrupts need to be re-asserted.
InterruptResample,
// The parent thread requested an exit.
Kill,
}
let wait_ctx =
WaitContext::build_with(&[(&queue_evt, Token::QueueReady), (&kill_evt, Token::Kill)])
.map_err(Error::CreateWaitContext)?;
if watch_resample_event {
wait_ctx
.add(self.irq.get_resample_evt(), Token::InterruptResample)
.map_err(Error::CreateWaitContext)?;
}
loop {
let events = wait_ctx.wait().map_err(Error::WaitError)?;
for event in events.iter().filter(|e| e.is_readable) {
match event.token {
Token::QueueReady => {
queue_evt.read().map_err(Error::ReadQueueEvent)?;
if let Err(e) = self.process_queue() {
error!("virtio-fs transport error: {}", e);
return Err(e);
}
}
Token::InterruptResample => {
self.irq.interrupt_resample();
}
Token::Kill => return Ok(()),
}
}
}
}
}