| #!/usr/bin/env python |
| |
| # Copyright 2016, The Android Open Source Project |
| # |
| # Permission is hereby granted, free of charge, to any person |
| # obtaining a copy of this software and associated documentation |
| # files (the "Software"), to deal in the Software without |
| # restriction, including without limitation the rights to use, copy, |
| # modify, merge, publish, distribute, sublicense, and/or sell copies |
| # of the Software, and to permit persons to whom the Software is |
| # furnished to do so, subject to the following conditions: |
| # |
| # The above copyright notice and this permission notice shall be |
| # included in all copies or substantial portions of the Software. |
| # |
| # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, |
| # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF |
| # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND |
| # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS |
| # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN |
| # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN |
| # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE |
| # SOFTWARE. |
| """Command-line tool for working with Brillo Verified Boot images.""" |
| |
| import argparse |
| import hashlib |
| import os |
| import struct |
| import subprocess |
| import sys |
| |
| import Crypto.PublicKey.RSA |
| |
| # Keep in sync with bvb_boot_image_header.h. |
| BVB_VERSION_MAJOR = 1 |
| BVB_VERSION_MINOR = 0 |
| |
| |
| class Algorithm(object): |
| """Contains details about an algorithm. |
| |
| See the bvb_boot_image_header.h file for more details about |
| algorithms. |
| |
| The constant |ALGORITHMS| is a dictionary from human-readable |
| names (e.g 'SHA256_RSA2048') to instances of this class. |
| |
| Attributes: |
| algorithm_type: Integer code corresponding to |BvbAlgorithmType|. |
| hash_num_bytes: Number of bytes used to store the hash. |
| signature_num_bytes: Number of bytes used to store the signature. |
| public_key_num_bytes: Number of bytes used to store the public key. |
| padding: Padding used for signature, if any. |
| """ |
| |
| def __init__(self, algorithm_type, hash_num_bytes, signature_num_bytes, |
| public_key_num_bytes, padding): |
| self.algorithm_type = algorithm_type |
| self.hash_num_bytes = hash_num_bytes |
| self.signature_num_bytes = signature_num_bytes |
| self.public_key_num_bytes = public_key_num_bytes |
| self.padding = padding |
| |
| # This must be kept in sync with bvb_verify.h. |
| ALGORITHMS = { |
| 'NONE': Algorithm( |
| algorithm_type=0, # BVB_ALGORITHM_TYPE_NONE |
| hash_num_bytes=0, |
| signature_num_bytes=0, |
| public_key_num_bytes=0, |
| padding=[]), |
| 'SHA256_RSA2048': Algorithm( |
| algorithm_type=1, # BVB_ALGORITHM_TYPE_SHA256_RSA2048 |
| hash_num_bytes=32, |
| signature_num_bytes=256, |
| public_key_num_bytes=8 + 2*2048/8, |
| padding=[ |
| # PKCS1-v1_5 padding |
| 0x00, 0x01] + [0xff]*202 + [0x00] + [ |
| # ASN.1 header |
| 0x30, 0x31, 0x30, 0x0d, 0x06, 0x09, 0x60, 0x86, |
| 0x48, 0x01, 0x65, 0x03, 0x04, 0x02, 0x01, 0x05, |
| 0x00, 0x04, 0x20, |
| ]), |
| 'SHA256_RSA4096': Algorithm( |
| algorithm_type=2, # BVB_ALGORITHM_TYPE_SHA256_RSA4096 |
| hash_num_bytes=32, |
| signature_num_bytes=512, |
| public_key_num_bytes=8 + 2*4096/8, |
| padding=[ |
| # PKCS1-v1_5 padding |
| 0x00, 0x01] + [0xff]*458 + [0x00] + [ |
| # ASN.1 header |
| 0x30, 0x31, 0x30, 0x0d, 0x06, 0x09, 0x60, 0x86, |
| 0x48, 0x01, 0x65, 0x03, 0x04, 0x02, 0x01, 0x05, |
| 0x00, 0x04, 0x20, |
| ]), |
| 'SHA256_RSA8192': Algorithm( |
| algorithm_type=3, # BVB_ALGORITHM_TYPE_SHA256_RSA8192 |
| hash_num_bytes=32, |
| signature_num_bytes=1024, |
| public_key_num_bytes=8 + 2*8192/8, |
| padding=[ |
| # PKCS1-v1_5 padding |
| 0x00, 0x01] + [0xff]*970 + [0x00] + [ |
| # ASN.1 header |
| 0x30, 0x31, 0x30, 0x0d, 0x06, 0x09, 0x60, 0x86, |
| 0x48, 0x01, 0x65, 0x03, 0x04, 0x02, 0x01, 0x05, |
| 0x00, 0x04, 0x20, |
| ]), |
| 'SHA512_RSA2048': Algorithm( |
| algorithm_type=4, # BVB_ALGORITHM_TYPE_SHA512_RSA2048 |
| hash_num_bytes=64, |
| signature_num_bytes=256, |
| public_key_num_bytes=8 + 2*2048/8, |
| padding=[ |
| # PKCS1-v1_5 padding |
| 0x00, 0x01] + [0xff]*170 + [0x00] + [ |
| # ASN.1 header |
| 0x30, 0x51, 0x30, 0x0d, 0x06, 0x09, 0x60, 0x86, |
| 0x48, 0x01, 0x65, 0x03, 0x04, 0x02, 0x03, 0x05, |
| 0x00, 0x04, 0x40 |
| ]), |
| 'SHA512_RSA4096': Algorithm( |
| algorithm_type=5, # BVB_ALGORITHM_TYPE_SHA512_RSA4096 |
| hash_num_bytes=64, |
| signature_num_bytes=512, |
| public_key_num_bytes=8 + 2*4096/8, |
| padding=[ |
| # PKCS1-v1_5 padding |
| 0x00, 0x01] + [0xff]*426 + [0x00] + [ |
| # ASN.1 header |
| 0x30, 0x51, 0x30, 0x0d, 0x06, 0x09, 0x60, 0x86, |
| 0x48, 0x01, 0x65, 0x03, 0x04, 0x02, 0x03, 0x05, |
| 0x00, 0x04, 0x40 |
| ]), |
| 'SHA512_RSA8192': Algorithm( |
| algorithm_type=6, # BVB_ALGORITHM_TYPE_SHA512_RSA8192 |
| hash_num_bytes=64, |
| signature_num_bytes=1024, |
| public_key_num_bytes=8 + 2*8192/8, |
| padding=[ |
| # PKCS1-v1_5 padding |
| 0x00, 0x01] + [0xff]*938 + [0x00] + [ |
| # ASN.1 header |
| 0x30, 0x51, 0x30, 0x0d, 0x06, 0x09, 0x60, 0x86, |
| 0x48, 0x01, 0x65, 0x03, 0x04, 0x02, 0x03, 0x05, |
| 0x00, 0x04, 0x40 |
| ]), |
| } |
| |
| |
| def round_to_multiple(number, size): |
| """Rounds a number up to nearest multiple of another number. |
| |
| Args: |
| number: The number to round up. |
| size: The multiple to round up to. |
| |
| Returns: |
| If |number| is a multiple of |size|, returns |number|, otherwise |
| returns |number| + |size|. |
| """ |
| remainder = number % size |
| if remainder == 0: |
| return number |
| return number + size - remainder |
| |
| |
| def round_to_pow2(number): |
| """Rounds a number up to the next power of 2. |
| |
| Args: |
| number: The number to round up. |
| |
| Returns: |
| If |number| is already a power of 2 then |number| is |
| returned. Otherwise the smallest power of 2 greater than |number| |
| is returned. |
| """ |
| return 2**((number - 1).bit_length()) |
| |
| |
| def write_long(output, num_bits, value): |
| """Writes a long to an output stream using a given amount of bits. |
| |
| This number is written big-endian, e.g. with the most significant |
| bit first. |
| |
| Arguments: |
| output: The object to write the output to. |
| num_bits: The number of bits to write, e.g. 2048. |
| value: The value to write. |
| """ |
| for bit_pos in range(num_bits, 0, -8): |
| octet = (value >> (bit_pos - 8)) & 0xff |
| output.write(struct.pack('!B', octet)) |
| |
| |
| def egcd(a, b): |
| """Calculate greatest common divisor of two numbers. |
| |
| This implementation uses a recursive version of the extended |
| Euclidian algorithm. |
| |
| Arguments: |
| a: First number. |
| b: Second number. |
| |
| Returns: |
| A tuple (gcd, x, y) that where |gcd| is the greatest common |
| divisor of |a| and |b| and |a|*|x| + |b|*|y| = |gcd|. |
| """ |
| if a == 0: |
| return (b, 0, 1) |
| else: |
| g, y, x = egcd(b % a, a) |
| return (g, x - (b // a) * y, y) |
| |
| |
| def modinv(a, m): |
| """Calculate modular multiplicative inverse of |a| modulo |m|. |
| |
| This calculates the number |x| such that |a| * |x| == 1 (modulo |
| |m|). This number only exists if |a| and |m| are co-prime - |None| |
| is returned if this isn't true. |
| |
| Arguments: |
| a: The number to calculate a modular inverse of. |
| m: The modulo to use. |
| |
| Returns: |
| The modular multiplicative inverse of |a| and |m| or |None| if |
| these numbers are not co-prime. |
| """ |
| gcd, x, _ = egcd(a, m) |
| if gcd != 1: |
| return None # modular inverse does not exist |
| else: |
| return x % m |
| |
| |
| def parse_number(string): |
| """Parse a string as a number. |
| |
| This is just a short-hand for int(string, 0) suitable for use in the |
| |type| parameter of |ArgumentParser|'s add_argument() function. An |
| improvement to just using type=int is that this function supports |
| numbers in other bases, e.g. "0x1234". |
| |
| Arguments: |
| string: The string to parse. |
| |
| Returns: |
| The parsed integer. |
| |
| Raises: |
| ValueError: If the number could not be parsed. |
| """ |
| return int(string, 0) |
| |
| |
| def write_rsa_key(output, key): |
| """Writes a public RSA key in |BvBRSAPublicKeyHeader| format. |
| |
| This writes the |BvBRSAPublicKeyHeader| as well as the two large |
| numbers (|key_num_bits| bits long) following it. |
| |
| Arguments: |
| output: The object to write the output to. |
| key: A Crypto.PublicKey.RSA object. |
| """ |
| # key.e is exponent |
| # key.n is modulus |
| key_num_bits = key.size() + 1 |
| # Calculate n0inv = -1/n[0] (mod 2^32) |
| b = 2L**32 |
| n0inv = b - modinv(key.n, b) |
| # Calculate rr = r^2 (mod N), where r = 2^(# of key bits) |
| r = 2L**key.n.bit_length() |
| rrmodn = r * r % key.n |
| output.write(struct.pack('!II', key_num_bits, n0inv)) |
| write_long(output, key_num_bits, key.n) |
| write_long(output, key_num_bits, rrmodn) |
| |
| |
| def lookup_algorithm_by_type(alg_type): |
| """Looks up algorithm by type. |
| |
| Arguments: |
| alg_type: The integer representing the type. |
| |
| Returns: |
| A tuple with the algorithm name and an |Algorithm| instance. |
| |
| Raises: |
| Exception: If the algorithm cannot be found |
| """ |
| for alg_name in ALGORITHMS: |
| alg_data = ALGORITHMS[alg_name] |
| if alg_data.algorithm_type == alg_type: |
| return (alg_name, alg_data) |
| raise Exception('Unknown algorithm type %d' % alg_type) |
| |
| |
| def add_property(encoded_props, key, value): |
| """Helper function to add a key/value pair to a bytearray. |
| |
| The encoding specified in the |BvbPropertyHeader| in |
| bvb_boot_image_header.h is used. |
| |
| Arguments: |
| encoded_props: A bytearray to append to. |
| key: The key to write. |
| value: The value to write. |
| """ |
| encoded_props.extend(struct.pack('!QQ', len(key), len(value))) |
| encoded_props.extend(key) |
| encoded_props.append(0) |
| encoded_props.extend(value) |
| encoded_props.append(0) |
| num_bytes = 2 * 8 + len(key) + len(value) + 2 |
| padding_bytes = (8 - num_bytes) & 7 |
| for _ in range(padding_bytes): |
| encoded_props.append(0) |
| |
| |
| class BvbIntegrityFooter(object): |
| """A class for parsing and writing Integrity Footers. |
| |
| Integrity footers are stored at the end of filesystems where |
| dm-verity hashes have been added with the 'add_image_hashes' |
| command. |
| |
| Attributes: |
| magic: Magic for identifying the footer, see |MAGIC|. |
| version_major: The major version of bvbtool that wrote the footer. |
| version_minor: The minor version of bvbtool that wrote the footer. |
| dm_verity_version: dm-verity version used. |
| image_size: Size of the image, after rounding up to |block_size|. |
| tree_offset: Offset of the hash tree in the file. |
| tree_size: Size of the tree. |
| data_block_size: Data block size |
| hash_block_size: Hash block size |
| hash_algorithm: Hash algorithm used. |
| salt_hex: Salt used, as a hex-string |
| root_hash_hex: Root hash, as a hex-string. |
| """ |
| |
| MAGIC = 'BVBi' |
| SIZE = 4096 |
| RESERVED = 1968 |
| FORMAT_STRING = ('!4s2L' # magic, 2 x version |
| 'L' # dm-verity version used |
| 'Q' # image size (bytes) |
| 'Q' # tree offset (bytes) |
| 'Q' # tree size (bytes) |
| 'L' # data block size (bytes) |
| 'L' # hash block size (bytes) |
| '32s' # hash algorithm used |
| '1024s' # salt, as a hex string |
| '1024s' + # root hash, as a hex string |
| str(RESERVED) + 'x') # padding for reserved bytes |
| |
| def __init__(self, data=None): |
| """Initializes a new footer object. |
| |
| Arguments: |
| data: If not None, must be a bytearray of size 4096. |
| |
| Raises: |
| LookupError: If the given footer is malformed. |
| struct.error: If the given data has no footer. |
| """ |
| assert struct.calcsize(self.FORMAT_STRING) == self.SIZE |
| |
| if data: |
| (self.magic, self.version_major, self.version_minor, |
| self.dm_verity_version, self.image_size, self.tree_offset, |
| self.tree_size, self.data_block_size, self.hash_block_size, |
| self.hash_algorithm, self.salt_hex, |
| self.root_hash_hex) = struct.unpack(self.FORMAT_STRING, data) |
| if self.magic != self.MAGIC: |
| raise LookupError('Given data does not look like a Brillo ' |
| 'integrity footer.') |
| # Nuke NUL-bytes at the end. |
| self.hash_algorithm = self.hash_algorithm.split('\0', 1)[0] |
| self.salt_hex = self.salt_hex.split('\0', 1)[0] |
| self.root_hash_hex = self.root_hash_hex.split('\0', 1)[0] |
| else: |
| self.magic = self.MAGIC |
| self.version_major = BVB_VERSION_MAJOR |
| self.version_minor = BVB_VERSION_MINOR |
| self.dm_verity_version = 0 |
| self.image_size = 0 |
| self.tree_offset = 0 |
| self.tree_size = 0 |
| self.data_block_size = 0 |
| self.hash_block_size = 0 |
| self.hash_algorithm = '' |
| self.salt_hex = '' |
| self.root_hash_hex = '' |
| |
| def save(self, output): |
| """Serializes the header (4096 bytes) to disk. |
| |
| Arguments: |
| output: The object to write the output to. |
| """ |
| output.write(struct.pack( |
| self.FORMAT_STRING, self.magic, self.version_major, self.version_minor, |
| self.dm_verity_version, self.image_size, self.tree_offset, |
| self.tree_size, self.data_block_size, self.hash_block_size, |
| self.hash_algorithm, self.salt_hex, self.root_hash_hex)) |
| |
| |
| class BvbHeader(object): |
| """A class for parsing and writing Brillo Verified Boot headers. |
| |
| Attributes: |
| The attributes correspond to the |BvBBootImageHeader| struct |
| defined in bvb_boot_image_header.h. |
| """ |
| |
| SIZE = 8192 |
| |
| # Keep in sync with |reserved| field of |BvbBootImageHeader|. |
| RESERVED = 3936 |
| |
| # Keep in sync with |BvbBootImageHeader|. |
| FORMAT_STRING = ('!4s2L' # magic, 2 x version |
| '3Q' # 3 x block size |
| 'L' # algorithm type |
| '2Q' # offset, size (hash) |
| '2Q' # offset, size (signature) |
| '2Q' # offset, size (public key) |
| '2Q' # offset, size (properties) |
| 'Q' # rollback_index |
| '2Q' # offset, size (kernel) |
| '2Q' # offset, size (initrd) |
| 'Q' # kernel load address |
| 'Q' # initrd load address |
| '4096s' + # cmdline |
| str(RESERVED) + 'x') # padding for reserved bytes |
| |
| def __init__(self, data=None): |
| """Initializes a new header object. |
| |
| Arguments: |
| data: If not None, must be a bytearray of size 8192. |
| |
| Raises: |
| Exception: If the given data is malformed. |
| """ |
| assert struct.calcsize(self.FORMAT_STRING) == self.SIZE |
| |
| if data: |
| (self.magic, self.header_version_major, self.header_version_minor, |
| self.authentication_data_block_size, self.auxilary_data_block_size, |
| self.payload_data_block_size, self.algorithm_type, self.hash_offset, |
| self.hash_size, self.signature_offset, self.signature_size, |
| self.public_key_offset, self.public_key_size, self.properties_offset, |
| self.properties_size, self.rollback_index, self.kernel_offset, |
| self.kernel_size, self.initrd_offset, self.initrd_size, |
| self.kernel_address, self.initrd_address, |
| self.kernel_cmdline) = struct.unpack(self.FORMAT_STRING, data) |
| # Nuke NUL-bytes at the end of the string. |
| self.kernel_cmdline = self.kernel_cmdline.split('\0', 1)[0] |
| if self.magic != 'BVB0': |
| raise Exception('Given image does not look like a Brillo boot image') |
| else: |
| self.magic = 'BVB0' |
| self.header_version_major = BVB_VERSION_MAJOR |
| self.header_version_minor = BVB_VERSION_MINOR |
| self.authentication_data_block_size = 0 |
| self.auxilary_data_block_size = 0 |
| self.payload_data_block_size = 0 |
| self.algorithm_type = 0 |
| self.hash_offset = 0 |
| self.hash_size = 0 |
| self.signature_offset = 0 |
| self.signature_size = 0 |
| self.public_key_offset = 0 |
| self.public_key_size = 0 |
| self.properties_offset = 0 |
| self.properties_size = 0 |
| self.rollback_index = 0 |
| self.kernel_offset = 0 |
| self.kernel_size = 0 |
| self.initrd_offset = 0 |
| self.initrd_size = 0 |
| self.kernel_address = 0 |
| self.initrd_address = 0 |
| self.kernel_cmdline = '' |
| |
| def save(self, output): |
| """Serializes the header (8192 bytes) to disk. |
| |
| Arguments: |
| output: The object to write the output to. |
| """ |
| output.write(struct.pack( |
| self.FORMAT_STRING, self.magic, self.header_version_major, |
| self.header_version_minor, self.authentication_data_block_size, |
| self.auxilary_data_block_size, self.payload_data_block_size, |
| self.algorithm_type, self.hash_offset, self.hash_size, |
| self.signature_offset, self.signature_size, self.public_key_offset, |
| self.public_key_size, self.properties_offset, self.properties_size, |
| self.rollback_index, self.kernel_offset, self.kernel_size, |
| self.initrd_offset, self.initrd_size, self.kernel_address, |
| self.initrd_address, self.kernel_cmdline)) |
| |
| |
| class BvbTool(object): |
| """Object for bvbtool.""" |
| |
| def __init__(self): |
| """Initiailzes the object.""" |
| parser = argparse.ArgumentParser(usage="""bvbtool COMMAND [<args>] |
| |
| Commands: |
| version Prints out version of bvbtool. |
| make_boot_image Make boot image. |
| sign_boot_image Sign boot image. |
| info_boot_image Show information about boot image. |
| add_image_hashes Add hashes for integrity-checking to image. |
| info_image_hashes Show information about integrity-checking hashes. |
| extract_public_key Extract public key. |
| |
| """) |
| parser.add_argument('COMMAND', help='The command to run') |
| args = parser.parse_args(sys.argv[1:2]) |
| if not hasattr(self, args.COMMAND): |
| print 'Unrecognized command' |
| parser.print_help() |
| sys.exit(1) |
| getattr(self, args.COMMAND)() |
| |
| def version(self): |
| """Implements the 'version' command.""" |
| print '%d.%d' % (BVB_VERSION_MAJOR, BVB_VERSION_MINOR) |
| |
| def info_boot_image(self): |
| """Implements the 'info_boot_image' command.""" |
| parser = argparse.ArgumentParser( |
| prog='bvbtool info_boot_image', |
| description='Show information about Brillo boot image.') |
| parser.add_argument('--image', |
| help='Brillo boot image to use', |
| type=argparse.FileType('rb'), |
| required=True) |
| parser.add_argument('--output', |
| help='Write info to file', |
| type=argparse.FileType('wt'), |
| default=sys.stdout) |
| args = parser.parse_args(sys.argv[2:]) |
| |
| h = BvbHeader(args.image.read(BvbHeader.SIZE)) |
| |
| (alg_name, _) = lookup_algorithm_by_type(h.algorithm_type) |
| |
| o = args.output |
| o.write('Boot Image version: %d.%d\n' % |
| (h.header_version_major, h.header_version_minor)) |
| o.write('Header Block: %d bytes\n' % BvbHeader.SIZE) |
| o.write('Authentication Block: %d bytes\n' % |
| h.authentication_data_block_size) |
| o.write('Auxilary Block: %d bytes\n' % h.auxilary_data_block_size) |
| o.write('Payload Block: %d bytes\n' % h.payload_data_block_size) |
| o.write('Algorithm: %s\n' % alg_name) |
| o.write('Rollback Index: %d\n' % h.rollback_index) |
| o.write('Kernel: %d bytes\n' % h.kernel_size) |
| o.write('Initrd: %d bytes\n' % h.initrd_size) |
| o.write('Kernel Load Address: 0x%08x\n' % h.kernel_address) |
| o.write('Initrd Load Address: 0x%08x\n' % h.initrd_address) |
| o.write('Kernel Cmdline: %s\n' % h.kernel_cmdline) |
| |
| # Print properties. |
| o.write('Properties:\n') |
| authentication_block_offset = BvbHeader.SIZE |
| auxilary_block_offset = ( |
| authentication_block_offset + h.authentication_data_block_size) |
| prop_start_offset = auxilary_block_offset + h.properties_offset |
| prop_end_offset = prop_start_offset + h.properties_size |
| args.image.seek(prop_start_offset) |
| num_printed = 0 |
| while args.image.tell() < prop_end_offset: |
| (key_len, value_len) = struct.unpack('!2Q', args.image.read(16)) |
| num_bytes = key_len + value_len + 2 |
| padding_bytes = (8 - num_bytes) & 7 |
| prop_data = args.image.read(num_bytes + padding_bytes) |
| key = prop_data[0:key_len] |
| # Avoid printing large property values (e.g. blobs). |
| if value_len >= 256: |
| o.write(' %s: (%d bytes)\n' % (key, value_len)) |
| else: |
| value = prop_data[key_len + 1:key_len + 1 + value_len] |
| o.write(' %s: %s\n' % (key, repr(value))) |
| num_printed += 1 |
| if num_printed == 0: |
| o.write(' (none)\n') |
| |
| def make_boot_image(self): |
| """Implements the 'make_boot_image' command.""" |
| parser = argparse.ArgumentParser(prog='bvbtool make_boot_image', |
| description='Make Brillo boot image.') |
| |
| parser.add_argument('--kernel', |
| help='Path to kernel', |
| type=argparse.FileType('rb')) #, required=True) |
| parser.add_argument('--initrd', |
| help='Path to ramdisk', |
| type=argparse.FileType('rb')) |
| parser.add_argument('--kernel_address', |
| help='Kernel load address', |
| type=parse_number, |
| default=0x10008000) |
| parser.add_argument('--initrd_address', |
| help='Ramdisk load address', |
| type=parse_number, |
| default=0x11000000) |
| parser.add_argument('--kernel_cmdline', |
| help='Kernel command-line', |
| default='') |
| parser.add_argument('--rootfs_with_hashes', |
| help='Setup dm-verity for given rootfs', |
| type=argparse.FileType('rb')) |
| parser.add_argument('--rollback_index', |
| help='Rollback Index', |
| type=parse_number, |
| default=0) |
| parser.add_argument('--prop', |
| help='Add property', |
| metavar='KEY:VALUE', |
| action='append') |
| parser.add_argument('--prop_from_file', |
| help='Add property from file', |
| metavar='KEY:PATH', |
| action='append') |
| parser.add_argument('--output', |
| help='Output file name', |
| type=argparse.FileType('wb'), |
| required=True) |
| args = parser.parse_args(sys.argv[2:]) |
| |
| h = BvbHeader() |
| |
| # Write header data block and leave ample room for hash, signature |
| # and public key. |
| h.authentication_data_block_size = 4096 |
| h.kernel_offset = 0 |
| h.kernel_size = os.fstat(args.kernel.fileno()).st_size |
| h.initrd_offset = 0 |
| h.initrd_size = 0 |
| if args.initrd: |
| h.initrd_offset = h.kernel_offset + h.kernel_size |
| h.initrd_size = os.fstat(args.initrd.fileno()).st_size |
| h.payload_data_block_size = h.kernel_size + h.initrd_size |
| |
| # Generate properties blob. |
| encoded_props = bytearray() |
| if args.prop: |
| for prop in args.prop: |
| idx = prop.find(':') |
| if idx == -1: |
| sys.stderr.write('Malformed --property value %s.\n', prop) |
| sys.exit(1) |
| key = prop[0:idx] |
| value = prop[(idx + 1):] |
| add_property(encoded_props, key, value) |
| if args.prop_from_file: |
| for prop in args.prop_from_file: |
| idx = prop.find(':') |
| if idx == -1: |
| sys.stderr.write('Malformed --property value %s.\n', prop) |
| sys.exit(1) |
| key = prop[0:idx] |
| file_path = prop[(idx + 1):] |
| value = open(file_path, 'rb').read() |
| add_property(encoded_props, key, value) |
| |
| # We'll store the properties at offset 0 in the Auxiliary data |
| # block. Make sure it's big enough to hold the biggest |
| # possible key. |
| h.auxilary_data_block_size = round_to_multiple( |
| len(encoded_props) + 4096, 64) |
| h.properties_offset = 0 |
| h.properties_size = len(encoded_props) |
| |
| h.rollback_index = args.rollback_index |
| h.kernel_address = args.kernel_address |
| h.initrd_address = args.initrd_address |
| h.kernel_cmdline = args.kernel_cmdline |
| |
| # Setup dm-verity, if requested. |
| if args.rootfs_with_hashes: |
| args.rootfs_with_hashes.seek(0, os.SEEK_END) |
| image_size = args.rootfs_with_hashes.tell() |
| args.rootfs_with_hashes.seek(image_size - BvbIntegrityFooter.SIZE) |
| f = BvbIntegrityFooter(args.rootfs_with_hashes.read( |
| BvbIntegrityFooter.SIZE)) |
| c = 'dm="1 vroot none ro 1,' |
| c += '0 ' # start |
| c += '%d ' % (f.image_size / 512) # size (# sectors) |
| c += 'verity %d ' % f.dm_verity_version # type and version |
| c += 'PARTUUID=$(ANDROID_SYSTEM_PARTUUID) ' # data_dev |
| c += 'PARTUUID=$(ANDROID_SYSTEM_PARTUUID) ' # hash_dev |
| c += '%d ' % f.data_block_size # data_block |
| c += '%d ' % f.hash_block_size # hash_block |
| c += '%d ' % (f.image_size / f.data_block_size) # #blocks |
| c += '%d ' % (f.image_size / f.data_block_size) # hash_offset |
| c += '%s ' % f.hash_algorithm # hash_alg |
| c += '%s ' % f.root_hash_hex # root_digest |
| c += '%s' % f.salt_hex # salt |
| c += '"' |
| if h.kernel_cmdline: |
| c += ' ' |
| h.kernel_cmdline = c + h.kernel_cmdline |
| |
| # Save the header. |
| h.save(args.output) |
| |
| # Write Authentication data block, as zeroes. |
| args.output.write(struct.pack(str(h.authentication_data_block_size) + 'x')) |
| |
| # Write Auxilary data block. First properties, then pad with zeroes. |
| args.output.write(encoded_props) |
| args.output.write(struct.pack(str(h.auxilary_data_block_size - len( |
| encoded_props)) + 'x')) |
| |
| # Write Payload data block: kernel and initrd |
| args.output.write(args.kernel.read()) |
| if args.initrd: |
| args.output.write(args.initrd.read()) |
| |
| def sign_boot_image(self): |
| """Implements the 'sign_boot_image' command.""" |
| parser = argparse.ArgumentParser(prog='bvbtool sign_boot_image', |
| description='Sign Brillo boot image.') |
| group = parser.add_argument_group() |
| group.add_argument('--show_algorithms', |
| help='Show avaiable algorithms', |
| action='store_true') |
| group = parser.add_argument_group() |
| group.add_argument('--image', |
| help='Brillo boot image to sign', |
| type=argparse.FileType('rab+')) |
| group.add_argument('--key', help='Path to RSA private key file') |
| group.add_argument('--algorithm', help='Algorithm to use') |
| args = parser.parse_args(sys.argv[2:]) |
| |
| if args.show_algorithms: |
| algs = [] |
| for alg_name in ALGORITHMS: |
| if alg_name != 'NONE': |
| algs.append(alg_name) |
| algs.sort() |
| for alg in algs: |
| print alg |
| sys.exit(0) |
| |
| # Support 'NONE' to avoid conditionals in build systems if |
| # signing is not needed (this way they can always execute |
| # 'bvbtool sign_boot_image --algorithm NONE') |
| if args.algorithm == 'NONE': |
| sys.exit(0) |
| |
| if not args.algorithm: |
| sys.stderr.write('Option --algorithm is required.\n') |
| sys.exit(1) |
| if not args.image: |
| sys.stderr.write('Option --image is required.\n') |
| sys.exit(1) |
| if not args.key: |
| sys.stderr.write('Option --key is required.\n') |
| sys.exit(1) |
| |
| try: |
| alg = ALGORITHMS[args.algorithm] |
| except IndexError: |
| sys.stderr.write('Unknown algorithm %s.\n' % args.algorithm) |
| sys.exit(1) |
| |
| h = BvbHeader(args.image.read(BvbHeader.SIZE)) |
| |
| if h.properties_offset != 0: |
| # This is just an implementation limitation which can be lifted later. |
| sys.stderr.write('Only support images with props at the top.\n') |
| sys.exit(1) |
| |
| if h.authentication_data_block_size < ( |
| alg.hash_num_bytes + alg.signature_num_bytes): |
| sys.stderr.write('Insufficient room for storing hash and signature.\n') |
| sys.exit(1) |
| |
| if h.auxilary_data_block_size < alg.public_key_num_bytes: |
| sys.stderr.write('Insufficient room for storing public key.\n') |
| sys.exit(1) |
| |
| authentication_data_block_offset = BvbHeader.SIZE |
| auxilary_data_block_offset = ( |
| authentication_data_block_offset + h.authentication_data_block_size) |
| |
| # Update header with hash type/offset/size, signature |
| # type/offset/size, and public key size. |
| h.algorithm_type = alg.algorithm_type |
| # Hash offset and size (in Authentication data block). |
| h.hash_offset = 0 |
| h.hash_size = alg.hash_num_bytes |
| # Signature offset and size - it's stored right after the hash |
| # (in Authentication data block). |
| h.signature_offset = alg.hash_num_bytes |
| h.signature_size = alg.signature_num_bytes |
| # Public key offset and size - follows properties (in Auxilary data block). |
| h.public_key_offset = h.properties_size |
| h.public_key_size = alg.public_key_num_bytes |
| |
| # Extract public key and insert it into "Auxilary data" block. |
| key = Crypto.PublicKey.RSA.importKey(open(args.key).read()) |
| args.image.seek(auxilary_data_block_offset + h.properties_size) |
| write_rsa_key(args.image, key) |
| |
| # Save the updated header. |
| args.image.seek(0) |
| h.save(args.image) |
| |
| # Calculate the hash. |
| if args.algorithm[0:6] == 'SHA256': |
| ha = hashlib.sha256() |
| elif args.algorithm[0:6] == 'SHA512': |
| ha = hashlib.sha512() |
| else: |
| sys.stderr.write('Unsupported algorithm.\n') |
| sys.exit(1) |
| args.image.seek(0) |
| ha.update(args.image.read(BvbHeader.SIZE)) |
| args.image.seek(auxilary_data_block_offset) |
| ha.update(args.image.read(h.auxilary_data_block_size)) |
| ha.update(args.image.read(h.payload_data_block_size)) |
| # Write the hash |
| args.image.seek(authentication_data_block_offset) |
| binary_hash = ha.digest() |
| args.image.write(binary_hash) |
| |
| # Calculate the signature. |
| p = subprocess.Popen( |
| ['openssl', 'rsautl', '-sign', '-inkey', args.key, '-raw'], |
| stdin=subprocess.PIPE, |
| stdout=subprocess.PIPE, |
| stderr=subprocess.PIPE) |
| padding_and_hash = str(bytearray(alg.padding)) + binary_hash |
| (pout, perr) = p.communicate(padding_and_hash) |
| retcode = p.wait() |
| if retcode != 0: |
| sys.stderr.write('Error signing: %s\n' % perr) |
| sys.exit(1) |
| # Write the signature. |
| args.image.seek(authentication_data_block_offset + alg.hash_num_bytes) |
| args.image.write(pout) |
| |
| def extract_public_key(self): |
| """Implements the 'extract_public_key' command.""" |
| parser = argparse.ArgumentParser( |
| prog='bvbtool extract_public_key', |
| description= |
| 'Extract public key and dump it in the format used by Brillo.') |
| parser.add_argument('--key', |
| help='Path to RSA private key file', |
| required=True) |
| parser.add_argument('--output', |
| help='Output file name', |
| type=argparse.FileType('wb'), |
| required=True) |
| args = parser.parse_args(sys.argv[2:]) |
| |
| key = Crypto.PublicKey.RSA.importKey(open(args.key).read()) |
| write_rsa_key(args.output, key) |
| |
| def info_image_hashes(self): |
| """Implements the 'info_boot_image' command.""" |
| parser = argparse.ArgumentParser( |
| prog='bvbtool info_image_hashes', |
| description='Show information about integrity-checking hashes.') |
| parser.add_argument('--image', |
| help='Brillo boot image to use', |
| type=argparse.FileType('rb'), |
| required=True) |
| parser.add_argument('--output', |
| help='Write info to file', |
| type=argparse.FileType('wt'), |
| default=sys.stdout) |
| args = parser.parse_args(sys.argv[2:]) |
| |
| args.image.seek(0, os.SEEK_END) |
| image_size = args.image.tell() |
| args.image.seek(image_size - BvbIntegrityFooter.SIZE) |
| footer = BvbIntegrityFooter(args.image.read(BvbIntegrityFooter.SIZE)) |
| |
| o = args.output |
| o.write('Footer version: %d.%d\n' % |
| (footer.version_major, footer.version_minor)) |
| o.write('Version of dm-verity: %d\n' % footer.dm_verity_version) |
| o.write('Image Size: %d bytes\n' % footer.image_size) |
| o.write('Tree Offset: %d\n' % footer.tree_offset) |
| o.write('Tree Size: %d bytes\n' % footer.tree_size) |
| o.write('Data Block Size: %d bytes\n' % footer.data_block_size) |
| o.write('Hash Block Size: %d bytes\n' % footer.hash_block_size) |
| o.write('Hash Algorithm: %s\n' % footer.hash_algorithm) |
| o.write('Salt: %s\n' % footer.salt_hex) |
| o.write('Root Hash: %s\n' % footer.root_hash_hex) |
| |
| def add_image_hashes(self): |
| """Implements the 'add_image_hashes' command. |
| |
| See https://gitlab.com/cryptsetup/cryptsetup/wikis/DMVerity for |
| more information about dm-verity and these hashes. |
| """ |
| parser = argparse.ArgumentParser( |
| prog='bvbtool add_hashes_to_image', |
| description='Add hashes for integrity-checking to image.') |
| parser.add_argument('--image', |
| help='Brillo boot image to add hashes to', |
| type=argparse.FileType('rab+')) |
| parser.add_argument('--hash', |
| help='Hash algorithm to use (default: sha1)', |
| default='sha1') |
| parser.add_argument('--salt', help='Salt in hex (default: /dev/urandom)') |
| parser.add_argument('--block_size', |
| help='Block size (default: 4096)', |
| type=parse_number, |
| default=4096) |
| args = parser.parse_args(sys.argv[2:]) |
| |
| # If there's already a footer of ours, truncate it. This way |
| # 'bvbtool add_hashes_to_image' is idempotent modulo salts. |
| args.image.seek(0, os.SEEK_END) |
| image_size = args.image.tell() |
| args.image.seek(image_size - BvbIntegrityFooter.SIZE) |
| try: |
| footer = BvbIntegrityFooter(args.image.read(BvbIntegrityFooter.SIZE)) |
| # Existing footer found. Just truncate. |
| image_size = footer.image_size |
| args.image.truncate(image_size) |
| except (LookupError, struct.error): |
| pass |
| |
| # Ensure image is multiple of block_size |
| rounded_image_size = round_to_multiple(image_size, args.block_size) |
| if rounded_image_size > image_size: |
| args.image.write('\0' * (rounded_image_size - image_size)) |
| image_size = rounded_image_size |
| |
| tree_offset = image_size |
| |
| digest_size = len(hashlib.new(name=args.hash).digest()) |
| digest_padding = round_to_pow2(digest_size) - digest_size |
| |
| if args.salt: |
| salt = args.salt.decode('hex') |
| else: |
| if args.salt is None: |
| # If salt is not explicitly specified, choose a hash |
| # that's the same size as the hash size. |
| hash_size = digest_size |
| salt = open('/dev/urandom').read(hash_size) |
| else: |
| salt = '' |
| |
| # Hashes are stored upside down so we need to calcuate hash |
| # offsets in advance. |
| (hash_level_offsets, tree_size) = calc_hash_level_offsets( |
| image_size, args.block_size, digest_size + digest_padding) |
| |
| # Make room for the tree. |
| args.image.truncate(image_size + tree_size) |
| |
| # Generate the tree. |
| root_hash = generate_hash_tree(args.image, image_size, args.block_size, |
| args.hash, salt, digest_padding, tree_offset, |
| hash_level_offsets) |
| |
| # Write footer with the root hash and other information. |
| footer = BvbIntegrityFooter() |
| footer.dm_verity_version = 1 |
| footer.image_size = image_size |
| footer.tree_offset = tree_offset |
| footer.tree_size = tree_size |
| footer.data_block_size = args.block_size |
| footer.hash_block_size = args.block_size |
| footer.hash_algorithm = args.hash |
| footer.salt_hex = salt.encode('hex') |
| footer.root_hash_hex = root_hash.encode('hex') |
| args.image.seek(tree_offset + tree_size) |
| footer.save(args.image) |
| |
| |
| def calc_hash_level_offsets(image_size, block_size, digest_size): |
| """Calculate the offsets of all the hash-levels in a Merkle-tree. |
| |
| Arguments: |
| image_size: The size of the image to calculate a Merkle-tree for. |
| block_size: The block size, e.g. 4096. |
| digest_size: The size of each hash, e.g. 32 for SHA-256. |
| |
| Returns: |
| A tuple where the first argument is an array of offsets and the |
| second is size of the tree, in bytes. |
| """ |
| level_offsets = [] |
| level_sizes = [] |
| tree_size = 0 |
| |
| num_levels = 0 |
| size = image_size |
| while size > block_size: |
| num_blocks = (size + block_size - 1) / block_size |
| level_size = round_to_multiple(num_blocks * digest_size, block_size) |
| |
| level_sizes.append(level_size) |
| tree_size += level_size |
| num_levels += 1 |
| |
| size = level_size |
| |
| for n in range(0, num_levels): |
| offset = 0 |
| for m in range(n + 1, num_levels): |
| offset += level_sizes[m] |
| level_offsets.append(offset) |
| |
| return (level_offsets, tree_size) |
| |
| |
| def generate_hash_tree(image, image_size, block_size, hash_alg_name, salt, |
| digest_padding, tree_offset, hash_level_offsets): |
| """Generates a Merkle-tree for a file. |
| |
| Args: |
| image: The image, as a file. |
| image_size: The size of the image. |
| block_size: The block size, e.g. 4096. |
| hash_alg_name: The hash algorithm, e.g. 'sha256' or 'sha1'. |
| salt: The salt to use. |
| digest_padding: The padding for each digest. |
| tree_offset: The offset of where to store the Merkle tree in |image|. |
| hash_level_offsets: The offsets from calc_hash_level_offsets(). |
| |
| Returns: |
| The top-level hash. |
| """ |
| hash_src_offset = 0 |
| hash_src_size = image_size |
| level_num = 0 |
| while hash_src_size > block_size: |
| level_output = '' |
| image.seek(hash_src_offset) |
| remaining = hash_src_size |
| while remaining > 0: |
| hasher = hashlib.new(name=hash_alg_name, string=salt) |
| data = image.read(min(remaining, block_size)) |
| assert data |
| remaining -= len(data) |
| hasher.update(data) |
| if len(data) < block_size: |
| hasher.update('\0' * (block_size - len(data))) |
| level_output += hasher.digest() |
| if digest_padding > 0: |
| level_output += '\0' * digest_padding |
| |
| padding_needed = (round_to_multiple( |
| len(level_output), block_size) - len(level_output)) |
| level_output += '\0' * padding_needed |
| |
| hash_dest_offset = hash_level_offsets[level_num] + tree_offset |
| |
| image.seek(hash_dest_offset) |
| image.write(level_output) |
| |
| hash_src_offset = hash_dest_offset |
| hash_src_size = len(level_output) |
| |
| level_num += 1 |
| |
| hasher = hashlib.new(name=hash_alg_name, string=salt) |
| hasher.update(level_output) |
| return hasher.digest() |
| |
| |
| if __name__ == '__main__': |
| BvbTool() |