| ## This file is part of Scapy |
| ## See http://www.secdev.org/projects/scapy for more information |
| ## Copyright (C) Philippe Biondi <phil@secdev.org> |
| ## This program is published under a GPLv2 license |
| |
| """ |
| General utility functions. |
| """ |
| |
| from __future__ import absolute_import |
| from __future__ import print_function |
| import os, sys, socket, types |
| import random, time |
| import gzip, zlib |
| import re, struct, array |
| import subprocess |
| import tempfile |
| |
| import warnings |
| import scapy.modules.six as six |
| from scapy.modules.six.moves import range |
| warnings.filterwarnings("ignore","tempnam",RuntimeWarning, __name__) |
| |
| from scapy.config import conf |
| from scapy.consts import DARWIN, WINDOWS |
| from scapy.data import MTU |
| from scapy.compat import * |
| from scapy.error import log_runtime, log_loading, log_interactive, Scapy_Exception, warning |
| from scapy.base_classes import BasePacketList |
| |
| ########### |
| ## Tools ## |
| ########### |
| |
def get_temp_file(keep=False, autoext=""):
    """Create a temporary file and return its name. When keep is False,
    the file is deleted when scapy exits.

    :param keep: when False, the name is appended to conf.temp_files
    :param autoext: suffix (for instance ".png") given to the file name
    :returns: the path of the newly created, empty file
    """
    # The file has to outlive this call (delete=False), but only its name is
    # handed back: close the handle right away instead of leaking it.
    with tempfile.NamedTemporaryFile(prefix="scapy", suffix=autoext,
                                     delete=False) as fdesc:
        fname = fdesc.name
    if not keep:
        conf.temp_files.append(fname)
    return fname
| |
def sane_color(x):
    """Return a printable rendering of x, one output item per input item.

    Items whose orb() value lies outside 32..126 are replaced by a "."
    passed through conf.color_theme.not_printable(); the others are kept
    as the corresponding character.
    """
    rendered = []
    for item in x:
        code = orb(item)
        if 32 <= code < 127:
            rendered.append(chr(code))
        else:
            rendered.append(conf.color_theme.not_printable("."))
    return "".join(rendered)
| |
def sane(x):
    """Return x with every item whose orb() value lies outside 32..126
    replaced by a dot; the other items are kept as characters."""
    return "".join(
        chr(code) if 32 <= code < 127 else "."
        for code in (orb(item) for item in x)
    )
| |
@conf.commands.register
def restart():
    """Restarts scapy

    Raises OSError unless conf.interactive is set and sys.argv[0] is an
    existing file. On Windows the interpreter is run again as a child
    process and the current process exits with its return code; elsewhere
    the current process is replaced through os.execv().
    """
    started_from_console = conf.interactive and os.path.isfile(sys.argv[0])
    if not started_from_console:
        raise OSError("Scapy was not started from console")
    command = [sys.executable] + sys.argv
    if WINDOWS:
        os._exit(subprocess.call(command))
    os.execv(sys.executable, command)
| |
def lhex(x):
    """Render integers in hexadecimal, recursing into tuples and lists.

    Values whose exact type is one of six.integer_types become hex()
    strings; tuples and lists are rendered as "(...)" and "[...]" with each
    element converted the same way; anything else is returned unchanged.
    """
    if type(x) in six.integer_types:
        return hex(x)
    if isinstance(x, tuple):
        return "(%s)" % ", ".join([lhex(item) for item in x])
    if isinstance(x, list):
        return "[%s]" % ", ".join([lhex(item) for item in x])
    return x
| |
@conf.commands.register
def hexdump(x, dump=False):
    """ Build a tcpdump like hexadecimal view

    Each row covers 16 bytes: the offset, the hexadecimal value of every
    byte (missing bytes of the last row are replaced by a filler) and the
    sane_color() rendering of the row.

    :param x: a Packet
    :param dump: define if the result must be printed or returned in a variable
    :returns: a String only when dump=True
    """
    data = raw(x)
    rows = []
    for offset in range(0, len(data), 16):
        chunk = data[offset:offset + 16]
        cells = ["%02X" % orb(chunk[pos]) if pos < len(chunk) else " "
                 for pos in range(16)]
        rows.append("%04x " % offset + "".join(cells) + " " + sane_color(chunk))
    # rows are separated by, not terminated with, a newline
    view = "\n".join(rows)
    if dump:
        return view
    print(view)
| |
| |
@conf.commands.register
def linehexdump(x, onlyasc=0, onlyhex=0, dump=False):
    """ Build an equivalent view of hexdump() on a single line

    Note that setting both onlyasc and onlyhex to 1 results in a empty output

    :param x: a Packet
    :param onlyasc: 1 to display only the ascii view
    :param onlyhex: 1 to display only the hexadecimal view
    :param dump: print the view if False
    :returns: a String only when dump=True
    """
    data = raw(x)
    pieces = []
    if not onlyasc:
        pieces.extend(["%02X" % orb(item) for item in data])
        if not onlyhex:
            # both views requested: keep them apart
            pieces.append(" ")
    if not onlyhex:
        pieces.append(sane_color(data))
    view = "".join(pieces)
    if dump:
        return view
    print(view)
| |
@conf.commands.register
def chexdump(x, dump=False):
    """ Build a per byte hexadecimal representation

    Example:
        >>> chexdump(IP())
        0x45, 0x00, 0x00, 0x14, 0x00, 0x01, 0x00, 0x00, 0x40, 0x00, 0x7c, 0xe7, 0x7f, 0x00, 0x00, 0x01, 0x7f, 0x00, 0x00, 0x01

    :param x: a Packet
    :param dump: print the view if False
    :returns: a String only if dump=True
    """
    data = raw(x)
    literals = ["%#04x" % orb(item) for item in data]
    view = ", ".join(literals)
    if dump:
        return view
    print(view)
| |
@conf.commands.register
def hexstr(x, onlyasc=0, onlyhex=0):
    """Return the hexadecimal view of x (two lowercase digits per item),
    followed by its sane() rendering.

    :param x: the items to render
    :param onlyasc: when true, leave the hexadecimal view out
    :param onlyhex: when true, leave the sane() view out
    """
    views = []
    if not onlyasc:
        hex_view = " ".join(["%02x" % orb(item) for item in x])
        views.append(hex_view)
    if not onlyhex:
        views.append(sane(x))
    return " ".join(views)
| |
def repr_hex(s):
    """ Convert provided bitstring to a simple string of hex digits
    (two lowercase digits per item, no separator) """
    digits = ["%02x" % orb(item) for item in s]
    return "".join(digits)
| |
@conf.commands.register
def hexdiff(x,y):
    """Show differences between 2 binary strings

    Both arguments go through raw(). The comparison is printed row by row
    (offset columns, hexadecimal cells, then a printable rendering);
    nothing is returned.
    """
    # The alignment below is computed on the reversed byte strings
    x=raw(x)[::-1]
    y=raw(y)[::-1]
    # Unit costs added when filling the table
    SUBST=1
    INSERT=1
    # d[i,j] = (accumulated cost, predecessor cell); index -1 is the
    # boundary row/column that precedes the first item.
    d = {(-1, -1): (0, (-1, -1))}
    for j in range(len(y)):
        d[-1,j] = d[-1,j-1][0]+INSERT, (-1,j-1)
    for i in range(len(x)):
        d[i,-1] = d[i-1,-1][0]+INSERT, (i-1,-1)

    # Each cell keeps the cheapest of: diagonal step (costs SUBST only when
    # the two items differ), step along x, step along y.
    for j in range(len(y)):
        for i in range(len(x)):
            d[i,j] = min( ( d[i-1,j-1][0]+SUBST*(x[i] != y[j]), (i-1,j-1) ),
                          ( d[i-1,j][0]+INSERT, (i-1,j) ),
                          ( d[i,j-1][0]+INSERT, (i,j-1) ) )


    # Follow the predecessor links from the last cell back to (-1, -1),
    # recording for every step the slice of x and of y it consumed (a slice
    # is empty when the step did not advance in that string).
    backtrackx = []
    backtracky = []
    i=len(x)-1
    j=len(y)-1
    while not (i == j == -1):
        i2,j2 = d[i,j][1]
        backtrackx.append(x[i2+1:i+1])
        backtracky.append(y[j2+1:j+1])
        i,j = i2,j2



    # From here on x and y are reused as running counters and i as the
    # index of the first step of the row being printed.
    x = y = i = 0
    # Indexed by doy-dox (or 0 for identical cells)
    colorize = { 0: lambda x:x,
                 -1: conf.color_theme.left,
                 1: conf.color_theme.right }

    # dox / doy select which side(s) the row being printed stands for
    dox=1
    doy=0
    l = len(backtrackx)
    while i < l:
        separate=0  # never read afterwards
        linex = backtrackx[i:i+16]
        liney = backtracky[i:i+16]
        # number of bytes each side contributes to this row
        xx = sum(len(k) for k in linex)
        yy = sum(len(k) for k in liney)
        if dox and not xx:
            dox = 0
            doy = 1
        if dox and linex == liney:
            doy=1

        if dox:
            # NOTE(review): the x offset column is seeded from the y counter,
            # not from x - confirm this is intended.
            xd = y
            j = 0
            while not linex[j]:
                j += 1
                xd -= 1
            print(colorize[doy-dox]("%04x" % xd), end=' ')
            x += xx
            line=linex
        else:
            print(" ", end=' ')
        if doy:
            yd = y
            j = 0
            while not liney[j]:
                j += 1
                yd -= 1
            print(colorize[doy-dox]("%04x" % yd), end=' ')
            y += yy
            line=liney
        else:
            print(" ", end=' ')

        print(" ", end=' ')

        # cl accumulates the printable rendering shown at the end of the row
        cl = ""
        for j in range(16):
            if i+j < l:
                if line[j]:
                    # differing cells get the colour of the side being shown
                    col = colorize[(linex[j]!=liney[j])*(doy-dox)]
                    print(col("%02X" % orb(line[j])), end=' ')
                    if linex[j]==liney[j]:
                        cl += sane_color(line[j])
                    else:
                        cl += col(sane(line[j]))
                else:
                    print(" ", end=' ')
                    cl += " "
            else:
                print(" ", end=' ')
            if j == 7:
                print("", end=' ')


        print(" ",cl)

        # Move on to the next 16 steps, or print the same steps again for
        # the y side first.
        if doy or not yy:
            doy=0
            dox=1
            i += 16
        else:
            if yy:
                dox=0
                doy=1
            else:
                i += 16
| |
def _native_folded_complement(pkt):
    """Sum pkt as native-endian 16-bit words (an odd length is padded with
    one NUL byte), fold the carries back in and return the 16-bit
    complement of the result."""
    if len(pkt) % 2:
        pkt += b"\0"
    total = sum(array.array("H", pkt))
    total = (total & 0xffff) + (total >> 16)
    total += total >> 16
    return ~total & 0xffff


if struct.pack("H",1) == b"\x00\x01": # big endian
    def checksum(pkt):
        """Return the 16-bit checksum of pkt (native words are already in
        big endian order on this host)."""
        return _native_folded_complement(pkt)
else:
    def checksum(pkt):
        """Return the 16-bit checksum of pkt; the words were summed in
        little endian order, so the two result bytes are swapped."""
        folded = _native_folded_complement(pkt)
        return ((folded >> 8) | (folded << 8)) & 0xffff
| |
| |
def _fletcher16(charbuf):
    """Return the pair (c0, c1): c0 is the sum of the orb() values of
    charbuf and c1 the sum of the successive values of c0, both modulo
    255."""
    # This is based on the GPLed C implementation in Zebra <http://www.zebra.org/>
    low = high = 0
    for value in (orb(item) for item in charbuf):
        low += value
        high += low
    return (low % 255, high % 255)
| |
@conf.commands.register
def fletcher16_checksum(binbuf):
    """ Calculates Fletcher-16 checksum of the given buffer.

        Note:
        If the buffer contains the two checkbytes derived from the Fletcher-16 checksum
        the result of this function has to be 0. Otherwise the buffer has been corrupted.

        The value returned carries c1 in its high byte and c0 in its low byte.
    """
    low, high = _fletcher16(binbuf)
    return low | (high << 8)
| |
| |
@conf.commands.register
def fletcher16_checkbytes(binbuf, offset):
    """ Calculates the Fletcher-16 checkbytes returned as 2 byte binary-string.

        Including the bytes into the buffer (at the position marked by offset) the
        global Fletcher-16 checksum of the buffer will be 0. Thus it is easy to verify
        the integrity of the buffer on the receiver side.

        For details on the algorithm, see RFC 2328 chapter 12.1.7 and RFC 905 Annex B.

        Raises Exception when the buffer is shorter than offset.
    """

    # This is based on the GPLed C implementation in Zebra <http://www.zebra.org/>
    if offset > len(binbuf):
        raise Exception("Packet too short for checkbytes %d" % len(binbuf))

    # The sums are computed with the two checkbyte positions zeroed
    zeroed = binbuf[:offset] + b"\x00\x00" + binbuf[offset + 2:]
    c0, c1 = _fletcher16(zeroed)

    first = ((len(zeroed) - offset - 1) * c0 - c1) % 255
    if first <= 0:
        first += 255

    second = 510 - c0 - first
    if second > 255:
        second -= 255
    return chb(first) + chb(second)
| |
| |
def mac2str(mac):
    """Convert a colon separated string of hexadecimal fields into the
    corresponding binary string (one chb() item per field)."""
    octets = [chb(int(field, 16)) for field in mac.split(':')]
    return b"".join(octets)
| |
def str2mac(s):
    """Format the six items of s as "xx:xx:xx:xx:xx:xx".

    A str is converted item by item with ord(); any other sequence is
    expected to already hold integers.
    """
    octets = [ord(char) for char in s] if isinstance(s, str) else s
    return ":".join(["%02x"] * 6) % tuple(octets)
| |
def randstring(l):
    """
    Returns a random string of length l (l >= 0)

    Every byte is drawn with random.randint(0, 255).
    """
    values = bytearray(random.randint(0, 255) for _ in range(l))
    return bytes(values)
| |
def zerofree_randstring(l):
    """
    Returns a random string of length l (l >= 0) without zero in it.

    Every byte is drawn with random.randint(1, 255).
    """
    values = bytearray(random.randint(1, 255) for _ in range(l))
    return bytes(values)
| |
def strxor(s1, s2):
    """
    Returns the binary XOR of the 2 provided strings s1 and s2. s1 and s2
    must be of same length.
    """
    def _xor(left, right):
        return chb(orb(left) ^ orb(right))

    return b"".join(map(_xor, s1, s2))
| |
def strand(s1, s2):
    """
    Returns the binary AND of the 2 provided strings s1 and s2. s1 and s2
    must be of same length.
    """
    def _and(left, right):
        return chb(orb(left) & orb(right))

    return b"".join(map(_and, s1, s2))
| |
| |
# Workaround bug 643005 : https://sourceforge.net/tracker/?func=detail&atid=105470&aid=643005&group_id=5470
# socket.inet_aton() is probed once with the broadcast address; a wrapper
# is only installed when that probe fails.
try:
    socket.inet_aton("255.255.255.255")
except socket.error:
    def inet_aton(x):
        """socket.inet_aton() with "255.255.255.255" handled by hand."""
        return b"\xff"*4 if x == "255.255.255.255" else socket.inet_aton(x)
else:
    inet_aton = socket.inet_aton

inet_ntoa = socket.inet_ntoa
| from scapy.pton_ntop import * |
| |
| |
def atol(x):
    """Return the address x as an unsigned 32-bit integer (network byte
    order). When inet_aton() rejects x, it is first resolved with
    socket.gethostbyname()."""
    try:
        packed = inet_aton(x)
    except socket.error:
        packed = inet_aton(socket.gethostbyname(x))
    (value,) = struct.unpack("!I", packed)
    return value
def ltoa(x):
    """Return the dotted representation of the integer x; only the low 32
    bits of x are used."""
    packed = struct.pack("!I", x & 0xffffffff)
    return inet_ntoa(packed)
| |
def itom(x):
    """Return the 32-bit mask whose x most significant bits are set
    (for instance itom(24) == 0xffffff00)."""
    shifted = 0xffffffff00000000 >> x
    return shifted & 0xffffffff
| |
class ContextManagerSubprocess(object):
    """
    Context manager that eases checking for unknown command.

    An OSError or TypeError raised inside the "with" block is turned into an
    OSError carrying an explanatory message when conf.interactive is false;
    otherwise it is logged through log_runtime and suppressed. Any other
    exception propagates untouched.

    :param name: label used in the messages
    :param prog: the program being executed; when it is left out (or false)
                 the message reads "Could not execute <name>, is it
                 installed ?"

    Example:
    >>> with ContextManagerSubprocess("my custom message"):
    >>>     subprocess.Popen(["unknown_command"])

    """
    def __init__(self, name, prog=None):
        # prog is optional so that the one-argument form shown above works
        self.name = name
        self.prog = prog

    def __enter__(self):
        pass

    def __exit__(self, exc_type, exc_value, traceback):
        if not isinstance(exc_value, (OSError, TypeError)):
            # No exception, or one this manager does not handle
            return None
        if self.prog:
            msg = "%s: executing %r failed" % (self.name, self.prog)
        else:
            msg = "Could not execute %s, is it installed ?" % self.name
        if not conf.interactive:
            raise OSError(msg)
        log_runtime.error(msg, exc_info=True)
        return True  # Suppress the exception
| |
class ContextManagerCaptureOutput(object):
    """
    Context manager that intercept the console's output.

    Inside the "with" block sys.stdout is replaced by a mock object whose
    write() accumulates everything it receives; the previous sys.stdout is
    put back on exit.

    Example:
    >>> with ContextManagerCaptureOutput() as cmco:
    ...     print("hey")
    ...     assert cmco.get_output() == "hey\\n"
    """
    def __init__(self):
        self.result_export_object = ""
        # Fail at construction time when no mock implementation is available
        self._get_mock()

    @staticmethod
    def _get_mock():
        """Return the mock module: the standalone "mock" package when it can
        be imported, the standard library's unittest.mock otherwise.

        Raises ImportError when neither is available.
        """
        try:
            import mock
        except ImportError:
            try:
                from unittest import mock
            except ImportError:
                raise ImportError("The mock module needs to be installed !")
        return mock

    def __enter__(self):
        mock = self._get_mock()
        def write(s, decorator=self):
            decorator.result_export_object += s
        mock_stdout = mock.Mock()
        mock_stdout.write = write
        self.bck_stdout = sys.stdout
        sys.stdout = mock_stdout
        return self

    def __exit__(self, *exc):
        sys.stdout = self.bck_stdout
        # Exceptions raised inside the block are never suppressed
        return False

    def get_output(self, eval_bytes=False):
        """Return what has been captured so far.

        :param eval_bytes: when true and the captured text starts with "b'",
                           the text is evaluated and passed to plain_str()
        """
        if self.result_export_object.startswith("b'") and eval_bytes:
            # NOTE(review): eval() runs whatever was printed inside the
            # block; only use eval_bytes on output you trust.
            return plain_str(eval(self.result_export_object))
        return self.result_export_object
| |
def do_graph(graph,prog=None,format=None,target=None,type=None,string=None,options=None):
    """do_graph(graph, prog=conf.prog.dot, format="svg",
         target="| conf.prog.display", options=None, [string=1]):
    string: if true, simply return the graph string
    graph: GraphViz graph description
    format: output type (svg, ps, gif, jpg, etc.), passed to dot's "-T" option
    target: filename or redirect. Defaults pipe to Imagemagick's display program
            (on Windows: a temporary file, opened with a viewer afterwards).
            A string starting with "|" is run as a shell command fed with the
            output, one starting with ">" or any other string is a file name
    type: when not None, overrides format
    prog: which graphviz program to use
    options: options to be passed to prog"""

    if format is None:
        if WINDOWS:
            format = "png" # use common format to make sure a viewer is installed
        else:
            format = "svg"
    if string:
        return graph
    if type is not None:
        format=type
    if prog is None:
        prog = conf.prog.dot
    start_viewer=False
    if target is None:
        if WINDOWS:
            target = get_temp_file(autoext="."+format)
            start_viewer = True
        else:
            with ContextManagerSubprocess("do_graph()", conf.prog.display):
                target = subprocess.Popen([conf.prog.display],
                                          stdin=subprocess.PIPE).stdin
    if format is not None:
        format = "-T%s" % format
    if isinstance(target, str):
        if target.startswith('|'):
            target = subprocess.Popen(target[1:].lstrip(), shell=True,
                                      stdin=subprocess.PIPE).stdin
        elif target.startswith('>'):
            target = open(target[1:].lstrip(), "wb")
        else:
            target = open(os.path.abspath(target), "wb")
    proc = subprocess.Popen("\"%s\" %s %s" % (prog, options or "", format or ""),
                            shell=True, stdin=subprocess.PIPE, stdout=target)
    proc.stdin.write(raw(graph))
    # Flush the description and signal end of input to prog
    proc.stdin.close()
    try:
        target.close()
    except Exception:
        # best effort: target may not be a closable object
        pass
    if start_viewer:
        # Workaround for file not found error: We wait until tempfile is written.
        waiting_start = time.time()
        while not os.path.exists(target.name):
            time.sleep(0.1)
            if time.time() - waiting_start > 3:
                warning("Temporary file '%s' could not be written. Graphic will not be displayed.", target.name)
                break
        else:
            # the loop ended without break: the file exists, display it
            if conf.prog.display == conf.prog._default:
                os.startfile(target.name)
            else:
                with ContextManagerSubprocess("do_graph()", conf.prog.display):
                    subprocess.Popen([conf.prog.display, target.name])
| |
# Replacement text for the characters that tex_escape() rewrites.
# "~" used to be listed twice; only the last entry of a dict literal is
# kept, so the single entry below is the one that was effective.
_TEX_TR = {
    "{":"{\\tt\\char123}",
    "}":"{\\tt\\char125}",
    "\\":"{\\tt\\char92}",
    "^":"\\^{}",
    "$":"\\$",
    "#":"\\#",
    "_":"\\_",
    "&":"\\&",
    "%":"\\%",
    "|":"{\\tt\\char124}",
    "~":"{\\tt\\char126}",
    "<":"{\\tt\\char60}",
    ">":"{\\tt\\char62}",
    }

def tex_escape(x):
    """Return x with every character listed in _TEX_TR replaced by its
    TeX escape sequence; other characters are copied unchanged."""
    return "".join(_TEX_TR.get(c, c) for c in x)
| |
def colgen(*lstcol,**kargs):
    """Returns a generator that mixes provided quantities forever
    trans: a function to convert the three arguments into a color. lambda x,y,z:(x,y,z) by default

    A single quantity is used twice. Calling colgen() without any quantity
    raises ValueError on the first iteration (there is nothing to mix, and
    the loop below would otherwise spin forever without yielding)."""
    if not lstcol:
        raise ValueError("colgen() needs at least one quantity to mix")
    if len(lstcol) < 2:
        lstcol *= 2
    trans = kargs.get("trans", lambda x,y,z: (x,y,z))
    size = len(lstcol)
    while True:
        for i in range(size):
            for j in range(size):
                for k in range(size):
                    # skip the triples where the three indexes are equal
                    if not (i == j == k):
                        yield trans(lstcol[(i+j)%size],lstcol[(j+k)%size],lstcol[(k+i)%size])
| |
def incremental_label(label="tag%05i", start=0):
    """Endless generator yielding label % n for n = start, start + 1, ..."""
    counter = start
    while True:
        yield label % counter
        counter += 1
| |
def binrepr(val):
    """Return the binary digits of val, without the "0b" prefix.

    A negative value keeps its sign ("-101" for -5); plain slicing of
    bin() would leave a stray "b" in front of the digits.
    """
    digits = bin(val)
    if digits.startswith("-"):
        return "-" + digits[3:]
    return digits[2:]
| |
def long_converter(s):
    """Parse s as a hexadecimal integer after removing its newlines and
    spaces."""
    compact = s.replace('\n', '')
    compact = compact.replace(' ', '')
    return int(compact, 16)
| |
| ######################### |
| #### Enum management #### |
| ######################### |
| |
class EnumElement:
    """A named value: it prints as its key, converts and hashes as its
    value, and forwards unknown attributes to the value."""
    # Class-level default so that __getattr__ below never has to resolve
    # "_value" itself.
    _value=None
    def __init__(self, key, value):
        self._key = key
        self._value = value
    def __repr__(self):
        return "<%s %s[%r]>" % (self.__dict__.get("_name", self.__class__.__name__), self._key, self._value)
    def __getattr__(self, attr):
        # Only called for attributes not found the normal way
        return getattr(self._value, attr)
    def __str__(self):
        return self._key
    def __bytes__(self):
        return raw(self.__str__())
    def __hash__(self):
        return self._value
    def __int__(self):
        return int(self._value)
    def __eq__(self, other):
        return self._value == int(other)
    def __ne__(self, other):
        # "!=" looks up __ne__; the historical __neq__ spelling is not a
        # special method and was never used by the operator.
        return not self.__eq__(other)
    # Kept for code that calls the old name explicitly
    __neq__ = __ne__
| |
| |
class Enum_metaclass(type):
    """Metaclass that wraps every int attribute of the class body into an
    element_class instance and stores the element -> attribute name
    mapping in the class attribute __rdict__."""
    element_class = EnumElement

    def __new__(cls, name, bases, dct):
        reverse = {}
        for key, value in six.iteritems(dct):
            if not isinstance(value, int):
                continue
            element = cls.element_class(key, value)
            dct[key] = element
            reverse[element] = key
        dct["__rdict__"] = reverse
        return super(Enum_metaclass, cls).__new__(cls, name, bases, dct)

    def __getitem__(self, attr):
        """Return the attribute name registered for attr in __rdict__."""
        names = self.__rdict__
        return names[attr]

    def __contains__(self, val):
        names = self.__rdict__
        return val in names

    def get(self, attr, val=None):
        """Like __getitem__, returning val when attr is not registered."""
        names = self.__rdict__
        return names.get(attr, val)

    def __repr__(self):
        label = self.__dict__.get("name", self.__name__)
        return "<%s>" % label
| |
| |
| |
| ################### |
| ## Object saving ## |
| ################### |
| |
| |
def export_object(obj):
    """Print obj pickled with protocol 2, compressed at level 9 and passed
    through bytes_base64()."""
    pickled = six.moves.cPickle.dumps(obj, 2)
    compressed = gzip.zlib.compress(pickled, 9)
    print(bytes_base64(compressed))
| |
def import_object(obj=None):
    """Rebuild an object from obj: the stripped text goes through
    base64_bytes(), is decompressed and then unpickled. When obj is None
    the text is read from sys.stdin.

    NOTE(review): unpickling can execute arbitrary code; only feed this
    function data from a trusted source.
    """
    if obj is None:
        obj = sys.stdin.read()
    encoded = obj.strip()
    compressed = base64_bytes(encoded)
    pickled = gzip.zlib.decompress(compressed)
    return six.moves.cPickle.loads(pickled)
| |
| |
def save_object(fname, obj):
    """Pickle a Python object

    :param fname: name of the gzip-compressed file to write
    :param obj: the object to pickle
    """
    # "with" closes the file even when pickling raises
    with gzip.open(fname, "wb") as fd:
        six.moves.cPickle.dump(obj, fd)
| |
def load_object(fname):
    """unpickle a Python object

    :param fname: name of a gzip-compressed file holding a pickle
    :returns: the unpickled object

    NOTE(review): unpickling can execute arbitrary code; only load files
    from a trusted source.
    """
    # "with" makes sure the file is closed once the object has been read
    with gzip.open(fname, "rb") as fd:
        return six.moves.cPickle.load(fd)
| |
@conf.commands.register
def corrupt_bytes(s, p=0.01, n=None):
    """Corrupt a given percentage or number of bytes from a string

    :param s: the data to corrupt (converted with raw())
    :param p: proportion of the bytes to corrupt, used when n is None (at
              least one byte is corrupted, unless the input is empty)
    :param n: exact number of bytes to corrupt
    :returns: the corrupted data
    """
    buf = array.array("B", raw(s))
    size = len(buf)
    if n is None:
        # never ask for more positions than there are (empty input)
        n = min(size, max(1, int(size * p)))
    for i in random.sample(range(size), n):
        # adding 1..255 modulo 256 always changes the byte
        buf[i] = (buf[i] + random.randint(1, 255)) % 256
    # array.tostring() was removed in Python 3.9; tobytes() replaces it
    if hasattr(buf, "tobytes"):
        return buf.tobytes()
    return buf.tostring()
| |
@conf.commands.register
def corrupt_bits(s, p=0.01, n=None):
    """Flip a given percentage or number of bits from a string

    :param s: the data to corrupt (converted with raw())
    :param p: proportion of the bits to flip, used when n is None (at least
              one bit is flipped, unless the input is empty)
    :param n: exact number of bits to flip
    :returns: the corrupted data
    """
    buf = array.array("B", raw(s))
    nbits = len(buf) * 8
    if n is None:
        # never ask for more positions than there are (empty input)
        n = min(nbits, max(1, int(nbits * p)))
    for i in random.sample(range(nbits), n):
        buf[i // 8] ^= 1 << (i % 8)
    # array.tostring() was removed in Python 3.9; tobytes() replaces it
    if hasattr(buf, "tobytes"):
        return buf.tobytes()
    return buf.tostring()
| |
| |
| |
| |
| ############################# |
| ## pcap capture file stuff ## |
| ############################# |
| |
@conf.commands.register
def wrpcap(filename, pkt, *args, **kargs):
    """Write a list of packets to a pcap file

filename: the name of the file to write packets to, or an open,
          writable file-like object. The file descriptor will be
          closed at the end of the call, so do not use an object you
          do not want to close (e.g., running wrpcap(sys.stdout, [])
          in interactive mode will crash Scapy).
gz: set to 1 to save a gzipped capture
linktype: force linktype value
endianness: "<" or ">", force endianness
sync: do not bufferize writes to the capture file

The extra positional and keyword arguments are handed to PcapWriter.

    """
    writer = PcapWriter(filename, *args, **kargs)
    with writer as fdesc:
        fdesc.write(pkt)
| |
@conf.commands.register
def rdpcap(filename, count=-1):
    """Read a pcap or pcapng file and return a packet list

count: read only <count> packets

The reader is closed before the packets are returned.

    """
    with PcapReader(filename) as fdesc:
        packets = fdesc.read_all(count=count)
    return packets
| |
| |
class PcapReader_metaclass(type):
    """Metaclass for (Raw)Pcap(Ng)Readers

    Calling a reader class opens the capture, reads its 4-byte magic and
    initialises an instance of that class; when the initialisation raises
    Scapy_Exception the class registered as `alternative` is tried with the
    same file object and magic.
    """

    def __new__(cls, name, bases, dct):
        """The `alternative` class attribute is declared in the PcapNg
        variant, and set here to the Pcap variant.

        """
        newcls = super(PcapReader_metaclass, cls).__new__(cls, name, bases, dct)
        if 'alternative' in dct:
            # Make the link two-way: the class named as alternative points
            # back to the class being created.
            dct['alternative'].alternative = newcls
        return newcls

    def __call__(cls, filename):
        """Creates a cls instance, use the `alternative` if that
        fails.

        """
        # The instance is created first and initialised by hand, so that
        # __init__ can be given the opened file and its magic.
        i = cls.__new__(cls, cls.__name__, cls.__bases__, cls.__dict__)
        filename, fdesc, magic = cls.open(filename)
        try:
            i.__init__(filename, fdesc, magic)
        except Scapy_Exception:
            if "alternative" in cls.__dict__:
                cls = cls.__dict__["alternative"]
                i = cls.__new__(cls, cls.__name__, cls.__bases__, cls.__dict__)
                try:
                    i.__init__(filename, fdesc, magic)
                except Scapy_Exception:
                    raise
                    # NOTE(review): the statements below follow a bare
                    # raise and therefore look unreachable; the original
                    # nesting could not be recovered with certainty -
                    # confirm before relying on them.
                    try:
                        i.f.seek(-4, 1)
                    except:
                        pass
                    raise Scapy_Exception("Not a supported capture file")

        return i

    @staticmethod
    def open(filename):
        """Open (if necessary) filename, and read the magic.

        Returns the tuple (filename, file object, first 4 bytes). A name is
        first opened with gzip.open(); on IOError it is opened again as a
        plain binary file. Any other object is used as an already opened
        file, its name being taken from its `name` attribute when it has
        one ("No name" otherwise).
        """
        if isinstance(filename, six.string_types):
            try:
                fdesc = gzip.open(filename,"rb")
                magic = fdesc.read(4)
            except IOError:
                fdesc = open(filename, "rb")
                magic = fdesc.read(4)
        else:
            fdesc = filename
            filename = (fdesc.name
                        if hasattr(fdesc, "name") else
                        "No name")
            magic = fdesc.read(4)
        return filename, fdesc, magic
| |
| |
class RawPcapReader(six.with_metaclass(PcapReader_metaclass)):
    """A stateful pcap reader. Each packet is returned as a string"""
    def __init__(self, filename, fdesc, magic):
        """
        filename: name of the capture (kept in self.filename)
        fdesc: the opened capture, positioned right after the magic
        magic: the first 4 bytes of the capture; they select the byte order
               and the timestamp precision

        Raises Scapy_Exception on an unknown magic or a truncated header.
        """
        self.filename = filename
        self.f = fdesc
        if magic == b"\xa1\xb2\xc3\xd4": # big endian
            self.endian = ">"
            self.nano = False
        elif magic == b"\xd4\xc3\xb2\xa1": # little endian
            self.endian = "<"
            self.nano = False
        elif magic == b"\xa1\xb2\x3c\x4d":  # big endian, nanosecond-precision
            self.endian = ">"
            self.nano = True
        elif magic == b"\x4d\x3c\xb2\xa1":  # little endian, nanosecond-precision
            self.endian = "<"
            self.nano = True
        else:
            raise Scapy_Exception(
                "Not a pcap capture file (bad magic: %r)" % magic
            )
        # Remainder of the global header; only the link type is kept
        hdr = self.f.read(20)
        if len(hdr)<20:
            raise Scapy_Exception("Invalid pcap file (too short)")
        vermaj, vermin, tz, sig, snaplen, linktype = struct.unpack(
            self.endian + "HHIIII", hdr
        )
        self.linktype = linktype

    def __iter__(self):
        return self

    def next(self):
        """implement the iterator protocol on a set of packets in a pcap file"""
        pkt = self.read_packet()
        # identity test: never goes through the packet's own __eq__
        if pkt is None:
            raise StopIteration
        return pkt
    __next__ = next


    def read_packet(self, size=MTU):
        """return a single packet read from the file

        The result is the tuple (data, (sec, usec, wirelen)), data being
        truncated to size bytes.

        returns None when no more packets are available
        """
        hdr = self.f.read(16)
        if len(hdr) < 16:
            return None
        sec,usec,caplen,wirelen = struct.unpack(self.endian+"IIII", hdr)
        s = self.f.read(caplen)[:size]
        return s,(sec,usec,wirelen) # caplen = len(s)


    def dispatch(self, callback):
        """call the specified callback routine for each packet read

        This is just a convenience function for the main loop
        that allows for easy launching of packet processing in a
        thread.
        """
        for p in self:
            callback(p)

    def read_all(self,count=-1):
        """return a list of all packets in the pcap file

        count: maximum number of packets to read (a negative value means
               no limit)
        """
        res=[]
        while count != 0:
            count -= 1
            p = self.read_packet()
            if p is None:
                break
            res.append(p)
        return res

    def recv(self, size=MTU):
        """ Emulate a socket

        Returns the data of the next packet. Once the capture is exhausted
        read_packet() returns None, so subscripting it raises TypeError.
        """
        return self.read_packet(size=size)[0]

    def fileno(self):
        return self.f.fileno()

    def close(self):
        return self.f.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tracback):
        self.close()
| |
| |
class PcapReader(RawPcapReader):
    """A RawPcapReader whose packets are dissected with the class that
    conf.l2types associates with the link type of the capture."""
    def __init__(self, filename, fdesc, magic):
        RawPcapReader.__init__(self, filename, fdesc, magic)
        try:
            layer = conf.l2types[self.linktype]
        except KeyError:
            warning("PcapReader: unknown LL type [%i]/[%#x]. Using Raw packets" % (self.linktype,self.linktype))
            layer = conf.raw_layer
        self.LLcls = layer

    def read_packet(self, size=MTU):
        """Return the next packet dissected with self.LLcls, or None when
        the capture is exhausted. When dissection fails the packet is built
        with conf.raw_layer instead, unless conf.debug_dissector is set."""
        record = RawPcapReader.read_packet(self, size=size)
        if record is None:
            return None
        payload, (sec, usec, wirelen) = record

        try:
            pkt = self.LLcls(payload)
        except KeyboardInterrupt:
            raise
        except:
            if conf.debug_dissector:
                raise
            pkt = conf.raw_layer(payload)
        # the second timestamp field counts nanoseconds or microseconds
        tick = 0.000000001 if self.nano else 0.000001
        pkt.time = sec + tick * usec
        return pkt

    def read_all(self,count=-1):
        """Return the packets as a PacketList named after the capture file."""
        packets = RawPcapReader.read_all(self, count)
        from scapy import plist
        list_name = os.path.basename(self.filename)
        return plist.PacketList(packets, name=list_name)

    def recv(self, size=MTU):
        return self.read_packet(size=size)
| |
| |
class RawPcapNgReader(RawPcapReader):
    """A stateful pcapng reader. Each packet is returned as a
    string.

    """

    alternative = RawPcapReader

    def __init__(self, filename, fdesc, magic):
        """
        filename: name of the capture (kept in self.filename)
        fdesc: the opened capture, positioned right after the magic
        magic: the first 4 bytes of the capture

        Raises Scapy_Exception when the block type or the byte-order magic
        is not the expected one.
        """
        self.filename = filename
        self.f = fdesc
        # A list of (linktype, snaplen, tsresol); will be populated by IDBs.
        self.interfaces = []
        # Block type -> handler; other block types are skipped
        self.blocktypes = {
            1: self.read_block_idb,
            2: self.read_block_pkt,
            3: self.read_block_spb,
            6: self.read_block_epb,
        }
        if magic != b"\x0a\x0d\x0d\x0a": # PcapNg:
            raise Scapy_Exception(
                "Not a pcapng capture file (bad magic: %r)" % magic
            )
        # see https://github.com/pcapng/pcapng
        blocklen, magic = self.f.read(4), self.f.read(4)
        if magic == b"\x1a\x2b\x3c\x4d":
            self.endian = ">"
        elif magic == b"\x4d\x3c\x2b\x1a":
            self.endian = "<"
        else:
            raise Scapy_Exception("Not a pcapng capture file (bad magic)")
        try:
            # Go back to the start so that read_packet() sees whole blocks
            self.f.seek(0)
        except Exception:
            # best effort: the file object may not be seekable
            pass

    def read_packet(self, size=MTU):
        """Read blocks until it reaches either EOF or a packet, and
        returns None or (packet, (linktype, tsresol, tshigh, tslow,
        wirelen)), where packet is a string.

        """
        while True:
            try:
                blocktype, blocklen = struct.unpack(self.endian + "2I",
                                                    self.f.read(8))
            except struct.error:
                return None
            # The body: everything but the type, and the two copies of the
            # block length
            block = self.f.read(blocklen - 12)
            if blocklen % 4:
                pad = self.f.read(4 - (blocklen % 4))
                warning("PcapNg: bad blocklen %d (MUST be a multiple of 4. "
                        "Ignored padding %r" % (blocklen, pad))
            try:
                if (blocklen,) != struct.unpack(self.endian + 'I',
                                                self.f.read(4)):
                    warning("PcapNg: Invalid pcapng block (bad blocklen)")
            except struct.error:
                return None
            res = self.blocktypes.get(blocktype,
                                      lambda block, size: None)(block, size)
            if res is not None:
                return res

    def read_block_idb(self, block, _):
        """Interface Description Block

        Appends (linktype, snaplen, tsresol) to self.interfaces.
        """
        # The fixed fields (LinkType, Reserved, SnapLen: the 8 bytes
        # unpacked at the end of this method) come first; the options
        # start right after them.
        options = block[8:]
        tsresol = 1000000
        while len(options) >= 4:
            code, length = struct.unpack(self.endian + "HH", options[:4])
            # PCAP Next Generation (pcapng) Capture File Format
            # 4.2. - Interface Description Block
            # http://xml2rfc.tools.ietf.org/cgi-bin/xml2rfc.cgi?url=https://raw.githubusercontent.com/pcapng/pcapng/master/draft-tuexen-opsawg-pcapng.xml&modeAsFormat=html/ascii&type=ascii#rfc.section.4.2
            if code == 9 and length == 1 and len(options) >= 5:
                # high bit set: power of 2, otherwise power of 10
                tsresol = orb(options[4])
                tsresol = (2 if tsresol & 128 else 10) ** (tsresol & 127)
            if code == 0:
                if length != 0:
                    warning("PcapNg: invalid option length %d for end-of-option" % length)
                break
            # option values are padded to a multiple of 4 bytes
            if length % 4:
                length += (4 - (length % 4))
            options = options[4 + length:]
        self.interfaces.append(struct.unpack(self.endian + "HxxI", block[:8])
                               + (tsresol,))

    def read_block_epb(self, block, size):
        """Enhanced Packet Block"""
        intid, tshigh, tslow, caplen, wirelen = struct.unpack(
            self.endian + "5I",
            block[:20],
        )
        return (block[20:20 + caplen][:size],
                (self.interfaces[intid][0], self.interfaces[intid][2],
                 tshigh, tslow, wirelen))

    def read_block_spb(self, block, size):
        """Simple Packet Block"""
        # "it MUST be assumed that all the Simple Packet Blocks have
        # been captured on the interface previously specified in the
        # first Interface Description Block."
        intid = 0
        wirelen, = struct.unpack(self.endian + "I", block[:4])
        caplen = min(wirelen, self.interfaces[intid][1])
        # this block type carries no timestamp
        return (block[4:4 + caplen][:size],
                (self.interfaces[intid][0], self.interfaces[intid][2],
                 None, None, wirelen))

    def read_block_pkt(self, block, size):
        """(Obsolete) Packet Block"""
        intid, drops, tshigh, tslow, caplen, wirelen = struct.unpack(
            self.endian + "HH4I",
            block[:20],
        )
        return (block[20:20 + caplen][:size],
                (self.interfaces[intid][0], self.interfaces[intid][2],
                 tshigh, tslow, wirelen))
| |
| |
class PcapNgReader(RawPcapNgReader):
    """A RawPcapNgReader whose packets are dissected with the class that
    conf.l2types associates with their link type."""

    alternative = PcapReader

    def __init__(self, filename, fdesc, magic):
        RawPcapNgReader.__init__(self, filename, fdesc, magic)

    def read_packet(self, size=MTU):
        """Return the next packet, dissected, or None at the end of the
        capture. When dissection fails the packet is built with
        conf.raw_layer instead, unless conf.debug_dissector is set."""
        rp = RawPcapNgReader.read_packet(self, size=size)
        if rp is None:
            return None
        s, (linktype, tsresol, tshigh, tslow, wirelen) = rp
        try:
            p = conf.l2types[linktype](s)
        except KeyboardInterrupt:
            raise
        except:
            if conf.debug_dissector:
                raise
            p = conf.raw_layer(s)
        if tshigh is not None:
            # 64-bit timestamp split in two halves, counted in 1/tsresol s
            p.time = float((tshigh << 32) + tslow) / tsresol
        return p
    def read_all(self,count=-1):
        """Return the packets as a PacketList named after the capture file."""
        res = RawPcapNgReader.read_all(self, count)
        from scapy import plist
        return plist.PacketList(res, name=os.path.basename(self.filename))
    def recv(self, size=MTU):
        # honour size, as PcapReader.recv() does
        return self.read_packet(size=size)
| |
| |
| class RawPcapWriter: |
| """A stream PCAP writer with more control than wrpcap()""" |
| def __init__(self, filename, linktype=None, gz=False, endianness="", |
| append=False, sync=False, nano=False): |
| """ |
| filename: the name of the file to write packets to, or an open, |
| writable file-like object. |
| linktype: force linktype to a given value. If None, linktype is taken |
| from the first writer packet |
| gz: compress the capture on the fly |
| endianness: force an endianness (little:"<", big:">"). Default is native |
| append: append packets to the capture file instead of truncating it |
| sync: do not bufferize writes to the capture file |
| nano: use nanosecond-precision (requires libpcap >= 1.5.0) |
| |
| """ |
| |
| self.linktype = linktype |
| self.header_present = 0 |
| self.append = append |
| self.gz = gz |
| self.endian = endianness |
| self.sync = sync |
| self.nano = nano |
| bufsz=4096 |
| if sync: |
| bufsz = 0 |
| |
| if isinstance(filename, six.string_types): |
| self.filename = filename |
| self.f = [open,gzip.open][gz](filename,append and "ab" or "wb", gz and 9 or bufsz) |
| else: |
| self.f = filename |
| self.filename = (filename.name |
| if hasattr(filename, "name") else |
| "No name") |
| |
| def fileno(self): |
| return self.f.fileno() |
| |
| def _write_header(self, pkt): |
| self.header_present=1 |
| |
| if self.append: |
| # Even if prone to race conditions, this seems to be |
| # safest way to tell whether the header is already present |
| # because we have to handle compressed streams that |
| # are not as flexible as basic files |
| g = [open,gzip.open][self.gz](self.filename,"rb") |
| if g.read(16): |
| return |
| |
| self.f.write(struct.pack(self.endian+"IHHIIII", 0xa1b23c4d if self.nano else 0xa1b2c3d4, |
| 2, 4, 0, 0, MTU, self.linktype)) |
| self.f.flush() |
| |
| |
| def write(self, pkt): |
| """accepts either a single packet or a list of packets to be |
| written to the dumpfile |
| |
| """ |
| if isinstance(pkt, str): |
| if not self.header_present: |
| self._write_header(pkt) |
| self._write_packet(pkt) |
| else: |
| pkt = pkt.__iter__() |
| if not self.header_present: |
| try: |
| p = next(pkt) |
| except StopIteration: |
| self._write_header(b"") |
| return |
| self._write_header(p) |
| self._write_packet(p) |
| for p in pkt: |
| self._write_packet(p) |
| |
def _write_packet(self, packet, sec=None, usec=None, caplen=None, wirelen=None):
    """Append one packet record (record header + data) to the pcap file.

    packet:  the raw packet data, or a tuple of them; every member of a
             tuple is written with the same keyword values.
    sec:     timestamp seconds; defaults to the current time.
    usec:    timestamp sub-second part (nanoseconds when ``self.nano``
             is set, microseconds otherwise); defaults to the current
             time.
    caplen:  captured length; defaults to ``len(packet)``.
    wirelen: length on the wire; defaults to ``caplen``.

    """
    if isinstance(packet, tuple):
        for member in packet:
            self._write_packet(member, sec=sec, usec=usec, caplen=caplen,
                               wirelen=wirelen)
        return

    caplen = len(packet) if caplen is None else caplen
    wirelen = caplen if wirelen is None else wirelen

    if sec is None or usec is None:
        now = time.time()
        whole = int(now)
        if sec is None:
            sec = whole
        if usec is None:
            scale = 1000000000 if self.nano else 1000000
            usec = int(round((now - whole) * scale))

    record_header = struct.pack(self.endian + "IIII", sec, usec, caplen, wirelen)
    self.f.write(record_header)
    self.f.write(packet)
    if self.sync:
        self.f.flush()
| |
def flush(self):
    """Flush the underlying stream.

    Returns whatever the ``flush()`` method of ``self.f`` returns.

    """
    stream = self.f
    return stream.flush()
| |
def close(self):
    """Close the underlying stream.

    Returns whatever the ``close()`` method of ``self.f`` returns.

    """
    stream = self.f
    return stream.close()
| |
def __enter__(self):
    """Enter a ``with`` block; the writer itself is the managed object."""
    return self
def __exit__(self, exc_type, exc_value, tracback):
    """Leave a ``with`` block: flush, then close the writer.

    The exception details are ignored and nothing is returned, so an
    exception raised inside the block keeps propagating.

    """
    self.flush()
    self.close()
| |
| |
class PcapWriter(RawPcapWriter):
    """A stream PCAP writer with more control than wrpcap()"""

    def _write_header(self, pkt):
        """Pick a link type from the first packet if needed, then write
        the file header.

        pkt: the first packet to be written (or a tuple whose first
             member is used). When ``self.linktype`` is not set, it is
             looked up in ``conf.l2types`` from the packet class; an
             unknown class triggers a warning and falls back to type 1.

        """
        if isinstance(pkt, tuple) and pkt:
            pkt = pkt[0]
        if self.linktype is None:
            try:
                self.linktype = conf.l2types[pkt.__class__]
            except KeyError:
                warning("PcapWriter: unknown LL type for %s. Using type 1 (Ethernet)", pkt.__class__.__name__)
                self.linktype = 1
        RawPcapWriter._write_header(self, pkt)

    def _write_packet(self, packet):
        """Serialize one packet (or each member of a tuple) and write it
        with the timestamp taken from its ``time`` attribute.

        """
        if isinstance(packet, tuple):
            for pkt in packet:
                self._write_packet(pkt)
            return
        scale = 1000000000 if self.nano else 1000000
        sec = int(packet.time)
        usec = int(round((packet.time - sec) * scale))
        if usec >= scale:
            # Rounding pushed the fraction up to a whole second: carry it
            # so the sub-second field stays within its valid range.
            sec += 1
            usec -= scale
        s = raw(packet)
        caplen = len(s)
        RawPcapWriter._write_packet(self, s, sec, usec, caplen, caplen)
| |
| |
# Matches one line of a hex dump: an optional offset column followed by
# up to 16 two-digit hex bytes, which end up in the third group.
re_extract_hexcap = re.compile("^((0x)?[0-9a-fA-F]{2,}[ :\t]{,3}|) *(([0-9a-fA-F]{2} {,2}){,16})")

@conf.commands.register
def import_hexcap():
    """Read a hex dump from standard input and return the decoded bytes.

    Lines are read until end of file. From each line, the hex bytes
    matched by ``re_extract_hexcap`` are collected; a line that cannot
    be parsed produces a warning and is skipped.

    """
    # Local import so this function does not depend on a new
    # module-level import.
    import binascii

    chunks = []
    try:
        while True:
            # six.moves.input never evaluates the text it reads, unlike
            # the Python 2 builtin input().
            line = six.moves.input().strip()
            match = re_extract_hexcap.match(line)
            if match is None:
                warning("Parsing error during hexcap")
                continue
            chunks.append(match.groups()[2])
    except EOFError:
        pass

    hexstr = "".join(chunks).replace(" ","")
    # str.decode("hex") only exists on Python 2; unhexlify works on both.
    return binascii.unhexlify(hexstr)
| |
| |
| |
@conf.commands.register
def wireshark(pktlist):
    """Dump the packets to a temporary pcap file and open it in wireshark.

    The wireshark process is started but not waited for.

    """
    pcap_path = get_temp_file()
    wrpcap(pcap_path, pktlist)
    command = [conf.prog.wireshark, "-r", pcap_path]
    with ContextManagerSubprocess("wireshark()", conf.prog.wireshark):
        subprocess.Popen(command)
| |
@conf.commands.register
def tcpdump(pktlist, dump=False, getfd=False, args=None,
            prog=None, getproc=False, quiet=False):
    """Run tcpdump or tshark on a list of packets

    pktlist: a Packet instance, a PacketList instance or a list of Packet
             instances. Can also be a filename (as a string) or an open
             file-like object that must be a file format readable by
             tshark (Pcap, PcapNg, etc.). When None, the program is
             started without any "-r" option.

    dump:    when set to True, returns a string instead of displaying it.
    getfd:   when set to True, returns a file-like object to read data
             from tcpdump or tshark from.
    getproc: when set to True, the subprocess.Popen object is returned
    args:    arguments (as a list) to pass to the program (example for
             tshark: args=["-T", "json"]). Defaults to no extra argument.
    prog:    program to use (defaults to tcpdump, will work with tshark)
    quiet:   when set to True, the process stderr is discarded

    Examples:

    >>> tcpdump([IP()/TCP(), IP()/UDP()])
    reading from file -, link-type RAW (Raw IP)
    16:46:00.474515 IP 127.0.0.1.20 > 127.0.0.1.80: Flags [S], seq 0, win 8192, length 0
    16:46:00.475019 IP 127.0.0.1.53 > 127.0.0.1.53: [|domain]

    >>> tcpdump([IP()/TCP(), IP()/UDP()], prog=conf.prog.tshark)
      1   0.000000    127.0.0.1 -> 127.0.0.1    TCP 40 20->80 [SYN] Seq=0 Win=8192 Len=0
      2   0.000459    127.0.0.1 -> 127.0.0.1    UDP 28 53->53 Len=0

    To get a JSON representation of a tshark-parsed PacketList(), one can:
    >>> import json, pprint
    >>> json_data = json.load(tcpdump(IP(src="217.25.178.5", dst="45.33.32.156"),
    ...                               prog=conf.prog.tshark, args=["-T", "json"],
    ...                               getfd=True))
    >>> pprint.pprint(json_data)
    [{u'_index': u'packets-2016-12-23',
      u'_score': None,
      u'_source': {u'layers': {u'frame': {u'frame.cap_len': u'20',
                                          u'frame.encap_type': u'7',
    [...]
                                          u'frame.time_relative': u'0.000000000'},
                               u'ip': {u'ip.addr': u'45.33.32.156',
                                       u'ip.checksum': u'0x0000a20d',
    [...]
                                       u'ip.ttl': u'64',
                                       u'ip.version': u'4'},
                               u'raw': u'Raw packet data'}},
      u'_type': u'pcap_file'}]
    >>> json_data[0]['_source']['layers']['ip']['ip.ttl']
    u'64'

    """
    getfd = getfd or getproc
    if prog is None:
        prog = [conf.prog.tcpdump]
    elif isinstance(prog, six.string_types):
        prog = [prog]
    _prog_name = "windump()" if WINDOWS else "tcpdump()"
    extra_args = args if args is not None else []

    # Work out how the program gets its input; the process itself is
    # started once, below.
    feed_stdin = False
    if pktlist is None:
        read_args = []
    elif isinstance(pktlist, six.string_types):
        read_args = ["-r", pktlist]
    elif DARWIN:
        # Tcpdump cannot read from stdin, see
        # <http://apple.stackexchange.com/questions/152682/>
        tmpfile = tempfile.NamedTemporaryFile(delete=False)
        # Register the file right away so it is cleaned up even if
        # writing it or starting the program fails.
        conf.temp_files.append(tmpfile.name)
        try:
            tmpfile.writelines(iter(lambda: pktlist.read(1048576), b""))
        except AttributeError:
            # Not a file-like object: serialize the packets instead.
            wrpcap(tmpfile, pktlist)
        else:
            tmpfile.close()
        read_args = ["-r", tmpfile.name]
    else:
        read_args = ["-r", "-"]
        feed_stdin = True

    # The null device must be opened for writing to be usable as the
    # child's stderr.
    devnull = open(os.devnull, "wb") if quiet else None
    try:
        with ContextManagerSubprocess(_prog_name, prog[0]):
            proc = subprocess.Popen(
                prog + read_args + extra_args,
                stdin=subprocess.PIPE if feed_stdin else None,
                stdout=subprocess.PIPE if dump or getfd else None,
                stderr=devnull,
            )
    finally:
        # The child holds its own copy of the descriptor.
        if devnull is not None:
            devnull.close()

    if feed_stdin:
        try:
            proc.stdin.writelines(iter(lambda: pktlist.read(1048576), b""))
        except AttributeError:
            # Not a file-like object: serialize the packets instead.
            wrpcap(proc.stdin, pktlist)
        else:
            proc.stdin.close()
    if dump:
        return b"".join(iter(lambda: proc.stdout.read(1048576), b""))
    if getproc:
        return proc
    if getfd:
        return proc.stdout
    proc.wait()
| |
@conf.commands.register
def hexedit(x):
    """Edit the bytes of x with the configured hex editor.

    x is converted with raw(), written to a temporary file, and the
    program named by ``conf.prog.hexedit`` is run on that file. The
    content of the file after the editor exits is returned and the
    file is removed.

    """
    # NOTE(review): raw() is assumed to give the same result as the
    # former str() on Python 2 while producing bytes on Python 3 --
    # confirm against scapy.compat.
    x = raw(x)
    f = get_temp_file()
    with open(f, "wb") as fdesc:
        fdesc.write(x)
    with ContextManagerSubprocess("hexedit()", conf.prog.hexedit):
        subprocess.call([conf.prog.hexedit, f])
    # Binary mode: the edited content is arbitrary bytes, not text.
    with open(f, "rb") as fdesc:
        x = fdesc.read()
    os.unlink(f)
    return x
| |
def get_terminal_width():
    """Get terminal width if in a window

    Returns the number of columns as an integer, or None when it cannot
    be determined. On Windows the console screen buffer is queried;
    elsewhere the TIOCGWINSZ ioctl on file descriptor 1 is tried first,
    then the COLUMNS environment variable.

    """
    if WINDOWS:
        from ctypes import windll, create_string_buffer
        # http://code.activestate.com/recipes/440694-determine-size-of-console-window-on-windows/
        h = windll.kernel32.GetStdHandle(-12)
        csbi = create_string_buffer(22)
        res = windll.kernel32.GetConsoleScreenBufferInfo(h, csbi)
        if not res:
            return None
        (bufx, bufy, curx, cury, wattr,
         left, top, right, bottom, maxx, maxy) = struct.unpack("hhhhHhhhhhh", csbi.raw)
        return right - left + 1

    sizex = 0
    try:
        import fcntl, termios
        s = struct.pack('HHHH', 0, 0, 0, 0)
        x = fcntl.ioctl(1, termios.TIOCGWINSZ, s)
        # The second field of the result is the column count.
        sizex = struct.unpack('HHHH', x)[1]
    except IOError:
        pass
    if not sizex:
        try:
            sizex = int(os.environ['COLUMNS'])
        except (KeyError, ValueError):
            # COLUMNS is unset or not a number.
            pass
    return sizex if sizex else None
| |
def pretty_list(rtlst, header, sortBy=0):
    """Format rows as an aligned text table, cropped to the terminal.

    rtlst:  list of rows (sequences of strings); it is sorted in place
            on the sortBy column.
    header: list of header rows, put before the sorted rows.
    sortBy: index of the column used as the sort key.

    Returns the table as a single string with one line per row.

    """
    column_count = len(header[0])
    separator = " "
    rtlst.sort(key=lambda row: row[sortBy])
    table = header + rtlst
    widths = [max([len(cell) for cell in column]) for column in zip(*table)]

    # Crop only when the terminal width is known and the table overflows.
    # TODO: find a better and more precise way of doing this.
    term_width = get_terminal_width()
    if term_width and sum(widths) > term_width:
        # Windows has a fat window border
        share = (term_width // column_count) - (1 if WINDOWS else 0)
        # Shrink the widest column(s) until everything fits; the pass
        # count is bounded on purpose.
        for _ in range(1, len(widths)):
            if (sum(widths) + 6) <= term_width:
                break
            widest = max(widths)
            widths = [share if w == widest else w for w in widths]

        def _crop(text, limit):
            # Text that does not fit is shortened and marked with "...".
            head = text[:limit]
            if head == text:
                return head
            return text[:limit - 3] + "..."

        table = [tuple([_crop(cell, widths[idx]) for idx, cell in enumerate(row)])
                 for row in table]
        # The cropped cells may be narrower than the widths they were
        # cropped to.
        widths = [max([len(cell) for cell in column]) for column in zip(*table)]

    line_format = separator.join(["%%-%ds" % w for w in widths])
    return "\n".join([line_format % row for row in table])
| |
def __make_table(yfmtfunc, fmtfunc, endline, data, fxyz, sortx=None, sorty=None, seplinefunc=None):
    """Print a two-dimensional table built from data.

    yfmtfunc:    called with the width of the widest row label; returns
                 the format string used for the row label column.
    fmtfunc:     called with a column width; returns the format string
                 used for that column.
    endline:     string printed at the end of each table line.
    data:        iterable of elements to tabulate.
    fxyz:        called on each element; returns a (column label,
                 row label, cell value) triple. All three are converted
                 with str().
    sortx:       optional sort key for column labels.
    sorty:       optional sort key for row labels.
    seplinefunc: optional function called with the row label width and
                 the list of column widths; the line it returns is
                 printed before the header, after it, and at the end.

    Without an explicit sort key, labels are sorted as integers, then
    with atol(), then as plain strings, using the first that works.
    Missing cells are shown as "-".

    """
    vx = {}
    vy = {}
    vz = {}
    ywidth = 0
    for e in data:
        xx, yy, zz = [str(s) for s in fxyz(e)]
        ywidth = max(len(yy), ywidth)
        vx[xx] = max(vx.get(xx, 0), len(xx), len(zz))
        vy[yy] = None
        vz[(xx, yy)] = zz

    def _ordered(labels, keyfunc):
        # Sort labels with the caller's key, or fall back step by step.
        labels = list(labels)
        if keyfunc:
            labels.sort(key=keyfunc)
            return labels
        try:
            labels.sort(key=int)
        except Exception:
            try:
                labels.sort(key=atol)
            except Exception:
                labels.sort()
        return labels

    vxk = _ordered(vx, sortx)
    vyk = _ordered(vy, sorty)

    if seplinefunc:
        sepline = seplinefunc(ywidth, [vx[x] for x in vxk])
        print(sepline)

    fmt = yfmtfunc(ywidth)
    vxf = {}
    print(fmt % "", end=' ')
    for x in vxk:
        vxf[x] = fmtfunc(vx[x])
        print(vxf[x] % x, end=' ')
    print(endline)
    if seplinefunc:
        print(sepline)
    for y in vyk:
        print(fmt % y, end=' ')
        for x in vxk:
            print(vxf[x] % vz.get((x, y), "-"), end=' ')
        print(endline)
    if seplinefunc:
        print(sepline)
| |
def make_table(*args, **kargs):
    """Print a plain table with left-aligned, space-padded columns.

    All arguments are passed on to __make_table().

    """
    def _padded(width):
        return "%%-%is" % width

    __make_table(_padded, _padded, "", *args, **kargs)
| |
def make_lined_table(*args, **kargs):
    """Print a table with "|" after each cell and "+"-joined dash rules.

    All arguments are passed on to __make_table().

    """
    def _cell(width):
        return "%%-%is |" % width

    def _rule(label_width, column_widths):
        return "+".join('-'*(w+2) for w in [label_width-1]+column_widths+[-2])

    __make_table(_cell, _cell, "", seplinefunc=_rule, *args, **kargs)
| |
def make_tex_table(*args, **kargs):
    """Print a table as LaTeX tabular rows.

    Cells are separated with "&", lines end with a double backslash and
    the separator line is "\\hline". All arguments are passed on to
    __make_table().

    """
    def _label(width):
        return "%s"

    def _cell(width):
        return "& %s"

    def _rule(label_width, column_widths):
        return "\\hline"

    __make_table(_label, _cell, "\\\\", seplinefunc=_rule, *args, **kargs)
| |
| ############################################### |
| ### WHOIS CLIENT (not available on windows) ### |
| ############################################### |
| |
def whois(ip_address):
    """Whois client for Python

    ip_address: the address (or host name) to look up. It is resolved
                with gethostbyname() when possible, otherwise sent
                unchanged.

    The query is sent to whois.ripe.net on TCP port 43. Returns the
    answer as bytes, without the lines starting with "remarks:", without
    trailing blank lines and without its first three lines.

    """
    whois_ip = str(ip_address)
    try:
        query = socket.gethostbyname(whois_ip)
    except (socket.error, UnicodeError):
        # Resolution failed: send what we were given.
        query = whois_ip
    chunks = []
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.connect(("whois.ripe.net", 43))
        # sendall() keeps sending until the whole query is out.
        s.sendall(query.encode("utf8") + b"\r\n")
        while True:
            d = s.recv(4096)
            if not d:
                break
            chunks.append(d)
    finally:
        # Close the socket even when connecting or reading fails.
        s.close()
    answer = b"".join(chunks)
    ignore_tag = b"remarks:"
    # ignore all lines starting with the ignore_tag
    lines = [line for line in answer.split(b"\n") if not line.startswith(ignore_tag)]
    # remove empty lines at the bottom
    while lines and not lines[-1].strip():
        lines.pop()
    # NOTE(review): the first three lines are presumably the server
    # banner -- confirm against an actual answer.
    return b"\n".join(lines[3:])