// Copyright 2020 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::collections::HashMap;
use std::collections::VecDeque;
use std::fs::File;
use std::io::Error as IOError;
use std::io::ErrorKind as IOErrorKind;
use std::io::IoSliceMut;
use std::io::Seek;
use std::io::SeekFrom;
use std::os::unix::io::RawFd;
use std::path::Path;
use std::path::PathBuf;
use std::sync::mpsc::channel;
use std::sync::mpsc::Receiver;
use std::sync::mpsc::RecvError;
use std::sync::mpsc::Sender;
use std::sync::Arc;
use std::thread::JoinHandle;
use base::error;
use base::AsRawDescriptor;
use base::Error as BaseError;
use base::Event;
use base::EventToken;
use base::FromRawDescriptor;
use base::IntoRawDescriptor;
use base::MemoryMapping;
use base::MemoryMappingBuilder;
use base::MmapError;
use base::RawDescriptor;
use base::SafeDescriptor;
use base::ScmSocket;
use base::UnixSeqpacket;
use base::WaitContext;
use data_model::VolatileMemory;
use data_model::VolatileMemoryError;
use data_model::VolatileSlice;
use remain::sorted;
use sync::Mutex;
use thiserror::Error as ThisError;
use zerocopy::AsBytes;
use zerocopy::FromBytes;
use crate::virtio::snd::constants::*;
use crate::virtio::snd::layout::*;
pub type Result<T> = std::result::Result<T, Error>;
#[sorted]
#[derive(ThisError, Debug)]
pub enum Error {
#[error("Error memory mapping client_shm: {0}")]
BaseMmapError(BaseError),
#[error("Sender was dropped without sending buffer status, the recv thread may have exited")]
BufferStatusSenderLost(RecvError),
#[error("Command failed with status {0}")]
CommandFailed(u32),
#[error("Error duplicating file descriptor: {0}")]
DupError(BaseError),
#[error("Failed to create Recv event: {0}")]
EventCreateError(BaseError),
#[error("Failed to dup Recv event: {0}")]
EventDupError(BaseError),
#[error("Failed to signal event: {0}")]
EventWriteError(BaseError),
#[error("Failed to get size of tx shared memory: {0}")]
FileSizeError(IOError),
#[error("Error accessing guest's shared memory: {0}")]
GuestMmapError(MmapError),
#[error("No jack with id {0}")]
InvalidJackId(u32),
#[error("No stream with id {0}")]
InvalidStreamId(u32),
#[error("IO buffer operation failed: status = {0}")]
IOBufferError(u32),
#[error("No PCM streams available")]
NoStreamsAvailable,
#[error("Insuficient space for the new buffer in the queue's buffer area")]
OutOfSpace,
#[error("Platform not supported")]
PlatformNotSupported,
#[error("{0}")]
ProtocolError(ProtocolErrorKind),
#[error("Failed to connect to VioS server {1}: {0:?}")]
ServerConnectionError(IOError, PathBuf),
#[error("Failed to communicate with VioS server: {0:?}")]
ServerError(BaseError),
#[error("Failed to communicate with VioS server: {0:?}")]
ServerIOError(IOError),
#[error("Error accessing VioS server's shared memory: {0}")]
ServerMmapError(MmapError),
#[error("Failed to duplicate UnixSeqpacket: {0}")]
UnixSeqpacketDupError(IOError),
#[error("Unsupported frame rate: {0}")]
UnsupportedFrameRate(u32),
#[error("Error accessing volatile memory: {0}")]
VolatileMemoryError(VolatileMemoryError),
#[error("Failed to create Recv thread's WaitContext: {0}")]
WaitContextCreateError(BaseError),
#[error("Error waiting for events")]
WaitError(BaseError),
#[error("Invalid operation for stream direction: {0}")]
WrongDirection(u8),
}
#[derive(ThisError, Debug)]
pub enum ProtocolErrorKind {
#[error("The server sent a config of the wrong size: {0}")]
UnexpectedConfigSize(usize),
#[error("Received {1} file descriptors from the server, expected {0}")]
UnexpectedNumberOfFileDescriptors(usize, usize), // expected, received
#[error("Server's version ({0}) doesn't match client's")]
VersionMismatch(u32),
#[error("Received a msg with an unexpected size: expected {0}, received {1}")]
UnexpectedMessageSize(usize, usize), // expected, received
}
/// The client for the VioS backend
///
/// Uses a protocol equivalent to virtio-snd over a shared memory file and a unix socket for
/// notifications. It's thread safe; it can be wrapped in an Arc smart pointer and shared
/// between threads.
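///
/// A rough usage sketch (the socket path is illustrative and error handling is elided):
///
/// ```ignore
/// let client = VioSClient::try_new("/run/vios_server.sock")?;
/// // The background thread must run before any stream IO is attempted.
/// client.start_bg_thread()?;
/// client.prepare_stream(0)?;
/// client.start_stream(0)?;
/// // ... stream IO via inject_audio_data()/request_audio_data() ...
/// client.stop_stream(0)?;
/// client.release_stream(0)?;
/// ```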
pub struct VioSClient {
// These mutexes should almost never be held simultaneously. If at some point they have to be,
// the locking order should match the order in which they are declared here.
config: VioSConfig,
jacks: Vec<virtio_snd_jack_info>,
streams: Vec<virtio_snd_pcm_info>,
chmaps: Vec<virtio_snd_chmap_info>,
// The control socket is used from multiple threads to send and wait for a reply, which needs
// to happen atomically, hence the need for a mutex instead of just sharing clones of the
// socket.
control_socket: Mutex<UnixSeqpacket>,
event_socket: UnixSeqpacket,
// These are thread safe and don't require locking
tx: IoBufferQueue,
rx: IoBufferQueue,
// This is accessed by the recv_thread and whatever thread processes the events
events: Arc<Mutex<VecDeque<virtio_snd_event>>>,
event_notifier: Event,
// These are accessed by the recv_thread and the stream threads
tx_subscribers: Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>>,
rx_subscribers: Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>>,
recv_thread_state: Arc<Mutex<ThreadFlags>>,
recv_event: Event,
recv_thread: Mutex<Option<JoinHandle<Result<()>>>>,
}
impl VioSClient {
/// Create a new client given the path to the audio server's socket.
pub fn try_new<P: AsRef<Path>>(server: P) -> Result<VioSClient> {
let client_socket = UnixSeqpacket::connect(server.as_ref())
.map_err(|e| Error::ServerConnectionError(e, server.as_ref().into()))?;
let mut config: VioSConfig = Default::default();
let mut fds: Vec<RawFd> = Vec::new();
const NUM_FDS: usize = 5;
fds.resize(NUM_FDS, 0);
let (recv_size, fd_count) = client_socket
.recv_with_fds(IoSliceMut::new(config.as_bytes_mut()), &mut fds)
.map_err(Error::ServerError)?;
// Resize the vector to the actual number of file descriptors received and wrap them in
// SafeDescriptors to prevent leaks
fds.resize(fd_count, -1);
let mut safe_fds: Vec<SafeDescriptor> = fds
.into_iter()
.map(|fd| unsafe {
// safe because the SafeDescriptor object completely assumes ownership of the fd.
SafeDescriptor::from_raw_descriptor(fd)
})
.collect();
if recv_size != std::mem::size_of::<VioSConfig>() {
return Err(Error::ProtocolError(
ProtocolErrorKind::UnexpectedConfigSize(recv_size),
));
}
if config.version != VIOS_VERSION {
return Err(Error::ProtocolError(ProtocolErrorKind::VersionMismatch(
config.version,
)));
}
fn pop<T: FromRawDescriptor>(
safe_fds: &mut Vec<SafeDescriptor>,
expected: usize,
received: usize,
) -> Result<T> {
unsafe {
// Safe because we transfer ownership from the SafeDescriptor to T
Ok(T::from_raw_descriptor(
safe_fds
.pop()
.ok_or(Error::ProtocolError(
ProtocolErrorKind::UnexpectedNumberOfFileDescriptors(
expected, received,
),
))?
.into_raw_descriptor(),
))
}
}
let rx_shm_file = pop::<File>(&mut safe_fds, NUM_FDS, fd_count)?;
let tx_shm_file = pop::<File>(&mut safe_fds, NUM_FDS, fd_count)?;
let rx_socket = pop::<UnixSeqpacket>(&mut safe_fds, NUM_FDS, fd_count)?;
let tx_socket = pop::<UnixSeqpacket>(&mut safe_fds, NUM_FDS, fd_count)?;
let event_socket = pop::<UnixSeqpacket>(&mut safe_fds, NUM_FDS, fd_count)?;
if !safe_fds.is_empty() {
return Err(Error::ProtocolError(
ProtocolErrorKind::UnexpectedNumberOfFileDescriptors(NUM_FDS, fd_count),
));
}
let tx_subscribers: Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>> =
Arc::new(Mutex::new(HashMap::new()));
let rx_subscribers: Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>> =
Arc::new(Mutex::new(HashMap::new()));
let recv_thread_state = Arc::new(Mutex::new(ThreadFlags {
reporting_events: false,
}));
let recv_event = Event::new().map_err(Error::EventCreateError)?;
let mut client = VioSClient {
config,
jacks: Vec::new(),
streams: Vec::new(),
chmaps: Vec::new(),
control_socket: Mutex::new(client_socket),
event_socket,
tx: IoBufferQueue::new(tx_socket, tx_shm_file)?,
rx: IoBufferQueue::new(rx_socket, rx_shm_file)?,
events: Arc::new(Mutex::new(VecDeque::new())),
event_notifier: Event::new().map_err(Error::EventCreateError)?,
tx_subscribers,
rx_subscribers,
recv_thread_state,
recv_event,
recv_thread: Mutex::new(None),
};
client.request_and_cache_info()?;
Ok(client)
}
/// Get the number of jacks
pub fn num_jacks(&self) -> u32 {
self.config.jacks
}
/// Get the number of pcm streams
pub fn num_streams(&self) -> u32 {
self.config.streams
}
/// Get the number of channel maps
pub fn num_chmaps(&self) -> u32 {
self.config.chmaps
}
/// Get the configuration information on a jack
pub fn jack_info(&self, idx: u32) -> Option<virtio_snd_jack_info> {
self.jacks.get(idx as usize).copied()
}
/// Get the configuration information on a pcm stream
pub fn stream_info(&self, idx: u32) -> Option<virtio_snd_pcm_info> {
self.streams.get(idx as usize).cloned()
}
/// Get the configuration information on a channel map
pub fn chmap_info(&self, idx: u32) -> Option<virtio_snd_chmap_info> {
self.chmaps.get(idx as usize).copied()
}
/// Starts the background thread that receives release messages from the server. If the thread
/// was already started, this function does nothing.
/// This thread must be started prior to attempting any stream IO operation, or the calling
/// thread will block.
pub fn start_bg_thread(&self) -> Result<()> {
if self.recv_thread.lock().is_some() {
return Ok(());
}
let recv_event = self.recv_event.try_clone().map_err(Error::EventDupError)?;
let tx_socket = self.tx.try_clone_socket()?;
let rx_socket = self.rx.try_clone_socket()?;
let event_socket = self
.event_socket
.try_clone()
.map_err(Error::UnixSeqpacketDupError)?;
let mut opt = self.recv_thread.lock();
// The lock on recv_thread was released above to avoid holding more than one lock at a time
// while duplicating the fds. So we have to check the condition again.
if opt.is_none() {
*opt = Some(spawn_recv_thread(
self.tx_subscribers.clone(),
self.rx_subscribers.clone(),
self.event_notifier
.try_clone()
.map_err(Error::EventDupError)?,
self.events.clone(),
recv_event,
self.recv_thread_state.clone(),
tx_socket,
rx_socket,
event_socket,
));
}
Ok(())
}
/// Stops the background thread.
pub fn stop_bg_thread(&self) -> Result<()> {
if self.recv_thread.lock().is_none() {
return Ok(());
}
self.recv_event.signal().map_err(Error::EventWriteError)?;
if let Some(handle) = self.recv_thread.lock().take() {
return match handle.join() {
Ok(r) => r,
Err(e) => {
error!("Recv thread panicked: {:?}", e);
Ok(())
}
};
}
Ok(())
}
/// Gets an Event object that will trigger every time an event is received from the server
pub fn get_event_notifier(&self) -> Result<Event> {
// Let the background thread know that there is at least one consumer of events
self.recv_thread_state.lock().reporting_events = true;
self.event_notifier
.try_clone()
.map_err(Error::EventDupError)
}
/// Retrieves one event. Callers should have received a notification through the event notifier
/// before calling this function.
pub fn pop_event(&self) -> Option<virtio_snd_event> {
self.events.lock().pop_front()
}
/// Remap a jack. This should only be called if the jack announces support for the operation
/// through the features field in the corresponding virtio_snd_jack_info struct.
pub fn remap_jack(&self, jack_id: u32, association: u32, sequence: u32) -> Result<()> {
if jack_id >= self.config.jacks {
return Err(Error::InvalidJackId(jack_id));
}
let msg = virtio_snd_jack_remap {
hdr: virtio_snd_jack_hdr {
hdr: virtio_snd_hdr {
code: VIRTIO_SND_R_JACK_REMAP.into(),
},
jack_id: jack_id.into(),
},
association: association.into(),
sequence: sequence.into(),
};
let control_socket_lock = self.control_socket.lock();
send_cmd(&control_socket_lock, msg)
}
/// Configures a stream with the given parameters.
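///
/// A sketch of configuring stream 0 (field values are illustrative; the
/// VIRTIO_SND_PCM_FMT_* / VIRTIO_SND_PCM_RATE_* codes are assumed to come from the
/// constants module):
///
/// ```ignore
/// let params = VioSStreamParams {
///     buffer_bytes: 8192,
///     period_bytes: 2048,
///     features: 0,
///     channels: 2,
///     format: VIRTIO_SND_PCM_FMT_S16 as u8,
///     rate: VIRTIO_SND_PCM_RATE_48000 as u8,
/// };
/// client.set_stream_parameters(0, params)?;
/// ```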
pub fn set_stream_parameters(&self, stream_id: u32, params: VioSStreamParams) -> Result<()> {
self.streams
.get(stream_id as usize)
.ok_or(Error::InvalidStreamId(stream_id))?;
let raw_params: virtio_snd_pcm_set_params = (stream_id, params).into();
let control_socket_lock = self.control_socket.lock();
send_cmd(&control_socket_lock, raw_params)
}
/// Configures a stream with the given parameters.
pub fn set_stream_parameters_raw(&self, raw_params: virtio_snd_pcm_set_params) -> Result<()> {
let stream_id = raw_params.hdr.stream_id.to_native();
self.streams
.get(stream_id as usize)
.ok_or(Error::InvalidStreamId(stream_id))?;
let control_socket_lock = self.control_socket.lock();
send_cmd(&control_socket_lock, raw_params)
}
/// Send the PREPARE_STREAM command to the server.
pub fn prepare_stream(&self, stream_id: u32) -> Result<()> {
self.common_stream_op(stream_id, VIRTIO_SND_R_PCM_PREPARE)
}
/// Send the RELEASE_STREAM command to the server.
pub fn release_stream(&self, stream_id: u32) -> Result<()> {
self.common_stream_op(stream_id, VIRTIO_SND_R_PCM_RELEASE)
}
/// Send the START_STREAM command to the server.
pub fn start_stream(&self, stream_id: u32) -> Result<()> {
self.common_stream_op(stream_id, VIRTIO_SND_R_PCM_START)
}
/// Send the STOP_STREAM command to the server.
pub fn stop_stream(&self, stream_id: u32) -> Result<()> {
self.common_stream_op(stream_id, VIRTIO_SND_R_PCM_STOP)
}
/// Send audio frames to the server. Blocks the calling thread until the server acknowledges
/// the data.
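///
/// A playback sketch (assumes a period worth of frames in `pcm_data` and that
/// `VolatileSlice::copy_from` is available):
///
/// ```ignore
/// let pcm_data = vec![0u8; 2048];
/// let (latency_bytes, ()) = client.inject_audio_data(0, pcm_data.len(), |slice| {
///     slice.copy_from(&pcm_data);
/// })?;
/// ```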
pub fn inject_audio_data<R, Cb: FnOnce(VolatileSlice) -> R>(
&self,
stream_id: u32,
size: usize,
callback: Cb,
) -> Result<(u32, R)> {
if self
.streams
.get(stream_id as usize)
.ok_or(Error::InvalidStreamId(stream_id))?
.direction
!= VIRTIO_SND_D_OUTPUT
{
return Err(Error::WrongDirection(VIRTIO_SND_D_OUTPUT));
}
let dst_offset = self.tx.allocate_buffer(size)?;
let buffer_slice = self.tx.buffer_at(dst_offset, size)?;
let ret = callback(buffer_slice);
// Register to receive the status before sending the buffer to the server
let (sender, receiver): (Sender<BufferReleaseMsg>, Receiver<BufferReleaseMsg>) = channel();
self.tx_subscribers.lock().insert(dst_offset, sender);
self.tx.send_buffer(stream_id, dst_offset, size)?;
let (_, latency) = await_status(receiver)?;
Ok((latency, ret))
}
/// Request audio frames from the server. It blocks until the data is available.
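///
/// A capture sketch (copies the received frames into a local buffer; assumes
/// `VolatileSlice::copy_to` is available):
///
/// ```ignore
/// let mut captured = vec![0u8; 2048];
/// let (latency_bytes, ()) = client.request_audio_data(1, captured.len(), |slice| {
///     slice.copy_to(&mut captured);
/// })?;
/// ```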
pub fn request_audio_data<R, Cb: FnOnce(&VolatileSlice) -> R>(
&self,
stream_id: u32,
size: usize,
callback: Cb,
) -> Result<(u32, R)> {
if self
.streams
.get(stream_id as usize)
.ok_or(Error::InvalidStreamId(stream_id))?
.direction
!= VIRTIO_SND_D_INPUT
{
return Err(Error::WrongDirection(VIRTIO_SND_D_INPUT));
}
let src_offset = self.rx.allocate_buffer(size)?;
// Register to receive the status before sending the buffer to the server
let (sender, receiver): (Sender<BufferReleaseMsg>, Receiver<BufferReleaseMsg>) = channel();
self.rx_subscribers.lock().insert(src_offset, sender);
self.rx.send_buffer(stream_id, src_offset, size)?;
// Make sure no mutexes are held while waiting for the buffer to be written to
let (recv_size, latency) = await_status(receiver)?;
let buffer_slice = self.rx.buffer_at(src_offset, recv_size)?;
Ok((latency, callback(&buffer_slice)))
}
/// Get a list of file descriptors used by the implementation.
pub fn keep_rds(&self) -> Vec<RawDescriptor> {
let control_desc = self.control_socket.lock().as_raw_descriptor();
let event_desc = self.event_socket.as_raw_descriptor();
let recv_event = self.recv_event.as_raw_descriptor();
let event_notifier = self.event_notifier.as_raw_descriptor();
let mut ret = vec![control_desc, event_desc, recv_event, event_notifier];
ret.append(&mut self.tx.keep_rds());
ret.append(&mut self.rx.keep_rds());
ret
}
fn common_stream_op(&self, stream_id: u32, op: u32) -> Result<()> {
self.streams
.get(stream_id as usize)
.ok_or(Error::InvalidStreamId(stream_id))?;
let msg = virtio_snd_pcm_hdr {
hdr: virtio_snd_hdr { code: op.into() },
stream_id: stream_id.into(),
};
let control_socket_lock = self.control_socket.lock();
send_cmd(&control_socket_lock, msg)
}
fn request_and_cache_info(&mut self) -> Result<()> {
self.request_and_cache_jacks_info()?;
self.request_and_cache_streams_info()?;
self.request_and_cache_chmaps_info()?;
Ok(())
}
fn request_info<T: AsBytes + FromBytes + Default + Copy + Clone>(
&self,
req_code: u32,
count: usize,
) -> Result<Vec<T>> {
let info_size = std::mem::size_of::<T>();
let status_size = std::mem::size_of::<virtio_snd_hdr>();
let req = virtio_snd_query_info {
hdr: virtio_snd_hdr {
code: req_code.into(),
},
start_id: 0u32.into(),
count: (count as u32).into(),
size: (std::mem::size_of::<virtio_snd_query_info>() as u32).into(),
};
let control_socket_lock = self.control_socket.lock();
seq_socket_send(&control_socket_lock, req)?;
let reply = control_socket_lock
.recv_as_vec()
.map_err(Error::ServerIOError)?;
let mut status: virtio_snd_hdr = Default::default();
status
.as_bytes_mut()
.copy_from_slice(&reply[0..status_size]);
if status.code.to_native() != VIRTIO_SND_S_OK {
return Err(Error::CommandFailed(status.code.to_native()));
}
if reply.len() != status_size + count * info_size {
return Err(Error::ProtocolError(
ProtocolErrorKind::UnexpectedMessageSize(status_size + count * info_size, reply.len()),
));
}
Ok(reply[status_size..]
.chunks(info_size)
.map(|info_buffer| {
let mut info: T = Default::default();
// Need to use copy_from_slice instead of T::from_slice because the info_buffer may
// not be aligned correctly
info.as_bytes_mut().copy_from_slice(info_buffer);
info
})
.collect())
}
fn request_and_cache_jacks_info(&mut self) -> Result<()> {
let num_jacks = self.config.jacks as usize;
if num_jacks == 0 {
return Ok(());
}
self.jacks = self.request_info(VIRTIO_SND_R_JACK_INFO, num_jacks)?;
Ok(())
}
fn request_and_cache_streams_info(&mut self) -> Result<()> {
let num_streams = self.config.streams as usize;
if num_streams == 0 {
return Ok(());
}
self.streams = self.request_info(VIRTIO_SND_R_PCM_INFO, num_streams)?;
Ok(())
}
fn request_and_cache_chmaps_info(&mut self) -> Result<()> {
let num_chmaps = self.config.chmaps as usize;
if num_chmaps == 0 {
return Ok(());
}
self.chmaps = self.request_info(VIRTIO_SND_R_CHMAP_INFO, num_chmaps)?;
Ok(())
}
}
impl Drop for VioSClient {
fn drop(&mut self) {
if let Err(e) = self.stop_bg_thread() {
error!("Error stopping Recv thread: {}", e);
}
}
}
#[derive(Clone, Copy)]
struct ThreadFlags {
reporting_events: bool,
}
#[derive(EventToken)]
enum Token {
Notification,
TxBufferMsg,
RxBufferMsg,
EventMsg,
}
fn recv_buffer_status_msg(
socket: &UnixSeqpacket,
subscribers: &Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>>,
) -> Result<()> {
let mut msg: IoStatusMsg = Default::default();
let size = socket
.recv(msg.as_bytes_mut())
.map_err(Error::ServerIOError)?;
if size != std::mem::size_of::<IoStatusMsg>() {
return Err(Error::ProtocolError(
ProtocolErrorKind::UnexpectedMessageSize(std::mem::size_of::<IoStatusMsg>(), size),
));
}
let mut status = msg.status.status.into();
if status == u32::MAX {
// Anyone waiting for this would continue to wait for as long as status is
// u32::MAX
status -= 1;
}
let latency = msg.status.latency_bytes.into();
let offset = msg.buffer_offset as usize;
let consumed_len = msg.consumed_len as usize;
let promise_opt = subscribers.lock().remove(&offset);
match promise_opt {
None => error!(
"Received an unexpected buffer status message: {}. This is a BUG!!",
offset
),
Some(sender) => {
if let Err(e) = sender.send(BufferReleaseMsg {
status,
latency,
consumed_len,
}) {
error!("Failed to notify waiting thread: {:?}", e);
}
}
}
Ok(())
}
fn recv_event(socket: &UnixSeqpacket) -> Result<virtio_snd_event> {
let mut msg: virtio_snd_event = Default::default();
let size = socket
.recv(msg.as_bytes_mut())
.map_err(Error::ServerIOError)?;
if size != std::mem::size_of::<virtio_snd_event>() {
return Err(Error::ProtocolError(
ProtocolErrorKind::UnexpectedMessageSize(std::mem::size_of::<virtio_snd_event>(), size),
));
}
Ok(msg)
}
fn spawn_recv_thread(
tx_subscribers: Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>>,
rx_subscribers: Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>>,
event_notifier: Event,
event_queue: Arc<Mutex<VecDeque<virtio_snd_event>>>,
event: Event,
state: Arc<Mutex<ThreadFlags>>,
tx_socket: UnixSeqpacket,
rx_socket: UnixSeqpacket,
event_socket: UnixSeqpacket,
) -> JoinHandle<Result<()>> {
std::thread::spawn(move || {
let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
(&tx_socket, Token::TxBufferMsg),
(&rx_socket, Token::RxBufferMsg),
(&event_socket, Token::EventMsg),
(&event, Token::Notification),
])
.map_err(Error::WaitContextCreateError)?;
let mut running = true;
while running {
let events = wait_ctx.wait().map_err(Error::WaitError)?;
for evt in events {
match evt.token {
Token::TxBufferMsg => recv_buffer_status_msg(&tx_socket, &tx_subscribers)?,
Token::RxBufferMsg => recv_buffer_status_msg(&rx_socket, &rx_subscribers)?,
Token::EventMsg => {
let evt = recv_event(&event_socket)?;
let state_cpy = *state.lock();
if state_cpy.reporting_events {
event_queue.lock().push_back(evt);
event_notifier.signal().map_err(Error::EventWriteError)?;
} // else just drop the events
}
Token::Notification => {
// Just consume the notification and check for termination on the next
// iteration
if let Err(e) = event.wait() {
error!("Failed to consume notification from recv thread: {:?}", e);
}
running = false;
}
}
}
}
Ok(())
})
}
fn await_status(promise: Receiver<BufferReleaseMsg>) -> Result<(usize, u32)> {
let BufferReleaseMsg {
status,
latency,
consumed_len,
} = promise.recv().map_err(Error::BufferStatusSenderLost)?;
if status == VIRTIO_SND_S_OK {
Ok((consumed_len, latency))
} else {
Err(Error::IOBufferError(status))
}
}
struct IoBufferQueue {
socket: UnixSeqpacket,
file: File,
mmap: MemoryMapping,
size: usize,
next: Mutex<usize>,
}
impl IoBufferQueue {
fn new(socket: UnixSeqpacket, mut file: File) -> Result<IoBufferQueue> {
let size = file.seek(SeekFrom::End(0)).map_err(Error::FileSizeError)? as usize;
let mmap = MemoryMappingBuilder::new(size)
.from_file(&file)
.build()
.map_err(Error::ServerMmapError)?;
Ok(IoBufferQueue {
socket,
file,
mmap,
size,
next: Mutex::new(0),
})
}
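// Reserves `size` bytes in the shared memory area using a simple bump allocator: the new
// buffer starts at `next` if it fits, otherwise the allocation wraps back to the beginning
// of the area.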
fn allocate_buffer(&self, size: usize) -> Result<usize> {
if size > self.size {
return Err(Error::OutOfSpace);
}
let mut next_lock = self.next.lock();
let offset = if size > self.size - *next_lock {
// Can't fit the new buffer at the end of the area, so put it at the beginning
0
} else {
*next_lock
};
*next_lock = offset + size;
Ok(offset)
}
fn buffer_at(&self, offset: usize, len: usize) -> Result<VolatileSlice> {
self.mmap
.get_slice(offset, len)
.map_err(Error::VolatileMemoryError)
}
fn try_clone_socket(&self) -> Result<UnixSeqpacket> {
self.socket
.try_clone()
.map_err(Error::UnixSeqpacketDupError)
}
fn send_buffer(&self, stream_id: u32, offset: usize, size: usize) -> Result<()> {
let msg = IoTransferMsg::new(stream_id, offset, size);
seq_socket_send(&self.socket, msg)
}
fn keep_rds(&self) -> Vec<RawDescriptor> {
vec![
self.file.as_raw_descriptor(),
self.socket.as_raw_descriptor(),
]
}
}
/// Groups the parameters used to configure a stream prior to using it.
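///
/// The `format` and `rate` fields take the VIRTIO_SND_PCM_FMT_* and VIRTIO_SND_PCM_RATE_*
/// codes from the virtio-snd spec rather than raw bit widths or sample rates.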
pub struct VioSStreamParams {
pub buffer_bytes: u32,
pub period_bytes: u32,
pub features: u32,
pub channels: u8,
pub format: u8,
pub rate: u8,
}
impl From<(u32, VioSStreamParams)> for virtio_snd_pcm_set_params {
fn from(val: (u32, VioSStreamParams)) -> Self {
virtio_snd_pcm_set_params {
hdr: virtio_snd_pcm_hdr {
hdr: virtio_snd_hdr {
code: VIRTIO_SND_R_PCM_SET_PARAMS.into(),
},
stream_id: val.0.into(),
},
buffer_bytes: val.1.buffer_bytes.into(),
period_bytes: val.1.period_bytes.into(),
features: val.1.features.into(),
channels: val.1.channels,
format: val.1.format,
rate: val.1.rate,
padding: 0u8,
}
}
}
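// Sends a command to the server over the control socket and blocks until it replies with a
// virtio_snd_hdr status.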
fn send_cmd<T: AsBytes>(control_socket: &UnixSeqpacket, data: T) -> Result<()> {
seq_socket_send(control_socket, data)?;
recv_cmd_status(control_socket)
}
fn recv_cmd_status(control_socket: &UnixSeqpacket) -> Result<()> {
let mut status: virtio_snd_hdr = Default::default();
control_socket
.recv(status.as_bytes_mut())
.map_err(Error::ServerIOError)?;
if status.code.to_native() == VIRTIO_SND_S_OK {
Ok(())
} else {
Err(Error::CommandFailed(status.code.to_native()))
}
}
fn seq_socket_send<T: AsBytes>(socket: &UnixSeqpacket, data: T) -> Result<()> {
loop {
let send_res = socket.send(data.as_bytes());
if let Err(e) = send_res {
match e.kind() {
// Retry if interrupted
IOErrorKind::Interrupted => continue,
_ => return Err(Error::ServerIOError(e)),
}
}
// Success
break;
}
Ok(())
}
const VIOS_VERSION: u32 = 2;
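// Configuration sent by the server during the initial handshake, along with the five file
// descriptors for the tx/rx shared memory files and the tx/rx/event sockets.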
#[repr(C)]
#[derive(Copy, Clone, Default, AsBytes, FromBytes)]
struct VioSConfig {
version: u32,
jacks: u32,
streams: u32,
chmaps: u32,
}
struct BufferReleaseMsg {
status: u32,
latency: u32,
consumed_len: usize,
}
#[repr(C)]
#[derive(Copy, Clone, AsBytes, FromBytes)]
struct IoTransferMsg {
io_xfer: virtio_snd_pcm_xfer,
buffer_offset: u32,
buffer_len: u32,
}
impl IoTransferMsg {
fn new(stream_id: u32, buffer_offset: usize, buffer_len: usize) -> IoTransferMsg {
IoTransferMsg {
io_xfer: virtio_snd_pcm_xfer {
stream_id: stream_id.into(),
},
buffer_offset: buffer_offset as u32,
buffer_len: buffer_len as u32,
}
}
}
#[repr(C)]
#[derive(Copy, Clone, Default, AsBytes, FromBytes)]
struct IoStatusMsg {
status: virtio_snd_pcm_status,
buffer_offset: u32,
consumed_len: u32,
}