blob: 83b16fc0bf6fb8395285561f5373f44bf3afb680 [file] [log] [blame]
pub mod direction;
pub mod dqbuf;
pub mod dual_queue;
pub mod qbuf;
pub mod states;
use super::{AllocatedQueue, Device, Stream, TryDequeue};
use crate::ioctl;
use crate::memory::*;
use crate::{Format, PixelFormat, QueueType};
use direction::*;
use dqbuf::*;
use ioctl::{
DQBufError, DQBufResult, GFmtError, QueryBuffer, SFmtError, StreamOffError, StreamOnError,
TryFmtError,
};
use qbuf::*;
use qbuf::{
get_free::{GetFreeBuffer, GetFreeBufferError},
get_indexed::{GetBufferByIndex, TryGetBufferError},
};
use states::{BufferInfo, BufferState};
use std::os::unix::io::{AsRawFd, RawFd};
use std::{
cell::Cell,
sync::{Arc, Mutex, Weak},
};
use thiserror::Error;
/// Base values of a queue, that are always value no matter the state the queue
/// is in. This base object remains alive as long as the queue is borrowed from
/// the `Device`.
pub struct QueueBase {
/// Reference to the device, so `fd` is kept valid and to let us mark the
/// queue as free again upon destruction.
device: Arc<Device>,
type_: QueueType,
capabilities: ioctl::BufferCapabilities,
}
impl AsRawFd for QueueBase {
fn as_raw_fd(&self) -> RawFd {
self.device.as_raw_fd()
}
}
impl<'a> Drop for QueueBase {
/// Make the queue available again.
fn drop(&mut self) {
assert_eq!(
self.device.used_queues.lock().unwrap().remove(&self.type_),
true
);
}
}
/// Trait for the different states a queue can be in. This allows us to limit
/// the available queue methods to the one that make sense at a given point of
/// the queue's lifecycle.
pub trait QueueState {}
/// V4L2 queue object. Specialized according to its configuration state so that
/// only valid methods can be called from a given point.
pub struct Queue<D, S>
where
D: Direction,
S: QueueState,
{
inner: QueueBase,
_d: std::marker::PhantomData<D>,
state: S,
}
/// Methods of `Queue` that are available no matter the state.
impl<D, S> Queue<D, S>
where
D: Direction,
S: QueueState,
{
pub fn get_capabilities(&self) -> ioctl::BufferCapabilities {
self.inner.capabilities
}
pub fn get_type(&self) -> QueueType {
self.inner.type_
}
pub fn get_format(&self) -> Result<Format, GFmtError> {
ioctl::g_fmt(&self.inner, self.inner.type_)
}
/// This method can invalidate any current format iterator, hence it requires
/// the queue to be mutable. This way of doing is not perfect though, as setting
/// the format on one queue can change the options available on another.
pub fn set_format(&mut self, format: Format) -> Result<Format, SFmtError> {
let type_ = self.inner.type_;
ioctl::s_fmt(&mut self.inner, type_, format)
}
/// Performs exactly as `set_format`, but does not actually apply `format`.
/// Useful to check what modifications need to be done to a format before it
/// can be used.
pub fn try_format(&self, format: Format) -> Result<Format, TryFmtError> {
ioctl::try_fmt(&self.inner, self.inner.type_, format)
}
/// Returns a `FormatBuilder` which is set to the currently active format
/// and can be modified and eventually applied. The `FormatBuilder` holds
/// a mutable reference to this `Queue`.
pub fn change_format(&mut self) -> Result<FormatBuilder, GFmtError> {
FormatBuilder::new(&mut self.inner)
}
/// Returns an iterator over all the formats currently supported by this queue.
pub fn format_iter(&self) -> ioctl::FormatIterator<QueueBase> {
ioctl::FormatIterator::new(&self.inner, self.inner.type_)
}
}
/// Builder for a V4L2 format. This takes a mutable reference on the queue, so
/// it is supposed to be short-lived: get one, adjust the format, and apply.
pub struct FormatBuilder<'a> {
queue: &'a mut QueueBase,
format: Format,
}
impl<'a> FormatBuilder<'a> {
fn new(queue: &'a mut QueueBase) -> Result<Self, GFmtError> {
let format = ioctl::g_fmt(queue, queue.type_)?;
Ok(Self { queue, format })
}
/// Get a reference to the format built so far. Useful for checking the
/// currently set format after getting a builder, or the actual settings
/// that will be applied by the kernel after a `try_apply()`.
pub fn format(&self) -> &Format {
&self.format
}
pub fn set_size(mut self, width: usize, height: usize) -> Self {
self.format.width = width as u32;
self.format.height = height as u32;
self
}
pub fn set_pixelformat(mut self, pixel_format: impl Into<PixelFormat>) -> Self {
self.format.pixelformat = pixel_format.into();
self
}
/// Apply the format built so far. The kernel will adjust the format to fit
/// the driver's capabilities if needed, and the format actually applied will
/// be returned.
pub fn apply(self) -> Result<Format, SFmtError> {
ioctl::s_fmt(self.queue, self.queue.type_, self.format)
}
/// Try to apply the format built so far. The kernel will adjust the format
/// to fit the driver's capabilities if needed, so make sure to check important
/// parameters upon return.
///
/// Calling `apply()` right after this method is guaranteed to successfully
/// apply the format without further change.
pub fn try_apply(&mut self) -> Result<(), TryFmtError> {
let new_format = ioctl::try_fmt(self.queue, self.queue.type_, self.format.clone())?;
self.format = new_format;
Ok(())
}
}
/// Initial state of the queue when created. Streaming and queuing are not
/// supported since buffers have not been allocated yet.
/// Allocating buffers makes the queue switch to the `BuffersAllocated` state.
pub struct QueueInit;
impl QueueState for QueueInit {}
#[derive(Debug, Error)]
pub enum CreateQueueError {
#[error("Queue is already in use")]
AlreadyBorrowed,
#[error("Error while querying queue capabilities")]
ReqbufsError(#[from] ioctl::ReqbufsError),
}
#[derive(Debug, Error)]
pub enum RequestBuffersError {
#[error("Error while requesting buffers")]
ReqbufsError(#[from] ioctl::ReqbufsError),
#[error("Error while querying buffer")]
QueryBufferError(#[from] crate::Error),
}
impl<D: Direction> Queue<D, QueueInit> {
/// Create a queue for type `queue_type` on `device`. A queue of a specific type
/// can be requested only once.
///
/// Not all devices support all kinds of queue. To test whether the queue is supported,
/// a REQBUFS(0) is issued on the device. If it is not successful, the device is
/// deemed to not support this kind of queue and this method will fail.
fn create(
device: Arc<Device>,
queue_type: QueueType,
) -> Result<Queue<D, QueueInit>, CreateQueueError> {
let mut used_queues = device.used_queues.lock().unwrap();
if used_queues.contains(&queue_type) {
return Err(CreateQueueError::AlreadyBorrowed);
}
// Check that the queue is valid for this device by doing a dummy REQBUFS.
// Obtain its capacities while we are at it.
let capabilities: ioctl::BufferCapabilities =
ioctl::reqbufs(&*device, queue_type, MemoryType::MMAP, 0)?;
used_queues.insert(queue_type);
drop(used_queues);
Ok(Queue::<D, QueueInit> {
inner: QueueBase {
device,
type_: queue_type,
capabilities,
},
_d: std::marker::PhantomData,
state: QueueInit {},
})
}
/// Allocate `count` buffers for this queue and make it transition to the
/// `BuffersAllocated` state.
pub fn request_buffers<M: Memory>(
self,
count: u32,
) -> Result<Queue<D, BuffersAllocated<M>>, RequestBuffersError> {
let type_ = self.inner.type_;
let num_buffers: usize =
ioctl::reqbufs(&self.inner, type_, M::HandleType::MEMORY_TYPE, count)?;
// The buffers have been allocated, now let's get their features.
// We cannot use functional programming here because we need to return
// the error from ioctl::querybuf(), if any.
let mut buffer_features = Vec::new();
for i in 0..num_buffers {
buffer_features.push(ioctl::querybuf(&self.inner, self.inner.type_, i)?);
}
let buffer_info = buffer_features
.into_iter()
.map(|features: QueryBuffer| BufferInfo {
state: Arc::new(Mutex::new(BufferState::Free)),
features: Arc::new(features),
})
.collect();
Ok(Queue {
inner: self.inner,
_d: std::marker::PhantomData,
state: BuffersAllocated {
num_queued_buffers: Default::default(),
buffer_info,
},
})
}
}
impl Queue<Output, QueueInit> {
/// Acquires the OUTPUT queue from `device`.
///
/// This method will fail if the queue has already been obtained and has not
/// yet been released.
pub fn get_output_queue(device: Arc<Device>) -> Result<Self, CreateQueueError> {
Queue::<Output, QueueInit>::create(device, QueueType::VideoOutput)
}
/// Acquires the OUTPUT_MPLANE queue from `device`.
///
/// This method will fail if the queue has already been obtained and has not
/// yet been released.
pub fn get_output_mplane_queue(device: Arc<Device>) -> Result<Self, CreateQueueError> {
Queue::<Output, QueueInit>::create(device, QueueType::VideoOutputMplane)
}
}
impl Queue<Capture, QueueInit> {
/// Acquires the CAPTURE queue from `device`.
///
/// This method will fail if the queue has already been obtained and has not
/// yet been released.
pub fn get_capture_queue(device: Arc<Device>) -> Result<Self, CreateQueueError> {
Queue::<Capture, QueueInit>::create(device, QueueType::VideoCapture)
}
/// Acquires the CAPTURE_MPLANE queue from `device`.
///
/// This method will fail if the queue has already been obtained and has not
/// yet been released.
pub fn get_capture_mplane_queue(device: Arc<Device>) -> Result<Self, CreateQueueError> {
Queue::<Capture, QueueInit>::create(device, QueueType::VideoCaptureMplane)
}
}
/// Allocated state for a queue. A queue with its buffers allocated can be
/// streamed on and off, and buffers can be queued and dequeued.
pub struct BuffersAllocated<M: Memory> {
num_queued_buffers: Cell<usize>,
buffer_info: Vec<BufferInfo<M>>,
}
impl<M: Memory> QueueState for BuffersAllocated<M> {}
impl<'a, D: Direction, M: Memory> AllocatedQueue<'a, D> for Queue<D, BuffersAllocated<M>> {
fn num_buffers(&self) -> usize {
self.state.buffer_info.len()
}
fn num_queued_buffers(&self) -> usize {
self.state.num_queued_buffers.get()
}
fn free_buffers(self) -> Result<Queue<D, QueueInit>, ioctl::ReqbufsError> {
let type_ = self.inner.type_;
ioctl::reqbufs(&self.inner, type_, M::HandleType::MEMORY_TYPE, 0)?;
Ok(Queue {
inner: self.inner,
_d: std::marker::PhantomData,
state: QueueInit {},
})
}
}
/// Represents a queued buffer which has not been processed due to `streamoff`
/// being called on a queue.
pub struct CanceledBuffer<M: Memory> {
/// Index of the buffer,
pub index: u32,
/// Plane handles that were passed when the buffer has been queued.
pub plane_handles: PlaneHandles<M>,
}
impl<D: Direction, M: Memory> Stream for Queue<D, BuffersAllocated<M>> {
type Canceled = CanceledBuffer<M>;
fn stream_on(&self) -> Result<(), StreamOnError> {
let type_ = self.inner.type_;
ioctl::streamon(&self.inner, type_)
}
fn stream_off(&self) -> Result<Vec<Self::Canceled>, StreamOffError> {
let type_ = self.inner.type_;
ioctl::streamoff(&self.inner, type_)?;
let canceled_buffers: Vec<_> = self
.state
.buffer_info
.iter()
.filter_map(|buffer_info| {
let buffer_index = buffer_info.features.index;
let mut state = buffer_info.state.lock().unwrap();
// Filter entries not in queued state.
match *state {
BufferState::Queued(_) => (),
_ => return None,
};
// Set entry to Free state and steal its handles.
let old_state = std::mem::replace(&mut (*state), BufferState::Free);
Some(CanceledBuffer::<M> {
index: buffer_index as u32,
plane_handles: match old_state {
// We have already tested for this state above, so this
// branch is guaranteed.
BufferState::Queued(plane_handles) => plane_handles,
_ => unreachable!("Inconsistent buffer state!"),
},
})
})
.collect();
let num_queued_buffers = self.state.num_queued_buffers.take();
self.state
.num_queued_buffers
.set(num_queued_buffers - canceled_buffers.len());
Ok(canceled_buffers)
}
}
impl<D: Direction, M: Memory> TryDequeue for Queue<D, BuffersAllocated<M>> {
type Dequeued = DQBuffer<D, M>;
fn try_dequeue(&self) -> DQBufResult<Self::Dequeued> {
let dqbuf: ioctl::DQBuffer;
let mut error_flag_set = false;
dqbuf = match ioctl::dqbuf(&self.inner, self.inner.type_) {
Ok(dqbuf) => dqbuf,
Err(DQBufError::CorruptedBuffer(dqbuf)) => {
error_flag_set = true;
dqbuf
}
Err(DQBufError::EOS) => return Err(DQBufError::EOS),
Err(DQBufError::NotReady) => return Err(DQBufError::NotReady),
Err(DQBufError::IoctlError(e)) => return Err(DQBufError::IoctlError(e)),
};
let id = dqbuf.index as usize;
let buffer_info = self
.state
.buffer_info
.get(id)
.expect("Inconsistent buffer state!");
let mut buffer_state = buffer_info.state.lock().unwrap();
// The buffer will remain Dequeued until our reference to it is destroyed.
let state = std::mem::replace(&mut (*buffer_state), BufferState::Dequeued);
let plane_handles = match state {
BufferState::Queued(plane_handles) => plane_handles,
_ => unreachable!("Inconsistent buffer state"),
};
let fuse = BufferStateFuse::new(Arc::downgrade(&buffer_info.state));
let num_queued_buffers = self.state.num_queued_buffers.take();
self.state.num_queued_buffers.set(num_queued_buffers - 1);
let dqbuffer = DQBuffer::new(self, &buffer_info.features, plane_handles, dqbuf, fuse);
if error_flag_set {
Err(DQBufError::CorruptedBuffer(dqbuffer))
} else {
Ok(dqbuffer)
}
}
}
impl<'a, D: Direction, M: Memory> GetBufferByIndex<'a> for Queue<D, BuffersAllocated<M>> {
type Queueable = QBuffer<'a, D, M>;
// Take buffer `id` in order to prepare it for queueing, provided it is available.
fn try_get_buffer(&'a self, index: usize) -> Result<Self::Queueable, TryGetBufferError> {
let buffer_info = self
.state
.buffer_info
.get(index)
.ok_or(TryGetBufferError::InvalidIndex(index))?;
let mut buffer_state = buffer_info.state.lock().unwrap();
match *buffer_state {
BufferState::Free => (),
_ => return Err(TryGetBufferError::AlreadyUsed),
};
// The buffer will remain in PreQueue state until it is queued
// or the reference to it is lost.
*buffer_state = BufferState::PreQueue;
drop(buffer_state);
Ok(QBuffer::new(self, buffer_info))
}
}
impl<'a, D: Direction, M: Memory> GetFreeBuffer<'a> for Queue<D, BuffersAllocated<M>> {
type Queueable = QBuffer<'a, D, M>;
fn try_get_free_buffer(&'a self) -> Result<Self::Queueable, GetFreeBufferError> {
let res = self
.state
.buffer_info
.iter()
.enumerate()
.find(|(_, s)| matches!(*s.state.lock().unwrap(), BufferState::Free));
match res {
None => Err(GetFreeBufferError::NoFreeBuffer),
Some((i, _)) => Ok(self.try_get_buffer(i).unwrap()),
}
}
}
/// A fuse that will return the buffer to the Free state when destroyed, unless
/// it has been disarmed.
struct BufferStateFuse<M: Memory> {
buffer_state: Weak<Mutex<BufferState<M>>>,
}
impl<M: Memory> BufferStateFuse<M> {
/// Create a new fuse that will set `state` to `BufferState::Free` if
/// destroyed before `disarm()` has been called.
fn new(buffer_state: Weak<Mutex<BufferState<M>>>) -> Self {
BufferStateFuse { buffer_state }
}
/// Disarm this fuse, e.g. the monitored state will be left untouched when
/// the fuse is destroyed.
fn disarm(&mut self) {
// Drop our weak reference.
self.buffer_state = Weak::new();
}
/// Trigger the fuse, i.e. make the buffer return to the Free state, unless
/// the fuse has been `disarm`ed. This method should only be called when
/// the buffer is being dropped, otherwise inconsistent state may ensue.
/// The fuse will be disarmed after this call.
fn trigger(&mut self) {
match self.buffer_state.upgrade() {
None => (),
Some(buffer_state_locked) => {
let mut buffer_state = buffer_state_locked.lock().unwrap();
*buffer_state = BufferState::Free;
self.buffer_state = Weak::new();
}
};
}
}
impl<M: Memory> Drop for BufferStateFuse<M> {
fn drop(&mut self) {
self.trigger();
}
}