Add stage 2 scheduling algorithm as option.

Test: HAL Fuzzing Test
Bug: 38283900
Change-Id: Ifa45d19476a9b131a0f4e29298430d98814675f4
diff --git a/utils/python/fuzzer/corpus_manager.py b/utils/python/fuzzer/corpus_manager.py
index 9bfc0e5..4edab67 100644
--- a/utils/python/fuzzer/corpus_manager.py
+++ b/utils/python/fuzzer/corpus_manager.py
@@ -17,15 +17,39 @@
 import logging
 import os
 import random
+import time
+import uuid
 
 from vts.runners.host import keys
 from vts.utils.python.gcs import gcs_api_utils
 from vts.utils.python.web import feature_utils
+"""
+Stage 1: FETCH_ONE_AND_FEED
+The stage 1 algorithm collects the corpus output generated from the fuzz test.
+Then, the algorithm chooses by random one of the generated seeds in the
+next round as input.
 
+Stage 2: FETCH_CRASH_AND_FEED
+The stage 2 algorithm classifies generated corpus output into two priorities:
+high priority and regular priority. Corpus strings created during a fuzz test
+run that revealed a crash will be given a high priority.
+On the other hand, corpus strings created during a fuzz test run that did
+not lead to a crash will be given the regular priority.
+
+Stage 3: FETCH_ALL_AND_REPEAT
+TBA.
+"""
+FETCH_ONE_AND_FEED = 1
+FETCH_CRASH_AND_FEED = 2
+FETCH_ALL_AND_REPEAT = 3
+
+SCHEDULING_ALGORITHM = FETCH_ONE_AND_FEED
+MEASURE_CORPUS = True
 CORPUS_STATES = [
-    'corpus_seed', 'corpus_inuse', 'corpus_complete', 'corpus_crash',
-    'corpus_error'
+    'corpus_seed_high', 'corpus_seed', 'corpus_seed_low', 'corpus_inuse',
+    'corpus_complete', 'corpus_crash', 'corpus_error', 'corpus_trigger'
 ]
+CORPUS_PRIORITIES = ['corpus_seed_high', 'corpus_seed', 'corpus_seed_low']
 
 
 class CorpusManager(feature_utils.Feature):
@@ -68,14 +92,8 @@
             self._gcs_api_utils = gcs_api_utils.GcsApiUtils(
                 self._key_path, self._bucket_name)
 
-    #TODO(b/64022625): fetch from the highest priority
     def FetchCorpusSeed(self, test_name, local_temp_dir):
-        """Fetches 1 seed corpus from the corpus seed directory of the corresponding
-           test from the GCS directory.
-
-        In GCS, moves the seed from corpus_seed directory to corpus_inuse directory.
-        From GCS to host, downloads 1 corpus seed from corpus_inuse directory
-        to {temp_dir}_{test_name}_corpus_seed in host machine.
+        """Fetches seed corpus of the corresponding test from the GCS directory.
 
         Args:
             test_name: string, name of the current fuzzing test.
@@ -92,7 +110,38 @@
         else:
             return None
 
-        corpus_seed_dir = self._GetDirPaths('corpus_seed', test_name)
+        if SCHEDULING_ALGORITHM == FETCH_ONE_AND_FEED:
+            inuse_seed = self._FetchCorpusSeedFromPriority(
+                test_name, local_temp_dir, 'corpus_seed')
+            return inuse_seed
+        elif SCHEDULING_ALGORITHM == FETCH_CRASH_AND_FEED:
+            for CORPUS_PRIORITY in CORPUS_PRIORITIES:
+                inuse_seed = self._FetchCorpusSeedFromPriority(
+                    test_name, local_temp_dir, CORPUS_PRIORITY)
+                if inuse_seed is not None:
+                    return inuse_seed
+            return None
+
+    def _FetchCorpusSeedFromPriority(self, test_name, local_temp_dir,
+                                     CORPUS_PRIORITY):
+        """Fetches 1 seed corpus from a corpus seed directory with the given priority.
+
+        In GCS, moves the seed from corpus_seed directory to corpus_inuse directory.
+        From GCS to host, downloads 1 corpus seed from corpus_inuse directory
+        to {temp_dir}_{test_name}_corpus_seed in host machine.
+
+        Args:
+            test_name: string, name of the current fuzzing test.
+            local_temp_dir: string, path to temporary directory for this test
+                            on the host machine.
+            CORPUS_PRIORITY: string, priority of the given directory.
+
+        Returns:
+            inuse_seed, GCS file path of the seed in use for test case
+                        if fetch was successful.
+            None otherwise.
+        """
+        corpus_seed_dir = self._GetDirPaths(CORPUS_PRIORITY, test_name)
         num_try = 0
         while num_try < 10:
             seed_list = self._gcs_api_utils.ListFilesWithPrefix(
@@ -139,8 +188,9 @@
         was generated.
 
         Args:
-            src_dir: string, source directory in local.
-            dest_dir: string, destination directory in GCS.
+            test_name: string, name of the current fuzzing test.
+            local_temp_dir: string, path to temporary directory for this test
+                            on the host machine.
 
         Returns:
             True if successfully uploaded.
@@ -160,12 +210,36 @@
                                          incoming_parent_dir):
             logging.info('Successfully uploaded corpus output to %s.',
                          incoming_parent_dir)
-            self._ClassifyPriority(test_name, local_temp_dir)
+            num_unique_corpus = self._ClassifyPriority(test_name,
+                                                       local_temp_dir)
+            if MEASURE_CORPUS:
+                self._UploadCorpusMeasure(test_name, local_temp_dir,
+                                          num_unique_corpus)
             return True
         else:
             logging.error('Failed to upload corpus output for %s.', test_name)
             return False
 
+    def _UploadCorpusMeasure(self, test_name, local_temp_dir,
+                             num_unique_corpus):
+        """Uploads the corpus measurement file to GCS.
+
+        Args:
+            test_name: string, name of the current fuzzing test.
+            local_temp_dir: string, path to temporary directory for this test
+                            on the host machine.
+            num_unique_corpus: integer, number of unique corpus generated.
+        """
+        local_measure_file = os.path.join(
+            local_temp_dir,
+            '%s_%s.txt' % (test_name, time.strftime('%Y-%m-%d-%H%M')))
+        with open(local_measure_file, 'w') as f:
+            f.write(str(num_unique_corpus))
+        remote_measure_file = os.path.join(
+            self._GetDirPaths('corpus_measure', test_name),
+            os.path.basename(local_measure_file))
+        self._gcs_api_utils.UploadFile(local_measure_file, remote_measure_file)
+
     def InuseToDest(self, test_name, inuse_seed, destination):
         """Moves the a corpus from corpus_inuse to destination.
 
@@ -205,98 +279,129 @@
             logging.error('seed in use %s does not exist', inuse_seed)
             return False
 
-    def InuseToSeed(self, test_name, inuse_seed):
-        """Moves the a corpus from corpus_inuse to corpus_seed.
-
-        {test_name}_corpus_seed directory is the directory for corpus that are ready
-        to be used as input corpus seed.
+    def _CorpusIsDuplicate(self, test_name, incoming_seed):
+        """Checks if the newly generated corpus is a duplicate corpus.
 
         Args:
-            test_name: name of the current test.
-            inuse_seed: path to corpus seed currently in use.
+            test_name: string, name of the current test.
+            incoming_seed: string, path to the incoming seed in GCS.
 
         Returns:
-            True, if move was successful.
-            False, if the inuse_seed file does not exist or move failed.
+            True if the incoming corpus already exists in the GCS bucket.
+            False otherwise.
         """
-        if not self.enabled:
-            return False
+        for file_type in CORPUS_STATES:
+            remote_corpus = self._GetFilePaths(file_type, test_name,
+                                               incoming_seed)
+            logging.debug(remote_corpus)
+            if self._gcs_api_utils.FileExists(remote_corpus):
+                logging.info('Corpus %s already exists.', remote_corpus)
+                return True
+        return False
 
-        if self._gcs_api_utils.FileExists(inuse_seed):
-            corpus_seed = self._GetFilePaths('corpus_seed', test_name,
-                                             inuse_seed)
-            return self._gcs_api_utils.MoveFile(inuse_seed, corpus_seed, True)
-        else:
-            logging.error('seed in use %s does not exist', inuse_seed)
-            return False
-
-    def InuseToComplete(self, test_name, inuse_seed):
-        """Moves the a corpus from corpus_inuse to corpus_complete.
-
-        {test_name}_corpus_complete directory is the directory for corpus that have
-        been used as an input and the test exited normally.
-
-        Args:
-            test_name: name of the current test.
-            inuse_seed: path to corpus seed currently in use.
-
-        Returns:
-            True, if move was successful.
-            False, if the inuse_seed file does not exist or move failed.
-        """
-        if not self.enabled:
-            return False
-
-        if self._gcs_api_utils.FileExists(inuse_seed):
-            corpus_complete = self._GetFilePaths('corpus_complete', test_name,
-                                                 inuse_seed)
-            return self._gcs_api_utils.MoveFile(inuse_seed, corpus_complete,
-                                                True)
-        else:
-            logging.error('seed in use %s does not exist.', inuse_seed)
-            return False
-
-    def InuseToCrash(self, test_name, inuse_seed):
-        """Moves the a corpus from corpus_inuse to corpus_crash.
-
-        {test_name}_corpus_crash directory is the directory for corpus that have
-        caused a fuzz test crash.
-
-        Args:
-            test_name: name of the current test.
-            inuse_seed: path to corpus seed currently in use.
-
-        Returns:
-            True, if move was successful.
-            False, if the inuse_seed file does not exist or move failed.
-        """
-        if not self.enabled:
-            return False
-
-        if self._gcs_api_utils.FileExists(inuse_seed):
-            corpus_crash = self._GetFilePaths('corpus_crash', test_name,
-                                              inuse_seed)
-            return self._gcs_api_utils.MoveFile(inuse_seed, corpus_crash, True)
-        else:
-            logging.error('seed in use %s does not exist.', inuse_seed)
-            return False
-
-    #TODO(b/64022625): smart algorithm for classifying corpus into different levels of priority
     def _ClassifyPriority(self, test_name, local_temp_dir):
-        """Classifies each of newly genereated corpus into different priorities.
+        """Calls the appropriate classification algorithm.
 
         Args:
             test_name: string, name of the current test.
             local_temp_dir: string, path to temporary directory for this
                             test on the host machine.
+
+        Returns:
+            num_unique_corpus: integer, number of unique corpus generated.
+        """
+        if SCHEDULING_ALGORITHM == FETCH_ONE_AND_FEED:
+            return self._ClassifyPriority1(test_name, local_temp_dir)
+        elif SCHEDULING_ALGORITHM == FETCH_CRASH_AND_FEED:
+            return self._ClassifyPriority2(test_name, local_temp_dir)
+
+    def _ClassifyPriority1(self, test_name, local_temp_dir):
+        """Classifies each of newly genereated corpus into different priorities.
+
+        Uses 1 priority level: corpus_seed.
+        This algorithm is a naive implementation.
+
+        Args:
+            test_name: string, name of the current test.
+            local_temp_dir: string, path to temporary directory for this
+                            test on the host machine.
+
+        Returns:
+            num_unique_corpus: integer, number of unique corpus generated.
         """
         incoming_child_dir = self._GetDirPaths('incoming_child', test_name,
                                                local_temp_dir)
+        num_unique_corpus = 0
         for incoming_seed in self._gcs_api_utils.ListFilesWithPrefix(
                 incoming_child_dir):
-            corpus_seed = self._GetFilePaths('corpus_seed', test_name,
-                                             incoming_seed)
-            self._gcs_api_utils.MoveFile(incoming_seed, corpus_seed, True)
+            if self._CorpusIsDuplicate(test_name, incoming_seed):
+                logging.info('Deleting duplicate corpus.')
+                self._gcs_api_utils.DeleteFile(incoming_seed)
+                continue
+
+            num_unique_corpus += 1
+            logging.info(
+                'Corpus string %s was classified as regular priority.',
+                incoming_seed)
+            corpus_destination = self._GetFilePaths('corpus_seed', test_name,
+                                                    incoming_seed)
+            self._gcs_api_utils.MoveFile(incoming_seed, corpus_destination,
+                                         True)
+
+        return num_unique_corpus
+
+    def _ClassifyPriority2(self, test_name, local_temp_dir):
+        """Classifies each of newly genereated corpus into different priorities.
+
+        Uses 2 priority levels: corpus_seed_high, corpus_seed.
+        This algorithm uses crash occurrence as its classification criteria.
+
+        Args:
+            test_name: string, name of the current test.
+            local_temp_dir: string, path to temporary directory for this
+                            test on the host machine.
+
+        Returns:
+            num_unique_corpus: integer, number of unique corpus generated.
+        """
+        triggered_corpus = os.path.join(
+            self._GetDirPaths('local_corpus_trigger', test_name,
+                              local_temp_dir), 'crash_report')
+        high_priority = os.path.exists(triggered_corpus)
+        incoming_child_dir = self._GetDirPaths('incoming_child', test_name,
+                                               local_temp_dir)
+        num_unique_corpus = 0
+        for incoming_seed in self._gcs_api_utils.ListFilesWithPrefix(
+                incoming_child_dir):
+            if self._CorpusIsDuplicate(test_name, incoming_seed):
+                logging.info('Deleting duplicate corpus.')
+                self._gcs_api_utils.DeleteFile(incoming_seed)
+                continue
+
+            num_unique_corpus += 1
+            if high_priority:
+                logging.info(
+                    'corpus string %s was classified as high priority.',
+                    incoming_seed)
+                corpus_destination = self._GetFilePaths(
+                    'corpus_seed_high', test_name, incoming_seed)
+            else:
+                logging.info(
+                    'corpus string %s was classified as regular priority.',
+                    incoming_seed)
+                corpus_destination = self._GetFilePaths(
+                    'corpus_seed', test_name, incoming_seed)
+            self._gcs_api_utils.MoveFile(incoming_seed, corpus_destination,
+                                         True)
+
+        if os.path.exists(triggered_corpus):
+            corpus_destination = self._GetFilePaths(
+                'corpus_trigger', test_name, triggered_corpus)
+            corpus_destination += str(uuid.uuid4())
+            self._gcs_api_utils.UploadFile(triggered_corpus,
+                                           corpus_destination)
+
+        return num_unique_corpus
 
     def _GetDirPaths(self, dir_type, test_name, local_temp_dir=None):
         """Generates the required directory path name for the given information.
@@ -313,9 +418,12 @@
         """
         dir_path = ''
 
-        # ex: corpus/ILight/ILight_corpus_seed
-        if dir_type == 'corpus_seed':
-            dir_path = 'corpus/%s/%s_corpus_seed' % (test_name, test_name)
+        # ex: corpus/ILight/ILight_corpus_seed_high
+        if dir_type in CORPUS_PRIORITIES:
+            dir_path = 'corpus/%s/%s_%s' % (test_name, test_name, dir_type)
+        # ex: corpus/ILight/ILight_corpus_measure
+        elif dir_type == 'corpus_measure':
+            dir_path = 'corpus/%s/%s_%s' % (test_name, test_name, dir_type)
         # ex: corpus/ILight/incoming/tmpV1oPTp
         elif dir_type == 'incoming_parent':
             dir_path = 'corpus/%s/incoming/%s' % (
@@ -328,6 +436,10 @@
         elif dir_type == 'local_corpus_out':
             dir_path = os.path.join(local_temp_dir,
                                     '%s_corpus_out' % test_name)
+        # ex: /tmp/tmpV1oPTp/ILight_corpus_trigger
+        elif dir_type == 'local_corpus_trigger':
+            dir_path = os.path.join(local_temp_dir,
+                                    '%s_corpus_trigger' % test_name)
 
         return dir_path