blob: 3a37ed09dd894347570b39e6d668b37fdf5fc0d9 [file] [log] [blame]
import contextlib
import os
import os.path
import subprocess
import sys
import tempfile
import threading
from textwrap import dedent
import unittest
from test import support
from test.support import os_helper
from test.support import interpreters
def _captured_script(script):
r, w = os.pipe()
indented = script.replace('\n', '\n ')
wrapped = dedent(f"""
import contextlib
with open({w}, 'w', encoding='utf-8') as spipe:
with contextlib.redirect_stdout(spipe):
{indented}
""")
return wrapped, open(r, encoding='utf-8')
def clean_up_interpreters():
for interp in interpreters.list_all():
if interp.id == 0: # main
continue
try:
interp.close()
except RuntimeError:
pass # already destroyed
def _run_output(interp, request, init=None):
script, rpipe = _captured_script(request)
with rpipe:
if init:
interp.prepare_main(init)
interp.exec_sync(script)
return rpipe.read()
@contextlib.contextmanager
def _running(interp):
r, w = os.pipe()
def run():
interp.exec_sync(dedent(f"""
# wait for "signal"
with open({r}) as rpipe:
rpipe.read()
"""))
t = threading.Thread(target=run)
t.start()
yield
with open(w, 'w') as spipe:
spipe.write('done')
t.join()
class TestBase(unittest.TestCase):
def pipe(self):
def ensure_closed(fd):
try:
os.close(fd)
except OSError:
pass
r, w = os.pipe()
self.addCleanup(lambda: ensure_closed(r))
self.addCleanup(lambda: ensure_closed(w))
return r, w
def temp_dir(self):
tempdir = tempfile.mkdtemp()
tempdir = os.path.realpath(tempdir)
self.addCleanup(lambda: os_helper.rmtree(tempdir))
return tempdir
def make_script(self, filename, dirname=None, text=None):
if text:
text = dedent(text)
if dirname is None:
dirname = self.temp_dir()
filename = os.path.join(dirname, filename)
os.makedirs(os.path.dirname(filename), exist_ok=True)
with open(filename, 'w', encoding='utf-8') as outfile:
outfile.write(text or '')
return filename
def make_module(self, name, pathentry=None, text=None):
if text:
text = dedent(text)
if pathentry is None:
pathentry = self.temp_dir()
else:
os.makedirs(pathentry, exist_ok=True)
*subnames, basename = name.split('.')
dirname = pathentry
for subname in subnames:
dirname = os.path.join(dirname, subname)
if os.path.isdir(dirname):
pass
elif os.path.exists(dirname):
raise Exception(dirname)
else:
os.mkdir(dirname)
initfile = os.path.join(dirname, '__init__.py')
if not os.path.exists(initfile):
with open(initfile, 'w'):
pass
filename = os.path.join(dirname, basename + '.py')
with open(filename, 'w', encoding='utf-8') as outfile:
outfile.write(text or '')
return filename
@support.requires_subprocess()
def run_python(self, *argv):
proc = subprocess.run(
[sys.executable, *argv],
capture_output=True,
text=True,
)
return proc.returncode, proc.stdout, proc.stderr
def assert_python_ok(self, *argv):
exitcode, stdout, stderr = self.run_python(*argv)
self.assertNotEqual(exitcode, 1)
return stdout, stderr
def assert_python_failure(self, *argv):
exitcode, stdout, stderr = self.run_python(*argv)
self.assertNotEqual(exitcode, 0)
return stdout, stderr
def tearDown(self):
clean_up_interpreters()