Add tunnel input tests to net_tests

This commit adds tunnel (non-VTI) input tests to xfrm_tunnel_test
using null encryption.

This is CP from 585815 on master branch

Bug: 123202162
Test: VtsKernelNetTest can pass

Change-Id: Ied437a50cdb29b189d74aaa4c3604d4aa2aec8e4
Merged-In: I829e6b12e126bf40915544cdfa94271b0ad1b1b5
diff --git a/net/test/xfrm_tunnel_test.py b/net/test/xfrm_tunnel_test.py
index 761c5b0..b8da743 100755
--- a/net/test/xfrm_tunnel_test.py
+++ b/net/test/xfrm_tunnel_test.py
@@ -19,15 +19,18 @@
 from socket import *  # pylint: disable=wildcard-import
 
 import random
+import itertools
 import struct
 import unittest
 
+from scapy import all as scapy
 from tun_twister import TunTwister
 import csocket
 import iproute
 import multinetwork_base
 import net_test
 import packets
+import util
 import xfrm
 import xfrm_base
 
@@ -80,50 +83,99 @@
   return {4: net_test.IPV4_ADDR, 6: net_test.IPV6_ADDR}[version]
 
 
+def InjectTests():
+  InjectParameterizedTests(XfrmTunnelTest)
+
+
+def InjectParameterizedTests(cls):
+  VERSIONS = (4, 6)
+  param_list = itertools.product(VERSIONS, VERSIONS)
+
+  def NameGenerator(*args):
+    return "IPv%d_in_IPv%d" % tuple(args)
+
+  util.InjectParameterizedTest(cls, param_list, NameGenerator)
+
+
 class XfrmTunnelTest(xfrm_base.XfrmLazyTest):
 
-  def _CheckTunnelOutput(self, inner_version, outer_version):
-    """Test a bi-directional XFRM Tunnel with explicit selectors"""
+  def _CheckTunnelOutput(self, inner_version, outer_version, underlying_netid,
+                         netid, local_inner, remote_inner, local_outer,
+                         remote_outer, write_sock):
+
+    write_sock.sendto(net_test.UDP_PAYLOAD, (remote_inner, 53))
+    self._ExpectEspPacketOn(underlying_netid, _TEST_OUT_SPI, 1, None,
+                            local_outer, remote_outer)
+
+  def _CheckTunnelInput(self, inner_version, outer_version, underlying_netid,
+                        netid, local_inner, remote_inner, local_outer,
+                        remote_outer, read_sock):
+
+    # The second parameter of the tuple is the port number regardless of AF.
+    local_port = read_sock.getsockname()[1]
+
+    # Build and receive an ESP packet destined for the inner socket
+    IpType = {4: scapy.IP, 6: scapy.IPv6}[inner_version]
+    input_pkt = (
+        IpType(src=remote_inner, dst=local_inner) / scapy.UDP(
+            sport=1234, dport=local_port) / net_test.UDP_PAYLOAD)
+    input_pkt = IpType(str(input_pkt))  # Compute length, checksum.
+    input_pkt = xfrm_base.EncryptPacketWithNull(input_pkt, _TEST_IN_SPI, 1,
+                                                (remote_outer, local_outer))
+    self.ReceivePacketOn(underlying_netid, input_pkt)
+
+    # Verify that the packet data and src are correct
+    data, src = read_sock.recvfrom(4096)
+    self.assertEquals(net_test.UDP_PAYLOAD, data)
+    self.assertEquals(remote_inner, src[0])
+    self.assertEquals(1234, src[1])
+
+  def _TestTunnel(self, inner_version, outer_version, func, direction):
+    """Test a unidirectional XFRM Tunnel with explicit selectors"""
     # Select the underlying netid, which represents the external
     # interface from/to which to route ESP packets.
-    underlying_netid = self.RandomNetid()
+    u_netid = self.RandomNetid()
     # Select a random netid that will originate traffic locally and
-    # which represents the logical tunnel network.
-    netid = self.RandomNetid(exclude=underlying_netid)
+    # which represents the netid on which the plaintext is sent
+    netid = self.RandomNetid(exclude=u_netid)
 
     local_inner = self.MyAddress(inner_version, netid)
     remote_inner = _GetRemoteInnerAddress(inner_version)
-    local_outer = self.MyAddress(outer_version, underlying_netid)
+    local_outer = self.MyAddress(outer_version, u_netid)
     remote_outer = _GetRemoteOuterAddress(outer_version)
 
+    # Create input/ouput SPs, SAs and sockets to simulate a more realistic
+    # environment.
+    self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_IN,
+                           xfrm.SrcDstSelector(remote_inner, local_inner),
+                           remote_outer, local_outer, _TEST_IN_SPI,
+                           xfrm_base._ALGO_CRYPT_NULL,
+                           xfrm_base._ALGO_AUTH_NULL, None, None, None)
+
     self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_OUT,
                            xfrm.SrcDstSelector(local_inner, remote_inner),
                            local_outer, remote_outer, _TEST_OUT_SPI,
                            xfrm_base._ALGO_CBC_AES_256,
-                           xfrm_base._ALGO_HMAC_SHA1,
-                           None, underlying_netid, None)
+                           xfrm_base._ALGO_HMAC_SHA1, None, u_netid, None)
 
     write_sock = socket(net_test.GetAddressFamily(inner_version), SOCK_DGRAM, 0)
-    # Select an interface, which provides the source address of the inner
-    # packet.
     self.SelectInterface(write_sock, netid, "mark")
-    write_sock.sendto(net_test.UDP_PAYLOAD, (remote_inner, 53))
-    self._ExpectEspPacketOn(underlying_netid, _TEST_OUT_SPI, 1, None,
-                            local_outer, remote_outer)
+    read_sock = socket(net_test.GetAddressFamily(inner_version), SOCK_DGRAM, 0)
+    read_sock.bind((net_test.GetWildcardAddress(inner_version), 0))
+    # Guard against the eventuality of the receive failing.
+    net_test.SetNonBlocking(read_sock.fileno())
 
-  # TODO: Add support for the input path.
+    sock = write_sock if direction == xfrm.XFRM_POLICY_OUT else read_sock
+    func(inner_version, outer_version, u_netid, netid, local_inner,
+         remote_inner, local_outer, remote_outer, sock)
 
-  def testIpv4InIpv4TunnelOutput(self):
-    self._CheckTunnelOutput(4, 4)
+  def ParamTestTunnelInput(self, inner_version, outer_version):
+    self._TestTunnel(inner_version, outer_version, self._CheckTunnelInput,
+                     xfrm.XFRM_POLICY_IN)
 
-  def testIpv4InIpv6TunnelOutput(self):
-    self._CheckTunnelOutput(4, 6)
-
-  def testIpv6InIpv4TunnelOutput(self):
-    self._CheckTunnelOutput(6, 4)
-
-  def testIpv6InIpv6TunnelOutput(self):
-    self._CheckTunnelOutput(6, 6)
+  def ParamTestTunnelOutput(self, inner_version, outer_version):
+    self._TestTunnel(inner_version, outer_version, self._CheckTunnelOutput,
+                     xfrm.XFRM_POLICY_OUT)
 
 
 @unittest.skipUnless(net_test.LINUX_VERSION >= (3, 18, 0), "VTI Unsupported")
@@ -448,7 +500,7 @@
     # The second parameter of the tuple is the port number regardless of AF.
     port = read_sock.getsockname()[1]
     # Guard against the eventuality of the receive failing.
-    csocket.SetSocketTimeout(read_sock, 100)
+    net_test.SetNonBlocking(read_sock.fileno())
 
     # Send a packet out via the tunnel-backed network, bound for the port number
     # of the input socket.
@@ -523,20 +575,21 @@
 @unittest.skipUnless(net_test.LINUX_VERSION >= (3, 18, 0), "VTI Unsupported")
 class XfrmVtiTest(XfrmTunnelBase):
 
-   INTERFACE_CLASS = VtiInterface
+  INTERFACE_CLASS = VtiInterface
 
-   def testVtiInputOutput(self):
-     self.CheckTunnelInputOutput()
+  def testVtiInputOutput(self):
+    self.CheckTunnelInputOutput()
 
 
 @unittest.skipUnless(HAVE_XFRM_INTERFACES, "XFRM interfaces unsupported")
 class XfrmInterfaceTest(XfrmTunnelBase):
 
-   INTERFACE_CLASS = XfrmInterface
+  INTERFACE_CLASS = XfrmInterface
 
-   def testXfrmiInputOutput(self):
-     self.CheckTunnelInputOutput()
+  def testXfrmiInputOutput(self):
+    self.CheckTunnelInputOutput()
 
 
 if __name__ == "__main__":
+  InjectTests()
   unittest.main()