| import contextlib |
| import os |
| import os.path |
| import subprocess |
| import sys |
| import tempfile |
| import threading |
| from textwrap import dedent |
| import unittest |
| |
| from test import support |
| from test.support import os_helper |
| |
| from test.support import interpreters |
| |
| |
| def _captured_script(script): |
| r, w = os.pipe() |
| indented = script.replace('\n', '\n ') |
| wrapped = dedent(f""" |
| import contextlib |
| with open({w}, 'w', encoding='utf-8') as spipe: |
| with contextlib.redirect_stdout(spipe): |
| {indented} |
| """) |
| return wrapped, open(r, encoding='utf-8') |
| |
| |
| def clean_up_interpreters(): |
| for interp in interpreters.list_all(): |
| if interp.id == 0: # main |
| continue |
| try: |
| interp.close() |
| except RuntimeError: |
| pass # already destroyed |
| |
| |
| def _run_output(interp, request, init=None): |
| script, rpipe = _captured_script(request) |
| with rpipe: |
| if init: |
| interp.prepare_main(init) |
| interp.exec_sync(script) |
| return rpipe.read() |
| |
| |
| @contextlib.contextmanager |
| def _running(interp): |
| r, w = os.pipe() |
| def run(): |
| interp.exec_sync(dedent(f""" |
| # wait for "signal" |
| with open({r}) as rpipe: |
| rpipe.read() |
| """)) |
| |
| t = threading.Thread(target=run) |
| t.start() |
| |
| yield |
| |
| with open(w, 'w') as spipe: |
| spipe.write('done') |
| t.join() |
| |
| |
| class TestBase(unittest.TestCase): |
| |
| def pipe(self): |
| def ensure_closed(fd): |
| try: |
| os.close(fd) |
| except OSError: |
| pass |
| r, w = os.pipe() |
| self.addCleanup(lambda: ensure_closed(r)) |
| self.addCleanup(lambda: ensure_closed(w)) |
| return r, w |
| |
| def temp_dir(self): |
| tempdir = tempfile.mkdtemp() |
| tempdir = os.path.realpath(tempdir) |
| self.addCleanup(lambda: os_helper.rmtree(tempdir)) |
| return tempdir |
| |
| def make_script(self, filename, dirname=None, text=None): |
| if text: |
| text = dedent(text) |
| if dirname is None: |
| dirname = self.temp_dir() |
| filename = os.path.join(dirname, filename) |
| |
| os.makedirs(os.path.dirname(filename), exist_ok=True) |
| with open(filename, 'w', encoding='utf-8') as outfile: |
| outfile.write(text or '') |
| return filename |
| |
| def make_module(self, name, pathentry=None, text=None): |
| if text: |
| text = dedent(text) |
| if pathentry is None: |
| pathentry = self.temp_dir() |
| else: |
| os.makedirs(pathentry, exist_ok=True) |
| *subnames, basename = name.split('.') |
| |
| dirname = pathentry |
| for subname in subnames: |
| dirname = os.path.join(dirname, subname) |
| if os.path.isdir(dirname): |
| pass |
| elif os.path.exists(dirname): |
| raise Exception(dirname) |
| else: |
| os.mkdir(dirname) |
| initfile = os.path.join(dirname, '__init__.py') |
| if not os.path.exists(initfile): |
| with open(initfile, 'w'): |
| pass |
| filename = os.path.join(dirname, basename + '.py') |
| |
| with open(filename, 'w', encoding='utf-8') as outfile: |
| outfile.write(text or '') |
| return filename |
| |
| @support.requires_subprocess() |
| def run_python(self, *argv): |
| proc = subprocess.run( |
| [sys.executable, *argv], |
| capture_output=True, |
| text=True, |
| ) |
| return proc.returncode, proc.stdout, proc.stderr |
| |
| def assert_python_ok(self, *argv): |
| exitcode, stdout, stderr = self.run_python(*argv) |
| self.assertNotEqual(exitcode, 1) |
| return stdout, stderr |
| |
| def assert_python_failure(self, *argv): |
| exitcode, stdout, stderr = self.run_python(*argv) |
| self.assertNotEqual(exitcode, 0) |
| return stdout, stderr |
| |
| def tearDown(self): |
| clean_up_interpreters() |