| # Copyright 2019, The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| """ATest execution info generator.""" |
| |
| |
| from __future__ import print_function |
| |
| import argparse |
| import glob |
| import json |
| import logging |
| import os |
| import pathlib |
| import re |
| import shutil |
| import sys |
| import time |
| from typing import Callable, List |
| |
| from atest import atest_enum |
| from atest import atest_utils |
| from atest import constants |
| from atest import usb_speed_detect as usb |
| from atest.atest_enum import ExitCode |
| from atest.logstorage import log_uploader |
| from atest.metrics import metrics |
| from atest.metrics import metrics_utils |
| |
| _ARGS_KEY = 'args' |
| _STATUS_PASSED_KEY = 'PASSED' |
| _STATUS_FAILED_KEY = 'FAILED' |
| _STATUS_IGNORED_KEY = 'IGNORED' |
| _SUMMARY_KEY = 'summary' |
| _TOTAL_SUMMARY_KEY = 'total_summary' |
| _TEST_RUNNER_KEY = 'test_runner' |
| _TEST_NAME_KEY = 'test_name' |
| _TEST_TIME_KEY = 'test_time' |
| _TEST_DETAILS_KEY = 'details' |
| _TEST_RESULT_NAME = 'test_result' |
| _TEST_RESULT_LINK = 'test_result_link' |
| _EXIT_CODE_ATTR = 'EXIT_CODE' |
| _MAIN_MODULE_KEY = '__main__' |
| _UUID_LEN = 30 |
| _RESULT_LEN = 20 |
| _RESULT_URL_LEN = 35 |
| _COMMAND_LEN = 50 |
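| # Glob pattern for per-test device logcat files; the first placeholder is the |
| # result directory and the second is the failing test's name (see |
| # print_test_result_by_path). |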
| _LOGCAT_FMT = '{}/log/invocation_*/{}*device_logcat_test*' |
| _APK_CHANGE_DETECTOR_CLASSNAME = 'ApkChangeDetector' |
| _APP_INSTALL_SKIP_KEY = 'Skipping the installation of' |
| _APP_INSTALL_KEY = 'Installing apk' |
| _HOST_LOG_PREFIX = 'host_log_' |
| |
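| # Per-status counter template; copied for each test group and for the total |
| # summary in _arrange_test_result(). |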
| _SUMMARY_MAP_TEMPLATE = { |
| _STATUS_PASSED_KEY: 0, |
| _STATUS_FAILED_KEY: 0, |
| _STATUS_IGNORED_KEY: 0, |
| } |
| |
| PREPARE_END_TIME = None |
| |
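| # Extracts the value of every '--atest-include-filter' argument found in a |
| # host log line. |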
| _INCLUDE_FILTER_REGEX = re.compile( |
| r'--atest-include-filter (?P<include_filter>[^\s]+)' |
| ) |
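| # Extracts the invocation folder path from a 'Creating temp file at <path>' |
| # host log line. |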
| _INVOCATION_FOLDER_REGEX = re.compile( |
| r'Creating temp file at (?P<inv_folder>[^\s]+)' |
| ) |
| |
| |
| def preparation_time(start_time): |
| """Return the preparation time. |
| |
| Args: |
| start_time: The start time from which the preparation duration is measured. |
| |
| Returns: |
| The preparation time if PREPARE_END_TIME is set, None otherwise. |
| """ |
| return PREPARE_END_TIME - start_time if PREPARE_END_TIME else None |
| |
| |
| def symlink_latest_result(test_result_dir): |
| """Make the symbolic link to latest result. |
| |
| Args: |
| test_result_dir: A string of the dir path. |
| """ |
| symlink = os.path.join(constants.ATEST_RESULT_ROOT, 'LATEST') |
| if os.path.exists(symlink) or os.path.islink(symlink): |
| os.remove(symlink) |
| os.symlink(test_result_dir, symlink) |
| |
| |
| def print_test_result(root, history_arg): |
| """Make a list of latest n test result. |
| |
| Args: |
| root: A string of the test result root path. |
| history_arg: A string of an integer or a UUID. If it is an integer string, |
| that number of the latest test results will be listed; otherwise it is |
| treated as a UUID and the matching test result is printed in detail. |
| """ |
| if not history_arg.isdigit(): |
| path = os.path.join(constants.ATEST_RESULT_ROOT, history_arg, 'test_result') |
| print_test_result_by_path(path) |
| return |
| target = '%s/20*_*_*' % root |
| paths = glob.glob(target) |
| paths.sort(reverse=True) |
| if has_url_results(): |
| print( |
| '{:-^{uuid_len}} {:-^{result_len}} {:-^{result_url_len}}' |
| ' {:-^{command_len}}'.format( |
| 'uuid', |
| 'result', |
| 'result_url', |
| 'command', |
| uuid_len=_UUID_LEN, |
| result_len=_RESULT_LEN, |
| result_url_len=_RESULT_URL_LEN, |
| command_len=_COMMAND_LEN, |
| ) |
| ) |
| else: |
| print( |
| '{:-^{uuid_len}} {:-^{result_len}} {:-^{command_len}}'.format( |
| 'uuid', |
| 'result', |
| 'command', |
| uuid_len=_UUID_LEN, |
| result_len=_RESULT_LEN, |
| command_len=_COMMAND_LEN, |
| ) |
| ) |
| for path in paths[0 : int(history_arg) + 1]: |
| result_path = os.path.join(path, 'test_result') |
| result = atest_utils.load_json_safely(result_path) |
| total_summary = result.get(_TOTAL_SUMMARY_KEY, {}) |
| summary_str = ', '.join( |
| [k[:1] + ':' + str(v) for k, v in total_summary.items()] |
| ) |
| test_result_url = result.get(_TEST_RESULT_LINK, '') |
| if has_url_results(): |
| print( |
| '{:<{uuid_len}} {:<{result_len}} ' |
| '{:<{result_url_len}} atest {:<{command_len}}'.format( |
| os.path.basename(path), |
| summary_str, |
| test_result_url, |
| result.get(_ARGS_KEY, ''), |
| uuid_len=_UUID_LEN, |
| result_len=_RESULT_LEN, |
| result_url_len=_RESULT_URL_LEN, |
| command_len=_COMMAND_LEN, |
| ) |
| ) |
| else: |
| print( |
| '{:<{uuid_len}} {:<{result_len}} atest {:<{command_len}}'.format( |
| os.path.basename(path), |
| summary_str, |
| result.get(_ARGS_KEY, ''), |
| uuid_len=_UUID_LEN, |
| result_len=_RESULT_LEN, |
| command_len=_COMMAND_LEN, |
| ) |
| ) |
| |
| |
| def print_test_result_by_path(path): |
| """Print latest test result. |
| |
| Args: |
| path: A string of test result path. |
| """ |
| result = atest_utils.load_json_safely(path) |
| if not result: |
| return |
| print('\natest {}'.format(result.get(_ARGS_KEY, ''))) |
| test_result_url = result.get(_TEST_RESULT_LINK, '') |
| if test_result_url: |
| print('\nTest Result Link: {}'.format(test_result_url)) |
| print('\nTotal Summary:\n{}'.format(atest_utils.delimiter('-'))) |
| total_summary = result.get(_TOTAL_SUMMARY_KEY, {}) |
| print(', '.join([(k + ':' + str(v)) for k, v in total_summary.items()])) |
| fail_num = total_summary.get(_STATUS_FAILED_KEY, 0) |
| if fail_num > 0: |
| message = '%d test failed' % fail_num |
| print(f'\n{atest_utils.mark_red(message)}\n{"-" * len(message)}') |
| test_runner = result.get(_TEST_RUNNER_KEY, {}) |
| for runner_name in test_runner.keys(): |
| test_dict = test_runner.get(runner_name, {}) |
| for test_name in test_dict: |
| test_details = test_dict.get(test_name, {}) |
| for fail in test_details.get(_STATUS_FAILED_KEY, []): |
| print(atest_utils.mark_red(f'{fail.get(_TEST_NAME_KEY)}')) |
| failure_files = glob.glob( |
| _LOGCAT_FMT.format( |
| os.path.dirname(path), fail.get(_TEST_NAME_KEY) |
| ) |
| ) |
| if failure_files: |
| print( |
| '{} {}'.format( |
| atest_utils.mark_cyan('LOGCAT-ON-FAILURES:'), |
| failure_files[0], |
| ) |
| ) |
| print( |
| '{} {}'.format( |
| atest_utils.mark_cyan('STACKTRACE:\n'), |
| fail.get(_TEST_DETAILS_KEY), |
| ) |
| ) |
| |
| |
| def has_non_test_options(args: argparse.Namespace): |
| """Check whether the parsed args contain any non-test option. |
| |
| Args: |
| args: An argparse.Namespace instance holding parsed args. |
| |
| Returns: |
| True, if args has at least one non-test option. |
| False, otherwise. |
| """ |
| return ( |
| args.collect_tests_only |
| or args.dry_run |
| or args.history |
| or args.version |
| or args.latest_result |
| ) |
| |
| |
| def has_url_results(): |
| """Get if contains url info.""" |
| for root, _, files in os.walk(constants.ATEST_RESULT_ROOT): |
| for file in files: |
| if file != 'test_result': |
| continue |
| json_file = os.path.join(root, 'test_result') |
| result = atest_utils.load_json_safely(json_file) |
| url_link = result.get(_TEST_RESULT_LINK, '') |
| if url_link: |
| return True |
| return False |
| |
| |
| def parse_test_log_and_send_app_installation_stats_metrics( |
| log_path: pathlib.Path, |
| ) -> None: |
| """Parse log and send app installation statistic metrics.""" |
| if not log_path: |
| return |
| |
| # Attempt to find all host logs |
| absolute_host_log_paths = glob.glob( |
| str(log_path / f'**/{_HOST_LOG_PREFIX}*'), recursive=True |
| ) |
| |
| if not absolute_host_log_paths: |
| return |
| |
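| # Count APK installations that the ApkChangeDetector skipped versus those |
| # that were actually performed. |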
| skipped_count = 0 |
| not_skipped_count = 0 |
| try: |
| for host_log_path in absolute_host_log_paths: |
| if not os.path.isfile(host_log_path): |
| continue |
| |
| # Open the host log and parse app installation skip metric |
| with open(host_log_path, 'r') as host_log_file: |
| for line in host_log_file: |
| if ( |
| _APP_INSTALL_SKIP_KEY in line |
| and _APK_CHANGE_DETECTOR_CLASSNAME in line |
| ): |
| skipped_count += 1 |
| elif _APP_INSTALL_KEY in line: |
| # TODO(b/394384055): Check classname for unskipped APKs as well. |
| not_skipped_count += 1 |
| logging.debug('%d APK(s) skipped installation.', skipped_count) |
| logging.debug('%d APK(s) did not skip installation.', not_skipped_count) |
| metrics.LocalDetectEvent( |
| detect_type=atest_enum.DetectType.APP_INSTALLATION_SKIPPED_COUNT, |
| result=skipped_count, |
| ) |
| metrics.LocalDetectEvent( |
| detect_type=atest_enum.DetectType.APP_INSTALLATION_NOT_SKIPPED_COUNT, |
| result=not_skipped_count, |
| ) |
| except Exception as e: |
| logging.debug('An error occurred when accessing certain host logs: %s', e) |
| |
| |
| def append_test_info_to_invocation_pathnames( |
| log_path: pathlib.Path, |
| ) -> None: |
| """Append test info to invocation paths for a better readability.""" |
| if not log_path: |
| return |
| |
| # Attempt to find all host logs |
| absolute_host_log_paths = list(log_path.glob(f'**/{_HOST_LOG_PREFIX}*')) |
| |
| if not absolute_host_log_paths: |
| return |
| |
| try: |
| for host_log_path in absolute_host_log_paths: |
| if not host_log_path.is_file(): |
| continue |
| |
| # Open the host log and parse test filter and invocation folder names. |
| with open(host_log_path, 'r') as host_log_file: |
| test_filter = '' |
| invocation_folder_name = '' |
| for line in host_log_file: |
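| # Join every include filter on the line into one filesystem-safe suffix |
| # (special characters are replaced with underscores). |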
| if not test_filter: |
| include_filters = [] |
| for match in _INCLUDE_FILTER_REGEX.finditer(line): |
| single_test_filter = ( |
| match.group('include_filter') |
| .replace(':', '_') |
| .replace('#', '_') |
| .replace('?', '_') |
| .replace('*', '_') |
| ) |
| include_filters.append(single_test_filter) |
| if include_filters: |
| test_filter = '_'.join(include_filters) |
| if not invocation_folder_name: |
| match = _INVOCATION_FOLDER_REGEX.search(line) |
| if match: |
| invocation_folder_name = match.group('inv_folder') |
| |
| if invocation_folder_name and test_filter: |
| break |
| |
| if invocation_folder_name and test_filter: |
| new_inv_pathname = f'{invocation_folder_name}__{test_filter}' |
| logging.debug( |
| 'Renaming %s to %s', |
| invocation_folder_name, |
| new_inv_pathname, |
| ) |
| pathlib.Path(invocation_folder_name).replace(new_inv_pathname) |
| except Exception as e: |
| logging.debug('An error occurred when accessing a host log: %s', e) |
| |
| |
| class AtestExecutionInfo: |
| """Class that stores the whole test progress information in JSON format. |
| |
| ---- |
| For example, running command |
| atest hello_world_test HelloWorldTest |
| |
| will result in storing the execution detail in JSON: |
| { |
| "args": "hello_world_test HelloWorldTest", |
| "test_runner": { |
| "AtestTradefedTestRunner": { |
| "hello_world_test": { |
| "FAILED": [ |
| {"test_time": "(5ms)", |
| "details": "Hello, Wor...", |
| "test_name": "HelloWorldTest#PrintHelloWorld"} |
| ], |
| "summary": {"FAILED": 1, "PASSED": 0, "IGNORED": 0} |
| }, |
| "HelloWorldTests": { |
| "PASSED": [ |
| {"test_time": "(27ms)", |
| "details": null, |
| "test_name": "...HelloWorldTest#testHalloWelt"}, |
| {"test_time": "(1ms)", |
| "details": null, |
| "test_name": "....HelloWorldTest#testHelloWorld"} |
| ], |
| "summary": {"FAILED": 0, "PASSED": 2, "IGNORED": 0} |
| } |
| } |
| }, |
| "total_summary": {"FAILED": 1, "PASSED": 2, "IGNORED": 0} |
| } |
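| |
| A minimal usage sketch (assumes the caller has already created work_dir and |
| parsed args_ns; the names here are illustrative): |
| |
| with AtestExecutionInfo(argv, work_dir, args_ns) as result_file_obj: |
| # Run the tests; the result reporters used during the run are appended |
| # to AtestExecutionInfo.result_reporters so the summary can be written |
| # when the context exits. |
| ... |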
| """ |
| |
| result_reporters = [] |
| |
| def __init__( |
| self, |
| args: List[str], |
| work_dir: str, |
| args_ns: argparse.Namespace, |
| get_exit_code_func: Callable[[], int] = None, |
| start_time: float = None, |
| repo_out_dir: pathlib.Path = None, |
| ): |
| """Initialise an AtestExecutionInfo instance. |
| |
| Args: |
| args: Command line parameters. |
| work_dir: The directory for saving information. |
| args_ns: An argparse.Namespace instance holding parsed args. |
| get_exit_code_func: A callable that returns the exit_code value. |
| start_time: The execution start time. Can be None. |
| repo_out_dir: The repo output directory. Can be None. |
| """ |
| self.args = args |
| self.smart_test_selection = '--sts' in args |
| self.work_dir = work_dir |
| self.result_file_obj = None |
| self.args_ns = args_ns |
| self.get_exit_code_func = get_exit_code_func |
| self.test_result = os.path.join(self.work_dir, _TEST_RESULT_NAME) |
| self._proc_usb_speed = None |
| logging.debug( |
| 'A %s object is created with args %s, work_dir %s', |
| __class__, |
| args, |
| work_dir, |
| ) |
| self._start_time = start_time if start_time is not None else time.time() |
| self._repo_out_dir = ( |
| repo_out_dir |
| if repo_out_dir is not None |
| else atest_utils.get_build_out_dir() |
| ) |
| |
| def __enter__(self): |
| """Create and return information file object.""" |
| try: |
| self.result_file_obj = open(self.test_result, 'w') |
| except IOError: |
| atest_utils.print_and_log_error('Cannot open file %s', self.test_result) |
| |
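| # Run USB speed detection in a separate process so it does not block the |
| # test run; __exit__ terminates it if it is still alive at exit. |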
| self._proc_usb_speed = atest_utils.run_multi_proc( |
| func=self._send_usb_metrics_and_warning |
| ) |
| |
| return self.result_file_obj |
| |
| def __exit__(self, exit_type, value, traceback): |
| """Write execution information and close information file.""" |
| |
| if self._proc_usb_speed: |
| # USB speed detection is not an obligatory function of atest, |
| # so it can be skipped if the process hasn't finished by the time atest |
| # is ready to exit. |
| if self._proc_usb_speed.is_alive(): |
| self._proc_usb_speed.terminate() |
| |
| log_path = pathlib.Path(self.work_dir) |
| |
| build_log_path = log_path / 'build_logs' |
| build_log_path.mkdir() |
| AtestExecutionInfo._copy_build_artifacts_to_log_dir( |
| self._start_time, |
| time.time(), |
| self._repo_out_dir, |
| build_log_path, |
| 'build.trace', |
| ) |
| AtestExecutionInfo._copy_build_artifacts_to_log_dir( |
| self._start_time, |
| time.time(), |
| self._repo_out_dir, |
| build_log_path, |
| 'verbose.log', |
| ) |
| |
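| # Determine the exit code: prefer the injected getter; otherwise use the |
| # SystemExit code if one was raised, falling back to the __main__ module's |
| # EXIT_CODE attribute (or ERROR). |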
| if self.get_exit_code_func: |
| main_exit_code = self.get_exit_code_func() |
| else: |
| main_module = sys.modules.get(_MAIN_MODULE_KEY) |
| main_exit_code = ( |
| value.code |
| if isinstance(value, SystemExit) |
| else (getattr(main_module, _EXIT_CODE_ATTR, ExitCode.ERROR)) |
| ) |
| |
| print() |
| if log_path: |
| print(f'Test logs: {log_path / "log"}') |
| parse_test_log_and_send_app_installation_stats_metrics(log_path) |
| if self.smart_test_selection: |
| append_test_info_to_invocation_pathnames(log_path) |
| |
| html_path = None |
| if self.result_file_obj and not has_non_test_options(self.args_ns): |
| self.result_file_obj.write( |
| AtestExecutionInfo._generate_execution_detail(self.args) |
| ) |
| self.result_file_obj.close() |
| atest_utils.prompt_suggestions(self.test_result) |
| html_path = atest_utils.generate_result_html(self.test_result) |
| symlink_latest_result(self.work_dir) |
| |
| log_link = html_path if html_path else log_path |
| if log_link: |
| print(atest_utils.mark_magenta(f'Log file list: file://{log_link}')) |
| bug_report_url = AtestExecutionInfo._create_bug_report_url() |
| if bug_report_url: |
| print(atest_utils.mark_magenta(f'Report an issue: {bug_report_url}')) |
| print() |
| |
| # Do not send stacktrace with send_exit_event when exit code is not |
| # ERROR. |
| if main_exit_code != ExitCode.ERROR: |
| logging.debug('send_exit_event:%s', main_exit_code) |
| metrics_utils.send_exit_event(main_exit_code) |
| else: |
| logging.debug('handle_exc_and_send_exit_event:%s', main_exit_code) |
| metrics_utils.handle_exc_and_send_exit_event(main_exit_code) |
| |
| if log_uploader.is_uploading_logs(): |
| log_uploader.upload_logs_detached(log_path) |
| |
| def _send_usb_metrics_and_warning(self): |
| # Read the USB speed and send USB metrics. |
| device_ids = usb.get_adb_device_identifiers() |
| if not device_ids: |
| return |
| |
| usb_speed_dir_name = usb.get_udc_driver_usb_device_dir_name() |
| if not usb_speed_dir_name: |
| return |
| |
| usb_negotiated_speed = usb.get_udc_driver_usb_device_attribute_speed_value( |
| usb_speed_dir_name, usb.UsbAttributeName.NEGOTIATED_SPEED |
| ) |
| usb_max_speed = usb.get_udc_driver_usb_device_attribute_speed_value( |
| usb_speed_dir_name, usb.UsbAttributeName.MAXIMUM_SPEED |
| ) |
| usb.verify_and_print_usb_speed_warning( |
| device_ids, usb_negotiated_speed, usb_max_speed |
| ) |
| |
| metrics.LocalDetectEvent( |
| detect_type=atest_enum.DetectType.USB_NEGOTIATED_SPEED, |
| result=usb_negotiated_speed, |
| ) |
| metrics.LocalDetectEvent( |
| detect_type=atest_enum.DetectType.USB_MAX_SPEED, |
| result=usb_max_speed, |
| ) |
| |
| @staticmethod |
| def _create_bug_report_url() -> str: |
| if not metrics.is_internal_user(): |
| return '' |
| if not log_uploader.is_uploading_logs(): |
| return 'http://go/new-atest-issue' |
| return f'http://go/from-atest-runid/{metrics.get_run_id()}' |
| |
| @staticmethod |
| def _copy_build_artifacts_to_log_dir( |
| start_time: float, |
| end_time: float, |
| repo_out_path: pathlib.Path, |
| log_path: pathlib.Path, |
| file_name_prefix: str, |
| ): |
| """Copy build trace files to log directory. |
| |
| Params: |
| start_time: The start time of the build. |
| end_time: The end time of the build. |
| repo_out_path: The path to the repo out directory. |
| log_path: The path to the log directory. |
| file_name_prefix: The prefix of the file name. |
| """ |
| for file in repo_out_path.iterdir(): |
| if ( |
| file.is_file() |
| and file.name.startswith(file_name_prefix) |
| and start_time <= file.stat().st_mtime <= end_time |
| ): |
| shutil.copy(file, log_path / file.name) |
| |
| @staticmethod |
| def _generate_execution_detail(args): |
| """Generate execution detail. |
| |
| Args: |
| args: Command line parameters that you want to save. |
| |
| Returns: |
| A json format string. |
| """ |
| info_dict = {_ARGS_KEY: ' '.join(args)} |
| try: |
| AtestExecutionInfo._arrange_test_result( |
| info_dict, AtestExecutionInfo.result_reporters |
| ) |
| return json.dumps(info_dict) |
| except ValueError as err: |
| atest_utils.print_and_log_warning( |
| 'Parsing test result failed due to: %s', err |
| ) |
| return '{}'  # Fall back to an empty JSON object so a string is still written. |
| |
| @staticmethod |
| def _arrange_test_result(info_dict, reporters): |
| """Append test result information in given dict. |
| |
| Arrange test information as below: |
| "test_runner": { |
| "test runner name": { |
| "test name": { |
| "FAILED": [ |
| {"test time": "", |
| "details": "", |
| "test name": ""} |
| ], |
| "summary": {"FAILED": 0, "PASSED": 0, "IGNORED": 0} |
| }, |
| }, |
| }, |
| "total_summary": {"FAILED": 0, "PASSED": 0, "IGNORED": 0} |
| |
| Args: |
| info_dict: A dict you want to add result information in. |
| reporters: A list of result_reporter. |
| |
| Returns: |
| A dict containing test result information. |
| """ |
| info_dict[_TEST_RUNNER_KEY] = {} |
| for reporter in reporters: |
| if reporter.test_result_link: |
| info_dict[_TEST_RESULT_LINK] = reporter.test_result_link |
| for test in reporter.all_test_results: |
| runner = info_dict[_TEST_RUNNER_KEY].setdefault(test.runner_name, {}) |
| group = runner.setdefault(test.group_name, {}) |
| result_dict = { |
| _TEST_NAME_KEY: test.test_name, |
| _TEST_TIME_KEY: test.test_time, |
| _TEST_DETAILS_KEY: test.details, |
| } |
| group.setdefault(test.status, []).append(result_dict) |
| |
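| # Aggregate a per-group summary and an overall total from the grouped |
| # results. |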
| total_test_group_summary = _SUMMARY_MAP_TEMPLATE.copy() |
| for runner in info_dict[_TEST_RUNNER_KEY]: |
| for group in info_dict[_TEST_RUNNER_KEY][runner]: |
| group_summary = _SUMMARY_MAP_TEMPLATE.copy() |
| for status in info_dict[_TEST_RUNNER_KEY][runner][group]: |
| count = len(info_dict[_TEST_RUNNER_KEY][runner][group][status]) |
| if status in _SUMMARY_MAP_TEMPLATE: |
| group_summary[status] = count |
| total_test_group_summary[status] += count |
| info_dict[_TEST_RUNNER_KEY][runner][group][_SUMMARY_KEY] = group_summary |
| info_dict[_TOTAL_SUMMARY_KEY] = total_test_group_summary |
| return info_dict |