# blob: 0f66bea9f887e76c910c7884ccdccb1cfec2769e [file] [log] [blame]
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""A command line interface to Gerrit-on-borg instances.

Internal Note:
To expose a function directly to the command line interface, name your function
with the prefix "UserAct".
"""
from __future__ import print_function
import inspect
import pprint
import re
from chromite.cbuildbot import config_lib
from chromite.cbuildbot import constants
from chromite.lib import commandline
from chromite.lib import cros_build_lib
from chromite.lib import cros_logging as logging
from chromite.lib import gerrit
from chromite.lib import git
from chromite.lib import gob_util
from chromite.lib import terminal
site_config = config_lib.GetConfig()

# Colorizer used by red()/green()/blue(); stays None until main() installs a
# terminal.Color object.
COLOR = None

# Map the internal names to the ones we normally show on the web ui.
# Each value is a [short label, long label] pair; the stripped short label is
# what GetApprovalSummary() uses as its category key.
GERRIT_APPROVAL_MAP = {
    'COMR': ['CQ', 'Commit Queue ',],
    'CRVW': ['CR', 'Code Review ',],
    'SUBM': ['S ', 'Submitted ',],
    'TRY': ['T ', 'Trybot Ready ',],
    'VRIF': ['V ', 'Verified ',],
}

# Order is important -- matches the web ui. This also controls the short
# entries that we summarize in non-verbose mode.
# NOTE(review): the assignment itself is missing from this copy of the file;
# the value below is presumed from the short labels above -- confirm.
GERRIT_SUMMARY_CATS = ('CR', 'CQ', 'V',)
def red(s):
  """Return |s| colorized red via the module-level COLOR helper."""
  shade = terminal.Color.RED
  return COLOR.Color(shade, s)
def green(s):
  """Return |s| colorized green via the module-level COLOR helper."""
  shade = terminal.Color.GREEN
  return COLOR.Color(shade, s)
def blue(s):
  """Return |s| colorized blue via the module-level COLOR helper."""
  shade = terminal.Color.BLUE
  return COLOR.Color(shade, s)
def limits(cls):
  """Given a dict of fields, calculate the longest string lengths

  This allows you to easily format the output of many results so that the
  various cols all line up correctly.

  Args:
    cls: An iterable of dicts (one per result).

  Returns:
    A dict mapping every key seen in |cls| to the length of the longest
    formatted value found under that key.
  """
  lims = {}
  for cl in cls:
    for k, v in cl.items():
      # Use %s rather than str() to avoid codec issues.
      # We also do this so we can format integers.  The value is wrapped in a
      # 1-tuple so that a tuple value is formatted whole rather than being
      # unpacked as the format arguments.
      lims[k] = max(lims.get(k, 0), len('%s' % (v,)))
  return lims
# TODO: This func really needs to be merged into the core gerrit logic.
def GetGerrit(opts, cl=None):
  """Auto pick the right gerrit instance based on the |cl|

  Args:
    opts: The general options object.
    cl: A CL taking one of the forms: 1234 *1234 chromium:1234

  Returns:
    A tuple of a gerrit object and a sanitized CL #.
  """
  gob = opts.gob
  if cl is not None:
    if cl.startswith('*'):
      # A leading '*' selects the internal instance.
      gob = site_config.params.INTERNAL_GOB_INSTANCE
      cl = cl[1:]
    elif ':' in cl:
      # "<instance>:<number>" names the instance explicitly.
      gob, cl = cl.split(':', 1)

  # Helpers are created lazily and cached per instance on |opts|.
  if gob not in opts.gerrit:
    opts.gerrit[gob] = gerrit.GetGerritHelper(gob=gob, print_cmd=opts.debug)

  return (opts.gerrit[gob], cl)
def GetApprovalSummary(_opts, cls):
  """Return a dict of the most important approvals

  Args:
    _opts: Unused.
    cls: A dict describing one CL; cls['currentPatchSet'] may hold an
      'approvals' list.

  Returns:
    A dict keyed by the entries of GERRIT_SUMMARY_CATS.  A category with no
    votes maps to ''; otherwise it maps to an integer vote.
  """
  approvs = {x: '' for x in GERRIT_SUMMARY_CATS}
  for approver in cls['currentPatchSet'].get('approvals', []):
    cats = GERRIT_APPROVAL_MAP.get(approver['type'])
    if not cats:
      logging.warning('unknown gerrit approval type: %s', approver['type'])
      continue
    cat = cats[0].strip()
    val = int(approver['value'])
    if cat not in approvs:
      # Ignore the extended categories in the summary view.
      continue
    elif approvs[cat] == '':
      # First vote seen for this category.
      approvs[cat] = val
    elif val < 0:
      approvs[cat] = min(approvs[cat], val)
    else:
      approvs[cat] = max(approvs[cat], val)
  return approvs
def PrintCl(opts, cls, lims, show_approvals=True):
  """Pretty print a single result

  Args:
    opts: The general options object; |raw|, |gob| and |verbose| are read.
    cls: A dict describing one CL.
    lims: Column widths as produced by limits(); a false value disables
      padding.
    show_approvals: Whether to print approval information.
  """
  if opts.raw:
    # Special case internal Chrome GoB as that is what most devs use.
    # They can always redirect the list elsewhere via the -g option.
    if opts.gob == site_config.params.INTERNAL_GOB_INSTANCE:
      print(site_config.params.INTERNAL_CHANGE_PREFIX, end='')
    # NOTE(review): the rest of the raw branch is missing from this copy;
    # printing the CL number and returning is presumed -- confirm.
    print(cls['number'])
    return

  if not lims:
    lims = {'url': 0, 'project': 0}

  status = ''
  if show_approvals and not opts.verbose:
    approvs = GetApprovalSummary(opts, cls)
    for cat in GERRIT_SUMMARY_CATS:
      if approvs[cat] == '':
        # No votes: leave the text uncolored.
        functor = lambda x: x
      elif approvs[cat] < 0:
        functor = red
      else:
        functor = green
      status += functor('%s:%2s ' % (cat, approvs[cat]))

  print('%s %s%-*s %s' % (blue('%-*s' % (lims['url'], cls['url'])), status,
                          lims['project'], cls['project'], cls['subject']))

  if show_approvals and opts.verbose:
    for approver in cls['currentPatchSet'].get('approvals', []):
      functor = red if int(approver['value']) < 0 else green
      n = functor('%2s' % approver['value'])
      # NOTE(review): the tail of this lookup is missing from this copy; the
      # long label (index 1), falling back to the raw type, is presumed.
      t = GERRIT_APPROVAL_MAP.get(approver['type'], [approver['type'],
                                                     approver['type']])[1]
      print(' %s %s %s' % (n, t, approver['by']['email']))
def _MyUserInfo():
  """Build the email list and query terms for the current git user.

  Returns:
    A tuple (emails, reviewers, owners): the email addresses, and the same
    addresses prefixed with 'reviewer:' and 'owner:' respectively.
  """
  email = git.GetProjectUserEmail(constants.CHROMITE_DIR)
  username, _, domain = email.partition('@')
  # NOTE(review): both domain literals are empty strings in this copy of the
  # file; they look elided -- restore the real domains before relying on this.
  known_domains = ('', '')
  if domain in known_domains:
    # The user may be known under either domain, so search for both.
    emails = ['%s@%s' % (username, alt_domain) for alt_domain in known_domains]
  else:
    emails = [email]
  reviewers = ['reviewer:%s' % x for x in emails]
  owners = ['owner:%s' % x for x in emails]
  return emails, reviewers, owners
def _Query(opts, query, raw=True):
  """Queries Gerrit with a query string built from the commandline options"""
  # Each optional command line filter is appended as an extra search term.
  optional_terms = (
      (opts.branch, ' branch:%s'),
      (opts.project, ' project: %s'),
      (opts.topic, ' topic: %s'),
  )
  for value, template in optional_terms:
    if value is not None:
      query += template % value

  gerrit_helper, _ = GetGerrit(opts)
  return gerrit_helper.Query(query, raw=raw, bypass_cache=False)
def FilteredQuery(opts, query):
  """Query gerrit and filter/clean up the results

  Args:
    opts: The general options object; |verbose| and |sort| are read.
    query: The Gerrit search query.

  Returns:
    A list of CL dicts sorted by the |opts.sort| key.
  """
  # NOTE(review): the original tuple is cut off after 'platform' in this copy
  # of the file; further prefixes may be missing -- confirm.
  strip_prefixes = ('chromeos', 'chromiumos', 'overlays', 'platform')

  ret = []
  for cl in _Query(opts, query, raw=True):
    # Gerrit likes to return a stats record too.
    if 'project' not in cl:
      continue

    # Strip off common leading names since the result is still
    # unique over the whole tree.
    if not opts.verbose:
      for pfx in strip_prefixes:
        if cl['project'].startswith('%s/' % pfx):
          cl['project'] = cl['project'][len(pfx) + 1:]

    ret.append(cl)

  if opts.sort == 'number':
    # CL numbers must be compared numerically rather than as text.
    key = lambda x: int(x[opts.sort])
  else:
    key = lambda x: x[opts.sort]
  return sorted(ret, key=key)
def IsApprover(cl, users):
  """See if the approvers in |cl| is listed in |users|

  Args:
    cl: A dict describing one CL; cl['currentPatchSet'] may hold an
      'approvals' list.
    users: A single email address, or a collection of them.

  Returns:
    True if any of |users| left a non-zero 'CRVW' vote on the current patch
    set, otherwise False.
  """
  # See if we are listed in the approvals list.  We have to parse
  # this by hand as the gerrit query system doesn't support it :(
  approvals = cl['currentPatchSet'].get('approvals')
  if not approvals:
    return False

  # basestring only exists on Python 2; fall back to str elsewhere.
  try:
    string_types = basestring  # pylint: disable=undefined-variable
  except NameError:
    string_types = str

  # Wrap a lone address so the membership test below compares whole
  # addresses instead of doing a substring match.
  if isinstance(users, string_types):
    users = (users,)

  return any(approver['by']['email'] in users and
             approver['type'] == 'CRVW' and
             int(approver['value']) != 0
             for approver in approvals)
def UserActTodo(opts):
  """List CLs needing your review"""
  my_emails, as_reviewer, as_owner = _MyUserInfo()

  # Open changes where we are a reviewer but not the owner.
  reviewer_clause = ' OR '.join(as_reviewer)
  owner_clause = ' OR '.join(as_owner)
  query = '( %s ) status:open NOT ( %s )' % (reviewer_clause, owner_clause)

  # Skip the changes we have already left a code review vote on.
  pending = [change for change in FilteredQuery(opts, query)
             if not IsApprover(change, my_emails)]

  widths = limits(pending)
  for change in pending:
    PrintCl(opts, change, widths)
def UserActSearch(opts, query):
  """List CLs matching the Gerrit <search query>"""
  matches = FilteredQuery(opts, query)
  # Work out the column widths once so that every row lines up.
  widths = limits(matches)
  for change in matches:
    PrintCl(opts, change, widths)
def UserActMine(opts):
  """List your CLs with review statuses"""
  _, _, owners = _MyUserInfo()
  # --draft switches the listing from new changes to drafts.
  rule = 'is:draft' if opts.draft else 'status:new'
  UserActSearch(opts, '( %s ) %s' % (' OR '.join(owners), rule))
def _BreadthFirstSearch(to_visit, children, visited_key=lambda x: x):
"""Runs breadth first search starting from the nodes in |to_visit|
to_visit: the starting nodes
children: a function which takes a node and returns the nodes adjacent to it
visited_key: a function for deduplicating node visits. Defaults to the
identity function (lambda x: x)
A list of nodes which are reachable from any node in |to_visit| by calling
|children| any number of times.
to_visit = list(to_visit)
seen = set(map(visited_key, to_visit))
for node in to_visit:
for child in children(node):
key = visited_key(child)
if key not in seen:
return to_visit
def UserActDeps(opts, query):
  """List CLs matching a query, and all transitive dependencies of those CLs"""
  roots = _Query(opts, query, raw=False)

  def _Lookup(query_text):
    """Run |query_text| through _Query with raw=False."""
    return _Query(opts, query_text, raw=False)

  def _OpenDeps(change):
    """Yield the Gerrit and CQ-Depends dependencies whose status is 'NEW'."""
    paladin_deps = change.PaladinDependencies(None)
    all_deps = change.GerritDependencies() + paladin_deps

    for dep in all_deps:
      # We need to query the change to guarantee that we have a .gerrit_number
      # TODO(phobbs) this should maybe catch network errors.
      resolved = _Lookup(dep.ToGerritQueryText())[-1]
      if resolved.status == 'NEW':
        yield resolved

  reachable = _BreadthFirstSearch(
      roots, _OpenDeps,
      visited_key=lambda change: change.gerrit_number)

  rows = [change.patch_dict for change in reachable]
  widths = limits(rows)
  for row in rows:
    PrintCl(opts, row, widths)
def UserActInspect(opts, *args):
  """Inspect CL number <n> [n ...]"""
  for arg in args:
    cl = FilteredQuery(opts, arg)
    if cl:
      # Only the first match is shown, without column padding.
      PrintCl(opts, cl[0], None)
    else:
      print('no results found for CL %s' % arg)
def UserActReview(opts, *args):
  """Mark CL <n> [n ...] with code review status <-2,-1,0,1,2>"""
  # The last argument is the vote; everything before it names a CL.
  score = args[-1]
  targets = args[:-1]
  for target in targets:
    gerrit_helper, change = GetGerrit(opts, target)
    gerrit_helper.SetReview(change, labels={'Code-Review': score},
                            dryrun=opts.dryrun)
# main() checks this minimum: at least one CL plus the vote.
UserActReview.arg_min = 2
def UserActVerify(opts, *args):
  """Mark CL <n> [n ...] with verify status <-1,0,1>"""
  # The last argument is the vote; everything before it names a CL.
  score = args[-1]
  targets = args[:-1]
  for target in targets:
    gerrit_helper, change = GetGerrit(opts, target)
    gerrit_helper.SetReview(change, labels={'Verified': score},
                            dryrun=opts.dryrun)
# main() checks this minimum: at least one CL plus the vote.
UserActVerify.arg_min = 2
def UserActReady(opts, *args):
  """Mark CL <n> [n ...] with ready status <0,1,2>"""
  # The last argument is the vote; everything before it names a CL.
  score = args[-1]
  targets = args[:-1]
  for target in targets:
    gerrit_helper, change = GetGerrit(opts, target)
    gerrit_helper.SetReview(change, labels={'Commit-Queue': score},
                            dryrun=opts.dryrun)
# main() checks this minimum: at least one CL plus the vote.
UserActReady.arg_min = 2
def UserActTrybotready(opts, *args):
  """Mark CL <n> [n ...] with trybot-ready status <0,1>"""
  # The last argument is the vote; everything before it names a CL.
  score = args[-1]
  targets = args[:-1]
  for target in targets:
    gerrit_helper, change = GetGerrit(opts, target)
    gerrit_helper.SetReview(change, labels={'Trybot-Ready': score},
                            dryrun=opts.dryrun)
# main() checks this minimum: at least one CL plus the vote.
UserActTrybotready.arg_min = 2
def UserActSubmit(opts, *args):
  """Submit CL <n> [n ...]"""
  # Each argument may name its own gerrit instance, so resolve per CL.
  for target in args:
    gerrit_helper, change = GetGerrit(opts, target)
    gerrit_helper.SubmitChange(change, dryrun=opts.dryrun)
def UserActAbandon(opts, *args):
  """Abandon CL <n> [n ...]"""
  # Each argument may name its own gerrit instance, so resolve per CL.
  for target in args:
    gerrit_helper, change = GetGerrit(opts, target)
    gerrit_helper.AbandonChange(change, dryrun=opts.dryrun)
def UserActRestore(opts, *args):
  """Restore CL <n> [n ...] that was abandoned"""
  # Each argument may name its own gerrit instance, so resolve per CL.
  for target in args:
    gerrit_helper, change = GetGerrit(opts, target)
    gerrit_helper.RestoreChange(change, dryrun=opts.dryrun)
def UserActReviewers(opts, cl, *args):
  """Add/remove reviewers' emails for CL <n> (prepend with '~' to remove)"""
  emails = args

  # Allow for optional leading '~'.
  email_validator = re.compile(r'^[~]?%s$' % constants.EMAIL_REGEX)
  add_list, remove_list, invalid_list = [], [], []

  for x in emails:
    if not email_validator.match(x):
      invalid_list.append(x)
    elif x[0] == '~':
      # Drop the '~' marker; the remainder is the address to remove.
      remove_list.append(x[1:])
    else:
      add_list.append(x)

  if invalid_list:
    # NOTE(review): the name of the call that reports this error is missing
    # from this copy of the file; cros_build_lib.Die is presumed -- confirm.
    cros_build_lib.Die(
        'Invalid email address(es): %s' % ', '.join(invalid_list))

  if add_list or remove_list:
    helper, cl = GetGerrit(opts, cl)
    helper.SetReviewers(cl, add=add_list, remove=remove_list,
                        dryrun=opts.dryrun)
def UserActMessage(opts, cl, message):
  """Add a message to CL <n>"""
  # Resolve which gerrit instance |cl| lives on before posting.
  gerrit_helper, change = GetGerrit(opts, cl)
  gerrit_helper.SetReview(change, msg=message, dryrun=opts.dryrun)
def UserActTopic(opts, topic, *args):
  """Set |topic| for CL number <n> [n ...]"""
  # Each argument may name its own gerrit instance, so resolve per CL.
  for target in args:
    gerrit_helper, change = GetGerrit(opts, target)
    gerrit_helper.SetTopic(change, topic, dryrun=opts.dryrun)
def UserActDeletedraft(opts, *args):
  """Delete draft CL <n> [n ...]"""
  # Each argument may name its own gerrit instance, so resolve per CL.
  for target in args:
    gerrit_helper, change = GetGerrit(opts, target)
    gerrit_helper.DeleteDraft(change, dryrun=opts.dryrun)
def UserActAccount(opts):
  """Get user account information."""
  helper, _ = GetGerrit(opts)
  # NOTE(review): the statement that reports the account is missing from this
  # copy of the file (the helper was fetched and then discarded).  Pretty
  # printing helper.GetAccount() is presumed -- confirm the helper method name.
  pprint.PrettyPrinter().pprint(helper.GetAccount())
def main(argv):
  """Parse the command line and dispatch to the requested UserAct* function.

  Args:
    argv: The command line arguments, handed straight to the argument parser.
  """
  # pylint: disable=W0603
  global COLOR

  # Locate actions that are exposed to the user.  All functions that start
  # with "UserAct" are fair game.
  act_pfx = 'UserAct'
  actions = [x for x in globals() if x.startswith(act_pfx)]

  # NOTE(review): the documentation URLs that followed the two "see:" style
  # lines are missing from this copy of the file and should be restored.  The
  # trailing backslash continuations on the last two examples were dropped
  # because the text they continued onto is missing as well.
  usage = """%(prog)s [options] <action> [action args]
There is no support for doing line-by-line code review via the command line.
This helps you manage various bits and CL status.
For general Gerrit documentation, see:
The Searching Changes page covers the search query syntax:
$ gerrit todo # List all the CLs that await your review.
$ gerrit mine # List all of your open CLs.
$ gerrit inspect 28123 # Inspect CL 28123 on the public gerrit.
$ gerrit inspect *28123 # Inspect CL 28123 on the internal gerrit.
$ gerrit verify 28123 1 # Mark CL 28123 as verified (+1).
$ gerrit ready `gerrit --raw mine` 1 # Mark *ALL* of your public CLs
$ gerrit ready `gerrit --raw -i mine` 1 # Mark *ALL* of your internal CLs"""
  indent = max(len(x) - len(act_pfx) for x in actions)
  for a in sorted(actions):
    cmd = a[len(act_pfx):]
    # Sanity check for devs adding new commands.  Should be quick.
    if cmd != cmd.lower().capitalize():
      raise RuntimeError('callback "%s" is misnamed; should be "%s"' %
                         (cmd, cmd.lower().capitalize()))
    usage += '\n %-*s: %s' % (indent, cmd.lower(), globals()[a].__doc__)

  # NOTE(review): the default instance is not visible in this copy of the
  # file; the public (external) instance is presumed -- confirm the name.
  default_gob = site_config.params.EXTERNAL_GOB_INSTANCE

  parser = commandline.ArgumentParser(usage=usage)
  # -i and -g share dest='gob'.  argparse applies the default of whichever
  # action is registered first for a dest, so both carry the same default.
  parser.add_argument('-i', '--internal', dest='gob', action='store_const',
                      default=default_gob,
                      const=site_config.params.INTERNAL_GOB_INSTANCE,
                      help='Query internal Chromium Gerrit instance')
  parser.add_argument('-g', '--gob',
                      default=default_gob,
                      help=('Gerrit (on borg) instance to query (default: %s)' %
                            default_gob))
  parser.add_argument('--sort', default='number',
                      help='Key to sort on (number, project)')
  parser.add_argument('--raw', default=False, action='store_true',
                      help='Return raw results (suitable for scripting)')
  # Every action reads opts.dryrun, so pin the dest explicitly; argparse
  # would otherwise derive "dry_run" from the flag name.
  parser.add_argument('-n', '--dry-run', default=False, action='store_true',
                      dest='dryrun',
                      help='Show what would be done, but do not make changes')
  parser.add_argument('-v', '--verbose', default=False, action='store_true',
                      help='Be more verbose in output')
  parser.add_argument('-b', '--branch',
                      help='Limit output to the specific branch')
  parser.add_argument('--draft', default=False, action='store_true',
                      help="Show draft changes (applicable to 'mine' only)")
  parser.add_argument('-p', '--project',
                      help='Limit output to the specific project')
  parser.add_argument('-t', '--topic',
                      help='Limit output to the specific topic')
  parser.add_argument('args', nargs='+')
  opts = parser.parse_args(argv)

  # A cache of gerrit helpers we'll load on demand.
  opts.gerrit = {}

  COLOR = terminal.Color(enabled=opts.color)

  # Now look up the requested user action and run it.
  cmd = opts.args[0].lower()
  args = opts.args[1:]
  functor = globals().get(act_pfx + cmd.capitalize())
  if functor:
    # getargspec is absent from newer Python releases; prefer its replacement
    # when available.  Both expose .args and .varargs.
    get_spec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
    argspec = get_spec(functor)
    if argspec.varargs:
      # Variadic actions may declare their own minimum via .arg_min.
      arg_min = getattr(functor, 'arg_min', len(argspec.args))
      if len(args) < arg_min:
        parser.error('incorrect number of args: %s expects at least %s' %
                     (cmd, arg_min))
    elif len(argspec.args) - 1 != len(args):
      # The "- 1" discounts the leading |opts| parameter.
      parser.error('incorrect number of args: %s expects %s' %
                   (cmd, len(argspec.args) - 1))
    try:
      functor(opts, *args)
    except (cros_build_lib.RunCommandError, gerrit.GerritException,
            gob_util.GOBError) as e:
      # NOTE(review): the handler body is missing from this copy of the file;
      # reporting the error through cros_build_lib.Die is presumed -- confirm.
      cros_build_lib.Die(str(e))
  else:
    parser.error('unknown action: %s' % (cmd,))