Update Wi-Fi manager, scanner, and PNO test scripts to run on new
Wi-Fi test bed.
Apply pep8 style.
Use asserts properly.
Update string format convention.
Clean up import statements and namespace.
Update attenuator usage for new Wi-Fi test setup.
Bug=29182983
Change-Id: Ic0ab7c9472bce131120ff7a47287af7c7c2586fc
diff --git a/acts/framework/acts/test_utils/wifi/wifi_test_utils.py b/acts/framework/acts/test_utils/wifi/wifi_test_utils.py
index 01011ce..0a98a76 100755
--- a/acts/framework/acts/test_utils/wifi/wifi_test_utils.py
+++ b/acts/framework/acts/test_utils/wifi/wifi_test_utils.py
@@ -23,6 +23,7 @@
from acts import asserts
from acts import signals
+from acts.controllers import attenuator
from acts.utils import exe_cmd
from acts.utils import require_sl4a
from acts.utils import sync_device_time
@@ -1027,3 +1028,43 @@
c[WifiEnums.Enterprise.PHASE2] = phase2_type
results.append(c)
return results
+
+
+def group_attenuators(attenuators):
+ """Groups a list of attenuators into attenuator groups for backward
+ compatibility reasons.
+
+ Most legacy Wi-Fi setups have two attenuators each connected to a separate
+ AP. The new Wi-Fi setup has four attenuators, each connected to one channel
+ on an AP, so two of them are connected to one AP.
+
+ To make the existing scripts work in the new setup, when the script needs
+ to attenuate one AP, it needs to set attenuation on both attenuators
+ connected to the same AP.
+
+ This function groups attenuators properly so the scripts work in both
+ legacy and new Wi-Fi setups.
+
+ Args:
+ attenuators: A list of attenuator objects, either two or four in length.
+
+ Raises:
+ signals.TestFailure is raised if the attenuator list does not have two
+ or four objects.
+ """
+ attn0 = attenuator.AttenuatorGroup("AP0")
+ attn1 = attenuator.AttenuatorGroup("AP1")
+ # Legacy testbed setup has two attenuation channels.
+ num_of_attns = len(attenuators)
+ if num_of_attns == 2:
+ attn0.add(attenuators[0])
+ attn1.add(attenuators[1])
+ elif num_of_attns == 4:
+ attn0.add(attenuators[0])
+ attn0.add(attenuators[1])
+ attn1.add(attenuators[2])
+ attn1.add(attenuators[3])
+ else:
+ asserts.fail(("Either two or four attenuators are required for this "
+ "test, but found %s") % num_of_attns)
+ return [attn0, attn1]
diff --git a/acts/tests/google/wifi/WifiManagerTest.py b/acts/tests/google/wifi/WifiManagerTest.py
index 78b2b0f..0b4827c 100755
--- a/acts/tests/google/wifi/WifiManagerTest.py
+++ b/acts/tests/google/wifi/WifiManagerTest.py
@@ -30,9 +30,21 @@
class WifiManagerTest(acts.base_test.BaseTestClass):
+ """Tests for APIs in Android's WifiManager class.
+
+ Test Bed Requirement:
+ * One Android device
+ * Several Wi-Fi networks visible to the device, including an open Wi-Fi
+ network.
+ """
def setup_class(self):
self.dut = self.android_devices[0]
wutils.wifi_test_device_init(self.dut)
+ # If running in a setup with attenuators, set attenuation on all
+ # channels to zero.
+ if getattr(self, "attenuators", []):
+ for a in self.attenuators:
+ a.set_atten(0)
req_params = ("iot_networks", "open_network", "iperf_server_address",
"tdls_models", "energy_info_models")
self.unpack_userparams(req_params)
@@ -207,9 +219,7 @@
actual_support = self.dut.droid.wifiIsEnhancedPowerReportingSupported()
model = self.dut.model
expected_support = model in self.energy_info_models
- msg = "Expect energy info support to be %s on %s, got %s." % (
- expected_support, model, actual_support)
- asserts.assert_true(actual_support == expected_support, msg)
+ asserts.assert_equal(expected_support, actual_support, msg)
if not actual_support:
asserts.skip(
("Device %s does not support energy info reporting as "
diff --git a/acts/tests/google/wifi/WifiPnoTest.py b/acts/tests/google/wifi/WifiPnoTest.py
index 6aeda48..07220a6 100644
--- a/acts/tests/google/wifi/WifiPnoTest.py
+++ b/acts/tests/google/wifi/WifiPnoTest.py
@@ -15,33 +15,22 @@
import time
-import acts.base_test
-import acts.test_utils.wifi.wifi_test_utils as wutils
-
from acts import asserts
+from acts import base_test
+import acts.test_utils.wifi.wifi_test_utils as wutils
WifiEnums = wutils.WifiEnums
WifiEventNames = wutils.WifiEventNames
-class WifiPnoTest(acts.base_test.BaseTestClass):
- def __init__(self, controllers):
- acts.base_test.BaseTestClass.__init__(self, controllers)
- self.tests = (
- "test_simple_pno_connection",
- "test_pno_connection_with_multiple_saved_networks",
- )
-
+class WifiPnoTest(base_test.BaseTestClass):
def setup_class(self):
self.dut = self.android_devices[0]
wutils.wifi_test_device_init(self.dut)
- req_params = (
- "attn_vals",
- "pno_network_a",
- "pno_network_b",
- "pno_interval"
- )
+ req_params = ("attn_vals", "pno_network_a", "pno_network_b",
+ "pno_interval")
self.unpack_userparams(req_params)
+ self.attenuators = wutils.group_attenuators(self.attenuators)
self.attn_a = self.attenuators[0]
self.attn_b = self.attenuators[1]
self.set_attns("default")
@@ -66,20 +55,21 @@
self.dut.cat_adb_log(test_name, begin_time)
"""Helper Functions"""
+
def set_attns(self, attn_val_name):
"""Sets attenuation values on attenuators used in this test.
Args:
attn_val_name: Name of the attenuation value pair to use.
"""
- msg = "Set attenuation values to %s" % self.attn_vals[attn_val_name]
- self.log.info(msg)
+ self.log.info("Set attenuation values to %s",
+ self.attn_vals[attn_val_name])
try:
self.attn_a.set_atten(self.attn_vals[attn_val_name][0])
self.attn_b.set_atten(self.attn_vals[attn_val_name][1])
except:
- msg = "Failed to set attenuation values %s." % attn_val_name
- self.log.error(msg)
+ self.log.error("Failed to set attenuation values %s.",
+ attn_val_name)
raise
def trigger_pno_and_assert_connect(self, attn_val_name, expected_con):
@@ -93,21 +83,21 @@
to roam to.
"""
connection_info = self.dut.droid.wifiGetConnectionInfo()
- self.log.info("Triggering PNO connect from %s to %s" %
- (connection_info[WifiEnums.SSID_KEY],
- expected_con[WifiEnums.SSID_KEY]))
+ self.log.info("Triggering PNO connect from %s to %s",
+ connection_info[WifiEnums.SSID_KEY],
+ expected_con[WifiEnums.SSID_KEY])
self.dut.droid.goToSleepNow()
self.set_attns(attn_val_name)
- self.log.info("Wait %ss for PNO to trigger." % self.pno_interval)
+ self.log.info("Wait %ss for PNO to trigger.", self.pno_interval)
time.sleep(self.pno_interval)
try:
self.dut.droid.wakeLockAcquireBright()
self.dut.droid.wakeUpNow()
expected_ssid = expected_con[WifiEnums.SSID_KEY]
- verify_con = { WifiEnums.SSID_KEY : expected_ssid }
+ verify_con = {WifiEnums.SSID_KEY: expected_ssid}
wutils.verify_wifi_connection_info(self.dut, verify_con)
- self.log.info("Connected to %s successfully after PNO" %
- expected_ssid)
+ self.log.info("Connected to %s successfully after PNO",
+ expected_ssid)
finally:
self.dut.droid.wifiLockRelease()
self.dut.droid.goToSleepNow()
@@ -119,14 +109,16 @@
num_networks: Number of networks to add.
"""
ssid_name_base = "pno_dummy_network_"
- for i in range(0, num_networks) :
+ for i in range(0, num_networks):
network = {}
network[WifiEnums.SSID_KEY] = ssid_name_base + str(i)
- network[WifiEnums.PWD_KEY] = "pno_dummy";
- asserts.assert_true(self.dut.droid.wifiAddNetwork(network) != -1,
+ network[WifiEnums.PWD_KEY] = "pno_dummy"
+ asserts.assert_true(
+ self.dut.droid.wifiAddNetwork(network) != -1,
"Add network %r failed" % network)
""" Tests Begin """
+
def test_simple_pno_connection(self):
"""Test PNO triggered autoconnect to a network.
@@ -168,4 +160,5 @@
"""
self.add_dummy_networks(16)
self.test_simple_pno_connection()
+
""" Tests End """
diff --git a/acts/tests/google/wifi/WifiScannerBssidTest.py b/acts/tests/google/wifi/WifiScannerBssidTest.py
index 0e41e1a..9c2e257 100755
--- a/acts/tests/google/wifi/WifiScannerBssidTest.py
+++ b/acts/tests/google/wifi/WifiScannerBssidTest.py
@@ -14,62 +14,48 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import json
-import os
import itertools
+import queue
-from queue import Empty
from acts import asserts
-from acts.base_test import BaseTestClass
-from acts.utils import load_config
-from acts.test_utils.wifi.wifi_test_utils import start_wifi_track_bssid
-from acts.test_utils.wifi.wifi_test_utils import start_wifi_background_scan
-from acts.test_utils.wifi.wifi_test_utils import wifi_test_device_init
-from acts.test_utils.wifi.wifi_test_utils import WifiChannelUS
-from acts.test_utils.wifi.wifi_test_utils import WifiEnums
-from acts.test_utils.wifi.wifi_test_utils import get_scan_time_and_channels
-
+from acts import base_test
+from acts import utils
+from acts.test_utils.wifi import wifi_test_utils as wutils
BSSID_EVENT_WAIT = 30
BSSID_EVENT_TAG = "WifiScannerBssid"
SCAN_EVENT_TAG = "WifiScannerScan"
-SCANTIME = 10000 #framework support only 10s as minimum scan interval
+SCANTIME = 10000 #framework support only 10s as minimum scan interval
+
class WifiScannerBssidError(Exception):
pass
-class WifiScannerBssidTest(BaseTestClass):
- def __init__(self, controllers):
- BaseTestClass.__init__(self, controllers)
- # A list of all test cases to be executed in this class.
- self.tests = ("test_wifi_track_bssid_sanity",
- "test_wifi_track_bssid_found",
- "test_wifi_track_bssid_lost",
- "test_wifi_track_bssid_for_2g_while_scanning_5g_channels",
- "test_wifi_track_bssid_for_5g_while_scanning_2g_channels",)
+class WifiScannerBssidTest(base_test.BaseTestClass):
+ def setup_class(self):
self.default_scan_setting = {
- "band": WifiEnums.WIFI_BAND_BOTH_WITH_DFS,
+ "band": wutils.WifiEnums.WIFI_BAND_BOTH_WITH_DFS,
"periodInMs": SCANTIME,
- "reportEvents": WifiEnums.REPORT_EVENT_AFTER_EACH_SCAN,
+ "reportEvents": wutils.WifiEnums.REPORT_EVENT_AFTER_EACH_SCAN,
'numBssidsPerScan': 32
}
self.leeway = 5
- self.stime_channel = 47 #dwell time plus 2ms
-
- def setup_class(self):
+ self.stime_channel = 47 #dwell time plus 2ms
self.dut = self.android_devices[0]
- wifi_test_device_init(self.dut)
+ wutils.wifi_test_device_init(self.dut)
+ self.attenuators = wutils.group_attenuators(self.attenuators)
asserts.assert_true(self.dut.droid.wifiIsScannerSupported(),
- "Device %s doesn't support WifiScanner, abort." % self.dut.model)
+ "Device %s doesn't support WifiScanner, abort." %
+ self.dut.model)
"""It will setup the required dependencies and fetch the user params from
config file"""
self.attenuators[0].set_atten(0)
self.attenuators[1].set_atten(0)
req_params = ("bssid_2g", "bssid_5g", "bssid_dfs", "attenuator_id",
"max_bugreports")
- self.wifi_chs = WifiChannelUS(self.dut.model)
+ self.wifi_chs = wutils.WifiChannelUS(self.dut.model)
self.unpack_userparams(req_params)
def on_fail(self, test_name, begin_time):
@@ -78,6 +64,7 @@
self.max_bugreports -= 1
""" Helper Functions Begin """
+
def fetch_scan_result(self, scan_idx, scan_setting):
"""Fetch the scan result for provider listener index.
@@ -93,23 +80,25 @@
"""
#generating event wait time from scan setting plus leeway
self.log.debug(scan_setting)
- scan_time, scan_channels = get_scan_time_and_channels(self.wifi_chs,
- scan_setting,
- self.stime_channel)
- scan_time += scan_setting['periodInMs'] #add scan period delay for next cycle
- if scan_setting["reportEvents"] == WifiEnums.REPORT_EVENT_AFTER_EACH_SCAN:
- waittime = int(scan_time/1000) + self.leeway
+ scan_time, scan_channels = wutils.get_scan_time_and_channels(
+ self.wifi_chs, scan_setting, self.stime_channel)
+ scan_time += scan_setting['periodInMs'
+ ] #add scan period delay for next cycle
+ if scan_setting[
+ "reportEvents"] == wutils.WifiEnums.REPORT_EVENT_AFTER_EACH_SCAN:
+ waittime = int(scan_time / 1000) + self.leeway
else:
- time_cache = scan_setting['periodInMs'] * 10 #default cache
- waittime = int((time_cache + scan_time )/1000) + self.leeway
- event_name = "{}{}onResults".format(SCAN_EVENT_TAG, scan_idx)
- self.log.info("Waiting for the scan result event {}".format(event_name))
+ time_cache = scan_setting['periodInMs'] * 10 #default cache
+ waittime = int((time_cache + scan_time) / 1000) + self.leeway
+ event_name = "%s%sonResults" % (SCAN_EVENT_TAG, scan_idx)
+ self.log.info("Waiting for the scan result event %s", event_name)
event = self.dut.ed.pop_event(event_name, waittime)
results = event["data"]["Results"]
if len(results) > 0 and "ScanResults" in results[0]:
return results[0]["ScanResults"]
- def start_scan_and_validate_environment(self, scan_setting, bssid_settings):
+ def start_scan_and_validate_environment(self, scan_setting,
+ bssid_settings):
"""Validate environment for test using current scan result for provided
settings.
@@ -124,22 +113,23 @@
True, if bssid not found in scan result.
"""
try:
- data = start_wifi_background_scan(self.dut, scan_setting)
+ data = wutils.start_wifi_background_scan(self.dut, scan_setting)
self.scan_idx = data["Index"]
results = self.fetch_scan_result(self.scan_idx, scan_setting)
- self.log.debug("scan result {}".format(results))
- asserts.assert_true(results, "Device is not able to fetch the scan results")
+ self.log.debug("scan result %s.", results)
+ asserts.assert_true(results,
+ "Device is not able to fetch the scan results")
for result in results:
for bssid_setting in bssid_settings:
- if bssid_setting[WifiEnums.BSSID_KEY] == result[WifiEnums.BSSID_KEY]:
- self.log.error("Bssid {} already exist in current scan results".
- format(result[WifiEnums.BSSID_KEY]))
- return False
- except Empty as error:
+ if bssid_setting[wutils.WifiEnums.BSSID_KEY] == result[
+ wutils.WifiEnums.BSSID_KEY]:
+ asserts.fail(("Test environment is not valid: Bssid %s"
+ "already exist in current scan results")
+ % result[wutils.WifiEnums.BSSID_KEY])
+ except queue.Empty as error:
self.dut.droid.wifiScannerStopBackgroundScan(self.scan_idx)
- raise AssertionError("OnResult event did not triggered for scanner\n{}".
- format(error))
- return True
+ raise AssertionError(
+ "OnResult event did not triggered for scanner\n%s" % error)
def check_bssid_in_found_result(self, bssid_settings, found_results):
"""look for any tracked bssid in reported result of found bssids.
@@ -153,10 +143,11 @@
"""
for bssid_setting in bssid_settings:
for found_result in found_results:
- if found_result[WifiEnums.BSSID_KEY] == bssid_setting[WifiEnums.
- BSSID_KEY]:
- return True
- return False
+ if found_result[wutils.WifiEnums.BSSID_KEY] == bssid_setting[
+ wutils.WifiEnums.BSSID_KEY]:
+ return
+ asserts.fail("Test fail because Bssid %s is not found in event results"
+ % bssid_settings)
def track_bssid_with_vaild_scan_for_found(self, track_setting):
"""Common logic for tracking a bssid for Found event.
@@ -175,31 +166,27 @@
True if found event occur for interested BSSID.
"""
self.attenuators[self.attenuator_id].set_atten(90)
- data = start_wifi_track_bssid(self.dut, track_setting)
+ data = wutils.start_wifi_track_bssid(self.dut, track_setting)
idx = data["Index"]
- valid_env = self.start_scan_and_validate_environment(
- self.default_scan_setting, track_setting["bssidInfos"])
+ self.start_scan_and_validate_environment(self.default_scan_setting,
+ track_setting["bssidInfos"])
try:
- asserts.assert_true(valid_env,
- "Test environment is not valid, AP is in range")
self.attenuators[self.attenuator_id].set_atten(0)
- event_name = "{}{}onFound".format(BSSID_EVENT_TAG, idx)
- self.log.info("Waiting for the BSSID event {}".format(event_name))
+ event_name = "%s%sonFound" % (BSSID_EVENT_TAG, idx)
+ self.log.info("Waiting for the BSSID event %s", event_name)
event = self.dut.ed.pop_event(event_name, BSSID_EVENT_WAIT)
self.log.debug(event)
- found = self.check_bssid_in_found_result(track_setting["bssidInfos"],
- event["data"]["Results"])
- asserts.assert_true(found,
- "Test fail because Bssid is not found in event results")
- except Empty as error:
- self.log.error("{}".format(error))
+ self.check_bssid_in_found_result(track_setting["bssidInfos"],
+ event["data"]["Results"])
+ except queue.Empty as error:
+ self.log.error(error)
# log scan result for debugging
results = self.fetch_scan_result(self.scan_idx,
self.default_scan_setting)
- self.log.debug("scan result {}".format(results))
- raise AssertionError("Event {} did not triggered for {}\n{}".
- format(event_name, track_setting["bssidInfos"],
- error))
+ self.log.debug("scan result %s", results)
+ raise AssertionError("Event %s did not triggered for %s\n%s" %
+ (event_name, track_setting["bssidInfos"],
+ error))
finally:
self.dut.droid.wifiScannerStopBackgroundScan(self.scan_idx)
self.dut.droid.wifiScannerStopTrackingBssids(idx)
@@ -224,39 +211,35 @@
True if Lost event occur for interested BSSID.
"""
self.attenuators[self.attenuator_id].set_atten(90)
- valid_env = self.start_scan_and_validate_environment(
- self.default_scan_setting, track_setting["bssidInfos"])
+ self.start_scan_and_validate_environment(self.default_scan_setting,
+ track_setting["bssidInfos"])
idx = None
found = False
try:
- asserts.assert_true(valid_env,
- "Test environment is not valid, AP is in range")
- data = start_wifi_track_bssid(self.dut, track_setting)
+ data = wutils.start_wifi_track_bssid(self.dut, track_setting)
idx = data["Index"]
self.attenuators[self.attenuator_id].set_atten(0)
#onFound event should be occurre before tracking for onLost event
- event_name = "{}{}onFound".format(BSSID_EVENT_TAG, idx)
- self.log.info("Waiting for the BSSID event {}".format(event_name))
+ event_name = "%s%sonFound" % (BSSID_EVENT_TAG, idx)
+ self.log.info("Waiting for the BSSID event %s", event_name)
event = self.dut.ed.pop_event(event_name, BSSID_EVENT_WAIT)
self.log.debug(event)
- found = self.check_bssid_in_found_result(track_setting["bssidInfos"],
- event["data"]["Results"])
- asserts.assert_true(found,
- "Test fail because Bssid is not found in event results")
- if found:
- self.attenuators[self.attenuator_id].set_atten(90)
- # log scan result for debugging
- for i in range(1, track_setting["apLostThreshold"]):
- results = self.fetch_scan_result(self.scan_idx,
- self.default_scan_setting)
- self.log.debug("scan result {} {}".format(i, results))
- event_name = "{}{}onLost".format(BSSID_EVENT_TAG, idx)
- self.log.info("Waiting for the BSSID event {}".format(event_name))
- event = self.dut.ed.pop_event(event_name, BSSID_EVENT_WAIT)
- self.log.debug(event)
- except Empty as error:
- raise AssertionError("Event {} did not triggered for {}\n{}".
- format(event_name, track_setting["bssidInfos"], error))
+ self.check_bssid_in_found_result(track_setting["bssidInfos"],
+ event["data"]["Results"])
+ self.attenuators[self.attenuator_id].set_atten(90)
+ # log scan result for debugging
+ for i in range(1, track_setting["apLostThreshold"]):
+ results = self.fetch_scan_result(self.scan_idx,
+ self.default_scan_setting)
+ self.log.debug("scan result %s %s", i, results)
+ event_name = "%s%sonLost" % (BSSID_EVENT_TAG, idx)
+ self.log.info("Waiting for the BSSID event %s", event_name)
+ event = self.dut.ed.pop_event(event_name, BSSID_EVENT_WAIT)
+ self.log.debug(event)
+ except queue.Empty as error:
+ raise AssertionError("Event %s did not triggered for %s\n%s" %
+ (event_name, track_setting["bssidInfos"],
+ error))
finally:
self.dut.droid.wifiScannerStopBackgroundScan(self.scan_idx)
if idx:
@@ -273,9 +256,9 @@
if self.dut.model != "hammerhead":
bssids.append([self.bssid_dfs])
if isLost:
- apthreshold = (3,5)
+ apthreshold = (3, 5)
else:
- apthreshold = (1,)
+ apthreshold = (1, )
# Create track setting strings based on the combinations
setting_combinations = list(itertools.product(bssids, apthreshold))
# Create scan setting strings based on the combinations
@@ -291,13 +274,13 @@
"""Convert track setting to string for Bssids in that"""
string = ""
for bssid_setting in track_setting:
- string += bssid_setting[WifiEnums.BSSID_KEY]
+ string += bssid_setting[wutils.WifiEnums.BSSID_KEY]
string += "_"
return string
""" Helper Functions End """
-
""" Tests Begin """
+
def test_wifi_track_bssid_found(self):
"""Test bssid track for event found with a list of different settings.
@@ -309,15 +292,14 @@
track setting.
"""
track_settings = self.wifi_generate_track_bssid_settings(False)
- name_func = (lambda track_setting :
- "test_wifi_track_found_bssidInfos_{}apLostThreshold_{}".
- format(self.track_setting_to_string(track_setting["bssidInfos"]),
- track_setting["apLostThreshold"]))
- failed = self.run_generated_testcases( self.track_bssid_with_vaild_scan_for_found,
- track_settings, name_func = name_func)
+ name_func = lambda track_setting: "test_wifi_track_found_bssidInfos_%sapLostThreshold_%s" % (self.track_setting_to_string(track_setting["bssidInfos"]), track_setting["apLostThreshold"])
+ failed = self.run_generated_testcases(
+ self.track_bssid_with_vaild_scan_for_found,
+ track_settings,
+ name_func=name_func)
asserts.assert_true(not failed,
- "Track bssid found failed with these bssids: {}".
- format(failed))
+ "Track bssid found failed with these bssids: %s" %
+ failed)
def test_wifi_track_bssid_lost(self):
"""Test bssid track for event lost with a list of different settings.
@@ -332,14 +314,14 @@
7. Verified that onLost event is triggered.
"""
track_settings = self.wifi_generate_track_bssid_settings(True)
- name_func = (lambda track_setting :
- "test_wifi_track_lost_bssidInfos_{}apLostThreshold_{}".
- format(self.track_setting_to_string(track_setting["bssidInfos"]),
- track_setting["apLostThreshold"]))
- failed = self.run_generated_testcases( self.track_bssid_with_vaild_scan_for_lost,
- track_settings, name_func = name_func)
+ name_func = lambda track_setting: "test_wifi_track_lost_bssidInfos_%sapLostThreshold_%s" % (self.track_setting_to_string(track_setting["bssidInfos"]), track_setting["apLostThreshold"])
+ failed = self.run_generated_testcases(
+ self.track_bssid_with_vaild_scan_for_lost,
+ track_settings,
+ name_func=name_func)
asserts.assert_true(not failed,
- "Track bssid lost failed with these bssids: {}".format(failed))
+ "Track bssid lost failed with these bssids: %s" %
+ failed)
def test_wifi_track_bssid_sanity(self):
"""Test bssid track for event found and lost with default settings.
@@ -351,11 +333,11 @@
5. Attenuate the signal to move out of range
6. Verify that onLost event occur.
"""
- track_setting = {"bssidInfos":[self.bssid_2g], "apLostThreshold":3}
+ track_setting = {"bssidInfos": [self.bssid_2g], "apLostThreshold": 3}
self.track_bssid_with_vaild_scan_for_lost(track_setting)
def test_wifi_track_bssid_for_2g_while_scanning_5g_channels(self):
- """Test bssid track for 2g bssids while scanning 5g channels.
+ """Test bssid track for 2g bssids while scanning 5g channels.
1. Starts Wifi Scanner bssid tracking for 2g bssids in track_setting.
2. Start Wifi Scanner scan for 5G Band only.
@@ -363,36 +345,34 @@
4. Attenuate the signal to make AP in range.
5. Verified that onFound event isn't triggered for 2g bssids.
"""
- self.attenuators[self.attenuator_id].set_atten(90)
- scan_setting = { "band": WifiEnums.WIFI_BAND_5_GHZ,
- "periodInMs": SCANTIME,
- "reportEvents": WifiEnums.REPORT_EVENT_AFTER_EACH_SCAN,
- "numBssidsPerScan": 32}
- track_setting = {"bssidInfos":[self.bssid_2g], "apLostThreshold":3}
- valid_env = self.start_scan_and_validate_environment(scan_setting,
- track_setting["bssidInfos"])
- idx = None
- try:
- asserts.assert_true(valid_env,
- "Test environment is not valid, AP is in range")
- data = start_wifi_track_bssid(self.dut, track_setting)
- idx = data["Index"]
- self.attenuators[self.attenuator_id].set_atten(0)
- event_name = "{}{}onFound".format(BSSID_EVENT_TAG, idx)
- self.log.info("Waiting for the BSSID event {}".format(event_name))
- #waiting for 2x time to make sure
- event = self.dut.ed.pop_event(event_name, BSSID_EVENT_WAIT * 2)
- self.log.debug(event)
- found = self.check_bssid_in_found_result(track_setting["bssidInfos"],
- event["data"]["Results"])
- asserts.assert_true(not found,
- "Test fail because Bssid onFound event is triggered")
- except Empty as error:
- self.log.info("As excepted event didn't occurred with different scan setting")
- finally:
- self.dut.droid.wifiScannerStopBackgroundScan(self.scan_idx)
- if idx:
- self.dut.droid.wifiScannerStopTrackingBssids(idx)
+ self.attenuators[self.attenuator_id].set_atten(90)
+ scan_setting = {"band": wutils.WifiEnums.WIFI_BAND_5_GHZ,
+ "periodInMs": SCANTIME,
+ "reportEvents":
+ wutils.WifiEnums.REPORT_EVENT_AFTER_EACH_SCAN,
+ "numBssidsPerScan": 32}
+ track_setting = {"bssidInfos": [self.bssid_2g], "apLostThreshold": 3}
+ self.start_scan_and_validate_environment(scan_setting,
+ track_setting["bssidInfos"])
+ idx = None
+ try:
+ data = wutils.start_wifi_track_bssid(self.dut, track_setting)
+ idx = data["Index"]
+ self.attenuators[self.attenuator_id].set_atten(0)
+ event_name = "%s%sonFound" % (BSSID_EVENT_TAG, idx)
+ self.log.info("Waiting for the BSSID event %s", event_name)
+ #waiting for 2x time to make sure
+ event = self.dut.ed.pop_event(event_name, BSSID_EVENT_WAIT * 2)
+ self.log.debug(event)
+ self.check_bssid_in_found_result(track_setting["bssidInfos"],
+ event["data"]["Results"])
+ except queue.Empty as error:
+ self.log.info(
+ "As excepted event didn't occurred with different scan setting")
+ finally:
+ self.dut.droid.wifiScannerStopBackgroundScan(self.scan_idx)
+ if idx:
+ self.dut.droid.wifiScannerStopTrackingBssids(idx)
def test_wifi_track_bssid_for_5g_while_scanning_2g_channels(self):
"""Test bssid track for 5g bssids while scanning 2g channels.
@@ -404,31 +384,29 @@
5. Verified that onFound event isn't triggered for 5g bssids.
"""
self.attenuators[self.attenuator_id].set_atten(90)
- scan_setting = { "band": WifiEnums.WIFI_BAND_24_GHZ,
- "periodInMs": SCANTIME,
- "reportEvents": WifiEnums.REPORT_EVENT_AFTER_EACH_SCAN,
- "numBssidsPerScan": 32}
- track_setting = {"bssidInfos":[self.bssid_5g], "apLostThreshold":3}
- data = start_wifi_track_bssid(self.dut, track_setting)
+ scan_setting = {"band": wutils.WifiEnums.WIFI_BAND_24_GHZ,
+ "periodInMs": SCANTIME,
+ "reportEvents":
+ wutils.WifiEnums.REPORT_EVENT_AFTER_EACH_SCAN,
+ "numBssidsPerScan": 32}
+ track_setting = {"bssidInfos": [self.bssid_5g], "apLostThreshold": 3}
+ data = wutils.start_wifi_track_bssid(self.dut, track_setting)
idx = data["Index"]
- valid_env = self.start_scan_and_validate_environment(scan_setting,
- track_setting["bssidInfos"])
+ self.start_scan_and_validate_environment(scan_setting,
+ track_setting["bssidInfos"])
idx = None
try:
- asserts.assert_true(valid_env,
- "Test environment is not valid, AP is in range")
self.attenuators[self.attenuator_id].set_atten(0)
- event_name = "{}{}onFound".format(BSSID_EVENT_TAG, idx)
- self.log.info("Waiting for the BSSID event {}".format(event_name))
+ event_name = "%s%sonFound" % (BSSID_EVENT_TAG, idx)
+ self.log.info("Waiting for the BSSID event %s", event_name)
#waiting for 2x time to make sure
event = self.dut.ed.pop_event(event_name, BSSID_EVENT_WAIT * 2)
self.log.debug(event)
- found = self.check_bssid_in_found_result(track_setting["bssidInfos"],
- event["data"]["Results"])
- asserts.assert_true(not found,
- "Test fail because Bssid onFound event is triggered")
- except Empty as error:
- self.log.info("As excepted event didn't occurred with different scan setting")
+ self.check_bssid_in_found_result(track_setting["bssidInfos"],
+ event["data"]["Results"])
+ except queue.Empty as error:
+ self.log.info(
+ "As excepted event didn't occurred with different scan setting")
finally:
self.dut.droid.wifiScannerStopBackgroundScan(self.scan_idx)
if idx:
diff --git a/acts/tests/google/wifi/WifiScannerScanTest.py b/acts/tests/google/wifi/WifiScannerScanTest.py
index 14a15b8..b105ba3 100755
--- a/acts/tests/google/wifi/WifiScannerScanTest.py
+++ b/acts/tests/google/wifi/WifiScannerScanTest.py
@@ -15,28 +15,20 @@
# limitations under the License.
import itertools
-from queue import Empty
-import threading, time, traceback
+import queue
+import time
+import traceback
from acts import asserts
-from acts.base_test import BaseTestClass
-from acts.utils import load_config
-from acts.test_utils.wifi.wifi_test_utils import get_scan_time_and_channels
-from acts.test_utils.wifi.wifi_test_utils import check_internet_connection
-from acts.test_utils.wifi.wifi_test_utils import start_wifi_background_scan
-from acts.test_utils.wifi.wifi_test_utils import start_wifi_single_scan
-from acts.test_utils.wifi.wifi_test_utils import track_connection
-from acts.test_utils.wifi.wifi_test_utils import wifi_test_device_init
-from acts.test_utils.wifi.wifi_test_utils import WifiEnums
-from acts.test_utils.wifi.wifi_test_utils import WifiChannelUS
-from acts.test_utils.wifi.wifi_test_utils import wifi_forget_network
-from acts.test_utils.wifi.wifi_test_utils import wifi_toggle_state
+from acts import base_test
+from acts import utils
+from acts.test_utils.wifi import wifi_test_utils as wutils
-SCANTIME = 10000 #framework support only 10s as minimum scan interval
+SCANTIME = 10000 #framework support only 10s as minimum scan interval
NUMBSSIDPERSCAN = 8
EVENT_TAG = "WifiScannerScan"
-SCAN_TIME_PASSIVE = 47 # dwell time plus 2ms
-SCAN_TIME_ACTIVE = 32 # dwell time plus 2ms
+SCAN_TIME_PASSIVE = 47 # dwell time plus 2ms
+SCAN_TIME_ACTIVE = 32 # dwell time plus 2ms
SHORT_TIMEOUT = 30
NETWORK_ID_ERROR = "Network don't have ID"
NETWORK_ERROR = "Device is not connected to reference network"
@@ -44,15 +36,17 @@
EMPTY_RESULT = "Test fail because empty scan result reported"
KEY_RET = "ResultElapsedRealtime"
+
class WifiScannerScanError(Exception):
pass
-class WifiScannerScanTest(BaseTestClass):
+class WifiScannerScanTest(base_test.BaseTestClass):
def __init__(self, controllers):
BaseTestClass.__init__(self, controllers)
- asserts.failed_scan_settings = None
- # A list of all test cases to be executed in this class.
+ # TODO(angli): Remove this list.
+ # There are order dependencies among these tests so we'll have to leave
+ # it here for now. :(
self.tests = (
"test_available_channels_generated",
"test_wifi_scanner_single_scan_channel_sanity",
@@ -73,36 +67,37 @@
"test_single_scan_while_pno",
"test_wifi_connection_and_pno_while_batch_scan",
"test_wifi_scanner_single_scan_in_isolated",
- "test_wifi_scanner_with_invalid_numBssidsPerScan"
- )
- self.leeway = 10
- self.stime_channel = SCAN_TIME_PASSIVE
- self.default_scan_setting = {
- "band": WifiEnums.WIFI_BAND_BOTH,
- "periodInMs": SCANTIME,
- "reportEvents": WifiEnums.REPORT_EVENT_AFTER_EACH_SCAN
- }
- self.default_batch_scan_setting = {
- "band": WifiEnums.WIFI_BAND_BOTH,
- "periodInMs": SCANTIME,
- "reportEvents": WifiEnums.REPORT_EVENT_AFTER_BUFFER_FULL
- }
+ "test_wifi_scanner_with_invalid_numBssidsPerScan")
def setup_class(self):
self.dut = self.android_devices[0]
- wifi_test_device_init(self.dut)
+ wutils.wifi_test_device_init(self.dut)
req_params = ("connect_network", "run_extended_test", "ping_addr",
"max_bugreports")
self.unpack_userparams(req_params)
+ self.leeway = 10
+ self.stime_channel = SCAN_TIME_PASSIVE
+ self.default_scan_setting = {
+ "band": wutils.WifiEnums.WIFI_BAND_BOTH,
+ "periodInMs": SCANTIME,
+ "reportEvents": wutils.WifiEnums.REPORT_EVENT_AFTER_EACH_SCAN
+ }
+ self.default_batch_scan_setting = {
+ "band": wutils.WifiEnums.WIFI_BAND_BOTH,
+ "periodInMs": SCANTIME,
+ "reportEvents": wutils.WifiEnums.REPORT_EVENT_AFTER_BUFFER_FULL
+ }
self.log.debug("Run extended test: {}".format(self.run_extended_test))
- self.wifi_chs = WifiChannelUS(self.dut.model)
+ self.wifi_chs = wutils.WifiChannelUS(self.dut.model)
asserts.assert_true(self.dut.droid.wifiIsScannerSupported(),
- "Device %s doesn't support WifiScanner, abort." % self.dut.model)
+ "Device %s doesn't support WifiScanner, abort." %
+ self.dut.model)
+ self.attenuators = wutils.group_attenuators(self.attenuators)
self.attenuators[0].set_atten(0)
self.attenuators[1].set_atten(0)
def teardown_test(self):
- BaseTestClass.teardown_test(self)
+ base_test.BaseTestClass.teardown_test(self)
self.log.debug("Shut down all wifi scanner activities.")
self.dut.droid.wifiScannerShutdown()
@@ -114,7 +109,8 @@
""" Helper Functions Begin """
- def wifi_generate_scanner_scan_settings(self, extended, scan_type, report_result):
+ def wifi_generate_scanner_scan_settings(self, extended, scan_type,
+ report_result):
"""Generates all the combinations of different scan setting parameters.
Args:
@@ -125,29 +121,30 @@
Returns:
A list of dictionaries each representing a set of scan settings.
"""
- base_scan_time = [SCANTIME*2]
+ base_scan_time = [SCANTIME * 2]
if scan_type == "band":
- scan_types_setting = [WifiEnums.WIFI_BAND_BOTH]
+ scan_types_setting = [wutils.WifiEnums.WIFI_BAND_BOTH]
else:
scan_types_setting = [self.wifi_chs.MIX_CHANNEL_SCAN]
- num_of_bssid = [NUMBSSIDPERSCAN*4]
+ num_of_bssid = [NUMBSSIDPERSCAN * 4]
max_scan_cache = [0]
if extended:
base_scan_time.append(SCANTIME)
if scan_type == "band":
- scan_types_setting.extend([WifiEnums.WIFI_BAND_24_GHZ,
- WifiEnums.WIFI_BAND_5_GHZ_WITH_DFS,
- WifiEnums.WIFI_BAND_BOTH_WITH_DFS])
+ scan_types_setting.extend(
+ [wutils.WifiEnums.WIFI_BAND_24_GHZ,
+ wutils.WifiEnums.WIFI_BAND_5_GHZ_WITH_DFS,
+ wutils.WifiEnums.WIFI_BAND_BOTH_WITH_DFS])
else:
- scan_types_setting.extend([self.wifi_chs.NONE_DFS_5G_FREQUENCIES,
- self.wifi_chs.ALL_2G_FREQUENCIES,
- self.wifi_chs.DFS_5G_FREQUENCIES,
- self.wifi_chs.ALL_5G_FREQUENCIES])
- num_of_bssid.append(NUMBSSIDPERSCAN*3)
+ scan_types_setting.extend(
+ [self.wifi_chs.NONE_DFS_5G_FREQUENCIES, self.wifi_chs.
+ ALL_2G_FREQUENCIES, self.wifi_chs.DFS_5G_FREQUENCIES,
+ self.wifi_chs.ALL_5G_FREQUENCIES])
+ num_of_bssid.append(NUMBSSIDPERSCAN * 3)
max_scan_cache.append(5)
# Generate all the combinations of report types and scan types
- if report_result == WifiEnums.REPORT_EVENT_FULL_SCAN_RESULT:
- report_types = {"reportEvents" : report_result}
+ if report_result == wutils.WifiEnums.REPORT_EVENT_FULL_SCAN_RESULT:
+ report_types = {"reportEvents": report_result}
setting_combinations = list(itertools.product(scan_types_setting,
base_scan_time))
# Create scan setting strings based on the combinations
@@ -158,10 +155,10 @@
s["periodInMs"] = combo[1]
scan_settings.append(s)
else:
- report_types = {"reportEvents" : report_result}
- setting_combinations = list(itertools.product(scan_types_setting,
- base_scan_time, num_of_bssid,
- max_scan_cache))
+ report_types = {"reportEvents": report_result}
+ setting_combinations = list(
+ itertools.product(scan_types_setting, base_scan_time,
+ num_of_bssid, max_scan_cache))
# Create scan setting strings based on the combinations
scan_settings = []
for combo in setting_combinations:
@@ -197,25 +194,26 @@
validity = True
scan_time_mic = 0
scan_channels = []
- scan_time, scan_channels = get_scan_time_and_channels(self.wifi_chs,
- scan_setting,
- self.stime_channel)
+ scan_time, scan_channels = wutils.get_scan_time_and_channels(
+ self.wifi_chs, scan_setting, self.stime_channel)
scan_time_mic = scan_time * 1000
for i, batch in enumerate(scan_resutls, start=1):
- asserts.assert_true(batch["ScanResults"],
- "At least one scan result is required to validate")
- max_scan_interval = batch["ScanResults"][0]["timestamp"] + scan_time_mic
- self.log.debug("max_scan_interval: {}".format(max_scan_interval) )
+ asserts.assert_true(
+ batch["ScanResults"],
+ "At least one scan result is required to validate")
+ max_scan_interval = batch["ScanResults"][0][
+ "timestamp"] + scan_time_mic
+ self.log.debug("max_scan_interval: %s", max_scan_interval)
for result in batch["ScanResults"]:
- if (result["frequency"] not in scan_channels
- or result["timestamp"] > max_scan_interval
- or result["timestamp"] < scan_rt*1000
- or result["timestamp"] > result_rt*1000) :
- self.log.error("Result didn't match requirement: {}".
- format(result) )
- validity = False
- self.log.info("Number of scan result in batch {} :: {}".format(i,
- len(batch["ScanResults"])))
+ if (result["frequency"] not in scan_channels or
+ result["timestamp"] > max_scan_interval or
+ result["timestamp"] < scan_rt * 1000 or
+ result["timestamp"] > result_rt * 1000):
+ self.log.error("Result didn't match requirement: %s",
+ result)
+ validity = False
+ self.log.info("Number of scan result in batch %s: %s", i,
+ len(batch["ScanResults"]))
bssids += len(batch["ScanResults"])
return bssids, validity
@@ -233,8 +231,8 @@
events = self.dut.ed.pop_all(event_name)
for event in events:
results.append(event["data"]["Results"])
- except Empty as error:
- self.log.debug("Number of Full scan results {}".format(len(results)))
+ except queue.Empty as error:
+ self.log.debug("Number of Full scan results %s", len(results))
return results
def wifi_scanner_single_scan(self, scan_setting):
@@ -249,50 +247,49 @@
Args:
scan_setting: The params for the single scan.
"""
- data = start_wifi_single_scan(self.dut, scan_setting)
+ data = wutils.start_wifi_single_scan(self.dut, scan_setting)
idx = data["Index"]
scan_rt = data["ScanElapsedRealtime"]
- self.log.info("Wifi single shot scan started index: {} at real time: {}".
- format(idx, scan_rt))
+ self.log.info(
+ "Wifi single shot scan started index: %s at real time: %s", idx,
+ scan_rt)
results = []
#generating event wait time from scan setting plus leeway
- scan_time, scan_channels = get_scan_time_and_channels(self.wifi_chs,
- scan_setting,
- self.stime_channel)
- wait_time = int(scan_time/1000) + self.leeway
+ scan_time, scan_channels = wutils.get_scan_time_and_channels(
+ self.wifi_chs, scan_setting, self.stime_channel)
+ wait_time = int(scan_time / 1000) + self.leeway
validity = False
#track number of result received
result_received = 0
try:
- for snumber in range(1,3):
+ for snumber in range(1, 3):
event_name = "{}{}onResults".format(EVENT_TAG, idx)
- self.log.debug("Waiting for event: {} for time {}".
- format(event_name, wait_time))
+ self.log.debug("Waiting for event: %s for time %s", event_name,
+ wait_time)
event = self.dut.ed.pop_event(event_name, wait_time)
- self.log.debug("Event received: {}".format(event ))
+ self.log.debug("Event received: %s", event)
results = event["data"]["Results"]
result_received += 1
bssids, validity = self.proces_and_valid_batch_scan_result(
- results, scan_rt,
- event["data"][KEY_RET],
- scan_setting)
- asserts.assert_true(len(results) == 1,
- "Test fail because number of scan result {}"
- .format(len(results)))
+ results, scan_rt, event["data"][KEY_RET], scan_setting)
+ asserts.assert_equal(
+ len(results), 1,
+ "Test fail because number of scan result %s" %
+ len(results))
asserts.assert_true(bssids > 0, EMPTY_RESULT)
asserts.assert_true(validity, INVALID_RESULT)
- self.log.info("Scan number Buckets: {}\nTotal BSSID: {}".
- format(len(results), bssids))
- except Empty as error:
- asserts.assert_true(result_received >= 1,
- "Event did not triggered for single shot {}".
- format(error))
+ self.log.info("Scan number Buckets: %s\nTotal BSSID: %s",
+ len(results), bssids)
+ except queue.Empty as error:
+ asserts.assert_true(
+ result_received >= 1,
+ "Event did not triggered for single shot {}".format(error))
finally:
self.dut.droid.wifiScannerStopScan(idx)
#For single shot number of result received and length of result should be one
- asserts.assert_true(result_received == 1,
- "Test fail because received result {}".
- format(result_received))
+ asserts.assert_true(
+ result_received == 1,
+ "Test fail because received result {}".format(result_received))
def wifi_scanner_single_scan_full(self, scan_setting):
"""Common logic for single scan test case for full scan result.
@@ -307,37 +304,35 @@
scan_setting: The parameters for the single scan.
"""
self.dut.ed.clear_all_events()
- data = start_wifi_single_scan(self.dut, scan_setting)
+ data = wutils.start_wifi_single_scan(self.dut, scan_setting)
idx = data["Index"]
scan_rt = data["ScanElapsedRealtime"]
- self.log.info("Wifi single shot scan started with index: {}".format(idx))
+ self.log.info("Wifi single shot scan started with index: %s", idx)
#generating event wait time from scan setting plus leeway
- scan_time, scan_channels = get_scan_time_and_channels(self.wifi_chs,
- scan_setting,
- self.stime_channel)
- wait_time = int(scan_time/1000) + self.leeway
+ scan_time, scan_channels = wutils.get_scan_time_and_channels(
+ self.wifi_chs, scan_setting, self.stime_channel)
+ wait_time = int(scan_time / 1000) + self.leeway
results = []
validity = False
try:
- event_name = "{}{}onResults".format(EVENT_TAG, idx)
- self.log.debug("Waiting for event: {} for time {}".
- format(event_name, wait_time))
+ event_name = "%s%sonResults" % (EVENT_TAG, idx)
+ self.log.debug("Waiting for event: %s for time %s", event_name,
+ wait_time)
event = self.dut.ed.pop_event(event_name, wait_time)
- self.log.info("Event received: {}".format(event))
+ self.log.info("Event received: %s", event)
bssids, validity = (self.proces_and_valid_batch_scan_result(
- event["data"]["Results"], scan_rt,
- event["data"][KEY_RET],
- scan_setting))
+ event["data"]["Results"], scan_rt, event["data"][KEY_RET],
+ scan_setting))
asserts.assert_true(bssids > 0, EMPTY_RESULT)
asserts.assert_true(validity, INVALID_RESULT)
event_name = "{}{}onFullResult".format(EVENT_TAG, idx)
results = self.pop_scan_result_events(event_name)
- asserts.assert_true(len(results) >= bssids,
- "Full single shot result don't match {}".
- format(len(results)))
- except Empty as error:
- raise AssertionError("Event did not triggered for single shot {}".
- format(error))
+ asserts.assert_true(
+ len(results) >= bssids,
+ "Full single shot result don't match {}".format(len(results)))
+ except queue.Empty as error:
+ raise AssertionError(
+ "Event did not triggered for single shot {}".format(error))
finally:
self.dut.droid.wifiScannerStopScan(idx)
@@ -354,41 +349,39 @@
scan_setting: The params for the batch scan.
"""
self.dut.ed.clear_all_events()
- data = start_wifi_background_scan(self.dut, scan_setting)
+ data = wutils.start_wifi_background_scan(self.dut, scan_setting)
idx = data["Index"]
scan_rt = data["ScanElapsedRealtime"]
- self.log.info("Wifi batch shot scan started with index: {}".format(idx))
+ self.log.info("Wifi batch shot scan started with index: %s", idx)
#generating event wait time from scan setting plus leeway
- scan_time, scan_channels = get_scan_time_and_channels(self.wifi_chs,
- scan_setting,
- self.stime_channel)
+ scan_time, scan_channels = wutils.get_scan_time_and_channels(
+ self.wifi_chs, scan_setting, self.stime_channel)
# multiply scan period by two to account for scheduler changing period
- scan_time += scan_setting['periodInMs'] * 2 #add scan period delay for next cycle
+ scan_time += scan_setting[
+ 'periodInMs'] * 2 #add scan period delay for next cycle
wait_time = scan_time / 1000 + self.leeway
validity = False
try:
- for snumber in range(1,3):
+ for snumber in range(1, 3):
results = []
- event_name = "{}{}onResults".format(EVENT_TAG, idx)
- self.log.debug("Waiting for event: {} for time {}".
- format(event_name, wait_time))
+ event_name = "%s%sonResults" % (EVENT_TAG, idx)
+ self.log.debug("Waiting for event: %s for time %s", event_name,
+ wait_time)
event = self.dut.ed.pop_event(event_name, wait_time)
- self.log.debug("Event received: {}".format(event))
+ self.log.debug("Event received: %s", event)
bssids, validity = self.proces_and_valid_batch_scan_result(
- event["data"]["Results"],
- scan_rt,
- event["data"][KEY_RET],
- scan_setting)
- event_name = "{}{}onFullResult".format(EVENT_TAG, idx)
+ event["data"]["Results"], scan_rt, event["data"][KEY_RET],
+ scan_setting)
+ event_name = "%s%sonFullResult" % (EVENT_TAG, idx)
results = self.pop_scan_result_events(event_name)
- asserts.assert_true(len(results) >= bssids,
- "Full single shot result don't match {}".
- format(len(results)))
+ asserts.assert_true(
+ len(results) >= bssids,
+ "Full single shot result don't match %s" % len(results))
asserts.assert_true(bssids > 0, EMPTY_RESULT)
asserts.assert_true(validity, INVALID_RESULT)
- except Empty as error:
- raise AssertionError("Event did not triggered for batch scan {}".
- format(error))
+ except queue.Empty as error:
+ raise AssertionError("Event did not triggered for batch scan %s" %
+ error)
finally:
self.dut.droid.wifiScannerStopBackgroundScan(idx)
self.dut.ed.clear_all_events()
@@ -405,72 +398,73 @@
Args:
scan_setting: The parameters for the batch scan.
"""
- data = start_wifi_background_scan(self.dut, scan_setting)
+ data = wutils.start_wifi_background_scan(self.dut, scan_setting)
idx = data["Index"]
scan_rt = data["ScanElapsedRealtime"]
- self.log.info("Wifi background scan started with index: {} real time {}"
- .format(idx, scan_rt))
- scan_time, scan_channels = get_scan_time_and_channels(self.wifi_chs,
- scan_setting,
- self.stime_channel)
+ self.log.info(
+ "Wifi background scan started with index: %s real time %s", idx,
+ scan_rt)
+ scan_time, scan_channels = wutils.get_scan_time_and_channels(
+ self.wifi_chs, scan_setting, self.stime_channel)
#generating event wait time from scan setting plus leeway
time_cache = 0
- number_bucket = 1 #bucket for Report result on each scan
+ number_bucket = 1 #bucket for Report result on each scan
check_get_result = False
- if scan_setting['reportEvents'] == WifiEnums.REPORT_EVENT_AFTER_BUFFER_FULL:
+ if scan_setting[
+ 'reportEvents'] == wutils.WifiEnums.REPORT_EVENT_AFTER_BUFFER_FULL:
check_get_result = True
if ('maxScansToCache' in scan_setting and
- scan_setting['maxScansToCache'] != 0) :
+ scan_setting['maxScansToCache'] != 0):
time_cache = (scan_setting['maxScansToCache'] *
scan_setting['periodInMs'])
number_bucket = scan_setting['maxScansToCache']
else:
- time_cache = 10 * scan_setting['periodInMs'] #10 as default max scan cache
+ time_cache = 10 * scan_setting['periodInMs'
+ ] #10 as default max scan cache
number_bucket = 10
else:
- time_cache = scan_setting['periodInMs'] #need while waiting for seconds resutls
+ time_cache = scan_setting[
+ 'periodInMs'
+ ] #need while waiting for seconds resutls
# multiply cache time by two to account for scheduler changing period
wait_time = (time_cache * 2 + scan_time) / 1000 + self.leeway
validity = False
try:
- for snumber in range(1,3):
- event_name = "{}{}onResults".format(EVENT_TAG, idx)
- self.log.info("Waiting for event: {} for time {}".
- format(event_name,wait_time))
+ for snumber in range(1, 3):
+ event_name = "%s%sonResults" % (EVENT_TAG, idx)
+ self.log.info("Waiting for event: %s for time %s", event_name,
+ wait_time)
event = self.dut.ed.pop_event(event_name, wait_time)
- self.log.debug("Event received: {}".format(event ))
+ self.log.debug("Event received: %s", event)
results = event["data"]["Results"]
bssids, validity = (self.proces_and_valid_batch_scan_result(
- results, scan_rt,
- event["data"][KEY_RET],
- scan_setting))
- self.log.info("Scan number: {}\n Buckets: {}\n BSSID: {}".
- format(snumber, len(results), bssids))
- asserts.assert_true(len(results) == number_bucket,
- "Test fail because number_bucket {}".
- format(len(results)))
+ results, scan_rt, event["data"][KEY_RET], scan_setting))
+ self.log.info("Scan number: %s\n Buckets: %s\n BSSID: %s",
+ snumber, len(results), bssids)
+ asserts.assert_equal(
+ len(results), number_bucket,
+ "Test fail because number_bucket %s" % len(results))
asserts.assert_true(bssids >= 1, EMPTY_RESULT)
asserts.assert_true(validity, INVALID_RESULT)
- if snumber%2 == 1 and check_get_result :
+ if snumber % 2 == 1 and check_get_result:
self.log.info("Get Scan result using GetScanResult API")
- time.sleep(wait_time/number_bucket)
+ time.sleep(wait_time / number_bucket)
if self.dut.droid.wifiScannerGetScanResults():
event = self.dut.ed.pop_event(event_name, 1)
- self.log.debug("Event onResults: {}".format(event))
+ self.log.debug("Event onResults: %s", event)
results = event["data"]["Results"]
- bssids, validity = (self.proces_and_valid_batch_scan_result(
- results, scan_rt,
- event["data"][KEY_RET],
- scan_setting))
- self.log.info("Got Scan result number: {} BSSID: {}".
- format(snumber, bssids))
+ bssids, validity = self.proces_and_valid_batch_scan_result(
+ results, scan_rt, event["data"][KEY_RET],
+ scan_setting)
+ self.log.info("Got Scan result number: %s BSSID: %s",
+ snumber, bssids)
asserts.assert_true(bssids >= 1, EMPTY_RESULT)
asserts.assert_true(validity, INVALID_RESULT)
else:
self.log.error("Error while fetching the scan result")
- except Empty as error:
- raise AssertionError("Event did not triggered for batch scan {}".
- format(error))
+ except queue.Empty as error:
+ raise AssertionError("Event did not triggered for batch scan %s" %
+ error)
finally:
self.dut.droid.wifiScannerStopBackgroundScan(idx)
self.dut.ed.clear_all_events()
@@ -487,10 +481,11 @@
"""
try:
idx = self.dut.droid.wifiScannerStartScan(scan_setting)
- event = self.dut.ed.pop_event("{}{}onFailure".format(EVENT_TAG, idx),
- SHORT_TIMEOUT)
- except Empty as error:
- raise AssertionError("Did not get expected onFailure {}".format(error))
+ event = self.dut.ed.pop_event(
+ "{}{}onFailure".format(EVENT_TAG, idx), SHORT_TIMEOUT)
+ except queue.Empty as error:
+ raise AssertionError("Did not get expected onFailure {}".format(
+ error))
def start_wifi_scanner_background_scan_expect_failure(self, scan_setting):
"""Common logic to test wif scanner batch scan with invalid settings
@@ -503,11 +498,12 @@
scan_setting: The params for the single scan.
"""
try:
- idx = self.dut.droid.wifiScannerStartBackgroundScan(scan_setting)
- event = self.dut.ed.pop_event("{}{}onFailure".format(EVENT_TAG, idx),
- SHORT_TIMEOUT)
- except Empty as error:
- raise AssertionError("Did not get expected onFailure {}".format(error))
+ idx = self.dut.droid.wifiScannerStartBackgroundScan(scan_setting)
+ event = self.dut.ed.pop_event(
+ "{}{}onFailure".format(EVENT_TAG, idx), SHORT_TIMEOUT)
+ except queue.Empty as error:
+ raise AssertionError("Did not get expected onFailure {}".format(
+ error))
def check_get_available_channels_with_one_band(self, band):
"""Common logic to check available channels for a band.
@@ -522,9 +518,7 @@
self.log.debug(band)
self.log.debug(r)
expected = self.wifi_chs.band_to_freq(band)
- asserts.assert_true(set(r) == set(expected),
- "Band {} failed. Expected {}, got {}".
- format(band, expected, r))
+ asserts.assert_equal(set(r), set(expected), "Band %s failed." % band)
def connect_to_reference_network(self):
"""Connect to reference network and make sure that connection happen"""
@@ -532,37 +526,40 @@
self.dut.droid.wakeUpNow()
try:
self.dut.droid.wifiPriorityConnect(self.connect_network)
- connect_result = self.dut.ed.pop_event("WifiManagerPriorityConnectOnSuccess",
- SHORT_TIMEOUT)
+ connect_result = self.dut.ed.pop_event(
+ "WifiManagerPriorityConnectOnSuccess", SHORT_TIMEOUT)
self.log.info(connect_result)
- return track_connection(self.dut, self.connect_network["ssid"], 1)
+ return wutils.track_connection(self.dut,
+ self.connect_network["ssid"], 1)
except Exception as error:
self.log.exception(traceback.format_exc())
- self.log.error("Connection to network fail because {}".format(error))
+ self.log.error("Connection to network fail because %s", error)
return False
finally:
self.dut.droid.wifiLockRelease()
self.dut.droid.goToSleepNow()
""" Helper Functions End """
-
""" Tests Begin """
+
def test_available_channels_generated(self):
"""Test available channels for different bands.
1. Get available channels for different bands.
2. Verify that channels match with supported channels for respective band.
"""
- bands = (1,2,3,4,6,7)
- name_func = lambda band : "test_get_channel_band_{}".format(band)
+ bands = (1, 2, 3, 4, 6, 7)
+ name_func = lambda band: "test_get_channel_band_{}".format(band)
failed = self.run_generated_testcases(
- self.check_get_available_channels_with_one_band,
- bands, name_func = name_func)
- asserts.assert_true(not failed,
- "Number of test_get_channel_band failed {}".
- format(len(failed)))
+ self.check_get_available_channels_with_one_band,
+ bands,
+ name_func=name_func)
+ asserts.assert_true(
+ not failed,
+ "Number of test_get_channel_band failed {}".format(len(failed)))
- def test_single_scan_report_each_scan_for_channels_with_enumerated_params(self):
+ def test_single_scan_report_each_scan_for_channels_with_enumerated_params(
+ self):
"""Test WiFi scanner single scan for channels with enumerated settings.
1. Start WifiScanner single scan for different channels with enumerated
@@ -570,25 +567,23 @@
2. Verify that scan results match with respective scan settings.
"""
scan_settings = self.wifi_generate_scanner_scan_settings(
- self.run_extended_test, "channels",
- WifiEnums.REPORT_EVENT_AFTER_EACH_SCAN)
- self.log.debug("Scan settings:{}\n{}".format(len(scan_settings),
- scan_settings))
- name_func = (lambda scan_setting :
- ("test_single_scan_report_each_scan_for_channels_{}"
- "_numBssidsPerScan_{}_maxScansToCache_{}_period_{}").
- format(scan_setting["channels"],
- scan_setting["numBssidsPerScan"],
- scan_setting["maxScansToCache"],
- scan_setting["periodInMs"]))
+ self.run_extended_test, "channels",
+ wutils.WifiEnums.REPORT_EVENT_AFTER_EACH_SCAN)
+ self.log.debug("Scan settings: %s\n%s", len(scan_settings),
+ scan_settings)
+ name_func = (
+ lambda scan_setting: ("test_single_scan_report_each_scan_for_channels_{}"
+ "_numBssidsPerScan_{}_maxScansToCache_{}_period_{}").format(scan_setting["channels"], scan_setting["numBssidsPerScan"], scan_setting["maxScansToCache"], scan_setting["periodInMs"])
+ )
failed = self.run_generated_testcases(self.wifi_scanner_single_scan,
scan_settings,
- name_func = name_func)
- asserts.assert_true(not failed,
- ("Number of test_single_scan_report_each_scan_for_channels"
- " failed {}").format(len(failed)))
+ name_func=name_func)
+ asserts.assert_true(not failed, (
+ "Number of test_single_scan_report_each_scan_for_channels"
+ " failed {}").format(len(failed)))
- def test_single_scan_report_each_scan_for_band_with_enumerated_params(self):
+ def test_single_scan_report_each_scan_for_band_with_enumerated_params(
+ self):
"""Test WiFi scanner single scan for bands with enumerated settings.
1. Start WifiScanner single scan for different bands with enumerated
@@ -596,25 +591,27 @@
2. Verify that scan results match with respective scan settings.
"""
scan_settings = self.wifi_generate_scanner_scan_settings(
- self.run_extended_test,"band",
- WifiEnums.REPORT_EVENT_AFTER_EACH_SCAN)
- self.log.debug("Scan settings:{}\n{}".format(len(scan_settings),
- scan_settings))
- name_func = (lambda scan_setting :
- ("test_single_scan_report_each_scan_for_band_{}"
- "_numBssidsPerScan_{}_maxScansToCache_{}_period_{}").
- format(scan_setting["band"],
- scan_setting["numBssidsPerScan"],
- scan_setting["maxScansToCache"],
- scan_setting["periodInMs"]))
+ self.run_extended_test, "band",
+ wutils.WifiEnums.REPORT_EVENT_AFTER_EACH_SCAN)
+ self.log.debug("Scan settings:%s\n%s", len(scan_settings),
+ scan_settings)
+
+ def scan_setting_name_gen(scan_setting):
+ return "test_single_scan_report_each_scan_for_band_%s_numBssidsPerScan_%s_maxScansToCache_%s_period_%s" % (
+ scan_setting["band"], scan_setting["numBssidsPerScan"],
+ scan_setting["maxScansToCache"], scan_setting["periodInMs"])
+
+ name_func = scan_setting_name_gen
failed = self.run_generated_testcases(self.wifi_scanner_single_scan,
scan_settings,
- name_func = name_func)
- asserts.assert_true(not failed,
- ("Number of test_single_scan_report_each_scan_for_band"
- " failed {}").format(len(failed)))
+ name_func=name_func)
+ asserts.assert_true(
+ not failed,
+ "Number of test_single_scan_report_each_scan_for_band failed %s" %
+ len(failed))
- def test_batch_scan_report_buffer_full_for_channels_with_enumerated_params(self):
+ def test_batch_scan_report_buffer_full_for_channels_with_enumerated_params(
+ self):
"""Test WiFi scanner batch scan using channels with enumerated settings
to report buffer full scan results.
@@ -623,25 +620,23 @@
2. Verify that scan results match with respective scan settings.
"""
scan_settings = self.wifi_generate_scanner_scan_settings(
- self.run_extended_test, "channels",
- WifiEnums.REPORT_EVENT_AFTER_BUFFER_FULL)
- self.log.debug("Scan settings:{}\n{}".format(len(scan_settings),
- scan_settings))
- name_func = (lambda scan_setting :
- ("test_batch_scan_report_buffer_full_for_channels_{}"
- "_numBssidsPerScan_{}_maxScansToCache_{}_periodInMs_{}").
- format(scan_setting["channels"],
- scan_setting["numBssidsPerScan"],
- scan_setting["maxScansToCache"],
- scan_setting["periodInMs"]))
+ self.run_extended_test, "channels",
+ wutils.WifiEnums.REPORT_EVENT_AFTER_BUFFER_FULL)
+ self.log.debug("Scan settings:%s\n%s", len(scan_settings),
+ scan_settings)
+ name_func = (
+ lambda scan_setting: ("test_batch_scan_report_buffer_full_for_channels_{}"
+ "_numBssidsPerScan_{}_maxScansToCache_{}_periodInMs_{}").format(scan_setting["channels"], scan_setting["numBssidsPerScan"], scan_setting["maxScansToCache"], scan_setting["periodInMs"])
+ )
failed = self.run_generated_testcases(self.wifi_scanner_batch_scan,
scan_settings,
- name_func = name_func)
- asserts.assert_true(not failed,
- ("Number of test_batch_scan_report_buffer_full_for_channels"
- " failed {}").format(len(failed)))
+ name_func=name_func)
+ asserts.assert_true(not failed, (
+ "Number of test_batch_scan_report_buffer_full_for_channels"
+ " failed {}").format(len(failed)))
- def test_batch_scan_report_buffer_full_for_band_with_enumerated_params(self):
+ def test_batch_scan_report_buffer_full_for_band_with_enumerated_params(
+ self):
"""Test WiFi scanner batch scan using band with enumerated settings
to report buffer full scan results.
@@ -650,25 +645,23 @@
2. Verify that scan results match with respective scan settings.
"""
scan_settings = self.wifi_generate_scanner_scan_settings(
- self.run_extended_test,"band",
- WifiEnums.REPORT_EVENT_AFTER_BUFFER_FULL)
- self.log.debug("Scan settings:{}\n{}".format(len(scan_settings),
- scan_settings))
- name_func = (lambda scan_setting :
- ("test_batch_scan_report_buffer_full_for_band_{}"
- "_numBssidsPerScan_{}_maxScansToCache_{}_periodInMs_{}").
- format(scan_setting["band"],
- scan_setting["numBssidsPerScan"],
- scan_setting["maxScansToCache"],
- scan_setting["periodInMs"]))
+ self.run_extended_test, "band",
+ wutils.WifiEnums.REPORT_EVENT_AFTER_BUFFER_FULL)
+ self.log.debug("Scan settings:{}\n{}".format(
+ len(scan_settings), scan_settings))
+ name_func = (
+ lambda scan_setting: ("test_batch_scan_report_buffer_full_for_band_{}"
+ "_numBssidsPerScan_{}_maxScansToCache_{}_periodInMs_{}").format(scan_setting["band"], scan_setting["numBssidsPerScan"], scan_setting["maxScansToCache"], scan_setting["periodInMs"])
+ )
failed = self.run_generated_testcases(self.wifi_scanner_batch_scan,
scan_settings,
- name_func = name_func)
- asserts.assert_true(not failed,
- ("Number of test_batch_scan_report_buffer_full_for_band"
- " failed {}").format(len(failed)))
+ name_func=name_func)
+ asserts.assert_true(not failed, (
+ "Number of test_batch_scan_report_buffer_full_for_band"
+ " failed {}").format(len(failed)))
- def test_batch_scan_report_each_scan_for_channels_with_enumerated_params(self):
+ def test_batch_scan_report_each_scan_for_channels_with_enumerated_params(
+ self):
"""Test WiFi scanner batch scan using channels with enumerated settings
to report each scan results.
@@ -677,23 +670,20 @@
2. Verify that scan results match with respective scan settings.
"""
scan_settings = self.wifi_generate_scanner_scan_settings(
- self.run_extended_test, "channels",
- WifiEnums.REPORT_EVENT_AFTER_EACH_SCAN)
- self.log.debug("Scan settings:{}\n{}".
- format(len(scan_settings),scan_settings))
- name_func = (lambda scan_setting :
- ("test_batch_scan_report_each_scan_for_channels_{}"
- "_numBssidsPerScan_{}_maxScansToCache_{}_periodInMs_{}").
- format(scan_setting["channels"],
- scan_setting["numBssidsPerScan"],
- scan_setting["maxScansToCache"],
- scan_setting["periodInMs"]))
+ self.run_extended_test, "channels",
+ wutils.WifiEnums.REPORT_EVENT_AFTER_EACH_SCAN)
+ self.log.debug("Scan settings:{}\n{}".format(
+ len(scan_settings), scan_settings))
+ name_func = (
+ lambda scan_setting: ("test_batch_scan_report_each_scan_for_channels_{}"
+ "_numBssidsPerScan_{}_maxScansToCache_{}_periodInMs_{}").format(scan_setting["channels"], scan_setting["numBssidsPerScan"], scan_setting["maxScansToCache"], scan_setting["periodInMs"])
+ )
failed = self.run_generated_testcases(self.wifi_scanner_batch_scan,
scan_settings,
- name_func = name_func)
- asserts.assert_true(not failed,
- ("Number of test_batch_scan_report_each_scan_for_channels"
- " failed {}").format(len(failed)))
+ name_func=name_func)
+ asserts.assert_true(not failed, (
+ "Number of test_batch_scan_report_each_scan_for_channels"
+ " failed {}").format(len(failed)))
def test_batch_scan_report_each_scan_for_band_with_enumerated_params(self):
"""Test WiFi scanner batch scan using band with enumerated settings
@@ -704,25 +694,23 @@
2. Verify that scan results match with respective scan settings.
"""
scan_settings = self.wifi_generate_scanner_scan_settings(
- self.run_extended_test, "band",
- WifiEnums.REPORT_EVENT_AFTER_EACH_SCAN)
- self.log.debug("Scan settings:{}\n{}".format(len(scan_settings),
- scan_settings))
- name_func = (lambda scan_setting :
- ("test_batch_scan_report_each_scan_for_band_{}"
- "_numBssidsPerScan_{}_maxScansToCache_{}_periodInMs_{}").
- format(scan_setting["band"],
- scan_setting["numBssidsPerScan"],
- scan_setting["maxScansToCache"],
- scan_setting["periodInMs"]))
+ self.run_extended_test, "band",
+ wutils.WifiEnums.REPORT_EVENT_AFTER_EACH_SCAN)
+ self.log.debug("Scan settings:{}\n{}".format(
+ len(scan_settings), scan_settings))
+ name_func = (
+ lambda scan_setting: ("test_batch_scan_report_each_scan_for_band_{}"
+ "_numBssidsPerScan_{}_maxScansToCache_{}_periodInMs_{}").format(scan_setting["band"], scan_setting["numBssidsPerScan"], scan_setting["maxScansToCache"], scan_setting["periodInMs"])
+ )
failed = self.run_generated_testcases(self.wifi_scanner_batch_scan,
scan_settings,
- name_func = name_func)
- asserts.assert_true(not failed,
- ("Number of test_batch_scan_report_each_scan_for_band"
- " failed {}").format(len(failed)))
+ name_func=name_func)
+ asserts.assert_true(
+ not failed, ("Number of test_batch_scan_report_each_scan_for_band"
+ " failed {}").format(len(failed)))
- def test_single_scan_report_full_scan_for_channels_with_enumerated_params(self):
+ def test_single_scan_report_full_scan_for_channels_with_enumerated_params(
+ self):
"""Test WiFi scanner single scan using channels with enumerated settings
to report full scan results.
@@ -731,21 +719,23 @@
2. Verify that scan results match with respective scan settings.
"""
scan_settings = self.wifi_generate_scanner_scan_settings(
- self.run_extended_test, "channels",
- WifiEnums.REPORT_EVENT_FULL_SCAN_RESULT)
- self.log.debug("Full Scan settings:{}\n{}".format(len(scan_settings),
- scan_settings))
- name_func = (lambda scan_setting :
- "test_single_scan_report_full_scan_for_channels_{}_periodInMs_{}".
- format(scan_setting["channels"],scan_setting["periodInMs"]))
- failed = self.run_generated_testcases(self.wifi_scanner_single_scan_full,
- scan_settings,
- name_func = name_func)
- asserts.assert_true(not failed,
- ("Number of test_single_scan_report_full_scan_for_channels"
- " failed {}").format(len(failed)))
+ self.run_extended_test, "channels",
+ wutils.WifiEnums.REPORT_EVENT_FULL_SCAN_RESULT)
+ self.log.debug("Full Scan settings:{}\n{}".format(
+ len(scan_settings), scan_settings))
+ name_func = (
+ lambda scan_setting: "test_single_scan_report_full_scan_for_channels_{}_periodInMs_{}".format(scan_setting["channels"], scan_setting["periodInMs"])
+ )
+ failed = self.run_generated_testcases(
+ self.wifi_scanner_single_scan_full,
+ scan_settings,
+ name_func=name_func)
+ asserts.assert_true(not failed, (
+ "Number of test_single_scan_report_full_scan_for_channels"
+ " failed {}").format(len(failed)))
- def test_single_scan_report_full_scan_for_band_with_enumerated_params(self):
+ def test_single_scan_report_full_scan_for_band_with_enumerated_params(
+ self):
"""Test WiFi scanner single scan using band with enumerated settings
to report full scan results.
@@ -754,21 +744,23 @@
2. Verify that scan results match with respective scan settings.
"""
scan_settings = self.wifi_generate_scanner_scan_settings(
- self.run_extended_test, "band",
- WifiEnums.REPORT_EVENT_FULL_SCAN_RESULT)
- self.log.debug("Full Scan settings:{}\n{}".format(len(scan_settings),
- scan_settings))
- name_func = (lambda scan_setting :
- "test_single_scan_report_full_scan_for_band_{}_periodInMs_{}".
- format(scan_setting["band"],scan_setting["periodInMs"]))
- failed = self.run_generated_testcases(self.wifi_scanner_single_scan_full,
- scan_settings,
- name_func = name_func)
- asserts.assert_true(not failed,
- ("Number of test_single_scan_report_full_scan_for_band"
- " failed {}").format(len(failed)))
+ self.run_extended_test, "band",
+ wutils.WifiEnums.REPORT_EVENT_FULL_SCAN_RESULT)
+ self.log.debug("Full Scan settings:{}\n{}".format(
+ len(scan_settings), scan_settings))
+ name_func = (
+ lambda scan_setting: "test_single_scan_report_full_scan_for_band_{}_periodInMs_{}".format(scan_setting["band"], scan_setting["periodInMs"])
+ )
+ failed = self.run_generated_testcases(
+ self.wifi_scanner_single_scan_full,
+ scan_settings,
+ name_func=name_func)
+ asserts.assert_true(
+ not failed, ("Number of test_single_scan_report_full_scan_for_band"
+ " failed {}").format(len(failed)))
- def test_batch_scan_report_full_scan_for_channels_with_enumerated_params(self):
+ def test_batch_scan_report_full_scan_for_channels_with_enumerated_params(
+ self):
"""Test WiFi scanner batch scan using channels with enumerated settings
to report full scan results.
@@ -777,20 +769,21 @@
2. Verify that scan results match with respective scan settings.
"""
scan_settings = self.wifi_generate_scanner_scan_settings(
- self.run_extended_test, "channels",
- WifiEnums.REPORT_EVENT_FULL_SCAN_RESULT)
- self.log.debug("Full Scan settings:{}\n{}".format(len(scan_settings),
- scan_settings))
- name_func = (lambda scan_setting :
- ("test_batch_scan_report_full_scan_for_channels"
- "_{}_periodInMs_{}").format(scan_setting["channels"],
- scan_setting["periodInMs"]))
- failed = self.run_generated_testcases(self.wifi_scanner_batch_scan_full,
- scan_settings,
- name_func = name_func)
- asserts.assert_true(not failed,
- ("Number of test_batch_scan_report_full_scan_for_channels"
- " failed {}").format(len(failed)))
+ self.run_extended_test, "channels",
+ wutils.WifiEnums.REPORT_EVENT_FULL_SCAN_RESULT)
+ self.log.debug("Full Scan settings:{}\n{}".format(
+ len(scan_settings), scan_settings))
+ name_func = (
+ lambda scan_setting: ("test_batch_scan_report_full_scan_for_channels"
+ "_{}_periodInMs_{}").format(scan_setting["channels"], scan_setting["periodInMs"])
+ )
+ failed = self.run_generated_testcases(
+ self.wifi_scanner_batch_scan_full,
+ scan_settings,
+ name_func=name_func)
+ asserts.assert_true(not failed, (
+ "Number of test_batch_scan_report_full_scan_for_channels"
+ " failed {}").format(len(failed)))
def test_batch_scan_report_full_scan_for_band_with_enumerated_params(self):
"""Test WiFi scanner batch scan using channels with enumerated settings
@@ -801,20 +794,21 @@
2. Verify that scan results match with respective scan settings.
"""
scan_settings = self.wifi_generate_scanner_scan_settings(
- self.run_extended_test, "band",
- WifiEnums.REPORT_EVENT_FULL_SCAN_RESULT)
- self.log.debug("Full Scan settings:{}\n{}".format(len(scan_settings),
- scan_settings))
- name_func = (lambda scan_setting :
- ("test_batch_scan_report_full_scan_for_band"
- "_{}_periodInMs_{}").format(scan_setting["band"],
- scan_setting["periodInMs"]))
- failed = self.run_generated_testcases(self.wifi_scanner_batch_scan_full,
- scan_settings,
- name_func = name_func)
- asserts.assert_true(not failed,
- ("Number of test_batch_scan_report_full_scan_for_band"
- " failed {}").format(len(failed)))
+ self.run_extended_test, "band",
+ wutils.WifiEnums.REPORT_EVENT_FULL_SCAN_RESULT)
+ self.log.debug("Full Scan settings:{}\n{}".format(
+ len(scan_settings), scan_settings))
+ name_func = (
+ lambda scan_setting: ("test_batch_scan_report_full_scan_for_band"
+ "_{}_periodInMs_{}").format(scan_setting["band"], scan_setting["periodInMs"])
+ )
+ failed = self.run_generated_testcases(
+ self.wifi_scanner_batch_scan_full,
+ scan_settings,
+ name_func=name_func)
+ asserts.assert_true(
+ not failed, ("Number of test_batch_scan_report_full_scan_for_band"
+ " failed {}").format(len(failed)))
def test_wifi_connection_while_single_scan(self):
"""Test configuring a connection parallel to wifi scanner single scan.
@@ -825,37 +819,39 @@
2. Verify that scanner report single scan results.
"""
self.attenuators[self.connect_network["attenuator"]].set_atten(0)
- data = start_wifi_single_scan(self.dut, self.default_scan_setting)
+ data = wutils.start_wifi_single_scan(self.dut,
+ self.default_scan_setting)
idx = data["Index"]
scan_rt = data["ScanElapsedRealtime"]
- self.log.info("Wifi single shot scan started with index: {}".format(idx))
+ self.log.info("Wifi single shot scan started with index: {}".format(
+ idx))
asserts.assert_true(self.connect_to_reference_network(), NETWORK_ERROR)
- time.sleep(10) #wait for connection to be active
- asserts.assert_true(check_internet_connection(self.dut, self.ping_addr),
- "Error, No internet connection for current network")
+ time.sleep(10) #wait for connection to be active
+ asserts.assert_true(
+ wutils.check_internet_connection(self.dut, self.ping_addr),
+ "Error, No internet connection for current network")
#generating event wait time from scan setting plus leeway
- scan_time, scan_channels = get_scan_time_and_channels(self.wifi_chs,
- self.default_scan_setting,
- self.stime_channel)
- wait_time = int(scan_time/1000) + self.leeway
+ scan_time, scan_channels = wutils.get_scan_time_and_channels(
+ self.wifi_chs, self.default_scan_setting, self.stime_channel)
+ wait_time = int(scan_time / 1000) + self.leeway
validity = False
try:
event_name = "{}{}onResults".format(EVENT_TAG, idx)
- self.log.debug("Waiting for event: {} for time {}".
- format(event_name, wait_time))
+ self.log.debug("Waiting for event: {} for time {}".format(
+ event_name, wait_time))
event = self.dut.ed.pop_event(event_name, wait_time)
- self.log.debug("Event received: {}".format(event ))
+ self.log.debug("Event received: {}".format(event))
results = event["data"]["Results"]
bssids, validity = self.proces_and_valid_batch_scan_result(
- results, scan_rt,
- event["data"][KEY_RET],
- self.default_scan_setting)
- self.log.info("Scan number Buckets: {}\nTotal BSSID: {}".
- format(len(results), bssids))
- asserts.assert_true(len(results) == 1 and bssids >= 1, EMPTY_RESULT)
- except Empty as error:
- raise AssertionError("Event did not triggered for single scan {}".
- format(error))
+ results, scan_rt, event["data"][KEY_RET],
+ self.default_scan_setting)
+ self.log.info("Scan number Buckets: {}\nTotal BSSID: {}".format(
+ len(results), bssids))
+ asserts.assert_true(
+ len(results) == 1 and bssids >= 1, EMPTY_RESULT)
+ except queue.Empty as error:
+ raise AssertionError(
+ "Event did not triggered for single scan {}".format(error))
def test_single_scan_while_pno(self):
"""Test wifi scanner single scan parallel to PNO connection.
@@ -874,20 +870,21 @@
asserts.assert_true(current_network['network_id'] >= 0, NETWORK_ERROR)
self.log.info("Kicking PNO for reference network")
self.attenuators[self.connect_network["attenuator"]].set_atten(90)
- time.sleep(10) #wait for PNO to be kicked
+ time.sleep(10) #wait for PNO to be kicked
self.log.info("Starting single scan while PNO")
self.wifi_scanner_single_scan(self.default_scan_setting)
self.attenuators[self.connect_network["attenuator"]].set_atten(0)
self.log.info("Check connection through PNO for reference network")
- time.sleep(30) #wait for connection through PNO
+ time.sleep(30) #wait for connection through PNO
current_network = self.dut.droid.wifiGetConnectionInfo()
self.log.info("Current network: {}".format(current_network))
asserts.assert_true('network_id' in current_network, NETWORK_ID_ERROR)
asserts.assert_true(current_network['network_id'] >= 0, NETWORK_ERROR)
- time.sleep(10) #wait for IP to be assigned
- asserts.assert_true(check_internet_connection(self.dut, self.ping_addr),
- "Error, No internet connection for current network")
- wifi_forget_network(self.dut, self.connect_network["ssid"])
+ time.sleep(10) #wait for IP to be assigned
+ asserts.assert_true(
+ wutils.check_internet_connection(self.dut, self.ping_addr),
+ "Error, No internet connection for current network")
+ wutils.wifi_forget_network(self.dut, self.connect_network["ssid"])
def test_wifi_connection_and_pno_while_batch_scan(self):
"""Test configuring a connection and PNO connection parallel to wifi
@@ -907,70 +904,90 @@
12. Verify connection occurred through PNO.
"""
self.attenuators[self.connect_network["attenuator"]].set_atten(0)
- data = start_wifi_background_scan(self.dut, self.default_batch_scan_setting)
+ data = wutils.start_wifi_background_scan(
+ self.dut, self.default_batch_scan_setting)
idx = data["Index"]
scan_rt = data["ScanElapsedRealtime"]
- self.log.info("Wifi background scan started with index: {} rt {}".
- format(idx, scan_rt))
+ self.log.info(
+ "Wifi background scan started with index: {} rt {}".format(
+ idx, scan_rt))
#generating event wait time from scan setting plus leeway
- scan_time, scan_channels = get_scan_time_and_channels(
- self.wifi_chs,
- self.default_batch_scan_setting,
- self.stime_channel)
+ scan_time, scan_channels = wutils.get_scan_time_and_channels(
+ self.wifi_chs, self.default_batch_scan_setting, self.stime_channel)
#default number buckets
number_bucket = 10
- time_cache = self.default_batch_scan_setting['periodInMs'] * number_bucket #default cache
+ time_cache = self.default_batch_scan_setting[
+ 'periodInMs'] * number_bucket #default cache
#add 2 seconds extra time for switch between the channel for connection scan
#multiply cache time by two to account for scheduler changing period
wait_time = (time_cache * 2 + scan_time) / 1000 + self.leeway + 2
result_flag = 0
try:
- for snumber in range(1,7):
- event_name = "{}{}onResults".format(EVENT_TAG, idx)
- self.log.info("Waiting for event: {}".format(event_name ))
- event = self.dut.ed.pop_event(event_name, wait_time)
- self.log.debug("Event onResults: {}".format(event ))
- results = event["data"]["Results"]
- bssids, validity = self.proces_and_valid_batch_scan_result(
- results, scan_rt,
- event["data"][KEY_RET],
- self.default_batch_scan_setting)
- self.log.info("Scan number: {}\n Buckets: {}\n BSSID: {}".
- format(snumber, len(results), bssids))
- asserts.assert_true(bssids >= 1, "Not able to fetch scan result")
- if snumber == 1:
- self.log.info("Try to connect AP while waiting for event: {}".
- format(event_name ))
- asserts.assert_true(self.connect_to_reference_network(), NETWORK_ERROR)
- time.sleep(10) #wait for connection to be active
- asserts.assert_true(check_internet_connection(self.dut, self.ping_addr),
- "Error, No internet connection for current network")
- elif snumber == 3:
- self.log.info("Kicking PNO for reference network")
- self.attenuators[self.connect_network["attenuator"]].set_atten(90)
- elif snumber == 4:
- self.log.info("Bring back device for PNO connection")
- current_network = self.dut.droid.wifiGetConnectionInfo()
- self.log.info("Current network: {}".format(current_network))
- asserts.assert_true('network_id' in current_network, NETWORK_ID_ERROR)
- asserts.assert_true(current_network['network_id'] == -1,
- "Device is still connected to network {}".
- format(current_network[WifiEnums.SSID_KEY]))
- self.attenuators[self.connect_network["attenuator"]].set_atten(0)
- time.sleep(10) #wait for connection to take place before waiting for scan result
- elif snumber == 6:
- self.log.info("Check connection through PNO for reference network")
- current_network = self.dut.droid.wifiGetConnectionInfo()
- self.log.info("Current network: {}".format(current_network))
- asserts.assert_true('network_id' in current_network, NETWORK_ID_ERROR)
- asserts.assert_true(current_network['network_id'] >= 0, NETWORK_ERROR)
- time.sleep(10) #wait for connection to be active
- asserts.assert_true(check_internet_connection(self.dut, self.ping_addr),
- "Error, No internet connection for current network")
- wifi_forget_network(self.dut, self.connect_network["ssid"])
- except Empty as error:
- raise AssertionError("Event did not triggered for batch scan {}".
- format(error))
+ for snumber in range(1, 7):
+ event_name = "{}{}onResults".format(EVENT_TAG, idx)
+ self.log.info("Waiting for event: {}".format(event_name))
+ event = self.dut.ed.pop_event(event_name, wait_time)
+ self.log.debug("Event onResults: {}".format(event))
+ results = event["data"]["Results"]
+ bssids, validity = self.proces_and_valid_batch_scan_result(
+ results, scan_rt, event["data"][KEY_RET],
+ self.default_batch_scan_setting)
+ self.log.info(
+ "Scan number: {}\n Buckets: {}\n BSSID: {}".format(
+ snumber, len(results), bssids))
+ asserts.assert_true(bssids >= 1,
+ "Not able to fetch scan result")
+ if snumber == 1:
+ self.log.info(
+ "Try to connect AP while waiting for event: {}".format(
+ event_name))
+ asserts.assert_true(self.connect_to_reference_network(),
+ NETWORK_ERROR)
+ time.sleep(10) #wait for connection to be active
+ asserts.assert_true(
+ wutils.check_internet_connection(self.dut,
+ self.ping_addr),
+ "Error, No internet connection for current network")
+ elif snumber == 3:
+ self.log.info("Kicking PNO for reference network")
+ self.attenuators[self.connect_network[
+ "attenuator"]].set_atten(90)
+ elif snumber == 4:
+ self.log.info("Bring back device for PNO connection")
+ current_network = self.dut.droid.wifiGetConnectionInfo()
+ self.log.info("Current network: {}".format(
+ current_network))
+ asserts.assert_true('network_id' in current_network,
+ NETWORK_ID_ERROR)
+ asserts.assert_true(
+ current_network['network_id'] == -1,
+ "Device is still connected to network {}".format(
+ current_network[wutils.WifiEnums.SSID_KEY]))
+ self.attenuators[self.connect_network[
+ "attenuator"]].set_atten(0)
+ time.sleep(
+ 10
+ ) #wait for connection to take place before waiting for scan result
+ elif snumber == 6:
+ self.log.info(
+ "Check connection through PNO for reference network")
+ current_network = self.dut.droid.wifiGetConnectionInfo()
+ self.log.info("Current network: {}".format(
+ current_network))
+ asserts.assert_true('network_id' in current_network,
+ NETWORK_ID_ERROR)
+ asserts.assert_true(current_network['network_id'] >= 0,
+ NETWORK_ERROR)
+ time.sleep(10) #wait for connection to be active
+ asserts.assert_true(
+ wutils.check_internet_connection(self.dut,
+ self.ping_addr),
+ "Error, No internet connection for current network")
+ wutils.wifi_forget_network(self.dut,
+ self.connect_network["ssid"])
+ except queue.Empty as error:
+ raise AssertionError(
+ "Event did not triggered for batch scan {}".format(error))
finally:
self.dut.droid.wifiScannerStopBackgroundScan(idx)
self.dut.ed.clear_all_events()
@@ -983,9 +1000,10 @@
parameters.
2. Verify that scan results match with respective scan settings.
"""
- scan_setting = { "channels": self.wifi_chs.MIX_CHANNEL_SCAN,
- "periodInMs": SCANTIME,
- "reportEvents": WifiEnums.REPORT_EVENT_AFTER_EACH_SCAN }
+ scan_setting = {"channels": self.wifi_chs.MIX_CHANNEL_SCAN,
+ "periodInMs": SCANTIME,
+ "reportEvents":
+ wutils.WifiEnums.REPORT_EVENT_AFTER_EACH_SCAN}
self.wifi_scanner_single_scan(scan_setting)
def test_wifi_scanner_batch_scan_channel_sanity(self):
@@ -996,9 +1014,10 @@
parameters.
2. Verify that scan results match with respective scan settings.
"""
- scan_setting = { "channels": self.wifi_chs.MIX_CHANNEL_SCAN,
- "periodInMs": SCANTIME,
- "reportEvents": WifiEnums.REPORT_EVENT_AFTER_BUFFER_FULL}
+ scan_setting = {"channels": self.wifi_chs.MIX_CHANNEL_SCAN,
+ "periodInMs": SCANTIME,
+ "reportEvents":
+ wutils.WifiEnums.REPORT_EVENT_AFTER_BUFFER_FULL}
self.wifi_scanner_batch_scan(scan_setting)
def test_wifi_scanner_batch_scan_period_too_short(self):
@@ -1006,10 +1025,11 @@
1. Start WifiScanner batch scan for both band with interval period as 5s.
2. Verify that scan is not started."""
- scan_setting = { "band": WifiEnums.WIFI_BAND_BOTH_WITH_DFS,
- "periodInMs": 5000,
- "reportEvents": WifiEnums.REPORT_EVENT_AFTER_BUFFER_FULL}
- self.start_wifi_scanner_background_scan_expect_failure(scan_setting);
+ scan_setting = {"band": wutils.WifiEnums.WIFI_BAND_BOTH_WITH_DFS,
+ "periodInMs": 5000,
+ "reportEvents":
+ wutils.WifiEnums.REPORT_EVENT_AFTER_BUFFER_FULL}
+ self.start_wifi_scanner_background_scan_expect_failure(scan_setting)
def test_wifi_scanner_single_scan_in_isolated(self):
"""Test WiFi scanner in isolated environment with default scan settings.
@@ -1021,30 +1041,32 @@
"""
self.attenuators[0].set_atten(90)
self.attenuators[1].set_atten(90)
- data = start_wifi_single_scan(self.dut, self.default_scan_setting)
+ data = wutils.start_wifi_single_scan(self.dut,
+ self.default_scan_setting)
idx = data["Index"]
scan_rt = data["ScanElapsedRealtime"]
- self.log.info("Wifi single shot scan started with index: {}".format(idx))
+ self.log.info("Wifi single shot scan started with index: {}".format(
+ idx))
results = []
#generating event wait time from scan setting plus leeway
- scan_time, scan_channels = get_scan_time_and_channels(self.wifi_chs,
- self.default_scan_setting,
- self.stime_channel)
- wait_time = int(scan_time/1000) + self.leeway
+ scan_time, scan_channels = wutils.get_scan_time_and_channels(
+ self.wifi_chs, self.default_scan_setting, self.stime_channel)
+ wait_time = int(scan_time / 1000) + self.leeway
try:
event_name = "{}{}onResults".format(EVENT_TAG, idx)
- self.log.debug("Waiting for event: {} for time {}".format(event_name,
- wait_time))
+ self.log.debug("Waiting for event: {} for time {}".format(
+ event_name, wait_time))
event = self.dut.ed.pop_event(event_name, wait_time)
self.log.debug("Event received: {}".format(event))
results = event["data"]["Results"]
for batch in results:
- asserts.assert_true(not batch["ScanResults"],
- "Test fail because report scan "
- "results reported are not empty")
- except Empty as error:
- raise AssertionError("Event did not triggered for in isolated environment {}".
- format(error))
+ asserts.assert_true(not batch["ScanResults"],
+ "Test fail because report scan "
+ "results reported are not empty")
+ except queue.Empty as error:
+ raise AssertionError(
+ "Event did not triggered for in isolated environment {}".format(
+ error))
finally:
self.dut.ed.clear_all_events()
self.attenuators[0].set_atten(0)
@@ -1058,10 +1080,11 @@
3. Verify that scan is not started.
"""
self.log.debug("Make sure wifi is off.")
- wifi_toggle_state(self.dut, False)
- self.start_wifi_scanner_single_scan_expect_failure(self.default_scan_setting)
+ wutils.wifi_toggle_state(self.dut, False)
+ self.start_wifi_scanner_single_scan_expect_failure(
+ self.default_scan_setting)
self.log.debug("Turning wifi back on.")
- wifi_toggle_state(self.dut, True)
+ wutils.wifi_toggle_state(self.dut, True)
def test_wifi_scanner_with_invalid_numBssidsPerScan(self):
"""Test WiFi scanner single scan with invalid number of bssids reported
@@ -1073,10 +1096,11 @@
bssids per scan.
"""
scan_setting = {
- "band": WifiEnums.WIFI_BAND_BOTH_WITH_DFS,
+ "band": wutils.WifiEnums.WIFI_BAND_BOTH_WITH_DFS,
"periodInMs": SCANTIME,
- "reportEvents": WifiEnums.REPORT_EVENT_AFTER_EACH_SCAN,
+ "reportEvents": wutils.WifiEnums.REPORT_EVENT_AFTER_EACH_SCAN,
'numBssidsPerScan': 33
}
self.wifi_scanner_single_scan(scan_setting)
+
""" Tests End """