Floss: Refactor callback management using `Callbacks`

To avoid code duplication for easier maintenance, more consistent
behavior, less bugs.

Bug: 217273154
Tag: #floss
Test: Manual - build Floss on Linux

Change-Id: I542215dfac58de25199bbe013e126cd92424845c
diff --git a/system/gd/rust/linux/stack/src/bluetooth.rs b/system/gd/rust/linux/stack/src/bluetooth.rs
index 5cc1da6..8c85bd7 100644
--- a/system/gd/rust/linux/stack/src/bluetooth.rs
+++ b/system/gd/rust/linux/stack/src/bluetooth.rs
@@ -25,8 +25,9 @@
 use tokio::time;
 
 use crate::bluetooth_media::{BluetoothMedia, IBluetoothMedia, MediaActions};
+use crate::callbacks::Callbacks;
 use crate::uuid::{Profile, UuidHelper};
-use crate::{BluetoothCallbackType, Message, RPCProxy};
+use crate::{Message, RPCProxy};
 
 const DEFAULT_DISCOVERY_TIMEOUT_MS: u64 = 12800;
 const MIN_ADV_INSTANCES_FOR_MULTI_ADV: u8 = 5;
@@ -299,8 +300,8 @@
 
     bonded_devices: HashMap<String, BluetoothDeviceContext>,
     bluetooth_media: Arc<Mutex<Box<BluetoothMedia>>>,
-    callbacks: HashMap<u32, Box<dyn IBluetoothCallback + Send>>,
-    connection_callbacks: HashMap<u32, Box<dyn IBluetoothConnectionCallback + Send>>,
+    callbacks: Callbacks<dyn IBluetoothCallback + Send>,
+    connection_callbacks: Callbacks<dyn IBluetoothConnectionCallback + Send>,
     discovering_started: Instant,
     hh: Option<HidHost>,
     is_connectable: bool,
@@ -327,8 +328,11 @@
     ) -> Bluetooth {
         Bluetooth {
             bonded_devices: HashMap::new(),
-            callbacks: HashMap::new(),
-            connection_callbacks: HashMap::new(),
+            callbacks: Callbacks::new(tx.clone(), Message::AdapterCallbackDisconnected),
+            connection_callbacks: Callbacks::new(
+                tx.clone(),
+                Message::ConnectionCallbackDisconnected,
+            ),
             hh: None,
             bluetooth_media,
             discovering_started: Instant::now(),
@@ -378,26 +382,11 @@
     fn update_local_address(&mut self, addr: &RawAddress) {
         self.local_address = Some(*addr);
 
-        self.for_all_callbacks(|callback| {
+        self.callbacks.for_all_callbacks(|callback| {
             callback.on_address_changed(self.local_address.unwrap().to_string());
         });
     }
 
-    fn for_all_callbacks<F: Fn(&Box<dyn IBluetoothCallback + Send>)>(&self, f: F) {
-        for (_, callback) in self.callbacks.iter() {
-            f(&callback);
-        }
-    }
-
-    fn for_all_connection_callbacks<F: Fn(&Box<dyn IBluetoothConnectionCallback + Send>)>(
-        &self,
-        f: F,
-    ) {
-        for (_, callback) in self.connection_callbacks.iter() {
-            f(&callback);
-        }
-    }
-
     pub fn get_connectable(&self) -> bool {
         match self.properties.get(&BtPropertyType::AdapterScanMode) {
             Some(prop) => match prop {
@@ -421,15 +410,12 @@
         )) == 0
     }
 
-    pub(crate) fn callback_disconnected(&mut self, id: u32, cb_type: BluetoothCallbackType) {
-        match cb_type {
-            BluetoothCallbackType::Adapter => {
-                self.callbacks.remove(&id);
-            }
-            BluetoothCallbackType::Connection => {
-                self.connection_callbacks.remove(&id);
-            }
-        };
+    pub(crate) fn adapter_callback_disconnected(&mut self, id: u32) {
+        self.callbacks.remove_callback(id);
+    }
+
+    pub(crate) fn connection_callback_disconnected(&mut self, id: u32) {
+        self.connection_callbacks.remove_callback(id);
     }
 
     fn get_remote_device_if_found(&self, address: &str) -> Option<&BluetoothDeviceContext> {
@@ -506,7 +492,7 @@
         self.found_devices.retain(|_, d| is_fresh(d, &now));
 
         for d in stale_devices {
-            self.for_all_callbacks(|callback| {
+            self.callbacks.for_all_callbacks(|callback| {
                 callback.on_device_cleared(d.clone());
             });
         }
@@ -669,12 +655,12 @@
                     }
                 }
                 BluetoothProperty::BdName(bdname) => {
-                    self.for_all_callbacks(|callback| {
+                    self.callbacks.for_all_callbacks(|callback| {
                         callback.on_name_changed(bdname.clone());
                     });
                 }
                 BluetoothProperty::AdapterScanMode(mode) => {
-                    self.for_all_callbacks(|callback| {
+                    self.callbacks.for_all_callbacks(|callback| {
                         callback
                             .on_discoverable_changed(*mode == BtScanMode::ConnectableDiscoverable);
                     });
@@ -684,7 +670,7 @@
 
             self.properties.insert(prop.get_type(), prop.clone());
 
-            self.for_all_callbacks(|callback| {
+            self.callbacks.for_all_callbacks(|callback| {
                 callback.on_adapter_property_changed(prop.get_type());
             });
         }
@@ -710,7 +696,7 @@
 
         let device = self.found_devices.get(&address).unwrap();
 
-        self.for_all_callbacks(|callback| {
+        self.callbacks.for_all_callbacks(|callback| {
             callback.on_device_found(device.info.clone());
         });
     }
@@ -729,7 +715,7 @@
             self.discovering_started = Instant::now();
         }
 
-        self.for_all_callbacks(|callback| {
+        self.callbacks.for_all_callbacks(|callback| {
             callback.on_discovering_changed(state == BtDiscoveryState::Started);
         });
 
@@ -751,7 +737,7 @@
     ) {
         // Currently this supports many agent because we accept many callbacks.
         // TODO: We need a way to select the default agent.
-        self.for_all_callbacks(|callback| {
+        self.callbacks.for_all_callbacks(|callback| {
             callback.on_ssp_request(
                 BluetoothDevice::new(remote_addr.to_string(), remote_name.clone()),
                 cod,
@@ -806,7 +792,7 @@
         }
 
         // Send bond state changed notifications
-        self.for_all_callbacks(|callback| {
+        self.callbacks.for_all_callbacks(|callback| {
             callback.on_bond_state_changed(
                 status.to_u32().unwrap(),
                 address.clone(),
@@ -898,12 +884,12 @@
 
                     match state {
                         BtAclState::Connected => {
-                            self.for_all_connection_callbacks(|callback| {
+                            self.connection_callbacks.for_all_callbacks(|callback| {
                                 callback.on_device_connected(device.clone());
                             });
                         }
                         BtAclState::Disconnected => {
-                            self.for_all_connection_callbacks(|callback| {
+                            self.connection_callbacks.for_all_callbacks(|callback| {
                                 callback.on_device_disconnected(device.clone());
                             });
                         }
@@ -917,52 +903,19 @@
 
 // TODO: Add unit tests for this implementation
 impl IBluetooth for Bluetooth {
-    fn register_callback(&mut self, mut callback: Box<dyn IBluetoothCallback + Send>) {
-        let tx = self.tx.clone();
-
-        let id = callback.register_disconnect(Box::new(move |cb_id| {
-            let tx = tx.clone();
-            tokio::spawn(async move {
-                let _result = tx
-                    .send(Message::BluetoothCallbackDisconnected(
-                        cb_id,
-                        BluetoothCallbackType::Adapter,
-                    ))
-                    .await;
-            });
-        }));
-
-        self.callbacks.insert(id, callback);
+    fn register_callback(&mut self, callback: Box<dyn IBluetoothCallback + Send>) {
+        self.callbacks.add_callback(callback);
     }
 
     fn register_connection_callback(
         &mut self,
-        mut callback: Box<dyn IBluetoothConnectionCallback + Send>,
+        callback: Box<dyn IBluetoothConnectionCallback + Send>,
     ) -> u32 {
-        let tx = self.tx.clone();
-
-        let id = callback.register_disconnect(Box::new(move |cb_id| {
-            let tx = tx.clone();
-            tokio::spawn(async move {
-                let _ = tx
-                    .send(Message::BluetoothCallbackDisconnected(
-                        cb_id,
-                        BluetoothCallbackType::Connection,
-                    ))
-                    .await;
-            });
-        }));
-
-        self.connection_callbacks.insert(id, callback);
-
-        id
+        self.connection_callbacks.add_callback(callback)
     }
 
     fn unregister_connection_callback(&mut self, callback_id: u32) -> bool {
-        match self.connection_callbacks.get_mut(&callback_id) {
-            Some(cb) => cb.unregister(callback_id),
-            None => false,
-        }
+        self.connection_callbacks.remove_callback(callback_id)
     }
 
     fn enable(&mut self) -> bool {
diff --git a/system/gd/rust/linux/stack/src/bluetooth_media.rs b/system/gd/rust/linux/stack/src/bluetooth_media.rs
index f993fed..12a3983 100644
--- a/system/gd/rust/linux/stack/src/bluetooth_media.rs
+++ b/system/gd/rust/linux/stack/src/bluetooth_media.rs
@@ -25,7 +25,8 @@
 use tokio::time::{sleep, Duration};
 
 use crate::bluetooth::{Bluetooth, BluetoothDevice, IBluetooth};
-use crate::Message;
+use crate::callbacks::Callbacks;
+use crate::{Message, RPCProxy};
 
 const DEFAULT_PROFILE_DISCOVERY_TIMEOUT_SEC: u64 = 5;
 
@@ -64,7 +65,7 @@
     fn stop_sco_call(&mut self, address: String);
 }
 
-pub trait IBluetoothMediaCallback {
+pub trait IBluetoothMediaCallback: RPCProxy {
     /// Triggered when a Bluetooth audio device is ready to be used. This should
     /// only be triggered once for a device and send an event to clients. If the
     /// device supports both HFP and A2DP, both should be ready when this is
@@ -119,8 +120,7 @@
 pub struct BluetoothMedia {
     intf: Arc<Mutex<BluetoothInterface>>,
     initialized: bool,
-    callbacks: Arc<Mutex<Vec<(u32, Box<dyn IBluetoothMediaCallback + Send>)>>>,
-    callback_last_id: u32,
+    callbacks: Arc<Mutex<Callbacks<dyn IBluetoothMediaCallback + Send>>>,
     tx: Sender<Message>,
     adapter: Option<Arc<Mutex<Box<Bluetooth>>>>,
     a2dp: Option<A2dp>,
@@ -139,8 +139,10 @@
         BluetoothMedia {
             intf,
             initialized: false,
-            callbacks: Arc::new(Mutex::new(vec![])),
-            callback_last_id: 0,
+            callbacks: Arc::new(Mutex::new(Callbacks::new(
+                tx.clone(),
+                Message::MediaCallbackDisconnected,
+            ))),
             tx,
             adapter: None,
             a2dp: None,
@@ -196,12 +198,12 @@
         match cb {
             AvrcpCallbacks::AvrcpAbsoluteVolumeEnabled(supported) => {
                 self.absolute_volume = supported;
-                self.for_all_callbacks(|callback| {
+                self.callbacks.lock().unwrap().for_all_callbacks(|callback| {
                     callback.on_absolute_volume_supported_changed(supported);
                 });
             }
             AvrcpCallbacks::AvrcpAbsoluteVolumeUpdate(volume) => {
-                self.for_all_callbacks(|callback| {
+                self.callbacks.lock().unwrap().for_all_callbacks(|callback| {
                     callback.on_absolute_volume_changed(volume);
                 });
             }
@@ -279,27 +281,31 @@
                 }
             }
             HfpCallbacks::VolumeUpdate(volume, addr) => {
-                self.for_all_callbacks(|callback| {
+                self.callbacks.lock().unwrap().for_all_callbacks(|callback| {
                     callback.on_hfp_volume_changed(volume, addr.to_string());
                 });
             }
         }
     }
 
+    pub fn remove_callback(&mut self, id: u32) -> bool {
+        self.callbacks.lock().unwrap().remove_callback(id)
+    }
+
     fn notify_media_capability_added(&self, addr: RawAddress) {
         // Return true if the device added message is sent by the call.
         fn dedup_added_cb(
             device_added_tasks: Arc<Mutex<HashMap<RawAddress, Option<JoinHandle<()>>>>>,
             addr: RawAddress,
-            callbacks: Arc<Mutex<Vec<(u32, Box<dyn IBluetoothMediaCallback + Send>)>>>,
+            callbacks: Arc<Mutex<Callbacks<dyn IBluetoothMediaCallback + Send>>>,
             device: BluetoothAudioDevice,
             is_delayed: bool,
         ) -> bool {
             // Closure used to lock and trigger the device added callbacks.
             let trigger_device_added = || {
-                for callback in &*callbacks.lock().unwrap() {
-                    callback.1.on_bluetooth_audio_device_added(device.clone());
-                }
+                callbacks.lock().unwrap().for_all_callbacks(|callback| {
+                    callback.on_bluetooth_audio_device_added(device.clone());
+                });
             };
             let mut guard = device_added_tasks.lock().unwrap();
             let task = guard.insert(addr, None);
@@ -390,7 +396,7 @@
                 // Abort what is pending
                 Some(handler) => handler.abort(),
                 // This addr has been added so tell audio server to remove it
-                None => self.for_all_callbacks(|callback| {
+                None => self.callbacks.lock().unwrap().for_all_callbacks(|callback| {
                     callback.on_bluetooth_audio_device_removed(addr.to_string());
                 }),
             }
@@ -399,12 +405,6 @@
         }
     }
 
-    fn for_all_callbacks<F: Fn(&Box<dyn IBluetoothMediaCallback + Send>)>(&self, f: F) {
-        for callback in &*self.callbacks.lock().unwrap() {
-            f(&callback.1);
-        }
-    }
-
     fn adapter_get_remote_name(&self, addr: RawAddress) -> String {
         let device = BluetoothDevice::new(
             addr.to_string(),
@@ -472,8 +472,7 @@
 
 impl IBluetoothMedia for BluetoothMedia {
     fn register_callback(&mut self, callback: Box<dyn IBluetoothMediaCallback + Send>) -> bool {
-        self.callback_last_id += 1;
-        self.callbacks.lock().unwrap().push((self.callback_last_id, callback));
+        let _id = self.callbacks.lock().unwrap().add_callback(callback);
         true
     }
 
diff --git a/system/gd/rust/linux/stack/src/lib.rs b/system/gd/rust/linux/stack/src/lib.rs
index 69414c7..d04b70d 100644
--- a/system/gd/rust/linux/stack/src/lib.rs
+++ b/system/gd/rust/linux/stack/src/lib.rs
@@ -31,12 +31,6 @@
     },
 };
 
-#[derive(Clone, Debug)]
-pub enum BluetoothCallbackType {
-    Adapter,
-    Connection,
-}
-
 /// Message types that are sent to the stack main dispatch loop.
 pub enum Message {
     // Callbacks from libbluetooth
@@ -51,9 +45,11 @@
 
     // Actions within the stack
     Media(MediaActions),
+    MediaCallbackDisconnected(u32),
 
     // Client callback disconnections
-    BluetoothCallbackDisconnected(u32, BluetoothCallbackType),
+    AdapterCallbackDisconnected(u32),
+    ConnectionCallbackDisconnected(u32),
 
     // Update list of found devices and remove old instances.
     DeviceFreshnessCheck,
@@ -130,8 +126,16 @@
                     bluetooth_media.lock().unwrap().dispatch_media_actions(action);
                 }
 
-                Message::BluetoothCallbackDisconnected(id, cb_type) => {
-                    bluetooth.lock().unwrap().callback_disconnected(id, cb_type);
+                Message::MediaCallbackDisconnected(cb_id) => {
+                    bluetooth_media.lock().unwrap().remove_callback(cb_id);
+                }
+
+                Message::AdapterCallbackDisconnected(id) => {
+                    bluetooth.lock().unwrap().adapter_callback_disconnected(id);
+                }
+
+                Message::ConnectionCallbackDisconnected(id) => {
+                    bluetooth.lock().unwrap().connection_callback_disconnected(id);
                 }
 
                 Message::DeviceFreshnessCheck => {
diff --git a/system/gd/rust/linux/stack/src/suspend.rs b/system/gd/rust/linux/stack/src/suspend.rs
index 0b9a8be..3d6d50e 100644
--- a/system/gd/rust/linux/stack/src/suspend.rs
+++ b/system/gd/rust/linux/stack/src/suspend.rs
@@ -1,9 +1,9 @@
 //! Suspend/Resume API.
 
+use crate::callbacks::Callbacks;
 use crate::{Message, RPCProxy};
 use bt_topshim::btif::BluetoothInterface;
 use log::warn;
-use std::collections::HashMap;
 use std::sync::{Arc, Mutex};
 use tokio::sync::mpsc::Sender;
 
@@ -60,7 +60,7 @@
 pub struct Suspend {
     intf: Arc<Mutex<BluetoothInterface>>,
     tx: Sender<Message>,
-    callbacks: HashMap<u32, Box<dyn ISuspendCallback + Send>>,
+    callbacks: Callbacks<dyn ISuspendCallback + Send>,
     is_connected_suspend: bool,
     was_a2dp_connected: bool,
 }
@@ -69,55 +69,34 @@
     pub fn new(intf: Arc<Mutex<BluetoothInterface>>, tx: Sender<Message>) -> Suspend {
         Self {
             intf: intf,
-            tx,
-            callbacks: HashMap::new(),
+            tx: tx.clone(),
+            callbacks: Callbacks::new(tx.clone(), Message::SuspendCallbackDisconnected),
             is_connected_suspend: false,
             was_a2dp_connected: false,
         }
     }
 
     pub(crate) fn callback_registered(&mut self, id: u32) {
-        match self.callbacks.get(&id) {
+        match self.callbacks.get_by_id(id) {
             Some(callback) => callback.on_callback_registered(id),
             None => warn!("Suspend callback {} does not exist", id),
         }
     }
 
     pub(crate) fn remove_callback(&mut self, id: u32) -> bool {
-        match self.callbacks.get_mut(&id) {
-            Some(callback) => {
-                callback.unregister(id);
-                self.callbacks.remove(&id);
-                true
-            }
-            None => false,
-        }
-    }
-
-    fn for_all_callbacks<F: Fn(&Box<dyn ISuspendCallback + Send>)>(&self, f: F) {
-        for (_, callback) in self.callbacks.iter() {
-            f(&callback);
-        }
+        self.callbacks.remove_callback(id)
     }
 }
 
 impl ISuspend for Suspend {
-    fn register_callback(&mut self, mut callback: Box<dyn ISuspendCallback + Send>) -> bool {
-        let tx = self.tx.clone();
-
-        let id = callback.register_disconnect(Box::new(move |cb_id| {
-            let tx = tx.clone();
-            tokio::spawn(async move {
-                let _result = tx.send(Message::SuspendCallbackDisconnected(cb_id)).await;
-            });
-        }));
+    fn register_callback(&mut self, callback: Box<dyn ISuspendCallback + Send>) -> bool {
+        let id = self.callbacks.add_callback(callback);
 
         let tx = self.tx.clone();
         tokio::spawn(async move {
             let _result = tx.send(Message::SuspendCallbackRegistered(id)).await;
         });
 
-        self.callbacks.insert(id, callback);
         true
     }
 
@@ -147,7 +126,7 @@
         self.intf.lock().unwrap().clear_filter_accept_list();
         // TODO(231435700): self.intf.lock().unwrap().disconnect_all_acls();
         self.intf.lock().unwrap().le_rand();
-        self.for_all_callbacks(|callback| {
+        self.callbacks.for_all_callbacks(|callback| {
             callback.on_suspend_ready(suspend_id);
         });
         return 1;
@@ -165,7 +144,7 @@
             // TODO(224603198): start all advertising again
         }
         self.intf.lock().unwrap().le_rand();
-        self.for_all_callbacks(|callback| {
+        self.callbacks.for_all_callbacks(|callback| {
             callback.on_resumed(suspend_id);
         });
         return true;