blob: 98299e24734a7fa649f56948d077627b5ff5ae4c [file] [log] [blame]
# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org)
# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
"""
A module of many disparate routines.
"""
from __future__ import print_function
# functions which moved to paste.request and paste.response
# Deprecated around 15 Dec 2005
from paste.request import get_cookies, parse_querystring, parse_formvars
from paste.request import construct_url, path_info_split, path_info_pop
from paste.response import HeaderDict, has_header, header_value, remove_header
from paste.response import error_body_response, error_response, error_response_app
from traceback import print_exception
import six
import sys
from six.moves import cStringIO as StringIO
from six.moves.urllib.parse import unquote, urlsplit
import warnings
__all__ = ['add_close', 'add_start_close', 'capture_output', 'catch_errors',
'catch_errors_app', 'chained_app_iters', 'construct_url',
'dump_environ', 'encode_unicode_app_iter', 'error_body_response',
'error_response', 'get_cookies', 'has_header', 'header_value',
'interactive', 'intercept_output', 'path_info_pop',
'path_info_split', 'raw_interactive', 'send_file']
class add_close(object):
"""
An an iterable that iterates over app_iter, then calls
close_func.
"""
def __init__(self, app_iterable, close_func):
self.app_iterable = app_iterable
self.app_iter = iter(app_iterable)
self.close_func = close_func
self._closed = False
def __iter__(self):
return self
def next(self):
return self.app_iter.next()
def close(self):
self._closed = True
if hasattr(self.app_iterable, 'close'):
self.app_iterable.close()
self.close_func()
def __del__(self):
if not self._closed:
# We can't raise an error or anything at this stage
print("Error: app_iter.close() was not called when finishing "
"WSGI request. finalization function %s not called"
% self.close_func, file=sys.stderr)
class add_start_close(object):
"""
An an iterable that iterates over app_iter, calls start_func
before the first item is returned, then calls close_func at the
end.
"""
def __init__(self, app_iterable, start_func, close_func=None):
self.app_iterable = app_iterable
self.app_iter = iter(app_iterable)
self.first = True
self.start_func = start_func
self.close_func = close_func
self._closed = False
def __iter__(self):
return self
def next(self):
if self.first:
self.start_func()
self.first = False
return next(self.app_iter)
__next__ = next
def close(self):
self._closed = True
if hasattr(self.app_iterable, 'close'):
self.app_iterable.close()
if self.close_func is not None:
self.close_func()
def __del__(self):
if not self._closed:
# We can't raise an error or anything at this stage
print("Error: app_iter.close() was not called when finishing "
"WSGI request. finalization function %s not called"
% self.close_func, file=sys.stderr)
class chained_app_iters(object):
"""
Chains several app_iters together, also delegating .close() to each
of them.
"""
def __init__(self, *chained):
self.app_iters = chained
self.chained = [iter(item) for item in chained]
self._closed = False
def __iter__(self):
return self
def next(self):
if len(self.chained) == 1:
return self.chained[0].next()
else:
try:
return self.chained[0].next()
except StopIteration:
self.chained.pop(0)
return self.next()
def close(self):
self._closed = True
got_exc = None
for app_iter in self.app_iters:
try:
if hasattr(app_iter, 'close'):
app_iter.close()
except:
got_exc = sys.exc_info()
if got_exc:
six.reraise(got_exc[0], got_exc[1], got_exc[2])
def __del__(self):
if not self._closed:
# We can't raise an error or anything at this stage
print("Error: app_iter.close() was not called when finishing "
"WSGI request. finalization function %s not called"
% self.close_func, file=sys.stderr)
class encode_unicode_app_iter(object):
"""
Encodes an app_iterable's unicode responses as strings
"""
def __init__(self, app_iterable, encoding=sys.getdefaultencoding(),
errors='strict'):
self.app_iterable = app_iterable
self.app_iter = iter(app_iterable)
self.encoding = encoding
self.errors = errors
def __iter__(self):
return self
def next(self):
content = next(self.app_iter)
if isinstance(content, six.text_type):
content = content.encode(self.encoding, self.errors)
return content
__next__ = next
def close(self):
if hasattr(self.app_iterable, 'close'):
self.app_iterable.close()
def catch_errors(application, environ, start_response, error_callback,
ok_callback=None):
"""
Runs the application, and returns the application iterator (which should be
passed upstream). If an error occurs then error_callback will be called with
exc_info as its sole argument. If no errors occur and ok_callback is given,
then it will be called with no arguments.
"""
try:
app_iter = application(environ, start_response)
except:
error_callback(sys.exc_info())
raise
if type(app_iter) in (list, tuple):
# These won't produce exceptions
if ok_callback:
ok_callback()
return app_iter
else:
return _wrap_app_iter(app_iter, error_callback, ok_callback)
class _wrap_app_iter(object):
def __init__(self, app_iterable, error_callback, ok_callback):
self.app_iterable = app_iterable
self.app_iter = iter(app_iterable)
self.error_callback = error_callback
self.ok_callback = ok_callback
if hasattr(self.app_iterable, 'close'):
self.close = self.app_iterable.close
def __iter__(self):
return self
def next(self):
try:
return self.app_iter.next()
except StopIteration:
if self.ok_callback:
self.ok_callback()
raise
except:
self.error_callback(sys.exc_info())
raise
def catch_errors_app(application, environ, start_response, error_callback_app,
ok_callback=None, catch=Exception):
"""
Like ``catch_errors``, except error_callback_app should be a
callable that will receive *three* arguments -- ``environ``,
``start_response``, and ``exc_info``. It should call
``start_response`` (*with* the exc_info argument!) and return an
iterator.
"""
try:
app_iter = application(environ, start_response)
except catch:
return error_callback_app(environ, start_response, sys.exc_info())
if type(app_iter) in (list, tuple):
# These won't produce exceptions
if ok_callback is not None:
ok_callback()
return app_iter
else:
return _wrap_app_iter_app(
environ, start_response, app_iter,
error_callback_app, ok_callback, catch=catch)
class _wrap_app_iter_app(object):
def __init__(self, environ, start_response, app_iterable,
error_callback_app, ok_callback, catch=Exception):
self.environ = environ
self.start_response = start_response
self.app_iterable = app_iterable
self.app_iter = iter(app_iterable)
self.error_callback_app = error_callback_app
self.ok_callback = ok_callback
self.catch = catch
if hasattr(self.app_iterable, 'close'):
self.close = self.app_iterable.close
def __iter__(self):
return self
def next(self):
try:
return self.app_iter.next()
except StopIteration:
if self.ok_callback:
self.ok_callback()
raise
except self.catch:
if hasattr(self.app_iterable, 'close'):
try:
self.app_iterable.close()
except:
# @@: Print to wsgi.errors?
pass
new_app_iterable = self.error_callback_app(
self.environ, self.start_response, sys.exc_info())
app_iter = iter(new_app_iterable)
if hasattr(new_app_iterable, 'close'):
self.close = new_app_iterable.close
self.next = app_iter.next
return self.next()
def raw_interactive(application, path='', raise_on_wsgi_error=False,
**environ):
"""
Runs the application in a fake environment.
"""
assert "path_info" not in environ, "argument list changed"
if raise_on_wsgi_error:
errors = ErrorRaiser()
else:
errors = six.BytesIO()
basic_environ = {
# mandatory CGI variables
'REQUEST_METHOD': 'GET', # always mandatory
'SCRIPT_NAME': '', # may be empty if app is at the root
'PATH_INFO': '', # may be empty if at root of app
'SERVER_NAME': 'localhost', # always mandatory
'SERVER_PORT': '80', # always mandatory
'SERVER_PROTOCOL': 'HTTP/1.0',
# mandatory wsgi variables
'wsgi.version': (1, 0),
'wsgi.url_scheme': 'http',
'wsgi.input': six.BytesIO(),
'wsgi.errors': errors,
'wsgi.multithread': False,
'wsgi.multiprocess': False,
'wsgi.run_once': False,
}
if path:
(_, _, path_info, query, fragment) = urlsplit(str(path))
path_info = unquote(path_info)
# urlsplit returns unicode so coerce it back to str
path_info, query = str(path_info), str(query)
basic_environ['PATH_INFO'] = path_info
if query:
basic_environ['QUERY_STRING'] = query
for name, value in environ.items():
name = name.replace('__', '.')
basic_environ[name] = value
if ('SERVER_NAME' in basic_environ
and 'HTTP_HOST' not in basic_environ):
basic_environ['HTTP_HOST'] = basic_environ['SERVER_NAME']
istream = basic_environ['wsgi.input']
if isinstance(istream, bytes):
basic_environ['wsgi.input'] = six.BytesIO(istream)
basic_environ['CONTENT_LENGTH'] = len(istream)
data = {}
output = []
headers_set = []
headers_sent = []
def start_response(status, headers, exc_info=None):
if exc_info:
try:
if headers_sent:
# Re-raise original exception only if headers sent
six.reraise(exc_info[0], exc_info[1], exc_info[2])
finally:
# avoid dangling circular reference
exc_info = None
elif headers_set:
# You cannot set the headers more than once, unless the
# exc_info is provided.
raise AssertionError("Headers already set and no exc_info!")
headers_set.append(True)
data['status'] = status
data['headers'] = headers
return output.append
app_iter = application(basic_environ, start_response)
try:
try:
for s in app_iter:
if not isinstance(s, six.binary_type):
raise ValueError(
"The app_iter response can only contain bytes (not "
"unicode); got: %r" % s)
headers_sent.append(True)
if not headers_set:
raise AssertionError("Content sent w/o headers!")
output.append(s)
except TypeError as e:
# Typically "iteration over non-sequence", so we want
# to give better debugging information...
e.args = ((e.args[0] + ' iterable: %r' % app_iter),) + e.args[1:]
raise
finally:
if hasattr(app_iter, 'close'):
app_iter.close()
return (data['status'], data['headers'], b''.join(output),
errors.getvalue())
class ErrorRaiser(object):
def flush(self):
pass
def write(self, value):
if not value:
return
raise AssertionError(
"No errors should be written (got: %r)" % value)
def writelines(self, seq):
raise AssertionError(
"No errors should be written (got lines: %s)" % list(seq))
def getvalue(self):
return ''
def interactive(*args, **kw):
"""
Runs the application interatively, wrapping `raw_interactive` but
returning the output in a formatted way.
"""
status, headers, content, errors = raw_interactive(*args, **kw)
full = StringIO()
if errors:
full.write('Errors:\n')
full.write(errors.strip())
full.write('\n----------end errors\n')
full.write(status + '\n')
for name, value in headers:
full.write('%s: %s\n' % (name, value))
full.write('\n')
full.write(content)
return full.getvalue()
interactive.proxy = 'raw_interactive'
def dump_environ(environ, start_response):
"""
Application which simply dumps the current environment
variables out as a plain text response.
"""
output = []
keys = list(environ.keys())
keys.sort()
for k in keys:
v = str(environ[k]).replace("\n","\n ")
output.append("%s: %s\n" % (k, v))
output.append("\n")
content_length = environ.get("CONTENT_LENGTH", '')
if content_length:
output.append(environ['wsgi.input'].read(int(content_length)))
output.append("\n")
output = "".join(output)
if six.PY3:
output = output.encode('utf8')
headers = [('Content-Type', 'text/plain'),
('Content-Length', str(len(output)))]
start_response("200 OK", headers)
return [output]
def send_file(filename):
warnings.warn(
"wsgilib.send_file has been moved to paste.fileapp.FileApp",
DeprecationWarning, 2)
from paste import fileapp
return fileapp.FileApp(filename)
def capture_output(environ, start_response, application):
"""
Runs application with environ and start_response, and captures
status, headers, and body.
Sends status and header, but *not* body. Returns (status,
headers, body). Typically this is used like:
.. code-block:: python
def dehtmlifying_middleware(application):
def replacement_app(environ, start_response):
status, headers, body = capture_output(
environ, start_response, application)
content_type = header_value(headers, 'content-type')
if (not content_type
or not content_type.startswith('text/html')):
return [body]
body = re.sub(r'<.*?>', '', body)
return [body]
return replacement_app
"""
warnings.warn(
'wsgilib.capture_output has been deprecated in favor '
'of wsgilib.intercept_output',
DeprecationWarning, 2)
data = []
output = StringIO()
def replacement_start_response(status, headers, exc_info=None):
if data:
data[:] = []
data.append(status)
data.append(headers)
start_response(status, headers, exc_info)
return output.write
app_iter = application(environ, replacement_start_response)
try:
for item in app_iter:
output.write(item)
finally:
if hasattr(app_iter, 'close'):
app_iter.close()
if not data:
data.append(None)
if len(data) < 2:
data.append(None)
data.append(output.getvalue())
return data
def intercept_output(environ, application, conditional=None,
start_response=None):
"""
Runs application with environ and captures status, headers, and
body. None are sent on; you must send them on yourself (unlike
``capture_output``)
Typically this is used like:
.. code-block:: python
def dehtmlifying_middleware(application):
def replacement_app(environ, start_response):
status, headers, body = intercept_output(
environ, application)
start_response(status, headers)
content_type = header_value(headers, 'content-type')
if (not content_type
or not content_type.startswith('text/html')):
return [body]
body = re.sub(r'<.*?>', '', body)
return [body]
return replacement_app
A third optional argument ``conditional`` should be a function
that takes ``conditional(status, headers)`` and returns False if
the request should not be intercepted. In that case
``start_response`` will be called and ``(None, None, app_iter)``
will be returned. You must detect that in your code and return
the app_iter, like:
.. code-block:: python
def dehtmlifying_middleware(application):
def replacement_app(environ, start_response):
status, headers, body = intercept_output(
environ, application,
lambda s, h: header_value(headers, 'content-type').startswith('text/html'),
start_response)
if status is None:
return body
start_response(status, headers)
body = re.sub(r'<.*?>', '', body)
return [body]
return replacement_app
"""
if conditional is not None and start_response is None:
raise TypeError(
"If you provide conditional you must also provide "
"start_response")
data = []
output = StringIO()
def replacement_start_response(status, headers, exc_info=None):
if conditional is not None and not conditional(status, headers):
data.append(None)
return start_response(status, headers, exc_info)
if data:
data[:] = []
data.append(status)
data.append(headers)
return output.write
app_iter = application(environ, replacement_start_response)
if data[0] is None:
return (None, None, app_iter)
try:
for item in app_iter:
output.write(item)
finally:
if hasattr(app_iter, 'close'):
app_iter.close()
if not data:
data.append(None)
if len(data) < 2:
data.append(None)
data.append(output.getvalue())
return data
## Deprecation warning wrapper:
class ResponseHeaderDict(HeaderDict):
def __init__(self, *args, **kw):
warnings.warn(
"The class wsgilib.ResponseHeaderDict has been moved "
"to paste.response.HeaderDict",
DeprecationWarning, 2)
HeaderDict.__init__(self, *args, **kw)
def _warn_deprecated(new_func):
new_name = new_func.func_name
new_path = new_func.func_globals['__name__'] + '.' + new_name
def replacement(*args, **kw):
warnings.warn(
"The function wsgilib.%s has been moved to %s"
% (new_name, new_path),
DeprecationWarning, 2)
return new_func(*args, **kw)
try:
replacement.func_name = new_func.func_name
except:
pass
return replacement
# Put warnings wrapper in place for all public functions that
# were imported from elsewhere:
for _name in __all__:
_func = globals()[_name]
if (hasattr(_func, 'func_globals')
and _func.func_globals['__name__'] != __name__):
globals()[_name] = _warn_deprecated(_func)
if __name__ == '__main__':
import doctest
doctest.testmod()