| #!/usr/bin/env python3.4 |
| # |
| # Copyright 2017 - The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the 'License'); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an 'AS IS' BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import collections |
| import itertools |
| import json |
| import logging |
| import os |
| import statistics |
| from acts import asserts |
| from acts import context |
| from acts import base_test |
| from acts import utils |
| from acts.controllers.utils_lib import ssh |
| from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger |
| from acts.test_utils.wifi import ota_chamber |
| from acts.test_utils.wifi import wifi_performance_test_utils as wputils |
| from acts.test_utils.wifi import wifi_retail_ap as retail_ap |
| from acts.test_utils.wifi import wifi_test_utils as wutils |
| from functools import partial |
| |
| |
| class WifiPingTest(base_test.BaseTestClass): |
| """Class for ping-based Wifi performance tests. |
| |
| This class implements WiFi ping performance tests such as range and RTT. |
| The class setups up the AP in the desired configurations, configures |
| and connects the phone to the AP, and runs For an example config file to |
| run this test class see example_connectivity_performance_ap_sta.json. |
| """ |
| |
| TEST_TIMEOUT = 10 |
| RSSI_POLL_INTERVAL = 0.2 |
| SHORT_SLEEP = 1 |
| MED_SLEEP = 5 |
| MAX_CONSECUTIVE_ZEROS = 5 |
| DISCONNECTED_PING_RESULT = { |
| 'connected': 0, |
| 'rtt': [], |
| 'time_stamp': [], |
| 'ping_interarrivals': [], |
| 'packet_loss_percentage': 100 |
| } |
| |
| def __init__(self, controllers): |
| base_test.BaseTestClass.__init__(self, controllers) |
| self.testcase_metric_logger = ( |
| BlackboxMappedMetricLogger.for_test_case()) |
| self.testclass_metric_logger = ( |
| BlackboxMappedMetricLogger.for_test_class()) |
| self.publish_testcase_metrics = True |
| |
| self.tests = self.generate_test_cases( |
| ap_power='standard', |
| channels=[1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161], |
| modes=['VHT20', 'VHT40', 'VHT80'], |
| test_types=[ |
| 'test_ping_range', 'test_fast_ping_rtt', 'test_slow_ping_rtt' |
| ]) |
| |
| def setup_class(self): |
| self.dut = self.android_devices[-1] |
| req_params = [ |
| 'ping_test_params', 'testbed_params', 'main_network', |
| 'RetailAccessPoints', 'RemoteServer' |
| ] |
| opt_params = ['golden_files_list'] |
| self.unpack_userparams(req_params, opt_params) |
| self.testclass_params = self.ping_test_params |
| self.num_atten = self.attenuators[0].instrument.num_atten |
| self.ping_server = ssh.connection.SshConnection( |
| ssh.settings.from_config(self.RemoteServer[0]['ssh_config'])) |
| self.access_point = retail_ap.create(self.RetailAccessPoints)[0] |
| self.log.info('Access Point Configuration: {}'.format( |
| self.access_point.ap_settings)) |
| self.log_path = os.path.join(logging.log_path, 'results') |
| utils.create_dir(self.log_path) |
| if not hasattr(self, 'golden_files_list'): |
| self.golden_files_list = [ |
| os.path.join(self.testbed_params['golden_results_path'], file) |
| for file in os.listdir( |
| self.testbed_params['golden_results_path']) |
| ] |
| if hasattr(self, 'bdf'): |
| self.log.info('Pushing WiFi BDF to DUT.') |
| wputils.push_bdf(self.dut, self.bdf) |
| if hasattr(self, 'firmware'): |
| self.log.info('Pushing WiFi firmware to DUT.') |
| wlanmdsp = [ |
| file for file in self.firmware if "wlanmdsp.mbn" in file |
| ][0] |
| data_msc = [file for file in self.firmware |
| if "Data.msc" in file][0] |
| wputils.push_firmware(self.dut, wlanmdsp, data_msc) |
| self.testclass_results = [] |
| |
| # Turn WiFi ON |
| if self.testclass_params.get('airplane_mode', 1): |
| self.log.info('Turning on airplane mode.') |
| asserts.assert_true( |
| utils.force_airplane_mode(self.dut, True), |
| "Can not turn on airplane mode.") |
| wutils.wifi_toggle_state(self.dut, True) |
| |
| def teardown_class(self): |
| # Turn WiFi OFF |
| for dev in self.android_devices: |
| wutils.wifi_toggle_state(dev, False) |
| self.process_testclass_results() |
| |
| def process_testclass_results(self): |
| """Saves all test results to enable comparison.""" |
| testclass_summary = {} |
| for test in self.testclass_results: |
| if 'range' in test['test_name']: |
| testclass_summary[test['test_name']] = test['range'] |
| # Save results |
| results_file_path = os.path.join(self.log_path, |
| 'testclass_summary.json') |
| with open(results_file_path, 'w') as results_file: |
| json.dump(testclass_summary, results_file, indent=4) |
| |
| def pass_fail_check_ping_rtt(self, result): |
| """Check the test result and decide if it passed or failed. |
| |
| The function computes RTT statistics and fails any tests in which the |
| tail of the ping latency results exceeds the threshold defined in the |
| configuration file. |
| |
| Args: |
| result: dict containing ping results and other meta data |
| """ |
| ignored_fraction = (self.testclass_params['rtt_ignored_interval'] / |
| self.testclass_params['rtt_ping_duration']) |
| sorted_rtt = [ |
| sorted(x['rtt'][round(ignored_fraction * len(x['rtt'])):]) |
| for x in result['ping_results'] |
| ] |
| try: |
| mean_rtt = [statistics.mean(x) for x in sorted_rtt] |
| std_rtt = [statistics.stdev(x) for x in sorted_rtt] |
| except statistics.StatisticsError: |
| self.log.debug("Ping Result: {}".format(result['ping_results'])) |
| self.log.debug("Sorted RTT: {}".format(sorted_rtt)) |
| rtt_at_test_percentile = [ |
| x[int((1 - self.testclass_params['rtt_test_percentile'] / 100) * |
| len(x))] for x in sorted_rtt |
| ] |
| # Set blackbox metric |
| if self.publish_testcase_metrics: |
| self.testcase_metric_logger.add_metric('ping_rtt', |
| max(rtt_at_test_percentile)) |
| # Evaluate test pass/fail |
| test_failed = False |
| for idx, rtt in enumerate(rtt_at_test_percentile): |
| if rtt > self.testclass_params['rtt_threshold'] * 1000: |
| test_failed = True |
| self.log.info( |
| 'RTT Failed. Test %ile RTT = {}ms. Mean = {}ms. Stdev = {}' |
| .format(rtt, mean_rtt[idx], std_rtt[idx])) |
| if test_failed: |
| asserts.fail('RTT above threshold') |
| else: |
| asserts.explicit_pass( |
| 'Test Passed. RTTs at test percentile = {}'.format( |
| rtt_at_test_percentile)) |
| |
| def pass_fail_check_ping_range(self, result): |
| """Check the test result and decide if it passed or failed. |
| |
| Checks whether the attenuation at which ping packet losses begin to |
| exceed the threshold matches the range derived from golden |
| rate-vs-range result files. The test fails is ping range is |
| range_gap_threshold worse than RvR range. |
| |
| Args: |
| result: dict containing ping results and meta data |
| """ |
| # Get target range |
| rvr_range = self.get_range_from_rvr() |
| # Set Blackbox metric |
| if self.publish_testcase_metrics: |
| self.testcase_metric_logger.add_metric('ping_range', |
| result['range']) |
| # Evaluate test pass/fail |
| test_message = ('Attenuation at range is {}dB. Golden range is {}dB. ' |
| 'LLStats at Range: {}'.format( |
| result['range'], rvr_range, |
| result['llstats_at_range'])) |
| if result['range'] - rvr_range < -self.testclass_params[ |
| 'range_gap_threshold']: |
| asserts.fail(test_message) |
| else: |
| asserts.explicit_pass(test_message) |
| |
| def pass_fail_check(self, result): |
| if 'range' in result['testcase_params']['test_type']: |
| self.pass_fail_check_ping_range(result) |
| else: |
| self.pass_fail_check_ping_rtt(result) |
| |
| def process_ping_results(self, testcase_params, ping_range_result): |
| """Saves and plots ping results. |
| |
| Args: |
| ping_range_result: dict containing ping results and metadata |
| """ |
| # Compute range |
| ping_loss_over_att = [ |
| x['packet_loss_percentage'] |
| for x in ping_range_result['ping_results'] |
| ] |
| ping_loss_above_threshold = [ |
| x > self.testclass_params['range_ping_loss_threshold'] |
| for x in ping_loss_over_att |
| ] |
| for idx in range(len(ping_loss_above_threshold)): |
| if all(ping_loss_above_threshold[idx:]): |
| range_index = max(idx, 1) - 1 |
| break |
| else: |
| range_index = -1 |
| ping_range_result['atten_at_range'] = testcase_params['atten_range'][ |
| range_index] |
| ping_range_result['peak_throughput_pct'] = 100 - min( |
| ping_loss_over_att) |
| ping_range_result['range'] = (ping_range_result['atten_at_range'] + |
| ping_range_result['fixed_attenuation']) |
| ping_range_result['llstats_at_range'] = ( |
| 'TX MCS = {0} ({1:.1f}%). ' |
| 'RX MCS = {2} ({3:.1f}%)'.format( |
| ping_range_result['llstats'][range_index]['summary'] |
| ['common_tx_mcs'], ping_range_result['llstats'][range_index] |
| ['summary']['common_tx_mcs_freq'] * 100, |
| ping_range_result['llstats'][range_index]['summary'] |
| ['common_rx_mcs'], ping_range_result['llstats'][range_index] |
| ['summary']['common_rx_mcs_freq'] * 100)) |
| |
| # Save results |
| results_file_path = os.path.join( |
| self.log_path, '{}.json'.format(self.current_test_name)) |
| with open(results_file_path, 'w') as results_file: |
| json.dump(ping_range_result, results_file, indent=4) |
| |
| # Plot results |
| if 'range' not in self.current_test_name: |
| figure = wputils.BokehFigure( |
| self.current_test_name, |
| x_label='Timestamp (s)', |
| primary_y_label='Round Trip Time (ms)') |
| for idx, result in enumerate(ping_range_result['ping_results']): |
| if len(result['rtt']) > 1: |
| x_data = [ |
| t - result['time_stamp'][0] |
| for t in result['time_stamp'] |
| ] |
| figure.add_line( |
| x_data, result['rtt'], 'RTT @ {}dB'.format( |
| ping_range_result['attenuation'][idx])) |
| |
| output_file_path = os.path.join( |
| self.log_path, '{}.html'.format(self.current_test_name)) |
| figure.generate_figure(output_file_path) |
| |
| def get_range_from_rvr(self): |
| """Function gets range from RvR golden results |
| |
| The function fetches the attenuation at which the RvR throughput goes |
| to zero. |
| |
| Returns: |
| rvr_range: range derived from looking at rvr curves |
| """ |
| # Fetch the golden RvR results |
| test_name = self.current_test_name |
| rvr_golden_file_name = 'test_rvr_TCP_DL_' + '_'.join( |
| test_name.split('_')[3:]) |
| golden_path = [ |
| file_name for file_name in self.golden_files_list |
| if rvr_golden_file_name in file_name |
| ] |
| if len(golden_path) == 0: |
| rvr_range = float('nan') |
| return rvr_range |
| |
| # Get 0 Mbps attenuation and backoff by low_rssi_backoff_from_range |
| with open(golden_path[0], 'r') as golden_file: |
| golden_results = json.load(golden_file) |
| try: |
| atten_idx = golden_results['throughput_receive'].index(0) |
| rvr_range = (golden_results['attenuation'][atten_idx - 1] + |
| golden_results['fixed_attenuation']) |
| except ValueError: |
| rvr_range = float('nan') |
| return rvr_range |
| |
| def run_ping_test(self, testcase_params): |
| """Main function to test ping. |
| |
| The function sets up the AP in the correct channel and mode |
| configuration and calls get_ping_stats while sweeping attenuation |
| |
| Args: |
| testcase_params: dict containing all test parameters |
| Returns: |
| test_result: dict containing ping results and other meta data |
| """ |
| # Prepare results dict |
| llstats_obj = wputils.LinkLayerStats(self.dut) |
| test_result = collections.OrderedDict() |
| test_result['testcase_params'] = testcase_params.copy() |
| test_result['test_name'] = self.current_test_name |
| test_result['ap_config'] = self.access_point.ap_settings.copy() |
| test_result['attenuation'] = testcase_params['atten_range'] |
| test_result['fixed_attenuation'] = self.testbed_params[ |
| 'fixed_attenuation'][str(testcase_params['channel'])] |
| test_result['rssi_results'] = [] |
| test_result['ping_results'] = [] |
| test_result['llstats'] = [] |
| # Run ping and sweep attenuation as needed |
| zero_counter = 0 |
| for atten in testcase_params['atten_range']: |
| for attenuator in self.attenuators: |
| attenuator.set_atten(atten, strict=False) |
| rssi_future = wputils.get_connected_rssi_nb( |
| self.dut, |
| int(testcase_params['ping_duration'] / 2 / |
| self.RSSI_POLL_INTERVAL), self.RSSI_POLL_INTERVAL, |
| testcase_params['ping_duration'] / 2) |
| # Refresh link layer stats |
| llstats_obj.update_stats() |
| current_ping_stats = wputils.get_ping_stats( |
| self.ping_server, self.dut_ip, |
| testcase_params['ping_duration'], |
| testcase_params['ping_interval'], testcase_params['ping_size']) |
| current_rssi = rssi_future.result() |
| test_result['rssi_results'].append(current_rssi) |
| llstats_obj.update_stats() |
| curr_llstats = llstats_obj.llstats_incremental.copy() |
| test_result['llstats'].append(curr_llstats) |
| if current_ping_stats['connected']: |
| self.log.info( |
| 'Attenuation = {0}dB\tPacket Loss = {1}%\t' |
| 'Avg RTT = {2:.2f}ms\tRSSI = {3} [{4},{5}]\t'.format( |
| atten, current_ping_stats['packet_loss_percentage'], |
| statistics.mean(current_ping_stats['rtt']), |
| current_rssi['signal_poll_rssi']['mean'], |
| current_rssi['chain_0_rssi']['mean'], |
| current_rssi['chain_1_rssi']['mean'])) |
| if current_ping_stats['packet_loss_percentage'] == 100: |
| zero_counter = zero_counter + 1 |
| else: |
| zero_counter = 0 |
| else: |
| self.log.info( |
| 'Attenuation = {}dB. Disconnected.'.format(atten)) |
| zero_counter = zero_counter + 1 |
| test_result['ping_results'].append(current_ping_stats.as_dict()) |
| if zero_counter == self.MAX_CONSECUTIVE_ZEROS: |
| self.log.info('Ping loss stable at 100%. Stopping test now.') |
| for idx in range( |
| len(testcase_params['atten_range']) - |
| len(test_result['ping_results'])): |
| test_result['ping_results'].append( |
| self.DISCONNECTED_PING_RESULT) |
| break |
| return test_result |
| |
| def setup_ap(self, testcase_params): |
| """Sets up the access point in the configuration required by the test. |
| |
| Args: |
| testcase_params: dict containing AP and other test params |
| """ |
| band = self.access_point.band_lookup_by_channel( |
| testcase_params['channel']) |
| if '2G' in band: |
| frequency = wutils.WifiEnums.channel_2G_to_freq[ |
| testcase_params['channel']] |
| else: |
| frequency = wutils.WifiEnums.channel_5G_to_freq[ |
| testcase_params['channel']] |
| if frequency in wutils.WifiEnums.DFS_5G_FREQUENCIES: |
| self.access_point.set_region(self.testbed_params['DFS_region']) |
| else: |
| self.access_point.set_region(self.testbed_params['default_region']) |
| self.access_point.set_channel(band, testcase_params['channel']) |
| self.access_point.set_bandwidth(band, testcase_params['mode']) |
| if 'low' in testcase_params['ap_power']: |
| self.log.info('Setting low AP power.') |
| self.access_point.set_power(band, 0) |
| self.log.info('Access Point Configuration: {}'.format( |
| self.access_point.ap_settings)) |
| |
| def setup_dut(self, testcase_params): |
| """Sets up the DUT in the configuration required by the test. |
| |
| Args: |
| testcase_params: dict containing AP and other test params |
| """ |
| # Check battery level before test |
| if not wputils.health_check(self.dut, 10): |
| asserts.skip('Battery level too low. Skipping test.') |
| # Turn screen off to preserve battery |
| self.dut.go_to_sleep() |
| band = self.access_point.band_lookup_by_channel( |
| testcase_params['channel']) |
| current_network = self.dut.droid.wifiGetConnectionInfo() |
| try: |
| connected = wutils.validate_connection(self.dut) is not None |
| except: |
| connected = False |
| if connected and current_network['SSID'] == self.main_network[band][ |
| 'SSID']: |
| self.log.info('Already connected to desired network') |
| else: |
| wutils.reset_wifi(self.dut) |
| self.dut.droid.wifiSetCountryCode( |
| self.testclass_params['country_code']) |
| self.main_network[band]['channel'] = testcase_params['channel'] |
| wutils.wifi_connect( |
| self.dut, |
| self.main_network[band], |
| num_of_tries=5, |
| check_connectivity=False) |
| self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0] |
| |
| def setup_ping_test(self, testcase_params): |
| """Function that gets devices ready for the test. |
| |
| Args: |
| testcase_params: dict containing test-specific parameters |
| """ |
| # Configure AP |
| self.setup_ap(testcase_params) |
| # Set attenuator to 0 dB |
| for attenuator in self.attenuators: |
| attenuator.set_atten(0, strict=False) |
| # Reset, configure, and connect DUT |
| self.setup_dut(testcase_params) |
| |
| def get_range_start_atten(self, testcase_params): |
| """Gets the starting attenuation for this ping test. |
| |
| This function is used to get the starting attenuation for ping range |
| tests. This implementation returns the default starting attenuation, |
| however, defining this function enables a more involved configuration |
| for over-the-air test classes. |
| |
| Args: |
| testcase_params: dict containing all test params |
| """ |
| return self.testclass_params['range_atten_start'] |
| |
| def compile_test_params(self, testcase_params): |
| if testcase_params['test_type'] == 'test_ping_range': |
| testcase_params.update( |
| ping_interval=self.testclass_params['range_ping_interval'], |
| ping_duration=self.testclass_params['range_ping_duration'], |
| ping_size=self.testclass_params['ping_size'], |
| ) |
| elif testcase_params['test_type'] == 'test_fast_ping_rtt': |
| testcase_params.update( |
| ping_interval=self.testclass_params['rtt_ping_interval'] |
| ['fast'], |
| ping_duration=self.testclass_params['rtt_ping_duration'], |
| ping_size=self.testclass_params['ping_size'], |
| ) |
| elif testcase_params['test_type'] == 'test_slow_ping_rtt': |
| testcase_params.update( |
| ping_interval=self.testclass_params['rtt_ping_interval'] |
| ['slow'], |
| ping_duration=self.testclass_params['rtt_ping_duration'], |
| ping_size=self.testclass_params['ping_size']) |
| |
| if testcase_params['test_type'] == 'test_ping_range': |
| start_atten = self.get_range_start_atten(testcase_params) |
| num_atten_steps = int( |
| (self.testclass_params['range_atten_stop'] - start_atten) / |
| self.testclass_params['range_atten_step']) |
| testcase_params['atten_range'] = [ |
| start_atten + x * self.testclass_params['range_atten_step'] |
| for x in range(0, num_atten_steps) |
| ] |
| else: |
| testcase_params['atten_range'] = self.testclass_params[ |
| 'rtt_test_attenuation'] |
| return testcase_params |
| |
| def _test_ping(self, testcase_params): |
| """ Function that gets called for each range test case |
| |
| The function gets called in each range test case. It customizes the |
| range test based on the test name of the test that called it |
| |
| Args: |
| testcase_params: dict containing preliminary set of parameters |
| """ |
| # Compile test parameters from config and test name |
| testcase_params = self.compile_test_params(testcase_params) |
| # Run ping test |
| self.setup_ping_test(testcase_params) |
| ping_result = self.run_ping_test(testcase_params) |
| # Postprocess results |
| self.testclass_results.append(ping_result) |
| self.process_ping_results(testcase_params, ping_result) |
| self.pass_fail_check(ping_result) |
| |
| def generate_test_cases(self, ap_power, channels, modes, test_types): |
| test_cases = [] |
| allowed_configs = { |
| 'VHT20': [ |
| 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 36, 40, 44, 48, 149, 153, |
| 157, 161 |
| ], |
| 'VHT40': [36, 44, 149, 157], |
| 'VHT80': [36, 149] |
| } |
| for channel, mode, test_type in itertools.product( |
| channels, modes, test_types): |
| if channel not in allowed_configs[mode]: |
| continue |
| testcase_name = '{}_ch{}_{}'.format(test_type, channel, mode) |
| testcase_params = collections.OrderedDict( |
| test_type=test_type, |
| ap_power=ap_power, |
| channel=channel, |
| mode=mode, |
| ) |
| setattr(self, testcase_name, |
| partial(self._test_ping, testcase_params)) |
| test_cases.append(testcase_name) |
| return test_cases |
| |
| |
| class WifiPing_LowPowerAP_Test(WifiPingTest): |
| def __init__(self, controllers): |
| super().__init__(self, controllers) |
| self.tests = self.generate_test_cases( |
| ap_power='low_power', |
| channels=[1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161], |
| modes=['VHT20', 'VHT40', 'VHT80'], |
| test_types=['test_ping_range']) |
| |
| |
| # Over-the air version of ping tests |
| class WifiOtaPingTest(WifiPingTest): |
| """Class to test over-the-air ping |
| |
| This class tests WiFi ping performance in an OTA chamber. It enables |
| setting turntable orientation and other chamber parameters to study |
| performance in varying channel conditions |
| """ |
| |
| def __init__(self, controllers): |
| base_test.BaseTestClass.__init__(self, controllers) |
| self.testcase_metric_logger = ( |
| BlackboxMappedMetricLogger.for_test_case()) |
| self.testclass_metric_logger = ( |
| BlackboxMappedMetricLogger.for_test_class()) |
| self.publish_testcase_metrics = False |
| |
| def setup_class(self): |
| WifiPingTest.setup_class(self) |
| self.ota_chamber = ota_chamber.create( |
| self.user_params['OTAChamber'])[0] |
| |
| def teardown_class(self): |
| self.process_testclass_results() |
| self.ota_chamber.reset_chamber() |
| |
| def process_testclass_results(self): |
| """Saves all test results to enable comparison.""" |
| WifiPingTest.process_testclass_results(self) |
| |
| range_vs_angle = collections.OrderedDict() |
| for test in self.testclass_results: |
| curr_params = test['testcase_params'] |
| curr_config = curr_params['channel'] |
| if curr_config in range_vs_angle: |
| range_vs_angle[curr_config]['position'].append( |
| curr_params['position']) |
| range_vs_angle[curr_config]['range'].append(test['range']) |
| range_vs_angle[curr_config]['llstats_at_range'].append( |
| test['llstats_at_range']) |
| else: |
| range_vs_angle[curr_config] = { |
| 'position': [curr_params['position']], |
| 'range': [test['range']], |
| 'llstats_at_range': [test['llstats_at_range']] |
| } |
| chamber_mode = self.testclass_results[0]['testcase_params'][ |
| 'chamber_mode'] |
| if chamber_mode == 'orientation': |
| x_label = 'Angle (deg)' |
| elif chamber_mode == 'stepped stirrers': |
| x_label = 'Position Index' |
| figure = wputils.BokehFigure( |
| title='Range vs. Position', |
| x_label=x_label, |
| primary_y_label='Range (dB)', |
| ) |
| for channel, channel_data in range_vs_angle.items(): |
| figure.add_line( |
| x_data=channel_data['position'], |
| y_data=channel_data['range'], |
| hover_text=channel_data['llstats_at_range'], |
| legend='Channel {}'.format(channel)) |
| average_range = sum(channel_data['range']) / len( |
| channel_data['range']) |
| self.log.info('Average range for Channel {} is: {}dB'.format( |
| channel, average_range)) |
| metric_name = 'ota_summary_ch{}.avg_range'.format(channel) |
| self.testclass_metric_logger.add_metric(metric_name, average_range) |
| current_context = context.get_current_context().get_full_output_path() |
| plot_file_path = os.path.join(current_context, 'results.html') |
| figure.generate_figure(plot_file_path) |
| |
| # Save results |
| results_file_path = os.path.join(current_context, |
| 'testclass_summary.json') |
| with open(results_file_path, 'w') as results_file: |
| json.dump(range_vs_angle, results_file, indent=4) |
| |
| def setup_ping_test(self, testcase_params): |
| WifiPingTest.setup_ping_test(self, testcase_params) |
| # Setup turntable |
| if testcase_params['chamber_mode'] == 'orientation': |
| self.ota_chamber.set_orientation(testcase_params['position']) |
| elif testcase_params['chamber_mode'] == 'stepped stirrers': |
| self.ota_chamber.step_stirrers(testcase_params['total_positions']) |
| |
| def extract_test_id(self, testcase_params, id_fields): |
| test_id = collections.OrderedDict( |
| (param, testcase_params[param]) for param in id_fields) |
| return test_id |
| |
| def get_range_start_atten(self, testcase_params): |
| """Gets the starting attenuation for this ping test. |
| |
| The function gets the starting attenuation by checking whether a test |
| at the same configuration has executed. If so it sets the starting |
| point a configurable number of dBs below the reference test. |
| |
| Returns: |
| start_atten: starting attenuation for current test |
| """ |
| # Get the current and reference test config. The reference test is the |
| # one performed at the current MCS+1 |
| ref_test_params = self.extract_test_id(testcase_params, |
| ['channel', 'mode']) |
| # Check if reference test has been run and set attenuation accordingly |
| previous_params = [ |
| self.extract_test_id(result['testcase_params'], |
| ['channel', 'mode']) |
| for result in self.testclass_results |
| ] |
| try: |
| ref_index = previous_params[::-1].index(ref_test_params) |
| ref_index = len(previous_params) - 1 - ref_index |
| start_atten = self.testclass_results[ref_index][ |
| 'atten_at_range'] - ( |
| self.testclass_params['adjacent_range_test_gap']) |
| except ValueError: |
| print('Reference test not found. Starting from {} dB'.format( |
| self.testclass_params['range_atten_start'])) |
| start_atten = self.testclass_params['range_atten_start'] |
| return start_atten |
| |
| def generate_test_cases(self, ap_power, channels, modes, chamber_mode, |
| positions): |
| test_cases = [] |
| allowed_configs = { |
| 'VHT20': [ |
| 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 36, 40, 44, 48, 149, 153, |
| 157, 161 |
| ], |
| 'VHT40': [36, 44, 149, 157], |
| 'VHT80': [36, 149] |
| } |
| for channel, mode, position in itertools.product( |
| channels, modes, positions): |
| if channel not in allowed_configs[mode]: |
| continue |
| testcase_name = 'test_ping_range_ch{}_{}_pos{}'.format( |
| channel, mode, position) |
| testcase_params = collections.OrderedDict( |
| test_type='test_ping_range', |
| ap_power=ap_power, |
| channel=channel, |
| mode=mode, |
| chamber_mode=chamber_mode, |
| total_positions=len(positions), |
| position=position) |
| setattr(self, testcase_name, |
| partial(self._test_ping, testcase_params)) |
| test_cases.append(testcase_name) |
| return test_cases |
| |
| |
| class WifiOtaPing_TenDegree_Test(WifiOtaPingTest): |
| def __init__(self, controllers): |
| WifiOtaPingTest.__init__(self, controllers) |
| self.tests = self.generate_test_cases( |
| ap_power='standard', |
| channels=[6, 36, 149], |
| modes=['VHT20'], |
| chamber_mode='orientation', |
| positions=list(range(0, 360, 10))) |
| |
| |
| class WifiOtaPing_45Degree_Test(WifiOtaPingTest): |
| def __init__(self, controllers): |
| WifiOtaPingTest.__init__(self, controllers) |
| self.tests = self.generate_test_cases( |
| ap_power='standard', |
| channels=[1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161], |
| modes=['VHT20'], |
| chamber_mode='orientation', |
| positions=list(range(0, 360, 45))) |
| |
| |
| class WifiOtaPing_SteppedStirrers_Test(WifiOtaPingTest): |
| def __init__(self, controllers): |
| WifiOtaPingTest.__init__(self, controllers) |
| self.tests = self.generate_test_cases( |
| ap_power='standard', |
| channels=[6, 36, 149], |
| modes=['VHT20'], |
| chamber_mode='stepped stirrers', |
| positions=list(range(100))) |
| |
| |
| class WifiOtaPing_LowPowerAP_TenDegree_Test(WifiOtaPingTest): |
| def __init__(self, controllers): |
| WifiOtaPingTest.__init__(self, controllers) |
| self.tests = self.generate_test_cases( |
| ap_power='low_power', |
| channels=[6, 36, 149], |
| modes=['VHT20'], |
| chamber_mode='orientation', |
| positions=list(range(0, 360, 10))) |
| |
| |
| class WifiOtaPing_LowPowerAP_45Degree_Test(WifiOtaPingTest): |
| def __init__(self, controllers): |
| WifiOtaPingTest.__init__(self, controllers) |
| self.tests = self.generate_test_cases( |
| ap_power='low_power', |
| channels=[1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161], |
| modes=['VHT20'], |
| chamber_mode='orientation', |
| positions=list(range(0, 360, 45))) |
| |
| |
| class WifiOtaPing_LowPowerAP_SteppedStirrers_Test(WifiOtaPingTest): |
| def __init__(self, controllers): |
| WifiOtaPingTest.__init__(self, controllers) |
| self.tests = self.generate_test_cases( |
| ap_power='low_power', |
| channels=[6, 36, 149], |
| modes=['VHT20'], |
| chamber_mode='stepped stirrers', |
| positions=list(range(100))) |