blob: 41b3f5058002719c83c9ff0ccb2f917ad6de3f05 [file] [log] [blame]
## This file is part of Scapy
## See http://www.secdev.org/projects/scapy for more informations
## Copyright (C) Philippe Biondi <phil@secdev.org>
## Copyright (C) Mike Ryan <mikeryan@lacklustre.net>
## This program is published under a GPLv2 license
"""
Bluetooth layers, sockets and send/receive functions.
"""
import socket,struct,array
from ctypes import *
from select import select
from scapy.config import conf
from scapy.data import DLT_BLUETOOTH_HCI_H4
from scapy.packet import *
from scapy.fields import *
from scapy.supersocket import SuperSocket
from scapy.sendrecv import sndrcv
from scapy.data import MTU
from scapy.consts import WINDOWS
from scapy.error import warning, log_loading
##########
# Fields #
##########
class XLEShortField(LEShortField):
def i2repr(self, pkt, x):
return lhex(self.i2h(pkt, x))
class XLELongField(LEShortField):
def __init__(self, name, default):
Field.__init__(self, name, default, "<Q")
def i2repr(self, pkt, x):
return lhex(self.i2h(pkt, x))
class LEMACField(Field):
def __init__(self, name, default):
Field.__init__(self, name, default, "6s")
def i2m(self, pkt, x):
if x is None:
return b"\0\0\0\0\0\0"
return mac2str(x)[::-1]
def m2i(self, pkt, x):
return str2mac(x[::-1])
def any2i(self, pkt, x):
if isinstance(x, str) and len(x) is 6:
x = self.m2i(pkt, x)
return x
def i2repr(self, pkt, x):
x = self.i2h(pkt, x)
if self in conf.resolve:
x = conf.manufdb._resolve_MAC(x)
return x
def randval(self):
return RandMAC()
_bluetooth_packet_types = {
0: "Acknowledgement",
1: "Command",
2: "ACL Data",
3: "Synchronous",
4: "Event",
5: "Reserve",
14: "Vendor",
15: "Link Control"
}
class HCI_Hdr(Packet):
name = "HCI header"
fields_desc = [ ByteEnumField("type", 2, _bluetooth_packet_types) ]
def mysummary(self):
return self.sprintf("HCI %type%")
class HCI_ACL_Hdr(Packet):
name = "HCI ACL header"
fields_desc = [ BitField("handle",0,12), # TODO: Create and use LEBitField
BitField("PB",0,2), # They are recieved as a **combined** LE Short
BitField("BC",0,2), # Handle is 12 bits, eacg flag is 2 bits.
LEShortField("len",None), ]
def pre_dissect(self, s):
# Recieve data as LE stored as
# .... 1111 0100 1100 = handle
# 1010 .... .... .... = flags
# And turn it into
# 1111 0100 1100 .... = handle
# .... .... .... 1010 = flags
hf = socket.ntohs(struct.unpack("!H", s[:2])[0])
r = ((hf & 0x0fff) << 4) + (hf >> 12)
return struct.pack("!H", r) + s[2:]
def post_dissect(self, s):
self.raw_packet_cache = None # Reset packet to allow post_build
return s
def post_build(self, p, pay):
p += pay
if self.len is None:
p = p[:2] + struct.pack("<H", len(pay)) + p[4:]
# Reverse, opposite of pre_dissect
hf = struct.unpack("!H", p[:2])[0]
r = socket.ntohs(((hf & 0xf) << 12) + (hf >> 4))
return struct.pack("!H", r) + p[2:]
class L2CAP_Hdr(Packet):
name = "L2CAP header"
fields_desc = [ LEShortField("len",None),
LEShortEnumField("cid",0,{1:"control", 4:"attribute"}),]
def post_build(self, p, pay):
p += pay
if self.len is None:
p = struct.pack("<H", len(pay)) + p[2:]
return p
class L2CAP_CmdHdr(Packet):
name = "L2CAP command header"
fields_desc = [
ByteEnumField("code",8,{1:"rej",2:"conn_req",3:"conn_resp",
4:"conf_req",5:"conf_resp",6:"disconn_req",
7:"disconn_resp",8:"echo_req",9:"echo_resp",
10:"info_req",11:"info_resp", 18:"conn_param_update_req",
19:"conn_param_update_resp"}),
ByteField("id",0),
LEShortField("len",None) ]
def post_build(self, p, pay):
p += pay
if self.len is None:
p = p[:2] + struct.pack("<H", len(pay)) + p[4:]
return p
def answers(self, other):
if other.id == self.id:
if self.code == 1:
return 1
if other.code in [2,4,6,8,10,18] and self.code == other.code+1:
if other.code == 8:
return 1
return self.payload.answers(other.payload)
return 0
class L2CAP_ConnReq(Packet):
name = "L2CAP Conn Req"
fields_desc = [ LEShortEnumField("psm",0,{1:"SDP",3:"RFCOMM",5:"telephony control"}),
LEShortField("scid",0),
]
class L2CAP_ConnResp(Packet):
name = "L2CAP Conn Resp"
fields_desc = [ LEShortField("dcid",0),
LEShortField("scid",0),
LEShortEnumField("result",0,["success", "pend", "cr_bad_psm", "cr_sec_block", "cr_no_mem", "reserved","cr_inval_scid", "cr_scid_in_use"]),
LEShortEnumField("status",0,["no_info", "authen_pend", "author_pend", "reserved"]),
]
def answers(self, other):
return isinstance(other, L2CAP_ConnReq) and self.dcid == other.scid
class L2CAP_CmdRej(Packet):
name = "L2CAP Command Rej"
fields_desc = [ LEShortField("reason",0),
]
class L2CAP_ConfReq(Packet):
name = "L2CAP Conf Req"
fields_desc = [ LEShortField("dcid",0),
LEShortField("flags",0),
]
class L2CAP_ConfResp(Packet):
name = "L2CAP Conf Resp"
fields_desc = [ LEShortField("scid",0),
LEShortField("flags",0),
LEShortEnumField("result",0,["success","unaccept","reject","unknown"]),
]
def answers(self, other):
return isinstance(other, L2CAP_ConfReq) and self.scid == other.dcid
class L2CAP_DisconnReq(Packet):
name = "L2CAP Disconn Req"
fields_desc = [ LEShortField("dcid",0),
LEShortField("scid",0), ]
class L2CAP_DisconnResp(Packet):
name = "L2CAP Disconn Resp"
fields_desc = [ LEShortField("dcid",0),
LEShortField("scid",0), ]
def answers(self, other):
return self.scid == other.scid
class L2CAP_InfoReq(Packet):
name = "L2CAP Info Req"
fields_desc = [ LEShortEnumField("type",0,{1:"CL_MTU",2:"FEAT_MASK"}),
StrField("data","")
]
class L2CAP_InfoResp(Packet):
name = "L2CAP Info Resp"
fields_desc = [ LEShortField("type",0),
LEShortEnumField("result",0,["success","not_supp"]),
StrField("data",""), ]
def answers(self, other):
return self.type == other.type
class L2CAP_Connection_Parameter_Update_Request(Packet):
name = "L2CAP Connection Parameter Update Request"
fields_desc = [ LEShortField("min_interval", 0),
LEShortField("max_interval", 0),
LEShortField("slave_latency", 0),
LEShortField("timeout_mult", 0), ]
class L2CAP_Connection_Parameter_Update_Response(Packet):
name = "L2CAP Connection Parameter Update Response"
fields_desc = [ LEShortField("move_result", 0), ]
class ATT_Hdr(Packet):
name = "ATT header"
fields_desc = [ XByteField("opcode", None), ]
class ATT_Error_Response(Packet):
name = "Error Response"
fields_desc = [ XByteField("request", 0),
LEShortField("handle", 0),
XByteField("ecode", 0), ]
class ATT_Exchange_MTU_Request(Packet):
name = "Exchange MTU Request"
fields_desc = [ LEShortField("mtu", 0), ]
class ATT_Exchange_MTU_Response(Packet):
name = "Exchange MTU Response"
fields_desc = [ LEShortField("mtu", 0), ]
class ATT_Find_Information_Request(Packet):
name = "Find Information Request"
fields_desc = [ XLEShortField("start", 0x0000),
XLEShortField("end", 0xffff), ]
class ATT_Find_Information_Response(Packet):
name = "Find Information Reponse"
fields_desc = [ XByteField("format", 1),
StrField("data", "") ]
class ATT_Find_By_Type_Value_Request(Packet):
name = "Find By Type Value Request"
fields_desc = [ XLEShortField("start", 0x0001),
XLEShortField("end", 0xffff),
XLEShortField("uuid", None),
StrField("data", ""), ]
class ATT_Find_By_Type_Value_Response(Packet):
name = "Find By Type Value Response"
fields_desc = [ StrField("handles", ""), ]
class ATT_Read_By_Type_Request_128bit(Packet):
name = "Read By Type Request"
fields_desc = [ XLEShortField("start", 0x0001),
XLEShortField("end", 0xffff),
XLELongField("uuid1", None),
XLELongField("uuid2", None)]
@classmethod
def dispatch_hook(cls, _pkt=None, *args, **kargs):
if _pkt and len(_pkt) == 6:
return ATT_Read_By_Type_Request
return ATT_Read_By_Type_Request_128bit
class ATT_Read_By_Type_Request(Packet):
name = "Read By Type Request"
fields_desc = [ XLEShortField("start", 0x0001),
XLEShortField("end", 0xffff),
XLEShortField("uuid", None)]
class ATT_Read_By_Type_Response(Packet):
name = "Read By Type Response"
# fields_desc = [ FieldLenField("len", None, length_of="data", fmt="B"),
# StrLenField("data", "", length_from=lambda pkt:pkt.len), ]
fields_desc = [ StrField("data", "") ]
class ATT_Read_Request(Packet):
name = "Read Request"
fields_desc = [ XLEShortField("gatt_handle", 0), ]
class ATT_Read_Response(Packet):
name = "Read Response"
fields_desc = [ StrField("value", ""), ]
class ATT_Read_By_Group_Type_Request(Packet):
name = "Read By Group Type Request"
fields_desc = [ XLEShortField("start", 0),
XLEShortField("end", 0xffff),
XLEShortField("uuid", 0), ]
class ATT_Read_By_Group_Type_Response(Packet):
name = "Read By Group Type Response"
fields_desc = [ XByteField("length", 0),
StrField("data", ""), ]
class ATT_Write_Request(Packet):
name = "Write Request"
fields_desc = [ XLEShortField("gatt_handle", 0),
StrField("data", ""), ]
class ATT_Write_Command(Packet):
name = "Write Request"
fields_desc = [ XLEShortField("gatt_handle", 0),
StrField("data", ""), ]
class ATT_Write_Response(Packet):
name = "Write Response"
fields_desc = [ ]
class ATT_Handle_Value_Notification(Packet):
name = "Handle Value Notification"
fields_desc = [ XLEShortField("handle", 0),
StrField("value", ""), ]
class SM_Hdr(Packet):
name = "SM header"
fields_desc = [ ByteField("sm_command", None) ]
class SM_Pairing_Request(Packet):
name = "Pairing Request"
fields_desc = [ ByteEnumField("iocap", 3, {0:"DisplayOnly", 1:"DisplayYesNo", 2:"KeyboardOnly", 3:"NoInputNoOutput", 4:"KeyboardDisplay"}),
ByteEnumField("oob", 0, {0:"Not Present", 1:"Present (from remote device)"}),
BitField("authentication", 0, 8),
ByteField("max_key_size", 16),
ByteField("initiator_key_distribution", 0),
ByteField("responder_key_distribution", 0), ]
class SM_Pairing_Response(Packet):
name = "Pairing Response"
fields_desc = [ ByteEnumField("iocap", 3, {0:"DisplayOnly", 1:"DisplayYesNo", 2:"KeyboardOnly", 3:"NoInputNoOutput", 4:"KeyboardDisplay"}),
ByteEnumField("oob", 0, {0:"Not Present", 1:"Present (from remote device)"}),
BitField("authentication", 0, 8),
ByteField("max_key_size", 16),
ByteField("initiator_key_distribution", 0),
ByteField("responder_key_distribution", 0), ]
class SM_Confirm(Packet):
name = "Pairing Confirm"
fields_desc = [ StrFixedLenField("confirm", b'\x00' * 16, 16) ]
class SM_Random(Packet):
name = "Pairing Random"
fields_desc = [ StrFixedLenField("random", b'\x00' * 16, 16) ]
class SM_Failed(Packet):
name = "Pairing Failed"
fields_desc = [ XByteField("reason", 0) ]
class SM_Encryption_Information(Packet):
name = "Encryption Information"
fields_desc = [ StrFixedLenField("ltk", b"\x00" * 16, 16), ]
class SM_Master_Identification(Packet):
name = "Master Identification"
fields_desc = [ XLEShortField("ediv", 0),
StrFixedLenField("rand", b'\x00' * 8, 8), ]
class SM_Identity_Information(Packet):
name = "Identity Information"
fields_desc = [ StrFixedLenField("irk", b'\x00' * 16, 16), ]
class SM_Identity_Address_Information(Packet):
name = "Identity Address Information"
fields_desc = [ ByteEnumField("atype", 0, {0:"public"}),
LEMACField("address", None), ]
class SM_Signing_Information(Packet):
name = "Signing Information"
fields_desc = [ StrFixedLenField("csrk", b'\x00' * 16, 16), ]
class EIR_Hdr(Packet):
name = "EIR Header"
fields_desc = [
LenField("len", None, fmt="B", adjust=lambda x: x+1), # Add bytes mark
ByteEnumField("type", 0, {
0x01: "flags",
0x02: "incomplete_list_16_bit_svc_uuids",
0x03: "complete_list_16_bit_svc_uuids",
0x04: "incomplete_list_32_bit_svc_uuids",
0x05: "complete_list_32_bit_svc_uuids",
0x06: "incomplete_list_128_bit_svc_uuids",
0x07: "complete_list_128_bit_svc_uuids",
0x08: "shortened_local_name",
0x09: "complete_local_name",
0x0a: "tx_power_level",
0x0d: "class_of_device",
0x0e: "simple_pairing_hash",
0x0f: "simple_pairing_rand",
0x10: "sec_mgr_tk",
0x11: "sec_mgr_oob_flags",
0x12: "slave_conn_intvl_range",
0x17: "pub_target_addr",
0x18: "rand_target_addr",
0x19: "appearance",
0x1a: "adv_intvl",
0x1b: "le_addr",
0x1c: "le_role",
0x14: "list_16_bit_svc_sollication_uuids",
0x1f: "list_32_bit_svc_sollication_uuids",
0x15: "list_128_bit_svc_sollication_uuids",
0x16: "svc_data_16_bit_uuid",
0x20: "svc_data_32_bit_uuid",
0x21: "svc_data_128_bit_uuid",
0x22: "sec_conn_confirm",
0x22: "sec_conn_rand",
0x24: "uri",
0xff: "mfg_specific_data",
}),
]
def mysummary(self):
return self.sprintf("EIR %type%")
class EIR_Element(Packet):
name = "EIR Element"
def extract_padding(self, s):
# Needed to end each EIR_Element packet and make PacketListField work.
return '', s
@staticmethod
def length_from(pkt):
if not pkt.underlayer:
warning("Missing an upper-layer")
return 0
# 'type' byte is included in the length, so substract 1:
return pkt.underlayer.len - 1
class EIR_Raw(EIR_Element):
name = "EIR Raw"
fields_desc = [
StrLenField("data", "", length_from=EIR_Element.length_from)
]
class EIR_Flags(EIR_Element):
name = "Flags"
fields_desc = [
FlagsField("flags", 0x2, 8,
["limited_disc_mode", "general_disc_mode",
"br_edr_not_supported", "simul_le_br_edr_ctrl",
"simul_le_br_edr_host"] + 3*["reserved"])
]
class EIR_CompleteList16BitServiceUUIDs(EIR_Element):
name = "Complete list of 16-bit service UUIDs"
fields_desc = [
FieldListField("svc_uuids", None, XLEShortField("uuid", 0),
length_from=EIR_Element.length_from)
]
class EIR_IncompleteList16BitServiceUUIDs(EIR_CompleteList16BitServiceUUIDs):
name = "Incomplete list of 16-bit service UUIDs"
class EIR_CompleteLocalName(EIR_Element):
name = "Complete Local Name"
fields_desc = [
StrLenField("local_name", "", length_from=EIR_Element.length_from)
]
class EIR_ShortenedLocalName(EIR_CompleteLocalName):
name = "Shortened Local Name"
class EIR_TX_Power_Level(EIR_Element):
name = "TX Power Level"
fields_desc = [SignedByteField("level", 0)]
class EIR_Manufacturer_Specific_Data(EIR_Element):
name = "EIR Manufacturer Specific Data"
fields_desc = [
XLEShortField("company_id", 0),
StrLenField("data", "",
length_from=lambda pkt: EIR_Element.length_from(pkt) - 2)
]
class HCI_Command_Hdr(Packet):
name = "HCI Command header"
fields_desc = [ XLEShortField("opcode", 0),
ByteField("len", None), ]
def post_build(self, p, pay):
p += pay
if self.len is None:
p = p[:2] + struct.pack("B", len(pay)) + p[3:]
return p
class HCI_Cmd_Reset(Packet):
name = "Reset"
class HCI_Cmd_Set_Event_Filter(Packet):
name = "Set Event Filter"
fields_desc = [ ByteEnumField("type", 0, {0:"clear"}), ]
class HCI_Cmd_Connect_Accept_Timeout(Packet):
name = "Connection Attempt Timeout"
fields_desc = [ LEShortField("timeout", 32000) ] # 32000 slots is 20000 msec
class HCI_Cmd_LE_Host_Supported(Packet):
name = "LE Host Supported"
fields_desc = [ ByteField("supported", 1),
ByteField("simultaneous", 1), ]
class HCI_Cmd_Set_Event_Mask(Packet):
name = "Set Event Mask"
fields_desc = [ StrFixedLenField("mask", b"\xff\xff\xfb\xff\x07\xf8\xbf\x3d", 8) ]
class HCI_Cmd_Read_BD_Addr(Packet):
name = "Read BD Addr"
class HCI_Cmd_LE_Set_Scan_Parameters(Packet):
name = "LE Set Scan Parameters"
fields_desc = [ ByteEnumField("type", 1, {1:"active"}),
XLEShortField("interval", 16),
XLEShortField("window", 16),
ByteEnumField("atype", 0, {0:"public"}),
ByteEnumField("policy", 0, {0:"all"}), ]
class HCI_Cmd_LE_Set_Scan_Enable(Packet):
name = "LE Set Scan Enable"
fields_desc = [ ByteField("enable", 1),
ByteField("filter_dups", 1), ]
class HCI_Cmd_Disconnect(Packet):
name = "Disconnect"
fields_desc = [ XLEShortField("handle", 0),
ByteField("reason", 0x13), ]
class HCI_Cmd_LE_Create_Connection(Packet):
name = "LE Create Connection"
fields_desc = [ LEShortField("interval", 96),
LEShortField("window", 48),
ByteEnumField("filter", 0, {0:"address"}),
ByteEnumField("patype", 0, {0:"public", 1:"random"}),
LEMACField("paddr", None),
ByteEnumField("atype", 0, {0:"public", 1:"random"}),
LEShortField("min_interval", 40),
LEShortField("max_interval", 56),
LEShortField("latency", 0),
LEShortField("timeout", 42),
LEShortField("min_ce", 0),
LEShortField("max_ce", 0), ]
class HCI_Cmd_LE_Create_Connection_Cancel(Packet):
name = "LE Create Connection Cancel"
class HCI_Cmd_LE_Connection_Update(Packet):
name = "LE Connection Update"
fields_desc = [ XLEShortField("handle", 0),
XLEShortField("min_interval", 0),
XLEShortField("max_interval", 0),
XLEShortField("latency", 0),
XLEShortField("timeout", 0),
LEShortField("min_ce", 0),
LEShortField("max_ce", 0xffff), ]
class HCI_Cmd_LE_Read_Buffer_Size(Packet):
name = "LE Read Buffer Size"
class HCI_Cmd_LE_Set_Random_Address(Packet):
name = "LE Set Random Address"
fields_desc = [ LEMACField("address", None) ]
class HCI_Cmd_LE_Set_Advertising_Parameters(Packet):
name = "LE Set Advertising Parameters"
fields_desc = [ LEShortField("interval_min", 0x0800),
LEShortField("interval_max", 0x0800),
ByteEnumField("adv_type", 0, {0:"ADV_IND", 1:"ADV_DIRECT_IND", 2:"ADV_SCAN_IND", 3:"ADV_NONCONN_IND", 4:"ADV_DIRECT_IND_LOW"}),
ByteEnumField("oatype", 0, {0:"public", 1:"random"}),
ByteEnumField("datype", 0, {0:"public", 1:"random"}),
LEMACField("daddr", None),
ByteField("channel_map", 7),
ByteEnumField("filter_policy", 0, {0:"all:all", 1:"connect:all scan:whitelist", 2:"connect:whitelist scan:all", 3:"all:whitelist"}), ]
class HCI_Cmd_LE_Set_Advertising_Data(Packet):
name = "LE Set Advertising Data"
fields_desc = [ FieldLenField("len", None, length_of="data", fmt="B"),
StrLenField("data", "", length_from=lambda pkt:pkt.len), ]
class HCI_Cmd_LE_Set_Advertise_Enable(Packet):
name = "LE Set Advertise Enable"
fields_desc = [ ByteField("enable", 0) ]
class HCI_Cmd_LE_Start_Encryption_Request(Packet):
name = "LE Start Encryption"
fields_desc = [ LEShortField("handle", 0),
StrFixedLenField("rand", None, 8),
XLEShortField("ediv", 0),
StrFixedLenField("ltk", b'\x00' * 16, 16), ]
class HCI_Cmd_LE_Long_Term_Key_Request_Negative_Reply(Packet):
name = "LE Long Term Key Request Negative Reply"
fields_desc = [ LEShortField("handle", 0), ]
class HCI_Cmd_LE_Long_Term_Key_Request_Reply(Packet):
name = "LE Long Term Key Request Reply"
fields_desc = [ LEShortField("handle", 0),
StrFixedLenField("ltk", b'\x00' * 16, 16), ]
class HCI_Event_Hdr(Packet):
name = "HCI Event header"
fields_desc = [ XByteField("code", 0),
ByteField("length", 0), ]
class HCI_Event_Disconnection_Complete(Packet):
name = "Disconnection Complete"
fields_desc = [ ByteEnumField("status", 0, {0:"success"}),
LEShortField("handle", 0),
XByteField("reason", 0), ]
class HCI_Event_Encryption_Change(Packet):
name = "Encryption Change"
fields_desc = [ ByteEnumField("status", 0, {0:"change has occurred"}),
LEShortField("handle", 0),
ByteEnumField("enabled", 0, {0:"OFF", 1:"ON (LE)", 2:"ON (BR/EDR)"}), ]
class HCI_Event_Command_Complete(Packet):
name = "Command Complete"
fields_desc = [ ByteField("number", 0),
XLEShortField("opcode", 0),
ByteEnumField("status", 0, {0:"success"}), ]
class HCI_Cmd_Complete_Read_BD_Addr(Packet):
name = "Read BD Addr"
fields_desc = [ LEMACField("addr", None), ]
class HCI_Event_Command_Status(Packet):
name = "Command Status"
fields_desc = [ ByteEnumField("status", 0, {0:"pending"}),
ByteField("number", 0),
XLEShortField("opcode", None), ]
class HCI_Event_Number_Of_Completed_Packets(Packet):
name = "Number Of Completed Packets"
fields_desc = [ ByteField("number", 0) ]
class HCI_Event_LE_Meta(Packet):
name = "LE Meta"
fields_desc = [ ByteEnumField("event", 0, {2:"advertising_report"}) ]
class HCI_LE_Meta_Connection_Complete(Packet):
name = "Connection Complete"
fields_desc = [ ByteEnumField("status", 0, {0:"success"}),
LEShortField("handle", 0),
ByteEnumField("role", 0, {0:"master"}),
ByteEnumField("patype", 0, {0:"public", 1:"random"}),
LEMACField("paddr", None),
LEShortField("interval", 54),
LEShortField("latency", 0),
LEShortField("supervision", 42),
XByteField("clock_latency", 5), ]
class HCI_LE_Meta_Connection_Update_Complete(Packet):
name = "Connection Update Complete"
fields_desc = [ ByteEnumField("status", 0, {0:"success"}),
LEShortField("handle", 0),
LEShortField("interval", 54),
LEShortField("latency", 0),
LEShortField("timeout", 42), ]
class HCI_LE_Meta_Advertising_Report(Packet):
name = "Advertising Report"
fields_desc = [ ByteField("number", 0),
ByteEnumField("type", 0, {0:"conn_und", 4:"scan_rsp"}),
ByteEnumField("atype", 0, {0:"public", 1:"random"}),
LEMACField("addr", None),
FieldLenField("len", None, length_of="data", fmt="B"),
PacketListField("data", [], EIR_Hdr,
length_from=lambda pkt:pkt.len),
SignedByteField("rssi", 0)]
class HCI_LE_Meta_Long_Term_Key_Request(Packet):
name = "Long Term Key Request"
fields_desc = [ LEShortField("handle", 0),
StrFixedLenField("rand", None, 8),
XLEShortField("ediv", 0), ]
bind_layers( HCI_Hdr, HCI_Command_Hdr, type=1)
bind_layers( HCI_Hdr, HCI_ACL_Hdr, type=2)
bind_layers( HCI_Hdr, HCI_Event_Hdr, type=4)
bind_layers( HCI_Hdr, conf.raw_layer, )
conf.l2types.register(DLT_BLUETOOTH_HCI_H4, HCI_Hdr)
bind_layers( HCI_Command_Hdr, HCI_Cmd_Reset, opcode=0x0c03)
bind_layers( HCI_Command_Hdr, HCI_Cmd_Set_Event_Mask, opcode=0x0c01)
bind_layers( HCI_Command_Hdr, HCI_Cmd_Set_Event_Filter, opcode=0x0c05)
bind_layers( HCI_Command_Hdr, HCI_Cmd_Connect_Accept_Timeout, opcode=0x0c16)
bind_layers( HCI_Command_Hdr, HCI_Cmd_LE_Host_Supported, opcode=0x0c6d)
bind_layers( HCI_Command_Hdr, HCI_Cmd_Read_BD_Addr, opcode=0x1009)
bind_layers( HCI_Command_Hdr, HCI_Cmd_LE_Read_Buffer_Size, opcode=0x2002)
bind_layers( HCI_Command_Hdr, HCI_Cmd_LE_Set_Random_Address, opcode=0x2005)
bind_layers( HCI_Command_Hdr, HCI_Cmd_LE_Set_Advertising_Parameters, opcode=0x2006)
bind_layers( HCI_Command_Hdr, HCI_Cmd_LE_Set_Advertising_Data, opcode=0x2008)
bind_layers( HCI_Command_Hdr, HCI_Cmd_LE_Set_Advertise_Enable, opcode=0x200a)
bind_layers( HCI_Command_Hdr, HCI_Cmd_LE_Set_Scan_Parameters, opcode=0x200b)
bind_layers( HCI_Command_Hdr, HCI_Cmd_LE_Set_Scan_Enable, opcode=0x200c)
bind_layers( HCI_Command_Hdr, HCI_Cmd_Disconnect, opcode=0x406)
bind_layers( HCI_Command_Hdr, HCI_Cmd_LE_Create_Connection, opcode=0x200d)
bind_layers( HCI_Command_Hdr, HCI_Cmd_LE_Create_Connection_Cancel, opcode=0x200e)
bind_layers( HCI_Command_Hdr, HCI_Cmd_LE_Connection_Update, opcode=0x2013)
bind_layers( HCI_Command_Hdr, HCI_Cmd_LE_Start_Encryption_Request, opcode=0x2019)
bind_layers( HCI_Command_Hdr, HCI_Cmd_LE_Long_Term_Key_Request_Reply, opcode=0x201a)
bind_layers( HCI_Command_Hdr, HCI_Cmd_LE_Long_Term_Key_Request_Negative_Reply, opcode=0x201b)
bind_layers( HCI_Event_Hdr, HCI_Event_Disconnection_Complete, code=0x5)
bind_layers( HCI_Event_Hdr, HCI_Event_Encryption_Change, code=0x8)
bind_layers( HCI_Event_Hdr, HCI_Event_Command_Complete, code=0xe)
bind_layers( HCI_Event_Hdr, HCI_Event_Command_Status, code=0xf)
bind_layers( HCI_Event_Hdr, HCI_Event_Number_Of_Completed_Packets, code=0x13)
bind_layers( HCI_Event_Hdr, HCI_Event_LE_Meta, code=0x3e)
bind_layers( HCI_Event_Command_Complete, HCI_Cmd_Complete_Read_BD_Addr, opcode=0x1009)
bind_layers( HCI_Event_LE_Meta, HCI_LE_Meta_Connection_Complete, event=1)
bind_layers( HCI_Event_LE_Meta, HCI_LE_Meta_Advertising_Report, event=2)
bind_layers( HCI_Event_LE_Meta, HCI_LE_Meta_Connection_Update_Complete, event=3)
bind_layers( HCI_Event_LE_Meta, HCI_LE_Meta_Long_Term_Key_Request, event=5)
bind_layers(EIR_Hdr, EIR_Flags, type=0x01)
bind_layers(EIR_Hdr, EIR_IncompleteList16BitServiceUUIDs, type=0x02)
bind_layers(EIR_Hdr, EIR_CompleteList16BitServiceUUIDs, type=0x03)
bind_layers(EIR_Hdr, EIR_ShortenedLocalName, type=0x08)
bind_layers(EIR_Hdr, EIR_CompleteLocalName, type=0x09)
bind_layers(EIR_Hdr, EIR_TX_Power_Level, type=0x0a)
bind_layers(EIR_Hdr, EIR_Manufacturer_Specific_Data, type=0xff)
bind_layers(EIR_Hdr, EIR_Raw)
bind_layers( HCI_ACL_Hdr, L2CAP_Hdr, )
bind_layers( L2CAP_Hdr, L2CAP_CmdHdr, cid=1)
bind_layers( L2CAP_Hdr, L2CAP_CmdHdr, cid=5) #LE L2CAP Signaling Channel
bind_layers( L2CAP_CmdHdr, L2CAP_CmdRej, code=1)
bind_layers( L2CAP_CmdHdr, L2CAP_ConnReq, code=2)
bind_layers( L2CAP_CmdHdr, L2CAP_ConnResp, code=3)
bind_layers( L2CAP_CmdHdr, L2CAP_ConfReq, code=4)
bind_layers( L2CAP_CmdHdr, L2CAP_ConfResp, code=5)
bind_layers( L2CAP_CmdHdr, L2CAP_DisconnReq, code=6)
bind_layers( L2CAP_CmdHdr, L2CAP_DisconnResp, code=7)
bind_layers( L2CAP_CmdHdr, L2CAP_InfoReq, code=10)
bind_layers( L2CAP_CmdHdr, L2CAP_InfoResp, code=11)
bind_layers( L2CAP_CmdHdr, L2CAP_Connection_Parameter_Update_Request, code=18)
bind_layers( L2CAP_CmdHdr, L2CAP_Connection_Parameter_Update_Response, code=19)
bind_layers( L2CAP_Hdr, ATT_Hdr, cid=4)
bind_layers( ATT_Hdr, ATT_Error_Response, opcode=0x1)
bind_layers( ATT_Hdr, ATT_Exchange_MTU_Request, opcode=0x2)
bind_layers( ATT_Hdr, ATT_Exchange_MTU_Response, opcode=0x3)
bind_layers( ATT_Hdr, ATT_Find_Information_Request, opcode=0x4)
bind_layers( ATT_Hdr, ATT_Find_Information_Response, opcode=0x5)
bind_layers( ATT_Hdr, ATT_Find_By_Type_Value_Request, opcode=0x6)
bind_layers( ATT_Hdr, ATT_Find_By_Type_Value_Response, opcode=0x7)
bind_layers( ATT_Hdr, ATT_Read_By_Type_Request_128bit, opcode=0x8)
bind_layers( ATT_Hdr, ATT_Read_By_Type_Request, opcode=0x8)
bind_layers( ATT_Hdr, ATT_Read_By_Type_Response, opcode=0x9)
bind_layers( ATT_Hdr, ATT_Read_Request, opcode=0xa)
bind_layers( ATT_Hdr, ATT_Read_Response, opcode=0xb)
bind_layers( ATT_Hdr, ATT_Read_By_Group_Type_Request, opcode=0x10)
bind_layers( ATT_Hdr, ATT_Read_By_Group_Type_Response, opcode=0x11)
bind_layers( ATT_Hdr, ATT_Write_Request, opcode=0x12)
bind_layers( ATT_Hdr, ATT_Write_Response, opcode=0x13)
bind_layers( ATT_Hdr, ATT_Write_Command, opcode=0x52)
bind_layers( ATT_Hdr, ATT_Handle_Value_Notification, opcode=0x1b)
bind_layers( L2CAP_Hdr, SM_Hdr, cid=6)
bind_layers( SM_Hdr, SM_Pairing_Request, sm_command=1)
bind_layers( SM_Hdr, SM_Pairing_Response, sm_command=2)
bind_layers( SM_Hdr, SM_Confirm, sm_command=3)
bind_layers( SM_Hdr, SM_Random, sm_command=4)
bind_layers( SM_Hdr, SM_Failed, sm_command=5)
bind_layers( SM_Hdr, SM_Encryption_Information, sm_command=6)
bind_layers( SM_Hdr, SM_Master_Identification, sm_command=7)
bind_layers( SM_Hdr, SM_Identity_Information, sm_command=8)
bind_layers( SM_Hdr, SM_Identity_Address_Information, sm_command=9)
bind_layers( SM_Hdr, SM_Signing_Information, sm_command=0x0a)
class BluetoothSocketError(BaseException):
pass
class BluetoothCommandError(BaseException):
pass
class BluetoothL2CAPSocket(SuperSocket):
desc = "read/write packets on a connected L2CAP socket"
def __init__(self, bt_address):
if WINDOWS:
warning("Not available on Windows")
return
s = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_RAW,
socket.BTPROTO_L2CAP)
s.connect((bt_address,0))
self.ins = self.outs = s
def recv(self, x=MTU):
return L2CAP_CmdHdr(self.ins.recv(x))
class BluetoothRFCommSocket(BluetoothL2CAPSocket):
"""read/write packets on a connected RFCOMM socket"""
def __init__(self, bt_address, port=0):
s = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_RAW,
socket.BTPROTO_RFCOMM)
s.connect((bt_address,port))
self.ins = self.outs = s
class BluetoothHCISocket(SuperSocket):
desc = "read/write on a BlueTooth HCI socket"
def __init__(self, iface=0x10000, type=None):
if WINDOWS:
warning("Not available on Windows")
return
s = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_RAW, socket.BTPROTO_HCI)
s.setsockopt(socket.SOL_HCI, socket.HCI_DATA_DIR,1)
s.setsockopt(socket.SOL_HCI, socket.HCI_TIME_STAMP,1)
s.setsockopt(socket.SOL_HCI, socket.HCI_FILTER, struct.pack("IIIh2x", 0xffffffff,0xffffffff,0xffffffff,0)) #type mask, event mask, event mask, opcode
s.bind((iface,))
self.ins = self.outs = s
# s.connect((peer,0))
def recv(self, x):
return HCI_Hdr(self.ins.recv(x))
class sockaddr_hci(Structure):
_fields_ = [
("sin_family", c_ushort),
("hci_dev", c_ushort),
("hci_channel", c_ushort),
]
class BluetoothUserSocket(SuperSocket):
desc = "read/write H4 over a Bluetooth user channel"
def __init__(self, adapter_index=0):
if WINDOWS:
warning("Not available on Windows")
return
# s = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_RAW, socket.BTPROTO_HCI)
# s.bind((0,1))
# yeah, if only
# thanks to Python's weak ass socket and bind implementations, we have
# to call down into libc with ctypes
sockaddr_hcip = POINTER(sockaddr_hci)
cdll.LoadLibrary("libc.so.6")
libc = CDLL("libc.so.6")
socket_c = libc.socket
socket_c.argtypes = (c_int, c_int, c_int);
socket_c.restype = c_int
bind = libc.bind
bind.argtypes = (c_int, POINTER(sockaddr_hci), c_int)
bind.restype = c_int
########
## actual code
s = socket_c(31, 3, 1) # (AF_BLUETOOTH, SOCK_RAW, HCI_CHANNEL_USER)
if s < 0:
raise BluetoothSocketError("Unable to open PF_BLUETOOTH socket")
sa = sockaddr_hci()
sa.sin_family = 31 # AF_BLUETOOTH
sa.hci_dev = adapter_index # adapter index
sa.hci_channel = 1 # HCI_USER_CHANNEL
r = bind(s, sockaddr_hcip(sa), sizeof(sa))
if r != 0:
raise BluetoothSocketError("Unable to bind")
self.ins = self.outs = socket.fromfd(s, 31, 3, 1)
def send_command(self, cmd):
opcode = cmd.opcode
self.send(cmd)
while True:
r = self.recv()
if r.type == 0x04 and r.code == 0xe and r.opcode == opcode:
if r.status != 0:
raise BluetoothCommandError("Command %x failed with %x" % (opcode, r.status))
return r
def recv(self, x=512):
return HCI_Hdr(self.ins.recv(x))
def readable(self, timeout=0):
(ins, outs, foo) = select([self.ins], [], [], timeout)
return len(ins) > 0
def flush(self):
while self.readable():
self.recv()
conf.BTsocket = BluetoothRFCommSocket
## Bluetooth
@conf.commands.register
def srbt(bt_address, pkts, inter=0.1, *args, **kargs):
"""send and receive using a bluetooth socket"""
if "port" in kargs.keys():
s = conf.BTsocket(bt_address=bt_address, port=kargs.pop("port"))
else:
s = conf.BTsocket(bt_address=bt_address)
a,b = sndrcv(s,pkts,inter=inter,*args,**kargs)
s.close()
return a,b
@conf.commands.register
def srbt1(bt_address, pkts, *args, **kargs):
"""send and receive 1 packet using a bluetooth socket"""
a,b = srbt(bt_address, pkts, *args, **kargs)
if len(a) > 0:
return a[0][1]