blob: fcdd1cd1bae24a6f96913f060a6bfa5d9115ce48 [file] [log] [blame]
// Copyright 2020 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! Worker that runs in a virtio-video thread.
use std::collections::VecDeque;
use base::{error, Event, PollContext};
use vm_memory::GuestMemory;
use crate::virtio::queue::{DescriptorChain, Queue};
use crate::virtio::resource_bridge::ResourceRequestSocket;
use crate::virtio::video::async_cmd_desc_map::AsyncCmdDescMap;
use crate::virtio::video::command::{QueueType, VideoCmd};
use crate::virtio::video::device::{
AsyncCmdResponse, Device, Token, VideoCmdResponseType, VideoEvtResponseType,
};
use crate::virtio::video::event::{self, EvtType, VideoEvt};
use crate::virtio::video::response::{self, Response};
use crate::virtio::video::{Error, Result};
use crate::virtio::{Interrupt, Reader, Writer};
pub struct Worker {
pub interrupt: Interrupt,
pub mem: GuestMemory,
pub cmd_evt: Event,
pub event_evt: Event,
pub kill_evt: Event,
pub resource_bridge: ResourceRequestSocket,
}
/// Pair of a descriptor chain and a response to be written.
type WritableResp = (DescriptorChain, response::CmdResponse);
impl Worker {
/// Writes responses into the command queue.
fn write_responses(
&self,
cmd_queue: &mut Queue,
responses: &mut VecDeque<WritableResp>,
) -> Result<()> {
if responses.is_empty() {
return Ok(());
}
while let Some((desc, response)) = responses.pop_front() {
let desc_index = desc.index;
let mut writer =
Writer::new(self.mem.clone(), desc).map_err(Error::InvalidDescriptorChain)?;
if let Err(e) = response.write(&mut writer) {
error!(
"failed to write a command response for {:?}: {}",
response, e
);
}
cmd_queue.add_used(&self.mem, desc_index, writer.bytes_written() as u32);
}
self.interrupt.signal_used_queue(cmd_queue.vector);
Ok(())
}
/// Writes a `VideoEvt` into the event queue.
fn write_event(&self, event_queue: &mut Queue, event: event::VideoEvt) -> Result<()> {
let desc = event_queue
.peek(&self.mem)
.ok_or_else(|| Error::DescriptorNotAvailable)?;
event_queue.pop_peeked(&self.mem);
let desc_index = desc.index;
let mut writer =
Writer::new(self.mem.clone(), desc).map_err(Error::InvalidDescriptorChain)?;
event
.write(&mut writer)
.map_err(|error| Error::WriteEventFailure { event, error })?;
event_queue.add_used(&self.mem, desc_index, writer.bytes_written() as u32);
self.interrupt.signal_used_queue(event_queue.vector);
Ok(())
}
/// Handles a `DescriptorChain` value sent via the command queue and returns a `VecDeque`
/// of `WritableResp` to be sent to the guest.
fn handle_command_desc<T: Device>(
&self,
device: &mut T,
poll_ctx: &PollContext<Token>,
desc_map: &mut AsyncCmdDescMap,
desc: DescriptorChain,
) -> Result<VecDeque<WritableResp>> {
let mut responses: VecDeque<WritableResp> = Default::default();
let mut reader =
Reader::new(self.mem.clone(), desc.clone()).map_err(Error::InvalidDescriptorChain)?;
let cmd = VideoCmd::from_reader(&mut reader).map_err(Error::ReadFailure)?;
// If a destruction command comes, cancel pending requests.
// TODO(b/161774071): Allow `process_cmd` to return multiple responses and move this
// into encoder/decoder.
let async_responses = match cmd {
VideoCmd::ResourceDestroyAll {
stream_id,
queue_type,
} => desc_map.create_cancellation_responses(&stream_id, Some(queue_type), None),
VideoCmd::StreamDestroy { stream_id } => {
desc_map.create_cancellation_responses(&stream_id, None, None)
}
VideoCmd::QueueClear {
stream_id,
queue_type: QueueType::Output,
} => {
// TODO(b/153406792): Due to a workaround for a limitation in the VDA api,
// clearing the output queue doesn't go through the same Async path as clearing
// the input queue. However, we still need to cancel the pending resources.
desc_map.create_cancellation_responses(&stream_id, Some(QueueType::Output), None)
}
_ => Default::default(),
};
for async_response in async_responses {
let AsyncCmdResponse {
tag,
response: cmd_result,
} = async_response;
let destroy_desc = desc_map
.remove(&tag)
.ok_or_else(|| Error::UnexpectedResponse(tag))?;
let destroy_response = match cmd_result {
Ok(r) => r,
Err(e) => {
error!("returning async error response: {}", &e);
e.into()
}
};
responses.push_back((destroy_desc, destroy_response));
}
// Process the command by the device.
let process_cmd_response = device.process_cmd(cmd, &poll_ctx, &self.resource_bridge);
match process_cmd_response {
Ok(VideoCmdResponseType::Sync(r)) => {
responses.push_back((desc, r));
}
Ok(VideoCmdResponseType::Async(tag)) => {
// If the command expects an asynchronous response,
// store `desc` to use it after the back-end device notifies the
// completion.
desc_map.insert(tag, desc);
}
Err(e) => {
error!("returning error response: {}", &e);
responses.push_back((desc, e.into()));
}
}
Ok(responses)
}
/// Handles each command in the command queue.
fn handle_command_queue<T: Device>(
&self,
cmd_queue: &mut Queue,
device: &mut T,
poll_ctx: &PollContext<Token>,
desc_map: &mut AsyncCmdDescMap,
) -> Result<()> {
let _ = self.cmd_evt.read();
while let Some(desc) = cmd_queue.pop(&self.mem) {
let mut resps = self.handle_command_desc(device, poll_ctx, desc_map, desc)?;
self.write_responses(cmd_queue, &mut resps)?;
}
Ok(())
}
/// Handles an event notified via an event.
fn handle_event<T: Device>(
&self,
cmd_queue: &mut Queue,
event_queue: &mut Queue,
device: &mut T,
desc_map: &mut AsyncCmdDescMap,
stream_id: u32,
) -> Result<()> {
let mut responses: VecDeque<WritableResp> = Default::default();
if let Some(event_responses) = device.process_event(desc_map, stream_id) {
for event_response in event_responses {
match event_response {
VideoEvtResponseType::AsyncCmd(async_response) => {
let AsyncCmdResponse {
tag,
response: cmd_result,
} = async_response;
let desc = desc_map
.remove(&tag)
.ok_or_else(|| Error::UnexpectedResponse(tag))?;
let cmd_response = match cmd_result {
Ok(r) => r,
Err(e) => {
error!("returning async error response: {}", &e);
e.into()
}
};
responses.push_back((desc, cmd_response));
}
VideoEvtResponseType::Event(evt) => {
self.write_event(event_queue, evt)?;
}
}
}
}
if let Err(e) = self.write_responses(cmd_queue, &mut responses) {
error!("Failed to write event responses: {:?}", e);
// Ignore result of write_event for a fatal error.
let _ = self.write_event(
event_queue,
VideoEvt {
typ: EvtType::Error,
stream_id,
},
);
return Err(e);
}
Ok(())
}
pub fn run<T: Device>(
&mut self,
mut cmd_queue: Queue,
mut event_queue: Queue,
mut device: T,
) -> Result<()> {
let poll_ctx: PollContext<Token> = PollContext::build_with(&[
(&self.cmd_evt, Token::CmdQueue),
(&self.event_evt, Token::EventQueue),
(&self.kill_evt, Token::Kill),
(self.interrupt.get_resample_evt(), Token::InterruptResample),
])
.map_err(Error::PollContextCreationFailed)?;
// Stores descriptors in which responses for asynchronous commands will be written.
let mut desc_map: AsyncCmdDescMap = Default::default();
loop {
let poll_events = poll_ctx.wait().map_err(Error::PollError)?;
for poll_event in poll_events.iter_readable() {
match poll_event.token() {
Token::CmdQueue => {
self.handle_command_queue(
&mut cmd_queue,
&mut device,
&poll_ctx,
&mut desc_map,
)?;
}
Token::EventQueue => {
let _ = self.event_evt.read();
}
Token::Event { id } => {
self.handle_event(
&mut cmd_queue,
&mut event_queue,
&mut device,
&mut desc_map,
id,
)?;
}
Token::InterruptResample => {
self.interrupt.interrupt_resample();
}
Token::Kill => return Ok(()),
}
}
}
}
}