Merge "[AP_LIB]Correcting variable names"
diff --git a/acts/tests/google/wifi/WifiRssiTest.py b/acts/tests/google/wifi/WifiRssiTest.py
index 1ea75b5..d53e49b 100644
--- a/acts/tests/google/wifi/WifiRssiTest.py
+++ b/acts/tests/google/wifi/WifiRssiTest.py
@@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import collections
import json
import logging
import math
@@ -55,9 +56,11 @@
def setup_class(self):
self.dut = self.android_devices[0]
- req_params = ["rssi_test_params", "testbed_params", "main_network"]
- opt_params = ["RetailAccessPoints"]
- self.unpack_userparams(req_params, opt_params)
+ req_params = [
+ "RetailAccessPoints", "rssi_test_params", "testbed_params",
+ "main_network"
+ ]
+ self.unpack_userparams(req_params)
self.test_params = self.rssi_test_params
self.num_atten = self.attenuators[0].instrument.num_atten
self.iperf_server = self.iperf_servers[0]
@@ -148,20 +151,20 @@
) or math.isnan(avg_error)
if rssi_failure and key in rssi_under_test:
test_message = test_message + (
- "{} failed. Average {} error is {:.2f} dB. "
- "Average shift is {:.2f} dB.\n").format(
- key, error_type, avg_error, avg_shift)
+ "{} failed ({} error = {:.2f} dB, "
+ "shift = {:.2f} dB)\n").format(key, error_type,
+ avg_error, avg_shift)
test_failed = True
elif rssi_failure:
test_message = test_message + (
- "{} failed (ignored). Average {} error is {:.2f} dB. "
- "Average shift is {:.2f} dB.\n").format(
- key, error_type, avg_error, avg_shift)
+ "{} failed (ignored) ({} error = {:.2f} dB, "
+ "shift = {:.2f} dB)\n").format(key, error_type,
+ avg_error, avg_shift)
else:
test_message = test_message + (
- "{} passed. Average {} error is {:.2f} dB. "
- "Average shift is {:.2f} dB.\n").format(
- key, error_type, avg_error, avg_shift)
+ "{} passed ({} error = {:.2f} dB, "
+ "shift = {:.2f} dB)\n").format(key, error_type,
+ avg_error, avg_shift)
if test_failed:
asserts.fail(test_message)
@@ -183,15 +186,16 @@
with open(results_file_path, 'w') as results_file:
json.dump(rssi_result, results_file, indent=4)
# Compile results into arrays of RSSIs suitable for plotting
- postprocessed_results = {
- "signal_poll_rssi": {},
- "signal_poll_avg_rssi": {},
- "scan_rssi": {},
- "chain_0_rssi": {},
- "chain_1_rssi": {},
- "total_attenuation": [],
- "predicted_rssi": []
- }
+ # yapf: disable
+ postprocessed_results = collections.OrderedDict(
+ [("signal_poll_rssi", {}),
+ ("signal_poll_avg_rssi", {}),
+ ("scan_rssi", {}),
+ ("chain_0_rssi", {}),
+ ("chain_1_rssi", {}),
+ ("total_attenuation", []),
+ ("predicted_rssi", [])])
+ # yapf: enable
for key, val in postprocessed_results.items():
if "scan_rssi" in key:
postprocessed_results[key]["data"] = [
@@ -299,14 +303,15 @@
x_data = []
y_data = []
legends = []
- rssi_time_series = {
- "signal_poll_rssi": [],
- "signal_poll_avg_rssi": [],
- "scan_rssi": [],
- "chain_0_rssi": [],
- "chain_1_rssi": [],
- "predicted_rssi": []
- }
+ # yapf: disable
+ rssi_time_series = collections.OrderedDict(
+ [("signal_poll_rssi", []),
+ ("signal_poll_avg_rssi", []),
+ ("scan_rssi", []),
+ ("chain_0_rssi", []),
+ ("chain_1_rssi", []),
+ ("predicted_rssi", [])])
+ # yapf: enable
for key, val in rssi_time_series.items():
if "predicted_rssi" in key:
rssi_time_series[key] = [
@@ -355,6 +360,11 @@
shaded_region=None,
output_file_path=output_file_path)
+ @staticmethod
+ def empty_rssi_result():
+ return collections.OrderedDict([("data", []), ("mean", None), ("stdev",
+ None)])
+
def get_scan_rssi(self, tracked_bssids, num_measurements=1):
"""Gets scan RSSI for specified BSSIDs.
@@ -365,9 +375,9 @@
scan_rssi: dict containing the measurement results as well as the
statistics of the scan RSSI for all BSSIDs in tracked_bssids
"""
- scan_rssi = {}
+ scan_rssi = collections.OrderedDict()
for bssid in tracked_bssids:
- scan_rssi[bssid] = {"data": [], "mean": None, "stdev": None}
+ scan_rssi[bssid] = self.empty_rssi_result()
for idx in range(num_measurements):
scan_output = self.dut.adb.shell(SCAN)
time.sleep(MED_SLEEP)
@@ -411,28 +421,13 @@
all reported RSSI values (signal_poll, per chain, etc.) and their
statistics
"""
- connected_rssi = {
- "signal_poll_rssi": {
- "data": [],
- "mean": None,
- "stdev": None
- },
- "signal_poll_avg_rssi": {
- "data": [],
- "mean": None,
- "stdev": None
- },
- "chain_0_rssi": {
- "data": [],
- "mean": None,
- "stdev": None
- },
- "chain_1_rssi": {
- "data": [],
- "mean": None,
- "stdev": None
- }
- }
+ # yapf: disable
+ connected_rssi = collections.OrderedDict(
+ [("signal_poll_rssi", self.empty_rssi_result()),
+ ("signal_poll_avg_rssi", self.empty_rssi_result()),
+ ("chain_0_rssi", self.empty_rssi_result()),
+ ("chain_1_rssi", self.empty_rssi_result())])
+ # yapf: enable
for idx in range(num_measurements):
measurement_start_time = time.time()
# Get signal poll RSSI
@@ -525,12 +520,10 @@
for atten in self.rssi_atten_range:
# Set Attenuation
self.log.info("Setting attenuation to {} dB".format(atten))
- [
- self.attenuators[i].set_atten(atten)
- for i in range(self.num_atten)
- ]
+ for attenuator in self.attenuators:
+ attenuator.set_atten(atten)
time.sleep(first_measurement_delay)
- current_rssi = {}
+ current_rssi = collections.OrderedDict()
current_rssi = self.get_connected_rssi(connected_measurements,
polling_frequency)
current_rssi["scan_rssi"] = self.get_scan_rssi(
@@ -542,7 +535,8 @@
if self.iperf_traffic:
self.iperf_server.stop()
self.dut.adb.shell("pkill iperf3")
- [self.attenuators[i].set_atten(0) for i in range(self.num_atten)]
+ for attenuator in self.attenuators:
+ attenuator.set_atten(0)
return rssi_result
def rssi_test_func(self, iperf_traffic, connected_measurements,
@@ -558,7 +552,7 @@
rssi_result: dict containing rssi_results and meta data
"""
#Initialize test settings
- rssi_result = {}
+ rssi_result = collections.OrderedDict()
# Configure AP
band = self.access_point.band_lookup_by_channel(self.channel)
if "2G" in band:
@@ -574,10 +568,8 @@
self.log.info("Access Point Configuration: {}".format(
self.access_point.ap_settings))
# Set attenuator to starting attenuation
- [
- self.attenuators[i].set_atten(self.rssi_atten_range[0])
- for i in range(self.num_atten)
- ]
+ for attenuator in self.attenuators:
+ attenuator.set_atten(self.rssi_atten_range[0])
# Connect DUT to Network
wutils.wifi_toggle_state(self.dut, True)
wutils.reset_wifi(self.dut)