# This file is part of Scapy
## See the Scapy project website for more information
## Copyright (C) Philippe Biondi <>
## This program is published under a GPLv2 license
"""Classes and functions for layer 2 protocols."""
from __future__ import absolute_import
from __future__ import print_function
import os, struct, time, socket
import scapy
from scapy.base_classes import Net
from scapy.config import conf
from import *
from scapy.compat import *
from scapy.packet import *
from scapy.ansmachine import *
from scapy.plist import SndRcvList
from scapy.fields import *
from scapy.sendrecv import srp, srp1, srpflood
from scapy.arch import get_if_hwaddr
from scapy.utils import inet_ntoa, inet_aton
from scapy.error import warning
# scapy.route is imported purely for its side effect of initializing
# conf.route; nothing from the module is referenced directly here.
if conf.route is None:
    import scapy.route  # noqa: F401
## Tools ##
class Neighbor:
    """Registry of address resolvers keyed by a pair of layer classes.

    A resolver is a callable taking ``(l2inst, l3inst)``; it is looked up by
    the exact classes of the two instances handed to :meth:`resolve`.
    """

    def __init__(self):
        # Maps (l2 class, l3 class) -> resolver callable.
        self.resolvers = {}

    def register_l3(self, l2, l3, resolve_method):
        """Associate ``resolve_method`` with the (``l2``, ``l3``) class pair.

        A later registration for the same pair replaces the earlier one.
        """
        self.resolvers[l2, l3] = resolve_method

    def resolve(self, l2inst, l3inst):
        """Run the resolver registered for the classes of both instances.

        Returns the resolver's result, or None when no resolver is registered
        for that exact class pair.
        """
        resolver = self.resolvers.get((l2inst.__class__, l3inst.__class__))
        if resolver is None:
            return None
        return resolver(l2inst, l3inst)

    def __repr__(self):
        return "\n".join("%-15s -> %-15s" % (l2.__name__, l3.__name__)
                         for l2, l3 in self.resolvers)
# Shared resolver registry; DestMACField.i2h consults it through
# conf.neighbor.resolve() when no destination MAC was given.
conf.neighbor = Neighbor()
# Cache read and filled by getmacbyip().
conf.netcache.new_cache("arp_cache", 120) # cache entries expire after 120s
def getmacbyip(ip, chainCC=0):
    """Return MAC address corresponding to a given IP address

    ip      -- dotted-quad string, or a Net (its first address is used)
    chainCC -- forwarded to srp1()

    Returns the MAC as a string, or None when the ARP request got no answer.
    Multicast addresses are mapped without any network traffic; loopback
    routes and the interface broadcast address yield the broadcast MAC.
    """
    if isinstance(ip, Net):
        ip = next(iter(ip))
    # Round-trip through the packed form to normalise the textual address.
    ip = inet_ntoa(inet_aton(ip))
    octets = [orb(e) for e in inet_aton(ip)]
    if (octets[0] & 0xf0) == 0xe0:  # mcast @
        return "01:00:5e:%.2x:%.2x:%.2x" % (octets[1] & 0x7f, octets[2], octets[3])

    iff, _, gw = conf.route.route(ip)
    if (iff == scapy.consts.LOOPBACK_INTERFACE or
            ip == conf.route.get_if_bcast(iff)):
        return "ff:ff:ff:ff:ff:ff"
    # A gateway of 0.0.0.0 means the destination is on-link; otherwise the
    # frame has to be addressed to the gateway, so resolve that instead.
    if gw != "0.0.0.0":
        ip = gw

    mac = conf.netcache.arp_cache.get(ip)
    if mac:
        return mac

    res = srp1(Ether(dst=ETHER_BROADCAST) / ARP(op="who-has", pdst=ip),
               type=ETH_P_ARP,
               iface=iff,
               timeout=2,
               verbose=0,
               chainCC=chainCC,
               nofilter=1)
    if res is not None:
        mac = res.payload.hwsrc
        conf.netcache.arp_cache[ip] = mac
        return mac
    return None
### Fields
class DestMACField(MACField):
    """MAC field whose unset value is resolved through conf.neighbor.

    When the value is None, the resolver registered for the packet and its
    payload is asked for the address; if that yields nothing (or fails with a
    socket error) the broadcast address is used and a warning is emitted.
    """

    def __init__(self, name):
        MACField.__init__(self, name, None)

    def i2h(self, pkt, x):
        if x is None:
            try:
                x = conf.neighbor.resolve(pkt, pkt.payload)
            except socket.error:
                # Resolution is best effort: fall through to broadcast.
                pass
            if x is None:
                x = "ff:ff:ff:ff:ff:ff"
                warning("Mac address to reach destination not found. Using broadcast.")
        return MACField.i2h(self, pkt, x)

    def i2m(self, pkt, x):
        # Go through i2h first so an unset value is resolved before packing.
        return MACField.i2m(self, pkt, self.i2h(pkt, x))
class SourceMACField(MACField):
    """MAC field whose unset value is taken from the outgoing interface.

    ``getif`` is a callable receiving the packet and returning an interface;
    by default the interface is the first element of ``pkt.payload.route()``.
    """
    __slots__ = ["getif"]

    def __init__(self, name, getif=None):
        MACField.__init__(self, name, None)
        if getif is None:
            getif = lambda pkt: pkt.payload.route()[0]
        self.getif = getif

    def i2h(self, pkt, x):
        if x is not None:
            return MACField.i2h(self, pkt, x)
        iface = self.getif(pkt)
        if iface is None:
            iface = conf.iface
        hwaddr = get_if_hwaddr(iface) if iface else None
        if hwaddr is None:
            # No usable interface address: fall back to the all-zero MAC.
            hwaddr = "00:00:00:00:00:00"
        return MACField.i2h(self, pkt, hwaddr)

    def i2m(self, pkt, x):
        resolved = self.i2h(pkt, x)
        return MACField.i2m(self, pkt, resolved)
class ARPSourceMACField(SourceMACField):
    """SourceMACField that routes on the packet itself, not on its payload."""

    def __init__(self, name):
        super(ARPSourceMACField, self).__init__(
            name,
            getif=lambda pkt: pkt.route()[0],
        )
### Layers
# Name the 0x88a8 EtherType; the Dot1AD layer is bound to this value below.
ETHER_TYPES['802_AD'] = 0x88a8
class Ether(Packet):
    """Ethernet frame header: destination MAC, source MAC and type."""
    name = "Ethernet"
    fields_desc = [DestMACField("dst"),
                   SourceMACField("src"),
                   XShortEnumField("type", 0x9000, ETHER_TYPES)]
    __slots__ = ["_defrag_pos"]

    def hashret(self):
        return struct.pack("H", self.type) + self.payload.hashret()

    def answers(self, other):
        # Only another Ether frame of the same type can be answered; the
        # final decision is delegated to the payloads.
        if isinstance(other, Ether):
            if self.type == other.type:
                return self.payload.answers(other.payload)
        return 0

    def mysummary(self):
        return self.sprintf("%src% > %dst% (%type%)")

    @classmethod
    def dispatch_hook(cls, _pkt=None, *args, **kargs):
        # Bytes 12-13 follow the two 6-byte addresses. A value <= 1500 is
        # handed to Dot3, whose third field is a length rather than a type.
        if _pkt and len(_pkt) >= 14:
            if struct.unpack("!H", _pkt[12:14])[0] <= 1500:
                return Dot3
        return cls
class Dot3(Packet):
    """802.3 frame header: destination MAC, source MAC and payload length."""
    name = "802.3"
    fields_desc = [DestMACField("dst"),
                   MACField("src", ETHER_ANY),
                   LenField("len", None, "H")]

    def extract_padding(self, s):
        # The first `len` bytes are payload; anything after is padding.
        length = self.len
        return s[:length], s[length:]

    def answers(self, other):
        if isinstance(other, Dot3):
            return self.payload.answers(other.payload)
        return 0

    def mysummary(self):
        return "802.3 %s > %s" % (self.src, self.dst)

    @classmethod
    def dispatch_hook(cls, _pkt=None, *args, **kargs):
        # Mirror of Ether.dispatch_hook: a value above 1500 at bytes 12-13
        # is handed to Ether instead.
        if _pkt and len(_pkt) >= 14:
            if struct.unpack("!H", _pkt[12:14])[0] > 1500:
                return Ether
        return cls
class LLC(Packet):
    """LLC header made of three one-byte fields: dsap, ssap and ctrl."""
    name = "LLC"
    fields_desc = [
        XByteField("dsap", 0x00),
        XByteField("ssap", 0x00),
        ByteField("ctrl", 0),
    ]
def l2_register_l3(l2, l3):
    """Resolve through an intermediate layer.

    Asks conf.neighbor for the resolver matching ``l2`` and the payload of
    ``l3``, and returns whatever that resolver returns.
    """
    encapsulated = l3.payload
    return conf.neighbor.resolve(l2, encapsulated)
# For LLC directly above Ether or Dot3, resolution is delegated to the layer
# LLC carries (l2_register_l3 resolves against l3.payload).
conf.neighbor.register_l3(Ether, LLC, l2_register_l3)
conf.neighbor.register_l3(Dot3, LLC, l2_register_l3)
class CookedLinux(Packet):
    """Linux "cooked" capture header.

    NOTE(review): the field list was truncated in this file. The address
    fields (lladdrtype, lladdrlen, src) and packet type 4 were restored from
    the published LINKTYPE_LINUX_SLL header layout; confirm names and
    defaults against the project history.
    """
    # Documentation: see the tcpdump.org LINKTYPE_LINUX_SLL description
    name = "cooked linux"
    # from wireshark's database
    fields_desc = [ShortEnumField("pkttype", 0, {0: "unicast",
                                                 1: "broadcast",
                                                 2: "multicast",
                                                 3: "unicast-to-another-host",
                                                 4: "sent-by-us"}),
                   XShortField("lladdrtype", 512),
                   ShortField("lladdrlen", 0),
                   StrFixedLenField("src", "", 8),
                   XShortEnumField("proto", 0x800, ETHER_TYPES)]
class SNAP(Packet):
    """SNAP header: a 3-byte OUI followed by a 2-byte code (ETHER_TYPES enum)."""
    name = "SNAP"
    fields_desc = [
        X3BytesField("OUI", 0x000000),
        XShortEnumField("code", 0x000, ETHER_TYPES),
    ]
# For SNAP directly above Dot3, resolve against the layer SNAP carries.
conf.neighbor.register_l3(Dot3, SNAP, l2_register_l3)
class Dot1Q(Packet):
    """802.1Q tag: 3-bit prio, 1-bit id, 12-bit vlan, then a 16-bit type."""
    name = "802.1Q"
    aliastypes = [Ether]
    fields_desc = [BitField("prio", 0, 3),
                   BitField("id", 0, 1),
                   BitField("vlan", 1, 12),
                   XShortEnumField("type", 0x0000, ETHER_TYPES)]

    def answers(self, other):
        if isinstance(other, Dot1Q):
            # Tagged vs tagged: type and VLAN must both match.
            if ((self.type == other.type) and
                    (self.vlan == other.vlan)):
                return self.payload.answers(other.payload)
        else:
            # The other side is untagged: compare our payload with it.
            return self.payload.answers(other)
        return 0

    def default_payload_class(self, pay):
        # A type value <= 1500 is treated as a length, with LLC on top.
        if self.type <= 1500:
            return LLC
        return conf.raw_layer

    def extract_padding(self, s):
        if self.type <= 1500:
            return s[:self.type], s[self.type:]
        return s, None

    def mysummary(self):
        if isinstance(self.underlayer, Ether):
            return self.underlayer.sprintf("802.1q %Ether.src% > %Ether.dst% (%Dot1Q.type%) vlan %Dot1Q.vlan%")
        return self.sprintf("802.1q (%Dot1Q.type%) vlan %Dot1Q.vlan%")
# For a Dot1Q tag directly above Ether, resolve against the tagged payload.
conf.neighbor.register_l3(Ether, Dot1Q, l2_register_l3)
class STP(Packet):
    """Spanning Tree Protocol BPDU; see fields_desc for the field layout."""
    name = "Spanning Tree Protocol"
    fields_desc = [
        ShortField("proto", 0),
        ByteField("version", 0),
        ByteField("bpdutype", 0),
        ByteField("bpduflags", 0),
        ShortField("rootid", 0),
        MACField("rootmac", ETHER_ANY),
        IntField("pathcost", 0),
        ShortField("bridgeid", 0),
        MACField("bridgemac", ETHER_ANY),
        ShortField("portid", 0),
        BCDFloatField("age", 1),
        BCDFloatField("maxage", 20),
        BCDFloatField("hellotime", 2),
        BCDFloatField("fwddelay", 15),
    ]
class ARP(Packet):
    """ARP packet.

    NOTE(review): the sender fields ``hwsrc`` and ``psrc`` were missing from
    the field list here although answers(), mysummary() and getmacbyip() read
    them. They are restored in the sender-before-target order ARP uses;
    confirm the field classes against the project history.
    """
    name = "ARP"
    fields_desc = [XShortField("hwtype", 0x0001),
                   XShortEnumField("ptype", 0x0800, ETHER_TYPES),
                   ByteField("hwlen", 6),
                   ByteField("plen", 4),
                   ShortEnumField("op", 1, {"who-has":1, "is-at":2, "RARP-req":3, "RARP-rep":4, "Dyn-RARP-req":5, "Dyn-RAR-rep":6, "Dyn-RARP-err":7, "InARP-req":8, "InARP-rep":9}),
                   ARPSourceMACField("hwsrc"),
                   SourceIPField("psrc", "pdst"),
                   MACField("hwdst", ETHER_ANY),
                   IPField("pdst", "0.0.0.0")]
    # Numeric values of the two opcodes the methods below compare against.
    who_has = 1
    is_at = 2

    def answers(self, other):
        # An is-at answers a who-has when it speaks for the requested address.
        if isinstance(other, ARP):
            if ((self.op == self.is_at) and
                    (other.op == self.who_has) and
                    (self.psrc == other.pdst)):
                return 1
        return 0

    def route(self):
        dst = self.pdst
        if isinstance(dst, Gen):
            # pdst may be a generator of addresses: route on the first one.
            dst = next(iter(dst))
        return conf.route.route(dst)

    def extract_padding(self, s):
        # ARP carries no payload: every trailing byte is padding.
        return "", s

    def mysummary(self):
        if self.op == self.is_at:
            return self.sprintf("ARP is at %hwsrc% says %psrc%")
        elif self.op == self.who_has:
            return self.sprintf("ARP who has %pdst% says %psrc%")
        return self.sprintf("ARP %op% %psrc% > %pdst%")
def l2_register_l3_arp(l2, l3):
    """Resolver for ARP payloads: look up the MAC of the ARP target address."""
    target_ip = l3.pdst
    return getmacbyip(target_ip)
# For ARP directly above Ether, the destination MAC comes from getmacbyip(pdst).
conf.neighbor.register_l3(Ether, ARP, l2_register_l3_arp)
class GRErouting(Packet):
    """One GRE routing entry; ``SRE_len`` is the length of ``routing_info``."""
    name = "GRE routing informations"
    fields_desc = [ShortField("address_family", 0),
                   ByteField("SRE_offset", 0),
                   FieldLenField("SRE_len", None, "routing_info", "B"),
                   StrLenField("routing_info", "", "SRE_len"),
                   ]
class GRE(Packet):
    """GRE header.

    NOTE(review): only ``chksum_present`` survived in the first 16 bits of
    the field list. The remaining flag fields are restored from the names the
    conditions below test, from the parallel layout in GRE_PPTP, and from
    ``proto`` being read at bytes 2-3 in dispatch_hook; confirm the widths
    against the project history.
    """
    name = "GRE"
    fields_desc = [BitField("chksum_present", 0, 1),
                   BitField("routing_present", 0, 1),
                   BitField("key_present", 0, 1),
                   BitField("seqnum_present", 0, 1),
                   BitField("strict_route_source", 0, 1),
                   BitField("recursion_control", 0, 3),
                   BitField("flags", 0, 5),
                   BitField("version", 0, 3),
                   XShortEnumField("proto", 0x0000, ETHER_TYPES),
                   ConditionalField(XShortField("chksum", None), lambda pkt: pkt.chksum_present == 1 or pkt.routing_present == 1),
                   ConditionalField(XShortField("offset", None), lambda pkt: pkt.chksum_present == 1 or pkt.routing_present == 1),
                   ConditionalField(XIntField("key", None), lambda pkt: pkt.key_present == 1),
                   ConditionalField(XIntField("seqence_number", None), lambda pkt: pkt.seqnum_present == 1),
                   ]

    @classmethod
    def dispatch_hook(cls, _pkt=None, *args, **kargs):
        # Protocol 0x880b selects the PPTP variant of the header.
        if _pkt and struct.unpack("!H", _pkt[2:4])[0] == 0x880b:
            return GRE_PPTP
        return cls

    def post_build(self, p, pay):
        p += pay
        if self.chksum_present and self.chksum is None:
            # The checksum covers header plus payload and is written to
            # bytes 4-5, right after the 2-byte flags and 2-byte proto.
            c = checksum(p)
            p = p[:4] + chb((c >> 8) & 0xff) + chb(c & 0xff) + p[6:]
        return p
class GRE_PPTP(GRE):
    """
    Enhanced GRE header used with PPTP
    RFC 2637
    """

    name = "GRE PPTP"
    fields_desc = [BitField("chksum_present", 0, 1),
                   BitField("routing_present", 0, 1),
                   BitField("key_present", 1, 1),
                   BitField("seqnum_present", 0, 1),
                   BitField("strict_route_source", 0, 1),
                   BitField("recursion_control", 0, 3),
                   BitField("acknum_present", 0, 1),
                   BitField("flags", 0, 4),
                   BitField("version", 1, 3),
                   XShortEnumField("proto", 0x880b, ETHER_TYPES),
                   ShortField("payload_len", None),
                   ShortField("call_id", None),
                   ConditionalField(XIntField("seqence_number", None), lambda pkt: pkt.seqnum_present == 1),
                   ConditionalField(XIntField("ack_number", None), lambda pkt: pkt.acknum_present == 1)]

    def post_build(self, p, pay):
        p += pay
        if self.payload_len is None:
            # payload_len occupies bytes 4-5; fill it with the payload size.
            pay_len = len(pay)
            p = p[:4] + chb((pay_len >> 8) & 0xff) + chb(pay_len & 0xff) + p[6:]
        return p
### *BSD loopback layer
class LoIntEnumField(EnumField):
    """Enum field packed as "!I" whose value sits in the top byte of the word."""

    def __init__(self, name, default, enum):
        EnumField.__init__(self, name, default, enum, "!I")

    def m2i(self, pkt, x):
        # Machine -> internal: drop the three low-order bytes.
        internal = x >> 24
        return internal

    def i2m(self, pkt, x):
        # Internal -> machine: move the value into the top byte.
        machine = x << 24
        return machine
# Value-to-name table for the "type" field of the Loopback layer.
LOOPBACK_TYPES = { 0x2: "IPv4", 0x1c: "IPv6" }
class Loopback(Packet):
    """*BSD loopback layer"""
    name = "Loopback"
    fields_desc = [
        LoIntEnumField("type", 0x2, LOOPBACK_TYPES),
    ]
    __slots__ = ["_defrag_pos"]
class Dot1AD(Dot1Q):
    """Dot1Q subclass with the same fields, published under its own name."""
    name = '802_1AD'
# Layer bindings. Each keyword names a field of the LOWER layer, so the
# Dot1AD bindings use "proto" for CookedLinux and GRE and "code" for SNAP;
# those layers have no "type" field.
bind_layers( Dot3,          LLC,           )
bind_layers( Ether,         LLC,           type=122)
bind_layers( Ether,         LLC,           type=34928)
bind_layers( Ether,         Dot1Q,         type=33024)
bind_layers( Ether,         Dot1AD,        type=0x88a8)
bind_layers( Dot1AD,        Dot1AD,        type=0x88a8)
bind_layers( Dot1AD,        Dot1Q,         type=0x8100)
bind_layers( Dot1Q,         Dot1AD,        type=0x88a8)
bind_layers( Ether,         Ether,         type=1)
bind_layers( Ether,         ARP,           type=2054)
bind_layers( CookedLinux,   LLC,           proto=122)
bind_layers( CookedLinux,   Dot1Q,         proto=33024)
bind_layers( CookedLinux,   Dot1AD,        proto=0x88a8)
bind_layers( CookedLinux,   Ether,         proto=1)
bind_layers( CookedLinux,   ARP,           proto=2054)
bind_layers( GRE,           LLC,           proto=122)
bind_layers( GRE,           Dot1Q,         proto=33024)
bind_layers( GRE,           Dot1AD,        proto=0x88a8)
bind_layers( GRE,           Ether,         proto=0x6558)
bind_layers( GRE,           ARP,           proto=2054)
bind_layers( GRE,           GRErouting,    { "routing_present" : 1 } )
bind_layers( GRErouting,    conf.raw_layer,{ "address_family" : 0, "SRE_len" : 0 })
bind_layers( GRErouting,    GRErouting,    { } )
bind_layers( LLC,           STP,           dsap=66, ssap=66, ctrl=3)
bind_layers( LLC,           SNAP,          dsap=170, ssap=170, ctrl=3)
bind_layers( SNAP,          Dot1Q,         code=33024)
bind_layers( SNAP,          Dot1AD,        code=0x88a8)
bind_layers( SNAP,          Ether,         code=1)
bind_layers( SNAP,          ARP,           code=2054)
bind_layers( SNAP,          STP,           code=267)

# Link-type and layer-3 registrations.
conf.l2types.register(ARPHDR_ETHER, Ether)
conf.l2types.register_num2layer(ARPHDR_METRICOM, Ether)
conf.l2types.register_num2layer(ARPHDR_LOOPBACK, Ether)
conf.l2types.register_layer2num(ARPHDR_ETHER, Dot3)
conf.l2types.register(DLT_LINUX_SLL, CookedLinux)
conf.l2types.register_num2layer(DLT_LINUX_IRDA, CookedLinux)
conf.l2types.register(DLT_LOOP, Loopback)
conf.l2types.register_num2layer(DLT_NULL, Loopback)

conf.l3types.register(ETH_P_ARP, ARP)
### Techniques
def arpcachepoison(target, victim, interval=60):
    """Poison target's cache with (your MAC,victim's IP) couple
arpcachepoison(target, victim, [interval=60]) -> None

    Sends the same ARP who-has frame to ``target`` every ``interval`` seconds
    until interrupted with Ctrl-C.
    """
    tmac = getmacbyip(target)
    p = Ether(dst=tmac) / ARP(op="who-has", psrc=victim, pdst=target)
    try:
        while True:
            sendp(p, iface_hint=target)
            if conf.verb > 1:
                # One dot per frame sent, as a progress indicator.
                os.write(1, b".")
            time.sleep(interval)
    except KeyboardInterrupt:
        # Ctrl-C is the intended way to stop the loop.
        pass
class ARPingResult(SndRcvList):
    """SndRcvList of ARP ping exchanges with a one-line-per-answer show()."""

    def __init__(self, res=None, name="ARPing", stats=None):
        SndRcvList.__init__(self, res, name, stats)

    def show(self):
        """Print the source MAC and ARP source IP of every received packet."""
        for _sent, received in self.res:
            line = received.sprintf("%19s,Ether.src% %ARP.psrc%")
            print(line)
def arping(net, timeout=2, cache=0, verbose=None, **kargs):
    """Send ARP who-has requests to determine which hosts are up
arping(net, [cache=0,] [iface=conf.iface,] [verbose=conf.verb]) -> (ans, unans)
Set cache=True if you want arping to modify internal ARP-Cache"""
    if verbose is None:
        verbose = conf.verb
    ans, unans = srp(Ether(dst="ff:ff:ff:ff:ff:ff") / ARP(pdst=net), verbose=verbose,
                     filter="arp and arp[7] = 2", timeout=timeout, iface_hint=net, **kargs)
    ans = ARPingResult(ans.res)

    if cache and ans is not None:
        for pair in ans:
            # Store the bare MAC string: getmacbyip() returns cache entries
            # as-is and stores its own results in the same form.
            conf.netcache.arp_cache[pair[1].psrc] = pair[1].hwsrc
    if verbose:
        ans.show()
    return ans, unans
def is_promisc(ip, fake_bcast="ff:ff:00:00:00:00",**kargs):
    """Try to guess if target is in Promisc mode. The target is provided by its ip."""
    # Send a who-has to a bogus broadcast MAC; any reply counts as a yes.
    probe = Ether(dst=fake_bcast) / ARP(op="who-has", pdst=ip)
    reply = srp1(probe, type=ETH_P_ARP, iface_hint=ip, timeout=1, verbose=0, **kargs)
    return reply is not None
def promiscping(net, timeout=2, fake_bcast="ff:ff:ff:ff:ff:fe", **kargs):
    """Send ARP who-has requests to determine which hosts are in promiscuous mode
    promiscping(net, iface=conf.iface)"""
    probe = Ether(dst=fake_bcast) / ARP(pdst=net)
    answered, unanswered = srp(probe,
                               filter="arp and arp[7] = 2", timeout=timeout,
                               iface_hint=net, **kargs)
    # Re-wrap the answers so they are shown in the ARPing result format.
    result = ARPingResult(answered.res, name="PROMISCPing")
    return result, unanswered
class ARP_am(AnsweringMachine):
    """Fake ARP Relay Daemon (farpd)

    example:
    To respond to an ARP request for 192.168.100 replying on the
    ingress interface;
      farpd(IP_addr='192.168.1.100',ARP_addr='00:01:02:03:04:05')
    To respond on a different interface add the interface parameter
      farpd(IP_addr='192.168.1.100',ARP_addr='00:01:02:03:04:05',iface='eth0')
    To respond on ANY arp request on an interface with mac address ARP_addr
      farpd(ARP_addr='00:01:02:03:04:05',iface='eth1')
    To respond on ANY arp request with my mac addr on the given interface
      farpd(iface='eth1')

    Optional Args
     inter=<n>   Interval in seconds between ARP replies being sent

    NOTE(review): the example calls, ``function_name`` and parts of
    make_reply() were missing from this file and have been reconstructed;
    confirm against the project history.
    """

    function_name = "farpd"
    filter = "arp"
    send_function = staticmethod(sendp)

    def parse_options(self, IP_addr=None, ARP_addr=None):
        # IP_addr: only answer requests for this address (None = any).
        # ARP_addr: MAC to advertise (None = MAC of the reply interface).
        self.IP_addr = IP_addr
        self.ARP_addr = ARP_addr

    def is_request(self, req):
        return (req.haslayer(ARP) and
                req.getlayer(ARP).op == 1 and
                (self.IP_addr is None or self.IP_addr == req.getlayer(ARP).pdst))

    def make_reply(self, req):
        ether = req.getlayer(Ether)
        arp = req.getlayer(ARP)

        if 'iface' in self.optsend:
            iff = self.optsend.get('iface')
        else:
            # No explicit interface: reply on the one routing to the asker.
            iff, a, gw = conf.route.route(arp.psrc)
        self.iff = iff
        if self.ARP_addr is None:
            try:
                ARP_addr = get_if_hwaddr(iff)
            except Exception:
                # Best effort: advertise the all-zero MAC if lookup fails.
                ARP_addr = "00:00:00:00:00:00"
        else:
            ARP_addr = self.ARP_addr
        # is-at reply: we claim the requested IP; target is the requester.
        resp = Ether(dst=ether.src,
                     src=ARP_addr) / ARP(op="is-at",
                                         hwsrc=ARP_addr,
                                         psrc=arp.pdst,
                                         hwdst=arp.hwsrc,
                                         pdst=arp.psrc)
        return resp

    def send_reply(self, reply):
        if 'iface' in self.optsend:
            # optsend already carries iface; passing it twice would clash.
            self.send_function(reply, **self.optsend)
        else:
            self.send_function(reply, iface=self.iff, **self.optsend)

    def print_reply(self, req, reply):
        print("%s ==> %s on %s" % (req.summary(), reply.summary(), self.iff))
def etherleak(target, **kargs):
    """Exploit Etherleak flaw"""
    def show_padding(s_r):
        # s_r is a (sent, received) pair; hex-dump any padding in the reply.
        reply = s_r[1]
        return conf.padding_layer in reply and hexstr(reply[conf.padding_layer].load)

    probe = Ether() / ARP(pdst=target)
    return srpflood(probe, prn=show_padding, filter="arp", **kargs)