Prepare the VioSClient implementation to support a virtio-snd device
- Adds the ability to start/stop background thread on demand
- Changes the way audio data is injected to not assume how the data is
kept by users of the api
- Adds new functions for jacks and chmaps
- Rename constants to match the name used in the virtio-snd spec
BUG=b:174713663
Change-Id: Ie0fe20747a26122258cb63bac09ec0347f13ecc0
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/2983388
Tested-by: kokoro <noreply+kokoro@google.com>
Commit-Queue: Jorge Moreira Broche <jemoreira@google.com>
Reviewed-by: Chih-Yang Hsia <paulhsia@chromium.org>
diff --git a/devices/src/virtio/snd/constants.rs b/devices/src/virtio/snd/constants.rs
index 28a73a0..114d098 100644
--- a/devices/src/virtio/snd/constants.rs
+++ b/devices/src/virtio/snd/constants.rs
@@ -1,21 +1,41 @@
// Copyright 2020 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
-pub const JACK_INFO: u32 = 1;
-pub const JACK_REMAP: u32 = 2;
+pub const VIRTIO_SND_R_JACK_INFO: u32 = 1;
+pub const VIRTIO_SND_R_JACK_REMAP: u32 = 2;
-pub const STREAM_INFO: u32 = 0x0100;
-pub const STREAM_SET_PARAMS: u32 = 0x0100 + 1;
-pub const STREAM_PREPARE: u32 = 0x0100 + 2;
-pub const STREAM_RELEASE: u32 = 0x0100 + 3;
-pub const STREAM_START: u32 = 0x0100 + 4;
-pub const STREAM_STOP: u32 = 0x0100 + 5;
+/* PCM control request types */
+pub const VIRTIO_SND_R_PCM_INFO: u32 = 0x0100;
+pub const VIRTIO_SND_R_PCM_SET_PARAMS: u32 = 0x0101;
+pub const VIRTIO_SND_R_PCM_PREPARE: u32 = 0x0102;
+pub const VIRTIO_SND_R_PCM_RELEASE: u32 = 0x0103;
+pub const VIRTIO_SND_R_PCM_START: u32 = 0x0104;
+pub const VIRTIO_SND_R_PCM_STOP: u32 = 0x0105;
-pub const CHANNEL_MAP_INFO: u32 = 0x0200;
+/* channel map control request types */
+pub const VIRTIO_SND_R_CHMAP_INFO: u32 = 0x0200;
+/* jack event types */
+pub const VIRTIO_SND_EVT_JACK_CONNECTED: u32 = 0x1000;
+pub const VIRTIO_SND_EVT_JACK_DISCONNECTED: u32 = 0x1001;
+
+/* PCM event types */
+pub const VIRTIO_SND_EVT_PCM_PERIOD_ELAPSED: u32 = 0x1100;
+pub const VIRTIO_SND_EVT_PCM_XRUN: u32 = 0x1101;
+
+/* common status codes */
+pub const VIRTIO_SND_S_OK: u32 = 0x8000;
+pub const VIRTIO_SND_S_BAD_MSG: u32 = 0x8001;
+pub const VIRTIO_SND_S_NOT_SUPP: u32 = 0x8002;
+pub const VIRTIO_SND_S_IO_ERR: u32 = 0x8003;
+
+/* stream direction */
pub const VIRTIO_SND_D_OUTPUT: u8 = 0;
pub const VIRTIO_SND_D_INPUT: u8 = 1;
+/* supported jack features */
+pub const VIRTIO_SND_JACK_F_REMAP: u32 = 0;
+
/* supported PCM stream features */
pub const VIRTIO_SND_PCM_F_SHMEM_HOST: u8 = 0;
pub const VIRTIO_SND_PCM_F_SHMEM_GUEST: u8 = 1;
@@ -45,11 +65,13 @@
pub const VIRTIO_SND_PCM_FMT_U32: u8 = 18;
pub const VIRTIO_SND_PCM_FMT_FLOAT: u8 = 19;
pub const VIRTIO_SND_PCM_FMT_FLOAT64: u8 = 20;
+/* digital formats (width / physical width) */
pub const VIRTIO_SND_PCM_FMT_DSD_U8: u8 = 21;
pub const VIRTIO_SND_PCM_FMT_DSD_U16: u8 = 22;
pub const VIRTIO_SND_PCM_FMT_DSD_U32: u8 = 23;
pub const VIRTIO_SND_PCM_FMT_IEC958_SUBFRAME: u8 = 24;
+/* supported PCM frame rates */
pub const VIRTIO_SND_PCM_RATE_5512: u8 = 0;
pub const VIRTIO_SND_PCM_RATE_8000: u8 = 1;
pub const VIRTIO_SND_PCM_RATE_11025: u8 = 2;
@@ -65,38 +87,6 @@
pub const VIRTIO_SND_PCM_RATE_192000: u8 = 12;
pub const VIRTIO_SND_PCM_RATE_384000: u8 = 13;
-// From https://github.com/oasis-tcs/virtio-spec/blob/master/virtio-sound.tex
-/* jack control request types */
-pub const VIRTIO_SND_R_JACK_INFO: u32 = 1;
-pub const VIRTIO_SND_R_JACK_REMAP: u32 = 2;
-
-/* PCM control request types */
-pub const VIRTIO_SND_R_PCM_INFO: u32 = 0x0100;
-pub const VIRTIO_SND_R_PCM_SET_PARAMS: u32 = 0x0101;
-pub const VIRTIO_SND_R_PCM_PREPARE: u32 = 0x0102;
-pub const VIRTIO_SND_R_PCM_RELEASE: u32 = 0x0103;
-pub const VIRTIO_SND_R_PCM_START: u32 = 0x0104;
-pub const VIRTIO_SND_R_PCM_STOP: u32 = 0x0105;
-
-/* channel map control request types */
-pub const VIRTIO_SND_R_CHMAP_INFO: u32 = 0x0200;
-
-/* jack event types */
-pub const VIRTIO_SND_EVT_JACK_CONNECTED: u32 = 0x1000;
-pub const VIRTIO_SND_EVT_JACK_DISCONNECTED: u32 = 0x1001;
-
-/* PCM event types */
-pub const VIRTIO_SND_EVT_PCM_PERIOD_ELAPSED: u32 = 0x1100;
-pub const VIRTIO_SND_EVT_PCM_XRUN: u32 = 0x1101;
-
-/* common status codes */
-pub const VIRTIO_SND_S_OK: u32 = 0x8000;
-pub const VIRTIO_SND_S_BAD_MSG: u32 = 0x8001;
-pub const VIRTIO_SND_S_NOT_SUPP: u32 = 0x8002;
-pub const VIRTIO_SND_S_IO_ERR: u32 = 0x8003;
-
-pub const VIRTIO_SND_JACK_F_REMAP: u32 = 0;
-
/* standard channel position definition */
pub const VIRTIO_SND_CHMAP_NONE: u32 = 0; /* undefined */
pub const VIRTIO_SND_CHMAP_NA: u32 = 1; /* silent */
diff --git a/devices/src/virtio/snd/vios_backend/shm_streams.rs b/devices/src/virtio/snd/vios_backend/shm_streams.rs
index 87bfbee..a4f6833 100644
--- a/devices/src/virtio/snd/vios_backend/shm_streams.rs
+++ b/devices/src/virtio/snd/vios_backend/shm_streams.rs
@@ -15,7 +15,8 @@
use audio_streams::shm_streams::{BufferSet, ServerRequest, ShmStream, ShmStreamSource};
use audio_streams::{BoxError, SampleFormat, StreamDirection, StreamEffect};
-use base::{error, SharedMemory, SharedMemoryUnix};
+use base::{error, MemoryMapping, MemoryMappingBuilder, SharedMemory, SharedMemoryUnix};
+use data_model::VolatileMemory;
use std::fs::File;
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
@@ -98,7 +99,7 @@
client_shm: &SysSharedMemory,
buffer_offsets: [u64; 2],
) -> GenericResult<Box<dyn ShmStream>> {
- self.vios_client.ensure_bg_thread_started()?;
+ self.vios_client.start_bg_thread()?;
let virtio_dir = match direction {
StreamDirection::Playback => VIRTIO_SND_D_OUTPUT,
StreamDirection::Capture => VIRTIO_SND_D_INPUT,
@@ -233,20 +234,54 @@
fn callback(&mut self, offset: usize, frames: usize) -> GenericResult<()> {
match self.direction {
StreamDirection::Playback => {
- self.vios_client.inject_audio_data(
+ let requested_size = frames * self.frame_size;
+ let shm_ref = &mut self.client_shm;
+ let (_, res) = self.vios_client.inject_audio_data::<Result<()>, _>(
self.stream_id,
- &mut self.client_shm,
- offset,
- frames * self.frame_size,
+ requested_size,
+ |slice| {
+ if requested_size != slice.size() {
+ error!(
+ "Buffer size is different than the requested size: {} vs {}",
+ requested_size,
+ slice.size()
+ );
+ }
+ let size = std::cmp::min(requested_size, slice.size());
+ let (src_mmap, mmap_offset) = mmap_buffer(shm_ref, offset, size)?;
+ let src_slice = src_mmap
+ .get_slice(mmap_offset, size)
+ .map_err(Error::VolatileMemoryError)?;
+ src_slice.copy_to_volatile_slice(slice);
+ Ok(())
+ },
)?;
+ res?;
}
StreamDirection::Capture => {
- self.vios_client.request_audio_data(
+ let requested_size = frames * self.frame_size;
+ let shm_ref = &mut self.client_shm;
+ let (_, res) = self.vios_client.request_audio_data::<Result<()>, _>(
self.stream_id,
- &mut self.client_shm,
- offset,
- frames * self.frame_size,
+ requested_size,
+ |slice| {
+ if requested_size != slice.size() {
+ error!(
+ "Buffer size is different than the requested size: {} vs {}",
+ requested_size,
+ slice.size()
+ );
+ }
+ let size = std::cmp::min(requested_size, slice.size());
+ let (dst_mmap, mmap_offset) = mmap_buffer(shm_ref, offset, size)?;
+ let dst_slice = dst_mmap
+ .get_slice(mmap_offset, size)
+ .map_err(Error::VolatileMemoryError)?;
+ slice.copy_to_volatile_slice(dst_slice);
+ Ok(())
+ },
)?;
+ res?;
}
}
Ok(())
@@ -269,3 +304,25 @@
}
}
}
+
+/// Memory map a shared memory object to access an audio buffer. The buffer may not be located at an
+/// offset aligned to page size, so the offset within the mapped region is returned along with the
+/// MemoryMapping struct.
+fn mmap_buffer(
+ src: &mut SharedMemory,
+ offset: usize,
+ size: usize,
+) -> Result<(MemoryMapping, usize)> {
+ // If the buffer is not aligned to page size a bigger region needs to be mapped.
+ let aligned_offset = offset & !(base::pagesize() - 1);
+ let offset_from_mapping_start = offset - aligned_offset;
+ let extended_size = size + offset_from_mapping_start;
+
+ let mmap = MemoryMappingBuilder::new(extended_size)
+ .offset(aligned_offset as u64)
+ .from_shared_memory(src)
+ .build()
+ .map_err(Error::GuestMmapError)?;
+
+ Ok((mmap, offset_from_mapping_start))
+}
diff --git a/devices/src/virtio/snd/vios_backend/shm_vios.rs b/devices/src/virtio/snd/vios_backend/shm_vios.rs
index 9aec672..5978c2d 100644
--- a/devices/src/virtio/snd/vios_backend/shm_vios.rs
+++ b/devices/src/virtio/snd/vios_backend/shm_vios.rs
@@ -8,9 +8,9 @@
use base::{
error, net::UnixSeqpacket, AsRawDescriptor, Error as BaseError, Event, FromRawDescriptor,
IntoRawDescriptor, MemoryMapping, MemoryMappingBuilder, MmapError, PollToken, SafeDescriptor,
- ScmSocket, SharedMemory, WaitContext,
+ ScmSocket, WaitContext,
};
-use data_model::{DataInit, VolatileMemory, VolatileMemoryError};
+use data_model::{DataInit, Le32, Le64, VolatileMemory, VolatileMemoryError, VolatileSlice};
use std::collections::HashMap;
use std::fs::File;
@@ -75,6 +75,8 @@
EventCreateError(BaseError),
#[error("Failed to dup Recv event: {0}")]
EventDupError(BaseError),
+ #[error("Failed to signal event: {0}")]
+ EventWriteError(BaseError),
#[error("Failed to create Recv thread's WaitContext: {0}")]
WaitContextCreateError(BaseError),
#[error("Error waiting for events")]
@@ -107,7 +109,8 @@
event_socket: Mutex<UnixSeqpacket>,
tx: Mutex<IoBufferQueue>,
rx: Mutex<IoBufferQueue>,
- rx_subscribers: Arc<Mutex<HashMap<usize, Sender<(u32, usize)>>>>,
+ tx_subscribers: Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>>,
+ rx_subscribers: Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>>,
recv_running: Arc<Mutex<bool>>,
recv_event: Mutex<Event>,
recv_thread: Mutex<Option<JoinHandle<Result<()>>>>,
@@ -180,7 +183,9 @@
));
}
- let rx_subscribers: Arc<Mutex<HashMap<usize, Sender<(u32, usize)>>>> =
+ let tx_subscribers: Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>> =
+ Arc::new(Mutex::new(HashMap::new()));
+ let rx_subscribers: Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>> =
Arc::new(Mutex::new(HashMap::new()));
let recv_running = Arc::new(Mutex::new(true));
let recv_event = Event::new().map_err(Error::EventCreateError)?;
@@ -192,6 +197,7 @@
event_socket: Mutex::new(event_socket),
tx: Mutex::new(IoBufferQueue::new(tx_socket, tx_shm_file)?),
rx: Mutex::new(IoBufferQueue::new(rx_socket, rx_shm_file)?),
+ tx_subscribers,
rx_subscribers,
recv_running,
recv_event: Mutex::new(recv_event),
@@ -201,7 +207,34 @@
Ok(client)
}
- pub fn ensure_bg_thread_started(&self) -> Result<()> {
+ /// Get the number of jacks
+ pub fn num_jacks(&self) -> u32 {
+ self.config.jacks
+ }
+
+ /// Get the number of pcm streams
+ pub fn num_streams(&self) -> u32 {
+ self.config.streams
+ }
+
+ /// Get the number of channel maps
+ pub fn num_chmaps(&self) -> u32 {
+ self.config.chmaps
+ }
+
+ /// Get the configuration information on a pcm stream
+ pub fn stream_info(&self, idx: u32) -> Option<virtio_snd_pcm_info> {
+ self.streams
+ .lock()
+ .get(idx as usize)
+ .map(virtio_snd_pcm_info::from)
+ }
+
+ /// Starts the background thread that receives release messages from the server. If the thread
+ /// was already started this function does nothing.
+ /// This thread must be started prior to attempting any stream IO operation or the calling
+ /// thread would block.
+ pub fn start_bg_thread(&self) -> Result<()> {
if self.recv_thread.lock().is_some() {
return Ok(());
}
@@ -210,6 +243,12 @@
.lock()
.try_clone()
.map_err(Error::EventDupError)?;
+ let tx_socket = self
+ .tx
+ .lock()
+ .socket
+ .try_clone()
+ .map_err(Error::UnixSeqpacketDupError)?;
let rx_socket = self
.rx
.lock()
@@ -221,15 +260,39 @@
// while duplicating the fds. So we have to check again the condition.
if opt.is_none() {
*opt = Some(spawn_recv_thread(
+ self.tx_subscribers.clone(),
self.rx_subscribers.clone(),
event_socket,
self.recv_running.clone(),
+ tx_socket,
rx_socket,
));
}
Ok(())
}
+ /// Stops the background thread.
+ pub fn stop_bg_thread(&self) -> Result<()> {
+ if self.recv_thread.lock().is_none() {
+ return Ok(());
+ }
+ *self.recv_running.lock() = false;
+ self.recv_event
+ .lock()
+ .write(1u64)
+ .map_err(Error::EventWriteError)?;
+ if let Some(handle) = self.recv_thread.lock().take() {
+ return match handle.join() {
+ Ok(r) => r,
+ Err(e) => {
+ error!("Recv thread panicked: {:?}", e);
+ Ok(())
+ }
+ };
+ }
+ Ok(())
+ }
+
/// Gets an unused stream id of the specified direction. `direction` must be one of
/// VIRTIO_SND_D_INPUT OR VIRTIO_SND_D_OUTPUT.
pub fn get_unused_stream_id(&self, direction: u8) -> Option<u32> {
@@ -254,13 +317,26 @@
Ok(())
}
+ /// Configures a stream with the given parameters.
+ pub fn set_stream_parameters_raw(&self, raw_params: virtio_snd_pcm_set_params) -> Result<()> {
+ let stream_id = raw_params.hdr.stream_id.to_native();
+ self.validate_stream_id(
+ stream_id,
+ &[StreamState::Available, StreamState::Acquired],
+ None,
+ )?;
+ self.send_cmd(raw_params)?;
+ self.streams.lock()[stream_id as usize].state = StreamState::Acquired;
+ Ok(())
+ }
+
/// Send the PREPARE_STREAM command to the server.
pub fn prepare_stream(&self, stream_id: u32) -> Result<()> {
self.common_stream_op(
stream_id,
&[StreamState::Available, StreamState::Acquired],
StreamState::Acquired,
- STREAM_PREPARE,
+ VIRTIO_SND_R_PCM_PREPARE,
)
}
@@ -270,7 +346,7 @@
stream_id,
&[StreamState::Acquired],
StreamState::Available,
- STREAM_RELEASE,
+ VIRTIO_SND_R_PCM_RELEASE,
)
}
@@ -280,7 +356,7 @@
stream_id,
&[StreamState::Acquired],
StreamState::Active,
- STREAM_START,
+ VIRTIO_SND_R_PCM_START,
)
}
@@ -290,40 +366,53 @@
stream_id,
&[StreamState::Active],
StreamState::Acquired,
- STREAM_STOP,
+ VIRTIO_SND_R_PCM_STOP,
)
}
- /// Send audio frames to the server. The audio data is taken from a shared memory resource.
- pub fn inject_audio_data(
+ /// Send audio frames to the server. Blocks the calling thread until the server acknowledges
+ /// the data.
+ pub fn inject_audio_data<R, Cb: FnOnce(VolatileSlice) -> R>(
&self,
stream_id: u32,
- buffer: &mut SharedMemory,
- src_offset: usize,
size: usize,
- ) -> Result<()> {
+ callback: Cb,
+ ) -> Result<(u32, R)> {
self.validate_stream_id(stream_id, &[StreamState::Active], Some(VIRTIO_SND_D_OUTPUT))?;
- let mut tx_lock = self.tx.lock();
- let tx = &mut *tx_lock;
- let dst_offset = tx.push_buffer(buffer, src_offset, size)?;
- let msg = IoTransferMsg::new(stream_id, dst_offset, size);
- seq_socket_send(&tx.socket, msg)
+ let (status_promise, ret) = {
+ let mut tx_lock = self.tx.lock();
+ let tx = &mut *tx_lock;
+ let dst_offset = tx.allocate_buffer(size)?;
+ let buffer_slice = tx.buffer_at(dst_offset, size)?;
+ let ret = callback(buffer_slice);
+ // Register to receive the status before sending the buffer to the server
+ let (sender, receiver): (Sender<BufferReleaseMsg>, Receiver<BufferReleaseMsg>) =
+ channel();
+ // It's OK to acquire tx_subscriber's lock after tx_lock
+ self.tx_subscribers.lock().insert(dst_offset, sender);
+ let msg = IoTransferMsg::new(stream_id, dst_offset, size);
+ seq_socket_send(&tx.socket, msg)?;
+ (receiver, ret)
+ };
+ let (_, latency) = await_status(status_promise)?;
+ Ok((latency, ret))
}
- pub fn request_audio_data(
+ /// Request audio frames from the server. It blocks until the data is available.
+ pub fn request_audio_data<R, Cb: FnOnce(&VolatileSlice) -> R>(
&self,
stream_id: u32,
- buffer: &mut SharedMemory,
- dst_offset: usize,
size: usize,
- ) -> Result<usize> {
+ callback: Cb,
+ ) -> Result<(u32, R)> {
self.validate_stream_id(stream_id, &[StreamState::Active], Some(VIRTIO_SND_D_INPUT))?;
let (src_offset, status_promise) = {
let mut rx_lock = self.rx.lock();
let rx = &mut *rx_lock;
let src_offset = rx.allocate_buffer(size)?;
// Register to receive the status before sending the buffer to the server
- let (sender, receiver): (Sender<(u32, usize)>, Receiver<(u32, usize)>) = channel();
+ let (sender, receiver): (Sender<BufferReleaseMsg>, Receiver<BufferReleaseMsg>) =
+ channel();
// It's OK to acquire rx_subscriber's lock after rx_lock
self.rx_subscribers.lock().insert(src_offset, sender);
let msg = IoTransferMsg::new(stream_id, src_offset, size);
@@ -331,12 +420,11 @@
(src_offset, receiver)
};
// Make sure no mutexes are held while awaiting for the buffer to be written to
- let recv_size = await_status(status_promise)?;
+ let (recv_size, latency) = await_status(status_promise)?;
{
let mut rx_lock = self.rx.lock();
- rx_lock
- .pop_buffer(buffer, dst_offset, recv_size, src_offset)
- .map(|()| recv_size)
+ let buffer_slice = rx_lock.buffer_at(src_offset, recv_size)?;
+ Ok((latency, callback(&buffer_slice)))
}
}
@@ -418,7 +506,7 @@
let info_size = std::mem::size_of::<virtio_snd_pcm_info>();
let req = virtio_snd_query_info {
hdr: virtio_snd_hdr {
- code: STREAM_INFO.into(),
+ code: VIRTIO_SND_R_PCM_INFO.into(),
},
start_id: 0u32.into(),
count: (num_streams as u32).into(),
@@ -451,20 +539,8 @@
impl Drop for VioSClient {
fn drop(&mut self) {
- // Stop the recv thread
- *self.recv_running.lock() = false;
- if let Err(e) = self.recv_event.lock().write(1u64) {
- error!("Failed to notify recv thread: {:?}", e);
- }
- if let Some(handle) = self.recv_thread.lock().take() {
- match handle.join() {
- Ok(r) => {
- if let Err(e) = r {
- error!("Error detected on Recv Thread: {}", e);
- }
- }
- Err(e) => error!("Recv thread panicked: {:?}", e),
- };
+ if let Err(e) = self.stop_bg_thread() {
+ error!("Error stopping Recv thread: {}", e);
}
}
}
@@ -472,60 +548,75 @@
#[derive(PollToken)]
enum Token {
Notification,
+ TxBufferMsg,
RxBufferMsg,
}
+fn recv_buffer_status_msg(
+ socket: &UnixSeqpacket,
+ subscribers: &Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>>,
+) -> Result<()> {
+ let mut msg: IoStatusMsg = Default::default();
+ let size = socket
+ .recv(msg.as_mut_slice())
+ .map_err(Error::ServerIOError)?;
+ if size != std::mem::size_of::<IoStatusMsg>() {
+ return Err(Error::ProtocolError(
+ ProtocolErrorKind::UnexpectedMessageSize(std::mem::size_of::<IoStatusMsg>(), size),
+ ));
+ }
+ let mut status = msg.status.status.into();
+ if status == u32::MAX {
+ // Anyone waiting for this would continue to wait for as long as status is
+ // u32::MAX
+ status -= 1;
+ }
+ let latency = msg.status.latency_bytes.into();
+ let offset = msg.buffer_offset as usize;
+ let consumed_len = msg.consumed_len as usize;
+ let promise_opt = subscribers.lock().remove(&offset);
+ match promise_opt {
+ None => error!(
+ "Received an unexpected buffer status message: {}. This is a BUG!!",
+ offset
+ ),
+ Some(sender) => {
+ if let Err(e) = sender.send(BufferReleaseMsg {
+ status,
+ latency,
+ consumed_len,
+ }) {
+ error!("Failed to notify waiting thread: {:?}", e);
+ }
+ }
+ }
+ Ok(())
+}
+
fn spawn_recv_thread(
- rx_subscribers: Arc<Mutex<HashMap<usize, Sender<(u32, usize)>>>>,
+ tx_subscribers: Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>>,
+ rx_subscribers: Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>>,
event: Event,
running: Arc<Mutex<bool>>,
+ tx_socket: UnixSeqpacket,
rx_socket: UnixSeqpacket,
) -> JoinHandle<Result<()>> {
std::thread::spawn(move || {
let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
+ (&tx_socket, Token::TxBufferMsg),
(&rx_socket, Token::RxBufferMsg),
(&event, Token::Notification),
])
.map_err(Error::WaitContextCreateError)?;
- while *running.lock() {
+ loop {
+ if !*running.lock() {
+ break;
+ }
let events = wait_ctx.wait().map_err(Error::WaitError)?;
for evt in events {
match evt.token {
- Token::RxBufferMsg => {
- let mut msg: IoStatusMsg = Default::default();
- let size = rx_socket
- .recv(msg.as_mut_slice())
- .map_err(Error::ServerIOError)?;
- if size != std::mem::size_of::<IoStatusMsg>() {
- return Err(Error::ProtocolError(
- ProtocolErrorKind::UnexpectedMessageSize(
- std::mem::size_of::<IoStatusMsg>(),
- size,
- ),
- ));
- }
- let mut status = msg.status.status.into();
- if status == u32::MAX {
- // Anyone waiting for this would continue to wait for as long as status is
- // u32::MAX
- status -= 1;
- }
- let offset = msg.buffer_offset as usize;
- let consumed_len = msg.consumed_len as usize;
- // Acquire and immediately release the mutex protecting the hashmap
- let promise_opt = rx_subscribers.lock().remove(&offset);
- match promise_opt {
- None => error!(
- "Received an unexpected buffer status message: {}. This is a BUG!!",
- offset
- ),
- Some(sender) => {
- if let Err(e) = sender.send((status, consumed_len)) {
- error!("Failed to notify waiting thread: {:?}", e);
- }
- }
- }
- }
+ Token::TxBufferMsg => recv_buffer_status_msg(&tx_socket, &tx_subscribers)?,
+ Token::RxBufferMsg => recv_buffer_status_msg(&rx_socket, &rx_subscribers)?,
Token::Notification => {
// Just consume the notification and check for termination on the next
// iteration
@@ -540,10 +631,14 @@
})
}
-fn await_status(promise: Receiver<(u32, usize)>) -> Result<usize> {
- let (status, consumed_len) = promise.recv().map_err(Error::BufferStatusSenderLost)?;
+fn await_status(promise: Receiver<BufferReleaseMsg>) -> Result<(usize, u32)> {
+ let BufferReleaseMsg {
+ status,
+ latency,
+ consumed_len,
+ } = promise.recv().map_err(Error::BufferStatusSenderLost)?;
if status == VIRTIO_SND_S_OK {
- Ok(consumed_len)
+ Ok((consumed_len, latency))
} else {
Err(Error::IOBufferError(status))
}
@@ -589,37 +684,10 @@
Ok(offset)
}
- fn push_buffer(&mut self, src: &mut SharedMemory, offset: usize, size: usize) -> Result<usize> {
- let shm_offset = self.allocate_buffer(size)?;
- let (src_mmap, mmap_offset) = mmap_buffer(src, offset, size)?;
- let src_slice = src_mmap
- .get_slice(mmap_offset, size)
- .map_err(Error::VolatileMemoryError)?;
- let dst_slice = self
- .mmap
- .get_slice(shm_offset, size)
- .map_err(Error::VolatileMemoryError)?;
- src_slice.copy_to_volatile_slice(dst_slice);
- Ok(shm_offset)
- }
-
- fn pop_buffer(
- &mut self,
- dst: &mut SharedMemory,
- dst_offset: usize,
- size: usize,
- src_offset: usize,
- ) -> Result<()> {
- let (dst_mmap, mmap_offset) = mmap_buffer(dst, dst_offset, size)?;
- let dst_slice = dst_mmap
- .get_slice(mmap_offset, size)
- .map_err(Error::VolatileMemoryError)?;
- let src_slice = self
- .mmap
- .get_slice(src_offset, size)
- .map_err(Error::VolatileMemoryError)?;
- src_slice.copy_to_volatile_slice(dst_slice);
- Ok(())
+ fn buffer_at(&mut self, offset: usize, len: usize) -> Result<VolatileSlice> {
+ self.mmap
+ .get_slice(offset, len)
+ .map_err(Error::VolatileMemoryError)
}
}
@@ -652,6 +720,23 @@
}
}
+impl std::convert::From<&VioSStreamInfo> for virtio_snd_pcm_info {
+ fn from(info: &VioSStreamInfo) -> virtio_snd_pcm_info {
+ virtio_snd_pcm_info {
+ hdr: virtio_snd_info {
+ hda_fn_nid: Le32::from(info.hda_fn_nid),
+ },
+ features: Le32::from(info.features),
+ formats: Le64::from(info.formats),
+ rates: Le64::from(info.rates),
+ direction: info.direction,
+ channels_min: info.channels_min,
+ channels_max: info.channels_max,
+ padding: [0u8; 5],
+ }
+ }
+}
+
#[derive(PartialEq, Debug, Copy, Clone)]
pub enum StreamState {
Available,
@@ -674,7 +759,7 @@
virtio_snd_pcm_set_params {
hdr: virtio_snd_pcm_hdr {
hdr: virtio_snd_hdr {
- code: STREAM_SET_PARAMS.into(),
+ code: VIRTIO_SND_R_PCM_SET_PARAMS.into(),
},
stream_id: self.0.into(),
},
@@ -689,28 +774,6 @@
}
}
-/// Memory map a shared memory object to access an audio buffer. The buffer may not be located at an
-/// offset aligned to page size, so the offset within the mapped region is returned along with the
-/// MemoryMapping struct.
-fn mmap_buffer(
- src: &mut SharedMemory,
- offset: usize,
- size: usize,
-) -> Result<(MemoryMapping, usize)> {
- // If the buffer is not aligned to page size a bigger region needs to be mapped.
- let aligned_offset = offset & !(base::pagesize() - 1);
- let offset_from_mapping_start = offset - aligned_offset;
- let extended_size = size + offset_from_mapping_start;
-
- let mmap = MemoryMappingBuilder::new(extended_size)
- .offset(aligned_offset as u64)
- .from_shared_memory(src)
- .build()
- .map_err(Error::GuestMmapError)?;
-
- Ok((mmap, offset_from_mapping_start))
-}
-
fn recv_cmd_status(control_socket: &mut UnixSeqpacket) -> Result<()> {
let mut status: virtio_snd_hdr = Default::default();
control_socket
@@ -752,6 +815,12 @@
// Safe because it only has data and has no implicit padding.
unsafe impl DataInit for VioSConfig {}
+struct BufferReleaseMsg {
+ status: u32,
+ latency: u32,
+ consumed_len: usize,
+}
+
#[repr(C)]
#[derive(Copy, Clone)]
struct IoTransferMsg {