| ## This file is part of Scapy |
| ## See http://www.secdev.org/projects/scapy for more informations |
| ## Copyright (C) Philippe Biondi <phil@secdev.org> |
| ## Copyright (C) Gabriel Potter <gabriel@potter.fr> |
| ## This program is published under a GPLv2 license |
| |
| """ |
| Customizations needed to support Microsoft Windows. |
| """ |
| from __future__ import absolute_import |
| from __future__ import print_function |
| import os, re, sys, socket, time, itertools, platform |
| import subprocess as sp |
| from glob import glob |
| import tempfile |
| from threading import Thread, Event |
| |
| import scapy |
| from scapy.config import conf, ConfClass |
| from scapy.error import Scapy_Exception, log_loading, log_runtime, warning |
| from scapy.utils import atol, itom, inet_aton, inet_ntoa, PcapReader, pretty_list |
| from scapy.utils6 import construct_source_candidate_set |
| from scapy.base_classes import Gen, Net, SetGen |
| from scapy.data import MTU, ETHER_BROADCAST, ETH_P_ARP |
| |
| import scapy.modules.six as six |
| from scapy.modules.six.moves import range, zip, input |
| from scapy.compat import plain_str |
| |
# Select the capture backend: the pcap and dnet backends are turned off and
# winpcapy is turned on
conf.use_pcap = False
conf.use_dnet = False
conf.use_winpcapy = True

# True when the interpreter reports an 'nt' OS
WINDOWS = (os.name == 'nt')
# Cached answer of is_new_release(); it is assigned further down in this
# module (see "Auto-detect release") once conf.prog has been set up
NEW_RELEASE = None
| |
# Hot-patching socket: define the protocol numbers below on the socket module
# whenever the running platform does not provide them
import socket
for _proto_name, _proto_number in (("IPPROTO_IPIP", 4),
                                   ("IPPROTO_AH", 51),
                                   ("IPPROTO_ESP", 50),
                                   ("IPPROTO_GRE", 47)):
    if not hasattr(socket, _proto_name):
        setattr(socket, _proto_name, _proto_number)
# Do not leave the loop variables behind in the module namespace
del _proto_name, _proto_number
| |
| from scapy.arch import pcapdnet |
| from scapy.arch.pcapdnet import * |
| |
# Path of the WlanHelper.exe tool, built from NPCAP_PATH.
# NOTE(review): NPCAP_PATH is not defined in this file; presumably it comes
# from the scapy.arch.pcapdnet star-import above -- confirm.
_WlanHelper = NPCAP_PATH + "\\WlanHelper.exe"
| |
| import scapy.consts |
| |
def is_new_release(ignoreVBS=False):
    """Tell whether the running Windows counts as a "new" release.

    Returns True right away when the cached NEW_RELEASE flag is set and
    powershell was found. Returns False when powershell was not found,
    unless ignoreVBS is true. Otherwise the answer comes from
    platform.release(): True when it parses as a number >= 8, or when it is
    the string "post2008Server".
    """
    has_powershell = conf.prog.powershell is not None
    if has_powershell and NEW_RELEASE:
        return True
    if not (has_powershell or ignoreVBS):
        return False
    version = platform.release()
    try:
        return float(version) >= 8
    except ValueError:
        # Non numeric release names (e.g. "XP")
        return version == "post2008Server"
| |
| def _encapsulate_admin(cmd): |
| """Encapsulate a command with an Administrator flag""" |
| # To get admin access, we start a new powershell instance with admin |
| # rights, which will execute the command |
| return "Start-Process PowerShell -windowstyle hidden -Wait -Verb RunAs -ArgumentList '-command &{%s}'" % cmd |
| |
| class _PowershellManager(Thread): |
| """Instance used to send multiple commands on the same Powershell process. |
| Will be instantiated on loading and automatically stopped. |
| """ |
| def __init__(self): |
| # Start & redirect input |
| if conf.prog.powershell: |
| self.process = sp.Popen([conf.prog.powershell, |
| "-NoLogo", "-NonInteractive", # Do not print headers |
| "-Command", "-"], # Listen commands from stdin |
| stdout=sp.PIPE, |
| stdin=sp.PIPE, |
| stderr=sp.STDOUT) |
| self.cmd = False |
| else: # Fallback on CMD (powershell-only commands will fail, but scapy use the VBS fallback) |
| self.process = sp.Popen([conf.prog.cmd], |
| stdout=sp.PIPE, |
| stdin=sp.PIPE, |
| stderr=sp.STDOUT) |
| self.cmd = True |
| self.buffer = [] |
| self.running = True |
| self.query_complete = Event() |
| Thread.__init__(self) |
| self.daemon = True |
| self.start() |
| if self.cmd: |
| self.query(["echo @off"]) # Remove header |
| else: |
| self.query(["$FormatEnumerationLimit=-1"]) # Do not crop long IP lists |
| |
| def run(self): |
| while self.running: |
| read_line = self.process.stdout.readline().strip() |
| if read_line == b"scapy_end": |
| self.query_complete.set() |
| else: |
| self.buffer.append(read_line.decode("utf8", "ignore") if six.PY3 else read_line) |
| |
| def query(self, command): |
| self.query_complete.clear() |
| if not self.running: |
| self.__init__(self) |
| # Call powershell query using running process |
| self.buffer = [] |
| # 'scapy_end' is used as a marker of the end of execution |
| query = " ".join(command) + ("&" if self.cmd else ";") + " echo scapy_end\n" |
| self.process.stdin.write(query.encode()) |
| self.process.stdin.flush() |
| self.query_complete.wait() |
| return self.buffer[1:] # Crops first line: the command |
| |
| def close(self): |
| self.running = False |
| try: |
| self.process.stdin.write("exit\n") |
| self.process.terminate() |
| except: |
| pass |
| |
| def _exec_query_ps(cmd, fields): |
| """Execute a PowerShell query, using the cmd command, |
| and select and parse the provided fields. |
| """ |
| if not conf.prog.powershell: |
| raise OSError("Scapy could not detect powershell !") |
| # Build query |
| query_cmd = cmd + ['|', 'select %s' % ', '.join(fields), # select fields |
| '|', 'fl', # print as a list |
| '|', 'out-string', '-Width', '4096'] # do not crop |
| l=[] |
| # Ask the powershell manager to process the query |
| stdout = POWERSHELL_PROCESS.query(query_cmd) |
| # Process stdout |
| for line in stdout: |
| if not line.strip(): # skip empty lines |
| continue |
| sl = line.split(':', 1) |
| if len(sl) == 1: |
| l[-1] += sl[0].strip() |
| continue |
| else: |
| l.append(sl[1].strip()) |
| if len(l) == len(fields): |
| yield l |
| l=[] |
| |
| def _vbs_exec_code(code, split_tag="@"): |
| if not conf.prog.cscript: |
| raise OSError("Scapy could not detect cscript !") |
| tmpfile = tempfile.NamedTemporaryFile(mode="wb", suffix=".vbs", delete=False) |
| tmpfile.write(raw(code)) |
| tmpfile.close() |
| ps = sp.Popen([conf.prog.cscript, tmpfile.name], |
| stdout=sp.PIPE, stderr=open(os.devnull), |
| universal_newlines=True) |
| for _ in range(3): |
| # skip 3 first lines |
| ps.stdout.readline() |
| for line in ps.stdout: |
| data = line.replace("\n", "").split(split_tag) |
| for l in data: |
| yield l |
| os.unlink(tmpfile.name) |
| |
| def _vbs_get_hardware_iface_guid(devid): |
| try: |
| devid = str(int(devid) + 1) |
| guid = next(iter(_vbs_exec_code("""WScript.Echo CreateObject("WScript.Shell").RegRead("HKEY_LOCAL_MACHINE\\SOFTWARE\\Microsoft\\Windows NT\\CurrentVersion\\NetworkCards\\%s\\ServiceName") |
| """ % devid))) |
| guid = guid[:-1] if guid.endswith('}\n') else guid |
| if guid.startswith('{') and guid.endswith('}'): |
| return guid |
| except StopIteration: |
| return None |
| |
# Some names differ between VBS and PS: this table maps, per WMI class, the
# PowerShell field name to the one used under VBS. The "*" entry is the
# fallback used for classes that have no entry of their own.
## None: field will not be returned under VBS
_VBS_WMI_FIELDS = {
    "Win32_NetworkAdapter": {
        "InterfaceDescription": "Description",
        # Note: when using VBS, the GUID is not the same as with Powershell,
        # so we get the device ID instead, then use
        # _vbs_get_hardware_iface_guid to get its real GUID
        "GUID": "DeviceID"
    },
    "*": {
        "Status": "State"
    }
}

# Per WMI class: textual replacements applied to the generated VBS statement
# (here the IPAddress array is joined into a "{a, b}" string)
_VBS_WMI_REPLACE = {
    "Win32_NetworkAdapterConfiguration": {
        "line.IPAddress": "\"{\" & Join( line.IPAddress, \", \" ) & \"}\"",
    }
}

# Per WMI class: functions applied to the value returned for a given field
_VBS_WMI_OUTPUT = {
    "Win32_NetworkAdapter": {
        "DeviceID": _vbs_get_hardware_iface_guid,
    }
}
| |
| def _exec_query_vbs(cmd, fields): |
| """Execute a query using VBS. Currently Get-WmiObject, Get-Service |
| queries are supported. |
| |
| """ |
| if not(len(cmd) == 2 and cmd[0] in ["Get-WmiObject", "Get-Service"]): |
| return |
| action = cmd[0] |
| fields = [_VBS_WMI_FIELDS.get(cmd[1], _VBS_WMI_FIELDS.get("*", {})).get(fld, fld) for fld in fields] |
| parsed_command = "WScript.Echo " + " & \" @ \" & ".join("line.%s" % fld for fld in fields |
| if fld is not None) |
| # The IPAddress is an array: convert it to a string |
| for key,val in _VBS_WMI_REPLACE.get(cmd[1], {}).items(): |
| parsed_command = parsed_command.replace(key, val) |
| if action == "Get-WmiObject": |
| values = _vbs_exec_code("""Set wmi = GetObject("winmgmts:") |
| Set lines = wmi.InstancesOf("%s") |
| On Error Resume Next |
| Err.clear |
| For Each line in lines |
| %s |
| Next |
| """ % (cmd[1], parsed_command), "@") |
| elif action == "Get-Service": |
| values = _vbs_exec_code("""serviceName = "%s" |
| Set wmi = GetObject("winmgmts://./root/cimv2") |
| Set line = wmi.Get("Win32_Service.Name='" & serviceName & "'") |
| %s |
| """ % (cmd[1], parsed_command), "@") |
| |
| while True: |
| yield [None if fld is None else |
| _VBS_WMI_OUTPUT.get(cmd[1], {}).get(fld, lambda x: x)( |
| next(values).strip() |
| ) |
| for fld in fields] |
| |
def exec_query(cmd, fields):
    """Run a system query and return its result.

    The PowerShell implementation is used when powershell was detected;
    otherwise the VBS/cscript implementation is the fallback.
    """
    backend = _exec_query_vbs if conf.prog.powershell is None else _exec_query_ps
    return backend(cmd, fields)
| |
| def _where(filename, dirs=None, env="PATH"): |
| """Find file in current dir, in deep_lookup cache or in system path""" |
| if dirs is None: |
| dirs = [] |
| if not isinstance(dirs, list): |
| dirs = [dirs] |
| if glob(filename): |
| return filename |
| paths = [os.curdir] + os.environ[env].split(os.path.pathsep) + dirs |
| for path in paths: |
| for match in glob(os.path.join(path, filename)): |
| if match: |
| return os.path.normpath(match) |
| raise IOError("File not found: %s" % filename) |
| |
def win_find_exe(filename, installsubdir=None, env="ProgramFiles"):
    """Find executable in current dir, system path or given ProgramFiles subdir

    filename is tried with an ".exe" suffix first (unless it already has
    one), then as given. When installsubdir is set, that sub-directory of
    the directory named by the env environment variable is searched as
    well; if the variable is not set, this extra directory is skipped.

    Returns the path found, or None.
    """
    fns = [filename] if filename.endswith(".exe") else [filename + ".exe", filename]
    extra_dirs = None
    if installsubdir is not None:
        # A missing variable must not abort the whole lookup with KeyError
        base_dir = os.environ.get(env)
        if base_dir:
            extra_dirs = [os.path.join(base_dir, installsubdir)]
    for fn in fns:
        try:
            return _where(fn, dirs=extra_dirs)
        except IOError:
            continue
    return None
| |
| |
class WinProgPath(ConfClass):
    """Paths of the external programs used by Scapy on Windows.

    Each attribute holds the path returned by win_find_exe (None when the
    program was not found), except display which is always _default.
    os_access tells whether powershell or cscript is available.
    """
    _default = "<System default>"

    def __init__(self):
        self._reload()

    def _reload(self):
        """(Re)run the detection of every program."""
        # We try some magic to find the appropriate executables
        for attr, exe_name in (("pdfreader", "AcroRd32"),
                               ("psreader", "gsview32"),
                               ("dot", "dot"),
                               ("tcpdump", "windump"),
                               ("tshark", "tshark"),
                               ("tcpreplay", "tcpreplay")):
            setattr(self, attr, win_find_exe(exe_name))
        self.display = self._default
        self.hexedit = win_find_exe("hexer")
        self.sox = win_find_exe("sox")
        self.wireshark = win_find_exe("wireshark", "wireshark")
        # System tools are also searched under %SystemRoot%
        self.powershell = win_find_exe(
            "powershell",
            installsubdir="System32\\WindowsPowerShell\\v1.0",
            env="SystemRoot"
        )
        system32 = dict(installsubdir="System32", env="SystemRoot")
        self.cscript = win_find_exe("cscript", **system32)
        self.cmd = win_find_exe("cmd", **system32)
        if self.wireshark:
            # The "manuf" file is expected next to the wireshark executable
            wireshark_dir = os.path.sep.join(self.wireshark.split(os.path.sep)[:-1])
            manu_path = load_manuf(wireshark_dir + os.path.sep + "manuf")
            scapy.data.MANUFDB = conf.manufdb = manu_path

        self.os_access = (self.powershell is not None) or (self.cscript is not None)
| |
# Detect the external programs once, when this module is loaded
conf.prog = WinProgPath()
if not conf.prog.os_access:
    warning("Scapy did not detect powershell and cscript ! Routes, interfaces and much more won't work !", onlyOnce=True)

# When windump is installed and Npcap is in use, check that this windump
# build mentions npcap (and not winpcap) in its help output
if conf.prog.tcpdump and conf.use_npcap and conf.prog.os_access:
    def test_windump_npcap():
        """Return whether windump version is correct or not"""
        try:
            p_test_windump = sp.Popen([conf.prog.tcpdump, "-help"], stdout=sp.PIPE, stderr=sp.STDOUT)
            stdout, err = p_test_windump.communicate()
            _output = stdout.lower()
            return b"npcap" in _output and not b"winpcap" in _output
        except:
            # Any failure while running windump counts as "not correct"
            return False
    windump_ok = test_windump_npcap()
    if not windump_ok:
        warning("The installed Windump version does not work with Npcap ! Refer to 'Winpcap/Npcap conflicts' in scapy's doc", onlyOnce=True)
    del windump_ok

# Auto-detect release
NEW_RELEASE = is_new_release()
| |
class PcapNameNotFoundError(Scapy_Exception):
    """Raised when no pcap device name is found for a Windows interface."""
| |
def is_interface_valid(iface):
    """Tell whether an interface dictionary has a non-empty "guid".

    As a side effect, the "mac" entry of a valid interface (when present)
    is rewritten in place with ':' instead of '-' as separator.
    """
    if "guid" not in iface or not iface["guid"]:
        return False
    # Fix '-' instead of ':'
    if "mac" in iface:
        iface["mac"] = iface["mac"].replace("-", ":")
    return True
| |
def get_windows_if_list():
    """Return the Windows interfaces, as a list of dictionaries.

    Each dictionary has the keys 'name', 'win_index', 'description',
    'guid', 'mac' and 'netid'; interfaces rejected by is_interface_valid
    are left out. The list is empty when neither powershell nor cscript is
    available.
    """
    if not conf.prog.os_access:
        return []
    if is_new_release():
        # This works only starting from Windows 8/2012 and up. For older Windows another solution is needed
        # Careful: this is weird, but Get-NetAdapter works like: (Name isn't the interface name)
        # Name                      InterfaceDescription                    ifIndex Status       MacAddress             LinkSpeed
        # ----                      --------------------                    ------- ------       ----------             ---------
        # Ethernet                  Killer E2200 Gigabit Ethernet Contro...      13 Up           D0-50-99-56-DD-F9         1 Gbps
        command = ['Get-NetAdapter']
        # It is normal that it is in this order
        columns = ['InterfaceDescription', 'InterfaceIndex', 'Name',
                   'InterfaceGuid', 'MacAddress', 'InterfaceAlias']
    else:
        command = ['Get-WmiObject', 'Win32_NetworkAdapter']
        columns = ['Name', 'InterfaceIndex', 'InterfaceDescription',
                   'GUID', 'MacAddress', 'NetConnectionID']
    keys = ['name', 'win_index', 'description', 'guid', 'mac', 'netid']
    interfaces = []
    for line in exec_query(command, columns):
        iface = dict(zip(keys, line))
        if is_interface_valid(iface):
            interfaces.append(iface)
    return interfaces
| |
def get_ips(v6=False):
    """Returns all available IPs matching to interfaces, using the windows system.
    Should only be used as a WinPcapy fallback.

    The result maps the adapter description to one address. The address
    list reported for the adapter is split once on ","; the first part is
    used by default and the second part when v6 is true. Adapters that do
    not have the requested part are left out.
    """
    res = {}
    for descr, ipaddr in exec_query(['Get-WmiObject',
                                     'Win32_NetworkAdapterConfiguration'],
                                    ['Description', 'IPAddress']):
        if not ipaddr.strip():
            continue
        parts = ipaddr.split(",", 1)
        # With a single address there is no second part to pick for v6
        if len(parts) > int(v6):
            res[descr] = parts[v6].strip('{}').strip()
    return res
| |
def get_ip_from_name(ifname, v6=False):
    """Deprecated, kept for backward compatibility.

    Looks ifname up in the result of get_ips and returns "" when it is
    not there.
    """
    addresses = get_ips(v6=v6)
    return addresses.get(ifname, "")
| |
class NetworkInterface(object):
    """A network interface of your local host"""

    def __init__(self, data=None):
        self.name = None
        self.ip = None
        self.mac = None
        self.pcap_name = None
        self.description = None
        # Defined up front so that an interface built without data can
        # still be printed
        self.win_index = None
        self.guid = None
        self.data = data
        self.invalid = False
        self.raw80211 = None
        if data is not None:
            self.update(data)

    def update(self, data):
        """Update info about network interface according to given dnet dictionary

        data must have the keys 'name', 'description', 'win_index' and
        'guid' (KeyError otherwise); 'netid', 'invalid' and 'mac' are
        optional. PcapNameNotFoundError is raised when no pcap device
        matches the GUID of a valid interface.
        """
        if 'netid' in data and data['netid'] == scapy.consts.LOOPBACK_NAME:
            # Force LOOPBACK_NAME: Some Windows systems overwrite 'name'
            self.name = scapy.consts.LOOPBACK_NAME
        else:
            self.name = data['name']
        self.description = data['description']
        self.win_index = data['win_index']
        self.guid = data['guid']
        if 'invalid' in data:
            self.invalid = data['invalid']
        # Other attributes are optional
        self._update_pcapdata()

        try:
            # Npcap loopback interface
            if self.name == scapy.consts.LOOPBACK_NAME and conf.use_npcap:
                # https://nmap.org/npcap/guide/npcap-devguide.html
                self.mac = "00:00:00:00:00:00"
                self.ip = "127.0.0.1"
                conf.cache_ipaddrs[self.pcap_name] = socket.inet_aton(self.ip)
                return
            else:
                self.mac = data['mac']
        except KeyError:
            pass

        try:
            self.ip = socket.inet_ntoa(get_if_raw_addr(self))
        except (TypeError, NameError):
            pass

        try:
            # Windows native loopback interface
            if not self.ip and self.name == scapy.consts.LOOPBACK_NAME:
                self.ip = "127.0.0.1"
                conf.cache_ipaddrs[self.pcap_name] = socket.inet_aton(self.ip)
        except (KeyError, AttributeError, NameError) as e:
            print(e)

    def _update_pcapdata(self):
        """Set pcap_name to the pcap device whose name ends with our GUID"""
        if self.is_invalid():
            return
        for i in get_if_list():
            # Use the GUID that update() has just stored, not self.data,
            # which may be None or out of date
            if i.endswith(self.guid):
                self.pcap_name = i
                return

        raise PcapNameNotFoundError

    def is_invalid(self):
        return self.invalid

    def _check_npcap_requirement(self):
        """Raise OSError without Npcap, and Scapy_Exception when the
        interface is not known to support raw 802.11"""
        if not conf.use_npcap:
            raise OSError("This operation requires Npcap.")
        if self.raw80211 is None:
            # This checks if npcap has Dot11 enabled and if the interface is compatible,
            # by looking for the npcap/Parameters/Dot11Adapters key in the registry.
            try:
                dot11adapters = next(iter(_vbs_exec_code("""WScript.Echo CreateObject("WScript.Shell").RegRead("HKEY_LOCAL_MACHINE\\SYSTEM\\CurrentControlSet\\Services\\npcap\\Parameters\\Dot11Adapters")""")))
            except StopIteration:
                pass
            else:
                self.raw80211 = ("\\Device\\" + self.guid).lower() in dot11adapters.lower()
        if not self.raw80211:
            raise Scapy_Exception("This interface does not support raw 802.11")

    def _npcap_get(self, key):
        """Run "WlanHelper <guid> <key>" and return its stripped output"""
        self._check_npcap_requirement()
        return sp.Popen([_WlanHelper, self.guid[1:-1], key], stdout=sp.PIPE).communicate()[0].strip()

    def _npcap_set(self, key, value):
        """Run "WlanHelper <guid> <key> <value>" and return its exit code"""
        self._check_npcap_requirement()
        # An argument list avoids any quoting issue with the value
        return sp.call([_WlanHelper, self.guid[1:-1], key, value])

    def mode(self):
        """Get the interface operation mode.
        Only available with Npcap."""
        return self._npcap_get("mode")

    def availablemodes(self):
        """Get all available interface modes.
        Only available with Npcap."""
        # According to https://nmap.org/npcap/guide/npcap-devguide.html#npcap-feature-dot11
        # The output is made of bytes: split on a bytes separator
        return self._npcap_get("modes").split(b",")

    def setmode(self, mode):
        """Set the interface mode. It can be:
        - 0 or managed: Managed Mode (aka "Extensible Station Mode")
        - 1 or monitor: Monitor Mode (aka "Network Monitor Mode")
        - 2 or master: Master Mode (aka "Extensible Access Point") (supported from Windows 7 and later)
        - 3 or wfd_device: The Wi-Fi Direct Device operation mode (supported from Windows 8 and later)
        - 4 or wfd_owner: The Wi-Fi Direct Group Owner operation mode (supported from Windows 8 and later)
        - 5 or wfd_client: The Wi-Fi Direct Client operation mode (supported from Windows 8 and later)
        Only available with Npcap."""
        # According to https://nmap.org/npcap/guide/npcap-devguide.html#npcap-feature-dot11
        _modes = {
            0: "managed",
            1: "monitor",
            2: "master",
            3: "wfd_device",
            4: "wfd_owner",
            5: "wfd_client"
        }
        m = _modes.get(mode, "unknown") if isinstance(mode, int) else mode
        return self._npcap_set("mode", m)

    def channel(self):
        """Get the channel of the interface.
        Only available with Npcap."""
        # According to https://nmap.org/npcap/guide/npcap-devguide.html#npcap-feature-dot11
        return self._npcap_get("channel")

    def setchannel(self, channel):
        """Set the channel of the interface (1-14):
        Only available with Npcap."""
        # According to https://nmap.org/npcap/guide/npcap-devguide.html#npcap-feature-dot11
        return self._npcap_set("channel", str(channel))

    def frequence(self):
        """Get the frequence of the interface.
        Only available with Npcap."""
        # According to https://nmap.org/npcap/guide/npcap-devguide.html#npcap-feature-dot11
        return self._npcap_get("freq")

    def setfrequence(self, freq):
        """Set the frequence of the interface.
        Only available with Npcap."""
        # According to https://nmap.org/npcap/guide/npcap-devguide.html#npcap-feature-dot11
        return self._npcap_set("freq", str(freq))

    def availablemodulations(self):
        """Get all available 802.11 interface modulations.
        Only available with Npcap."""
        # According to https://nmap.org/npcap/guide/npcap-devguide.html#npcap-feature-dot11
        return self._npcap_get("modus").split(b",")

    def modulation(self):
        """Get the 802.11 modulation of the interface.
        Only available with Npcap."""
        # According to https://nmap.org/npcap/guide/npcap-devguide.html#npcap-feature-dot11
        return self._npcap_get("modu")

    def setmodulation(self, modu):
        """Set the interface modulation. It can be:
        - 0: dsss
        - 1: fhss
        - 2: irbaseband
        - 3: ofdm
        - 4: hrdss
        - 5: erp
        - 6: ht
        - 7: vht
        - 8: ihv
        - 9: mimo-ofdm
        - 10: mimo-ofdm
        Only available with Npcap."""
        # According to https://nmap.org/npcap/guide/npcap-devguide.html#npcap-feature-dot11
        _modus = {
            0: "dsss",
            1: "fhss",
            2: "irbaseband",
            3: "ofdm",
            4: "hrdss",
            5: "erp",
            6: "ht",
            7: "vht",
            8: "ihv",
            9: "mimo-ofdm",
            10: "mimo-ofdm",
        }
        m = _modus.get(modu, "unknown") if isinstance(modu, int) else modu
        # The modulation is set with the "modu" sub-command ("mode" would
        # change the operation mode instead)
        return self._npcap_set("modu", m)

    def __repr__(self):
        return "<%s %s %s>" % (self.__class__.__name__, self.name, self.guid)
| |
def pcap_service_name():
    """Name of the pcap adapter service: "npcap" with Npcap, else "npf"."""
    if conf.use_npcap:
        return "npcap"
    return "npf"
| |
def pcap_service_status():
    """Query the state of the windows pcap adapter service.

    Returns the tuple (name, description, started), started being True
    when the reported status is "running". (None, None, None) is returned
    when the service does not show up in the query result.
    """
    columns = ['Name', 'DisplayName', 'Status']
    for record in exec_query(['Get-Service', pcap_service_name()], columns):
        name, description, state = record[0], record[1], record[2]
        started = state.lower().strip() == 'running'
        if name == pcap_service_name():
            return (name, description, started)
    return (None, None, None)
| |
def pcap_service_control(action, askadmin=True):
    """Run a service cmdlet (action) on the pcap service, through PowerShell.

    With askadmin, the command is wrapped by _encapsulate_admin first.
    Returns False when powershell is not available; otherwise True unless
    the word "error" shows up in the command output.
    """
    if not conf.prog.powershell:
        return False
    service_command = ' '.join((action, pcap_service_name()))
    if askadmin:
        service_command = _encapsulate_admin(service_command)
    output = "".join(POWERSHELL_PROCESS.query([service_command]))
    return "error" not in output.lower()
| |
def pcap_service_start(askadmin=True):
    """Start the pcap adapter service with the Start-Service cmdlet.

    Admin rights are requested unless askadmin is false. Returns True on
    success.
    """
    started = pcap_service_control('Start-Service', askadmin=askadmin)
    return started
| |
def pcap_service_stop(askadmin=True):
    """Stop the pcap adapter service with the Stop-Service cmdlet.

    Admin rights are requested unless askadmin is false. Returns True on
    success.
    """
    stopped = pcap_service_control('Stop-Service', askadmin=askadmin)
    return stopped
| |
| from scapy.modules.six.moves import UserDict |
| |
class NetworkInterfaceDict(UserDict):
    """Store information about network interfaces and convert between names"""
    def load_from_powershell(self):
        """Fill the dictionary (GUID -> NetworkInterface) from the system.

        When nothing could be loaded and the pcap service is found
        stopped, the service is started (after asking the user in
        interactive mode) and the load is tried once more.
        """
        if not conf.prog.os_access:
            return
        ifaces_ips = None
        for i in get_windows_if_list():
            try:
                interface = NetworkInterface(i)
                self.data[interface.guid] = interface
                # If no IP address was detected using winpcap and if
                # the interface is not the loopback one, look for
                # internal windows interfaces
                if not interface.ip:
                    # ifaces_ips is used as a cache. It is compared with
                    # None so that an empty result is cached as well
                    if ifaces_ips is None:
                        ifaces_ips = get_ips()
                    # If it exists, retrieve the interface's IP from the cache
                    interface.ip = ifaces_ips.get(interface.name, "")
            except (KeyError, PcapNameNotFoundError):
                pass

        if not self.data and conf.use_winpcapy:
            _detect = pcap_service_status()

            def _ask_user():
                if not conf.interactive:
                    return False
                # Loop until a recognized answer is given
                while True:
                    _confir = input("Do you want to start it ? (yes/no) [y]: ").lower().strip()
                    if _confir in ["yes", "y", ""]:
                        return True
                    elif _confir in ["no", "n"]:
                        return False
            _error_msg = "No match between your pcap and windows network interfaces found. "
            if _detect[0] and not _detect[2] and not getattr(self, "restarted_adapter", False):
                warning("Scapy has detected that your pcap service is not running !")
                if not conf.interactive or _ask_user():
                    succeed = pcap_service_start(askadmin=conf.interactive)
                    # Remember the attempt: it is made only once
                    self.restarted_adapter = True
                    if succeed:
                        log_loading.info("Pcap service started !")
                        self.load_from_powershell()
                        return
                _error_msg = "Could not start the pcap service ! "
            warning(_error_msg +
                    "You probably won't be able to send packets. "
                    "Deactivating unneeded interfaces and restarting Scapy might help. "
                    "Check your winpcap and powershell installation, and access rights.", onlyOnce=True)
        else:
            # Loading state: remove invalid interfaces
            self.remove_invalid_ifaces()
            # Replace LOOPBACK_INTERFACE
            try:
                scapy.consts.LOOPBACK_INTERFACE = self.dev_from_name(
                    scapy.consts.LOOPBACK_NAME,
                )
            except ValueError:
                # No loopback interface in the list: keep the current value
                pass

    def dev_from_name(self, name):
        """Return the first pcap device name for a given Windows
        device name.

        Raises ValueError when the name is unknown.
        """
        for iface in six.itervalues(self):
            if iface.name == name:
                return iface
        raise ValueError("Unknown network interface %r" % name)

    def dev_from_pcapname(self, pcap_name):
        """Return Windows device name for given pcap device name.

        Raises ValueError when the pcap name is unknown.
        """
        for iface in six.itervalues(self):
            if iface.pcap_name == pcap_name:
                return iface
        raise ValueError("Unknown pypcap network interface %r" % pcap_name)

    def dev_from_index(self, if_index):
        """Return interface name from interface index

        Raises ValueError when the index is unknown.
        """
        for iface in six.itervalues(self):
            if iface.win_index == str(if_index):
                return iface
        if str(if_index) == "1":
            # Test if the loopback interface is set up
            if isinstance(scapy.consts.LOOPBACK_INTERFACE, NetworkInterface):
                return scapy.consts.LOOPBACK_INTERFACE
        raise ValueError("Unknown network interface index %r" % if_index)

    def remove_invalid_ifaces(self):
        """Remove all invalid interfaces"""
        # Iterate over a copy of the keys: entries are removed on the way
        for devname in list(self.keys()):
            iface = self.data[devname]
            if iface.is_invalid():
                self.data.pop(devname)

    def reload(self):
        """Reload interface list"""
        self.restarted_adapter = False
        self.data.clear()
        self.load_from_powershell()

    def show(self, resolve_mac=True, print_result=True):
        """Print list of available network interfaces in human readable form

        With print_result false, the table is returned instead of printed.
        """
        res = []
        for iface_name in sorted(self.data):
            dev = self.data[iface_name]
            mac = dev.mac
            if resolve_mac and conf.manufdb:
                mac = conf.manufdb._resolve_MAC(mac)
            res.append((str(dev.win_index).ljust(5), str(dev.name).ljust(35), str(dev.ip).ljust(15), mac))

        res = pretty_list(res, [("INDEX", "IFACE", "IP", "MAC")])
        if print_result:
            print(res)
        else:
            return res

    def __repr__(self):
        return self.show(print_result=False)
| |
# Init POWERSHELL_PROCESS: the shell process is started when this module is
# loaded and is shared by all the queries below
POWERSHELL_PROCESS = _PowershellManager()

# Module-wide interface table, filled once at load time
IFACES = NetworkInterfaceDict()
IFACES.load_from_powershell()
| |
def pcapname(dev):
    """Return pypcap device name for given interface or libdnet/Scapy
    device name.

    A NetworkInterface gives its own pcap_name, or None when it is
    invalid. Anything else is looked up by name in IFACES; an unknown name
    raises ValueError, unless conf.use_pcap is set, in which case None is
    returned.
    """
    if isinstance(dev, NetworkInterface):
        return None if dev.is_invalid() else dev.pcap_name
    try:
        return IFACES.dev_from_name(dev).pcap_name
    except ValueError:
        if not conf.use_pcap:
            raise
        # pcap.pcap() will choose a sensible default for sniffing if
        # iface=None
        return None
| |
def dev_from_pcapname(pcap_name):
    """Look up, in IFACES, the interface that has the given pcap device name"""
    lookup = IFACES.dev_from_pcapname
    return lookup(pcap_name)
| |
def dev_from_index(if_index):
    """Look up, in IFACES, the interface that has the given Windows index"""
    lookup = IFACES.dev_from_index
    return lookup(if_index)
| |
def show_interfaces(resolve_mac=True):
    """Print the table of the interfaces stored in IFACES.

    resolve_mac is handed over to IFACES.show, whose result is returned.
    """
    result = IFACES.show(resolve_mac)
    return result
| |
# Wrap pcapdnet.open_pcap: the interface argument is translated with
# pcapname() before the original function is called
_orig_open_pcap = pcapdnet.open_pcap
pcapdnet.open_pcap = lambda iface,*args,**kargs: _orig_open_pcap(pcapname(iface),*args,**kargs)

# Replace get_if_raw_hwaddr, here and in pcapdnet: the MAC address comes from
# the IFACES table and the hardware type is always ARPHDR_ETHER.
# NOTE(review): ARPHDR_ETHER and mac2str are not imported in the visible part
# of this file; presumably provided by a star-import -- confirm.
get_if_raw_hwaddr = pcapdnet.get_if_raw_hwaddr = lambda iface, *args, **kargs: (
    ARPHDR_ETHER, mac2str(IFACES.dev_from_pcapname(pcapname(iface)).mac)
)
| |
def _read_routes_xp():
    """Read the IPv4 routes from Win32_IP4RouteTable, the Windows XP way.

    Returns a list of (network, mask, gateway, interface, interface ip,
    metric) tuples.
    """
    # The InterfaceIndex in Win32_IP4RouteTable does not match the
    # InterfaceIndex in Win32_NetworkAdapter under some platforms
    # (namely Windows XP): let's try an IP association
    # map local IP addresses to interfaces
    local_addresses = {}
    for known_iface in six.itervalues(IFACES):
        local_addresses[known_iface.ip] = known_iface
    routes = []
    pending = []  # routes whose interface is not known yet
    iface_indexes = {}
    for line in exec_query(['Get-WmiObject', 'Win32_IP4RouteTable'],
                           ['Name', 'Mask', 'NextHop', 'InterfaceIndex', 'Metric1']):
        next_hop = line[2]
        if_index = line[3]
        if next_hop in local_addresses:
            iface = local_addresses[next_hop]
            # This gives us an association InterfaceIndex <-> interface
            iface_indexes[if_index] = iface
            routes.append((atol(line[0]), atol(line[1]), "0.0.0.0", iface,
                           iface.ip, int(line[4])))
        else:
            pending.append((atol(line[0]), atol(line[1]), next_hop,
                            if_index, int(line[4])))
    # Second pass: keep the routes whose index got associated above
    routes.extend(
        (dst, mask, gw, iface_indexes[ifidx], iface_indexes[ifidx].ip, metric)
        for dst, mask, gw, ifidx, metric in pending
        if ifidx in iface_indexes
    )
    return routes
| |
def _read_routes_7():
    """Read the IPv4 routes from Win32_IP4RouteTable.

    Returns a list of (network, mask, gateway, interface, interface ip,
    metric) tuples; the lines that raise ValueError (e.g. unknown
    interface index) are skipped.
    """
    routes = []
    query = exec_query(['Get-WmiObject', 'Win32_IP4RouteTable'],
                       ['Name', 'Mask', 'NextHop', 'InterfaceIndex', 'Metric1'])
    for line in query:
        try:
            if_index = line[3]
            iface = dev_from_index(if_index)
            if if_index == "1":
                # Force loopback on iface 1
                ip = "127.0.0.1"
            else:
                ip = iface.ip
            route = (atol(line[0]), atol(line[1]), line[2], iface, ip, int(line[4]))
            routes.append(route)
        except ValueError:
            continue
    return routes
| |
def read_routes():
    """Build the IPv4 routing table with the reader fitting the release.

    Returns a list of routes, empty when neither powershell nor cscript is
    available. A failure of the reader is turned into a warning, and so is
    an empty result.
    """
    routes = []
    if not conf.prog.os_access:
        return routes
    windows_release = platform.release()
    try:
        if is_new_release():
            reader = _read_routes_post2008
        elif windows_release == "XP":
            reader = _read_routes_xp
        else:
            reader = _read_routes_7
        routes = reader()
    except Exception as e:
        warning("Error building scapy IPv4 routing table : %s", e, onlyOnce=True)
    else:
        if not routes:
            warning("No default IPv4 routes found. Your Windows release may no be supported and you have to enter your routes manually", onlyOnce=True)
    return routes
| |
| def _get_metrics(ipv6=False): |
| """Returns a dict containing all IPv4 or IPv6 interfaces' metric, |
| ordered by their interface index. |
| """ |
| query_cmd = "netsh interface " + ("ipv6" if ipv6 else "ipv4") + " show interfaces level=verbose" |
| stdout = POWERSHELL_PROCESS.query([query_cmd]) |
| res = {} |
| _buffer = [] |
| _pattern = re.compile(".*:\s+(\d+)") |
| for _line in stdout: |
| if not _line.strip() and len(_buffer) > 0: |
| if_index = re.search(_pattern, _buffer[3]).group(1) |
| if_metric = int(re.search(_pattern, _buffer[5]).group(1)) |
| res[if_index] = if_metric |
| _buffer = [] |
| else: |
| _buffer.append(_line) |
| return res |
| |
| def _read_routes_post2008(): |
| routes = [] |
| if4_metrics = None |
| # This works only starting from Windows 8/2012 and up. For older Windows another solution is needed |
| # Get-NetRoute -AddressFamily IPV4 | select ifIndex, DestinationPrefix, NextHop, RouteMetric, InterfaceMetric | fl |
| for line in exec_query(['Get-NetRoute', '-AddressFamily IPV4'], ['ifIndex', 'DestinationPrefix', 'NextHop', 'RouteMetric', 'InterfaceMetric']): |
| try: |
| iface = dev_from_index(line[0]) |
| if iface.ip == "0.0.0.0": |
| continue |
| except: |
| continue |
| # try: |
| # intf = pcapdnet.dnet.intf().get_dst(pcapdnet.dnet.addr(type=2, addrtxt=dest)) |
| # except OSError: |
| # log_loading.warning("Building Scapy's routing table: Couldn't get outgoing interface for destination %s", dest) |
| # continue |
| dest, mask = line[1].split('/') |
| ip = "127.0.0.1" if line[0] == "1" else iface.ip # Force loopback on iface 1 |
| if not line[4].strip(): # InterfaceMetric is not available. Load it from netsh |
| if not if4_metrics: |
| if4_metrics = _get_metrics() |
| metric = int(line[3]) + if4_metrics.get(iface.win_index, 0) # RouteMetric + InterfaceMetric |
| else: |
| metric = int(line[3]) + int(line[4]) # RouteMetric + InterfaceMetric |
| routes.append((atol(dest), itom(int(mask)), |
| line[2], iface, ip, metric)) |
| return routes |
| |
| ############ |
| ### IPv6 ### |
| ############ |
| |
def in6_getifaddr():
    """
    Returns all IPv6 addresses found on the computer

    Each entry of in6_getifaddr_raw() is kept with its third item (the
    pcap device name) replaced by the matching interface; the entries
    whose device is unknown are dropped.
    """
    addresses = []
    for raw_entry in in6_getifaddr_raw():
        try:
            device = dev_from_pcapname(raw_entry[2])
        except ValueError:
            continue
        addresses.append((raw_entry[0], raw_entry[1], device))
    # Appends Npcap loopback if available
    loopback = conf.use_npcap and scapy.consts.LOOPBACK_INTERFACE
    if loopback:
        addresses.append(("::1", 0, scapy.consts.LOOPBACK_INTERFACE))
    return addresses
| |
def _append_route6(routes, dpref, dp, nh, iface, lifaddr, metric):
    """Append an IPv6 route to *routes* when it has usable source addresses.

    :param routes: list the route tuple is appended to (modified in place)
    :param dpref: destination prefix
    :param dp: destination prefix length
    :param nh: next hop
    :param iface: outgoing interface
    :param lifaddr: address entries; only those whose third item equals
        *iface* are considered as source candidates
    :param metric: route metric

    Nothing is appended for the '::' prefix on the loopback interface, nor
    when no candidate source address is found.
    """
    if iface.name == scapy.consts.LOOPBACK_NAME:
        if dpref == '::':
            return
        # The loopback interface can only source from ::1
        candidates = ['::1']
    else:
        own_addrs = (addr for addr in lifaddr if addr[2] == iface)
        candidates = construct_source_candidate_set(dpref, dp, own_addrs)
    if candidates:
        # APPEND (DESTINATION, NETMASK, NEXT HOP, IFACE, CANDIDATS, METRIC)
        routes.append((dpref, dp, nh, iface, candidates, metric))
| |
def _read_routes6_post2008():
    """Build the IPv6 routing table from PowerShell's Get-NetRoute.

    This works only starting from Windows 8/2012 and up. For older Windows
    another solution is needed.

    Equivalent PowerShell command:
    Get-NetRoute -AddressFamily IPV6 | select ifIndex, DestinationPrefix,
    NextHop, RouteMetric, InterfaceMetric | fl

    Routes whose interface cannot be resolved by dev_from_index() are
    skipped. The metric of a route is RouteMetric + InterfaceMetric; when
    InterfaceMetric is blank in the PowerShell output, it is looked up in
    the netsh-based table returned by _get_metrics(ipv6=True) instead, as
    the IPv4 reader does.

    :returns: the list of routes filled in by _append_route6()
    """
    routes6 = []
    # Interface metrics from netsh, loaded lazily and at most once
    if6_metrics = None
    lifaddr = in6_getifaddr()
    fields = ['ifIndex', 'DestinationPrefix', 'NextHop', 'RouteMetric', 'InterfaceMetric']
    for line in exec_query(['Get-NetRoute', '-AddressFamily IPV6'], fields):
        try:
            iface = dev_from_index(line[0])
        except Exception:
            # Unknown interface: ignore this route. (Exception, not a bare
            # except, so that KeyboardInterrupt/SystemExit still propagate.)
            continue

        dpref, dp = line[1].split('/')
        dp = int(dp)
        nh = line[2]
        metric = int(line[3])  # RouteMetric
        if line[4].strip():
            metric += int(line[4])  # + InterfaceMetric
        else:
            # InterfaceMetric is not available. Load it from netsh rather
            # than failing on int('') and losing the whole routing table.
            if if6_metrics is None:
                if6_metrics = _get_metrics(ipv6=True)
            metric += if6_metrics.get(iface.win_index, 0)

        _append_route6(routes6, dpref, dp, nh, iface, lifaddr, metric)
    return routes6
| |
def _read_routes6_7():
    """Build the IPv6 routing table by parsing netsh output.

    Runs "netsh interface ipv6 show route level=verbose" and reads it as a
    sequence of objects separated by empty lines. Each object is expected to
    yield exactly ten fields (one per pattern in regex_list); of those, the
    code uses field 0 as the destination prefix ("addr/len"), field 2 as the
    interface index, field 3 to detect the next hop and field 6 as the route
    metric. The interface metric from _get_metrics(ipv6=True) is added to the
    route metric.

    Objects with a different number of lines, or whose interface cannot be
    resolved by dev_from_index(), are skipped.

    :returns: the list of routes filled in by _append_route6()
    """
    # Not supported in powershell, we have to use netsh
    routes = []
    query_cmd = "netsh interface ipv6 show route level=verbose"
    stdout = POWERSHELL_PROCESS.query([query_cmd])
    lifaddr = in6_getifaddr()
    if6_metrics = _get_metrics(ipv6=True)
    # Define regexes (raw strings, so that "\s", "\d" and "\/" reach the
    # regex engine without relying on Python's invalid-escape leniency)
    r_int = [r".*:\s+(\d+)"]
    r_all = [r"(.*)"]
    r_ipv6 = [r".*:\s+([A-z|0-9|:]+(\/\d+)?)"]
    # Build regex list for each object; compiled once instead of per line
    regex_list = [re.compile(regex)
                  for regex in r_ipv6*2 + r_int + r_all*3 + r_int + r_all*3]
    nexthop_pattern = regex_list[0]
    address_pattern = re.compile(".*:.*:.*")
    current_object = []
    index = 0  # number of lines consumed for the current object
    # The extra empty line makes sure the last object is processed even when
    # the command output does not end with an empty line.
    for line in itertools.chain(stdout, [""]):
        if line.strip():
            if index >= len(regex_list):
                # More lines than expected: keep counting so the object is
                # rejected below instead of raising IndexError here.
                index += 1
            else:
                match = regex_list[index].search(line)
                if match:
                    current_object.append(match.group(1))
                    index += 1
            continue

        if not current_object:
            continue

        # End of an object: take it and reset the parser state
        fields, consumed = current_object, index
        current_object = []
        index = 0
        if consumed != len(regex_list):
            continue

        try:
            if_index = fields[2]
            iface = dev_from_index(if_index)
        except Exception:
            # Unknown interface: ignore this route. (Exception, not a bare
            # except, so that KeyboardInterrupt/SystemExit still propagate.)
            continue
        _ip = fields[0].split("/")
        dpref = _ip[0]
        dp = int(_ip[1])
        _match = nexthop_pattern.search(fields[3])
        nh = "::"
        if _match:  # Detect if Next Hop is specified (if not, it will be the IFName)
            _nhg1 = _match.group(1)
            if address_pattern.match(_nhg1):
                nh = _nhg1
        metric = int(fields[6]) + if6_metrics.get(if_index, 0)
        _append_route6(routes, dpref, dp, nh, iface, lifaddr, metric)
    return routes
| |
def read_routes6():
    """Return the IPv6 routing table, or an empty list when unavailable.

    An empty list is returned when conf.prog.os_access is false. Otherwise
    the reader is chosen with is_new_release(); any exception raised while
    building the table is reported through warning() and an empty list is
    returned.
    """
    if not conf.prog.os_access:
        return []
    try:
        if is_new_release():
            return _read_routes6_post2008()
        return _read_routes6_7()
    except Exception as e:
        warning("Error building scapy IPv6 routing table : %s", e, onlyOnce=True)
    return []
| |
def get_working_if():
    """Return the interface of the IPv4 route with the smallest netmask.

    That is the default route when one exists. When conf.route.routes is
    empty, scapy.consts.LOOPBACK_INTERFACE is returned instead.
    """
    def _netmask(route):
        # Second item of a route tuple is its netmask
        return route[1]

    try:
        widest_route = min(conf.route.routes, key=_netmask)
    except ValueError:
        # no route
        return scapy.consts.LOOPBACK_INTERFACE
    return widest_route[3]
| |
def _get_valid_guid():
    """Return a GUID usable to build a loopback adapter.

    The GUID of scapy.consts.LOOPBACK_INTERFACE is used when that interface
    is set; otherwise the GUID of the first interface in IFACES for which
    is_invalid() is false. Returns None when no such interface exists.
    """
    if not scapy.consts.LOOPBACK_INTERFACE:
        valid_guids = (iface.guid for iface in six.itervalues(IFACES)
                       if not iface.is_invalid())
        return next(valid_guids, None)
    return scapy.consts.LOOPBACK_INTERFACE.guid
| |
def route_add_loopback(routes=None, ipv6=False, iflist=None):
    """Add a route to 127.0.0.1 and ::1 to simplify unit tests on Windows

    A fake loopback NetworkInterface is built from the GUID returned by
    _get_valid_guid(). Nothing is done (besides warnings) when not running
    on Windows, when the relevant routing table (conf.route6 if *ipv6*,
    conf.route otherwise) is empty, or when no valid GUID is available.

    :param routes: when None, the fake IPv4 and IPv6 loopback routes are
        appended to conf.route.routes / conf.route6.routes and both caches
        are invalidated. Otherwise the routes are appended to this list
        instead (the IPv6 ones if *ipv6*, the IPv4 one otherwise).
    :param ipv6: selects which routing table is checked for emptiness and
        which routes are appended to *routes*.
    :param iflist: when a list is given, only the fake adapter's pcap name
        is appended to it and the function returns without touching IFACES
        or any routing table.
    """
    # Local import: the module does not import struct explicitly
    import struct
    if not WINDOWS:
        warning("Not available")
        return
    warning("This will completly mess up the routes. Testing purpose only !")
    # Add only if some adapters already exist
    if ipv6:
        if not conf.route6.routes:
            return
    else:
        if not conf.route.routes:
            return
    guid = _get_valid_guid()
    if guid is None:
        # Without a GUID the pcap name cannot be built
        warning("No valid interface found to build the loopback adapter")
        return
    data = {
        'name': scapy.consts.LOOPBACK_NAME,
        'description': "Loopback",
        'win_index': -1,
        'guid': guid,
        'invalid': False,
        'mac': '00:00:00:00:00:00',
    }
    data['pcap_name'] = six.text_type("\\Device\\NPF_" + data['guid'])
    adapter = NetworkInterface(data)
    adapter.ip = "127.0.0.1"
    # "is not None" so that a caller-supplied empty list is filled too
    if iflist is not None:
        iflist.append(adapter.pcap_name)
        return
    # Remove all LOOPBACK_NAME routes
    for route in list(conf.route.routes):
        iface = route[3]
        if iface.name == scapy.consts.LOOPBACK_NAME:
            conf.route.routes.remove(route)
    # Remove LOOPBACK_NAME interface
    for devname, iface in list(IFACES.items()):
        if iface.name == scapy.consts.LOOPBACK_NAME:
            IFACES.pop(devname)
    # Inject interface
    IFACES["{0XX00000-X000-0X0X-X00X-00XXXX000XXX}"] = adapter
    scapy.consts.LOOPBACK_INTERFACE = adapter
    # Point the default interfaces to the new adapter if they were loopback
    if isinstance(conf.iface, NetworkInterface):
        if conf.iface.name == scapy.consts.LOOPBACK_NAME:
            conf.iface = adapter
    if isinstance(conf.iface6, NetworkInterface):
        if conf.iface6.name == scapy.consts.LOOPBACK_NAME:
            conf.iface6 = adapter
    # Build the packed network addresses
    loop_net = struct.unpack("!I", socket.inet_aton("127.0.0.0"))[0]
    loop_mask = struct.unpack("!I", socket.inet_aton("255.0.0.0"))[0]
    # Build the fake routes
    loopback_route = (loop_net, loop_mask, "0.0.0.0", adapter, "127.0.0.1", 1)
    loopback_route6 = ('::1', 128, '::', adapter, ["::1"], 1)
    loopback_route6_custom = ("fe80::", 128, "::", adapter, ["::1"], 1)
    if routes is None:
        # Injection
        conf.route6.routes.append(loopback_route6)
        conf.route6.routes.append(loopback_route6_custom)
        conf.route.routes.append(loopback_route)
        # Flush the caches
        conf.route6.invalidate_cache()
        conf.route.invalidate_cache()
    else:
        if ipv6:
            routes.append(loopback_route6)
            routes.append(loopback_route6_custom)
        else:
            routes.append(loopback_route)
| |
| |
if not conf.use_winpcapy:

    class NotAvailableSocket(SuperSocket):
        # Placeholder socket class: instantiating it always fails
        desc = "wpcap.dll missing"

        def __init__(self, *args, **kargs):
            message = ("Sniffing and sending packets is not available: "
                       "winpcap is not installed")
            raise RuntimeError(message)

    # Every default socket class becomes the failing placeholder
    conf.L2socket = conf.L2listen = conf.L3socket = NotAvailableSocket