| #!/usr/bin/env python |
| # |
| # Copyright (C) 2020 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| """Unit tests for apexer.""" |
| |
| import hashlib |
| import json |
| import logging |
| import os |
| import shutil |
| import stat |
| import subprocess |
| import tempfile |
| import unittest |
| from importlib import resources |
| from zipfile import ZipFile |
| |
| from apex_manifest import ValidateApexManifest |
| from apex_manifest import ParseApexManifest |
| |
# Module-level logger used by the command-running helpers in this file.
logger = logging.getLogger(__name__)

# Base names of the prebuilt test APEXes. The tests append ".apex" to these
# to form the name of the packaged resource they load.
TEST_APEX = "com.android.example.apex"
TEST_APEX_LEGACY = "com.android.example-legacy.apex"
TEST_APEX_WITH_LOGGING_PARENT = "com.android.example-logging_parent.apex"
TEST_APEX_WITH_OVERRIDDEN_PACKAGE_NAME = "com.android.example-overridden_package_name.apex"

# Resource paths of the signing keys and the sample manifest. All of them live
# in the "testdata" directory, and the key files share a common base name.
_TESTDATA_DIR = "testdata"
_KEY_BASENAME = "com.android.example.apex"

TEST_PRIVATE_KEY = os.path.join(_TESTDATA_DIR, _KEY_BASENAME + ".pem")
TEST_X509_KEY = os.path.join(_TESTDATA_DIR, _KEY_BASENAME + ".x509.pem")
TEST_PK8_KEY = os.path.join(_TESTDATA_DIR, _KEY_BASENAME + ".pk8")
TEST_AVB_PUBLIC_KEY = os.path.join(_TESTDATA_DIR, _KEY_BASENAME + ".avbpubkey")
TEST_MANIFEST_JSON = os.path.join(_TESTDATA_DIR, "manifest.json")
| |
def run(args, verbose=None, **kwargs):
  """Starts a command and hands back the live process object.

  Args:
    args: The command line as a list of strings.
    verbose: When truthy, the command line is logged at INFO level.
    kwargs: Extra keyword arguments forwarded to subprocess.Popen(), such as
        env or stdin. When the caller sets neither stdout nor stderr, they
        default to subprocess.PIPE and subprocess.STDOUT respectively, so the
        two streams are captured together. universal_newlines defaults to
        True so that output is read as text.

  Returns:
    The subprocess.Popen object for the started command.
  """
  caller_redirects_output = 'stdout' in kwargs or 'stderr' in kwargs
  if not caller_redirects_output:
    kwargs['stdout'] = subprocess.PIPE
    kwargs['stderr'] = subprocess.STDOUT
  kwargs.setdefault('universal_newlines', True)

  # The command line is only joined when it is actually going to be shown.
  if DEBUG_TEST:
    print("\nRunning: \n%s\n" % " ".join(args))
  if verbose:
    logger.info(" Running: \"%s\"", " ".join(args))
  return subprocess.Popen(args, **kwargs)
| |
| |
def run_host_command(args, verbose=None, **kwargs):
  """Runs a host tool, preferring the copy under $ANDROID_BUILD_TOP.

  When ANDROID_BUILD_TOP is set (and non-empty) in the environment, the
  executable name args[0] is resolved against
  $ANDROID_BUILD_TOP/out/host/linux-x86/bin. Otherwise the command is run
  exactly as given.

  Args:
    args: The command as a sequence of strings; args[0] is the tool name. The
        caller's sequence is never modified.
    verbose: Forwarded to run_and_check_output().
    kwargs: Forwarded to run_and_check_output().

  Returns:
    Whatever run_and_check_output() returns for the command.
  """
  host_build_top = os.environ.get("ANDROID_BUILD_TOP")
  if host_build_top:
    host_command_dir = os.path.join(host_build_top, "out/host/linux-x86/bin")
    # Build a fresh list instead of assigning to args[0]: the caller keeps its
    # command untouched, and immutable sequences (tuples) are accepted too.
    args = [os.path.join(host_command_dir, args[0])] + list(args[1:])
  return run_and_check_output(args, verbose, **kwargs)
| |
| |
def run_and_check_output(args, verbose=None, **kwargs):
  """Runs a command to completion and returns what it printed.

  Args:
    args: The command line as a list of strings.
    verbose: When truthy, the captured output is logged at INFO level (and
        run() logs the command line itself).
    kwargs: Extra keyword arguments passed through run() to
        subprocess.Popen(), such as env or stdin.

  Returns:
    The captured output, or "" when nothing was captured (for example because
    the caller redirected stdout elsewhere).

  Raises:
    RuntimeError: The command exited with a non-zero status. The message
        carries the command, the exit code and the captured output.
  """
  process = run(args, verbose=verbose, **kwargs)
  captured = process.communicate()[0]
  if captured is None:
    captured = ""
  if verbose:
    logger.info("%s", captured.rstrip())
  if process.returncode == 0:
    return captured
  raise RuntimeError(
      "Failed to run command '{}' (exit code {}):\n{}".format(
          args, process.returncode, captured))
| |
| |
def get_sha1sum(file_path):
  """Returns the hex SHA-256 digest of a file's contents.

  Despite the function's name, the digest is computed with SHA-256, not
  SHA-1. The name is kept so that existing callers keep working.

  Args:
    file_path: Path of the file to hash.

  Returns:
    The digest as a lowercase hexadecimal string.
  """
  digest = hashlib.sha256()
  # Read in 64 KiB pieces. Hashing the file in hash-block-sized (64 byte)
  # reads yields the same digest but costs thousands of Python-level calls per
  # MiB of input.
  chunk_size = 64 * 1024

  with open(file_path, 'rb') as stream:
    while chunk := stream.read(chunk_size):
      digest.update(chunk)

  return digest.hexdigest()
| |
| |
def round_up(size, unit):
  """Rounds size up to the next multiple of unit.

  unit is expected to be a power of two; an assert checks that
  unit & (unit - 1) is zero.

  Args:
    size: The integer to round.
    unit: The alignment to round to.

  Returns:
    The smallest multiple of unit that is not less than size.
  """
  low_bits = unit - 1
  assert unit & low_bits == 0
  # Adding the low-bit mask carries into the next multiple unless size is
  # already aligned; clearing the low bits then drops the remainder.
  return (size + low_bits) & ~low_bits
| |
# Debugging switch. To investigate a test failure, set DEBUG_TEST to True and
# run the test from a local workstation, bypassing atest, e.g.:
#   $ m apexer_test && out/host/linux-x86/nativetest64/apexer_test/apexer_test
#
# With the switch on, the test prints every command it runs, keeps its
# temporary files and prints their paths. Compare, for example,
# /tmp/test_simple_apex_input_XXXXXXXX.apex with
# /tmp/test_simple_apex_repacked_YYYYYYYY.apex to find where they differ.
# A simple script to analyze the differences:
#
#   FILE_INPUT=/tmp/test_simple_apex_input_XXXXXXXX.apex
#   FILE_OUTPUT=/tmp/test_simple_apex_repacked_YYYYYYYY.apex
#
#   cd ~/tmp/
#   rm -rf input output
#   mkdir input output
#   unzip ${FILE_INPUT} -d input/
#   unzip ${FILE_OUTPUT} -d output/
#
#   diff -r input/ output/
#
# For inspecting binary differences, the vbindiff utility has been of some
# help.
DEBUG_TEST = False
| |
| |
class ApexerRebuildTest(unittest.TestCase):
  """Tests that apexer can reproduce prebuilt test APEXes.

  Most tests load a prebuilt APEX from the "apexer_test" package resources,
  unpack it, rebuild it (or only its payload) with apexer and compare file
  digests against the original. The host tools are extracted from the
  apexer_test_host_tools.zip resource in setUp().
  """

  # Host tools looked up in the "bin" directory of the extracted tools archive.
  _HOST_TOOL_NAMES = (
      "apexer", "deapexer", "avbtool", "mke2fs", "sefcontext_compile", "e2fsdroid",
      "resize2fs", "soong_zip", "aapt2", "merge_zips", "zipalign", "debugfs_static",
      "signapk.jar", "android.jar", "blkid", "fsck.erofs", "conv_apex_manifest")

  # Environment variables this fixture overwrites; tearDown() puts them back.
  _MANAGED_ENV_VARS = ("PATH", "LD_LIBRARY_PATH", "APEXER_TOOL_PATH")

  def setUp(self):
    """Snapshots the environment and extracts the host tools."""
    self._to_cleanup = []
    # Remember the values before _get_host_tools() / _run_apexer() overwrite
    # them, so that they do not keep growing from one test to the next.
    self._saved_env = {name: os.environ.get(name) for name in self._MANAGED_ENV_VARS}
    self._get_host_tools()

  def tearDown(self):
    """Restores the environment and deletes temporary files.

    When DEBUG_TEST is set, the temporary files are kept and their paths are
    printed instead.
    """
    self._restore_environment()
    if DEBUG_TEST:
      print(self._to_cleanup)
      return
    for path in self._to_cleanup:
      if os.path.isdir(path):
        shutil.rmtree(path, ignore_errors=True)
      else:
        # Best effort, like the rmtree() above: a file that is already gone
        # must not abort the rest of the cleanup.
        try:
          os.remove(path)
        except FileNotFoundError:
          pass
    del self._to_cleanup[:]

  def _restore_environment(self):
    """Puts back the environment variables captured in setUp()."""
    for name, value in self._saved_env.items():
      if value is None:
        os.environ.pop(name, None)
      else:
        os.environ[name] = value

  def _get_host_tools(self):
    """Extracts the host tools archive and points the environment at it.

    Sets self.host_tools (tool name -> extracted path, or the bare name when
    the archive does not contain the tool) and self.host_tools_path (the
    extracted "bin" directory). Prepends that directory to PATH and the
    extracted "lib64" directory to LD_LIBRARY_PATH.
    """
    dir_name = tempfile.mkdtemp(prefix=self._testMethodName+"_host_tools_")
    self._to_cleanup.append(dir_name)
    archive = resources.files("apexer_test").joinpath("apexer_test_host_tools.zip")
    with archive.open('rb') as f:
      with ZipFile(f, 'r') as zip_obj:
        zip_obj.extractall(path=dir_name)

    bin_dir = os.path.join(dir_name, "bin")
    files = {}
    for tool in self._HOST_TOOL_NAMES:
      file_path = os.path.join(bin_dir, tool)
      if os.path.exists(file_path):
        # Make the extracted tool readable and executable by the owner.
        os.chmod(file_path, stat.S_IRUSR | stat.S_IXUSR)
        files[tool] = file_path
      else:
        # Not bundled: fall back to the bare name and let PATH resolve it.
        files[tool] = tool
    self.host_tools = files
    self.host_tools_path = bin_dir

    path = self.host_tools_path
    if "PATH" in os.environ:
      path += ":" + os.environ["PATH"]
    os.environ["PATH"] = path

    ld_library_path = os.path.join(dir_name, "lib64")
    if "LD_LIBRARY_PATH" in os.environ:
      ld_library_path += ":" + os.environ["LD_LIBRARY_PATH"]
    if "ANDROID_HOST_OUT" in os.environ:
      ld_library_path += ":" + os.path.join(os.environ["ANDROID_HOST_OUT"], "lib64")
    os.environ["LD_LIBRARY_PATH"] = ld_library_path

  def _extract_resource(self, resource_name):
    """Copies a packaged resource into a temporary file.

    Args:
      resource_name: Name of the resource inside the "apexer_test" package.

    Returns:
      The path of the temporary copy, which is registered for cleanup.
    """
    with (
      resources.files("apexer_test").joinpath(resource_name).open('rb') as f,
      tempfile.NamedTemporaryFile(prefix=resource_name.replace('/', '_'), delete=False) as f2,
    ):
      self._to_cleanup.append(f2.name)
      shutil.copyfileobj(f, f2)
      return f2.name

  def _get_container_files(self, apex_file_path):
    """Unzips an APEX container and indexes its well-known entries.

    Fails the test if apex_manifest.pb, apex_build_info.pb or a payload image
    is missing from the container.

    Args:
      apex_file_path: Path of the .apex file to unzip.

    Returns:
      A dict mapping each entry found (out of apex_manifest.json,
      apex_manifest.pb, apex_build_info.pb, assets, apex_payload.img and
      apex_payload.zip) to its extracted path, plus an "apex_payload" key
      pointing at the payload image (the .img if present, else the .zip).
    """
    dir_name = tempfile.mkdtemp(prefix=self._testMethodName+"_container_files_")
    self._to_cleanup.append(dir_name)
    with ZipFile(apex_file_path, 'r') as zip_obj:
      zip_obj.extractall(path=dir_name)
    files = {}
    for entry in ["apex_manifest.json", "apex_manifest.pb",
                  "apex_build_info.pb", "assets",
                  "apex_payload.img", "apex_payload.zip"]:
      file_path = os.path.join(dir_name, entry)
      if os.path.exists(file_path):
        files[entry] = file_path
    self.assertIn("apex_manifest.pb", files)
    self.assertIn("apex_build_info.pb", files)

    image_file = files.get("apex_payload.img", files.get("apex_payload.zip"))
    self.assertIsNotNone(image_file)
    files["apex_payload"] = image_file

    return files

  def _remove_generated_payload_files(self, dir_name):
    """Removes payload files added by apexer and e2fs tools from dir_name."""
    for name in ["apex_manifest.json", "apex_manifest.pb"]:
      generated = os.path.join(dir_name, name)
      if os.path.exists(generated):
        os.remove(generated)
    lost_found = os.path.join(dir_name, "lost+found")
    if os.path.isdir(lost_found):
      shutil.rmtree(lost_found)

  def _extract_payload_from_img(self, img_file_path):
    """Dumps the contents of a payload image with debugfs_static.

    Args:
      img_file_path: Path of the payload image.

    Returns:
      The directory holding the dumped files, with the files added by apexer
      and e2fs tools removed.
    """
    dir_name = tempfile.mkdtemp(prefix=self._testMethodName+"_extracted_payload_")
    self._to_cleanup.append(dir_name)
    cmd = ["debugfs_static", '-R', 'rdump ./ %s' % dir_name, img_file_path]
    run_host_command(cmd)

    self._remove_generated_payload_files(dir_name)
    return dir_name

  def _extract_payload(self, apex_file_path):
    """Extracts the payload of an APEX file with deapexer.

    Args:
      apex_file_path: Path of the .apex file.

    Returns:
      The directory holding the extracted files, with the files added by
      apexer and e2fs tools removed.
    """
    dir_name = tempfile.mkdtemp(prefix=self._testMethodName+"_extracted_payload_")
    self._to_cleanup.append(dir_name)
    cmd = ["deapexer", "--debugfs_path", self.host_tools["debugfs_static"],
           "--blkid_path", self.host_tools["blkid"], "--fsckerofs_path",
           self.host_tools["fsck.erofs"], "extract", apex_file_path, dir_name]
    run_host_command(cmd)

    self._remove_generated_payload_files(dir_name)
    return dir_name

  def _run_apexer(self, container_files, payload_dir, args=()):
    """Rebuilds an APEX (or only its payload) from extracted pieces.

    Args:
      container_files: The dict returned by _get_container_files().
      payload_dir: Directory with the payload contents to package.
      args: Extra apexer flags. "--payload_only" and
          "--unsigned_payload_only" additionally change the output suffix to
          ".payload" and drop the assets directory; the latter also drops the
          signing key arguments.

    Returns:
      The path of the file written by apexer, registered for cleanup.
    """
    unsigned_payload_only = "--unsigned_payload_only" in args
    payload_only = unsigned_payload_only or "--payload_only" in args

    os.environ["APEXER_TOOL_PATH"] = (self.host_tools_path +
        ":out/host/linux-x86/bin:prebuilts/sdk/tools/linux/bin")
    cmd = ["apexer", "--force", "--include_build_info", "--do_not_check_keyname"]
    if DEBUG_TEST:
      cmd.append('-v')
    cmd.extend(["--apexer_tool_path", os.environ["APEXER_TOOL_PATH"]])
    cmd.extend(["--android_jar_path", self.host_tools["android.jar"]])
    cmd.extend(["--manifest", container_files["apex_manifest.pb"]])
    if "apex_manifest.json" in container_files:
      cmd.extend(["--manifest_json", container_files["apex_manifest.json"]])
    cmd.extend(["--build_info", container_files["apex_build_info.pb"]])
    if not payload_only and "assets" in container_files:
      cmd.extend(["--assets_dir", container_files["assets"]])
    if not unsigned_payload_only:
      cmd.extend(["--key", self._extract_resource(TEST_PRIVATE_KEY)])
      cmd.extend(["--pubkey", self._extract_resource(TEST_AVB_PUBLIC_KEY)])
    cmd.extend(args)

    # Decide on output file name
    apex_suffix = ".payload" if payload_only else ".apex.unsigned"
    fd, fn = tempfile.mkstemp(prefix=self._testMethodName+"_repacked_", suffix=apex_suffix)
    os.close(fd)
    self._to_cleanup.append(fn)
    cmd.extend([payload_dir, fn])

    run_host_command(cmd)
    return fn

  def _get_java_toolchain(self):
    """Picks the java binary and the native library path for signapk.

    The binary is the first match among: the jdk17 prebuilt relative to the
    current directory, /jdk/jdk17, $ANDROID_JAVA_TOOLCHAIN,
    $ANDROID_JAVA_HOME, $JAVA_HOME, and finally plain "java".

    Returns:
      A two-element list: the java executable, and a colon-separated library
      path built from LD_LIBRARY_PATH plus the host lib64 directories.
    """
    java_toolchain = "java"
    if os.path.isfile("prebuilts/jdk/jdk17/linux-x86/bin/java"):
      java_toolchain = "prebuilts/jdk/jdk17/linux-x86/bin/java"
    elif os.path.isfile("/jdk/jdk17/linux-x86/bin/java"):
      java_toolchain = "/jdk/jdk17/linux-x86/bin/java"
    elif "ANDROID_JAVA_TOOLCHAIN" in os.environ:
      java_toolchain = os.path.join(os.environ["ANDROID_JAVA_TOOLCHAIN"], "java")
    elif "ANDROID_JAVA_HOME" in os.environ:
      java_toolchain = os.path.join(os.environ["ANDROID_JAVA_HOME"], "bin", "java")
    elif "JAVA_HOME" in os.environ:
      java_toolchain = os.path.join(os.environ["JAVA_HOME"], "bin", "java")

    java_dep_lib = os.environ["LD_LIBRARY_PATH"]
    if "ANDROID_HOST_OUT" in os.environ:
      java_dep_lib += ":" + os.path.join(os.environ["ANDROID_HOST_OUT"], "lib64")
    if "ANDROID_BUILD_TOP" in os.environ:
      java_dep_lib += ":" + os.path.join(os.environ["ANDROID_BUILD_TOP"],
                                         "out/host/linux-x86/lib64")

    return [java_toolchain, java_dep_lib]

  def _sign_apk_container(self, unsigned_apex):
    """Signs an unsigned APEX container with signapk.jar.

    Args:
      unsigned_apex: Path of the unsigned .apex file.

    Returns:
      The path of the signed .apex file, registered for cleanup.
    """
    fd, fn = tempfile.mkstemp(prefix=self._testMethodName+"_repacked_", suffix=".apex")
    os.close(fd)
    self._to_cleanup.append(fn)
    java_toolchain, java_dep_lib = self._get_java_toolchain()
    cmd = [
        java_toolchain,
        "-Djava.library.path=" + java_dep_lib,
        "-jar", self.host_tools['signapk.jar'],
        "-a", "4096", "--align-file-size",
        self._extract_resource(TEST_X509_KEY),
        self._extract_resource(TEST_PK8_KEY),
        unsigned_apex, fn]
    run_and_check_output(cmd)
    return fn

  def _sign_payload(self, container_files, unsigned_payload):
    """Signs a copy of an unsigned payload image with avbtool.

    Args:
      container_files: The dict returned by _get_container_files(); its
          apex_manifest.pb supplies the key name property and the salt.
      unsigned_payload: Path of the unsigned payload image. It is left
          untouched; a copy is signed.

    Returns:
      The path of the signed copy, registered for cleanup.
    """
    fd, signed_payload = \
      tempfile.mkstemp(prefix=self._testMethodName+"_repacked_", suffix=".payload")
    os.close(fd)
    self._to_cleanup.append(signed_payload)
    shutil.copyfile(unsigned_payload, signed_payload)

    cmd = ['avbtool']
    cmd.append('add_hashtree_footer')
    cmd.append('--do_not_generate_fec')
    cmd.extend(['--algorithm', 'SHA256_RSA4096'])
    cmd.extend(['--hash_algorithm', 'sha256'])
    cmd.extend(['--key', self._extract_resource(TEST_PRIVATE_KEY)])
    manifest_apex = ParseApexManifest(container_files["apex_manifest.pb"])
    ValidateApexManifest(manifest_apex)
    cmd.extend(['--prop', 'apex.key:' + manifest_apex.name])
    # Set up the salt based on manifest content which includes name
    # and version
    salt = hashlib.sha256(manifest_apex.SerializeToString()).hexdigest()
    cmd.extend(['--salt', salt])
    cmd.extend(['--image', signed_payload])
    cmd.append('--no_hashtree')
    run_and_check_output(cmd)

    return signed_payload

  def _verify_payload(self, payload):
    """Verifies that the payload is properly signed by avbtool"""
    cmd = ["avbtool", "verify_image", "--image", payload, "--accept_zeroed_hashtree"]
    run_and_check_output(cmd)

  def _run_build_test(self, apex_name):
    """Rebuilds and re-signs a prebuilt APEX and expects an identical file.

    Args:
      apex_name: Base name of the APEX resource; ".apex" is appended to it.
    """
    apex_file_path = self._extract_resource(apex_name + ".apex")
    if DEBUG_TEST:
      # Keep a copy of the input under a recognizable name for manual diffing.
      fd, fn = tempfile.mkstemp(prefix=self._testMethodName+"_input_", suffix=".apex")
      os.close(fd)
      shutil.copyfile(apex_file_path, fn)
      self._to_cleanup.append(fn)
    container_files = self._get_container_files(apex_file_path)
    payload_dir = self._extract_payload(apex_file_path)
    repack_apex_file_path = self._run_apexer(container_files, payload_dir)
    resigned_apex_file_path = self._sign_apk_container(repack_apex_file_path)
    self.assertEqual(get_sha1sum(apex_file_path), get_sha1sum(resigned_apex_file_path))

  def test_simple_apex(self):
    self._run_build_test(TEST_APEX)

  def test_legacy_apex(self):
    self._run_build_test(TEST_APEX_LEGACY)

  def test_output_payload_only(self):
    """Assert that payload-only output from apexer is same as the payload we get by unzipping
    apex.
    """
    apex_file_path = self._extract_resource(TEST_APEX + ".apex")
    container_files = self._get_container_files(apex_file_path)
    payload_dir = self._extract_payload(apex_file_path)
    payload_only_file_path = self._run_apexer(container_files, payload_dir, ["--payload_only"])
    self._verify_payload(payload_only_file_path)
    self.assertEqual(get_sha1sum(payload_only_file_path),
                     get_sha1sum(container_files["apex_payload"]))

  def test_output_unsigned_payload_only(self):
    """Assert that when unsigned-payload-only output from apexer is signed by the avb key, it is
    same as the payload we get by unzipping apex.
    """
    apex_file_path = self._extract_resource(TEST_APEX + ".apex")
    container_files = self._get_container_files(apex_file_path)
    payload_dir = self._extract_payload(apex_file_path)
    unsigned_payload_only_file_path = self._run_apexer(container_files, payload_dir,
                                                       ["--unsigned_payload_only"])
    with self.assertRaises(RuntimeError) as error:
      self._verify_payload(unsigned_payload_only_file_path)
    self.assertIn("Given image does not look like a vbmeta image", str(error.exception))
    signed_payload = self._sign_payload(container_files, unsigned_payload_only_file_path)
    self.assertEqual(get_sha1sum(signed_payload),
                     get_sha1sum(container_files["apex_payload"]))

    # Now assert that given an unsigned image and the original container
    # files, we can produce an identical unsigned image.
    unsigned_payload_dir = self._extract_payload_from_img(unsigned_payload_only_file_path)
    unsigned_payload_only_2_file_path = self._run_apexer(container_files, unsigned_payload_dir,
                                                         ["--unsigned_payload_only"])
    self.assertEqual(get_sha1sum(unsigned_payload_only_file_path),
                     get_sha1sum(unsigned_payload_only_2_file_path))

  def test_apex_with_logging_parent(self):
    self._run_build_test(TEST_APEX_WITH_LOGGING_PARENT)

  def test_apex_with_overridden_package_name(self):
    self._run_build_test(TEST_APEX_WITH_OVERRIDDEN_PACKAGE_NAME)

  def test_conv_apex_manifest(self):
    """Checks conv_apex_manifest's "proto" and "setprop" subcommands."""
    # .pb generation from json
    manifest_json_path = self._extract_resource(TEST_MANIFEST_JSON)

    fd, fn = tempfile.mkstemp(prefix=self._testMethodName + "_manifest_", suffix=".pb")
    os.close(fd)
    self._to_cleanup.append(fn)
    cmd = [
        "conv_apex_manifest",
        "proto",
        manifest_json_path,
        "-o", fn]
    run_and_check_output(cmd)

    with open(manifest_json_path) as fd_json:
      manifest_json = json.load(fd_json)
    manifest_apex = ParseApexManifest(fn)
    ValidateApexManifest(manifest_apex)

    self.assertEqual(manifest_apex.name, manifest_json["name"])
    self.assertEqual(manifest_apex.version, manifest_json["version"])

    # setprop check on already generated .pb
    next_version = 20
    cmd = [
        "conv_apex_manifest",
        "setprop",
        "version", str(next_version),
        fn]
    run_and_check_output(cmd)

    manifest_apex = ParseApexManifest(fn)
    ValidateApexManifest(manifest_apex)

    self.assertEqual(manifest_apex.version, next_version)
| |
| |
| |
# Run the tests with per-test output when this file is executed as a script.
if __name__ == "__main__":
  unittest.main(verbosity=2)