| ## This file is part of Scapy |
| ## See http://www.secdev.org/projects/scapy for more information |
| ## Copyright (C) Philippe Biondi <phil@secdev.org> |
| ## This program is published under a GPLv2 license |
| |
| """ |
| DHCP (Dynamic Host Configuration Protocol) and BOOTP |
| """ |
| |
| from __future__ import absolute_import |
| from __future__ import print_function |
| from collections import Iterable |
| import struct |
| |
| from scapy.packet import * |
| from scapy.fields import * |
| from scapy.ansmachine import * |
| from scapy.data import * |
| from scapy.compat import * |
| from scapy.layers.inet import UDP,IP |
| from scapy.layers.l2 import Ether |
| from scapy.base_classes import Net |
| from scapy.volatile import RandField |
| |
| from scapy.arch import get_if_raw_hwaddr |
| from scapy.sendrecv import * |
| from scapy.error import warning |
| import scapy.modules.six as six |
| from scapy.modules.six.moves import range |
| |
# Magic cookie: BOOTP.options starting with these bytes means DHCP options follow.
dhcpmagic = b"c\x82Sc"
| |
| |
class BOOTP(Packet):
    """BOOTP header.

    The trailing ``options`` field captures every byte that follows the
    fixed header.  When those bytes start with the DHCP magic cookie, only
    the cookie stays in ``options``; the remainder is handed over as payload
    and dissected as ``DHCP``.
    """
    name = "BOOTP"
    fields_desc = [
        ByteEnumField("op", 1, {1: "BOOTREQUEST", 2: "BOOTREPLY"}),
        ByteField("htype", 1),
        ByteField("hlen", 6),
        ByteField("hops", 0),
        IntField("xid", 0),
        ShortField("secs", 0),
        FlagsField("flags", 0, 16, "???????????????B"),
        IPField("ciaddr", "0.0.0.0"),
        IPField("yiaddr", "0.0.0.0"),
        IPField("siaddr", "0.0.0.0"),
        IPField("giaddr", "0.0.0.0"),
        Field("chaddr", b"", "16s"),
        Field("sname", b"", "64s"),
        Field("file", b"", "128s"),
        StrField("options", b""),
    ]

    def guess_payload_class(self, payload):
        """Return DHCP when ``options`` starts with the magic cookie.

        Otherwise defer to the default ``Packet`` lookup.
        """
        if self.options[:len(dhcpmagic)] == dhcpmagic:
            return DHCP
        return Packet.guess_payload_class(self, payload)

    def extract_padding(self, s):
        """Split the bytes captured by ``options`` into cookie and payload.

        Returns a ``(payload, padding)`` pair.  With a magic cookie present,
        ``options`` is truncated to the cookie and the bytes after it become
        the payload; without one there is no payload.  ``padding`` is always
        ``None``.
        """
        if self.options[:len(dhcpmagic)] == dhcpmagic:
            # Keep only the cookie in BOOTP; the rest are the DHCP options.
            payload = self.options[len(dhcpmagic):]
            self.options = self.options[:len(dhcpmagic)]
            return payload, None
        return b"", None

    def hashret(self):
        """Return the packed transaction id.

        Packed as a fixed 4-byte, network-order unsigned int so the result
        does not depend on the platform's native ``long`` size or byte
        order (native ``"L"`` is 8 bytes on LP64 systems).
        """
        return struct.pack("!I", self.xid)

    def answers(self, other):
        """Tell whether this packet answers ``other`` (same ``xid``).

        Returns 0 when ``other`` is not a BOOTP packet.
        """
        if not isinstance(other, BOOTP):
            return 0
        return self.xid == other.xid
| |
| |
class _DHCPParamReqFieldListField(FieldListField):
    """FieldListField variant that consumes the whole buffer.

    The parent ``getfield`` is called repeatedly until no bytes are left;
    the first element of each parsed chunk is collected into a flat list.
    """
    def getfield(self, pkt, s):
        """Return ``(b"", values)`` after consuming all of ``s``."""
        chunks = []
        remaining = s
        while remaining:
            remaining, parsed = FieldListField.getfield(self, pkt, remaining)
            chunks.append(parsed)
        # Flatten only once everything has been read.
        return b"", [chunk[0] for chunk in chunks]
| |
| #DHCP_UNKNOWN, DHCP_IP, DHCP_IPLIST, DHCP_TYPE \ |
| #= range(4) |
| # |
| |
# Names of the DHCP message types, keyed by numeric value
# (used as the enum of the "message-type" option).
DHCPTypes = {
    1: "discover",
    2: "offer",
    3: "request",
    4: "decline",
    5: "ack",
    6: "nak",
    7: "release",
    8: "inform",
    9: "force_renew",
    10: "lease_query",
    11: "lease_unassigned",
    12: "lease_unknown",
    13: "lease_active",
}
| |
# DHCP options keyed by option code.  A plain string entry only names the
# option (its value is kept as raw bytes); a Field entry both names the
# option and describes how its value is dissected and built.
DHCPOptions = {
    0: "pad",
    1: IPField("subnet_mask", "0.0.0.0"),
    2: "time_zone",
    3: IPField("router", "0.0.0.0"),
    4: IPField("time_server", "0.0.0.0"),
    5: IPField("IEN_name_server", "0.0.0.0"),
    6: IPField("name_server", "0.0.0.0"),
    7: IPField("log_server", "0.0.0.0"),
    8: IPField("cookie_server", "0.0.0.0"),
    9: IPField("lpr_server", "0.0.0.0"),
    12: "hostname",
    14: "dump_path",
    15: "domain",
    17: "root_disk_path",
    22: "max_dgram_reass_size",
    23: "default_ttl",
    24: "pmtu_timeout",
    28: IPField("broadcast_address", "0.0.0.0"),
    35: "arp_cache_timeout",
    36: "ether_or_dot3",
    37: "tcp_ttl",
    38: "tcp_keepalive_interval",
    39: "tcp_keepalive_garbage",
    40: "NIS_domain",
    41: IPField("NIS_server", "0.0.0.0"),
    42: IPField("NTP_server", "0.0.0.0"),
    43: "vendor_specific",
    44: IPField("NetBIOS_server", "0.0.0.0"),
    45: IPField("NetBIOS_dist_server", "0.0.0.0"),
    50: IPField("requested_addr", "0.0.0.0"),
    51: IntField("lease_time", 43200),
    53: ByteEnumField("message-type", 1, DHCPTypes),
    54: IPField("server_id", "0.0.0.0"),
    55: _DHCPParamReqFieldListField(
        "param_req_list", [], ByteField("opcode", 0),
        length_from=lambda x: 1),
    56: "error_message",
    57: ShortField("max_dhcp_size", 1500),
    58: IntField("renewal_time", 21600),
    59: IntField("rebinding_time", 37800),
    60: "vendor_class_id",
    61: "client_id",

    64: "NISplus_domain",
    65: IPField("NISplus_server", "0.0.0.0"),
    69: IPField("SMTP_server", "0.0.0.0"),
    70: IPField("POP3_server", "0.0.0.0"),
    71: IPField("NNTP_server", "0.0.0.0"),
    72: IPField("WWW_server", "0.0.0.0"),
    73: IPField("Finger_server", "0.0.0.0"),
    74: IPField("IRC_server", "0.0.0.0"),
    75: IPField("StreetTalk_server", "0.0.0.0"),
    76: "StreetTalk_Dir_Assistance",
    82: "relay_agent_Information",
    255: "end",
}
| |
# Reverse lookup: option name -> (option code, Field or None).
# The second item is None for options described only by a name.
DHCPRevOptions = {}

for k, v in six.iteritems(DHCPOptions):
    if not isinstance(v, str):
        n = v.name
    else:
        n, v = v, None
    DHCPRevOptions[n] = (k, v)
# Do not leave the loop variables behind in the module namespace.
del n, v, k
| |
| |
| |
| |
class RandDHCPOptions(RandField):
    """Volatile value producing a random list of DHCP options.

    size:   number of options to generate (defaults to RandNumExpo(0.05)).
    rndstr: value used for options described only by a name
            (defaults to RandBin(RandNum(0,255))).
    """
    def __init__(self, size=None, rndstr=None):
        self.size = RandNumExpo(0.05) if size is None else size
        self.rndstr = RandBin(RandNum(0, 255)) if rndstr is None else rndstr
        # "pad" and "end" are never picked as random options.
        self._opts = list(DHCPOptions.values())
        for excluded in ("pad", "end"):
            self._opts.remove(excluded)

    def _fix(self):
        """Return a list of ``(name, value)`` tuples, one per generated option."""
        generated = []
        for _ in range(self.size):
            choice = random.choice(self._opts)
            if not isinstance(choice, str):
                # Field-described option: use the field's own random value.
                generated.append((choice.name, choice.randval()._fix()))
            else:
                generated.append((choice, self.rndstr * 1))
        return generated
| |
| |
class DHCPOptionsField(StrField):
    """Field holding a list of DHCP options.

    The internal value is a list whose items are one of:
      * ``(name, value, ...)`` for an option listed in ``DHCPOptions``;
      * ``(code, raw_value)`` for an option code that is not listed;
      * the bare strings ``"pad"`` and ``"end"``;
      * the remaining raw bytes when an option could not be parsed.
    """
    islist = 1

    def i2repr(self, pkt, x):
        """Return a ``[name=value ...]`` representation of the option list."""
        s = []
        for v in x:
            if isinstance(v, tuple) and len(v) >= 2:
                if (v[0] in DHCPRevOptions and
                        isinstance(DHCPRevOptions[v[0]][1], Field)):
                    f = DHCPRevOptions[v[0]][1]
                    vv = ",".join(f.i2repr(pkt, val) for val in v[1:])
                else:
                    vv = ",".join(repr(val) for val in v[1:])
                s.append("%s=%s" % (v[0], vv))
            else:
                s.append(sane(v))
        return "[%s]" % (" ".join(s))

    def getfield(self, pkt, s):
        """Consume all of ``s`` and return ``(b"", parsed_options)``."""
        return b"", self.m2i(pkt, s)

    def m2i(self, pkt, x):
        """Parse the wire bytes ``x`` into a list of options.

        Each option is read as ``code, length, value``; codes 0 ("pad") and
        255 ("end") are single bytes.  Parsing stops at the first option
        that is truncated or whose value cannot be dissected, and the
        unparsed remainder is appended as raw bytes.
        """
        opt = []
        while x:
            o = orb(x[0])
            if o == 255:
                opt.append("end")
                x = x[1:]
                continue
            if o == 0:
                opt.append("pad")
                x = x[1:]
                continue
            if len(x) < 2 or len(x) < orb(x[1]) + 2:
                # Truncated option: keep the leftover bytes untouched.
                opt.append(x)
                break

            olen = orb(x[1])
            value = x[2:olen + 2]
            f = DHCPOptions.get(o)
            if f is None:
                # Unlisted option code: keep the number and the raw value.
                opt.append((o, value))
            elif isinstance(f, str):
                opt.append((f, value))
            else:
                lval = [f.name]
                try:
                    left = value
                    while left:
                        left, val = f.getfield(pkt, left)
                        lval.append(val)
                except Exception:
                    # Value does not fit the field: keep the rest as raw
                    # bytes.  KeyboardInterrupt/SystemExit still propagate.
                    opt.append(x)
                    break
                opt.append(tuple(lval))
            x = x[olen + 2:]
        return opt

    def i2m(self, pkt, x):
        """Build the wire bytes for the option list ``x``.

        A whole value given as a string/bytes is returned as bytes without
        interpretation.  Otherwise each item is encoded as follows:
          * ``(name_or_code, value, ...)`` -> code, length, joined values;
            values of field-described options are built by the field, other
            values are used as raw bytes (text is converted with ``raw``);
          * a known option name not described by a field -> its code byte;
          * an int -> that code followed by a zero length byte;
          * any other string/bytes -> emitted as-is.
        Unknown option names and malformed items are skipped with a warning.
        """
        if isinstance(x, (str, bytes)):
            return raw(x)
        parts = []
        for o in x:
            if isinstance(o, tuple) and len(o) >= 2:
                name = o[0]
                lval = o[1:]

                if isinstance(name, int):
                    onum, f = name, None
                elif name in DHCPRevOptions:
                    onum, f = DHCPRevOptions[name]
                else:
                    warning("Unknown field option %s", name)
                    continue

                if f is not None:
                    lval = [f.addfield(pkt, b"", f.any2i(pkt, val))
                            for val in lval]
                else:
                    # Text values cannot be joined with bytes on Python 3.
                    lval = [raw(val) if isinstance(val, str) else val
                            for val in lval]
                oval = b"".join(lval)

                parts.append(chb(onum))
                parts.append(chb(len(oval)))
                parts.append(oval)

            elif (isinstance(o, str) and o in DHCPRevOptions and
                  DHCPRevOptions[o][1] is None):
                parts.append(chb(DHCPRevOptions[o][0]))
            elif isinstance(o, int):
                parts.append(chb(o) + b"\0")
            elif isinstance(o, (str, bytes)):
                parts.append(raw(o))
            else:
                warning("Malformed option %s", o)
        return b"".join(parts)
| |
| |
class DHCP(Packet):
    """Layer made of a single field: the list of DHCP options."""
    name = "DHCP options"
    fields_desc = [
        DHCPOptionsField("options", b""),
    ]
| |
| |
# BOOTP rides on UDP between ports 67 and 68 (either direction); traffic from
# port 67 to port 67 is only recognised when dissecting.
bind_layers(UDP, BOOTP, dport=67, sport=68)
bind_layers(UDP, BOOTP, dport=68, sport=67)
bind_bottom_up(UDP, BOOTP, dport=67, sport=67)
# DHCP follows BOOTP when the BOOTP options are exactly the magic cookie.
bind_layers(BOOTP, DHCP, options=b'c\x82Sc')
| |
@conf.commands.register
def dhcp_request(iface=None, **kargs):
    """Broadcast a DHCP discover and return the result of srp1().

    iface: interface to use; conf.iface when None.
    kargs: extra keyword arguments forwarded to srp1().
    """
    if conf.checkIPaddr != 0:
        warning("conf.checkIPaddr is not 0, I may not be able to match the answer")
    if iface is None:
        iface = conf.iface
    # Only the hardware address is needed; the address family is ignored.
    _family, hw = get_if_raw_hwaddr(iface)
    discover = (
        Ether(dst="ff:ff:ff:ff:ff:ff") /
        IP(src="0.0.0.0", dst="255.255.255.255") /
        UDP(sport=68, dport=67) /
        BOOTP(chaddr=hw) /
        DHCP(options=[("message-type", "discover"), "end"])
    )
    return srp1(discover, iface=iface, **kargs)
| |
| |
class BOOTP_am(AnsweringMachine):
    """Answering machine replying to BOOTP requests with an address lease."""
    function_name = "bootpd"
    filter = "udp and port 68 and port 67"
    send_function = staticmethod(sendp)

    def parse_options(self, pool=Net("192.168.1.128/25"), network="192.168.1.0/24", gw="192.168.1.1",
                      domain="localnet", renewal_time=60, lease_time=1800):
        """Store the server configuration.

        pool:         addresses to hand out (a Net, a string turned into a
                      Net, or another iterable of addresses)
        network:      served network as "address/prefixlen" (prefix length
                      defaults to 32 when omitted)
        gw:           gateway address
        domain:       domain name
        renewal_time: stored as self.renewal_time
        lease_time:   stored as self.lease_time
        """
        self.domain = domain
        net_addr, prefix_len = (network.split("/") + ["32"])[:2]
        mask = itom(int(prefix_len))
        self.netmask = ltoa(mask)
        self.network = ltoa(atol(net_addr) & mask)
        self.broadcast = ltoa(atol(self.network) | (0xffffffff & ~mask))
        self.gw = gw
        if isinstance(pool, six.string_types):
            pool = Net(pool)
        if isinstance(pool, Iterable):
            # Never lease the gateway, network or broadcast address.
            reserved = [gw, self.network, self.broadcast]
            pool = [addr for addr in pool if addr not in reserved]
            pool.reverse()
        if len(pool) == 1:
            # A single address is stored bare rather than as a list.
            pool = pool[0]
        self.pool = pool
        self.lease_time = lease_time
        self.renewal_time = renewal_time
        self.leases = {}

    def is_request(self, req):
        """Return 1 when ``req`` carries a BOOTP layer with op == 1, else 0."""
        if not req.haslayer(BOOTP):
            return 0
        return 1 if req.getlayer(BOOTP).op == 1 else 0

    def print_reply(self, req, reply):
        """Print the IP and link-layer destinations of ``reply``."""
        print("Reply %s to %s" % (reply.getlayer(IP).dst, reply.dst))

    def make_reply(self, req):
        """Build the BOOTP reply to ``req``.

        With a list pool, the requesting MAC keeps the address it was first
        given (tracked in self.leases); otherwise the single pool address is
        always offered.
        """
        mac = req.src
        if not isinstance(self.pool, list):
            ip = self.pool
        else:
            if mac not in self.leases:
                self.leases[mac] = self.pool.pop()
            ip = self.leases[mac]

        # Start from the request's BOOTP layer and turn it into a reply.
        repb = req.getlayer(BOOTP).copy()
        repb.op = "BOOTREPLY"
        repb.yiaddr = ip
        repb.siaddr = self.gw
        repb.ciaddr = self.gw
        repb.giaddr = self.gw
        del(repb.payload)
        transport = UDP(sport=req.dport, dport=req.sport)
        return Ether(dst=mac) / IP(dst=ip) / transport / repb
| |
| |
class DHCP_am(BOOTP_am):
    """BOOTP answering machine that also appends DHCP options to replies."""
    function_name = "dhcpd"

    def make_reply(self, req):
        """Return the BOOTP reply, with DHCP options when ``req`` has DHCP."""
        resp = BOOTP_am.make_reply(self, req)
        if DHCP not in req:
            return resp

        # discover (1) is answered with offer (2), request (3) with ack (5);
        # any other message type is echoed back unchanged.
        answer_type = {1: 2, 3: 5}
        dhcp_options = [
            (op[0], answer_type.get(op[1], op[1]))
            for op in req[DHCP].options
            if isinstance(op, tuple) and op[0] == "message-type"
        ]
        dhcp_options.extend([
            ("server_id", self.gw),
            ("domain", self.domain),
            ("router", self.gw),
            ("name_server", self.gw),
            ("broadcast_address", self.broadcast),
            ("subnet_mask", self.netmask),
            ("renewal_time", self.renewal_time),
            ("lease_time", self.lease_time),
            "end",
        ])
        resp /= DHCP(options=dhcp_options)
        return resp
| |
| |