Add emmc_image.py

Test: None
Change-Id: I361d160017efef6bc8e3a8c2f2405bdf844c1177
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..c995872
--- /dev/null
+++ b/README.md
@@ -0,0 +1,27 @@
+emmc_image.py is a tool to generate an eMMC USER image.
+
+Requirement:
+
+*   Python 2.7
+*   simg2img
+    *   Used to unsparse Android sparse images
+    *   Either make it available in `PATH` or set `SIMG2IMG_BIN` to the path of
+        the `simg2img` binary
+
+
+**Factory image downloaded from console in both slots:**
+
+```
+./emmc_image.py --out emmc.zip --factory_files console_build.zip --all-slots
+```
+
+
+**Custom images in slot A, factory image downloaded from console in slot B:**
+
+```
+./emmc_image.py --out emmc.zip --factory_files console_build.zip \
+--all-slots \
+--partition boot_a custom_boot.img \
+--partition system_a custom_system.img \
+--partition vbmeta_a custom_vbmeta.img
+```
diff --git a/emmc_image.py b/emmc_image.py
new file mode 100755
index 0000000..2ae34be
--- /dev/null
+++ b/emmc_image.py
@@ -0,0 +1,247 @@
+#!/usr/bin/env python
+#
+# Copyright 2017 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import argparse
+import os
+import struct
+import shutil
+import subprocess
+import sys
+import tempfile
+import zipfile
+
+
+class PackedTuple(object):
+  """A base class for a named tuple packed into a binary string."""
+
+  def __init__(self, members, data=None):
+    """Initialize the instance with a given set of members.
+    Args:
+      members: The list of members of the tuple, as a pair of strings: struct
+        encoding and member name.
+    """
+    self._ptuple_fmt = '<' + ''.join(fldfmt for fldfmt, _ in members)
+    self._ptuple_flds = [fld for _, fld in members]
+    self._ptuple_size = struct.calcsize(self._ptuple_fmt)
+
+    values = ([None for _ in members] if data is None
+              else struct.unpack(self._ptuple_fmt, data[:self._ptuple_size]))
+    for value, fld in zip(values, self._ptuple_flds):
+      setattr(self, fld, value)
+
+  def __len__(self):
+    return self._ptuple_size
+
+  def __str__(self):
+    return struct.pack(self._ptuple_fmt,
+                       *(getattr(self, fld) for fld in self._ptuple_flds))
+
+  def __repr__(self):
+    return '<%s ' % (type(self).__name__) + ' '.join(
+        '%s=%r' % (fld, getattr(self, fld)) for fld in self._ptuple_flds) + ' >'
+
+
+class GPTPartitionEntry(PackedTuple):
+  """A packed tuple representing a GPT partition entry."""
+
+  def __init__(self, data):
+    members = (
+      ('16s', 'guid'),
+      ('16s', 'uuid'),
+      ('Q', 'first_lba'),
+      ('Q', 'last_lba'),
+      ('Q', 'flags'),
+      ('72s', 'name'),
+    )
+    super(GPTPartitionEntry, self).__init__(members, data)
+    if data is None:
+      self.guid = '\0' * 16
+      self.uuid = '\0' * 16
+      self.name = ''
+
+
+class GPTPartitionTable(PackedTuple):
+  """A packed tuple representing the header of a GPT partition."""
+
+  def __init__(self, data):
+    members = (
+      ('8s', 'signature'),
+      ('I', 'revision'),
+      ('I', 'header_size'),
+      ('I', 'crc32'),
+      ('4s', '_pad'),
+      ('Q', 'current_lba'),
+      ('Q', 'backup_lba'),
+      ('Q', 'first_usable_lba'),
+      ('Q', 'last_usable_lba'),
+      ('16s', 'disk_guid'),
+      ('Q', 'part_entry_start_lba'),
+      ('I', 'num_part_entries'),
+      ('I', 'part_entry_size'),
+      ('I', 'crc32_part_array'),
+    )
+    super(GPTPartitionTable, self).__init__(members, data)
+    if data is None:
+      self.current_lba = 1
+
+
+def SparseImageExtract(image_file):
+  """Return a temporary file with the RAW image from the sparse image in
+  |image_file|.
+
+  If |image_file| isn't an Android sparse image returns None instead. The
+  temporary file will be deleted once closed.
+  """
+  SPARSE_HEADER_FMT = '<I4H'
+  header_size = struct.calcsize(SPARSE_HEADER_FMT)
+  try:
+    # magic, major_version, minor_version, file_hdr_sz, chunk_hdr_sz
+    header = struct.unpack(SPARSE_HEADER_FMT, image_file.read(header_size))
+    image_file.seek(-header_size, os.SEEK_CUR)
+    # This is the only combination supported, so we used it to identify sparse
+    # image files.
+    if header != (0xED26FF3A, 1, 0, 28, 12):
+      return
+  except IOError:
+    pass
+
+  temp_file = tempfile.NamedTemporaryFile(dir=os.path.dirname(image_file.name))
+  simg2img_bin = os.environ.get('SIMG2IMG_BIN', 'simg2img')
+  subprocess.check_call([simg2img_bin, image_file.name, temp_file.name])
+  return temp_file
+
+
+def WritePartition(out_file, part_file, start_offset):
+  out_file.seek(start_offset);
+  # Autodetect sparse images if possible.
+  raw_file = SparseImageExtract(part_file)
+  source_file = raw_file if raw_file else part_file
+  shutil.copyfileobj(source_file, out_file)
+  return source_file.tell()
+
+
+def ExtractZips(zips, file_name):
+  for zip_path in zips:
+    if not zip_path:
+      continue
+    with zipfile.ZipFile(zip_path, 'r') as zip_file:
+      if file_name not in zip_file.namelist():
+        continue
+      with zip_file.open(file_name, 'r') as part_file:
+        temp_file = tempfile.NamedTemporaryFile()
+        shutil.copyfileobj(part_file, temp_file)
+        temp_file.seek(0)
+        return temp_file
+
+
+def GenerateEMMC(args):
+  """Assemble an EMMC raw image."""
+  if args.partition_table:
+    with open(args.partition_table, 'rb') as gpt_file:
+      partition_table = gpt_file.read()
+  else:
+    gpt_file = ExtractZips(args.tfzips, 'IMAGES/partition-table.img')
+
+    if not gpt_file:
+      gpt_file = ExtractZips([args.factory_files],
+                              'partition-table.img')
+
+    assert gpt_file, 'No partition_table provided'
+    partition_table = gpt_file.read()
+
+  gpt = GPTPartitionTable(partition_table[512:])
+  gpt_partitions = {}
+  for i in range(gpt.num_part_entries):
+    offset = gpt.part_entry_start_lba * 512 + i * gpt.part_entry_size
+    part = GPTPartitionEntry(partition_table[offset:])
+    part_name = str(part.name.decode('utf-16le').rstrip('\0'))
+    if part_name:
+      if part_name in gpt_partitions:
+        print 'Ignoring duplicate partition entry "%s"' % part_name
+      else:
+        gpt_partitions[part_name] = part
+
+  gpt_head = partition_table[:gpt.first_usable_lba * 512]
+  gpt_tail = partition_table[gpt.first_usable_lba * 512:]
+  gpt_tail_offset = (gpt.last_usable_lba + 1) * 512
+
+  out_file = tempfile.NamedTemporaryFile()
+  print("Creating image: %s" % out_file.name)
+  out_file.write(gpt_head)
+  out_file.seek(gpt_tail_offset)
+  out_file.write(gpt_tail)
+
+  partition_overrides = {}
+  if args.partition:
+    partition_overrides = {name: path for name, path in args.partition}
+  for part_name, part in gpt_partitions.items():
+    part_offset = part.first_lba * 512
+    part_size = (part.last_lba - part.first_lba + 1) * 512
+
+    if part_name.endswith('_a') or part_name.endswith('_b'):
+      if not args.all_slots and part_name.endswith('_b'):
+        continue
+      part_name_no_suffix = part_name[:-2]
+    else:
+      part_name_no_suffix = part_name
+
+    if part_name in partition_overrides:
+      part_file = open(partition_overrides[part_name], 'rb')
+    elif part_name_no_suffix in partition_overrides:
+      part_file = open(partition_overrides[part_name_no_suffix], 'rb')
+    elif part_name == 'userdata':
+      part_file = None
+    else:
+      part_file = ExtractZips(args.tfzips,
+                              'IMAGES/%s.img' % part_name_no_suffix)
+      if not part_file:
+        part_file = ExtractZips([args.factory_files],
+                                '%s.img' % part_name_no_suffix)
+    if not part_file:
+      print ' Skipping partition "%s", no file provided' % (part_name,)
+      continue
+
+    print(' Copying partition "%s"' % (part_name))
+    copied_size = WritePartition(out_file, part_file, part_offset)
+    part_file.close()
+    assert copied_size <= part_size, \
+        'Partition %s overflow; size is %d KiB but copied %d KiB' % (
+            part_name, part_size / 1024, copied_size / 1024)
+    print(' Partition "%s", copied size: %d KiB at offset %d KiB' %
+          (part_name, copied_size / 1024, part_offset / 1024))
+
+  out_file.flush()
+  with zipfile.ZipFile(args.out, 'w', zipfile.ZIP_DEFLATED, True) as out_zip:
+    print("Zipping image: %s" % args.out)
+    out_zip.write(out_file.name, 'emmc.img')
+  return 0
+
+
+if __name__ == '__main__':
+  parser = argparse.ArgumentParser(description=GenerateEMMC.__doc__)
+  parser.add_argument('--out', metavar='OUTPUT', type=str, required=True,
+                      help='The output zipped emmc image.')
+  parser.add_argument('--partition_table', help='path to the MBR+GPT image.')
+  parser.add_argument('--partition', nargs=2, metavar=('PARTITION_NAME','PATH'),
+                      action='append', help='override partition images.')
+  parser.add_argument('--factory_files',
+                      help='path to the factory_files or flashfiles zip')
+  parser.add_argument('tfzips', nargs='*', metavar='TARGET_FILES',
+                      help='path to target_files zip(s)')
+  parser.add_argument('--all-slots', default=False, action='store_true',
+                      help='copy the provided images to all slots')
+
+  sys.exit(GenerateEMMC(parser.parse_args()))
diff --git a/emmc_image_unittest.py b/emmc_image_unittest.py
new file mode 100755
index 0000000..8664d3f
--- /dev/null
+++ b/emmc_image_unittest.py
@@ -0,0 +1,205 @@
+#!/usr/bin/env python
+#
+# Copyright 2017 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import subprocess
+import tempfile
+import unittest
+import zipfile
+
+import emmc_image
+
+
+class GPTPartitionTableTest(unittest.TestCase):
+
+  def testGPTPartitionTable(self):
+    gpt_header = ('EFI PART'
+                  '\x00\x00\x01\x00'
+                  '\\\x00\x00\x00'
+                  '\x88R\xe9\x92'
+                  '\x00\x00\x00\x00'
+                  '\x01\x00\x00\x00\x00\x00\x00\x00'
+                  '\xff\xff\x7f\x00\x00\x00\x00\x00'
+                  '"\x00\x00\x00\x00\x00\x00\x00'
+                  '\xde\xff\x7f\x00\x00\x00\x00\x00'
+                  '\xdc\xdd0\x7f-WVF\xa3\x81\x96W\xfd\xe5\\L'
+                  '\x02\x00\x00\x00\x00\x00\x00\x00'
+                  '\x80\x00\x00\x00'
+                  '\x80\x00\x00\x00'
+                  '\x93J\xf4]')
+    gpt = emmc_image.GPTPartitionTable(gpt_header)
+    self.assertEqual(gpt.signature, 'EFI PART')
+    self.assertEqual(gpt.revision, 65536)
+    self.assertEqual(gpt.header_size, len(gpt_header))
+    self.assertEqual(gpt.crc32, 0x92e95288)
+    self.assertEqual(gpt.current_lba, 1)
+    self.assertEqual(gpt.backup_lba, 0x7fffff)
+    self.assertEqual(gpt.first_usable_lba, 34)
+    self.assertEqual(gpt.last_usable_lba, 0x7fffde)
+    self.assertEqual(gpt.disk_guid, '\xdc\xdd0\x7f-WVF\xa3\x81\x96W\xfd\xe5\\L')
+    self.assertEqual(gpt.part_entry_start_lba, 2)
+    self.assertEqual(gpt.num_part_entries, 128)
+    self.assertEqual(gpt.part_entry_size, 128)
+    self.assertEqual(gpt.crc32_part_array, 0x5df44a93)
+
+
+class GenerateEMMCTest(unittest.TestCase):
+
+  def setUp(self):
+    bpt = '''
+    {
+        "settings": {
+            "disk_size": "1 MiB"
+        },
+        "partitions": [
+            {
+                "ab": true,
+                "label": "boot",
+                "size": "10 KiB"
+            },
+            {
+                "ab": true,
+                "label": "system",
+                "size": "20 KiB"
+            },
+            {
+                "label": "misc",
+                "size": "1 KiB"
+            }
+        ]
+    }'''
+
+    bpt_file = tempfile.NamedTemporaryFile()
+    bpt_file.write(bpt)
+    bpt_file.flush()
+    self.gpt_file = tempfile.NamedTemporaryFile()
+    subprocess.check_call([
+        'bpttool', 'make_table', '--input', bpt_file.name, '--output_gpt',
+        self.gpt_file.name
+    ])
+
+    boot_file = tempfile.NamedTemporaryFile()
+    self.boot = os.urandom(8 * 1024)
+    boot_file.write(self.boot)
+    boot_file.flush()
+
+    zip_file = tempfile.NamedTemporaryFile()
+    self.misc = os.urandom(300)
+    with zipfile.ZipFile(zip_file.name, 'w') as factory_files_zip:
+      factory_files_zip.writestr('misc.img', self.misc)
+    self.out_file = tempfile.NamedTemporaryFile()
+
+    class FakeArgs():
+
+      def __init__(self):
+        self.partition_table = ''
+        self.partition = [['boot', boot_file.name]]
+        self.factory_files = zip_file.name
+        self.tfzips = []
+        self.out = ''
+        self.all_slots = False
+
+    self.args = FakeArgs()
+    self.args.partition_table = self.gpt_file.name
+    self.args.out = self.out_file.name
+
+
+  def testGenerateEMMC(self):
+    self.assertEqual(emmc_image.GenerateEMMC(self.args), 0)
+
+    out_zip = zipfile.ZipFile(self.out_file.name, 'r')
+    emmc_file = out_zip.open('emmc.img', 'r')
+    emmc = emmc_file.read()
+    self.assertEqual(len(emmc), 1024 * 1024)
+
+    self.assertEqual(emmc[:17 * 1024], self.gpt_file.read(17 * 1024))
+    boot_offset = 20 * 1024
+    boot_part_size = 10 * 1024
+    self.assertEqual(emmc[boot_offset:boot_offset + len(self.boot)], self.boot)
+    # rest of boot_a should be 0
+    self.assertEqual(
+        emmc[boot_offset + len(self.boot):boot_offset + boot_part_size],
+        '\0' * (boot_part_size - len(self.boot)))
+    # padding between boot_a and boot_b should be 0
+    self.assertEqual(emmc[30 * 1024:32 * 1024], '\0' * 2 * 1024)
+    # unwritten boot_b should be 0
+    self.assertEqual(emmc[32 * 1024:42 * 1024], '\0' * 10 * 1024)
+    # padding between boot_b and system_a should be 0
+    self.assertEqual(emmc[30 * 1024:32 * 1024], '\0' * 2 * 1024)
+    # unwritten system_a should be 0
+    self.assertEqual(emmc[44 * 1024:64 * 1024], '\0' * 20 * 1024)
+    # unwritten system_b should be 0
+    self.assertEqual(emmc[64 * 1024:84 * 1024], '\0' * 20 * 1024)
+    misc_offset = 84 * 1024
+    misc_part_size = 1024
+    self.assertEqual(emmc[misc_offset:misc_offset + len(self.misc)], self.misc)
+    # rest of misc should be 0
+    self.assertEqual(
+        emmc[misc_offset + len(self.misc):misc_offset + misc_part_size],
+        '\0' * (misc_part_size - len(self.misc)))
+    # secondary gpt
+    self.assertEqual(emmc[-33 * 512:], self.gpt_file.read(33 * 512))
+
+  def testParitionOverride(self):
+    boot_b_file = tempfile.NamedTemporaryFile()
+    boot_b = os.urandom(9 * 1024)
+    boot_b_file.write(boot_b)
+    boot_b_file.flush()
+
+    self.args.partition += [['boot_b', boot_b_file.name]]
+    self.args.all_slots = True
+    self.assertEqual(emmc_image.GenerateEMMC(self.args), 0)
+
+    out_zip = zipfile.ZipFile(self.out_file.name, 'r')
+    emmc_file = out_zip.open('emmc.img', 'r')
+    emmc = emmc_file.read()
+    self.assertEqual(len(emmc), 1024 * 1024)
+
+    self.assertEqual(emmc[:17 * 1024], self.gpt_file.read(17 * 1024))
+    boot_offset = 20 * 1024
+    boot_part_size = 10 * 1024
+    self.assertEqual(emmc[boot_offset:boot_offset + len(self.boot)], self.boot)
+    # rest of boot_a should be 0
+    self.assertEqual(
+        emmc[boot_offset + len(self.boot):boot_offset + boot_part_size],
+        '\0' * (boot_part_size - len(self.boot)))
+    # padding between boot_a and boot_b should be 0
+    self.assertEqual(emmc[30 * 1024:32 * 1024], '\0' * 2 * 1024)
+    boot_b_offset = 32 * 1024
+    self.assertEqual(emmc[boot_b_offset:boot_b_offset + len(boot_b)], boot_b)
+    # rest of boot_b should be 0
+    self.assertEqual(
+        emmc[boot_b_offset + len(boot_b):boot_b_offset + boot_part_size],
+        '\0' * (boot_part_size - len(boot_b)))
+    # padding between boot_b and system_a should be 0
+    self.assertEqual(emmc[30 * 1024:32 * 1024], '\0' * 2 * 1024)
+    # unwritten system_a should be 0
+    self.assertEqual(emmc[44 * 1024:64 * 1024], '\0' * 20 * 1024)
+    # unwritten system_b should be 0
+    self.assertEqual(emmc[64 * 1024:84 * 1024], '\0' * 20 * 1024)
+    misc_offset = 84 * 1024
+    misc_part_size = 1024
+    self.assertEqual(emmc[misc_offset:misc_offset + len(self.misc)], self.misc)
+    # rest of misc should be 0
+    self.assertEqual(
+        emmc[misc_offset + len(self.misc):misc_offset + misc_part_size],
+        '\0' * (misc_part_size - len(self.misc)))
+    # secondary gpt
+    self.assertEqual(emmc[-33 * 512:], self.gpt_file.read(33 * 512))
+
+
+if __name__ == '__main__':
+  unittest.main()