Fix error: "host has no public ip address" for iperf server. am: 7a370a6528
Original change: https://android-review.googlesource.com/c/platform/tools/test/connectivity/+/3188799
Change-Id: I31065a56bcf443e3c818729ecba2bdcb44acfc43
Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
diff --git a/acts_tests/acts_contrib/test_utils/cellular/keysight_5g_testapp.py b/acts_tests/acts_contrib/test_utils/cellular/keysight_5g_testapp.py
index 967cace..430f396 100644
--- a/acts_tests/acts_contrib/test_utils/cellular/keysight_5g_testapp.py
+++ b/acts_tests/acts_contrib/test_utils/cellular/keysight_5g_testapp.py
@@ -179,6 +179,27 @@
return inner
+ ### Configure Cells
+ def skip_config_if_none_decorator(func):
+ "Decorator function that skips the config function if any args are none"
+
+ def inner(self, *args, **kwargs):
+ none_arg = False
+ for arg in args:
+ if arg is None:
+ none_arg = True
+ for key, value in kwargs.items():
+ if value is None:
+ none_arg = True
+ if none_arg:
+ self.log.warning(
+ 'Skipping {}. Received incomplete arguments.'.format(
+ func.__name__))
+ return
+ return (func(self, *args, **kwargs))
+
+ return inner
+
def assert_cell_off(self, cell_type, cell):
cell_state = self.get_cell_state(cell_type, cell)
if cell_state:
@@ -434,10 +455,46 @@
cell: cell/carrier number
transmission_mode: one of TM1, TM2, TM3, TM4 ...
"""
+
self.assert_cell_off('LTE', cell)
self.send_cmd('BSE:CONFig:LTE:{}:RRC:TMODe {}'.format(
Keysight5GTestApp._format_cells(cell), transmission_mode))
+ @skip_config_if_none_decorator
+ def set_lte_cell_num_layers(self, cell, num_layers):
+ """Function to set LTE cell number of layers."""
+
+ self.assert_cell_off('LTE', cell)
+ self.send_cmd('BSE:CONFig:LTE:{}:PHY:DL:NUMLayers {}'.format(
+ Keysight5GTestApp._format_cells(cell), num_layers))
+
+ @skip_config_if_none_decorator
+ def set_lte_cell_num_codewords(self, cell, num_codewords):
+ """Function to set LTE number of codewords."""
+
+ self.assert_cell_off('LTE', cell)
+ self.send_cmd('BSE:CONFig:LTE:{}:PHY:DL:NUMCodewords {}'.format(
+ Keysight5GTestApp._format_cells(cell), num_codewords))
+
+ @skip_config_if_none_decorator
+ def set_lte_cell_dl_subframe_allocation(self,
+ cell,
+ dl_subframe_allocation=[1] * 10):
+ """Function to set LTE downlink subrframe allocation.
+
+ Args:
+ cell: cell/carrier number
+ dl_subframe_allocation: string or bool list indicating allocation
+ (1 enabled, 0 disabled)
+ """
+ if isinstance(dl_subframe_allocation, list):
+ dl_subframe_allocation = str(dl_subframe_allocation)[1:-1].replace(
+ '\'', '')
+ self.assert_cell_off('LTE', cell)
+ self.send_cmd(
+ 'BSE:CONFig:LTE:{}:PHY:DL:SFRame:ALLocation:ALL {}'.format(
+ Keysight5GTestApp._format_cells(cell), dl_subframe_allocation))
+
def set_cell_dl_power(self, cell_type, cell, power, full_bw):
"""Function to set cell power
@@ -478,6 +535,7 @@
'BSE:CONFig:{}:{}:UL:CLPControl:TARGet:POWer:PUSCH {}'.format(
cell_type, Keysight5GTestApp._format_cells(cell),
target_power))
+ self.send_cmd('BSE:CONFig:{}:APPLY'.format(cell_type))
def set_cell_input_power(self, cell_type, cell, power):
"""Function to set cell input power
@@ -496,11 +554,9 @@
self.send_cmd('BSE:CONFig:{}:{}:MANual:POWer {}'.format(
cell_type, Keysight5GTestApp._format_cells(cell), power))
if power == "AUTO" and cell_type == "NR5G":
- self.send_cmd('BSE:CONFig:{}:UL:EIP:AUTO ON'.format(
- cell_type))
+ self.send_cmd('BSE:CONFig:{}:UL:EIP:AUTO ON'.format(cell_type))
elif cell_type == "NR5G":
- self.send_cmd('BSE:CONFig:{}:UL:EIP:AUTO OFF'.format(
- cell_type))
+ self.send_cmd('BSE:CONFig:{}:UL:EIP:AUTO OFF'.format(cell_type))
self.send_cmd('BSE:CONFig:{}:{}:MANual:POWer {}'.format(
cell_type, Keysight5GTestApp._format_cells(cell), power))
self.send_cmd('BSE:CONFig:{}:APPLY'.format(cell_type))
@@ -561,6 +617,25 @@
Keysight5GTestApp._format_cells(cell), slot_ratio))
self.send_cmd('BSE:CONFig:NR5G:SCHeduling:QCONFig:APPLy:ALL')
+ def set_nr_schedule_tdd_pattern(self, cell, tdd_pattern):
+ """Function to set NR schedule to one of predefince quick configs.
+
+ Args:
+ cell: cell number to address. schedule will apply to all cells
+ tdd_pattern: 0 for disabled, 1/enabled, or current
+ """
+ tdd_pattern_mapping = {
+ 0: 'DISabled',
+ 1: 'ENABled',
+ 'current': 'CURRent'
+ }
+ self.assert_cell_off('NR5G', cell)
+ self.send_cmd(
+ 'BSE:CONFig:NR5G:{}:SCHeduling:QCONFig:TDD:PATTern {}'.format(
+ Keysight5GTestApp._format_cells(cell),
+ tdd_pattern_mapping[tdd_pattern]))
+ self.send_cmd('BSE:CONFig:NR5G:SCHeduling:QCONFig:APPLy:ALL')
+
def set_nr_cell_mcs(self, cell, dl_mcs, ul_mcs):
"""Function to set NR cell DL & UL MCS
@@ -716,14 +791,26 @@
def configure_channel_emulator(self, cell_type, cell, fading_model):
if cell_type == 'LTE':
- self.send_cmd('BSE:CONFig:{}:{}:CMODel {}'.format(cell_type, Keysight5GTestApp._format_cells(cell), fading_model['channel_model']))
- self.send_cmd('BSE:CONFig:{}:{}:CMATrix {}'.format(cell_type, Keysight5GTestApp._format_cells(cell), fading_model['correlation_matrix']))
- self.send_cmd('BSE:CONFig:{}:{}:MDSHift {}'.format(cell_type, Keysight5GTestApp._format_cells(cell), fading_model['max_doppler']))
+ self.send_cmd('BSE:CONFig:{}:{}:CMODel {}'.format(
+ cell_type, Keysight5GTestApp._format_cells(cell),
+ fading_model['channel_model']))
+ self.send_cmd('BSE:CONFig:{}:{}:CMATrix {}'.format(
+ cell_type, Keysight5GTestApp._format_cells(cell),
+ fading_model['correlation_matrix']))
+ self.send_cmd('BSE:CONFig:{}:{}:MDSHift {}'.format(
+ cell_type, Keysight5GTestApp._format_cells(cell),
+ fading_model['max_doppler']))
elif cell_type == 'NR5G':
#TODO: check that this is FR1
- self.send_cmd('BSE:CONFig:{}:{}:FRANge1:CMODel {}'.format(cell_type, Keysight5GTestApp._format_cells(cell), fading_model['channel_model']))
- self.send_cmd('BSE:CONFig:{}:{}:FRANge1:CMATrix {}'.format(cell_type, Keysight5GTestApp._format_cells(cell), fading_model['correlation_matrix']))
- self.send_cmd('BSE:CONFig:{}:{}:FRANge1:MDSHift {}'.format(cell_type, Keysight5GTestApp._format_cells(cell), fading_model['max_doppler']))
+ self.send_cmd('BSE:CONFig:{}:{}:FRANge1:CMODel {}'.format(
+ cell_type, Keysight5GTestApp._format_cells(cell),
+ fading_model['channel_model']))
+ self.send_cmd('BSE:CONFig:{}:{}:FRANge1:CMATrix {}'.format(
+ cell_type, Keysight5GTestApp._format_cells(cell),
+ fading_model['correlation_matrix']))
+ self.send_cmd('BSE:CONFig:{}:{}:FRANge1:MDSHift {}'.format(
+ cell_type, Keysight5GTestApp._format_cells(cell),
+ fading_model['max_doppler']))
def set_channel_emulator_state(self, state):
self.send_cmd('BSE:CONFig:FADing:ENABle {}'.format(int(state)))
@@ -743,14 +830,13 @@
if not self.wait_for_cell_status('LTE', 'CELL1', 'CONN', 60):
raise RuntimeError('LTE must be connected to start aggregation.')
# Continue if LTE connected
- self.send_cmd(
- 'BSE:CONFig:LTE:CELL1:CAGGregation:AGGRegate:NRCC:APPly', 0, 0)
+ self.send_cmd('BSE:CONFig:LTE:CELL1:CAGGregation:AGGRegate:NRCC:APPly',
+ 0, 0)
time.sleep(MEDIUM_SLEEP)
error = self.check_error()
if error:
acts_asserts.fail('Failed to apply NR carrier aggregation.')
-
def get_ip_throughput(self, cell_type):
"""Function to query IP layer throughput on LTE or NR
@@ -774,8 +860,7 @@
if cell_type == 'LTE':
tput_response = self.send_cmd(
'BSE:MEASure:LTE:BTHRoughput:{}:THRoughput:OTA:{}?'.format(
- link,
- Keysight5GTestApp._format_cells(cell)), 1)
+ link, Keysight5GTestApp._format_cells(cell)), 1)
elif cell_type == 'NR5G':
# Tester reply format
#progress-count, ack-count, ack-ratio, nack-count, nack-ratio, statdtx-count, statdtx-ratio, pdschBlerCount, pdschBlerRatio, pdschTputRatio.
@@ -812,11 +897,13 @@
tput_result = collections.OrderedDict()
for cell in dl_cells:
tput_result.setdefault(cell, {})
- tput_result[cell]['DL'] = self._get_throughput(cell_type, 'DL', cell)
+ tput_result[cell]['DL'] = self._get_throughput(
+ cell_type, 'DL', cell)
frame_count = tput_result[cell]['DL']['frame_count']
for cell in ul_cells:
tput_result.setdefault(cell, {})
- tput_result[cell]['UL'] = self._get_throughput(cell_type, 'UL', cell)
+ tput_result[cell]['UL'] = self._get_throughput(
+ cell_type, 'UL', cell)
agg_tput = {
'DL': {
'frame_count': frame_count,
@@ -900,18 +987,30 @@
bler_response = self.send_cmd(
'BSE:MEASure:LTE:BTHRoughput:{}:BLER:{}?'.format(
link, Keysight5GTestApp._format_cells(cell)), 1)
- bler_items = ['frame_count','ack_count','ack_ratio','nack_count','nack_ratio',
- 'statDtx_count','statDtx_ratio','nackStatDtx_count','nackStatDtx_ratio',
- 'pdschBler_count','pdschBler_ratio','any_count','any_ratio']
- bler_result = {bler_items[x] : bler_response[x] for x in range(len(bler_response))}
+ bler_items = [
+ 'frame_count', 'ack_count', 'ack_ratio', 'nack_count',
+ 'nack_ratio', 'statDtx_count', 'statDtx_ratio',
+ 'nackStatDtx_count', 'nackStatDtx_ratio', 'pdschBler_count',
+ 'pdschBler_ratio', 'any_count', 'any_ratio'
+ ]
+ bler_result = {
+ bler_items[x]: bler_response[x]
+ for x in range(len(bler_response))
+ }
elif cell_type == 'NR5G':
bler_response = self.send_cmd(
'BSE:MEASure:NR5G:BTHRoughput:{}:BLER:{}?'.format(
link, Keysight5GTestApp._format_cells(cell)), 1)
- bler_items = ['frame_count','ack_count','ack_ratio','nack_count','nack_ratio',
- 'statDtx_count','statDtx_ratio', 'pdschBler_count','pdschBler_ratio','pdschTputRatio']
+ bler_items = [
+ 'frame_count', 'ack_count', 'ack_ratio', 'nack_count',
+ 'nack_ratio', 'statDtx_count', 'statDtx_ratio',
+ 'pdschBler_count', 'pdschBler_ratio', 'pdschTputRatio'
+ ]
- bler_result = {bler_items[x]: bler_response[x] for x in range(len(bler_response))}
+ bler_result = {
+ bler_items[x]: bler_response[x]
+ for x in range(len(bler_response))
+ }
return bler_result
def get_bler_result(self,
@@ -1048,3 +1147,40 @@
self.send_cmd(
'BSE:MEASure:NR5G:{}:L1:RSRPower:REPorts:JSON?'.format(
Keysight5GTestApp._format_cells(cell)), 1))
+
+ def release_rrc_connection(self, cell_type, cell):
+ if cell_type == 'LTE':
+ self.send_cmd('BSE:FUNCtion:LTE:{}:RELease:SEND'.format(
+ Keysight5GTestApp._format_cells(cell)))
+ elif cell_type == 'NR5G':
+ self.send_cmd(
+ 'BSE:CONFig:NR5G:{}:RCONtrol:RRC:STARt RRELease'.format(
+ Keysight5GTestApp._format_cells(cell)))
+
+ def send_rrc_paging(self, cell_type, cell):
+ if cell_type == 'LTE':
+ self.send_cmd('BSE:FUNCtion:LTE:{}:PAGing:PAGE'.format(
+ Keysight5GTestApp._format_cells(cell)))
+ elif cell_type == 'NR5G':
+ self.send_cmd(
+ 'BSE:CONFig:NR5G:{}:RCONtrol:RRC:STARt PAGing'.format(
+ Keysight5GTestApp._format_cells(cell)))
+
+ def enable_rach(self, cell_type, cell, enabled):
+ self.send_cmd('BSE:CONFig:{}:{}:MAC:RACH:IGNore {}'.format(
+ cell_type, Keysight5GTestApp._format_cells(cell), int(enabled)))
+ self.send_cmd('BSE:CONFig:{}:APPLY'.format(cell_type))
+
+ def enable_preamble_report(self, cell_type, enable):
+ self.send_cmd('BSE:CONFig:{}:PReamble:REPort:STATe {}'.format(
+ cell_type, int(enable)))
+
+ def fetch_preamble_report(self, cell_type, cell, num_reports=10):
+ report = self.send_cmd(
+ 'BSE:CONFig:{}:{}:PReamble:REPort:FETCh:JSON? {}'.format(
+ cell_type, Keysight5GTestApp._format_cells(cell), num_reports),
+ read_response=1)
+ self.send_cmd('BSE:CONFig:{}:PReamble:REPort:CLEAr'.format(cell_type))
+ if 'No Data' in report:
+ report = None
+ return report
diff --git a/acts_tests/acts_contrib/test_utils/cellular/performance/CellularThroughputBaseTest.py b/acts_tests/acts_contrib/test_utils/cellular/performance/CellularThroughputBaseTest.py
index 2ebcb25..231c00c 100644
--- a/acts_tests/acts_contrib/test_utils/cellular/performance/CellularThroughputBaseTest.py
+++ b/acts_tests/acts_contrib/test_utils/cellular/performance/CellularThroughputBaseTest.py
@@ -29,13 +29,13 @@
from acts import utils
from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger
from acts.controllers.utils_lib import ssh
-from acts.controllers.android_lib.tel import tel_utils
from acts.controllers import iperf_server as ipf
+from acts.controllers import power_monitor as power_monitor_lib
from acts_contrib.test_utils.cellular.keysight_5g_testapp import Keysight5GTestApp
from acts_contrib.test_utils.cellular.keysight_chamber import KeysightChamber
from acts_contrib.test_utils.cellular.performance import cellular_performance_test_utils as cputils
from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wputils
-from functools import partial
+from acts_contrib.test_utils.power import plot_utils as power_plot_utils
LONG_SLEEP = 10
MEDIUM_SLEEP = 2
@@ -44,6 +44,25 @@
VERY_SHORT_SLEEP = 0.1
SUBFRAME_LENGTH = 0.001
STOP_COUNTER_LIMIT = 3
+RESET_BATTERY_STATS = 'dumpsys batterystats --reset'
+DEFAULT_MONSOON_FREQUENCY = 500
+PHONE_BATTERY_VOLTAGE_DEFAULT = 4
+
+from functools import wraps
+import logging
+
+
+def suspend_logging(func):
+
+ @wraps(func)
+ def inner(*args, **kwargs):
+ logging.disable(logging.FATAL)
+ try:
+ return func(*args, **kwargs)
+ finally:
+ logging.disable(logging.NOTSET)
+
+ return inner
class CellularThroughputBaseTest(base_test.BaseTestClass):
@@ -60,7 +79,7 @@
self.testclass_metric_logger = (
BlackboxMappedMetricLogger.for_test_class())
self.publish_testcase_metrics = True
- self.testclass_params = None
+ self.testclass_params = {}
def setup_class(self):
"""Initializes common test hardware and parameters.
@@ -70,19 +89,22 @@
"""
# Setup controllers
self.dut = self.android_devices[-1]
+ self.dut_utils = cputils.DeviceUtils(self.dut, self.log)
self.keysight_test_app = Keysight5GTestApp(
self.user_params['Keysight5GTestApp'])
if 'KeysightChamber' in self.user_params:
self.keysight_chamber = KeysightChamber(
self.user_params['KeysightChamber'])
- self.iperf_server = self.iperf_servers[0]
- self.iperf_client = self.iperf_clients[0]
self.remote_server = ssh.connection.SshConnection(
ssh.settings.from_config(
self.user_params['RemoteServer']['ssh_config']))
+ self.unpack_userparams(MonsoonParams=None,
+ bits_root_rail_csv_export=False)
+ self.power_monitor = self.initialize_power_monitor()
+
# Configure Tester
- if self.testclass_params.get('reload_scpi', 1):
+ if self.testclass_params.get('reload_scpi', 0):
self.keysight_test_app.import_scpi_file(
self.testclass_params['scpi_file'])
@@ -93,16 +115,15 @@
self.user_params['retry_tests'] = [self.__class__.__name__]
# Turn Airplane mode on
- #asserts.assert_true(utils.force_airplane_mode(self.dut, True),
- # 'Can not turn on airplane mode.')
- tel_utils.toggle_airplane_mode(self.log, self.dut, True)
+ self.dut_utils.toggle_airplane_mode(True, False)
def teardown_class(self):
+ if self.power_monitor:
+ self.power_monitor.connect_usb()
+ self.dut.wait_for_boot_completion()
self.log.info('Turning airplane mode on')
try:
- #asserts.assert_true(utils.force_airplane_mode(self.dut, True),
- # 'Can not turn on airplane mode.')
- tel_utils.toggle_airplane_mode(self.log, self.dut, True)
+ self.dut_utils.toggle_airplane_mode(True, False)
except:
self.log.warning('Cannot perform teardown operations on DUT.')
try:
@@ -114,22 +135,22 @@
def setup_test(self):
self.retry_flag = False
- if self.testclass_params['enable_pixel_logs']:
- cputils.start_pixel_logger(self.dut)
+ if self.testclass_params.get('enable_pixel_logs', 0):
+ self.dut_utils.start_pixel_logger()
def teardown_test(self):
+ if self.power_monitor:
+ self.power_monitor.connect_usb()
self.retry_flag = False
self.log.info('Turing airplane mode on')
- #asserts.assert_true(utils.force_airplane_mode(self.dut, True),
- # 'Can not turn on airplane mode.')
- tel_utils.toggle_airplane_mode(self.log, self.dut, True)
+ self.dut_utils.toggle_airplane_mode(True, False)
self.log.info('Turning all cells off.')
self.keysight_test_app.turn_all_cells_off()
log_path = os.path.join(
context.get_current_context().get_full_output_path(), 'pixel_logs')
os.makedirs(self.log_path, exist_ok=True)
- if self.testclass_params['enable_pixel_logs']:
- cputils.stop_pixel_logger(self.dut, log_path)
+ if self.testclass_params.get('enable_pixel_logs', 0):
+ self.dut_utils.stop_pixel_logger(log_path)
self.process_testcase_results()
self.pass_fail_check()
@@ -141,14 +162,35 @@
and sets a retry_flag to enable further tweaking the test logic on
second attempts.
"""
- #asserts.assert_true(utils.force_airplane_mode(self.dut, True),
- # 'Can not turn on airplane mode.')
- tel_utils.toggle_airplane_mode(self.log, self.dut, True)
+ self.dut_utils.toggle_airplane_mode(True, False)
if self.keysight_test_app.get_cell_state('LTE', 'CELL1'):
self.log.info('Turning LTE off.')
self.keysight_test_app.set_cell_state('LTE', 'CELL1', 0)
self.retry_flag = True
+ def initialize_power_monitor(self):
+ """ Initializes the power monitor object.
+
+ Raises an exception if there are no controllers available.
+ """
+
+ device_time = self.dut.adb.shell('echo $EPOCHREALTIME')
+ host_time = time.time()
+ self.log.debug('device start time %s, host start time %s', device_time,
+ host_time)
+ self.device_to_host_offset = float(device_time) - host_time
+ if hasattr(self, 'bitses'):
+ power_monitor = self.bitses[0]
+ power_monitor.setup(registry=self.user_params)
+ elif hasattr(self, 'monsoons'):
+ power_monitor = power_monitor_lib.PowerMonitorMonsoonFacade(
+ self.monsoons[0])
+ self.monsoons[0].set_max_current(self.MonsoonParams['current'])
+ self.monsoons[0].set_voltage(self.MonsoonParams['voltage'])
+ else:
+ power_monitor = None
+ return power_monitor
+
def pass_fail_check(self):
pass
@@ -188,7 +230,7 @@
'udp_socket_size', None)
testcase_params['iperf_processes'] = self.testclass_params.get(
'udp_processes', 1)
- adb_iperf_server = isinstance(self.iperf_server,
+ adb_iperf_server = isinstance(self.iperf_servers[0],
ipf.IPerfServerOverAdb)
if testcase_params['traffic_direction'] == 'DL':
reverse_direction = 0 if adb_iperf_server else 1
@@ -211,20 +253,20 @@
return testcase_params
def run_iperf_traffic(self, testcase_params):
- self.iperf_server.start(tag=0)
+ self.iperf_servers[0].start(tag=0)
dut_ip = self.dut.droid.connectivityGetIPv4Addresses('rmnet0')[0]
if 'iperf_server_address' in self.testclass_params:
iperf_server_address = self.testclass_params[
'iperf_server_address']
- elif isinstance(self.iperf_server, ipf.IPerfServerOverAdb):
+ elif isinstance(self.iperf_servers[0], ipf.IPerfServerOverAdb):
iperf_server_address = dut_ip
else:
iperf_server_address = wputils.get_server_address(
self.remote_server, dut_ip, '255.255.255.0')
- client_output_path = self.iperf_client.start(
+ client_output_path = self.iperf_clients[0].start(
iperf_server_address, testcase_params['iperf_args'], 0,
self.testclass_params['traffic_duration'] + IPERF_TIMEOUT)
- server_output_path = self.iperf_server.stop()
+ server_output_path = self.iperf_servers[0].stop()
# Parse and log result
if testcase_params['use_client_output']:
iperf_file = client_output_path
@@ -241,6 +283,45 @@
current_throughput = 0
return current_throughput
+ def start_single_throughput_measurement(self, testcase_params):
+ self.log.info('Starting BLER & throughput tests.')
+ if testcase_params['endc_combo_config']['nr_cell_count']:
+ self.keysight_test_app.start_bler_measurement(
+ 'NR5G', testcase_params['endc_combo_config']['nr_dl_carriers'],
+ testcase_params['bler_measurement_length'])
+ if testcase_params['endc_combo_config']['lte_cell_count']:
+ self.keysight_test_app.start_bler_measurement(
+ 'LTE',
+ testcase_params['endc_combo_config']['lte_dl_carriers'][0],
+ testcase_params['bler_measurement_length'])
+ if self.testclass_params['traffic_type'] != 'PHY':
+ #TODO: get iperf to run in non-blocking mode
+ self.log.warning(
+ 'iperf traffic not currently supported with power measurement')
+
+ def stop_single_throughput_measurement(self, testcase_params):
+ result = collections.OrderedDict()
+ if testcase_params['endc_combo_config']['nr_cell_count']:
+ result['nr_bler_result'] = self.keysight_test_app.get_bler_result(
+ 'NR5G', testcase_params['endc_combo_config']['nr_dl_carriers'],
+ testcase_params['endc_combo_config']['nr_ul_carriers'],
+ testcase_params['bler_measurement_length'])
+ result['nr_tput_result'] = self.keysight_test_app.get_throughput(
+ 'NR5G', testcase_params['endc_combo_config']['nr_dl_carriers'],
+ testcase_params['endc_combo_config']['nr_ul_carriers'])
+ if testcase_params['endc_combo_config']['lte_cell_count']:
+ result['lte_bler_result'] = self.keysight_test_app.get_bler_result(
+ cell_type='LTE',
+ dl_cells=testcase_params['endc_combo_config']
+ ['lte_dl_carriers'],
+ ul_cells=testcase_params['endc_combo_config']
+ ['lte_ul_carriers'],
+ length=testcase_params['bler_measurement_length'])
+ result['lte_tput_result'] = self.keysight_test_app.get_throughput(
+ 'LTE', testcase_params['endc_combo_config']['lte_dl_carriers'],
+ testcase_params['endc_combo_config']['lte_ul_carriers'])
+ return result
+
def run_single_throughput_measurement(self, testcase_params):
result = collections.OrderedDict()
self.log.info('Starting BLER & throughput tests.')
@@ -250,7 +331,8 @@
testcase_params['bler_measurement_length'])
if testcase_params['endc_combo_config']['lte_cell_count']:
self.keysight_test_app.start_bler_measurement(
- 'LTE', testcase_params['endc_combo_config']['lte_dl_carriers'][0],
+ 'LTE',
+ testcase_params['endc_combo_config']['lte_dl_carriers'][0],
testcase_params['bler_measurement_length'])
if self.testclass_params['traffic_type'] != 'PHY':
@@ -267,14 +349,95 @@
testcase_params['endc_combo_config']['nr_ul_carriers'])
if testcase_params['endc_combo_config']['lte_cell_count']:
result['lte_bler_result'] = self.keysight_test_app.get_bler_result(
- cell_type='LTE', dl_cells=testcase_params['endc_combo_config']['lte_dl_carriers'],
- ul_cells=testcase_params['endc_combo_config']['lte_ul_carriers'],
+ cell_type='LTE',
+ dl_cells=testcase_params['endc_combo_config']
+ ['lte_dl_carriers'],
+ ul_cells=testcase_params['endc_combo_config']
+ ['lte_ul_carriers'],
length=testcase_params['bler_measurement_length'])
result['lte_tput_result'] = self.keysight_test_app.get_throughput(
'LTE', testcase_params['endc_combo_config']['lte_dl_carriers'],
testcase_params['endc_combo_config']['lte_ul_carriers'])
return result
+ @suspend_logging
+ def meausre_power_silently(self, measurement_time, measurement_wait,
+ data_path):
+ measurement_args = dict(duration=measurement_time,
+ measure_after_seconds=measurement_wait,
+ hz=self.MonsoonParams['frequency'])
+
+ self.power_monitor.measure(measurement_args=measurement_args,
+ measurement_name=self.test_name,
+ start_time=self.device_to_host_offset,
+ monsoon_output_path=data_path)
+
+ def collect_power_data(self,
+ measurement_time,
+ measurement_wait,
+ reconnect_usb=0,
+ measurement_tag=0):
+ """Measure power, plot and take log if needed.
+
+ Returns:
+ A MonsoonResult object.
+ measurement_time: length of power measurement
+ measurement_wait: wait before measurement(within monsoon controller)
+ measurement_tag: tag to append to file names
+ """
+ if self.dut.is_connected():
+ self.dut_utils.stop_services()
+ time.sleep(SHORT_SLEEP)
+ self.dut_utils.log_odpm(
+ os.path.join(
+ context.get_current_context().get_full_output_path(),
+ '{}.txt'.format('before')))
+ self.power_monitor.disconnect_usb()
+ else:
+ self.log.info('DUT already disconnected. Skipping USB operations.')
+
+ self.log.info('Starting power measurement. Duration: {}s. Offset: '
+ '{}s. Voltage: {} V.'.format(
+ measurement_time, measurement_wait,
+ self.MonsoonParams['voltage']))
+ # Collecting current measurement data and plot
+ tag = '{}_{}'.format(self.test_name, measurement_tag)
+ data_path = os.path.join(
+ context.get_current_context().get_full_output_path(),
+ '{}.txt'.format(tag))
+ self.meausre_power_silently(measurement_time, measurement_wait,
+ data_path)
+ self.power_monitor.release_resources()
+ if hasattr(self, 'bitses') and self.bits_root_rail_csv_export:
+ path = os.path.join(
+ context.get_current_context().get_full_output_path(), 'Kibble')
+ self.power_monitor.get_bits_root_rail_csv_export(
+ path, self.test_name)
+
+ if reconnect_usb:
+ self.log.info('Reconnecting USB.')
+ self.power_monitor.connect_usb()
+ self.dut.wait_for_boot_completion()
+ # Save ODPM if applicable
+ self.dut_utils.log_odpm(
+ os.path.join(
+ context.get_current_context().get_full_output_path(),
+ '{}.txt'.format('after')))
+ # Restart Sl4a and other services
+ self.dut_utils.start_services()
+
+ samples = self.power_monitor.get_waveform(file_path=data_path)
+
+ current = [sample[1] for sample in samples]
+ average_current = sum(current) * 1000 / len(current)
+ self.log.info('Average current computed: {}'.format(average_current))
+ plot_title = '{}_{}'.format(self.test_name, measurement_tag)
+ power_plot_utils.current_waveform_plot(
+ samples, self.MonsoonParams['voltage'],
+ context.get_current_context().get_full_output_path(), plot_title)
+
+ return average_current
+
def print_throughput_result(self, result):
# Print Test Summary
if 'nr_tput_result' in result:
@@ -285,16 +448,20 @@
result['nr_tput_result']['total']['DL']['min_tput'],
result['nr_tput_result']['total']['DL']['average_tput'],
result['nr_tput_result']['total']['DL']['max_tput'],
- result['nr_tput_result']['total']['DL']['theoretical_tput'],
- result['nr_bler_result']['total']['DL']['nack_ratio'] * 100))
+ result['nr_tput_result']['total']['DL']
+ ['theoretical_tput'],
+ result['nr_bler_result']['total']['DL']['nack_ratio'] *
+ 100))
self.log.info(
"NR5G UL PHY Tput (Mbps) (Min/Avg/Max/Th): {:.2f} / {:.2f} / {:.2f} / {:.2f}\tBLER: {:.2f}"
.format(
result['nr_tput_result']['total']['UL']['min_tput'],
result['nr_tput_result']['total']['UL']['average_tput'],
result['nr_tput_result']['total']['UL']['max_tput'],
- result['nr_tput_result']['total']['UL']['theoretical_tput'],
- result['nr_bler_result']['total']['UL']['nack_ratio'] * 100))
+ result['nr_tput_result']['total']['UL']
+ ['theoretical_tput'],
+ result['nr_bler_result']['total']['UL']['nack_ratio'] *
+ 100))
if 'lte_tput_result' in result:
self.log.info(
"LTE DL PHY Tput (Mbps) (Min/Avg/Max/Th): {:.2f} / {:.2f} / {:.2f} / {:.2f}\tBLER: {:.2f}"
@@ -302,17 +469,22 @@
result['lte_tput_result']['total']['DL']['min_tput'],
result['lte_tput_result']['total']['DL']['average_tput'],
result['lte_tput_result']['total']['DL']['max_tput'],
- result['lte_tput_result']['total']['DL']['theoretical_tput'],
- result['lte_bler_result']['total']['DL']['nack_ratio'] * 100))
+ result['lte_tput_result']['total']['DL']
+ ['theoretical_tput'],
+ result['lte_bler_result']['total']['DL']['nack_ratio'] *
+ 100))
if self.testclass_params['lte_ul_mac_padding']:
self.log.info(
"LTE UL PHY Tput (Mbps) (Min/Avg/Max/Th): {:.2f} / {:.2f} / {:.2f} / {:.2f}\tBLER: {:.2f}"
.format(
result['lte_tput_result']['total']['UL']['min_tput'],
- result['lte_tput_result']['total']['UL']['average_tput'],
+ result['lte_tput_result']['total']['UL']
+ ['average_tput'],
result['lte_tput_result']['total']['UL']['max_tput'],
- result['lte_tput_result']['total']['UL']['theoretical_tput'],
- result['lte_bler_result']['total']['UL']['nack_ratio'] * 100))
+ result['lte_tput_result']['total']['UL']
+ ['theoretical_tput'],
+ result['lte_bler_result']['total']['UL']['nack_ratio']
+ * 100))
if self.testclass_params['traffic_type'] != 'PHY':
self.log.info("{} Tput: {:.2f} Mbps".format(
self.testclass_params['traffic_type'],
@@ -320,6 +492,7 @@
def setup_tester(self, testcase_params):
# Configure all cells
+ self.keysight_test_app.toggle_contiguous_nr_channels(0)
for cell_idx, cell in enumerate(
testcase_params['endc_combo_config']['cell_list']):
if cell['cell_type'] == 'NR5G':
@@ -336,15 +509,14 @@
testcase_params['cell_power_sweep'][cell_idx][0], 1)
self.keysight_test_app.set_cell_input_power(
cell['cell_type'], cell['cell_number'],
- self.testclass_params['input_power'][cell['cell_type']])
+ self.testclass_params['input_power'][cell['cell_type']])
if cell['cell_type'] == 'LTE' and cell['pcc'] == 0:
pass
else:
self.keysight_test_app.set_cell_ul_power_control(
cell['cell_type'], cell['cell_number'],
self.testclass_params['ul_power_control_mode'],
- self.testclass_params.get('ul_power_control_target',0)
- )
+ self.testclass_params.get('ul_power_control_target', 0))
if cell['cell_type'] == 'NR5G':
self.keysight_test_app.set_nr_subcarrier_spacing(
cell['cell_number'], cell['subcarrier_spacing'])
@@ -360,6 +532,12 @@
if cell['cell_type'] == 'LTE':
self.keysight_test_app.set_lte_cell_transmission_mode(
cell['cell_number'], cell['transmission_mode'])
+ self.keysight_test_app.set_lte_cell_num_codewords(
+ cell['cell_number'], cell['num_codewords'])
+ self.keysight_test_app.set_lte_cell_num_layers(
+ cell['cell_number'], cell['num_layers'])
+ self.keysight_test_app.set_lte_cell_dl_subframe_allocation(
+ cell['cell_number'], cell['dl_subframe_allocation'])
self.keysight_test_app.set_lte_control_region_size(
cell['cell_number'], 1)
if cell['ul_enabled'] and cell['cell_type'] == 'NR5G':
@@ -369,7 +547,8 @@
if 'fading_scenario' in self.testclass_params:
self.keysight_test_app.configure_channel_emulator(
cell['cell_type'], cell['cell_number'],
- self.testclass_params['fading_scenario'][cell['cell_type']])
+ self.testclass_params['fading_scenario'][
+ cell['cell_type']])
if testcase_params.get('force_contiguous_nr_channel', False):
self.keysight_test_app.toggle_contiguous_nr_channels(1)
@@ -386,12 +565,12 @@
if testcase_params['endc_combo_config']['nr_cell_count']:
if 'schedule_scenario' in testcase_params:
self.keysight_test_app.set_nr_cell_schedule_scenario(
- 'CELL1',
- testcase_params['schedule_scenario'])
+ 'CELL1', testcase_params['schedule_scenario'])
if testcase_params['schedule_scenario'] == 'FULL_TPUT':
self.keysight_test_app.set_nr_schedule_slot_ratio(
- 'CELL1',
- testcase_params['schedule_slot_ratio'])
+ 'CELL1', testcase_params['schedule_slot_ratio'])
+ self.keysight_test_app.set_nr_schedule_tdd_pattern(
+ 'CELL1', testcase_params.get('tdd_pattern', 0))
self.keysight_test_app.set_nr_ul_dft_precoding(
'CELL1', testcase_params['transform_precoding'])
self.keysight_test_app.set_nr_cell_mcs(
@@ -410,22 +589,23 @@
cell['cell_type'], cell['cell_number']):
self.log.info('Turning LTE Cell {} on.'.format(
cell['cell_number']))
- self.keysight_test_app.set_cell_state(cell['cell_type'],
- cell['cell_number'], 1)
+ self.keysight_test_app.set_cell_state(
+ cell['cell_type'], cell['cell_number'], 1)
self.log.info('Waiting for LTE connections')
# Turn airplane mode off
num_apm_toggles = 10
for idx in range(num_apm_toggles):
self.log.info('Turning off airplane mode')
- cputils.toggle_airplane_mode(self.log, self.dut, False, False, idx)
+ self.dut_utils.toggle_airplane_mode(False, False, idx)
if self.keysight_test_app.wait_for_cell_status(
- 'LTE', 'CELL1', 'CONN', 10*(idx+1)):
- self.log.info('Connected! Waiting for {} seconds.'.format(LONG_SLEEP))
+ 'LTE', 'CELL1', 'CONN', 10 * (idx + 1)):
+ self.log.info('Connected! Waiting for {} seconds.'.format(
+ LONG_SLEEP))
time.sleep(LONG_SLEEP)
break
elif idx < num_apm_toggles - 1:
self.log.info('Turning on airplane mode')
- cputils.toggle_airplane_mode(self.log, self.dut, True, False, idx)
+ self.dut_utils.toggle_airplane_mode(True, False, idx)
time.sleep(MEDIUM_SLEEP)
else:
asserts.fail('DUT did not connect to LTE.')
@@ -438,7 +618,8 @@
self.keysight_test_app.apply_carrier_agg()
self.log.info('Waiting for 5G connection')
connected = self.keysight_test_app.wait_for_cell_status(
- 'NR5G', testcase_params['endc_combo_config']['nr_cell_count'],
+ 'NR5G',
+ testcase_params['endc_combo_config']['nr_cell_count'],
['ACT', 'CONN'], 60)
if not connected:
asserts.fail('DUT did not connect to NR.')
@@ -451,27 +632,30 @@
cell['cell_type'], cell['cell_number']):
self.log.info('Turning NR Cell {} on.'.format(
cell['cell_number']))
- self.keysight_test_app.set_cell_state(cell['cell_type'],
- cell['cell_number'], 1)
+ self.keysight_test_app.set_cell_state(
+ cell['cell_type'], cell['cell_number'], 1)
num_apm_toggles = 10
for idx in range(num_apm_toggles):
self.log.info('Turning off airplane mode now.')
- cputils.toggle_airplane_mode(self.log, self.dut, False, False, idx)
+ self.dut_utils.toggle_airplane_mode(False, False, idx)
if self.keysight_test_app.wait_for_cell_status(
- 'NR5G', 'CELL1', 'CONN', 10*(idx+1)):
- self.log.info('Connected! Waiting for {} seconds.'.format(LONG_SLEEP))
+ 'NR5G', 'CELL1', 'CONN', 10 * (idx + 1)):
+ self.log.info('Connected! Waiting for {} seconds.'.format(
+ LONG_SLEEP))
time.sleep(LONG_SLEEP)
break
elif idx < num_apm_toggles - 1:
self.log.info('Turning on airplane mode now.')
- cputils.toggle_airplane_mode(self.log, self.dut, True, False, idx)
+ self.dut_utils.toggle_airplane_mode(True, False, idx)
time.sleep(MEDIUM_SLEEP)
else:
asserts.fail('DUT did not connect to NR.')
- if 'fading_scenario' in self.testclass_params and self.testclass_params['fading_scenario']['enable']:
+ if 'fading_scenario' in self.testclass_params and self.testclass_params[
+ 'fading_scenario']['enable']:
self.log.info('Enabling fading.')
- self.keysight_test_app.set_channel_emulator_state(self.testclass_params['fading_scenario']['enable'])
+ self.keysight_test_app.set_channel_emulator_state(
+ self.testclass_params['fading_scenario']['enable'])
else:
self.keysight_test_app.set_channel_emulator_state(0)
@@ -494,10 +678,13 @@
testcase_results['results'] = []
# Setup ota chamber if needed
- if hasattr(self, 'keysight_chamber') and 'orientation' in testcase_params:
+ if hasattr(self,
+ 'keysight_chamber') and 'orientation' in testcase_params:
self.keysight_chamber.move_theta_phi_abs(
- self.keysight_chamber.preset_orientations[testcase_params['orientation']]['theta'],
- self.keysight_chamber.preset_orientations[testcase_params['orientation']]['phi'])
+ self.keysight_chamber.preset_orientations[
+ testcase_params['orientation']]['theta'],
+ self.keysight_chamber.preset_orientations[
+ testcase_params['orientation']]['phi'])
# Setup tester and wait for DUT to connect
self.setup_tester(testcase_params)
@@ -516,8 +703,8 @@
connected = 1
for cell in testcase_params['endc_combo_config']['cell_list']:
if not self.keysight_test_app.wait_for_cell_status(
- cell['cell_type'], cell['cell_number'],
- ['ACT', 'CONN'], VERY_SHORT_SLEEP,VERY_SHORT_SLEEP):
+ cell['cell_type'], cell['cell_number'],
+ ['ACT', 'CONN'], VERY_SHORT_SLEEP, VERY_SHORT_SLEEP):
connected = 0
if not connected:
self.log.info('DUT lost connection to cells. Ending test.')
@@ -534,24 +721,35 @@
1)
result['cell_power'] = cell_power_array
# Start BLER and throughput measurements
- current_throughput = self.run_single_throughput_measurement(testcase_params)
- lte_rx_meas = cputils.get_rx_measurements(self.dut, 'LTE')
- nr_rx_meas = cputils.get_rx_measurements(self.dut, 'NR5G')
+ self.log.info('Cell powers: {}'.format(cell_power_array))
+ self.start_single_throughput_measurement(testcase_params)
+ if self.power_monitor:
+ measurement_wait = LONG_SLEEP if (power_idx == 0) else 0
+ current_average_current = self.collect_power_data(
+ self.testclass_params['traffic_duration'],
+ measurement_wait,
+ reconnect_usb=0,
+ measurement_tag=power_idx)
+ current_throughput = self.stop_single_throughput_measurement(
+ testcase_params)
+ lte_rx_meas = self.dut_utils.get_rx_measurements('LTE')
+ nr_rx_meas = self.dut_utils.get_rx_measurements('NR5G')
result['throughput_measurements'] = current_throughput
- result['lte_rx_measurements'] = lte_rx_meas
- result['nr_rx_measurements'] = nr_rx_meas
-
self.print_throughput_result(current_throughput)
- self.log.info('LTE Rx Measurements: {}'.format(lte_rx_meas))
- self.log.info('NR Rx Measurements: {}'.format(nr_rx_meas))
+
+ if self.testclass_params.get('log_rsrp_metrics', 1):
+ result['lte_rx_measurements'] = lte_rx_meas
+ result['nr_rx_measurements'] = nr_rx_meas
+ self.log.info('LTE Rx Measurements: {}'.format(lte_rx_meas))
+ self.log.info('NR Rx Measurements: {}'.format(nr_rx_meas))
testcase_results['results'].append(result)
if (('lte_bler_result' in result['throughput_measurements']
- and result['throughput_measurements']['lte_bler_result']['total']['DL']['nack_ratio'] *
- 100 > 99) or
- ('nr_bler_result' in result['throughput_measurements']
- and result['throughput_measurements']['nr_bler_result']['total']['DL']['nack_ratio'] *
- 100 > 99)):
+ and result['throughput_measurements']['lte_bler_result']
+ ['total']['DL']['nack_ratio'] * 100 > 99)
+ or ('nr_bler_result' in result['throughput_measurements']
+ and result['throughput_measurements']['nr_bler_result']
+ ['total']['DL']['nack_ratio'] * 100 > 99)):
stop_counter = stop_counter + 1
else:
stop_counter = 0
@@ -560,3 +758,10 @@
# Save results
self.testclass_results[self.current_test_name] = testcase_results
+
+ def test_measure_power(self):
+ self.log.info('Turing screen off')
+ self.dut_utils.set_screen_state(0)
+ time.sleep(10)
+ self.log.info('Measuring power now.')
+ self.collect_power_data(60, 0)
diff --git a/acts_tests/acts_contrib/test_utils/cellular/performance/cellular_performance_test_utils.py b/acts_tests/acts_contrib/test_utils/cellular/performance/cellular_performance_test_utils.py
index 0132693..89819f9 100644
--- a/acts_tests/acts_contrib/test_utils/cellular/performance/cellular_performance_test_utils.py
+++ b/acts_tests/acts_contrib/test_utils/cellular/performance/cellular_performance_test_utils.py
@@ -20,6 +20,7 @@
import re
import time
from queue import Empty
+from acts.controllers.adb_lib.error import AdbError
from acts.controllers.android_lib.tel import tel_utils
PCC_PRESET_MAPPING = {
@@ -68,305 +69,37 @@
},
}
+SHORT_SLEEP = 1
LONG_SLEEP = 10
+POWER_STATS_DUMPSYS_CMD = 'dumpsys android.hardware.power.stats.IPowerStats/default delta'
+
+
+class ObjNew(object):
+ """Create a random obj with unknown attributes and value.
+
+ """
+
+ def __init__(self, **kwargs):
+ self.__dict__.update(kwargs)
+
+ def __contains__(self, item):
+ """Function to check if one attribute is contained in the object.
+
+ Args:
+ item: the item to check
+ Return:
+ True/False
+ """
+ return hasattr(self, item)
+
+
def extract_test_id(testcase_params, id_fields):
test_id = collections.OrderedDict(
(param, testcase_params[param]) for param in id_fields)
return test_id
-def start_pixel_logger(ad):
- """Function to start pixel logger with default log mask.
-
- Args:
- ad: android device on which to start logger
- """
-
- try:
- ad.adb.shell(
- 'rm -R /storage/emulated/0/Android/data/com.android.pixellogger/files/logs/logs/'
- )
- except:
- pass
- ad.adb.shell(
- 'am startservice -a com.android.pixellogger.service.logging.LoggingService.ACTION_START_LOGGING'
- )
-
-
-def stop_pixel_logger(ad, log_path, tag=None):
- """Function to stop pixel logger and retrieve logs
-
- Args:
- ad: android device on which to start logger
- log_path: location of saved logs
- """
- ad.adb.shell(
- 'am startservice -a com.android.pixellogger.service.logging.LoggingService.ACTION_STOP_LOGGING'
- )
- logging.info('Waiting for Pixel log file')
- file_name = None
- file_size = 0
- previous_file_size = 0
- for idx in range(600):
- try:
- file = ad.adb.shell(
- 'ls -l /storage/emulated/0/Android/data/com.android.pixellogger/files/logs/logs/'
- ).split(' ')
- file_name = file[-1]
- file_size = file[-4]
- except:
- file_name = None
- file_size = 0
- if file_name and file_size == previous_file_size:
- logging.info('Log file found after {}s.'.format(idx))
- break
- else:
- previous_file_size = file_size
- time.sleep(1)
- try:
- local_file_name = '{}_{}'.format(file_name, tag) if tag else file_name
- local_path = os.path.join(log_path, local_file_name)
- ad.pull_files(
- '/storage/emulated/0/Android/data/com.android.pixellogger/files/logs/logs/{}'
- .format(file_name), log_path)
- return local_path
- except:
- logging.error('Could not pull pixel logs.')
-
-
-def log_system_power_metrics(ad, verbose=1):
- # Log temperature sensors
- if verbose:
- temp_sensors = ad.adb.shell(
- 'ls -1 /dev/thermal/tz-by-name/').splitlines()
- else:
- temp_sensors = ['BIG', 'battery', 'quiet_therm', 'usb_pwr_therm']
- temp_measurements = collections.OrderedDict()
- for sensor in temp_sensors:
- try:
- temp_measurements[sensor] = ad.adb.shell(
- 'cat /dev/thermal/tz-by-name/{}/temp'.format(sensor))
- except:
- temp_measurements[sensor] = float('nan')
- logging.debug('Temperature sensor readings: {}'.format(temp_measurements))
-
- # Log mitigation items
- if verbose:
- mitigation_points = [
- "batoilo",
- "ocp_cpu1",
- "ocp_cpu2",
- "ocp_gpu",
- "ocp_tpu",
- "smpl_warn",
- "soft_ocp_cpu1",
- "soft_ocp_cpu2",
- "soft_ocp_gpu",
- "soft_ocp_tpu",
- "vdroop1",
- "vdroop2",
- ]
- else:
- mitigation_points = [
- "batoilo",
- "smpl_warn",
- "vdroop1",
- "vdroop2",
- ]
-
- parameters_f = ['count', 'capacity', 'timestamp', 'voltage']
- parameters_v = ['count', 'cap', 'time', 'volt']
- mitigation_measurements = collections.OrderedDict()
- for mp in mitigation_points:
- mitigation_measurements[mp] = collections.OrderedDict()
- for par_f, par_v in zip(parameters_f, parameters_v):
- mitigation_measurements[mp][par_v] = ad.adb.shell(
- 'cat /sys/devices/virtual/pmic/mitigation/last_triggered_{}/{}_{}'
- .format(par_f, mp, par_v))
- logging.debug('Mitigation readings: {}'.format(mitigation_measurements))
-
- # Log power meter items
- power_meter_measurements = collections.OrderedDict()
- for device in ['device0', 'device1']:
- power_str = ad.adb.shell(
- 'cat /sys/bus/iio/devices/iio:{}/lpf_power'.format(
- device)).splitlines()
- power_meter_measurements[device] = collections.OrderedDict()
- for line in power_str:
- if line.startswith('CH'):
- try:
- line_split = line.split(', ')
- power_meter_measurements[device][line_split[0]] = int(
- line_split[1])
- except (IndexError, ValueError):
- continue
- elif line.startswith('t='):
- try:
- power_meter_measurements[device]['t_pmeter'] = int(
- line[2:])
- except (IndexError, ValueError):
- continue
- else:
- continue
- logging.debug(
- 'Power Meter readings: {}'.format(power_meter_measurements))
-
- # Log battery items
- if verbose:
- battery_parameters = [
- "act_impedance", "capacity", "charge_counter", "charge_full",
- "charge_full_design", "current_avg", "current_now",
- "cycle_count", "health", "offmode_charger", "present",
- "rc_switch_enable", "resistance", "status", "temp",
- "voltage_avg", "voltage_now", "voltage_ocv"
- ]
- else:
- battery_parameters = [
- "capacity", "current_avg", "current_now", "voltage_avg",
- "voltage_now", "voltage_ocv"
- ]
-
- battery_meaurements = collections.OrderedDict()
- for par in battery_parameters:
- battery_meaurements['bat_{}'.format(par)] = ad.adb.shell(
- 'cat /sys/class/power_supply/maxfg/{}'.format(par))
- logging.debug('Battery readings: {}'.format(battery_meaurements))
-
-
-def send_at_command(ad, at_command):
- at_cmd_output = ad.adb.shell('am instrument -w -e request {} -e response wait '
- '"com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"'.format(at_command))
- return at_cmd_output
-
-def get_rx_measurements(ad, cell_type):
- cell_type_int = 7 if cell_type == 'LTE' else 8
- rx_meas = send_at_command(ad, 'AT+GOOGGETRXMEAS\={}?'.format(cell_type_int))
- rsrp_regex = r"RSRP\[\d+\]\s+(-?\d+)"
- rsrp_values = [float(x) for x in re.findall(rsrp_regex, rx_meas)]
- rsrq_regex = r"RSRQ\[\d+\]\s+(-?\d+)"
- rsrq_values = [float(x) for x in re.findall(rsrq_regex, rx_meas)]
- rssi_regex = r"RSSI\[\d+\]\s+(-?\d+)"
- rssi_values = [float(x) for x in re.findall(rssi_regex, rx_meas)]
- sinr_regex = r"SINR\[\d+\]\s+(-?\d+)"
- sinr_values = [float(x) for x in re.findall(sinr_regex, rx_meas)]
- return {'rsrp': rsrp_values, 'rsrq': rsrq_values, 'rssi': rssi_values, 'sinr': sinr_values}
-
-def toggle_airplane_mode(log, ad, new_state=None, strict_checking=True, try_index=0):
- """ Toggle the state of airplane mode.
-
- Args:
- log: log handler.
- ad: android_device object.
- new_state: Airplane mode state to set to.
- If None, opposite of the current state.
- strict_checking: Whether to turn on strict checking that checks all features.
- try_index: index of apm toggle
-
- Returns:
- result: True if operation succeed. False if error happens.
- """
- if try_index % 2 == 0:
- log.info('Toggling airplane mode {} by adb.'.format(new_state))
- return tel_utils.toggle_airplane_mode_by_adb(log, ad, new_state)
- else:
- log.info('Toggling airplane mode {} by msim.'.format(new_state))
- return toggle_airplane_mode_msim(
- log, ad, new_state, strict_checking=strict_checking)
-
-def toggle_airplane_mode_msim(log, ad, new_state=None, strict_checking=True):
- """ Toggle the state of airplane mode.
-
- Args:
- log: log handler.
- ad: android_device object.
- new_state: Airplane mode state to set to.
- If None, opposite of the current state.
- strict_checking: Whether to turn on strict checking that checks all features.
-
- Returns:
- result: True if operation succeed. False if error happens.
- """
-
- cur_state = ad.droid.connectivityCheckAirplaneMode()
- if cur_state == new_state:
- ad.log.info("Airplane mode already in %s", new_state)
- return True
- elif new_state is None:
- new_state = not cur_state
- ad.log.info("Toggle APM mode, from current tate %s to %s", cur_state,
- new_state)
- sub_id_list = []
- active_sub_info = ad.droid.subscriptionGetAllSubInfoList()
- if active_sub_info:
- for info in active_sub_info:
- sub_id_list.append(info['subscriptionId'])
-
- ad.ed.clear_all_events()
- time.sleep(0.1)
- service_state_list = []
- if new_state:
- service_state_list.append(tel_utils.SERVICE_STATE_POWER_OFF)
- ad.log.info("Turn on airplane mode")
-
- else:
- # If either one of these 3 events show up, it should be OK.
- # Normal SIM, phone in service
- service_state_list.append(tel_utils.SERVICE_STATE_IN_SERVICE)
- # NO SIM, or Dead SIM, or no Roaming coverage.
- service_state_list.append(tel_utils.SERVICE_STATE_OUT_OF_SERVICE)
- service_state_list.append(tel_utils.SERVICE_STATE_EMERGENCY_ONLY)
- ad.log.info("Turn off airplane mode")
-
- for sub_id in sub_id_list:
- ad.droid.telephonyStartTrackingServiceStateChangeForSubscription(
- sub_id)
-
- timeout_time = time.time() + LONG_SLEEP
- ad.droid.connectivityToggleAirplaneMode(new_state)
-
- try:
- try:
- event = ad.ed.wait_for_event(
- tel_utils.EVENT_SERVICE_STATE_CHANGED,
- tel_utils.is_event_match_for_list,
- timeout= LONG_SLEEP,
- field=tel_utils.ServiceStateContainer.SERVICE_STATE,
- value_list=service_state_list)
- ad.log.info("Got event %s", event)
- except Empty:
- ad.log.warning("Did not get expected service state change to %s",
- service_state_list)
- finally:
- for sub_id in sub_id_list:
- ad.droid.telephonyStopTrackingServiceStateChangeForSubscription(
- sub_id)
- except Exception as e:
- ad.log.error(e)
-
- # APM on (new_state=True) will turn off bluetooth but may not turn it on
- try:
- if new_state and not tel_utils._wait_for_bluetooth_in_state(
- log, ad, False, timeout_time - time.time()):
- ad.log.error(
- "Failed waiting for bluetooth during airplane mode toggle")
- if strict_checking: return False
- except Exception as e:
- ad.log.error("Failed to check bluetooth state due to %s", e)
- if strict_checking:
- raise
-
- # APM on (new_state=True) will turn off wifi but may not turn it on
- if new_state and not tel_utils._wait_for_wifi_in_state(log, ad, False,
- timeout_time - time.time()):
- ad.log.error("Failed waiting for wifi during airplane mode toggle on")
- if strict_checking: return False
-
- if ad.droid.connectivityCheckAirplaneMode() != new_state:
- ad.log.error("Set airplane mode to %s failed", new_state)
- return False
- return True
-
def generate_endc_combo_config_from_string(endc_combo_str):
"""Function to generate ENDC combo config from combo string
@@ -376,7 +109,7 @@
endc_combo_config: dictionary with all ENDC combo settings
"""
endc_combo_config = collections.OrderedDict()
- endc_combo_config['endc_combo_name']=endc_combo_str
+ endc_combo_config['endc_combo_name'] = endc_combo_str
endc_combo_str = endc_combo_str.replace(' ', '')
endc_combo_list = endc_combo_str.split('+')
cell_config_list = list()
@@ -390,7 +123,8 @@
cell_config_regex = re.compile(
r'(?P<cell_type>[B,N])(?P<band>[0-9]+)(?P<bandwidth_class>[A-Z])\[bw=(?P<dl_bandwidth>[0-9]+)\]'
- r'(\[ch=)?(?P<channel>[0-9]+)?\]?\[ant=(?P<dl_mimo_config>[0-9]+),?(?P<transmission_mode>[TM0-9]+)?\];?'
+ r'(\[ch=)?(?P<channel>[0-9]+)?\]?'
+ r'\[ant=(?P<dl_mimo_config>[0-9]+),?(?P<transmission_mode>[TM0-9]+)?,?(?P<num_layers>[TM0-9]+)?,?(?P<num_codewords>[TM0-9]+)?\];?'
r'(?P<ul_bandwidth_class>[A-Z])?(\[ant=)?(?P<ul_mimo_config>[0-9])?(\])?'
)
for cell_string in endc_combo_list:
@@ -408,10 +142,10 @@
lte_scc_list.append(cell_config['cell_number'])
cell_config['duplex_mode'] = 'FDD' if int(
cell_config['band']
- ) in DUPLEX_MODE_TO_BAND_MAPPING['LTE'][
- 'FDD'] else 'TDD'
+ ) in DUPLEX_MODE_TO_BAND_MAPPING['LTE']['FDD'] else 'TDD'
cell_config['dl_mimo_config'] = 'D{nss}U{nss}'.format(
nss=cell_config['dl_mimo_config'])
+ cell_config['dl_subframe_allocation'] = [1] * 10
lte_dl_carriers.append(cell_config['cell_number'])
else:
# Configure NR specific parameters
@@ -423,16 +157,16 @@
cell_config['nr_cell_type'] = 'NSA'
cell_config['band'] = 'N' + cell_config['band']
cell_config['duplex_mode'] = 'FDD' if cell_config[
- 'band'] in DUPLEX_MODE_TO_BAND_MAPPING['NR5G'][
- 'FDD'] else 'TDD'
+ 'band'] in DUPLEX_MODE_TO_BAND_MAPPING['NR5G']['FDD'] else 'TDD'
cell_config['subcarrier_spacing'] = 'MU0' if cell_config[
'duplex_mode'] == 'FDD' else 'MU1'
cell_config['dl_mimo_config'] = 'N{nss}X{nss}'.format(
nss=cell_config['dl_mimo_config'])
cell_config['dl_bandwidth_class'] = cell_config['bandwidth_class']
- cell_config['dl_bandwidth'] = 'BW'+ cell_config['dl_bandwidth']
- cell_config['ul_enabled'] = 1 if cell_config['ul_bandwidth_class'] else 0
+ cell_config['dl_bandwidth'] = 'BW' + cell_config['dl_bandwidth']
+ cell_config[
+ 'ul_enabled'] = 1 if cell_config['ul_bandwidth_class'] else 0
if cell_config['ul_enabled']:
cell_config['ul_mimo_config'] = 'N{nss}X{nss}'.format(
nss=cell_config['ul_mimo_config'])
@@ -451,6 +185,7 @@
endc_combo_config['lte_ul_carriers'] = lte_ul_carriers
return endc_combo_config
+
def generate_endc_combo_config_from_csv_row(test_config):
"""Function to generate ENDC combo config from CSV test config
@@ -469,7 +204,7 @@
lte_ul_carriers = []
cell_config_list = []
- if test_config['lte_band']:
+ if 'lte_band' in test_config and test_config['lte_band']:
lte_cell = {
'cell_type':
'LTE',
@@ -488,21 +223,23 @@
'dl_mimo_config':
'D{nss}U{nss}'.format(nss=test_config['lte_dl_mimo_config']),
'ul_mimo_config':
- 'D{nss}U{nss}'.format(nss=test_config['lte_ul_mimo_config'])
+ 'D{nss}U{nss}'.format(nss=test_config['lte_ul_mimo_config']),
+ 'transmission_mode':
+ test_config['lte_tm_mode'],
+ 'num_codewords':
+ test_config['lte_codewords'],
+ 'num_layers':
+ test_config['lte_layers'],
+ 'dl_subframe_allocation':
+ test_config.get('dl_subframe_allocation', [1] * 10)
}
- if int(test_config['lte_dl_mimo_config']) == 1:
- lte_cell['transmission_mode'] = 'TM1'
- elif int(test_config['lte_dl_mimo_config']) == 2:
- lte_cell['transmission_mode'] = 'TM2'
- else:
- lte_cell['transmission_mode'] = 'TM3'
cell_config_list.append(lte_cell)
endc_combo_config['lte_pcc'] = 1
lte_cell_count = 1
lte_dl_carriers = [1]
lte_ul_carriers = [1]
- if test_config['nr_band']:
+ if 'nr_band' in test_config and test_config['nr_band']:
nr_cell = {
'cell_type':
'NR5G',
@@ -510,7 +247,8 @@
1,
'band':
test_config['nr_band'],
- 'nr_cell_type': test_config['nr_cell_type'],
+ 'nr_cell_type':
+ test_config['nr_cell_type'],
'duplex_mode':
test_config['nr_duplex_mode'],
'dl_mimo_config':
@@ -542,3 +280,438 @@
endc_combo_config['lte_dl_carriers'] = lte_dl_carriers
endc_combo_config['lte_ul_carriers'] = lte_ul_carriers
return endc_combo_config
+
+
+class DeviceUtils():
+
+ def __new__(self, dut, log):
+ if hasattr(dut,
+ 'device_type') and dut.device_type == 'android_non_pixel':
+ return AndroidNonPixelDeviceUtils(dut, log)
+ else:
+ return PixelDeviceUtils(dut, log)
+
+
+class PixelDeviceUtils():
+
+ def __init__(self, dut, log):
+ self.dut = dut
+ self.log = log
+
+ def stop_services(self):
+ """Gracefully stop sl4a before power measurement"""
+ self.dut.stop_services()
+
+ def start_services(self):
+ self.dut.start_services()
+
+ def start_pixel_logger(self):
+ """Function to start pixel logger with default log mask.
+
+ Args:
+ ad: android device on which to start logger
+ """
+
+ try:
+ self.dut.adb.shell(
+ 'rm -R /storage/emulated/0/Android/data/com.android.pixellogger/files/logs/logs/'
+ )
+ except:
+ pass
+ self.dut.adb.shell(
+ 'am startservice -a com.android.pixellogger.service.logging.LoggingService.ACTION_START_LOGGING'
+ )
+
+ def stop_pixel_logger(self, log_path, tag=None):
+ """Function to stop pixel logger and retrieve logs
+
+ Args:
+ ad: android device on which to start logger
+ log_path: location of saved logs
+ """
+ self.dut.adb.shell(
+ 'am startservice -a com.android.pixellogger.service.logging.LoggingService.ACTION_STOP_LOGGING'
+ )
+ logging.info('Waiting for Pixel log file')
+ file_name = None
+ file_size = 0
+ previous_file_size = 0
+ for idx in range(600):
+ try:
+ file = self.dut.adb.shell(
+ 'ls -l /storage/emulated/0/Android/data/com.android.pixellogger/files/logs/logs/'
+ ).split(' ')
+ file_name = file[-1]
+ file_size = file[-4]
+ except:
+ file_name = None
+ file_size = 0
+ if file_name and file_size == previous_file_size:
+ logging.info('Log file found after {}s.'.format(idx))
+ break
+ else:
+ previous_file_size = file_size
+ time.sleep(1)
+ try:
+ local_file_name = '{}_{}'.format(file_name,
+ tag) if tag else file_name
+ local_path = os.path.join(log_path, local_file_name)
+ self.dut.pull_files(
+ '/storage/emulated/0/Android/data/com.android.pixellogger/files/logs/logs/{}'
+ .format(file_name), log_path)
+ return local_path
+ except:
+ logging.error('Could not pull pixel logs.')
+
+ def log_system_power_metrics(self, verbose=1):
+ # Log temperature sensors
+ if verbose:
+ temp_sensors = self.dut.adb.shell(
+ 'ls -1 /dev/thermal/tz-by-name/').splitlines()
+ else:
+ temp_sensors = ['BIG', 'battery', 'quiet_therm', 'usb_pwr_therm']
+ temp_measurements = collections.OrderedDict()
+ for sensor in temp_sensors:
+ try:
+ temp_measurements[sensor] = self.dut.adb.shell(
+ 'cat /dev/thermal/tz-by-name/{}/temp'.format(sensor))
+ except:
+ temp_measurements[sensor] = float('nan')
+ logging.debug(
+ 'Temperature sensor readings: {}'.format(temp_measurements))
+
+ # Log mitigation items
+ if verbose:
+ mitigation_points = [
+ "batoilo",
+ "ocp_cpu1",
+ "ocp_cpu2",
+ "ocp_gpu",
+ "ocp_tpu",
+ "smpl_warn",
+ "soft_ocp_cpu1",
+ "soft_ocp_cpu2",
+ "soft_ocp_gpu",
+ "soft_ocp_tpu",
+ "vdroop1",
+ "vdroop2",
+ ]
+ else:
+ mitigation_points = [
+ "batoilo",
+ "smpl_warn",
+ "vdroop1",
+ "vdroop2",
+ ]
+
+ parameters_f = ['count', 'capacity', 'timestamp', 'voltage']
+ parameters_v = ['count', 'cap', 'time', 'volt']
+ mitigation_measurements = collections.OrderedDict()
+ for mp in mitigation_points:
+ mitigation_measurements[mp] = collections.OrderedDict()
+ for par_f, par_v in zip(parameters_f, parameters_v):
+ mitigation_measurements[mp][par_v] = self.dut.adb.shell(
+ 'cat /sys/devices/virtual/pmic/mitigation/last_triggered_{}/{}_{}'
+ .format(par_f, mp, par_v))
+ logging.debug(
+ 'Mitigation readings: {}'.format(mitigation_measurements))
+
+ # Log power meter items
+ power_meter_measurements = collections.OrderedDict()
+ for device in ['device0', 'device1']:
+ power_str = self.dut.adb.shell(
+ 'cat /sys/bus/iio/devices/iio:{}/lpf_power'.format(
+ device)).splitlines()
+ power_meter_measurements[device] = collections.OrderedDict()
+ for line in power_str:
+ if line.startswith('CH'):
+ try:
+ line_split = line.split(', ')
+ power_meter_measurements[device][line_split[0]] = int(
+ line_split[1])
+ except (IndexError, ValueError):
+ continue
+ elif line.startswith('t='):
+ try:
+ power_meter_measurements[device]['t_pmeter'] = int(
+ line[2:])
+ except (IndexError, ValueError):
+ continue
+ else:
+ continue
+ logging.debug(
+ 'Power Meter readings: {}'.format(power_meter_measurements))
+
+ # Log battery items
+ if verbose:
+ battery_parameters = [
+ "act_impedance", "capacity", "charge_counter",
+ "charge_full", "charge_full_design", "current_avg",
+ "current_now", "cycle_count", "health", "offmode_charger",
+ "present", "rc_switch_enable", "resistance", "status",
+ "temp", "voltage_avg", "voltage_now", "voltage_ocv"
+ ]
+ else:
+ battery_parameters = [
+ "capacity", "current_avg", "current_now", "voltage_avg",
+ "voltage_now", "voltage_ocv"
+ ]
+
+ battery_meaurements = collections.OrderedDict()
+ for par in battery_parameters:
+ battery_meaurements['bat_{}'.format(par)] = self.dut.adb.shell(
+ 'cat /sys/class/power_supply/maxfg/{}'.format(par))
+ logging.debug('Battery readings: {}'.format(battery_meaurements))
+
+ def log_odpm(self, file_path):
+ """Dumpsys ODPM data and save it."""
+ try:
+ stats = self.dut.adb.shell(POWER_STATS_DUMPSYS_CMD)
+ with open(file_path, 'w') as f:
+ f.write(stats)
+ except AdbError as e:
+ self.log.warning('Error dumping and saving odpm')
+
+ def send_at_command(self, at_command):
+ at_cmd_output = self.dut.adb.shell(
+ 'am instrument -w -e request {} -e response wait '
+ '"com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"'
+ .format(at_command))
+ return at_cmd_output
+
+ def get_rx_measurements(self, cell_type):
+ cell_type_int = 7 if cell_type == 'LTE' else 8
+ try:
+ rx_meas = self.send_at_command(
+ 'AT+GOOGGETRXMEAS\={}?'.format(cell_type_int))
+ except:
+ rx_meas = ''
+ rsrp_regex = r"RSRP\[\d+\]\s+(-?\d+)"
+ rsrp_values = [float(x) for x in re.findall(rsrp_regex, rx_meas)]
+ rsrq_regex = r"RSRQ\[\d+\]\s+(-?\d+)"
+ rsrq_values = [float(x) for x in re.findall(rsrq_regex, rx_meas)]
+ rssi_regex = r"RSSI\[\d+\]\s+(-?\d+)"
+ rssi_values = [float(x) for x in re.findall(rssi_regex, rx_meas)]
+ sinr_regex = r"SINR\[\d+\]\s+(-?\d+)"
+ sinr_values = [float(x) for x in re.findall(sinr_regex, rx_meas)]
+ return {
+ 'rsrp': rsrp_values,
+ 'rsrq': rsrq_values,
+ 'rssi': rssi_values,
+ 'sinr': sinr_values
+ }
+
+ def get_fr2_tx_power(self):
+ try:
+ tx_power = self.send_at_command('AT+MMPDREAD=0,2,0')
+ except:
+ tx_power = ''
+ logging.info(tx_power)
+
+ def toggle_airplane_mode(self,
+ new_state=None,
+ strict_checking=True,
+ try_index=0):
+ """ Toggle the state of airplane mode.
+
+ Args:
+ log: log handler.
+ ad: android_device object.
+ new_state: Airplane mode state to set to.
+ If None, opposite of the current state.
+ strict_checking: Whether to turn on strict checking that checks all features.
+ try_index: index of apm toggle
+
+ Returns:
+ result: True if operation succeed. False if error happens.
+ """
+ if hasattr(
+ self.dut, 'toggle_airplane_mode'
+ ) and 'at_command' in self.dut.toggle_airplane_mode['method']:
+ cfun_setting = 0 if new_state else 1
+ self.log.info(
+ 'Toggling airplane mode {} by AT command.'.format(new_state))
+ self.send_at_command('AT+CFUN={}'.format(cfun_setting))
+ elif self.dut.skip_sl4a or try_index % 2 == 0:
+ self.log.info(
+ 'Toggling airplane mode {} by adb.'.format(new_state))
+ return tel_utils.toggle_airplane_mode_by_adb(
+ self.log, self.dut, new_state)
+ else:
+ self.log.info(
+ 'Toggling airplane mode {} by msim.'.format(new_state))
+ return self.toggle_airplane_mode_msim(
+ new_state, strict_checking=strict_checking)
+
+ def toggle_airplane_mode_msim(self, new_state=None, strict_checking=True):
+ """ Toggle the state of airplane mode.
+
+ Args:
+ log: log handler.
+ ad: android_device object.
+ new_state: Airplane mode state to set to.
+ If None, opposite of the current state.
+ strict_checking: Whether to turn on strict checking that checks all features.
+
+ Returns:
+ result: True if operation succeed. False if error happens.
+ """
+
+ cur_state = self.dut.droid.connectivityCheckAirplaneMode()
+ if cur_state == new_state:
+ self.dut.log.info("Airplane mode already in %s", new_state)
+ return True
+ elif new_state is None:
+ new_state = not cur_state
+ self.dut.log.info("Toggle APM mode, from current tate %s to %s",
+ cur_state, new_state)
+ sub_id_list = []
+ active_sub_info = self.dut.droid.subscriptionGetAllSubInfoList()
+ if active_sub_info:
+ for info in active_sub_info:
+ sub_id_list.append(info['subscriptionId'])
+
+ self.dut.ed.clear_all_events()
+ time.sleep(0.1)
+ service_state_list = []
+ if new_state:
+ service_state_list.append(tel_utils.SERVICE_STATE_POWER_OFF)
+ self.dut.log.info("Turn on airplane mode")
+
+ else:
+ # If either one of these 3 events show up, it should be OK.
+ # Normal SIM, phone in service
+ service_state_list.append(tel_utils.SERVICE_STATE_IN_SERVICE)
+ # NO SIM, or Dead SIM, or no Roaming coverage.
+ service_state_list.append(tel_utils.SERVICE_STATE_OUT_OF_SERVICE)
+ service_state_list.append(tel_utils.SERVICE_STATE_EMERGENCY_ONLY)
+ self.dut.log.info("Turn off airplane mode")
+
+ for sub_id in sub_id_list:
+ self.dut.droid.telephonyStartTrackingServiceStateChangeForSubscription(
+ sub_id)
+
+ timeout_time = time.time() + LONG_SLEEP
+ self.dut.droid.connectivityToggleAirplaneMode(new_state)
+
+ try:
+ try:
+ event = self.dut.ed.wait_for_event(
+ tel_utils.EVENT_SERVICE_STATE_CHANGED,
+ tel_utils.is_event_match_for_list,
+ timeout=LONG_SLEEP,
+ field=tel_utils.ServiceStateContainer.SERVICE_STATE,
+ value_list=service_state_list)
+ self.dut.log.info("Got event %s", event)
+ except Empty:
+ self.dut.log.warning(
+ "Did not get expected service state change to %s",
+ service_state_list)
+ finally:
+ for sub_id in sub_id_list:
+ self.dut.droid.telephonyStopTrackingServiceStateChangeForSubscription(
+ sub_id)
+ except Exception as e:
+ self.dut.log.error(e)
+
+ # APM on (new_state=True) will turn off bluetooth but may not turn it on
+ try:
+ if new_state and not tel_utils._wait_for_bluetooth_in_state(
+ self.log, self.dut, False, timeout_time - time.time()):
+ self.dut.log.error(
+ "Failed waiting for bluetooth during airplane mode toggle")
+ if strict_checking: return False
+ except Exception as e:
+ self.dut.log.error("Failed to check bluetooth state due to %s", e)
+ if strict_checking:
+ raise
+
+ # APM on (new_state=True) will turn off wifi but may not turn it on
+ if new_state and not tel_utils._wait_for_wifi_in_state(
+ self.log, self.dut, False, timeout_time - time.time()):
+ self.dut.log.error(
+ "Failed waiting for wifi during airplane mode toggle on")
+ if strict_checking: return False
+
+ if self.dut.droid.connectivityCheckAirplaneMode() != new_state:
+ self.dut.log.error("Set airplane mode to %s failed", new_state)
+ return False
+ return True
+
+
+class AndroidNonPixelDeviceUtils():
+
+ def __init__(self, dut, log):
+ self.dut = dut
+ self.log = log
+ self.set_screen_timeout()
+
+ def start_services(self):
+ self.log.debug('stop_services not supported on non_pixel devices')
+
+ def stop_services(self):
+ self.log.debug('stop_services not supported on non_pixel devices')
+
+ def start_pixel_logger(self):
+ self.log.debug('start_pixel_logger not supported on non_pixel devices')
+
+ def stop_pixel_logger(self, log_path, tag=None):
+ self.log.debug('stop_pixel_logger not supported on non_pixel devices')
+
+ def log_system_power_metrics(self, verbose=1):
+ self.log.debug(
+ 'log_system_power_metrics not supported on non_pixel devices')
+
+ def log_odpm(self, file_path):
+ self.log.debug('log_odpm not supported on non_pixel devices')
+
+ def send_at_command(self, at_command):
+ self.log.debug('send_at_command not supported on non_pixel devices')
+
+ def get_rx_measurements(self, cell_type):
+ self.log.debug(
+ 'get_rx_measurements not supported on non_pixel devices')
+
+ def get_tx_measurements(self, cell_type):
+ self.log.debug(
+ 'get_tx_measurements not supported on non_pixel devices')
+
+ def toggle_airplane_mode(self,
+ new_state=None,
+ strict_checking=True,
+ try_index=0):
+ cur_state = bool(
+ int(self.dut.adb.shell("settings get global airplane_mode_on")))
+ if new_state == cur_state:
+ self.log.info(
+ 'Airplane mode already in {} state.'.format(cur_state))
+ else:
+ self.tap_airplane_mode()
+
+ def get_screen_state(self):
+ screen_state_output = self.dut.adb.shell(
+ "dumpsys display | grep 'mScreenState'")
+ if 'ON' in screen_state_output:
+ return 1
+ else:
+ return 0
+
+ def set_screen_state(self, state):
+ curr_state = self.get_screen_state()
+ if state == curr_state:
+ self.log.debug('Screen state already {}'.format(state))
+ elif state == True:
+ self.dut.adb.shell('input keyevent KEYCODE_WAKEUP')
+ elif state == False:
+ self.dut.adb.shell('input keyevent KEYCODE_SLEEP')
+
+ def set_screen_timeout(self, timeout=5):
+ self.dut.adb.shell('settings put system screen_off_timeout {}'.format(
+ timeout * 1000))
+
+ def tap_airplane_mode(self):
+ self.set_screen_state(1)
+ for command in self.dut.toggle_airplane_mode['screen_routine']:
+ self.dut.adb.shell(command)
+ time.sleep(SHORT_SLEEP)
+ self.set_screen_state(0)
diff --git a/acts_tests/acts_contrib/test_utils/wifi/ota_sniffer.py b/acts_tests/acts_contrib/test_utils/wifi/ota_sniffer.py
index 7cc52da..cc79e4d 100644
--- a/acts_tests/acts_contrib/test_utils/wifi/ota_sniffer.py
+++ b/acts_tests/acts_contrib/test_utils/wifi/ota_sniffer.py
@@ -28,6 +28,8 @@
WifiEnums = wutils.WifiEnums
SNIFFER_TIMEOUT = 6
+MEDIUM_SLEEP = 3
+SHORT_SLEEP = 1
def create(configs):
@@ -47,6 +49,8 @@
objs.append(TsharkSnifferOnUnix(config))
elif config['os'] == 'linux':
objs.append(TsharkSnifferOnLinux(config))
+ elif config['os'] == 'android':
+ objs.append(TsharkSnifferOnAndroid(config))
else:
raise RuntimeError('Wrong sniffer config')
@@ -503,7 +507,7 @@
self._sniffer_server.run('sudo modprobe iwlwifi debug=0x1')
# Wait for wifi config changes before trying to further configuration
# e.g. setting monitor mode (which will fail if above is not complete)
- time.sleep(1)
+ time.sleep(SHORT_SLEEP)
def start_capture(self, network, chan, bw, duration=60):
"""Starts sniffer capture on the specified machine.
@@ -608,3 +612,82 @@
self.log.debug('Setting monitor mode on Ch {}, bw {}'.format(chan, bw))
self.set_monitor_mode(chan, bw)
+
+
+class TsharkSnifferOnAndroid(TsharkSnifferBase):
+ """Class that implements Tshark based sniffer controller on Linux."""
+
+ def __init__(self, config):
+ super().__init__(config)
+ self._init_sniffer(config)
+ self.channel = None
+ self.bandwidth = None
+
+ def _init_sniffer(self, config):
+ """Function to configure interface for the first time"""
+ tshark_D_output = self._sniffer_server.run('tshark -D').stdout
+ self.sniffer_sn = config['serial']
+ time.sleep(MEDIUM_SLEEP)
+ if config['interface'] in tshark_D_output:
+ self.log.info("Target sniffer interface {} detected".format(
+ config['interface']))
+ else:
+ self.log.error('Target sniffer interface {} NOT detected'.format(
+ config['interface']))
+ # Wait for wifi config changes before trying to further configuration
+ # e.g. setting monitor mode (which will fail if above is not complete)
+ time.sleep(SHORT_SLEEP)
+
+ def start_capture(self, network, chan, bw, duration=60):
+ """Starts sniffer capture on the specified machine.
+
+ Args:
+ network: dict describing network to sniff on.
+ duration: duration of sniff.
+ """
+ # If sniffer doesnt support the channel, return
+ # Checking for existing sniffer processes
+ if self._started:
+ return
+
+ # Configure sniffer
+ self._configure_sniffer(network, chan, bw)
+ tshark_command = self._get_tshark_command(duration)
+ sniffer_command = self._get_sniffer_command(tshark_command)
+
+ # Starting sniffer capture by executing tshark command
+ self._run_tshark(sniffer_command)
+
+ def set_monitor_mode(self, chan, bw):
+ """Function to configure interface to monitor mode
+
+ Brings up the sniffer wireless interface in monitor mode and
+ tunes it to the appropriate channel and bandwidth
+
+ Args:
+ chan: primary channel (int) to tune the sniffer to
+ bw: bandwidth (int) to tune the sniffer to
+ """
+ if chan == self.channel and bw == self.bandwidth:
+ return
+
+ self.channel = chan
+ self.bandwidth = bw
+
+ self._sniffer_server.run('adb -s {} shell wl chanspec {}/{}'.format(
+ self.sniffer_sn, chan, bw))
+ time.sleep(SHORT_SLEEP)
+ sniffer_chanspec = self._sniffer_server.run(
+ 'adb -s {} shell wl chanspec'.format(self.sniffer_sn)).stdout
+ time.sleep(SHORT_SLEEP)
+ self.log.info("Sniffer channel: {}".format(sniffer_chanspec))
+
+ def _configure_sniffer(self, network, chan, bw):
+ """ Connects to a wireless network using networksetup utility.
+
+ Args:
+ network: dictionary of network credentials; SSID and password.
+ """
+
+ self.log.debug('Setting monitor mode on Ch {}, bw {}'.format(chan, bw))
+ self.set_monitor_mode(chan, bw)
\ No newline at end of file
diff --git a/acts_tests/acts_contrib/test_utils/wifi/wifi_performance_test_utils/__init__.py b/acts_tests/acts_contrib/test_utils/wifi/wifi_performance_test_utils/__init__.py
index 9817e4b..8f34fc5 100644
--- a/acts_tests/acts_contrib/test_utils/wifi/wifi_performance_test_utils/__init__.py
+++ b/acts_tests/acts_contrib/test_utils/wifi/wifi_performance_test_utils/__init__.py
@@ -124,6 +124,51 @@
(field, full_dict[field]) for field in fields)
return sub_dict
+def write_antenna_tune_code(dut, tune_code):
+ flag_tune_code_forcing_on = 0
+ # Use AT Command file to enable tune code forcing
+ for n_enable in range(5):
+ logging.debug('{}-th Enabling modem test mode'.format(n_enable))
+ try:
+ at_lmodetest_output = dut.adb.shell("modem_cmd raw AT+LMODETEST")
+ time.sleep(SHORT_SLEEP)
+ logging.debug('Command AT+LMODETEST output: {}'.format(at_lmodetest_output))
+ except:
+ logging.error('{}-th Failed modem test mode AT+LMODETEST'.format(n_enable))
+ continue
+ try:
+ at_lrffinalstart_output = dut.adb.shell("modem_cmd raw AT+LRFFINALSTART")
+ time.sleep(SHORT_SLEEP)
+ logging.debug('Command AT+LRFFINALSTART output: {}'.format(at_lrffinalstart_output))
+ if "+LRFFINALSTART:0" in at_lrffinalstart_output and not "ERROR" in at_lrffinalstart_output:
+ flag_tune_code_forcing_on = 1
+ logging.info('Enable modem test mode SUCCESSFUL')
+ break
+ except:
+ logging.error('{}-th Failed modem test mode AT+LRFFINALSTART'.format(n_enable))
+ continue
+ if not flag_tune_code_forcing_on:
+ raise RuntimeError("AT Command File Not set up")
+ at_cmd_output = dut.adb.shell("modem_cmd raw " + tune_code['tune_code_cmd'])
+ logging.debug('Write Tune Code: {}'.format("modem_cmd raw " + tune_code['tune_code_cmd']))
+ flag_tc_reg_correct = True
+ # Check tune code register values
+ for tune_code_register_key in tune_code['tune_code_registers'].keys():
+ try:
+ at_tc_reg_output = dut.adb.shell("modem_cmd raw AT+MIPIREAD={}".format(tune_code_register_key))
+ time.sleep(SHORT_SLEEP)
+ except:
+ pass
+ if "+MIPIREAD:"+tune_code['tune_code_registers'][tune_code_register_key].lower() in at_tc_reg_output:
+ logging.info('Forced tune code register {} value matches expectation: {}'.format(tune_code_register_key, tune_code['tune_code_registers'][tune_code_register_key]))
+ else:
+ logging.warning('Expected tune code register {} value: {}'.format(tune_code_register_key, tune_code['tune_code_registers'][tune_code_register_key]))
+ logging.warning('tune code register value is set to {}'.format(at_tc_reg_output))
+ flag_tc_reg_correct = False
+ if flag_tc_reg_correct:
+ return True
+ else:
+ raise RuntimeError("Enable modem test mode SUCCESSFUL, but register values NOT correct")
# Miscellaneous Wifi Utilities
def check_skip_conditions(testcase_params, dut, access_point,
diff --git a/acts_tests/acts_contrib/test_utils/wifi/wifi_performance_test_utils/brcm_utils.py b/acts_tests/acts_contrib/test_utils/wifi/wifi_performance_test_utils/brcm_utils.py
index b9e6d28..ef703e9 100644
--- a/acts_tests/acts_contrib/test_utils/wifi/wifi_performance_test_utils/brcm_utils.py
+++ b/acts_tests/acts_contrib/test_utils/wifi/wifi_performance_test_utils/brcm_utils.py
@@ -118,19 +118,19 @@
1: {
20: [
8.6, 17.2, 25.8, 34.4, 51.6, 68.8, 77.4, 86.0, 103.2, 114.7,
- 129.0, 143.4, 154.9, 172.1, 0, 0
+ 129.0, 143.4, 154.9, 172.1, 0, 4.3
],
40: [
17.2, 34.4, 51.6, 68.8, 103.2, 137.6, 154.9, 172.1, 206.5, 229.4,
- 258.1, 286.8, 309.7, 344.1, 0, 0
+ 258.1, 286.8, 309.7, 344.1, 0, 8.6
],
80: [
36.0, 72.1, 108.1, 144.1, 216.2, 288.2, 324.3, 360.3, 432.4,
- 480.4, 540.4, 600.4, 648.5, 720.6, 0, 0
+ 480.4, 540.4, 600.4, 648.5, 720.6, 8.6, 18.0
],
160: [
72.1, 144.1, 216.2, 288.2, 432.4, 576.5, 648.5, 720.6, 864.7,
- 960.7, 1080.9, 1201, 1297.1, 1441.2, 0, 0
+ 960.7, 1080.9, 1201, 1297.1, 1441.2, 18.0, 36.0
]
},
2: {
diff --git a/acts_tests/tests/google/cellular/performance/CellularFr2PeakThroughputTest.py b/acts_tests/tests/google/cellular/performance/CellularFr2PeakThroughputTest.py
index 6fbb4d9..548452b 100644
--- a/acts_tests/tests/google/cellular/performance/CellularFr2PeakThroughputTest.py
+++ b/acts_tests/tests/google/cellular/performance/CellularFr2PeakThroughputTest.py
@@ -76,61 +76,69 @@
'nr_cell_count']:
metric_map.update({
'nr_min_dl_tput':
- testcase_result['throughput_measurements']['nr_tput_result']['total']['DL']['min_tput'],
+ testcase_result['throughput_measurements']['nr_tput_result']
+ ['total']['DL']['min_tput'],
'nr_max_dl_tput':
- testcase_result['throughput_measurements']['nr_tput_result']['total']['DL']['max_tput'],
+ testcase_result['throughput_measurements']['nr_tput_result']
+ ['total']['DL']['max_tput'],
'nr_avg_dl_tput':
- testcase_result['throughput_measurements']['nr_tput_result']['total']['DL']
- ['average_tput'],
+ testcase_result['throughput_measurements']['nr_tput_result']
+ ['total']['DL']['average_tput'],
'nr_theoretical_dl_tput':
- testcase_result['throughput_measurements']['nr_tput_result']['total']['DL']
- ['theoretical_tput'],
+ testcase_result['throughput_measurements']['nr_tput_result']
+ ['total']['DL']['theoretical_tput'],
'nr_dl_bler':
- testcase_result['throughput_measurements']['nr_bler_result']['total']['DL']['nack_ratio']
- * 100,
+ testcase_result['throughput_measurements']['nr_bler_result']
+ ['total']['DL']['nack_ratio'] * 100,
'nr_min_dl_tput':
- testcase_result['throughput_measurements']['nr_tput_result']['total']['UL']['min_tput'],
+ testcase_result['throughput_measurements']['nr_tput_result']
+ ['total']['UL']['min_tput'],
'nr_max_dl_tput':
- testcase_result['throughput_measurements']['nr_tput_result']['total']['UL']['max_tput'],
+ testcase_result['throughput_measurements']['nr_tput_result']
+ ['total']['UL']['max_tput'],
'nr_avg_dl_tput':
- testcase_result['throughput_measurements']['nr_tput_result']['total']['UL']
- ['average_tput'],
+ testcase_result['throughput_measurements']['nr_tput_result']
+ ['total']['UL']['average_tput'],
'nr_theoretical_dl_tput':
- testcase_result['throughput_measurements']['nr_tput_result']['total']['UL']
- ['theoretical_tput'],
+ testcase_result['throughput_measurements']['nr_tput_result']
+ ['total']['UL']['theoretical_tput'],
'nr_ul_bler':
- testcase_result['throughput_measurements']['nr_bler_result']['total']['UL']['nack_ratio']
- * 100
+ testcase_result['throughput_measurements']['nr_bler_result']
+ ['total']['UL']['nack_ratio'] * 100
})
if testcase_data['testcase_params']['endc_combo_config'][
'lte_cell_count']:
metric_map.update({
'lte_min_dl_tput':
- testcase_result['throughput_measurements']['lte_tput_result']['total']['DL']['min_tput'],
+ testcase_result['throughput_measurements']['lte_tput_result']
+ ['total']['DL']['min_tput'],
'lte_max_dl_tput':
- testcase_result['throughput_measurements']['lte_tput_result']['total']['DL']['max_tput'],
+ testcase_result['throughput_measurements']['lte_tput_result']
+ ['total']['DL']['max_tput'],
'lte_avg_dl_tput':
- testcase_result['throughput_measurements']['lte_tput_result']['total']['DL']
- ['average_tput'],
+ testcase_result['throughput_measurements']['lte_tput_result']
+ ['total']['DL']['average_tput'],
'lte_theoretical_dl_tput':
- testcase_result['throughput_measurements']['lte_tput_result']['total']['DL']
- ['theoretical_tput'],
+ testcase_result['throughput_measurements']['lte_tput_result']
+ ['total']['DL']['theoretical_tput'],
'lte_dl_bler':
- testcase_result['throughput_measurements']['lte_bler_result']['total']['DL']['nack_ratio']
- * 100,
+ testcase_result['throughput_measurements']['lte_bler_result']
+ ['total']['DL']['nack_ratio'] * 100,
'lte_min_dl_tput':
- testcase_result['throughput_measurements']['lte_tput_result']['total']['UL']['min_tput'],
+ testcase_result['throughput_measurements']['lte_tput_result']
+ ['total']['UL']['min_tput'],
'lte_max_dl_tput':
- testcase_result['throughput_measurements']['lte_tput_result']['total']['UL']['max_tput'],
+ testcase_result['throughput_measurements']['lte_tput_result']
+ ['total']['UL']['max_tput'],
'lte_avg_dl_tput':
- testcase_result['throughput_measurements']['lte_tput_result']['total']['UL']
- ['average_tput'],
+ testcase_result['throughput_measurements']['lte_tput_result']
+ ['total']['UL']['average_tput'],
'lte_theoretical_dl_tput':
- testcase_result['throughput_measurements']['lte_tput_result']['total']['UL']
- ['theoretical_tput'],
+ testcase_result['throughput_measurements']['lte_tput_result']
+ ['total']['UL']['theoretical_tput'],
'lte_ul_bler':
- testcase_result['throughput_measurements']['lte_bler_result']['total']['UL']['nack_ratio']
- * 100
+ testcase_result['throughput_measurements']['lte_bler_result']
+ ['total']['UL']['nack_ratio'] * 100
})
if self.publish_testcase_metrics:
for metric_name, metric_value in metric_map.items():
@@ -170,69 +178,73 @@
'endc_combo_config']['nr_cell_count']:
row_dict.update({
'NR DL Min. Throughput':
- result['throughput_measurements']['nr_tput_result']['total']['DL']
- ['min_tput'],
+ result['throughput_measurements']['nr_tput_result']
+ ['total']['DL']['min_tput'],
'NR DL Max. Throughput':
- result['throughput_measurements']['nr_tput_result']['total']['DL']
- ['max_tput'],
+ result['throughput_measurements']['nr_tput_result']
+ ['total']['DL']['max_tput'],
'NR DL Avg. Throughput':
- result['throughput_measurements']['nr_tput_result']['total']['DL']
- ['average_tput'],
+ result['throughput_measurements']['nr_tput_result']
+ ['total']['DL']['average_tput'],
'NR DL Theoretical Throughput':
- result['throughput_measurements']['nr_tput_result']['total']['DL']
- ['theoretical_tput'],
+ result['throughput_measurements']['nr_tput_result']
+ ['total']['DL']['theoretical_tput'],
'NR UL Min. Throughput':
- result['throughput_measurements']['nr_tput_result']['total']['UL']
- ['min_tput'],
+ result['throughput_measurements']['nr_tput_result']
+ ['total']['UL']['min_tput'],
'NR UL Max. Throughput':
- result['throughput_measurements']['nr_tput_result']['total']['UL']
- ['max_tput'],
+ result['throughput_measurements']['nr_tput_result']
+ ['total']['UL']['max_tput'],
'NR UL Avg. Throughput':
- result['throughput_measurements']['nr_tput_result']['total']['UL']
- ['average_tput'],
+ result['throughput_measurements']['nr_tput_result']
+ ['total']['UL']['average_tput'],
'NR UL Theoretical Throughput':
- result['throughput_measurements']['nr_tput_result']['total']['UL']
- ['theoretical_tput'],
+ result['throughput_measurements']['nr_tput_result']
+ ['total']['UL']['theoretical_tput'],
'NR DL BLER (%)':
- result['throughput_measurements']['nr_bler_result']['total']['DL']
- ['nack_ratio'] * 100,
+ result['throughput_measurements']['nr_bler_result']
+ ['total']['DL']['nack_ratio'] * 100,
'NR UL BLER (%)':
- result['throughput_measurements']['nr_bler_result']['total']['UL']
- ['nack_ratio'] * 100
+ result['throughput_measurements']['nr_bler_result']
+ ['total']['UL']['nack_ratio'] * 100
})
if testcase_results['testcase_params'][
'endc_combo_config']['lte_cell_count']:
row_dict.update({
'LTE DL Min. Throughput':
- result['throughput_measurements']['lte_tput_result']['total']['DL']
- ['min_tput'],
+ result['throughput_measurements']
+ ['lte_tput_result']['total']['DL']['min_tput'],
'LTE DL Max. Throughput':
- result['throughput_measurements']['lte_tput_result']['total']['DL']
- ['max_tput'],
+ result['throughput_measurements']
+ ['lte_tput_result']['total']['DL']['max_tput'],
'LTE DL Avg. Throughput':
- result['throughput_measurements']['lte_tput_result']['total']['DL']
- ['average_tput'],
+ result['throughput_measurements']
+ ['lte_tput_result']['total']['DL']['average_tput'],
'LTE DL Theoretical Throughput':
- result['throughput_measurements']['lte_tput_result']['total']['DL']
+ result['throughput_measurements']
+ ['lte_tput_result']['total']['DL']
['theoretical_tput'],
'LTE UL Min. Throughput':
- result['throughput_measurements']['lte_tput_result']['total']['UL']
- ['min_tput'],
+ result['throughput_measurements']
+ ['lte_tput_result']['total']['UL']['min_tput'],
'LTE UL Max. Throughput':
- result['throughput_measurements']['lte_tput_result']['total']['UL']
- ['max_tput'],
+ result['throughput_measurements']
+ ['lte_tput_result']['total']['UL']['max_tput'],
'LTE UL Avg. Throughput':
- result['throughput_measurements']['lte_tput_result']['total']['UL']
- ['average_tput'],
+ result['throughput_measurements']
+ ['lte_tput_result']['total']['UL']['average_tput'],
'LTE UL Theoretical Throughput':
- result['throughput_measurements']['lte_tput_result']['total']['UL']
+ result['throughput_measurements']
+ ['lte_tput_result']['total']['UL']
['theoretical_tput'],
'LTE DL BLER (%)':
- result['throughput_measurements']['lte_bler_result']['total']['DL']
- ['nack_ratio'] * 100,
+ result['throughput_measurements']
+ ['lte_bler_result']['total']['DL']['nack_ratio'] *
+ 100,
'LTE UL BLER (%)':
- result['throughput_measurements']['lte_bler_result']['total']['UL']
- ['nack_ratio'] * 100
+ result['throughput_measurements']
+ ['lte_bler_result']['total']['UL']['nack_ratio'] *
+ 100
})
writer.writerow(row_dict)
@@ -269,26 +281,19 @@
lte_scc_list = []
endc_combo_config['lte_pcc'] = 1
lte_cell = {
- 'cell_type':
- 'LTE',
- 'cell_number':
- 1,
- 'pcc':
- 1,
- 'band':
- test_config['lte_band'],
- 'dl_bandwidth':
- test_config['lte_bandwidth'],
- 'ul_enabled':
- 1,
- 'duplex_mode':
- test_config['lte_duplex_mode'],
- 'dl_mimo_config':
- 'D{nss}U{nss}'.format(nss=test_config['lte_dl_mimo_config']),
- 'ul_mimo_config':
- 'D{nss}U{nss}'.format(nss=test_config['lte_ul_mimo_config']),
- 'transmission_mode':
- 'TM1'
+ 'cell_type': 'LTE',
+ 'cell_number': 1,
+ 'pcc': 1,
+ 'band': self.testclass_params['lte_anchor_band'],
+ 'dl_bandwidth': self.testclass_params['lte_anchor_bandwidth'],
+ 'ul_enabled': 1,
+ 'duplex_mode': self.testclass_params['lte_anchor_duplex_mode'],
+ 'dl_mimo_config': 'D{nss}U{nss}'.format(nss=1),
+ 'ul_mimo_config': 'D{nss}U{nss}'.format(nss=1),
+ 'transmission_mode': 'TM1',
+ 'num_codewords': 1,
+ 'num_layers': 1,
+ 'dl_subframe_allocation': [1] * 10,
}
cell_config_list.append(lte_cell)
@@ -301,7 +306,8 @@
'NR5G',
'cell_number':
nr_cell_idx,
- 'nr_cell_type': 'NSA',
+ 'nr_cell_type':
+ 'NSA',
'band':
test_config['nr_band'],
'duplex_mode':
@@ -340,28 +346,24 @@
return endc_combo_config
def generate_test_cases(self, bands, channels, nr_mcs_pair_list,
- num_dl_cells_list, num_ul_cells_list, orientation_list,
- dl_mimo_config, ul_mimo_config, **kwargs):
+ num_dl_cells_list, num_ul_cells_list,
+ orientation_list, dl_mimo_config, ul_mimo_config,
+ **kwargs):
"""Function that auto-generates test cases for a test class."""
test_cases = []
for orientation, band, channel, num_ul_cells, num_dl_cells, nr_mcs_pair in itertools.product(
- orientation_list, bands, channels, num_ul_cells_list, num_dl_cells_list,
- nr_mcs_pair_list):
+ orientation_list, bands, channels, num_ul_cells_list,
+ num_dl_cells_list, nr_mcs_pair_list):
if num_ul_cells > num_dl_cells:
continue
if channel not in cputils.PCC_PRESET_MAPPING[band]:
continue
test_config = {
- 'lte_band': 2,
- 'lte_bandwidth': 'BW20',
- 'lte_duplex_mode': 'FDD',
- 'lte_dl_mimo_config': 1,
- 'lte_ul_mimo_config': 1,
'nr_band': band,
'nr_bandwidth': 'BW100',
'nr_duplex_mode': 'TDD',
'nr_cell_type': 'NSA',
- 'nr_channel': channel,
+ 'nr_channel': cputils.PCC_PRESET_MAPPING[band][channel],
'num_dl_cells': num_dl_cells,
'num_ul_cells': num_ul_cells,
'nr_dl_mimo_config': dl_mimo_config,
@@ -369,9 +371,9 @@
}
endc_combo_config = self.generate_endc_combo_config(test_config)
test_name = 'test_fr2_{}_{}_{}_DL_{}CC_mcs{}_{}x{}_UL_{}CC_mcs{}_{}x{}'.format(
- orientation, band, channel, num_dl_cells, nr_mcs_pair[0], dl_mimo_config,
- dl_mimo_config, num_ul_cells, nr_mcs_pair[1], ul_mimo_config,
- ul_mimo_config)
+ orientation, band, channel, num_dl_cells, nr_mcs_pair[0],
+ dl_mimo_config, dl_mimo_config, num_ul_cells, nr_mcs_pair[1],
+ ul_mimo_config, ul_mimo_config)
test_params = collections.OrderedDict(
endc_combo_config=endc_combo_config,
nr_dl_mcs=nr_mcs_pair[0],
@@ -397,7 +399,8 @@
self.testclass_params = self.user_params['fr2_throughput_test_params']
self.tests = self.generate_test_cases(['N257', 'N258', 'N260', 'N261'],
['low', 'mid', 'high'],
- [(16, 4), (27, 4)],
+ [(16, 4), (25, 4), (27, 4),
+ (28, 4)],
list(range(1, 9)),
list(range(1, 3)),
['A_Plane', 'B_Plane'],
@@ -409,7 +412,7 @@
traffic_direction='DL',
transform_precoding=0,
lte_dl_mcs=4,
- lte_dl_mcs_table='QAM256',
+ lte_dl_mcs_table='QAM64',
lte_ul_mcs=4,
lte_ul_mcs_table='QAM64')
@@ -418,27 +421,30 @@
def __init__(self, controllers):
super().__init__(controllers)
- self.testclass_params = self.user_params['throughput_test_params']
+ self.testclass_params = self.user_params['fr2_throughput_test_params']
self.tests = self.generate_test_cases(['N257', 'N258', 'N260', 'N261'],
['low', 'mid', 'high'],
- [(4, 16), (4, 27)], [1], [1],
+ [(4, 16), (4, 25), (4, 27),
+ (4, 28)], [1], [1],
['A_Plane', 'B_Plane'],
force_contiguous_nr_channel=True,
dl_mimo_config=2,
- ul_mimo_config=1,
+ ul_mimo_config=2,
schedule_scenario="FULL_TPUT",
schedule_slot_ratio=80,
traffic_direction='UL',
transform_precoding=0,
lte_dl_mcs=4,
- lte_dl_mcs_table='QAM256',
+ lte_dl_mcs_table='QAM64',
lte_ul_mcs=4,
lte_ul_mcs_table='QAM64')
+
self.tests.extend(
self.generate_test_cases(['N257', 'N258', 'N260', 'N261'],
- ['low', 'mid', 'high'],
- [(4, 16), (4, 27)], [1], [1],
- ['A_Plane', 'B_Plane'],
+ ['low', 'mid', 'high'], [(4, 16), (4, 25),
+ (4, 27),
+ (4, 28)], [2],
+ [2], ['A_Plane', 'B_Plane'],
force_contiguous_nr_channel=True,
dl_mimo_config=2,
ul_mimo_config=2,
@@ -447,14 +453,15 @@
traffic_direction='UL',
transform_precoding=0,
lte_dl_mcs=4,
- lte_dl_mcs_table='QAM256',
+ lte_dl_mcs_table='QAM64',
lte_ul_mcs=4,
lte_ul_mcs_table='QAM64'))
self.tests.extend(
self.generate_test_cases(['N257', 'N258', 'N260', 'N261'],
- ['low', 'mid', 'high'],
- [(4, 16), (4, 27)], [2], [2],
- ['A_Plane', 'B_Plane'],
+ ['low', 'mid', 'high'], [(4, 16), (4, 25),
+ (4, 27),
+ (4, 28)], [4],
+ [4], ['A_Plane', 'B_Plane'],
force_contiguous_nr_channel=True,
dl_mimo_config=2,
ul_mimo_config=2,
@@ -463,23 +470,7 @@
traffic_direction='UL',
transform_precoding=0,
lte_dl_mcs=4,
- lte_dl_mcs_table='QAM256',
- lte_ul_mcs=4,
- lte_ul_mcs_table='QAM64'))
- self.tests.extend(
- self.generate_test_cases(['N257', 'N258', 'N260', 'N261'],
- ['low', 'mid', 'high'],
- [(4, 16), (4, 27)], [4], [4],
- ['A_Plane', 'B_Plane'],
- force_contiguous_nr_channel=True,
- dl_mimo_config=2,
- ul_mimo_config=2,
- schedule_scenario="FULL_TPUT",
- schedule_slot_ratio=80,
- traffic_direction='UL',
- transform_precoding=0,
- lte_dl_mcs=4,
- lte_dl_mcs_table='QAM256',
+ lte_dl_mcs_table='QAM64',
lte_ul_mcs=4,
lte_ul_mcs_table='QAM64'))
@@ -488,10 +479,11 @@
def __init__(self, controllers):
super().__init__(controllers)
- self.testclass_params = self.user_params['throughput_test_params']
+ self.testclass_params = self.user_params['fr2_throughput_test_params']
self.tests = self.generate_test_cases(['N257', 'N258', 'N260', 'N261'],
['low', 'mid', 'high'],
- [(4, 16), (4, 27)], [1], [1],
+ [(4, 16), (4, 25), (4, 27),
+ (4, 28)], [1], [1],
['A_Plane', 'B_Plane'],
force_contiguous_nr_channel=True,
dl_mimo_config=2,
@@ -501,14 +493,16 @@
traffic_direction='UL',
transform_precoding=1,
lte_dl_mcs=4,
- lte_dl_mcs_table='QAM256',
+ lte_dl_mcs_table='QAM64',
lte_ul_mcs=4,
lte_ul_mcs_table='QAM64')
+
self.tests.extend(
self.generate_test_cases(['N257', 'N258', 'N260', 'N261'],
- ['low', 'mid', 'high'],
- [(4, 16), (4, 27)], [1], [1],
- ['A_Plane', 'B_Plane'],
+ ['low', 'mid', 'high'], [(4, 16), (4, 25),
+ (4, 27),
+ (4, 28)], [2],
+ [2], ['A_Plane', 'B_Plane'],
force_contiguous_nr_channel=True,
dl_mimo_config=2,
ul_mimo_config=2,
@@ -517,14 +511,15 @@
traffic_direction='UL',
transform_precoding=1,
lte_dl_mcs=4,
- lte_dl_mcs_table='QAM256',
+ lte_dl_mcs_table='QAM64',
lte_ul_mcs=4,
lte_ul_mcs_table='QAM64'))
self.tests.extend(
self.generate_test_cases(['N257', 'N258', 'N260', 'N261'],
- ['low', 'mid', 'high'],
- [(4, 16), (4, 27)], [2], [2],
- ['A_Plane', 'B_Plane'],
+ ['low', 'mid', 'high'], [(4, 16), (4, 25),
+ (4, 27),
+ (4, 28)], [4],
+ [4], ['A_Plane', 'B_Plane'],
force_contiguous_nr_channel=True,
dl_mimo_config=2,
ul_mimo_config=2,
@@ -533,23 +528,7 @@
traffic_direction='UL',
transform_precoding=1,
lte_dl_mcs=4,
- lte_dl_mcs_table='QAM256',
- lte_ul_mcs=4,
- lte_ul_mcs_table='QAM64'))
- self.tests.extend(
- self.generate_test_cases(['N257', 'N258', 'N260', 'N261'],
- ['low', 'mid', 'high'],
- [(4, 16), (4, 27)], [4], [4],
- ['A_Plane', 'B_Plane'],
- force_contiguous_nr_channel=True,
- dl_mimo_config=2,
- ul_mimo_config=2,
- schedule_scenario="FULL_TPUT",
- schedule_slot_ratio=80,
- traffic_direction='UL',
- transform_precoding=1,
- lte_dl_mcs=4,
- lte_dl_mcs_table='QAM256',
+ lte_dl_mcs_table='QAM64',
lte_ul_mcs=4,
lte_ul_mcs_table='QAM64'))
@@ -565,12 +544,11 @@
def __init__(self, controllers):
super().__init__(controllers)
- self.testclass_params = self.user_params['throughput_test_params']
+ self.testclass_params = self.user_params['fr2_throughput_test_params']
self.tests = self.generate_test_cases(
['N257', 'N258', 'N260', 'N261'],
self.user_params['throughput_test_params']['frequency_sweep'],
- [(16, 4), (27, 4)],
- ['A_Plane', 'B_Plane'],
+ [(16, 4), (27, 4)], ['A_Plane', 'B_Plane'],
force_contiguous_nr_channel=False,
dl_mimo_config=2,
ul_mimo_config=1,
@@ -579,7 +557,7 @@
traffic_direction='DL',
transform_precoding=0,
lte_dl_mcs=4,
- lte_dl_mcs_table='QAM256',
+ lte_dl_mcs_table='QAM64',
lte_ul_mcs=4,
lte_ul_mcs_table='QAM64')
@@ -596,11 +574,6 @@
if channel not in cputils.PCC_PRESET_MAPPING[band]:
continue
test_config = {
- 'lte_band': 2,
- 'lte_bandwidth': 'BW20',
- 'lte_duplex_mode': 'FDD',
- 'lte_dl_mimo_config': 1,
- 'lte_ul_mimo_config': 1,
'nr_band': band,
'nr_bandwidth': 'BW100',
'nr_duplex_mode': 'TDD',
diff --git a/acts_tests/tests/google/cellular/performance/CellularFr2SensitivityTest.py b/acts_tests/tests/google/cellular/performance/CellularFr2SensitivityTest.py
index 8f92667..bf5fafb 100644
--- a/acts_tests/tests/google/cellular/performance/CellularFr2SensitivityTest.py
+++ b/acts_tests/tests/google/cellular/performance/CellularFr2SensitivityTest.py
@@ -46,7 +46,7 @@
self.tests = self.generate_test_cases(
band_list=['N257', 'N258', 'N260', 'N261'],
channel_list=['low', 'mid', 'high'],
- dl_mcs_list=list(numpy.arange(27, -1, -1)),
+ dl_mcs_list=list(numpy.arange(28, -1, -1)),
num_dl_cells_list=[1, 2, 4, 8],
orientation_list=['A_Plane', 'B_Plane'],
dl_mimo_config=2,
@@ -56,7 +56,7 @@
lte_ul_mcs_table='QAM256',
lte_ul_mcs=4,
schedule_scenario="FULL_TPUT",
- schedule_slot_ratio= 80,
+ schedule_slot_ratio=80,
force_contiguous_nr_channel=True,
transform_precoding=0)
@@ -128,15 +128,12 @@
figure_list.append(plot)
output_file_path = os.path.join(self.log_path, 'results.html')
BokehFigure.save_figures(figure_list, output_file_path)
-
"""Saves CSV with all test results to enable comparison."""
results_file_path = os.path.join(
context.get_current_context().get_full_output_path(),
'results.csv')
with open(results_file_path, 'w', newline='') as csvfile:
- field_names = [
- 'Test Name', 'Sensitivity'
- ]
+ field_names = ['Test Name', 'Sensitivity']
writer = csv.DictWriter(csvfile, fieldnames=field_names)
writer.writeheader()
@@ -238,6 +235,7 @@
[nr_cell_sweep] *
testcase_params['endc_combo_config']['nr_cell_count'])
return cell_power_sweeps
+
def generate_endc_combo_config(self, test_config):
"""Function to generate ENDC combo config from CSV test config
@@ -254,26 +252,19 @@
lte_scc_list = []
endc_combo_config['lte_pcc'] = 1
lte_cell = {
- 'cell_type':
- 'LTE',
- 'cell_number':
- 1,
- 'pcc':
- 1,
- 'band':
- test_config['lte_band'],
- 'dl_bandwidth':
- test_config['lte_bandwidth'],
- 'ul_enabled':
- 1,
- 'duplex_mode':
- test_config['lte_duplex_mode'],
- 'dl_mimo_config':
- 'D{nss}U{nss}'.format(nss=test_config['lte_dl_mimo_config']),
- 'ul_mimo_config':
- 'D{nss}U{nss}'.format(nss=test_config['lte_ul_mimo_config']),
- 'transmission_mode':
- 'TM1'
+ 'cell_type': 'LTE',
+ 'cell_number': 1,
+ 'pcc': 1,
+ 'band': self.testclass_params['lte_anchor_band'],
+ 'dl_bandwidth': self.testclass_params['lte_anchor_bandwidth'],
+ 'ul_enabled': 1,
+ 'duplex_mode': self.testclass_params['lte_anchor_duplex_mode'],
+ 'dl_mimo_config': 'D{nss}U{nss}'.format(nss=1),
+ 'ul_mimo_config': 'D{nss}U{nss}'.format(nss=1),
+ 'transmission_mode': 'TM1',
+ 'num_codewords': 1,
+ 'num_layers': 1,
+ 'dl_subframe_allocation': [1] * 10,
}
cell_config_list.append(lte_cell)
@@ -286,7 +277,8 @@
'NR5G',
'cell_number':
nr_cell_idx,
- 'nr_cell_type': 'NSA',
+ 'nr_cell_type':
+ 'NSA',
'band':
test_config['nr_band'],
'duplex_mode':
@@ -325,19 +317,16 @@
return endc_combo_config
def generate_test_cases(self, band_list, channel_list, dl_mcs_list,
- num_dl_cells_list, dl_mimo_config, orientation_list, **kwargs):
+ num_dl_cells_list, dl_mimo_config,
+ orientation_list, **kwargs):
"""Function that auto-generates test cases for a test class."""
test_cases = []
for orientation, band, channel, num_dl_cells, nr_dl_mcs in itertools.product(
- orientation_list, band_list, channel_list, num_dl_cells_list, dl_mcs_list):
+ orientation_list, band_list, channel_list, num_dl_cells_list,
+ dl_mcs_list):
if channel not in cputils.PCC_PRESET_MAPPING[band]:
continue
test_config = {
- 'lte_band': 2,
- 'lte_bandwidth': 'BW20',
- 'lte_duplex_mode': 'FDD',
- 'lte_dl_mimo_config': 1,
- 'lte_ul_mimo_config': 1,
'nr_band': band,
'nr_bandwidth': 'BW100',
'nr_duplex_mode': 'TDD',
diff --git a/acts_tests/tests/google/cellular/performance/CellularFr2UplinkPowerSweepTest.py b/acts_tests/tests/google/cellular/performance/CellularFr2UplinkPowerSweepTest.py
new file mode 100644
index 0000000..ce56a6c
--- /dev/null
+++ b/acts_tests/tests/google/cellular/performance/CellularFr2UplinkPowerSweepTest.py
@@ -0,0 +1,413 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2022 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+import csv
+import itertools
+import json
+import re
+import numpy
+import os
+from acts import context
+from acts import base_test
+from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger
+from acts_contrib.test_utils.cellular.performance import cellular_performance_test_utils as cputils
+from acts_contrib.test_utils.cellular.performance.CellularThroughputBaseTest import CellularThroughputBaseTest
+from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wputils
+
+from functools import partial
+
+LONG_SLEEP = 10
+MEDIUM_SLEEP = 2
+IPERF_TIMEOUT = 10
+SHORT_SLEEP = 1
+SUBFRAME_LENGTH = 0.001
+STOP_COUNTER_LIMIT = 3
+
+
+class CellularFr2UplinkPowerSweepTest(CellularThroughputBaseTest):
+ """Base class to test cellular FR2 throughput
+
+ This class implements cellular FR2 throughput tests on a callbox setup.
+ The class setups up the callbox in the desired configurations, configures
+ and connects the phone, and runs traffic/iperf throughput.
+ """
+
+ def __init__(self, controllers):
+ super().__init__(controllers)
+ base_test.BaseTestClass.__init__(self, controllers)
+ self.testcase_metric_logger = (
+ BlackboxMappedMetricLogger.for_test_case())
+ self.testclass_metric_logger = (
+ BlackboxMappedMetricLogger.for_test_class())
+ self.publish_testcase_metrics = True
+
+ def process_testcase_results(self):
+ """Publish test case metrics and save results"""
+ if self.current_test_name not in self.testclass_results:
+ return
+ testcase_data = self.testclass_results[self.current_test_name]
+ results_file_path = os.path.join(
+ context.get_current_context().get_full_output_path(),
+ '{}.json'.format(self.current_test_name))
+ with open(results_file_path, 'w') as results_file:
+ json.dump(wputils.serialize_dict(testcase_data),
+ results_file,
+ indent=4)
+ testcase_result = testcase_data['results'][0]
+
+ def process_testclass_results(self):
+ pass
+
+ def get_per_cell_power_sweeps(self, testcase_params):
+ cell_power_sweeps = []
+ for cell in testcase_params['endc_combo_config']['cell_list']:
+ if cell['cell_type'] == 'LTE':
+ sweep = [self.testclass_params['lte_cell_power']]
+ else:
+ sweep = [self.testclass_params['nr_cell_power']]
+ cell_power_sweeps.append(sweep)
+ return cell_power_sweeps
+
+ def generate_endc_combo_config(self, test_config):
+ """Function to generate ENDC combo config from CSV test config
+
+ Args:
+ test_config: dict containing ENDC combo config from CSV
+ Returns:
+ endc_combo_config: dictionary with all ENDC combo settings
+ """
+ endc_combo_config = collections.OrderedDict()
+ cell_config_list = []
+
+ lte_cell_count = 1
+ lte_carriers = [1]
+ lte_scc_list = []
+ endc_combo_config['lte_pcc'] = 1
+ lte_cell = {
+ 'cell_type': 'LTE',
+ 'cell_number': 1,
+ 'pcc': 1,
+ 'band': self.testclass_params['lte_anchor_band'],
+ 'dl_bandwidth': self.testclass_params['lte_anchor_bandwidth'],
+ 'ul_enabled': 1,
+ 'duplex_mode': self.testclass_params['lte_anchor_duplex_mode'],
+ 'dl_mimo_config': 'D{nss}U{nss}'.format(nss=1),
+ 'ul_mimo_config': 'D{nss}U{nss}'.format(nss=1),
+ 'transmission_mode': 'TM1',
+ 'num_codewords': 1,
+ 'num_layers': 1,
+ 'dl_subframe_allocation': [1] * 10,
+ }
+ cell_config_list.append(lte_cell)
+
+ nr_cell_count = 0
+ nr_dl_carriers = []
+ nr_ul_carriers = []
+ for nr_cell_idx in range(1, test_config['num_dl_cells'] + 1):
+ nr_cell = {
+ 'cell_type':
+ 'NR5G',
+ 'cell_number':
+ nr_cell_idx,
+ 'nr_cell_type':
+ 'NSA',
+ 'band':
+ test_config['nr_band'],
+ 'duplex_mode':
+ test_config['nr_duplex_mode'],
+ 'channel':
+ test_config['nr_channel'],
+ 'dl_mimo_config':
+ 'N{nss}X{nss}'.format(nss=test_config['nr_dl_mimo_config']),
+ 'dl_bandwidth_class':
+ 'A',
+ 'dl_bandwidth':
+ test_config['nr_bandwidth'],
+ 'ul_enabled':
+ 1 if nr_cell_idx <= test_config['num_ul_cells'] else 0,
+ 'ul_bandwidth_class':
+ 'A',
+ 'ul_mimo_config':
+ 'N{nss}X{nss}'.format(nss=test_config['nr_ul_mimo_config']),
+ 'subcarrier_spacing':
+ 'MU3'
+ }
+ cell_config_list.append(nr_cell)
+ nr_cell_count = nr_cell_count + 1
+ nr_dl_carriers.append(nr_cell_idx)
+ if nr_cell_idx <= test_config['num_ul_cells']:
+ nr_ul_carriers.append(nr_cell_idx)
+
+ endc_combo_config['lte_cell_count'] = lte_cell_count
+ endc_combo_config['nr_cell_count'] = nr_cell_count
+ endc_combo_config['nr_dl_carriers'] = nr_dl_carriers
+ endc_combo_config['nr_ul_carriers'] = nr_ul_carriers
+ endc_combo_config['cell_list'] = cell_config_list
+ endc_combo_config['lte_scc_list'] = lte_scc_list
+ endc_combo_config['lte_dl_carriers'] = lte_carriers
+ endc_combo_config['lte_ul_carriers'] = lte_carriers
+ return endc_combo_config
+
+ def _test_throughput_bler_sweep_ul_power(self, testcase_params):
+ """Test function to run cellular throughput and BLER measurements.
+
+ The function runs BLER/throughput measurement after configuring the
+ callbox and DUT. The test supports running PHY or TCP/UDP layer traffic
+ in a variety of band/carrier/mcs/etc configurations.
+
+ Args:
+ testcase_params: dict containing test-specific parameters
+ Returns:
+ result: dict containing throughput results and metadata
+ """
+ # Prepare results dicts
+ testcase_params = self.compile_test_params(testcase_params)
+ testcase_params['nr_target_power_sweep'] = list(
+ numpy.arange(self.testclass_params['nr_target_power_start'],
+ self.testclass_params['nr_target_power_stop'],
+ self.testclass_params['nr_target_power_step']))
+
+ testcase_results = collections.OrderedDict()
+ testcase_results['testcase_params'] = testcase_params
+ testcase_results['results'] = []
+
+ # Setup ota chamber if needed
+ if hasattr(self,
+ 'keysight_chamber') and 'orientation' in testcase_params:
+ self.keysight_chamber.move_theta_phi_abs(
+ self.keysight_chamber.preset_orientations[
+ testcase_params['orientation']]['theta'],
+ self.keysight_chamber.preset_orientations[
+ testcase_params['orientation']]['phi'])
+
+ # Setup tester and wait for DUT to connect
+ self.setup_tester(testcase_params)
+
+ # Run throughput test loop
+ stop_counter = 0
+ if testcase_params['endc_combo_config']['nr_cell_count']:
+ self.keysight_test_app.select_display_tab('NR5G', 1, 'BTHR',
+ 'OTAGRAPH')
+ else:
+ self.keysight_test_app.select_display_tab('LTE', 1, 'BTHR',
+ 'OTAGRAPH')
+ for power_idx in range(len(testcase_params['nr_target_power_sweep'])):
+ result = collections.OrderedDict()
+ # Check that cells are still connected
+ connected = 1
+ for cell in testcase_params['endc_combo_config']['cell_list']:
+ if not self.keysight_test_app.wait_for_cell_status(
+ cell['cell_type'], cell['cell_number'],
+ ['ACT', 'CONN'], SHORT_SLEEP, SHORT_SLEEP):
+ connected = 0
+ if not connected:
+ self.log.info('DUT lost connection to cells. Ending test.')
+ break
+ # Set DL cell power
+ current_target_power = testcase_params['nr_target_power_sweep'][
+ power_idx]
+ self.log.info(
+ 'Setting target power to {}dBm'.format(current_target_power))
+ for cell_idx, cell in enumerate(
+ testcase_params['endc_combo_config']['cell_list']):
+ self.keysight_test_app.set_cell_ul_power_control(
+ cell['cell_type'], cell['cell_number'], 'TARget',
+ current_target_power)
+ # Start BLER and throughput measurements
+ current_throughput = self.run_single_throughput_measurement(
+ testcase_params)
+ result['throughput_measurements'] = current_throughput
+ result['nr_target_power'] = current_target_power
+ self.print_throughput_result(current_throughput)
+
+ tx_power = self.dut_utils.get_fr2_tx_power()
+
+ testcase_results['results'].append(result)
+ if (('lte_bler_result' in result['throughput_measurements']
+ and result['throughput_measurements']['lte_bler_result']
+ ['total']['DL']['nack_ratio'] * 100 > 99)
+ or ('nr_bler_result' in result['throughput_measurements']
+ and result['throughput_measurements']['nr_bler_result']
+ ['total']['DL']['nack_ratio'] * 100 > 99)):
+ stop_counter = stop_counter + 1
+ else:
+ stop_counter = 0
+ if stop_counter == STOP_COUNTER_LIMIT:
+ break
+
+ # Save results
+ self.testclass_results[self.current_test_name] = testcase_results
+
+ def generate_test_cases(self, bands, channels, nr_mcs_pair_list,
+ num_dl_cells_list, num_ul_cells_list,
+ orientation_list, dl_mimo_config, ul_mimo_config,
+ **kwargs):
+ """Function that auto-generates test cases for a test class."""
+ test_cases = []
+ for orientation, band, channel, num_ul_cells, num_dl_cells, nr_mcs_pair in itertools.product(
+ orientation_list, bands, channels, num_ul_cells_list,
+ num_dl_cells_list, nr_mcs_pair_list):
+ if num_ul_cells > num_dl_cells:
+ continue
+ if channel not in cputils.PCC_PRESET_MAPPING[band]:
+ continue
+ test_config = {
+ 'nr_band': band,
+ 'nr_bandwidth': 'BW100',
+ 'nr_duplex_mode': 'TDD',
+ 'nr_cell_type': 'NSA',
+ 'nr_channel': cputils.PCC_PRESET_MAPPING[band][channel],
+ 'num_dl_cells': num_dl_cells,
+ 'num_ul_cells': num_ul_cells,
+ 'nr_dl_mimo_config': dl_mimo_config,
+ 'nr_ul_mimo_config': ul_mimo_config
+ }
+ endc_combo_config = self.generate_endc_combo_config(test_config)
+ test_name = 'test_fr2_ul_power_sweep_{}_{}_{}_DL_{}CC_mcs{}_{}x{}_UL_{}CC_mcs{}_{}x{}'.format(
+ orientation, band, channel, num_dl_cells, nr_mcs_pair[0],
+ dl_mimo_config, dl_mimo_config, num_ul_cells, nr_mcs_pair[1],
+ ul_mimo_config, ul_mimo_config)
+ test_params = collections.OrderedDict(
+ endc_combo_config=endc_combo_config,
+ nr_dl_mcs=nr_mcs_pair[0],
+ nr_ul_mcs=nr_mcs_pair[1],
+ orientation=orientation,
+ **kwargs)
+ setattr(
+ self, test_name,
+ partial(self._test_throughput_bler_sweep_ul_power,
+ test_params))
+ test_cases.append(test_name)
+ return test_cases
+
+
+class CellularFr2CpOfdmUplinkPowerSweepTest(CellularFr2UplinkPowerSweepTest):
+
+ def __init__(self, controllers):
+ super().__init__(controllers)
+ self.testclass_params = self.user_params[
+ 'fr2_uplink_power_sweep_test_params']
+ self.tests = self.generate_test_cases(['N257', 'N258', 'N260', 'N261'],
+ ['low', 'mid', 'high'],
+ [(4, 16), (4, 25), (4, 27),
+ (4, 28)], [1], [1],
+ ['A_Plane', 'B_Plane'],
+ force_contiguous_nr_channel=True,
+ dl_mimo_config=2,
+ ul_mimo_config=2,
+ schedule_scenario="FULL_TPUT",
+ schedule_slot_ratio=80,
+ traffic_direction='UL',
+ transform_precoding=0,
+ lte_dl_mcs=4,
+ lte_dl_mcs_table='QAM64',
+ lte_ul_mcs=4,
+ lte_ul_mcs_table='QAM64')
+
+ self.tests.extend(
+ self.generate_test_cases(['N257', 'N258', 'N260', 'N261'],
+ ['low', 'mid', 'high'], [(4, 16), (4, 25),
+ (4, 27),
+ (4, 28)], [2],
+ [2], ['A_Plane', 'B_Plane'],
+ force_contiguous_nr_channel=True,
+ dl_mimo_config=2,
+ ul_mimo_config=2,
+ schedule_scenario="FULL_TPUT",
+ schedule_slot_ratio=80,
+ traffic_direction='UL',
+ transform_precoding=0,
+ lte_dl_mcs=4,
+ lte_dl_mcs_table='QAM64',
+ lte_ul_mcs=4,
+ lte_ul_mcs_table='QAM64'))
+ self.tests.extend(
+ self.generate_test_cases(['N257', 'N258', 'N260', 'N261'],
+ ['low', 'mid', 'high'], [(4, 16), (4, 25),
+ (4, 27),
+ (4, 28)], [4],
+ [4], ['A_Plane', 'B_Plane'],
+ force_contiguous_nr_channel=True,
+ dl_mimo_config=2,
+ ul_mimo_config=2,
+ schedule_scenario="FULL_TPUT",
+ schedule_slot_ratio=80,
+ traffic_direction='UL',
+ transform_precoding=0,
+ lte_dl_mcs=4,
+ lte_dl_mcs_table='QAM64',
+ lte_ul_mcs=4,
+ lte_ul_mcs_table='QAM64'))
+
+
+class CellularFr2DftsOfdmUplinkPowerSweepTest(CellularFr2UplinkPowerSweepTest):
+
+ def __init__(self, controllers):
+ super().__init__(controllers)
+ self.testclass_params = self.user_params[
+ 'fr2_uplink_power_sweep_test_params']
+ self.tests = self.generate_test_cases(['N257', 'N258', 'N260', 'N261'],
+ ['low', 'mid', 'high'],
+ [(4, 16), (4, 25), (4, 27),
+ (4, 28)], [1], [1],
+ ['A_Plane', 'B_Plane'],
+ force_contiguous_nr_channel=True,
+ dl_mimo_config=2,
+ ul_mimo_config=1,
+ schedule_scenario="FULL_TPUT",
+ schedule_slot_ratio=80,
+ traffic_direction='UL',
+ transform_precoding=1,
+ lte_dl_mcs=4,
+ lte_dl_mcs_table='QAM64',
+ lte_ul_mcs=4,
+ lte_ul_mcs_table='QAM64')
+
+ self.tests.extend(
+ self.generate_test_cases(['N257', 'N258', 'N260', 'N261'],
+ ['low', 'mid', 'high'], [(4, 16), (4, 25),
+ (4, 27),
+ (4, 28)], [2],
+ [2], ['A_Plane', 'B_Plane'],
+ force_contiguous_nr_channel=True,
+ dl_mimo_config=2,
+ ul_mimo_config=2,
+ schedule_scenario="FULL_TPUT",
+ schedule_slot_ratio=80,
+ traffic_direction='UL',
+ transform_precoding=1,
+ lte_dl_mcs=4,
+ lte_dl_mcs_table='QAM64',
+ lte_ul_mcs=4,
+ lte_ul_mcs_table='QAM64'))
+ self.tests.extend(
+ self.generate_test_cases(['N257', 'N258', 'N260', 'N261'],
+ ['low', 'mid', 'high'], [(4, 16), (4, 25),
+ (4, 27),
+ (4, 28)], [4],
+ [4], ['A_Plane', 'B_Plane'],
+ force_contiguous_nr_channel=True,
+ dl_mimo_config=2,
+ ul_mimo_config=2,
+ schedule_scenario="FULL_TPUT",
+ schedule_slot_ratio=80,
+ traffic_direction='UL',
+ transform_precoding=1,
+ lte_dl_mcs=4,
+ lte_dl_mcs_table='QAM64',
+ lte_ul_mcs=4,
+ lte_ul_mcs_table='QAM64'))
diff --git a/acts_tests/tests/google/cellular/performance/CellularPageDecodeTest.py b/acts_tests/tests/google/cellular/performance/CellularPageDecodeTest.py
new file mode 100644
index 0000000..21a5015
--- /dev/null
+++ b/acts_tests/tests/google/cellular/performance/CellularPageDecodeTest.py
@@ -0,0 +1,221 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2022 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+import csv
+import numpy
+import json
+import re
+import os
+import time
+from acts import context
+from acts import base_test
+from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger
+from acts_contrib.test_utils.cellular.performance import cellular_performance_test_utils as cputils
+from acts_contrib.test_utils.cellular.performance.CellularThroughputBaseTest import CellularThroughputBaseTest
+from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wputils
+from acts_contrib.test_utils.wifi.wifi_performance_test_utils.bokeh_figure import BokehFigure
+from functools import partial
+
+VERY_SHORT_SLEEP = 0.1
+SHORT_SLEEP = 1
+MEDIUM_SLEEP = 5
+LONG_SLEEP = 10
+STOP_COUNTER_LIMIT = 3
+
+
+class CellularPageDecodeTest(CellularThroughputBaseTest):
+ """Class to test ENDC sensitivity"""
+
+ def __init__(self, controllers):
+ base_test.BaseTestClass.__init__(self, controllers)
+ self.testcase_metric_logger = (
+ BlackboxMappedMetricLogger.for_test_case())
+ self.testclass_metric_logger = (
+ BlackboxMappedMetricLogger.for_test_class())
+ self.publish_testcase_metrics = True
+ self.testclass_params = self.user_params['page_decode_test_params']
+ self.tests = self.generate_test_cases()
+
+ def _test_page_decode(self, testcase_params):
+ """Test function to run cellular throughput and BLER measurements.
+
+ The function runs BLER/throughput measurement after configuring the
+ callbox and DUT. The test supports running PHY or TCP/UDP layer traffic
+ in a variety of band/carrier/mcs/etc configurations.
+
+ Args:
+ testcase_params: dict containing test-specific parameters
+ Returns:
+ result: dict containing throughput results and meta data
+ """
+ # Prepare results dicts
+ testcase_params = self.compile_test_params(testcase_params)
+ testcase_results = collections.OrderedDict()
+ testcase_results['testcase_params'] = testcase_params
+ testcase_results['results'] = []
+
+ # Setup ota chamber if needed
+ if hasattr(self,
+ 'keysight_chamber') and 'orientation' in testcase_params:
+ self.keysight_chamber.move_theta_phi_abs(
+ self.keysight_chamber.preset_orientations[
+ testcase_params['orientation']]['theta'],
+ self.keysight_chamber.preset_orientations[
+ testcase_params['orientation']]['phi'])
+
+ # Setup tester and wait for DUT to connect
+ self.setup_tester(testcase_params)
+ test_cell = testcase_params['endc_combo_config']['cell_list'][0]
+
+ # Release RRC connection
+ self.keysight_test_app.release_rrc_connection(test_cell['cell_type'],
+ test_cell['cell_number'])
+ # Set tester to ignore RACH
+ self.keysight_test_app.enable_rach(test_cell['cell_type'],
+ test_cell['cell_number'],
+ enabled=0)
+ self.keysight_test_app.enable_preamble_report(test_cell['cell_type'],
+ 1)
+ stop_counter = 0
+ for power_idx in range(len(testcase_params['cell_power_sweep'][0])):
+ result = collections.OrderedDict()
+ # Set DL cell power
+ for cell_idx, cell in enumerate(
+ testcase_params['endc_combo_config']['cell_list']):
+ cell_power_array = []
+ current_cell_power = testcase_params['cell_power_sweep'][
+ cell_idx][power_idx]
+ cell_power_array.append(current_cell_power)
+ self.keysight_test_app.set_cell_dl_power(
+ cell['cell_type'], cell['cell_number'], current_cell_power,
+ 1)
+ result['cell_power'] = cell_power_array
+ # Start BLER and throughput measurements
+ decode_counter = 0
+ for idx in range(self.testclass_params['num_measurements']):
+ # Page device
+ self.keysight_test_app.send_rrc_paging(
+ test_cell['cell_type'], test_cell['cell_number'])
+ time.sleep(MEDIUM_SLEEP)
+ # Fetch page result
+ preamble_report = self.keysight_test_app.fetch_preamble_report(
+ test_cell['cell_type'], test_cell['cell_number'])
+ self.log.info(preamble_report)
+ # If rach attempted, increment decode counter.
+ if preamble_report:
+ decode_counter = decode_counter + 1
+ lte_rx_meas = self.dut_utils.get_rx_measurements('LTE')
+ nr_rx_meas = self.dut_utils.get_rx_measurements('NR5G')
+ result[
+ 'decode_probability'] = decode_counter / self.testclass_params[
+ 'num_measurements']
+
+ if self.testclass_params.get('log_rsrp_metrics', 1):
+ result['lte_rx_measurements'] = lte_rx_meas
+ result['nr_rx_measurements'] = nr_rx_meas
+ self.log.info('LTE Rx Measurements: {}'.format(lte_rx_meas))
+ self.log.info('NR Rx Measurements: {}'.format(nr_rx_meas))
+
+ testcase_results['results'].append(result)
+ if result['decode_probability'] == 0:
+ stop_counter = stop_counter + 1
+ else:
+ stop_counter = 0
+ if stop_counter == STOP_COUNTER_LIMIT:
+ break
+ self.keysight_test_app.enable_rach(test_cell['cell_type'],
+ test_cell['cell_number'],
+ enabled=1)
+
+ # Save results
+ self.testclass_results[self.current_test_name] = testcase_results
+
+ def get_per_cell_power_sweeps(self, testcase_params):
+ # get reference test
+ nr_cell_index = testcase_params['endc_combo_config']['lte_cell_count']
+ current_band = testcase_params['endc_combo_config']['cell_list'][
+ nr_cell_index]['band']
+ reference_test = None
+ reference_sensitivity = None
+ for testcase_name, testcase_data in self.testclass_results.items():
+ if testcase_data['testcase_params']['endc_combo_config'][
+ 'cell_list'][nr_cell_index]['band'] == current_band:
+ reference_test = testcase_name
+ reference_sensitivity = testcase_data['sensitivity']
+ if reference_test and reference_sensitivity and not self.retry_flag:
+ start_atten = reference_sensitivity + self.testclass_params[
+ 'adjacent_mcs_gap']
+ self.log.info(
+ "Reference test {} found. Sensitivity {} dBm. Starting at {} dBm"
+ .format(reference_test, reference_sensitivity, start_atten))
+ else:
+ start_atten = self.testclass_params['nr_cell_power_start']
+ self.log.info(
+ "Reference test not found. Starting at {} dBm".format(
+ start_atten))
+ # get current cell power start
+ nr_cell_sweep = list(
+ numpy.arange(start_atten,
+ self.testclass_params['nr_cell_power_stop'],
+ self.testclass_params['nr_cell_power_step']))
+ lte_sweep = [self.testclass_params['lte_cell_power']
+ ] * len(nr_cell_sweep)
+ if nr_cell_index == 0:
+ cell_power_sweeps = [nr_cell_sweep]
+ else:
+ cell_power_sweeps = [lte_sweep, nr_cell_sweep]
+ return cell_power_sweeps
+
+ def compile_test_params(self, testcase_params):
+ """Function that completes all test params based on the test name.
+
+ Args:
+ testcase_params: dict containing test-specific parameters
+ """
+ # Cell power sweep
+ # TODO: Make this a function to support single power and sweep modes for each cell
+ testcase_params['cell_power_sweep'] = self.get_per_cell_power_sweeps(
+ testcase_params)
+ return testcase_params
+
+ def generate_test_cases(self, **kwargs):
+ test_cases = []
+ with open(self.testclass_params['nr_single_cell_configs'],
+ 'r') as csvfile:
+ test_configs = csv.DictReader(csvfile)
+ for test_config in test_configs:
+ if int(test_config['skip_test']):
+ continue
+ endc_combo_config = cputils.generate_endc_combo_config_from_csv_row(
+ test_config)
+ test_name = 'test_fr1_{}'.format(test_config['nr_band'])
+ test_params = collections.OrderedDict(
+ endc_combo_config=endc_combo_config,
+ lte_dl_mcs_table='QAM256',
+ lte_dl_mcs=4,
+ lte_ul_mcs_table='QAM256',
+ lte_ul_mcs=4,
+ nr_dl_mcs=4,
+ nr_ul_mcs=4,
+ transform_precoding=0,
+ # schedule_scenario='FULL_TPUT',
+ # schedule_slot_ratio=80
+ **kwargs)
+ setattr(self, test_name,
+ partial(self._test_page_decode, test_params))
+ test_cases.append(test_name)
+ return test_cases
diff --git a/acts_tests/tests/google/cellular/performance/CellularRxPowerTest.py b/acts_tests/tests/google/cellular/performance/CellularRxPowerTest.py
index 9615c91..edd6d35 100644
--- a/acts_tests/tests/google/cellular/performance/CellularRxPowerTest.py
+++ b/acts_tests/tests/google/cellular/performance/CellularRxPowerTest.py
@@ -73,7 +73,7 @@
self.keysight_test_app.destroy()
def setup_test(self):
- cputils.start_pixel_logger(self.dut)
+ self.dut_utils.start_pixel_logger()
def on_retry(self):
"""Function to control test logic on retried tests.
@@ -102,7 +102,7 @@
self.testclass_results[self.current_test_name].setdefault(
'log_path', [])
self.testclass_results[self.current_test_name]['log_path'].append(
- cputils.stop_pixel_logger(self.dut, log_path))
+ self.dut_utils.stop_pixel_logger(log_path))
self.process_test_results()
def process_test_results(self):
diff --git a/acts_tests/tests/google/wifi/WifiRvrTest.py b/acts_tests/tests/google/wifi/WifiRvrTest.py
index 731807c..1daf863 100644
--- a/acts_tests/tests/google/wifi/WifiRvrTest.py
+++ b/acts_tests/tests/google/wifi/WifiRvrTest.py
@@ -572,6 +572,12 @@
self.sta_dut.droid.wakeLockAcquireDim()
else:
self.sta_dut.go_to_sleep()
+ # Enable Tune Code
+ band = self.access_point.band_lookup_by_channel(testcase_params['channel'])
+ if 'tune_code' in self.testbed_params:
+ if int(self.testbed_params['tune_code']['manual_tune_code']):
+ self.log.info('Tune Code forcing enabled in config file')
+ wputils.write_antenna_tune_code(self.sta_dut, self.testbed_params['tune_code'][band])
if (wputils.validate_network(self.sta_dut,
testcase_params['test_network']['SSID'])
and not self.testclass_params.get('force_reconnect', 0)):