blob: a06dcd60eafe9dedb6698cc142d60116ce18495a [file] [log] [blame]
#
# Copyright (C) 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import os
import random
import time
import uuid
from vts.runners.host import keys
from vts.utils.python.gcs import gcs_api_utils
from vts.utils.python.web import feature_utils
"""
Stage 1: FETCH_ONE_AND_FEED
The stage 1 algorithm collects the corpus output generated from the fuzz test.
Then, the algorithm chooses by random one of the generated seeds in the
next round as input.
Stage 2: FETCH_CRASH_AND_FEED
The stage 2 algorithm classifies generated corpus output into two priorities:
high priority and regular priority. Corpus strings created during a fuzz test
run that revealed a crash will be given a high priority.
On the other hand, corpus strings created during a fuzz test run that did
not lead to a crash will be given the regular priority.
Stage 3: FETCH_ALL_AND_REPEAT
TBA.
"""
FETCH_ONE_AND_FEED = 1
FETCH_CRASH_AND_FEED = 2
FETCH_ALL_AND_REPEAT = 3
SCHEDULING_ALGORITHM = FETCH_ONE_AND_FEED
MEASURE_CORPUS = True
CORPUS_STATES = [
'corpus_seed_high', 'corpus_seed', 'corpus_seed_low', 'corpus_inuse',
'corpus_complete', 'corpus_crash', 'corpus_error', 'corpus_trigger'
]
CORPUS_PRIORITIES = ['corpus_seed_high', 'corpus_seed', 'corpus_seed_low']
class CorpusManager(feature_utils.Feature):
"""Manages corpus for fuzzing.
Features include:
Fetching corpus input from GCS to host.
Uploading corpus output from host to GCS.
Classifying corpus output into different priorities.
Moving corpus between different states (seed, inuse, complete).
Attributes:
_TOGGLE_PARAM: String, the name of the parameter used to toggle the feature.
_REQUIRED_PARAMS: list, the list of parameter names that are required.
_OPTIONAL_PARAMS: list, the list of parameter names that are optional.
_key_path: string, path to the json path.
_bucket_name: string, name of the Google Cloud Storage bucket used.
_gcs_api_utils: GcsApiUtils object, used to communicate with GCS.
_gcs_path: string, path to the upper most level corpus directory in GCS.
"""
_TOGGLE_PARAM = keys.ConfigKeys.IKEY_ENABLE_LOG_UPLOADING
_REQUIRED_PARAMS = [
keys.ConfigKeys.IKEY_SERVICE_JSON_PATH,
keys.ConfigKeys.IKEY_FUZZING_GCS_BUCKET_NAME
]
_OPTIONAL_PARAMS = []
def __init__(self, user_params, dut=None):
"""Initializes the gcs util provider.
Args:
user_params: A dictionary from parameter name (String) to parameter value.
dut: The Android device being tested.
"""
self.ParseParameters(
toggle_param_name=self._TOGGLE_PARAM,
required_param_names=self._REQUIRED_PARAMS,
optional_param_names=self._OPTIONAL_PARAMS,
user_params=user_params)
if self.enabled:
self._key_path = self.service_key_json_path
self._bucket_name = self.fuzzing_gcs_bucket_name
self._gcs_api_utils = gcs_api_utils.GcsApiUtils(
self._key_path, self._bucket_name)
self._gcs_path = 'corpus'
if dut is not None:
branch = dut.build_alias.split('.')[0]
model = dut.product_type
self._gcs_path = os.path.join(self._gcs_path, branch, model)
def FetchCorpusSeed(self, test_name, local_temp_dir):
"""Fetches seed corpus of the corresponding test from the GCS directory.
Args:
test_name: string, name of the current fuzzing test.
local_temp_dir: string, path to temporary directory for this test
on the host machine.
Returns:
inuse_seed, GCS file path of the seed in use for test case
if fetch was successful.
None otherwise.
"""
if self.enabled:
logging.debug('Attempting to fetch corpus seed for %s.', test_name)
else:
return None
if SCHEDULING_ALGORITHM == FETCH_ONE_AND_FEED:
inuse_seed = self._FetchCorpusSeedFromPriority(
test_name, local_temp_dir, 'corpus_seed')
return inuse_seed
elif SCHEDULING_ALGORITHM == FETCH_CRASH_AND_FEED:
for CORPUS_PRIORITY in CORPUS_PRIORITIES:
inuse_seed = self._FetchCorpusSeedFromPriority(
test_name, local_temp_dir, CORPUS_PRIORITY)
if inuse_seed is not None:
return inuse_seed
return None
def _FetchCorpusSeedFromPriority(self, test_name, local_temp_dir,
CORPUS_PRIORITY):
"""Fetches 1 seed corpus from a corpus seed directory with the given priority.
In GCS, moves the seed from corpus_seed directory to corpus_inuse directory.
From GCS to host, downloads 1 corpus seed from corpus_inuse directory
to {temp_dir}_{test_name}_corpus_seed in host machine.
Args:
test_name: string, name of the current fuzzing test.
local_temp_dir: string, path to temporary directory for this test
on the host machine.
CORPUS_PRIORITY: string, priority of the given directory.
Returns:
inuse_seed, GCS file path of the seed in use for test case
if fetch was successful.
None otherwise.
"""
corpus_seed_dir = self._GetDirPaths(CORPUS_PRIORITY, test_name)
num_try = 0
while num_try < 10:
seed_list = self._gcs_api_utils.ListFilesWithPrefix(
corpus_seed_dir)
if len(seed_list) == 0:
logging.info('No corpus available to fetch from %s.',
corpus_seed_dir)
return None
target_seed = seed_list[random.randint(0, len(seed_list) - 1)]
inuse_seed = self._GetFilePaths('corpus_inuse', test_name,
target_seed)
move_successful = self._gcs_api_utils.MoveFile(
target_seed, inuse_seed, False)
if move_successful:
local_dest_folder = self._gcs_api_utils.PrepareDownloadDestination(
corpus_seed_dir, local_temp_dir)
dest_file_path = os.path.join(local_dest_folder,
os.path.basename(target_seed))
try:
self._gcs_api_utils.DownloadFile(inuse_seed,
dest_file_path)
logging.info('Successfully fetched corpus seed from %s.',
corpus_seed_dir)
except:
logging.error('Download failed, retrying.')
continue
return inuse_seed
else:
num_try += 1
logging.debug('move try %d failed, retrying.', num_try)
continue
def UploadCorpusOutDir(self, test_name, local_temp_dir):
"""Uploads the corpus output source directory in host to GCS.
First, uploads the corpus output sorce directory in host to
its corresponding incoming directory in GCS.
Then, calls _ClassifyPriority function to classify each of
newly generated corpus by its priority.
Empty directory can be handled in the case no interesting corpus
was generated.
Args:
test_name: string, name of the current fuzzing test.
local_temp_dir: string, path to temporary directory for this test
on the host machine.
Returns:
True if successfully uploaded.
False otherwise.
"""
if self.enabled:
logging.debug('Attempting to upload corpus output for %s.',
test_name)
else:
return False
local_corpus_out_dir = self._GetDirPaths('local_corpus_out', test_name,
local_temp_dir)
incoming_parent_dir = self._GetDirPaths('incoming_parent', test_name,
local_temp_dir)
if self._gcs_api_utils.UploadDir(local_corpus_out_dir,
incoming_parent_dir):
logging.info('Successfully uploaded corpus output to %s.',
incoming_parent_dir)
num_unique_corpus = self._ClassifyPriority(test_name,
local_temp_dir)
if MEASURE_CORPUS:
self._UploadCorpusMeasure(test_name, local_temp_dir,
num_unique_corpus)
return True
else:
logging.error('Failed to upload corpus output for %s.', test_name)
return False
def _UploadCorpusMeasure(self, test_name, local_temp_dir,
num_unique_corpus):
"""Uploads the corpus measurement file to GCS.
Args:
test_name: string, name of the current fuzzing test.
local_temp_dir: string, path to temporary directory for this test
on the host machine.
num_unique_corpus: integer, number of unique corpus generated.
"""
local_measure_file = os.path.join(
local_temp_dir,
'%s_%s.txt' % (test_name, time.strftime('%Y-%m-%d-%H%M')))
with open(local_measure_file, 'w') as f:
f.write(str(num_unique_corpus))
remote_measure_file = os.path.join(
self._GetDirPaths('corpus_measure', test_name),
os.path.basename(local_measure_file))
self._gcs_api_utils.UploadFile(local_measure_file, remote_measure_file)
def InuseToDest(self, test_name, inuse_seed, destination):
"""Moves the a corpus from corpus_inuse to destination.
Destinations are as follows:
corpus_seed directory is the directory for corpus that are ready
to be used as input corpus seed.
corpus_complete directory is the directory for corpus that have
been used as an input, succeeded, and the test exited normally.
corpus_crash directory is the directory for corpus whose mutant have
caused a fuzz test crash.
corpus_error directory is the directory for corpus that have
caused an error in executing the fuzz test.
Args:
test_name: string, name of the current test.
inuse_seed: string, path to corpus seed currently in use.
destination: string, destination of the seed.
Returns:
True if move was successful.
False otherwise.
"""
if not self.enabled:
return False
if self._gcs_api_utils.FileExists(inuse_seed):
if destination in CORPUS_STATES:
corpus_destination = self._GetFilePaths(
destination, test_name, inuse_seed)
return self._gcs_api_utils.MoveFile(inuse_seed,
corpus_destination, True)
else:
logging.error(
'destination is not one of the predefined states')
return False
else:
logging.error('seed in use %s does not exist', inuse_seed)
return False
def _CorpusIsDuplicate(self, test_name, incoming_seed):
"""Checks if the newly generated corpus is a duplicate corpus.
Args:
test_name: string, name of the current test.
incoming_seed: string, path to the incoming seed in GCS.
Returns:
True if the incoming corpus already exists in the GCS bucket.
False otherwise.
"""
for file_type in CORPUS_STATES:
remote_corpus = self._GetFilePaths(file_type, test_name,
incoming_seed)
logging.debug(remote_corpus)
if self._gcs_api_utils.FileExists(remote_corpus):
logging.info('Corpus %s already exists.', remote_corpus)
return True
return False
def _ClassifyPriority(self, test_name, local_temp_dir):
"""Calls the appropriate classification algorithm.
Args:
test_name: string, name of the current test.
local_temp_dir: string, path to temporary directory for this
test on the host machine.
Returns:
num_unique_corpus: integer, number of unique corpus generated.
"""
if SCHEDULING_ALGORITHM == FETCH_ONE_AND_FEED:
return self._ClassifyPriority1(test_name, local_temp_dir)
elif SCHEDULING_ALGORITHM == FETCH_CRASH_AND_FEED:
return self._ClassifyPriority2(test_name, local_temp_dir)
def _ClassifyPriority1(self, test_name, local_temp_dir):
"""Classifies each of newly genereated corpus into different priorities.
Uses 1 priority level: corpus_seed.
This algorithm is a naive implementation.
Args:
test_name: string, name of the current test.
local_temp_dir: string, path to temporary directory for this
test on the host machine.
Returns:
num_unique_corpus: integer, number of unique corpus generated.
"""
incoming_child_dir = self._GetDirPaths('incoming_child', test_name,
local_temp_dir)
num_unique_corpus = 0
for incoming_seed in self._gcs_api_utils.ListFilesWithPrefix(
incoming_child_dir):
if self._CorpusIsDuplicate(test_name, incoming_seed):
logging.info('Deleting duplicate corpus.')
self._gcs_api_utils.DeleteFile(incoming_seed)
continue
num_unique_corpus += 1
logging.info(
'Corpus string %s was classified as regular priority.',
incoming_seed)
corpus_destination = self._GetFilePaths('corpus_seed', test_name,
incoming_seed)
self._gcs_api_utils.MoveFile(incoming_seed, corpus_destination,
True)
return num_unique_corpus
def _ClassifyPriority2(self, test_name, local_temp_dir):
"""Classifies each of newly genereated corpus into different priorities.
Uses 2 priority levels: corpus_seed_high, corpus_seed.
This algorithm uses crash occurrence as its classification criteria.
Args:
test_name: string, name of the current test.
local_temp_dir: string, path to temporary directory for this
test on the host machine.
Returns:
num_unique_corpus: integer, number of unique corpus generated.
"""
triggered_corpus = os.path.join(
self._GetDirPaths('local_corpus_trigger', test_name,
local_temp_dir), 'crash_report')
high_priority = os.path.exists(triggered_corpus)
incoming_child_dir = self._GetDirPaths('incoming_child', test_name,
local_temp_dir)
num_unique_corpus = 0
for incoming_seed in self._gcs_api_utils.ListFilesWithPrefix(
incoming_child_dir):
if self._CorpusIsDuplicate(test_name, incoming_seed):
logging.info('Deleting duplicate corpus.')
self._gcs_api_utils.DeleteFile(incoming_seed)
continue
num_unique_corpus += 1
if high_priority:
logging.info(
'corpus string %s was classified as high priority.',
incoming_seed)
corpus_destination = self._GetFilePaths(
'corpus_seed_high', test_name, incoming_seed)
else:
logging.info(
'corpus string %s was classified as regular priority.',
incoming_seed)
corpus_destination = self._GetFilePaths(
'corpus_seed', test_name, incoming_seed)
self._gcs_api_utils.MoveFile(incoming_seed, corpus_destination,
True)
if os.path.exists(triggered_corpus):
corpus_destination = self._GetFilePaths(
'corpus_trigger', test_name, triggered_corpus)
corpus_destination += str(uuid.uuid4())
self._gcs_api_utils.UploadFile(triggered_corpus,
corpus_destination)
return num_unique_corpus
def _GetDirPaths(self, dir_type, test_name, local_temp_dir=None):
"""Generates the required directory path name for the given information.
Args:
dir_type: string, type of the directory requested.
test_name: string, name of the current test.
local_temp_dir: string, path to temporary directory for this
test on the host machine.
Returns:
dir_path, generated directory path if dir_type supported.
Empty string if dir_type not supported.
"""
dir_path = ''
# ex: corpus/ILight/ILight_corpus_seed
if dir_type in CORPUS_PRIORITIES:
dir_path = os.path.join(self._gcs_path, test_name,
'%s_%s' % (test_name, dir_type))
# ex: corpus/ILight/ILight_corpus_measure
elif dir_type == 'corpus_measure':
dir_path = os.path.join(self._gcs_path, test_name,
'%s_%s' % (test_name, dir_type))
# ex: corpus/ILight/incoming/tmpV1oPTp
elif dir_type == 'incoming_parent':
dir_path = os.path.join(self._gcs_path, test_name, 'incoming',
os.path.basename(local_temp_dir))
# ex: corpus/ILight/incoming/tmpV1oPTp/ILight_corpus_out
elif dir_type == 'incoming_child':
dir_path = os.path.join(self._gcs_path, test_name, 'incoming',
os.path.basename(local_temp_dir),
'%s_corpus_out' % test_name)
# ex: /tmp/tmpV1oPTp/ILight_corpus_out
elif dir_type == 'local_corpus_out':
dir_path = os.path.join(local_temp_dir,
'%s_corpus_out' % test_name)
# ex: /tmp/tmpV1oPTp/ILight_corpus_trigger
elif dir_type == 'local_corpus_trigger':
dir_path = os.path.join(local_temp_dir,
'%s_corpus_trigger' % test_name)
return dir_path
def _GetFilePaths(self, file_type, test_name, seed=None):
"""Generates the required file path name for the given information.
Args:
file_type: string, type of the file requested.
test_name: string, name of the current test.
seed: string, seed to base new file path name upon.
Returns:
file_path, generated file path if file_type supported.
Empty string if file_type not supported.
"""
# ex: corpus/[build tag]/[device]/ILight/ILight_corpus_seed/20f5d9b8cd53881c9ff0205c9fdc5d283dc9fc68
if file_type in CORPUS_STATES:
file_path = os.path.join(self._gcs_path, test_name,
'%s_%s' % (test_name, file_type),
os.path.basename(seed))
return file_path
else:
logging.error('invalid file_type argument entered.')
return ''