blob: fc13738562d3799c5dcaa48b2894237854fa92dd [file]
//
// Copyright (C) 2025 The Android Open-Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use anyhow::{anyhow, bail, Context, Result};
use log::{error, info};
use looper_bindgen::{
ALooper, ALooper_addFd, ALooper_callbackFunc, ALooper_pollOnce, ALooper_prepare,
ALooper_removeFd, ALOOPER_EVENT_INPUT, ALOOPER_POLL_CALLBACK, ALOOPER_POLL_ERROR,
};
use std::{
ffi::{c_int, c_void},
os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd},
sync::mpsc::{self, channel, TryRecvError},
thread,
};
const ALOOPER_CALLBACK_FUNC_RETURN_VALUE_CONTINUE: c_int = 1;
macro_rules! retry_eintr {
($libc_call:expr) => {
loop {
match $libc_call {
-1 => {
let e = std::io::Error::last_os_error();
match e.raw_os_error() {
Some(libc::EINTR) => continue,
_ => break Err(e),
}
}
result => {
break Ok(result);
}
}
}
};
}
/// A struct used to send tasks to `Handler`.
pub struct Sender<T: Send> {
tx: mpsc::Sender<T>,
waker_fd: OwnedFd,
}
impl<T: Send> Sender<T> {
/// Send a task to the associated `Handler`.
pub fn send(&self, task: T) -> Result<()> {
self.tx.send(task).map_err(|_| anyhow!("Failed to send the task"))?;
self.wake()
}
fn wake(&self) -> Result<()> {
let res = retry_eintr!(
// SAFETY: `self.waker_fd` is a valid eventfd.
unsafe { libc::eventfd_write(self.waker_fd.as_raw_fd(), 1) }
);
if let Err(e) = res {
bail!("Failed to write to the waker fd: {}", e);
}
Ok(())
}
}
/// A trait defining expected behavior of callback functions for `Handler`.
pub trait HandlerCallback<T: Send> {
/// Handle a task.
/// This function is called on the same thread that created the `Handler` owning the callback.
/// If this function returns Err, the handler is deactivated and this function will never be
/// called anymore even if there is a sent task.
fn handle_task(&mut self, task: T) -> Result<()>;
}
struct HandlerInner<T: Send, C: HandlerCallback<T>> {
callback: C,
event_fd: OwnedFd,
tx: mpsc::Sender<T>,
rx: mpsc::Receiver<T>,
}
impl<T: Send, C: HandlerCallback<T>> HandlerInner<T, C> {
fn handle_tasks(&mut self) -> Result<()> {
loop {
let req = self.rx.try_recv();
match req {
Ok(req) => self.callback.handle_task(req)?,
Err(TryRecvError::Empty) => return Ok(()),
Err(TryRecvError::Disconnected) => bail!("mpsc disconnected"),
}
}
}
}
/// A struct representing a task handler.
pub struct Handler<T: Send, C: HandlerCallback<T>> {
// This makes Handler !Send.
looper: *mut ALooper,
// Wrap members used for task handling with Box to ensure they are alive during the handler is
// registered to the looper.
inner: Box<HandlerInner<T, C>>,
}
impl<T: Send, C: HandlerCallback<T>> Handler<T, C> {
pub fn new_on_current_thread(callback: C) -> Result<Self> {
// SAFETY: 0 is a valid argument.
let looper = unsafe { ALooper_prepare(0) };
assert!(!looper.is_null());
// SAFETY: Passing valid arguments.
let fd: RawFd = unsafe { libc::eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK) };
if fd == -1 {
bail!("Failed to create an eventfd");
}
// SAFETY: `fd` is a valid owned fd.
let event_fd = unsafe { OwnedFd::from_raw_fd(fd) };
let (tx, rx) = channel::<T>();
let mut inner = Box::new(HandlerInner { callback, event_fd, tx, rx });
let inner_ptr = &mut *inner as *mut HandlerInner<T, C> as *mut c_void;
let handler = Self { looper, inner };
// SAFETY: `inner_ptr` outlives the duration `poll_callback` is registered.
unsafe {
handler.add_fd(
handler.inner.event_fd.as_raw_fd(),
ALOOPER_POLL_CALLBACK,
ALOOPER_EVENT_INPUT as c_int,
Some(Self::poll_callback),
inner_ptr,
)
}
.context("Failed to add the waker fd")?;
info!("A handler is activated on the thread {:?}", thread::current().id());
Ok(handler)
}
pub fn get_sender(&self) -> Result<Sender<T>> {
let tx = self.inner.tx.clone();
let waker_fd = self.inner.event_fd.try_clone().context("Failed to clone the eventfd")?;
Ok(Sender::<T> { tx, waker_fd })
}
/// # Safety
///
/// Users must ensure the safety requirements for the callback function to be registered are
/// met while it's registered.
unsafe fn add_fd(
&self,
fd: RawFd,
ident: c_int,
events: c_int,
callback: ALooper_callbackFunc,
data: *mut c_void,
) -> Result<()> {
// SAFETY: `self.looper` is a valid ALooper pointer.
let ret = unsafe { ALooper_addFd(self.looper, fd, ident, events, callback, data) };
if ret == -1 {
bail!("ALooper_addFd failed");
}
Ok(())
}
fn remove_fd(&self, fd: RawFd) -> Result<()> {
// SAFETY: `self.looper` is a valid ALooper pointer.
let ret = unsafe { ALooper_removeFd(self.looper, fd) };
match ret {
1 => Ok(()),
0 => bail!("The fd hasn't been added"),
_ => bail!("ALooper_removeFd failed"),
}
}
/// This function is supposed to be used as a callback function for `ALooper_addFd`.
/// There's no easy way to tell the caller of `ALooper_pollOnce` that an error occurred, so
/// this function will panic instead of silently unregistering itself from the looper in such
/// cases.
///
/// # Safety
///
/// Users must ensure that the associated `data` is a valid pointer to an HandlerInner
/// instance while this callback is registered.
unsafe extern "C" fn poll_callback(fd: RawFd, _events: c_int, data: *mut c_void) -> c_int {
let inner_ptr = data as *mut HandlerInner<T, C>;
// SAFETY: `inner_ptr` is a valid HandlerInner pointer.
let inner = unsafe { inner_ptr.as_mut() }.unwrap();
assert_eq!(fd, inner.event_fd.as_raw_fd());
let mut val = std::mem::MaybeUninit::<libc::eventfd_t>::uninit();
let res = retry_eintr!(
// SAFETY: `inner.event_fd` is a valid eventfd and `val` is properly allocated.
unsafe { libc::eventfd_read(inner.event_fd.as_raw_fd(), val.as_mut_ptr()) }
);
if let Err(e) = res {
panic!("Failed to read from the event fd: {e}");
}
let res = inner.handle_tasks();
if let Err(e) = res {
panic!("Failed to handle a task: {e}");
}
ALOOPER_CALLBACK_FUNC_RETURN_VALUE_CONTINUE
}
}
impl<T: Send, C: HandlerCallback<T>> Drop for Handler<T, C> {
fn drop(&mut self) {
if self.remove_fd(self.inner.event_fd.as_raw_fd()).is_err() {
error!("Failed to remove the event fd");
}
}
}
/// Run the server loop on this thread.
pub fn run_thread_loop_once() -> Result<()> {
// SAFETY: `ALooper_pollOnce` accepts the null pointer for `outFd`, `outEvents` and `outData`.
let ret = unsafe {
ALooper_pollOnce(-1, std::ptr::null_mut(), std::ptr::null_mut(), std::ptr::null_mut())
};
if ret == ALOOPER_POLL_ERROR {
bail!("ALooper_pollOnce failed");
}
Ok(())
}
/// Run the server loop on this thread. This function will never return until an error occurs.
pub fn run_thread_loop() -> Result<()> {
loop {
run_thread_loop_once()?;
}
}