Merge "TEST_MAPPING: add vts net tests to kernel-presubmit"
diff --git a/net/test/bpf_test.py b/net/test/bpf_test.py
index 291e540..d98c494 100755
--- a/net/test/bpf_test.py
+++ b/net/test/bpf_test.py
@@ -84,18 +84,6 @@
 
 libc = ctypes.CDLL(ctypes.util.find_library("c"), use_errno=True)
 
-# bpf_ktime_get_ns() was made non-GPL requiring in 5.8 and at the same time
-# bpf_ktime_get_boot_ns() was added, both of these changes were backported to
-# Android Common Kernel in 4.14.221, 4.19.175, 5.4.97.
-# As such we require 4.14.222+ 4.19.176+ 5.4.98+ 5.8.0+,
-# but since we only really care about LTS releases:
-HAVE_EBPF_KTIME_GET_NS_APACHE2 = (
-    ((LINUX_VERSION > (4, 14, 221)) and (LINUX_VERSION < (4, 19, 0))) or
-    ((LINUX_VERSION > (4, 19, 175)) and (LINUX_VERSION < (5, 4, 0))) or
-    (LINUX_VERSION > (5, 4, 97))
-)
-HAVE_EBPF_KTIME_GET_BOOT_NS = HAVE_EBPF_KTIME_GET_NS_APACHE2
-
 KEY_SIZE = 4
 VALUE_SIZE = 4
 TOTAL_ENTRIES = 20
@@ -383,8 +371,6 @@
   # 5.4:  https://android-review.googlesource.com/c/kernel/common/+/1355422
   #       commit 45217b91eaaa3a563247c4f470f4cb785de6b1c6
   #
-  @unittest.skipUnless(HAVE_EBPF_KTIME_GET_NS_APACHE2,
-                       "no bpf_ktime_get_ns() support for non-GPL programs")
   def testKtimeGetNsApache2(self):
     instructions = [BpfFuncCall(BPF_FUNC_ktime_get_ns)] + INS_BPF_EXIT_BLOCK
     self.prog_fd = BpfProgLoad(BPF_PROG_TYPE_SCHED_CLS, instructions,
@@ -406,8 +392,6 @@
   # 5.4:  https://android-review.googlesource.com/c/kernel/common/+/1585252
   #       commit 57b3f4830fb66a6038c4c1c66ca2e138fe8be231
   #
-  @unittest.skipUnless(HAVE_EBPF_KTIME_GET_BOOT_NS,
-                       "no bpf_ktime_get_boot_ns() support")
   def testKtimeGetBootNs(self):
     instructions = [
         BpfFuncCall(BPF_FUNC_ktime_get_boot_ns),
diff --git a/net/test/build_rootfs.sh b/net/test/build_rootfs.sh
index a86496b..3a88cad 100755
--- a/net/test/build_rootfs.sh
+++ b/net/test/build_rootfs.sh
@@ -402,6 +402,7 @@
 
 # Process the initrd to remove kernel-specific metadata
 kernel_version=$(basename $(lz4 -lcd "${raw_initrd}" | sudo cpio -idumv 2>&1 | grep usr/lib/modules/ - | head -n1))
+lz4 -lcd "${raw_initrd}" | sudo cpio -idumv
 sudo rm -rf usr/lib/modules
 sudo mkdir -p usr/lib/modules
 
@@ -421,33 +422,56 @@
 # Leave workdir to boot-test combined initrd
 cd -
 
+rootfs_partition_tempfile=$(mktemp)
 # Mount the new filesystem locally
 if [[ ${rootfs_partition} = "raw" ]]; then
     sudo mount -o loop -t ext3 "${disk}" "${mount}"
 else
     rootfs_partition_start=$(partx -g -o START -s -n "${rootfs_partition}" "${disk}" | xargs)
     rootfs_partition_offset=$((${rootfs_partition_start} * 512))
-    sudo mount -o loop,offset=${rootfs_partition_offset} -t ext3 "${disk}" "${mount}"
+    rootfs_partition_end=$(partx -g -o END -s -n "${rootfs_partition}" "${disk}" | xargs)
+    rootfs_partition_num_sectors=$((${rootfs_partition_end} - ${rootfs_partition_start} + 1))
+    dd if="${disk}" of="${rootfs_partition_tempfile}" bs=512 skip=${rootfs_partition_start} count=${rootfs_partition_num_sectors}
 fi
 image_unmount2() {
   sudo umount "${mount}"
   raw_initrd_remove
 }
-trap image_unmount2 EXIT
+if [[ ${rootfs_partition} = "raw" ]]; then
+    trap image_unmount2 EXIT
+fi
 
 # Embed the kernel and dtb images now, if requested
-if [[ "${embed_kernel_initrd_dtb}" = "1" ]]; then
-  if [ -n "${dtb}" ]; then
-    sudo mkdir -p "${mount}/boot/dtb/${dtb_subdir}"
-    sudo cp -a "${dtb}" "${mount}/boot/dtb/${dtb_subdir}"
-    sudo chown -R root:root "${mount}/boot/dtb/${dtb_subdir}"
-  fi
-  sudo cp -a "${kernel}" "${mount}/boot/vmlinuz-${kernel_version}"
-  sudo chown root:root "${mount}/boot/vmlinuz-${kernel_version}"
+if [[ ${rootfs_partition} = "raw" ]]; then
+    if [[ "${embed_kernel_initrd_dtb}" = "1" ]]; then
+	if [ -n "${dtb}" ]; then
+	    sudo mkdir -p "${mount}/boot/dtb/${dtb_subdir}"
+	    sudo cp -a "${dtb}" "${mount}/boot/dtb/${dtb_subdir}"
+	    sudo chown -R root:root "${mount}/boot/dtb/${dtb_subdir}"
+	fi
+	sudo cp -a "${kernel}" "${mount}/boot/vmlinuz-${kernel_version}"
+	sudo chown root:root "${mount}/boot/vmlinuz-${kernel_version}"
+    fi
+else
+    if [[ "${embed_kernel_initrd_dtb}" = "1" ]]; then
+	if [ -n "${dtb}" ]; then
+	    e2mkdir -G 0 -O 0 "${rootfs_partition_tempfile}":"/boot/dtb/${dtb_subdir}"
+	    e2cp -G 0 -O 0 "${dtb}" "${rootfs_partition_tempfile}":"/boot/dtb/${dtb_subdir}"
+	fi
+	e2cp -G 0 -O 0 "${kernel}" "${rootfs_partition_tempfile}":"/boot/vmlinuz-${kernel_version}"
+    fi
 fi
 
 # Unmount the initial ramdisk
-sudo umount "${mount}"
+if [[ ${rootfs_partition} = "raw" ]]; then
+    sudo umount "${mount}"
+else
+    rootfs_partition_start=$(partx -g -o START -s -n "${rootfs_partition}" "${disk}" | xargs)
+    rootfs_partition_end=$(partx -g -o END -s -n "${rootfs_partition}" "${disk}" | xargs)
+    rootfs_partition_num_sectors=$((${rootfs_partition_end} - ${rootfs_partition_start} + 1))
+    dd if="${rootfs_partition_tempfile}" of="${disk}" bs=512 seek=${rootfs_partition_start} count=${rootfs_partition_num_sectors} conv=fsync,notrunc
+fi
+rm -f "${rootfs_partition_tempfile}"
 trap raw_initrd_remove EXIT
 
 # Boot test the new system and run stage 3
diff --git a/net/test/net_test.py b/net/test/net_test.py
index 0dae1c2..bbff4e7 100644
--- a/net/test/net_test.py
+++ b/net/test/net_test.py
@@ -96,6 +96,32 @@
 LINUX_VERSION = csocket.LinuxVersion()
 LINUX_ANY_VERSION = (0, 0)
 
+def KernelAtLeast(versions):
+  """Checks the kernel version matches the specified versions.
+
+  Args:
+    versions: a list of versions expressed as tuples,
+    e.g., [(5, 10, 108), (5, 15, 31)]. The kernel version matches if it's
+    between each specified version and the next minor version with last digit
+    set to 0. In this example, the kernel version must match either:
+      >= 5.10.108 and < 5.15.0
+      >= 5.15.31
+    While this is less flexible than matching exact tuples, it allows the caller
+    to pass in fewer arguments, because Android only supports certain minor
+    versions (4.19, 5.4, 5.10, ...)
+
+  Returns:
+    True if the kernel version matches, False otherwise
+  """
+  maxversion = (1000, 255, 65535)
+  for version in sorted(versions, reverse=True):
+    if version[:2] == maxversion[:2]:
+      raise ValueError("Duplicate minor version: %s %s", (version, maxversion))
+    if LINUX_VERSION >= version and LINUX_VERSION < maxversion:
+      return True
+    maxversion = (version[0], version[1], 0)
+  return False
+
 def ByteToHex(b):
   return "%02x" % (ord(b) if isinstance(b, str) else b)
 
diff --git a/net/test/ping6_test.py b/net/test/ping6_test.py
index d7cc35c..7cb75cc 100755
--- a/net/test/ping6_test.py
+++ b/net/test/ping6_test.py
@@ -35,8 +35,6 @@
 import net_test
 
 
-HAVE_PROC_NET_ICMP6 = os.path.isfile("/proc/net/icmp6")
-
 ICMP_ECHO = 8
 ICMP_ECHOREPLY = 0
 ICMPV6_ECHO_REQUEST = 128
@@ -751,7 +749,6 @@
     data = net_test.IPV6_PING + b"\x01" + 19994 * b"\x00" + b"aaaaa"
     s.sendto(data, ("::1", 953))
 
-  @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
   def testIcmpSocketsNotInIcmp6(self):
     numrows = len(self.ReadProcNetSocket("icmp"))
     numrows6 = len(self.ReadProcNetSocket("icmp6"))
@@ -761,7 +758,6 @@
     self.assertEqual(numrows + 1, len(self.ReadProcNetSocket("icmp")))
     self.assertEqual(numrows6, len(self.ReadProcNetSocket("icmp6")))
 
-  @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
   def testIcmp6SocketsNotInIcmp(self):
     numrows = len(self.ReadProcNetSocket("icmp"))
     numrows6 = len(self.ReadProcNetSocket("icmp6"))
@@ -777,7 +773,6 @@
     s.connect(("127.0.0.1", 0xbeef))
     self.CheckSockStatFile("icmp", "127.0.0.1", 0xace, "127.0.0.1", 0xbeef, 1)
 
-  @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
   def testProcNetIcmp6(self):
     numrows6 = len(self.ReadProcNetSocket("icmp6"))
     s = net_test.IPv6PingSocket()
diff --git a/net/test/xfrm_base.py b/net/test/xfrm_base.py
index 06e8cf2..e5aadf3 100644
--- a/net/test/xfrm_base.py
+++ b/net/test/xfrm_base.py
@@ -143,6 +143,18 @@
   return payload_len
 
 
+def GetEspTrailer(length, nexthdr):
+  # ESP padding per RFC 4303 section 2.4.
+  # For a null cipher with a block size of 1, padding is only necessary to
+  # ensure that the 1-byte Pad Length and Next Header fields are right aligned
+  # on a 4-byte boundary.
+  esplen = length + 2  # Packet length plus Pad Length and Next Header.
+  padlen = util.GetPadLength(4, esplen)
+  # The pad bytes are consecutive integers starting from 0x01.
+  padding = "".join((chr(i) for i in range(1, padlen + 1))).encode("utf-8")
+  return padding + struct.pack("BB", padlen, nexthdr)
+
+
 def EncryptPacketWithNull(packet, spi, seq, tun_addrs):
   """Apply null encryption to a packet.
 
@@ -189,16 +201,7 @@
     inner_layer = udp_layer
     esp_nexthdr = IPPROTO_UDP
 
-
-  # ESP padding per RFC 4303 section 2.4.
-  # For a null cipher with a block size of 1, padding is only necessary to
-  # ensure that the 1-byte Pad Length and Next Header fields are right aligned
-  # on a 4-byte boundary.
-  esplen = (len(inner_layer) + 2)  # UDP length plus Pad Length and Next Header.
-  padlen = util.GetPadLength(4, esplen)
-  # The pad bytes are consecutive integers starting from 0x01.
-  padding = "".join((chr(i) for i in range(1, padlen + 1))).encode("utf-8")
-  trailer = padding + struct.pack("BB", padlen, esp_nexthdr)
+  trailer = GetEspTrailer(len(inner_layer), esp_nexthdr)
 
   # Assemble the packet.
   esp_packet.payload = scapy.Raw(inner_layer)
diff --git a/net/test/xfrm_test.py b/net/test/xfrm_test.py
index 6296b40..4c5bff5 100755
--- a/net/test/xfrm_test.py
+++ b/net/test/xfrm_test.py
@@ -54,7 +54,8 @@
 class XfrmFunctionalTest(xfrm_base.XfrmLazyTest):
 
   def assertIsUdpEncapEsp(self, packet, spi, seq, length):
-    self.assertEqual(IPPROTO_UDP, packet.proto)
+    protocol = packet.nh if packet.version == 6 else packet.proto
+    self.assertEqual(IPPROTO_UDP, protocol)
     udp_hdr = packet[scapy.UDP]
     self.assertEqual(4500, udp_hdr.dport)
     self.assertEqual(length, len(udp_hdr))
@@ -213,29 +214,31 @@
     self._TestSocketPolicy(5)
 
   # Sets up sockets and marks to correct netid
-  def _SetupUdpEncapSockets(self):
+  def _SetupUdpEncapSockets(self, version):
     netid = self.RandomNetid()
-    myaddr = self.MyAddress(4, netid)
-    remoteaddr = self.GetRemoteAddress(4)
+    myaddr = self.MyAddress(version, netid)
+    remoteaddr = self.GetRemoteAddress(version)
+    family = net_test.GetAddressFamily(version)
 
     # Reserve a port on which to receive UDP encapsulated packets. Sending
     # packets works without this (and potentially can send packets with a source
     # port belonging to another application), but receiving requires the port to
     # be bound and the encapsulation socket option enabled.
-    encap_sock = net_test.Socket(AF_INET, SOCK_DGRAM, 0)
+    encap_sock = net_test.Socket(family, SOCK_DGRAM, 0)
     encap_sock.bind((myaddr, 0))
     encap_port = encap_sock.getsockname()[1]
     encap_sock.setsockopt(IPPROTO_UDP, xfrm.UDP_ENCAP, xfrm.UDP_ENCAP_ESPINUDP)
 
     # Open a socket to send traffic.
-    s = socket(AF_INET, SOCK_DGRAM, 0)
+    # TODO: test with a different family than the encap socket.
+    s = socket(family, SOCK_DGRAM, 0)
     self.SelectInterface(s, netid, "mark")
     s.connect((remoteaddr, 53))
 
     return netid, myaddr, remoteaddr, encap_sock, encap_port, s
 
   # Sets up SAs and applies socket policy to given socket
-  def _SetupUdpEncapSaPair(self, myaddr, remoteaddr, in_spi, out_spi,
+  def _SetupUdpEncapSaPair(self, version, myaddr, remoteaddr, in_spi, out_spi,
                            encap_port, s, use_null_auth):
     in_reqid = 123
     out_reqid = 456
@@ -252,21 +255,22 @@
                      use_null_auth)
 
     # Apply socket policies to s.
-    xfrm_base.ApplySocketPolicy(s, AF_INET, xfrm.XFRM_POLICY_OUT, out_spi,
+    family = net_test.GetAddressFamily(version)
+    xfrm_base.ApplySocketPolicy(s, family, xfrm.XFRM_POLICY_OUT, out_spi,
                                 out_reqid, None)
 
     # TODO: why does this work without a per-socket policy applied?
     # The received  packet obviously matches an SA, but don't inbound packets
     # need to match a policy as well? (b/71541609)
-    xfrm_base.ApplySocketPolicy(s, AF_INET, xfrm.XFRM_POLICY_IN, in_spi,
+    xfrm_base.ApplySocketPolicy(s, family, xfrm.XFRM_POLICY_IN, in_spi,
                                 in_reqid, None)
 
     # Uncomment for debugging.
     # subprocess.call("ip xfrm state".split())
 
   # Check that packets can be sent and received.
-  def _VerifyUdpEncapSocket(self, netid, remoteaddr, myaddr, encap_port, sock,
-                           in_spi, out_spi, null_auth, seq_num):
+  def _VerifyUdpEncapSocket(self, version, netid, remoteaddr, myaddr, encap_port,
+                           sock, in_spi, out_spi, null_auth, seq_num):
     # Now send a packet.
     sock.sendto(net_test.UDP_PAYLOAD, (remoteaddr, 53))
     srcport = sock.getsockname()[1]
@@ -279,8 +283,8 @@
     auth_algo = (
         xfrm_base._ALGO_AUTH_NULL if null_auth else xfrm_base._ALGO_HMAC_SHA1)
     expected_len = xfrm_base.GetEspPacketLength(
-        xfrm.XFRM_MODE_TRANSPORT, 4, True, net_test.UDP_PAYLOAD, auth_algo,
-        xfrm_base._ALGO_CBC_AES_256)
+        xfrm.XFRM_MODE_TRANSPORT, version, True, net_test.UDP_PAYLOAD,
+        auth_algo, xfrm_base._ALGO_CBC_AES_256)
     self.assertIsUdpEncapEsp(packet, out_spi, seq_num, expected_len)
 
     # Now test the receive path. Because we don't know how to decrypt packets,
@@ -291,9 +295,10 @@
     # So the source and destination ports are swapped and the packet appears to
     # be sent from srcport to port 53. Open another socket on that port, and
     # apply the inbound policy to it.
-    twisted_socket = socket(AF_INET, SOCK_DGRAM, 0)
+    family = net_test.GetAddressFamily(version)
+    twisted_socket = socket(family, SOCK_DGRAM, 0)
     csocket.SetSocketTimeout(twisted_socket, 100)
-    twisted_socket.bind(("0.0.0.0", 53))
+    twisted_socket.bind((net_test.GetWildcardAddress(version), 53))
 
     # Save the payload of the packet so we can replay it back to ourselves, and
     # replace the SPI with our inbound SPI.
@@ -305,9 +310,10 @@
     start_integrity_failures = sainfo.stats.integrity_failed
 
     # Now play back the valid packet and check that we receive it.
-    incoming = (scapy.IP(src=remoteaddr, dst=myaddr) /
+    ip = {4: scapy.IP, 6: scapy.IPv6}[version]
+    incoming = (ip(src=remoteaddr, dst=myaddr) /
                 scapy.UDP(sport=4500, dport=encap_port) / payload)
-    incoming = scapy.IP(bytes(incoming))
+    incoming = ip(bytes(incoming))
     self.ReceivePacketOn(netid, incoming)
 
     sainfo = self.xfrm.FindSaInfo(in_spi)
@@ -324,27 +330,27 @@
     else:
       data, src = twisted_socket.recvfrom(4096)
       self.assertEqual(net_test.UDP_PAYLOAD, data)
-      self.assertEqual((remoteaddr, srcport), src)
+      self.assertEqual((remoteaddr, srcport), src[:2])
       self.assertEqual(start_integrity_failures, sainfo.stats.integrity_failed)
 
     # Check that unencrypted packets on twisted_socket are not received.
     unencrypted = (
-        scapy.IP(src=remoteaddr, dst=myaddr) / scapy.UDP(
+        ip(src=remoteaddr, dst=myaddr) / scapy.UDP(
             sport=srcport, dport=53) / net_test.UDP_PAYLOAD)
     self.assertRaisesErrno(EAGAIN, twisted_socket.recv, 4096)
 
     twisted_socket.close()
 
-  def _RunEncapSocketPolicyTest(self, in_spi, out_spi, use_null_auth):
+  def _RunEncapSocketPolicyTest(self, version, in_spi, out_spi, use_null_auth):
     netid, myaddr, remoteaddr, encap_sock, encap_port, s = \
-        self._SetupUdpEncapSockets()
+        self._SetupUdpEncapSockets(version)
 
-    self._SetupUdpEncapSaPair(myaddr, remoteaddr, in_spi, out_spi, encap_port,
-                              s, use_null_auth)
+    self._SetupUdpEncapSaPair(version, myaddr, remoteaddr, in_spi, out_spi,
+                              encap_port, s, use_null_auth)
 
     # Check that UDP encap sockets work with socket policy and given SAs
-    self._VerifyUdpEncapSocket(netid, remoteaddr, myaddr, encap_port, s, in_spi,
-                               out_spi, use_null_auth, 1)
+    self._VerifyUdpEncapSocket(version, netid, remoteaddr, myaddr, encap_port,
+                               s, in_spi, out_spi, use_null_auth, 1)
     encap_sock.close()
     s.close()
 
@@ -353,16 +359,16 @@
     # Use the same SPI both inbound and outbound because this lets us receive
     # encrypted packets by simply replaying the packets the kernel sends
     # without having to disable authentication
-    self._RunEncapSocketPolicyTest(TEST_SPI, TEST_SPI, True)
+    self._RunEncapSocketPolicyTest(4, TEST_SPI, TEST_SPI, True)
 
   def testUdpEncapSameSpis(self):
-    self._RunEncapSocketPolicyTest(TEST_SPI, TEST_SPI, False)
+    self._RunEncapSocketPolicyTest(4, TEST_SPI, TEST_SPI, False)
 
   def testUdpEncapDifferentSpisNullAuth(self):
-    self._RunEncapSocketPolicyTest(TEST_SPI, TEST_SPI2, True)
+    self._RunEncapSocketPolicyTest(4, TEST_SPI, TEST_SPI2, True)
 
   def testUdpEncapDifferentSpis(self):
-    self._RunEncapSocketPolicyTest(TEST_SPI, TEST_SPI2, False)
+    self._RunEncapSocketPolicyTest(4, TEST_SPI, TEST_SPI2, False)
 
   def testUdpEncapRekey(self):
     # Select the two SPIs that will be used
@@ -371,31 +377,31 @@
 
     # Setup sockets
     netid, myaddr, remoteaddr, encap_sock, encap_port, s = \
-        self._SetupUdpEncapSockets()
+        self._SetupUdpEncapSockets(4)
 
     # The SAs must use null authentication, since we change SPIs on the fly
     # Without null authentication, this would result in an ESP authentication
     # error since the SPI is part of the authenticated section. The packet
     # would then be dropped
-    self._SetupUdpEncapSaPair(myaddr, remoteaddr, start_spi, start_spi,
+    self._SetupUdpEncapSaPair(4, myaddr, remoteaddr, start_spi, start_spi,
                               encap_port, s, True)
 
     # Check that UDP encap sockets work with socket policy and given SAs
-    self._VerifyUdpEncapSocket(netid, remoteaddr, myaddr, encap_port, s,
+    self._VerifyUdpEncapSocket(4, netid, remoteaddr, myaddr, encap_port, s,
                                start_spi, start_spi, True, 1)
 
     # Rekey this socket using the make-before-break paradigm. First we create
     # new SAs, update the per-socket policies, and only then remove the old SAs
     #
     # This allows us to switch to the new SA without breaking the outbound path.
-    self._SetupUdpEncapSaPair(myaddr, remoteaddr, rekey_spi, rekey_spi,
+    self._SetupUdpEncapSaPair(4, myaddr, remoteaddr, rekey_spi, rekey_spi,
                               encap_port, s, True)
 
     # Check that UDP encap socket works with updated socket policy, sending
     # using new SA, but receiving on both old and new SAs
-    self._VerifyUdpEncapSocket(netid, remoteaddr, myaddr, encap_port, s,
+    self._VerifyUdpEncapSocket(4, netid, remoteaddr, myaddr, encap_port, s,
                                rekey_spi, rekey_spi, True, 1)
-    self._VerifyUdpEncapSocket(netid, remoteaddr, myaddr, encap_port, s,
+    self._VerifyUdpEncapSocket(4, netid, remoteaddr, myaddr, encap_port, s,
                                start_spi, rekey_spi, True, 2)
 
     # Delete old SAs
@@ -403,11 +409,93 @@
     self.xfrm.DeleteSaInfo(myaddr, start_spi, IPPROTO_ESP)
 
     # Check that UDP encap socket works with updated socket policy and new SAs
-    self._VerifyUdpEncapSocket(netid, remoteaddr, myaddr, encap_port, s,
+    self._VerifyUdpEncapSocket(4, netid, remoteaddr, myaddr, encap_port, s,
                                rekey_spi, rekey_spi, True, 3)
     encap_sock.close()
     s.close()
 
+  def _CheckUDPEncapRecv(self, version, mode):
+    netid, myaddr, remoteaddr, encap_sock, encap_port, s = \
+        self._SetupUdpEncapSockets(version)
+
+    # Create inbound and outbound SAs that specify UDP encapsulation.
+    reqid = 123
+    encaptmpl = xfrm.XfrmEncapTmpl((xfrm.UDP_ENCAP_ESPINUDP, htons(encap_port),
+                                    htons(4500), 16 * b"\x00"))
+    self.xfrm.AddSaInfo(remoteaddr, myaddr, TEST_SPI, mode, reqid,
+                    xfrm_base._ALGO_CRYPT_NULL, xfrm_base._ALGO_AUTH_NULL, None,
+                    encaptmpl, None, None)
+
+    sainfo = self.xfrm.FindSaInfo(TEST_SPI)
+    self.assertEqual(0, sainfo.curlft.packets)
+    self.assertEqual(0, sainfo.curlft.bytes)
+    self.assertEqual(0, sainfo.stats.integrity_failed)
+
+    IpType = {4: scapy.IP, 6: scapy.IPv6}[version]
+    if mode == xfrm.XFRM_MODE_TRANSPORT:
+      # Due to a bug in the IPv6 UDP encap code, there must be at least 32
+      # bytes after the ESP header or the packet will be dropped.
+      # 8 (UDP header) + 18 (payload) + 2 (ESP trailer) = 28, dropped
+      # 8 (UDP header) + 19 (payload) + 4 (ESP trailer) = 32, received
+      # There is a similar bug in IPv4 encap, but the minimum is only 12 bytes,
+      # which is much less likely to occur. This doesn't affect tunnel mode
+      # because IP headers are always at least 20 bytes long.
+      data = 19 * b"a"
+      datalen = len(data)
+      data += xfrm_base.GetEspTrailer(len(data), IPPROTO_UDP)
+      self.assertEqual(32, len(data) + 8)
+      # TODO: update scapy and use scapy.ESP instead of manually generating ESP header.
+      inner_pkt = xfrm.EspHdr(spi=TEST_SPI, seqnum=1).Pack() + bytes(
+          scapy.UDP(sport=443, dport=32123) / data)
+      input_pkt = (IpType(src=remoteaddr, dst=myaddr) /
+                   scapy.UDP(sport=4500, dport=encap_port) /
+                   inner_pkt)
+    else:
+      # TODO: test IPv4 in IPv6 encap and vice versa.
+      data = b""  # Empty UDP payload
+      datalen = len(data) + {4: 20, 6: 40}[version]
+      data += xfrm_base.GetEspTrailer(len(data), IPPROTO_UDP)
+      # TODO: update scapy and use scapy.ESP instead of manually generating ESP header.
+      inner_pkt = xfrm.EspHdr(spi=TEST_SPI, seqnum=1).Pack() + bytes(
+          IpType(src=remoteaddr, dst=myaddr) /
+          scapy.UDP(sport=443, dport=32123) / data)
+      input_pkt = (IpType(src=remoteaddr, dst=myaddr) /
+                   scapy.UDP(sport=4500, dport=encap_port) /
+                   inner_pkt)
+
+    # input_pkt.show2()
+    self.ReceivePacketOn(netid, input_pkt)
+
+    sainfo = self.xfrm.FindSaInfo(TEST_SPI)
+    self.assertEqual(1, sainfo.curlft.packets)
+    self.assertEqual(datalen + 8, sainfo.curlft.bytes)
+    self.assertEqual(0, sainfo.stats.integrity_failed)
+
+    # Uncomment for debugging.
+    # subprocess.call("ip -s xfrm state".split())
+
+    encap_sock.close()
+    s.close()
+
+  def testIPv4UDPEncapRecvTransport(self):
+    self._CheckUDPEncapRecv(4, xfrm.XFRM_MODE_TRANSPORT)
+
+  def testIPv4UDPEncapRecvTunnel(self):
+    self._CheckUDPEncapRecv(4, xfrm.XFRM_MODE_TUNNEL)
+
+  # IPv6 UDP encap is broken between:
+  # 4db4075f92af ("esp6: fix check on ipv6_skip_exthdr's return value") and
+  # 5f9c55c8066b ("ipv6: check return value of ipv6_skip_exthdr")
+  @unittest.skipUnless(net_test.KernelAtLeast([(5, 10, 108), (5, 15, 31)]),
+                       reason="Unsupported or broken on current kernel")
+  def testIPv6UDPEncapRecvTransport(self):
+    self._CheckUDPEncapRecv(6, xfrm.XFRM_MODE_TRANSPORT)
+
+  @unittest.skipUnless(net_test.KernelAtLeast([(5, 10, 108), (5, 15, 31)]),
+                       reason="Unsupported or broken on current kernel")
+  def testIPv6UDPEncapRecvTunnel(self):
+    self._CheckUDPEncapRecv(6, xfrm.XFRM_MODE_TUNNEL)
+
   def testAllocSpecificSpi(self):
     spi = 0xABCD
     new_sa = self.xfrm.AllocSpi("::", IPPROTO_ESP, spi, spi)