blob: 805931bc3c37e9902ebd2f66cb2b589f119e8a30 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2020 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import bluetooth_packets_python3 as bt_packets
from bluetooth_packets_python3 import l2cap_packets
from bluetooth_packets_python3.l2cap_packets import CommandCode, LeCommandCode
from bluetooth_packets_python3.l2cap_packets import ConnectionResponseResult
from bluetooth_packets_python3.l2cap_packets import InformationRequestInfoType
from bluetooth_packets_python3.l2cap_packets import LeCreditBasedConnectionResponseResult
class L2capMatchers(object):
@staticmethod
def ConnectionResponse(scid):
return lambda packet: L2capMatchers._is_matching_connection_response(packet, scid)
@staticmethod
def ConnectionRequest():
return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.CONNECTION_REQUEST)
@staticmethod
def ConfigurationResponse():
return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.CONFIGURATION_RESPONSE)
@staticmethod
def ConfigurationRequest():
return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.CONFIGURATION_REQUEST)
@staticmethod
def DisconnectionRequest():
return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.DISCONNECTION_REQUEST)
@staticmethod
def DisconnectionResponse(scid, dcid):
return lambda packet: L2capMatchers._is_matching_disconnection_response(packet, scid, dcid)
@staticmethod
def CommandReject():
return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.COMMAND_REJECT)
@staticmethod
def LeCommandReject():
return lambda packet: L2capMatchers._is_le_control_frame_with_code(packet, LeCommandCode.COMMAND_REJECT)
@staticmethod
def CreditBasedConnectionRequest(psm):
return lambda packet: L2capMatchers._is_matching_credit_based_connection_request(packet, psm)
@staticmethod
def CreditBasedConnectionResponse(
scid, result=LeCreditBasedConnectionResponseResult.SUCCESS):
return lambda packet: L2capMatchers._is_matching_credit_based_connection_response(packet, scid, result)
@staticmethod
def LeDisconnectionRequest(scid, dcid):
return lambda packet: L2capMatchers._is_matching_le_disconnection_request(packet, scid, dcid)
@staticmethod
def LeDisconnectionResponse(scid, dcid):
return lambda packet: L2capMatchers._is_matching_le_disconnection_response(packet, scid, dcid)
@staticmethod
def SFrame(req_seq=None, f=None, s=None, p=None):
return lambda packet: L2capMatchers._is_matching_supervisory_frame(packet, req_seq, f, s, p)
@staticmethod
def IFrame(tx_seq=None, payload=None, f=None):
return lambda packet: L2capMatchers._is_matching_information_frame(packet, tx_seq, payload, f)
@staticmethod
def Data(payload):
return lambda packet: packet.GetPayload().GetBytes() == payload
@staticmethod
def FirstLeIFrame(payload, sdu_size):
return lambda packet: L2capMatchers._is_matching_first_le_i_frame(packet, payload, sdu_size)
# this is a hack - should be removed
@staticmethod
def PartialData(payload):
return lambda packet: payload in packet.GetPayload().GetBytes()
# this is a hack - should be removed
@staticmethod
def PacketPayloadRawData(payload):
return lambda packet: payload in packet.payload
# this is a hack - should be removed
@staticmethod
def PacketPayloadWithMatchingPsm(psm):
return lambda packet: None if psm != packet.psm else packet
@staticmethod
def ExtractBasicFrame(scid):
return lambda packet: L2capMatchers._basic_frame_for(packet, scid)
@staticmethod
def InformationResponseExtendedFeatures(supports_ertm=None,
supports_streaming=None,
supports_fcs=None,
supports_fixed_channels=None):
return lambda packet: L2capMatchers._is_matching_information_response_extended_features(packet, supports_ertm, supports_streaming, supports_fcs, supports_fixed_channels)
@staticmethod
def _basic_frame(packet):
if packet is None:
return None
return l2cap_packets.BasicFrameView(
bt_packets.PacketViewLittleEndian(list(packet.payload)))
@staticmethod
def _basic_frame_for(packet, scid):
frame = L2capMatchers._basic_frame(packet)
if frame.GetChannelId() != scid:
return None
return frame
@staticmethod
def _information_frame(packet):
standard_frame = l2cap_packets.StandardFrameView(packet)
if standard_frame.GetFrameType() != l2cap_packets.FrameType.I_FRAME:
return None
return l2cap_packets.EnhancedInformationFrameView(standard_frame)
@staticmethod
def _supervisory_frame(packet):
standard_frame = l2cap_packets.StandardFrameView(packet)
if standard_frame.GetFrameType() != l2cap_packets.FrameType.S_FRAME:
return None
return l2cap_packets.EnhancedSupervisoryFrameView(standard_frame)
@staticmethod
def _is_matching_information_frame(packet, tx_seq, payload, f):
frame = L2capMatchers._information_frame(packet)
if frame is None:
return False
if tx_seq is not None and frame.GetTxSeq() != tx_seq:
return False
if payload is not None and frame.GetPayload().GetBytes() != payload:
return False
if f is not None and frame.GetF() != f:
return False
return True
@staticmethod
def _is_matching_supervisory_frame(packet, req_seq, f, s, p):
frame = L2capMatchers._supervisory_frame(packet)
if frame is None:
return False
if req_seq is not None and frame.GetReqSeq() != req_seq:
return False
if f is not None and frame.GetF() != f:
return False
if s is not None and frame.GetS() != s:
return False
if p is not None and frame.GetP() != p:
return False
return True
@staticmethod
def _is_matching_first_le_i_frame(packet, payload, sdu_size):
first_le_i_frame = l2cap_packets.FirstLeInformationFrameView(packet)
return first_le_i_frame.GetPayload().GetBytes(
) == payload and first_le_i_frame.GetL2capSduLength() == sdu_size
@staticmethod
def _control_frame(packet):
if packet.GetChannelId() != 1:
return None
return l2cap_packets.ControlView(packet.GetPayload())
@staticmethod
def _le_control_frame(packet):
if packet.GetChannelId() != 5:
return None
return l2cap_packets.LeControlView(packet.GetPayload())
@staticmethod
def control_frame_with_code(packet, code):
frame = L2capMatchers._control_frame(packet)
if frame is None or frame.GetCode() != code:
return None
return frame
@staticmethod
def le_control_frame_with_code(packet, code):
frame = L2capMatchers._le_control_frame(packet)
if frame is None or frame.GetCode() != code:
return None
return frame
@staticmethod
def _is_control_frame_with_code(packet, code):
return L2capMatchers.control_frame_with_code(packet, code) is not None
@staticmethod
def _is_le_control_frame_with_code(packet, code):
return L2capMatchers.le_control_frame_with_code(packet,
code) is not None
@staticmethod
def _is_matching_connection_response(packet, scid):
frame = L2capMatchers.control_frame_with_code(
packet, CommandCode.CONNECTION_RESPONSE)
if frame is None:
return False
response = l2cap_packets.ConnectionResponseView(frame)
return response.GetSourceCid() == scid and response.GetResult(
) == ConnectionResponseResult.SUCCESS and response.GetDestinationCid(
) != 0
@staticmethod
def _is_matching_disconnection_response(packet, scid, dcid):
frame = L2capMatchers.control_frame_with_code(
packet, CommandCode.DISCONNECTION_RESPONSE)
if frame is None:
return False
response = l2cap_packets.DisconnectionResponseView(frame)
return response.GetSourceCid() == scid and response.GetDestinationCid(
) == dcid
@staticmethod
def _is_matching_le_disconnection_response(packet, scid, dcid):
frame = L2capMatchers.le_control_frame_with_code(
packet, LeCommandCode.DISCONNECTION_RESPONSE)
if frame is None:
return False
response = l2cap_packets.LeDisconnectionResponseView(frame)
return response.GetSourceCid() == scid and response.GetDestinationCid(
) == dcid
@staticmethod
def _is_matching_le_disconnection_request(packet, scid, dcid):
frame = L2capMatchers.le_control_frame_with_code(
packet, LeCommandCode.DISCONNECTION_REQUEST)
if frame is None:
return False
request = l2cap_packets.LeDisconnectionRequestView(frame)
return request.GetSourceCid() == scid and request.GetDestinationCid(
) == dcid
@staticmethod
def _information_response_with_type(packet, info_type):
frame = L2capMatchers.control_frame_with_code(
packet, CommandCode.INFORMATION_RESPONSE)
if frame is None:
return None
response = l2cap_packets.InformationResponseView(frame)
if response.GetInfoType() != info_type:
return None
return response
@staticmethod
def _is_matching_information_response_extended_features(
packet, supports_ertm, supports_streaming, supports_fcs,
supports_fixed_channels):
frame = L2capMatchers._information_response_with_type(
packet, InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED)
if frame is None:
return False
features = l2cap_packets.InformationResponseExtendedFeaturesView(frame)
if supports_ertm is not None and features.GetEnhancedRetransmissionMode(
) != supports_ertm:
return False
if supports_streaming is not None and features.GetStreamingMode != supports_streaming:
return False
if supports_fcs is not None and features.GetFcsOption() != supports_fcs:
return False
if supports_fixed_channels is not None and features.GetFixedChannels(
) != supports_fixed_channels:
return False
return True
@staticmethod
def _is_matching_credit_based_connection_request(packet, psm):
frame = L2capMatchers.le_control_frame_with_code(
packet, LeCommandCode.LE_CREDIT_BASED_CONNECTION_REQUEST)
if frame is None:
return False
request = l2cap_packets.LeCreditBasedConnectionRequestView(frame)
return request.GetLePsm() == psm
@staticmethod
def _is_matching_credit_based_connection_response(packet, scid, result):
frame = L2capMatchers.le_control_frame_with_code(
packet, LeCommandCode.LE_CREDIT_BASED_CONNECTION_RESPONSE)
if frame is None:
return False
response = l2cap_packets.LeCreditBasedConnectionResponseView(frame)
return response.GetResult() == result and (
result != LeCreditBasedConnectionResponseResult.SUCCESS or
response.GetDestinationCid() != 0)