blob: f641cc480082740e836aec7ff4beed318199e895 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2020 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from acts_contrib.test_utils.gnss.GnssBlankingBase import GnssBlankingBase
from collections import namedtuple
from acts_contrib.test_utils.gnss.LabTtffTestBase import LabTtffTestBase
from acts_contrib.test_utils.gnss.gnss_test_utils import detect_crash_during_tracking, gnss_tracking_via_gtw_gpstool, \
start_gnss_by_gtw_gpstool, process_ttff_by_gtw_gpstool, calculate_position_error
from acts.context import get_current_context
from acts.utils import get_current_epoch_time
from time import sleep
import csv
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d.axes3d import Axes3D
import statistics
class LabGnssPowerSweepTest(GnssBlankingBase):
def gnss_plot_2D_result(self, position_error):
"""Plot 2D position error result
"""
x_axis = []
y_axis = []
z_axis = []
for key in position_error:
tmp = key.split('_')
l1_pwr = float(tmp[1])
l5_pwr = float(tmp[3])
position_error_value = position_error[key]
x_axis.append(l1_pwr)
y_axis.append(l5_pwr)
z_axis.append(position_error_value)
fig = plt.figure(figsize=(12, 7))
ax = plt.axes(projection='3d')
ax.scatter(x_axis, y_axis, z_axis)
plt.title("Z axis Position Error", fontsize=12)
plt.xlabel("L1 PWR (dBm)", fontsize=12)
plt.ylabel("L5 PWR (dBm)", fontsize=12)
plt.show()
path_name = os.path.join(self.gnss_log_path, 'result.png')
plt.savefig(path_name)
def gnss_wait_for_ephemeris_download(self):
"""Launch GTW GPSTool and Clear all GNSS aiding data
Start GNSS tracking on GTW_GPSTool.
Wait for "first_wait" at simulator power = "power_level" to download Ephemeris
"""
first_wait = self.user_params.get('first_wait', 300)
LabTtffTestBase.start_set_gnss_power(self)
self.start_gnss_and_wait(first_wait)
def gnss_check_fix(self, json_tag):
"""Launch GTW GPSTool and check position fix or not
Returns:
True : Can fix within 120 sec
False
"""
# Check Latitude for fix
self.dut.log.info("Restart GTW GPSTool in gnss_check_fix")
start_gnss_by_gtw_gpstool(self.dut, state=True)
begin_time = get_current_epoch_time()
if not self.dut.is_adb_logcat_on:
self.dut.start_adb_logcat()
while True:
if get_current_epoch_time() - begin_time >= 120000:
self.dut.log.info("Location fix timeout in gnss_check_fix")
start_gnss_by_gtw_gpstool(self.dut, state=False)
json_tag = json_tag + '_gnss_check_fix_timeout'
self.dut.cat_adb_log(tag=json_tag,
begin_time=begin_time,
end_time=None,
dest_path=self.gnss_log_path)
return False
sleep(1)
logcat_results = self.dut.search_logcat("Latitude", begin_time)
if logcat_results:
self.dut.log.info("Location fix successfully in gnss_check_fix")
json_tag = json_tag + '_gnss_check_fix_success'
self.dut.cat_adb_log(tag=json_tag,
begin_time=begin_time,
end_time=None,
dest_path=self.gnss_log_path)
return True
def gnss_check_l5_engaging(self, json_tag):
"""check L5 engaging
Returns:
True : L5 engaged
False
"""
# Check L5 engaging rate
begin_time = get_current_epoch_time()
if not self.dut.is_adb_logcat_on:
self.dut.start_adb_logcat()
while True:
if get_current_epoch_time() - begin_time >= 120000:
self.dut.log.info(
"L5 engaging timeout in gnss_check_l5_engaging")
start_gnss_by_gtw_gpstool(self.dut, state=False)
json_tag = json_tag + '_gnss_check_l5_engaging_timeout'
self.dut.cat_adb_log(tag=json_tag,
begin_time=begin_time,
end_time=None,
dest_path=self.gnss_log_path)
return False
sleep(1)
logcat_results = self.dut.search_logcat("L5 engaging rate:",
begin_time)
if logcat_results:
start_idx = logcat_results[-1]['log_message'].find(
"L5 engaging rate:")
tmp = logcat_results[-1]['log_message'][(start_idx + 18):]
l5_engaging_rate = float(tmp.strip('%'))
if l5_engaging_rate != 0:
self.dut.log.info("L5 engaged")
json_tag = json_tag + '_gnss_check_l5_engaging_success'
self.dut.cat_adb_log(tag=json_tag,
begin_time=begin_time,
end_time=None,
dest_path=self.gnss_log_path)
return True
def gnss_check_position_error(self, json_tag):
"""check position error
Returns:
position error average value
"""
average_position_error_count = 60
position_error_all = []
hacc_all = []
default_position_error_mean = 6666
default_position_error_std = 6666
default_hacc_mean = 6666
default_hacc_std = 6666
idx = 0
begin_time = get_current_epoch_time()
if not self.dut.is_adb_logcat_on:
self.dut.start_adb_logcat()
while True:
if get_current_epoch_time() - begin_time >= 120000:
self.dut.log.info(
"Position error calculation timeout in gnss_check_position_error"
)
start_gnss_by_gtw_gpstool(self.dut, state=False)
json_tag = json_tag + '_gnss_check_position_error_timeout'
self.dut.cat_adb_log(tag=json_tag,
begin_time=begin_time,
end_time=None,
dest_path=self.gnss_log_path)
return default_position_error_mean, default_position_error_std, default_hacc_mean, default_hacc_std
sleep(1)
gnss_results = self.dut.search_logcat("GPSService: Check item",
begin_time)
if gnss_results:
self.dut.log.info(gnss_results[-1]["log_message"])
gnss_location_log = \
gnss_results[-1]["log_message"].split()
ttff_lat = float(gnss_location_log[8].split("=")[-1].strip(","))
ttff_lon = float(gnss_location_log[9].split("=")[-1].strip(","))
loc_time = int(gnss_location_log[10].split("=")[-1].strip(","))
ttff_haccu = float(
gnss_location_log[11].split("=")[-1].strip(","))
hacc_all.append(ttff_haccu)
position_error = calculate_position_error(
ttff_lat, ttff_lon, self.simulator_location)
position_error_all.append(abs(position_error))
idx = idx + 1
if idx >= average_position_error_count:
position_error_mean = statistics.mean(position_error_all)
position_error_std = statistics.stdev(position_error_all)
hacc_mean = statistics.mean(hacc_all)
hacc_std = statistics.stdev(hacc_all)
json_tag = json_tag + '_gnss_check_position_error_success'
self.dut.cat_adb_log(tag=json_tag,
begin_time=begin_time,
end_time=None,
dest_path=self.gnss_log_path)
return position_error_mean, position_error_std, hacc_mean, hacc_std
def gnss_tracking_L5_position_error_capture(self, json_tag):
"""Capture position error after L5 engaged
Args:
Returns:
Position error with L5
"""
self.dut.log.info('Start gnss_tracking_L5_position_error_capture')
fixed = self.gnss_check_fix(json_tag)
if fixed:
l5_engaged = self.gnss_check_l5_engaging(json_tag)
if l5_engaged:
position_error_mean, position_error_std, hacc_mean, hacc_std = self.gnss_check_position_error(
json_tag)
start_gnss_by_gtw_gpstool(self.dut, state=False)
else:
position_error_mean = 8888
position_error_std = 8888
hacc_mean = 8888
hacc_std = 8888
else:
position_error_mean = 9999
position_error_std = 9999
hacc_mean = 9999
hacc_std = 9999
self.position_fix_timeout_cnt = self.position_fix_timeout_cnt + 1
if self.position_fix_timeout_cnt > (self.l1_sweep_cnt / 2):
self.l1_sensitivity_point = self.current_l1_pwr
return position_error_mean, position_error_std, hacc_mean, hacc_std
def gnss_power_tracking_loop(self):
"""Launch GTW GPSTool and Clear all GNSS aiding data
Start GNSS tracking on GTW_GPSTool.
Args:
Returns:
True: First fix TTFF are within criteria.
False: First fix TTFF exceed criteria.
"""
test_period = 60
type = 'gnss'
start_time = get_current_epoch_time()
start_gnss_by_gtw_gpstool(self.dut, state=True, type=type)
while get_current_epoch_time() - start_time < test_period * 1000:
detect_crash_during_tracking(self.dut, start_time, type)
stop_time = get_current_epoch_time()
return start_time, stop_time
def parse_tracking_log_cat(self, log_dir):
self.log.warning(f'Parsing log cat {log_dir} results into dataframe!')
def check_l5_points(self, gnss_pwr_swp):
cnt = 0
for kk in range(len(gnss_pwr_swp[1])):
if gnss_pwr_swp[1][kk][0] == gnss_pwr_swp[1][kk + 1][0]:
cnt = cnt + 1
else:
return cnt
def test_tracking_power_sweep(self):
# Create log file path
full_output_path = get_current_context().get_full_output_path()
self.gnss_log_path = os.path.join(full_output_path, '')
os.makedirs(self.gnss_log_path, exist_ok=True)
self.log.debug(f'Create log path: {self.gnss_log_path}')
csv_path = self.gnss_log_path + 'L1_L5_2D_search_result.csv'
csvfile = open(csv_path, 'w')
writer = csv.writer(csvfile)
writer.writerow([
"csv_result_tag", "position_error_mean", "position_error_std",
"hacc_mean", "hacc_std"
])
# for L1 position fix early termination
self.l1_sensitivity_point = -999
self.enable_early_terminate = 1
self.position_fix_timeout_cnt = 0
self.current_l1_pwr = 0
self.l1_sweep_cnt = 0
self.gnss_wait_for_ephemeris_download()
l1_cable_loss = self.gnss_sim_params.get('L1_cable_loss')
l5_cable_loss = self.gnss_sim_params.get('L5_cable_loss')
for i, gnss_pwr_swp in enumerate(self.gnss_pwr_sweep_fine_sweep_ls):
self.log.info(f'Start fine GNSS power level sweep part {i + 1}')
self.l1_sweep_cnt = self.check_l5_points(gnss_pwr_swp)
for gnss_pwr_params in gnss_pwr_swp[1]:
json_tag = f'test_'
csv_result_tag = ''
for ii, pwr in enumerate(
gnss_pwr_params): # Setup L1 and L5 power
sat_sys = gnss_pwr_swp[0][ii].get('sat').upper()
band = gnss_pwr_swp[0][ii].get('band').upper()
if band == "L1":
pwr_biased = pwr + l1_cable_loss
if pwr != self.current_l1_pwr:
self.position_fix_timeout_cnt = 0
self.current_l1_pwr = pwr
elif band == "L5":
pwr_biased = pwr + l5_cable_loss
else:
pwr_biased = pwr
# Set GNSS Simulator power level
self.gnss_simulator.ping_inst()
self.gnss_simulator.set_scenario_power(
power_level=pwr_biased,
sat_system=sat_sys,
freq_band=band)
self.log.info(f'Set {sat_sys} {band} with power {pwr}')
json_tag = json_tag + f'{sat_sys}_{band}_{pwr}'
csv_result_tag = csv_result_tag + f'{band}_{pwr}_'
if self.current_l1_pwr < self.l1_sensitivity_point and self.enable_early_terminate == 1:
position_error_mean = -1
position_error_std = -1
hacc_mean = -1
hacc_std = -1
else:
position_error_mean, position_error_std, hacc_mean, hacc_std = self.gnss_tracking_L5_position_error_capture(
json_tag)
writer = csv.writer(csvfile)
writer.writerow([
csv_result_tag, position_error_mean, position_error_std,
hacc_mean, hacc_std
])
csvfile.close()