| # Copyright 2018, The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| """Module Info class used to hold cached module-info.json.""" |
| |
| # pylint: disable=too-many-lines |
from __future__ import annotations

import collections
import functools
import json
import logging
import os
from pathlib import Path
import pickle
import re
import shutil
import sqlite3
import sys
import tempfile
import time
from typing import Any, Callable, Dict, List, Set, Tuple

from atest import atest_utils
from atest import constants
from atest.atest_enum import DetectType, ExitCode
from atest.metrics import metrics
| |
| |
| # JSON file generated by build system that lists all buildable targets. |
| _MODULE_INFO = 'module-info.json' |
| # JSON file generated by build system that lists dependencies for java. |
| _JAVA_DEP_INFO = 'module_bp_java_deps.json' |
| # JSON file generated by build system that lists dependencies for cc. |
| _CC_DEP_INFO = 'module_bp_cc_deps.json' |
| # JSON file generated by atest merged the content from module-info, |
| # module_bp_java_deps.json, and module_bp_cc_deps. |
| _MERGED_INFO = 'atest_merged_dep.json' |
| _DB_VERSION = 2 |
| _DB_NAME = f'module-info.{_DB_VERSION}.db' |
| _NAME_MODULE_TABLE = 'modules' |
| _PATH_MODULE_TABLE = 'path_modules' |
| |
| |
| Module = Dict[str, Any] |
| |
| |
| def load_from_file( |
| module_file: Path = None, |
| force_build: bool = False, |
| ) -> ModuleInfo: |
| """Factory method that initializes ModuleInfo from the build-generated |
| |
| JSON file |
| """ |
| loader = Loader( |
| module_file=module_file, |
| force_build=force_build, |
| need_merge_fn=lambda: False, |
| ) |
| |
| mi = loader.load() |
| |
| return mi |
| |
| |
| def load_from_dict(name_to_module_info: Dict[str, Any]) -> ModuleInfo: |
| """Factory method that initializes ModuleInfo from a dictionary.""" |
| path_to_module_info = get_path_to_module_info(name_to_module_info) |
| return ModuleInfo( |
| name_to_module_info=name_to_module_info, |
| path_to_module_info=path_to_module_info, |
| get_testable_modules=lambda s: _get_testable_modules( |
| name_to_module_info, path_to_module_info, s |
| ), |
| ) |
| |
| |
| def create_empty() -> ModuleInfo: |
| """Factory method that initializes an empty ModuleInfo.""" |
| return ModuleInfo() |
| |
| |
| def load( |
| force_build: bool = False, sqlite_module_cache: bool = False |
| ) -> ModuleInfo: |
| """Factory method that initializes ModuleInfo from the build-generated |
| |
| JSON or Sqlite file. |
| """ |
| mod_start = time.time() |
| loader = Loader( |
| force_build=force_build, sqlite_module_cache=sqlite_module_cache |
| ) |
| mod_stop = time.time() - mod_start |
| metrics.LocalDetectEvent( |
| detect_type=DetectType.MODULE_INFO_INIT_MS, result=int(mod_stop * 1000) |
| ) |
| |
| return loader.load(save_timestamps=True) |
| |
| |
| def metrics_timer(func): |
| """Decorator method for sending data to metrics.""" |
| |
| def wrapper(*args, **kwargs): |
| start = time.time() |
| result = func(*args, **kwargs) |
| elapsed_time = int(time.time() - start) |
| metrics.LocalDetectEvent( |
| detect_type=DetectType.TESTABLE_MODULES, result=elapsed_time |
| ) |
| return result |
| |
| return wrapper |
| |
| |
| class Loader: |
| """Class that handles load and merge processes.""" |
| |
| def __init__( |
| self, |
| module_file: Path = None, |
| force_build: bool = False, |
| sqlite_module_cache: bool = False, |
| need_merge_fn: Callable = None, |
| ): |
| logging.debug( |
| 'Creating module info loader object with module_file: %s, force_build:' |
| ' %s, sqlite_module_cache: %s, need_merge_fn: %s', |
| module_file, |
| force_build, |
| sqlite_module_cache, |
| need_merge_fn, |
| ) |
| self.java_dep_path = atest_utils.get_build_out_dir('soong', _JAVA_DEP_INFO) |
| self.cc_dep_path = atest_utils.get_build_out_dir('soong', _CC_DEP_INFO) |
| self.merged_dep_path = atest_utils.get_product_out(_MERGED_INFO) |
| logging.debug( |
| 'java_dep_path: %s, cc_dep_path: %s, merged_dep_path: %s', |
| self.java_dep_path, |
| self.cc_dep_path, |
| self.merged_dep_path, |
| ) |
| |
| self.sqlite_module_cache = sqlite_module_cache |
| logging.debug('sqlite_module_cache: %s', sqlite_module_cache) |
| if self.sqlite_module_cache: |
| self.cache_file = atest_utils.get_product_out(_DB_NAME) |
| self.save_cache_async = self._save_db_async |
| self.load_from_cache = self._load_from_db |
| else: |
| self.cache_file = self.merged_dep_path |
| self.save_cache_async = self._save_json_async |
| self.load_from_cache = self._load_from_json |
| |
| if need_merge_fn: |
| self.save_cache_async = lambda _, __: None |
| |
| self.update_merge_info = False |
| self.module_index = atest_utils.get_index_path( |
| f'suite-modules.{_DB_VERSION}.idx' |
| ) |
| self.module_index_proc = None |
| logging.debug('module_index: %s', self.module_index) |
| |
| if module_file: |
| self.mod_info_file_path = Path(module_file) |
| self.load_module_info = self._load_module_info_from_file_wo_merging |
| else: |
| self.mod_info_file_path = atest_utils.get_product_out(_MODULE_INFO) |
| if force_build: |
| logging.debug('Triggering module info build by force build.') |
| build() |
| elif not self.mod_info_file_path.is_file(): |
| logging.debug( |
| 'Triggering module info build due to module info file path %s not' |
| ' exist.', |
| self.mod_info_file_path, |
| ) |
| build() |
| |
| self.update_merge_info = self.need_merge_module_info() |
| self.load_module_info = self._load_module_info_file |
| |
| logging.debug( |
| 'Executing load_module_info function %s', self.load_module_info |
| ) |
| self.name_to_module_info, self.path_to_module_info = self.load_module_info() |
| |
| logging.debug('Completed creating module info loader object') |
| |
| def load(self, save_timestamps: bool = False): |
| logging.debug('Loading ModuleInfo. save_timestamps: %s', save_timestamps) |
| if save_timestamps: |
| atest_utils.run_multi_proc(func=atest_utils.save_build_files_timestamp) |
| |
| return ModuleInfo( |
| name_to_module_info=self.name_to_module_info, |
| path_to_module_info=self.path_to_module_info, |
| mod_info_file_path=self.mod_info_file_path, |
| get_testable_modules=self.get_testable_modules, |
| ) |
| |
| def _load_module_info_file(self): |
| """Load module-info.json file as ModuleInfo and merge related JSON files |
| |
| whenever required. |
| |
| Returns: |
| Dict of module name to module info and dict of module path to module |
| info. |
| """ |
| # +--------------+ +----------------------------------+ |
| # | ModuleInfo() | | ModuleInfo(module_file=foo.json) | |
| # +-------+------+ +----------------+-----------------+ |
| # | module_info.build() | load |
| # v V |
| # +--------------------------+ +--------------------------+ |
| # | module-info.json | | foo.json | |
| # | module_bp_cc_deps.json | | module_bp_cc_deps.json | |
| # | module_bp_java_deps.json | | module_bp_java_deps.json | |
| # +--------------------------+ +--------------------------+ |
| # | | |
| # | _merge_soong_info() <--------------------+ |
| # v |
| # +============================+ |
| # | $ANDROID_PRODUCT_OUT | |
| # | /atest_merged_dep.json |--> load as module info. |
| # +============================+ |
| if not self.update_merge_info: |
| return self.load_from_cache() |
| |
| name_modules, path_modules = self._load_from_json(merge=True) |
| self.save_cache_async(name_modules, path_modules) |
| self._save_testable_modules_async(name_modules, path_modules) |
| |
| return name_modules, path_modules |
| |
| def _load_module_info_from_file_wo_merging(self): |
| """Load module-info.json as ModuleInfo without merging.""" |
| name_modules = atest_utils.load_json_safely(self.mod_info_file_path) |
| _add_missing_variant_modules(name_modules) |
| |
| return name_modules, get_path_to_module_info(name_modules) |
| |
| def _save_db_async( |
| self, |
| name_to_module_info: Dict[str, Any], |
| path_to_module_info: Dict[str, Any], |
| ): |
| """Save data to a Sqlite database in parallel.""" |
| data_map = { |
| _NAME_MODULE_TABLE: name_to_module_info, |
| _PATH_MODULE_TABLE: path_to_module_info, |
| } |
| _save_data_async( |
| function=_create_db, |
| contents=data_map, |
| target_path=self.cache_file, |
| ) |
| |
| def _load_from_db(self) -> Tuple[Dict[str, Any], Dict[str, Any]]: |
| """Return a tuple of dicts by from SqliteDict.""" |
| conn = sqlite3.connect(self.cache_file) |
| with conn: |
| name_to_module_info = SqliteDict(conn, _NAME_MODULE_TABLE) |
| path_to_module_info = SqliteDict(conn, _PATH_MODULE_TABLE) |
| |
| return name_to_module_info, path_to_module_info |
| |
| def _save_json_async(self, name_to_module_info: Dict[str, Any], _): |
| """Save data to a JSON format in parallel.""" |
| _save_data_async( |
| function=_create_json, |
| contents=name_to_module_info, |
| target_path=self.cache_file, |
| ) |
| |
| def _load_from_json(self, merge: bool = False) -> Tuple[Dict, Dict]: |
| """Load or merge module info from json file. |
| |
| Args: |
| merge: Boolean whether to merge build system infos. |
| |
| Returns: |
| A tuple of (name_to_module_info, path_to_module_info). |
| """ |
| start = time.time() |
| if merge: |
| name_info = self._merge_build_system_infos( |
| atest_utils.load_json_safely(self.mod_info_file_path) |
| ) |
| duration = time.time() - start |
| logging.debug('Merging module info took %ss', duration) |
| metrics.LocalDetectEvent( |
| detect_type=DetectType.MODULE_MERGE_MS, result=int(duration * 1000) |
| ) |
| |
| return name_info, get_path_to_module_info(name_info) |
| |
| name_info = atest_utils.load_json_safely(self.merged_dep_path) |
| duration = time.time() - start |
| logging.debug('Loading module info took %ss', duration) |
| metrics.LocalDetectEvent( |
| detect_type=DetectType.MODULE_LOAD_MS, result=int(duration * 1000) |
| ) |
| logging.debug('Loading %s as module-info.', self.merged_dep_path) |
| |
| return name_info, get_path_to_module_info(name_info) |
| |
| def _save_testable_modules_async( |
| self, |
| name_to_module_info: Dict[str, Any], |
| path_to_module_info: Dict[str, Any], |
| ): |
| """Save testable modules in parallel.""" |
| return atest_utils.run_multi_proc( |
| func=_get_testable_modules, |
| kwargs={ |
| 'name_to_module_info': name_to_module_info, |
| 'path_to_module_info': path_to_module_info, |
| 'index_path': self.module_index, |
| }, |
| ) |
| |
| def need_merge_module_info(self): |
| """Check if needed to regenerate the cache file. |
| |
| If the cache file is non-existent or testable module index is inexistent |
| or older than any of the JSON files used to generate it, the cache file |
| must re-generate. |
| |
| Returns: |
| True when the cache file is older or non-existent, False otherwise. |
| """ |
| if not self.cache_file.is_file(): |
| return True |
| |
| if not self.module_index.is_file(): |
| return True |
| |
| # The dependency input files should be generated at this point. |
| return any( |
| self.cache_file.stat().st_mtime < f.stat().st_mtime |
| for f in (self.mod_info_file_path, self.java_dep_path, self.cc_dep_path) |
| ) |
| |
| def _merge_build_system_infos( |
| self, name_to_module_info, java_bp_info_path=None, cc_bp_info_path=None |
| ): |
| """Merge the content of module-info.json and CC/Java dependency files |
| |
| to name_to_module_info. |
| |
| Args: |
| name_to_module_info: Dict of module name to module info dict. |
| java_bp_info_path: String of path to java dep file to load up. Used for |
| testing. |
| cc_bp_info_path: String of path to cc dep file to load up. Used for |
| testing. |
| |
| Returns: |
| Dict of updated name_to_module_info. |
| """ |
| # Merge _JAVA_DEP_INFO |
| if not java_bp_info_path: |
| java_bp_info_path = self.java_dep_path |
| java_bp_infos = atest_utils.load_json_safely(java_bp_info_path) |
| if java_bp_infos: |
| logging.debug('Merging Java build info: %s', java_bp_info_path) |
| name_to_module_info = merge_soong_info(name_to_module_info, java_bp_infos) |
| # Merge _CC_DEP_INFO |
| if not cc_bp_info_path: |
| cc_bp_info_path = self.cc_dep_path |
| cc_bp_infos = atest_utils.load_json_safely(cc_bp_info_path) |
| if cc_bp_infos: |
| logging.debug('Merging CC build info: %s', cc_bp_info_path) |
| # CC's dep json format is different with java. |
| # Below is the example content: |
| # { |
| # "clang": "${ANDROID_ROOT}/bin/clang", |
| # "clang++": "${ANDROID_ROOT}/bin/clang++", |
| # "modules": { |
| # "ACameraNdkVendorTest": { |
| # "path": [ |
| # "frameworks/av/camera/ndk" |
| # ], |
| # "srcs": [ |
| # "frameworks/tests/AImageVendorTest.cpp", |
| # "frameworks/tests/ACameraManagerTest.cpp" |
| # ], |
| name_to_module_info = merge_soong_info( |
| name_to_module_info, cc_bp_infos.get('modules', {}) |
| ) |
| # If $ANDROID_PRODUCT_OUT was not created in pyfakefs, simply return it |
| # without dumping atest_merged_dep.json in real. |
| |
| # Adds the key into module info as a unique ID. |
| for key, info in name_to_module_info.items(): |
| info[constants.MODULE_INFO_ID] = key |
| |
| _add_missing_variant_modules(name_to_module_info) |
| |
| return name_to_module_info |
| |
| @metrics_timer |
| def get_testable_modules(self, suite=None): |
| """Return the testable modules of the given suite name. |
| |
| Atest does not index testable modules against compatibility_suites. When |
| suite was given, or the index file was interrupted, always run |
| _get_testable_modules() and re-index. |
| |
| Args: |
| suite: A string of suite name. |
| |
| Returns: |
| If suite is not given, return all the testable modules in module |
| info, otherwise return only modules that belong to the suite. |
| """ |
| modules = set() |
| |
| if self.module_index.is_file(): |
| modules = self.get_testable_modules_from_index(suite) |
| # If the modules.idx does not exist or invalid for any reason, generate |
| # a new one arbitrarily. |
| if not modules: |
| modules = self.get_testable_module_from_memory(suite) |
| |
| return modules |
| |
| def get_testable_modules_from_index(self, suite: str = None) -> Set[str]: |
| """Return the testable modules of the given suite name.""" |
| suite_to_modules = {} |
| with open(self.module_index, 'rb') as cache: |
| try: |
| suite_to_modules = pickle.load(cache, encoding='utf-8') |
| except UnicodeDecodeError: |
| suite_to_modules = pickle.load(cache) |
| # when module indexing was interrupted. |
| except EOFError: |
| pass |
| |
| return _filter_modules_by_suite(suite_to_modules, suite) |
| |
| def get_testable_module_from_memory(self, suite: str = None) -> Set[str]: |
| """Return the testable modules of the given suite name.""" |
| return _get_testable_modules( |
| name_to_module_info=self.name_to_module_info, |
| path_to_module_info=self.path_to_module_info, |
| index_path=self.module_index, |
| suite=suite, |
| ) |
| |
| |
| class ModuleInfo: |
| """Class that offers fast/easy lookup for Module related details.""" |
| |
| def __init__( |
| self, |
| name_to_module_info: Dict[str, Any] = None, |
| path_to_module_info: Dict[str, Any] = None, |
| mod_info_file_path: Path = None, |
| get_testable_modules: Callable = None, |
| ): |
| """Initialize the ModuleInfo object. |
| |
| Load up the module-info.json file and initialize the helper vars. |
| Note that module-info.json does not contain all module dependencies, |
| therefore, Atest needs to accumulate dependencies defined in bp files. |
| |
| Args: |
| name_to_module_info: Dict of name to module info. |
| path_to_module_info: Dict of path to module info. |
| mod_info_file_path: Path of module-info.json. |
| get_testable_modules: Function to get all testable modules. |
| """ |
| # +----------------------+ +----------------------------+ |
| # | $ANDROID_PRODUCT_OUT | |$ANDROID_BUILD_TOP/out/soong| |
| # | /module-info.json | | /module_bp_java_deps.json | |
| # +-----------+----------+ +-------------+--------------+ |
| # | _merge_soong_info() | |
| # +------------------------------+ |
| # | |
| # v |
| # +----------------------------+ +----------------------------+ |
| # |tempfile.NamedTemporaryFile | |$ANDROID_BUILD_TOP/out/soong| |
| # +-------------+--------------+ | /module_bp_cc_deps.json | |
| # | +-------------+--------------+ |
| # | _merge_soong_info() | |
| # +-------------------------------+ |
| # | |
| # +-------| |
| # v |
| # +============================+ |
| # | $ANDROID_PRODUCT_OUT | |
| # | /atest_merged_dep.json |--> load as module info. |
| # +============================+ |
| self.root_dir = os.environ.get(constants.ANDROID_BUILD_TOP) |
| |
| self.name_to_module_info = name_to_module_info or {} |
| self.path_to_module_info = path_to_module_info or {} |
| self.mod_info_file_path = mod_info_file_path |
| self._get_testable_modules = get_testable_modules |
| |
| def is_module(self, name): |
| """Return True if name is a module, False otherwise.""" |
| info = self.get_module_info(name) |
| # From aosp/2293302 it started merging all modules' dependency in bp |
| # even the module is not be exposed to make, and those modules could not |
| # be treated as a build target using m. Only treat input name as module |
| # if it also has the module_name attribute which means it could be a |
| # build target for m. |
| if info and info.get(constants.MODULE_NAME): |
| return True |
| return False |
| |
| def get_paths(self, name) -> list[str]: |
| """Return paths of supplied module name, Empty list if non-existent.""" |
| info = self.get_module_info(name) |
| if info: |
| return info.get(constants.MODULE_PATH, []) |
| return [] |
| |
| def get_module_names(self, rel_module_path): |
| """Get the modules that all have module_path. |
| |
| Args: |
| rel_module_path: path of module in module-info.json |
| |
| Returns: |
| List of module names. |
| """ |
| return _get_module_names(self.path_to_module_info, rel_module_path) |
| |
| def get_module_info(self, mod_name): |
| """Return dict of info for given module name, None if non-existence.""" |
| return self.name_to_module_info.get(mod_name) |
| |
| @staticmethod |
| def is_suite_in_compatibility_suites(suite, mod_info): |
| """Check if suite exists in the compatibility_suites of module-info. |
| |
| Args: |
| suite: A string of suite name. |
| mod_info: Dict of module info to check. |
| |
| Returns: |
| True if it exists in mod_info, False otherwise. |
| """ |
| if not isinstance(mod_info, dict): |
| return False |
| return suite in mod_info.get(constants.MODULE_COMPATIBILITY_SUITES, []) |
| |
| def get_testable_modules(self, suite=None): |
| return self._get_testable_modules(suite) |
| |
| @staticmethod |
| def is_tradefed_testable_module(info: Dict[str, Any]) -> bool: |
| """Check whether the module is a Tradefed executable test.""" |
| if not info: |
| return False |
| if not info.get(constants.MODULE_INSTALLED, []): |
| return False |
| return ModuleInfo.has_test_config(info) |
| |
| @staticmethod |
| def is_mobly_module(info: Dict[str, Any]) -> bool: |
| """Check whether the module is a Mobly test. |
| |
| Note: Only python_test_host modules marked with a test_options tag of |
| "mobly" is considered a Mobly module. |
| |
| Args: |
| info: Dict of module info to check. |
| |
| Returns: |
| True if this is a Mobly test module, False otherwise. |
| """ |
| return constants.MOBLY_TEST_OPTIONS_TAG in info.get( |
| constants.MODULE_TEST_OPTIONS_TAGS, [] |
| ) |
| |
| def is_testable_module(self, info: Dict[str, Any]) -> bool: |
| """Check if module is something we can test. |
| |
| A module is testable if: |
| - it's a tradefed testable module, or |
| - it's a Mobly module, or |
| - it's a robolectric module (or shares path with one). |
| |
| Args: |
| info: Dict of module info to check. |
| |
| Returns: |
| True if we can test this module, False otherwise. |
| """ |
| return _is_testable_module( |
| self.name_to_module_info, self.path_to_module_info, info |
| ) |
| |
| @staticmethod |
| def has_test_config(info: Dict[str, Any]) -> bool: |
| """Validate if this module has a test config. |
| |
| A module can have a test config in the following manner: |
| - test_config be set in module-info.json. |
| - Auto-generated config via the auto_test_config key |
| in module-info.json. |
| |
| Args: |
| info: Dict of module info to check. |
| |
| Returns: |
| True if this module has a test config, False otherwise. |
| """ |
| return bool( |
| info.get(constants.MODULE_TEST_CONFIG, []) |
| or info.get('auto_test_config', []) |
| ) |
| |
| def is_legacy_robolectric_test(self, info: Dict[str, Any]) -> bool: |
| """Return whether the module_name is a legacy Robolectric test""" |
| return _is_legacy_robolectric_test( |
| self.name_to_module_info, self.path_to_module_info, info |
| ) |
| |
| def get_robolectric_test_name(self, info: Dict[str, Any]) -> str: |
| """Returns runnable robolectric module name. |
| |
| This method is for legacy robolectric tests and returns one of associated |
| modules. The pattern is determined by the amount of shards: |
| |
| 10 shards: |
| FooTests -> RunFooTests0, RunFooTests1 ... RunFooTests9 |
| No shard: |
| FooTests -> RunFooTests |
| |
| Arg: |
| info: Dict of module info to check. |
| |
| Returns: |
| String of the first-matched associated module that belongs to the |
| actual robolectric module, None if nothing has been found. |
| """ |
| return _get_robolectric_test_name( |
| self.name_to_module_info, self.path_to_module_info, info |
| ) |
| |
| def is_robolectric_test(self, module_name): |
| """Check if the given module is a robolectric test. |
| |
| Args: |
| module_name: String of module to check. |
| |
| Returns: |
| Boolean whether it's a robotest or not. |
| """ |
| if self.get_robolectric_type(module_name): |
| return True |
| return False |
| |
| def get_robolectric_type(self, module_name: str) -> int: |
| """Check if the given module is a robolectric test and return type of it. |
| |
| Robolectric declaration is converting from Android.mk to Android.bp, and |
| in the interim Atest needs to support testing both types of tests. |
| |
| The modern robolectric tests defined by 'android_robolectric_test' in an |
| Android.bp file can can be run in Tradefed Test Runner: |
| |
| SettingsRoboTests -> Tradefed Test Runner |
| |
| Legacy tests defined in an Android.mk can only run with the 'make' way. |
| |
| SettingsRoboTests -> make RunSettingsRoboTests0 |
| |
| To determine whether the test is a modern/legacy robolectric test: |
| 1. If the 'robolectric-test` in the compatibility_suites, it's a |
| modern one, otherwise it's a legacy test. This is accurate since |
| aosp/2308586 already set the test suite of `robolectric-test` |
| for all `modern` Robolectric tests in Soong. |
| 2. Traverse all modules share the module path. If one of the |
| modules has a ROBOLECTRIC class, it's a legacy robolectric test. |
| |
| Args: |
| module_name: String of module to check. |
| |
| Returns: |
| 0: not a robolectric test. |
| 1: a modern robolectric test(defined in Android.bp) |
| 2: a legacy robolectric test(defined in Android.mk) |
| """ |
| info = self.get_module_info(module_name) |
| if not info: |
| return 0 |
| # Some Modern mode Robolectric test has related module which compliant |
| # with the Legacy Robolectric test. In this case, the Modern mode |
| # Robolectric tests should be prior to the Legacy mode. |
| if self.is_modern_robolectric_test(info): |
| return constants.ROBOTYPE_MODERN |
| if self.is_legacy_robolectric_test(info): |
| return constants.ROBOTYPE_LEGACY |
| return 0 |
| |
| def get_instrumentation_target_apps(self, module_name: str) -> Dict: |
| """Return target APKs of an instrumentation test. |
| |
| Returns: |
| A dict of target module and target APK(s). e.g. |
| {"FooService": {"/path/to/the/FooService.apk"}} |
| """ |
| # 1. Determine the actual manifest filename from an Android.bp(if any) |
| manifest = self.get_filepath_from_module(module_name, 'AndroidManifest.xml') |
| bpfile = self.get_filepath_from_module(module_name, 'Android.bp') |
| if bpfile.is_file(): |
| bp_info = atest_utils.get_bp_content(bpfile, 'android_test') |
| if not bp_info or not bp_info.get(module_name): |
| return {} |
| manifest = self.get_filepath_from_module( |
| module_name, bp_info.get(module_name).get('manifest') |
| ) |
| xml_info = atest_utils.get_manifest_info(manifest) |
| # 2. Translate package name to a module name. |
| package = xml_info.get('package') |
| target_package = xml_info.get('target_package') |
| # Ensure it's an instrumentation test(excluding self-instrmented) |
| if target_package and package != target_package: |
| logging.debug('Found %s an instrumentation test.', module_name) |
| metrics.LocalDetectEvent( |
| detect_type=DetectType.FOUND_INSTRUMENTATION_TEST, result=1 |
| ) |
| target_module = self.get_target_module_by_pkg( |
| package=target_package, search_from=manifest.parent |
| ) |
| if target_module: |
| return self.get_artifact_map(target_module) |
| return {} |
| |
| # pylint: disable=anomalous-backslash-in-string |
| def get_target_module_by_pkg(self, package: str, search_from: Path) -> str: |
| """Translate package name to the target module name. |
| |
| This method is dedicated to determine the target module by translating |
| a package name. |
| |
| Phase 1: Find out possible manifest files among parent directories. |
| Phase 2. Look for the defined package fits the given name, and ensure |
| it is not a persistent app. |
| Phase 3: Translate the manifest path to possible modules. A valid module |
| must fulfill: |
| 1. The 'class' type must be ['APPS']. |
| 2. It is not a Robolectric test. |
| |
| Returns: |
| A string of module name. |
| """ |
| xmls = [] |
| for pth in search_from.parents: |
| if pth == Path(self.root_dir): |
| break |
| for name in os.listdir(pth): |
| if pth.joinpath(name).is_file(): |
| match = re.match('.*AndroidManifest.*\.xml$', name) |
| if match: |
| xmls.append(os.path.join(pth, name)) |
| possible_modules = [] |
| for xml in xmls: |
| rel_dir = str(Path(xml).relative_to(self.root_dir).parent) |
| logging.debug('Looking for package "%s" in %s...', package, xml) |
| xml_info = atest_utils.get_manifest_info(xml) |
| if xml_info.get('package') == package: |
| if xml_info.get('persistent'): |
| logging.debug('%s is a persistent app.', package) |
| continue |
| for _m in self.path_to_module_info.get(rel_dir): |
| possible_modules.append(_m) |
| if possible_modules: |
| for mod in possible_modules: |
| name = mod.get('module_name') |
| if mod.get('class') == ['APPS'] and not self.is_robolectric_test(name): |
| return name |
| return '' |
| |
| def get_artifact_map(self, module_name: str) -> Dict: |
| """Get the installed APK path of the given module.""" |
| target_mod_info = self.get_module_info(module_name) |
| artifact_map = {} |
| if target_mod_info: |
| apks = set() |
| artifacts = target_mod_info.get('installed') |
| for artifact in artifacts: |
| if Path(artifact).suffix == '.apk': |
| apks.add(os.path.join(self.root_dir, artifact)) |
| artifact_map.update({module_name: apks}) |
| return artifact_map |
| |
| def is_auto_gen_test_config(self, module_name): |
| """Check if the test config file will be generated automatically. |
| |
| Args: |
| module_name: A string of the module name. |
| |
| Returns: |
| True if the test config file will be generated automatically. |
| """ |
| if self.is_module(module_name): |
| mod_info = self.get_module_info(module_name) |
| auto_test_config = mod_info.get('auto_test_config', []) |
| return auto_test_config and auto_test_config[0] |
| return False |
| |
| @staticmethod |
| def is_legacy_robolectric_class(info: Dict[str, Any]) -> bool: |
| """Check if the class is `ROBOLECTRIC` |
| |
| This method is for legacy robolectric tests that the associated modules |
| contain: |
| 'class': ['ROBOLECTRIC'] |
| |
| Args: |
| info: ModuleInfo to check. |
| |
| Returns: |
| True if the attribute class in mod_info is ROBOLECTRIC, False |
| otherwise. |
| """ |
| if info: |
| module_classes = info.get(constants.MODULE_CLASS, []) |
| return ( |
| module_classes |
| and module_classes[0] == constants.MODULE_CLASS_ROBOLECTRIC |
| ) |
| return False |
| |
| def is_native_test(self, module_name): |
| """Check if the input module is a native test. |
| |
| Args: |
| module_name: A string of the module name. |
| |
| Returns: |
| True if the test is a native test, False otherwise. |
| """ |
| mod_info = self.get_module_info(module_name) |
| return constants.MODULE_CLASS_NATIVE_TESTS in mod_info.get( |
| constants.MODULE_CLASS, [] |
| ) |
| |
| def has_mainline_modules( |
| self, module_name: str, mainline_binaries: List[str] |
| ) -> bool: |
| """Check if the mainline modules are in module-info. |
| |
| Args: |
| module_name: A string of the module name. |
| mainline_binaries: A list of mainline module binaries. |
| |
| Returns: |
| True if mainline_binaries is in module-info, False otherwise. |
| """ |
| mod_info = self.get_module_info(module_name) |
| # Check 'test_mainline_modules' attribute of the module-info.json. |
| mm_in_mf = mod_info.get(constants.MODULE_MAINLINE_MODULES, []) |
| ml_modules_set = set(mainline_binaries) |
| if mm_in_mf: |
| return contains_same_mainline_modules(ml_modules_set, set(mm_in_mf)) |
| for test_config in mod_info.get(constants.MODULE_TEST_CONFIG, []): |
| # Check the value of 'mainline-param' in the test config. |
| if not self.is_auto_gen_test_config(module_name): |
| return contains_same_mainline_modules( |
| ml_modules_set, |
| atest_utils.get_mainline_param( |
| os.path.join(self.root_dir, test_config) |
| ), |
| ) |
| # Unable to verify mainline modules in an auto-gen test config. |
| logging.debug( |
| '%s is associated with an auto-generated test config.', module_name |
| ) |
| return True |
| return False |
| |
| def get_filepath_from_module(self, module_name: str, filename: str) -> Path: |
| """Return absolute path of the given module and filename.""" |
| mod_path = self.get_paths(module_name) |
| if mod_path: |
| return Path(self.root_dir).joinpath(mod_path[0], filename) |
| return Path() |
| |
| def get_module_dependency(self, module_name, depend_on=None): |
| """Get the dependency sets for input module. |
| |
| Recursively find all the dependencies of the input module. |
| |
| Args: |
| module_name: String of module to check. |
| depend_on: The list of parent dependencies. |
| |
| Returns: |
| Set of dependency modules. |
| """ |
| if not depend_on: |
| depend_on = set() |
| deps = set() |
| mod_info = self.get_module_info(module_name) |
| if not mod_info: |
| return deps |
| mod_deps = set(mod_info.get(constants.MODULE_DEPENDENCIES, [])) |
| # Remove item in deps if it already in depend_on: |
| mod_deps = mod_deps - depend_on |
| deps = deps.union(mod_deps) |
| for mod_dep in mod_deps: |
| deps = deps.union( |
| set( |
| self.get_module_dependency( |
| mod_dep, depend_on=depend_on.union(deps) |
| ) |
| ) |
| ) |
| return deps |
| |
| def get_install_module_dependency(self, module_name, depend_on=None): |
| """Get the dependency set for the given modules with installed path. |
| |
| Args: |
| module_name: String of module to check. |
| depend_on: The list of parent dependencies. |
| |
| Returns: |
| Set of dependency modules which has installed path. |
| """ |
| install_deps = set() |
| deps = self.get_module_dependency(module_name, depend_on) |
| logging.debug('%s depends on: %s', module_name, deps) |
| for module in deps: |
| mod_info = self.get_module_info(module) |
| if mod_info and mod_info.get(constants.MODULE_INSTALLED, []): |
| install_deps.add(module) |
| logging.debug( |
| 'modules %s required by %s were not installed', |
| install_deps, |
| module_name, |
| ) |
| return install_deps |
| |
| @staticmethod |
| def is_unit_test(mod_info): |
| """Return True if input module is unit test, False otherwise. |
| |
| Args: |
| mod_info: ModuleInfo to check. |
| |
| Returns: |
| True if input module is unit test, False otherwise. |
| """ |
| return mod_info.get(constants.MODULE_IS_UNIT_TEST, '') == 'true' |
| |
| def is_host_unit_test(self, info: Dict[str, Any]) -> bool: |
| """Return True if input module is host unit test, False otherwise. |
| |
| Args: |
| info: ModuleInfo to check. |
| |
| Returns: |
| True if input module is host unit test, False otherwise. |
| """ |
| return self.is_tradefed_testable_module( |
| info |
| ) and self.is_suite_in_compatibility_suites('host-unit-tests', info) |
| |
| def is_modern_robolectric_test(self, info: Dict[str, Any]) -> bool: |
| """Return whether 'robolectric-tests' is in 'compatibility_suites'.""" |
| return self.is_tradefed_testable_module( |
| info |
| ) and self.is_robolectric_test_suite(info) |
| |
| def is_robolectric_test_suite(self, mod_info) -> bool: |
| """Return True if 'robolectric-tests' in the compatibility_suites. |
| |
| Args: |
| mod_info: ModuleInfo to check. |
| |
| Returns: |
| True if the 'robolectric-tests' is in the compatibility_suites, |
| False otherwise. |
| """ |
| return self.is_suite_in_compatibility_suites('robolectric-tests', mod_info) |
| |
| def is_ravenwood_test(self, info: Dict[str, Any]) -> bool: |
| """Return whether 'ravenwood-tests' is in 'compatibility_suites'.""" |
| return self.is_tradefed_testable_module( |
| info |
| ) and self.is_ravenwood_test_suite(info) |
| |
| def is_ravenwood_test_suite(self, mod_info) -> bool: |
| """Return True if 'ravenwood-tests' in the compatibility_suites. |
| |
| Args: |
| mod_info: ModuleInfo to check. |
| |
| Returns: |
| True if the 'ravenwood-tests' is in the compatibility_suites, |
| False otherwise. |
| """ |
| return self.is_suite_in_compatibility_suites('ravenwood-tests', mod_info) |
| |
| def is_device_driven_test(self, mod_info): |
| """Return True if input module is device driven test, False otherwise. |
| |
| Args: |
| mod_info: ModuleInfo to check. |
| |
| Returns: |
| True if input module is device driven test, False otherwise. |
| """ |
| if self.is_robolectric_test_suite(mod_info): |
| return False |
| if self.is_ravenwood_test_suite(mod_info): |
| return False |
| |
| return self.is_tradefed_testable_module( |
| mod_info |
| ) and 'DEVICE' in mod_info.get(constants.MODULE_SUPPORTED_VARIANTS, []) |
| |
| def is_host_driven_test(self, mod_info): |
| """Return True if input module is host driven test, False otherwise. |
| |
| Args: |
| mod_info: ModuleInfo to check. |
| |
| Returns: |
| True if input module is host driven test, False otherwise. |
| """ |
| return self.is_tradefed_testable_module( |
| mod_info |
| ) and 'HOST' in mod_info.get(constants.MODULE_SUPPORTED_VARIANTS, []) |
| |
  def _any_module(self, _: Module) -> bool:
    """Predicate that accepts every module; default filter for _get_all_modules."""
    return True
| |
| def get_all_tests(self): |
| """Get a list of all the module names which are tests.""" |
| return self._get_all_modules(type_predicate=self.is_testable_module) |
| |
| def get_all_unit_tests(self): |
| """Get a list of all the module names which are unit tests.""" |
| return self._get_all_modules(type_predicate=ModuleInfo.is_unit_test) |
| |
| def get_all_host_unit_tests(self): |
| """Get a list of all the module names which are host unit tests.""" |
| return self._get_all_modules(type_predicate=self.is_host_unit_test) |
| |
| def get_all_device_driven_tests(self): |
| """Get a list of all the module names which are device driven tests.""" |
| return self._get_all_modules(type_predicate=self.is_device_driven_test) |
| |
| def _get_all_modules(self, type_predicate=None): |
| """Get a list of all the module names that passed the predicate.""" |
| modules = [] |
| type_predicate = type_predicate or self._any_module |
| for mod_name, mod_info in self.name_to_module_info.items(): |
| if mod_info.get(constants.MODULE_NAME, '') == mod_name: |
| if type_predicate(mod_info): |
| modules.append(mod_name) |
| return modules |
| |
| def get_modules_by_path_in_srcs( |
| self, path: str, testable_modules_only: bool = False |
| ) -> Set[str]: |
| """Get the module name that the given path belongs to.(in 'srcs') |
| |
| Args: |
| path: file path which is relative to ANDROID_BUILD_TOP. |
| testable_modules_only: boolean flag which determines whether search |
| testable modules only or not. |
| |
| Returns: |
| A set of string for matched module names, empty set if nothing find. |
| """ |
| modules = set() |
| |
| for mod_name in ( |
| self.get_testable_modules() |
| if testable_modules_only |
| else self.name_to_module_info.keys() |
| ): |
| m_info = self.get_module_info(mod_name) |
| if m_info: |
| for src in m_info.get(constants.MODULE_SRCS, []): |
| if src in path: |
| modules.add(mod_name) |
| |
| return modules |
| |
| def get_modules_by_path( |
| self, path: str, testable_modules_only: bool = False |
| ) -> set[str]: |
| """Get the module names that the give path belongs to. |
| |
| Args: |
| path: dir path for searching among `path` in module information. |
| testable_modules_only: boolean flag which determines whether search |
| testable modules only or not. |
| |
| Returns: |
| A set of module names. |
| """ |
| modules = set() |
| is_testable_module_fn = ( |
| self.is_testable_module if testable_modules_only else lambda _: True |
| ) |
| |
| m_infos = self.path_to_module_info.get(path) |
| if m_infos: |
| modules = { |
| info.get(constants.MODULE_NAME) |
| for info in m_infos |
| if is_testable_module_fn(info) |
| } |
| |
| return modules |
| |
| def get_modules_by_include_deps( |
| self, deps: Set[str], testable_module_only: bool = False |
| ) -> Set[str]: |
| """Get the matched module names for the input dependencies. |
| |
| Args: |
| deps: A set of string for dependencies. |
| testable_module_only: Option if only want to get testable module. |
| |
| Returns: |
| A set of matched module names for the input dependencies. |
| """ |
| modules = set() |
| |
| for mod_name in ( |
| self.get_testable_modules() |
| if testable_module_only |
| else self.name_to_module_info.keys() |
| ): |
| mod_info = self.get_module_info(mod_name) |
| if mod_info and deps.intersection( |
| set(mod_info.get(constants.MODULE_DEPENDENCIES, [])) |
| ): |
| modules.add(mod_info.get(constants.MODULE_NAME)) |
| return modules |
| |
| def get_installed_paths(self, module_name: str) -> List[Path]: |
| """Return installed path from module info.""" |
| mod_info = self.get_module_info(module_name) |
| if not mod_info: |
| return [] |
| |
| def _to_abs_path(p): |
| if os.path.isabs(p): |
| return Path(p) |
| return Path(os.getenv(constants.ANDROID_BUILD_TOP), p) |
| |
| return [_to_abs_path(p) for p in mod_info.get('installed', [])] |
| |
| def get_code_under_test(self, module_name: str) -> List[str]: |
| """Return code under test from module info.""" |
| mod_info = self.get_module_info(module_name) |
| if not mod_info: |
| atest_utils.colorful_print( |
| '\nmodule %s cannot be found in module info, skip generating' |
| ' coverage for it.' % module_name, |
| constants.YELLOW, |
| ) |
| return [] |
| |
| return mod_info.get('code_under_test', []) |
| |
  def build_variants(self, info: Dict[str, Any]) -> List[str]:
    """Return the module's supported build variants (e.g. 'HOST', 'DEVICE')."""
    return info.get(constants.MODULE_SUPPORTED_VARIANTS, [])
| |
| def requires_device(self, info: Dict[str, Any]) -> bool: |
| |
| if self.is_modern_robolectric_test(info): |
| return False |
| if self.is_ravenwood_test(info): |
| return False |
| if self.is_host_unit_test(info) and 'DEVICE' not in self.build_variants( |
| info |
| ): |
| return False |
| |
| return True |
| |
| |
def _create_db(data_map: Dict[str, Dict[str, Any]], db_path: Path):
  """Create a Sqlite DB by writing to tempfile and move it to the right place.

  Args:
    data_map: A dict where the key is table name and value is data itself.
    db_path: A Path pointing to the DB file.
  """
  if db_path.is_file():
    db_path.unlink()

  with tempfile.NamedTemporaryFile(delete=False) as tmp_db:
    _create_db_in_path(data_map, tmp_db.name)
  # Move only after the temp handle is closed: renaming/moving a file with
  # an open handle fails on some platforms (e.g. Windows).
  shutil.move(tmp_db.name, db_path)

  logging.debug('%s is created successfully.', db_path)
| |
| |
| def _create_db_in_path(data_map: Dict[str, Dict[str, Any]], db_path: Path): |
| """Create a Sqlite DB with multiple tables. |
| |
| Args: |
| data_map: A dict where the key is table name and value is data itself. |
| db_path: A Path pointing to the DB file. |
| """ |
| con = sqlite3.connect(db_path) |
| with con: |
| cur = con.cursor() |
| for table, contents in data_map.items(): |
| cur.execute(f'CREATE TABLE {table}(key TEXT PRIMARY KEY, value TEXT)') |
| |
| data = [] |
| for k, v in contents.items(): |
| data.append({'key': k, 'value': json.dumps(v)}) |
| cur.executemany(f'INSERT INTO {table} VALUES(:key, :value)', data) |
| |
| |
| def _create_json(data_map: Dict[str, Any], json_path: Path): |
| """Write content onto a JSON file. |
| |
| Args: |
| data_map: A dict where the key is table name and value is data itself. |
| json_path: A Path pointing to the JSON file. |
| """ |
| if json_path.is_file(): |
| json_path.unlink() |
| |
| with tempfile.NamedTemporaryFile(delete=False) as temp_json: |
| with open(temp_json.name, 'w', encoding='utf-8') as _temp: |
| json.dump(data_map, _temp, indent=0) |
| shutil.move(temp_json.name, json_path) |
| |
| logging.debug('%s is created successfully.', json_path) |
| |
| |
def _save_data_async(function: Callable, contents: Any, target_path: Path):
  """Save contents to a static file in asynchronized manner.

  Args:
    function: Serializer run in a child process; invoked as
      function(contents, target_path).
    contents: The data object handed to the serializer.
    target_path: Path of the output file.
  """
  # `daemon=False` makes sure that Atest doesn't exit before the child
  # process finishes writing the cache file.
  atest_utils.run_multi_proc(
      func=function, args=[contents, target_path], daemon=False
  )
| |
| |
def merge_soong_info(name_to_module_info, mod_bp_infos):
  """Merge the dependency and srcs in mod_bp_infos to name_to_module_info.

  Args:
    name_to_module_info: Dict of module name to module info dict.
    mod_bp_infos: Dict of module name to bp's module info dict.

  Returns:
    Dict of updated name_to_module_info.
  """
  merge_items = (
      constants.MODULE_DEPENDENCIES,
      constants.MODULE_SRCS,
      constants.MODULE_LIBS,
      constants.MODULE_STATIC_LIBS,
      constants.MODULE_STATIC_DEPS,
      constants.MODULE_PATH,
  )
  for module_name, dep_info in mod_bp_infos.items():
    mod_info = name_to_module_info.setdefault(module_name, {})
    for item in merge_items:
      combined = sorted(mod_info.get(item, []) + dep_info.get(item, []))
      # Deduplicate while keeping the sorted order.
      mod_info[item] = list(dict.fromkeys(combined))
  return name_to_module_info
| |
| |
def _add_missing_variant_modules(name_to_module_info: Dict[str, Module]):
  """Add entries keyed by declared module names for variant-suffixed modules.

  Android's build system automatically adds a suffix for some build module
  variants. For example, a module-info entry for a module originally named
  'HelloWorldTest' might appear as 'HelloWorldTest_32' and which Atest would
  not be able to find. We add such entries if not already present so they
  can be looked up using their declared module name.
  """
  additions = {}
  for variant_name, mod_info in name_to_module_info.items():
    declared_name = mod_info.get(constants.MODULE_NAME, variant_name)
    if declared_name in name_to_module_info:
      continue
    # First variant wins when several map to the same declared name.
    if declared_name not in additions:
      additions[declared_name] = mod_info

  name_to_module_info.update(additions)
| |
| |
def contains_same_mainline_modules(
    mainline_modules: Set[str], module_lists: Set[str]
):
  """Check whether a config module string matches the given mainline set.

  Args:
    mainline_modules: A set of mainline modules from the triggered test.
    module_lists: A set of concatenated ('+'-joined) mainline module
      strings from test configs.

  Returns:
    True if the set of mainline modules from the triggered test matches
    one of the test configs.
  """
  return any(
      set(module_string.split('+')) == mainline_modules
      for module_string in module_lists
  )
| |
| |
def get_path_to_module_info(name_to_module_info):
  """Return the path_to_module_info dict.

  Args:
    name_to_module_info: Dict of module name to module info dict.

  Returns:
    Dict of module path to module info dict.
  """
  path_to_module_info = {}
  for mod_name, mod_info in name_to_module_info.items():
    # Cross-compiled and multi-arch modules actually all belong to
    # a single target so filter out these extra modules.
    if mod_name != mod_info.get(constants.MODULE_NAME, ''):
      continue
    for path in mod_info.get(constants.MODULE_PATH, []):
      mod_info[constants.MODULE_NAME] = mod_name
      # There could be multiple modules in a path.
      path_to_module_info.setdefault(path, []).append(mod_info)
  return path_to_module_info
| |
| |
def _get_module_names(path_to_module_info, rel_module_path):
  """Get the modules that all have module_path.

  Args:
    path_to_module_info: Dict of path to module info.
    rel_module_path: path of module in module-info.json.

  Returns:
    List of module names.
  """
  names = []
  for mod_info in path_to_module_info.get(rel_module_path, []):
    names.append(mod_info.get(constants.MODULE_NAME))
  return names
| |
| |
def _get_robolectric_test_name(
    name_to_module_info: Dict[str, Dict],
    path_to_module_info: Dict[str, Dict],
    info: Dict[str, Any],
) -> str:
  """Returns runnable robolectric module name.

  This method is for legacy robolectric tests and returns one of associated
  modules. The pattern is determined by the amount of shards:

  10 shards:
      FooTests -> RunFooTests0, RunFooTests1 ... RunFooTests9
  No shard:
      FooTests -> RunFooTests

  Arg:
    name_to_module_info: Dict of name to module info.
    path_to_module_info: Dict of path to module info.
    info: Dict of module info to check.

  Returns:
    String of the first-matched associated module that belongs to the
    actual robolectric module, '' if nothing has been found.
  """
  if not info:
    return ''
  module_paths = info.get(constants.MODULE_PATH, [])
  if not module_paths:
    return ''
  # Only modules that share the first path and carry the 'Run' prefix are
  # candidates; return the first legacy robolectric class among them.
  for name in _get_module_names(path_to_module_info, module_paths[0]):
    if not name.startswith('Run'):
      continue
    if ModuleInfo.is_legacy_robolectric_class(name_to_module_info.get(name)):
      return name
  return ''
| |
| |
def _is_legacy_robolectric_test(
    name_to_module_info: Dict[str, Dict],
    path_to_module_info: Dict[str, Dict],
    info: Dict[str, Any],
) -> bool:
  """Return whether the module_name is a legacy Robolectric test."""
  # Tradefed-testable modules are modern tests, never legacy robolectric.
  if ModuleInfo.is_tradefed_testable_module(info):
    return False
  runner_name = _get_robolectric_test_name(
      name_to_module_info, path_to_module_info, info
  )
  return bool(runner_name)
| |
| |
def get_module_info_target() -> str:
  """Get module info target name for soong_ui.bash"""
  build_top = atest_utils.get_build_top()
  module_info_path = atest_utils.get_product_out(_MODULE_INFO)
  try:
    # Prefer a path relative to the source tree when OUT_DIR lives inside it.
    return str(module_info_path.relative_to(build_top))
  except ValueError:
    logging.debug('Found customized OUT_DIR!')
    return str(module_info_path)
| |
| |
def build():
  """Build module-info.json; exit the process when the build fails."""
  logging.debug(
      'Generating %s - this is required for initial runs or forced rebuilds.',
      _MODULE_INFO,
  )
  start = time.time()
  succeeded = atest_utils.build([get_module_info_target()])
  if not succeeded:
    sys.exit(ExitCode.BUILD_FAILURE)

  # Record how long the module-info build took.
  metrics.LocalDetectEvent(
      detect_type=DetectType.ONLY_BUILD_MODULE_INFO,
      result=int(time.time() - start),
  )
| |
| |
def _is_testable_module(
    name_to_module_info: Dict[str, Dict],
    path_to_module_info: Dict[str, Dict],
    info: Dict[str, Any],
) -> bool:
  """Check if module is something we can test.

  A module is testable if:
  - it's a tradefed testable module, or
  - it's a Mobly module, or
  - it's a robolectric module (or shares path with one).

  Args:
    name_to_module_info: Dict of name to module info.
    path_to_module_info: Dict of path to module info.
    info: Dict of module info to check.

  Returns:
    True if we can test this module, False otherwise.
  """
  if not info or not info.get(constants.MODULE_NAME):
    return False
  return (
      ModuleInfo.is_tradefed_testable_module(info)
      or ModuleInfo.is_mobly_module(info)
      or _is_legacy_robolectric_test(
          name_to_module_info, path_to_module_info, info
      )
  )
| |
| |
def _get_testable_modules(
    name_to_module_info: Dict[str, Dict],
    path_to_module_info: Dict[str, Dict],
    suite: str = None,
    index_path: Path = None,
):
  """Return testable modules of the given suite name."""
  return _filter_modules_by_suite(
      _get_suite_to_modules(
          name_to_module_info, path_to_module_info, index_path
      ),
      suite,
  )
| |
| |
def _get_suite_to_modules(
    name_to_module_info: Dict[str, Dict],
    path_to_module_info: Dict[str, Dict],
    index_path: Path = None,
) -> Dict[str, Set[str]]:
  """Map suite and its modules.

  Args:
    name_to_module_info: Dict of name to module info.
    path_to_module_info: Dict of path to module info.
    index_path: Path of the stored content.

  Returns:
    Dict of suite and testable modules mapping.
  """
  suite_to_modules = {}

  for info in name_to_module_info.values():
    if not _is_testable_module(name_to_module_info, path_to_module_info, info):
      continue
    testable_module = info.get(constants.MODULE_NAME)
    # Modules without any compatibility suite are bucketed as 'null-suite'.
    suites = info.get('compatibility_suites') or ['null-suite']
    for suite in suites:
      suite_to_modules.setdefault(suite, set()).add(testable_module)

  if index_path:
    _index_testable_modules(suite_to_modules, index_path)

  return suite_to_modules
| |
| |
| def _filter_modules_by_suite( |
| suite_to_modules: Dict[str, Set[str]], |
| suite: str = None, |
| ) -> Set[str]: |
| """Return modules of the given suite name.""" |
| if suite: |
| return suite_to_modules.get(suite) |
| |
| return {mod for mod_set in suite_to_modules.values() for mod in mod_set} |
| |
| |
| def _index_testable_modules(contents: Any, index_path: Path): |
| """Dump testable modules. |
| |
| Args: |
| content: An object that will be written to the index file. |
| index_path: Path to the saved index file. |
| """ |
| logging.debug( |
| r'Indexing testable modules... ' |
| r'(This is required whenever module-info.json ' |
| r'was rebuilt.)' |
| ) |
| index_path.parent.mkdir(parents=True, exist_ok=True) |
| with tempfile.NamedTemporaryFile(delete=False) as cache: |
| try: |
| pickle.dump(contents, cache, protocol=2) |
| shutil.move(cache.name, index_path) |
| logging.debug('%s is created successfully.', index_path) |
| except IOError: |
| atest_utils.print_and_log_error('Failed in dumping %s', cache) |
| os.remove(cache.name) |
| |
| |
class SqliteDict(collections.abc.Mapping):
  """A class that loads a Sqlite DB as a dictionary-like object.

  Each table row is (key TEXT PRIMARY KEY, value TEXT) where the value is
  JSON-encoded and decoded on access.

  Args:
    conn: A connection to the Sqlite database.
    table_name: A string the table name.
  """

  def __init__(self, conn: sqlite3.Connection, table_name: str):
    """Initialize the SqliteDict instance."""
    self.conn = conn
    self.table = table_name

  def __iter__(self) -> collections.abc.Iterator[str]:
    """Iterate over the keys in the SqliteDict."""
    for key_row in self._load_key_rows():
      yield key_row[0]

  def _load_key_rows(self) -> Set[Tuple[str]]:
    """Load the key rows (1-tuples) from the database table."""
    results = self.conn.execute(f'SELECT key FROM {self.table}').fetchall()
    return set(results)

  def __len__(self) -> int:
    """Get the size of key-value pairs in the SqliteDict."""
    # Let Sqlite count rows instead of materializing every key in Python.
    return self.conn.execute(f'SELECT COUNT(*) FROM {self.table}').fetchone()[0]

  def __getitem__(self, key) -> Dict[str, Any]:
    """Get the value associated with the specified key.

    Raises:
      KeyError: If the key does not exist in the table.
    """
    result = self.conn.execute(
        f'SELECT value FROM {self.table} WHERE key = ?', (key,)
    ).fetchone()
    if result:
      return json.loads(result[0])
    raise KeyError(f'Bad key: {key}')

  def items(self) -> collections.abc.Iterator[Tuple[str, Dict[str, Any]]]:
    """Iterate over the key-value pairs in the SqliteDict.

    Scans the table once instead of issuing one SELECT per key (the
    previous implementation was an N+1 query pattern).
    """
    for key, value in self.conn.execute(
        f'SELECT key, value FROM {self.table}'
    ):
      yield key, json.loads(value)