blob: 77d5333f59afe0038d7baf3bc35420955836348a [file] [log] [blame]
// Copyright 2021 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::fmt;
use std::io;
use std::io::Read;
use std::io::Write;
use std::rc::Rc;
use async_trait::async_trait;
use audio_streams::capture::AsyncCaptureBuffer;
use audio_streams::AsyncPlaybackBuffer;
use base::debug;
use base::error;
use cros_async::sync::Condvar;
use cros_async::sync::Mutex as AsyncMutex;
use cros_async::EventAsync;
use cros_async::Executor;
use futures::channel::mpsc;
use futures::channel::oneshot;
use futures::pin_mut;
use futures::select;
use futures::FutureExt;
use futures::SinkExt;
use futures::StreamExt;
use thiserror::Error as ThisError;
use vm_memory::GuestMemory;
#[cfg(windows)]
use win_audio::AudioSharedFormat;
use zerocopy::AsBytes;
use super::Error;
use super::SndData;
use super::WorkerStatus;
use crate::virtio::snd::common::*;
use crate::virtio::snd::common_backend::stream_info::SetParams;
use crate::virtio::snd::common_backend::stream_info::StreamInfo;
use crate::virtio::snd::common_backend::DirectionalStream;
use crate::virtio::snd::common_backend::PcmResponse;
use crate::virtio::snd::constants::*;
use crate::virtio::snd::layout::*;
use crate::virtio::DescriptorChain;
use crate::virtio::Queue;
use crate::virtio::Reader;
use crate::virtio::SignalableInterrupt;
use crate::virtio::Writer;
// TODO(b/246601226): Remove once there is a generic audio_stream solution that can accept
// arbitrarily sized buffers.
/// Trait to wrap system specific helpers for writing to endpoint playback buffers.
///
/// `write_data` goes through an implementation of this trait both to copy guest audio into a
/// playback buffer and to learn how many bytes a full endpoint period contains.
#[async_trait(?Send)]
pub trait PlaybackBufferWriter {
/// Creates a writer from the guest's period size in bytes.
///
/// The `frame_size`, `frame_rate`, `guest_num_channels` and `audio_shared_format`
/// parameters only exist on Windows builds.
fn new(
guest_period_bytes: usize,
#[cfg(windows)] frame_size: usize,
#[cfg(windows)] frame_rate: usize,
#[cfg(windows)] guest_num_channels: usize,
#[cfg(windows)] audio_shared_format: AudioSharedFormat,
) -> Self
where
Self: Sized;
/// Returns the period of the endpoint device.
fn endpoint_period_bytes(&self) -> usize;
/// Read audio samples from the tx virtqueue.
///
/// The default implementation copies from `reader` straight into `dst_buf` and returns the
/// value reported by `copy_from`; an I/O failure is returned as `Error::Io`.
fn copy_to_buffer(
&mut self,
dst_buf: &mut AsyncPlaybackBuffer<'_>,
reader: &mut Reader,
) -> Result<usize, Error> {
dst_buf.copy_from(reader).map_err(Error::Io)
}
/// Check to see if an additional read from the tx virtqueue is needed during a playback
/// loop. If so, read from the virtqueue.
///
/// Prefill will happen, for example, if the endpoint buffer requires a 513 frame period, but
/// each tx virtqueue read only produces 480 frames.
#[cfg(windows)]
async fn check_and_prefill(
&mut self,
desc_receiver: &mut mpsc::UnboundedReceiver<DescriptorChain>,
sender: &mut mpsc::UnboundedSender<PcmResponse>,
) -> Result<(), Error>;
}
/// A PCM control request in the form consumed by `process_pcm_ctrl`.
///
/// Each variant maps to one `VIRTIO_SND_R_PCM_*` request code (see the `Display` impl).
#[derive(Debug)]
enum VirtioSndPcmCmd {
/// `VIRTIO_SND_R_PCM_SET_PARAMS`, together with the parameters to apply to the stream.
SetParams { set_params: SetParams },
/// `VIRTIO_SND_R_PCM_PREPARE`.
Prepare,
/// `VIRTIO_SND_R_PCM_START`.
Start,
/// `VIRTIO_SND_R_PCM_STOP`.
Stop,
/// `VIRTIO_SND_R_PCM_RELEASE`.
Release,
}
impl fmt::Display for VirtioSndPcmCmd {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let cmd_code = match self {
VirtioSndPcmCmd::SetParams { set_params: _ } => VIRTIO_SND_R_PCM_SET_PARAMS,
VirtioSndPcmCmd::Prepare => VIRTIO_SND_R_PCM_PREPARE,
VirtioSndPcmCmd::Start => VIRTIO_SND_R_PCM_START,
VirtioSndPcmCmd::Stop => VIRTIO_SND_R_PCM_STOP,
VirtioSndPcmCmd::Release => VIRTIO_SND_R_PCM_RELEASE,
};
f.write_str(get_virtio_snd_r_pcm_cmd_name(cmd_code))
}
}
/// Reasons a raw request code cannot be turned into a `VirtioSndPcmCmd` by `TryFrom<u32>`.
#[derive(ThisError, Debug)]
enum VirtioSndPcmCmdError {
/// The code was `VIRTIO_SND_R_PCM_SET_PARAMS`, which cannot be built from the code alone;
/// use `VirtioSndPcmCmd::with_set_params_and_direction` instead.
#[error("SetParams requires additional parameters")]
SetParams,
/// The code is not one of the PCM request codes handled by the conversion.
#[error("Invalid virtio snd command code")]
InvalidCode,
}
impl TryFrom<u32> for VirtioSndPcmCmd {
    type Error = VirtioSndPcmCmdError;

    /// Converts a parameterless PCM request code into its command.
    ///
    /// `VIRTIO_SND_R_PCM_SET_PARAMS` is rejected with `VirtioSndPcmCmdError::SetParams`
    /// because it needs a payload; every other unknown code yields `InvalidCode`.
    fn try_from(code: u32) -> Result<Self, Self::Error> {
        let cmd = match code {
            VIRTIO_SND_R_PCM_PREPARE => VirtioSndPcmCmd::Prepare,
            VIRTIO_SND_R_PCM_START => VirtioSndPcmCmd::Start,
            VIRTIO_SND_R_PCM_STOP => VirtioSndPcmCmd::Stop,
            VIRTIO_SND_R_PCM_RELEASE => VirtioSndPcmCmd::Release,
            VIRTIO_SND_R_PCM_SET_PARAMS => return Err(VirtioSndPcmCmdError::SetParams),
            _ => return Err(VirtioSndPcmCmdError::InvalidCode),
        };
        Ok(cmd)
    }
}
impl VirtioSndPcmCmd {
    /// Builds a `SetParams` command from the guest's raw request and the stream direction.
    ///
    /// # Panics
    ///
    /// Panics if `set_params.format` or `set_params.rate` cannot be converted by
    /// `from_virtio_sample_format` / `from_virtio_frame_rate`; callers are expected to have
    /// validated both beforehand.
    fn with_set_params_and_direction(
        set_params: virtio_snd_pcm_set_params,
        dir: u8,
    ) -> VirtioSndPcmCmd {
        let buffer_bytes = u32::from(set_params.buffer_bytes) as usize;
        let period_bytes = u32::from(set_params.period_bytes) as usize;
        let format = from_virtio_sample_format(set_params.format).unwrap();
        let frame_rate = from_virtio_frame_rate(set_params.rate).unwrap();
        VirtioSndPcmCmd::SetParams {
            set_params: SetParams {
                channels: set_params.channels,
                format,
                frame_rate,
                buffer_bytes,
                period_bytes,
                dir,
            },
        }
    }
}
// Returns true if the operation is successful. Returns error if there is
// a runtime/internal error
async fn process_pcm_ctrl(
ex: &Executor,
tx_send: &mpsc::UnboundedSender<PcmResponse>,
rx_send: &mpsc::UnboundedSender<PcmResponse>,
streams: &Rc<AsyncMutex<Vec<AsyncMutex<StreamInfo>>>>,
cmd: VirtioSndPcmCmd,
writer: &mut Writer,
stream_id: usize,
) -> Result<(), Error> {
let streams = streams.read_lock().await;
let mut stream = match streams.get(stream_id) {
Some(stream_info) => stream_info.lock().await,
None => {
error!(
"Stream id={} not found for {}. Error code: VIRTIO_SND_S_BAD_MSG",
stream_id, cmd
);
return writer
.write_obj(VIRTIO_SND_S_BAD_MSG)
.map_err(Error::WriteResponse);
}
};
debug!("{} for stream id={}", cmd, stream_id);
let result = match cmd {
VirtioSndPcmCmd::SetParams { set_params } => {
let result = stream.set_params(set_params).await;
if result.is_ok() {
debug!(
"VIRTIO_SND_R_PCM_SET_PARAMS for stream id={}. Stream info: {:#?}",
stream_id, *stream
);
}
result
}
VirtioSndPcmCmd::Prepare => stream.prepare(ex, tx_send, rx_send).await,
VirtioSndPcmCmd::Start => stream.start().await,
VirtioSndPcmCmd::Stop => stream.stop().await,
VirtioSndPcmCmd::Release => stream.release().await,
};
match result {
Ok(_) => writer
.write_obj(VIRTIO_SND_S_OK)
.map_err(Error::WriteResponse),
Err(Error::OperationNotSupported) => {
error!(
"{} for stream id={} failed. Error code: VIRTIO_SND_S_NOT_SUPP.",
cmd, stream_id
);
writer
.write_obj(VIRTIO_SND_S_NOT_SUPP)
.map_err(Error::WriteResponse)
}
Err(e) => {
// Runtime/internal error would be more appropriate, but there's
// no such error type
error!(
"{} for stream id={} failed. Error code: VIRTIO_SND_S_IO_ERR. Actual error: {}",
cmd, stream_id, e
);
writer
.write_obj(VIRTIO_SND_S_IO_ERR)
.map_err(Error::WriteResponse)
}
}
}
async fn write_data(
mut dst_buf: AsyncPlaybackBuffer<'_>,
reader: Option<&mut Reader>,
buffer_writer: &mut Box<dyn PlaybackBufferWriter>,
) -> Result<u32, Error> {
let transferred = match reader {
Some(reader) => buffer_writer.copy_to_buffer(&mut dst_buf, reader)?,
None => dst_buf
.copy_from(&mut io::repeat(0).take(buffer_writer.endpoint_period_bytes() as u64))
.map_err(Error::Io)?,
};
if transferred != buffer_writer.endpoint_period_bytes() {
error!(
"Bytes written {} != period_bytes {}",
transferred,
buffer_writer.endpoint_period_bytes()
);
Err(Error::InvalidBufferSize)
} else {
dst_buf.commit().await;
Ok(dst_buf.latency_bytes())
}
}
async fn read_data<'a>(
mut src_buf: AsyncCaptureBuffer<'a>,
writer: Option<&mut Writer>,
period_bytes: usize,
) -> Result<u32, Error> {
let transferred = match writer {
Some(writer) => src_buf.copy_to(writer),
None => src_buf.copy_to(&mut io::sink()),
}
.map_err(Error::Io)?;
if transferred != period_bytes {
error!(
"Bytes written {} != period_bytes {}",
transferred, period_bytes
);
Err(Error::InvalidBufferSize)
} else {
src_buf.commit().await;
Ok(src_buf.latency_bytes())
}
}
impl From<Result<u32, Error>> for virtio_snd_pcm_status {
    /// Maps the outcome of a PCM transfer to the status sent back to the guest: `OK` with the
    /// reported latency on success, `IoErr` with zero latency (and an error log) on failure.
    fn from(res: Result<u32, Error>) -> Self {
        let (code, latency_bytes) = match res {
            Ok(latency_bytes) => (StatusCode::OK, latency_bytes),
            Err(e) => {
                error!("PCM I/O message failed: {}", e);
                (StatusCode::IoErr, 0)
            }
        };
        virtio_snd_pcm_status::new(code, latency_bytes)
    }
}
// Drain all DescriptorChain in desc_receiver during WorkerStatus::Quit process.
async fn drain_desc_receiver(
desc_receiver: &mut mpsc::UnboundedReceiver<DescriptorChain>,
sender: &mut mpsc::UnboundedSender<PcmResponse>,
) -> Result<(), Error> {
let mut o_desc_chain = desc_receiver.next().await;
while let Some(desc_chain) = o_desc_chain {
// From the virtio-snd spec:
// The device MUST complete all pending I/O messages for the specified stream ID.
let status = virtio_snd_pcm_status::new(StatusCode::OK, 0);
// Fetch next DescriptorChain to see if the current one is the last one.
o_desc_chain = desc_receiver.next().await;
let (done, future) = if o_desc_chain.is_none() {
let (done, future) = oneshot::channel();
(Some(done), Some(future))
} else {
(None, None)
};
sender
.send(PcmResponse {
desc_chain,
status,
done,
})
.await
.map_err(Error::MpscSend)?;
if let Some(f) = future {
// From the virtio-snd spec:
// The device MUST NOT complete the control request (VIRTIO_SND_R_PCM_RELEASE)
// while there are pending I/O messages for the specified stream ID.
f.await.map_err(Error::DoneNotTriggered)?;
};
}
Ok(())
}
/// Start a pcm worker that receives descriptors containing PCM frames (audio data) from the tx/rx
/// queue, and forward them to CRAS. One pcm worker per stream.
///
/// This worker is started when VIRTIO_SND_R_PCM_PREPARE is called, and returned before
/// VIRTIO_SND_R_PCM_RELEASE is completed for the stream.
///
/// Whatever the outcome of the worker loop, the shared status is set to
/// `WorkerStatus::Quit` before returning. If the loop failed, the remaining descriptors are
/// drained here; a failure while draining takes precedence over the loop's own error.
pub async fn start_pcm_worker(
    ex: Executor,
    dstream: DirectionalStream,
    mut desc_receiver: mpsc::UnboundedReceiver<DescriptorChain>,
    status_mutex: Rc<AsyncMutex<WorkerStatus>>,
    mut sender: mpsc::UnboundedSender<PcmResponse>,
) -> Result<(), Error> {
    let res = pcm_worker_loop(ex, dstream, &mut desc_receiver, &status_mutex, &mut sender).await;
    *status_mutex.lock().await = WorkerStatus::Quit;

    if let Err(e) = &res {
        error!(
            "pcm_worker error: {:#?}. Draining desc_receiver",
            Some(e)
        );
        // The loop bailed out before draining desc_receiver, so it has to happen here.
        // Draining does not finish until the stream is released.
        drain_desc_receiver(&mut desc_receiver, &mut sender).await?;
    }
    res
}
/// Body of a pcm worker: repeatedly fetches the next playback (output) or capture (input)
/// buffer from the host stream and services it according to the current `WorkerStatus`.
///
/// * `Quit`: drains `desc_receiver`, services the already fetched buffer without guest data
///   (errors are only logged) and returns `Ok(())`.
/// * `Pause`: services the buffer without guest data (zeros for playback, discarded for
///   capture).
/// * `Running`: pairs the buffer with the next descriptor chain from `desc_receiver`, if one
///   is available, and forwards the resulting status through `sender`.
///
/// Returns an error if fetching a buffer, transferring data outside the `Running` path with a
/// descriptor, or sending a response fails, or if the descriptor channel is found closed while
/// the status is still `Running`.
async fn pcm_worker_loop(
ex: Executor,
dstream: DirectionalStream,
desc_receiver: &mut mpsc::UnboundedReceiver<DescriptorChain>,
status_mutex: &Rc<AsyncMutex<WorkerStatus>>,
sender: &mut mpsc::UnboundedSender<PcmResponse>,
) -> Result<(), Error> {
match dstream {
#[allow(unused_mut)]
DirectionalStream::Output(mut stream, mut buffer_writer) => loop {
// Fetch the host buffer first, then look at the status.
let dst_buf = stream
.next_playback_buffer(&ex)
.await
.map_err(Error::FetchBuffer)?;
// NOTE(review): this guard is bound to a local, so the status lock appears to stay held
// for the rest of the iteration, including the awaits below.
let worker_status = status_mutex.lock().await;
match *worker_status {
WorkerStatus::Quit => {
drain_desc_receiver(desc_receiver, sender).await?;
// Best effort: the fetched buffer is still filled with zeros, but a failure here no
// longer matters to the result.
if let Err(e) = write_data(dst_buf, None, &mut buffer_writer).await {
error!("Error on write_data after worker quit: {}", e)
}
break Ok(());
}
WorkerStatus::Pause => {
// Paused: play zeros and leave queued descriptors untouched.
write_data(dst_buf, None, &mut buffer_writer).await?;
}
WorkerStatus::Running => {
// TODO(b/246601226): Remove once there is a generic audio_stream solution that can
// accept arbitrarily sized buffers
#[cfg(windows)]
buffer_writer
.check_and_prefill(desc_receiver, sender)
.await?;
// Non-blocking receive: the host buffer must be serviced even without guest data.
match desc_receiver.try_next() {
Err(e) => {
error!("Underrun. No new DescriptorChain while running: {}", e);
write_data(dst_buf, None, &mut buffer_writer).await?;
}
Ok(None) => {
error!("Unreachable. status should be Quit when the channel is closed");
write_data(dst_buf, None, &mut buffer_writer).await?;
return Err(Error::InvalidPCMWorkerState);
}
Ok(Some(mut desc_chain)) => {
// stream_id was already read in handle_pcm_queue
// A failed transfer is not fatal here: it is converted into an error status for the
// guest by the `From<Result<u32, Error>>` impl.
let status = write_data(
dst_buf,
Some(&mut desc_chain.reader),
&mut buffer_writer,
)
.await
.into();
sender
.send(PcmResponse {
desc_chain,
status,
done: None,
})
.await
.map_err(Error::MpscSend)?;
}
}
}
}
},
DirectionalStream::Input(mut stream, period_bytes) => loop {
// Mirror of the output arm, with capture buffers and `read_data`.
let src_buf = stream
.next_capture_buffer(&ex)
.await
.map_err(Error::FetchBuffer)?;
let worker_status = status_mutex.lock().await;
match *worker_status {
WorkerStatus::Quit => {
drain_desc_receiver(desc_receiver, sender).await?;
// Best effort: the captured period is discarded; a failure is only logged.
if let Err(e) = read_data(src_buf, None, period_bytes).await {
error!("Error on read_data after worker quit: {}", e)
}
break Ok(());
}
WorkerStatus::Pause => {
// Paused: discard the captured period.
read_data(src_buf, None, period_bytes).await?;
}
WorkerStatus::Running => match desc_receiver.try_next() {
Err(e) => {
error!("Overrun. No new DescriptorChain while running: {}", e);
read_data(src_buf, None, period_bytes).await?;
}
Ok(None) => {
error!("Unreachable. status should be Quit when the channel is closed");
read_data(src_buf, None, period_bytes).await?;
return Err(Error::InvalidPCMWorkerState);
}
Ok(Some(mut desc_chain)) => {
// As for playback, a failed transfer becomes an error status instead of ending the loop.
let status = read_data(src_buf, Some(&mut desc_chain.writer), period_bytes)
.await
.into();
sender
.send(PcmResponse {
desc_chain,
status,
done: None,
})
.await
.map_err(Error::MpscSend)?;
}
},
}
},
}
}
/// Hands a finished descriptor chain and its status to the pcm response worker.
///
/// The response carries no completion notifier (`done` is `None`). Fails with
/// `Error::MpscSend` if the response channel rejects the message.
async fn defer_pcm_response_to_worker(
    desc_chain: DescriptorChain,
    status: virtio_snd_pcm_status,
    response_sender: &mut mpsc::UnboundedSender<PcmResponse>,
) -> Result<(), Error> {
    let response = PcmResponse {
        desc_chain,
        status,
        done: None,
    };
    response_sender
        .send(response)
        .await
        .map_err(Error::MpscSend)
}
/// Writes `status` at the end of the descriptor chain's writable area, returns the chain to
/// the guest through `queue` and triggers the queue interrupt.
///
/// Returns `Error::WriteResponse` if the status cannot be written.
fn send_pcm_response<I: SignalableInterrupt>(
    mut desc_chain: DescriptorChain,
    mem: &GuestMemory,
    queue: &mut Queue,
    interrupt: &I,
    status: virtio_snd_pcm_status,
) -> Result<(), Error> {
    let status_size = std::mem::size_of::<virtio_snd_pcm_status>();
    let writer = &mut desc_chain.writer;

    // For rx queue only. Skip whatever part of the audio data buffer was left unused so
    // that the status lands in the last `status_size` bytes.
    let unused = writer.available_bytes().saturating_sub(status_size);
    if unused > 0 {
        writer.consume_bytes(unused);
    }

    writer.write_obj(status).map_err(Error::WriteResponse)?;
    let len = writer.bytes_written() as u32;
    queue.add_used(mem, desc_chain, len);
    queue.trigger_interrupt(mem, interrupt);
    Ok(())
}
/// Completes once the boolean guarded by the reset signal's mutex becomes `true`.
///
/// With no reset signal (`None`) the returned future never completes.
async fn await_reset_signal(reset_signal_option: Option<&(AsyncMutex<bool>, Condvar)>) {
    let (lock, cvar) = match reset_signal_option {
        Some(signal) => signal,
        None => return futures::future::pending::<()>().await,
    };

    let mut reset = lock.lock().await;
    while !*reset {
        reset = cvar.wait(reset).await;
    }
}
/// Forwards every `PcmResponse` received on `recv` to the guest via `send_pcm_response`.
///
/// The worker ends with `Ok(())` when the reset signal fires or when the response channel
/// is closed. It ends with an error if a response cannot be written or if a response's
/// `done` notifier cannot be delivered.
pub async fn send_pcm_response_worker<I: SignalableInterrupt>(
    mem: &GuestMemory,
    queue: Rc<AsyncMutex<Queue>>,
    interrupt: I,
    recv: &mut mpsc::UnboundedReceiver<PcmResponse>,
    reset_signal: Option<&(AsyncMutex<bool>, Condvar)>,
) -> Result<(), Error> {
    let on_reset = await_reset_signal(reset_signal).fuse();
    pin_mut!(on_reset);

    loop {
        let next_response = recv.next().fuse();
        pin_mut!(next_response);
        let received = select! {
            _ = on_reset => break,
            received = next_response => received,
        };

        let response = match received {
            Some(response) => response,
            None => {
                debug!("PcmResponse channel is closed.");
                break;
            }
        };

        send_pcm_response(
            response.desc_chain,
            mem,
            &mut *queue.lock().await,
            &interrupt,
            response.status,
        )?;

        // Resume pcm_worker
        if let Some(done) = response.done {
            done.send(()).map_err(Error::OneshotSend)?;
        }
    }
    Ok(())
}
/// Handle messages from the tx or the rx queue. One invocation is needed for
/// each queue.
pub async fn handle_pcm_queue(
mem: &GuestMemory,
streams: &Rc<AsyncMutex<Vec<AsyncMutex<StreamInfo>>>>,
mut response_sender: mpsc::UnboundedSender<PcmResponse>,
queue: Rc<AsyncMutex<Queue>>,
queue_event: &EventAsync,
reset_signal: Option<&(AsyncMutex<bool>, Condvar)>,
) -> Result<(), Error> {
let on_reset = await_reset_signal(reset_signal).fuse();
pin_mut!(on_reset);
loop {
// Manual queue.next_async() to avoid holding the mutex
let next_async = async {
loop {
// Check if there are more descriptors available.
if let Some(chain) = queue.lock().await.pop(mem) {
return Ok(chain);
}
queue_event.next_val().await?;
}
}
.fuse();
pin_mut!(next_async);
let mut desc_chain = select! {
_ = on_reset => break,
res = next_async => res.map_err(Error::Async)?,
};
let pcm_xfer: virtio_snd_pcm_xfer =
desc_chain.reader.read_obj().map_err(Error::ReadMessage)?;
let stream_id: usize = u32::from(pcm_xfer.stream_id) as usize;
let streams = streams.read_lock().await;
let stream_info = match streams.get(stream_id) {
Some(stream_info) => stream_info.read_lock().await,
None => {
error!(
"stream_id ({}) >= num_streams ({})",
stream_id,
streams.len()
);
defer_pcm_response_to_worker(
desc_chain,
virtio_snd_pcm_status::new(StatusCode::IoErr, 0),
&mut response_sender,
)
.await?;
continue;
}
};
match stream_info.sender.as_ref() {
Some(mut s) => {
s.send(desc_chain).await.map_err(Error::MpscSend)?;
if *stream_info.status_mutex.lock().await == WorkerStatus::Quit {
// If sender channel is still intact but worker status is quit,
// the worker quitted unexpectedly. Return error to request a reset.
return Err(Error::PCMWorkerQuittedUnexpectedly);
}
}
None => {
if !stream_info.just_reset {
error!(
"stream {} is not ready. state: {}",
stream_id,
get_virtio_snd_r_pcm_cmd_name(stream_info.state)
);
}
defer_pcm_response_to_worker(
desc_chain,
virtio_snd_pcm_status::new(StatusCode::IoErr, 0),
&mut response_sender,
)
.await?;
}
};
}
Ok(())
}
/// Handle all the control messages from the ctrl queue.
pub async fn handle_ctrl_queue<I: SignalableInterrupt>(
ex: &Executor,
mem: &GuestMemory,
streams: &Rc<AsyncMutex<Vec<AsyncMutex<StreamInfo>>>>,
snd_data: &SndData,
queue: Rc<AsyncMutex<Queue>>,
queue_event: &mut EventAsync,
interrupt: I,
tx_send: mpsc::UnboundedSender<PcmResponse>,
rx_send: mpsc::UnboundedSender<PcmResponse>,
reset_signal: Option<&(AsyncMutex<bool>, Condvar)>,
) -> Result<(), Error> {
let on_reset = await_reset_signal(reset_signal).fuse();
pin_mut!(on_reset);
let mut queue = queue.lock().await;
loop {
let mut desc_chain = {
let next_async = queue.next_async(mem, queue_event).fuse();
pin_mut!(next_async);
select! {
_ = on_reset => break,
res = next_async => res.map_err(Error::Async)?,
}
};
let reader = &mut desc_chain.reader;
let writer = &mut desc_chain.writer;
// Don't advance the reader
let code = reader
.clone()
.read_obj::<virtio_snd_hdr>()
.map_err(Error::ReadMessage)?
.code
.into();
let handle_ctrl_msg = async {
return match code {
VIRTIO_SND_R_JACK_INFO => {
let query_info: virtio_snd_query_info =
reader.read_obj().map_err(Error::ReadMessage)?;
let start_id: usize = u32::from(query_info.start_id) as usize;
let count: usize = u32::from(query_info.count) as usize;
if start_id + count > snd_data.jack_info.len() {
error!(
"start_id({}) + count({}) must be smaller than \
the number of jacks ({})",
start_id,
count,
snd_data.jack_info.len()
);
return writer
.write_obj(VIRTIO_SND_S_BAD_MSG)
.map_err(Error::WriteResponse);
}
// The response consists of the virtio_snd_hdr structure (contains the request
// status code), followed by the device-writable information structures of the
// item. Each information structure begins with the following common header
writer
.write_obj(VIRTIO_SND_S_OK)
.map_err(Error::WriteResponse)?;
for i in start_id..(start_id + count) {
writer
.write_all(snd_data.jack_info[i].as_bytes())
.map_err(Error::WriteResponse)?;
}
Ok(())
}
VIRTIO_SND_R_PCM_INFO => {
let query_info: virtio_snd_query_info =
reader.read_obj().map_err(Error::ReadMessage)?;
let start_id: usize = u32::from(query_info.start_id) as usize;
let count: usize = u32::from(query_info.count) as usize;
if start_id + count > snd_data.pcm_info.len() {
error!(
"start_id({}) + count({}) must be smaller than \
the number of streams ({})",
start_id,
count,
snd_data.pcm_info.len()
);
return writer
.write_obj(VIRTIO_SND_S_BAD_MSG)
.map_err(Error::WriteResponse);
}
// The response consists of the virtio_snd_hdr structure (contains the request
// status code), followed by the device-writable information structures of the
// item. Each information structure begins with the following common header
writer
.write_obj(VIRTIO_SND_S_OK)
.map_err(Error::WriteResponse)?;
for i in start_id..(start_id + count) {
writer
.write_all(snd_data.pcm_info[i].as_bytes())
.map_err(Error::WriteResponse)?;
}
Ok(())
}
VIRTIO_SND_R_CHMAP_INFO => {
let query_info: virtio_snd_query_info =
reader.read_obj().map_err(Error::ReadMessage)?;
let start_id: usize = u32::from(query_info.start_id) as usize;
let count: usize = u32::from(query_info.count) as usize;
if start_id + count > snd_data.chmap_info.len() {
error!(
"start_id({}) + count({}) must be smaller than \
the number of chmaps ({})",
start_id,
count,
snd_data.chmap_info.len()
);
return writer
.write_obj(VIRTIO_SND_S_BAD_MSG)
.map_err(Error::WriteResponse);
}
// The response consists of the virtio_snd_hdr structure (contains the request
// status code), followed by the device-writable information structures of the
// item. Each information structure begins with the following common header
writer
.write_obj(VIRTIO_SND_S_OK)
.map_err(Error::WriteResponse)?;
for i in start_id..(start_id + count) {
writer
.write_all(snd_data.chmap_info[i].as_bytes())
.map_err(Error::WriteResponse)?;
}
Ok(())
}
VIRTIO_SND_R_JACK_REMAP => {
unreachable!("remap is unsupported");
}
VIRTIO_SND_R_PCM_SET_PARAMS => {
// Raise VIRTIO_SND_S_BAD_MSG or IO error?
let set_params: virtio_snd_pcm_set_params =
reader.read_obj().map_err(Error::ReadMessage)?;
let stream_id: usize = u32::from(set_params.hdr.stream_id) as usize;
let buffer_bytes: u32 = set_params.buffer_bytes.into();
let period_bytes: u32 = set_params.period_bytes.into();
let dir = match snd_data.pcm_info.get(stream_id) {
Some(pcm_info) => {
if set_params.channels < pcm_info.channels_min
|| set_params.channels > pcm_info.channels_max
{
error!(
"Number of channels ({}) must be between {} and {}",
set_params.channels,
pcm_info.channels_min,
pcm_info.channels_max
);
return writer
.write_obj(VIRTIO_SND_S_NOT_SUPP)
.map_err(Error::WriteResponse);
}
if (u64::from(pcm_info.formats) & (1 << set_params.format)) == 0 {
error!("PCM format {} is not supported.", set_params.format);
return writer
.write_obj(VIRTIO_SND_S_NOT_SUPP)
.map_err(Error::WriteResponse);
}
if (u64::from(pcm_info.rates) & (1 << set_params.rate)) == 0 {
error!("PCM frame rate {} is not supported.", set_params.rate);
return writer
.write_obj(VIRTIO_SND_S_NOT_SUPP)
.map_err(Error::WriteResponse);
}
pcm_info.direction
}
None => {
error!(
"stream_id {} < streams {}",
stream_id,
snd_data.pcm_info.len()
);
return writer
.write_obj(VIRTIO_SND_S_BAD_MSG)
.map_err(Error::WriteResponse);
}
};
if set_params.features != 0 {
error!("No feature is supported");
return writer
.write_obj(VIRTIO_SND_S_NOT_SUPP)
.map_err(Error::WriteResponse);
}
if buffer_bytes % period_bytes != 0 {
error!(
"buffer_bytes({}) must be dividable by period_bytes({})",
buffer_bytes, period_bytes
);
return writer
.write_obj(VIRTIO_SND_S_BAD_MSG)
.map_err(Error::WriteResponse);
}
process_pcm_ctrl(
ex,
&tx_send,
&rx_send,
streams,
VirtioSndPcmCmd::with_set_params_and_direction(set_params, dir),
writer,
stream_id,
)
.await
}
VIRTIO_SND_R_PCM_PREPARE
| VIRTIO_SND_R_PCM_START
| VIRTIO_SND_R_PCM_STOP
| VIRTIO_SND_R_PCM_RELEASE => {
let hdr: virtio_snd_pcm_hdr = reader.read_obj().map_err(Error::ReadMessage)?;
let stream_id: usize = u32::from(hdr.stream_id) as usize;
let cmd = match VirtioSndPcmCmd::try_from(code) {
Ok(cmd) => cmd,
Err(err) => {
error!("Error converting code to command: {}", err);
return writer
.write_obj(VIRTIO_SND_S_BAD_MSG)
.map_err(Error::WriteResponse);
}
};
process_pcm_ctrl(ex, &tx_send, &rx_send, streams, cmd, writer, stream_id)
.await
.and(Ok(()))?;
Ok(())
}
c => {
error!("Unrecognized code: {}", c);
return writer
.write_obj(VIRTIO_SND_S_BAD_MSG)
.map_err(Error::WriteResponse);
}
};
};
handle_ctrl_msg.await?;
let len = writer.bytes_written() as u32;
queue.add_used(mem, desc_chain, len);
queue.trigger_interrupt(mem, &interrupt);
}
Ok(())
}
/// Send events to the audio driver.
pub async fn handle_event_queue<I: SignalableInterrupt>(
mem: &GuestMemory,
mut queue: Queue,
mut queue_event: EventAsync,
interrupt: I,
) -> Result<(), Error> {
loop {
let desc_chain = queue
.next_async(mem, &mut queue_event)
.await
.map_err(Error::Async)?;
// TODO(woodychow): Poll and forward events from cras asynchronously (API to be added)
queue.add_used(mem, desc_chain, 0);
queue.trigger_interrupt(mem, &interrupt);
}
}