# blob: 5f0101e362c92231a0e51b5c92e71012862eaf5f [file] [log] [blame]
# Copyright 2019, The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""ATest execution info generator."""
from __future__ import print_function
import argparse
import glob
import json
import logging
import os
import pathlib
import re
import shutil
import sys
import time
from typing import Callable, List
from atest import atest_enum
from atest import atest_utils
from atest import constants
from atest import usb_speed_detect as usb
from atest.atest_enum import ExitCode
from atest.logstorage import log_uploader
from atest.metrics import metrics
from atest.metrics import metrics_utils
# Keys used in the JSON test result document.
_ARGS_KEY = 'args'
_STATUS_PASSED_KEY = 'PASSED'
_STATUS_FAILED_KEY = 'FAILED'
_STATUS_IGNORED_KEY = 'IGNORED'
_SUMMARY_KEY = 'summary'
_TOTAL_SUMMARY_KEY = 'total_summary'
_TEST_RUNNER_KEY = 'test_runner'
_TEST_NAME_KEY = 'test_name'
_TEST_TIME_KEY = 'test_time'
_TEST_DETAILS_KEY = 'details'

# Name of the result file inside the work dir, and the key of its URL entry.
_TEST_RESULT_NAME = 'test_result'
_TEST_RESULT_LINK = 'test_result_link'

# Names used to look up the exit code on the '__main__' module.
_EXIT_CODE_ATTR = 'EXIT_CODE'
_MAIN_MODULE_KEY = '__main__'

# Column widths of the history table printed by print_test_result().
_UUID_LEN = 30
_RESULT_LEN = 20
_RESULT_URL_LEN = 35
_COMMAND_LEN = 50

# Glob pattern; formatted with (result directory, test name).
_LOGCAT_FMT = '{}/log/invocation_*/{}*device_logcat_test*'

# Substrings searched for in host log lines, and the host log file prefix.
_APK_CHANGE_DETECTOR_CLASSNAME = 'ApkChangeDetector'
_APP_INSTALL_SKIP_KEY = 'Skipping the installation of'
_APP_INSTALL_KEY = 'Installing apk'
_HOST_LOG_PREFIX = 'host_log_'

# Zeroed per-status counters; copied before being filled in.
_SUMMARY_MAP_TEMPLATE = dict.fromkeys(
    (_STATUS_PASSED_KEY, _STATUS_FAILED_KEY, _STATUS_IGNORED_KEY), 0
)

# Read by preparation_time(); nothing in this file's visible code assigns it.
PREPARE_END_TIME = None

# Patterns matched against host log lines.
_INCLUDE_FILTER_REGEX = re.compile(r'--atest-include-filter (?P<include_filter>[^\s]+)')
_INVOCATION_FOLDER_REGEX = re.compile(r'Creating temp file at (?P<inv_folder>[^\s]+)')
def preparation_time(start_time):
    """Compute how long preparation took, measured from start_time.

    Args:
        start_time: The timestamp to measure from.

    Returns:
        PREPARE_END_TIME minus start_time, or None when PREPARE_END_TIME is
        unset (falsy).
    """
    if not PREPARE_END_TIME:
        return None
    return PREPARE_END_TIME - start_time
def symlink_latest_result(test_result_dir):
    """Point the 'LATEST' symlink in the result root at the given directory.

    Args:
        test_result_dir: A string of the dir path the link should target.
    """
    latest_link = os.path.join(constants.ATEST_RESULT_ROOT, 'LATEST')
    # lexists() is true both for an existing entry and for a dangling symlink,
    # so any previous link is removed before the new one is created.
    if os.path.lexists(latest_link):
        os.remove(latest_link)
    os.symlink(test_result_dir, latest_link)
def print_test_result(root, history_arg):
    """Print a table of the latest test results, or one result in detail.

    Args:
        root: A string of the test result root path.
        history_arg: A string of an integer or uuid. If it's an integer string,
            a table with that many recent results is printed; otherwise it is
            treated as a uuid and that result is printed in detail.
    """
    if not history_arg.isdigit():
        # NOTE(review): the uuid lookup uses constants.ATEST_RESULT_ROOT rather
        # than `root`; kept as-is because callers are not visible here.
        path = os.path.join(
            constants.ATEST_RESULT_ROOT, history_arg, 'test_result'
        )
        print_test_result_by_path(path)
        return

    # Result directory names start with "20"; reverse-sorted, newest first.
    paths = sorted(glob.glob('%s/20*_*_*' % root), reverse=True)

    # has_url_results() walks the whole result tree, so evaluate it once
    # instead of once per printed row.
    show_url = has_url_results()
    widths = {
        'uuid_len': _UUID_LEN,
        'result_len': _RESULT_LEN,
        'result_url_len': _RESULT_URL_LEN,
        'command_len': _COMMAND_LEN,
    }
    if show_url:
        header_fmt = (
            '{:-^{uuid_len}} {:-^{result_len}} {:-^{result_url_len}}'
            ' {:-^{command_len}}'
        )
        row_fmt = (
            '{:<{uuid_len}} {:<{result_len}} '
            '{:<{result_url_len}} atest {:<{command_len}}'
        )
        header_cells = ('uuid', 'result', 'result_url', 'command')
    else:
        header_fmt = '{:-^{uuid_len}} {:-^{result_len}} {:-^{command_len}}'
        row_fmt = '{:<{uuid_len}} {:<{result_len}} atest {:<{command_len}}'
        header_cells = ('uuid', 'result', 'command')
    print(header_fmt.format(*header_cells, **widths))

    # NOTE(review): this slice yields one entry more than requested;
    # presumably to account for the current invocation's directory. Confirm
    # before changing.
    for path in paths[0 : int(history_arg) + 1]:
        result = atest_utils.load_json_safely(os.path.join(path, 'test_result'))
        total_summary = result.get(_TOTAL_SUMMARY_KEY, {})
        # Abbreviate each status to its first letter, e.g. "P:2, F:1, I:0".
        summary_str = ', '.join(
            k[:1] + ':' + str(v) for k, v in total_summary.items()
        )
        cells = [os.path.basename(path), summary_str]
        if show_url:
            cells.append(result.get(_TEST_RESULT_LINK, ''))
        cells.append(result.get(_ARGS_KEY, ''))
        print(row_fmt.format(*cells, **widths))
def print_test_result_by_path(path):
    """Print one stored test result in detail.

    Prints the command line, the result link (if any), the total summary and,
    when there are failures, every failed test with its logcat file (if one
    matches) and its details.

    Args:
        path: A string of test result path.
    """
    result = atest_utils.load_json_safely(path)
    if not result:
        return
    print('\natest {}'.format(result.get(_ARGS_KEY, '')))
    test_result_url = result.get(_TEST_RESULT_LINK, '')
    if test_result_url:
        print('\nTest Result Link: {}'.format(test_result_url))
    print('\nTotal Summary:\n{}'.format(atest_utils.delimiter('-')))
    total_summary = result.get(_TOTAL_SUMMARY_KEY, {})
    print(', '.join(k + ':' + str(v) for k, v in total_summary.items()))

    # A result without a FAILED counter is treated as having no failures
    # (comparing None with 0 would raise TypeError).
    fail_num = total_summary.get(_STATUS_FAILED_KEY) or 0
    if fail_num <= 0:
        return

    message = '%d test failed' % fail_num
    print(f'\n{atest_utils.mark_red(message)}\n{"-" * len(message)}')
    result_dir = os.path.dirname(path)
    for test_dict in result.get(_TEST_RUNNER_KEY, {}).values():
        for test_details in test_dict.values():
            # Groups whose tests all passed carry no FAILED entry at all.
            for fail in test_details.get(_STATUS_FAILED_KEY) or []:
                failed_name = fail.get(_TEST_NAME_KEY)
                print(atest_utils.mark_red(f'{failed_name}'))
                failure_files = glob.glob(
                    _LOGCAT_FMT.format(result_dir, failed_name)
                )
                if failure_files:
                    print(
                        '{} {}'.format(
                            atest_utils.mark_cyan('LOGCAT-ON-FAILURES:'),
                            failure_files[0],
                        )
                    )
                print(
                    '{} {}'.format(
                        atest_utils.mark_cyan('STACKTRACE:\n'),
                        fail.get(_TEST_DETAILS_KEY),
                    )
                )
def has_non_test_options(args: argparse.Namespace):
    """Check whether any non-test option is set in the parsed args.

    Args:
        args: The parsed command line arguments; must expose the attributes
            collect_tests_only, dry_run, history, version and latest_result.

    Returns:
        True, if args has at least one non-test option.
        False, otherwise.
    """
    # bool() makes the result match the documented True/False contract; a raw
    # `or` chain would return whichever attribute value decided it.
    return bool(
        args.collect_tests_only
        or args.dry_run
        or args.history
        or args.version
        or args.latest_result
    )
def has_url_results():
    """Tell whether any stored test result carries a result link.

    Returns:
        True as soon as a 'test_result' file under the result root has a
        non-empty result link; False if none does.
    """
    for dirpath, _, filenames in os.walk(constants.ATEST_RESULT_ROOT):
        if 'test_result' not in filenames:
            continue
        stored = atest_utils.load_json_safely(
            os.path.join(dirpath, 'test_result')
        )
        if stored.get(_TEST_RESULT_LINK, ''):
            return True
    return False
def parse_test_log_and_send_app_installation_stats_metrics(
    log_path: pathlib.Path,
) -> None:
    """Count APK installs and skipped installs in host logs and report them.

    Every file under log_path whose name starts with the host log prefix is
    scanned line by line. Any error while reading or reporting is logged at
    debug level and otherwise ignored.

    Args:
        log_path: Root directory to search for host logs; a falsy value makes
            this a no-op.
    """
    if not log_path:
        return

    host_logs = glob.glob(
        str(log_path / f'**/{_HOST_LOG_PREFIX}*'), recursive=True
    )
    if not host_logs:
        return

    num_skipped = 0
    num_installed = 0
    try:
        for candidate in host_logs:
            if not os.path.isfile(candidate):
                continue
            with open(candidate, 'r') as log_file:
                for log_line in log_file:
                    skipped_by_detector = (
                        _APP_INSTALL_SKIP_KEY in log_line
                        and _APK_CHANGE_DETECTOR_CLASSNAME in log_line
                    )
                    if skipped_by_detector:
                        num_skipped += 1
                    elif _APP_INSTALL_KEY in log_line:
                        # TODO(b/394384055): Check classname for unskipped APKs
                        # as well.
                        num_installed += 1
        logging.debug('%d APK(s) skipped installation.', num_skipped)
        logging.debug('%d APK(s) did not skip installation.', num_installed)
        metrics.LocalDetectEvent(
            detect_type=atest_enum.DetectType.APP_INSTALLATION_SKIPPED_COUNT,
            result=num_skipped,
        )
        metrics.LocalDetectEvent(
            detect_type=atest_enum.DetectType.APP_INSTALLATION_NOT_SKIPPED_COUNT,
            result=num_installed,
        )
    except Exception as e:
        logging.debug(
            'An error occurred when accessing certain host logs: %s', e
        )
def append_test_info_to_invocation_pathnames(
    log_path: pathlib.Path,
) -> None:
    """Rename invocation folders so their names include the test filter.

    Each host log under log_path is scanned for the include filter(s) and for
    the invocation folder it mentions. When both are found, the folder is
    renamed to '<folder>__<filters>'. Any error is logged at debug level and
    otherwise ignored.

    Args:
        log_path: Root directory to search for host logs; a falsy value makes
            this a no-op.
    """
    if not log_path:
        return

    host_logs = list(log_path.glob(f'**/{_HOST_LOG_PREFIX}*'))
    if not host_logs:
        return

    # ':', '#', '?' and '*' in a filter are each replaced by '_'.
    sanitize = str.maketrans(':#?*', '____')
    try:
        for host_log in host_logs:
            if not host_log.is_file():
                continue
            with open(f'{host_log}', 'r') as log_file:
                filter_tag = ''
                inv_folder = ''
                for text in log_file:
                    if not filter_tag:
                        # All filters on the first line that has any are joined.
                        filter_tag = '_'.join(
                            found.group('include_filter').translate(sanitize)
                            for found in _INCLUDE_FILTER_REGEX.finditer(text)
                        )
                    if not inv_folder:
                        folder_match = _INVOCATION_FOLDER_REGEX.search(text)
                        if folder_match:
                            inv_folder = folder_match.group('inv_folder')
                    if inv_folder and filter_tag:
                        break
                if not (inv_folder and filter_tag):
                    continue
                renamed = f'{inv_folder}__{filter_tag}'
                logging.debug('Renaming %s to %s', inv_folder, renamed)
                pathlib.Path(inv_folder).replace(renamed)
    except Exception as e:
        logging.debug(
            'An error occurred when accessing certain host log: %s', e
        )
class AtestExecutionInfo:
    """Class that stores the whole test progress information in JSON format.

    ----
    For example, running command
        atest hello_world_test HelloWorldTest

    will result in storing the execution detail in JSON:
    {
      "args": "hello_world_test HelloWorldTest",
      "test_runner": {
          "AtestTradefedTestRunner": {
              "hello_world_test": {
                  "FAILED": [
                      {"test_time": "(5ms)",
                       "details": "Hello, Wor...",
                       "test_name": "HelloWorldTest#PrintHelloWorld"}
                      ],
                  "summary": {"FAILED": 1, "PASSED": 0, "IGNORED": 0}
              },
              "HelloWorldTests": {
                  "PASSED": [
                      {"test_time": "(27ms)",
                       "details": null,
                       "test_name": "...HelloWorldTest#testHalloWelt"},
                      {"test_time": "(1ms)",
                       "details": null,
                       "test_name": "....HelloWorldTest#testHelloWorld"}
                      ],
                  "summary": {"FAILED": 0, "PASSED": 2, "IGNORED": 0}
              }
          }
      },
      "total_summary": {"FAILED": 1, "PASSED": 2, "IGNORED": 0}
    }
    """

    # Class-level list of result reporters; read when the execution detail is
    # generated on exit.
    result_reporters = []

    def __init__(
        self,
        args: List[str],
        work_dir: str,
        args_ns: argparse.Namespace,
        get_exit_code_func: Callable[[], int] = None,
        start_time: float = None,
        repo_out_dir: pathlib.Path = None,
    ):
        """Initialise an AtestExecutionInfo instance.

        Args:
            args: Command line parameters.
            work_dir: The directory for saving information.
            args_ns: The parsed command line arguments.
            get_exit_code_func: A callable that returns the exit_code value.
            start_time: The execution start time. Defaults to the current time
                when None.
            repo_out_dir: The repo output directory. Defaults to
                atest_utils.get_build_out_dir() when None.
        """
        self.args = args
        self.smart_test_selection = '--sts' in args
        self.work_dir = work_dir
        self.result_file_obj = None
        self.args_ns = args_ns
        self.get_exit_code_func = get_exit_code_func
        self.test_result = os.path.join(self.work_dir, _TEST_RESULT_NAME)
        self._proc_usb_speed = None
        logging.debug(
            'A %s object is created with args %s, work_dir %s',
            __class__,
            args,
            work_dir,
        )
        self._start_time = start_time if start_time is not None else time.time()
        self._repo_out_dir = (
            repo_out_dir
            if repo_out_dir is not None
            else atest_utils.get_build_out_dir()
        )

    def __enter__(self):
        """Open the result file and start USB speed detection.

        Returns:
            The opened result file object, or None if it could not be opened.
        """
        try:
            self.result_file_obj = open(self.test_result, 'w')
        except IOError:
            atest_utils.print_and_log_error(
                'Cannot open file %s', self.test_result
            )
        self._proc_usb_speed = atest_utils.run_multi_proc(
            func=self._send_usb_metrics_and_warning
        )
        return self.result_file_obj

    def __exit__(self, exit_type, value, traceback):
        """Write execution information and close information file."""
        if self._proc_usb_speed:
            # Usb speed detection is not an obligatory function of atest,
            # so it can be skipped if the process hasn't finished by the time
            # atest is ready to exit.
            if self._proc_usb_speed.is_alive():
                self._proc_usb_speed.terminate()

        log_path = pathlib.Path(self.work_dir)
        build_log_path = log_path / 'build_logs'
        # Tolerate a pre-existing directory so that exiting never fails on it.
        build_log_path.mkdir(parents=True, exist_ok=True)
        for artifact_prefix in ('build.trace', 'verbose.log'):
            AtestExecutionInfo._copy_build_artifacts_to_log_dir(
                self._start_time,
                time.time(),
                self._repo_out_dir,
                build_log_path,
                artifact_prefix,
            )

        if self.get_exit_code_func:
            main_exit_code = self.get_exit_code_func()
        elif isinstance(value, SystemExit):
            main_exit_code = value.code
        else:
            # Fall back to the EXIT_CODE attribute of the main module.
            main_module = sys.modules.get(_MAIN_MODULE_KEY)
            main_exit_code = getattr(
                main_module, _EXIT_CODE_ATTR, ExitCode.ERROR
            )

        print()
        print(f'Test logs: {log_path / "log"}')
        parse_test_log_and_send_app_installation_stats_metrics(log_path)
        if self.smart_test_selection:
            append_test_info_to_invocation_pathnames(log_path)

        html_path = None
        if self.result_file_obj:
            is_test_run = not has_non_test_options(self.args_ns)
            # The `with` closes the file opened in __enter__ on every path,
            # including non-test invocations where nothing is written.
            with self.result_file_obj as result_file:
                if is_test_run:
                    result_file.write(
                        AtestExecutionInfo._generate_execution_detail(self.args)
                    )
            if is_test_run:
                atest_utils.prompt_suggestions(self.test_result)
                html_path = atest_utils.generate_result_html(self.test_result)
                symlink_latest_result(self.work_dir)

        log_link = html_path or log_path
        print(atest_utils.mark_magenta(f'Log file list: file://{log_link}'))
        bug_report_url = AtestExecutionInfo._create_bug_report_url()
        if bug_report_url:
            print(
                atest_utils.mark_magenta(f'Report an issue: {bug_report_url}')
            )
        print()

        # Do not send stacktrace with send_exit_event when exit code is not
        # ERROR.
        if main_exit_code != ExitCode.ERROR:
            logging.debug('send_exit_event:%s', main_exit_code)
            metrics_utils.send_exit_event(main_exit_code)
        else:
            logging.debug('handle_exc_and_send_exit_event:%s', main_exit_code)
            metrics_utils.handle_exc_and_send_exit_event(main_exit_code)

        if log_uploader.is_uploading_logs():
            log_uploader.upload_logs_detached(log_path)

    def _send_usb_metrics_and_warning(self):
        """Read the USB speeds, print a warning if needed and send metrics.

        Does nothing when no adb device identifiers or no USB device directory
        name are found.
        """
        device_ids = usb.get_adb_device_identifiers()
        if not device_ids:
            return
        usb_speed_dir_name = usb.get_udc_driver_usb_device_dir_name()
        if not usb_speed_dir_name:
            return
        usb_negotiated_speed = (
            usb.get_udc_driver_usb_device_attribute_speed_value(
                usb_speed_dir_name, usb.UsbAttributeName.NEGOTIATED_SPEED
            )
        )
        usb_max_speed = usb.get_udc_driver_usb_device_attribute_speed_value(
            usb_speed_dir_name, usb.UsbAttributeName.MAXIMUM_SPEED
        )
        usb.verify_and_print_usb_speed_warning(
            device_ids, usb_negotiated_speed, usb_max_speed
        )
        metrics.LocalDetectEvent(
            detect_type=atest_enum.DetectType.USB_NEGOTIATED_SPEED,
            result=usb_negotiated_speed,
        )
        metrics.LocalDetectEvent(
            detect_type=atest_enum.DetectType.USB_MAX_SPEED,
            result=usb_max_speed,
        )

    @staticmethod
    def _create_bug_report_url() -> str:
        """Return the issue-report URL to show, or '' for non-internal users."""
        if not metrics.is_internal_user():
            return ''
        if not log_uploader.is_uploading_logs():
            return 'http://go/new-atest-issue'
        return f'http://go/from-atest-runid/{metrics.get_run_id()}'

    @staticmethod
    def _copy_build_artifacts_to_log_dir(
        start_time: float,
        end_time: float,
        repo_out_path: pathlib.Path,
        log_path: pathlib.Path,
        file_name_prefix: str,
    ):
        """Copy build artifacts modified during the run to the log directory.

        Only regular files directly inside repo_out_path whose name starts with
        file_name_prefix and whose modification time lies within
        [start_time, end_time] are copied.

        Args:
            start_time: The start time of the build.
            end_time: The end time of the build.
            repo_out_path: The path to the repo out directory.
            log_path: The path to the log directory.
            file_name_prefix: The prefix of the file name.
        """
        # Nothing to copy when the out directory is missing; returning here
        # keeps the caller (__exit__) from raising.
        if not repo_out_path.is_dir():
            return
        for file in repo_out_path.iterdir():
            if (
                file.is_file()
                and file.name.startswith(file_name_prefix)
                and start_time <= file.stat().st_mtime <= end_time
            ):
                shutil.copy(file, log_path / file.name)

    @staticmethod
    def _generate_execution_detail(args):
        """Generate execution detail.

        Args:
            args: Command line parameters that you want to save.

        Returns:
            A json format string; '{}' when arranging or serialising the test
            result raises ValueError.
        """
        info_dict = {_ARGS_KEY: ' '.join(args)}
        try:
            AtestExecutionInfo._arrange_test_result(
                info_dict, AtestExecutionInfo.result_reporters
            )
            return json.dumps(info_dict)
        except ValueError as err:
            atest_utils.print_and_log_warning(
                'Parsing test result failed due to : %s', err
            )
            # The caller passes the result to file.write(), so the fallback
            # must be a string as well.
            return '{}'

    @staticmethod
    def _arrange_test_result(info_dict, reporters):
        """Append test result information in given dict.

        Arrange test information to below
        "test_runner": {
            "test runner name": {
                "test name": {
                    "FAILED": [
                        {"test time": "",
                         "details": "",
                         "test name": ""}
                    ],
                "summary": {"FAILED": 0, "PASSED": 0, "IGNORED": 0}
                },
            },
        "total_summary": {"FAILED": 0, "PASSED": 0, "IGNORED": 0}

        Args:
            info_dict: A dict you want to add result information in.
            reporters: A list of result_reporter.

        Returns:
            A dict contains test result information data.
        """
        runners = {}
        info_dict[_TEST_RUNNER_KEY] = runners
        for reporter in reporters:
            if reporter.test_result_link:
                info_dict[_TEST_RESULT_LINK] = reporter.test_result_link
            for test in reporter.all_test_results:
                groups = runners.setdefault(test.runner_name, {})
                group = groups.setdefault(test.group_name, {})
                group.setdefault(test.status, []).append({
                    _TEST_NAME_KEY: test.test_name,
                    _TEST_TIME_KEY: test.test_time,
                    _TEST_DETAILS_KEY: test.details,
                })

        total_test_group_summary = _SUMMARY_MAP_TEMPLATE.copy()
        for groups in runners.values():
            for group in groups.values():
                group_summary = _SUMMARY_MAP_TEMPLATE.copy()
                for status, results in group.items():
                    # Statuses outside the template are stored but not counted.
                    if status in _SUMMARY_MAP_TEMPLATE:
                        count = len(results)
                        group_summary[status] = count
                        total_test_group_summary[status] += count
                group[_SUMMARY_KEY] = group_summary
        info_dict[_TOTAL_SUMMARY_KEY] = total_test_group_summary
        return info_dict