Add --unsigned_payload_only flag to apexer

The flag instructs apexer to output the payload before it is signed by
avbtool.

The CL also contains new test to ensure that when the unsigned payload
is signed by avbtool, it is same as the payload we get when we unzip an
apex.

Bug: 149993331
Test: atest --host apexer_test
Change-Id: I0c0c29ee2fabaa447f5844e71949a00778b1d935
diff --git a/apexer/Android.bp b/apexer/Android.bp
index eeb69a8..6a11936 100644
--- a/apexer/Android.bp
+++ b/apexer/Android.bp
@@ -130,6 +130,9 @@
         },
     },
     test_suites: ["general-tests"],
+    libs: [
+        "apex_manifest",
+    ],
     required: [
         "apexer",
         "deapexer"
diff --git a/apexer/apexer.py b/apexer/apexer.py
index bbd2d43..a8c92cc 100644
--- a/apexer/apexer.py
+++ b/apexer/apexer.py
@@ -159,6 +159,12 @@
       action='store_true',
       help='Outputs the payload image/zip only.'
   )
+  parser.add_argument(
+      '--unsigned_payload_only',
+      action='store_true',
+      help="""Outputs the unsigned payload image/zip only. Also, setting this flag implies 
+                                    --payload_only is set too."""
+  )
   return parser.parse_args(argv)
 
 
@@ -283,8 +289,11 @@
     print(args.output + ' already exists. Use --force to overwrite.')
     return False
 
+  if args.unsigned_payload_only:
+    args.payload_only = True;
+
   if args.payload_type == 'image':
-    if not args.key:
+    if not args.key and not args.unsigned_payload_only:
       print('Missing --key {keyfile} argument!')
       return False
 
@@ -437,9 +446,10 @@
     copyfile(args.manifest_json, os.path.join(manifests_dir, 'apex_manifest.json'))
 
   if args.payload_type == 'image':
-    key_name = os.path.basename(os.path.splitext(args.key)[0])
-    if args.do_not_check_keyname:
+    if args.do_not_check_keyname or args.unsigned_payload_only:
       key_name = manifest_apex.name
+    else:
+      key_name = os.path.basename(os.path.splitext(args.key)[0])
 
     if manifest_apex.name != key_name:
       print("package name '" + manifest_apex.name +
@@ -503,6 +513,12 @@
     cmd.append(img_file)
     RunCommand(cmd, args.verbose, {'E2FSPROGS_FAKE_TIME': '1'})
 
+    if args.unsigned_payload_only:
+      shutil.copyfile(img_file, args.output)
+      if (args.verbose):
+        print('Created (unsigned payload only) ' + args.output)
+      return True
+
     cmd = ['avbtool']
     cmd.append('add_hashtree_footer')
     cmd.append('--do_not_generate_fec')
diff --git a/apexer/apexer_test.py b/apexer/apexer_test.py
index 282216a..89a3980 100644
--- a/apexer/apexer_test.py
+++ b/apexer/apexer_test.py
@@ -19,13 +19,15 @@
 import hashlib
 import logging
 import os
+import re
 import shutil
 import subprocess
 import tempfile
 import unittest
-
 from zipfile import ZipFile
 
+from apex_manifest import ValidateApexManifest
+
 logger = logging.getLogger(__name__)
 
 TEST_APEX = "com.android.example.apex"
@@ -125,6 +127,9 @@
     current_dir = os.path.dirname(os.path.realpath(__file__))
     return current_dir
 
+def round_up(size, unit):
+    assert unit & (unit - 1) == 0
+    return (size + unit - 1) & (~(unit - 1))
 
 # In order to debug test failures, set DEBUG_TEST to True and run the test from
 # local workstation bypassing atest, e.g.:
@@ -207,6 +212,13 @@
         return dir_name
 
     def _run_apexer(self, container_files, payload_dir, args=[]):
+        unsigned_payload_only = False
+        payload_only = False
+        if "--unsigned_payload_only" in args:
+            unsigned_payload_only = True
+        if unsigned_payload_only or "--payload_only" in args:
+            payload_only = True
+
         os.environ["APEXER_TOOL_PATH"] = (
             "out/soong/host/linux-x86/bin:prebuilts/sdk/tools/linux/bin")
         cmd = ["apexer", "--force", "--include_build_info", "--do_not_check_keyname"]
@@ -214,17 +226,17 @@
         if "apex_manifest.json" in container_files:
             cmd.extend(["--manifest_json", container_files["apex_manifest.json"]])
         cmd.extend(["--build_info", container_files["apex_build_info.pb"]])
-        if "assets" in container_files:
+        if not payload_only and "assets" in container_files:
             cmd.extend(["--assets_dir", "assets"])
-        cmd.extend(["--key", os.path.join(get_current_dir(), TEST_PRIVATE_KEY)])
-        cmd.extend(["--pubkey", os.path.join(get_current_dir(), TEST_AVB_PUBLIC_KEY)])
+        if not unsigned_payload_only:
+            cmd.extend(["--key", os.path.join(get_current_dir(), TEST_PRIVATE_KEY)])
+            cmd.extend(["--pubkey", os.path.join(get_current_dir(), TEST_AVB_PUBLIC_KEY)])
         cmd.extend(args)
 
         # Decide on output file name
         apex_suffix = ".apex.unsigned"
-        if "--payload-only" in args:
-            apex_suffix = ".img"
-
+        if payload_only:
+            apex_suffix = ".payload"
         fd, fn = tempfile.mkstemp(prefix=self._testMethodName+"_repacked_", suffix=apex_suffix)
         os.close(fd)
         self._to_cleanup.append(fn)
@@ -248,6 +260,35 @@
         run_and_check_output(cmd)
         return fn
 
+    def _sign_payload(self, container_files, unsigned_payload):
+        fd, signed_payload = \
+            tempfile.mkstemp(prefix=self._testMethodName+"_repacked_", suffix=".payload")
+        os.close(fd)
+        self._to_cleanup.append(signed_payload)
+        shutil.copyfile(unsigned_payload, signed_payload)
+
+        cmd = ['avbtool']
+        cmd.append('add_hashtree_footer')
+        cmd.append('--do_not_generate_fec')
+        cmd.extend(['--algorithm', 'SHA256_RSA4096'])
+        cmd.extend(['--key', os.path.join(get_current_dir(), TEST_PRIVATE_KEY)])
+        manifest_apex = ValidateApexManifest(container_files["apex_manifest.pb"])
+        cmd.extend(['--prop', 'apex.key:' + manifest_apex.name])
+        # Set up the salt based on manifest content which includes name
+        # and version
+        salt = hashlib.sha256(manifest_apex.SerializeToString()).hexdigest()
+        cmd.extend(['--salt', salt])
+        cmd.extend(['--image', signed_payload])
+        cmd.append('--no_hashtree')
+        run_and_check_output(cmd)
+
+        return signed_payload
+
+    def _verify_payload(self, payload):
+        """Verifies that the payload is properly signed by avbtool"""
+        cmd = ["avbtool", "verify_image", "--image", payload, "--accept_zeroed_hashtree"]
+        run_and_check_output(cmd)
+
     def _run_build_test(self, apex_name):
         apex_file_path = os.path.join(get_current_dir(), apex_name + ".apex")
         if DEBUG_TEST:
@@ -267,15 +308,34 @@
     def test_legacy_apex(self):
         self._run_build_test(TEST_APEX_LEGACY)
 
-    # Assert that payload-only output from apexer is same as the payload we get by unzipping apex.
     def test_output_payload_only(self):
+        """Assert that payload-only output from apexer is same as the payload we get by unzipping
+        apex.
+        """
         apex_file_path = os.path.join(get_current_dir(), TEST_APEX + ".apex")
         container_files = self._get_container_files(apex_file_path)
         payload_dir = self._extract_payload(apex_file_path)
         payload_only_file_path = self._run_apexer(container_files, payload_dir, ["--payload_only"])
+        self._verify_payload(payload_only_file_path)
         self.assertEqual(get_sha1sum(payload_only_file_path),
                          get_sha1sum(container_files["apex_payload"]))
 
+    def test_output_unsigned_payload_only(self):
+        """Assert that when unsigned-payload-only output from apexer is signed by the avb key, it is
+        same as the payload we get by unzipping apex.
+        """
+        apex_file_path = os.path.join(get_current_dir(), TEST_APEX + ".apex")
+        container_files = self._get_container_files(apex_file_path)
+        payload_dir = self._extract_payload(apex_file_path)
+        unsigned_payload_only_file_path = self._run_apexer(container_files, payload_dir,
+                                                  ["--unsigned_payload_only"])
+        with self.assertRaises(RuntimeError) as error:
+            self._verify_payload(unsigned_payload_only_file_path)
+        self.assertIn("Given image does not look like a vbmeta image", str(error.exception))
+        signed_payload = self._sign_payload(container_files, unsigned_payload_only_file_path)
+        self.assertEqual(get_sha1sum(signed_payload),
+                         get_sha1sum(container_files["apex_payload"]))
+
 
 if __name__ == '__main__':
     unittest.main(verbosity=2)