| # |
| # Copyright 2021 - The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import random |
| |
| from acts import asserts |
| from acts.controllers.openwrt_ap import MOBLY_CONTROLLER_CONFIG_NAME as OPENWRT |
| from acts.test_decorators import test_tracker_info |
| from acts_contrib.test_utils.net import connectivity_const as cconst |
| from acts_contrib.test_utils.net import connectivity_test_utils as cutils |
| from acts_contrib.test_utils.wifi import wifi_test_utils as wutils |
| from acts_contrib.test_utils.wifi.WifiBaseTest import WifiBaseTest |
| from scapy.all import rdpcap, DNSRR, DNSQR, IP, IPv6 |
| |
| |
| WLAN = "wlan0" |
| PING_ADDR = "google.com" |
| |
| |
| class DNSTest(WifiBaseTest): |
| """DNS related test for Android.""" |
| |
| def setup_class(self): |
| self.dut = self.android_devices[0] |
| wutils.wifi_test_device_init(self.dut) |
| |
| req_params = [] |
| opt_param = ["wifi_network", "configure_OpenWrt"] |
| self.unpack_userparams( |
| req_param_names=req_params, opt_param_names=opt_param) |
| |
| asserts.assert_true(OPENWRT in self.user_params, |
| "OpenWrtAP is not in testbed.") |
| self.openwrt = self.access_points[0] |
| if hasattr(self, "configure_OpenWrt") and self.configure_OpenWrt == "skip": |
| self.dut.log.info("Skip configure Wifi interface due to config setup.") |
| else: |
| self.configure_openwrt_ap_and_start(wpa_network=True) |
| self.wifi_network = self.openwrt.get_wifi_network() |
| |
| asserts.assert_true(self.openwrt.verify_wifi_status(), |
| "OpenWrt Wifi interface is not ready.") |
| |
| def teardown_class(self): |
| """Reset wifi to make sure VPN tears down cleanly.""" |
| wutils.reset_wifi(self.dut) |
| |
| def teardown_test(self): |
| """Reset wifi to make sure VPN tears down cleanly.""" |
| wutils.reset_wifi(self.dut) |
| |
| def ping(self, addr, ignore_status=True, timeout=60): |
| """Start a ping from DUT and return ping result. |
| |
| Args: |
| addr: Address to ping. |
| ignore_status: ignore non zero return. |
| timeout: cmd timeout. |
| Returns: |
| Boolean for ping result. |
| """ |
| return "100%" not in self.dut.adb.shell("ping -c 1 %s" % addr, |
| ignore_status=ignore_status, |
| timeout=timeout) |
| |
| def generate_query_qname(self): |
| """Return a random query name.""" |
| return "%s-ds.metric.gstatic.com" % random.randint(0, 99999999) |
| |
| def _block_dns_response_and_ping(self, test_qname): |
| """Block the DNS response and ping |
| |
| Args: |
| test_qname: Address to ping |
| Returns: |
| Packets for the ping result |
| """ |
| # Start tcpdump on OpenWrt |
| remote_pcap_path = \ |
| self.openwrt.network_setting.start_tcpdump(self.test_name) |
| self.dut.log.info("Test query name = %s" % test_qname) |
| # Block the DNS response only before sending the DNS query |
| self.openwrt.network_setting.block_dns_response() |
| # Start send a query |
| self.ping(test_qname) |
| # Un-block the DNS response right after DNS query |
| self.openwrt.network_setting.unblock_dns_response() |
| local_pcap_path = self.openwrt.network_setting.stop_tcpdump( |
| remote_pcap_path, self.dut.device_log_path) |
| self.dut.log.info("pcap file path : %s" % local_pcap_path) |
| # Check DNSQR.qname in tcpdump to verify device retransmit the query |
| packets = rdpcap(local_pcap_path) |
| return packets |
| |
| def _get_dnsqr_packets(self, packets, layer, qname): |
| """Filter the DNSQR packets with specific layer |
| |
| Args: |
| packets: Packets that came from rdpcap function |
| layer: Keep the packets that contains this layer |
| qname: Keep the packets that related to this qname |
| Returns: |
| List of filtered packets |
| """ |
| filtered_packets = [] |
| for pkt in packets: |
| if not pkt.haslayer(DNSQR): |
| continue |
| if pkt[DNSQR].qname.decode().strip(".") != qname: |
| continue |
| if pkt.haslayer(layer): |
| filtered_packets.append(pkt) |
| return filtered_packets |
| |
| @test_tracker_info(uuid="dd7b8c92-c0f4-4403-a0ae-57a703162d83") |
| def test_dns_query(self): |
| # Setup environment |
| wutils.connect_to_wifi_network(self.dut, self.wifi_network) |
| # Start tcpdump on OpenWrt |
| remote_pcap_path = self.openwrt.network_setting.start_tcpdump(self.test_name) |
| # Generate query name |
| test_qname = self.generate_query_qname() |
| self.dut.log.info("Test query name = %s" % test_qname) |
| # Start send a query |
| ping_result = self.ping(test_qname) |
| local_pcap_path = self.openwrt.network_setting.stop_tcpdump(remote_pcap_path, |
| self.dut.device_log_path) |
| # Check DNSRR.rrname in tcpdump to verify DNS response |
| packets = rdpcap(local_pcap_path) |
| self.dut.log.info("pcap file path : %s" % local_pcap_path) |
| pkt_count = 0 |
| for pkt in packets: |
| if pkt.haslayer(DNSRR) and pkt[DNSRR].rrname.decode().strip(".") == test_qname: |
| pkt_count = pkt_count + 1 |
| self.dut.log.info("DNS query response count : %s" % pkt_count) |
| if not ping_result: |
| asserts.assert_true(pkt_count > 0, |
| "Did not find match standard query response in tcpdump.") |
| asserts.assert_true(ping_result, "Device ping fail.") |
| |
| @test_tracker_info(uuid="cd20c6e7-9c2e-4286-b08e-c8e40e413da5") |
| def test_dns_query_retransmit(self): |
| # Setup environment |
| wutils.connect_to_wifi_network(self.dut, self.wifi_network) |
| test_qname = self.generate_query_qname() |
| packets = self._block_dns_response_and_ping(test_qname) |
| pkts = self._get_dnsqr_packets(packets, IP, test_qname) |
| pkts6 = self._get_dnsqr_packets(packets, IPv6, test_qname) |
| self.dut.log.info("IPv4 DNS query count : %s" % len(pkts)) |
| self.dut.log.info("IPv6 DNS query count : %s" % len(pkts6)) |
| asserts.assert_true(len(pkts) >= 2 or len(pkts6) >= 2, |
| "Did not find match standard query in tcpdump.") |
| |
| @test_tracker_info(uuid="5f58775d-ee7b-4d2e-8e77-77d41e821415") |
| def test_private_dns_query_retransmit(self): |
| # set private DNS mode |
| cutils.set_private_dns(self.dut, cconst.PRIVATE_DNS_MODE_STRICT) |
| |
| # Setup environment |
| wutils.connect_to_wifi_network(self.dut, self.wifi_network) |
| test_qname = self.generate_query_qname() |
| packets = self._block_dns_response_and_ping(test_qname) |
| pkts = self._get_dnsqr_packets(packets, IP, test_qname) |
| pkts6 = self._get_dnsqr_packets(packets, IPv6, test_qname) |
| self.dut.log.info("IPv4 DNS query count : %s" % len(pkts)) |
| self.dut.log.info("IPv6 DNS query count : %s" % len(pkts6)) |
| asserts.assert_true(len(pkts) >= 2 or len(pkts6) >= 2, |
| "Did not find match standard query in tcpdump.") |
| |