Merge "TEST_MAPPING: add vts net tests to kernel-presubmit"
diff --git a/net/test/bpf_test.py b/net/test/bpf_test.py
index 291e540..d98c494 100755
--- a/net/test/bpf_test.py
+++ b/net/test/bpf_test.py
@@ -84,18 +84,6 @@
libc = ctypes.CDLL(ctypes.util.find_library("c"), use_errno=True)
-# bpf_ktime_get_ns() was made non-GPL requiring in 5.8 and at the same time
-# bpf_ktime_get_boot_ns() was added, both of these changes were backported to
-# Android Common Kernel in 4.14.221, 4.19.175, 5.4.97.
-# As such we require 4.14.222+ 4.19.176+ 5.4.98+ 5.8.0+,
-# but since we only really care about LTS releases:
-HAVE_EBPF_KTIME_GET_NS_APACHE2 = (
- ((LINUX_VERSION > (4, 14, 221)) and (LINUX_VERSION < (4, 19, 0))) or
- ((LINUX_VERSION > (4, 19, 175)) and (LINUX_VERSION < (5, 4, 0))) or
- (LINUX_VERSION > (5, 4, 97))
-)
-HAVE_EBPF_KTIME_GET_BOOT_NS = HAVE_EBPF_KTIME_GET_NS_APACHE2
-
KEY_SIZE = 4
VALUE_SIZE = 4
TOTAL_ENTRIES = 20
@@ -383,8 +371,6 @@
# 5.4: https://android-review.googlesource.com/c/kernel/common/+/1355422
# commit 45217b91eaaa3a563247c4f470f4cb785de6b1c6
#
- @unittest.skipUnless(HAVE_EBPF_KTIME_GET_NS_APACHE2,
- "no bpf_ktime_get_ns() support for non-GPL programs")
def testKtimeGetNsApache2(self):
instructions = [BpfFuncCall(BPF_FUNC_ktime_get_ns)] + INS_BPF_EXIT_BLOCK
self.prog_fd = BpfProgLoad(BPF_PROG_TYPE_SCHED_CLS, instructions,
@@ -406,8 +392,6 @@
# 5.4: https://android-review.googlesource.com/c/kernel/common/+/1585252
# commit 57b3f4830fb66a6038c4c1c66ca2e138fe8be231
#
- @unittest.skipUnless(HAVE_EBPF_KTIME_GET_BOOT_NS,
- "no bpf_ktime_get_boot_ns() support")
def testKtimeGetBootNs(self):
instructions = [
BpfFuncCall(BPF_FUNC_ktime_get_boot_ns),
diff --git a/net/test/build_rootfs.sh b/net/test/build_rootfs.sh
index a86496b..3a88cad 100755
--- a/net/test/build_rootfs.sh
+++ b/net/test/build_rootfs.sh
@@ -402,6 +402,7 @@
# Process the initrd to remove kernel-specific metadata
kernel_version=$(basename $(lz4 -lcd "${raw_initrd}" | sudo cpio -idumv 2>&1 | grep usr/lib/modules/ - | head -n1))
+lz4 -lcd "${raw_initrd}" | sudo cpio -idumv
sudo rm -rf usr/lib/modules
sudo mkdir -p usr/lib/modules
@@ -421,33 +422,56 @@
# Leave workdir to boot-test combined initrd
cd -
+rootfs_partition_tempfile=$(mktemp)
# Mount the new filesystem locally
if [[ ${rootfs_partition} = "raw" ]]; then
sudo mount -o loop -t ext3 "${disk}" "${mount}"
else
rootfs_partition_start=$(partx -g -o START -s -n "${rootfs_partition}" "${disk}" | xargs)
rootfs_partition_offset=$((${rootfs_partition_start} * 512))
- sudo mount -o loop,offset=${rootfs_partition_offset} -t ext3 "${disk}" "${mount}"
+ rootfs_partition_end=$(partx -g -o END -s -n "${rootfs_partition}" "${disk}" | xargs)
+ rootfs_partition_num_sectors=$((${rootfs_partition_end} - ${rootfs_partition_start} + 1))
+ dd if="${disk}" of="${rootfs_partition_tempfile}" bs=512 skip=${rootfs_partition_start} count=${rootfs_partition_num_sectors}
fi
image_unmount2() {
sudo umount "${mount}"
raw_initrd_remove
}
-trap image_unmount2 EXIT
+if [[ ${rootfs_partition} = "raw" ]]; then
+ trap image_unmount2 EXIT
+fi
# Embed the kernel and dtb images now, if requested
-if [[ "${embed_kernel_initrd_dtb}" = "1" ]]; then
- if [ -n "${dtb}" ]; then
- sudo mkdir -p "${mount}/boot/dtb/${dtb_subdir}"
- sudo cp -a "${dtb}" "${mount}/boot/dtb/${dtb_subdir}"
- sudo chown -R root:root "${mount}/boot/dtb/${dtb_subdir}"
- fi
- sudo cp -a "${kernel}" "${mount}/boot/vmlinuz-${kernel_version}"
- sudo chown root:root "${mount}/boot/vmlinuz-${kernel_version}"
+if [[ ${rootfs_partition} = "raw" ]]; then
+ if [[ "${embed_kernel_initrd_dtb}" = "1" ]]; then
+ if [ -n "${dtb}" ]; then
+ sudo mkdir -p "${mount}/boot/dtb/${dtb_subdir}"
+ sudo cp -a "${dtb}" "${mount}/boot/dtb/${dtb_subdir}"
+ sudo chown -R root:root "${mount}/boot/dtb/${dtb_subdir}"
+ fi
+ sudo cp -a "${kernel}" "${mount}/boot/vmlinuz-${kernel_version}"
+ sudo chown root:root "${mount}/boot/vmlinuz-${kernel_version}"
+ fi
+else
+ if [[ "${embed_kernel_initrd_dtb}" = "1" ]]; then
+ if [ -n "${dtb}" ]; then
+ e2mkdir -G 0 -O 0 "${rootfs_partition_tempfile}":"/boot/dtb/${dtb_subdir}"
+ e2cp -G 0 -O 0 "${dtb}" "${rootfs_partition_tempfile}":"/boot/dtb/${dtb_subdir}"
+ fi
+ e2cp -G 0 -O 0 "${kernel}" "${rootfs_partition_tempfile}":"/boot/vmlinuz-${kernel_version}"
+ fi
fi
# Unmount the initial ramdisk
-sudo umount "${mount}"
+if [[ ${rootfs_partition} = "raw" ]]; then
+ sudo umount "${mount}"
+else
+ rootfs_partition_start=$(partx -g -o START -s -n "${rootfs_partition}" "${disk}" | xargs)
+ rootfs_partition_end=$(partx -g -o END -s -n "${rootfs_partition}" "${disk}" | xargs)
+ rootfs_partition_num_sectors=$((${rootfs_partition_end} - ${rootfs_partition_start} + 1))
+ dd if="${rootfs_partition_tempfile}" of="${disk}" bs=512 seek=${rootfs_partition_start} count=${rootfs_partition_num_sectors} conv=fsync,notrunc
+fi
+rm -f "${rootfs_partition_tempfile}"
trap raw_initrd_remove EXIT
# Boot test the new system and run stage 3
diff --git a/net/test/net_test.py b/net/test/net_test.py
index 0dae1c2..bbff4e7 100644
--- a/net/test/net_test.py
+++ b/net/test/net_test.py
@@ -96,6 +96,32 @@
LINUX_VERSION = csocket.LinuxVersion()
LINUX_ANY_VERSION = (0, 0)
+def KernelAtLeast(versions):
+ """Checks the kernel version matches the specified versions.
+
+ Args:
+ versions: a list of versions expressed as tuples,
+ e.g., [(5, 10, 108), (5, 15, 31)]. The kernel version matches if it's
+ between each specified version and the next minor version with last digit
+ set to 0. In this example, the kernel version must match either:
+ >= 5.10.108 and < 5.15.0
+ >= 5.15.31
+ While this is less flexible than matching exact tuples, it allows the caller
+ to pass in fewer arguments, because Android only supports certain minor
+ versions (4.19, 5.4, 5.10, ...)
+
+ Returns:
+ True if the kernel version matches, False otherwise
+ """
+ maxversion = (1000, 255, 65535)
+ for version in sorted(versions, reverse=True):
+ if version[:2] == maxversion[:2]:
+ raise ValueError("Duplicate minor version: %s %s", (version, maxversion))
+ if LINUX_VERSION >= version and LINUX_VERSION < maxversion:
+ return True
+ maxversion = (version[0], version[1], 0)
+ return False
+
def ByteToHex(b):
return "%02x" % (ord(b) if isinstance(b, str) else b)
diff --git a/net/test/ping6_test.py b/net/test/ping6_test.py
index d7cc35c..7cb75cc 100755
--- a/net/test/ping6_test.py
+++ b/net/test/ping6_test.py
@@ -35,8 +35,6 @@
import net_test
-HAVE_PROC_NET_ICMP6 = os.path.isfile("/proc/net/icmp6")
-
ICMP_ECHO = 8
ICMP_ECHOREPLY = 0
ICMPV6_ECHO_REQUEST = 128
@@ -751,7 +749,6 @@
data = net_test.IPV6_PING + b"\x01" + 19994 * b"\x00" + b"aaaaa"
s.sendto(data, ("::1", 953))
- @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
def testIcmpSocketsNotInIcmp6(self):
numrows = len(self.ReadProcNetSocket("icmp"))
numrows6 = len(self.ReadProcNetSocket("icmp6"))
@@ -761,7 +758,6 @@
self.assertEqual(numrows + 1, len(self.ReadProcNetSocket("icmp")))
self.assertEqual(numrows6, len(self.ReadProcNetSocket("icmp6")))
- @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
def testIcmp6SocketsNotInIcmp(self):
numrows = len(self.ReadProcNetSocket("icmp"))
numrows6 = len(self.ReadProcNetSocket("icmp6"))
@@ -777,7 +773,6 @@
s.connect(("127.0.0.1", 0xbeef))
self.CheckSockStatFile("icmp", "127.0.0.1", 0xace, "127.0.0.1", 0xbeef, 1)
- @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
def testProcNetIcmp6(self):
numrows6 = len(self.ReadProcNetSocket("icmp6"))
s = net_test.IPv6PingSocket()
diff --git a/net/test/xfrm_base.py b/net/test/xfrm_base.py
index 06e8cf2..e5aadf3 100644
--- a/net/test/xfrm_base.py
+++ b/net/test/xfrm_base.py
@@ -143,6 +143,18 @@
return payload_len
+def GetEspTrailer(length, nexthdr):
+ # ESP padding per RFC 4303 section 2.4.
+ # For a null cipher with a block size of 1, padding is only necessary to
+ # ensure that the 1-byte Pad Length and Next Header fields are right aligned
+ # on a 4-byte boundary.
+ esplen = length + 2 # Packet length plus Pad Length and Next Header.
+ padlen = util.GetPadLength(4, esplen)
+ # The pad bytes are consecutive integers starting from 0x01.
+ padding = "".join((chr(i) for i in range(1, padlen + 1))).encode("utf-8")
+ return padding + struct.pack("BB", padlen, nexthdr)
+
+
def EncryptPacketWithNull(packet, spi, seq, tun_addrs):
"""Apply null encryption to a packet.
@@ -189,16 +201,7 @@
inner_layer = udp_layer
esp_nexthdr = IPPROTO_UDP
-
- # ESP padding per RFC 4303 section 2.4.
- # For a null cipher with a block size of 1, padding is only necessary to
- # ensure that the 1-byte Pad Length and Next Header fields are right aligned
- # on a 4-byte boundary.
- esplen = (len(inner_layer) + 2) # UDP length plus Pad Length and Next Header.
- padlen = util.GetPadLength(4, esplen)
- # The pad bytes are consecutive integers starting from 0x01.
- padding = "".join((chr(i) for i in range(1, padlen + 1))).encode("utf-8")
- trailer = padding + struct.pack("BB", padlen, esp_nexthdr)
+ trailer = GetEspTrailer(len(inner_layer), esp_nexthdr)
# Assemble the packet.
esp_packet.payload = scapy.Raw(inner_layer)
diff --git a/net/test/xfrm_test.py b/net/test/xfrm_test.py
index 6296b40..4c5bff5 100755
--- a/net/test/xfrm_test.py
+++ b/net/test/xfrm_test.py
@@ -54,7 +54,8 @@
class XfrmFunctionalTest(xfrm_base.XfrmLazyTest):
def assertIsUdpEncapEsp(self, packet, spi, seq, length):
- self.assertEqual(IPPROTO_UDP, packet.proto)
+ protocol = packet.nh if packet.version == 6 else packet.proto
+ self.assertEqual(IPPROTO_UDP, protocol)
udp_hdr = packet[scapy.UDP]
self.assertEqual(4500, udp_hdr.dport)
self.assertEqual(length, len(udp_hdr))
@@ -213,29 +214,31 @@
self._TestSocketPolicy(5)
# Sets up sockets and marks to correct netid
- def _SetupUdpEncapSockets(self):
+ def _SetupUdpEncapSockets(self, version):
netid = self.RandomNetid()
- myaddr = self.MyAddress(4, netid)
- remoteaddr = self.GetRemoteAddress(4)
+ myaddr = self.MyAddress(version, netid)
+ remoteaddr = self.GetRemoteAddress(version)
+ family = net_test.GetAddressFamily(version)
# Reserve a port on which to receive UDP encapsulated packets. Sending
# packets works without this (and potentially can send packets with a source
# port belonging to another application), but receiving requires the port to
# be bound and the encapsulation socket option enabled.
- encap_sock = net_test.Socket(AF_INET, SOCK_DGRAM, 0)
+ encap_sock = net_test.Socket(family, SOCK_DGRAM, 0)
encap_sock.bind((myaddr, 0))
encap_port = encap_sock.getsockname()[1]
encap_sock.setsockopt(IPPROTO_UDP, xfrm.UDP_ENCAP, xfrm.UDP_ENCAP_ESPINUDP)
# Open a socket to send traffic.
- s = socket(AF_INET, SOCK_DGRAM, 0)
+ # TODO: test with a different family than the encap socket.
+ s = socket(family, SOCK_DGRAM, 0)
self.SelectInterface(s, netid, "mark")
s.connect((remoteaddr, 53))
return netid, myaddr, remoteaddr, encap_sock, encap_port, s
# Sets up SAs and applies socket policy to given socket
- def _SetupUdpEncapSaPair(self, myaddr, remoteaddr, in_spi, out_spi,
+ def _SetupUdpEncapSaPair(self, version, myaddr, remoteaddr, in_spi, out_spi,
encap_port, s, use_null_auth):
in_reqid = 123
out_reqid = 456
@@ -252,21 +255,22 @@
use_null_auth)
# Apply socket policies to s.
- xfrm_base.ApplySocketPolicy(s, AF_INET, xfrm.XFRM_POLICY_OUT, out_spi,
+ family = net_test.GetAddressFamily(version)
+ xfrm_base.ApplySocketPolicy(s, family, xfrm.XFRM_POLICY_OUT, out_spi,
out_reqid, None)
# TODO: why does this work without a per-socket policy applied?
# The received packet obviously matches an SA, but don't inbound packets
# need to match a policy as well? (b/71541609)
- xfrm_base.ApplySocketPolicy(s, AF_INET, xfrm.XFRM_POLICY_IN, in_spi,
+ xfrm_base.ApplySocketPolicy(s, family, xfrm.XFRM_POLICY_IN, in_spi,
in_reqid, None)
# Uncomment for debugging.
# subprocess.call("ip xfrm state".split())
# Check that packets can be sent and received.
- def _VerifyUdpEncapSocket(self, netid, remoteaddr, myaddr, encap_port, sock,
- in_spi, out_spi, null_auth, seq_num):
+ def _VerifyUdpEncapSocket(self, version, netid, remoteaddr, myaddr, encap_port,
+ sock, in_spi, out_spi, null_auth, seq_num):
# Now send a packet.
sock.sendto(net_test.UDP_PAYLOAD, (remoteaddr, 53))
srcport = sock.getsockname()[1]
@@ -279,8 +283,8 @@
auth_algo = (
xfrm_base._ALGO_AUTH_NULL if null_auth else xfrm_base._ALGO_HMAC_SHA1)
expected_len = xfrm_base.GetEspPacketLength(
- xfrm.XFRM_MODE_TRANSPORT, 4, True, net_test.UDP_PAYLOAD, auth_algo,
- xfrm_base._ALGO_CBC_AES_256)
+ xfrm.XFRM_MODE_TRANSPORT, version, True, net_test.UDP_PAYLOAD,
+ auth_algo, xfrm_base._ALGO_CBC_AES_256)
self.assertIsUdpEncapEsp(packet, out_spi, seq_num, expected_len)
# Now test the receive path. Because we don't know how to decrypt packets,
@@ -291,9 +295,10 @@
# So the source and destination ports are swapped and the packet appears to
# be sent from srcport to port 53. Open another socket on that port, and
# apply the inbound policy to it.
- twisted_socket = socket(AF_INET, SOCK_DGRAM, 0)
+ family = net_test.GetAddressFamily(version)
+ twisted_socket = socket(family, SOCK_DGRAM, 0)
csocket.SetSocketTimeout(twisted_socket, 100)
- twisted_socket.bind(("0.0.0.0", 53))
+ twisted_socket.bind((net_test.GetWildcardAddress(version), 53))
# Save the payload of the packet so we can replay it back to ourselves, and
# replace the SPI with our inbound SPI.
@@ -305,9 +310,10 @@
start_integrity_failures = sainfo.stats.integrity_failed
# Now play back the valid packet and check that we receive it.
- incoming = (scapy.IP(src=remoteaddr, dst=myaddr) /
+ ip = {4: scapy.IP, 6: scapy.IPv6}[version]
+ incoming = (ip(src=remoteaddr, dst=myaddr) /
scapy.UDP(sport=4500, dport=encap_port) / payload)
- incoming = scapy.IP(bytes(incoming))
+ incoming = ip(bytes(incoming))
self.ReceivePacketOn(netid, incoming)
sainfo = self.xfrm.FindSaInfo(in_spi)
@@ -324,27 +330,27 @@
else:
data, src = twisted_socket.recvfrom(4096)
self.assertEqual(net_test.UDP_PAYLOAD, data)
- self.assertEqual((remoteaddr, srcport), src)
+ self.assertEqual((remoteaddr, srcport), src[:2])
self.assertEqual(start_integrity_failures, sainfo.stats.integrity_failed)
# Check that unencrypted packets on twisted_socket are not received.
unencrypted = (
- scapy.IP(src=remoteaddr, dst=myaddr) / scapy.UDP(
+ ip(src=remoteaddr, dst=myaddr) / scapy.UDP(
sport=srcport, dport=53) / net_test.UDP_PAYLOAD)
self.assertRaisesErrno(EAGAIN, twisted_socket.recv, 4096)
twisted_socket.close()
- def _RunEncapSocketPolicyTest(self, in_spi, out_spi, use_null_auth):
+ def _RunEncapSocketPolicyTest(self, version, in_spi, out_spi, use_null_auth):
netid, myaddr, remoteaddr, encap_sock, encap_port, s = \
- self._SetupUdpEncapSockets()
+ self._SetupUdpEncapSockets(version)
- self._SetupUdpEncapSaPair(myaddr, remoteaddr, in_spi, out_spi, encap_port,
- s, use_null_auth)
+ self._SetupUdpEncapSaPair(version, myaddr, remoteaddr, in_spi, out_spi,
+ encap_port, s, use_null_auth)
# Check that UDP encap sockets work with socket policy and given SAs
- self._VerifyUdpEncapSocket(netid, remoteaddr, myaddr, encap_port, s, in_spi,
- out_spi, use_null_auth, 1)
+ self._VerifyUdpEncapSocket(version, netid, remoteaddr, myaddr, encap_port,
+ s, in_spi, out_spi, use_null_auth, 1)
encap_sock.close()
s.close()
@@ -353,16 +359,16 @@
# Use the same SPI both inbound and outbound because this lets us receive
# encrypted packets by simply replaying the packets the kernel sends
# without having to disable authentication
- self._RunEncapSocketPolicyTest(TEST_SPI, TEST_SPI, True)
+ self._RunEncapSocketPolicyTest(4, TEST_SPI, TEST_SPI, True)
def testUdpEncapSameSpis(self):
- self._RunEncapSocketPolicyTest(TEST_SPI, TEST_SPI, False)
+ self._RunEncapSocketPolicyTest(4, TEST_SPI, TEST_SPI, False)
def testUdpEncapDifferentSpisNullAuth(self):
- self._RunEncapSocketPolicyTest(TEST_SPI, TEST_SPI2, True)
+ self._RunEncapSocketPolicyTest(4, TEST_SPI, TEST_SPI2, True)
def testUdpEncapDifferentSpis(self):
- self._RunEncapSocketPolicyTest(TEST_SPI, TEST_SPI2, False)
+ self._RunEncapSocketPolicyTest(4, TEST_SPI, TEST_SPI2, False)
def testUdpEncapRekey(self):
# Select the two SPIs that will be used
@@ -371,31 +377,31 @@
# Setup sockets
netid, myaddr, remoteaddr, encap_sock, encap_port, s = \
- self._SetupUdpEncapSockets()
+ self._SetupUdpEncapSockets(4)
# The SAs must use null authentication, since we change SPIs on the fly
# Without null authentication, this would result in an ESP authentication
# error since the SPI is part of the authenticated section. The packet
# would then be dropped
- self._SetupUdpEncapSaPair(myaddr, remoteaddr, start_spi, start_spi,
+ self._SetupUdpEncapSaPair(4, myaddr, remoteaddr, start_spi, start_spi,
encap_port, s, True)
# Check that UDP encap sockets work with socket policy and given SAs
- self._VerifyUdpEncapSocket(netid, remoteaddr, myaddr, encap_port, s,
+ self._VerifyUdpEncapSocket(4, netid, remoteaddr, myaddr, encap_port, s,
start_spi, start_spi, True, 1)
# Rekey this socket using the make-before-break paradigm. First we create
# new SAs, update the per-socket policies, and only then remove the old SAs
#
# This allows us to switch to the new SA without breaking the outbound path.
- self._SetupUdpEncapSaPair(myaddr, remoteaddr, rekey_spi, rekey_spi,
+ self._SetupUdpEncapSaPair(4, myaddr, remoteaddr, rekey_spi, rekey_spi,
encap_port, s, True)
# Check that UDP encap socket works with updated socket policy, sending
# using new SA, but receiving on both old and new SAs
- self._VerifyUdpEncapSocket(netid, remoteaddr, myaddr, encap_port, s,
+ self._VerifyUdpEncapSocket(4, netid, remoteaddr, myaddr, encap_port, s,
rekey_spi, rekey_spi, True, 1)
- self._VerifyUdpEncapSocket(netid, remoteaddr, myaddr, encap_port, s,
+ self._VerifyUdpEncapSocket(4, netid, remoteaddr, myaddr, encap_port, s,
start_spi, rekey_spi, True, 2)
# Delete old SAs
@@ -403,11 +409,93 @@
self.xfrm.DeleteSaInfo(myaddr, start_spi, IPPROTO_ESP)
# Check that UDP encap socket works with updated socket policy and new SAs
- self._VerifyUdpEncapSocket(netid, remoteaddr, myaddr, encap_port, s,
+ self._VerifyUdpEncapSocket(4, netid, remoteaddr, myaddr, encap_port, s,
rekey_spi, rekey_spi, True, 3)
encap_sock.close()
s.close()
+ def _CheckUDPEncapRecv(self, version, mode):
+ netid, myaddr, remoteaddr, encap_sock, encap_port, s = \
+ self._SetupUdpEncapSockets(version)
+
+ # Create inbound and outbound SAs that specify UDP encapsulation.
+ reqid = 123
+ encaptmpl = xfrm.XfrmEncapTmpl((xfrm.UDP_ENCAP_ESPINUDP, htons(encap_port),
+ htons(4500), 16 * b"\x00"))
+ self.xfrm.AddSaInfo(remoteaddr, myaddr, TEST_SPI, mode, reqid,
+ xfrm_base._ALGO_CRYPT_NULL, xfrm_base._ALGO_AUTH_NULL, None,
+ encaptmpl, None, None)
+
+ sainfo = self.xfrm.FindSaInfo(TEST_SPI)
+ self.assertEqual(0, sainfo.curlft.packets)
+ self.assertEqual(0, sainfo.curlft.bytes)
+ self.assertEqual(0, sainfo.stats.integrity_failed)
+
+ IpType = {4: scapy.IP, 6: scapy.IPv6}[version]
+ if mode == xfrm.XFRM_MODE_TRANSPORT:
+ # Due to a bug in the IPv6 UDP encap code, there must be at least 32
+ # bytes after the ESP header or the packet will be dropped.
+ # 8 (UDP header) + 18 (payload) + 2 (ESP trailer) = 28, dropped
+ # 8 (UDP header) + 19 (payload) + 4 (ESP trailer) = 32, received
+ # There is a similar bug in IPv4 encap, but the minimum is only 12 bytes,
+ # which is much less likely to occur. This doesn't affect tunnel mode
+ # because IP headers are always at least 20 bytes long.
+ data = 19 * b"a"
+ datalen = len(data)
+ data += xfrm_base.GetEspTrailer(len(data), IPPROTO_UDP)
+ self.assertEqual(32, len(data) + 8)
+ # TODO: update scapy and use scapy.ESP instead of manually generating ESP header.
+ inner_pkt = xfrm.EspHdr(spi=TEST_SPI, seqnum=1).Pack() + bytes(
+ scapy.UDP(sport=443, dport=32123) / data)
+ input_pkt = (IpType(src=remoteaddr, dst=myaddr) /
+ scapy.UDP(sport=4500, dport=encap_port) /
+ inner_pkt)
+ else:
+ # TODO: test IPv4 in IPv6 encap and vice versa.
+ data = b"" # Empty UDP payload
+ datalen = len(data) + {4: 20, 6: 40}[version]
+ data += xfrm_base.GetEspTrailer(len(data), IPPROTO_UDP)
+ # TODO: update scapy and use scapy.ESP instead of manually generating ESP header.
+ inner_pkt = xfrm.EspHdr(spi=TEST_SPI, seqnum=1).Pack() + bytes(
+ IpType(src=remoteaddr, dst=myaddr) /
+ scapy.UDP(sport=443, dport=32123) / data)
+ input_pkt = (IpType(src=remoteaddr, dst=myaddr) /
+ scapy.UDP(sport=4500, dport=encap_port) /
+ inner_pkt)
+
+ # input_pkt.show2()
+ self.ReceivePacketOn(netid, input_pkt)
+
+ sainfo = self.xfrm.FindSaInfo(TEST_SPI)
+ self.assertEqual(1, sainfo.curlft.packets)
+ self.assertEqual(datalen + 8, sainfo.curlft.bytes)
+ self.assertEqual(0, sainfo.stats.integrity_failed)
+
+ # Uncomment for debugging.
+ # subprocess.call("ip -s xfrm state".split())
+
+ encap_sock.close()
+ s.close()
+
+ def testIPv4UDPEncapRecvTransport(self):
+ self._CheckUDPEncapRecv(4, xfrm.XFRM_MODE_TRANSPORT)
+
+ def testIPv4UDPEncapRecvTunnel(self):
+ self._CheckUDPEncapRecv(4, xfrm.XFRM_MODE_TUNNEL)
+
+ # IPv6 UDP encap is broken between:
+ # 4db4075f92af ("esp6: fix check on ipv6_skip_exthdr's return value") and
+ # 5f9c55c8066b ("ipv6: check return value of ipv6_skip_exthdr")
+ @unittest.skipUnless(net_test.KernelAtLeast([(5, 10, 108), (5, 15, 31)]),
+ reason="Unsupported or broken on current kernel")
+ def testIPv6UDPEncapRecvTransport(self):
+ self._CheckUDPEncapRecv(6, xfrm.XFRM_MODE_TRANSPORT)
+
+ @unittest.skipUnless(net_test.KernelAtLeast([(5, 10, 108), (5, 15, 31)]),
+ reason="Unsupported or broken on current kernel")
+ def testIPv6UDPEncapRecvTunnel(self):
+ self._CheckUDPEncapRecv(6, xfrm.XFRM_MODE_TUNNEL)
+
def testAllocSpecificSpi(self):
spi = 0xABCD
new_sa = self.xfrm.AllocSpi("::", IPPROTO_ESP, spi, spi)