blob: 176b994c618f2fd98c660e25160a111eb5e9279f [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright (C) 2020 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Unit tests for apexer."""
import hashlib
import logging
import os
import re
import shutil
import subprocess
import tempfile
import unittest
from zipfile import ZipFile
from apex_manifest import ValidateApexManifest
logger = logging.getLogger(__name__)
TEST_APEX = "com.android.example.apex"
TEST_APEX_LEGACY = "com.android.example-legacy.apex"
TEST_APEX_WITH_LOGGING_PARENT = "com.android.example-logging_parent.apex"
TEST_APEX_WITH_OVERRIDDEN_PACKAGE_NAME = "com.android.example-overridden_package_name.apex"
TEST_PRIVATE_KEY = os.path.join("testdata", "com.android.example.apex.pem")
TEST_X509_KEY = os.path.join("testdata", "com.android.example.apex.x509.pem")
TEST_PK8_KEY = os.path.join("testdata", "com.android.example.apex.pk8")
TEST_AVB_PUBLIC_KEY = os.path.join("testdata", "com.android.example.apex.avbpubkey")
def run(args, verbose=None, **kwargs):
"""Creates and returns a subprocess.Popen object.
Args:
args: The command represented as a list of strings.
verbose: Whether the commands should be shown. Default to the global
verbosity if unspecified.
kwargs: Any additional args to be passed to subprocess.Popen(), such as env,
stdin, etc. stdout and stderr will default to subprocess.PIPE and
subprocess.STDOUT respectively unless caller specifies any of them.
universal_newlines will default to True, as most of the users in
releasetools expect string output.
Returns:
A subprocess.Popen object.
"""
if 'stdout' not in kwargs and 'stderr' not in kwargs:
kwargs['stdout'] = subprocess.PIPE
kwargs['stderr'] = subprocess.STDOUT
if 'universal_newlines' not in kwargs:
kwargs['universal_newlines'] = True
# Don't log any if caller explicitly says so.
if DEBUG_TEST:
print("\nRunning: \n%s\n" % " ".join(args))
if verbose:
logger.info(" Running: \"%s\"", " ".join(args))
return subprocess.Popen(args, **kwargs)
def run_host_command(args, verbose=None, **kwargs):
host_build_top = os.environ.get("ANDROID_BUILD_TOP")
if host_build_top:
host_command_dir = os.path.join(host_build_top, "out/soong/host/linux-x86/bin")
args[0] = os.path.join(host_command_dir, args[0])
return run_and_check_output(args, verbose, **kwargs)
def run_and_check_output(args, verbose=None, **kwargs):
"""Runs the given command and returns the output.
Args:
args: The command represented as a list of strings.
verbose: Whether the commands should be shown. Default to the global
verbosity if unspecified.
kwargs: Any additional args to be passed to subprocess.Popen(), such as env,
stdin, etc. stdout and stderr will default to subprocess.PIPE and
subprocess.STDOUT respectively unless caller specifies any of them.
Returns:
The output string.
Raises:
ExternalError: On non-zero exit from the command.
"""
proc = run(args, verbose=verbose, **kwargs)
output, _ = proc.communicate()
if output is None:
output = ""
# Don't log any if caller explicitly says so.
if verbose:
logger.info("%s", output.rstrip())
if proc.returncode != 0:
raise RuntimeError(
"Failed to run command '{}' (exit code {}):\n{}".format(
args, proc.returncode, output))
return output
def get_sha1sum(file_path):
h = hashlib.sha256()
with open(file_path, 'rb') as file:
while True:
# Reading is buffered, so we can read smaller chunks.
chunk = file.read(h.block_size)
if not chunk:
break
h.update(chunk)
return h.hexdigest()
def get_current_dir():
"""Returns the current dir, relative to the script dir."""
# The script dir is the one we want, which could be different from pwd.
current_dir = os.path.dirname(os.path.realpath(__file__))
return current_dir
def round_up(size, unit):
assert unit & (unit - 1) == 0
return (size + unit - 1) & (~(unit - 1))
# In order to debug test failures, set DEBUG_TEST to True and run the test from
# local workstation bypassing atest, e.g.:
# $ m apexer_test && out/host/linux-x86/nativetest64/apexer_test/apexer_test
#
# the test will print out the command used, and the temporary files used by the
# test. You need to compare e.g. /tmp/test_simple_apex_input_XXXXXXXX.apex with
# /tmp/test_simple_apex_repacked_YYYYYYYY.apex to check where they are
# different.
# A simple script to analyze the differences:
#
# FILE_INPUT=/tmp/test_simple_apex_input_XXXXXXXX.apex
# FILE_OUTPUT=/tmp/test_simple_apex_repacked_YYYYYYYY.apex
#
# cd ~/tmp/
# rm -rf input output
# mkdir input output
# unzip ${FILE_INPUT} -d input/
# unzip ${FILE_OUTPUT} -d output/
#
# diff -r input/ output/
#
# For analyzing binary diffs I had mild success using the vbindiff utility.
DEBUG_TEST = False
class ApexerRebuildTest(unittest.TestCase):
def setUp(self):
self._to_cleanup = []
def tearDown(self):
if not DEBUG_TEST:
for i in self._to_cleanup:
if os.path.isdir(i):
shutil.rmtree(i, ignore_errors=True)
else:
os.remove(i)
del self._to_cleanup[:]
else:
print(self._to_cleanup)
def _get_container_files(self, apex_file_path):
dir_name = tempfile.mkdtemp(prefix=self._testMethodName+"_container_files_")
self._to_cleanup.append(dir_name)
with ZipFile(apex_file_path, 'r') as zip_obj:
zip_obj.extractall(path=dir_name)
files = {}
for i in ["apex_manifest.json", "apex_manifest.pb",
"apex_build_info.pb", "assets",
"apex_payload.img", "apex_payload.zip"]:
file_path = os.path.join(dir_name, i)
if os.path.exists(file_path):
files[i] = file_path
self.assertIn("apex_manifest.pb", files)
self.assertIn("apex_build_info.pb", files)
image_file = None
if "apex_payload.img" in files:
image_file = files["apex_payload.img"]
elif "apex_payload.zip" in files:
image_file = files["apex_payload.zip"]
self.assertIsNotNone(image_file)
files["apex_payload"] = image_file
return files
def _extract_payload_from_img(self, img_file_path):
dir_name = tempfile.mkdtemp(prefix=self._testMethodName+"_extracted_payload_")
self._to_cleanup.append(dir_name)
cmd = ["debugfs_static", '-R', 'rdump ./ %s' % dir_name, img_file_path]
run_host_command(cmd)
# Remove payload files added by apexer and e2fs tools.
for i in ["apex_manifest.json", "apex_manifest.pb"]:
if os.path.exists(os.path.join(dir_name, i)):
os.remove(os.path.join(dir_name, i))
if os.path.isdir(os.path.join(dir_name, "lost+found")):
shutil.rmtree(os.path.join(dir_name, "lost+found"))
return dir_name
def _extract_payload(self, apex_file_path):
dir_name = tempfile.mkdtemp(prefix=self._testMethodName+"_extracted_payload_")
self._to_cleanup.append(dir_name)
cmd = ["deapexer", "extract", apex_file_path, dir_name]
run_host_command(cmd)
# Remove payload files added by apexer and e2fs tools.
for i in ["apex_manifest.json", "apex_manifest.pb"]:
if os.path.exists(os.path.join(dir_name, i)):
os.remove(os.path.join(dir_name, i))
if os.path.isdir(os.path.join(dir_name, "lost+found")):
shutil.rmtree(os.path.join(dir_name, "lost+found"))
return dir_name
def _run_apexer(self, container_files, payload_dir, args=[]):
unsigned_payload_only = False
payload_only = False
if "--unsigned_payload_only" in args:
unsigned_payload_only = True
if unsigned_payload_only or "--payload_only" in args:
payload_only = True
os.environ["APEXER_TOOL_PATH"] = (
"out/soong/host/linux-x86/bin:prebuilts/sdk/tools/linux/bin")
cmd = ["apexer", "--force", "--include_build_info", "--do_not_check_keyname"]
cmd.extend(["--manifest", container_files["apex_manifest.pb"]])
if "apex_manifest.json" in container_files:
cmd.extend(["--manifest_json", container_files["apex_manifest.json"]])
cmd.extend(["--build_info", container_files["apex_build_info.pb"]])
if not payload_only and "assets" in container_files:
cmd.extend(["--assets_dir", "assets"])
if not unsigned_payload_only:
cmd.extend(["--key", os.path.join(get_current_dir(), TEST_PRIVATE_KEY)])
cmd.extend(["--pubkey", os.path.join(get_current_dir(), TEST_AVB_PUBLIC_KEY)])
cmd.extend(args)
# Decide on output file name
apex_suffix = ".apex.unsigned"
if payload_only:
apex_suffix = ".payload"
fd, fn = tempfile.mkstemp(prefix=self._testMethodName+"_repacked_", suffix=apex_suffix)
os.close(fd)
self._to_cleanup.append(fn)
cmd.extend([payload_dir, fn])
run_host_command(cmd)
return fn
def _sign_apk_container(self, unsigned_apex):
fd, fn = tempfile.mkstemp(prefix=self._testMethodName+"_repacked_", suffix=".apex")
os.close(fd)
self._to_cleanup.append(fn)
cmd = [
"prebuilts/jdk/jdk11/linux-x86/bin/java",
"-Djava.library.path=out/soong/host/linux-x86/lib64",
"-jar", "out/soong/host/linux-x86/framework/signapk.jar",
"-a", "4096",
os.path.join(get_current_dir(), TEST_X509_KEY),
os.path.join(get_current_dir(), TEST_PK8_KEY),
unsigned_apex, fn]
run_and_check_output(cmd)
return fn
def _sign_payload(self, container_files, unsigned_payload):
fd, signed_payload = \
tempfile.mkstemp(prefix=self._testMethodName+"_repacked_", suffix=".payload")
os.close(fd)
self._to_cleanup.append(signed_payload)
shutil.copyfile(unsigned_payload, signed_payload)
cmd = ['avbtool']
cmd.append('add_hashtree_footer')
cmd.append('--do_not_generate_fec')
cmd.extend(['--algorithm', 'SHA256_RSA4096'])
cmd.extend(['--hash_algorithm', 'sha256'])
cmd.extend(['--key', os.path.join(get_current_dir(), TEST_PRIVATE_KEY)])
manifest_apex = ValidateApexManifest(container_files["apex_manifest.pb"])
cmd.extend(['--prop', 'apex.key:' + manifest_apex.name])
# Set up the salt based on manifest content which includes name
# and version
salt = hashlib.sha256(manifest_apex.SerializeToString()).hexdigest()
cmd.extend(['--salt', salt])
cmd.extend(['--image', signed_payload])
cmd.append('--no_hashtree')
run_and_check_output(cmd)
return signed_payload
def _verify_payload(self, payload):
"""Verifies that the payload is properly signed by avbtool"""
cmd = ["avbtool", "verify_image", "--image", payload, "--accept_zeroed_hashtree"]
run_and_check_output(cmd)
def _run_build_test(self, apex_name):
apex_file_path = os.path.join(get_current_dir(), apex_name + ".apex")
if DEBUG_TEST:
fd, fn = tempfile.mkstemp(prefix=self._testMethodName+"_input_", suffix=".apex")
os.close(fd)
shutil.copyfile(apex_file_path, fn)
self._to_cleanup.append(fn)
container_files = self._get_container_files(apex_file_path)
payload_dir = self._extract_payload(apex_file_path)
repack_apex_file_path = self._run_apexer(container_files, payload_dir)
resigned_apex_file_path = self._sign_apk_container(repack_apex_file_path)
self.assertEqual(get_sha1sum(apex_file_path), get_sha1sum(resigned_apex_file_path))
def test_simple_apex(self):
self._run_build_test(TEST_APEX)
def test_legacy_apex(self):
self._run_build_test(TEST_APEX_LEGACY)
def test_output_payload_only(self):
"""Assert that payload-only output from apexer is same as the payload we get by unzipping
apex.
"""
apex_file_path = os.path.join(get_current_dir(), TEST_APEX + ".apex")
container_files = self._get_container_files(apex_file_path)
payload_dir = self._extract_payload(apex_file_path)
payload_only_file_path = self._run_apexer(container_files, payload_dir, ["--payload_only"])
self._verify_payload(payload_only_file_path)
self.assertEqual(get_sha1sum(payload_only_file_path),
get_sha1sum(container_files["apex_payload"]))
def test_output_unsigned_payload_only(self):
"""Assert that when unsigned-payload-only output from apexer is signed by the avb key, it is
same as the payload we get by unzipping apex.
"""
apex_file_path = os.path.join(get_current_dir(), TEST_APEX + ".apex")
container_files = self._get_container_files(apex_file_path)
payload_dir = self._extract_payload(apex_file_path)
unsigned_payload_only_file_path = self._run_apexer(container_files, payload_dir,
["--unsigned_payload_only"])
with self.assertRaises(RuntimeError) as error:
self._verify_payload(unsigned_payload_only_file_path)
self.assertIn("Given image does not look like a vbmeta image", str(error.exception))
signed_payload = self._sign_payload(container_files, unsigned_payload_only_file_path)
self.assertEqual(get_sha1sum(signed_payload),
get_sha1sum(container_files["apex_payload"]))
# Now assert that given an unsigned image and the original container
# files, we can produce an identical unsigned image.
unsigned_payload_dir = self._extract_payload_from_img(unsigned_payload_only_file_path)
unsigned_payload_only_2_file_path = self._run_apexer(container_files, unsigned_payload_dir,
["--unsigned_payload_only"])
self.assertEqual(get_sha1sum(unsigned_payload_only_file_path),
get_sha1sum(unsigned_payload_only_2_file_path))
def test_apex_with_logging_parent(self):
self._run_build_test(TEST_APEX_WITH_LOGGING_PARENT)
def test_apex_with_overridden_package_name(self):
self._run_build_test(TEST_APEX_WITH_OVERRIDDEN_PACKAGE_NAME)
if __name__ == '__main__':
unittest.main(verbosity=2)