blob: e5a62f451d7efd90fd047e5cffd5035a18df6e89 [file] [log] [blame]
# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org)
# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
"""
Application that runs a CGI script.
"""
import os
import sys
import subprocess
from six.moves.urllib.parse import quote
try:
import select
except ImportError:
select = None
import six
from paste.util import converters
__all__ = ['CGIError', 'CGIApplication']
class CGIError(Exception):
"""
Raised when the CGI script can't be found or doesn't
act like a proper CGI script.
"""
class CGIApplication(object):
"""
This object acts as a proxy to a CGI application. You pass in the
script path (``script``), an optional path to search for the
script (if the name isn't absolute) (``path``). If you don't give
a path, then ``$PATH`` will be used.
"""
def __init__(self,
global_conf,
script,
path=None,
include_os_environ=True,
query_string=None):
if global_conf:
raise NotImplemented(
"global_conf is no longer supported for CGIApplication "
"(use make_cgi_application); please pass None instead")
self.script_filename = script
if path is None:
path = os.environ.get('PATH', '').split(':')
self.path = path
if '?' in script:
assert query_string is None, (
"You cannot have '?' in your script name (%r) and also "
"give a query_string (%r)" % (script, query_string))
script, query_string = script.split('?', 1)
if os.path.abspath(script) != script:
# relative path
for path_dir in self.path:
if os.path.exists(os.path.join(path_dir, script)):
self.script = os.path.join(path_dir, script)
break
else:
raise CGIError(
"Script %r not found in path %r"
% (script, self.path))
else:
self.script = script
self.include_os_environ = include_os_environ
self.query_string = query_string
def __call__(self, environ, start_response):
if 'REQUEST_URI' not in environ:
environ['REQUEST_URI'] = (
quote(environ.get('SCRIPT_NAME', ''))
+ quote(environ.get('PATH_INFO', '')))
if self.include_os_environ:
cgi_environ = os.environ.copy()
else:
cgi_environ = {}
for name in environ:
# Should unicode values be encoded?
if (name.upper() == name
and isinstance(environ[name], str)):
cgi_environ[name] = environ[name]
if self.query_string is not None:
old = cgi_environ.get('QUERY_STRING', '')
if old:
old += '&'
cgi_environ['QUERY_STRING'] = old + self.query_string
cgi_environ['SCRIPT_FILENAME'] = self.script
proc = subprocess.Popen(
[self.script],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=cgi_environ,
cwd=os.path.dirname(self.script),
)
writer = CGIWriter(environ, start_response)
if select and sys.platform != 'win32':
proc_communicate(
proc,
stdin=StdinReader.from_environ(environ),
stdout=writer,
stderr=environ['wsgi.errors'])
else:
stdout, stderr = proc.communicate(StdinReader.from_environ(environ).read())
if stderr:
environ['wsgi.errors'].write(stderr)
writer.write(stdout)
if not writer.headers_finished:
start_response(writer.status, writer.headers)
return []
class CGIWriter(object):
def __init__(self, environ, start_response):
self.environ = environ
self.start_response = start_response
self.status = '200 OK'
self.headers = []
self.headers_finished = False
self.writer = None
self.buffer = b''
def write(self, data):
if self.headers_finished:
self.writer(data)
return
self.buffer += data
while b'\n' in self.buffer:
if b'\r\n' in self.buffer and self.buffer.find(b'\r\n') < self.buffer.find(b'\n'):
line1, self.buffer = self.buffer.split(b'\r\n', 1)
else:
line1, self.buffer = self.buffer.split(b'\n', 1)
if not line1:
self.headers_finished = True
self.writer = self.start_response(
self.status, self.headers)
self.writer(self.buffer)
del self.buffer
del self.headers
del self.status
break
elif b':' not in line1:
raise CGIError(
"Bad header line: %r" % line1)
else:
name, value = line1.split(b':', 1)
value = value.lstrip()
name = name.strip()
if six.PY3:
name = name.decode('utf8')
value = value.decode('utf8')
if name.lower() == 'status':
if ' ' not in value:
# WSGI requires this space, sometimes CGI scripts don't set it:
value = '%s General' % value
self.status = value
else:
self.headers.append((name, value))
class StdinReader(object):
def __init__(self, stdin, content_length):
self.stdin = stdin
self.content_length = content_length
@classmethod
def from_environ(cls, environ):
length = environ.get('CONTENT_LENGTH')
if length:
length = int(length)
else:
length = 0
return cls(environ['wsgi.input'], length)
def read(self, size=None):
if not self.content_length:
return b''
if size is None:
text = self.stdin.read(self.content_length)
else:
text = self.stdin.read(min(self.content_length, size))
self.content_length -= len(text)
return text
def proc_communicate(proc, stdin=None, stdout=None, stderr=None):
"""
Run the given process, piping input/output/errors to the given
file-like objects (which need not be actual file objects, unlike
the arguments passed to Popen). Wait for process to terminate.
Note: this is taken from the posix version of
subprocess.Popen.communicate, but made more general through the
use of file-like objects.
"""
read_set = []
write_set = []
input_buffer = b''
trans_nl = proc.universal_newlines and hasattr(open, 'newlines')
if proc.stdin:
# Flush stdio buffer. This might block, if the user has
# been writing to .stdin in an uncontrolled fashion.
proc.stdin.flush()
if input:
write_set.append(proc.stdin)
else:
proc.stdin.close()
else:
assert stdin is None
if proc.stdout:
read_set.append(proc.stdout)
else:
assert stdout is None
if proc.stderr:
read_set.append(proc.stderr)
else:
assert stderr is None
while read_set or write_set:
rlist, wlist, xlist = select.select(read_set, write_set, [])
if proc.stdin in wlist:
# When select has indicated that the file is writable,
# we can write up to PIPE_BUF bytes without risk
# blocking. POSIX defines PIPE_BUF >= 512
next, input_buffer = input_buffer, b''
next_len = 512-len(next)
if next_len:
next += stdin.read(next_len)
if not next:
proc.stdin.close()
write_set.remove(proc.stdin)
else:
bytes_written = os.write(proc.stdin.fileno(), next)
if bytes_written < len(next):
input_buffer = next[bytes_written:]
if proc.stdout in rlist:
data = os.read(proc.stdout.fileno(), 1024)
if data == b"":
proc.stdout.close()
read_set.remove(proc.stdout)
if trans_nl:
data = proc._translate_newlines(data)
stdout.write(data)
if proc.stderr in rlist:
data = os.read(proc.stderr.fileno(), 1024)
if data == b"":
proc.stderr.close()
read_set.remove(proc.stderr)
if trans_nl:
data = proc._translate_newlines(data)
stderr.write(data)
try:
proc.wait()
except OSError as e:
if e.errno != 10:
raise
def make_cgi_application(global_conf, script, path=None, include_os_environ=None,
query_string=None):
"""
Paste Deploy interface for :class:`CGIApplication`
This object acts as a proxy to a CGI application. You pass in the
script path (``script``), an optional path to search for the
script (if the name isn't absolute) (``path``). If you don't give
a path, then ``$PATH`` will be used.
"""
if path is None:
path = global_conf.get('path') or global_conf.get('PATH')
include_os_environ = converters.asbool(include_os_environ)
return CGIApplication(
None,
script, path=path, include_os_environ=include_os_environ,
query_string=query_string)