blob: b2ae9a4808e5ac67e109edefa102948d24072dc8 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2016 Google, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import shutil
import time
from collections import namedtuple
from enum import IntEnum
from queue import Empty
from acts import asserts
from acts import context
from acts import signals
from acts import utils
from acts.controllers import attenuator
from acts.controllers.ap_lib import hostapd_security
from acts.controllers.ap_lib import hostapd_ap_preset
from acts.controllers.ap_lib.hostapd_constants import BAND_2G
from acts.controllers.ap_lib.hostapd_constants import BAND_5G
from acts.test_utils.wifi import wifi_constants
from acts.test_utils.tel import tel_defines
# Default timeout used for reboot, toggle WiFi and Airplane mode,
# for the system to settle down after the operation.
DEFAULT_TIMEOUT = 10
# Number of seconds to wait for events that are supposed to happen quickly.
# Like onSuccess for start background scan and confirmation on wifi state
# change.
SHORT_TIMEOUT = 30
ROAMING_TIMEOUT = 30
WIFI_CONNECTION_TIMEOUT_DEFAULT = 30
WIFI_ABNORMAL_CONNECTION_TIME = 10
# Speed of light in m/s.
SPEED_OF_LIGHT = 299792458
DEFAULT_PING_ADDR = "https://www.google.com/robots.txt"
roaming_attn = {
"AP1_on_AP2_off": [
0,
0,
95,
95
],
"AP1_off_AP2_on": [
95,
95,
0,
0
],
"default": [
0,
0,
0,
0
]
}
class WifiEnums():
SSID_KEY = "SSID"
SSID_PATTERN_KEY = "ssidPattern"
NETID_KEY = "network_id"
BSSID_KEY = "BSSID"
BSSID_PATTERN_KEY = "bssidPattern"
PWD_KEY = "password"
frequency_key = "frequency"
APBAND_KEY = "apBand"
HIDDEN_KEY = "hiddenSSID"
IS_APP_INTERACTION_REQUIRED = "isAppInteractionRequired"
IS_USER_INTERACTION_REQUIRED = "isUserInteractionRequired"
IS_METERED = "isMetered"
PRIORITY = "priority"
SECURITY = "security"
WIFI_CONFIG_APBAND_2G = 0
WIFI_CONFIG_APBAND_5G = 1
WIFI_CONFIG_APBAND_AUTO = -1
WIFI_WPS_INFO_PBC = 0
WIFI_WPS_INFO_DISPLAY = 1
WIFI_WPS_INFO_KEYPAD = 2
WIFI_WPS_INFO_LABEL = 3
WIFI_WPS_INFO_INVALID = 4
class CountryCode():
CHINA = "CN"
JAPAN = "JP"
UK = "GB"
US = "US"
UNKNOWN = "UNKNOWN"
# Start of Macros for EAP
# EAP types
class Eap(IntEnum):
NONE = -1
PEAP = 0
TLS = 1
TTLS = 2
PWD = 3
SIM = 4
AKA = 5
AKA_PRIME = 6
UNAUTH_TLS = 7
# EAP Phase2 types
class EapPhase2(IntEnum):
NONE = 0
PAP = 1
MSCHAP = 2
MSCHAPV2 = 3
GTC = 4
class Enterprise:
# Enterprise Config Macros
EMPTY_VALUE = "NULL"
EAP = "eap"
PHASE2 = "phase2"
IDENTITY = "identity"
ANON_IDENTITY = "anonymous_identity"
PASSWORD = "password"
SUBJECT_MATCH = "subject_match"
ALTSUBJECT_MATCH = "altsubject_match"
DOM_SUFFIX_MATCH = "domain_suffix_match"
CLIENT_CERT = "client_cert"
CA_CERT = "ca_cert"
ENGINE = "engine"
ENGINE_ID = "engine_id"
PRIVATE_KEY_ID = "key_id"
REALM = "realm"
PLMN = "plmn"
FQDN = "FQDN"
FRIENDLY_NAME = "providerFriendlyName"
ROAMING_IDS = "roamingConsortiumIds"
# End of Macros for EAP
# Macros for wifi p2p.
WIFI_P2P_SERVICE_TYPE_ALL = 0
WIFI_P2P_SERVICE_TYPE_BONJOUR = 1
WIFI_P2P_SERVICE_TYPE_UPNP = 2
WIFI_P2P_SERVICE_TYPE_VENDOR_SPECIFIC = 255
class ScanResult:
CHANNEL_WIDTH_20MHZ = 0
CHANNEL_WIDTH_40MHZ = 1
CHANNEL_WIDTH_80MHZ = 2
CHANNEL_WIDTH_160MHZ = 3
CHANNEL_WIDTH_80MHZ_PLUS_MHZ = 4
# Macros for wifi rtt.
class RttType(IntEnum):
TYPE_ONE_SIDED = 1
TYPE_TWO_SIDED = 2
class RttPeerType(IntEnum):
PEER_TYPE_AP = 1
PEER_TYPE_STA = 2 # Requires NAN.
PEER_P2P_GO = 3
PEER_P2P_CLIENT = 4
PEER_NAN = 5
class RttPreamble(IntEnum):
PREAMBLE_LEGACY = 0x01
PREAMBLE_HT = 0x02
PREAMBLE_VHT = 0x04
class RttBW(IntEnum):
BW_5_SUPPORT = 0x01
BW_10_SUPPORT = 0x02
BW_20_SUPPORT = 0x04
BW_40_SUPPORT = 0x08
BW_80_SUPPORT = 0x10
BW_160_SUPPORT = 0x20
class Rtt(IntEnum):
STATUS_SUCCESS = 0
STATUS_FAILURE = 1
STATUS_FAIL_NO_RSP = 2
STATUS_FAIL_REJECTED = 3
STATUS_FAIL_NOT_SCHEDULED_YET = 4
STATUS_FAIL_TM_TIMEOUT = 5
STATUS_FAIL_AP_ON_DIFF_CHANNEL = 6
STATUS_FAIL_NO_CAPABILITY = 7
STATUS_ABORTED = 8
STATUS_FAIL_INVALID_TS = 9
STATUS_FAIL_PROTOCOL = 10
STATUS_FAIL_SCHEDULE = 11
STATUS_FAIL_BUSY_TRY_LATER = 12
STATUS_INVALID_REQ = 13
STATUS_NO_WIFI = 14
STATUS_FAIL_FTM_PARAM_OVERRIDE = 15
REASON_UNSPECIFIED = -1
REASON_NOT_AVAILABLE = -2
REASON_INVALID_LISTENER = -3
REASON_INVALID_REQUEST = -4
class RttParam:
device_type = "deviceType"
request_type = "requestType"
BSSID = "bssid"
channel_width = "channelWidth"
frequency = "frequency"
center_freq0 = "centerFreq0"
center_freq1 = "centerFreq1"
number_burst = "numberBurst"
interval = "interval"
num_samples_per_burst = "numSamplesPerBurst"
num_retries_per_measurement_frame = "numRetriesPerMeasurementFrame"
num_retries_per_FTMR = "numRetriesPerFTMR"
lci_request = "LCIRequest"
lcr_request = "LCRRequest"
burst_timeout = "burstTimeout"
preamble = "preamble"
bandwidth = "bandwidth"
margin = "margin"
RTT_MARGIN_OF_ERROR = {
RttBW.BW_80_SUPPORT: 2,
RttBW.BW_40_SUPPORT: 5,
RttBW.BW_20_SUPPORT: 5
}
# Macros as specified in the WifiScanner code.
WIFI_BAND_UNSPECIFIED = 0 # not specified
WIFI_BAND_24_GHZ = 1 # 2.4 GHz band
WIFI_BAND_5_GHZ = 2 # 5 GHz band without DFS channels
WIFI_BAND_5_GHZ_DFS_ONLY = 4 # 5 GHz band with DFS channels
WIFI_BAND_5_GHZ_WITH_DFS = 6 # 5 GHz band with DFS channels
WIFI_BAND_BOTH = 3 # both bands without DFS channels
WIFI_BAND_BOTH_WITH_DFS = 7 # both bands with DFS channels
REPORT_EVENT_AFTER_BUFFER_FULL = 0
REPORT_EVENT_AFTER_EACH_SCAN = 1
REPORT_EVENT_FULL_SCAN_RESULT = 2
SCAN_TYPE_LOW_LATENCY = 0
SCAN_TYPE_LOW_POWER = 1
SCAN_TYPE_HIGH_ACCURACY = 2
# US Wifi frequencies
ALL_2G_FREQUENCIES = [2412, 2417, 2422, 2427, 2432, 2437, 2442, 2447, 2452,
2457, 2462]
DFS_5G_FREQUENCIES = [5260, 5280, 5300, 5320, 5500, 5520, 5540, 5560, 5580,
5600, 5620, 5640, 5660, 5680, 5700, 5720]
NONE_DFS_5G_FREQUENCIES = [5180, 5200, 5220, 5240, 5745, 5765, 5785, 5805,
5825]
ALL_5G_FREQUENCIES = DFS_5G_FREQUENCIES + NONE_DFS_5G_FREQUENCIES
band_to_frequencies = {
WIFI_BAND_24_GHZ: ALL_2G_FREQUENCIES,
WIFI_BAND_5_GHZ: NONE_DFS_5G_FREQUENCIES,
WIFI_BAND_5_GHZ_DFS_ONLY: DFS_5G_FREQUENCIES,
WIFI_BAND_5_GHZ_WITH_DFS: ALL_5G_FREQUENCIES,
WIFI_BAND_BOTH: ALL_2G_FREQUENCIES + NONE_DFS_5G_FREQUENCIES,
WIFI_BAND_BOTH_WITH_DFS: ALL_5G_FREQUENCIES + ALL_2G_FREQUENCIES
}
# All Wifi frequencies to channels lookup.
freq_to_channel = {
2412: 1,
2417: 2,
2422: 3,
2427: 4,
2432: 5,
2437: 6,
2442: 7,
2447: 8,
2452: 9,
2457: 10,
2462: 11,
2467: 12,
2472: 13,
2484: 14,
4915: 183,
4920: 184,
4925: 185,
4935: 187,
4940: 188,
4945: 189,
4960: 192,
4980: 196,
5035: 7,
5040: 8,
5045: 9,
5055: 11,
5060: 12,
5080: 16,
5170: 34,
5180: 36,
5190: 38,
5200: 40,
5210: 42,
5220: 44,
5230: 46,
5240: 48,
5260: 52,
5280: 56,
5300: 60,
5320: 64,
5500: 100,
5520: 104,
5540: 108,
5560: 112,
5580: 116,
5600: 120,
5620: 124,
5640: 128,
5660: 132,
5680: 136,
5700: 140,
5745: 149,
5765: 153,
5785: 157,
5805: 161,
5825: 165,
}
# All Wifi channels to frequencies lookup.
channel_2G_to_freq = {
1: 2412,
2: 2417,
3: 2422,
4: 2427,
5: 2432,
6: 2437,
7: 2442,
8: 2447,
9: 2452,
10: 2457,
11: 2462,
12: 2467,
13: 2472,
14: 2484
}
channel_5G_to_freq = {
183: 4915,
184: 4920,
185: 4925,
187: 4935,
188: 4940,
189: 4945,
192: 4960,
196: 4980,
7: 5035,
8: 5040,
9: 5045,
11: 5055,
12: 5060,
16: 5080,
34: 5170,
36: 5180,
38: 5190,
40: 5200,
42: 5210,
44: 5220,
46: 5230,
48: 5240,
52: 5260,
56: 5280,
60: 5300,
64: 5320,
100: 5500,
104: 5520,
108: 5540,
112: 5560,
116: 5580,
120: 5600,
124: 5620,
128: 5640,
132: 5660,
136: 5680,
140: 5700,
149: 5745,
153: 5765,
157: 5785,
161: 5805,
165: 5825
}
class WifiChannelBase:
ALL_2G_FREQUENCIES = []
DFS_5G_FREQUENCIES = []
NONE_DFS_5G_FREQUENCIES = []
ALL_5G_FREQUENCIES = DFS_5G_FREQUENCIES + NONE_DFS_5G_FREQUENCIES
MIX_CHANNEL_SCAN = []
def band_to_freq(self, band):
_band_to_frequencies = {
WifiEnums.WIFI_BAND_24_GHZ: self.ALL_2G_FREQUENCIES,
WifiEnums.WIFI_BAND_5_GHZ: self.NONE_DFS_5G_FREQUENCIES,
WifiEnums.WIFI_BAND_5_GHZ_DFS_ONLY: self.DFS_5G_FREQUENCIES,
WifiEnums.WIFI_BAND_5_GHZ_WITH_DFS: self.ALL_5G_FREQUENCIES,
WifiEnums.WIFI_BAND_BOTH:
self.ALL_2G_FREQUENCIES + self.NONE_DFS_5G_FREQUENCIES,
WifiEnums.WIFI_BAND_BOTH_WITH_DFS:
self.ALL_5G_FREQUENCIES + self.ALL_2G_FREQUENCIES
}
return _band_to_frequencies[band]
class WifiChannelUS(WifiChannelBase):
# US Wifi frequencies
ALL_2G_FREQUENCIES = [2412, 2417, 2422, 2427, 2432, 2437, 2442, 2447, 2452,
2457, 2462]
NONE_DFS_5G_FREQUENCIES = [5180, 5200, 5220, 5240, 5745, 5765, 5785, 5805,
5825]
MIX_CHANNEL_SCAN = [2412, 2437, 2462, 5180, 5200, 5280, 5260, 5300, 5500,
5320, 5520, 5560, 5700, 5745, 5805]
def __init__(self, model=None):
self.DFS_5G_FREQUENCIES = [5260, 5280, 5300, 5320, 5500, 5520,
5540, 5560, 5580, 5600, 5620, 5640,
5660, 5680, 5700, 5720]
self.ALL_5G_FREQUENCIES = self.DFS_5G_FREQUENCIES + self.NONE_DFS_5G_FREQUENCIES
class WifiReferenceNetworks:
""" Class to parse and return networks of different band and
auth type from reference_networks
"""
def __init__(self, obj):
self.reference_networks = obj
self.WIFI_2G = "2g"
self.WIFI_5G = "5g"
self.secure_networks_2g = []
self.secure_networks_5g = []
self.open_networks_2g = []
self.open_networks_5g = []
self._parse_networks()
def _parse_networks(self):
for network in self.reference_networks:
for key in network:
if key == self.WIFI_2G:
if "password" in network[key]:
self.secure_networks_2g.append(network[key])
else:
self.open_networks_2g.append(network[key])
else:
if "password" in network[key]:
self.secure_networks_5g.append(network[key])
else:
self.open_networks_5g.append(network[key])
def return_2g_secure_networks(self):
return self.secure_networks_2g
def return_5g_secure_networks(self):
return self.secure_networks_5g
def return_2g_open_networks(self):
return self.open_networks_2g
def return_5g_open_networks(self):
return self.open_networks_5g
def return_secure_networks(self):
return self.secure_networks_2g + self.secure_networks_5g
def return_open_networks(self):
return self.open_networks_2g + self.open_networks_5g
def _assert_on_fail_handler(func, assert_on_fail, *args, **kwargs):
"""Wrapper function that handles the bahevior of assert_on_fail.
When assert_on_fail is True, let all test signals through, which can
terminate test cases directly. When assert_on_fail is False, the wrapper
raises no test signals and reports operation status by returning True or
False.
Args:
func: The function to wrap. This function reports operation status by
raising test signals.
assert_on_fail: A boolean that specifies if the output of the wrapper
is test signal based or return value based.
args: Positional args for func.
kwargs: Name args for func.
Returns:
If assert_on_fail is True, returns True/False to signal operation
status, otherwise return nothing.
"""
try:
func(*args, **kwargs)
if not assert_on_fail:
return True
except signals.TestSignal:
if assert_on_fail:
raise
return False
def assert_network_in_list(target, network_list):
"""Makes sure a specified target Wi-Fi network exists in a list of Wi-Fi
networks.
Args:
target: A dict representing a Wi-Fi network.
E.g. {WifiEnums.SSID_KEY: "SomeNetwork"}
network_list: A list of dicts, each representing a Wi-Fi network.
"""
match_results = match_networks(target, network_list)
asserts.assert_true(
match_results, "Target network %s, does not exist in network list %s" %
(target, network_list))
def match_networks(target_params, networks):
"""Finds the WiFi networks that match a given set of parameters in a list
of WiFi networks.
To be considered a match, the network should contain every key-value pair
of target_params
Args:
target_params: A dict with 1 or more key-value pairs representing a Wi-Fi network.
E.g { 'SSID': 'wh_ap1_5g', 'BSSID': '30:b5:c2:33:e4:47' }
networks: A list of dict objects representing WiFi networks.
Returns:
The networks that match the target parameters.
"""
results = []
asserts.assert_true(target_params,
"Expected networks object 'target_params' is empty")
for n in networks:
add_network = 1
for k, v in target_params.items():
if k not in n:
add_network = 0
break
if n[k] != v:
add_network = 0
break
if add_network:
results.append(n)
return results
def wait_for_wifi_state(ad, state, assert_on_fail=True):
"""Waits for the device to transition to the specified wifi state
Args:
ad: An AndroidDevice object.
state: Wifi state to wait for.
assert_on_fail: If True, error checks in this function will raise test
failure signals.
Returns:
If assert_on_fail is False, function returns True if the device transitions
to the specified state, False otherwise. If assert_on_fail is True, no return value.
"""
return _assert_on_fail_handler(
_wait_for_wifi_state, assert_on_fail, ad, state=state)
def _wait_for_wifi_state(ad, state):
"""Toggles the state of wifi.
TestFailure signals are raised when something goes wrong.
Args:
ad: An AndroidDevice object.
state: Wifi state to wait for.
"""
if state == ad.droid.wifiCheckState():
# Check if the state is already achieved, so we don't wait for the
# state change event by mistake.
return
ad.droid.wifiStartTrackingStateChange()
fail_msg = "Device did not transition to Wi-Fi state to %s on %s." % (state,
ad.serial)
try:
ad.ed.wait_for_event(wifi_constants.WIFI_STATE_CHANGED,
lambda x: x["data"]["enabled"] == state,
SHORT_TIMEOUT)
except Empty:
asserts.assert_equal(state, ad.droid.wifiCheckState(), fail_msg)
finally:
ad.droid.wifiStopTrackingStateChange()
def wifi_toggle_state(ad, new_state=None, assert_on_fail=True):
"""Toggles the state of wifi.
Args:
ad: An AndroidDevice object.
new_state: Wifi state to set to. If None, opposite of the current state.
assert_on_fail: If True, error checks in this function will raise test
failure signals.
Returns:
If assert_on_fail is False, function returns True if the toggle was
successful, False otherwise. If assert_on_fail is True, no return value.
"""
return _assert_on_fail_handler(
_wifi_toggle_state, assert_on_fail, ad, new_state=new_state)
def _wifi_toggle_state(ad, new_state=None):
"""Toggles the state of wifi.
TestFailure signals are raised when something goes wrong.
Args:
ad: An AndroidDevice object.
new_state: The state to set Wi-Fi to. If None, opposite of the current
state will be set.
"""
if new_state is None:
new_state = not ad.droid.wifiCheckState()
elif new_state == ad.droid.wifiCheckState():
# Check if the new_state is already achieved, so we don't wait for the
# state change event by mistake.
return
ad.droid.wifiStartTrackingStateChange()
ad.log.info("Setting Wi-Fi state to %s.", new_state)
ad.ed.clear_all_events()
# Setting wifi state.
ad.droid.wifiToggleState(new_state)
fail_msg = "Failed to set Wi-Fi state to %s on %s." % (new_state,
ad.serial)
try:
ad.ed.wait_for_event(wifi_constants.WIFI_STATE_CHANGED,
lambda x: x["data"]["enabled"] == new_state,
SHORT_TIMEOUT)
except Empty:
asserts.assert_equal(new_state, ad.droid.wifiCheckState(), fail_msg)
finally:
ad.droid.wifiStopTrackingStateChange()
def reset_wifi(ad):
"""Clears all saved Wi-Fi networks on a device.
This will turn Wi-Fi on.
Args:
ad: An AndroidDevice object.
"""
networks = ad.droid.wifiGetConfiguredNetworks()
if not networks:
return
for n in networks:
ad.droid.wifiForgetNetwork(n['networkId'])
try:
event = ad.ed.pop_event(wifi_constants.WIFI_FORGET_NW_SUCCESS,
SHORT_TIMEOUT)
except Empty:
logging.warning("Could not confirm the removal of network %s.", n)
# Check again to see if there's any network left.
asserts.assert_true(
not ad.droid.wifiGetConfiguredNetworks(),
"Failed to remove these configured Wi-Fi networks: %s" % networks)
def toggle_airplane_mode_on_and_off(ad):
"""Turn ON and OFF Airplane mode.
ad: An AndroidDevice object.
Returns: Assert if turning on/off Airplane mode fails.
"""
ad.log.debug("Toggling Airplane mode ON.")
asserts.assert_true(
utils.force_airplane_mode(ad, True),
"Can not turn on airplane mode on: %s" % ad.serial)
time.sleep(DEFAULT_TIMEOUT)
ad.log.debug("Toggling Airplane mode OFF.")
asserts.assert_true(
utils.force_airplane_mode(ad, False),
"Can not turn on airplane mode on: %s" % ad.serial)
time.sleep(DEFAULT_TIMEOUT)
def toggle_wifi_off_and_on(ad):
"""Turn OFF and ON WiFi.
ad: An AndroidDevice object.
Returns: Assert if turning off/on WiFi fails.
"""
ad.log.debug("Toggling wifi OFF.")
wifi_toggle_state(ad, False)
time.sleep(DEFAULT_TIMEOUT)
ad.log.debug("Toggling wifi ON.")
wifi_toggle_state(ad, True)
time.sleep(DEFAULT_TIMEOUT)
def wifi_forget_network(ad, net_ssid):
"""Remove configured Wifi network on an android device.
Args:
ad: android_device object for forget network.
net_ssid: ssid of network to be forget
"""
networks = ad.droid.wifiGetConfiguredNetworks()
if not networks:
return
for n in networks:
if net_ssid in n[WifiEnums.SSID_KEY]:
ad.droid.wifiForgetNetwork(n['networkId'])
try:
event = ad.ed.pop_event(wifi_constants.WIFI_FORGET_NW_SUCCESS,
SHORT_TIMEOUT)
except Empty:
asserts.fail("Failed to remove network %s." % n)
def wifi_test_device_init(ad):
"""Initializes an android device for wifi testing.
0. Make sure SL4A connection is established on the android device.
1. Disable location service's WiFi scan.
2. Turn WiFi on.
3. Clear all saved networks.
4. Set country code to US.
5. Enable WiFi verbose logging.
6. Sync device time with computer time.
7. Turn off cellular data.
8. Turn off ambient display.
"""
utils.require_sl4a((ad, ))
ad.droid.wifiScannerToggleAlwaysAvailable(False)
msg = "Failed to turn off location service's scan."
asserts.assert_true(not ad.droid.wifiScannerIsAlwaysAvailable(), msg)
wifi_toggle_state(ad, True)
reset_wifi(ad)
ad.droid.wifiEnableVerboseLogging(1)
msg = "Failed to enable WiFi verbose logging."
asserts.assert_equal(ad.droid.wifiGetVerboseLoggingLevel(), 1, msg)
# We don't verify the following settings since they are not critical.
# Set wpa_supplicant log level to EXCESSIVE.
output = ad.adb.shell("wpa_cli -i wlan0 -p -g@android:wpa_wlan0 IFNAME="
"wlan0 log_level EXCESSIVE")
ad.log.info("wpa_supplicant log change status: %s", output)
utils.sync_device_time(ad)
ad.droid.telephonyToggleDataConnection(False)
# TODO(angli): need to verify the country code was actually set. No generic
# way to check right now.
ad.adb.shell("halutil -country %s" % WifiEnums.CountryCode.US)
utils.set_ambient_display(ad, False)
def start_wifi_connection_scan(ad):
"""Starts a wifi connection scan and wait for results to become available.
Args:
ad: An AndroidDevice object.
"""
ad.ed.clear_all_events()
ad.droid.wifiStartScan()
try:
ad.ed.pop_event("WifiManagerScanResultsAvailable", 60)
except Empty:
asserts.fail("Wi-Fi results did not become available within 60s.")
def start_wifi_connection_scan_and_return_status(ad):
"""
Starts a wifi connection scan and wait for results to become available
or a scan failure to be reported.
Args:
ad: An AndroidDevice object.
Returns:
True: if scan succeeded & results are available
False: if scan failed
"""
ad.ed.clear_all_events()
ad.droid.wifiStartScan()
try:
events = ad.ed.pop_events(
"WifiManagerScan(ResultsAvailable|Failure)", 60)
except Empty:
asserts.fail(
"Wi-Fi scan results/failure did not become available within 60s.")
# If there are multiple matches, we check for atleast one success.
for event in events:
if event["name"] == "WifiManagerScanResultsAvailable":
return True
elif event["name"] == "WifiManagerScanFailure":
ad.log.debug("Scan failure received")
return False
def start_wifi_connection_scan_and_check_for_network(ad, network_ssid,
max_tries=3):
"""
Start connectivity scans & checks if the |network_ssid| is seen in
scan results. The method performs a max of |max_tries| connectivity scans
to find the network.
Args:
ad: An AndroidDevice object.
network_ssid: SSID of the network we are looking for.
max_tries: Number of scans to try.
Returns:
True: if network_ssid is found in scan results.
False: if network_ssid is not found in scan results.
"""
for num_tries in range(max_tries):
if start_wifi_connection_scan_and_return_status(ad):
scan_results = ad.droid.wifiGetScanResults()
match_results = match_networks(
{WifiEnums.SSID_KEY: network_ssid}, scan_results)
if len(match_results) > 0:
return True
return False
def start_wifi_connection_scan_and_ensure_network_found(ad, network_ssid,
max_tries=3):
"""
Start connectivity scans & ensure the |network_ssid| is seen in
scan results. The method performs a max of |max_tries| connectivity scans
to find the network.
This method asserts on failure!
Args:
ad: An AndroidDevice object.
network_ssid: SSID of the network we are looking for.
max_tries: Number of scans to try.
"""
ad.log.info("Starting scans to ensure %s is present", network_ssid)
assert_msg = "Failed to find " + network_ssid + " in scan results" \
" after " + str(max_tries) + " tries"
asserts.assert_true(start_wifi_connection_scan_and_check_for_network(
ad, network_ssid, max_tries), assert_msg)
def start_wifi_connection_scan_and_ensure_network_not_found(ad, network_ssid,
max_tries=3):
"""
Start connectivity scans & ensure the |network_ssid| is not seen in
scan results. The method performs a max of |max_tries| connectivity scans
to find the network.
This method asserts on failure!
Args:
ad: An AndroidDevice object.
network_ssid: SSID of the network we are looking for.
max_tries: Number of scans to try.
"""
ad.log.info("Starting scans to ensure %s is not present", network_ssid)
assert_msg = "Found " + network_ssid + " in scan results" \
" after " + str(max_tries) + " tries"
asserts.assert_false(start_wifi_connection_scan_and_check_for_network(
ad, network_ssid, max_tries), assert_msg)
def start_wifi_background_scan(ad, scan_setting):
"""Starts wifi background scan.
Args:
ad: android_device object to initiate connection on.
scan_setting: A dict representing the settings of the scan.
Returns:
If scan was started successfully, event data of success event is returned.
"""
idx = ad.droid.wifiScannerStartBackgroundScan(scan_setting)
event = ad.ed.pop_event("WifiScannerScan{}onSuccess".format(idx),
SHORT_TIMEOUT)
return event['data']
def start_wifi_tethering(ad, ssid, password, band=None, hidden=None):
"""Starts wifi tethering on an android_device.
Args:
ad: android_device to start wifi tethering on.
ssid: The SSID the soft AP should broadcast.
password: The password the soft AP should use.
band: The band the soft AP should be set on. It should be either
WifiEnums.WIFI_CONFIG_APBAND_2G or WifiEnums.WIFI_CONFIG_APBAND_5G.
hidden: boolean to indicate if the AP needs to be hidden or not.
Returns:
No return value. Error checks in this function will raise test failure signals
"""
config = {WifiEnums.SSID_KEY: ssid}
if password:
config[WifiEnums.PWD_KEY] = password
if band:
config[WifiEnums.APBAND_KEY] = band
if hidden:
config[WifiEnums.HIDDEN_KEY] = hidden
asserts.assert_true(
ad.droid.wifiSetWifiApConfiguration(config),
"Failed to update WifiAp Configuration")
ad.droid.wifiStartTrackingTetherStateChange()
ad.droid.connectivityStartTethering(tel_defines.TETHERING_WIFI, False)
try:
ad.ed.pop_event("ConnectivityManagerOnTetheringStarted")
ad.ed.wait_for_event("TetherStateChanged",
lambda x: x["data"]["ACTIVE_TETHER"], 30)
ad.log.debug("Tethering started successfully.")
except Empty:
msg = "Failed to receive confirmation of wifi tethering starting"
asserts.fail(msg)
finally:
ad.droid.wifiStopTrackingTetherStateChange()
def save_wifi_soft_ap_config(ad, wifi_config, band=None, hidden=None):
""" Save a soft ap configuration """
if band:
wifi_config[WifiEnums.APBAND_KEY] = band
if hidden:
wifi_config[WifiEnums.HIDDEN_KEY] = hidden
asserts.assert_true(ad.droid.wifiSetWifiApConfiguration(wifi_config),
"Failed to set WifiAp Configuration")
wifi_ap = ad.droid.wifiGetApConfiguration()
asserts.assert_true(
wifi_ap[WifiEnums.SSID_KEY] == wifi_config[WifiEnums.SSID_KEY],
"Hotspot SSID doesn't match with expected SSID")
def start_wifi_tethering_saved_config(ad):
""" Turn on wifi hotspot with a config that is already saved """
ad.droid.wifiStartTrackingTetherStateChange()
ad.droid.connectivityStartTethering(tel_defines.TETHERING_WIFI, False)
try:
ad.ed.pop_event("ConnectivityManagerOnTetheringStarted")
ad.ed.wait_for_event("TetherStateChanged",
lambda x: x["data"]["ACTIVE_TETHER"], 30)
except:
asserts.fail("Didn't receive wifi tethering starting confirmation")
finally:
ad.droid.wifiStopTrackingTetherStateChange()
def stop_wifi_tethering(ad):
"""Stops wifi tethering on an android_device.
Args:
ad: android_device to stop wifi tethering on.
"""
ad.droid.wifiStartTrackingTetherStateChange()
ad.droid.connectivityStopTethering(tel_defines.TETHERING_WIFI)
try:
ad.ed.pop_event("WifiManagerApDisabled", 30)
ad.ed.wait_for_event("TetherStateChanged",
lambda x: not x["data"]["ACTIVE_TETHER"], 30)
except Empty:
msg = "Failed to receive confirmation of wifi tethering stopping"
asserts.fail(msg)
finally:
ad.droid.wifiStopTrackingTetherStateChange()
def toggle_wifi_and_wait_for_reconnection(ad,
network,
num_of_tries=1,
assert_on_fail=True):
"""Toggle wifi state and then wait for Android device to reconnect to
the provided wifi network.
This expects the device to be already connected to the provided network.
Logic steps are
1. Ensure that we're connected to the network.
2. Turn wifi off.
3. Wait for 10 seconds.
4. Turn wifi on.
5. Wait for the "connected" event, then confirm the connected ssid is the
one requested.
Args:
ad: android_device object to initiate connection on.
network: A dictionary representing the network to await connection. The
dictionary must have the key "SSID".
num_of_tries: An integer that is the number of times to try before
delaring failure. Default is 1.
assert_on_fail: If True, error checks in this function will raise test
failure signals.
Returns:
If assert_on_fail is False, function returns True if the toggle was
successful, False otherwise. If assert_on_fail is True, no return value.
"""
return _assert_on_fail_handler(
_toggle_wifi_and_wait_for_reconnection,
assert_on_fail,
ad,
network,
num_of_tries=num_of_tries)
def _toggle_wifi_and_wait_for_reconnection(ad, network, num_of_tries=1):
"""Toggle wifi state and then wait for Android device to reconnect to
the provided wifi network.
This expects the device to be already connected to the provided network.
Logic steps are
1. Ensure that we're connected to the network.
2. Turn wifi off.
3. Wait for 10 seconds.
4. Turn wifi on.
5. Wait for the "connected" event, then confirm the connected ssid is the
one requested.
This will directly fail a test if anything goes wrong.
Args:
ad: android_device object to initiate connection on.
network: A dictionary representing the network to await connection. The
dictionary must have the key "SSID".
num_of_tries: An integer that is the number of times to try before
delaring failure. Default is 1.
"""
expected_ssid = network[WifiEnums.SSID_KEY]
# First ensure that we're already connected to the provided network.
verify_con = {WifiEnums.SSID_KEY: expected_ssid}
verify_wifi_connection_info(ad, verify_con)
# Now toggle wifi state and wait for the connection event.
wifi_toggle_state(ad, False)
time.sleep(10)
wifi_toggle_state(ad, True)
ad.droid.wifiStartTrackingStateChange()
try:
connect_result = None
for i in range(num_of_tries):
try:
connect_result = ad.ed.pop_event(wifi_constants.WIFI_CONNECTED,
30)
break
except Empty:
pass
asserts.assert_true(connect_result,
"Failed to connect to Wi-Fi network %s on %s" %
(network, ad.serial))
logging.debug("Connection result on %s: %s.", ad.serial,
connect_result)
actual_ssid = connect_result['data'][WifiEnums.SSID_KEY]
asserts.assert_equal(actual_ssid, expected_ssid,
"Connected to the wrong network on %s."
"Expected %s, but got %s." %
(ad.serial, expected_ssid, actual_ssid))
logging.info("Connected to Wi-Fi network %s on %s", actual_ssid,
ad.serial)
finally:
ad.droid.wifiStopTrackingStateChange()
def wait_for_connect(ad, expected_ssid=None, expected_id=None, tries=2,
assert_on_fail=True):
"""Wait for a connect event.
This will directly fail a test if anything goes wrong.
Args:
ad: An Android device object.
expected_ssid: SSID of the network to connect to.
expected_id: Network Id of the network to connect to.
tries: An integer that is the number of times to try before failing.
assert_on_fail: If True, error checks in this function will raise test
failure signals.
Returns:
Returns a value only if assert_on_fail is false.
Returns True if the connection was successful, False otherwise.
"""
return _assert_on_fail_handler(
_wait_for_connect, assert_on_fail, ad, expected_ssid, expected_id,
tries)
def _wait_for_connect(ad, expected_ssid=None, expected_id=None, tries=2):
"""Wait for a connect event.
Args:
ad: An Android device object.
expected_ssid: SSID of the network to connect to.
expected_id: Network Id of the network to connect to.
tries: An integer that is the number of times to try before failing.
"""
ad.droid.wifiStartTrackingStateChange()
try:
connect_result = _wait_for_connect_event(
ad, ssid=expected_ssid, id=expected_id, tries=tries)
asserts.assert_true(connect_result,
"Failed to connect to Wi-Fi network %s" %
expected_ssid)
ad.log.debug("Wi-Fi connection result: %s.", connect_result)
actual_ssid = connect_result['data'][WifiEnums.SSID_KEY]
if expected_ssid:
asserts.assert_equal(actual_ssid, expected_ssid,
"Connected to the wrong network")
actual_id = connect_result['data'][WifiEnums.NETID_KEY]
if expected_id:
asserts.assert_equal(actual_id, expected_id,
"Connected to the wrong network")
ad.log.info("Connected to Wi-Fi network %s.", actual_ssid)
except Empty:
asserts.fail("Failed to start connection process to %s" %
expected_ssid)
except Exception as error:
ad.log.error("Failed to connect to %s with error %s", expected_ssid,
error)
raise signals.TestFailure("Failed to connect to %s network" %
expected_ssid)
finally:
ad.droid.wifiStopTrackingStateChange()
def _wait_for_connect_event(ad, ssid=None, id=None, tries=1):
"""Wait for a connect event on queue and pop when available.
Args:
ad: An Android device object.
ssid: SSID of the network to connect to.
id: Network Id of the network to connect to.
tries: An integer that is the number of times to try before failing.
Returns:
A dict with details of the connection data, which looks like this:
{
'time': 1485460337798,
'name': 'WifiNetworkConnected',
'data': {
'rssi': -27,
'is_24ghz': True,
'mac_address': '02:00:00:00:00:00',
'network_id': 1,
'BSSID': '30:b5:c2:33:d3:fc',
'ip_address': 117483712,
'link_speed': 54,
'supplicant_state': 'completed',
'hidden_ssid': False,
'SSID': 'wh_ap1_2g',
'is_5ghz': False}
}
"""
conn_result = None
# If ssid and network id is None, just wait for any connect event.
if id is None and ssid is None:
for i in range(tries):
try:
start = time.time()
conn_result = ad.ed.pop_event(wifi_constants.WIFI_CONNECTED, 30)
_assert_connection_time(start)
break
except Empty:
pass
else:
# If ssid or network id is specified, wait for specific connect event.
for i in range(tries):
try:
start = time.time()
conn_result = ad.ed.pop_event(wifi_constants.WIFI_CONNECTED, 30)
if id and conn_result['data'][WifiEnums.NETID_KEY] == id:
_assert_connection_time(start)
break
elif ssid and conn_result['data'][WifiEnums.SSID_KEY] == ssid:
_assert_connection_time(start)
break
except Empty:
pass
return conn_result
def _assert_connection_time(start):
duration = time.time() - start
asserts.assert_true(
duration < WIFI_ABNORMAL_CONNECTION_TIME,
"Took " + str(duration) + "s to connect to network, " +
" expected " + str(WIFI_ABNORMAL_CONNECTION_TIME))
def wait_for_disconnect(ad, timeout=10):
"""Wait for a disconnect event within the specified timeout.
Args:
ad: Android device object.
timeout: Timeout in seconds.
"""
try:
ad.droid.wifiStartTrackingStateChange()
event = ad.ed.pop_event("WifiNetworkDisconnected", timeout)
except Empty:
raise signals.TestFailure("Device did not disconnect from the network")
finally:
ad.droid.wifiStopTrackingStateChange()
def ensure_no_disconnect(ad, duration=10):
"""Ensure that there is no disconnect for the specified duration.
Args:
ad: Android device object.
duration: Duration in seconds.
"""
try:
ad.droid.wifiStartTrackingStateChange()
event = ad.ed.pop_event("WifiNetworkDisconnected", duration)
raise signals.TestFailure("Device disconnected from the network")
except Empty:
pass
finally:
ad.droid.wifiStopTrackingStateChange()
def connect_to_wifi_network(ad, network, assert_on_fail=True,
check_connectivity=True):
"""Connection logic for open and psk wifi networks.
Args:
ad: AndroidDevice to use for connection
network: network info of the network to connect to
assert_on_fail: If true, errors from wifi_connect will raise
test failure signals.
"""
start_wifi_connection_scan_and_ensure_network_found(
ad, network[WifiEnums.SSID_KEY])
wifi_connect(ad,
network,
num_of_tries=3,
assert_on_fail=assert_on_fail,
check_connectivity=check_connectivity)
def connect_to_wifi_network_with_id(ad, network_id, network_ssid):
"""Connect to the given network using network id and verify SSID.
Args:
network_id: int Network Id of the network.
network_ssid: string SSID of the network.
Returns: True if connect using network id was successful;
False otherwise.
"""
start_wifi_connection_scan_and_ensure_network_found(ad, network_ssid)
wifi_connect_by_id(ad, network_id)
connect_data = ad.droid.wifiGetConnectionInfo()
connect_ssid = connect_data[WifiEnums.SSID_KEY]
ad.log.debug("Expected SSID = %s Connected SSID = %s" %
(network_ssid, connect_ssid))
if connect_ssid != network_ssid:
return False
return True
def wifi_connect(ad, network, num_of_tries=1, assert_on_fail=True,
check_connectivity=True):
"""Connect an Android device to a wifi network.
Initiate connection to a wifi network, wait for the "connected" event, then
confirm the connected ssid is the one requested.
This will directly fail a test if anything goes wrong.
Args:
ad: android_device object to initiate connection on.
network: A dictionary representing the network to connect to. The
dictionary must have the key "SSID".
num_of_tries: An integer that is the number of times to try before
delaring failure. Default is 1.
assert_on_fail: If True, error checks in this function will raise test
failure signals.
Returns:
Returns a value only if assert_on_fail is false.
Returns True if the connection was successful, False otherwise.
"""
return _assert_on_fail_handler(
_wifi_connect, assert_on_fail, ad, network, num_of_tries=num_of_tries,
check_connectivity=check_connectivity)
def _wifi_connect(ad, network, num_of_tries=1, check_connectivity=True):
"""Connect an Android device to a wifi network.
Initiate connection to a wifi network, wait for the "connected" event, then
confirm the connected ssid is the one requested.
This will directly fail a test if anything goes wrong.
Args:
ad: android_device object to initiate connection on.
network: A dictionary representing the network to connect to. The
dictionary must have the key "SSID".
num_of_tries: An integer that is the number of times to try before
delaring failure. Default is 1.
"""
asserts.assert_true(WifiEnums.SSID_KEY in network,
"Key '%s' must be present in network definition." %
WifiEnums.SSID_KEY)
ad.droid.wifiStartTrackingStateChange()
expected_ssid = network[WifiEnums.SSID_KEY]
ad.droid.wifiConnectByConfig(network)
ad.log.info("Starting connection process to %s", expected_ssid)
try:
event = ad.ed.pop_event(wifi_constants.CONNECT_BY_CONFIG_SUCCESS, 30)
connect_result = _wait_for_connect_event(
ad, ssid=expected_ssid, tries=num_of_tries)
asserts.assert_true(connect_result,
"Failed to connect to Wi-Fi network %s on %s" %
(network, ad.serial))
ad.log.debug("Wi-Fi connection result: %s.", connect_result)
actual_ssid = connect_result['data'][WifiEnums.SSID_KEY]
asserts.assert_equal(actual_ssid, expected_ssid,
"Connected to the wrong network on %s." %
ad.serial)
ad.log.info("Connected to Wi-Fi network %s.", actual_ssid)
# Wait for data connection to stabilize.
time.sleep(5)
if check_connectivity:
internet = validate_connection(ad, DEFAULT_PING_ADDR)
if not internet:
raise signals.TestFailure("Failed to connect to internet on %s" %
expected_ssid)
except Empty:
asserts.fail("Failed to start connection process to %s on %s" %
(network, ad.serial))
except Exception as error:
ad.log.error("Failed to connect to %s with error %s", expected_ssid,
error)
raise signals.TestFailure("Failed to connect to %s network" % network)
finally:
ad.droid.wifiStopTrackingStateChange()
def wifi_connect_by_id(ad, network_id, num_of_tries=3, assert_on_fail=True):
"""Connect an Android device to a wifi network using network Id.
Start connection to the wifi network, with the given network Id, wait for
the "connected" event, then verify the connected network is the one requested.
This will directly fail a test if anything goes wrong.
Args:
ad: android_device object to initiate connection on.
network_id: Integer specifying the network id of the network.
num_of_tries: An integer that is the number of times to try before
delaring failure. Default is 1.
assert_on_fail: If True, error checks in this function will raise test
failure signals.
Returns:
Returns a value only if assert_on_fail is false.
Returns True if the connection was successful, False otherwise.
"""
_assert_on_fail_handler(_wifi_connect_by_id, assert_on_fail, ad,
network_id, num_of_tries)
def _wifi_connect_by_id(ad, network_id, num_of_tries=1):
"""Connect an Android device to a wifi network using it's network id.
Start connection to the wifi network, with the given network id, wait for
the "connected" event, then verify the connected network is the one requested.
Args:
ad: android_device object to initiate connection on.
network_id: Integer specifying the network id of the network.
num_of_tries: An integer that is the number of times to try before
delaring failure. Default is 1.
"""
ad.droid.wifiStartTrackingStateChange()
# Clear all previous events.
ad.ed.clear_all_events()
ad.droid.wifiConnectByNetworkId(network_id)
ad.log.info("Starting connection to network with id %d", network_id)
try:
event = ad.ed.pop_event(wifi_constants.CONNECT_BY_NETID_SUCCESS, 60)
connect_result = _wait_for_connect_event(
ad, id=network_id, tries=num_of_tries)
asserts.assert_true(connect_result,
"Failed to connect to Wi-Fi network using network id")
ad.log.debug("Wi-Fi connection result: %s", connect_result)
actual_id = connect_result['data'][WifiEnums.NETID_KEY]
asserts.assert_equal(actual_id, network_id,
"Connected to the wrong network on %s."
"Expected network id = %d, but got %d." %
(ad.serial, network_id, actual_id))
expected_ssid = connect_result['data'][WifiEnums.SSID_KEY]
ad.log.info("Connected to Wi-Fi network %s with %d network id.",
expected_ssid, network_id)
# Wait for data connection to stabilize.
time.sleep(5)
internet = validate_connection(ad, DEFAULT_PING_ADDR)
if not internet:
raise signals.TestFailure("Failed to connect to internet on %s" %
expected_ssid)
except Empty:
asserts.fail("Failed to connect to network with id %d on %s" %
(network_id, ad.serial))
except Exception as error:
ad.log.error("Failed to connect to network with id %d with error %s",
network_id, error)
raise signals.TestFailure("Failed to connect to network with network"
" id %d" % network_id)
finally:
ad.droid.wifiStopTrackingStateChange()
def wifi_connect_using_network_request(ad, network, network_specifier,
num_of_tries=3, assert_on_fail=True):
"""Connect an Android device to a wifi network using network request.
Trigger a network request with the provided network specifier,
wait for the "onMatch" event, ensure that the scan results in "onMatch"
event contain the specified network, then simulate the user granting the
request with the specified network selected. Then wait for the "onAvailable"
network callback indicating successful connection to network.
This will directly fail a test if anything goes wrong.
Args:
ad: android_device object to initiate connection on.
network_specifier: A dictionary representing the network specifier to
use.
network: A dictionary representing the network to connect to. The
dictionary must have the key "SSID".
num_of_tries: An integer that is the number of times to try before
delaring failure.
assert_on_fail: If True, error checks in this function will raise test
failure signals.
Returns:
Returns a value only if assert_on_fail is false.
Returns True if the connection was successful, False otherwise.
"""
_assert_on_fail_handler(_wifi_connect_using_network_request, assert_on_fail,
ad, network, network_specifier, num_of_tries)
def _wifi_connect_using_network_request(ad, network, network_specifier,
num_of_tries=3):
"""Connect an Android device to a wifi network using network request.
Trigger a network request with the provided network specifier,
wait for the "onMatch" event, ensure that the scan results in "onMatch"
event contain the specified network, then simulate the user granting the
request with the specified network selected. Then wait for the "onAvailable"
network callback indicating successful connection to network.
Args:
ad: android_device object to initiate connection on.
network_specifier: A dictionary representing the network specifier to
use.
network: A dictionary representing the network to connect to. The
dictionary must have the key "SSID".
num_of_tries: An integer that is the number of times to try before
delaring failure.
"""
ad.droid.wifiRequestNetworkWithSpecifier(network_specifier)
ad.log.info("Sent network request with %s", network_specifier)
# Need a delay here because UI interaction should only start once wifi
# starts processing the request.
time.sleep(wifi_constants.NETWORK_REQUEST_CB_REGISTER_DELAY_SEC)
_wait_for_wifi_connect_after_network_request(ad, network, num_of_tries)
def wait_for_wifi_connect_after_network_request(ad, network, num_of_tries=3,
assert_on_fail=True):
"""
Simulate and verify the connection flow after initiating the network
request.
Wait for the "onMatch" event, ensure that the scan results in "onMatch"
event contain the specified network, then simulate the user granting the
request with the specified network selected. Then wait for the "onAvailable"
network callback indicating successful connection to network.
Args:
ad: android_device object to initiate connection on.
network: A dictionary representing the network to connect to. The
dictionary must have the key "SSID".
num_of_tries: An integer that is the number of times to try before
delaring failure.
assert_on_fail: If True, error checks in this function will raise test
failure signals.
Returns:
Returns a value only if assert_on_fail is false.
Returns True if the connection was successful, False otherwise.
"""
_assert_on_fail_handler(_wait_for_wifi_connect_after_network_request,
assert_on_fail, ad, network, num_of_tries)
def _wait_for_wifi_connect_after_network_request(ad, network, num_of_tries=3):
"""
Simulate and verify the connection flow after initiating the network
request.
Wait for the "onMatch" event, ensure that the scan results in "onMatch"
event contain the specified network, then simulate the user granting the
request with the specified network selected. Then wait for the "onAvailable"
network callback indicating successful connection to network.
Args:
ad: android_device object to initiate connection on.
network: A dictionary representing the network to connect to. The
dictionary must have the key "SSID".
num_of_tries: An integer that is the number of times to try before
delaring failure.
"""
asserts.assert_true(WifiEnums.SSID_KEY in network,
"Key '%s' must be present in network definition." %
WifiEnums.SSID_KEY)
ad.droid.wifiStartTrackingStateChange()
expected_ssid = network[WifiEnums.SSID_KEY]
ad.droid.wifiRegisterNetworkRequestMatchCallback()
# Wait for the platform to scan and return a list of networks
# matching the request
try:
matched_network = None
for _ in [0, num_of_tries]:
on_match_event = ad.ed.pop_event(
wifi_constants.WIFI_NETWORK_REQUEST_MATCH_CB_ON_MATCH, 60)
asserts.assert_true(on_match_event,
"Network request on match not received.")
matched_scan_results = on_match_event["data"]
ad.log.debug("Network request on match results %s",
matched_scan_results)
matched_network = match_networks(
{WifiEnums.SSID_KEY: network[WifiEnums.SSID_KEY]},
matched_scan_results)
if matched_network:
break;
asserts.assert_true(
matched_network, "Target network %s not found" % network)
ad.droid.wifiSendUserSelectionForNetworkRequestMatch(network)
ad.log.info("Sent user selection for network request %s",
expected_ssid)
# Wait for the platform to connect to the network.
on_available_event = ad.ed.pop_event(
wifi_constants.WIFI_NETWORK_CB_ON_AVAILABLE, 60)
asserts.assert_true(on_available_event,
"Network request on available not received.")
connected_network = on_available_event["data"]
ad.log.info("Connected to network %s", connected_network)
asserts.assert_equal(connected_network[WifiEnums.SSID_KEY],
expected_ssid,
"Connected to the wrong network."
"Expected %s, but got %s." %
(network, connected_network))
except Empty:
asserts.fail("Failed to connect to %s" % expected_ssid)
except Exception as error:
ad.log.error("Failed to connect to %s with error %s",
(expected_ssid, error))
raise signals.TestFailure("Failed to connect to %s network" % network)
finally:
ad.droid.wifiStopTrackingStateChange()
def wifi_passpoint_connect(ad, passpoint_network, num_of_tries=1,
assert_on_fail=True):
"""Connect an Android device to a wifi network.
Initiate connection to a wifi network, wait for the "connected" event, then
confirm the connected ssid is the one requested.
This will directly fail a test if anything goes wrong.
Args:
ad: android_device object to initiate connection on.
passpoint_network: SSID of the Passpoint network to connect to.
num_of_tries: An integer that is the number of times to try before
delaring failure. Default is 1.
assert_on_fail: If True, error checks in this function will raise test
failure signals.
Returns:
If assert_on_fail is False, function returns network id, if the connect was
successful, False otherwise. If assert_on_fail is True, no return value.
"""
_assert_on_fail_handler(_wifi_passpoint_connect, assert_on_fail, ad,
passpoint_network, num_of_tries = num_of_tries)
def _wifi_passpoint_connect(ad, passpoint_network, num_of_tries=1):
"""Connect an Android device to a wifi network.
Initiate connection to a wifi network, wait for the "connected" event, then
confirm the connected ssid is the one requested.
This will directly fail a test if anything goes wrong.
Args:
ad: android_device object to initiate connection on.
passpoint_network: SSID of the Passpoint network to connect to.
num_of_tries: An integer that is the number of times to try before
delaring failure. Default is 1.
"""
ad.droid.wifiStartTrackingStateChange()
expected_ssid = passpoint_network
ad.log.info("Starting connection process to passpoint %s", expected_ssid)
try:
connect_result = _wait_for_connect_event(
ad, expected_ssid, num_of_tries)
asserts.assert_true(connect_result,
"Failed to connect to WiFi passpoint network %s on"
" %s" % (expected_ssid, ad.serial))
ad.log.info("Wi-Fi connection result: %s.", connect_result)
actual_ssid = connect_result['data'][WifiEnums.SSID_KEY]
asserts.assert_equal(actual_ssid, expected_ssid,
"Connected to the wrong network on %s." % ad.serial)
ad.log.info("Connected to Wi-Fi passpoint network %s.", actual_ssid)
# Wait for data connection to stabilize.
time.sleep(5)
internet = validate_connection(ad, DEFAULT_PING_ADDR)
if not internet:
raise signals.TestFailure("Failed to connect to internet on %s" %
expected_ssid)
except Exception as error:
ad.log.error("Failed to connect to passpoint network %s with error %s",
expected_ssid, error)
raise signals.TestFailure("Failed to connect to %s passpoint network" %
expected_ssid)
finally:
ad.droid.wifiStopTrackingStateChange()
def delete_passpoint(ad, fqdn):
"""Delete a required Passpoint configuration."""
try:
ad.droid.removePasspointConfig(fqdn)
return True
except Exception as error:
ad.log.error("Failed to remove passpoint configuration with FQDN=%s "
"and error=%s" , fqdn, error)
return False
def start_wifi_single_scan(ad, scan_setting):
"""Starts wifi single shot scan.
Args:
ad: android_device object to initiate connection on.
scan_setting: A dict representing the settings of the scan.
Returns:
If scan was started successfully, event data of success event is returned.
"""
idx = ad.droid.wifiScannerStartScan(scan_setting)
event = ad.ed.pop_event("WifiScannerScan%sonSuccess" % idx, SHORT_TIMEOUT)
ad.log.debug("Got event %s", event)
return event['data']
def track_connection(ad, network_ssid, check_connection_count):
"""Track wifi connection to network changes for given number of counts
Args:
ad: android_device object for forget network.
network_ssid: network ssid to which connection would be tracked
check_connection_count: Integer for maximum number network connection
check.
Returns:
True if connection to given network happen, else return False.
"""
ad.droid.wifiStartTrackingStateChange()
while check_connection_count > 0:
connect_network = ad.ed.pop_event("WifiNetworkConnected", 120)
ad.log.info("Connected to network %s", connect_network)
if (WifiEnums.SSID_KEY in connect_network['data'] and
connect_network['data'][WifiEnums.SSID_KEY] == network_ssid):
return True
check_connection_count -= 1
ad.droid.wifiStopTrackingStateChange()
return False
def get_scan_time_and_channels(wifi_chs, scan_setting, stime_channel):
"""Calculate the scan time required based on the band or channels in scan
setting
Args:
wifi_chs: Object of channels supported
scan_setting: scan setting used for start scan
stime_channel: scan time per channel
Returns:
scan_time: time required for completing a scan
scan_channels: channel used for scanning
"""
scan_time = 0
scan_channels = []
if "band" in scan_setting and "channels" not in scan_setting:
scan_channels = wifi_chs.band_to_freq(scan_setting["band"])
elif "channels" in scan_setting and "band" not in scan_setting:
scan_channels = scan_setting["channels"]
scan_time = len(scan_channels) * stime_channel
for channel in scan_channels:
if channel in WifiEnums.DFS_5G_FREQUENCIES:
scan_time += 132 #passive scan time on DFS
return scan_time, scan_channels
def start_wifi_track_bssid(ad, track_setting):
"""Start tracking Bssid for the given settings.
Args:
ad: android_device object.
track_setting: Setting for which the bssid tracking should be started
Returns:
If tracking started successfully, event data of success event is returned.
"""
idx = ad.droid.wifiScannerStartTrackingBssids(
track_setting["bssidInfos"], track_setting["apLostThreshold"])
event = ad.ed.pop_event("WifiScannerBssid{}onSuccess".format(idx),
SHORT_TIMEOUT)
return event['data']
def convert_pem_key_to_pkcs8(in_file, out_file):
"""Converts the key file generated by us to the format required by
Android using openssl.
The input file must have the extension "pem". The output file must
have the extension "der".
Args:
in_file: The original key file.
out_file: The full path to the converted key file, including
filename.
"""
asserts.assert_true(in_file.endswith(".pem"), "Input file has to be .pem.")
asserts.assert_true(
out_file.endswith(".der"), "Output file has to be .der.")
cmd = ("openssl pkcs8 -inform PEM -in {} -outform DER -out {} -nocrypt"
" -topk8").format(in_file, out_file)
utils.exe_cmd(cmd)
def validate_connection(ad, ping_addr=DEFAULT_PING_ADDR):
"""Validate internet connection by pinging the address provided.
Args:
ad: android_device object.
ping_addr: address on internet for pinging.
Returns:
ping output if successful, NULL otherwise.
"""
ping = ad.droid.httpPing(ping_addr)
ad.log.info("Http ping result: %s.", ping)
return ping
#TODO(angli): This can only verify if an actual value is exactly the same.
# Would be nice to be able to verify an actual value is one of serveral.
def verify_wifi_connection_info(ad, expected_con):
"""Verifies that the information of the currently connected wifi network is
as expected.
Args:
expected_con: A dict representing expected key-value pairs for wifi
connection. e.g. {"SSID": "test_wifi"}
"""
current_con = ad.droid.wifiGetConnectionInfo()
case_insensitive = ["BSSID", "supplicant_state"]
ad.log.debug("Current connection: %s", current_con)
for k, expected_v in expected_con.items():
# Do not verify authentication related fields.
if k == "password":
continue
msg = "Field %s does not exist in wifi connection info %s." % (
k, current_con)
if k not in current_con:
raise signals.TestFailure(msg)
actual_v = current_con[k]
if k in case_insensitive:
actual_v = actual_v.lower()
expected_v = expected_v.lower()
msg = "Expected %s to be %s, actual %s is %s." % (k, expected_v, k,
actual_v)
if actual_v != expected_v:
raise signals.TestFailure(msg)
def check_autoconnect_to_open_network(ad, conn_timeout=WIFI_CONNECTION_TIMEOUT_DEFAULT):
"""Connects to any open WiFI AP
Args:
timeout value in sec to wait for UE to connect to a WiFi AP
Returns:
True if UE connects to WiFi AP (supplicant_state = completed)
False if UE fails to complete connection within WIFI_CONNECTION_TIMEOUT time.
"""
if ad.droid.wifiCheckState():
return True
ad.droid.wifiToggleState()
wifi_connection_state = None
timeout = time.time() + conn_timeout
while wifi_connection_state != "completed":
wifi_connection_state = ad.droid.wifiGetConnectionInfo()[
'supplicant_state']
if time.time() > timeout:
ad.log.warning("Failed to connect to WiFi AP")
return False
return True
def expand_enterprise_config_by_phase2(config):
"""Take an enterprise config and generate a list of configs, each with
a different phase2 auth type.
Args:
config: A dict representing enterprise config.
Returns
A list of enterprise configs.
"""
results = []
phase2_types = WifiEnums.EapPhase2
if config[WifiEnums.Enterprise.EAP] == WifiEnums.Eap.PEAP:
# Skip unsupported phase2 types for PEAP.
phase2_types = [WifiEnums.EapPhase2.GTC, WifiEnums.EapPhase2.MSCHAPV2]
for phase2_type in phase2_types:
# Skip a special case for passpoint TTLS.
if (WifiEnums.Enterprise.FQDN in config and
phase2_type == WifiEnums.EapPhase2.GTC):
continue
c = dict(config)
c[WifiEnums.Enterprise.PHASE2] = phase2_type.value
results.append(c)
return results
def generate_eap_test_name(config, ad=None):
""" Generates a test case name based on an EAP configuration.
Args:
config: A dict representing an EAP credential.
ad object: Redundant but required as the same param is passed
to test_func in run_generated_tests
Returns:
A string representing the name of a generated EAP test case.
"""
eap = WifiEnums.Eap
eap_phase2 = WifiEnums.EapPhase2
Ent = WifiEnums.Enterprise
name = "test_connect-"
eap_name = ""
for e in eap:
if e.value == config[Ent.EAP]:
eap_name = e.name
break
if "peap0" in config[WifiEnums.SSID_KEY].lower():
eap_name = "PEAP0"
if "peap1" in config[WifiEnums.SSID_KEY].lower():
eap_name = "PEAP1"
name += eap_name
if Ent.PHASE2 in config:
for e in eap_phase2:
if e.value == config[Ent.PHASE2]:
name += "-{}".format(e.name)
break
return name
def group_attenuators(attenuators):
"""Groups a list of attenuators into attenuator groups for backward
compatibility reasons.
Most legacy Wi-Fi setups have two attenuators each connected to a separate
AP. The new Wi-Fi setup has four attenuators, each connected to one channel
on an AP, so two of them are connected to one AP.
To make the existing scripts work in the new setup, when the script needs
to attenuate one AP, it needs to set attenuation on both attenuators
connected to the same AP.
This function groups attenuators properly so the scripts work in both
legacy and new Wi-Fi setups.
Args:
attenuators: A list of attenuator objects, either two or four in length.
Raises:
signals.TestFailure is raised if the attenuator list does not have two
or four objects.
"""
attn0 = attenuator.AttenuatorGroup("AP0")
attn1 = attenuator.AttenuatorGroup("AP1")
# Legacy testbed setup has two attenuation channels.
num_of_attns = len(attenuators)
if num_of_attns == 2:
attn0.add(attenuators[0])
attn1.add(attenuators[1])
elif num_of_attns == 4:
attn0.add(attenuators[0])
attn0.add(attenuators[1])
attn1.add(attenuators[2])
attn1.add(attenuators[3])
else:
asserts.fail(("Either two or four attenuators are required for this "
"test, but found %s") % num_of_attns)
return [attn0, attn1]
def set_attns(attenuator, attn_val_name):
"""Sets attenuation values on attenuators used in this test.
Args:
attenuator: The attenuator object.
attn_val_name: Name of the attenuation value pair to use.
"""
logging.info("Set attenuation values to %s", roaming_attn[attn_val_name])
try:
attenuator[0].set_atten(roaming_attn[attn_val_name][0])
attenuator[1].set_atten(roaming_attn[attn_val_name][1])
attenuator[2].set_atten(roaming_attn[attn_val_name][2])
attenuator[3].set_atten(roaming_attn[attn_val_name][3])
except:
logging.exception("Failed to set attenuation values %s.",
attn_val_name)
raise
def trigger_roaming_and_validate(dut, attenuator, attn_val_name, expected_con):
"""Sets attenuators to trigger roaming and validate the DUT connected
to the BSSID expected.
Args:
attenuator: The attenuator object.
attn_val_name: Name of the attenuation value pair to use.
expected_con: The network information of the expected network.
"""
expected_con = {
WifiEnums.SSID_KEY: expected_con[WifiEnums.SSID_KEY],
WifiEnums.BSSID_KEY: expected_con["bssid"],
}
set_attns(attenuator, attn_val_name)
logging.info("Wait %ss for roaming to finish.", ROAMING_TIMEOUT)
time.sleep(ROAMING_TIMEOUT)
verify_wifi_connection_info(dut, expected_con)
expected_bssid = expected_con[WifiEnums.BSSID_KEY]
logging.info("Roamed to %s successfully", expected_bssid)
if not validate_connection(dut):
raise signals.TestFailure("Fail to connect to internet on %s" %
expected_bssid)
def create_softap_config():
"""Create a softap config with random ssid and password."""
ap_ssid = "softap_" + utils.rand_ascii_str(8)
ap_password = utils.rand_ascii_str(8)
logging.info("softap setup: %s %s", ap_ssid, ap_password)
config = {
WifiEnums.SSID_KEY: ap_ssid,
WifiEnums.PWD_KEY: ap_password,
}
return config
def start_softap_and_verify(ad, band):
"""Bring-up softap and verify AP mode and in scan results.
Args:
band: The band to use for softAP.
Returns: dict, the softAP config.
"""
config = create_softap_config()
start_wifi_tethering(ad.dut,
config[WifiEnums.SSID_KEY],
config[WifiEnums.PWD_KEY], band=band)
asserts.assert_true(ad.dut.droid.wifiIsApEnabled(),
"SoftAp is not reported as running")
start_wifi_connection_scan_and_ensure_network_found(ad.dut_client,
config[WifiEnums.SSID_KEY])
return config
def start_pcap(pcap, wifi_band, test_name):
"""Start packet capture in monitor mode.
Args:
pcap: packet capture object
wifi_band: '2g' or '5g' or 'dual'
test_name: test name to be used for pcap file name
Returns:
Dictionary with wifi band as key and the tuple
(pcap Process object, log directory) as the value
"""
log_dir = os.path.join(
context.get_current_context().get_full_output_path(), 'PacketCapture')
utils.create_dir(log_dir)
if wifi_band == 'dual':
bands = [BAND_2G, BAND_5G]
else:
bands = [wifi_band]
procs = {}
for band in bands:
proc = pcap.start_packet_capture(band, log_dir, test_name)
procs[band] = (proc, os.path.join(log_dir, test_name))
return procs
def stop_pcap(pcap, procs, test_status=None):
"""Stop packet capture in monitor mode.
Since, the pcap logs in monitor mode can be very large, we will
delete them if they are not required. 'test_status' if True, will delete
the pcap files. If False, we will keep them.
Args:
pcap: packet capture object
procs: dictionary returned by start_pcap
test_status: status of the test case
"""
for proc, fname in procs.values():
pcap.stop_packet_capture(proc)
if test_status:
shutil.rmtree(os.path.dirname(fname))
def verify_mac_not_found_in_pcap(mac, packets):
"""Verify that a mac address is not found in the captured packets.
Args:
mac: string representation of the mac address
packets: packets obtained by rdpcap(pcap_fname)
"""
for pkt in packets:
logging.debug("Packet Summary = %s", pkt.summary())
if mac in pkt.summary():
asserts.fail("Caught Factory MAC: %s in packet sniffer."
"Packet = %s" % (mac, pkt.show()))
def verify_mac_is_found_in_pcap(mac, packets):
"""Verify that a mac address is found in the captured packets.
Args:
mac: string representation of the mac address
packets: packets obtained by rdpcap(pcap_fname)
"""
for pkt in packets:
if mac in pkt.summary():
return
asserts.fail("Did not find MAC = %s in packet sniffer." % mac)
def start_cnss_diags(ads):
for ad in ads:
start_cnss_diag(ad)
def start_cnss_diag(ad):
"""Start cnss_diag to record extra wifi logs
Args:
ad: android device object.
"""
if ad.model in wifi_constants.DEVICES_USING_LEGACY_PROP:
prop = wifi_constants.LEGACY_CNSS_DIAG_PROP
else:
prop = wifi_constants.CNSS_DIAG_PROP
if ad.adb.getprop(prop) != 'true':
ad.adb.shell("find /data/vendor/wifi/cnss_diag/wlan_logs/ -type f -delete")
ad.adb.shell("setprop %s true" % prop, ignore_status=True)
def stop_cnss_diags(ads):
for ad in ads:
stop_cnss_diag(ad)
def stop_cnss_diag(ad):
"""Stops cnss_diag
Args:
ad: android device object.
"""
if ad.model in wifi_constants.DEVICES_USING_LEGACY_PROP:
prop = wifi_constants.LEGACY_CNSS_DIAG_PROP
else:
prop = wifi_constants.CNSS_DIAG_PROP
ad.adb.shell("setprop %s false" % prop, ignore_status=True)
def get_cnss_diag_log(ad, test_name=""):
"""Pulls the cnss_diag logs in the wlan_logs dir
Args:
ad: android device object.
test_name: test case name
"""
logs = ad.get_file_names("/data/vendor/wifi/cnss_diag/wlan_logs/")
if logs:
ad.log.info("Pulling cnss_diag logs %s", logs)
log_path = os.path.join(ad.device_log_path, "CNSS_DIAG_%s" % ad.serial)
utils.create_dir(log_path)
ad.pull_files(logs, log_path)
LinkProbeResult = namedtuple('LinkProbeResult', (
'is_success', 'stdout', 'elapsed_time', 'failure_reason'))
def send_link_probe(ad):
"""Sends a link probe to the currently connected AP, and returns whether the
probe succeeded or not.
Args:
ad: android device object
Returns:
LinkProbeResult namedtuple
"""
stdout = ad.adb.shell('cmd wifi send-link-probe')
asserts.assert_false('Error' in stdout or 'Exception' in stdout,
'Exception while sending link probe: ' + stdout)
is_success = False
elapsed_time = None
failure_reason = None
if 'succeeded' in stdout:
is_success = True
elapsed_time = next(
(int(token) for token in stdout.split() if token.isdigit()), None)
elif 'failed with reason' in stdout:
failure_reason = next(
(int(token) for token in stdout.split() if token.isdigit()), None)
else:
asserts.fail('Unexpected link probe result: ' + stdout)
return LinkProbeResult(
is_success=is_success, stdout=stdout,
elapsed_time=elapsed_time, failure_reason=failure_reason)
def send_link_probes(ad, num_probes, delay_sec):
"""Sends a sequence of link probes to the currently connected AP, and
returns whether the probes succeeded or not.
Args:
ad: android device object
num_probes: number of probes to perform
delay_sec: delay time between probes, in seconds
Returns:
List[LinkProbeResult] one LinkProbeResults for each probe
"""
logging.info('Sending link probes')
results = []
for _ in range(num_probes):
# send_link_probe() will also fail the test if it sees an exception
# in the stdout of the adb shell command
result = send_link_probe(ad)
logging.info('link probe results: ' + str(result))
results.append(result)
time.sleep(delay_sec)
return results
def ap_setup(test, index, ap, network, bandwidth=80, channel=6):
"""Set up the AP with provided network info.
Args:
test: the calling test class object.
index: int, index of the AP.
ap: access_point object of the AP.
network: dict with information of the network, including ssid,
password and bssid.
bandwidth: the operation bandwidth for the AP, default 80MHz.
channel: the channel number for the AP.
Returns:
brconfigs: the bridge interface configs
"""
bss_settings = []
ssid = network[WifiEnums.SSID_KEY]
test.access_points[index].close()
time.sleep(5)
# Configure AP as required.
if "password" in network.keys():
password = network["password"]
security = hostapd_security.Security(
security_mode="wpa", password=password)
else:
security = hostapd_security.Security(security_mode=None, password=None)
config = hostapd_ap_preset.create_ap_preset(
channel=channel,
ssid=ssid,
security=security,
bss_settings=bss_settings,
vht_bandwidth=bandwidth,
profile_name='whirlwind',
iface_wlan_2g=ap.wlan_2g,
iface_wlan_5g=ap.wlan_5g)
ap.start_ap(config)
logging.info("AP started on channel {} with SSID {}".format(channel, ssid))
def turn_ap_off(test, AP):
"""Bring down hostapd on the Access Point.
Args:
test: The test class object.
AP: int, indicating which AP to turn OFF.
"""
hostapd_2g = test.access_points[AP-1]._aps['wlan0'].hostapd
if hostapd_2g.is_alive():
hostapd_2g.stop()
logging.debug('Turned WLAN0 AP%d off' % AP)
hostapd_5g = test.access_points[AP-1]._aps['wlan1'].hostapd
if hostapd_5g.is_alive():
hostapd_5g.stop()
logging.debug('Turned WLAN1 AP%d off' % AP)
def turn_ap_on(test, AP):
"""Bring up hostapd on the Access Point.
Args:
test: The test class object.
AP: int, indicating which AP to turn ON.
"""
hostapd_2g = test.access_points[AP-1]._aps['wlan0'].hostapd
if not hostapd_2g.is_alive():
hostapd_2g.start(hostapd_2g.config)
logging.debug('Turned WLAN0 AP%d on' % AP)
hostapd_5g = test.access_points[AP-1]._aps['wlan1'].hostapd
if not hostapd_5g.is_alive():
hostapd_5g.start(hostapd_5g.config)
logging.debug('Turned WLAN1 AP%d on' % AP)