blob: f6d962fa80569f66bfee1d04f70eadaa3478e223 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2021 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
'''GNSS Base Class for Blanking and Hot Start Sensitivity Search'''
import os
import re
from time import sleep
from collections import namedtuple
from itertools import product
from numpy import arange
from pandas import DataFrame, merge
from acts.signals import TestError
from acts.signals import TestFailure
from acts.logger import epoch_to_log_line_timestamp
from acts.context import get_current_context
from acts_contrib.test_utils.gnss import LabTtffTestBase as lttb
from acts_contrib.test_utils.gnss.LabTtffTestBase import glob_re
from acts_contrib.test_utils.gnss.gnss_test_utils import launch_eecoexer
from acts_contrib.test_utils.gnss.gnss_test_utils import execute_eecoexer_function
from acts_contrib.test_utils.gnss.gnss_test_utils import start_gnss_by_gtw_gpstool
from acts_contrib.test_utils.gnss.gnss_test_utils import get_current_epoch_time
from acts_contrib.test_utils.gnss.gnss_test_utils import check_current_focus_app
from acts_contrib.test_utils.gnss.gnss_test_utils import process_ttff_by_gtw_gpstool
from acts_contrib.test_utils.gnss.gnss_test_utils import check_ttff_data
from acts_contrib.test_utils.gnss.gnss_test_utils import process_gnss_by_gtw_gpstool
from acts_contrib.test_utils.gnss.dut_log_test_utils import get_gpstool_logs
from acts_contrib.test_utils.gnss.gnss_testlog_utils import parse_gpstool_ttfflog_to_df
def range_wi_end(dut, start, stop, step):
"""
Generate a list of data from start to stop with the step. The list includes start and stop value
and also supports floating point.
Args:
dut: An AndroidDevice object.
start: start value.
Type, int or float.
stop: stop value.
Type, int or float.
step: step value.
Type, int or float.
Returns:
range_ls: the list of data.
"""
if step == 0:
dut.log.warn('Step is 0. Return empty list')
range_ls = []
else:
if start == stop:
range_ls = [stop]
else:
range_ls = list(arange(start, stop, step))
if len(range_ls) > 0:
if (step < 0 and range_ls[-1] > stop) or (step > 0 and
range_ls[-1] < stop):
range_ls.append(stop)
dut.log.debug(f'The range list is: {range_ls}')
return range_ls
def check_ttff_pe(dut, ttff_data, ttff_mode, pe_criteria):
"""Verify all TTFF results from ttff_data.
Args:
dut: An AndroidDevice object.
ttff_data: TTFF data of secs, position error and signal strength.
ttff_mode: TTFF Test mode for current test item.
pe_criteria: Criteria for current test item.
"""
ret = True
no_iteration = len(ttff_data.keys())
dut.log.info(
f'{no_iteration} iterations of TTFF {ttff_mode} tests finished.')
dut.log.info(f'{ttff_mode} PASS criteria is {pe_criteria} meters')
dut.log.debug(f'{ttff_mode} TTFF data: {ttff_data}')
if len(ttff_data.keys()) == 0:
dut.log.error("GTW_GPSTool didn't process TTFF properly.")
raise TestFailure("GTW_GPSTool didn't process TTFF properly.")
if any(
float(ttff_data[key].ttff_pe) >= pe_criteria
for key in ttff_data.keys()):
dut.log.error(
f'One or more TTFF {ttff_mode} are over test criteria {pe_criteria} meters'
)
ret = False
else:
dut.log.info(
f'All TTFF {ttff_mode} are within test criteria {pe_criteria} meters.'
)
ret = True
return ret
class GnssBlankingBase(lttb.LabTtffTestBase):
""" LAB GNSS Cellular Coex Tx Power Sweep TTFF/FFPE Tests"""
GNSS_PWR_SWEEP = 'gnss_pwr_sweep'
CELL_PWR_SWEEP = 'cell_pwr_sweep'
def __init__(self, controllers):
""" Initializes class attributes. """
super().__init__(controllers)
self.eecoex_func = ''
self.start_pwr = 10
self.stop_pwr = 24
self.offset = 1
self.result_cell_pwr = 10
self.gsm_sweep_params = None
self.lte_tdd_pc3_sweep_params = None
self.lte_tdd_pc2_sweep_params = None
self.coex_stop_cmd = None
self.scen_sweep = False
self.gnss_pwr_sweep_init_ls = []
self.gnss_pwr_sweep_fine_sweep_ls = []
def setup_class(self):
super().setup_class()
# Required parameters
req_params = [self.GNSS_PWR_SWEEP]
self.unpack_userparams(req_param_names=req_params)
self.unpack_gnss_pwr_sweep()
# Optional parameters
cell_sweep_params = self.user_params.get(self.CELL_PWR_SWEEP, [])
if cell_sweep_params:
self.gsm_sweep_params = cell_sweep_params.get("GSM", [10, 33, 1])
self.lte_tdd_pc3_sweep_params = cell_sweep_params.get(
"LTE_TDD_PC3", [10, 24, 1])
self.lte_tdd_pc2_sweep_params = cell_sweep_params.get(
"LTE_TDD_PC2", [10, 26, 1])
def setup_test(self):
super().setup_test()
launch_eecoexer(self.dut)
# Set DUT temperature the limit to 60 degree
self.dut.adb.shell(
'setprop persist.com.google.eecoexer.cellular.temperature_limit 60')
# Get current context full path to create the log folder.
cur_test_item_dir = get_current_context().get_full_output_path()
self.gnss_log_path = os.path.join(self.log_path, cur_test_item_dir)
os.makedirs(self.gnss_log_path, exist_ok=True)
## Start GNSS chip log
self.start_dut_gnss_log()
def teardown_test(self):
super().teardown_test()
# Set gnss_vendor_log_path based on GNSS solution vendor.
gnss_vendor_log_path = os.path.join(self.gnss_log_path,
self.diag_option)
os.makedirs(gnss_vendor_log_path, exist_ok=True)
# Stop GNSS chip log and pull the logs to local file system
self.stop_and_pull_dut_gnss_log(gnss_vendor_log_path)
# Stop cellular Tx and close GPStool and EEcoexer APPs.
self.stop_coex_tx()
self.log.debug('Close GPStool APP')
self.dut.force_stop_apk("com.android.gpstool")
self.log.debug('Close EEcoexer APP')
self.dut.force_stop_apk("com.google.eecoexer")
def derive_sweep_list(self, data):
"""
Derive sweep list from config
Args:
data: GNSS simulator scenario power setting.
type, dictionary.
"""
match_tag = r'(?P<sat>[a-z]+)_(?P<band>[a-z]+\d\S*)'
sweep_all_ls = []
set_all_ls = []
regex_match = re.compile(match_tag)
method = data.get('method')
for key, value in data.items():
result = regex_match.search(key)
if result:
set_all_ls.append(result.groupdict())
sweep_all_ls.append(
range_wi_end(self.dut, value[0], value[1], value[2]))
if method == 'product':
swp_result_ls = list(product(*sweep_all_ls))
else:
swp_result_ls = list(zip(*sweep_all_ls))
self.log.debug(f'set_all_ls: {set_all_ls}')
self.log.debug(f'swp_result_ls: {swp_result_ls}')
return set_all_ls, swp_result_ls
def unpack_gnss_pwr_sweep(self):
""" Unpack gnss_pwr_sweep and construct sweep parameters
"""
for key, value in self.gnss_pwr_sweep.items():
if key == 'init':
self.gnss_pwr_sweep_init_ls = []
self.log.info(f'Sweep: {value}')
result = self.derive_sweep_list(value)
self.gnss_pwr_sweep_init_ls.append(result)
elif key == 'fine_sweep':
self.gnss_pwr_sweep_fine_sweep_ls = []
self.log.info(f'Sweep: {value}')
result = self.derive_sweep_list(value)
self.gnss_pwr_sweep_fine_sweep_ls.append(result)
else:
self.log.error(f'{key} is a unsupported key in gnss_pwr_sweep.')
def stop_coex_tx(self):
"""
Stop EEcoexer Tx power.
"""
# Stop cellular Tx by EEcoexer.
if self.coex_stop_cmd:
self.log.info(f'Stop EEcoexer Test Command: {self.coex_stop_cmd}')
execute_eecoexer_function(self.dut, self.coex_stop_cmd)
def analysis_ttff_ffpe(self, ttff_data, json_tag=''):
"""
Pull logs and parsing logs into json file.
Args:
ttff_data: ttff_data from test results.
Type, list.
json_tag: tag for parsed json file name.
Type, str.
"""
# Create log directory.
gps_log_path = os.path.join(self.gnss_log_path,
'Cell_Pwr_Sweep_Results')
# Pull logs of GTW GPStool.
get_gpstool_logs(self.dut, gps_log_path, False)
# Parsing the log of GTW GPStool into pandas dataframe.
target_dir = os.path.join(gps_log_path, 'GPSLogs', 'files')
gps_api_log_ls = glob_re(self.dut, target_dir, r'GNSS_\d+')
latest_gps_api_log = max(gps_api_log_ls, key=os.path.getctime)
self.log.info(f'Get latest GPStool log is: {latest_gps_api_log}')
df_ttff_ffpe = DataFrame(
parse_gpstool_ttfflog_to_df(latest_gps_api_log))
# Add test case, TTFF and FFPE data into the dataframe.
ttff_dict = {}
for i in ttff_data:
data = ttff_data[i]._asdict()
ttff_dict[i] = dict(data)
ttff_data_df = DataFrame(ttff_dict).transpose()
ttff_data_df = ttff_data_df[[
'ttff_loop', 'ttff_sec', 'ttff_pe', 'ttff_haccu'
]]
try:
df_ttff_ffpe = merge(df_ttff_ffpe,
ttff_data_df,
left_on='loop',
right_on='ttff_loop')
except: # pylint: disable=bare-except
self.log.warning("Can't merge ttff_data and df.")
df_ttff_ffpe['test_case'] = json_tag
json_file = f'gps_log_{json_tag}.json'
ttff_data_json_file = f'gps_log_{json_tag}_ttff_data.json'
json_path = os.path.join(gps_log_path, json_file)
ttff_data_json_path = os.path.join(gps_log_path, ttff_data_json_file)
# Save dataframe into json file.
df_ttff_ffpe.to_json(json_path, orient='table', index=False)
ttff_data_df.to_json(ttff_data_json_path, orient='table', index=False)
def hot_start_ttff_ffpe_process(self, iteration, wait):
"""
Function to run hot start TTFF/FFPE by GTW GPSTool
Args:
iteration: TTFF/FFPE test iteration.
type, integer.
wait: wait time before the hot start TTFF/FFPE test.
type, integer.
"""
# Start GTW GPStool.
self.dut.log.info("Restart GTW GPSTool")
start_gnss_by_gtw_gpstool(self.dut, state=True)
if wait > 0:
self.log.info(
f'Wait for {wait} seconds before TTFF to acquire data.')
sleep(wait)
# Get current time and convert to human readable format
begin_time = get_current_epoch_time()
log_begin_time = epoch_to_log_line_timestamp(begin_time)
self.dut.log.debug(f'Start time is {log_begin_time}')
# Run hot start TTFF
for i in range(3):
self.log.info(f'Start hot start attempt {i + 1}')
self.dut.adb.shell(
f'am broadcast -a com.android.gpstool.ttff_action '
f'--es ttff hs --es cycle {iteration} --ez raninterval False')
sleep(1)
if self.dut.search_logcat(
"act=com.android.gpstool.start_test_action", begin_time):
self.dut.log.info("Send TTFF start_test_action successfully.")
break
else:
check_current_focus_app(self.dut)
raise TestError("Fail to send TTFF start_test_action.")
return begin_time
def gnss_hot_start_ttff_ffpe_test(self,
iteration,
sweep_enable=False,
json_tag='',
wait=0):
"""
GNSS hot start ttff ffpe tset
Args:
iteration: hot start TTFF test iteration.
Type, int.
Default, 1.
sweep_enable: Indicator for the function to check if it is run by cell_power_sweep()
Type, bool.
Default, False.
json_tag: if the function is run by cell_power_sweep(), the function would use
this as a part of file name to save TTFF and FFPE results into json file.
Type, str.
Default, ''.
wait: wait time before ttff test.
Type, int.
Default, 0.
Raise:
TestError: fail to send TTFF start_test_action.
"""
test_type = namedtuple('Type', ['command', 'criteria'])
test_type_ttff = test_type('Hot Start', self.hs_criteria)
test_type_pe = test_type('Hot Start', self.hs_ttff_pecriteria)
# Verify hot start TTFF results
begin_time = self.hot_start_ttff_ffpe_process(iteration, wait)
try:
ttff_data = process_ttff_by_gtw_gpstool(self.dut, begin_time,
self.simulator_location)
except: # pylint: disable=bare-except
self.log.warning('Fail to acquire TTFF data. Retry again.')
begin_time = self.hot_start_ttff_ffpe_process(iteration, wait)
ttff_data = process_ttff_by_gtw_gpstool(self.dut, begin_time,
self.simulator_location)
# Stop GTW GPSTool
self.dut.log.info("Stop GTW GPSTool")
start_gnss_by_gtw_gpstool(self.dut, state=False)
if sweep_enable:
self.analysis_ttff_ffpe(ttff_data, json_tag)
result_ttff = check_ttff_data(self.dut,
ttff_data,
ttff_mode=test_type_ttff.command,
criteria=test_type_ttff.criteria)
result_pe = check_ttff_pe(self.dut,
ttff_data,
ttff_mode=test_type_pe.command,
pe_criteria=test_type_pe.criteria)
if not result_ttff or not result_pe:
self.dut.log.warning('%s TTFF fails to reach '
'designated criteria' % test_type_ttff.command)
self.dut.log.info("Stop GTW GPSTool")
return False
return True
def hot_start_gnss_power_sweep(self,
sweep_ls,
wait=0,
iteration=1,
sweep_enable=False,
title=''):
"""
GNSS simulator power sweep of hot start test.
Args:
sweep_ls: list of sweep parameters.
Type, tuple.
wait: Wait time before the power sweep.
Type, int.
Default, 0.
iteration: The iteration times of hot start test.
Type, int.
Default, 1.
sweep_enable: Indicator for power sweep.
It will be True only in GNSS sensitivity search case.
Type, bool.
Defaule, False.
title: the target log folder title for GNSS sensitivity search test items.
Type, str.
Default, ''.
Return:
Bool, gnss_pwr_params.
"""
# Calculate loop range list from gnss_simulator_power_level and sa_sensitivity
self.log.debug(
f'Start the GNSS simulator power sweep. The sweep tuple is [{sweep_ls}]'
)
if sweep_enable:
self.start_gnss_and_wait(wait)
else:
self.dut.log.info('Wait %d seconds to start TTFF HS' % wait)
sleep(wait)
# Sweep GNSS simulator power level in range_ls.
# Do hot start for every power level.
# Check the TTFF result if it can pass the criteria.
gnss_pwr_params = None
previous_pwr_lvl = None
current_pwr_lvl = None
return_pwr_lvl = {}
for j, gnss_pwr_params in enumerate(sweep_ls[1]):
json_tag = f'{title}_'
current_pwr_lvl = gnss_pwr_params
if j == 0:
previous_pwr_lvl = current_pwr_lvl
for i, pwr in enumerate(gnss_pwr_params):
sat_sys = sweep_ls[0][i].get('sat').upper()
band = sweep_ls[0][i].get('band').upper()
# Set GNSS Simulator power level
self.gnss_simulator.ping_inst()
self.gnss_simulator.set_scenario_power(power_level=pwr,
sat_system=sat_sys,
freq_band=band)
self.log.info(f'Set {sat_sys} {band} with power {pwr}')
json_tag = json_tag + f'{sat_sys}_{band}_{pwr}'
# Wait 30 seconds if major power sweep level is changed.
wait = 0
if j > 0:
if current_pwr_lvl[0] != previous_pwr_lvl[0]:
wait = 30
# GNSS hot start test
if not self.gnss_hot_start_ttff_ffpe_test(iteration, sweep_enable,
json_tag, wait):
result = False
break
previous_pwr_lvl = current_pwr_lvl
result = True
for i, pwr in enumerate(previous_pwr_lvl):
key = f'{sweep_ls[0][i].get("sat").upper()}_{sweep_ls[0][i].get("band").upper()}'
return_pwr_lvl.setdefault(key, pwr)
return result, return_pwr_lvl
def gnss_init_power_setting(self, first_wait=180):
"""
GNSS initial power level setting.
Args:
first_wait: wait time after the cold start.
Type, int.
Default, 180.
Returns:
True if the process is done successully and hot start results pass criteria.
Raise:
TestFailure: fail TTFF test criteria.
"""
# Start and set GNSS simulator
self.start_set_gnss_power()
# Start 1st time cold start to obtain ephemeris
process_gnss_by_gtw_gpstool(self.dut, self.test_types['cs'].criteria)
# Read initial power sweep settings
if self.gnss_pwr_sweep_init_ls:
for sweep_ls in self.gnss_pwr_sweep_init_ls:
ret, gnss_pwr_lvl = self.hot_start_gnss_power_sweep(
sweep_ls, first_wait)
else:
self.log.warning('Skip initial power sweep.')
ret = False
gnss_pwr_lvl = None
return ret, gnss_pwr_lvl
def cell_power_sweep(self):
"""
Linear search cellular power level. Doing GNSS hot start with cellular coexistence
and checking if hot start can pass hot start criteria or not.
Returns: final power level of cellular power
"""
# Get parameters from user params.
ttft_iteration = self.user_params.get('ttff_iteration', 25)
wait_before_test = self.user_params.get('wait_before_test', 60)
wait_between_pwr = self.user_params.get('wait_between_pwr', 60)
power_th = self.start_pwr
# Generate the power sweep list.
power_search_ls = range_wi_end(self.dut, self.start_pwr, self.stop_pwr,
self.offset)
# Create gnss log folders for init and cellular sweep
gnss_init_log_dir = os.path.join(self.gnss_log_path, 'GNSS_init')
# Pull all exist GPStool logs into GNSS_init folder
get_gpstool_logs(self.dut, gnss_init_log_dir, False)
if power_search_ls:
# Run the cellular and GNSS coexistence test item.
for i, pwr_lvl in enumerate(power_search_ls):
self.log.info(f'Cellular power sweep loop: {i}')
self.log.info(f'Cellular target power: {pwr_lvl}')
# Enable GNSS to receive satellites' signals for "wait_between_pwr" seconds.
# Wait more time before 1st power level
if i == 0:
wait = wait_before_test
else:
wait = wait_between_pwr
self.start_gnss_and_wait(wait)
# Set cellular Tx power level.
eecoex_cmd = self.eecoex_func.format(pwr_lvl)
eecoex_cmd_file_str = eecoex_cmd.replace(',', '_')
execute_eecoexer_function(self.dut, eecoex_cmd)
# Get the last power level that can pass hots start ttff/ffpe spec.
if self.gnss_hot_start_ttff_ffpe_test(ttft_iteration, True,
eecoex_cmd_file_str):
if i + 1 == len(power_search_ls):
power_th = pwr_lvl
else:
if i == 0:
power_th = self.start_pwr
else:
power_th = power_search_ls[i - 1]
# Stop cellular Tx after a test cycle.
self.stop_coex_tx()
else:
# Run the stand alone test item.
self.start_gnss_and_wait(wait_between_pwr)
eecoex_cmd_file_str = 'no_cellular_coex'
self.gnss_hot_start_ttff_ffpe_test(ttft_iteration, True,
eecoex_cmd_file_str)
self.log.info(f'The GNSS WWAN coex celluar Tx power is {power_th}')
return power_th