blob: 507e0735e459f6cbecfc5c4cde796214cd79e637 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2020 - Google
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import re
import os
import pathlib
import math
import shutil
import fnmatch
import posixpath
import subprocess
import tempfile
from retry import retry
from collections import namedtuple
from datetime import datetime
from xml.etree import ElementTree
from contextlib import contextmanager
from statistics import median
from acts import utils
from acts import asserts
from acts import signals
from acts.libs.proc import job
from acts.controllers.adb_lib.error import AdbCommandError
from acts.controllers.android_device import list_adb_devices
from acts.controllers.android_device import list_fastboot_devices
from acts.controllers.android_device import DEFAULT_QXDM_LOG_PATH
from acts.controllers.android_device import SL4A_APK_NAME
from acts_contrib.test_utils.gnss.gnss_measurement import GnssMeasurement
from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
from acts_contrib.test_utils.tel import tel_logging_utils as tlutils
from acts_contrib.test_utils.tel import tel_test_utils as tutils
from acts_contrib.test_utils.gnss import gnssstatus_utils
from acts_contrib.test_utils.gnss import gnss_constant
from acts_contrib.test_utils.gnss import supl
from acts_contrib.test_utils.instrumentation.device.command.instrumentation_command_builder import InstrumentationCommandBuilder
from acts_contrib.test_utils.instrumentation.device.command.instrumentation_command_builder import InstrumentationTestCommandBuilder
from acts.utils import get_current_epoch_time
from acts.utils import epoch_to_human_time
from acts_contrib.test_utils.gnss.gnss_defines import BCM_GPS_XML_PATH
from acts_contrib.test_utils.gnss.gnss_defines import BCM_NVME_STO_PATH
WifiEnums = wutils.WifiEnums
FIRST_FIXED_MAX_WAITING_TIME = 60
UPLOAD_TO_SPONGE_PREFIX = "TestResult "
PULL_TIMEOUT = 300
GNSSSTATUS_LOG_PATH = (
"/storage/emulated/0/Android/data/com.android.gpstool/files/")
QXDM_MASKS = ["GPS.cfg", "GPS-general.cfg", "default.cfg"]
TTFF_REPORT = namedtuple(
"TTFF_REPORT", "utc_time ttff_loop ttff_sec ttff_pe ttff_ant_cn "
"ttff_base_cn ttff_haccu")
TRACK_REPORT = namedtuple(
"TRACK_REPORT", "l5flag pe ant_top4cn ant_cn base_top4cn base_cn device_time report_time")
LOCAL_PROP_FILE_CONTENTS = """\
log.tag.LocationManagerService=VERBOSE
log.tag.GnssLocationProvider=VERBOSE
log.tag.GnssMeasurementsProvider=VERBOSE
log.tag.GpsNetInitiatedHandler=VERBOSE
log.tag.GnssNetInitiatedHandler=VERBOSE
log.tag.GnssNetworkConnectivityHandler=VERBOSE
log.tag.ConnectivityService=VERBOSE
log.tag.ConnectivityManager=VERBOSE
log.tag.GnssVisibilityControl=VERBOSE
log.tag.NtpTimeHelper=VERBOSE
log.tag.NtpTrustedTime=VERBOSE
log.tag.GnssPsdsDownloader=VERBOSE
log.tag.Gnss=VERBOSE
log.tag.GnssConfiguration=VERBOSE"""
LOCAL_PROP_FILE_CONTENTS_FOR_WEARABLE = """\
log.tag.ImsPhone=VERBOSE
log.tag.GsmCdmaPhone=VERBOSE
log.tag.Phone=VERBOSE
log.tag.GCoreFlp=VERBOSE"""
TEST_PACKAGE_NAME = "com.google.android.apps.maps"
LOCATION_PERMISSIONS = [
"android.permission.ACCESS_FINE_LOCATION",
"android.permission.ACCESS_COARSE_LOCATION"
]
GNSSTOOL_PACKAGE_NAME = "com.android.gpstool"
GNSSTOOL_PERMISSIONS = [
"android.permission.ACCESS_FINE_LOCATION",
"android.permission.READ_EXTERNAL_STORAGE",
"android.permission.ACCESS_COARSE_LOCATION",
"android.permission.CALL_PHONE",
"android.permission.WRITE_CONTACTS",
"android.permission.CAMERA",
"android.permission.WRITE_EXTERNAL_STORAGE",
"android.permission.READ_CONTACTS",
"android.permission.ACCESS_BACKGROUND_LOCATION"
]
DISABLE_LTO_FILE_CONTENTS = """\
LONGTERM_PSDS_SERVER_1="http://"
LONGTERM_PSDS_SERVER_2="http://"
LONGTERM_PSDS_SERVER_3="http://"
NORMAL_PSDS_SERVER="http://"
REALTIME_PSDS_SERVER="http://"
"""
DISABLE_LTO_FILE_CONTENTS_R = """\
XTRA_SERVER_1="http://"
XTRA_SERVER_2="http://"
XTRA_SERVER_3="http://"
"""
_BRCM_DUTY_CYCLE_PATTERN = re.compile(r".*PGLOR,\d+,STA.*")
class GnssTestUtilsError(Exception):
pass
def remount_device(ad):
"""Remount device file system to read and write.
Args:
ad: An AndroidDevice object.
"""
for retries in range(5):
ad.root_adb()
if ad.adb.getprop("ro.boot.veritymode") == "enforcing":
ad.adb.disable_verity()
reboot(ad)
remount_result = ad.adb.remount()
ad.log.info("Attempt %d - %s" % (retries + 1, remount_result))
if "remount succeeded" in remount_result:
break
def reboot(ad):
"""Reboot device and check if mobile data is available.
Args:
ad: An AndroidDevice object.
"""
ad.log.info("Reboot device to make changes take effect.")
# TODO(diegowchung): remove the timeout setting after p23 back to normal
ad.reboot(timeout=600)
ad.unlock_screen(password=None)
if not is_mobile_data_on(ad):
set_mobile_data(ad, True)
utils.sync_device_time(ad)
def enable_gnss_verbose_logging(ad):
"""Enable GNSS VERBOSE Logging and persistent logcat.
Args:
ad: An AndroidDevice object.
"""
remount_device(ad)
ad.log.info("Enable GNSS VERBOSE Logging and persistent logcat.")
if check_chipset_vendor_by_qualcomm(ad):
ad.adb.shell("echo -e '\nDEBUG_LEVEL = 5' >> /vendor/etc/gps.conf")
else:
ad.adb.shell("echo LogEnabled=true >> /data/vendor/gps/libgps.conf")
ad.adb.shell("chown gps.system /data/vendor/gps/libgps.conf")
if is_device_wearable(ad):
PROP_CONTENTS = LOCAL_PROP_FILE_CONTENTS + LOCAL_PROP_FILE_CONTENTS_FOR_WEARABLE
else:
PROP_CONTENTS = LOCAL_PROP_FILE_CONTENTS
ad.adb.shell("echo %r >> /data/local.prop" % PROP_CONTENTS)
ad.adb.shell("chmod 644 /data/local.prop")
ad.adb.shell("setprop persist.logd.logpersistd.size 20000")
ad.adb.shell("setprop persist.logd.size 16777216")
ad.adb.shell("setprop persist.vendor.radio.adb_log_on 1")
ad.adb.shell("setprop persist.logd.logpersistd logcatd")
ad.adb.shell("setprop log.tag.copresGcore VERBOSE")
ad.adb.shell("sync")
def get_am_flags(value):
"""Returns the (value, type) flags for a given python value."""
if type(value) is bool:
return str(value).lower(), 'boolean'
elif type(value) is str:
return value, 'string'
raise ValueError("%s should be either 'boolean' or 'string'" % value)
def enable_compact_and_particle_fusion_log(ad):
"""Enable CompactLog, FLP particle fusion log and disable gms
location-based quake monitoring.
Args:
ad: An AndroidDevice object.
"""
ad.root_adb()
ad.log.info("Enable FLP flags and Disable GMS location-based quake "
"monitoring.")
overrides = {
'compact_log_enabled': True,
'flp_use_particle_fusion': True,
'flp_particle_fusion_extended_bug_report': True,
'flp_event_log_size': '86400',
'proks_config': '28',
'flp_particle_fusion_bug_report_window_sec': '86400',
'flp_particle_fusion_bug_report_max_buffer_size': '86400',
'seismic_data_collection': False,
'Ealert__enable': False,
}
for flag, python_value in overrides.items():
value, type = get_am_flags(python_value)
cmd = ("am broadcast -a com.google.android.gms.phenotype.FLAG_OVERRIDE "
"--es package com.google.android.location --es user \* "
"--esa flags %s --esa values %s --esa types %s "
"com.google.android.gms" % (flag, value, type))
ad.adb.shell(cmd, ignore_status=True)
ad.adb.shell("am force-stop com.google.android.gms")
ad.adb.shell("am broadcast -a com.google.android.gms.INITIALIZE")
def disable_xtra_throttle(ad):
"""Disable XTRA throttle will have no limit to download XTRA data.
Args:
ad: An AndroidDevice object.
"""
remount_device(ad)
ad.log.info("Disable XTRA Throttle.")
ad.adb.shell("echo -e '\nXTRA_TEST_ENABLED=1' >> /vendor/etc/gps.conf")
ad.adb.shell("echo -e '\nXTRA_THROTTLE_ENABLED=0' >> /vendor/etc/gps.conf")
def enable_supl_mode(ad):
"""Enable SUPL back on for next test item.
Args:
ad: An AndroidDevice object.
"""
remount_device(ad)
ad.log.info("Enable SUPL mode.")
ad.adb.shell("echo -e '\nSUPL_MODE=1' >> /etc/gps_debug.conf")
def disable_supl_mode(ad):
"""Kill SUPL to test XTRA/LTO only test item.
Args:
ad: An AndroidDevice object.
"""
remount_device(ad)
ad.log.info("Disable SUPL mode.")
ad.adb.shell("echo -e '\nSUPL_MODE=0' >> /etc/gps_debug.conf")
if not check_chipset_vendor_by_qualcomm(ad):
supl.set_supl_over_wifi_state(ad, False)
def enable_vendor_orbit_assistance_data(ad):
"""Enable vendor assistance features.
For Qualcomm: Enable XTRA
For Broadcom: Enable LTO
Args:
ad: An AndroidDevice object.
"""
ad.root_adb()
if is_device_wearable(ad):
lto_mode_wearable(ad, True)
elif check_chipset_vendor_by_qualcomm(ad):
disable_xtra_throttle(ad)
reboot(ad)
else:
lto_mode(ad, True)
def disable_vendor_orbit_assistance_data(ad):
"""Disable vendor assistance features.
For Qualcomm: disable XTRA
For Broadcom: disable LTO
Args:
ad: An AndroidDevice object.
"""
ad.root_adb()
if is_device_wearable(ad):
lto_mode_wearable(ad, False)
elif check_chipset_vendor_by_qualcomm(ad):
disable_qualcomm_orbit_assistance_data(ad)
else:
lto_mode(ad, False)
def disable_qualcomm_orbit_assistance_data(ad):
"""Disable assiatance features for Qualcomm project.
Args:
ad: An AndroidDevice object.
"""
ad.log.info("Disable XTRA-daemon until next reboot.")
ad.adb.shell("killall xtra-daemon", ignore_status=True)
def disable_private_dns_mode(ad):
"""Due to b/118365122, it's better to disable private DNS mode while
testing. 8.8.8.8 private dns sever is unstable now, sometimes server
will not response dns query suddenly.
Args:
ad: An AndroidDevice object.
"""
tutils.get_operator_name(ad.log, ad, subId=None)
if ad.adb.shell("settings get global private_dns_mode") != "off":
ad.log.info("Disable Private DNS mode.")
ad.adb.shell("settings put global private_dns_mode off")
def _init_device(ad):
"""Init GNSS test devices.
Args:
ad: An AndroidDevice object.
"""
check_location_service(ad)
enable_gnss_verbose_logging(ad)
prepare_gps_overlay(ad)
set_screen_always_on(ad)
ad.log.info("Setting Bluetooth state to False")
ad.droid.bluetoothToggleState(False)
set_wifi_and_bt_scanning(ad, True)
disable_private_dns_mode(ad)
init_gtw_gpstool(ad)
if is_device_wearable(ad):
disable_battery_defend(ad)
def prepare_gps_overlay(ad):
"""Set pixellogger gps log mask to
resolve gps logs unreplayable from brcm vendor
"""
if not check_chipset_vendor_by_qualcomm(ad):
overlay_file = "/data/vendor/gps/overlay/gps_overlay.xml"
xml_file = generate_gps_overlay_xml(ad)
try:
ad.log.info("Push gps_overlay to device")
ad.adb.push(xml_file, overlay_file)
ad.adb.shell(f"chmod 777 {overlay_file}")
finally:
xml_folder = os.path.abspath(os.path.join(xml_file, os.pardir))
shutil.rmtree(xml_folder)
def generate_gps_overlay_xml(ad):
"""For r11 devices, the overlay setting is 'Replayable default'
For other brcm devices, the setting is 'Replayable debug'
Returns:
path to the xml file
"""
root_attrib = {
"xmlns": "http://www.glpals.com/",
"xmlns:xsi": "http://www.w3.org/2001/XMLSchema-instance",
"xsi:schemaLocation": "http://www.glpals.com/ glconfig.xsd",
}
sub_attrib = {"EnableOnChipStopNotification": "true"}
if not is_device_wearable(ad):
sub_attrib["LogPriMask"] = "LOG_DEBUG"
sub_attrib["LogFacMask"] = "LOG_GLLIO | LOG_GLLAPI | LOG_NMEA | LOG_RAWDATA"
sub_attrib["OnChipLogPriMask"] = "LOG_DEBUG"
sub_attrib["OnChipLogFacMask"] = "LOG_GLLIO | LOG_GLLAPI | LOG_NMEA | LOG_RAWDATA"
temp_path = tempfile.mkdtemp()
xml_file = os.path.join(temp_path, "gps_overlay.xml")
root = ElementTree.Element('glgps')
for key, value in root_attrib.items():
root.attrib[key] = value
ad.log.debug("Sub attrib is %s", sub_attrib)
sub = ElementTree.SubElement(root, 'gll')
for key, value in sub_attrib.items():
sub.attrib[key] = value
xml = ElementTree.ElementTree(root)
xml.write(xml_file, xml_declaration=True, encoding="utf-8", method="xml")
return xml_file
def connect_to_wifi_network(ad, network):
"""Connection logic for open and psk wifi networks.
Args:
ad: An AndroidDevice object.
network: Dictionary with network info.
"""
SSID = network[WifiEnums.SSID_KEY]
ad.ed.clear_all_events()
wutils.reset_wifi(ad)
wutils.start_wifi_connection_scan_and_ensure_network_found(ad, SSID)
for i in range(5):
wutils.wifi_connect(ad, network, check_connectivity=False)
# Validates wifi connection with ping_gateway=False to avoid issue like
# b/254913994.
if wutils.validate_connection(ad, ping_gateway=False):
ad.log.info("WiFi connection is validated")
return
raise signals.TestError("Failed to connect WiFi")
def set_wifi_and_bt_scanning(ad, state=True):
"""Set Wi-Fi and Bluetooth scanning on/off in Settings -> Location
Args:
ad: An AndroidDevice object.
state: True to turn on "Wi-Fi and Bluetooth scanning".
False to turn off "Wi-Fi and Bluetooth scanning".
"""
ad.root_adb()
if state:
ad.adb.shell("settings put global wifi_scan_always_enabled 1")
ad.adb.shell("settings put global ble_scan_always_enabled 1")
ad.log.info("Wi-Fi and Bluetooth scanning are enabled")
else:
ad.adb.shell("settings put global wifi_scan_always_enabled 0")
ad.adb.shell("settings put global ble_scan_always_enabled 0")
ad.log.info("Wi-Fi and Bluetooth scanning are disabled")
def check_location_service(ad):
"""Set location service on.
Verify if location service is available.
Args:
ad: An AndroidDevice object.
"""
remount_device(ad)
utils.set_location_service(ad, True)
ad.adb.shell("cmd location set-location-enabled true")
location_mode = int(ad.adb.shell("settings get secure location_mode"))
ad.log.info("Current Location Mode >> %d" % location_mode)
if location_mode != 3:
raise signals.TestError("Failed to turn Location on")
def delete_device_folder(ad, folder):
ad.log.info("Folder to be deleted: %s" % folder)
folder_contents = ad.adb.shell(f"ls {folder}", ignore_status=True)
ad.log.debug("Contents to be deleted: %s" % folder_contents)
ad.adb.shell("rm -rf %s" % folder, ignore_status=True)
def remove_pixel_logger_folder(ad):
if check_chipset_vendor_by_qualcomm(ad):
folder = "/sdcard/Android/data/com.android.pixellogger/files/logs/diag_logs"
else:
folder = "/sdcard/Android/data/com.android.pixellogger/files/logs/gps/"
delete_device_folder(ad, folder)
def clear_logd_gnss_qxdm_log(ad):
"""Clear /data/misc/logd,
/storage/emulated/0/Android/data/com.android.gpstool/files and
/data/vendor/radio/diag_logs/logs from previous test item then reboot.
Args:
ad: An AndroidDevice object.
"""
remount_device(ad)
ad.log.info("Clear Logd, GNSS and PixelLogger Log from previous test item.")
folders_should_be_removed = ["/data/misc/logd"]
ad.adb.shell(
'find %s -name "*.txt" -type f -delete' % GNSSSTATUS_LOG_PATH,
ignore_status=True)
if check_chipset_vendor_by_qualcomm(ad):
output_path = posixpath.join(DEFAULT_QXDM_LOG_PATH, "logs")
folders_should_be_removed += [output_path]
else:
always_on_logger_log_path = ("/data/vendor/gps/logs")
folders_should_be_removed += [always_on_logger_log_path]
for folder in folders_should_be_removed:
delete_device_folder(ad, folder)
remove_pixel_logger_folder(ad)
if not is_device_wearable(ad):
reboot(ad)
def get_gnss_qxdm_log(ad, qdb_path=None):
"""Get /storage/emulated/0/Android/data/com.android.gpstool/files and
/data/vendor/radio/diag_logs/logs for test item.
Args:
ad: An AndroidDevice object.
qdb_path: The path of qdsp6m.qdb on different projects.
"""
log_path = ad.device_log_path
os.makedirs(log_path, exist_ok=True)
gnss_log_name = "gnssstatus_log_%s_%s" % (ad.model, ad.serial)
gnss_log_path = posixpath.join(log_path, gnss_log_name)
os.makedirs(gnss_log_path, exist_ok=True)
ad.log.info("Pull GnssStatus Log to %s" % gnss_log_path)
ad.adb.pull("%s %s" % (GNSSSTATUS_LOG_PATH + ".", gnss_log_path),
timeout=PULL_TIMEOUT, ignore_status=True)
shutil.make_archive(gnss_log_path, "zip", gnss_log_path)
shutil.rmtree(gnss_log_path, ignore_errors=True)
if check_chipset_vendor_by_qualcomm(ad):
output_path = (
"/sdcard/Android/data/com.android.pixellogger/files/logs/"
"diag_logs/.")
else:
output_path = (
"/sdcard/Android/data/com.android.pixellogger/files/logs/gps/.")
qxdm_log_name = "PixelLogger_%s_%s" % (ad.model, ad.serial)
qxdm_log_path = posixpath.join(log_path, qxdm_log_name)
os.makedirs(qxdm_log_path, exist_ok=True)
ad.log.info("Pull PixelLogger Log %s to %s" % (output_path,
qxdm_log_path))
ad.adb.pull("%s %s" % (output_path, qxdm_log_path),
timeout=PULL_TIMEOUT, ignore_status=True)
if check_chipset_vendor_by_qualcomm(ad):
for path in qdb_path:
output = ad.adb.pull("%s %s" % (path, qxdm_log_path),
timeout=PULL_TIMEOUT, ignore_status=True)
if "No such file or directory" in output:
continue
break
shutil.make_archive(qxdm_log_path, "zip", qxdm_log_path)
shutil.rmtree(qxdm_log_path, ignore_errors=True)
def set_mobile_data(ad, state):
"""Set mobile data on or off and check mobile data state.
Args:
ad: An AndroidDevice object.
state: True to enable mobile data. False to disable mobile data.
"""
ad.root_adb()
if state:
if is_device_wearable(ad):
ad.log.info("Enable wearable mobile data.")
ad.adb.shell("settings put global cell_on 1")
else:
ad.log.info("Enable mobile data via RPC call.")
ad.droid.telephonyToggleDataConnection(True)
else:
if is_device_wearable(ad):
ad.log.info("Disable wearable mobile data.")
ad.adb.shell("settings put global cell_on 0")
else:
ad.log.info("Disable mobile data via RPC call.")
ad.droid.telephonyToggleDataConnection(False)
time.sleep(5)
ret_val = is_mobile_data_on(ad)
if state and ret_val:
ad.log.info("Mobile data is enabled and set to %s" % ret_val)
elif not state and not ret_val:
ad.log.info("Mobile data is disabled and set to %s" % ret_val)
else:
ad.log.error("Mobile data is at unknown state and set to %s" % ret_val)
def gnss_trigger_modem_ssr_by_adb(ad, dwelltime=60):
"""Trigger modem SSR crash by adb and verify if modem crash and recover
successfully.
Args:
ad: An AndroidDevice object.
dwelltime: Waiting time for modem reset. Default is 60 seconds.
Returns:
True if success.
False if failed.
"""
begin_time = get_current_epoch_time()
ad.root_adb()
cmds = ("echo restart > /sys/kernel/debug/msm_subsys/modem",
r"echo 'at+cfun=1,1\r' > /dev/at_mdm0")
for cmd in cmds:
ad.log.info("Triggering modem SSR crash by %s" % cmd)
output = ad.adb.shell(cmd, ignore_status=True)
if "No such file or directory" in output:
continue
break
time.sleep(dwelltime)
ad.send_keycode("HOME")
logcat_results = ad.search_logcat("SSRObserver", begin_time)
if logcat_results:
for ssr in logcat_results:
if "mSubsystem='modem', mCrashReason" in ssr["log_message"]:
ad.log.debug(ssr["log_message"])
ad.log.info("Triggering modem SSR crash successfully.")
return True
raise signals.TestError("Failed to trigger modem SSR crash")
raise signals.TestError("No SSRObserver found in logcat")
def gnss_trigger_modem_ssr_by_mds(ad, dwelltime=60):
"""Trigger modem SSR crash by mds tool and verify if modem crash and recover
successfully.
Args:
ad: An AndroidDevice object.
dwelltime: Waiting time for modem reset. Default is 60 seconds.
"""
mds_check = ad.adb.shell("pm path com.google.mdstest")
if not mds_check:
raise signals.TestError("MDS Tool is not properly installed.")
ad.root_adb()
cmd = ('am instrument -w -e request "4b 25 03 00" '
'"com.google.mdstest/com.google.mdstest.instrument'
'.ModemCommandInstrumentation"')
ad.log.info("Triggering modem SSR crash by MDS")
output = ad.adb.shell(cmd, ignore_status=True)
ad.log.debug(output)
time.sleep(dwelltime)
ad.send_keycode("HOME")
if "SUCCESS" in output:
ad.log.info("Triggering modem SSR crash by MDS successfully.")
else:
raise signals.TestError(
"Failed to trigger modem SSR crash by MDS. \n%s" % output)
def check_xtra_download(ad, begin_time):
"""Verify XTRA download success log message in logcat.
Args:
ad: An AndroidDevice object.
begin_time: test begin time
Returns:
True: xtra_download if XTRA downloaded and injected successfully
otherwise return False.
"""
ad.send_keycode("HOME")
if check_chipset_vendor_by_qualcomm(ad):
xtra_results = ad.search_logcat("XTRA download success. "
"inject data into modem", begin_time)
if xtra_results:
ad.log.debug("%s" % xtra_results[-1]["log_message"])
ad.log.info("XTRA downloaded and injected successfully.")
return True
ad.log.error("XTRA downloaded FAIL.")
else:
if is_device_wearable(ad):
lto_results = ad.search_logcat("GnssLocationProvider: "
"calling native_inject_psds_data", begin_time)
else:
lto_results = ad.search_logcat("GnssPsdsAidl: injectPsdsData: "
"psdsType: 1", begin_time)
if lto_results:
ad.log.debug("%s" % lto_results[-1]["log_message"])
ad.log.info("LTO downloaded and injected successfully.")
return True
ad.log.error("LTO downloaded and inject FAIL.")
return False
def pull_package_apk(ad, package_name):
"""Pull apk of given package_name from device.
Args:
ad: An AndroidDevice object.
package_name: Package name of apk to pull.
Returns:
The temp path of pulled apk.
"""
out = ad.adb.shell("pm path %s" % package_name)
result = re.search(r"package:(.*)", out)
if not result:
raise signals.TestError("Couldn't find apk of %s" % package_name)
else:
apk_source = result.group(1)
ad.log.info("Get apk of %s from %s" % (package_name, apk_source))
apk_path = tempfile.mkdtemp()
ad.pull_files([apk_source], apk_path)
return apk_path
def pull_gnss_cfg_file(ad, file):
"""Pull given gnss cfg file from device.
Args:
ad: An AndroidDevice object.
file: CFG file in device to pull.
Returns:
The temp path of pulled gnss cfg file in host.
"""
ad.root_adb()
host_dest = tempfile.mkdtemp()
ad.pull_files(file, host_dest)
for path_key in os.listdir(host_dest):
if fnmatch.fnmatch(path_key, "*.cfg"):
gnss_cfg_file = os.path.join(host_dest, path_key)
break
else:
raise signals.TestError("No cfg file is found in %s" % host_dest)
return gnss_cfg_file
def reinstall_package_apk(ad, package_name, apk_path):
"""Reinstall apk of given package_name.
Args:
ad: An AndroidDevice object.
package_name: Package name of apk.
apk_path: The temp path of pulled apk.
"""
for path_key in os.listdir(apk_path):
if fnmatch.fnmatch(path_key, "*.apk"):
apk_path = os.path.join(apk_path, path_key)
break
else:
raise signals.TestError("No apk is found in %s" % apk_path)
ad.log.info("Re-install %s with path: %s" % (package_name, apk_path))
ad.adb.shell("settings put global verifier_verify_adb_installs 0")
ad.adb.install("-r -d -g --user 0 %s" % apk_path)
package_check = ad.adb.shell("pm path %s" % package_name)
if not package_check:
tutils.abort_all_tests(
ad.log, "%s is not properly re-installed." % package_name)
ad.log.info("%s is re-installed successfully." % package_name)
def init_gtw_gpstool(ad):
"""Init GTW_GPSTool apk.
Args:
ad: An AndroidDevice object.
"""
remount_device(ad)
gpstool_path = pull_package_apk(ad, "com.android.gpstool")
reinstall_package_apk(ad, "com.android.gpstool", gpstool_path)
shutil.rmtree(gpstool_path, ignore_errors=True)
def fastboot_factory_reset(ad, state=True):
"""Factory reset the device in fastboot mode.
Pull sl4a apk from device. Terminate all sl4a sessions,
Reboot the device to bootloader,
factory reset the device by fastboot.
Reboot the device. wait for device to complete booting
Re-install and start an sl4a session.
Args:
ad: An AndroidDevice object.
State: True for exit_setup_wizard, False for not exit_setup_wizard.
Returns:
True if factory reset process complete.
"""
status = True
mds_path = ""
gnss_cfg_file = ""
gnss_cfg_path = "/vendor/etc/mdlog"
default_gnss_cfg = "/vendor/etc/mdlog/DEFAULT+SECURITY+FULLDPL+GPS.cfg"
sl4a_path = pull_package_apk(ad, SL4A_APK_NAME)
gpstool_path = pull_package_apk(ad, "com.android.gpstool")
if check_chipset_vendor_by_qualcomm(ad):
mds_path = pull_package_apk(ad, "com.google.mdstest")
gnss_cfg_file = pull_gnss_cfg_file(ad, default_gnss_cfg)
stop_pixel_logger(ad)
ad.stop_services()
for i in range(1, 4):
try:
if ad.serial in list_adb_devices():
ad.log.info("Reboot to bootloader")
ad.adb.reboot("bootloader", ignore_status=True)
time.sleep(10)
if ad.serial in list_fastboot_devices():
ad.log.info("Factory reset in fastboot")
ad.fastboot._w(timeout=300, ignore_status=True)
time.sleep(30)
ad.log.info("Reboot in fastboot")
ad.fastboot.reboot()
ad.wait_for_boot_completion()
ad.root_adb()
if ad.skip_sl4a:
break
if ad.is_sl4a_installed():
break
if is_device_wearable(ad):
ad.log.info("Wait 5 mins for wearable projects system busy time.")
time.sleep(300)
reinstall_package_apk(ad, SL4A_APK_NAME, sl4a_path)
reinstall_package_apk(ad, "com.android.gpstool", gpstool_path)
if check_chipset_vendor_by_qualcomm(ad):
reinstall_package_apk(ad, "com.google.mdstest", mds_path)
ad.push_system_file(gnss_cfg_file, gnss_cfg_path)
time.sleep(10)
break
except Exception as e:
ad.log.error(e)
if i == attempts:
tutils.abort_all_tests(ad.log, str(e))
time.sleep(5)
try:
ad.start_adb_logcat()
except Exception as e:
ad.log.error(e)
if state:
ad.exit_setup_wizard()
if ad.skip_sl4a:
return status
tutils.bring_up_sl4a(ad)
for path in [sl4a_path, gpstool_path, mds_path, gnss_cfg_file]:
shutil.rmtree(path, ignore_errors=True)
return status
def clear_aiding_data_by_gtw_gpstool(ad):
"""Launch GTW GPSTool and Clear all GNSS aiding data.
Wait 5 seconds for GTW GPStool to clear all GNSS aiding
data properly.
Args:
ad: An AndroidDevice object.
"""
if not check_chipset_vendor_by_qualcomm(ad):
delete_lto_file(ad)
ad.log.info("Launch GTW GPSTool and Clear all GNSS aiding data")
ad.adb.shell("am start -S -n com.android.gpstool/.GPSTool --es mode clear")
time.sleep(10)
def start_gnss_by_gtw_gpstool(ad,
state,
api_type="gnss",
bgdisplay=False,
freq=0,
lowpower=False,
meas=False):
"""Start or stop GNSS on GTW_GPSTool.
Args:
ad: An AndroidDevice object.
state: True to start GNSS. False to Stop GNSS.
api_type: Different API for location fix. Use gnss/flp/nmea
bgdisplay: true to run GTW when Display off. false to not run GTW when
Display off.
freq: An integer to set location update frequency.
meas: A Boolean to set GNSS measurement registration.
lowpower: A boolean to set GNSS LowPowerMode.
"""
cmd = "am start -S -n com.android.gpstool/.GPSTool --es mode gps"
if not state:
ad.log.info("Stop %s on GTW_GPSTool." % api_type)
cmd = "am broadcast -a com.android.gpstool.stop_gps_action"
else:
options = ("--es type {} --ei freq {} --ez BG {} --ez meas {} --ez "
"lowpower {}").format(api_type, freq, bgdisplay, meas, lowpower)
cmd = cmd + " " + options
ad.adb.shell(cmd, ignore_status=True, timeout = 300)
time.sleep(3)
def process_gnss_by_gtw_gpstool(ad,
criteria,
api_type="gnss",
clear_data=True,
meas_flag=False,
freq=0,
bg_display=False):
"""Launch GTW GPSTool and Clear all GNSS aiding data
Start GNSS tracking on GTW_GPSTool.
Args:
ad: An AndroidDevice object.
criteria: Criteria for current test item.
api_type: Different API for location fix. Use gnss/flp/nmea
clear_data: True to clear GNSS aiding data. False is not to. Default
set to True.
meas_flag: True to enable GnssMeasurement. False is not to. Default
set to False.
freq: An integer to set location update frequency. Default set to 0.
bg_display: To enable GPS tool bg display or not
Returns:
First fix datetime obj
Raises:
signals.TestFailure: when first fixed is over criteria or not even get first fixed
"""
retries = 3
for i in range(retries):
if not ad.is_adb_logcat_on:
ad.start_adb_logcat()
check_adblog_functionality(ad)
check_location_runtime_permissions(
ad, GNSSTOOL_PACKAGE_NAME, GNSSTOOL_PERMISSIONS)
begin_time = get_current_epoch_time()
if clear_data:
clear_aiding_data_by_gtw_gpstool(ad)
ad.log.info("Start %s on GTW_GPSTool - attempt %d" % (api_type.upper(),
i+1))
start_gnss_by_gtw_gpstool(ad, state=True, api_type=api_type, meas=meas_flag, freq=freq,
bgdisplay=bg_display)
for _ in range(10 + criteria):
logcat_results = ad.search_logcat("First fixed", begin_time)
if logcat_results:
ad.log.debug(logcat_results[-1]["log_message"])
first_fixed = int(logcat_results[-1]["log_message"].split()[-1])
ad.log.info("%s First fixed = %.3f seconds" %
(api_type.upper(), first_fixed/1000))
if (first_fixed/1000) <= criteria:
return logcat_results[-1]["datetime_obj"]
start_gnss_by_gtw_gpstool(ad, state=False, api_type=api_type)
raise signals.TestFailure("Fail to get %s location fixed "
"within %d seconds criteria."
% (api_type.upper(), criteria))
time.sleep(1)
check_current_focus_app(ad)
start_gnss_by_gtw_gpstool(ad, state=False, api_type=api_type)
raise signals.TestFailure("Fail to get %s location fixed within %d "
"attempts." % (api_type.upper(), retries))
def start_ttff_by_gtw_gpstool(ad,
ttff_mode,
iteration,
aid_data=False,
raninterval=False,
mininterval=10,
maxinterval=40,
hot_warm_sleep=300,
timeout=60):
"""Identify which TTFF mode for different test items.
Args:
ad: An AndroidDevice object.
ttff_mode: TTFF Test mode for current test item.
iteration: Iteration of TTFF cycles.
aid_data: Boolean for identify aid_data existed or not
raninterval: Boolean for identify random interval of TTFF in enable or not.
mininterval: Minimum value of random interval pool. The unit is second.
maxinterval: Maximum value of random interval pool. The unit is second.
hot_warm_sleep: Wait time for acquiring Almanac.
timeout: TTFF time out. The unit is second.
Returns:
latest_start_time: (Datetime) the start time of latest successful TTFF
"""
begin_time = get_current_epoch_time()
ad.log.debug("[start_ttff] Search logcat start time: %s" % begin_time)
if (ttff_mode == "hs" or ttff_mode == "ws") and not aid_data:
ad.log.info("Wait {} seconds to start TTFF {}...".format(
hot_warm_sleep, ttff_mode.upper()))
time.sleep(hot_warm_sleep)
if ttff_mode == "cs":
ad.log.info("Start TTFF Cold Start...")
time.sleep(3)
elif ttff_mode == "csa":
ad.log.info("Start TTFF CSWith Assist...")
time.sleep(3)
for i in range(1, 4):
try:
ad.log.info(f"Before sending TTFF gms version is {get_gms_version(ad)}")
ad.adb.shell("am broadcast -a com.android.gpstool.ttff_action "
"--es ttff {} --es cycle {} --ez raninterval {} "
"--ei mininterval {} --ei maxinterval {}".format(
ttff_mode, iteration, raninterval, mininterval,
maxinterval))
except job.TimeoutError:
# If this is the last retry and we still get timeout error, raises the timeoutError.
if i == 3:
raise
# Currently we encounter lots of timeout issue in Qualcomm devices. But so far we don't
# know the root cause yet. In order to continue the test, we ignore the timeout for
# retry.
ad.log.warn("Send TTFF command timeout.")
ad.log.info(f"Current gms version is {get_gms_version(ad)}")
# Wait 2 second to retry
time.sleep(2)
continue
time.sleep(1)
result = ad.search_logcat("act=com.android.gpstool.start_test_action", begin_time)
if result:
ad.log.debug("TTFF start log %s" % result)
latest_start_time = max(list(map(lambda x: x['datetime_obj'], result)))
ad.log.info("Send TTFF start_test_action successfully.")
return latest_start_time
else:
check_current_focus_app(ad)
raise signals.TestError("Fail to send TTFF start_test_action.")
def gnss_tracking_via_gtw_gpstool(ad,
criteria,
api_type="gnss",
testtime=60,
meas_flag=False,
freq=0,
is_screen_off=False):
"""Start GNSS/FLP tracking tests for input testtime on GTW_GPSTool.
Args:
ad: An AndroidDevice object.
criteria: Criteria for current TTFF.
api_type: Different API for location fix. Use gnss/flp/nmea
testtime: Tracking test time for minutes. Default set to 60 minutes.
meas_flag: True to enable GnssMeasurement. False is not to. Default
set to False.
freq: An integer to set location update frequency. Default set to 0.
is_screen_off: whether to turn off during tracking
"""
process_gnss_by_gtw_gpstool(
ad, criteria=criteria, api_type=api_type, meas_flag=meas_flag, freq=freq,
bg_display=is_screen_off)
ad.log.info("Start %s tracking test for %d minutes" % (api_type.upper(),
testtime))
begin_time = get_current_epoch_time()
with set_screen_status(ad, off=is_screen_off):
wait_n_mins_for_gnss_tracking(ad, begin_time, testtime, api_type)
ad.log.info("Successfully tested for %d minutes" % testtime)
start_gnss_by_gtw_gpstool(ad, state=False, api_type=api_type)
def wait_n_mins_for_gnss_tracking(ad, begin_time, testtime, api_type="gnss",
ignore_hal_crash=False):
"""Waits for GNSS tracking to finish and detect GNSS crash during the waiting time.
Args:
ad: An AndroidDevice object.
begin_time: The start time of tracking.
api_type: Different API for location fix. Use gnss/flp/nmea
testtime: Tracking test time for minutes.
ignore_hal_crash: To ignore HAL crash error no not.
"""
while get_current_epoch_time() - begin_time < testtime * 60 * 1000:
detect_crash_during_tracking(ad, begin_time, api_type, ignore_hal_crash)
# add sleep here to avoid too many request and cause device not responding
time.sleep(1)
def run_ttff_via_gtw_gpstool(ad, mode, criteria, test_cycle, true_location):
"""Run GNSS TTFF test with selected mode and parse the results.
Args:
mode: "cs", "ws" or "hs"
criteria: Criteria for the TTFF.
"""
# Before running TTFF, we will run tracking and try to get first fixed.
# But the TTFF before TTFF doesn't apply to any criteria, so we set a maximum value.
process_gnss_by_gtw_gpstool(ad, criteria=FIRST_FIXED_MAX_WAITING_TIME)
ttff_start_time = start_ttff_by_gtw_gpstool(ad, mode, test_cycle)
ttff_data = process_ttff_by_gtw_gpstool(ad, ttff_start_time, true_location)
result = check_ttff_data(ad, ttff_data, gnss_constant.TTFF_MODE.get(mode), criteria)
asserts.assert_true(
result, "TTFF %s fails to reach designated criteria of %d "
"seconds." % (gnss_constant.TTFF_MODE.get(mode), criteria))
def parse_gtw_gpstool_log(ad, true_position, api_type="gnss", validate_gnssstatus=False):
"""Process GNSS/FLP API logs from GTW GPSTool and output track_data to
test_run_info for ACTS plugin to parse and display on MobileHarness as
Property.
Args:
ad: An AndroidDevice object.
true_position: Coordinate as [latitude, longitude] to calculate
position error.
api_type: Different API for location fix. Use gnss/flp/nmea
validate_gnssstatus: Validate gnssstatus or not
Returns:
A dict of location reported from GPSTool
{<utc_time>: TRACK_REPORT, ...}
"""
gnssstatus_count = 0
test_logfile = {}
track_data = {}
ant_top4_cn = 0
ant_cn = 0
base_top4_cn = 0
base_cn = 0
track_lat = 0
track_long = 0
l5flag = "false"
gps_datetime_pattern = re.compile("(\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}\.\d{0,5})")
gps_datetime_format = "%Y/%m/%d %H:%M:%S.%f"
file_count = int(ad.adb.shell("find %s -type f -iname *.txt | wc -l"
% GNSSSTATUS_LOG_PATH))
if file_count != 1:
ad.log.warn("%d API logs exist." % file_count)
dir_file = ad.adb.shell("ls -tr %s" % GNSSSTATUS_LOG_PATH).split()
for path_key in dir_file:
if fnmatch.fnmatch(path_key, "*.txt"):
logpath = posixpath.join(GNSSSTATUS_LOG_PATH, path_key)
out = ad.adb.shell("wc -c %s" % logpath)
file_size = int(out.split(" ")[0])
if file_size < 2000:
ad.log.info("Skip log %s due to log size %d bytes" %
(path_key, file_size))
continue
test_logfile = logpath
if not test_logfile:
raise signals.TestError("Failed to get test log file in device.")
lines = ad.adb.shell("cat %s" % test_logfile).split("\n")
gnss_svid_container = gnssstatus_utils.GnssSvidContainer()
for line in lines:
if line.startswith('Fix'):
try:
gnss_status = gnssstatus_utils.GnssStatus(line)
gnssstatus_count += 1
except gnssstatus_utils.RegexParseException as e:
ad.log.warn(e)
continue
gnss_svid_container.add_satellite(gnss_status)
if validate_gnssstatus:
gnss_status.validate_gnssstatus()
if "Antenna_History Avg Top4" in line:
ant_top4_cn = float(line.split(":")[-1].strip())
elif "Antenna_History Avg" in line:
ant_cn = float(line.split(":")[-1].strip())
elif "Baseband_History Avg Top4" in line:
base_top4_cn = float(line.split(":")[-1].strip())
elif "Baseband_History Avg" in line:
base_cn = float(line.split(":")[-1].strip())
elif "L5 used in fix" in line:
l5flag = line.split(":")[-1].strip()
elif "Latitude" in line:
track_lat = float(line.split(":")[-1].strip())
elif "Longitude" in line:
track_long = float(line.split(":")[-1].strip())
elif "Read:" in line:
target = re.search(gps_datetime_pattern, line)
device_time = datetime.strptime(target.group(1), gps_datetime_format)
elif "Time" in line:
target = re.search(gps_datetime_pattern, line)
track_utc = target.group(1)
report_time = datetime.strptime(track_utc, gps_datetime_format)
if track_utc in track_data.keys():
continue
pe = calculate_position_error(track_lat, track_long, true_position)
track_data[track_utc] = TRACK_REPORT(l5flag=l5flag,
pe=pe,
ant_top4cn=ant_top4_cn,
ant_cn=ant_cn,
base_top4cn=base_top4_cn,
base_cn=base_cn,
device_time=device_time,
report_time=report_time,
)
ad.log.info("Total %d gnssstatus samples verified" %gnssstatus_count)
ad.log.debug(track_data)
prop_basename = UPLOAD_TO_SPONGE_PREFIX + f"{api_type.upper()}_tracking_"
time_list = sorted(track_data.keys())
l5flag_list = [track_data[key].l5flag for key in time_list]
pe_list = [float(track_data[key].pe) for key in time_list]
ant_top4cn_list = [float(track_data[key].ant_top4cn) for key in time_list]
ant_cn_list = [float(track_data[key].ant_cn) for key in time_list]
base_top4cn_list = [float(track_data[key].base_top4cn) for key in time_list]
base_cn_list = [float(track_data[key].base_cn) for key in time_list]
ad.log.info(prop_basename+"StartTime %s" % time_list[0].replace(" ", "-"))
ad.log.info(prop_basename+"EndTime %s" % time_list[-1].replace(" ", "-"))
ad.log.info(prop_basename+"TotalFixPoints %d" % len(time_list))
ad.log.info(prop_basename+"L5FixRate "+'{percent:.2%}'.format(
percent=l5flag_list.count("true")/len(l5flag_list)))
ad.log.info(prop_basename+"AvgDis %.1f" % (sum(pe_list)/len(pe_list)))
ad.log.info(prop_basename+"MaxDis %.1f" % max(pe_list))
ad.log.info(prop_basename+"Ant_AvgTop4Signal %.1f" % ant_top4cn_list[-1])
ad.log.info(prop_basename+"Ant_AvgSignal %.1f" % ant_cn_list[-1])
ad.log.info(prop_basename+"Base_AvgTop4Signal %.1f" % base_top4cn_list[-1])
ad.log.info(prop_basename+"Base_AvgSignal %.1f" % base_cn_list[-1])
_log_svid_info(gnss_svid_container, prop_basename, ad)
return track_data
def verify_gps_time_should_be_close_to_device_time(ad, tracking_result):
"""Check the time gap between GPS time and device time.
In normal cases, the GPS time should be close to device time. But if GPS week rollover happens,
the GPS time may goes back to 20 years ago. In order to capture this issue, we assert the time
diff between the GPS time and device time.
Args:
ad: The device under test.
tracking_result: The result we get from GNSS tracking.
"""
ad.log.info("Validating GPS/Device time difference")
max_time_diff_in_seconds = 2.0
exceed_report = []
for report in tracking_result.values():
time_diff_in_seconds = abs((report.report_time - report.device_time).total_seconds())
if time_diff_in_seconds > max_time_diff_in_seconds:
message = (f"GPS time: {report.report_time} Device time: {report.device_time} "
f"diff: {time_diff_in_seconds}")
exceed_report.append(message)
fail_message = (f"The following items exceed {max_time_diff_in_seconds}s\n" +
"\n".join(exceed_report))
asserts.assert_false(exceed_report, msg=fail_message)
def validate_location_fix_rate(ad, location_reported, run_time, fix_rate_criteria):
"""Check location reported count
The formula is "total_fix_points / (run_time * 60)"
When the result is lower than fix_rate_criteria, fail the test case
Args:
ad: AndroidDevice object
location_reported: (Enumerate) Contains the reported location
run_time: (int) How many minutes do we need to verify
fix_rate_criteria: The threshold of the pass criteria
if we expect fix rate to be 99%, then fix_rate_criteria should be 0.99
"""
ad.log.info("Validating fix rate")
pass_criteria = run_time * 60 * fix_rate_criteria
actual_location_count = len(location_reported)
# The fix rate may exceed 100% occasionally, to standardlize the result
# set maximum fix rate to 100%
actual_fix_rate = min(1, (actual_location_count / (run_time * 60)))
actual_fix_rate_percentage = f"{actual_fix_rate:.0%}"
log_prefix = UPLOAD_TO_SPONGE_PREFIX + f"FIX_RATE_"
ad.log.info("%sresult %s" % (log_prefix, actual_fix_rate_percentage))
ad.log.debug("Actual location count %s" % actual_location_count)
fail_message = (f"Fail to meet criteria. Expect to have at least {pass_criteria} location count"
f" Actual: {actual_location_count}")
asserts.assert_true(pass_criteria <= actual_location_count, msg=fail_message)
def _log_svid_info(container, log_prefix, ad):
"""Write GnssSvidContainer svid information into logger
Args:
container: A GnssSvidContainer object
log_prefix:
A prefix used to specify the log will be upload to dashboard
ad: An AndroidDevice object
"""
for sv_type, svids in container.used_in_fix.items():
message = f"{log_prefix}{sv_type} {len(svids)}"
ad.log.info(message)
ad.log.debug("Satellite used in fix %s ids are: %s", sv_type, svids)
for sv_type, svids in container.not_used_in_fix.items():
ad.log.debug("Satellite not used in fix %s ids are: %s", sv_type, svids)
def process_ttff_by_gtw_gpstool(ad, begin_time, true_position, api_type="gnss"):
"""Process TTFF and record results in ttff_data.
Args:
ad: An AndroidDevice object.
begin_time: test begin time.
true_position: Coordinate as [latitude, longitude] to calculate
position error.
api_type: Different API for location fix. Use gnss/flp/nmea
Returns:
ttff_data: A dict of all TTFF data.
"""
ttff_lat = 0
ttff_lon = 0
utc_time = epoch_to_human_time(get_current_epoch_time())
ttff_data = {}
ttff_loop_time = get_current_epoch_time()
while True:
if get_current_epoch_time() - ttff_loop_time >= 120000:
raise signals.TestError("Fail to search specific GPSService "
"message in logcat. Abort test.")
if not ad.is_adb_logcat_on:
ad.start_adb_logcat()
logcat_results = ad.search_logcat("write TTFF log", ttff_loop_time)
if logcat_results:
ttff_loop_time = get_current_epoch_time()
ttff_log = logcat_results[-1]["log_message"].split()
ttff_loop = int(ttff_log[8].split(":")[-1])
ttff_sec = float(ttff_log[11])
if ttff_sec != 0.0:
ttff_ant_cn = float(ttff_log[18].strip("]"))
ttff_base_cn = float(ttff_log[25].strip("]"))
if api_type == "gnss":
gnss_results = ad.search_logcat("GPSService: Check item",
begin_time)
if gnss_results:
ad.log.debug(gnss_results[-1]["log_message"])
gnss_location_log = \
gnss_results[-1]["log_message"].split()
ttff_lat = float(
gnss_location_log[8].split("=")[-1].strip(","))
ttff_lon = float(
gnss_location_log[9].split("=")[-1].strip(","))
loc_time = int(
gnss_location_log[10].split("=")[-1].strip(","))
utc_time = epoch_to_human_time(loc_time)
ttff_haccu = float(
gnss_location_log[11].split("=")[-1].strip(","))
elif api_type == "flp":
flp_results = ad.search_logcat("GPSService: FLP Location",
begin_time)
if flp_results:
ad.log.debug(flp_results[-1]["log_message"])
flp_location_log = flp_results[-1][
"log_message"].split()
ttff_lat = float(flp_location_log[8].split(",")[0])
ttff_lon = float(flp_location_log[8].split(",")[1])
ttff_haccu = float(flp_location_log[9].split("=")[1])
utc_time = epoch_to_human_time(get_current_epoch_time())
else:
ttff_ant_cn = float(ttff_log[19].strip("]"))
ttff_base_cn = float(ttff_log[26].strip("]"))
ttff_lat = 0
ttff_lon = 0
ttff_haccu = 0
utc_time = epoch_to_human_time(get_current_epoch_time())
ad.log.debug("TTFF Loop %d - (Lat, Lon) = (%s, %s)" % (ttff_loop,
ttff_lat,
ttff_lon))
ttff_pe = calculate_position_error(
ttff_lat, ttff_lon, true_position)
ttff_data[ttff_loop] = TTFF_REPORT(utc_time=utc_time,
ttff_loop=ttff_loop,
ttff_sec=ttff_sec,
ttff_pe=ttff_pe,
ttff_ant_cn=ttff_ant_cn,
ttff_base_cn=ttff_base_cn,
ttff_haccu=ttff_haccu)
ad.log.info("UTC Time = %s, Loop %d = %.1f seconds, "
"Position Error = %.1f meters, "
"Antenna Average Signal = %.1f dbHz, "
"Baseband Average Signal = %.1f dbHz, "
"Horizontal Accuracy = %.1f meters" % (utc_time,
ttff_loop,
ttff_sec,
ttff_pe,
ttff_ant_cn,
ttff_base_cn,
ttff_haccu))
stop_gps_results = ad.search_logcat("stop gps test", begin_time)
if stop_gps_results:
ad.send_keycode("HOME")
break
crash_result = ad.search_logcat("Force finishing activity "
"com.android.gpstool/.GPSTool",
begin_time)
if crash_result:
raise signals.TestError("GPSTool crashed. Abort test.")
# wait 5 seconds to avoid logs not writing into logcat yet
time.sleep(5)
return ttff_data
def check_ttff_data(ad, ttff_data, ttff_mode, criteria):
"""Verify all TTFF results from ttff_data.
Args:
ad: An AndroidDevice object.
ttff_data: TTFF data of secs, position error and signal strength.
ttff_mode: TTFF Test mode for current test item.
criteria: Criteria for current test item.
Returns:
True: All TTFF results are within criteria.
False: One or more TTFF results exceed criteria or Timeout.
"""
ad.log.info("%d iterations of TTFF %s tests finished."
% (len(ttff_data.keys()), ttff_mode))
ad.log.info("%s PASS criteria is %d seconds" % (ttff_mode, criteria))
ad.log.debug("%s TTFF data: %s" % (ttff_mode, ttff_data))
if len(ttff_data.keys()) == 0:
ad.log.error("GTW_GPSTool didn't process TTFF properly.")
raise ValueError("No ttff loop is done")
ttff_property_key_and_value(ad, ttff_data, ttff_mode)
if any(float(ttff_data[key].ttff_sec) == 0.0 for key in ttff_data.keys()):
ad.log.error("One or more TTFF %s Timeout" % ttff_mode)
return False
elif any(float(ttff_data[key].ttff_sec) >= criteria for key in
ttff_data.keys()):
ad.log.error("One or more TTFF %s are over test criteria %d seconds"
% (ttff_mode, criteria))
return False
ad.log.info("All TTFF %s are within test criteria %d seconds."
% (ttff_mode, criteria))
return True
def ttff_property_key_and_value(ad, ttff_data, ttff_mode):
"""Output ttff_data to test_run_info for ACTS plugin to parse and display
on MobileHarness as Property.
Args:
ad: An AndroidDevice object.
ttff_data: TTFF data of secs, position error and signal strength.
ttff_mode: TTFF Test mode for current test item.
"""
timeout_ttff = 61
prop_basename = "TestResult "+ttff_mode.replace(" ", "_")+"_TTFF_"
sec_list = [float(ttff_data[key].ttff_sec) for key in ttff_data.keys()]
pe_list = [float(ttff_data[key].ttff_pe) for key in ttff_data.keys()]
ant_cn_list = [float(ttff_data[key].ttff_ant_cn) for key in
ttff_data.keys()]
base_cn_list = [float(ttff_data[key].ttff_base_cn) for key in
ttff_data.keys()]
haccu_list = [float(ttff_data[key].ttff_haccu) for key in
ttff_data.keys()]
timeoutcount = sec_list.count(0.0)
sec_list = sorted(sec_list)
if len(sec_list) == timeoutcount:
median_ttff = avgttff = timeout_ttff
else:
avgttff = sum(sec_list)/(len(sec_list) - timeoutcount)
median_ttff = median(sec_list)
if timeoutcount != 0:
maxttff = timeout_ttff
else:
maxttff = max(sec_list)
avgdis = sum(pe_list)/len(pe_list)
maxdis = max(pe_list)
ant_avgcn = sum(ant_cn_list)/len(ant_cn_list)
base_avgcn = sum(base_cn_list)/len(base_cn_list)
avg_haccu = sum(haccu_list)/len(haccu_list)
ad.log.info(prop_basename+"AvgTime %.1f" % avgttff)
ad.log.info(prop_basename+"MedianTime %.1f" % median_ttff)
ad.log.info(prop_basename+"MaxTime %.1f" % maxttff)
ad.log.info(prop_basename+"TimeoutCount %d" % timeoutcount)
ad.log.info(prop_basename+"AvgDis %.1f" % avgdis)
ad.log.info(prop_basename+"MaxDis %.1f" % maxdis)
ad.log.info(prop_basename+"Ant_AvgSignal %.1f" % ant_avgcn)
ad.log.info(prop_basename+"Base_AvgSignal %.1f" % base_avgcn)
ad.log.info(prop_basename+"Avg_Horizontal_Accuracy %.1f" % avg_haccu)
def calculate_position_error(latitude, longitude, true_position):
"""Use haversine formula to calculate position error base on true location
coordinate.
Args:
latitude: latitude of location fixed in the present.
longitude: longitude of location fixed in the present.
true_position: [latitude, longitude] of true location coordinate.
Returns:
position_error of location fixed in the present.
"""
radius = 6371009
dlat = math.radians(latitude - true_position[0])
dlon = math.radians(longitude - true_position[1])
a = math.sin(dlat/2) * math.sin(dlat/2) + \
math.cos(math.radians(true_position[0])) * \
math.cos(math.radians(latitude)) * math.sin(dlon/2) * math.sin(dlon/2)
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
return radius * c
def launch_google_map(ad):
"""Launch Google Map via intent.
Args:
ad: An AndroidDevice object.
"""
ad.log.info("Launch Google Map.")
try:
if is_device_wearable(ad):
cmd = ("am start -S -n com.google.android.apps.maps/"
"com.google.android.apps.gmmwearable.MainActivity")
else:
cmd = ("am start -S -n com.google.android.apps.maps/"
"com.google.android.maps.MapsActivity")
ad.adb.shell(cmd)
ad.send_keycode("BACK")
ad.force_stop_apk("com.google.android.apps.maps")
ad.adb.shell(cmd)
except Exception as e:
ad.log.error(e)
raise signals.TestError("Failed to launch google map.")
check_current_focus_app(ad)
def check_current_focus_app(ad):
"""Check to see current focused window and app.
Args:
ad: An AndroidDevice object.
Returns:
string: the current focused window / app
"""
time.sleep(1)
current = ad.adb.shell(
"dumpsys window | grep -E 'mCurrentFocus|mFocusedApp'")
ad.log.debug("\n"+current)
return current
def check_location_api(ad, retries):
"""Verify if GnssLocationProvider API reports location.
Args:
ad: An AndroidDevice object.
retries: Retry time.
Returns:
True: GnssLocationProvider API reports location.
otherwise return False.
"""
for i in range(retries):
begin_time = get_current_epoch_time()
ad.log.info("Try to get location report from GnssLocationProvider API "
"- attempt %d" % (i+1))
while get_current_epoch_time() - begin_time <= 30000:
logcat_results = ad.search_logcat("reportLocation", begin_time)
if logcat_results:
ad.log.info("%s" % logcat_results[-1]["log_message"])
ad.log.info("GnssLocationProvider reports location "
"successfully.")
return True
if not ad.is_adb_logcat_on:
ad.start_adb_logcat()
ad.log.error("GnssLocationProvider is unable to report location.")
return False
def check_network_location(ad, retries, location_type, criteria=30):
"""Verify if NLP reports location after requesting via GPSTool.
Args:
ad: An AndroidDevice object.
retries: Retry time.
location_type: cell or wifi.
criteria: expected nlp return time, default 30 seconds
Returns:
True: NLP reports location.
otherwise return False.
"""
criteria = criteria * 1000
search_pattern = ("GPSTool : networkLocationType = %s" % location_type)
for i in range(retries):
# Capture the begin time 1 seconds before due to time gap.
begin_time = get_current_epoch_time() - 1000
ad.log.info("Try to get NLP status - attempt %d" % (i+1))
ad.adb.shell(
"am start -S -n com.android.gpstool/.GPSTool --es mode nlp")
while get_current_epoch_time() - begin_time <= criteria:
# Search pattern in 1 second interval
time.sleep(1)
result = ad.search_logcat(search_pattern, begin_time)
if result:
ad.log.info("Pattern Found: %s." % result[-1]["log_message"])
ad.send_keycode("BACK")
return True
if not ad.is_adb_logcat_on:
ad.start_adb_logcat()
ad.send_keycode("BACK")
ad.log.error("Unable to report network location \"%s\"." % location_type)
return False
def set_attenuator_gnss_signal(ad, attenuator, atten_value):
"""Set attenuation value for different GNSS signal.
Args:
ad: An AndroidDevice object.
attenuator: The attenuator object.
atten_value: attenuation value
"""
try:
ad.log.info(
"Set attenuation value to \"%d\" for GNSS signal." % atten_value)
attenuator[0].set_atten(atten_value)
except Exception as e:
ad.log.error(e)
def set_battery_saver_mode(ad, state):
"""Enable or disable battery saver mode via adb.
Args:
ad: An AndroidDevice object.
state: True is enable Battery Saver mode. False is disable.
"""
ad.root_adb()
if state:
ad.log.info("Enable Battery Saver mode.")
ad.adb.shell("cmd battery unplug")
ad.adb.shell("settings put global low_power 1")
else:
ad.log.info("Disable Battery Saver mode.")
ad.adb.shell("settings put global low_power 0")
ad.adb.shell("cmd battery reset")
def set_gnss_qxdm_mask(ad, masks):
"""Find defined gnss qxdm mask and set as default logging mask.
Args:
ad: An AndroidDevice object.
masks: Defined gnss qxdm mask.
"""
try:
for mask in masks:
if not tlutils.find_qxdm_log_mask(ad, mask):
continue
tlutils.set_qxdm_logger_command(ad, mask)
break
except Exception as e:
ad.log.error(e)
raise signals.TestError("Failed to set any QXDM masks.")
def start_youtube_video(ad, url=None, retries=0):
"""Start youtube video and verify if audio is in music state.
Args:
ad: An AndroidDevice object.
url: Youtube video url.
retries: Retry times if audio is not in music state.
Returns:
True if youtube video is playing normally.
False if youtube video is not playing properly.
"""
for i in range(retries):
ad.log.info("Open an youtube video - attempt %d" % (i+1))
cmd = ("am start -n com.google.android.youtube/"
"com.google.android.youtube.UrlActivity -d \"%s\"" % url)
ad.adb.shell(cmd)
time.sleep(2)
out = ad.adb.shell(
"dumpsys activity | grep NewVersionAvailableActivity")
if out:
ad.log.info("Skip Youtube New Version Update.")
ad.send_keycode("BACK")
if tutils.wait_for_state(ad.droid.audioIsMusicActive, True, 15, 1):
ad.log.info("Started a video in youtube, audio is in MUSIC state")
return True
ad.log.info("Force-Stop youtube and reopen youtube again.")
ad.force_stop_apk("com.google.android.youtube")
check_current_focus_app(ad)
raise signals.TestError("Started a video in youtube, "
"but audio is not in MUSIC state")
def get_gms_version(ad):
cmd = "dumpsys package com.google.android.gms | grep versionName"
return ad.adb.shell(cmd).split("\n")[0].split("=")[1]
def get_baseband_and_gms_version(ad, extra_msg=""):
"""Get current radio baseband and GMSCore version of AndroidDevice object.
Args:
ad: An AndroidDevice object.
extra_msg: Extra message before or after the change.
"""
mpss_version = ""
brcm_gps_version = ""
brcm_sensorhub_version = ""
try:
build_version = ad.adb.getprop("ro.build.id")
baseband_version = ad.adb.getprop("gsm.version.baseband")
gms_version = get_gms_version(ad)
if check_chipset_vendor_by_qualcomm(ad):
mpss_version = ad.adb.shell(
"cat /sys/devices/soc0/images | grep MPSS | cut -d ':' -f 3")
else:
brcm_gps_version = ad.adb.shell("cat /data/vendor/gps/chip.info")
sensorhub_version = ad.adb.shell(
"cat /vendor/firmware/SensorHub.patch | grep ChangeList")
brcm_sensorhub_version = re.compile(
r'<ChangeList=(\w+)>').search(sensorhub_version).group(1)
if not extra_msg:
ad.log.info("TestResult Build_Version %s" % build_version)
ad.log.info("TestResult Baseband_Version %s" % baseband_version)
ad.log.info(
"TestResult GMS_Version %s" % gms_version.replace(" ", ""))
if check_chipset_vendor_by_qualcomm(ad):
ad.log.info("TestResult MPSS_Version %s" % mpss_version)
else:
ad.log.info("TestResult GPS_Version %s" % brcm_gps_version)
ad.log.info(
"TestResult SensorHub_Version %s" % brcm_sensorhub_version)
else:
ad.log.info(
"%s, Baseband_Version = %s" % (extra_msg, baseband_version))
except Exception as e:
ad.log.error(e)
def start_toggle_gnss_by_gtw_gpstool(ad, iteration):
"""Send toggle gnss off/on start_test_action
Args:
ad: An AndroidDevice object.
iteration: Iteration of toggle gnss off/on cycles.
"""
msg_list = []
begin_time = get_current_epoch_time()
try:
for i in range(1, 4):
ad.adb.shell("am start -S -n com.android.gpstool/.GPSTool "
"--es mode toggle --es cycle %d" % iteration)
time.sleep(1)
if is_device_wearable(ad):
# Wait 20 seconds for Wearable low performance time.
time.sleep(20)
if ad.search_logcat("ToggleGPS onResume",
begin_time):
ad.log.info("Send ToggleGPS start_test_action successfully.")
break
elif ad.search_logcat("cmp=com.android.gpstool/.ToggleGPS",
begin_time):
ad.log.info("Send ToggleGPS start_test_action successfully.")
break
else:
check_current_focus_app(ad)
raise signals.TestError("Fail to send ToggleGPS "
"start_test_action within 3 attempts.")
time.sleep(2)
if is_device_wearable(ad):
test_start = ad.search_logcat("GPSService: create toggle GPS log",
begin_time)
else:
test_start = ad.search_logcat("GPSTool_ToggleGPS: startService",
begin_time)
if test_start:
ad.log.info(test_start[-1]["log_message"].split(":")[-1].strip())
else:
raise signals.TestError("Fail to start toggle GPS off/on test.")
# Every iteration is expected to finish within 4 minutes.
while get_current_epoch_time() - begin_time <= iteration * 240000:
crash_end = ad.search_logcat("Force finishing activity "
"com.android.gpstool/.GPSTool",
begin_time)
if crash_end:
raise signals.TestError("GPSTool crashed. Abort test.")
toggle_results = ad.search_logcat("GPSTool : msg", begin_time)
if toggle_results:
for toggle_result in toggle_results:
msg = toggle_result["log_message"]
if not msg in msg_list:
ad.log.info(msg.split(":")[-1].strip())
msg_list.append(msg)
if "timeout" in msg:
raise signals.TestFailure("Fail to get location fixed "
"within 60 seconds.")
if "Test end" in msg:
raise signals.TestPass("Completed quick toggle GNSS "
"off/on test.")
raise signals.TestFailure("Fail to finish toggle GPS off/on test "
"within %d minutes" % (iteration * 4))
finally:
ad.send_keycode("HOME")
def grant_location_permission(ad, option):
"""Grant or revoke location related permission.
Args:
ad: An AndroidDevice object.
option: Boolean to grant or revoke location related permissions.
"""
action = "grant" if option else "revoke"
for permission in LOCATION_PERMISSIONS:
ad.log.info(
"%s permission:%s on %s" % (action, permission, TEST_PACKAGE_NAME))
ad.adb.shell("pm %s %s %s" % (action, TEST_PACKAGE_NAME, permission))
def check_location_runtime_permissions(ad, package, permissions):
"""Check if runtime permissions are granted on selected package.
Args:
ad: An AndroidDevice object.
package: Apk package name to check.
permissions: A list of permissions to be granted.
"""
for _ in range(3):
location_runtime_permission = ad.adb.shell(
"dumpsys package %s | grep ACCESS_FINE_LOCATION" % package)
if "true" not in location_runtime_permission:
ad.log.info("ACCESS_FINE_LOCATION is NOT granted on %s" % package)
for permission in permissions:
ad.log.debug("Grant %s on %s" % (permission, package))
ad.adb.shell("pm grant %s %s" % (package, permission))
else:
ad.log.info("ACCESS_FINE_LOCATION is granted on %s" % package)
break
else:
raise signals.TestError(
"Fail to grant ACCESS_FINE_LOCATION on %s" % package)
def install_mdstest_app(ad, mdsapp):
"""
Install MDS test app in DUT
Args:
ad: An Android Device Object
mdsapp: Installation path of MDSTest app
"""
if not ad.is_apk_installed("com.google.mdstest"):
ad.adb.install("-r %s" % mdsapp, timeout=300, ignore_status=True)
def write_modemconfig(ad, mdsapp, nvitem_dict, modemparfile):
"""
Modify the NV items using modem_tool.par
Note: modem_tool.par
Args:
ad: An Android Device Object
mdsapp: Installation path of MDSTest app
nvitem_dict: dictionary of NV items and values.
modemparfile: modem_tool.par path.
"""
ad.log.info("Verify MDSTest app installed in DUT")
install_mdstest_app(ad, mdsapp)
os.system("chmod 777 %s" % modemparfile)
for key, value in nvitem_dict.items():
if key.isdigit():
op_name = "WriteEFS"
else:
op_name = "WriteNV"
ad.log.info("Modifying the NV{!r} using {}".format(key, op_name))
job.run("{} --op {} --item {} --data '{}'".
format(modemparfile, op_name, key, value))
time.sleep(2)
def verify_modemconfig(ad, nvitem_dict, modemparfile):
"""
Verify the NV items using modem_tool.par
Note: modem_tool.par
Args:
ad: An Android Device Object
nvitem_dict: dictionary of NV items and values
modemparfile: modem_tool.par path.
"""
os.system("chmod 777 %s" % modemparfile)
for key, value in nvitem_dict.items():
if key.isdigit():
op_name = "ReadEFS"
else:
op_name = "ReadNV"
# Sleeptime to avoid Modem communication error
time.sleep(5)
result = job.run(
"{} --op {} --item {}".format(modemparfile, op_name, key))
output = str(result.stdout)
ad.log.info("Actual Value for NV{!r} is {!r}".format(key, output))
if not value.casefold() in output:
ad.log.error("NV Value is wrong {!r} in {!r}".format(value, result))
raise ValueError(
"could not find {!r} in {!r}".format(value, result))
def check_ttff_pe(ad, ttff_data, ttff_mode, pe_criteria):
"""Verify all TTFF results from ttff_data.
Args:
ad: An AndroidDevice object.
ttff_data: TTFF data of secs, position error and signal strength.
ttff_mode: TTFF Test mode for current test item.
pe_criteria: Criteria for current test item.
"""
ad.log.info("%d iterations of TTFF %s tests finished."
% (len(ttff_data.keys()), ttff_mode))
ad.log.info("%s PASS criteria is %f meters" % (ttff_mode, pe_criteria))
ad.log.debug("%s TTFF data: %s" % (ttff_mode, ttff_data))
if len(ttff_data.keys()) == 0:
ad.log.error("GTW_GPSTool didn't process TTFF properly.")
raise signals.TestFailure("GTW_GPSTool didn't process TTFF properly.")
elif any(float(ttff_data[key].ttff_pe) >= pe_criteria for key in
ttff_data.keys()):
ad.log.error("One or more TTFF %s are over test criteria %f meters"
% (ttff_mode, pe_criteria))
raise signals.TestFailure("GTW_GPSTool didn't process TTFF properly.")
else:
ad.log.info("All TTFF %s are within test criteria %f meters." % (
ttff_mode, pe_criteria))
return True
def check_adblog_functionality(ad):
"""Restart adb logcat if system can't write logs into file after checking
adblog file size.
Args:
ad: An AndroidDevice object.
"""
logcat_path = os.path.join(ad.device_log_path, "adblog_%s_debug.txt" %
ad.serial)
if not os.path.exists(logcat_path):
raise signals.TestError("Logcat file %s does not exist." % logcat_path)
original_log_size = os.path.getsize(logcat_path)
ad.log.debug("Original adblog size is %d" % original_log_size)
time.sleep(.5)
current_log_size = os.path.getsize(logcat_path)
ad.log.debug("Current adblog size is %d" % current_log_size)
if current_log_size == original_log_size:
ad.log.warn("System can't write logs into file. Restart adb "
"logcat process now.")
ad.stop_adb_logcat()
ad.start_adb_logcat()
def build_instrumentation_call(package,
runner,
test_methods=None,
options=None):
"""Build an instrumentation call for the tests
Args:
package: A string to identify test package.
runner: A string to identify test runner.
test_methods: A dictionary contains {class_name, test_method}.
options: A dictionary constant {key, value} param for test.
Returns:
An instrumentation call command.
"""
if test_methods is None:
test_methods = {}
cmd_builder = InstrumentationCommandBuilder()
else:
cmd_builder = InstrumentationTestCommandBuilder()
if options is None:
options = {}
cmd_builder.set_manifest_package(package)
cmd_builder.set_runner(runner)
cmd_builder.add_flag("-w")
for class_name, test_method in test_methods.items():
cmd_builder.add_test_method(class_name, test_method)
for option_key, option_value in options.items():
cmd_builder.add_key_value_param(option_key, option_value)
return cmd_builder.build()
def check_chipset_vendor_by_qualcomm(ad):
"""Check if chipset vendor is by Qualcomm.
Args:
ad: An AndroidDevice object.
Returns:
True if it's by Qualcomm. False irf not.
"""
ad.root_adb()
soc = str(ad.adb.shell("getprop gsm.version.ril-impl"))
ad.log.debug("SOC = %s" % soc)
return "Qualcomm" in soc
def delete_lto_file(ad):
"""Delete downloaded LTO files.
Args:
ad: An AndroidDevice object.
"""
remount_device(ad)
status = ad.adb.shell("rm -rf /data/vendor/gps/lto*")
ad.log.info("Delete downloaded LTO files.\n%s" % status)
def lto_mode(ad, state):
"""Enable or Disable LTO mode.
Args:
ad: An AndroidDevice object.
state: True to enable. False to disable.
"""
server_list = ["LONGTERM_PSDS_SERVER_1",
"LONGTERM_PSDS_SERVER_2",
"LONGTERM_PSDS_SERVER_3",
"NORMAL_PSDS_SERVER",
"REALTIME_PSDS_SERVER"]
delete_lto_file(ad)
if state:
tmp_path = tempfile.mkdtemp()
ad.pull_files("/etc/gps_debug.conf", tmp_path)
gps_conf_path = os.path.join(tmp_path, "gps_debug.conf")
gps_conf_file = open(gps_conf_path, "r")
lines = gps_conf_file.readlines()
gps_conf_file.close()
fout = open(gps_conf_path, "w")
for line in lines:
for server in server_list:
if server in line:
line = line.replace(line, "")
fout.write(line)
fout.close()
ad.push_system_file(gps_conf_path, "/etc/gps_debug.conf")
ad.log.info("Push back modified gps_debug.conf")
ad.log.info("LTO/RTO/RTI enabled")
shutil.rmtree(tmp_path, ignore_errors=True)
else:
ad.adb.shell("echo %r >> /etc/gps_debug.conf" %
DISABLE_LTO_FILE_CONTENTS)
ad.log.info("LTO/RTO/RTI disabled")
reboot(ad)
def lto_mode_wearable(ad, state):
"""Enable or Disable LTO mode for wearable in Android R release.
Args:
ad: An AndroidDevice object.
state: True to enable. False to disable.
"""
rto_enable = ' RtoEnable="true"\n'
rto_disable = ' RtoEnable="false"\n'
rti_enable = ' RtiEnable="true"\n'
rti_disable = ' RtiEnable="false"\n'
sync_lto_enable = ' HttpDirectSyncLto="true"\n'
sync_lto_disable = ' HttpDirectSyncLto="false"\n'
server_list = ["XTRA_SERVER_1", "XTRA_SERVER_2", "XTRA_SERVER_3"]
delete_lto_file(ad)
tmp_path = tempfile.mkdtemp()
ad.pull_files("/vendor/etc/gnss/gps.xml", tmp_path)
gps_xml_path = os.path.join(tmp_path, "gps.xml")
gps_xml_file = open(gps_xml_path, "r")
lines = gps_xml_file.readlines()
gps_xml_file.close()
fout = open(gps_xml_path, "w")
for line in lines:
if state:
if rto_disable in line:
line = line.replace(line, rto_enable)
ad.log.info("RTO enabled")
elif rti_disable in line:
line = line.replace(line, rti_enable)
ad.log.info("RTI enabled")
elif sync_lto_disable in line:
line = line.replace(line, sync_lto_enable)
ad.log.info("LTO sync enabled")
else:
if rto_enable in line:
line = line.replace(line, rto_disable)
ad.log.info("RTO disabled")
elif rti_enable in line:
line = line.replace(line, rti_disable)
ad.log.info("RTI disabled")
elif sync_lto_enable in line:
line = line.replace(line, sync_lto_disable)
ad.log.info("LTO sync disabled")
fout.write(line)
fout.close()
ad.push_system_file(gps_xml_path, "/vendor/etc/gnss/gps.xml")
ad.log.info("Push back modified gps.xml")
shutil.rmtree(tmp_path, ignore_errors=True)
if state:
xtra_tmp_path = tempfile.mkdtemp()
ad.pull_files("/etc/gps_debug.conf", xtra_tmp_path)
gps_conf_path = os.path.join(xtra_tmp_path, "gps_debug.conf")
gps_conf_file = open(gps_conf_path, "r")
lines = gps_conf_file.readlines()
gps_conf_file.close()
fout = open(gps_conf_path, "w")
for line in lines:
for server in server_list:
if server in line:
line = line.replace(line, "")
fout.write(line)
fout.close()
ad.push_system_file(gps_conf_path, "/etc/gps_debug.conf")
ad.log.info("Push back modified gps_debug.conf")
ad.log.info("LTO/RTO/RTI enabled")
shutil.rmtree(xtra_tmp_path, ignore_errors=True)
else:
ad.adb.shell(
"echo %r >> /etc/gps_debug.conf" % DISABLE_LTO_FILE_CONTENTS_R)
ad.log.info("LTO/RTO/RTI disabled")
def start_pixel_logger(ad, max_log_size_mb=100, max_number_of_files=500):
"""adb to start pixel logger for GNSS logging.
Args:
ad: An AndroidDevice object.
max_log_size_mb: Determines when to create a new log file if current
one reaches the size limit.
max_number_of_files: Determines how many log files can be saved on DUT.
"""
retries = 3
start_timeout_sec = 60
default_gnss_cfg = "/vendor/etc/mdlog/DEFAULT+SECURITY+FULLDPL+GPS.cfg"
if check_chipset_vendor_by_qualcomm(ad):
start_cmd = ("am startservice -a com.android.pixellogger."
"service.logging.LoggingService.ACTION_START_LOGGING "
"-e intent_key_cfg_path '%s' "
"--ei intent_key_max_log_size_mb %d "
"--ei intent_key_max_number_of_files %d" %
(default_gnss_cfg, max_log_size_mb, max_number_of_files))
else:
start_cmd = ("am startservice -a com.android.pixellogger."
"service.logging.LoggingService.ACTION_START_LOGGING "
"-e intent_logger brcm_gps "
"--ei intent_key_max_log_size_mb %d "
"--ei intent_key_max_number_of_files %d" %
(max_log_size_mb, max_number_of_files))
for attempt in range(retries):
begin_time = get_current_epoch_time() - 3000
ad.log.info("Start Pixel Logger - Attempt %d" % (attempt + 1))
ad.adb.shell(start_cmd)
while get_current_epoch_time() - begin_time <= start_timeout_sec * 1000:
if not ad.is_adb_logcat_on:
ad.start_adb_logcat()
if check_chipset_vendor_by_qualcomm(ad):
start_result = ad.search_logcat(
"ModemLogger: Start logging", begin_time)
else:
start_result = ad.search_logcat("startRecording", begin_time)
if start_result:
ad.log.info("Pixel Logger starts recording successfully.")
return True
stop_pixel_logger(ad)
else:
ad.log.warn("Pixel Logger fails to start recording in %d seconds "
"within %d attempts." % (start_timeout_sec, retries))
def stop_pixel_logger(ad):
"""adb to stop pixel logger for GNSS logging.
Args:
ad: An AndroidDevice object.
"""
retries = 3
stop_timeout_sec = 60
zip_timeout_sec = 30
if check_chipset_vendor_by_qualcomm(ad):
stop_cmd = ("am startservice -a com.android.pixellogger."
"service.logging.LoggingService.ACTION_STOP_LOGGING")
else:
stop_cmd = ("am startservice -a com.android.pixellogger."
"service.logging.LoggingService.ACTION_STOP_LOGGING "
"-e intent_logger brcm_gps")
for attempt in range(retries):
begin_time = get_current_epoch_time() - 3000
ad.log.info("Stop Pixel Logger - Attempt %d" % (attempt + 1))
ad.adb.shell(stop_cmd)
while get_current_epoch_time() - begin_time <= stop_timeout_sec * 1000:
if not ad.is_adb_logcat_on:
ad.start_adb_logcat()
stop_result = ad.search_logcat(
"LoggingService: Stopping service", begin_time)
if stop_result:
ad.log.info("Pixel Logger stops successfully.")
zip_end_time = time.time() + zip_timeout_sec
while time.time() < zip_end_time:
zip_file_created = ad.search_logcat(
"FileUtil: Zip file has been created", begin_time)
if zip_file_created:
ad.log.info("Pixel Logger created zip file "
"successfully.")
return True
else:
ad.log.warn("Pixel Logger failed to create zip file.")
return False
ad.force_stop_apk("com.android.pixellogger")
else:
ad.log.warn("Pixel Logger fails to stop in %d seconds within %d "
"attempts." % (stop_timeout_sec, retries))
def launch_eecoexer(ad):
"""Launch EEcoexer.
Args:
ad: An AndroidDevice object.
Raise:
signals.TestError if DUT fails to launch EEcoexer
"""
launch_cmd = ("am start -a android.intent.action.MAIN -n"
"com.google.eecoexer"
"/.MainActivity")
ad.adb.shell(launch_cmd)
try:
ad.log.info("Launch EEcoexer.")
except Exception as e:
ad.log.error(e)
raise signals.TestError("Failed to launch EEcoexer.")
def execute_eecoexer_function(ad, eecoexer_args):
"""Execute EEcoexer commands.
Args:
ad: An AndroidDevice object.
eecoexer_args: EEcoexer function arguments
"""
cat_index = eecoexer_args.split(',')[:2]
cat_index = ','.join(cat_index)
enqueue_cmd = ("am broadcast -a com.google.eecoexer.action.LISTENER"
" --es sms_body ENQUEUE,{}".format(eecoexer_args))
exe_cmd = ("am broadcast -a com.google.eecoexer.action.LISTENER"
" --es sms_body EXECUTE")
wait_for_cmd = ("am broadcast -a com.google.eecoexer.action.LISTENER"
" --es sms_body WAIT_FOR_COMPLETE,{}".format(cat_index))
ad.log.info("EEcoexer Add Enqueue: {}".format(eecoexer_args))
ad.adb.shell(enqueue_cmd)
ad.log.info("EEcoexer Excute.")
ad.adb.shell(exe_cmd)
ad.log.info("Wait EEcoexer for complete")
ad.adb.shell(wait_for_cmd)
def get_process_pid(ad, process_name):
"""Gets the process PID
Args:
ad: The device under test
process_name: The name of the process
Returns:
The PID of the process
"""
command = f"ps -A | grep {process_name} | awk '{{print $2}}'"
pid = ad.adb.shell(command)
return pid
def restart_gps_daemons(ad):
"""Restart GPS daemons by killing services of gpsd, lhd and scd.
Args:
ad: An AndroidDevice object.
"""
gps_daemons_list = ["gpsd", "lhd", "scd"]
ad.root_adb()
for service in gps_daemons_list:
time.sleep(3)
ad.log.info("Kill GPS daemon \"%s\"" % service)
service_pid = get_process_pid(ad, service)
ad.log.debug("%s PID: %s" % (service, service_pid))
ad.adb.shell(f"kill -9 {service_pid}")
# Wait 3 seconds for daemons and services to start.
time.sleep(3)
new_pid = get_process_pid(ad, service)
ad.log.debug("%s new PID: %s" % (service, new_pid))
if not new_pid or service_pid == new_pid:
raise signals.TestError("Unable to restart \"%s\"" % service)
ad.log.info("GPS daemon \"%s\" restarts successfully. PID from %s to %s" % (
service, service_pid, new_pid))
def is_device_wearable(ad):
"""Check device is wearable project or not.
Args:
ad: An AndroidDevice object.
"""
package = ad.adb.getprop("ro.cw.home_package_names")
ad.log.debug("[ro.cw.home_package_names]: [%s]" % package)
return "wearable" in package
def is_mobile_data_on(ad):
"""Check if mobile data of device is on.
Args:
ad: An AndroidDevice object.
"""
if is_device_wearable(ad):
cell_on = ad.adb.shell("settings get global cell_on")
ad.log.debug("Current mobile status is %s" % cell_on)
return "1" in cell_on
else:
return ad.droid.telephonyIsDataEnabled()
def human_to_epoch_time(human_time):
"""Convert human readable time to epoch time.
Args:
human_time: Human readable time. (Ex: 2020-08-04 13:24:28.900)
Returns:
epoch: Epoch time in milliseconds.
"""
if "/" in human_time:
human_time.replace("/", "-")
try:
epoch_start = datetime.utcfromtimestamp(0)
if "." in human_time:
epoch_time = datetime.strptime(human_time, "%Y-%m-%d %H:%M:%S.%f")
else:
epoch_time = datetime.strptime(human_time, "%Y-%m-%d %H:%M:%S")
epoch = int((epoch_time - epoch_start).total_seconds() * 1000)
return epoch
except ValueError:
return None
def _get_dpo_info_from_logcat(ad, begin_time):
"""Gets the DPO info from logcat.
Args:
ad: The device under test.
begin_time: The start time of the log.
"""
dpo_results = ad.search_logcat("HardwareClockDiscontinuityCount",
begin_time)
if not dpo_results:
raise signals.TestError(
"No \"HardwareClockDiscontinuityCount\" is found in logs.")
return dpo_results
def check_dpo_rate_via_gnss_meas(ad, begin_time, dpo_threshold):
"""Check DPO engage rate through "HardwareClockDiscontinuityCount" in
GnssMeasurement callback.
Args:
ad: An AndroidDevice object.
begin_time: test begin time.
dpo_threshold: The value to set threshold. (Ex: dpo_threshold = 60)
"""
time_regex = r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}.\d{3})'
dpo_results = _get_dpo_info_from_logcat(ad, begin_time)
ad.log.info(dpo_results[0]["log_message"])
ad.log.info(dpo_results[-1]["log_message"])
start_time = re.compile(
time_regex).search(dpo_results[0]["log_message"]).group(1)
end_time = re.compile(
time_regex).search(dpo_results[-1]["log_message"]).group(1)
gnss_start_epoch = human_to_epoch_time(start_time)
gnss_stop_epoch = human_to_epoch_time(end_time)
test_time_in_sec = round((gnss_stop_epoch - gnss_start_epoch) / 1000) + 1
first_dpo_count = int(dpo_results[0]["log_message"].split()[-1])
final_dpo_count = int(dpo_results[-1]["log_message"].split()[-1])
dpo_rate = ((final_dpo_count - first_dpo_count)/test_time_in_sec)
dpo_engage_rate = "{percent:.2%}".format(percent=dpo_rate)
ad.log.info("DPO is ON for %d seconds during %d seconds test." % (
final_dpo_count - first_dpo_count, test_time_in_sec))
ad.log.info("TestResult DPO_Engage_Rate " + dpo_engage_rate)
threshold = "{percent:.0%}".format(percent=dpo_threshold / 100)
asserts.assert_true(dpo_rate * 100 > dpo_threshold,
"DPO only engaged %s in %d seconds test with "
"threshold %s." % (dpo_engage_rate,
test_time_in_sec,
threshold))
def parse_brcm_nmea_log(ad, nmea_pattern, brcm_error_log_allowlist, stop_logger=True):
"""Parse specific NMEA pattern out of BRCM NMEA log.
Args:
ad: An AndroidDevice object.
nmea_pattern: Specific NMEA pattern to parse.
brcm_error_log_allowlist: Benign error logs to exclude.
stop_logger: To stop pixel logger or not.
Returns:
brcm_log_list: A list of specific NMEA pattern logs.
"""
brcm_log_list = []
brcm_log_error_pattern = ["lhd: FS: Start Failsafe dump", "E slog"]
brcm_error_log_list = []
pixellogger_path = (
"/sdcard/Android/data/com.android.pixellogger/files/logs/gps/.")
if not isinstance(nmea_pattern, re.Pattern):
nmea_pattern = re.compile(nmea_pattern)
with tempfile.TemporaryDirectory() as tmp_dir:
try:
ad.pull_files(pixellogger_path, tmp_dir)
except AdbCommandError:
raise FileNotFoundError("No pixel logger folders found")
# Although we don't rely on the zip file, stop pixel logger here to avoid
# wasting resources.
if stop_logger:
stop_pixel_logger(ad)
tmp_path = pathlib.Path(tmp_dir)
log_folders = sorted([x for x in tmp_path.iterdir() if x.is_dir()])
if not log_folders:
raise FileNotFoundError("No BRCM logs found.")
# The folder name is a string of datetime, the latest one will be in the last index.
gl_logs = log_folders[-1].glob("**/gl*.log")
for nmea_log_path in gl_logs:
ad.log.info("Parsing log pattern of \"%s\" in %s" % (nmea_pattern,
nmea_log_path))
with open(nmea_log_path, "r", encoding="UTF-8", errors="ignore") as lines:
for line in lines:
line = line.strip()
if nmea_pattern.fullmatch(line):
brcm_log_list.append(line)
for attr in brcm_log_error_pattern:
if attr in line:
benign_log = False
for regex_pattern in brcm_error_log_allowlist:
if re.search(regex_pattern, line):
benign_log = True
ad.log.debug("\"%s\" is in allow-list and removed "
"from error." % line)
if not benign_log:
brcm_error_log_list.append(line)
brcm_error_log = "".join(brcm_error_log_list)
return brcm_log_list, brcm_error_log
def _get_power_mode_log_from_pixel_logger(ad, brcm_error_log_allowlist, stop_pixel_logger=True):
"""Gets the power log from pixel logger.
Args:
ad: The device under test.
brcm_error_log_allow_list: The allow list to ignore certain error in pixel logger.
stop_pixel_logger: To disable pixel logger when getting the log.
"""
pglor_list, brcm_error_log = parse_brcm_nmea_log(
ad, _BRCM_DUTY_CYCLE_PATTERN, brcm_error_log_allowlist, stop_pixel_logger)
if not pglor_list:
raise signals.TestFailure("Fail to get DPO logs from pixel logger")
return pglor_list, brcm_error_log
def check_dpo_rate_via_brcm_log(ad, dpo_threshold, brcm_error_log_allowlist):
"""Check DPO engage rate through "$PGLOR,11,STA" in BRCM Log.
D - Disabled, Always full power.
F - Enabled, now in full power mode.
S - Enabled, now in power save mode.
H - Host off load mode.
Args:
ad: An AndroidDevice object.
dpo_threshold: The value to set threshold. (Ex: dpo_threshold = 60)
brcm_error_log_allowlist: Benign error logs to exclude.
"""
always_full_power_count = 0
full_power_count = 0
power_save_count = 0
pglor_list, brcm_error_log = _get_power_mode_log_from_pixel_logger(ad, brcm_error_log_allowlist)
for pglor in pglor_list:
power_res = re.compile(r',P,(\w),').search(pglor).group(1)
if power_res == "D":
always_full_power_count += 1
elif power_res == "F":
full_power_count += 1
elif power_res == "S":
power_save_count += 1
ad.log.info(sorted(pglor_list)[0])
ad.log.info(sorted(pglor_list)[-1])
ad.log.info("TestResult Total_Count %d" % len(pglor_list))
ad.log.info("TestResult Always_Full_Power_Count %d" %
always_full_power_count)
ad.log.info("TestResult Full_Power_Mode_Count %d" % full_power_count)
ad.log.info("TestResult Power_Save_Mode_Count %d" % power_save_count)
dpo_rate = (power_save_count / len(pglor_list))
dpo_engage_rate = "{percent:.2%}".format(percent=dpo_rate)
ad.log.info("Power Save Mode is ON for %d seconds during %d seconds test."
% (power_save_count, len(pglor_list)))
ad.log.info("TestResult DPO_Engage_Rate " + dpo_engage_rate)
threshold = "{percent:.0%}".format(percent=dpo_threshold / 100)
asserts.assert_true((dpo_rate * 100 > dpo_threshold) and not brcm_error_log,
"Power Save Mode only engaged %s in %d seconds test "
"with threshold %s.\nAbnormal behavior found as below."
"\n%s" % (dpo_engage_rate,
len(pglor_list),
threshold,
brcm_error_log))
def process_pair(watch, phone):
"""Pair phone to watch via Bluetooth in OOBE.
Args:
watch: A wearable project.
phone: A pixel phone.
"""
check_location_service(phone)
utils.sync_device_time(phone)
bt_model_name = watch.adb.getprop("ro.product.model")
bt_sn_name = watch.adb.getprop("ro.serialno")
bluetooth_name = bt_model_name +" " + bt_sn_name[10:]
fastboot_factory_reset(watch, False)
# TODO (chenstanley)Need to re-structure for better code and test flow instead of simply waiting
watch.log.info("Wait 1 min for wearable system busy time.")
time.sleep(60)
watch.adb.shell("input keyevent 4")
# Clear Denali paired data in phone.
phone.adb.shell("pm clear com.google.android.gms")
phone.adb.shell("pm clear com.google.android.apps.wear.companion")
phone.adb.shell("am start -S -n com.google.android.apps.wear.companion/"
"com.google.android.apps.wear.companion.application.RootActivity")
uia_click(phone, "Continue")
uia_click(phone, "More")
uia_click(phone, "I agree")
uia_click(phone, "I accept")
uia_click(phone, bluetooth_name)
uia_click(phone, "Pair")
uia_click(phone, "Skip")
uia_click(phone, "Next")
uia_click(phone, "Skip")
uia_click(phone, "Done")
# TODO (chenstanley)Need to re-structure for better code and test flow instead of simply waiting
watch.log.info("Wait 3 mins for complete pairing process.")
time.sleep(180)
set_screen_always_on(watch)
check_location_service(watch)
enable_gnss_verbose_logging(watch)
def is_bluetooth_connected(watch, phone):
"""Check if device's Bluetooth status is connected or not.
Args:
watch: A wearable project
phone: A pixel phone.
"""
return watch.droid.bluetoothIsDeviceConnected(phone.droid.bluetoothGetLocalAddress())
def detect_crash_during_tracking(ad, begin_time, api_type, ignore_hal_crash=False):
"""Check if GNSS or GPSTool crash happened druing GNSS Tracking.
Args:
ad: An AndroidDevice object.
begin_time: Start Time to check if crash happened in logs.
api_type: Using GNSS or FLP reading method in GNSS tracking.
ignore_hal_crash: In BRCM devices, once the HAL is being killed, it will write error/fatal logs.
Ignore this error if the error logs are expected.
"""
gnss_crash_list = [".*Fatal signal.*gnss",
".*Fatal signal.*xtra"]
if not ignore_hal_crash:
gnss_crash_list += [".*Fatal signal.*gpsd", ".*F DEBUG.*gnss"]
if not ad.is_adb_logcat_on:
ad.start_adb_logcat()
for attr in gnss_crash_list:
gnss_crash_result = ad.adb.shell(
"logcat -d | grep -E -i '%s'" % attr, ignore_status=True, timeout = 300)
if gnss_crash_result:
start_gnss_by_gtw_gpstool(ad, state=False, api_type=api_type)
raise signals.TestFailure(
"Test failed due to GNSS HAL crashed. \n%s" %
gnss_crash_result)
gpstool_crash_result = ad.search_logcat("Force finishing activity "
"com.android.gpstool/.GPSTool",
begin_time)
if gpstool_crash_result:
raise signals.TestError("GPSTool crashed. Abort test.")
def is_wearable_btwifi(ad):
"""Check device is wearable btwifi sku or not.
Args:
ad: An AndroidDevice object.
"""
package = ad.adb.getprop("ro.product.product.name")
ad.log.debug("[ro.product.product.name]: [%s]" % package)
return "btwifi" in package
def compare_watch_phone_location(ad,watch_file, phone_file):
"""Compare watch and phone's FLP location to see if the same or not.
Args:
ad: An AndroidDevice object.
watch_file: watch's FLP locations
phone_file: phone's FLP locations
"""
not_match_location_counts = 0
not_match_location = []
for watch_key, watch_value in watch_file.items():
if phone_file.get(watch_key):
lat_ads = abs(float(watch_value[0]) - float(phone_file[watch_key][0]))
lon_ads = abs(float(watch_value[1]) - float(phone_file[watch_key][1]))
if lat_ads > 0.000002 or lon_ads > 0.000002:
not_match_location_counts += 1
not_match_location += (watch_key, watch_value, phone_file[watch_key])
if not_match_location_counts > 0:
ad.log.info("There are %s not match locations: %s" %(not_match_location_counts, not_match_location))
ad.log.info("Watch's locations are not using Phone's locations.")
return False
else:
ad.log.info("Watch's locations are using Phone's location.")
return True
def check_tracking_file(ad):
"""Check tracking file in device and save "Latitude", "Longitude", and "Time" information.
Args:
ad: An AndroidDevice object.
Returns:
location_reports: A dict with [latitude, longitude]
"""
location_reports = dict()
test_logfile = {}
file_count = int(ad.adb.shell("find %s -type f -iname *.txt | wc -l"
% GNSSSTATUS_LOG_PATH))
if file_count != 1:
ad.log.error("%d API logs exist." % file_count)
dir_file = ad.adb.shell("ls %s" % GNSSSTATUS_LOG_PATH).split()
for path_key in dir_file:
if fnmatch.fnmatch(path_key, "*.txt"):
logpath = posixpath.join(GNSSSTATUS_LOG_PATH, path_key)
out = ad.adb.shell("wc -c %s" % logpath)
file_size = int(out.split(" ")[0])
if file_size < 10:
ad.log.info("Skip log %s due to log size %d bytes" %
(path_key, file_size))
continue
test_logfile = logpath
if not test_logfile:
raise signals.TestError("Failed to get test log file in device.")
lines = ad.adb.shell("cat %s" % test_logfile).split("\n")
for file_data in lines:
if "Latitude:" in file_data:
file_lat = ("%.6f" %float(file_data[9:]))
elif "Longitude:" in file_data:
file_long = ("%.6f" %float(file_data[11:]))
elif "Time:" in file_data:
file_time = (file_data[17:25])
location_reports[file_time] = [file_lat, file_long]
return location_reports
def uia_click(ad, matching_text):
"""Use uiautomator to click objects.
Args:
ad: An AndroidDevice object.
matching_text: Text of the target object to click
"""
if ad.uia(textMatches=matching_text).wait.exists(timeout=60000):
ad.uia(textMatches=matching_text).click()
ad.log.info("Click button %s" % matching_text)
else:
ad.log.error("No button named %s" % matching_text)
def delete_bcm_nvmem_sto_file(ad):
"""Delete BCM's NVMEM ephemeris gldata.sto.
Args:
ad: An AndroidDevice object.
"""
remount_device(ad)
rm_cmd = "rm -rf {}".format(BCM_NVME_STO_PATH)
status = ad.adb.shell(rm_cmd)
ad.log.info("Delete BCM's NVMEM ephemeris files.\n%s" % status)
def bcm_gps_xml_update_option(ad,
option,
search_line=None,
append_txt=None,
update_txt=None,
delete_txt=None,
gps_xml_path=BCM_GPS_XML_PATH):
"""Append parameter setting in gps.xml for BCM solution
Args:
option: A str to identify the operation (add/update/delete).
ad: An AndroidDevice object.
search_line: Pattern matching of target
line for appending new line data.
append_txt: New lines that will be appended after the search_line.
update_txt: New line to update the original file.
delete_txt: lines to delete from the original file.
gps_xml_path: gps.xml file location of DUT
"""
remount_device(ad)
#Update gps.xml
tmp_log_path = tempfile.mkdtemp()
ad.pull_files(gps_xml_path, tmp_log_path)
gps_xml_tmp_path = os.path.join(tmp_log_path, "gps.xml")
gps_xml_file = open(gps_xml_tmp_path, "r")
lines = gps_xml_file.readlines()
gps_xml_file.close()
fout = open(gps_xml_tmp_path, "w")
if option == "add":
for line in lines:
if line.strip() in append_txt:
ad.log.info("{} is already in the file. Skip".format(append_txt))
continue
fout.write(line)
if search_line in line:
for add_txt in append_txt:
fout.write(add_txt)
ad.log.info("Add new line: '{}' in gps.xml.".format(add_txt))
elif option == "update":
for line in lines:
if search_line in line:
ad.log.info(line)
fout.write(update_txt)
ad.log.info("Update line: '{}' in gps.xml.".format(update_txt))
continue
fout.write(line)
elif option == "delete":
for line in lines:
if delete_txt in line:
ad.log.info("Delete line: '{}' in gps.xml.".format(line.strip()))
continue
fout.write(line)
fout.close()
# Update gps.xml with gps_new.xml
ad.push_system_file(gps_xml_tmp_path, gps_xml_path)
# remove temp folder
shutil.rmtree(tmp_log_path, ignore_errors=True)
def bcm_gps_ignore_warmstandby(ad):
""" remove warmstandby setting in BCM gps.xml to reset tracking filter
Args:
ad: An AndroidDevice object.
"""
search_line_tag = '<gll\n'
delete_line_str = 'WarmStandbyTimeout1Seconds'
bcm_gps_xml_update_option(ad,
"delete",
search_line_tag,
append_txt=None,
update_txt=None,
delete_txt=delete_line_str)
search_line_tag = '<gll\n'
delete_line_str = 'WarmStandbyTimeout2Seconds'
bcm_gps_xml_update_option(ad,
"delete",
search_line_tag,
append_txt=None,
update_txt=None,
delete_txt=delete_line_str)
def bcm_gps_ignore_rom_alm(ad):
""" Update BCM gps.xml with ignoreRomAlm="True"
Args:
ad: An AndroidDevice object.
"""
search_line_tag = '<hal\n'
append_line_str = [' IgnoreJniTime=\"true\"\n']
bcm_gps_xml_update_option(ad, "add", search_line_tag, append_line_str)
search_line_tag = '<gll\n'
append_line_str = [' IgnoreRomAlm=\"true\"\n',
' AutoColdStartSignal=\"SIMULATED\"\n',
' IgnoreJniTime=\"true\"\n']
bcm_gps_xml_update_option(ad, "add", search_line_tag, append_line_str)
def check_inject_time(ad):
"""Check if watch could get the UTC time.
Args:
ad: An AndroidDevice object.
"""
for i in range(1, 6):
time.sleep(10)
inject_time_results = ad.search_logcat("GPSIC.OUT.gps_inject_time")
ad.log.info("Check time injected - attempt %s" % i)
if inject_time_results:
ad.log.info("Time is injected successfully.")
return True
raise signals.TestFailure("Fail to get time injected within %s attempts." % i)
def recover_paired_status(watch, phone):
"""Recover Bluetooth paired status if not paired.
Args:
watch: A wearable project.
phone: A pixel phone.
"""
for _ in range(3):
watch.log.info("Switch Bluetooth Off-On to recover paired status.")
for status in (False, True):
watch.droid.bluetoothToggleState(status)
phone.droid.bluetoothToggleState(status)
# TODO (chenstanley)Need to re-structure for better code and test flow instead of simply waiting
watch.log.info("Wait for Bluetooth auto re-connect.")
time.sleep(10)
if is_bluetooth_connected(watch, phone):
watch.log.info("Success to recover paired status.")
return True
raise signals.TestFailure("Fail to recover BT paired status in 3 attempts.")
def push_lhd_overlay(ad):
"""Push lhd_overlay.conf to device in /data/vendor/gps/overlay/
ad:
ad: An AndroidDevice object.
"""
overlay_name = "lhd_overlay.conf"
overlay_asset = ad.adb.shell("ls /data/vendor/gps/overlay/")
if overlay_name in overlay_asset:
ad.log.info(f"{overlay_name} already in device, skip.")
return
temp_path = tempfile.mkdtemp()
file_path = os.path.join(temp_path, overlay_name)
lhd_content = 'Lhe477xDebugFlags=RPC:FACILITY=2097151:LOG_INFO:STDOUT_PUTS:STDOUT_LOG\n'\
'LogLevel=*:E\nLogLevel=*:W\nLogLevel=*:I\nLog=LOGCAT\nLogEnabled=true\n'
overlay_path = "/data/vendor/gps/overlay/"
with open(file_path, "w") as f:
f.write(lhd_content)
ad.log.info("Push lhd_overlay to device")
ad.adb.push(file_path, overlay_path)
def disable_ramdump(ad):
"""Disable ramdump so device will reboot when about to enter ramdump
Once device enter ramdump, it will take a while to generate dump file
The process may take a while and block all the tests.
By disabling the ramdump mode, device will reboot instead of entering ramdump mode
Args:
ad: An AndroidDevice object.
"""
ad.log.info("Enter bootloader mode")
ad.stop_services()
ad.adb.reboot("bootloader")
for _ in range(1,9):
if ad.is_bootloader:
break
time.sleep(1)
else:
raise signals.TestFailure("can't enter bootloader mode")
ad.log.info("Disable ramdump")
ad.fastboot.oem("ramdump disable")
ad.fastboot.reboot()
ad.wait_for_boot_completion()
ad.root_adb()
tutils.bring_up_sl4a(ad)
ad.start_adb_logcat()
def deep_suspend_device(ad):
"""Force DUT to enter deep suspend mode
When DUT is connected to PCs, it won't enter deep suspend mode
by pressing power button.
To force DUT enter deep suspend mode, we need to send the
following command to DUT "echo mem >/sys/power/state"
To make sure the DUT stays in deep suspend mode for a while,
it will send the suspend command 3 times with 15s interval
Args:
ad: An AndroidDevice object.
"""
ad.log.info("Ready to go to deep suspend mode")
begin_time = get_device_time(ad)
ad.droid.goToSleepNow()
ensure_power_manager_is_dozing(ad, begin_time)
ad.stop_services()
try:
command = "echo deep > /sys/power/mem_sleep && echo mem > /sys/power/state"
for i in range(1, 4):
ad.log.debug(f"Send deep suspend command round {i}")
ad.adb.shell(command, ignore_status=True)
# sleep here to ensure the device stays enough time in deep suspend mode
time.sleep(15)
if not _is_device_enter_deep_suspend(ad):
raise signals.TestFailure("Device didn't enter deep suspend mode")
ad.log.info("Wake device up now")
except Exception:
# when exception happen, it's very likely the device is rebooting
# to ensure the test can go on, wait for the device is ready
ad.log.warn("Device may be rebooting, wait for it")
ad.wait_for_boot_completion()
ad.root_adb()
raise
finally:
tutils.bring_up_sl4a(ad)
ad.start_adb_logcat()
ad.droid.wakeUpNow()
def get_device_time(ad):
"""Get current datetime from device
Args:
ad: An AndroidDevice object.
Returns:
datetime object
"""
result = ad.adb.shell("date +\"%Y-%m-%d %T.%3N\"")
return datetime.strptime(result, "%Y-%m-%d %H:%M:%S.%f")
def ensure_power_manager_is_dozing(ad, begin_time):
"""Check if power manager is in dozing
When device is sleeping, power manager should goes to doze mode.
To ensure that, we check the log every 1 second (maximum to 3 times)
Args:
ad: An AndroidDevice object.
begin_time: datetime, used as the starting point to search log
"""
keyword = "PowerManagerService: Dozing"
ad.log.debug("Log search start time: %s" % begin_time)
for i in range(0,3):
result = ad.search_logcat(keyword, begin_time)
if result:
break
ad.log.debug("Power manager is not dozing... retry in 1 second")
time.sleep(1)
else:
ad.log.warn("Power manager didn't enter dozing")
def _is_device_enter_deep_suspend(ad):
"""Check device has been enter deep suspend mode
If device has entered deep suspend mode, we should be able to find keyword
"suspend entry (deep)"
Args:
ad: An AndroidDevice object.
Returns:
bool: True / False -> has / has not entered deep suspend
"""
cmd = f"adb -s {ad.serial} logcat -d|grep \"suspend entry (deep)\""
process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, shell=True)
result, _ = process.communicate()
ad.log.debug(f"suspend result = {result}")
return bool(result)
def check_location_report_interval(ad, location_reported_time_src, total_seconds, tolerance):
"""Validate the interval between two location reported time
Normally the interval should be around 1 second but occasionally it may up to nearly 2 seconds
So we set up a tolerance - 99% of reported interval should be less than 1.3 seconds
We validate the interval backward, because the wrong interval mostly happened at the end
Args:
ad: An AndroidDevice object.
location_reported_time_src: A list of reported time(in string) from GPS tool
total_seconds: (int) how many seconds has the GPS been enabled
tolerance: (float) set how many ratio of error should be accepted
if we want to set tolerance to be 1% then pass 0.01 as tolerance value
"""
ad.log.info("Checking location report frequency")
error_count = 0
error_tolerance = max(1, int(total_seconds * tolerance))
expected_longest_interval = 1.3
location_reported_time = list(map(lambda x: datetime.strptime(x, "%Y/%m/%d %H:%M:%S.%f"),
location_reported_time_src))
location_reported_time = sorted(location_reported_time)
last_gps_report_time = location_reported_time[-1]
ad.log.debug("Location report time: %s" % location_reported_time)
for reported_time in reversed(location_reported_time):
time_diff = last_gps_report_time - reported_time
if time_diff.total_seconds() > expected_longest_interval:
error_count += 1
last_gps_report_time = reported_time
if error_count > error_tolerance:
fail_message = (f"Interval longer than {expected_longest_interval}s "
f"exceed tolerance count: {error_tolerance}, error count: {error_count}")
ad.log.error(fail_message)
@contextmanager
def set_screen_status(ad, off=True):
"""Set screen on / off
A context manager function, can be used with "with" statement.
example:
with set_screen_status(ad, off=True):
do anything you want during screen is off
Once the function end, it will turn on the screen
Args:
ad: AndroidDevice object
off: (bool) True -> turn off screen / False -> leave screen as it is
"""
try:
if off:
ad.droid.goToSleepNow()
yield ad
finally:
ad.droid.wakeUpNow()
ensure_device_screen_is_on(ad)
@contextmanager
def full_gnss_measurement(ad):
"""Context manager function to enable full gnss measurement"""
try:
ad.adb.shell("settings put global enable_gnss_raw_meas_full_tracking 1")
yield ad
finally:
ad.adb.shell("settings put global enable_gnss_raw_meas_full_tracking 0")
def ensure_device_screen_is_on(ad):
"""Make sure the screen is on
Will try 3 times, each with 1 second interval
Raise:
GnssTestUtilsError: if screen can't be turn on after 3 tries
"""
for _ in range(3):
# when NotificationShade appears in focus window, it indicates the screen is still off
if "NotificationShade" not in check_current_focus_app(ad):
break
time.sleep(1)
else:
raise GnssTestUtilsError("Device screen is not on after 3 tries")
def start_qxdm_and_tcpdump_log(ad, enable):
"""Start QXDM and adb tcpdump if collect_logs is True.
Args:
ad: AndroidDevice object
enable: (bool) True -> start collecting
False -> not start collecting
"""
if enable:
start_pixel_logger(ad)
tlutils.start_adb_tcpdump(ad)
def set_screen_always_on(ad):
"""Ensure the sceen will not turn off and display the correct app screen
for wearable, we also disable the charing screen,
otherwise the charing screen will keep popping up and block the GPS tool
"""
if is_device_wearable(ad):
ad.adb.shell("settings put global stay_on_while_plugged_in 7")
ad.adb.shell("setprop persist.enable_charging_experience false")
else:
ad.adb.shell("settings put system screen_off_timeout 1800000")
def validate_adr_rate(ad, pass_criteria):
"""Check the ADR rate
Args:
ad: AndroidDevice object
pass_criteria: (float) the passing ratio, 1 = 100%, 0.5 = 50%
"""
adr_statistic = GnssMeasurement(ad).get_adr_static()
ad.log.info("ADR threshold: {0:.1%}".format(pass_criteria))
ad.log.info(UPLOAD_TO_SPONGE_PREFIX + "ADR_valid_rate {0:.1%}".format(adr_statistic.valid_rate))
ad.log.info(UPLOAD_TO_SPONGE_PREFIX +
"ADR_usable_rate {0:.1%}".format(adr_statistic.usable_rate))
ad.log.info(UPLOAD_TO_SPONGE_PREFIX + "ADR_total_count %s" % adr_statistic.total_count)
ad.log.info(UPLOAD_TO_SPONGE_PREFIX + "ADR_valid_count %s" % adr_statistic.valid_count)
ad.log.info(UPLOAD_TO_SPONGE_PREFIX + "ADR_reset_count %s" % adr_statistic.reset_count)
ad.log.info(UPLOAD_TO_SPONGE_PREFIX +
"ADR_cycle_slip_count %s" % adr_statistic.cycle_slip_count)
ad.log.info(UPLOAD_TO_SPONGE_PREFIX +
"ADR_half_cycle_reported_count %s" % adr_statistic.half_cycle_reported_count)
ad.log.info(UPLOAD_TO_SPONGE_PREFIX +
"ADR_half_cycle_resolved_count %s" % adr_statistic.half_cycle_resolved_count)
asserts.assert_true(
(pass_criteria < adr_statistic.valid_rate) and (pass_criteria < adr_statistic.usable_rate),
f"ADR valid rate: {adr_statistic.valid_rate:.1%}, "
f"ADR usable rate: {adr_statistic.usable_rate:.1%} "
f"Lower than expected: {pass_criteria:.1%}"
)
def pair_to_wearable(watch, phone):
"""Pair watch to phone.
Args:
watch: A wearable project.
phone: A pixel phone.
Raise:
TestFailure: If pairing process could not success after 3 tries.
"""
for _ in range(3):
process_pair(watch, phone)
if is_bluetooth_connected(watch, phone):
watch.log.info("Pairing successfully.")
return True
raise signals.TestFailure("Pairing is not successfully.")
def disable_battery_defend(ad):
"""Disable battery defend config to prevent battery defend message pop up
after connecting to the same charger for 4 days in a row.
Args:
ad: A wearable project.
"""
for _ in range(5):
remount_device(ad)
ad.adb.shell("setprop vendor.battery.defender.disable 1")
# To simulate cable unplug and the status will be recover after device reboot.
ad.adb.shell("cmd battery unplug")
# Sleep 3 seconds for waiting adb commend changes config and simulates cable unplug.
time.sleep(3)
config_setting = ad.adb.shell("getprop vendor.battery.defender.state")
if config_setting == "DISABLED":
ad.log.info("Disable Battery Defend setting successfully.")
break
def restart_hal_service(ad):
"""Restart HAL service by killing the pid.
Gets the pid by ps command and pass the pid to kill command. Then we get the pid of HAL service
again to see if the pid changes(pid should be different after HAL restart). If not, we will
retry up to 4 times before raising Test Failure.
Args:
ad: AndroidDevice object
"""
ad.log.info("Restart HAL service")
hal_process_name = "'android.hardware.gnss@[[:digit:]]\{1,2\}\.[[:digit:]]\{1,2\}-service'"
hal_pid = get_process_pid(ad, hal_process_name)
ad.log.info("HAL pid: %s" % hal_pid)
# Retry kill process if the PID is the same as original one
for _ in range(4):
ad.log.info("Kill HAL service")
ad.adb.shell(f"kill -9 {hal_pid}")
# Waits for the HAL service to restart up to 4 seconds.
for _ in range(4):
new_hal_pid = get_process_pid(ad, hal_process_name)
ad.log.info("New HAL pid: %s" % new_hal_pid)
if new_hal_pid:
if hal_pid != new_hal_pid:
return
break
time.sleep(1)
else:
raise signals.TestFailure("HAL service can't be killed")
def run_ttff(ad, mode, criteria, test_cycle, base_lat_long, collect_logs=False):
"""Verify TTFF functionality with mobile data.
Args:
mode: "cs", "ws" or "hs"
criteria: Criteria for the test.
"""
start_qxdm_and_tcpdump_log(ad, collect_logs)
run_ttff_via_gtw_gpstool(ad, mode, criteria, test_cycle, base_lat_long)
def re_register_measurement_callback(dut):
"""Send command to unregister then register measurement callback.
Args:
dut: The device under test.
"""
dut.log.info("Reregister measurement callback")
dut.adb.shell("am broadcast -a com.android.gpstool.stop_meas_action")
time.sleep(1)
dut.adb.shell("am broadcast -a com.android.gpstool.start_meas_action")
time.sleep(1)
def check_power_save_mode_status(ad, full_power, begin_time, brcm_error_allowlist):
"""Checks the power save mode status.
For Broadcom:
Gets NEMA sentences from pixel logger and retrieve the status [F, S, D].
F,S => not in full power mode
D => in full power mode
For Qualcomm:
Gets the HardwareClockDiscontinuityCount from logcat. In full power mode, the
HardwareClockDiscontinuityCount should not be increased.
Args:
ad: The device under test.
full_power: The device is in full power mode or not.
begin_time: It is used to get the correct logcat information for qualcomm.
brcm_error_allowlist: It is used to ignore certain error in pixel logger.
"""
if check_chipset_vendor_by_qualcomm(ad):
_check_qualcomm_power_save_mode(ad, full_power, begin_time)
else:
_check_broadcom_power_save_mode(ad, full_power, brcm_error_allowlist)
def _check_qualcomm_power_save_mode(ad, full_power, begin_time):
dpo_results = _get_dpo_info_from_logcat(ad, begin_time)
first_dpo_count = int(dpo_results[0]["log_message"].split()[-1])
final_dpo_count = int(dpo_results[-1]["log_message"].split()[-1])
dpo_count_diff = final_dpo_count - first_dpo_count
ad.log.debug("The DPO count diff is {diff}".format(diff=dpo_count_diff))
if full_power:
asserts.assert_equal(dpo_count_diff, 0, msg="DPO count diff should be 0")
else:
asserts.assert_true(dpo_count_diff > 0, msg="DPO count diff should be more than 0")
def _check_broadcom_power_save_mode(ad, full_power, brcm_error_allowlist):
power_save_log, _ = _get_power_mode_log_from_pixel_logger(
ad, brcm_error_allowlist, stop_pixel_logger=False)
power_status = re.compile(r',P,(\w),').search(power_save_log[-2]).group(1)
ad.log.debug("The power status is {status}".format(status=power_status))
if full_power:
asserts.assert_true(power_status == "D", msg="Should be in full power mode")
else:
asserts.assert_true(power_status in ["F", "S"], msg="Should not be in full power mode")
@contextmanager
def run_gnss_tracking(ad, criteria, meas_flag):
"""A context manager to enable gnss tracking and stops at the end.
Args:
ad: The device under test.
criteria: The criteria for First Fixed.
meas_flag: A flag to turn on measurement log or not.
"""
process_gnss_by_gtw_gpstool(ad, criteria=criteria, meas_flag=meas_flag)
try:
yield
finally:
start_gnss_by_gtw_gpstool(ad, state=False)
def log_current_epoch_time(ad, sponge_key):
"""Logs current epoch timestamp in second.
Args:
sponge_key: The key name of the sponge property.
"""
current_epoch_time = get_current_epoch_time() // 1000
ad.log.info(f"TestResult {sponge_key} {current_epoch_time}")