| # |
| # Copyright (C) 2016 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| import logging |
| import os |
| |
| from vts.runners.host import asserts |
| from vts.runners.host import base_test |
| from vts.runners.host import const |
| from vts.runners.host import keys |
| from vts.runners.host import test_runner |
| from vts.utils.python.controllers import adb |
| |
| from vts.utils.python.common import list_utils |
| from vts.utils.python.os import path_utils |
| |
| from vts.testcases.template.llvmfuzzer_test import llvmfuzzer_test_config as config |
| |
| |
| class LLVMFuzzerTest(base_test.BaseTestClass): |
| """Runs fuzzer tests on target. |
| |
| Attributes: |
| _dut: AndroidDevice, the device under test as config |
| _testcases: string list, list of testcases to run |
| start_vts_agents: whether to start vts agents when registering new |
| android devices. |
| """ |
| start_vts_agents = False |
| |
| def setUpClass(self): |
| """Creates a remote shell instance, and copies data files.""" |
| required_params = [ |
| keys.ConfigKeys.IKEY_DATA_FILE_PATH, |
| config.ConfigKeys.FUZZER_CONFIGS |
| ] |
| self.getUserParams(required_params) |
| |
| self._testcases = map(lambda x: str(x), self.fuzzer_configs.keys()) |
| |
| logging.debug("Testcases: %s", self._testcases) |
| logging.debug("%s: %s", keys.ConfigKeys.IKEY_DATA_FILE_PATH, |
| self.data_file_path) |
| logging.debug("%s: %s", config.ConfigKeys.FUZZER_CONFIGS, |
| self.fuzzer_configs) |
| |
| self._dut = self.android_devices[0] |
| self._dut.adb.shell("mkdir %s -p" % config.FUZZER_TEST_DIR) |
| |
| def tearDownClass(self): |
| """Deletes all copied data.""" |
| self._dut.adb.shell("rm -rf %s" % config.FUZZER_TEST_DIR) |
| |
| def PushFiles(self, testcase): |
| """adb pushes testcase file to target. |
| |
| Args: |
| testcase: string, path to executable fuzzer. |
| """ |
| push_src = os.path.join(self.data_file_path, config.FUZZER_SRC_DIR, |
| testcase) |
| self._dut.adb.push("%s %s" % (push_src, config.FUZZER_TEST_DIR)) |
| logging.debug("Adb pushed: %s", testcase) |
| |
| def CreateFuzzerFlags(self, fuzzer_config): |
| """Creates flags for the fuzzer executable. |
| |
| Args: |
| fuzzer_config: dict, contains configuration for the fuzzer. |
| |
| Returns: |
| string, command line flags for fuzzer executable. |
| """ |
| |
| def _SerializeVTSFuzzerParams(params): |
| """Creates VTS command line flags for fuzzer executable. |
| |
| Args: |
| params: dict, contains flags and their values. |
| |
| Returns: |
| string, of form "--<flag0>=<val0> --<flag1>=<val1> ... " |
| """ |
| VTS_SPEC_FILES = "vts_spec_files" |
| VTS_EXEC_SIZE = "vts_exec_size" |
| DELIMITER = ":" |
| |
| # vts_spec_files is a string list, will be serialized like this: |
| # [a, b, c] -> "a:b:c" |
| vts_spec_files = params.get(VTS_SPEC_FILES, {}) |
| target_vts_spec_files = DELIMITER.join(map( |
| lambda x: path_utils.JoinTargetPath(config.FUZZER_SPEC_DIR, x), |
| vts_spec_files)) |
| flags = "--%s=\"%s\" " % (VTS_SPEC_FILES, target_vts_spec_files) |
| |
| vts_exec_size = params.get(VTS_EXEC_SIZE, {}) |
| flags += "--%s=%s" % (VTS_EXEC_SIZE, vts_exec_size) |
| return flags |
| |
| def _SerializeLLVMFuzzerParams(params): |
| """Creates LLVM libfuzzer command line flags for fuzzer executable. |
| |
| Args: |
| params: dict, contains flags and their values. |
| |
| Returns: |
| string, of form "--<flag0>=<val0> --<flag1>=<val1> ... " |
| """ |
| return " ".join(["-%s=%s" % (k, v) for k, v in params.items()]) |
| |
| vts_fuzzer_params = fuzzer_config.get("vts_fuzzer_params", {}) |
| |
| llvmfuzzer_params = config.FUZZER_PARAMS.copy() |
| llvmfuzzer_params.update(fuzzer_config.get("llvmfuzzer_params", {})) |
| |
| vts_fuzzer_flags = _SerializeVTSFuzzerParams(vts_fuzzer_params) |
| llvmfuzzer_flags = _SerializeLLVMFuzzerParams(llvmfuzzer_params) |
| |
| return vts_fuzzer_flags + " -- " + llvmfuzzer_flags |
| |
| def CreateCorpus(self, fuzzer, fuzzer_config): |
| """Creates a corpus directory on target. |
| |
| Args: |
| fuzzer: string, name of the fuzzer executable. |
| fuzzer_config: dict, contains configuration for the fuzzer. |
| |
| Returns: |
| string, path to corpus directory on the target. |
| """ |
| corpus = fuzzer_config.get("corpus", []) |
| corpus_dir = path_utils.JoinTargetPath(config.FUZZER_TEST_DIR, |
| "%s_corpus" % fuzzer) |
| |
| self._dut.adb.shell("mkdir %s -p" % corpus_dir) |
| for idx, corpus_entry in enumerate(corpus): |
| corpus_entry = corpus_entry.replace("x", "\\x") |
| corpus_entry_file = path_utils.JoinTargetPath( |
| corpus_dir, "input%s" % idx) |
| cmd = "echo -ne '%s' > %s" % (str(corpus_entry), corpus_entry_file) |
| # Vts shell drive doesn't play nicely with escape characters, |
| # so we use adb shell. |
| self._dut.adb.shell("\"%s\"" % cmd) |
| |
| return corpus_dir |
| |
| def RunTestcase(self, fuzzer): |
| """Runs the given testcase and asserts the result. |
| |
| Args: |
| fuzzer: string, name of fuzzer executable. |
| """ |
| self.PushFiles(fuzzer) |
| |
| fuzzer_config = self.fuzzer_configs.get(fuzzer, {}) |
| test_flags = self.CreateFuzzerFlags(fuzzer_config) |
| corpus_dir = self.CreateCorpus(fuzzer, fuzzer_config) |
| |
| chmod_cmd = "chmod -R 755 %s" % path_utils.JoinTargetPath( |
| config.FUZZER_TEST_DIR, fuzzer) |
| self._dut.adb.shell(chmod_cmd) |
| |
| cd_cmd = "cd %s" % config.FUZZER_TEST_DIR |
| ld_path = "LD_LIBRARY_PATH=/data/local/tmp/64:/data/local/tmp/32:$LD_LIBRARY_PATH" |
| test_cmd = "./%s" % fuzzer |
| |
| fuzz_cmd = "%s && %s %s %s %s > /dev/null" % (cd_cmd, ld_path, |
| test_cmd, corpus_dir, |
| test_flags) |
| logging.debug("Executing: %s", fuzz_cmd) |
| # TODO(trong): vts shell doesn't handle timeouts properly, change this after it does. |
| try: |
| stdout = self._dut.adb.shell("'%s'" % fuzz_cmd) |
| result = { |
| const.STDOUT: stdout, |
| const.STDERR: "", |
| const.EXIT_CODE: 0 |
| } |
| except adb.AdbError as e: |
| result = { |
| const.STDOUT: e.stdout, |
| const.STDERR: e.stderr, |
| const.EXIT_CODE: e.ret_code |
| } |
| self.AssertTestResult(fuzzer, result) |
| |
| def LogCrashReport(self, fuzzer): |
| """Logs crash-causing fuzzer input. |
| |
| Reads the crash report file and logs the contents in format: |
| "\x01\x23\x45\x67\x89\xab\xcd\xef" |
| |
| Args: |
| fuzzer: string, name of fuzzer executable. |
| """ |
| cmd = "xxd -p %s" % config.FUZZER_TEST_CRASH_REPORT |
| |
| # output is string of a hexdump from crash report file. |
| # From the example above, output would be "0123456789abcdef". |
| output = self._dut.adb.shell(cmd) |
| remove_chars = ["\r", "\t", "\n", " "] |
| for char in remove_chars: |
| output = output.replace(char, "") |
| |
| crash_report = "" |
| # output is guaranteed to be even in length since its a hexdump. |
| for offset in xrange(0, len(output), 2): |
| crash_report += "\\x%s" % output[offset:offset + 2] |
| |
| logging.debug('FUZZER_TEST_CRASH_REPORT for %s: "%s"', fuzzer, |
| crash_report) |
| |
| # TODO(trong): differentiate between crashes and sanitizer rule violations. |
| def AssertTestResult(self, fuzzer, result): |
| """Asserts that testcase finished as expected. |
| |
| Checks that device is in responsive state. If not, waits for boot |
| then reports test as failure. If it is, asserts that all test commands |
| returned exit code 0. |
| |
| Args: |
| fuzzer: string, name of fuzzer executable. |
| result: dict(str, str, int), command results from shell. |
| """ |
| logging.debug("Test result: %s" % result) |
| if not self._dut.hasBooted(): |
| self._dut.waitForBootCompletion() |
| asserts.fail("%s left the device in unresponsive state." % fuzzer) |
| |
| exit_code = result[const.EXIT_CODE] |
| if exit_code == config.ExitCode.FUZZER_TEST_FAIL: |
| self.LogCrashReport(fuzzer) |
| asserts.fail("%s failed normally." % fuzzer) |
| elif exit_code != config.ExitCode.FUZZER_TEST_PASS: |
| asserts.fail("%s failed abnormally." % fuzzer) |
| |
| def generateFuzzerTests(self): |
| """Runs fuzzer tests.""" |
| self.runGeneratedTests( |
| test_func=self.RunTestcase, |
| settings=self._testcases, |
| name_func=lambda x: x.split("/")[-1]) |
| |
| |
| if __name__ == "__main__": |
| test_runner.main() |