| # Copyright 2013 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import contextlib |
| import fnmatch |
| import json |
| import os |
| import pipes |
| import shlex |
| import shutil |
| import subprocess |
| import sys |
| import tempfile |
| import zipfile |
| |
# Path four directory levels above the directory that contains this file;
# used below as the root from which third-party paths are derived.
CHROMIUM_SRC = os.path.join(
    os.path.dirname(__file__), *([os.pardir] * 4))

# Location of the colorama sources relative to CHROMIUM_SRC.
COLORAMA_ROOT = os.path.join(
    CHROMIUM_SRC, 'third_party', 'colorama', 'src')
| |
| |
@contextlib.contextmanager
def TempDir():
  """Context manager that yields the path of a new temporary directory.

  The directory is created with tempfile.mkdtemp() and removed, together
  with its contents, when the `with` block exits (normally or by exception).
  """
  scratch_dir = tempfile.mkdtemp()
  try:
    yield scratch_dir
  finally:
    shutil.rmtree(scratch_dir)
| |
| |
def MakeDirectory(dir_path):
  """Create dir_path and any missing parents, like `mkdir -p`.

  An already-existing directory is not an error. An empty dir_path (what
  os.path.dirname() returns for a bare file name) is treated as "nothing to
  create".

  Raises:
    OSError: if the directory could not be created and does not exist
        afterwards (e.g. permission denied, or the path is a regular file).
  """
  try:
    os.makedirs(dir_path)
  except OSError:
    # Tolerate only the benign cases; checking isdir() after the failure also
    # covers another process creating the directory concurrently.
    if dir_path and not os.path.isdir(dir_path):
      raise
| |
| |
def DeleteDirectory(dir_path):
  """Recursively remove dir_path if it exists; do nothing otherwise."""
  if not os.path.exists(dir_path):
    return
  shutil.rmtree(dir_path)
| |
| |
def Touch(path):
  """Create path if needed and set its access/modified times to now.

  The parent directory is created first via MakeDirectory().
  """
  parent_dir = os.path.dirname(path)
  MakeDirectory(parent_dir)
  # Append mode creates a missing file without truncating an existing one.
  with open(path, 'a'):
    os.utime(path, None)
| |
| |
def FindInDirectory(directory, filename_filter):
  """Return paths of all files under directory matching filename_filter.

  Args:
    directory: Root of the tree to walk (recursively, via os.walk).
    filename_filter: fnmatch-style pattern applied to each file's base name.

  Returns:
    A list of paths, each formed by joining the walked directory with the
    matching file name.
  """
  return [
      os.path.join(current_dir, matched_name)
      for current_dir, _subdirs, names in os.walk(directory)
      for matched_name in fnmatch.filter(names, filename_filter)]
| |
| |
def FindInDirectories(directories, filename_filter):
  """Return the concatenated FindInDirectory() results for each directory.

  Results keep the order of `directories`; duplicates are not removed.
  """
  return [
      found_path
      for directory in directories
      for found_path in FindInDirectory(directory, filename_filter)]
| |
| |
def ParseGypList(gyp_string):
  """Split a gyp-provided string into a list using shell-like quoting rules.

  Every '##' in the input is turned into '$' before splitting.
  """
  # The ninja generator doesn't support $ in strings, so ## is used to
  # represent $.
  # TODO(cjhopman): Remove when
  # https://code.google.com/p/gyp/issues/detail?id=327
  # is addressed.
  with_dollars = gyp_string.replace('##', '$')
  return shlex.split(with_dollars)
| |
| |
def CheckOptions(options, parser, required=None):
  """Report an error through parser for each required option left as None.

  Args:
    options: Object holding the parsed option values as attributes.
    parser: Object whose error(message) method is called for a missing option.
    required: Iterable of attribute names that must not be None; nothing is
        checked when it is None or empty.
  """
  for attr_name in required or ():
    if getattr(options, attr_name) is not None:
      continue
    # Attribute names use underscores; the command-line flag uses dashes.
    flag_name = attr_name.replace('_', '-')
    parser.error('--%s is required' % flag_name)
| |
def WriteJson(obj, path, only_if_changed=False):
  """Serialize obj as sorted, 2-space-indented JSON and write it to path.

  Args:
    obj: JSON-serializable value.
    path: Destination file.
    only_if_changed: When True, the file is left untouched if it already
        contains exactly the text that would be written.
  """
  previous_text = None
  if os.path.exists(path):
    with open(path, 'r') as existing:
      previous_text = existing.read()

  serialized = json.dumps(
      obj, sort_keys=True, indent=2, separators=(',', ': '))

  if only_if_changed and previous_text == serialized:
    return

  with open(path, 'w') as destination:
    destination.write(serialized)
| |
def ReadJson(path):
  """Return the value parsed from the JSON file at path."""
  with open(path, 'r') as source:
    contents = source.read()
  return json.loads(contents)
| |
| |
class CalledProcessError(Exception):
  """This exception is raised when the process run by CheckOutput
  is considered to have failed (by default: a non-zero exit code).

  Attributes:
    cwd: Directory the command was run in.
    args: The command's argument list.
    output: Output captured from the command.
  """

  def __init__(self, cwd, args, output):
    super(CalledProcessError, self).__init__()
    self.cwd = cwd
    self.args = args
    self.output = output

  def __str__(self):
    # A user should be able to simply copy and paste the command that failed
    # into their shell, so the directory is shell-quoted as well as the
    # arguments (a cwd containing spaces would otherwise break the `cd`).
    quoted_cwd = pipes.quote(os.path.abspath(self.cwd))
    quoted_args = ' '.join(map(pipes.quote, self.args))
    copyable_command = '( cd {}; {} )'.format(quoted_cwd, quoted_args)
    return 'Command failed: {}\n{}'.format(copyable_command, self.output)
| |
| |
def CheckOutput(args, cwd=None,
                print_stdout=False, print_stderr=True,
                stdout_filter=None,
                stderr_filter=None,
                fail_func=lambda returncode, stderr: returncode != 0):
  """Run a command and return its (optionally filtered) stdout.

  This can be used in most cases like subprocess.check_output(). The output,
  particularly when the command fails, better highlights the command's
  failure.

  Args:
    args: Command and arguments, passed to subprocess.Popen.
    cwd: Directory to run in; the current directory when falsy.
    print_stdout: Whether to echo the command's stdout to sys.stdout.
    print_stderr: Whether to echo the command's stderr to sys.stderr.
    stdout_filter: Optional callable applied to stdout before it is used.
    stderr_filter: Optional callable applied to stderr before it is used.
    fail_func: Callable taking (returncode, stderr) that returns True when
        the run should be treated as failed. Defaults to a non-zero exit code.

  Returns:
    The command's stdout, after stdout_filter.

  Raises:
    CalledProcessError: (the one defined in this module) if fail_func
        reports a failure; nothing is echoed in that case.
  """
  cwd = cwd or os.getcwd()

  process = subprocess.Popen(
      args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=cwd)
  stdout, stderr = process.communicate()

  # Filters run before the failure check, so fail_func and the raised error
  # both see the filtered text.
  if stdout_filter is not None:
    stdout = stdout_filter(stdout)
  if stderr_filter is not None:
    stderr = stderr_filter(stderr)

  failed = fail_func(process.returncode, stderr)
  if failed:
    raise CalledProcessError(cwd, args, stdout + stderr)

  if print_stdout:
    sys.stdout.write(stdout)
  if print_stderr:
    sys.stderr.write(stderr)

  return stdout
| |
| |
def GetModifiedTime(path):
  """Return the modification time of path, taking symlinks into account."""
  # For a symlink, the modified time should be the greater of the link's
  # modified time and the modified time of the target.
  link_mtime = os.lstat(path).st_mtime
  target_mtime = os.stat(path).st_mtime
  return max(link_mtime, target_mtime)
| |
| |
def IsTimeStale(output, inputs):
  """Return True if output is missing or older than any of inputs.

  Times are compared using GetModifiedTime(); an input counts as newer only
  when its time is strictly greater than the output's.
  """
  if not os.path.exists(output):
    return True

  output_mtime = GetModifiedTime(output)
  return any(
      GetModifiedTime(input_path) > output_mtime for input_path in inputs)
| |
| |
def IsDeviceReady():
  """Return True when `adb get-state` prints 'device' (ignoring whitespace)."""
  adb_state = CheckOutput(['adb', 'get-state']).strip()
  return adb_state == 'device'
| |
| |
def CheckZipPath(name):
  """Validate that name is a safe, canonical archive-relative path.

  Args:
    name: Path of an entry inside a zip archive.

  Raises:
    Exception: if name is not already normalized, is absolute, or climbs out
        of the archive root through a leading parent-directory component.
  """
  if os.path.normpath(name) != name:
    raise Exception('Non-canonical zip path: %s' % name)
  if os.path.isabs(name):
    raise Exception('Absolute zip path: %s' % name)
  # A normalized relative path can only contain '..' at its very start, so
  # checking the leading component is enough to catch paths such as '../x'
  # that would land outside the directory being zipped or extracted into.
  if name == os.pardir or name.startswith(os.pardir + os.sep):
    raise Exception('Zip path escapes its root: %s' % name)
| |
| |
def ExtractAll(zip_path, path=None, no_clobber=True):
  """Extract every entry of the zip at zip_path into path.

  Args:
    zip_path: Archive to read.
    path: Destination directory; defaults to the current directory and is
        created via MakeDirectory() when it does not exist.
    no_clobber: When True, raise instead of extracting if any entry would
        land on a path that already exists.

  Raises:
    Exception: if an entry name fails CheckZipPath(), or (with no_clobber)
        an output path already exists. All entries are checked before
        anything is extracted.
  """
  if path is None:
    path = os.getcwd()
  elif not os.path.exists(path):
    MakeDirectory(path)

  with zipfile.ZipFile(zip_path) as archive:
    for entry_name in archive.namelist():
      CheckZipPath(entry_name)
      if not no_clobber:
        continue
      destination = os.path.join(path, entry_name)
      if os.path.exists(destination):
        raise Exception(
            'Path already exists from zip: %s %s %s'
            % (zip_path, entry_name, destination))

    archive.extractall(path=path)
| |
| |
def DoZip(inputs, output, base_dir):
  """Write the files in inputs to a new zip archive at output.

  Each file is stored under its path relative to base_dir, after that
  relative path has been validated with CheckZipPath().
  """
  with zipfile.ZipFile(output, 'w') as archive:
    for input_path in inputs:
      archive_name = os.path.relpath(input_path, base_dir)
      CheckZipPath(archive_name)
      archive.write(input_path, archive_name)
| |
| |
def PrintWarning(message):
  """Print message to stdout, prefixed with 'WARNING: '."""
  warning_line = 'WARNING: ' + message
  print(warning_line)
| |
| |
def PrintBigWarning(message):
  """Print message as a warning framed above and below by a row of stars."""
  banner = '***** ' * 8
  print(banner)
  PrintWarning(message)
  print(banner)