# blob: e91ddbaf6f024f1710578c5724dea1e18686db5e [file] [log] [blame]
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for running fuzzers."""
import json
import os
import sys
import shutil
import tempfile
import unittest
from unittest import mock
import parameterized
from pyfakefs import fake_filesystem_unittest
import docker
import build_fuzzers
import fuzz_target
import run_fuzzers
# pylint: disable=wrong-import-position
INFRA_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(INFRA_DIR)
import helper
import test_helpers
# NOTE: The integration tests in this module rely on the "example" project:
# https://github.com/google/oss-fuzz/tree/master/projects/example
EXAMPLE_PROJECT = 'example'
# Directory of files used for testing; it sits next to this module.
TEST_DATA_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)),
'test_data')
# Fuzzer directory and fuzzer name used by the 'memory' sanitizer integration
# test (presumably a prebuilt MSAN build of curl -- confirm against test_data).
MEMORY_FUZZER_DIR = os.path.join(TEST_DATA_PATH, 'memory')
MEMORY_FUZZER = 'curl_fuzzer_memory'
# Fuzzer directory and fuzzer name used by the 'undefined' sanitizer
# integration test (presumably a prebuilt UBSAN build of curl -- confirm).
UNDEFINED_FUZZER_DIR = os.path.join(TEST_DATA_PATH, 'undefined')
UNDEFINED_FUZZER = 'curl_fuzzer_undefined'
# Value passed as fuzz_seconds to every run config created in this module.
FUZZ_SECONDS = 10
class RunFuzzerIntegrationTestMixin:  # pylint: disable=too-few-public-methods,invalid-name
  """Mixin for integration test classes that call run_fuzzers on a fuzzer
  directory for one specific sanitizer."""

  # Concrete test classes are expected to override both attributes.
  FUZZER_DIR = None
  FUZZER = None

  def _test_run_with_sanitizer(self, fuzzer_dir, sanitizer):
    """Runs run_fuzzers on a temporary copy of |fuzzer_dir| configured for
    |sanitizer| and asserts that the result is NO_BUG_FOUND.

    Args:
      fuzzer_dir: Directory that is copied and then used as the workspace.
      sanitizer: Sanitizer name forwarded to the run config.
    """
    with test_helpers.temp_dir_copy(fuzzer_dir) as workspace_copy:
      run_config = test_helpers.create_run_config(
          fuzz_seconds=FUZZ_SECONDS,
          workspace=workspace_copy,
          oss_fuzz_project_name='curl',
          sanitizer=sanitizer)
      outcome = run_fuzzers.run_fuzzers(run_config)
    self.assertEqual(outcome, run_fuzzers.RunFuzzersResult.NO_BUG_FOUND)
@unittest.skipUnless(os.getenv('INTEGRATION_TESTS'),
                     'INTEGRATION_TESTS=1 not set')
class RunMemoryFuzzerIntegrationTest(RunFuzzerIntegrationTestMixin,
                                     unittest.TestCase):
  """Integration test that runs run_fuzzers on an MSAN build.

  Skipped unless the INTEGRATION_TESTS environment variable is set.
  """
  FUZZER_DIR = MEMORY_FUZZER_DIR
  FUZZER = MEMORY_FUZZER

  def test_run_with_memory_sanitizer(self):
    """Tests that run_fuzzers finds no bug in the MSAN fuzzer directory."""
    self._test_run_with_sanitizer(fuzzer_dir=self.FUZZER_DIR,
                                  sanitizer='memory')
@unittest.skipUnless(os.getenv('INTEGRATION_TESTS'),
                     'INTEGRATION_TESTS=1 not set')
class RunUndefinedFuzzerIntegrationTest(RunFuzzerIntegrationTestMixin,
                                        unittest.TestCase):
  """Integration test that runs run_fuzzers on a UBSAN build.

  Skipped unless the INTEGRATION_TESTS environment variable is set.
  """
  FUZZER_DIR = UNDEFINED_FUZZER_DIR
  FUZZER = UNDEFINED_FUZZER

  def test_run_with_undefined_sanitizer(self):
    """Tests that run_fuzzers finds no bug in the UBSAN fuzzer directory."""
    self._test_run_with_sanitizer(fuzzer_dir=self.FUZZER_DIR,
                                  sanitizer='undefined')
class BaseFuzzTargetRunnerTest(unittest.TestCase):
  """Unit tests for run_fuzzers.BaseFuzzTargetRunner."""

  def _create_runner(self, **kwargs):  # pylint: disable=no-self-use
    """Returns a BaseFuzzTargetRunner whose run config is created from
    |kwargs|. fuzz_seconds and oss_fuzz_project_name get test defaults when
    the caller does not pass them (an explicit None is left untouched)."""
    kwargs.setdefault('fuzz_seconds', FUZZ_SECONDS)
    kwargs.setdefault('oss_fuzz_project_name', EXAMPLE_PROJECT)
    run_config = test_helpers.create_run_config(**kwargs)
    return run_fuzzers.BaseFuzzTargetRunner(run_config)

  def _test_initialize_fail(self, expected_error_args, **create_runner_kwargs):
    """Asserts that initialize() on a runner created from
    |create_runner_kwargs| returns a falsy value and that the most recent
    logging.error call received |expected_error_args|."""
    with mock.patch('logging.error') as logged_error:
      initialized = self._create_runner(**create_runner_kwargs).initialize()
      self.assertFalse(initialized)
      logged_error.assert_called_with(*expected_error_args)

  @parameterized.parameterized.expand([(0,), (None,), (-1,)])
  def test_initialize_invalid_fuzz_seconds(self, fuzz_seconds):
    """Tests that initialize fails when fuzz_seconds is 0, None or negative."""
    expected_error_args = ('Fuzz_seconds argument must be greater than 1, '
                           'but was: %s.', fuzz_seconds)
    with tempfile.TemporaryDirectory() as workspace, mock.patch(
        'utils.get_fuzz_targets') as mocked_get_fuzz_targets:
      build_out = os.path.join(workspace, 'build-out')
      os.mkdir(build_out)
      # Report one fuzz target so that only fuzz_seconds is invalid.
      mocked_get_fuzz_targets.return_value = [
          os.path.join(build_out, 'fuzz_target')
      ]
      self._test_initialize_fail(expected_error_args,
                                 fuzz_seconds=fuzz_seconds,
                                 workspace=workspace)

  def test_initialize_no_out_dir(self):
    """Tests that initialize fails when the build-out directory is missing."""
    with tempfile.TemporaryDirectory() as workspace:
      missing_out_dir = os.path.join(workspace, 'build-out')
      self._test_initialize_fail(
          ('Out directory: %s does not exist.', missing_out_dir),
          workspace=workspace)

  def test_initialize_nonempty_artifacts(self):
    """Tests that initialize fails when the artifacts path is a regular file
    instead of a directory."""
    with tempfile.TemporaryDirectory() as workspace:
      os.mkdir(os.path.join(workspace, 'build-out'))
      out_dir = os.path.join(workspace, 'out')
      os.makedirs(out_dir)
      artifacts_path = os.path.join(out_dir, 'artifacts')
      # The artifacts path itself is a file here.
      with open(artifacts_path, 'w') as artifacts_file:
        artifacts_file.write('fake')
      self._test_initialize_fail(
          ('Artifacts path: %s exists and is not an empty directory.',
           artifacts_path),
          workspace=workspace)

  def test_initialize_bad_artifacts(self):
    """Tests that initialize fails when the artifacts directory already
    contains a file."""
    with tempfile.TemporaryDirectory() as workspace:
      os.mkdir(os.path.join(workspace, 'build-out'))
      artifacts_path = os.path.join(workspace, 'out', 'artifacts')
      os.makedirs(artifacts_path)
      # Put one file into the artifacts directory so it is not empty.
      with open(os.path.join(artifacts_path, 'artifact'), 'w') as leftover:
        leftover.write('fake')
      self._test_initialize_fail(
          ('Artifacts path: %s exists and is not an empty directory.',
           artifacts_path),
          workspace=workspace)

  @mock.patch('utils.get_fuzz_targets')
  @mock.patch('logging.error')
  def test_initialize_empty_artifacts(self, mocked_log_error,
                                      mocked_get_fuzz_targets):
    """Tests that initialize succeeds, without logging an error, when the
    artifacts directory exists and is empty."""
    mocked_get_fuzz_targets.return_value = ['fuzz-target']
    with tempfile.TemporaryDirectory() as workspace:
      os.mkdir(os.path.join(workspace, 'build-out'))
      artifacts_path = os.path.join(workspace, 'out', 'artifacts')
      os.makedirs(artifacts_path)
      initialized = self._create_runner(workspace=workspace).initialize()
      self.assertTrue(initialized)
      mocked_log_error.assert_not_called()
      self.assertTrue(os.path.isdir(artifacts_path))

  @mock.patch('utils.get_fuzz_targets')
  @mock.patch('logging.error')
  def test_initialize_no_artifacts(self, mocked_log_error,
                                   mocked_get_fuzz_targets):
    """Tests that initialize succeeds when there is no artifacts directory
    (the expected setting) and that the directory exists afterwards."""
    mocked_get_fuzz_targets.return_value = ['fuzz-target']
    with tempfile.TemporaryDirectory() as workspace:
      os.mkdir(os.path.join(workspace, 'build-out'))
      initialized = self._create_runner(workspace=workspace).initialize()
      self.assertTrue(initialized)
      mocked_log_error.assert_not_called()
      artifacts_path = os.path.join(workspace, 'out', 'artifacts')
      self.assertTrue(os.path.isdir(artifacts_path))

  def test_initialize_no_fuzz_targets(self):
    """Tests that initialize fails when build-out has no fuzz targets."""
    with tempfile.TemporaryDirectory() as workspace:
      empty_out_dir = os.path.join(workspace, 'build-out')
      os.makedirs(empty_out_dir)
      self._test_initialize_fail(
          ('No fuzz targets were found in out directory: %s.', empty_out_dir),
          workspace=workspace)

  def test_get_fuzz_target_artifact(self):
    """Tests that get_fuzz_target_artifact returns the expected path under
    the workspace's out/artifacts directory."""
    with tempfile.TemporaryDirectory() as workspace:
      runner = self._create_runner(workspace=workspace)
      runner.crashes_dir = 'crashes-dir'
      target = mock.MagicMock()
      target.target_name = 'target_name'
      actual_path = runner.get_fuzz_target_artifact(target, 'artifact-name')
      expected_path = os.path.join(workspace, 'out', 'artifacts',
                                   'target_name-address-artifact-name')
      self.assertEqual(actual_path, expected_path)
class CiFuzzTargetRunnerTest(fake_filesystem_unittest.TestCase):
  """Unit tests for run_fuzzers.CiFuzzTargetRunner on a fake filesystem."""

  def setUp(self):
    """Installs the pyfakefs fake filesystem for each test."""
    self.setUpPyfakefs()

  @mock.patch('utils.get_fuzz_targets')
  @mock.patch('run_fuzzers.CiFuzzTargetRunner.run_fuzz_target')
  @mock.patch('run_fuzzers.CiFuzzTargetRunner.create_fuzz_target_obj')
  def test_run_fuzz_targets_quits(self, mocked_create_fuzz_target_obj,
                                  mocked_run_fuzz_target,
                                  mocked_get_fuzz_targets):
    """Tests that run_fuzz_targets stops after the first crash: two targets
    are available but run_fuzz_target is only called once."""
    mocked_get_fuzz_targets.return_value = ['target1', 'target2']
    fake_target = mock.MagicMock()
    fake_target.target_name = 'target1'
    mocked_create_fuzz_target_obj.return_value = fake_target

    workspace = 'workspace'
    self.fs.create_dir(os.path.join(workspace, 'build-out'))
    runner = run_fuzzers.CiFuzzTargetRunner(
        test_helpers.create_run_config(fuzz_seconds=FUZZ_SECONDS,
                                       workspace=workspace,
                                       oss_fuzz_project_name=EXAMPLE_PROJECT))
    runner.initialize()

    # Every fuzz run reports this testcase.
    testcase = os.path.join(workspace, 'testcase')
    self.fs.create_file(testcase)
    corpus_dir = 'corpus'
    self.fs.create_dir(corpus_dir)
    mocked_run_fuzz_target.return_value = fuzz_target.FuzzResult(
        testcase, b'stacktrace', corpus_dir)

    self.assertTrue(runner.run_fuzz_targets())
    saved_artifacts = os.listdir(runner.workspace.artifacts)
    self.assertIn('target1-address-testcase', saved_artifacts)
    self.assertEqual(mocked_run_fuzz_target.call_count, 1)
class BatchFuzzTargetRunnerTest(fake_filesystem_unittest.TestCase):
  """Unit tests for run_fuzzers.BatchFuzzTargetRunner on a fake filesystem."""
  WORKSPACE = 'workspace'
  STACKTRACE = b'stacktrace'
  CORPUS_DIR = 'corpus'

  def setUp(self):
    """Installs the fake filesystem, creates build-out with two testcase
    files in it and builds the run config shared by the tests."""
    self.setUpPyfakefs()
    build_out = os.path.join(self.WORKSPACE, 'build-out')
    self.fs.create_dir(build_out)
    self.testcase1, self.testcase2 = (
        os.path.join(build_out, 'testcase-aaa'),
        os.path.join(build_out, 'testcase-bbb'),
    )
    for testcase_path in (self.testcase1, self.testcase2):
      self.fs.create_file(testcase_path)
    self.config = test_helpers.create_run_config(fuzz_seconds=FUZZ_SECONDS,
                                                 workspace=self.WORKSPACE,
                                                 is_github=True)

  @mock.patch('utils.get_fuzz_targets', return_value=['target1', 'target2'])
  @mock.patch('clusterfuzz_deployment.ClusterFuzzLite.upload_latest_build',
              return_value=True)
  @mock.patch('run_fuzzers.BatchFuzzTargetRunner.run_fuzz_target')
  @mock.patch('run_fuzzers.BatchFuzzTargetRunner.create_fuzz_target_obj')
  def test_run_fuzz_targets_quits(self, mocked_create_fuzz_target_obj,
                                  mocked_run_fuzz_target, _, __):
    """Tests that run_fuzz_targets doesn't quit on the first crash it finds:
    run_fuzz_target is called for both targets."""
    runner = run_fuzzers.BatchFuzzTargetRunner(self.config)
    runner.initialize()
    testcases = (self.testcase1, self.testcase2)
    completed_runs = 0

    def fake_run_fuzz_target(_):
      """Returns a crash result with a different testcase on each of the two
      expected calls; a third call is a test failure."""
      nonlocal completed_runs
      assert completed_runs != 2
      testcase = testcases[completed_runs]
      completed_runs += 1
      if not os.path.exists(self.CORPUS_DIR):
        self.fs.create_dir(self.CORPUS_DIR)
      return fuzz_target.FuzzResult(testcase, self.STACKTRACE, self.CORPUS_DIR)

    mocked_run_fuzz_target.side_effect = fake_run_fuzz_target
    fake_target = mock.MagicMock()
    fake_target.target_name = 'target1'
    mocked_create_fuzz_target_obj.return_value = fake_target

    self.assertTrue(runner.run_fuzz_targets())
    self.assertEqual(mocked_run_fuzz_target.call_count, 2)

  @mock.patch('run_fuzzers.BaseFuzzTargetRunner.run_fuzz_targets',
              return_value=False)
  @mock.patch('clusterfuzz_deployment.ClusterFuzzLite.upload_latest_build')
  @mock.patch('clusterfuzz_deployment.ClusterFuzzLite.upload_crashes')
  def test_run_fuzz_targets_upload_crashes_and_builds(
      self, mocked_upload_crashes, mocked_upload_latest_build, _):
    """Tests that run_fuzz_targets uploads crashes and the latest build
    exactly once each, with the base class's run_fuzz_targets mocked out."""
    runner = run_fuzzers.BatchFuzzTargetRunner(self.config)
    # TODO(metzman): Don't rely on this failing gracefully.
    runner.initialize()
    self.assertFalse(runner.run_fuzz_targets())
    self.assertEqual(mocked_upload_crashes.call_count, 1)
    self.assertEqual(mocked_upload_latest_build.call_count, 1)
@unittest.skipUnless(os.getenv('INTEGRATION_TESTS'),
                     'INTEGRATION_TESTS=1 not set')
class CoverageReportIntegrationTest(unittest.TestCase):
  """Integration tests for coverage reports.

  Skipped unless the INTEGRATION_TESTS environment variable is set.
  """
  SANITIZER = 'coverage'

  def setUp(self):
    """Patches the environment through test_helpers for each test."""
    test_helpers.patch_environ(self)

  @mock.patch('third_party.github_actions_toolkit.artifact.artifact_client'
              '.upload_artifact',
              return_value=True)
  def test_coverage_report(self, _):
    """Tests coverage reports end-to-end: does a coverage build, generates
    the report and compares its summary with the expected summary."""
    with tempfile.TemporaryDirectory() as workspace:
      try:
        # Step 1: coverage build.
        build_config = test_helpers.create_build_config(
            oss_fuzz_project_name=EXAMPLE_PROJECT,
            project_repo_name='oss-fuzz',
            workspace=workspace,
            commit_sha='0b95fe1039ed7c38fea1f97078316bfc1030c523',
            base_commit='da0746452433dc18bae699e355a9821285d863c8',
            sanitizer=self.SANITIZER,
            is_github=True)
        self.assertTrue(build_fuzzers.build_fuzzers(build_config))

        # Step 2: report generation.
        run_config = test_helpers.create_run_config(
            fuzz_seconds=FUZZ_SECONDS,
            workspace=workspace,
            sanitizer=self.SANITIZER,
            run_fuzzers_mode='coverage',
            is_github=True)
        self.assertEqual(run_fuzzers.run_fuzzers(run_config),
                         run_fuzzers.RunFuzzersResult.NO_BUG_FOUND)

        # Step 3: compare the generated summary with the checked-in one.
        expected_summary_path = os.path.join(
            TEST_DATA_PATH, 'example_coverage_report_summary.json')
        with open(expected_summary_path) as expected_file:
          expected_summary = json.load(expected_file)
        actual_summary_path = os.path.join(workspace, 'cifuzz-coverage',
                                           'report', 'linux', 'summary.json')
        with open(actual_summary_path) as actual_file:
          actual_summary = json.load(actual_file)
        self.assertEqual(expected_summary, actual_summary)
      finally:
        # The workspace can hold files that are only writeable by root, which
        # would make deleting the temporary directory raise an exception, so
        # empty it from inside a container first.
        if os.listdir(workspace):
          helper.docker_run([
              '-v', f'{workspace}:/workspace', '-t', docker.BASE_RUNNER_TAG,
              '/bin/bash', '-c', 'rm -rf /workspace/*'
          ])
@unittest.skipUnless(os.getenv('INTEGRATION_TESTS'),
                     'INTEGRATION_TESTS=1 not set')
class RunAddressFuzzersIntegrationTest(RunFuzzerIntegrationTestMixin,
                                       unittest.TestCase):
  """Integration tests that run run_fuzzers on an ASAN build.

  Skipped unless the INTEGRATION_TESTS environment variable is set.
  """
  BUILD_DIR_NAME = 'cifuzz-latest-build'

  def test_new_bug_found(self):
    """Tests that run_fuzzers reports BUG_FOUND when is_reproducible returns
    True and then False."""
    # First True, then False: emulates a bug that exists in the current PR
    # but not on the downloaded OSS-Fuzz build.
    with mock.patch('fuzz_target.FuzzTarget.is_reproducible',
                    side_effect=[True, False
                                ]), tempfile.TemporaryDirectory() as tmp_dir:
      workspace = os.path.join(tmp_dir, 'workspace')
      shutil.copytree(TEST_DATA_PATH, workspace)
      run_config = test_helpers.create_run_config(
          fuzz_seconds=FUZZ_SECONDS,
          workspace=workspace,
          oss_fuzz_project_name=EXAMPLE_PROJECT)
      outcome = run_fuzzers.run_fuzzers(run_config)
      self.assertEqual(outcome, run_fuzzers.RunFuzzersResult.BUG_FOUND)

  @mock.patch('fuzz_target.FuzzTarget.is_reproducible',
              side_effect=[True, True])
  def test_old_bug_found(self, _):
    """Tests that run_fuzzers reports NO_BUG_FOUND when is_reproducible
    returns True both times (a bug found in OSS-Fuzz before)."""
    with tempfile.TemporaryDirectory() as tmp_dir:
      workspace = os.path.join(tmp_dir, 'workspace')
      shutil.copytree(TEST_DATA_PATH, workspace)
      run_config = test_helpers.create_run_config(
          fuzz_seconds=FUZZ_SECONDS,
          workspace=workspace,
          oss_fuzz_project_name=EXAMPLE_PROJECT)
      outcome = run_fuzzers.run_fuzzers(run_config)
      self.assertEqual(outcome, run_fuzzers.RunFuzzersResult.NO_BUG_FOUND)

  def test_invalid_build(self):
    """Tests that run_fuzzers reports ERROR when build-out is empty."""
    with tempfile.TemporaryDirectory() as tmp_dir:
      os.mkdir(os.path.join(tmp_dir, 'build-out'))
      run_config = test_helpers.create_run_config(
          fuzz_seconds=FUZZ_SECONDS,
          workspace=tmp_dir,
          oss_fuzz_project_name=EXAMPLE_PROJECT)
      outcome = run_fuzzers.run_fuzzers(run_config)
      self.assertEqual(outcome, run_fuzzers.RunFuzzersResult.ERROR)
class GetFuzzTargetRunnerTest(unittest.TestCase):
  """Tests for get_fuzz_target_runner."""

  @parameterized.parameterized.expand([
      ('batch', run_fuzzers.BatchFuzzTargetRunner),
      ('ci', run_fuzzers.CiFuzzTargetRunner),
      ('coverage', run_fuzzers.CoverageTargetRunner)
  ])
  def test_get_fuzz_target_runner(self, run_fuzzers_mode,
                                  fuzz_target_runner_cls):
    """Tests that get_fuzz_target_runner returns the correct runner based on
    the specified run_fuzzers_mode.

    Args:
      run_fuzzers_mode: Mode string put into the run config.
      fuzz_target_runner_cls: Runner class expected for that mode.
    """
    with tempfile.TemporaryDirectory() as tmp_dir:
      run_config = test_helpers.create_run_config(
          fuzz_seconds=FUZZ_SECONDS,
          workspace=tmp_dir,
          oss_fuzz_project_name=EXAMPLE_PROJECT,
          run_fuzzers_mode=run_fuzzers_mode)
      runner = run_fuzzers.get_fuzz_target_runner(run_config)
      # assertIsInstance reports the actual object on failure, unlike
      # assertTrue(isinstance(...)), which only says "False is not true".
      self.assertIsInstance(runner, fuzz_target_runner_cls)
if __name__ == '__main__':
  # Run every test case in this module when it is executed as a script.
  unittest.main()