blob: ec98d33d87c5f7191ff29d3a14a256741669180b [file] [log] [blame]
//! Classic ACL manager
use crate::hci::{Address, CommandSender, EventRegistry};
use crate::link::acl::core;
use bt_common::Bluetooth;
use bt_packets::hci::EventChild::{
AuthenticationComplete, ConnectionComplete, DisconnectionComplete,
};
use bt_packets::hci::{
AcceptConnectionRequestBuilder, AcceptConnectionRequestRole, ClockOffsetValid,
CreateConnectionBuilder, CreateConnectionCancelBuilder, CreateConnectionRoleSwitch,
DisconnectBuilder, DisconnectReason, ErrorCode, EventChild, EventCode, EventPacket,
PageScanRepetitionMode, RejectConnectionReason, RejectConnectionRequestBuilder, Role,
};
use bytes::Bytes;
use gddi::{module, provides, Stoppable};
use log::warn;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::runtime::Runtime;
use tokio::select;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::{oneshot, Mutex};
module! {
classic_acl_module,
providers {
AclManager => provide_acl_manager,
},
}
/// Classic ACL manager
#[derive(Clone, Stoppable)]
pub struct AclManager {
req_tx: Sender<Request>,
/// High level events from AclManager
pub evt_rx: Arc<Mutex<Receiver<Event>>>,
}
/// Events generated by AclManager
#[derive(Debug)]
pub enum Event {
/// Connection was successful - provides the newly created connection
ConnectSuccess(Connection),
/// Locally initialted connection was not successful - indicates address & reason
ConnectFail {
/// Address of the failed connection
addr: Address,
/// Reason of the failed connection
reason: ErrorCode,
},
}
/// A classic ACL connection
#[derive(Debug)]
pub struct Connection {
addr: Address,
rx: Receiver<Bytes>,
tx: Sender<Bytes>,
shared: Arc<Mutex<ConnectionShared>>,
requests: Sender<ConnectionRequest>,
evt_rx: Receiver<ConnectionEvent>,
}
/// Events generated by Connection
#[derive(Debug)]
pub enum ConnectionEvent {
/// Connection was disconnected with the specified code.
Disconnected(ErrorCode),
/// Connection authentication was completed
AuthenticationComplete,
}
impl Connection {
/// Disconnect the connection with the specified reason.
pub async fn disconnect(&mut self, reason: DisconnectReason) {
let (tx, rx) = oneshot::channel();
self.requests.send(ConnectionRequest::Disconnect { reason, fut: tx }).await.unwrap();
rx.await.unwrap()
}
}
#[derive(Debug)]
enum ConnectionRequest {
Disconnect { reason: DisconnectReason, fut: oneshot::Sender<()> },
}
struct ConnectionInternal {
addr: Address,
#[allow(dead_code)]
shared: Arc<Mutex<ConnectionShared>>,
hci_evt_tx: Sender<EventPacket>,
}
#[derive(Debug)]
struct ConnectionShared {
role: Role,
}
impl AclManager {
/// Connect to the specified address, or queue it if a connection is already pending
pub async fn connect(&mut self, addr: Address) {
self.req_tx.send(Request::Connect { addr }).await.unwrap();
}
/// Cancel the connection to the specified address, if it is pending
pub async fn cancel_connect(&mut self, addr: Address) {
let (tx, rx) = oneshot::channel();
self.req_tx.send(Request::CancelConnect { addr, fut: tx }).await.unwrap();
rx.await.unwrap();
}
}
#[derive(Debug)]
enum Request {
Connect { addr: Address },
CancelConnect { addr: Address, fut: oneshot::Sender<()> },
}
#[derive(Eq, PartialEq)]
enum PendingConnect {
Outgoing(Address),
Incoming(Address),
None,
}
impl PendingConnect {
fn take(&mut self) -> Self {
std::mem::replace(self, PendingConnect::None)
}
}
#[provides]
async fn provide_acl_manager(
mut hci: CommandSender,
mut events: EventRegistry,
mut dispatch: core::AclDispatch,
rt: Arc<Runtime>,
) -> AclManager {
let (req_tx, mut req_rx) = channel::<Request>(10);
let (conn_evt_tx, conn_evt_rx) = channel::<Event>(10);
let local_rt = rt.clone();
local_rt.spawn(async move {
let connections: Arc<Mutex<HashMap<u16, ConnectionInternal>>> = Arc::new(Mutex::new(HashMap::new()));
let mut connect_queue: Vec<Address> = Vec::new();
let mut pending = PendingConnect::None;
let (evt_tx, mut evt_rx) = channel(3);
events.register(EventCode::ConnectionComplete, evt_tx.clone()).await;
events.register(EventCode::ConnectionRequest, evt_tx.clone()).await;
events.register(EventCode::AuthenticationComplete, evt_tx).await;
loop {
select! {
Some(req) = req_rx.recv() => {
match req {
Request::Connect { addr } => {
if connections.lock().await.values().any(|c| c.addr == addr) {
warn!("already connected: {}", addr);
return;
}
if let PendingConnect::None = pending {
pending = PendingConnect::Outgoing(addr);
hci.send(build_create_connection(addr)).await;
} else {
connect_queue.insert(0, addr);
}
},
Request::CancelConnect { addr, fut } => {
connect_queue.retain(|p| *p != addr);
if pending == PendingConnect::Outgoing(addr) {
hci.send(CreateConnectionCancelBuilder { bd_addr: addr }).await;
}
fut.send(()).unwrap();
}
}
}
Some(evt) = evt_rx.recv() => {
match evt.specialize() {
ConnectionComplete(evt) => {
let addr = evt.get_bd_addr();
let status = evt.get_status();
let handle = evt.get_connection_handle();
let role = match pending.take() {
PendingConnect::Outgoing(a) if a == addr => Role::Central,
PendingConnect::Incoming(a) if a == addr => Role::Peripheral,
_ => panic!("No prior connection request for {}", addr),
};
match status {
ErrorCode::Success => {
let mut core_conn = dispatch.register(handle, Bluetooth::Classic).await;
let shared = Arc::new(Mutex::new(ConnectionShared { role }));
let (evt_tx, evt_rx) = channel(10);
let (req_tx, req_rx) = channel(10);
let connection = Connection {
addr,
shared: shared.clone(),
rx: core_conn.rx.take().unwrap(),
tx: core_conn.tx.take().unwrap(),
requests: req_tx,
evt_rx,
};
let connection_internal = ConnectionInternal {
addr,
shared,
hci_evt_tx: core_conn.evt_tx.clone(),
};
assert!(connections.lock().await.insert(handle, connection_internal).is_none());
rt.spawn(run_connection(handle, evt_tx, req_rx, core_conn, connections.clone(), hci.clone()));
conn_evt_tx.send(Event::ConnectSuccess(connection)).await.unwrap();
},
_ => conn_evt_tx.send(Event::ConnectFail { addr, reason: status }).await.unwrap(),
}
},
EventChild::ConnectionRequest(evt) => {
let addr = evt.get_bd_addr();
pending = PendingConnect::Incoming(addr);
if connections.lock().await.values().any(|c| c.addr == addr) {
hci.send(RejectConnectionRequestBuilder {
bd_addr: addr,
reason: RejectConnectionReason::UnacceptableBdAddr
}).await;
} else {
hci.send(AcceptConnectionRequestBuilder {
bd_addr: addr,
role: AcceptConnectionRequestRole::BecomeCentral
}).await;
}
},
AuthenticationComplete(e) => dispatch_to(e.get_connection_handle(), &connections, evt).await,
_ => unimplemented!(),
}
}
}
}
});
AclManager { req_tx, evt_rx: Arc::new(Mutex::new(conn_evt_rx)) }
}
fn build_create_connection(bd_addr: Address) -> CreateConnectionBuilder {
CreateConnectionBuilder {
bd_addr,
packet_type: 0x4408 /* DM 1,3,5 */ | 0x8810, /*DH 1,3,5 */
page_scan_repetition_mode: PageScanRepetitionMode::R1,
clock_offset: 0,
clock_offset_valid: ClockOffsetValid::Invalid,
allow_role_switch: CreateConnectionRoleSwitch::AllowRoleSwitch,
}
}
async fn dispatch_to(
handle: u16,
connections: &Arc<Mutex<HashMap<u16, ConnectionInternal>>>,
event: EventPacket,
) {
if let Some(c) = connections.lock().await.get_mut(&handle) {
c.hci_evt_tx.send(event).await.unwrap();
}
}
async fn run_connection(
handle: u16,
evt_tx: Sender<ConnectionEvent>,
mut req_rx: Receiver<ConnectionRequest>,
mut core: core::Connection,
connections: Arc<Mutex<HashMap<u16, ConnectionInternal>>>,
mut hci: CommandSender,
) {
loop {
select! {
Some(evt) = core.evt_rx.recv() => {
match evt.specialize() {
DisconnectionComplete(evt) => {
connections.lock().await.remove(&handle);
evt_tx.send(ConnectionEvent::Disconnected(evt.get_reason())).await.unwrap();
return; // At this point, there is nothing more to run on the connection.
},
AuthenticationComplete(_) => evt_tx.send(ConnectionEvent::AuthenticationComplete).await.unwrap(),
_ => unimplemented!(),
}
},
Some(req) = req_rx.recv() => {
match req {
ConnectionRequest::Disconnect{reason, fut} => {
hci.send(DisconnectBuilder { connection_handle: handle, reason }).await;
fut.send(()).unwrap();
}
}
},
}
}
}