| #!/usr/bin/env python3 |
| # Copyright 2018 The ChromiumOS Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """A tool for running puffin tests in a corpus of deflate compressed files.""" |
| |
| import argparse |
| import filecmp |
| import logging |
| import os |
| import subprocess |
| import sys |
| import tempfile |
| |
| |
| _PUFFHUFF = "puffhuff" |
| _PUFFDIFF = "puffdiff" |
| TESTS = (_PUFFHUFF, _PUFFDIFF) |
| |
| |
| class Error(Exception): |
| """Puffin general processing error.""" |
| |
| |
| def ParseArguments(argv): |
| """Parses and Validates command line arguments. |
| |
| Args: |
| argv: command line arguments to parse. |
| |
| Returns: |
| The arguments list. |
| """ |
| parser = argparse.ArgumentParser() |
| |
| parser.add_argument( |
| "corpus", |
| metavar="CORPUS", |
| help="A corpus directory containing compressed files", |
| ) |
| parser.add_argument( |
| "-d", |
| "--disabled_tests", |
| default=(), |
| metavar="", |
| nargs="*", |
| help=( |
| "Space separated list of tests to disable. " |
| "Allowed options include: " + ", ".join(TESTS) |
| ), |
| choices=TESTS, |
| ) |
| parser.add_argument( |
| "--cache_size", |
| type=int, |
| metavar="SIZE", |
| help="The size (in bytes) of the cache for puffpatch " "operations.", |
| ) |
| parser.add_argument( |
| "--debug", action="store_true", help="Turns on verbosity." |
| ) |
| |
| # Parse command-line arguments. |
| args = parser.parse_args(argv) |
| |
| if not os.path.isdir(args.corpus): |
| raise Error( |
| "Corpus directory {} is non-existent or inaccesible".format( |
| args.corpus |
| ) |
| ) |
| return args |
| |
| |
| def main(argv): |
| """The main function.""" |
| args = ParseArguments(argv[1:]) |
| |
| if args.debug: |
| logging.getLogger().setLevel(logging.DEBUG) |
| |
| # Construct list of appropriate files. |
| files = list( |
| filter( |
| os.path.isfile, |
| [os.path.join(args.corpus, f) for f in os.listdir(args.corpus)], |
| ) |
| ) |
| |
| # For each file in corpus run puffhuff. |
| if _PUFFHUFF not in args.disabled_tests: |
| for src in files: |
| with tempfile.NamedTemporaryFile() as tgt_file: |
| operation = "puffhuff" |
| logging.debug("Running %s on %s", operation, src) |
| cmd = [ |
| "puffin", |
| "--operation={}".format(operation), |
| "--src_file={}".format(src), |
| "--dst_file={}".format(tgt_file.name), |
| ] |
| if subprocess.call(cmd) != 0: |
| raise Error( |
| "Puffin failed to do {} command: {}".format( |
| operation, cmd |
| ) |
| ) |
| |
| if not filecmp.cmp(src, tgt_file.name): |
| raise Error( |
| "The generated file {} is not equivalent to the " |
| "original file {} after {} operation.".format( |
| tgt_file.name, src, operation |
| ) |
| ) |
| |
| if _PUFFDIFF not in args.disabled_tests: |
| # Run puffdiff and puffpatch for each pairs of files in the corpus. |
| for src in files: |
| for tgt in files: |
| with tempfile.NamedTemporaryFile() as patch, \ |
| tempfile.NamedTemporaryFile() as new_tgt: |
| operation = "puffdiff" |
| logging.debug( |
| "Running %s on %s (%d) and %s (%d)", |
| operation, |
| os.path.basename(src), |
| os.stat(src).st_size, |
| os.path.basename(tgt), |
| os.stat(tgt).st_size, |
| ) |
| cmd = [ |
| "puffin", |
| "--operation={}".format(operation), |
| "--src_file={}".format(src), |
| "--dst_file={}".format(tgt), |
| "--patch_file={}".format(patch.name), |
| ] |
| |
| # Running the puffdiff operation |
| if subprocess.call(cmd) != 0: |
| raise Error( |
| "Puffin failed to do {} command: {}".format( |
| operation, cmd |
| ) |
| ) |
| |
| logging.debug( |
| "Patch size is: %d", os.stat(patch.name).st_size |
| ) |
| |
| operation = "puffpatch" |
| logging.debug( |
| "Running %s on src file %s and patch %s", |
| operation, |
| os.path.basename(src), |
| patch.name, |
| ) |
| cmd = [ |
| "puffin", |
| "--operation={}".format(operation), |
| "--src_file={}".format(src), |
| "--dst_file={}".format(new_tgt.name), |
| "--patch_file={}".format(patch.name), |
| ] |
| if args.cache_size: |
| cmd += ["--cache_size={}".format(args.cache_size)] |
| |
| # Running the puffpatch operation |
| if subprocess.call(cmd) != 0: |
| raise Error( |
| "Puffin failed to do {} command: {}".format( |
| operation, cmd |
| ) |
| ) |
| |
| if not filecmp.cmp(tgt, new_tgt.name): |
| raise Error( |
| "The generated file {} is not equivalent to the " |
| "original file {} after puffpatch " |
| "operation.".format( |
| new_tgt.name, tgt |
| ) |
| ) |
| |
| return 0 |
| |
| |
| if __name__ == "__main__": |
| sys.exit(main(sys.argv)) |