blob: 5b9f41c848674b2fc30863c725c5cdb84636df20 [file] [log] [blame]
% Regression tests for Scapy
# More information at http://www.secdev.org/projects/UTscapy/
############
############
+ Information on Scapy
= Setup
def expect_exception(e, c):
try:
c()
return False
except e:
return True
= Get conf
~ conf command
* Dump the current configuration
conf
IP().src
conf.loopback_name
= Test module version detection
~ conf
class FakeModule(object):
__version__ = "v1.12"
class FakeModule2(object):
__version__ = "5.143.3.12"
class FakeModule3(object):
__version__ = "v2.4.2.dev42"
from scapy.config import _version_checker
assert _version_checker(FakeModule, (1,11,5))
assert not _version_checker(FakeModule, (1,13))
assert _version_checker(FakeModule2, (5, 1))
assert not _version_checker(FakeModule2, (5, 143, 4))
assert _version_checker(FakeModule3, (2, 4, 2))
= Check Scapy version
import mock
import scapy
from scapy import _parse_tag, _version_from_git_describe
from scapy.config import _version_checker
b = Bunch(returncode=0, communicate=lambda *args, **kargs: (b"v2.4.5rc1-261-g44b98e14", None))
with mock.patch('scapy.subprocess.Popen', return_value=b):
with mock.patch('scapy.os.path.isdir', return_value=True):
class GitModuleScapy(object):
__version__ = _version_from_git_describe()
# GH3847
with mock.patch('scapy.subprocess.Popen', return_value=b):
with mock.patch('scapy.os.path.isdir', return_value=False):
try:
_version_from_git_describe()
assert False
except ValueError:
pass
assert GitModuleScapy.__version__ == '2.4.5rc1.dev261'
assert _version_checker(GitModuleScapy, (2, 4, 5))
= List layers
~ conf command
ls()
= List layers - advanced
~ conf command
with ContextManagerCaptureOutput() as cmco:
ls("IP", case_sensitive=True)
result_ls = cmco.get_output().split("\n")
assert all("IP" in x for x in result_ls if x.strip())
assert len(result_ls) >= 3
= List packet fields - ls
~ command
with ContextManagerCaptureOutput() as cmco:
ls(ARP(hwsrc="aa:aa:aa:aa:aa:aa", psrc="1.1.1.1"))
result_ls = cmco.get_output().split("\n")
result_ls
assert result_ls[5] == "hwsrc : MultipleTypeField (SourceMACField, StrFixedLenField) = 'aa:aa:aa:aa:aa:aa' ('None')"
assert result_ls[6] == "psrc : MultipleTypeField (SourceIPField, SourceIP6Field, StrFixedLenField) = '1.1.1.1' ('None')"
= List commands
~ conf command
lsc()
= List contribs
~ command
def test_list_contrib():
with ContextManagerCaptureOutput() as cmco:
list_contrib()
result_list_contrib = cmco.get_output()
assert "http2 : HTTP/2 (RFC 7540, RFC 7541) status=loads" in result_list_contrib
assert len(result_list_contrib.split('\n')) > 40
test_list_contrib()
= Test packet show() on LatexTheme
% with LatexTheme
class SmallPacket(Packet):
fields_desc = [ByteField("a", 0)]
conf_color_theme = conf.color_theme
conf.color_theme = LatexTheme()
pkt = SmallPacket()
with ContextManagerCaptureOutput() as cmco:
pkt.show()
result = cmco.get_output().strip()
assert result == '\\#\\#\\#[ \\textcolor{red}{\\bf SmallPacket} ]\\#\\#\\#\n \\textcolor{blue}{a} = \\textcolor{purple}{0}'
conf.color_theme = conf_color_theme
= Test automatic doc generation
~ command
dat = rfc(IP, ret=True).split("\n")
assert dat[0].replace(" ", "").strip() == "0123"
assert "0123456789" in dat[1].replace(" ", "")
for l in dat:
# only upper case and +-
assert re.match(r"[A-Z+-]*", l)
# Add a space before each + to avoid conflicts with UTscapy !
# They will be stripped below
result = """
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|VERSION| IHL | TOS | LEN |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| ID |FLAGS| FRAG |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| TTL | PROTO | CHKSUM |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| SRC |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| DST |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| OPTIONS |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
Fig. IP
""".strip()
result = [x.strip() for x in result.split("\n")]
output = [x.strip() for x in rfc(IP, ret=True).strip().split("\n")]
assert result == output
result = """
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| CODE | ID | LEN |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| TYPE |L|M|S|RES|VERSI| MESSAGE LEN |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| | DATA |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
Fig. EAP_TTLS
""".strip()
result = [x.strip() for x in result.split("\n")]
output = [x.strip() for x in rfc(EAP_TTLS, ret=True).strip().split("\n")]
assert result == output
result = """
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|VERSION| TC | FL |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| PLEN | NH | HLIM |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| SRC |
+ +
| |
+ +
| |
+ +
| |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| DST |
+ +
| |
+ +
| |
+ +
| |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
Fig. IPv6
""".strip()
result = [x.strip() for x in result.split("\n")]
output = [x.strip() for x in rfc(IPv6, ret=True).strip().split("\n")]
assert result == output
= Check that all contrib modules are well-configured
~ command
list_contrib(_debug=True)
= Configuration
~ conf
conf.debug_dissector = True
= Configuration conf.use_* LINUX
~ linux
try:
conf.use_bpf = True
assert False
except:
True
assert not conf.use_bpf
= Configuration conf.use_* WINDOWS
~ windows
try:
conf.use_bpf = True
assert False
except:
True
assert not conf.use_bpf
= Configuration conf.use_pcap
~ linux libpcap
if not conf.use_pcap:
assert not conf.iface.provider.libpcap
conf.use_pcap = True
assert conf.iface.provider.libpcap
for iface in conf.ifaces.values():
assert iface.provider.libpcap or iface.is_valid() == False
conf.use_pcap = False
assert not conf.iface.provider.libpcap
= Test layer filtering
~ filter
pkt = NetflowHeader()/NetflowHeaderV5()/NetflowRecordV5()
conf.layers.filter([NetflowHeader, NetflowHeaderV5])
assert NetflowRecordV5 not in NetflowHeader(bytes(pkt))
conf.layers.unfilter()
assert NetflowRecordV5 in NetflowHeader(bytes(pkt))
###########
###########
= UTscapy route check
* Check that UTscapy has correctly replaced the routes. Many tests won't work otherwise
p = IP().src
p
assert p == "127.0.0.1"
############
############
+ Scapy functions tests
= Interface related functions
import mock
conf.iface
get_if_raw_hwaddr(conf.iface)
bytes_hex(get_if_raw_addr(conf.iface))
def get_dummy_interface():
"""Returns a dummy network interface"""
conf.ifaces._add_fake_iface("dummy0")
return "dummy0"
get_if_raw_addr(get_dummy_interface())
get_if_list()
get_working_if()
get_if_raw_addr6(conf.iface)
if conf.use_bpf:
addr = u"lladdr 29:0b:c2:ff:fe:53:21:e9\n"
b = Bunch(returncode=0, communicate=lambda *args, **kargs: (addr, None))
with mock.patch('scapy.arch.bpf.core.subprocess.Popen', return_value=b) as popen:
try:
scapy.arch.bpf.core.get_if_raw_hwaddr("fw0")
assert False
except Scapy_Exception:
assert True
= More Interfaces related functions
# Test name resolution
old = conf.iface
conf.iface = conf.iface.name
assert conf.iface == old
assert isinstance(conf.iface, NetworkInterface)
assert conf.iface.is_valid()
import mock
@mock.patch("scapy.interfaces.conf.route.routes", [])
@mock.patch("scapy.interfaces.conf.ifaces.values")
def _test_get_working_if(rou):
rou.side_effect = lambda: []
assert get_working_if() == conf.loopback_name
assert conf.iface + "a" # left +
assert "hey! are you, ready to go ? %s" % conf.iface # format
assert "cuz you know the way to go" + conf.iface # right +
_test_get_working_if()
= Test conf.ifaces
conf.iface
conf.ifaces
assert conf.iface in conf.ifaces.values()
assert conf.ifaces.dev_from_index(conf.iface.index) == conf.iface
assert conf.ifaces.dev_from_networkname(conf.iface.network_name) == conf.iface
conf.ifaces.data = {'a': NetworkInterface(InterfaceProvider(), {"name": 'a', "network_name": 'a', "description": 'a', "ips": ["127.0.0.1", "::1", "::2", "127.0.0.2"], "mac": 'aa:aa:aa:aa:aa:aa'})}
with ContextManagerCaptureOutput() as cmco:
conf.ifaces.show()
output = cmco.get_output()
data = """
Source Index Name MAC IPv4 IPv6
Unknown 0 a aa:aa:aa:aa:aa:aa 127.0.0.1 ::1
127.0.0.2 ::2
""".strip()
output = [x.strip() for x in output.strip().split("\n")]
data = [x.strip() for x in data.strip().split("\n")]
assert output == data
conf.ifaces.reload()
= Test extcap detection in conf.ifaces
~ linux extcap
import os
from scapy.libs.extcap import load_extcap
_bkp_extcap = conf.prog.extcap_folders
_bkp_providers = conf.ifaces.providers.copy()
conf.ifaces.providers.clear()
# Create some sort of extcap parody program
extcapfld = get_temp_dir()
extcapprog = os.path.join(extcapfld, "runner.sh")
data = """#!/usr/bin/env python3
import struct
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--extcap-interfaces', action='store_true')
parser.add_argument('--capture', action='store_true')
parser.add_argument('--extcap-config', action='store_true')
parser.add_argument('--scan-follow-rsp', action='store_true')
parser.add_argument('--scan-follow-aux', action='store_true')
parser.add_argument('--extcap-interface', type=str)
parser.add_argument('--fifo', type=str)
args = parser.parse_args()
if args.extcap_interfaces:
# List interfaces
print(bytes.fromhex("0a657874636170207b76657273696f6e3d342e312e317d7b646973706c61793d6e524620536e696666657220666f7220426c7565746f6f7468204c457d7b68656c703d68747470733a2f2f7777772e6e6f7264696373656d692e636f6d2f536f6674776172652d616e642d546f6f6c732f446576656c6f706d656e742d546f6f6c732f6e52462d536e69666665722d666f722d426c7565746f6f74682d4c457d0a696e74657266616365207b76616c75653d2f6465762f747479555342352d4e6f6e657d7b646973706c61793d6e524620536e696666657220666f7220426c7565746f6f7468204c457d0a636f6e74726f6c207b6e756d6265723d307d7b747970653d73656c6563746f727d7b646973706c61793d4465766963657d7b746f6f6c7469703d446576696365206c6973747d0a636f6e74726f6c207b6e756d6265723d317d7b747970653d73656c6563746f727d7b646973706c61793d4b65797d7b746f6f6c7469703d7d0a636f6e74726f6c207b6e756d6265723d327d7b747970653d737472696e677d7b646973706c61793d56616c75657d7b746f6f6c7469703d3620646967697420706173736b6579206f72203136206f7220333220627974657320656e6372797074696f6e206b657920696e2068657861646563696d616c207374617274696e67207769746820273078272c2062696720656e6469616e20666f726d61742e49662074686520656e7465726564206b65792069732073686f72746572207468616e203136206f722033322062797465732c2069742077696c6c206265207a65726f2d70616464656420696e2066726f6e74277d7b76616c69646174696f6e3d5c625e28285b302d395d7b367d297c2830785b302d39612d66412d465d7b312c36347d297c285b302d39412d46612d665d7b327d5b3a2d5d297b357d285b302d39412d46612d665d7b327d2920287075626c69637c72616e646f6d2929245c627d0a636f6e74726f6c207b6e756d6265723d337d7b747970653d737472696e677d7b646973706c61793d41647620486f707d7b64656661756c743d33372c33382c33397d7b746f6f6c7469703d4164766572746973696e67206368616e6e656c20686f702073657175656e63652e204368616e676520746865206f7264657220696e2077686963682074686520736e6966666572207377697463686573206164766572746973696e67206368616e6e656c732e2056616c6964206368616e6e656c73206172652033372c20333820616e642033392073657061726174656420627920636f6d6d612e7d7b76616c69646174696f6e3d5e5c732a282833377c33387c3339295c732a2c5c732a297b302c327d2833377c33387c3339297b317d5c732a247d7b72657175697265643d747275657d0a636f6e74726f6c207b6e756d6265723d377d7b747970653d627574746f6e7d7b646973706c61793d436c6561727d7b746f6f6c746f703d436c656172206f722072656d6f7665206465766963652066726f6d20446576696365206c6973747d0a636f6e74726f6c207b6e756d6265723d347d7b747970653d627574746f6e7d7b726f6c653d68656c707d7b646973706c61793d48656c707d7b746f6f6c7469703d416363657373207573657220677569646520286c61756e636865732062726f77736572297d0a636f6e74726f6c207b6e756d6265723d357d7b747970653d627574746f6e7d7b726f6c653d726573746f72657d7b646973706c61793d44656661756c74737d7b746f6f6c7469703d52657365747320746865207573657220696e7465726661636520616e6420636c6561727320746865206c6f672066696c657d0a636f6e74726f6c207b6e756d6265723d367d7b747970653d627574746f6e7d7b726f6c653d6c6f676765727d7b646973706c61793d4c6f677d7b746f6f6c7469703d4c6f672070657220696e746572666163657d0a76616c7565207b636f6e74726f6c3d307d7b76616c75653d207d7b646973706c61793d416c6c206164766572746973696e6720646576696365737d7b64656661756c743d747275657d0a76616c7565207b636f6e74726f6c3d307d7b76616c75653d5b30302c30302c30302c30302c30302c30302c305d7d7b646973706c61793d466f6c6c6f772049524b7d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d307d7b646973706c61793d4c656761637920506173736b65797d7b64656661756c743d747275657d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d317d7b646973706c61793d4c6567616379204f4f4220646174617d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d327d7b646973706c61793d4c6567616379204c544b7d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d337d7b646973706c61793d5343204c544b7d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d347d7b646973706c61793d53432050726976617465204b65797d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d357d7b646973706c61793d49524b7d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d367d7b646973706c61793d416464204c4520616464726573737d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d377d7b646973706c61793d466f6c6c6f77204c4520616464726573737d").decode())
elif args.extcap_interface and args.extcap_config:
# List config
print(bytes.fromhex("617267207b6e756d6265723d307d7b63616c6c3d2d2d6f6e6c792d6164766572746973696e677d7b646973706c61793d4f6e6c79206164766572746973696e67207061636b6574737d7b746f6f6c7469703d54686520736e69666665722077696c6c206f6e6c792063617074757265206164766572746973696e67207061636b6574732066726f6d207468652073656c6563746564206465766963657d7b747970653d626f6f6c666c61677d7b736176653d747275657d0a617267207b6e756d6265723d317d7b63616c6c3d2d2d6f6e6c792d6c65676163792d6164766572746973696e677d7b646973706c61793d4f6e6c79206c6567616379206164766572746973696e67207061636b6574737d7b746f6f6c7469703d54686520736e69666665722077696c6c206f6e6c792063617074757265206c6567616379206164766572746973696e67207061636b6574732066726f6d207468652073656c6563746564206465766963657d7b747970653d626f6f6c666c61677d7b736176653d747275657d0a617267207b6e756d6265723d327d7b63616c6c3d2d2d7363616e2d666f6c6c6f772d7273707d7b646973706c61793d46696e64207363616e20726573706f6e736520646174617d7b746f6f6c7469703d54686520736e69666665722077696c6c20666f6c6c6f77207363616e20726571756573747320616e64207363616e20726573706f6e73657320696e207363616e206d6f64657d7b747970653d626f6f6c666c61677d7b64656661756c743d747275657d7b736176653d747275657d0a617267207b6e756d6265723d337d7b63616c6c3d2d2d7363616e2d666f6c6c6f772d6175787d7b646973706c61793d46696e6420617578696c6961727920706f696e74657220646174617d7b746f6f6c7469703d54686520736e69666665722077696c6c20666f6c6c6f772061757820706f696e7465727320696e207363616e206d6f64657d7b747970653d626f6f6c666c61677d7b64656661756c743d747275657d7b736176653d747275657d0a617267207b6e756d6265723d337d7b63616c6c3d2d2d636f6465647d7b646973706c61793d5363616e20616e6420666f6c6c6f772064657669636573206f6e204c4520436f646564205048597d7b746f6f6c7469703d5363616e20666f72206465766963657320616e6420666f6c6c6f772061647665727469736572206f6e204c4520436f646564205048597d7b747970653d626f6f6c666c61677d7b64656661756c743d66616c73657d7b736176653d747275657d").decode())
elif args.capture and args.extcap_interface and args.fifo:
# Capture
pkts = [
bytes.fromhex("ffffffffffff00000000000008004500001c0001000040117cce7f0000017f0000010035003500080172")
]
with open(args.fifo, "wb", 0) as fd:
# header
fd.write(
struct.pack(
"IHHIIII",
0xa1b2c3d4,
2, 4, 0, 0, 65535, 1
)
)
for pkt in pkts:
fd.write(struct.pack("IIII", 0, 0, len(pkt), len(pkt)))
fd.write(bytes(pkt))
else:
raise ValueError("Bad arguments")
""".strip()
with open(extcapprog, "w") as fd:
fd.write(data)
print(data)
os.chmod(extcapprog, 0o777)
# Inject and load provider
conf.prog.extcap_folders = [extcapfld]
load_extcap()
print(conf.ifaces.providers)
conf.ifaces.reload()
# Now do the tests
iface = conf.ifaces.dev_from_networkname('/dev/ttyUSB5-None')
assert iface.name == "nRF Sniffer for Bluetooth LE"
sock = iface.l2listen()(iface=iface)
pkts = sock.sniff(timeout=2)
sock.close()
assert UDP in pkts[0]
config = iface.get_extcap_config()
assert config["arg"] == [
('0', '--only-advertising', 'Only advertising packets', '', ''),
('1', '--only-legacy-advertising', 'Only legacy advertising packets', '', ''),
('2', '--scan-follow-rsp', 'Find scan response data', 'true', ''),
('3', '--scan-follow-aux', 'Find auxiliary pointer data', 'true', ''),
('3', '--coded', 'Scan and follow devices on LE Coded PHY', 'false', '')
]
# Restore
conf.prog.extcap_folders = _bkp_extcap
conf.ifaces.providers = _bkp_providers
conf.ifaces.reload()
= Test read_routes6() - default output
routes6 = read_routes6()
if WINDOWS:
from scapy.arch.windows import _route_add_loopback
_route_add_loopback(routes6, True)
routes6
# Expected results:
# - one route if there is only the loopback interface
# - one route if IPv6 is supported but disabled on network interfaces
# - three routes if there is a network interface
# - on OpenBSD, only two routes on lo0 are expected
if routes6:
iflist = get_if_list()
if WINDOWS:
from scapy.arch.windows import _route_add_loopback
_route_add_loopback(ipv6=True, iflist=iflist)
if OPENBSD:
len(routes6) >= 2
elif iflist == [conf.loopback_name]:
len(routes6) == 1
elif len(iflist) >= 2:
len(routes6) >= 1
else:
False
else:
# IPv6 seems disabled. Force a route to ::1
conf.route6.routes.append(("::1", 128, "::", conf.loopback_name, ["::1"], 1))
conf.route6.ipv6_ifaces = set([conf.loopback_name])
True
= Build HBHOptUnknown for IPv6ExtHdrHopByHop with disabled autopad
~ ipv6 hbh opt
* Build the HBHOptUnknown of IPv6ExtHdrHopByHop with autopad=0
v6Opt = HBHOptUnknown(otype=3, optlen=7, optdata="Beijing")
pkt = Ether()/IPv6()/IPv6ExtHdrHopByHop(autopad=0, options=[v6Opt, ])
pkt.build()
= Build HBHOptUnknown for IPv6ExtHdrDestOpt with disabled autopad
~ ipv6 hbh opt
* Build the HBHOptUnknown of IPv6ExtHdrDestOpt with autopad=0
v6Opt = HBHOptUnknown(otype=3, optlen=6, optdata="Haikou")
pkt = Ether()/IPv6()/IPv6ExtHdrDestOpt(autopad=0, options=[v6Opt, ])
pkt.build()
= Test read_routes6() - check mandatory routes
conf.route6
# Doesn't pass on Travis Bionic XXX
if len(routes6) > 2 and not WINDOWS:
# Identify routes to fe80::/64
assert sum(1 for r in routes6 if r[0] == "::1" and r[4] == ["::1"]) >= 1
if not OPENBSD and len(iflist) >= 2:
assert sum(1 for r in routes6 if r[0] == "fe80::" and r[1] == 64) >= 1
try:
# Identify a route to a node IPv6 link-local address
assert sum(1 for r in routes6 if in6_islladdr(r[0]) and r[1] == 128) >= 1
except:
# IPv6 is not available, but we still check the loopback
assert conf.route6.route("::/0") == (conf.loopback_name, "::", "::")
assert sum(1 for r in routes6 if r[1] == 128 and r[4] == ["::1"]) >= 1
else:
True
= Test ifchange()
conf.route6.ifchange(conf.loopback_name, "::1/128")
if WINDOWS:
conf.netcache.in6_neighbor["::1"] = "ff:ff:ff:ff:ff:ff" # Restore fake cache
True
= Packet.route()
assert (Ether() / ARP()).route()[0] is not None
assert (Ether() / ARP()).payload.route()[0] is not None
assert (ARP(ptype=0, pdst="hello. this isn't a valid IP")).route()[0] is None
= plain_str test
data = b"\xffsweet\xef celestia\xab"
assert plain_str(data) == "\\xffsweet\\xef celestia\\xab"
############
############
+ compat.py
= test bytes_hex/hex_bytes
monty_data = b"Stop! Who approaches the Bridge of Death must answer me these questions three, 'ere the other side he see."
hex_data = bytes_hex(monty_data)
assert hex_data == b'53746f70212057686f20617070726f61636865732074686520427269646765206f66204465617468206d75737420616e73776572206d65207468657365207175657374696f6e732074687265652c202765726520746865206f746865722073696465206865207365652e'
assert hex_bytes(hex_data) == monty_data
= orb/chb
assert orb(b"\x01"[0]) == 1
assert chb(1) == b"\x01"
############
############
+ Main.py tests
= Pickle and unpickle a packet
import pickle
a = IP(dst="192.168.0.1")/UDP()
b = pickle.dumps(a)
c = pickle.loads(b)
assert c[IP].dst == "192.168.0.1"
assert raw(c) == raw(a)
= Usage test
from scapy.main import _usage
try:
_usage()
assert False
except SystemExit:
assert True
= Session test
import builtins
# This is automatic when using the console
def get_var(var):
return builtins.__dict__["scapy_session"][var]
def set_var(var, value):
builtins.__dict__["scapy_session"][var] = value
def del_var(var):
del builtins.__dict__["scapy_session"][var]
init_session(None, {"init_value": 123})
set_var("test_value", "8.8.8.8") # test_value = "8.8.8.8"
save_session()
del_var("test_value")
load_session()
update_session()
assert get_var("test_value") == "8.8.8.8" #test_value == "8.8.8.8"
assert get_var("init_value") == 123
= Session test with fname
session_name = tempfile.mktemp()
init_session(session_name)
set_var("test_value", IP(dst="192.168.0.1")) # test_value = IP(dst="192.168.0.1")
save_session(fname="%s.dat" % session_name)
del_var("test_value")
set_var("z", True) #z = True
load_session(fname="%s.dat" % session_name)
try:
get_var("z")
assert False
except:
pass
set_var("z", False) #z = False
update_session(fname="%s.dat" % session_name)
assert get_var("test_value").dst == "192.168.0.1" #test_value.dst == "192.168.0.1"
assert not get_var("z")
= Clear session files
os.remove("%s.dat" % session_name)
= Test temporary file creation
~ ci_only
scapy_delete_temp_files()
tmpfile = get_temp_file(autoext=".ut")
tmpfile
if WINDOWS:
assert "scapy" in tmpfile and tmpfile.lower().startswith('c:\\users\\appveyor\\appdata\\local\\temp')
else:
import platform
BYPASS_TMP = platform.python_implementation().lower() == "pypy" or DARWIN
assert "scapy" in tmpfile and (BYPASS_TMP == True or "/tmp/" in tmpfile)
assert conf.temp_files[0].endswith(".ut")
scapy_delete_temp_files()
assert len(conf.temp_files) == 0
= Emulate interact()
~ interact
import mock, sys
from scapy.main import interact
from scapy.main import DEFAULT_PRESTART_FILE, DEFAULT_PRESTART, _read_config_file
_read_config_file(DEFAULT_PRESTART_FILE, _locals=globals(), default=DEFAULT_PRESTART)
# By now .config/scapy/startup.py should have been created
with open(DEFAULT_PRESTART_FILE, "r") as fd:
OLD_DEFAULT_PRESTART = fd.read()
with open(DEFAULT_PRESTART_FILE, "w+") as fd:
fd.write("conf.interactive_shell = 'ipython'")
# Detect IPython
try:
import IPython
except:
code_interact_import = "scapy.main.code.interact"
else:
code_interact_import = "IPython.start_ipython"
@mock.patch(code_interact_import)
def interact_emulator(code_int, extra_args=[]):
try:
code_int.side_effect = lambda *args, **kwargs: lambda *args, **kwargs: None
interact(argv=["-s scapy1"] + extra_args, mybanner="What a test")
finally:
sys.ps1 = ">>> "
interact_emulator() # Default
try:
interact_emulator(extra_args=["-?"]) # Failing
assert False
except:
pass
interact_emulator(extra_args=["-d"]) # Extended
= Emulate interact() and test startup.py with ptpython
~ interact
import sys
import mock
from scapy.main import DEFAULT_PRESTART_FILE, DEFAULT_PRESTART, _read_config_file
_read_config_file(DEFAULT_PRESTART_FILE, _locals=globals(), default=DEFAULT_PRESTART)
# By now .config/scapy/startup.py should have been created
with open(DEFAULT_PRESTART_FILE, "w+") as fd:
fd.write("conf.interactive_shell = 'ptpython'")
called = []
def checker(*args, **kwargs):
locals = kwargs.pop("locals")
assert locals["IP"]
history_filename = kwargs.pop("history_filename")
assert history_filename == conf.histfile
called.append(True)
ptpython_mocked_module = Bunch(
repl=Bunch(
embed=checker
)
)
modules_patched = {
"ptpython": ptpython_mocked_module,
"ptpython.repl": ptpython_mocked_module.repl,
"ptpython.repl.embed": ptpython_mocked_module.repl.embed,
}
with mock.patch.dict("sys.modules", modules_patched):
try:
interact()
finally:
sys.ps1 = ">>> "
# Restore
with open(DEFAULT_PRESTART_FILE, "w") as fd:
print(OLD_DEFAULT_PRESTART)
r = fd.write(OLD_DEFAULT_PRESTART)
assert called
= Test explore() with GUI mode
~ command
import mock
def test_explore_gui(is_layer, layer):
prompt_toolkit_mocked_module = Bunch(
shortcuts=Bunch(
dialogs=Bunch(
radiolist_dialog=(lambda *args, **kargs: layer),
button_dialog=(lambda *args, **kargs: "layers" if is_layer else "contribs")
)
),
formatted_text=Bunch(HTML=lambda x: x),
__version__="2.0.0"
)
# a mock.patch isn't enough to mock a module. Let's roll sys.modules
modules_patched = {
"prompt_toolkit": prompt_toolkit_mocked_module,
"prompt_toolkit.shortcuts": prompt_toolkit_mocked_module.shortcuts,
"prompt_toolkit.shortcuts.dialogs": prompt_toolkit_mocked_module.shortcuts.dialogs,
"prompt_toolkit.formatted_text": prompt_toolkit_mocked_module.formatted_text,
}
with mock.patch.dict("sys.modules", modules_patched):
with ContextManagerCaptureOutput() as cmco:
explore()
result_explore = cmco.get_output()
return result_explore
conf.interactive = True
explore_dns = test_explore_gui(True, "scapy.layers.dns")
assert "DNS" in explore_dns
assert "DNS Question Record" in explore_dns
assert "DNSRRNSEC3" in explore_dns
assert "DNS TSIG Resource Record" in explore_dns
explore_avs = test_explore_gui(False, "avs")
assert "AVSWLANHeader" in explore_avs
assert "AVS WLAN Monitor Header" in explore_avs
= Test explore() with non-GUI mode
~ command
def test_explore_non_gui(layer):
with ContextManagerCaptureOutput() as cmco:
explore(layer)
result_explore = cmco.get_output()
return result_explore
explore_dns = test_explore_non_gui("scapy.layers.dns")
assert "DNS" in explore_dns
assert "DNS Question Record" in explore_dns
assert "DNSRRNSEC3" in explore_dns
assert "DNS TSIG Resource Record" in explore_dns
explore_avs = test_explore_non_gui("avs")
assert "AVSWLANHeader" in explore_avs
assert "AVS WLAN Monitor Header" in explore_avs
assert test_explore_non_gui("scapy.layers.dns") == test_explore_non_gui("dns")
assert test_explore_non_gui("scapy.contrib.avs") == test_explore_non_gui("avs")
try:
explore("unknown_module")
assert False # The previous should have raised an exception
except Scapy_Exception:
pass
= Test load_contrib overwrite
load_contrib("gtp")
assert GTPHeader.__module__ == "scapy.contrib.gtp"
load_contrib("gtp_v2")
assert GTPHeader.__module__ == "scapy.contrib.gtp_v2"
load_contrib("gtp")
assert GTPHeader.__module__ == "scapy.contrib.gtp"
= Test load_contrib failure
try:
load_contrib("doesnotexist")
assert False
except:
pass
= Test sane function
sane("A\x00\xFFB") == "A..B"
= Test lhex function
assert lhex(42) == "0x2a"
assert lhex((28,7)) == "(0x1c, 0x7)"
assert lhex([28,7]) == "[0x1c, 0x7]"
= Test restart function
import mock
conf.interactive = True
try:
from scapy.utils import restart
import os
@mock.patch("os.execv")
@mock.patch("subprocess.call")
@mock.patch("os._exit")
def _test(e, m, m2):
def check(x, y=[]):
z = [x] + y if not isinstance(x, list) else x + y
assert os.path.isfile(z[0])
assert os.path.isfile(z[1])
return 0
m2.side_effect = check
m.side_effect = check
e.side_effect = lambda x: None
restart()
_test()
finally:
conf.interactive = False
= Test linehexdump function
conf_color_theme = conf.color_theme
conf.color_theme = BlackAndWhite()
assert linehexdump(Ether(src="00:01:02:03:04:05"), dump=True) == 'FF FF FF FF FF FF 00 01 02 03 04 05 90 00 ..............'
conf.color_theme = conf_color_theme
= Test chexdump function
chexdump(Ether(src="00:01:02:02:04:05"), dump=True) == "0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x00, 0x01, 0x02, 0x02, 0x04, 0x05, 0x90, 0x00"
= Test repr_hex function
repr_hex("scapy") == "7363617079"
= Test hexstr function
hexstr(b"A\x00\xFFB") == "41 00 FF 42 A..B"
= Test fletcher16 functions
assert fletcher16_checksum(b"\x28\x07") == 22319
assert fletcher16_checkbytes(b"\x28\x07", 1) == b"\xaf("
= Test hexdiff function
~ not_pypy
def test_hexdiff(a, b, algo=None, autojunk=False):
conf_color_theme = conf.color_theme
conf.color_theme = BlackAndWhite()
with ContextManagerCaptureOutput() as cmco:
hexdiff(a, b, algo=algo, autojunk=autojunk)
result_hexdiff = cmco.get_output()
conf.interactive = True
conf.color_theme = conf_color_theme
return result_hexdiff
# Basic string test
result_hexdiff = test_hexdiff("abcde", "abCde")
expected = "0000 61 62 63 64 65 abcde\n"
expected += " 0000 61 62 43 64 65 abCde\n"
assert result_hexdiff == expected
# More advanced string test
result_hexdiff = test_hexdiff("add_common_", "_common_removed")
expected = "0000 61 64 64 5F 63 6F 6D 6D 6F 6E 5F add_common_ \n"
expected += " -003 5F 63 6F 6D 6D 6F 6E 5F 72 65 6D 6F 76 _common_remov\n"
expected += " 000d 65 64 ed\n"
assert result_hexdiff == expected
# Compare packets
result_hexdiff = test_hexdiff(IP(dst="127.0.0.1", src="127.0.0.1"), IP(dst="127.0.0.2", src="127.0.0.1"))
expected = "0000 45 00 00 14 00 01 00 00 40 00 7C E7 7F 00 00 01 E.......@.|.....\n"
expected += " 0000 45 00 00 14 00 01 00 00 40 00 7C E6 7F 00 00 01 E.......@.|.....\n"
expected += "0010 7F 00 00 01 ....\n"
expected += " 0010 7F 00 00 02 ....\n"
assert result_hexdiff == expected
# Compare using difflib
a = "A" * 1000 + "findme" + "B" * 1000
b = "A" * 1000 + "B" * 1000
ret1 = test_hexdiff(a, b, algo="difflib")
ret2 = test_hexdiff(a, b, algo="difflib", autojunk=True)
expected_ret1 = """
03d0 03d0 41 41 41 41 41 41 41 41 41 41 41 41 41 41 41 41 AAAAAAAAAAAAAAAA
03e0 41 41 41 41 41 41 41 41 66 69 6E 64 6D 65 42 42 AAAAAAAAfindmeBB
03e0 41 41 41 41 41 41 41 41 42 42 AAAAAAAA BB
03ea 03ea 42 42 42 42 42 42 42 42 42 42 42 42 42 42 42 42 BBBBBBBBBBBBBBBB
"""
expected_ret2 = """
03d0 03d0 41 41 41 41 41 41 41 41 41 41 41 41 41 41 41 41 AAAAAAAAAAAAAAAA
03e0 41 41 41 41 41 41 41 41 66 69 6E 64 6D 65 42 42 AAAAAAAAfindmeBB
03e0 41 41 41 41 41 41 41 41 42 42 42 42 42 42 42 42 AAAAAAAABBBBBBBB
03f0 03f0 42 42 42 42 42 42 42 42 42 42 42 42 42 42 42 42 BBBBBBBBBBBBBBBB
"""
assert ret1 != ret2
assert expected_ret1 in ret1
assert expected_ret2 in ret2
# Test corner cases that should not crash
hexdiff(b"abc", IP() / TCP())
hexdiff(IP() / TCP(), b"abc")
= Test mysummary functions - Ether
p = Ether(dst="ff:ff:ff:ff:ff:ff", src="ff:ff:ff:ff:ff:ff", type=0x9000)
p
assert p.mysummary() in ['ff:ff:ff:ff:ff:ff > ff:ff:ff:ff:ff:ff (%s)' % loop
for loop in ['0x9000', 'LOOP']]
= Test zerofree_randstring function
random.seed(0x2807)
zerofree_randstring(4) in [b"\xd2\x12\xe4\x5b", b'\xd3\x8b\x13\x12']
= Test strand function
assert strand("AC", "BC") == b'@C'
= Test export_object and import_object functions
import mock
def test_export_import_object():
with ContextManagerCaptureOutput() as cmco:
export_object(2807)
result_export_object = cmco.get_output(eval_bytes=True)
assert result_export_object.startswith("eNprYPL9zqUHAAdrAf8=")
assert import_object(result_export_object) == 2807
test_export_import_object()
= Test tex_escape function
tex_escape("$#_") == "\\$\\#\\_"
= Test colgen function
f = colgen(range(3))
assert len([next(f) for i in range(2)]) == 2
= Test incremental_label function
f = incremental_label()
assert [next(f) for i in range(2)] == ["tag00000", "tag00001"]
= Test corrupt_* functions
import random
random.seed(0x2807)
assert corrupt_bytes("ABCDE") in [b"ABCDW", b"ABCDX"]
assert sane(corrupt_bytes("ABCDE", n=3)) in ["A.8D4", ".2.DE"]
assert corrupt_bits("ABCDE") in [b"EBCDE", b"ABCDG"]
assert sane(corrupt_bits("ABCDE", n=3)) in ["AF.EE", "QB.TE"]
= Test save_object and load_object functions
import tempfile
fd, fname = tempfile.mkstemp()
save_object(fname, 2807)
assert load_object(fname) == 2807
= Test whois function
~ netaccess
if not WINDOWS:
result = whois("193.0.6.139")
assert b"inetnum" in result and b"Amsterdam" in result
= Test manuf DB methods
~ manufdb
assert conf.manufdb._resolve_MAC("00:00:0F:01:02:03") == "Next:01:02:03"
assert conf.manufdb._get_short_manuf("00:00:0F:01:02:03") == "Next"
assert in6_addrtovendor("fe80::0200:0fff:fe01:0203").lower().startswith("next")
assert conf.manufdb.lookup("00:00:0F:01:02:03") == ('Next', 'Next, Inc.')
assert "00:00:0F" in conf.manufdb.reverse_lookup("Next")
= Test multiple wireshark's manuf formats
~ manufdb
new_format = """
# comment
00:00:00 JokyIsland Joky Insland Corp SA
00:01:12 SecdevCorp Secdev Corporation SA LLC
EE:05:01 Scapy Scapy CO LTD & CIE
FF:00:11 NoName
"""
old_format = """
# comment
00:00:00 JokyIsland # Joky Insland Corp SA
00:01:12 SecdevCorp # Secdev Corporation SA LLC
EE:05:01 Scapy # Scapy CO LTD & CIE
FF:00:11 NoName
"""
manuf1 = get_temp_file()
manuf2 = get_temp_file()
with open(manuf1, "w") as w:
w.write(old_format)
with open(manuf2, "w") as w:
w.write(new_format)
a = load_manuf(manuf1)
b = load_manuf(manuf2)
a.lookup("00:00:00") == ('JokyIsland', 'Joky Insland Corp SA'),
a.lookup("FF:00:11:00:00:00") == ('NoName', 'NoName')
a.reverse_lookup("Scapy") == {'EE:05:01': ('Scapy', 'Scapy CO LTD & CIE')}
a.reverse_lookup("Secdevcorp") == {'00:01:12': ('SecdevCorp', 'Secdev Corporation SA LLC')}
b.lookup("00:00:00") == ('JokyIsland', 'Joky Insland Corp SA'),
b.lookup("FF:00:11:00:00:00") == ('NoName', 'NoName')
b.reverse_lookup("Scapy") == {'EE:05:01': ('Scapy', 'Scapy CO LTD & CIE')}
b.reverse_lookup("Secdevcorp") == {'00:01:12': ('SecdevCorp', 'Secdev Corporation SA LLC')}
scapy_delete_temp_files()
= Test load_services
data_services = """
itu-bicc-stc 3097/sctp
cvsup 5999/udp # CVSup
x11 6000-6063/tcp # X Window System
x11 6000-6063/udp # X Window System
ndl-ahp-svc 6064/tcp # NDL-AHP-SVC
"""
services = get_temp_file()
with open(services, "w") as w:
w.write(data_services)
tcp, udp, sctp = load_services(services)
assert tcp[6002] == "x11"
assert tcp.ndl_ahp_svc == 6064
assert tcp.x11 in range(6000, 6093)
assert udp[6002] == "x11"
assert udp.x11 in range(6000, 6093)
assert udp.cvsup == 5999
assert sctp[3097] == "itu_bicc_stc"
assert sctp.itu_bicc_stc == 3097
scapy_delete_temp_files()
= Test utility functions - network related
~ netaccess
assert atol("1.1.1.1") == 0x1010101
assert atol("192.168.0.1") == 0xc0a80001
= Test autorun functions
~ autorun
ret = autorun_get_text_interactive_session("IP().src")
ret
assert ret == (">>> IP().src\n'127.0.0.1'\n", '127.0.0.1')
ret = autorun_get_html_interactive_session("IP().src")
ret
assert ret == ("<span class=prompt>&gt;&gt;&gt; </span>IP().src\n'127.0.0.1'\n", '127.0.0.1')
ret = autorun_get_latex_interactive_session("IP().src")
ret
assert ret == ("\\textcolor{blue}{{\\tt\\char62}{\\tt\\char62}{\\tt\\char62} }IP().src\n'127.0.0.1'\n", '127.0.0.1')
ret = autorun_get_text_interactive_session("scapy_undefined")
assert "NameError" in ret[0]
= Test autorun with logging
cmds = """log_runtime.info(hex_bytes("446166742050756e6b"))\n"""
ret = autorun_get_text_interactive_session(cmds)
ret
assert "Daft Punk" in ret[0]
= Test utility TEX functions
assert tex_escape("{scapy}\\^$~#_&%|><") == "{\\tt\\char123}scapy{\\tt\\char125}{\\tt\\char92}\\^{}\\${\\tt\\char126}\\#\\_\\&\\%{\\tt\\char124}{\\tt\\char62}{\\tt\\char60}"
a = colgen(1, 2, 3)
assert next(a) == (1, 2, 2)
assert next(a) == (1, 3, 3)
assert next(a) == (2, 2, 1)
assert next(a) == (2, 3, 2)
assert next(a) == (2, 1, 3)
assert next(a) == (3, 3, 1)
assert next(a) == (3, 1, 2)
assert next(a) == (3, 2, 3)
= Test config file functions
saved_conf_verb = conf.verb
fd, fname = tempfile.mkstemp()
os.write(fd, b"conf.verb = 42\n")
os.close(fd)
from scapy.main import _read_config_file
_read_config_file(fname, globals(), locals())
assert conf.verb == 42
conf.verb = saved_conf_verb
= Test config file functions failures
from scapy.main import _read_config_file, _probe_config_file
assert _read_config_file(_probe_config_file("filethatdoesnotexistnorwillever.tsppajfsrdrr")) is None
= Test CacheInstance repr
conf.netcache
= Test pyx detection functions
from mock import patch
def _r(*args, **kwargs):
raise OSError
with patch("scapy.libs.test_pyx.subprocess.check_call", _r):
from scapy.libs.test_pyx import _test_pyx
assert _test_pyx() == False
= Test matplotlib detection functions
from mock import MagicMock, patch
bck_scapy_libs_matplot = sys.modules.get("scapy.libs.matplot", None)
if bck_scapy_libs_matplot:
del sys.modules["scapy.libs.matplot"]
mock_matplotlib = MagicMock()
mock_matplotlib.get_backend.return_value = "inline"
mock_matplotlib.pyplot = MagicMock()
mock_matplotlib.pyplot.plt = None
with patch.dict("sys.modules", **{ "matplotlib": mock_matplotlib, "matplotlib.lines": mock_matplotlib}):
from scapy.libs.matplot import MATPLOTLIB, MATPLOTLIB_INLINED, MATPLOTLIB_DEFAULT_PLOT_KARGS, Line2D
assert MATPLOTLIB == 1
assert MATPLOTLIB_INLINED == 1
assert "marker" in MATPLOTLIB_DEFAULT_PLOT_KARGS
mock_matplotlib.get_backend.return_value = "ko"
with patch.dict("sys.modules", **{ "matplotlib": mock_matplotlib, "matplotlib.lines": mock_matplotlib}):
from scapy.libs.matplot import MATPLOTLIB, MATPLOTLIB_INLINED, MATPLOTLIB_DEFAULT_PLOT_KARGS
assert MATPLOTLIB == 1
assert MATPLOTLIB_INLINED == 0
assert "marker" in MATPLOTLIB_DEFAULT_PLOT_KARGS
if bck_scapy_libs_matplot:
sys.modules["scapy.libs.matplot"] = bck_scapy_libs_matplot
############
############
+ Basic tests
* Those test are here mainly to check nothing has been broken
* and to catch Exceptions
= Packet class methods
p = IP()/ICMP()
ret = p.do_build_ps()
assert ret[0] == b"@\x00\x00\x00\x00\x01\x00\x00@\x01\x00\x00\x7f\x00\x00\x01\x7f\x00\x00\x01\x08\x00\x00\x00\x00\x00\x00\x00"
assert len(ret[1]) == 2
assert p[ICMP].firstlayer() == p
assert p.command() == "IP()/ICMP()"
p.decode_payload_as(UDP)
assert p.sport == 2048 and p.dport == 63487
= hide_defaults
conf_color_theme = conf.color_theme
conf.color_theme = BlackAndWhite()
p = IP(ttl=64)/ICMP()
assert repr(p) in ["<IP frag=0 ttl=64 proto=icmp |<ICMP |>>", "<IP frag=0 ttl=64 proto=1 |<ICMP |>>"]
p.hide_defaults()
assert repr(p) in ["<IP frag=0 proto=icmp |<ICMP |>>", "<IP frag=0 proto=1 |<ICMP |>>"]
conf.color_theme = conf_color_theme
= split_layers
p = IP()/ICMP()
s = raw(p)
split_layers(IP, ICMP, proto=1)
assert Raw in IP(s)
bind_layers(IP, ICMP, frag=0, proto=1)
= fuzz
r = fuzz(IP(tos=2)/ICMP())
assert r.tos == 2
z = r.ttl
assert r.ttl != z
assert r.ttl != z
= fuzz a Packet with MultipleTypeField
fuzz(ARP(pdst="127.0.0.1"))
fuzz(IP()/ARP(pdst='10.0.0.254'))
= fuzz on packets with advanced RandNum
x = IP(dst="8.8.8.8")/fuzz(UDP()/NTP(version=4))
x.show2()
x = IP(raw(x))
assert NTP in x
= fuzz on packets with FlagsField
assert isinstance(fuzz(TCP()).flags, VolatileValue)
= Building some packets
~ basic IP TCP UDP NTP LLC SNAP Dot11
IP()/TCP()
Ether()/IP()/UDP()/NTP()
Dot11()/LLC()/SNAP()/IP()/TCP()/"XXX"
IP(ttl=25)/TCP(sport=12, dport=42)
IP().summary()
= Manipulating some packets
~ basic IP TCP
a=IP(ttl=4)/TCP()
a.ttl
a.ttl=10
del a.ttl
a.ttl
TCP in a
a[TCP]
a[TCP].dport=[80,443]
a
assert a.copy().time == a.time
a=3
= Bind string array as payload
~ basic
assert bytes(Raw("sca")/"py") == b"scapy"
assert bytes(Raw("sca")/b"py") == b"scapy"
assert bytes(Raw("sca")/bytearray(b"py")) == b"scapy"
assert bytes("sca"/Raw("py")) == b"scapy"
assert bytes(b"sca"/Raw("py")) == b"scapy"
assert bytes(bytearray(b"sca")/Raw("py")) == b"scapy"
a=Raw("sca")
a.add_payload("py")
assert bytes(a) == b"scapy"
a=Raw("sca")
a.add_payload(b"py")
assert bytes(a) == b"scapy"
a=Raw("sca")
a.add_payload(bytearray(b"py"))
assert bytes(a) == b"scapy"
= Checking overloads
~ basic IP TCP Ether
a=Ether()/IP()/TCP()
r = a.proto
r
r == 6
= sprintf() function
~ basic sprintf Ether IP UDP NTP
a=Ether()/IP()/IP(ttl=4)/UDP()/NTP()
r = a.sprintf("%type% %IP.ttl% %#05xr,UDP.sport% %IP:2.ttl%")
r
r in ['0x800 64 0x07b 4', 'IPv4 64 0x07b 4']
= sprintf() function
~ basic sprintf IP TCP SNAP LLC Dot11
* This test is on the conditional substring feature of <tt>sprintf()</tt>
a=Dot11()/LLC()/SNAP()/IP()/TCP()
r = a.sprintf("{IP:{TCP:flags=%TCP.flags%}{UDP:port=%UDP.ports%} %IP.src%}")
r
r == 'flags=S 127.0.0.1'
= haslayer function
~ basic haslayer IP TCP ICMP ISAKMP
x=IP(id=1)/ISAKMP_payload_SA(prop=ISAKMP_payload_SA(prop=IP()/ICMP()))/TCP()
r = (TCP in x, ICMP in x, IP in x, UDP in x)
r
r == (True,True,True,False)
= getlayer function
~ basic getlayer IP ISAKMP UDP
x=IP(id=1)/ISAKMP_payload_SA(prop=IP(id=2)/UDP(dport=1))/IP(id=3)/UDP(dport=2)
x[IP]
x[IP:2]
x[IP:3]
x.getlayer(IP,3)
x.getlayer(IP,4)
x[UDP]
x[UDP:1]
x[UDP:2]
assert(x[IP].id == 1 and x[IP:2].id == 2 and x[IP:3].id == 3 and
x.getlayer(IP).id == 1 and x.getlayer(IP,3).id == 3 and
x.getlayer(IP,4) == None and
x[UDP].dport == 1 and x[UDP:2].dport == 2)
try:
x[IP:4]
except IndexError:
True
else:
False
= getlayer / haslayer with name
~ basic getlayer IP ICMP IPerror TCPerror
x = IP() / ICMP() / IPerror()
assert x.getlayer(ICMP) is not None
assert x.getlayer(IPerror) is not None
assert x.getlayer("IP in ICMP") is not None
assert x.getlayer(TCPerror) is None
assert x.getlayer("TCP in ICMP") is None
assert x.haslayer(ICMP)
assert x.haslayer(IPerror)
assert x.haslayer("IP in ICMP")
assert not x.haslayer(TCPerror)
assert not x.haslayer("TCP in ICMP")
= getlayer with a filter
~ getlayer IP
pkt = IP() / IP(ttl=3) / IP()
assert pkt[IP::{"ttl":3}].ttl == 3
assert pkt.getlayer(IP, ttl=3).ttl == 3
assert IPv6ExtHdrHopByHop(options=[HBHOptUnknown()]).getlayer(HBHOptUnknown, otype=42) is None
= specific haslayer and getlayer implementations for EAP
~ haslayer getlayer EAP
pkt = Ether() / EAPOL() / EAP_MD5()
assert EAP in pkt
assert pkt.haslayer(EAP)
assert isinstance(pkt[EAP], EAP_MD5)
assert isinstance(pkt.getlayer(EAP), EAP_MD5)
= specific haslayer and getlayer implementations for RadiusAttribute
~ haslayer getlayer RadiusAttribute
pkt = RadiusAttr_EAP_Message()
assert RadiusAttribute in pkt
assert pkt.haslayer(RadiusAttribute)
assert isinstance(pkt[RadiusAttribute], RadiusAttr_EAP_Message)
assert isinstance(pkt.getlayer(RadiusAttribute), RadiusAttr_EAP_Message)
= equality
~ basic
w=Ether()/IP()/UDP(dport=53)
x=Ether()/IP(version=4)/UDP()
y=Ether()/IP()/UDP(dport=4)
z=Ether()/IP()/UDP()/NTP()
t=Ether()/IP()/TCP()
assert x != y and x != z and x != t and y != z and y != t and z != t and w == x
= answers
~ basic
a1, a2 = "1.2.3.4", "5.6.7.8"
p1 = IP(src=a1, dst=a2)/ICMP(type=8)
p2 = IP(src=a2, dst=a1)/ICMP(type=0)
assert p1.hashret() == p2.hashret()
assert not p1.answers(p2)
assert p2.answers(p1)
assert p1 > p2
assert p2 < p1
assert p1 == p1
conf_back = conf.checkIPinIP
conf.checkIPinIP = True
px = [IP()/p1, IPv6()/p1]
assert not any(p.hashret() == p2.hashret() for p in px)
assert not any(p.answers(p2) for p in px)
assert not any(p2.answers(p) for p in px)
conf.checkIPinIP = False
assert all(p.hashret() == p2.hashret() for p in px)
assert not any(p.answers(p2) for p in px)
assert all(p2.answers(p) for p in px)
conf.checkIPinIP = conf_back
= answers - Net
~ netaccess
a1, a2 = Net("www.google.com"), Net("www.secdev.org")
prt1, prt2 = 12345, 54321
s1, s2 = 2767216324, 3845532842
p1 = IP(src=a1, dst=a2)/TCP(flags='SA', seq=s1, ack=s2, sport=prt1, dport=prt2)
p2 = IP(src=a2, dst=a1)/TCP(flags='R', seq=s2, ack=0, sport=prt2, dport=prt1)
assert p2.answers(p1)
assert not p1.answers(p2)
# Not available yet because of IPv6
# a1, a2 = Net6("www.google.com"), Net6("www.secdev.org")
p1 = IP(src=a1, dst=a2)/TCP(flags='S', seq=s1, ack=0, sport=prt1, dport=prt2)
p2 = IP(src=a2, dst=a1)/TCP(flags='RA', seq=0, ack=s1+1, sport=prt2, dport=prt1)
assert p2.answers(p1)
assert not p1.answers(p2)
p1 = IP(src=a1, dst=a2)/TCP(flags='S', seq=s1, ack=0, sport=prt1, dport=prt2)
p2 = IP(src=a2, dst=a1)/TCP(flags='SA', seq=s2, ack=s1+1, sport=prt2, dport=prt1)
assert p2.answers(p1)
assert not p1.answers(p2)
p1 = IP(src=a1, dst=a2)/TCP(flags='A', seq=s1, ack=s2+1, sport=prt1, dport=prt2)
assert not p2.answers(p1)
assert p1.answers(p2)
p1 = IP(src=a1, dst=a2)/TCP(flags='S', seq=s1, ack=0, sport=prt1, dport=prt2)
p2 = IP(src=a2, dst=a1)/TCP(flags='SA', seq=s2, ack=s1+10, sport=prt2, dport=prt1)
assert not p2.answers(p1)
assert not p1.answers(p2)
p1 = IP(src=a1, dst=a2)/TCP(flags='A', seq=s1, ack=s2+1, sport=prt1, dport=prt2)
assert not p2.answers(p1)
assert not p1.answers(p2)
p1 = IP(src=a1, dst=a2)/TCP(flags='A', seq=s1+9, ack=s2+10, sport=prt1, dport=prt2)
assert not p2.answers(p1)
assert not p1.answers(p2)
= conf.checkIPsrc
conf_checkIPsrc = conf.checkIPsrc
conf.checkIPsrc = 0
query = IP(id=42676, src='10.128.0.7', dst='192.168.0.1')/ICMP(id=26)
answer = IP(src='192.168.48.19', dst='10.128.0.7')/ICMP(type=11)/IPerror(id=42676, src='192.168.51.23', dst='192.168.0.1')/ICMPerror(id=26)
assert answer.answers(query)
conf.checkIPsrc = conf_checkIPsrc
############
############
+ Tests on padding
= Padding assembly
r = raw(Padding("abc"))
r
assert r == b"abc"
r = raw(Padding("abc")/Padding("def"))
r
assert r == b"abcdef"
r = raw(Raw("ABC")/Padding("abc")/Padding("def"))
r
assert r == b"ABCabcdef"
r = raw(Raw("ABC")/Padding("abc")/Raw("DEF")/Padding("def"))
r
assert r == b"ABCDEFabcdef"
= Padding and length computation
p = IP(raw(IP()/Padding("abc")))
p
assert p.len == 20 and len(p) == 23
p = IP(raw(IP()/Raw("ABC")/Padding("abc")))
p
assert p.len == 23 and len(p) == 26
p = IP(raw(IP()/Raw("ABC")/Padding("abc")/Padding("def")))
p
assert p.len == 23 and len(p) == 29
= PadField test
~ PadField padding
class TestPad(Packet):
fields_desc = [ PadField(StrNullField("st", b""), 6, padwith=b"\xff"), StrField("id", b"")]
assert TestPad() == TestPad(raw(TestPad()))
assert raw(TestPad(st=b"st", id=b"id")) == b'st\x00\xff\xff\xffid'
= ReversePadField
~ PadField padding
class TestReversePad(Packet):
fields_desc = [ ByteField("a", 0),
ReversePadField(IntField("b", 0), 4)]
assert raw(TestReversePad(a=1, b=0xffffffff)) == b'\x01\x00\x00\x00\xff\xff\xff\xff'
assert TestReversePad(raw(TestReversePad(a=1, b=0xffffffff))).b == 0xffffffff
############
############
+ Tests on default value changes mechanism
= Creation of an IPv3 class from IP class with different default values
class IPv3(IP):
version = 3
ttl = 32
= Test of IPv3 class
a = IPv3()
v,t = a.version, a.ttl
v,t
assert (v,t) == (3,32)
r = raw(a)
r
assert r == b'5\x00\x00\x14\x00\x01\x00\x00 \x00\xac\xe7\x7f\x00\x00\x01\x7f\x00\x00\x01'
############
############
+ ASN.1 tests
= ASN1 - ASN1_Object
assert ASN1_Object(1) == ASN1_Object(1)
assert ASN1_Object(1) > ASN1_Object(0)
assert ASN1_Object(1) >= ASN1_Object(1)
assert ASN1_Object(0) < ASN1_Object(1)
assert ASN1_Object(1) <= ASN1_Object(2)
assert ASN1_Object(1) != ASN1_Object(2)
ASN1_Object(2).show()
= ASN1 - RandASN1Object
a = RandASN1Object()
random.seed(0x2807)
o = bytes(a)
o
assert o in [
b'\x1e\x023V', # PyPy 2.7
b'A\x02\x07q', # Python 2.7
b'B\x02\xfe\x92', # python 3.7-3.9
]
= ASN1 - ASN1_BIT_STRING
a = ASN1_BIT_STRING("test", readable=True)
a
assert a.val == '01110100011001010111001101110100'
assert raw(a) == b'\x03\x05\x00test'
a = ASN1_BIT_STRING(b"\xff"*16, readable=True)
a
assert a.val == "1" * 128
assert raw(a) == b'\x03\x11\x00\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff'
= ASN1 - ASN1_SEQUENCE
a = ASN1_SEQUENCE([ASN1_Object(1), ASN1_Object(0)])
assert a.strshow() == '# ASN1_SEQUENCE:\n <ASN1_Object[1]>\n <ASN1_Object[0]>\n'
= ASN1 - ASN1_DECODING_ERROR
a = ASN1_DECODING_ERROR("error", exc=OSError(1))
assert repr(a) == "<ASN1_DECODING_ERROR['error']{{1}}>"
b = ASN1_DECODING_ERROR("error", exc=OSError(ASN1_BIT_STRING("0")))
assert repr(b) in ["<ASN1_DECODING_ERROR['error']{{<ASN1_BIT_STRING[0]=b'\\x00' (7 unused bits)>}}>",
"<ASN1_DECODING_ERROR['error']{{<ASN1_BIT_STRING[0]='\\x00' (7 unused bits)>}}>"]
= ASN1 - ASN1_INTEGER
a = ASN1_INTEGER(int("1"*23))
assert repr(a) in ["0x25a55a46e5da99c71c7 <ASN1_INTEGER[1111111111...1111111111]>",
"0x25a55a46e5da99c71c7 <ASN1_INTEGER[1111111111...111111111L]>"]
= ASN1 - ASN1_OID
assert raw(ASN1_OID("")) == b"\x06\x00"
= RandASN1Object(), specific crashes
import random
# ASN1F_NUMERIC_STRING
random.seed(1514315682)
raw(RandASN1Object())
# ASN1F_VIDEOTEX_STRING
random.seed(1240186058)
raw(RandASN1Object())
# ASN1F_UTC_TIME & ASN1F_GENERALIZED_TIME
random.seed(1873503288)
raw(RandASN1Object())
= SSID is parsed properly even with the presence of RSN Information
~ SSID RSN Information
# A regression test for https://github.com/secdev/scapy/pull/2685.
# https://github.com/secdev/scapy/issues/2683 describes a packet with
# RSN Information that isn't parsed properly,
# causing the SSID to be overridden.
# This test checks the SSID is parsed properly.
filename = scapy_path("/test/pcaps/bad_rsn_parsing_overrides_ssid.pcap")
frame = rdpcap(filename)[0]
beacon = frame.getlayer(5)
ssid = beacon.network_stats()['ssid']
assert ssid == "ROUTE-821E295"
= SSID is parsed properly even when the Country Information Tag Element has an odd length (not complying with the standard) and a missing pad byte
~ Missing Pad Byte in Country Info
# A regression test for https://github.com/secdev/scapy/pull/2685.
# https://github.com/secdev/scapy/issues/4132 describes a packet with
# a Country Information element tag that has an odd length, even though it's against the standard.
# The transmitter should have added a padding byte to make the length even, but it didn't.
# The effect on scapy used to be improper parsing of the next tag elements, causing the SSID to be overridden.
# This test checks the SSID is parsed properly.
from io import BytesIO
pcapfile = BytesIO(b'\n\r\r\n\x80\x00\x00\x00M<+\x1a\x01\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff\x03\x00\x10\x00Linux 6.1.21-v8+\x04\x00E\x00Dumpcap (Wireshark) 3.4.10 (Git v3.4.10 packaged as 3.4.10-0+deb11u1)\x00\x00\x00\x00\x00\x00\x00\x80\x00\x00\x00\x01\x00\x00\x00@\x00\x00\x00\x7f\x00\x00\x00\x00\x04\x00\x00\x02\x00\x05\x00wifi2\x00\x00\x00\t\x00\x01\x00\t\x00\x00\x00\x0c\x00\x10\x00Linux 6.1.21-v8+\x00\x00\x00\x00@\x00\x00\x00\x06\x00\x00\x00\xb0\x01\x00\x00\x00\x00\x00\x00c\xd3\x87\x17\xe3c5\x82\x90\x01\x00\x00\x90\x01\x00\x00\x00\x00 \x00\xae@\x00\xa0 \x08\x00\xa0 \x08\x00\x00\x10\x0cd\x14@\x01\xa9\x00\x0c\x00\x00\x00\xa6\x00\xa8\x01\x80\x00\x00\x00\xff\xff\xff\xff\xff\xff\x02\xbf\xaf\x9f\xf8\x07\x02\xbf\xaf\x9f\xf8\x070\x96[p\xdcM\x06\x00\x00\x00d\x00\x11\x00\x00\x00\x01\x08\x8c\x12\x98$\xb0H`l\x03\x01,\x05\x04\x00\x01\x00\x00\x07QUS \x01\r\x80$\x01\x80(\x01\x80,\x01\x800\x01\x804\x01\x808\x01\x80<\x01\x80@\x01\x80d\x01\x80h\x01\x80l\x01\x80p\x01\x80t\x01\x80x\x01\x80|\x01\x80\x80\x01\x80\x84\x01\x80\x88\x01\x80\x8c\x01\x80\x90\x01\x80\x95\x01\x80\x99\x01\x80\x9d\x01\x80\xa1\x01\x80\xa5\x01\x800\x14\x01\x00\x00\x0f\xac\x04\x01\x00\x00\x0f\xac\x04\x01\x00\x00\x0f\xac\x02\x0c\x00;\x02s\x00-\x1a,\t\x13\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00,\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00=\x16,\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xdd\x18\x00P\xf2\x02\x01\x01\x81\x00\x03\xa4\x00\x00\'\xa4\x00\x00BC]\x00a\x11.\x00\xdd;\x00P\xf2\x04\x10J\x00\x01\x10\x10D\x00\x01\x02\x10I\x00\x06\x007*\x00\x01 \x10\x11\x00\x1358" Hisense Roku TV\x10T\x00\x08\x00\x07\x00P\xf2\x04\x00\x01\xdd\x16\xc8:k\x01\x01\x1048<@dhlptx|\x80\x84\x88\x8c\x90\xdd\x12Po\x9a\t\x02\x02\x00!\x0b\x03\x06\x00\x02\xbf\xaf\x9f\xf8\x07\xdd\rPo\x9a\n\x00\x00\x06\x01\x11\x1cD\x002\xf5N\xfbh\xb0\x01\x00\x00')
pktpcap = rdpcap(pcapfile)
frame = pktpcap[0]
beacon = frame.getlayer(4)
stats = beacon.network_stats()
ssid = stats['ssid']
assert ssid == ""
country = stats['country']
assert country == 'US'
############
############
+ Network tests
* Those tests need network access
= Sending and receiving an ICMP
~ netaccess needs_root IP ICMP icmp_firewall
def _test():
with no_debug_dissector():
x = sr1(IP(dst="www.google.com")/ICMP(),timeout=3)
x
assert x[IP].ottl() in [32, 64, 128, 255]
assert 0 <= x[IP].hops() <= 126
x is not None and ICMP in x and x[ICMP].type == 0
retry_test(_test)
= Sending a TCP syn message at layer 2 and layer 3
~ netaccess needs_root IP
def _test():
tmp = send(IP(dst="8.8.8.8")/TCP(sport=RandShort(), dport=53, flags="S"), return_packets=True, realtime=True)
assert len(tmp) == 1
tmp = sendp(Ether()/IP(dst="8.8.8.8")/TCP(sport=RandShort(), dport=53, flags="S"), return_packets=True, realtime=True)
assert len(tmp) == 1
p = Ether()/IP(dst="8.8.8.8")/TCP(sport=RandShort(), dport=53, flags="S")
from decimal import Decimal
p.time = Decimal(p.time)
tmp = sendp(p, return_packets=True, realtime=True)
assert len(tmp) == 1
retry_test(_test)
= Latency check: localhost ICMP
~ netaccess needs_root linux latency
# Note: still needs to enforce L3RawSocket as this won't work otherwise with libpcap
sock = conf.L3socket
conf.L3socket = L3RawSocket
def _test():
req = IP(dst="127.0.0.1")/ICMP()
ans = sr1(req, timeout=3)
assert (ans.time - req.sent_time) >= 0
assert (ans.time - req.sent_time) <= 1e-3
try:
retry_test(_test)
finally:
conf.L3socket = sock
= Test sniffing on multiple sockets
~ netaccess needs_root sniff
# This test sniffs on the same interface twice at the same time, to
# simulate sniffing on multiple interfaces.
def _test():
iface = conf.route.route(str(Net("www.google.com")))[0]
port = int(RandShort())
pkt = IP(dst="www.google.com")/TCP(sport=port, dport=80, flags="S")
def cb():
sr1(pkt, timeout=3)
sniffer = AsyncSniffer(started_callback=cb,
iface=[iface, iface],
lfilter=lambda x: TCP in x and x[TCP].dport == port,
prn=lambda x: x.summary(),
count=2)
sniffer.start()
sniffer.join(timeout=3)
assert len(sniffer.results) == 2
for pkt in sniffer.results:
assert pkt.sniffed_on == iface
retry_test(_test)
= Sending a TCP syn 'forever' at layer 2 and layer 3
~ netaccess needs_root IP
def _test():
tmp = srloop(IP(dst="8.8.8.8")/TCP(sport=RandShort(), dport=53, flags="S"), count=1, timeout=3)
assert type(tmp) == tuple and len(tmp[0]) == 1
tmp = srploop(Ether()/IP(dst="8.8.8.8")/TCP(sport=RandShort(), dport=53, flags="S"), count=1, timeout=3)
assert type(tmp) == tuple and len(tmp[0]) == 1
retry_test(_test)
= Sending and receiving an TCP syn with flooding methods
~ netaccess needs_root IP flood
from functools import partial
# flooding methods do not support timeout. Packing the test for security
def _test_flood(ip, flood_function, add_ether=False):
with no_debug_dissector():
p = IP(dst=ip)/TCP(sport=RandShort(), dport=80, flags="S")
if add_ether:
p = Ether()/p
p.show2()
x = flood_function(p, timeout=0.5, maxretries=10)
if type(x) == tuple:
x = x[0][0][1]
x
assert x[IP].ottl() in [32, 64, 128, 255]
assert 0 <= x[IP].hops() <= 126
_test_srflood = partial(_test_flood, "www.google.com", srflood)
retry_test(_test_srflood)
_test_sr1flood = partial(_test_flood, "www.google.fr", sr1flood)
retry_test(_test_sr1flood)
_test_srpflood = partial(_test_flood, "www.google.net", srpflood, True)
retry_test(_test_srpflood)
_test_srp1flood = partial(_test_flood, "www.google.co.uk", srp1flood, True)
retry_test(_test_srp1flood)
= test chainEX
~ netaccess
import socket
sck = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ssck = StreamSocket(sck)
try:
r = ssck.sr1(ICMP(type='echo-request'), timeout=0.1, chainEX=True)
assert False
except Exception:
assert True
finally:
sck.close()
= Sending and receiving an ICMPv6EchoRequest
~ netaccess ipv6
def _test():
with no_debug_dissector():
x = sr1(IPv6(dst="www.google.com")/ICMPv6EchoRequest(),timeout=3)
x
assert x[IPv6].ottl() in [32, 64, 128, 255]
assert 0 <= x[IPv6].hops() <= 126
x is not None and ICMPv6EchoReply in x and x[ICMPv6EchoReply].type == 129
retry_test(_test)
= Whois request
~ netaccess IP
* This test retries on failure because it often fails
def _test():
IP(src="8.8.8.8").whois()
retry_test(_test)
= AS resolvers
~ netaccess IP as_resolvers
* This test retries on failure because it often fails
def _test():
ret = conf.AS_resolver.resolve("8.8.8.8", "8.8.4.4")
assert (len(ret) == 2)
assert any(x[1] == "AS15169" for x in ret)
retry_test(_test)
riswhois_data = b"route: 8.8.8.0/24\ndescr: Google\norigin: AS15169\nnotify: radb-contact@google.com\nmnt-by: MAINT-AS15169\nchanged: radb-contact@google.com 20150728\nsource: RADB\n\nroute: 8.0.0.0/9\ndescr: Proxy-registered route object\norigin: AS3356\nremarks: auto-generated route object\nremarks: this next line gives the robot something to recognize\nremarks: L'enfer, c'est les autres\nremarks: \nremarks: This route object is for a Level 3 customer route\nremarks: which is being exported under this origin AS.\nremarks: \nremarks: This route object was created because no existing\nremarks: route object with the same origin was found, and\nremarks: since some Level 3 peers filter based on these objects\nremarks: this route may be rejected if this object is not created.\nremarks: \nremarks: Please contact routing@Level3.net if you have any\nremarks: questions regarding this object.\nmnt-by: LEVEL3-MNT\nchanged: roy@Level3.net 20060203\nsource: LEVEL3\n\n\n"
ret = AS_resolver_riswhois()._parse_whois(riswhois_data)
assert ret == ('AS15169', 'Google')
retry_test(_test)
# This test is too buggy, and is simulated below
#def _test():
# ret = AS_resolver_cymru().resolve("8.8.8.8")
# assert (len(ret) == 1)
# all(x[1] == "AS15169" for x in ret)
#
#retry_test(_test)
cymru_bulk_data = """
Bulk mode; whois.cymru.com [2017-10-03 08:38:08 +0000]
24776 | 217.25.178.5 | INFOCLIP-AS, FR
36459 | 192.30.253.112 | GITHUB - GitHub, Inc., US
26496 | 68.178.213.61 | AS-26496-GO-DADDY-COM-LLC - GoDaddy.com, LLC, US
"""
tmp = AS_resolver_cymru().parse(cymru_bulk_data)
assert len(tmp) == 3
assert [l[1] for l in tmp] == ['AS24776', 'AS36459', 'AS26496']
= AS resolver - IPv6
~ netaccess IP
* This test retries on failure because it often fails
def _test():
as_resolver6 = AS_resolver6()
ret = as_resolver6.resolve("2001:4860:4860::8888", "2001:4860:4860::4444")
assert (len(ret) == 2)
assert any(x[1] == 15169 for x in ret)
retry_test(_test)
= AS resolver - socket error
~ IP
* This test checks that a failing resolver will not crash a script
class MockAS_resolver(object):
def resolve(self, *ips):
raise socket.error
asrm = AS_resolver_multi(MockAS_resolver())
assert len(asrm.resolve(["8.8.8.8", "8.8.4.4"])) == 0
= sendpfast
~ tcpreplay
old_interactive = conf.interactive
conf.interactive = False
try:
sendpfast([])
assert False
except Exception:
assert True
conf.interactive = old_interactive
assert True
############
############
+ Generator tests
= Implicit logic 1
~ IP TCP
a = Ether() / IP(ttl=(5, 10)) / TCP(dport=[80, 443])
ls(a)
ls(a, verbose=True)
l = [p for p in a]
len(l) == 12
= Implicit logic 2
~ IP
a = IP(ttl=[1,2,(5,9)])
ls(a)
ls(a, verbose=True)
l = [p for p in a]
len(l) == 7
= Implicit logic 3
# In case there's a single option: __iter__ should not return self
a = Ether()/IP(src="127.0.0.1", dst="127.0.0.1")/ICMP()
for i in a:
i.sent_time = 1
assert a.sent_time is None
# In case they are several, self should never be returned
a = Ether()/IP(src="127.0.0.1", dst="127.0.0.1")/ICMP(seq=(0, 5))
for i in a:
i.sent_time = 1
assert a.sent_time is None
############
############
+ Real usages
= Port scan
~ netaccess needs_root IP TCP
def _test():
with no_debug_dissector():
ans,unans=sr(IP(dst="www.google.com/30")/TCP(dport=[80,443]), timeout=2)
# New format: all Python versions
ans.make_table(lambda s, r: (s.dst, s.dport, r.sprintf("{TCP:%TCP.flags%}{ICMP:%ICMP.code%}")))
retry_test(_test)
= Send & receive with debug_match
~ netaccess needs_root IP ICMP
def _test():
old_debug_match = conf.debug_match
conf.debug_match = True
with no_debug_dissector():
ans, unans = sr(IP(dst="www.google.fr") / TCP(sport=RandShort(), dport=80, flags="S"), timeout=2)
assert ans[0].query == ans[0][0]
assert ans[0].answer == ans[0][1]
conf.debug_match = old_debug_match
assert ans and not unans
retry_test(_test)
= Send & receive with retry
~ netaccess needs_root IP ICMP
def _test():
with no_debug_dissector():
ans, unans = sr(IP(dst=["8.8.8.8", "1.2.3.4"]) / TCP(sport=RandShort(), dport=53, flags="S"), timeout=2, retry=1)
assert len(ans) == 1 and len(unans) == 1
retry_test(_test)
= Send & receive with multi
~ netaccess needs_root IP ICMP
def _test():
with no_debug_dissector():
ans, unans = sr(IP(dst=["8.8.8.8", "1.2.3.4"]) / TCP(sport=RandShort(), dport=53, flags="S"), timeout=2, multi=1)
assert len(ans) >= 1 and len(unans) == 1
retry_test(_test)
= Traceroute function
~ netaccess needs_root tcpdump
* Let's test traceroute
def _test():
ans, unans = traceroute("www.slashdot.org")
ans.nsummary()
s,r=ans[0]
s.show()
s.show(2)
retry_test(_test)
= send() and sniff()
~ netaccess needs_root
def _test():
sendp(Ether()/IP(src="9.0.0.0")/UDP(), count=3, iface=conf.iface)
r = sniff(timeout=3, count=1,
lfilter=lambda x: IP in x and x[IP].src == "9.0.0.0",
iface=conf.iface,
started_callback=_test)
assert r
= sniff() with socket failure
* GH issue 3631
REFPACKET = Ether()/IP()/UDP()
# A socket that fails after 10 packets
class OOPipe(ObjectPipe):
def recv(self, x=MTU):
self.i = getattr(self, "i", 0) + 1
if self.i == 11:
self.close()
raise OSError("Giant failure")
pkt = super(OOPipe, self).recv(x)
self.send(REFPACKET)
return pkt
o = OOPipe()
o.send(REFPACKET)
pkts = sniff(opened_socket=[o], timeout=3)
assert len(pkts) == 10
= GH issue 3306
~ netaccess needs_root
send(fuzz(ARP()))
= Test SuperSocket.select
~ select
import mock
@mock.patch("scapy.supersocket.select")
def _test_select(select):
def f(a, b, c, d):
raise IOError(0)
select.side_effect = f
try:
SuperSocket.select([])
return False
except:
return True
assert _test_select()
= Test L2ListenTcpdump socket
~ netaccess
# Needs to be fixed. Fails randomly
#import time
#for i in range(10):
# read_s = L2ListenTcpdump(iface=conf.iface)
# out_s = conf.L2socket(iface=conf.iface)
# time.sleep(5) # wait for read_s to be ready
# icmp_r = Ether()/IP(dst="secdev.org")/ICMP()
# res = sndrcv(out_s, icmp_r, timeout=5, rcv_pks=read_s)[0]
# read_s.close()
# out_s.close()
# time.sleep(5)
# if res:
# break
#
#response = res[0][1]
#assert response[ICMP].type == 0
True
= Test set of sent_time by sr
~ netaccess needs_root IP ICMP
def _test():
packet = IP(dst="8.8.8.8")/ICMP()
r = sr(packet, timeout=2)
assert packet.sent_time is not None
retry_test(_test)
= Test set of sent_time by sr (multiple packets)
~ netaccess needs_root IP ICMP
def _test():
packet1 = IP(dst="8.8.8.8")/ICMP()
packet2 = IP(dst="8.8.4.4")/ICMP()
r = sr([packet1, packet2], timeout=2)
assert packet1.sent_time is not None
assert packet2.sent_time is not None
retry_test(_test)
= Test set of sent_time by srflood
~ netaccess needs_root IP ICMP
def _test():
packet = IP(dst="8.8.8.8")/ICMP()
r = srflood(packet, timeout=2)
assert packet.sent_time is not None
retry_test(_test)
= Test set of sent_time by srflood (multiple packets)
~ netaccess needs_root IP ICMP
def _test():
packet1 = IP(dst="8.8.8.8")/ICMP()
packet2 = IP(dst="8.8.4.4")/ICMP()
r = srflood([packet1, packet2], timeout=2)
assert packet1.sent_time is not None
assert packet2.sent_time is not None
retry_test(_test)
############
############
+ ManuFDB tests
= __repr__
if conf.manufdb:
len(conf.manufdb)
else:
True
= check _resolve_MAC
if conf.manufdb:
assert conf.manufdb._resolve_MAC("00:00:17") == "Oracle"
else:
True
############
############
+ Ether tests with IPv6
= Ether IPv6 checking for dst
~ netaccess ipv6
p = Ether()/IPv6(dst="www.google.com")/TCP()
assert p.dst != p[IPv6].dst
p.show()
############
############
+ pcap / pcapng format support
= Variable creations
from io import BytesIO
pcapfile = BytesIO(b'\xd4\xc3\xb2\xa1\x02\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00\x00e\x00\x00\x00\xcf\xc5\xacVo*\n\x00(\x00\x00\x00(\x00\x00\x00E\x00\x00(\x00\x01\x00\x00@\x06|\xcd\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x14\x00P\x00\x00\x00\x00\x00\x00\x00\x00P\x02 \x00\x91|\x00\x00\xcf\xc5\xacV_-\n\x00\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x11|\xce\x7f\x00\x00\x01\x7f\x00\x00\x01\x005\x005\x00\x08\x01r\xcf\xc5\xacV\xf90\n\x00\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x01|\xde\x7f\x00\x00\x01\x7f\x00\x00\x01\x08\x00\xf7\xff\x00\x00\x00\x00')
pcapngfile = BytesIO(b'\n\r\r\n\\\x00\x00\x00M<+\x1a\x01\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff\x01\x00,\x00File created by merging: \nFile1: test.pcap \n\x04\x00\x08\x00mergecap\x00\x00\x00\x00\\\x00\x00\x00\x01\x00\x00\x00\\\x00\x00\x00e\x00\x00\x00\xff\xff\x00\x00\x02\x006\x00Unknown/not available in original file format(libpcap)\x00\x00\t\x00\x01\x00\x06\x00\x00\x00\x00\x00\x00\x00\\\x00\x00\x00\x06\x00\x00\x00H\x00\x00\x00\x00\x00\x00\x00\x8d*\x05\x00/\xfc[\xcd(\x00\x00\x00(\x00\x00\x00E\x00\x00(\x00\x01\x00\x00@\x06|\xcd\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x14\x00P\x00\x00\x00\x00\x00\x00\x00\x00P\x02 \x00\x91|\x00\x00H\x00\x00\x00\x06\x00\x00\x00<\x00\x00\x00\x00\x00\x00\x00\x8d*\x05\x00\x1f\xff[\xcd\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x11|\xce\x7f\x00\x00\x01\x7f\x00\x00\x01\x005\x005\x00\x08\x01r<\x00\x00\x00\x06\x00\x00\x00<\x00\x00\x00\x00\x00\x00\x00\x8d*\x05\x00\xb9\x02\\\xcd\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x01|\xde\x7f\x00\x00\x01\x7f\x00\x00\x01\x08\x00\xf7\xff\x00\x00\x00\x00<\x00\x00\x00')
pcapnanofile = BytesIO(b"M<\xb2\xa1\x02\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00\x00e\x00\x00\x00\xcf\xc5\xacV\xc9\xc1\xb5'(\x00\x00\x00(\x00\x00\x00E\x00\x00(\x00\x01\x00\x00@\x06|\xcd\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x14\x00P\x00\x00\x00\x00\x00\x00\x00\x00P\x02 \x00\x91|\x00\x00\xcf\xc5\xacV-;\xc1'\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x11|\xce\x7f\x00\x00\x01\x7f\x00\x00\x01\x005\x005\x00\x08\x01r\xcf\xc5\xacV\x9aL\xcf'\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x01|\xde\x7f\x00\x00\x01\x7f\x00\x00\x01\x08\x00\xf7\xff\x00\x00\x00\x00")
pcapwirelenfile = BytesIO(b'\xd4\xc3\xb2\xa1\x02\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00\x00\x01\x00\x00\x00}\x87pZ.\xa2\x08\x00\x0f\x00\x00\x00\x10\x00\x00\x00\xff\xff\xff\xff\xff\xff GG\xee\xdd\xa8\x90\x00a')
pcapngdefaults = BytesIO(base64_bytes(b'Cg0NChwAAABNPCsaAQAAAP//////////HAAAAAEAAAAgAAAAEgEAAP//AAAJAAEACUeZiQAAAAAgAAAAAQAAACAAAAASAQAA//8AAAkAAQAJAAAAAAAAACAAAAABAAAAIAAAABIBAAD//wAACQABAAkAAAAAAAAAIAAAAAEAAAAgAAAAEgEAAP//AAAJAAEACQAAAAAAAAAgAAAABgAAAIQBAAADAAAApO/bFdgJaeBiAQAAYgEAAFVVVVVVVVXV////////IMbr4D7PCABFAAFIlQkAAEAR5JwAAAAA/////wBEAEMBNJDsAQEGAFSpVwIACoAAAAAAAAAAAAAAAAAAAAAAACDG6+A+zwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABjglNjNQEB/wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAsOs+bAAAhAEAAAYAAACAAQAAAwAAAKTv2xXIDYznYAEAAGABAABVVVVVVVVV1QEAXn//+iDG6+A+zwgARQABRgGPAAAEEal3qf5wqO////rhbgdsATJi0U5PVElGWSAqIEhUVFAvMS4xDQpIT1NUOiAyMzkuMjU1LjI1NS4yNTA6MTkwMA0KQ0FDSEUtQ09OVFJPTDogbWF4LWFnZT0xODAwDQpMT0NBVElPTjogaHR0cDovLzE2OS4yNTQuMTEyLjE2ODo1NTAwMC9ucmMvZGRkLnhtbA0KTlQ6IHV1aWQ6NEQ0NTQ5MzAtMDIwMC0xMDAwLTgwMDEtMjBDNkVCRTAzRUNGDQpOVFM6IHNzZHA6YWxpdmUNClNFUlZFUjogRnJlZUJTRC84LjAgVVBuUC8xLjAgUGFuYXNvbmljLU1JTC1ETE5BLVNWLzEuMA0KVVNOOiB1dWlkOjRENDU0OTMwLTAyMDAtMTAwMC04MDAxLTIwQzZFQkUwM0VDRg0KDQpcQcvWgAEAAAYAAAC4AQAAAwAAAKTv2xV4Ao3nlQEAAJUBAABVVVVVVVVV1QEAXn//+iDG6+A+zwgARQABewGQAAAEEalBqf5wqO////rhbgdsAWfu+k5PVElGWSAqIEhUVFAvMS4xDQpIT1NUOiAyMzkuMjU1LjI1NS4yNTA6MTkwMA0KQ0FDSEUtQ09OVFJPTDogbWF4LWFnZT0xODAwDQpMT0NBVElPTjogaHR0cDovLzE2OS4yNTQuMTEyLjE2ODo1NTAwMC9ucmMvZGRkLnhtbA0KTlQ6IHVybjpwYW5hc29uaWMtY29tOmRldmljZTpwMDBSZW1vdGVDb250cm9sbGVyOjENCk5UUzogc3NkcDphbGl2ZQ0KU0VSVkVSOiBGcmVlQlNELzguMCBVUG5QLzEuMCBQYW5hc29uaWMtTUlMLURMTkEtU1YvMS4wDQpVU046IHV1aWQ6NEQ0NTQ5MzAtMDIwMC0xMDAwLTgwMDEtMjBDNkVCRTAzRUNGOjp1cm46cGFuYXNvbmljLWNvbTpkZXZpY2U6cDAwUmVtb3RlQ29udHJvbGxlcjoxDQoNCrLVKmoAAAC4AQAABgAAAHgBAAADAAAApO/bFVjbjedXAQAAVwEAAFVVVVVVVVXVAQBef//6IMbr4D7PCABFAAE9AZEAAAQRqX6p/nCo7///+uFuB2wBKaZATk9USUZZICogSFRUUC8xLjENCkhPU1Q6IDIzOS4yNTUuMjU1LjI1MDoxOTAwDQpDQUNIRS1DT05UUk9MOiBtYXgtYWdlPTE4MDANCkxPQ0FUSU9OOiBodHRwOi8vMTY5LjI1NC4xMTIuMTY4OjU1MDAwL25yYy9kZGQueG1sDQpOVDogdXBucDpyb290ZGV2aWNlDQpOVFM6IHNzZHA6YWxpdmUNClNFUlZFUjogRnJlZUJTRC84LjAgVVBuUC8xLjAgUGFuYXNvbmljLU1JTC1ETE5BLVNWLzEuMA0KVVNOOiB1dWlkOjRENDU0OTMwLTAyMDAtMTAwMC04MDAxLTIwQzZFQkUwM0VDRjo6dXBucDpyb290ZGV2aWNlDQoNCjagXoUAeAEAAAYAAAC0AQAAAwAAAKTv2xXYw47nkwEAAJMBAABVVVVVVVVV1QEAXn//+iDG6+A+zwgARQABeQGSAAAEEalBqf5wqO////rhbgdsAWWV4E5PVElGWSAqIEhUVFAvMS4xDQpIT1NUOiAyMzkuMjU1LjI1NS4yNTA6MTkwMA0KQ0FDSEUtQ09OVFJPTDogbWF4LWFnZT0xODAwDQpMT0NBVElPTjogaHR0cDovLzE2OS4yNTQuMTEyLjE2ODo1NTAwMC9ucmMvZGRkLnhtbA0KTlQ6IHVybjpwYW5hc29uaWMtY29tOnNlcnZpY2U6cDAwTmV0d29ya0NvbnRyb2w6MQ0KTlRTOiBzc2RwOmFsaXZlDQpTRVJWRVI6IEZyZWVCU0QvOC4wIFVQblAvMS4wIFBhbmFzb25pYy1NSUwtRExOQS1TVi8xLjANClVTTjogdXVpZDo0RDQ1NDkzMC0wMjAwLTEwMDAtODAwMS0yMEM2RUJFMDNFQ0Y6OnVybjpwYW5hc29uaWMtY29tOnNlcnZpY2U6cDAwTmV0d29ya0NvbnRyb2w6MQ0KDQovXKFrALQBAAAGAAAAqAEAAAMAAACk79sVuJKP54cBAACHAQAAVVVVVVVVVdUBAF5///ogxuvgPs8IAEUAAW0BkwAABBGpTKn+cKjv///64W4HbAFZRNJOT1RJRlkgKiBIVFRQLzEuMQ0KSE9TVDogMjM5LjI1NS4yNTUuMjUwOjE5MDANCkNBQ0hFLUNPTlRST0w6IG1heC1hZ2U9MTgwMA0KTE9DQVRJT046IGh0dHA6Ly8xNjkuMjU0LjExMi4xNjg6NTUwMDAvbnJjL2RkZC54bWwNCk5UOiB1cm46ZGlhbC1tdWx0aXNjcmVlbi1vcmc6c2VydmljZTpkaWFsOjENCk5UUzogc3NkcDphbGl2ZQ0KU0VSVkVSOiBGcmVlQlNELzguMCBVUG5QLzEuMCBQYW5hc29uaWMtTUlMLURMTkEtU1YvMS4wDQpVU046IHV1aWQ6NEQ0NTQ5MzAtMDIwMC0xMDAwLTgwMDEtMjBDNkVCRTAzRUNGOjp1cm46ZGlhbC1tdWx0aXNjcmVlbi1vcmc6c2VydmljZTpkaWFsOjENCg0KLn5A6QCoAQAA'))
= Read a pcap file
pktpcap = rdpcap(pcapfile)
= Read a pcapng file
pktpcapng = rdpcap(pcapngfile)
assert pktpcapng[0].time == 1454163407.666223
= Read a pcap file with nanosecond precision
pktpcapnano = rdpcap(pcapnanofile)
assert pktpcapnano[0].time == 1454163407.666223049
= Read a pcapng file with nanosecond precision and default tsresol
pktpcapngdefaults = rdpcap(pcapngdefaults)
assert pktpcapngdefaults[0].time == 1575115986.114775512
assert Ether in pktpcapngdefaults[0]
= Read a pcapng with little-endian SHB
pktcapng = sniff(offline=scapy_path("/test/pcaps/macos.pcapng.gz"))
assert len(pktcapng) != 0
= Write a pcapng
tmpfile = get_temp_file(autoext=".pcapng")
r = RawPcapNgWriter(tmpfile)
r._write_block_shb()
r._write_block_idb()
ts = 1632568366.384185
r._write_block_epb(raw(Ether()/"Hello Scapy!!!"), ts)
r.f.close()
assert os.stat(tmpfile).st_size == 108
l = rdpcap(tmpfile)
assert b"Scapy" in l[0][Raw].load
assert l[0].time == ts
= Check wrpcapng()
tmpfile = get_temp_file(autoext=".pcapng")
p = Ether()/"Hello Scapy!!!"
p.time = 1632568366.384185
wrpcapng(tmpfile, p)
assert os.stat(tmpfile).st_size == 108
l = rdpcap(tmpfile)
assert b"Scapy" in l[0][Raw].load
assert l[0].time == ts
p = Ether() / IPv6() / TCP()
p.comment = b"Hello Scapy!"
wrpcapng(tmpfile, p)
l = rdpcap(tmpfile)
assert l[0].comment == p.comment
= Read a pcap file with wirelen != captured len
pktpcapwirelen = rdpcap(pcapwirelenfile)
= Check all packet lists are the same
assert list(pktpcap) == list(pktpcapng) == list(pktpcapnano)
assert [float(p.time) for p in pktpcap] == [float(p.time) for p in pktpcapng] == [float(p.time) for p in pktpcapnano]
= Check packets from pcap file
assert all(IP in pkt for pkt in pktpcap)
assert all(any(proto in pkt for pkt in pktpcap) for proto in [ICMP, UDP, TCP])
= Check wirelen value from pcap file
assert len(pktpcapwirelen) == 1
assert pktpcapwirelen[0].wirelen is not None
assert len(pktpcapwirelen[0]) < pktpcapwirelen[0].wirelen
= Check wrpcap() then rdpcap() with wirelen
import os, tempfile
fdesc, filename = tempfile.mkstemp()
fdesc = os.fdopen(fdesc, "wb")
wrpcap(fdesc, pktpcapwirelen)
fdesc.close()
newpktpcapwirelen = rdpcap(filename)
assert len(newpktpcapwirelen) == 1
assert newpktpcapwirelen[0].wirelen is not None
assert len(newpktpcapwirelen[0]) < newpktpcapwirelen[0].wirelen
assert newpktpcapwirelen[0].wirelen == pktpcapwirelen[0].wirelen
= Check wrpcap() then rdpcap() with sent_time on SndRcvList
f = get_temp_file()
s = Ether()/IP()
r = Ether()/IP()
s.sent_time = 1
r.time = 2
wrpcap(f, SndRcvList([(s, r)]))
pcap = rdpcap(f)
assert pcap[0].time == 1
assert pcap[1].time == 2
= Check wrpcap()
fdesc, filename = tempfile.mkstemp()
fdesc = os.fdopen(fdesc, "wb")
wrpcap(fdesc, pktpcap)
fdesc.close()
= Check offline sniff() (by PacketList)
l=sniff(offline=PacketList([IP()/TCP(),IP()/TCP()]))
assert len(l) == 2
assert all(TCP in p for p in l)
= Check offline sniff() (by filename)
assert list(pktpcap) == list(sniff(offline=filename))
= Check offline sniff() (by file object)
fdesc = open(filename, "rb")
assert list(pktpcap) == list(sniff(offline=fdesc))
fdesc.close()
= Check offline sniff() with a filter (by filename)
~ tcpdump libpcap
pktpcap_flt = [(proto, sniff(offline=filename, filter=proto.__name__.lower()))
for proto in [ICMP, UDP, TCP]]
assert all(list(pktpcap[proto]) == list(packets) for proto, packets in pktpcap_flt)
= Check offline sniff() with a filter (by file object)
~ tcpdump libpcap
fdesc = open(filename, "rb")
pktpcap_tcp = sniff(offline=fdesc, filter="tcp")
fdesc.close()
assert list(pktpcap[TCP]) == list(pktpcap_tcp)
os.unlink(filename)
= Check offline sniff() with a PcapNg file and a filter (by file object)
~ tcpdump libpcap
pcapng_data = b'\n\r\r\n`\x00\x00\x00M<+\x1a\x01\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff\x04\x009\x00TShark (Wireshark) 3.2.3 (Git v3.2.3 packaged as 3.2.3-1)\x00\x00\x00\x00\x00\x00\x00`\x00\x00\x00\x01\x00\x00\x00\x14\x00\x00\x00\xe4\x00\x00\x00\xff\xff\x00\x00\x14\x00\x00\x00\x06\x00\x00\x00<\x00\x00\x00\x00\x00\x00\x00\x98\xcd\x05\x00\x19\x83\xf7\x9e\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x11|\xce\x7f\x00\x00\x01\x7f\x00\x00\x01\x005\x005\x00\x08\x01r<\x00\x00\x00'
if OPENBSD:
# Note: OpenBSD tcpdump does not support PcapNg
assert True
else:
fdesc, filename = tempfile.mkstemp()
os.close(fdesc)
fd = open(filename, "wb")
fd.write(pcapng_data)
fd.close()
packets = sniff(offline=filename, filter="udp")
os.unlink(filename)
assert UDP in packets[0]
= Check offline sniff() with Packets and tcpdump with a filter
~ tcpdump libpcap
l = sniff(offline=IP()/UDP(sport=(10000, 10001)), filter="udp")
assert len(l) == 2
assert all(UDP in p for p in l)
l = sniff(offline=[p for p in IP()/UDP(sport=(10000, 10001))], filter="udp")
assert len(l) == 2
assert all(UDP in p for p in l)
l = sniff(offline=IP()/UDP(sport=(10000, 10001)), filter="tcp")
assert len(l) == 0
= Check offline sniff() with Packets, tcpdump and a bad filter
~ tcpdump libpcap
try:
sniff(offline=IP()/UDP(), filter="bad filter")
except Scapy_Exception:
pass
else:
assert False
= Check offline sniff with lfilter
assert len(sniff(offline=[IP()/UDP(), IP()/TCP()], lfilter=lambda x: TCP in x)) == 1
= Check offline sniff() without a tcpdump binary
~ tcpdump
import mock
conf_prog_tcpdump = conf.prog.tcpdump
conf.prog.tcpdump = "tcpdump_fake"
def _test_sniff_notcpdump():
try:
sniff(offline="fake.pcap", filter="tcp")
assert False
except:
assert True
_test_sniff_notcpdump()
conf.prog.tcpdump = conf_prog_tcpdump
= Check wrpcap(nano=True)
fdesc, filename = tempfile.mkstemp()
fdesc = os.fdopen(fdesc, "wb")
pktpcapnano[0].time += Decimal('1E-9')
wrpcap(fdesc, pktpcapnano, nano=True)
fdesc.close()
pktpcapnanoread = rdpcap(filename)
assert pktpcapnanoread[0].time == pktpcapnano[0].time
os.unlink(filename)
= Check PcapNg with nanosecond precision using obsolete packet block
* first packet from capture file icmp2.ntar -- https://wiki.wireshark.org/Development/PcapNg?action=AttachFile&do=view&target=icmp2.ntar
pcapngfile = BytesIO(b'\n\r\r\n\x1c\x00\x00\x00M<+\x1a\x01\x00\x00\x00\xa8\x03\x00\x00\x00\x00\x00\x00\x1c\x00\x00\x00\x01\x00\x00\x00(\x00\x00\x00\x01\x00\x00\x00\xff\xff\x00\x00\r\x00\x01\x00\x04\x04K\x00\t\x00\x01\x00\tK=N\x00\x00\x00\x00(\x00\x00\x00\x02\x00\x00\x00n\x00\x00\x00\x00\x00\x00\x00e\x14\x00\x00)4\'ON\x00\x00\x00N\x00\x00\x00\x00\x12\xf0\x11h\xd6\x00\x13r\t{\xea\x08\x00E\x00\x00<\x90\xa1\x00\x00\x80\x01\x8e\xad\xc0\xa8M\x07\xc0\xa8M\x1a\x08\x00r[\x03\x00\xd8\x00abcdefghijklmnopqrstuvwabcdefghi\xeay$\xf6\x00\x00n\x00\x00\x00')
pktpcapng = rdpcap(pcapngfile)
assert len(pktpcapng) == 1
pkt = pktpcapng[0]
# weird, but wireshark agrees
assert pkt.time == 22425.352221737
assert isinstance(pkt, Ether)
pkt = pkt.payload
assert isinstance(pkt, IP)
pkt = pkt.payload
assert isinstance(pkt, ICMP)
pkt = pkt.payload
assert isinstance(pkt, Raw) and pkt.load == b'abcdefghijklmnopqrstuvwabcdefghi'
pkt = pkt.payload
assert isinstance(pkt, Padding) and pkt.load == b'\xeay$\xf6'
pkt = pkt.payload
assert isinstance(pkt, NoPayload)
= Check PcapNg using Simple Packet Block
* previous file with the (obsolete) packet block replaced by a Simple Packet Block
pcapngfile = BytesIO(b'\n\r\r\n\x1c\x00\x00\x00M<+\x1a\x01\x00\x00\x00\xa8\x03\x00\x00\x00\x00\x00\x00\x1c\x00\x00\x00\x01\x00\x00\x00(\x00\x00\x00\x01\x00\x00\x00\xff\xff\x00\x00\r\x00\x01\x00\x04\x04K\x00\t\x00\x01\x00\tK=N\x00\x00\x00\x00(\x00\x00\x00\x03\x00\x00\x00`\x00\x00\x00N\x00\x00\x00\x00\x12\xf0\x11h\xd6\x00\x13r\t{\xea\x08\x00E\x00\x00<\x90\xa1\x00\x00\x80\x01\x8e\xad\xc0\xa8M\x07\xc0\xa8M\x1a\x08\x00r[\x03\x00\xd8\x00abcdefghijklmnopqrstuvwabcdefghi\xeay$\xf6\x00\x00`\x00\x00\x00')
pktpcapng = rdpcap(pcapngfile)
assert len(pktpcapng) == 1
pkt = pktpcapng[0]
assert isinstance(pkt, Ether)
pkt = pkt.payload
assert isinstance(pkt, IP)
pkt = pkt.payload
assert isinstance(pkt, ICMP)
pkt = pkt.payload
assert isinstance(pkt, Raw) and pkt.load == b'abcdefghijklmnopqrstuvwabcdefghi'
pkt = pkt.payload
assert isinstance(pkt, Padding) and pkt.load == b'\xeay$\xf6'
pkt = pkt.payload
assert isinstance(pkt, NoPayload)
= Invalid pcapng files
from io import BytesIO
# Invalid PCAPNG format -> Raise
try:
invalid_pcapngfile_1 = BytesIO(b'\n\r\r\n\r\x00\x00\x00M<+\x1a\xb2<\xb2\xa1\x01\x00\x00\x00\r\x00\x00\x00M<+\x1a\x80\xaa\xb2\x02')
rdpcap(invalid_pcapngfile_1)
assert False
except Scapy_Exception:
pass
# Invalid Packet in PCAPNG -> return
invalid_pcapngfile_2 = BytesIO(b'\n\r\r\n\x00\x00\x00\x10\x1a+<M\x00\x00\x00\x10\x00\x00\x00\x01\x00\x00\x00\x10 \x00\x00\x00\x10')
assert len(rdpcap(invalid_pcapngfile_2)) == 0
# Invalid interface ID in PCAPNG -> raise EOFError
try:
invalid_pcapngfile_3 = BytesIO(b'\n\n\n\x14\x00\x00\x00M<+\x1a \x14\x00\x00\x00\x03\x00\x00\x00\x14\x00\x00\x00 \x14\x00\x00\x00')
rdpcap(invalid_pcapngfile_3)
assert False
except Scapy_Exception:
pass
# Invalid SPB in PCAPNG -> raise EOFError
try:
invalid_pcapngfile_4 = BytesIO(b'\n\n\n\x14\x00\x00\x00M<+\x1a \x14\x00\x00\x00\x01\x00\x00\x00\x14\x00\x00\x00 \x14\x00\x00\x00\x03\x00\x00\x00\x0c\x00\x00\x00\x0c\x00\x00\x00')
rdpcap(invalid_pcapngfile_4)
assert False
except Scapy_Exception:
pass
= Check PcapWriter on null write
f = BytesIO()
w = PcapWriter(f)
w.write([])
assert len(f.getvalue()) == 0
# Stop being closed for reals, but we still want to have the header written
with mock.patch.object(f, 'close') as cf:
w.close()
cf.assert_called_once_with()
assert len(f.getvalue()) != 0
= Check PcapWriter sets correct linktype after null write
f = BytesIO()
w = PcapWriter(f)
w.write([])
assert len(f.getvalue()) == 0
w.write(Ether()/IP()/ICMP())
assert len(f.getvalue()) != 0
# Stop being closed for reals, but we still want to have the header written
with mock.patch.object(f, 'close') as cf:
w.close()
cf.assert_called_once_with()
f.seek(0) or None
assert len(f.getvalue()) != 0
r = PcapReader(f)
f.seek(0) or None
assert r.LLcls is Ether
assert r.linktype == DLT_EN10MB
l = [ p for p in RawPcapReader(f) ]
assert len(l) == 1
= Check RawPcapReader on pcap
~ pcap
fd = get_temp_file()
wrpcap(fd, [Ether()/IP()/ICMP()])
assert len([p for p in RawPcapReader(fd)]) == 1
for (x, y) in RawPcapReader(fd):
pass
= Check RawPcapReader with a Context Manager
~ pcap
filename = get_temp_file(fd=False)
wrpcap(filename, [IP()/TCP(), IP()/UDP()])
try:
with RawPcapReader(filename) as reader:
packet = next(reader, None)
assert True
except TypeError:
assert False
= Check RawPcapWriter
~ pcap
# GH3256
fd = get_temp_file()
with RawPcapWriter(fd, linktype=1) as w:
w.write(b"test")
fd = get_temp_file()
with RawPcapWriter(fd) as w:
w.write(b"test")
assert w.linktype == 1
= Check tcpdump()
~ tcpdump
from io import BytesIO
* No very specific tests because we do not want to depend on tcpdump output
pcapfile = BytesIO(b'\xd4\xc3\xb2\xa1\x02\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00\x00\x01\x00\x00\x000}$]\xff\\\t\x006\x00\x00\x006\x00\x00\x00\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x08\x00E\x00\x00(\x00\x01\x00\x00@\x06|\xcd\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x14\x00P\x00\x00\x00\x00\x00\x00\x00\x00P\x02 \x00\x91|\x00\x000}$]\x87i\t\x00*\x00\x00\x00*\x00\x00\x00\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x08\x00E\x00\x00\x1c\x00\x01\x00\x00@\x11|\xce\x7f\x00\x00\x01\x7f\x00\x00\x01\x005\x005\x00\x08\x01r0}$]\xfbp\t\x00*\x00\x00\x00*\x00\x00\x00\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x08\x00E\x00\x00\x1c\x00\x01\x00\x00@\x01|\xde\x7f\x00\x00\x01\x7f\x00\x00\x01\x08\x00\xf7\xff\x00\x00\x00\x00')
data = tcpdump(pcapfile, dump=True, args=['-nn']).split(b'\n')
print(data)
assert b'127.0.0.1.20 > 127.0.0.1.80:' in data[0]
assert b'127.0.0.1.53 > 127.0.0.1.53:' in data[1]
assert b'127.0.0.1 > 127.0.0.1:' in data[2]
* Non existing tcpdump binary
import mock
conf_prog_tcpdump = conf.prog.tcpdump
conf.prog.tcpdump = "tcpdump_fake"
def _test_tcpdump_notcpdump():
try:
tcpdump(IP()/TCP())
assert False
except:
assert True
_test_tcpdump_notcpdump()
conf.prog.tcpdump = conf_prog_tcpdump
# Also check with use_tempfile=True (for non-OSX platforms)
pcapfile.seek(0) or None
tempfile_count = len(conf.temp_files)
data = tcpdump(pcapfile, dump=True, args=['-nn'], use_tempfile=True).split(b'\n')
print(data)
assert b'127.0.0.1.20 > 127.0.0.1.80:' in data[0]
assert b'127.0.0.1.53 > 127.0.0.1.53:' in data[1]
assert b'127.0.0.1 > 127.0.0.1:' in data[2]
# We should have another tempfile tracked.
assert len(conf.temp_files) > tempfile_count
# Check with a simple packet
data = tcpdump([Ether()/IP()/ICMP()], dump=True, args=['-nn']).split(b'\n')
print(data)
assert b'127.0.0.1 > 127.0.0.1: ICMP' in data[0].upper()
= Check tcpdump() command with linktype
~ tcpdump libpcap
f = BytesIO()
pkt = Ether()/IP()/ICMP()
with mock.patch('subprocess.Popen', return_value=Bunch(
stdin=f, wait=lambda: None)) as popen:
# Prevent closing the BytesIO
with mock.patch.object(f, 'close'):
tcpdump([pkt], linktype="DLT_EN10MB", use_tempfile=False)
expected_command = [conf.prog.tcpdump, '-y', 'EN10MB', '-U', '-r', '-']
if OPENBSD:
expected_command = [conf.prog.tcpdump, '-y', 'EN10MB', '-r', '-']
popen.assert_called_once_with(
expected_command,
stdin=subprocess.PIPE, stdout=None, stderr=None)
print(bytes_hex(f.getvalue()))
assert raw(pkt) in f.getvalue()
f.close()
del f, pkt
= Check tcpdump() command with linktype and args
~ tcpdump libpcap
f = BytesIO()
pkt = Ether()/IP()/ICMP()
with mock.patch('subprocess.Popen', return_value=Bunch(
stdin=f, wait=lambda: None)) as popen:
# Prevent closing the BytesIO
with mock.patch.object(f, 'close'):
tcpdump([pkt], linktype=scapy.data.DLT_EN10MB, use_tempfile=False)
expected_command = [conf.prog.tcpdump, '-y', 'EN10MB', '-U', '-r', '-']
if OPENBSD:
expected_command = [conf.prog.tcpdump, '-y', 'EN10MB', '-r', '-']
popen.assert_called_once_with(
expected_command,
stdin=subprocess.PIPE, stdout=None, stderr=None)
print(bytes_hex(f.getvalue()))
assert raw(pkt) in f.getvalue()
f.close()
del f, pkt
= Check sniff() offline with linktype & 802.11 filter
~ tcpdump linux
fd = get_temp_file()
wrpcap(fd, [RadioTap()/Dot11()/Dot11ProbeReq(), RadioTap()/Dot11()])
lst = sniff(offline=fd, filter="subtype probe-req")
assert len(lst) == 1
= Check tcpdump() command rejects non-string input for prog
pkt = Ether()/IP()/ICMP()
try:
tcpdump([pkt], prog=+17607067425, args=['-nn'])
except ValueError as e:
if hasattr(e, 'args'):
assert 'prog' in e.args[0]
else:
assert 'prog' in e.message
else:
assert False, 'expected exception'
= Check tcpdump() command with tshark
~ tshark
pcapfile = BytesIO(b'\xd4\xc3\xb2\xa1\x02\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00\x00e\x00\x00\x00\xcf\xc5\xacVo*\n\x00(\x00\x00\x00(\x00\x00\x00E\x00\x00(\x00\x01\x00\x00@\x06|\xcd\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x14\x00P\x00\x00\x00\x00\x00\x00\x00\x00P\x02 \x00\x91|\x00\x00\xcf\xc5\xacV_-\n\x00\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x11|\xce\x7f\x00\x00\x01\x7f\x00\x00\x01\x005\x005\x00\x08\x01r\xcf\xc5\xacV\xf90\n\x00\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x01|\xde\x7f\x00\x00\x01\x7f\x00\x00\x01\x08\x00\xf7\xff\x00\x00\x00\x00')
# tshark doesn't need workarounds on OSX
tempfile_count = len(conf.temp_files)
values = [tuple(int(val) for val in line[:-1].split(b'\t')) for line in tcpdump(pcapfile, prog=conf.prog.tshark, getfd=True, args=['-T', 'fields', '-e', 'ip.ttl', '-e', 'ip.proto'])]
assert values == [(64, 6), (64, 17), (64, 1)]
assert len(conf.temp_files) == tempfile_count
= Check tdecode command directly for tshark
~ tshark
pkts = [
Ether()/IP(src='192.0.2.1', dst='192.0.2.2')/ICMP(type='echo-request')/Raw(b'X'*100),
Ether()/IP(src='192.0.2.2', dst='192.0.2.1')/ICMP(type='echo-reply')/Raw(b'X'*100),
]
# tshark doesn't need workarounds on OSX
tempfile_count = len(conf.temp_files)
r = tdecode(pkts, dump=True)
r
assert b'Src: 192.0.2.1' in r
assert b'Src: 192.0.2.2' in r
assert b'Dst: 192.0.2.2' in r
assert b'Dst: 192.0.2.1' in r
assert b'Echo (ping) request' in r
assert b'Echo (ping) reply' in r
assert b'ICMP' in r
assert len(conf.temp_files) == tempfile_count
= Check tdecode with linktype
~ tshark
# These are the same as the ping packets above
pkts = [
b'\xff\xff\xff\xff\xff\xff\xac"\x0b\xc5j\xdb\x08\x00E\x00\x00\x80\x00\x01\x00\x00@\x01\xf6x\xc0\x00\x02\x01\xc0\x00\x02\x02\x08\x00\xb6\xbe\x00\x00\x00\x00XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX',
b'\xff\xff\xff\xff\xff\xff\xac"\x0b\xc5j\xdb\x08\x00E\x00\x00\x80\x00\x01\x00\x00@\x01\xf6x\xc0\x00\x02\x02\xc0\x00\x02\x01\x00\x00\xbe\xbe\x00\x00\x00\x00XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX',
]
# tshark doesn't need workarounds on OSX
tempfile_count = len(conf.temp_files)
r = tdecode(pkts, dump=True, linktype=DLT_EN10MB)
assert b'Src: 192.0.2.1' in r
assert b'Src: 192.0.2.2' in r
assert b'Dst: 192.0.2.2' in r
assert b'Dst: 192.0.2.1' in r
assert b'Echo (ping) request' in r
assert b'Echo (ping) reply' in r
assert b'ICMP' in r
assert len(conf.temp_files) == tempfile_count
= Run scapy's tshark command
~ needs_root
tshark(count=1, timeout=3)
= Check wireshark()
~ wireshark
f = BytesIO()
pkt = Ether()/IP()/ICMP()
with mock.patch('subprocess.Popen', return_value=Bunch(stdin=f)) as popen:
# Prevent closing the BytesIO
with mock.patch.object(f, 'close'):
wireshark([pkt])
popen.assert_called_once_with(
[conf.prog.wireshark, '-ki', '-'],
stdin=subprocess.PIPE, stdout=None, stderr=None)
print(bytes_hex(f.getvalue()))
assert raw(pkt) in f.getvalue()
f.close()
del f, pkt
= Check Raw IP pcap files
import tempfile
filename = tempfile.mktemp(suffix=".pcap")
wrpcap(filename, [IP()/UDP(), IPv6()/UDP()], linktype=DLT_RAW)
packets = rdpcap(filename)
assert isinstance(packets[0], IP) and isinstance(packets[1], IPv6)
= Check wrpcap() with no packet
import tempfile
filename = tempfile.mktemp(suffix=".pcap")
wrpcap(filename, [])
fstat = os.stat(filename)
assert fstat.st_size != 0
os.remove(filename)
= Check wrpcap() with SndRcvList
import tempfile
filename = tempfile.mktemp(suffix=".pcap")
wrpcap(filename, SndRcvList(res=[(Ether()/IP(), Ether()/IP())]))
assert len(rdpcap(filename)) == 2
os.remove(filename)
= Check wrpcap() with different packets types
import mock
import os
import tempfile
with mock.patch("scapy.utils.warning") as warning:
filename = tempfile.mktemp()
wrpcap(filename, [IP(), Ether(), IP(), IP()])
os.remove(filename)
assert any("Inconsistent" in arg for arg in warning.call_args[0])
= Check wrpcap() with the Loopback layer
~ tshark
for cls in [Loopback, LoopbackOpenBSD]:
filename = tempfile.mktemp(suffix=".pcap")
wrpcap(filename, [cls()/IP()/ICMP()])
return_value = b"".join(line for line in tcpdump(filename, prog=conf.prog.tshark, getfd=True))
assert b"Echo (ping) request" in return_value
############
############
+ ERF Ethernet format support
= Variable creations
erffile = BytesIO(b'3;!E_9\x92_\x02\x04\x00p\x00\x00\x00P\x00\x00\x00\x0fS?\xca\xc0\x1cjz\x18\x90\xed\x81\x00\x01:\x08\x00E\x00\x00(\xdf\xab@\x00;\x06\xb3s\n\x01]\xdb\n\xfb9\xda\xc3v\x84\xecD\x16\xb9\xab\xda\xa1b\xf9P\x10f\x98\x18\xcb\x00\x00\x00\x00\x90\x9e\xd7\xd2_\x929_\x0f\x9e\xcd\x1f\x01\x88\xb9\x15[/s<\x01\x88\xb9\x15[/\xcd\x1f\x01\x88\xb9\x15[/0\xcd"E_9\x92_\x02\x04\x00p\x00\x00\x00P\x00\x00\x1cjz\x18\x90\xed\x00\x0fS?\xca\xc0\x08\x00E\x00\x00(\xa2\xdd@\x00@\x06\xebA\n\xfb9\xda\n\x01]\xdb\x84\xec\xc3v\xda\xa1b\xf9D\x16\xb9\xacP\x10\x9a\xf0\xe4q\x00\x00\x00\x00\x00\x00\x00\x00o\xbc\xe2{_\x929_\x0f\x9f+3\x01\x88\xb9\x15u\x1e(^\x01\x88\xb9\x15u\x1e+3\x01\x88\xb9\x15u\x1e')
erffilewithheader = BytesIO(b'4;!E_9\x92_\x82\x00\x00x\x00\x00\x00P\x00\x00\x1a+<M^o\x00\x00\x00\x0fS?\xca\xc0\x1cjz\x18\x90\xed\x81\x00\x01:\x08\x00E\x00\x00(\xdf\xab@\x00;\x06\xb3s\n\x01]\xdb\n\xfb9\xda\xc3v\x84\xecD\x16\xb9\xab\xda\xa1b\xf9P\x10f\x98\x18\xcb\x00\x00\x00\x00\x90\x9e\xd7\xd2_\x929_\x0f\x9e\xcd\x1f\x01\x88\xb9\x15[/s<\x01\x88\xb9\x15[/\xcd\x1f\x01\x88\xb9\x15[/0\xcd"E_9\x92_\x82\x00\x00x\x00\x00\x00P\x00\x00\x1a+<M^o\x00\x00\x1cjz\x18\x90\xed\x00\x0fS?\xca\xc0\x08\x00E\x00\x00(\xa2\xdd@\x00@\x06\xebA\n\xfb9\xda\n\x01]\xdb\x84\xec\xc3v\xda\xa1b\xf9D\x16\xb9\xacP\x10\x9a\xf0\xe4q\x00\x00\x00\x00\x00\x00\x00\x00o\xbc\xe2{_\x929_\x0f\x9f+3\x01\x88\xb9\x15u\x1e(^\x01\x88\xb9\x15u\x1e+3\x01\x88\xb9\x15u\x1e')
= Check reading of ERF Ethernet file
pkterf = rderf(erffile)
assert pkterf[0].time == 1603418463.270038318
assert pkterf[0][IP].src == "10.1.93.219"
assert pkterf[0][IP].dst == "10.251.57.218"
assert pkterf[0][Ether].src == "1c:6a:7a:18:90:ed"
assert pkterf[0][Ether].dst == "00:0f:53:3f:ca:c0"
= Check writing of ERF Ethernet file
import os, tempfile
fdesc, filename = tempfile.mkstemp()
fdesc = os.fdopen(fdesc, "wb")
wrerf(fdesc, pkterf)
fdesc.close()
newpkterf = rderf(filename)
assert pkterf[1][Ether].src == newpkterf[1][Ether].src
assert len(pkterf) == len(newpkterf)
assert newpkterf[0].time is not None
assert newpkterf[0].wirelen is not None
assert newpkterf[0].time == pkterf[0].time
assert newpkterf[0].wirelen == pkterf[0].wirelen
assert newpkterf[1].time is not None
assert newpkterf[1].wirelen is not None
assert newpkterf[1].time == pkterf[1].time
assert newpkterf[1].wirelen == pkterf[1].wirelen
_, filename = tempfile.mkstemp()
wrerf(filename, pkterf, append=True)
wrerf(filename, pkterf, append=True)
newdoublepkterf = rderf(filename)
assert len(newpkterf) * 2 == len(newdoublepkterf)
= Check rderf
pkterf = rderf(erffilewithheader)
assert pkterf[1].time == 1603418463.270062279
assert pkterf[1][Ether].src == "00:0f:53:3f:ca:c0"
############
############
+ Mocked read_routes() calls
= Truncated netstat -rn output on OS X
~ mock_read_routes_bsd
import mock
from io import StringIO
@mock.patch("scapy.arch.get_if_addr")
@mock.patch("scapy.arch.unix.os")
def test_osx_netstat_truncated(mock_os, mock_get_if_addr):
"""Test read_routes() on OS X 10.? with a long interface name"""
# netstat & ifconfig outputs from https://github.com/secdev/scapy/pull/119
netstat_output = u"""
Routing tables
Internet:
Destination Gateway Flags Refs Use Netif Expire
default 192.168.1.1 UGSc 460 0 en1
default link#11 UCSI 1 0 bridge1
127 127.0.0.1 UCS 1 0 lo0
127.0.0.1 127.0.0.1 UH 10 2012351 lo0
"""
ifconfig_output = u"lo0 en1 bridge10\n"
# Mocked file descriptors
def se_popen(command):
"""Perform specific side effects"""
if command.startswith("netstat -rn"):
return StringIO(netstat_output)
elif command == "ifconfig -l":
ret = StringIO(ifconfig_output)
def unit():
return ret
ret.__call__ = unit
ret.__enter__ = unit
ret.__exit__ = lambda x,y,z: None
return ret
raise Exception("Command not mocked: %s" % command)
mock_os.popen.side_effect = se_popen
# Mocked get_if_addr() behavior
def se_get_if_addr(iface):
"""Perform specific side effects"""
if iface == "bridge1":
return "0.0.0.0"
return "1.2.3.4"
mock_get_if_addr.side_effect = se_get_if_addr
# Test the function
from scapy.arch.unix import read_routes
scapy.arch.unix.DARWIN = True
scapy.arch.unix.FREEBSD = False
scapy.arch.unix.NETBSD = False
scapy.arch.unix.OPENBSD = False
routes = read_routes()
assert len(routes) == 4
assert [r for r in routes if r[3] == "bridge10"]
test_osx_netstat_truncated()
= macOS 10.13
~ mock_read_routes_bsd
import mock
from io import StringIO
@mock.patch("scapy.arch.get_if_addr")
@mock.patch("scapy.arch.unix.os")
def test_osx_10_13_ipv4(mock_os, mock_get_if_addr):
"""Test read_routes() on OS X 10.13"""
# 'netstat -rn -f inet' output
netstat_output = u"""
Routing tables
Internet:
Destination Gateway Flags Refs Use Netif Expire
default 192.168.28.1 UGSc 82 0 en0
127 127.0.0.1 UCS 0 0 lo0
127.0.0.1 127.0.0.1 UH 1 878 lo0
169.254 link#5 UCS 0 0 en0
192.168.28 link#5 UCS 4 0 en0
192.168.28.1/32 link#5 UCS 2 0 en0
192.168.28.1 88:32:9c:f5:4e:ea UHLWIir 40 37 en0 1177
192.168.28.2 62:aa:56:4b:51:54 UHLWI 0 0 en0 619
192.168.28.4 38:17:ed:9a:58:28 UHLWIi 1 6 en0 428
192.168.28.18/32 link#5 UCS 1 0 en0
192.168.28.18 88:32:9c:f5:4e:eb UHLWI 0 1 lo0
192.168.28.28 04:0e:eb:11:74:a7 UHLWI 0 0 en0 576
224.0.0/4 link#5 UmCS 1 0 en0
224.0.0.251 1:0:5e:0:0:fb UHmLWI 0 0 en0
255.255.255.255/32 link#5 UCS 0 0 en0
"""
# Mocked file descriptor
strio = StringIO(netstat_output)
mock_os.popen = mock.MagicMock(return_value=strio)
# Mocked get_if_addr() output
def se_get_if_addr(iface):
"""Perform specific side effects"""
import socket
if iface == "en0":
return "192.168.28.18"
return "127.0.0.1"
mock_get_if_addr.side_effect = se_get_if_addr
# Test the function
from scapy.arch.unix import read_routes
scapy.arch.unix.DARWIN = False
scapy.arch.unix.FREEBSD = True
scapy.arch.unix.NETBSD = False
scapy.arch.unix.OPENBSD = False
routes = read_routes()
for r in routes:
print(r)
assert len(routes) == 15
default_route = [r for r in routes if r[0] == 0][0]
assert default_route[3] == "en0" and default_route[4] == "192.168.28.18"
test_osx_10_13_ipv4()
= macOS 10.15
~ mock_read_routes_bsd
import mock
from io import StringIO
@mock.patch("scapy.arch.get_if_addr")
@mock.patch("scapy.arch.unix.os")
def test_osx_10_15_ipv4(mock_os, mock_get_if_addr):
"""Test read_routes() on OS X 10.15"""
# 'netstat -rn -f inet' output
netstat_output = u"""
Routing tables
Internet:
Destination Gateway Flags Netif Expire
default 192.168.122.1 UGSc en0
127 127.0.0.1 UCS lo0
127.0.0.1 127.0.0.1 UH lo0
169.254 link#8 UCS en0 !
192.168.122 link#8 UCS en0 !
192.168.122.1/32 link#8 UCS en0 !
192.168.122.1 52:54:0:c0:b7:af UHLWIir en0 1169
192.168.122.63/32 link#8 UCS en0 !
224.0.0/4 link#8 UmCS en0 !
224.0.0.251 1:0:5e:0:0:fb UHmLWI en0
255.255.255.255/32 link#8 UCS en0 !
"""
# Mocked file descriptor
strio = StringIO(netstat_output)
mock_os.popen = mock.MagicMock(return_value=strio)
# Mocked get_if_addr() output
def se_get_if_addr(iface):
"""Perform specific side effects"""
import socket
if iface == "en0":
return "192.168.122.42"
return "127.0.0.1"
mock_get_if_addr.side_effect = se_get_if_addr
# Test the function
from scapy.arch.unix import read_routes
scapy.arch.unix.DARWIN = False
scapy.arch.unix.FREEBSD = True
scapy.arch.unix.NETBSD = False
scapy.arch.unix.OPENBSD = False
routes = read_routes()
for r in routes:
print(r)
assert len(routes) == 11
default_route = [r for r in routes if r[0] == 0][0]
assert default_route[3] == "en0" and default_route[4] == "192.168.122.42"
test_osx_10_15_ipv4()
= OpenBSD 6.3
~ mock_read_routes_bsd
import mock
from io import StringIO
@mock.patch("scapy.arch.get_if_addr")
@mock.patch("scapy.arch.unix.OPENBSD")
@mock.patch("scapy.arch.unix.os")
def test_openbsd_6_3(mock_os, mock_openbsd, mock_get_if_addr):
"""Test read_routes() on OpenBSD 6.3"""
# 'netstat -rn -f inet' output
netstat_output = u"""
Routing tables
Internet:
Destination Gateway Flags Refs Use Mtu Prio Iface
default 10.0.1.254 UGS 0 0 - 8 bge0
224/4 127.0.0.1 URS 0 23 32768 8 lo0
10.0.1/24 10.0.1.26 UCn 4 192 - 4 bge0
10.0.1.1 00:30:48:57:ed:0b UHLc 2 338 - 3 bge0
10.0.1.2 00:03:ba:0c:0b:52 UHLc 1 186 - 3 bge0
10.0.1.26 00:30:48:62:b3:f4 UHLl 0 47877 - 1 bge0
10.0.1.135 link#1 UHLch 1 194 - 3 bge0
10.0.1.254 link#1 UHLch 1 190 - 3 bge0
10.0.1.255 10.0.1.26 UHb 0 0 - 1 bge0
10.188.6/24 10.188.6.17 Cn 0 0 - 4 tap3
10.188.6.17 fe:e1:ba:d7:ff:32 UHLl 0 25 - 1 tap3
10.188.6.255 10.188.6.17 Hb 0 0 - 1 tap3
10.188.135/24 10.0.1.135 UGS 0 0 1350 L 8 bge0
127/8 127.0.0.1 UGRS 0 0 32768 8 lo0
127.0.0.1 127.0.0.1 UHhl 1 3835230 32768 1 lo0
"""
# Mocked file descriptor
strio = StringIO(netstat_output)
mock_os.popen = mock.MagicMock(return_value=strio)
# Mocked OpenBSD parsing behavior
mock_openbsd = True
# Mocked get_if_addr() output
def se_get_if_addr(iface):
"""Perform specific side effects"""
import socket
if iface == "bge0":
return "192.168.122.42"
return "10.0.1.26"
mock_get_if_addr.side_effect = se_get_if_addr
# Test the function
from scapy.arch.unix import read_routes
return read_routes()
routes = test_openbsd_6_3()
for r in routes:
print(ltoa(r[0]), ltoa(r[1]), r)
# check that default route exists in parsed data structure
if ltoa(r[0]) == "0.0.0.0":
default = r
# check that route with locked mtu exists in parsed data structure
if ltoa(r[0]) == "10.188.135.0":
locked = r
assert len(routes) == 11
assert default[2] == "10.0.1.254"
assert default[3] == "bge0"
assert locked[2] == "10.0.1.135"
assert locked[3] == "bge0"
= Solaris 11.1
~ mock_read_routes_bsd
import mock
from io import StringIO
# Mocked Solaris 11.1 parsing behavior
@mock.patch("scapy.arch.get_if_addr")
@mock.patch("scapy.arch.unix.SOLARIS", True)
@mock.patch("scapy.arch.unix.os")
def test_solaris_111(mock_os, mock_get_if_addr):
"""Test read_routes() on Solaris 11.1"""
# 'netstat -rvn -f inet' output
netstat_output = u"""
IRE Table: IPv4
Destination Mask Gateway Device MTU Ref Flg Out In/Fwd
-------------------- --------------- -------------------- ------ ----- --- --- ----- ------
default 0.0.0.0 10.0.2.2 net0 1500 2 UG 5 0
10.0.2.0 255.255.255.0 10.0.2.15 net0 1500 3 U 0 0
127.0.0.1 255.255.255.255 127.0.0.1 lo0 8232 2 UH 1517 1517
"""
# Mocked file descriptor
strio = StringIO(netstat_output)
mock_os.popen = mock.MagicMock(return_value=strio)
print(scapy.arch.unix.SOLARIS)
# Mocked get_if_addr() output
def se_get_if_addr(iface):
"""Perform specific side effects"""
import socket
if iface == "net0":
return "10.0.2.15"
return "127.0.0.1"
mock_get_if_addr.side_effect = se_get_if_addr
# Test the function
from scapy.arch.unix import read_routes
return read_routes()
routes = test_solaris_111()
print(routes)
assert len(routes) == 3
assert routes[0][:4] == (0, 0, '10.0.2.2', 'net0')
assert routes[1][:4] == (167772672, 4294967040, '0.0.0.0', 'net0')
assert routes[2][:4] == (2130706433, 4294967295, '0.0.0.0', 'lo0')
############
############
+ Mocked _parse_tcpreplay_result(stdout, stderr, argv, results_dict)
~ mock_parse_tcpreplay_result
= Test mocked _parse_tcpreplay_result
from scapy.sendrecv import _parse_tcpreplay_result
stdout = """Actual: 1024 packets (198929 bytes) sent in 67.88 seconds.
Rated: 2930.6 bps, 0.02 Mbps, 15.09 pps
Statistics for network device: mon0
Attempted packets: 1024
Successful packets: 1024
Failed packets: 0
Retried packets (ENOBUFS): 0
Retried packets (EAGAIN): 0"""
stderr = """Warning in sendpacket.c:sendpacket_open_pf() line 669:
Unsupported physical layer type 0x0323 on mon0. Maybe it works, maybe it won't. See tickets #123/318
sending out mon0
processing file: replay-example.pcap"""
argv = ['tcpreplay', '--intf1=mon0', '--multiplier=1.00', '--timer=nano', 'replay-example.pcap']
results_dict = _parse_tcpreplay_result(stdout, stderr, argv)
results_dict
assert results_dict["packets"] == 1024
assert results_dict["bytes"] == 198929
assert results_dict["time"] == 67.88
assert results_dict["bps"] == 2930.6
assert results_dict["mbps"] == 0.02
assert results_dict["pps"] == 15.09
assert results_dict["attempted"] == 1024
assert results_dict["successful"] == 1024
assert results_dict["failed"] == 0
assert results_dict["retried_enobufs"] == 0
assert results_dict["retried_eagain"] == 0
assert results_dict["command"] == " ".join(argv)
assert len(results_dict["warnings"]) == 3
= Test more recent version with flows
data = """Actual: 1 packets (42 bytes) sent in 0.000278 seconds
Rated: 151079.1 Bps, 1.20 Mbps, 3597.12 pps
Flows: 1 flows, 3597.12 fps, 1 flow packets, 0 non-flow
Statistics for network device: enp0s3
Successful packets: 1
Failed packets: 0
Truncated packets: 0
Retried packets (ENOBUFS): 0
Retried packets (EAGAIN): 0
"""
results_dict = _parse_tcpreplay_result(data, "", [])
results_dict
expected = {
'bps': 151079.1,
'bytes': 42,
'command': '',
'failed': 0,
'flow_packets': 1,
'flows': 1,
'fps': 3597.12,
'mbps': 1.2,
'non_flow': 0,
'packets': 1,
'pps': 3597.12,
'retried_eagain': 0,
'retried_enobufs': 0,
'successful': 1,
'time': 0.000278,
'truncated': 0,
'warnings': []
}
assert results_dict == expected
############
############
+ Mocked read_routes6() calls
= Preliminary definitions
~ mock_read_routes_bsd
import mock
from io import StringIO
def valid_output_read_routes6(routes):
""""Return True if 'routes' contains correctly formatted entries, False otherwise"""
for destination, plen, next_hop, dev, cset, me in routes:
if not in6_isvalid(destination) or not type(plen) == int:
return False
if not in6_isvalid(next_hop) or not isinstance(dev, str):
return False
for address in cset:
if not in6_isvalid(address):
return False
return True
def check_mandatory_ipv6_routes(routes6):
"""Ensure that mandatory IPv6 routes are present"""
if sum(1 for r in routes6 if r[0] == "::1" and r[4] == ["::1"]) < 1:
return False
if sum(1 for r in routes6 if r[0] == "fe80::" and r[1] == 64) < 1:
return False
if sum(1 for r in routes6 if in6_islladdr(r[0]) and r[1] == 128 and \
r[4] == ["::1"]) < 1:
return False
return True
= Mac OS X 10.9.5
~ mock_read_routes_bsd
import mock
from io import StringIO
@mock.patch("scapy.arch.unix.in6_getifaddr")
@mock.patch("scapy.arch.unix.os")
def test_osx_10_9_5(mock_os, mock_in6_getifaddr):
"""Test read_routes6() on OS X 10.9.5"""
# 'netstat -rn -f inet6' output
netstat_output = u"""
Routing tables
Internet6:
Destination Gateway Flags Netif Expire
::1 ::1 UHL lo0
fe80::%lo0/64 fe80::1%lo0 UcI lo0
fe80::1%lo0 link#1 UHLI lo0
fe80::%en0/64 link#4 UCI en0
fe80::ba26:6cff:fe5f:4eee%en0 b8:26:6c:5f:4e:ee UHLWIi en0
fe80::bae8:56ff:fe45:8ce6%en0 b8:e8:56:45:8c:e6 UHLI lo0
ff01::%lo0/32 ::1 UmCI lo0
ff01::%en0/32 link#4 UmCI en0
ff02::%lo0/32 ::1 UmCI lo0
ff02::%en0/32 link#4 UmCI en0
"""
# Mocked file descriptor
strio = StringIO(netstat_output)
mock_os.popen = mock.MagicMock(return_value=strio)
# Mocked in6_getifaddr() output
mock_in6_getifaddr.return_value = [("::1", IPV6_ADDR_LOOPBACK, "lo0"),
("fe80::ba26:6cff:fe5f:4eee", IPV6_ADDR_LINKLOCAL, "en0")]
# Test the function
from scapy.arch.unix import read_routes6
scapy.arch.unix.DARWIN = False
scapy.arch.unix.FREEBSD = True
scapy.arch.unix.NETBSD = False
scapy.arch.unix.OPENBSD = False
routes = read_routes6()
for r in routes:
print(r)
assert len(routes) == 6
assert check_mandatory_ipv6_routes(routes)
test_osx_10_9_5()
= Mac OS X 10.9.5 with global IPv6 connectivity
~ mock_read_routes_bsd
import mock
from io import StringIO
@mock.patch("scapy.arch.unix.in6_getifaddr")
@mock.patch("scapy.arch.unix.os")
def test_osx_10_9_5_global(mock_os, mock_in6_getifaddr):
"""Test read_routes6() on OS X 10.9.5 with an IPv6 connectivity"""
# 'netstat -rn -f inet6' output
netstat_output = u"""
Routing tables
Internet6:
Destination Gateway Flags Netif Expire
default fe80::ba26:8aff:fe5f:4eef%en0 UGc en0
::1 ::1 UHL lo0
2a01:ab09:7d:1f01::/64 link#4 UC en0
2a01:ab09:7d:1f01:420:205c:9fab:5be7 b8:e9:55:44:7c:e5 UHL lo0
2a01:ab09:7d:1f01:ba26:8aff:fe5f:4eef b8:26:8a:5f:4e:ef UHLWI en0
2a01:ab09:7d:1f01:bae9:55ff:fe44:7ce5 b8:e9:55:44:7c:e5 UHL lo0
fe80::%lo0/64 fe80::1%lo0 UcI lo0
fe80::1%lo0 link#1 UHLI lo0
fe80::%en0/64 link#4 UCI en0
fe80::5664:d9ff:fe79:4e00%en0 54:64:d9:79:4e:0 UHLWI en0
fe80::6ead:f8ff:fe74:945a%en0 6c:ad:f8:74:94:5a UHLWI en0
fe80::a2f3:c1ff:fec4:5b50%en0 a0:f3:c1:c4:5b:50 UHLWI en0
fe80::ba26:8aff:fe5f:4eef%en0 b8:26:8a:5f:4e:ef UHLWIir en0
fe80::bae9:55ff:fe44:7ce5%en0 b8:e9:55:44:7c:e5 UHLI lo0
ff01::%lo0/32 ::1 UmCI lo0
ff01::%en0/32 link#4 UmCI en0
ff02::%lo0/32 ::1 UmCI lo
"""
# Mocked file descriptor
strio = StringIO(netstat_output)
mock_os.popen = mock.MagicMock(return_value=strio)
# Mocked in6_getifaddr() output
mock_in6_getifaddr.return_value = [("::1", IPV6_ADDR_LOOPBACK, "lo0"),
("fe80::ba26:6cff:fe5f:4eee", IPV6_ADDR_LINKLOCAL, "en0")]
# Test the function
from scapy.arch.unix import read_routes6
routes = read_routes6()
print(routes)
assert valid_output_read_routes6(routes)
for r in routes:
print(r)
assert len(routes) == 11
assert check_mandatory_ipv6_routes(routes)
test_osx_10_9_5_global()
= Mac OS X 10.10.4
~ mock_read_routes_bsd
import mock
from io import StringIO
@mock.patch("scapy.arch.unix.in6_getifaddr")
@mock.patch("scapy.arch.unix.os")
def test_osx_10_10_4(mock_os, mock_in6_getifaddr):
"""Test read_routes6() on OS X 10.10.4"""
# 'netstat -rn -f inet6' output
netstat_output = u"""
Routing tables
Internet6:
Destination Gateway Flags Netif Expire
::1 ::1 UHL lo0
fe80::%lo0/64 fe80::1%lo0 UcI lo0
fe80::1%lo0 link#1 UHLI lo0
fe80::%en0/64 link#4 UCI en0
fe80::a00:27ff:fe9b:c965%en0 8:0:27:9b:c9:65 UHLI lo0
ff01::%lo0/32 ::1 UmCI lo0
ff01::%en0/32 link#4 UmCI en0
ff02::%lo0/32 ::1 UmCI lo0
ff02::%en0/32 link#4 UmCI en0
"""
# Mocked file descriptor
strio = StringIO(netstat_output)
mock_os.popen = mock.MagicMock(return_value=strio)
# Mocked in6_getifaddr() output
mock_in6_getifaddr.return_value = [("::1", IPV6_ADDR_LOOPBACK, "lo0"),
("fe80::a00:27ff:fe9b:c965", IPV6_ADDR_LINKLOCAL, "en0")]
# Test the function
from scapy.arch.unix import read_routes6
routes = read_routes6()
for r in routes:
print(r)
assert len(routes) == 5
assert check_mandatory_ipv6_routes(routes)
test_osx_10_10_4()
= FreeBSD 10.2
~ mock_read_routes_bsd
import mock
from io import StringIO
@mock.patch("scapy.arch.unix.in6_getifaddr")
@mock.patch("scapy.arch.unix.os")
def test_freebsd_10_2(mock_os, mock_in6_getifaddr):
"""Test read_routes6() on FreeBSD 10.2"""
# 'netstat -rn -f inet6' output
netstat_output = u"""
Routing tables
Internet6:
Destination Gateway Flags Netif Expire
::/96 ::1 UGRS lo0
::1 link#2 UH lo0
::ffff:0.0.0.0/96 ::1 UGRS lo0
fe80::/10 ::1 UGRS lo0
fe80::%lo0/64 link#2 U lo0
fe80::1%lo0 link#2 UHS lo0
ff01::%lo0/32 ::1 U lo0
ff02::/16 ::1 UGRS lo0
ff02::%lo0/32 ::1 U lo0
"""
# Mocked file descriptor
strio = StringIO(netstat_output)
mock_os.popen = mock.MagicMock(return_value=strio)
# Mocked in6_getifaddr() output
mock_in6_getifaddr.return_value = [("::1", IPV6_ADDR_LOOPBACK, "lo0")]
# Test the function
from scapy.arch.unix import read_routes6
routes = read_routes6()
scapy.arch.unix.DARWIN = False
scapy.arch.unix.FREEBSD = True
scapy.arch.unix.NETBSD = False
scapy.arch.unix.OPENBSD = False
for r in routes:
print(r)
assert len(routes) == 3
assert check_mandatory_ipv6_routes(routes)
test_freebsd_10_2()
= FreeBSD 13.0
~ mock_read_routes_bsd
import mock
from io import StringIO
@mock.patch("scapy.arch.get_if_addr")
@mock.patch("scapy.arch.unix.os")
def test_freebsd_13(mock_os, mock_get_if_addr):
"""Test read_routes() on FreeBSD 13"""
# 'netstat -rnW -f inet' output
netstat_output = u"""
Routing tables
Internet:
Destination Gateway Flags Nhop# Mtu Netif Expire
default 10.0.0.1 UGS 3 1500 vtnet0
10.0.0.0/24 link#1 U 2 1500 vtnet0
10.0.0.8 link#2 UHS 1 16384 lo0
127.0.0.1 link#2 UH 1 16384 lo0
"""
# Mocked file descriptor
strio = StringIO(netstat_output)
mock_os.popen = mock.MagicMock(return_value=strio)
# Mocked get_if_addr() behavior
def se_get_if_addr(iface):
"""Perform specific side effects"""
if iface == "vtnet0":
return "10.0.0.1"
return "1.2.3.4"
mock_get_if_addr.side_effect = se_get_if_addr
# Test the function
from scapy.arch.unix import read_routes
routes = read_routes()
scapy.arch.unix.DARWIN = False
scapy.arch.unix.FREEBSD = True
scapy.arch.unix.NETBSD = False
scapy.arch.unix.OPENBSD = False
for r in routes:
print(r)
assert r[3] in ["vtnet0", "lo0"]
assert len(routes) == 4
test_freebsd_13()
= OpenBSD 5.5
~ mock_read_routes_bsd
import mock
from io import StringIO
@mock.patch("scapy.arch.unix.OPENBSD")
@mock.patch("scapy.arch.unix.in6_getifaddr")
@mock.patch("scapy.arch.unix.os")
def test_openbsd_5_5(mock_os, mock_in6_getifaddr, mock_openbsd):
"""Test read_routes6() on OpenBSD 5.5"""
# 'netstat -rn -f inet6' output
netstat_output = u"""
Routing tables
Internet6:
Destination Gateway Flags Refs Use Mtu Prio Iface
::/104 ::1 UGRS 0 0 - 8 lo0
::/96 ::1 UGRS 0 0 - 8 lo0
::1 ::1 UH 14 0 33144 4 lo0
::127.0.0.0/104 ::1 UGRS 0 0 - 8 lo0
::224.0.0.0/100 ::1 UGRS 0 0 - 8 lo0
::255.0.0.0/104 ::1 UGRS 0 0 - 8 lo0
::ffff:0.0.0.0/96 ::1 UGRS 0 0 - 8 lo0
2002::/24 ::1 UGRS 0 0 - 8 lo0
2002:7f00::/24 ::1 UGRS 0 0 - 8 lo0
2002:e000::/20 ::1 UGRS 0 0 - 8 lo0
2002:ff00::/24 ::1 UGRS 0 0 - 8 lo0
fe80::/10 ::1 UGRS 0 0 - 8 lo0
fe80::%em0/64 link#1 UC 0 0 - 4 em0
fe80::a00:27ff:fe04:59bf%em0 08:00:27:04:59:bf UHL 0 0 - 4 lo0
fe80::%lo0/64 fe80::1%lo0 U 0 0 - 4 lo0
fe80::1%lo0 link#3 UHL 0 0 - 4 lo0
fec0::/10 ::1 UGRS 0 0 - 8 lo0
ff01::/16 ::1 UGRS 0 0 - 8 lo0
ff01::%em0/32 link#1 UC 0 0 - 4 em0
ff01::%lo0/32 fe80::1%lo0 UC 0 0 - 4 lo0
ff02::/16 ::1 UGRS 0 0 - 8 lo0
ff02::%em0/32 link#1 UC 0 0 - 4 em0
ff02::%lo0/32 fe80::1%lo0 UC 0 0 - 4 lo0
"""
# Mocked file descriptor
strio = StringIO(netstat_output)
mock_os.popen = mock.MagicMock(return_value=strio)
# Mocked in6_getifaddr() output
mock_in6_getifaddr.return_value = [("::1", IPV6_ADDR_LOOPBACK, "lo0"),
("fe80::a00:27ff:fe04:59bf", IPV6_ADDR_LINKLOCAL, "em0")]
# Mocked OpenBSD parsing behavior
mock_openbsd = True
# Test the function
from scapy.arch.unix import read_routes6
routes = read_routes6()
for r in routes:
print(r)
assert len(routes) == 5
assert check_mandatory_ipv6_routes(routes)
test_openbsd_5_5()
= NetBSD 7.0
~ mock_read_routes_bsd
@mock.patch("scapy.arch.unix.NETBSD")
@mock.patch("scapy.arch.unix.in6_getifaddr")
@mock.patch("scapy.arch.unix.os")
def test_netbsd_7_0(mock_os, mock_in6_getifaddr, mock_netbsd):
"""Test read_routes6() on NetBSD 7.0"""
# 'netstat -rn -f inet6' output
netstat_output = u"""
Routing tables
Internet6:
Destination Gateway Flags Refs Use Mtu Interface
::/104 ::1 UGRS - - - lo0
::/96 ::1 UGRS - - - lo0
::1 ::1 UH - - 33648 lo0
::127.0.0.0/104 ::1 UGRS - - - lo0
::224.0.0.0/100 ::1 UGRS - - - lo0
::255.0.0.0/104 ::1 UGRS - - - lo0
::ffff:0.0.0.0/96 ::1 UGRS - - - lo0
2001:db8::/32 ::1 UGRS - - - lo0
2002::/24 ::1 UGRS - - - lo0
2002:7f00::/24 ::1 UGRS - - - lo0
2002:e000::/20 ::1 UGRS - - - lo0
2002:ff00::/24 ::1 UGRS - - - lo0
fe80::/10 ::1 UGRS - - - lo0
fe80::%wm0/64 link#1 UC - - - wm0
fe80::acd1:3989:180e:fde0 08:00:27:a1:64:d8 UHL - - - lo0
fe80::%lo0/64 fe80::1 U - - - lo0
fe80::1 link#2 UHL - - - lo0
ff01:1::/32 link#1 UC - - - wm0
ff01:2::/32 ::1 UC - - - lo0
ff02::%wm0/32 link#1 UC - - - wm0
ff02::%lo0/32 ::1 UC - - - lo0
"""
# Mocked file descriptor
strio = StringIO(netstat_output)
mock_os.popen = mock.MagicMock(return_value=strio)
# Mocked in6_getifaddr() output
mock_in6_getifaddr.return_value = [("::1", IPV6_ADDR_LOOPBACK, "lo0"),
("fe80::acd1:3989:180e:fde0", IPV6_ADDR_LINKLOCAL, "wm0")]
# Test the function
from scapy.arch.unix import read_routes6
routes = read_routes6()
for r in routes:
print(r)
assert len(routes) == 5
assert check_mandatory_ipv6_routes(routes)
test_netbsd_7_0()
############
############
+ Mocked route() calls
= Mocked IPv4 routes calls
import scapy
conf.ifaces._add_fake_iface("enp3s0")
conf.ifaces._add_fake_iface("lo")
old_iface = conf.iface
old_loopback = conf.loopback_name
try:
conf.iface = 'enp3s0'
conf.loopback_name = 'lo'
conf.route.invalidate_cache()
conf.route.routes = [
(4294967295, 4294967295, '0.0.0.0', 'wlan0', '', 281),
(4294967295, 4294967295, '0.0.0.0', 'lo', '', 291),
(4294967295, 4294967295, '0.0.0.0', 'enp3s0', '192.168.0.119', 281),
(3758096384, 4026531840, '0.0.0.0', 'lo', '', 291),
(3758096384, 4026531840, '0.0.0.0', 'wlan0', '', 281),
(3758096384, 4026531840, '0.0.0.0', 'enp3s0', '1.1.1.1', 281),
(3232235775, 4294967295, '0.0.0.0', 'enp3s0', '2.2.2.2', 281),
(3232235639, 4294967295, '0.0.0.0', 'enp3s0', '3.3.3.3', 281),
(3232235520, 4294967040, '0.0.0.0', 'enp3s0', '4.4.4.4', 281),
(0, 0, '192.168.0.254', 'enp3s0', '192.168.0.119', 25)
]
assert conf.route.route("192.168.0.0-10") == ('enp3s0', '4.4.4.4', '0.0.0.0')
assert conf.route.route("192.168.0.119") == ('lo', '192.168.0.119', '0.0.0.0')
assert conf.route.route("224.0.0.0") == ('enp3s0', '1.1.1.1', '0.0.0.0')
assert conf.route.route("255.255.255.255") == ('enp3s0', '192.168.0.119', '0.0.0.0')
assert conf.route.route("*") == ('enp3s0', '192.168.0.119', '192.168.0.254')
finally:
conf.loopback_name = old_loopback
conf.iface = old_iface
conf.route.resync()
conf.ifaces.reload()
= Mocked IPv6 routes calls
conf.ifaces._add_fake_iface("enp3s0")
conf.ifaces._add_fake_iface("lo")
old_iface = conf.iface
old_loopback = conf.loopback_name
try:
conf.route6.ipv6_ifaces = set(['enp3s0', 'wlan0', 'lo'])
conf.iface = 'enp3s0'
conf.loopback_name = 'lo'
conf.route6.invalidate_cache()
conf.route6.routes = [
('fe80::dd17:1fa6:a123:ab4', 128, '::', 'lo', ['fe80::dd17:1fa6:a123:ab4'], 291),
('fe80::7101:5678:1234:da65', 128, '::', 'enp3s0', ['fe80::7101:5678:1234:da65'], 281),
('fe80::1f:ae12:4d2c:abff', 128, '::', 'wlan0', ['fe80::1f:ae12:4d2c:abff'], 281),
('fe80::', 64, '::', 'wlan0', ['fe80::1f:ae12:4d2c:abff'], 281),
('fe80::', 64, '::', 'lo', ['fe80::dd17:1fa6:a123:ab4'], 291),
('fe80::', 64, '::', 'enp3s0', ['fe80::7101:5678:1234:da65'], 281),
('2a01:e35:1e06:ab56:7010:6548:9646:fa77', 128, '::', 'enp3s0', ['2a01:e35:1e06:ab56:7010:6548:9646:fa77', '2a01:e35:1e06:ab56:512:8bb7:8ab8:14a8'], 281),
('2a01:e35:1e06:ab56:512:8bb7:8ab8:14a8', 128, '::', 'enp3s0', ['2a01:e35:1e06:ab56:7010:6548:9646:fa77', '2a01:e35:1e06:ab56:512:8bb7:8ab8:14a8'], 281),
('2a01:e35:1e06:ab56::', 64, '::', 'enp3s0', ['2a01:e35:1e06:ab56:7010:6548:9646:fa77', '2a01:e35:1e06:ab56:512:8bb7:8ab8:14a8'], 281),
('::', 0, 'fe80::160c:64aa:ef6f:fe14', 'enp3s0', ['2a01:e35:1e06:ab56:7010:6548:9646:fa77', '2a01:e35:1e06:ab56:512:8bb7:8ab8:14a8'], 281)
]
assert conf.route6.route("2a01:e35:1e06:ab56:512:8bb7:8ab8:14a8") == ('enp3s0', '2a01:e35:1e06:ab56:7010:6548:9646:fa77', '::')
assert conf.route6.route("::1") == ('enp3s0', '2a01:e35:1e06:ab56:7010:6548:9646:fa77', 'fe80::160c:64aa:ef6f:fe14')
assert conf.route6.route("ff02::1") == ('enp3s0', 'fe80::7101:5678:1234:da65', '::')
assert conf.route6.route("fe80::1") == ('enp3s0', 'fe80::7101:5678:1234:da65', '::')
assert conf.route6.route("fe80::1", dev='lo') == ('lo', 'fe80::dd17:1fa6:a123:ab4', '::')
finally:
conf.loopback_name = old_loopback
conf.iface = old_iface
conf.route6.resync()
conf.ifaces.reload()
= Find a link-local address when conf.iface does not support IPv6
old_iface = conf.iface
conf.route6.ipv6_ifaces = set(['eth1', 'lo'])
conf.iface = "eth0"
conf.route6.routes = [("fe80::", 64, "::", "eth1", ["fe80::a00:28ff:fe07:1980"], 256), ("::1", 128, "::", "lo", ["::1"], 0), ("fe80::a00:28ff:fe07:1980", 128, "::", "lo", ["::1"], 0)]
assert conf.route6.route("fe80::2807") == ("eth1", "fe80::a00:28ff:fe07:1980", "::")
conf.iface = old_iface
conf.route6.resync()
= Windows: reset routes properly
if WINDOWS:
from scapy.arch.windows import _route_add_loopback
_route_add_loopback()
############
############
############
+ Tests of StreamSocket
= Test with DNS over TCP
~ netaccess
import socket
sck = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sck.connect(("8.8.8.8", 53))
class DNSTCP(Packet):
name = "DNS over TCP"
fields_desc = [ FieldLenField("len", None, fmt="!H", length_of="dns"),
PacketLenField("dns", 0, DNS, length_from=lambda p: p.len)]
ssck = StreamSocket(sck, DNSTCP)
r = ssck.sr1(DNSTCP(dns=DNS(rd=1, qd=DNSQR(qname="www.example.com"))), timeout=3)
sck.close()
assert DNSTCP in r and len(r.dns.an)
############
+ Tests of SSLStreamContext
= Test with recv() calls that return exact packet-length rawings
~ sslraweamsocket
import socket
class MockSocket(object):
def __init__(self):
self.l = [ b'\x00\x00\x00\x01', b'\x00\x00\x00\x02', b'\x00\x00\x00\x03' ]
def recv(self, x):
if len(self.l) == 0:
raise socket.error(100, 'EOF')
return self.l.pop(0)
def fileno(self):
return -1
def close(self):
return
class TestPacket(Packet):
name = 'TestPacket'
fields_desc = [
IntField('data', 0)
]
def guess_payload_class(self, p):
return conf.padding_layer
s = MockSocket()
ss = SSLStreamSocket(s, basecls=TestPacket)
p = ss.recv()
assert p.data == 1
p = ss.recv()
assert p.data == 2
p = ss.recv()
assert p.data == 3
try:
ss.recv()
ret = False
except socket.error:
ret = True
assert ret
= Test with recv() calls that return twice as much data as the exact packet-length
~ sslraweamsocket
import socket
class MockSocket(object):
def __init__(self):
self.l = [ b'\x00\x00\x00\x01\x00\x00\x00\x02', b'\x00\x00\x00\x03\x00\x00\x00\x04' ]
def recv(self, x):
if len(self.l) == 0:
raise socket.error(100, 'EOF')
return self.l.pop(0)
def fileno(self):
return -1
def close(self):
return
class TestPacket(Packet):
name = 'TestPacket'
fields_desc = [
IntField('data', 0)
]
def guess_payload_class(self, p):
return conf.padding_layer
s = MockSocket()
ss = SSLStreamSocket(s, basecls=TestPacket)
p = ss.recv()
assert p.data == 1
p = ss.recv()
assert p.data == 2
p = ss.recv()
assert p.data == 3
p = ss.recv()
assert p.data == 4
try:
ss.recv()
ret = False
except socket.error:
ret = True
assert ret
= Test with recv() calls that return not enough data
~ sslraweamsocket
import socket
class MockSocket(object):
def __init__(self):
self.l = [ b'\x00\x00', b'\x00\x01', b'\x00\x00\x00', b'\x02', b'\x00\x00', b'\x00', b'\x03' ]
def recv(self, x):
if len(self.l) == 0:
raise socket.error(100, 'EOF')
return self.l.pop(0)
def fileno(self):
return -1
def close(self):
return
class TestPacket(Packet):
name = 'TestPacket'
fields_desc = [
IntField('data', 0)
]
def guess_payload_class(self, p):
return conf.padding_layer
s = MockSocket()
ss = SSLStreamSocket(s, basecls=TestPacket)
try:
p = ss.recv()
ret = False
except:
ret = True
assert ret
p = ss.recv()
assert p.data == 1
try:
p = ss.recv()
ret = False
except:
ret = True
assert ret
p = ss.recv()
assert p.data == 2
try:
p = ss.recv()
ret = False
except:
ret = True
assert ret
try:
p = ss.recv()
ret = False
except:
ret = True
assert ret
p = ss.recv()
assert p.data == 3
############
############
+ Test correct conversion from binary to rawing of IPv6 addresses
= IPv6 bin to rawing conversion
from scapy.pton_ntop import _inet6_ntop, inet_ntop
import socket
for binfrm, address in [
(b'\x00' * 16, '::'),
(b'\x11\x11\x22\x22\x33\x33\x44\x44\x55\x55\x66\x66\x77\x77\x88\x88',
'1111:2222:3333:4444:5555:6666:7777:8888'),
(b'\x11\x11\x22\x22\x33\x33\x44\x44\x55\x55\x00\x00\x00\x00\x00\x00',
'1111:2222:3333:4444:5555::'),
(b'\x00\x00\x00\x00\x00\x00\x44\x44\x55\x55\x66\x66\x77\x77\x88\x88',
'::4444:5555:6666:7777:8888'),
(b'\x00\x00\x00\x00\x33\x33\x44\x44\x00\x00\x00\x00\x00\x00\x88\x88',
'0:0:3333:4444::8888'),
(b'\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00',
'1::'),
(b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01',
'::1'),
(b'\x11\x11\x00\x00\x00\x00\x44\x44\x00\x00\x00\x00\x77\x77\x88\x88',
'1111::4444:0:0:7777:8888'),
(b'\x10\x00\x02\x00\x00\x30\x00\x04\x00\x05\x00\x60\x07\x00\x80\x00',
'1000:200:30:4:5:60:700:8000'),
]:
addr1 = inet_ntop(socket.AF_INET6, binfrm)
addr2 = _inet6_ntop(binfrm)
assert address == addr1 == addr2
= IPv6 bin to rawing conversion - Zero-block of length 1
binfrm = b'\x11\x11\x22\x22\x33\x33\x44\x44\x55\x55\x66\x66\x00\x00\x88\x88'
addr1, addr2 = inet_ntop(socket.AF_INET6, binfrm), _inet6_ntop(binfrm)
# On Mac OS socket.inet_ntop is not fully compliant with RFC 5952 and
# shortens the single zero block to '::'. This is a valid IPv6 address
# representation anyway.
assert(addr1 in ['1111:2222:3333:4444:5555:6666:0:8888',
'1111:2222:3333:4444:5555:6666::8888'])
assert addr2 == '1111:2222:3333:4444:5555:6666:0:8888'
= IPv6 bin to rawing conversion - Illegal sizes
for binfrm in ["\x00" * 15, b"\x00" * 17]:
rc = False
try:
inet_ntop(socket.AF_INET6, binfrm)
except Exception as exc1:
_exc1 = exc1
rc = True
assert rc
try:
_inet6_ntop(binfrm)
except Exception as exc2:
rc = isinstance(exc2, type(_exc1))
assert rc
############
############
+ Addresses generators
= Net
assert list(Net("192.168.0.0/31")) == ["192.168.0.0", "192.168.0.1"]
assert "1.2.3.4" in Net("0.0.0.0/0")
assert "192.168.0.0/25" in Net("192.168.0.0/24")
assert "192.168.0.0/23" not in Net("192.168.0.0/24")
assert "0.0.0.0/1" in Net("0.0.0.0/0")
assert "0.0.0.0/0" not in Net("0.0.0.0/1")
assert Net("1.2.3.0/24") == Net("1.2.3.0", "1.2.3.255")
assert hash(Net("1.2.3.0/24")) == hash(Net("1.2.3.0", "1.2.3.255"))
= Net using name
~ netaccess
ip = IP(dst="www.google.com")
n1 = ip.dst
assert isinstance(n1, Net)
ip.show()
= Multiple IP addresses test
~ netaccess
ip = IP(dst=['192.168.0.1', 'www.google.fr'],ihl=(1,5))
assert ip.dst[0] == '192.168.0.1'
assert isinstance(ip.dst[1], Net)
src = ip.src
assert src
assert isinstance(src, str)
= OID
oid = OID("1.2.3.4.5.6-8")
sum(1 for o in oid) == 3
assert oid.__iterlen__() == 3
= Net6
n1 = Net6("2001:db8::/127")
assert len(list(n1)) == 2
assert len(n1) == 2
n2 = Net6("fec0::/110")
assert len(n2) == 262144
assert "ffff::ffff" in Net6("::/0")
assert "::/1" in Net6("::/0")
assert "::/0" not in Net6("::/1")
assert Net6("::/120") == Net6("::", "::ff")
assert hash(Net6("::/120")) == hash(Net6("::", "::ff"))
assert Net6("::1.2.3.0/120") == Net6("::1.2.3.0", "::1.2.3.255")
assert hash(Net6("::1.2.3.0/120")) == hash(Net6("::1.2.3.0", "::1.2.3.255"))
assert Net6("::1.2.3.0/120") != Net("1.2.3.0/24")
assert hash(Net6("::1.2.3.0/120")) != hash(Net("1.2.3.0/24"))
= Net6 using web address
~ netaccess ipv6
ip = IPv6(dst="www.google.com")
n1 = ip.dst
assert isinstance(n1, Net6)
assert "www.google.com" in repr(n1)
ip.show()
ip = IPv6(dst="www.yahoo.com")
assert IPv6(raw(ip)).dst == [p.dst for p in ip][0]
= Multiple IPv6 addresses test
~ netaccess ipv6
ip = IPv6(dst=['2001:db8::1', 'www.google.fr'],hlim=(1,5))
assert ip.dst[0] == '2001:db8::1'
assert isinstance(ip.dst[1], Net6)
src = ip.src
assert src
assert isinstance(src, str)
= Test repr on Net
~ netaccess
conf.color_theme = BlackAndWhite()
output = repr(IP(src="www.google.com"))
assert 'Net("www.google.com/32")' in output
= Test repr on Net
~ netaccess ipv6
conf.color_theme = BlackAndWhite()
assert 'Net6("www.google.com/128")' in repr(IPv6(src="www.google.com"))
############
############
+ IPv6 helpers
= in6_getLocalUniquePrefix()
p = in6_getLocalUniquePrefix()
len(inet_pton(socket.AF_INET6, p)) == 16 and p.startswith("fd")
= Misc addresses manipulation functions
teredoAddrExtractInfo("2001:0:0a0b:0c0d:0028:f508:f508:08f5") == ("10.11.12.13", 40, "10.247.247.10", 2807)
ip6 = IP6Field("test", None)
ip6.i2repr("", "2001:0:0a0b:0c0d:0028:f508:f508:08f5") == "2001:0:0a0b:0c0d:0028:f508:f508:08f5 [Teredo srv: 10.11.12.13 cli: 10.247.247.10:2807]"
ip6.i2repr("", "2002:0102:0304::1") == "2002:0102:0304::1 [6to4 GW: 1.2.3.4]"
in6_iseui64("fe80::bae8:58ff:fed4:e5f6") == True
in6_isanycast("2001:db8::fdff:ffff:ffff:ff80") == True
a = inet_pton(socket.AF_INET6, "2001:db8::2807")
in6_xor(a, a) == b"\x00" * 16
a = inet_pton(socket.AF_INET6, "fe80::bae8:58ff:fed4:e5f6")
r = inet_ntop(socket.AF_INET6, in6_getnsma(a))
r == "ff02::1:ffd4:e5f6"
in6_isllsnmaddr(r) == True
in6_isdocaddr("2001:db8::2807") == True
in6_isaddrllallnodes("ff02::1") == True
in6_isaddrllallservers("ff02::2") == True
= in6_getscope()
assert in6_getscope("2001:db8::2807") == IPV6_ADDR_GLOBAL
assert in6_getscope("fec0::2807") == IPV6_ADDR_SITELOCAL
assert in6_getscope("fe80::2807") == IPV6_ADDR_LINKLOCAL
assert in6_getscope("ff02::2807") == IPV6_ADDR_LINKLOCAL
assert in6_getscope("ff0e::2807") == IPV6_ADDR_GLOBAL
assert in6_getscope("ff05::2807") == IPV6_ADDR_SITELOCAL
assert in6_getscope("ff01::2807") == IPV6_ADDR_LOOPBACK
assert in6_getscope("::1") == IPV6_ADDR_LOOPBACK
= construct_source_candidate_set()
dev_addresses = [('fe80::', IPV6_ADDR_LINKLOCAL, "linklocal"),('fec0::', IPV6_ADDR_SITELOCAL, "sitelocal"),('ff0e::', IPV6_ADDR_GLOBAL, "global")]
assert construct_source_candidate_set("2001:db8::2807", 0, dev_addresses) == ["ff0e::"]
assert construct_source_candidate_set("fec0::2807", 0, dev_addresses) == ["fec0::"]
assert construct_source_candidate_set("fe80::2807", 0, dev_addresses) == ["fe80::"]
assert construct_source_candidate_set("ff02::2807", 0, dev_addresses) == ["fe80::"]
assert construct_source_candidate_set("ff0e::2807", 0, dev_addresses) == ["ff0e::"]
assert construct_source_candidate_set("ff05::2807", 0, dev_addresses) == ["fec0::"]
assert construct_source_candidate_set("ff01::2807", 0, dev_addresses) == ["::1"]
assert construct_source_candidate_set("::", 0, dev_addresses) == ["ff0e::"]
= inet_pton()
from scapy.pton_ntop import _inet6_pton, inet_pton
import socket
ip6_bad_addrs = ["fe80::2e67:ef2d:7eca::ed8a",
"fe80:1234:abcd::192.168.40.12:abcd",
"fe80:1234:abcd::192.168.40",
"fe80:1234:abcd::192.168.400.12",
"1234:5678:9abc:def0:1234:5678:9abc:def0:",
"1234:5678:9abc:def0:1234:5678:9abc:def0:1234"]
for ip6 in ip6_bad_addrs:
rc = False
exc1 = None
try:
res1 = inet_pton(socket.AF_INET6, ip6)
except Exception as e:
rc = True
exc1 = e
assert rc
rc = False
try:
res2 = _inet6_pton(ip6)
except Exception as exc2:
rc = isinstance(exc2, type(exc1))
assert rc
ip6_good_addrs = [("fe80:1234:abcd::192.168.40.12",
b'\xfe\x80\x124\xab\xcd\x00\x00\x00\x00\x00\x00\xc0\xa8(\x0c'),
("fe80:1234:abcd::fe06",
b'\xfe\x80\x124\xab\xcd\x00\x00\x00\x00\x00\x00\x00\x00\xfe\x06'),
("fe80::2e67:ef2d:7ece:ed8a",
b'\xfe\x80\x00\x00\x00\x00\x00\x00.g\xef-~\xce\xed\x8a'),
("::ffff",
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff'),
("ffff::",
b'\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'),
('::', b'\x00' * 16)]
for ip6, res in ip6_good_addrs:
res1 = inet_pton(socket.AF_INET6, ip6)
res2 = _inet6_pton(ip6)
assert res == res1 == res2
############
############
+ Test Route class
= make_route()
r4 = Route()
tmp_route = r4.make_route(host="10.12.13.14")
(tmp_route[0], tmp_route[1], tmp_route[2]) == (168561934, 4294967295, '0.0.0.0')
tmp_route = r4.make_route(net="10.12.13.0/24")
(tmp_route[0], tmp_route[1], tmp_route[2]) == (168561920, 4294967040, '0.0.0.0')
= add() & delt()
r4 = Route()
len_r4 = len(r4.routes)
r4.add(net="192.168.1.0/24", gw="1.2.3.4")
len(r4.routes) == len_r4 + 1
r4.delt(net="192.168.1.0/24", gw="1.2.3.4")
len(r4.routes) == len_r4
= ifchange()
r4.add(net="192.168.1.0/24", gw="1.2.3.4", dev=get_dummy_interface())
r4.ifchange(get_dummy_interface(), "5.6.7.8")
r4.routes[-1][4] == "5.6.7.8"
= ifdel()
r4.ifdel(get_dummy_interface())
len(r4.routes) == len_r4
= ifadd() & get_if_bcast()
r4 = Route()
len_r4 = len(r4.routes)
r4.ifadd(get_dummy_interface(), "1.2.3.4/24")
len(r4.routes) == len_r4 +1
r4.get_if_bcast(get_dummy_interface()) == "1.2.3.255"
r4.ifdel(get_dummy_interface())
len(r4.routes) == len_r4
dummy_interface = get_dummy_interface()
bck_conf_route_routes = conf.route.routes
conf.route.routes = [
(0, 0, '172.21.230.1', dummy_interface, '172.21.230.10', 1), # 0.0.0.0 / 0.0.0.0 == 255.255.255.255
(2851995648, 4294901760, '0.0.0.0', dummy_interface, '172.21.230.10', 1), # 169.254.0.0 / 255.255.0.0 == 169.254.255.255
(2887116288, 4294967040, '0.0.0.0', dummy_interface, '172.21.230.10', 1), # 172.21.230.0 / 255.255.255.0 == 172.21.230.255
(2887116289, 4294967295, '0.0.0.0', dummy_interface, '172.21.230.10', 1), # 172.21.230.1 / 255.255.255.255 == 172.21.230.1
(3758096384, 4026531840, '0.0.0.0', dummy_interface, '172.21.230.10', 1), # 224.0.0.0 / 240.0.0.0 == 239.255.255.255
(3758096635, 4294967295, '0.0.0.0', dummy_interface, '172.21.230.10', 1), # 224.0.0.251 / 255.255.255.255 == 224.0.0.251
(4294967295, 4294967295, '0.0.0.0', dummy_interface, '172.21.230.10', 1), # 255.255.255.255 / 255.255.255.255 == 255.255.255.255
]
assert sorted(conf.route.get_if_bcast(dummy_interface)) == sorted(['169.254.255.255', '172.21.230.255', '239.255.255.255'])
conf.route.routes = bck_conf_route_routes
= Remove dummy interface
conf.ifaces.reload()
############
############
+ Flags
= IP flags
~ IP
pkt = IP(flags="MF")
assert pkt.flags.MF
assert not pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 1 (MF)>'
pkt.flags.MF = 0
pkt.flags.DF = 1
assert not pkt.flags.MF
assert pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 2 (DF)>'
pkt.flags |= 'evil+MF'
pkt.flags &= 'DF+MF'
assert pkt.flags.MF
assert pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 3 (MF+DF)>'
pkt = IP(flags=3)
assert pkt.flags.MF
assert pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 3 (MF+DF)>'
pkt.flags = 6
assert not pkt.flags.MF
assert pkt.flags.DF
assert pkt.flags.evil
assert repr(pkt.flags) == '<Flag 6 (DF+evil)>'
assert len({IP().flags, IP().flags}) == 1
pkt = IP()
pkt.flags = ""
assert pkt.flags == 0
= TCP flags
~ TCP
pkt = TCP(flags="SA")
assert pkt.flags == 18
assert pkt.flags.S
assert pkt.flags.A
assert pkt.flags.SA
assert not any(getattr(pkt.flags, f) for f in 'FRPUECN')
assert repr(pkt.flags) == '<Flag 18 (SA)>'
pkt.flags.U = True
pkt.flags.S = False
assert pkt.flags.A
assert pkt.flags.U
assert pkt.flags.AU
assert not any(getattr(pkt.flags, f) for f in 'FSRPECN')
assert repr(pkt.flags) == '<Flag 48 (AU)>'
pkt.flags &= 'SFA'
pkt.flags |= 'P'
assert pkt.flags.P
assert pkt.flags.A
assert pkt.flags.PA
assert not any(getattr(pkt.flags, f) for f in 'FSRUECN')
pkt = TCP(flags=56)
assert all(getattr(pkt.flags, f) for f in 'PAU')
assert pkt.flags.PAU
assert not any(getattr(pkt.flags, f) for f in 'FSRECN')
assert repr(pkt.flags) == '<Flag 56 (PAU)>'
pkt.flags = 50
assert all(getattr(pkt.flags, f) for f in 'SAU')
assert pkt.flags.SAU
assert not any(getattr(pkt.flags, f) for f in 'FRPECN')
assert repr(pkt.flags) == '<Flag 50 (SAU)>'
= Flag values mutation with .raw_packet_cache
~ IP TCP
pkt = IP(raw(IP(flags="MF")/TCP(flags="SA")))
assert pkt.raw_packet_cache is not None
assert pkt[TCP].raw_packet_cache is not None
assert pkt.flags.MF
assert not pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 1 (MF)>'
assert pkt[TCP].flags.S
assert pkt[TCP].flags.A
assert pkt[TCP].flags.SA
assert not any(getattr(pkt[TCP].flags, f) for f in 'FRPUECN')
assert repr(pkt[TCP].flags) == '<Flag 18 (SA)>'
pkt.flags.MF = 0
pkt.flags.DF = 1
pkt[TCP].flags.U = True
pkt[TCP].flags.S = False
pkt = IP(raw(pkt))
assert not pkt.flags.MF
assert pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 2 (DF)>'
assert pkt[TCP].flags.A
assert pkt[TCP].flags.U
assert pkt[TCP].flags.AU
assert not any(getattr(pkt[TCP].flags, f) for f in 'FSRPECN')
assert repr(pkt[TCP].flags) == '<Flag 48 (AU)>'
= Operations on flag values
~ TCP
p1, p2 = TCP(flags="SU"), TCP(flags="AU")
assert (p1.flags & p2.flags).U
assert not any(getattr(p1.flags & p2.flags, f) for f in 'FSRPAECN')
assert all(getattr(p1.flags | p2.flags, f) for f in 'SAU')
assert (p1.flags | p2.flags).SAU
assert not any(getattr(p1.flags | p2.flags, f) for f in 'FRPECN')
assert TCP(flags="SA").flags & TCP(flags="S").flags == TCP(flags="S").flags
assert TCP(flags="SA").flags | TCP(flags="S").flags == TCP(flags="SA").flags
############
############
+ 802.3
= Test detection
assert isinstance(Dot3(raw(Ether())),Ether)
assert isinstance(Ether(raw(Dot3())),Dot3)
a = Ether(b'\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00')
assert isinstance(a,Dot3)
assert a.dst == 'ff:ff:ff:ff:ff:ff'
assert a.src == '00:00:00:00:00:00'
a = Dot3(b'\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x90\x00')
assert isinstance(a,Ether)
assert a.dst == 'ff:ff:ff:ff:ff:ff'
assert a.src == '00:00:00:00:00:00'
############
############
+ ASN.1
= MIB
~ mib
import tempfile
fd, fname = tempfile.mkstemp()
os.write(fd, b"-- MIB test\nscapy OBJECT IDENTIFIER ::= {test 2807}\n")
os.close(fd)
load_mib(fname)
assert sum(1 for k in conf.mib.d.values() if "scapy" in k) == 1
assert sum(1 for oid in conf.mib) > 100
= MIB - graph
~ mib
import mock
@mock.patch("scapy.asn1.mib.do_graph")
def get_mib_graph(do_graph):
def store_graph(graph, **kargs):
assert graph.startswith("""digraph "mib" {""")
assert """"test.2807" [ label="scapy" ];""" in graph
do_graph.side_effect = store_graph
conf.mib._make_graph()
get_mib_graph()
= MIB - test aliases
~ mib
# https://github.com/secdev/scapy/issues/2542
assert conf.mib._oidname("2.5.29.19") == "basicConstraints"
= DADict tests
a = DADict("test")
a[0] = "test_value1"
a["scapy"] = "test_value2"
assert a.test_value1 == 0
assert a.test_value2 == "scapy"
with ContextManagerCaptureOutput() as cmco:
a._show()
outp = cmco.get_output()
assert "scapy = 'test_value2'" in outp
assert "0 = 'test_value1'" in outp
= Test ETHER_TYPES
assert ETHER_TYPES.IPv4 == 2048
try:
import warnings
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
ETHER_TYPES["BAOBAB"] = 0xffff
assert ETHER_TYPES.BAOBAB == 0xffff
assert issubclass(w[-1].category, DeprecationWarning)
except DeprecationWarning:
# -Werror is used
pass
= BER tests
BER_id_enc(42) == '*'
BER_id_enc(2807) == b'\xbfw'
b = BERcodec_IPADDRESS()
r1 = b.enc("8.8.8.8")
r1 == b'@\x04\x08\x08\x08\x08'
r2 = b.dec(r1)[0]
r2.val == '8.8.8.8'
a = b'\x1f\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\x01\x01\x00C\x02\x01U0\x0f0\r\x06\x08+\x06\x01\x02\x01\x02\x01\x00\x02\x01!'
ret = False
try:
BERcodec_Object.check_type(a)
except BER_BadTag_Decoding_Error:
ret = True
else:
ret = False
assert ret
= BER trigger failures
try:
BERcodec_INTEGER.do_dec(b"\x02\x01")
assert False
except BER_Decoding_Error:
pass
############
############
+ Fields
= FieldLenField with BitField
class Test(Packet):
name = "Test"
fields_desc = [
FieldLenField("BitCount", None, fmt="H", count_of="Values"),
FieldLenField("ByteCount", None, fmt="B", length_of="Values"),
FieldListField("Values", [], BitField("data", 0x0, size=1),
count_from=lambda pkt: pkt.BitCount),
]
pkt = Test(raw(Test(Values=[0, 0, 0, 0, 1, 1, 1, 1])))
assert pkt.BitCount == 8
assert pkt.ByteCount == 1
= PacketListField
class TestPacket(Packet):
name = 'TestPacket'
fields_desc = [ PacketListField('list', [], 0) ]
a = TestPacket()
a.list.append(1)
assert len(a.list) == 1
b = TestPacket()
assert len(b.list) == 0
= Test PacketListField deepcopy
class SubPacket(Packet):
name = "SubPacket"
fields_desc = [
ByteField("mem", 1),
]
class TestPacket(Packet):
name = "TestPacket"
fields_desc = [
PacketListField("packlist", SubPacket(), SubPacket),
]
a = TestPacket()
b = a.copy()
fuzz(b)
assert a.packlist[0].mem == 1
= PacketField
class InnerPacket(Packet):
fields_desc = [ StrField("f_name", "test") ]
class TestPacket(Packet):
fields_desc = [ PacketField("inner", InnerPacket(), InnerPacket) ]
p = TestPacket()
print(p.inner.f_name)
assert p.inner.f_name == b"test"
p = TestPacket()
p.inner.f_name = b"scapy"
assert p.inner.f_name == b"scapy"
p = TestPacket()
assert p.inner.f_name == b"test"
+ UUIDField
= Parsing a human-readable UUID
f = UUIDField('f', '01234567-89ab-cdef-0123-456789abcdef')
f.addfield(None, b'', f.default) == hex_bytes('0123456789abcdef0123456789abcdef')
= Parsing a machine-encoded UUID
f = UUIDField('f', bytearray.fromhex('0123456789abcdef0123456789abcdef'))
f.addfield(None, b'', f.default) == hex_bytes('0123456789abcdef0123456789abcdef')
= Parsing a tuple of values
f = UUIDField('f', (0x01234567, 0x89ab, 0xcdef, 0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef))
f.addfield(None, b'', f.default) == hex_bytes('0123456789abcdef0123456789abcdef')
= Handle None values
f = UUIDField('f', None)
f.addfield(None, b'', f.default) == hex_bytes('00000000000000000000000000000000')
= Get a UUID for dissection
from uuid import UUID
f = UUIDField('f', None)
f.getfield(None, bytearray.fromhex('0123456789abcdef0123456789abcdef01')) == (b'\x01', UUID('01234567-89ab-cdef-0123-456789abcdef'))
= Verify little endian UUIDField
* The endianness of a UUIDField should be apply by block on each block in parenthesis '(01234567)-(89ab)-(cdef)-(01)(23)-(45)(67)(89)(ab)(cd)(ef)'
f = UUIDField('f', '01234567-89ab-cdef-0123-456789abcdef', uuid_fmt=UUIDField.FORMAT_LE)
f.addfield(None, b'', f.default) == hex_bytes('67452301ab89efcd0123456789abcdef')
= Verify reversed UUIDField
* This should reverse the entire value as 128-bits
f = UUIDField('f', '01234567-89ab-cdef-0123-456789abcdef', uuid_fmt=UUIDField.FORMAT_REV)
f.addfield(None, b'', f.default) == hex_bytes('efcdab8967452301efcdab8967452301')
+ RandUUID
= RandUUID setup
RANDUUID_TEMPLATE = '01234567-89ab-*-01*-*****ef'
RANDUUID_FIXED = uuid.uuid4()
= RandUUID default behaviour
ru = RandUUID()
assert ru._fix().version == 4
assert ru.command() == "RandUUID()"
= RandUUID incorrect implicit args
assert expect_exception(ValueError, lambda: RandUUID(node=0x1234, name="scapy"))
assert expect_exception(ValueError, lambda: RandUUID(node=0x1234, namespace=uuid.uuid4()))
assert expect_exception(ValueError, lambda: RandUUID(clock_seq=0x1234, name="scapy"))
assert expect_exception(ValueError, lambda: RandUUID(clock_seq=0x1234, namespace=uuid.uuid4()))
assert expect_exception(ValueError, lambda: RandUUID(name="scapy"))
assert expect_exception(ValueError, lambda: RandUUID(namespace=uuid.uuid4()))
= RandUUID v4 UUID (correct args)
u = RandUUID(version=4)._fix()
assert u.version == 4
u2 = RandUUID(version=4)._fix()
assert u2.version == 4
assert str(u) != str(u2)
= RandUUID v4 UUID (incorrect args)
assert expect_exception(ValueError, lambda: RandUUID(version=4, template=RANDUUID_TEMPLATE))
assert expect_exception(ValueError, lambda: RandUUID(version=4, node=0x1234))
assert expect_exception(ValueError, lambda: RandUUID(version=4, clock_seq=0x1234))
assert expect_exception(ValueError, lambda: RandUUID(version=4, namespace=uuid.uuid4()))
assert expect_exception(ValueError, lambda: RandUUID(version=4, name="scapy"))
= RandUUID v1 UUID
u = RandUUID(version=1)._fix()
assert u.version in [1, 4]
u = RandUUID(version=1, node=0x1234)._fix()
assert u.version == 1
assert u.node == 0x1234
u = RandUUID(version=1, clock_seq=0x1234)._fix()
assert u.version == 1
assert u.clock_seq == 0x1234
ru = RandUUID(version=1, node=0x1234, clock_seq=0x1bcd)
assert ru.command() == "RandUUID(node=4660, clock_seq=7117, version=1)"
u = ru._fix()
assert u.version == 1
assert u.node == 0x1234
assert u.clock_seq == 0x1bcd
= RandUUID v1 UUID (implicit version)
u = RandUUID(node=0x1234)._fix()
assert u.version == 1
assert u.node == 0x1234
u = RandUUID(clock_seq=0x1234)._fix()
assert u.version == 1
assert u.clock_seq == 0x1234
u = RandUUID(node=0x1234, clock_seq=0x1bcd)._fix()
assert u.version == 1
assert u.node == 0x1234
assert u.clock_seq == 0x1bcd
= RandUUID v1 UUID (incorrect args)
assert expect_exception(ValueError, lambda: RandUUID(version=1, template=RANDUUID_TEMPLATE))
assert expect_exception(ValueError, lambda: RandUUID(version=1, namespace=uuid.uuid4()))
assert expect_exception(ValueError, lambda: RandUUID(version=1, name="scapy"))
= RandUUID v5 UUID
ru = RandUUID(version=5, namespace=RANDUUID_FIXED, name="scapy")
u = ru._fix()
assert u.version == 5
assert ru.command() == "RandUUID(namespace=%r, name='scapy', version=5)" % RANDUUID_FIXED
u2 = RandUUID(version=5, namespace=RANDUUID_FIXED, name="scapy")._fix()
assert u2.version == 5
assert u.bytes == u2.bytes
# implicit v5
u2 = RandUUID(namespace=RANDUUID_FIXED, name="scapy")._fix()
assert u.bytes == u2.bytes
= RandUUID v5 UUID (incorrect args)
assert expect_exception(ValueError, lambda: RandUUID(version=5, template=RANDUUID_TEMPLATE))
assert expect_exception(ValueError, lambda: RandUUID(version=5, node=0x1234))
assert expect_exception(ValueError, lambda: RandUUID(version=5, clock_seq=0x1234))
= RandUUID v3 UUID
u = RandUUID(version=3, namespace=RANDUUID_FIXED, name="scapy")._fix()
assert u.version == 3
u2 = RandUUID(version=3, namespace=RANDUUID_FIXED, name="scapy")._fix()
assert u2.version == 3
assert u.bytes == u2.bytes
# implicit v5
u2 = RandUUID(namespace=RANDUUID_FIXED, name="scapy")._fix()
assert u.bytes != u2.bytes
= RandUUID v3 UUID (incorrect args)
assert expect_exception(ValueError, lambda: RandUUID(version=5, template=RANDUUID_TEMPLATE))
assert expect_exception(ValueError, lambda: RandUUID(version=5, node=0x1234))
assert expect_exception(ValueError, lambda: RandUUID(version=5, clock_seq=0x1234))
= RandUUID looks like a UUID with str
assert re.match(r'[0-9a-f]{8}(-[0-9a-f]{4}){3}-[0-9a-f]{12}', str(RandUUID()), re.I) is not None
= RandUUID with a static part
* RandUUID template can contain static part such a 01234567-89ab-*-01*-*****ef
ru = RandUUID('01234567-89ab-*-01*-*****ef')
assert re.match(r'01234567-89ab-[0-9a-f]{4}-01[0-9a-f]{2}-[0-9a-f]{10}ef', str(ru), re.I) is not None
assert ru.command() == "RandUUID(template='01234567-89ab-*-01*-*****ef')"
= RandUUID with a range part
* RandUUID template can contain a part with a range of values such a 01234567-89ab-*-01*-****c0:c9ef
assert re.match(r'01234567-89ab-[0-9a-f]{4}-01[0-9a-f]{2}-[0-9a-f]{8}c[0-9]ef', str(RandUUID('01234567-89ab-*-01*-****c0:c9ef')), re.I) is not None
############
############
+ MPLS tests
= MPLS - build/dissection
from scapy.contrib.mpls import EoMCW, MPLS
p1 = MPLS()/IP()/UDP()
assert p1[MPLS].s == 1
p2 = MPLS()/MPLS()/IP()/UDP()
assert p2[MPLS].s == 0
p1[MPLS]
p1[IP]
p2[MPLS]
p2[MPLS:1]
p2[IP]
= MPLS encapsulated Ethernet with CW - build/dissection
p = Ether(dst="11:11:11:11:11:11", src="22:22:22:22:22:22")
p /= MPLS(label=1)/EoMCW(seq=1234)
p /= Ether(dst="33:33:33:33:33:33", src="44:44:44:44:44:44")/IP()
p = Ether(raw(p))
assert p[EoMCW].zero == 0
assert p[EoMCW].reserved == 0
assert p[EoMCW].seq == 1234
= MPLS encapsulated Ethernet without CW - build/dissection
p = Ether(dst="11:11:11:11:11:11", src="22:22:22:22:22:22")
p /= MPLS(label=2)/MPLS(label=1)
p /= Ether(dst="33:33:33:33:33:33", src="44:44:44:44:44:44")/IP()
p = Ether(raw(p))
assert p[Ether:2].type == 0x0800
try:
p[EoMCW]
except IndexError:
ret = True
else:
ret = False
assert ret
assert p[Ether:2].type == 0x0800
= MPLS encapsulated IP - build/dissection
p = Ether(dst="11:11:11:11:11:11", src="22:22:22:22:22:22")
p /= MPLS(label=1)/IP()
p = Ether(raw(p))
try:
p[EoMCW]
except IndexError:
ret = True
else:
ret = False
assert ret
try:
p[Ether:2]
except IndexError:
ret = True
else:
ret = False
assert ret
p[IP]
############
############
+ PacketList methods
= sr()
class Req(Packet):
fields_desc = [
ByteField("raw", 0)
]
def answers(self, other):
return False
class Res(Packet):
fields_desc = [
ByteField("raw", 0)
]
def answers(self, other):
return other.__class__ == Req and other.raw == self.raw
pl = PacketList([Req(b"1"), Res(b"1"), Req(b"2"), Req(b"3"), Req(b"4"), Res(b"3"), Res(b"1"), Res(b"1"), Res(b"4")])
srl, rl = pl.sr()
assert len(srl) == 3
assert len(rl) == 3
srl, rl = pl.sr(lookahead=1)
assert len(srl) == 1
assert len(rl) == 7
srl, rl = pl.sr(lookahead=2)
assert len(srl) == 2
assert len(rl) == 5
srl, rl = pl.sr(lookahead=3)
assert len(srl) == 3
assert len(rl) == 3
pl = PacketList([Req(b"\x05"), Res(b"1"), Res(b"2"), Res(b"3"), Res(b"4"), Res(b"3"), Res(b"1"), Res(b"1"), Res(b"\x05")])
srl, rl = pl.sr(lookahead=3)
assert len(srl) == 0
assert len(rl) == 9
srl, rl = pl.sr(lookahead=7)
assert len(srl) == 0
assert len(rl) == 9
srl, rl = pl.sr(lookahead=8)
assert len(srl) == 1
assert len(rl) == 7
srl, rl = pl.sr(lookahead=0)
assert len(srl) == 1
assert len(rl) == 7
srl, rl = pl.sr(lookahead=None)
assert len(srl) == 1
assert len(rl) == 7
= pickle test
import pickle
import io
srl, rl = PacketList([Raw(b"1"), Raw(b"1"), Raw(b"2"), Raw(b"3"), Raw(b"4"), Raw(b"3"), Raw(b"1"), Raw(b"1"), Raw(b"4")]).sr()
assert len(srl) == 4
f = io.BytesIO()
pickle.dump(srl, f)
unp = pickle.loads(f.getvalue())
assert len(unp) == len(srl)
assert all(bytes(a[0]) == bytes(b[0]) for a, b in zip(unp, srl))
= plot()
import mock
import scapy.libs.matplot
@mock.patch("scapy.libs.matplot.plt")
def test_plot(mock_plt):
def fake_plot(data, **kwargs):
return data
mock_plt.plot = fake_plot
plist = PacketList([IP(id=i)/TCP() for i in range(10)])
lines = plist.plot(lambda p: (p.time, p.id))
assert len(lines) == 10
test_plot()
= diffplot()
import mock
import scapy.libs.matplot
@mock.patch("scapy.libs.matplot.plt")
def test_diffplot(mock_plt):
def fake_plot(data, **kwargs):
return data
mock_plt.plot = fake_plot
plist = PacketList([IP(id=i)/TCP() for i in range(10)])
lines = plist.diffplot(lambda x,y: (x.time, y.id-x.id))
assert len(lines) == 9
test_diffplot()
= multiplot()
import mock
import scapy.libs.matplot
@mock.patch("scapy.libs.matplot.plt")
def test_multiplot(mock_plt):
def fake_plot(data, **kwargs):
return data
mock_plt.plot = fake_plot
tmp = [IP(id=i)/TCP() for i in range(10)]
plist = PacketList([tuple(tmp[i-2:i]) for i in range(2, 10, 2)])
lines = plist.multiplot(lambda x, y: (y[IP].src, (y.time, y[IP].id)))
assert len(lines) == 1
assert len(lines[0]) == 4
test_multiplot()
= rawhexdump()
def test_rawhexdump():
with ContextManagerCaptureOutput() as cmco:
p = PacketList([IP()/TCP() for i in range(2)])
p.rawhexdump()
result_pl_rawhexdump = cmco.get_output()
assert len(result_pl_rawhexdump.split('\n')) == 7
assert result_pl_rawhexdump.startswith("0000 45 00 00 28")
test_rawhexdump()
= hexraw()
def test_hexraw():
with ContextManagerCaptureOutput() as cmco:
p = PacketList([IP()/Raw(str(i)) for i in range(2)])
p.hexraw()
result_pl_hexraw = cmco.get_output()
assert len(result_pl_hexraw.split('\n')) == 5
assert "0000 30" in result_pl_hexraw
test_hexraw()
= hexdump()
def test_hexdump():
with ContextManagerCaptureOutput() as cmco:
p = PacketList([IP()/Raw(str(i)) for i in range(2)])
p.hexdump()
result_pl_hexdump = cmco.get_output()
assert len(result_pl_hexdump.split('\n')) == 7
assert "0010 7F 00 00 01 31" in result_pl_hexdump
test_hexdump()
= import_hexcap()
@mock.patch("scapy.utils.input")
def test_import_hexcap(mock_input):
data = """
0000 FF FF FF FF FF FF AA AA AA AA AA AA 08 00 45 00 ..............E.
0010 00 1C 00 01 00 00 40 01 7C DE 7F 00 00 01 7F 00 ......@.|.......
0020 00 01 08 00 F7 FF 00 00 00 00 ..........
"""[1:].split("\n")
lines = iter(data)
mock_input.side_effect = lambda: next(lines)
return import_hexcap()
pkt = test_import_hexcap()
pkt = Ether(pkt)
assert pkt[Ether].dst == "ff:ff:ff:ff:ff:ff"
assert pkt[IP].dst == "127.0.0.1"
assert ICMP in pkt
= import_hexcap(input_string)
data = """
0000 FF FF FF FF FF FF AA AA AA AA AA AA 08 00 45 00 ..............E.
0010 00 1C 00 01 00 00 40 01 7C DE 7F 00 00 01 7F 00 ......@.|.......
0020 00 01 08 00 F7 FF 00 00 00 00 ..........
"""[1:]
pkt = import_hexcap(data)
pkt = Ether(pkt)
assert pkt[Ether].dst == "ff:ff:ff:ff:ff:ff"
assert pkt[IP].dst == "127.0.0.1"
assert ICMP in pkt
= padding()
def test_padding():
with ContextManagerCaptureOutput() as cmco:
p = PacketList([IP()/conf.padding_layer(str(i)) for i in range(2)])
p.padding()
result_pl_padding = cmco.get_output()
assert len(result_pl_padding.split('\n')) == 5
assert "0000 30" in result_pl_padding
test_padding()
= nzpadding()
def test_nzpadding():
with ContextManagerCaptureOutput() as cmco:
p = PacketList([IP()/conf.padding_layer("AB"), IP()/conf.padding_layer("\x00\x00")])
p.nzpadding()
result_pl_nzpadding = cmco.get_output()
assert len(result_pl_nzpadding.split('\n')) == 3
assert "0000 41 42" in result_pl_nzpadding
test_nzpadding()
= conversations()
import mock
@mock.patch("scapy.plist.do_graph")
def test_conversations(mock_do_graph):
def fake_do_graph(graph, **kwargs):
return graph
mock_do_graph.side_effect = fake_do_graph
plist = PacketList([IP(dst="127.0.0.2")/TCP(dport=i) for i in range(2)])
plist.extend([IP(src="127.0.0.2")/TCP(sport=i) for i in range(2)])
plist.extend([IPv6(dst="::2", src="::1")/TCP(sport=i) for i in range(2)])
plist.extend([IPv6(src="::2", dst="::1")/TCP(sport=i) for i in range(2)])
plist.extend([Ether()/ARP(pdst="127.0.0.1")])
result_conversations = plist.conversations()
assert len(result_conversations.split('\n')) == 8
assert result_conversations.startswith('digraph "conv" {')
assert "127.0.0.1" in result_conversations
assert "::1" in result_conversations
test_conversations()
= sessions()
pl = PacketList([Ether()/IPv6()/ICMPv6EchoRequest(), Ether()/IPv6()/IPv6()])
pl.extend([Ether()/IP()/IP(), Ether()/ARP()])
pl.extend([Ether()/Ether()/IP()])
assert len(pl.sessions().keys()) == 5
= afterglow()
import mock
@mock.patch("scapy.plist.do_graph")
def test_afterglow(mock_do_graph):
def fake_do_graph(graph, **kwargs):
return graph
mock_do_graph.side_effect = fake_do_graph
plist = PacketList([IP(dst="127.0.0.2")/TCP(dport=i) for i in range(2)])
plist.extend([IP(src="127.0.0.2")/TCP(sport=i) for i in range(2)])
result_afterglow = plist.afterglow()
assert len(result_afterglow.split('\n')) == 19
assert result_afterglow.startswith('digraph "afterglow" {')
test_afterglow()
= psdump()
print("PYX: %d" % PYX)
if PYX:
import tempfile
import os
filename = tempfile.mktemp(suffix=".eps")
plist = PacketList([IP()/TCP()])
plist.psdump(filename)
assert os.path.exists(filename)
os.unlink(filename)
= pdfdump()
print("PYX: %d" % PYX)
if PYX:
import tempfile
import os
filename = tempfile.mktemp(suffix=".pdf")
plist = PacketList([IP()/TCP()])
plist.pdfdump(filename)
assert os.path.exists(filename)
os.unlink(filename)
= svgdump()
print("PYX: %d" % PYX)
if PYX:
import tempfile
import os
filename = tempfile.mktemp(suffix=".svg")
plist = PacketList([IP()/TCP()])
plist.svgdump(filename)
assert os.path.exists(filename)
os.unlink(filename)
= __getstate__ / __setstate__ (used by pickle)
import pickle
frm = Ether(src='00:11:22:33:44:55', dst='00:22:33:44:55:66')/Raw()
frm.time = EDecimal(123.45)
frm.sniffed_on = "iface"
frm.wirelen = 1
pl = PacketList(res=[frm, frm], name='WhatAGreatName')
pickled = pickle.dumps(pl)
pl = pickle.loads(pickled)
assert pl.listname == "WhatAGreatName"
assert len(pl) == 2
assert pl[0].time == 123.45
assert pl[0].sniffed_on == "iface"
assert pl[0].wirelen == 1
assert pl[0][Ether].src == '00:11:22:33:44:55'
assert pl[1][Ether].dst == '00:22:33:44:55:66'
############
############
+ Scapy version
= _version()
import os
from datetime import datetime
version_filename = os.path.join(scapy._SCAPY_PKG_DIR, "VERSION")
mtime = datetime.utcfromtimestamp(os.path.getmtime(scapy.__file__))
version = "2.0.0"
with open(version_filename, "w") as fd:
fd.write(version)
os.environ["SCAPY_VERSION"] = "9.9.9"
assert scapy._version() == "9.9.9"
del os.environ["SCAPY_VERSION"]
assert scapy._version() == version
os.unlink(version_filename)
import mock
with mock.patch("scapy._version_from_git_archive") as archive:
archive.return_value = "4.4.4"
assert scapy._version() == "4.4.4"
archive.side_effect = ValueError()
with mock.patch("scapy._version_from_git_describe") as git:
git.return_value = "3.3.3"
assert scapy._version() == "3.3.3"
git.side_effect = Exception()
assert scapy._version() == mtime.strftime("%Y.%m.%d")
with mock.patch("os.path.getmtime") as getmtime:
getmtime.side_effect = Exception()
assert scapy._version() == "0.0.0"
= UTscapy HTML output
import tempfile, os
from scapy.tools.UTscapy import TestCampaign, pack_html_campaigns
test_campaign = TestCampaign("test")
test_campaign.output_file = tempfile.mktemp()
html = pack_html_campaigns([test_campaign], None, local=True)
dirname = os.path.dirname(test_campaign.output_file)
filename_js = "%s/UTscapy.js" % dirname
filename_css = "%s/UTscapy.css" % dirname
assert os.path.isfile(filename_js)
assert os.path.isfile(filename_css)
os.remove(filename_js)
os.remove(filename_css)
= test get_temp_dir
dname = get_temp_dir()
assert os.path.isdir(dname)
= test fragleak functions
~ netaccess linux fragleak
import mock
@mock.patch("scapy.layers.inet.conf.L3socket")
@mock.patch("scapy.layers.inet.select.select")
@mock.patch("scapy.layers.inet.sr1")
def _test_fragleak(func, sr1, select, L3socket):
packets = [IP(src="4.4.4.4")/ICMP()/IPerror(dst="8.8.8.8")/conf.padding_layer(load=b"greatdata")]
iterator = iter(packets)
ne = lambda *args, **kwargs: next(iterator)
L3socket.side_effect = lambda: Bunch(recv=ne, send=lambda x: None)
sr1.side_effect = ne
select.side_effect = lambda a, b, c, d: a+b+c
with ContextManagerCaptureOutput() as cmco:
func("8.8.8.8", count=1)
out = cmco.get_output()
return "greatdata" in out
assert _test_fragleak(fragleak)
assert _test_fragleak(fragleak2)