| # |
| # Copyright 2021 - The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| import time |
| |
| from acts import asserts |
| from acts.controllers.openwrt_ap import MOBLY_CONTROLLER_CONFIG_NAME as OPENWRT |
| from acts.test_decorators import test_tracker_info |
| from acts_contrib.test_utils.net.net_test_utils import start_tcpdump |
| from acts_contrib.test_utils.net.net_test_utils import stop_tcpdump |
| from acts_contrib.test_utils.wifi import wifi_test_utils as wutils |
| from acts_contrib.test_utils.wifi.WifiBaseTest import WifiBaseTest |
| from scapy.all import rdpcap, DHCP, IPv6 |
| from scapy.layers.inet6 import ICMPv6ND_NA as NA |
| |
| WLAN = "wlan0" |
| PING_ADDR = "google.com" |
| RAPID_COMMIT_OPTION = (80, b'') |
| DEFAULT_IPV6_ALLROUTERS = "ff02::2" |
| |
| |
| class DhcpTest(WifiBaseTest): |
| """DHCP related test for Android.""" |
| |
| def setup_class(self): |
| self.dut = self.android_devices[0] |
| |
| wutils.wifi_test_device_init(self.dut) |
| req_params = [] |
| opt_param = ["wifi_network", "configure_OpenWrt"] |
| self.unpack_userparams( |
| req_param_names=req_params, opt_param_names=opt_param) |
| asserts.assert_true(OPENWRT in self.user_params, |
| "OpenWrtAP is not in testbed.") |
| |
| self.openwrt = self.access_points[0] |
| if hasattr(self, "configure_OpenWrt") and self.configure_OpenWrt == "skip": |
| self.dut.log.info("Skip configure Wifi interface due to config setup.") |
| else: |
| self.configure_openwrt_ap_and_start(wpa_network=True) |
| self.wifi_network = self.openwrt.get_wifi_network() |
| self.openwrt.network_setting.setup_ipv6_bridge() |
| asserts.assert_true(self.openwrt.verify_wifi_status(), |
| "OpenWrt Wifi interface is not ready.") |
| |
| def teardown_class(self): |
| """Reset wifi and stop tcpdump cleanly.""" |
| wutils.reset_wifi(self.dut) |
| self.openwrt.network_setting.clear_tcpdump() |
| |
| def teardown_test(self): |
| """Reset wifi to make sure DUT tears down cleanly.""" |
| wutils.reset_wifi(self.dut) |
| |
| def _verify_ping(self, option="", dest=PING_ADDR): |
| try: |
| out = self.dut.adb.shell("ping%s -c1 %s" % (option, dest)) |
| return "100%" not in out |
| except Exception as e: |
| self.dut.log.debug(e) |
| return False |
| |
| def _verify_device_address(self, ipv4=True, ipv6=True, timeout=15): |
| """Verify device get assign address on wireless interface.""" |
| current_time = time.time() |
| while time.time() < current_time + timeout: |
| try: |
| if ipv4: |
| ipv4_addr = self.dut.droid.connectivityGetIPv4Addresses(WLAN)[0] |
| self.dut.log.info("ipv4_address is %s" % ipv4_addr) |
| if ipv6: |
| ipv6_addr = self.dut.droid.connectivityGetIPv6Addresses(WLAN)[0] |
| self.dut.log.info("ipv6_address is %s" % ipv6_addr) |
| return True |
| except: |
| time.sleep(1) |
| return False |
| |
| def verify_dhcp_packet(self, packets, support_rapid_commit): |
| for pkt in packets: |
| if pkt.haslayer(DHCP): |
| # Remove dhcp discover checking since rapid commit enable by default(aosp/2943087). |
| if pkt[DHCP].options[0][1] == 2: |
| asserts.assert_true(not support_rapid_commit, |
| "Should not find DHCP OFFER when RAPID_COMMIT_OPTION supported.") |
| elif pkt[DHCP].options[0][1] == 3: |
| asserts.assert_true(not support_rapid_commit, |
| "Should not find DHCP REQUEST when RAPID_COMMIT_OPTION supported.") |
| elif pkt[DHCP].options[0][1] == 5: |
| send_option = RAPID_COMMIT_OPTION in pkt[DHCP].options |
| asserts.assert_true(send_option == support_rapid_commit, |
| "Unexpected result in DHCP ACK.") |
| |
| def verify_gratuitous_na(self, packets): |
| ipv6localaddress = self.dut.droid.connectivityGetLinkLocalIpv6Address(WLAN).strip("%wlan0") |
| self.dut.log.info("Device local address : %s" % ipv6localaddress) |
| ipv6globaladdress = sorted(self.dut.droid.connectivityGetIPv6Addresses(WLAN)) |
| self.dut.log.info("Device global address : %s" % ipv6globaladdress) |
| target_address = [] |
| for pkt in packets: |
| if pkt.haslayer(NA) and pkt.haslayer(IPv6) and pkt[IPv6].src == ipv6localaddress\ |
| and pkt[IPv6].dst == DEFAULT_IPV6_ALLROUTERS: |
| # broadcast global address |
| target_address.append(pkt.tgt) |
| self.dut.log.info("Broadcast target address : %s" % target_address) |
| asserts.assert_equal(ipv6globaladdress, sorted(target_address), |
| "Target address from NA is not match to device ipv6 address.") |
| |
| @test_tracker_info(uuid="01148659-6a3d-4a74-88b6-04b19c4acaaa") |
| def test_ipv4_ipv6_network(self): |
| """Verify device can get both ipv4 ipv6 address.""" |
| wutils.connect_to_wifi_network(self.dut, self.wifi_network) |
| |
| asserts.assert_true(self._verify_device_address(), |
| "Fail to get ipv4/ipv6 address.") |
| asserts.assert_true(self._verify_ping(), "Fail to ping on ipv4.") |
| asserts.assert_true(self._verify_ping("6"), "Fail to ping on ipv6.") |
| |
| @test_tracker_info(uuid="d3f37ba7-504e-48fc-95be-6eca9a148e4a") |
| def test_ipv6_only_prefer_option(self): |
| """Verify DUT can only get ipv6 address and ping out.""" |
| self.openwrt.network_setting.add_ipv6_prefer_option() |
| wutils.connect_to_wifi_network(self.dut, self.wifi_network) |
| |
| asserts.assert_true(self._verify_device_address(ipv4=False), |
| "Fail to get ipv6 address.") |
| asserts.assert_false(self._verify_ping(), |
| "Should not ping on success on ipv4.") |
| asserts.assert_true(self._verify_ping("6"), |
| "Fail to ping on ipv6.") |
| self.openwrt.network_setting.remove_ipv6_prefer_option() |
| |
| @test_tracker_info(uuid="a16f2a3c-e3ca-4fca-b3ee-bccb5cf34bab") |
| def test_dhcp_rapid_commit(self): |
| """Verify DUT can run with rapid commit on IPv4.""" |
| self.dut.adb.shell("device_config put connectivity dhcp_rapid_commit_version 1") |
| self.openwrt.network_setting.add_dhcp_rapid_commit() |
| remote_pcap_path = self.openwrt.network_setting.start_tcpdump(self.test_name) |
| wutils.connect_to_wifi_network(self.dut, self.wifi_network) |
| local_pcap_path = self.openwrt.network_setting.stop_tcpdump( |
| remote_pcap_path, self.dut.device_log_path) |
| self.dut.log.info("pcap file path : %s" % local_pcap_path) |
| packets = rdpcap(local_pcap_path) |
| self.verify_dhcp_packet(packets, True) |
| self.openwrt.network_setting.remove_dhcp_rapid_commit() |
| |
| @test_tracker_info(uuid="cddb3d33-e5ef-4efd-8ae5-1325010a05c8") |
| def test_dhcp_4_way_handshake(self): |
| """Verify DUT can run with rapid commit on IPv4.""" |
| self.dut.adb.shell("device_config put connectivity dhcp_rapid_commit_version 0") |
| remote_pcap_path = self.openwrt.network_setting.start_tcpdump(self.test_name) |
| wutils.connect_to_wifi_network(self.dut, self.wifi_network) |
| local_pcap_path = self.openwrt.network_setting.stop_tcpdump( |
| remote_pcap_path, self.dut.device_log_path) |
| self.dut.log.info("pcap file path : %s" % local_pcap_path) |
| packets = rdpcap(local_pcap_path) |
| self.verify_dhcp_packet(packets, False) |
| |
| @test_tracker_info(uuid="69fd9619-db35-406a-96e2-8425f8f5e8bd") |
| def test_gratuitous_na(self): |
| """Verify DUT will send NA after ipv6 address set.""" |
| self.dut.adb.shell("device_config put connectivity ipclient_gratuitous_na_version 1") |
| remote_pcap_path = self.openwrt.network_setting.start_tcpdump(self.test_name) |
| self.tcpdump_pid = start_tcpdump(self.dut, self.test_name) |
| wutils.connect_to_wifi_network(self.dut, self.wifi_network) |
| local_pcap_path = self.openwrt.network_setting.stop_tcpdump( |
| remote_pcap_path, self.dut.device_log_path) |
| stop_tcpdump(self.dut, self.tcpdump_pid, self.test_name) |
| self.dut.log.info("pcap file path : %s" % local_pcap_path) |
| packets = rdpcap(local_pcap_path) |
| self.verify_gratuitous_na(packets) |