| # (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org) |
| # Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php |
| |
| """ |
| Application that runs a CGI script. |
| """ |
| import os |
| import sys |
| import subprocess |
| from six.moves.urllib.parse import quote |
| try: |
| import select |
| except ImportError: |
| select = None |
| import six |
| |
| from paste.util import converters |
| |
| __all__ = ['CGIError', 'CGIApplication'] |
| |
| class CGIError(Exception): |
| """ |
| Raised when the CGI script can't be found or doesn't |
| act like a proper CGI script. |
| """ |
| |
| class CGIApplication(object): |
| |
| """ |
| This object acts as a proxy to a CGI application. You pass in the |
| script path (``script``), an optional path to search for the |
| script (if the name isn't absolute) (``path``). If you don't give |
| a path, then ``$PATH`` will be used. |
| """ |
| |
| def __init__(self, |
| global_conf, |
| script, |
| path=None, |
| include_os_environ=True, |
| query_string=None): |
| if global_conf: |
| raise NotImplemented( |
| "global_conf is no longer supported for CGIApplication " |
| "(use make_cgi_application); please pass None instead") |
| self.script_filename = script |
| if path is None: |
| path = os.environ.get('PATH', '').split(':') |
| self.path = path |
| if '?' in script: |
| assert query_string is None, ( |
| "You cannot have '?' in your script name (%r) and also " |
| "give a query_string (%r)" % (script, query_string)) |
| script, query_string = script.split('?', 1) |
| if os.path.abspath(script) != script: |
| # relative path |
| for path_dir in self.path: |
| if os.path.exists(os.path.join(path_dir, script)): |
| self.script = os.path.join(path_dir, script) |
| break |
| else: |
| raise CGIError( |
| "Script %r not found in path %r" |
| % (script, self.path)) |
| else: |
| self.script = script |
| self.include_os_environ = include_os_environ |
| self.query_string = query_string |
| |
| def __call__(self, environ, start_response): |
| if 'REQUEST_URI' not in environ: |
| environ['REQUEST_URI'] = ( |
| quote(environ.get('SCRIPT_NAME', '')) |
| + quote(environ.get('PATH_INFO', ''))) |
| if self.include_os_environ: |
| cgi_environ = os.environ.copy() |
| else: |
| cgi_environ = {} |
| for name in environ: |
| # Should unicode values be encoded? |
| if (name.upper() == name |
| and isinstance(environ[name], str)): |
| cgi_environ[name] = environ[name] |
| if self.query_string is not None: |
| old = cgi_environ.get('QUERY_STRING', '') |
| if old: |
| old += '&' |
| cgi_environ['QUERY_STRING'] = old + self.query_string |
| cgi_environ['SCRIPT_FILENAME'] = self.script |
| proc = subprocess.Popen( |
| [self.script], |
| stdin=subprocess.PIPE, |
| stdout=subprocess.PIPE, |
| stderr=subprocess.PIPE, |
| env=cgi_environ, |
| cwd=os.path.dirname(self.script), |
| ) |
| writer = CGIWriter(environ, start_response) |
| if select and sys.platform != 'win32': |
| proc_communicate( |
| proc, |
| stdin=StdinReader.from_environ(environ), |
| stdout=writer, |
| stderr=environ['wsgi.errors']) |
| else: |
| stdout, stderr = proc.communicate(StdinReader.from_environ(environ).read()) |
| if stderr: |
| environ['wsgi.errors'].write(stderr) |
| writer.write(stdout) |
| if not writer.headers_finished: |
| start_response(writer.status, writer.headers) |
| return [] |
| |
| class CGIWriter(object): |
| |
| def __init__(self, environ, start_response): |
| self.environ = environ |
| self.start_response = start_response |
| self.status = '200 OK' |
| self.headers = [] |
| self.headers_finished = False |
| self.writer = None |
| self.buffer = b'' |
| |
| def write(self, data): |
| if self.headers_finished: |
| self.writer(data) |
| return |
| self.buffer += data |
| while b'\n' in self.buffer: |
| if b'\r\n' in self.buffer and self.buffer.find(b'\r\n') < self.buffer.find(b'\n'): |
| line1, self.buffer = self.buffer.split(b'\r\n', 1) |
| else: |
| line1, self.buffer = self.buffer.split(b'\n', 1) |
| if not line1: |
| self.headers_finished = True |
| self.writer = self.start_response( |
| self.status, self.headers) |
| self.writer(self.buffer) |
| del self.buffer |
| del self.headers |
| del self.status |
| break |
| elif b':' not in line1: |
| raise CGIError( |
| "Bad header line: %r" % line1) |
| else: |
| name, value = line1.split(b':', 1) |
| value = value.lstrip() |
| name = name.strip() |
| if six.PY3: |
| name = name.decode('utf8') |
| value = value.decode('utf8') |
| if name.lower() == 'status': |
| if ' ' not in value: |
| # WSGI requires this space, sometimes CGI scripts don't set it: |
| value = '%s General' % value |
| self.status = value |
| else: |
| self.headers.append((name, value)) |
| |
| class StdinReader(object): |
| |
| def __init__(self, stdin, content_length): |
| self.stdin = stdin |
| self.content_length = content_length |
| |
| @classmethod |
| def from_environ(cls, environ): |
| length = environ.get('CONTENT_LENGTH') |
| if length: |
| length = int(length) |
| else: |
| length = 0 |
| return cls(environ['wsgi.input'], length) |
| |
| def read(self, size=None): |
| if not self.content_length: |
| return b'' |
| if size is None: |
| text = self.stdin.read(self.content_length) |
| else: |
| text = self.stdin.read(min(self.content_length, size)) |
| self.content_length -= len(text) |
| return text |
| |
| def proc_communicate(proc, stdin=None, stdout=None, stderr=None): |
| """ |
| Run the given process, piping input/output/errors to the given |
| file-like objects (which need not be actual file objects, unlike |
| the arguments passed to Popen). Wait for process to terminate. |
| |
| Note: this is taken from the posix version of |
| subprocess.Popen.communicate, but made more general through the |
| use of file-like objects. |
| """ |
| read_set = [] |
| write_set = [] |
| input_buffer = b'' |
| trans_nl = proc.universal_newlines and hasattr(open, 'newlines') |
| |
| if proc.stdin: |
| # Flush stdio buffer. This might block, if the user has |
| # been writing to .stdin in an uncontrolled fashion. |
| proc.stdin.flush() |
| if input: |
| write_set.append(proc.stdin) |
| else: |
| proc.stdin.close() |
| else: |
| assert stdin is None |
| if proc.stdout: |
| read_set.append(proc.stdout) |
| else: |
| assert stdout is None |
| if proc.stderr: |
| read_set.append(proc.stderr) |
| else: |
| assert stderr is None |
| |
| while read_set or write_set: |
| rlist, wlist, xlist = select.select(read_set, write_set, []) |
| |
| if proc.stdin in wlist: |
| # When select has indicated that the file is writable, |
| # we can write up to PIPE_BUF bytes without risk |
| # blocking. POSIX defines PIPE_BUF >= 512 |
| next, input_buffer = input_buffer, b'' |
| next_len = 512-len(next) |
| if next_len: |
| next += stdin.read(next_len) |
| if not next: |
| proc.stdin.close() |
| write_set.remove(proc.stdin) |
| else: |
| bytes_written = os.write(proc.stdin.fileno(), next) |
| if bytes_written < len(next): |
| input_buffer = next[bytes_written:] |
| |
| if proc.stdout in rlist: |
| data = os.read(proc.stdout.fileno(), 1024) |
| if data == b"": |
| proc.stdout.close() |
| read_set.remove(proc.stdout) |
| if trans_nl: |
| data = proc._translate_newlines(data) |
| stdout.write(data) |
| |
| if proc.stderr in rlist: |
| data = os.read(proc.stderr.fileno(), 1024) |
| if data == b"": |
| proc.stderr.close() |
| read_set.remove(proc.stderr) |
| if trans_nl: |
| data = proc._translate_newlines(data) |
| stderr.write(data) |
| |
| try: |
| proc.wait() |
| except OSError as e: |
| if e.errno != 10: |
| raise |
| |
| def make_cgi_application(global_conf, script, path=None, include_os_environ=None, |
| query_string=None): |
| """ |
| Paste Deploy interface for :class:`CGIApplication` |
| |
| This object acts as a proxy to a CGI application. You pass in the |
| script path (``script``), an optional path to search for the |
| script (if the name isn't absolute) (``path``). If you don't give |
| a path, then ``$PATH`` will be used. |
| """ |
| if path is None: |
| path = global_conf.get('path') or global_conf.get('PATH') |
| include_os_environ = converters.asbool(include_os_environ) |
| return CGIApplication( |
| None, |
| script, path=path, include_os_environ=include_os_environ, |
| query_string=query_string) |