Merge "Add Fuchsia DHCP Duplicate Address Test"
diff --git a/acts/framework/acts/controllers/access_point.py b/acts/framework/acts/controllers/access_point.py
index 0353b59..21e26bb 100755
--- a/acts/framework/acts/controllers/access_point.py
+++ b/acts/framework/acts/controllers/access_point.py
@@ -128,6 +128,13 @@
         additional_ap_parameters: Additional parameters to send the AP.
         password: Password to connect to WLAN if necessary.
         check_connectivity: Whether to check for internet connectivity.
+
+    Returns:
+        An identifier for each ssid being started. These identifiers can be
+        used later by this controller to control the ap.
+
+    Raises:
+        Error: When the ap can't be brought up.
     """
     ap = hostapd_ap_preset.create_ap_preset(profile_name=profile_name,
                                             iface_wlan_2g=access_point.wlan_2g,
@@ -148,9 +155,10 @@
                                             n_capabilities=n_capabilities,
                                             ac_capabilities=ac_capabilities,
                                             vht_bandwidth=vht_bandwidth)
-    access_point.start_ap(hostapd_config=ap,
-                          setup_bridge=setup_bridge,
-                          additional_parameters=additional_ap_parameters)
+    return access_point.start_ap(
+        hostapd_config=ap,
+        setup_bridge=setup_bridge,
+        additional_parameters=additional_ap_parameters)
 
 
 class Error(Exception):
diff --git a/acts_tests/tests/google/fuchsia/dhcp/Dhcpv4InteropTest.py b/acts_tests/tests/google/fuchsia/dhcp/Dhcpv4InteropTest.py
index d23d226..783f9d0 100644
--- a/acts_tests/tests/google/fuchsia/dhcp/Dhcpv4InteropTest.py
+++ b/acts_tests/tests/google/fuchsia/dhcp/Dhcpv4InteropTest.py
@@ -14,6 +14,7 @@
 #   See the License for the specific language governing permissions and
 #   limitations under the License.
 
+import ipaddress
 import itertools
 import time
 import re
@@ -25,6 +26,7 @@
 from acts.controllers.ap_lib import hostapd_constants
 from acts.controllers.ap_lib.hostapd_security import Security
 from acts.controllers.ap_lib.hostapd_utils import generate_random_password
+from acts.controllers.utils_lib.commands import ip
 from acts_contrib.test_utils.abstract_devices.wlan_device import create_wlan_device
 from acts_contrib.test_utils.abstract_devices.wlan_device_lib.AbstractDeviceWlanDeviceBaseTest import AbstractDeviceWlanDeviceBaseTest
 from acts_contrib.test_utils.wifi.WifiBaseTest import WifiBaseTest
@@ -100,16 +102,19 @@
         target_security = hostapd_constants.SECURITY_STRING_TO_DEFAULT_TARGET_SECURITY.get(
             security_mode)
 
-        setup_ap(access_point=self.access_point,
-                 profile_name='whirlwind',
-                 mode=hostapd_constants.MODE_11N_MIXED,
-                 channel=hostapd_constants.AP_DEFAULT_CHANNEL_5G,
-                 n_capabilities=[],
-                 ac_capabilities=[],
-                 force_wmm=True,
-                 ssid=ssid,
-                 security=security_profile,
-                 password=password)
+        ap_ids = setup_ap(access_point=self.access_point,
+                          profile_name='whirlwind',
+                          mode=hostapd_constants.MODE_11N_MIXED,
+                          channel=hostapd_constants.AP_DEFAULT_CHANNEL_5G,
+                          n_capabilities=[],
+                          ac_capabilities=[],
+                          force_wmm=True,
+                          ssid=ssid,
+                          security=security_profile,
+                          password=password)
+
+        if len(ap_ids) > 1:
+            raise Exception("Expected only one SSID on AP")
 
         configured_subnets = self.access_point.get_configured_subnets()
         if len(configured_subnets) > 1:
@@ -125,6 +130,7 @@
             'target_security': target_security,
             'ip': router_ip,
             'network': network,
+            'id': ap_ids[0],
         }
 
     def device_can_ping(self, dest_ip):
@@ -338,6 +344,93 @@
             dhcp_logs + "\n")
 
 
+class Dhcpv4DuplicateAddressTest(Dhcpv4InteropFixture):
+    def setup_test(self):
+        super().setup_test()
+        self.extra_addresses = []
+        self.ap_params = self.setup_ap()
+        self.ap_ip_cmd = ip.LinuxIpCommand(self.access_point.ssh)
+
+    def teardown_test(self):
+        super().teardown_test()
+        for ip in self.extra_addresses:
+            self.ap_ip_cmd.remove_ipv4_address(self.ap_params['id'], ip)
+            pass
+
+    def test_duplicate_address_assignment(self):
+        """It's possible for a DHCP server to assign an address that already exists on the network.
+        DHCP clients are expected to perform a "gratuitous ARP" of the to-be-assigned address, and
+        refuse to assign that address. Clients should also recover by asking for a different
+        address.
+        """
+        # Modify subnet to hold fewer addresses.
+        # A '/29' has 8 addresses (6 usable excluding router / broadcast)
+        subnet = next(self.ap_params['network'].subnets(new_prefix=29))
+        subnet_conf = dhcp_config.Subnet(
+            subnet=subnet,
+            router=self.ap_params['ip'],
+            # When the DHCP server is considering dynamically allocating an IP address to a client,
+            # it first sends an ICMP Echo request (a ping) to the address being assigned. It waits
+            # for a second, and if no ICMP Echo response has been heard, it assigns the address.
+            # If a response is heard, the lease is abandoned, and the server does not respond to
+            # the client.
+            # The ping-check configuration parameter can be used to control checking - if its value
+            # is false, no ping check is done.
+            additional_parameters=[('ping-check', 'false')])
+        dhcp_conf = dhcp_config.DhcpConfig(subnets=[subnet_conf])
+        self.access_point.start_dhcp(dhcp_conf=dhcp_conf)
+
+        # Add each of the usable IPs as an alias for the router's interface, such that the router
+        # will respond to any pings on it.
+        for ip in subnet.hosts():
+            self.ap_ip_cmd.add_ipv4_address(self.ap_params['id'], ip)
+            # Ensure we remove the address in self.teardown_test() even if the test fails
+            self.extra_addresses.append(ip)
+
+        self.connect(ap_params=self.ap_params)
+        with asserts.assert_raises(ConnectionError):
+            self.get_device_ipv4_addr()
+
+        # Per spec, the flow should be:
+        # Discover -> Offer -> Request -> Ack -> client optionally performs DAD
+        dhcp_logs = self.access_point.get_dhcp_logs()
+        for expected_message in [
+                r'DHCPDISCOVER from \S+',
+                r'DHCPOFFER on [0-9.]+ to \S+',
+                r'DHCPREQUEST for [0-9.]+',
+                r'DHCPACK on [0-9.]+',
+                r'DHCPDECLINE of [0-9.]+ from \S+ via .*: abandoned',
+                r'Abandoning IP address [0-9.]+: declined',
+        ]:
+            asserts.assert_true(
+                re.search(expected_message, dhcp_logs),
+                f'Did not find expected message ({expected_message}) in dhcp logs: {dhcp_logs}'
+                + "\n")
+
+        # Remove each of the IP aliases.
+        # Note: this also removes the router's address (e.g. 192.168.1.1), so pinging the
+        # router after this will not work.
+        while self.extra_addresses:
+            self.ap_ip_cmd.remove_ipv4_address(self.ap_params['id'],
+                                               self.extra_addresses.pop())
+
+        # Now, we should get an address successfully
+        ip = self.get_device_ipv4_addr()
+        dhcp_logs = self.access_point.get_dhcp_logs()
+
+        expected_string = f'DHCPREQUEST for {ip}'
+        asserts.assert_true(
+            dhcp_logs.count(expected_string) >= 1,
+            f'Incorrect count of DHCP Requests ("{expected_string}") in logs: '
+            + dhcp_logs + "\n")
+
+        expected_string = f'DHCPACK on {ip}'
+        asserts.assert_true(
+            dhcp_logs.count(expected_string) >= 1,
+            f'Incorrect count of DHCP Acks ("{expected_string}") in logs: ' +
+            dhcp_logs + "\n")
+
+
 class Dhcpv4InteropCombinatorialOptionsTest(Dhcpv4InteropFixture):
     """DhcpV4 tests which validate combinations of DHCP options."""
     OPTION_DOMAIN_NAME = [('domain-name', 'example.invalid'),