Prepare the VioSClient implementation to support a virtio-snd device

- Adds the ability to start/stop background thread on demand
- Changes the way audio data is injected to not assume how the data is
kept by users of the api
- Adds new functions for jacks and chmaps
- Rename constants to match the name used in the virtio-snd spec

BUG=b:174713663

Change-Id: Ie0fe20747a26122258cb63bac09ec0347f13ecc0
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/2983388
Tested-by: kokoro <noreply+kokoro@google.com>
Commit-Queue: Jorge Moreira Broche <jemoreira@google.com>
Reviewed-by: Chih-Yang Hsia <paulhsia@chromium.org>
diff --git a/devices/src/virtio/snd/constants.rs b/devices/src/virtio/snd/constants.rs
index 28a73a0..114d098 100644
--- a/devices/src/virtio/snd/constants.rs
+++ b/devices/src/virtio/snd/constants.rs
@@ -1,21 +1,41 @@
 // Copyright 2020 The Chromium OS Authors. All rights reserved.
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
-pub const JACK_INFO: u32 = 1;
-pub const JACK_REMAP: u32 = 2;
+pub const VIRTIO_SND_R_JACK_INFO: u32 = 1;
+pub const VIRTIO_SND_R_JACK_REMAP: u32 = 2;
 
-pub const STREAM_INFO: u32 = 0x0100;
-pub const STREAM_SET_PARAMS: u32 = 0x0100 + 1;
-pub const STREAM_PREPARE: u32 = 0x0100 + 2;
-pub const STREAM_RELEASE: u32 = 0x0100 + 3;
-pub const STREAM_START: u32 = 0x0100 + 4;
-pub const STREAM_STOP: u32 = 0x0100 + 5;
+/* PCM control request types */
+pub const VIRTIO_SND_R_PCM_INFO: u32 = 0x0100;
+pub const VIRTIO_SND_R_PCM_SET_PARAMS: u32 = 0x0101;
+pub const VIRTIO_SND_R_PCM_PREPARE: u32 = 0x0102;
+pub const VIRTIO_SND_R_PCM_RELEASE: u32 = 0x0103;
+pub const VIRTIO_SND_R_PCM_START: u32 = 0x0104;
+pub const VIRTIO_SND_R_PCM_STOP: u32 = 0x0105;
 
-pub const CHANNEL_MAP_INFO: u32 = 0x0200;
+/* channel map control request types */
+pub const VIRTIO_SND_R_CHMAP_INFO: u32 = 0x0200;
 
+/* jack event types */
+pub const VIRTIO_SND_EVT_JACK_CONNECTED: u32 = 0x1000;
+pub const VIRTIO_SND_EVT_JACK_DISCONNECTED: u32 = 0x1001;
+
+/* PCM event types */
+pub const VIRTIO_SND_EVT_PCM_PERIOD_ELAPSED: u32 = 0x1100;
+pub const VIRTIO_SND_EVT_PCM_XRUN: u32 = 0x1101;
+
+/* common status codes */
+pub const VIRTIO_SND_S_OK: u32 = 0x8000;
+pub const VIRTIO_SND_S_BAD_MSG: u32 = 0x8001;
+pub const VIRTIO_SND_S_NOT_SUPP: u32 = 0x8002;
+pub const VIRTIO_SND_S_IO_ERR: u32 = 0x8003;
+
+/* stream direction */
 pub const VIRTIO_SND_D_OUTPUT: u8 = 0;
 pub const VIRTIO_SND_D_INPUT: u8 = 1;
 
+/* supported jack features */
+pub const VIRTIO_SND_JACK_F_REMAP: u32 = 0;
+
 /* supported PCM stream features */
 pub const VIRTIO_SND_PCM_F_SHMEM_HOST: u8 = 0;
 pub const VIRTIO_SND_PCM_F_SHMEM_GUEST: u8 = 1;
@@ -45,11 +65,13 @@
 pub const VIRTIO_SND_PCM_FMT_U32: u8 = 18;
 pub const VIRTIO_SND_PCM_FMT_FLOAT: u8 = 19;
 pub const VIRTIO_SND_PCM_FMT_FLOAT64: u8 = 20;
+/* digital formats (width / physical width) */
 pub const VIRTIO_SND_PCM_FMT_DSD_U8: u8 = 21;
 pub const VIRTIO_SND_PCM_FMT_DSD_U16: u8 = 22;
 pub const VIRTIO_SND_PCM_FMT_DSD_U32: u8 = 23;
 pub const VIRTIO_SND_PCM_FMT_IEC958_SUBFRAME: u8 = 24;
 
+/* supported PCM frame rates */
 pub const VIRTIO_SND_PCM_RATE_5512: u8 = 0;
 pub const VIRTIO_SND_PCM_RATE_8000: u8 = 1;
 pub const VIRTIO_SND_PCM_RATE_11025: u8 = 2;
@@ -65,38 +87,6 @@
 pub const VIRTIO_SND_PCM_RATE_192000: u8 = 12;
 pub const VIRTIO_SND_PCM_RATE_384000: u8 = 13;
 
-// From https://github.com/oasis-tcs/virtio-spec/blob/master/virtio-sound.tex
-/* jack control request types */
-pub const VIRTIO_SND_R_JACK_INFO: u32 = 1;
-pub const VIRTIO_SND_R_JACK_REMAP: u32 = 2;
-
-/* PCM control request types */
-pub const VIRTIO_SND_R_PCM_INFO: u32 = 0x0100;
-pub const VIRTIO_SND_R_PCM_SET_PARAMS: u32 = 0x0101;
-pub const VIRTIO_SND_R_PCM_PREPARE: u32 = 0x0102;
-pub const VIRTIO_SND_R_PCM_RELEASE: u32 = 0x0103;
-pub const VIRTIO_SND_R_PCM_START: u32 = 0x0104;
-pub const VIRTIO_SND_R_PCM_STOP: u32 = 0x0105;
-
-/* channel map control request types */
-pub const VIRTIO_SND_R_CHMAP_INFO: u32 = 0x0200;
-
-/* jack event types */
-pub const VIRTIO_SND_EVT_JACK_CONNECTED: u32 = 0x1000;
-pub const VIRTIO_SND_EVT_JACK_DISCONNECTED: u32 = 0x1001;
-
-/* PCM event types */
-pub const VIRTIO_SND_EVT_PCM_PERIOD_ELAPSED: u32 = 0x1100;
-pub const VIRTIO_SND_EVT_PCM_XRUN: u32 = 0x1101;
-
-/* common status codes */
-pub const VIRTIO_SND_S_OK: u32 = 0x8000;
-pub const VIRTIO_SND_S_BAD_MSG: u32 = 0x8001;
-pub const VIRTIO_SND_S_NOT_SUPP: u32 = 0x8002;
-pub const VIRTIO_SND_S_IO_ERR: u32 = 0x8003;
-
-pub const VIRTIO_SND_JACK_F_REMAP: u32 = 0;
-
 /* standard channel position definition */
 pub const VIRTIO_SND_CHMAP_NONE: u32 = 0; /* undefined */
 pub const VIRTIO_SND_CHMAP_NA: u32 = 1; /* silent */
diff --git a/devices/src/virtio/snd/vios_backend/shm_streams.rs b/devices/src/virtio/snd/vios_backend/shm_streams.rs
index 87bfbee..a4f6833 100644
--- a/devices/src/virtio/snd/vios_backend/shm_streams.rs
+++ b/devices/src/virtio/snd/vios_backend/shm_streams.rs
@@ -15,7 +15,8 @@
 use audio_streams::shm_streams::{BufferSet, ServerRequest, ShmStream, ShmStreamSource};
 use audio_streams::{BoxError, SampleFormat, StreamDirection, StreamEffect};
 
-use base::{error, SharedMemory, SharedMemoryUnix};
+use base::{error, MemoryMapping, MemoryMappingBuilder, SharedMemory, SharedMemoryUnix};
+use data_model::VolatileMemory;
 
 use std::fs::File;
 use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
@@ -98,7 +99,7 @@
         client_shm: &SysSharedMemory,
         buffer_offsets: [u64; 2],
     ) -> GenericResult<Box<dyn ShmStream>> {
-        self.vios_client.ensure_bg_thread_started()?;
+        self.vios_client.start_bg_thread()?;
         let virtio_dir = match direction {
             StreamDirection::Playback => VIRTIO_SND_D_OUTPUT,
             StreamDirection::Capture => VIRTIO_SND_D_INPUT,
@@ -233,20 +234,54 @@
     fn callback(&mut self, offset: usize, frames: usize) -> GenericResult<()> {
         match self.direction {
             StreamDirection::Playback => {
-                self.vios_client.inject_audio_data(
+                let requested_size = frames * self.frame_size;
+                let shm_ref = &mut self.client_shm;
+                let (_, res) = self.vios_client.inject_audio_data::<Result<()>, _>(
                     self.stream_id,
-                    &mut self.client_shm,
-                    offset,
-                    frames * self.frame_size,
+                    requested_size,
+                    |slice| {
+                        if requested_size != slice.size() {
+                            error!(
+                                "Buffer size is different than the requested size: {} vs {}",
+                                requested_size,
+                                slice.size()
+                            );
+                        }
+                        let size = std::cmp::min(requested_size, slice.size());
+                        let (src_mmap, mmap_offset) = mmap_buffer(shm_ref, offset, size)?;
+                        let src_slice = src_mmap
+                            .get_slice(mmap_offset, size)
+                            .map_err(Error::VolatileMemoryError)?;
+                        src_slice.copy_to_volatile_slice(slice);
+                        Ok(())
+                    },
                 )?;
+                res?;
             }
             StreamDirection::Capture => {
-                self.vios_client.request_audio_data(
+                let requested_size = frames * self.frame_size;
+                let shm_ref = &mut self.client_shm;
+                let (_, res) = self.vios_client.request_audio_data::<Result<()>, _>(
                     self.stream_id,
-                    &mut self.client_shm,
-                    offset,
-                    frames * self.frame_size,
+                    requested_size,
+                    |slice| {
+                        if requested_size != slice.size() {
+                            error!(
+                                "Buffer size is different than the requested size: {} vs {}",
+                                requested_size,
+                                slice.size()
+                            );
+                        }
+                        let size = std::cmp::min(requested_size, slice.size());
+                        let (dst_mmap, mmap_offset) = mmap_buffer(shm_ref, offset, size)?;
+                        let dst_slice = dst_mmap
+                            .get_slice(mmap_offset, size)
+                            .map_err(Error::VolatileMemoryError)?;
+                        slice.copy_to_volatile_slice(dst_slice);
+                        Ok(())
+                    },
                 )?;
+                res?;
             }
         }
         Ok(())
@@ -269,3 +304,25 @@
         }
     }
 }
+
+/// Memory map a shared memory object to access an audio buffer. The buffer may not be located at an
+/// offset aligned to page size, so the offset within the mapped region is returned along with the
+/// MemoryMapping struct.
+fn mmap_buffer(
+    src: &mut SharedMemory,
+    offset: usize,
+    size: usize,
+) -> Result<(MemoryMapping, usize)> {
+    // If the buffer is not aligned to page size a bigger region needs to be mapped.
+    let aligned_offset = offset & !(base::pagesize() - 1);
+    let offset_from_mapping_start = offset - aligned_offset;
+    let extended_size = size + offset_from_mapping_start;
+
+    let mmap = MemoryMappingBuilder::new(extended_size)
+        .offset(aligned_offset as u64)
+        .from_shared_memory(src)
+        .build()
+        .map_err(Error::GuestMmapError)?;
+
+    Ok((mmap, offset_from_mapping_start))
+}
diff --git a/devices/src/virtio/snd/vios_backend/shm_vios.rs b/devices/src/virtio/snd/vios_backend/shm_vios.rs
index 9aec672..5978c2d 100644
--- a/devices/src/virtio/snd/vios_backend/shm_vios.rs
+++ b/devices/src/virtio/snd/vios_backend/shm_vios.rs
@@ -8,9 +8,9 @@
 use base::{
     error, net::UnixSeqpacket, AsRawDescriptor, Error as BaseError, Event, FromRawDescriptor,
     IntoRawDescriptor, MemoryMapping, MemoryMappingBuilder, MmapError, PollToken, SafeDescriptor,
-    ScmSocket, SharedMemory, WaitContext,
+    ScmSocket, WaitContext,
 };
-use data_model::{DataInit, VolatileMemory, VolatileMemoryError};
+use data_model::{DataInit, Le32, Le64, VolatileMemory, VolatileMemoryError, VolatileSlice};
 
 use std::collections::HashMap;
 use std::fs::File;
@@ -75,6 +75,8 @@
     EventCreateError(BaseError),
     #[error("Failed to dup Recv event: {0}")]
     EventDupError(BaseError),
+    #[error("Failed to signal event: {0}")]
+    EventWriteError(BaseError),
     #[error("Failed to create Recv thread's WaitContext: {0}")]
     WaitContextCreateError(BaseError),
     #[error("Error waiting for events")]
@@ -107,7 +109,8 @@
     event_socket: Mutex<UnixSeqpacket>,
     tx: Mutex<IoBufferQueue>,
     rx: Mutex<IoBufferQueue>,
-    rx_subscribers: Arc<Mutex<HashMap<usize, Sender<(u32, usize)>>>>,
+    tx_subscribers: Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>>,
+    rx_subscribers: Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>>,
     recv_running: Arc<Mutex<bool>>,
     recv_event: Mutex<Event>,
     recv_thread: Mutex<Option<JoinHandle<Result<()>>>>,
@@ -180,7 +183,9 @@
             ));
         }
 
-        let rx_subscribers: Arc<Mutex<HashMap<usize, Sender<(u32, usize)>>>> =
+        let tx_subscribers: Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>> =
+            Arc::new(Mutex::new(HashMap::new()));
+        let rx_subscribers: Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>> =
             Arc::new(Mutex::new(HashMap::new()));
         let recv_running = Arc::new(Mutex::new(true));
         let recv_event = Event::new().map_err(Error::EventCreateError)?;
@@ -192,6 +197,7 @@
             event_socket: Mutex::new(event_socket),
             tx: Mutex::new(IoBufferQueue::new(tx_socket, tx_shm_file)?),
             rx: Mutex::new(IoBufferQueue::new(rx_socket, rx_shm_file)?),
+            tx_subscribers,
             rx_subscribers,
             recv_running,
             recv_event: Mutex::new(recv_event),
@@ -201,7 +207,34 @@
         Ok(client)
     }
 
-    pub fn ensure_bg_thread_started(&self) -> Result<()> {
+    /// Get the number of jacks
+    pub fn num_jacks(&self) -> u32 {
+        self.config.jacks
+    }
+
+    /// Get the number of pcm streams
+    pub fn num_streams(&self) -> u32 {
+        self.config.streams
+    }
+
+    /// Get the number of channel maps
+    pub fn num_chmaps(&self) -> u32 {
+        self.config.chmaps
+    }
+
+    /// Get the configuration information on a pcm stream
+    pub fn stream_info(&self, idx: u32) -> Option<virtio_snd_pcm_info> {
+        self.streams
+            .lock()
+            .get(idx as usize)
+            .map(virtio_snd_pcm_info::from)
+    }
+
+    /// Starts the background thread that receives release messages from the server. If the thread
+    /// was already started this function does nothing.
+    /// This thread must be started prior to attempting any stream IO operation or the calling
+    /// thread would block.
+    pub fn start_bg_thread(&self) -> Result<()> {
         if self.recv_thread.lock().is_some() {
             return Ok(());
         }
@@ -210,6 +243,12 @@
             .lock()
             .try_clone()
             .map_err(Error::EventDupError)?;
+        let tx_socket = self
+            .tx
+            .lock()
+            .socket
+            .try_clone()
+            .map_err(Error::UnixSeqpacketDupError)?;
         let rx_socket = self
             .rx
             .lock()
@@ -221,15 +260,39 @@
         // while duplicating the fds. So we have to check again the condition.
         if opt.is_none() {
             *opt = Some(spawn_recv_thread(
+                self.tx_subscribers.clone(),
                 self.rx_subscribers.clone(),
                 event_socket,
                 self.recv_running.clone(),
+                tx_socket,
                 rx_socket,
             ));
         }
         Ok(())
     }
 
+    /// Stops the background thread.
+    pub fn stop_bg_thread(&self) -> Result<()> {
+        if self.recv_thread.lock().is_none() {
+            return Ok(());
+        }
+        *self.recv_running.lock() = false;
+        self.recv_event
+            .lock()
+            .write(1u64)
+            .map_err(Error::EventWriteError)?;
+        if let Some(handle) = self.recv_thread.lock().take() {
+            return match handle.join() {
+                Ok(r) => r,
+                Err(e) => {
+                    error!("Recv thread panicked: {:?}", e);
+                    Ok(())
+                }
+            };
+        }
+        Ok(())
+    }
+
     /// Gets an unused stream id of the specified direction. `direction` must be one of
     /// VIRTIO_SND_D_INPUT OR VIRTIO_SND_D_OUTPUT.
     pub fn get_unused_stream_id(&self, direction: u8) -> Option<u32> {
@@ -254,13 +317,26 @@
         Ok(())
     }
 
+    /// Configures a stream with the given parameters.
+    pub fn set_stream_parameters_raw(&self, raw_params: virtio_snd_pcm_set_params) -> Result<()> {
+        let stream_id = raw_params.hdr.stream_id.to_native();
+        self.validate_stream_id(
+            stream_id,
+            &[StreamState::Available, StreamState::Acquired],
+            None,
+        )?;
+        self.send_cmd(raw_params)?;
+        self.streams.lock()[stream_id as usize].state = StreamState::Acquired;
+        Ok(())
+    }
+
     /// Send the PREPARE_STREAM command to the server.
     pub fn prepare_stream(&self, stream_id: u32) -> Result<()> {
         self.common_stream_op(
             stream_id,
             &[StreamState::Available, StreamState::Acquired],
             StreamState::Acquired,
-            STREAM_PREPARE,
+            VIRTIO_SND_R_PCM_PREPARE,
         )
     }
 
@@ -270,7 +346,7 @@
             stream_id,
             &[StreamState::Acquired],
             StreamState::Available,
-            STREAM_RELEASE,
+            VIRTIO_SND_R_PCM_RELEASE,
         )
     }
 
@@ -280,7 +356,7 @@
             stream_id,
             &[StreamState::Acquired],
             StreamState::Active,
-            STREAM_START,
+            VIRTIO_SND_R_PCM_START,
         )
     }
 
@@ -290,40 +366,53 @@
             stream_id,
             &[StreamState::Active],
             StreamState::Acquired,
-            STREAM_STOP,
+            VIRTIO_SND_R_PCM_STOP,
         )
     }
 
-    /// Send audio frames to the server. The audio data is taken from a shared memory resource.
-    pub fn inject_audio_data(
+    /// Send audio frames to the server. Blocks the calling thread until the server acknowledges
+    /// the data.
+    pub fn inject_audio_data<R, Cb: FnOnce(VolatileSlice) -> R>(
         &self,
         stream_id: u32,
-        buffer: &mut SharedMemory,
-        src_offset: usize,
         size: usize,
-    ) -> Result<()> {
+        callback: Cb,
+    ) -> Result<(u32, R)> {
         self.validate_stream_id(stream_id, &[StreamState::Active], Some(VIRTIO_SND_D_OUTPUT))?;
-        let mut tx_lock = self.tx.lock();
-        let tx = &mut *tx_lock;
-        let dst_offset = tx.push_buffer(buffer, src_offset, size)?;
-        let msg = IoTransferMsg::new(stream_id, dst_offset, size);
-        seq_socket_send(&tx.socket, msg)
+        let (status_promise, ret) = {
+            let mut tx_lock = self.tx.lock();
+            let tx = &mut *tx_lock;
+            let dst_offset = tx.allocate_buffer(size)?;
+            let buffer_slice = tx.buffer_at(dst_offset, size)?;
+            let ret = callback(buffer_slice);
+            // Register to receive the status before sending the buffer to the server
+            let (sender, receiver): (Sender<BufferReleaseMsg>, Receiver<BufferReleaseMsg>) =
+                channel();
+            // It's OK to acquire tx_subscriber's lock after tx_lock
+            self.tx_subscribers.lock().insert(dst_offset, sender);
+            let msg = IoTransferMsg::new(stream_id, dst_offset, size);
+            seq_socket_send(&tx.socket, msg)?;
+            (receiver, ret)
+        };
+        let (_, latency) = await_status(status_promise)?;
+        Ok((latency, ret))
     }
 
-    pub fn request_audio_data(
+    /// Request audio frames from the server. It blocks until the data is available.
+    pub fn request_audio_data<R, Cb: FnOnce(&VolatileSlice) -> R>(
         &self,
         stream_id: u32,
-        buffer: &mut SharedMemory,
-        dst_offset: usize,
         size: usize,
-    ) -> Result<usize> {
+        callback: Cb,
+    ) -> Result<(u32, R)> {
         self.validate_stream_id(stream_id, &[StreamState::Active], Some(VIRTIO_SND_D_INPUT))?;
         let (src_offset, status_promise) = {
             let mut rx_lock = self.rx.lock();
             let rx = &mut *rx_lock;
             let src_offset = rx.allocate_buffer(size)?;
             // Register to receive the status before sending the buffer to the server
-            let (sender, receiver): (Sender<(u32, usize)>, Receiver<(u32, usize)>) = channel();
+            let (sender, receiver): (Sender<BufferReleaseMsg>, Receiver<BufferReleaseMsg>) =
+                channel();
             // It's OK to acquire rx_subscriber's lock after rx_lock
             self.rx_subscribers.lock().insert(src_offset, sender);
             let msg = IoTransferMsg::new(stream_id, src_offset, size);
@@ -331,12 +420,11 @@
             (src_offset, receiver)
         };
         // Make sure no mutexes are held while awaiting for the buffer to be written to
-        let recv_size = await_status(status_promise)?;
+        let (recv_size, latency) = await_status(status_promise)?;
         {
             let mut rx_lock = self.rx.lock();
-            rx_lock
-                .pop_buffer(buffer, dst_offset, recv_size, src_offset)
-                .map(|()| recv_size)
+            let buffer_slice = rx_lock.buffer_at(src_offset, recv_size)?;
+            Ok((latency, callback(&buffer_slice)))
         }
     }
 
@@ -418,7 +506,7 @@
         let info_size = std::mem::size_of::<virtio_snd_pcm_info>();
         let req = virtio_snd_query_info {
             hdr: virtio_snd_hdr {
-                code: STREAM_INFO.into(),
+                code: VIRTIO_SND_R_PCM_INFO.into(),
             },
             start_id: 0u32.into(),
             count: (num_streams as u32).into(),
@@ -451,20 +539,8 @@
 
 impl Drop for VioSClient {
     fn drop(&mut self) {
-        // Stop the recv thread
-        *self.recv_running.lock() = false;
-        if let Err(e) = self.recv_event.lock().write(1u64) {
-            error!("Failed to notify recv thread: {:?}", e);
-        }
-        if let Some(handle) = self.recv_thread.lock().take() {
-            match handle.join() {
-                Ok(r) => {
-                    if let Err(e) = r {
-                        error!("Error detected on Recv Thread: {}", e);
-                    }
-                }
-                Err(e) => error!("Recv thread panicked: {:?}", e),
-            };
+        if let Err(e) = self.stop_bg_thread() {
+            error!("Error stopping Recv thread: {}", e);
         }
     }
 }
@@ -472,60 +548,75 @@
 #[derive(PollToken)]
 enum Token {
     Notification,
+    TxBufferMsg,
     RxBufferMsg,
 }
 
+fn recv_buffer_status_msg(
+    socket: &UnixSeqpacket,
+    subscribers: &Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>>,
+) -> Result<()> {
+    let mut msg: IoStatusMsg = Default::default();
+    let size = socket
+        .recv(msg.as_mut_slice())
+        .map_err(Error::ServerIOError)?;
+    if size != std::mem::size_of::<IoStatusMsg>() {
+        return Err(Error::ProtocolError(
+            ProtocolErrorKind::UnexpectedMessageSize(std::mem::size_of::<IoStatusMsg>(), size),
+        ));
+    }
+    let mut status = msg.status.status.into();
+    if status == u32::MAX {
+        // Anyone waiting for this would continue to wait for as long as status is
+        // u32::MAX
+        status -= 1;
+    }
+    let latency = msg.status.latency_bytes.into();
+    let offset = msg.buffer_offset as usize;
+    let consumed_len = msg.consumed_len as usize;
+    let promise_opt = subscribers.lock().remove(&offset);
+    match promise_opt {
+        None => error!(
+            "Received an unexpected buffer status message: {}. This is a BUG!!",
+            offset
+        ),
+        Some(sender) => {
+            if let Err(e) = sender.send(BufferReleaseMsg {
+                status,
+                latency,
+                consumed_len,
+            }) {
+                error!("Failed to notify waiting thread: {:?}", e);
+            }
+        }
+    }
+    Ok(())
+}
+
 fn spawn_recv_thread(
-    rx_subscribers: Arc<Mutex<HashMap<usize, Sender<(u32, usize)>>>>,
+    tx_subscribers: Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>>,
+    rx_subscribers: Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>>,
     event: Event,
     running: Arc<Mutex<bool>>,
+    tx_socket: UnixSeqpacket,
     rx_socket: UnixSeqpacket,
 ) -> JoinHandle<Result<()>> {
     std::thread::spawn(move || {
         let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
+            (&tx_socket, Token::TxBufferMsg),
             (&rx_socket, Token::RxBufferMsg),
             (&event, Token::Notification),
         ])
         .map_err(Error::WaitContextCreateError)?;
-        while *running.lock() {
+        loop {
+            if !*running.lock() {
+                break;
+            }
             let events = wait_ctx.wait().map_err(Error::WaitError)?;
             for evt in events {
                 match evt.token {
-                    Token::RxBufferMsg => {
-                        let mut msg: IoStatusMsg = Default::default();
-                        let size = rx_socket
-                            .recv(msg.as_mut_slice())
-                            .map_err(Error::ServerIOError)?;
-                        if size != std::mem::size_of::<IoStatusMsg>() {
-                            return Err(Error::ProtocolError(
-                                ProtocolErrorKind::UnexpectedMessageSize(
-                                    std::mem::size_of::<IoStatusMsg>(),
-                                    size,
-                                ),
-                            ));
-                        }
-                        let mut status = msg.status.status.into();
-                        if status == u32::MAX {
-                            // Anyone waiting for this would continue to wait for as long as status is
-                            // u32::MAX
-                            status -= 1;
-                        }
-                        let offset = msg.buffer_offset as usize;
-                        let consumed_len = msg.consumed_len as usize;
-                        // Acquire and immediately release the mutex protecting the hashmap
-                        let promise_opt = rx_subscribers.lock().remove(&offset);
-                        match promise_opt {
-                            None => error!(
-                                "Received an unexpected buffer status message: {}. This is a BUG!!",
-                                offset
-                            ),
-                            Some(sender) => {
-                                if let Err(e) = sender.send((status, consumed_len)) {
-                                    error!("Failed to notify waiting thread: {:?}", e);
-                                }
-                            }
-                        }
-                    }
+                    Token::TxBufferMsg => recv_buffer_status_msg(&tx_socket, &tx_subscribers)?,
+                    Token::RxBufferMsg => recv_buffer_status_msg(&rx_socket, &rx_subscribers)?,
                     Token::Notification => {
                         // Just consume the notification and check for termination on the next
                         // iteration
@@ -540,10 +631,14 @@
     })
 }
 
-fn await_status(promise: Receiver<(u32, usize)>) -> Result<usize> {
-    let (status, consumed_len) = promise.recv().map_err(Error::BufferStatusSenderLost)?;
+fn await_status(promise: Receiver<BufferReleaseMsg>) -> Result<(usize, u32)> {
+    let BufferReleaseMsg {
+        status,
+        latency,
+        consumed_len,
+    } = promise.recv().map_err(Error::BufferStatusSenderLost)?;
     if status == VIRTIO_SND_S_OK {
-        Ok(consumed_len)
+        Ok((consumed_len, latency))
     } else {
         Err(Error::IOBufferError(status))
     }
@@ -589,37 +684,10 @@
         Ok(offset)
     }
 
-    fn push_buffer(&mut self, src: &mut SharedMemory, offset: usize, size: usize) -> Result<usize> {
-        let shm_offset = self.allocate_buffer(size)?;
-        let (src_mmap, mmap_offset) = mmap_buffer(src, offset, size)?;
-        let src_slice = src_mmap
-            .get_slice(mmap_offset, size)
-            .map_err(Error::VolatileMemoryError)?;
-        let dst_slice = self
-            .mmap
-            .get_slice(shm_offset, size)
-            .map_err(Error::VolatileMemoryError)?;
-        src_slice.copy_to_volatile_slice(dst_slice);
-        Ok(shm_offset)
-    }
-
-    fn pop_buffer(
-        &mut self,
-        dst: &mut SharedMemory,
-        dst_offset: usize,
-        size: usize,
-        src_offset: usize,
-    ) -> Result<()> {
-        let (dst_mmap, mmap_offset) = mmap_buffer(dst, dst_offset, size)?;
-        let dst_slice = dst_mmap
-            .get_slice(mmap_offset, size)
-            .map_err(Error::VolatileMemoryError)?;
-        let src_slice = self
-            .mmap
-            .get_slice(src_offset, size)
-            .map_err(Error::VolatileMemoryError)?;
-        src_slice.copy_to_volatile_slice(dst_slice);
-        Ok(())
+    fn buffer_at(&mut self, offset: usize, len: usize) -> Result<VolatileSlice> {
+        self.mmap
+            .get_slice(offset, len)
+            .map_err(Error::VolatileMemoryError)
     }
 }
 
@@ -652,6 +720,23 @@
     }
 }
 
+impl std::convert::From<&VioSStreamInfo> for virtio_snd_pcm_info {
+    fn from(info: &VioSStreamInfo) -> virtio_snd_pcm_info {
+        virtio_snd_pcm_info {
+            hdr: virtio_snd_info {
+                hda_fn_nid: Le32::from(info.hda_fn_nid),
+            },
+            features: Le32::from(info.features),
+            formats: Le64::from(info.formats),
+            rates: Le64::from(info.rates),
+            direction: info.direction,
+            channels_min: info.channels_min,
+            channels_max: info.channels_max,
+            padding: [0u8; 5],
+        }
+    }
+}
+
 #[derive(PartialEq, Debug, Copy, Clone)]
 pub enum StreamState {
     Available,
@@ -674,7 +759,7 @@
         virtio_snd_pcm_set_params {
             hdr: virtio_snd_pcm_hdr {
                 hdr: virtio_snd_hdr {
-                    code: STREAM_SET_PARAMS.into(),
+                    code: VIRTIO_SND_R_PCM_SET_PARAMS.into(),
                 },
                 stream_id: self.0.into(),
             },
@@ -689,28 +774,6 @@
     }
 }
 
-/// Memory map a shared memory object to access an audio buffer. The buffer may not be located at an
-/// offset aligned to page size, so the offset within the mapped region is returned along with the
-/// MemoryMapping struct.
-fn mmap_buffer(
-    src: &mut SharedMemory,
-    offset: usize,
-    size: usize,
-) -> Result<(MemoryMapping, usize)> {
-    // If the buffer is not aligned to page size a bigger region needs to be mapped.
-    let aligned_offset = offset & !(base::pagesize() - 1);
-    let offset_from_mapping_start = offset - aligned_offset;
-    let extended_size = size + offset_from_mapping_start;
-
-    let mmap = MemoryMappingBuilder::new(extended_size)
-        .offset(aligned_offset as u64)
-        .from_shared_memory(src)
-        .build()
-        .map_err(Error::GuestMmapError)?;
-
-    Ok((mmap, offset_from_mapping_start))
-}
-
 fn recv_cmd_status(control_socket: &mut UnixSeqpacket) -> Result<()> {
     let mut status: virtio_snd_hdr = Default::default();
     control_socket
@@ -752,6 +815,12 @@
 // Safe because it only has data and has no implicit padding.
 unsafe impl DataInit for VioSConfig {}
 
+struct BufferReleaseMsg {
+    status: u32,
+    latency: u32,
+    consumed_len: usize,
+}
+
 #[repr(C)]
 #[derive(Copy, Clone)]
 struct IoTransferMsg {