blob: 71db8d69668599483704cec4609eb06f3c8a0069 [file] [log] [blame]
import collections
import faulthandler
import json
import os
import queue
import subprocess
import sys
import threading
import time
import traceback
import types
from test import support
from test.libregrtest.runtest import (
runtest, INTERRUPTED, CHILD_ERROR, PROGRESS_MIN_TIME,
format_test_result, TestResult, is_failed, TIMEOUT)
from test.libregrtest.setup import setup_tests
from test.libregrtest.utils import format_duration, print_warning
# Interval between two progress reports: when no worker delivered a result
# during the last PROGRESS_UPDATE seconds, the tests which are still running
# are displayed.
PROGRESS_UPDATE = 30.0 # seconds
assert PROGRESS_UPDATE >= PROGRESS_MIN_TIME
# Kill the main process after 5 minutes. It is supposed to write an update
# every PROGRESS_UPDATE seconds. Tolerate 5 minutes for Python's slowest
# buildbot workers.
MAIN_PROCESS_TIMEOUT = 5 * 60.0
assert MAIN_PROCESS_TIMEOUT >= PROGRESS_UPDATE
# Maximum time to wait until a worker (child process or thread) completes:
# completion should be immediate.
JOIN_TIMEOUT = 30.0 # seconds
def must_stop(result, ns):
    """Tell whether the whole test run must stop after *result*.

    The run stops when the test was interrupted, or when the fail-fast
    option (``ns.failfast``) is set and is_failed() reports a failure.
    """
    return bool(
        result.result == INTERRUPTED
        or (ns.failfast and is_failed(result, ns))
    )
def parse_worker_args(worker_args):
    """Decode the JSON string built by run_test_in_subprocess().

    Return a ``(ns, test_name)`` tuple where *ns* is a SimpleNamespace
    rebuilt from the serialized namespace dictionary.
    """
    decoded = json.loads(worker_args)
    ns_dict, test_name = decoded
    return (types.SimpleNamespace(**ns_dict), test_name)
def run_test_in_subprocess(testname, ns):
    """Spawn a child Python process running *testname* in worker mode.

    The namespace attributes and the test name are serialized to JSON and
    passed through the ``--worker-args`` command line option. Return the
    subprocess.Popen object; stdout and stderr are text-mode pipes.
    """
    payload = json.dumps((vars(ns), testname))

    cmd = [
        sys.executable,
        *support.args_from_interpreter_flags(),
        '-u',    # Unbuffered stdout and stderr
        '-m', 'test.regrtest',
        '--worker-args', payload,
    ]

    popen_kwargs = dict(
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,
        close_fds=(os.name != 'nt'),
        # Running the child from the same working directory as regrtest's
        # original invocation ensures that TEMPDIR for the child is the
        # same when sysconfig.is_python_build() is true. See issue 15300.
        cwd=support.SAVEDCWD,
    )
    return subprocess.Popen(cmd, **popen_kwargs)
def run_tests_worker(ns, test_name):
    """Entry point of a worker process: run one test and report its result.

    The result is written as the last line of stdout, serialized as a JSON
    list, then the process exits with code 0.
    """
    setup_tests(ns)
    outcome = runtest(ns, test_name)

    # Force a newline (just in case) so the JSON result gets its own line
    print()

    # Serialize TestResult as list in JSON
    serialized = json.dumps(list(outcome))
    print(serialized, flush=True)
    sys.exit(0)
# A generator cannot be used here: several threads call next() on the same
# object, so every access is serialized with a lock instead.
class MultiprocessIterator:
    """A thread-safe iterator over tests for multiprocess mode."""

    def __init__(self, tests_iter):
        self.lock = threading.Lock()
        self.tests_iter = tests_iter

    def __iter__(self):
        return self

    def __next__(self):
        with self.lock:
            source = self.tests_iter
            if source is not None:
                return next(source)
            # stop() has been called: no more tests are handed out
            raise StopIteration

    def stop(self):
        """Drop the underlying iterator: next() then raises StopIteration."""
        with self.lock:
            self.tests_iter = None
# Item produced by a worker thread for each test: the test result, the
# captured stdout and stderr of the child process, and an optional error
# message.
MultiprocessResult = collections.namedtuple(
    'MultiprocessResult',
    ('result', 'stdout', 'stderr', 'error_msg'))
class ExitThread(Exception):
    """Raised inside a worker thread to make it leave its run() loop."""
class TestWorkerProcess(threading.Thread):
def __init__(self, worker_id, runner):
super().__init__()
self.worker_id = worker_id
self.pending = runner.pending
self.output = runner.output
self.ns = runner.ns
self.timeout = runner.worker_timeout
self.regrtest = runner.regrtest
self.current_test_name = None
self.start_time = None
self._popen = None
self._killed = False
self._stopped = False
def __repr__(self):
info = [f'TestWorkerProcess #{self.worker_id}']
if self.is_alive():
info.append("running")
else:
info.append('stopped')
test = self.current_test_name
if test:
info.append(f'test={test}')
popen = self._popen
if popen is not None:
dt = time.monotonic() - self.start_time
info.extend((f'pid={self._popen.pid}',
f'time={format_duration(dt)}'))
return '<%s>' % ' '.join(info)
def _kill(self):
popen = self._popen
if popen is None:
return
if self._killed:
return
self._killed = True
print(f"Kill {self}", file=sys.stderr, flush=True)
try:
popen.kill()
except OSError as exc:
print_warning(f"Failed to kill {self}: {exc!r}")
def stop(self):
# Method called from a different thread to stop this thread
self._stopped = True
self._kill()
def mp_result_error(self, test_name, error_type, stdout='', stderr='',
err_msg=None):
test_time = time.monotonic() - self.start_time
result = TestResult(test_name, error_type, test_time, None)
return MultiprocessResult(result, stdout, stderr, err_msg)
def _run_process(self, test_name):
self.start_time = time.monotonic()
self.current_test_name = test_name
try:
popen = run_test_in_subprocess(test_name, self.ns)
self._killed = False
self._popen = popen
except:
self.current_test_name = None
raise
try:
if self._stopped:
# If kill() has been called before self._popen is set,
# self._popen is still running. Call again kill()
# to ensure that the process is killed.
self._kill()
raise ExitThread
try:
stdout, stderr = popen.communicate(timeout=self.timeout)
retcode = popen.returncode
assert retcode is not None
except subprocess.TimeoutExpired:
if self._stopped:
# kill() has been called: communicate() fails
# on reading closed stdout/stderr
raise ExitThread
# On timeout, kill the process
self._kill()
# None means TIMEOUT for the caller
retcode = None
# bpo-38207: Don't attempt to call communicate() again: on it
# can hang until all child processes using stdout and stderr
# pipes completes.
stdout = stderr = ''
except OSError:
if self._stopped:
# kill() has been called: communicate() fails
# on reading closed stdout/stderr
raise ExitThread
raise
else:
stdout = stdout.strip()
stderr = stderr.rstrip()
return (retcode, stdout, stderr)
except:
self._kill()
raise
finally:
self._wait_completed()
self._popen = None
self.current_test_name = None
def _runtest(self, test_name):
retcode, stdout, stderr = self._run_process(test_name)
if retcode is None:
return self.mp_result_error(test_name, TIMEOUT, stdout, stderr)
err_msg = None
if retcode != 0:
err_msg = "Exit code %s" % retcode
else:
stdout, _, result = stdout.rpartition("\n")
stdout = stdout.rstrip()
if not result:
err_msg = "Failed to parse worker stdout"
else:
try:
# deserialize run_tests_worker() output
result = json.loads(result)
result = TestResult(*result)
except Exception as exc:
err_msg = "Failed to parse worker JSON: %s" % exc
if err_msg is not None:
return self.mp_result_error(test_name, CHILD_ERROR,
stdout, stderr, err_msg)
return MultiprocessResult(result, stdout, stderr, err_msg)
def run(self):
while not self._stopped:
try:
try:
test_name = next(self.pending)
except StopIteration:
break
mp_result = self._runtest(test_name)
self.output.put((False, mp_result))
if must_stop(mp_result.result, self.ns):
break
except ExitThread:
break
except BaseException:
self.output.put((True, traceback.format_exc()))
break
def _wait_completed(self):
popen = self._popen
# stdout and stderr must be closed to ensure that communicate()
# does not hang
popen.stdout.close()
popen.stderr.close()
try:
popen.wait(JOIN_TIMEOUT)
except (subprocess.TimeoutExpired, OSError) as exc:
print_warning(f"Failed to wait for {self} completion "
f"(timeout={format_duration(JOIN_TIMEOUT)}): "
f"{exc!r}")
def wait_stopped(self, start_time):
# bpo-38207: MultiprocessTestRunner.stop_workers() called self.stop()
# which killed the process. Sometimes, killing the process from the
# main thread does not interrupt popen.communicate() in
# TestWorkerProcess thread. This loop with a timeout is a workaround
# for that.
#
# Moreover, if this method fails to join the thread, it is likely
# that Python will hang at exit while calling threading._shutdown()
# which tries again to join the blocked thread. Regrtest.main()
# uses EXIT_TIMEOUT to workaround this second bug.
while True:
# Write a message every second
self.join(1.0)
if not self.is_alive():
break
dt = time.monotonic() - start_time
self.regrtest.log(f"Waiting for {self} thread "
f"for {format_duration(dt)}")
if dt > JOIN_TIMEOUT:
print_warning(f"Failed to join {self} in {format_duration(dt)}")
break
def get_running(workers):
    """List the tests currently run by *workers*, with their elapsed time.

    Each item is formatted as ``'test_name (duration)'``. Tests running for
    less than PROGRESS_MIN_TIME seconds are not reported.
    """
    running = []
    for worker in workers:
        test_name = worker.current_test_name
        if test_name:
            elapsed = time.monotonic() - worker.start_time
            if elapsed >= PROGRESS_MIN_TIME:
                running.append(
                    '%s (%s)' % (test_name, format_duration(elapsed)))
    return running
class MultiprocessTestRunner:
    """Run the tests of a regrtest instance using TestWorkerProcess threads.

    Results are collected from a shared queue, accumulated in the regrtest
    object and displayed as they arrive.
    """

    def __init__(self, regrtest):
        self.regrtest = regrtest
        self.log = self.regrtest.log
        self.ns = regrtest.ns
        self.output = queue.Queue()
        self.pending = MultiprocessIterator(self.regrtest.tests)
        # Workers get 50% more time than the per-test timeout
        test_timeout = self.ns.timeout
        self.worker_timeout = (None if test_timeout is None
                               else test_timeout * 1.5)
        self.workers = None

    def start_workers(self):
        """Create and start one worker thread per requested process."""
        last_id = self.ns.use_mp
        self.workers = [TestWorkerProcess(worker_id, self)
                        for worker_id in range(1, last_id + 1)]
        self.log(f"Run tests in parallel using {len(self.workers)} "
                 f"child processes")
        for worker in self.workers:
            worker.start()

    def stop_workers(self):
        """Ask every worker to stop, then wait for all of them."""
        start_time = time.monotonic()
        # Stop all the workers first, so they terminate in parallel
        for worker in self.workers:
            worker.stop()
        for worker in self.workers:
            worker.wait_stopped(start_time)

    def _get_result(self):
        """Return the next item of the output queue, or None when done."""
        if all(not worker.is_alive() for worker in self.workers):
            # all worker threads are done: consume pending results
            try:
                return self.output.get(timeout=0)
            except queue.Empty:
                return None

        watchdog = (self.ns.timeout is not None)
        poll_delay = PROGRESS_UPDATE
        while True:
            if watchdog:
                # Re-arm the watchdog killing the main process if it is
                # blocked for too long
                faulthandler.dump_traceback_later(MAIN_PROCESS_TIMEOUT,
                                                  exit=True)

            # wait for a thread
            try:
                return self.output.get(timeout=poll_delay)
            except queue.Empty:
                pass

            # nothing happened: display progress
            running = get_running(self.workers)
            if running and not self.ns.pgo:
                self.log('running: %s' % ', '.join(running))

    def display_result(self, mp_result):
        """Display the progress line for one test result."""
        result = mp_result.result
        pieces = [format_test_result(result)]

        if mp_result.error_msg is not None:
            # CHILD_ERROR
            pieces.append(' (%s)' % mp_result.error_msg)
        elif (result.test_time >= PROGRESS_MIN_TIME and not self.ns.pgo):
            pieces.append(' (%s)' % format_duration(result.test_time))

        running = get_running(self.workers)
        if running and not self.ns.pgo:
            pieces.append(' -- running: %s' % ', '.join(running))

        self.regrtest.display_progress(self.test_index, ''.join(pieces))

    def _process_result(self, item):
        """Handle one item of the output queue.

        Return True if the test run must stop, False otherwise.
        """
        thread_failed = item[0]
        payload = item[1]
        if thread_failed:
            # Thread got an exception: payload is the formatted traceback
            print_warning(f"regrtest worker thread failed: {payload}")
            return True

        self.test_index += 1
        self.regrtest.accumulate_result(payload.result)
        self.display_result(payload)

        if payload.stdout:
            print(payload.stdout, flush=True)
        if payload.stderr and not self.ns.pgo:
            print(payload.stderr, file=sys.stderr, flush=True)

        return bool(must_stop(payload.result, self.ns))

    def run_tests(self):
        """Start the workers and process results until the run is over."""
        self.start_workers()

        self.test_index = 0
        try:
            while True:
                item = self._get_result()
                if item is None or self._process_result(item):
                    break
        except KeyboardInterrupt:
            print()
            self.regrtest.interrupted = True
        finally:
            if self.ns.timeout is not None:
                faulthandler.cancel_dump_traceback_later()

            # Always ensure that all worker processes are no longer
            # running when we exit this function
            self.pending.stop()
            self.stop_workers()
def run_tests_multiprocess(regrtest):
    """Run the tests of *regrtest* in parallel in child processes."""
    runner = MultiprocessTestRunner(regrtest)
    runner.run_tests()