| % Regression tests for Scapy |
| |
| # More information at http://www.secdev.org/projects/UTscapy/ |
| |
| ############ |
| ############ |
| + Information on Scapy |
| |
| = Setup |
def expect_exception(e, c):
    """Call ``c()`` and tell whether it raised ``e``.
    Returns True when an exception matching ``e`` was raised and False when
    the call completed normally. Any other exception propagates.
    """
    raised = False
    try:
        c()
    except e:
        raised = True
    return raised
| |
| = Get conf |
| ~ conf command |
| * Dump the current configuration |
| conf |
| |
| IP().src |
| conf.loopback_name |
| |
| = Test module version detection |
| ~ conf |
| |
class FakeModule:
    """Module stand-in whose version string has a ``v`` prefix and two parts."""
    __version__ = "v1.12"

class FakeModule2:
    """Module stand-in whose version string has four numeric parts."""
    __version__ = "5.143.3.12"

class FakeModule3:
    """Module stand-in whose version string ends with a ``.dev`` suffix."""
    __version__ = "v2.4.2.dev42"
| |
| from scapy.config import _version_checker |
| |
| assert _version_checker(FakeModule, (1,11,5)) |
| assert not _version_checker(FakeModule, (1,13)) |
| |
| assert _version_checker(FakeModule2, (5, 1)) |
| assert not _version_checker(FakeModule2, (5, 143, 4)) |
| |
| assert _version_checker(FakeModule3, (2, 4, 2)) |
| |
| = Check Scapy version |
| |
| import mock |
| |
| import scapy |
| from scapy import _parse_tag, _version_from_git_describe |
| from scapy.config import _version_checker |
| |
b = Bunch(returncode=0, communicate=lambda *args, **kargs: (b"v2.4.5rc1-261-g44b98e14", None))
# Popen is replaced by an object that reports a fixed "git describe" answer,
# and isdir() is forced to True while the version is computed.
with mock.patch('scapy.subprocess.Popen', return_value=b), \
        mock.patch('scapy.os.path.isdir', return_value=True):
    class GitModuleScapy(object):
        __version__ = _version_from_git_describe()

# GH3847
# With isdir() forced to False the lookup is expected to raise ValueError.
with mock.patch('scapy.subprocess.Popen', return_value=b), \
        mock.patch('scapy.os.path.isdir', return_value=False):
    try:
        _version_from_git_describe()
    except ValueError:
        pass
    else:
        assert False
| |
| assert GitModuleScapy.__version__ == '2.4.5rc1.dev261' |
| assert _version_checker(GitModuleScapy, (2, 4, 5)) |
| |
| = List layers |
| ~ conf command |
| ls() |
| |
| = List layers - advanced |
| ~ conf command |
| |
| with ContextManagerCaptureOutput() as cmco: |
| ls("IP", case_sensitive=True) |
| result_ls = cmco.get_output().split("\n") |
| |
| assert all("IP" in x for x in result_ls if x.strip()) |
| assert len(result_ls) >= 3 |
| |
| = List packet fields - ls |
| ~ command |
| |
| with ContextManagerCaptureOutput() as cmco: |
| ls(ARP(hwsrc="aa:aa:aa:aa:aa:aa", psrc="1.1.1.1")) |
| result_ls = cmco.get_output().split("\n") |
| |
| result_ls |
| assert result_ls[5] == "hwsrc : MultipleTypeField (SourceMACField, StrFixedLenField) = 'aa:aa:aa:aa:aa:aa' ('None')" |
| assert result_ls[6] == "psrc : MultipleTypeField (SourceIPField, SourceIP6Field, StrFixedLenField) = '1.1.1.1' ('None')" |
| |
| = List commands |
| ~ conf command |
| lsc() |
| |
| = List contribs |
| ~ command |
def test_list_contrib():
    """Check the captured list_contrib() output for the http2 entry and its size."""
    with ContextManagerCaptureOutput() as capture:
        list_contrib()
        listing = capture.get_output()
    http2_entry = "http2 : HTTP/2 (RFC 7540, RFC 7541) status=loads"
    assert http2_entry in listing
    # More than 40 lines means at least 40 newline separators.
    assert listing.count('\n') >= 40
| |
| test_list_contrib() |
| |
| = Test packet show() on LatexTheme |
| % with LatexTheme |
| |
| class SmallPacket(Packet): |
| fields_desc = [ByteField("a", 0)] |
| |
conf_color_theme = conf.color_theme
conf.color_theme = LatexTheme()
# The theme is restored in a ``finally`` clause: a failing assertion would
# otherwise leave LatexTheme active for every later test of the campaign.
try:
    pkt = SmallPacket()
    with ContextManagerCaptureOutput() as cmco:
        pkt.show()
        result = cmco.get_output().strip()
    assert result == '\\#\\#\\#[ \\textcolor{red}{\\bf SmallPacket} ]\\#\\#\\#\n \\textcolor{blue}{a} = \\textcolor{purple}{0}'
finally:
    conf.color_theme = conf_color_theme
| |
| |
| = Test automatic doc generation |
| ~ command |
| |
| dat = rfc(IP, ret=True).split("\n") |
| assert dat[0].replace(" ", "").strip() == "0123" |
| assert "0123456789" in dat[1].replace(" ", "") |
# Each diagram line may only hold upper-case field names, the digits of the
# bit ruler, spaces and the frame characters; the "Fig. <name>" caption is the
# only exception. re.match() with a "*" quantifier accepts the empty prefix of
# any string, so the whole line is checked with fullmatch() instead.
for line in dat:
    assert re.fullmatch(r"[A-Z0-9+| -]*", line) or line.strip().startswith("Fig. "), line
| |
| # Add a space before each + to avoid conflicts with UTscapy ! |
| # They will be stripped below |
| result = """ |
| 0 1 2 3 |
| 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| |VERSION| IHL | TOS | LEN | |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| | ID |FLAGS| FRAG | |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| | TTL | PROTO | CHKSUM | |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| | SRC | |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| | DST | |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| | OPTIONS | |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| |
| Fig. IP |
| """.strip() |
| result = [x.strip() for x in result.split("\n")] |
| output = [x.strip() for x in rfc(IP, ret=True).strip().split("\n")] |
| assert result == output |
| |
| result = """ |
| 0 1 2 3 |
| 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| | CODE | ID | LEN | |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| | TYPE |L|M|S|RES|VERSI| MESSAGE LEN | |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| | | DATA | |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| |
| Fig. EAP_TTLS |
| """.strip() |
| result = [x.strip() for x in result.split("\n")] |
| output = [x.strip() for x in rfc(EAP_TTLS, ret=True).strip().split("\n")] |
| assert result == output |
| |
| |
| result = """ |
| 0 1 2 3 |
| 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| |VERSION| TC | FL | |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| | PLEN | NH | HLIM | |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| | SRC | |
| + + |
| | | |
| + + |
| | | |
| + + |
| | | |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| | DST | |
| + + |
| | | |
| + + |
| | | |
| + + |
| | | |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| |
| Fig. IPv6 |
| """.strip() |
| result = [x.strip() for x in result.split("\n")] |
| output = [x.strip() for x in rfc(IPv6, ret=True).strip().split("\n")] |
| assert result == output |
| |
| = Check that all contrib modules are well-configured |
| ~ command |
| list_contrib(_debug=True) |
| |
| = Configuration |
| ~ conf |
| conf.debug_dissector = True |
| |
| = Configuration conf.use_* LINUX |
| ~ linux |
| |
# Enabling BPF is expected to be refused here. The failure is raised from the
# ``else`` clause so that it cannot be swallowed by the handler that expects
# the assignment itself to raise.
try:
    conf.use_bpf = True
except Exception:
    pass
else:
    assert False, "conf.use_bpf = True should have raised"
| |
| assert not conf.use_bpf |
| |
| = Configuration conf.use_* WINDOWS |
| ~ windows |
| |
# Enabling BPF is expected to be refused here. The failure is raised from the
# ``else`` clause so that it cannot be swallowed by the handler that expects
# the assignment itself to raise.
try:
    conf.use_bpf = True
except Exception:
    pass
else:
    assert False, "conf.use_bpf = True should have raised"
| |
| assert not conf.use_bpf |
| |
| = Configuration conf.use_pcap |
| ~ linux libpcap |
| |
| if not conf.use_pcap: |
| assert not conf.iface.provider.libpcap |
| conf.use_pcap = True |
| assert conf.iface.provider.libpcap |
| for iface in conf.ifaces.values(): |
| assert iface.provider.libpcap or iface.is_valid() == False |
| conf.use_pcap = False |
| assert not conf.iface.provider.libpcap |
| |
| = Test layer filtering |
| ~ filter |
| |
| pkt = NetflowHeader()/NetflowHeaderV5()/NetflowRecordV5() |
| |
| conf.layers.filter([NetflowHeader, NetflowHeaderV5]) |
| assert NetflowRecordV5 not in NetflowHeader(bytes(pkt)) |
| |
| conf.layers.unfilter() |
| assert NetflowRecordV5 in NetflowHeader(bytes(pkt)) |
| |
| |
| ########### |
| ########### |
| = UTscapy route check |
| * Check that UTscapy has correctly replaced the routes. Many tests won't work otherwise |
| |
| p = IP().src |
| p |
| assert p == "127.0.0.1" |
| |
| ############ |
| ############ |
| + Scapy functions tests |
| |
| = Interface related functions |
| |
| import mock |
| |
| conf.iface |
| |
| get_if_raw_hwaddr(conf.iface) |
| |
| bytes_hex(get_if_raw_addr(conf.iface)) |
| |
def get_dummy_interface():
    """Add a fake interface called ``dummy0`` to conf.ifaces and return its name."""
    name = "dummy0"
    conf.ifaces._add_fake_iface(name)
    return name
| |
| get_if_raw_addr(get_dummy_interface()) |
| |
| get_if_list() |
| |
| get_working_if() |
| |
| get_if_raw_addr6(conf.iface) |
| |
| if conf.use_bpf: |
| addr = u"lladdr 29:0b:c2:ff:fe:53:21:e9\n" |
| b = Bunch(returncode=0, communicate=lambda *args, **kargs: (addr, None)) |
| with mock.patch('scapy.arch.bpf.core.subprocess.Popen', return_value=b) as popen: |
| try: |
| scapy.arch.bpf.core.get_if_raw_hwaddr("fw0") |
| assert False |
| except Scapy_Exception: |
| assert True |
| |
| = More Interfaces related functions |
| |
| # Test name resolution |
| old = conf.iface |
| conf.iface = conf.iface.name |
| assert conf.iface == old |
| |
| assert isinstance(conf.iface, NetworkInterface) |
| assert conf.iface.is_valid() |
| |
| import mock |
| @mock.patch("scapy.interfaces.conf.route.routes", []) |
| @mock.patch("scapy.interfaces.conf.ifaces.values") |
| def _test_get_working_if(rou): |
| rou.side_effect = lambda: [] |
| assert get_working_if() == conf.loopback_name |
| |
| assert conf.iface + "a" # left + |
| assert "hey! are you, ready to go ? %s" % conf.iface # format |
| assert "cuz you know the way to go" + conf.iface # right + |
| |
| _test_get_working_if() |
| |
| = Test conf.ifaces |
| |
| conf.iface |
| conf.ifaces |
| |
| assert conf.iface in conf.ifaces.values() |
| assert conf.ifaces.dev_from_index(conf.iface.index) == conf.iface |
| assert conf.ifaces.dev_from_networkname(conf.iface.network_name) == conf.iface |
| |
conf.ifaces.data = {'a': NetworkInterface(InterfaceProvider(), {"name": 'a', "network_name": 'a', "description": 'a', "ips": ["127.0.0.1", "::1", "::2", "127.0.0.2"], "mac": 'aa:aa:aa:aa:aa:aa'})}

# The interface table was just replaced by a single fake entry. Reload it in a
# ``finally`` clause so a failing assertion cannot leak the fake table into
# the tests that follow.
try:
    with ContextManagerCaptureOutput() as cmco:
        conf.ifaces.show()
        output = cmco.get_output()
    data = """
Source Index Name MAC IPv4 IPv6
Unknown 0 a aa:aa:aa:aa:aa:aa 127.0.0.1 ::1
127.0.0.2 ::2
""".strip()
    # Compare line by line, ignoring leading and trailing padding.
    output = [x.strip() for x in output.strip().split("\n")]
    data = [x.strip() for x in data.strip().split("\n")]
    assert output == data
finally:
    conf.ifaces.reload()
| |
| = Test extcap detection in conf.ifaces |
| ~ linux extcap |
| |
| import os |
| from scapy.libs.extcap import load_extcap |
| |
| _bkp_extcap = conf.prog.extcap_folders |
| _bkp_providers = conf.ifaces.providers.copy() |
| |
| conf.ifaces.providers.clear() |
| |
| # Create some sort of extcap parody program |
| extcapfld = get_temp_dir() |
| extcapprog = os.path.join(extcapfld, "runner.sh") |
| data = """#!/usr/bin/env python3 |
| |
| import struct |
| import argparse |
| parser = argparse.ArgumentParser() |
| parser.add_argument('--extcap-interfaces', action='store_true') |
| parser.add_argument('--capture', action='store_true') |
| parser.add_argument('--extcap-config', action='store_true') |
| parser.add_argument('--scan-follow-rsp', action='store_true') |
| parser.add_argument('--scan-follow-aux', action='store_true') |
| parser.add_argument('--extcap-interface', type=str) |
| parser.add_argument('--fifo', type=str) |
| |
| args = parser.parse_args() |
| if args.extcap_interfaces: |
| # List interfaces |
| print(bytes.fromhex("0a657874636170207b76657273696f6e3d342e312e317d7b646973706c61793d6e524620536e696666657220666f7220426c7565746f6f7468204c457d7b68656c703d68747470733a2f2f7777772e6e6f7264696373656d692e636f6d2f536f6674776172652d616e642d546f6f6c732f446576656c6f706d656e742d546f6f6c732f6e52462d536e69666665722d666f722d426c7565746f6f74682d4c457d0a696e74657266616365207b76616c75653d2f6465762f747479555342352d4e6f6e657d7b646973706c61793d6e524620536e696666657220666f7220426c7565746f6f7468204c457d0a636f6e74726f6c207b6e756d6265723d307d7b747970653d73656c6563746f727d7b646973706c61793d4465766963657d7b746f6f6c7469703d446576696365206c6973747d0a636f6e74726f6c207b6e756d6265723d317d7b747970653d73656c6563746f727d7b646973706c61793d4b65797d7b746f6f6c7469703d7d0a636f6e74726f6c207b6e756d6265723d327d7b747970653d737472696e677d7b646973706c61793d56616c75657d7b746f6f6c7469703d3620646967697420706173736b6579206f72203136206f7220333220627974657320656e6372797074696f6e206b657920696e2068657861646563696d616c207374617274696e67207769746820273078272c2062696720656e6469616e20666f726d61742e49662074686520656e7465726564206b65792069732073686f72746572207468616e203136206f722033322062797465732c2069742077696c6c206265207a65726f2d70616464656420696e2066726f6e74277d7b76616c69646174696f6e3d5c625e28285b302d395d7b367d297c2830785b302d39612d66412d465d7b312c36347d297c285b302d39412d46612d665d7b327d5b3a2d5d297b357d285b302d39412d46612d665d7b327d2920287075626c69637c72616e646f6d2929245c627d0a636f6e74726f6c207b6e756d6265723d337d7b747970653d737472696e677d7b646973706c61793d41647620486f707d7b64656661756c743d33372c33382c33397d7b746f6f6c7469703d4164766572746973696e67206368616e6e656c20686f702073657175656e63652e204368616e676520746865206f7264657220696e2077686963682074686520736e6966666572207377697463686573206164766572746973696e67206368616e6e656c732e2056616c6964206368616e6e656c73206172652033372c20333820616e642033392073657061726174656420627920636f6d6d612e7d7b76616c69646174696f6e3d5e5c732a282833377c33387c3339295c732a2c5c732a297b302c327d2833377
c33387c3339297b317d5c732a247d7b72657175697265643d747275657d0a636f6e74726f6c207b6e756d6265723d377d7b747970653d627574746f6e7d7b646973706c61793d436c6561727d7b746f6f6c746f703d436c656172206f722072656d6f7665206465766963652066726f6d20446576696365206c6973747d0a636f6e74726f6c207b6e756d6265723d347d7b747970653d627574746f6e7d7b726f6c653d68656c707d7b646973706c61793d48656c707d7b746f6f6c7469703d416363657373207573657220677569646520286c61756e636865732062726f77736572297d0a636f6e74726f6c207b6e756d6265723d357d7b747970653d627574746f6e7d7b726f6c653d726573746f72657d7b646973706c61793d44656661756c74737d7b746f6f6c7469703d52657365747320746865207573657220696e7465726661636520616e6420636c6561727320746865206c6f672066696c657d0a636f6e74726f6c207b6e756d6265723d367d7b747970653d627574746f6e7d7b726f6c653d6c6f676765727d7b646973706c61793d4c6f677d7b746f6f6c7469703d4c6f672070657220696e746572666163657d0a76616c7565207b636f6e74726f6c3d307d7b76616c75653d207d7b646973706c61793d416c6c206164766572746973696e6720646576696365737d7b64656661756c743d747275657d0a76616c7565207b636f6e74726f6c3d307d7b76616c75653d5b30302c30302c30302c30302c30302c30302c305d7d7b646973706c61793d466f6c6c6f772049524b7d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d307d7b646973706c61793d4c656761637920506173736b65797d7b64656661756c743d747275657d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d317d7b646973706c61793d4c6567616379204f4f4220646174617d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d327d7b646973706c61793d4c6567616379204c544b7d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d337d7b646973706c61793d5343204c544b7d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d347d7b646973706c61793d53432050726976617465204b65797d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d357d7b646973706c61793d49524b7d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d367d7b646973706c61793d416464204c4520616464726573737d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d377d7b646973706c61793d466f6c6c6f77204c4520616464726573737d").decode()) |
| elif args.extcap_interface and args.extcap_config: |
| # List config |
| print(bytes.fromhex("617267207b6e756d6265723d307d7b63616c6c3d2d2d6f6e6c792d6164766572746973696e677d7b646973706c61793d4f6e6c79206164766572746973696e67207061636b6574737d7b746f6f6c7469703d54686520736e69666665722077696c6c206f6e6c792063617074757265206164766572746973696e67207061636b6574732066726f6d207468652073656c6563746564206465766963657d7b747970653d626f6f6c666c61677d7b736176653d747275657d0a617267207b6e756d6265723d317d7b63616c6c3d2d2d6f6e6c792d6c65676163792d6164766572746973696e677d7b646973706c61793d4f6e6c79206c6567616379206164766572746973696e67207061636b6574737d7b746f6f6c7469703d54686520736e69666665722077696c6c206f6e6c792063617074757265206c6567616379206164766572746973696e67207061636b6574732066726f6d207468652073656c6563746564206465766963657d7b747970653d626f6f6c666c61677d7b736176653d747275657d0a617267207b6e756d6265723d327d7b63616c6c3d2d2d7363616e2d666f6c6c6f772d7273707d7b646973706c61793d46696e64207363616e20726573706f6e736520646174617d7b746f6f6c7469703d54686520736e69666665722077696c6c20666f6c6c6f77207363616e20726571756573747320616e64207363616e20726573706f6e73657320696e207363616e206d6f64657d7b747970653d626f6f6c666c61677d7b64656661756c743d747275657d7b736176653d747275657d0a617267207b6e756d6265723d337d7b63616c6c3d2d2d7363616e2d666f6c6c6f772d6175787d7b646973706c61793d46696e6420617578696c6961727920706f696e74657220646174617d7b746f6f6c7469703d54686520736e69666665722077696c6c20666f6c6c6f772061757820706f696e7465727320696e207363616e206d6f64657d7b747970653d626f6f6c666c61677d7b64656661756c743d747275657d7b736176653d747275657d0a617267207b6e756d6265723d337d7b63616c6c3d2d2d636f6465647d7b646973706c61793d5363616e20616e6420666f6c6c6f772064657669636573206f6e204c4520436f646564205048597d7b746f6f6c7469703d5363616e20666f72206465766963657320616e6420666f6c6c6f772061647665727469736572206f6e204c4520436f646564205048597d7b747970653d626f6f6c666c61677d7b64656661756c743d66616c73657d7b736176653d747275657d").decode()) |
| elif args.capture and args.extcap_interface and args.fifo: |
| # Capture |
| pkts = [ |
| bytes.fromhex("ffffffffffff00000000000008004500001c0001000040117cce7f0000017f0000010035003500080172") |
| ] |
| with open(args.fifo, "wb", 0) as fd: |
| # header |
| fd.write( |
| struct.pack( |
| "IHHIIII", |
| 0xa1b2c3d4, |
| 2, 4, 0, 0, 65535, 1 |
| ) |
| ) |
| for pkt in pkts: |
| fd.write(struct.pack("IIII", 0, 0, len(pkt), len(pkt))) |
| fd.write(bytes(pkt)) |
| else: |
| raise ValueError("Bad arguments") |
| """.strip() |
| with open(extcapprog, "w") as fd: |
| fd.write(data) |
| |
| print(data) |
| |
| os.chmod(extcapprog, 0o777) |
| |
| # Inject and load provider |
| conf.prog.extcap_folders = [extcapfld] |
| load_extcap() |
| print(conf.ifaces.providers) |
| conf.ifaces.reload() |
| |
| # Now do the tests |
| iface = conf.ifaces.dev_from_networkname('/dev/ttyUSB5-None') |
| assert iface.name == "nRF Sniffer for Bluetooth LE" |
| sock = iface.l2listen()(iface=iface) |
| pkts = sock.sniff(timeout=2) |
| sock.close() |
| assert UDP in pkts[0] |
| |
| config = iface.get_extcap_config() |
| assert config["arg"] == [ |
| ('0', '--only-advertising', 'Only advertising packets', '', ''), |
| ('1', '--only-legacy-advertising', 'Only legacy advertising packets', '', ''), |
| ('2', '--scan-follow-rsp', 'Find scan response data', 'true', ''), |
| ('3', '--scan-follow-aux', 'Find auxiliary pointer data', 'true', ''), |
| ('3', '--coded', 'Scan and follow devices on LE Coded PHY', 'false', '') |
| ] |
| |
| # Restore |
| conf.prog.extcap_folders = _bkp_extcap |
| conf.ifaces.providers = _bkp_providers |
| conf.ifaces.reload() |
| |
| = Test read_routes6() - default output |
| |
| routes6 = read_routes6() |
| if WINDOWS: |
| from scapy.arch.windows import _route_add_loopback |
| _route_add_loopback(routes6, True) |
| |
| routes6 |
| |
| # Expected results: |
| # - one route if there is only the loopback interface |
| # - one route if IPv6 is supported but disabled on network interfaces |
| # - three routes if there is a network interface |
| # - on OpenBSD, only two routes on lo0 are expected |
| |
| if routes6: |
| iflist = get_if_list() |
| if WINDOWS: |
| from scapy.arch.windows import _route_add_loopback |
| _route_add_loopback(ipv6=True, iflist=iflist) |
| if OPENBSD: |
| len(routes6) >= 2 |
| elif iflist == [conf.loopback_name]: |
| len(routes6) == 1 |
| elif len(iflist) >= 2: |
| len(routes6) >= 1 |
| else: |
| False |
| else: |
| # IPv6 seems disabled. Force a route to ::1 |
| conf.route6.routes.append(("::1", 128, "::", conf.loopback_name, ["::1"], 1)) |
| conf.route6.ipv6_ifaces = set([conf.loopback_name]) |
| True |
| |
| = Build HBHOptUnknown for IPv6ExtHdrHopByHop with disabled autopad |
| ~ ipv6 hbh opt |
| * Build the HBHOptUnknown of IPv6ExtHdrHopByHop with autopad=0 |
| v6Opt = HBHOptUnknown(otype=3, optlen=7, optdata="Beijing") |
| pkt = Ether()/IPv6()/IPv6ExtHdrHopByHop(autopad=0, options=[v6Opt, ]) |
| pkt.build() |
| |
| = Build HBHOptUnknown for IPv6ExtHdrDestOpt with disabled autopad |
| ~ ipv6 hbh opt |
| * Build the HBHOptUnknown of IPv6ExtHdrDestOpt with autopad=0 |
| v6Opt = HBHOptUnknown(otype=3, optlen=6, optdata="Haikou") |
| pkt = Ether()/IPv6()/IPv6ExtHdrDestOpt(autopad=0, options=[v6Opt, ]) |
| pkt.build() |
| |
| |
| = Test read_routes6() - check mandatory routes |
| |
| conf.route6 |
| |
| # Doesn't pass on Travis Bionic XXX |
| if len(routes6) > 2 and not WINDOWS: |
| # Identify routes to fe80::/64 |
| assert sum(1 for r in routes6 if r[0] == "::1" and r[4] == ["::1"]) >= 1 |
| if not OPENBSD and len(iflist) >= 2: |
| assert sum(1 for r in routes6 if r[0] == "fe80::" and r[1] == 64) >= 1 |
| try: |
| # Identify a route to a node IPv6 link-local address |
| assert sum(1 for r in routes6 if in6_islladdr(r[0]) and r[1] == 128) >= 1 |
| except: |
| # IPv6 is not available, but we still check the loopback |
| assert conf.route6.route("::/0") == (conf.loopback_name, "::", "::") |
| assert sum(1 for r in routes6 if r[1] == 128 and r[4] == ["::1"]) >= 1 |
| else: |
| True |
| |
| = Test ifchange() |
| conf.route6.ifchange(conf.loopback_name, "::1/128") |
| if WINDOWS: |
| conf.netcache.in6_neighbor["::1"] = "ff:ff:ff:ff:ff:ff" # Restore fake cache |
| |
| True |
| |
| = Packet.route() |
| assert (Ether() / ARP()).route()[0] is not None |
| assert (Ether() / ARP()).payload.route()[0] is not None |
| assert (ARP(ptype=0, pdst="hello. this isn't a valid IP")).route()[0] is None |
| |
| |
| = plain_str test |
| |
| data = b"\xffsweet\xef celestia\xab" |
| assert plain_str(data) == "\\xffsweet\\xef celestia\\xab" |
| |
| ############ |
| ############ |
| + compat.py |
| |
| = test bytes_hex/hex_bytes |
| |
| monty_data = b"Stop! Who approaches the Bridge of Death must answer me these questions three, 'ere the other side he see." |
| hex_data = bytes_hex(monty_data) |
| assert hex_data == b'53746f70212057686f20617070726f61636865732074686520427269646765206f66204465617468206d75737420616e73776572206d65207468657365207175657374696f6e732074687265652c202765726520746865206f746865722073696465206865207365652e' |
| assert hex_bytes(hex_data) == monty_data |
| |
| = orb/chb |
| |
| assert orb(b"\x01"[0]) == 1 |
| assert chb(1) == b"\x01" |
| |
| ############ |
| ############ |
| + Main.py tests |
| |
| = Pickle and unpickle a packet |
| |
| import pickle |
| |
| a = IP(dst="192.168.0.1")/UDP() |
| |
| b = pickle.dumps(a) |
| c = pickle.loads(b) |
| |
| assert c[IP].dst == "192.168.0.1" |
| assert raw(c) == raw(a) |
| |
| = Usage test |
| |
| from scapy.main import _usage |
| try: |
| _usage() |
| assert False |
| except SystemExit: |
| assert True |
| |
| = Session test |
| |
| import builtins |
| |
| # This is automatic when using the console |
def get_var(var):
    """Return the value stored under ``var`` in the ``scapy_session`` dict."""
    session = vars(builtins)["scapy_session"]
    return session[var]

def set_var(var, value):
    """Store ``value`` under ``var`` in the ``scapy_session`` dict."""
    session = vars(builtins)["scapy_session"]
    session[var] = value

def del_var(var):
    """Remove ``var`` from the ``scapy_session`` dict."""
    session = vars(builtins)["scapy_session"]
    del session[var]
| |
| init_session(None, {"init_value": 123}) |
| set_var("test_value", "8.8.8.8") # test_value = "8.8.8.8" |
| save_session() |
| del_var("test_value") |
| load_session() |
| update_session() |
| assert get_var("test_value") == "8.8.8.8" #test_value == "8.8.8.8" |
| assert get_var("init_value") == 123 |
| |
| = Session test with fname |
| |
| session_name = tempfile.mktemp() |
| init_session(session_name) |
| set_var("test_value", IP(dst="192.168.0.1")) # test_value = IP(dst="192.168.0.1") |
| save_session(fname="%s.dat" % session_name) |
| del_var("test_value") |
| |
| set_var("z", True) #z = True |
| load_session(fname="%s.dat" % session_name) |
# "z" was set after the session was saved, so it must be gone once the saved
# session has been loaded. The failure is raised from the ``else`` clause so
# that the handler expecting the lookup to fail cannot swallow it.
try:
    get_var("z")
except KeyError:
    pass
else:
    assert False, "z should not be defined after load_session()"
| |
| set_var("z", False) #z = False |
| update_session(fname="%s.dat" % session_name) |
| assert get_var("test_value").dst == "192.168.0.1" #test_value.dst == "192.168.0.1" |
| assert not get_var("z") |
| |
| = Clear session files |
| |
| os.remove("%s.dat" % session_name) |
| |
| = Test temporary file creation |
| ~ ci_only |
| |
| scapy_delete_temp_files() |
| |
| tmpfile = get_temp_file(autoext=".ut") |
| tmpfile |
| if WINDOWS: |
| assert "scapy" in tmpfile and tmpfile.lower().startswith('c:\\users\\appveyor\\appdata\\local\\temp') |
| else: |
| import platform |
| BYPASS_TMP = platform.python_implementation().lower() == "pypy" or DARWIN |
| assert "scapy" in tmpfile and (BYPASS_TMP == True or "/tmp/" in tmpfile) |
| |
| assert conf.temp_files[0].endswith(".ut") |
| scapy_delete_temp_files() |
| assert len(conf.temp_files) == 0 |
| |
| = Emulate interact() |
| ~ interact |
| |
| import mock, sys |
| from scapy.main import interact |
| |
| from scapy.main import DEFAULT_PRESTART_FILE, DEFAULT_PRESTART, _read_config_file |
| _read_config_file(DEFAULT_PRESTART_FILE, _locals=globals(), default=DEFAULT_PRESTART) |
| # By now .config/scapy/startup.py should have been created |
| with open(DEFAULT_PRESTART_FILE, "r") as fd: |
| OLD_DEFAULT_PRESTART = fd.read() |
| |
| with open(DEFAULT_PRESTART_FILE, "w+") as fd: |
| fd.write("conf.interactive_shell = 'ipython'") |
| |
| # Detect IPython |
| try: |
| import IPython |
| except: |
| code_interact_import = "scapy.main.code.interact" |
| else: |
| code_interact_import = "IPython.start_ipython" |
| |
@mock.patch(code_interact_import)
def interact_emulator(code_int, extra_args=None):
    """Run interact() with the patched shell entry point stubbed out.
    ``code_int`` is the mock injected by the decorator. ``extra_args`` is an
    optional sequence of command line arguments appended after "-s scapy1";
    it defaults to None instead of a shared mutable list.
    """
    argv = ["-s scapy1"] + list(extra_args or [])
    try:
        code_int.side_effect = lambda *args, **kwargs: lambda *args, **kwargs: None
        interact(argv=argv, mybanner="What a test")
    finally:
        # Put the standard prompt back whatever interact() did.
        sys.ps1 = ">>> "
| |
| interact_emulator() # Default |
| |
# An unknown option has to make interact() fail. SystemExit is listed
# explicitly because it is not an Exception subclass (presumably the way
# interact() reports bad options - confirm). The failure is raised from the
# ``else`` clause so that the handler cannot swallow it.
try:
    interact_emulator(extra_args=["-?"])  # Failing
except (Exception, SystemExit):
    pass
else:
    assert False, "interact() should have rejected the -? option"
| |
| interact_emulator(extra_args=["-d"]) # Extended |
| |
| = Emulate interact() and test startup.py with ptpython |
| ~ interact |
| |
| import sys |
| import mock |
| |
| from scapy.main import DEFAULT_PRESTART_FILE, DEFAULT_PRESTART, _read_config_file |
| _read_config_file(DEFAULT_PRESTART_FILE, _locals=globals(), default=DEFAULT_PRESTART) |
| # By now .config/scapy/startup.py should have been created |
| with open(DEFAULT_PRESTART_FILE, "w+") as fd: |
| fd.write("conf.interactive_shell = 'ptpython'") |
| |
| called = [] |
def checker(*args, **kwargs):
    """Validate the keyword arguments received and record the call.
    Requires a ``locals`` mapping with a truthy "IP" entry and a
    ``history_filename`` equal to conf.histfile, then appends True to
    ``called``.
    """
    namespace = kwargs.pop("locals")
    assert namespace["IP"]
    assert kwargs.pop("history_filename") == conf.histfile
    called.append(True)
| |
| ptpython_mocked_module = Bunch( |
| repl=Bunch( |
| embed=checker |
| ) |
| ) |
| |
| modules_patched = { |
| "ptpython": ptpython_mocked_module, |
| "ptpython.repl": ptpython_mocked_module.repl, |
| "ptpython.repl.embed": ptpython_mocked_module.repl.embed, |
| } |
| |
| with mock.patch.dict("sys.modules", modules_patched): |
| try: |
| interact() |
| finally: |
| sys.ps1 = ">>> " |
| |
| # Restore |
| with open(DEFAULT_PRESTART_FILE, "w") as fd: |
| print(OLD_DEFAULT_PRESTART) |
| r = fd.write(OLD_DEFAULT_PRESTART) |
| |
| assert called |
| |
| = Test explore() with GUI mode |
| ~ command |
| |
| import mock |
| |
def test_explore_gui(is_layer, layer):
    """Run explore() against a fake prompt_toolkit and return the captured output.
    The fake radiolist dialog always answers ``layer``; the fake button
    dialog answers "layers" when ``is_layer`` is true and "contribs"
    otherwise.
    """
    button_answer = "layers" if is_layer else "contribs"
    fake_dialogs = Bunch(
        radiolist_dialog=lambda *args, **kargs: layer,
        button_dialog=lambda *args, **kargs: button_answer,
    )
    fake_toolkit = Bunch(
        shortcuts=Bunch(dialogs=fake_dialogs),
        formatted_text=Bunch(HTML=lambda x: x),
        __version__="2.0.0",
    )
    # mock.patch alone cannot fake a module, so every dotted path is
    # registered in sys.modules for the duration of the call.
    fake_modules = {
        "prompt_toolkit": fake_toolkit,
        "prompt_toolkit.shortcuts": fake_toolkit.shortcuts,
        "prompt_toolkit.shortcuts.dialogs": fake_toolkit.shortcuts.dialogs,
        "prompt_toolkit.formatted_text": fake_toolkit.formatted_text,
    }
    with mock.patch.dict("sys.modules", fake_modules), \
            ContextManagerCaptureOutput() as capture:
        explore()
        captured = capture.get_output()
    return captured
| |
| conf.interactive = True |
| explore_dns = test_explore_gui(True, "scapy.layers.dns") |
| assert "DNS" in explore_dns |
| assert "DNS Question Record" in explore_dns |
| assert "DNSRRNSEC3" in explore_dns |
| assert "DNS TSIG Resource Record" in explore_dns |
| |
| explore_avs = test_explore_gui(False, "avs") |
| assert "AVSWLANHeader" in explore_avs |
| assert "AVS WLAN Monitor Header" in explore_avs |
| |
| = Test explore() with non-GUI mode |
| ~ command |
| |
def test_explore_non_gui(layer):
    """Call explore(layer) and return the output captured during the call."""
    with ContextManagerCaptureOutput() as capture:
        explore(layer)
        captured = capture.get_output()
    return captured
| |
| explore_dns = test_explore_non_gui("scapy.layers.dns") |
| assert "DNS" in explore_dns |
| assert "DNS Question Record" in explore_dns |
| assert "DNSRRNSEC3" in explore_dns |
| assert "DNS TSIG Resource Record" in explore_dns |
| |
| explore_avs = test_explore_non_gui("avs") |
| assert "AVSWLANHeader" in explore_avs |
| assert "AVS WLAN Monitor Header" in explore_avs |
| |
| assert test_explore_non_gui("scapy.layers.dns") == test_explore_non_gui("dns") |
| assert test_explore_non_gui("scapy.contrib.avs") == test_explore_non_gui("avs") |
| |
| try: |
| explore("unknown_module") |
| assert False # The previous should have raised an exception |
| except Scapy_Exception: |
| pass |
| |
| = Test load_contrib overwrite |
| load_contrib("gtp") |
| assert GTPHeader.__module__ == "scapy.contrib.gtp" |
| |
| load_contrib("gtp_v2") |
| assert GTPHeader.__module__ == "scapy.contrib.gtp_v2" |
| |
| load_contrib("gtp") |
| assert GTPHeader.__module__ == "scapy.contrib.gtp" |
| |
| = Test load_contrib failure |
| try: |
| load_contrib("doesnotexist") |
| assert False |
| except: |
| pass |
| |
| = Test sane function |
| sane("A\x00\xFFB") == "A..B" |
| |
| = Test lhex function |
| assert lhex(42) == "0x2a" |
| assert lhex((28,7)) == "(0x1c, 0x7)" |
| assert lhex([28,7]) == "[0x1c, 0x7]" |
| |
| = Test restart function |
| import mock |
| conf.interactive = True |
| |
try:
    from scapy.utils import restart
    import os
    @mock.patch("os.execv")
    @mock.patch("subprocess.call")
    @mock.patch("os._exit")
    def _test(e, m, m2):
        # Decorators are applied bottom-up, so e replaces os._exit,
        # m replaces subprocess.call and m2 replaces os.execv.
        def check(x, y=None):
            # The command arrives either as a single list or as a first
            # element followed by a list of further arguments; both forms
            # are flattened. None replaces the former mutable default.
            z = list(x) if isinstance(x, list) else [x]
            z += list(y or [])
            assert os.path.isfile(z[0])
            assert os.path.isfile(z[1])
            return 0
        m2.side_effect = check
        m.side_effect = check
        e.side_effect = lambda x: None
        restart()
    _test()
finally:
    conf.interactive = False
| |
| = Test linehexdump function |
conf_color_theme = conf.color_theme
conf.color_theme = BlackAndWhite()
# Restore the theme even when the comparison fails, so the BlackAndWhite
# theme does not stay active for the tests that follow.
try:
    assert linehexdump(Ether(src="00:01:02:03:04:05"), dump=True) == 'FF FF FF FF FF FF 00 01 02 03 04 05 90 00 ..............'
finally:
    conf.color_theme = conf_color_theme
| |
| = Test chexdump function |
| chexdump(Ether(src="00:01:02:02:04:05"), dump=True) == "0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x00, 0x01, 0x02, 0x02, 0x04, 0x05, 0x90, 0x00" |
| |
| = Test repr_hex function |
| repr_hex("scapy") == "7363617079" |
| |
| = Test hexstr function |
| hexstr(b"A\x00\xFFB") == "41 00 FF 42 A..B" |
| |
| = Test fletcher16 functions |
| assert fletcher16_checksum(b"\x28\x07") == 22319 |
| assert fletcher16_checkbytes(b"\x28\x07", 1) == b"\xaf(" |
| |
| = Test hexdiff function |
| ~ not_pypy |
def test_hexdiff(a, b, algo=None, autojunk=False):
    """Return the output captured while hexdiff() runs under BlackAndWhite.
    ``a`` and ``b`` are the two objects handed to hexdiff(); ``algo`` and
    ``autojunk`` are forwarded unchanged. The previous color theme is put
    back even if hexdiff() raises.
    """
    conf_color_theme = conf.color_theme
    conf.color_theme = BlackAndWhite()
    try:
        with ContextManagerCaptureOutput() as cmco:
            hexdiff(a, b, algo=algo, autojunk=autojunk)
            result_hexdiff = cmco.get_output()
    finally:
        conf.color_theme = conf_color_theme
    # NOTE(review): kept from the original; this switches conf.interactive on
    # as a side effect of every successful call - confirm it is intended.
    conf.interactive = True
    return result_hexdiff
| |
| # Basic string test |
| |
| result_hexdiff = test_hexdiff("abcde", "abCde") |
| expected = "0000 61 62 63 64 65 abcde\n" |
| expected += " 0000 61 62 43 64 65 abCde\n" |
| assert result_hexdiff == expected |
| |
| # More advanced string test |
| |
| result_hexdiff = test_hexdiff("add_common_", "_common_removed") |
| expected = "0000 61 64 64 5F 63 6F 6D 6D 6F 6E 5F add_common_ \n" |
| expected += " -003 5F 63 6F 6D 6D 6F 6E 5F 72 65 6D 6F 76 _common_remov\n" |
| expected += " 000d 65 64 ed\n" |
| assert result_hexdiff == expected |
| |
| # Compare packets |
| |
| result_hexdiff = test_hexdiff(IP(dst="127.0.0.1", src="127.0.0.1"), IP(dst="127.0.0.2", src="127.0.0.1")) |
| expected = "0000 45 00 00 14 00 01 00 00 40 00 7C E7 7F 00 00 01 E.......@.|.....\n" |
| expected += " 0000 45 00 00 14 00 01 00 00 40 00 7C E6 7F 00 00 01 E.......@.|.....\n" |
| expected += "0010 7F 00 00 01 ....\n" |
| expected += " 0010 7F 00 00 02 ....\n" |
| assert result_hexdiff == expected |
| |
| # Compare using difflib |
| |
| a = "A" * 1000 + "findme" + "B" * 1000 |
| b = "A" * 1000 + "B" * 1000 |
| ret1 = test_hexdiff(a, b, algo="difflib") |
| ret2 = test_hexdiff(a, b, algo="difflib", autojunk=True) |
| |
| expected_ret1 = """ |
| 03d0 03d0 41 41 41 41 41 41 41 41 41 41 41 41 41 41 41 41 AAAAAAAAAAAAAAAA |
| 03e0 41 41 41 41 41 41 41 41 66 69 6E 64 6D 65 42 42 AAAAAAAAfindmeBB |
| 03e0 41 41 41 41 41 41 41 41 42 42 AAAAAAAA BB |
| 03ea 03ea 42 42 42 42 42 42 42 42 42 42 42 42 42 42 42 42 BBBBBBBBBBBBBBBB |
| """ |
| expected_ret2 = """ |
| 03d0 03d0 41 41 41 41 41 41 41 41 41 41 41 41 41 41 41 41 AAAAAAAAAAAAAAAA |
| 03e0 41 41 41 41 41 41 41 41 66 69 6E 64 6D 65 42 42 AAAAAAAAfindmeBB |
| 03e0 41 41 41 41 41 41 41 41 42 42 42 42 42 42 42 42 AAAAAAAABBBBBBBB |
| 03f0 03f0 42 42 42 42 42 42 42 42 42 42 42 42 42 42 42 42 BBBBBBBBBBBBBBBB |
| """ |
| |
| assert ret1 != ret2 |
| assert expected_ret1 in ret1 |
| assert expected_ret2 in ret2 |
| |
| # Test corner cases that should not crash |
| |
| hexdiff(b"abc", IP() / TCP()) |
| hexdiff(IP() / TCP(), b"abc") |
| |
| = Test mysummary functions - Ether |
| |
| p = Ether(dst="ff:ff:ff:ff:ff:ff", src="ff:ff:ff:ff:ff:ff", type=0x9000) |
| p |
| assert p.mysummary() in ['ff:ff:ff:ff:ff:ff > ff:ff:ff:ff:ff:ff (%s)' % loop |
| for loop in ['0x9000', 'LOOP']] |
| |
| = Test zerofree_randstring function |
| random.seed(0x2807) |
| zerofree_randstring(4) in [b"\xd2\x12\xe4\x5b", b'\xd3\x8b\x13\x12'] |
| |
| = Test strand function |
| assert strand("AC", "BC") == b'@C' |
| |
| = Test export_object and import_object functions |
| import mock |
| def test_export_import_object(): |
| with ContextManagerCaptureOutput() as cmco: |
| export_object(2807) |
| result_export_object = cmco.get_output(eval_bytes=True) |
| assert result_export_object.startswith("eNprYPL9zqUHAAdrAf8=") |
| assert import_object(result_export_object) == 2807 |
| |
| test_export_import_object() |
| |
| = Test tex_escape function |
| tex_escape("$#_") == "\\$\\#\\_" |
| |
| = Test colgen function |
| f = colgen(range(3)) |
| assert len([next(f) for i in range(2)]) == 2 |
| |
| = Test incremental_label function |
| f = incremental_label() |
| assert [next(f) for i in range(2)] == ["tag00000", "tag00001"] |
| |
| = Test corrupt_* functions |
| import random |
| random.seed(0x2807) |
| assert corrupt_bytes("ABCDE") in [b"ABCDW", b"ABCDX"] |
| assert sane(corrupt_bytes("ABCDE", n=3)) in ["A.8D4", ".2.DE"] |
| |
| assert corrupt_bits("ABCDE") in [b"EBCDE", b"ABCDG"] |
| assert sane(corrupt_bits("ABCDE", n=3)) in ["AF.EE", "QB.TE"] |
| |
| = Test save_object and load_object functions |
| import tempfile |
| fd, fname = tempfile.mkstemp() |
| save_object(fname, 2807) |
| assert load_object(fname) == 2807 |
| |
| = Test whois function |
| ~ netaccess |
| |
| if not WINDOWS: |
| result = whois("193.0.6.139") |
| assert b"inetnum" in result and b"Amsterdam" in result |
| |
| = Test manuf DB methods |
| ~ manufdb |
| assert conf.manufdb._resolve_MAC("00:00:0F:01:02:03") == "Next:01:02:03" |
| assert conf.manufdb._get_short_manuf("00:00:0F:01:02:03") == "Next" |
| assert in6_addrtovendor("fe80::0200:0fff:fe01:0203").lower().startswith("next") |
| |
| assert conf.manufdb.lookup("00:00:0F:01:02:03") == ('Next', 'Next, Inc.') |
| assert "00:00:0F" in conf.manufdb.reverse_lookup("Next") |
| |
| = Test multiple wireshark's manuf formats |
| ~ manufdb |
| |
| new_format = """ |
| # comment |
| 00:00:00 JokyIsland Joky Insland Corp SA |
| 00:01:12 SecdevCorp Secdev Corporation SA LLC |
| EE:05:01 Scapy Scapy CO LTD & CIE |
| FF:00:11 NoName |
| """ |
| old_format = """ |
| # comment |
| 00:00:00 JokyIsland # Joky Insland Corp SA |
| 00:01:12 SecdevCorp # Secdev Corporation SA LLC |
| EE:05:01 Scapy # Scapy CO LTD & CIE |
| FF:00:11 NoName |
| """ |
| |
| manuf1 = get_temp_file() |
| manuf2 = get_temp_file() |
| |
| with open(manuf1, "w") as w: |
| w.write(old_format) |
| |
| with open(manuf2, "w") as w: |
| w.write(new_format) |
| |
| a = load_manuf(manuf1) |
| b = load_manuf(manuf2) |
| |
| a.lookup("00:00:00") == ('JokyIsland', 'Joky Insland Corp SA'), |
| a.lookup("FF:00:11:00:00:00") == ('NoName', 'NoName') |
| a.reverse_lookup("Scapy") == {'EE:05:01': ('Scapy', 'Scapy CO LTD & CIE')} |
| a.reverse_lookup("Secdevcorp") == {'00:01:12': ('SecdevCorp', 'Secdev Corporation SA LLC')} |
| |
| |
| b.lookup("00:00:00") == ('JokyIsland', 'Joky Insland Corp SA'), |
| b.lookup("FF:00:11:00:00:00") == ('NoName', 'NoName') |
| b.reverse_lookup("Scapy") == {'EE:05:01': ('Scapy', 'Scapy CO LTD & CIE')} |
| b.reverse_lookup("Secdevcorp") == {'00:01:12': ('SecdevCorp', 'Secdev Corporation SA LLC')} |
| |
| scapy_delete_temp_files() |
| |
| = Test load_services |
| |
| data_services = """ |
| itu-bicc-stc 3097/sctp |
| cvsup 5999/udp # CVSup |
| x11 6000-6063/tcp # X Window System |
| x11 6000-6063/udp # X Window System |
| ndl-ahp-svc 6064/tcp # NDL-AHP-SVC |
| """ |
| |
| services = get_temp_file() |
| with open(services, "w") as w: |
| w.write(data_services) |
| |
| tcp, udp, sctp = load_services(services) |
| assert tcp[6002] == "x11" |
| assert tcp.ndl_ahp_svc == 6064 |
| assert tcp.x11 in range(6000, 6093) |
| assert udp[6002] == "x11" |
| assert udp.x11 in range(6000, 6093) |
| assert udp.cvsup == 5999 |
| assert sctp[3097] == "itu_bicc_stc" |
| assert sctp.itu_bicc_stc == 3097 |
| |
| scapy_delete_temp_files() |
| |
| = Test utility functions - network related |
| ~ netaccess |
| |
| assert atol("1.1.1.1") == 0x1010101 |
| assert atol("192.168.0.1") == 0xc0a80001 |
| |
| = Test autorun functions |
| ~ autorun |
| |
| ret = autorun_get_text_interactive_session("IP().src") |
| ret |
| assert ret == (">>> IP().src\n'127.0.0.1'\n", '127.0.0.1') |
| |
| ret = autorun_get_html_interactive_session("IP().src") |
| ret |
| assert ret == ("<span class=prompt>>>> </span>IP().src\n'127.0.0.1'\n", '127.0.0.1') |
| |
| ret = autorun_get_latex_interactive_session("IP().src") |
| ret |
| assert ret == ("\\textcolor{blue}{{\\tt\\char62}{\\tt\\char62}{\\tt\\char62} }IP().src\n'127.0.0.1'\n", '127.0.0.1') |
| |
| ret = autorun_get_text_interactive_session("scapy_undefined") |
| assert "NameError" in ret[0] |
| |
| = Test autorun with logging |
| |
| cmds = """log_runtime.info(hex_bytes("446166742050756e6b"))\n""" |
| ret = autorun_get_text_interactive_session(cmds) |
| ret |
| assert "Daft Punk" in ret[0] |
| |
| = Test utility TEX functions |
| |
| assert tex_escape("{scapy}\\^$~#_&%|><") == "{\\tt\\char123}scapy{\\tt\\char125}{\\tt\\char92}\\^{}\\${\\tt\\char126}\\#\\_\\&\\%{\\tt\\char124}{\\tt\\char62}{\\tt\\char60}" |
| |
| a = colgen(1, 2, 3) |
| assert next(a) == (1, 2, 2) |
| assert next(a) == (1, 3, 3) |
| assert next(a) == (2, 2, 1) |
| assert next(a) == (2, 3, 2) |
| assert next(a) == (2, 1, 3) |
| assert next(a) == (3, 3, 1) |
| assert next(a) == (3, 1, 2) |
| assert next(a) == (3, 2, 3) |
| |
| = Test config file functions |
| |
| saved_conf_verb = conf.verb |
| fd, fname = tempfile.mkstemp() |
| os.write(fd, b"conf.verb = 42\n") |
| os.close(fd) |
| from scapy.main import _read_config_file |
| _read_config_file(fname, globals(), locals()) |
| assert conf.verb == 42 |
| conf.verb = saved_conf_verb |
| |
| = Test config file functions failures |
| |
| from scapy.main import _read_config_file, _probe_config_file |
| assert _read_config_file(_probe_config_file("filethatdoesnotexistnorwillever.tsppajfsrdrr")) is None |
| |
| = Test CacheInstance repr |
| |
| conf.netcache |
| |
| = Test pyx detection functions |
| |
| from mock import patch |
| |
| def _r(*args, **kwargs): |
| raise OSError |
| |
| with patch("scapy.libs.test_pyx.subprocess.check_call", _r): |
| from scapy.libs.test_pyx import _test_pyx |
| assert _test_pyx() == False |
| |
| = Test matplotlib detection functions |
| |
| from mock import MagicMock, patch |
| |
| bck_scapy_libs_matplot = sys.modules.get("scapy.libs.matplot", None) |
| if bck_scapy_libs_matplot: |
| del sys.modules["scapy.libs.matplot"] |
| |
| mock_matplotlib = MagicMock() |
| mock_matplotlib.get_backend.return_value = "inline" |
| mock_matplotlib.pyplot = MagicMock() |
| mock_matplotlib.pyplot.plt = None |
| with patch.dict("sys.modules", **{ "matplotlib": mock_matplotlib, "matplotlib.lines": mock_matplotlib}): |
| from scapy.libs.matplot import MATPLOTLIB, MATPLOTLIB_INLINED, MATPLOTLIB_DEFAULT_PLOT_KARGS, Line2D |
| assert MATPLOTLIB == 1 |
| assert MATPLOTLIB_INLINED == 1 |
| assert "marker" in MATPLOTLIB_DEFAULT_PLOT_KARGS |
| |
| mock_matplotlib.get_backend.return_value = "ko" |
| with patch.dict("sys.modules", **{ "matplotlib": mock_matplotlib, "matplotlib.lines": mock_matplotlib}): |
| from scapy.libs.matplot import MATPLOTLIB, MATPLOTLIB_INLINED, MATPLOTLIB_DEFAULT_PLOT_KARGS |
| assert MATPLOTLIB == 1 |
| assert MATPLOTLIB_INLINED == 0 |
| assert "marker" in MATPLOTLIB_DEFAULT_PLOT_KARGS |
| |
| if bck_scapy_libs_matplot: |
| sys.modules["scapy.libs.matplot"] = bck_scapy_libs_matplot |
| |
| |
| ############ |
| ############ |
| + Basic tests |
| |
| * These tests are here mainly to check that nothing has been broken |
| * and to catch Exceptions |
| |
| = Packet class methods |
| p = IP()/ICMP() |
| ret = p.do_build_ps() |
| assert ret[0] == b"@\x00\x00\x00\x00\x01\x00\x00@\x01\x00\x00\x7f\x00\x00\x01\x7f\x00\x00\x01\x08\x00\x00\x00\x00\x00\x00\x00" |
| assert len(ret[1]) == 2 |
| |
| assert p[ICMP].firstlayer() == p |
| |
| assert p.command() == "IP()/ICMP()" |
| |
| p.decode_payload_as(UDP) |
| assert p.sport == 2048 and p.dport == 63487 |
| |
| = hide_defaults |
| conf_color_theme = conf.color_theme |
| conf.color_theme = BlackAndWhite() |
| p = IP(ttl=64)/ICMP() |
| assert repr(p) in ["<IP frag=0 ttl=64 proto=icmp |<ICMP |>>", "<IP frag=0 ttl=64 proto=1 |<ICMP |>>"] |
| p.hide_defaults() |
| assert repr(p) in ["<IP frag=0 proto=icmp |<ICMP |>>", "<IP frag=0 proto=1 |<ICMP |>>"] |
| conf.color_theme = conf_color_theme |
| |
| = split_layers |
| p = IP()/ICMP() |
| s = raw(p) |
| split_layers(IP, ICMP, proto=1) |
| assert Raw in IP(s) |
| bind_layers(IP, ICMP, frag=0, proto=1) |
| |
| = fuzz |
| |
| r = fuzz(IP(tos=2)/ICMP()) |
| assert r.tos == 2 |
| z = r.ttl |
| assert r.ttl != z |
| assert r.ttl != z |
| |
| |
| = fuzz a Packet with MultipleTypeField |
| |
| fuzz(ARP(pdst="127.0.0.1")) |
| fuzz(IP()/ARP(pdst='10.0.0.254')) |
| |
| = fuzz on packets with advanced RandNum |
| |
| x = IP(dst="8.8.8.8")/fuzz(UDP()/NTP(version=4)) |
| x.show2() |
| x = IP(raw(x)) |
| assert NTP in x |
| |
| = fuzz on packets with FlagsField |
| assert isinstance(fuzz(TCP()).flags, VolatileValue) |
| |
| = Building some packets |
| ~ basic IP TCP UDP NTP LLC SNAP Dot11 |
| IP()/TCP() |
| Ether()/IP()/UDP()/NTP() |
| Dot11()/LLC()/SNAP()/IP()/TCP()/"XXX" |
| IP(ttl=25)/TCP(sport=12, dport=42) |
| IP().summary() |
| |
| = Manipulating some packets |
| ~ basic IP TCP |
| a=IP(ttl=4)/TCP() |
| a.ttl |
| a.ttl=10 |
| del a.ttl |
| a.ttl |
| TCP in a |
| a[TCP] |
| a[TCP].dport=[80,443] |
| a |
| assert a.copy().time == a.time |
| a=3 |
| |
| = Bind string array as payload |
| ~ basic |
| assert bytes(Raw("sca")/"py") == b"scapy" |
| assert bytes(Raw("sca")/b"py") == b"scapy" |
| assert bytes(Raw("sca")/bytearray(b"py")) == b"scapy" |
| assert bytes("sca"/Raw("py")) == b"scapy" |
| assert bytes(b"sca"/Raw("py")) == b"scapy" |
| assert bytes(bytearray(b"sca")/Raw("py")) == b"scapy" |
| a=Raw("sca") |
| a.add_payload("py") |
| assert bytes(a) == b"scapy" |
| a=Raw("sca") |
| a.add_payload(b"py") |
| assert bytes(a) == b"scapy" |
| a=Raw("sca") |
| a.add_payload(bytearray(b"py")) |
| assert bytes(a) == b"scapy" |
| |
| = Checking overloads |
| ~ basic IP TCP Ether |
| a=Ether()/IP()/TCP() |
| r = a.proto |
| r |
| r == 6 |
| |
| |
| = sprintf() function |
| ~ basic sprintf Ether IP UDP NTP |
| a=Ether()/IP()/IP(ttl=4)/UDP()/NTP() |
| r = a.sprintf("%type% %IP.ttl% %#05xr,UDP.sport% %IP:2.ttl%") |
| r |
| r in ['0x800 64 0x07b 4', 'IPv4 64 0x07b 4'] |
| |
| |
| = sprintf() function |
| ~ basic sprintf IP TCP SNAP LLC Dot11 |
| * This test is on the conditional substring feature of <tt>sprintf()</tt> |
| a=Dot11()/LLC()/SNAP()/IP()/TCP() |
| r = a.sprintf("{IP:{TCP:flags=%TCP.flags%}{UDP:port=%UDP.ports%} %IP.src%}") |
| r |
| r == 'flags=S 127.0.0.1' |
| |
| |
| = haslayer function |
| ~ basic haslayer IP TCP ICMP ISAKMP |
| x=IP(id=1)/ISAKMP_payload_SA(prop=ISAKMP_payload_SA(prop=IP()/ICMP()))/TCP() |
| r = (TCP in x, ICMP in x, IP in x, UDP in x) |
| r |
| r == (True,True,True,False) |
| |
| = getlayer function |
| ~ basic getlayer IP ISAKMP UDP |
| x=IP(id=1)/ISAKMP_payload_SA(prop=IP(id=2)/UDP(dport=1))/IP(id=3)/UDP(dport=2) |
| x[IP] |
| x[IP:2] |
| x[IP:3] |
| x.getlayer(IP,3) |
| x.getlayer(IP,4) |
| x[UDP] |
| x[UDP:1] |
| x[UDP:2] |
| assert(x[IP].id == 1 and x[IP:2].id == 2 and x[IP:3].id == 3 and |
| x.getlayer(IP).id == 1 and x.getlayer(IP,3).id == 3 and |
| x.getlayer(IP,4) == None and |
| x[UDP].dport == 1 and x[UDP:2].dport == 2) |
| try: |
| x[IP:4] |
| except IndexError: |
| True |
| else: |
| False |
| |
| = getlayer / haslayer with name |
| ~ basic getlayer IP ICMP IPerror TCPerror |
| x = IP() / ICMP() / IPerror() |
| assert x.getlayer(ICMP) is not None |
| assert x.getlayer(IPerror) is not None |
| assert x.getlayer("IP in ICMP") is not None |
| assert x.getlayer(TCPerror) is None |
| assert x.getlayer("TCP in ICMP") is None |
| assert x.haslayer(ICMP) |
| assert x.haslayer(IPerror) |
| assert x.haslayer("IP in ICMP") |
| assert not x.haslayer(TCPerror) |
| assert not x.haslayer("TCP in ICMP") |
| |
| = getlayer with a filter |
| ~ getlayer IP |
| pkt = IP() / IP(ttl=3) / IP() |
| assert pkt[IP::{"ttl":3}].ttl == 3 |
| assert pkt.getlayer(IP, ttl=3).ttl == 3 |
| assert IPv6ExtHdrHopByHop(options=[HBHOptUnknown()]).getlayer(HBHOptUnknown, otype=42) is None |
| |
| = specific haslayer and getlayer implementations for EAP |
| ~ haslayer getlayer EAP |
| pkt = Ether() / EAPOL() / EAP_MD5() |
| assert EAP in pkt |
| assert pkt.haslayer(EAP) |
| assert isinstance(pkt[EAP], EAP_MD5) |
| assert isinstance(pkt.getlayer(EAP), EAP_MD5) |
| |
| = specific haslayer and getlayer implementations for RadiusAttribute |
| ~ haslayer getlayer RadiusAttribute |
| pkt = RadiusAttr_EAP_Message() |
| assert RadiusAttribute in pkt |
| assert pkt.haslayer(RadiusAttribute) |
| assert isinstance(pkt[RadiusAttribute], RadiusAttr_EAP_Message) |
| assert isinstance(pkt.getlayer(RadiusAttribute), RadiusAttr_EAP_Message) |
| |
| |
| = equality |
| ~ basic |
| w=Ether()/IP()/UDP(dport=53) |
| x=Ether()/IP(version=4)/UDP() |
| y=Ether()/IP()/UDP(dport=4) |
| z=Ether()/IP()/UDP()/NTP() |
| t=Ether()/IP()/TCP() |
| assert x != y and x != z and x != t and y != z and y != t and z != t and w == x |
| |
| = answers |
| ~ basic |
| a1, a2 = "1.2.3.4", "5.6.7.8" |
| p1 = IP(src=a1, dst=a2)/ICMP(type=8) |
| p2 = IP(src=a2, dst=a1)/ICMP(type=0) |
| assert p1.hashret() == p2.hashret() |
| assert not p1.answers(p2) |
| assert p2.answers(p1) |
| assert p1 > p2 |
| assert p2 < p1 |
| assert p1 == p1 |
| conf_back = conf.checkIPinIP |
| conf.checkIPinIP = True |
| px = [IP()/p1, IPv6()/p1] |
| assert not any(p.hashret() == p2.hashret() for p in px) |
| assert not any(p.answers(p2) for p in px) |
| assert not any(p2.answers(p) for p in px) |
| conf.checkIPinIP = False |
| assert all(p.hashret() == p2.hashret() for p in px) |
| assert not any(p.answers(p2) for p in px) |
| assert all(p2.answers(p) for p in px) |
| conf.checkIPinIP = conf_back |
| |
| = answers - Net |
| ~ netaccess |
| |
| a1, a2 = Net("www.google.com"), Net("www.secdev.org") |
| prt1, prt2 = 12345, 54321 |
| s1, s2 = 2767216324, 3845532842 |
| p1 = IP(src=a1, dst=a2)/TCP(flags='SA', seq=s1, ack=s2, sport=prt1, dport=prt2) |
| p2 = IP(src=a2, dst=a1)/TCP(flags='R', seq=s2, ack=0, sport=prt2, dport=prt1) |
| assert p2.answers(p1) |
| assert not p1.answers(p2) |
| # Not available yet because of IPv6 |
| # a1, a2 = Net6("www.google.com"), Net6("www.secdev.org") |
| p1 = IP(src=a1, dst=a2)/TCP(flags='S', seq=s1, ack=0, sport=prt1, dport=prt2) |
| p2 = IP(src=a2, dst=a1)/TCP(flags='RA', seq=0, ack=s1+1, sport=prt2, dport=prt1) |
| assert p2.answers(p1) |
| assert not p1.answers(p2) |
| p1 = IP(src=a1, dst=a2)/TCP(flags='S', seq=s1, ack=0, sport=prt1, dport=prt2) |
| p2 = IP(src=a2, dst=a1)/TCP(flags='SA', seq=s2, ack=s1+1, sport=prt2, dport=prt1) |
| assert p2.answers(p1) |
| assert not p1.answers(p2) |
| p1 = IP(src=a1, dst=a2)/TCP(flags='A', seq=s1, ack=s2+1, sport=prt1, dport=prt2) |
| assert not p2.answers(p1) |
| assert p1.answers(p2) |
| p1 = IP(src=a1, dst=a2)/TCP(flags='S', seq=s1, ack=0, sport=prt1, dport=prt2) |
| p2 = IP(src=a2, dst=a1)/TCP(flags='SA', seq=s2, ack=s1+10, sport=prt2, dport=prt1) |
| assert not p2.answers(p1) |
| assert not p1.answers(p2) |
| p1 = IP(src=a1, dst=a2)/TCP(flags='A', seq=s1, ack=s2+1, sport=prt1, dport=prt2) |
| assert not p2.answers(p1) |
| assert not p1.answers(p2) |
| p1 = IP(src=a1, dst=a2)/TCP(flags='A', seq=s1+9, ack=s2+10, sport=prt1, dport=prt2) |
| assert not p2.answers(p1) |
| assert not p1.answers(p2) |
| |
| = conf.checkIPsrc |
| |
| conf_checkIPsrc = conf.checkIPsrc |
| conf.checkIPsrc = 0 |
| query = IP(id=42676, src='10.128.0.7', dst='192.168.0.1')/ICMP(id=26) |
| answer = IP(src='192.168.48.19', dst='10.128.0.7')/ICMP(type=11)/IPerror(id=42676, src='192.168.51.23', dst='192.168.0.1')/ICMPerror(id=26) |
| assert answer.answers(query) |
| conf.checkIPsrc = conf_checkIPsrc |
| |
| |
| ############ |
| ############ |
| + Tests on padding |
| |
| = Padding assembly |
| r = raw(Padding("abc")) |
| r |
| assert r == b"abc" |
| r = raw(Padding("abc")/Padding("def")) |
| r |
| assert r == b"abcdef" |
| r = raw(Raw("ABC")/Padding("abc")/Padding("def")) |
| r |
| assert r == b"ABCabcdef" |
| r = raw(Raw("ABC")/Padding("abc")/Raw("DEF")/Padding("def")) |
| r |
| assert r == b"ABCDEFabcdef" |
| |
| = Padding and length computation |
| p = IP(raw(IP()/Padding("abc"))) |
| p |
| assert p.len == 20 and len(p) == 23 |
| p = IP(raw(IP()/Raw("ABC")/Padding("abc"))) |
| p |
| assert p.len == 23 and len(p) == 26 |
| p = IP(raw(IP()/Raw("ABC")/Padding("abc")/Padding("def"))) |
| p |
| assert p.len == 23 and len(p) == 29 |
| |
| = PadField test |
| ~ PadField padding |
| |
| class TestPad(Packet): |
| fields_desc = [ PadField(StrNullField("st", b""), 6, padwith=b"\xff"), StrField("id", b"")] |
| |
| assert TestPad() == TestPad(raw(TestPad())) |
| assert raw(TestPad(st=b"st", id=b"id")) == b'st\x00\xff\xff\xffid' |
| |
| = ReversePadField |
| ~ PadField padding |
| |
| class TestReversePad(Packet): |
| fields_desc = [ ByteField("a", 0), |
| ReversePadField(IntField("b", 0), 4)] |
| |
| assert raw(TestReversePad(a=1, b=0xffffffff)) == b'\x01\x00\x00\x00\xff\xff\xff\xff' |
| assert TestReversePad(raw(TestReversePad(a=1, b=0xffffffff))).b == 0xffffffff |
| |
| ############ |
| ############ |
| + Tests on default value changes mechanism |
| |
| = Creation of an IPv3 class from IP class with different default values |
| class IPv3(IP): |
| version = 3 |
| ttl = 32 |
| |
| = Test of IPv3 class |
| a = IPv3() |
| v,t = a.version, a.ttl |
| v,t |
| assert (v,t) == (3,32) |
| r = raw(a) |
| r |
| assert r == b'5\x00\x00\x14\x00\x01\x00\x00 \x00\xac\xe7\x7f\x00\x00\x01\x7f\x00\x00\x01' |
| |
| ############ |
| ############ |
| + ASN.1 tests |
| |
| = ASN1 - ASN1_Object |
| assert ASN1_Object(1) == ASN1_Object(1) |
| assert ASN1_Object(1) > ASN1_Object(0) |
| assert ASN1_Object(1) >= ASN1_Object(1) |
| assert ASN1_Object(0) < ASN1_Object(1) |
| assert ASN1_Object(1) <= ASN1_Object(2) |
| assert ASN1_Object(1) != ASN1_Object(2) |
| ASN1_Object(2).show() |
| |
| = ASN1 - RandASN1Object |
| a = RandASN1Object() |
| random.seed(0x2807) |
| o = bytes(a) |
| o |
| assert o in [ |
| b'\x1e\x023V', # PyPy 2.7 |
| b'A\x02\x07q', # Python 2.7 |
| b'B\x02\xfe\x92', # python 3.7-3.9 |
| ] |
| |
| = ASN1 - ASN1_BIT_STRING |
| a = ASN1_BIT_STRING("test", readable=True) |
| a |
| assert a.val == '01110100011001010111001101110100' |
| assert raw(a) == b'\x03\x05\x00test' |
| |
| a = ASN1_BIT_STRING(b"\xff"*16, readable=True) |
| a |
| assert a.val == "1" * 128 |
| assert raw(a) == b'\x03\x11\x00\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff' |
| |
| = ASN1 - ASN1_SEQUENCE |
| a = ASN1_SEQUENCE([ASN1_Object(1), ASN1_Object(0)]) |
| assert a.strshow() == '# ASN1_SEQUENCE:\n <ASN1_Object[1]>\n <ASN1_Object[0]>\n' |
| |
| = ASN1 - ASN1_DECODING_ERROR |
| a = ASN1_DECODING_ERROR("error", exc=OSError(1)) |
| assert repr(a) == "<ASN1_DECODING_ERROR['error']{{1}}>" |
| b = ASN1_DECODING_ERROR("error", exc=OSError(ASN1_BIT_STRING("0"))) |
| assert repr(b) in ["<ASN1_DECODING_ERROR['error']{{<ASN1_BIT_STRING[0]=b'\\x00' (7 unused bits)>}}>", |
| "<ASN1_DECODING_ERROR['error']{{<ASN1_BIT_STRING[0]='\\x00' (7 unused bits)>}}>"] |
| |
| = ASN1 - ASN1_INTEGER |
| a = ASN1_INTEGER(int("1"*23)) |
| assert repr(a) in ["0x25a55a46e5da99c71c7 <ASN1_INTEGER[1111111111...1111111111]>", |
| "0x25a55a46e5da99c71c7 <ASN1_INTEGER[1111111111...111111111L]>"] |
| |
| = ASN1 - ASN1_OID |
| assert raw(ASN1_OID("")) == b"\x06\x00" |
| |
| = RandASN1Object(), specific crashes |
| |
| import random |
| |
| # ASN1F_NUMERIC_STRING |
| random.seed(1514315682) |
| raw(RandASN1Object()) |
| |
| # ASN1F_VIDEOTEX_STRING |
| random.seed(1240186058) |
| raw(RandASN1Object()) |
| |
| # ASN1F_UTC_TIME & ASN1F_GENERALIZED_TIME |
| random.seed(1873503288) |
| raw(RandASN1Object()) |
| |
| = SSID is parsed properly even with the presence of RSN Information |
| ~ SSID RSN Information |
| # A regression test for https://github.com/secdev/scapy/pull/2685. |
| # https://github.com/secdev/scapy/issues/2683 describes a packet with |
| # RSN Information that isn't parsed properly, |
| # causing the SSID to be overridden. |
| # This test checks the SSID is parsed properly. |
| filename = scapy_path("/test/pcaps/bad_rsn_parsing_overrides_ssid.pcap") |
| frame = rdpcap(filename)[0] |
| beacon = frame.getlayer(5) |
| ssid = beacon.network_stats()['ssid'] |
| assert ssid == "ROUTE-821E295" |
| |
| = SSID is parsed properly even when the Country Information Tag Element has an odd length (not complying with the standard) and a missing pad byte |
| ~ Missing Pad Byte in Country Info |
| # A regression test for https://github.com/secdev/scapy/pull/2685. |
| # https://github.com/secdev/scapy/issues/4132 describes a packet with |
| # a Country Information element tag that has an odd length, even though it's against the standard. |
| # The transmitter should have added a padding byte to make the length even, but it didn't. |
| # The effect on scapy used to be improper parsing of the next tag elements, causing the SSID to be overridden. |
| # This test checks the SSID is parsed properly. |
| from io import BytesIO |
| pcapfile = BytesIO(b'\n\r\r\n\x80\x00\x00\x00M<+\x1a\x01\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff\x03\x00\x10\x00Linux 6.1.21-v8+\x04\x00E\x00Dumpcap (Wireshark) 3.4.10 (Git v3.4.10 packaged as 3.4.10-0+deb11u1)\x00\x00\x00\x00\x00\x00\x00\x80\x00\x00\x00\x01\x00\x00\x00@\x00\x00\x00\x7f\x00\x00\x00\x00\x04\x00\x00\x02\x00\x05\x00wifi2\x00\x00\x00\t\x00\x01\x00\t\x00\x00\x00\x0c\x00\x10\x00Linux 6.1.21-v8+\x00\x00\x00\x00@\x00\x00\x00\x06\x00\x00\x00\xb0\x01\x00\x00\x00\x00\x00\x00c\xd3\x87\x17\xe3c5\x82\x90\x01\x00\x00\x90\x01\x00\x00\x00\x00 \x00\xae@\x00\xa0 \x08\x00\xa0 \x08\x00\x00\x10\x0cd\x14@\x01\xa9\x00\x0c\x00\x00\x00\xa6\x00\xa8\x01\x80\x00\x00\x00\xff\xff\xff\xff\xff\xff\x02\xbf\xaf\x9f\xf8\x07\x02\xbf\xaf\x9f\xf8\x070\x96[p\xdcM\x06\x00\x00\x00d\x00\x11\x00\x00\x00\x01\x08\x8c\x12\x98$\xb0H`l\x03\x01,\x05\x04\x00\x01\x00\x00\x07QUS \x01\r\x80$\x01\x80(\x01\x80,\x01\x800\x01\x804\x01\x808\x01\x80<\x01\x80@\x01\x80d\x01\x80h\x01\x80l\x01\x80p\x01\x80t\x01\x80x\x01\x80|\x01\x80\x80\x01\x80\x84\x01\x80\x88\x01\x80\x8c\x01\x80\x90\x01\x80\x95\x01\x80\x99\x01\x80\x9d\x01\x80\xa1\x01\x80\xa5\x01\x800\x14\x01\x00\x00\x0f\xac\x04\x01\x00\x00\x0f\xac\x04\x01\x00\x00\x0f\xac\x02\x0c\x00;\x02s\x00-\x1a,\t\x13\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00,\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00=\x16,\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xdd\x18\x00P\xf2\x02\x01\x01\x81\x00\x03\xa4\x00\x00\'\xa4\x00\x00BC]\x00a\x11.\x00\xdd;\x00P\xf2\x04\x10J\x00\x01\x10\x10D\x00\x01\x02\x10I\x00\x06\x007*\x00\x01 \x10\x11\x00\x1358" Hisense Roku TV\x10T\x00\x08\x00\x07\x00P\xf2\x04\x00\x01\xdd\x16\xc8:k\x01\x01\x1048<@dhlptx|\x80\x84\x88\x8c\x90\xdd\x12Po\x9a\t\x02\x02\x00!\x0b\x03\x06\x00\x02\xbf\xaf\x9f\xf8\x07\xdd\rPo\x9a\n\x00\x00\x06\x01\x11\x1cD\x002\xf5N\xfbh\xb0\x01\x00\x00') |
| pktpcap = rdpcap(pcapfile) |
| frame = pktpcap[0] |
| beacon = frame.getlayer(4) |
| stats = beacon.network_stats() |
| ssid = stats['ssid'] |
| assert ssid == "" |
| country = stats['country'] |
| assert country == 'US' |
| |
| ############ |
| ############ |
| + Network tests |
| |
| * Those tests need network access |
| |
| = Sending and receiving an ICMP |
| ~ netaccess needs_root IP ICMP icmp_firewall |
| def _test(): |
| with no_debug_dissector(): |
| x = sr1(IP(dst="www.google.com")/ICMP(),timeout=3) |
| x |
| assert x[IP].ottl() in [32, 64, 128, 255] |
| assert 0 <= x[IP].hops() <= 126 |
| x is not None and ICMP in x and x[ICMP].type == 0 |
| |
| retry_test(_test) |
| |
| = Sending a TCP syn message at layer 2 and layer 3 |
| ~ netaccess needs_root IP |
| def _test(): |
| tmp = send(IP(dst="8.8.8.8")/TCP(sport=RandShort(), dport=53, flags="S"), return_packets=True, realtime=True) |
| assert len(tmp) == 1 |
| |
| tmp = sendp(Ether()/IP(dst="8.8.8.8")/TCP(sport=RandShort(), dport=53, flags="S"), return_packets=True, realtime=True) |
| assert len(tmp) == 1 |
| |
| p = Ether()/IP(dst="8.8.8.8")/TCP(sport=RandShort(), dport=53, flags="S") |
| from decimal import Decimal |
| p.time = Decimal(p.time) |
| tmp = sendp(p, return_packets=True, realtime=True) |
| assert len(tmp) == 1 |
| |
| retry_test(_test) |
| |
| = Latency check: localhost ICMP |
| ~ netaccess needs_root linux latency |
| |
| # Note: still needs to enforce L3RawSocket as this won't work otherwise with libpcap |
| sock = conf.L3socket |
| conf.L3socket = L3RawSocket |
| |
| def _test(): |
| req = IP(dst="127.0.0.1")/ICMP() |
| ans = sr1(req, timeout=3) |
| assert (ans.time - req.sent_time) >= 0 |
| assert (ans.time - req.sent_time) <= 1e-3 |
| |
| try: |
| retry_test(_test) |
| finally: |
| conf.L3socket = sock |
| |
| = Test sniffing on multiple sockets |
| ~ netaccess needs_root sniff |
| |
| # This test sniffs on the same interface twice at the same time, to |
| # simulate sniffing on multiple interfaces. |
| |
| def _test(): |
| iface = conf.route.route(str(Net("www.google.com")))[0] |
| port = int(RandShort()) |
| pkt = IP(dst="www.google.com")/TCP(sport=port, dport=80, flags="S") |
| def cb(): |
| sr1(pkt, timeout=3) |
| sniffer = AsyncSniffer(started_callback=cb, |
| iface=[iface, iface], |
| lfilter=lambda x: TCP in x and x[TCP].dport == port, |
| prn=lambda x: x.summary(), |
| count=2) |
| sniffer.start() |
| sniffer.join(timeout=3) |
| assert len(sniffer.results) == 2 |
| for pkt in sniffer.results: |
| assert pkt.sniffed_on == iface |
| |
| retry_test(_test) |
| |
| = Sending a TCP syn 'forever' at layer 2 and layer 3 |
| ~ netaccess needs_root IP |
| def _test(): |
| tmp = srloop(IP(dst="8.8.8.8")/TCP(sport=RandShort(), dport=53, flags="S"), count=1, timeout=3) |
| assert type(tmp) == tuple and len(tmp[0]) == 1 |
| |
| tmp = srploop(Ether()/IP(dst="8.8.8.8")/TCP(sport=RandShort(), dport=53, flags="S"), count=1, timeout=3) |
| assert type(tmp) == tuple and len(tmp[0]) == 1 |
| |
| retry_test(_test) |
| |
| = Sending and receiving an TCP syn with flooding methods |
| ~ netaccess needs_root IP flood |
| from functools import partial |
| # flooding methods do not support timeout. Packing the test for security |
| def _test_flood(ip, flood_function, add_ether=False): |
| with no_debug_dissector(): |
| p = IP(dst=ip)/TCP(sport=RandShort(), dport=80, flags="S") |
| if add_ether: |
| p = Ether()/p |
| p.show2() |
| x = flood_function(p, timeout=0.5, maxretries=10) |
| if type(x) == tuple: |
| x = x[0][0][1] |
| x |
| assert x[IP].ottl() in [32, 64, 128, 255] |
| assert 0 <= x[IP].hops() <= 126 |
| |
| _test_srflood = partial(_test_flood, "www.google.com", srflood) |
| retry_test(_test_srflood) |
| |
| _test_sr1flood = partial(_test_flood, "www.google.fr", sr1flood) |
| retry_test(_test_sr1flood) |
| |
| _test_srpflood = partial(_test_flood, "www.google.net", srpflood, True) |
| retry_test(_test_srpflood) |
| |
| _test_srp1flood = partial(_test_flood, "www.google.co.uk", srp1flood, True) |
| retry_test(_test_srp1flood) |
| |
| = test chainEX |
| ~ netaccess |
| |
| import socket |
| sck = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| ssck = StreamSocket(sck) |
| |
| try: |
| r = ssck.sr1(ICMP(type='echo-request'), timeout=0.1, chainEX=True) |
| assert False |
| except Exception: |
| assert True |
| finally: |
| sck.close() |
| |
| = Sending and receiving an ICMPv6EchoRequest |
| ~ netaccess ipv6 |
| def _test(): |
| with no_debug_dissector(): |
| x = sr1(IPv6(dst="www.google.com")/ICMPv6EchoRequest(),timeout=3) |
| x |
| assert x[IPv6].ottl() in [32, 64, 128, 255] |
| assert 0 <= x[IPv6].hops() <= 126 |
| x is not None and ICMPv6EchoReply in x and x[ICMPv6EchoReply].type == 129 |
| |
| retry_test(_test) |
| |
| = Whois request |
| ~ netaccess IP |
| * This test retries on failure because it often fails |
| def _test(): |
| IP(src="8.8.8.8").whois() |
| |
| retry_test(_test) |
| |
| = AS resolvers |
| ~ netaccess IP as_resolvers |
| * This test retries on failure because it often fails |
| |
| def _test(): |
| ret = conf.AS_resolver.resolve("8.8.8.8", "8.8.4.4") |
| assert (len(ret) == 2) |
| assert any(x[1] == "AS15169" for x in ret) |
| |
| retry_test(_test) |
| |
| riswhois_data = b"route: 8.8.8.0/24\ndescr: Google\norigin: AS15169\nnotify: radb-contact@google.com\nmnt-by: MAINT-AS15169\nchanged: radb-contact@google.com 20150728\nsource: RADB\n\nroute: 8.0.0.0/9\ndescr: Proxy-registered route object\norigin: AS3356\nremarks: auto-generated route object\nremarks: this next line gives the robot something to recognize\nremarks: L'enfer, c'est les autres\nremarks: \nremarks: This route object is for a Level 3 customer route\nremarks: which is being exported under this origin AS.\nremarks: \nremarks: This route object was created because no existing\nremarks: route object with the same origin was found, and\nremarks: since some Level 3 peers filter based on these objects\nremarks: this route may be rejected if this object is not created.\nremarks: \nremarks: Please contact routing@Level3.net if you have any\nremarks: questions regarding this object.\nmnt-by: LEVEL3-MNT\nchanged: roy@Level3.net 20060203\nsource: LEVEL3\n\n\n" |
| |
| ret = AS_resolver_riswhois()._parse_whois(riswhois_data) |
| assert ret == ('AS15169', 'Google') |
| |
| retry_test(_test) |
| |
| # This test is too buggy, and is simulated below |
| #def _test(): |
| # ret = AS_resolver_cymru().resolve("8.8.8.8") |
| # assert (len(ret) == 1) |
| # all(x[1] == "AS15169" for x in ret) |
| # |
| #retry_test(_test) |
| |
| cymru_bulk_data = """ |
| Bulk mode; whois.cymru.com [2017-10-03 08:38:08 +0000] |
| 24776 | 217.25.178.5 | INFOCLIP-AS, FR |
| 36459 | 192.30.253.112 | GITHUB - GitHub, Inc., US |
| 26496 | 68.178.213.61 | AS-26496-GO-DADDY-COM-LLC - GoDaddy.com, LLC, US |
| """ |
| tmp = AS_resolver_cymru().parse(cymru_bulk_data) |
| assert len(tmp) == 3 |
| assert [l[1] for l in tmp] == ['AS24776', 'AS36459', 'AS26496'] |
| |
| = AS resolver - IPv6 |
| ~ netaccess IP |
| * This test retries on failure because it often fails |
| |
| # Resolve two IPv6 addresses through AS_resolver6: two entries are expected, |
| # at least one of them with the integer 15169 (not an "AS..." string) in its |
| # second field. |
| def _test(): |
| as_resolver6 = AS_resolver6() |
| ret = as_resolver6.resolve("2001:4860:4860::8888", "2001:4860:4860::4444") |
| assert (len(ret) == 2) |
| assert any(x[1] == 15169 for x in ret) |
| |
| retry_test(_test) |
| |
| = AS resolver - socket error |
| ~ IP |
| * This test checks that a failing resolver will not crash a script |
| |
| # Stub resolver whose resolve() always raises socket.error. |
| class MockAS_resolver(object): |
| def resolve(self, *ips): |
| raise socket.error |
| |
| # The multi-resolver wrapping the failing stub must return an empty result |
| # instead of letting the socket error escape. |
| asrm = AS_resolver_multi(MockAS_resolver()) |
| assert len(asrm.resolve(["8.8.8.8", "8.8.4.4"])) == 0 |
| |
| = sendpfast |
| ~ tcpreplay |
| |
| # Force non-interactive mode for the duration of the call; the previous |
| # value is put back once the try statement is done. |
| old_interactive = conf.interactive |
| conf.interactive = False |
| try: |
| sendpfast([]) |
| # NOTE(review): AssertionError is a subclass of Exception, so this assert is |
| # swallowed by the handler below and the test passes whether or not |
| # sendpfast([]) raises. Whether it is expected to raise in every setup is |
| # not visible from here -- confirm before switching to try/except/else. |
| assert False |
| except Exception: |
| assert True |
| |
| conf.interactive = old_interactive |
| assert True |
| |
| ############ |
| ############ |
| + Generator tests |
| |
| = Implicit logic 1 |
| ~ IP TCP |
| a = Ether() / IP(ttl=(5, 10)) / TCP(dport=[80, 443]) |
| ls(a) |
| ls(a, verbose=True) |
| l = [p for p in a] |
| len(l) == 12 |
| |
| = Implicit logic 2 |
| ~ IP |
| a = IP(ttl=[1,2,(5,9)]) |
| ls(a) |
| ls(a, verbose=True) |
| l = [p for p in a] |
| len(l) == 7 |
| |
| = Implicit logic 3 |
| # In case there's a single option: __iter__ should not return self |
| a = Ether()/IP(src="127.0.0.1", dst="127.0.0.1")/ICMP() |
| for i in a: |
| i.sent_time = 1 |
| |
| assert a.sent_time is None |
| |
| # In case they are several, self should never be returned |
| a = Ether()/IP(src="127.0.0.1", dst="127.0.0.1")/ICMP(seq=(0, 5)) |
| for i in a: |
| i.sent_time = 1 |
| |
| assert a.sent_time is None |
| |
| |
| ############ |
| ############ |
| + Real usages |
| |
| = Port scan |
| ~ netaccess needs_root IP TCP |
| # Send TCP probes to ports 80 and 443 of www.google.com/30 and tabulate the |
| # answers by destination and port (TCP flags or ICMP code). Nothing is |
| # asserted: the test only checks that the calls complete. |
| def _test(): |
| with no_debug_dissector(): |
| ans,unans=sr(IP(dst="www.google.com/30")/TCP(dport=[80,443]), timeout=2) |
| |
| # New format: all Python versions |
| ans.make_table(lambda s, r: (s.dst, s.dport, r.sprintf("{TCP:%TCP.flags%}{ICMP:%ICMP.code%}"))) |
| |
| retry_test(_test) |
| |
| = Send & receive with debug_match |
| ~ netaccess needs_root IP ICMP |
| def _test(): |
| old_debug_match = conf.debug_match |
| conf.debug_match = True |
| with no_debug_dissector(): |
| ans, unans = sr(IP(dst="www.google.fr") / TCP(sport=RandShort(), dport=80, flags="S"), timeout=2) |
| assert ans[0].query == ans[0][0] |
| assert ans[0].answer == ans[0][1] |
| conf.debug_match = old_debug_match |
| assert ans and not unans |
| |
| retry_test(_test) |
| |
| = Send & receive with retry |
| ~ netaccess needs_root IP ICMP |
| # Probe two destinations with retry=1: exactly one answered and one |
| # unanswered packet are expected (presumably 8.8.8.8 answers and 1.2.3.4 |
| # does not -- depends on the network). |
| def _test(): |
| with no_debug_dissector(): |
| ans, unans = sr(IP(dst=["8.8.8.8", "1.2.3.4"]) / TCP(sport=RandShort(), dport=53, flags="S"), timeout=2, retry=1) |
| assert len(ans) == 1 and len(unans) == 1 |
| |
| retry_test(_test) |
| |
| = Send & receive with multi |
| ~ netaccess needs_root IP ICMP |
| # Same two destinations with multi=1: at least one answer is accepted, and |
| # exactly one packet must stay unanswered. |
| def _test(): |
| with no_debug_dissector(): |
| ans, unans = sr(IP(dst=["8.8.8.8", "1.2.3.4"]) / TCP(sport=RandShort(), dport=53, flags="S"), timeout=2, multi=1) |
| assert len(ans) >= 1 and len(unans) == 1 |
| |
| retry_test(_test) |
| |
| = Traceroute function |
| ~ netaccess needs_root tcpdump |
| * Let's test traceroute |
| # Run a traceroute, print its summary and display the first sent packet |
| # twice (show() and show(2)). Nothing is asserted: the test fails only if |
| # one of the calls raises, e.g. ans[0] when there is no answer at all. |
| def _test(): |
| ans, unans = traceroute("www.slashdot.org") |
| ans.nsummary() |
| s,r=ans[0] |
| s.show() |
| s.show(2) |
| |
| retry_test(_test) |
| |
| = send() and sniff() |
| ~ netaccess needs_root |
| |
| # Despite its name, _test is not run through retry_test here: it is handed |
| # to sniff() as started_callback (presumably invoked once the sniffer is |
| # running) and sends three marker packets with source 9.0.0.0. |
| def _test(): |
| sendp(Ether()/IP(src="9.0.0.0")/UDP(), count=3, iface=conf.iface) |
| |
| # Capture at most one packet matching the marker source address. |
| r = sniff(timeout=3, count=1, |
| lfilter=lambda x: IP in x and x[IP].src == "9.0.0.0", |
| iface=conf.iface, |
| started_callback=_test) |
| |
| # An empty capture means the marker packet was never seen. |
| assert r |
| |
| = sniff() with socket failure |
| * GH issue 3631 |
| |
| REFPACKET = Ether()/IP()/UDP() |
| |
| # A socket that fails after 10 packets |
| class OOPipe(ObjectPipe): |
| def recv(self, x=MTU): |
| self.i = getattr(self, "i", 0) + 1 |
| if self.i == 11: |
| self.close() |
| raise OSError("Giant failure") |
| pkt = super(OOPipe, self).recv(x) |
| self.send(REFPACKET) |
| return pkt |
| |
| o = OOPipe() |
| o.send(REFPACKET) |
| |
| pkts = sniff(opened_socket=[o], timeout=3) |
| assert len(pkts) == 10 |
| |
| = GH issue 3306 |
| ~ netaccess needs_root |
| |
| send(fuzz(ARP())) |
| |
| = Test SuperSocket.select |
| ~ select |
| |
| import mock |
| |
| @mock.patch("scapy.supersocket.select") |
| def _test_select(select): |
| def f(a, b, c, d): |
| raise IOError(0) |
| select.side_effect = f |
| try: |
| SuperSocket.select([]) |
| return False |
| except: |
| return True |
| |
| assert _test_select() |
| |
| = Test L2ListenTcpdump socket |
| ~ netaccess |
| |
| # Needs to be fixed. Fails randomly |
| #import time |
| #for i in range(10): |
| # read_s = L2ListenTcpdump(iface=conf.iface) |
| # out_s = conf.L2socket(iface=conf.iface) |
| # time.sleep(5) # wait for read_s to be ready |
| # icmp_r = Ether()/IP(dst="secdev.org")/ICMP() |
| # res = sndrcv(out_s, icmp_r, timeout=5, rcv_pks=read_s)[0] |
| # read_s.close() |
| # out_s.close() |
| # time.sleep(5) |
| # if res: |
| # break |
| # |
| #response = res[0][1] |
| #assert response[ICMP].type == 0 |
| |
| True |
| |
| = Test set of sent_time by sr |
| ~ netaccess needs_root IP ICMP |
| # After sr(), sent_time must be set on the packet object that was passed |
| # in; the return value r is not inspected. |
| def _test(): |
| packet = IP(dst="8.8.8.8")/ICMP() |
| r = sr(packet, timeout=2) |
| assert packet.sent_time is not None |
| |
| retry_test(_test) |
| |
| = Test set of sent_time by sr (multiple packets) |
| ~ netaccess needs_root IP ICMP |
| # Same check with a list of two packets: sr() must set sent_time on each |
| # of the original packet objects. |
| def _test(): |
| packet1 = IP(dst="8.8.8.8")/ICMP() |
| packet2 = IP(dst="8.8.4.4")/ICMP() |
| r = sr([packet1, packet2], timeout=2) |
| assert packet1.sent_time is not None |
| assert packet2.sent_time is not None |
| |
| retry_test(_test) |
| |
| = Test set of sent_time by srflood |
| ~ netaccess needs_root IP ICMP |
| # After srflood(), sent_time must be set on the packet object that was |
| # passed in; the return value r is not inspected. |
| def _test(): |
| packet = IP(dst="8.8.8.8")/ICMP() |
| r = srflood(packet, timeout=2) |
| assert packet.sent_time is not None |
| |
| retry_test(_test) |
| |
| = Test set of sent_time by srflood (multiple packets) |
| ~ netaccess needs_root IP ICMP |
| # Same check with a list of two packets: srflood() must set sent_time on |
| # each of the original packet objects. |
| def _test(): |
| packet1 = IP(dst="8.8.8.8")/ICMP() |
| packet2 = IP(dst="8.8.4.4")/ICMP() |
| r = srflood([packet1, packet2], timeout=2) |
| assert packet1.sent_time is not None |
| assert packet2.sent_time is not None |
| |
| retry_test(_test) |
| |
| ############ |
| ############ |
| + ManuFDB tests |
| |
| = __repr__ |
| |
| if conf.manufdb: |
| len(conf.manufdb) |
| else: |
| True |
| |
| = check _resolve_MAC |
| |
| if conf.manufdb: |
| assert conf.manufdb._resolve_MAC("00:00:17") == "Oracle" |
| else: |
| True |
| |
| |
| ############ |
| ############ |
| + Ether tests with IPv6 |
| |
| = Ether IPv6 checking for dst |
| ~ netaccess ipv6 |
| |
| p = Ether()/IPv6(dst="www.google.com")/TCP() |
| assert p.dst != p[IPv6].dst |
| p.show() |
| |
| |
| ############ |
| ############ |
| + pcap / pcapng format support |
| |
| = Variable creations |
| from io import BytesIO |
| pcapfile = BytesIO(b'\xd4\xc3\xb2\xa1\x02\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00\x00e\x00\x00\x00\xcf\xc5\xacVo*\n\x00(\x00\x00\x00(\x00\x00\x00E\x00\x00(\x00\x01\x00\x00@\x06|\xcd\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x14\x00P\x00\x00\x00\x00\x00\x00\x00\x00P\x02 \x00\x91|\x00\x00\xcf\xc5\xacV_-\n\x00\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x11|\xce\x7f\x00\x00\x01\x7f\x00\x00\x01\x005\x005\x00\x08\x01r\xcf\xc5\xacV\xf90\n\x00\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x01|\xde\x7f\x00\x00\x01\x7f\x00\x00\x01\x08\x00\xf7\xff\x00\x00\x00\x00') |
| pcapngfile = BytesIO(b'\n\r\r\n\\\x00\x00\x00M<+\x1a\x01\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff\x01\x00,\x00File created by merging: \nFile1: test.pcap \n\x04\x00\x08\x00mergecap\x00\x00\x00\x00\\\x00\x00\x00\x01\x00\x00\x00\\\x00\x00\x00e\x00\x00\x00\xff\xff\x00\x00\x02\x006\x00Unknown/not available in original file format(libpcap)\x00\x00\t\x00\x01\x00\x06\x00\x00\x00\x00\x00\x00\x00\\\x00\x00\x00\x06\x00\x00\x00H\x00\x00\x00\x00\x00\x00\x00\x8d*\x05\x00/\xfc[\xcd(\x00\x00\x00(\x00\x00\x00E\x00\x00(\x00\x01\x00\x00@\x06|\xcd\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x14\x00P\x00\x00\x00\x00\x00\x00\x00\x00P\x02 \x00\x91|\x00\x00H\x00\x00\x00\x06\x00\x00\x00<\x00\x00\x00\x00\x00\x00\x00\x8d*\x05\x00\x1f\xff[\xcd\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x11|\xce\x7f\x00\x00\x01\x7f\x00\x00\x01\x005\x005\x00\x08\x01r<\x00\x00\x00\x06\x00\x00\x00<\x00\x00\x00\x00\x00\x00\x00\x8d*\x05\x00\xb9\x02\\\xcd\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x01|\xde\x7f\x00\x00\x01\x7f\x00\x00\x01\x08\x00\xf7\xff\x00\x00\x00\x00<\x00\x00\x00') |
| pcapnanofile = BytesIO(b"M<\xb2\xa1\x02\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00\x00e\x00\x00\x00\xcf\xc5\xacV\xc9\xc1\xb5'(\x00\x00\x00(\x00\x00\x00E\x00\x00(\x00\x01\x00\x00@\x06|\xcd\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x14\x00P\x00\x00\x00\x00\x00\x00\x00\x00P\x02 \x00\x91|\x00\x00\xcf\xc5\xacV-;\xc1'\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x11|\xce\x7f\x00\x00\x01\x7f\x00\x00\x01\x005\x005\x00\x08\x01r\xcf\xc5\xacV\x9aL\xcf'\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x01|\xde\x7f\x00\x00\x01\x7f\x00\x00\x01\x08\x00\xf7\xff\x00\x00\x00\x00") |
| pcapwirelenfile = BytesIO(b'\xd4\xc3\xb2\xa1\x02\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00\x00\x01\x00\x00\x00}\x87pZ.\xa2\x08\x00\x0f\x00\x00\x00\x10\x00\x00\x00\xff\xff\xff\xff\xff\xff GG\xee\xdd\xa8\x90\x00a') |
| pcapngdefaults = BytesIO(base64_bytes(b'Cg0NChwAAABNPCsaAQAAAP//////////HAAAAAEAAAAgAAAAEgEAAP//AAAJAAEACUeZiQAAAAAgAAAAAQAAACAAAAASAQAA//8AAAkAAQAJAAAAAAAAACAAAAABAAAAIAAAABIBAAD//wAACQABAAkAAAAAAAAAIAAAAAEAAAAgAAAAEgEAAP//AAAJAAEACQAAAAAAAAAgAAAABgAAAIQBAAADAAAApO/bFdgJaeBiAQAAYgEAAFVVVVVVVVXV////////IMbr4D7PCABFAAFIlQkAAEAR5JwAAAAA/////wBEAEMBNJDsAQEGAFSpVwIACoAAAAAAAAAAAAAAAAAAAAAAACDG6+A+zwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABjglNjNQEB/wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAsOs+bAAAhAEAAAYAAACAAQAAAwAAAKTv2xXIDYznYAEAAGABAABVVVVVVVVV1QEAXn//+iDG6+A+zwgARQABRgGPAAAEEal3qf5wqO////rhbgdsATJi0U5PVElGWSAqIEhUVFAvMS4xDQpIT1NUOiAyMzkuMjU1LjI1NS4yNTA6MTkwMA0KQ0FDSEUtQ09OVFJPTDogbWF4LWFnZT0xODAwDQpMT0NBVElPTjogaHR0cDovLzE2OS4yNTQuMTEyLjE2ODo1NTAwMC9ucmMvZGRkLnhtbA0KTlQ6IHV1aWQ6NEQ0NTQ5MzAtMDIwMC0xMDAwLTgwMDEtMjBDNkVCRTAzRUNGDQpOVFM6IHNzZHA6YWxpdmUNClNFUlZFUjogRnJlZUJTRC84LjAgVVBuUC8xLjAgUGFuYXNvbmljLU1JTC1ETE5BLVNWLzEuMA0KVVNOOiB1dWlkOjRENDU0OTMwLTAyMDAtMTAwMC04MDAxLTIwQzZFQkUwM0VDRg0KDQpcQcvWgAEAAAYAAAC4AQAAAwAAAKTv2xV4Ao3nlQEAAJUBAABVVVVVVVVV1QEAXn//+iDG6+A+zwgARQABewGQAAAEEalBqf5wqO////rhbgdsAWfu+k5PVElGWSAqIEhUVFAvMS4xDQpIT1NUOiAyMzkuMjU1LjI1NS4yNTA6MTkwMA0KQ0FDSEUtQ09OVFJPTDogbWF4LWFnZT0xODAwDQpMT0NBVElPTjogaHR0cDovLzE2OS4yNTQuMTEyLjE2ODo1NTAwMC9ucmMvZGRkLnhtbA0KTlQ6IHVybjpwYW5hc29uaWMtY29tOmRldmljZTpwMDBSZW1vdGVDb250cm9sbGVyOjENCk5UUzogc3NkcDphbGl2ZQ0KU0VSVkVSOiBGcmVlQlNELzguMCBVUG5QLzEuMCBQYW5hc29uaWMtTUlMLURMTkEtU1YvMS4wDQpVU046IHV1aWQ6NEQ0NTQ5MzAtMDIwMC0xMDAwLTgwMDEtMjBDNkVCRTAzRUNGOjp1cm46cGFuYXNvbmljLWNvbTpkZXZpY2U6cDAwUmVtb3RlQ29udHJvbGxlcjoxDQoNCrLVKmoAAAC4AQAABgAAAHgBAAADAAAApO/bFVjbjedXAQAAVwEAAFVVVVVVVVXVAQBef//6IMbr4D7PCABFAAE9AZEAAAQRqX6p/nCo7///+uFuB2wBKaZATk9USUZZICogSFRUUC8xLjENCkhPU1
Q6IDIzOS4yNTUuMjU1LjI1MDoxOTAwDQpDQUNIRS1DT05UUk9MOiBtYXgtYWdlPTE4MDANCkxPQ0FUSU9OOiBodHRwOi8vMTY5LjI1NC4xMTIuMTY4OjU1MDAwL25yYy9kZGQueG1sDQpOVDogdXBucDpyb290ZGV2aWNlDQpOVFM6IHNzZHA6YWxpdmUNClNFUlZFUjogRnJlZUJTRC84LjAgVVBuUC8xLjAgUGFuYXNvbmljLU1JTC1ETE5BLVNWLzEuMA0KVVNOOiB1dWlkOjRENDU0OTMwLTAyMDAtMTAwMC04MDAxLTIwQzZFQkUwM0VDRjo6dXBucDpyb290ZGV2aWNlDQoNCjagXoUAeAEAAAYAAAC0AQAAAwAAAKTv2xXYw47nkwEAAJMBAABVVVVVVVVV1QEAXn//+iDG6+A+zwgARQABeQGSAAAEEalBqf5wqO////rhbgdsAWWV4E5PVElGWSAqIEhUVFAvMS4xDQpIT1NUOiAyMzkuMjU1LjI1NS4yNTA6MTkwMA0KQ0FDSEUtQ09OVFJPTDogbWF4LWFnZT0xODAwDQpMT0NBVElPTjogaHR0cDovLzE2OS4yNTQuMTEyLjE2ODo1NTAwMC9ucmMvZGRkLnhtbA0KTlQ6IHVybjpwYW5hc29uaWMtY29tOnNlcnZpY2U6cDAwTmV0d29ya0NvbnRyb2w6MQ0KTlRTOiBzc2RwOmFsaXZlDQpTRVJWRVI6IEZyZWVCU0QvOC4wIFVQblAvMS4wIFBhbmFzb25pYy1NSUwtRExOQS1TVi8xLjANClVTTjogdXVpZDo0RDQ1NDkzMC0wMjAwLTEwMDAtODAwMS0yMEM2RUJFMDNFQ0Y6OnVybjpwYW5hc29uaWMtY29tOnNlcnZpY2U6cDAwTmV0d29ya0NvbnRyb2w6MQ0KDQovXKFrALQBAAAGAAAAqAEAAAMAAACk79sVuJKP54cBAACHAQAAVVVVVVVVVdUBAF5///ogxuvgPs8IAEUAAW0BkwAABBGpTKn+cKjv///64W4HbAFZRNJOT1RJRlkgKiBIVFRQLzEuMQ0KSE9TVDogMjM5LjI1NS4yNTUuMjUwOjE5MDANCkNBQ0hFLUNPTlRST0w6IG1heC1hZ2U9MTgwMA0KTE9DQVRJT046IGh0dHA6Ly8xNjkuMjU0LjExMi4xNjg6NTUwMDAvbnJjL2RkZC54bWwNCk5UOiB1cm46ZGlhbC1tdWx0aXNjcmVlbi1vcmc6c2VydmljZTpkaWFsOjENCk5UUzogc3NkcDphbGl2ZQ0KU0VSVkVSOiBGcmVlQlNELzguMCBVUG5QLzEuMCBQYW5hc29uaWMtTUlMLURMTkEtU1YvMS4wDQpVU046IHV1aWQ6NEQ0NTQ5MzAtMDIwMC0xMDAwLTgwMDEtMjBDNkVCRTAzRUNGOjp1cm46ZGlhbC1tdWx0aXNjcmVlbi1vcmc6c2VydmljZTpkaWFsOjENCg0KLn5A6QCoAQAA')) |
| |
| = Read a pcap file |
| pktpcap = rdpcap(pcapfile) |
| |
| = Read a pcapng file |
| pktpcapng = rdpcap(pcapngfile) |
| assert pktpcapng[0].time == 1454163407.666223 |
| |
| = Read a pcap file with nanosecond precision |
| pktpcapnano = rdpcap(pcapnanofile) |
| assert pktpcapnano[0].time == 1454163407.666223049 |
| |
| = Read a pcapng file with nanosecond precision and default tsresol |
| pktpcapngdefaults = rdpcap(pcapngdefaults) |
| assert pktpcapngdefaults[0].time == 1575115986.114775512 |
| assert Ether in pktpcapngdefaults[0] |
| |
| = Read a pcapng with little-endian SHB |
| pktcapng = sniff(offline=scapy_path("/test/pcaps/macos.pcapng.gz")) |
| assert len(pktcapng) != 0 |
| |
| = Write a pcapng |
| |
| tmpfile = get_temp_file(autoext=".pcapng") |
| r = RawPcapNgWriter(tmpfile) |
| r._write_block_shb() |
| r._write_block_idb() |
| ts = 1632568366.384185 |
| r._write_block_epb(raw(Ether()/"Hello Scapy!!!"), ts) |
| r.f.close() |
| |
| assert os.stat(tmpfile).st_size == 108 |
| |
| l = rdpcap(tmpfile) |
| assert b"Scapy" in l[0][Raw].load |
| assert l[0].time == ts |
| |
| = Check wrpcapng() |
| |
| tmpfile = get_temp_file(autoext=".pcapng") |
| p = Ether()/"Hello Scapy!!!" |
| p.time = 1632568366.384185 |
| wrpcapng(tmpfile, p) |
| |
| assert os.stat(tmpfile).st_size == 108 |
| |
| l = rdpcap(tmpfile) |
| assert b"Scapy" in l[0][Raw].load |
| assert l[0].time == ts |
| |
| p = Ether() / IPv6() / TCP() |
| p.comment = b"Hello Scapy!" |
| wrpcapng(tmpfile, p) |
| l = rdpcap(tmpfile) |
| assert l[0].comment == p.comment |
| |
| = Read a pcap file with wirelen != captured len |
| pktpcapwirelen = rdpcap(pcapwirelenfile) |
| |
| = Check all packet lists are the same |
| assert list(pktpcap) == list(pktpcapng) == list(pktpcapnano) |
| assert [float(p.time) for p in pktpcap] == [float(p.time) for p in pktpcapng] == [float(p.time) for p in pktpcapnano] |
| |
| = Check packets from pcap file |
| assert all(IP in pkt for pkt in pktpcap) |
| assert all(any(proto in pkt for pkt in pktpcap) for proto in [ICMP, UDP, TCP]) |
| |
| = Check wirelen value from pcap file |
| assert len(pktpcapwirelen) == 1 |
| assert pktpcapwirelen[0].wirelen is not None |
| assert len(pktpcapwirelen[0]) < pktpcapwirelen[0].wirelen |
| |
| = Check wrpcap() then rdpcap() with wirelen |
| import os, tempfile |
| fdesc, filename = tempfile.mkstemp() |
| fdesc = os.fdopen(fdesc, "wb") |
| wrpcap(fdesc, pktpcapwirelen) |
| fdesc.close() |
| newpktpcapwirelen = rdpcap(filename) |
| assert len(newpktpcapwirelen) == 1 |
| assert newpktpcapwirelen[0].wirelen is not None |
| assert len(newpktpcapwirelen[0]) < newpktpcapwirelen[0].wirelen |
| assert newpktpcapwirelen[0].wirelen == pktpcapwirelen[0].wirelen |
| |
| = Check wrpcap() then rdpcap() with sent_time on SndRcvList |
| f = get_temp_file() |
| s = Ether()/IP() |
| r = Ether()/IP() |
| s.sent_time = 1 |
| r.time = 2 |
| wrpcap(f, SndRcvList([(s, r)])) |
| pcap = rdpcap(f) |
| assert pcap[0].time == 1 |
| assert pcap[1].time == 2 |
| |
| = Check wrpcap() |
| fdesc, filename = tempfile.mkstemp() |
| fdesc = os.fdopen(fdesc, "wb") |
| wrpcap(fdesc, pktpcap) |
| fdesc.close() |
| |
| = Check offline sniff() (by PacketList) |
| l=sniff(offline=PacketList([IP()/TCP(),IP()/TCP()])) |
| assert len(l) == 2 |
| assert all(TCP in p for p in l) |
| |
| = Check offline sniff() (by filename) |
| assert list(pktpcap) == list(sniff(offline=filename)) |
| |
| = Check offline sniff() (by file object) |
| fdesc = open(filename, "rb") |
| assert list(pktpcap) == list(sniff(offline=fdesc)) |
| fdesc.close() |
| |
| = Check offline sniff() with a filter (by filename) |
| ~ tcpdump libpcap |
| pktpcap_flt = [(proto, sniff(offline=filename, filter=proto.__name__.lower())) |
| for proto in [ICMP, UDP, TCP]] |
| assert all(list(pktpcap[proto]) == list(packets) for proto, packets in pktpcap_flt) |
| |
| = Check offline sniff() with a filter (by file object) |
| ~ tcpdump libpcap |
| fdesc = open(filename, "rb") |
| pktpcap_tcp = sniff(offline=fdesc, filter="tcp") |
| fdesc.close() |
| assert list(pktpcap[TCP]) == list(pktpcap_tcp) |
| os.unlink(filename) |
| |
| = Check offline sniff() with a PcapNg file and a filter (by file object) |
| ~ tcpdump libpcap |
| |
| pcapng_data = b'\n\r\r\n`\x00\x00\x00M<+\x1a\x01\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff\x04\x009\x00TShark (Wireshark) 3.2.3 (Git v3.2.3 packaged as 3.2.3-1)\x00\x00\x00\x00\x00\x00\x00`\x00\x00\x00\x01\x00\x00\x00\x14\x00\x00\x00\xe4\x00\x00\x00\xff\xff\x00\x00\x14\x00\x00\x00\x06\x00\x00\x00<\x00\x00\x00\x00\x00\x00\x00\x98\xcd\x05\x00\x19\x83\xf7\x9e\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x11|\xce\x7f\x00\x00\x01\x7f\x00\x00\x01\x005\x005\x00\x08\x01r<\x00\x00\x00' |
| |
| if OPENBSD: |
| # Note: OpenBSD tcpdump does not support PcapNg |
| assert True |
| else: |
| fdesc, filename = tempfile.mkstemp() |
| os.close(fdesc) |
| fd = open(filename, "wb") |
| fd.write(pcapng_data) |
| fd.close() |
| packets = sniff(offline=filename, filter="udp") |
| os.unlink(filename) |
| assert UDP in packets[0] |
| |
| = Check offline sniff() with Packets and tcpdump with a filter |
| ~ tcpdump libpcap |
| |
| l = sniff(offline=IP()/UDP(sport=(10000, 10001)), filter="udp") |
| assert len(l) == 2 |
| assert all(UDP in p for p in l) |
| |
| l = sniff(offline=[p for p in IP()/UDP(sport=(10000, 10001))], filter="udp") |
| assert len(l) == 2 |
| assert all(UDP in p for p in l) |
| |
| l = sniff(offline=IP()/UDP(sport=(10000, 10001)), filter="tcp") |
| assert len(l) == 0 |
| |
| = Check offline sniff() with Packets, tcpdump and a bad filter |
| ~ tcpdump libpcap |
| |
| try: |
| sniff(offline=IP()/UDP(), filter="bad filter") |
| except Scapy_Exception: |
| pass |
| else: |
| assert False |
| |
| = Check offline sniff with lfilter |
| assert len(sniff(offline=[IP()/UDP(), IP()/TCP()], lfilter=lambda x: TCP in x)) == 1 |
| |
| = Check offline sniff() without a tcpdump binary |
| ~ tcpdump |
| import mock |
| |
| conf_prog_tcpdump = conf.prog.tcpdump |
| conf.prog.tcpdump = "tcpdump_fake" |
| |
| def _test_sniff_notcpdump(): |
| try: |
| sniff(offline="fake.pcap", filter="tcp") |
| assert False |
| except: |
| assert True |
| |
| _test_sniff_notcpdump() |
| conf.prog.tcpdump = conf_prog_tcpdump |
| |
| = Check wrpcap(nano=True) |
| fdesc, filename = tempfile.mkstemp() |
| fdesc = os.fdopen(fdesc, "wb") |
| pktpcapnano[0].time += Decimal('1E-9') |
| wrpcap(fdesc, pktpcapnano, nano=True) |
| fdesc.close() |
| pktpcapnanoread = rdpcap(filename) |
| assert pktpcapnanoread[0].time == pktpcapnano[0].time |
| os.unlink(filename) |
| |
| = Check PcapNg with nanosecond precision using obsolete packet block |
| * first packet from capture file icmp2.ntar -- https://wiki.wireshark.org/Development/PcapNg?action=AttachFile&do=view&target=icmp2.ntar |
| pcapngfile = BytesIO(b'\n\r\r\n\x1c\x00\x00\x00M<+\x1a\x01\x00\x00\x00\xa8\x03\x00\x00\x00\x00\x00\x00\x1c\x00\x00\x00\x01\x00\x00\x00(\x00\x00\x00\x01\x00\x00\x00\xff\xff\x00\x00\r\x00\x01\x00\x04\x04K\x00\t\x00\x01\x00\tK=N\x00\x00\x00\x00(\x00\x00\x00\x02\x00\x00\x00n\x00\x00\x00\x00\x00\x00\x00e\x14\x00\x00)4\'ON\x00\x00\x00N\x00\x00\x00\x00\x12\xf0\x11h\xd6\x00\x13r\t{\xea\x08\x00E\x00\x00<\x90\xa1\x00\x00\x80\x01\x8e\xad\xc0\xa8M\x07\xc0\xa8M\x1a\x08\x00r[\x03\x00\xd8\x00abcdefghijklmnopqrstuvwabcdefghi\xeay$\xf6\x00\x00n\x00\x00\x00') |
| pktpcapng = rdpcap(pcapngfile) |
| assert len(pktpcapng) == 1 |
| pkt = pktpcapng[0] |
| # weird, but wireshark agrees |
| assert pkt.time == 22425.352221737 |
| assert isinstance(pkt, Ether) |
| pkt = pkt.payload |
| assert isinstance(pkt, IP) |
| pkt = pkt.payload |
| assert isinstance(pkt, ICMP) |
| pkt = pkt.payload |
| assert isinstance(pkt, Raw) and pkt.load == b'abcdefghijklmnopqrstuvwabcdefghi' |
| pkt = pkt.payload |
| assert isinstance(pkt, Padding) and pkt.load == b'\xeay$\xf6' |
| pkt = pkt.payload |
| assert isinstance(pkt, NoPayload) |
| |
| = Check PcapNg using Simple Packet Block |
| * previous file with the (obsolete) packet block replaced by a Simple Packet Block |
| pcapngfile = BytesIO(b'\n\r\r\n\x1c\x00\x00\x00M<+\x1a\x01\x00\x00\x00\xa8\x03\x00\x00\x00\x00\x00\x00\x1c\x00\x00\x00\x01\x00\x00\x00(\x00\x00\x00\x01\x00\x00\x00\xff\xff\x00\x00\r\x00\x01\x00\x04\x04K\x00\t\x00\x01\x00\tK=N\x00\x00\x00\x00(\x00\x00\x00\x03\x00\x00\x00`\x00\x00\x00N\x00\x00\x00\x00\x12\xf0\x11h\xd6\x00\x13r\t{\xea\x08\x00E\x00\x00<\x90\xa1\x00\x00\x80\x01\x8e\xad\xc0\xa8M\x07\xc0\xa8M\x1a\x08\x00r[\x03\x00\xd8\x00abcdefghijklmnopqrstuvwabcdefghi\xeay$\xf6\x00\x00`\x00\x00\x00') |
| pktpcapng = rdpcap(pcapngfile) |
| assert len(pktpcapng) == 1 |
| pkt = pktpcapng[0] |
| assert isinstance(pkt, Ether) |
| pkt = pkt.payload |
| assert isinstance(pkt, IP) |
| pkt = pkt.payload |
| assert isinstance(pkt, ICMP) |
| pkt = pkt.payload |
| assert isinstance(pkt, Raw) and pkt.load == b'abcdefghijklmnopqrstuvwabcdefghi' |
| pkt = pkt.payload |
| assert isinstance(pkt, Padding) and pkt.load == b'\xeay$\xf6' |
| pkt = pkt.payload |
| assert isinstance(pkt, NoPayload) |
| |
| = Invalid pcapng files |
| |
| from io import BytesIO |
| |
| # Invalid PCAPNG format -> Raise |
| try: |
| invalid_pcapngfile_1 = BytesIO(b'\n\r\r\n\r\x00\x00\x00M<+\x1a\xb2<\xb2\xa1\x01\x00\x00\x00\r\x00\x00\x00M<+\x1a\x80\xaa\xb2\x02') |
| rdpcap(invalid_pcapngfile_1) |
| assert False |
| except Scapy_Exception: |
| pass |
| |
| # Invalid Packet in PCAPNG -> return |
| invalid_pcapngfile_2 = BytesIO(b'\n\r\r\n\x00\x00\x00\x10\x1a+<M\x00\x00\x00\x10\x00\x00\x00\x01\x00\x00\x00\x10 \x00\x00\x00\x10') |
| assert len(rdpcap(invalid_pcapngfile_2)) == 0 |
| |
| # Invalid interface ID in PCAPNG -> rdpcap() is expected to raise Scapy_Exception |
| try: |
| invalid_pcapngfile_3 = BytesIO(b'\n\n\n\x14\x00\x00\x00M<+\x1a \x14\x00\x00\x00\x03\x00\x00\x00\x14\x00\x00\x00 \x14\x00\x00\x00') |
| rdpcap(invalid_pcapngfile_3) |
| assert False |
| except Scapy_Exception: |
| pass |
| |
| # Invalid SPB in PCAPNG -> rdpcap() is expected to raise Scapy_Exception |
| try: |
| invalid_pcapngfile_4 = BytesIO(b'\n\n\n\x14\x00\x00\x00M<+\x1a \x14\x00\x00\x00\x01\x00\x00\x00\x14\x00\x00\x00 \x14\x00\x00\x00\x03\x00\x00\x00\x0c\x00\x00\x00\x0c\x00\x00\x00') |
| rdpcap(invalid_pcapngfile_4) |
| assert False |
| except Scapy_Exception: |
| pass |
| |
| = Check PcapWriter on null write |
| |
| f = BytesIO() |
| w = PcapWriter(f) |
| w.write([]) |
| assert len(f.getvalue()) == 0 |
| |
| # Patch out close() so the BytesIO stays readable; closing the writer must still write the header |
| with mock.patch.object(f, 'close') as cf: |
| w.close() |
| |
| cf.assert_called_once_with() |
| assert len(f.getvalue()) != 0 |
| |
| = Check PcapWriter sets correct linktype after null write |
| |
| f = BytesIO() |
| w = PcapWriter(f) |
| w.write([]) |
| assert len(f.getvalue()) == 0 |
| w.write(Ether()/IP()/ICMP()) |
| assert len(f.getvalue()) != 0 |
| |
| # Patch out close() so the BytesIO stays readable; closing the writer must still write the header |
| with mock.patch.object(f, 'close') as cf: |
| w.close() |
| |
| cf.assert_called_once_with() |
| f.seek(0) or None |
| assert len(f.getvalue()) != 0 |
| |
| r = PcapReader(f) |
| f.seek(0) or None |
| assert r.LLcls is Ether |
| assert r.linktype == DLT_EN10MB |
| |
| l = [ p for p in RawPcapReader(f) ] |
| assert len(l) == 1 |
| |
| = Check RawPcapReader on pcap |
| ~ pcap |
| |
| fd = get_temp_file() |
| wrpcap(fd, [Ether()/IP()/ICMP()]) |
| assert len([p for p in RawPcapReader(fd)]) == 1 |
| |
| for (x, y) in RawPcapReader(fd): |
| pass |
| |
| = Check RawPcapReader with a Context Manager |
| ~ pcap |
| |
| filename = get_temp_file(fd=False) |
| wrpcap(filename, [IP()/TCP(), IP()/UDP()]) |
| |
| try: |
| with RawPcapReader(filename) as reader: |
| packet = next(reader, None) |
| assert True |
| except TypeError: |
| assert False |
| |
| = Check RawPcapWriter |
| ~ pcap |
| |
| # GH3256 |
| fd = get_temp_file() |
| with RawPcapWriter(fd, linktype=1) as w: |
| w.write(b"test") |
| |
| fd = get_temp_file() |
| with RawPcapWriter(fd) as w: |
| w.write(b"test") |
| assert w.linktype == 1 |
| |
| = Check tcpdump() |
| ~ tcpdump |
| from io import BytesIO |
| * The checks below are deliberately loose because we do not want to depend on the exact tcpdump output |
| pcapfile = BytesIO(b'\xd4\xc3\xb2\xa1\x02\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00\x00\x01\x00\x00\x000}$]\xff\\\t\x006\x00\x00\x006\x00\x00\x00\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x08\x00E\x00\x00(\x00\x01\x00\x00@\x06|\xcd\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x14\x00P\x00\x00\x00\x00\x00\x00\x00\x00P\x02 \x00\x91|\x00\x000}$]\x87i\t\x00*\x00\x00\x00*\x00\x00\x00\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x08\x00E\x00\x00\x1c\x00\x01\x00\x00@\x11|\xce\x7f\x00\x00\x01\x7f\x00\x00\x01\x005\x005\x00\x08\x01r0}$]\xfbp\t\x00*\x00\x00\x00*\x00\x00\x00\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x08\x00E\x00\x00\x1c\x00\x01\x00\x00@\x01|\xde\x7f\x00\x00\x01\x7f\x00\x00\x01\x08\x00\xf7\xff\x00\x00\x00\x00') |
| |
| data = tcpdump(pcapfile, dump=True, args=['-nn']).split(b'\n') |
| print(data) |
| assert b'127.0.0.1.20 > 127.0.0.1.80:' in data[0] |
| assert b'127.0.0.1.53 > 127.0.0.1.53:' in data[1] |
| assert b'127.0.0.1 > 127.0.0.1:' in data[2] |
| |
| * Non existing tcpdump binary |
| |
| import mock |
| |
| conf_prog_tcpdump = conf.prog.tcpdump |
| conf.prog.tcpdump = "tcpdump_fake" |
| |
| def _test_tcpdump_notcpdump(): |
| try: |
| tcpdump(IP()/TCP()) |
| assert False |
| except: |
| assert True |
| |
| _test_tcpdump_notcpdump() |
| conf.prog.tcpdump = conf_prog_tcpdump |
| |
| # Also check with use_tempfile=True (for non-OSX platforms) |
| pcapfile.seek(0) or None |
| tempfile_count = len(conf.temp_files) |
| data = tcpdump(pcapfile, dump=True, args=['-nn'], use_tempfile=True).split(b'\n') |
| print(data) |
| assert b'127.0.0.1.20 > 127.0.0.1.80:' in data[0] |
| assert b'127.0.0.1.53 > 127.0.0.1.53:' in data[1] |
| assert b'127.0.0.1 > 127.0.0.1:' in data[2] |
| # We should have another tempfile tracked. |
| assert len(conf.temp_files) > tempfile_count |
| |
| # Check with a simple packet |
| data = tcpdump([Ether()/IP()/ICMP()], dump=True, args=['-nn']).split(b'\n') |
| print(data) |
| assert b'127.0.0.1 > 127.0.0.1: ICMP' in data[0].upper() |
| |
| = Check tcpdump() command with linktype |
| ~ tcpdump libpcap |
| |
| f = BytesIO() |
| pkt = Ether()/IP()/ICMP() |
| |
| with mock.patch('subprocess.Popen', return_value=Bunch( |
| stdin=f, wait=lambda: None)) as popen: |
| # Prevent closing the BytesIO |
| with mock.patch.object(f, 'close'): |
| tcpdump([pkt], linktype="DLT_EN10MB", use_tempfile=False) |
| |
| expected_command = [conf.prog.tcpdump, '-y', 'EN10MB', '-U', '-r', '-'] |
| if OPENBSD: |
| expected_command = [conf.prog.tcpdump, '-y', 'EN10MB', '-r', '-'] |
| |
| popen.assert_called_once_with( |
| expected_command, |
| stdin=subprocess.PIPE, stdout=None, stderr=None) |
| |
| print(bytes_hex(f.getvalue())) |
| assert raw(pkt) in f.getvalue() |
| f.close() |
| del f, pkt |
| |
| = Check tcpdump() command with linktype and args |
| ~ tcpdump libpcap |
| |
| f = BytesIO() |
| pkt = Ether()/IP()/ICMP() |
| |
| with mock.patch('subprocess.Popen', return_value=Bunch( |
| stdin=f, wait=lambda: None)) as popen: |
| # Prevent closing the BytesIO |
| with mock.patch.object(f, 'close'): |
| tcpdump([pkt], linktype=scapy.data.DLT_EN10MB, use_tempfile=False) |
| |
| expected_command = [conf.prog.tcpdump, '-y', 'EN10MB', '-U', '-r', '-'] |
| if OPENBSD: |
| expected_command = [conf.prog.tcpdump, '-y', 'EN10MB', '-r', '-'] |
| |
| popen.assert_called_once_with( |
| expected_command, |
| stdin=subprocess.PIPE, stdout=None, stderr=None) |
| |
| print(bytes_hex(f.getvalue())) |
| assert raw(pkt) in f.getvalue() |
| f.close() |
| del f, pkt |
| |
| = Check sniff() offline with linktype & 802.11 filter |
| ~ tcpdump linux |
| |
| fd = get_temp_file() |
| wrpcap(fd, [RadioTap()/Dot11()/Dot11ProbeReq(), RadioTap()/Dot11()]) |
| lst = sniff(offline=fd, filter="subtype probe-req") |
| assert len(lst) == 1 |
| |
| = Check tcpdump() command rejects non-string input for prog |
| |
| pkt = Ether()/IP()/ICMP() |
| |
| try: |
| tcpdump([pkt], prog=+17607067425, args=['-nn']) |
| except ValueError as e: |
| if hasattr(e, 'args'): |
| assert 'prog' in e.args[0] |
| else: |
| assert 'prog' in e.message |
| else: |
| assert False, 'expected exception' |
| |
| = Check tcpdump() command with tshark |
| ~ tshark |
| pcapfile = BytesIO(b'\xd4\xc3\xb2\xa1\x02\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00\x00e\x00\x00\x00\xcf\xc5\xacVo*\n\x00(\x00\x00\x00(\x00\x00\x00E\x00\x00(\x00\x01\x00\x00@\x06|\xcd\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x14\x00P\x00\x00\x00\x00\x00\x00\x00\x00P\x02 \x00\x91|\x00\x00\xcf\xc5\xacV_-\n\x00\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x11|\xce\x7f\x00\x00\x01\x7f\x00\x00\x01\x005\x005\x00\x08\x01r\xcf\xc5\xacV\xf90\n\x00\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x01|\xde\x7f\x00\x00\x01\x7f\x00\x00\x01\x08\x00\xf7\xff\x00\x00\x00\x00') |
| # tshark doesn't need workarounds on OSX |
| tempfile_count = len(conf.temp_files) |
| values = [tuple(int(val) for val in line[:-1].split(b'\t')) for line in tcpdump(pcapfile, prog=conf.prog.tshark, getfd=True, args=['-T', 'fields', '-e', 'ip.ttl', '-e', 'ip.proto'])] |
| assert values == [(64, 6), (64, 17), (64, 1)] |
| assert len(conf.temp_files) == tempfile_count |
| |
| = Check tdecode command directly for tshark |
| ~ tshark |
| |
| pkts = [ |
| Ether()/IP(src='192.0.2.1', dst='192.0.2.2')/ICMP(type='echo-request')/Raw(b'X'*100), |
| Ether()/IP(src='192.0.2.2', dst='192.0.2.1')/ICMP(type='echo-reply')/Raw(b'X'*100), |
| ] |
| |
| # tshark doesn't need workarounds on OSX |
| tempfile_count = len(conf.temp_files) |
| |
| r = tdecode(pkts, dump=True) |
| r |
| assert b'Src: 192.0.2.1' in r |
| assert b'Src: 192.0.2.2' in r |
| assert b'Dst: 192.0.2.2' in r |
| assert b'Dst: 192.0.2.1' in r |
| assert b'Echo (ping) request' in r |
| assert b'Echo (ping) reply' in r |
| assert b'ICMP' in r |
| assert len(conf.temp_files) == tempfile_count |
| |
| = Check tdecode with linktype |
| ~ tshark |
| |
# Raw Ethernet frames carrying the same echo request / echo reply exchange as
# the ping packets above: Ethernet, IPv4 and ICMP headers followed by the
# 100-byte b'X' payload (IPv4 total length 0x0080 = 20 + 8 + 100).
pkts = [
    b'\xff\xff\xff\xff\xff\xff\xac"\x0b\xc5j\xdb\x08\x00E\x00\x00\x80\x00\x01\x00\x00@\x01\xf6x\xc0\x00\x02\x01\xc0\x00\x02\x02\x08\x00\xb6\xbe\x00\x00\x00\x00' + b'X' * 100,
    b'\xff\xff\xff\xff\xff\xff\xac"\x0b\xc5j\xdb\x08\x00E\x00\x00\x80\x00\x01\x00\x00@\x01\xf6x\xc0\x00\x02\x02\xc0\x00\x02\x01\x00\x00\xbe\xbe\x00\x00\x00\x00' + b'X' * 100,
]

# tshark doesn't need workarounds on OSX: the number of registered temporary
# files is recorded here and must be unchanged once tdecode() has returned.
tempfile_count = len(conf.temp_files)

# The packets are plain byte strings, so the link type is passed explicitly.
r = tdecode(pkts, dump=True, linktype=DLT_EN10MB)
assert all(
    marker in r
    for marker in (
        b'Src: 192.0.2.1',
        b'Src: 192.0.2.2',
        b'Dst: 192.0.2.2',
        b'Dst: 192.0.2.1',
        b'Echo (ping) request',
        b'Echo (ping) reply',
        b'ICMP',
    )
)
assert len(conf.temp_files) == tempfile_count
| |
| |
| = Run scapy's tshark command |
| ~ needs_root |
# Smoke test: scapy's tshark() is called with count=1 and timeout=3 and only
# has to return without raising.
tshark(timeout=3, count=1)
| |
| = Check wireshark() |
| ~ wireshark |
| |
# In-memory buffer handed to wireshark() as the stdin of the mocked process.
f = BytesIO()
pkt = Ether() / IP() / ICMP()

# subprocess.Popen is replaced by a mock whose return value exposes `f` as
# stdin; f.close is patched out as well so the buffer can still be read once
# wireshark() has returned.
with mock.patch('subprocess.Popen', return_value=Bunch(stdin=f)) as popen, \
        mock.patch.object(f, 'close'):
    wireshark([pkt])

popen.assert_called_once_with(
    [conf.prog.wireshark, '-ki', '-'],
    stdin=subprocess.PIPE,
    stdout=None,
    stderr=None,
)

# The bytes of the packet must be part of what was written to the buffer.
print(bytes_hex(f.getvalue()))
assert raw(pkt) in f.getvalue()
f.close()
del f, pkt
| |
| = Check Raw IP pcap files |
| |
import os
import tempfile

# Write one IPv4 and one IPv6 packet with the DLT_RAW link type, then read
# the file back.  mkstemp() creates the file atomically (unlike the
# deprecated mktemp()); only its path is needed, so the descriptor is closed
# right away, and the file is removed whatever happens in between.
fdesc, filename = tempfile.mkstemp(suffix=".pcap")
os.close(fdesc)
try:
    wrpcap(filename, [IP()/UDP(), IPv6()/UDP()], linktype=DLT_RAW)
    packets = rdpcap(filename)
finally:
    os.remove(filename)

assert isinstance(packets[0], IP) and isinstance(packets[1], IPv6)
| |
| = Check wrpcap() with no packet |
| |
import os
import tempfile

# wrpcap() must still leave a non-empty file behind when it is given an
# empty packet list.  The file starts out empty (mkstemp), its size is
# sampled after wrpcap() and it is removed even if something raises, so a
# failing run does not leave it on disk.
fdesc, filename = tempfile.mkstemp(suffix=".pcap")
os.close(fdesc)
try:
    wrpcap(filename, [])
    fstat = os.stat(filename)
finally:
    os.remove(filename)

assert fstat.st_size != 0
| |
| = Check wrpcap() with SndRcvList |
| |
import os
import tempfile

# A SndRcvList holding one (sent, received) pair must be written as two
# packets.  The file is read back inside the try block and removed in the
# finally clause, so it is cleaned up even when reading or writing fails.
fdesc, filename = tempfile.mkstemp(suffix=".pcap")
os.close(fdesc)
try:
    wrpcap(filename, SndRcvList(res=[(Ether()/IP(), Ether()/IP())]))
    reread = rdpcap(filename)
finally:
    os.remove(filename)

assert len(reread) == 2
| |
| = Check wrpcap() with different packets types |
| |
import mock
import os
import tempfile

# scapy.utils.warning is replaced by a mock while a list mixing IP and Ether
# packets is written; one of the positional arguments of its most recent
# call must contain "Inconsistent".
with mock.patch("scapy.utils.warning") as warning:
    filename = tempfile.mktemp()
    wrpcap(filename, [IP(), Ether(), IP(), IP()])
    os.remove(filename)
    logged = warning.call_args[0]
    assert any("Inconsistent" in arg for arg in logged)
| |
| = Check wrpcap() with the Loopback layer |
| ~ tshark |
| |
import os
import tempfile

for cls in [Loopback, LoopbackOpenBSD]:
    # One capture file per loopback flavour; it is decoded with tshark and
    # removed again before the next iteration, even if writing or decoding
    # raises.
    fdesc, filename = tempfile.mkstemp(suffix=".pcap")
    os.close(fdesc)
    try:
        wrpcap(filename, [cls()/IP()/ICMP()])
        return_value = b"".join(tcpdump(filename, prog=conf.prog.tshark, getfd=True))
    finally:
        os.remove(filename)
    assert b"Echo (ping) request" in return_value
| |
| ############ |
| ############ |
| + ERF Ethernet format support |
| |
| = Variable creations |
| erffile = BytesIO(b'3;!E_9\x92_\x02\x04\x00p\x00\x00\x00P\x00\x00\x00\x0fS?\xca\xc0\x1cjz\x18\x90\xed\x81\x00\x01:\x08\x00E\x00\x00(\xdf\xab@\x00;\x06\xb3s\n\x01]\xdb\n\xfb9\xda\xc3v\x84\xecD\x16\xb9\xab\xda\xa1b\xf9P\x10f\x98\x18\xcb\x00\x00\x00\x00\x90\x9e\xd7\xd2_\x929_\x0f\x9e\xcd\x1f\x01\x88\xb9\x15[/s<\x01\x88\xb9\x15[/\xcd\x1f\x01\x88\xb9\x15[/0\xcd"E_9\x92_\x02\x04\x00p\x00\x00\x00P\x00\x00\x1cjz\x18\x90\xed\x00\x0fS?\xca\xc0\x08\x00E\x00\x00(\xa2\xdd@\x00@\x06\xebA\n\xfb9\xda\n\x01]\xdb\x84\xec\xc3v\xda\xa1b\xf9D\x16\xb9\xacP\x10\x9a\xf0\xe4q\x00\x00\x00\x00\x00\x00\x00\x00o\xbc\xe2{_\x929_\x0f\x9f+3\x01\x88\xb9\x15u\x1e(^\x01\x88\xb9\x15u\x1e+3\x01\x88\xb9\x15u\x1e') |
| erffilewithheader = BytesIO(b'4;!E_9\x92_\x82\x00\x00x\x00\x00\x00P\x00\x00\x1a+<M^o\x00\x00\x00\x0fS?\xca\xc0\x1cjz\x18\x90\xed\x81\x00\x01:\x08\x00E\x00\x00(\xdf\xab@\x00;\x06\xb3s\n\x01]\xdb\n\xfb9\xda\xc3v\x84\xecD\x16\xb9\xab\xda\xa1b\xf9P\x10f\x98\x18\xcb\x00\x00\x00\x00\x90\x9e\xd7\xd2_\x929_\x0f\x9e\xcd\x1f\x01\x88\xb9\x15[/s<\x01\x88\xb9\x15[/\xcd\x1f\x01\x88\xb9\x15[/0\xcd"E_9\x92_\x82\x00\x00x\x00\x00\x00P\x00\x00\x1a+<M^o\x00\x00\x1cjz\x18\x90\xed\x00\x0fS?\xca\xc0\x08\x00E\x00\x00(\xa2\xdd@\x00@\x06\xebA\n\xfb9\xda\n\x01]\xdb\x84\xec\xc3v\xda\xa1b\xf9D\x16\xb9\xacP\x10\x9a\xf0\xe4q\x00\x00\x00\x00\x00\x00\x00\x00o\xbc\xe2{_\x929_\x0f\x9f+3\x01\x88\xb9\x15u\x1e(^\x01\x88\xb9\x15u\x1e+3\x01\x88\xb9\x15u\x1e') |
| |
| = Check reading of ERF Ethernet file |
# Parse the in-memory ERF capture and check the first record: its timestamp,
# then the IPv4 and Ethernet source/destination pairs of the frame.
pkterf = rderf(erffile)
assert pkterf[0].time == 1603418463.270038318
assert (pkterf[0][IP].src, pkterf[0][IP].dst) == ("10.1.93.219", "10.251.57.218")
assert (pkterf[0][Ether].src, pkterf[0][Ether].dst) == ("1c:6a:7a:18:90:ed", "00:0f:53:3f:ca:c0")
| |
| = Check writing of ERF Ethernet file |
import os
import tempfile

# First round trip: write through an already-open binary file object, read
# the file back by name, then delete it.
fdesc, filename = tempfile.mkstemp()
fdesc = os.fdopen(fdesc, "wb")
wrerf(fdesc, pkterf)
fdesc.close()
newpkterf = rderf(filename)
os.remove(filename)

assert pkterf[1][Ether].src == newpkterf[1][Ether].src

# Both records must come back with the timestamp and wire length they had.
assert len(pkterf) == len(newpkterf)
assert newpkterf[0].time is not None
assert newpkterf[0].wirelen is not None
assert newpkterf[0].time == pkterf[0].time
assert newpkterf[0].wirelen == pkterf[0].wirelen
assert newpkterf[1].time is not None
assert newpkterf[1].wirelen is not None
assert newpkterf[1].time == pkterf[1].time
assert newpkterf[1].wirelen == pkterf[1].wirelen

# Second round trip: append the same packets twice to a fresh file, by name.
# Only the path is needed here, so the descriptor returned by mkstemp() is
# closed immediately instead of being left open.
fdesc, filename = tempfile.mkstemp()
os.close(fdesc)
wrerf(filename, pkterf, append=True)
wrerf(filename, pkterf, append=True)
newdoublepkterf = rderf(filename)
os.remove(filename)

assert len(newpkterf) * 2 == len(newdoublepkterf)
| |
| = Check rderf |
# Read the capture stored in erffilewithheader and check the timestamp and
# Ethernet source address of its second record.
pkterf = rderf(erffilewithheader)
assert (pkterf[1].time, pkterf[1][Ether].src) == (1603418463.270062279, "00:0f:53:3f:ca:c0")
| |
| ############ |
| ############ |
| + Mocked read_routes() calls |
| |
| = Truncated netstat -rn output on OS X |
| ~ mock_read_routes_bsd |
| |
| import mock |
| from io import StringIO |
| |
@mock.patch("scapy.arch.get_if_addr")
@mock.patch("scapy.arch.unix.os")
def test_osx_netstat_truncated(mock_os, mock_get_if_addr):
    """Check read_routes() when netstat shows a shortened interface name.
    The mocked netstat output contains 'bridge1' while the mocked
    'ifconfig -l' output lists 'bridge10'; a route on 'bridge10' is expected."""
    # netstat & ifconfig outputs from https://github.com/secdev/scapy/pull/119
    netstat_output = u"""
Routing tables

Internet:
Destination Gateway Flags Refs Use Netif Expire
default 192.168.1.1 UGSc 460 0 en1
default link#11 UCSI 1 0 bridge1
127 127.0.0.1 UCS 1 0 lo0
127.0.0.1 127.0.0.1 UH 10 2012351 lo0
"""
    ifconfig_output = u"lo0 en1 bridge10\n"
    def se_popen(command):
        """Return a fake pipe for the two commands this test knows about"""
        if command.startswith("netstat -rn"):
            return StringIO(netstat_output)
        if command != "ifconfig -l":
            raise Exception("Command not mocked: %s" % command)
        fake_pipe = StringIO(ifconfig_output)
        def unit():
            return fake_pipe
        # The buffer gets instance attributes so that calling it or entering
        # it yields the buffer itself, and leaving it does nothing.
        fake_pipe.__call__ = unit
        fake_pipe.__enter__ = unit
        fake_pipe.__exit__ = lambda x, y, z: None
        return fake_pipe
    mock_os.popen.side_effect = se_popen
    def se_get_if_addr(iface):
        """Address reported by the mocked get_if_addr()"""
        return "0.0.0.0" if iface == "bridge1" else "1.2.3.4"
    mock_get_if_addr.side_effect = se_get_if_addr
    # Only the DARWIN platform flag of scapy.arch.unix is switched on
    from scapy.arch.unix import read_routes
    for flag, enabled in (("DARWIN", True), ("FREEBSD", False), ("NETBSD", False), ("OPENBSD", False)):
        setattr(scapy.arch.unix, flag, enabled)
    routes = read_routes()
    assert len(routes) == 4
    assert any(route[3] == "bridge10" for route in routes)


test_osx_netstat_truncated()
| |
| |
| = macOS 10.13 |
| ~ mock_read_routes_bsd |
| |
| import mock |
| from io import StringIO |
| |
@mock.patch("scapy.arch.get_if_addr")
@mock.patch("scapy.arch.unix.os")
def test_osx_10_13_ipv4(mock_os, mock_get_if_addr):
    """Run read_routes() against an IPv4 routing table from OS X 10.13"""
    # 'netstat -rn -f inet' output
    netstat_output = u"""
Routing tables

Internet:
Destination Gateway Flags Refs Use Netif Expire
default 192.168.28.1 UGSc 82 0 en0
127 127.0.0.1 UCS 0 0 lo0
127.0.0.1 127.0.0.1 UH 1 878 lo0
169.254 link#5 UCS 0 0 en0
192.168.28 link#5 UCS 4 0 en0
192.168.28.1/32 link#5 UCS 2 0 en0
192.168.28.1 88:32:9c:f5:4e:ea UHLWIir 40 37 en0 1177
192.168.28.2 62:aa:56:4b:51:54 UHLWI 0 0 en0 619
192.168.28.4 38:17:ed:9a:58:28 UHLWIi 1 6 en0 428
192.168.28.18/32 link#5 UCS 1 0 en0
192.168.28.18 88:32:9c:f5:4e:eb UHLWI 0 1 lo0
192.168.28.28 04:0e:eb:11:74:a7 UHLWI 0 0 en0 576
224.0.0/4 link#5 UmCS 1 0 en0
224.0.0.251 1:0:5e:0:0:fb UHmLWI 0 0 en0
255.255.255.255/32 link#5 UCS 0 0 en0
"""
    # os.popen() hands back the canned output whatever the command is
    mock_os.popen = mock.MagicMock(return_value=StringIO(netstat_output))
    def se_get_if_addr(iface):
        """Address reported by the mocked get_if_addr()"""
        import socket
        return "192.168.28.18" if iface == "en0" else "127.0.0.1"
    mock_get_if_addr.side_effect = se_get_if_addr
    # Only the FREEBSD platform flag of scapy.arch.unix is switched on
    from scapy.arch.unix import read_routes
    for flag, enabled in (("DARWIN", False), ("FREEBSD", True), ("NETBSD", False), ("OPENBSD", False)):
        setattr(scapy.arch.unix, flag, enabled)
    routes = read_routes()
    for route in routes:
        print(route)
    assert len(routes) == 15
    # The first route whose destination is 0 is taken as the default route
    zero_destination = [route for route in routes if route[0] == 0]
    default_route = zero_destination[0]
    assert default_route[3] == "en0" and default_route[4] == "192.168.28.18"

test_osx_10_13_ipv4()
| |
| |
| = macOS 10.15 |
| ~ mock_read_routes_bsd |
| |
| import mock |
| from io import StringIO |
| |
@mock.patch("scapy.arch.get_if_addr")
@mock.patch("scapy.arch.unix.os")
def test_osx_10_15_ipv4(mock_os, mock_get_if_addr):
    """Run read_routes() against an IPv4 routing table from OS X 10.15"""
    # 'netstat -rn -f inet' output
    netstat_output = u"""
Routing tables

Internet:
Destination Gateway Flags Netif Expire
default 192.168.122.1 UGSc en0
127 127.0.0.1 UCS lo0
127.0.0.1 127.0.0.1 UH lo0
169.254 link#8 UCS en0 !
192.168.122 link#8 UCS en0 !
192.168.122.1/32 link#8 UCS en0 !
192.168.122.1 52:54:0:c0:b7:af UHLWIir en0 1169
192.168.122.63/32 link#8 UCS en0 !
224.0.0/4 link#8 UmCS en0 !
224.0.0.251 1:0:5e:0:0:fb UHmLWI en0
255.255.255.255/32 link#8 UCS en0 !
"""
    # os.popen() hands back the canned output whatever the command is
    mock_os.popen = mock.MagicMock(return_value=StringIO(netstat_output))
    def se_get_if_addr(iface):
        """Address reported by the mocked get_if_addr()"""
        import socket
        return "192.168.122.42" if iface == "en0" else "127.0.0.1"
    mock_get_if_addr.side_effect = se_get_if_addr
    # Only the FREEBSD platform flag of scapy.arch.unix is switched on
    from scapy.arch.unix import read_routes
    for flag, enabled in (("DARWIN", False), ("FREEBSD", True), ("NETBSD", False), ("OPENBSD", False)):
        setattr(scapy.arch.unix, flag, enabled)
    routes = read_routes()
    for route in routes:
        print(route)
    assert len(routes) == 11
    # The first route whose destination is 0 is taken as the default route
    zero_destination = [route for route in routes if route[0] == 0]
    default_route = zero_destination[0]
    assert default_route[3] == "en0" and default_route[4] == "192.168.122.42"

test_osx_10_15_ipv4()
| |
| |
| = OpenBSD 6.3 |
| ~ mock_read_routes_bsd |
| |
| import mock |
| from io import StringIO |
| |
@mock.patch("scapy.arch.get_if_addr")
@mock.patch("scapy.arch.unix.OPENBSD", True)
@mock.patch("scapy.arch.unix.os")
def test_openbsd_6_3(mock_os, mock_get_if_addr):
    """Test read_routes() on OpenBSD 6.3.
    scapy.arch.unix.OPENBSD is patched to the real value True by the
    decorator (which therefore injects no argument); the parsed routes are
    returned so they can be checked once the patches are undone."""
    # 'netstat -rn -f inet' output
    netstat_output = u"""
Routing tables

Internet:
Destination Gateway Flags Refs Use Mtu Prio Iface
default 10.0.1.254 UGS 0 0 - 8 bge0
224/4 127.0.0.1 URS 0 23 32768 8 lo0
10.0.1/24 10.0.1.26 UCn 4 192 - 4 bge0
10.0.1.1 00:30:48:57:ed:0b UHLc 2 338 - 3 bge0
10.0.1.2 00:03:ba:0c:0b:52 UHLc 1 186 - 3 bge0
10.0.1.26 00:30:48:62:b3:f4 UHLl 0 47877 - 1 bge0
10.0.1.135 link#1 UHLch 1 194 - 3 bge0
10.0.1.254 link#1 UHLch 1 190 - 3 bge0
10.0.1.255 10.0.1.26 UHb 0 0 - 1 bge0
10.188.6/24 10.188.6.17 Cn 0 0 - 4 tap3
10.188.6.17 fe:e1:ba:d7:ff:32 UHLl 0 25 - 1 tap3
10.188.6.255 10.188.6.17 Hb 0 0 - 1 tap3
10.188.135/24 10.0.1.135 UGS 0 0 1350 L 8 bge0
127/8 127.0.0.1 UGRS 0 0 32768 8 lo0
127.0.0.1 127.0.0.1 UHhl 1 3835230 32768 1 lo0
"""
    # Mocked file descriptor
    strio = StringIO(netstat_output)
    mock_os.popen = mock.MagicMock(return_value=strio)
    # Mocked get_if_addr() output
    def se_get_if_addr(iface):
        """Perform specific side effects"""
        if iface == "bge0":
            return "192.168.122.42"
        return "10.0.1.26"
    mock_get_if_addr.side_effect = se_get_if_addr
    # Test the function
    from scapy.arch.unix import read_routes
    return read_routes()

routes = test_openbsd_6_3()

# Start from None so that a missing route fails the assertions below instead
# of raising NameError or silently reusing a value left by an earlier test.
default = locked = None
for r in routes:
    print(ltoa(r[0]), ltoa(r[1]), r)
    # check that default route exists in parsed data structure
    if ltoa(r[0]) == "0.0.0.0":
        default = r
    # check that route with locked mtu exists in parsed data structure
    if ltoa(r[0]) == "10.188.135.0":
        locked = r

assert len(routes) == 11
assert default is not None
assert default[2] == "10.0.1.254"
assert default[3] == "bge0"
assert locked is not None
assert locked[2] == "10.0.1.135"
assert locked[3] == "bge0"
| |
| = Solaris 11.1 |
| ~ mock_read_routes_bsd |
| |
| import mock |
| from io import StringIO |
| |
| # Mocked Solaris 11.1 parsing behavior |
| |
@mock.patch("scapy.arch.get_if_addr")
@mock.patch("scapy.arch.unix.SOLARIS", True)
@mock.patch("scapy.arch.unix.os")
def test_solaris_111(mock_os, mock_get_if_addr):
    """Run read_routes() against a Solaris 11.1 routing table and return the result"""
    # 'netstat -rvn -f inet' output
    netstat_output = u"""
IRE Table: IPv4
Destination Mask Gateway Device MTU Ref Flg Out In/Fwd
-------------------- --------------- -------------------- ------ ----- --- --- ----- ------
default 0.0.0.0 10.0.2.2 net0 1500 2 UG 5 0
10.0.2.0 255.255.255.0 10.0.2.15 net0 1500 3 U 0 0
127.0.0.1 255.255.255.255 127.0.0.1 lo0 8232 2 UH 1517 1517
"""
    # os.popen() hands back the canned output whatever the command is
    mock_os.popen = mock.MagicMock(return_value=StringIO(netstat_output))
    # Show the value the decorator installed for the SOLARIS flag
    print(scapy.arch.unix.SOLARIS)
    def se_get_if_addr(iface):
        """Address reported by the mocked get_if_addr()"""
        import socket
        return "10.0.2.15" if iface == "net0" else "127.0.0.1"
    mock_get_if_addr.side_effect = se_get_if_addr
    from scapy.arch.unix import read_routes
    return read_routes()

routes = test_solaris_111()
print(routes)
assert len(routes) == 3
# First four fields of every route, in the order they were parsed
assert [r[:4] for r in routes] == [
    (0, 0, '10.0.2.2', 'net0'),
    (167772672, 4294967040, '0.0.0.0', 'net0'),
    (2130706433, 4294967295, '0.0.0.0', 'lo0'),
]
| |
| |
| ############ |
| ############ |
| + Mocked _parse_tcpreplay_result(stdout, stderr, argv, results_dict) |
| ~ mock_parse_tcpreplay_result |
| |
| = Test mocked _parse_tcpreplay_result |
| |
| from scapy.sendrecv import _parse_tcpreplay_result |
| |
stdout = """Actual: 1024 packets (198929 bytes) sent in 67.88 seconds.
Rated: 2930.6 bps, 0.02 Mbps, 15.09 pps
Statistics for network device: mon0
Attempted packets: 1024
Successful packets: 1024
Failed packets: 0
Retried packets (ENOBUFS): 0
Retried packets (EAGAIN): 0"""

stderr = """Warning in sendpacket.c:sendpacket_open_pf() line 669:
Unsupported physical layer type 0x0323 on mon0. Maybe it works, maybe it won't. See tickets #123/318
sending out mon0
processing file: replay-example.pcap"""

argv = ['tcpreplay', '--intf1=mon0', '--multiplier=1.00', '--timer=nano', 'replay-example.pcap']
results_dict = _parse_tcpreplay_result(stdout, stderr, argv)

results_dict

# Every key listed here must be present in the parsed result with exactly
# this value; "warnings" is only checked for its length.
expected_stats = {
    "packets": 1024,
    "bytes": 198929,
    "time": 67.88,
    "bps": 2930.6,
    "mbps": 0.02,
    "pps": 15.09,
    "attempted": 1024,
    "successful": 1024,
    "failed": 0,
    "retried_enobufs": 0,
    "retried_eagain": 0,
    "command": " ".join(argv),
}
assert {key: results_dict[key] for key in expected_stats} == expected_stats
assert len(results_dict["warnings"]) == 3
| |
| = Test more recent version with flows |
| |
data = """Actual: 1 packets (42 bytes) sent in 0.000278 seconds
Rated: 151079.1 Bps, 1.20 Mbps, 3597.12 pps
Flows: 1 flows, 3597.12 fps, 1 flow packets, 0 non-flow
Statistics for network device: enp0s3
Successful packets: 1
Failed packets: 0
Truncated packets: 0
Retried packets (ENOBUFS): 0
Retried packets (EAGAIN): 0
"""

# No stderr text and an empty argument list are passed this time.
results_dict = _parse_tcpreplay_result(data, "", [])
results_dict

# The parsed result must match this dictionary exactly, key for key.
expected = dict(
    bps=151079.1,
    bytes=42,
    command='',
    failed=0,
    flow_packets=1,
    flows=1,
    fps=3597.12,
    mbps=1.2,
    non_flow=0,
    packets=1,
    pps=3597.12,
    retried_eagain=0,
    retried_enobufs=0,
    successful=1,
    time=0.000278,
    truncated=0,
    warnings=[],
)

assert results_dict == expected
| |
| ############ |
| ############ |
| + Mocked read_routes6() calls |
| |
| = Preliminary definitions |
| ~ mock_read_routes_bsd |
| |
| import mock |
| from io import StringIO |
| |
def valid_output_read_routes6(routes):
    """Return True if 'routes' contains correctly formatted entries, False otherwise.
    Every entry is unpacked into six fields: destination, prefix length,
    next hop, device, candidate source addresses and one trailing field that
    is not checked. An empty 'routes' is considered valid."""
    for destination, plen, next_hop, dev, cset, _unchecked in routes:
        # Destination must pass in6_isvalid() and the prefix length be an int
        if not in6_isvalid(destination) or not isinstance(plen, int):
            return False
        # Next hop must pass in6_isvalid() and the device name be a str
        if not in6_isvalid(next_hop) or not isinstance(dev, str):
            return False
        # Every candidate source address must pass in6_isvalid() as well
        if not all(in6_isvalid(address) for address in cset):
            return False
    return True
| |
def check_mandatory_ipv6_routes(routes6):
    """Ensure that mandatory IPv6 routes are present.
    'routes6' is scanned up to three times, so it must be a sequence that
    can be iterated more than once. Returns True only if it contains a
    '::1' route whose field 4 is ['::1'], a 'fe80::' route with prefix
    length 64, and a route accepted by in6_islladdr() with prefix length 128
    whose field 4 is ['::1']."""
    # any() stops at the first match instead of counting every match
    if not any(r[0] == "::1" and r[4] == ["::1"] for r in routes6):
        return False
    if not any(r[0] == "fe80::" and r[1] == 64 for r in routes6):
        return False
    return any(
        in6_islladdr(r[0]) and r[1] == 128 and r[4] == ["::1"]
        for r in routes6
    )
| |
| |
| = Mac OS X 10.9.5 |
| ~ mock_read_routes_bsd |
| |
| import mock |
| from io import StringIO |
| |
@mock.patch("scapy.arch.unix.in6_getifaddr")
@mock.patch("scapy.arch.unix.os")
def test_osx_10_9_5(mock_os, mock_in6_getifaddr):
    """Run read_routes6() against an IPv6 routing table from OS X 10.9.5"""
    # 'netstat -rn -f inet6' output
    netstat_output = u"""
Routing tables

Internet6:
Destination Gateway Flags Netif Expire
::1 ::1 UHL lo0
fe80::%lo0/64 fe80::1%lo0 UcI lo0
fe80::1%lo0 link#1 UHLI lo0
fe80::%en0/64 link#4 UCI en0
fe80::ba26:6cff:fe5f:4eee%en0 b8:26:6c:5f:4e:ee UHLWIi en0
fe80::bae8:56ff:fe45:8ce6%en0 b8:e8:56:45:8c:e6 UHLI lo0
ff01::%lo0/32 ::1 UmCI lo0
ff01::%en0/32 link#4 UmCI en0
ff02::%lo0/32 ::1 UmCI lo0
ff02::%en0/32 link#4 UmCI en0
"""
    # os.popen() hands back the canned output whatever the command is
    mock_os.popen = mock.MagicMock(return_value=StringIO(netstat_output))
    # Addresses reported by the mocked in6_getifaddr()
    mock_in6_getifaddr.return_value = [
        ("::1", IPV6_ADDR_LOOPBACK, "lo0"),
        ("fe80::ba26:6cff:fe5f:4eee", IPV6_ADDR_LINKLOCAL, "en0"),
    ]
    # Only the FREEBSD platform flag of scapy.arch.unix is switched on
    from scapy.arch.unix import read_routes6
    for flag, enabled in (("DARWIN", False), ("FREEBSD", True), ("NETBSD", False), ("OPENBSD", False)):
        setattr(scapy.arch.unix, flag, enabled)
    routes = read_routes6()
    for route in routes:
        print(route)
    assert len(routes) == 6
    assert check_mandatory_ipv6_routes(routes)

test_osx_10_9_5()
| |
| |
| = Mac OS X 10.9.5 with global IPv6 connectivity |
| ~ mock_read_routes_bsd |
| |
| import mock |
| from io import StringIO |
| |
@mock.patch("scapy.arch.unix.in6_getifaddr")
@mock.patch("scapy.arch.unix.os")
def test_osx_10_9_5_global(mock_os, mock_in6_getifaddr):
    """Run read_routes6() against an OS X 10.9.5 table that has a default route and global prefixes"""
    # 'netstat -rn -f inet6' output
    netstat_output = u"""
Routing tables

Internet6:
Destination Gateway Flags Netif Expire
default fe80::ba26:8aff:fe5f:4eef%en0 UGc en0
::1 ::1 UHL lo0
2a01:ab09:7d:1f01::/64 link#4 UC en0
2a01:ab09:7d:1f01:420:205c:9fab:5be7 b8:e9:55:44:7c:e5 UHL lo0
2a01:ab09:7d:1f01:ba26:8aff:fe5f:4eef b8:26:8a:5f:4e:ef UHLWI en0
2a01:ab09:7d:1f01:bae9:55ff:fe44:7ce5 b8:e9:55:44:7c:e5 UHL lo0
fe80::%lo0/64 fe80::1%lo0 UcI lo0
fe80::1%lo0 link#1 UHLI lo0
fe80::%en0/64 link#4 UCI en0
fe80::5664:d9ff:fe79:4e00%en0 54:64:d9:79:4e:0 UHLWI en0
fe80::6ead:f8ff:fe74:945a%en0 6c:ad:f8:74:94:5a UHLWI en0
fe80::a2f3:c1ff:fec4:5b50%en0 a0:f3:c1:c4:5b:50 UHLWI en0
fe80::ba26:8aff:fe5f:4eef%en0 b8:26:8a:5f:4e:ef UHLWIir en0
fe80::bae9:55ff:fe44:7ce5%en0 b8:e9:55:44:7c:e5 UHLI lo0
ff01::%lo0/32 ::1 UmCI lo0
ff01::%en0/32 link#4 UmCI en0
ff02::%lo0/32 ::1 UmCI lo
"""
    # os.popen() hands back the canned output whatever the command is
    mock_os.popen = mock.MagicMock(return_value=StringIO(netstat_output))
    # Addresses reported by the mocked in6_getifaddr()
    mock_in6_getifaddr.return_value = [
        ("::1", IPV6_ADDR_LOOPBACK, "lo0"),
        ("fe80::ba26:6cff:fe5f:4eee", IPV6_ADDR_LINKLOCAL, "en0"),
    ]
    from scapy.arch.unix import read_routes6
    routes = read_routes6()
    print(routes)
    # Every entry must be well formed before the contents are looked at
    assert valid_output_read_routes6(routes)
    for route in routes:
        print(route)
    assert len(routes) == 11
    assert check_mandatory_ipv6_routes(routes)

test_osx_10_9_5_global()
| |
| |
| = Mac OS X 10.10.4 |
| ~ mock_read_routes_bsd |
| |
| import mock |
| from io import StringIO |
| |
@mock.patch("scapy.arch.unix.in6_getifaddr")
@mock.patch("scapy.arch.unix.os")
def test_osx_10_10_4(mock_os, mock_in6_getifaddr):
    """Run read_routes6() against an IPv6 routing table from OS X 10.10.4"""
    # 'netstat -rn -f inet6' output
    netstat_output = u"""
Routing tables

Internet6:
Destination Gateway Flags Netif Expire
::1 ::1 UHL lo0
fe80::%lo0/64 fe80::1%lo0 UcI lo0
fe80::1%lo0 link#1 UHLI lo0
fe80::%en0/64 link#4 UCI en0
fe80::a00:27ff:fe9b:c965%en0 8:0:27:9b:c9:65 UHLI lo0
ff01::%lo0/32 ::1 UmCI lo0
ff01::%en0/32 link#4 UmCI en0
ff02::%lo0/32 ::1 UmCI lo0
ff02::%en0/32 link#4 UmCI en0
"""
    # os.popen() hands back the canned output whatever the command is
    mock_os.popen = mock.MagicMock(return_value=StringIO(netstat_output))
    # Addresses reported by the mocked in6_getifaddr()
    mock_in6_getifaddr.return_value = [
        ("::1", IPV6_ADDR_LOOPBACK, "lo0"),
        ("fe80::a00:27ff:fe9b:c965", IPV6_ADDR_LINKLOCAL, "en0"),
    ]
    from scapy.arch.unix import read_routes6
    routes = read_routes6()
    for route in routes:
        print(route)
    assert len(routes) == 5
    assert check_mandatory_ipv6_routes(routes)

test_osx_10_10_4()
| |
| |
| = FreeBSD 10.2 |
| ~ mock_read_routes_bsd |
| |
| import mock |
| from io import StringIO |
| |
@mock.patch("scapy.arch.unix.in6_getifaddr")
@mock.patch("scapy.arch.unix.os")
def test_freebsd_10_2(mock_os, mock_in6_getifaddr):
    """Test read_routes6() on FreeBSD 10.2.
    The platform flags of scapy.arch.unix are set before read_routes6() is
    called, so the result does not depend on what an earlier test left."""
    # 'netstat -rn -f inet6' output
    netstat_output = u"""
Routing tables

Internet6:
Destination Gateway Flags Netif Expire
::/96 ::1 UGRS lo0
::1 link#2 UH lo0
::ffff:0.0.0.0/96 ::1 UGRS lo0
fe80::/10 ::1 UGRS lo0
fe80::%lo0/64 link#2 U lo0
fe80::1%lo0 link#2 UHS lo0
ff01::%lo0/32 ::1 U lo0
ff02::/16 ::1 UGRS lo0
ff02::%lo0/32 ::1 U lo0
"""
    # Mocked file descriptor
    strio = StringIO(netstat_output)
    mock_os.popen = mock.MagicMock(return_value=strio)
    # Mocked in6_getifaddr() output
    mock_in6_getifaddr.return_value = [("::1", IPV6_ADDR_LOOPBACK, "lo0")]
    # Select the FreeBSD code path, then test the function
    from scapy.arch.unix import read_routes6
    scapy.arch.unix.DARWIN = False
    scapy.arch.unix.FREEBSD = True
    scapy.arch.unix.NETBSD = False
    scapy.arch.unix.OPENBSD = False
    routes = read_routes6()
    for r in routes:
        print(r)
    assert len(routes) == 3
    assert check_mandatory_ipv6_routes(routes)

test_freebsd_10_2()
| |
| |
| = FreeBSD 13.0 |
| ~ mock_read_routes_bsd |
| |
| import mock |
| from io import StringIO |
| |
@mock.patch("scapy.arch.get_if_addr")
@mock.patch("scapy.arch.unix.os")
def test_freebsd_13(mock_os, mock_get_if_addr):
    """Test read_routes() on FreeBSD 13.
    The platform flags of scapy.arch.unix are set before read_routes() is
    called, so the result does not depend on what an earlier test left."""
    # 'netstat -rnW -f inet' output
    netstat_output = u"""
Routing tables

Internet:
Destination Gateway Flags Nhop# Mtu Netif Expire
default 10.0.0.1 UGS 3 1500 vtnet0
10.0.0.0/24 link#1 U 2 1500 vtnet0
10.0.0.8 link#2 UHS 1 16384 lo0
127.0.0.1 link#2 UH 1 16384 lo0
"""
    # Mocked file descriptor
    strio = StringIO(netstat_output)
    mock_os.popen = mock.MagicMock(return_value=strio)
    # Mocked get_if_addr() behavior
    def se_get_if_addr(iface):
        """Perform specific side effects"""
        if iface == "vtnet0":
            return "10.0.0.1"
        return "1.2.3.4"
    mock_get_if_addr.side_effect = se_get_if_addr
    # Select the FreeBSD code path, then test the function
    from scapy.arch.unix import read_routes
    scapy.arch.unix.DARWIN = False
    scapy.arch.unix.FREEBSD = True
    scapy.arch.unix.NETBSD = False
    scapy.arch.unix.OPENBSD = False
    routes = read_routes()
    for r in routes:
        print(r)
        # Every route must be attached to one of the two known interfaces
        assert r[3] in ["vtnet0", "lo0"]
    assert len(routes) == 4

test_freebsd_13()
| |
| |
| = OpenBSD 5.5 |
| ~ mock_read_routes_bsd |
| |
| import mock |
| from io import StringIO |
| |
@mock.patch("scapy.arch.unix.OPENBSD", True)
@mock.patch("scapy.arch.unix.in6_getifaddr")
@mock.patch("scapy.arch.unix.os")
def test_openbsd_5_5(mock_os, mock_in6_getifaddr):
    """Test read_routes6() on OpenBSD 5.5.
    scapy.arch.unix.OPENBSD is patched to the real value True by the
    decorator, which therefore injects no argument for it."""
    # 'netstat -rn -f inet6' output
    netstat_output = u"""
Routing tables

Internet6:
Destination Gateway Flags Refs Use Mtu Prio Iface
::/104 ::1 UGRS 0 0 - 8 lo0
::/96 ::1 UGRS 0 0 - 8 lo0
::1 ::1 UH 14 0 33144 4 lo0
::127.0.0.0/104 ::1 UGRS 0 0 - 8 lo0
::224.0.0.0/100 ::1 UGRS 0 0 - 8 lo0
::255.0.0.0/104 ::1 UGRS 0 0 - 8 lo0
::ffff:0.0.0.0/96 ::1 UGRS 0 0 - 8 lo0
2002::/24 ::1 UGRS 0 0 - 8 lo0
2002:7f00::/24 ::1 UGRS 0 0 - 8 lo0
2002:e000::/20 ::1 UGRS 0 0 - 8 lo0
2002:ff00::/24 ::1 UGRS 0 0 - 8 lo0
fe80::/10 ::1 UGRS 0 0 - 8 lo0
fe80::%em0/64 link#1 UC 0 0 - 4 em0
fe80::a00:27ff:fe04:59bf%em0 08:00:27:04:59:bf UHL 0 0 - 4 lo0
fe80::%lo0/64 fe80::1%lo0 U 0 0 - 4 lo0
fe80::1%lo0 link#3 UHL 0 0 - 4 lo0
fec0::/10 ::1 UGRS 0 0 - 8 lo0
ff01::/16 ::1 UGRS 0 0 - 8 lo0
ff01::%em0/32 link#1 UC 0 0 - 4 em0
ff01::%lo0/32 fe80::1%lo0 UC 0 0 - 4 lo0
ff02::/16 ::1 UGRS 0 0 - 8 lo0
ff02::%em0/32 link#1 UC 0 0 - 4 em0
ff02::%lo0/32 fe80::1%lo0 UC 0 0 - 4 lo0
"""
    # Mocked file descriptor
    strio = StringIO(netstat_output)
    mock_os.popen = mock.MagicMock(return_value=strio)
    # Mocked in6_getifaddr() output
    mock_in6_getifaddr.return_value = [("::1", IPV6_ADDR_LOOPBACK, "lo0"),
                                       ("fe80::a00:27ff:fe04:59bf", IPV6_ADDR_LINKLOCAL, "em0")]
    # Test the function
    from scapy.arch.unix import read_routes6
    routes = read_routes6()
    for r in routes:
        print(r)
    assert len(routes) == 5
    assert check_mandatory_ipv6_routes(routes)

test_openbsd_5_5()
| |
| |
| = NetBSD 7.0 |
| ~ mock_read_routes_bsd |
| |
import mock
from io import StringIO

@mock.patch("scapy.arch.unix.NETBSD", True)
@mock.patch("scapy.arch.unix.in6_getifaddr")
@mock.patch("scapy.arch.unix.os")
def test_netbsd_7_0(mock_os, mock_in6_getifaddr):
    """Test read_routes6() on NetBSD 7.0.
    scapy.arch.unix.NETBSD is patched to the real value True by the
    decorator, which therefore injects no argument for it."""
    # 'netstat -rn -f inet6' output
    netstat_output = u"""
Routing tables

Internet6:
Destination Gateway Flags Refs Use Mtu Interface
::/104 ::1 UGRS - - - lo0
::/96 ::1 UGRS - - - lo0
::1 ::1 UH - - 33648 lo0
::127.0.0.0/104 ::1 UGRS - - - lo0
::224.0.0.0/100 ::1 UGRS - - - lo0
::255.0.0.0/104 ::1 UGRS - - - lo0
::ffff:0.0.0.0/96 ::1 UGRS - - - lo0
2001:db8::/32 ::1 UGRS - - - lo0
2002::/24 ::1 UGRS - - - lo0
2002:7f00::/24 ::1 UGRS - - - lo0
2002:e000::/20 ::1 UGRS - - - lo0
2002:ff00::/24 ::1 UGRS - - - lo0
fe80::/10 ::1 UGRS - - - lo0
fe80::%wm0/64 link#1 UC - - - wm0
fe80::acd1:3989:180e:fde0 08:00:27:a1:64:d8 UHL - - - lo0
fe80::%lo0/64 fe80::1 U - - - lo0
fe80::1 link#2 UHL - - - lo0
ff01:1::/32 link#1 UC - - - wm0
ff01:2::/32 ::1 UC - - - lo0
ff02::%wm0/32 link#1 UC - - - wm0
ff02::%lo0/32 ::1 UC - - - lo0
"""
    # Mocked file descriptor
    strio = StringIO(netstat_output)
    mock_os.popen = mock.MagicMock(return_value=strio)
    # Mocked in6_getifaddr() output
    mock_in6_getifaddr.return_value = [("::1", IPV6_ADDR_LOOPBACK, "lo0"),
                                       ("fe80::acd1:3989:180e:fde0", IPV6_ADDR_LINKLOCAL, "wm0")]
    # Test the function
    from scapy.arch.unix import read_routes6
    routes = read_routes6()
    for r in routes:
        print(r)
    assert len(routes) == 5
    assert check_mandatory_ipv6_routes(routes)

test_netbsd_7_0()
| |
| |
| ############ |
| ############ |
| + Mocked route() calls |
| |
| = Mocked IPv4 routes calls |
| |
| import scapy |
| |
# Register the two interface names used by the hand-written routing table.
for fake_iface in ("enp3s0", "lo"):
    conf.ifaces._add_fake_iface(fake_iface)

old_iface = conf.iface
old_loopback = conf.loopback_name
try:
    conf.iface = 'enp3s0'
    conf.loopback_name = 'lo'
    conf.route.invalidate_cache()
    conf.route.routes = [
        (4294967295, 4294967295, '0.0.0.0', 'wlan0', '', 281),
        (4294967295, 4294967295, '0.0.0.0', 'lo', '', 291),
        (4294967295, 4294967295, '0.0.0.0', 'enp3s0', '192.168.0.119', 281),
        (3758096384, 4026531840, '0.0.0.0', 'lo', '', 291),
        (3758096384, 4026531840, '0.0.0.0', 'wlan0', '', 281),
        (3758096384, 4026531840, '0.0.0.0', 'enp3s0', '1.1.1.1', 281),
        (3232235775, 4294967295, '0.0.0.0', 'enp3s0', '2.2.2.2', 281),
        (3232235639, 4294967295, '0.0.0.0', 'enp3s0', '3.3.3.3', 281),
        (3232235520, 4294967040, '0.0.0.0', 'enp3s0', '4.4.4.4', 281),
        (0, 0, '192.168.0.254', 'enp3s0', '192.168.0.119', 25)
    ]
    # Each destination is looked up in turn and must give the listed tuple
    for route_target, route_wanted in [
        ("192.168.0.0-10", ('enp3s0', '4.4.4.4', '0.0.0.0')),
        ("192.168.0.119", ('lo', '192.168.0.119', '0.0.0.0')),
        ("224.0.0.0", ('enp3s0', '1.1.1.1', '0.0.0.0')),
        ("255.255.255.255", ('enp3s0', '192.168.0.119', '0.0.0.0')),
        ("*", ('enp3s0', '192.168.0.119', '192.168.0.254')),
    ]:
        assert conf.route.route(route_target) == route_wanted
finally:
    # Put the saved settings back and rebuild routes and interfaces
    conf.loopback_name = old_loopback
    conf.iface = old_iface
    conf.route.resync()
    conf.ifaces.reload()
| |
| |
| = Mocked IPv6 routes calls |
| |
# Register the two interface names used by the hand-written routing table.
for fake_iface in ("enp3s0", "lo"):
    conf.ifaces._add_fake_iface(fake_iface)

old_iface = conf.iface
old_loopback = conf.loopback_name
try:
    conf.route6.ipv6_ifaces = {'enp3s0', 'wlan0', 'lo'}
    conf.iface = 'enp3s0'
    conf.loopback_name = 'lo'
    conf.route6.invalidate_cache()
    conf.route6.routes = [
        ('fe80::dd17:1fa6:a123:ab4', 128, '::', 'lo', ['fe80::dd17:1fa6:a123:ab4'], 291),
        ('fe80::7101:5678:1234:da65', 128, '::', 'enp3s0', ['fe80::7101:5678:1234:da65'], 281),
        ('fe80::1f:ae12:4d2c:abff', 128, '::', 'wlan0', ['fe80::1f:ae12:4d2c:abff'], 281),
        ('fe80::', 64, '::', 'wlan0', ['fe80::1f:ae12:4d2c:abff'], 281),
        ('fe80::', 64, '::', 'lo', ['fe80::dd17:1fa6:a123:ab4'], 291),
        ('fe80::', 64, '::', 'enp3s0', ['fe80::7101:5678:1234:da65'], 281),
        ('2a01:e35:1e06:ab56:7010:6548:9646:fa77', 128, '::', 'enp3s0', ['2a01:e35:1e06:ab56:7010:6548:9646:fa77', '2a01:e35:1e06:ab56:512:8bb7:8ab8:14a8'], 281),
        ('2a01:e35:1e06:ab56:512:8bb7:8ab8:14a8', 128, '::', 'enp3s0', ['2a01:e35:1e06:ab56:7010:6548:9646:fa77', '2a01:e35:1e06:ab56:512:8bb7:8ab8:14a8'], 281),
        ('2a01:e35:1e06:ab56::', 64, '::', 'enp3s0', ['2a01:e35:1e06:ab56:7010:6548:9646:fa77', '2a01:e35:1e06:ab56:512:8bb7:8ab8:14a8'], 281),
        ('::', 0, 'fe80::160c:64aa:ef6f:fe14', 'enp3s0', ['2a01:e35:1e06:ab56:7010:6548:9646:fa77', '2a01:e35:1e06:ab56:512:8bb7:8ab8:14a8'], 281)
    ]
    # (destination, extra keyword arguments, expected tuple), looked up in
    # this order; only the last lookup forces a device.
    for route_target, route_options, route_wanted in [
        ("2a01:e35:1e06:ab56:512:8bb7:8ab8:14a8", {}, ('enp3s0', '2a01:e35:1e06:ab56:7010:6548:9646:fa77', '::')),
        ("::1", {}, ('enp3s0', '2a01:e35:1e06:ab56:7010:6548:9646:fa77', 'fe80::160c:64aa:ef6f:fe14')),
        ("ff02::1", {}, ('enp3s0', 'fe80::7101:5678:1234:da65', '::')),
        ("fe80::1", {}, ('enp3s0', 'fe80::7101:5678:1234:da65', '::')),
        ("fe80::1", {'dev': 'lo'}, ('lo', 'fe80::dd17:1fa6:a123:ab4', '::')),
    ]:
        assert conf.route6.route(route_target, **route_options) == route_wanted
finally:
    # Put the saved settings back and rebuild routes and interfaces
    conf.loopback_name = old_loopback
    conf.iface = old_iface
    conf.route6.resync()
    conf.ifaces.reload()
| |
| = Find a link-local address when conf.iface does not support IPv6 |
| |
| old_iface = conf.iface |
| conf.route6.ipv6_ifaces = set(['eth1', 'lo']) |
| conf.iface = "eth0" |
| conf.route6.routes = [("fe80::", 64, "::", "eth1", ["fe80::a00:28ff:fe07:1980"], 256), ("::1", 128, "::", "lo", ["::1"], 0), ("fe80::a00:28ff:fe07:1980", 128, "::", "lo", ["::1"], 0)] |
| assert conf.route6.route("fe80::2807") == ("eth1", "fe80::a00:28ff:fe07:1980", "::") |
| conf.iface = old_iface |
| conf.route6.resync() |
| |
| = Windows: reset routes properly |
| |
| if WINDOWS: |
| from scapy.arch.windows import _route_add_loopback |
| _route_add_loopback() |
| |
| |
| ############ |
| ############ |
| ############ |
| + Tests of StreamSocket |
| |
| = Test with DNS over TCP |
| ~ netaccess |
| |
| import socket |
| sck = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| sck.connect(("8.8.8.8", 53)) |
| |
| class DNSTCP(Packet): |
| name = "DNS over TCP" |
| fields_desc = [ FieldLenField("len", None, fmt="!H", length_of="dns"), |
| PacketLenField("dns", 0, DNS, length_from=lambda p: p.len)] |
| |
| ssck = StreamSocket(sck, DNSTCP) |
| |
| r = ssck.sr1(DNSTCP(dns=DNS(rd=1, qd=DNSQR(qname="www.example.com"))), timeout=3) |
| sck.close() |
| assert DNSTCP in r and len(r.dns.an) |
| |
| ############ |
| + Tests of SSLStreamSocket |
| |
| = Test with recv() calls that return exact packet-length strings |
| ~ sslraweamsocket |
| |
| import socket |
| class MockSocket(object): |
| def __init__(self): |
| self.l = [ b'\x00\x00\x00\x01', b'\x00\x00\x00\x02', b'\x00\x00\x00\x03' ] |
| def recv(self, x): |
| if len(self.l) == 0: |
| raise socket.error(100, 'EOF') |
| return self.l.pop(0) |
| def fileno(self): |
| return -1 |
| def close(self): |
| return |
| |
| |
| class TestPacket(Packet): |
| name = 'TestPacket' |
| fields_desc = [ |
| IntField('data', 0) |
| ] |
| def guess_payload_class(self, p): |
| return conf.padding_layer |
| |
| s = MockSocket() |
| ss = SSLStreamSocket(s, basecls=TestPacket) |
| |
| p = ss.recv() |
| assert p.data == 1 |
| p = ss.recv() |
| assert p.data == 2 |
| p = ss.recv() |
| assert p.data == 3 |
| try: |
| ss.recv() |
| ret = False |
| except socket.error: |
| ret = True |
| |
| assert ret |
| |
| = Test with recv() calls that return twice as much data as the exact packet-length |
| ~ sslraweamsocket |
| |
| import socket |
| class MockSocket(object): |
| def __init__(self): |
| self.l = [ b'\x00\x00\x00\x01\x00\x00\x00\x02', b'\x00\x00\x00\x03\x00\x00\x00\x04' ] |
| def recv(self, x): |
| if len(self.l) == 0: |
| raise socket.error(100, 'EOF') |
| return self.l.pop(0) |
| def fileno(self): |
| return -1 |
| def close(self): |
| return |
| |
| |
| class TestPacket(Packet): |
| name = 'TestPacket' |
| fields_desc = [ |
| IntField('data', 0) |
| ] |
| def guess_payload_class(self, p): |
| return conf.padding_layer |
| |
| s = MockSocket() |
| ss = SSLStreamSocket(s, basecls=TestPacket) |
| |
| p = ss.recv() |
| assert p.data == 1 |
| p = ss.recv() |
| assert p.data == 2 |
| p = ss.recv() |
| assert p.data == 3 |
| p = ss.recv() |
| assert p.data == 4 |
| try: |
| ss.recv() |
| ret = False |
| except socket.error: |
| ret = True |
| |
| assert ret |
| |
| = Test with recv() calls that return not enough data |
| ~ sslraweamsocket |
| |
| import socket |
| class MockSocket(object): |
| def __init__(self): |
| self.l = [ b'\x00\x00', b'\x00\x01', b'\x00\x00\x00', b'\x02', b'\x00\x00', b'\x00', b'\x03' ] |
| def recv(self, x): |
| if len(self.l) == 0: |
| raise socket.error(100, 'EOF') |
| return self.l.pop(0) |
| def fileno(self): |
| return -1 |
| def close(self): |
| return |
| |
| |
| class TestPacket(Packet): |
| name = 'TestPacket' |
| fields_desc = [ |
| IntField('data', 0) |
| ] |
| def guess_payload_class(self, p): |
| return conf.padding_layer |
| |
| s = MockSocket() |
| ss = SSLStreamSocket(s, basecls=TestPacket) |
| |
| try: |
| p = ss.recv() |
| ret = False |
| except: |
| ret = True |
| |
| assert ret |
| p = ss.recv() |
| assert p.data == 1 |
| try: |
| p = ss.recv() |
| ret = False |
| except: |
| ret = True |
| |
| assert ret |
| p = ss.recv() |
| assert p.data == 2 |
| try: |
| p = ss.recv() |
| ret = False |
| except: |
| ret = True |
| |
| assert ret |
| try: |
| p = ss.recv() |
| ret = False |
| except: |
| ret = True |
| |
| assert ret |
| p = ss.recv() |
| assert p.data == 3 |
| |
| |
| ############ |
| ############ |
| + Test correct conversion from binary to string of IPv6 addresses |
| |
| = IPv6 bin to string conversion |
| from scapy.pton_ntop import _inet6_ntop, inet_ntop |
| import socket |
| for binfrm, address in [ |
| (b'\x00' * 16, '::'), |
| (b'\x11\x11\x22\x22\x33\x33\x44\x44\x55\x55\x66\x66\x77\x77\x88\x88', |
| '1111:2222:3333:4444:5555:6666:7777:8888'), |
| (b'\x11\x11\x22\x22\x33\x33\x44\x44\x55\x55\x00\x00\x00\x00\x00\x00', |
| '1111:2222:3333:4444:5555::'), |
| (b'\x00\x00\x00\x00\x00\x00\x44\x44\x55\x55\x66\x66\x77\x77\x88\x88', |
| '::4444:5555:6666:7777:8888'), |
| (b'\x00\x00\x00\x00\x33\x33\x44\x44\x00\x00\x00\x00\x00\x00\x88\x88', |
| '0:0:3333:4444::8888'), |
| (b'\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00', |
| '1::'), |
| (b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01', |
| '::1'), |
| (b'\x11\x11\x00\x00\x00\x00\x44\x44\x00\x00\x00\x00\x77\x77\x88\x88', |
| '1111::4444:0:0:7777:8888'), |
| (b'\x10\x00\x02\x00\x00\x30\x00\x04\x00\x05\x00\x60\x07\x00\x80\x00', |
| '1000:200:30:4:5:60:700:8000'), |
| ]: |
| addr1 = inet_ntop(socket.AF_INET6, binfrm) |
| addr2 = _inet6_ntop(binfrm) |
| assert address == addr1 == addr2 |
| |
| = IPv6 bin to string conversion - Zero-block of length 1 |
| binfrm = b'\x11\x11\x22\x22\x33\x33\x44\x44\x55\x55\x66\x66\x00\x00\x88\x88' |
| addr1, addr2 = inet_ntop(socket.AF_INET6, binfrm), _inet6_ntop(binfrm) |
| # On Mac OS socket.inet_ntop is not fully compliant with RFC 5952 and |
| # shortens the single zero block to '::'. This is a valid IPv6 address |
| # representation anyway. |
| assert(addr1 in ['1111:2222:3333:4444:5555:6666:0:8888', |
| '1111:2222:3333:4444:5555:6666::8888']) |
| assert addr2 == '1111:2222:3333:4444:5555:6666:0:8888' |
| |
| = IPv6 bin to string conversion - Illegal sizes |
| for binfrm in ["\x00" * 15, b"\x00" * 17]: |
| rc = False |
| try: |
| inet_ntop(socket.AF_INET6, binfrm) |
| except Exception as exc1: |
| _exc1 = exc1 |
| rc = True |
| assert rc |
| try: |
| _inet6_ntop(binfrm) |
| except Exception as exc2: |
| rc = isinstance(exc2, type(_exc1)) |
| assert rc |
| |
| |
| ############ |
| ############ |
| + Addresses generators |
| |
| = Net |
| |
| assert list(Net("192.168.0.0/31")) == ["192.168.0.0", "192.168.0.1"] |
| |
| assert "1.2.3.4" in Net("0.0.0.0/0") |
| |
| assert "192.168.0.0/25" in Net("192.168.0.0/24") |
| |
| assert "192.168.0.0/23" not in Net("192.168.0.0/24") |
| |
| assert "0.0.0.0/1" in Net("0.0.0.0/0") |
| |
| assert "0.0.0.0/0" not in Net("0.0.0.0/1") |
| |
| assert Net("1.2.3.0/24") == Net("1.2.3.0", "1.2.3.255") |
| |
| assert hash(Net("1.2.3.0/24")) == hash(Net("1.2.3.0", "1.2.3.255")) |
| |
| = Net using name |
| ~ netaccess |
| |
| ip = IP(dst="www.google.com") |
| n1 = ip.dst |
| assert isinstance(n1, Net) |
| ip.show() |
| |
| = Multiple IP addresses test |
| ~ netaccess |
| |
| ip = IP(dst=['192.168.0.1', 'www.google.fr'],ihl=(1,5)) |
| assert ip.dst[0] == '192.168.0.1' |
| assert isinstance(ip.dst[1], Net) |
| src = ip.src |
| assert src |
| assert isinstance(src, str) |
| |
| = OID |
| |
| oid = OID("1.2.3.4.5.6-8") |
| sum(1 for o in oid) == 3 |
| assert oid.__iterlen__() == 3 |
| |
| = Net6 |
| |
| n1 = Net6("2001:db8::/127") |
| assert len(list(n1)) == 2 |
| assert len(n1) == 2 |
| |
| n2 = Net6("fec0::/110") |
| assert len(n2) == 262144 |
| |
| assert "ffff::ffff" in Net6("::/0") |
| |
| assert "::/1" in Net6("::/0") |
| |
| assert "::/0" not in Net6("::/1") |
| |
| assert Net6("::/120") == Net6("::", "::ff") |
| |
| assert hash(Net6("::/120")) == hash(Net6("::", "::ff")) |
| |
| assert Net6("::1.2.3.0/120") == Net6("::1.2.3.0", "::1.2.3.255") |
| |
| assert hash(Net6("::1.2.3.0/120")) == hash(Net6("::1.2.3.0", "::1.2.3.255")) |
| |
| assert Net6("::1.2.3.0/120") != Net("1.2.3.0/24") |
| |
| assert hash(Net6("::1.2.3.0/120")) != hash(Net("1.2.3.0/24")) |
| |
| = Net6 using web address |
| ~ netaccess ipv6 |
| |
| ip = IPv6(dst="www.google.com") |
| n1 = ip.dst |
| assert isinstance(n1, Net6) |
| assert "www.google.com" in repr(n1) |
| ip.show() |
| |
| ip = IPv6(dst="www.yahoo.com") |
| assert IPv6(raw(ip)).dst == [p.dst for p in ip][0] |
| |
| = Multiple IPv6 addresses test |
| ~ netaccess ipv6 |
| |
| ip = IPv6(dst=['2001:db8::1', 'www.google.fr'],hlim=(1,5)) |
| assert ip.dst[0] == '2001:db8::1' |
| assert isinstance(ip.dst[1], Net6) |
| src = ip.src |
| assert src |
| assert isinstance(src, str) |
| |
| = Test repr on Net |
| ~ netaccess |
| |
| conf.color_theme = BlackAndWhite() |
| output = repr(IP(src="www.google.com")) |
| assert 'Net("www.google.com/32")' in output |
| |
| = Test repr on Net6 |
| ~ netaccess ipv6 |
| |
| conf.color_theme = BlackAndWhite() |
| assert 'Net6("www.google.com/128")' in repr(IPv6(src="www.google.com")) |
| |
| ############ |
| ############ |
| + IPv6 helpers |
| |
| = in6_getLocalUniquePrefix() |
| |
| p = in6_getLocalUniquePrefix() |
| len(inet_pton(socket.AF_INET6, p)) == 16 and p.startswith("fd") |
| |
| = Misc addresses manipulation functions |
| |
| # NOTE(review): every comparison in this test is a bare expression, none is |
| # wrapped in "assert". Presumably only the value of the final expression can |
| # influence the test result - confirm against the UTscapy runner, and |
| # consider asserting the intermediate comparisons. |
| teredoAddrExtractInfo("2001:0:0a0b:0c0d:0028:f508:f508:08f5") == ("10.11.12.13", 40, "10.247.247.10", 2807) |
| |
| ip6 = IP6Field("test", None) |
| ip6.i2repr("", "2001:0:0a0b:0c0d:0028:f508:f508:08f5") == "2001:0:0a0b:0c0d:0028:f508:f508:08f5 [Teredo srv: 10.11.12.13 cli: 10.247.247.10:2807]" |
| ip6.i2repr("", "2002:0102:0304::1") == "2002:0102:0304::1 [6to4 GW: 1.2.3.4]" |
| |
| in6_iseui64("fe80::bae8:58ff:fed4:e5f6") == True |
| |
| in6_isanycast("2001:db8::fdff:ffff:ffff:ff80") == True |
| |
| a = inet_pton(socket.AF_INET6, "2001:db8::2807") |
| in6_xor(a, a) == b"\x00" * 16 |
| |
| a = inet_pton(socket.AF_INET6, "fe80::bae8:58ff:fed4:e5f6") |
| r = inet_ntop(socket.AF_INET6, in6_getnsma(a)) |
| r == "ff02::1:ffd4:e5f6" |
| |
| in6_isllsnmaddr(r) == True |
| |
| in6_isdocaddr("2001:db8::2807") == True |
| |
| in6_isaddrllallnodes("ff02::1") == True |
| |
| in6_isaddrllallservers("ff02::2") == True |
| |
| = in6_getscope() |
| |
| assert in6_getscope("2001:db8::2807") == IPV6_ADDR_GLOBAL |
| assert in6_getscope("fec0::2807") == IPV6_ADDR_SITELOCAL |
| assert in6_getscope("fe80::2807") == IPV6_ADDR_LINKLOCAL |
| assert in6_getscope("ff02::2807") == IPV6_ADDR_LINKLOCAL |
| assert in6_getscope("ff0e::2807") == IPV6_ADDR_GLOBAL |
| assert in6_getscope("ff05::2807") == IPV6_ADDR_SITELOCAL |
| assert in6_getscope("ff01::2807") == IPV6_ADDR_LOOPBACK |
| assert in6_getscope("::1") == IPV6_ADDR_LOOPBACK |
| |
| = construct_source_candidate_set() |
| |
| dev_addresses = [('fe80::', IPV6_ADDR_LINKLOCAL, "linklocal"),('fec0::', IPV6_ADDR_SITELOCAL, "sitelocal"),('ff0e::', IPV6_ADDR_GLOBAL, "global")] |
| |
| assert construct_source_candidate_set("2001:db8::2807", 0, dev_addresses) == ["ff0e::"] |
| assert construct_source_candidate_set("fec0::2807", 0, dev_addresses) == ["fec0::"] |
| assert construct_source_candidate_set("fe80::2807", 0, dev_addresses) == ["fe80::"] |
| assert construct_source_candidate_set("ff02::2807", 0, dev_addresses) == ["fe80::"] |
| assert construct_source_candidate_set("ff0e::2807", 0, dev_addresses) == ["ff0e::"] |
| assert construct_source_candidate_set("ff05::2807", 0, dev_addresses) == ["fec0::"] |
| assert construct_source_candidate_set("ff01::2807", 0, dev_addresses) == ["::1"] |
| assert construct_source_candidate_set("::", 0, dev_addresses) == ["ff0e::"] |
| |
| = inet_pton() |
| |
| from scapy.pton_ntop import _inet6_pton, inet_pton |
| import socket |
| |
| ip6_bad_addrs = ["fe80::2e67:ef2d:7eca::ed8a", |
| "fe80:1234:abcd::192.168.40.12:abcd", |
| "fe80:1234:abcd::192.168.40", |
| "fe80:1234:abcd::192.168.400.12", |
| "1234:5678:9abc:def0:1234:5678:9abc:def0:", |
| "1234:5678:9abc:def0:1234:5678:9abc:def0:1234"] |
| for ip6 in ip6_bad_addrs: |
| rc = False |
| exc1 = None |
| try: |
| res1 = inet_pton(socket.AF_INET6, ip6) |
| except Exception as e: |
| rc = True |
| exc1 = e |
| assert rc |
| rc = False |
| try: |
| res2 = _inet6_pton(ip6) |
| except Exception as exc2: |
| rc = isinstance(exc2, type(exc1)) |
| assert rc |
| |
| ip6_good_addrs = [("fe80:1234:abcd::192.168.40.12", |
| b'\xfe\x80\x124\xab\xcd\x00\x00\x00\x00\x00\x00\xc0\xa8(\x0c'), |
| ("fe80:1234:abcd::fe06", |
| b'\xfe\x80\x124\xab\xcd\x00\x00\x00\x00\x00\x00\x00\x00\xfe\x06'), |
| ("fe80::2e67:ef2d:7ece:ed8a", |
| b'\xfe\x80\x00\x00\x00\x00\x00\x00.g\xef-~\xce\xed\x8a'), |
| ("::ffff", |
| b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff'), |
| ("ffff::", |
| b'\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'), |
| ('::', b'\x00' * 16)] |
| for ip6, res in ip6_good_addrs: |
| res1 = inet_pton(socket.AF_INET6, ip6) |
| res2 = _inet6_pton(ip6) |
| assert res == res1 == res2 |
| |
| |
| ############ |
| ############ |
| + Test Route class |
| |
| = make_route() |
| |
| r4 = Route() |
| tmp_route = r4.make_route(host="10.12.13.14") |
| (tmp_route[0], tmp_route[1], tmp_route[2]) == (168561934, 4294967295, '0.0.0.0') |
| |
| tmp_route = r4.make_route(net="10.12.13.0/24") |
| (tmp_route[0], tmp_route[1], tmp_route[2]) == (168561920, 4294967040, '0.0.0.0') |
| |
| = add() & delt() |
| |
| r4 = Route() |
| len_r4 = len(r4.routes) |
| r4.add(net="192.168.1.0/24", gw="1.2.3.4") |
| len(r4.routes) == len_r4 + 1 |
| r4.delt(net="192.168.1.0/24", gw="1.2.3.4") |
| len(r4.routes) == len_r4 |
| |
| = ifchange() |
| |
| r4.add(net="192.168.1.0/24", gw="1.2.3.4", dev=get_dummy_interface()) |
| r4.ifchange(get_dummy_interface(), "5.6.7.8") |
| r4.routes[-1][4] == "5.6.7.8" |
| |
| = ifdel() |
| |
| r4.ifdel(get_dummy_interface()) |
| len(r4.routes) == len_r4 |
| |
| = ifadd() & get_if_bcast() |
| |
| r4 = Route() |
| len_r4 = len(r4.routes) |
| |
| r4.ifadd(get_dummy_interface(), "1.2.3.4/24") |
| len(r4.routes) == len_r4 +1 |
| |
| r4.get_if_bcast(get_dummy_interface()) == "1.2.3.255" |
| |
| r4.ifdel(get_dummy_interface()) |
| len(r4.routes) == len_r4 |
| |
| dummy_interface = get_dummy_interface() |
| |
| bck_conf_route_routes = conf.route.routes |
| conf.route.routes = [ |
| (0, 0, '172.21.230.1', dummy_interface, '172.21.230.10', 1), # 0.0.0.0 / 0.0.0.0 == 255.255.255.255 |
| (2851995648, 4294901760, '0.0.0.0', dummy_interface, '172.21.230.10', 1), # 169.254.0.0 / 255.255.0.0 == 169.254.255.255 |
| (2887116288, 4294967040, '0.0.0.0', dummy_interface, '172.21.230.10', 1), # 172.21.230.0 / 255.255.255.0 == 172.21.230.255 |
| (2887116289, 4294967295, '0.0.0.0', dummy_interface, '172.21.230.10', 1), # 172.21.230.1 / 255.255.255.255 == 172.21.230.1 |
| (3758096384, 4026531840, '0.0.0.0', dummy_interface, '172.21.230.10', 1), # 224.0.0.0 / 240.0.0.0 == 239.255.255.255 |
| (3758096635, 4294967295, '0.0.0.0', dummy_interface, '172.21.230.10', 1), # 224.0.0.251 / 255.255.255.255 == 224.0.0.251 |
| (4294967295, 4294967295, '0.0.0.0', dummy_interface, '172.21.230.10', 1), # 255.255.255.255 / 255.255.255.255 == 255.255.255.255 |
| ] |
| |
| assert sorted(conf.route.get_if_bcast(dummy_interface)) == sorted(['169.254.255.255', '172.21.230.255', '239.255.255.255']) |
| conf.route.routes = bck_conf_route_routes |
| |
| = Remove dummy interface |
| |
| conf.ifaces.reload() |
| |
| ############ |
| ############ |
| + Flags |
| |
| = IP flags |
| ~ IP |
| |
| pkt = IP(flags="MF") |
| assert pkt.flags.MF |
| assert not pkt.flags.DF |
| assert not pkt.flags.evil |
| assert repr(pkt.flags) == '<Flag 1 (MF)>' |
| pkt.flags.MF = 0 |
| pkt.flags.DF = 1 |
| assert not pkt.flags.MF |
| assert pkt.flags.DF |
| assert not pkt.flags.evil |
| assert repr(pkt.flags) == '<Flag 2 (DF)>' |
| pkt.flags |= 'evil+MF' |
| pkt.flags &= 'DF+MF' |
| assert pkt.flags.MF |
| assert pkt.flags.DF |
| assert not pkt.flags.evil |
| assert repr(pkt.flags) == '<Flag 3 (MF+DF)>' |
| |
| pkt = IP(flags=3) |
| assert pkt.flags.MF |
| assert pkt.flags.DF |
| assert not pkt.flags.evil |
| assert repr(pkt.flags) == '<Flag 3 (MF+DF)>' |
| pkt.flags = 6 |
| assert not pkt.flags.MF |
| assert pkt.flags.DF |
| assert pkt.flags.evil |
| assert repr(pkt.flags) == '<Flag 6 (DF+evil)>' |
| |
| assert len({IP().flags, IP().flags}) == 1 |
| |
| pkt = IP() |
| pkt.flags = "" |
| assert pkt.flags == 0 |
| |
| = TCP flags |
| ~ TCP |
| |
| pkt = TCP(flags="SA") |
| assert pkt.flags == 18 |
| assert pkt.flags.S |
| assert pkt.flags.A |
| assert pkt.flags.SA |
| assert not any(getattr(pkt.flags, f) for f in 'FRPUECN') |
| assert repr(pkt.flags) == '<Flag 18 (SA)>' |
| pkt.flags.U = True |
| pkt.flags.S = False |
| assert pkt.flags.A |
| assert pkt.flags.U |
| assert pkt.flags.AU |
| assert not any(getattr(pkt.flags, f) for f in 'FSRPECN') |
| assert repr(pkt.flags) == '<Flag 48 (AU)>' |
| pkt.flags &= 'SFA' |
| pkt.flags |= 'P' |
| assert pkt.flags.P |
| assert pkt.flags.A |
| assert pkt.flags.PA |
| assert not any(getattr(pkt.flags, f) for f in 'FSRUECN') |
| |
| pkt = TCP(flags=56) |
| assert all(getattr(pkt.flags, f) for f in 'PAU') |
| assert pkt.flags.PAU |
| assert not any(getattr(pkt.flags, f) for f in 'FSRECN') |
| assert repr(pkt.flags) == '<Flag 56 (PAU)>' |
| pkt.flags = 50 |
| assert all(getattr(pkt.flags, f) for f in 'SAU') |
| assert pkt.flags.SAU |
| assert not any(getattr(pkt.flags, f) for f in 'FRPECN') |
| assert repr(pkt.flags) == '<Flag 50 (SAU)>' |
| |
| = Flag values mutation with .raw_packet_cache |
| ~ IP TCP |
| |
| pkt = IP(raw(IP(flags="MF")/TCP(flags="SA"))) |
| assert pkt.raw_packet_cache is not None |
| assert pkt[TCP].raw_packet_cache is not None |
| assert pkt.flags.MF |
| assert not pkt.flags.DF |
| assert not pkt.flags.evil |
| assert repr(pkt.flags) == '<Flag 1 (MF)>' |
| assert pkt[TCP].flags.S |
| assert pkt[TCP].flags.A |
| assert pkt[TCP].flags.SA |
| assert not any(getattr(pkt[TCP].flags, f) for f in 'FRPUECN') |
| assert repr(pkt[TCP].flags) == '<Flag 18 (SA)>' |
| pkt.flags.MF = 0 |
| pkt.flags.DF = 1 |
| pkt[TCP].flags.U = True |
| pkt[TCP].flags.S = False |
| pkt = IP(raw(pkt)) |
| assert not pkt.flags.MF |
| assert pkt.flags.DF |
| assert not pkt.flags.evil |
| assert repr(pkt.flags) == '<Flag 2 (DF)>' |
| assert pkt[TCP].flags.A |
| assert pkt[TCP].flags.U |
| assert pkt[TCP].flags.AU |
| assert not any(getattr(pkt[TCP].flags, f) for f in 'FSRPECN') |
| assert repr(pkt[TCP].flags) == '<Flag 48 (AU)>' |
| |
| = Operations on flag values |
| ~ TCP |
| |
| p1, p2 = TCP(flags="SU"), TCP(flags="AU") |
| assert (p1.flags & p2.flags).U |
| assert not any(getattr(p1.flags & p2.flags, f) for f in 'FSRPAECN') |
| assert all(getattr(p1.flags | p2.flags, f) for f in 'SAU') |
| assert (p1.flags | p2.flags).SAU |
| assert not any(getattr(p1.flags | p2.flags, f) for f in 'FRPECN') |
| |
| assert TCP(flags="SA").flags & TCP(flags="S").flags == TCP(flags="S").flags |
| assert TCP(flags="SA").flags | TCP(flags="S").flags == TCP(flags="SA").flags |
| |
| |
| ############ |
| ############ |
| + 802.3 |
| |
| = Test detection |
| |
| assert isinstance(Dot3(raw(Ether())),Ether) |
| assert isinstance(Ether(raw(Dot3())),Dot3) |
| |
| a = Ether(b'\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00') |
| assert isinstance(a,Dot3) |
| assert a.dst == 'ff:ff:ff:ff:ff:ff' |
| assert a.src == '00:00:00:00:00:00' |
| |
| a = Dot3(b'\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x90\x00') |
| assert isinstance(a,Ether) |
| assert a.dst == 'ff:ff:ff:ff:ff:ff' |
| assert a.src == '00:00:00:00:00:00' |
| |
| |
| ############ |
| ############ |
| + ASN.1 |
| |
| = MIB |
| ~ mib |
| |
| import tempfile |
| fd, fname = tempfile.mkstemp() |
| os.write(fd, b"-- MIB test\nscapy OBJECT IDENTIFIER ::= {test 2807}\n") |
| os.close(fd) |
| |
| load_mib(fname) |
| assert sum(1 for k in conf.mib.d.values() if "scapy" in k) == 1 |
| |
| assert sum(1 for oid in conf.mib) > 100 |
| |
| = MIB - graph |
| ~ mib |
| |
| import mock |
| |
| @mock.patch("scapy.asn1.mib.do_graph") |
| def get_mib_graph(do_graph): |
| def store_graph(graph, **kargs): |
| assert graph.startswith("""digraph "mib" {""") |
| assert """"test.2807" [ label="scapy" ];""" in graph |
| do_graph.side_effect = store_graph |
| conf.mib._make_graph() |
| |
| get_mib_graph() |
| |
| = MIB - test aliases |
| ~ mib |
| |
| # https://github.com/secdev/scapy/issues/2542 |
| assert conf.mib._oidname("2.5.29.19") == "basicConstraints" |
| |
| = DADict tests |
| |
| a = DADict("test") |
| a[0] = "test_value1" |
| a["scapy"] = "test_value2" |
| |
| assert a.test_value1 == 0 |
| assert a.test_value2 == "scapy" |
| |
| with ContextManagerCaptureOutput() as cmco: |
| a._show() |
| outp = cmco.get_output() |
| |
| assert "scapy = 'test_value2'" in outp |
| assert "0 = 'test_value1'" in outp |
| |
| = Test ETHER_TYPES |
| |
| assert ETHER_TYPES.IPv4 == 2048 |
| try: |
| import warnings |
| |
| with warnings.catch_warnings(record=True) as w: |
| warnings.simplefilter("always") |
| ETHER_TYPES["BAOBAB"] = 0xffff |
| assert ETHER_TYPES.BAOBAB == 0xffff |
| assert issubclass(w[-1].category, DeprecationWarning) |
| except DeprecationWarning: |
| # -Werror is used |
| pass |
| |
| = BER tests |
| |
| BER_id_enc(42) == '*' |
| BER_id_enc(2807) == b'\xbfw' |
| |
| b = BERcodec_IPADDRESS() |
| r1 = b.enc("8.8.8.8") |
| r1 == b'@\x04\x08\x08\x08\x08' |
| |
| r2 = b.dec(r1)[0] |
| r2.val == '8.8.8.8' |
| |
| a = b'\x1f\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\x01\x01\x00C\x02\x01U0\x0f0\r\x06\x08+\x06\x01\x02\x01\x02\x01\x00\x02\x01!' |
| ret = False |
| try: |
| BERcodec_Object.check_type(a) |
| except BER_BadTag_Decoding_Error: |
| ret = True |
| else: |
| ret = False |
| |
| assert ret |
| |
| = BER trigger failures |
| |
| try: |
| BERcodec_INTEGER.do_dec(b"\x02\x01") |
| assert False |
| except BER_Decoding_Error: |
| pass |
| |
| |
| ############ |
| ############ |
| + Fields |
| |
| = FieldLenField with BitField |
| class Test(Packet): |
| name = "Test" |
| fields_desc = [ |
| FieldLenField("BitCount", None, fmt="H", count_of="Values"), |
| FieldLenField("ByteCount", None, fmt="B", length_of="Values"), |
| FieldListField("Values", [], BitField("data", 0x0, size=1), |
| count_from=lambda pkt: pkt.BitCount), |
| ] |
| |
| pkt = Test(raw(Test(Values=[0, 0, 0, 0, 1, 1, 1, 1]))) |
| assert pkt.BitCount == 8 |
| assert pkt.ByteCount == 1 |
| |
| = PacketListField |
| |
| class TestPacket(Packet): |
| name = 'TestPacket' |
| fields_desc = [ PacketListField('list', [], 0) ] |
| |
| a = TestPacket() |
| a.list.append(1) |
| assert len(a.list) == 1 |
| |
| b = TestPacket() |
| assert len(b.list) == 0 |
| |
| = Test PacketListField deepcopy |
| class SubPacket(Packet): |
| name = "SubPacket" |
| fields_desc = [ |
| ByteField("mem", 1), |
| ] |
| |
| class TestPacket(Packet): |
| name = "TestPacket" |
| fields_desc = [ |
| PacketListField("packlist", SubPacket(), SubPacket), |
| ] |
| |
| a = TestPacket() |
| b = a.copy() |
| fuzz(b) |
| assert a.packlist[0].mem == 1 |
| |
| = PacketField |
| |
| class InnerPacket(Packet): |
| fields_desc = [ StrField("f_name", "test") ] |
| |
| class TestPacket(Packet): |
| fields_desc = [ PacketField("inner", InnerPacket(), InnerPacket) ] |
| |
| p = TestPacket() |
| print(p.inner.f_name) |
| assert p.inner.f_name == b"test" |
| |
| p = TestPacket() |
| p.inner.f_name = b"scapy" |
| assert p.inner.f_name == b"scapy" |
| |
| p = TestPacket() |
| assert p.inner.f_name == b"test" |
| |
| + UUIDField |
| |
| = Parsing a human-readable UUID |
| f = UUIDField('f', '01234567-89ab-cdef-0123-456789abcdef') |
| f.addfield(None, b'', f.default) == hex_bytes('0123456789abcdef0123456789abcdef') |
| |
| = Parsing a machine-encoded UUID |
| f = UUIDField('f', bytearray.fromhex('0123456789abcdef0123456789abcdef')) |
| f.addfield(None, b'', f.default) == hex_bytes('0123456789abcdef0123456789abcdef') |
| |
| = Parsing a tuple of values |
| f = UUIDField('f', (0x01234567, 0x89ab, 0xcdef, 0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef)) |
| f.addfield(None, b'', f.default) == hex_bytes('0123456789abcdef0123456789abcdef') |
| |
| = Handle None values |
| f = UUIDField('f', None) |
| f.addfield(None, b'', f.default) == hex_bytes('00000000000000000000000000000000') |
| |
| = Get a UUID for dissection |
| from uuid import UUID |
| f = UUIDField('f', None) |
| f.getfield(None, bytearray.fromhex('0123456789abcdef0123456789abcdef01')) == (b'\x01', UUID('01234567-89ab-cdef-0123-456789abcdef')) |
| |
| = Verify little endian UUIDField |
| * The endianness of a UUIDField should be applied block by block, to each parenthesized block in '(01234567)-(89ab)-(cdef)-(01)(23)-(45)(67)(89)(ab)(cd)(ef)' |
| f = UUIDField('f', '01234567-89ab-cdef-0123-456789abcdef', uuid_fmt=UUIDField.FORMAT_LE) |
| f.addfield(None, b'', f.default) == hex_bytes('67452301ab89efcd0123456789abcdef') |
| |
| = Verify reversed UUIDField |
| * This should reverse the entire value as 128-bits |
| f = UUIDField('f', '01234567-89ab-cdef-0123-456789abcdef', uuid_fmt=UUIDField.FORMAT_REV) |
| f.addfield(None, b'', f.default) == hex_bytes('efcdab8967452301efcdab8967452301') |
| |
| + RandUUID |
| |
| = RandUUID setup |
| |
| RANDUUID_TEMPLATE = '01234567-89ab-*-01*-*****ef' |
| RANDUUID_FIXED = uuid.uuid4() |
| |
| = RandUUID default behaviour |
| |
| ru = RandUUID() |
| assert ru._fix().version == 4 |
| assert ru.command() == "RandUUID()" |
| |
| = RandUUID incorrect implicit args |
| |
| assert expect_exception(ValueError, lambda: RandUUID(node=0x1234, name="scapy")) |
| assert expect_exception(ValueError, lambda: RandUUID(node=0x1234, namespace=uuid.uuid4())) |
| assert expect_exception(ValueError, lambda: RandUUID(clock_seq=0x1234, name="scapy")) |
| assert expect_exception(ValueError, lambda: RandUUID(clock_seq=0x1234, namespace=uuid.uuid4())) |
| assert expect_exception(ValueError, lambda: RandUUID(name="scapy")) |
| assert expect_exception(ValueError, lambda: RandUUID(namespace=uuid.uuid4())) |
| |
| = RandUUID v4 UUID (correct args) |
| |
| u = RandUUID(version=4)._fix() |
| assert u.version == 4 |
| |
| u2 = RandUUID(version=4)._fix() |
| assert u2.version == 4 |
| |
| assert str(u) != str(u2) |
| |
| = RandUUID v4 UUID (incorrect args) |
| |
| assert expect_exception(ValueError, lambda: RandUUID(version=4, template=RANDUUID_TEMPLATE)) |
| assert expect_exception(ValueError, lambda: RandUUID(version=4, node=0x1234)) |
| assert expect_exception(ValueError, lambda: RandUUID(version=4, clock_seq=0x1234)) |
| assert expect_exception(ValueError, lambda: RandUUID(version=4, namespace=uuid.uuid4())) |
| assert expect_exception(ValueError, lambda: RandUUID(version=4, name="scapy")) |
| |
| = RandUUID v1 UUID |
| |
| u = RandUUID(version=1)._fix() |
| assert u.version in [1, 4] |
| |
| u = RandUUID(version=1, node=0x1234)._fix() |
| assert u.version == 1 |
| assert u.node == 0x1234 |
| |
| u = RandUUID(version=1, clock_seq=0x1234)._fix() |
| assert u.version == 1 |
| assert u.clock_seq == 0x1234 |
| |
| ru = RandUUID(version=1, node=0x1234, clock_seq=0x1bcd) |
| assert ru.command() == "RandUUID(node=4660, clock_seq=7117, version=1)" |
| u = ru._fix() |
| assert u.version == 1 |
| assert u.node == 0x1234 |
| assert u.clock_seq == 0x1bcd |
| |
| = RandUUID v1 UUID (implicit version) |
| |
| u = RandUUID(node=0x1234)._fix() |
| assert u.version == 1 |
| assert u.node == 0x1234 |
| |
| u = RandUUID(clock_seq=0x1234)._fix() |
| assert u.version == 1 |
| assert u.clock_seq == 0x1234 |
| |
| u = RandUUID(node=0x1234, clock_seq=0x1bcd)._fix() |
| assert u.version == 1 |
| assert u.node == 0x1234 |
| assert u.clock_seq == 0x1bcd |
| |
| = RandUUID v1 UUID (incorrect args) |
| |
| assert expect_exception(ValueError, lambda: RandUUID(version=1, template=RANDUUID_TEMPLATE)) |
| assert expect_exception(ValueError, lambda: RandUUID(version=1, namespace=uuid.uuid4())) |
| assert expect_exception(ValueError, lambda: RandUUID(version=1, name="scapy")) |
| |
| = RandUUID v5 UUID |
| |
| ru = RandUUID(version=5, namespace=RANDUUID_FIXED, name="scapy") |
| u = ru._fix() |
| assert u.version == 5 |
| assert ru.command() == "RandUUID(namespace=%r, name='scapy', version=5)" % RANDUUID_FIXED |
| |
| u2 = RandUUID(version=5, namespace=RANDUUID_FIXED, name="scapy")._fix() |
| assert u2.version == 5 |
| assert u.bytes == u2.bytes |
| |
| # implicit v5 |
| u2 = RandUUID(namespace=RANDUUID_FIXED, name="scapy")._fix() |
| assert u.bytes == u2.bytes |
| |
| = RandUUID v5 UUID (incorrect args) |
| |
| assert expect_exception(ValueError, lambda: RandUUID(version=5, template=RANDUUID_TEMPLATE)) |
| assert expect_exception(ValueError, lambda: RandUUID(version=5, node=0x1234)) |
| assert expect_exception(ValueError, lambda: RandUUID(version=5, clock_seq=0x1234)) |
| |
| = RandUUID v3 UUID |
| |
| u = RandUUID(version=3, namespace=RANDUUID_FIXED, name="scapy")._fix() |
| assert u.version == 3 |
| |
| u2 = RandUUID(version=3, namespace=RANDUUID_FIXED, name="scapy")._fix() |
| assert u2.version == 3 |
| assert u.bytes == u2.bytes |
| |
| # implicit v5 |
| u2 = RandUUID(namespace=RANDUUID_FIXED, name="scapy")._fix() |
| assert u.bytes != u2.bytes |
| |
| = RandUUID v3 UUID (incorrect args) |
| |
| assert expect_exception(ValueError, lambda: RandUUID(version=5, template=RANDUUID_TEMPLATE)) |
| assert expect_exception(ValueError, lambda: RandUUID(version=5, node=0x1234)) |
| assert expect_exception(ValueError, lambda: RandUUID(version=5, clock_seq=0x1234)) |
| |
| = RandUUID looks like a UUID with str |
| assert re.match(r'[0-9a-f]{8}(-[0-9a-f]{4}){3}-[0-9a-f]{12}', str(RandUUID()), re.I) is not None |
| |
| = RandUUID with a static part |
| * RandUUID template can contain a static part such as 01234567-89ab-*-01*-*****ef |
| ru = RandUUID('01234567-89ab-*-01*-*****ef') |
| assert re.match(r'01234567-89ab-[0-9a-f]{4}-01[0-9a-f]{2}-[0-9a-f]{10}ef', str(ru), re.I) is not None |
| assert ru.command() == "RandUUID(template='01234567-89ab-*-01*-*****ef')" |
| |
| = RandUUID with a range part |
| * RandUUID template can contain a part with a range of values such as 01234567-89ab-*-01*-****c0:c9ef |
| assert re.match(r'01234567-89ab-[0-9a-f]{4}-01[0-9a-f]{2}-[0-9a-f]{8}c[0-9]ef', str(RandUUID('01234567-89ab-*-01*-****c0:c9ef')), re.I) is not None |
| |
| ############ |
| ############ |
| + MPLS tests |
| |
| = MPLS - build/dissection |
| from scapy.contrib.mpls import EoMCW, MPLS |
| p1 = MPLS()/IP()/UDP() |
| assert p1[MPLS].s == 1 |
| p2 = MPLS()/MPLS()/IP()/UDP() |
| assert p2[MPLS].s == 0 |
| |
| p1[MPLS] |
| p1[IP] |
| p2[MPLS] |
| p2[MPLS:1] |
| p2[IP] |
| |
| = MPLS encapsulated Ethernet with CW - build/dissection |
| # Build Ether / MPLS / EoMCW / Ether / IP, then re-dissect it from its raw bytes. |
| p = Ether(dst="11:11:11:11:11:11", src="22:22:22:22:22:22") |
| p /= MPLS(label=1)/EoMCW(seq=1234) |
| p /= Ether(dst="33:33:33:33:33:33", src="44:44:44:44:44:44")/IP() |
| p = Ether(raw(p)) |
| # After the round trip an EoMCW layer must be present, with the sequence number that was set. |
| assert p[EoMCW].zero == 0 |
| assert p[EoMCW].reserved == 0 |
| assert p[EoMCW].seq == 1234 |
| |
| = MPLS encapsulated Ethernet without CW - build/dissection |
| # Build Ether / MPLS / MPLS / Ether / IP (no EoMCW layer), then re-dissect it from its raw bytes. |
| p = Ether(dst="11:11:11:11:11:11", src="22:22:22:22:22:22") |
| p /= MPLS(label=2)/MPLS(label=1) |
| p /= Ether(dst="33:33:33:33:33:33", src="44:44:44:44:44:44")/IP() |
| p = Ether(raw(p)) |
| # The second Ether layer must be found and must carry type 0x0800. |
| assert p[Ether:2].type == 0x0800 |
| |
| # Looking up EoMCW must raise IndexError: no control word was added to this packet. |
| try: |
| p[EoMCW] |
| except IndexError: |
| ret = True |
| else: |
| ret = False |
| |
| assert ret |
| # Same check as before the lookup, repeated after it. |
| assert p[Ether:2].type == 0x0800 |
| |
| = MPLS encapsulated IP - build/dissection |
| # Build Ether / MPLS / IP, then re-dissect it from its raw bytes. |
| p = Ether(dst="11:11:11:11:11:11", src="22:22:22:22:22:22") |
| p /= MPLS(label=1)/IP() |
| p = Ether(raw(p)) |
| |
| # The dissected packet must not contain an EoMCW layer ... |
| try: |
| p[EoMCW] |
| except IndexError: |
| ret = True |
| else: |
| ret = False |
| |
| assert ret |
| |
| # ... nor a second Ether layer. |
| try: |
| p[Ether:2] |
| except IndexError: |
| ret = True |
| else: |
| ret = False |
| |
| assert ret |
| |
| # The IP lookup has no assertion; it only needs to succeed without raising. |
| p[IP] |
| |
| |
| ############ |
| ############ |
| + PacketList methods |
| |
| = sr() |
| |
| # Request packet with a single one-byte field; it never answers anything. |
| class Req(Packet): |
| fields_desc = [ |
| ByteField("raw", 0) |
| ] |
| def answers(self, other): |
| return False |
| |
| # Response packet with a single one-byte field; it answers a Req carrying the same "raw" value. |
| class Res(Packet): |
| fields_desc = [ |
| ByteField("raw", 0) |
| ] |
| def answers(self, other): |
| return other.__class__ == Req and other.raw == self.raw |
| |
| # First list: four requests interleaved with five responses. |
| pl = PacketList([Req(b"1"), Res(b"1"), Req(b"2"), Req(b"3"), Req(b"4"), Res(b"3"), Res(b"1"), Res(b"1"), Res(b"4")]) |
| |
| # Each call returns (matched pairs, remaining packets); the expected sizes depend on the lookahead value. |
| srl, rl = pl.sr() |
| assert len(srl) == 3 |
| assert len(rl) == 3 |
| |
| srl, rl = pl.sr(lookahead=1) |
| assert len(srl) == 1 |
| assert len(rl) == 7 |
| |
| srl, rl = pl.sr(lookahead=2) |
| assert len(srl) == 2 |
| assert len(rl) == 5 |
| |
| srl, rl = pl.sr(lookahead=3) |
| assert len(srl) == 3 |
| assert len(rl) == 3 |
| |
| # Second list: a single request at index 0 whose only matching response is the last element (index 8). |
| pl = PacketList([Req(b"\x05"), Res(b"1"), Res(b"2"), Res(b"3"), Res(b"4"), Res(b"3"), Res(b"1"), Res(b"1"), Res(b"\x05")]) |
| |
| srl, rl = pl.sr(lookahead=3) |
| assert len(srl) == 0 |
| assert len(rl) == 9 |
| |
| srl, rl = pl.sr(lookahead=7) |
| assert len(srl) == 0 |
| assert len(rl) == 9 |
| |
| srl, rl = pl.sr(lookahead=8) |
| assert len(srl) == 1 |
| assert len(rl) == 7 |
| |
| # lookahead=0 and lookahead=None are expected to give the same result as lookahead=8 here. |
| srl, rl = pl.sr(lookahead=0) |
| assert len(srl) == 1 |
| assert len(rl) == 7 |
| |
| srl, rl = pl.sr(lookahead=None) |
| assert len(srl) == 1 |
| assert len(rl) == 7 |
| |
| = pickle test |
| import pickle |
| import io |
| |
| # Pair up Raw packets with sr(); four matched pairs are expected for this list. |
| srl, rl = PacketList([Raw(b"1"), Raw(b"1"), Raw(b"2"), Raw(b"3"), Raw(b"4"), Raw(b"3"), Raw(b"1"), Raw(b"1"), Raw(b"4")]).sr() |
| assert len(srl) == 4 |
| |
| f = io.BytesIO() |
| |
| # Serialize the matched pairs into the in-memory buffer, then load them back from its bytes. |
| pickle.dump(srl, f) |
| |
| unp = pickle.loads(f.getvalue()) |
| |
| # The restored list must have the same length, and the first packet of every pair must serialize to the same bytes. |
| assert len(unp) == len(srl) |
| assert all(bytes(a[0]) == bytes(b[0]) for a, b in zip(unp, srl)) |
| |
| = plot() |
| |
| import mock |
| import scapy.libs.matplot |
| |
| # scapy.libs.matplot.plt is patched and its plot() replaced by a stub that returns its data argument unchanged. |
| @mock.patch("scapy.libs.matplot.plt") |
| def test_plot(mock_plt): |
| def fake_plot(data, **kwargs): |
| return data |
| mock_plt.plot = fake_plot |
| plist = PacketList([IP(id=i)/TCP() for i in range(10)]) |
| lines = plist.plot(lambda p: (p.time, p.id)) |
| # Ten packets in, ten entries expected back. |
| assert len(lines) == 10 |
| |
| test_plot() |
| |
| = diffplot() |
| |
| import mock |
| import scapy.libs.matplot |
| |
| # scapy.libs.matplot.plt is patched and its plot() replaced by a stub that returns its data argument unchanged. |
| @mock.patch("scapy.libs.matplot.plt") |
| def test_diffplot(mock_plt): |
| def fake_plot(data, **kwargs): |
| return data |
| mock_plt.plot = fake_plot |
| plist = PacketList([IP(id=i)/TCP() for i in range(10)]) |
| # The callback receives two packets (x, y) and returns a point built from both. |
| lines = plist.diffplot(lambda x,y: (x.time, y.id-x.id)) |
| # Ten packets are expected to yield nine entries. |
| assert len(lines) == 9 |
| |
| test_diffplot() |
| |
| = multiplot() |
| |
| import mock |
| import scapy.libs.matplot |
| |
| # scapy.libs.matplot.plt is patched and its plot() replaced by a stub that returns its data argument unchanged. |
| @mock.patch("scapy.libs.matplot.plt") |
| def test_multiplot(mock_plt): |
| def fake_plot(data, **kwargs): |
| return data |
| mock_plt.plot = fake_plot |
| tmp = [IP(id=i)/TCP() for i in range(10)] |
| # Four 2-tuples of consecutive packets: (0,1), (2,3), (4,5), (6,7). |
| plist = PacketList([tuple(tmp[i-2:i]) for i in range(2, 10, 2)]) |
| # The callback returns (key, point); every packet here uses the default IP source, so one key is expected. |
| lines = plist.multiplot(lambda x, y: (y[IP].src, (y.time, y[IP].id))) |
| assert len(lines) == 1 |
| assert len(lines[0]) == 4 |
| |
| test_multiplot() |
| |
| = rawhexdump() |
| |
| # Capture what PacketList.rawhexdump() prints for two IP/TCP packets and check its line count and first bytes. |
| def test_rawhexdump(): |
| with ContextManagerCaptureOutput() as cmco: |
| p = PacketList([IP()/TCP() for i in range(2)]) |
| p.rawhexdump() |
| result_pl_rawhexdump = cmco.get_output() |
| assert len(result_pl_rawhexdump.split('\n')) == 7 |
| assert result_pl_rawhexdump.startswith("0000 45 00 00 28") |
| |
| test_rawhexdump() |
| |
| = hexraw() |
| |
| # Capture what PacketList.hexraw() prints for two IP/Raw packets whose payloads are "0" and "1". |
| def test_hexraw(): |
| with ContextManagerCaptureOutput() as cmco: |
| p = PacketList([IP()/Raw(str(i)) for i in range(2)]) |
| p.hexraw() |
| result_pl_hexraw = cmco.get_output() |
| assert len(result_pl_hexraw.split('\n')) == 5 |
| # 0x30 is the ASCII code of "0", the payload of the first packet. |
| assert "0000 30" in result_pl_hexraw |
| |
| test_hexraw() |
| |
| = hexdump() |
| |
| # Capture what PacketList.hexdump() prints for two IP/Raw packets whose payloads are "0" and "1". |
| def test_hexdump(): |
| with ContextManagerCaptureOutput() as cmco: |
| p = PacketList([IP()/Raw(str(i)) for i in range(2)]) |
| p.hexdump() |
| result_pl_hexdump = cmco.get_output() |
| assert len(result_pl_hexdump.split('\n')) == 7 |
| # The expected fragment ends with 0x31, the ASCII code of "1" (payload of the second packet). |
| assert "0010 7F 00 00 01 31" in result_pl_hexdump |
| |
| test_hexdump() |
| |
| = import_hexcap() |
| |
| # scapy.utils.input is patched so that import_hexcap() receives the hexdump below one line per call. |
| @mock.patch("scapy.utils.input") |
| def test_import_hexcap(mock_input): |
| data = """ |
| 0000 FF FF FF FF FF FF AA AA AA AA AA AA 08 00 45 00 ..............E. |
| 0010 00 1C 00 01 00 00 40 01 7C DE 7F 00 00 01 7F 00 ......@.|....... |
| 0020 00 01 08 00 F7 FF 00 00 00 00 .......... |
| """[1:].split("\n") |
| # [1:] drops the newline that follows the opening quotes before the text is split into lines. |
| lines = iter(data) |
| mock_input.side_effect = lambda: next(lines) |
| return import_hexcap() |
| |
| # The returned value is dissected as Ethernet and must contain the expected Ether/IP/ICMP layers. |
| pkt = test_import_hexcap() |
| pkt = Ether(pkt) |
| assert pkt[Ether].dst == "ff:ff:ff:ff:ff:ff" |
| assert pkt[IP].dst == "127.0.0.1" |
| assert ICMP in pkt |
| |
| = import_hexcap(input_string) |
| # Same hexdump as the interactive variant, but passed directly to import_hexcap() as one string. |
| data = """ |
| 0000 FF FF FF FF FF FF AA AA AA AA AA AA 08 00 45 00 ..............E. |
| 0010 00 1C 00 01 00 00 40 01 7C DE 7F 00 00 01 7F 00 ......@.|....... |
| 0020 00 01 08 00 F7 FF 00 00 00 00 .......... |
| """[1:] |
| pkt = import_hexcap(data) |
| # The returned value is dissected as Ethernet and must contain the expected Ether/IP/ICMP layers. |
| pkt = Ether(pkt) |
| assert pkt[Ether].dst == "ff:ff:ff:ff:ff:ff" |
| assert pkt[IP].dst == "127.0.0.1" |
| assert ICMP in pkt |
| |
| = padding() |
| |
| # Capture what PacketList.padding() prints for two IP packets followed by a padding layer holding "0" and "1". |
| def test_padding(): |
| with ContextManagerCaptureOutput() as cmco: |
| p = PacketList([IP()/conf.padding_layer(str(i)) for i in range(2)]) |
| p.padding() |
| result_pl_padding = cmco.get_output() |
| assert len(result_pl_padding.split('\n')) == 5 |
| # 0x30 is the ASCII code of "0", the padding of the first packet. |
| assert "0000 30" in result_pl_padding |
| |
| test_padding() |
| |
| = nzpadding() |
| |
| # Capture what PacketList.nzpadding() prints for one packet padded with "AB" and one padded with two NUL bytes. |
| def test_nzpadding(): |
| with ContextManagerCaptureOutput() as cmco: |
| p = PacketList([IP()/conf.padding_layer("AB"), IP()/conf.padding_layer("\x00\x00")]) |
| p.nzpadding() |
| result_pl_nzpadding = cmco.get_output() |
| # Three lines expected, and the "AB" padding (0x41 0x42) must appear in the output. |
| assert len(result_pl_nzpadding.split('\n')) == 3 |
| assert "0000 41 42" in result_pl_nzpadding |
| |
| test_nzpadding() |
| |
| = conversations() |
| |
| import mock |
| # scapy.plist.do_graph is patched with a stub returning its graph argument, which the assertions treat as text. |
| @mock.patch("scapy.plist.do_graph") |
| def test_conversations(mock_do_graph): |
| def fake_do_graph(graph, **kwargs): |
| return graph |
| mock_do_graph.side_effect = fake_do_graph |
| # Mix of IPv4, IPv6 and ARP packets. |
| plist = PacketList([IP(dst="127.0.0.2")/TCP(dport=i) for i in range(2)]) |
| plist.extend([IP(src="127.0.0.2")/TCP(sport=i) for i in range(2)]) |
| plist.extend([IPv6(dst="::2", src="::1")/TCP(sport=i) for i in range(2)]) |
| plist.extend([IPv6(src="::2", dst="::1")/TCP(sport=i) for i in range(2)]) |
| plist.extend([Ether()/ARP(pdst="127.0.0.1")]) |
| result_conversations = plist.conversations() |
| # The graph text must have 8 lines, open a digraph named "conv" and mention both address families. |
| assert len(result_conversations.split('\n')) == 8 |
| assert result_conversations.startswith('digraph "conv" {') |
| assert "127.0.0.1" in result_conversations |
| assert "::1" in result_conversations |
| |
| test_conversations() |
| |
| = sessions() |
| |
| # Five packets with different layer stacks; sessions() is expected to return five distinct keys. |
| pl = PacketList([Ether()/IPv6()/ICMPv6EchoRequest(), Ether()/IPv6()/IPv6()]) |
| pl.extend([Ether()/IP()/IP(), Ether()/ARP()]) |
| pl.extend([Ether()/Ether()/IP()]) |
| assert len(pl.sessions().keys()) == 5 |
| |
| = afterglow() |
| |
| import mock |
| # scapy.plist.do_graph is patched with a stub returning its graph argument, which the assertions treat as text. |
| @mock.patch("scapy.plist.do_graph") |
| def test_afterglow(mock_do_graph): |
| def fake_do_graph(graph, **kwargs): |
| return graph |
| mock_do_graph.side_effect = fake_do_graph |
| plist = PacketList([IP(dst="127.0.0.2")/TCP(dport=i) for i in range(2)]) |
| plist.extend([IP(src="127.0.0.2")/TCP(sport=i) for i in range(2)]) |
| result_afterglow = plist.afterglow() |
| # The graph text must have 19 lines and open a digraph named "afterglow". |
| assert len(result_afterglow.split('\n')) == 19 |
| assert result_afterglow.startswith('digraph "afterglow" {') |
| |
| test_afterglow() |
| |
| = psdump() |
| |
| # The dump is only exercised when PYX is truthy; otherwise the test just prints the flag. |
| print("PYX: %d" % PYX) |
| if PYX: |
| import tempfile |
| import os |
| # NOTE(review): tempfile.mktemp() only returns a name and is documented as deprecated/race-prone; |
| # the existence assertion below relies on psdump() being the one that creates the file. |
| filename = tempfile.mktemp(suffix=".eps") |
| plist = PacketList([IP()/TCP()]) |
| plist.psdump(filename) |
| assert os.path.exists(filename) |
| os.unlink(filename) |
| |
| = pdfdump() |
| |
| # The dump is only exercised when PYX is truthy; otherwise the test just prints the flag. |
| print("PYX: %d" % PYX) |
| if PYX: |
| import tempfile |
| import os |
| # NOTE(review): tempfile.mktemp() only returns a name and is documented as deprecated/race-prone; |
| # the existence assertion below relies on pdfdump() being the one that creates the file. |
| filename = tempfile.mktemp(suffix=".pdf") |
| plist = PacketList([IP()/TCP()]) |
| plist.pdfdump(filename) |
| assert os.path.exists(filename) |
| os.unlink(filename) |
| |
| = svgdump() |
| |
| # The dump is only exercised when PYX is truthy; otherwise the test just prints the flag. |
| print("PYX: %d" % PYX) |
| if PYX: |
| import tempfile |
| import os |
| # NOTE(review): tempfile.mktemp() only returns a name and is documented as deprecated/race-prone; |
| # the existence assertion below relies on svgdump() being the one that creates the file. |
| filename = tempfile.mktemp(suffix=".svg") |
| plist = PacketList([IP()/TCP()]) |
| plist.svgdump(filename) |
| assert os.path.exists(filename) |
| os.unlink(filename) |
| |
| = __getstate__ / __setstate__ (used by pickle) |
| |
| import pickle |
| |
| # Build a frame and set packet attributes (time, sniffed_on, wirelen) that live outside its fields. |
| frm = Ether(src='00:11:22:33:44:55', dst='00:22:33:44:55:66')/Raw() |
| frm.time = EDecimal(123.45) |
| frm.sniffed_on = "iface" |
| frm.wirelen = 1 |
| # The same frame object is stored twice in a named PacketList, which is then pickled and restored. |
| pl = PacketList(res=[frm, frm], name='WhatAGreatName') |
| pickled = pickle.dumps(pl) |
| pl = pickle.loads(pickled) |
| # The list name, length, per-packet attributes and Ether addresses must all survive the round trip. |
| assert pl.listname == "WhatAGreatName" |
| assert len(pl) == 2 |
| assert pl[0].time == 123.45 |
| assert pl[0].sniffed_on == "iface" |
| assert pl[0].wirelen == 1 |
| assert pl[0][Ether].src == '00:11:22:33:44:55' |
| assert pl[1][Ether].dst == '00:22:33:44:55:66' |
| |
| |
| ############ |
| ############ |
| + Scapy version |
| |
| = _version() |
| |
| import os |
| from datetime import datetime |
| |
| # Write a VERSION file into the package directory and remember the mtime of scapy/__init__ for the later fallback check. |
| # NOTE(review): nothing here restores SCAPY_VERSION or removes the VERSION file if an assertion fails part-way. |
| version_filename = os.path.join(scapy._SCAPY_PKG_DIR, "VERSION") |
| mtime = datetime.utcfromtimestamp(os.path.getmtime(scapy.__file__)) |
| version = "2.0.0" |
| with open(version_filename, "w") as fd: |
| fd.write(version) |
| |
| # With both present, the SCAPY_VERSION environment variable must win over the VERSION file. |
| os.environ["SCAPY_VERSION"] = "9.9.9" |
| assert scapy._version() == "9.9.9" |
| del os.environ["SCAPY_VERSION"] |
| |
| # Without the environment variable, the VERSION file content is returned. |
| assert scapy._version() == version |
| os.unlink(version_filename) |
| |
| # With the VERSION file gone, walk the remaining fallbacks by making each mocked source fail in turn: |
| # git archive value -> git describe value -> date of the module mtime -> "0.0.0". |
| import mock |
| with mock.patch("scapy._version_from_git_archive") as archive: |
| archive.return_value = "4.4.4" |
| assert scapy._version() == "4.4.4" |
| archive.side_effect = ValueError() |
| with mock.patch("scapy._version_from_git_describe") as git: |
| git.return_value = "3.3.3" |
| assert scapy._version() == "3.3.3" |
| git.side_effect = Exception() |
| assert scapy._version() == mtime.strftime("%Y.%m.%d") |
| with mock.patch("os.path.getmtime") as getmtime: |
| getmtime.side_effect = Exception() |
| assert scapy._version() == "0.0.0" |
| |
| |
| = UTscapy HTML output |
| |
| import tempfile, os |
| from scapy.tools.UTscapy import TestCampaign, pack_html_campaigns |
| test_campaign = TestCampaign("test") |
| # Only the directory of this generated name is used below, to locate the companion files. |
| test_campaign.output_file = tempfile.mktemp() |
| # With local=True, UTscapy.js and UTscapy.css are expected to be written next to output_file. |
| html = pack_html_campaigns([test_campaign], None, local=True) |
| dirname = os.path.dirname(test_campaign.output_file) |
| filename_js = "%s/UTscapy.js" % dirname |
| filename_css = "%s/UTscapy.css" % dirname |
| assert os.path.isfile(filename_js) |
| assert os.path.isfile(filename_css) |
| # Remove the generated companion files again. |
| os.remove(filename_js) |
| os.remove(filename_css) |
| |
| = test get_temp_dir |
| |
| # The path returned by get_temp_dir() must already exist as a directory. |
| dname = get_temp_dir() |
| assert os.path.isdir(dname) |
| |
| = test fragleak functions |
| ~ netaccess linux fragleak |
| |
| import mock |
| |
| # Run a fragleak-style function with networking mocked out; return True when the canned padding shows up in its output. |
| # mock.patch decorators hand over their mocks bottom-up, hence the (sr1, select, L3socket) order after "func". |
| @mock.patch("scapy.layers.inet.conf.L3socket") |
| @mock.patch("scapy.layers.inet.select.select") |
| @mock.patch("scapy.layers.inet.sr1") |
| def _test_fragleak(func, sr1, select, L3socket): |
| # A single canned reply whose padding layer carries the marker string checked at the end. |
| packets = [IP(src="4.4.4.4")/ICMP()/IPerror(dst="8.8.8.8")/conf.padding_layer(load=b"greatdata")] |
| iterator = iter(packets) |
| ne = lambda *args, **kwargs: next(iterator) |
| # Both the fake socket's recv() and sr1() draw from the same one-packet iterator. |
| L3socket.side_effect = lambda: Bunch(recv=ne, send=lambda x: None) |
| sr1.side_effect = ne |
| # The select() stub returns the concatenation of its three lists. |
| select.side_effect = lambda a, b, c, d: a+b+c |
| with ContextManagerCaptureOutput() as cmco: |
| func("8.8.8.8", count=1) |
| out = cmco.get_output() |
| return "greatdata" in out |
| |
| assert _test_fragleak(fragleak) |
| assert _test_fragleak(fragleak2) |