| import _thread |
| import contextlib |
| import functools |
| import sys |
| import threading |
| import time |
| |
| from test import support |
| |
| |
| #======================================================================= |
| # Threading support to prevent reporting refleaks when running regrtest.py -R |
| |
| # NOTE: we use thread._count() rather than threading.enumerate() (or the |
| # moral equivalent thereof) because a threading.Thread object is still alive |
| # until its __bootstrap() method has returned, even after it has been |
| # unregistered from the threading module. |
| # thread._count(), on the other hand, only gets decremented *after* the |
| # __bootstrap() method has returned, which gives us reliable reference counts |
| # at the end of a test run. |
| |
| |
| def threading_setup(): |
| return _thread._count(), threading._dangling.copy() |
| |
| |
| def threading_cleanup(*original_values): |
| _MAX_COUNT = 100 |
| |
| for count in range(_MAX_COUNT): |
| values = _thread._count(), threading._dangling |
| if values == original_values: |
| break |
| |
| if not count: |
| # Display a warning at the first iteration |
| support.environment_altered = True |
| dangling_threads = values[1] |
| support.print_warning(f"threading_cleanup() failed to cleanup " |
| f"{values[0] - original_values[0]} threads " |
| f"(count: {values[0]}, " |
| f"dangling: {len(dangling_threads)})") |
| for thread in dangling_threads: |
| support.print_warning(f"Dangling thread: {thread!r}") |
| |
| # Don't hold references to threads |
| dangling_threads = None |
| values = None |
| |
| time.sleep(0.01) |
| support.gc_collect() |
| |
| |
| def reap_threads(func): |
| """Use this function when threads are being used. This will |
| ensure that the threads are cleaned up even when the test fails. |
| """ |
| @functools.wraps(func) |
| def decorator(*args): |
| key = threading_setup() |
| try: |
| return func(*args) |
| finally: |
| threading_cleanup(*key) |
| return decorator |
| |
| |
| @contextlib.contextmanager |
| def wait_threads_exit(timeout=None): |
| """ |
| bpo-31234: Context manager to wait until all threads created in the with |
| statement exit. |
| |
| Use _thread.count() to check if threads exited. Indirectly, wait until |
| threads exit the internal t_bootstrap() C function of the _thread module. |
| |
| threading_setup() and threading_cleanup() are designed to emit a warning |
| if a test leaves running threads in the background. This context manager |
| is designed to cleanup threads started by the _thread.start_new_thread() |
| which doesn't allow to wait for thread exit, whereas thread.Thread has a |
| join() method. |
| """ |
| if timeout is None: |
| timeout = support.SHORT_TIMEOUT |
| old_count = _thread._count() |
| try: |
| yield |
| finally: |
| start_time = time.monotonic() |
| deadline = start_time + timeout |
| while True: |
| count = _thread._count() |
| if count <= old_count: |
| break |
| if time.monotonic() > deadline: |
| dt = time.monotonic() - start_time |
| msg = (f"wait_threads() failed to cleanup {count - old_count} " |
| f"threads after {dt:.1f} seconds " |
| f"(count: {count}, old count: {old_count})") |
| raise AssertionError(msg) |
| time.sleep(0.010) |
| support.gc_collect() |
| |
| |
| def join_thread(thread, timeout=None): |
| """Join a thread. Raise an AssertionError if the thread is still alive |
| after timeout seconds. |
| """ |
| if timeout is None: |
| timeout = support.SHORT_TIMEOUT |
| thread.join(timeout) |
| if thread.is_alive(): |
| msg = f"failed to join the thread in {timeout:.1f} seconds" |
| raise AssertionError(msg) |
| |
| |
| @contextlib.contextmanager |
| def start_threads(threads, unlock=None): |
| import faulthandler |
| threads = list(threads) |
| started = [] |
| try: |
| try: |
| for t in threads: |
| t.start() |
| started.append(t) |
| except: |
| if support.verbose: |
| print("Can't start %d threads, only %d threads started" % |
| (len(threads), len(started))) |
| raise |
| yield |
| finally: |
| try: |
| if unlock: |
| unlock() |
| endtime = time.monotonic() |
| for timeout in range(1, 16): |
| endtime += 60 |
| for t in started: |
| t.join(max(endtime - time.monotonic(), 0.01)) |
| started = [t for t in started if t.is_alive()] |
| if not started: |
| break |
| if support.verbose: |
| print('Unable to join %d threads during a period of ' |
| '%d minutes' % (len(started), timeout)) |
| finally: |
| started = [t for t in started if t.is_alive()] |
| if started: |
| faulthandler.dump_traceback(sys.stdout) |
| raise AssertionError('Unable to join %d threads' % len(started)) |
| |
| |
| class catch_threading_exception: |
| """ |
| Context manager catching threading.Thread exception using |
| threading.excepthook. |
| |
| Attributes set when an exception is caught: |
| |
| * exc_type |
| * exc_value |
| * exc_traceback |
| * thread |
| |
| See threading.excepthook() documentation for these attributes. |
| |
| These attributes are deleted at the context manager exit. |
| |
| Usage: |
| |
| with threading_helper.catch_threading_exception() as cm: |
| # code spawning a thread which raises an exception |
| ... |
| |
| # check the thread exception, use cm attributes: |
| # exc_type, exc_value, exc_traceback, thread |
| ... |
| |
| # exc_type, exc_value, exc_traceback, thread attributes of cm no longer |
| # exists at this point |
| # (to avoid reference cycles) |
| """ |
| |
| def __init__(self): |
| self.exc_type = None |
| self.exc_value = None |
| self.exc_traceback = None |
| self.thread = None |
| self._old_hook = None |
| |
| def _hook(self, args): |
| self.exc_type = args.exc_type |
| self.exc_value = args.exc_value |
| self.exc_traceback = args.exc_traceback |
| self.thread = args.thread |
| |
| def __enter__(self): |
| self._old_hook = threading.excepthook |
| threading.excepthook = self._hook |
| return self |
| |
| def __exit__(self, *exc_info): |
| threading.excepthook = self._old_hook |
| del self.exc_type |
| del self.exc_value |
| del self.exc_traceback |
| del self.thread |