import unittest | |
from test import test_support | |
import subprocess | |
import sys | |
import signal | |
import os | |
import errno | |
import tempfile | |
import time | |
import re | |
import sysconfig | |
# True when running on Windows; used to pick platform-specific test paths.
mswindows = sys.platform == "win32"

#
# Depends on the following external programs: Python
#

# Statement prepended to child scripts: on Windows it switches the child's
# stdout to binary mode via msvcrt.setmode(); elsewhere it is empty.
SETBINARY = (
    'import msvcrt; msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY);'
    if mswindows else ''
)
# Module-level alias so tests can call mkstemp() directly; falls back to a
# small mktemp()-based replacement when tempfile.mkstemp is missing.
try:
    mkstemp = tempfile.mkstemp
except AttributeError:
    # tempfile.mkstemp is not available
    def mkstemp(suffix="", prefix="tmp"):
        """Replacement for mkstemp, calling mktemp.

        Accepts the same leading ``suffix`` and ``prefix`` arguments as
        tempfile.mkstemp so callers in this file that pass them keep
        working.  Returns an ``(fd, filename)`` pair.
        """
        fname = tempfile.mktemp(suffix, prefix)
        # O_EXCL refuses a name that appeared between mktemp() and open();
        # 0o600 keeps the file private to the current user.
        flags = os.O_RDWR | os.O_CREAT | os.O_EXCL
        return os.open(fname, flags, 0o600), fname
class BaseTestCase(unittest.TestCase):
    """Shared fixture and helper assertion for the subprocess test cases."""

    def setUp(self):
        # Try to minimize the number of children we have so this test
        # doesn't crash on some buildbots (Alphas in particular).
        test_support.reap_children()

    def tearDown(self):
        # Wait for every child the module still tracks, let the module
        # prune its list, then insist that nothing is left behind.
        for child in subprocess._active:
            child.wait()
        subprocess._cleanup()
        self.assertFalse(subprocess._active, "subprocess._active not empty")

    def assertStderrEqual(self, stderr, expected, msg=None):
        """Assert stderr equals expected, ignoring a trailing "[N refs]"."""
        # In a debug build, stuff like "[6580 refs]" is printed to stderr at
        # shutdown time.  That frustrates tests trying to check stderr produced
        # from a spawned Python process.
        refs_trailer = r"\[\d+ refs\]\r?\n?$"
        cleaned = re.sub(refs_trailer, "", stderr)
        self.assertEqual(cleaned, expected, msg)
class ProcessTestCase(BaseTestCase):
    """Tests for call(), check_call(), check_output() and Popen.

    Covers exit codes, stdin/stdout/stderr redirection, communicate(),
    universal newlines, list2cmdline() and file-descriptor leak regressions.

    Test methods carry comments rather than docstrings so the unittest
    runner keeps reporting them by method name.
    """

    def test_call_seq(self):
        # call() function with sequence argument
        rc = subprocess.call([sys.executable, "-c",
                              "import sys; sys.exit(47)"])
        self.assertEqual(rc, 47)

    def test_check_call_zero(self):
        # check_call() function with zero return code
        rc = subprocess.check_call([sys.executable, "-c",
                                    "import sys; sys.exit(0)"])
        self.assertEqual(rc, 0)

    def test_check_call_nonzero(self):
        # check_call() function with non-zero return code
        with self.assertRaises(subprocess.CalledProcessError) as c:
            subprocess.check_call([sys.executable, "-c",
                                   "import sys; sys.exit(47)"])
        self.assertEqual(c.exception.returncode, 47)

    def test_check_output(self):
        # check_output() function with zero return code
        output = subprocess.check_output(
                [sys.executable, "-c", "print 'BDFL'"])
        self.assertIn('BDFL', output)

    def test_check_output_nonzero(self):
        # check_output() function with non-zero return code
        with self.assertRaises(subprocess.CalledProcessError) as c:
            subprocess.check_output(
                    [sys.executable, "-c", "import sys; sys.exit(5)"])
        self.assertEqual(c.exception.returncode, 5)

    def test_check_output_stderr(self):
        # check_output() function stderr redirected to stdout
        output = subprocess.check_output(
                [sys.executable, "-c", "import sys; sys.stderr.write('BDFL')"],
                stderr=subprocess.STDOUT)
        self.assertIn('BDFL', output)

    def test_check_output_stdout_arg(self):
        # check_output() must reject an explicit stdout argument
        with self.assertRaises(ValueError) as c:
            subprocess.check_output(
                    [sys.executable, "-c", "print 'will not be run'"],
                    stdout=sys.stdout)
            self.fail("Expected ValueError when stdout arg supplied.")
        self.assertIn('stdout', c.exception.args[0])

    def test_call_kwargs(self):
        # call() function with keyword args
        newenv = os.environ.copy()
        newenv["FRUIT"] = "banana"
        rc = subprocess.call([sys.executable, "-c",
                              'import sys, os;'
                              'sys.exit(os.getenv("FRUIT")=="banana")'],
                             env=newenv)
        self.assertEqual(rc, 1)

    def test_stdin_none(self):
        # .stdin is None when not redirected
        p = subprocess.Popen([sys.executable, "-c", 'print "banana"'],
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
        p.wait()
        self.assertEqual(p.stdin, None)

    def test_stdout_none(self):
        # .stdout is None when not redirected
        p = subprocess.Popen([sys.executable, "-c",
                              'print " this bit of output is from a '
                              'test of stdout in a different '
                              'process ..."'],
                             stdin=subprocess.PIPE, stderr=subprocess.PIPE)
        self.addCleanup(p.stdin.close)
        self.addCleanup(p.stderr.close)
        p.wait()
        self.assertEqual(p.stdout, None)

    def test_stderr_none(self):
        # .stderr is None when not redirected
        p = subprocess.Popen([sys.executable, "-c", 'print "banana"'],
                             stdin=subprocess.PIPE, stdout=subprocess.PIPE)
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stdin.close)
        p.wait()
        self.assertEqual(p.stderr, None)

    def test_executable_with_cwd(self):
        python_dir = os.path.dirname(os.path.realpath(sys.executable))
        p = subprocess.Popen(["somethingyoudonthave", "-c",
                              "import sys; sys.exit(47)"],
                             executable=sys.executable, cwd=python_dir)
        p.wait()
        self.assertEqual(p.returncode, 47)

    @unittest.skipIf(sysconfig.is_python_build(),
                     "need an installed Python. See #7774")
    def test_executable_without_cwd(self):
        # For a normal installation, it should work without 'cwd'
        # argument.  For test runs in the build directory, see #7774.
        p = subprocess.Popen(["somethingyoudonthave", "-c",
                              "import sys; sys.exit(47)"],
                             executable=sys.executable)
        p.wait()
        self.assertEqual(p.returncode, 47)

    def test_stdin_pipe(self):
        # stdin redirection
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.exit(sys.stdin.read() == "pear")'],
                             stdin=subprocess.PIPE)
        p.stdin.write("pear")
        p.stdin.close()
        p.wait()
        self.assertEqual(p.returncode, 1)

    def test_stdin_filedes(self):
        # stdin is set to open file descriptor
        tf = tempfile.TemporaryFile()
        self.addCleanup(tf.close)
        d = tf.fileno()
        os.write(d, "pear")
        os.lseek(d, 0, 0)
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.exit(sys.stdin.read() == "pear")'],
                             stdin=d)
        p.wait()
        self.assertEqual(p.returncode, 1)

    def test_stdin_fileobj(self):
        # stdin is set to open file object
        tf = tempfile.TemporaryFile()
        self.addCleanup(tf.close)
        tf.write("pear")
        tf.seek(0)
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.exit(sys.stdin.read() == "pear")'],
                             stdin=tf)
        p.wait()
        self.assertEqual(p.returncode, 1)

    def test_stdout_pipe(self):
        # stdout redirection
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.stdout.write("orange")'],
                             stdout=subprocess.PIPE)
        self.addCleanup(p.stdout.close)
        self.assertEqual(p.stdout.read(), "orange")

    def test_stdout_filedes(self):
        # stdout is set to open file descriptor
        tf = tempfile.TemporaryFile()
        self.addCleanup(tf.close)
        d = tf.fileno()
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.stdout.write("orange")'],
                             stdout=d)
        p.wait()
        os.lseek(d, 0, 0)
        self.assertEqual(os.read(d, 1024), "orange")

    def test_stdout_fileobj(self):
        # stdout is set to open file object
        tf = tempfile.TemporaryFile()
        self.addCleanup(tf.close)
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.stdout.write("orange")'],
                             stdout=tf)
        p.wait()
        tf.seek(0)
        self.assertEqual(tf.read(), "orange")

    def test_stderr_pipe(self):
        # stderr redirection
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.stderr.write("strawberry")'],
                             stderr=subprocess.PIPE)
        self.addCleanup(p.stderr.close)
        self.assertStderrEqual(p.stderr.read(), "strawberry")

    def test_stderr_filedes(self):
        # stderr is set to open file descriptor
        tf = tempfile.TemporaryFile()
        self.addCleanup(tf.close)
        d = tf.fileno()
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.stderr.write("strawberry")'],
                             stderr=d)
        p.wait()
        os.lseek(d, 0, 0)
        self.assertStderrEqual(os.read(d, 1024), "strawberry")

    def test_stderr_fileobj(self):
        # stderr is set to open file object
        tf = tempfile.TemporaryFile()
        self.addCleanup(tf.close)
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.stderr.write("strawberry")'],
                             stderr=tf)
        p.wait()
        tf.seek(0)
        self.assertStderrEqual(tf.read(), "strawberry")

    def test_stdout_stderr_pipe(self):
        # capture stdout and stderr to the same pipe
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys;'
                              'sys.stdout.write("apple");'
                              'sys.stdout.flush();'
                              'sys.stderr.write("orange")'],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT)
        self.addCleanup(p.stdout.close)
        self.assertStderrEqual(p.stdout.read(), "appleorange")

    def test_stdout_stderr_file(self):
        # capture stdout and stderr to the same open file
        tf = tempfile.TemporaryFile()
        self.addCleanup(tf.close)
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys;'
                              'sys.stdout.write("apple");'
                              'sys.stdout.flush();'
                              'sys.stderr.write("orange")'],
                             stdout=tf,
                             stderr=tf)
        p.wait()
        tf.seek(0)
        self.assertStderrEqual(tf.read(), "appleorange")

    def test_stdout_filedes_of_stdout(self):
        # stdout is set to 1 (#1531862).
        cmd = r"import sys, os; sys.exit(os.write(sys.stdout.fileno(), '.\n'))"
        rc = subprocess.call([sys.executable, "-c", cmd], stdout=1)
        self.assertEqual(rc, 2)

    def test_cwd(self):
        tmpdir = tempfile.gettempdir()
        # We cannot use os.path.realpath to canonicalize the path,
        # since it doesn't expand Tru64 {memb} strings. See bug 1063571.
        cwd = os.getcwd()
        os.chdir(tmpdir)
        tmpdir = os.getcwd()
        os.chdir(cwd)
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys,os;'
                              'sys.stdout.write(os.getcwd())'],
                             stdout=subprocess.PIPE,
                             cwd=tmpdir)
        self.addCleanup(p.stdout.close)
        normcase = os.path.normcase
        self.assertEqual(normcase(p.stdout.read()), normcase(tmpdir))

    def test_env(self):
        newenv = os.environ.copy()
        newenv["FRUIT"] = "orange"
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys,os;'
                              'sys.stdout.write(os.getenv("FRUIT"))'],
                             stdout=subprocess.PIPE,
                             env=newenv)
        self.addCleanup(p.stdout.close)
        self.assertEqual(p.stdout.read(), "orange")

    def test_communicate_stdin(self):
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys;'
                              'sys.exit(sys.stdin.read() == "pear")'],
                             stdin=subprocess.PIPE)
        p.communicate("pear")
        self.assertEqual(p.returncode, 1)

    def test_communicate_stdout(self):
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.stdout.write("pineapple")'],
                             stdout=subprocess.PIPE)
        (stdout, stderr) = p.communicate()
        self.assertEqual(stdout, "pineapple")
        self.assertEqual(stderr, None)

    def test_communicate_stderr(self):
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.stderr.write("pineapple")'],
                             stderr=subprocess.PIPE)
        (stdout, stderr) = p.communicate()
        self.assertEqual(stdout, None)
        self.assertStderrEqual(stderr, "pineapple")

    def test_communicate(self):
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys,os;'
                              'sys.stderr.write("pineapple");'
                              'sys.stdout.write(sys.stdin.read())'],
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
        self.addCleanup(p.stdin.close)
        (stdout, stderr) = p.communicate("banana")
        self.assertEqual(stdout, "banana")
        self.assertStderrEqual(stderr, "pineapple")

    # This test is Linux specific for simplicity to at least have
    # some coverage.  It is not a platform specific bug.
    @unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()),
                         "Linux specific")
    # Test for the fd leak reported in http://bugs.python.org/issue2791.
    def test_communicate_pipe_fd_leak(self):
        fd_directory = '/proc/%d/fd' % os.getpid()
        num_fds_before_popen = len(os.listdir(fd_directory))
        p = subprocess.Popen([sys.executable, "-c", "print()"],
                             stdout=subprocess.PIPE)
        p.communicate()
        num_fds_after_communicate = len(os.listdir(fd_directory))
        del p
        num_fds_after_destruction = len(os.listdir(fd_directory))
        self.assertEqual(num_fds_before_popen, num_fds_after_destruction)
        self.assertEqual(num_fds_before_popen, num_fds_after_communicate)

    def test_communicate_returns(self):
        # communicate() should return None if no redirection is active
        p = subprocess.Popen([sys.executable, "-c",
                              "import sys; sys.exit(47)"])
        (stdout, stderr) = p.communicate()
        self.assertEqual(stdout, None)
        self.assertEqual(stderr, None)

    def test_communicate_pipe_buf(self):
        # communicate() with writes larger than pipe_buf
        # This test will probably deadlock rather than fail, if
        # communicate() does not work properly.
        x, y = os.pipe()
        if mswindows:
            pipe_buf = 512
        else:
            pipe_buf = os.fpathconf(x, "PC_PIPE_BUF")
        os.close(x)
        os.close(y)
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys,os;'
                              'sys.stdout.write(sys.stdin.read(47));'
                              'sys.stderr.write("xyz"*%d);'
                              'sys.stdout.write(sys.stdin.read())' % pipe_buf],
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
        self.addCleanup(p.stdin.close)
        string_to_write = "abc"*pipe_buf
        (stdout, stderr) = p.communicate(string_to_write)
        self.assertEqual(stdout, string_to_write)

    def test_writes_before_communicate(self):
        # stdin.write before communicate()
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys,os;'
                              'sys.stdout.write(sys.stdin.read())'],
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
        self.addCleanup(p.stdin.close)
        p.stdin.write("banana")
        (stdout, stderr) = p.communicate("split")
        self.assertEqual(stdout, "bananasplit")
        self.assertStderrEqual(stderr, "")

    def test_universal_newlines(self):
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys,os;' + SETBINARY +
                              'sys.stdout.write("line1\\n");'
                              'sys.stdout.flush();'
                              'sys.stdout.write("line2\\r");'
                              'sys.stdout.flush();'
                              'sys.stdout.write("line3\\r\\n");'
                              'sys.stdout.flush();'
                              'sys.stdout.write("line4\\r");'
                              'sys.stdout.flush();'
                              'sys.stdout.write("\\nline5");'
                              'sys.stdout.flush();'
                              'sys.stdout.write("\\nline6");'],
                             stdout=subprocess.PIPE,
                             universal_newlines=1)
        self.addCleanup(p.stdout.close)
        stdout = p.stdout.read()
        if hasattr(file, 'newlines'):
            # Interpreter with universal newline support
            self.assertEqual(stdout,
                             "line1\nline2\nline3\nline4\nline5\nline6")
        else:
            # Interpreter without universal newline support
            self.assertEqual(stdout,
                             "line1\nline2\rline3\r\nline4\r\nline5\nline6")

    def test_universal_newlines_communicate(self):
        # universal newlines through communicate()
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys,os;' + SETBINARY +
                              'sys.stdout.write("line1\\n");'
                              'sys.stdout.flush();'
                              'sys.stdout.write("line2\\r");'
                              'sys.stdout.flush();'
                              'sys.stdout.write("line3\\r\\n");'
                              'sys.stdout.flush();'
                              'sys.stdout.write("line4\\r");'
                              'sys.stdout.flush();'
                              'sys.stdout.write("\\nline5");'
                              'sys.stdout.flush();'
                              'sys.stdout.write("\\nline6");'],
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                             universal_newlines=1)
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
        (stdout, stderr) = p.communicate()
        if hasattr(file, 'newlines'):
            # Interpreter with universal newline support
            self.assertEqual(stdout,
                             "line1\nline2\nline3\nline4\nline5\nline6")
        else:
            # Interpreter without universal newline support
            self.assertEqual(stdout,
                             "line1\nline2\rline3\r\nline4\r\nline5\nline6")

    def test_no_leaking(self):
        # Make sure we leak no resources
        if not mswindows:
            max_handles = 1026  # too much for most UNIX systems
        else:
            max_handles = 2050  # too much for (at least some) Windows setups
        handles = []
        try:
            for i in range(max_handles):
                try:
                    handles.append(os.open(test_support.TESTFN,
                                           os.O_WRONLY | os.O_CREAT))
                except OSError as e:
                    if e.errno != errno.EMFILE:
                        raise
                    break
            else:
                self.skipTest("failed to reach the file descriptor limit "
                              "(tried %d)" % max_handles)
            # Close a couple of them (should be enough for a subprocess)
            for i in range(10):
                os.close(handles.pop())
            # Loop creating some subprocesses.  If one of them leaks some fds,
            # the next loop iteration will fail by reaching the max fd limit.
            for i in range(15):
                p = subprocess.Popen([sys.executable, "-c",
                                      "import sys;"
                                      "sys.stdout.write(sys.stdin.read())"],
                                     stdin=subprocess.PIPE,
                                     stdout=subprocess.PIPE,
                                     stderr=subprocess.PIPE)
                data = p.communicate(b"lime")[0]
                self.assertEqual(data, b"lime")
        finally:
            for h in handles:
                os.close(h)
            # The os.open() calls above created TESTFN; do not leave it
            # behind in the working directory for later tests.
            test_support.unlink(test_support.TESTFN)

    def test_list2cmdline(self):
        self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']),
                         '"a b c" d e')
        self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']),
                         'ab\\"c \\ d')
        self.assertEqual(subprocess.list2cmdline(['ab"c', ' \\', 'd']),
                         'ab\\"c " \\\\" d')
        self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']),
                         'a\\\\\\b "de fg" h')
        self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']),
                         'a\\\\\\"b c d')
        self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']),
                         '"a\\\\b c" d e')
        self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']),
                         '"a\\\\b\\ c" d e')
        self.assertEqual(subprocess.list2cmdline(['ab', '']),
                         'ab ""')

    def test_poll(self):
        p = subprocess.Popen([sys.executable,
                              "-c", "import time; time.sleep(1)"])
        count = 0
        while p.poll() is None:
            time.sleep(0.1)
            count += 1
        # We expect that the poll loop probably went around about 10 times,
        # but, based on system scheduling we can't control, it's possible
        # poll() never returned None.  It "should be" very rare that it
        # didn't go around at least twice.
        self.assertGreaterEqual(count, 2)
        # Subsequent invocations should just return the returncode
        self.assertEqual(p.poll(), 0)

    def test_wait(self):
        p = subprocess.Popen([sys.executable,
                              "-c", "import time; time.sleep(2)"])
        self.assertEqual(p.wait(), 0)
        # Subsequent invocations should just return the returncode
        self.assertEqual(p.wait(), 0)

    def test_invalid_bufsize(self):
        # an invalid type of the bufsize argument should raise
        # TypeError.
        with self.assertRaises(TypeError):
            subprocess.Popen([sys.executable, "-c", "pass"], "orange")

    def test_leaking_fds_on_error(self):
        # see bug #5179: Popen leaks file descriptors to PIPEs if
        # the child fails to execute; this will eventually exhaust
        # the maximum number of open fds. 1024 seems a very common
        # value for that limit, but Windows has 2048, so we loop
        # 1024 times (each call leaked two fds).
        for i in range(1024):
            # Windows raises IOError.  Others raise OSError.
            with self.assertRaises(EnvironmentError) as c:
                subprocess.Popen(['nonexisting_i_hope'],
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
            # ignore errors that indicate the command was not found
            if c.exception.errno not in (errno.ENOENT, errno.EACCES):
                raise c.exception

    def test_handles_closed_on_exception(self):
        # If CreateProcess exits with an error, ensure the
        # duplicate output handles are released
        ifhandle, ifname = mkstemp()
        ofhandle, ofname = mkstemp()
        efhandle, efname = mkstemp()
        try:
            subprocess.Popen(["*"], stdin=ifhandle, stdout=ofhandle,
                             stderr=efhandle)
        except OSError:
            os.close(ifhandle)
            os.remove(ifname)
            os.close(ofhandle)
            os.remove(ofname)
            os.close(efhandle)
            os.remove(efname)
        self.assertFalse(os.path.exists(ifname))
        self.assertFalse(os.path.exists(ofname))
        self.assertFalse(os.path.exists(efname))

    def test_communicate_epipe(self):
        # Issue 10963: communicate() should hide EPIPE
        p = subprocess.Popen([sys.executable, "-c", 'pass'],
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
        self.addCleanup(p.stdin.close)
        p.communicate("x" * 2**20)

    def test_communicate_epipe_only_stdin(self):
        # Issue 10963: communicate() should hide EPIPE
        p = subprocess.Popen([sys.executable, "-c", 'pass'],
                             stdin=subprocess.PIPE)
        self.addCleanup(p.stdin.close)
        time.sleep(2)
        p.communicate("x" * 2**20)
# context manager | |
class _SuppressCoreFiles(object): | |
"""Try to prevent core files from being created.""" | |
old_limit = None | |
def __enter__(self): | |
"""Try to save previous ulimit, then set it to (0, 0).""" | |
try: | |
import resource | |
self.old_limit = resource.getrlimit(resource.RLIMIT_CORE) | |
resource.setrlimit(resource.RLIMIT_CORE, (0, 0)) | |
except (ImportError, ValueError, resource.error): | |
pass | |
if sys.platform == 'darwin': | |
# Check if the 'Crash Reporter' on OSX was configured | |
# in 'Developer' mode and warn that it will get triggered | |
# when it is. | |
# | |
# This assumes that this context manager is used in tests | |
# that might trigger the next manager. | |
value = subprocess.Popen(['/usr/bin/defaults', 'read', | |
'com.apple.CrashReporter', 'DialogType'], | |
stdout=subprocess.PIPE).communicate()[0] | |
if value.strip() == b'developer': | |
print "this tests triggers the Crash Reporter, that is intentional" | |
sys.stdout.flush() | |
def __exit__(self, *args): | |
"""Return core file behavior to default.""" | |
if self.old_limit is None: | |
return | |
try: | |
import resource | |
resource.setrlimit(resource.RLIMIT_CORE, self.old_limit) | |
except (ImportError, ValueError, resource.error): | |
pass | |
@unittest.skipIf(mswindows, "POSIX specific tests")
class POSIXProcessTestCase(BaseTestCase):
    """POSIX-only Popen tests.

    Covers child-side exceptions, signal termination, preexec_fn, shell
    execution, and pipes that keep working with standard fds closed.

    Test methods carry comments rather than docstrings so the unittest
    runner keeps reporting them by method name.
    """

    def test_exceptions(self):
        # caught & re-raised exceptions
        with self.assertRaises(OSError) as c:
            subprocess.Popen([sys.executable, "-c", ""],
                             cwd="/this/path/does/not/exist")
        # The attribute child_traceback should contain "os.chdir" somewhere.
        self.assertIn("os.chdir", c.exception.child_traceback)

    def test_run_abort(self):
        # returncode handles signal termination
        with _SuppressCoreFiles():
            p = subprocess.Popen([sys.executable, "-c",
                                  "import os; os.abort()"])
            p.wait()
        self.assertEqual(-p.returncode, signal.SIGABRT)

    def test_preexec(self):
        # preexec function
        p = subprocess.Popen([sys.executable, "-c",
                              "import sys, os;"
                              "sys.stdout.write(os.getenv('FRUIT'))"],
                             stdout=subprocess.PIPE,
                             preexec_fn=lambda: os.putenv("FRUIT", "apple"))
        self.addCleanup(p.stdout.close)
        self.assertEqual(p.stdout.read(), "apple")

    def test_args_string(self):
        # args is a string
        f, fname = mkstemp()
        try:
            os.write(f, "#!/bin/sh\n")
            os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" %
                        sys.executable)
            os.close(f)
            os.chmod(fname, 0o700)
            p = subprocess.Popen(fname)
            p.wait()
        finally:
            # Remove the script even when spawning it failed.
            os.remove(fname)
        self.assertEqual(p.returncode, 47)

    def test_invalid_args(self):
        # invalid arguments should raise ValueError
        self.assertRaises(ValueError, subprocess.call,
                          [sys.executable, "-c",
                           "import sys; sys.exit(47)"],
                          startupinfo=47)
        self.assertRaises(ValueError, subprocess.call,
                          [sys.executable, "-c",
                           "import sys; sys.exit(47)"],
                          creationflags=47)

    def test_shell_sequence(self):
        # Run command through the shell (sequence)
        newenv = os.environ.copy()
        newenv["FRUIT"] = "apple"
        p = subprocess.Popen(["echo $FRUIT"], shell=1,
                             stdout=subprocess.PIPE,
                             env=newenv)
        self.addCleanup(p.stdout.close)
        self.assertEqual(p.stdout.read().strip(), "apple")

    def test_shell_string(self):
        # Run command through the shell (string)
        newenv = os.environ.copy()
        newenv["FRUIT"] = "apple"
        p = subprocess.Popen("echo $FRUIT", shell=1,
                             stdout=subprocess.PIPE,
                             env=newenv)
        self.addCleanup(p.stdout.close)
        self.assertEqual(p.stdout.read().strip(), "apple")

    def test_call_string(self):
        # call() function with string argument on UNIX
        f, fname = mkstemp()
        try:
            os.write(f, "#!/bin/sh\n")
            os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" %
                        sys.executable)
            os.close(f)
            # 0o700 rather than the legacy 0700 spelling, matching
            # test_args_string.
            os.chmod(fname, 0o700)
            rc = subprocess.call(fname)
        finally:
            # Remove the script even when running it failed.
            os.remove(fname)
        self.assertEqual(rc, 47)

    def test_specific_shell(self):
        # Issue #9265: Incorrect name passed as arg[0].
        shells = []
        for prefix in ['/bin', '/usr/bin/', '/usr/local/bin']:
            for name in ['bash', 'ksh']:
                sh = os.path.join(prefix, name)
                if os.path.isfile(sh):
                    shells.append(sh)
        if not shells:  # Will probably work for any shell but csh.
            self.skipTest("bash or ksh required for this test")
        sh = '/bin/sh'
        if os.path.isfile(sh) and not os.path.islink(sh):
            # Test will fail if /bin/sh is a symlink to csh.
            shells.append(sh)
        for sh in shells:
            p = subprocess.Popen("echo $0", executable=sh, shell=True,
                                 stdout=subprocess.PIPE)
            self.addCleanup(p.stdout.close)
            self.assertEqual(p.stdout.read().strip(), sh)

    def _kill_process(self, method, *args):
        """Start a sleeping child, call ``method(*args)`` on it, return it.

        ``method`` is the name of a Popen method such as 'kill'.
        """
        # Do not inherit file handles from the parent.
        # It should fix failures on some platforms.
        p = subprocess.Popen([sys.executable, "-c", """if 1:
                             import sys, time
                             sys.stdout.write('x\\n')
                             sys.stdout.flush()
                             time.sleep(30)
                             """],
                             close_fds=True,
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        # Wait for the interpreter to be completely initialized before
        # sending any signal.
        p.stdout.read(1)
        getattr(p, method)(*args)
        return p

    def test_send_signal(self):
        p = self._kill_process('send_signal', signal.SIGINT)
        _, stderr = p.communicate()
        self.assertIn('KeyboardInterrupt', stderr)
        self.assertNotEqual(p.wait(), 0)

    def test_kill(self):
        p = self._kill_process('kill')
        _, stderr = p.communicate()
        self.assertStderrEqual(stderr, '')
        self.assertEqual(p.wait(), -signal.SIGKILL)

    def test_terminate(self):
        p = self._kill_process('terminate')
        _, stderr = p.communicate()
        self.assertStderrEqual(stderr, '')
        self.assertEqual(p.wait(), -signal.SIGTERM)

    def check_close_std_fds(self, fds):
        """Run a child with the given standard fds closed in the parent.

        The fds are duplicated first and restored with dup2() afterwards.
        """
        # Issue #9905: test that subprocess pipes still work properly with
        # some standard fds closed
        stdin = 0
        newfds = []
        for a in fds:
            b = os.dup(a)
            newfds.append(b)
            if a == 0:
                stdin = b
        try:
            for fd in fds:
                os.close(fd)
            out, err = subprocess.Popen([sys.executable, "-c",
                                         'import sys;'
                                         'sys.stdout.write("apple");'
                                         'sys.stdout.flush();'
                                         'sys.stderr.write("orange")'],
                                        stdin=stdin,
                                        stdout=subprocess.PIPE,
                                        stderr=subprocess.PIPE).communicate()
            err = test_support.strip_python_stderr(err)
            self.assertEqual((out, err), (b'apple', b'orange'))
        finally:
            for b, a in zip(newfds, fds):
                os.dup2(b, a)
            for b in newfds:
                os.close(b)

    def test_close_fd_0(self):
        self.check_close_std_fds([0])

    def test_close_fd_1(self):
        self.check_close_std_fds([1])

    def test_close_fd_2(self):
        self.check_close_std_fds([2])

    def test_close_fds_0_1(self):
        self.check_close_std_fds([0, 1])

    def test_close_fds_0_2(self):
        self.check_close_std_fds([0, 2])

    def test_close_fds_1_2(self):
        self.check_close_std_fds([1, 2])

    def test_close_fds_0_1_2(self):
        # Issue #10806: test that subprocess pipes still work properly with
        # all standard fds closed.
        self.check_close_std_fds([0, 1, 2])

    def test_wait_when_sigchild_ignored(self):
        # NOTE: sigchild_ignore.py may not be an effective test on all OSes.
        sigchild_ignore = test_support.findfile("sigchild_ignore.py",
                                                subdir="subprocessdata")
        p = subprocess.Popen([sys.executable, sigchild_ignore],
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, stderr = p.communicate()
        self.assertEqual(0, p.returncode, "sigchild_ignore.py exited"
                         " non-zero with this error:\n%s" % stderr)
@unittest.skipUnless(mswindows, "Windows specific tests")
class Win32ProcessTestCase(BaseTestCase):
    """Windows-only tests: startupinfo, creationflags, shell use, killing.

    Test methods carry comments rather than docstrings so the unittest
    runner keeps reporting them by method name.
    """

    def test_startupinfo(self):
        # startupinfo argument
        # We uses hardcoded constants, because we do not want to
        # depend on win32all.
        STARTF_USESHOWWINDOW = 1
        SW_MAXIMIZE = 3
        info = subprocess.STARTUPINFO()
        info.dwFlags = STARTF_USESHOWWINDOW
        info.wShowWindow = SW_MAXIMIZE
        # Since Python is a console process, it won't be affected
        # by wShowWindow, but the argument should be silently
        # ignored
        command = [sys.executable, "-c", "import sys; sys.exit(0)"]
        subprocess.call(command, startupinfo=info)

    def test_creationflags(self):
        # creationflags argument
        CREATE_NEW_CONSOLE = 16
        sys.stderr.write(" a DOS box should flash briefly ...\n")
        command = sys.executable + ' -c "import time; time.sleep(0.25)"'
        subprocess.call(command, creationflags=CREATE_NEW_CONSOLE)

    def test_invalid_args(self):
        # invalid arguments should raise ValueError
        command = [sys.executable, "-c", "import sys; sys.exit(47)"]
        self.assertRaises(ValueError, subprocess.call, command,
                          preexec_fn=lambda: 1)
        self.assertRaises(ValueError, subprocess.call, command,
                          stdout=subprocess.PIPE,
                          close_fds=True)

    def test_close_fds(self):
        # close file descriptors
        command = [sys.executable, "-c", "import sys; sys.exit(47)"]
        exit_code = subprocess.call(command, close_fds=True)
        self.assertEqual(exit_code, 47)

    def test_shell_sequence(self):
        # Run command through the shell (sequence)
        child_env = os.environ.copy()
        child_env["FRUIT"] = "physalis"
        proc = subprocess.Popen(["set"], shell=1,
                                stdout=subprocess.PIPE,
                                env=child_env)
        self.addCleanup(proc.stdout.close)
        self.assertIn("physalis", proc.stdout.read())

    def test_shell_string(self):
        # Run command through the shell (string)
        child_env = os.environ.copy()
        child_env["FRUIT"] = "physalis"
        proc = subprocess.Popen("set", shell=1,
                                stdout=subprocess.PIPE,
                                env=child_env)
        self.addCleanup(proc.stdout.close)
        self.assertIn("physalis", proc.stdout.read())

    def test_call_string(self):
        # call() function with string argument on Windows
        command = sys.executable + ' -c "import sys; sys.exit(47)"'
        self.assertEqual(subprocess.call(command), 47)

    def _kill_process(self, method, *args):
        """Start a sleeping child, call ``method(*args)`` on it and check
        that it produced no stderr output and exited non-zero.
        """
        # Some win32 buildbot raises EOFError if stdin is inherited
        proc = subprocess.Popen([sys.executable, "-c", """if 1:
                                import sys, time
                                sys.stdout.write('x\\n')
                                sys.stdout.flush()
                                time.sleep(30)
                                """],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
        for pipe in (proc.stdout, proc.stderr, proc.stdin):
            self.addCleanup(pipe.close)
        # Wait for the interpreter to be completely initialized before
        # sending any signal.
        proc.stdout.read(1)
        action = getattr(proc, method)
        action(*args)
        _, stderr = proc.communicate()
        self.assertStderrEqual(stderr, '')
        self.assertNotEqual(proc.wait(), 0)

    def test_send_signal(self):
        self._kill_process('send_signal', signal.SIGTERM)

    def test_kill(self):
        self._kill_process('kill')

    def test_terminate(self):
        self._kill_process('terminate')
@unittest.skipUnless(getattr(subprocess, '_has_poll', False),
                     "poll system call not supported")
class ProcessTestCaseNoPoll(ProcessTestCase):
    """Re-run the ProcessTestCase tests with subprocess._has_poll set False."""

    def setUp(self):
        # Switch the flag off before the inherited fixture runs.
        subprocess._has_poll = False
        super(ProcessTestCaseNoPoll, self).setUp()

    def tearDown(self):
        # Put the flag back before the inherited teardown checks run.
        subprocess._has_poll = True
        super(ProcessTestCaseNoPoll, self).tearDown()
class HelperFunctionTests(unittest.TestCase):
    """Tests for private helper functions of the subprocess module."""

    @unittest.skipIf(mswindows, "errno and EINTR make no sense on windows")
    def test_eintr_retry_call(self):
        calls_seen = []

        def flaky_os_func(*args):
            # Record every invocation; the second one fails with EINTR.
            calls_seen.append(args)
            if len(calls_seen) == 2:
                raise OSError(errno.EINTR, "fake interrupted system call")
            return args[::-1]

        first = subprocess._eintr_retry_call(flaky_os_func, 256, 999)
        self.assertEqual((999, 256), first)
        self.assertEqual([(256, 999)], calls_seen)
        # This time there will be an EINTR so it will loop once.
        second = subprocess._eintr_retry_call(flaky_os_func, 666)
        self.assertEqual((666,), second)
        self.assertEqual([(256, 999), (666,), (666,)], calls_seen)
@unittest.skipUnless(mswindows, "mswindows only")
class CommandsWithSpaces(BaseTestCase):
    """Run a script whose path and argument contain spaces, with and
    without the shell, passing the command as a string or a sequence.
    """

    def setUp(self):
        super(CommandsWithSpaces, self).setUp()
        handle, path = mkstemp(".py", "te st")
        self.fname = path.lower()
        # The script prints its argument count and its lower-cased argv.
        script = (b"import sys;"
                  b"sys.stdout.write('%d %s' % (len(sys.argv), [a.lower () for a in sys.argv]))")
        os.write(handle, script)
        os.close(handle)

    def tearDown(self):
        os.remove(self.fname)
        super(CommandsWithSpaces, self).tearDown()

    def with_spaces(self, *args, **kwargs):
        """Spawn Popen(*args, **kwargs) with stdout piped and check that the
        script saw exactly two arguments: its own path and 'ab cd'.
        """
        kwargs['stdout'] = subprocess.PIPE
        proc = subprocess.Popen(*args, **kwargs)
        self.addCleanup(proc.stdout.close)
        produced = proc.stdout.read().decode("mbcs")
        self.assertEqual(produced, "2 [%r, 'ab cd']" % self.fname)

    def test_shell_string_with_spaces(self):
        # call() function with string argument with spaces on Windows
        command = '"%s" "%s" "%s"' % (sys.executable, self.fname, "ab cd")
        self.with_spaces(command, shell=1)

    def test_shell_sequence_with_spaces(self):
        # call() function with sequence argument with spaces on Windows
        command = [sys.executable, self.fname, "ab cd"]
        self.with_spaces(command, shell=1)

    def test_noshell_string_with_spaces(self):
        # call() function with string argument with spaces on Windows
        command = '"%s" "%s" "%s"' % (sys.executable, self.fname, "ab cd")
        self.with_spaces(command)

    def test_noshell_sequence_with_spaces(self):
        # call() function with sequence argument with spaces on Windows
        command = [sys.executable, self.fname, "ab cd"]
        self.with_spaces(command)
def test_main():
    """Run every test case class of this module, then reap child processes."""
    test_support.run_unittest(
        ProcessTestCase,
        POSIXProcessTestCase,
        Win32ProcessTestCase,
        ProcessTestCaseNoPoll,
        HelperFunctionTests,
        CommandsWithSpaces,
    )
    test_support.reap_children()


if __name__ == "__main__":
    test_main()