| ## This file is part of Scapy |
| ## See http://www.secdev.org/projects/scapy for more informations |
| ## Copyright (C) Philippe Biondi <phil@secdev.org> |
| ## This program is published under a GPLv2 license |
| |
| """ |
| Fields: basic data structures that make up parts of packets. |
| """ |
| |
| from __future__ import absolute_import |
| import struct,copy,socket,collections |
| from scapy.config import conf |
| from scapy.dadict import DADict |
| from scapy.volatile import * |
| from scapy.data import * |
| from scapy.compat import * |
| from scapy.utils import * |
| from scapy.base_classes import BasePacket, Gen, Net, Field_metaclass |
| from scapy.error import warning |
| import scapy.modules.six as six |
| from scapy.modules.six.moves import range |
| |
| |
| ############ |
| ## Fields ## |
| ############ |
| |
| class Field(six.with_metaclass(Field_metaclass, object)): |
| """For more informations on how this work, please refer to |
| http://www.secdev.org/projects/scapy/files/scapydoc.pdf |
| chapter ``Adding a New Field''""" |
| __slots__ = ["name", "fmt", "default", "sz", "owners"] |
| islist = 0 |
| ismutable = False |
| holds_packets = 0 |
| def __init__(self, name, default, fmt="H"): |
| self.name = name |
| if fmt[0] in "@=<>!": |
| self.fmt = fmt |
| else: |
| self.fmt = "!"+fmt |
| self.default = self.any2i(None,default) |
| self.sz = struct.calcsize(self.fmt) |
| self.owners = [] |
| |
| def register_owner(self, cls): |
| self.owners.append(cls) |
| |
| def i2len(self, pkt, x): |
| """Convert internal value to a length usable by a FieldLenField""" |
| return self.sz |
| def i2count(self, pkt, x): |
| """Convert internal value to a number of elements usable by a FieldLenField. |
| Always 1 except for list fields""" |
| return 1 |
| def h2i(self, pkt, x): |
| """Convert human value to internal value""" |
| return x |
| def i2h(self, pkt, x): |
| """Convert internal value to human value""" |
| return x |
| def m2i(self, pkt, x): |
| """Convert machine value to internal value""" |
| return x |
| def i2m(self, pkt, x): |
| """Convert internal value to machine value""" |
| if x is None: |
| x = 0 |
| elif isinstance(x, str): |
| return raw(x) |
| return x |
| def any2i(self, pkt, x): |
| """Try to understand the most input values possible and make an internal value from them""" |
| return self.h2i(pkt, x) |
| def i2repr(self, pkt, x): |
| """Convert internal value to a nice representation""" |
| return repr(self.i2h(pkt,x)) |
| def addfield(self, pkt, s, val): |
| """Add an internal value to a string""" |
| return s+struct.pack(self.fmt, self.i2m(pkt,val)) |
| def getfield(self, pkt, s): |
| """Extract an internal value from a string""" |
| return s[self.sz:], self.m2i(pkt, struct.unpack(self.fmt, s[:self.sz])[0]) |
| def do_copy(self, x): |
| if hasattr(x, "copy"): |
| return x.copy() |
| if isinstance(x, list): |
| x = x[:] |
| for i in range(len(x)): |
| if isinstance(x[i], BasePacket): |
| x[i] = x[i].copy() |
| return x |
| def __repr__(self): |
| return "<Field (%s).%s>" % (",".join(x.__name__ for x in self.owners),self.name) |
| def copy(self): |
| return copy.deepcopy(self) |
| def randval(self): |
| """Return a volatile object whose value is both random and suitable for this field""" |
| fmtt = self.fmt[-1] |
| if fmtt in "BHIQ": |
| return {"B":RandByte,"H":RandShort,"I":RandInt, "Q":RandLong}[fmtt]() |
| elif fmtt == "s": |
| if self.fmt[0] in "0123456789": |
| l = int(self.fmt[:-1]) |
| else: |
| l = int(self.fmt[1:-1]) |
| return RandBin(l) |
| else: |
| warning("no random class for [%s] (fmt=%s).", self.name, self.fmt) |
| |
| |
| |
| |
| class Emph(object): |
| __slots__ = ["fld"] |
| def __init__(self, fld): |
| self.fld = fld |
| def __getattr__(self, attr): |
| return getattr(self.fld,attr) |
| def __hash__(self): |
| return hash(self.fld) |
| def __eq__(self, other): |
| return self.fld == other |
| |
| |
| class ActionField(object): |
| __slots__ = ["_fld", "_action_method", "_privdata"] |
| def __init__(self, fld, action_method, **kargs): |
| self._fld = fld |
| self._action_method = action_method |
| self._privdata = kargs |
| def any2i(self, pkt, val): |
| getattr(pkt, self._action_method)(val, self._fld, **self._privdata) |
| return getattr(self._fld, "any2i")(pkt, val) |
| def __getattr__(self, attr): |
| return getattr(self._fld,attr) |
| |
| |
| class ConditionalField(object): |
| __slots__ = ["fld", "cond"] |
| def __init__(self, fld, cond): |
| self.fld = fld |
| self.cond = cond |
| def _evalcond(self,pkt): |
| return self.cond(pkt) |
| |
| def getfield(self, pkt, s): |
| if self._evalcond(pkt): |
| return self.fld.getfield(pkt,s) |
| else: |
| return s,None |
| |
| def addfield(self, pkt, s, val): |
| if self._evalcond(pkt): |
| return self.fld.addfield(pkt,s,val) |
| else: |
| return s |
| def __getattr__(self, attr): |
| return getattr(self.fld,attr) |
| |
| |
| class PadField(object): |
| """Add bytes after the proxified field so that it ends at the specified |
| alignment from its beginning""" |
| __slots__ = ["_fld", "_align", "_padwith"] |
| def __init__(self, fld, align, padwith=None): |
| self._fld = fld |
| self._align = align |
| self._padwith = padwith or b"" |
| |
| def padlen(self, flen): |
| return -flen%self._align |
| |
| def getfield(self, pkt, s): |
| remain,val = self._fld.getfield(pkt,s) |
| padlen = self.padlen(len(s)-len(remain)) |
| return remain[padlen:], val |
| |
| def addfield(self, pkt, s, val): |
| sval = self._fld.addfield(pkt, b"", val) |
| return s+sval+struct.pack("%is" % (self.padlen(len(sval))), self._padwith) |
| |
| def __getattr__(self, attr): |
| return getattr(self._fld,attr) |
| |
| |
| class DestField(Field): |
| __slots__ = ["defaultdst"] |
| # Each subclass must have its own bindings attribute |
| # bindings = {} |
| def __init__(self, name, default): |
| self.defaultdst = default |
| def dst_from_pkt(self, pkt): |
| for addr, condition in self.bindings.get(pkt.payload.__class__, []): |
| try: |
| if all(pkt.payload.getfieldval(field) == value |
| for field, value in six.iteritems(condition)): |
| return addr |
| except AttributeError: |
| pass |
| return self.defaultdst |
| @classmethod |
| def bind_addr(cls, layer, addr, **condition): |
| cls.bindings.setdefault(layer, []).append((addr, condition)) |
| |
| |
| class MACField(Field): |
| def __init__(self, name, default): |
| Field.__init__(self, name, default, "6s") |
| def i2m(self, pkt, x): |
| if x is None: |
| return b"\0\0\0\0\0\0" |
| return mac2str(x) |
| def m2i(self, pkt, x): |
| return str2mac(x) |
| def any2i(self, pkt, x): |
| if isinstance(x, bytes) and len(x) == 6: |
| x = self.m2i(pkt, x) |
| return x |
| def i2repr(self, pkt, x): |
| x = self.i2h(pkt, x) |
| if self in conf.resolve: |
| x = conf.manufdb._resolve_MAC(x) |
| return x |
| def randval(self): |
| return RandMAC() |
| |
| |
| class IPField(Field): |
| slots = [] |
| def __init__(self, name, default): |
| Field.__init__(self, name, default, "4s") |
| def h2i(self, pkt, x): |
| if isinstance(x, bytes): |
| x = plain_str(x) |
| if isinstance(x, str): |
| try: |
| inet_aton(x) |
| except socket.error: |
| x = Net(x) |
| elif isinstance(x, list): |
| x = [self.h2i(pkt, n) for n in x] |
| return x |
| def resolve(self, x): |
| if self in conf.resolve: |
| try: |
| ret = socket.gethostbyaddr(x)[0] |
| except: |
| pass |
| else: |
| if ret: |
| return ret |
| return x |
| def i2m(self, pkt, x): |
| return inet_aton(x) |
| def m2i(self, pkt, x): |
| return inet_ntoa(x) |
| def any2i(self, pkt, x): |
| return self.h2i(pkt,x) |
| def i2repr(self, pkt, x): |
| return self.resolve(self.i2h(pkt, x)) |
| def randval(self): |
| return RandIP() |
| |
| class SourceIPField(IPField): |
| __slots__ = ["dstname"] |
| def __init__(self, name, dstname): |
| IPField.__init__(self, name, None) |
| self.dstname = dstname |
| def __findaddr(self, pkt): |
| if conf.route is None: |
| # unused import, only to initialize conf.route |
| import scapy.route |
| dst = ("0.0.0.0" if self.dstname is None |
| else getattr(pkt, self.dstname)) |
| if isinstance(dst, (Gen, list)): |
| r = {conf.route.route(daddr) for daddr in dst} |
| if len(r) > 1: |
| warning("More than one possible route for %r" % (dst,)) |
| return min(r)[1] |
| return conf.route.route(dst)[1] |
| def i2m(self, pkt, x): |
| if x is None: |
| x = self.__findaddr(pkt) |
| return IPField.i2m(self, pkt, x) |
| def i2h(self, pkt, x): |
| if x is None: |
| x = self.__findaddr(pkt) |
| return IPField.i2h(self, pkt, x) |
| |
| |
| |
| |
| class ByteField(Field): |
| def __init__(self, name, default): |
| Field.__init__(self, name, default, "B") |
| |
| class XByteField(ByteField): |
| def i2repr(self, pkt, x): |
| return lhex(self.i2h(pkt, x)) |
| |
| class OByteField(ByteField): |
| def i2repr(self, pkt, x): |
| return "%03o"%self.i2h(pkt, x) |
| |
| class X3BytesField(XByteField): |
| def __init__(self, name, default): |
| Field.__init__(self, name, default, "!I") |
| def addfield(self, pkt, s, val): |
| return s+struct.pack(self.fmt, self.i2m(pkt,val))[1:4] |
| def getfield(self, pkt, s): |
| return s[3:], self.m2i(pkt, struct.unpack(self.fmt, b"\x00"+s[:3])[0]) |
| |
| class ThreeBytesField(X3BytesField, ByteField): |
| def i2repr(self, pkt, x): |
| return ByteField.i2repr(self, pkt, x) |
| |
| class SignedByteField(Field): |
| def __init__(self, name, default): |
| Field.__init__(self, name, default, "b") |
| |
| class ShortField(Field): |
| def __init__(self, name, default): |
| Field.__init__(self, name, default, "H") |
| |
| class SignedShortField(Field): |
| def __init__(self, name, default): |
| Field.__init__(self, name, default, "h") |
| |
| class LEShortField(Field): |
| def __init__(self, name, default): |
| Field.__init__(self, name, default, "<H") |
| |
| class XShortField(ShortField): |
| def i2repr(self, pkt, x): |
| return lhex(self.i2h(pkt, x)) |
| |
| |
| class IntField(Field): |
| def __init__(self, name, default): |
| Field.__init__(self, name, default, "I") |
| |
| class SignedIntField(Field): |
| def __init__(self, name, default): |
| Field.__init__(self, name, default, "i") |
| def randval(self): |
| return RandSInt() |
| |
| class LEIntField(Field): |
| def __init__(self, name, default): |
| Field.__init__(self, name, default, "<I") |
| |
| class LESignedIntField(Field): |
| def __init__(self, name, default): |
| Field.__init__(self, name, default, "<i") |
| def randval(self): |
| return RandSInt() |
| |
| class XIntField(IntField): |
| def i2repr(self, pkt, x): |
| return lhex(self.i2h(pkt, x)) |
| |
| |
| class LongField(Field): |
| def __init__(self, name, default): |
| Field.__init__(self, name, default, "Q") |
| |
| class LELongField(LongField): |
| def __init__(self, name, default): |
| Field.__init__(self, name, default, "<Q") |
| |
| class XLongField(LongField): |
| def i2repr(self, pkt, x): |
| return lhex(self.i2h(pkt, x)) |
| |
| class IEEEFloatField(Field): |
| def __init__(self, name, default): |
| Field.__init__(self, name, default, "f") |
| |
| class IEEEDoubleField(Field): |
| def __init__(self, name, default): |
| Field.__init__(self, name, default, "d") |
| |
| |
| class StrField(Field): |
| __slots__ = ["remain"] |
| def __init__(self, name, default, fmt="H", remain=0): |
| Field.__init__(self,name,default,fmt) |
| self.remain = remain |
| def i2len(self, pkt, i): |
| return len(i) |
| def any2i(self, pkt, x): |
| if isinstance(x, str if six.PY3 else unicode): |
| x = raw(x) |
| return super(StrField, self).any2i(pkt, x) |
| def i2repr(self, pkt, x): |
| val = super(StrField, self).i2repr(pkt, x) |
| if val[:2] in ['b"', "b'"]: |
| return val[1:] |
| return val |
| def i2m(self, pkt, x): |
| if x is None: |
| return b"" |
| if not isinstance(x, bytes): |
| return raw(x) |
| return x |
| def addfield(self, pkt, s, val): |
| return s + self.i2m(pkt, val) |
| def getfield(self, pkt, s): |
| if self.remain == 0: |
| return b"", self.m2i(pkt, s) |
| else: |
| return s[-self.remain:],self.m2i(pkt, s[:-self.remain]) |
| def randval(self): |
| return RandBin(RandNum(0,1200)) |
| |
| class PacketField(StrField): |
| __slots__ = ["cls"] |
| holds_packets = 1 |
| def __init__(self, name, default, cls, remain=0): |
| StrField.__init__(self, name, default, remain=remain) |
| self.cls = cls |
| def i2m(self, pkt, i): |
| if i is None: |
| return b"" |
| return raw(i) |
| def m2i(self, pkt, m): |
| return self.cls(m) |
| def getfield(self, pkt, s): |
| i = self.m2i(pkt, s) |
| remain = b"" |
| if conf.padding_layer in i: |
| r = i[conf.padding_layer] |
| del(r.underlayer.payload) |
| remain = r.load |
| return remain,i |
| |
| class PacketLenField(PacketField): |
| __slots__ = ["length_from"] |
| def __init__(self, name, default, cls, length_from=None): |
| PacketField.__init__(self, name, default, cls) |
| self.length_from = length_from |
| def getfield(self, pkt, s): |
| l = self.length_from(pkt) |
| try: |
| i = self.m2i(pkt, s[:l]) |
| except Exception: |
| if conf.debug_dissector: |
| raise |
| i = conf.raw_layer(load=s[:l]) |
| return s[l:],i |
| |
| |
| class PacketListField(PacketField): |
| """ PacketListField represents a series of Packet instances that might occur right in the middle of another Packet |
| field list. |
| This field type may also be used to indicate that a series of Packet instances have a sibling semantic instead of |
| a parent/child relationship (i.e. a stack of layers). |
| """ |
| __slots__ = ["count_from", "length_from", "next_cls_cb"] |
| islist = 1 |
| def __init__(self, name, default, cls=None, count_from=None, length_from=None, next_cls_cb=None): |
| """ The number of Packet instances that are dissected by this field can be parametrized using one of three |
| different mechanisms/parameters: |
| * count_from: a callback that returns the number of Packet instances to dissect. The callback prototype is: |
| count_from(pkt:Packet) -> int |
| * length_from: a callback that returns the number of bytes that must be dissected by this field. The |
| callback prototype is: |
| length_from(pkt:Packet) -> int |
| * next_cls_cb: a callback that enables a Scapy developer to dynamically discover if another Packet instance |
| should be dissected or not. See below for this callback prototype. |
| |
| The bytes that are not consumed during the dissection of this field are passed to the next field of the current |
| packet. |
| |
| For the serialization of such a field, the list of Packets that are contained in a PacketListField can be |
| heterogeneous and is unrestricted. |
| |
| The type of the Packet instances that are dissected with this field is specified or discovered using one of the |
| following mechanism: |
| * the cls parameter may contain a callable that returns an instance of the dissected Packet. This |
| may either be a reference of a Packet subclass (e.g. DNSRROPT in layers/dns.py) to generate an |
| homogeneous PacketListField or a function deciding the type of the Packet instance |
| (e.g. _CDPGuessAddrRecord in contrib/cdp.py) |
| * the cls parameter may contain a class object with a defined "dispatch_hook" classmethod. That |
| method must return a Packet instance. The dispatch_hook callmethod must implement the following prototype: |
| dispatch_hook(cls, _pkt:Optional[Packet], *args, **kargs) -> Packet_metaclass |
| The _pkt parameter may contain a reference to the packet instance containing the PacketListField that is |
| being dissected. |
| * the next_cls_cb parameter may contain a callable whose prototype is: |
| cbk(pkt:Packet, lst:List[Packet], cur:Optional[Packet], remain:str) -> Optional[Packet_metaclass] |
| The pkt argument contains a reference to the Packet instance containing the PacketListField that is |
| being dissected. The lst argument is the list of all Packet instances that were previously parsed during |
| the current PacketListField dissection, save for the very last Packet instance. The cur argument |
| contains a reference to that very last parsed Packet instance. The remain argument contains the bytes |
| that may still be consumed by the current PacketListField dissection operation. This callback returns |
| either the type of the next Packet to dissect or None to indicate that no more Packet are to be |
| dissected. |
| These four arguments allows a variety of dynamic discovery of the number of Packet to dissect and of the |
| type of each one of these Packets, including: type determination based on current Packet instances or |
| its underlayers, continuation based on the previously parsed Packet instances within that |
| PacketListField, continuation based on a look-ahead on the bytes to be dissected... |
| |
| The cls and next_cls_cb parameters are semantically exclusive, although one could specify both. If both are |
| specified, cls is silently ignored. The same is true for count_from and next_cls_cb. |
| length_from and next_cls_cb are compatible and the dissection will end, whichever of the two stop conditions |
| comes first. |
| |
| @param name: the name of the field |
| @param default: the default value of this field; generally an empty Python list |
| @param cls: either a callable returning a Packet instance or a class object defining a dispatch_hook class |
| method |
| @param count_from: a callback returning the number of Packet instances to dissect |
| @param length_from: a callback returning the number of bytes to dissect |
| @param next_cls_cb: a callback returning either None or the type of the next Packet to dissect. |
| """ |
| if default is None: |
| default = [] # Create a new list for each instance |
| PacketField.__init__(self, name, default, cls) |
| self.count_from = count_from |
| self.length_from = length_from |
| self.next_cls_cb = next_cls_cb |
| |
| def any2i(self, pkt, x): |
| if not isinstance(x, list): |
| return [x] |
| else: |
| return x |
| def i2count(self, pkt, val): |
| if isinstance(val, list): |
| return len(val) |
| return 1 |
| def i2len(self, pkt, val): |
| return sum( len(p) for p in val ) |
| def do_copy(self, x): |
| if x is None: |
| return None |
| else: |
| return [p if isinstance(p, six.string_types) else p.copy() for p in x] |
| def getfield(self, pkt, s): |
| c = l = cls = None |
| if self.length_from is not None: |
| l = self.length_from(pkt) |
| elif self.count_from is not None: |
| c = self.count_from(pkt) |
| if self.next_cls_cb is not None: |
| cls = self.next_cls_cb(pkt, [], None, s) |
| c = 1 |
| |
| lst = [] |
| ret = b"" |
| remain = s |
| if l is not None: |
| remain,ret = s[:l],s[l:] |
| while remain: |
| if c is not None: |
| if c <= 0: |
| break |
| c -= 1 |
| try: |
| if cls is not None: |
| p = cls(remain) |
| else: |
| p = self.m2i(pkt, remain) |
| except Exception: |
| if conf.debug_dissector: |
| raise |
| p = conf.raw_layer(load=remain) |
| remain = b"" |
| else: |
| if conf.padding_layer in p: |
| pad = p[conf.padding_layer] |
| remain = pad.load |
| del(pad.underlayer.payload) |
| if self.next_cls_cb is not None: |
| cls = self.next_cls_cb(pkt, lst, p, remain) |
| if cls is not None: |
| c += 1 |
| else: |
| remain = b"" |
| lst.append(p) |
| return remain+ret,lst |
| def addfield(self, pkt, s, val): |
| return s + b"".join(raw(v) for v in val) |
| |
| |
| class StrFixedLenField(StrField): |
| __slots__ = ["length_from"] |
| def __init__(self, name, default, length=None, length_from=None): |
| StrField.__init__(self, name, default) |
| self.length_from = length_from |
| if length is not None: |
| self.length_from = lambda pkt,length=length: length |
| def i2repr(self, pkt, v): |
| if isinstance(v, bytes): |
| v = v.rstrip(b"\0") |
| return super(StrFixedLenField, self).i2repr(pkt, v) |
| def getfield(self, pkt, s): |
| l = self.length_from(pkt) |
| return s[l:], self.m2i(pkt,s[:l]) |
| def addfield(self, pkt, s, val): |
| l = self.length_from(pkt) |
| return s+struct.pack("%is"%l,self.i2m(pkt, val)) |
| def randval(self): |
| try: |
| l = self.length_from(None) |
| except: |
| l = RandNum(0,200) |
| return RandBin(l) |
| |
| class StrFixedLenEnumField(StrFixedLenField): |
| __slots__ = ["enum"] |
| def __init__(self, name, default, length=None, enum=None, length_from=None): |
| StrFixedLenField.__init__(self, name, default, length=length, length_from=length_from) |
| self.enum = enum |
| def i2repr(self, pkt, v): |
| r = v.rstrip("\0") |
| rr = repr(r) |
| if v in self.enum: |
| rr = "%s (%s)" % (rr, self.enum[v]) |
| elif r in self.enum: |
| rr = "%s (%s)" % (rr, self.enum[r]) |
| return rr |
| |
| class NetBIOSNameField(StrFixedLenField): |
| def __init__(self, name, default, length=31): |
| StrFixedLenField.__init__(self, name, default, length) |
| def i2m(self, pkt, x): |
| l = self.length_from(pkt)//2 |
| if x is None: |
| x = b"" |
| x += b" "*(l) |
| x = x[:l] |
| x = b"".join(chb(0x41 + orb(b)>>4) + chb(0x41 + orb(b)&0xf) for b in x) |
| x = b" "+x |
| return x |
| def m2i(self, pkt, x): |
| x = x.strip(b"\x00").strip(b" ") |
| return b"".join(map(lambda x,y: chb((((orb(x)-1)&0xf)<<4)+((orb(y)-1)&0xf)), x[::2],x[1::2])) |
| |
| class StrLenField(StrField): |
| __slots__ = ["length_from"] |
| def __init__(self, name, default, fld=None, length_from=None): |
| StrField.__init__(self, name, default) |
| self.length_from = length_from |
| def getfield(self, pkt, s): |
| l = self.length_from(pkt) |
| return s[l:], self.m2i(pkt,s[:l]) |
| |
| class XStrField(StrField): |
| """ |
| StrField which value is printed as hexadecimal. |
| """ |
| |
| def i2repr(self, pkt, x): |
| if x is None: |
| return repr(x) |
| return bytes_hex(x).decode() |
| |
| class XStrLenField(StrLenField): |
| """ |
| StrLenField which value is printed as hexadecimal. |
| """ |
| |
| def i2repr(self, pkt, x): |
| if not x: |
| return repr(x) |
| return bytes_hex(x[:self.length_from(pkt)]).decode() |
| |
| class XStrFixedLenField(StrFixedLenField): |
| """ |
| StrFixedLenField which value is printed as hexadecimal. |
| """ |
| |
| def i2repr(self, pkt, x): |
| if not x: |
| return repr(x) |
| return bytes_hex(x[:self.length_from(pkt)]).decode() |
| |
| class StrLenFieldUtf16(StrLenField): |
| def h2i(self, pkt, x): |
| return plain_str(x).encode('utf-16')[2:] |
| def i2h(self, pkt, x): |
| return x.decode('utf-16') |
| |
| class BoundStrLenField(StrLenField): |
| __slots__ = ["minlen", "maxlen"] |
| def __init__(self,name, default, minlen= 0, maxlen= 255, fld=None, length_from=None): |
| StrLenField.__init__(self, name, default, fld, length_from) |
| self.minlen = minlen |
| self.maxlen = maxlen |
| |
| def randval(self): |
| return RandBin(RandNum(self.minlen, self.maxlen)) |
| |
| class FieldListField(Field): |
| __slots__ = ["field", "count_from", "length_from"] |
| islist = 1 |
| def __init__(self, name, default, field, length_from=None, count_from=None): |
| if default is None: |
| default = [] # Create a new list for each instance |
| self.field = field |
| Field.__init__(self, name, default) |
| self.count_from = count_from |
| self.length_from = length_from |
| |
| def i2count(self, pkt, val): |
| if isinstance(val, list): |
| return len(val) |
| return 1 |
| def i2len(self, pkt, val): |
| return int(sum(self.field.i2len(pkt,v) for v in val)) |
| |
| def i2m(self, pkt, val): |
| if val is None: |
| val = [] |
| return val |
| def any2i(self, pkt, x): |
| if not isinstance(x, list): |
| return [self.field.any2i(pkt, x)] |
| else: |
| return [self.field.any2i(pkt, e) for e in x] |
| def i2repr(self, pkt, x): |
| res = [] |
| for v in x: |
| r = self.field.i2repr(pkt, v) |
| res.append(r) |
| return "[%s]" % ", ".join(res) |
| def addfield(self, pkt, s, val): |
| val = self.i2m(pkt, val) |
| for v in val: |
| s = self.field.addfield(pkt, s, v) |
| return s |
| def getfield(self, pkt, s): |
| c = l = None |
| if self.length_from is not None: |
| l = self.length_from(pkt) |
| elif self.count_from is not None: |
| c = self.count_from(pkt) |
| |
| val = [] |
| ret = b"" |
| if l is not None: |
| s,ret = s[:l],s[l:] |
| |
| while s: |
| if c is not None: |
| if c <= 0: |
| break |
| c -= 1 |
| s,v = self.field.getfield(pkt, s) |
| val.append(v) |
| return s+ret, val |
| |
| class FieldLenField(Field): |
| __slots__ = ["length_of", "count_of", "adjust"] |
| def __init__(self, name, default, length_of=None, fmt = "H", count_of=None, adjust=lambda pkt,x:x, fld=None): |
| Field.__init__(self, name, default, fmt) |
| self.length_of = length_of |
| self.count_of = count_of |
| self.adjust = adjust |
| if fld is not None: |
| #FIELD_LENGTH_MANAGEMENT_DEPRECATION(self.__class__.__name__) |
| self.length_of = fld |
| def i2m(self, pkt, x): |
| if x is None: |
| if self.length_of is not None: |
| fld,fval = pkt.getfield_and_val(self.length_of) |
| f = fld.i2len(pkt, fval) |
| else: |
| fld,fval = pkt.getfield_and_val(self.count_of) |
| f = fld.i2count(pkt, fval) |
| x = self.adjust(pkt,f) |
| return x |
| |
| class StrNullField(StrField): |
| def addfield(self, pkt, s, val): |
| return s+self.i2m(pkt, val)+b"\x00" |
| def getfield(self, pkt, s): |
| l = s.find(b"\x00") |
| if l < 0: |
| #XXX \x00 not found |
| return b"",s |
| return s[l+1:],self.m2i(pkt, s[:l]) |
| def randval(self): |
| return RandTermString(RandNum(0,1200),b"\x00") |
| |
| class StrStopField(StrField): |
| __slots__ = ["stop", "additionnal"] |
| def __init__(self, name, default, stop, additionnal=0): |
| Field.__init__(self, name, default) |
| self.stop = stop |
| self.additionnal = additionnal |
| def getfield(self, pkt, s): |
| l = s.find(self.stop) |
| if l < 0: |
| return b"",s |
| # raise Scapy_Exception,"StrStopField: stop value [%s] not found" %stop |
| l += len(self.stop)+self.additionnal |
| return s[l:],s[:l] |
| def randval(self): |
| return RandTermString(RandNum(0,1200),self.stop) |
| |
| class LenField(Field): |
| __slots__ = ["adjust"] |
| def __init__(self, name, default, fmt="H", adjust=lambda x: x): |
| Field.__init__(self, name, default, fmt) |
| self.adjust = adjust |
| def i2m(self, pkt, x): |
| if x is None: |
| x = self.adjust(len(pkt.payload)) |
| return x |
| |
| class BCDFloatField(Field): |
| def i2m(self, pkt, x): |
| return int(256*x) |
| def m2i(self, pkt, x): |
| return x/256.0 |
| |
| class BitField(Field): |
| __slots__ = ["rev", "size"] |
| def __init__(self, name, default, size): |
| Field.__init__(self, name, default) |
| self.rev = size < 0 |
| self.size = abs(size) |
| def reverse(self, val): |
| if self.size == 16: |
| # Replaces socket.ntohs (but work on both little/big endian) |
| val = struct.unpack('>H',struct.pack('<H', int(val)))[0] |
| elif self.size == 32: |
| # Same here but for socket.ntohl |
| val = struct.unpack('>I',struct.pack('<I', int(val)))[0] |
| return val |
| |
| def addfield(self, pkt, s, val): |
| val = self.i2m(pkt, val) |
| if isinstance(s, tuple): |
| s,bitsdone,v = s |
| else: |
| bitsdone = 0 |
| v = 0 |
| if self.rev: |
| val = self.reverse(val) |
| v <<= self.size |
| v |= val & ((1<<self.size) - 1) |
| bitsdone += self.size |
| while bitsdone >= 8: |
| bitsdone -= 8 |
| s = s+struct.pack("!B", v >> bitsdone) |
| v &= (1<<bitsdone)-1 |
| if bitsdone: |
| return s,bitsdone,v |
| else: |
| return s |
| def getfield(self, pkt, s): |
| if isinstance(s, tuple): |
| s,bn = s |
| else: |
| bn = 0 |
| # we don't want to process all the string |
| nb_bytes = (self.size+bn-1)//8 + 1 |
| w = s[:nb_bytes] |
| |
| # split the substring byte by byte |
| _bytes = struct.unpack('!%dB' % nb_bytes , w) |
| |
| b = 0 |
| for c in range(nb_bytes): |
| b |= int(_bytes[c]) << (nb_bytes-c-1)*8 |
| |
| # get rid of high order bits |
| b &= (1 << (nb_bytes*8-bn)) - 1 |
| |
| # remove low order bits |
| b = b >> (nb_bytes*8 - self.size - bn) |
| |
| if self.rev: |
| b = self.reverse(b) |
| |
| bn += self.size |
| s = s[bn//8:] |
| bn = bn%8 |
| b = self.m2i(pkt, b) |
| if bn: |
| return (s,bn),b |
| else: |
| return s,b |
| def randval(self): |
| return RandNum(0,2**self.size-1) |
| def i2len(self, pkt, x): |
| return float(self.size)/8 |
| |
| |
| class BitFieldLenField(BitField): |
| __slots__ = ["length_of", "count_of", "adjust"] |
| def __init__(self, name, default, size, length_of=None, count_of=None, adjust=lambda pkt,x:x): |
| BitField.__init__(self, name, default, size) |
| self.length_of = length_of |
| self.count_of = count_of |
| self.adjust = adjust |
| def i2m(self, pkt, x): |
| return (FieldLenField.i2m.__func__ if six.PY2 else FieldLenField.i2m)(self, pkt, x) |
| |
| |
| class XBitField(BitField): |
| def i2repr(self, pkt, x): |
| return lhex(self.i2h(pkt,x)) |
| |
| |
| class _EnumField(Field): |
| def __init__(self, name, default, enum, fmt = "H"): |
| """ Initializes enum fields. |
| |
| @param name: name of this field |
| @param default: default value of this field |
| @param enum: either a dict or a tuple of two callables. Dict keys are |
| the internal values, while the dict values are the |
| user-friendly representations. If the tuple is provided, |
| the first callable receives the internal value as |
| parameter and returns the user-friendly representation |
| and the second callable does the converse. The first |
| callable may return None to default to a literal string |
| (repr()) representation. |
| @param fmt: struct.pack format used to parse and serialize the |
| internal value from and to machine representation. |
| """ |
| if isinstance(enum, tuple): |
| self.i2s_cb = enum[0] |
| self.s2i_cb = enum[1] |
| self.i2s = None |
| self.s2i = None |
| else: |
| i2s = self.i2s = {} |
| s2i = self.s2i = {} |
| self.i2s_cb = None |
| self.s2i_cb = None |
| if isinstance(enum, list): |
| keys = range(len(enum)) |
| elif isinstance(enum, DADict): |
| keys = enum.iterkeys() |
| else: |
| keys = list(enum) |
| if any(isinstance(x, str) for x in keys): |
| i2s, s2i = s2i, i2s |
| for k in keys: |
| i2s[k] = enum[k] |
| s2i[enum[k]] = k |
| Field.__init__(self, name, default, fmt) |
| |
| def any2i_one(self, pkt, x): |
| if isinstance(x, str): |
| try: |
| x = self.s2i[x] |
| except TypeError: |
| x = self.s2i_cb(x) |
| return x |
| |
| def i2repr_one(self, pkt, x): |
| if self not in conf.noenum and not isinstance(x,VolatileValue): |
| try: |
| return self.i2s[x] |
| except KeyError: |
| pass |
| except TypeError: |
| ret = self.i2s_cb(x) |
| if ret is not None: |
| return ret |
| return repr(x) |
| |
| def any2i(self, pkt, x): |
| if isinstance(x, list): |
| return [self.any2i_one(pkt, z) for z in x] |
| else: |
| return self.any2i_one(pkt,x) |
| |
| def i2repr(self, pkt, x): |
| if isinstance(x, list): |
| return [self.i2repr_one(pkt, z) for z in x] |
| else: |
| return self.i2repr_one(pkt,x) |
| |
| class EnumField(_EnumField): |
| __slots__ = ["i2s", "s2i", "s2i_cb", "i2s_cb"] |
| |
| class CharEnumField(EnumField): |
| def __init__(self, name, default, enum, fmt = "1s"): |
| EnumField.__init__(self, name, default, enum, fmt) |
| if self.i2s is not None: |
| k = list(self.i2s) |
| if k and len(k[0]) != 1: |
| self.i2s,self.s2i = self.s2i,self.i2s |
| def any2i_one(self, pkt, x): |
| if len(x) != 1: |
| if self.s2i is None: |
| x = self.s2i_cb(x) |
| else: |
| x = self.s2i[x] |
| return x |
| |
| class BitEnumField(BitField, _EnumField): |
| __slots__ = EnumField.__slots__ |
| def __init__(self, name, default, size, enum): |
| _EnumField.__init__(self, name, default, enum) |
| self.rev = size < 0 |
| self.size = abs(size) |
| def any2i(self, pkt, x): |
| return _EnumField.any2i(self, pkt, x) |
| def i2repr(self, pkt, x): |
| return _EnumField.i2repr(self, pkt, x) |
| |
| class ShortEnumField(EnumField): |
| __slots__ = EnumField.__slots__ |
| def __init__(self, name, default, enum): |
| EnumField.__init__(self, name, default, enum, "H") |
| |
| class LEShortEnumField(EnumField): |
| def __init__(self, name, default, enum): |
| EnumField.__init__(self, name, default, enum, "<H") |
| |
| class ByteEnumField(EnumField): |
| def __init__(self, name, default, enum): |
| EnumField.__init__(self, name, default, enum, "B") |
| |
| class IntEnumField(EnumField): |
| def __init__(self, name, default, enum): |
| EnumField.__init__(self, name, default, enum, "I") |
| |
| class SignedIntEnumField(EnumField): |
| def __init__(self, name, default, enum): |
| EnumField.__init__(self, name, default, enum, "i") |
| def randval(self): |
| return RandSInt() |
| |
| class LEIntEnumField(EnumField): |
| def __init__(self, name, default, enum): |
| EnumField.__init__(self, name, default, enum, "<I") |
| |
| class XShortEnumField(ShortEnumField): |
| def i2repr_one(self, pkt, x): |
| if self not in conf.noenum and not isinstance(x,VolatileValue): |
| try: |
| return self.i2s[x] |
| except KeyError: |
| pass |
| except TypeError: |
| ret = self.i2s_cb(x) |
| if ret is not None: |
| return ret |
| return lhex(x) |
| |
| |
| class _MultiEnumField(_EnumField): |
| def __init__(self, name, default, enum, depends_on, fmt = "H"): |
| |
| self.depends_on = depends_on |
| self.i2s_multi = enum |
| self.s2i_multi = {} |
| self.s2i_all = {} |
| for m in enum: |
| self.s2i_multi[m] = s2i = {} |
| for k,v in six.iteritems(enum[m]): |
| s2i[v] = k |
| self.s2i_all[v] = k |
| Field.__init__(self, name, default, fmt) |
| def any2i_one(self, pkt, x): |
| if isinstance(x, str): |
| v = self.depends_on(pkt) |
| if v in self.s2i_multi: |
| s2i = self.s2i_multi[v] |
| if x in s2i: |
| return s2i[x] |
| return self.s2i_all[x] |
| return x |
| def i2repr_one(self, pkt, x): |
| v = self.depends_on(pkt) |
| if v in self.i2s_multi: |
| return self.i2s_multi[v].get(x,x) |
| return x |
| |
| class MultiEnumField(_MultiEnumField, EnumField): |
| __slots__ = ["depends_on", "i2s_multi", "s2i_multi", "s2i_all"] |
| |
| class BitMultiEnumField(BitField, _MultiEnumField): |
| __slots__ = EnumField.__slots__ + MultiEnumField.__slots__ |
| def __init__(self, name, default, size, enum, depends_on): |
| _MultiEnumField.__init__(self, name, default, enum, depends_on) |
| self.rev = size < 0 |
| self.size = abs(size) |
| def any2i(self, pkt, x): |
| return _MultiEnumField.any2i(self, pkt, x) |
| def i2repr(self, pkt, x): |
| return _MultiEnumField.i2repr(self, pkt, x) |
| |
| |
| class ByteEnumKeysField(ByteEnumField): |
| """ByteEnumField that picks valid values when fuzzed. """ |
| def randval(self): |
| return RandEnumKeys(self.i2s) |
| |
| |
| class ShortEnumKeysField(ShortEnumField): |
| """ShortEnumField that picks valid values when fuzzed. """ |
| def randval(self): |
| return RandEnumKeys(self.i2s) |
| |
| |
| class IntEnumKeysField(IntEnumField): |
| """IntEnumField that picks valid values when fuzzed. """ |
| def randval(self): |
| return RandEnumKeys(self.i2s) |
| |
| |
| # Little endian long field |
| class LELongField(Field): |
| def __init__(self, name, default): |
| Field.__init__(self, name, default, "<Q") |
| |
| # Little endian fixed length field |
| class LEFieldLenField(FieldLenField): |
| def __init__(self, name, default, length_of=None, fmt = "<H", count_of=None, adjust=lambda pkt,x:x, fld=None): |
| FieldLenField.__init__(self, name, default, length_of=length_of, fmt=fmt, count_of=count_of, fld=fld, adjust=adjust) |
| |
| |
| class FlagValue(object): |
| __slots__ = ["value", "names", "multi"] |
| def _fixvalue(self, value): |
| if isinstance(value, six.string_types): |
| value = value.split('+') if self.multi else list(value) |
| if isinstance(value, list): |
| y = 0 |
| for i in value: |
| y |= 1 << self.names.index(i) |
| value = y |
| return None if value is None else int(value) |
| def __init__(self, value, names): |
| self.multi = isinstance(names, list) |
| self.names = names |
| self.value = self._fixvalue(value) |
| def __hash__(self): |
| return hash(self.value) |
| def __int__(self): |
| return self.value |
| def __eq__(self, other): |
| return self.value == self._fixvalue(other) |
| def __lt__(self, other): |
| return self.value < self._fixvalue(other) |
| def __le__(self, other): |
| return self.value <= self._fixvalue(other) |
| def __gt__(self, other): |
| return self.value > self._fixvalue(other) |
| def __ge__(self, other): |
| return self.value >= self._fixvalue(other) |
| def __ne__(self, other): |
| return self.value != self._fixvalue(other) |
| def __and__(self, other): |
| return self.__class__(self.value & self._fixvalue(other), self.names) |
| __rand__ = __and__ |
| def __or__(self, other): |
| return self.__class__(self.value | self._fixvalue(other), self.names) |
| __ror__ = __or__ |
| def __lshift__(self, other): |
| return self.value << self._fixvalue(other) |
| def __rshift__(self, other): |
| return self.value >> self._fixvalue(other) |
| def __nonzero__(self): |
| return bool(self.value) |
| __bool__ = __nonzero__ |
| def flagrepr(self): |
| warning("obj.flagrepr() is obsolete. Use str(obj) instead.") |
| return str(self) |
| def __str__(self): |
| i = 0 |
| r = [] |
| x = int(self) |
| while x: |
| if x & 1: |
| r.append(self.names[i]) |
| i += 1 |
| x >>= 1 |
| return ("+" if self.multi else "").join(r) |
| def __repr__(self): |
| return "<Flag %d (%s)>" % (self, self) |
| def __deepcopy__(self, memo): |
| return self.__class__(int(self), self.names) |
| def __getattr__(self, attr): |
| if attr in self.__slots__: |
| return super(FlagValue, self).__getattr__(attr) |
| try: |
| if self.multi: |
| return bool((2 ** self.names.index(attr)) & int(self)) |
| return all(bool((2 ** self.names.index(flag)) & int(self)) |
| for flag in attr) |
| except ValueError: |
| return super(FlagValue, self).__getattr__(attr) |
| def __setattr__(self, attr, value): |
| if attr == "value" and not isinstance(value, six.integer_types): |
| raise ValueError(value) |
| if attr in self.__slots__: |
| return super(FlagValue, self).__setattr__(attr, value) |
| if attr in self.names: |
| if value: |
| self.value |= (2 ** self.names.index(attr)) |
| else: |
| self.value &= ~(2 ** self.names.index(attr)) |
| else: |
| return super(FlagValue, self).__setattr__(attr, value) |
| def copy(self): |
| return self.__class__(self.value, self.names) |
| |
| |
| class FlagsField(BitField): |
| """ Handle Flag type field |
| |
| Make sure all your flags have a label |
| |
| Example: |
| >>> from scapy.packet import Packet |
| >>> class FlagsTest(Packet): |
| fields_desc = [FlagsField("flags", 0, 8, ["f0", "f1", "f2", "f3", "f4", "f5", "f6", "f7"])] |
| >>> FlagsTest(flags=9).show2() |
| ###[ FlagsTest ]### |
| flags = f0+f3 |
| >>> FlagsTest(flags=0).show2().strip() |
| ###[ FlagsTest ]### |
| flags = |
| |
| :param name: field's name |
| :param default: default value for the field |
| :param size: number of bits in the field |
| :param names: (list or dict) label for each flag, Least Significant Bit tag's name is written first |
| """ |
| ismutable = True |
| __slots__ = ["multi", "names"] |
| |
| def __init__(self, name, default, size, names): |
| self.multi = isinstance(names, list) |
| self.names = names |
| BitField.__init__(self, name, default, size) |
| |
| def _fixup_val(self, x): |
| """Returns a FlagValue instance when needed. Internal method, to be |
| used in *2i() and i2*() methods. |
| |
| """ |
| if isinstance(x, (list, tuple)): |
| return type(x)( |
| v if v is None or isinstance(v, FlagValue) |
| else FlagValue(v, self.names) |
| for v in x |
| ) |
| return x if x is None or isinstance(x, FlagValue) else FlagValue(x, self.names) |
| |
| def any2i(self, pkt, x): |
| return self._fixup_val(super(FlagsField, self).any2i(pkt, x)) |
| |
| def m2i(self, pkt, x): |
| return self._fixup_val(super(FlagsField, self).m2i(pkt, x)) |
| |
| def i2h(self, pkt, x): |
| return self._fixup_val(super(FlagsField, self).i2h(pkt, x)) |
| |
| def i2repr(self, pkt, x): |
| if isinstance(x, (list, tuple)): |
| return repr(type(x)( |
| None if v is None else str(self._fixup_val(v)) for v in x |
| )) |
| return None if x is None else str(self._fixup_val(x)) |
| |
| |
| MultiFlagsEntry = collections.namedtuple('MultiFlagEntry', ['short', 'long']) |
| |
| |
| class MultiFlagsField(BitField): |
| __slots__ = FlagsField.__slots__ + ["depends_on"] |
| |
| def __init__(self, name, default, size, names, depends_on): |
| self.names = names |
| self.depends_on = depends_on |
| super(MultiFlagsField, self).__init__(name, default, size) |
| |
| def any2i(self, pkt, x): |
| assert isinstance(x, six.integer_types + (set,)), 'set expected' |
| |
| if pkt is not None: |
| if isinstance(x, six.integer_types): |
| x = self.m2i(pkt, x) |
| else: |
| v = self.depends_on(pkt) |
| if v is not None: |
| assert v in self.names, 'invalid dependency' |
| these_names = self.names[v] |
| s = set() |
| for i in x: |
| for val in six.itervalues(these_names): |
| if val.short == i: |
| s.add(i) |
| break |
| else: |
| assert False, 'Unknown flag "{}" with this dependency'.format(i) |
| continue |
| x = s |
| return x |
| |
| def i2m(self, pkt, x): |
| v = self.depends_on(pkt) |
| if v in self.names: |
| these_names = self.names[v] |
| else: |
| these_names = {} |
| |
| r = 0 |
| for flag_set in x: |
| for i, val in six.iteritems(these_names): |
| if val.short == flag_set: |
| r |= 1 << i |
| break |
| else: |
| r |= 1 << int(flag_set[len('bit '):]) |
| return r |
| |
| def m2i(self, pkt, x): |
| v = self.depends_on(pkt) |
| if v in self.names: |
| these_names = self.names[v] |
| else: |
| these_names = {} |
| |
| r = set() |
| i = 0 |
| |
| while x: |
| if x & 1: |
| if i in these_names: |
| r.add(these_names[i].short) |
| else: |
| r.add('bit {}'.format(i)) |
| x >>= 1 |
| i += 1 |
| return r |
| |
| def i2repr(self, pkt, x): |
| v = self.depends_on(pkt) |
| if v in self.names: |
| these_names = self.names[v] |
| else: |
| these_names = {} |
| |
| r = set() |
| for flag_set in x: |
| for i in six.itervalues(these_names): |
| if i.short == flag_set: |
| r.add("{} ({})".format(i.long, i.short)) |
| break |
| else: |
| r.add(flag_set) |
| return repr(r) |
| |
| |
| class FixedPointField(BitField): |
| __slots__ = ['frac_bits'] |
| def __init__(self, name, default, size, frac_bits=16): |
| self.frac_bits = frac_bits |
| BitField.__init__(self, name, default, size) |
| |
| def any2i(self, pkt, val): |
| if val is None: |
| return val |
| ival = int(val) |
| fract = int( (val-ival) * 2**self.frac_bits ) |
| return (ival << self.frac_bits) | fract |
| |
| def i2h(self, pkt, val): |
| int_part = val >> self.frac_bits |
| frac_part = val & (1 << self.frac_bits) - 1 |
| frac_part /= 2.0**self.frac_bits |
| return int_part+frac_part |
| def i2repr(self, pkt, val): |
| return self.i2h(pkt, val) |
| |
| |
| # Base class for IPv4 and IPv6 Prefixes inspired by IPField and IP6Field. |
| # Machine values are encoded in a multiple of wordbytes bytes. |
| class _IPPrefixFieldBase(Field): |
| __slots__ = ["wordbytes", "maxbytes", "aton", "ntoa", "length_from"] |
| def __init__(self, name, default, wordbytes, maxbytes, aton, ntoa, length_from): |
| self.wordbytes = wordbytes |
| self.maxbytes = maxbytes |
| self.aton = aton |
| self.ntoa = ntoa |
| Field.__init__(self, name, default, "%is" % self.maxbytes) |
| self.length_from = length_from |
| |
| def _numbytes(self, pfxlen): |
| wbits= self.wordbytes * 8 |
| return ((pfxlen + (wbits - 1)) // wbits) * self.wordbytes |
| |
| def h2i(self, pkt, x): |
| # "fc00:1::1/64" -> ("fc00:1::1", 64) |
| [pfx,pfxlen]= x.split('/') |
| self.aton(pfx) # check for validity |
| return (pfx, int(pfxlen)) |
| |
| |
| def i2h(self, pkt, x): |
| # ("fc00:1::1", 64) -> "fc00:1::1/64" |
| (pfx,pfxlen)= x |
| return "%s/%i" % (pfx,pfxlen) |
| |
| def i2m(self, pkt, x): |
| # ("fc00:1::1", 64) -> (b"\xfc\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01", 64) |
| (pfx,pfxlen)= x |
| s= self.aton(pfx); |
| return (s[:self._numbytes(pfxlen)], pfxlen) |
| |
| def m2i(self, pkt, x): |
| # (b"\xfc\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01", 64) -> ("fc00:1::1", 64) |
| (s,pfxlen)= x |
| |
| if len(s) < self.maxbytes: |
| s= s + (b"\0" * (self.maxbytes - len(s))) |
| return (self.ntoa(s), pfxlen) |
| |
| def any2i(self, pkt, x): |
| if x is None: |
| return (self.ntoa(b"\0"*self.maxbytes), 1) |
| |
| return self.h2i(pkt,x) |
| |
| def i2len(self, pkt, x): |
| (_,pfxlen)= x |
| return pfxlen |
| |
| def addfield(self, pkt, s, val): |
| (rawpfx,pfxlen)= self.i2m(pkt,val) |
| fmt= "!%is" % self._numbytes(pfxlen) |
| return s+struct.pack(fmt, rawpfx) |
| |
| def getfield(self, pkt, s): |
| pfxlen= self.length_from(pkt) |
| numbytes= self._numbytes(pfxlen) |
| fmt= "!%is" % numbytes |
| return s[numbytes:], self.m2i(pkt, (struct.unpack(fmt, s[:numbytes])[0], pfxlen)) |
| |
| |
| class IPPrefixField(_IPPrefixFieldBase): |
| def __init__(self, name, default, wordbytes=1, length_from= None): |
| _IPPrefixFieldBase.__init__(self, name, default, wordbytes, 4, inet_aton, inet_ntoa, length_from) |
| |
| |
| class IP6PrefixField(_IPPrefixFieldBase): |
| def __init__(self, name, default, wordbytes= 1, length_from= None): |
| _IPPrefixFieldBase.__init__(self, name, default, wordbytes, 16, lambda a: inet_pton(socket.AF_INET6, a), lambda n: inet_ntop(socket.AF_INET6, n), length_from) |
| |
| class UTCTimeField(IntField): |
| __slots__ = ["epoch", "delta", "strf", "use_nano"] |
| def __init__(self, name, default, epoch=None, use_nano=False, strf="%a, %d %b %Y %H:%M:%S +0000"): |
| IntField.__init__(self, name, default) |
| if epoch is None: |
| mk_epoch = EPOCH |
| else: |
| mk_epoch = time.mktime(epoch) |
| self.epoch = mk_epoch |
| self.delta = mk_epoch - EPOCH |
| self.strf = strf |
| self.use_nano = use_nano |
| def i2repr(self, pkt, x): |
| if x is None: |
| x = 0 |
| elif self.use_nano: |
| x = x/1e9 |
| x = int(x) + self.delta |
| t = time.strftime(self.strf, time.gmtime(x)) |
| return "%s (%d)" % (t, x) |
| def i2m(self, pkt, x): |
| return int(x) if x != None else 0 |