blob: 6e52cd50c5577efe62ed3497a7eb2894953012bc [file] [log] [blame]
//! Handles fragmentation & reassembly of ACL packets into whole L2CAP payloads
use bt_common::Bluetooth;
use bt_packets::hci::PacketBoundaryFlag::{
ContinuingFragment, FirstAutomaticallyFlushable, FirstNonAutomaticallyFlushable,
};
use bt_packets::hci::{AclBuilder, AclChild, AclPacket, BroadcastFlag};
use bytes::{Buf, Bytes, BytesMut};
use futures::stream::{self, StreamExt};
use log::{error, info, warn};
use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot;
use tokio_stream::wrappers::ReceiverStream;
const L2CAP_BASIC_FRAME_HEADER_LEN: usize = 4;
pub struct Reassembler {
buffer: Option<BytesMut>,
remaining: usize,
out: Sender<Bytes>,
}
impl Reassembler {
/// Create a new reassembler
pub fn new(out: Sender<Bytes>) -> Self {
Self { buffer: None, remaining: 0, out }
}
/// Injest the packet and send out if fully reassembled
pub async fn on_packet(&mut self, packet: AclPacket) {
let payload = match packet.specialize() {
AclChild::Payload(payload) => payload,
AclChild::None => {
info!("dropping ACL packet with empty payload");
return;
}
};
if let BroadcastFlag::ActivePeripheralBroadcast = packet.get_broadcast_flag() {
// we do not accept broadcast packets
return;
}
match packet.get_packet_boundary_flag() {
FirstNonAutomaticallyFlushable => error!("not allowed to send FIRST_NON_AUTOMATICALLY_FLUSHABLE to host except loopback mode"),
FirstAutomaticallyFlushable => {
if self.buffer.take().is_some() {
error!("got a start packet without finishing previous reassembly - dropping previous");
}
let full_size = get_l2cap_pdu_size(&payload);
self.remaining = full_size - (payload.len() - L2CAP_BASIC_FRAME_HEADER_LEN);
if self.remaining > 0 {
let mut buffer = BytesMut::with_capacity(full_size);
buffer.extend_from_slice(&payload[..]);
self.buffer = Some(buffer);
} else {
self.out.send(payload).await.unwrap();
}
},
ContinuingFragment => {
match self.buffer.take() {
None => warn!("got continuation packet without pending reassembly"),
Some(_) if self.remaining < payload.len() => warn!("remote sent unexpected L2CAP PDU - dropping entire packet"),
Some(mut buffer) => {
self.remaining -= payload.len();
buffer.extend_from_slice(&payload[..]);
if self.remaining == 0 {
self.out.send(buffer.freeze()).await.unwrap();
} else {
self.buffer = Some(buffer);
}
}
}
},
}
}
}
fn get_l2cap_pdu_size(first_packet: &Bytes) -> usize {
if first_packet.len() <= L2CAP_BASIC_FRAME_HEADER_LEN {
error!("invalid l2cap starting packet");
0
} else {
(&first_packet[..]).get_u16_le() as usize
}
}
pub fn fragmenting_stream(
rx: ReceiverStream<Bytes>,
mtu: usize,
handle: u16,
bt: Bluetooth,
close_rx: oneshot::Receiver<()>,
) -> std::pin::Pin<
std::boxed::Box<dyn futures::Stream<Item = bt_packets::hci::AclPacket> + std::marker::Send>,
> {
rx.flat_map(move |data| {
stream::iter(
data.chunks(mtu)
.enumerate()
.map(move |(i, chunk)| {
AclBuilder {
handle,
packet_boundary_flag: match bt {
Bluetooth::Classic if i == 0 => FirstAutomaticallyFlushable,
Bluetooth::Le if i == 0 => FirstNonAutomaticallyFlushable,
_ => ContinuingFragment,
},
broadcast_flag: BroadcastFlag::PointToPoint,
payload: Some(Bytes::copy_from_slice(chunk)),
}
.build()
})
.collect::<Vec<AclPacket>>(),
)
})
.take_until(close_rx)
.boxed()
}