| #!/usr/bin/env python3 |
| # |
| # Copyright 2020 - The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| import os |
| from acts_contrib.test_utils.gnss.GnssBlankingBase import GnssBlankingBase |
| from collections import namedtuple |
| from acts_contrib.test_utils.gnss.LabTtffTestBase import LabTtffTestBase |
| from acts_contrib.test_utils.gnss.gnss_test_utils import detect_crash_during_tracking, gnss_tracking_via_gtw_gpstool, \ |
| start_gnss_by_gtw_gpstool, process_ttff_by_gtw_gpstool, calculate_position_error |
| from acts.context import get_current_context |
| from acts.utils import get_current_epoch_time |
| from time import sleep |
| import csv |
| import matplotlib.pyplot as plt |
| from mpl_toolkits.mplot3d.axes3d import Axes3D |
| import statistics |
| |
| |
class LabGnssPowerSweepTest(GnssBlankingBase):
    """Sweep GNSS simulator L1/L5 power levels and record position error.

    For every power combination listed in
    ``self.gnss_pwr_sweep_fine_sweep_ls`` the test sets the simulator power,
    waits for a position fix and for L5 to engage, samples the position
    error reported in logcat and appends one row to a CSV file in the test
    output directory.
    """

    # Upper bound for each logcat polling loop, compared against
    # get_current_epoch_time() deltas (the fix check documents it as 120 sec).
    _CHECK_TIMEOUT_MS = 120000
    # Number of logcat samples averaged by gnss_check_position_error.
    _POSITION_ERROR_SAMPLE_COUNT = 60

    # Sentinel values written to the CSV instead of a measurement.
    _RESULT_POSITION_ERROR_TIMEOUT = 6666  # sampling did not finish in time
    _RESULT_L5_NOT_ENGAGED = 8888  # fix obtained but L5 never engaged
    _RESULT_NO_FIX = 9999  # no position fix before the timeout
    _RESULT_SKIPPED = -1  # point skipped by early termination

    _L5_ENGAGING_MARKER = "L5 engaging rate:"

    def gnss_plot_2D_result(self, position_error):
        """Plot position error against L1/L5 power and save it as a PNG.

        Args:
            position_error: dict mapping a tag to a position error value.
                The tag is split on '_' and fields 1 and 3 are read as the
                L1 and L5 power (presumably the 'L1_<pwr>_L5_<pwr>_' tags
                built in test_tracking_power_sweep - confirm with caller).
        """
        l1_powers = []
        l5_powers = []
        errors = []
        for key, error_value in position_error.items():
            fields = key.split('_')
            l1_powers.append(float(fields[1]))
            l5_powers.append(float(fields[3]))
            errors.append(error_value)

        fig = plt.figure(figsize=(12, 7))
        ax = fig.add_subplot(111, projection='3d')
        ax.scatter(l1_powers, l5_powers, errors)
        ax.set_title("Z axis Position Error", fontsize=12)
        ax.set_xlabel("L1 PWR (dBm)", fontsize=12)
        ax.set_ylabel("L5 PWR (dBm)", fontsize=12)
        # Save through the figure object and do not call plt.show(): showing
        # first can block an unattended run and may leave nothing to save.
        path_name = os.path.join(self.gnss_log_path, 'result.png')
        fig.savefig(path_name)
        # Release the figure so repeated calls do not accumulate open figures.
        plt.close(fig)

    def gnss_wait_for_ephemeris_download(self):
        """Set the simulator power and track GNSS for the initial wait time.

        The wait time is read from the 'first_wait' user parameter and
        defaults to 300.
        """
        first_wait = self.user_params.get('first_wait', 300)
        LabTtffTestBase.start_set_gnss_power(self)
        self.start_gnss_and_wait(first_wait)

    def _ensure_logcat_running(self):
        """Start adb logcat on the DUT if it is not already running."""
        if not self.dut.is_adb_logcat_on:
            self.dut.start_adb_logcat()

    def _timed_out(self, begin_time):
        """Return True once _CHECK_TIMEOUT_MS has elapsed since begin_time."""
        return get_current_epoch_time() - begin_time >= self._CHECK_TIMEOUT_MS

    def _save_logcat(self, json_tag, suffix, begin_time):
        """Dump logcat since begin_time to the log path, tagged with suffix."""
        self.dut.cat_adb_log(tag=json_tag + suffix,
                             begin_time=begin_time,
                             end_time=None,
                             dest_path=self.gnss_log_path)

    def gnss_check_fix(self, json_tag):
        """Restart GTW GPSTool and wait for a position fix.

        A fix is detected when a logcat line containing "Latitude" appears
        after the tool was started.

        Args:
            json_tag: prefix of the tag used for the saved logcat excerpt.

        Returns:
            True if a fix was seen before the timeout, False otherwise. On
            timeout GNSS is stopped before returning.
        """
        self.dut.log.info("Restart GTW GPSTool in gnss_check_fix")
        start_gnss_by_gtw_gpstool(self.dut, state=True)
        begin_time = get_current_epoch_time()
        self._ensure_logcat_running()
        while not self._timed_out(begin_time):
            sleep(1)
            if self.dut.search_logcat("Latitude", begin_time):
                self.dut.log.info("Location fix successfully in gnss_check_fix")
                self._save_logcat(json_tag, '_gnss_check_fix_success',
                                  begin_time)
                return True

        self.dut.log.info("Location fix timeout in gnss_check_fix")
        start_gnss_by_gtw_gpstool(self.dut, state=False)
        self._save_logcat(json_tag, '_gnss_check_fix_timeout', begin_time)
        return False

    @classmethod
    def _parse_l5_engaging_rate(cls, log_message):
        """Extract the number following "L5 engaging rate:" in a log line.

        Returns:
            The rate as a float, or None when the marker is absent or the
            text after it is not a number (optionally followed by '%').
        """
        _, marker, tail = log_message.partition(cls._L5_ENGAGING_MARKER)
        if not marker:
            return None
        try:
            return float(tail.strip().rstrip('%'))
        except ValueError:
            return None

    def gnss_check_l5_engaging(self, json_tag):
        """Wait until logcat reports a non-zero L5 engaging rate.

        Args:
            json_tag: prefix of the tag used for the saved logcat excerpt.

        Returns:
            True if a non-zero rate was reported before the timeout, False
            otherwise. On timeout GNSS is stopped before returning.
        """
        begin_time = get_current_epoch_time()
        self._ensure_logcat_running()
        while not self._timed_out(begin_time):
            sleep(1)
            logcat_results = self.dut.search_logcat(self._L5_ENGAGING_MARKER,
                                                    begin_time)
            if not logcat_results:
                continue
            # Only the most recent report is evaluated.
            latest_message = logcat_results[-1]['log_message']
            l5_engaging_rate = self._parse_l5_engaging_rate(latest_message)
            if l5_engaging_rate is None:
                # Keep polling instead of aborting the sweep on one bad line.
                self.dut.log.warning(
                    f'Cannot parse L5 engaging rate from: {latest_message}')
                continue
            if l5_engaging_rate != 0:
                self.dut.log.info("L5 engaged")
                self._save_logcat(json_tag, '_gnss_check_l5_engaging_success',
                                  begin_time)
                return True

        self.dut.log.info("L5 engaging timeout in gnss_check_l5_engaging")
        start_gnss_by_gtw_gpstool(self.dut, state=False)
        self._save_logcat(json_tag, '_gnss_check_l5_engaging_timeout',
                          begin_time)
        return False

    @staticmethod
    def _log_field_value(token):
        """Return the text after the last '=' of a token, without commas."""
        return token.split("=")[-1].strip(",")

    def gnss_check_position_error(self, json_tag):
        """Sample position error and horizontal accuracy from logcat.

        Once per second the latest "GPSService: Check item" logcat line is
        parsed; whitespace-separated fields 8, 9 and 11 are read as latitude,
        longitude and horizontal accuracy. Sampling stops after
        _POSITION_ERROR_SAMPLE_COUNT samples or on timeout.

        Args:
            json_tag: prefix of the tag used for the saved logcat excerpt.

        Returns:
            Tuple (position_error_mean, position_error_std, hacc_mean,
            hacc_std). On timeout every element is
            _RESULT_POSITION_ERROR_TIMEOUT and GNSS is stopped.
        """
        position_error_all = []
        hacc_all = []
        begin_time = get_current_epoch_time()
        self._ensure_logcat_running()
        while not self._timed_out(begin_time):
            sleep(1)
            gnss_results = self.dut.search_logcat("GPSService: Check item",
                                                  begin_time)
            if not gnss_results:
                continue
            latest_message = gnss_results[-1]["log_message"]
            self.dut.log.info(latest_message)
            gnss_location_log = latest_message.split()
            ttff_lat = float(self._log_field_value(gnss_location_log[8]))
            ttff_lon = float(self._log_field_value(gnss_location_log[9]))
            ttff_haccu = float(self._log_field_value(gnss_location_log[11]))
            hacc_all.append(ttff_haccu)
            position_error = calculate_position_error(
                ttff_lat, ttff_lon, self.simulator_location)
            position_error_all.append(abs(position_error))
            if len(position_error_all) >= self._POSITION_ERROR_SAMPLE_COUNT:
                position_error_mean = statistics.mean(position_error_all)
                position_error_std = statistics.stdev(position_error_all)
                hacc_mean = statistics.mean(hacc_all)
                hacc_std = statistics.stdev(hacc_all)
                self._save_logcat(json_tag,
                                  '_gnss_check_position_error_success',
                                  begin_time)
                return (position_error_mean, position_error_std, hacc_mean,
                        hacc_std)

        self.dut.log.info(
            "Position error calculation timeout in gnss_check_position_error"
        )
        start_gnss_by_gtw_gpstool(self.dut, state=False)
        self._save_logcat(json_tag, '_gnss_check_position_error_timeout',
                          begin_time)
        timeout_value = self._RESULT_POSITION_ERROR_TIMEOUT
        return timeout_value, timeout_value, timeout_value, timeout_value

    def gnss_tracking_L5_position_error_capture(self, json_tag):
        """Capture position error statistics once L5 is engaged.

        Also maintains the early-termination state: each failed fix
        increments ``position_fix_timeout_cnt`` and, once that count exceeds
        half of ``l1_sweep_cnt``, the current L1 power is stored as
        ``l1_sensitivity_point``.

        Args:
            json_tag: prefix of the tag used for saved logcat excerpts.

        Returns:
            Tuple (position_error_mean, position_error_std, hacc_mean,
            hacc_std). All four are _RESULT_NO_FIX when no fix was obtained
            and _RESULT_L5_NOT_ENGAGED when L5 did not engage.
        """
        self.dut.log.info('Start gnss_tracking_L5_position_error_capture')

        if not self.gnss_check_fix(json_tag):
            self.position_fix_timeout_cnt = self.position_fix_timeout_cnt + 1
            if self.position_fix_timeout_cnt > (self.l1_sweep_cnt / 2):
                self.l1_sensitivity_point = self.current_l1_pwr
            no_fix = self._RESULT_NO_FIX
            return no_fix, no_fix, no_fix, no_fix

        if not self.gnss_check_l5_engaging(json_tag):
            not_engaged = self._RESULT_L5_NOT_ENGAGED
            return not_engaged, not_engaged, not_engaged, not_engaged

        position_error_mean, position_error_std, hacc_mean, hacc_std = (
            self.gnss_check_position_error(json_tag))
        start_gnss_by_gtw_gpstool(self.dut, state=False)
        return position_error_mean, position_error_std, hacc_mean, hacc_std

    def gnss_power_tracking_loop(self):
        """Track GNSS with GTW GPSTool for 60 seconds, watching for crashes.

        Returns:
            Tuple (start_time, stop_time) as returned by
            get_current_epoch_time() before and after the tracking period.
        """
        test_period = 60
        gnss_type = 'gnss'
        start_time = get_current_epoch_time()
        start_gnss_by_gtw_gpstool(self.dut, state=True, type=gnss_type)
        while get_current_epoch_time() - start_time < test_period * 1000:
            detect_crash_during_tracking(self.dut, start_time, gnss_type)
        stop_time = get_current_epoch_time()

        return start_time, stop_time

    def parse_tracking_log_cat(self, log_dir):
        """Log that the logcat under log_dir is being parsed.

        NOTE(review): no parsing is implemented here; only a warning is
        logged.
        """
        self.log.warning(f'Parsing log cat {log_dir} results into dataframe!')

    def check_l5_points(self, gnss_pwr_swp):
        """Count leading adjacent rows that share the same first power value.

        Args:
            gnss_pwr_swp: sweep entry whose element [1] is a sequence of
                power rows; element [0] of each row is compared.

        Returns:
            Number of consecutive row pairs, starting from the first row,
            whose first elements are equal. 0 for an empty or single-row
            sequence.
        """
        power_rows = gnss_pwr_swp[1]
        cnt = 0
        # Pairing each row with its successor stops at the last row, so a
        # sweep with a single first-column value no longer runs off the end.
        for current_row, next_row in zip(power_rows, power_rows[1:]):
            if current_row[0] != next_row[0]:
                break
            cnt = cnt + 1
        return cnt

    def _set_sweep_point_power(self, band_configs, gnss_pwr_params,
                               l1_cable_loss, l5_cable_loss):
        """Program the simulator for one sweep point and build its tags.

        Args:
            band_configs: sequence of dicts with 'sat' and 'band' keys, one
                per entry of gnss_pwr_params.
            gnss_pwr_params: power levels, in the same order as band_configs.
            l1_cable_loss: offset added to the power of "L1" bands.
            l5_cable_loss: offset added to the power of "L5" bands.

        Returns:
            Tuple (json_tag, csv_result_tag) describing the sweep point.
        """
        json_tag = 'test_'
        csv_result_tag = ''
        for ii, pwr in enumerate(gnss_pwr_params):
            sat_sys = band_configs[ii].get('sat').upper()
            band = band_configs[ii].get('band').upper()
            if band == "L1":
                pwr_biased = pwr + l1_cable_loss
                if pwr != self.current_l1_pwr:
                    # A new L1 level starts a fresh fix-timeout count.
                    self.position_fix_timeout_cnt = 0
                    self.current_l1_pwr = pwr
            elif band == "L5":
                pwr_biased = pwr + l5_cable_loss
            else:
                pwr_biased = pwr
            # Set GNSS Simulator power level
            self.gnss_simulator.ping_inst()
            self.gnss_simulator.set_scenario_power(power_level=pwr_biased,
                                                   sat_system=sat_sys,
                                                   freq_band=band)
            self.log.info(f'Set {sat_sys} {band} with power {pwr}')
            json_tag = json_tag + f'{sat_sys}_{band}_{pwr}'
            csv_result_tag = csv_result_tag + f'{band}_{pwr}_'
        return json_tag, csv_result_tag

    def test_tracking_power_sweep(self):
        """Run the L1/L5 power sweep and write the results to a CSV file.

        Results go to 'L1_L5_2D_search_result.csv' in the test output path.
        Once too many fixes have failed at an L1 level (see
        gnss_tracking_L5_position_error_capture), points with a lower L1
        power are skipped and recorded as _RESULT_SKIPPED.
        """
        # Create log file path
        full_output_path = get_current_context().get_full_output_path()
        self.gnss_log_path = os.path.join(full_output_path, '')
        os.makedirs(self.gnss_log_path, exist_ok=True)
        self.log.debug(f'Create log path: {self.gnss_log_path}')
        csv_path = os.path.join(self.gnss_log_path,
                                'L1_L5_2D_search_result.csv')

        # for L1 position fix early termination
        self.l1_sensitivity_point = -999
        self.enable_early_terminate = 1
        self.position_fix_timeout_cnt = 0
        self.current_l1_pwr = 0
        self.l1_sweep_cnt = 0

        # The context manager closes the file even if a sweep step raises;
        # newline='' is what the csv module expects for files it writes.
        with open(csv_path, 'w', newline='') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow([
                "csv_result_tag", "position_error_mean", "position_error_std",
                "hacc_mean", "hacc_std"
            ])

            self.gnss_wait_for_ephemeris_download()
            l1_cable_loss = self.gnss_sim_params.get('L1_cable_loss')
            l5_cable_loss = self.gnss_sim_params.get('L5_cable_loss')

            for i, gnss_pwr_swp in enumerate(
                    self.gnss_pwr_sweep_fine_sweep_ls):
                self.log.info(
                    f'Start fine GNSS power level sweep part {i + 1}')
                self.l1_sweep_cnt = self.check_l5_points(gnss_pwr_swp)
                for gnss_pwr_params in gnss_pwr_swp[1]:
                    # Setup L1 and L5 power
                    json_tag, csv_result_tag = self._set_sweep_point_power(
                        gnss_pwr_swp[0], gnss_pwr_params, l1_cable_loss,
                        l5_cable_loss)

                    if (self.current_l1_pwr < self.l1_sensitivity_point
                            and self.enable_early_terminate == 1):
                        position_error_mean = self._RESULT_SKIPPED
                        position_error_std = self._RESULT_SKIPPED
                        hacc_mean = self._RESULT_SKIPPED
                        hacc_std = self._RESULT_SKIPPED
                    else:
                        (position_error_mean, position_error_std, hacc_mean,
                         hacc_std
                         ) = self.gnss_tracking_L5_position_error_capture(
                             json_tag)
                    writer.writerow([
                        csv_result_tag, position_error_mean,
                        position_error_std, hacc_mean, hacc_std
                    ])
                    # Push each row to disk so an interrupted sweep keeps
                    # the results gathered so far.
                    csvfile.flush()