device: add buffer allocation traits
Add traits to obtain a buffer by index or free state.
diff --git a/examples/vicodec_pipeline/encoder.rs b/examples/vicodec_pipeline/encoder.rs
index 9546419..496be42 100644
--- a/examples/vicodec_pipeline/encoder.rs
+++ b/examples/vicodec_pipeline/encoder.rs
@@ -1,6 +1,8 @@
use v4l2::device::queue::{
- direction, dqbuf, qbuf::QBuffer, BuffersAllocated, CreateQueueError, FormatBuilder, Queue,
- QueueInit, RequestBuffersError,
+ direction, dqbuf,
+ qbuf::get_free::{GetFreeBuffer, GetFreeBufferError},
+ qbuf::QBuffer,
+ BuffersAllocated, CreateQueueError, FormatBuilder, Queue, QueueInit, RequestBuffersError,
};
use v4l2::device::{Device, DeviceConfig, DeviceOpenError, Stream, TryDequeue};
use v4l2::ioctl::{BufferFlags, DQBufError, EncoderCommand, FormatFlags, GFmtError};
@@ -271,8 +273,8 @@
DequeueError(#[from] DequeueOutputBufferError),
#[error("Error during poll")]
PollError(#[from] io::Error),
- #[error("No buffer currently available")]
- NoBufferAvailable,
+ #[error("Error while obtaining buffer")]
+ GetFreeBufferError(#[from] GetFreeBufferError),
}
impl<InputDoneCb, OutputReadyCb> Encoder<Encoding<InputDoneCb, OutputReadyCb>>
@@ -364,10 +366,7 @@
#[allow(dead_code)]
pub fn try_get_buffer(&self) -> Result<OutputBuffer, GetBufferError> {
self.dequeue_output_buffers()?;
- self.state
- .output_queue
- .get_free_buffer()
- .ok_or(GetBufferError::NoBufferAvailable)
+ Ok(self.state.output_queue.try_get_free_buffer()?)
}
/// Returns a V4L2 buffer to be filled with a frame to encode, waiting for
@@ -384,10 +383,7 @@
self.wait_for_output_buffer()?;
}
- self.state
- .output_queue
- .get_free_buffer()
- .ok_or(GetBufferError::NoBufferAvailable)
+ Ok(self.state.output_queue.try_get_free_buffer()?)
}
}
@@ -533,7 +529,7 @@
}
fn enqueue_capture_buffers(&mut self) {
- while let Some(buffer) = self.capture_queue.get_free_buffer() {
+ while let Ok(buffer) = self.capture_queue.try_get_free_buffer() {
buffer.auto_queue().unwrap();
}
}
diff --git a/examples/vicodec_test/device_api.rs b/examples/vicodec_test/device_api.rs
index d51e177..4a5be3a 100644
--- a/examples/vicodec_test/device_api.rs
+++ b/examples/vicodec_test/device_api.rs
@@ -5,6 +5,7 @@
use std::sync::Arc;
use std::time::Instant;
+use qbuf::get_free::GetFreeBuffer;
use v4l2::device::queue::*;
use v4l2::device::*;
use v4l2::memory::{UserPtr, MMAP};
@@ -154,7 +155,7 @@
// There is no information to set on MMAP capture buffers: just queue
// them as soon as we get them.
capture_queue
- .get_free_buffer()
+ .try_get_free_buffer()
.expect("Failed to obtain capture buffer")
.auto_queue()
.expect("Failed to queue capture buffer");
@@ -165,7 +166,7 @@
// with it.
let bytes_used = output_buffer_data.len();
output_queue
- .get_free_buffer()
+ .try_get_free_buffer()
.expect("Failed to obtain output buffer")
.add_plane(output_buffer_data, bytes_used)
.queue()
diff --git a/src/device/queue.rs b/src/device/queue.rs
index b81dee7..eed9fb8 100644
--- a/src/device/queue.rs
+++ b/src/device/queue.rs
@@ -13,8 +13,11 @@
DQBufError, GFmtError, QueryBuffer, SFmtError, StreamOffError, StreamOnError, TryFmtError,
};
use qbuf::*;
-use states::BufferState;
-use states::*;
+use qbuf::{
+ get_free::{GetFreeBuffer, GetFreeBufferError},
+ get_indexed::{GetBufferByIndex, TryGetBufferError},
+};
+use states::{BufferInfo, BufferState};
use std::os::unix::io::{AsRawFd, RawFd};
use std::{
cell::Cell,
@@ -254,7 +257,6 @@
_d: std::marker::PhantomData,
state: BuffersAllocated {
num_queued_buffers: Default::default(),
- allocator: Arc::new(FifoBufferAllocator::new(num_buffers)),
buffer_info,
},
})
@@ -301,19 +303,10 @@
/// streamed on and off, and buffers can be queued and dequeued.
pub struct BuffersAllocated<M: Memory> {
num_queued_buffers: Cell<usize>,
- allocator: Arc<dyn BufferAllocator>,
buffer_info: Vec<BufferInfo<M>>,
}
impl<M: Memory> QueueState for BuffersAllocated<M> {}
-#[derive(Debug, Error)]
-pub enum GetBufferError {
- #[error("Buffer with provided index {0} does not exist")]
- InvalidIndex(usize),
- #[error("Buffer is already in use")]
- AlreadyUsed,
-}
-
impl<D: Direction, M: Memory> Queue<D, BuffersAllocated<M>> {
/// Returns the total number of buffers allocated for this queue.
pub fn num_buffers(&self) -> usize {
@@ -337,40 +330,6 @@
state: QueueInit {},
})
}
-
- // Take buffer `id` in order to prepare it for queueing, provided it is available.
- pub fn get_buffer(&self, index: usize) -> Result<QBuffer<D, M>, GetBufferError> {
- let buffer_info = self
- .state
- .buffer_info
- .get(index)
- .ok_or(GetBufferError::InvalidIndex(index))?;
-
- let mut buffer_state = buffer_info.state.lock().unwrap();
-
- match *buffer_state {
- BufferState::Free => (),
- _ => return Err(GetBufferError::AlreadyUsed),
- };
-
- // The buffer will remain in PreQueue state until it is queued
- // or the reference to it is lost.
- *buffer_state = BufferState::PreQueue;
-
- self.state.allocator.take_buffer(index);
- drop(buffer_state);
-
- Ok(QBuffer::new(self, buffer_info))
- }
-
- pub fn get_free_buffer(&self) -> Option<QBuffer<D, M>> {
- let index = match self.state.allocator.get_free_buffer() {
- Some(index) => index,
- None => return None,
- };
-
- self.get_buffer(index).ok()
- }
}
/// Represents a queued buffer which has not been processed due to `streamoff`
@@ -409,7 +368,6 @@
// Set entry to Free state and steal its handles.
let old_state = std::mem::replace(&mut (*state), BufferState::Free);
- self.state.allocator.return_buffer(buffer_index);
Some(CanceledBuffer::<M> {
index: buffer_index as u32,
@@ -470,16 +428,7 @@
let num_queued_buffers = self.state.num_queued_buffers.take();
self.state.num_queued_buffers.set(num_queued_buffers - 1);
- let mut dqbuffer = DQBuffer::new(self, &buffer_info.features, plane_handles, dqbuf, fuse);
-
- // Release callback for allocator: if we still exist, make the buffer
- // available to dequeue again.
- let allocator_for_cb = Arc::downgrade(&self.state.allocator);
- dqbuffer.add_drop_callback(move |dqbuf| {
- if let Some(allocator) = allocator_for_cb.upgrade() {
- allocator.return_buffer(dqbuf.data.index as usize);
- }
- });
+ let dqbuffer = DQBuffer::new(self, &buffer_info.features, plane_handles, dqbuf, fuse);
if error_flag_set {
Err(DQBufError::CorruptedBuffer(dqbuffer))
@@ -489,6 +438,50 @@
}
}
+impl<'a, D: Direction, M: Memory> GetBufferByIndex<'a> for Queue<D, BuffersAllocated<M>> {
+ type Queueable = QBuffer<'a, D, M>;
+
+ // Take buffer `id` in order to prepare it for queueing, provided it is available.
+ fn try_get_buffer(&'a self, index: usize) -> Result<Self::Queueable, TryGetBufferError> {
+ let buffer_info = self
+ .state
+ .buffer_info
+ .get(index)
+ .ok_or(TryGetBufferError::InvalidIndex(index))?;
+
+ let mut buffer_state = buffer_info.state.lock().unwrap();
+ match *buffer_state {
+ BufferState::Free => (),
+ _ => return Err(TryGetBufferError::AlreadyUsed),
+ };
+
+ // The buffer will remain in PreQueue state until it is queued
+ // or the reference to it is lost.
+ *buffer_state = BufferState::PreQueue;
+ drop(buffer_state);
+
+ Ok(QBuffer::new(self, buffer_info))
+ }
+}
+
+impl<'a, D: Direction, M: Memory> GetFreeBuffer<'a> for Queue<D, BuffersAllocated<M>> {
+ type Queueable = QBuffer<'a, D, M>;
+
+ fn try_get_free_buffer(&'a self) -> Result<Self::Queueable, GetFreeBufferError> {
+ let res = self.state.buffer_info.iter().enumerate().find(|(_, s)| {
+ match *s.state.lock().unwrap() {
+ BufferState::Free => true,
+ _ => false,
+ }
+ });
+
+ match res {
+ None => Err(GetFreeBufferError::NoFreeBuffer),
+ Some((i, _)) => Ok(self.try_get_buffer(i).unwrap()),
+ }
+ }
+}
+
/// A fuse that will return the buffer to the Free state when destroyed, unless
/// it has been disarmed.
struct BufferStateFuse<M: Memory> {
diff --git a/src/device/queue/qbuf.rs b/src/device/queue/qbuf.rs
index 90f4ced..464be3b 100644
--- a/src/device/queue/qbuf.rs
+++ b/src/device/queue/qbuf.rs
@@ -12,6 +12,9 @@
use ioctl::{PlaneMapping, QBufError};
use thiserror::Error;
+pub mod get_free;
+pub mod get_indexed;
+
/// Error that can occur when queuing a buffer. It wraps a regular error and also
/// returns the plane handles back to the user.
#[derive(Error)]
diff --git a/src/device/queue/qbuf/get_free.rs b/src/device/queue/qbuf/get_free.rs
new file mode 100644
index 0000000..09062ea
--- /dev/null
+++ b/src/device/queue/qbuf/get_free.rs
@@ -0,0 +1,16 @@
+use thiserror::Error;
+
+#[derive(Debug, Error)]
+pub enum GetFreeBufferError {
+ #[error("All buffers are currently being used")]
+ NoFreeBuffer,
+}
+
+/// Trait for buffers providers with their own allocation policy. Users of this
+/// interface leave the choice of which buffer to return to the implementor,
+/// which must define its own allocation policy.
+pub trait GetFreeBuffer<'a> {
+ type Queueable;
+
+ fn try_get_free_buffer(&'a self) -> Result<Self::Queueable, GetFreeBufferError>;
+}
diff --git a/src/device/queue/qbuf/get_indexed.rs b/src/device/queue/qbuf/get_indexed.rs
new file mode 100644
index 0000000..b2f673b
--- /dev/null
+++ b/src/device/queue/qbuf/get_indexed.rs
@@ -0,0 +1,19 @@
+use thiserror::Error;
+
+#[derive(Debug, Error)]
+pub enum TryGetBufferError {
+ #[error("Buffer with provided index {0} does not exist")]
+ InvalidIndex(usize),
+ #[error("Buffer is already in use")]
+ AlreadyUsed,
+}
+
+/// A trait for trying to obtain a queueable, writable buffer from its index.
+///
+/// Returns the buffer with specified `index`, provided that this buffer is
+/// currently available for use.
+pub trait GetBufferByIndex<'a> {
+ type Queueable;
+
+ fn try_get_buffer(&'a self, index: usize) -> Result<Self::Queueable, TryGetBufferError>;
+}
diff --git a/src/device/queue/states.rs b/src/device/queue/states.rs
index b76eb4f..531f004 100644
--- a/src/device/queue/states.rs
+++ b/src/device/queue/states.rs
@@ -1,42 +1,9 @@
use super::PlaneHandles;
use crate::ioctl;
use crate::memory::Memory;
-use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
-pub(super) trait BufferAllocator: Send + Sync {
- fn get_free_buffer(&self) -> Option<usize>;
- fn take_buffer(&self, index: usize);
- fn return_buffer(&self, index: usize);
-}
-
-pub(super) struct FifoBufferAllocator {
- queue: Mutex<VecDeque<usize>>,
-}
-
-impl FifoBufferAllocator {
- pub(super) fn new(nb_buffers: usize) -> Self {
- FifoBufferAllocator {
- queue: Mutex::new((0..nb_buffers).collect()),
- }
- }
-}
-
-impl BufferAllocator for FifoBufferAllocator {
- fn get_free_buffer(&self) -> Option<usize> {
- self.queue.lock().unwrap().front().copied()
- }
-
- fn take_buffer(&self, index: usize) {
- self.queue.lock().unwrap().retain(|i| *i != index);
- }
-
- fn return_buffer(&self, index: usize) {
- self.queue.lock().unwrap().push_back(index);
- }
-}
-
/// Represents the current state of an allocated buffer.
pub(super) enum BufferState<M: Memory> {
/// The buffer can be obtained via `get_buffer()` and be queued.