Merge "releasetools: Validate A/B OTA payload signatures."
diff --git a/tools/releasetools/check_ota_package_signature.py b/tools/releasetools/check_ota_package_signature.py
index 548b619..1f8b7bb 100755
--- a/tools/releasetools/check_ota_package_signature.py
+++ b/tools/releasetools/check_ota_package_signature.py
@@ -25,12 +25,19 @@
 import re
 import subprocess
 import sys
+import tempfile
+import zipfile
 
 from hashlib import sha1
 from hashlib import sha256
 
+# 'update_payload' package is under 'system/update_engine/scripts/', which
+# should be included in PYTHONPATH.
+from update_payload.payload import Payload
+from update_payload.update_metadata_pb2 import Signatures
 
-def cert_uses_sha256(cert):
+
+def CertUsesSha256(cert):
   """Check if the cert uses SHA-256 hashing algorithm."""
 
   cmd = ['openssl', 'x509', '-text', '-noout', '-in', cert]
@@ -46,7 +53,7 @@
   return algorithm.group(1).startswith('sha256')
 
 
-def verify_package(cert, package):
+def VerifyPackage(cert, package):
   """Verify the given package with the certificate.
 
   (Comments from bootable/recovery/verifier.cpp:)
@@ -90,7 +97,7 @@
   print('Signed data length: %d' % (signed_len,))
   print('Signature start: %d' % (signature_start,))
 
-  use_sha256 = cert_uses_sha256(cert)
+  use_sha256 = CertUsesSha256(cert)
   print('Use SHA-256: %s' % (use_sha256,))
 
   if use_sha256:
@@ -100,7 +107,7 @@
   h.update(package_bytes[:signed_len])
   package_digest = h.hexdigest().lower()
 
-  print('Digest: %s\n' % (package_digest,))
+  print('Digest: %s' % (package_digest,))
 
   # Get the signature from the input package.
   signature = package_bytes[signature_start:-6]
@@ -141,7 +148,87 @@
   assert package_digest == digest_string, "Verification failed."
 
   # Verified successfully upon reaching here.
-  print('VERIFIED\n')
+  print('\nWhole package signature VERIFIED\n')
+
+
+def VerifyAbOtaPayload(cert, package):
+  """Verifies the payload and metadata signatures in an A/B OTA payload."""
+
+  def VerifySignatureBlob(hash_file, blob):
+    """Verifies the input hash_file against the signature blob."""
+    signatures = Signatures()
+    signatures.ParseFromString(blob)
+
+    extracted_sig_file = common.MakeTempFile(
+        prefix='extracted-sig-', suffix='.bin')
+    # In Android, we only expect one signature.
+    assert len(signatures.signatures) == 1, \
+        'Invalid number of signatures: %d' % len(signatures.signatures)
+    signature = signatures.signatures[0]
+    length = len(signature.data)
+    assert length == 256, 'Invalid signature length %d' % (length,)
+    with open(extracted_sig_file, 'wb') as f:  # Binary mode: raw signature.
+      f.write(signature.data)
+
+    # Verify the signature file extracted from the payload, by reversing the
+    # signing operation. Alternatively, this can be done by calling 'openssl
+    # rsautl -verify -certin -inkey <cert.pem> -in <extracted_sig_file> -out
+    # <output>', then to assert that
+    # <output> == SHA-256 DigestInfo prefix || <hash_file>.
+    cmd = ['openssl', 'pkeyutl', '-verify', '-certin', '-inkey', cert,
+           '-pkeyopt', 'digest:sha256', '-in', hash_file,
+           '-sigfile', extracted_sig_file]
+    p = common.Run(cmd, stdout=subprocess.PIPE)
+    result, _ = p.communicate()
+
+    # https://github.com/openssl/openssl/pull/3213
+    # 'openssl pkeyutl -verify' (prior to 1.1.0) returns non-zero return code,
+    # even on successful verification. To avoid the false alarm with older
+    # openssl, check the output directly.
+    assert result.strip() == 'Signature Verified Successfully', result.strip()
+
+  package_zip = zipfile.ZipFile(package, 'r')
+  if 'payload.bin' not in package_zip.namelist():
+    common.ZipClose(package_zip)
+    return
+
+  print('Verifying A/B OTA payload signatures...')
+
+  package_dir = tempfile.mkdtemp(prefix='package-')
+  common.OPTIONS.tempfiles.append(package_dir)
+
+  payload_file = package_zip.extract('payload.bin', package_dir)
+  payload = Payload(open(payload_file, 'rb'))
+  payload.Init()
+
+  # Extract the payload hash and metadata hash from the payload.bin.
+  payload_hash_file = common.MakeTempFile(prefix='hash-', suffix='.bin')
+  metadata_hash_file = common.MakeTempFile(prefix='hash-', suffix='.bin')
+  cmd = ['brillo_update_payload', 'hash',
+         '--unsigned_payload', payload_file,
+         '--signature_size', '256',
+         '--metadata_hash_file', metadata_hash_file,
+         '--payload_hash_file', payload_hash_file]
+  p = common.Run(cmd, stdout=subprocess.PIPE)
+  p.communicate()
+  assert p.returncode == 0, 'brillo_update_payload hash failed'
+
+  # Payload signature verification.
+  assert payload.manifest.HasField('signatures_offset')
+  payload_signature = payload.ReadDataBlob(
+      payload.manifest.signatures_offset, payload.manifest.signatures_size)
+  VerifySignatureBlob(payload_hash_file, payload_signature)
+
+  # Metadata signature verification.
+  metadata_signature = payload.ReadDataBlob(
+      -payload.header.metadata_signature_len,
+      payload.header.metadata_signature_len)
+  VerifySignatureBlob(metadata_hash_file, metadata_signature)
+
+  common.ZipClose(package_zip)
+
+  # Verified successfully upon reaching here.
+  print('\nPayload signatures VERIFIED\n\n')
 
 
 def main():
@@ -150,7 +237,8 @@
   parser.add_argument('package', help='The OTA package to be verified.')
   args = parser.parse_args()
 
-  verify_package(args.certificate, args.package)
+  VerifyPackage(args.certificate, args.package)
+  VerifyAbOtaPayload(args.certificate, args.package)
 
 
 if __name__ == '__main__':
@@ -159,3 +247,5 @@
   except AssertionError as err:
     print('\n    ERROR: %s\n' % (err,))
     sys.exit(1)
+  finally:
+    common.Cleanup()