Add test to exercise the tcp_nuke_addr functionality
Change-Id: I5da6a2ae4638a8405c46a03911d1257ca58cc072
diff --git a/net/test/tcp_nuke_addr_test.py b/net/test/tcp_nuke_addr_test.py
new file mode 100755
index 0000000..82116b6
--- /dev/null
+++ b/net/test/tcp_nuke_addr_test.py
@@ -0,0 +1,122 @@
+#!/usr/bin/python
+#
+# Copyright 2015 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import contextlib
+import fcntl
+import os
+import socket
+import struct
+import threading
+import time
+import unittest
+
+
+IPV4_LOOPBACK_ADDR = '127.0.0.1'
+IPV6_LOOPBACK_ADDR = '::1'
+
+SIOCKILLADDR = 0x8939
+
+DEFAULT_TCP_PORT = 8001
+DEFAULT_BUFFER_SIZE = 20
+DEFAULT_TEST_MESSAGE = "TCP NUKE ADDR TEST"
+
+
+@contextlib.contextmanager
+def RunInBackground(thread):
+ """Starts a thread and waits until it joins.
+
+ Args:
+ thread: A not yet started threading.Thread object.
+ """
+ try:
+ thread.start()
+ yield thread
+ finally:
+ thread.join()
+
+
+def TcpAcceptAndReceive(listening_sock, buffer_size=DEFAULT_BUFFER_SIZE):
+ """Accepts a single connection and blocks receiving data from it.
+
+ Args:
+ listening_socket: A socket in LISTEN state.
+ buffer_size: Size of buffer where to read a message.
+ """
+ connection, _ = listening_sock.accept()
+ with contextlib.closing(connection):
+ _ = connection.recv(buffer_size)
+
+
+def ExchangeMessage(addr_family, ip_addr, tcp_port,
+ message=DEFAULT_TEST_MESSAGE):
+ """Creates a listening socket, accepts a connection and sends data to it.
+
+ Args:
+ addr_family: The address family (e.g. AF_INET6).
+ ip_addr: The IP address (IPv4 or IPv6 depending on the addr_family).
+ tcp_port: The TCP port to listen on.
+ message: The message to send on the socket.
+ """
+ test_addr = (ip_addr, tcp_port)
+ with contextlib.closing(
+ socket.socket(addr_family, socket.SOCK_STREAM)) as listening_socket:
+ listening_socket.bind(test_addr)
+ listening_socket.listen(1)
+ with RunInBackground(threading.Thread(target=TcpAcceptAndReceive,
+ args=(listening_socket,))):
+ with contextlib.closing(
+ socket.socket(addr_family, socket.SOCK_STREAM)) as client_socket:
+ client_socket.connect(test_addr)
+ client_socket.send(message)
+
+
+def KillAddrIoctl(addr_family):
+ """Calls the SIOCKILLADDR on IPv6 address family.
+
+ Args:
+ addr_family: The address family (e.g. AF_INET6).
+
+ Raises:
+ ValueError: If the address family is invalid for the ioctl.
+ """
+ if addr_family == socket.AF_INET6:
+ ifreq = struct.pack('BBBBBBBBBBBBBBBBIi',
+ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1,
+ 128, 1)
+ elif addr_family == socket.AF_INET:
+ raise NotImplementedError('Support for IPv4 not implemented yet.')
+ else:
+ raise ValueError('Address family %r not supported.' % addr_family)
+ datagram_socket = socket.socket(addr_family, socket.SOCK_DGRAM)
+ fcntl.ioctl(datagram_socket.fileno(), SIOCKILLADDR, ifreq)
+ datagram_socket.close()
+
+
+class TcpNukeAddrTest(unittest.TestCase):
+
+ def testIPv6KillAddr(self):
+ """Tests that SIOCKILLADDR works as expected.
+
+ Relevant kernel commits:
+ https://www.codeaurora.org/cgit/quic/la/kernel/msm-3.18/commit/net/ipv4/tcp.c?h=aosp/android-3.10&id=1dcd3a1fa2fe78251cc91700eb1d384ab02e2dd6
+ """
+ ExchangeMessage(socket.AF_INET6, IPV6_LOOPBACK_ADDR, DEFAULT_TCP_PORT)
+ KillAddrIoctl(socket.AF_INET6)
+ # Test passes if kernel does not crash.
+
+
+if __name__ == "__main__":
+ unittest.main()