blob: 24029f4549ed974b38f3b7aba56a6b445d525494 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2021 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random
import time
import re
from acts import asserts
from acts import utils
from acts.controllers.access_point import setup_ap, AccessPoint
from acts.controllers.ap_lib import dhcp_config
from acts.controllers.ap_lib import hostapd_constants
from acts.controllers.ap_lib.hostapd_security import Security
from acts.controllers.ap_lib.hostapd_utils import generate_random_password
from acts.controllers.utils_lib.commands import ip
from acts_contrib.test_utils.abstract_devices.wlan_device import create_wlan_device
from acts_contrib.test_utils.wifi.WifiBaseTest import WifiBaseTest
class Dhcpv4InteropFixture(WifiBaseTest):
"""Test helpers for validating DHCPv4 Interop
Test Bed Requirement:
* One Android device or Fuchsia device
* One Access Point
"""
access_point: AccessPoint
def setup_class(self):
super().setup_class()
if 'dut' in self.user_params:
if self.user_params['dut'] == 'fuchsia_devices':
self.dut = create_wlan_device(self.fuchsia_devices[0])
elif self.user_params['dut'] == 'android_devices':
self.dut = create_wlan_device(self.android_devices[0])
else:
raise ValueError('Invalid DUT specified in config. (%s)' %
self.user_params['dut'])
else:
# Default is an android device, just like the other tests
self.dut = create_wlan_device(self.android_devices[0])
self.access_point = self.access_points[0]
self.access_point.stop_all_aps()
def setup_test(self):
if hasattr(self, "android_devices"):
for ad in self.android_devices:
ad.droid.wakeLockAcquireBright()
ad.droid.wakeUpNow()
self.dut.wifi_toggle_state(True)
def teardown_test(self):
if hasattr(self, "android_devices"):
for ad in self.android_devices:
ad.droid.wakeLockRelease()
ad.droid.goToSleepNow()
self.dut.turn_location_off_and_scan_toggle_off()
self.dut.disconnect()
self.dut.reset_wifi()
self.access_point.stop_all_aps()
def connect(self, ap_params):
asserts.assert_true(
self.dut.associate(ap_params['ssid'],
target_pwd=ap_params['password'],
target_security=ap_params['target_security']),
'Failed to connect.')
def setup_ap(self):
"""Generates a hostapd config and sets up the AP with that config.
Does not run a DHCP server.
Returns: A dictionary of information about the AP.
"""
ssid = utils.rand_ascii_str(20)
security_mode = hostapd_constants.WPA2_STRING
security_profile = Security(
security_mode=security_mode,
password=generate_random_password(length=20),
wpa_cipher='CCMP',
wpa2_cipher='CCMP')
password = security_profile.password
target_security = hostapd_constants.SECURITY_STRING_TO_DEFAULT_TARGET_SECURITY.get(
security_mode)
ap_ids = setup_ap(access_point=self.access_point,
profile_name='whirlwind',
mode=hostapd_constants.MODE_11N_MIXED,
channel=hostapd_constants.AP_DEFAULT_CHANNEL_5G,
n_capabilities=[],
ac_capabilities=[],
force_wmm=True,
ssid=ssid,
security=security_profile,
password=password)
if len(ap_ids) > 1:
raise Exception("Expected only one SSID on AP")
configured_subnets = self.access_point.get_configured_subnets()
if len(configured_subnets) > 1:
raise Exception("Expected only one subnet on AP")
router_ip = configured_subnets[0].router
network = configured_subnets[0].network
self.access_point.stop_dhcp()
return {
'ssid': ssid,
'password': password,
'target_security': target_security,
'ip': router_ip,
'network': network,
'id': ap_ids[0],
}
def device_can_ping(self, dest_ip):
"""Checks if the DUT can ping the given address.
Returns: True if can ping, False otherwise"""
self.log.info('Attempting to ping %s...' % dest_ip)
ping_result = self.dut.can_ping(dest_ip, count=2)
if ping_result:
self.log.info('Success pinging: %s' % dest_ip)
else:
self.log.info('Failure pinging: %s' % dest_ip)
return ping_result
def get_device_ipv4_addr(self, interface=None, timeout=20):
"""Checks if device has an ipv4 private address. Sleeps 1 second between
retries.
Args:
interface: string, name of interface from which to get ipv4 address.
Raises:
ConnectionError, if DUT does not have an ipv4 address after all
timeout.
Returns:
The device's IP address
"""
self.log.debug('Fetching updated WLAN interface list')
if interface is None:
interface = self.dut.device.wlan_client_test_interface_name
self.log.info(
'Checking if DUT has received an ipv4 addr on iface %s. Will retry for %s '
'seconds.' % (interface, timeout))
timeout = time.time() + timeout
while time.time() < timeout:
ip_addrs = self.dut.device.get_interface_ip_addresses(interface)
if len(ip_addrs['ipv4_private']) > 0:
ip = ip_addrs['ipv4_private'][0]
self.log.info('DUT has an ipv4 address: %s' % ip)
return ip
else:
self.log.debug(
'DUT does not yet have an ipv4 address...retrying in 1 '
'second.')
time.sleep(1)
else:
raise ConnectionError('DUT failed to get an ipv4 address.')
def run_test_case_expect_dhcp_success(self, settings):
"""Starts the AP and DHCP server, and validates that the client
connects and obtains an address.
Args:
settings: a dictionary containing:
dhcp_parameters: a dictionary of DHCP parameters
dhcp_options: a dictionary of DHCP options
"""
ap_params = self.setup_ap()
subnet_conf = dhcp_config.Subnet(
subnet=ap_params['network'],
router=ap_params['ip'],
additional_parameters=settings['dhcp_parameters'],
additional_options=settings['dhcp_options'])
dhcp_conf = dhcp_config.DhcpConfig(subnets=[subnet_conf])
self.log.debug('DHCP Configuration:\n' +
dhcp_conf.render_config_file() + "\n")
dhcp_logs_before = self.access_point.get_dhcp_logs().split('\n')
self.access_point.start_dhcp(dhcp_conf=dhcp_conf)
self.connect(ap_params=ap_params)
# Typical log lines look like:
# dhcpd[26695]: DHCPDISCOVER from f8:0f:f9:3d:ce:d1 via wlan1
# dhcpd[26695]: DHCPOFFER on 192.168.9.2 to f8:0f:f9:3d:ce:d1 via wlan1
# dhcpd[26695]: DHCPREQUEST for 192.168.9.2 (192.168.9.1) from f8:0f:f9:3d:ce:d1 via wlan1
# dhcpd[26695]: DHCPACK on 192.168.9.2 to f8:0f:f9:3d:ce:d1 via wlan1
try:
ip = self.get_device_ipv4_addr()
except ConnectionError:
self.log.warn(dhcp_logs)
asserts.fail(f'DUT failed to get an IP address')
# Get updates to DHCP logs
dhcp_logs = self.access_point.get_dhcp_logs()
for line in dhcp_logs_before:
dhcp_logs = dhcp_logs.replace(line, '')
expected_string = f'DHCPDISCOVER from'
asserts.assert_equal(
dhcp_logs.count(expected_string), 1,
f'Incorrect count of DHCP Discovers ("{expected_string}") in logs:\n'
+ dhcp_logs + "\n")
expected_string = f'DHCPOFFER on {ip}'
asserts.assert_equal(
dhcp_logs.count(expected_string), 1,
f'Incorrect count of DHCP Offers ("{expected_string}") in logs:\n'
+ dhcp_logs + "\n")
expected_string = f'DHCPREQUEST for {ip}'
asserts.assert_true(
dhcp_logs.count(expected_string) >= 1,
f'Incorrect count of DHCP Requests ("{expected_string}") in logs: '
+ dhcp_logs + "\n")
expected_string = f'DHCPACK on {ip}'
asserts.assert_true(
dhcp_logs.count(expected_string) >= 1,
f'Incorrect count of DHCP Acks ("{expected_string}") in logs: ' +
dhcp_logs + "\n")
asserts.assert_true(self.device_can_ping(ap_params['ip']),
f'DUT failed to ping router at {ap_params["ip"]}')
class Dhcpv4InteropFixtureTest(Dhcpv4InteropFixture):
"""Tests which validate the behavior of the Dhcpv4InteropFixture.
In theory, these are more similar to unit tests than ACTS tests, but
since they interact with hardware (specifically, the AP), we have to
write and run them like the rest of the ACTS tests."""
def test_invalid_options_not_accepted(self):
"""Ensures the DHCP server doesn't accept invalid options"""
ap_params = self.setup_ap()
subnet_conf = dhcp_config.Subnet(subnet=ap_params['network'],
router=ap_params['ip'],
additional_options={'foo': 'bar'})
dhcp_conf = dhcp_config.DhcpConfig(subnets=[subnet_conf])
with asserts.assert_raises_regex(Exception, r'failed to start'):
self.access_point.start_dhcp(dhcp_conf=dhcp_conf)
def test_invalid_parameters_not_accepted(self):
"""Ensures the DHCP server doesn't accept invalid parameters"""
ap_params = self.setup_ap()
subnet_conf = dhcp_config.Subnet(subnet=ap_params['network'],
router=ap_params['ip'],
additional_parameters={'foo': 'bar'})
dhcp_conf = dhcp_config.DhcpConfig(subnets=[subnet_conf])
with asserts.assert_raises_regex(Exception, r'failed to start'):
self.access_point.start_dhcp(dhcp_conf=dhcp_conf)
def test_no_dhcp_server_started(self):
"""Validates that the test fixture does not start a DHCP server."""
ap_params = self.setup_ap()
self.connect(ap_params=ap_params)
with asserts.assert_raises(ConnectionError):
self.get_device_ipv4_addr()
class Dhcpv4InteropBasicTest(Dhcpv4InteropFixture):
"""DhcpV4 tests which validate basic DHCP client/server interactions."""
def test_basic_dhcp_assignment(self):
self.run_test_case_expect_dhcp_success(settings={
'dhcp_options': {},
'dhcp_parameters': {}
})
def test_pool_allows_unknown_clients(self):
self.run_test_case_expect_dhcp_success(settings={
'dhcp_options': {},
'dhcp_parameters': {
'allow': 'unknown-clients'
}
})
def test_pool_disallows_unknown_clients(self):
ap_params = self.setup_ap()
subnet_conf = dhcp_config.Subnet(
subnet=ap_params['network'],
router=ap_params['ip'],
additional_parameters={'deny': 'unknown-clients'})
dhcp_conf = dhcp_config.DhcpConfig(subnets=[subnet_conf])
self.access_point.start_dhcp(dhcp_conf=dhcp_conf)
self.connect(ap_params=ap_params)
with asserts.assert_raises(ConnectionError):
self.get_device_ipv4_addr()
dhcp_logs = self.access_point.get_dhcp_logs()
asserts.assert_true(
re.search(r'DHCPDISCOVER from .*no free leases', dhcp_logs),
"Did not find expected message in dhcp logs: " + dhcp_logs + "\n")
def test_lease_renewal(self):
"""Validates that a client renews their DHCP lease."""
LEASE_TIME = 30
ap_params = self.setup_ap()
subnet_conf = dhcp_config.Subnet(subnet=ap_params['network'],
router=ap_params['ip'])
dhcp_conf = dhcp_config.DhcpConfig(subnets=[subnet_conf],
default_lease_time=LEASE_TIME,
max_lease_time=LEASE_TIME)
self.access_point.start_dhcp(dhcp_conf=dhcp_conf)
self.connect(ap_params=ap_params)
ip = self.get_device_ipv4_addr()
dhcp_logs_before = self.access_point.get_dhcp_logs()
SLEEP_TIME = LEASE_TIME + 3
self.log.info(f'Sleeping {SLEEP_TIME}s to await DHCP renewal')
time.sleep(SLEEP_TIME)
dhcp_logs_after = self.access_point.get_dhcp_logs()
dhcp_logs = dhcp_logs_after.replace(dhcp_logs_before, '')
# Fuchsia renews at LEASE_TIME / 2, so there should be at least 2 DHCPREQUESTs in logs.
# The log lines look like:
# INFO dhcpd[17385]: DHCPREQUEST for 192.168.9.2 from f8:0f:f9:3d:ce:d1 via wlan1
# INFO dhcpd[17385]: DHCPACK on 192.168.9.2 to f8:0f:f9:3d:ce:d1 via wlan1
expected_string = f'DHCPREQUEST for {ip}'
asserts.assert_true(
dhcp_logs.count(expected_string) >= 2,
f'Not enough DHCP renewals ("{expected_string}") in logs: ' +
dhcp_logs + "\n")
class Dhcpv4DuplicateAddressTest(Dhcpv4InteropFixture):
def setup_test(self):
super().setup_test()
self.extra_addresses = []
self.ap_params = self.setup_ap()
self.ap_ip_cmd = ip.LinuxIpCommand(self.access_point.ssh)
def teardown_test(self):
super().teardown_test()
for ip in self.extra_addresses:
self.ap_ip_cmd.remove_ipv4_address(self.ap_params['id'], ip)
def test_duplicate_address_assignment(self):
"""It's possible for a DHCP server to assign an address that already exists on the network.
DHCP clients are expected to perform a "gratuitous ARP" of the to-be-assigned address, and
refuse to assign that address. Clients should also recover by asking for a different
address.
"""
# Modify subnet to hold fewer addresses.
# A '/29' has 8 addresses (6 usable excluding router / broadcast)
subnet = next(self.ap_params['network'].subnets(new_prefix=29))
subnet_conf = dhcp_config.Subnet(
subnet=subnet,
router=self.ap_params['ip'],
# When the DHCP server is considering dynamically allocating an IP address to a client,
# it first sends an ICMP Echo request (a ping) to the address being assigned. It waits
# for a second, and if no ICMP Echo response has been heard, it assigns the address.
# If a response is heard, the lease is abandoned, and the server does not respond to
# the client.
# The ping-check configuration parameter can be used to control checking - if its value
# is false, no ping check is done.
additional_parameters={'ping-check': 'false'})
dhcp_conf = dhcp_config.DhcpConfig(subnets=[subnet_conf])
self.access_point.start_dhcp(dhcp_conf=dhcp_conf)
# Add each of the usable IPs as an alias for the router's interface, such that the router
# will respond to any pings on it.
for ip in subnet.hosts():
self.ap_ip_cmd.add_ipv4_address(self.ap_params['id'], ip)
# Ensure we remove the address in self.teardown_test() even if the test fails
self.extra_addresses.append(ip)
self.connect(ap_params=self.ap_params)
with asserts.assert_raises(ConnectionError):
self.get_device_ipv4_addr()
# Per spec, the flow should be:
# Discover -> Offer -> Request -> Ack -> client optionally performs DAD
dhcp_logs = self.access_point.get_dhcp_logs()
for expected_message in [
r'DHCPDISCOVER from \S+',
r'DHCPOFFER on [0-9.]+ to \S+',
r'DHCPREQUEST for [0-9.]+',
r'DHCPACK on [0-9.]+',
r'DHCPDECLINE of [0-9.]+ from \S+ via .*: abandoned',
r'Abandoning IP address [0-9.]+: declined',
]:
asserts.assert_true(
re.search(expected_message, dhcp_logs),
f'Did not find expected message ({expected_message}) in dhcp logs: {dhcp_logs}'
+ "\n")
# Remove each of the IP aliases.
# Note: this also removes the router's address (e.g. 192.168.1.1), so pinging the
# router after this will not work.
while self.extra_addresses:
self.ap_ip_cmd.remove_ipv4_address(self.ap_params['id'],
self.extra_addresses.pop())
# Now, we should get an address successfully
ip = self.get_device_ipv4_addr()
dhcp_logs = self.access_point.get_dhcp_logs()
expected_string = f'DHCPREQUEST for {ip}'
asserts.assert_true(
dhcp_logs.count(expected_string) >= 1,
f'Incorrect count of DHCP Requests ("{expected_string}") in logs: '
+ dhcp_logs + "\n")
expected_string = f'DHCPACK on {ip}'
asserts.assert_true(
dhcp_logs.count(expected_string) >= 1,
f'Incorrect count of DHCP Acks ("{expected_string}") in logs: ' +
dhcp_logs + "\n")
class Dhcpv4InteropCombinatorialOptionsTest(Dhcpv4InteropFixture):
"""DhcpV4 tests which validate combinations of DHCP options."""
OPT_NUM_DOMAIN_SEARCH = 119
OPT_NUM_DOMAIN_NAME = 15
def setup_generated_tests(self):
self._generate_dhcp_options()
test_args = []
for test in self.DHCP_OPTIONS:
for option_list in self.DHCP_OPTIONS[test]:
test_args.append(({
'dhcp_options': option_list,
'dhcp_parameters': {}
}, ))
self.generate_tests(test_logic=self.run_test_case_expect_dhcp_success,
name_func=self.generate_test_name,
arg_sets=test_args)
def generate_test_name(self, settings):
return settings["dhcp_options"]["test_name"]
def _generate_dhcp_options(self):
self.DHCP_OPTIONS = {
'domain-name-tests': [{
'domain-name': '"example.invalid"',
'dhcp-parameter-request-list': self.OPT_NUM_DOMAIN_NAME,
'test_name': "test_domain_name_invalid_tld"
}, {
'domain-name': '"example.test"',
'dhcp-parameter-request-list': self.OPT_NUM_DOMAIN_NAME,
'test_name': "test_domain_name_valid_tld"
}],
'domain-search-tests': [{
'domain-search':
'"example.invalid"',
'dhcp-parameter-request-list':
self.OPT_NUM_DOMAIN_SEARCH,
'test_name':
"test_domain_search_invalid_tld"
}, {
'domain-search': '"example.test"',
'dhcp-parameter-request-list': self.OPT_NUM_DOMAIN_SEARCH,
'test_name': "test_domain_search_valid_tld"
}]
}
# The RFC limits DHCP payloads to 576 bytes unless the client signals it can handle larger
# payloads, which it does by sending DHCP option 57, "Maximum DHCP Message Size". Despite
# being able to accept larger payloads, clients typically don't advertise this.
# The test verifies that the client accepts a large message split across multiple ethernet
# frames.
# The test is created by sending many bytes of options through the domain-name-servers
# option, which is of unbounded length (though is compressed per RFC1035 section 4.1.4).
typical_ethernet_mtu = 1500
self.DHCP_OPTIONS['max-message-size-tests'] = []
long_dns_setting = ', '.join(
f'"ns{num}.example"'
for num in random.sample(range(100_000, 1_000_000), 250))
# RFC1035 compression means any shared suffix ('.example' in this case) will
# be deduplicated. Calculate approximate length by removing that suffix.
long_dns_setting_len = len(
long_dns_setting.replace(', ', '').replace('"', '').replace(
'.example', '').encode('utf-8'))
asserts.assert_true(
long_dns_setting_len > typical_ethernet_mtu,
"Expected to generate message greater than ethernet mtu")
self.DHCP_OPTIONS['max-message-size-tests'].append({
'dhcp-max-message-size':
long_dns_setting_len * 2,
'domain-search':
long_dns_setting,
'dhcp-parameter-request-list':
self.OPT_NUM_DOMAIN_SEARCH,
'test_name':
"test_max_sized_message",
})