blob: 706903c80fd48f445441fc88bafb550eca9fbd02 [file] [log] [blame]
#!/usr/bin/env python3.4
#
# Copyright 2017 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import csv
import itertools
import json
import logging
import math
import os
import re
import scipy.stats
import time
from acts import asserts
from acts import context
from acts import base_test
from acts import utils
from acts.controllers.utils_lib import ssh
from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger
from acts_contrib.test_utils.wifi import ota_sniffer
from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wputils
from acts_contrib.test_utils.wifi import wifi_retail_ap as retail_ap
from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
from functools import partial
class WifiTxPowerCheckTest(base_test.BaseTestClass):
"""Class for ping-based Wifi performance tests.
This class implements WiFi ping performance tests such as range and RTT.
The class setups up the AP in the desired configurations, configures
and connects the phone to the AP, and runs For an example config file to
run this test class see example_connectivity_performance_ap_sta.json.
"""
TEST_TIMEOUT = 10
RSSI_POLL_INTERVAL = 0.2
SHORT_SLEEP = 1
MED_SLEEP = 5
MAX_CONSECUTIVE_ZEROS = 5
DISCONNECTED_PING_RESULT = {
'connected': 0,
'rtt': [],
'time_stamp': [],
'ping_interarrivals': [],
'packet_loss_percentage': 100
}
BRCM_SAR_MAPPING = {
0: 'disable',
1: 'head',
2: 'grip',
16: 'bt',
32: 'hotspot'
}
BAND_TO_CHANNEL_MAP = {
('2g', 1): [1, 6, 11],
('5g', 1): [36, 40, 44, 48],
('5g', 2): [52, 56, 60, 64],
('5g', 3): range(100, 148, 4),
('5g', 4): [149, 153, 157, 161],
('6g', 1): ['6g{}'.format(channel) for channel in range(1, 46, 4)],
('6g', 2): ['6g{}'.format(channel) for channel in range(49, 94, 4)],
('6g', 3): ['6g{}'.format(channel) for channel in range(97, 114, 4)],
('6g', 4): ['6g{}'.format(channel) for channel in range(117, 158, 4)],
('6g', 5): ['6g{}'.format(channel) for channel in range(161, 186, 4)],
('6g', 6): ['6g{}'.format(channel) for channel in range(189, 234, 4)]
}
def __init__(self, controllers):
base_test.BaseTestClass.__init__(self, controllers)
self.testcase_metric_logger = (
BlackboxMappedMetricLogger.for_test_case())
self.testclass_metric_logger = (
BlackboxMappedMetricLogger.for_test_class())
self.publish_testcase_metrics = True
self.tests = self.generate_test_cases(
ap_power='standard',
channels=[6, 36, 52, 100, 149, '6g37', '6g117', '6g213'],
modes=['bw20', 'bw40', 'bw80', 'bw160'],
test_types=[
'test_tx_power',
],
country_codes=['US', 'GB', 'JP'],
sar_states=range(0, 13))
def setup_class(self):
self.dut = self.android_devices[-1]
req_params = [
'tx_power_test_params', 'testbed_params', 'main_network',
'RetailAccessPoints', 'RemoteServer'
]
opt_params = ['OTASniffer']
self.unpack_userparams(req_params, opt_params)
self.testclass_params = self.tx_power_test_params
self.num_atten = self.attenuators[0].instrument.num_atten
self.ping_server = ssh.connection.SshConnection(
ssh.settings.from_config(self.RemoteServer[0]['ssh_config']))
self.access_point = retail_ap.create(self.RetailAccessPoints)[0]
if hasattr(self,
'OTASniffer') and self.testbed_params['sniffer_enable']:
try:
self.sniffer = ota_sniffer.create(self.OTASniffer)[0]
except:
self.log.warning('Could not start sniffer. Disabling sniffs.')
self.testbed_params['sniffer_enable'] = 0
self.log.info('Access Point Configuration: {}'.format(
self.access_point.ap_settings))
self.log_path = os.path.join(logging.log_path, 'results')
os.makedirs(self.log_path, exist_ok=True)
self.atten_dut_chain_map = {}
self.testclass_results = []
# Turn WiFi ON
if self.testclass_params.get('airplane_mode', 1):
self.log.info('Turning on airplane mode.')
asserts.assert_true(utils.force_airplane_mode(self.dut, True),
'Can not turn on airplane mode.')
wutils.wifi_toggle_state(self.dut, True)
self.dut.droid.wifiEnableVerboseLogging(1)
asserts.assert_equal(self.dut.droid.wifiGetVerboseLoggingLevel(), 1,
"Failed to enable WiFi verbose logging.")
# decode nvram
self.nvram_sar_data = self.read_nvram_sar_data()
self.csv_sar_data = self.read_sar_csv(self.testclass_params['sar_csv'])
def teardown_class(self):
# Turn WiFi OFF and reset AP
self.access_point.teardown()
for dev in self.android_devices:
wutils.wifi_toggle_state(dev, False)
dev.go_to_sleep()
self.process_testclass_results()
def setup_test(self):
self.retry_flag = False
def teardown_test(self):
self.retry_flag = False
def on_retry(self):
"""Function to control test logic on retried tests.
This function is automatically executed on tests that are being
retried. In this case the function resets wifi, toggles it off and on
and sets a retry_flag to enable further tweaking the test logic on
second attempts.
"""
self.retry_flag = True
for dev in self.android_devices:
wutils.reset_wifi(dev)
wutils.toggle_wifi_off_and_on(dev)
def read_sar_csv(self, sar_csv):
"""Reads SAR powers from CSV.
This function reads SAR powers from a CSV and generate a dictionary
with all programmed TX powers on a per band and regulatory domain
basis.
Args:
sar_csv: path to SAR data file.
Returns:
sar_powers: dict containing all SAR data
"""
sar_powers = {}
sar_csv_data = []
with open(sar_csv, mode='r') as f:
reader = csv.DictReader(f)
for row in reader:
row['Sub-band Powers'] = [
float(val) for key, val in row.items()
if 'Sub-band' in key and val != ''
]
sar_csv_data.append(row)
for row in sar_csv_data:
sar_powers.setdefault(int(row['Scenario Index']), {})
sar_powers[int(row['Scenario Index'])].setdefault('SAR Powers', {})
sar_row_key = (row['Regulatory Domain'], row['Mode'], row['Band'])
sar_powers[int(row['Scenario Index'])]['SAR Powers'].setdefault(
sar_row_key, {})
sar_powers[int(
row['Scenario Index'])]['SAR Powers'][sar_row_key][int(
row['Chain'])] = row['Sub-band Powers']
return sar_powers
def read_nvram_sar_data(self):
"""Reads SAR powers from NVRAM.
This function reads SAR powers from the NVRAM found on the DUT and
generates a dictionary with all programmed TX powers on a per band and
regulatory domain basis. NThe NVRAM file is chosen based on the build,
but if no NVRAM file is found matching the expected name, the default
NVRAM will be loaded. The choice of NVRAM is not guaranteed to be
correct.
Returns:
nvram_sar_data: dict containing all SAR data
"""
self._read_sar_config_info()
try:
hardware_version = self.dut.adb.shell(
'getprop ro.boot.hardware.revision')
nvram_path = '/vendor/firmware/bcmdhd.cal_{}'.format(
hardware_version)
nvram = self.dut.adb.shell('cat {}'.format(nvram_path))
except:
nvram = self.dut.adb.shell('cat /vendor/firmware/bcmdhd.cal')
current_context = context.get_current_context().get_full_output_path()
file_path = os.path.join(current_context, 'nvram_file')
with open(file_path, 'w') as file:
file.write(nvram)
nvram_sar_data = {}
for line in nvram.splitlines():
if 'dynsar' in line:
sar_config, sar_powers = self._parse_nvram_sar_line(line)
nvram_sar_data[sar_config] = sar_powers
file_path = os.path.join(current_context, 'nvram_sar_data')
with open(file_path, 'w') as file:
json.dump(wputils.serialize_dict(nvram_sar_data), file, indent=4)
return nvram_sar_data
def _read_sar_config_info(self):
"""Function to read SAR scenario mapping,
This function reads sar_config.info file which contains the mapping
of SAR scenarios to NVRAM data tables.
"""
self.sar_state_mapping = collections.OrderedDict([(-1, {
"google_name":
'WIFI_POWER_SCENARIO_DISABLE'
}), (0, {
"google_name": 'WIFI_POWER_SCENARIO_DISABLE'
}), (1, {
"google_name": 'WIFI_POWER_SCENARIO_ON_HEAD_CELL_OFF'
}), (2, {
"google_name": 'WIFI_POWER_SCENARIO_ON_HEAD_CELL_ON'
}), (3, {
"google_name": 'WIFI_POWER_SCENARIO_ON_BODY_CELL_OFF'
}), (4, {
"google_name": 'WIFI_POWER_SCENARIO_ON_BODY_CELL_ON'
}), (5, {
"google_name": 'WIFI_POWER_SCENARIO_ON_BODY_BT'
}), (6, {
"google_name": 'WIFI_POWER_SCENARIO_ON_HEAD_HOTSPOT'
}), (7, {
"google_name": 'WIFI_POWER_SCENARIO_ON_HEAD_HOTSPOT_MMW'
}), (8, {
"google_name": 'WIFI_POWER_SCENARIO_ON_BODY_CELL_ON_BT'
}), (9, {
"google_name": 'WIFI_POWER_SCENARIO_ON_BODY_HOTSPOT'
}), (10, {
"google_name": 'WIFI_POWER_SCENARIO_ON_BODY_HOTSPOT_BT'
}), (11, {
"google_name": 'WIFI_POWER_SCENARIO_ON_BODY_HOTSPOT_MMW'
}), (12, {
"google_name": 'WIFI_POWER_SCENARIO_ON_BODY_HOTSPOT_BT_MMW'
})])
sar_config_path = '/vendor/firmware/sarconfig.info'
sar_config = self.dut.adb.shell(
'cat {}'.format(sar_config_path)).splitlines()
sar_config = [line.split(',') for line in sar_config]
sar_config = [[int(x) for x in line] for line in sar_config]
for sar_state in sar_config:
self.sar_state_mapping[sar_state[0]]['brcm_index'] = (
self.BRCM_SAR_MAPPING[sar_state[1]], bool(sar_state[2]))
current_context = context.get_current_context().get_full_output_path()
file_path = os.path.join(current_context, 'sarconfig')
with open(file_path, 'w') as file:
json.dump(wputils.serialize_dict(self.sar_state_mapping),
file,
indent=4)
def _parse_nvram_sar_line(self, sar_line):
"""Helper function to decode SAR NVRAM data lines.
Args:
sar_line: single line of text from NVRAM file containing SAR data.
Returns:
sar_config: sar config referenced in this line
decoded_values: tx powers configured in this line
"""
sar_config = collections.OrderedDict()
list_of_countries = ['fcc', 'jp']
try:
sar_config['country'] = next(country
for country in list_of_countries
if country in sar_line)
except:
sar_config['country'] = 'row'
list_of_sar_states = ['grip', 'bt', 'hotspot']
try:
sar_config['state'] = next(state for state in list_of_sar_states
if state in sar_line)
except:
sar_config['state'] = 'head'
list_of_bands = ['2g', '5g', '6g']
sar_config['band'] = next(band for band in list_of_bands
if band in sar_line)
sar_config['rsdb'] = 'rsdb' if 'rsdb' in sar_line else 'mimo'
sar_config['airplane_mode'] = '_2=' in sar_line
sar_powers = sar_line.split('=')[1].split(',')
decoded_powers = []
for sar_power in sar_powers:
decoded_powers.append([
(int(sar_power[2:4], 16) & int('7f', 16)) / 4,
(int(sar_power[4:], 16) & int('7f', 16)) / 4
])
return tuple(sar_config.values()), decoded_powers
def get_sar_power_from_nvram(self, testcase_params):
"""Function to get current expected SAR power from nvram
This functions gets the expected SAR TX power from the DUT NVRAM data.
The SAR power is looked up based on the current channel and regulatory
domain,
Args:
testcase_params: dict containing channel, sar state, country code
Returns:
sar_config: current expected sar config
sar_powers: current expected sar powers
"""
if testcase_params['country_code'] == 'US':
reg_domain = 'fcc'
elif testcase_params['country_code'] == 'JP':
reg_domain = 'jp'
else:
reg_domain = 'row'
for band, channels in self.BAND_TO_CHANNEL_MAP.items():
if testcase_params['channel'] in channels:
current_band = band[0]
sub_band_idx = band[1]
break
sar_config = (reg_domain, self.sar_state_mapping[
testcase_params['sar_state']]['brcm_index'][0], current_band,
'mimo', self.sar_state_mapping[
testcase_params['sar_state']]['brcm_index'][1])
sar_powers = self.nvram_sar_data[sar_config][sub_band_idx - 1]
return sar_config, sar_powers
def get_sar_power_from_csv(self, testcase_params):
"""Function to get current expected SAR power from CSV.
This functions gets the expected SAR TX power from the DUT NVRAM data.
The SAR power is looked up based on the current channel and regulatory
domain,
Args:
testcase_params: dict containing channel, sar state, country code
Returns:
sar_config: current expected sar config
sar_powers: current expected sar powers
"""
if testcase_params['country_code'] == 'US':
reg_domain = 'fcc'
elif testcase_params['country_code'] == 'JP':
reg_domain = 'jp'
else:
reg_domain = 'row'
for band, channels in self.BAND_TO_CHANNEL_MAP.items():
if testcase_params['channel'] in channels:
current_band = band[0]
sub_band_idx = band[1]
break
sar_config = (reg_domain, 'mimo', current_band)
sar_powers = [
self.csv_sar_data[testcase_params['sar_state']]['SAR Powers']
[sar_config][0][sub_band_idx - 1],
self.csv_sar_data[testcase_params['sar_state']]['SAR Powers']
[sar_config][1][sub_band_idx - 1]
]
return sar_config, sar_powers
def process_wl_curpower(self, wl_curpower_file, testcase_params):
"""Function to parse wl_curpower output.
Args:
wl_curpower_file: path to curpower output file.
testcase_params: dict containing channel, sar state, country code
Returns:
wl_curpower_dict: dict formatted version of curpower data.
"""
with open(wl_curpower_file, 'r') as file:
wl_curpower_out = file.read()
channel_regex = re.compile(r'Current Channel:\s+(?P<channel>[0-9]+)')
bandwidth_regex = re.compile(
r'Channel Width:\s+(?P<bandwidth>\S+)MHz\n')
channel = int(
re.search(channel_regex, wl_curpower_out).group('channel'))
bandwidth = int(
re.search(bandwidth_regex, wl_curpower_out).group('bandwidth'))
regulatory_limits = self.generate_regulatory_table(
wl_curpower_out, channel, bandwidth)
board_limits = self.generate_board_limit_table(wl_curpower_out,
channel, bandwidth)
wl_curpower_dict = {
'channel': channel,
'bandwidth': bandwidth,
'country': testcase_params['country_code'],
'regulatory_limits': regulatory_limits,
'board_limits': board_limits
}
return wl_curpower_dict
def generate_regulatory_table(self, wl_curpower_out, channel, bw):
""""Helper function to generate regulatory limit table from curpower.
Args:
wl_curpower_out: curpower output
channel: current channel
bw: current bandwidth
Returns:
regulatory_table: dict with regulatory limits for current config
"""
regulatory_group_map = {
'DSSS':
[('CCK', rate, 1)
for rate in ['{}Mbps'.format(mbps) for mbps in [1, 2, 5.5, 11]]],
'OFDM_CDD1': [('LEGACY', rate, 1) for rate in [
'{}Mbps'.format(mbps)
for mbps in [6, 9, 12, 18, 24, 36, 48, 54]
]],
'MCS0_7_CDD1':
[(mode, rate, 1)
for (mode,
rate) in itertools.product(['HT' + str(bw), 'VHT' +
str(bw)], range(0, 8))],
'VHT8_9SS1_CDD1': [('VHT' + str(bw), 8, 1),
('VHT' + str(bw), 9, 1)],
'VHT10_11SS1_CDD1': [('VHT' + str(bw), 10, 1),
('VHT' + str(bw), 11, 1)],
'MCS8_15':
[(mode, rate - 8 * ('VHT' in mode), 2)
for (mode,
rate) in itertools.product(['HT' + str(bw), 'VHT' +
str(bw)], range(8, 16))],
'VHT8_9SS2': [('VHT' + str(bw), 8, 2), ('VHT' + str(bw), 9, 2)],
'VHT10_11SS2': [('VHT' + str(bw), 10, 2),
('VHT' + str(bw), 11, 2)],
'HE_MCS0-11_CDD1': [('HE' + str(bw), rate, 1)
for rate in range(0, 12)],
'HE_MCS0_11SS2': [('HE' + str(bw), rate, 2)
for rate in range(0, 12)],
}
tx_power_regex = re.compile(
'(?P<mcs>\S+)\s+(?P<chain>[2])\s+(?P<power_1>[0-9.-]+)\s*(?P<power_2>[0-9.-]*)\s*(?P<power_3>[0-9.-]*)\s*(?P<power_4>[0-9.-]*)'
)
regulatory_section_regex = re.compile(
r'Regulatory Limits:(?P<regulatory_limits>[\S\s]+)Board Limits:')
regulatory_list = re.search(regulatory_section_regex,
wl_curpower_out).group('regulatory_limits')
regulatory_list = re.findall(tx_power_regex, regulatory_list)
regulatory_dict = {entry[0]: entry[2:] for entry in regulatory_list}
bw_index = int(math.log(bw / 10, 2)) - 1
regulatory_table = collections.OrderedDict()
for regulatory_group, rates in regulatory_group_map.items():
for rate in rates:
reg_power = regulatory_dict.get(regulatory_group,
['0', '0', '0', '0'])[bw_index]
regulatory_table[rate] = float(
reg_power) if reg_power != '-' else 0
return regulatory_table
def generate_board_limit_table(self, wl_curpower_out, channel, bw):
""""Helper function to generate board limit table from curpower.
Args:
wl_curpower_out: curpower output
channel: current channel
bw: current bandwidth
Returns:
board_limit_table: dict with board limits for current config
"""
tx_power_regex = re.compile(
'(?P<mcs>\S+)\s+(?P<chain>[2])\s+(?P<power_1>[0-9.-]+)\s*(?P<power_2>[0-9.-]*)\s*(?P<power_3>[0-9.-]*)\s*(?P<power_4>[0-9.-]*)'
)
board_section_regex = re.compile(
r'Board Limits:(?P<board_limits>[\S\s]+)Power Targets:')
board_limits_list = re.search(board_section_regex,
wl_curpower_out).group('board_limits')
board_limits_list = re.findall(tx_power_regex, board_limits_list)
board_limits_dict = {
entry[0]: entry[2:]
for entry in board_limits_list
}
mcs_regex_list = [[
re.compile('DSSS'),
[('CCK', rate, 1)
for rate in ['{}Mbps'.format(mbps) for mbps in [1, 2, 5.5, 11]]]
], [re.compile('OFDM(?P<mcs>[0-9]+)_CDD1'), [('LEGACY', '{}Mbps', 1)]],
[
re.compile('MCS(?P<mcs>[0-7])_CDD1'),
[('HT{}'.format(bw), '{}', 1),
('VHT{}'.format(bw), '{}', 1)]
],
[
re.compile('VHT(?P<mcs>[8-9])SS1_CDD1'),
[('VHT{}'.format(bw), '{}', 1)]
],
[
re.compile('VHT10_11SS1_CDD1'),
[('VHT{}'.format(bw), '10', 1),
('VHT{}'.format(bw), '11', 1)]
],
[
re.compile('MCS(?P<mcs>[0-9]{2})'),
[('HT{}'.format(bw), '{}', 2)]
],
[
re.compile('VHT(?P<mcs>[0-9])SS2'),
[('VHT{}'.format(bw), '{}', 2)]
],
[
re.compile('VHT10_11SS2'),
[('VHT{}'.format(bw), '10', 2),
('VHT{}'.format(bw), '11', 2)]
],
[
re.compile('HE_MCS(?P<mcs>[0-9]+)_CDD1'),
[('HE{}'.format(bw), '{}', 1)]
],
[
re.compile('HE_MCS(?P<mcs>[0-9]+)SS2'),
[('HE{}'.format(bw), '{}', 2)]
]]
bw_index = int(math.log(bw / 10, 2)) - 1
board_limit_table = collections.OrderedDict()
for mcs, board_limit in board_limits_dict.items():
for mcs_regex_tuple in mcs_regex_list:
mcs_match = re.match(mcs_regex_tuple[0], mcs)
if mcs_match:
for possible_mcs in mcs_regex_tuple[1]:
try:
curr_mcs = (possible_mcs[0],
possible_mcs[1].format(
mcs_match.group('mcs')),
possible_mcs[2])
except:
curr_mcs = (possible_mcs[0], possible_mcs[1],
possible_mcs[2])
board_limit_table[curr_mcs] = float(
board_limit[bw_index]
) if board_limit[bw_index] != '-' else 0
break
return board_limit_table
def pass_fail_check(self, result):
"""Function to evaluate if current TX powqe matches CSV/NVRAM settings.
This function assesses whether the current TX power reported by the
DUT matches the powers programmed in NVRAM and CSV after applying the
correct TX power backoff used to account for CLPC errors.
"""
if isinstance(result['testcase_params']['channel'],
str) and '6g' in result['testcase_params']['channel']:
mode = 'HE' + str(result['testcase_params']['bandwidth'])
else:
mode = 'VHT' + str(result['testcase_params']['bandwidth'])
regulatory_power = result['wl_curpower']['regulatory_limits'][(mode, 0,
2)]
if result['testcase_params']['sar_state'] == 0:
#get from wl_curpower
csv_powers = [30, 30]
nvram_powers = [30, 30]
sar_config = 'SAR DISABLED'
else:
sar_config, nvram_powers = self.get_sar_power_from_nvram(
result['testcase_params'])
csv_config, csv_powers = self.get_sar_power_from_csv(
result['testcase_params'])
self.log.info("SAR state: {} ({})".format(
result['testcase_params']['sar_state'],
self.sar_state_mapping[result['testcase_params']['sar_state']],
))
self.log.info("Country Code: {}".format(
result['testcase_params']['country_code']))
self.log.info('BRCM SAR Table: {}'.format(sar_config))
expected_power = [
min([csv_powers[0], regulatory_power]) - 1.5,
min([csv_powers[1], regulatory_power]) - 1.5
]
power_str = "NVRAM Powers: {}, CSV Powers: {}, Reg Powers: {}, Expected Powers: {}, Reported Powers: {}".format(
nvram_powers, csv_powers, [regulatory_power] * 2, expected_power,
result['tx_powers'])
max_error = max([
abs(expected_power[idx] - result['tx_powers'][idx])
for idx in [0, 1]
])
if max_error > 1:
asserts.fail(power_str)
else:
asserts.explicit_pass(power_str)
def process_testclass_results(self):
pass
def run_tx_power_test(self, testcase_params):
"""Main function to test tx power.
The function sets up the AP & DUT in the correct channel and mode
configuration, starts ping traffic and queries the current TX power.
Args:
testcase_params: dict containing all test parameters
Returns:
test_result: dict containing ping results and other meta data
"""
# Prepare results dict
llstats_obj = wputils.LinkLayerStats(
self.dut, self.testclass_params.get('llstats_enabled', True))
test_result = collections.OrderedDict()
test_result['testcase_params'] = testcase_params.copy()
test_result['test_name'] = self.current_test_name
test_result['ap_config'] = self.access_point.ap_settings.copy()
test_result['attenuation'] = testcase_params['atten_range']
test_result['fixed_attenuation'] = self.testbed_params[
'fixed_attenuation'][str(testcase_params['channel'])]
test_result['rssi_results'] = []
test_result['ping_results'] = []
test_result['llstats'] = []
# Setup sniffer
if self.testbed_params['sniffer_enable']:
self.sniffer.start_capture(
testcase_params['test_network'],
chan=testcase_params['channel'],
bw=testcase_params['bandwidth'],
duration=testcase_params['ping_duration'] *
len(testcase_params['atten_range']) + self.TEST_TIMEOUT)
# Run ping and sweep attenuation as needed
self.log.info('Starting ping.')
thread_future = wputils.get_ping_stats_nb(self.ping_server,
self.dut_ip, 10, 0.02, 64)
for atten in testcase_params['atten_range']:
for attenuator in self.attenuators:
attenuator.set_atten(atten, strict=False, retry=True)
# Set mcs
if isinstance(testcase_params['channel'],
int) and testcase_params['channel'] < 13:
self.dut.adb.shell('wl 2g_rate -v 0x2 -b {}'.format(
testcase_params['bandwidth']))
elif isinstance(testcase_params['channel'],
int) and testcase_params['channel'] > 13:
self.dut.adb.shell('wl 5g_rate -v 0x2 -b {}'.format(
testcase_params['bandwidth']))
else:
self.dut.adb.shell('wl 6g_rate -e 0 -s 2 -b {}'.format(
testcase_params['bandwidth']))
# Set sar state
self.dut.adb.shell('halutil -sar enable {}'.format(
testcase_params['sar_state']))
# Refresh link layer stats
llstats_obj.update_stats()
# Check sar state
self.log.info('Current Country: {}'.format(
self.dut.adb.shell('wl country')))
# Dump last est power multiple times
chain_0_power = []
chain_1_power = []
for idx in range(30):
last_est_out = self.dut.adb.shell(
"wl curpower | grep 'Last est. power'", ignore_status=True)
if "Last est. power" in last_est_out:
per_chain_powers = last_est_out.split(
':')[1].strip().split(' ')
per_chain_powers = [
float(power) for power in per_chain_powers
]
self.log.info(
'Current Tx Powers = {}'.format(per_chain_powers))
if per_chain_powers[0] > 0:
chain_0_power.append(per_chain_powers[0])
if per_chain_powers[1] > 0:
chain_1_power.append(per_chain_powers[1])
time.sleep(0.25)
# Check if empty
if len(chain_0_power) == 0 or len(chain_1_power) == 0:
test_result['tx_powers'] = [0, 0]
tx_power_frequency = [100, 100]
else:
test_result['tx_powers'] = [
scipy.stats.mode(chain_0_power).mode[0],
scipy.stats.mode(chain_1_power).mode[0]
]
tx_power_frequency = [
100 * scipy.stats.mode(chain_0_power).count[0] /
len(chain_0_power),
100 * scipy.stats.mode(chain_1_power).count[0] /
len(chain_0_power)
]
self.log.info(
'Filtered Tx Powers = {}. Frequency = [{:.0f}%, {:.0f}%]'.
format(test_result['tx_powers'], tx_power_frequency[0],
tx_power_frequency[1]))
llstats_obj.update_stats()
curr_llstats = llstats_obj.llstats_incremental.copy()
test_result['llstats'].append(curr_llstats)
# DUMP wl curpower one
try:
wl_curpower = self.dut.adb.shell('wl curpower')
except:
time.sleep(0.25)
wl_curpower = self.dut.adb.shell('wl curpower',
ignore_status=True)
current_context = context.get_current_context(
).get_full_output_path()
wl_curpower_path = os.path.join(current_context,
'wl_curpower_output')
with open(wl_curpower_path, 'w') as file:
file.write(wl_curpower)
wl_curpower_dict = self.process_wl_curpower(
wl_curpower_path, testcase_params)
wl_curpower_path = os.path.join(current_context,
'wl_curpower_dict')
with open(wl_curpower_path, 'w') as file:
json.dump(wputils.serialize_dict(wl_curpower_dict),
file,
indent=4)
test_result['wl_curpower'] = wl_curpower_dict
thread_future.result()
if self.testbed_params['sniffer_enable']:
self.sniffer.stop_capture()
return test_result
def setup_ap(self, testcase_params):
"""Sets up the access point in the configuration required by the test.
Args:
testcase_params: dict containing AP and other test params
"""
band = self.access_point.band_lookup_by_channel(
testcase_params['channel'])
if '6G' in band:
frequency = wutils.WifiEnums.channel_6G_to_freq[int(
testcase_params['channel'].strip('6g'))]
else:
if testcase_params['channel'] < 13:
frequency = wutils.WifiEnums.channel_2G_to_freq[
testcase_params['channel']]
else:
frequency = wutils.WifiEnums.channel_5G_to_freq[
testcase_params['channel']]
if frequency in wutils.WifiEnums.DFS_5G_FREQUENCIES:
self.access_point.set_region(self.testbed_params['DFS_region'])
else:
self.access_point.set_region(self.testbed_params['default_region'])
self.access_point.set_channel(band, testcase_params['channel'])
self.access_point.set_bandwidth(band, testcase_params['mode'])
if 'low' in testcase_params['ap_power']:
self.log.info('Setting low AP power.')
self.access_point.set_power(
band, self.testclass_params['low_ap_tx_power'])
self.log.info('Access Point Configuration: {}'.format(
self.access_point.ap_settings))
def setup_dut(self, testcase_params):
"""Sets up the DUT in the configuration required by the test.
Args:
testcase_params: dict containing AP and other test params
"""
# Turn screen off to preserve battery
if self.testbed_params.get('screen_on',
False) or self.testclass_params.get(
'screen_on', False):
self.dut.droid.wakeLockAcquireDim()
else:
self.dut.go_to_sleep()
if wputils.validate_network(self.dut,
testcase_params['test_network']['SSID']):
current_country = self.dut.adb.shell('wl country')
self.log.info('Current country code: {}'.format(current_country))
if testcase_params['country_code'] in current_country:
self.log.info('Already connected to desired network')
self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses(
'wlan0')[0]
return
testcase_params['test_network']['channel'] = testcase_params['channel']
wutils.wifi_toggle_state(self.dut, False)
wutils.set_wifi_country_code(self.dut, testcase_params['country_code'])
wutils.wifi_toggle_state(self.dut, True)
wutils.reset_wifi(self.dut)
if self.testbed_params.get('txbf_off', False):
wputils.disable_beamforming(self.dut)
wutils.set_wifi_country_code(self.dut, testcase_params['country_code'])
wutils.wifi_connect(self.dut,
testcase_params['test_network'],
num_of_tries=1,
check_connectivity=True)
self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0]
def setup_tx_power_test(self, testcase_params):
"""Function that gets devices ready for the test.
Args:
testcase_params: dict containing test-specific parameters
"""
# Configure AP
self.setup_ap(testcase_params)
# Set attenuator to 0 dB
for attenuator in self.attenuators:
attenuator.set_atten(0, strict=False, retry=True)
# Reset, configure, and connect DUT
self.setup_dut(testcase_params)
def check_skip_conditions(self, testcase_params):
"""Checks if test should be skipped."""
# Check battery level before test
if not wputils.health_check(self.dut, 10):
asserts.skip('DUT battery level too low.')
if testcase_params[
'channel'] in wputils.CHANNELS_6GHz and not self.dut.droid.is6GhzBandSupported(
):
asserts.skip('DUT does not support 6 GHz band.')
if not self.access_point.band_lookup_by_channel(
testcase_params['channel']):
asserts.skip('AP does not support requested channel.')
def compile_test_params(self, testcase_params):
"""Function to compile all testcase parameters."""
self.check_skip_conditions(testcase_params)
band = self.access_point.band_lookup_by_channel(
testcase_params['channel'])
testcase_params['test_network'] = self.main_network[band]
testcase_params['attenuated_chain'] = -1
testcase_params.update(
ping_interval=self.testclass_params['ping_interval'],
ping_duration=self.testclass_params['ping_duration'],
ping_size=self.testclass_params['ping_size'],
)
testcase_params['atten_range'] = [0]
return testcase_params
def _test_ping(self, testcase_params):
""" Function that gets called for each range test case
The function gets called in each range test case. It customizes the
range test based on the test name of the test that called it
Args:
testcase_params: dict containing preliminary set of parameters
"""
# Compile test parameters from config and test name
testcase_params = self.compile_test_params(testcase_params)
# Run ping test
self.setup_tx_power_test(testcase_params)
result = self.run_tx_power_test(testcase_params)
self.pass_fail_check(result)
def generate_test_cases(self, ap_power, channels, modes, test_types,
country_codes, sar_states):
"""Function that auto-generates test cases for a test class."""
test_cases = []
allowed_configs = {
20: [
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 36, 40, 44, 48, 52, 64, 100,
116, 132, 140, 149, 153, 157, 161
],
40: [36, 44, 100, 149, 157],
80: [36, 100, 149],
160: [36, '6g37', '6g117', '6g213']
}
for channel, mode, test_type, country_code, sar_state in itertools.product(
channels, modes, test_types, country_codes, sar_states):
bandwidth = int(''.join([x for x in mode if x.isdigit()]))
if channel not in allowed_configs[bandwidth]:
continue
testcase_name = '{}_ch{}_{}_{}_sar_{}'.format(
test_type, channel, mode, country_code, sar_state)
testcase_params = collections.OrderedDict(
test_type=test_type,
ap_power=ap_power,
channel=channel,
mode=mode,
bandwidth=bandwidth,
country_code=country_code,
sar_state=sar_state)
setattr(self, testcase_name,
partial(self._test_ping, testcase_params))
test_cases.append(testcase_name)
return test_cases