# blob: 3a2054789dc5b247da319a92dcf2f170d9a409c7 [file] [log] [blame]
# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import contextlib
import fnmatch
import json
import os
import pipes
import shlex
import shutil
import subprocess
import sys
import tempfile
import zipfile
# Path four directory levels above the directory that contains this file
# (the Chromium source root, going by the constant's name).
CHROMIUM_SRC = os.path.join(
    os.path.dirname(__file__), *([os.pardir] * 4))
# Location of the colorama sources underneath CHROMIUM_SRC.
COLORAMA_ROOT = os.path.join(CHROMIUM_SRC, 'third_party', 'colorama', 'src')
@contextlib.contextmanager
def TempDir():
  """Context manager that yields the path of a fresh temporary directory.

  The directory is created with tempfile.mkdtemp() and removed recursively
  when the `with` block exits, whether normally or via an exception.
  """
  scratch_dir = tempfile.mkdtemp()
  try:
    yield scratch_dir
  finally:
    # Always clean up, even if the body of the `with` block raised.
    shutil.rmtree(scratch_dir)
def MakeDirectory(dir_path):
  """Create dir_path (and any missing parents) if it does not exist yet.

  Args:
    dir_path: Directory to create. An empty path is treated as "nothing to
      create" (this is what os.path.dirname() returns for a bare filename).

  Raises:
    OSError: If the directory could not be created and does not already
      exist (e.g. permission denied, or a regular file is in the way).
  """
  if not dir_path:
    return
  try:
    os.makedirs(dir_path)
  except OSError:
    # An already-existing directory is the expected, harmless case; anything
    # else is a real failure that should not be silently swallowed.
    if not os.path.isdir(dir_path):
      raise
def DeleteDirectory(dir_path):
  """Recursively remove dir_path; do nothing if the path does not exist."""
  if not os.path.exists(dir_path):
    return
  shutil.rmtree(dir_path)
def Touch(path):
  """Create path if needed and update its access/modified times.

  The parent directory is passed to MakeDirectory first, then the file is
  opened in append mode (which creates it without truncating existing
  content) and its timestamps are reset via os.utime(path, None).
  """
  parent_dir = os.path.dirname(path)
  MakeDirectory(parent_dir)
  with open(path, 'a'):
    os.utime(path, None)
def FindInDirectory(directory, filename_filter):
  """Return paths of all files under directory matching filename_filter.

  Args:
    directory: Root of the tree to walk with os.walk().
    filename_filter: fnmatch-style pattern applied to each file's basename.

  Returns:
    A list of paths, each formed by joining the walked directory with the
    matching filename.
  """
  matches = []
  for dirpath, _subdirs, names in os.walk(directory):
    for name in fnmatch.filter(names, filename_filter):
      matches.append(os.path.join(dirpath, name))
  return matches
def FindInDirectories(directories, filename_filter):
  """Run FindInDirectory over each of directories and concatenate the results.

  Args:
    directories: Iterable of directory roots to search.
    filename_filter: fnmatch-style pattern forwarded to FindInDirectory.

  Returns:
    A single list with the matches of every directory, in iteration order.
  """
  collected = []
  for search_root in directories:
    collected += FindInDirectory(search_root, filename_filter)
  return collected
def ParseGypList(gyp_string):
  """Split a gyp-provided string into a list using shell-like rules.

  Every '##' in the input is turned into '$' before the string is split with
  shlex.split().
  """
  # The ninja generator doesn't support $ in strings, so use ## to
  # represent $.
  # TODO(cjhopman): Remove when
  # https://code.google.com/p/gyp/issues/detail?id=327
  # is addressed.
  return shlex.split(gyp_string.replace('##', '$'))
def CheckOptions(options, parser, required=None):
  """Report every required option whose value is None via parser.error().

  Args:
    options: Object holding parsed option values as attributes.
    parser: Object exposing an error(message) method.
    required: Optional iterable of attribute names that must not be None.
      Underscores in a name are shown as dashes in the error message.
  """
  for option_name in required or ():
    if getattr(options, option_name) is not None:
      continue
    flag_name = option_name.replace('_', '-')
    parser.error('--%s is required' % flag_name)
def WriteJson(obj, path, only_if_changed=False):
  """Serialize obj as pretty-printed JSON (sorted keys, 2-space indent) to path.

  Args:
    obj: JSON-serializable object.
    path: Destination file.
    only_if_changed: When true, leave the file untouched if it already
      contains exactly the text that would be written.
  """
  new_dump = json.dumps(obj, sort_keys=True, indent=2, separators=(',', ': '))
  # Only pay for reading the existing file when the comparison can actually
  # prevent the write.
  if only_if_changed and os.path.exists(path):
    with open(path, 'r') as oldfile:
      if oldfile.read() == new_dump:
        return
  with open(path, 'w') as outfile:
    outfile.write(new_dump)
def ReadJson(path):
  """Return the object parsed from the JSON file at path."""
  with open(path, 'r') as source:
    parsed = json.load(source)
  return parsed
class CalledProcessError(Exception):
  """This exception is raised when the process run by CheckOutput
  is considered to have failed (by default: a non-zero exit code).

  Attributes:
    cwd: Working directory the command was run in.
    args: The command and its arguments (a sequence of strings).
    output: Captured output to show alongside the failed command.
  """

  def __init__(self, cwd, args, output):
    super(CalledProcessError, self).__init__()
    self.cwd = cwd
    # NOTE: `args` is also the standard exception attribute; the exception
    # machinery stores whatever sequence is assigned here as a tuple.
    self.args = args
    self.output = output

  def __str__(self):
    # Prefer the documented shlex.quote; fall back to pipes.quote on Python
    # versions where shlex has no quote() (Python 2).
    quote = getattr(shlex, 'quote', None)
    if quote is None:
      quote = pipes.quote
    # A missing cwd means "the current directory" rather than a crash while
    # formatting the error.
    working_dir = os.path.abspath(self.cwd or os.getcwd())
    # A user should be able to simply copy and paste the command that failed
    # into their shell.
    copyable_command = '( cd {}; {} )'.format(
        working_dir, ' '.join(map(quote, self.args)))
    return 'Command failed: {}\n{}'.format(copyable_command, self.output)
# This can be used in most cases like subprocess.check_output(). The output,
# particularly when the command fails, better highlights the command's failure.
# If the command fails, raises a build_utils.CalledProcessError.
def CheckOutput(args, cwd=None,
                print_stdout=False, print_stderr=True,
                stdout_filter=None,
                stderr_filter=None,
                fail_func=lambda returncode, stderr: returncode != 0):
  """Run a command, capture its output and return the (filtered) stdout.

  Args:
    args: Command passed to subprocess.Popen.
    cwd: Working directory for the command; the current directory if falsy.
    print_stdout: On success, write the (filtered) stdout to sys.stdout.
    print_stderr: On success, write the (filtered) stderr to sys.stderr.
    stdout_filter: Optional callable applied to the captured stdout.
    stderr_filter: Optional callable applied to the captured stderr.
    fail_func: Callable taking (returncode, filtered stderr); a true result
      marks the run as failed.

  Returns:
    The captured stdout, after stdout_filter has been applied.

  Raises:
    CalledProcessError: If fail_func reports a failure; carries the filtered
      stdout followed by the filtered stderr.
  """
  cwd = cwd or os.getcwd()

  process = subprocess.Popen(
      args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=cwd)
  captured_out, captured_err = process.communicate()

  # Filters run before the failure check, so fail_func and the exception both
  # see the filtered streams.
  if stdout_filter is not None:
    captured_out = stdout_filter(captured_out)
  if stderr_filter is not None:
    captured_err = stderr_filter(captured_err)

  if fail_func(process.returncode, captured_err):
    raise CalledProcessError(cwd, args, captured_out + captured_err)

  if print_stdout:
    sys.stdout.write(captured_out)
  if print_stderr:
    sys.stderr.write(captured_err)
  return captured_out
def GetModifiedTime(path):
  """Return the modification time of path, taking symlinks into account."""
  # For a symlink, the modified time should be the greater of the link's
  # modified time and the modified time of the target.
  link_mtime = os.lstat(path).st_mtime
  target_mtime = os.stat(path).st_mtime
  return max(link_mtime, target_mtime)
def IsTimeStale(output, inputs):
  """Tell whether output needs rebuilding based on modification times.

  Args:
    output: Path of the generated file.
    inputs: Iterable of paths the output was produced from.

  Returns:
    True if output does not exist or at least one input has a strictly newer
    modification time than output; False otherwise.
  """
  if not os.path.exists(output):
    return True

  output_time = GetModifiedTime(output)
  return any(GetModifiedTime(input_path) > output_time
             for input_path in inputs)
def IsDeviceReady():
  """Return True if `adb get-state` reports the state 'device'."""
  reported_state = CheckOutput(['adb', 'get-state']).strip()
  return reported_state == 'device'
def CheckZipPath(name):
  """Validate a zip member name.

  Raises:
    Exception: If name is not already in normalized form (as produced by
      os.path.normpath), or if it is an absolute path.
  """
  canonical_name = os.path.normpath(name)
  if name != canonical_name:
    raise Exception('Non-canonical zip path: %s' % name)
  if os.path.isabs(name):
    raise Exception('Absolute zip path: %s' % name)
def ExtractAll(zip_path, path=None, no_clobber=True):
  """Extract every member of the zip file at zip_path into path.

  Args:
    zip_path: Zip archive to read.
    path: Destination directory. Defaults to the current directory; a
      non-existent destination is created via MakeDirectory.
    no_clobber: When true, refuse to extract if any member's destination
      path already exists.

  Raises:
    Exception: If a member name fails CheckZipPath, or (with no_clobber) if a
      destination path already exists. All members are checked before
      anything is extracted.
  """
  if path is None:
    path = os.getcwd()
  elif not os.path.exists(path):
    MakeDirectory(path)

  with zipfile.ZipFile(zip_path) as archive:
    for member in archive.namelist():
      CheckZipPath(member)
      if not no_clobber:
        continue
      destination = os.path.join(path, member)
      if os.path.exists(destination):
        raise Exception(
            'Path already exists from zip: %s %s %s'
            % (zip_path, member, destination))

    archive.extractall(path=path)
def DoZip(inputs, output, base_dir):
  """Write a zip file at output containing the files listed in inputs.

  Args:
    inputs: Iterable of file paths to add.
    output: Path of the zip file to create.
    base_dir: Directory that archive member names are made relative to.

  Each member name is validated with CheckZipPath before it is written.
  """
  with zipfile.ZipFile(output, 'w') as archive:
    for input_path in inputs:
      member_name = os.path.relpath(input_path, base_dir)
      CheckZipPath(member_name)
      archive.write(input_path, member_name)
def PrintWarning(message):
  """Print message to stdout, prefixed with 'WARNING: '."""
  # Parenthesized single-argument form: same output under the Python 2 print
  # statement and the Python 3 print function.
  print('WARNING: ' + message)
def PrintBigWarning(message):
  """Print message via PrintWarning, framed by a line of asterisks above and
  below so it stands out in the output."""
  banner = '***** ' * 8
  # Parenthesized single-argument form: same output under the Python 2 print
  # statement and the Python 3 print function.
  print(banner)
  PrintWarning(message)
  print(banner)