| # Copyright 2017, The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| """Atest Tradefed test runner class.""" |
| |
| # pylint: disable=line-too-long |
| # pylint: disable=too-many-lines |
| |
| from __future__ import print_function |
| |
| import json |
| import logging |
| import os |
| import re |
| import select |
| import shutil |
| import socket |
| |
| from functools import partial |
| from pathlib import Path |
| from typing import Any, Dict, List, Set, Tuple |
| |
| from atest import atest_configs |
| from atest import atest_error |
| from atest import atest_utils |
| from atest import constants |
| from atest import module_info |
| from atest import result_reporter |
| |
| from atest.atest_enum import DetectType, ExitCode |
| from atest.coverage import coverage |
| from atest.logstorage import atest_gcp_utils |
| from atest.logstorage import logstorage_utils |
| from atest.metrics import metrics |
| from atest.test_finders import test_finder_utils |
| from atest.test_finders import test_info |
| from atest.test_runners import test_runner_base as trb |
| from atest.test_runners.event_handler import EventHandler |
| |
| POLL_FREQ_SECS = 10 |
| SOCKET_HOST = '127.0.0.1' |
| SOCKET_QUEUE_MAX = 1 |
| SOCKET_BUFFER = 4096 |
| SELECT_TIMEOUT = 0.5 |
| |
| # Socket Events of form FIRST_EVENT {JSON_DATA}\nSECOND_EVENT {JSON_DATA} |
| # EVENT_RE has groups for the name and the data. "." does not match \n. |
| EVENT_RE = re.compile(r'\n*(?P<event_name>[A-Z_]+) (?P<json_data>{.*})(?=\n|.)*') |
| |
| # Remove aapt from build dependency, use prebuilt version instead. |
| EXEC_DEPENDENCIES = ('adb', 'fastboot') |
| |
| LOG_FOLDER_NAME = 'log' |
| |
| _INTEGRATION_FINDERS = frozenset(['', 'INTEGRATION', 'INTEGRATION_FILE_PATH']) |
| |
| # AAPT binary name |
| _AAPT = 'aapt' |
| |
| # The exist code mapping of tradefed. |
| _TF_EXIT_CODE = [ |
| 'NO_ERROR', |
| 'CONFIG_EXCEPTION', |
| 'NO_BUILD', |
| 'DEVICE_UNRESPONSIVE', |
| 'DEVICE_UNAVAILABLE', |
| 'FATAL_HOST_ERROR', |
| 'THROWABLE_EXCEPTION', |
| 'NO_DEVICE_ALLOCATED', |
| 'WRONG_JAVA_VERSION'] |
| |
| MAINLINE_LOCAL_DOC = 'go/mainline-local-build' |
| |
| class TradeFedExitError(Exception): |
| """Raised when TradeFed exists before test run has finished.""" |
| def __init__(self, exit_code): |
| super().__init__() |
| self.exit_code = exit_code |
| |
| def __str__(self): |
| tf_error_reason = self._get_exit_reason(self.exit_code) |
| return (f'TradeFed subprocess exited early with exit code=' |
| f'{self.exit_code}({tf_error_reason}).') |
| |
| def _get_exit_reason(self, exit_code): |
| if 0 < exit_code < len(_TF_EXIT_CODE): |
| return atest_utils.colorize(_TF_EXIT_CODE[exit_code], constants.RED) |
| return 'Unknown exit status' |
| |
| class AtestTradefedTestRunner(trb.TestRunnerBase): |
| """TradeFed Test Runner class.""" |
| NAME = 'AtestTradefedTestRunner' |
| EXECUTABLE = 'atest_tradefed.sh' |
| _TF_TEMPLATE = 'template/atest_local_min' |
| # Use --no-enable-granular-attempts to control reporter replay behavior. |
| # TODO(b/142630648): Enable option enable-granular-attempts |
| # in sharding mode. |
| _LOG_ARGS = ('--logcat-on-failure --{log_root_option_name}={log_path} ' |
| '{log_ext_option} ' |
| '--no-enable-granular-attempts ' |
| '--proto-output-file={proto_path}') |
| _RUN_CMD = ('{env} {exe} {template} ' |
| '--template:map test=atest ' |
| '--template:map log_saver={log_saver} ' |
| '{tf_customize_template} {log_args} {args}') |
| _BUILD_REQ = {'tradefed-core'} |
| _RERUN_OPTION_GROUP = [constants.ITERATIONS, |
| constants.RERUN_UNTIL_FAILURE, |
| constants.RETRY_ANY_FAILURE] |
| |
| def __init__(self, results_dir: str, |
| mod_info: module_info.ModuleInfo=None, **kwargs): |
| """Init stuff for base class.""" |
| super().__init__(results_dir, **kwargs) |
| self.module_info = mod_info |
| self.log_path = os.path.join(results_dir, LOG_FOLDER_NAME) |
| # (b/275537997) results_dir could be '' in test_runner_handler; only |
| # mkdir when it is invoked by run_tests. |
| if results_dir: |
| Path(self.log_path).mkdir(parents=True, exist_ok=True) |
| log_args = {'log_root_option_name': constants.LOG_ROOT_OPTION_NAME, |
| 'log_ext_option': constants.LOG_SAVER_EXT_OPTION, |
| 'log_path': self.log_path, |
| 'proto_path': os.path.join( |
| self.results_dir, |
| constants.ATEST_TEST_RECORD_PROTO)} |
| self.run_cmd_dict = {'env': '', |
| 'exe': self.EXECUTABLE, |
| 'template': self._TF_TEMPLATE, |
| 'log_saver': constants.ATEST_TF_LOG_SAVER, |
| 'tf_customize_template': '', |
| 'args': '', |
| 'log_args': self._LOG_ARGS.format(**log_args)} |
| if kwargs.get('extra_args', {}).get(constants.LD_LIBRARY_PATH, False): |
| self.run_cmd_dict.update({'env': self._get_ld_library_path()}) |
| self.is_verbose = logging.getLogger().isEnabledFor(logging.DEBUG) |
| self.root_dir = os.environ.get(constants.ANDROID_BUILD_TOP) |
| |
| def _get_ld_library_path(self) -> str: |
| """Get the corresponding LD_LIBRARY_PATH string for running TF. |
| |
| This method will insert $ANDROID_HOST_OUT/{lib,lib64} to LD_LIBRARY_PATH |
| and returns the updated LD_LIBRARY_PATH. |
| |
| Returns: |
| Strings for the environment passed to TF. Currently only |
| LD_LIBRARY_PATH for TF to load the correct local shared libraries. |
| """ |
| out_dir = os.environ.get(constants.ANDROID_HOST_OUT, '') |
| # From b/188179058, if a 64bit tests, it will break the tests due to the |
| # elf format is not 64bit for the lib path. But for b/160741384, it is |
| # ok to load lib path first. Change the lib_dirs sequence to lib64 first |
| # due to ATest by default only testing the main abi and even a 32bit |
| # only target the lib64 folder is actually not exist. |
| lib_dirs = ['lib64', 'lib'] |
| path = ':'.join([os.path.join(out_dir, dir) for dir in lib_dirs]) |
| return f'LD_LIBRARY_PATH={path}:{os.getenv("LD_LIBRARY_PATH", "")}' |
| |
| def _try_set_gts_authentication_key(self): |
| """Set GTS authentication key if it is available or exists. |
| |
| Strategy: |
| Get APE_API_KEY from os.environ: |
| - If APE_API_KEY is already set by user -> do nothing. |
| Get the APE_API_KEY from constants: |
| - If the key file exists -> set to env var. |
| If APE_API_KEY isn't set and the key file doesn't exist: |
| - Warn user some GTS tests may fail without authentication. |
| """ |
| if os.environ.get('APE_API_KEY'): |
| logging.debug('APE_API_KEY is set by developer.') |
| return |
| ape_api_key = constants.GTS_GOOGLE_SERVICE_ACCOUNT |
| key_path = os.path.join(self.root_dir, ape_api_key) |
| if ape_api_key and os.path.exists(key_path): |
| logging.debug('Set APE_API_KEY: %s', ape_api_key) |
| os.environ['APE_API_KEY'] = key_path |
| else: |
| logging.debug('APE_API_KEY not set, some GTS tests may fail' |
| ' without authentication.') |
| |
| def run_tests(self, test_infos, extra_args, reporter): |
| """Run the list of test_infos. See base class for more. |
| |
| Args: |
| test_infos: A list of TestInfos. |
| extra_args: Dict of extra args to add to test run. |
| reporter: An instance of result_report.ResultReporter. |
| |
| Returns: |
| 0 if tests succeed, non-zero otherwise. |
| """ |
| reporter.log_path = self.log_path |
| reporter.rerun_options = self._extract_rerun_options(extra_args) |
| # Set google service key if it's available or found before |
| # running tests. |
| self._try_set_gts_authentication_key() |
| result = 0 |
| creds, inv = atest_gcp_utils.do_upload_flow(extra_args) |
| try: |
| verify_key = atest_utils.get_verify_key([test_infos[0].test_name], |
| extra_args) |
| if extra_args.get(constants.VERIFY_ENV_VARIABLE, False): |
| # check environment variables. |
| atest_utils.handle_test_env_var( |
| verify_key, result_path=constants.VERIFY_ENV_PATH) |
| return 0 |
| # Change CWD to repo root to ensure TF can find prebuilt SDKs |
| # for some path-sensitive tests like robolectric. |
| os.chdir(os.path.abspath(os.getenv(constants.ANDROID_BUILD_TOP))) |
| if os.getenv(trb.OLD_OUTPUT_ENV_VAR): |
| result = self.run_tests_raw(test_infos, extra_args, reporter) |
| result = self.run_tests_pretty(test_infos, extra_args, reporter) |
| except atest_error.DryRunVerificationError as e: |
| atest_utils.colorful_print(str(e), constants.RED) |
| return ExitCode.VERIFY_FAILURE |
| finally: |
| if inv: |
| try: |
| logging.disable(logging.INFO) |
| # Always set invocation status to completed due to the ATest |
| # handle whole process by its own. |
| inv['schedulerState'] = 'completed' |
| logstorage_utils.BuildClient(creds).update_invocation(inv) |
| reporter.test_result_link = (constants.RESULT_LINK |
| % inv['invocationId']) |
| finally: |
| logging.disable(logging.NOTSET) |
| return result |
| |
| def run_tests_raw(self, test_infos, extra_args, reporter): |
| """Run the list of test_infos. See base class for more. |
| |
| Args: |
| test_infos: A list of TestInfos. |
| extra_args: Dict of extra args to add to test run. |
| reporter: An instance of result_report.ResultReporter. |
| |
| Returns: |
| 0 if tests succeed, non-zero otherwise. |
| """ |
| iterations = self._generate_iterations(extra_args) |
| reporter.register_unsupported_runner(self.NAME) |
| |
| ret_code = ExitCode.SUCCESS |
| for _ in range(iterations): |
| run_cmds = self.generate_run_commands(test_infos, extra_args) |
| subproc = self.run(run_cmds[0], output_to_stdout=True, |
| env_vars=self.generate_env_vars(extra_args)) |
| ret_code |= self.wait_for_subprocess(subproc) |
| return ret_code |
| |
| def run_tests_pretty(self, test_infos, extra_args, reporter): |
| """Run the list of test_infos. See base class for more. |
| |
| Args: |
| test_infos: A list of TestInfos. |
| extra_args: Dict of extra args to add to test run. |
| reporter: An instance of result_report.ResultReporter. |
| |
| Returns: |
| 0 if tests succeed, non-zero otherwise. |
| """ |
| iterations = self._generate_iterations(extra_args) |
| ret_code = ExitCode.SUCCESS |
| for _ in range(iterations): |
| server = self._start_socket_server() |
| run_cmds = self.generate_run_commands(test_infos, extra_args, |
| server.getsockname()[1]) |
| subproc = self.run(run_cmds[0], output_to_stdout=self.is_verbose, |
| env_vars=self.generate_env_vars(extra_args)) |
| self.handle_subprocess(subproc, partial(self._start_monitor, |
| server, |
| subproc, |
| reporter, |
| extra_args)) |
| server.close() |
| ret_code |= self.wait_for_subprocess(subproc) |
| return ret_code |
| |
| # pylint: disable=too-many-branches |
| # pylint: disable=too-many-locals |
| def _start_monitor(self, server, tf_subproc, reporter, extra_args): |
| """Polling and process event. |
| |
| Args: |
| server: Socket server object. |
| tf_subproc: The tradefed subprocess to poll. |
| reporter: Result_Reporter object. |
| extra_args: Dict of extra args to add to test run. |
| """ |
| inputs = [server] |
| event_handlers = {} |
| data_map = {} |
| inv_socket = None |
| while inputs: |
| try: |
| readable, _, _ = select.select(inputs, [], [], SELECT_TIMEOUT) |
| for socket_object in readable: |
| if socket_object is server: |
| conn, addr = socket_object.accept() |
| logging.debug('Accepted connection from %s', addr) |
| conn.setblocking(False) |
| inputs.append(conn) |
| data_map[conn] = '' |
| # The First connection should be invocation |
| # level reporter. |
| if not inv_socket: |
| inv_socket = conn |
| else: |
| # Count invocation level reporter events |
| # without showing real-time information. |
| if inv_socket == socket_object: |
| reporter.silent = True |
| event_handler = event_handlers.setdefault( |
| socket_object, EventHandler(reporter, |
| self.NAME)) |
| else: |
| event_handler = event_handlers.setdefault( |
| socket_object, EventHandler( |
| result_reporter.ResultReporter( |
| collect_only=extra_args.get( |
| constants.COLLECT_TESTS_ONLY), |
| flakes_info=extra_args.get( |
| constants.FLAKES_INFO)), |
| |
| self.NAME)) |
| recv_data = self._process_connection(data_map, |
| socket_object, |
| event_handler) |
| if not recv_data: |
| inputs.remove(socket_object) |
| socket_object.close() |
| finally: |
| # Subprocess ended and all socket clients were closed. |
| if tf_subproc.poll() is not None and len(inputs) == 1: |
| inputs.pop().close() |
| if not reporter.all_test_results: |
| atest_utils.colorful_print( |
| r'No test to run. Test Logs have saved in ' |
| f'{reporter.log_path}.', |
| constants.RED, constants.WHITE) |
| if not data_map: |
| metrics.LocalDetectEvent( |
| detect_type=DetectType.TF_EXIT_CODE, |
| result=tf_subproc.returncode) |
| raise TradeFedExitError(tf_subproc.returncode) |
| self._handle_log_associations(event_handlers) |
| |
| def _process_connection(self, data_map, conn, event_handler): |
| """Process a socket connection betwen TF and ATest. |
| |
| Expect data of form EVENT_NAME {JSON_DATA}. Multiple events will be |
| \n deliminated. Need to buffer data in case data exceeds socket |
| buffer. |
| E.q. |
| TEST_RUN_STARTED {runName":"hello_world_test","runAttempt":0}\n |
| TEST_STARTED {"start_time":2172917, "testName":"PrintHelloWorld"}\n |
| Args: |
| data_map: The data map of all connections. |
| conn: Socket connection. |
| event_handler: EventHandler object. |
| |
| Returns: |
| True if conn.recv() has data , False otherwise. |
| """ |
| # Set connection into blocking mode. |
| conn.settimeout(None) |
| data = conn.recv(SOCKET_BUFFER) |
| if isinstance(data, bytes): |
| data = data.decode() |
| logging.debug('received: %s', data) |
| if data: |
| data_map[conn] += data |
| while True: |
| match = EVENT_RE.match(data_map[conn]) |
| if not match: |
| break |
| try: |
| event_data = json.loads(match.group('json_data')) |
| except ValueError: |
| logging.debug('Json incomplete, wait for more data') |
| break |
| event_name = match.group('event_name') |
| event_handler.process_event(event_name, event_data) |
| data_map[conn] = data_map[conn][match.end():] |
| return bool(data) |
| |
| def _start_socket_server(self): |
| """Start a TCP server.""" |
| server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) |
| # Port 0 lets the OS pick an open port between 1024 and 65535. |
| server.bind((SOCKET_HOST, 0)) |
| server.listen(SOCKET_QUEUE_MAX) |
| server.settimeout(POLL_FREQ_SECS) |
| logging.debug('Socket server started on port %s', |
| server.getsockname()[1]) |
| return server |
| |
| def generate_env_vars(self, extra_args): |
| """Convert extra args into env vars.""" |
| env_vars = os.environ.copy() |
| if constants.TF_GLOBAL_CONFIG: |
| env_vars["TF_GLOBAL_CONFIG"] = constants.TF_GLOBAL_CONFIG |
| debug_port = extra_args.get(constants.TF_DEBUG, '') |
| if debug_port: |
| env_vars['TF_DEBUG'] = 'true' |
| env_vars['TF_DEBUG_PORT'] = str(debug_port) |
| filtered_paths = [] |
| for path in str(env_vars['PYTHONPATH']).split(':'): |
| # TODO (b/166216843) Remove the hacky PYTHON path workaround. |
| if (str(path).startswith('/tmp/Soong.python_') and |
| str(path).find('googleapiclient') > 0): |
| continue |
| filtered_paths.append(path) |
| env_vars['PYTHONPATH'] = ':'.join(filtered_paths) |
| |
| # Use prebuilt aapt if there's no aapt under android system path which |
| # is aligned with build system. |
| # https://android.googlesource.com/platform/build/+/master/core/config.mk#529 |
| if self._is_missing_exec(_AAPT): |
| prebuilt_aapt = Path.joinpath( |
| atest_utils.get_prebuilt_sdk_tools_dir(), _AAPT) |
| if os.path.exists(prebuilt_aapt): |
| env_vars['PATH'] = (str(prebuilt_aapt.parent) + ':' |
| + env_vars['PATH']) |
| return env_vars |
| |
| # pylint: disable=unnecessary-pass |
| # Please keep above disable flag to ensure host_env_check is overriden. |
| def host_env_check(self): |
| """Check that host env has everything we need. |
| |
| We actually can assume the host env is fine because we have the same |
| requirements that atest has. Update this to check for android env vars |
| if that changes. |
| """ |
| pass |
| |
| @staticmethod |
| def _is_missing_exec(executable): |
| """Check if system build executable is available. |
| |
| Args: |
| executable: Executable we are checking for. |
| |
| Returns: |
| True if executable is missing, False otherwise. |
| """ |
| output = shutil.which(executable) |
| if not output: |
| return True |
| # TODO: Check if there is a clever way to determine if system adb is |
| # good enough. |
| root_dir = os.environ.get(constants.ANDROID_BUILD_TOP, '') |
| return os.path.commonprefix([output, root_dir]) != root_dir |
| |
| def get_test_runner_build_reqs(self, test_infos: List[test_info.TestInfo]): |
| """Return the build requirements. |
| |
| Args: |
| test_infos: List of TestInfo. |
| |
| Returns: |
| Set of build targets. |
| """ |
| build_req = self._BUILD_REQ.copy() |
| # Use different base build requirements if google-tf is around. |
| if self.module_info.is_module(constants.GTF_MODULE): |
| build_req = {constants.GTF_TARGET} |
| # Always add ATest's own TF target. |
| build_req.add(constants.ATEST_TF_MODULE) |
| # Add adb if we can't find it. |
| for executable in EXEC_DEPENDENCIES: |
| if self._is_missing_exec(executable): |
| if self.module_info.is_module(executable): |
| build_req.add(executable) |
| |
| # Force rebuilt all jars under $ANDROID_HOST_OUT to prevent old version |
| # host jars break the test. |
| build_req |= self._get_host_framework_targets() |
| |
| build_req |= trb.gather_build_targets(test_infos) |
| return build_req |
| |
| def _get_host_framework_targets(self) -> Set[str]: |
| """Get the build targets for all the existing jars under host framework. |
| |
| Returns: |
| A set of build target name under $(ANDROID_HOST_OUT)/framework. |
| """ |
| host_targets = set() |
| if not self.module_info: |
| return host_targets |
| |
| framework_host_dir = Path( |
| os.environ.get(constants.ANDROID_HOST_OUT)).joinpath('framework') |
| if framework_host_dir.is_dir(): |
| jars = framework_host_dir.glob('*.jar') |
| for jar in jars: |
| if self.module_info.is_module(jar.stem): |
| host_targets.add(jar.stem) |
| logging.debug('Found exist host framework target:%s', host_targets) |
| return host_targets |
| |
| def _parse_extra_args(self, test_infos, extra_args): |
| """Convert the extra args into something tf can understand. |
| |
| Args: |
| extra_args: Dict of args |
| |
| Returns: |
| Tuple of args to append and args not supported. |
| """ |
| args_to_append, args_not_supported = extra_args_to_tf_args( |
| self.module_info, test_infos, extra_args) |
| |
| # Set exclude instant app annotation for non-instant mode run. |
| if (constants.INSTANT not in extra_args and |
| self._has_instant_app_config(test_infos, self.module_info)): |
| args_to_append.append(constants.TF_TEST_ARG) |
| args_to_append.append( |
| '{tf_class}:{option_name}:{option_value}'.format( |
| tf_class=constants.TF_AND_JUNIT_CLASS, |
| option_name=constants.TF_EXCLUDE_ANNOTATE, |
| option_value=constants.INSTANT_MODE_ANNOTATE)) |
| # Force append --enable-parameterized-modules if args_to_append has |
| # --module-parameter in args_to_append |
| if constants.TF_MODULE_PARAMETER in args_to_append: |
| if constants.TF_ENABLE_PARAMETERIZED_MODULES not in args_to_append: |
| args_to_append.append(constants.TF_ENABLE_PARAMETERIZED_MODULES) |
| # If all the test config has config with auto enable parameter, force |
| # exclude those default parameters(ex: instant_app, secondary_user) |
| # TODO: (b/228433541) Remove the limitation after the root cause fixed. |
| if (len(test_infos) <= 1 and |
| self._is_all_tests_parameter_auto_enabled(test_infos)): |
| if constants.TF_ENABLE_PARAMETERIZED_MODULES not in args_to_append: |
| args_to_append.append(constants.TF_ENABLE_PARAMETERIZED_MODULES) |
| for exclude_parameter in constants.DEFAULT_EXCLUDE_PARAS: |
| args_to_append.append('--exclude-module-parameters') |
| args_to_append.append(exclude_parameter) |
| return args_to_append, args_not_supported |
| |
| def _generate_metrics_folder(self, extra_args): |
| """Generate metrics folder.""" |
| metrics_folder = '' |
| if extra_args.get(constants.PRE_PATCH_ITERATIONS): |
| metrics_folder = os.path.join(self.results_dir, 'baseline-metrics') |
| elif extra_args.get(constants.POST_PATCH_ITERATIONS): |
| metrics_folder = os.path.join(self.results_dir, 'new-metrics') |
| return metrics_folder |
| |
| def _generate_iterations(self, extra_args): |
| """Generate iterations.""" |
| iterations = 1 |
| if extra_args.get(constants.PRE_PATCH_ITERATIONS): |
| iterations = extra_args.pop(constants.PRE_PATCH_ITERATIONS) |
| elif extra_args.get(constants.POST_PATCH_ITERATIONS): |
| iterations = extra_args.pop(constants.POST_PATCH_ITERATIONS) |
| return iterations |
| |
| def generate_run_commands(self, test_infos, extra_args, port=None): |
| """Generate a single run command from TestInfos. |
| |
| Args: |
| test_infos: A set of TestInfo instances. |
| extra_args: A Dict of extra args to append. |
| port: Optional. An int of the port number to send events to. If |
| None, then subprocess reporter in TF won't try to connect. |
| |
| Returns: |
| A list that contains the string of atest tradefed run command. |
| Only one command is returned. |
| """ |
| args = self._create_test_args(test_infos) |
| metrics_folder = self._generate_metrics_folder(extra_args) |
| |
| # Create a copy of args as more args could be added to the list. |
| test_args = list(args) |
| if port: |
| test_args.extend(['--subprocess-report-port', str(port)]) |
| if metrics_folder: |
| test_args.extend(['--metrics-folder', metrics_folder]) |
| logging.info('Saved metrics in: %s', metrics_folder) |
| if extra_args.get(constants.INVOCATION_ID, None): |
| test_args.append('--invocation-data invocation_id=%s' |
| % extra_args[constants.INVOCATION_ID]) |
| if extra_args.get(constants.WORKUNIT_ID, None): |
| test_args.append('--invocation-data work_unit_id=%s' |
| % extra_args[constants.WORKUNIT_ID]) |
| if extra_args.get(constants.LOCAL_BUILD_ID, None): |
| # TODO: (b/207584685) Replace with TF local build solutions. |
| test_args.append('--use-stub-build true') |
| test_args.append('--stub-build-id %s' |
| % extra_args[constants.LOCAL_BUILD_ID]) |
| test_args.append('--stub-build-target %s' |
| % extra_args[constants.BUILD_TARGET]) |
| if extra_args.get(constants.ENABLE_DEVICE_PREPARER, False): |
| test_args.append('--template:map preparers=%s' |
| % constants.DEVICE_SETUP_PREPARER) |
| for info in test_infos: |
| if constants.TEST_WITH_MAINLINE_MODULES_RE.match(info.test_name): |
| # TODO(b/253641058) Remove this once mainline module |
| # binaries are stored under testcase directory. |
| self._copy_mainline_module_binary(info.mainline_modules) |
| test_args.append(constants.TF_ENABLE_MAINLINE_PARAMETERIZED_MODULES) |
| break |
| # For detailed logs, set TF options log-level/log-level-display as |
| # 'VERBOSE' by default. |
| log_level = 'VERBOSE' |
| test_args.extend(['--log-level-display', log_level]) |
| test_args.extend(['--log-level', log_level]) |
| # Set no-early-device-release by default to speed up TF teardown time. |
| if not constants.TF_EARLY_DEVICE_RELEASE in extra_args: |
| test_args.extend(['--no-early-device-release']) |
| |
| args_to_add, args_not_supported = self._parse_extra_args( |
| test_infos, extra_args) |
| |
| # If multiple devices in test config, automatically append |
| # --replicate-parent-setup and --multi-device-count |
| device_count = atest_configs.GLOBAL_ARGS.device_count_config |
| if device_count and device_count > 1: |
| args_to_add.append('--replicate-parent-setup') |
| args_to_add.append('--multi-device-count') |
| args_to_add.append(str(device_count)) |
| os.environ.pop(constants.ANDROID_SERIAL, None) |
| else: |
| # TODO(b/122889707) Remove this after finding the root cause. |
| env_serial = os.environ.get(constants.ANDROID_SERIAL) |
| # Use the env variable ANDROID_SERIAL if it's set by user but only |
| # when the target tests are not deviceless tests. |
| if (env_serial and '--serial' not in args_to_add |
| and '-n' not in args_to_add): |
| args_to_add.append("--serial") |
| args_to_add.append(env_serial) |
| |
| test_args.extend(args_to_add) |
| if args_not_supported: |
| logging.info('%s does not support the following args %s', |
| self.EXECUTABLE, args_not_supported) |
| |
| # Only need to check one TestInfo to determine if the tests are |
| # configured in TEST_MAPPING. |
| for_test_mapping = test_infos and test_infos[0].from_test_mapping |
| test_args.extend(atest_utils.get_result_server_args(for_test_mapping)) |
| self.run_cmd_dict['args'] = ' '.join(test_args) |
| self.run_cmd_dict['tf_customize_template'] = ( |
| self._extract_customize_tf_templates(extra_args, test_infos)) |
| |
| # Copy symbols if there are tests belong to native test. |
| self._handle_native_tests(test_infos) |
| return [self._RUN_CMD.format(**self.run_cmd_dict)] |
| |
| def _flatten_test_infos(self, test_infos): |
| """Sort and group test_infos by module_name and sort and group filters |
| by class name. |
| |
| Example of three test_infos in a set: |
| Module1, {(classA, {})} |
| Module1, {(classB, {Method1})} |
| Module1, {(classB, {Method2}} |
| Becomes a set with one element: |
| Module1, {(ClassA, {}), (ClassB, {Method1, Method2})} |
| Where: |
| Each line is a test_info namedtuple |
| {} = Frozenset |
| () = TestFilter namedtuple |
| |
| Args: |
| test_infos: A set of TestInfo namedtuples. |
| |
| Returns: |
| A set of TestInfos flattened. |
| """ |
| results = set() |
| key = lambda x: x.test_name |
| for module, group in atest_utils.sort_and_group(test_infos, key): |
| # module is a string, group is a generator of grouped TestInfos. |
| # Module Test, so flatten test_infos: |
| no_filters = False |
| filters = set() |
| test_runner = None |
| test_finder = None |
| build_targets = set() |
| data = {} |
| module_args = [] |
| for test_info_i in group: |
| data.update(test_info_i.data) |
| # Extend data with constants.TI_MODULE_ARG instead of |
| # overwriting. |
| module_args.extend(test_info_i.data.get( |
| constants.TI_MODULE_ARG, [])) |
| test_runner = test_info_i.test_runner |
| test_finder = test_info_i.test_finder |
| build_targets |= test_info_i.build_targets |
| test_filters = test_info_i.data.get(constants.TI_FILTER) |
| if not test_filters or no_filters: |
| # test_info wants whole module run, so hardcode no filters. |
| no_filters = True |
| filters = set() |
| continue |
| filters |= test_filters |
| if module_args: |
| data[constants.TI_MODULE_ARG] = module_args |
| data[constants.TI_FILTER] = self._flatten_test_filters(filters) |
| results.add( |
| test_info.TestInfo(test_name=module, |
| test_runner=test_runner, |
| test_finder=test_finder, |
| build_targets=build_targets, |
| data=data)) |
| return results |
| |
| @staticmethod |
| def _flatten_test_filters(filters): |
| """Sort and group test_filters by class_name. |
| |
| Example of three test_filters in a frozenset: |
| classA, {} |
| classB, {Method1} |
| classB, {Method2} |
| Becomes a frozenset with these elements: |
| classA, {} |
| classB, {Method1, Method2} |
| Where: |
| Each line is a TestFilter namedtuple |
| {} = Frozenset |
| |
| Args: |
| filters: A frozenset of test_filters. |
| |
| Returns: |
| A frozenset of test_filters flattened. |
| """ |
| results = set() |
| key = lambda x: x.class_name |
| for class_name, group in atest_utils.sort_and_group(filters, key): |
| # class_name is a string, group is a generator of TestFilters |
| assert class_name is not None |
| methods = set() |
| for test_filter in group: |
| if not test_filter.methods: |
| # Whole class should be run |
| methods = set() |
| break |
| methods |= test_filter.methods |
| results.add(test_info.TestFilter(class_name, frozenset(methods))) |
| return frozenset(results) |
| |
| def _is_all_tests_parameter_auto_enabled(self, test_infos): |
| """Check if all the test infos are parameter auto enabled. |
| |
| Args: |
| test_infos: A set of TestInfo instances. |
| |
| Returns: True if all tests are parameter auto enabled, False otherwise. |
| """ |
| for info in test_infos: |
| if not self._is_parameter_auto_enabled_cfg(info, self.module_info): |
| return False |
| return True |
| |
| def _create_test_args(self, test_infos): |
| """Compile TF command line args based on the given test infos. |
| |
| Args: |
| test_infos: A set of TestInfo instances. |
| |
| Returns: A list of TF arguments to run the tests. |
| """ |
| args = [] |
| if not test_infos: |
| return [] |
| |
| test_infos = self._flatten_test_infos(test_infos) |
| has_integration_test = False |
| |
| # Because current --include-filter arg will not working if ATest pass |
| # both --module and --include-filter to TF, only test by --module will |
| # be run. Make a check first, only use --module if all tests are all |
| # parameter auto enabled. |
| # Only auto-enable the parameter if there's only one test. |
| # TODO: (b/228433541) Remove the limitation after the root cause fixed. |
| use_module_arg = False |
| if len(test_infos) <= 1: |
| use_module_arg = self._is_all_tests_parameter_auto_enabled( |
| test_infos) |
| |
| for info in test_infos: |
| # Integration test exists in TF's jar, so it must have the option |
| # if it's integration finder. |
| if info.test_finder in _INTEGRATION_FINDERS: |
| has_integration_test = True |
| # For non-paramertize test module, use --include-filter, but for |
| # tests which have auto enable paramertize config use --module |
| # instead. |
| if (use_module_arg |
| and self._is_parameter_auto_enabled_cfg( |
| info, self.module_info)): |
| args.extend([constants.TF_MODULE_FILTER, info.test_name]) |
| else: |
| args.extend([constants.TF_INCLUDE_FILTER, info.test_name]) |
| for option in info.data.get(constants.TI_MODULE_ARG, []): |
| if constants.TF_INCLUDE_FILTER_OPTION == option[0]: |
| suite_filter = ( |
| constants.TF_SUITE_FILTER_ARG_VALUE_FMT.format( |
| test_name=info.test_name, option_value=option[1])) |
| args.extend([constants.TF_INCLUDE_FILTER, suite_filter]) |
| elif constants.TF_EXCLUDE_FILTER_OPTION == option[0]: |
| suite_filter = ( |
| constants.TF_SUITE_FILTER_ARG_VALUE_FMT.format( |
| test_name=info.test_name, option_value=option[1])) |
| args.extend([constants.TF_EXCLUDE_FILTER, suite_filter]) |
| else: |
| module_arg = ( |
| constants.TF_MODULE_ARG_VALUE_FMT.format( |
| test_name=info.test_name, option_name=option[0], |
| option_value=option[1])) |
| args.extend([constants.TF_MODULE_ARG, module_arg]) |
| |
| # Add ATest include filter |
| args.extend(get_include_filter(test_infos)) |
| |
| # TODO (b/141090547) Pass the config path to TF to load configs. |
| # Compile option in TF if finder is not INTEGRATION or not set. |
| if not has_integration_test: |
| args.append(constants.TF_SKIP_LOADING_CONFIG_JAR) |
| return args |
| |
| def _extract_rerun_options(self, extra_args): |
| """Extract rerun options to a string for output. |
| |
| Args: |
| extra_args: Dict of extra args for test runners to use. |
| |
| Returns: A string of rerun options. |
| """ |
| extracted_options = ['{} {}'.format(arg, extra_args[arg]) |
| for arg in extra_args |
| if arg in self._RERUN_OPTION_GROUP] |
| return ' '.join(extracted_options) |
| |
| def _extract_customize_tf_templates(self, extra_args, test_infos): |
| """Extract tradefed template options to a string for output. |
| |
| Args: |
| extra_args: Dict of extra args for test runners to use. |
| test_infos: A set of TestInfo instances. |
| |
| Returns: A string of tradefed template options. |
| """ |
| tf_templates = extra_args.get(constants.TF_TEMPLATE, []) |
| tf_template_keys = [i.split('=')[0] for i in tf_templates] |
| for info in test_infos: |
| if (info.aggregate_metrics_result |
| and 'metric_post_processor' not in tf_template_keys): |
| template_key = 'metric_post_processor' |
| template_value = ( |
| 'google/template/postprocessors/metric-file-aggregate') |
| tf_templates.append(f'{template_key}={template_value}') |
| return ' '.join(['--template:map %s' % x for x in tf_templates]) |
| |
| def _handle_log_associations(self, event_handlers): |
| """Handle TF's log associations information data. |
| |
| log_association dict: |
| {'loggedFile': '/tmp/serial-util11375755456514097276.ser', |
| 'dataName': 'device_logcat_setup_127.0.0.1:58331', |
| 'time': 1602038599.856113}, |
| |
| Args: |
| event_handlers: Dict of {socket_object:EventHandler}. |
| |
| """ |
| log_associations = [] |
| for _, event_handler in event_handlers.items(): |
| if event_handler.log_associations: |
| log_associations += event_handler.log_associations |
| device_test_end_log_time = '' |
| device_teardown_log_time = '' |
| for log_association in log_associations: |
| if 'device_logcat_test' in log_association.get('dataName', ''): |
| device_test_end_log_time = log_association.get('time') |
| if 'device_logcat_teardown' in log_association.get('dataName', ''): |
| device_teardown_log_time = log_association.get('time') |
| if device_test_end_log_time and device_teardown_log_time: |
| teardowntime = (float(device_teardown_log_time) - |
| float(device_test_end_log_time)) |
| logging.debug('TF logcat teardown time=%s seconds.', teardowntime) |
| metrics.LocalDetectEvent( |
| detect_type=DetectType.TF_TEARDOWN_LOGCAT, |
| result=int(teardowntime)) |
| |
| @staticmethod |
| def _has_instant_app_config(test_infos, mod_info): |
| """Check if one of the input tests defined instant app mode in config. |
| |
| Args: |
| test_infos: A set of TestInfo instances. |
| mod_info: ModuleInfo object. |
| |
| Returns: True if one of the tests set up instant app mode. |
| """ |
| for tinfo in test_infos: |
| test_config, _ = test_finder_utils.get_test_config_and_srcs( |
| tinfo, mod_info) |
| if test_config: |
| parameters = atest_utils.get_config_parameter(test_config) |
| if constants.TF_PARA_INSTANT_APP in parameters: |
| return True |
| return False |
| |
| @staticmethod |
| def _is_parameter_auto_enabled_cfg(tinfo, mod_info): |
| """Check if input tests contains auto enable support parameters. |
| |
| Args: |
| test_infos: A set of TestInfo instances. |
| mod_info: ModuleInfo object. |
| |
| Returns: True if input test has parameter setting which is not in the |
| exclude list. |
| """ |
| test_config, _ = test_finder_utils.get_test_config_and_srcs( |
| tinfo, mod_info) |
| if test_config: |
| parameters = atest_utils.get_config_parameter(test_config) |
| if (parameters - constants.DEFAULT_EXCLUDE_PARAS |
| - constants.DEFAULT_EXCLUDE_NOT_PARAS): |
| return True |
| return False |
| |
| def _handle_native_tests(self, test_infos): |
| """Handling some extra tasks for running native tests from tradefed. |
| |
| Args: |
| test_infos: A set of TestInfo instances. |
| """ |
| for tinfo in test_infos: |
| test_config, _ = test_finder_utils.get_test_config_and_srcs( |
| tinfo, self.module_info) |
| if test_config: |
| module_name, device_path = atest_utils.get_config_gtest_args( |
| test_config) |
| if module_name and device_path: |
| atest_utils.copy_native_symbols(module_name, device_path) |
| |
| # TODO(b/253641058) remove copying files once mainline module |
| # binaries are stored under testcase directory. |
| def _copy_mainline_module_binary(self, mainline_modules): |
| """Copies mainline module binaries to out/dist/mainline_modules_{arch} |
| |
| Copies the mainline module binaries to the location that |
| MainlineModuleHandler in TF expects since there is no way to |
| explicitly tweak the search path. |
| |
| Args: |
| mainline_modules: A list of mainline modules. |
| """ |
| config = atest_utils.get_android_config() |
| arch = config.get('TARGET_ARCH') |
| dest_dir = atest_utils.DIST_OUT_DIR.joinpath(f'mainline_modules_{arch}') |
| dest_dir.mkdir(parents=True, exist_ok=True) |
| |
| for module in mainline_modules: |
| target_module_info = self.module_info.get_module_info(module) |
| installed_paths = target_module_info[constants.MODULE_INSTALLED] |
| |
| for installed_path in installed_paths: |
| if not re.search(atest_utils.MAINLINE_MODULES_EXT_RE, installed_path): |
| atest_utils.colorful_print( |
| '%s is not a apk or apex file. Did you run mainline ' |
| 'local setup script? Please refer to %s' % |
| (installed_path, MAINLINE_LOCAL_DOC), |
| constants.YELLOW) |
| continue |
| file_name = Path(installed_path).name |
| dest_path = Path(dest_dir).joinpath(file_name) |
| if dest_path.exists(): |
| atest_utils.colorful_print( |
| 'Replacing APEX in %s with %s' % (dest_path, installed_path), |
| constants.CYAN) |
| logging.debug( |
| 'deleting the old file: %s and copy a new binary', |
| dest_path) |
| dest_path.unlink() |
| shutil.copyfile(installed_path, dest_path) |
| |
| break |
| |
| def generate_annotation_filter_args( |
| arg_value: Any, mod_info: module_info.ModuleInfo, |
| test_infos: List[test_info.TestInfo]) -> List[str]: |
| """Generate TF annotation filter arguments. |
| |
| Args: |
| arg_value: Argument value for annotation filter. |
| mod_info: ModuleInfo object. |
| test_infos: A set of TestInfo instances. |
| |
| Returns: |
| List of TF annotation filter arguments. |
| """ |
| annotation_filter_args = [] |
| for info in test_infos: |
| test_name = info.test_name |
| for keyword in arg_value: |
| annotation = atest_utils.get_full_annotation_class_name( |
| mod_info.get_module_info(test_name), keyword) |
| if annotation: |
| module_arg = (constants.TF_MODULE_ARG_VALUE_FMT.format( |
| test_name=test_name, |
| option_name=constants.INCLUDE_ANNOTATION, |
| option_value=annotation)) |
| annotation_filter_args.extend([constants.TF_MODULE_ARG, module_arg]) |
| logging.error( |
| atest_utils.colorize( |
| f'Cannot find similar annotation: {keyword}', |
| constants.RED)) |
| return annotation_filter_args |
| |
| |
| def extra_args_to_tf_args( |
| mod_info: module_info.ModuleInfo, |
| test_infos: List[test_info.TestInfo], |
| extra_args: Dict[str, Any], |
| ) -> Tuple[Dict[str, Any], Dict[str, Any]]: |
| """Convert the extra args into atest_tf_test_runner supported args. |
| |
| Args: |
| mod_info: ModuleInfo object. |
| test_infos: A set of TestInfo instances. |
| extra_args: Dict of args |
| |
| Returns: |
| Tuple of ARGS that atest_tf supported and not supported. |
| """ |
| supported_args = [] |
| unsupported_args = [] |
| |
| def constant_list(*value): |
| return lambda *_: value |
| |
| # pylint: disable=unused-argument |
| def print_message(message): |
| def inner(*args): |
| print(message) |
| return [] |
| return inner |
| |
| # Mapping supported TF arguments to the processing function. |
| supported_tf_args = dict({ |
| constants.WAIT_FOR_DEBUGGER: |
| constant_list('--wait-for-debugger'), |
| constants.DISABLE_INSTALL: |
| constant_list('--disable-target-preparers'), |
| constants.SERIAL: |
| lambda arg_value, *_: |
| [j for d in arg_value for j in ('--serial', d)], |
| constants.SHARDING: |
| lambda arg_value, *_: ['--shard-count', |
| str(arg_value)], |
| constants.DISABLE_TEARDOWN: |
| constant_list('--disable-teardown'), |
| constants.HOST: |
| constant_list('-n', '--prioritize-host-config', |
| '--skip-host-arch-check'), |
| constants.CUSTOM_ARGS: |
| # custom args value is a list. |
| lambda arg_value, *_: arg_value, |
| constants.ALL_ABI: |
| constant_list('--all-abi'), |
| constants.INSTANT: |
| constant_list(constants.TF_ENABLE_PARAMETERIZED_MODULES, |
| constants.TF_MODULE_PARAMETER, 'instant_app'), |
| constants.USER_TYPE: |
| lambda arg_value, *_: [ |
| constants.TF_ENABLE_PARAMETERIZED_MODULES, |
| '--enable-optional-parameterization', |
| constants.TF_MODULE_PARAMETER, |
| str(arg_value) |
| ], |
| constants.ITERATIONS: |
| lambda arg_value, *_: [ |
| '--retry-strategy', constants.ITERATIONS, |
| '--max-testcase-run-count', str(arg_value) |
| ], |
| constants.RERUN_UNTIL_FAILURE: |
| lambda arg_value, *_: [ |
| '--retry-strategy', constants.RERUN_UNTIL_FAILURE, |
| '--max-testcase-run-count', str(arg_value) |
| ], |
| constants.RETRY_ANY_FAILURE: |
| lambda arg_value, *_: [ |
| '--retry-strategy', constants.RETRY_ANY_FAILURE, |
| '--max-testcase-run-count', str(arg_value) |
| ], |
| constants.COLLECT_TESTS_ONLY: |
| constant_list('--collect-tests-only'), |
| constants.NO_ENABLE_ROOT: |
| constant_list('--no-enable-root'), |
| constants.TF_DEBUG: |
| print_message("Please attach process to your IDE..."), |
| constants.ANNOTATION_FILTER: |
| generate_annotation_filter_args, |
| constants.TEST_FILTER: |
| lambda arg_value, *_: [ |
| '--test-arg', |
| 'com.android.tradefed.testtype.AndroidJUnitTest:' |
| f'include-filter:{arg_value}', |
| '--test-arg', |
| 'com.android.tradefed.testtype.GTest:native-test-flag:' |
| f'--gtest_filter={arg_value}', |
| '--test-arg', |
| 'com.android.tradefed.testtype.HostGTest:native-test-flag:' |
| f'--gtest_filter={arg_value}' |
| ], |
| constants.TEST_TIMEOUT: |
| lambda arg_value, *_: [ |
| '--test-arg', |
| 'com.android.tradefed.testtype.AndroidJUnitTest:' |
| f'shell-timeout:{arg_value}', |
| '--test-arg', |
| 'com.android.tradefed.testtype.AndroidJUnitTest:' |
| f'test-timeout:{arg_value}', |
| '--test-arg', |
| 'com.android.tradefed.testtype.HostGTest:' |
| f'native-test-timeout:{arg_value}', |
| '--test-arg', |
| 'com.android.tradefed.testtype.GTest:' |
| f'native-test-timeout:{arg_value}', |
| ], |
| constants.COVERAGE: coverage.tf_args, |
| }) |
| |
| for arg in extra_args: |
| if arg in supported_tf_args: |
| tf_args = supported_tf_args[arg](extra_args[arg], mod_info, |
| test_infos) |
| if tf_args: |
| supported_args.extend(tf_args) |
| continue |
| |
| if arg in (constants.TF_TEMPLATE, |
| constants.TF_EARLY_DEVICE_RELEASE, |
| constants.INVOCATION_ID, |
| constants.WORKUNIT_ID, |
| constants.REQUEST_UPLOAD_RESULT, |
| constants.DISABLE_UPLOAD_RESULT, |
| constants.LOCAL_BUILD_ID, |
| constants.BUILD_TARGET, |
| constants.ENABLE_DEVICE_PREPARER, |
| constants.DRY_RUN, |
| constants.VERIFY_ENV_VARIABLE, |
| constants.FLAKES_INFO, |
| constants.LD_LIBRARY_PATH, |
| constants.DEVICE_ONLY): |
| continue |
| unsupported_args.append(arg) |
| return supported_args, unsupported_args |
| |
| def get_include_filter(test_infos: List[test_info.TestInfo]) -> List[str]: |
| """Generate a list of tradefed filter argument from TestInfos. |
| |
| The tradefed argument format should be: |
| atest-include-filter <module-name>:<include-filter-value> |
| """ |
| tf_args = [] |
| for info in test_infos: |
| filters = set() |
| for test_info_filter in info.data.get(constants.TI_FILTER, []): |
| filters.update(test_info_filter.to_set_of_tf_strings()) |
| for test_filter in filters: |
| filter_arg = constants.TF_ATEST_INCLUDE_FILTER_VALUE_FMT.format( |
| test_name=info.test_name, test_filter=test_filter) |
| tf_args.extend([constants.TF_ATEST_INCLUDE_FILTER, filter_arg]) |
| return tf_args |