| ## This file is part of Scapy |
| ## See http://www.secdev.org/projects/scapy for more informations |
| ## Copyright (C) Philippe Biondi <phil@secdev.org> |
| ## This program is published under a GPLv2 license |
| |
| """ |
| Packet class. Binding mechanism. fuzz() method. |
| """ |
| |
| from __future__ import absolute_import |
| from __future__ import print_function |
| import re |
| import time,itertools |
| import copy |
| import subprocess |
| |
| from scapy.fields import StrField, ConditionalField, Emph, PacketListField, BitField, \ |
| MultiEnumField, EnumField, FlagsField |
| from scapy.config import conf |
| from scapy.compat import * |
| from scapy.base_classes import BasePacket, Gen, SetGen, Packet_metaclass |
| from scapy.volatile import VolatileValue |
| from scapy.utils import import_hexcap,tex_escape,colgen,get_temp_file, \ |
| ContextManagerSubprocess |
| from scapy.error import Scapy_Exception, log_runtime |
| from scapy.consts import PYX |
| import scapy.modules.six as six |
| |
| try: |
| import pyx |
| except ImportError: |
| pass |
| |
| |
| class RawVal: |
| def __init__(self, val=""): |
| self.val = val |
| def __str__(self): |
| return str(self.val) |
| def __bytes__(self): |
| return raw(self.val) |
| def __repr__(self): |
| return "<RawVal [%r]>" % self.val |
| |
| |
| class Packet(six.with_metaclass(Packet_metaclass, BasePacket)): |
| __slots__ = [ |
| "time", "sent_time", "name", "default_fields", |
| "overload_fields", "overloaded_fields", "fields", "fieldtype", |
| "packetfields", |
| "original", "explicit", "raw_packet_cache", |
| "raw_packet_cache_fields", "_pkt", "post_transforms", |
| # then payload and underlayer |
| "payload", "underlayer", |
| "name", |
| # used for sr() |
| "_answered", |
| # used when sniffing |
| "direction", "sniffed_on" |
| ] |
| name = None |
| fields_desc = [] |
| overload_fields = {} |
| payload_guess = [] |
| show_indent = 1 |
| show_summary = True |
| |
| @classmethod |
| def from_hexcap(cls): |
| return cls(import_hexcap()) |
| |
| @classmethod |
| def upper_bonds(self): |
| for fval,upper in self.payload_guess: |
| print("%-20s %s" % (upper.__name__, ", ".join("%-12s" % ("%s=%r"%i) for i in six.iteritems(fval)))) |
| |
| @classmethod |
| def lower_bonds(self): |
| for lower,fval in six.iteritems(self._overload_fields): |
| print("%-20s %s" % (lower.__name__, ", ".join("%-12s" % ("%s=%r"%i) for i in six.iteritems(fval)))) |
| |
| def _unpickle(self, dlist): |
| """Used to unpack pickling""" |
| self.__init__(b"".join(dlist)) |
| return self |
| |
| def __reduce__(self): |
| """Used by pickling methods""" |
| return (self.__class__, (), (self.build(),)) |
| |
| def __reduce_ex__(self, proto): |
| """Used by pickling methods""" |
| return self.__reduce__() |
| |
| def __getstate__(self): |
| """Mark object as pickable""" |
| return self.__reduce__()[2] |
| |
| def __setstate__(self, state): |
| """Rebuild state using pickable methods""" |
| return self._unpickle(state) |
| |
| def __deepcopy__(self, memo): |
| """Used by copy.deepcopy""" |
| return self.copy() |
| |
| def __init__(self, _pkt=b"", post_transform=None, _internal=0, _underlayer=None, **fields): |
| self.time = time.time() |
| self.sent_time = None |
| self.name = (self.__class__.__name__ |
| if self._name is None else |
| self._name) |
| self.default_fields = {} |
| self.overload_fields = self._overload_fields |
| self.overloaded_fields = {} |
| self.fields = {} |
| self.fieldtype = {} |
| self.packetfields = [] |
| self.payload = NoPayload() |
| self.init_fields() |
| self.underlayer = _underlayer |
| self.original = _pkt |
| self.explicit = 0 |
| self.raw_packet_cache = None |
| self.raw_packet_cache_fields = None |
| if _pkt: |
| self.dissect(_pkt) |
| if not _internal: |
| self.dissection_done(self) |
| for f, v in six.iteritems(fields): |
| self.fields[f] = self.get_field(f).any2i(self, v) |
| if isinstance(post_transform, list): |
| self.post_transforms = post_transform |
| elif post_transform is None: |
| self.post_transforms = [] |
| else: |
| self.post_transforms = [post_transform] |
| |
| def init_fields(self): |
| """ |
| Initialize each fields of the fields_desc dict |
| """ |
| self.do_init_fields(self.fields_desc) |
| |
| def do_init_fields(self, flist): |
| """ |
| Initialize each fields of the fields_desc dict |
| """ |
| for f in flist: |
| self.default_fields[f.name] = copy.deepcopy(f.default) |
| self.fieldtype[f.name] = f |
| if f.holds_packets: |
| self.packetfields.append(f) |
| |
| def dissection_done(self,pkt): |
| """DEV: will be called after a dissection is completed""" |
| self.post_dissection(pkt) |
| self.payload.dissection_done(pkt) |
| |
| def post_dissection(self, pkt): |
| """DEV: is called after the dissection of the whole packet""" |
| pass |
| |
| def get_field(self, fld): |
| """DEV: returns the field instance from the name of the field""" |
| return self.fieldtype[fld] |
| |
| def add_payload(self, payload): |
| if payload is None: |
| return |
| elif not isinstance(self.payload, NoPayload): |
| self.payload.add_payload(payload) |
| else: |
| if isinstance(payload, Packet): |
| self.payload = payload |
| payload.add_underlayer(self) |
| for t in self.aliastypes: |
| if t in payload.overload_fields: |
| self.overloaded_fields = payload.overload_fields[t] |
| break |
| elif isinstance(payload, bytes): |
| self.payload = conf.raw_layer(load=payload) |
| else: |
| raise TypeError("payload must be either 'Packet' or 'bytes', not [%s]" % repr(payload)) |
| def remove_payload(self): |
| self.payload.remove_underlayer(self) |
| self.payload = NoPayload() |
| self.overloaded_fields = {} |
| def add_underlayer(self, underlayer): |
| self.underlayer = underlayer |
| def remove_underlayer(self,other): |
| self.underlayer = None |
| def copy(self): |
| """Returns a deep copy of the instance.""" |
| clone = self.__class__() |
| clone.fields = self.copy_fields_dict(self.fields) |
| clone.default_fields = self.copy_fields_dict(self.default_fields) |
| clone.overloaded_fields = self.overloaded_fields.copy() |
| clone.underlayer = self.underlayer |
| clone.explicit = self.explicit |
| clone.raw_packet_cache = self.raw_packet_cache |
| clone.raw_packet_cache_fields = self.copy_fields_dict( |
| self.raw_packet_cache_fields |
| ) |
| clone.post_transforms = self.post_transforms[:] |
| clone.payload = self.payload.copy() |
| clone.payload.add_underlayer(clone) |
| clone.time = self.time |
| return clone |
| |
| def getfieldval(self, attr): |
| if attr in self.fields: |
| return self.fields[attr] |
| if attr in self.overloaded_fields: |
| return self.overloaded_fields[attr] |
| if attr in self.default_fields: |
| return self.default_fields[attr] |
| return self.payload.getfieldval(attr) |
| |
| def getfield_and_val(self, attr): |
| if attr in self.fields: |
| return self.get_field(attr),self.fields[attr] |
| if attr in self.overloaded_fields: |
| return self.get_field(attr),self.overloaded_fields[attr] |
| if attr in self.default_fields: |
| return self.get_field(attr),self.default_fields[attr] |
| |
| def __getattr__(self, attr): |
| try: |
| fld, v = self.getfield_and_val(attr) |
| except TypeError: |
| return self.payload.__getattr__(attr) |
| if fld is not None: |
| return fld.i2h(self, v) |
| return v |
| |
| def setfieldval(self, attr, val): |
| if attr in self.default_fields: |
| fld = self.get_field(attr) |
| if fld is None: |
| any2i = lambda x,y: y |
| else: |
| any2i = fld.any2i |
| self.fields[attr] = any2i(self, val) |
| self.explicit = 0 |
| self.raw_packet_cache = None |
| self.raw_packet_cache_fields = None |
| elif attr == "payload": |
| self.remove_payload() |
| self.add_payload(val) |
| else: |
| self.payload.setfieldval(attr,val) |
| |
| def __setattr__(self, attr, val): |
| if attr in self.__all_slots__: |
| return object.__setattr__(self, attr, val) |
| try: |
| return self.setfieldval(attr,val) |
| except AttributeError: |
| pass |
| return object.__setattr__(self, attr, val) |
| |
| def delfieldval(self, attr): |
| if attr in self.fields: |
| del(self.fields[attr]) |
| self.explicit = 0 # in case a default value must be explicited |
| self.raw_packet_cache = None |
| self.raw_packet_cache_fields = None |
| elif attr in self.default_fields: |
| pass |
| elif attr == "payload": |
| self.remove_payload() |
| else: |
| self.payload.delfieldval(attr) |
| |
| def __delattr__(self, attr): |
| if attr == "payload": |
| return self.remove_payload() |
| if attr in self.__all_slots__: |
| return object.__delattr__(self, attr) |
| try: |
| return self.delfieldval(attr) |
| except AttributeError: |
| pass |
| return object.__delattr__(self, attr) |
| |
| def _superdir(self): |
| """ |
| Return a list of slots and methods, including those from subclasses. |
| """ |
| attrs = set() |
| cls = self.__class__ |
| if hasattr(cls, '__all_slots__'): |
| attrs.update(cls.__all_slots__) |
| for bcls in cls.__mro__: |
| if hasattr(bcls, '__dict__'): |
| attrs.update(bcls.__dict__) |
| return attrs |
| |
| def __dir__(self): |
| """ |
| Add fields to tab completion list. |
| """ |
| return sorted(itertools.chain(self._superdir(), self.default_fields)) |
| |
| def __repr__(self): |
| s = "" |
| ct = conf.color_theme |
| for f in self.fields_desc: |
| if isinstance(f, ConditionalField) and not f._evalcond(self): |
| continue |
| if f.name in self.fields: |
| val = f.i2repr(self, self.fields[f.name]) |
| elif f.name in self.overloaded_fields: |
| val = f.i2repr(self, self.overloaded_fields[f.name]) |
| else: |
| continue |
| if isinstance(f, Emph) or f in conf.emph: |
| ncol = ct.emph_field_name |
| vcol = ct.emph_field_value |
| else: |
| ncol = ct.field_name |
| vcol = ct.field_value |
| |
| |
| s += " %s%s%s" % (ncol(f.name), |
| ct.punct("="), |
| vcol(val)) |
| return "%s%s %s %s%s%s"% (ct.punct("<"), |
| ct.layer_name(self.__class__.__name__), |
| s, |
| ct.punct("|"), |
| repr(self.payload), |
| ct.punct(">")) |
| def __str__(self): |
| return str(self.build()) |
| def __bytes__(self): |
| return self.build() |
| def __div__(self, other): |
| if isinstance(other, Packet): |
| cloneA = self.copy() |
| cloneB = other.copy() |
| cloneA.add_payload(cloneB) |
| return cloneA |
| elif isinstance(other, (bytes, str)): |
| return self/conf.raw_layer(load=other) |
| else: |
| return other.__rdiv__(self) |
| __truediv__ = __div__ |
| def __rdiv__(self, other): |
| if isinstance(other, (bytes, str)): |
| return conf.raw_layer(load=other)/self |
| else: |
| raise TypeError |
| __rtruediv__ = __rdiv__ |
| def __mul__(self, other): |
| if isinstance(other, int): |
| return [self]*other |
| else: |
| raise TypeError |
| def __rmul__(self,other): |
| return self.__mul__(other) |
| |
| def __nonzero__(self): |
| return True |
| __bool__ = __nonzero__ |
| def __len__(self): |
| return len(self.__bytes__()) |
| def copy_field_value(self, fieldname, value): |
| return self.get_field(fieldname).do_copy(value) |
| def copy_fields_dict(self, fields): |
| if fields is None: |
| return None |
| return {fname: self.copy_field_value(fname, fval) |
| for fname, fval in six.iteritems(fields)} |
| def self_build(self, field_pos_list=None): |
| """ |
| Create the default layer regarding fields_desc dict |
| |
| :param field_pos_list: |
| """ |
| if self.raw_packet_cache is not None: |
| for fname, fval in six.iteritems(self.raw_packet_cache_fields): |
| if self.getfieldval(fname) != fval: |
| self.raw_packet_cache = None |
| self.raw_packet_cache_fields = None |
| break |
| if self.raw_packet_cache is not None: |
| return self.raw_packet_cache |
| p=b"" |
| for f in self.fields_desc: |
| val = self.getfieldval(f.name) |
| if isinstance(val, RawVal): |
| sval = raw(val) |
| p += sval |
| if field_pos_list is not None: |
| field_pos_list.append( (f.name, sval.encode("string_escape"), len(p), len(sval) ) ) |
| else: |
| p = f.addfield(self, p, val) |
| return p |
| |
| def do_build_payload(self): |
| """ |
| Create the default version of the payload layer |
| |
| :return: a string of payload layer |
| """ |
| return self.payload.do_build() |
| |
| def do_build(self): |
| """ |
| Create the default version of the layer |
| |
| :return: a string of the packet with the payload |
| """ |
| if not self.explicit: |
| self = next(iter(self)) |
| pkt = self.self_build() |
| for t in self.post_transforms: |
| pkt = t(pkt) |
| pay = self.do_build_payload() |
| if self.raw_packet_cache is None: |
| return self.post_build(pkt, pay) |
| else: |
| return pkt + pay |
| |
| def build_padding(self): |
| return self.payload.build_padding() |
| |
| def build(self): |
| """ |
| Create the current layer |
| |
| :return: string of the packet with the payload |
| """ |
| p = self.do_build() |
| p += self.build_padding() |
| p = self.build_done(p) |
| return p |
| |
| def post_build(self, pkt, pay): |
| """ |
| DEV: called right after the current layer is build. |
| |
| :param str pkt: the current packet (build by self_buil function) |
| :param str pay: the packet payload (build by do_build_payload function) |
| :return: a string of the packet with the payload |
| """ |
| return pkt+pay |
| |
| def build_done(self, p): |
| return self.payload.build_done(p) |
| |
| def do_build_ps(self): |
| p = b"" |
| pl = [] |
| q = b"" |
| for f in self.fields_desc: |
| if isinstance(f, ConditionalField) and not f._evalcond(self): |
| continue |
| p = f.addfield(self, p, self.getfieldval(f.name) ) |
| if isinstance(p, bytes): |
| r = p[len(q):] |
| q = p |
| else: |
| r = b"" |
| pl.append( (f, f.i2repr(self,self.getfieldval(f.name)), r) ) |
| |
| pkt,lst = self.payload.build_ps(internal=1) |
| p += pkt |
| lst.append( (self, pl) ) |
| |
| return p,lst |
| |
| def build_ps(self,internal=0): |
| p,lst = self.do_build_ps() |
| # if not internal: |
| # pkt = self |
| # while pkt.haslayer(conf.padding_layer): |
| # pkt = pkt.getlayer(conf.padding_layer) |
| # lst.append( (pkt, [ ("loakjkjd", pkt.load, pkt.load) ] ) ) |
| # p += pkt.load |
| # pkt = pkt.payload |
| return p,lst |
| |
| |
| def psdump(self, filename=None, **kargs): |
| """ |
| psdump(filename=None, layer_shift=0, rebuild=1) |
| |
| Creates an EPS file describing a packet. If filename is not provided a |
| temporary file is created and gs is called. |
| |
| :param filename: the file's filename |
| """ |
| canvas = self.canvas_dump(**kargs) |
| if filename is None: |
| fname = get_temp_file(autoext=".eps") |
| canvas.writeEPSfile(fname) |
| with ContextManagerSubprocess("psdump()"): |
| subprocess.Popen([conf.prog.psreader, fname]) |
| else: |
| canvas.writeEPSfile(filename) |
| |
| def pdfdump(self, filename=None, **kargs): |
| """ |
| pdfdump(filename=None, layer_shift=0, rebuild=1) |
| |
| Creates a PDF file describing a packet. If filename is not provided a |
| temporary file is created and xpdf is called. |
| |
| :param filename: the file's filename |
| """ |
| canvas = self.canvas_dump(**kargs) |
| if filename is None: |
| fname = get_temp_file(autoext=".pdf") |
| canvas.writePDFfile(fname) |
| with ContextManagerSubprocess("pdfdump()"): |
| subprocess.Popen([conf.prog.pdfreader, fname]) |
| else: |
| canvas.writePDFfile(filename) |
| |
| |
| def canvas_dump(self, layer_shift=0, rebuild=1): |
| if PYX == 0: |
| raise ImportError("PyX and its depedencies must be installed") |
| canvas = pyx.canvas.canvas() |
| if rebuild: |
| p,t = self.__class__(raw(self)).build_ps() |
| else: |
| p,t = self.build_ps() |
| YTXT=len(t) |
| for n,l in t: |
| YTXT += len(l) |
| YTXT = float(YTXT) |
| YDUMP=YTXT |
| |
| XSTART = 1 |
| XDSTART = 10 |
| y = 0.0 |
| yd = 0.0 |
| xd = 0 |
| XMUL= 0.55 |
| YMUL = 0.4 |
| |
| backcolor=colgen(0.6, 0.8, 1.0, trans=pyx.color.rgb) |
| forecolor=colgen(0.2, 0.5, 0.8, trans=pyx.color.rgb) |
| # backcolor=makecol(0.376, 0.729, 0.525, 1.0) |
| |
| |
| def hexstr(x): |
| s = [] |
| for c in x: |
| s.append("%02x" % orb(c)) |
| return " ".join(s) |
| |
| |
| def make_dump_txt(x,y,txt): |
| return pyx.text.text(XDSTART+x*XMUL, (YDUMP-y)*YMUL, r"\tt{%s}"%hexstr(txt), [pyx.text.size.Large]) |
| |
| def make_box(o): |
| return pyx.box.rect(o.left(), o.bottom(), o.width(), o.height(), relcenter=(0.5,0.5)) |
| |
| def make_frame(lst): |
| if len(lst) == 1: |
| b = lst[0].bbox() |
| b.enlarge(pyx.unit.u_pt) |
| return b.path() |
| else: |
| fb = lst[0].bbox() |
| fb.enlarge(pyx.unit.u_pt) |
| lb = lst[-1].bbox() |
| lb.enlarge(pyx.unit.u_pt) |
| if len(lst) == 2 and fb.left() > lb.right(): |
| return pyx.path.path(pyx.path.moveto(fb.right(), fb.top()), |
| pyx.path.lineto(fb.left(), fb.top()), |
| pyx.path.lineto(fb.left(), fb.bottom()), |
| pyx.path.lineto(fb.right(), fb.bottom()), |
| pyx.path.moveto(lb.left(), lb.top()), |
| pyx.path.lineto(lb.right(), lb.top()), |
| pyx.path.lineto(lb.right(), lb.bottom()), |
| pyx.path.lineto(lb.left(), lb.bottom())) |
| else: |
| # XXX |
| gb = lst[1].bbox() |
| if gb != lb: |
| gb.enlarge(pyx.unit.u_pt) |
| kb = lst[-2].bbox() |
| if kb != gb and kb != lb: |
| kb.enlarge(pyx.unit.u_pt) |
| return pyx.path.path(pyx.path.moveto(fb.left(), fb.top()), |
| pyx.path.lineto(fb.right(), fb.top()), |
| pyx.path.lineto(fb.right(), kb.bottom()), |
| pyx.path.lineto(lb.right(), kb.bottom()), |
| pyx.path.lineto(lb.right(), lb.bottom()), |
| pyx.path.lineto(lb.left(), lb.bottom()), |
| pyx.path.lineto(lb.left(), gb.top()), |
| pyx.path.lineto(fb.left(), gb.top()), |
| pyx.path.closepath(),) |
| |
| |
| def make_dump(s, shift=0, y=0, col=None, bkcol=None, larg=16): |
| c = pyx.canvas.canvas() |
| tlist = [] |
| while s: |
| dmp,s = s[:larg-shift],s[larg-shift:] |
| txt = make_dump_txt(shift, y, dmp) |
| tlist.append(txt) |
| shift += len(dmp) |
| if shift >= 16: |
| shift = 0 |
| y += 1 |
| if col is None: |
| col = pyx.color.rgb.red |
| if bkcol is None: |
| col = pyx.color.rgb.white |
| c.stroke(make_frame(tlist),[col,pyx.deco.filled([bkcol]),pyx.style.linewidth.Thick]) |
| for txt in tlist: |
| c.insert(txt) |
| return c, tlist[-1].bbox(), shift, y |
| |
| |
| last_shift,last_y=0,0.0 |
| while t: |
| bkcol = next(backcolor) |
| proto,fields = t.pop() |
| y += 0.5 |
| pt = pyx.text.text(XSTART, (YTXT-y)*YMUL, r"\font\cmssfont=cmss10\cmssfont{%s}" % proto.name, [ pyx.text.size.Large]) |
| y += 1 |
| ptbb=pt.bbox() |
| ptbb.enlarge(pyx.unit.u_pt*2) |
| canvas.stroke(ptbb.path(),[pyx.color.rgb.black, pyx.deco.filled([bkcol])]) |
| canvas.insert(pt) |
| for fname, fval, fdump in fields: |
| col = next(forecolor) |
| ft = pyx.text.text(XSTART, (YTXT-y)*YMUL, r"\font\cmssfont=cmss10\cmssfont{%s}" % tex_escape(fname.name)) |
| if isinstance(fval, str): |
| if len(fval) > 18: |
| fval = fval[:18]+"[...]" |
| else: |
| fval="" |
| vt = pyx.text.text(XSTART+3, (YTXT-y)*YMUL, r"\font\cmssfont=cmss10\cmssfont{%s}" % tex_escape(fval)) |
| y += 1.0 |
| if fdump: |
| dt,target,last_shift,last_y = make_dump(fdump, last_shift, last_y, col, bkcol) |
| |
| dtb = dt.bbox() |
| dtb=target |
| vtb = vt.bbox() |
| bxvt = make_box(vtb) |
| bxdt = make_box(dtb) |
| dtb.enlarge(pyx.unit.u_pt) |
| try: |
| if yd < 0: |
| cnx = pyx.connector.curve(bxvt,bxdt,absangle1=0, absangle2=-90) |
| else: |
| cnx = pyx.connector.curve(bxvt,bxdt,absangle1=0, absangle2=90) |
| except: |
| pass |
| else: |
| canvas.stroke(cnx,[pyx.style.linewidth.thin,pyx.deco.earrow.small,col]) |
| |
| canvas.insert(dt) |
| |
| canvas.insert(ft) |
| canvas.insert(vt) |
| last_y += layer_shift |
| |
| return canvas |
| |
| |
| |
| def extract_padding(self, s): |
| """ |
| DEV: to be overloaded to extract current layer's padding. |
| |
| :param str s: the current layer |
| :return: a couple of strings (actual layer, padding) |
| """ |
| return s,None |
| |
| def post_dissect(self, s): |
| """DEV: is called right after the current layer has been dissected""" |
| return s |
| |
| def pre_dissect(self, s): |
| """DEV: is called right before the current layer is dissected""" |
| return s |
| |
| def do_dissect(self, s): |
| s = raw(s) |
| _raw = s |
| self.raw_packet_cache_fields = {} |
| for f in self.fields_desc: |
| if not s: |
| break |
| s, fval = f.getfield(self, s) |
| # We need to track fields with mutable values to discard |
| # .raw_packet_cache when needed. |
| if f.islist or f.holds_packets or f.ismutable: |
| self.raw_packet_cache_fields[f.name] = f.do_copy(fval) |
| self.fields[f.name] = fval |
| assert(_raw.endswith(raw(s))) |
| self.raw_packet_cache = _raw[:-len(s)] if s else _raw |
| self.explicit = 1 |
| return s |
| |
| def do_dissect_payload(self, s): |
| """ |
| Perform the dissection of the layer's payload |
| |
| :param str s: the raw layer |
| """ |
| if s: |
| cls = self.guess_payload_class(s) |
| try: |
| p = cls(s, _internal=1, _underlayer=self) |
| except KeyboardInterrupt: |
| raise |
| except: |
| if conf.debug_dissector: |
| if isinstance(cls,type) and issubclass(cls,Packet): |
| log_runtime.error("%s dissector failed" % cls.__name__) |
| else: |
| log_runtime.error("%s.guess_payload_class() returned [%s]" % (self.__class__.__name__,repr(cls))) |
| if cls is not None: |
| raise |
| p = conf.raw_layer(s, _internal=1, _underlayer=self) |
| self.add_payload(p) |
| |
| def dissect(self, s): |
| s = self.pre_dissect(s) |
| |
| s = self.do_dissect(s) |
| |
| s = self.post_dissect(s) |
| |
| payl,pad = self.extract_padding(s) |
| self.do_dissect_payload(payl) |
| if pad and conf.padding: |
| self.add_payload(conf.padding_layer(pad)) |
| |
| |
| def guess_payload_class(self, payload): |
| """ |
| DEV: Guesses the next payload class from layer bonds. |
| Can be overloaded to use a different mechanism. |
| |
| :param str payload: the layer's payload |
| :return: the payload class |
| """ |
| for t in self.aliastypes: |
| for fval, cls in t.payload_guess: |
| ok = 1 |
| for k, v in six.iteritems(fval): |
| if not hasattr(self, k) or v != self.getfieldval(k): |
| ok = 0 |
| break |
| if ok: |
| return cls |
| return self.default_payload_class(payload) |
| |
| def default_payload_class(self, payload): |
| """ |
| DEV: Returns the default payload class if nothing has been found by the |
| guess_payload_class() method. |
| |
| :param str payload: the layer's payload |
| :return: the default payload class define inside the configuration file |
| """ |
| return conf.raw_layer |
| |
| def hide_defaults(self): |
| """Removes fields' values that are the same as default values.""" |
| for k, v in list(self.fields.items()): # use list(): self.fields is modified in the loop |
| v = self.fields[k] |
| if k in self.default_fields: |
| if self.default_fields[k] == v: |
| del self.fields[k] |
| self.payload.hide_defaults() |
| |
| def clone_with(self, payload=None, **kargs): |
| pkt = self.__class__() |
| pkt.explicit = 1 |
| pkt.fields = kargs |
| pkt.default_fields = self.copy_fields_dict(self.default_fields) |
| pkt.overloaded_fields = self.overloaded_fields.copy() |
| pkt.time = self.time |
| pkt.underlayer = self.underlayer |
| pkt.post_transforms = self.post_transforms |
| pkt.raw_packet_cache = self.raw_packet_cache |
| pkt.raw_packet_cache_fields = self.copy_fields_dict( |
| self.raw_packet_cache_fields |
| ) |
| if payload is not None: |
| pkt.add_payload(payload) |
| return pkt |
| |
| def __iter__(self): |
| def loop(todo, done, self=self): |
| if todo: |
| eltname = todo.pop() |
| elt = self.getfieldval(eltname) |
| if not isinstance(elt, Gen): |
| if self.get_field(eltname).islist: |
| elt = SetGen([elt]) |
| else: |
| elt = SetGen(elt) |
| for e in elt: |
| done[eltname]=e |
| for x in loop(todo[:], done): |
| yield x |
| else: |
| if isinstance(self.payload,NoPayload): |
| payloads = [None] |
| else: |
| payloads = self.payload |
| for payl in payloads: |
| done2=done.copy() |
| for k in done2: |
| if isinstance(done2[k], VolatileValue): |
| done2[k] = done2[k]._fix() |
| pkt = self.clone_with(payload=payl, **done2) |
| yield pkt |
| |
| if self.explicit or self.raw_packet_cache is not None: |
| todo = [] |
| done = self.fields |
| else: |
| todo = [k for (k,v) in itertools.chain(six.iteritems(self.default_fields), |
| six.iteritems(self.overloaded_fields)) |
| if isinstance(v, VolatileValue)] + list(self.fields.keys()) |
| done = {} |
| return loop(todo, done) |
| |
| def __gt__(self, other): |
| """True if other is an answer from self (self ==> other).""" |
| if isinstance(other, Packet): |
| return other < self |
| elif isinstance(other, bytes): |
| return 1 |
| else: |
| raise TypeError((self, other)) |
| def __lt__(self, other): |
| """True if self is an answer from other (other ==> self).""" |
| if isinstance(other, Packet): |
| return self.answers(other) |
| elif isinstance(other, bytes): |
| return 1 |
| else: |
| raise TypeError((self, other)) |
| |
| def __eq__(self, other): |
| if not isinstance(other, self.__class__): |
| return False |
| for f in self.fields_desc: |
| if f not in other.fields_desc: |
| return False |
| if self.getfieldval(f.name) != other.getfieldval(f.name): |
| return False |
| return self.payload == other.payload |
| |
| def __ne__(self, other): |
| return not self.__eq__(other) |
| |
| def hashret(self): |
| """DEV: returns a string that has the same value for a request and its answer.""" |
| return self.payload.hashret() |
| def answers(self, other): |
| """DEV: true if self is an answer from other""" |
| if other.__class__ == self.__class__: |
| return self.payload.answers(other.payload) |
| return 0 |
| |
| def haslayer(self, cls): |
| """true if self has a layer that is an instance of cls. Superseded by "cls in self" syntax.""" |
| if self.__class__ == cls or self.__class__.__name__ == cls: |
| return 1 |
| for f in self.packetfields: |
| fvalue_gen = self.getfieldval(f.name) |
| if fvalue_gen is None: |
| continue |
| if not f.islist: |
| fvalue_gen = SetGen(fvalue_gen,_iterpacket=0) |
| for fvalue in fvalue_gen: |
| if isinstance(fvalue, Packet): |
| ret = fvalue.haslayer(cls) |
| if ret: |
| return ret |
| return self.payload.haslayer(cls) |
| |
| def getlayer(self, cls, nb=1, _track=None, _subclass=False, **flt): |
| """Return the nb^th layer that is an instance of cls, matching flt |
| values. |
| |
| """ |
| if _subclass: |
| match = lambda cls1, cls2: issubclass(cls1, cls2) |
| else: |
| match = lambda cls1, cls2: cls1 == cls2 |
| if isinstance(cls, int): |
| nb = cls+1 |
| cls = None |
| if isinstance(cls, str) and "." in cls: |
| ccls,fld = cls.split(".",1) |
| else: |
| ccls,fld = cls,None |
| if cls is None or match(self.__class__, cls) or self.__class__.__name__ == ccls: |
| if all(self.getfieldval(fldname) == fldvalue |
| for fldname, fldvalue in six.iteritems(flt)): |
| if nb == 1: |
| if fld is None: |
| return self |
| else: |
| return self.getfieldval(fld) |
| else: |
| nb -=1 |
| for f in self.packetfields: |
| fvalue_gen = self.getfieldval(f.name) |
| if fvalue_gen is None: |
| continue |
| if not f.islist: |
| fvalue_gen = SetGen(fvalue_gen,_iterpacket=0) |
| for fvalue in fvalue_gen: |
| if isinstance(fvalue, Packet): |
| track=[] |
| ret = fvalue.getlayer(cls, nb=nb, _track=track, |
| _subclass=_subclass) |
| if ret is not None: |
| return ret |
| nb = track[0] |
| return self.payload.getlayer(cls, nb=nb, _track=_track, |
| _subclass=_subclass, **flt) |
| |
| def firstlayer(self): |
| q = self |
| while q.underlayer is not None: |
| q = q.underlayer |
| return q |
| |
| def __getitem__(self, cls): |
| if isinstance(cls, slice): |
| lname = cls.start |
| if cls.stop: |
| ret = self.getlayer(cls.start, nb=cls.stop, **(cls.step or {})) |
| else: |
| ret = self.getlayer(cls.start, **(cls.step or {})) |
| else: |
| lname = cls |
| ret = self.getlayer(cls) |
| if ret is None: |
| if isinstance(lname, Packet_metaclass): |
| lname = lname.__name__ |
| elif not isinstance(lname, bytes): |
| lname = repr(lname) |
| raise IndexError("Layer [%s] not found" % lname) |
| return ret |
| |
| def __delitem__(self, cls): |
| del(self[cls].underlayer.payload) |
| |
| def __setitem__(self, cls, val): |
| self[cls].underlayer.payload = val |
| |
| def __contains__(self, cls): |
| """"cls in self" returns true if self has a layer which is an instance of cls.""" |
| return self.haslayer(cls) |
| |
| def route(self): |
| return (None,None,None) |
| |
| def fragment(self, *args, **kargs): |
| return self.payload.fragment(*args, **kargs) |
| |
| |
| def display(self,*args,**kargs): # Deprecated. Use show() |
| """Deprecated. Use show() method.""" |
| self.show(*args,**kargs) |
| |
| def _show_or_dump(self, dump=False, indent=3, lvl="", label_lvl="", first_call=True): |
| """ |
| Internal method that shows or dumps a hierarchical view of a packet. |
| Called by show. |
| |
| :param dump: determine if it prints or returns the string value |
| :param int indent: the size of indentation for each layer |
| :param str lvl: additional information about the layer lvl |
| :param str label_lvl: additional information about the layer fields |
| :param first_call: determine if the current function is the first |
| :return: return a hierarchical view if dump, else print it |
| """ |
| |
| if dump: |
| from scapy.themes import AnsiColorTheme |
| ct = AnsiColorTheme() # No color for dump output |
| else: |
| ct = conf.color_theme |
| s = "%s%s %s %s \n" % (label_lvl, |
| ct.punct("###["), |
| ct.layer_name(self.name), |
| ct.punct("]###")) |
| for f in self.fields_desc: |
| if isinstance(f, ConditionalField) and not f._evalcond(self): |
| continue |
| if isinstance(f, Emph) or f in conf.emph: |
| ncol = ct.emph_field_name |
| vcol = ct.emph_field_value |
| else: |
| ncol = ct.field_name |
| vcol = ct.field_value |
| fvalue = self.getfieldval(f.name) |
| if isinstance(fvalue, Packet) or (f.islist and f.holds_packets and isinstance(fvalue, list)): |
| s += "%s \\%-10s\\\n" % (label_lvl+lvl, ncol(f.name)) |
| fvalue_gen = SetGen(fvalue,_iterpacket=0) |
| for fvalue in fvalue_gen: |
| s += fvalue._show_or_dump(dump=dump, indent=indent, label_lvl=label_lvl+lvl+" |", first_call=False) |
| else: |
| begn = "%s %-10s%s " % (label_lvl+lvl, |
| ncol(f.name), |
| ct.punct("="),) |
| reprval = f.i2repr(self,fvalue) |
| if isinstance(reprval, str): |
| reprval = reprval.replace("\n", "\n"+" "*(len(label_lvl) |
| +len(lvl) |
| +len(f.name) |
| +4)) |
| s += "%s%s\n" % (begn,vcol(reprval)) |
| if self.payload: |
| s += self.payload._show_or_dump(dump=dump, indent=indent, lvl=lvl+(" "*indent*self.show_indent), label_lvl=label_lvl, first_call=False) |
| |
| if first_call and not dump: |
| print(s) |
| else: |
| return s |
| |
| def show(self, dump=False, indent=3, lvl="", label_lvl=""): |
| """ |
| Prints or returns (when "dump" is true) a hierarchical view of the |
| packet. |
| |
| :param dump: determine if it prints or returns the string value |
| :param int indent: the size of indentation for each layer |
| :param str lvl: additional information about the layer lvl |
| :param str label_lvl: additional information about the layer fields |
| :return: return a hierarchical view if dump, else print it |
| """ |
| return self._show_or_dump(dump, indent, lvl, label_lvl) |
| |
| def show2(self, dump=False, indent=3, lvl="", label_lvl=""): |
| """ |
| Prints or returns (when "dump" is true) a hierarchical view of an |
| assembled version of the packet, so that automatic fields are |
| calculated (checksums, etc.) |
| |
| :param dump: determine if it prints or returns the string value |
| :param int indent: the size of indentation for each layer |
| :param str lvl: additional information about the layer lvl |
| :param str label_lvl: additional information about the layer fields |
| :return: return a hierarchical view if dump, else print it |
| """ |
| return self.__class__(raw(self)).show(dump, indent, lvl, label_lvl) |
| |
| def sprintf(self, fmt, relax=1): |
| """sprintf(format, [relax=1]) -> str |
| where format is a string that can include directives. A directive begins and |
| ends by % and has the following format %[fmt[r],][cls[:nb].]field%. |
| |
| fmt is a classic printf directive, "r" can be appended for raw substitution |
| (ex: IP.flags=0x18 instead of SA), nb is the number of the layer we want |
| (ex: for IP/IP packets, IP:2.src is the src of the upper IP layer). |
| Special case : "%.time%" is the creation time. |
| Ex : p.sprintf("%.time% %-15s,IP.src% -> %-15s,IP.dst% %IP.chksum% " |
| "%03xr,IP.proto% %r,TCP.flags%") |
| |
| Moreover, the format string can include conditional statements. A conditional |
| statement looks like : {layer:string} where layer is a layer name, and string |
| is the string to insert in place of the condition if it is true, i.e. if layer |
| is present. If layer is preceded by a "!", the result is inverted. Conditions |
| can be imbricated. A valid statement can be : |
| p.sprintf("This is a{TCP: TCP}{UDP: UDP}{ICMP:n ICMP} packet") |
| p.sprintf("{IP:%IP.dst% {ICMP:%ICMP.type%}{TCP:%TCP.dport%}}") |
| |
| A side effect is that, to obtain "{" and "}" characters, you must use |
| "%(" and "%)". |
| """ |
| |
| escape = { "%": "%", |
| "(": "{", |
| ")": "}" } |
| |
| |
| # Evaluate conditions |
| while "{" in fmt: |
| i = fmt.rindex("{") |
| j = fmt[i+1:].index("}") |
| cond = fmt[i+1:i+j+1] |
| k = cond.find(":") |
| if k < 0: |
| raise Scapy_Exception("Bad condition in format string: [%s] (read sprintf doc!)"%cond) |
| cond,format = cond[:k],cond[k+1:] |
| res = False |
| if cond[0] == "!": |
| res = True |
| cond = cond[1:] |
| if self.haslayer(cond): |
| res = not res |
| if not res: |
| format = "" |
| fmt = fmt[:i]+format+fmt[i+j+2:] |
| |
| # Evaluate directives |
| s = "" |
| while "%" in fmt: |
| i = fmt.index("%") |
| s += fmt[:i] |
| fmt = fmt[i+1:] |
| if fmt and fmt[0] in escape: |
| s += escape[fmt[0]] |
| fmt = fmt[1:] |
| continue |
| try: |
| i = fmt.index("%") |
| sfclsfld = fmt[:i] |
| fclsfld = sfclsfld.split(",") |
| if len(fclsfld) == 1: |
| f = "s" |
| clsfld = fclsfld[0] |
| elif len(fclsfld) == 2: |
| f,clsfld = fclsfld |
| else: |
| raise Scapy_Exception |
| if "." in clsfld: |
| cls,fld = clsfld.split(".") |
| else: |
| cls = self.__class__.__name__ |
| fld = clsfld |
| num = 1 |
| if ":" in cls: |
| cls,num = cls.split(":") |
| num = int(num) |
| fmt = fmt[i+1:] |
| except: |
| raise Scapy_Exception("Bad format string [%%%s%s]" % (fmt[:25], fmt[25:] and "...")) |
| else: |
| if fld == "time": |
| val = time.strftime("%H:%M:%S.%%06i", time.localtime(self.time)) % int((self.time-int(self.time))*1000000) |
| elif cls == self.__class__.__name__ and hasattr(self, fld): |
| if num > 1: |
| val = self.payload.sprintf("%%%s,%s:%s.%s%%" % (f,cls,num-1,fld), relax) |
| f = "s" |
| elif f[-1] == "r": # Raw field value |
| val = getattr(self,fld) |
| f = f[:-1] |
| if not f: |
| f = "s" |
| else: |
| val = getattr(self,fld) |
| if fld in self.fieldtype: |
| val = self.fieldtype[fld].i2repr(self,val) |
| else: |
| val = self.payload.sprintf("%%%s%%" % sfclsfld, relax) |
| f = "s" |
| s += ("%"+f) % val |
| |
| s += fmt |
| return s |
| |
| def mysummary(self): |
| """DEV: can be overloaded to return a string that summarizes the layer. |
| Only one mysummary() is used in a whole packet summary: the one of the upper layer, |
| except if a mysummary() also returns (as a couple) a list of layers whose |
| mysummary() must be called if they are present.""" |
| return "" |
| |
| def _do_summary(self): |
| found, s, needed = self.payload._do_summary() |
| ret = "" |
| if not found or self.__class__ in needed: |
| ret = self.mysummary() |
| if isinstance(ret, tuple): |
| ret,n = ret |
| needed += n |
| if ret or needed: |
| found = 1 |
| if not ret: |
| ret = self.__class__.__name__ if self.show_summary else "" |
| if self.__class__ in conf.emph: |
| impf = [] |
| for f in self.fields_desc: |
| if f in conf.emph: |
| impf.append("%s=%s" % (f.name, f.i2repr(self, self.getfieldval(f.name)))) |
| ret = "%s [%s]" % (ret," ".join(impf)) |
| if ret and s: |
| ret = "%s / %s" % (ret, s) |
| else: |
| ret = "%s%s" % (ret,s) |
| return found,ret,needed |
| |
| def summary(self, intern=0): |
| """Prints a one line summary of a packet.""" |
| found,s,needed = self._do_summary() |
| return s |
| |
| |
| def lastlayer(self,layer=None): |
| """Returns the uppest layer of the packet""" |
| return self.payload.lastlayer(self) |
| |
| def decode_payload_as(self,cls): |
| """Reassembles the payload and decode it using another packet class""" |
| s = raw(self.payload) |
| self.payload = cls(s, _internal=1, _underlayer=self) |
| pp = self |
| while pp.underlayer is not None: |
| pp = pp.underlayer |
| self.payload.dissection_done(pp) |
| |
| def command(self): |
| """Returns a string representing the command you have to type to obtain the same packet""" |
| f = [] |
| for fn,fv in self.fields.items(): |
| fld = self.get_field(fn) |
| if isinstance(fv, Packet): |
| fv = fv.command() |
| elif fld.islist and fld.holds_packets and isinstance(fv, list): |
| fv = "[%s]" % ",".join( map(Packet.command, fv)) |
| elif isinstance(fld, FlagsField): |
| fv = int(fv) |
| else: |
| fv = repr(fv) |
| f.append("%s=%s" % (fn, fv)) |
| c = "%s(%s)" % (self.__class__.__name__, ", ".join(f)) |
| pc = self.payload.command() |
| if pc: |
| c += "/"+pc |
| return c |
| |
| class NoPayload(Packet): |
| def __new__(cls, *args, **kargs): |
| singl = cls.__dict__.get("__singl__") |
| if singl is None: |
| cls.__singl__ = singl = Packet.__new__(cls) |
| Packet.__init__(singl) |
| return singl |
| def __init__(self, *args, **kargs): |
| pass |
| def dissection_done(self,pkt): |
| return |
| def add_payload(self, payload): |
| raise Scapy_Exception("Can't add payload to NoPayload instance") |
| def remove_payload(self): |
| pass |
| def add_underlayer(self,underlayer): |
| pass |
| def remove_underlayer(self,other): |
| pass |
| def copy(self): |
| return self |
| def __repr__(self): |
| return "" |
| def __str__(self): |
| return "" |
| def __bytes__(self): |
| return b"" |
| def __nonzero__(self): |
| return False |
| __bool__ = __nonzero__ |
| def do_build(self): |
| return b"" |
| def build(self): |
| return b"" |
| def build_padding(self): |
| return b"" |
| def build_done(self, p): |
| return p |
| def build_ps(self, internal=0): |
| return b"",[] |
| def getfieldval(self, attr): |
| raise AttributeError(attr) |
| def getfield_and_val(self, attr): |
| raise AttributeError(attr) |
| def setfieldval(self, attr, val): |
| raise AttributeError(attr) |
| def delfieldval(self, attr): |
| raise AttributeError(attr) |
| def hide_defaults(self): |
| pass |
| def __iter__(self): |
| return iter([]) |
| def __eq__(self, other): |
| if isinstance(other, NoPayload): |
| return True |
| return False |
| def hashret(self): |
| return b"" |
| def answers(self, other): |
| return isinstance(other, NoPayload) or isinstance(other, conf.padding_layer) |
| def haslayer(self, cls): |
| return 0 |
| def getlayer(self, cls, nb=1, _track=None, **flt): |
| if _track is not None: |
| _track.append(nb) |
| return None |
| def fragment(self, *args, **kargs): |
| raise Scapy_Exception("cannot fragment this packet") |
| def show(self, indent=3, lvl="", label_lvl=""): |
| pass |
| def sprintf(self, fmt, relax): |
| if relax: |
| return "??" |
| else: |
| raise Scapy_Exception("Format not found [%s]"%fmt) |
| def _do_summary(self): |
| return 0,"",[] |
| def lastlayer(self,layer): |
| return layer |
| def command(self): |
| return "" |
| |
| #################### |
| ## packet classes ## |
| #################### |
| |
| |
| class Raw(Packet): |
| name = "Raw" |
| fields_desc = [ StrField("load", "") ] |
| def answers(self, other): |
| return 1 |
| # s = raw(other) |
| # t = self.load |
| # l = min(len(s), len(t)) |
| # return s[:l] == t[:l] |
| def mysummary(self): |
| cs = conf.raw_summary |
| if cs: |
| if callable(cs): |
| return "Raw %s" % cs(self.load) |
| else: |
| return "Raw %r" % self.load |
| return Packet.mysummary(self) |
| |
| class Padding(Raw): |
| name = "Padding" |
| def self_build(self): |
| return b"" |
| def build_padding(self): |
| return (raw(self.load) if self.raw_packet_cache is None |
| else self.raw_packet_cache) + self.payload.build_padding() |
| |
| conf.raw_layer = Raw |
| conf.padding_layer = Padding |
| if conf.default_l2 is None: |
| conf.default_l2 = Raw |
| |
| ################# |
| ## Bind layers ## |
| ################# |
| |
| |
| def bind_bottom_up(lower, upper, __fval=None, **fval): |
| if __fval is not None: |
| fval.update(__fval) |
| lower.payload_guess = lower.payload_guess[:] |
| lower.payload_guess.append((fval, upper)) |
| |
| |
| def bind_top_down(lower, upper, __fval=None, **fval): |
| if __fval is not None: |
| fval.update(__fval) |
| upper._overload_fields = upper._overload_fields.copy() |
| upper._overload_fields[lower] = fval |
| |
| @conf.commands.register |
| def bind_layers(lower, upper, __fval=None, **fval): |
| """Bind 2 layers on some specific fields' values""" |
| if __fval is not None: |
| fval.update(__fval) |
| bind_top_down(lower, upper, **fval) |
| bind_bottom_up(lower, upper, **fval) |
| |
| def split_bottom_up(lower, upper, __fval=None, **fval): |
| if __fval is not None: |
| fval.update(__fval) |
| def do_filter(xxx_todo_changeme,upper=upper,fval=fval): |
| (f,u) = xxx_todo_changeme |
| if u != upper: |
| return True |
| for k in fval: |
| if k not in f or f[k] != fval[k]: |
| return True |
| return False |
| lower.payload_guess = [x for x in lower.payload_guess if do_filter(x)] |
| |
| def split_top_down(lower, upper, __fval=None, **fval): |
| if __fval is not None: |
| fval.update(__fval) |
| if lower in upper._overload_fields: |
| ofval = upper._overload_fields[lower] |
| for k in fval: |
| if k not in ofval or ofval[k] != fval[k]: |
| return |
| upper._overload_fields = upper._overload_fields.copy() |
| del(upper._overload_fields[lower]) |
| |
| @conf.commands.register |
| def split_layers(lower, upper, __fval=None, **fval): |
| """Split 2 layers previously bound""" |
| if __fval is not None: |
| fval.update(__fval) |
| split_bottom_up(lower, upper, **fval) |
| split_top_down(lower, upper, **fval) |
| |
| |
| @conf.commands.register |
| def ls(obj=None, case_sensitive=False, verbose=False): |
| """List available layers, or infos on a given layer class or name""" |
| is_string = isinstance(obj, six.string_types) |
| |
| if obj is None or is_string: |
| if obj is None: |
| all_layers = sorted(conf.layers, key=lambda x: x.__name__) |
| else: |
| pattern = re.compile(obj, 0 if case_sensitive else re.I) |
| all_layers = sorted((layer for layer in conf.layers |
| if (pattern.search(layer.__name__ or '') |
| or pattern.search(layer.name or ''))), |
| key=lambda x: x.__name__) |
| for layer in all_layers: |
| print("%-10s : %s" % (layer.__name__, layer._name)) |
| |
| else: |
| is_pkt = isinstance(obj, Packet) |
| if (isinstance(obj, type) and issubclass(obj, Packet)) or is_pkt: |
| for f in obj.fields_desc: |
| cur_fld = f |
| attrs = [] |
| long_attrs = [] |
| while isinstance(cur_fld, (Emph, ConditionalField)): |
| if isinstance(cur_fld, ConditionalField): |
| attrs.append(cur_fld.__class__.__name__[:4]) |
| cur_fld = cur_fld.fld |
| if verbose and isinstance(cur_fld, EnumField) \ |
| and hasattr(cur_fld, "i2s"): |
| if len(cur_fld.i2s) < 50: |
| long_attrs.extend( |
| "%s: %d" % (strval, numval) |
| for numval, strval in |
| sorted(six.iteritems(cur_fld.i2s)) |
| ) |
| elif isinstance(cur_fld, MultiEnumField): |
| fld_depend = cur_fld.depends_on(obj.__class__ |
| if is_pkt else obj) |
| attrs.append("Depends on %s" % fld_depend.name) |
| if verbose: |
| cur_i2s = cur_fld.i2s_multi.get( |
| cur_fld.depends_on(obj if is_pkt else obj()), {} |
| ) |
| if len(cur_i2s) < 50: |
| long_attrs.extend( |
| "%s: %d" % (strval, numval) |
| for numval, strval in |
| sorted(six.iteritems(cur_i2s)) |
| ) |
| elif verbose and isinstance(cur_fld, FlagsField): |
| names = cur_fld.names |
| long_attrs.append(", ".join(names)) |
| class_name = "%s (%s)" % ( |
| cur_fld.__class__.__name__, |
| ", ".join(attrs)) if attrs else cur_fld.__class__.__name__ |
| if isinstance(cur_fld, BitField): |
| class_name += " (%d bit%s)" % (cur_fld.size, |
| "s" if cur_fld.size > 1 |
| else "") |
| print("%-10s : %-35s =" % (f.name, class_name), end=' ') |
| if is_pkt: |
| print("%-15r" % (getattr(obj, f.name),), end=' ') |
| print("(%r)" % (f.default,)) |
| for attr in long_attrs: |
| print("%-15s%s" % ("", attr)) |
| if is_pkt and not isinstance(obj.payload, NoPayload): |
| print("--") |
| ls(obj.payload) |
| |
| else: |
| print("Not a packet class or name. Type 'ls()' to list packet classes.") |
| |
| |
| |
| ############# |
| ## Fuzzing ## |
| ############# |
| |
| @conf.commands.register |
| def fuzz(p, _inplace=0): |
| """Transform a layer into a fuzzy layer by replacing some default values by random objects""" |
| if not _inplace: |
| p = p.copy() |
| q = p |
| while not isinstance(q, NoPayload): |
| for f in q.fields_desc: |
| if isinstance(f, PacketListField): |
| for r in getattr(q, f.name): |
| print("fuzzing", repr(r)) |
| fuzz(r, _inplace=1) |
| elif f.default is not None: |
| if not isinstance(f, ConditionalField) or f._evalcond(q): |
| rnd = f.randval() |
| if rnd is not None: |
| q.default_fields[f.name] = rnd |
| q = q.payload |
| return p |
| |
| |
| |