| #!/usr/bin/python |
| # |
| # Copyright 2017 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| # pylint: disable=g-bad-todo,g-bad-file-header,wildcard-import |
| from errno import * # pylint: disable=wildcard-import |
| import os |
| import random |
| import re |
| from scapy import all as scapy |
| from socket import * # pylint: disable=wildcard-import |
| import struct |
| import subprocess |
| import time |
| import unittest |
| |
| import multinetwork_base |
| import net_test |
| import netlink |
| import packets |
| import xfrm |
| |
# Packed 16-byte addresses: the all-zero address (used below wherever an xfrm
# address field is left unspecified) and the packed form of IPv6 ::1.
XFRM_ADDR_ANY = "\x00" * 16
LOOPBACK = "\x00" * 15 + "\x01"

# Hex dump of an encrypted payload.
ENCRYPTED_PAYLOAD = (
    "b1c74998efd6326faebe2061f00f2c750e90e76001664a80c287b150"
    "59e74bf949769cc6af71e51b539e7de3a2a14cb05a231b969e035174"
    "d98c5aa0cef1937db98889ec0d08fa408fecf616")

# Raw key material decoded from hex: 32 bytes (256 bits) for encryption and
# 16 bytes (128 bits) for authentication, matching the sizes given in the
# algorithm descriptions at the bottom of this group of constants.
ENCRYPTION_KEY = (
    "308146eb3bd84b044573d60f5a5fd159"
    "57c7d4fe567a2120f35bae0f9869ec22"
).decode("hex")
AUTH_TRUNC_KEY = "af442892cdcd0ef650e9c299f9a8436a".decode("hex")

# Two distinct public IPv6 destinations used as SA / packet destinations.
TEST_ADDR1 = "2001:4860:4860::8888"
TEST_ADDR2 = "2001:4860:4860::8844"

# SPI in host byte order; callers apply htonl() where the xfrm structs need it.
TEST_SPI = 0x1234

# Bitmask with all 32 bits set, used for the algorithm fields of templates.
ALL_ALGORITHMS = 0xffffffff

# Algorithm descriptions: (name, key length in bits) and, for the truncated
# authentication algorithm, a third field (96) that `ip xfrm state` reports
# after the key -- presumably the truncation length in bits.
ALGO_CBC_AES_256 = xfrm.XfrmAlgo(("cbc(aes)", 256))
ALGO_HMAC_SHA1 = xfrm.XfrmAlgoAuth(("hmac(sha1)", 128, 96))
| |
class XfrmTest(multinetwork_base.MultiNetworkBaseTest):
  """Tests for xfrm (IPsec) security associations and per-socket policies.

  The tests create SAs through the shared xfrm.Xfrm() helper object, attach
  policies to sockets with the IP(V6)_XFRM_POLICY socket options, and inspect
  the packets sent and received on the test networks provided by the base
  class.
  """

  @classmethod
  def setUpClass(cls):
    """Runs the base class setup and creates the shared xfrm.Xfrm() object."""
    super(XfrmTest, cls).setUpClass()
    cls.xfrm = xfrm.Xfrm()

  def setUp(self):
    """Runs the base setup, then flushes any xfrm state left by earlier tests."""
    # TODO: delete this when we're more diligent about deleting our SAs.
    super(XfrmTest, self).setUp()
    subprocess.call("ip xfrm state flush".split())

  def expectIPv6EspPacketOn(self, netid, spi, seq, length):
    """Asserts that exactly one packet was sent on netid and that it is ESP.

    Args:
      netid: The network whose sent packets are read with ReadAllPacketsOn.
      spi: The expected SPI, in host byte order.
      seq: The expected ESP sequence number.
      length: The expected length of the IPv6 payload, in bytes.
    """
    # Named "sent" rather than "packets" to avoid shadowing the imported
    # packets module.
    sent = self.ReadAllPacketsOn(netid)
    self.assertEqual(1, len(sent))
    packet = sent[0]
    self.assertEqual(IPPROTO_ESP, packet.nh)
    # The ESP header starts with the SPI and sequence number, both big-endian.
    spi_seq = struct.pack("!II", spi, seq)
    self.assertEqual(spi_seq, str(packet.payload)[:len(spi_seq)])
    self.assertEqual(length, len(packet.payload))

  def assertIsUdpEncapEsp(self, packet, spi, seq, length):
    """Asserts that packet is an ESP packet encapsulated in UDP to port 4500.

    Args:
      packet: The IPv4 packet to check.
      spi: The expected SPI as produced by htonl(); it is converted back with
        ntohl() before being compared to the bytes on the wire.
      seq: The expected ESP sequence number.
      length: The expected length of the data after the UDP header, in bytes.
    """
    self.assertEqual(IPPROTO_UDP, packet.proto)
    self.assertEqual(4500, packet.dport)
    # Skip UDP header. TODO: isn't there a better way to do this?
    payload = str(packet.payload)[8:]
    self.assertEqual(length, len(payload))
    spi_seq = struct.pack("!II", ntohl(spi), seq)
    self.assertEqual(spi_seq, payload[:len(spi_seq)])

  def testAddSa(self):
    """Adds an SA and checks that `ip xfrm state` prints it as expected."""
    self.xfrm.AddMinimalSaInfo("::", TEST_ADDR1, htonl(TEST_SPI), IPPROTO_ESP,
                               xfrm.XFRM_MODE_TRANSPORT, 3320,
                               ALGO_CBC_AES_256, ENCRYPTION_KEY,
                               ALGO_HMAC_SHA1, AUTH_TRUNC_KEY, None)
    # Everything after the SA is created runs inside the try block so that the
    # SA is deleted even if running `ip` itself fails.
    try:
      expected = (
          "src :: dst 2001:4860:4860::8888\n"
          "\tproto esp spi 0x00001234 reqid 3320 mode transport\n"
          "\treplay-window 4 \n"
          "\tauth-trunc hmac(sha1) 0x%s 96\n"
          "\tenc cbc(aes) 0x%s\n"
          "\tsel src ::/0 dst ::/0 \n" % (
              AUTH_TRUNC_KEY.encode("hex"), ENCRYPTION_KEY.encode("hex")))
      actual = subprocess.check_output("ip xfrm state".split())
      self.assertMultiLineEqual(expected, actual)
    finally:
      self.xfrm.DeleteSaInfo(TEST_ADDR1, htonl(TEST_SPI), IPPROTO_ESP)

  @unittest.skipUnless(net_test.LINUX_VERSION < (4, 4, 0), "regression")
  def testSocketPolicy(self):
    """Checks that an IPv6 socket policy forces traffic through a matching SA.

    Sends fail with EAGAIN while no matching SA exists, go out as ESP once one
    is added, and fail again after it is deleted. A second socket without the
    policy sends plain UDP throughout.
    """
    # Open an IPv6 UDP socket and connect it.
    s = socket(AF_INET6, SOCK_DGRAM, 0)
    self.addCleanup(s.close)
    netid = random.choice(self.NETIDS)
    self.SelectInterface(s, netid, "mark")
    s.connect((TEST_ADDR1, 53))

    # Create a selector that matches all UDP packets. It's not actually used to
    # select traffic, that will be done by the socket policy, which selects the
    # SA entry (i.e., xfrm state) via the SPI and reqid.
    sel = xfrm.XfrmSelector((XFRM_ADDR_ANY, XFRM_ADDR_ANY, 0, 0, 0, 0,
                             AF_INET6, 0, 0, IPPROTO_UDP, 0, 0))

    # Create a user policy that specifies that all outbound packets matching the
    # (essentially no-op) selector should be encrypted.
    info = xfrm.XfrmUserpolicyInfo((sel,
                                    xfrm.NO_LIFETIME_CFG, xfrm.NO_LIFETIME_CUR,
                                    100, 0,
                                    xfrm.XFRM_POLICY_OUT,
                                    xfrm.XFRM_POLICY_ALLOW,
                                    xfrm.XFRM_POLICY_LOCALOK,
                                    xfrm.XFRM_SHARE_UNIQUE))

    # Create a template that specifies the SPI and the protocol.
    xfrmid = xfrm.XfrmId((XFRM_ADDR_ANY, htonl(TEST_SPI), IPPROTO_ESP))
    tmpl = xfrm.XfrmUserTmpl((xfrmid, AF_INET6, XFRM_ADDR_ANY, 0,
                              xfrm.XFRM_MODE_TRANSPORT, xfrm.XFRM_SHARE_UNIQUE,
                              0,                # require
                              ALL_ALGORITHMS,   # auth algos
                              ALL_ALGORITHMS,   # encryption algos
                              ALL_ALGORITHMS))  # compression algos

    # Set the policy and template on our socket.
    data = info.Pack() + tmpl.Pack()
    s.setsockopt(IPPROTO_IPV6, xfrm.IPV6_XFRM_POLICY, data)

    # Because the policy has level set to "require" (the default), attempting
    # to send a packet results in an error, because there is no SA that
    # matches the socket policy we set.
    self.assertRaisesErrno(
        EAGAIN,
        s.sendto, net_test.UDP_PAYLOAD, (TEST_ADDR1, 53))

    # Adding a matching SA causes the packet to go out encrypted. The SA's
    # SPI must match the one in our template, and the destination address must
    # match the packet's destination address (in tunnel mode, it has to match
    # the tunnel destination).
    reqid = 0
    self.xfrm.AddMinimalSaInfo("::", TEST_ADDR1, htonl(TEST_SPI), IPPROTO_ESP,
                               xfrm.XFRM_MODE_TRANSPORT, reqid,
                               ALGO_CBC_AES_256, ENCRYPTION_KEY,
                               ALGO_HMAC_SHA1, AUTH_TRUNC_KEY, None)

    s.sendto(net_test.UDP_PAYLOAD, (TEST_ADDR1, 53))
    self.expectIPv6EspPacketOn(netid, TEST_SPI, 1, 84)

    # Sending to another destination doesn't work: again, no matching SA.
    self.assertRaisesErrno(
        EAGAIN,
        s.sendto, net_test.UDP_PAYLOAD, (TEST_ADDR2, 53))

    # Sending on another socket without the policy applied results in an
    # unencrypted packet going out.
    s2 = socket(AF_INET6, SOCK_DGRAM, 0)
    self.addCleanup(s2.close)
    self.SelectInterface(s2, netid, "mark")
    s2.sendto(net_test.UDP_PAYLOAD, (TEST_ADDR1, 53))
    sent = self.ReadAllPacketsOn(netid)
    self.assertEqual(1, len(sent))
    self.assertEqual(IPPROTO_UDP, sent[0].nh)

    # Deleting the SA causes the first socket to return errors again.
    self.xfrm.DeleteSaInfo(TEST_ADDR1, htonl(TEST_SPI), IPPROTO_ESP)
    self.assertRaisesErrno(
        EAGAIN,
        s.sendto, net_test.UDP_PAYLOAD, (TEST_ADDR1, 53))

  def testUdpEncapWithSocketPolicy(self):
    """Checks UDP-encapsulated ESP in both directions with socket policies.

    An outbound policy and SA make a UDP send go out as ESP-in-UDP to port
    4500. The captured packet is then replayed inbound: a tampered copy must
    bump the SA's integrity_failed counter, the intact copy must be delivered
    decrypted, and a plain UDP packet must not be delivered.
    """
    # TODO: test IPv6 instead of IPv4.
    netid = random.choice(self.NETIDS)
    myaddr = self.MyAddress(4, netid)
    remoteaddr = self.GetRemoteAddress(4)

    # Reserve a port on which to receive UDP encapsulated packets. Sending
    # packets works without this (and potentially can send packets with a source
    # port belonging to another application), but receiving requires the port to
    # be bound and the encapsulation socket option enabled.
    encap_socket = net_test.Socket(AF_INET, SOCK_DGRAM, 0)
    self.addCleanup(encap_socket.close)
    encap_socket.bind((myaddr, 0))
    encap_port = encap_socket.getsockname()[1]
    encap_socket.setsockopt(IPPROTO_UDP, xfrm.UDP_ENCAP,
                            xfrm.UDP_ENCAP_ESPINUDP)

    # Open a socket to send traffic.
    s = socket(AF_INET, SOCK_DGRAM, 0)
    self.addCleanup(s.close)
    self.SelectInterface(s, netid, "mark")
    s.connect((remoteaddr, 53))

    # Create a UDP encap policy and template inbound and outbound and apply
    # them to s.
    sel = xfrm.XfrmSelector((XFRM_ADDR_ANY, XFRM_ADDR_ANY, 0, 0, 0, 0,
                             AF_INET, 0, 0, IPPROTO_UDP, 0, 0))

    # Use the same SPI both inbound and outbound because this lets us receive
    # encrypted packets by simply replaying the packets the kernel sends.
    in_reqid = 123
    in_spi = htonl(TEST_SPI)
    out_reqid = 456
    out_spi = htonl(TEST_SPI)

    # Start with the outbound policy.
    # TODO: what happens without XFRM_SHARE_UNIQUE?
    info = xfrm.XfrmUserpolicyInfo((sel,
                                    xfrm.NO_LIFETIME_CFG, xfrm.NO_LIFETIME_CUR,
                                    100, 0,
                                    xfrm.XFRM_POLICY_OUT,
                                    xfrm.XFRM_POLICY_ALLOW,
                                    xfrm.XFRM_POLICY_LOCALOK,
                                    xfrm.XFRM_SHARE_UNIQUE))
    xfrmid = xfrm.XfrmId((XFRM_ADDR_ANY, out_spi, IPPROTO_ESP))
    usertmpl = xfrm.XfrmUserTmpl((xfrmid, AF_INET, XFRM_ADDR_ANY, out_reqid,
                                  xfrm.XFRM_MODE_TRANSPORT,
                                  xfrm.XFRM_SHARE_UNIQUE,
                                  0,                # require
                                  ALL_ALGORITHMS,   # auth algos
                                  ALL_ALGORITHMS,   # encryption algos
                                  ALL_ALGORITHMS))  # compression algos

    data = info.Pack() + usertmpl.Pack()
    s.setsockopt(IPPROTO_IP, xfrm.IP_XFRM_POLICY, data)

    # Uncomment for debugging.
    # subprocess.call("ip xfrm policy".split())

    # Create inbound and outbound SAs that specify UDP encapsulation.
    encaptmpl = xfrm.XfrmEncapTmpl((xfrm.UDP_ENCAP_ESPINUDP, htons(encap_port),
                                    htons(4500), XFRM_ADDR_ANY))
    self.xfrm.AddMinimalSaInfo(myaddr, remoteaddr, out_spi, IPPROTO_ESP,
                               xfrm.XFRM_MODE_TRANSPORT, out_reqid,
                               ALGO_CBC_AES_256, ENCRYPTION_KEY,
                               ALGO_HMAC_SHA1, AUTH_TRUNC_KEY, encaptmpl)

    # Add an encap template that's the mirror of the outbound one.
    encaptmpl.sport, encaptmpl.dport = encaptmpl.dport, encaptmpl.sport
    self.xfrm.AddMinimalSaInfo(remoteaddr, myaddr, in_spi, IPPROTO_ESP,
                               xfrm.XFRM_MODE_TRANSPORT, in_reqid,
                               ALGO_CBC_AES_256, ENCRYPTION_KEY,
                               ALGO_HMAC_SHA1, AUTH_TRUNC_KEY, encaptmpl)

    # Uncomment for debugging.
    # subprocess.call("ip xfrm state".split())

    # Now send a packet.
    s.sendto("foo", (remoteaddr, 53))
    srcport = s.getsockname()[1]
    # s.send("foo")  # TODO: WHY DOES THIS NOT WORK?

    # Expect to see an UDP encapsulated packet.
    sent = self.ReadAllPacketsOn(netid)
    self.assertEqual(1, len(sent))
    packet = sent[0]
    self.assertIsUdpEncapEsp(packet, out_spi, 1, 52)

    # Now test the receive path. Because we don't know how to decrypt packets,
    # we just play back the encrypted packet that kernel sent earlier. We swap
    # the addresses in the IP header to make the packet look like it's bound for
    # us, but we can't do that for the port numbers because the UDP header is
    # part of the integrity protected payload, which we can only replay as is.
    # So the source and destination ports are swapped and the packet appears to
    # be sent from srcport to port 53. Open another socket on that port, and
    # apply the inbound policy to it.
    twisted_socket = socket(AF_INET, SOCK_DGRAM, 0)
    self.addCleanup(twisted_socket.close)
    net_test.SetSocketTimeout(twisted_socket, 100)
    twisted_socket.bind(("0.0.0.0", 53))

    # TODO: why does this work even without the per-socket policy applied? The
    # received packet obviously matches an SA, but don't inbound packets need to
    # match a policy as well?
    info.dir = xfrm.XFRM_POLICY_IN
    xfrmid.spi = in_spi
    usertmpl.reqid = in_reqid
    data = info.Pack() + usertmpl.Pack()
    twisted_socket.setsockopt(IPPROTO_IP, xfrm.IP_XFRM_POLICY, data)

    # Save the payload of the packet so we can replay it back to ourselves.
    payload = str(packet.payload)[8:]
    spi_seq = struct.pack("!II", ntohl(in_spi), 1)
    payload = spi_seq + payload[len(spi_seq):]

    # Tamper with the packet and check that it's dropped and counted as invalid.
    sainfo = self.xfrm.FindSaInfo(in_spi)
    self.assertEqual(0, sainfo.stats.integrity_failed)
    broken = payload[:25] + chr((ord(payload[25]) + 1) % 256) + payload[26:]
    incoming = (scapy.IP(src=remoteaddr, dst=myaddr) /
                scapy.UDP(sport=4500, dport=encap_port) / broken)
    self.ReceivePacketOn(netid, incoming)
    sainfo = self.xfrm.FindSaInfo(in_spi)
    self.assertEqual(1, sainfo.stats.integrity_failed)

    # Now play back the valid packet and check that we receive it.
    incoming = (scapy.IP(src=remoteaddr, dst=myaddr) /
                scapy.UDP(sport=4500, dport=encap_port) / payload)
    self.ReceivePacketOn(netid, incoming)
    data, src = twisted_socket.recvfrom(4096)
    self.assertEqual("foo", data)
    self.assertEqual((remoteaddr, srcport), src)

    # Check that unencrypted packets are not received. The packet has to be
    # injected for this check to mean anything: without it, recv() times out
    # simply because nothing was sent.
    unencrypted = (scapy.IP(src=remoteaddr, dst=myaddr) /
                   scapy.UDP(sport=srcport, dport=53) / "foo")
    self.ReceivePacketOn(netid, unencrypted)
    self.assertRaisesErrno(EAGAIN, twisted_socket.recv, 4096)
| |
| |
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
  unittest.main()