| % Regression tests for Linux only |
| |
| # More information at http://www.secdev.org/projects/UTscapy/ |
| |
| |
| ############ |
| ############ |
| |
| + Linux only test |
| |
| = TCP client automaton |
| ~ automaton netaccess linux needs_root |
| * This test retries on failure because it often fails |
| |
| from __future__ import print_function |
| import os |
| import time |
| import signal |
| |
| from scapy.modules.six.moves import range |
| |
| def handler(signum, stack_frame): |
| raise Exception("Timer expired !") |
| |
| tmp = signal.signal(signal.SIGALRM, handler) |
| |
| SECDEV_IP4 = "203.178.141.194" |
| IPTABLE_RULE = "iptables -%c INPUT -s %s -p tcp --sport 80 -j DROP" |
| |
| # Drop packets from SECDEV_IP4 |
| assert(os.system(IPTABLE_RULE % ('A', SECDEV_IP4)) == 0) |
| |
| success = False |
| for i in range(10): |
| tmp = signal.alarm(5) |
| try: |
| r, w = os.pipe() |
| t = TCP_client(SECDEV_IP4, 80, external_fd={ "tcp": (r,w) }) |
| tmp = os.write(w, b"HEAD / HTTP/1.0\r\n\r\n") |
| t.runbg() |
| time.sleep(0.5) |
| response = os.read(r, 4096) |
| tmp = signal.alarm(0) # cancel the alarm |
| t.stop() |
| os.close(r) |
| os.close(w) |
| if response.startswith(b"HTTP/1.1 200 OK"): |
| success = True |
| break |
| else: |
| time.sleep(0.5) |
| except Exception as e: |
| print(e) |
| |
| # Remove the iptables rule |
| assert(os.system(IPTABLE_RULE % ('D', SECDEV_IP4)) == 0) |
| |
| assert(success) |
| |
| = L3RawSocket |
| ~ netaccess IP ICMP linux needs_root |
| |
| old_l3socket = conf.L3socket |
| old_debug_dissector = conf.debug_dissector |
| conf.debug_dissector = False |
| conf.L3socket = L3RawSocket |
| x = sr1(IP(dst="www.google.com")/ICMP(),timeout=3) |
| conf.debug_dissector = old_debug_dissector |
| conf.L3socket = old_l3socket |
| x |
| assert x[IP].ottl() in [32, 64, 128, 255] |
| assert 0 <= x[IP].hops() <= 126 |
| x is not None and ICMP in x and x[ICMP].type == 0 |
| |
| # TODO: fix this test (randomly stuck) |
| # ex: https://travis-ci.org/secdev/scapy/jobs/247473497 |
| |
| #= Supersocket _flush_fd |
| #~ needs_root linux |
| # |
| #import select |
| # |
| #from scapy.arch.linux import _flush_fd |
| #socket = conf.L2listen() |
| #select.select([socket],[],[],2) |
| #_flush_fd(socket.ins) |
| |
| = Test legacy attach_filter function |
| ~ linux needs_root |
| from scapy.arch.common import get_bpf_pointer |
| |
| old_pypy = conf.use_pypy |
| conf.use_pypy = True |
| |
| tcpdump_lines = ['12\n', '40 0 0 12\n', '21 0 5 34525\n', '48 0 0 20\n', '21 6 0 6\n', '21 0 6 44\n', '48 0 0 54\n', '21 3 4 6\n', '21 0 3 2048\n', '48 0 0 23\n', '21 0 1 6\n', '6 0 0 1600\n', '6 0 0 0\n'] |
| pointer = get_bpf_pointer(tcpdump_lines) |
| assert six.PY3 or isinstance(pointer, str) |
| assert six.PY3 or len(pointer) > 1 |
| |
| conf.use_pypy = old_pypy |
| |
| = Interface aliases & sub-interfaces |
| ~ linux needs_root |
| |
| import os |
| exit_status = os.system("ip link add name scapy0 type dummy") |
| exit_status = os.system("ip addr add 192.0.2.1/24 dev scapy0") |
| exit_status = os.system("ip link set scapy0 up") |
| exit_status = os.system("ifconfig scapy0:0 inet 198.51.100.1/24 up") |
| exit_status = os.system("ip addr show scapy0") |
| print(get_if_list()) |
| conf.route.resync() |
| print(conf.route.routes) |
| assert(conf.route.route("198.51.100.254") == ("scapy0", "198.51.100.1", "0.0.0.0")) |
| route_alias = (3325256704, 4294967040, "0.0.0.0", "scapy0", "198.51.100.1", 0) |
| assert(route_alias in conf.route.routes) |
| exit_status = os.system("ip link add link scapy0 name scapy0.42 type vlan id 42") |
| exit_status = os.system("ip addr add 203.0.113.42/24 dev scapy0.42") |
| exit_status = os.system("ip link set scapy0.42 up") |
| exit_status = os.system("ip route add 192.0.2.43/32 via 203.0.113.41") |
| print(get_if_list()) |
| conf.route.resync() |
| print(conf.route.routes) |
| assert(conf.route.route("192.0.2.43") == ("scapy0.42", "203.0.113.42", "203.0.113.41")) |
| route_specific = (3221226027, 4294967295, "203.0.113.41", "scapy0.42", "203.0.113.42", 0) |
| assert(route_specific in conf.route.routes) |
| exit_status = os.system("ip link del name dev scapy0") |