| #!/usr/bin/env python |
| # |
| # Copyright (C) 2017 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the 'License'); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an 'AS IS' BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| import logging |
| import os |
| import shutil |
| |
| from vts.runners.host import asserts |
| from vts.runners.host import base_test |
| from vts.runners.host import const |
| from vts.runners.host import keys |
| from vts.runners.host import test_runner |
| from vts.utils.python.controllers import adb |
| from vts.utils.python.controllers import android_device |
| from vts.utils.python.common import list_utils |
| |
| from vts.testcases.fuzz.template.libfuzzer_test import libfuzzer_test_config as config |
| from vts.testcases.fuzz.template.libfuzzer_test.libfuzzer_test_case import LibFuzzerTestCase |
| |
| |
| class LibFuzzerTest(base_test.BaseTestClass): |
| """Runs LLVM libfuzzer tests on target. |
| |
| Attributes: |
| _dut: AndroidDevice, the device under test as config. |
| """ |
| |
| def setUpClass(self): |
| """Creates a remote shell instance, and copies data files.""" |
| required_params = [ |
| keys.ConfigKeys.IKEY_DATA_FILE_PATH, |
| keys.ConfigKeys.IKEY_BINARY_TEST_SOURCE, |
| ] |
| self.getUserParams(required_params) |
| |
| logging.info('%s: %s', keys.ConfigKeys.IKEY_DATA_FILE_PATH, |
| self.data_file_path) |
| logging.info('%s: %s', keys.ConfigKeys.IKEY_BINARY_TEST_SOURCE, |
| self.binary_test_source) |
| |
| self._dut = self.android_devices[0] |
| self._dut.stop() |
| self._dut.adb.shell('mkdir %s -p' % config.FUZZER_TEST_DIR) |
| |
| def tearDownClass(self): |
| """Deletes all copied data.""" |
| self._dut.adb.shell('rm -rf %s' % config.FUZZER_TEST_DIR) |
| self._dut.start() |
| |
| def PushFiles(self, src): |
| """adb pushes test case file to target.""" |
| push_src = os.path.join(self.data_file_path, src) |
| push_dst = config.FUZZER_TEST_DIR |
| self._dut.adb.push('%s %s' % (push_src, push_dst), no_except=True) |
| logging.info('Adb pushed: %s \nto: %s', push_src, push_dst) |
| return push_dst |
| |
| def CreateTestCases(self): |
| """Creates LibFuzzerTestCase instances. |
| |
| Returns: |
| LibFuzzerTestCase list. |
| """ |
| test_cases = map( |
| lambda x: LibFuzzerTestCase(x, config.FUZZER_DEFAULT_PARAMS, {}), |
| self.binary_test_source) |
| return test_cases |
| |
| def CreateCorpusOut(self, test_case): |
| """Creates corpus output directory on the target. |
| |
| Args: |
| test_case: LibFuzzerTestCase object, current test case. |
| |
| Throws: |
| throws an AdbError when there is an error in adb operations. |
| """ |
| corpus_out = test_case.GetCorpusOutDir() |
| self._dut.adb.shell('mkdir %s -p' % corpus_out) |
| |
| def RetrieveCorpusSeed(self, test_case): |
| """Retrieves corpus seed directory from GCS to the target. |
| |
| Args: |
| test_case: LibFuzzerTestCase object, current test case. |
| |
| Throws: |
| throws an AdbError when there is an error in adb operations. |
| |
| Returns: |
| inuse_seed, the file path of the inuse seed in GCS, if fetch succeeded. |
| None, otherwise. |
| """ |
| inuse_seed = self._corpus_manager.FetchCorpusSeed( |
| test_case._test_name, self._temp_dir) |
| local_corpus_seed_dir = os.path.join( |
| self._temp_dir, '%s_corpus_seed' % test_case._test_name) |
| if os.path.exists(local_corpus_seed_dir) and os.listdir( |
| local_corpus_seed_dir): |
| self._dut.adb.push(local_corpus_seed_dir, config.FUZZER_TEST_DIR) |
| else: |
| corpus_seed = test_case.GetCorpusSeedDir() |
| self._dut.adb.shell('mkdir %s -p' % corpus_seed) |
| return inuse_seed |
| |
| def AnalyzeGeneratedCorpus(self, test_case): |
| """Analyzes the generated corpus body. |
| |
| Args: |
| test_case: LibFuzzerTestCase object. |
| |
| Returns: |
| number of newly generated corpus strings, if the out directory exists. |
| 0, otherwise. |
| """ |
| logging.info('temporary directory for this test: %s', self._temp_dir) |
| pulled_corpus_out_dir = os.path.join( |
| self._temp_dir, os.path.basename(test_case.GetCorpusOutDir())) |
| if os.path.exists(pulled_corpus_out_dir): |
| logging.info('corpus out directory pulled from target: %s', |
| pulled_corpus_out_dir) |
| pulled_corpus = os.listdir(pulled_corpus_out_dir) |
| logging.debug(pulled_corpus) |
| logging.info('generated corpus size: %d', len(pulled_corpus)) |
| return len(pulled_corpus) |
| else: |
| logging.error('corput out directory does not exist on the host.') |
| return 0 |
| |
| def EvaluateTestcase(self, test_case, result, inuse_seed): |
| """Evaluates the test result and moves the used seed accordingly. |
| |
| Args: |
| test_case: LibFuzzerTestCase object. |
| result: a result dict object returned by the adb shell command. |
| inuse_seed: the seed used as input to this test case. |
| |
| Raises: |
| signals.TestFailure when the testcase failed. |
| """ |
| return_codes = result.get('return_codes', None) |
| if return_codes == config.ExitCode.FUZZER_TEST_PASS: |
| logging.info( |
| 'adb shell fuzzing command exited normally with exitcode %d.', |
| result['return_codes']) |
| if inuse_seed is not None: |
| self._corpus_manager.InuseToDest(test_case._test_name, |
| inuse_seed, 'corpus_complete') |
| elif return_codes == config.ExitCode.FUZZER_TEST_FAIL: |
| logging.info( |
| 'adb shell fuzzing command exited normally with exitcode %d.', |
| result['return_codes']) |
| if inuse_seed is not None: |
| self._corpus_manager.InuseToDest(test_case._test_name, |
| inuse_seed, 'corpus_crash') |
| else: |
| logging.error('adb shell fuzzing command exited abnormally.') |
| if inuse_seed is not None: |
| self._corpus_manager.InuseToDest(test_case._test_name, |
| inuse_seed, 'corpus_error') |
| |
| def RunTestcase(self, test_case): |
| """Runs the given test case and asserts the result. |
| |
| Args: |
| test_case: LibFuzzerTestCase object. |
| """ |
| self.PushFiles(test_case.bin_host_path) |
| self.CreateCorpusOut(test_case) |
| inuse_seed = self.RetrieveCorpusSeed(test_case) |
| if inuse_seed == 'locked': |
| # skip this test case |
| logging.warning('test case locked, skipping testcase %s.', |
| test_case.test_name) |
| return |
| |
| fuzz_cmd = '"%s"' % test_case.GetRunCommand() |
| |
| result = {} |
| try: |
| result = self._dut.adb.shell(fuzz_cmd, no_except=True) |
| except adb.AdbError as e: |
| logging.exception(e) |
| |
| corpus_trigger_dir = os.path.join(self._temp_dir, |
| test_case.GetCorpusTriggerDir()) |
| os.makedirs(corpus_trigger_dir) |
| try: |
| self._dut.adb.pull(config.FUZZER_TEST_CRASH_REPORT, |
| corpus_trigger_dir) |
| except adb.AdbError as e: |
| logging.exception(e) |
| logging.error('crash report was not created during test run.') |
| |
| try: |
| self._dut.adb.pull(test_case.GetCorpusOutDir(), self._temp_dir) |
| self.AnalyzeGeneratedCorpus(test_case) |
| self._corpus_manager.UploadCorpusOutDir(test_case._test_name, |
| self._temp_dir) |
| except adb.AdbError as e: |
| logging.exception(e) |
| logging.error('Device failed. Removing lock from GCS.') |
| self._corpus_manager.remove_lock(test_case._test_name) |
| |
| if inuse_seed is not 'directory': |
| self.EvaluateTestcase(test_case, result, inuse_seed) |
| self.AssertTestResult(test_case, result) |
| |
| def LogCrashReport(self, test_case): |
| """Logs crash-causing fuzzer input. |
| |
| Reads the crash report file and logs the contents in format: |
| '\x01\x23\x45\x67\x89\xab\xcd\xef' |
| |
| Args: |
| test_case: LibFuzzerTestCase object |
| """ |
| touch_cmd = 'touch %s' % config.FUZZER_TEST_CRASH_REPORT |
| self._dut.adb.shell(touch_cmd) |
| |
| # output is string of a hexdump from crash report file. |
| # From the example above, output would be '0123456789abcdef'. |
| xxd_cmd = 'xxd -p %s' % config.FUZZER_TEST_CRASH_REPORT |
| output = self._dut.adb.shell(xxd_cmd) |
| remove_chars = ['\r', '\t', '\n', ' '] |
| for char in remove_chars: |
| output = output.replace(char, '') |
| |
| crash_report = '' |
| # output is guaranteed to be even in length since its a hexdump. |
| for offset in xrange(0, len(output), 2): |
| crash_report += '\\x%s' % output[offset:offset + 2] |
| |
| logging.info('FUZZER_TEST_CRASH_REPORT for %s: "%s"', |
| test_case.test_name, crash_report) |
| |
| # TODO(trong): differentiate between crashes and sanitizer rule violations. |
| def AssertTestResult(self, test_case, result): |
| """Asserts that test case finished as expected. |
| |
| Checks that device is in responsive state. If not, waits for boot |
| then reports test as failure. If it is, asserts that all test commands |
| returned exit code 0. |
| |
| Args: |
| test_case: LibFuzzerTestCase object |
| result: dict(str, str, int), command results from shell. |
| """ |
| logging.info('Test case results.') |
| logging.info('stdout: %s' % result[const.STDOUT]) |
| logging.info('stderr: %s' % result[const.STDERR]) |
| logging.info('exit code: %s' % result[const.EXIT_CODE]) |
| if not self._dut.hasBooted(): |
| self._dut.waitForBootCompletion() |
| asserts.fail('%s left the device in unresponsive state.' % |
| test_case.test_name) |
| |
| exit_code = result[const.EXIT_CODE] |
| if exit_code == config.ExitCode.FUZZER_TEST_FAIL: |
| #TODO(b/64123979): once normal fail happens, examine. |
| self.LogCrashReport(test_case) |
| asserts.fail('%s failed normally.' % test_case.test_name) |
| elif exit_code != config.ExitCode.FUZZER_TEST_PASS: |
| asserts.fail('%s failed abnormally.' % test_case.test_name) |
| |
| def tearDownClass(self): |
| """Removes the temporary directory used for corpus management.""" |
| logging.debug('Temporary directory %s is being deleted', |
| self._temp_dir) |
| try: |
| shutil.rmtree(self._temp_dir) |
| except OSError as e: |
| logging.exception(e) |
| |
| def generateFuzzerTests(self): |
| """Runs fuzzer tests.""" |
| self.runGeneratedTests( |
| test_func=self.RunTestcase, |
| settings=self.CreateTestCases(), |
| name_func=lambda x: x.test_name) |
| |
| |
| if __name__ == '__main__': |
| test_runner.main() |