blob: 6f953c5f3d78004d15a54b337d187284ece41b8e [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2016 Google, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import re
import shutil
import time
from retry import retry
from collections import namedtuple
from enum import IntEnum
from queue import Empty
from acts import asserts
from acts import context
from acts import signals
from acts import utils
from acts.controllers import attenuator
from acts.controllers.ap_lib import hostapd_security
from acts.controllers.ap_lib import hostapd_ap_preset
from acts.controllers.ap_lib.hostapd_constants import BAND_2G
from acts.controllers.ap_lib.hostapd_constants import BAND_5G
from acts_contrib.test_utils.net import connectivity_const as cconsts
from acts_contrib.test_utils.tel import tel_defines
from acts_contrib.test_utils.wifi import wifi_constants
from acts_contrib.test_utils.wifi.aware import aware_test_utils as autils
# Default timeout used for reboot, toggle WiFi and Airplane mode,
# for the system to settle down after the operation.
DEFAULT_TIMEOUT = 10
# Number of seconds to wait for events that are supposed to happen quickly.
# Like onSuccess for start background scan and confirmation on wifi state
# change.
SHORT_TIMEOUT = 30
ROAMING_TIMEOUT = 30
WIFI_CONNECTION_TIMEOUT_DEFAULT = 30
DEFAULT_SCAN_TRIES = 3
DEFAULT_CONNECT_TRIES = 3
# Speed of light in m/s.
SPEED_OF_LIGHT = 299792458
DEFAULT_PING_ADDR = "https://www.google.com/robots.txt"
ROAMING_ATTN = {
"AP1_on_AP2_off": [0, 0, 95, 95],
"AP1_off_AP2_on": [95, 95, 0, 0],
"default": [0, 0, 0, 0]
}
class WifiEnums():
SSID_KEY = "SSID" # Used for Wifi & SoftAp
SSID_PATTERN_KEY = "ssidPattern"
NETID_KEY = "network_id"
BSSID_KEY = "BSSID" # Used for Wifi & SoftAp
BSSID_PATTERN_KEY = "bssidPattern"
PWD_KEY = "password" # Used for Wifi & SoftAp
frequency_key = "frequency"
HIDDEN_KEY = "hiddenSSID" # Used for Wifi & SoftAp
IS_APP_INTERACTION_REQUIRED = "isAppInteractionRequired"
IS_USER_INTERACTION_REQUIRED = "isUserInteractionRequired"
IS_SUGGESTION_METERED = "isMetered"
PRIORITY = "priority"
SECURITY = "security" # Used for Wifi & SoftAp
# Used for SoftAp
AP_BAND_KEY = "apBand"
AP_CHANNEL_KEY = "apChannel"
AP_BANDS_KEY = "apBands"
AP_CHANNEL_FREQUENCYS_KEY = "apChannelFrequencies"
AP_MAC_RANDOMIZATION_SETTING_KEY = "MacRandomizationSetting"
AP_BRIDGED_OPPORTUNISTIC_SHUTDOWN_ENABLE_KEY = "BridgedModeOpportunisticShutdownEnabled"
AP_IEEE80211AX_ENABLED_KEY = "Ieee80211axEnabled"
AP_MAXCLIENTS_KEY = "MaxNumberOfClients"
AP_SHUTDOWNTIMEOUT_KEY = "ShutdownTimeoutMillis"
AP_SHUTDOWNTIMEOUTENABLE_KEY = "AutoShutdownEnabled"
AP_CLIENTCONTROL_KEY = "ClientControlByUserEnabled"
AP_ALLOWEDLIST_KEY = "AllowedClientList"
AP_BLOCKEDLIST_KEY = "BlockedClientList"
WIFI_CONFIG_SOFTAP_BAND_2G = 1
WIFI_CONFIG_SOFTAP_BAND_5G = 2
WIFI_CONFIG_SOFTAP_BAND_2G_5G = 3
WIFI_CONFIG_SOFTAP_BAND_6G = 4
WIFI_CONFIG_SOFTAP_BAND_2G_6G = 5
WIFI_CONFIG_SOFTAP_BAND_5G_6G = 6
WIFI_CONFIG_SOFTAP_BAND_ANY = 7
# DO NOT USE IT for new test case! Replaced by WIFI_CONFIG_SOFTAP_BAND_
WIFI_CONFIG_APBAND_2G = WIFI_CONFIG_SOFTAP_BAND_2G
WIFI_CONFIG_APBAND_5G = WIFI_CONFIG_SOFTAP_BAND_5G
WIFI_CONFIG_APBAND_AUTO = WIFI_CONFIG_SOFTAP_BAND_2G_5G
WIFI_CONFIG_APBAND_2G_OLD = 0
WIFI_CONFIG_APBAND_5G_OLD = 1
WIFI_CONFIG_APBAND_AUTO_OLD = -1
WIFI_WPS_INFO_PBC = 0
WIFI_WPS_INFO_DISPLAY = 1
WIFI_WPS_INFO_KEYPAD = 2
WIFI_WPS_INFO_LABEL = 3
WIFI_WPS_INFO_INVALID = 4
class SoftApSecurityType():
OPEN = "NONE"
WPA2 = "WPA2_PSK"
WPA3_SAE_TRANSITION = "WPA3_SAE_TRANSITION"
WPA3_SAE = "WPA3_SAE"
class CountryCode():
AUSTRALIA = "AU"
CHINA = "CN"
GERMANY = "DE"
JAPAN = "JP"
UK = "GB"
US = "US"
UNKNOWN = "UNKNOWN"
# Start of Macros for EAP
# EAP types
class Eap(IntEnum):
NONE = -1
PEAP = 0
TLS = 1
TTLS = 2
PWD = 3
SIM = 4
AKA = 5
AKA_PRIME = 6
UNAUTH_TLS = 7
# EAP Phase2 types
class EapPhase2(IntEnum):
NONE = 0
PAP = 1
MSCHAP = 2
MSCHAPV2 = 3
GTC = 4
class Enterprise:
# Enterprise Config Macros
EMPTY_VALUE = "NULL"
EAP = "eap"
PHASE2 = "phase2"
IDENTITY = "identity"
ANON_IDENTITY = "anonymous_identity"
PASSWORD = "password"
SUBJECT_MATCH = "subject_match"
ALTSUBJECT_MATCH = "altsubject_match"
DOM_SUFFIX_MATCH = "domain_suffix_match"
CLIENT_CERT = "client_cert"
CA_CERT = "ca_cert"
ENGINE = "engine"
ENGINE_ID = "engine_id"
PRIVATE_KEY_ID = "key_id"
REALM = "realm"
PLMN = "plmn"
FQDN = "FQDN"
FRIENDLY_NAME = "providerFriendlyName"
ROAMING_IDS = "roamingConsortiumIds"
OCSP = "ocsp"
# End of Macros for EAP
# Macros for wifi p2p.
WIFI_P2P_SERVICE_TYPE_ALL = 0
WIFI_P2P_SERVICE_TYPE_BONJOUR = 1
WIFI_P2P_SERVICE_TYPE_UPNP = 2
WIFI_P2P_SERVICE_TYPE_VENDOR_SPECIFIC = 255
class ScanResult:
CHANNEL_WIDTH_20MHZ = 0
CHANNEL_WIDTH_40MHZ = 1
CHANNEL_WIDTH_80MHZ = 2
CHANNEL_WIDTH_160MHZ = 3
CHANNEL_WIDTH_80MHZ_PLUS_MHZ = 4
# Macros for wifi rtt.
class RttType(IntEnum):
TYPE_ONE_SIDED = 1
TYPE_TWO_SIDED = 2
class RttPeerType(IntEnum):
PEER_TYPE_AP = 1
PEER_TYPE_STA = 2 # Requires NAN.
PEER_P2P_GO = 3
PEER_P2P_CLIENT = 4
PEER_NAN = 5
class RttPreamble(IntEnum):
PREAMBLE_LEGACY = 0x01
PREAMBLE_HT = 0x02
PREAMBLE_VHT = 0x04
class RttBW(IntEnum):
BW_5_SUPPORT = 0x01
BW_10_SUPPORT = 0x02
BW_20_SUPPORT = 0x04
BW_40_SUPPORT = 0x08
BW_80_SUPPORT = 0x10
BW_160_SUPPORT = 0x20
class Rtt(IntEnum):
STATUS_SUCCESS = 0
STATUS_FAILURE = 1
STATUS_FAIL_NO_RSP = 2
STATUS_FAIL_REJECTED = 3
STATUS_FAIL_NOT_SCHEDULED_YET = 4
STATUS_FAIL_TM_TIMEOUT = 5
STATUS_FAIL_AP_ON_DIFF_CHANNEL = 6
STATUS_FAIL_NO_CAPABILITY = 7
STATUS_ABORTED = 8
STATUS_FAIL_INVALID_TS = 9
STATUS_FAIL_PROTOCOL = 10
STATUS_FAIL_SCHEDULE = 11
STATUS_FAIL_BUSY_TRY_LATER = 12
STATUS_INVALID_REQ = 13
STATUS_NO_WIFI = 14
STATUS_FAIL_FTM_PARAM_OVERRIDE = 15
REASON_UNSPECIFIED = -1
REASON_NOT_AVAILABLE = -2
REASON_INVALID_LISTENER = -3
REASON_INVALID_REQUEST = -4
class RttParam:
device_type = "deviceType"
request_type = "requestType"
BSSID = "bssid"
channel_width = "channelWidth"
frequency = "frequency"
center_freq0 = "centerFreq0"
center_freq1 = "centerFreq1"
number_burst = "numberBurst"
interval = "interval"
num_samples_per_burst = "numSamplesPerBurst"
num_retries_per_measurement_frame = "numRetriesPerMeasurementFrame"
num_retries_per_FTMR = "numRetriesPerFTMR"
lci_request = "LCIRequest"
lcr_request = "LCRRequest"
burst_timeout = "burstTimeout"
preamble = "preamble"
bandwidth = "bandwidth"
margin = "margin"
RTT_MARGIN_OF_ERROR = {
RttBW.BW_80_SUPPORT: 2,
RttBW.BW_40_SUPPORT: 5,
RttBW.BW_20_SUPPORT: 5
}
# Macros as specified in the WifiScanner code.
WIFI_BAND_UNSPECIFIED = 0 # not specified
WIFI_BAND_24_GHZ = 1 # 2.4 GHz band
WIFI_BAND_5_GHZ = 2 # 5 GHz band without DFS channels
WIFI_BAND_5_GHZ_DFS_ONLY = 4 # 5 GHz band with DFS channels
WIFI_BAND_5_GHZ_WITH_DFS = 6 # 5 GHz band with DFS channels
WIFI_BAND_BOTH = 3 # both bands without DFS channels
WIFI_BAND_BOTH_WITH_DFS = 7 # both bands with DFS channels
REPORT_EVENT_AFTER_BUFFER_FULL = 0
REPORT_EVENT_AFTER_EACH_SCAN = 1
REPORT_EVENT_FULL_SCAN_RESULT = 2
SCAN_TYPE_LOW_LATENCY = 0
SCAN_TYPE_LOW_POWER = 1
SCAN_TYPE_HIGH_ACCURACY = 2
# US Wifi frequencies
ALL_2G_FREQUENCIES = [
2412, 2417, 2422, 2427, 2432, 2437, 2442, 2447, 2452, 2457, 2462
]
DFS_5G_FREQUENCIES = [
5260, 5280, 5300, 5320, 5500, 5520, 5540, 5560, 5580, 5600, 5620, 5640,
5660, 5680, 5700, 5720
]
NONE_DFS_5G_FREQUENCIES = [
5180, 5200, 5220, 5240, 5745, 5765, 5785, 5805, 5825
]
ALL_5G_FREQUENCIES = DFS_5G_FREQUENCIES + NONE_DFS_5G_FREQUENCIES
band_to_frequencies = {
WIFI_BAND_24_GHZ: ALL_2G_FREQUENCIES,
WIFI_BAND_5_GHZ: NONE_DFS_5G_FREQUENCIES,
WIFI_BAND_5_GHZ_DFS_ONLY: DFS_5G_FREQUENCIES,
WIFI_BAND_5_GHZ_WITH_DFS: ALL_5G_FREQUENCIES,
WIFI_BAND_BOTH: ALL_2G_FREQUENCIES + NONE_DFS_5G_FREQUENCIES,
WIFI_BAND_BOTH_WITH_DFS: ALL_5G_FREQUENCIES + ALL_2G_FREQUENCIES
}
# TODO: add all of the band mapping.
softap_band_frequencies = {
WIFI_CONFIG_SOFTAP_BAND_2G: ALL_2G_FREQUENCIES,
WIFI_CONFIG_SOFTAP_BAND_5G: ALL_5G_FREQUENCIES
}
# All Wifi frequencies to channels lookup.
freq_to_channel = {
2412: 1,
2417: 2,
2422: 3,
2427: 4,
2432: 5,
2437: 6,
2442: 7,
2447: 8,
2452: 9,
2457: 10,
2462: 11,
2467: 12,
2472: 13,
2484: 14,
4915: 183,
4920: 184,
4925: 185,
4935: 187,
4940: 188,
4945: 189,
4960: 192,
4980: 196,
5035: 7,
5040: 8,
5045: 9,
5055: 11,
5060: 12,
5080: 16,
5170: 34,
5180: 36,
5190: 38,
5200: 40,
5210: 42,
5220: 44,
5230: 46,
5240: 48,
5260: 52,
5280: 56,
5300: 60,
5320: 64,
5500: 100,
5520: 104,
5540: 108,
5560: 112,
5580: 116,
5600: 120,
5620: 124,
5640: 128,
5660: 132,
5680: 136,
5700: 140,
5745: 149,
5765: 153,
5785: 157,
5795: 159,
5805: 161,
5825: 165,
}
# All Wifi channels to frequencies lookup.
channel_2G_to_freq = {
1: 2412,
2: 2417,
3: 2422,
4: 2427,
5: 2432,
6: 2437,
7: 2442,
8: 2447,
9: 2452,
10: 2457,
11: 2462,
12: 2467,
13: 2472,
14: 2484
}
channel_5G_to_freq = {
183: 4915,
184: 4920,
185: 4925,
187: 4935,
188: 4940,
189: 4945,
192: 4960,
196: 4980,
7: 5035,
8: 5040,
9: 5045,
11: 5055,
12: 5060,
16: 5080,
34: 5170,
36: 5180,
38: 5190,
40: 5200,
42: 5210,
44: 5220,
46: 5230,
48: 5240,
50: 5250,
52: 5260,
56: 5280,
60: 5300,
64: 5320,
100: 5500,
104: 5520,
108: 5540,
112: 5560,
116: 5580,
120: 5600,
124: 5620,
128: 5640,
132: 5660,
136: 5680,
140: 5700,
149: 5745,
151: 5755,
153: 5765,
155: 5775,
157: 5785,
159: 5795,
161: 5805,
165: 5825
}
channel_6G_to_freq = {4 * x + 1: 5955 + 20 * x for x in range(59)}
channel_to_freq = {
'2G': channel_2G_to_freq,
'5G': channel_5G_to_freq,
'6G': channel_6G_to_freq
}
class WifiChannelBase:
ALL_2G_FREQUENCIES = []
DFS_5G_FREQUENCIES = []
NONE_DFS_5G_FREQUENCIES = []
ALL_5G_FREQUENCIES = DFS_5G_FREQUENCIES + NONE_DFS_5G_FREQUENCIES
MIX_CHANNEL_SCAN = []
def band_to_freq(self, band):
_band_to_frequencies = {
WifiEnums.WIFI_BAND_24_GHZ:
self.ALL_2G_FREQUENCIES,
WifiEnums.WIFI_BAND_5_GHZ:
self.NONE_DFS_5G_FREQUENCIES,
WifiEnums.WIFI_BAND_5_GHZ_DFS_ONLY:
self.DFS_5G_FREQUENCIES,
WifiEnums.WIFI_BAND_5_GHZ_WITH_DFS:
self.ALL_5G_FREQUENCIES,
WifiEnums.WIFI_BAND_BOTH:
self.ALL_2G_FREQUENCIES + self.NONE_DFS_5G_FREQUENCIES,
WifiEnums.WIFI_BAND_BOTH_WITH_DFS:
self.ALL_5G_FREQUENCIES + self.ALL_2G_FREQUENCIES
}
return _band_to_frequencies[band]
class WifiChannelUS(WifiChannelBase):
# US Wifi frequencies
ALL_2G_FREQUENCIES = [
2412, 2417, 2422, 2427, 2432, 2437, 2442, 2447, 2452, 2457, 2462
]
NONE_DFS_5G_FREQUENCIES = [
5180, 5200, 5220, 5240, 5745, 5765, 5785, 5805, 5825
]
MIX_CHANNEL_SCAN = [
2412, 2437, 2462, 5180, 5200, 5280, 5260, 5300, 5500, 5320, 5520, 5560,
5700, 5745, 5805
]
def __init__(self, model=None, support_addition_channel=[]):
if model in support_addition_channel:
self.ALL_2G_FREQUENCIES = [
2412, 2417, 2422, 2427, 2432, 2437, 2442, 2447, 2452, 2457,
2462, 2467, 2472
]
self.DFS_5G_FREQUENCIES = [
5260, 5280, 5300, 5320, 5500, 5520, 5540, 5560, 5580, 5600, 5620,
5640, 5660, 5680, 5700, 5720
]
self.ALL_5G_FREQUENCIES = self.DFS_5G_FREQUENCIES + self.NONE_DFS_5G_FREQUENCIES
class WifiReferenceNetworks:
""" Class to parse and return networks of different band and
auth type from reference_networks
"""
def __init__(self, obj):
self.reference_networks = obj
self.WIFI_2G = "2g"
self.WIFI_5G = "5g"
self.secure_networks_2g = []
self.secure_networks_5g = []
self.open_networks_2g = []
self.open_networks_5g = []
self._parse_networks()
def _parse_networks(self):
for network in self.reference_networks:
for key in network:
if key == self.WIFI_2G:
if "password" in network[key]:
self.secure_networks_2g.append(network[key])
else:
self.open_networks_2g.append(network[key])
else:
if "password" in network[key]:
self.secure_networks_5g.append(network[key])
else:
self.open_networks_5g.append(network[key])
def return_2g_secure_networks(self):
return self.secure_networks_2g
def return_5g_secure_networks(self):
return self.secure_networks_5g
def return_2g_open_networks(self):
return self.open_networks_2g
def return_5g_open_networks(self):
return self.open_networks_5g
def return_secure_networks(self):
return self.secure_networks_2g + self.secure_networks_5g
def return_open_networks(self):
return self.open_networks_2g + self.open_networks_5g
def _assert_on_fail_handler(func, assert_on_fail, *args, **kwargs):
"""Wrapper function that handles the bahevior of assert_on_fail.
When assert_on_fail is True, let all test signals through, which can
terminate test cases directly. When assert_on_fail is False, the wrapper
raises no test signals and reports operation status by returning True or
False.
Args:
func: The function to wrap. This function reports operation status by
raising test signals.
assert_on_fail: A boolean that specifies if the output of the wrapper
is test signal based or return value based.
args: Positional args for func.
kwargs: Name args for func.
Returns:
If assert_on_fail is True, returns True/False to signal operation
status, otherwise return nothing.
"""
try:
func(*args, **kwargs)
if not assert_on_fail:
return True
except signals.TestSignal:
if assert_on_fail:
raise
return False
def assert_network_in_list(target, network_list):
"""Makes sure a specified target Wi-Fi network exists in a list of Wi-Fi
networks.
Args:
target: A dict representing a Wi-Fi network.
E.g. {WifiEnums.SSID_KEY: "SomeNetwork"}
network_list: A list of dicts, each representing a Wi-Fi network.
"""
match_results = match_networks(target, network_list)
asserts.assert_true(
match_results, "Target network %s, does not exist in network list %s" %
(target, network_list))
def match_networks(target_params, networks):
"""Finds the WiFi networks that match a given set of parameters in a list
of WiFi networks.
To be considered a match, the network should contain every key-value pair
of target_params
Args:
target_params: A dict with 1 or more key-value pairs representing a Wi-Fi network.
E.g { 'SSID': 'wh_ap1_5g', 'BSSID': '30:b5:c2:33:e4:47' }
networks: A list of dict objects representing WiFi networks.
Returns:
The networks that match the target parameters.
"""
results = []
asserts.assert_true(target_params,
"Expected networks object 'target_params' is empty")
for n in networks:
add_network = 1
for k, v in target_params.items():
if k not in n:
add_network = 0
break
if n[k] != v:
add_network = 0
break
if add_network:
results.append(n)
return results
def wait_for_wifi_state(ad, state, assert_on_fail=True):
"""Waits for the device to transition to the specified wifi state
Args:
ad: An AndroidDevice object.
state: Wifi state to wait for.
assert_on_fail: If True, error checks in this function will raise test
failure signals.
Returns:
If assert_on_fail is False, function returns True if the device transitions
to the specified state, False otherwise. If assert_on_fail is True, no return value.
"""
return _assert_on_fail_handler(_wait_for_wifi_state,
assert_on_fail,
ad,
state=state)
def _wait_for_wifi_state(ad, state):
"""Toggles the state of wifi.
TestFailure signals are raised when something goes wrong.
Args:
ad: An AndroidDevice object.
state: Wifi state to wait for.
"""
if state == ad.droid.wifiCheckState():
# Check if the state is already achieved, so we don't wait for the
# state change event by mistake.
return
ad.droid.wifiStartTrackingStateChange()
fail_msg = "Device did not transition to Wi-Fi state to %s on %s." % (
state, ad.serial)
try:
ad.ed.wait_for_event(wifi_constants.WIFI_STATE_CHANGED,
lambda x: x["data"]["enabled"] == state,
SHORT_TIMEOUT)
except Empty:
asserts.assert_equal(state, ad.droid.wifiCheckState(), fail_msg)
finally:
ad.droid.wifiStopTrackingStateChange()
def wifi_toggle_state(ad, new_state=None, assert_on_fail=True):
"""Toggles the state of wifi.
Args:
ad: An AndroidDevice object.
new_state: Wifi state to set to. If None, opposite of the current state.
assert_on_fail: If True, error checks in this function will raise test
failure signals.
Returns:
If assert_on_fail is False, function returns True if the toggle was
successful, False otherwise. If assert_on_fail is True, no return value.
"""
return _assert_on_fail_handler(_wifi_toggle_state,
assert_on_fail,
ad,
new_state=new_state)
def _wifi_toggle_state(ad, new_state=None):
"""Toggles the state of wifi.
TestFailure signals are raised when something goes wrong.
Args:
ad: An AndroidDevice object.
new_state: The state to set Wi-Fi to. If None, opposite of the current
state will be set.
"""
if new_state is None:
new_state = not ad.droid.wifiCheckState()
elif new_state == ad.droid.wifiCheckState():
# Check if the new_state is already achieved, so we don't wait for the
# state change event by mistake.
return
ad.droid.wifiStartTrackingStateChange()
ad.log.info("Setting Wi-Fi state to %s.", new_state)
ad.ed.clear_all_events()
# Setting wifi state.
ad.droid.wifiToggleState(new_state)
time.sleep(2)
fail_msg = "Failed to set Wi-Fi state to %s on %s." % (new_state,
ad.serial)
try:
ad.ed.wait_for_event(wifi_constants.WIFI_STATE_CHANGED,
lambda x: x["data"]["enabled"] == new_state,
SHORT_TIMEOUT)
except Empty:
asserts.assert_equal(new_state, ad.droid.wifiCheckState(), fail_msg)
finally:
ad.droid.wifiStopTrackingStateChange()
def reset_wifi(ad):
"""Clears all saved Wi-Fi networks on a device.
This will turn Wi-Fi on.
Args:
ad: An AndroidDevice object.
"""
networks = ad.droid.wifiGetConfiguredNetworks()
if not networks:
return
removed = []
for n in networks:
if n['networkId'] not in removed:
ad.droid.wifiForgetNetwork(n['networkId'])
removed.append(n['networkId'])
else:
continue
try:
event = ad.ed.pop_event(wifi_constants.WIFI_FORGET_NW_SUCCESS,
SHORT_TIMEOUT)
except Empty:
logging.warning("Could not confirm the removal of network %s.", n)
# Check again to see if there's any network left.
asserts.assert_true(
not ad.droid.wifiGetConfiguredNetworks(),
"Failed to remove these configured Wi-Fi networks: %s" % networks)
def toggle_airplane_mode_on_and_off(ad):
"""Turn ON and OFF Airplane mode.
ad: An AndroidDevice object.
Returns: Assert if turning on/off Airplane mode fails.
"""
ad.log.debug("Toggling Airplane mode ON.")
asserts.assert_true(utils.force_airplane_mode(ad, True),
"Can not turn on airplane mode on: %s" % ad.serial)
time.sleep(DEFAULT_TIMEOUT)
ad.log.debug("Toggling Airplane mode OFF.")
asserts.assert_true(utils.force_airplane_mode(ad, False),
"Can not turn on airplane mode on: %s" % ad.serial)
time.sleep(DEFAULT_TIMEOUT)
def toggle_wifi_off_and_on(ad):
"""Turn OFF and ON WiFi.
ad: An AndroidDevice object.
Returns: Assert if turning off/on WiFi fails.
"""
ad.log.debug("Toggling wifi OFF.")
wifi_toggle_state(ad, False)
time.sleep(DEFAULT_TIMEOUT)
ad.log.debug("Toggling wifi ON.")
wifi_toggle_state(ad, True)
time.sleep(DEFAULT_TIMEOUT)
def wifi_forget_network(ad, net_ssid):
"""Remove configured Wifi network on an android device.
Args:
ad: android_device object for forget network.
net_ssid: ssid of network to be forget
"""
networks = ad.droid.wifiGetConfiguredNetworks()
if not networks:
return
removed = []
for n in networks:
if net_ssid in n[WifiEnums.SSID_KEY] and n['networkId'] not in removed:
ad.droid.wifiForgetNetwork(n['networkId'])
removed.append(n['networkId'])
try:
event = ad.ed.pop_event(wifi_constants.WIFI_FORGET_NW_SUCCESS,
SHORT_TIMEOUT)
except Empty:
asserts.fail("Failed to remove network %s." % n)
break
def wifi_test_device_init(ad, country_code=WifiEnums.CountryCode.US):
"""Initializes an android device for wifi testing.
0. Make sure SL4A connection is established on the android device.
1. Disable location service's WiFi scan.
2. Turn WiFi on.
3. Clear all saved networks.
4. Set country code to US.
5. Enable WiFi verbose logging.
6. Sync device time with computer time.
7. Turn off cellular data.
8. Turn off ambient display.
"""
utils.require_sl4a((ad, ))
ad.droid.wifiScannerToggleAlwaysAvailable(False)
msg = "Failed to turn off location service's scan."
asserts.assert_true(not ad.droid.wifiScannerIsAlwaysAvailable(), msg)
wifi_toggle_state(ad, True)
reset_wifi(ad)
ad.droid.wifiEnableVerboseLogging(1)
msg = "Failed to enable WiFi verbose logging."
asserts.assert_equal(ad.droid.wifiGetVerboseLoggingLevel(), 1, msg)
# We don't verify the following settings since they are not critical.
# Set wpa_supplicant log level to EXCESSIVE.
output = ad.adb.shell(
"wpa_cli -i wlan0 -p -g@android:wpa_wlan0 IFNAME="
"wlan0 log_level EXCESSIVE",
ignore_status=True)
ad.log.info("wpa_supplicant log change status: %s", output)
utils.sync_device_time(ad)
ad.droid.telephonyToggleDataConnection(False)
set_wifi_country_code(ad, country_code)
utils.set_ambient_display(ad, False)
def set_wifi_country_code(ad, country_code):
"""Sets the wifi country code on the device.
Args:
ad: An AndroidDevice object.
country_code: 2 letter ISO country code
Raises:
An RpcException if unable to set the country code.
"""
try:
ad.adb.shell("cmd wifi force-country-code enabled %s" % country_code)
except Exception as e:
ad.droid.wifiSetCountryCode(WifiEnums.CountryCode.US)
def start_wifi_connection_scan(ad):
"""Starts a wifi connection scan and wait for results to become available.
Args:
ad: An AndroidDevice object.
"""
ad.ed.clear_all_events()
ad.droid.wifiStartScan()
try:
ad.ed.pop_event("WifiManagerScanResultsAvailable", 60)
except Empty:
asserts.fail("Wi-Fi results did not become available within 60s.")
def start_wifi_connection_scan_and_return_status(ad):
"""
Starts a wifi connection scan and wait for results to become available
or a scan failure to be reported.
Args:
ad: An AndroidDevice object.
Returns:
True: if scan succeeded & results are available
False: if scan failed
"""
ad.ed.clear_all_events()
ad.droid.wifiStartScan()
try:
events = ad.ed.pop_events("WifiManagerScan(ResultsAvailable|Failure)",
60)
except Empty:
asserts.fail(
"Wi-Fi scan results/failure did not become available within 60s.")
# If there are multiple matches, we check for atleast one success.
for event in events:
if event["name"] == "WifiManagerScanResultsAvailable":
return True
elif event["name"] == "WifiManagerScanFailure":
ad.log.debug("Scan failure received")
return False
def start_wifi_connection_scan_and_check_for_network(ad,
network_ssid,
max_tries=3):
"""
Start connectivity scans & checks if the |network_ssid| is seen in
scan results. The method performs a max of |max_tries| connectivity scans
to find the network.
Args:
ad: An AndroidDevice object.
network_ssid: SSID of the network we are looking for.
max_tries: Number of scans to try.
Returns:
True: if network_ssid is found in scan results.
False: if network_ssid is not found in scan results.
"""
start_time = time.time()
for num_tries in range(max_tries):
if start_wifi_connection_scan_and_return_status(ad):
scan_results = ad.droid.wifiGetScanResults()
match_results = match_networks({WifiEnums.SSID_KEY: network_ssid},
scan_results)
if len(match_results) > 0:
ad.log.debug("Found network in %s seconds." %
(time.time() - start_time))
return True
ad.log.debug("Did not find network in %s seconds." %
(time.time() - start_time))
return False
def start_wifi_connection_scan_and_ensure_network_found(
ad, network_ssid, max_tries=3):
"""
Start connectivity scans & ensure the |network_ssid| is seen in
scan results. The method performs a max of |max_tries| connectivity scans
to find the network.
This method asserts on failure!
Args:
ad: An AndroidDevice object.
network_ssid: SSID of the network we are looking for.
max_tries: Number of scans to try.
"""
ad.log.info("Starting scans to ensure %s is present", network_ssid)
assert_msg = "Failed to find " + network_ssid + " in scan results" \
" after " + str(max_tries) + " tries"
asserts.assert_true(
start_wifi_connection_scan_and_check_for_network(
ad, network_ssid, max_tries), assert_msg)
def start_wifi_connection_scan_and_ensure_network_not_found(
ad, network_ssid, max_tries=3):
"""
Start connectivity scans & ensure the |network_ssid| is not seen in
scan results. The method performs a max of |max_tries| connectivity scans
to find the network.
This method asserts on failure!
Args:
ad: An AndroidDevice object.
network_ssid: SSID of the network we are looking for.
max_tries: Number of scans to try.
"""
ad.log.info("Starting scans to ensure %s is not present", network_ssid)
assert_msg = "Found " + network_ssid + " in scan results" \
" after " + str(max_tries) + " tries"
asserts.assert_false(
start_wifi_connection_scan_and_check_for_network(
ad, network_ssid, max_tries), assert_msg)
def start_wifi_background_scan(ad, scan_setting):
"""Starts wifi background scan.
Args:
ad: android_device object to initiate connection on.
scan_setting: A dict representing the settings of the scan.
Returns:
If scan was started successfully, event data of success event is returned.
"""
idx = ad.droid.wifiScannerStartBackgroundScan(scan_setting)
event = ad.ed.pop_event("WifiScannerScan{}onSuccess".format(idx),
SHORT_TIMEOUT)
return event['data']
def start_wifi_tethering(ad, ssid, password, band=None, hidden=None,
security=None):
"""Starts wifi tethering on an android_device.
Args:
ad: android_device to start wifi tethering on.
ssid: The SSID the soft AP should broadcast.
password: The password the soft AP should use.
band: The band the soft AP should be set on. It should be either
WifiEnums.WIFI_CONFIG_APBAND_2G or WifiEnums.WIFI_CONFIG_APBAND_5G.
hidden: boolean to indicate if the AP needs to be hidden or not.
security: security type of softap.
Returns:
No return value. Error checks in this function will raise test failure signals
"""
config = {WifiEnums.SSID_KEY: ssid}
if password:
config[WifiEnums.PWD_KEY] = password
if band:
config[WifiEnums.AP_BAND_KEY] = band
if hidden:
config[WifiEnums.HIDDEN_KEY] = hidden
if security:
config[WifiEnums.SECURITY] = security
asserts.assert_true(ad.droid.wifiSetWifiApConfiguration(config),
"Failed to update WifiAp Configuration")
ad.droid.wifiStartTrackingTetherStateChange()
ad.droid.connectivityStartTethering(tel_defines.TETHERING_WIFI, False)
try:
ad.ed.pop_event("ConnectivityManagerOnTetheringStarted")
ad.ed.wait_for_event("TetherStateChanged",
lambda x: x["data"]["ACTIVE_TETHER"], 30)
ad.log.debug("Tethering started successfully.")
except Empty:
msg = "Failed to receive confirmation of wifi tethering starting"
asserts.fail(msg)
finally:
ad.droid.wifiStopTrackingTetherStateChange()
def save_wifi_soft_ap_config(ad,
wifi_config,
band=None,
hidden=None,
security=None,
password=None,
channel=None,
max_clients=None,
shutdown_timeout_enable=None,
shutdown_timeout_millis=None,
client_control_enable=None,
allowedList=None,
blockedList=None,
bands=None,
channel_frequencys=None,
mac_randomization_setting=None,
bridged_opportunistic_shutdown_enabled=None,
ieee80211ax_enabled=None):
""" Save a soft ap configuration and verified
Args:
ad: android_device to set soft ap configuration.
wifi_config: a soft ap configuration object, at least include SSID.
band: specifies the band for the soft ap.
hidden: specifies the soft ap need to broadcast its SSID or not.
security: specifies the security type for the soft ap.
password: specifies the password for the soft ap.
channel: specifies the channel for the soft ap.
max_clients: specifies the maximum connected client number.
shutdown_timeout_enable: specifies the auto shut down enable or not.
shutdown_timeout_millis: specifies the shut down timeout value.
client_control_enable: specifies the client control enable or not.
allowedList: specifies allowed clients list.
blockedList: specifies blocked clients list.
bands: specifies the band list for the soft ap.
channel_frequencys: specifies the channel frequency list for soft ap.
mac_randomization_setting: specifies the mac randomization setting.
bridged_opportunistic_shutdown_enabled: specifies the opportunistic
shutdown enable or not.
ieee80211ax_enabled: specifies the ieee80211ax enable or not.
"""
if security and password:
wifi_config[WifiEnums.SECURITY] = security
wifi_config[WifiEnums.PWD_KEY] = password
if hidden is not None:
wifi_config[WifiEnums.HIDDEN_KEY] = hidden
if max_clients is not None:
wifi_config[WifiEnums.AP_MAXCLIENTS_KEY] = max_clients
if shutdown_timeout_enable is not None:
wifi_config[
WifiEnums.AP_SHUTDOWNTIMEOUTENABLE_KEY] = shutdown_timeout_enable
if shutdown_timeout_millis is not None:
wifi_config[WifiEnums.AP_SHUTDOWNTIMEOUT_KEY] = shutdown_timeout_millis
if client_control_enable is not None:
wifi_config[WifiEnums.AP_CLIENTCONTROL_KEY] = client_control_enable
if allowedList is not None:
wifi_config[WifiEnums.AP_ALLOWEDLIST_KEY] = allowedList
if blockedList is not None:
wifi_config[WifiEnums.AP_BLOCKEDLIST_KEY] = blockedList
if mac_randomization_setting is not None:
wifi_config[WifiEnums.AP_MAC_RANDOMIZATION_SETTING_KEY
] = mac_randomization_setting
if bridged_opportunistic_shutdown_enabled is not None:
wifi_config[WifiEnums.AP_BRIDGED_OPPORTUNISTIC_SHUTDOWN_ENABLE_KEY
] = bridged_opportunistic_shutdown_enabled
if ieee80211ax_enabled is not None:
wifi_config[WifiEnums.AP_IEEE80211AX_ENABLED_KEY]= ieee80211ax_enabled
if channel_frequencys is not None:
wifi_config[WifiEnums.AP_CHANNEL_FREQUENCYS_KEY] = channel_frequencys
elif bands is not None:
wifi_config[WifiEnums.AP_BANDS_KEY] = bands
elif band is not None:
if channel is not None:
wifi_config[WifiEnums.AP_BAND_KEY] = band
wifi_config[WifiEnums.AP_CHANNEL_KEY] = channel
else:
wifi_config[WifiEnums.AP_BAND_KEY] = band
if WifiEnums.AP_CHANNEL_KEY in wifi_config and wifi_config[
WifiEnums.AP_CHANNEL_KEY] == 0:
del wifi_config[WifiEnums.AP_CHANNEL_KEY]
if WifiEnums.SECURITY in wifi_config and wifi_config[
WifiEnums.SECURITY] == WifiEnums.SoftApSecurityType.OPEN:
del wifi_config[WifiEnums.SECURITY]
del wifi_config[WifiEnums.PWD_KEY]
asserts.assert_true(ad.droid.wifiSetWifiApConfiguration(wifi_config),
"Failed to set WifiAp Configuration")
wifi_ap = ad.droid.wifiGetApConfiguration()
asserts.assert_true(
wifi_ap[WifiEnums.SSID_KEY] == wifi_config[WifiEnums.SSID_KEY],
"Hotspot SSID doesn't match")
if WifiEnums.SECURITY in wifi_config:
asserts.assert_true(
wifi_ap[WifiEnums.SECURITY] == wifi_config[WifiEnums.SECURITY],
"Hotspot Security doesn't match")
if WifiEnums.PWD_KEY in wifi_config:
asserts.assert_true(
wifi_ap[WifiEnums.PWD_KEY] == wifi_config[WifiEnums.PWD_KEY],
"Hotspot Password doesn't match")
if WifiEnums.HIDDEN_KEY in wifi_config:
asserts.assert_true(
wifi_ap[WifiEnums.HIDDEN_KEY] == wifi_config[WifiEnums.HIDDEN_KEY],
"Hotspot hidden setting doesn't match")
if WifiEnums.AP_CHANNEL_KEY in wifi_config:
asserts.assert_true(
wifi_ap[WifiEnums.AP_CHANNEL_KEY] == wifi_config[
WifiEnums.AP_CHANNEL_KEY], "Hotspot Channel doesn't match")
if WifiEnums.AP_MAXCLIENTS_KEY in wifi_config:
asserts.assert_true(
wifi_ap[WifiEnums.AP_MAXCLIENTS_KEY] == wifi_config[
WifiEnums.AP_MAXCLIENTS_KEY],
"Hotspot Max Clients doesn't match")
if WifiEnums.AP_SHUTDOWNTIMEOUTENABLE_KEY in wifi_config:
asserts.assert_true(
wifi_ap[WifiEnums.AP_SHUTDOWNTIMEOUTENABLE_KEY] == wifi_config[
WifiEnums.AP_SHUTDOWNTIMEOUTENABLE_KEY],
"Hotspot ShutDown feature flag doesn't match")
if WifiEnums.AP_SHUTDOWNTIMEOUT_KEY in wifi_config:
asserts.assert_true(
wifi_ap[WifiEnums.AP_SHUTDOWNTIMEOUT_KEY] == wifi_config[
WifiEnums.AP_SHUTDOWNTIMEOUT_KEY],
"Hotspot ShutDown timeout setting doesn't match")
if WifiEnums.AP_CLIENTCONTROL_KEY in wifi_config:
asserts.assert_true(
wifi_ap[WifiEnums.AP_CLIENTCONTROL_KEY] == wifi_config[
WifiEnums.AP_CLIENTCONTROL_KEY],
"Hotspot Client control flag doesn't match")
if WifiEnums.AP_ALLOWEDLIST_KEY in wifi_config:
asserts.assert_true(
wifi_ap[WifiEnums.AP_ALLOWEDLIST_KEY] == wifi_config[
WifiEnums.AP_ALLOWEDLIST_KEY],
"Hotspot Allowed List doesn't match")
if WifiEnums.AP_BLOCKEDLIST_KEY in wifi_config:
asserts.assert_true(
wifi_ap[WifiEnums.AP_BLOCKEDLIST_KEY] == wifi_config[
WifiEnums.AP_BLOCKEDLIST_KEY],
"Hotspot Blocked List doesn't match")
if WifiEnums.AP_MAC_RANDOMIZATION_SETTING_KEY in wifi_config:
asserts.assert_true(
wifi_ap[WifiEnums.AP_MAC_RANDOMIZATION_SETTING_KEY] == wifi_config[
WifiEnums.AP_MAC_RANDOMIZATION_SETTING_KEY],
"Hotspot Mac randomization setting doesn't match")
if WifiEnums.AP_BRIDGED_OPPORTUNISTIC_SHUTDOWN_ENABLE_KEY in wifi_config:
asserts.assert_true(
wifi_ap[WifiEnums.AP_BRIDGED_OPPORTUNISTIC_SHUTDOWN_ENABLE_KEY] == wifi_config[
WifiEnums.AP_BRIDGED_OPPORTUNISTIC_SHUTDOWN_ENABLE_KEY],
"Hotspot bridged shutdown enable setting doesn't match")
if WifiEnums.AP_IEEE80211AX_ENABLED_KEY in wifi_config:
asserts.assert_true(
wifi_ap[WifiEnums.AP_IEEE80211AX_ENABLED_KEY] == wifi_config[
WifiEnums.AP_IEEE80211AX_ENABLED_KEY],
"Hotspot 80211 AX enable setting doesn't match")
if WifiEnums.AP_CHANNEL_FREQUENCYS_KEY in wifi_config:
asserts.assert_true(
wifi_ap[WifiEnums.AP_CHANNEL_FREQUENCYS_KEY] == wifi_config[
WifiEnums.AP_CHANNEL_FREQUENCYS_KEY],
"Hotspot channels setting doesn't match")
def start_wifi_tethering_saved_config(ad):
""" Turn on wifi hotspot with a config that is already saved """
ad.droid.wifiStartTrackingTetherStateChange()
ad.droid.connectivityStartTethering(tel_defines.TETHERING_WIFI, False)
try:
ad.ed.pop_event("ConnectivityManagerOnTetheringStarted")
ad.ed.wait_for_event("TetherStateChanged",
lambda x: x["data"]["ACTIVE_TETHER"], 30)
except:
asserts.fail("Didn't receive wifi tethering starting confirmation")
finally:
ad.droid.wifiStopTrackingTetherStateChange()
def stop_wifi_tethering(ad):
"""Stops wifi tethering on an android_device.
Args:
ad: android_device to stop wifi tethering on.
"""
ad.droid.wifiStartTrackingTetherStateChange()
ad.droid.connectivityStopTethering(tel_defines.TETHERING_WIFI)
try:
ad.ed.pop_event("WifiManagerApDisabled", 30)
ad.ed.wait_for_event("TetherStateChanged",
lambda x: not x["data"]["ACTIVE_TETHER"], 30)
except Empty:
msg = "Failed to receive confirmation of wifi tethering stopping"
asserts.fail(msg)
finally:
ad.droid.wifiStopTrackingTetherStateChange()
def toggle_wifi_and_wait_for_reconnection(ad,
network,
num_of_tries=1,
assert_on_fail=True):
"""Toggle wifi state and then wait for Android device to reconnect to
the provided wifi network.
This expects the device to be already connected to the provided network.
Logic steps are
1. Ensure that we're connected to the network.
2. Turn wifi off.
3. Wait for 10 seconds.
4. Turn wifi on.
5. Wait for the "connected" event, then confirm the connected ssid is the
one requested.
Args:
ad: android_device object to initiate connection on.
network: A dictionary representing the network to await connection. The
dictionary must have the key "SSID".
num_of_tries: An integer that is the number of times to try before
delaring failure. Default is 1.
assert_on_fail: If True, error checks in this function will raise test
failure signals.
Returns:
If assert_on_fail is False, function returns True if the toggle was
successful, False otherwise. If assert_on_fail is True, no return value.
"""
return _assert_on_fail_handler(_toggle_wifi_and_wait_for_reconnection,
assert_on_fail,
ad,
network,
num_of_tries=num_of_tries)
def _toggle_wifi_and_wait_for_reconnection(ad, network, num_of_tries=3):
"""Toggle wifi state and then wait for Android device to reconnect to
the provided wifi network.
This expects the device to be already connected to the provided network.
Logic steps are
1. Ensure that we're connected to the network.
2. Turn wifi off.
3. Wait for 10 seconds.
4. Turn wifi on.
5. Wait for the "connected" event, then confirm the connected ssid is the
one requested.
This will directly fail a test if anything goes wrong.
Args:
ad: android_device object to initiate connection on.
network: A dictionary representing the network to await connection. The
dictionary must have the key "SSID".
num_of_tries: An integer that is the number of times to try before
delaring failure. Default is 1.
"""
expected_ssid = network[WifiEnums.SSID_KEY]
# First ensure that we're already connected to the provided network.
verify_con = {WifiEnums.SSID_KEY: expected_ssid}
verify_wifi_connection_info(ad, verify_con)
# Now toggle wifi state and wait for the connection event.
wifi_toggle_state(ad, False)
time.sleep(10)
wifi_toggle_state(ad, True)
ad.droid.wifiStartTrackingStateChange()
try:
connect_result = None
for i in range(num_of_tries):
try:
connect_result = ad.ed.pop_event(wifi_constants.WIFI_CONNECTED,
30)
break
except Empty:
pass
asserts.assert_true(
connect_result, "Failed to connect to Wi-Fi network %s on %s" %
(network, ad.serial))
logging.debug("Connection result on %s: %s.", ad.serial,
connect_result)
actual_ssid = connect_result['data'][WifiEnums.SSID_KEY]
asserts.assert_equal(
actual_ssid, expected_ssid, "Connected to the wrong network on %s."
"Expected %s, but got %s." %
(ad.serial, expected_ssid, actual_ssid))
logging.info("Connected to Wi-Fi network %s on %s", actual_ssid,
ad.serial)
finally:
ad.droid.wifiStopTrackingStateChange()
def wait_for_connect(ad,
expected_ssid=None,
expected_id=None,
tries=2,
assert_on_fail=True):
"""Wait for a connect event.
This will directly fail a test if anything goes wrong.
Args:
ad: An Android device object.
expected_ssid: SSID of the network to connect to.
expected_id: Network Id of the network to connect to.
tries: An integer that is the number of times to try before failing.
assert_on_fail: If True, error checks in this function will raise test
failure signals.
Returns:
Returns a value only if assert_on_fail is false.
Returns True if the connection was successful, False otherwise.
"""
return _assert_on_fail_handler(_wait_for_connect, assert_on_fail, ad,
expected_ssid, expected_id, tries)
def _wait_for_connect(ad, expected_ssid=None, expected_id=None, tries=2):
"""Wait for a connect event.
Args:
ad: An Android device object.
expected_ssid: SSID of the network to connect to.
expected_id: Network Id of the network to connect to.
tries: An integer that is the number of times to try before failing.
"""
ad.droid.wifiStartTrackingStateChange()
try:
connect_result = _wait_for_connect_event(ad,
ssid=expected_ssid,
id=expected_id,
tries=tries)
asserts.assert_true(
connect_result,
"Failed to connect to Wi-Fi network %s" % expected_ssid)
ad.log.debug("Wi-Fi connection result: %s.", connect_result)
actual_ssid = connect_result['data'][WifiEnums.SSID_KEY]
if expected_ssid:
asserts.assert_equal(actual_ssid, expected_ssid,
"Connected to the wrong network")
actual_id = connect_result['data'][WifiEnums.NETID_KEY]
if expected_id:
asserts.assert_equal(actual_id, expected_id,
"Connected to the wrong network")
ad.log.info("Connected to Wi-Fi network %s.", actual_ssid)
except Empty:
asserts.fail("Failed to start connection process to %s" %
expected_ssid)
except Exception as error:
ad.log.error("Failed to connect to %s with error %s", expected_ssid,
error)
raise signals.TestFailure("Failed to connect to %s network" %
expected_ssid)
finally:
ad.droid.wifiStopTrackingStateChange()
def _wait_for_connect_event(ad, ssid=None, id=None, tries=1):
"""Wait for a connect event on queue and pop when available.
Args:
ad: An Android device object.
ssid: SSID of the network to connect to.
id: Network Id of the network to connect to.
tries: An integer that is the number of times to try before failing.
Returns:
A dict with details of the connection data, which looks like this:
{
'time': 1485460337798,
'name': 'WifiNetworkConnected',
'data': {
'rssi': -27,
'is_24ghz': True,
'mac_address': '02:00:00:00:00:00',
'network_id': 1,
'BSSID': '30:b5:c2:33:d3:fc',
'ip_address': 117483712,
'link_speed': 54,
'supplicant_state': 'completed',
'hidden_ssid': False,
'SSID': 'wh_ap1_2g',
'is_5ghz': False}
}
"""
conn_result = None
# If ssid and network id is None, just wait for any connect event.
if id is None and ssid is None:
for i in range(tries):
try:
conn_result = ad.ed.pop_event(wifi_constants.WIFI_CONNECTED,
30)
break
except Empty:
pass
else:
# If ssid or network id is specified, wait for specific connect event.
for i in range(tries):
try:
conn_result = ad.ed.pop_event(wifi_constants.WIFI_CONNECTED,
30)
if id and conn_result['data'][WifiEnums.NETID_KEY] == id:
break
elif ssid and conn_result['data'][WifiEnums.SSID_KEY] == ssid:
break
except Empty:
pass
return conn_result
def wait_for_disconnect(ad, timeout=10):
"""Wait for a disconnect event within the specified timeout.
Args:
ad: Android device object.
timeout: Timeout in seconds.
"""
try:
ad.droid.wifiStartTrackingStateChange()
event = ad.ed.pop_event("WifiNetworkDisconnected", timeout)
except Empty:
raise signals.TestFailure("Device did not disconnect from the network")
finally:
ad.droid.wifiStopTrackingStateChange()
def ensure_no_disconnect(ad, duration=10):
"""Ensure that there is no disconnect for the specified duration.
Args:
ad: Android device object.
duration: Duration in seconds.
"""
try:
ad.droid.wifiStartTrackingStateChange()
event = ad.ed.pop_event("WifiNetworkDisconnected", duration)
raise signals.TestFailure("Device disconnected from the network")
except Empty:
pass
finally:
ad.droid.wifiStopTrackingStateChange()
def connect_to_wifi_network(ad, network, assert_on_fail=True,
check_connectivity=True, hidden=False,
num_of_scan_tries=DEFAULT_SCAN_TRIES,
num_of_connect_tries=DEFAULT_CONNECT_TRIES):
"""Connection logic for open and psk wifi networks.
Args:
ad: AndroidDevice to use for connection
network: network info of the network to connect to
assert_on_fail: If true, errors from wifi_connect will raise
test failure signals.
hidden: Is the Wifi network hidden.
num_of_scan_tries: The number of times to try scan
interface before declaring failure.
num_of_connect_tries: The number of times to try
connect wifi before declaring failure.
"""
if hidden:
start_wifi_connection_scan_and_ensure_network_not_found(
ad, network[WifiEnums.SSID_KEY], max_tries=num_of_scan_tries)
else:
start_wifi_connection_scan_and_ensure_network_found(
ad, network[WifiEnums.SSID_KEY], max_tries=num_of_scan_tries)
wifi_connect(ad,
network,
num_of_tries=num_of_connect_tries,
assert_on_fail=assert_on_fail,
check_connectivity=check_connectivity)
def connect_to_wifi_network_with_id(ad, network_id, network_ssid):
"""Connect to the given network using network id and verify SSID.
Args:
network_id: int Network Id of the network.
network_ssid: string SSID of the network.
Returns: True if connect using network id was successful;
False otherwise.
"""
start_wifi_connection_scan_and_ensure_network_found(ad, network_ssid)
wifi_connect_by_id(ad, network_id)
connect_data = ad.droid.wifiGetConnectionInfo()
connect_ssid = connect_data[WifiEnums.SSID_KEY]
ad.log.debug("Expected SSID = %s Connected SSID = %s" %
(network_ssid, connect_ssid))
if connect_ssid != network_ssid:
return False
return True
def wifi_connect(ad,
network,
num_of_tries=1,
assert_on_fail=True,
check_connectivity=True):
"""Connect an Android device to a wifi network.
Initiate connection to a wifi network, wait for the "connected" event, then
confirm the connected ssid is the one requested.
This will directly fail a test if anything goes wrong.
Args:
ad: android_device object to initiate connection on.
network: A dictionary representing the network to connect to. The
dictionary must have the key "SSID".
num_of_tries: An integer that is the number of times to try before
delaring failure. Default is 1.
assert_on_fail: If True, error checks in this function will raise test
failure signals.
Returns:
Returns a value only if assert_on_fail is false.
Returns True if the connection was successful, False otherwise.
"""
return _assert_on_fail_handler(_wifi_connect,
assert_on_fail,
ad,
network,
num_of_tries=num_of_tries,
check_connectivity=check_connectivity)
def _wifi_connect(ad, network, num_of_tries=1, check_connectivity=True):
"""Connect an Android device to a wifi network.
Initiate connection to a wifi network, wait for the "connected" event, then
confirm the connected ssid is the one requested.
This will directly fail a test if anything goes wrong.
Args:
ad: android_device object to initiate connection on.
network: A dictionary representing the network to connect to. The
dictionary must have the key "SSID".
num_of_tries: An integer that is the number of times to try before
delaring failure. Default is 1.
"""
asserts.assert_true(
WifiEnums.SSID_KEY in network,
"Key '%s' must be present in network definition." % WifiEnums.SSID_KEY)
ad.droid.wifiStartTrackingStateChange()
expected_ssid = network[WifiEnums.SSID_KEY]
ad.droid.wifiConnectByConfig(network)
ad.log.info("Starting connection process to %s", expected_ssid)
try:
event = ad.ed.pop_event(wifi_constants.CONNECT_BY_CONFIG_SUCCESS, 30)
connect_result = _wait_for_connect_event(ad,
ssid=expected_ssid,
tries=num_of_tries)
asserts.assert_true(
connect_result, "Failed to connect to Wi-Fi network %s on %s" %
(network, ad.serial))
ad.log.debug("Wi-Fi connection result: %s.", connect_result)
actual_ssid = connect_result['data'][WifiEnums.SSID_KEY]
asserts.assert_equal(
actual_ssid, expected_ssid,
"Connected to the wrong network on %s." % ad.serial)
ad.log.info("Connected to Wi-Fi network %s.", actual_ssid)
if check_connectivity:
internet = validate_connection(ad, DEFAULT_PING_ADDR)
if not internet:
raise signals.TestFailure(
"Failed to connect to internet on %s" % expected_ssid)
except Empty:
asserts.fail("Failed to start connection process to %s on %s" %
(network, ad.serial))
except Exception as error:
ad.log.error("Failed to connect to %s with error %s", expected_ssid,
error)
raise signals.TestFailure("Failed to connect to %s network" % network)
finally:
ad.droid.wifiStopTrackingStateChange()
def wifi_connect_by_id(ad, network_id, num_of_tries=3, assert_on_fail=True):
"""Connect an Android device to a wifi network using network Id.
Start connection to the wifi network, with the given network Id, wait for
the "connected" event, then verify the connected network is the one requested.
This will directly fail a test if anything goes wrong.
Args:
ad: android_device object to initiate connection on.
network_id: Integer specifying the network id of the network.
num_of_tries: An integer that is the number of times to try before
delaring failure. Default is 1.
assert_on_fail: If True, error checks in this function will raise test
failure signals.
Returns:
Returns a value only if assert_on_fail is false.
Returns True if the connection was successful, False otherwise.
"""
_assert_on_fail_handler(_wifi_connect_by_id, assert_on_fail, ad,
network_id, num_of_tries)
def _wifi_connect_by_id(ad, network_id, num_of_tries=1):
"""Connect an Android device to a wifi network using it's network id.
Start connection to the wifi network, with the given network id, wait for
the "connected" event, then verify the connected network is the one requested.
Args:
ad: android_device object to initiate connection on.
network_id: Integer specifying the network id of the network.
num_of_tries: An integer that is the number of times to try before
delaring failure. Default is 1.
"""
ad.droid.wifiStartTrackingStateChange()
# Clear all previous events.
ad.ed.clear_all_events()
ad.droid.wifiConnectByNetworkId(network_id)
ad.log.info("Starting connection to network with id %d", network_id)
try:
event = ad.ed.pop_event(wifi_constants.CONNECT_BY_NETID_SUCCESS, 60)
connect_result = _wait_for_connect_event(ad,
id=network_id,
tries=num_of_tries)
asserts.assert_true(
connect_result,
"Failed to connect to Wi-Fi network using network id")
ad.log.debug("Wi-Fi connection result: %s", connect_result)
actual_id = connect_result['data'][WifiEnums.NETID_KEY]
asserts.assert_equal(
actual_id, network_id, "Connected to the wrong network on %s."
"Expected network id = %d, but got %d." %
(ad.serial, network_id, actual_id))
expected_ssid = connect_result['data'][WifiEnums.SSID_KEY]
ad.log.info("Connected to Wi-Fi network %s with %d network id.",
expected_ssid, network_id)
internet = validate_connection(ad, DEFAULT_PING_ADDR)
if not internet:
raise signals.TestFailure("Failed to connect to internet on %s" %
expected_ssid)
except Empty:
asserts.fail("Failed to connect to network with id %d on %s" %
(network_id, ad.serial))
except Exception as error:
ad.log.error("Failed to connect to network with id %d with error %s",
network_id, error)
raise signals.TestFailure("Failed to connect to network with network"
" id %d" % network_id)
finally:
ad.droid.wifiStopTrackingStateChange()
def wifi_connect_using_network_request(ad,
network,
network_specifier,
num_of_tries=3):
"""Connect an Android device to a wifi network using network request.
Trigger a network request with the provided network specifier,
wait for the "onMatch" event, ensure that the scan results in "onMatch"
event contain the specified network, then simulate the user granting the
request with the specified network selected. Then wait for the "onAvailable"
network callback indicating successful connection to network.
Args:
ad: android_device object to initiate connection on.
network_specifier: A dictionary representing the network specifier to
use.
network: A dictionary representing the network to connect to. The
dictionary must have the key "SSID".
num_of_tries: An integer that is the number of times to try before
delaring failure.
Returns:
key: Key corresponding to network request.
"""
key = ad.droid.connectivityRequestWifiNetwork(network_specifier, 0)
ad.log.info("Sent network request %s with %s " % (key, network_specifier))
# Need a delay here because UI interaction should only start once wifi
# starts processing the request.
time.sleep(wifi_constants.NETWORK_REQUEST_CB_REGISTER_DELAY_SEC)
_wait_for_wifi_connect_after_network_request(ad, network, key,
num_of_tries)
return key
def wait_for_wifi_connect_after_network_request(ad,
network,
key,
num_of_tries=3,
assert_on_fail=True):
"""
Simulate and verify the connection flow after initiating the network
request.
Wait for the "onMatch" event, ensure that the scan results in "onMatch"
event contain the specified network, then simulate the user granting the
request with the specified network selected. Then wait for the "onAvailable"
network callback indicating successful connection to network.
Args:
ad: android_device object to initiate connection on.
network: A dictionary representing the network to connect to. The
dictionary must have the key "SSID".
key: Key corresponding to network request.
num_of_tries: An integer that is the number of times to try before
delaring failure.
assert_on_fail: If True, error checks in this function will raise test
failure signals.
Returns:
Returns a value only if assert_on_fail is false.
Returns True if the connection was successful, False otherwise.
"""
_assert_on_fail_handler(_wait_for_wifi_connect_after_network_request,
assert_on_fail, ad, network, key, num_of_tries)
def _wait_for_wifi_connect_after_network_request(ad,
network,
key,
num_of_tries=3):
"""
Simulate and verify the connection flow after initiating the network
request.
Wait for the "onMatch" event, ensure that the scan results in "onMatch"
event contain the specified network, then simulate the user granting the
request with the specified network selected. Then wait for the "onAvailable"
network callback indicating successful connection to network.
Args:
ad: android_device object to initiate connection on.
network: A dictionary representing the network to connect to. The
dictionary must have the key "SSID".
key: Key corresponding to network request.
num_of_tries: An integer that is the number of times to try before
delaring failure.
"""
asserts.assert_true(
WifiEnums.SSID_KEY in network,
"Key '%s' must be present in network definition." % WifiEnums.SSID_KEY)
ad.droid.wifiStartTrackingStateChange()
expected_ssid = network[WifiEnums.SSID_KEY]
ad.droid.wifiRegisterNetworkRequestMatchCallback()
# Wait for the platform to scan and return a list of networks
# matching the request
try:
matched_network = None
for _ in [0, num_of_tries]:
on_match_event = ad.ed.pop_event(
wifi_constants.WIFI_NETWORK_REQUEST_MATCH_CB_ON_MATCH, 60)
asserts.assert_true(on_match_event,
"Network request on match not received.")
matched_scan_results = on_match_event["data"]
ad.log.debug("Network request on match results %s",
matched_scan_results)
matched_network = match_networks(
{WifiEnums.SSID_KEY: network[WifiEnums.SSID_KEY]},
matched_scan_results)
ad.log.debug("Network request on match %s", matched_network)
if matched_network:
break
asserts.assert_true(matched_network,
"Target network %s not found" % network)
ad.droid.wifiSendUserSelectionForNetworkRequestMatch(network)
ad.log.info("Sent user selection for network request %s",
expected_ssid)
# Wait for the platform to connect to the network.
autils.wait_for_event_with_keys(
ad, cconsts.EVENT_NETWORK_CALLBACK, 60,
(cconsts.NETWORK_CB_KEY_ID, key),
(cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_AVAILABLE))
on_capabilities_changed = autils.wait_for_event_with_keys(
ad, cconsts.EVENT_NETWORK_CALLBACK, 10,
(cconsts.NETWORK_CB_KEY_ID, key),
(cconsts.NETWORK_CB_KEY_EVENT,
cconsts.NETWORK_CB_CAPABILITIES_CHANGED))
connected_network = None
# WifiInfo is attached to TransportInfo only in S.
if ad.droid.isSdkAtLeastS():
connected_network = (
on_capabilities_changed["data"][
cconsts.NETWORK_CB_KEY_TRANSPORT_INFO]
)
else:
connected_network = ad.droid.wifiGetConnectionInfo()
ad.log.info("Connected to network %s", connected_network)
asserts.assert_equal(
connected_network[WifiEnums.SSID_KEY], expected_ssid,
"Connected to the wrong network."
"Expected %s, but got %s." % (network, connected_network))
except Empty:
asserts.fail("Failed to connect to %s" % expected_ssid)
except Exception as error:
ad.log.error("Failed to connect to %s with error %s" %
(expected_ssid, error))
raise signals.TestFailure("Failed to connect to %s network" % network)
finally:
ad.droid.wifiStopTrackingStateChange()
def wifi_passpoint_connect(ad,
passpoint_network,
num_of_tries=1,
assert_on_fail=True):
"""Connect an Android device to a wifi network.
Initiate connection to a wifi network, wait for the "connected" event, then
confirm the connected ssid is the one requested.
This will directly fail a test if anything goes wrong.
Args:
ad: android_device object to initiate connection on.
passpoint_network: SSID of the Passpoint network to connect to.
num_of_tries: An integer that is the number of times to try before
delaring failure. Default is 1.
assert_on_fail: If True, error checks in this function will raise test
failure signals.
Returns:
If assert_on_fail is False, function returns network id, if the connect was
successful, False otherwise. If assert_on_fail is True, no return value.
"""
_assert_on_fail_handler(_wifi_passpoint_connect,
assert_on_fail,
ad,
passpoint_network,
num_of_tries=num_of_tries)
def _wifi_passpoint_connect(ad, passpoint_network, num_of_tries=1):
"""Connect an Android device to a wifi network.
Initiate connection to a wifi network, wait for the "connected" event, then
confirm the connected ssid is the one requested.
This will directly fail a test if anything goes wrong.
Args:
ad: android_device object to initiate connection on.
passpoint_network: SSID of the Passpoint network to connect to.
num_of_tries: An integer that is the number of times to try before
delaring failure. Default is 1.
"""
ad.droid.wifiStartTrackingStateChange()
expected_ssid = passpoint_network
ad.log.info("Starting connection process to passpoint %s", expected_ssid)
try:
connect_result = _wait_for_connect_event(ad, expected_ssid,
num_of_tries)
asserts.assert_true(
connect_result, "Failed to connect to WiFi passpoint network %s on"
" %s" % (expected_ssid, ad.serial))
ad.log.info("Wi-Fi connection result: %s.", connect_result)
actual_ssid = connect_result['data'][WifiEnums.SSID_KEY]
asserts.assert_equal(
actual_ssid, expected_ssid,
"Connected to the wrong network on %s." % ad.serial)
ad.log.info("Connected to Wi-Fi passpoint network %s.", actual_ssid)
internet = validate_connection(ad, DEFAULT_PING_ADDR)
if not internet:
raise signals.TestFailure("Failed to connect to internet on %s" %
expected_ssid)
except Exception as error:
ad.log.error("Failed to connect to passpoint network %s with error %s",
expected_ssid, error)
raise signals.TestFailure("Failed to connect to %s passpoint network" %
expected_ssid)
finally:
ad.droid.wifiStopTrackingStateChange()
def delete_passpoint(ad, fqdn):
"""Delete a required Passpoint configuration."""
try:
ad.droid.removePasspointConfig(fqdn)
return True
except Exception as error:
ad.log.error(
"Failed to remove passpoint configuration with FQDN=%s "
"and error=%s", fqdn, error)
return False
def start_wifi_single_scan(ad, scan_setting):
"""Starts wifi single shot scan.
Args:
ad: android_device object to initiate connection on.
scan_setting: A dict representing the settings of the scan.
Returns:
If scan was started successfully, event data of success event is returned.
"""
idx = ad.droid.wifiScannerStartScan(scan_setting)
event = ad.ed.pop_event("WifiScannerScan%sonSuccess" % idx, SHORT_TIMEOUT)
ad.log.debug("Got event %s", event)
return event['data']
def track_connection(ad, network_ssid, check_connection_count):
"""Track wifi connection to network changes for given number of counts
Args:
ad: android_device object for forget network.
network_ssid: network ssid to which connection would be tracked
check_connection_count: Integer for maximum number network connection
check.
Returns:
True if connection to given network happen, else return False.
"""
ad.droid.wifiStartTrackingStateChange()
while check_connection_count > 0:
connect_network = ad.ed.pop_event("WifiNetworkConnected", 120)
ad.log.info("Connected to network %s", connect_network)
if (WifiEnums.SSID_KEY in connect_network['data'] and
connect_network['data'][WifiEnums.SSID_KEY] == network_ssid):
return True
check_connection_count -= 1
ad.droid.wifiStopTrackingStateChange()
return False
def get_scan_time_and_channels(wifi_chs, scan_setting, stime_channel):
"""Calculate the scan time required based on the band or channels in scan
setting
Args:
wifi_chs: Object of channels supported
scan_setting: scan setting used for start scan
stime_channel: scan time per channel
Returns:
scan_time: time required for completing a scan
scan_channels: channel used for scanning
"""
scan_time = 0
scan_channels = []
if "band" in scan_setting and "channels" not in scan_setting:
scan_channels = wifi_chs.band_to_freq(scan_setting["band"])
elif "channels" in scan_setting and "band" not in scan_setting:
scan_channels = scan_setting["channels"]
scan_time = len(scan_channels) * stime_channel
for channel in scan_channels:
if channel in WifiEnums.DFS_5G_FREQUENCIES:
scan_time += 132 #passive scan time on DFS
return scan_time, scan_channels
def start_wifi_track_bssid(ad, track_setting):
"""Start tracking Bssid for the given settings.
Args:
ad: android_device object.
track_setting: Setting for which the bssid tracking should be started
Returns:
If tracking started successfully, event data of success event is returned.
"""
idx = ad.droid.wifiScannerStartTrackingBssids(
track_setting["bssidInfos"], track_setting["apLostThreshold"])
event = ad.ed.pop_event("WifiScannerBssid{}onSuccess".format(idx),
SHORT_TIMEOUT)
return event['data']
def convert_pem_key_to_pkcs8(in_file, out_file):
"""Converts the key file generated by us to the format required by
Android using openssl.
The input file must have the extension "pem". The output file must
have the extension "der".
Args:
in_file: The original key file.
out_file: The full path to the converted key file, including
filename.
"""
asserts.assert_true(in_file.endswith(".pem"), "Input file has to be .pem.")
asserts.assert_true(out_file.endswith(".der"),
"Output file has to be .der.")
cmd = ("openssl pkcs8 -inform PEM -in {} -outform DER -out {} -nocrypt"
" -topk8").format(in_file, out_file)
utils.exe_cmd(cmd)
def validate_connection(ad,
ping_addr=DEFAULT_PING_ADDR,
wait_time=15,
ping_gateway=True):
"""Validate internet connection by pinging the address provided.
Args:
ad: android_device object.
ping_addr: address on internet for pinging.
wait_time: wait for some time before validating connection
Returns:
ping output if successful, NULL otherwise.
"""
android_version = int(ad.adb.shell("getprop ro.vendor.build.version.release"))
# wait_time to allow for DHCP to complete.
for i in range(wait_time):
if ad.droid.connectivityNetworkIsConnected():
if (android_version > 10 and ad.droid.connectivityGetIPv4DefaultGateway()) or android_version < 11:
break
time.sleep(1)
ping = False
try:
ping = ad.droid.httpPing(ping_addr)
ad.log.info("Http ping result: %s.", ping)
except:
pass
if android_version > 10 and not ping and ping_gateway:
ad.log.info("Http ping failed. Pinging default gateway")
gw = ad.droid.connectivityGetIPv4DefaultGateway()
result = ad.adb.shell("ping -c 6 {}".format(gw))
ad.log.info("Default gateway ping result: %s" % result)
ping = False if "100% packet loss" in result else True
return ping
#TODO(angli): This can only verify if an actual value is exactly the same.
# Would be nice to be able to verify an actual value is one of serveral.
def verify_wifi_connection_info(ad, expected_con):
"""Verifies that the information of the currently connected wifi network is
as expected.
Args:
expected_con: A dict representing expected key-value pairs for wifi
connection. e.g. {"SSID": "test_wifi"}
"""
current_con = ad.droid.wifiGetConnectionInfo()
case_insensitive = ["BSSID", "supplicant_state"]
ad.log.debug("Current connection: %s", current_con)
for k, expected_v in expected_con.items():
# Do not verify authentication related fields.
if k == "password":
continue
msg = "Field %s does not exist in wifi connection info %s." % (
k, current_con)
if k not in current_con:
raise signals.TestFailure(msg)
actual_v = current_con[k]
if k in case_insensitive:
actual_v = actual_v.lower()
expected_v = expected_v.lower()
msg = "Expected %s to be %s, actual %s is %s." % (k, expected_v, k,
actual_v)
if actual_v != expected_v:
raise signals.TestFailure(msg)
def check_autoconnect_to_open_network(
ad, conn_timeout=WIFI_CONNECTION_TIMEOUT_DEFAULT):
"""Connects to any open WiFI AP
Args:
timeout value in sec to wait for UE to connect to a WiFi AP
Returns:
True if UE connects to WiFi AP (supplicant_state = completed)
False if UE fails to complete connection within WIFI_CONNECTION_TIMEOUT time.
"""
if ad.droid.wifiCheckState():
return True
ad.droid.wifiToggleState()
wifi_connection_state = None
timeout = time.time() + conn_timeout
while wifi_connection_state != "completed":
wifi_connection_state = ad.droid.wifiGetConnectionInfo(
)['supplicant_state']
if time.time() > timeout:
ad.log.warning("Failed to connect to WiFi AP")
return False
return True
def expand_enterprise_config_by_phase2(config):
"""Take an enterprise config and generate a list of configs, each with
a different phase2 auth type.
Args:
config: A dict representing enterprise config.
Returns
A list of enterprise configs.
"""
results = []
phase2_types = WifiEnums.EapPhase2
if config[WifiEnums.Enterprise.EAP] == WifiEnums.Eap.PEAP:
# Skip unsupported phase2 types for PEAP.
phase2_types = [WifiEnums.EapPhase2.GTC, WifiEnums.EapPhase2.MSCHAPV2]
for phase2_type in phase2_types:
# Skip a special case for passpoint TTLS.
if (WifiEnums.Enterprise.FQDN in config
and phase2_type == WifiEnums.EapPhase2.GTC):
continue
c = dict(config)
c[WifiEnums.Enterprise.PHASE2] = phase2_type.value
results.append(c)
return results
def generate_eap_test_name(config, ad=None):
""" Generates a test case name based on an EAP configuration.
Args:
config: A dict representing an EAP credential.
ad object: Redundant but required as the same param is passed
to test_func in run_generated_tests
Returns:
A string representing the name of a generated EAP test case.
"""
eap = WifiEnums.Eap
eap_phase2 = WifiEnums.EapPhase2
Ent = WifiEnums.Enterprise
name = "test_connect-"
eap_name = ""
for e in eap:
if e.value == config[Ent.EAP]:
eap_name = e.name
break
if "peap0" in config[WifiEnums.SSID_KEY].lower():
eap_name = "PEAP0"
if "peap1" in config[WifiEnums.SSID_KEY].lower():
eap_name = "PEAP1"
name += eap_name
if Ent.PHASE2 in config:
for e in eap_phase2:
if e.value == config[Ent.PHASE2]:
name += "-{}".format(e.name)
break
return name
def group_attenuators(attenuators):
"""Groups a list of attenuators into attenuator groups for backward
compatibility reasons.
Most legacy Wi-Fi setups have two attenuators each connected to a separate
AP. The new Wi-Fi setup has four attenuators, each connected to one channel
on an AP, so two of them are connected to one AP.
To make the existing scripts work in the new setup, when the script needs
to attenuate one AP, it needs to set attenuation on both attenuators
connected to the same AP.
This function groups attenuators properly so the scripts work in both
legacy and new Wi-Fi setups.
Args:
attenuators: A list of attenuator objects, either two or four in length.
Raises:
signals.TestFailure is raised if the attenuator list does not have two
or four objects.
"""
attn0 = attenuator.AttenuatorGroup("AP0")
attn1 = attenuator.AttenuatorGroup("AP1")
# Legacy testbed setup has two attenuation channels.
num_of_attns = len(attenuators)
if num_of_attns == 2:
attn0.add(attenuators[0])
attn1.add(attenuators[1])
elif num_of_attns == 4:
attn0.add(attenuators[0])
attn0.add(attenuators[1])
attn1.add(attenuators[2])
attn1.add(attenuators[3])
else:
asserts.fail(("Either two or four attenuators are required for this "
"test, but found %s") % num_of_attns)
return [attn0, attn1]
def set_attns(attenuator, attn_val_name, roaming_attn=ROAMING_ATTN):
"""Sets attenuation values on attenuators used in this test.
Args:
attenuator: The attenuator object.
attn_val_name: Name of the attenuation value pair to use.
roaming_attn: Dictionary specifying the attenuation params.
"""
logging.info("Set attenuation values to %s", roaming_attn[attn_val_name])
try:
attenuator[0].set_atten(roaming_attn[attn_val_name][0])
attenuator[1].set_atten(roaming_attn[attn_val_name][1])
attenuator[2].set_atten(roaming_attn[attn_val_name][2])
attenuator[3].set_atten(roaming_attn[attn_val_name][3])
except:
logging.exception("Failed to set attenuation values %s.",
attn_val_name)
raise
def set_attns_steps(attenuators,
atten_val_name,
roaming_attn=ROAMING_ATTN,
steps=10,
wait_time=12):
"""Set attenuation values on attenuators used in this test. It will change
the attenuation values linearly from current value to target value step by
step.
Args:
attenuators: The list of attenuator objects that you want to change
their attenuation value.
atten_val_name: Name of the attenuation value pair to use.
roaming_attn: Dictionary specifying the attenuation params.
steps: Number of attenuator changes to reach the target value.
wait_time: Sleep time for each change of attenuator.
"""
logging.info("Set attenuation values to %s in %d step(s)",
roaming_attn[atten_val_name], steps)
start_atten = [attenuator.get_atten() for attenuator in attenuators]
target_atten = roaming_attn[atten_val_name]
for current_step in range(steps):
progress = (current_step + 1) / steps
for i, attenuator in enumerate(attenuators):
amount_since_start = (target_atten[i] - start_atten[i]) * progress
attenuator.set_atten(round(start_atten[i] + amount_since_start))
time.sleep(wait_time)
def trigger_roaming_and_validate(dut,
attenuator,
attn_val_name,
expected_con,
roaming_attn=ROAMING_ATTN):
"""Sets attenuators to trigger roaming and validate the DUT connected
to the BSSID expected.
Args:
attenuator: The attenuator object.
attn_val_name: Name of the attenuation value pair to use.
expected_con: The network information of the expected network.
roaming_attn: Dictionary specifying the attenaution params.
"""
expected_con = {
WifiEnums.SSID_KEY: expected_con[WifiEnums.SSID_KEY],
WifiEnums.BSSID_KEY: expected_con["bssid"],
}
set_attns_steps(attenuator, attn_val_name, roaming_attn)
verify_wifi_connection_info(dut, expected_con)
expected_bssid = expected_con[WifiEnums.BSSID_KEY]
logging.info("Roamed to %s successfully", expected_bssid)
if not validate_connection(dut):
raise signals.TestFailure("Fail to connect to internet on %s" %
expected_bssid)
def create_softap_config():
"""Create a softap config with random ssid and password."""
ap_ssid = "softap_" + utils.rand_ascii_str(8)
ap_password = utils.rand_ascii_str(8)
logging.info("softap setup: %s %s", ap_ssid, ap_password)
config = {
WifiEnums.SSID_KEY: ap_ssid,
WifiEnums.PWD_KEY: ap_password,
}
return config
def start_softap_and_verify(ad, band):
"""Bring-up softap and verify AP mode and in scan results.
Args:
band: The band to use for softAP.
Returns: dict, the softAP config.
"""
# Register before start the test.
callbackId = ad.dut.droid.registerSoftApCallback()
# Check softap info value is default
frequency, bandwdith = get_current_softap_info(ad.dut, callbackId, True)
asserts.assert_true(frequency == 0, "Softap frequency is not reset")
asserts.assert_true(bandwdith == 0, "Softap bandwdith is not reset")
config = create_softap_config()
start_wifi_tethering(ad.dut,
config[WifiEnums.SSID_KEY],
config[WifiEnums.PWD_KEY],
band=band)
asserts.assert_true(ad.dut.droid.wifiIsApEnabled(),
"SoftAp is not reported as running")
start_wifi_connection_scan_and_ensure_network_found(
ad.dut_client, config[WifiEnums.SSID_KEY])
# Check softap info can get from callback succeed and assert value should be
# valid.
frequency, bandwdith = get_current_softap_info(ad.dut, callbackId, True)
asserts.assert_true(frequency > 0, "Softap frequency is not valid")
asserts.assert_true(bandwdith > 0, "Softap bandwdith is not valid")
# Unregister callback
ad.dut.droid.unregisterSoftApCallback(callbackId)
return config
def wait_for_expected_number_of_softap_clients(ad, callbackId,
expected_num_of_softap_clients):
"""Wait for the number of softap clients to be updated as expected.
Args:
callbackId: Id of the callback associated with registering.
expected_num_of_softap_clients: expected number of softap clients.
"""
eventStr = wifi_constants.SOFTAP_CALLBACK_EVENT + str(
callbackId) + wifi_constants.SOFTAP_NUMBER_CLIENTS_CHANGED
clientData = ad.ed.pop_event(eventStr, SHORT_TIMEOUT)['data']
clientCount = clientData[wifi_constants.SOFTAP_NUMBER_CLIENTS_CALLBACK_KEY]
clientMacAddresses = clientData[
wifi_constants.SOFTAP_CLIENTS_MACS_CALLBACK_KEY]
asserts.assert_equal(
clientCount, expected_num_of_softap_clients,
"The number of softap clients doesn't match the expected number")
asserts.assert_equal(
len(clientMacAddresses), expected_num_of_softap_clients,
"The number of mac addresses doesn't match the expected number")
for macAddress in clientMacAddresses:
asserts.assert_true(checkMacAddress(macAddress),
"An invalid mac address was returned")
def checkMacAddress(input):
"""Validate whether a string is a valid mac address or not.
Args:
input: The string to validate.
Returns: True/False, returns true for a valid mac address and false otherwise.
"""
macValidationRegex = "[0-9a-f]{2}([-:]?)[0-9a-f]{2}(\\1[0-9a-f]{2}){4}$"
if re.match(macValidationRegex, input.lower()):
return True
return False
def wait_for_expected_softap_state(ad, callbackId, expected_softap_state):
"""Wait for the expected softap state change.
Args:
callbackId: Id of the callback associated with registering.
expected_softap_state: The expected softap state.
"""
eventStr = wifi_constants.SOFTAP_CALLBACK_EVENT + str(
callbackId) + wifi_constants.SOFTAP_STATE_CHANGED
asserts.assert_equal(
ad.ed.pop_event(eventStr, SHORT_TIMEOUT)['data'][
wifi_constants.SOFTAP_STATE_CHANGE_CALLBACK_KEY],
expected_softap_state,
"Softap state doesn't match with expected state")
def get_current_number_of_softap_clients(ad, callbackId):
"""pop up all of softap client updated event from queue.
Args:
callbackId: Id of the callback associated with registering.
Returns:
If exist aleast callback, returns last updated number_of_softap_clients.
Returns None when no any match callback event in queue.
"""
eventStr = wifi_constants.SOFTAP_CALLBACK_EVENT + str(
callbackId) + wifi_constants.SOFTAP_NUMBER_CLIENTS_CHANGED
events = ad.ed.pop_all(eventStr)
for event in events:
num_of_clients = event['data'][
wifi_constants.SOFTAP_NUMBER_CLIENTS_CALLBACK_KEY]
if len(events) == 0:
return None
return num_of_clients
def get_current_softap_info(ad, callbackId, need_to_wait):
"""pop up all of softap info changed event from queue.
Args:
callbackId: Id of the callback associated with registering.
need_to_wait: Wait for the info callback event before pop all.
Returns:
Returns last updated information of softap.
"""
eventStr = wifi_constants.SOFTAP_CALLBACK_EVENT + str(
callbackId) + wifi_constants.SOFTAP_INFO_CHANGED
ad.log.debug("softap info dump from eventStr %s", eventStr)
frequency = 0
bandwidth = 0
if (need_to_wait):
event = ad.ed.pop_event(eventStr, SHORT_TIMEOUT)
frequency = event['data'][
wifi_constants.SOFTAP_INFO_FREQUENCY_CALLBACK_KEY]
bandwidth = event['data'][
wifi_constants.SOFTAP_INFO_BANDWIDTH_CALLBACK_KEY]
ad.log.info("softap info updated, frequency is %s, bandwidth is %s",
frequency, bandwidth)
events = ad.ed.pop_all(eventStr)
for event in events:
frequency = event['data'][
wifi_constants.SOFTAP_INFO_FREQUENCY_CALLBACK_KEY]
bandwidth = event['data'][
wifi_constants.SOFTAP_INFO_BANDWIDTH_CALLBACK_KEY]
ad.log.info("softap info, frequency is %s, bandwidth is %s", frequency,
bandwidth)
return frequency, bandwidth
def get_current_softap_infos(ad, callbackId, need_to_wait):
"""pop up all of softap info list changed event from queue.
Args:
callbackId: Id of the callback associated with registering.
need_to_wait: Wait for the info callback event before pop all.
Returns:
Returns last updated informations of softap.
"""
eventStr = wifi_constants.SOFTAP_CALLBACK_EVENT + str(
callbackId) + wifi_constants.SOFTAP_INFOLIST_CHANGED
ad.log.debug("softap info dump from eventStr %s", eventStr)
if (need_to_wait):
event = ad.ed.pop_event(eventStr, SHORT_TIMEOUT)
infos = event['data']
events = ad.ed.pop_all(eventStr)
for event in events:
infos = event['data']
for info in infos:
frequency = info[
wifi_constants.SOFTAP_INFO_FREQUENCY_CALLBACK_KEY]
bandwidth = info[
wifi_constants.SOFTAP_INFO_BANDWIDTH_CALLBACK_KEY]
wifistandard = info[
wifi_constants.SOFTAP_INFO_WIFISTANDARD_CALLBACK_KEY]
bssid = info[
wifi_constants.SOFTAP_INFO_BSSID_CALLBACK_KEY]
ad.log.info(
"softap info, freq:%s, bw:%s, wifistandard:%s, bssid:%s",
frequency, bandwidth, wifistandard, bssid)
return infos
def get_current_softap_capability(ad, callbackId, need_to_wait):
"""pop up all of softap info list changed event from queue.
Args:
callbackId: Id of the callback associated with registering.
need_to_wait: Wait for the info callback event before pop all.
Returns:
Returns last updated capability of softap.
"""
eventStr = wifi_constants.SOFTAP_CALLBACK_EVENT + str(
callbackId) + wifi_constants.SOFTAP_CAPABILITY_CHANGED
ad.log.debug("softap capability dump from eventStr %s", eventStr)
if (need_to_wait):
event = ad.ed.pop_event(eventStr, SHORT_TIMEOUT)
capability = event['data']
events = ad.ed.pop_all(eventStr)
for event in events:
capability = event['data']
return capability
def get_ssrdumps(ad):
"""Pulls dumps in the ssrdump dir
Args:
ad: android device object.
"""
logs = ad.get_file_names("/data/vendor/ssrdump/")
if logs:
ad.log.info("Pulling ssrdumps %s", logs)
log_path = os.path.join(ad.device_log_path, "SSRDUMPS_%s" % ad.serial)
os.makedirs(log_path, exist_ok=True)
ad.pull_files(logs, log_path)
ad.adb.shell("find /data/vendor/ssrdump/ -type f -delete",
ignore_status=True)
def start_pcap(pcap, wifi_band, test_name):
"""Start packet capture in monitor mode.
Args:
pcap: packet capture object
wifi_band: '2g' or '5g' or 'dual'
test_name: test name to be used for pcap file name
Returns:
Dictionary with wifi band as key and the tuple
(pcap Process object, log directory) as the value
"""
log_dir = os.path.join(
context.get_current_context().get_full_output_path(), 'PacketCapture')
os.makedirs(log_dir, exist_ok=True)
if wifi_band == 'dual':
bands = [BAND_2G, BAND_5G]
else:
bands = [wifi_band]
procs = {}
for band in bands:
proc = pcap.start_packet_capture(band, log_dir, test_name)
procs[band] = (proc, os.path.join(log_dir, test_name))
return procs
def stop_pcap(pcap, procs, test_status=None):
"""Stop packet capture in monitor mode.
Since, the pcap logs in monitor mode can be very large, we will
delete them if they are not required. 'test_status' if True, will delete
the pcap files. If False, we will keep them.
Args:
pcap: packet capture object
procs: dictionary returned by start_pcap
test_status: status of the test case
"""
for proc, fname in procs.values():
pcap.stop_packet_capture(proc)
if test_status:
shutil.rmtree(os.path.dirname(fname))
def verify_mac_not_found_in_pcap(ad, mac, packets):
"""Verify that a mac address is not found in the captured packets.
Args:
ad: android device object
mac: string representation of the mac address
packets: packets obtained by rdpcap(pcap_fname)
"""
for pkt in packets:
logging.debug("Packet Summary = %s", pkt.summary())
if mac in pkt.summary():
asserts.fail("Device %s caught Factory MAC: %s in packet sniffer."
"Packet = %s" % (ad.serial, mac, pkt.show()))
def verify_mac_is_found_in_pcap(ad, mac, packets):
"""Verify that a mac address is found in the captured packets.
Args:
ad: android device object
mac: string representation of the mac address
packets: packets obtained by rdpcap(pcap_fname)
"""
for pkt in packets:
if mac in pkt.summary():
return
asserts.fail("Did not find MAC = %s in packet sniffer."
"for device %s" % (mac, ad.serial))
def start_all_wlan_logs(ads):
for ad in ads:
start_wlan_logs(ad)
def start_wlan_logs(ad):
"""Start Pixel Logger to record extra wifi logs
Args:
ad: android device object.
"""
if not ad.adb.shell("pm list package | grep com.android.pixellogger"):
ad.log.info("Device doesn't have Pixel Logger")
return
ad.adb.shell(
"find /sdcard/Android/data/com.android.pixellogger/files/logs"
"/wlan_logs/ -type f -delete",
ignore_status=True)
if ad.file_exists("/vendor/bin/cnss_diag"):
ad.adb.shell("am startservice -a com.android.pixellogger.service"
".logging.LoggingService.ACTION_START_LOGGING "
"-e intent_logger cnss_diag", ignore_status=True)
else:
ad.adb.shell("am startservice -a com.android.pixellogger.service"
".logging.LoggingService.ACTION_START_LOGGING "
"-e intent_logger wlan_logs", ignore_status=True)
def stop_all_wlan_logs(ads):
for ad in ads:
stop_wlan_logs(ad)
ad.log.info("Wait 30s for the createion of zip file for wlan logs")
time.sleep(30)
def stop_wlan_logs(ad):
"""Stops Pixel Logger
Args:
ad: android device object.
"""
if not ad.adb.shell("pm list package | grep com.android.pixellogger"):
return
ad.adb.shell("am startservice -a com.android.pixellogger.service.logging"
".LoggingService.ACTION_STOP_LOGGING", ignore_status=True)
def get_wlan_logs(ad):
"""Pull logs from Pixel Logger folder
Args:
ad: android device object.
"""
logs = ad.get_file_names("/sdcard/Android/data/com.android.pixellogger/files/logs/wlan_logs")
if logs:
ad.log.info("Pulling Pixel Logger logs %s", logs)
log_path = os.path.join(ad.device_log_path, "WLAN_LOGS_%s" % ad.serial)
os.makedirs(log_path, exist_ok=True)
ad.pull_files(logs, log_path)
LinkProbeResult = namedtuple(
'LinkProbeResult',
('is_success', 'stdout', 'elapsed_time', 'failure_reason'))
def send_link_probe(ad):
"""Sends a link probe to the currently connected AP, and returns whether the
probe succeeded or not.
Args:
ad: android device object
Returns:
LinkProbeResult namedtuple
"""
stdout = ad.adb.shell('cmd wifi send-link-probe')
asserts.assert_false('Error' in stdout or 'Exception' in stdout,
'Exception while sending link probe: ' + stdout)
is_success = False
elapsed_time = None
failure_reason = None
if 'succeeded' in stdout:
is_success = True
elapsed_time = next(
(int(token) for token in stdout.split() if token.isdigit()), None)
elif 'failed with reason' in stdout:
failure_reason = next(
(int(token) for token in stdout.split() if token.isdigit()), None)
else:
asserts.fail('Unexpected link probe result: ' + stdout)
return LinkProbeResult(is_success=is_success,
stdout=stdout,
elapsed_time=elapsed_time,
failure_reason=failure_reason)
def send_link_probes(ad, num_probes, delay_sec):
"""Sends a sequence of link probes to the currently connected AP, and
returns whether the probes succeeded or not.
Args:
ad: android device object
num_probes: number of probes to perform
delay_sec: delay time between probes, in seconds
Returns:
List[LinkProbeResult] one LinkProbeResults for each probe
"""
logging.info('Sending link probes')
results = []
for _ in range(num_probes):
# send_link_probe() will also fail the test if it sees an exception
# in the stdout of the adb shell command
result = send_link_probe(ad)
logging.info('link probe results: ' + str(result))
results.append(result)
time.sleep(delay_sec)
return results
def ap_setup(test, index, ap, network, bandwidth=80, channel=6):
"""Set up the AP with provided network info.
Args:
test: the calling test class object.
index: int, index of the AP.
ap: access_point object of the AP.
network: dict with information of the network, including ssid,
password and bssid.
bandwidth: the operation bandwidth for the AP, default 80MHz.
channel: the channel number for the AP.
Returns:
brconfigs: the bridge interface configs
"""
bss_settings = []
ssid = network[WifiEnums.SSID_KEY]
test.access_points[index].close()
time.sleep(5)
# Configure AP as required.
if "password" in network.keys():
password = network["password"]
security = hostapd_security.Security(security_mode="wpa",
password=password)
else:
security = hostapd_security.Security(security_mode=None, password=None)
config = hostapd_ap_preset.create_ap_preset(channel=channel,
ssid=ssid,
security=security,
bss_settings=bss_settings,
vht_bandwidth=bandwidth,
profile_name='whirlwind',
iface_wlan_2g=ap.wlan_2g,
iface_wlan_5g=ap.wlan_5g)
ap.start_ap(config)
logging.info("AP started on channel {} with SSID {}".format(channel, ssid))
def turn_ap_off(test, AP):
"""Bring down hostapd on the Access Point.
Args:
test: The test class object.
AP: int, indicating which AP to turn OFF.
"""
hostapd_2g = test.access_points[AP - 1]._aps['wlan0'].hostapd
if hostapd_2g.is_alive():
hostapd_2g.stop()
logging.debug('Turned WLAN0 AP%d off' % AP)
hostapd_5g = test.access_points[AP - 1]._aps['wlan1'].hostapd
if hostapd_5g.is_alive():
hostapd_5g.stop()
logging.debug('Turned WLAN1 AP%d off' % AP)
def turn_ap_on(test, AP):
"""Bring up hostapd on the Access Point.
Args:
test: The test class object.
AP: int, indicating which AP to turn ON.
"""
hostapd_2g = test.access_points[AP - 1]._aps['wlan0'].hostapd
if not hostapd_2g.is_alive():
hostapd_2g.start(hostapd_2g.config)
logging.debug('Turned WLAN0 AP%d on' % AP)
hostapd_5g = test.access_points[AP - 1]._aps['wlan1'].hostapd
if not hostapd_5g.is_alive():
hostapd_5g.start(hostapd_5g.config)
logging.debug('Turned WLAN1 AP%d on' % AP)
def turn_location_off_and_scan_toggle_off(ad):
"""Turns off wifi location scans."""
utils.set_location_service(ad, False)
ad.droid.wifiScannerToggleAlwaysAvailable(False)
msg = "Failed to turn off location service's scan."
asserts.assert_true(not ad.droid.wifiScannerIsAlwaysAvailable(), msg)
def set_softap_channel(dut, ap_iface='wlan1', cs_count=10, channel=2462):
""" Set SoftAP mode channel
Args:
dut: android device object
ap_iface: interface of SoftAP mode.
cs_count: how many beacon frames before switch channel, default = 10
channel: a wifi channel.
"""
chan_switch_cmd = 'hostapd_cli -i {} chan_switch {} {}'
chan_switch_cmd_show = chan_switch_cmd.format(ap_iface, cs_count, channel)
dut.log.info('adb shell {}'.format(chan_switch_cmd_show))
chan_switch_result = dut.adb.shell(
chan_switch_cmd.format(ap_iface, cs_count, channel))
if chan_switch_result == 'OK':
dut.log.info('switch hotspot channel to {}'.format(channel))
return chan_switch_result
asserts.fail("Failed to switch hotspot channel")
def get_wlan0_link(dut):
""" get wlan0 interface status"""
get_wlan0 = 'wpa_cli -iwlan0 -g@android:wpa_wlan0 IFNAME=wlan0 status'
out = dut.adb.shell(get_wlan0)
out = dict(re.findall(r'(\S+)=(".*?"|\S+)', out))
asserts.assert_true("ssid" in out,
"Client doesn't connect to any network")
return out
def verify_11ax_wifi_connection(ad, wifi6_supported_models, wifi6_ap):
"""Verify 11ax for wifi connection.
Args:
ad: adndroid device object
wifi6_supported_models: device supporting 11ax.
wifi6_ap: if the AP supports 11ax.
"""
if wifi6_ap and ad.model in wifi6_supported_models:
logging.info("Verifying 11ax. Model: %s" % ad.model)
asserts.assert_true(
ad.droid.wifiGetConnectionStandard() ==
wifi_constants.WIFI_STANDARD_11AX, "DUT did not connect to 11ax.")
def verify_11ax_softap(dut, dut_client, wifi6_supported_models):
"""Verify 11ax SoftAp if devices support it.
Check if both DUT and DUT client supports 11ax, then SoftAp turns on
with 11ax mode and DUT client can connect to it.
Args:
dut: Softap device.
dut_client: Client connecting to softap.
wifi6_supported_models: List of device models supporting 11ax.
"""
if dut.model in wifi6_supported_models and dut_client.model in wifi6_supported_models:
logging.info(
"Verifying 11ax softap. DUT model: %s, DUT Client model: %s",
dut.model, dut_client.model)
asserts.assert_true(
dut_client.droid.wifiGetConnectionStandard() ==
wifi_constants.WIFI_STANDARD_11AX,
"DUT failed to start SoftAp in 11ax.")
def check_available_channels_in_bands_2_5(dut, country_code):
"""Check if DUT is capable of enable BridgedAp.
#TODO: Find a way to make this function flexible by taking an argument.
Args:
country_code: country code, e.g., 'US', 'JP'.
Returns:
True: If DUT is capable of enable BridgedAp.
False: If DUT is not capable of enable BridgedAp.
"""
set_wifi_country_code(dut, country_code)
country = dut.droid.wifiGetCountryCode()
dut.log.info("DUT current country code : {}".format(country))
# Wi-Fi ON and OFF to make sure country code take effet.
wifi_toggle_state(dut, True)
wifi_toggle_state(dut, False)
# Register SoftAp Callback and get SoftAp capability.
callbackId = dut.droid.registerSoftApCallback()
capability = get_current_softap_capability(dut, callbackId, True)
dut.droid.unregisterSoftApCallback(callbackId)
if capability[wifi_constants.
SOFTAP_CAPABILITY_24GHZ_SUPPORTED_CHANNEL_LIST] and \
capability[wifi_constants.
SOFTAP_CAPABILITY_5GHZ_SUPPORTED_CHANNEL_LIST]:
return True
return False
@retry(tries=5, delay=2)
def validate_ping_between_two_clients(dut1, dut2):
"""Make 2 DUT ping each other.
Args:
dut1: An AndroidDevice object.
dut2: An AndroidDevice object.
"""
# Get DUTs' IPv4 addresses.
dut1_ip = ""
dut2_ip = ""
try:
dut1_ip = dut1.droid.connectivityGetIPv4Addresses('wlan0')[0]
except IndexError as e:
dut1.log.info(
"{} has no Wi-Fi connection, cannot get IPv4 address."
.format(dut1.serial))
try:
dut2_ip = dut2.droid.connectivityGetIPv4Addresses('wlan0')[0]
except IndexError as e:
dut2.log.info(
"{} has no Wi-Fi connection, cannot get IPv4 address."
.format(dut2.serial))
# Test fail if not able to obtain two DUT's IPv4 addresses.
asserts.assert_true(dut1_ip and dut2_ip,
"Ping failed because no DUT's IPv4 address")
dut1.log.info("{} IPv4 addresses : {}".format(dut1.serial, dut1_ip))
dut2.log.info("{} IPv4 addresses : {}".format(dut2.serial, dut2_ip))
# Two clients ping each other
dut1.log.info("{} ping {}".format(dut1_ip, dut2_ip))
asserts.assert_true(
utils.adb_shell_ping(dut1, count=10, dest_ip=dut2_ip,
timeout=20),
"%s ping %s failed" % (dut1.serial, dut2_ip))
dut2.log.info("{} ping {}".format(dut2_ip, dut1_ip))
asserts.assert_true(
utils.adb_shell_ping(dut2, count=10, dest_ip=dut1_ip,
timeout=20),
"%s ping %s failed" % (dut2.serial, dut1_ip))
def get_wear_wifimediator_disable_status(ad):
"""Gets WearWifiMediator disable status.
Args:
ad: Android Device
Returns:
True if WearWifiMediator is disabled, False otherwise.
"""
status = ad.adb.shell("settings get global cw_disable_wifimediator")
if status == "1":
ad.log.info("WearWifiMediator is DISABLED")
status = True
else:
ad.log.info("WearWifiMediator is ENABLED")
status = False
return status
def disable_wear_wifimediator(ad, state):
"""Disables WearWifiMediator.
Args:
ad: Android Device
state: True to disable, False otherwise.
"""
if state:
ad.log.info("Disabling WearWifiMediator.....")
ad.adb.shell("settings put global cw_disable_wifimediator 1")
asserts.assert_true(get_wear_wifimediator_disable_status(ad),
"WearWifiMediator should be disabled")
else:
ad.log.info("Enabling WearWifiMediator.....")
ad.adb.shell("settings put global cw_disable_wifimediator 0")
asserts.assert_false(get_wear_wifimediator_disable_status(ad),
"WearWifiMediator should be enabled")