| /* |
| * Copyright (C) 2021 The Android Open Source Project |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| pub mod uci_hmsgs; |
| pub mod uci_hrcv; |
| |
| use crate::adaptation::{UwbAdaptation, UwbAdaptationImpl}; |
| use crate::error::UwbErr; |
| use crate::event_manager::{EventManager, EventManagerImpl}; |
| use crate::uci::uci_hrcv::UciResponse; |
| use android_hardware_uwb::aidl::android::hardware::uwb::{ |
| UwbEvent::UwbEvent, UwbStatus::UwbStatus, |
| }; |
| use log::{debug, error, info, warn}; |
| use num_traits::ToPrimitive; |
| use std::future::Future; |
| use std::option::Option; |
| use std::sync::Arc; |
| use tokio::runtime::{Builder, Runtime}; |
| use tokio::sync::{mpsc, oneshot, Notify}; |
| use tokio::{select, task}; |
| use uwb_uci_packets::{ |
| GetDeviceInfoCmdBuilder, GetDeviceInfoRspPacket, Packet, RangeStartCmdBuilder, |
| RangeStopCmdBuilder, SessionDeinitCmdBuilder, SessionGetAppConfigCmdBuilder, |
| SessionGetCountCmdBuilder, SessionGetStateCmdBuilder, SessionState, SessionStatusNtfPacket, |
| StatusCode, |
| }; |
| |
| pub type Result<T> = std::result::Result<T, UwbErr>; |
| pub type UciResponseHandle = oneshot::Sender<UciResponse>; |
| type SyncUwbAdaptation = Box<dyn UwbAdaptation + std::marker::Send + std::marker::Sync>; |
| |
| // Commands sent from JNI. |
| #[derive(Debug)] |
| pub enum JNICommand { |
| // Blocking UCI commands |
| UciGetDeviceInfo, |
| UciSessionInit(u32, u8), |
| UciSessionDeinit(u32), |
| UciSessionGetCount, |
| UciStartRange(u32), |
| UciStopRange(u32), |
| UciGetSessionState(u32), |
| UciSessionUpdateMulticastList { |
| session_id: u32, |
| action: u8, |
| no_of_controlee: u8, |
| address_list: Vec<u8>, |
| sub_session_id_list: Vec<i32>, |
| }, |
| UciSetCountryCode { |
| code: Vec<u8>, |
| }, |
| UciSetAppConfig { |
| session_id: u32, |
| no_of_params: u32, |
| app_config_param_len: u32, |
| app_configs: Vec<u8>, |
| }, |
| UciGetAppConfig { |
| session_id: u32, |
| no_of_params: u32, |
| app_config_param_len: u32, |
| app_configs: Vec<u8>, |
| }, |
| UciRawVendorCmd { |
| gid: u32, |
| oid: u32, |
| payload: Vec<u8>, |
| }, |
| |
| // Non blocking commands |
| Enable, |
| Disable(bool), |
| Exit, |
| } |
| |
| // Responses from the HAL. |
| #[derive(Debug)] |
| pub enum HalCallback { |
| Event { event: UwbEvent, event_status: UwbStatus }, |
| UciRsp(uci_hrcv::UciResponse), |
| UciNtf(uci_hrcv::UciNotification), |
| } |
| |
| #[derive(Debug, PartialEq)] |
| pub enum UwbState { |
| None, |
| W4HalOpen, |
| Ready, |
| W4UciResp, |
| W4HalClose, |
| } |
| |
| #[derive(Clone)] |
| struct Retryer { |
| received: Arc<Notify>, |
| failed: Arc<Notify>, |
| retry: Arc<Notify>, |
| } |
| |
| impl Retryer { |
| fn new() -> Self { |
| Self { |
| received: Arc::new(Notify::new()), |
| failed: Arc::new(Notify::new()), |
| retry: Arc::new(Notify::new()), |
| } |
| } |
| |
| async fn command_failed(&self) { |
| self.failed.notified().await |
| } |
| |
| async fn immediate_retry(&self) { |
| self.retry.notified().await |
| } |
| |
| async fn command_serviced(&self) { |
| self.received.notified().await |
| } |
| |
| fn received(&self) { |
| self.received.notify_one() |
| } |
| |
| fn retry(&self) { |
| self.retry.notify_one() |
| } |
| |
| fn failed(&self) { |
| self.failed.notify_one() |
| } |
| |
| fn send_with_retry(self, adaptation: Arc<SyncUwbAdaptation>, bytes: Vec<u8>) { |
| tokio::task::spawn(async move { |
| let mut received_response = false; |
| for retry in 0..MAX_RETRIES { |
| // TODO this must be non-blocking to avoid blocking the runtime if the HAL locks up. |
| // Will address in follow-up CL moving adaptation to be asynchronous. |
| adaptation.send_uci_message(&bytes); |
| select! { |
| _ = tokio::time::sleep(tokio::time::Duration::from_millis(RETRY_DELAY_MS)) => warn!("UWB chip did not respond within {}ms deadline. Retrying (#{})...", RETRY_DELAY_MS, retry + 1), |
| _ = self.command_serviced() => { |
| received_response = true; |
| break; |
| } |
| _ = self.immediate_retry() => debug!("UWB chip requested immediate retry. Retrying (#{})...", retry + 1), |
| } |
| } |
| if !received_response { |
| error!("After {} retries, no response from UWB chip", MAX_RETRIES); |
| adaptation.core_initialization(); |
| self.failed(); |
| } |
| }); |
| } |
| } |
| |
| //TODO pull in libfutures instead of open-coding this |
| async fn option_future<R, T: Future<Output = R>>(mf: Option<T>) -> Option<R> { |
| if let Some(f) = mf { |
| Some(f.await) |
| } else { |
| None |
| } |
| } |
| |
| struct Driver<T: EventManager> { |
| adaptation: Arc<SyncUwbAdaptation>, |
| event_manager: T, |
| cmd_receiver: mpsc::UnboundedReceiver<(JNICommand, Option<UciResponseHandle>)>, |
| rsp_receiver: mpsc::UnboundedReceiver<HalCallback>, |
| response_channel: Option<(UciResponseHandle, Retryer)>, |
| state: UwbState, |
| } |
| |
| // Creates a future that handles messages from JNI and the HAL. |
| async fn drive<T: EventManager>( |
| adaptation: SyncUwbAdaptation, |
| event_manager: T, |
| cmd_receiver: mpsc::UnboundedReceiver<(JNICommand, Option<UciResponseHandle>)>, |
| rsp_receiver: mpsc::UnboundedReceiver<HalCallback>, |
| ) -> Result<()> { |
| Driver::new(Arc::new(adaptation), event_manager, cmd_receiver, rsp_receiver).drive().await |
| } |
| |
| const MAX_RETRIES: usize = 10; |
| const RETRY_DELAY_MS: u64 = 100; |
| |
| impl<T: EventManager> Driver<T> { |
| fn new( |
| adaptation: Arc<SyncUwbAdaptation>, |
| event_manager: T, |
| cmd_receiver: mpsc::UnboundedReceiver<(JNICommand, Option<UciResponseHandle>)>, |
| rsp_receiver: mpsc::UnboundedReceiver<HalCallback>, |
| ) -> Self { |
| Self { |
| adaptation, |
| event_manager, |
| cmd_receiver, |
| rsp_receiver, |
| response_channel: None, |
| state: UwbState::None, |
| } |
| } |
| |
| // Continually handles messages. |
| async fn drive(mut self) -> Result<()> { |
| loop { |
| self.drive_once().await? |
| } |
| } |
| |
| fn handle_blocking_jni_cmd( |
| &mut self, |
| tx: oneshot::Sender<UciResponse>, |
| cmd: JNICommand, |
| ) -> Result<()> { |
| log::debug!("Received blocking cmd {:?}", cmd); |
| let bytes = match cmd { |
| JNICommand::UciGetDeviceInfo => GetDeviceInfoCmdBuilder {}.build().to_vec(), |
| JNICommand::UciSessionInit(session_id, session_type) => { |
| uci_hmsgs::build_session_init_cmd(session_id, session_type).build().to_vec() |
| } |
| JNICommand::UciSessionDeinit(session_id) => { |
| SessionDeinitCmdBuilder { session_id }.build().to_vec() |
| } |
| JNICommand::UciSessionGetCount => SessionGetCountCmdBuilder {}.build().to_vec(), |
| JNICommand::UciStartRange(session_id) => { |
| RangeStartCmdBuilder { session_id }.build().to_vec() |
| } |
| JNICommand::UciStopRange(session_id) => { |
| RangeStopCmdBuilder { session_id }.build().to_vec() |
| } |
| JNICommand::UciGetSessionState(session_id) => { |
| SessionGetStateCmdBuilder { session_id }.build().to_vec() |
| } |
| JNICommand::UciSessionUpdateMulticastList { |
| session_id, |
| action, |
| no_of_controlee, |
| ref address_list, |
| ref sub_session_id_list, |
| } => uci_hmsgs::build_multicast_list_update_cmd( |
| session_id, |
| action, |
| no_of_controlee, |
| address_list, |
| sub_session_id_list, |
| ) |
| .build() |
| .to_vec(), |
| JNICommand::UciSetCountryCode { ref code } => { |
| uci_hmsgs::build_set_country_code_cmd(code).build().to_vec() |
| } |
| JNICommand::UciSetAppConfig { |
| session_id, |
| no_of_params, |
| app_config_param_len, |
| ref app_configs, |
| } => uci_hmsgs::build_set_app_config_cmd( |
| session_id, |
| no_of_params, |
| app_config_param_len, |
| app_configs, |
| )? |
| .build() |
| .to_vec(), |
| JNICommand::UciGetAppConfig { |
| session_id, |
| no_of_params, |
| app_config_param_len, |
| ref app_configs, |
| } => SessionGetAppConfigCmdBuilder { session_id, app_cfg: app_configs.to_vec() } |
| .build() |
| .to_vec(), |
| JNICommand::UciRawVendorCmd { gid, oid, payload } => { |
| uci_hmsgs::build_uci_vendor_cmd_packet(gid, oid, payload)?.to_vec() |
| } |
| _ => { |
| error!("Unexpected blocking cmd received {:?}", cmd); |
| return Ok(()); |
| } |
| }; |
| |
| let retryer = Retryer::new(); |
| self.response_channel = Some((tx, retryer.clone())); |
| retryer.send_with_retry(self.adaptation.clone(), bytes); |
| self.set_state(UwbState::W4UciResp); |
| Ok(()) |
| } |
| |
| fn handle_non_blocking_jni_cmd(&mut self, cmd: JNICommand) -> Result<()> { |
| log::debug!("Received non blocking cmd {:?}", cmd); |
| match cmd { |
| JNICommand::Enable => { |
| self.adaptation.hal_open(); |
| self.adaptation |
| .core_initialization() |
| .unwrap_or_else(|e| error!("Error invoking core init HAL API : {:?}", e)); |
| self.set_state(UwbState::W4HalOpen); |
| } |
| JNICommand::Disable(graceful) => { |
| self.set_state(UwbState::W4HalClose); |
| self.adaptation.hal_close(); |
| } |
| JNICommand::Exit => { |
| return Err(UwbErr::Exit); |
| } |
| _ => { |
| error!("Unexpected non blocking cmd received {:?}", cmd); |
| return Ok(()); |
| } |
| } |
| Ok(()) |
| } |
| |
| fn handle_hal_notification(&self, response: uci_hrcv::UciNotification) -> Result<()> { |
| log::debug!("Received hal notification {:?}", response); |
| match response { |
| uci_hrcv::UciNotification::DeviceStatusNtf(response) => { |
| self.event_manager.device_status_notification_received(response); |
| } |
| uci_hrcv::UciNotification::GenericError(response) => { |
| match (response.get_status(), self.response_channel.as_ref()) { |
| (StatusCode::UciStatusCommandRetry, Some((_, retryer))) => retryer.retry(), |
| _ => (), |
| } |
| self.event_manager.core_generic_error_notification_received(response); |
| } |
| uci_hrcv::UciNotification::SessionStatusNtf(response) => { |
| self.invoke_hal_session_init_if_necessary(&response); |
| self.event_manager.session_status_notification_received(response); |
| } |
| uci_hrcv::UciNotification::ShortMacTwoWayRangeDataNtf(response) => { |
| self.event_manager.short_range_data_notification_received(response); |
| } |
| uci_hrcv::UciNotification::ExtendedMacTwoWayRangeDataNtf(response) => { |
| self.event_manager.extended_range_data_notification_received(response); |
| } |
| uci_hrcv::UciNotification::SessionUpdateControllerMulticastListNtf(response) => { |
| self.event_manager |
| .session_update_controller_multicast_list_notification_received(response); |
| } |
| uci_hrcv::UciNotification::RawVendorNtf { gid, oid, payload } => { |
| self.event_manager.vendor_uci_notification_received(gid, oid, payload); |
| } |
| _ => log::error!("Unexpected hal notification received {:?}", response), |
| } |
| Ok(()) |
| } |
| |
| // Handles a single message from JNI or the HAL. |
| async fn drive_once(&mut self) -> Result<()> { |
| select! { |
| Some(()) = option_future(self.response_channel.as_ref() |
| .map(|(_, retryer)| retryer.command_failed())) => { |
| // TODO: Do we want to flush the incoming queue of commands when this happens? |
| self.set_state(UwbState::W4HalOpen); |
| self.response_channel = None |
| } |
| // Note: If a blocking command is awaiting a response, any non-blocking commands are not |
| // dequeued until the blocking cmd's response is received. |
| Some((cmd, tx)) = self.cmd_receiver.recv(), if self.can_process_cmd() => { |
| match tx { |
| Some(tx) => { // Blocking JNI commands processing. |
| self.handle_blocking_jni_cmd(tx, cmd)?; |
| }, |
| None => { // Non Blocking JNI commands processing. |
| self.handle_non_blocking_jni_cmd(cmd)?; |
| } |
| } |
| } |
| Some(rsp) = self.rsp_receiver.recv() => { |
| match rsp { |
| HalCallback::Event{event, event_status} => { |
| log::info!("Received HAL event: {:?} with status: {:?}", event, event_status); |
| match event { |
| UwbEvent::POST_INIT_CPLT => { |
| self.set_state(UwbState::Ready); |
| } |
| UwbEvent::CLOSE_CPLT => { |
| self.set_state(UwbState::None); |
| } |
| _ => (), |
| } |
| }, |
| HalCallback::UciRsp(response) => { |
| log::debug!("Received hal response {:?}", response); |
| self.set_state(UwbState::Ready); |
| if let Some((channel, retryer)) = self.response_channel.take() { |
| retryer.received(); |
| channel.send(response); |
| } else { |
| error!("Received response packet, but no response channel available"); |
| } |
| }, |
| HalCallback::UciNtf(response) => { |
| self.handle_hal_notification(response)?; |
| } |
| } |
| } |
| } |
| Ok(()) |
| } |
| |
| // Triggers the session init HAL API, if a new session is initialized. |
| fn invoke_hal_session_init_if_necessary(&self, response: &SessionStatusNtfPacket) -> () { |
| let session_id = |
| response.get_session_id().to_i32().expect("Failed converting session_id to u32"); |
| if let SessionState::SessionStateInit = response.get_session_state() { |
| info!("Session {:?} initialized, invoking session init HAL API", session_id); |
| self.adaptation |
| .session_initialization(session_id) |
| .unwrap_or_else(|e| error!("Error invoking session init HAL API : {:?}", e)); |
| } |
| } |
| |
| fn set_state(&mut self, state: UwbState) { |
| info!("UWB state change from {:?} to {:?}", self.state, state); |
| self.state = state; |
| } |
| |
| fn can_process_cmd(&mut self) -> bool { |
| self.state == UwbState::None || self.state == UwbState::Ready |
| } |
| } |
| |
| // Controller for sending tasks for the native thread to handle. |
| pub struct Dispatcher { |
| cmd_sender: mpsc::UnboundedSender<(JNICommand, Option<UciResponseHandle>)>, |
| join_handle: task::JoinHandle<Result<()>>, |
| runtime: Runtime, |
| pub device_info: Option<GetDeviceInfoRspPacket>, |
| } |
| |
| impl Dispatcher { |
| pub fn new<T: 'static + EventManager + std::marker::Send>(event_manager: T) -> Result<Self> { |
| let (rsp_sender, rsp_receiver) = mpsc::unbounded_channel::<HalCallback>(); |
| let adaptation: SyncUwbAdaptation = Box::new(UwbAdaptationImpl::new(rsp_sender)?); |
| |
| Self::new_with_args(event_manager, adaptation, rsp_receiver) |
| } |
| |
| #[cfg(test)] |
| pub fn new_for_testing<T: 'static + EventManager + std::marker::Send>( |
| event_manager: T, |
| adaptation: SyncUwbAdaptation, |
| rsp_receiver: mpsc::UnboundedReceiver<HalCallback>, |
| ) -> Result<Self> { |
| Self::new_with_args(event_manager, adaptation, rsp_receiver) |
| } |
| |
| fn new_with_args<T: 'static + EventManager + std::marker::Send>( |
| event_manager: T, |
| adaptation: SyncUwbAdaptation, |
| rsp_receiver: mpsc::UnboundedReceiver<HalCallback>, |
| ) -> Result<Self> { |
| info!("initializing dispatcher"); |
| let (cmd_sender, cmd_receiver) = |
| mpsc::unbounded_channel::<(JNICommand, Option<UciResponseHandle>)>(); |
| |
| // We create a new thread here both to avoid reusing the Java service thread and because |
| // binder threads will call into this. |
| let runtime = Builder::new_multi_thread() |
| .worker_threads(1) |
| .thread_name("uwb-uci-handler") |
| .enable_all() |
| .build()?; |
| let join_handle = |
| runtime.spawn(drive(adaptation, event_manager, cmd_receiver, rsp_receiver)); |
| Ok(Dispatcher { cmd_sender, join_handle, runtime, device_info: None }) |
| } |
| |
| pub fn send_jni_command(&self, cmd: JNICommand) -> Result<()> { |
| self.cmd_sender.send((cmd, None))?; |
| Ok(()) |
| } |
| |
| // TODO: Consider implementing these separate for different commands so we can have more |
| // specific return types. |
| pub fn block_on_jni_command(&self, cmd: JNICommand) -> Result<UciResponse> { |
| let (tx, rx) = oneshot::channel(); |
| self.cmd_sender.send((cmd, Some(tx)))?; |
| let ret = self.runtime.block_on(rx)?; |
| log::trace!("{:?}", ret); |
| Ok(ret) |
| } |
| |
| fn exit(&mut self) -> Result<()> { |
| self.send_jni_command(JNICommand::Exit)?; |
| let _ = self.runtime.block_on(&mut self.join_handle); |
| Ok(()) |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::*; |
| use crate::adaptation::MockUwbAdaptation; |
| use crate::event_manager::MockEventManager; |
| |
| #[test] |
| fn test_driver() -> Result<()> { |
| // TODO: Remove this once we call it somewhere real. |
| logger::init( |
| logger::Config::default().with_tag_on_device("uwb").with_min_level(log::Level::Error), |
| ); |
| |
| let (rsp_sender, rsp_receiver) = mpsc::unbounded_channel::<HalCallback>(); |
| let mut mock_adaptation: SyncUwbAdaptation = Box::new(MockUwbAdaptation::new(rsp_sender)); |
| let mock_event_manager = MockEventManager::new(); |
| |
| let mut dispatcher = |
| Dispatcher::new_for_testing(mock_event_manager, mock_adaptation, rsp_receiver)?; |
| dispatcher.send_jni_command(JNICommand::Enable)?; |
| dispatcher.send_jni_command(JNICommand::UciGetDeviceInfo)?; |
| dispatcher.exit()?; |
| Ok(()) |
| } |
| } |