blob: b11626fdeae5aa3188934c220ff03f7924fcf2fb [file] [log] [blame]
// Copyright 2019 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::error;
use std::fmt;
use std::os::unix::io::RawFd;
use std::sync::Arc;
use std::time::{Duration, Instant};
use sync::{Condvar, Mutex};
use sys_util::SharedMemory;
use crate::{BoxError, SampleFormat, StreamDirection, StreamEffect};
/// Result alias used by every fallible function in this file; the error type is the
/// crate-wide `BoxError`.
type GenericResult<T> = std::result::Result<T, BoxError>;
/// `BufferSet` is used as a callback mechanism for `ServerRequest` objects.
/// It is meant to be implemented by the audio stream, allowing arbitrary code
/// to be run after a buffer offset and length is set.
///
/// A `ServerRequest` holds a mutable reference to the implementor and invokes
/// exactly one of the two methods when the request is consumed.
pub trait BufferSet {
/// Called when the client sets a buffer offset and length.
///
/// `offset` is the offset within shared memory of the buffer and `frames`
/// indicates the number of audio frames that can be read from or written to
/// the buffer.
///
/// # Errors
///
/// Whatever error the implementor reports; it is handed back unchanged to
/// the caller of `ServerRequest::set_buffer_offset_and_frames`.
fn callback(&mut self, offset: usize, frames: usize) -> GenericResult<()>;
/// Called when the client ignores a request from the server.
///
/// # Errors
///
/// Whatever error the implementor reports; it is handed back unchanged to
/// the caller of `ServerRequest::ignore_request`.
fn ignore(&mut self) -> GenericResult<()>;
}
/// Errors produced when a client responds to a [`ServerRequest`].
#[derive(Debug)]
pub enum Error {
    /// The client offered more frames than the server asked for.
    ///
    /// The first field is the number of frames provided, the second is the
    /// number of frames that were requested.
    TooManyFrames(usize, usize),
}

impl error::Error for Error {}

impl fmt::Display for Error {
    /// Renders a human-readable description of the error.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match *self {
            Error::TooManyFrames(provided, requested) => f.write_fmt(format_args!(
                "Provided number of frames {} exceeds requested number of frames {}",
                provided, requested
            )),
        }
    }
}
/// `ServerRequest` represents an active request from the server for the client
/// to provide a buffer in shared memory to playback from or capture to.
///
/// The request borrows the stream mutably for `'a`, so it must be answered or
/// dropped before the stream can be used again.
pub struct ServerRequest<'a> {
/// Upper bound on the number of frames the client may supply in its response.
requested_frames: usize,
/// Receiver of the `callback`/`ignore` notification when the request is consumed.
buffer_set: &'a mut dyn BufferSet,
}
impl<'a> ServerRequest<'a> {
/// Create a new ServerRequest object
///
/// Create a ServerRequest object representing a request from the server
/// for a buffer `requested_frames` in size.
///
/// When the client responds to this request by calling
/// [`set_buffer_offset_and_frames`](ServerRequest::set_buffer_offset_and_frames),
/// BufferSet::callback will be called on `buffer_set`.
///
/// # Arguments
/// * `requested_frames` - The requested buffer size in frames.
/// * `buffer_set` - The object implementing the callback for when a buffer is provided.
pub fn new<D: BufferSet>(requested_frames: usize, buffer_set: &'a mut D) -> Self {
Self {
requested_frames,
buffer_set,
}
}
/// Get the number of frames of audio data requested by the server.
///
/// The returned value should never be greater than the `buffer_size`
/// given in [`new_stream`](ShmStreamSource::new_stream).
pub fn requested_frames(&self) -> usize {
self.requested_frames
}
/// Sets the buffer offset and length for the requested buffer.
///
/// Sets the buffer offset and length of the buffer that fulfills this
/// server request to `offset` and `length`, respectively. This means that
/// `length` bytes of audio samples may be read from/written to that
/// location in `client_shm` for a playback/capture stream, respectively.
/// This function may only be called once for a `ServerRequest`, at which
/// point the ServerRequest is dropped and no further calls are possible.
///
/// # Arguments
///
/// * `offset` - The value to use as the new buffer offset for the next buffer.
/// * `frames` - The length of the next buffer in frames.
///
/// # Errors
///
/// * If `frames` is greater than `requested_frames`.
pub fn set_buffer_offset_and_frames(self, offset: usize, frames: usize) -> GenericResult<()> {
if frames > self.requested_frames {
return Err(Box::new(Error::TooManyFrames(
frames,
self.requested_frames,
)));
}
self.buffer_set.callback(offset, frames)
}
/// Ignore this request
///
/// If the client does not intend to respond to this ServerRequest with a
/// buffer, they should call this function. The stream will be notified that
/// the request has been ignored and will handle it properly.
pub fn ignore_request(self) -> GenericResult<()> {
self.buffer_set.ignore()
}
}
/// `ShmStream` allows a client to interact with an active CRAS stream.
pub trait ShmStream: Send {
/// Get the size of a frame of audio data for this stream.
///
/// The implementations in this file compute it as bytes per sample times
/// the number of channels.
fn frame_size(&self) -> usize;
/// Get the number of channels of audio data for this stream.
fn num_channels(&self) -> usize;
/// Get the frame rate of audio data for this stream.
fn frame_rate(&self) -> u32;
/// Waits until the next server message indicating action is required.
///
/// For playback streams, this will be `AUDIO_MESSAGE_REQUEST_DATA`, meaning
/// that we must set the buffer offset to the next location where playback
/// data can be found.
/// For capture streams, this will be `AUDIO_MESSAGE_DATA_READY`, meaning
/// that we must set the buffer offset to the next location where captured
/// data can be written to.
/// Will return early if `timeout` elapses before a message is received.
///
/// # Arguments
///
/// * `timeout` - The amount of time to wait until a message is received.
///
/// # Return value
///
/// Returns `Some(request)` where `request` is a
/// [`ServerRequest`](ServerRequest) which can be used to get the
/// number of frames requested for playback streams or that have already been
/// written to shm for capture streams. The request mutably borrows the
/// stream until it is answered, ignored or dropped.
///
/// If the timeout occurs before a message is received, returns `None`.
///
/// # Errors
///
/// * If an invalid message type is received for the stream.
fn wait_for_next_action_with_timeout(
&mut self,
timeout: Duration,
) -> GenericResult<Option<ServerRequest>>;
}
/// `ShmStreamSource` is a factory for playback and capture audio streams.
pub trait ShmStreamSource: Send {
    /// Opens a new [`ShmStream`](ShmStream).
    ///
    /// The returned `ShmStream` lets the caller:
    /// * wait until the server has communicated that data is ready or has
    ///   requested that more data be made available, and
    /// * set the location and length of buffers for reading/writing audio data.
    ///
    /// # Arguments
    ///
    /// * `direction` - The direction of the stream, either `Playback` or `Capture`.
    /// * `num_channels` - The number of audio channels for the stream.
    /// * `format` - The audio format to use for audio samples.
    /// * `frame_rate` - The stream's frame rate in Hz.
    /// * `buffer_size` - The maximum size of an audio buffer. This will be the
    ///                   size used for transfers of audio data between client
    ///                   and server.
    /// * `effects` - Audio effects to use for the stream, such as echo-cancellation.
    /// * `client_shm` - The shared memory area that will contain samples.
    /// * `buffer_offsets` - The two initial values to use as buffer offsets
    ///                      for streams. This way, the server will not write
    ///                      audio data to an arbitrary offset in `client_shm`
    ///                      if the client fails to update offsets in time.
    ///
    /// # Errors
    ///
    /// * If sending the connect stream message to the server fails.
    #[allow(clippy::too_many_arguments)]
    fn new_stream(
        &mut self,
        direction: StreamDirection,
        num_channels: usize,
        format: SampleFormat,
        frame_rate: u32,
        buffer_size: usize,
        effects: &[StreamEffect],
        client_shm: &SharedMemory,
        buffer_offsets: [u64; 2],
    ) -> GenericResult<Box<dyn ShmStream>>;

    /// Lists the file descriptors this implementation needs to keep open.
    ///
    /// Users of the `ShmStreamSource` can consult this list when entering a
    /// Linux jail so that required descriptors are not closed. The default
    /// implementation reports none.
    fn keep_fds(&self) -> Vec<RawFd> {
        vec![]
    }
}
/// Class that implements ShmStream trait but does nothing with the samples
///
/// Requests are paced against wall-clock time: one request becomes due every
/// `interval`, measured from `start_time`.
pub struct NullShmStream {
/// Number of audio channels reported by `num_channels()`.
num_channels: usize,
/// Frame rate reported by `frame_rate()`.
frame_rate: u32,
/// Number of frames asked for in every `ServerRequest`.
buffer_size: usize,
/// `format.sample_bytes() * num_channels`, reported by `frame_size()`.
frame_size: usize,
/// Time between two consecutive requests.
interval: Duration,
/// Time since `start_time` at which the next request becomes due.
next_frame: Duration,
/// Instant at which the stream was created.
start_time: Instant,
}
impl NullShmStream {
    /// Creates a `NullShmStream` that issues one request per buffer period.
    ///
    /// The period is the time `buffer_size` frames last at `frame_rate`,
    /// computed with nanosecond resolution. Rounding down to whole
    /// milliseconds would make the stream consume data faster than
    /// `frame_rate`, and would collapse the period to zero (so the stream
    /// would never block) whenever one buffer lasts less than a millisecond.
    ///
    /// # Arguments
    ///
    /// * `buffer_size` - Number of frames requested on every wakeup.
    /// * `num_channels` - Number of audio channels.
    /// * `format` - Sample format; `format.sample_bytes() * num_channels` is
    ///              used as the frame size.
    /// * `frame_rate` - Frame rate used to derive the request period.
    ///
    /// # Panics
    ///
    /// Panics if `frame_rate` is zero (division by zero).
    pub fn new(
        buffer_size: usize,
        num_channels: usize,
        format: SampleFormat,
        frame_rate: u32,
    ) -> Self {
        let frames = buffer_size as u64;
        let rate = u64::from(frame_rate);
        // Split into whole seconds plus a sub-second remainder so that the
        // multiplication by 1e9 cannot overflow: `frames % rate` is always
        // smaller than `u32::MAX`, and the resulting quotient is below 1e9.
        let whole_secs = frames / rate;
        let subsec_nanos = (frames % rate) * 1_000_000_000 / rate;
        let interval = Duration::new(whole_secs, subsec_nanos as u32);
        Self {
            num_channels,
            frame_rate,
            buffer_size,
            frame_size: format.sample_bytes() * num_channels,
            interval,
            // The first request becomes due one full period after creation.
            next_frame: interval,
            start_time: Instant::now(),
        }
    }
}
/// `NullShmStream` keeps no buffer state, so both notifications succeed
/// without doing anything.
impl BufferSet for NullShmStream {
// The supplied offset and frame count are deliberately ignored.
fn callback(&mut self, _offset: usize, _frames: usize) -> GenericResult<()> {
Ok(())
}
// Nothing has to be undone when a request is ignored.
fn ignore(&mut self) -> GenericResult<()> {
Ok(())
}
}
impl ShmStream for NullShmStream {
    /// Size of one frame: bytes per sample times the number of channels.
    fn frame_size(&self) -> usize {
        self.frame_size
    }

    /// Number of channels the stream was created with.
    fn num_channels(&self) -> usize {
        self.num_channels
    }

    /// Frame rate the stream was created with.
    fn frame_rate(&self) -> u32 {
        self.frame_rate
    }

    /// Sleeps until the next request is due, or until `timeout` has passed.
    ///
    /// Returns `Ok(None)` after sleeping for `timeout` when the next request
    /// is further away than `timeout`. Otherwise sleeps for the time left (if
    /// any), schedules the following request one `interval` later and returns
    /// a request for `buffer_size` frames. This implementation never returns
    /// an error.
    fn wait_for_next_action_with_timeout(
        &mut self,
        timeout: Duration,
    ) -> GenericResult<Option<ServerRequest>> {
        let since_start = self.start_time.elapsed();
        if since_start < self.next_frame {
            let until_due = self.next_frame - since_start;
            if timeout < until_due {
                // The request is not due within the caller's budget.
                std::thread::sleep(timeout);
                return Ok(None);
            }
            std::thread::sleep(until_due);
        }
        self.next_frame += self.interval;
        let frames = self.buffer_size;
        Ok(Some(ServerRequest::new(frames, self)))
    }
}
/// Factory for `NullShmStream` objects; it carries no state of its own.
#[derive(Default)]
pub struct NullShmStreamSource;

impl NullShmStreamSource {
    /// Creates a new source. Equivalent to `NullShmStreamSource::default()`.
    pub fn new() -> Self {
        NullShmStreamSource
    }
}
impl ShmStreamSource for NullShmStreamSource {
    /// Builds a boxed `NullShmStream` from the stream parameters.
    ///
    /// The direction, effects, shared memory and initial buffer offsets are
    /// not used. This implementation never returns an error.
    fn new_stream(
        &mut self,
        _direction: StreamDirection,
        num_channels: usize,
        format: SampleFormat,
        frame_rate: u32,
        buffer_size: usize,
        _effects: &[StreamEffect],
        _client_shm: &SharedMemory,
        _buffer_offsets: [u64; 2],
    ) -> GenericResult<Box<dyn ShmStream>> {
        Ok(Box::new(NullShmStream::new(
            buffer_size,
            num_channels,
            format,
            frame_rate,
        )))
    }
}
/// Stream whose requests are triggered manually through
/// `trigger_callback_with_timeout` instead of by a server.
///
/// Clones share the same `request_notifier`, so one clone can trigger a
/// request while another waits for it.
#[derive(Clone)]
pub struct MockShmStream {
/// Number of audio channels reported by `num_channels()`.
num_channels: usize,
/// Frame rate reported by `frame_rate()`.
frame_rate: u32,
/// Number of frames asked for in every `ServerRequest`.
request_size: usize,
/// `format.sample_bytes() * num_channels`, reported by `frame_size()`.
frame_size: usize,
/// Flag plus condition variable; the flag is `true` while a triggered
/// request has not been answered yet.
request_notifier: Arc<(Mutex<bool>, Condvar)>,
}
impl MockShmStream {
/// Attempt to create a new MockShmStream with the given number of
/// channels, frame_rate, format, and buffer_size.
pub fn new(
num_channels: usize,
frame_rate: u32,
format: SampleFormat,
buffer_size: usize,
) -> Self {
Self {
num_channels,
frame_rate,
request_size: buffer_size,
frame_size: format.sample_bytes() * num_channels,
request_notifier: Arc::new((Mutex::new(false), Condvar::new())),
}
}
/// Call to request data from the stream, causing it to return from
/// `wait_for_next_action_with_timeout`. Will block until
/// `set_buffer_offset_and_frames` is called on the ServerRequest returned
/// from `wait_for_next_action_with_timeout`, or until `timeout` elapses.
/// Returns true if a response was successfully received.
pub fn trigger_callback_with_timeout(&mut self, timeout: Duration) -> bool {
let &(ref lock, ref cvar) = &*self.request_notifier;
let mut requested = lock.lock();
*requested = true;
cvar.notify_one();
let start_time = Instant::now();
while *requested {
requested = cvar.wait_timeout(requested, timeout).0;
if start_time.elapsed() > timeout {
// We failed to get a callback in time, mark this as false.
*requested = false;
return false;
}
}
true
}
fn notify_request(&mut self) {
let &(ref lock, ref cvar) = &*self.request_notifier;
let mut requested = lock.lock();
*requested = false;
cvar.notify_one();
}
}
/// Answering and ignoring a request are treated the same way: both clear the
/// pending-request flag and wake the thread that triggered the request.
impl BufferSet for MockShmStream {
// The supplied offset and frame count are not recorded.
fn callback(&mut self, _offset: usize, _frames: usize) -> GenericResult<()> {
self.notify_request();
Ok(())
}
// An ignored request still counts as a response for the triggering thread.
fn ignore(&mut self) -> GenericResult<()> {
self.notify_request();
Ok(())
}
}
impl ShmStream for MockShmStream {
    /// Size of one frame: bytes per sample times the number of channels.
    fn frame_size(&self) -> usize {
        self.frame_size
    }

    /// Number of channels the stream was created with.
    fn num_channels(&self) -> usize {
        self.num_channels
    }

    /// Frame rate the stream was created with.
    fn frame_rate(&self) -> u32 {
        self.frame_rate
    }

    /// Blocks until `trigger_callback_with_timeout` is called on this stream
    /// (or a clone of it), or until `timeout` has elapsed.
    ///
    /// Returns `Ok(Some(request))` for `request_size` frames once a request
    /// has been triggered and `Ok(None)` if the timeout expires first. This
    /// implementation never returns an error.
    fn wait_for_next_action_with_timeout(
        &mut self,
        timeout: Duration,
    ) -> GenericResult<Option<ServerRequest>> {
        {
            let start_time = Instant::now();
            let (lock, cvar) = &*self.request_notifier;
            let mut requested = lock.lock();
            // The flag is re-checked before the clock on every pass, so a
            // request that arrives right at the deadline is still delivered.
            while !*requested {
                // Wait only for the remaining budget so that spurious wakeups
                // cannot stretch the total wait beyond `timeout`.
                let remaining = match timeout.checked_sub(start_time.elapsed()) {
                    Some(remaining) => remaining,
                    None => return Ok(None),
                };
                requested = cvar.wait_timeout(requested, remaining).0;
            }
            // The guard is released here, before `self` is borrowed mutably
            // by the returned request.
        }
        Ok(Some(ServerRequest::new(self.request_size, self)))
    }
}
/// Source of `MockShmStream` objects.
///
/// Clones share the same `last_stream` slot, so a stream created through one
/// clone can be retrieved through another.
#[derive(Clone, Default)]
pub struct MockShmStreamSource {
/// Most recently created stream, plus the condition variable that is
/// notified whenever a new stream is stored.
last_stream: Arc<(Mutex<Option<MockShmStream>>, Condvar)>,
}
impl MockShmStreamSource {
pub fn new() -> Self {
Default::default()
}
/// Get the last stream that has been created from this source. If no stream
/// has been created, block until one has.
pub fn get_last_stream(&self) -> MockShmStream {
let &(ref last_stream, ref cvar) = &*self.last_stream;
let mut stream = last_stream.lock();
loop {
match &*stream {
None => stream = cvar.wait(stream),
Some(ref s) => return s.clone(),
};
}
}
}
impl ShmStreamSource for MockShmStreamSource {
    /// Creates a `MockShmStream`, remembers a clone of it as the last stream
    /// and wakes one thread blocked in `get_last_stream`.
    ///
    /// The direction, effects, shared memory and initial buffer offsets are
    /// not used. This implementation never returns an error.
    fn new_stream(
        &mut self,
        _direction: StreamDirection,
        num_channels: usize,
        format: SampleFormat,
        frame_rate: u32,
        buffer_size: usize,
        _effects: &[StreamEffect],
        _client_shm: &SharedMemory,
        _buffer_offsets: [u64; 2],
    ) -> GenericResult<Box<dyn ShmStream>> {
        let (slot, stream_created) = &*self.last_stream;
        // The lock is held for the rest of the function, covering both the
        // store and the notification.
        let mut guard = slot.lock();
        let created = MockShmStream::new(num_channels, frame_rate, format, buffer_size);
        *guard = Some(created.clone());
        stream_created.notify_one();
        Ok(Box::new(created))
    }
}
// Unit tests for the mock and null stream implementations.
#[cfg(test)]
pub mod tests {
use super::*;
// Checks that a request triggered from the main thread is delivered to a
// stream waiting on another thread, and that answering it unblocks the trigger.
#[test]
fn mock_trigger_callback() {
let stream_source = MockShmStreamSource::new();
let mut thread_stream_source = stream_source.clone();
let buffer_size = 480;
let num_channels = 2;
let format = SampleFormat::S24LE;
let shm = SharedMemory::anon().expect("Failed to create shm");
// Consumer thread: creates the stream, waits for one request and answers it
// with the full requested size. Yields the number of requested frames, or 0
// if the wait timed out.
let handle = std::thread::spawn(move || {
let mut stream = thread_stream_source
.new_stream(
StreamDirection::Playback,
num_channels,
format,
44100,
buffer_size,
&[],
&shm,
[400, 8000],
)
.expect("Failed to create stream");
let request = stream
.wait_for_next_action_with_timeout(Duration::from_secs(5))
.expect("Failed to wait for next action");
match request {
Some(r) => {
let requested = r.requested_frames();
r.set_buffer_offset_and_frames(872, requested)
.expect("Failed to set buffer offset and frames");
requested
}
None => 0,
}
});
// Blocks until the consumer thread has created its stream, then triggers a
// request through the shared clone.
let mut stream = stream_source.get_last_stream();
assert!(stream.trigger_callback_with_timeout(Duration::from_secs(1)));
let requested_frames = handle.join().expect("Failed to join thread");
assert_eq!(requested_frames, buffer_size);
}
// Checks that the null stream paces its requests: obtaining two requests must
// take longer than one buffer interval but less than the timeout.
#[test]
fn null_consumption_rate() {
let frame_rate = 44100;
let buffer_size = 480;
// Lower bound on the time one buffer lasts, rounded down to whole milliseconds.
let interval = Duration::from_millis(buffer_size as u64 * 1000 / frame_rate as u64);
let shm = SharedMemory::anon().expect("Failed to create shm");
let start = Instant::now();
let mut stream_source = NullShmStreamSource::new();
let mut stream = stream_source
.new_stream(
StreamDirection::Playback,
2,
SampleFormat::S24LE,
frame_rate,
buffer_size,
&[],
&shm,
[400, 8000],
)
.expect("Failed to create stream");
let timeout = Duration::from_secs(5);
let request = stream
.wait_for_next_action_with_timeout(timeout)
.expect("Failed to wait for first request")
.expect("First request should not have timed out");
request
.set_buffer_offset_and_frames(276, 480)
.expect("Failed to set buffer offset and length");
// The second call should block until the first buffer is consumed.
let _request = stream
.wait_for_next_action_with_timeout(timeout)
.expect("Failed to wait for second request");
let elapsed = start.elapsed();
assert!(
elapsed > interval,
"wait_for_next_action_with_timeout didn't block long enough: {:?}",
elapsed
);
assert!(
elapsed < timeout,
"wait_for_next_action_with_timeout blocked for too long: {:?}",
elapsed
);
}
}