| #!/usr/bin/env python3 |
| # |
| # Copyright 2022, The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| """Integration tests for the Atest Bazel mode feature.""" |
| |
| # pylint: disable=invalid-name |
| # pylint: disable=missing-class-docstring |
| # pylint: disable=missing-function-docstring |
| |
| import json |
| import os |
| import re |
| import shutil |
| import subprocess |
| import tempfile |
| import unittest |
| |
| from pathlib import Path |
| from typing import Any, Dict |
| |
| |
| class ResultCompareTest(unittest.TestCase): |
| |
| def setUp(self): |
| self.src_root_path = Path(os.environ['ANDROID_BUILD_TOP']) |
| self.out_dir_path = Path(tempfile.mkdtemp()) |
| self.test_env = self.setup_test_env() |
| |
| def tearDown(self): |
| shutil.rmtree(self.out_dir_path) |
| |
| def test_standard_mode_and_bazel_mode_result_equal(self): |
| standard_mode_result = self.get_test_result( |
| shell_cmd='atest -c -m --host --host-unit-test-only') |
| |
| bazel_mode_result = self.get_test_result( |
| shell_cmd=('atest -c --bazel-mode --host --host-unit-test-only ' |
| '--bazel-arg=--test_timeout=300'), is_bazel_mode=True) |
| |
| self.assert_test_result_equal(standard_mode_result, bazel_mode_result) |
| |
| def setup_test_env(self) -> Dict[str, Any]: |
| test_env = { |
| 'PATH': os.environ['PATH'], |
| 'HOME': os.environ['HOME'], |
| 'OUT_DIR': str(self.out_dir_path), |
| } |
| return test_env |
| |
| def get_test_result( |
| self, |
| shell_cmd: str, |
| is_bazel_mode: bool = False, |
| ) -> Dict[str, str]: |
| result_file_name = 'test_result' |
| if is_bazel_mode: |
| shell_cmd = ( |
| f'{shell_cmd} ' |
| f'--bazel-arg=--build_event_json_file={result_file_name}') |
| |
| completed_process = self.run_shell_command(shell_cmd) |
| result_file_path = self.get_result_file_path( |
| completed_process, result_file_name, is_bazel_mode) |
| |
| if is_bazel_mode: |
| return parse_bazel_result(result_file_path) |
| return parse_standard_result(result_file_path) |
| |
| def get_result_file_path( |
| self, |
| completed_process: subprocess.CompletedProcess, |
| result_file_name: str, |
| is_bazel_mode: bool = False, |
| ) -> Path: |
| if is_bazel_mode: |
| return self.out_dir_path.joinpath('atest_bazel_workspace', |
| result_file_name) |
| |
| result_file_path = None |
| for line in completed_process.stdout.decode().splitlines(): |
| if line.startswith('Test Logs have saved in'): |
| result_file_path = Path( |
| re.sub('Test Logs have saved in ', "", line).replace( |
| 'log',result_file_name)) |
| break |
| |
| if not result_file_path: |
| raise Exception('Could not find test result filepath') |
| |
| return result_file_path |
| |
| def run_shell_command( |
| self, |
| shell_command: str, |
| ) -> subprocess.CompletedProcess: |
| return subprocess.run( |
| '. build/envsetup.sh && ' |
| 'lunch aosp_cf_x86_64_pc-userdebug && ' |
| f'{shell_command}', |
| env=self.test_env, |
| cwd=self.src_root_path, |
| shell=True, |
| check=False, |
| stderr=subprocess.STDOUT, |
| stdout=subprocess.PIPE) |
| |
| def assert_test_result_equal(self, result1, result2): |
| self.assertEqual(set(result1.keys()), set(result2.keys())) |
| |
| print('{0:100} {1:20} {2}'.format( |
| 'Test', 'Atest Standard Mode', 'Atest Bazel Mode')) |
| count = 0 |
| for k, v in result1.items(): |
| if v != result2[k]: |
| count +=1 |
| print('{0:100} {1:20} {2}'.format(k, v, result2[k])) |
| print(f'Total Number of Host Unit Test: {len(result1)}. {count} tests ' |
| 'have different results.') |
| |
| self.assertEqual(count, 0) |
| |
| |
| def parse_standard_result(result_file: Path) -> Dict[str, str]: |
| result = {} |
| with result_file.open('r') as f: |
| json_result = json.loads(f.read()) |
| for k, v in json_result['test_runner'][ |
| 'AtestTradefedTestRunner'].items(): |
| name = k.split()[-1] |
| if name in result: |
| raise Exception(f'Duplicated Test Target: `{name}`') |
| |
| # Test passed when there are no failed test cases and no errors. |
| result[name] = 'PASSED' if v['summary'][ |
| 'FAILED'] == 0 and not v.get('ERROR') else 'FAILED' |
| return result |
| |
| |
| def parse_bazel_result(result_file: Path) -> Dict[str, str]: |
| result = {} |
| with result_file.open('r') as f: |
| content = f.read() |
| events = content.splitlines() |
| |
| for e in events: |
| json_event = json.loads(e) |
| if 'testSummary' in json_event['id']: |
| name = json_event['id']['testSummary'][ |
| 'label'].split(':')[-1].removesuffix('_host') |
| result[name] = json_event['testSummary']['overallStatus'] |
| return result |
| |
| |
| if __name__ == '__main__': |
| unittest.main(verbosity=2) |