Merge changes I82fd7793,Id3cca05d
* changes:
Make sock_diag_test.py pass on 4.9 devices that don't have SCTP.
Make BPF tests pass on device.
diff --git a/net/test/xfrm.py b/net/test/xfrm.py
index 93d15ac..1bd10da 100755
--- a/net/test/xfrm.py
+++ b/net/test/xfrm.py
@@ -630,6 +630,15 @@
tmpl = UserTemplate(outer_family, spi, 0, (src, dst))
self.AddPolicyInfo(policy, tmpl, mark)
+ def DeleteTunnel(self, direction, selector, dst, spi, mark):
+ self.DeleteSaInfo(dst, spi, IPPROTO_ESP, ExactMatchMark(mark))
+ if selector is None:
+ selectors = [EmptySelector(AF_INET), EmptySelector(AF_INET6)]
+ else:
+ selectors = [selector]
+ for selector in selectors:
+ self.DeletePolicyInfo(selector, direction, ExactMatchMark(mark))
+
if __name__ == "__main__":
x = Xfrm()
diff --git a/net/test/xfrm_tunnel_test.py b/net/test/xfrm_tunnel_test.py
index e1fabb2..94e846e 100755
--- a/net/test/xfrm_tunnel_test.py
+++ b/net/test/xfrm_tunnel_test.py
@@ -18,6 +18,7 @@
from errno import * # pylint: disable=wildcard-import
from socket import * # pylint: disable=wildcard-import
+import random
import struct
import unittest
@@ -31,14 +32,18 @@
import xfrm_base
# Parameters to Set up VTI as a special network
+_BASE_VTI_NETID = {4: 40, 6: 60}
+_BASE_VTI_OKEY = 2000000100
+_BASE_VTI_IKEY = 2000000200
+
_VTI_NETID = 50
_VTI_IFNAME = "test_vti"
_TEST_OUT_SPI = 0x1234
_TEST_IN_SPI = _TEST_OUT_SPI
-_TEST_OKEY = _TEST_OUT_SPI + _VTI_NETID
-_TEST_IKEY = _TEST_IN_SPI + _VTI_NETID
+_TEST_OKEY = 2000000100
+_TEST_IKEY = 2000000200
def _GetLocalInnerAddress(version):
@@ -100,57 +105,7 @@
@unittest.skipUnless(net_test.LINUX_VERSION >= (3, 18, 0), "VTI Unsupported")
-class XfrmVtiTest(xfrm_base.XfrmLazyTest):
-
- @classmethod
- def setUpClass(cls):
- xfrm_base.XfrmBaseTest.setUpClass()
- # VTI interfaces use marks extensively, so configure realistic packet
- # marking rules to make the test representative, make PMTUD work, etc.
- cls.SetInboundMarks(True)
- cls._SetInboundMarking(_VTI_NETID, _VTI_IFNAME, True)
- cls.SetMarkReflectSysctls(1)
-
- @classmethod
- def tearDownClass(cls):
- # The sysctls are restored by MultinetworkBaseTest.tearDownClass.
- cls._SetInboundMarking(_VTI_NETID, _VTI_IFNAME, False)
- cls.SetInboundMarks(False)
- xfrm_base.XfrmBaseTest.tearDownClass()
-
- def setUp(self):
- super(XfrmVtiTest, self).setUp()
- # If the hard-coded netids are redefined this will catch the error.
- self.assertNotIn(_VTI_NETID, self.NETIDS,
- "VTI netid %d already in use" % _VTI_NETID)
- self.iproute = iproute.IPRoute()
- self._QuietDeleteLink(_VTI_IFNAME)
-
- def tearDown(self):
- super(XfrmVtiTest, self).tearDown()
- self._QuietDeleteLink(_VTI_IFNAME)
-
- def _QuietDeleteLink(self, ifname):
- try:
- self.iproute.DeleteLink(ifname)
- except IOError:
- # The link was not present.
- pass
-
- def _SwapInterfaceAddress(self, ifname, old_addr, new_addr):
- """Exchange two addresses on a given interface.
-
- Args:
- ifname: Name of the interface
- old_addr: An address to be removed from the interface
- new_addr: An address to be added to an interface
- """
- version = 6 if ":" in new_addr else 4
- ifindex = net_test.GetInterfaceIndex(ifname)
- self.iproute.AddAddress(new_addr,
- net_test.AddressLengthBits(version), ifindex)
- self.iproute.DelAddress(old_addr,
- net_test.AddressLengthBits(version), ifindex)
+class XfrmAddDeleteVtiTest(xfrm_base.XfrmBaseTest):
def testAddVti(self):
"""Test the creation of a Virtual Tunnel Interface."""
@@ -171,7 +126,130 @@
with self.assertRaises(IOError):
self.iproute.GetIfIndex(_VTI_IFNAME)
- def _SetupVtiNetwork(self, netid, ifname, is_add):
+ def _QuietDeleteLink(self, ifname):
+ try:
+ self.iproute.DeleteLink(ifname)
+ except IOError:
+ # The link was not present.
+ pass
+
+ def tearDown(self):
+ super(XfrmAddDeleteVtiTest, self).tearDown()
+ self._QuietDeleteLink(_VTI_IFNAME)
+
+
+class VtiInterface(object):
+
+ def __init__(self, iface, netid, underlying_netid, local, remote):
+ self.iface = iface
+ self.netid = netid
+ self.underlying_netid = underlying_netid
+ self.local, self.remote = local, remote
+ self.rx = self.tx = 0
+ self.ikey = _TEST_IKEY + netid
+ self.okey = _TEST_OKEY + netid
+ self.out_spi = self.in_spi = random.randint(0, 0x7fffffff)
+
+ self.iproute = iproute.IPRoute()
+ self.xfrm = xfrm.Xfrm()
+
+ self.SetupInterface()
+ self.SetupXfrm()
+ self.addrs = {}
+
+ def Teardown(self):
+ self.TeardownXfrm()
+ self.TeardownInterface()
+
+ def SetupInterface(self):
+ self.iproute.CreateVirtualTunnelInterface(
+ self.iface, self.local, self.remote, self.ikey, self.okey)
+
+ def TeardownInterface(self):
+ self.iproute.DeleteLink(self.iface)
+
+ def SetupXfrm(self):
+ # VTI policies use wildcard selectors (selector=None): packets are
+ # matched only by their mark, so the inner addresses are deliberately
+ # left unconstrained.
+ self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_OUT, None, self.local, self.remote,
+ self.out_spi, xfrm_base._ALGO_CBC_AES_256,
+ xfrm_base._ALGO_HMAC_SHA1,
+ xfrm.ExactMatchMark(self.okey),
+ self.underlying_netid)
+
+ self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_IN, None, self.remote, self.local,
+ self.in_spi, xfrm_base._ALGO_CBC_AES_256,
+ xfrm_base._ALGO_HMAC_SHA1,
+ xfrm.ExactMatchMark(self.ikey), None)
+
+ def TeardownXfrm(self):
+ self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_OUT, None, self.remote,
+ self.out_spi, self.okey)
+ self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_IN, None, self.local,
+ self.in_spi, self.ikey)
+
+
+@unittest.skipUnless(net_test.LINUX_VERSION >= (3, 18, 0), "VTI Unsupported")
+class XfrmVtiTest(xfrm_base.XfrmBaseTest):
+
+ @classmethod
+ def setUpClass(cls):
+ xfrm_base.XfrmBaseTest.setUpClass()
+ # VTI interfaces use marks extensively, so configure realistic packet
+ # marking rules to make the test representative, make PMTUD work, etc.
+ cls.SetInboundMarks(True)
+ cls.SetMarkReflectSysctls(1)
+
+ cls.vtis = {}
+ for i, underlying_netid in enumerate(cls.tuns):
+ for version in 4, 6:
+ netid = _BASE_VTI_NETID[version] + i
+ iface = "ipsec%s" % netid
+ local = cls.MyAddress(version, underlying_netid)
+ if version == 4:
+ remote = net_test.IPV4_ADDR2 if (i % 2) else net_test.IPV4_ADDR
+ else:
+ remote = net_test.IPV6_ADDR2 if (i % 2) else net_test.IPV6_ADDR
+ vti = VtiInterface(iface, netid, underlying_netid, local, remote)
+ cls._SetInboundMarking(netid, iface, True)
+ cls._SetupVtiNetwork(vti, True)
+ cls.vtis[netid] = vti
+
+ @classmethod
+ def tearDownClass(cls):
+ # The sysctls are restored by MultinetworkBaseTest.tearDownClass.
+ cls.SetInboundMarks(False)
+ for vti in cls.vtis.values():
+ cls._SetInboundMarking(vti.netid, vti.iface, False)
+ cls._SetupVtiNetwork(vti, False)
+ vti.Teardown()
+ xfrm_base.XfrmBaseTest.tearDownClass()
+
+ def setUp(self):
+ multinetwork_base.MultiNetworkBaseTest.setUp(self)
+ self.iproute = iproute.IPRoute()
+
+ def tearDown(self):
+ multinetwork_base.MultiNetworkBaseTest.tearDown(self)
+
+ def _SwapInterfaceAddress(self, ifname, old_addr, new_addr):
+ """Exchange two addresses on a given interface.
+
+ Args:
+ ifname: Name of the interface
+ old_addr: An address to be removed from the interface
+ new_addr: An address to be added to an interface
+ """
+ version = 6 if ":" in new_addr else 4
+ ifindex = net_test.GetInterfaceIndex(ifname)
+ self.iproute.AddAddress(new_addr,
+ net_test.AddressLengthBits(version), ifindex)
+ self.iproute.DelAddress(old_addr,
+ net_test.AddressLengthBits(version), ifindex)
+
+ @classmethod
+ def _SetupVtiNetwork(cls, vti, is_add):
"""Setup rules and routes for a VTI Network.
Takes an interface and depending on the boolean
@@ -180,83 +258,63 @@
Network for purposes of testing.
Args:
- ifname: The name of a linux interface
+ vti: The VtiInterface whose rules, addresses and routes are added or removed.
is_add: Boolean that causes this method to perform setup if True or
teardown if False
"""
if is_add:
- # Bring up the interface so that we can start adding addresses
- # and routes.
- net_test.SetInterfaceUp(ifname)
-
# Disable router solicitations to avoid occasional spurious packets
# arriving on the underlying network; there are two possible behaviors
# when that occurred: either only the RA packet is read, and when it
# is echoed back to the VTI, it causes the test to fail by not receiving
# the UDP_PAYLOAD; or, two packets may arrive on the underlying
# network which fails the assertion that only one ESP packet is received.
- self.SetSysctl(
- "/proc/sys/net/ipv6/conf/%s/router_solicitations" % ifname, 0)
+ cls.SetSysctl(
+ "/proc/sys/net/ipv6/conf/%s/router_solicitations" % vti.iface, 0)
+ net_test.SetInterfaceUp(vti.iface)
+
for version in [4, 6]:
- ifindex = net_test.GetInterfaceIndex(ifname)
- table = netid
+ ifindex = net_test.GetInterfaceIndex(vti.iface)
+ table = vti.netid
# Set up routing rules.
- start, end = self.UidRangeForNetid(netid)
- self.iproute.UidRangeRule(version, is_add, start, end, table,
- self.PRIORITY_UID)
- self.iproute.OifRule(version, is_add, ifname, table, self.PRIORITY_OIF)
- self.iproute.FwmarkRule(version, is_add, netid, self.NETID_FWMASK, table,
- self.PRIORITY_FWMARK)
- if is_add:
- self.iproute.AddAddress(
- _GetLocalInnerAddress(version),
- net_test.AddressLengthBits(version), ifindex)
- self.iproute.AddRoute(version, table, "default", 0, None, ifindex)
+ start, end = cls.UidRangeForNetid(vti.netid)
+ cls.iproute.UidRangeRule(version, is_add, start, end, table,
+ cls.PRIORITY_UID)
+ cls.iproute.OifRule(version, is_add, vti.iface, table, cls.PRIORITY_OIF)
+ cls.iproute.FwmarkRule(version, is_add, vti.netid, cls.NETID_FWMASK,
+ table, cls.PRIORITY_FWMARK)
+
+ # Choose this version's VTI address; add/remove it and the default route.
+ if version == 4:
+ addr = cls._MyIPv4Address(vti.netid)
else:
- self.iproute.DelRoute(version, table, "default", 0, None, ifindex)
- self.iproute.DelAddress(
- _GetLocalInnerAddress(version),
- net_test.AddressLengthBits(version), ifindex)
- if not is_add:
- net_test.SetInterfaceDown(ifname)
+ addr = cls.OnlinkPrefix(6, vti.netid) + "1"
+ prefixlen = net_test.AddressLengthBits(version)
+ vti.addrs[version] = addr
+ if is_add:
+ cls.iproute.AddAddress(addr, prefixlen, ifindex)
+ cls.iproute.AddRoute(version, table, "default", 0, None, ifindex)
+ else:
+ cls.iproute.DelRoute(version, table, "default", 0, None, ifindex)
+ cls.iproute.DelAddress(addr, prefixlen, ifindex)
+
+ def assertReceivedPacket(self, vti):
+ vti.rx += 1
+ self.assertEquals((vti.rx, vti.tx), self.iproute.GetRxTxPackets(vti.iface))
+
+ def assertSentPacket(self, vti):
+ vti.tx += 1
+ self.assertEquals((vti.rx, vti.tx), self.iproute.GetRxTxPackets(vti.iface))
# TODO: Should we completely re-write this using null encryption and null
# authentication? We could then assemble and disassemble packets for each
# direction individually. This approach would improve debuggability, avoid the
# complexity of the twister, and allow the test to more-closely validate
# deployable configurations.
- def _CreateVti(self, netid, vti_netid, ifname, outer_version):
- local_outer = self.MyAddress(outer_version, netid)
- remote_outer = _GetRemoteOuterAddress(outer_version)
- self.iproute.CreateVirtualTunnelInterface(
- dev_name=ifname,
- local_addr=local_outer,
- remote_addr=remote_outer,
- i_key=_TEST_IKEY,
- o_key=_TEST_OKEY)
-
- self._SetupVtiNetwork(vti_netid, ifname, True)
-
- # For the VTI, the selectors are wildcard since packets will only
- # be selected if they have the appropriate mark, hence the inner
- # addresses are wildcard.
- self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_OUT, None, local_outer,
- remote_outer, _TEST_OUT_SPI,
- xfrm_base._ALGO_CBC_AES_256,
- xfrm_base._ALGO_HMAC_SHA1,
- xfrm.ExactMatchMark(_TEST_OKEY), netid)
-
- self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_IN, None, remote_outer,
- local_outer, _TEST_IN_SPI,
- xfrm_base._ALGO_CBC_AES_256,
- xfrm_base._ALGO_HMAC_SHA1,
- xfrm.ExactMatchMark(_TEST_IKEY), None)
-
- def _CheckVtiInputOutput(self, netid, vti_netid, iface, outer_version,
- inner_version, rx, tx):
- local_outer = self.MyAddress(outer_version, netid)
- remote_outer = _GetRemoteOuterAddress(outer_version)
+ def _CheckVtiInputOutput(self, vti, inner_version):
+ local_outer = vti.local
+ remote_outer = vti.remote
# Create a socket to receive packets.
read_sock = socket(
@@ -271,40 +329,39 @@
# of the input socket.
write_sock = socket(
net_test.GetAddressFamily(inner_version), SOCK_DGRAM, 0)
- self.SelectInterface(write_sock, vti_netid, "mark")
+ self.SelectInterface(write_sock, vti.netid, "mark")
write_sock.sendto(net_test.UDP_PAYLOAD,
(_GetRemoteInnerAddress(inner_version), port))
# Read a tunneled IP packet on the underlying (outbound) network
# verifying that it is an ESP packet.
- pkt = self._ExpectEspPacketOn(netid, _TEST_OUT_SPI, tx + 1, None,
+ self.assertSentPacket(vti)
+ pkt = self._ExpectEspPacketOn(vti.underlying_netid, vti.out_spi, vti.tx, None,
local_outer, remote_outer)
- self.assertEquals((rx, tx + 1), self.iproute.GetRxTxPackets(iface))
-
# Perform an address switcheroo so that the inner address of the remote
# end of the tunnel is now the address on the local VTI interface; this
# way, the twisted inner packet finds a destination via the VTI once
# decrypted.
remote = _GetRemoteInnerAddress(inner_version)
- local = _GetLocalInnerAddress(inner_version)
- self._SwapInterfaceAddress(iface, new_addr=remote, old_addr=local)
+ local = vti.addrs[inner_version]
+ self._SwapInterfaceAddress(vti.iface, new_addr=remote, old_addr=local)
try:
# Swap the packet's IP headers and write it back to the
# underlying network.
pkt = TunTwister.TwistPacket(pkt)
- self.ReceivePacketOn(netid, pkt)
+ self.ReceivePacketOn(vti.underlying_netid, pkt)
+ self.assertReceivedPacket(vti)
# Receive the decrypted packet on the dest port number.
read_packet = read_sock.recv(4096)
self.assertEquals(read_packet, net_test.UDP_PAYLOAD)
- self.assertEquals((rx + 1, tx + 1), self.iproute.GetRxTxPackets(iface))
finally:
# Unwind the switcheroo
- self._SwapInterfaceAddress(iface, new_addr=local, old_addr=remote)
+ self._SwapInterfaceAddress(vti.iface, new_addr=local, old_addr=remote)
# Now attempt to provoke an ICMP error.
# TODO: deduplicate with multinetwork_test.py.
- version = outer_version
+ version = net_test.GetAddressVersion(vti.remote)
dst_prefix, intermediate = {
4: ("172.19.", "172.16.9.12"),
6: ("2001:db8::", "2001:db8::1")
@@ -312,45 +369,29 @@
write_sock.sendto(net_test.UDP_PAYLOAD,
(_GetRemoteInnerAddress(inner_version), port))
- pkt = self._ExpectEspPacketOn(netid, _TEST_OUT_SPI, tx + 2, None,
+ self.assertSentPacket(vti)
+ pkt = self._ExpectEspPacketOn(vti.underlying_netid, vti.out_spi, vti.tx, None,
local_outer, remote_outer)
- myaddr = self.MyAddress(version, netid)
+ myaddr = self.MyAddress(version, vti.underlying_netid)
_, toobig = packets.ICMPPacketTooBig(version, intermediate, myaddr, pkt)
- self.ReceivePacketOn(netid, toobig)
+ self.ReceivePacketOn(vti.underlying_netid, toobig)
# Check that the packet too big reduced the MTU.
- routes = self.iproute.GetRoutes(remote_outer, 0, netid, None)
+ routes = self.iproute.GetRoutes(vti.remote, 0, vti.underlying_netid, None)
self.assertEquals(1, len(routes))
rtmsg, attributes = routes[0]
self.assertEquals(iproute.RTN_UNICAST, rtmsg.type)
self.assertEquals(packets.PTB_MTU, attributes["RTA_METRICS"]["RTAX_MTU"])
# Clear PMTU information so that future tests don't have to worry about it.
- self.InvalidateDstCache(version, netid)
+ self.InvalidateDstCache(version, vti.underlying_netid)
- self.assertEquals((rx + 1, tx + 2),
- self.iproute.GetRxTxPackets(iface))
- return rx + 1, tx + 2
-
- def _TestVti(self, outer_version):
+ def testVtiInputOutput(self):
"""Test packet input and output over a Virtual Tunnel Interface."""
- netid = self.RandomNetid()
-
- self._CreateVti(netid, _VTI_NETID, _VTI_IFNAME, outer_version)
-
- try:
- rx, tx = self._CheckVtiInputOutput(netid, _VTI_NETID, _VTI_IFNAME,
- outer_version, 4, 0, 0)
- self._CheckVtiInputOutput(netid, _VTI_NETID, _VTI_IFNAME, outer_version,
- 4, rx, tx)
- finally:
- self._SetupVtiNetwork(_VTI_NETID, _VTI_IFNAME, False)
-
- def testIpv4Vti(self):
- self._TestVti(4)
-
- def testIpv6Vti(self):
- self._TestVti(6)
+ for i in xrange(3 * len(self.vtis.values())):
+ vti = random.choice(self.vtis.values())
+ self._CheckVtiInputOutput(vti, 4)
+ self._CheckVtiInputOutput(vti, 6)
if __name__ == "__main__":