blob: 1046ac5db0cb6dc0ac14202e05c6adf8de5a3c0f [file] [log] [blame]
## This file is part of Scapy
## See http://www.secdev.org/projects/scapy for more information
## Copyright (C) Philippe Biondi <phil@secdev.org>
## This program is published under a GPLv2 license
"""
Global variables and functions for handling external data sets.
"""
import os
import re
import sys
import time
from scapy.dadict import DADict
from scapy.consts import DARWIN, FREEBSD, NETBSD, OPENBSD
from scapy.error import log_loading
from scapy.compat import *
############
## Consts ##
############
# Six-byte link-layer addresses: all zeroes and all ones.
ETHER_ANY = 6 * b"\x00"
ETHER_BROADCAST = 6 * b"\xff"

# ETH_P_* protocol numbers, written as 16-bit hexadecimal values.
ETH_P_ALL = 0x0003
ETH_P_IP = 0x0800
ETH_P_ARP = 0x0806
ETH_P_IPV6 = 0x86DD
ETH_P_MACSEC = 0x88E5
# Hardware type identifiers, taken from net/if_arp.h
# (decimal value given in the trailing comment).
ARPHDR_ETHER = 0x0001       # 1
ARPHDR_METRICOM = 0x0017    # 23
ARPHDR_PPP = 0x0200         # 512
ARPHDR_LOOPBACK = 0x0304    # 772
ARPHDR_TUN = 0xFFFE         # 65534
# Link-layer header types, taken from pcap/dlt.h.
# A few of them are numbered differently depending on the platform flags
# imported from scapy.consts (OPENBSD, FREEBSD, NETBSD).
DLT_NULL = 0
DLT_EN10MB = 1
DLT_EN3MB = 2
DLT_AX25 = 3
DLT_PRONET = 4
DLT_CHAOS = 5
DLT_IEEE802 = 6
DLT_ARCNET = 7
DLT_SLIP = 8
DLT_PPP = 9
DLT_FDDI = 10

DLT_RAW = 14 if OPENBSD else 12
DLT_RAW_ALT = 101  # At least in Argus

if FREEBSD or NETBSD:
    DLT_SLIP_BSDOS, DLT_PPP_BSDOS = 13, 14
else:
    DLT_SLIP_BSDOS, DLT_PPP_BSDOS = 15, 16

DLT_PFSYNC = 121 if FREEBSD else 18
if not FREEBSD:
    # NOTE(review): DLT_HHDLC is taken to be defined only in the
    # non-FreeBSD branch, where value 121 is not used by DLT_PFSYNC
    # -- confirm against the original indentation.
    DLT_HHDLC = 121

DLT_ATM_CLIP = 19
DLT_PPP_SERIAL = 50
DLT_PPP_ETHER = 51
DLT_SYMANTEC_FIREWALL = 99
DLT_C_HDLC = 104
DLT_IEEE802_11 = 105

DLT_LOOP, DLT_ENC = (12, 13) if OPENBSD else (108, 109)

DLT_LINUX_SLL = 113
DLT_PFLOG = 117
DLT_PRISM_HEADER = 119
DLT_AIRONET_HEADER = 120
DLT_IEEE802_11_RADIO = 127
DLT_LINUX_IRDA = 144
DLT_BLUETOOTH_HCI_H4 = 187
DLT_PPI = 192
DLT_CAN_SOCKETCAN = 227
DLT_IPV4 = 228
DLT_IPV6 = 229
# IPv6 address classification flags.
# From net/ipv6.h on Linux (+ Additions)

# Cast type: lives in the low nibble selected by IPV6_ADDR_CAST_MASK.
IPV6_ADDR_CAST_MASK = 0x0F
IPV6_ADDR_UNICAST = 0x01
IPV6_ADDR_MULTICAST = 0x02

# Scope: lives in the high nibble selected by IPV6_ADDR_SCOPE_MASK.
IPV6_ADDR_SCOPE_MASK = 0xF0
IPV6_ADDR_GLOBAL = 0x00
IPV6_ADDR_LOOPBACK = 0x10
IPV6_ADDR_LINKLOCAL = 0x20
IPV6_ADDR_SITELOCAL = 0x40  # deprecated since Sept. 2004 by RFC 3879

# Values outside of both masks.
#IPV6_ADDR_COMPATv4 = 0x80 # deprecated; i.e. ::/96
#IPV6_ADDR_MAPPED = 0x1000 # i.e.; ::ffff:0.0.0.0/96
IPV6_ADDR_6TO4 = 0x0100  # Added to have more specific info (should be 0x0101 ?)
IPV6_ADDR_UNSPECIFIED = 0x10000
# EPOCH is computed by asking time.mktime() for 2 January 1970, 00:00 and
# then removing one day (in seconds).  The original note here reads "On
# windows, epoch is 01/02/1970 at 00:00", which is presumably why the
# later date is used as the starting point -- TODO confirm.
EPOCH = time.mktime((1970, 1, 2, 0, 0, 0, 3, 1, 0)) - 24 * 60 * 60

# Largest read size requested by default, a.k.a. "give me all you have".
MTU = 65535

# True when the interpreter reports a platform name starting with "win".
WINDOWS = sys.platform.startswith("win")
# file parsing to get some values :
def load_protocols(filename):
    """Load a name -> protocol number table from an /etc/protocols-like file.

    Each line has its ``#`` comment removed and is stripped, then split on
    runs of spaces/tabs.  The first field is used as the key and the second
    one, parsed as a decimal integer, as the value.  Empty lines and lines
    with fewer than two fields are skipped.

    A line that cannot be parsed is reported through ``log_loading`` (info
    level) and skipped.  If the file cannot be opened or read, this is
    reported the same way and the entries loaded so far are returned.

    :param filename: path of the file to parse (read in binary mode)
    :returns: a DADict named after ``filename``
    """
    spaces = re.compile(b"[ \t]+|\n")
    dct = DADict(_name=filename)
    try:
        # The context manager closes the handle even when reading fails;
        # the original left the file object to the garbage collector.
        with open(filename, "rb") as fdesc:
            for line in fdesc:
                try:
                    shrp = line.find(b"#")
                    if shrp >= 0:
                        line = line[:shrp]
                    line = line.strip()
                    if not line:
                        continue
                    lt = tuple(spaces.split(line))
                    if len(lt) < 2 or not lt[0]:
                        continue
                    dct[lt[0]] = int(lt[1])
                except Exception as e:
                    log_loading.info(
                        "Couldn't parse file [%s]: line [%r] (%s)",
                        filename, line, e)
    except IOError:
        log_loading.info("Can't open %s file", filename)
    return dct
def load_ethertypes(filename):
    """Load a name -> EtherType table from an /etc/ethertypes-like file.

    Each line has its ``#`` comment removed and is stripped, then split on
    runs of spaces/tabs.  The first field is used as the key and the second
    one, parsed as a hexadecimal integer, as the value.  Empty lines and
    lines with fewer than two fields are skipped.

    A line that cannot be parsed is reported through ``log_loading`` (info
    level) and skipped.  If the file cannot be opened or read, nothing is
    logged and the entries loaded so far (possibly none) are returned.

    :param filename: path of the file to parse (read in binary mode)
    :returns: a DADict named after ``filename``
    """
    spaces = re.compile(b"[ \t]+|\n")
    dct = DADict(_name=filename)
    try:
        # "with" guarantees the handle is closed even if reading raises;
        # the original only closed it when the loop ran to completion.
        with open(filename, "rb") as f:
            for line in f:
                try:
                    shrp = line.find(b"#")
                    if shrp >= 0:
                        line = line[:shrp]
                    line = line.strip()
                    if not line:
                        continue
                    lt = tuple(spaces.split(line))
                    if len(lt) < 2 or not lt[0]:
                        continue
                    dct[lt[0]] = int(lt[1], 16)
                except Exception as e:
                    log_loading.info(
                        "Couldn't parse file [%s]: line [%r] (%s)",
                        filename, line, e)
    except IOError:
        # Deliberately silent, as in the original: a missing file simply
        # results in an empty table.
        pass
    return dct
def load_services(filename):
    """Load TCP and UDP service tables from an /etc/services-like file.

    Each line has its ``#`` comment removed and is stripped, then split on
    runs of spaces/tabs.  The first field is the service name, the second
    one is expected to look like ``<port>/tcp`` or ``<port>/udp``; the port
    is stored as an integer in the matching table.  Lines with another
    suffix, empty lines and lines with fewer than two fields are skipped.

    A line that cannot be parsed is reported through ``log_loading``
    (warning level) and skipped.  If the file cannot be opened or read, an
    info message is logged and the entries loaded so far are returned.

    :param filename: path of the file to parse (read in binary mode)
    :returns: a ``(tcp, udp)`` pair of DADict objects, named
        ``"<filename>-tcp"`` and ``"<filename>-udp"``
    """
    spaces = re.compile(b"[ \t]+|\n")
    tdct = DADict(_name="%s-tcp" % filename)
    udct = DADict(_name="%s-udp" % filename)
    try:
        # "with" guarantees the handle is closed even if reading raises;
        # the original only closed it when the loop ran to completion.
        with open(filename, "rb") as f:
            for line in f:
                try:
                    shrp = line.find(b"#")
                    if shrp >= 0:
                        line = line[:shrp]
                    line = line.strip()
                    if not line:
                        continue
                    lt = tuple(spaces.split(line))
                    if len(lt) < 2 or not lt[0]:
                        continue
                    if lt[1].endswith(b"/tcp"):
                        tdct[lt[0]] = int(lt[1].split(b'/')[0])
                    elif lt[1].endswith(b"/udp"):
                        udct[lt[0]] = int(lt[1].split(b'/')[0])
                except Exception as e:
                    log_loading.warning(
                        "Couldn't parse file [%s]: line [%r] (%s)",
                        filename, line, e)
    except IOError:
        # Report the file that was actually requested: this function is
        # also called with a path other than /etc/services.
        log_loading.info("Can't open %s file", filename)
    return tdct, udct
class ManufDA(DADict):
    """DADict holding manufacturer names indexed by OUI.

    Lookups use the first three colon-separated groups of a MAC address,
    upper-cased, as the key.  The methods below index each stored value
    with [0] (short name) and [1] (long name).
    """

    def fixname(self, val):
        """Return ``val`` converted with plain_str()."""
        return plain_str(val)

    def _get_manuf_couple(self, mac):
        """Return the value stored for the OUI of ``mac``.

        Falls back to ``(mac, mac)`` when the OUI is unknown.
        """
        prefix = ":".join(mac.split(":")[:3])
        return self.__dict__.get(prefix.upper(), (mac, mac))

    def _get_manuf(self, mac):
        """Return the long name for ``mac`` (``mac`` itself if unknown)."""
        couple = self._get_manuf_couple(mac)
        return couple[1]

    def _get_short_manuf(self, mac):
        """Return the short name for ``mac`` (``mac`` itself if unknown)."""
        couple = self._get_manuf_couple(mac)
        return couple[0]

    def _resolve_MAC(self, mac):
        """Replace the OUI part of ``mac`` by the short manufacturer name.

        ``mac`` is returned unchanged when its OUI is not in the table.
        """
        groups = mac.split(":")
        oui = ":".join(groups[:3]).upper()
        if oui not in self:
            return mac
        return ":".join([self[oui][0]] + groups[3:])

    def __repr__(self):
        """Return one ``<key short, long>`` line per instance attribute."""
        lines = (
            "<%s %s, %s>" % (key, value[0], value[1])
            for key, value in self.__dict__.items()
        )
        return "\n".join(lines)
def load_manuf(filename):
    """Build a ManufDA table from a Wireshark-style "manuf" file.

    Blank lines and lines starting with ``#`` are ignored.  For any other
    line, the first two whitespace-separated fields are the OUI and the
    short name.  If the line contains a ``#``, the text starting two
    characters after it is used as the long name; otherwise the short name
    is reused.

    A line that cannot be parsed is logged as a warning (with traceback)
    and skipped.  Errors raised while opening the file are not caught here.

    :param filename: path of the file to parse (read in binary mode)
    :returns: a ManufDA named after ``filename``
    """
    manufdb = ManufDA(_name=filename)
    with open(filename, "rb") as fdesc:
        for line in fdesc:
            try:
                line = line.strip()
                if not line or line.startswith(b"#"):
                    continue
                fields = line.split()
                oui, short_name = fields[:2]
                hash_pos = line.find(b"#")
                # Skip the "#" and the character that follows it.
                long_name = short_name if hash_pos < 0 else line[hash_pos + 2:]
                manufdb[oui] = plain_str(short_name), plain_str(long_name)
            except Exception:
                log_loading.warning("Couldn't parse one line from [%s] [%r]",
                                    filename, line, exc_info=True)
    return manufdb
# Load the protocol, EtherType, service and manufacturer tables from the
# locations used on the current platform.
if WINDOWS:
    # Backslashes are doubled: "\s", "\d", "\e" and "\p" are not valid
    # escape sequences and recent Python versions warn about them.  The
    # resulting paths are unchanged.
    ETHER_TYPES = load_ethertypes("ethertypes")
    IP_PROTOS = load_protocols(
        os.environ["SystemRoot"] + "\\system32\\drivers\\etc\\protocol")
    TCP_SERVICES, UDP_SERVICES = load_services(
        os.environ["SystemRoot"] + "\\system32\\drivers\\etc\\services")
    # Default value, will be updated by arch.windows
    try:
        MANUFDB = load_manuf(os.environ["ProgramFiles"] + "\\wireshark\\manuf")
    except IOError:
        MANUFDB = None
else:
    IP_PROTOS = load_protocols("/etc/protocols")
    ETHER_TYPES = load_ethertypes("/etc/ethertypes")
    TCP_SERVICES, UDP_SERVICES = load_services("/etc/services")
    MANUFDB = None
    # Try the candidate installation prefixes in order and keep the first
    # database that loads.
    for prefix in ['/usr', '/usr/local', '/opt', '/opt/wireshark']:
        try:
            MANUFDB = load_manuf(os.path.join(prefix, "share", "wireshark",
                                              "manuf"))
            if MANUFDB:
                break
        except IOError:
            # load_manuf() does not catch open() failures; move on to the
            # next prefix.
            pass
    if not MANUFDB:
        log_loading.warning("Cannot read wireshark manuf database")
#####################
## knowledge bases ##
#####################
class KnowledgeBase:
    """Lazily initialised knowledge base attached to a file name.

    ``base`` is ``None`` until get_base() or reload() triggers
    lazy_init().  The default lazy_init() only sets ``base`` to an empty
    string; subclasses presumably override it to load ``filename``.
    """

    def __init__(self, filename):
        """Remember ``filename``; nothing is loaded yet."""
        self.base = None
        self.filename = filename

    def lazy_init(self):
        """Populate ``base`` (here: with an empty string)."""
        self.base = ""

    def reload(self, filename = None):
        """Run lazy_init() again, optionally switching to ``filename``.

        If lazy_init() leaves ``base`` set to ``None``, the previous base
        is restored.
        """
        if filename is not None:
            self.filename = filename
        previous, self.base = self.base, None
        self.lazy_init()
        if self.base is None:
            self.base = previous

    def get_base(self):
        """Return ``base``, calling lazy_init() first if it is ``None``."""
        if self.base is not None:
            return self.base
        self.lazy_init()
        return self.base