Update wifi test util with better styles and to utilize new ACTS
features.
Add a retry option for wifi_connect.
Resolve any descrepancy in Wi-Fi code between internal master and
AOSP.
Bug=27821528
Bug=28342613
Change-Id: I77dc14cb5d6f72c4a538745e7acf0013702009d6
diff --git a/acts/framework/acts/test_utils/wifi/wifi_test_utils.py b/acts/framework/acts/test_utils/wifi/wifi_test_utils.py
index 1f37490..01011ce 100755
--- a/acts/framework/acts/test_utils/wifi/wifi_test_utils.py
+++ b/acts/framework/acts/test_utils/wifi/wifi_test_utils.py
@@ -21,6 +21,7 @@
from enum import IntEnum
from queue import Empty
+from acts import asserts
from acts import signals
from acts.utils import exe_cmd
from acts.utils import require_sl4a
@@ -46,6 +47,7 @@
DEFAULT_PING_ADDR = "http://www.google.com/robots.txt"
+
class WifiEnums():
SSID_KEY = "SSID"
@@ -57,11 +59,11 @@
WIFI_CONFIG_APBAND_2G = 0
WIFI_CONFIG_APBAND_5G = 1
- WIFI_WPS_INFO_PBC = 0;
- WIFI_WPS_INFO_DISPLAY = 1;
- WIFI_WPS_INFO_KEYPAD = 2;
- WIFI_WPS_INFO_LABEL = 3;
- WIFI_WPS_INFO_INVALID = 4;
+ WIFI_WPS_INFO_PBC = 0
+ WIFI_WPS_INFO_DISPLAY = 1
+ WIFI_WPS_INFO_KEYPAD = 2
+ WIFI_WPS_INFO_LABEL = 3
+ WIFI_WPS_INFO_INVALID = 4
class CountryCode():
CHINA = "CN"
@@ -75,41 +77,43 @@
class Eap(IntEnum):
NONE = -1
PEAP = 0
- TLS = 1
+ TLS = 1
TTLS = 2
- PWD = 3
- SIM = 4
- AKA = 5
+ PWD = 3
+ SIM = 4
+ AKA = 5
+ AKA_PRIME = 6
+ UNAUTH_TLS = 7
# EAP Phase2 types
class EapPhase2(IntEnum):
- NONE = 0
- PAP = 1
- MSCHAP = 2
- MSCHAPV2 = 3
- GTC = 4
+ NONE = 0
+ PAP = 1
+ MSCHAP = 2
+ MSCHAPV2 = 3
+ GTC = 4
class Enterprise:
- # Enterprise Config Macros
- EMPTY_VALUE = "NULL"
- EAP = "eap"
- PHASE2 = "phase2"
- IDENTITY = "identity"
- ANON_IDENTITY = "anonymous_identity"
- PASSWORD = "password"
- SUBJECT_MATCH = "subject_match"
+ # Enterprise Config Macros
+ EMPTY_VALUE = "NULL"
+ EAP = "eap"
+ PHASE2 = "phase2"
+ IDENTITY = "identity"
+ ANON_IDENTITY = "anonymous_identity"
+ PASSWORD = "password"
+ SUBJECT_MATCH = "subject_match"
ALTSUBJECT_MATCH = "altsubject_match"
DOM_SUFFIX_MATCH = "domain_suffix_match"
- CLIENT_CERT = "client_cert"
- CA_CERT = "ca_cert"
- ENGINE = "engine"
- ENGINE_ID = "engine_id"
- PRIVATE_KEY_ID = "key_id"
- REALM = "realm"
- PLMN = "plmn"
- FQDN = "FQDN"
- FRIENDLY_NAME = "providerFriendlyName"
- ROAMING_IDS = "roamingConsortiumIds"
+ CLIENT_CERT = "client_cert"
+ CA_CERT = "ca_cert"
+ ENGINE = "engine"
+ ENGINE_ID = "engine_id"
+ PRIVATE_KEY_ID = "key_id"
+ REALM = "realm"
+ PLMN = "plmn"
+ FQDN = "FQDN"
+ FRIENDLY_NAME = "providerFriendlyName"
+ ROAMING_IDS = "roamingConsortiumIds"
# End of Macros for EAP
# Macros for wifi p2p.
@@ -127,51 +131,51 @@
# Macros for wifi rtt.
class RttType(IntEnum):
- TYPE_ONE_SIDED = 1
- TYPE_TWO_SIDED = 2
+ TYPE_ONE_SIDED = 1
+ TYPE_TWO_SIDED = 2
class RttPeerType(IntEnum):
- PEER_TYPE_AP = 1
- PEER_TYPE_STA = 2 # Requires NAN.
- PEER_P2P_GO = 3
- PEER_P2P_CLIENT = 4
- PEER_NAN = 5
+ PEER_TYPE_AP = 1
+ PEER_TYPE_STA = 2 # Requires NAN.
+ PEER_P2P_GO = 3
+ PEER_P2P_CLIENT = 4
+ PEER_NAN = 5
class RttPreamble(IntEnum):
- PREAMBLE_LEGACY = 0x01
- PREAMBLE_HT = 0x02
- PREAMBLE_VHT = 0x04
+ PREAMBLE_LEGACY = 0x01
+ PREAMBLE_HT = 0x02
+ PREAMBLE_VHT = 0x04
class RttBW(IntEnum):
- BW_5_SUPPORT = 0x01
- BW_10_SUPPORT = 0x02
- BW_20_SUPPORT = 0x04
- BW_40_SUPPORT = 0x08
- BW_80_SUPPORT = 0x10
+ BW_5_SUPPORT = 0x01
+ BW_10_SUPPORT = 0x02
+ BW_20_SUPPORT = 0x04
+ BW_40_SUPPORT = 0x08
+ BW_80_SUPPORT = 0x10
BW_160_SUPPORT = 0x20
class Rtt(IntEnum):
- STATUS_SUCCESS = 0
- STATUS_FAILURE = 1
- STATUS_FAIL_NO_RSP = 2
- STATUS_FAIL_REJECTED = 3
- STATUS_FAIL_NOT_SCHEDULED_YET = 4
- STATUS_FAIL_TM_TIMEOUT = 5
- STATUS_FAIL_AP_ON_DIFF_CHANNEL = 6
- STATUS_FAIL_NO_CAPABILITY = 7
- STATUS_ABORTED = 8
- STATUS_FAIL_INVALID_TS = 9
- STATUS_FAIL_PROTOCOL = 10
- STATUS_FAIL_SCHEDULE = 11
- STATUS_FAIL_BUSY_TRY_LATER = 12
- STATUS_INVALID_REQ = 13
- STATUS_NO_WIFI = 14
- STATUS_FAIL_FTM_PARAM_OVERRIDE = 15
+ STATUS_SUCCESS = 0
+ STATUS_FAILURE = 1
+ STATUS_FAIL_NO_RSP = 2
+ STATUS_FAIL_REJECTED = 3
+ STATUS_FAIL_NOT_SCHEDULED_YET = 4
+ STATUS_FAIL_TM_TIMEOUT = 5
+ STATUS_FAIL_AP_ON_DIFF_CHANNEL = 6
+ STATUS_FAIL_NO_CAPABILITY = 7
+ STATUS_ABORTED = 8
+ STATUS_FAIL_INVALID_TS = 9
+ STATUS_FAIL_PROTOCOL = 10
+ STATUS_FAIL_SCHEDULE = 11
+ STATUS_FAIL_BUSY_TRY_LATER = 12
+ STATUS_INVALID_REQ = 13
+ STATUS_NO_WIFI = 14
+ STATUS_FAIL_FTM_PARAM_OVERRIDE = 15
- REASON_UNSPECIFIED = -1
- REASON_NOT_AVAILABLE = -2
- REASON_INVALID_LISTENER = -3
- REASON_INVALID_REQUEST = -4
+ REASON_UNSPECIFIED = -1
+ REASON_NOT_AVAILABLE = -2
+ REASON_INVALID_LISTENER = -3
+ REASON_INVALID_REQUEST = -4
class RttParam:
device_type = "deviceType"
@@ -200,13 +204,13 @@
}
# Macros as specified in the WifiScanner code.
- WIFI_BAND_UNSPECIFIED = 0 # not specified
- WIFI_BAND_24_GHZ = 1 # 2.4 GHz band
- WIFI_BAND_5_GHZ = 2 # 5 GHz band without DFS channels
- WIFI_BAND_5_GHZ_DFS_ONLY = 4 # 5 GHz band with DFS channels
- WIFI_BAND_5_GHZ_WITH_DFS = 6 # 5 GHz band with DFS channels
- WIFI_BAND_BOTH = 3 # both bands without DFS channels
- WIFI_BAND_BOTH_WITH_DFS = 7 # both bands with DFS channels
+ WIFI_BAND_UNSPECIFIED = 0 # not specified
+ WIFI_BAND_24_GHZ = 1 # 2.4 GHz band
+ WIFI_BAND_5_GHZ = 2 # 5 GHz band without DFS channels
+ WIFI_BAND_5_GHZ_DFS_ONLY = 4 # 5 GHz band with DFS channels
+ WIFI_BAND_5_GHZ_WITH_DFS = 6 # 5 GHz band with DFS channels
+ WIFI_BAND_BOTH = 3 # both bands without DFS channels
+ WIFI_BAND_BOTH_WITH_DFS = 7 # both bands with DFS channels
REPORT_EVENT_AFTER_BUFFER_FULL = 0
REPORT_EVENT_AFTER_EACH_SCAN = 1
@@ -222,12 +226,12 @@
ALL_5G_FREQUENCIES = DFS_5G_FREQUENCIES + NONE_DFS_5G_FREQUENCIES
band_to_frequencies = {
- WIFI_BAND_24_GHZ: ALL_2G_FREQUENCIES,
- WIFI_BAND_5_GHZ: NONE_DFS_5G_FREQUENCIES,
- WIFI_BAND_5_GHZ_DFS_ONLY: DFS_5G_FREQUENCIES,
- WIFI_BAND_5_GHZ_WITH_DFS: ALL_5G_FREQUENCIES,
- WIFI_BAND_BOTH: ALL_2G_FREQUENCIES + NONE_DFS_5G_FREQUENCIES,
- WIFI_BAND_BOTH_WITH_DFS: ALL_5G_FREQUENCIES + ALL_2G_FREQUENCIES
+ WIFI_BAND_24_GHZ: ALL_2G_FREQUENCIES,
+ WIFI_BAND_5_GHZ: NONE_DFS_5G_FREQUENCIES,
+ WIFI_BAND_5_GHZ_DFS_ONLY: DFS_5G_FREQUENCIES,
+ WIFI_BAND_5_GHZ_WITH_DFS: ALL_5G_FREQUENCIES,
+ WIFI_BAND_BOTH: ALL_2G_FREQUENCIES + NONE_DFS_5G_FREQUENCIES,
+ WIFI_BAND_BOTH_WITH_DFS: ALL_5G_FREQUENCIES + ALL_2G_FREQUENCIES
}
# All Wifi frequencies to channels lookup.
@@ -353,14 +357,17 @@
165: 5825
}
+
class WifiEventNames:
WIFI_CONNECTED = "WifiNetworkConnected"
SUPPLICANT_CON_CHANGED = "SupplicantConnectionChanged"
WIFI_FORGET_NW_SUCCESS = "WifiManagerForgetNetworkOnSuccess"
+
class WifiTestUtilsError(Exception):
pass
+
class WifiChannelBase:
ALL_2G_FREQUENCIES = []
DFS_5G_FREQUENCIES = []
@@ -374,42 +381,95 @@
WifiEnums.WIFI_BAND_5_GHZ: self.NONE_DFS_5G_FREQUENCIES,
WifiEnums.WIFI_BAND_5_GHZ_DFS_ONLY: self.DFS_5G_FREQUENCIES,
WifiEnums.WIFI_BAND_5_GHZ_WITH_DFS: self.ALL_5G_FREQUENCIES,
- WifiEnums.WIFI_BAND_BOTH: self.ALL_2G_FREQUENCIES + self.NONE_DFS_5G_FREQUENCIES,
- WifiEnums.WIFI_BAND_BOTH_WITH_DFS: self.ALL_5G_FREQUENCIES + self.ALL_2G_FREQUENCIES
+ WifiEnums.WIFI_BAND_BOTH:
+ self.ALL_2G_FREQUENCIES + self.NONE_DFS_5G_FREQUENCIES,
+ WifiEnums.WIFI_BAND_BOTH_WITH_DFS:
+ self.ALL_5G_FREQUENCIES + self.ALL_2G_FREQUENCIES
}
return _band_to_frequencies[band]
+
class WifiChannelUS(WifiChannelBase):
# US Wifi frequencies
ALL_2G_FREQUENCIES = [2412, 2417, 2422, 2427, 2432, 2437, 2442, 2447, 2452,
2457, 2462]
NONE_DFS_5G_FREQUENCIES = [5180, 5200, 5220, 5240, 5745, 5765, 5785, 5805,
5825]
- MIX_CHANNEL_SCAN = [2412, 2437, 2462, 5180, 5200, 5280, 5260, 5300,5500, 5320,
- 5520, 5560, 5700, 5745, 5805]
+ MIX_CHANNEL_SCAN = [2412, 2437, 2462, 5180, 5200, 5280, 5260, 5300, 5500,
+ 5320, 5520, 5560, 5700, 5745, 5805]
def __init__(self, model=None):
if model and trim_model_name(model) in K_DEVICES:
self.DFS_5G_FREQUENCIES = []
self.ALL_5G_FREQUENCIES = self.NONE_DFS_5G_FREQUENCIES
- self.MIX_CHANNEL_SCAN = [2412, 2437, 2462, 5180, 5200, 5240, 5745, 5765]
+ self.MIX_CHANNEL_SCAN = [2412, 2437, 2462, 5180, 5200, 5240, 5745,
+ 5765]
elif model and trim_model_name(model) in L_DEVICES:
self.DFS_5G_FREQUENCIES = [5260, 5280, 5300, 5320, 5500, 5520,
5540, 5560, 5580, 5660, 5680, 5700]
self.ALL_5G_FREQUENCIES = self.DFS_5G_FREQUENCIES + self.NONE_DFS_5G_FREQUENCIES
elif model and trim_model_name(model) in L_TAP_DEVICES:
self.DFS_5G_FREQUENCIES = [5260, 5280, 5300, 5320, 5500, 5520,
- 5540, 5560, 5580, 5660, 5680, 5700, 5720]
+ 5540, 5560, 5580, 5660, 5680, 5700,
+ 5720]
self.ALL_5G_FREQUENCIES = self.DFS_5G_FREQUENCIES + self.NONE_DFS_5G_FREQUENCIES
elif model and trim_model_name(model) in M_DEVICES:
- self.DFS_5G_FREQUENCIES = [5260, 5280, 5300, 5320, 5500, 5520, 5540, 5560,5580,
- 5600, 5620, 5640, 5660, 5680, 5700]
+ self.DFS_5G_FREQUENCIES = [5260, 5280, 5300, 5320, 5500, 5520,
+ 5540, 5560, 5580, 5600, 5620, 5640,
+ 5660, 5680, 5700]
self.ALL_5G_FREQUENCIES = self.DFS_5G_FREQUENCIES + self.NONE_DFS_5G_FREQUENCIES
else:
- self.DFS_5G_FREQUENCIES = [5260, 5280, 5300, 5320, 5500, 5520, 5540, 5560,5580,
- 5600, 5620, 5640, 5660, 5680, 5700, 5720]
+ self.DFS_5G_FREQUENCIES = [5260, 5280, 5300, 5320, 5500, 5520,
+ 5540, 5560, 5580, 5600, 5620, 5640,
+ 5660, 5680, 5700, 5720]
self.ALL_5G_FREQUENCIES = self.DFS_5G_FREQUENCIES + self.NONE_DFS_5G_FREQUENCIES
+
+def _assert_on_fail_handler(func, assert_on_fail, *args, **kwargs):
+ """Wrapper function that handles the bahevior of assert_on_fail.
+
+ When assert_on_fail is True, let all test signals through, which can
+ terminate test cases directly. When assert_on_fail is False, the wrapper
+ raises no test signals and reports operation status by returning True or
+ False.
+
+ Args:
+ func: The function to wrap. This function reports operation status by
+ raising test signals.
+ assert_on_fail: A boolean that specifies if the output of the wrapper
+ is test signal based or return value based.
+ args: Positional args for func.
+ kwargs: Name args for func.
+
+ Returns:
+ If assert_on_fail is True, returns True/False to signal operation
+ status, otherwise return nothing.
+ """
+ try:
+ func(*args, **kwargs)
+ if not assert_on_fail:
+ return True
+ except signals.TestSignal:
+ if assert_on_fail:
+ raise
+ return False
+
+
+def assert_network_in_list(target, network_list):
+ """Makes sure a specified target Wi-Fi network exists in a list of Wi-Fi
+ networks.
+
+ Args:
+ target: A dict representing a Wi-Fi network.
+ E.g. {WifiEnums.SSID_KEY: "SomeNetwork"}
+ network_list: A list of dicts, each representing a Wi-Fi network.
+ """
+ match_results = match_networks(target, network_list)
+ asserts.assert_true(
+ match_results, "Target network %s, does not exist in network list %s" %
+ (target, network_list))
+
+
def match_networks(target_params, networks):
"""Finds the WiFi networks that match a given set of parameters in a list
of WiFi networks.
@@ -435,33 +495,56 @@
results.append(n)
return results
-def wifi_toggle_state(ad, new_state=None):
+
+def wifi_toggle_state(ad, new_state=None, assert_on_fail=True):
"""Toggles the state of wifi.
Args:
ad: An AndroidDevice object.
new_state: Wifi state to set to. If None, opposite of the current state.
+ assert_on_fail: If True, error checks in this function will raise test
+ failure signals.
Returns:
- True if the toggle was successful, False otherwise.
+ If assert_on_fail is False, function returns True if the toggle was
+ successful, False otherwise. If assert_on_fail is True, no return value.
+ """
+ _assert_on_fail_handler(_wifi_toggle_state, assert_on_fail, ad, new_state)
+
+
+def _wifi_toggle_state(ad, new_state=None):
+ """Toggles the state of wifi.
+
+ TestFailure signals are raised when something goes wrong.
+
+ Args:
+ ad: An AndroidDevice object.
+ new_state: The state to set Wi-Fi to. If None, opposite of the current
+ state will be set.
"""
# Check if the new_state is already achieved, so we don't wait for the
# state change event by mistake.
if new_state == ad.droid.wifiCheckState():
- return True
+ return
ad.droid.wifiStartTrackingStateChange()
- log.info("Setting wifi state to {}".format(new_state))
+ log.info("Setting Wi-Fi state to %s on %s.", new_state, ad.serial)
ad.droid.wifiToggleState(new_state)
+ fail_msg = "Failed to set Wi-Fi state to %s on %s." % (new_state,
+ ad.serial)
try:
- event = ad.ed.pop_event(WifiEventNames.SUPPLICANT_CON_CHANGED, SHORT_TIMEOUT)
- return event['data']['Connected'] == new_state
+ event = ad.ed.pop_event(WifiEventNames.SUPPLICANT_CON_CHANGED,
+ SHORT_TIMEOUT)
+ asserts.assert_equal(event['data']['Connected'], new_state, fail_msg)
except Empty:
- # Supplicant connection event is not always reliable. We double check here
- # and call it a success as long as the new state equals the expected state.
- return new_state == ad.droid.wifiCheckState()
+ # Supplicant connection event is not always reliable. We double check
+ # here and call it a success as long as the new state equals the
+ # expected state.
+ time.wait(5)
+ asserts.assert_equal(new_state, ad.droid.wifiCheckState(), fail_msg)
finally:
ad.droid.wifiStopTrackingStateChange()
+
def reset_wifi(ad):
"""Clears all saved networks on a device.
@@ -479,9 +562,10 @@
ad.droid.wifiForgetNetwork(n['networkId'])
try:
event = ad.ed.pop_event(WifiEventNames.WIFI_FORGET_NW_SUCCESS,
- SHORT_TIMEOUT)
+ SHORT_TIMEOUT)
except Empty:
- raise WifiTestUtilsError("Failed to remove network {}.".format(n))
+ raise WifiTestUtilsError("Failed to remove network %s." % n)
+
def wifi_forget_network(ad, net_ssid):
"""Remove configured Wifi network on an android device.
@@ -503,10 +587,11 @@
droid.wifiForgetNetwork(n['networkId'])
try:
event = ed.pop_event(WifiEventNames.WIFI_FORGET_NW_SUCCESS,
- SHORT_TIMEOUT)
+ SHORT_TIMEOUT)
except Empty:
raise WifiTestUtilsError("Failed to remove network %s." % n)
+
def wifi_test_device_init(ad):
"""Initializes an android device for wifi testing.
@@ -519,18 +604,17 @@
6. Sync device time with computer time.
7. Turn off cellular data.
"""
- require_sl4a((ad,))
+ require_sl4a((ad, ))
ad.droid.wifiScannerToggleAlwaysAvailable(False)
msg = "Failed to turn off location service's scan."
- assert not ad.droid.wifiScannerIsAlwaysAvailable(), msg
- msg = "Failed to turn WiFi on %s" % ad.serial
- assert wifi_toggle_state(ad, True), msg
+ asserts.assert_true(not ad.droid.wifiScannerIsAlwaysAvailable(), msg)
+ wifi_toggle_state(ad, True)
reset_wifi(ad)
msg = "Failed to clear configured networks."
- assert not ad.droid.wifiGetConfiguredNetworks(), msg
+ asserts.assert_true(not ad.droid.wifiGetConfiguredNetworks(), msg)
ad.droid.wifiEnableVerboseLogging(1)
msg = "Failed to enable WiFi verbose logging."
- assert ad.droid.wifiGetVerboseLoggingLevel() == 1, msg
+ asserts.assert_equal(ad.droid.wifiGetVerboseLoggingLevel(), 1, msg)
ad.droid.wifiScannerToggleAlwaysAvailable(False)
# We don't verify the following settings since they are not critical.
sync_device_time(ad)
@@ -539,6 +623,7 @@
# way to check right now.
ad.adb.shell("halutil -country %s" % WifiEnums.CountryCode.US)
+
def sort_wifi_scan_results(results, key="level"):
"""Sort wifi scan results by key.
@@ -551,6 +636,7 @@
"""
return sorted(results, lambda d: (key not in d, d[key]))
+
def start_wifi_connection_scan(ad):
"""Starts a wifi connection scan and wait for results to become available.
@@ -558,7 +644,11 @@
ad: An AndroidDevice object.
"""
ad.droid.wifiStartScan()
- ad.ed.pop_event("WifiManagerScanResultsAvailable", 60)
+ try:
+ ad.ed.pop_event("WifiManagerScanResultsAvailable", 60)
+ except Empty:
+ asserts.fail("Wi-Fi results did not become available within 60s.")
+
def start_wifi_background_scan(ad, scan_setting):
"""Starts wifi background scan.
@@ -576,6 +666,7 @@
SHORT_TIMEOUT)
return event['data']
+
def start_wifi_tethering(ad, ssid, password, band=None):
"""Starts wifi tethering on an android_device.
@@ -591,9 +682,7 @@
"""
droid, ed = ad.droid, ad.ed
droid.wifiStartTrackingStateChange()
- config = {
- WifiEnums.SSID_KEY: ssid
- }
+ config = {WifiEnums.SSID_KEY: ssid}
if password:
config[WifiEnums.PWD_KEY] = password
if band:
@@ -602,10 +691,11 @@
return False
ed.pop_event("WifiManagerApEnabled", 30)
ed.wait_for_event("TetherStateChanged",
- lambda x : x["data"]["ACTIVE_TETHER"], 30)
+ lambda x: x["data"]["ACTIVE_TETHER"], 30)
droid.wifiStopTrackingStateChange()
return True
+
def stop_wifi_tethering(ad):
"""Stops wifi tethering on an android_device.
@@ -617,35 +707,80 @@
droid.wifiSetApEnabled(False, None)
ed.pop_event("WifiManagerApDisabled", 30)
ed.wait_for_event("TetherStateChanged",
- lambda x : not x["data"]["ACTIVE_TETHER"], 30)
+ lambda x: not x["data"]["ACTIVE_TETHER"], 30)
droid.wifiStopTrackingStateChange()
-def wifi_connect(ad, network):
+
+def wifi_connect(ad, network, num_of_tries=1, assert_on_fail=True):
"""Connect an Android device to a wifi network.
Initiate connection to a wifi network, wait for the "connected" event, then
confirm the connected ssid is the one requested.
+ This will directly fail a test if anything goes wrong.
+
Args:
ad: android_device object to initiate connection on.
network: A dictionary representing the network to connect to. The
- dictionary must have the key "SSID".
+ dictionary must have the key "SSID".
+ num_of_tries: An integer that is the number of times to try before
+ delaring failure. Default is 1.
+ assert_on_fail: If True, error checks in this function will raise test
+ failure signals.
+
+ Returns:
+ If assert_on_fail is False, function returns True if the toggle was
+ successful, False otherwise. If assert_on_fail is True, no return value.
"""
- assert WifiEnums.SSID_KEY in network, ("Key '%s' must be present in "
- "network definition.") % WifiEnums.SSID_KEY
+ _assert_on_fail_handler(_wifi_connect, assert_on_fail, ad, network,
+ num_of_tries)
+
+
+def _wifi_connect(ad, network, num_of_tries=1):
+ """Connect an Android device to a wifi network.
+
+ Initiate connection to a wifi network, wait for the "connected" event, then
+ confirm the connected ssid is the one requested.
+
+ This will directly fail a test if anything goes wrong.
+
+ Args:
+ ad: android_device object to initiate connection on.
+ network: A dictionary representing the network to connect to. The
+ dictionary must have the key "SSID".
+ num_of_tries: An integer that is the number of times to try before
+ delaring failure. Default is 1.
+ """
+ serial = ad.serial
+ asserts.assert_true(WifiEnums.SSID_KEY in network,
+ "Key '%s' must be present in network definition." %
+ WifiEnums.SSID_KEY)
ad.droid.wifiStartTrackingStateChange()
try:
- assert ad.droid.wifiConnect(network), "WiFi connect returned false."
- connect_result = ad.ed.pop_event(WifiEventNames.WIFI_CONNECTED)
- log.debug("Connection result: %s." % connect_result)
+ asserts.assert_true(
+ ad.droid.wifiConnect(network),
+ "Wi-Fi connect returned false on %s." % serial)
+ connect_result = None
+ for i in range(num_of_tries):
+ try:
+ connect_result = ad.ed.pop_event(WifiEventNames.WIFI_CONNECTED,
+ 30)
+ break
+ except Empty:
+ pass
+ asserts.assert_true(connect_result,
+ "Failed to connect to Wi-Fi network %s on %s" %
+ (network, serial))
+ log.debug("Connection result on %s: %s.", serial, connect_result)
expected_ssid = network[WifiEnums.SSID_KEY]
actual_ssid = connect_result['data'][WifiEnums.SSID_KEY]
- assert actual_ssid == expected_ssid, ("Expected to connect to %s, "
- "connected to %s") % (expected_ssid, actual_ssid)
- log.info("Successfully connected to %s" % actual_ssid)
+ asserts.assert_equal(actual_ssid, expected_ssid,
+ "Connected to the wrong network on %s." % serial)
+ log.info("Connected to Wi-Fi network %s on %s", actual_ssid, serial)
finally:
ad.droid.wifiStopTrackingStateChange()
+
def start_wifi_single_scan(ad, scan_setting):
"""Starts wifi single shot scan.
@@ -656,13 +791,12 @@
Returns:
If scan was started successfully, event data of success event is returned.
"""
- droid, ed = ad.droid, ad.ed
- idx = droid.wifiScannerStartScan(scan_setting)
- event = ed.pop_event("WifiScannerScan{}onSuccess".format(idx),
- SHORT_TIMEOUT)
- log.debug("event {}".format(event))
+ idx = ad.droid.wifiScannerStartScan(scan_setting)
+ event = ad.ed.pop_event("WifiScannerScan%sonSuccess" % idx, SHORT_TIMEOUT)
+ log.debug("Got event %s", event)
return event['data']
+
def track_connection(ad, network_ssid, check_connection_count):
"""Track wifi connection to network changes for given number of counts
@@ -670,9 +804,8 @@
ad: android_device object for forget network.
network_ssid: network ssid to which connection would be tracked
check_connection_count: Integer for maximum number network connection
- check.
+ check.
Returns:
-
True if connection to given network happen, else return False.
"""
droid, ed = ad.droid, ad.ed
@@ -680,13 +813,14 @@
while check_connection_count > 0:
connect_network = ed.pop_event("WifiNetworkConnected", 120)
log.info("connect_network {}".format(connect_network))
- if (WifiEnums.SSID_KEY in connect_network['data']
- and connect_network['data'][WifiEnums.SSID_KEY] == network_ssid):
- return True
+ if (WifiEnums.SSID_KEY in connect_network['data'] and
+ connect_network['data'][WifiEnums.SSID_KEY] == network_ssid):
+ return True
check_connection_count -= 1
droid.wifiStopTrackingStateChange()
return False
+
def get_scan_time_and_channels(wifi_chs, scan_setting, stime_channel):
"""Calculate the scan time required based on the band or channels in scan
setting
@@ -709,9 +843,10 @@
scan_time = len(scan_channels) * stime_channel
for channel in scan_channels:
if channel in WifiEnums.DFS_5G_FREQUENCIES:
- scan_time += 132 #passive scan time on DFS
+ scan_time += 132 #passive scan time on DFS
return scan_time, scan_channels
+
def start_wifi_track_bssid(ad, track_setting):
"""Start tracking Bssid for the given settings.
@@ -724,13 +859,12 @@
"""
droid, ed = ad.droid, ad.ed
idx = droid.wifiScannerStartTrackingBssids(
- track_setting["bssidInfos"],
- track_setting["apLostThreshold"]
- )
+ track_setting["bssidInfos"], track_setting["apLostThreshold"])
event = ed.pop_event("WifiScannerBssid{}onSuccess".format(idx),
SHORT_TIMEOUT)
return event['data']
+
def convert_pem_key_to_pkcs8(in_file, out_file):
"""Converts the key file generated by us to the format required by
Android using openssl.
@@ -743,12 +877,14 @@
out_file: The full path to the converted key file, including
filename.
"""
- assert in_file.endswith(".pem")
- assert out_file.endswith(".der")
+ asserts.assert_true(in_file.endswith(".pem"), "Input file has to be .pem.")
+ asserts.assert_true(
+ out_file.endswith(".der"), "Output file has to be .der.")
cmd = ("openssl pkcs8 -inform PEM -in {} -outform DER -out {} -nocrypt"
" -topk8").format(in_file, out_file)
exe_cmd(cmd)
+
def check_internet_connection(ad, ping_addr):
"""Validate internet connection by pinging the address provided.
@@ -761,9 +897,10 @@
"""
droid, ed = ad.droid, ad.ed
ping = droid.httpPing(ping_addr)
- log.info("Http ping result: {}".format(ping))
+ log.info("Http ping result: %s.", ping)
return ping
+
#TODO(angli): This can only verify if an actual value is exactly the same.
# Would be nice to be able to verify an actual value is one of serveral.
def verify_wifi_connection_info(ad, expected_con):
@@ -776,13 +913,13 @@
"""
current_con = ad.droid.wifiGetConnectionInfo()
case_insensitive = ["BSSID", "supplicant_state"]
- log.debug("Current connection: %s" % current_con)
+ log.debug("Current connection: %s", current_con)
for k, expected_v in expected_con.items():
# Do not verify authentication related fields.
if k == "password":
continue
- msg = "Field %s does not exist in wifi connection info %s." % (k,
- current_con)
+ msg = "Field %s does not exist in wifi connection info %s." % (
+ k, current_con)
if k not in current_con:
raise signals.TestFailure(msg)
actual_v = current_con[k]
@@ -790,11 +927,16 @@
actual_v = actual_v.lower()
expected_v = expected_v.lower()
msg = "Expected %s to be %s, actual %s is %s." % (k, expected_v, k,
- actual_v)
+ actual_v)
if actual_v != expected_v:
raise signals.TestFailure(msg)
-def eap_connect(config, ad, validate_con=True, ping_addr=DEFAULT_PING_ADDR):
+
+def eap_connect(config,
+ ad,
+ validate_con=True,
+ ping_addr=DEFAULT_PING_ADDR,
+ assert_on_fail=True):
"""Connects to an enterprise network and verify connection.
This logic expect the enterprise network to have Internet access.
@@ -808,43 +950,58 @@
Returns:
True if the connection is successful and Internet access works.
"""
+ _assert_on_fail_handler(_eap_connect, assert_on_fail, config, ad,
+ validate_con, ping_addr)
+
+
+def _eap_connect(config, ad, validate_con=True, ping_addr=DEFAULT_PING_ADDR):
+ """Connects to an enterprise network and verify connection.
+
+ This logic expect the enterprise network to have Internet access.
+
+ Args:
+ config: A dict representing a wifi enterprise configuration.
+ ad: The android_device to operate with.
+ validate_con: If True, validate Internet connection after connecting to
+ the network.
+ """
droid, ed = ad.droid, ad.ed
+ serial = ad.serial
start_wifi_connection_scan(ad)
expect_ssid = None
if WifiEnums.SSID_KEY in config:
expect_ssid = config[WifiEnums.SSID_KEY]
- log.info("Connecting to %s." % expect_ssid)
+ log.info("Connecting to %s on %s.", expect_ssid, serial)
else:
- log.info("Connecting.")
- log.debug(pprint.pformat(config, indent=4))
+ log.info("Connecting on %s.", serial)
ad.droid.wifiEnterpriseConnect(config)
try:
event = ed.pop_event("WifiManagerEnterpriseConnectOnSuccess", 30)
- log.info("Started connecting...")
+ log.info("Connection started on %s", serial)
+ except Empty:
+ asserts.fail("Failed to start connection process to %s on %s" %
+ (config, serial))
+ try:
event = ed.pop_event(WifiEventNames.WIFI_CONNECTED, 60)
except Empty:
- log.info("Failed to connect.")
- return False
+ asserts.fail("Failed to connect to %s on %s." % (config, serial))
log.debug(event)
if expect_ssid:
actual_ssid = event["data"][WifiEnums.SSID_KEY]
- msg = "Expected SSID {}, got {}".format(expect_ssid, actual_ssid)
- if expect_ssid != actual_ssid:
- log.error(msg)
- return False
- log.info("Connected to %s." % expect_ssid)
+ asserts.assert_equal(expect_ssid, actual_ssid)
+ log.info("Connected to %s on %s.", expect_ssid, serial)
else:
- log.info("Connected successfully.")
+ log.info("Connected successfully on %s.", serial)
if validate_con:
log.info("Checking Internet access.")
# Wait for data connection to stabilize.
time.sleep(4)
ping = ad.droid.httpPing(ping_addr)
- log.info("Http ping result: {}".format(ping))
- if not ping:
- log.error("No Internet access.")
- return False
- return True
+ log.info("Http ping result: %s on %s", ping, serial)
+ asserts.assert_true(ping,
+ "No Internet access on device %s on network %s." %
+ (serial, config))
+
def expand_enterprise_config_by_phase2(config):
"""Take an enterprise config and generate a list of configs, each with
@@ -864,7 +1021,7 @@
for phase2_type in phase2_types:
# Skip a special case for passpoint TTLS.
if (WifiEnums.Enterprise.FQDN in config and
- phase2_type == WifiEnums.EapPhase2.GTC):
+ phase2_type == WifiEnums.EapPhase2.GTC):
continue
c = dict(config)
c[WifiEnums.Enterprise.PHASE2] = phase2_type
diff --git a/acts/tests/google/wifi/WifiEnterpriseRoamingTest.py b/acts/tests/google/wifi/WifiEnterpriseRoamingTest.py
index 33c43f5..84884c9 100644
--- a/acts/tests/google/wifi/WifiEnterpriseRoamingTest.py
+++ b/acts/tests/google/wifi/WifiEnterpriseRoamingTest.py
@@ -199,16 +199,12 @@
WifiEnums.BSSID_KEY: self.bssid_b,
}
self.set_attns("a_on_b_off")
- asserts.assert_true(
- wutils.eap_connect(config, self.dut, validate_con=False),
- "Failed to connect to %s" % config
- )
+ wutils.eap_connect(config, self.dut, validate_con=False)
wutils.verify_wifi_connection_info(self.dut, expected_con_to_a)
self.log.info("Roaming from %s to %s" % (self.bssid_a, self.bssid_b))
self.trigger_roaming_and_validate("b_on_a_off", expected_con_to_b)
self.log.info("Roaming from %s to %s" % (self.bssid_b, self.bssid_a))
self.trigger_roaming_and_validate("a_on_b_off", expected_con_to_a)
- return True
""" Tests Begin """
@acts.signals.generated_test
diff --git a/acts/tests/google/wifi/WifiEnterpriseTest.py b/acts/tests/google/wifi/WifiEnterpriseTest.py
index 3987e43..433816e 100755
--- a/acts/tests/google/wifi/WifiEnterpriseTest.py
+++ b/acts/tests/google/wifi/WifiEnterpriseTest.py
@@ -18,25 +18,23 @@
import random
import time
-import acts.base_test
-import acts.signals
-import acts.test_utils.wifi.wifi_test_utils as wutils
-
from acts import asserts
+from acts import base_test
+from acts import signals
+from acts.test_utils.wifi import wifi_test_utils as wutils
WifiEnums = wutils.WifiEnums
# EAP Macros
EAP = WifiEnums.Eap
EapPhase2 = WifiEnums.EapPhase2
-
# Enterprise Config Macros
Ent = WifiEnums.Enterprise
-class WifiEnterpriseTest(acts.base_test.BaseTestClass):
+class WifiEnterpriseTest(base_test.BaseTestClass):
def __init__(self, controllers):
- acts.base_test.BaseTestClass.__init__(self, controllers)
+ base_test.BaseTestClass.__init__(self, controllers)
self.tests = (
"test_eap_connect",
"test_eap_connect_negative",
@@ -60,29 +58,32 @@
"fqdn",
"provider_friendly_name",
"realm",
- "ssid_peap",
+ "ssid_peap0",
+ "ssid_peap1",
"ssid_tls",
"ssid_ttls",
+ "ssid_pwd",
"ssid_sim",
+ "ssid_aka",
+ "ssid_aka_prime",
"ssid_passpoint",
"device_password",
"ping_addr"
)
- optional_userparam_names = (
- "roaming_consortium_ids",
- "plmn"
- )
self.unpack_userparams(required_userparam_names,
- opt_param_names = optional_userparam_names)
+ roaming_consortium_ids=None,
+ plmn=None)
# Default configs for EAP networks.
- self.config_peap = {
+ self.config_peap0 = {
Ent.EAP: EAP.PEAP,
Ent.CA_CERT: self.ca_cert,
Ent.IDENTITY: self.eap_identity,
Ent.PASSWORD: self.eap_password,
Ent.PHASE2: EapPhase2.MSCHAPV2,
- WifiEnums.SSID_KEY: self.ssid_peap
+ WifiEnums.SSID_KEY: self.ssid_peap0
}
+ self.config_peap1 = dict(self.config_peap0)
+ self.config_peap1[WifiEnums.SSID_KEY] = self.ssid_peap1
self.config_tls = {
Ent.EAP: EAP.TLS,
Ent.CA_CERT: self.ca_cert,
@@ -99,10 +100,24 @@
Ent.PHASE2: EapPhase2.MSCHAPV2,
WifiEnums.SSID_KEY: self.ssid_ttls
}
+ self.config_pwd = {
+ Ent.EAP: EAP.PWD,
+ Ent.IDENTITY: self.eap_identity,
+ Ent.PASSWORD: self.eap_password,
+ WifiEnums.SSID_KEY: self.ssid_pwd
+ }
self.config_sim = {
Ent.EAP: EAP.SIM,
WifiEnums.SSID_KEY: self.ssid_sim,
}
+ self.config_aka = {
+ Ent.EAP: EAP.AKA,
+ WifiEnums.SSID_KEY: self.ssid_aka,
+ }
+ self.config_aka_prime = {
+ Ent.EAP: EAP.AKA_PRIME,
+ WifiEnums.SSID_KEY: self.ssid_aka_prime,
+ }
# Base config for passpoint networks.
self.config_passpoint = {
@@ -111,9 +126,9 @@
Ent.REALM: self.realm,
Ent.CA_CERT: self.passpoint_ca_cert
}
- if hasattr(self, "plmn"):
+ if self.plmn:
self.config_passpoint[Ent.PLMN] = self.plmn
- if hasattr(self, "roaming_consortium_ids"):
+ if self.roaming_consortium_ids:
self.config_passpoint[Ent.ROAMING_IDS] = self.roaming_consortium_ids
# Default configs for passpoint networks.
@@ -160,10 +175,9 @@
Returns:
True if connection failed as expected, False otherwise.
"""
- verdict = wutils.eap_connect(config, ad)
- asserts.assert_true(not verdict, "Connection should have failed.")
- self.log.info("Connection failed as expected.")
- return True
+ with asserts.assert_raises(signals.TestFailure, extras=config):
+ verdict = wutils.eap_connect(config, ad)
+ asserts.explicit_pass("Connection failed as expected.")
def expand_config_by_phase2(self, config):
"""Take an enterprise config and generate a list of configs, each with
@@ -191,10 +205,14 @@
Returns:
A list of dicts each representing an EAP configuration.
"""
- configs = [self.config_tls]
- # self.config_sim
+ configs = [self.config_tls,
+ self.config_pwd,
+ self.config_sim,
+ self.config_aka,
+ self.config_aka_prime]
configs += wutils.expand_enterprise_config_by_phase2(self.config_ttls)
- configs += wutils.expand_enterprise_config_by_phase2(self.config_peap)
+ configs += wutils.expand_enterprise_config_by_phase2(self.config_peap0)
+ configs += wutils.expand_enterprise_config_by_phase2(self.config_peap1)
return configs
def gen_passpoint_configs(self):
@@ -294,7 +312,12 @@
Returns:
A string representing the name of a generated EAP test case.
"""
- name = "test_connect-%s" % config[Ent.EAP].name
+ eap_name = config[Ent.EAP].name
+ if "peap0" in config[WifiEnums.SSID_KEY].lower():
+ eap_name = "PEAP0"
+ if "peap1" in config[WifiEnums.SSID_KEY].lower():
+ eap_name = "PEAP1"
+ name = "test_connect-%s" % eap_name
if Ent.PHASE2 in config:
name += "-{}".format(config[Ent.PHASE2].name)
return name
@@ -317,7 +340,7 @@
return name
"""Tests"""
- @acts.signals.generated_test
+ @signals.generated_test
def test_eap_connect(self):
"""Test connecting to enterprise networks of different authentication
types.
@@ -346,9 +369,9 @@
name_func=self.gen_eap_test_name)
msg = ("The following configs failed EAP connect test: %s" %
pprint.pformat(failed))
- asserts.assert_true(len(failed) == 0, msg)
+ asserts.assert_equal(len(failed), 0, msg)
- @acts.signals.generated_test
+ @signals.generated_test
def test_eap_connect_negative(self):
"""Test connecting to enterprise networks.
@@ -373,9 +396,9 @@
name_func=name_gen)
msg = ("The following configs failed negative EAP connect test: %s" %
pprint.pformat(failed))
- asserts.assert_true(len(failed) == 0, msg)
+ asserts.assert_equal(len(failed), 0, msg)
- @acts.signals.generated_test
+ @signals.generated_test
def test_passpoint_connect(self):
"""Test connecting to enterprise networks of different authentication
types with passpoint support.
@@ -405,9 +428,9 @@
name_func=self.gen_passpoint_test_name)
msg = ("The following configs failed passpoint connect test: %s" %
pprint.pformat(failed))
- asserts.assert_true(len(failed) == 0, msg)
+ asserts.assert_equal(len(failed), 0, msg)
- @acts.signals.generated_test
+ @signals.generated_test
def test_passpoint_connect_negative(self):
"""Test connecting to enterprise networks.
@@ -434,4 +457,4 @@
name_func=name_gen)
msg = ("The following configs failed negative passpoint connect test: "
"%s") % pprint.pformat(failed)
- asserts.assert_true(len(failed) == 0, msg)
+ asserts.assert_equal(len(failed), 0, msg)
diff --git a/acts/tests/google/wifi/WifiManagerTest.py b/acts/tests/google/wifi/WifiManagerTest.py
index 81a6303..78b2b0f 100755
--- a/acts/tests/google/wifi/WifiManagerTest.py
+++ b/acts/tests/google/wifi/WifiManagerTest.py
@@ -28,23 +28,18 @@
WifiEnums = wutils.WifiEnums
WifiEventNames = wutils.WifiEventNames
-class WifiManagerTest(acts.base_test.BaseTestClass):
+class WifiManagerTest(acts.base_test.BaseTestClass):
def setup_class(self):
self.dut = self.android_devices[0]
wutils.wifi_test_device_init(self.dut)
- req_params = (
- "iot_networks",
- "open_network",
- "iperf_server_address",
- "tdls_models",
- "energy_info_models"
- )
+ req_params = ("iot_networks", "open_network", "iperf_server_address",
+ "tdls_models", "energy_info_models")
self.unpack_userparams(req_params)
- asserts.assert_true(len(self.iot_networks) > 0,
+ asserts.assert_true(
+ len(self.iot_networks) > 0,
"Need at least one iot network with psk.")
- asserts.assert_true(wutils.wifi_toggle_state(self.dut, True),
- "Failed to turn on wifi before tests.")
+ wutils.wifi_toggle_state(self.dut, True)
self.iot_networks = self.iot_networks + [self.open_network]
self.iperf_server = self.iperf_servers[0]
@@ -63,6 +58,7 @@
self.dut.cat_adb_log(test_name, begin_time)
"""Helper Functions"""
+
def connect_to_wifi_network_with_password(self, params):
"""Connection logic for open and psk wifi networks.
@@ -76,62 +72,51 @@
Returns:
True if successful, False otherwise.
"""
- result = False
wait_time = 5
network, ad = params
droid = ad.droid
ed = ad.ed
SSID = network[WifiEnums.SSID_KEY]
- try:
- ed.clear_all_events()
- wutils.start_wifi_connection_scan(ad)
- droid.wifiStartTrackingStateChange()
- asserts.assert_true(droid.wifiConnect(network),
- "wifi connect returned false.")
- connect_result = ed.pop_event(WifiEventNames.WIFI_CONNECTED)
- self.log.debug(connect_result)
- result = connect_result['data'][WifiEnums.SSID_KEY] == SSID
- if result:
- self.log.info("Starting iperf traffic through {}".format(SSID))
- time.sleep(wait_time)
- port_arg = "-p {}".format(self.iperf_server.port)
- result, data = ad.run_iperf_client(self.iperf_server_address,
- port_arg)
- self.log.debug(pprint.pformat(data))
- except queue.Empty:
- self.log.exception("Failed to connect to {}".format(SSID))
- finally:
- droid.wifiStopTrackingStateChange()
- return result
+ ed.clear_all_events()
+ wutils.start_wifi_connection_scan(ad)
+ scan_results = droid.wifiGetScanResults()
+ wutils.assert_network_in_list({WifiEnums.SSID_KEY: SSID}, scan_results)
+ wutils.wifi_connect(ad, network, num_of_tries=3)
+ self.log.info("Starting iperf traffic through {}".format(SSID))
+ time.sleep(wait_time)
+ port_arg = "-p {}".format(self.iperf_server.port)
+ success, data = ad.run_iperf_client(self.iperf_server_address,
+ port_arg)
+ self.log.debug(pprint.pformat(data))
+ asserts.assert_true(success, "Error occurred in iPerf traffic.")
def run_iperf(self, iperf_args):
if "iperf_server_address" not in self.user_params:
self.log.error(("Missing iperf_server_address. "
- "Provide one in config."))
+ "Provide one in config."))
else:
iperf_addr = self.user_params["iperf_server_address"]
self.log.info("Running iperf client.")
- result, data = self.dut.run_iperf_client(iperf_addr,
- iperf_args)
+ result, data = self.dut.run_iperf_client(iperf_addr, iperf_args)
self.log.debug(data)
def run_iperf_rx_tx(self, time, omit=10):
args = "-p {} -t {} -O 10".format(self.iperf_server.port, time, omit)
self.log.info("Running iperf client {}".format(args))
self.run_iperf(args)
- args = "-p {} -t {} -O 10 -R".format(self.iperf_server.port, time, omit)
+ args = "-p {} -t {} -O 10 -R".format(self.iperf_server.port, time,
+ omit)
self.log.info("Running iperf client {}".format(args))
self.run_iperf(args)
"""Tests"""
+
def test_toggle_state(self):
"""Test toggling wifi"""
self.log.debug("Going from on to off.")
- asserts.assert_true(wutils.wifi_toggle_state(self.dut, False),
- "Failed to turn wifi off.")
+ wutils.wifi_toggle_state(self.dut, False)
self.log.debug("Going from off to on.")
- asserts.assert_true(wutils.wifi_toggle_state(self.dut, True),
- "Failed to turn wifi on.")
+ wutils.wifi_toggle_state(self.dut, True)
def test_toggle_with_screen(self):
"""Test toggling wifi with screen on/off"""
@@ -142,12 +127,10 @@
time.sleep(wait_time)
self.log.debug("Going from on to off.")
try:
- asserts.assert_true(wutils.wifi_toggle_state(self.dut, False),
- "Failed to turn wifi off.")
+ wutils.wifi_toggle_state(self.dut, False)
time.sleep(wait_time)
self.log.debug("Going from off to on.")
- asserts.assert_true(wutils.wifi_toggle_state(self.dut, True),
- "Failed to turn wifi on.")
+ wutils.wifi_toggle_state(self.dut, True)
finally:
self.dut.droid.wakeLockRelease()
time.sleep(wait_time)
@@ -159,11 +142,9 @@
self.log.debug("Start regular wifi scan.")
wutils.start_wifi_connection_scan(self.dut)
wifi_results = self.dut.droid.wifiGetScanResults()
- self.log.debug("Scan results: %s" % wifi_results)
+ self.log.debug("Scan results: %s", wifi_results)
ssid = self.open_network[WifiEnums.SSID_KEY]
- condition = {WifiEnums.SSID_KEY: ssid}
- asserts.assert_true(wutils.match_networks(condition, wifi_results),
- "Can not find expected network %s" % ssid)
+ wutils.assert_network_in_list({WifiEnums.SSID_KEY: ssid}, wifi_results)
def test_add_network(self):
"""Test wifi connection scan."""
@@ -173,10 +154,8 @@
configured_networks = self.dut.droid.wifiGetConfiguredNetworks()
self.log.debug(("Configured networks after adding: %s" %
configured_networks))
- condition = {WifiEnums.SSID_KEY: ssid}
- asserts.assert_true(wutils.match_networks(condition, configured_networks),
- ("Could not find expected network %s in configured "
- "networks.") % ssid)
+ wutils.assert_network_in_list(
+ {WifiEnums.SSID_KEY: ssid}, configured_networks)
def test_forget_network(self):
self.test_add_network()
@@ -184,13 +163,15 @@
wutils.wifi_forget_network(self.dut, ssid)
configured_networks = self.dut.droid.wifiGetConfiguredNetworks()
for nw in configured_networks:
- asserts.assert_true(nw[WifiEnums.BSSID_KEY] != ssid,
+ asserts.assert_true(
+ nw[WifiEnums.BSSID_KEY] != ssid,
"Found forgotten network %s in configured networks." % ssid)
@acts.signals.generated_test
def test_iot_with_password(self):
- params = list(itertools.product(self.iot_networks, self.android_devices))
- name_gen = lambda p : "test_connection_to-%s" % p[0][WifiEnums.SSID_KEY]
+ params = list(itertools.product(self.iot_networks,
+ self.android_devices))
+ name_gen = lambda p: "test_connection_to-%s" % p[0][WifiEnums.SSID_KEY]
failed = self.run_generated_testcases(
self.connect_to_wifi_network_with_password,
params,
@@ -201,13 +182,13 @@
model = acts.utils.trim_model_name(self.dut.model)
self.log.debug("Model is %s" % model)
if model in self.tdls_models:
- asserts.assert_true(self.dut.droid.wifiIsTdlsSupported(),
- ("TDLS should be supported on %s, but device is "
- "reporting not supported.") % model)
+ asserts.assert_true(self.dut.droid.wifiIsTdlsSupported(), (
+ "TDLS should be supported on %s, but device is "
+ "reporting not supported.") % model)
else:
- asserts.assert_true(not self.dut.droid.wifiIsTdlsSupported(),
- ("TDLS should not be supported on %s, but device "
- "is reporting supported.") % model)
+ asserts.assert_true(not self.dut.droid.wifiIsTdlsSupported(), (
+ "TDLS should not be supported on %s, but device "
+ "is reporting supported.") % model)
def test_energy_info(self):
"""Verify the WiFi energy info reporting feature.
@@ -227,11 +208,12 @@
model = self.dut.model
expected_support = model in self.energy_info_models
msg = "Expect energy info support to be %s on %s, got %s." % (
- expected_support, model, actual_support)
+ expected_support, model, actual_support)
asserts.assert_true(actual_support == expected_support, msg)
if not actual_support:
- asserts.skip(("Device %s does not support energy info reporting as "
- "expected.") % model)
+ asserts.skip(
+ ("Device %s does not support energy info reporting as "
+ "expected.") % model)
# Verify reported values don't decrease.
self.log.info(("Device %s supports energy info reporting, verify that "
"the reported values don't decrease.") % model)
@@ -243,12 +225,12 @@
new_energy = info["ControllerEnergyUsed"]
new_idle_time = info["ControllerIdleTimeMillis"]
asserts.assert_true(new_energy >= energy,
- "Energy value decreased: previous %d, now %d" % (energy,
- new_energy))
+ "Energy value decreased: previous %d, now %d" %
+ (energy, new_energy))
energy = new_energy
asserts.assert_true(new_idle_time >= idle_time,
- "Idle time decreased: previous %d, now %d" % (idle_time,
- new_idle_time))
+ "Idle time decreased: previous %d, now %d" % (
+ idle_time, new_idle_time))
idle_time = new_idle_time
wutils.start_wifi_connection_scan(self.dut)
diff --git a/acts/tests/google/wifi/WifiNanManagerTest.py b/acts/tests/google/wifi/WifiNanManagerTest.py
index 6dc9e80..59d7772 100644
--- a/acts/tests/google/wifi/WifiNanManagerTest.py
+++ b/acts/tests/google/wifi/WifiNanManagerTest.py
@@ -27,32 +27,25 @@
ON_MESSAGE_TX_FAIL = "WifiNanSessionOnMessageSendFail"
ON_MESSAGE_TX_OK = "WifiNanSessionOnMessageSendSuccess"
+
class WifiNanManagerTest(base_test.BaseTestClass):
def setup_class(self):
self.publisher = self.android_devices[0]
self.subscriber = self.android_devices[1]
- required_params = (
- "config_request1",
- "config_request2",
- "publish_data",
- "publish_settings",
- "subscribe_data",
- "subscribe_settings"
- )
+ required_params = ("config_request1", "config_request2",
+ "publish_data", "publish_settings",
+ "subscribe_data", "subscribe_settings")
self.unpack_userparams(required_params)
self.msg_id = 10
def setup_test(self):
- asserts.assert_true(wutils.wifi_toggle_state(self.publisher, True),
- "Failed enabling Wi-Fi interface on publisher")
- asserts.assert_true(wutils.wifi_toggle_state(self.subscriber, True),
- "Failed enabling Wi-Fi interface on subscriber")
+ wutils.wifi_toggle_state(self.publisher, True)
+ wutils.wifi_toggle_state(self.subscriber, True)
- # def teardown_class(self): (b/27692829)
- # asserts.assert_true(wutils.wifi_toggle_state(self.publisher, False),
- # "Failed disabling Wi-Fi interface on publisher")
- # asserts.assert_true(wutils.wifi_toggle_state(self.subscriber, False),
- # "Failed disabling Wi-Fi interface on subscriber")
+ # (b/27692829)
+ # def teardown_class(self):
+ # wutils.wifi_toggle_state(self.publisher, False)
+ # wutils.wifi_toggle_state(self.subscriber, False)
def reliable_tx(self, device, session_id, peer, msg):
num_tries = 0
@@ -107,7 +100,7 @@
self.log.info('%s: %s' % (ON_IDENTITY_CHANGED, event['data']))
except queue.Empty:
asserts.fail('Timed out while waiting for %s on Publisher' %
- ON_IDENTITY_CHANGED)
+ ON_IDENTITY_CHANGED)
self.log.debug(event)
try:
@@ -115,7 +108,7 @@
self.log.info('%s: %s' % (ON_IDENTITY_CHANGED, event['data']))
except queue.Empty:
asserts.fail('Timed out while waiting for %s on Subscriber' %
- ON_IDENTITY_CHANGED)
+ ON_IDENTITY_CHANGED)
self.log.debug(event)
pub_id = self.publisher.droid.wifiNanPublish(0, self.publish_config)
@@ -126,37 +119,38 @@
event = self.subscriber.ed.pop_event(ON_MATCH, 30)
self.log.info('%s: %s' % (ON_MATCH, event['data']))
except queue.Empty:
- asserts.fail('Timed out while waiting for %s on Subscriber'
- % ON_MATCH)
+ asserts.fail('Timed out while waiting for %s on Subscriber' %
+ ON_MATCH)
self.log.debug(event)
- asserts.assert_true(self.reliable_tx(self.subscriber, sub_id,
- event['data']['peerId'],
- sub2pub_msg),
- "Failed to transmit from subscriber")
+ asserts.assert_true(
+ self.reliable_tx(self.subscriber, sub_id, event['data']['peerId'],
+ sub2pub_msg),
+ "Failed to transmit from subscriber")
try:
event = self.publisher.ed.pop_event(ON_MESSAGE_RX, 10)
self.log.info('%s: %s' % (ON_MESSAGE_RX, event['data']))
- asserts.assert_true(event['data']['messageAsString'] == sub2pub_msg,
- "Subscriber -> publisher message corrupted")
+ asserts.assert_equal(
+ event['data']['messageAsString'], sub2pub_msg,
+ "Subscriber -> publisher message corrupted")
except queue.Empty:
asserts.fail('Timed out while waiting for %s on publisher' %
- ON_MESSAGE_RX)
+ ON_MESSAGE_RX)
- asserts.assert_true(self.reliable_tx(self.publisher, pub_id,
- event['data']['peerId'],
- pub2sub_msg),
- "Failed to transmit from publisher")
+ asserts.assert_true(
+ self.reliable_tx(self.publisher, pub_id, event['data']['peerId'],
+ pub2sub_msg), "Failed to transmit from publisher")
try:
event = self.subscriber.ed.pop_event(ON_MESSAGE_RX, 10)
self.log.info('%s: %s' % (ON_MESSAGE_RX, event['data']))
- asserts.assert_true(event['data']['messageAsString'] == pub2sub_msg,
- "Publisher -> subscriber message corrupted")
+ asserts.assert_equal(
+ event['data']['messageAsString'], pub2sub_msg,
+ "Publisher -> subscriber message corrupted")
except queue.Empty:
asserts.fail('Timed out while waiting for %s on subscriber' %
- ON_MESSAGE_RX)
+ ON_MESSAGE_RX)
self.publisher.droid.wifiNanTerminateSession(pub_id)
self.subscriber.droid.wifiNanTerminateSession(sub_id)
diff --git a/acts/tests/google/wifi/WifiNativeTest.py b/acts/tests/google/wifi/WifiNativeTest.py
index 9cb3a91..aa90763 100644
--- a/acts/tests/google/wifi/WifiNativeTest.py
+++ b/acts/tests/google/wifi/WifiNativeTest.py
@@ -14,29 +14,26 @@
# License for the specific language governing permissions and limitations under
# the License.
-import acts.base_test as base_test
+from acts import base_test
+from acts.controllers import native_android_device
class WifiNativeTest(base_test.BaseTestClass):
- tests = None
- def __init__(self, controllers):
- base_test.BaseTestClass.__init__(self, controllers)
- self.device = self.native_android_devices[0]
- self.tests = (
- "test_hal_get_features",
- )
+ def setup_class(self):
+ self.native_devices = self.register_controller(native_android_device)
+ self.dut = self.native_devices[0]
def setup_test(self):
# TODO: uncomment once wifi_toggle_state (or alternative)
# work with sl4n
# assert wutils.wifi_toggle_state(self.device, True)
- return self.device.droid.WifiInit()
+ return self.dut.droid.WifiInit()
# TODO: uncomment once wifi_toggle_state (or alternative)
# work with sl4n
-# def teardown_class(self):
+# def teardown_test(self):
# assert wutils.wifi_toggle_state(self.device, False)
def test_hal_get_features(self):
- result = self.device.droid.WifiGetSupportedFeatureSet()
- self.log.info("Wi-Fi feature set: {}".format(result))
+ result = self.dut.droid.WifiGetSupportedFeatureSet()
+ self.log.info("Wi-Fi feature set: %d", result)
diff --git a/acts/tests/google/wifi/WifiPowerTest.py b/acts/tests/google/wifi/WifiPowerTest.py
index 4940944..ca7f437 100755
--- a/acts/tests/google/wifi/WifiPowerTest.py
+++ b/acts/tests/google/wifi/WifiPowerTest.py
@@ -23,11 +23,10 @@
from acts.controllers import monsoon
from acts.test_utils.wifi import wifi_test_utils as wutils
-
pmc_base_cmd = ("am broadcast -a com.android.pmc.action.AUTOPOWER --es"
" PowerAction ")
start_pmc_cmd = ("am start -n com.android.pmc/com.android.pmc."
- "PMCMainActivity")
+ "PMCMainActivity")
pmc_interval_cmd = ("am broadcast -a com.android.pmc.action.SETPARAMS --es "
"Interval %s ")
pmc_set_params = "am broadcast -a com.android.pmc.action.SETPARAMS --es "
@@ -44,7 +43,6 @@
class WifiPowerTest(acts.base_test.BaseTestClass):
-
def setup_class(self):
self.hz = 5000
self.offset = 5 * 60
@@ -59,16 +57,16 @@
self.mon.set_max_current(7.8)
self.dut = self.android_devices[0]
self.mon.attach_device(self.dut)
- asserts.assert_true(self.mon.usb("auto"),
- "Failed to turn USB mode to auto on monsoon.")
+ asserts.assert_true(
+ self.mon.usb("auto"),
+ "Failed to turn USB mode to auto on monsoon.")
required_userparam_names = (
# These two params should follow the format of
# {"SSID": <SSID>, "password": <Password>}
"network_2g",
"network_5g",
- "iperf_server_address"
- )
- self.unpack_userparams(required_userparam_names, ("threshold",))
+ "iperf_server_address")
+ self.unpack_userparams(required_userparam_names, ("threshold", ))
wutils.wifi_test_device_init(self.dut)
# Start pmc app.
self.dut.adb.shell(start_pmc_cmd)
@@ -137,25 +135,24 @@
if self.threshold:
model = utils.trim_model_name(self.dut.model)
asserts.assert_true(tag in self.threshold[model],
- "Acceptance threshold for %s is missing" % tag,
- extras=result_extra)
+ "Acceptance threshold for %s is missing" % tag,
+ extras=result_extra)
acceptable_threshold = self.threshold[model][tag]
- asserts.assert_true(actual_current < acceptable_threshold,
- ("Measured average current in [%s]: %s, which is "
- "higher than acceptable threshold %.2fmA.") % (
- tag, actual_current_str, acceptable_threshold),
- extras=result_extra)
+ asserts.assert_true(
+ actual_current < acceptable_threshold,
+ ("Measured average current in [%s]: %s, which is "
+ "higher than acceptable threshold %.2fmA.") % (
+ tag, actual_current_str, acceptable_threshold),
+ extras=result_extra)
asserts.explicit_pass("Measurement finished for %s." % tag,
extras=result_extra)
def test_power_wifi_off(self):
- asserts.assert_true(wutils.wifi_toggle_state(self.dut, False),
- "Failed to toggle wifi off.")
+ wutils.wifi_toggle_state(self.dut, False)
self.measure_and_process_result()
def test_power_wifi_on_idle(self):
- asserts.assert_true(wutils.wifi_toggle_state(self.dut, True),
- "Failed to toggle wifi on.")
+ wutils.wifi_toggle_state(self.dut, True)
self.measure_and_process_result()
def test_power_disconnected_connectivity_scan(self):