| ## This file is part of Scapy |
| ## See http://www.secdev.org/projects/scapy for more informations |
| ## Copyright (C) Philippe Biondi <phil@secdev.org> |
| ## Copyright (C) Mike Ryan <mikeryan@lacklustre.net> |
| ## This program is published under a GPLv2 license |
| |
| """ |
| Bluetooth layers, sockets and send/receive functions. |
| """ |
| |
| import socket,struct,array |
| from ctypes import * |
| from select import select |
| |
| from scapy.config import conf |
| from scapy.data import DLT_BLUETOOTH_HCI_H4 |
| from scapy.packet import * |
| from scapy.fields import * |
| from scapy.supersocket import SuperSocket |
| from scapy.sendrecv import sndrcv |
| from scapy.data import MTU |
| from scapy.consts import WINDOWS |
| from scapy.error import warning, log_loading |
| |
| |
| ########## |
| # Fields # |
| ########## |
| |
class XLEShortField(LEShortField):
    """LEShortField whose representation is rendered through lhex()."""

    def i2repr(self, pkt, x):
        # Convert to the human form first, then format it as hexadecimal.
        human = self.i2h(pkt, x)
        return lhex(human)
| |
class XLELongField(LELongField):
    """Little-endian unsigned 64-bit ("<Q") field shown in hexadecimal.

    Derives from LELongField, which already supplies the "<Q" format, instead
    of deriving from the 16-bit LEShortField and overriding its format.
    """

    def i2repr(self, pkt, x):
        # Render the human value as hexadecimal.
        return lhex(self.i2h(pkt, x))
| |
| |
class LEMACField(Field):
    """MAC address field whose 6 bytes are reversed on the wire.

    The internal value is the textual MAC produced by str2mac(); the machine
    value is the byte-reversed output of mac2str().
    """

    def __init__(self, name, default):
        Field.__init__(self, name, default, "6s")

    def i2m(self, pkt, x):
        # No address set: emit six zero bytes.
        if x is None:
            return b"\0\0\0\0\0\0"
        return mac2str(x)[::-1]

    def m2i(self, pkt, x):
        return str2mac(x[::-1])

    def any2i(self, pkt, x):
        # A 6-element string/bytes value is treated as the raw wire form and
        # converted; anything else is passed through unchanged.
        # Use == (value equality) rather than "is" (object identity).
        if isinstance(x, (bytes, str)) and len(x) == 6:
            x = self.m2i(pkt, x)
        return x

    def i2repr(self, pkt, x):
        x = self.i2h(pkt, x)
        # Optionally resolve through the manufacturer database when this
        # field is listed in conf.resolve.
        if self in conf.resolve:
            x = conf.manufdb._resolve_MAC(x)
        return x

    def randval(self):
        return RandMAC()
| |
# Display names for the values of the HCI_Hdr "type" byte.
_bluetooth_packet_types = {
0: "Acknowledgement",
1: "Command",
2: "ACL Data",
3: "Synchronous",
4: "Event",
5: "Reserve",
14: "Vendor",
15: "Link Control"
}
| |
class HCI_Hdr(Packet):
    """One-byte HCI header holding the packet type (defaults to 2)."""

    name = "HCI header"
    fields_desc = [
        ByteEnumField("type", 2, _bluetooth_packet_types),
    ]

    def mysummary(self):
        summary = self.sprintf("HCI %type%")
        return summary
| |
class HCI_ACL_Hdr(Packet):
    """HCI ACL header: 12-bit handle, 2-bit PB and BC flags, LE length.

    Handle and flags travel as one combined little-endian short. The fields
    are declared as big-endian BitFields, so pre_dissect() and post_build()
    rearrange the first two bytes between the wire layout and the BitField
    layout.
    """

    name = "HCI ACL header"
    fields_desc = [
        BitField("handle", 0, 12),  # TODO: Create and use LEBitField
        BitField("PB", 0, 2),       # Received as a **combined** LE short:
        BitField("BC", 0, 2),       # handle is 12 bits, each flag is 2 bits.
        LEShortField("len", None),
    ]

    def pre_dissect(self, s):
        # Wire data is a little-endian short laid out as
        #   .... 1111 0100 1100 = handle
        #   1010 .... .... .... = flags
        # and is turned into the BitField layout
        #   1111 0100 1100 .... = handle
        #   .... .... .... 1010 = flags
        # Decode with an explicit "<H" so the result does not depend on the
        # byte order of the host (socket.ntohs is a no-op on big-endian hosts).
        hf = struct.unpack("<H", s[:2])[0]
        r = ((hf & 0x0fff) << 4) + (hf >> 12)
        return struct.pack("!H", r) + s[2:]

    def post_dissect(self, s):
        self.raw_packet_cache = None  # Reset packet to allow post_build
        return s

    def post_build(self, p, pay):
        p += pay
        # Fill in the length from the payload size when it was left unset.
        if self.len is None:
            p = p[:2] + struct.pack("<H", len(pay)) + p[4:]
        # Reverse of pre_dissect: BitField layout back to the wire layout,
        # emitted explicitly as little-endian.
        hf = struct.unpack("!H", p[:2])[0]
        r = ((hf & 0xf) << 12) + (hf >> 4)
        return struct.pack("<H", r) + p[2:]
| |
| |
class L2CAP_Hdr(Packet):
    """L2CAP header: little-endian length followed by the channel id."""

    name = "L2CAP header"
    fields_desc = [
        LEShortField("len", None),
        LEShortEnumField("cid", 0, {1: "control", 4: "attribute"}),
    ]

    def post_build(self, p, pay):
        frame = p + pay
        if self.len is not None:
            return frame
        # Length left unset: overwrite the first two bytes with len(pay).
        return struct.pack("<H", len(pay)) + frame[2:]
| |
| |
| |
class L2CAP_CmdHdr(Packet):
    """L2CAP command header: code, identifier and little-endian length."""

    name = "L2CAP command header"
    fields_desc = [
        ByteEnumField("code", 8, {
            1: "rej", 2: "conn_req", 3: "conn_resp",
            4: "conf_req", 5: "conf_resp", 6: "disconn_req",
            7: "disconn_resp", 8: "echo_req", 9: "echo_resp",
            10: "info_req", 11: "info_resp", 18: "conn_param_update_req",
            19: "conn_param_update_resp",
        }),
        ByteField("id", 0),
        LEShortField("len", None),
    ]

    def post_build(self, p, pay):
        frame = p + pay
        if self.len is not None:
            return frame
        # Length left unset: bytes 2-3 receive the payload size.
        return frame[:2] + struct.pack("<H", len(pay)) + frame[4:]

    def answers(self, other):
        # Identifiers must match before anything else is considered.
        if other.id != self.id:
            return 0
        # Code 1 ("rej") answers any command carrying the same identifier.
        if self.code == 1:
            return 1
        is_request = other.code in [2, 4, 6, 8, 10, 18]
        if is_request and self.code == other.code + 1:
            # Code 8 ("echo_req") is matched without looking at the payload.
            if other.code == 8:
                return 1
            return self.payload.answers(other.payload)
        return 0
| |
class L2CAP_ConnReq(Packet):
    """L2CAP connection request payload: psm and scid (little-endian shorts)."""

    name = "L2CAP Conn Req"
    fields_desc = [
        LEShortEnumField("psm", 0,
                         {1: "SDP", 3: "RFCOMM", 5: "telephony control"}),
        LEShortField("scid", 0),
    ]
| |
class L2CAP_ConnResp(Packet):
    """L2CAP connection response payload: dcid, scid, result and status."""

    name = "L2CAP Conn Resp"
    fields_desc = [
        LEShortField("dcid", 0),
        LEShortField("scid", 0),
        LEShortEnumField("result", 0, ["success", "pend", "cr_bad_psm", "cr_sec_block", "cr_no_mem", "reserved", "cr_inval_scid", "cr_scid_in_use"]),
        LEShortEnumField("status", 0, ["no_info", "authen_pend", "author_pend", "reserved"]),
    ]

    def answers(self, other):
        """Return True when this response matches connection request *other*.

        The response's scid is the field copied from the request's scid; its
        dcid is the responder's own endpoint and does not appear in the
        request, so the match is made on scid.
        """
        return isinstance(other, L2CAP_ConnReq) and self.scid == other.scid
| |
class L2CAP_CmdRej(Packet):
    """L2CAP command reject payload: a single little-endian reason short."""

    name = "L2CAP Command Rej"
    fields_desc = [
        LEShortField("reason", 0),
    ]
| |
| |
class L2CAP_ConfReq(Packet):
    """L2CAP configuration request payload: dcid and flags."""

    name = "L2CAP Conf Req"
    fields_desc = [
        LEShortField("dcid", 0),
        LEShortField("flags", 0),
    ]
| |
class L2CAP_ConfResp(Packet):
    """L2CAP configuration response payload: scid, flags and result."""

    name = "L2CAP Conf Resp"
    fields_desc = [
        LEShortField("scid", 0),
        LEShortField("flags", 0),
        LEShortEnumField("result", 0,
                         ["success", "unaccept", "reject", "unknown"]),
    ]

    def answers(self, other):
        # Matches a configuration request whose dcid equals this scid.
        return isinstance(other, L2CAP_ConfReq) and self.scid == other.dcid
| |
| |
class L2CAP_DisconnReq(Packet):
    """L2CAP disconnection request payload: dcid and scid."""

    name = "L2CAP Disconn Req"
    fields_desc = [
        LEShortField("dcid", 0),
        LEShortField("scid", 0),
    ]
| |
class L2CAP_DisconnResp(Packet):
    """L2CAP disconnection response payload: dcid and scid."""

    name = "L2CAP Disconn Resp"
    fields_desc = [
        LEShortField("dcid", 0),
        LEShortField("scid", 0),
    ]

    def answers(self, other):
        """Return True when *other* is a disconnection request with our scid.

        The isinstance guard mirrors the other L2CAP answers() methods and
        avoids an AttributeError when *other* has no scid field.
        """
        return isinstance(other, L2CAP_DisconnReq) and self.scid == other.scid
| |
| |
| |
class L2CAP_InfoReq(Packet):
    """L2CAP information request payload: type followed by free-form data."""

    name = "L2CAP Info Req"
    fields_desc = [
        LEShortEnumField("type", 0, {1: "CL_MTU", 2: "FEAT_MASK"}),
        StrField("data", ""),
    ]
| |
| |
class L2CAP_InfoResp(Packet):
    """L2CAP information response payload: type, result and data."""

    name = "L2CAP Info Resp"
    fields_desc = [
        LEShortField("type", 0),
        LEShortEnumField("result", 0, ["success", "not_supp"]),
        StrField("data", ""),
    ]

    def answers(self, other):
        """Return True when *other* is an information request of our type.

        The isinstance guard mirrors the other L2CAP answers() methods, so an
        unrelated payload that happens to have a "type" field cannot match.
        """
        return isinstance(other, L2CAP_InfoReq) and self.type == other.type
| |
| |
class L2CAP_Connection_Parameter_Update_Request(Packet):
    """Connection parameter update request: four little-endian shorts."""

    name = "L2CAP Connection Parameter Update Request"
    fields_desc = [
        LEShortField("min_interval", 0),
        LEShortField("max_interval", 0),
        LEShortField("slave_latency", 0),
        LEShortField("timeout_mult", 0),
    ]
| |
| |
class L2CAP_Connection_Parameter_Update_Response(Packet):
    """Connection parameter update response: a single move_result short."""

    name = "L2CAP Connection Parameter Update Response"
    fields_desc = [
        LEShortField("move_result", 0),
    ]
| |
| |
class ATT_Hdr(Packet):
    """ATT header: one opcode byte, displayed in hexadecimal."""

    name = "ATT header"
    fields_desc = [
        XByteField("opcode", None),
    ]
| |
| |
class ATT_Error_Response(Packet):
    """ATT error response: request byte, handle and error code."""

    name = "Error Response"
    fields_desc = [
        XByteField("request", 0),
        LEShortField("handle", 0),
        XByteField("ecode", 0),
    ]
| |
class ATT_Exchange_MTU_Request(Packet):
    """ATT exchange MTU request: a single little-endian mtu short."""

    name = "Exchange MTU Request"
    fields_desc = [
        LEShortField("mtu", 0),
    ]
| |
class ATT_Exchange_MTU_Response(Packet):
    """ATT exchange MTU response: a single little-endian mtu short."""

    name = "Exchange MTU Response"
    fields_desc = [
        LEShortField("mtu", 0),
    ]
| |
class ATT_Find_Information_Request(Packet):
    """ATT find information request: start and end (default 0x0000-0xffff)."""

    name = "Find Information Request"
    fields_desc = [
        XLEShortField("start", 0x0000),
        XLEShortField("end", 0xffff),
    ]
| |
class ATT_Find_Information_Response(Packet):
    """ATT find information response: format byte followed by raw data."""

    # Display name corrected from the misspelt "Find Information Reponse".
    name = "Find Information Response"
    fields_desc = [
        XByteField("format", 1),
        StrField("data", ""),
    ]
| |
class ATT_Find_By_Type_Value_Request(Packet):
    """ATT find by type value request: start, end, uuid and raw data."""

    name = "Find By Type Value Request"
    fields_desc = [
        XLEShortField("start", 0x0001),
        XLEShortField("end", 0xffff),
        XLEShortField("uuid", None),
        StrField("data", ""),
    ]
| |
class ATT_Find_By_Type_Value_Response(Packet):
    """ATT find by type value response: the handles kept as a raw string."""

    name = "Find By Type Value Response"
    fields_desc = [
        StrField("handles", ""),
    ]
| |
class ATT_Read_By_Type_Request_128bit(Packet):
    """ATT read by type request whose UUID is carried as two 64-bit halves."""

    name = "Read By Type Request"
    fields_desc = [
        XLEShortField("start", 0x0001),
        XLEShortField("end", 0xffff),
        XLELongField("uuid1", None),
        XLELongField("uuid2", None),
    ]

    @classmethod
    def dispatch_hook(cls, _pkt=None, *args, **kargs):
        # Only a non-empty 6-byte buffer selects the short-UUID variant;
        # everything else is handled by this class.
        if not _pkt or len(_pkt) != 6:
            return ATT_Read_By_Type_Request_128bit
        return ATT_Read_By_Type_Request
| |
class ATT_Read_By_Type_Request(Packet):
    """ATT read by type request with a 16-bit uuid field."""

    name = "Read By Type Request"
    fields_desc = [
        XLEShortField("start", 0x0001),
        XLEShortField("end", 0xffff),
        XLEShortField("uuid", None),
    ]
| |
class ATT_Read_By_Type_Response(Packet):
    """ATT read by type response: the whole body kept as raw data."""

    name = "Read By Type Response"
    # Length-prefixed alternative, kept for reference:
    # fields_desc = [ FieldLenField("len", None, length_of="data", fmt="B"),
    #                 StrLenField("data", "", length_from=lambda pkt:pkt.len), ]
    fields_desc = [
        StrField("data", ""),
    ]
| |
class ATT_Read_Request(Packet):
    """ATT read request: a single gatt_handle short."""

    name = "Read Request"
    fields_desc = [
        XLEShortField("gatt_handle", 0),
    ]
| |
class ATT_Read_Response(Packet):
    """ATT read response: the value kept as a raw string."""

    name = "Read Response"
    fields_desc = [
        StrField("value", ""),
    ]
| |
class ATT_Read_By_Group_Type_Request(Packet):
    """ATT read by group type request: start, end and uuid shorts."""

    name = "Read By Group Type Request"
    fields_desc = [
        XLEShortField("start", 0),
        XLEShortField("end", 0xffff),
        XLEShortField("uuid", 0),
    ]
| |
class ATT_Read_By_Group_Type_Response(Packet):
    """ATT read by group type response: length byte followed by raw data."""

    name = "Read By Group Type Response"
    fields_desc = [
        XByteField("length", 0),
        StrField("data", ""),
    ]
| |
class ATT_Write_Request(Packet):
    """ATT write request: gatt_handle followed by the raw data to write."""

    name = "Write Request"
    fields_desc = [
        XLEShortField("gatt_handle", 0),
        StrField("data", ""),
    ]
| |
class ATT_Write_Command(Packet):
    """ATT write command: gatt_handle followed by the raw data to write."""

    # Previously labelled "Write Request" (copied from ATT_Write_Request),
    # which made the two layers indistinguishable in show()/summary output.
    name = "Write Command"
    fields_desc = [
        XLEShortField("gatt_handle", 0),
        StrField("data", ""),
    ]
| |
class ATT_Write_Response(Packet):
    """ATT write response: carries no fields."""

    name = "Write Response"
    fields_desc = []
| |
class ATT_Handle_Value_Notification(Packet):
    """ATT handle value notification: handle followed by the raw value."""

    name = "Handle Value Notification"
    fields_desc = [
        XLEShortField("handle", 0),
        StrField("value", ""),
    ]
| |
| |
class SM_Hdr(Packet):
    """SM header: a single sm_command byte."""

    name = "SM header"
    fields_desc = [
        ByteField("sm_command", None),
    ]
| |
| |
class SM_Pairing_Request(Packet):
    """SM pairing request: IO capability, OOB flag, auth and key settings."""

    name = "Pairing Request"
    fields_desc = [
        ByteEnumField("iocap", 3, {
            0: "DisplayOnly",
            1: "DisplayYesNo",
            2: "KeyboardOnly",
            3: "NoInputNoOutput",
            4: "KeyboardDisplay",
        }),
        ByteEnumField("oob", 0, {
            0: "Not Present",
            1: "Present (from remote device)",
        }),
        BitField("authentication", 0, 8),
        ByteField("max_key_size", 16),
        ByteField("initiator_key_distribution", 0),
        ByteField("responder_key_distribution", 0),
    ]
| |
class SM_Pairing_Response(Packet):
    """SM pairing response: same field layout as the pairing request."""

    name = "Pairing Response"
    fields_desc = [
        ByteEnumField("iocap", 3, {
            0: "DisplayOnly",
            1: "DisplayYesNo",
            2: "KeyboardOnly",
            3: "NoInputNoOutput",
            4: "KeyboardDisplay",
        }),
        ByteEnumField("oob", 0, {
            0: "Not Present",
            1: "Present (from remote device)",
        }),
        BitField("authentication", 0, 8),
        ByteField("max_key_size", 16),
        ByteField("initiator_key_distribution", 0),
        ByteField("responder_key_distribution", 0),
    ]
| |
| |
class SM_Confirm(Packet):
    """SM pairing confirm: a fixed 16-byte confirm value."""

    name = "Pairing Confirm"
    fields_desc = [
        StrFixedLenField("confirm", b'\x00' * 16, 16),
    ]
| |
class SM_Random(Packet):
    """SM pairing random: a fixed 16-byte random value."""

    name = "Pairing Random"
    fields_desc = [
        StrFixedLenField("random", b'\x00' * 16, 16),
    ]
| |
class SM_Failed(Packet):
    """SM pairing failed: a single reason byte."""

    name = "Pairing Failed"
    fields_desc = [
        XByteField("reason", 0),
    ]
| |
class SM_Encryption_Information(Packet):
    """SM encryption information: a fixed 16-byte ltk."""

    name = "Encryption Information"
    fields_desc = [
        StrFixedLenField("ltk", b"\x00" * 16, 16),
    ]
| |
class SM_Master_Identification(Packet):
    """SM master identification: ediv short and a fixed 8-byte rand."""

    name = "Master Identification"
    fields_desc = [
        XLEShortField("ediv", 0),
        StrFixedLenField("rand", b'\x00' * 8, 8),
    ]
| |
class SM_Identity_Information(Packet):
    """SM identity information: a fixed 16-byte irk."""

    name = "Identity Information"
    fields_desc = [
        StrFixedLenField("irk", b'\x00' * 16, 16),
    ]
| |
class SM_Identity_Address_Information(Packet):
    """SM identity address information: address type and LE MAC address."""

    name = "Identity Address Information"
    fields_desc = [
        ByteEnumField("atype", 0, {0: "public"}),
        LEMACField("address", None),
    ]
| |
class SM_Signing_Information(Packet):
    """SM signing information: a fixed 16-byte csrk."""

    name = "Signing Information"
    fields_desc = [
        StrFixedLenField("csrk", b'\x00' * 16, 16),
    ]
| |
| |
class EIR_Hdr(Packet):
    """EIR header: a length byte followed by the data-type byte.

    The length is computed from the payload with +1 added so that it also
    counts the type byte.
    """

    name = "EIR Header"
    fields_desc = [
        LenField("len", None, fmt="B", adjust=lambda x: x+1),  # Add bytes mark
        ByteEnumField("type", 0, {
            0x01: "flags",
            0x02: "incomplete_list_16_bit_svc_uuids",
            0x03: "complete_list_16_bit_svc_uuids",
            0x04: "incomplete_list_32_bit_svc_uuids",
            0x05: "complete_list_32_bit_svc_uuids",
            0x06: "incomplete_list_128_bit_svc_uuids",
            0x07: "complete_list_128_bit_svc_uuids",
            0x08: "shortened_local_name",
            0x09: "complete_local_name",
            0x0a: "tx_power_level",
            0x0d: "class_of_device",
            0x0e: "simple_pairing_hash",
            0x0f: "simple_pairing_rand",
            0x10: "sec_mgr_tk",
            0x11: "sec_mgr_oob_flags",
            0x12: "slave_conn_intvl_range",
            0x17: "pub_target_addr",
            0x18: "rand_target_addr",
            0x19: "appearance",
            0x1a: "adv_intvl",
            0x1b: "le_addr",
            0x1c: "le_role",
            0x14: "list_16_bit_svc_sollication_uuids",
            0x1f: "list_32_bit_svc_sollication_uuids",
            0x15: "list_128_bit_svc_sollication_uuids",
            0x16: "svc_data_16_bit_uuid",
            0x20: "svc_data_32_bit_uuid",
            0x21: "svc_data_128_bit_uuid",
            0x22: "sec_conn_confirm",
            # Was a second 0x22 key, which silently replaced
            # "sec_conn_confirm"; the random value is type 0x23.
            0x23: "sec_conn_rand",
            0x24: "uri",
            0xff: "mfg_specific_data",
        }),
    ]

    def mysummary(self):
        return self.sprintf("EIR %type%")
| |
class EIR_Element(Packet):
    """Base class for the element layers that follow an EIR_Hdr."""

    name = "EIR Element"

    def extract_padding(self, s):
        # Needed to end each EIR_Element packet and make PacketListField work:
        # no payload is kept here and the remainder goes back to the caller.
        # The empty payload is bytes, matching the binary data being parsed.
        return b"", s

    @staticmethod
    def length_from(pkt):
        """Return the element's data length taken from the underlayer.

        Returns 0 (after a warning) when *pkt* has no underlayer.
        """
        if not pkt.underlayer:
            warning("Missing an upper-layer")
            return 0
        # 'type' byte is included in the length, so subtract 1:
        return pkt.underlayer.len - 1
| |
class EIR_Raw(EIR_Element):
    """EIR element kept as raw data sized by EIR_Element.length_from."""

    name = "EIR Raw"
    fields_desc = [
        StrLenField("data", "", length_from=EIR_Element.length_from),
    ]
| |
class EIR_Flags(EIR_Element):
    """EIR flags element: one byte of named flag bits (default 0x2)."""

    name = "Flags"
    fields_desc = [
        FlagsField(
            "flags", 0x2, 8,
            ["limited_disc_mode", "general_disc_mode",
             "br_edr_not_supported", "simul_le_br_edr_ctrl",
             "simul_le_br_edr_host"] + 3*["reserved"],
        ),
    ]
| |
class EIR_CompleteList16BitServiceUUIDs(EIR_Element):
    """EIR element holding a list of little-endian 16-bit uuid values."""

    name = "Complete list of 16-bit service UUIDs"
    fields_desc = [
        FieldListField(
            "svc_uuids", None, XLEShortField("uuid", 0),
            length_from=EIR_Element.length_from,
        ),
    ]
| |
class EIR_IncompleteList16BitServiceUUIDs(EIR_CompleteList16BitServiceUUIDs):
    """Same layout as the complete list; only the display name differs."""

    name = "Incomplete list of 16-bit service UUIDs"
| |
class EIR_CompleteLocalName(EIR_Element):
    """EIR element holding local_name, sized by EIR_Element.length_from."""

    name = "Complete Local Name"
    fields_desc = [
        StrLenField("local_name", "", length_from=EIR_Element.length_from),
    ]
| |
class EIR_ShortenedLocalName(EIR_CompleteLocalName):
    """Same layout as the complete local name; only the display name differs."""

    name = "Shortened Local Name"
| |
class EIR_TX_Power_Level(EIR_Element):
    """EIR element holding a signed one-byte level."""

    name = "TX Power Level"
    fields_desc = [
        SignedByteField("level", 0),
    ]
| |
class EIR_Manufacturer_Specific_Data(EIR_Element):
    """EIR element: company_id short followed by the remaining data."""

    name = "EIR Manufacturer Specific Data"
    fields_desc = [
        XLEShortField("company_id", 0),
        # The two company_id bytes are taken off the element length.
        StrLenField(
            "data", "",
            length_from=lambda pkt: EIR_Element.length_from(pkt) - 2,
        ),
    ]
| |
| |
class HCI_Command_Hdr(Packet):
    """HCI command header: little-endian opcode and a one-byte length."""

    name = "HCI Command header"
    fields_desc = [
        XLEShortField("opcode", 0),
        ByteField("len", None),
    ]

    def post_build(self, p, pay):
        frame = p + pay
        if self.len is not None:
            return frame
        # Length left unset: byte 2 receives the payload size.
        return frame[:2] + struct.pack("B", len(pay)) + frame[3:]
| |
class HCI_Cmd_Reset(Packet):
    """HCI reset command; declares no fields of its own."""

    name = "Reset"
| |
class HCI_Cmd_Set_Event_Filter(Packet):
    """HCI set event filter command: a single type byte."""

    name = "Set Event Filter"
    fields_desc = [
        ByteEnumField("type", 0, {0: "clear"}),
    ]
| |
class HCI_Cmd_Connect_Accept_Timeout(Packet):
    """HCI command carrying a single little-endian timeout short."""

    name = "Connection Attempt Timeout"
    fields_desc = [
        LEShortField("timeout", 32000),  # 32000 slots is 20000 msec
    ]
| |
class HCI_Cmd_LE_Host_Supported(Packet):
    """HCI LE host supported command: supported and simultaneous bytes."""

    name = "LE Host Supported"
    fields_desc = [
        ByteField("supported", 1),
        ByteField("simultaneous", 1),
    ]
| |
class HCI_Cmd_Set_Event_Mask(Packet):
    """HCI set event mask command: a fixed 8-byte mask."""

    name = "Set Event Mask"
    fields_desc = [
        StrFixedLenField("mask", b"\xff\xff\xfb\xff\x07\xf8\xbf\x3d", 8),
    ]
| |
class HCI_Cmd_Read_BD_Addr(Packet):
    """HCI read BD address command; declares no fields of its own."""

    name = "Read BD Addr"
| |
| |
class HCI_Cmd_LE_Set_Scan_Parameters(Packet):
    """HCI LE set scan parameters: type, interval, window, atype, policy."""

    name = "LE Set Scan Parameters"
    fields_desc = [
        ByteEnumField("type", 1, {1: "active"}),
        XLEShortField("interval", 16),
        XLEShortField("window", 16),
        ByteEnumField("atype", 0, {0: "public"}),
        ByteEnumField("policy", 0, {0: "all"}),
    ]
| |
class HCI_Cmd_LE_Set_Scan_Enable(Packet):
    """HCI LE set scan enable command: enable and filter_dups bytes."""

    name = "LE Set Scan Enable"
    fields_desc = [
        ByteField("enable", 1),
        ByteField("filter_dups", 1),
    ]
| |
class HCI_Cmd_Disconnect(Packet):
    """HCI disconnect command: handle and reason (default 0x13)."""

    name = "Disconnect"
    fields_desc = [
        XLEShortField("handle", 0),
        ByteField("reason", 0x13),
    ]
| |
class HCI_Cmd_LE_Create_Connection(Packet):
    """HCI LE create connection command: scan, peer and connection settings."""

    name = "LE Create Connection"
    fields_desc = [
        LEShortField("interval", 96),
        LEShortField("window", 48),
        ByteEnumField("filter", 0, {0: "address"}),
        ByteEnumField("patype", 0, {0: "public", 1: "random"}),
        LEMACField("paddr", None),
        ByteEnumField("atype", 0, {0: "public", 1: "random"}),
        LEShortField("min_interval", 40),
        LEShortField("max_interval", 56),
        LEShortField("latency", 0),
        LEShortField("timeout", 42),
        LEShortField("min_ce", 0),
        LEShortField("max_ce", 0),
    ]
| |
class HCI_Cmd_LE_Create_Connection_Cancel(Packet):
    """HCI LE create connection cancel; declares no fields of its own."""

    name = "LE Create Connection Cancel"
| |
class HCI_Cmd_LE_Connection_Update(Packet):
    """HCI LE connection update command: handle plus connection settings."""

    name = "LE Connection Update"
    fields_desc = [
        XLEShortField("handle", 0),
        XLEShortField("min_interval", 0),
        XLEShortField("max_interval", 0),
        XLEShortField("latency", 0),
        XLEShortField("timeout", 0),
        LEShortField("min_ce", 0),
        LEShortField("max_ce", 0xffff),
    ]
| |
class HCI_Cmd_LE_Read_Buffer_Size(Packet):
    """HCI LE read buffer size command; declares no fields of its own."""

    name = "LE Read Buffer Size"
| |
class HCI_Cmd_LE_Set_Random_Address(Packet):
    """HCI LE set random address command: a single LE MAC address."""

    name = "LE Set Random Address"
    fields_desc = [
        LEMACField("address", None),
    ]
| |
class HCI_Cmd_LE_Set_Advertising_Parameters(Packet):
    """HCI LE set advertising parameters command."""

    name = "LE Set Advertising Parameters"
    fields_desc = [
        LEShortField("interval_min", 0x0800),
        LEShortField("interval_max", 0x0800),
        ByteEnumField("adv_type", 0, {
            0: "ADV_IND",
            1: "ADV_DIRECT_IND",
            2: "ADV_SCAN_IND",
            3: "ADV_NONCONN_IND",
            4: "ADV_DIRECT_IND_LOW",
        }),
        ByteEnumField("oatype", 0, {0: "public", 1: "random"}),
        ByteEnumField("datype", 0, {0: "public", 1: "random"}),
        LEMACField("daddr", None),
        ByteField("channel_map", 7),
        ByteEnumField("filter_policy", 0, {
            0: "all:all",
            1: "connect:all scan:whitelist",
            2: "connect:whitelist scan:all",
            3: "all:whitelist",
        }),
    ]
| |
class HCI_Cmd_LE_Set_Advertising_Data(Packet):
    """HCI LE set advertising data command: length byte plus that much data."""

    name = "LE Set Advertising Data"
    fields_desc = [
        FieldLenField("len", None, length_of="data", fmt="B"),
        StrLenField("data", "", length_from=lambda pkt: pkt.len),
    ]
| |
class HCI_Cmd_LE_Set_Advertise_Enable(Packet):
    """HCI LE set advertise enable command: a single enable byte."""

    name = "LE Set Advertise Enable"
    fields_desc = [
        ByteField("enable", 0),
    ]
| |
class HCI_Cmd_LE_Start_Encryption_Request(Packet):
    """HCI LE start encryption command: handle, rand, ediv and ltk."""

    name = "LE Start Encryption"
    fields_desc = [
        LEShortField("handle", 0),
        StrFixedLenField("rand", None, 8),
        XLEShortField("ediv", 0),
        StrFixedLenField("ltk", b'\x00' * 16, 16),
    ]
| |
class HCI_Cmd_LE_Long_Term_Key_Request_Negative_Reply(Packet):
    """HCI LE long term key request negative reply: a single handle short."""

    name = "LE Long Term Key Request Negative Reply"
    fields_desc = [
        LEShortField("handle", 0),
    ]
| |
class HCI_Cmd_LE_Long_Term_Key_Request_Reply(Packet):
    """HCI LE long term key request reply: handle and a fixed 16-byte ltk."""

    name = "LE Long Term Key Request Reply"
    fields_desc = [
        LEShortField("handle", 0),
        StrFixedLenField("ltk", b'\x00' * 16, 16),
    ]
| |
class HCI_Event_Hdr(Packet):
    """HCI event header: event code byte and length byte."""

    name = "HCI Event header"
    fields_desc = [
        XByteField("code", 0),
        ByteField("length", 0),
    ]
| |
| |
class HCI_Event_Disconnection_Complete(Packet):
    """HCI disconnection complete event: status, handle and reason."""

    name = "Disconnection Complete"
    fields_desc = [
        ByteEnumField("status", 0, {0: "success"}),
        LEShortField("handle", 0),
        XByteField("reason", 0),
    ]
| |
| |
class HCI_Event_Encryption_Change(Packet):
    """HCI encryption change event: status, handle and enabled state."""

    name = "Encryption Change"
    fields_desc = [
        ByteEnumField("status", 0, {0: "change has occurred"}),
        LEShortField("handle", 0),
        ByteEnumField("enabled", 0,
                      {0: "OFF", 1: "ON (LE)", 2: "ON (BR/EDR)"}),
    ]
| |
class HCI_Event_Command_Complete(Packet):
    """HCI command complete event: number, opcode and status."""

    name = "Command Complete"
    fields_desc = [
        ByteField("number", 0),
        XLEShortField("opcode", 0),
        ByteEnumField("status", 0, {0: "success"}),
    ]
| |
| |
class HCI_Cmd_Complete_Read_BD_Addr(Packet):
    """Command complete payload for Read BD Addr: a single LE MAC address."""

    name = "Read BD Addr"
    fields_desc = [
        LEMACField("addr", None),
    ]
| |
| |
| |
class HCI_Event_Command_Status(Packet):
    """HCI command status event: status, number and opcode."""

    name = "Command Status"
    fields_desc = [
        ByteEnumField("status", 0, {0: "pending"}),
        ByteField("number", 0),
        XLEShortField("opcode", None),
    ]
| |
class HCI_Event_Number_Of_Completed_Packets(Packet):
    """HCI number of completed packets event: a single number byte."""

    name = "Number Of Completed Packets"
    fields_desc = [
        ByteField("number", 0),
    ]
| |
class HCI_Event_LE_Meta(Packet):
    """HCI LE meta event: a single sub-event byte."""

    name = "LE Meta"
    fields_desc = [
        ByteEnumField("event", 0, {2: "advertising_report"}),
    ]
| |
class HCI_LE_Meta_Connection_Complete(Packet):
    """LE meta connection complete: status, handle, peer and link settings."""

    name = "Connection Complete"
    fields_desc = [
        ByteEnumField("status", 0, {0: "success"}),
        LEShortField("handle", 0),
        ByteEnumField("role", 0, {0: "master"}),
        ByteEnumField("patype", 0, {0: "public", 1: "random"}),
        LEMACField("paddr", None),
        LEShortField("interval", 54),
        LEShortField("latency", 0),
        LEShortField("supervision", 42),
        XByteField("clock_latency", 5),
    ]
| |
class HCI_LE_Meta_Connection_Update_Complete(Packet):
    """LE meta connection update complete: status, handle and link settings."""

    name = "Connection Update Complete"
    fields_desc = [
        ByteEnumField("status", 0, {0: "success"}),
        LEShortField("handle", 0),
        LEShortField("interval", 54),
        LEShortField("latency", 0),
        LEShortField("timeout", 42),
    ]
| |
class HCI_LE_Meta_Advertising_Report(Packet):
    """LE meta advertising report: address, EIR_Hdr list and signed rssi."""

    name = "Advertising Report"
    fields_desc = [
        ByteField("number", 0),
        ByteEnumField("type", 0, {0: "conn_und", 4: "scan_rsp"}),
        ByteEnumField("atype", 0, {0: "public", 1: "random"}),
        LEMACField("addr", None),
        # "len" counts the bytes of "data", which is parsed as EIR_Hdr items.
        FieldLenField("len", None, length_of="data", fmt="B"),
        PacketListField("data", [], EIR_Hdr,
                        length_from=lambda pkt: pkt.len),
        SignedByteField("rssi", 0),
    ]
| |
| |
class HCI_LE_Meta_Long_Term_Key_Request(Packet):
    """LE meta long term key request: handle, 8-byte rand and ediv."""

    name = "Long Term Key Request"
    fields_desc = [
        LEShortField("handle", 0),
        StrFixedLenField("rand", None, 8),
        XLEShortField("ediv", 0),
    ]
| |
| |
# HCI_Hdr "type" selects the next layer; unmatched types fall back to raw.
bind_layers(HCI_Hdr, HCI_Command_Hdr, type=1)
bind_layers(HCI_Hdr, HCI_ACL_Hdr, type=2)
bind_layers(HCI_Hdr, HCI_Event_Hdr, type=4)
bind_layers(HCI_Hdr, conf.raw_layer)

# Dissect DLT_BLUETOOTH_HCI_H4 captures starting at HCI_Hdr.
conf.l2types.register(DLT_BLUETOOTH_HCI_H4, HCI_Hdr)
| |
# HCI commands, selected by the command header opcode.
bind_layers(HCI_Command_Hdr, HCI_Cmd_Reset, opcode=0x0c03)
bind_layers(HCI_Command_Hdr, HCI_Cmd_Set_Event_Mask, opcode=0x0c01)
bind_layers(HCI_Command_Hdr, HCI_Cmd_Set_Event_Filter, opcode=0x0c05)
bind_layers(HCI_Command_Hdr, HCI_Cmd_Connect_Accept_Timeout, opcode=0x0c16)
bind_layers(HCI_Command_Hdr, HCI_Cmd_LE_Host_Supported, opcode=0x0c6d)
bind_layers(HCI_Command_Hdr, HCI_Cmd_Read_BD_Addr, opcode=0x1009)
bind_layers(HCI_Command_Hdr, HCI_Cmd_LE_Read_Buffer_Size, opcode=0x2002)
bind_layers(HCI_Command_Hdr, HCI_Cmd_LE_Set_Random_Address, opcode=0x2005)
bind_layers(HCI_Command_Hdr, HCI_Cmd_LE_Set_Advertising_Parameters,
            opcode=0x2006)
bind_layers(HCI_Command_Hdr, HCI_Cmd_LE_Set_Advertising_Data, opcode=0x2008)
bind_layers(HCI_Command_Hdr, HCI_Cmd_LE_Set_Advertise_Enable, opcode=0x200a)
bind_layers(HCI_Command_Hdr, HCI_Cmd_LE_Set_Scan_Parameters, opcode=0x200b)
bind_layers(HCI_Command_Hdr, HCI_Cmd_LE_Set_Scan_Enable, opcode=0x200c)
bind_layers(HCI_Command_Hdr, HCI_Cmd_Disconnect, opcode=0x406)
bind_layers(HCI_Command_Hdr, HCI_Cmd_LE_Create_Connection, opcode=0x200d)
bind_layers(HCI_Command_Hdr, HCI_Cmd_LE_Create_Connection_Cancel,
            opcode=0x200e)
bind_layers(HCI_Command_Hdr, HCI_Cmd_LE_Connection_Update, opcode=0x2013)

bind_layers(HCI_Command_Hdr, HCI_Cmd_LE_Start_Encryption_Request,
            opcode=0x2019)

bind_layers(HCI_Command_Hdr, HCI_Cmd_LE_Long_Term_Key_Request_Reply,
            opcode=0x201a)
bind_layers(HCI_Command_Hdr, HCI_Cmd_LE_Long_Term_Key_Request_Negative_Reply,
            opcode=0x201b)
| |
# HCI events, selected by the event header code.
bind_layers(HCI_Event_Hdr, HCI_Event_Disconnection_Complete, code=0x5)
bind_layers(HCI_Event_Hdr, HCI_Event_Encryption_Change, code=0x8)
bind_layers(HCI_Event_Hdr, HCI_Event_Command_Complete, code=0xe)
bind_layers(HCI_Event_Hdr, HCI_Event_Command_Status, code=0xf)
bind_layers(HCI_Event_Hdr, HCI_Event_Number_Of_Completed_Packets, code=0x13)
bind_layers(HCI_Event_Hdr, HCI_Event_LE_Meta, code=0x3e)

# Command-specific payload following a Command Complete event.
bind_layers(HCI_Event_Command_Complete, HCI_Cmd_Complete_Read_BD_Addr,
            opcode=0x1009)

# LE meta sub-events, selected by the "event" byte.
bind_layers(HCI_Event_LE_Meta, HCI_LE_Meta_Connection_Complete, event=1)
bind_layers(HCI_Event_LE_Meta, HCI_LE_Meta_Advertising_Report, event=2)
bind_layers(HCI_Event_LE_Meta, HCI_LE_Meta_Connection_Update_Complete,
            event=3)
bind_layers(HCI_Event_LE_Meta, HCI_LE_Meta_Long_Term_Key_Request, event=5)
| |
# EIR elements, selected by the EIR header type; others fall back to EIR_Raw.
bind_layers(EIR_Hdr, EIR_Flags, type=0x01)
bind_layers(EIR_Hdr, EIR_IncompleteList16BitServiceUUIDs, type=0x02)
bind_layers(EIR_Hdr, EIR_CompleteList16BitServiceUUIDs, type=0x03)
bind_layers(EIR_Hdr, EIR_ShortenedLocalName, type=0x08)
bind_layers(EIR_Hdr, EIR_CompleteLocalName, type=0x09)
bind_layers(EIR_Hdr, EIR_TX_Power_Level, type=0x0a)
bind_layers(EIR_Hdr, EIR_Manufacturer_Specific_Data, type=0xff)
bind_layers(EIR_Hdr, EIR_Raw)
| |
# ACL data always carries an L2CAP header.
bind_layers(HCI_ACL_Hdr, L2CAP_Hdr)

# L2CAP signalling commands (cid 1, and cid 5 for LE), selected by code.
bind_layers(L2CAP_Hdr, L2CAP_CmdHdr, cid=1)
bind_layers(L2CAP_Hdr, L2CAP_CmdHdr, cid=5)  # LE L2CAP Signaling Channel
bind_layers(L2CAP_CmdHdr, L2CAP_CmdRej, code=1)
bind_layers(L2CAP_CmdHdr, L2CAP_ConnReq, code=2)
bind_layers(L2CAP_CmdHdr, L2CAP_ConnResp, code=3)
bind_layers(L2CAP_CmdHdr, L2CAP_ConfReq, code=4)
bind_layers(L2CAP_CmdHdr, L2CAP_ConfResp, code=5)
bind_layers(L2CAP_CmdHdr, L2CAP_DisconnReq, code=6)
bind_layers(L2CAP_CmdHdr, L2CAP_DisconnResp, code=7)
bind_layers(L2CAP_CmdHdr, L2CAP_InfoReq, code=10)
bind_layers(L2CAP_CmdHdr, L2CAP_InfoResp, code=11)
bind_layers(L2CAP_CmdHdr, L2CAP_Connection_Parameter_Update_Request, code=18)
bind_layers(L2CAP_CmdHdr, L2CAP_Connection_Parameter_Update_Response, code=19)

# ATT (cid 4), selected by opcode.
bind_layers(L2CAP_Hdr, ATT_Hdr, cid=4)
bind_layers(ATT_Hdr, ATT_Error_Response, opcode=0x1)
bind_layers(ATT_Hdr, ATT_Exchange_MTU_Request, opcode=0x2)
bind_layers(ATT_Hdr, ATT_Exchange_MTU_Response, opcode=0x3)
bind_layers(ATT_Hdr, ATT_Find_Information_Request, opcode=0x4)
bind_layers(ATT_Hdr, ATT_Find_Information_Response, opcode=0x5)
bind_layers(ATT_Hdr, ATT_Find_By_Type_Value_Request, opcode=0x6)
bind_layers(ATT_Hdr, ATT_Find_By_Type_Value_Response, opcode=0x7)
# Both Read By Type variants share opcode 0x8.
bind_layers(ATT_Hdr, ATT_Read_By_Type_Request_128bit, opcode=0x8)
bind_layers(ATT_Hdr, ATT_Read_By_Type_Request, opcode=0x8)
bind_layers(ATT_Hdr, ATT_Read_By_Type_Response, opcode=0x9)
bind_layers(ATT_Hdr, ATT_Read_Request, opcode=0xa)
bind_layers(ATT_Hdr, ATT_Read_Response, opcode=0xb)
bind_layers(ATT_Hdr, ATT_Read_By_Group_Type_Request, opcode=0x10)
bind_layers(ATT_Hdr, ATT_Read_By_Group_Type_Response, opcode=0x11)
bind_layers(ATT_Hdr, ATT_Write_Request, opcode=0x12)
bind_layers(ATT_Hdr, ATT_Write_Response, opcode=0x13)
bind_layers(ATT_Hdr, ATT_Write_Command, opcode=0x52)
bind_layers(ATT_Hdr, ATT_Handle_Value_Notification, opcode=0x1b)

# SM (cid 6), selected by sm_command.
bind_layers(L2CAP_Hdr, SM_Hdr, cid=6)
bind_layers(SM_Hdr, SM_Pairing_Request, sm_command=1)
bind_layers(SM_Hdr, SM_Pairing_Response, sm_command=2)
bind_layers(SM_Hdr, SM_Confirm, sm_command=3)
bind_layers(SM_Hdr, SM_Random, sm_command=4)
bind_layers(SM_Hdr, SM_Failed, sm_command=5)
bind_layers(SM_Hdr, SM_Encryption_Information, sm_command=6)
bind_layers(SM_Hdr, SM_Master_Identification, sm_command=7)
bind_layers(SM_Hdr, SM_Identity_Information, sm_command=8)
bind_layers(SM_Hdr, SM_Identity_Address_Information, sm_command=9)
bind_layers(SM_Hdr, SM_Signing_Information, sm_command=0x0a)
| |
class BluetoothSocketError(Exception):
    """Raised when a Bluetooth socket cannot be opened or bound.

    Derives from Exception rather than BaseException so that ordinary
    ``except Exception`` handlers catch it; BaseException is reserved for
    exit-style signals such as KeyboardInterrupt and SystemExit.
    """
| |
class BluetoothCommandError(Exception):
    """Raised when an HCI command completes with a non-zero status.

    Derives from Exception rather than BaseException so that ordinary
    ``except Exception`` handlers catch it; BaseException is reserved for
    exit-style signals such as KeyboardInterrupt and SystemExit.
    """
| |
class BluetoothL2CAPSocket(SuperSocket):
    """SuperSocket over a connected raw L2CAP Bluetooth socket."""

    desc = "read/write packets on a connected L2CAP socket"

    def __init__(self, bt_address):
        """Connect a raw L2CAP socket to *bt_address*.

        On Windows only a warning is issued and no socket is created.
        """
        if WINDOWS:
            warning("Not available on Windows")
            return
        s = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_RAW,
                          socket.BTPROTO_L2CAP)
        try:
            s.connect((bt_address, 0))
        except Exception:
            # Do not leak the descriptor when the connection fails.
            s.close()
            raise
        self.ins = self.outs = s

    def recv(self, x=MTU):
        """Read up to *x* bytes and dissect them as an L2CAP_CmdHdr."""
        return L2CAP_CmdHdr(self.ins.recv(x))
| |
class BluetoothRFCommSocket(BluetoothL2CAPSocket):
    """read/write packets on a connected RFCOMM socket"""

    def __init__(self, bt_address, port=0):
        """Connect a raw RFCOMM socket to (*bt_address*, *port*)."""
        s = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_RAW,
                          socket.BTPROTO_RFCOMM)
        try:
            s.connect((bt_address, port))
        except Exception:
            # Do not leak the descriptor when the connection fails.
            s.close()
            raise
        self.ins = self.outs = s
| |
class BluetoothHCISocket(SuperSocket):
    """SuperSocket over a raw HCI socket bound to a local interface."""

    desc = "read/write on a BlueTooth HCI socket"

    def __init__(self, iface=0x10000, type=None):
        """Open a raw HCI socket and bind it to *iface*.

        *type* is accepted but not used by this constructor. On Windows only
        a warning is issued and no socket is created.
        """
        if WINDOWS:
            warning("Not available on Windows")
            return
        s = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_RAW,
                          socket.BTPROTO_HCI)
        try:
            s.setsockopt(socket.SOL_HCI, socket.HCI_DATA_DIR, 1)
            s.setsockopt(socket.SOL_HCI, socket.HCI_TIME_STAMP, 1)
            # Filter layout: type mask, event mask, event mask, opcode.
            s.setsockopt(socket.SOL_HCI, socket.HCI_FILTER,
                         struct.pack("IIIh2x",
                                     0xffffffff, 0xffffffff, 0xffffffff, 0))
            s.bind((iface,))
        except Exception:
            # Do not leak the descriptor when configuration or bind fails.
            s.close()
            raise
        self.ins = self.outs = s

    def recv(self, x=MTU):
        """Read up to *x* bytes and dissect them as an HCI_Hdr.

        *x* defaults to MTU so recv() can be called without a size, like the
        other Bluetooth sockets in this module.
        """
        return HCI_Hdr(self.ins.recv(x))
| |
class sockaddr_hci(Structure):
    """ctypes structure with three unsigned-short members, passed to bind()."""

    _fields_ = [
        ("sin_family", c_ushort),
        ("hci_dev", c_ushort),
        ("hci_channel", c_ushort),
    ]
| |
class BluetoothUserSocket(SuperSocket):
    """SuperSocket exchanging H4 packets over an HCI user-channel socket."""

    desc = "read/write H4 over a Bluetooth user channel"

    def __init__(self, adapter_index=0):
        """Open and bind a user-channel socket for adapter *adapter_index*.

        On Windows only a warning is issued and no socket is created.

        Raises:
            BluetoothSocketError: if the socket cannot be opened or bound.
        """
        if WINDOWS:
            warning("Not available on Windows")
            return
        # The socket module is not used to bind here (see the attempt below);
        # socket() and bind() are called in libc through ctypes instead.
        # s = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_RAW, socket.BTPROTO_HCI)
        # s.bind((0,1))

        sockaddr_hcip = POINTER(sockaddr_hci)
        libc = CDLL("libc.so.6")

        socket_c = libc.socket
        socket_c.argtypes = (c_int, c_int, c_int)
        socket_c.restype = c_int

        bind = libc.bind
        bind.argtypes = (c_int, sockaddr_hcip, c_int)
        bind.restype = c_int

        close_c = libc.close
        close_c.argtypes = (c_int,)
        close_c.restype = c_int

        # 31 = AF_BLUETOOTH, 3 = SOCK_RAW, 1 = protocol (BTPROTO_HCI on Linux)
        s = socket_c(31, 3, 1)
        if s < 0:
            raise BluetoothSocketError("Unable to open PF_BLUETOOTH socket")

        try:
            sa = sockaddr_hci()
            sa.sin_family = 31           # AF_BLUETOOTH
            sa.hci_dev = adapter_index   # adapter index
            sa.hci_channel = 1           # HCI_USER_CHANNEL

            r = bind(s, sockaddr_hcip(sa), sizeof(sa))
            if r != 0:
                raise BluetoothSocketError("Unable to bind")

            self.ins = self.outs = socket.fromfd(s, 31, 3, 1)
        finally:
            # socket.fromfd() duplicates the descriptor, so the libc one must
            # be closed in every case (success or failure) to avoid a leak.
            close_c(s)

    def send_command(self, cmd):
        """Send *cmd* and wait for its Command Complete event.

        Packets are read until one has type 0x04, code 0xe and the same
        opcode as *cmd*; that packet is returned.

        Raises:
            BluetoothCommandError: if the matching event has a non-zero status.
        """
        opcode = cmd.opcode
        self.send(cmd)
        while True:
            r = self.recv()
            if r.type == 0x04 and r.code == 0xe and r.opcode == opcode:
                if r.status != 0:
                    raise BluetoothCommandError("Command %x failed with %x" % (opcode, r.status))
                return r

    def recv(self, x=512):
        """Read up to *x* bytes and dissect them as an HCI_Hdr."""
        return HCI_Hdr(self.ins.recv(x))

    def readable(self, timeout=0):
        """Return True when select() reports the socket readable in *timeout*."""
        (ins, _outs, _errs) = select([self.ins], [], [], timeout)
        return len(ins) > 0

    def flush(self):
        """Read and discard packets for as long as the socket is readable."""
        while self.readable():
            self.recv()
| |
# Socket class that srbt() instantiates to talk to a Bluetooth peer.
conf.BTsocket = BluetoothRFCommSocket
| |
| ## Bluetooth |
| |
@conf.commands.register
def srbt(bt_address, pkts, inter=0.1, *args, **kargs):
    """send and receive using a bluetooth socket

    A conf.BTsocket is opened to *bt_address* (with the optional "port"
    keyword, which is consumed here), *pkts* are exchanged through sndrcv()
    and the socket is closed. Remaining arguments go to sndrcv().

    Returns the two values produced by sndrcv().
    """
    if "port" in kargs:
        s = conf.BTsocket(bt_address=bt_address, port=kargs.pop("port"))
    else:
        s = conf.BTsocket(bt_address=bt_address)
    try:
        a, b = sndrcv(s, pkts, inter=inter, *args, **kargs)
    finally:
        # Close the socket even when sndrcv() raises.
        s.close()
    return a, b
| |
@conf.commands.register
def srbt1(bt_address, pkts, *args, **kargs):
    """send and receive 1 packet using a bluetooth socket"""
    answered, _unanswered = srbt(bt_address, pkts, *args, **kargs)
    if len(answered) > 0:
        # First (sent, received) pair: hand back the received packet.
        first_pair = answered[0]
        return first_pair[1]
    return None