| % send, sniff, sr* tests for Scapy |
| |
| ~ netaccess |
| |
| ############ |
| ############ |
| + Test bridge_and_sniff() using tap sockets |
| |
| ~ tap linux |
| |
| = Create two tap interfaces |
| |
| import subprocess |
| from threading import Thread |
| |
| tap0, tap1 = [TunTapInterface("tap%d" % i) for i in range(2)] |
| |
| if LINUX: |
| for i in range(2): |
| assert subprocess.check_call(["ip", "link", "set", "tap%d" % i, "up"]) == 0 |
| else: |
| for i in range(2): |
| assert subprocess.check_call(["ifconfig", "tap%d" % i, "up"]) == 0 |
| |
| = Run a sniff thread on the tap1 **interface** |
| * It will terminate when 5 IP packets from 1.2.3.4 have been sniffed |
| t_sniff = Thread( |
| target=sniff, |
| kwargs={"iface": "tap1", "count": 5, "prn": Packet.summary, |
| "lfilter": lambda p: IP in p and p[IP].src == "1.2.3.4"} |
| ) |
| t_sniff.start() |
| |
| = Run a bridge_and_sniff thread between the taps **sockets** |
| * It will terminate when 5 IP packets from 1.2.3.4 have been forwarded |
| t_bridge = Thread(target=bridge_and_sniff, args=(tap0, tap1), |
| kwargs={"store": False, "count": 5, 'prn': Packet.summary, |
| "lfilter": lambda p: IP in p and p[IP].src == "1.2.3.4"}) |
| t_bridge.start() |
| |
| = Send five IP packets from 1.2.3.4 to the tap0 **interface** |
| time.sleep(1) |
| sendp([Ether(dst=ETHER_BROADCAST) / IP(src="1.2.3.4") / ICMP()], iface="tap0", |
| count=5) |
| |
| = Wait for the threads |
| t_bridge.join() |
| t_sniff.join() |
| |
| = Run a sniff thread on the tap1 **interface** |
| * It will terminate when 5 IP packets from 2.3.4.5 have been sniffed |
| t_sniff = Thread( |
| target=sniff, |
| kwargs={"iface": "tap1", "count": 5, "prn": Packet.summary, |
| "lfilter": lambda p: IP in p and p[IP].src == "2.3.4.5"} |
| ) |
| t_sniff.start() |
| |
| = Run a bridge_and_sniff thread between the taps **sockets** |
| * It will "NAT" packets from 1.2.3.4 to 2.3.4.5 and will terminate when 5 IP packets have been forwarded |
| def nat_1_2(pkt): |
| if IP in pkt and pkt[IP].src == "1.2.3.4": |
| pkt[IP].src = "2.3.4.5" |
| del pkt[IP].chksum |
| return pkt |
| return False |
| |
| t_bridge = Thread(target=bridge_and_sniff, args=(tap0, tap1), |
| kwargs={"store": False, "count": 5, 'prn': Packet.summary, |
| "xfrm12": nat_1_2, |
| "lfilter": lambda p: IP in p and p[IP].src == "1.2.3.4"}) |
| t_bridge.start() |
| |
| = Send five IP packets from 1.2.3.4 to the tap0 **interface** |
| time.sleep(1) |
| sendp([Ether(dst=ETHER_BROADCAST) / IP(src="1.2.3.4") / ICMP()], iface="tap0", |
| count=5) |
| |
| = Wait for the threads |
| t_bridge.join() |
| t_sniff.join() |
| |
| = Delete the tap interfaces |
| del tap0, tap1 |
| |
| |
| ############ |
| ############ |
| + Test bridge_and_sniff() using tun sockets |
| |
| ~ tun linux not_pcapdnet |
| |
| = Create two tun interfaces |
| |
| import subprocess |
| from threading import Thread |
| |
| tun0, tun1 = [TunTapInterface("tun%d" % i) for i in range(2)] |
| |
| if LINUX: |
| for i in range(2): |
| assert subprocess.check_call(["ip", "link", "set", "tun%d" % i, "up"]) == 0 |
| else: |
| for i in range(2): |
| assert subprocess.check_call(["ifconfig", "tun%d" % i, "up"]) == 0 |
| |
| = Run a sniff thread on the tun1 **interface** |
| * It will terminate when 5 IP packets from 1.2.3.4 have been sniffed |
| t_sniff = Thread( |
| target=sniff, |
| kwargs={"iface": "tun1", "count": 5, "prn": Packet.summary, |
| "lfilter": lambda p: IP in p and p[IP].src == "1.2.3.4"} |
| ) |
| t_sniff.start() |
| |
| = Run a bridge_and_sniff thread between the tuns **sockets** |
| * It will terminate when 5 IP packets from 1.2.3.4 have been forwarded. |
| t_bridge = Thread(target=bridge_and_sniff, args=(tun0, tun1), |
| kwargs={"store": False, "count": 5, 'prn': Packet.summary, |
| "xfrm12": lambda pkt: pkt, |
| "lfilter": lambda p: IP in p and p[IP].src == "1.2.3.4"}) |
| t_bridge.start() |
| |
| = Send five IP packets from 1.2.3.4 to the tun0 **interface** |
| time.sleep(1) |
| conf.route.add(net="1.2.3.4/32", dev="tun0") |
| send(IP(src="1.2.3.4", dst="1.2.3.4") / ICMP(), count=5) |
| conf.route.delt(net="1.2.3.4/32", dev="tun0") |
| |
| = Wait for the threads |
| t_bridge.join() |
| t_sniff.join() |
| |
| = Run a sniff thread on the tun1 **interface** |
| * It will terminate when 5 IP packets from 2.3.4.5 have been sniffed |
| t_sniff = Thread( |
| target=sniff, |
| kwargs={"iface": "tun1", "count": 5, "prn": Packet.summary, |
| "lfilter": lambda p: IP in p and p[IP].src == "2.3.4.5"} |
| ) |
| t_sniff.start() |
| |
| = Run a bridge_and_sniff thread between the tuns **sockets** |
| * It will "NAT" packets from 1.2.3.4 to 2.3.4.5 and will terminate when 5 IP packets have been forwarded |
| def nat_1_2(pkt): |
| if IP in pkt and pkt[IP].src == "1.2.3.4": |
| pkt[IP].src = "2.3.4.5" |
| del pkt[IP].chksum |
| return pkt |
| return False |
| |
| t_bridge = Thread(target=bridge_and_sniff, args=(tun0, tun1), |
| kwargs={"store": False, "count": 5, 'prn': Packet.summary, |
| "xfrm12": nat_1_2, |
| "lfilter": lambda p: IP in p and p[IP].src == "1.2.3.4"}) |
| t_bridge.start() |
| |
| = Send five IP packets from 1.2.3.4 to the tun0 **interface** |
| time.sleep(1) |
| conf.route.add(net="1.2.3.4/32", dev="tun0") |
| send(IP(src="1.2.3.4", dst="1.2.3.4") / ICMP(), count=5) |
| conf.route.delt(net="1.2.3.4/32", dev="tun0") |
| |
| = Wait for the threads |
| t_bridge.join() |
| t_sniff.join() |
| |
| = Delete the tun interfaces |
| del tun0, tun1 |