blob: 3794707ade3325b0bf0850a90b211c91e15b6678 [file] [log] [blame]
#!/usr/bin/env python
from __future__ import print_function
import collections
import ctypes
import os
import re
import struct
import sys
import frontend
class SlidingRateInfo(object):
def __init__(self, n=32):
self.rate = -1
self.last_update = -1
self.times = collections.deque(maxlen=n)
def update_rate(self, update_hint, time_millis):
if update_hint == self.last_update:
return
self.last_update = update_hint
if len(self.times) == self.times.maxlen:
self.times.popleft()
self.times.append(time_millis)
if self.times[-1] != self.times[0]:
self.rate = len(self.times) / ((self.times[-1] - self.times[0]) / 1e3)
strip_ansi_re = re.compile(r'\x1B\[[^a-zA-Z]*[a-zA-Z]')
def strip_ansi_escape_codes(output):
return strip_ansi_re.sub('', output)
class NinjaNativeFrontend:
def __init__(self):
self.total_edges = 0
self.running_edges = 0
self.started_edges = 0
self.finished_edges = 0
self.running = {}
self.time_millis = 0
self.progress_status_format = os.getenv('NINJA_STATUS', '[%f/%t] ')
self.current_rate = SlidingRateInfo()
self.console_locked = False
self.printer = LinePrinter()
self.verbose = False
def handle(self, msg):
handled = False
if msg.HasField('total_edges'):
handled = True
self.total_edges = msg.total_edges.total_edges
if msg.HasField('build_started'):
handled = True
self.verbose = msg.build_started.verbose
self.current_rate = SlidingRateInfo(msg.build_started.parallelism)
self.running_edges = 0
self.started_edges = 0
self.finished_edges = 0
self.running = {}
if msg.HasField('build_finished'):
handled = True
self.printer.set_console_locked(False)
self.printer.print_on_new_line('')
if msg.HasField('edge_started'):
handled = True
self.started_edges += 1
self.running_edges += 1
self.running[msg.edge_started.id] = msg.edge_started
self.time_millis = msg.edge_started.start_time
if msg.edge_started.console or self.printer.smart_terminal:
self.print_status(msg.edge_started)
if msg.edge_started.console:
self.printer.set_console_locked(True)
if msg.HasField('edge_finished'):
handled = True
self.finished_edges += 1
self.time_millis = msg.edge_finished.end_time
edge_started = self.running[msg.edge_finished.id]
if edge_started.console:
self.printer.set_console_locked(False)
if not edge_started.console:
self.print_status(edge_started)
self.running_edges -= 1
del self.running[msg.edge_finished.id]
# Print the command that is spewing before printing its output.
if msg.edge_finished.status != 0:
self.printer.print_on_new_line('FAILED: ' + ' '.join(edge_started.outputs))
self.printer.print_on_new_line(edge_started.command)
# ninja sets stdout and stderr of subprocesses to a pipe, to be able to
# check if the output is empty. Some compilers, e.g. clang, check
# isatty(stderr) to decide if they should print colored output.
# To make it possible to use colored output with ninja, subprocesses should
# be run with a flag that forces them to always print color escape codes.
# To make sure these escape codes don't show up in a file if ninja's output
# is piped to a file, ninja strips ansi escape codes again if it's not
# writing to a |smart_terminal_|.
# (Launching subprocesses in pseudo ttys doesn't work because there are
# only a few hundred available on some systems, and ninja can launch
# thousands of parallel compile commands.)
# TODO: There should be a flag to disable escape code stripping.
if msg.edge_finished.output != '':
if not self.printer.smart_terminal:
msg.edge_finished.output = strip_ansi_escape_codes(msg.edge_finished.output)
self.printer.print_on_new_line(msg.edge_finished.output)
if msg.HasField('message'):
handled = True
# TODO(colincross): get the enum values from proto
if msg.message.level == 0:
prefix = 'ninja: '
elif msg.message.level == 1:
prefix = 'ninja: warning: '
elif msg.message.level == 2:
prefix = 'ninja: error: '
self.printer.print_line(prefix + msg.message.message, LinePrinter.LINE_FULL)
if not handled:
pass
def format_progress_status(self, fmt):
out = ''
fmt_iter = iter(fmt)
for c in fmt_iter:
if c == '%':
c = next(fmt_iter)
if c == '%':
out += c
elif c == 's':
out += str(self.started_edges)
elif c == 't':
out += str(self.total_edges)
elif c == 'r':
out += str(self.running_edges)
elif c == 'u':
out += str(self.total_edges - self.started_edges)
elif c == 'f':
out += str(self.finished_edges)
elif c == 'o':
if self.time_millis > 0:
rate = self.finished_edges / (self.time_millis / 1e3)
out += '{:.1f}'.format(rate)
else:
out += '?'
elif c == 'c':
self.current_rate.update_rate(self.finished_edges, self.time_millis)
if self.current_rate.rate == -1:
out += '?'
else:
out += '{:.1f}'.format(self.current_rate.rate)
elif c == 'p':
out += '{:3d}%'.format((100 * self.finished_edges) // self.total_edges)
elif c == 'e':
out += '{:.3f}'.format(self.time_millis / 1e3)
else:
raise RuntimeError('unknown placeholder '' + c +'' in $NINJA_STATUS')
else:
out += c
return out
def print_status(self, edge_started):
to_print = edge_started.desc
if self.verbose or to_print == '':
to_print = edge_started.command
to_print = self.format_progress_status(self.progress_status_format) + to_print
self.printer.print_line(to_print, LinePrinter.LINE_FULL if self.verbose else LinePrinter.LINE_ELIDE)
def elide_middle(status, width):
margin = 3 # Space for '...'.
if len(status) + margin > width:
elide_size = (width - margin) // 2
status = status[0:elide_size] + '...' + status[-elide_size:]
return status
class LinePrinter(object):
LINE_FULL = 1
LINE_ELIDE = 2
def __init__(self):
# Whether we can do fancy terminal control codes.
self.smart_terminal = False
# Whether the caret is at the beginning of a blank line.
self.have_blank_line = True
# Whether console is locked.
self.console_locked = False
# Buffered current line while console is locked.
self.line_buffer = ''
# Buffered line type while console is locked.
self.line_type = self.LINE_FULL
# Buffered console output while console is locked.
self.output_buffer = ''
if os.name == 'windows':
STD_OUTPUT_HANDLE = -11
self.console = ctypes.windll.kernel32.GetStdHandle(STD_OUTPUT_HANDLE)
csbi = ctypes.create_string_buffer(22)
self.smart_terminal = ctypes.windll.kernel32.GetConsoleScreenBufferInfo(self.console, csbi)
else:
term = os.getenv('TERM')
self.smart_terminal = os.isatty(1) and term != '' and term != 'dumb'
def print_line(self, to_print, line_type):
if self.console_locked:
self.line_buffer = to_print
self.line_type = line_type
if self.smart_terminal:
sys.stdout.write('\r') # Print over previous line, if any.
if self.smart_terminal and line_type == self.LINE_ELIDE:
if os.name == 'windows':
csbi = ctypes.create_string_buffer(22)
ctypes.windll.kernel32.GetConsoleScreenBufferInfo(self.console, csbi)
(cols, rows) = struct.unpack('hh', csbi.raw)
to_print = elide_middle(to_print, cols)
# TODO: windows support
# We don't want to have the cursor spamming back and forth, so instead of
# printf use WriteConsoleOutput which updates the contents of the buffer,
# but doesn't move the cursor position.
sys.stdout.write(to_print)
sys.stdout.flush()
else:
# Limit output to width of the terminal if provided so we don't cause
# line-wrapping.
import fcntl, termios
winsize = fcntl.ioctl(0, termios.TIOCGWINSZ, '\0'*4)
(rows, cols) = struct.unpack('hh', winsize)
to_print = elide_middle(to_print, cols)
sys.stdout.write(to_print)
sys.stdout.write('\x1B[K') # Clear to end of line.
sys.stdout.flush()
self.have_blank_line = False
else:
sys.stdout.write(to_print + '\n')
sys.stdout.flush()
def print_or_buffer(self, to_print):
if self.console_locked:
self.output_buffer += to_print
else:
sys.stdout.write(to_print)
sys.stdout.flush()
def print_on_new_line(self, to_print):
if self.console_locked or self.line_buffer != '':
self.output_buffer += self.line_buffer + '\n'
self.line_buffer = ''
if not self.have_blank_line:
self.print_or_buffer('\n')
if to_print != '':
self.print_or_buffer(to_print)
self.have_blank_line = to_print == '' or to_print[0] == '\n'
def set_console_locked(self, locked):
if locked == self.console_locked:
return
if locked:
self.print_on_new_line('\n')
self.console_locked = locked
if not locked:
self.print_on_new_line(self.output_buffer)
if self.line_buffer != '':
self.print_line(self.line_buffer, self.line_type)
self.output_buffer = ''
self.line_buffer = ''
def main():
native = NinjaNativeFrontend()
for msg in frontend.Frontend():
native.handle(msg)
if __name__ == '__main__':
main()