blob: 58f938d85aee5e1ea20022721e26dec28dbe6008 [file] [log] [blame]
#!/usr/bin/env python3.4
#
# Copyright 2017 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import csv
import itertools
import logging
import numpy
import os
from acts import asserts
from acts import context
from acts import base_test
from acts import utils
from acts.controllers import iperf_client
from acts.controllers.utils_lib import ssh
from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger
from acts_contrib.test_utils.wifi import ota_chamber
from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wputils
from acts_contrib.test_utils.wifi.wifi_performance_test_utils.bokeh_figure import BokehFigure
from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
from acts_contrib.test_utils.wifi import wifi_retail_ap as retail_ap
from acts_contrib.test_utils.wifi import ota_sniffer
from functools import partial
from WifiRvrTest import WifiRvrTest
from WifiPingTest import WifiPingTest
class WifiSensitivityTest(WifiRvrTest, WifiPingTest):
"""Class to test WiFi sensitivity tests.
This class implements measures WiFi sensitivity per rate. It heavily
leverages the WifiRvrTest class and introduced minor differences to set
specific rates and the access point, and implements a different pass/fail
check. For an example config file to run this test class see
example_connectivity_performance_ap_sta.json.
"""
MAX_CONSECUTIVE_ZEROS = 5
RSSI_POLL_INTERVAL = 0.2
VALID_TEST_CONFIGS = {
1: ['legacy', 'VHT20'],
2: ['legacy', 'VHT20'],
6: ['legacy', 'VHT20'],
10: ['legacy', 'VHT20'],
11: ['legacy', 'VHT20'],
36: ['legacy', 'VHT20', 'VHT40', 'VHT80'],
40: ['legacy', 'VHT20'],
44: ['legacy', 'VHT20'],
48: ['legacy', 'VHT20'],
149: ['legacy', 'VHT20', 'VHT40', 'VHT80'],
153: ['legacy', 'VHT20'],
157: ['legacy', 'VHT20'],
161: ['legacy', 'VHT20']
}
RateTuple = collections.namedtuple(('RateTuple'),
['mcs', 'streams', 'data_rate'])
#yapf:disable
VALID_RATES = {
'legacy_2GHz': [
RateTuple(54, 1, 54), RateTuple(48, 1, 48),
RateTuple(36, 1, 36), RateTuple(24, 1, 24),
RateTuple(18, 1, 18), RateTuple(12, 1, 12),
RateTuple(11, 1, 11), RateTuple(9, 1, 9),
RateTuple(6, 1, 6), RateTuple(5.5, 1, 5.5),
RateTuple(2, 1, 2), RateTuple(1, 1, 1)],
'legacy_5GHz': [
RateTuple(54, 1, 54), RateTuple(48, 1, 48),
RateTuple(36, 1, 36), RateTuple(24, 1, 24),
RateTuple(18, 1, 18), RateTuple(12, 1, 12),
RateTuple(9, 1, 9), RateTuple(6, 1, 6)],
'HT20': [
RateTuple(7, 1, 72.2), RateTuple(6, 1, 65),
RateTuple(5, 1, 57.8), RateTuple(4, 1, 43.3),
RateTuple(3, 1, 26), RateTuple(2, 1, 21.7),
RateTuple(1, 1, 14.4), RateTuple(0, 1, 7.2),
RateTuple(15, 2, 144.4), RateTuple(14, 2, 130),
RateTuple(13, 2, 115.6), RateTuple(12, 2, 86.7),
RateTuple(11, 2, 57.8), RateTuple(10, 2, 43.4),
RateTuple(9, 2, 28.9), RateTuple(8, 2, 14.4)],
'VHT20': [
RateTuple(9, 1, 96), RateTuple(8, 1, 86.7),
RateTuple(7, 1, 72.2), RateTuple(6, 1, 65),
RateTuple(5, 1, 57.8), RateTuple(4, 1, 43.3),
RateTuple(3, 1, 28.9), RateTuple(2, 1, 21.7),
RateTuple(1, 1, 14.4), RateTuple(0, 1, 7.2),
RateTuple(9, 2, 192), RateTuple(8, 2, 173.3),
RateTuple(7, 2, 144.4), RateTuple(6, 2, 130.3),
RateTuple(5, 2, 115.6), RateTuple(4, 2, 86.7),
RateTuple(3, 2, 57.8), RateTuple(2, 2, 43.3),
RateTuple(1, 2, 28.9), RateTuple(0, 2, 14.4)],
'VHT40': [
RateTuple(9, 1, 96), RateTuple(8, 1, 86.7),
RateTuple(7, 1, 72.2), RateTuple(6, 1, 65),
RateTuple(5, 1, 57.8), RateTuple(4, 1, 43.3),
RateTuple(3, 1, 28.9), RateTuple(2, 1, 21.7),
RateTuple(1, 1, 14.4), RateTuple(0, 1, 7.2),
RateTuple(9, 2, 192), RateTuple(8, 2, 173.3),
RateTuple(7, 2, 144.4), RateTuple(6, 2, 130.3),
RateTuple(5, 2, 115.6), RateTuple(4, 2, 86.7),
RateTuple(3, 2, 57.8), RateTuple(2, 2, 43.3),
RateTuple(1, 2, 28.9), RateTuple(0, 2, 14.4)],
'VHT80': [
RateTuple(9, 1, 96), RateTuple(8, 1, 86.7),
RateTuple(7, 1, 72.2), RateTuple(6, 1, 65),
RateTuple(5, 1, 57.8), RateTuple(4, 1, 43.3),
RateTuple(3, 1, 28.9), RateTuple(2, 1, 21.7),
RateTuple(1, 1, 14.4), RateTuple(0, 1, 7.2),
RateTuple(9, 2, 192), RateTuple(8, 2, 173.3),
RateTuple(7, 2, 144.4), RateTuple(6, 2, 130.3),
RateTuple(5, 2, 115.6), RateTuple(4, 2, 86.7),
RateTuple(3, 2, 57.8), RateTuple(2, 2, 43.3),
RateTuple(1, 2, 28.9), RateTuple(0, 2, 14.4)],
}
#yapf:enable
def __init__(self, controllers):
base_test.BaseTestClass.__init__(self, controllers)
self.testcase_metric_logger = (
BlackboxMappedMetricLogger.for_test_case())
self.testclass_metric_logger = (
BlackboxMappedMetricLogger.for_test_class())
self.publish_testcase_metrics = True
def setup_class(self):
"""Initializes common test hardware and parameters.
This function initializes hardwares and compiles parameters that are
common to all tests in this class.
"""
self.dut = self.android_devices[-1]
self.sta_dut = self.android_devices[-1]
req_params = [
'RetailAccessPoints', 'sensitivity_test_params', 'testbed_params',
'RemoteServer'
]
opt_params = ['main_network', 'OTASniffer']
self.unpack_userparams(req_params, opt_params)
self.testclass_params = self.sensitivity_test_params
self.num_atten = self.attenuators[0].instrument.num_atten
self.ping_server = ssh.connection.SshConnection(
ssh.settings.from_config(self.RemoteServer[0]['ssh_config']))
if hasattr(self,
'OTASniffer') and self.testbed_params['sniffer_enable']:
try:
self.sniffer = ota_sniffer.create(self.OTASniffer)[0]
except:
self.log.warning('Could not start sniffer. Disabling sniffs.')
self.testbed_params['sniffer_enable'] = 0
self.remote_server = self.ping_server
self.iperf_server = self.iperf_servers[0]
self.iperf_client = self.iperf_clients[0]
self.access_point = retail_ap.create(self.RetailAccessPoints)[0]
self.log.info('Access Point Configuration: {}'.format(
self.access_point.ap_settings))
self.log_path = os.path.join(logging.log_path, 'results')
os.makedirs(self.log_path, exist_ok=True)
self.atten_dut_chain_map = {}
self.testclass_results = []
# Turn WiFi ON
if self.testclass_params.get('airplane_mode', 1):
self.log.info('Turning on airplane mode.')
asserts.assert_true(utils.force_airplane_mode(self.dut, True),
'Can not turn on airplane mode.')
wutils.wifi_toggle_state(self.dut, True)
# Configure test retries
self.user_params['retry_tests'] = [self.__class__.__name__]
def teardown_class(self):
self.access_point.teardown()
# Turn WiFi OFF
for dev in self.android_devices:
wutils.wifi_toggle_state(dev, False)
dev.go_to_sleep()
self.process_testclass_results()
def setup_test(self):
self.retry_flag = False
def teardown_test(self):
self.retry_flag = False
def on_retry(self):
"""Function to control test logic on retried tests.
This function is automatically executed on tests that are being
retried. In this case the function resets wifi, toggles it off and on
and sets a retry_flag to enable further tweaking the test logic on
second attempts.
"""
self.retry_flag = True
for dev in self.android_devices:
wutils.reset_wifi(dev)
wutils.toggle_wifi_off_and_on(dev)
def pass_fail_check(self, result):
"""Checks sensitivity results and decides on pass/fail.
Args:
result: dict containing attenuation, throughput and other meta
data
"""
result_string = ('Throughput = {}%, Sensitivity = {}.'.format(
result['peak_throughput_pct'], result['sensitivity']))
if result['peak_throughput_pct'] < 95:
asserts.fail('Result unreliable. {}'.format(result_string))
else:
asserts.explicit_pass('Test Passed. {}'.format(result_string))
def plot_per_curves(self):
"""Plots PER curves to help debug sensitivity."""
plots = collections.OrderedDict()
id_fields = ['channel', 'mode', 'num_streams']
for result in self.testclass_results:
testcase_params = result['testcase_params']
plot_id = self.extract_test_id(testcase_params, id_fields)
plot_id = tuple(plot_id.items())
if plot_id not in plots:
plots[plot_id] = BokehFigure(
title='Channel {} {} Nss{}'.format(
result['testcase_params']['channel'],
result['testcase_params']['mode'],
result['testcase_params']['num_streams']),
x_label='Attenuation (dB)',
primary_y_label='PER (%)')
per = [stat['summary']['rx_per'] for stat in result['llstats']]
if len(per) < len(result['total_attenuation']):
per.extend([100] *
(len(result['total_attenuation']) - len(per)))
plots[plot_id].add_line(result['total_attenuation'], per,
result['test_name'])
figure_list = []
for plot_id, plot in plots.items():
plot.generate_figure()
figure_list.append(plot)
output_file_path = os.path.join(self.log_path, 'PER_curves.html')
BokehFigure.save_figures(figure_list, output_file_path)
def process_testclass_results(self):
"""Saves and plots test results from all executed test cases."""
# write json output
self.plot_per_curves()
testclass_results_dict = collections.OrderedDict()
id_fields = ['mode', 'rate', 'num_streams', 'chain_mask']
channels_tested = []
for result in self.testclass_results:
testcase_params = result['testcase_params']
test_id = self.extract_test_id(testcase_params, id_fields)
test_id = tuple(test_id.items())
if test_id not in testclass_results_dict:
testclass_results_dict[test_id] = collections.OrderedDict()
channel = testcase_params['channel']
if channel not in channels_tested:
channels_tested.append(channel)
if result['peak_throughput_pct'] >= 95:
testclass_results_dict[test_id][channel] = result[
'sensitivity']
else:
testclass_results_dict[test_id][channel] = ''
# calculate average metrics
metrics_dict = collections.OrderedDict()
id_fields = ['channel', 'mode', 'num_streams', 'chain_mask']
for test_id in testclass_results_dict.keys():
for channel in testclass_results_dict[test_id].keys():
metric_tag = collections.OrderedDict(test_id, channel=channel)
metric_tag = self.extract_test_id(metric_tag, id_fields)
metric_tag = tuple(metric_tag.items())
metrics_dict.setdefault(metric_tag, [])
sensitivity_result = testclass_results_dict[test_id][channel]
if sensitivity_result != '':
metrics_dict[metric_tag].append(sensitivity_result)
for metric_tag_tuple, metric_data in metrics_dict.items():
metric_tag_dict = collections.OrderedDict(metric_tag_tuple)
metric_tag = 'ch{}_{}_nss{}_chain{}'.format(
metric_tag_dict['channel'], metric_tag_dict['mode'],
metric_tag_dict['num_streams'], metric_tag_dict['chain_mask'])
metric_key = '{}.avg_sensitivity'.format(metric_tag)
metric_value = numpy.mean(metric_data)
self.testclass_metric_logger.add_metric(metric_key, metric_value)
# write csv
csv_header = ['Mode', 'MCS', 'Streams', 'Chain', 'Rate (Mbps)']
for channel in channels_tested:
csv_header.append('Ch. ' + str(channel))
results_file_path = os.path.join(self.log_path, 'results.csv')
with open(results_file_path, mode='w') as csv_file:
writer = csv.DictWriter(csv_file, fieldnames=csv_header)
writer.writeheader()
for test_id, test_results in testclass_results_dict.items():
test_id_dict = dict(test_id)
if 'legacy' in test_id_dict['mode']:
rate_list = self.VALID_RATES['legacy_2GHz']
else:
rate_list = self.VALID_RATES[test_id_dict['mode']]
data_rate = next(rate.data_rate for rate in rate_list
if rate[:-1] == (test_id_dict['rate'],
test_id_dict['num_streams']))
row_value = {
'Mode': test_id_dict['mode'],
'MCS': test_id_dict['rate'],
'Streams': test_id_dict['num_streams'],
'Chain': test_id_dict['chain_mask'],
'Rate (Mbps)': data_rate,
}
for channel in channels_tested:
row_value['Ch. ' + str(channel)] = test_results.pop(
channel, ' ')
writer.writerow(row_value)
if not self.testclass_params['traffic_type'].lower() == 'ping':
WifiRvrTest.process_testclass_results(self)
def process_rvr_test_results(self, testcase_params, rvr_result):
"""Post processes RvR results to compute sensitivity.
Takes in the results of the RvR tests and computes the sensitivity of
the current rate by looking at the point at which throughput drops
below the percentage specified in the config file. The function then
calls on its parent class process_test_results to plot the result.
Args:
rvr_result: dict containing attenuation, throughput and other meta
data
"""
rvr_result['peak_throughput'] = max(rvr_result['throughput_receive'])
rvr_result['peak_throughput_pct'] = 100
throughput_check = [
throughput < rvr_result['peak_throughput'] *
(self.testclass_params['throughput_pct_at_sensitivity'] / 100)
for throughput in rvr_result['throughput_receive']
]
consistency_check = [
idx for idx in range(len(throughput_check))
if all(throughput_check[idx:])
]
rvr_result['atten_at_range'] = rvr_result['attenuation'][
consistency_check[0] - 1]
rvr_result['range'] = rvr_result['fixed_attenuation'] + (
rvr_result['atten_at_range'])
rvr_result['sensitivity'] = self.testclass_params['ap_tx_power'] + (
self.testbed_params['ap_tx_power_offset'][str(
testcase_params['channel'])] - rvr_result['range'])
WifiRvrTest.process_test_results(self, rvr_result)
def process_ping_test_results(self, testcase_params, ping_result):
"""Post processes RvR results to compute sensitivity.
Takes in the results of the RvR tests and computes the sensitivity of
the current rate by looking at the point at which throughput drops
below the percentage specified in the config file. The function then
calls on its parent class process_test_results to plot the result.
Args:
rvr_result: dict containing attenuation, throughput and other meta
data
"""
WifiPingTest.process_ping_results(self, testcase_params, ping_result)
ping_result['sensitivity'] = self.testclass_params['ap_tx_power'] + (
self.testbed_params['ap_tx_power_offset'][str(
testcase_params['channel'])] - ping_result['range'])
def setup_ping_test(self, testcase_params):
"""Function that gets devices ready for the test.
Args:
testcase_params: dict containing test-specific parameters
"""
# Configure AP
self.setup_ap(testcase_params)
# Set attenuator to starting attenuation
for attenuator in self.attenuators:
attenuator.set_atten(testcase_params['atten_start'],
strict=False,
retry=True)
# Reset, configure, and connect DUT
self.setup_dut(testcase_params)
def setup_ap(self, testcase_params):
"""Sets up the AP and attenuator to compensate for AP chain imbalance.
Args:
testcase_params: dict containing AP and other test params
"""
band = self.access_point.band_lookup_by_channel(
testcase_params['channel'])
if '2G' in band:
frequency = wutils.WifiEnums.channel_2G_to_freq[
testcase_params['channel']]
else:
frequency = wutils.WifiEnums.channel_5G_to_freq[
testcase_params['channel']]
if frequency in wutils.WifiEnums.DFS_5G_FREQUENCIES:
self.access_point.set_region(self.testbed_params['DFS_region'])
else:
self.access_point.set_region(self.testbed_params['default_region'])
self.access_point.set_channel(band, testcase_params['channel'])
self.access_point.set_bandwidth(band, testcase_params['mode'])
self.access_point.set_power(band, testcase_params['ap_tx_power'])
self.access_point.set_rate(band, testcase_params['mode'],
testcase_params['num_streams'],
testcase_params['rate'],
testcase_params['short_gi'])
# Set attenuator offsets and set attenuators to initial condition
atten_offsets = self.testbed_params['chain_offset'][str(
testcase_params['channel'])]
for atten in self.attenuators:
if 'AP-Chain-0' in atten.path:
atten.offset = atten_offsets[0]
elif 'AP-Chain-1' in atten.path:
atten.offset = atten_offsets[1]
else:
atten.offset = 0
self.log.info('Access Point Configuration: {}'.format(
self.access_point.ap_settings))
def setup_dut(self, testcase_params):
"""Sets up the DUT in the configuration required by the test.
Args:
testcase_params: dict containing AP and other test params
"""
# Turn screen off to preserve battery
if self.testbed_params.get('screen_on',
False) or self.testclass_params.get(
'screen_on', False):
self.dut.droid.wakeLockAcquireDim()
else:
self.dut.go_to_sleep()
if wputils.validate_network(self.dut,
testcase_params['test_network']['SSID']):
self.log.info('Already connected to desired network')
else:
wutils.wifi_toggle_state(self.dut, False)
wutils.set_wifi_country_code(self.dut,
self.testclass_params['country_code'])
wutils.wifi_toggle_state(self.dut, True)
wutils.reset_wifi(self.dut)
wutils.set_wifi_country_code(self.dut,
self.testclass_params['country_code'])
testcase_params['test_network']['channel'] = testcase_params[
'channel']
wutils.wifi_connect(self.dut,
testcase_params['test_network'],
num_of_tries=5,
check_connectivity=False)
self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0]
# Activate/attenuate the correct chains
if testcase_params['channel'] not in self.atten_dut_chain_map.keys():
self.atten_dut_chain_map[testcase_params[
'channel']] = wputils.get_current_atten_dut_chain_map(
self.attenuators, self.dut, self.ping_server)
self.log.info('Current Attenuator-DUT Chain Map: {}'.format(
self.atten_dut_chain_map[testcase_params['channel']]))
for idx, atten in enumerate(self.attenuators):
if self.atten_dut_chain_map[testcase_params['channel']][
idx] == testcase_params['attenuated_chain']:
atten.offset = atten.instrument.max_atten
def extract_test_id(self, testcase_params, id_fields):
test_id = collections.OrderedDict(
(param, testcase_params[param]) for param in id_fields)
return test_id
def get_start_atten(self, testcase_params):
"""Gets the starting attenuation for this sensitivity test.
The function gets the starting attenuation by checking whether a test
as the next higher MCS has been executed. If so it sets the starting
point a configurable number of dBs below the next MCS's sensitivity.
Returns:
start_atten: starting attenuation for current test
"""
# If the test is being retried, start from the beginning
if self.retry_flag:
self.log.info('Retry flag set. Setting attenuation to minimum.')
return self.testclass_params['atten_start']
# Get the current and reference test config. The reference test is the
# one performed at the current MCS+1
current_rate = testcase_params['rate']
ref_test_params = self.extract_test_id(
testcase_params,
['channel', 'mode', 'rate', 'num_streams', 'chain_mask'])
if 'legacy' in testcase_params['mode']:
if testcase_params['channel'] <= 13:
rate_list = self.VALID_RATES['legacy_2GHz']
else:
rate_list = self.VALID_RATES['legacy_5GHz']
ref_index = max(
0,
rate_list.index(self.RateTuple(current_rate, 1, current_rate))
- 1)
ref_test_params['rate'] = rate_list[ref_index].mcs
else:
ref_test_params['rate'] = current_rate + 1
# Check if reference test has been run and set attenuation accordingly
previous_params = [
self.extract_test_id(
result['testcase_params'],
['channel', 'mode', 'rate', 'num_streams', 'chain_mask'])
for result in self.testclass_results
]
try:
ref_index = previous_params.index(ref_test_params)
start_atten = self.testclass_results[ref_index][
'atten_at_range'] - (
self.testclass_params['adjacent_mcs_range_gap'])
except ValueError:
self.log.warning(
'Reference test not found. Starting from {} dB'.format(
self.testclass_params['atten_start']))
start_atten = self.testclass_params['atten_start']
start_atten = max(start_atten, 0)
return start_atten
def compile_test_params(self, testcase_params):
"""Function that generates test params based on the test name."""
# Check if test should be skipped.
wputils.check_skip_conditions(testcase_params, self.dut,
self.access_point,
getattr(self, 'ota_chamber', None))
band = self.access_point.band_lookup_by_channel(
testcase_params['channel'])
testcase_params['band'] = band
testcase_params['test_network'] = self.main_network[band]
if testcase_params['chain_mask'] in ['0', '1']:
testcase_params['attenuated_chain'] = 'DUT-Chain-{}'.format(
1 if testcase_params['chain_mask'] == '0' else 0)
else:
# Set attenuated chain to -1. Do not set to None as this will be
# compared to RF chain map which may include None
testcase_params['attenuated_chain'] = -1
self.testclass_params[
'range_ping_loss_threshold'] = 100 - self.testclass_params[
'throughput_pct_at_sensitivity']
if self.testclass_params['traffic_type'] == 'UDP':
testcase_params['iperf_args'] = '-i 1 -t {} -J -u -b {}'.format(
self.testclass_params['iperf_duration'],
self.testclass_params['UDP_rates'][testcase_params['mode']])
elif self.testclass_params['traffic_type'] == 'TCP':
testcase_params['iperf_args'] = '-i 1 -t {} -J'.format(
self.testclass_params['iperf_duration'])
if self.testclass_params['traffic_type'] != 'ping' and isinstance(
self.iperf_client, iperf_client.IPerfClientOverAdb):
testcase_params['iperf_args'] += ' -R'
testcase_params['use_client_output'] = True
else:
testcase_params['use_client_output'] = False
return testcase_params
def _test_sensitivity(self, testcase_params):
"""Function that gets called for each test case
The function gets called in each rvr test case. The function customizes
the rvr test based on the test name of the test that called it
"""
# Compile test parameters from config and test name
testcase_params = self.compile_test_params(testcase_params)
testcase_params.update(self.testclass_params)
testcase_params['atten_start'] = self.get_start_atten(testcase_params)
num_atten_steps = int(
(testcase_params['atten_stop'] - testcase_params['atten_start']) /
testcase_params['atten_step'])
testcase_params['atten_range'] = [
testcase_params['atten_start'] + x * testcase_params['atten_step']
for x in range(0, num_atten_steps)
]
# Prepare devices and run test
if testcase_params['traffic_type'].lower() == 'ping':
self.setup_ping_test(testcase_params)
result = self.run_ping_test(testcase_params)
self.process_ping_test_results(testcase_params, result)
else:
self.setup_rvr_test(testcase_params)
result = self.run_rvr_test(testcase_params)
self.process_rvr_test_results(testcase_params, result)
# Post-process results
self.testclass_results.append(result)
self.pass_fail_check(result)
def generate_test_cases(self, channels, modes, chain_mask):
"""Function that auto-generates test cases for a test class."""
test_cases = []
for channel in channels:
requested_modes = [
mode for mode in modes
if mode in self.VALID_TEST_CONFIGS[channel]
]
for mode in requested_modes:
bandwidth = int(''.join([x for x in mode if x.isdigit()]))
if 'VHT' in mode:
rates = self.VALID_RATES[mode]
elif 'HT' in mode:
rates = self.VALID_RATES[mode]
elif 'legacy' in mode and channel < 14:
rates = self.VALID_RATES['legacy_2GHz']
elif 'legacy' in mode and channel > 14:
rates = self.VALID_RATES['legacy_5GHz']
else:
raise ValueError('Invalid test mode.')
for chain, rate in itertools.product(chain_mask, rates):
testcase_params = collections.OrderedDict(
channel=channel,
mode=mode,
bandwidth=bandwidth,
rate=rate.mcs,
num_streams=rate.streams,
short_gi=1,
chain_mask=chain)
if chain in ['0', '1'] and rate[1] == 2:
# Do not test 2-stream rates in single chain mode
continue
if 'legacy' in mode:
testcase_name = ('test_sensitivity_ch{}_{}_{}_nss{}'
'_ch{}'.format(
channel, mode,
str(rate.mcs).replace('.', 'p'),
rate.streams, chain))
else:
testcase_name = ('test_sensitivity_ch{}_{}_mcs{}_nss{}'
'_ch{}'.format(
channel, mode, rate.mcs,
rate.streams, chain))
setattr(self, testcase_name,
partial(self._test_sensitivity, testcase_params))
test_cases.append(testcase_name)
return test_cases
class WifiSensitivity_AllChannels_Test(WifiSensitivityTest):
def __init__(self, controllers):
super().__init__(controllers)
self.tests = self.generate_test_cases(
[6, 36, 40, 44, 48, 149, 153, 157, 161],
['VHT20', 'VHT40', 'VHT80'], ['0', '1', '2x2'])
class WifiSensitivity_SampleChannels_Test(WifiSensitivityTest):
def __init__(self, controllers):
super().__init__(controllers)
self.tests = self.generate_test_cases([6, 36, 149],
['VHT20', 'VHT40', 'VHT80'],
['0', '1', '2x2'])
class WifiSensitivity_2GHz_Test(WifiSensitivityTest):
def __init__(self, controllers):
super().__init__(controllers)
self.tests = self.generate_test_cases([1, 2, 6, 10, 11], ['VHT20'],
['0', '1', '2x2'])
class WifiSensitivity_5GHz_Test(WifiSensitivityTest):
def __init__(self, controllers):
super().__init__(controllers)
self.tests = self.generate_test_cases(
[36, 40, 44, 48, 149, 153, 157, 161], ['VHT20', 'VHT40', 'VHT80'],
['0', '1', '2x2'])
class WifiSensitivity_UNII1_Test(WifiSensitivityTest):
def __init__(self, controllers):
super().__init__(controllers)
self.tests = self.generate_test_cases([36, 40, 44, 48],
['VHT20', 'VHT40', 'VHT80'],
['0', '1', '2x2'])
class WifiSensitivity_UNII3_Test(WifiSensitivityTest):
def __init__(self, controllers):
super().__init__(controllers)
self.tests = self.generate_test_cases([149, 153, 157, 161],
['VHT20', 'VHT40', 'VHT80'],
['0', '1', '2x2'])
# Over-the air version of senstivity tests
class WifiOtaSensitivityTest(WifiSensitivityTest):
"""Class to test over-the-air senstivity.
This class implements measures WiFi sensitivity tests in an OTA chamber.
It allows setting orientation and other chamber parameters to study
performance in varying channel conditions
"""
def __init__(self, controllers):
base_test.BaseTestClass.__init__(self, controllers)
self.testcase_metric_logger = (
BlackboxMappedMetricLogger.for_test_case())
self.testclass_metric_logger = (
BlackboxMappedMetricLogger.for_test_class())
self.publish_testcase_metrics = False
def setup_class(self):
WifiSensitivityTest.setup_class(self)
self.current_chain_mask = '2x2'
self.ota_chamber = ota_chamber.create(
self.user_params['OTAChamber'])[0]
def teardown_class(self):
WifiSensitivityTest.teardown_class(self)
self.ota_chamber.reset_chamber()
def setup_sensitivity_test(self, testcase_params):
# Setup turntable
self.ota_chamber.set_orientation(testcase_params['orientation'])
# Continue test setup
WifiSensitivityTest.setup_sensitivity_test(self, testcase_params)
def setup_dut(self, testcase_params):
"""Sets up the DUT in the configuration required by the test.
Args:
testcase_params: dict containing AP and other test params
"""
# Configure the right INI settings
wputils.set_chain_mask(self.dut, testcase_params['chain_mask'])
# Turn screen off to preserve battery
if self.testbed_params.get('screen_on',
False) or self.testclass_params.get(
'screen_on', False):
self.dut.droid.wakeLockAcquireDim()
else:
self.dut.go_to_sleep()
self.validate_and_connect(testcase_params)
self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0]
def process_testclass_results(self):
"""Saves and plots test results from all executed test cases."""
self.plot_per_curves()
testclass_results_dict = collections.OrderedDict()
id_fields = ['channel', 'mode', 'rate']
plots = []
for result in self.testclass_results:
test_id = self.extract_test_id(result['testcase_params'],
id_fields)
test_id = tuple(test_id.items())
chain_mask = result['testcase_params']['chain_mask']
num_streams = result['testcase_params']['num_streams']
line_id = (chain_mask, num_streams)
if test_id not in testclass_results_dict:
testclass_results_dict[test_id] = collections.OrderedDict()
if line_id not in testclass_results_dict[test_id]:
testclass_results_dict[test_id][line_id] = {
'orientation': [],
'sensitivity': []
}
orientation = result['testcase_params']['orientation']
if result['peak_throughput_pct'] >= 95:
sensitivity = result['sensitivity']
else:
sensitivity = float('nan')
if orientation not in testclass_results_dict[test_id][line_id][
'orientation']:
testclass_results_dict[test_id][line_id]['orientation'].append(
orientation)
testclass_results_dict[test_id][line_id]['sensitivity'].append(
sensitivity)
else:
testclass_results_dict[test_id][line_id]['sensitivity'][
-1] = sensitivity
for test_id, test_data in testclass_results_dict.items():
test_id_dict = dict(test_id)
if 'legacy' in test_id_dict['mode']:
test_id_str = 'Channel {} - {} {}Mbps'.format(
test_id_dict['channel'], test_id_dict['mode'],
test_id_dict['rate'])
else:
test_id_str = 'Channel {} - {} MCS{}'.format(
test_id_dict['channel'], test_id_dict['mode'],
test_id_dict['rate'])
curr_plot = BokehFigure(title=str(test_id_str),
x_label='Orientation (deg)',
primary_y_label='Sensitivity (dBm)')
for line_id, line_results in test_data.items():
curr_plot.add_line(line_results['orientation'],
line_results['sensitivity'],
legend='Nss{} - Chain Mask {}'.format(
line_id[1], line_id[0]),
marker='circle')
if 'legacy' in test_id_dict['mode']:
metric_tag = 'ota_summary_ch{}_{}_{}_ch{}'.format(
test_id_dict['channel'], test_id_dict['mode'],
test_id_dict['rate'], line_id[0])
else:
metric_tag = 'ota_summary_ch{}_{}_mcs{}_nss{}_ch{}'.format(
test_id_dict['channel'], test_id_dict['mode'],
test_id_dict['rate'], line_id[1], line_id[0])
metric_name = metric_tag + '.avg_sensitivity'
metric_value = numpy.nanmean(line_results['sensitivity'])
self.testclass_metric_logger.add_metric(
metric_name, metric_value)
self.log.info(('Average Sensitivity for {}: {:.1f}').format(
metric_tag, metric_value))
current_context = (
context.get_current_context().get_full_output_path())
output_file_path = os.path.join(current_context,
str(test_id_str) + '.html')
curr_plot.generate_figure(output_file_path)
plots.append(curr_plot)
output_file_path = os.path.join(current_context, 'results.html')
BokehFigure.save_figures(plots, output_file_path)
def get_start_atten(self, testcase_params):
"""Gets the starting attenuation for this sensitivity test.
The function gets the starting attenuation by checking whether a test
at the same rate configuration has executed. If so it sets the starting
point a configurable number of dBs below the reference test.
Returns:
start_atten: starting attenuation for current test
"""
# If the test is being retried, start from the beginning
if self.retry_flag:
self.log.info('Retry flag set. Setting attenuation to minimum.')
return self.testclass_params['atten_start']
# Get the current and reference test config. The reference test is the
# one performed at the current MCS+1
ref_test_params = self.extract_test_id(
testcase_params,
['channel', 'mode', 'rate', 'num_streams', 'chain_mask'])
# Check if reference test has been run and set attenuation accordingly
previous_params = [
self.extract_test_id(
result['testcase_params'],
['channel', 'mode', 'rate', 'num_streams', 'chain_mask'])
for result in self.testclass_results
]
try:
ref_index = previous_params[::-1].index(ref_test_params)
ref_index = len(previous_params) - 1 - ref_index
start_atten = self.testclass_results[ref_index][
'atten_at_range'] - (
self.testclass_params['adjacent_mcs_range_gap'])
except ValueError:
print('Reference test not found. Starting from {} dB'.format(
self.testclass_params['atten_start']))
start_atten = self.testclass_params['atten_start']
start_atten = max(start_atten, 0)
return start_atten
def generate_test_cases(self, channels, modes, requested_rates, chain_mask,
angles):
"""Function that auto-generates test cases for a test class."""
test_cases = []
for channel in channels:
requested_modes = [
mode for mode in modes
if mode in self.VALID_TEST_CONFIGS[channel]
]
for chain, mode in itertools.product(chain_mask, requested_modes):
bandwidth = int(''.join([x for x in mode if x.isdigit()]))
if 'VHT' in mode:
valid_rates = self.VALID_RATES[mode]
elif 'HT' in mode:
valid_rates = self.VALID_RATES[mode]
elif 'legacy' in mode and channel < 14:
valid_rates = self.VALID_RATES['legacy_2GHz']
elif 'legacy' in mode and channel > 14:
valid_rates = self.VALID_RATES['legacy_5GHz']
else:
raise ValueError('Invalid test mode.')
for rate, angle in itertools.product(valid_rates, angles):
testcase_params = collections.OrderedDict(
channel=channel,
mode=mode,
bandwidth=bandwidth,
rate=rate.mcs,
num_streams=rate.streams,
short_gi=1,
chain_mask=chain,
orientation=angle)
if rate not in requested_rates:
continue
if str(chain) in ['0', '1'] and rate[1] == 2:
# Do not test 2-stream rates in single chain mode
continue
if 'legacy' in mode:
testcase_name = ('test_sensitivity_ch{}_{}_{}_nss{}'
'_ch{}_{}deg'.format(
channel, mode,
str(rate.mcs).replace('.', 'p'),
rate.streams, chain, angle))
else:
testcase_name = ('test_sensitivity_ch{}_{}_mcs{}_nss{}'
'_ch{}_{}deg'.format(
channel, mode, rate.mcs,
rate.streams, chain, angle))
setattr(self, testcase_name,
partial(self._test_sensitivity, testcase_params))
test_cases.append(testcase_name)
return test_cases
class WifiOtaSensitivity_TenDegree_Test(WifiOtaSensitivityTest):
def __init__(self, controllers):
WifiOtaSensitivityTest.__init__(self, controllers)
requested_channels = [6, 36, 149]
requested_rates = [
self.RateTuple(8, 1, 86.7),
self.RateTuple(6, 1, 65),
self.RateTuple(2, 1, 21.7),
self.RateTuple(8, 2, 173.3),
self.RateTuple(6, 2, 130.3),
self.RateTuple(2, 2, 43.3)
]
self.tests = self.generate_test_cases(requested_channels,
['VHT20', 'VHT80'],
requested_rates, ['2x2'],
list(range(0, 360, 10)))
class WifiOtaSensitivity_PerChain_TenDegree_Test(WifiOtaSensitivityTest):
def __init__(self, controllers):
WifiOtaSensitivityTest.__init__(self, controllers)
requested_channels = [6, 36, 149]
requested_rates = [
self.RateTuple(9, 1, 96),
self.RateTuple(9, 2, 192),
self.RateTuple(6, 1, 65),
self.RateTuple(6, 2, 130.3),
self.RateTuple(2, 1, 21.7),
self.RateTuple(2, 2, 43.3)
]
self.tests = self.generate_test_cases(requested_channels,
['VHT20', 'VHT80'],
requested_rates, [0, 1, '2x2'],
list(range(0, 360, 10)))
class WifiOtaSensitivity_ThirtyDegree_Test(WifiOtaSensitivityTest):
def __init__(self, controllers):
WifiOtaSensitivityTest.__init__(self, controllers)
requested_channels = [6, 36, 149]
requested_rates = [
self.RateTuple(9, 1, 96),
self.RateTuple(8, 1, 86.7),
self.RateTuple(7, 1, 72.2),
self.RateTuple(4, 1, 43.3),
self.RateTuple(2, 1, 21.7),
self.RateTuple(0, 1, 7.2),
self.RateTuple(9, 2, 192),
self.RateTuple(8, 2, 173.3),
self.RateTuple(7, 2, 144.4),
self.RateTuple(4, 2, 86.7),
self.RateTuple(2, 2, 43.3),
self.RateTuple(0, 2, 14.4)
]
self.tests = self.generate_test_cases(requested_channels,
['VHT20', 'VHT80'],
requested_rates, ['2x2'],
list(range(0, 360, 30)))
class WifiOtaSensitivity_45Degree_Test(WifiOtaSensitivityTest):
def __init__(self, controllers):
WifiOtaSensitivityTest.__init__(self, controllers)
requested_rates = [
self.RateTuple(8, 1, 86.7),
self.RateTuple(2, 1, 21.7),
self.RateTuple(8, 2, 173.3),
self.RateTuple(2, 2, 43.3)
]
self.tests = self.generate_test_cases(
[1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161], ['VHT20', 'VHT80'],
requested_rates, ['2x2'], list(range(0, 360, 45)))