| #!/usr/bin/env python3 |
| # |
| # Copyright 2016 Google, Inc. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import logging |
| import os |
| import re |
| import shutil |
| import time |
| |
| from retry import retry |
| |
| from collections import namedtuple |
| from enum import IntEnum |
| from queue import Empty |
| |
| from acts import asserts |
| from acts import context |
| from acts import signals |
| from acts import utils |
| from acts.controllers import attenuator |
| from acts.controllers.ap_lib import hostapd_security |
| from acts.controllers.ap_lib import hostapd_ap_preset |
| from acts.controllers.ap_lib.hostapd_constants import BAND_2G |
| from acts.controllers.ap_lib.hostapd_constants import BAND_5G |
| from acts_contrib.test_utils.net import connectivity_const as cconsts |
| from acts_contrib.test_utils.tel import tel_defines |
| from acts_contrib.test_utils.wifi import wifi_constants |
| from acts_contrib.test_utils.wifi.aware import aware_test_utils as autils |
| |
# Seconds to let the system settle down after a reboot or after toggling
# WiFi or Airplane mode.
DEFAULT_TIMEOUT = 10
# Seconds to wait for events that are expected to arrive quickly, such as
# onSuccess for starting a background scan or the confirmation of a wifi
# state change.
SHORT_TIMEOUT = 30
ROAMING_TIMEOUT = 30
WIFI_CONNECTION_TIMEOUT_DEFAULT = 30
# Speed of light in m/s.
SPEED_OF_LIGHT = 299792458

DEFAULT_PING_ADDR = "https://www.google.com/robots.txt"

CNSS_DIAG_CONFIG_PATH = "/data/vendor/wifi/cnss_diag/"
CNSS_DIAG_CONFIG_FILE = "cnss_diag.conf"

# Named presets, each a list of four attenuation values.
# NOTE(review): presumably one value per attenuator port - confirm against
# the test bed wiring.
ROAMING_ATTN = dict(
    AP1_on_AP2_off=[0, 0, 95, 95],
    AP1_off_AP2_on=[95, 95, 0, 0],
    default=[0, 0, 0, 0],
)
| |
| |
class WifiEnums():
    """Namespace of constants used by the Wi-Fi test utilities.

    Holds the dictionary keys used in Wi-Fi / SoftAp configuration dicts,
    SoftAp band identifiers, EAP and RTT enumerations, WifiScanner band and
    report identifiers, and frequency <-> channel lookup tables.
    """

    SSID_KEY = "SSID"  # Used for Wifi & SoftAp
    SSID_PATTERN_KEY = "ssidPattern"
    NETID_KEY = "network_id"
    BSSID_KEY = "BSSID"  # Used for Wifi & SoftAp
    BSSID_PATTERN_KEY = "bssidPattern"
    PWD_KEY = "password"  # Used for Wifi & SoftAp
    frequency_key = "frequency"
    HIDDEN_KEY = "hiddenSSID"  # Used for Wifi & SoftAp
    IS_APP_INTERACTION_REQUIRED = "isAppInteractionRequired"
    IS_USER_INTERACTION_REQUIRED = "isUserInteractionRequired"
    IS_SUGGESTION_METERED = "isMetered"
    PRIORITY = "priority"
    SECURITY = "security"  # Used for Wifi & SoftAp

    # Used for SoftAp
    AP_BAND_KEY = "apBand"
    AP_CHANNEL_KEY = "apChannel"
    AP_BANDS_KEY = "apBands"
    AP_CHANNEL_FREQUENCYS_KEY = "apChannelFrequencies"
    AP_MAC_RANDOMIZATION_SETTING_KEY = "MacRandomizationSetting"
    AP_BRIDGED_OPPORTUNISTIC_SHUTDOWN_ENABLE_KEY = "BridgedModeOpportunisticShutdownEnabled"
    AP_IEEE80211AX_ENABLED_KEY = "Ieee80211axEnabled"
    AP_MAXCLIENTS_KEY = "MaxNumberOfClients"
    AP_SHUTDOWNTIMEOUT_KEY = "ShutdownTimeoutMillis"
    AP_SHUTDOWNTIMEOUTENABLE_KEY = "AutoShutdownEnabled"
    AP_CLIENTCONTROL_KEY = "ClientControlByUserEnabled"
    AP_ALLOWEDLIST_KEY = "AllowedClientList"
    AP_BLOCKEDLIST_KEY = "BlockedClientList"

    WIFI_CONFIG_SOFTAP_BAND_2G = 1
    WIFI_CONFIG_SOFTAP_BAND_5G = 2
    WIFI_CONFIG_SOFTAP_BAND_2G_5G = 3
    WIFI_CONFIG_SOFTAP_BAND_6G = 4
    WIFI_CONFIG_SOFTAP_BAND_2G_6G = 5
    WIFI_CONFIG_SOFTAP_BAND_5G_6G = 6
    WIFI_CONFIG_SOFTAP_BAND_ANY = 7

    # DO NOT USE IT for new test case! Replaced by WIFI_CONFIG_SOFTAP_BAND_
    WIFI_CONFIG_APBAND_2G = WIFI_CONFIG_SOFTAP_BAND_2G
    WIFI_CONFIG_APBAND_5G = WIFI_CONFIG_SOFTAP_BAND_5G
    WIFI_CONFIG_APBAND_AUTO = WIFI_CONFIG_SOFTAP_BAND_2G_5G

    WIFI_CONFIG_APBAND_2G_OLD = 0
    WIFI_CONFIG_APBAND_5G_OLD = 1
    WIFI_CONFIG_APBAND_AUTO_OLD = -1

    WIFI_WPS_INFO_PBC = 0
    WIFI_WPS_INFO_DISPLAY = 1
    WIFI_WPS_INFO_KEYPAD = 2
    WIFI_WPS_INFO_LABEL = 3
    WIFI_WPS_INFO_INVALID = 4

    class SoftApSecurityType():
        OPEN = "NONE"
        WPA2 = "WPA2_PSK"
        WPA3_SAE_TRANSITION = "WPA3_SAE_TRANSITION"
        WPA3_SAE = "WPA3_SAE"

    class CountryCode():
        AUSTRALIA = "AU"
        CHINA = "CN"
        GERMANY = "DE"
        JAPAN = "JP"
        UK = "GB"
        US = "US"
        UNKNOWN = "UNKNOWN"

    # Start of Macros for EAP
    # EAP types
    class Eap(IntEnum):
        NONE = -1
        PEAP = 0
        TLS = 1
        TTLS = 2
        PWD = 3
        SIM = 4
        AKA = 5
        AKA_PRIME = 6
        UNAUTH_TLS = 7

    # EAP Phase2 types
    class EapPhase2(IntEnum):
        NONE = 0
        PAP = 1
        MSCHAP = 2
        MSCHAPV2 = 3
        GTC = 4

    class Enterprise:
        # Enterprise Config Macros
        EMPTY_VALUE = "NULL"
        EAP = "eap"
        PHASE2 = "phase2"
        IDENTITY = "identity"
        ANON_IDENTITY = "anonymous_identity"
        PASSWORD = "password"
        SUBJECT_MATCH = "subject_match"
        ALTSUBJECT_MATCH = "altsubject_match"
        DOM_SUFFIX_MATCH = "domain_suffix_match"
        CLIENT_CERT = "client_cert"
        CA_CERT = "ca_cert"
        ENGINE = "engine"
        ENGINE_ID = "engine_id"
        PRIVATE_KEY_ID = "key_id"
        REALM = "realm"
        PLMN = "plmn"
        FQDN = "FQDN"
        FRIENDLY_NAME = "providerFriendlyName"
        ROAMING_IDS = "roamingConsortiumIds"
        OCSP = "ocsp"

    # End of Macros for EAP

    # Macros for wifi p2p.
    WIFI_P2P_SERVICE_TYPE_ALL = 0
    WIFI_P2P_SERVICE_TYPE_BONJOUR = 1
    WIFI_P2P_SERVICE_TYPE_UPNP = 2
    WIFI_P2P_SERVICE_TYPE_VENDOR_SPECIFIC = 255

    class ScanResult:
        CHANNEL_WIDTH_20MHZ = 0
        CHANNEL_WIDTH_40MHZ = 1
        CHANNEL_WIDTH_80MHZ = 2
        CHANNEL_WIDTH_160MHZ = 3
        CHANNEL_WIDTH_80MHZ_PLUS_MHZ = 4

    # Macros for wifi rtt.
    class RttType(IntEnum):
        TYPE_ONE_SIDED = 1
        TYPE_TWO_SIDED = 2

    class RttPeerType(IntEnum):
        PEER_TYPE_AP = 1
        PEER_TYPE_STA = 2  # Requires NAN.
        PEER_P2P_GO = 3
        PEER_P2P_CLIENT = 4
        PEER_NAN = 5

    class RttPreamble(IntEnum):
        PREAMBLE_LEGACY = 0x01
        PREAMBLE_HT = 0x02
        PREAMBLE_VHT = 0x04

    class RttBW(IntEnum):
        BW_5_SUPPORT = 0x01
        BW_10_SUPPORT = 0x02
        BW_20_SUPPORT = 0x04
        BW_40_SUPPORT = 0x08
        BW_80_SUPPORT = 0x10
        BW_160_SUPPORT = 0x20

    class Rtt(IntEnum):
        STATUS_SUCCESS = 0
        STATUS_FAILURE = 1
        STATUS_FAIL_NO_RSP = 2
        STATUS_FAIL_REJECTED = 3
        STATUS_FAIL_NOT_SCHEDULED_YET = 4
        STATUS_FAIL_TM_TIMEOUT = 5
        STATUS_FAIL_AP_ON_DIFF_CHANNEL = 6
        STATUS_FAIL_NO_CAPABILITY = 7
        STATUS_ABORTED = 8
        STATUS_FAIL_INVALID_TS = 9
        STATUS_FAIL_PROTOCOL = 10
        STATUS_FAIL_SCHEDULE = 11
        STATUS_FAIL_BUSY_TRY_LATER = 12
        STATUS_INVALID_REQ = 13
        STATUS_NO_WIFI = 14
        STATUS_FAIL_FTM_PARAM_OVERRIDE = 15

        REASON_UNSPECIFIED = -1
        REASON_NOT_AVAILABLE = -2
        REASON_INVALID_LISTENER = -3
        REASON_INVALID_REQUEST = -4

    class RttParam:
        device_type = "deviceType"
        request_type = "requestType"
        BSSID = "bssid"
        channel_width = "channelWidth"
        frequency = "frequency"
        center_freq0 = "centerFreq0"
        center_freq1 = "centerFreq1"
        number_burst = "numberBurst"
        interval = "interval"
        num_samples_per_burst = "numSamplesPerBurst"
        num_retries_per_measurement_frame = "numRetriesPerMeasurementFrame"
        num_retries_per_FTMR = "numRetriesPerFTMR"
        lci_request = "LCIRequest"
        lcr_request = "LCRRequest"
        burst_timeout = "burstTimeout"
        preamble = "preamble"
        bandwidth = "bandwidth"
        margin = "margin"

    RTT_MARGIN_OF_ERROR = {
        RttBW.BW_80_SUPPORT: 2,
        RttBW.BW_40_SUPPORT: 5,
        RttBW.BW_20_SUPPORT: 5
    }

    # Macros as specified in the WifiScanner code.
    WIFI_BAND_UNSPECIFIED = 0  # not specified
    WIFI_BAND_24_GHZ = 1  # 2.4 GHz band
    WIFI_BAND_5_GHZ = 2  # 5 GHz band without DFS channels
    WIFI_BAND_5_GHZ_DFS_ONLY = 4  # 5 GHz band, DFS channels only
    WIFI_BAND_5_GHZ_WITH_DFS = 6  # 5 GHz band with DFS channels
    WIFI_BAND_BOTH = 3  # both bands without DFS channels
    WIFI_BAND_BOTH_WITH_DFS = 7  # both bands with DFS channels

    REPORT_EVENT_AFTER_BUFFER_FULL = 0
    REPORT_EVENT_AFTER_EACH_SCAN = 1
    REPORT_EVENT_FULL_SCAN_RESULT = 2

    SCAN_TYPE_LOW_LATENCY = 0
    SCAN_TYPE_LOW_POWER = 1
    SCAN_TYPE_HIGH_ACCURACY = 2

    # US Wifi frequencies
    ALL_2G_FREQUENCIES = [
        2412, 2417, 2422, 2427, 2432, 2437, 2442, 2447, 2452, 2457, 2462
    ]
    DFS_5G_FREQUENCIES = [
        5260, 5280, 5300, 5320, 5500, 5520, 5540, 5560, 5580, 5600, 5620, 5640,
        5660, 5680, 5700, 5720
    ]
    NONE_DFS_5G_FREQUENCIES = [
        5180, 5200, 5220, 5240, 5745, 5765, 5785, 5805, 5825
    ]
    ALL_5G_FREQUENCIES = DFS_5G_FREQUENCIES + NONE_DFS_5G_FREQUENCIES

    band_to_frequencies = {
        WIFI_BAND_24_GHZ: ALL_2G_FREQUENCIES,
        WIFI_BAND_5_GHZ: NONE_DFS_5G_FREQUENCIES,
        WIFI_BAND_5_GHZ_DFS_ONLY: DFS_5G_FREQUENCIES,
        WIFI_BAND_5_GHZ_WITH_DFS: ALL_5G_FREQUENCIES,
        WIFI_BAND_BOTH: ALL_2G_FREQUENCIES + NONE_DFS_5G_FREQUENCIES,
        WIFI_BAND_BOTH_WITH_DFS: ALL_5G_FREQUENCIES + ALL_2G_FREQUENCIES
    }

    # TODO: add all of the band mapping.
    softap_band_frequencies = {
        WIFI_CONFIG_SOFTAP_BAND_2G: ALL_2G_FREQUENCIES,
        WIFI_CONFIG_SOFTAP_BAND_5G: ALL_5G_FREQUENCIES
    }

    # All Wifi frequencies to channels lookup.
    # Kept in sync with channel_2G_to_freq / channel_5G_to_freq below, and
    # covers every frequency listed in the *_FREQUENCIES lists above
    # (including 5720 MHz / channel 144).
    freq_to_channel = {
        2412: 1,
        2417: 2,
        2422: 3,
        2427: 4,
        2432: 5,
        2437: 6,
        2442: 7,
        2447: 8,
        2452: 9,
        2457: 10,
        2462: 11,
        2467: 12,
        2472: 13,
        2484: 14,
        4915: 183,
        4920: 184,
        4925: 185,
        4935: 187,
        4940: 188,
        4945: 189,
        4960: 192,
        4980: 196,
        5035: 7,
        5040: 8,
        5045: 9,
        5055: 11,
        5060: 12,
        5080: 16,
        5170: 34,
        5180: 36,
        5190: 38,
        5200: 40,
        5210: 42,
        5220: 44,
        5230: 46,
        5240: 48,
        5250: 50,
        5260: 52,
        5280: 56,
        5300: 60,
        5320: 64,
        5500: 100,
        5520: 104,
        5540: 108,
        5560: 112,
        5580: 116,
        5600: 120,
        5620: 124,
        5640: 128,
        5660: 132,
        5680: 136,
        5700: 140,
        5720: 144,
        5745: 149,
        5755: 151,
        5765: 153,
        5775: 155,
        5785: 157,
        5795: 159,
        5805: 161,
        5825: 165,
    }

    # All Wifi channels to frequencies lookup.
    channel_2G_to_freq = {
        1: 2412,
        2: 2417,
        3: 2422,
        4: 2427,
        5: 2432,
        6: 2437,
        7: 2442,
        8: 2447,
        9: 2452,
        10: 2457,
        11: 2462,
        12: 2467,
        13: 2472,
        14: 2484
    }

    channel_5G_to_freq = {
        183: 4915,
        184: 4920,
        185: 4925,
        187: 4935,
        188: 4940,
        189: 4945,
        192: 4960,
        196: 4980,
        7: 5035,
        8: 5040,
        9: 5045,
        11: 5055,
        12: 5060,
        16: 5080,
        34: 5170,
        36: 5180,
        38: 5190,
        40: 5200,
        42: 5210,
        44: 5220,
        46: 5230,
        48: 5240,
        50: 5250,
        52: 5260,
        56: 5280,
        60: 5300,
        64: 5320,
        100: 5500,
        104: 5520,
        108: 5540,
        112: 5560,
        116: 5580,
        120: 5600,
        124: 5620,
        128: 5640,
        132: 5660,
        136: 5680,
        140: 5700,
        144: 5720,
        149: 5745,
        151: 5755,
        153: 5765,
        155: 5775,
        157: 5785,
        159: 5795,
        161: 5805,
        165: 5825
    }
| |
| |
class WifiChannelBase:
    """Base holder for per-region Wi-Fi frequency lists.

    The lists are empty here; subclasses are expected to override them.
    """
    ALL_2G_FREQUENCIES = []
    DFS_5G_FREQUENCIES = []
    NONE_DFS_5G_FREQUENCIES = []
    # Evaluated once, at class-creation time, from the (empty) lists above.
    ALL_5G_FREQUENCIES = DFS_5G_FREQUENCIES + NONE_DFS_5G_FREQUENCIES
    MIX_CHANNEL_SCAN = []

    def band_to_freq(self, band):
        """Returns the frequency list associated with a WifiScanner band.

        Args:
            band: One of the WifiEnums.WIFI_BAND_* identifiers handled below.

        Returns:
            The list of frequencies configured on this object for the band.

        Raises:
            KeyError: If band is not one of the handled identifiers.
        """
        freqs_2g = self.ALL_2G_FREQUENCIES
        freqs_5g_no_dfs = self.NONE_DFS_5G_FREQUENCIES
        freqs_5g_dfs = self.DFS_5G_FREQUENCIES
        freqs_5g_all = self.ALL_5G_FREQUENCIES
        lookup = dict([
            (WifiEnums.WIFI_BAND_24_GHZ, freqs_2g),
            (WifiEnums.WIFI_BAND_5_GHZ, freqs_5g_no_dfs),
            (WifiEnums.WIFI_BAND_5_GHZ_DFS_ONLY, freqs_5g_dfs),
            (WifiEnums.WIFI_BAND_5_GHZ_WITH_DFS, freqs_5g_all),
            (WifiEnums.WIFI_BAND_BOTH, freqs_2g + freqs_5g_no_dfs),
            (WifiEnums.WIFI_BAND_BOTH_WITH_DFS, freqs_5g_all + freqs_2g),
        ])
        return lookup[band]
| |
| |
class WifiChannelUS(WifiChannelBase):
    """US Wi-Fi frequency lists."""
    # US Wifi frequencies
    ALL_2G_FREQUENCIES = [
        2412, 2417, 2422, 2427, 2432, 2437, 2442, 2447, 2452, 2457, 2462
    ]
    NONE_DFS_5G_FREQUENCIES = [
        5180, 5200, 5220, 5240, 5745, 5765, 5785, 5805, 5825
    ]
    MIX_CHANNEL_SCAN = [
        2412, 2437, 2462, 5180, 5200, 5280, 5260, 5300, 5500, 5320, 5520, 5560,
        5700, 5745, 5805
    ]

    def __init__(self, model=None, support_addition_channel=None):
        """Sets up the per-instance frequency lists.

        Args:
            model: Device model name, or None.
            support_addition_channel: Collection of model names for which the
                2G list is extended with 2467 and 2472 MHz. None (the default)
                is treated as an empty collection.
        """
        # A None default replaces the former mutable `[]` default argument.
        if support_addition_channel is None:
            support_addition_channel = ()
        if model in support_addition_channel:
            self.ALL_2G_FREQUENCIES = [
                2412, 2417, 2422, 2427, 2432, 2437, 2442, 2447, 2452, 2457,
                2462, 2467, 2472
            ]
        self.DFS_5G_FREQUENCIES = [
            5260, 5280, 5300, 5320, 5500, 5520, 5540, 5560, 5580, 5600, 5620,
            5640, 5660, 5680, 5700, 5720
        ]
        # The class-level ALL_5G_FREQUENCIES inherited from the base is built
        # from empty lists, so it is recomputed per instance here.
        self.ALL_5G_FREQUENCIES = self.DFS_5G_FREQUENCIES + self.NONE_DFS_5G_FREQUENCIES
| |
| |
class WifiReferenceNetworks:
    """Splits reference_networks into per-band secure and open lists.

    An entry keyed "2g" is filed under the 2G lists; an entry under any other
    key is filed under the 5G lists. An entry counts as secure when it
    contains "password".
    """
    def __init__(self, obj):
        """Stores the reference networks and sorts them into buckets.

        Args:
            obj: Iterable of mappings from band key to network config.
        """
        self.reference_networks = obj
        self.WIFI_2G = "2g"
        self.WIFI_5G = "5g"

        self.secure_networks_2g = []
        self.secure_networks_5g = []
        self.open_networks_2g = []
        self.open_networks_5g = []
        self._parse_networks()

    def _parse_networks(self):
        """Files every network config into the matching band/auth list."""
        for entry in self.reference_networks:
            for band in entry:
                is_2g = band == self.WIFI_2G
                if "password" in entry[band]:
                    bucket = (self.secure_networks_2g
                              if is_2g else self.secure_networks_5g)
                else:
                    bucket = (self.open_networks_2g
                              if is_2g else self.open_networks_5g)
                bucket.append(entry[band])

    def return_2g_secure_networks(self):
        """Returns the 2G networks that have a password."""
        return self.secure_networks_2g

    def return_5g_secure_networks(self):
        """Returns the 5G networks that have a password."""
        return self.secure_networks_5g

    def return_2g_open_networks(self):
        """Returns the 2G networks without a password."""
        return self.open_networks_2g

    def return_5g_open_networks(self):
        """Returns the 5G networks without a password."""
        return self.open_networks_5g

    def return_secure_networks(self):
        """Returns a new list: 2G secure networks followed by 5G ones."""
        return self.secure_networks_2g + self.secure_networks_5g

    def return_open_networks(self):
        """Returns a new list: 2G open networks followed by 5G ones."""
        return self.open_networks_2g + self.open_networks_5g
| |
| |
| def _assert_on_fail_handler(func, assert_on_fail, *args, **kwargs): |
| """Wrapper function that handles the bahevior of assert_on_fail. |
| |
| When assert_on_fail is True, let all test signals through, which can |
| terminate test cases directly. When assert_on_fail is False, the wrapper |
| raises no test signals and reports operation status by returning True or |
| False. |
| |
| Args: |
| func: The function to wrap. This function reports operation status by |
| raising test signals. |
| assert_on_fail: A boolean that specifies if the output of the wrapper |
| is test signal based or return value based. |
| args: Positional args for func. |
| kwargs: Name args for func. |
| |
| Returns: |
| If assert_on_fail is True, returns True/False to signal operation |
| status, otherwise return nothing. |
| """ |
| try: |
| func(*args, **kwargs) |
| if not assert_on_fail: |
| return True |
| except signals.TestSignal: |
| if assert_on_fail: |
| raise |
| return False |
| |
| |
def assert_network_in_list(target, network_list):
    """Asserts that a target Wi-Fi network is present in a list of networks.

    Args:
        target: A dict representing a Wi-Fi network.
                E.g. {WifiEnums.SSID_KEY: "SomeNetwork"}
        network_list: A list of dicts, each representing a Wi-Fi network.
    """
    found = match_networks(target, network_list)
    failure_msg = "Target network %s, does not exist in network list %s" % (
        target, network_list)
    asserts.assert_true(found, failure_msg)
| |
| |
def match_networks(target_params, networks):
    """Returns the networks that contain every key-value pair of the target.

    Asserts that target_params is non-empty before matching.

    Args:
        target_params: A dict with 1 or more key-value pairs representing a Wi-Fi network.
                       E.g { 'SSID': 'wh_ap1_5g', 'BSSID': '30:b5:c2:33:e4:47' }
        networks: A list of dict objects representing WiFi networks.

    Returns:
        A list, in input order, of the networks matching the target
        parameters.
    """
    asserts.assert_true(target_params,
                        "Expected networks object 'target_params' is empty")

    def _is_match(candidate):
        # A candidate fails on the first key that is missing or differs.
        return not any(key not in candidate or candidate[key] != wanted
                       for key, wanted in target_params.items())

    return [candidate for candidate in networks if _is_match(candidate)]
| |
| |
def wait_for_wifi_state(ad, state, assert_on_fail=True):
    """Waits for the device to reach the given wifi state.

    Delegates to _wait_for_wifi_state through _assert_on_fail_handler.

    Args:
        ad: An AndroidDevice object.
        state: Wifi state to wait for.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        If assert_on_fail is False, True when the device reached the
        specified state and False otherwise. If assert_on_fail is True, no
        return value.
    """
    outcome = _assert_on_fail_handler(
        _wait_for_wifi_state, assert_on_fail, ad, state=state)
    return outcome
| |
| |
def _wait_for_wifi_state(ad, state):
    """Waits for a Wi-Fi state-changed event reporting the given state.

    TestFailure signals are raised when something goes wrong.

    Args:
        ad: An AndroidDevice object.
        state: Wifi state to wait for.
    """
    # Nothing to wait for if the device is already in the requested state;
    # waiting would block on an event that never arrives.
    if ad.droid.wifiCheckState() == state:
        return

    def _reports_requested_state(event):
        return event["data"]["enabled"] == state

    ad.droid.wifiStartTrackingStateChange()
    try:
        ad.ed.wait_for_event(wifi_constants.WIFI_STATE_CHANGED,
                             _reports_requested_state, SHORT_TIMEOUT)
    except Empty:
        # No matching event arrived in time: fall back to polling the state.
        fail_msg = "Device did not transition to Wi-Fi state to %s on %s." % (
            state, ad.serial)
        asserts.assert_equal(state, ad.droid.wifiCheckState(), fail_msg)
    finally:
        ad.droid.wifiStopTrackingStateChange()
| |
| |
def wifi_toggle_state(ad, new_state=None, assert_on_fail=True):
    """Sets the wifi state, or flips it when no state is given.

    Delegates to _wifi_toggle_state through _assert_on_fail_handler.

    Args:
        ad: An AndroidDevice object.
        new_state: Wifi state to set to. If None, opposite of the current state.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        If assert_on_fail is False, True when the toggle succeeded and False
        otherwise. If assert_on_fail is True, no return value.
    """
    outcome = _assert_on_fail_handler(
        _wifi_toggle_state, assert_on_fail, ad, new_state=new_state)
    return outcome
| |
| |
def _wifi_toggle_state(ad, new_state=None):
    """Sets the Wi-Fi state and waits for the matching state-changed event.

    TestFailure signals are raised when something goes wrong.

    Args:
        ad: An AndroidDevice object.
        new_state: The state to set Wi-Fi to. If None, opposite of the current
                   state will be set.
    """
    if new_state is None:
        new_state = not ad.droid.wifiCheckState()
    elif ad.droid.wifiCheckState() == new_state:
        # Already in the requested state; no state change event would be
        # emitted, so do not wait for one.
        return

    def _reports_new_state(event):
        return event["data"]["enabled"] == new_state

    ad.droid.wifiStartTrackingStateChange()
    ad.log.info("Setting Wi-Fi state to %s.", new_state)
    ad.ed.clear_all_events()
    # Setting wifi state.
    ad.droid.wifiToggleState(new_state)
    time.sleep(2)
    try:
        ad.ed.wait_for_event(wifi_constants.WIFI_STATE_CHANGED,
                             _reports_new_state, SHORT_TIMEOUT)
    except Empty:
        # No matching event arrived in time: fall back to polling the state.
        fail_msg = "Failed to set Wi-Fi state to %s on %s." % (new_state,
                                                               ad.serial)
        asserts.assert_equal(new_state, ad.droid.wifiCheckState(), fail_msg)
    finally:
        ad.droid.wifiStopTrackingStateChange()
| |
| |
def reset_wifi(ad):
    """Clears all saved Wi-Fi networks on a device.

    Forgets each configured network once per distinct networkId, then asserts
    that no configured networks are left.

    Args:
        ad: An AndroidDevice object.

    """
    networks = ad.droid.wifiGetConfiguredNetworks()
    if not networks:
        return
    # Several entries may share a networkId; forget each id only once.
    removed_ids = set()
    for network in networks:
        network_id = network['networkId']
        if network_id in removed_ids:
            continue
        ad.droid.wifiForgetNetwork(network_id)
        removed_ids.add(network_id)
        try:
            ad.ed.pop_event(wifi_constants.WIFI_FORGET_NW_SUCCESS,
                            SHORT_TIMEOUT)
        except Empty:
            # Best effort: the final check below decides success or failure.
            logging.warning("Could not confirm the removal of network %s.",
                            network)
    # Check again to see if there's any network left, and report the ones
    # that actually remain rather than the full original list.
    remaining = ad.droid.wifiGetConfiguredNetworks()
    asserts.assert_true(
        not remaining,
        "Failed to remove these configured Wi-Fi networks: %s" % remaining)
| |
| |
| |
def toggle_airplane_mode_on_and_off(ad):
    """Turn ON and OFF Airplane mode.

    Sleeps for DEFAULT_TIMEOUT seconds after each toggle.

    Args:
        ad: An AndroidDevice object.

    Raises:
        A test failure signal (via asserts.assert_true) if
        utils.force_airplane_mode reports failure for either toggle.

    """
    ad.log.debug("Toggling Airplane mode ON.")
    asserts.assert_true(utils.force_airplane_mode(ad, True),
                        "Can not turn on airplane mode on: %s" % ad.serial)
    time.sleep(DEFAULT_TIMEOUT)
    ad.log.debug("Toggling Airplane mode OFF.")
    # This message used to say "turn on", which misreported the failing step.
    asserts.assert_true(utils.force_airplane_mode(ad, False),
                        "Can not turn off airplane mode on: %s" % ad.serial)
    time.sleep(DEFAULT_TIMEOUT)
| |
| |
def toggle_wifi_off_and_on(ad):
    """Turn OFF and ON WiFi.

    Sleeps for DEFAULT_TIMEOUT seconds after each toggle.

    ad: An AndroidDevice object.
    Returns: Assert if turning off/on WiFi fails.

    """
    for log_line, target_state in (("Toggling wifi OFF.", False),
                                   ("Toggling wifi ON.", True)):
        ad.log.debug(log_line)
        wifi_toggle_state(ad, target_state)
        time.sleep(DEFAULT_TIMEOUT)
| |
| |
def wifi_forget_network(ad, net_ssid):
    """Remove configured Wifi network on an android device.

    Only the first configured network whose SSID contains net_ssid is
    forgotten; the search stops after that.

    Args:
        ad: android_device object for forget network.
        net_ssid: ssid of network to be forget

    """
    networks = ad.droid.wifiGetConfiguredNetworks()
    if not networks:
        return
    for network in networks:
        # Containment rather than equality.
        # NOTE(review): presumably because the stored SSID may carry
        # surrounding quotes - confirm.
        if net_ssid not in network[WifiEnums.SSID_KEY]:
            continue
        ad.droid.wifiForgetNetwork(network['networkId'])
        try:
            ad.ed.pop_event(wifi_constants.WIFI_FORGET_NW_SUCCESS,
                            SHORT_TIMEOUT)
        except Empty:
            asserts.fail("Failed to remove network %s." % network)
        break
| |
| |
def wifi_test_device_init(ad):
    """Initializes an android device for wifi testing.

    Steps, in the order performed:
    0. Make sure SL4A connection is established on the android device.
    1. Disable location service's WiFi scan.
    2. Turn WiFi on.
    3. Clear all saved networks.
    4. Enable WiFi verbose logging.
    5. Set the wpa_supplicant log level to EXCESSIVE (not verified).
    6. Sync device time with computer time.
    7. Turn off cellular data.
    8. Set country code to US.
    9. Turn off ambient display.
    """
    utils.require_sl4a((ad, ))

    ad.droid.wifiScannerToggleAlwaysAvailable(False)
    asserts.assert_true(not ad.droid.wifiScannerIsAlwaysAvailable(),
                        "Failed to turn off location service's scan.")

    wifi_toggle_state(ad, True)
    reset_wifi(ad)

    ad.droid.wifiEnableVerboseLogging(1)
    asserts.assert_equal(ad.droid.wifiGetVerboseLoggingLevel(), 1,
                         "Failed to enable WiFi verbose logging.")

    # We don't verify the following settings since they are not critical.
    # Set wpa_supplicant log level to EXCESSIVE.
    wpa_cli_cmd = ("wpa_cli -i wlan0 -p -g@android:wpa_wlan0 IFNAME="
                   "wlan0 log_level EXCESSIVE")
    wpa_cli_result = ad.adb.shell(wpa_cli_cmd, ignore_status=True)
    ad.log.info("wpa_supplicant log change status: %s", wpa_cli_result)
    utils.sync_device_time(ad)
    ad.droid.telephonyToggleDataConnection(False)
    set_wifi_country_code(ad, WifiEnums.CountryCode.US)
    utils.set_ambient_display(ad, False)
| |
| |
def set_wifi_country_code(ad, country_code):
    """Sets the wifi country code on the device.

    Tries the `cmd wifi force-country-code` shell command first. If that
    raises, falls back to the SL4A wifiSetCountryCode call with the same
    country code.

    Args:
        ad: An AndroidDevice object.
        country_code: 2 letter ISO country code

    Raises:
        Whatever the fallback ad.droid.wifiSetCountryCode call raises if it
        also fails.
    """
    try:
        ad.adb.shell("cmd wifi force-country-code enabled %s" % country_code)
    except Exception:
        # Deliberate best-effort: any failure of the shell command falls back
        # to the RPC. The fallback used to set US regardless of the requested
        # code; it now honors country_code.
        ad.log.warning(
            "force-country-code shell command failed; "
            "falling back to wifiSetCountryCode(%s).", country_code)
        ad.droid.wifiSetCountryCode(country_code)
| |
| |
def start_wifi_connection_scan(ad):
    """Starts a wifi connection scan and waits for results to be available.

    Clears all pending events first, then fails the test if no
    "WifiManagerScanResultsAvailable" event is popped within 60 seconds.

    Args:
        ad: An AndroidDevice object.
    """
    results_event = "WifiManagerScanResultsAvailable"
    wait_seconds = 60
    ad.ed.clear_all_events()
    ad.droid.wifiStartScan()
    try:
        ad.ed.pop_event(results_event, wait_seconds)
    except Empty:
        asserts.fail("Wi-Fi results did not become available within 60s.")
| |
| |
def start_wifi_connection_scan_and_return_status(ad):
    """
    Starts a wifi connection scan and waits for either results to become
    available or a scan failure to be reported.

    Fails the test if neither kind of event arrives within 60 seconds.

    Args:
        ad: An AndroidDevice object.
    Returns:
        True: if at least one results-available event was received
        False: if only scan failures were received
    """
    ad.ed.clear_all_events()
    ad.droid.wifiStartScan()
    try:
        scan_events = ad.ed.pop_events(
            "WifiManagerScan(ResultsAvailable|Failure)", 60)
    except Empty:
        asserts.fail(
            "Wi-Fi scan results/failure did not become available within 60s.")
    # If there are multiple matches, one success is enough.
    for scan_event in scan_events:
        if scan_event["name"] == "WifiManagerScanResultsAvailable":
            return True
        if scan_event["name"] == "WifiManagerScanFailure":
            ad.log.debug("Scan failure received")
    return False
| |
| |
def start_wifi_connection_scan_and_check_for_network(ad,
                                                     network_ssid,
                                                     max_tries=3):
    """
    Runs up to |max_tries| connectivity scans and reports whether
    |network_ssid| shows up in the scan results. Scans that report failure
    still count as a try.

    Args:
        ad: An AndroidDevice object.
        network_ssid: SSID of the network we are looking for.
        max_tries: Number of scans to try.
    Returns:
        True: if network_ssid is found in scan results.
        False: if network_ssid is not found in scan results.
    """
    began = time.time()
    for _ in range(max_tries):
        if not start_wifi_connection_scan_and_return_status(ad):
            continue
        latest_results = ad.droid.wifiGetScanResults()
        if match_networks({WifiEnums.SSID_KEY: network_ssid}, latest_results):
            elapsed = time.time() - began
            ad.log.debug("Found network in %s seconds." % elapsed)
            return True
    elapsed = time.time() - began
    ad.log.debug("Did not find network in %s seconds." % elapsed)
    return False
| |
| |
def start_wifi_connection_scan_and_ensure_network_found(
        ad, network_ssid, max_tries=3):
    """
    Runs up to |max_tries| connectivity scans and asserts that
    |network_ssid| is seen in the scan results.
    This method asserts on failure!

    Args:
        ad: An AndroidDevice object.
        network_ssid: SSID of the network we are looking for.
        max_tries: Number of scans to try.
    """
    ad.log.info("Starting scans to ensure %s is present", network_ssid)
    assert_msg = "".join((
        "Failed to find ",
        network_ssid,
        " in scan results after ",
        str(max_tries),
        " tries",
    ))
    was_found = start_wifi_connection_scan_and_check_for_network(
        ad, network_ssid, max_tries)
    asserts.assert_true(was_found, assert_msg)
| |
| |
def start_wifi_connection_scan_and_ensure_network_not_found(
        ad, network_ssid, max_tries=3):
    """
    Runs up to |max_tries| connectivity scans and asserts that
    |network_ssid| is not seen in the scan results.
    This method asserts on failure!

    Args:
        ad: An AndroidDevice object.
        network_ssid: SSID of the network we are looking for.
        max_tries: Number of scans to try.
    """
    ad.log.info("Starting scans to ensure %s is not present", network_ssid)
    assert_msg = "".join((
        "Found ",
        network_ssid,
        " in scan results after ",
        str(max_tries),
        " tries",
    ))
    was_found = start_wifi_connection_scan_and_check_for_network(
        ad, network_ssid, max_tries)
    asserts.assert_false(was_found, assert_msg)
| |
| |
def start_wifi_background_scan(ad, scan_setting):
    """Starts wifi background scan.

    Args:
        ad: android_device object to initiate connection on.
        scan_setting: A dict representing the settings of the scan.

    Returns:
        The 'data' field of the onSuccess event popped for the started scan.
    """
    scan_index = ad.droid.wifiScannerStartBackgroundScan(scan_setting)
    success_event_name = "WifiScannerScan{}onSuccess".format(scan_index)
    success_event = ad.ed.pop_event(success_event_name, SHORT_TIMEOUT)
    return success_event['data']
| |
| |
def start_wifi_tethering(ad, ssid, password, band=None, hidden=None,
                         security=None):
    """Starts wifi tethering on an android_device.

    Builds a SoftAp config from the arguments (optional fields are included
    only when truthy), applies it, starts tethering and waits for
    confirmation events.

    Args:
        ad: android_device to start wifi tethering on.
        ssid: The SSID the soft AP should broadcast.
        password: The password the soft AP should use.
        band: The band the soft AP should be set on. It should be either
            WifiEnums.WIFI_CONFIG_APBAND_2G or WifiEnums.WIFI_CONFIG_APBAND_5G.
        hidden: boolean to indicate if the AP needs to be hidden or not.
        security: security type of softap.

    Returns:
        No return value. Error checks in this function will raise test failure signals
    """
    config = {WifiEnums.SSID_KEY: ssid}
    optional_fields = (
        (WifiEnums.PWD_KEY, password),
        (WifiEnums.AP_BAND_KEY, band),
        (WifiEnums.HIDDEN_KEY, hidden),
        (WifiEnums.SECURITY, security),
    )
    for field_key, field_value in optional_fields:
        if field_value:
            config[field_key] = field_value

    config_applied = ad.droid.wifiSetWifiApConfiguration(config)
    asserts.assert_true(config_applied,
                        "Failed to update WifiAp Configuration")

    def _tether_is_active(event):
        return event["data"]["ACTIVE_TETHER"]

    ad.droid.wifiStartTrackingTetherStateChange()
    ad.droid.connectivityStartTethering(tel_defines.TETHERING_WIFI, False)
    try:
        ad.ed.pop_event("ConnectivityManagerOnTetheringStarted")
        ad.ed.wait_for_event("TetherStateChanged", _tether_is_active, 30)
        ad.log.debug("Tethering started successfully.")
    except Empty:
        asserts.fail(
            "Failed to receive confirmation of wifi tethering starting")
    finally:
        ad.droid.wifiStopTrackingTetherStateChange()
| |
| |
def save_wifi_soft_ap_config(ad,
                             wifi_config,
                             band=None,
                             hidden=None,
                             security=None,
                             password=None,
                             channel=None,
                             max_clients=None,
                             shutdown_timeout_enable=None,
                             shutdown_timeout_millis=None,
                             client_control_enable=None,
                             allowedList=None,
                             blockedList=None,
                             bands=None,
                             channel_frequencys=None,
                             mac_randomization_setting=None,
                             bridged_opportunistic_shutdown_enabled=None,
                             ieee80211ax_enabled=None):
    """Save a soft ap configuration on the device and verify it was applied.

    Every optional argument that is not None is written into wifi_config
    (the caller's dict is modified in place), the config is pushed to the
    device and then read back. Each setting present in wifi_config, other
    than the band/bands entries, is compared against the read-back value.

    Args:
        ad: android_device to set soft ap configuration.
        wifi_config: a soft ap configuration object, at least include SSID.
        band: specifies the band for the soft ap.
        hidden: specifies the soft ap need to broadcast its SSID or not.
        security: specifies the security type for the soft ap. Only applied
            when password is also given.
        password: specifies the password for the soft ap. Only applied when
            security is also given.
        channel: specifies the channel for the soft ap. Only used together
            with band.
        max_clients: specifies the maximum connected client number.
        shutdown_timeout_enable: specifies the auto shut down enable or not.
        shutdown_timeout_millis: specifies the shut down timeout value.
        client_control_enable: specifies the client control enable or not.
        allowedList: specifies allowed clients list.
        blockedList: specifies blocked clients list.
        bands: specifies the band list for the soft ap. Takes precedence
            over band.
        channel_frequencys: specifies the channel frequency list for soft ap.
            Takes precedence over bands and band.
        mac_randomization_setting: specifies the mac randomization setting.
        bridged_opportunistic_shutdown_enabled: specifies the opportunistic
            shutdown enable or not.
        ieee80211ax_enabled: specifies the ieee80211ax enable or not.
    """
    if security and password:
        wifi_config[WifiEnums.SECURITY] = security
        wifi_config[WifiEnums.PWD_KEY] = password

    # Simple one-to-one overrides; None means "leave wifi_config untouched".
    optional_settings = (
        (WifiEnums.HIDDEN_KEY, hidden),
        (WifiEnums.AP_MAXCLIENTS_KEY, max_clients),
        (WifiEnums.AP_SHUTDOWNTIMEOUTENABLE_KEY, shutdown_timeout_enable),
        (WifiEnums.AP_SHUTDOWNTIMEOUT_KEY, shutdown_timeout_millis),
        (WifiEnums.AP_CLIENTCONTROL_KEY, client_control_enable),
        (WifiEnums.AP_ALLOWEDLIST_KEY, allowedList),
        (WifiEnums.AP_BLOCKEDLIST_KEY, blockedList),
        (WifiEnums.AP_MAC_RANDOMIZATION_SETTING_KEY,
         mac_randomization_setting),
        (WifiEnums.AP_BRIDGED_OPPORTUNISTIC_SHUTDOWN_ENABLE_KEY,
         bridged_opportunistic_shutdown_enabled),
        (WifiEnums.AP_IEEE80211AX_ENABLED_KEY, ieee80211ax_enabled),
    )
    for key, value in optional_settings:
        if value is not None:
            wifi_config[key] = value

    # Only one way of selecting the radio is applied, most specific first.
    if channel_frequencys is not None:
        wifi_config[WifiEnums.AP_CHANNEL_FREQUENCYS_KEY] = channel_frequencys
    elif bands is not None:
        wifi_config[WifiEnums.AP_BANDS_KEY] = bands
    elif band is not None:
        wifi_config[WifiEnums.AP_BAND_KEY] = band
        if channel is not None:
            wifi_config[WifiEnums.AP_CHANNEL_KEY] = channel

    # A channel of 0 is dropped from the config before it is sent.
    if wifi_config.get(WifiEnums.AP_CHANNEL_KEY) == 0:
        del wifi_config[WifiEnums.AP_CHANNEL_KEY]

    if (WifiEnums.SECURITY in wifi_config
            and wifi_config[WifiEnums.SECURITY]
            == WifiEnums.SoftApSecurityType.OPEN):
        del wifi_config[WifiEnums.SECURITY]
        # An open config may legitimately carry no password entry at all.
        wifi_config.pop(WifiEnums.PWD_KEY, None)

    asserts.assert_true(ad.droid.wifiSetWifiApConfiguration(wifi_config),
                        "Failed to set WifiAp Configuration")

    wifi_ap = ad.droid.wifiGetApConfiguration()
    asserts.assert_true(
        wifi_ap[WifiEnums.SSID_KEY] == wifi_config[WifiEnums.SSID_KEY],
        "Hotspot SSID doesn't match")

    # Settings that are checked only when present in the requested config.
    verified_settings = (
        (WifiEnums.SECURITY, "Hotspot Security doesn't match"),
        (WifiEnums.PWD_KEY, "Hotspot Password doesn't match"),
        (WifiEnums.HIDDEN_KEY, "Hotspot hidden setting doesn't match"),
        (WifiEnums.AP_CHANNEL_KEY, "Hotspot Channel doesn't match"),
        (WifiEnums.AP_MAXCLIENTS_KEY, "Hotspot Max Clients doesn't match"),
        (WifiEnums.AP_SHUTDOWNTIMEOUTENABLE_KEY,
         "Hotspot ShutDown feature flag doesn't match"),
        (WifiEnums.AP_SHUTDOWNTIMEOUT_KEY,
         "Hotspot ShutDown timeout setting doesn't match"),
        (WifiEnums.AP_CLIENTCONTROL_KEY,
         "Hotspot Client control flag doesn't match"),
        (WifiEnums.AP_ALLOWEDLIST_KEY, "Hotspot Allowed List doesn't match"),
        (WifiEnums.AP_BLOCKEDLIST_KEY, "Hotspot Blocked List doesn't match"),
        (WifiEnums.AP_MAC_RANDOMIZATION_SETTING_KEY,
         "Hotspot Mac randomization setting doesn't match"),
        (WifiEnums.AP_BRIDGED_OPPORTUNISTIC_SHUTDOWN_ENABLE_KEY,
         "Hotspot bridged shutdown enable setting doesn't match"),
        (WifiEnums.AP_IEEE80211AX_ENABLED_KEY,
         "Hotspot 80211 AX enable setting doesn't match"),
        (WifiEnums.AP_CHANNEL_FREQUENCYS_KEY,
         "Hotspot channels setting doesn't match"),
    )
    for key, mismatch_msg in verified_settings:
        if key in wifi_config:
            asserts.assert_true(wifi_ap[key] == wifi_config[key],
                                mismatch_msg)
| |
def start_wifi_tethering_saved_config(ad):
    """Turn on wifi hotspot with a config that is already saved.

    Starts wifi tethering and waits for the tethering-started event followed
    by a "TetherStateChanged" event whose ACTIVE_TETHER data is truthy.
    Tether state tracking is always stopped before returning.

    Args:
        ad: android_device to start wifi tethering on.
    """
    ad.droid.wifiStartTrackingTetherStateChange()
    ad.droid.connectivityStartTethering(tel_defines.TETHERING_WIFI, False)
    try:
        ad.ed.pop_event("ConnectivityManagerOnTetheringStarted")
        ad.ed.wait_for_event("TetherStateChanged",
                             lambda x: x["data"]["ACTIVE_TETHER"], 30)
    except Empty:
        # Only a missing event is reported as a tethering failure; anything
        # else (including KeyboardInterrupt) propagates unchanged, matching
        # the other tethering helpers in this file.
        asserts.fail("Didn't receive wifi tethering starting confirmation")
    finally:
        ad.droid.wifiStopTrackingTetherStateChange()
| |
| |
def stop_wifi_tethering(ad):
    """Turn off wifi tethering and wait for the confirmation events.

    Waits for "WifiManagerApDisabled" and then for a "TetherStateChanged"
    event whose ACTIVE_TETHER data is falsy. The test is failed if either
    wait ends with Empty. Tether state tracking is always stopped.

    Args:
        ad: android_device to stop wifi tethering on.
    """
    ad.droid.wifiStartTrackingTetherStateChange()
    ad.droid.connectivityStopTethering(tel_defines.TETHERING_WIFI)
    try:
        ad.ed.pop_event("WifiManagerApDisabled", 30)
        ad.ed.wait_for_event(
            "TetherStateChanged",
            lambda event: not event["data"]["ACTIVE_TETHER"],
            30)
    except Empty:
        asserts.fail(
            "Failed to receive confirmation of wifi tethering stopping")
    finally:
        ad.droid.wifiStopTrackingTetherStateChange()
| |
| |
def toggle_wifi_and_wait_for_reconnection(ad,
                                          network,
                                          num_of_tries=1,
                                          assert_on_fail=True):
    """Cycle wifi off and on, then wait for the device to rejoin a network.

    Public entry point that runs _toggle_wifi_and_wait_for_reconnection
    through _assert_on_fail_handler. The device is expected to be connected
    to the given network before the call.

    Sequence performed by the underlying helper:
        1. Confirm the device is currently on the network.
        2. Disable wifi.
        3. Sleep for 10 seconds.
        4. Enable wifi.
        5. Wait for the "connected" event and check that its SSID is the
           requested one.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to await connection.
            The dictionary must have the key "SSID".
        num_of_tries: An integer that is the number of times to wait for the
            connected event before declaring failure. Default is 1.
        assert_on_fail: If True, error checks in this function will raise
            test failure signals.

    Returns:
        If assert_on_fail is False, function returns True if the toggle was
        successful, False otherwise. If assert_on_fail is True, no return
        value.
    """
    return _assert_on_fail_handler(
        _toggle_wifi_and_wait_for_reconnection,
        assert_on_fail,
        ad,
        network,
        num_of_tries=num_of_tries,
    )
| |
| |
def _toggle_wifi_and_wait_for_reconnection(ad, network, num_of_tries=3):
    """Cycle wifi off and on, then wait for the device to rejoin a network.

    The device is expected to be connected to the given network already.

    Sequence:
        1. Confirm the device is currently on the network.
        2. Disable wifi.
        3. Sleep for 10 seconds.
        4. Enable wifi.
        5. Wait for the "connected" event and check that its SSID is the
           requested one.

    This will directly fail a test if anything goes wrong.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to await connection.
            The dictionary must have the key "SSID".
        num_of_tries: An integer that is the number of 30 second waits for
            the connected event before declaring failure. Default is 3.
    """
    expected_ssid = network[WifiEnums.SSID_KEY]
    # The device must be on the target network before wifi is toggled.
    verify_wifi_connection_info(ad, {WifiEnums.SSID_KEY: expected_ssid})

    wifi_toggle_state(ad, False)
    time.sleep(10)
    wifi_toggle_state(ad, True)

    ad.droid.wifiStartTrackingStateChange()
    try:
        connected_event = None
        for _ in range(num_of_tries):
            try:
                connected_event = ad.ed.pop_event(
                    wifi_constants.WIFI_CONNECTED, 30)
            except Empty:
                # Nothing arrived in this window; use the next try.
                continue
            break

        no_connection_msg = ("Failed to connect to Wi-Fi network %s on %s" %
                             (network, ad.serial))
        asserts.assert_true(connected_event, no_connection_msg)
        logging.debug("Connection result on %s: %s.", ad.serial,
                      connected_event)

        actual_ssid = connected_event['data'][WifiEnums.SSID_KEY]
        wrong_network_msg = ("Connected to the wrong network on %s."
                             "Expected %s, but got %s." %
                             (ad.serial, expected_ssid, actual_ssid))
        asserts.assert_equal(actual_ssid, expected_ssid, wrong_network_msg)
        logging.info("Connected to Wi-Fi network %s on %s", actual_ssid,
                     ad.serial)
    finally:
        ad.droid.wifiStopTrackingStateChange()
| |
| |
def wait_for_connect(ad,
                     expected_ssid=None,
                     expected_id=None,
                     tries=2,
                     assert_on_fail=True):
    """Wait for a wifi connect event, optionally for a specific network.

    Public entry point that runs _wait_for_connect through
    _assert_on_fail_handler.

    This will directly fail a test if anything goes wrong.

    Args:
        ad: An Android device object.
        expected_ssid: SSID of the network to connect to.
        expected_id: Network Id of the network to connect to.
        tries: An integer that is the number of times to try before failing.
        assert_on_fail: If True, error checks in this function will raise
            test failure signals.

    Returns:
        Returns a value only if assert_on_fail is false.
        Returns True if the connection was successful, False otherwise.
    """
    return _assert_on_fail_handler(
        _wait_for_connect,
        assert_on_fail,
        ad,
        expected_ssid,
        expected_id,
        tries,
    )
| |
| |
def _wait_for_connect(ad, expected_ssid=None, expected_id=None, tries=2):
    """Wait for a connect event and verify it is for the expected network.

    State change tracking is started before waiting and always stopped
    afterwards.

    Args:
        ad: An Android device object.
        expected_ssid: SSID of the network to connect to. Not verified when
            empty or None.
        expected_id: Network Id of the network to connect to. Not verified
            when None; a network id of 0 is verified.
        tries: An integer that is the number of times to try before failing.

    Raises:
        signals.TestFailure: if no connect event is received or the event is
            for a different network. Any exception caught by the generic
            handler below is logged and re-raised as a TestFailure.
    """
    ad.droid.wifiStartTrackingStateChange()
    try:
        connect_result = _wait_for_connect_event(ad,
                                                 ssid=expected_ssid,
                                                 id=expected_id,
                                                 tries=tries)
        asserts.assert_true(
            connect_result,
            "Failed to connect to Wi-Fi network %s" % expected_ssid)
        ad.log.debug("Wi-Fi connection result: %s.", connect_result)
        connection_data = connect_result['data']
        actual_ssid = connection_data[WifiEnums.SSID_KEY]
        if expected_ssid:
            asserts.assert_equal(actual_ssid, expected_ssid,
                                 "Connected to the wrong network")
        actual_id = connection_data[WifiEnums.NETID_KEY]
        # Compare against None explicitly so that network id 0 is checked.
        if expected_id is not None:
            asserts.assert_equal(actual_id, expected_id,
                                 "Connected to the wrong network")
        ad.log.info("Connected to Wi-Fi network %s.", actual_ssid)
    except Empty:
        asserts.fail("Failed to start connection process to %s" %
                     expected_ssid)
    except Exception as error:
        ad.log.error("Failed to connect to %s with error %s", expected_ssid,
                     error)
        raise signals.TestFailure("Failed to connect to %s network" %
                                  expected_ssid)
    finally:
        ad.droid.wifiStopTrackingStateChange()
| |
| |
def _wait_for_connect_event(ad, ssid=None, id=None, tries=1):
    """Wait for a connect event on queue and pop when available.

    Each try waits up to 30 seconds for one connect event. When neither ssid
    nor id is given the first event is accepted. Otherwise waiting stops at
    the first event whose network id equals id or whose SSID equals ssid.

    Args:
        ad: An Android device object.
        ssid: SSID of the network to connect to.
        id: Network Id of the network to connect to. 0 is a valid value.
        tries: An integer that is the number of times to try before failing.

    Returns:
        None if no connect event was received at all. Otherwise the last
        connect event popped, which callers must still verify: when the
        tries run out without a match, it is a non-matching event.
        A dict with details of the connection data, which looks like this:
        {
         'time': 1485460337798,
         'name': 'WifiNetworkConnected',
         'data': {
                  'rssi': -27,
                  'is_24ghz': True,
                  'mac_address': '02:00:00:00:00:00',
                  'network_id': 1,
                  'BSSID': '30:b5:c2:33:d3:fc',
                  'ip_address': 117483712,
                  'link_speed': 54,
                  'supplicant_state': 'completed',
                  'hidden_ssid': False,
                  'SSID': 'wh_ap1_2g',
                  'is_5ghz': False}
        }

    """
    conn_result = None
    # With no ssid and no network id, any connect event is good enough.
    accept_any_event = id is None and ssid is None

    for _ in range(tries):
        try:
            conn_result = ad.ed.pop_event(wifi_constants.WIFI_CONNECTED, 30)
        except Empty:
            continue
        if accept_any_event:
            break
        event_data = conn_result['data']
        # Explicit None checks: a truthiness test would never match network
        # id 0 and would burn every remaining try waiting for more events.
        if id is not None and event_data[WifiEnums.NETID_KEY] == id:
            break
        if ssid is not None and event_data[WifiEnums.SSID_KEY] == ssid:
            break

    return conn_result
| |
| |
def wait_for_disconnect(ad, timeout=10):
    """Block until a wifi disconnect event arrives or the timeout expires.

    State change tracking is started for the wait and always stopped
    afterwards.

    Args:
        ad: Android device object.
        timeout: Timeout in seconds.

    Raises:
        signals.TestFailure: if no "WifiNetworkDisconnected" event is
            received within the timeout.
    """
    try:
        ad.droid.wifiStartTrackingStateChange()
        # The event payload itself is not needed, only its arrival.
        ad.ed.pop_event("WifiNetworkDisconnected", timeout)
    except Empty:
        raise signals.TestFailure("Device did not disconnect from the network")
    finally:
        ad.droid.wifiStopTrackingStateChange()
| |
| |
def ensure_no_disconnect(ad, duration=10):
    """Verify that the device stays connected for the given duration.

    State change tracking is started for the wait and always stopped
    afterwards.

    Args:
        ad: Android device object.
        duration: Duration in seconds.

    Raises:
        signals.TestFailure: if a "WifiNetworkDisconnected" event is received
            within the duration.
    """
    try:
        ad.droid.wifiStartTrackingStateChange()
        ad.ed.pop_event("WifiNetworkDisconnected", duration)
    except Empty:
        # No disconnect event showed up, which is the expected outcome.
        pass
    else:
        raise signals.TestFailure("Device disconnected from the network")
    finally:
        ad.droid.wifiStopTrackingStateChange()
| |
| |
def connect_to_wifi_network(ad, network, assert_on_fail=True,
                            check_connectivity=True, hidden=False,
                            num_of_scan_tries=3, num_of_connect_tries=3):
    """Scan for a network and then connect to it (open and psk networks).

    For a hidden network the scan step checks that the SSID is NOT found;
    otherwise it checks that the SSID is found. The connection itself is
    delegated to wifi_connect.

    Args:
        ad: AndroidDevice to use for connection
        network: network info of the network to connect to
        assert_on_fail: If true, errors from wifi_connect will raise
                        test failure signals.
        check_connectivity: Passed through to wifi_connect.
        hidden: Is the Wifi network hidden.
        num_of_scan_tries: The number of times to try scan
                           interface before declaring failure.
        num_of_connect_tries: The number of times to try
                              connect wifi before declaring failure.
    """
    if hidden:
        scan_and_check = start_wifi_connection_scan_and_ensure_network_not_found
    else:
        scan_and_check = start_wifi_connection_scan_and_ensure_network_found
    scan_and_check(ad,
                   network[WifiEnums.SSID_KEY],
                   max_tries=num_of_scan_tries)

    wifi_connect(ad,
                 network,
                 num_of_tries=num_of_connect_tries,
                 assert_on_fail=assert_on_fail,
                 check_connectivity=check_connectivity)
| |
| |
def connect_to_wifi_network_with_id(ad, network_id, network_ssid):
    """Connect by network id and report whether the resulting SSID matches.

    Scans until the SSID is found, connects with wifi_connect_by_id, then
    compares the SSID from the device's connection info with network_ssid.

    Args:
        ad: android_device object to initiate connection on.
        network_id: int Network Id of the network.
        network_ssid: string SSID of the network.

    Returns: True if connect using network id was successful;
             False otherwise.

    """
    start_wifi_connection_scan_and_ensure_network_found(ad, network_ssid)
    wifi_connect_by_id(ad, network_id)

    connection_info = ad.droid.wifiGetConnectionInfo()
    connected_ssid = connection_info[WifiEnums.SSID_KEY]
    ad.log.debug("Expected SSID = %s Connected SSID = %s" %
                 (network_ssid, connected_ssid))
    return connected_ssid == network_ssid
| |
| |
def wifi_connect(ad,
                 network,
                 num_of_tries=1,
                 assert_on_fail=True,
                 check_connectivity=True):
    """Connect an Android device to a wifi network.

    Public entry point that runs _wifi_connect through
    _assert_on_fail_handler: start the connection, wait for the "connected"
    event, then confirm the connected ssid is the one requested.

    This will directly fail a test if anything goes wrong.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to connect to. The
                 dictionary must have the key "SSID".
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 1.
        assert_on_fail: If True, error checks in this function will raise
                        test failure signals.
        check_connectivity: If True, the connection is also validated with
                            validate_connection after connecting.

    Returns:
        Returns a value only if assert_on_fail is false.
        Returns True if the connection was successful, False otherwise.
    """
    return _assert_on_fail_handler(
        _wifi_connect,
        assert_on_fail,
        ad,
        network,
        num_of_tries=num_of_tries,
        check_connectivity=check_connectivity,
    )
| |
| |
def _wifi_connect(ad, network, num_of_tries=1, check_connectivity=True):
    """Connect an Android device to a wifi network.

    Start the connection with the given config, wait for the connect-by-config
    success event and then for the "connected" event, and confirm the
    connected ssid is the one requested.

    This will directly fail a test if anything goes wrong.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to connect to. The
                 dictionary must have the key "SSID".
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 1.
        check_connectivity: If True, validate_connection is run against
                            DEFAULT_PING_ADDR after connecting and the test
                            is failed when it reports no connectivity.
    """
    asserts.assert_true(
        WifiEnums.SSID_KEY in network,
        "Key '%s' must be present in network definition." % WifiEnums.SSID_KEY)
    ad.droid.wifiStartTrackingStateChange()
    expected_ssid = network[WifiEnums.SSID_KEY]
    ad.droid.wifiConnectByConfig(network)
    ad.log.info("Starting connection process to %s", expected_ssid)
    try:
        # Only the arrival of the success event matters, not its payload.
        ad.ed.pop_event(wifi_constants.CONNECT_BY_CONFIG_SUCCESS, 30)
        connected_event = _wait_for_connect_event(ad,
                                                  ssid=expected_ssid,
                                                  tries=num_of_tries)
        asserts.assert_true(
            connected_event,
            "Failed to connect to Wi-Fi network %s on %s" %
            (network, ad.serial))
        ad.log.debug("Wi-Fi connection result: %s.", connected_event)

        actual_ssid = connected_event['data'][WifiEnums.SSID_KEY]
        asserts.assert_equal(
            actual_ssid, expected_ssid,
            "Connected to the wrong network on %s." % ad.serial)
        ad.log.info("Connected to Wi-Fi network %s.", actual_ssid)

        if check_connectivity and not validate_connection(
                ad, DEFAULT_PING_ADDR):
            raise signals.TestFailure("Failed to connect to internet on %s" %
                                      expected_ssid)
    except Empty:
        asserts.fail("Failed to start connection process to %s on %s" %
                     (network, ad.serial))
    except Exception as error:
        ad.log.error("Failed to connect to %s with error %s", expected_ssid,
                     error)
        raise signals.TestFailure("Failed to connect to %s network" % network)
    finally:
        ad.droid.wifiStopTrackingStateChange()
| |
| |
def wifi_connect_by_id(ad, network_id, num_of_tries=3, assert_on_fail=True):
    """Connect an Android device to a wifi network using network Id.

    Public entry point that runs _wifi_connect_by_id through
    _assert_on_fail_handler: start connection to the wifi network with the
    given network Id, wait for the "connected" event, then verify the
    connected network is the one requested.

    This will directly fail a test if anything goes wrong.

    Args:
        ad: android_device object to initiate connection on.
        network_id: Integer specifying the network id of the network.
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 3.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        Returns a value only if assert_on_fail is false.
        Returns True if the connection was successful, False otherwise.
    """
    # Hand the handler's result back to the caller, as wifi_connect does;
    # previously it was dropped and the documented value was never returned.
    return _assert_on_fail_handler(_wifi_connect_by_id, assert_on_fail, ad,
                                   network_id, num_of_tries)
| |
| |
def _wifi_connect_by_id(ad, network_id, num_of_tries=1):
    """Connect an Android device to a wifi network using its network id.

    Clears all pending events, starts the connection by network id, waits for
    the connect-by-id success event and then for the "connected" event,
    verifies the connected network id, and finally validates the connection
    against DEFAULT_PING_ADDR.

    Args:
        ad: android_device object to initiate connection on.
        network_id: Integer specifying the network id of the network.
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 1.
    """
    ad.droid.wifiStartTrackingStateChange()
    # Drop stale events so that only events from this attempt are seen.
    ad.ed.clear_all_events()
    ad.droid.wifiConnectByNetworkId(network_id)
    ad.log.info("Starting connection to network with id %d", network_id)
    try:
        # Only the arrival of the success event matters, not its payload.
        ad.ed.pop_event(wifi_constants.CONNECT_BY_NETID_SUCCESS, 60)
        connected_event = _wait_for_connect_event(ad,
                                                  id=network_id,
                                                  tries=num_of_tries)
        asserts.assert_true(
            connected_event,
            "Failed to connect to Wi-Fi network using network id")
        ad.log.debug("Wi-Fi connection result: %s", connected_event)

        actual_id = connected_event['data'][WifiEnums.NETID_KEY]
        wrong_network_msg = ("Connected to the wrong network on %s."
                             "Expected network id = %d, but got %d." %
                             (ad.serial, network_id, actual_id))
        asserts.assert_equal(actual_id, network_id, wrong_network_msg)

        expected_ssid = connected_event['data'][WifiEnums.SSID_KEY]
        ad.log.info("Connected to Wi-Fi network %s with %d network id.",
                    expected_ssid, network_id)

        if not validate_connection(ad, DEFAULT_PING_ADDR):
            raise signals.TestFailure("Failed to connect to internet on %s" %
                                      expected_ssid)
    except Empty:
        asserts.fail("Failed to connect to network with id %d on %s" %
                     (network_id, ad.serial))
    except Exception as error:
        ad.log.error("Failed to connect to network with id %d with error %s",
                     network_id, error)
        raise signals.TestFailure("Failed to connect to network with network"
                                  " id %d" % network_id)
    finally:
        ad.droid.wifiStopTrackingStateChange()
| |
| |
def wifi_connect_using_network_request(ad,
                                       network,
                                       network_specifier,
                                       num_of_tries=3):
    """Connect an Android device to a wifi network using network request.

    Sends a wifi network request with the provided network specifier, waits
    for the callback registration delay, and then runs the match / user
    selection / connection flow in
    _wait_for_wifi_connect_after_network_request.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to connect to. The
                 dictionary must have the key "SSID".
        network_specifier: A dictionary representing the network specifier to
                           use.
        num_of_tries: An integer that is the number of times to try before
                      declaring failure.
    Returns:
        key: Key corresponding to network request.
    """
    request_key = ad.droid.connectivityRequestWifiNetwork(
        network_specifier, 0)
    ad.log.info("Sent network request %s with %s " %
                (request_key, network_specifier))
    # UI interaction should only start once wifi has begun processing the
    # request, so give it time to register before continuing.
    time.sleep(wifi_constants.NETWORK_REQUEST_CB_REGISTER_DELAY_SEC)
    _wait_for_wifi_connect_after_network_request(ad, network, request_key,
                                                 num_of_tries)
    return request_key
| |
| |
def wait_for_wifi_connect_after_network_request(ad,
                                                network,
                                                key,
                                                num_of_tries=3,
                                                assert_on_fail=True):
    """
    Simulate and verify the connection flow after initiating the network
    request.

    Public entry point that runs _wait_for_wifi_connect_after_network_request
    through _assert_on_fail_handler: wait for the "onMatch" event, ensure that
    the scan results in the "onMatch" event contain the specified network,
    then simulate the user granting the request with the specified network
    selected. Then wait for the "onAvailable" network callback indicating
    successful connection to network.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to connect to. The
                 dictionary must have the key "SSID".
        key: Key corresponding to network request.
        num_of_tries: An integer that is the number of times to try before
                      declaring failure.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        Returns a value only if assert_on_fail is false.
        Returns True if the connection was successful, False otherwise.
    """
    # Hand the handler's result back to the caller; previously it was
    # dropped and the documented value was never returned.
    return _assert_on_fail_handler(
        _wait_for_wifi_connect_after_network_request, assert_on_fail, ad,
        network, key, num_of_tries)
| |
| |
def _wait_for_wifi_connect_after_network_request(ad, network, key, num_of_tries=3):
    """
    Simulate and verify the connection flow after initiating the network
    request.

    Wait for the "onMatch" event, ensure that the scan results in "onMatch"
    event contain the specified network, then simulate the user granting the
    request with the specified network selected. Then wait for the "onAvailable"
    network callback indicating successful connection to network.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to connect to. The
                 dictionary must have the key "SSID".
        key: Key corresponding to network request.
        num_of_tries: An integer that is the number of "onMatch" events to
                      inspect for the target network before declaring
                      failure.
    """
    asserts.assert_true(
        WifiEnums.SSID_KEY in network,
        "Key '%s' must be present in network definition." % WifiEnums.SSID_KEY)
    ad.droid.wifiStartTrackingStateChange()
    expected_ssid = network[WifiEnums.SSID_KEY]
    ad.droid.wifiRegisterNetworkRequestMatchCallback()
    # Wait for the platform to scan and return a list of networks
    # matching the request
    try:
        matched_network = None
        # Honor num_of_tries; iterating over [0, num_of_tries] always ran
        # exactly twice regardless of the requested count.
        for _ in range(num_of_tries):
            on_match_event = ad.ed.pop_event(
                wifi_constants.WIFI_NETWORK_REQUEST_MATCH_CB_ON_MATCH, 60)
            asserts.assert_true(on_match_event,
                                "Network request on match not received.")
            matched_scan_results = on_match_event["data"]
            ad.log.debug("Network request on match results %s",
                         matched_scan_results)
            matched_network = match_networks(
                {WifiEnums.SSID_KEY: expected_ssid}, matched_scan_results)
            ad.log.debug("Network request on match %s", matched_network)
            if matched_network:
                break

        asserts.assert_true(matched_network,
                            "Target network %s not found" % network)

        ad.droid.wifiSendUserSelectionForNetworkRequestMatch(network)
        ad.log.info("Sent user selection for network request %s",
                    expected_ssid)

        # Wait for the platform to connect to the network.
        autils.wait_for_event_with_keys(
            ad, cconsts.EVENT_NETWORK_CALLBACK,
            60,
            (cconsts.NETWORK_CB_KEY_ID, key),
            (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_AVAILABLE))
        on_capabilities_changed = autils.wait_for_event_with_keys(
            ad, cconsts.EVENT_NETWORK_CALLBACK,
            10,
            (cconsts.NETWORK_CB_KEY_ID, key),
            (cconsts.NETWORK_CB_KEY_EVENT,
             cconsts.NETWORK_CB_CAPABILITIES_CHANGED))
        # WifiInfo is attached to TransportInfo only in S.
        if ad.droid.isSdkAtLeastS():
            connected_network = on_capabilities_changed["data"][
                cconsts.NETWORK_CB_KEY_TRANSPORT_INFO]
        else:
            connected_network = ad.droid.wifiGetConnectionInfo()
        ad.log.info("Connected to network %s", connected_network)
        asserts.assert_equal(
            connected_network[WifiEnums.SSID_KEY], expected_ssid,
            "Connected to the wrong network."
            "Expected %s, but got %s."
            % (network, connected_network))
    except Empty:
        asserts.fail("Failed to connect to %s" % expected_ssid)
    except Exception as error:
        ad.log.error("Failed to connect to %s with error %s" %
                     (expected_ssid, error))
        raise signals.TestFailure("Failed to connect to %s network" % network)
    finally:
        ad.droid.wifiStopTrackingStateChange()
| |
| |
def wifi_passpoint_connect(ad,
                           passpoint_network,
                           num_of_tries=1,
                           assert_on_fail=True):
    """Connect an Android device to a Passpoint wifi network.

    Public entry point that runs _wifi_passpoint_connect through
    _assert_on_fail_handler: wait for the "connected" event, then confirm the
    connected ssid is the one requested.

    This will directly fail a test if anything goes wrong.

    Args:
        ad: android_device object to initiate connection on.
        passpoint_network: SSID of the Passpoint network to connect to.
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 1.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        The value produced by _assert_on_fail_handler. NOTE(review): the
        sibling wrappers document this as True on success and False on
        failure when assert_on_fail is False, and no value otherwise; the
        handler is not visible here, so confirm against its definition.
    """
    # Hand the handler's result back to the caller; previously it was
    # dropped, so callers passing assert_on_fail=False always got None.
    return _assert_on_fail_handler(_wifi_passpoint_connect,
                                   assert_on_fail,
                                   ad,
                                   passpoint_network,
                                   num_of_tries=num_of_tries)
| |
| |
def _wifi_passpoint_connect(ad, passpoint_network, num_of_tries=1):
    """Connect an Android device to a Passpoint wifi network.

    Wait for the "connected" event, confirm the connected ssid is the one
    requested, then validate the connection against DEFAULT_PING_ADDR.

    This will directly fail a test if anything goes wrong: every exception
    raised while waiting or verifying is logged and re-raised as a
    signals.TestFailure.

    Args:
        ad: android_device object to initiate connection on.
        passpoint_network: SSID of the Passpoint network to connect to.
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 1.
    """
    ad.droid.wifiStartTrackingStateChange()
    expected_ssid = passpoint_network
    ad.log.info("Starting connection process to passpoint %s", expected_ssid)

    try:
        # Pass by keyword: positionally, num_of_tries landed in the helper's
        # network id parameter, so the try count was ignored and an event for
        # network id == num_of_tries was accepted as a match.
        connect_result = _wait_for_connect_event(ad,
                                                 ssid=expected_ssid,
                                                 tries=num_of_tries)
        asserts.assert_true(
            connect_result, "Failed to connect to WiFi passpoint network %s on"
            " %s" % (expected_ssid, ad.serial))
        ad.log.info("Wi-Fi connection result: %s.", connect_result)
        actual_ssid = connect_result['data'][WifiEnums.SSID_KEY]
        asserts.assert_equal(
            actual_ssid, expected_ssid,
            "Connected to the wrong network on %s." % ad.serial)
        ad.log.info("Connected to Wi-Fi passpoint network %s.", actual_ssid)

        internet = validate_connection(ad, DEFAULT_PING_ADDR)
        if not internet:
            raise signals.TestFailure("Failed to connect to internet on %s" %
                                      expected_ssid)
    except Exception as error:
        ad.log.error("Failed to connect to passpoint network %s with error %s",
                     expected_ssid, error)
        raise signals.TestFailure("Failed to connect to %s passpoint network" %
                                  expected_ssid)

    finally:
        ad.droid.wifiStopTrackingStateChange()
| |
| |
def delete_passpoint(ad, fqdn):
    """Remove the Passpoint configuration identified by an FQDN.

    Args:
        ad: android_device object holding the Passpoint configuration.
        fqdn: FQDN of the Passpoint configuration to remove.

    Returns:
        True if the removal call completed, False if it raised an exception
        (the error is logged).
    """
    try:
        ad.droid.removePasspointConfig(fqdn)
    except Exception as error:
        ad.log.error(
            "Failed to remove passpoint configuration with FQDN=%s "
            "and error=%s", fqdn, error)
        return False
    else:
        return True
| |
| |
def start_wifi_single_scan(ad, scan_setting):
    """Start a single shot wifi scan and wait for its success event.

    Args:
        ad: android_device object to initiate the scan on.
        scan_setting: A dict representing the settings of the scan.

    Returns:
        If scan was started successfully, event data of success event is
        returned.
    """
    scan_index = ad.droid.wifiScannerStartScan(scan_setting)
    # The success event name embeds the index returned by the start call.
    success_event_name = "WifiScannerScan%sonSuccess" % scan_index
    success_event = ad.ed.pop_event(success_event_name, SHORT_TIMEOUT)
    ad.log.debug("Got event %s", success_event)
    return success_event['data']
| |
| |
def track_connection(ad, network_ssid, check_connection_count):
    """Track wifi connection events until the given network is seen.

    Waits for up to check_connection_count "WifiNetworkConnected" events
    (120 seconds each) and checks whether any of them reports the requested
    SSID. State-change tracking is always stopped before returning, whether
    the network was found, the attempts were exhausted, or waiting for an
    event raised.

    Args:
        ad: android_device object to track connections on.
        network_ssid: network ssid to which connection would be tracked.
        check_connection_count: Integer for maximum number of network
            connection events to check.

    Returns:
        True if a connection to the given network happened, else False.

    Raises:
        Whatever ad.ed.pop_event raises when no event arrives in time.
    """
    ad.droid.wifiStartTrackingStateChange()
    try:
        while check_connection_count > 0:
            connect_network = ad.ed.pop_event("WifiNetworkConnected", 120)
            ad.log.info("Connected to network %s", connect_network)
            if (WifiEnums.SSID_KEY in connect_network['data'] and
                    connect_network['data'][WifiEnums.SSID_KEY]
                    == network_ssid):
                return True
            check_connection_count -= 1
        return False
    finally:
        # Previously tracking was left enabled on the success path and when
        # pop_event raised; always pair the start call with a stop.
        ad.droid.wifiStopTrackingStateChange()
| |
| |
def get_scan_time_and_channels(wifi_chs, scan_setting, stime_channel):
    """Work out how long a scan needs and which channels it covers.

    The channel list comes from the band (converted through
    wifi_chs.band_to_freq) when only "band" is set, or from "channels" when
    only "channels" is set. When both or neither are present the channel
    list is empty.

    Args:
        wifi_chs: Object of channels supported.
        scan_setting: scan setting used for start scan.
        stime_channel: scan time per channel.

    Returns:
        scan_time: time required for completing a scan.
        scan_channels: channels used for scanning.
    """
    has_band = "band" in scan_setting
    has_channels = "channels" in scan_setting
    if has_band and not has_channels:
        channels = wifi_chs.band_to_freq(scan_setting["band"])
    elif has_channels and not has_band:
        channels = scan_setting["channels"]
    else:
        channels = []

    total_time = len(channels) * stime_channel
    for freq in channels:
        if freq in WifiEnums.DFS_5G_FREQUENCIES:
            # Extra passive scan time on each DFS channel.
            total_time += 132
    return total_time, channels
| |
| |
def start_wifi_track_bssid(ad, track_setting):
    """Begin BSSID tracking and wait for the success event.

    Args:
        ad: android_device object.
        track_setting: Dict with the "bssidInfos" and "apLostThreshold"
            entries to start tracking with.

    Returns:
        The 'data' payload of the tracking onSuccess event.
    """
    bssid_infos = track_setting["bssidInfos"]
    lost_threshold = track_setting["apLostThreshold"]
    tracker_id = ad.droid.wifiScannerStartTrackingBssids(
        bssid_infos, lost_threshold)
    success_event_name = "WifiScannerBssid{}onSuccess".format(tracker_id)
    success_event = ad.ed.pop_event(success_event_name, SHORT_TIMEOUT)
    return success_event['data']
| |
| |
def convert_pem_key_to_pkcs8(in_file, out_file):
    """Convert a PEM key file to unencrypted PKCS#8 DER using openssl.

    The input file name must end in ".pem" and the output file name must
    end in ".der"; both are asserted before the command is run.

    Args:
        in_file: The original key file.
        out_file: The full path to the converted key file, including
            filename.
    """
    extension_checks = (
        (in_file.endswith(".pem"), "Input file has to be .pem."),
        (out_file.endswith(".der"), "Output file has to be .der."),
    )
    for is_valid, message in extension_checks:
        asserts.assert_true(is_valid, message)

    openssl_template = (
        "openssl pkcs8 -inform PEM -in {} -outform DER -out {} -nocrypt"
        " -topk8")
    utils.exe_cmd(openssl_template.format(in_file, out_file))
| |
| |
def validate_connection(ad,
                        ping_addr=DEFAULT_PING_ADDR,
                        wait_time=15,
                        ping_gateway=True):
    """Validate internet connection by pinging the address provided.

    Waits up to wait_time seconds for the network to report connected (and,
    on releases newer than 10, for an IPv4 default gateway to be present),
    then performs an HTTP ping. On releases newer than 10, if the HTTP ping
    fails and ping_gateway is set, the default gateway is pinged instead.

    Args:
        ad: android_device object.
        ping_addr: address on internet for pinging.
        wait_time: maximum number of seconds to wait for the connection
            (e.g. DHCP) to settle before pinging.
        ping_gateway: whether to fall back to pinging the default gateway
            when the HTTP ping fails.

    Returns:
        The HTTP ping result if it is truthy; otherwise True/False for the
        gateway ping when that fallback ran; otherwise a falsy value.

    Raises:
        ValueError: if the release property does not start with a number.
    """
    release = ad.adb.shell("getprop ro.vendor.build.version.release")
    # adb returns the property as text (e.g. "11" or "7.1.2"); comparing that
    # string against an int raises TypeError, so extract the major version.
    version_match = re.match(r"\s*(\d+)", str(release))
    if not version_match:
        raise ValueError(
            "Cannot parse Android release version from %r" % release)
    needs_gateway = int(version_match.group(1)) > 10

    # wait_time to allow for DHCP to complete.
    for _ in range(wait_time):
        if ad.droid.connectivityNetworkIsConnected():
            if (not needs_gateway
                    or ad.droid.connectivityGetIPv4DefaultGateway()):
                break
        time.sleep(1)

    ping = False
    try:
        ping = ad.droid.httpPing(ping_addr)
        ad.log.info("Http ping result: %s.", ping)
    except Exception as error:
        # Best effort: a raising ping is treated as a failed ping, but the
        # reason is no longer silently discarded.
        ad.log.info("Http ping raised an error: %s", error)

    if needs_gateway and not ping and ping_gateway:
        ad.log.info("Http ping failed. Pinging default gateway")
        gw = ad.droid.connectivityGetIPv4DefaultGateway()
        if not gw:
            # Without a gateway there is nothing to ping; pinging the text
            # "None" could be misread as success.
            ad.log.info("No IPv4 default gateway available to ping.")
            return False
        result = ad.adb.shell("ping -c 6 {}".format(gw))
        ad.log.info("Default gateway ping result: %s" % result)
        ping = "100% packet loss" not in result
    return ping
| |
| |
# TODO(angli): This can only verify if an actual value is exactly the same.
# Would be nice to be able to verify an actual value is one of several.
def verify_wifi_connection_info(ad, expected_con):
    """Check the current wifi connection against expected field values.

    Every key in expected_con except "password" must exist in the connection
    info reported by the device and carry an equal value. "BSSID" and
    "supplicant_state" are compared case-insensitively.

    Args:
        ad: android_device object to read the connection info from.
        expected_con: A dict representing expected key-value pairs for wifi
            connection. e.g. {"SSID": "test_wifi"}

    Raises:
        signals.TestFailure: if a field is missing or its value differs.
    """
    connection = ad.droid.wifiGetConnectionInfo()
    ad.log.debug("Current connection: %s", connection)
    lowercased_fields = ("BSSID", "supplicant_state")

    for field, wanted in expected_con.items():
        # Do not verify authentication related fields.
        if field == "password":
            continue
        if field not in connection:
            raise signals.TestFailure(
                "Field %s does not exist in wifi connection info %s." %
                (field, connection))
        found = connection[field]
        if field in lowercased_fields:
            found = found.lower()
            wanted = wanted.lower()
        if found != wanted:
            raise signals.TestFailure(
                "Expected %s to be %s, actual %s is %s." %
                (field, wanted, field, found))
| |
| |
def check_autoconnect_to_open_network(
        ad, conn_timeout=WIFI_CONNECTION_TIMEOUT_DEFAULT):
    """Toggle wifi on and wait for the device to complete a connection.

    If wifiCheckState() is already true the function returns True without
    toggling. Otherwise wifi state is toggled and the supplicant state is
    polled until it reads "completed" or the timeout expires.

    Args:
        ad: android_device object.
        conn_timeout: timeout value in sec to wait for UE to connect to a
            WiFi AP.

    Returns:
        True if UE connects to WiFi AP (supplicant_state = completed).
        False if UE fails to complete connection within conn_timeout.
    """
    if ad.droid.wifiCheckState():
        return True

    ad.droid.wifiToggleState()
    deadline = time.time() + conn_timeout
    while True:
        supplicant_state = ad.droid.wifiGetConnectionInfo()[
            'supplicant_state']
        # The deadline is checked before the state, so a connection that
        # completes after the deadline still counts as a failure.
        if time.time() > deadline:
            ad.log.warning("Failed to connect to WiFi AP")
            return False
        if supplicant_state == "completed":
            return True
| |
| |
def expand_enterprise_config_by_phase2(config):
    """Fan an enterprise config out into one config per phase2 auth type.

    For PEAP only GTC and MSCHAPV2 are generated. When the config carries an
    FQDN entry, GTC is skipped.

    Args:
        config: A dict representing enterprise config.

    Returns:
        A list of copies of config, each with its PHASE2 entry set to a
        different phase2 value.
    """
    enterprise = WifiEnums.Enterprise
    phase2_enum = WifiEnums.EapPhase2
    if config[enterprise.EAP] == WifiEnums.Eap.PEAP:
        # Skip unsupported phase2 types for PEAP.
        candidates = [phase2_enum.GTC, phase2_enum.MSCHAPV2]
    else:
        candidates = phase2_enum

    expanded = []
    for candidate in candidates:
        # Skip a special case for passpoint TTLS.
        if enterprise.FQDN in config and candidate == phase2_enum.GTC:
            continue
        variant = dict(config)
        variant[enterprise.PHASE2] = candidate.value
        expanded.append(variant)
    return expanded
| |
| |
def generate_eap_test_name(config, ad=None):
    """Build a test case name from an EAP configuration.

    The name is "test_connect-<EAP>" optionally followed by "-<PHASE2>".
    The EAP part is overridden with "PEAP0"/"PEAP1" when the SSID contains
    "peap0"/"peap1" (case-insensitive).

    Args:
        config: A dict representing an EAP credential.
        ad: Unused here; accepted because the same parameter is passed to
            test_func in run_generated_tests.

    Returns:
        A string representing the name of a generated EAP test case.
    """
    ent = WifiEnums.Enterprise
    eap_name = next(
        (member.name for member in WifiEnums.Eap
         if member.value == config[ent.EAP]), "")

    ssid_lower = config[WifiEnums.SSID_KEY].lower()
    if "peap0" in ssid_lower:
        eap_name = "PEAP0"
    if "peap1" in ssid_lower:
        eap_name = "PEAP1"

    name_parts = ["test_connect-" + eap_name]
    if ent.PHASE2 in config:
        phase2_name = next(
            (member.name for member in WifiEnums.EapPhase2
             if member.value == config[ent.PHASE2]), None)
        if phase2_name is not None:
            name_parts.append(phase2_name)
    return "-".join(name_parts)
| |
| |
def group_attenuators(attenuators):
    """Group attenuators into two per-AP attenuator groups.

    Legacy Wi-Fi setups have two attenuators, each connected to its own AP.
    Newer setups have four, one per channel, so two belong to each AP.
    Grouping them lets scripts attenuate "an AP" in either setup: the first
    half of the list goes to group "AP0", the second half to group "AP1".

    Args:
        attenuators: A list of attenuator objects, either two or four in
            length.

    Returns:
        A two-element list: [AP0 group, AP1 group].

    Raises:
        The test is failed through asserts.fail if the attenuator list does
        not have two or four objects.
    """
    ap0_group = attenuator.AttenuatorGroup("AP0")
    ap1_group = attenuator.AttenuatorGroup("AP1")
    count = len(attenuators)
    if count in (2, 4):
        per_ap = count // 2
        for position, member in enumerate(attenuators):
            target_group = ap0_group if position < per_ap else ap1_group
            target_group.add(member)
    else:
        asserts.fail(("Either two or four attenuators are required for this "
                      "test, but found %s") % count)
    return [ap0_group, ap1_group]
| |
| |
def set_attns(attenuator, attn_val_name, roaming_attn=ROAMING_ATTN):
    """Apply a named set of attenuation values to four attenuators.

    Args:
        attenuator: Indexable collection of attenuator objects; indices 0-3
            are used.
        attn_val_name: Name of the attenuation value set to use.
        roaming_attn: Dictionary mapping names to attenuation value lists.

    Raises:
        Any error raised while setting a value is logged and re-raised.
    """
    logging.info("Set attenuation values to %s", roaming_attn[attn_val_name])
    try:
        for port in range(4):
            attenuator[port].set_atten(roaming_attn[attn_val_name][port])
    except:  # Intentionally broad: log whatever happened, then re-raise.
        logging.exception("Failed to set attenuation values %s.",
                          attn_val_name)
        raise
| |
| |
def set_attns_steps(attenuators,
                    atten_val_name,
                    roaming_attn=ROAMING_ATTN,
                    steps=10,
                    wait_time=12):
    """Ramp attenuators linearly from their current values to a target set.

    Each step moves every attenuator an equal fraction of the way from its
    starting value to its target value (rounded), then sleeps.

    Args:
        attenuators: The list of attenuator objects that you want to change
            their attenuation value.
        atten_val_name: Name of the attenuation value set to use.
        roaming_attn: Dictionary specifying the attenuation params.
        steps: Number of attenuator changes to reach the target value.
        wait_time: Sleep time, in seconds, after each step.
    """
    targets = roaming_attn[atten_val_name]
    logging.info("Set attenuation values to %s in %d step(s)", targets, steps)
    initial = [atten.get_atten() for atten in attenuators]
    for step_number in range(1, steps + 1):
        fraction = step_number / steps
        for idx, atten in enumerate(attenuators):
            delta = (targets[idx] - initial[idx]) * fraction
            atten.set_atten(round(initial[idx] + delta))
        time.sleep(wait_time)
| |
| |
def trigger_roaming_and_validate(dut,
                                 attenuator,
                                 attn_val_name,
                                 expected_con,
                                 roaming_attn=ROAMING_ATTN):
    """Ramp attenuation to trigger roaming, then check the resulting link.

    After stepping the attenuators, the DUT's connection info is verified
    against the expected SSID and BSSID, and internet connectivity is
    validated.

    Args:
        dut: android_device object expected to roam.
        attenuator: The attenuator objects handed to set_attns_steps.
        attn_val_name: Name of the attenuation value set to use.
        expected_con: The network information of the expected network; its
            SSID entry and "bssid" entry are used.
        roaming_attn: Dictionary specifying the attenuation params.

    Raises:
        signals.TestFailure: if internet validation fails after roaming.
    """
    connection_to_verify = {
        WifiEnums.SSID_KEY: expected_con[WifiEnums.SSID_KEY],
        WifiEnums.BSSID_KEY: expected_con["bssid"],
    }
    set_attns_steps(attenuator, attn_val_name, roaming_attn)
    verify_wifi_connection_info(dut, connection_to_verify)

    roamed_bssid = connection_to_verify[WifiEnums.BSSID_KEY]
    logging.info("Roamed to %s successfully", roamed_bssid)
    if validate_connection(dut):
        return
    raise signals.TestFailure("Fail to connect to internet on %s" %
                              roamed_bssid)
| |
| |
def create_softap_config():
    """Create a softap config with a random ssid and password.

    Returns:
        A dict with the SSID ("softap_" plus 8 random characters) and an
        8-character random password.
    """
    ssid = "softap_" + utils.rand_ascii_str(8)
    password = utils.rand_ascii_str(8)
    logging.info("softap setup: %s %s", ssid, password)
    return {WifiEnums.SSID_KEY: ssid, WifiEnums.PWD_KEY: password}
| |
| |
def start_softap_and_verify(ad, band):
    """Bring up softap and verify AP mode, scan visibility and softap info.

    Args:
        ad: Object exposing `dut` (the softap device) and `dut_client` (the
            device that scans for the softap).
        band: The band to use for softAP.

    Returns:
        dict, the softAP config.
    """
    dut = ad.dut
    # Register before starting so the initial info callback is captured.
    callback_id = dut.droid.registerSoftApCallback()

    # Before tethering starts the reported info should be the defaults.
    idle_frequency, idle_bandwidth = get_current_softap_info(
        dut, callback_id, True)
    asserts.assert_true(idle_frequency == 0, "Softap frequency is not reset")
    asserts.assert_true(idle_bandwidth == 0, "Softap bandwdith is not reset")

    softap_config = create_softap_config()
    ssid = softap_config[WifiEnums.SSID_KEY]
    start_wifi_tethering(dut,
                         ssid,
                         softap_config[WifiEnums.PWD_KEY],
                         band=band)
    asserts.assert_true(dut.droid.wifiIsApEnabled(),
                        "SoftAp is not reported as running")
    start_wifi_connection_scan_and_ensure_network_found(ad.dut_client, ssid)

    # Once running, the callback should report valid (positive) values.
    live_frequency, live_bandwidth = get_current_softap_info(
        dut, callback_id, True)
    asserts.assert_true(live_frequency > 0, "Softap frequency is not valid")
    asserts.assert_true(live_bandwidth > 0, "Softap bandwdith is not valid")

    dut.droid.unregisterSoftApCallback(callback_id)
    return softap_config
| |
| |
def wait_for_expected_number_of_softap_clients(ad, callbackId,
                                               expected_num_of_softap_clients):
    """Wait for a client-count update and check it matches expectations.

    Asserts that both the reported client count and the number of reported
    mac addresses equal the expected number, and that every reported mac
    address is well-formed.

    Args:
        ad: android_device object.
        callbackId: Id of the callback associated with registering.
        expected_num_of_softap_clients: expected number of softap clients.
    """
    event_name = "".join((wifi_constants.SOFTAP_CALLBACK_EVENT,
                          str(callbackId),
                          wifi_constants.SOFTAP_NUMBER_CLIENTS_CHANGED))
    payload = ad.ed.pop_event(event_name, SHORT_TIMEOUT)['data']
    reported_count = payload[
        wifi_constants.SOFTAP_NUMBER_CLIENTS_CALLBACK_KEY]
    reported_macs = payload[wifi_constants.SOFTAP_CLIENTS_MACS_CALLBACK_KEY]

    asserts.assert_equal(
        reported_count, expected_num_of_softap_clients,
        "The number of softap clients doesn't match the expected number")
    asserts.assert_equal(
        len(reported_macs), expected_num_of_softap_clients,
        "The number of mac addresses doesn't match the expected number")
    for mac in reported_macs:
        asserts.assert_true(checkMacAddress(mac),
                            "An invalid mac address was returned")
| |
| |
# Six hex octets; the separator after the first octet ("-", ":" or nothing)
# must be reused between all remaining octets.
_MAC_ADDRESS_PATTERN = re.compile(
    r"[0-9a-f]{2}([-:]?)[0-9a-f]{2}(?:\1[0-9a-f]{2}){4}")


def checkMacAddress(input):
    """Validate whether a string is a valid mac address or not.

    Accepts six hex octets separated consistently by ":" or "-", or not
    separated at all, in either letter case. The whole string must match:
    trailing characters, including a trailing newline, are rejected.

    Args:
        input: The string to validate.

    Returns:
        True for a valid mac address, False otherwise (including when input
        is not a string).
    """
    if not isinstance(input, str):
        return False
    # fullmatch instead of match + "$": "$" also matches just before a
    # trailing newline, which let "aa:bb:cc:dd:ee:ff\n" through.
    return _MAC_ADDRESS_PATTERN.fullmatch(input.lower()) is not None
| |
| |
def wait_for_expected_softap_state(ad, callbackId, expected_softap_state):
    """Wait for a softap state change and assert it is the expected one.

    Args:
        ad: android_device object.
        callbackId: Id of the callback associated with registering.
        expected_softap_state: The expected softap state.
    """
    event_name = "".join((wifi_constants.SOFTAP_CALLBACK_EVENT,
                          str(callbackId),
                          wifi_constants.SOFTAP_STATE_CHANGED))
    state_event = ad.ed.pop_event(event_name, SHORT_TIMEOUT)
    reported_state = state_event['data'][
        wifi_constants.SOFTAP_STATE_CHANGE_CALLBACK_KEY]
    asserts.assert_equal(reported_state, expected_softap_state,
                         "Softap state doesn't match with expected state")
| |
| |
def get_current_number_of_softap_clients(ad, callbackId):
    """Drain all queued client-count events and report the latest count.

    Args:
        ad: android_device object.
        callbackId: Id of the callback associated with registering.

    Returns:
        The client count from the last queued event, or None when no
        matching event is in the queue.
    """
    event_name = "".join((wifi_constants.SOFTAP_CALLBACK_EVENT,
                          str(callbackId),
                          wifi_constants.SOFTAP_NUMBER_CLIENTS_CHANGED))
    queued_events = ad.ed.pop_all(event_name)
    if len(queued_events) == 0:
        return None
    for queued in queued_events:
        # Later events overwrite earlier ones; only the last one survives.
        latest_count = queued['data'][
            wifi_constants.SOFTAP_NUMBER_CLIENTS_CALLBACK_KEY]
    return latest_count
| |
| |
def get_current_softap_info(ad, callbackId, need_to_wait):
    """Drain softap info-changed events and report the latest values.

    Args:
        ad: android_device object.
        callbackId: Id of the callback associated with registering.
        need_to_wait: Wait for one info callback event before draining the
            rest of the queue.

    Returns:
        (frequency, bandwidth) from the most recent event seen, or (0, 0)
        when no event was seen.
    """
    event_name = "".join((wifi_constants.SOFTAP_CALLBACK_EVENT,
                          str(callbackId),
                          wifi_constants.SOFTAP_INFO_CHANGED))
    ad.log.debug("softap info dump from eventStr %s", event_name)

    def _frequency_and_bandwidth(info_event):
        data = info_event['data']
        return (data[wifi_constants.SOFTAP_INFO_FREQUENCY_CALLBACK_KEY],
                data[wifi_constants.SOFTAP_INFO_BANDWIDTH_CALLBACK_KEY])

    frequency, bandwidth = 0, 0
    if need_to_wait:
        first_event = ad.ed.pop_event(event_name, SHORT_TIMEOUT)
        frequency, bandwidth = _frequency_and_bandwidth(first_event)
        ad.log.info("softap info updated, frequency is %s, bandwidth is %s",
                    frequency, bandwidth)

    for queued in ad.ed.pop_all(event_name):
        frequency, bandwidth = _frequency_and_bandwidth(queued)
    ad.log.info("softap info, frequency is %s, bandwidth is %s", frequency,
                bandwidth)
    return frequency, bandwidth
| |
def get_current_softap_infos(ad, callbackId, need_to_wait):
    """Drain softap info-list events and return the latest list.

    Args:
        ad: android_device object.
        callbackId: Id of the callback associated with registering.
        need_to_wait: Wait for one info-list callback event before draining
            the rest of the queue.

    Returns:
        The 'data' payload of the most recent info-list event, or an empty
        list when need_to_wait is false and no event is queued.
    """
    eventStr = wifi_constants.SOFTAP_CALLBACK_EVENT + str(
        callbackId) + wifi_constants.SOFTAP_INFOLIST_CHANGED
    ad.log.debug("softap info dump from eventStr %s", eventStr)

    # Start from an empty list so the no-event case returns cleanly instead
    # of failing on an unbound local.
    infos = []
    if need_to_wait:
        infos = ad.ed.pop_event(eventStr, SHORT_TIMEOUT)['data']

    for event in ad.ed.pop_all(eventStr):
        infos = event['data']

    for info in infos:
        frequency = info[wifi_constants.SOFTAP_INFO_FREQUENCY_CALLBACK_KEY]
        bandwidth = info[wifi_constants.SOFTAP_INFO_BANDWIDTH_CALLBACK_KEY]
        wifistandard = info[
            wifi_constants.SOFTAP_INFO_WIFISTANDARD_CALLBACK_KEY]
        bssid = info[wifi_constants.SOFTAP_INFO_BSSID_CALLBACK_KEY]
        ad.log.info(
            "softap info, freq:%s, bw:%s, wifistandard:%s, bssid:%s",
            frequency, bandwidth, wifistandard, bssid)

    return infos
| |
def get_current_softap_capability(ad, callbackId, need_to_wait):
    """Drain softap capability-changed events and return the latest one.

    Args:
        ad: android_device object.
        callbackId: Id of the callback associated with registering.
        need_to_wait: Wait for one capability callback event before draining
            the rest of the queue.

    Returns:
        The 'data' payload of the most recent capability event, or None when
        need_to_wait is false and no event is queued.
    """
    eventStr = wifi_constants.SOFTAP_CALLBACK_EVENT + str(
        callbackId) + wifi_constants.SOFTAP_CAPABILITY_CHANGED
    ad.log.debug("softap capability dump from eventStr %s", eventStr)

    # Default to None so the no-event case returns cleanly instead of
    # failing on an unbound local.
    capability = None
    if need_to_wait:
        capability = ad.ed.pop_event(eventStr, SHORT_TIMEOUT)['data']

    for event in ad.ed.pop_all(eventStr):
        capability = event['data']

    return capability
| |
def get_ssrdumps(ad):
    """Pull any files in the ssrdump dir, then delete them on the device.

    Pulled files are stored under "SSRDUMPS_<serial>" in the device log
    path. The on-device files are deleted whether or not anything was
    pulled.

    Args:
        ad: android device object.
    """
    dump_files = ad.get_file_names("/data/vendor/ssrdump/")
    if dump_files:
        ad.log.info("Pulling ssrdumps %s", dump_files)
        destination = os.path.join(ad.device_log_path,
                                   "SSRDUMPS_%s" % ad.serial)
        os.makedirs(destination, exist_ok=True)
        ad.pull_files(dump_files, destination)
    ad.adb.shell("find /data/vendor/ssrdump/ -type f -delete",
                 ignore_status=True)
| |
| |
def start_pcap(pcap, wifi_band, test_name):
    """Start packet capture in monitor mode.

    Captures are written under a 'PacketCapture' directory inside the
    current context's full output path.

    Args:
        pcap: packet capture object.
        wifi_band: '2g' or '5g' or 'dual'.
        test_name: test name to be used for pcap file name.

    Returns:
        Dictionary with wifi band as key and the tuple
        (pcap Process object, path built from log directory and test name)
        as the value.
    """
    output_root = context.get_current_context().get_full_output_path()
    log_dir = os.path.join(output_root, 'PacketCapture')
    os.makedirs(log_dir, exist_ok=True)

    bands = [BAND_2G, BAND_5G] if wifi_band == 'dual' else [wifi_band]
    return {
        band: (pcap.start_packet_capture(band, log_dir, test_name),
               os.path.join(log_dir, test_name))
        for band in bands
    }
| |
| |
def stop_pcap(pcap, procs, test_status=None):
    """Stop packet capture in monitor mode.

    Since the pcap logs in monitor mode can be very large, they are deleted
    when not required: a truthy 'test_status' removes the directory holding
    each capture, a falsy one keeps the files.

    Args:
        pcap: packet capture object.
        procs: dictionary returned by start_pcap; may be empty.
        test_status: status of the test case.
    """
    capture_dirs = []
    for proc, fname in procs.values():
        pcap.stop_packet_capture(proc)
        capture_dir = os.path.dirname(fname)
        # Bands normally share one directory; remember each only once so it
        # is not removed twice.
        if capture_dir not in capture_dirs:
            capture_dirs.append(capture_dir)

    if test_status:
        # Iterating the collected directories also covers an empty procs
        # dict, which used to fail on an unbound file name.
        for capture_dir in capture_dirs:
            shutil.rmtree(capture_dir)
| |
| |
def verify_mac_not_found_in_pcap(ad, mac, packets):
    """Fail the test if a mac address shows up in any captured packet.

    Args:
        ad: android device object.
        mac: string representation of the mac address.
        packets: packets obtained by rdpcap(pcap_fname).
    """
    for packet in packets:
        summary = packet.summary()
        logging.debug("Packet Summary = %s", summary)
        if mac not in summary:
            continue
        asserts.fail("Device %s caught Factory MAC: %s in packet sniffer."
                     "Packet = %s" % (ad.serial, mac, packet.show()))
| |
| |
def verify_mac_is_found_in_pcap(ad, mac, packets):
    """Fail the test unless a mac address shows up in a captured packet.

    Args:
        ad: android device object.
        mac: string representation of the mac address.
        packets: packets obtained by rdpcap(pcap_fname).
    """
    if any(mac in packet.summary() for packet in packets):
        return
    asserts.fail("Did not find MAC = %s in packet sniffer."
                 "for device %s" % (mac, ad.serial))
| |
| |
def start_cnss_diags(ads, cnss_diag_file, pixel_models):
    """Start cnss_diag on every device in a collection.

    Args:
        ads: iterable of android device objects.
        cnss_diag_file: cnss diag config file to push to each device.
        pixel_models: pixel devices.
    """
    for device in ads:
        start_cnss_diag(device, cnss_diag_file, pixel_models)
| |
| |
def start_cnss_diag(ad, cnss_diag_file, pixel_models):
    """Start cnss_diag to record extra wifi logs.

    Does nothing for models outside pixel_models or when the controlling
    property already reads 'true'. Otherwise the config file is pushed if it
    is not on the device yet, old wlan logs are deleted, and the property is
    set to true.

    Args:
        ad: android device object.
        cnss_diag_file: cnss diag config file to push to device.
        pixel_models: pixel devices.
    """
    if ad.model not in pixel_models:
        ad.log.info("Device not supported to collect pixel logger")
        return

    uses_legacy_prop = ad.model in wifi_constants.DEVICES_USING_LEGACY_PROP
    prop = (wifi_constants.LEGACY_CNSS_DIAG_PROP
            if uses_legacy_prop else wifi_constants.CNSS_DIAG_PROP)
    if ad.adb.getprop(prop) == 'true':
        return

    listing_line_count = int(
        ad.adb.shell("ls -l %s%s | wc -l" %
                     (CNSS_DIAG_CONFIG_PATH, CNSS_DIAG_CONFIG_FILE)))
    if not listing_line_count:
        ad.adb.push("%s %s" % (cnss_diag_file, CNSS_DIAG_CONFIG_PATH))
    ad.adb.shell(
        "find /data/vendor/wifi/cnss_diag/wlan_logs/ -type f -delete",
        ignore_status=True)
    ad.adb.shell("setprop %s true" % prop, ignore_status=True)
| |
| |
def stop_cnss_diags(ads, pixel_models):
    """Stop cnss_diag on every device in a collection.

    Args:
        ads: iterable of android device objects.
        pixel_models: pixel devices.
    """
    for device in ads:
        stop_cnss_diag(device, pixel_models)
| |
| |
def stop_cnss_diag(ad, pixel_models):
    """Stop cnss_diag by setting its controlling property to false.

    Does nothing for models outside pixel_models.

    Args:
        ad: android device object.
        pixel_models: pixel devices.
    """
    if ad.model not in pixel_models:
        ad.log.info("Device not supported to collect pixel logger")
        return

    uses_legacy_prop = ad.model in wifi_constants.DEVICES_USING_LEGACY_PROP
    prop = (wifi_constants.LEGACY_CNSS_DIAG_PROP
            if uses_legacy_prop else wifi_constants.CNSS_DIAG_PROP)
    ad.adb.shell("setprop %s false" % prop, ignore_status=True)
| |
| |
def get_cnss_diag_log(ad):
    """Pull the cnss_diag logs found in the wlan_logs dir.

    Pulled files are stored under "CNSS_DIAG_<serial>" in the device log
    path. Nothing happens when the directory holds no files.

    Args:
        ad: android device object.
    """
    log_files = ad.get_file_names("/data/vendor/wifi/cnss_diag/wlan_logs/")
    if not log_files:
        return
    ad.log.info("Pulling cnss_diag logs %s", log_files)
    destination = os.path.join(ad.device_log_path,
                               "CNSS_DIAG_%s" % ad.serial)
    os.makedirs(destination, exist_ok=True)
    ad.pull_files(log_files, destination)
| |
| |
# Outcome of one `cmd wifi send-link-probe` invocation, as built by
# send_link_probe():
#   is_success: True when the command output contains 'succeeded'.
#   stdout: raw output of the adb shell command.
#   elapsed_time: first integer token of the output on success, else None.
#   failure_reason: first integer token of the output on failure, else None.
LinkProbeResult = namedtuple(
    'LinkProbeResult',
    ('is_success', 'stdout', 'elapsed_time', 'failure_reason'))
| |
| |
def send_link_probe(ad):
    """Send one link probe to the currently connected AP.

    The test is failed if the command output mentions 'Error' or
    'Exception', or if it reports neither success nor failure.

    Args:
        ad: android device object.

    Returns:
        LinkProbeResult namedtuple. The first all-digit token of the output
        is reported as elapsed_time on success and as failure_reason on
        failure.
    """
    stdout = ad.adb.shell('cmd wifi send-link-probe')
    asserts.assert_false('Error' in stdout or 'Exception' in stdout,
                         'Exception while sending link probe: ' + stdout)

    def _first_number():
        return next(
            (int(token) for token in stdout.split() if token.isdigit()),
            None)

    if 'succeeded' in stdout:
        return LinkProbeResult(is_success=True,
                               stdout=stdout,
                               elapsed_time=_first_number(),
                               failure_reason=None)
    if 'failed with reason' in stdout:
        return LinkProbeResult(is_success=False,
                               stdout=stdout,
                               elapsed_time=None,
                               failure_reason=_first_number())

    asserts.fail('Unexpected link probe result: ' + stdout)
    return LinkProbeResult(is_success=False,
                           stdout=stdout,
                           elapsed_time=None,
                           failure_reason=None)
| |
| |
def send_link_probes(ad, num_probes, delay_sec):
    """Send a sequence of link probes to the currently connected AP.

    Args:
        ad: android device object.
        num_probes: number of probes to perform.
        delay_sec: delay time after each probe, in seconds.

    Returns:
        List[LinkProbeResult], one LinkProbeResult for each probe.
    """
    logging.info('Sending link probes')

    def _probe_then_pause():
        # send_link_probe() will also fail the test if it sees an exception
        # in the stdout of the adb shell command.
        outcome = send_link_probe(ad)
        logging.info('link probe results: ' + str(outcome))
        time.sleep(delay_sec)
        return outcome

    return [_probe_then_pause() for _ in range(num_probes)]
| |
| |
def ap_setup(test, index, ap, network, bandwidth=80, channel=6):
    """Set up the AP with provided network info.

    The access point at test.access_points[index] is closed first; after a
    five second pause `ap` is started with a 'whirlwind' preset. WPA security
    is used when the network has a "password" entry, otherwise the AP is
    open.

    Args:
        test: the calling test class object.
        index: int, index of the AP.
        ap: access_point object of the AP.
        network: dict with information of the network, including ssid,
            password and bssid.
        bandwidth: the operation bandwidth for the AP, default 80MHz.
        channel: the channel number for the AP.
    """
    ssid = network[WifiEnums.SSID_KEY]
    test.access_points[index].close()
    time.sleep(5)

    # Configure AP as required.
    if "password" in network:
        security = hostapd_security.Security(security_mode="wpa",
                                             password=network["password"])
    else:
        security = hostapd_security.Security(security_mode=None,
                                             password=None)

    preset = hostapd_ap_preset.create_ap_preset(channel=channel,
                                                ssid=ssid,
                                                security=security,
                                                bss_settings=[],
                                                vht_bandwidth=bandwidth,
                                                profile_name='whirlwind',
                                                iface_wlan_2g=ap.wlan_2g,
                                                iface_wlan_5g=ap.wlan_5g)
    ap.start_ap(preset)
    logging.info("AP started on channel {} with SSID {}".format(channel, ssid))
| |
| |
def turn_ap_off(test, AP):
    """Bring down hostapd on the Access Point.

    Stops the hostapd instances of 'wlan0' and 'wlan1' that are alive.

    Args:
        test: The test class object.
        AP: int, indicating which AP to turn OFF (1-based).
    """
    radios = (
        ('wlan0', 'Turned WLAN0 AP%d off'),
        ('wlan1', 'Turned WLAN1 AP%d off'),
    )
    for iface, message in radios:
        hostapd = test.access_points[AP - 1]._aps[iface].hostapd
        if hostapd.is_alive():
            hostapd.stop()
            logging.debug(message % AP)
| |
| |
def turn_ap_on(test, AP):
    """Bring up hostapd on the Access Point.

    Starts the hostapd instances of 'wlan0' and 'wlan1' that are not alive,
    each with its own stored config.

    Args:
        test: The test class object.
        AP: int, indicating which AP to turn ON (1-based).
    """
    radios = (
        ('wlan0', 'Turned WLAN0 AP%d on'),
        ('wlan1', 'Turned WLAN1 AP%d on'),
    )
    for iface, message in radios:
        hostapd = test.access_points[AP - 1]._aps[iface].hostapd
        if not hostapd.is_alive():
            hostapd.start(hostapd.config)
            logging.debug(message % AP)
| |
| |
def turn_location_off_and_scan_toggle_off(ad):
    """Turn off the location service and always-available wifi scanning.

    Asserts afterwards that the scanner no longer reports itself as always
    available.

    Args:
        ad: android device object.
    """
    utils.set_location_service(ad, False)
    ad.droid.wifiScannerToggleAlwaysAvailable(False)
    still_available = ad.droid.wifiScannerIsAlwaysAvailable()
    asserts.assert_true(not still_available,
                        "Failed to turn off location service's scan.")
| |
| |
def set_softap_channel(dut, ap_iface='wlan1', cs_count=10, channel=2462):
    """Switch the SoftAP channel through hostapd_cli.

    Args:
        dut: android device object.
        ap_iface: interface of SoftAP mode.
        cs_count: how many beacon frames before switch channel, default = 10.
        channel: value passed to hostapd_cli chan_switch (the default, 2462,
            is a frequency in MHz — NOTE(review): confirm the expected unit).

    Returns:
        The command output ('OK') when the switch succeeded; otherwise the
        test is failed.
    """
    command = 'hostapd_cli -i {} chan_switch {} {}'.format(
        ap_iface, cs_count, channel)
    dut.log.info('adb shell {}'.format(command))
    output = dut.adb.shell(command)
    if output == 'OK':
        dut.log.info('switch hotspot channel to {}'.format(channel))
        return output

    asserts.fail("Failed to switch hotspot channel")
| |
def get_wlan0_link(dut):
    """Get wlan0 interface status from wpa_cli as a dict.

    Args:
        dut: android device object.

    Returns:
        Dict of the key=value pairs found in the wpa_cli status output. The
        test is failed if no "ssid" key is present.
    """
    status_command = (
        'wpa_cli -iwlan0 -g@android:wpa_wlan0 IFNAME=wlan0 status')
    raw_status = dut.adb.shell(status_command)
    status_fields = dict(re.findall(r'(\S+)=(".*?"|\S+)', raw_status))
    asserts.assert_true("ssid" in status_fields,
                        "Client doesn't connect to any network")
    return status_fields
| |
def verify_11ax_wifi_connection(ad, wifi6_supported_models, wifi6_ap):
    """Verify 11ax for wifi connection.

    The check only runs when the AP supports 11ax and the device model is in
    the supported list; otherwise nothing is verified.

    Args:
        ad: android device object.
        wifi6_supported_models: device models supporting 11ax.
        wifi6_ap: whether the AP supports 11ax.
    """
    if not (wifi6_ap and ad.model in wifi6_supported_models):
        return
    logging.info("Verifying 11ax. Model: %s" % ad.model)
    connection_standard = ad.droid.wifiGetConnectionStandard()
    asserts.assert_true(
        connection_standard == wifi_constants.WIFI_STANDARD_11AX,
        "DUT did not connect to 11ax.")
| |
def verify_11ax_softap(dut, dut_client, wifi6_supported_models):
    """Verify 11ax SoftAp if devices support it.

    When both the softap device and the client model are in the supported
    list, asserts that the client's connection standard is 11ax. Otherwise
    nothing is verified.

    Args:
        dut: Softap device.
        dut_client: Client connecting to softap.
        wifi6_supported_models: List of device models supporting 11ax.
    """
    both_support_11ax = (dut.model in wifi6_supported_models
                         and dut_client.model in wifi6_supported_models)
    if not both_support_11ax:
        return
    logging.info(
        "Verifying 11ax softap. DUT model: %s, DUT Client model: %s",
        dut.model, dut_client.model)
    client_standard = dut_client.droid.wifiGetConnectionStandard()
    asserts.assert_true(
        client_standard == wifi_constants.WIFI_STANDARD_11AX,
        "DUT failed to start SoftAp in 11ax.")
| |
def check_available_channels_in_bands_2_5(dut, country_code):
    """Check if DUT is capable of enable BridgedAp.
    #TODO: Find a way to make this function flexible by taking an argument.

    Sets the country code, toggles Wi-Fi on and then off, and reads the
    SoftAp capability through a temporarily registered SoftAp callback.

    Args:
        dut: android device object.
        country_code: country code, e.g., 'US', 'JP'.
    Returns:
        True: If the capability has a non-empty supported channel list for
            both the 2.4GHz and the 5GHz band.
        False: Otherwise.
    """
    set_wifi_country_code(dut, country_code)
    country = dut.droid.wifiGetCountryCode()
    dut.log.info("DUT current country code : {}".format(country))
    # Wi-Fi ON and OFF to make sure country code take effect.
    wifi_toggle_state(dut, True)
    wifi_toggle_state(dut, False)

    # Register SoftAp Callback and get SoftAp capability.
    callbackId = dut.droid.registerSoftApCallback()
    try:
        capability = get_current_softap_capability(dut, callbackId, True)
    finally:
        # Always release the callback, even when the capability query fails.
        dut.droid.unregisterSoftApCallback(callbackId)

    # `and` keeps the original short-circuit: the 5GHz entry is only looked
    # up when the 2.4GHz list is non-empty.
    return bool(
        capability[
            wifi_constants.SOFTAP_CAPABILITY_24GHZ_SUPPORTED_CHANNEL_LIST]
        and capability[
            wifi_constants.SOFTAP_CAPABILITY_5GHZ_SUPPORTED_CHANNEL_LIST])
| |
| |
def _get_wlan0_ipv4_address(dut):
    """Return the first wlan0 IPv4 address of dut, or "" when it has none."""
    try:
        return dut.droid.connectivityGetIPv4Addresses('wlan0')[0]
    except IndexError:
        dut.log.info(
            "{} has no Wi-Fi connection, cannot get IPv4 address."
            .format(dut.serial))
        return ""


@retry(tries=5, delay=2)
def validate_ping_between_two_clients(dut1, dut2):
    """Make 2 DUT ping each other.

    Looks up the wlan0 IPv4 address of each device, fails the test if either
    is missing, then pings dut1 -> dut2 followed by dut2 -> dut1.

    The function is wrapped in @retry(tries=5, delay=2).
    NOTE(review): presumably a failed assertion makes the decorator run the
    whole check again - confirm against the retry package defaults.

    Args:
        dut1: An AndroidDevice object.
        dut2: An AndroidDevice object.
    """
    # Get DUTs' IPv4 addresses.
    dut1_ip = _get_wlan0_ipv4_address(dut1)
    dut2_ip = _get_wlan0_ipv4_address(dut2)
    # Test fail if not able to obtain two DUT's IPv4 addresses.
    asserts.assert_true(dut1_ip and dut2_ip,
                        "Ping failed because no DUT's IPv4 address")

    dut1.log.info("{} IPv4 addresses : {}".format(dut1.serial, dut1_ip))
    dut2.log.info("{} IPv4 addresses : {}".format(dut2.serial, dut2_ip))

    # Two clients ping each other
    directions = (
        (dut1, dut1_ip, dut2_ip),
        (dut2, dut2_ip, dut1_ip),
    )
    for source, source_ip, target_ip in directions:
        source.log.info("{} ping {}".format(source_ip, target_ip))
        asserts.assert_true(
            utils.adb_shell_ping(source, count=10, dest_ip=target_ip,
                                 timeout=20),
            "%s ping %s failed" % (source.serial, target_ip))
| |