blob: a0b0181aa33cf2e8ad215ca387e0123017368a84 [file] [log] [blame]
#!/usr/bin/env python2
# Copyright 2013 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import Queue
import optparse
import subprocess
import sys
import threading
import time
class FilterFormat:
total_tests = 0
finished_tests = 0
tests = {}
outputs = {}
failures = []
def print_test_status(self, last_finished_test, time_ms):
print "[%d/%d] %s (%d ms)" % (self.finished_tests,
self.total_tests,
last_finished_test,
time_ms)
def handle_meta(self, job_id, args):
(command, arg) = args.split(' ', 1)
if command == "TEST":
(binary, test) = arg.split(' ', 1)
self.tests[job_id] = (binary, test.strip())
self.outputs[job_id] = []
self.total_tests += 1
elif command == "EXIT":
(exit_code, time_ms) = [int(x) for x in arg.split(' ', 1)]
self.finished_tests += 1
(binary, test) = self.tests[job_id]
self.print_test_status(test, time_ms)
if exit_code != 0:
self.failures.append(self.tests[job_id])
for line in self.outputs[job_id]:
print line
def add_stdout(self, job_id, output):
self.outputs[job_id].append(output)
def log(self):
print "[0/?] Running tests...\r",
while True:
line = log.get()
if line == "":
break
(prefix, output) = line.split(' ', 1)
if prefix[-1] == ':':
self.handle_meta(int(prefix[:-1]), output)
else:
self.add_stdout(int(prefix[:-1]), output)
if self.failures:
print "FAILED TESTS (%d/%d):" % (len(self.failures), self.total_tests)
for (binary, test) in self.failures:
print " ", binary + ": " + test
class RawFormat:
    """Log consumer that echoes every queued line to stdout unchanged."""

    def log(self):
        """Write lines from the global ``log`` queue until "" is received."""
        # iter(callable, sentinel) stops as soon as log.get() returns "".
        for entry in iter(log.get, ""):
            sys.stdout.write(entry + "\n")
            # Flush per line so output shows up while tests are running.
            sys.stdout.flush()
# Command line: options first, then one or more gtest executables.
parser = optparse.OptionParser(
    usage='usage: %prog [options] executable [executable ...]')
parser.add_option(
    '-r', '--repeat', type='int', default=1,
    help='repeat tests')
parser.add_option(
    '-w', '--workers', type='int', default=16,
    help='number of workers to spawn')
parser.add_option(
    '--gtest_color', type='string', default='yes',
    help='color output')
parser.add_option(
    '--gtest_filter', type='string', default='',
    help='test filter')
parser.add_option(
    '--gtest_also_run_disabled_tests', action='store_true', default=False,
    help='run disabled tests too')
parser.add_option(
    '--format', type='string', default='filter',
    help='output format (raw,filter)')

options, binaries = parser.parse_args()

# At least one executable is required.
if not binaries:
    parser.print_usage()
    sys.exit(1)

# Map each accepted --format value to the class that renders the log queue.
log_formats = {'raw': RawFormat, 'filter': FilterFormat}
if options.format not in log_formats:
    sys.exit("Unknown output format: " + options.format)
logger = log_formats[options.format]()
# Find tests.
tests = []
for test_binary in binaries:
command = [test_binary]
if options.gtest_filter != '':
command += ['--gtest_filter=' + options.gtest_filter]
if options.gtest_also_run_disabled_tests:
command += ['--gtest_also_run_disabled_tests']
test_list = subprocess.Popen(command + ['--gtest_list_tests'],
stdout=subprocess.PIPE).communicate()[0]
test_group = ''
for line in test_list.split('\n'):
if not line.strip():
continue
if line[0] != " ":
test_group = line.strip()
continue
line = line.strip()
if not options.gtest_also_run_disabled_tests and 'DISABLED' in line:
continue
test = test_group + line
tests.append((test_binary, command, test))
# Repeat tests (-r flag).
tests *= options.repeat
# `log` carries status/output lines to the printer thread; `test_queue` hands
# (command, job_id, test) jobs to the workers. The job id is simply the
# position of the test in `tests`.
log = Queue.Queue()
test_queue = Queue.Queue()
for job_id, job in enumerate(tests):
    test_binary, command, test = job
    # Announce the test first so the logger knows about it before it runs.
    log.put(str(job_id) + ': TEST ' + test_binary + ' ' + test)
    test_queue.put((command, job_id, test))

# Process exit status; overwritten by run_job() when a test exits non-zero.
exit_code = 0
def run_job(job):
    """Run a single test in its own process and report it on ``log``.

    Every output line of the process (stderr is merged into stdout) is queued
    as "<job_id>> <line>", followed by one
    "<job_id>: EXIT <exit_code> <runtime_ms>" record once the process ends.
    A non-zero exit code is also stored in the global ``exit_code``.

    Args:
      job: a (command, job_id, test) tuple, where ``command`` is the argument
        list to start from, ``job_id`` the integer used to tag log lines and
        ``test`` the value passed as --gtest_filter.
    """
    global exit_code
    # Unpacked here instead of in the signature: tuple parameters are not
    # valid syntax from Python 3 on.
    (command, job_id, test) = job

    begin = time.time()
    sub = subprocess.Popen(command + ['--gtest_filter=' + test] +
                           ['--gtest_color=' + options.gtest_color],
                           stdout=subprocess.PIPE,
                           stderr=subprocess.STDOUT)

    # readline() returns '' only at end of file, which ends the loop.
    for line in iter(sub.stdout.readline, ''):
        log.put(str(job_id) + '> ' + line.rstrip())

    code = sub.wait()
    runtime_ms = int(1000 * (time.time() - begin))
    log.put(str(job_id) + ': EXIT ' + str(code) + ' ' + str(runtime_ms))

    if code != 0:
        exit_code = code
def worker():
    """Run jobs from ``test_queue`` until the queue is empty, then return."""
    while True:
        try:
            job = test_queue.get_nowait()
        except Queue.Empty:
            # Nothing left to run; this worker is done.
            return
        run_job(job)
        test_queue.task_done()
def start_daemon(func):
    """Start ``func`` on a new daemon thread and return the started thread."""
    thread = threading.Thread(target=func)
    # Must be set before start(); marks the thread as a daemon thread.
    thread.daemon = True
    thread.start()
    return thread
# Start the worker pool and the thread that renders the log queue.
workers = [start_daemon(worker) for _ in range(options.workers)]
printer = start_daemon(logger.log)

# Wait until every worker has drained the test queue.
for worker_thread in workers:
    worker_thread.join()

# The empty string is the sentinel that tells the logger to stop.
log.put("")
printer.join()
sys.exit(exit_code)