Stop requiring a live Internet connection in ping6_test.py.
Instead, use tun interfaces like multinetwork_test does, and
a separate thread to reply to ping packets. This allows the
test cases to be almost completely unchanged.
Change-Id: Ie7e07ebdf493a3a1eb3b7774130b71205423944e
diff --git a/net/test/ping6_test.py b/net/test/ping6_test.py
index 57365ae..ca6ebbd 100755
--- a/net/test/ping6_test.py
+++ b/net/test/ping6_test.py
@@ -19,26 +19,194 @@
import errno
import os
import posix
+import random
import re
from socket import * # pylint: disable=wildcard-import
+import threading
+import time
import unittest
from scapy import all as scapy
+import multinetwork_base
import net_test
HAVE_PROC_NET_ICMP6 = os.path.isfile("/proc/net/icmp6")
+ICMP_ECHO = 8
+ICMP_ECHOREPLY = 0
+ICMPV6_ECHO_REQUEST = 128
+ICMPV6_ECHO_REPLY = 129
-class Ping6Test(net_test.NetworkTest):
+
+class PingReplyThread(threading.Thread):
+
+ MIN_TTL = 10
+ INTERMEDIATE_IPV4 = "192.0.2.2"
+ INTERMEDIATE_IPV6 = "2001:db8:1:2::ace:d00d"
+ NEIGHBOURS = ["fe80::1"]
+
+ def __init__(self, tun, mymac, routermac):
+ super(PingReplyThread, self).__init__()
+ self._tun = tun
+ self._stopped = False
+ self._mymac = mymac
+ self._routermac = routermac
+
+ def Stop(self):
+ self._stopped = True
+
+ def ChecksumValid(self, packet):
+ # Get and clear the checksums.
+ def GetAndClearChecksum(layer):
+ if not layer:
+ return
+ try:
+ checksum = layer.chksum
+ del layer.chksum
+ except AttributeError:
+ checksum = layer.cksum
+ del layer.cksum
+ return checksum
+
+ def GetChecksum(layer):
+ try:
+ return layer.chksum
+ except AttributeError:
+ return layer.cksum
+
+ layers = ["IP", "ICMP", scapy.ICMPv6EchoRequest]
+ sums = {}
+ for name in layers:
+ sums[name] = GetAndClearChecksum(packet.getlayer(name))
+
+ # Serialize the packet, so scapy recalculates the checksums, and compare
+ # them with the ones in the packet.
+ packet = packet.__class__(str(packet))
+ for name in layers:
+ layer = packet.getlayer(name)
+ if layer and GetChecksum(layer) != sums[name]:
+ return False
+
+ return True
+
+ def SendTimeExceeded(self, version, packet):
+ if version == 4:
+ src = packet.getlayer(scapy.IP).src
+ self.SendPacket(
+ scapy.IP(src=self.INTERMEDIATE_IPV4, dst=src) /
+ scapy.ICMP(type=11, code=0) /
+ packet)
+ elif version == 6:
+ src = packet.getlayer(scapy.IPv6).src
+ self.SendPacket(
+ scapy.IPv6(src=self.INTERMEDIATE_IPV6, dst=src) /
+ scapy.ICMPv6TimeExceeded(code=0) /
+ packet)
+
+ def IPv4Packet(self, ip):
+ icmp = ip.getlayer(scapy.ICMP)
+
+ # We only support ping for now.
+ if (ip.proto != IPPROTO_ICMP or
+ icmp.type != ICMP_ECHO or
+ icmp.code != 0):
+ return
+
+ # Check the checksums.
+ if not self.ChecksumValid(ip):
+ return
+
+ if ip.ttl < self.MIN_TTL:
+ self.SendTimeExceeded(4, ip)
+ return
+
+ icmp.type = ICMP_ECHOREPLY
+ self.SwapAddresses(ip)
+ self.SendPacket(ip)
+
+ def IPv6Packet(self, ipv6):
+ icmpv6 = ipv6.getlayer(scapy.ICMPv6EchoRequest)
+
+ # We only support ping for now.
+ if (ipv6.nh != IPPROTO_ICMPV6 or
+ not icmpv6 or
+ icmpv6.type != ICMPV6_ECHO_REQUEST or
+ icmpv6.code != 0):
+ return
+
+ # Check the checksums.
+ if not self.ChecksumValid(ipv6):
+ return
+
+ if ipv6.dst.startswith("ff02::"):
+ ipv6.dst = ipv6.src
+ for src in self.NEIGHBOURS:
+ ipv6.src = src
+ icmpv6.type = ICMPV6_ECHO_REPLY
+ self.SendPacket(ipv6)
+ elif ipv6.hlim < self.MIN_TTL:
+ self.SendTimeExceeded(6, ipv6)
+ else:
+ icmpv6.type = ICMPV6_ECHO_REPLY
+ self.SwapAddresses(ipv6)
+ self.SendPacket(ipv6)
+
+ def SwapAddresses(self, packet):
+ src = packet.src
+ packet.src = packet.dst
+ packet.dst = src
+
+ def SendPacket(self, packet):
+ packet = scapy.Ether(src=self._routermac, dst=self._mymac) / packet
+ try:
+ posix.write(self._tun.fileno(), str(packet))
+ except ValueError:
+ pass
+
+ def run(self):
+ while not self._stopped:
+
+ try:
+ packet = posix.read(self._tun.fileno(), 4096)
+ except OSError, e:
+ if e.errno == errno.EAGAIN:
+ continue
+ else:
+ break
+
+ ether = scapy.Ether(packet)
+ if ether.type == net_test.ETH_P_IPV6:
+ self.IPv6Packet(ether.payload)
+ elif ether.type == net_test.ETH_P_IP:
+ self.IPv4Packet(ether.payload)
+
+
+class Ping6Test(multinetwork_base.MultiNetworkBaseTest):
+
+ @classmethod
+ def setUpClass(cls):
+ super(Ping6Test, cls).setUpClass()
+ cls.netid = random.choice(cls.NETIDS)
+ cls.reply_thread = PingReplyThread(
+ cls.tuns[cls.netid],
+ cls.MyMacAddress(cls.netid),
+ cls.RouterMacAddress(cls.netid))
+ cls.SetDefaultNetwork(cls.netid)
+ cls.reply_thread.start()
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.reply_thread.Stop()
+ cls.ClearDefaultNetwork()
+ super(Ping6Test, cls).tearDownClass()
def setUp(self):
- if net_test.HAVE_IPV6:
- self.ifname = net_test.GetDefaultRouteInterface()
- self.ifindex = net_test.GetInterfaceIndex(self.ifname)
- self.lladdr = net_test.GetLinkAddress(self.ifname, True)
- self.globaladdr = net_test.GetLinkAddress(self.ifname, False)
+ self.ifname = self.GetInterfaceName(self.netid)
+ self.ifindex = self.ifindices[self.netid]
+ self.lladdr = net_test.GetLinkAddress(self.ifname, True)
+ self.globaladdr = net_test.GetLinkAddress(self.ifname, False)
def assertValidPingResponse(self, s, data):
family = s.family
@@ -73,7 +241,10 @@
self.assertTrue(rcvd.startswith("\x81\x00"), "Not an IPv6 echo reply")
# Check that the flow label is zero and that the scope ID is sane.
self.assertEqual(flowlabel, 0)
- self.assertLess(scope_id, 100)
+ if addr.startswith("fe80::"):
+ self.assertTrue(scope_id in self.ifindices.values())
+ else:
+ self.assertEquals(0, scope_id)
# TODO: check the checksum. We can't do this easily now for ICMPv6 because
# we don't have the IP addresses so we can't construct the pseudoheader.
@@ -132,7 +303,6 @@
s = net_test.IPv4PingSocket()
self.assertRaisesErrno(errno.EDESTADDRREQ, s.send, net_test.IPV4_PING)
- @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
def testIPv6SendWithNoConnection(self):
s = net_test.IPv6PingSocket()
self.assertRaisesErrno(errno.EDESTADDRREQ, s.send, net_test.IPV6_PING)
@@ -150,14 +320,12 @@
s.send(net_test.IPV6_PING)
self.assertValidPingResponse(s, net_test.IPV6_PING)
- @unittest.skipUnless(net_test.HAVE_IPV4, "skipping: no IPv4")
def testIPv4PingUsingSendto(self):
s = net_test.IPv4PingSocket()
written = s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 55))
self.assertEquals(len(net_test.IPV4_PING), written)
self.assertValidPingResponse(s, net_test.IPV4_PING)
- @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
def testIPv6PingUsingSendto(self):
s = net_test.IPv6PingSocket()
written = s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
@@ -210,7 +378,6 @@
scapy.send(GetIPv6Unreachable(port))
# No crash? Good.
- @unittest.skipUnless(net_test.HAVE_IPV4, "skipping: no IPv4")
def testIPv4Bind(self):
# Bind to unspecified address.
s = net_test.IPv4PingSocket()
@@ -254,7 +421,6 @@
s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 9132))
self.assertRaisesErrno(errno.EINVAL, s.bind, ("0.0.0.0", 5429))
- @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
def testIPv6Bind(self):
# Bind to unspecified address.
s = net_test.IPv6PingSocket()
@@ -304,7 +470,6 @@
if e.errno == errno.EACCES:
pass # We're not root. let it go for now.
- @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
def testIPv6InvalidBind(self):
s = net_test.IPv6PingSocket()
self.assertRaisesErrno(errno.EINVAL,
@@ -320,7 +485,6 @@
if e.errno == errno.EACCES:
pass # We're not root. let it go for now.
- @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
def testIPv6ScopedBind(self):
# Can't bind to a link-local address without a scope ID.
s = net_test.IPv6PingSocket()
@@ -344,7 +508,6 @@
s.bind(("::1", 1234, 0, 1))
self.assertEquals(("::1", 1234, 0, 0), s.getsockname())
- @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
def testBindAffectsIdentifier(self):
s = net_test.IPv6PingSocket()
s.bind((self.globaladdr, 0xf976))
@@ -356,7 +519,6 @@
s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
self.assertEquals("\x0a\xce", s.recv(32768)[4:6])
- @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
def testLinkLocalAddress(self):
s = net_test.IPv6PingSocket()
# Sending to a link-local address with no scope fails with EINVAL.
@@ -367,7 +529,6 @@
s.sendto(net_test.IPV6_PING, ("fe80::1", 55, 0, self.ifindex))
# No exceptions? Good.
- @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
def testMappedAddressFails(self):
s = net_test.IPv6PingSocket()
s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
@@ -404,7 +565,6 @@
_, _, flowlabel, _ = src
self.assertEqual(0xdead, flowlabel & 0xfffff)
- @unittest.skipUnless(net_test.HAVE_IPV4, "skipping: no IPv4")
def testIPv4Error(self):
s = net_test.IPv4PingSocket()
s.setsockopt(SOL_IP, IP_TTL, 2)
@@ -414,7 +574,6 @@
# recvmsg, but we can at least check that the socket returns an error.
self.assertRaisesErrno(errno.EHOSTUNREACH, s.recv, 32768) # No response.
- @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
def testIPv6Error(self):
s = net_test.IPv6PingSocket()
s.setsockopt(net_test.SOL_IPV6, IPV6_UNICAST_HOPS, 2)
@@ -424,10 +583,11 @@
# recvmsg, but we can at least check that the socket returns an error.
self.assertRaisesErrno(errno.EHOSTUNREACH, s.recv, 32768) # No response.
- @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
def testIPv6MulticastPing(self):
s = net_test.IPv6PingSocket()
# Send a multicast ping and check we get at least one duplicate.
+ # The setsockopt should not be necessary, but ping_v6_sendmsg has a bug.
+ s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_MULTICAST_IF, self.ifindex)
s.sendto(net_test.IPV6_PING, ("ff02::1", 55, 0, self.ifindex))
self.assertValidPingResponse(s, net_test.IPV6_PING)
self.assertValidPingResponse(s, net_test.IPV6_PING)
@@ -444,7 +604,6 @@
data = net_test.IPV6_PING + "\x01" + 19994 * "\x00" + "aaaaa"
s.sendto(data, ("::1", 953))
- @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
@unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
def testIcmpSocketsNotInIcmp6(self):
numrows = len(self.ReadProcNetSocket("icmp"))
@@ -455,7 +614,6 @@
self.assertEquals(numrows + 1, len(self.ReadProcNetSocket("icmp")))
self.assertEquals(numrows6, len(self.ReadProcNetSocket("icmp6")))
- @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
@unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
def testIcmp6SocketsNotInIcmp(self):
numrows = len(self.ReadProcNetSocket("icmp"))
@@ -472,7 +630,6 @@
s.connect(("127.0.0.1", 0xbeef))
self.CheckSockStatFile("icmp", "127.0.0.1", 0xace, "127.0.0.1", 0xbeef, 1)
- @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
@unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
def testProcNetIcmp6(self):
numrows6 = len(self.ReadProcNetSocket("icmp6"))
@@ -498,23 +655,23 @@
self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "::", 0, 7)
# Check receive bytes.
+ s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_MULTICAST_IF, self.ifindex)
s.connect(("ff02::1", 0xdead))
self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1)
s.send(net_test.IPV6_PING)
+ time.sleep(0.01) # Give the other thread time to reply.
self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1,
- txmem=0, rxmem=0x880)
+ txmem=0, rxmem=0x300)
self.assertValidPingResponse(s, net_test.IPV6_PING)
self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1,
txmem=0, rxmem=0)
- @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
def testProcNetUdp6(self):
s = net_test.Socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP)
s.bind(("::1", 0xace))
s.connect(("::1", 0xbeef))
self.CheckSockStatFile("udp6", "::1", 0xace, "::1", 0xbeef, 1)
- @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
def testProcNetRaw6(self):
s = net_test.Socket(AF_INET6, SOCK_RAW, IPPROTO_RAW)
s.bind(("::1", 0xace))