Merge "Add Fuchsia DHCP Duplicate Address Test"
diff --git a/acts/framework/acts/controllers/access_point.py b/acts/framework/acts/controllers/access_point.py
index 0353b59..21e26bb 100755
--- a/acts/framework/acts/controllers/access_point.py
+++ b/acts/framework/acts/controllers/access_point.py
@@ -128,6 +128,13 @@
additional_ap_parameters: Additional parameters to send the AP.
password: Password to connect to WLAN if necessary.
check_connectivity: Whether to check for internet connectivity.
+
+ Returns:
+ An identifier for each ssid being started. These identifiers can be
+ used later by this controller to control the ap.
+
+ Raises:
+ Error: When the ap can't be brought up.
"""
ap = hostapd_ap_preset.create_ap_preset(profile_name=profile_name,
iface_wlan_2g=access_point.wlan_2g,
@@ -148,9 +155,10 @@
n_capabilities=n_capabilities,
ac_capabilities=ac_capabilities,
vht_bandwidth=vht_bandwidth)
- access_point.start_ap(hostapd_config=ap,
- setup_bridge=setup_bridge,
- additional_parameters=additional_ap_parameters)
+ return access_point.start_ap(
+ hostapd_config=ap,
+ setup_bridge=setup_bridge,
+ additional_parameters=additional_ap_parameters)
class Error(Exception):
diff --git a/acts_tests/tests/google/fuchsia/dhcp/Dhcpv4InteropTest.py b/acts_tests/tests/google/fuchsia/dhcp/Dhcpv4InteropTest.py
index d23d226..783f9d0 100644
--- a/acts_tests/tests/google/fuchsia/dhcp/Dhcpv4InteropTest.py
+++ b/acts_tests/tests/google/fuchsia/dhcp/Dhcpv4InteropTest.py
@@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import ipaddress
import itertools
import time
import re
@@ -25,6 +26,7 @@
from acts.controllers.ap_lib import hostapd_constants
from acts.controllers.ap_lib.hostapd_security import Security
from acts.controllers.ap_lib.hostapd_utils import generate_random_password
+from acts.controllers.utils_lib.commands import ip
from acts_contrib.test_utils.abstract_devices.wlan_device import create_wlan_device
from acts_contrib.test_utils.abstract_devices.wlan_device_lib.AbstractDeviceWlanDeviceBaseTest import AbstractDeviceWlanDeviceBaseTest
from acts_contrib.test_utils.wifi.WifiBaseTest import WifiBaseTest
@@ -100,16 +102,19 @@
target_security = hostapd_constants.SECURITY_STRING_TO_DEFAULT_TARGET_SECURITY.get(
security_mode)
- setup_ap(access_point=self.access_point,
- profile_name='whirlwind',
- mode=hostapd_constants.MODE_11N_MIXED,
- channel=hostapd_constants.AP_DEFAULT_CHANNEL_5G,
- n_capabilities=[],
- ac_capabilities=[],
- force_wmm=True,
- ssid=ssid,
- security=security_profile,
- password=password)
+ ap_ids = setup_ap(access_point=self.access_point,
+ profile_name='whirlwind',
+ mode=hostapd_constants.MODE_11N_MIXED,
+ channel=hostapd_constants.AP_DEFAULT_CHANNEL_5G,
+ n_capabilities=[],
+ ac_capabilities=[],
+ force_wmm=True,
+ ssid=ssid,
+ security=security_profile,
+ password=password)
+
+ if len(ap_ids) > 1:
+ raise Exception("Expected only one SSID on AP")
configured_subnets = self.access_point.get_configured_subnets()
if len(configured_subnets) > 1:
@@ -125,6 +130,7 @@
'target_security': target_security,
'ip': router_ip,
'network': network,
+ 'id': ap_ids[0],
}
def device_can_ping(self, dest_ip):
@@ -338,6 +344,93 @@
dhcp_logs + "\n")
+class Dhcpv4DuplicateAddressTest(Dhcpv4InteropFixture):
+ def setup_test(self):
+ super().setup_test()
+ self.extra_addresses = []
+ self.ap_params = self.setup_ap()
+ self.ap_ip_cmd = ip.LinuxIpCommand(self.access_point.ssh)
+
+ def teardown_test(self):
+ super().teardown_test()
+ for ip in self.extra_addresses:
+ self.ap_ip_cmd.remove_ipv4_address(self.ap_params['id'], ip)
+ pass
+
+ def test_duplicate_address_assignment(self):
+ """It's possible for a DHCP server to assign an address that already exists on the network.
+ DHCP clients are expected to perform a "gratuitous ARP" of the to-be-assigned address, and
+ refuse to assign that address. Clients should also recover by asking for a different
+ address.
+ """
+ # Modify subnet to hold fewer addresses.
+ # A '/29' has 8 addresses (6 usable excluding router / broadcast)
+ subnet = next(self.ap_params['network'].subnets(new_prefix=29))
+ subnet_conf = dhcp_config.Subnet(
+ subnet=subnet,
+ router=self.ap_params['ip'],
+ # When the DHCP server is considering dynamically allocating an IP address to a client,
+ # it first sends an ICMP Echo request (a ping) to the address being assigned. It waits
+ # for a second, and if no ICMP Echo response has been heard, it assigns the address.
+ # If a response is heard, the lease is abandoned, and the server does not respond to
+ # the client.
+ # The ping-check configuration parameter can be used to control checking - if its value
+ # is false, no ping check is done.
+ additional_parameters=[('ping-check', 'false')])
+ dhcp_conf = dhcp_config.DhcpConfig(subnets=[subnet_conf])
+ self.access_point.start_dhcp(dhcp_conf=dhcp_conf)
+
+ # Add each of the usable IPs as an alias for the router's interface, such that the router
+ # will respond to any pings on it.
+ for ip in subnet.hosts():
+ self.ap_ip_cmd.add_ipv4_address(self.ap_params['id'], ip)
+ # Ensure we remove the address in self.teardown_test() even if the test fails
+ self.extra_addresses.append(ip)
+
+ self.connect(ap_params=self.ap_params)
+ with asserts.assert_raises(ConnectionError):
+ self.get_device_ipv4_addr()
+
+ # Per spec, the flow should be:
+ # Discover -> Offer -> Request -> Ack -> client optionally performs DAD
+ dhcp_logs = self.access_point.get_dhcp_logs()
+ for expected_message in [
+ r'DHCPDISCOVER from \S+',
+ r'DHCPOFFER on [0-9.]+ to \S+',
+ r'DHCPREQUEST for [0-9.]+',
+ r'DHCPACK on [0-9.]+',
+ r'DHCPDECLINE of [0-9.]+ from \S+ via .*: abandoned',
+ r'Abandoning IP address [0-9.]+: declined',
+ ]:
+ asserts.assert_true(
+ re.search(expected_message, dhcp_logs),
+ f'Did not find expected message ({expected_message}) in dhcp logs: {dhcp_logs}'
+ + "\n")
+
+ # Remove each of the IP aliases.
+ # Note: this also removes the router's address (e.g. 192.168.1.1), so pinging the
+ # router after this will not work.
+ while self.extra_addresses:
+ self.ap_ip_cmd.remove_ipv4_address(self.ap_params['id'],
+ self.extra_addresses.pop())
+
+ # Now, we should get an address successfully
+ ip = self.get_device_ipv4_addr()
+ dhcp_logs = self.access_point.get_dhcp_logs()
+
+ expected_string = f'DHCPREQUEST for {ip}'
+ asserts.assert_true(
+ dhcp_logs.count(expected_string) >= 1,
+ f'Incorrect count of DHCP Requests ("{expected_string}") in logs: '
+ + dhcp_logs + "\n")
+
+ expected_string = f'DHCPACK on {ip}'
+ asserts.assert_true(
+ dhcp_logs.count(expected_string) >= 1,
+ f'Incorrect count of DHCP Acks ("{expected_string}") in logs: ' +
+ dhcp_logs + "\n")
+
+
class Dhcpv4InteropCombinatorialOptionsTest(Dhcpv4InteropFixture):
"""DhcpV4 tests which validate combinations of DHCP options."""
OPTION_DOMAIN_NAME = [('domain-name', 'example.invalid'),