| #!/usr/bin/env python3.4 |
| # |
| # Copyright 2017 - The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the 'License'); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an 'AS IS' BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import collections |
| import csv |
| import itertools |
| import logging |
| import numpy |
| import os |
| from acts import asserts |
| from acts import context |
| from acts import base_test |
| from acts import utils |
| from acts.controllers import iperf_client |
| from acts.controllers.utils_lib import ssh |
| from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger |
| from acts_contrib.test_utils.wifi import ota_chamber |
| from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wputils |
| from acts_contrib.test_utils.wifi.wifi_performance_test_utils.bokeh_figure import BokehFigure |
| from acts_contrib.test_utils.wifi import wifi_test_utils as wutils |
| from acts_contrib.test_utils.wifi import wifi_retail_ap as retail_ap |
| from acts_contrib.test_utils.wifi import ota_sniffer |
| from functools import partial |
| from WifiRvrTest import WifiRvrTest |
| from WifiPingTest import WifiPingTest |
| |
| |
| class WifiSensitivityTest(WifiRvrTest, WifiPingTest): |
| """Class to test WiFi sensitivity tests. |
| |
| This class implements measures WiFi sensitivity per rate. It heavily |
| leverages the WifiRvrTest class and introduced minor differences to set |
| specific rates and the access point, and implements a different pass/fail |
| check. For an example config file to run this test class see |
| example_connectivity_performance_ap_sta.json. |
| """ |
| |
| MAX_CONSECUTIVE_ZEROS = 5 |
| RSSI_POLL_INTERVAL = 0.2 |
| VALID_TEST_CONFIGS = { |
| 1: ['legacy', 'VHT20'], |
| 2: ['legacy', 'VHT20'], |
| 6: ['legacy', 'VHT20'], |
| 10: ['legacy', 'VHT20'], |
| 11: ['legacy', 'VHT20'], |
| 36: ['legacy', 'VHT20', 'VHT40', 'VHT80'], |
| 40: ['legacy', 'VHT20'], |
| 44: ['legacy', 'VHT20'], |
| 48: ['legacy', 'VHT20'], |
| 149: ['legacy', 'VHT20', 'VHT40', 'VHT80'], |
| 153: ['legacy', 'VHT20'], |
| 157: ['legacy', 'VHT20'], |
| 161: ['legacy', 'VHT20'] |
| } |
| RateTuple = collections.namedtuple(('RateTuple'), |
| ['mcs', 'streams', 'data_rate']) |
| #yapf:disable |
| VALID_RATES = { |
| 'legacy_2GHz': [ |
| RateTuple(54, 1, 54), RateTuple(48, 1, 48), |
| RateTuple(36, 1, 36), RateTuple(24, 1, 24), |
| RateTuple(18, 1, 18), RateTuple(12, 1, 12), |
| RateTuple(11, 1, 11), RateTuple(9, 1, 9), |
| RateTuple(6, 1, 6), RateTuple(5.5, 1, 5.5), |
| RateTuple(2, 1, 2), RateTuple(1, 1, 1)], |
| 'legacy_5GHz': [ |
| RateTuple(54, 1, 54), RateTuple(48, 1, 48), |
| RateTuple(36, 1, 36), RateTuple(24, 1, 24), |
| RateTuple(18, 1, 18), RateTuple(12, 1, 12), |
| RateTuple(9, 1, 9), RateTuple(6, 1, 6)], |
| 'HT20': [ |
| RateTuple(7, 1, 72.2), RateTuple(6, 1, 65), |
| RateTuple(5, 1, 57.8), RateTuple(4, 1, 43.3), |
| RateTuple(3, 1, 26), RateTuple(2, 1, 21.7), |
| RateTuple(1, 1, 14.4), RateTuple(0, 1, 7.2), |
| RateTuple(15, 2, 144.4), RateTuple(14, 2, 130), |
| RateTuple(13, 2, 115.6), RateTuple(12, 2, 86.7), |
| RateTuple(11, 2, 57.8), RateTuple(10, 2, 43.4), |
| RateTuple(9, 2, 28.9), RateTuple(8, 2, 14.4)], |
| 'VHT20': [ |
| RateTuple(9, 1, 96), RateTuple(8, 1, 86.7), |
| RateTuple(7, 1, 72.2), RateTuple(6, 1, 65), |
| RateTuple(5, 1, 57.8), RateTuple(4, 1, 43.3), |
| RateTuple(3, 1, 28.9), RateTuple(2, 1, 21.7), |
| RateTuple(1, 1, 14.4), RateTuple(0, 1, 7.2), |
| RateTuple(9, 2, 192), RateTuple(8, 2, 173.3), |
| RateTuple(7, 2, 144.4), RateTuple(6, 2, 130.3), |
| RateTuple(5, 2, 115.6), RateTuple(4, 2, 86.7), |
| RateTuple(3, 2, 57.8), RateTuple(2, 2, 43.3), |
| RateTuple(1, 2, 28.9), RateTuple(0, 2, 14.4)], |
| 'VHT40': [ |
| RateTuple(9, 1, 96), RateTuple(8, 1, 86.7), |
| RateTuple(7, 1, 72.2), RateTuple(6, 1, 65), |
| RateTuple(5, 1, 57.8), RateTuple(4, 1, 43.3), |
| RateTuple(3, 1, 28.9), RateTuple(2, 1, 21.7), |
| RateTuple(1, 1, 14.4), RateTuple(0, 1, 7.2), |
| RateTuple(9, 2, 192), RateTuple(8, 2, 173.3), |
| RateTuple(7, 2, 144.4), RateTuple(6, 2, 130.3), |
| RateTuple(5, 2, 115.6), RateTuple(4, 2, 86.7), |
| RateTuple(3, 2, 57.8), RateTuple(2, 2, 43.3), |
| RateTuple(1, 2, 28.9), RateTuple(0, 2, 14.4)], |
| 'VHT80': [ |
| RateTuple(9, 1, 96), RateTuple(8, 1, 86.7), |
| RateTuple(7, 1, 72.2), RateTuple(6, 1, 65), |
| RateTuple(5, 1, 57.8), RateTuple(4, 1, 43.3), |
| RateTuple(3, 1, 28.9), RateTuple(2, 1, 21.7), |
| RateTuple(1, 1, 14.4), RateTuple(0, 1, 7.2), |
| RateTuple(9, 2, 192), RateTuple(8, 2, 173.3), |
| RateTuple(7, 2, 144.4), RateTuple(6, 2, 130.3), |
| RateTuple(5, 2, 115.6), RateTuple(4, 2, 86.7), |
| RateTuple(3, 2, 57.8), RateTuple(2, 2, 43.3), |
| RateTuple(1, 2, 28.9), RateTuple(0, 2, 14.4)], |
| } |
| #yapf:enable |
| |
| def __init__(self, controllers): |
| base_test.BaseTestClass.__init__(self, controllers) |
| self.testcase_metric_logger = ( |
| BlackboxMappedMetricLogger.for_test_case()) |
| self.testclass_metric_logger = ( |
| BlackboxMappedMetricLogger.for_test_class()) |
| self.publish_testcase_metrics = True |
| |
| def setup_class(self): |
| """Initializes common test hardware and parameters. |
| |
| This function initializes hardwares and compiles parameters that are |
| common to all tests in this class. |
| """ |
| self.dut = self.android_devices[-1] |
| self.sta_dut = self.android_devices[-1] |
| req_params = [ |
| 'RetailAccessPoints', 'sensitivity_test_params', 'testbed_params', |
| 'RemoteServer' |
| ] |
| opt_params = ['main_network', 'OTASniffer'] |
| self.unpack_userparams(req_params, opt_params) |
| self.testclass_params = self.sensitivity_test_params |
| self.num_atten = self.attenuators[0].instrument.num_atten |
| self.ping_server = ssh.connection.SshConnection( |
| ssh.settings.from_config(self.RemoteServer[0]['ssh_config'])) |
| if hasattr(self, |
| 'OTASniffer') and self.testbed_params['sniffer_enable']: |
| try: |
| self.sniffer = ota_sniffer.create(self.OTASniffer)[0] |
| except: |
| self.log.warning('Could not start sniffer. Disabling sniffs.') |
| self.testbed_params['sniffer_enable'] = 0 |
| self.remote_server = self.ping_server |
| self.iperf_server = self.iperf_servers[0] |
| self.iperf_client = self.iperf_clients[0] |
| self.access_point = retail_ap.create(self.RetailAccessPoints)[0] |
| self.log.info('Access Point Configuration: {}'.format( |
| self.access_point.ap_settings)) |
| self.log_path = os.path.join(logging.log_path, 'results') |
| os.makedirs(self.log_path, exist_ok=True) |
| self.atten_dut_chain_map = {} |
| self.testclass_results = [] |
| |
| # Turn WiFi ON |
| if self.testclass_params.get('airplane_mode', 1): |
| self.log.info('Turning on airplane mode.') |
| asserts.assert_true(utils.force_airplane_mode(self.dut, True), |
| 'Can not turn on airplane mode.') |
| wutils.wifi_toggle_state(self.dut, True) |
| |
| # Configure test retries |
| self.user_params['retry_tests'] = [self.__class__.__name__] |
| |
| def teardown_class(self): |
| self.access_point.teardown() |
| # Turn WiFi OFF |
| for dev in self.android_devices: |
| wutils.wifi_toggle_state(dev, False) |
| dev.go_to_sleep() |
| self.process_testclass_results() |
| |
| def setup_test(self): |
| self.retry_flag = False |
| |
| def teardown_test(self): |
| self.retry_flag = False |
| |
| def on_retry(self): |
| """Function to control test logic on retried tests. |
| |
| This function is automatically executed on tests that are being |
| retried. In this case the function resets wifi, toggles it off and on |
| and sets a retry_flag to enable further tweaking the test logic on |
| second attempts. |
| """ |
| self.retry_flag = True |
| for dev in self.android_devices: |
| wutils.reset_wifi(dev) |
| wutils.toggle_wifi_off_and_on(dev) |
| |
| def pass_fail_check(self, result): |
| """Checks sensitivity results and decides on pass/fail. |
| |
| Args: |
| result: dict containing attenuation, throughput and other meta |
| data |
| """ |
| result_string = ('Throughput = {}%, Sensitivity = {}.'.format( |
| result['peak_throughput_pct'], result['sensitivity'])) |
| if result['peak_throughput_pct'] < 95: |
| asserts.fail('Result unreliable. {}'.format(result_string)) |
| else: |
| asserts.explicit_pass('Test Passed. {}'.format(result_string)) |
| |
| def plot_per_curves(self): |
| """Plots PER curves to help debug sensitivity.""" |
| |
| plots = collections.OrderedDict() |
| id_fields = ['channel', 'mode', 'num_streams'] |
| for result in self.testclass_results: |
| testcase_params = result['testcase_params'] |
| plot_id = self.extract_test_id(testcase_params, id_fields) |
| plot_id = tuple(plot_id.items()) |
| if plot_id not in plots: |
| plots[plot_id] = BokehFigure( |
| title='Channel {} {} Nss{}'.format( |
| result['testcase_params']['channel'], |
| result['testcase_params']['mode'], |
| result['testcase_params']['num_streams']), |
| x_label='Attenuation (dB)', |
| primary_y_label='PER (%)') |
| per = [stat['summary']['rx_per'] for stat in result['llstats']] |
| if len(per) < len(result['total_attenuation']): |
| per.extend([100] * |
| (len(result['total_attenuation']) - len(per))) |
| plots[plot_id].add_line(result['total_attenuation'], per, |
| result['test_name']) |
| figure_list = [] |
| for plot_id, plot in plots.items(): |
| plot.generate_figure() |
| figure_list.append(plot) |
| output_file_path = os.path.join(self.log_path, 'PER_curves.html') |
| BokehFigure.save_figures(figure_list, output_file_path) |
| |
| def process_testclass_results(self): |
| """Saves and plots test results from all executed test cases.""" |
| # write json output |
| self.plot_per_curves() |
| testclass_results_dict = collections.OrderedDict() |
| id_fields = ['mode', 'rate', 'num_streams', 'chain_mask'] |
| channels_tested = [] |
| for result in self.testclass_results: |
| testcase_params = result['testcase_params'] |
| test_id = self.extract_test_id(testcase_params, id_fields) |
| test_id = tuple(test_id.items()) |
| if test_id not in testclass_results_dict: |
| testclass_results_dict[test_id] = collections.OrderedDict() |
| channel = testcase_params['channel'] |
| if channel not in channels_tested: |
| channels_tested.append(channel) |
| if result['peak_throughput_pct'] >= 95: |
| testclass_results_dict[test_id][channel] = result[ |
| 'sensitivity'] |
| else: |
| testclass_results_dict[test_id][channel] = '' |
| |
| # calculate average metrics |
| metrics_dict = collections.OrderedDict() |
| id_fields = ['channel', 'mode', 'num_streams', 'chain_mask'] |
| for test_id in testclass_results_dict.keys(): |
| for channel in testclass_results_dict[test_id].keys(): |
| metric_tag = collections.OrderedDict(test_id, channel=channel) |
| metric_tag = self.extract_test_id(metric_tag, id_fields) |
| metric_tag = tuple(metric_tag.items()) |
| metrics_dict.setdefault(metric_tag, []) |
| sensitivity_result = testclass_results_dict[test_id][channel] |
| if sensitivity_result != '': |
| metrics_dict[metric_tag].append(sensitivity_result) |
| for metric_tag_tuple, metric_data in metrics_dict.items(): |
| metric_tag_dict = collections.OrderedDict(metric_tag_tuple) |
| metric_tag = 'ch{}_{}_nss{}_chain{}'.format( |
| metric_tag_dict['channel'], metric_tag_dict['mode'], |
| metric_tag_dict['num_streams'], metric_tag_dict['chain_mask']) |
| metric_key = '{}.avg_sensitivity'.format(metric_tag) |
| metric_value = numpy.mean(metric_data) |
| self.testclass_metric_logger.add_metric(metric_key, metric_value) |
| |
| # write csv |
| csv_header = ['Mode', 'MCS', 'Streams', 'Chain', 'Rate (Mbps)'] |
| for channel in channels_tested: |
| csv_header.append('Ch. ' + str(channel)) |
| results_file_path = os.path.join(self.log_path, 'results.csv') |
| with open(results_file_path, mode='w') as csv_file: |
| writer = csv.DictWriter(csv_file, fieldnames=csv_header) |
| writer.writeheader() |
| for test_id, test_results in testclass_results_dict.items(): |
| test_id_dict = dict(test_id) |
| if 'legacy' in test_id_dict['mode']: |
| rate_list = self.VALID_RATES['legacy_2GHz'] |
| else: |
| rate_list = self.VALID_RATES[test_id_dict['mode']] |
| data_rate = next(rate.data_rate for rate in rate_list |
| if rate[:-1] == (test_id_dict['rate'], |
| test_id_dict['num_streams'])) |
| row_value = { |
| 'Mode': test_id_dict['mode'], |
| 'MCS': test_id_dict['rate'], |
| 'Streams': test_id_dict['num_streams'], |
| 'Chain': test_id_dict['chain_mask'], |
| 'Rate (Mbps)': data_rate, |
| } |
| for channel in channels_tested: |
| row_value['Ch. ' + str(channel)] = test_results.pop( |
| channel, ' ') |
| writer.writerow(row_value) |
| |
| if not self.testclass_params['traffic_type'].lower() == 'ping': |
| WifiRvrTest.process_testclass_results(self) |
| |
| def process_rvr_test_results(self, testcase_params, rvr_result): |
| """Post processes RvR results to compute sensitivity. |
| |
| Takes in the results of the RvR tests and computes the sensitivity of |
| the current rate by looking at the point at which throughput drops |
| below the percentage specified in the config file. The function then |
| calls on its parent class process_test_results to plot the result. |
| |
| Args: |
| rvr_result: dict containing attenuation, throughput and other meta |
| data |
| """ |
| rvr_result['peak_throughput'] = max(rvr_result['throughput_receive']) |
| rvr_result['peak_throughput_pct'] = 100 |
| throughput_check = [ |
| throughput < rvr_result['peak_throughput'] * |
| (self.testclass_params['throughput_pct_at_sensitivity'] / 100) |
| for throughput in rvr_result['throughput_receive'] |
| ] |
| consistency_check = [ |
| idx for idx in range(len(throughput_check)) |
| if all(throughput_check[idx:]) |
| ] |
| rvr_result['atten_at_range'] = rvr_result['attenuation'][ |
| consistency_check[0] - 1] |
| rvr_result['range'] = rvr_result['fixed_attenuation'] + ( |
| rvr_result['atten_at_range']) |
| rvr_result['sensitivity'] = self.testclass_params['ap_tx_power'] + ( |
| self.testbed_params['ap_tx_power_offset'][str( |
| testcase_params['channel'])] - rvr_result['range']) |
| WifiRvrTest.process_test_results(self, rvr_result) |
| |
| def process_ping_test_results(self, testcase_params, ping_result): |
| """Post processes RvR results to compute sensitivity. |
| |
| Takes in the results of the RvR tests and computes the sensitivity of |
| the current rate by looking at the point at which throughput drops |
| below the percentage specified in the config file. The function then |
| calls on its parent class process_test_results to plot the result. |
| |
| Args: |
| rvr_result: dict containing attenuation, throughput and other meta |
| data |
| """ |
| WifiPingTest.process_ping_results(self, testcase_params, ping_result) |
| ping_result['sensitivity'] = self.testclass_params['ap_tx_power'] + ( |
| self.testbed_params['ap_tx_power_offset'][str( |
| testcase_params['channel'])] - ping_result['range']) |
| |
| def setup_ping_test(self, testcase_params): |
| """Function that gets devices ready for the test. |
| |
| Args: |
| testcase_params: dict containing test-specific parameters |
| """ |
| # Configure AP |
| self.setup_ap(testcase_params) |
| # Set attenuator to starting attenuation |
| for attenuator in self.attenuators: |
| attenuator.set_atten(testcase_params['atten_start'], |
| strict=False, |
| retry=True) |
| # Reset, configure, and connect DUT |
| self.setup_dut(testcase_params) |
| |
| def setup_ap(self, testcase_params): |
| """Sets up the AP and attenuator to compensate for AP chain imbalance. |
| |
| Args: |
| testcase_params: dict containing AP and other test params |
| """ |
| band = self.access_point.band_lookup_by_channel( |
| testcase_params['channel']) |
| if '2G' in band: |
| frequency = wutils.WifiEnums.channel_2G_to_freq[ |
| testcase_params['channel']] |
| else: |
| frequency = wutils.WifiEnums.channel_5G_to_freq[ |
| testcase_params['channel']] |
| if frequency in wutils.WifiEnums.DFS_5G_FREQUENCIES: |
| self.access_point.set_region(self.testbed_params['DFS_region']) |
| else: |
| self.access_point.set_region(self.testbed_params['default_region']) |
| self.access_point.set_channel(band, testcase_params['channel']) |
| self.access_point.set_bandwidth(band, testcase_params['mode']) |
| self.access_point.set_power(band, testcase_params['ap_tx_power']) |
| self.access_point.set_rate(band, testcase_params['mode'], |
| testcase_params['num_streams'], |
| testcase_params['rate'], |
| testcase_params['short_gi']) |
| # Set attenuator offsets and set attenuators to initial condition |
| atten_offsets = self.testbed_params['chain_offset'][str( |
| testcase_params['channel'])] |
| for atten in self.attenuators: |
| if 'AP-Chain-0' in atten.path: |
| atten.offset = atten_offsets[0] |
| elif 'AP-Chain-1' in atten.path: |
| atten.offset = atten_offsets[1] |
| else: |
| atten.offset = 0 |
| self.log.info('Access Point Configuration: {}'.format( |
| self.access_point.ap_settings)) |
| |
| def setup_dut(self, testcase_params): |
| """Sets up the DUT in the configuration required by the test. |
| |
| Args: |
| testcase_params: dict containing AP and other test params |
| """ |
| # Turn screen off to preserve battery |
| if self.testbed_params.get('screen_on', |
| False) or self.testclass_params.get( |
| 'screen_on', False): |
| self.dut.droid.wakeLockAcquireDim() |
| else: |
| self.dut.go_to_sleep() |
| if wputils.validate_network(self.dut, |
| testcase_params['test_network']['SSID']): |
| self.log.info('Already connected to desired network') |
| else: |
| wutils.wifi_toggle_state(self.dut, False) |
| wutils.set_wifi_country_code(self.dut, |
| self.testclass_params['country_code']) |
| wutils.wifi_toggle_state(self.dut, True) |
| wutils.reset_wifi(self.dut) |
| wutils.set_wifi_country_code(self.dut, |
| self.testclass_params['country_code']) |
| testcase_params['test_network']['channel'] = testcase_params[ |
| 'channel'] |
| wutils.wifi_connect(self.dut, |
| testcase_params['test_network'], |
| num_of_tries=5, |
| check_connectivity=False) |
| self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0] |
| # Activate/attenuate the correct chains |
| if testcase_params['channel'] not in self.atten_dut_chain_map.keys(): |
| self.atten_dut_chain_map[testcase_params[ |
| 'channel']] = wputils.get_current_atten_dut_chain_map( |
| self.attenuators, self.dut, self.ping_server) |
| self.log.info('Current Attenuator-DUT Chain Map: {}'.format( |
| self.atten_dut_chain_map[testcase_params['channel']])) |
| for idx, atten in enumerate(self.attenuators): |
| if self.atten_dut_chain_map[testcase_params['channel']][ |
| idx] == testcase_params['attenuated_chain']: |
| atten.offset = atten.instrument.max_atten |
| |
| def extract_test_id(self, testcase_params, id_fields): |
| test_id = collections.OrderedDict( |
| (param, testcase_params[param]) for param in id_fields) |
| return test_id |
| |
| def get_start_atten(self, testcase_params): |
| """Gets the starting attenuation for this sensitivity test. |
| |
| The function gets the starting attenuation by checking whether a test |
| as the next higher MCS has been executed. If so it sets the starting |
| point a configurable number of dBs below the next MCS's sensitivity. |
| |
| Returns: |
| start_atten: starting attenuation for current test |
| """ |
| # If the test is being retried, start from the beginning |
| if self.retry_flag: |
| self.log.info('Retry flag set. Setting attenuation to minimum.') |
| return self.testclass_params['atten_start'] |
| # Get the current and reference test config. The reference test is the |
| # one performed at the current MCS+1 |
| current_rate = testcase_params['rate'] |
| ref_test_params = self.extract_test_id( |
| testcase_params, |
| ['channel', 'mode', 'rate', 'num_streams', 'chain_mask']) |
| if 'legacy' in testcase_params['mode']: |
| if testcase_params['channel'] <= 13: |
| rate_list = self.VALID_RATES['legacy_2GHz'] |
| else: |
| rate_list = self.VALID_RATES['legacy_5GHz'] |
| ref_index = max( |
| 0, |
| rate_list.index(self.RateTuple(current_rate, 1, current_rate)) |
| - 1) |
| ref_test_params['rate'] = rate_list[ref_index].mcs |
| else: |
| ref_test_params['rate'] = current_rate + 1 |
| |
| # Check if reference test has been run and set attenuation accordingly |
| previous_params = [ |
| self.extract_test_id( |
| result['testcase_params'], |
| ['channel', 'mode', 'rate', 'num_streams', 'chain_mask']) |
| for result in self.testclass_results |
| ] |
| |
| try: |
| ref_index = previous_params.index(ref_test_params) |
| start_atten = self.testclass_results[ref_index][ |
| 'atten_at_range'] - ( |
| self.testclass_params['adjacent_mcs_range_gap']) |
| except ValueError: |
| self.log.warning( |
| 'Reference test not found. Starting from {} dB'.format( |
| self.testclass_params['atten_start'])) |
| start_atten = self.testclass_params['atten_start'] |
| start_atten = max(start_atten, 0) |
| return start_atten |
| |
| def compile_test_params(self, testcase_params): |
| """Function that generates test params based on the test name.""" |
| # Check if test should be skipped. |
| wputils.check_skip_conditions(testcase_params, self.dut, |
| self.access_point, |
| getattr(self, 'ota_chamber', None)) |
| |
| band = self.access_point.band_lookup_by_channel( |
| testcase_params['channel']) |
| testcase_params['band'] = band |
| testcase_params['test_network'] = self.main_network[band] |
| if testcase_params['chain_mask'] in ['0', '1']: |
| testcase_params['attenuated_chain'] = 'DUT-Chain-{}'.format( |
| 1 if testcase_params['chain_mask'] == '0' else 0) |
| else: |
| # Set attenuated chain to -1. Do not set to None as this will be |
| # compared to RF chain map which may include None |
| testcase_params['attenuated_chain'] = -1 |
| |
| self.testclass_params[ |
| 'range_ping_loss_threshold'] = 100 - self.testclass_params[ |
| 'throughput_pct_at_sensitivity'] |
| if self.testclass_params['traffic_type'] == 'UDP': |
| testcase_params['iperf_args'] = '-i 1 -t {} -J -u -b {}'.format( |
| self.testclass_params['iperf_duration'], |
| self.testclass_params['UDP_rates'][testcase_params['mode']]) |
| elif self.testclass_params['traffic_type'] == 'TCP': |
| testcase_params['iperf_args'] = '-i 1 -t {} -J'.format( |
| self.testclass_params['iperf_duration']) |
| |
| if self.testclass_params['traffic_type'] != 'ping' and isinstance( |
| self.iperf_client, iperf_client.IPerfClientOverAdb): |
| testcase_params['iperf_args'] += ' -R' |
| testcase_params['use_client_output'] = True |
| else: |
| testcase_params['use_client_output'] = False |
| |
| return testcase_params |
| |
| def _test_sensitivity(self, testcase_params): |
| """Function that gets called for each test case |
| |
| The function gets called in each rvr test case. The function customizes |
| the rvr test based on the test name of the test that called it |
| """ |
| # Compile test parameters from config and test name |
| testcase_params = self.compile_test_params(testcase_params) |
| testcase_params.update(self.testclass_params) |
| testcase_params['atten_start'] = self.get_start_atten(testcase_params) |
| num_atten_steps = int( |
| (testcase_params['atten_stop'] - testcase_params['atten_start']) / |
| testcase_params['atten_step']) |
| testcase_params['atten_range'] = [ |
| testcase_params['atten_start'] + x * testcase_params['atten_step'] |
| for x in range(0, num_atten_steps) |
| ] |
| |
| # Prepare devices and run test |
| if testcase_params['traffic_type'].lower() == 'ping': |
| self.setup_ping_test(testcase_params) |
| result = self.run_ping_test(testcase_params) |
| self.process_ping_test_results(testcase_params, result) |
| else: |
| self.setup_rvr_test(testcase_params) |
| result = self.run_rvr_test(testcase_params) |
| self.process_rvr_test_results(testcase_params, result) |
| |
| # Post-process results |
| self.testclass_results.append(result) |
| self.pass_fail_check(result) |
| |
| def generate_test_cases(self, channels, modes, chain_mask): |
| """Function that auto-generates test cases for a test class.""" |
| test_cases = [] |
| for channel in channels: |
| requested_modes = [ |
| mode for mode in modes |
| if mode in self.VALID_TEST_CONFIGS[channel] |
| ] |
| for mode in requested_modes: |
| bandwidth = int(''.join([x for x in mode if x.isdigit()])) |
| if 'VHT' in mode: |
| rates = self.VALID_RATES[mode] |
| elif 'HT' in mode: |
| rates = self.VALID_RATES[mode] |
| elif 'legacy' in mode and channel < 14: |
| rates = self.VALID_RATES['legacy_2GHz'] |
| elif 'legacy' in mode and channel > 14: |
| rates = self.VALID_RATES['legacy_5GHz'] |
| else: |
| raise ValueError('Invalid test mode.') |
| for chain, rate in itertools.product(chain_mask, rates): |
| testcase_params = collections.OrderedDict( |
| channel=channel, |
| mode=mode, |
| bandwidth=bandwidth, |
| rate=rate.mcs, |
| num_streams=rate.streams, |
| short_gi=1, |
| chain_mask=chain) |
| if chain in ['0', '1'] and rate[1] == 2: |
| # Do not test 2-stream rates in single chain mode |
| continue |
| if 'legacy' in mode: |
| testcase_name = ('test_sensitivity_ch{}_{}_{}_nss{}' |
| '_ch{}'.format( |
| channel, mode, |
| str(rate.mcs).replace('.', 'p'), |
| rate.streams, chain)) |
| else: |
| testcase_name = ('test_sensitivity_ch{}_{}_mcs{}_nss{}' |
| '_ch{}'.format( |
| channel, mode, rate.mcs, |
| rate.streams, chain)) |
| setattr(self, testcase_name, |
| partial(self._test_sensitivity, testcase_params)) |
| test_cases.append(testcase_name) |
| return test_cases |
| |
| |
| class WifiSensitivity_AllChannels_Test(WifiSensitivityTest): |
| |
| def __init__(self, controllers): |
| super().__init__(controllers) |
| self.tests = self.generate_test_cases( |
| [6, 36, 40, 44, 48, 149, 153, 157, 161], |
| ['VHT20', 'VHT40', 'VHT80'], ['0', '1', '2x2']) |
| |
| |
| class WifiSensitivity_SampleChannels_Test(WifiSensitivityTest): |
| |
| def __init__(self, controllers): |
| super().__init__(controllers) |
| self.tests = self.generate_test_cases([6, 36, 149], |
| ['VHT20', 'VHT40', 'VHT80'], |
| ['0', '1', '2x2']) |
| |
| |
| class WifiSensitivity_2GHz_Test(WifiSensitivityTest): |
| |
| def __init__(self, controllers): |
| super().__init__(controllers) |
| self.tests = self.generate_test_cases([1, 2, 6, 10, 11], ['VHT20'], |
| ['0', '1', '2x2']) |
| |
| |
| class WifiSensitivity_5GHz_Test(WifiSensitivityTest): |
| |
| def __init__(self, controllers): |
| super().__init__(controllers) |
| self.tests = self.generate_test_cases( |
| [36, 40, 44, 48, 149, 153, 157, 161], ['VHT20', 'VHT40', 'VHT80'], |
| ['0', '1', '2x2']) |
| |
| |
| class WifiSensitivity_UNII1_Test(WifiSensitivityTest): |
| |
| def __init__(self, controllers): |
| super().__init__(controllers) |
| self.tests = self.generate_test_cases([36, 40, 44, 48], |
| ['VHT20', 'VHT40', 'VHT80'], |
| ['0', '1', '2x2']) |
| |
| |
| class WifiSensitivity_UNII3_Test(WifiSensitivityTest): |
| |
| def __init__(self, controllers): |
| super().__init__(controllers) |
| self.tests = self.generate_test_cases([149, 153, 157, 161], |
| ['VHT20', 'VHT40', 'VHT80'], |
| ['0', '1', '2x2']) |
| |
| |
| # Over-the air version of senstivity tests |
| class WifiOtaSensitivityTest(WifiSensitivityTest): |
| """Class to test over-the-air senstivity. |
| |
| This class implements measures WiFi sensitivity tests in an OTA chamber. |
| It allows setting orientation and other chamber parameters to study |
| performance in varying channel conditions |
| """ |
| |
| def __init__(self, controllers): |
| base_test.BaseTestClass.__init__(self, controllers) |
| self.testcase_metric_logger = ( |
| BlackboxMappedMetricLogger.for_test_case()) |
| self.testclass_metric_logger = ( |
| BlackboxMappedMetricLogger.for_test_class()) |
| self.publish_testcase_metrics = False |
| |
| def setup_class(self): |
| WifiSensitivityTest.setup_class(self) |
| self.current_chain_mask = '2x2' |
| self.ota_chamber = ota_chamber.create( |
| self.user_params['OTAChamber'])[0] |
| |
| def teardown_class(self): |
| WifiSensitivityTest.teardown_class(self) |
| self.ota_chamber.reset_chamber() |
| |
| def setup_sensitivity_test(self, testcase_params): |
| # Setup turntable |
| self.ota_chamber.set_orientation(testcase_params['orientation']) |
| # Continue test setup |
| WifiSensitivityTest.setup_sensitivity_test(self, testcase_params) |
| |
| def setup_dut(self, testcase_params): |
| """Sets up the DUT in the configuration required by the test. |
| |
| Args: |
| testcase_params: dict containing AP and other test params |
| """ |
| # Configure the right INI settings |
| wputils.set_chain_mask(self.dut, testcase_params['chain_mask']) |
| # Turn screen off to preserve battery |
| if self.testbed_params.get('screen_on', |
| False) or self.testclass_params.get( |
| 'screen_on', False): |
| self.dut.droid.wakeLockAcquireDim() |
| else: |
| self.dut.go_to_sleep() |
| self.validate_and_connect(testcase_params) |
| self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0] |
| |
| def process_testclass_results(self): |
| """Saves and plots test results from all executed test cases.""" |
| self.plot_per_curves() |
| testclass_results_dict = collections.OrderedDict() |
| id_fields = ['channel', 'mode', 'rate'] |
| plots = [] |
| for result in self.testclass_results: |
| test_id = self.extract_test_id(result['testcase_params'], |
| id_fields) |
| test_id = tuple(test_id.items()) |
| chain_mask = result['testcase_params']['chain_mask'] |
| num_streams = result['testcase_params']['num_streams'] |
| line_id = (chain_mask, num_streams) |
| if test_id not in testclass_results_dict: |
| testclass_results_dict[test_id] = collections.OrderedDict() |
| if line_id not in testclass_results_dict[test_id]: |
| testclass_results_dict[test_id][line_id] = { |
| 'orientation': [], |
| 'sensitivity': [] |
| } |
| orientation = result['testcase_params']['orientation'] |
| if result['peak_throughput_pct'] >= 95: |
| sensitivity = result['sensitivity'] |
| else: |
| sensitivity = float('nan') |
| if orientation not in testclass_results_dict[test_id][line_id][ |
| 'orientation']: |
| testclass_results_dict[test_id][line_id]['orientation'].append( |
| orientation) |
| testclass_results_dict[test_id][line_id]['sensitivity'].append( |
| sensitivity) |
| else: |
| testclass_results_dict[test_id][line_id]['sensitivity'][ |
| -1] = sensitivity |
| |
| for test_id, test_data in testclass_results_dict.items(): |
| test_id_dict = dict(test_id) |
| if 'legacy' in test_id_dict['mode']: |
| test_id_str = 'Channel {} - {} {}Mbps'.format( |
| test_id_dict['channel'], test_id_dict['mode'], |
| test_id_dict['rate']) |
| else: |
| test_id_str = 'Channel {} - {} MCS{}'.format( |
| test_id_dict['channel'], test_id_dict['mode'], |
| test_id_dict['rate']) |
| curr_plot = BokehFigure(title=str(test_id_str), |
| x_label='Orientation (deg)', |
| primary_y_label='Sensitivity (dBm)') |
| for line_id, line_results in test_data.items(): |
| curr_plot.add_line(line_results['orientation'], |
| line_results['sensitivity'], |
| legend='Nss{} - Chain Mask {}'.format( |
| line_id[1], line_id[0]), |
| marker='circle') |
| if 'legacy' in test_id_dict['mode']: |
| metric_tag = 'ota_summary_ch{}_{}_{}_ch{}'.format( |
| test_id_dict['channel'], test_id_dict['mode'], |
| test_id_dict['rate'], line_id[0]) |
| else: |
| metric_tag = 'ota_summary_ch{}_{}_mcs{}_nss{}_ch{}'.format( |
| test_id_dict['channel'], test_id_dict['mode'], |
| test_id_dict['rate'], line_id[1], line_id[0]) |
| |
| metric_name = metric_tag + '.avg_sensitivity' |
| metric_value = numpy.nanmean(line_results['sensitivity']) |
| self.testclass_metric_logger.add_metric( |
| metric_name, metric_value) |
| self.log.info(('Average Sensitivity for {}: {:.1f}').format( |
| metric_tag, metric_value)) |
| current_context = ( |
| context.get_current_context().get_full_output_path()) |
| output_file_path = os.path.join(current_context, |
| str(test_id_str) + '.html') |
| curr_plot.generate_figure(output_file_path) |
| plots.append(curr_plot) |
| output_file_path = os.path.join(current_context, 'results.html') |
| BokehFigure.save_figures(plots, output_file_path) |
| |
| def get_start_atten(self, testcase_params): |
| """Gets the starting attenuation for this sensitivity test. |
| |
| The function gets the starting attenuation by checking whether a test |
| at the same rate configuration has executed. If so it sets the starting |
| point a configurable number of dBs below the reference test. |
| |
| Returns: |
| start_atten: starting attenuation for current test |
| """ |
| # If the test is being retried, start from the beginning |
| if self.retry_flag: |
| self.log.info('Retry flag set. Setting attenuation to minimum.') |
| return self.testclass_params['atten_start'] |
| # Get the current and reference test config. The reference test is the |
| # one performed at the current MCS+1 |
| ref_test_params = self.extract_test_id( |
| testcase_params, |
| ['channel', 'mode', 'rate', 'num_streams', 'chain_mask']) |
| # Check if reference test has been run and set attenuation accordingly |
| previous_params = [ |
| self.extract_test_id( |
| result['testcase_params'], |
| ['channel', 'mode', 'rate', 'num_streams', 'chain_mask']) |
| for result in self.testclass_results |
| ] |
| try: |
| ref_index = previous_params[::-1].index(ref_test_params) |
| ref_index = len(previous_params) - 1 - ref_index |
| start_atten = self.testclass_results[ref_index][ |
| 'atten_at_range'] - ( |
| self.testclass_params['adjacent_mcs_range_gap']) |
| except ValueError: |
| print('Reference test not found. Starting from {} dB'.format( |
| self.testclass_params['atten_start'])) |
| start_atten = self.testclass_params['atten_start'] |
| start_atten = max(start_atten, 0) |
| return start_atten |
| |
| def generate_test_cases(self, channels, modes, requested_rates, chain_mask, |
| angles): |
| """Function that auto-generates test cases for a test class.""" |
| test_cases = [] |
| for channel in channels: |
| requested_modes = [ |
| mode for mode in modes |
| if mode in self.VALID_TEST_CONFIGS[channel] |
| ] |
| for chain, mode in itertools.product(chain_mask, requested_modes): |
| bandwidth = int(''.join([x for x in mode if x.isdigit()])) |
| if 'VHT' in mode: |
| valid_rates = self.VALID_RATES[mode] |
| elif 'HT' in mode: |
| valid_rates = self.VALID_RATES[mode] |
| elif 'legacy' in mode and channel < 14: |
| valid_rates = self.VALID_RATES['legacy_2GHz'] |
| elif 'legacy' in mode and channel > 14: |
| valid_rates = self.VALID_RATES['legacy_5GHz'] |
| else: |
| raise ValueError('Invalid test mode.') |
| for rate, angle in itertools.product(valid_rates, angles): |
| testcase_params = collections.OrderedDict( |
| channel=channel, |
| mode=mode, |
| bandwidth=bandwidth, |
| rate=rate.mcs, |
| num_streams=rate.streams, |
| short_gi=1, |
| chain_mask=chain, |
| orientation=angle) |
| if rate not in requested_rates: |
| continue |
| if str(chain) in ['0', '1'] and rate[1] == 2: |
| # Do not test 2-stream rates in single chain mode |
| continue |
| if 'legacy' in mode: |
| testcase_name = ('test_sensitivity_ch{}_{}_{}_nss{}' |
| '_ch{}_{}deg'.format( |
| channel, mode, |
| str(rate.mcs).replace('.', 'p'), |
| rate.streams, chain, angle)) |
| else: |
| testcase_name = ('test_sensitivity_ch{}_{}_mcs{}_nss{}' |
| '_ch{}_{}deg'.format( |
| channel, mode, rate.mcs, |
| rate.streams, chain, angle)) |
| setattr(self, testcase_name, |
| partial(self._test_sensitivity, testcase_params)) |
| test_cases.append(testcase_name) |
| return test_cases |
| |
| |
| class WifiOtaSensitivity_TenDegree_Test(WifiOtaSensitivityTest): |
| |
| def __init__(self, controllers): |
| WifiOtaSensitivityTest.__init__(self, controllers) |
| requested_channels = [6, 36, 149] |
| requested_rates = [ |
| self.RateTuple(8, 1, 86.7), |
| self.RateTuple(6, 1, 65), |
| self.RateTuple(2, 1, 21.7), |
| self.RateTuple(8, 2, 173.3), |
| self.RateTuple(6, 2, 130.3), |
| self.RateTuple(2, 2, 43.3) |
| ] |
| self.tests = self.generate_test_cases(requested_channels, |
| ['VHT20', 'VHT80'], |
| requested_rates, ['2x2'], |
| list(range(0, 360, 10))) |
| |
| |
| class WifiOtaSensitivity_PerChain_TenDegree_Test(WifiOtaSensitivityTest): |
| |
| def __init__(self, controllers): |
| WifiOtaSensitivityTest.__init__(self, controllers) |
| requested_channels = [6, 36, 149] |
| requested_rates = [ |
| self.RateTuple(9, 1, 96), |
| self.RateTuple(9, 2, 192), |
| self.RateTuple(6, 1, 65), |
| self.RateTuple(6, 2, 130.3), |
| self.RateTuple(2, 1, 21.7), |
| self.RateTuple(2, 2, 43.3) |
| ] |
| self.tests = self.generate_test_cases(requested_channels, |
| ['VHT20', 'VHT80'], |
| requested_rates, [0, 1, '2x2'], |
| list(range(0, 360, 10))) |
| |
| |
| class WifiOtaSensitivity_ThirtyDegree_Test(WifiOtaSensitivityTest): |
| |
| def __init__(self, controllers): |
| WifiOtaSensitivityTest.__init__(self, controllers) |
| requested_channels = [6, 36, 149] |
| requested_rates = [ |
| self.RateTuple(9, 1, 96), |
| self.RateTuple(8, 1, 86.7), |
| self.RateTuple(7, 1, 72.2), |
| self.RateTuple(4, 1, 43.3), |
| self.RateTuple(2, 1, 21.7), |
| self.RateTuple(0, 1, 7.2), |
| self.RateTuple(9, 2, 192), |
| self.RateTuple(8, 2, 173.3), |
| self.RateTuple(7, 2, 144.4), |
| self.RateTuple(4, 2, 86.7), |
| self.RateTuple(2, 2, 43.3), |
| self.RateTuple(0, 2, 14.4) |
| ] |
| self.tests = self.generate_test_cases(requested_channels, |
| ['VHT20', 'VHT80'], |
| requested_rates, ['2x2'], |
| list(range(0, 360, 30))) |
| |
| |
| class WifiOtaSensitivity_45Degree_Test(WifiOtaSensitivityTest): |
| |
| def __init__(self, controllers): |
| WifiOtaSensitivityTest.__init__(self, controllers) |
| requested_rates = [ |
| self.RateTuple(8, 1, 86.7), |
| self.RateTuple(2, 1, 21.7), |
| self.RateTuple(8, 2, 173.3), |
| self.RateTuple(2, 2, 43.3) |
| ] |
| self.tests = self.generate_test_cases( |
| [1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161], ['VHT20', 'VHT80'], |
| requested_rates, ['2x2'], list(range(0, 360, 45))) |