validation_pool: Split out throttled tree logic.

ApplyPoolIntoRepo contained the logic to decide what CLs to test if the
tree was logic. Move that logic out into it's own method.

This is prep work to allow intermediate steps that modify the manifest
to run after we know exactly what CLs are being considered, but before
any of them are applied into the code base.

BUG=chromium:549844
TEST=cbuildbot/run_tests

Change-Id: I0eb17459c956e4c1fe9e2995e9bd601f279aee72
Reviewed-on: https://chromium-review.googlesource.com/311212
Commit-Ready: Don Garrett <dgarrett@chromium.org>
Tested-by: Don Garrett <dgarrett@chromium.org>
Reviewed-by: Aviv Keshet <akeshet@chromium.org>
diff --git a/cbuildbot/lkgm_manager.py b/cbuildbot/lkgm_manager.py
index 5f2d50b..947b7b2 100644
--- a/cbuildbot/lkgm_manager.py
+++ b/cbuildbot/lkgm_manager.py
@@ -249,6 +249,11 @@
     self.InitializeManifestVariables(version_info)
 
     self.GenerateBlameListSinceLKGM()
+
+    # Throw away CLs that might not be used this run.
+    if validation_pool:
+      validation_pool.FilterChangesForThrottledTree()
+
     new_manifest = self.CreateManifest()
 
     # For Chrome PFQ, add the version of Chrome to use.
diff --git a/cbuildbot/validation_pool.py b/cbuildbot/validation_pool.py
index b6e96e8..c5aa5e0 100644
--- a/cbuildbot/validation_pool.py
+++ b/cbuildbot/validation_pool.py
@@ -822,7 +822,7 @@
 
   @_ManifestDecorator
   def Apply(self, changes, frozen=True, honor_ordering=False,
-            changes_filter=None, max_change_count=None):
+            changes_filter=None):
     """Applies changes from pool into the build root specified by the manifest.
 
     This method resolves each given change down into a set of transactions-
@@ -855,10 +855,6 @@
         changes being inspected, and expand the changes if necessary.
         Primarily this is of use for cbuildbot patching when dealing w/
         uploaded/remote patches.
-      max_change_count: If not None, this is a soft integer limit on the number
-        of patches to pull in. We stop pulling in patches as soon as we grab
-        at least this many patches. Note that this limit may be exceeded by N-1,
-        where N is the length of the longest transaction.
 
     Returns:
       A tuple of changes-applied, Exceptions for the changes that failed
@@ -885,9 +881,6 @@
                      change, ', '.join(map(str, resolved[-1][-1])))
         planned.update(plan)
 
-      if max_change_count is not None and len(planned) >= max_change_count:
-        break
-
     if not resolved:
       # No work to do; either no changes were given to us, or all failed
       # to be resolved.
@@ -1622,6 +1615,22 @@
 
       logging.PrintBuildbotLink(s, change.url)
 
+  def FilterChangesForThrottledTree(self):
+    """Apply Throttled Tree logic to select patch candidates.
+
+    If the tree is throttled, we only test a random subset of our candidate
+    changes. Call this to select that subset, and throw away unrelated changes.
+
+    If the three was open when this pool was created, it does nothing.
+    """
+    if self.tree_was_open:
+      return
+
+    fail_streak = self._GetFailStreak()
+    test_pool_size = max(1, len(self.changes) / (2**fail_streak))
+    random.shuffle(self.changes)
+    self.changes = self.changes[:test_pool_size]
+
   def ApplyPoolIntoRepo(self, manifest=None):
     """Applies changes from pool into the directory specified by the buildroot.
 
@@ -1640,18 +1649,11 @@
     failed_inflight = []
     patch_series = PatchSeries(self.build_root, helper_pool=self._helper_pool)
 
-    # Only try a subset of the changes if the tree was throttled.
-    max_change_count = len(self.changes)
-    if not self.tree_was_open:
-      random.shuffle(self.changes)
-      fail_streak = self._GetFailStreak()
-      max_change_count = max(1, len(self.changes) / (2**fail_streak))
-
     if self.is_master:
       try:
         # pylint: disable=E1123
         applied, failed_tot, failed_inflight = patch_series.Apply(
-            self.changes, manifest=manifest, max_change_count=max_change_count)
+            self.changes, manifest=manifest)
       except (KeyboardInterrupt, RuntimeError, SystemExit):
         raise
       except Exception as e:
diff --git a/cbuildbot/validation_pool_unittest.py b/cbuildbot/validation_pool_unittest.py
index 228c5e4..3fab2d1 100644
--- a/cbuildbot/validation_pool_unittest.py
+++ b/cbuildbot/validation_pool_unittest.py
@@ -16,7 +16,6 @@
 import mox
 import os
 import pickle
-import random
 import tempfile
 import time
 
@@ -878,10 +877,7 @@
   def MakeFailure(self, patch, inflight=True):
     return cros_patch.ApplyPatchException(patch, inflight=inflight)
 
-  def GetPool(self, changes, applied=(), tot=(), inflight=(),
-              max_change_count=None, **kwargs):
-    if not max_change_count:
-      max_change_count = len(changes)
+  def GetPool(self, changes, applied=(), tot=(), inflight=(), **kwargs):
 
     pool = self.MakePool(changes=changes, fake_db=self.fake_db, **kwargs)
     applied = list(applied)
@@ -889,8 +885,7 @@
     inflight = [self.MakeFailure(x, inflight=True) for x in inflight]
     # pylint: disable=E1120,E1123
     validation_pool.PatchSeries.Apply(
-        changes, manifest=mox.IgnoreArg(), max_change_count=max_change_count
-        ).AndReturn((applied, tot, inflight))
+        changes, manifest=mox.IgnoreArg()).AndReturn((applied, tot, inflight))
 
     for patch in applied:
       pool.HandleApplySuccess(patch, mox.IgnoreArg()).AndReturn(None)
@@ -1138,8 +1133,7 @@
 
     # pylint: disable=E1120,E1123
     validation_pool.PatchSeries.Apply(
-        patches, manifest=mox.IgnoreArg(),
-        max_change_count=len(patches)).AndRaise(MyException)
+        patches, manifest=mox.IgnoreArg()).AndRaise(MyException)
     errors = [mox.Func(functools.partial(VerifyCQError, x)) for x in patches]
     pool._HandleApplyFailure(errors).AndReturn(None)
 
@@ -1366,47 +1360,72 @@
 
     self.assertEqual(slave_pool._GetFailStreak(), 0)
 
-  def testApplyWithTreeNotOpen(self):
+  def testFilterChangesForThrottledTree(self):
     """Tests that we can correctly apply exponential fallback."""
     patches = self.GetPatches(4)
-
-    # We mock out the shuffle so that we can deterministically test.
-    self.mox.StubOutWithMock(random, 'shuffle')
     self.mox.StubOutWithMock(validation_pool.ValidationPool, '_GetFailStreak')
 
-    slave_pool = self.GetPool(changes=patches, applied=patches[:2],
-                              max_change_count=2,
-                              tree_was_open=False, handlers=True)
-    random.shuffle(patches) # Mock.
+    #
+    # Test when tree is open.
+    #
+    self.mox.ReplayAll()
+
+    # Perform test.
+    slave_pool = self.MakePool(changes=patches, tree_was_open=True)
+    slave_pool.FilterChangesForThrottledTree()
+
+    # Validate results.
+    self.assertEqual(len(slave_pool.changes), 4)
+    self.mox.VerifyAll()
+    self.mox.ResetAll()
+
+    #
+    # Test when tree is closed with a streak of 1.
+    #
+
     # pylint: disable=no-value-for-parameter
     validation_pool.ValidationPool._GetFailStreak().AndReturn(1)
-
     self.mox.ReplayAll()
-    self.runApply(slave_pool, True)
+
+    # Perform test.
+    slave_pool = self.MakePool(changes=patches, tree_was_open=False)
+    slave_pool.FilterChangesForThrottledTree()
+
+    # Validate results.
     self.assertEqual(len(slave_pool.changes), 2)
     self.mox.VerifyAll()
     self.mox.ResetAll()
 
-    slave_pool = self.GetPool(changes=patches, applied=patches[:1],
-                              max_change_count=1,
-                              tree_was_open=False, handlers=True)
-    random.shuffle(patches) # Mock.
-    validation_pool.ValidationPool._GetFailStreak().AndReturn(2)
+    #
+    # Test when tree is closed with a streak of 2.
+    #
 
+    # pylint: disable=no-value-for-parameter
+    validation_pool.ValidationPool._GetFailStreak().AndReturn(2)
     self.mox.ReplayAll()
-    self.runApply(slave_pool, True)
+
+    # Perform test.
+    slave_pool = self.MakePool(changes=patches, tree_was_open=False)
+    slave_pool.FilterChangesForThrottledTree()
+
+    # Validate results.
     self.assertEqual(len(slave_pool.changes), 1)
     self.mox.VerifyAll()
     self.mox.ResetAll()
 
-    slave_pool = self.GetPool(changes=patches, applied=patches[:1],
-                              max_change_count=1,
-                              tree_was_open=False, handlers=True)
-    random.shuffle(patches) # Mock.
-    validation_pool.ValidationPool._GetFailStreak().AndReturn(10)
+    #
+    # Test when tree is closed with a streak of many.
+    #
 
+    # pylint: disable=no-value-for-parameter
+    validation_pool.ValidationPool._GetFailStreak().AndReturn(200)
     self.mox.ReplayAll()
-    self.runApply(slave_pool, True)
+
+    # Perform test.
+    slave_pool = self.MakePool(changes=patches, tree_was_open=False)
+    slave_pool.FilterChangesForThrottledTree()
+
+    # Validate results.
     self.assertEqual(len(slave_pool.changes), 1)
     self.mox.VerifyAll()