# blob: 1e363192e1b86c5cb5bff8ffc5c6565ec2891d8a [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2023, The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module to facilitate integration test within the build and test environment.
This module provides utilities for running tests in both build and test
environments, managing environment variables, and snapshotting the workspace for
restoration later.
"""
import argparse
import atexit
from concurrent.futures import ThreadPoolExecutor
import copy
import datetime
import logging
import multiprocessing
import os
from pathlib import Path
import shutil
import subprocess
import sys
import tarfile
import tempfile
import time
import traceback
from typing import Any, Callable
import unittest
import zipfile
from snapshot import Snapshot
# Name of the environment variable that holds the path of the snapshot
# storage tar file.
SNAPSHOT_STORAGE_TAR_KEY = 'SNAPSHOT_STORAGE_TAR_PATH'
# Name of the environment variable that holds the repo root directory.
ANDROID_BUILD_TOP_KEY = 'ANDROID_BUILD_TOP'
class IntegrationTestConfiguration:
    """Internal holder for the settings of one integration test run.

    Every field is a class-level default; callers override the values they
    need on an instance.
    """

    # Which phases this run executes.
    is_build_env: bool = False
    is_test_env: bool = False

    # Device selection. When the serial is required but missing, StepInput
    # raises instead of returning empty serial arguments.
    device_serial: str = None
    is_device_serial_required = True

    # Locations of the restored workspace and of the snapshot storage, plus
    # whether the storage is packed into / unpacked from a single tar file.
    workspace_path: Path = None
    snapshot_storage_path: Path = None
    snapshot_storage_tar_path: Path = None
    is_tar_snapshot: bool = False
class StepInput:
    """Data handed to a build or test step function."""

    def __init__(self, env, repo_root, config, objs):
        self._env, self._repo_root = env, repo_root
        self._config, self._objs = config, objs

    def get_device_serial_args_or_empty(self) -> str:
        """Returns ' -s <serial>' for the configured device, or ''.

        Raises:
            RuntimeError: If no serial is configured although the
              configuration marks it as required.
        """
        serial = self._config.device_serial
        if not serial:
            if self._config.is_device_serial_required:
                raise RuntimeError('Device serial is required but not set')
            return ''
        return ' -s ' + serial

    def get_device_serial(self) -> str:
        """Returns the configured device serial.

        Raises:
            RuntimeError: If no serial is configured.
        """
        serial = self._config.device_serial
        if serial:
            return serial
        raise RuntimeError('Device serial is not set')

    def get_env(self):
        """Returns the environment variables for this step."""
        return self._env

    def get_repo_root(self) -> str:
        """Returns the repo root directory."""
        return self._repo_root

    def get_obj(self, name: str) -> Any:
        """Returns the object stored under `name`, or None if absent."""
        return self._objs.get(name)

    def get_config(self) -> IntegrationTestConfiguration:
        """Returns the integration test configuration."""
        return self._config
class StepOutput:
    """Collects what a build step wants stored in its snapshot."""

    def __init__(self):
        self._snapshot_include_paths: list[str] = []
        self._snapshot_exclude_paths: list[str] = []
        self._snapshot_env_keys: list[str] = []
        self._snapshot_objs: dict[str, Any] = {}

    # Mutators.

    def add_snapshot_include_paths(self, paths: list[str]) -> None:
        """Appends paths to the snapshot include list."""
        self._snapshot_include_paths += paths

    def set_snapshot_include_paths(self, paths: list[str]) -> None:
        """Replaces the snapshot include list with `paths`.

        Everything previously in the include list is dropped. Use
        add_snapshot_include_paths to keep the existing entries.
        """
        # Empty the list in place, then refill it.
        del self._snapshot_include_paths[:]
        self._snapshot_include_paths += paths

    def add_snapshot_exclude_paths(self, paths: list[str]) -> None:
        """Appends paths to the snapshot exclude list."""
        self._snapshot_exclude_paths += paths

    def add_snapshot_env_keys(self, keys: list[str]) -> None:
        """Appends environment variable keys to store in the snapshot."""
        self._snapshot_env_keys += keys

    def add_snapshot_obj(self, name: str, obj: Any):
        """Stores `obj` under `name`, replacing any earlier value."""
        self._snapshot_objs[name] = obj

    # Accessors. Each returns the live container, not a copy.

    def get_snapshot_include_paths(self):
        """Returns the snapshot include path list."""
        return self._snapshot_include_paths

    def get_snapshot_exclude_paths(self):
        """Returns the snapshot exclude path list."""
        return self._snapshot_exclude_paths

    def get_snapshot_env_keys(self):
        """Returns the snapshot env key list."""
        return self._snapshot_env_keys

    def get_snapshot_objs(self):
        """Returns the snapshot object dictionary."""
        return self._snapshot_objs
class SplitBuildTestScript:
    """Utility for running integration test in build and test environment.

    A script is a strictly alternating list of build and test steps that
    starts with a build step. In the build environment every build step is
    executed and its output is stored in a snapshot. In the test environment
    the snapshot taken after the preceding build step is restored before each
    test step is executed.
    """

    # Snapshot object key under which a failed build step stores its
    # exception, so that the test environment can re-raise it.
    _BUILD_STEP_EXCEPTION_KEY = '_internal_build_step_exception'

    def __init__(self, name: str, config: IntegrationTestConfiguration) -> None:
        self._config = config
        self._id: str = name
        self._snapshot: Snapshot = Snapshot(self._config.snapshot_storage_path)
        self._has_already_run: bool = False
        self._steps: list[SplitBuildTestScript._Step] = []
        self._snapshot_restore_exclude_paths: list[str] = []

    def add_build_step(self, step_func: Callable[[StepInput], StepOutput]):
        """Add a build step.

        Args:
            step_func: A function that takes a StepInput object and returns a
              StepOutput object.

        Raises:
            RuntimeError: If the previously added step is also a build step.
        """
        if self._steps and isinstance(self._steps[-1], self._BuildStep):
            raise RuntimeError(
                'Two adjacent build steps are unnecessary. Combine them.'
            )
        self._steps.append(self._BuildStep(step_func))

    def add_test_step(self, step_func: Callable[[StepInput], None]):
        """Add a test step.

        Args:
            step_func: A function that takes a StepInput object.

        Raises:
            RuntimeError: If no step was added yet or the previously added
              step is also a test step.
        """
        if not self._steps or isinstance(self._steps[-1], self._TestStep):
            raise RuntimeError('A build step is required before a test step.')
        self._steps.append(self._TestStep(step_func))

    def _exception_to_dict(self, exception: Exception):
        """Converts an exception object to a dictionary to be saved by json."""
        return {
            'type': exception.__class__.__name__,
            'message': str(exception),
            'traceback': ''.join(traceback.format_tb(exception.__traceback__)),
        }

    def _dict_to_exception(self, exception_dict: dict[str, str]):
        """Converts a dictionary to an exception object."""
        return RuntimeError(
            'The last build step raised an exception:\n'
            f'{exception_dict["type"]}: {exception_dict["message"]}\n'
            'Traceback (from saved snapshot):\n'
            f'{exception_dict["traceback"]}'
        )

    def run(self):
        """Run the steps added previously.

        Build steps only run when the configuration is in build mode and test
        steps only run when it is in test mode.

        Raises:
            RuntimeError: If this function is called more than once, or, in
              test mode, if the matching build step recorded an exception.
        """
        if self._has_already_run:
            raise RuntimeError(f'Script {self._id} has already run.')
        self._has_already_run = True

        for index, step in enumerate(self._steps):
            # Steps alternate build/test, so a build step and the test step
            # following it share the same snapshot name.
            snapshot_name = self._id + '_' + str(index // 2)
            if isinstance(step, self._BuildStep):
                if self._config.is_build_env:
                    self._run_build_step(step, snapshot_name)
            elif isinstance(step, self._TestStep):
                if self._config.is_test_env:
                    self._run_test_step(step, snapshot_name)

    def _run_build_step(self, step, snapshot_name: str) -> None:
        """Runs one build step and snapshots its output, even on failure."""
        repo_root = self._get_repo_root(os.environ)
        step_in = StepInput(os.environ, repo_root, self._config, {})
        last_exception = None
        try:
            step_out = step.get_step_func()(step_in)
        # pylint: disable=broad-exception-caught
        except Exception as e:
            # Record the failure in the snapshot so the test environment can
            # report it, then re-raise once the snapshot has been taken.
            last_exception = e
            step_out = StepOutput()
            step_out.add_snapshot_obj(
                self._BUILD_STEP_EXCEPTION_KEY, self._exception_to_dict(e)
            )
        self._take_snapshot(repo_root, snapshot_name, step_out)
        if last_exception is not None:
            raise last_exception

    def _run_test_step(self, step, snapshot_name: str) -> None:
        """Restores the snapshot of the matching build step and runs a test."""
        env, objs = self._restore_snapshot(snapshot_name)
        if self._BUILD_STEP_EXCEPTION_KEY in objs:
            raise self._dict_to_exception(objs[self._BUILD_STEP_EXCEPTION_KEY])
        step_in = StepInput(env, self._get_repo_root(env), self._config, objs)
        step.get_step_func()(step_in)

    def add_snapshot_restore_exclude_paths(self, paths: list[str]) -> None:
        """Add paths to ignore during snapshot directory restore."""
        self._snapshot_restore_exclude_paths.extend(paths)

    def _take_snapshot(
        self, repo_root: str, name: str, step_out: StepOutput
    ) -> None:
        """Take a snapshot of the repository and environment."""
        self._snapshot.take_snapshot(
            name,
            repo_root,
            step_out.get_snapshot_include_paths(),
            step_out.get_snapshot_exclude_paths(),
            step_out.get_snapshot_env_keys(),
            step_out.get_snapshot_objs(),
        )

    def _restore_snapshot(self, name: str) -> tuple[Any, Any]:
        """Restore the repository and environment from a snapshot.

        Returns:
            Whatever Snapshot.restore_snapshot returns; run() unpacks it as an
            (environment, saved objects) pair.
        """
        return self._snapshot.restore_snapshot(
            name,
            self._config.workspace_path.as_posix(),
            exclude_paths=self._snapshot_restore_exclude_paths,
        )

    def _get_repo_root(self, env) -> str:
        """Get repo root directory.

        In build mode the value always comes from the current process
        environment; otherwise it is read from the given `env`.
        """
        if self._config.is_build_env:
            return os.environ[ANDROID_BUILD_TOP_KEY]
        return env[ANDROID_BUILD_TOP_KEY]

    class _Step:
        """Parent class to build step and test step for typing declaration."""

    class _BuildStep(_Step):
        """A step that runs in the build environment."""

        def __init__(self, step_func: Callable[[StepInput], StepOutput]):
            self._step_func = step_func

        def get_step_func(self) -> Callable[[StepInput], StepOutput]:
            """Returns the stored step function for build."""
            return self._step_func

    class _TestStep(_Step):
        """A step that runs in the test environment."""

        def __init__(self, step_func: Callable[[StepInput], None]):
            self._step_func = step_func

        def get_step_func(self) -> Callable[[StepInput], None]:
            """Returns the stored step function for test."""
            return self._step_func
class SplitBuildTestTestCase(unittest.TestCase):
    """Base class for test cases that drive split build-test scripts."""

    # Test configuration; assigned from outside before the tests run.
    injected_config: IntegrationTestConfiguration = None

    def create_split_build_test_script(self, name: str) -> SplitBuildTestScript:
        """Creates a SplitBuildTestScript bound to the injected configuration.

        Args:
            name: The name of the script. The name will be used to store
              snapshots and it's recommended to set the name to self.id() in
              most cases.

        Returns:
            A new SplitBuildTestScript instance.
        """
        script_config = self.injected_config
        return SplitBuildTestScript(name, script_config)
class _FileCompressor:
    """Class for compressing and decompressing files.

    Each file is replaced by a zip archive next to it whose name is the
    original file name with '.zip' appended, so files that differ only by
    extension never map to the same archive.
    """

    _ZIP_SUFFIX = '.zip'

    def compress_all_sub_files(self, root_path: Path) -> None:
        """Compresses all files in the given directory and subdirectories.

        Args:
            root_path: Path to the root directory.

        Raises:
            Exception: Whatever a failing compress_file call raised.
        """
        # Collect the files up front: the workers create and delete files in
        # this tree, which must not feed back into the directory walk.
        files = [path for path in root_path.rglob('*') if path.is_file()]
        self._run_in_parallel(self.compress_file, files)

    def compress_file(self, file_path: Path) -> None:
        """Compresses a single file to zip and removes the original.

        Args:
            file_path: Path to the file to compress.
        """
        # Append the suffix instead of replacing the existing one, otherwise
        # 'a.txt' and 'a.json' would both be written to 'a.zip'.
        zip_path = file_path.with_name(file_path.name + self._ZIP_SUFFIX)
        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zip_file:
            zip_file.write(file_path, arcname=file_path.name)
        file_path.unlink()

    def decompress_all_sub_files(self, root_path: Path) -> None:
        """Decompresses all compressed sub files in the given directory.

        Args:
            root_path: Path to the root directory.

        Raises:
            Exception: Whatever a failing decompress_file call raised.
        """
        # Collect the archives up front so that an extracted file that itself
        # ends in '.zip' is not picked up and unpacked a second time.
        archives = list(root_path.rglob('*' + self._ZIP_SUFFIX))
        self._run_in_parallel(self.decompress_file, archives)

    def decompress_file(self, file_path: Path) -> None:
        """Decompresses a single zip file and removes the archive.

        Args:
            file_path: Path to the compressed file.
        """
        with zipfile.ZipFile(file_path, 'r') as zip_file:
            zip_file.extractall(file_path.parent)
        file_path.unlink()

    def _run_in_parallel(self, func, paths) -> None:
        """Applies func to every path using one worker thread per CPU."""
        cpu_count = multiprocessing.cpu_count()
        with ThreadPoolExecutor(max_workers=cpu_count) as executor:
            # Draining the result iterator re-raises the first worker
            # exception instead of silently dropping it.
            for _ in executor.map(func, paths):
                pass
def _configure_logging(verbose: bool, log_file_dir_path: Path):
    """Configure the logger.

    DEBUG and above is written to a log file and the records are also echoed
    to a console handler. When the default log file already exists, a
    timestamped file name is used instead.

    Args:
        verbose: If true display DEBUG level logs on console.
        log_file_dir_path: A directory which stores the log file. It is
          created when it does not exist yet.
    """
    # The file handler opens the log file right away, so the directory has to
    # exist before logging is configured.
    log_file_dir_path.mkdir(parents=True, exist_ok=True)

    log_file = log_file_dir_path.joinpath('asuite_integration_tests.log')
    if log_file.exists():
        timestamp = datetime.datetime.now().strftime('%Y-%m-%d_%H:%M:%S')
        log_file = log_file_dir_path.joinpath(
            f'asuite_integration_tests_{timestamp}.log'
        )

    log_format = (
        '%(asctime)s %(filename)s:%(lineno)s:%(levelname)s: %(message)s'
    )
    date_format = '%Y-%m-%d %H:%M:%S'
    logging.basicConfig(
        filename=log_file.as_posix(),
        level=logging.DEBUG,
        format=log_format,
        datefmt=date_format,
    )

    console = logging.StreamHandler()
    console.name = 'console'
    console.setLevel(logging.DEBUG if verbose else logging.INFO)
    console.setFormatter(logging.Formatter(log_format))
    logging.getLogger('').addHandler(console)
def _parse_known_args(
    argv: list[str],
    argparser_update_func: Callable[[argparse.ArgumentParser], None] = None,
) -> tuple[argparse.Namespace, list[str]]:
    """Parse command line args and check required args being provided.

    Args:
        argv: The argument list to parse.
        argparser_update_func: Optional function that receives the parser and
          may register additional arguments before parsing.

    Returns:
        A tuple of the parsed namespace and the list of arguments that the
        parser did not recognize.
    """
    description = (
        'A script to build and/or run the Asuite integration tests.\n'
        'Usage examples:\n'
        'python <script_path>: Runs both the build and test steps.\n'
        'python <script_path> -b -t: Runs both the build and test steps.\n'
        'python <script_path> -b: Runs only the build steps.\n'
        'python <script_path> -t: Runs only the test steps.\n'
    )
    parser = argparse.ArgumentParser(
        add_help=True,
        description=description,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        '-b',
        '--build',
        action='store_true',
        default=False,
        help=(
            'Run build steps. Can be set to true together with the test option.'
            ' If both build and test are unset, will run both steps.'
        ),
    )
    parser.add_argument(
        '-t',
        '--test',
        action='store_true',
        default=False,
        help=(
            'Run test steps. Can be set to true together with the build option.'
            ' If both build and test are unset, will run both steps.'
        ),
    )
    parser.add_argument(
        '--tar_snapshot',
        action='store_true',
        default=False,
        help=(
            'Whether to tar and untar the snapshot storage into/from a single'
            ' file.'
        ),
    )
    parser.add_argument(
        '-v',
        '--verbose',
        action='store_true',
        default=False,
        help='Whether to set log level to verbose.',
    )
    # The below flags are passed in by the TF Python test runner.
    parser.add_argument(
        '-s',
        '--serial',
        help=(
            'The device serial. Required in test mode when ANDROID_BUILD_TOP is'
            ' not set.'
        ),
    )
    parser.add_argument(
        '--test-output-file',
        help=(
            'The file in which to store the unit test results. This option is'
            ' usually set by TradeFed when running the script with python and'
            ' is optional during manual script execution.'
        ),
    )
    if argparser_update_func:
        argparser_update_func(parser)
    return parser.parse_known_args(argv)
def _run_test(
    config: IntegrationTestConfiguration,
    argv: list[str],
    test_output_file_path: str = None,
) -> None:
    """Execute integration tests with given test configuration.

    In test mode with tar snapshots enabled, the snapshot storage is unpacked
    before the tests run. In build mode with tar snapshots enabled, it is
    packed into a single tar file after the tests ran.

    Args:
        config: The configuration injected into every loaded test case.
        argv: The argument list forwarded to unittest.main.
        test_output_file_path: Optional file that receives the unittest
          output. Missing parent directories are created.

    Raises:
        EnvironmentError: If the snapshot tar is needed but does not exist.
    """
    compressor = _FileCompressor()

    def cleanup() -> None:
        """Removes the workspace and the unpacked snapshot storage."""
        if config.workspace_path.exists():
            shutil.rmtree(config.workspace_path)
        if config.snapshot_storage_path.exists():
            shutil.rmtree(config.snapshot_storage_path)

    if config.is_test_env and config.is_tar_snapshot:
        if not config.snapshot_storage_tar_path.exists():
            raise EnvironmentError(
                f'Snapshot tar {config.snapshot_storage_tar_path} does not'
                ' exist. Have you run the build mode with --tar_snapshot'
                ' option enabled?'
            )
        with tarfile.open(config.snapshot_storage_tar_path, 'r') as tar:
            tar.extractall(config.snapshot_storage_path.parent.as_posix())

        logging.info(
            'Decompressing the snapshot storage with %s threads...',
            multiprocessing.cpu_count(),
        )
        start_time = time.monotonic()
        compressor.decompress_all_sub_files(config.snapshot_storage_path)
        logging.info(
            'Decompression finished in %.2f seconds',
            time.monotonic() - start_time,
        )

        # unittest.main exits the process in test mode, so the cleanup has to
        # be deferred to interpreter exit.
        atexit.register(cleanup)

    def unittest_main(stream=None):
        # Note that we use a type and not an instance for 'testRunner'
        # since TestProgram forwards its constructor arguments when creating
        # an instance of the runner type. Not doing so would require us to
        # make sure that the parameters passed to TestProgram are aligned
        # with those for creating a runner instance.
        class TestRunner(unittest.TextTestRunner):
            """Writes test results to the TF-provided file."""

            def __init__(self, *args: Any, **kwargs: Any) -> None:
                super().__init__(stream=stream, *args, **kwargs)

        class TestLoader(unittest.TestLoader):
            """Injects the test configuration to the test classes."""

            def loadTestsFromTestCase(self, *args, **kwargs):
                test_suite = super().loadTestsFromTestCase(*args, **kwargs)
                for test in test_suite:
                    test.injected_config = config
                return test_suite

        # Setting verbosity is required to generate output that the TradeFed
        # test runner can parse.
        unittest.main(
            testRunner=TestRunner,
            verbosity=3,
            argv=argv,
            testLoader=TestLoader(),
            exit=config.is_test_env,
        )

    if test_output_file_path:
        # parents=True so that a not yet existing nested output directory
        # does not abort the run.
        Path(test_output_file_path).parent.mkdir(parents=True, exist_ok=True)
        with open(
            test_output_file_path, 'w', encoding='utf-8'
        ) as test_output_file:
            unittest_main(stream=test_output_file)
    else:
        unittest_main(stream=None)

    if config.is_build_env and config.is_tar_snapshot:
        logging.info(
            'Compressing the snapshot storage with %s threads...',
            multiprocessing.cpu_count(),
        )
        start_time = time.monotonic()
        compressor.compress_all_sub_files(config.snapshot_storage_path)
        logging.info(
            'Compression finished in %.2f seconds',
            time.monotonic() - start_time,
        )
        with tarfile.open(config.snapshot_storage_tar_path, 'w') as tar:
            tar.add(
                config.snapshot_storage_path,
                arcname=config.snapshot_storage_path.name,
            )
        # Everything needed is in the tar file now.
        cleanup()
def main(
    argv: list[str] = None,
    make_before_build: list[str] = None,
    argparser_update_func: Callable[[argparse.ArgumentParser], None] = None,
    config_update_function: Callable[
        [IntegrationTestConfiguration, argparse.Namespace], None
    ] = None,
) -> None:
    """Main method to start the integration tests.

    When neither the build nor the test option is given, the build steps and
    then the test steps are run within the same invocation.

    Args:
        argv: A list of arguments to parse. Defaults to sys.argv.
        make_before_build: A list of targets to make before running build
          steps.
        argparser_update_func: A function that takes an ArgumentParser object
          and updates it.
        config_update_function: A function that takes a
          IntegrationTestConfiguration config and the parsed args to update
          the config.

    Raises:
        EnvironmentError: If the snapshot storage tar path cannot be
          determined, or if build steps are requested without the
          ANDROID_BUILD_TOP environment variable.
    """
    if not argv:
        argv = sys.argv
    if make_before_build is None:
        make_before_build = []

    args, unittest_argv = _parse_known_args(argv, argparser_update_func)

    snapshot_storage_dir_name = 'snapshot_storage'
    snapshot_storage_tar_name = 'snapshot.tar'

    integration_test_out_path = Path(
        tempfile.gettempdir(),
        'asuite_integration_tests_%s'
        % Path('~').expanduser().name.replace(' ', '_'),
    )

    if SNAPSHOT_STORAGE_TAR_KEY in os.environ:
        snapshot_storage_tar_path = Path(os.environ[SNAPSHOT_STORAGE_TAR_KEY])
        snapshot_storage_tar_path.parent.mkdir(parents=True, exist_ok=True)
    elif ANDROID_BUILD_TOP_KEY in os.environ:
        snapshot_storage_tar_path = integration_test_out_path.joinpath(
            snapshot_storage_tar_name
        )
        # The log file is written next to the tar file, so the directory has
        # to exist before logging is configured.
        integration_test_out_path.mkdir(parents=True, exist_ok=True)
    else:
        raise EnvironmentError(
            'Cannot determine snapshot storage tar path. Try set the'
            f' {SNAPSHOT_STORAGE_TAR_KEY} environment value. Current'
            f' environment variables: {os.environ}'
        )

    _configure_logging(args.verbose, snapshot_storage_tar_path.parent)

    logging.debug('The os environ is: %s', os.environ)

    # When the build or test is unset, assume it's a local run for both build
    # and test steps.
    is_build_test_unset = not args.build and not args.test
    config = IntegrationTestConfiguration()
    config.is_build_env = args.build or is_build_test_unset
    config.is_test_env = args.test or is_build_test_unset
    config.device_serial = args.serial
    config.snapshot_storage_path = integration_test_out_path.joinpath(
        snapshot_storage_dir_name
    )
    config.snapshot_storage_tar_path = snapshot_storage_tar_path
    config.workspace_path = integration_test_out_path.joinpath('workspace')
    # Device serial is not required during local run, and
    # ANDROID_BUILD_TOP_KEY env being available implies it's local run.
    config.is_device_serial_required = ANDROID_BUILD_TOP_KEY not in os.environ
    config.is_tar_snapshot = args.tar_snapshot

    if config_update_function:
        config_update_function(config, args)

    if config.is_build_env:
        if ANDROID_BUILD_TOP_KEY not in os.environ:
            raise EnvironmentError(
                f'Environment variable {ANDROID_BUILD_TOP_KEY} is required to'
                ' build the integration test.'
            )
        for target in make_before_build:
            subprocess.check_call(
                f'build/soong/soong_ui.bash --make-mode {target}'.split(),
                cwd=os.environ[ANDROID_BUILD_TOP_KEY],
            )

    # Exactly one of build/test requested: run with the config as is.
    if config.is_build_env ^ config.is_test_env:
        _run_test(config, unittest_argv, args.test_output_file)
        return

    # Both requested: run the build pass first, then the test pass, each with
    # its own copy of the config restricted to that single mode.
    build_config = copy.deepcopy(config)
    build_config.is_test_env = False

    test_config = copy.deepcopy(config)
    test_config.is_build_env = False

    _run_test(build_config, unittest_argv, args.test_output_file)
    _run_test(test_config, unittest_argv, args.test_output_file)