blob: 531a977097e975dff6d272122dfd6f5b96929790 [file] [log] [blame]
#!/usr/bin/env python
"""Appcommands-compatible command class with extra fixins."""
from __future__ import print_function
import cmd
import inspect
import pdb
import shlex
import sys
import traceback
import types
import six
from google.apputils import app
from google.apputils import appcommands
import gflags as flags
__all__ = [
'NewCmd',
'Repl',
]
flags.DEFINE_boolean(
'debug_mode', False,
'Show tracebacks on Python exceptions.')
flags.DEFINE_boolean(
'headless', False,
'Assume no user is at the controlling console.')
FLAGS = flags.FLAGS
def _SafeMakeAscii(s):
if isinstance(s, six.text_type):
return s.encode('ascii')
elif isinstance(s, str):
return s.decode('ascii')
else:
return six.text_type(s).encode('ascii', 'backslashreplace')
class NewCmd(appcommands.Cmd):
"""Featureful extension of appcommands.Cmd."""
def __init__(self, name, flag_values):
super(NewCmd, self).__init__(name, flag_values)
run_with_args = getattr(self, 'RunWithArgs', None)
self._new_style = isinstance(run_with_args, types.MethodType)
if self._new_style:
func = run_with_args.__func__
argspec = inspect.getargspec(func)
if argspec.args and argspec.args[0] == 'self':
argspec = argspec._replace( # pylint: disable=protected-access
args=argspec.args[1:])
self._argspec = argspec
# TODO(craigcitro): Do we really want to support all this
# nonsense?
self._star_args = self._argspec.varargs is not None
self._star_kwds = self._argspec.keywords is not None
self._max_args = len(self._argspec.args or ())
self._min_args = self._max_args - len(self._argspec.defaults or ())
if self._star_args:
self._max_args = sys.maxsize
self._debug_mode = FLAGS.debug_mode
self.surface_in_shell = True
self.__doc__ = self.RunWithArgs.__doc__
def __getattr__(self, name):
if name in self._command_flags:
return self._command_flags[name].value
return super(NewCmd, self).__getattribute__(name)
def _GetFlag(self, flagname):
if flagname in self._command_flags:
return self._command_flags[flagname]
else:
return None
def Run(self, argv):
"""Run this command.
If self is a new-style command, we set up arguments and call
self.RunWithArgs, gracefully handling exceptions. If not, we
simply call self.Run(argv).
Args:
argv: List of arguments as strings.
Returns:
0 on success, nonzero on failure.
"""
if not self._new_style:
return super(NewCmd, self).Run(argv)
# TODO(craigcitro): We need to save and restore flags each time so
# that we can per-command flags in the REPL.
args = argv[1:]
fail = None
fail_template = '%s positional args, found %d, expected at %s %d'
if len(args) < self._min_args:
fail = fail_template % ('Not enough', len(args),
'least', self._min_args)
if len(args) > self._max_args:
fail = fail_template % ('Too many', len(args),
'most', self._max_args)
if fail:
print(fail)
if self.usage:
print('Usage: %s' % (self.usage,))
return 1
if self._debug_mode:
return self.RunDebug(args, {})
else:
return self.RunSafely(args, {})
def RunCmdLoop(self, argv):
"""Hook for use in cmd.Cmd-based command shells."""
try:
args = shlex.split(argv)
except ValueError as e:
raise SyntaxError(self.EncodeForPrinting(e))
return self.Run([self._command_name] + args)
@staticmethod
def EncodeForPrinting(s):
"""Safely encode a string as the encoding for sys.stdout."""
encoding = sys.stdout.encoding or 'ascii'
return six.text_type(s).encode(encoding, 'backslashreplace')
def _FormatError(self, e):
"""Hook for subclasses to modify how error messages are printed."""
return _SafeMakeAscii(e)
def _HandleError(self, e):
message = self._FormatError(e)
print('Exception raised in %s operation: %s' % (
self._command_name, message))
return 1
def _IsDebuggableException(self, e):
"""Hook for subclasses to skip debugging on certain exceptions."""
return not isinstance(e, app.UsageError)
def RunDebug(self, args, kwds):
"""Run this command in debug mode."""
try:
return_value = self.RunWithArgs(*args, **kwds)
except BaseException as e:
# Don't break into the debugger for expected exceptions.
if not self._IsDebuggableException(e):
return self._HandleError(e)
print()
print('****************************************************')
print('** Unexpected Exception raised in execution! **')
if FLAGS.headless:
print('** --headless mode enabled, exiting. **')
print('** See STDERR for traceback. **')
else:
print('** --debug_mode enabled, starting pdb. **')
print('****************************************************')
print()
traceback.print_exc()
print()
if not FLAGS.headless:
pdb.post_mortem()
return 1
return return_value
def RunSafely(self, args, kwds):
"""Run this command, turning exceptions into print statements."""
try:
return_value = self.RunWithArgs(*args, **kwds)
except BaseException as e:
return self._HandleError(e)
return return_value
class CommandLoop(cmd.Cmd):
"""Instance of cmd.Cmd built to work with NewCmd."""
class TerminateSignal(Exception):
"""Exception type used for signaling loop completion."""
def __init__(self, commands, prompt):
cmd.Cmd.__init__(self)
self._commands = {'help': commands['help']}
self._special_command_names = ['help', 'repl', 'EOF']
for name, command in commands.items():
if (name not in self._special_command_names and
isinstance(command, NewCmd) and
command.surface_in_shell):
self._commands[name] = command
setattr(self, 'do_%s' % (name,), command.RunCmdLoop)
self._default_prompt = prompt
self._set_prompt()
self._last_return_code = 0
@property
def last_return_code(self):
return self._last_return_code
def _set_prompt(self):
self.prompt = self._default_prompt
def do_EOF(self, *unused_args): # pylint: disable=invalid-name
"""Terminate the running command loop.
This function raises an exception to avoid the need to do
potentially-error-prone string parsing inside onecmd.
Args:
*unused_args: unused.
Returns:
Never returns.
Raises:
CommandLoop.TerminateSignal: always.
"""
raise CommandLoop.TerminateSignal()
def postloop(self):
print('Goodbye.')
# pylint: disable=arguments-differ
def completedefault(self, unused_text, line, unused_begidx, unused_endidx):
if not line:
return []
else:
command_name = line.partition(' ')[0].lower()
usage = ''
if command_name in self._commands:
usage = self._commands[command_name].usage
if usage:
print()
print(usage)
print('%s%s' % (self.prompt, line), end=' ')
return []
# pylint: enable=arguments-differ
def emptyline(self):
print('Available commands:', end=' ')
print(' '.join(list(self._commands)))
def precmd(self, line):
"""Preprocess the shell input."""
if line == 'EOF':
return line
if line.startswith('exit') or line.startswith('quit'):
return 'EOF'
words = line.strip().split()
if len(words) == 1 and words[0] not in ['help', 'ls', 'version']:
return 'help %s' % (line.strip(),)
return line
def onecmd(self, line):
"""Process a single command.
Runs a single command, and stores the return code in
self._last_return_code. Always returns False unless the command
was EOF.
Args:
line: (str) Command line to process.
Returns:
A bool signaling whether or not the command loop should terminate.
"""
try:
self._last_return_code = cmd.Cmd.onecmd(self, line)
except CommandLoop.TerminateSignal:
return True
except BaseException as e:
name = line.split(' ')[0]
print('Error running %s:' % name)
print(e)
self._last_return_code = 1
return False
def get_names(self):
names = dir(self)
commands = (name for name in self._commands
if name not in self._special_command_names)
names.extend('do_%s' % (name,) for name in commands)
names.remove('do_EOF')
return names
def do_help(self, command_name):
"""Print the help for command_name (if present) or general help."""
# TODO(craigcitro): Add command-specific flags.
def FormatOneCmd(name, command, command_names):
indent_size = appcommands.GetMaxCommandLength() + 3
if len(command_names) > 1:
indent = ' ' * indent_size
command_help = flags.TextWrap(
command.CommandGetHelp('', cmd_names=command_names),
indent=indent,
firstline_indent='')
first_help_line, _, rest = command_help.partition('\n')
first_line = '%-*s%s' % (indent_size,
name + ':', first_help_line)
return '\n'.join((first_line, rest))
else:
default_indent = ' '
return '\n' + flags.TextWrap(
command.CommandGetHelp('', cmd_names=command_names),
indent=default_indent,
firstline_indent=default_indent) + '\n'
if not command_name:
print('\nHelp for commands:\n')
command_names = list(self._commands)
print('\n\n'.join(
FormatOneCmd(name, command, command_names)
for name, command in self._commands.items()
if name not in self._special_command_names))
print()
elif command_name in self._commands:
print(FormatOneCmd(command_name, self._commands[command_name],
command_names=[command_name]))
return 0
def postcmd(self, stop, line):
return bool(stop) or line == 'EOF'
class Repl(NewCmd):
"""Start an interactive session."""
PROMPT = '> '
def __init__(self, name, fv):
super(Repl, self).__init__(name, fv)
self.surface_in_shell = False
flags.DEFINE_string(
'prompt', '',
'Prompt to use for interactive shell.',
flag_values=fv)
def RunWithArgs(self):
"""Start an interactive session."""
prompt = FLAGS.prompt or self.PROMPT
repl = CommandLoop(appcommands.GetCommandList(), prompt=prompt)
print('Welcome! (Type help for more information.)')
while True:
try:
repl.cmdloop()
break
except KeyboardInterrupt:
print()
return repl.last_return_code