| #!/usr/bin/env python3 |
| # |
| # Copyright 2021 - Google |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| from datetime import datetime |
| import os |
| import re |
| import shutil |
| import time |
| |
| from acts import utils |
| from acts.libs.proc import job |
| from acts.controllers.android_device import DEFAULT_QXDM_LOG_PATH |
| from acts.controllers.android_device import DEFAULT_SDM_LOG_PATH |
| from acts.libs.utils.multithread import run_multithread_func |
| from acts.utils import get_current_epoch_time |
| from acts.utils import start_standing_subprocess |
| |
| |
| _LS_MASK_NAME = "Lassen default + TCP" |
| |
| _LS_ENABLE_LOG_SHELL = f"""\ |
| am broadcast -n com.android.pixellogger/.receiver.AlwaysOnLoggingReceiver \ |
| -a com.android.pixellogger.service.logging.LoggingService.ACTION_CONFIGURE_ALWAYS_ON_LOGGING \ |
| -e intent_key_enable "true" -e intent_key_config "{_LS_MASK_NAME}" \ |
| --ei intent_key_max_log_size_mb 100 --ei intent_key_max_number_of_files 100 |
| """ |
| _LS_DISABLE_LOG_SHELL = """\ |
| am broadcast -n com.android.pixellogger/.receiver.AlwaysOnLoggingReceiver \ |
| -a com.android.pixellogger.service.logging.LoggingService.ACTION_CONFIGURE_ALWAYS_ON_LOGGING \ |
| -e intent_key_enable "false" |
| """ |
| |
| |
| def check_if_tensor_platform(ad): |
| """Check if current platform belongs to the Tensor platform |
| |
| Args: |
| ad: Android object |
| |
| Returns: |
| True if current platform belongs to the Tensor platform. Otherwise False. |
| """ |
| result = ad.adb.getprop("ro.boot.hardware.platform") |
| if re.search('^gs', result, re.I): |
| return True |
| return False |
| |
| |
| def start_pixellogger_always_on_logging(ad): |
| """Start always-on logging of Pixellogger for both Qualcomm and Tensor |
| platform. |
| |
| Args: |
| ad: Android object |
| |
| Returns: |
| True if the property is set correctly. Otherwise False. |
| """ |
| setattr(ad, 'enable_always_on_modem_logger', True) |
| if check_if_tensor_platform(ad): |
| key = "persist.vendor.sys.modem.logging.enable" |
| else: |
| key = "persist.vendor.sys.modem.diag.mdlog" |
| |
| if ad.adb.getprop(key) == "false": |
| ad.adb.shell("setprop persist.vendor.sys.modem.logging.enable true") |
| time.sleep(5) |
| if ad.adb.getprop(key) == "true": |
| return True |
| else: |
| return False |
| else: |
| return True |
| |
| |
| def start_sdm_logger(ad): |
| """Start SDM logger.""" |
| if not getattr(ad, "sdm_log", True): return |
| |
| # Delete existing SDM logs which were created 15 mins prior |
| ad.sdm_log_path = DEFAULT_SDM_LOG_PATH |
| file_count = ad.adb.shell( |
| f"find {ad.sdm_log_path} -type f -iname sbuff_[0-9]*.sdm* | wc -l") |
| if int(file_count) > 3: |
| seconds = 15 * 60 |
| # Remove sdm logs modified more than specified seconds ago |
| ad.adb.shell( |
| f"find {ad.sdm_log_path} -type f -iname sbuff_[0-9]*.sdm* " |
| f"-not -mtime -{seconds}s -delete") |
| |
| # Disable modem logging already running |
| stop_sdm_logger(ad) |
| |
| # start logging |
| ad.log.debug("start sdm logging") |
| while int( |
| ad.adb.shell(f"find {ad.sdm_log_path} -type f " |
| "-iname sbuff_profile.sdm | wc -l") == 0 or |
| int( |
| ad.adb.shell(f"find {ad.sdm_log_path} -type f " |
| "-iname sbuff_[0-9]*.sdm* | wc -l")) == 0): |
| ad.adb.shell(_LS_ENABLE_LOG_SHELL, ignore_status=True) |
| time.sleep(5) |
| |
| |
| def stop_sdm_logger(ad): |
| """Stop SDM logger.""" |
| ad.sdm_log_path = DEFAULT_SDM_LOG_PATH |
| cycle = 1 |
| |
| ad.log.debug("stop sdm logging") |
| while int( |
| ad.adb.shell( |
| f"find {ad.sdm_log_path} -type f -iname sbuff_profile.sdm -o " |
| "-iname sbuff_[0-9]*.sdm* | wc -l")) != 0: |
| if cycle == 1 and int( |
| ad.adb.shell(f"find {ad.sdm_log_path} -type f " |
| "-iname sbuff_profile.sdm | wc -l")) == 0: |
| ad.adb.shell(_LS_ENABLE_LOG_SHELL, ignore_status=True) |
| time.sleep(5) |
| ad.adb.shell(_LS_DISABLE_LOG_SHELL, ignore_status=True) |
| cycle += 1 |
| time.sleep(15) |
| |
| |
| def start_sdm_loggers(log, ads): |
| tasks = [(start_sdm_logger, [ad]) for ad in ads |
| if getattr(ad, "sdm_log", True)] |
| if tasks: run_multithread_func(log, tasks) |
| |
| |
| def stop_sdm_loggers(log, ads): |
| tasks = [(stop_sdm_logger, [ad]) for ad in ads] |
| run_multithread_func(log, tasks) |
| |
| |
| def find_qxdm_log_mask(ad, mask="default.cfg"): |
| """Find QXDM logger mask.""" |
| if "/" not in mask: |
| # Call nexuslogger to generate log mask |
| start_nexuslogger(ad) |
| # Find the log mask path |
| for path in (DEFAULT_QXDM_LOG_PATH, "/data/diag_logs", |
| "/vendor/etc/mdlog/", "/vendor/etc/modem/"): |
| out = ad.adb.shell( |
| "find %s -type f -iname %s" % (path, mask), ignore_status=True) |
| if out and "No such" not in out and "Permission denied" not in out: |
| if path.startswith("/vendor/"): |
| setattr(ad, "qxdm_log_path", DEFAULT_QXDM_LOG_PATH) |
| else: |
| setattr(ad, "qxdm_log_path", path) |
| return out.split("\n")[0] |
| for mask_file in ("/vendor/etc/mdlog/", "/vendor/etc/modem/"): |
| if mask in ad.adb.shell("ls %s" % mask_file, ignore_status=True): |
| setattr(ad, "qxdm_log_path", DEFAULT_QXDM_LOG_PATH) |
| return "%s/%s" % (mask_file, mask) |
| else: |
| out = ad.adb.shell("ls %s" % mask, ignore_status=True) |
| if out and "No such" not in out: |
| qxdm_log_path, cfg_name = os.path.split(mask) |
| setattr(ad, "qxdm_log_path", qxdm_log_path) |
| return mask |
| ad.log.warning("Could NOT find QXDM logger mask path for %s", mask) |
| |
| |
| def set_qxdm_logger_command(ad, mask=None): |
| """Set QXDM logger always on. |
| |
| Args: |
| ad: android device object. |
| |
| """ |
| ## Neet to check if log mask will be generated without starting nexus logger |
| masks = [] |
| mask_path = None |
| if mask: |
| masks = [mask] |
| masks.extend(["QC_Default.cfg", "default.cfg"]) |
| for mask in masks: |
| mask_path = find_qxdm_log_mask(ad, mask) |
| if mask_path: break |
| if not mask_path: |
| ad.log.error("Cannot find QXDM mask %s", mask) |
| ad.qxdm_logger_command = None |
| return False |
| else: |
| ad.log.info("Use QXDM log mask %s", mask_path) |
| ad.log.debug("qxdm_log_path = %s", ad.qxdm_log_path) |
| output_path = os.path.join(ad.qxdm_log_path, "logs") |
| ad.qxdm_logger_command = ("diag_mdlog -f %s -o %s -s 90 -c" % |
| (mask_path, output_path)) |
| return True |
| |
| |
| def stop_qxdm_logger(ad): |
| """Stop QXDM logger.""" |
| for cmd in ("diag_mdlog -k", "killall diag_mdlog"): |
| output = ad.adb.shell("ps -ef | grep mdlog") or "" |
| if "diag_mdlog" not in output: |
| break |
| ad.log.debug("Kill the existing qxdm process") |
| ad.adb.shell(cmd, ignore_status=True) |
| time.sleep(5) |
| |
| |
| def start_qxdm_logger(ad, begin_time=None): |
| """Start QXDM logger.""" |
| if not getattr(ad, "qxdm_log", True): return |
| # Delete existing QXDM logs 5 minutes earlier than the begin_time |
| current_time = get_current_epoch_time() |
| if getattr(ad, "qxdm_log_path", None): |
| seconds = None |
| file_count = ad.adb.shell( |
| "find %s -type f -iname *.qmdl | wc -l" % ad.qxdm_log_path) |
| if int(file_count) > 3: |
| if begin_time: |
| # if begin_time specified, delete old qxdm logs modified |
| # 10 minutes before begin time |
| seconds = int((current_time - begin_time) / 1000.0) + 10 * 60 |
| else: |
| # if begin_time is not specified, delete old qxdm logs modified |
| # 15 minutes before current time |
| seconds = 15 * 60 |
| if seconds: |
| # Remove qxdm logs modified more than specified seconds ago |
| ad.adb.shell( |
| "find %s -type f -iname *.qmdl -not -mtime -%ss -delete" % |
| (ad.qxdm_log_path, seconds)) |
| ad.adb.shell( |
| "find %s -type f -iname *.xml -not -mtime -%ss -delete" % |
| (ad.qxdm_log_path, seconds)) |
| if getattr(ad, "qxdm_logger_command", None): |
| output = ad.adb.shell("ps -ef | grep mdlog") or "" |
| if ad.qxdm_logger_command not in output: |
| ad.log.debug("QXDM logging command %s is not running", |
| ad.qxdm_logger_command) |
| if "diag_mdlog" in output: |
| # Kill the existing non-matching diag_mdlog process |
| # Only one diag_mdlog process can be run |
| stop_qxdm_logger(ad) |
| ad.log.info("Start QXDM logger") |
| ad.adb.shell_nb(ad.qxdm_logger_command) |
| time.sleep(10) |
| else: |
| run_time = check_qxdm_logger_run_time(ad) |
| if run_time < 600: |
| # the last diag_mdlog started within 10 minutes ago |
| # no need to restart |
| return True |
| if ad.search_logcat( |
| "Diag_Lib: diag: In delete_log", |
| begin_time=current_time - |
| run_time) or not ad.get_file_names( |
| ad.qxdm_log_path, |
| begin_time=current_time - 600000, |
| match_string="*.qmdl"): |
| # diag_mdlog starts deleting files or no qmdl logs were |
| # modified in the past 10 minutes |
| ad.log.debug("Quit existing diag_mdlog and start a new one") |
| stop_qxdm_logger(ad) |
| ad.adb.shell_nb(ad.qxdm_logger_command) |
| time.sleep(10) |
| return True |
| |
| |
| def disable_qxdm_logger(ad): |
| for prop in ("persist.sys.modem.diag.mdlog", |
| "persist.vendor.sys.modem.diag.mdlog", |
| "vendor.sys.modem.diag.mdlog_on"): |
| if ad.adb.getprop(prop): |
| ad.adb.shell("setprop %s false" % prop, ignore_status=True) |
| for apk in ("com.android.nexuslogger", "com.android.pixellogger"): |
| if ad.is_apk_installed(apk) and ad.is_apk_running(apk): |
| ad.force_stop_apk(apk) |
| stop_qxdm_logger(ad) |
| return True |
| |
| |
| def check_qxdm_logger_run_time(ad): |
| output = ad.adb.shell("ps -eo etime,cmd | grep diag_mdlog") |
| result = re.search(r"(\d+):(\d+):(\d+) diag_mdlog", output) |
| if result: |
| return int(result.group(1)) * 60 * 60 + int( |
| result.group(2)) * 60 + int(result.group(3)) |
| else: |
| result = re.search(r"(\d+):(\d+) diag_mdlog", output) |
| if result: |
| return int(result.group(1)) * 60 + int(result.group(2)) |
| else: |
| return 0 |
| |
| |
| def start_qxdm_loggers(log, ads, begin_time=None): |
| tasks = [(start_qxdm_logger, [ad, begin_time]) for ad in ads |
| if getattr(ad, "qxdm_log", True)] |
| if tasks: run_multithread_func(log, tasks) |
| |
| |
| def stop_qxdm_loggers(log, ads): |
| tasks = [(stop_qxdm_logger, [ad]) for ad in ads] |
| run_multithread_func(log, tasks) |
| |
| |
| def check_qxdm_logger_mask(ad, mask_file="QC_Default.cfg"): |
| """Check if QXDM logger always on is set. |
| |
| Args: |
| ad: android device object. |
| |
| """ |
| output = ad.adb.shell( |
| "ls /data/vendor/radio/diag_logs/", ignore_status=True) |
| if not output or "No such" in output: |
| return True |
| if mask_file not in ad.adb.shell( |
| "cat /data/vendor/radio/diag_logs/diag.conf", ignore_status=True): |
| return False |
| return True |
| |
| |
| def start_nexuslogger(ad): |
| """Start Nexus/Pixel Logger Apk.""" |
| qxdm_logger_apk = None |
| for apk, activity in (("com.android.nexuslogger", ".MainActivity"), |
| ("com.android.pixellogger", |
| ".ui.main.MainActivity")): |
| if ad.is_apk_installed(apk): |
| qxdm_logger_apk = apk |
| break |
| if not qxdm_logger_apk: return |
| if ad.is_apk_running(qxdm_logger_apk): |
| if "granted=true" in ad.adb.shell( |
| "dumpsys package %s | grep WRITE_EXTERN" % qxdm_logger_apk): |
| return True |
| else: |
| ad.log.info("Kill %s" % qxdm_logger_apk) |
| ad.force_stop_apk(qxdm_logger_apk) |
| time.sleep(5) |
| for perm in ("READ", "WRITE"): |
| ad.adb.shell("pm grant %s android.permission.%s_EXTERNAL_STORAGE" % |
| (qxdm_logger_apk, perm)) |
| time.sleep(2) |
| for i in range(3): |
| ad.unlock_screen() |
| ad.log.info("Start %s Attempt %d" % (qxdm_logger_apk, i + 1)) |
| ad.adb.shell("am start -n %s/%s" % (qxdm_logger_apk, activity)) |
| time.sleep(5) |
| if ad.is_apk_running(qxdm_logger_apk): |
| ad.send_keycode("HOME") |
| return True |
| return False |
| |
| |
| def start_tcpdumps(ads, |
| test_name="", |
| begin_time=None, |
| interface="any", |
| mask="all"): |
| for ad in ads: |
| try: |
| start_adb_tcpdump( |
| ad, |
| test_name=test_name, |
| begin_time=begin_time, |
| interface=interface, |
| mask=mask) |
| except Exception as e: |
| ad.log.warning("Fail to start tcpdump due to %s", e) |
| |
| |
| def start_adb_tcpdump(ad, |
| test_name="", |
| begin_time=None, |
| interface="any", |
| mask="all"): |
| """Start tcpdump on any iface |
| |
| Args: |
| ad: android device object. |
| test_name: tcpdump file name will have this |
| |
| """ |
| out = ad.adb.shell("ls -l /data/local/tmp/tcpdump/", ignore_status=True) |
| if "No such file" in out or not out: |
| ad.adb.shell("mkdir /data/local/tmp/tcpdump") |
| else: |
| ad.adb.shell( |
| "find /data/local/tmp/tcpdump -type f -not -mtime -1800s -delete", |
| ignore_status=True) |
| ad.adb.shell( |
| "find /data/local/tmp/tcpdump -type f -size +5G -delete", |
| ignore_status=True) |
| |
| if not begin_time: |
| begin_time = get_current_epoch_time() |
| |
| out = ad.adb.shell( |
| 'ifconfig | grep -v -E "r_|-rmnet" | grep -E "lan|data"', |
| ignore_status=True, |
| timeout=180) |
| intfs = re.findall(r"(\S+).*", out) |
| if interface and interface not in ("any", "all"): |
| if interface not in intfs: return |
| intfs = [interface] |
| |
| out = ad.adb.shell("ps -ef | grep tcpdump") |
| cmds = [] |
| for intf in intfs: |
| if intf in out: |
| ad.log.info("tcpdump on interface %s is already running", intf) |
| continue |
| else: |
| log_file_name = "/data/local/tmp/tcpdump/tcpdump_%s_%s_%s_%s.pcap" \ |
| % (ad.serial, intf, test_name, begin_time) |
| if mask == "ims": |
| cmds.append( |
| "adb -s %s shell tcpdump -i %s -s0 -n -p udp port 500 or " |
| "udp port 4500 -w %s" % (ad.serial, intf, log_file_name)) |
| else: |
| cmds.append("adb -s %s shell tcpdump -i %s -s0 -w %s" % |
| (ad.serial, intf, log_file_name)) |
| if "Qualcomm" not in str(ad.adb.shell("getprop gsm.version.ril-impl")): |
| log_file_name = ("/data/local/tmp/tcpdump/tcpdump_%s_any_%s_%s.pcap" |
| % (ad.serial, test_name, begin_time)) |
| cmds.append("adb -s %s shell nohup tcpdump -i any -s0 -w %s" % |
| (ad.serial, log_file_name)) |
| for cmd in cmds: |
| ad.log.info(cmd) |
| try: |
| start_standing_subprocess(cmd, 10) |
| except Exception as e: |
| ad.log.error(e) |
| if cmds: |
| time.sleep(5) |
| |
| |
| def stop_tcpdumps(ads): |
| for ad in ads: |
| stop_adb_tcpdump(ad) |
| |
| |
| def stop_adb_tcpdump(ad, interface="any"): |
| """Stops tcpdump on any iface |
| Pulls the tcpdump file in the tcpdump dir |
| |
| Args: |
| ad: android device object. |
| |
| """ |
| if interface == "any": |
| try: |
| ad.adb.shell("killall -9 tcpdump", ignore_status=True) |
| except Exception as e: |
| ad.log.error("Killing tcpdump with exception %s", e) |
| else: |
| out = ad.adb.shell("ps -ef | grep tcpdump | grep %s" % interface) |
| if "tcpdump -i" in out: |
| pids = re.findall(r"\S+\s+(\d+).*tcpdump -i", out) |
| for pid in pids: |
| ad.adb.shell("kill -9 %s" % pid) |
| ad.adb.shell( |
| "find /data/local/tmp/tcpdump -type f -not -mtime -1800s -delete", |
| ignore_status=True) |
| |
| |
| def get_tcpdump_log(ad, test_name="", begin_time=None): |
| """Stops tcpdump on any iface |
| Pulls the tcpdump file in the tcpdump dir |
| Zips all tcpdump files |
| |
| Args: |
| ad: android device object. |
| test_name: test case name |
| begin_time: test begin time |
| """ |
| logs = ad.get_file_names("/data/local/tmp/tcpdump", begin_time=begin_time) |
| if logs: |
| ad.log.info("Pulling tcpdumps %s", logs) |
| log_path = os.path.join( |
| ad.device_log_path, "TCPDUMP_%s_%s" % (ad.model, ad.serial)) |
| os.makedirs(log_path, exist_ok=True) |
| ad.pull_files(logs, log_path) |
| shutil.make_archive(log_path, "zip", log_path) |
| shutil.rmtree(log_path) |
| return True |
| |
| |
| def wait_for_log(ad, pattern, begin_time=None, end_time=None, max_wait_time=120): |
| """Wait for logcat logs matching given pattern. This function searches in |
| logcat for strings matching given pattern by using search_logcat per second |
| until max_wait_time reaches. |
| |
| Args: |
| ad: android device object |
| pattern: pattern to be searched in grep format |
| begin_time: only the lines in logcat with time stamps later than |
| begin_time will be searched. |
| end_time: only the lines in logcat with time stamps earlier than |
| end_time will be searched. |
| max_wait_time: timeout of this function |
| |
| Returns: |
| All matched lines will be returned. If no line matches the given pattern |
| None will be returned. |
| """ |
| start_time = datetime.now() |
| while True: |
| ad.log.info( |
| '====== Searching logcat for "%s" ====== ', pattern) |
| res = ad.search_logcat( |
| pattern, begin_time=begin_time, end_time=end_time) |
| if res: |
| return res |
| time.sleep(1) |
| stop_time = datetime.now() |
| passed_time = (stop_time - start_time).total_seconds() |
| if passed_time > max_wait_time: |
| return |
| |
| |
| def extract_test_log(log, src_file, dst_file, test_tag): |
| os.makedirs(os.path.dirname(dst_file), exist_ok=True) |
| cmd = "grep -n '%s' %s" % (test_tag, src_file) |
| result = job.run(cmd, ignore_status=True) |
| if not result.stdout or result.exit_status == 1: |
| log.warning("Command %s returns %s", cmd, result) |
| return |
| line_nums = re.findall(r"(\d+).*", result.stdout) |
| if line_nums: |
| begin_line = int(line_nums[0]) |
| end_line = int(line_nums[-1]) |
| if end_line - begin_line <= 5: |
| result = job.run("wc -l < %s" % src_file) |
| if result.stdout: |
| end_line = int(result.stdout) |
| log.info("Extract %s from line %s to line %s to %s", src_file, |
| begin_line, end_line, dst_file) |
| job.run("awk 'NR >= %s && NR <= %s' %s > %s" % (begin_line, end_line, |
| src_file, dst_file)) |
| |
| |
| def log_screen_shot(ad, test_name=""): |
| file_name = "/sdcard/Pictures/screencap" |
| if test_name: |
| file_name = "%s_%s" % (file_name, test_name) |
| file_name = "%s_%s.png" % (file_name, utils.get_current_epoch_time()) |
| try: |
| ad.adb.shell("screencap -p %s" % file_name) |
| except: |
| ad.log.error("Fail to log screen shot to %s", file_name) |
| |
| |
| def get_screen_shot_log(ad, test_name="", begin_time=None): |
| logs = ad.get_file_names("/sdcard/Pictures", begin_time=begin_time) |
| if logs: |
| ad.log.info("Pulling %s", logs) |
| log_path = os.path.join(ad.device_log_path, "Screenshot_%s" % ad.serial) |
| os.makedirs(log_path, exist_ok=True) |
| ad.pull_files(logs, log_path) |
| ad.adb.shell("rm -rf /sdcard/Pictures/screencap_*", ignore_status=True) |
| |
| |
| def get_screen_shot_logs(ads, test_name="", begin_time=None): |
| for ad in ads: |
| get_screen_shot_log(ad, test_name=test_name, begin_time=begin_time) |