parallellize computation of binary patches

Change ota_from_target_files to compute binary patches for all the
changed files in the OTA package in parallel.  On a recent test build
this cuts incremental construction time roughly in half (6 min to 2
min 40 sec).
diff --git a/tools/releasetools/ota_from_target_files b/tools/releasetools/ota_from_target_files
index fe63c3a..cbae34c 100755
--- a/tools/releasetools/ota_from_target_files
+++ b/tools/releasetools/ota_from_target_files
@@ -62,6 +62,7 @@
 import sha
 import subprocess
 import tempfile
+import threading
 import time
 import zipfile
 
@@ -80,6 +81,7 @@
 OPTIONS.omit_prereq = False
 OPTIONS.extra_script = None
 OPTIONS.script_mode = 'auto'
+OPTIONS.worker_threads = 3
 
 def MostPopularKey(d, default):
   """Given a dict, return the key corresponding to the largest
@@ -297,7 +299,8 @@
   executable.
   """
 
-  patch = Difference(recovery_img, boot_img, "imgdiff")
+  d = Difference(recovery_img, boot_img)
+  _, _, patch = d.ComputePatch()
   common.ZipWriteStr(output_zip, "system/recovery-from-boot.p", patch)
   Item.Get("system/recovery-from-boot.p", dir=False)
 
@@ -420,38 +423,111 @@
   return out
 
 
-def Difference(tf, sf, diff_program):
-  """Return the patch (as a string of data) needed to turn sf into tf.
-  diff_program is the name of an external program (or list, if
-  additional arguments are desired) to run to generate the diff.
-  """
+DIFF_PROGRAM_BY_EXT = {
+    ".gz" : "imgdiff",
+    ".zip" : ["imgdiff", "-z"],
+    ".jar" : ["imgdiff", "-z"],
+    ".apk" : ["imgdiff", "-z"],
+    ".img" : "imgdiff",
+    }
 
-  ttemp = tf.WriteToTemp()
-  stemp = sf.WriteToTemp()
 
-  ext = os.path.splitext(tf.name)[1]
+class Difference(object):
+  def __init__(self, tf, sf):
+    self.tf = tf
+    self.sf = sf
+    self.patch = None
 
-  try:
-    ptemp = tempfile.NamedTemporaryFile()
-    if isinstance(diff_program, list):
-      cmd = copy.copy(diff_program)
-    else:
-      cmd = [diff_program]
-    cmd.append(stemp.name)
-    cmd.append(ttemp.name)
-    cmd.append(ptemp.name)
-    p = common.Run(cmd)
-    _, err = p.communicate()
-    if err or p.returncode != 0:
-      print "WARNING: failure running %s:\n%s\n" % (diff_program, err)
-      return None
-    diff = ptemp.read()
-  finally:
-    ptemp.close()
-    stemp.close()
-    ttemp.close()
+  def ComputePatch(self):
+    """Compute the patch (as a string of data) needed to turn sf into
+    tf.  Returns the same tuple as GetPatch()."""
 
-  return diff
+    tf = self.tf
+    sf = self.sf
+
+    ext = os.path.splitext(tf.name)[1]
+    diff_program = DIFF_PROGRAM_BY_EXT.get(ext, "bsdiff")
+
+    ttemp = tf.WriteToTemp()
+    stemp = sf.WriteToTemp()
+
+    ext = os.path.splitext(tf.name)[1]
+
+    try:
+      ptemp = tempfile.NamedTemporaryFile()
+      if isinstance(diff_program, list):
+        cmd = copy.copy(diff_program)
+      else:
+        cmd = [diff_program]
+      cmd.append(stemp.name)
+      cmd.append(ttemp.name)
+      cmd.append(ptemp.name)
+      p = common.Run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+      _, err = p.communicate()
+      if err or p.returncode != 0:
+        print "WARNING: failure running %s:\n%s\n" % (diff_program, err)
+        return None
+      diff = ptemp.read()
+    finally:
+      ptemp.close()
+      stemp.close()
+      ttemp.close()
+
+    self.patch = diff
+    return self.tf, self.sf, self.patch
+
+
+  def GetPatch(self):
+    """Return a tuple (target_file, source_file, patch_data).
+    patch_data may be None if ComputePatch hasn't been called, or if
+    computing the patch failed."""
+    return self.tf, self.sf, self.patch
+
+
+def ComputeDifferences(diffs):
+  """Call ComputePatch on all the Difference objects in 'diffs'."""
+  print len(diffs), "diffs to compute"
+
+  # Do the largest files first, to try and reduce the long-pole effect.
+  by_size = [(i.tf.size, i) for i in diffs]
+  by_size.sort(reverse=True)
+  by_size = [i[1] for i in by_size]
+
+  lock = threading.Lock()
+  diff_iter = iter(by_size)   # accessed under lock
+
+  def worker():
+    try:
+      lock.acquire()
+      for d in diff_iter:
+        lock.release()
+        start = time.time()
+        d.ComputePatch()
+        dur = time.time() - start
+        lock.acquire()
+
+        tf, sf, patch = d.GetPatch()
+        if sf.name == tf.name:
+          name = tf.name
+        else:
+          name = "%s (%s)" % (tf.name, sf.name)
+        if patch is None:
+          print "patching failed!                                  %s" % (name,)
+        else:
+          print "%8.2f sec %8d / %8d bytes (%6.2f%%) %s" % (
+              dur, len(patch), tf.size, 100.0 * len(patch) / tf.size, name)
+      lock.release()
+    except e:
+      print e
+      raise
+
+  # start worker threads; wait for them all to finish.
+  threads = [threading.Thread(target=worker)
+             for i in range(OPTIONS.worker_threads)]
+  for th in threads:
+    th.start()
+  while threads:
+    threads.pop().join()
 
 
 def GetBuildProp(property, z):
@@ -482,14 +558,6 @@
       return 0
 
 
-DIFF_METHOD_BY_EXT = {
-    ".gz" : "imgdiff",
-    ".zip" : ["imgdiff", "-z"],
-    ".jar" : ["imgdiff", "-z"],
-    ".apk" : ["imgdiff", "-z"],
-    }
-
-
 def WriteIncrementalOTAPackage(target_zip, source_zip, output_zip):
   source_version = GetRecoveryAPIVersion(source_zip)
 
@@ -521,9 +589,11 @@
 
   verbatim_targets = []
   patch_list = []
+  diffs = []
   largest_source_size = 0
   for fn in sorted(target_data.keys()):
     tf = target_data[fn]
+    assert fn == tf.name
     sf = source_data.get(fn, None)
 
     if sf is None or fn in OPTIONS.require_verbatim:
@@ -535,25 +605,23 @@
       verbatim_targets.append((fn, tf.size))
     elif tf.sha1 != sf.sha1:
       # File is different; consider sending as a patch
-      ext = os.path.splitext(tf.name)[1]
-      diff_method = DIFF_METHOD_BY_EXT.get(ext, "bsdiff")
-      d = Difference(tf, sf, diff_method)
-      if d is not None:
-        print fn, tf.size, len(d), (float(len(d)) / tf.size)
-      if d is None or len(d) > tf.size * OPTIONS.patch_threshold:
-        # patch is almost as big as the file; don't bother patching
-        tf.AddToZip(output_zip)
-        verbatim_targets.append((fn, tf.size))
-      else:
-        common.ZipWriteStr(output_zip, "patch/" + fn + ".p", d)
-        patch_list.append((fn, tf, sf, tf.size))
-        largest_source_size = max(largest_source_size, sf.size)
+      diffs.append(Difference(tf, sf))
     else:
       # Target file identical to source.
       pass
 
-  total_verbatim_size = sum([i[1] for i in verbatim_targets])
-  total_patched_size = sum([i[3] for i in patch_list])
+  ComputeDifferences(diffs)
+
+  for diff in diffs:
+    tf, sf, d = diff.GetPatch()
+    if d is None or len(d) > tf.size * OPTIONS.patch_threshold:
+      # patch is almost as big as the file; don't bother patching
+      tf.AddToZip(output_zip)
+      verbatim_targets.append((tf.name, tf.size))
+    else:
+      common.ZipWriteStr(output_zip, "patch/" + tf.name + ".p", d)
+      patch_list.append((tf.name, tf, sf, tf.size))
+      largest_source_size = max(largest_source_size, sf.size)
 
   source_fp = GetBuildProp("ro.build.fingerprint", source_zip)
   target_fp = GetBuildProp("ro.build.fingerprint", target_zip)
@@ -600,7 +668,8 @@
     script.SetProgress(so_far / total_verify_size)
 
   if updating_boot:
-    d = Difference(target_boot, source_boot, "imgdiff")
+    d = Difference(target_boot, source_boot)
+    _, _, d = d.ComputePatch()
     print "boot      target: %d  source: %d  diff: %d" % (
         target_boot.size, source_boot.size, len(d))
 
@@ -755,6 +824,8 @@
       OPTIONS.extra_script = a
     elif o in ("-m", "--script_mode"):
       OPTIONS.script_mode = a
+    elif o in ("--worker_threads"):
+      OPTIONS.worker_threads = int(a)
     else:
       return False
     return True
@@ -767,7 +838,8 @@
                                               "wipe_user_data",
                                               "no_prereq",
                                               "extra_script=",
-                                              "script_mode="],
+                                              "script_mode=",
+                                              "worker_threads="],
                              extra_option_handler=option_handler)
 
   if len(args) != 2: