| // Copyright 2019 The Chromium OS Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| use std::error; |
| use std::fmt; |
| use std::os::unix::io::RawFd; |
| use std::sync::Arc; |
| use std::time::{Duration, Instant}; |
| |
| use sync::{Condvar, Mutex}; |
| use sys_util::SharedMemory; |
| |
| use crate::{BoxError, SampleFormat, StreamDirection, StreamEffect}; |
| |
| type GenericResult<T> = std::result::Result<T, BoxError>; |
| |
| /// `BufferSet` is used as a callback mechanism for `ServerRequest` objects. |
| /// It is meant to be implemented by the audio stream, allowing arbitrary code |
| /// to be run after a buffer offset and length is set. |
| pub trait BufferSet { |
| /// Called when the client sets a buffer offset and length. |
| /// |
| /// `offset` is the offset within shared memory of the buffer and `frames` |
| /// indicates the number of audio frames that can be read from or written to |
| /// the buffer. |
| fn callback(&mut self, offset: usize, frames: usize) -> GenericResult<()>; |
| |
| /// Called when the client ignores a request from the server. |
| fn ignore(&mut self) -> GenericResult<()>; |
| } |
| |
| #[derive(Debug)] |
| pub enum Error { |
| TooManyFrames(usize, usize), |
| } |
| |
| impl error::Error for Error {} |
| |
| impl fmt::Display for Error { |
| fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
| match self { |
| Error::TooManyFrames(provided, requested) => write!( |
| f, |
| "Provided number of frames {} exceeds requested number of frames {}", |
| provided, requested |
| ), |
| } |
| } |
| } |
| |
| /// `ServerRequest` represents an active request from the server for the client |
| /// to provide a buffer in shared memory to playback from or capture to. |
| pub struct ServerRequest<'a> { |
| requested_frames: usize, |
| buffer_set: &'a mut dyn BufferSet, |
| } |
| |
| impl<'a> ServerRequest<'a> { |
| /// Create a new ServerRequest object |
| /// |
| /// Create a ServerRequest object representing a request from the server |
| /// for a buffer `requested_frames` in size. |
| /// |
| /// When the client responds to this request by calling |
| /// [`set_buffer_offset_and_frames`](ServerRequest::set_buffer_offset_and_frames), |
| /// BufferSet::callback will be called on `buffer_set`. |
| /// |
| /// # Arguments |
| /// * `requested_frames` - The requested buffer size in frames. |
| /// * `buffer_set` - The object implementing the callback for when a buffer is provided. |
| pub fn new<D: BufferSet>(requested_frames: usize, buffer_set: &'a mut D) -> Self { |
| Self { |
| requested_frames, |
| buffer_set, |
| } |
| } |
| |
| /// Get the number of frames of audio data requested by the server. |
| /// |
| /// The returned value should never be greater than the `buffer_size` |
| /// given in [`new_stream`](ShmStreamSource::new_stream). |
| pub fn requested_frames(&self) -> usize { |
| self.requested_frames |
| } |
| |
| /// Sets the buffer offset and length for the requested buffer. |
| /// |
| /// Sets the buffer offset and length of the buffer that fulfills this |
| /// server request to `offset` and `length`, respectively. This means that |
| /// `length` bytes of audio samples may be read from/written to that |
| /// location in `client_shm` for a playback/capture stream, respectively. |
| /// This function may only be called once for a `ServerRequest`, at which |
| /// point the ServerRequest is dropped and no further calls are possible. |
| /// |
| /// # Arguments |
| /// |
| /// * `offset` - The value to use as the new buffer offset for the next buffer. |
| /// * `frames` - The length of the next buffer in frames. |
| /// |
| /// # Errors |
| /// |
| /// * If `frames` is greater than `requested_frames`. |
| pub fn set_buffer_offset_and_frames(self, offset: usize, frames: usize) -> GenericResult<()> { |
| if frames > self.requested_frames { |
| return Err(Box::new(Error::TooManyFrames( |
| frames, |
| self.requested_frames, |
| ))); |
| } |
| |
| self.buffer_set.callback(offset, frames) |
| } |
| |
| /// Ignore this request |
| /// |
| /// If the client does not intend to respond to this ServerRequest with a |
| /// buffer, they should call this function. The stream will be notified that |
| /// the request has been ignored and will handle it properly. |
| pub fn ignore_request(self) -> GenericResult<()> { |
| self.buffer_set.ignore() |
| } |
| } |
| |
| /// `ShmStream` allows a client to interact with an active CRAS stream. |
| pub trait ShmStream: Send { |
| /// Get the size of a frame of audio data for this stream. |
| fn frame_size(&self) -> usize; |
| |
| /// Get the number of channels of audio data for this stream. |
| fn num_channels(&self) -> usize; |
| |
| /// Get the frame rate of audio data for this stream. |
| fn frame_rate(&self) -> u32; |
| |
| /// Waits until the next server message indicating action is required. |
| /// |
| /// For playback streams, this will be `AUDIO_MESSAGE_REQUEST_DATA`, meaning |
| /// that we must set the buffer offset to the next location where playback |
| /// data can be found. |
| /// For capture streams, this will be `AUDIO_MESSAGE_DATA_READY`, meaning |
| /// that we must set the buffer offset to the next location where captured |
| /// data can be written to. |
| /// Will return early if `timeout` elapses before a message is received. |
| /// |
| /// # Arguments |
| /// |
| /// * `timeout` - The amount of time to wait until a message is received. |
| /// |
| /// # Return value |
| /// |
| /// Returns `Some(request)` where `request` is an object that implements the |
| /// [`ServerRequest`](ServerRequest) trait and which can be used to get the |
| /// number of bytes requested for playback streams or that have already been |
| /// written to shm for capture streams. |
| /// |
| /// If the timeout occurs before a message is received, returns `None`. |
| /// |
| /// # Errors |
| /// |
| /// * If an invalid message type is received for the stream. |
| fn wait_for_next_action_with_timeout( |
| &mut self, |
| timeout: Duration, |
| ) -> GenericResult<Option<ServerRequest>>; |
| } |
| |
| /// `ShmStreamSource` creates streams for playback or capture of audio. |
| pub trait ShmStreamSource: Send { |
| /// Creates a new [`ShmStream`](ShmStream) |
| /// |
| /// Creates a new `ShmStream` object, which allows: |
| /// * Waiting until the server has communicated that data is ready or |
| /// requested that we make more data available. |
| /// * Setting the location and length of buffers for reading/writing audio data. |
| /// |
| /// # Arguments |
| /// |
| /// * `direction` - The direction of the stream, either `Playback` or `Capture`. |
| /// * `num_channels` - The number of audio channels for the stream. |
| /// * `format` - The audio format to use for audio samples. |
| /// * `frame_rate` - The stream's frame rate in Hz. |
| /// * `buffer_size` - The maximum size of an audio buffer. This will be the |
| /// size used for transfers of audio data between client |
| /// and server. |
| /// * `effects` - Audio effects to use for the stream, such as echo-cancellation. |
| /// * `client_shm` - The shared memory area that will contain samples. |
| /// * `buffer_offsets` - The two initial values to use as buffer offsets |
| /// for streams. This way, the server will not write |
| /// audio data to an arbitrary offset in `client_shm` |
| /// if the client fails to update offsets in time. |
| /// |
| /// # Errors |
| /// |
| /// * If sending the connect stream message to the server fails. |
| #[allow(clippy::too_many_arguments)] |
| fn new_stream( |
| &mut self, |
| direction: StreamDirection, |
| num_channels: usize, |
| format: SampleFormat, |
| frame_rate: u32, |
| buffer_size: usize, |
| effects: &[StreamEffect], |
| client_shm: &SharedMemory, |
| buffer_offsets: [u64; 2], |
| ) -> GenericResult<Box<dyn ShmStream>>; |
| |
| /// Get a list of file descriptors used by the implementation. |
| /// |
| /// Returns any open file descriptors needed by the implementation. |
| /// This list helps users of the ShmStreamSource enter Linux jails without |
| /// closing needed file descriptors. |
| fn keep_fds(&self) -> Vec<RawFd> { |
| Vec::new() |
| } |
| } |
| |
| /// Class that implements ShmStream trait but does nothing with the samples |
| pub struct NullShmStream { |
| num_channels: usize, |
| frame_rate: u32, |
| buffer_size: usize, |
| frame_size: usize, |
| interval: Duration, |
| next_frame: Duration, |
| start_time: Instant, |
| } |
| |
| impl NullShmStream { |
| /// Attempt to create a new NullShmStream with the given number of channels, |
| /// format, frame_rate, and buffer_size. |
| pub fn new( |
| buffer_size: usize, |
| num_channels: usize, |
| format: SampleFormat, |
| frame_rate: u32, |
| ) -> Self { |
| let interval = Duration::from_millis(buffer_size as u64 * 1000 / frame_rate as u64); |
| Self { |
| num_channels, |
| frame_rate, |
| buffer_size, |
| frame_size: format.sample_bytes() * num_channels, |
| interval, |
| next_frame: interval, |
| start_time: Instant::now(), |
| } |
| } |
| } |
| |
| impl BufferSet for NullShmStream { |
| fn callback(&mut self, _offset: usize, _frames: usize) -> GenericResult<()> { |
| Ok(()) |
| } |
| |
| fn ignore(&mut self) -> GenericResult<()> { |
| Ok(()) |
| } |
| } |
| |
| impl ShmStream for NullShmStream { |
| fn frame_size(&self) -> usize { |
| self.frame_size |
| } |
| |
| fn num_channels(&self) -> usize { |
| self.num_channels |
| } |
| |
| fn frame_rate(&self) -> u32 { |
| self.frame_rate |
| } |
| |
| fn wait_for_next_action_with_timeout( |
| &mut self, |
| timeout: Duration, |
| ) -> GenericResult<Option<ServerRequest>> { |
| let elapsed = self.start_time.elapsed(); |
| if elapsed < self.next_frame { |
| if timeout < self.next_frame - elapsed { |
| std::thread::sleep(timeout); |
| return Ok(None); |
| } else { |
| std::thread::sleep(self.next_frame - elapsed); |
| } |
| } |
| self.next_frame += self.interval; |
| Ok(Some(ServerRequest::new(self.buffer_size, self))) |
| } |
| } |
| |
| /// Source of `NullShmStream` objects. |
| #[derive(Default)] |
| pub struct NullShmStreamSource; |
| |
| impl NullShmStreamSource { |
| pub fn new() -> Self { |
| Self::default() |
| } |
| } |
| |
| impl ShmStreamSource for NullShmStreamSource { |
| fn new_stream( |
| &mut self, |
| _direction: StreamDirection, |
| num_channels: usize, |
| format: SampleFormat, |
| frame_rate: u32, |
| buffer_size: usize, |
| _effects: &[StreamEffect], |
| _client_shm: &SharedMemory, |
| _buffer_offsets: [u64; 2], |
| ) -> GenericResult<Box<dyn ShmStream>> { |
| let new_stream = NullShmStream::new(buffer_size, num_channels, format, frame_rate); |
| Ok(Box::new(new_stream)) |
| } |
| } |
| |
| #[derive(Clone)] |
| pub struct MockShmStream { |
| num_channels: usize, |
| frame_rate: u32, |
| request_size: usize, |
| frame_size: usize, |
| request_notifier: Arc<(Mutex<bool>, Condvar)>, |
| } |
| |
| impl MockShmStream { |
| /// Attempt to create a new MockShmStream with the given number of |
| /// channels, frame_rate, format, and buffer_size. |
| pub fn new( |
| num_channels: usize, |
| frame_rate: u32, |
| format: SampleFormat, |
| buffer_size: usize, |
| ) -> Self { |
| Self { |
| num_channels, |
| frame_rate, |
| request_size: buffer_size, |
| frame_size: format.sample_bytes() * num_channels, |
| request_notifier: Arc::new((Mutex::new(false), Condvar::new())), |
| } |
| } |
| |
| /// Call to request data from the stream, causing it to return from |
| /// `wait_for_next_action_with_timeout`. Will block until |
| /// `set_buffer_offset_and_frames` is called on the ServerRequest returned |
| /// from `wait_for_next_action_with_timeout`, or until `timeout` elapses. |
| /// Returns true if a response was successfully received. |
| pub fn trigger_callback_with_timeout(&mut self, timeout: Duration) -> bool { |
| let &(ref lock, ref cvar) = &*self.request_notifier; |
| let mut requested = lock.lock(); |
| *requested = true; |
| cvar.notify_one(); |
| let start_time = Instant::now(); |
| while *requested { |
| requested = cvar.wait_timeout(requested, timeout).0; |
| if start_time.elapsed() > timeout { |
| // We failed to get a callback in time, mark this as false. |
| *requested = false; |
| return false; |
| } |
| } |
| |
| true |
| } |
| |
| fn notify_request(&mut self) { |
| let &(ref lock, ref cvar) = &*self.request_notifier; |
| let mut requested = lock.lock(); |
| *requested = false; |
| cvar.notify_one(); |
| } |
| } |
| |
| impl BufferSet for MockShmStream { |
| fn callback(&mut self, _offset: usize, _frames: usize) -> GenericResult<()> { |
| self.notify_request(); |
| Ok(()) |
| } |
| |
| fn ignore(&mut self) -> GenericResult<()> { |
| self.notify_request(); |
| Ok(()) |
| } |
| } |
| |
| impl ShmStream for MockShmStream { |
| fn frame_size(&self) -> usize { |
| self.frame_size |
| } |
| |
| fn num_channels(&self) -> usize { |
| self.num_channels |
| } |
| |
| fn frame_rate(&self) -> u32 { |
| self.frame_rate |
| } |
| |
| fn wait_for_next_action_with_timeout( |
| &mut self, |
| timeout: Duration, |
| ) -> GenericResult<Option<ServerRequest>> { |
| { |
| let start_time = Instant::now(); |
| let &(ref lock, ref cvar) = &*self.request_notifier; |
| let mut requested = lock.lock(); |
| while !*requested { |
| requested = cvar.wait_timeout(requested, timeout).0; |
| if start_time.elapsed() > timeout { |
| return Ok(None); |
| } |
| } |
| } |
| |
| Ok(Some(ServerRequest::new(self.request_size, self))) |
| } |
| } |
| |
| /// Source of `MockShmStream` objects. |
| #[derive(Clone, Default)] |
| pub struct MockShmStreamSource { |
| last_stream: Arc<(Mutex<Option<MockShmStream>>, Condvar)>, |
| } |
| |
| impl MockShmStreamSource { |
| pub fn new() -> Self { |
| Default::default() |
| } |
| |
| /// Get the last stream that has been created from this source. If no stream |
| /// has been created, block until one has. |
| pub fn get_last_stream(&self) -> MockShmStream { |
| let &(ref last_stream, ref cvar) = &*self.last_stream; |
| let mut stream = last_stream.lock(); |
| loop { |
| match &*stream { |
| None => stream = cvar.wait(stream), |
| Some(ref s) => return s.clone(), |
| }; |
| } |
| } |
| } |
| |
| impl ShmStreamSource for MockShmStreamSource { |
| fn new_stream( |
| &mut self, |
| _direction: StreamDirection, |
| num_channels: usize, |
| format: SampleFormat, |
| frame_rate: u32, |
| buffer_size: usize, |
| _effects: &[StreamEffect], |
| _client_shm: &SharedMemory, |
| _buffer_offsets: [u64; 2], |
| ) -> GenericResult<Box<dyn ShmStream>> { |
| let &(ref last_stream, ref cvar) = &*self.last_stream; |
| let mut stream = last_stream.lock(); |
| |
| let new_stream = MockShmStream::new(num_channels, frame_rate, format, buffer_size); |
| *stream = Some(new_stream.clone()); |
| cvar.notify_one(); |
| Ok(Box::new(new_stream)) |
| } |
| } |
| |
| #[cfg(test)] |
| pub mod tests { |
| use super::*; |
| |
| #[test] |
| fn mock_trigger_callback() { |
| let stream_source = MockShmStreamSource::new(); |
| let mut thread_stream_source = stream_source.clone(); |
| |
| let buffer_size = 480; |
| let num_channels = 2; |
| let format = SampleFormat::S24LE; |
| let shm = SharedMemory::anon().expect("Failed to create shm"); |
| |
| let handle = std::thread::spawn(move || { |
| let mut stream = thread_stream_source |
| .new_stream( |
| StreamDirection::Playback, |
| num_channels, |
| format, |
| 44100, |
| buffer_size, |
| &[], |
| &shm, |
| [400, 8000], |
| ) |
| .expect("Failed to create stream"); |
| |
| let request = stream |
| .wait_for_next_action_with_timeout(Duration::from_secs(5)) |
| .expect("Failed to wait for next action"); |
| match request { |
| Some(r) => { |
| let requested = r.requested_frames(); |
| r.set_buffer_offset_and_frames(872, requested) |
| .expect("Failed to set buffer offset and frames"); |
| requested |
| } |
| None => 0, |
| } |
| }); |
| |
| let mut stream = stream_source.get_last_stream(); |
| assert!(stream.trigger_callback_with_timeout(Duration::from_secs(1))); |
| |
| let requested_frames = handle.join().expect("Failed to join thread"); |
| assert_eq!(requested_frames, buffer_size); |
| } |
| |
| #[test] |
| fn null_consumption_rate() { |
| let frame_rate = 44100; |
| let buffer_size = 480; |
| let interval = Duration::from_millis(buffer_size as u64 * 1000 / frame_rate as u64); |
| |
| let shm = SharedMemory::anon().expect("Failed to create shm"); |
| |
| let start = Instant::now(); |
| |
| let mut stream_source = NullShmStreamSource::new(); |
| let mut stream = stream_source |
| .new_stream( |
| StreamDirection::Playback, |
| 2, |
| SampleFormat::S24LE, |
| frame_rate, |
| buffer_size, |
| &[], |
| &shm, |
| [400, 8000], |
| ) |
| .expect("Failed to create stream"); |
| |
| let timeout = Duration::from_secs(5); |
| let request = stream |
| .wait_for_next_action_with_timeout(timeout) |
| .expect("Failed to wait for first request") |
| .expect("First request should not have timed out"); |
| request |
| .set_buffer_offset_and_frames(276, 480) |
| .expect("Failed to set buffer offset and length"); |
| |
| // The second call should block until the first buffer is consumed. |
| let _request = stream |
| .wait_for_next_action_with_timeout(timeout) |
| .expect("Failed to wait for second request"); |
| let elapsed = start.elapsed(); |
| assert!( |
| elapsed > interval, |
| "wait_for_next_action_with_timeout didn't block long enough: {:?}", |
| elapsed |
| ); |
| |
| assert!( |
| elapsed < timeout, |
| "wait_for_next_action_with_timeout blocked for too long: {:?}", |
| elapsed |
| ); |
| } |
| } |