blob: a16493866bbcb02a9bdb33fb16179b208e531ea1 [file] [log] [blame]
from base64 import b64encode
from datetime import (
datetime,
timedelta,
)
from hashlib import md5
import re
import struct
import zlib
try:
import simplejson as json
except ImportError:
import json
from webob.byterange import ContentRange
from webob.cachecontrol import (
CacheControl,
serialize_cache_control,
)
from webob.compat import (
PY3,
bytes_,
native_,
text_type,
url_quote,
urlparse,
)
from webob.cookies import (
Cookie,
make_cookie,
)
from webob.datetime_utils import (
parse_date_delta,
serialize_date_delta,
timedelta_to_seconds,
)
from webob.descriptors import (
CHARSET_RE,
SCHEME_RE,
converter,
date_header,
header_getter,
list_header,
parse_auth,
parse_content_range,
parse_etag_response,
parse_int,
parse_int_safe,
serialize_auth,
serialize_content_range,
serialize_etag_response,
serialize_int,
)
from webob.headers import ResponseHeaders
from webob.request import BaseRequest
from webob.util import status_reasons, status_generic_reasons
__all__ = ['Response']
_PARAM_RE = re.compile(r'([a-z0-9]+)=(?:"([^"]*)"|([a-z0-9_.-]*))', re.I)
_OK_PARAM_RE = re.compile(r'^[a-z0-9_.-]+$', re.I)
_gzip_header = b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x02\xff'
class Response(object):
"""
Represents a WSGI response
"""
default_content_type = 'text/html'
default_charset = 'UTF-8' # TODO: deprecate
unicode_errors = 'strict' # TODO: deprecate (why would response body have errors?)
default_conditional_response = False
request = None
environ = None
#
# __init__, from_file, copy
#
def __init__(self, body=None, status=None, headerlist=None, app_iter=None,
content_type=None, conditional_response=None,
**kw):
if app_iter is None and body is None and ('json_body' in kw or 'json' in kw):
if 'json_body' in kw:
json_body = kw.pop('json_body')
else:
json_body = kw.pop('json')
body = json.dumps(json_body, separators=(',', ':'))
if content_type is None:
content_type = 'application/json'
if app_iter is None:
if body is None:
body = b''
elif body is not None:
raise TypeError(
"You may only give one of the body and app_iter arguments")
if status is None:
self._status = '200 OK'
else:
self.status = status
if headerlist is None:
self._headerlist = []
else:
self._headerlist = headerlist
self._headers = None
if content_type is None:
content_type = self.default_content_type
charset = None
if 'charset' in kw:
charset = kw.pop('charset')
elif self.default_charset:
if (content_type
and 'charset=' not in content_type
and (content_type == 'text/html'
or content_type.startswith('text/')
or content_type.startswith('application/xml')
or content_type.startswith('application/json')
or (content_type.startswith('application/')
and (content_type.endswith('+xml') or content_type.endswith('+json'))))):
charset = self.default_charset
if content_type and charset:
content_type += '; charset=' + charset
elif self._headerlist and charset:
self.charset = charset
if not self._headerlist and content_type:
self._headerlist.append(('Content-Type', content_type))
if conditional_response is None:
self.conditional_response = self.default_conditional_response
else:
self.conditional_response = bool(conditional_response)
if app_iter is None:
if isinstance(body, text_type):
if charset is None:
raise TypeError(
"You cannot set the body to a text value without a "
"charset")
body = body.encode(charset)
app_iter = [body]
if headerlist is None:
self._headerlist.append(('Content-Length', str(len(body))))
else:
self.headers['Content-Length'] = str(len(body))
self._app_iter = app_iter
for name, value in kw.items():
if not hasattr(self.__class__, name):
# Not a basic attribute
raise TypeError(
"Unexpected keyword: %s=%r" % (name, value))
setattr(self, name, value)
@classmethod
def from_file(cls, fp):
"""Reads a response from a file-like object (it must implement
``.read(size)`` and ``.readline()``).
It will read up to the end of the response, not the end of the
file.
This reads the response as represented by ``str(resp)``; it
may not read every valid HTTP response properly. Responses
must have a ``Content-Length``"""
headerlist = []
status = fp.readline().strip()
is_text = isinstance(status, text_type)
if is_text:
_colon = ':'
else:
_colon = b':'
while 1:
line = fp.readline().strip()
if not line:
# end of headers
break
try:
header_name, value = line.split(_colon, 1)
except ValueError:
raise ValueError('Bad header line: %r' % line)
value = value.strip()
headerlist.append((
native_(header_name, 'latin-1'),
native_(value, 'latin-1')
))
r = cls(
status=status,
headerlist=headerlist,
app_iter=(),
)
body = fp.read(r.content_length or 0)
if is_text:
r.text = body
else:
r.body = body
return r
def copy(self):
"""Makes a copy of the response"""
# we need to do this for app_iter to be reusable
app_iter = list(self._app_iter)
iter_close(self._app_iter)
# and this to make sure app_iter instances are different
self._app_iter = list(app_iter)
return self.__class__(
content_type=False,
status=self._status,
headerlist=self._headerlist[:],
app_iter=app_iter,
conditional_response=self.conditional_response)
#
# __repr__, __str__
#
def __repr__(self):
return '<%s at 0x%x %s>' % (self.__class__.__name__, abs(id(self)),
self.status)
def __str__(self, skip_body=False):
parts = [self.status]
if not skip_body:
# Force enumeration of the body (to set content-length)
self.body
parts += map('%s: %s'.__mod__, self.headerlist)
if not skip_body and self.body:
parts += ['', self.text if PY3 else self.body]
return '\r\n'.join(parts)
#
# status, status_code/status_int
#
def _status__get(self):
"""
The status string
"""
return self._status
def _status__set(self, value):
try:
code = int(value)
except (ValueError, TypeError):
pass
else:
self.status_code = code
return
if PY3: # pragma: no cover
if isinstance(value, bytes):
value = value.decode('ascii')
elif isinstance(value, text_type):
value = value.encode('ascii')
if not isinstance(value, str):
raise TypeError(
"You must set status to a string or integer (not %s)"
% type(value))
# Attempt to get the status code itself, if this fails we should fail
status_code = int(value.split()[0])
self._status = value
status = property(_status__get, _status__set, doc=_status__get.__doc__)
def _status_code__get(self):
"""
The status as an integer
"""
return int(self._status.split()[0])
def _status_code__set(self, code):
try:
self._status = '%d %s' % (code, status_reasons[code])
except KeyError:
self._status = '%d %s' % (code, status_generic_reasons[code // 100])
status_code = status_int = property(_status_code__get, _status_code__set,
doc=_status_code__get.__doc__)
#
# headerslist, headers
#
def _headerlist__get(self):
"""
The list of response headers
"""
return self._headerlist
def _headerlist__set(self, value):
self._headers = None
if not isinstance(value, list):
if hasattr(value, 'items'):
value = value.items()
value = list(value)
self._headerlist = value
def _headerlist__del(self):
self.headerlist = []
headerlist = property(_headerlist__get, _headerlist__set,
_headerlist__del, doc=_headerlist__get.__doc__)
def _headers__get(self):
"""
The headers in a dictionary-like object
"""
if self._headers is None:
self._headers = ResponseHeaders.view_list(self.headerlist)
return self._headers
def _headers__set(self, value):
if hasattr(value, 'items'):
value = value.items()
self.headerlist = value
self._headers = None
headers = property(_headers__get, _headers__set, doc=_headers__get.__doc__)
#
# body
#
def _body__get(self):
"""
The body of the response, as a ``str``. This will read in the
entire app_iter if necessary.
"""
app_iter = self._app_iter
# try:
# if len(app_iter) == 1:
# return app_iter[0]
# except:
# pass
if isinstance(app_iter, list) and len(app_iter) == 1:
return app_iter[0]
if app_iter is None:
raise AttributeError("No body has been set")
try:
body = b''.join(app_iter)
finally:
iter_close(app_iter)
if isinstance(body, text_type):
raise _error_unicode_in_app_iter(app_iter, body)
self._app_iter = [body]
if len(body) == 0:
# if body-length is zero, we assume it's a HEAD response and
# leave content_length alone
pass # pragma: no cover (no idea why necessary, it's hit)
elif self.content_length is None:
self.content_length = len(body)
elif self.content_length != len(body):
raise AssertionError(
"Content-Length is different from actual app_iter length "
"(%r!=%r)"
% (self.content_length, len(body))
)
return body
def _body__set(self, value=b''):
if not isinstance(value, bytes):
if isinstance(value, text_type):
msg = ("You cannot set Response.body to a text object "
"(use Response.text)")
else:
msg = ("You can only set the body to a binary type (not %s)" %
type(value))
raise TypeError(msg)
if self._app_iter is not None:
self.content_md5 = None
self._app_iter = [value]
self.content_length = len(value)
# def _body__del(self):
# self.body = ''
# #self.content_length = None
body = property(_body__get, _body__set, _body__set)
def _json_body__get(self):
"""Access the body of the response as JSON"""
# Note: UTF-8 is a content-type specific default for JSON:
return json.loads(self.body.decode(self.charset or 'UTF-8'))
def _json_body__set(self, value):
self.body = json.dumps(value, separators=(',', ':')).encode(self.charset or 'UTF-8')
def _json_body__del(self):
del self.body
json = json_body = property(_json_body__get, _json_body__set, _json_body__del)
#
# text, unicode_body, ubody
#
def _text__get(self):
"""
Get/set the text value of the body (using the charset of the
Content-Type)
"""
if not self.charset:
raise AttributeError(
"You cannot access Response.text unless charset is set")
body = self.body
return body.decode(self.charset, self.unicode_errors)
def _text__set(self, value):
if not self.charset:
raise AttributeError(
"You cannot access Response.text unless charset is set")
if not isinstance(value, text_type):
raise TypeError(
"You can only set Response.text to a unicode string "
"(not %s)" % type(value))
self.body = value.encode(self.charset)
def _text__del(self):
del self.body
text = property(_text__get, _text__set, _text__del, doc=_text__get.__doc__)
unicode_body = ubody = property(_text__get, _text__set, _text__del,
"Deprecated alias for .text")
#
# body_file, write(text)
#
def _body_file__get(self):
"""
A file-like object that can be used to write to the
body. If you passed in a list app_iter, that app_iter will be
modified by writes.
"""
return ResponseBodyFile(self)
def _body_file__set(self, file):
self.app_iter = iter_file(file)
def _body_file__del(self):
del self.body
body_file = property(_body_file__get, _body_file__set, _body_file__del,
doc=_body_file__get.__doc__)
def write(self, text):
if not isinstance(text, bytes):
if not isinstance(text, text_type):
msg = "You can only write str to a Response.body_file, not %s"
raise TypeError(msg % type(text))
if not self.charset:
msg = ("You can only write text to Response if charset has "
"been set")
raise TypeError(msg)
text = text.encode(self.charset)
app_iter = self._app_iter
if not isinstance(app_iter, list):
try:
new_app_iter = self._app_iter = list(app_iter)
finally:
iter_close(app_iter)
app_iter = new_app_iter
self.content_length = sum(len(chunk) for chunk in app_iter)
app_iter.append(text)
if self.content_length is not None:
self.content_length += len(text)
#
# app_iter
#
def _app_iter__get(self):
"""
Returns the app_iter of the response.
If body was set, this will create an app_iter from that body
(a single-item list)
"""
return self._app_iter
def _app_iter__set(self, value):
if self._app_iter is not None:
# Undo the automatically-set content-length
self.content_length = None
self.content_md5 = None
self._app_iter = value
def _app_iter__del(self):
self._app_iter = []
self.content_length = None
app_iter = property(_app_iter__get, _app_iter__set, _app_iter__del,
doc=_app_iter__get.__doc__)
#
# headers attrs
#
allow = list_header('Allow', '14.7')
# TODO: (maybe) support response.vary += 'something'
# TODO: same thing for all listy headers
vary = list_header('Vary', '14.44')
content_length = converter(
header_getter('Content-Length', '14.17'),
parse_int, serialize_int, 'int')
content_encoding = header_getter('Content-Encoding', '14.11')
content_language = list_header('Content-Language', '14.12')
content_location = header_getter('Content-Location', '14.14')
content_md5 = header_getter('Content-MD5', '14.14')
content_disposition = header_getter('Content-Disposition', '19.5.1')
accept_ranges = header_getter('Accept-Ranges', '14.5')
content_range = converter(
header_getter('Content-Range', '14.16'),
parse_content_range, serialize_content_range, 'ContentRange object')
date = date_header('Date', '14.18')
expires = date_header('Expires', '14.21')
last_modified = date_header('Last-Modified', '14.29')
_etag_raw = header_getter('ETag', '14.19')
etag = converter(_etag_raw,
parse_etag_response, serialize_etag_response,
'Entity tag'
)
@property
def etag_strong(self):
return parse_etag_response(self._etag_raw, strong=True)
location = header_getter('Location', '14.30')
pragma = header_getter('Pragma', '14.32')
age = converter(
header_getter('Age', '14.6'),
parse_int_safe, serialize_int, 'int')
retry_after = converter(
header_getter('Retry-After', '14.37'),
parse_date_delta, serialize_date_delta, 'HTTP date or delta seconds')
server = header_getter('Server', '14.38')
# TODO: the standard allows this to be a list of challenges
www_authenticate = converter(
header_getter('WWW-Authenticate', '14.47'),
parse_auth, serialize_auth,
)
#
# charset
#
def _charset__get(self):
"""
Get/set the charset (in the Content-Type)
"""
header = self.headers.get('Content-Type')
if not header:
return None
match = CHARSET_RE.search(header)
if match:
return match.group(1)
return None
def _charset__set(self, charset):
if charset is None:
del self.charset
return
header = self.headers.pop('Content-Type', None)
if header is None:
raise AttributeError("You cannot set the charset when no "
"content-type is defined")
match = CHARSET_RE.search(header)
if match:
header = header[:match.start()] + header[match.end():]
header += '; charset=%s' % charset
self.headers['Content-Type'] = header
def _charset__del(self):
header = self.headers.pop('Content-Type', None)
if header is None:
# Don't need to remove anything
return
match = CHARSET_RE.search(header)
if match:
header = header[:match.start()] + header[match.end():]
self.headers['Content-Type'] = header
charset = property(_charset__get, _charset__set, _charset__del,
doc=_charset__get.__doc__)
#
# content_type
#
def _content_type__get(self):
"""
Get/set the Content-Type header (or None), *without* the
charset or any parameters.
If you include parameters (or ``;`` at all) when setting the
content_type, any existing parameters will be deleted;
otherwise they will be preserved.
"""
header = self.headers.get('Content-Type')
if not header:
return None
return header.split(';', 1)[0]
def _content_type__set(self, value):
if not value:
self._content_type__del()
return
if ';' not in value:
header = self.headers.get('Content-Type', '')
if ';' in header:
params = header.split(';', 1)[1]
value += ';' + params
self.headers['Content-Type'] = value
def _content_type__del(self):
self.headers.pop('Content-Type', None)
content_type = property(_content_type__get, _content_type__set,
_content_type__del, doc=_content_type__get.__doc__)
#
# content_type_params
#
def _content_type_params__get(self):
"""
A dictionary of all the parameters in the content type.
(This is not a view, set to change, modifications of the dict would not
be applied otherwise)
"""
params = self.headers.get('Content-Type', '')
if ';' not in params:
return {}
params = params.split(';', 1)[1]
result = {}
for match in _PARAM_RE.finditer(params):
result[match.group(1)] = match.group(2) or match.group(3) or ''
return result
def _content_type_params__set(self, value_dict):
if not value_dict:
del self.content_type_params
return
params = []
for k, v in sorted(value_dict.items()):
if not _OK_PARAM_RE.search(v):
v = '"%s"' % v.replace('"', '\\"')
params.append('; %s=%s' % (k, v))
ct = self.headers.pop('Content-Type', '').split(';', 1)[0]
ct += ''.join(params)
self.headers['Content-Type'] = ct
def _content_type_params__del(self):
self.headers['Content-Type'] = self.headers.get(
'Content-Type', '').split(';', 1)[0]
content_type_params = property(
_content_type_params__get,
_content_type_params__set,
_content_type_params__del,
_content_type_params__get.__doc__
)
#
# set_cookie, unset_cookie, delete_cookie, merge_cookies
#
def set_cookie(self, name, value='', max_age=None,
path='/', domain=None, secure=False, httponly=False,
comment=None, expires=None, overwrite=False):
"""
Set (add) a cookie for the response.
Arguments are:
``name``
The cookie name.
``value``
The cookie value, which should be a string or ``None``. If
``value`` is ``None``, it's equivalent to calling the
:meth:`webob.response.Response.unset_cookie` method for this
cookie key (it effectively deletes the cookie on the client).
``max_age``
An integer representing a number of seconds, ``datetime.timedelta``,
or ``None``. This value is used as the ``Max-Age`` of the generated
cookie. If ``expires`` is not passed and this value is not
``None``, the ``max_age`` value will also influence the ``Expires``
value of the cookie (``Expires`` will be set to now + max_age). If
this value is ``None``, the cookie will not have a ``Max-Age`` value
(unless ``expires`` is set). If both ``max_age`` and ``expires`` are
set, this value takes precedence.
``path``
A string representing the cookie ``Path`` value. It defaults to
``/``.
``domain``
A string representing the cookie ``Domain``, or ``None``. If
domain is ``None``, no ``Domain`` value will be sent in the
cookie.
``secure``
A boolean. If it's ``True``, the ``secure`` flag will be sent in
the cookie, if it's ``False``, the ``secure`` flag will not be
sent in the cookie.
``httponly``
A boolean. If it's ``True``, the ``HttpOnly`` flag will be sent
in the cookie, if it's ``False``, the ``HttpOnly`` flag will not
be sent in the cookie.
``comment``
A string representing the cookie ``Comment`` value, or ``None``.
If ``comment`` is ``None``, no ``Comment`` value will be sent in
the cookie.
``expires``
A ``datetime.timedelta`` object representing an amount of time,
``datetime.datetime`` or ``None``. A non-``None`` value is used to
generate the ``Expires`` value of the generated cookie. If
``max_age`` is not passed, but this value is not ``None``, it will
influence the ``Max-Age`` header. If this value is ``None``, the
``Expires`` cookie value will be unset (unless ``max_age`` is set).
If ``max_age`` is set, it will be used to generate the ``expires``
and this value is ignored.
``overwrite``
If this key is ``True``, before setting the cookie, unset any
existing cookie.
"""
if overwrite:
self.unset_cookie(name, strict=False)
# If expires is set, but not max_age we set max_age to expires
if not max_age and isinstance(expires, timedelta):
max_age = expires
# expires can also be a datetime
if not max_age and isinstance(expires, datetime):
max_age = expires - datetime.utcnow()
value = bytes_(value, 'utf-8')
cookie = make_cookie(name, value, max_age=max_age, path=path,
domain=domain, secure=secure, httponly=httponly,
comment=comment)
self.headerlist.append(('Set-Cookie', cookie))
def delete_cookie(self, name, path='/', domain=None):
"""
Delete a cookie from the client. Note that path and domain must match
how the cookie was originally set.
This sets the cookie to the empty string, and max_age=0 so
that it should expire immediately.
"""
self.set_cookie(name, None, path=path, domain=domain)
def unset_cookie(self, name, strict=True):
"""
Unset a cookie with the given name (remove it from the
response).
"""
existing = self.headers.getall('Set-Cookie')
if not existing and not strict:
return
cookies = Cookie()
for header in existing:
cookies.load(header)
if isinstance(name, text_type):
name = name.encode('utf8')
if name in cookies:
del cookies[name]
del self.headers['Set-Cookie']
for m in cookies.values():
self.headerlist.append(('Set-Cookie', m.serialize()))
elif strict:
raise KeyError("No cookie has been set with the name %r" % name)
def merge_cookies(self, resp):
"""Merge the cookies that were set on this response with the
given `resp` object (which can be any WSGI application).
If the `resp` is a :class:`webob.Response` object, then the
other object will be modified in-place.
"""
if not self.headers.get('Set-Cookie'):
return resp
if isinstance(resp, Response):
for header in self.headers.getall('Set-Cookie'):
resp.headers.add('Set-Cookie', header)
return resp
else:
c_headers = [h for h in self.headerlist if
h[0].lower() == 'set-cookie']
def repl_app(environ, start_response):
def repl_start_response(status, headers, exc_info=None):
return start_response(status, headers+c_headers,
exc_info=exc_info)
return resp(environ, repl_start_response)
return repl_app
#
# cache_control
#
_cache_control_obj = None
def _cache_control__get(self):
"""
Get/set/modify the Cache-Control header (`HTTP spec section 14.9
<http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.9>`_)
"""
value = self.headers.get('cache-control', '')
if self._cache_control_obj is None:
self._cache_control_obj = CacheControl.parse(
value, updates_to=self._update_cache_control, type='response')
self._cache_control_obj.header_value = value
if self._cache_control_obj.header_value != value:
new_obj = CacheControl.parse(value, type='response')
self._cache_control_obj.properties.clear()
self._cache_control_obj.properties.update(new_obj.properties)
self._cache_control_obj.header_value = value
return self._cache_control_obj
def _cache_control__set(self, value):
# This actually becomes a copy
if not value:
value = ""
if isinstance(value, dict):
value = CacheControl(value, 'response')
if isinstance(value, text_type):
value = str(value)
if isinstance(value, str):
if self._cache_control_obj is None:
self.headers['Cache-Control'] = value
return
value = CacheControl.parse(value, 'response')
cache = self.cache_control
cache.properties.clear()
cache.properties.update(value.properties)
def _cache_control__del(self):
self.cache_control = {}
def _update_cache_control(self, prop_dict):
value = serialize_cache_control(prop_dict)
if not value:
if 'Cache-Control' in self.headers:
del self.headers['Cache-Control']
else:
self.headers['Cache-Control'] = value
cache_control = property(
_cache_control__get, _cache_control__set,
_cache_control__del, doc=_cache_control__get.__doc__)
#
# cache_expires
#
def _cache_expires(self, seconds=0, **kw):
"""
Set expiration on this request. This sets the response to
expire in the given seconds, and any other attributes are used
for cache_control (e.g., private=True, etc).
"""
if seconds is True:
seconds = 0
elif isinstance(seconds, timedelta):
seconds = timedelta_to_seconds(seconds)
cache_control = self.cache_control
if seconds is None:
pass
elif not seconds:
# To really expire something, you have to force a
# bunch of these cache control attributes, and IE may
# not pay attention to those still so we also set
# Expires.
cache_control.no_store = True
cache_control.no_cache = True
cache_control.must_revalidate = True
cache_control.max_age = 0
cache_control.post_check = 0
cache_control.pre_check = 0
self.expires = datetime.utcnow()
if 'last-modified' not in self.headers:
self.last_modified = datetime.utcnow()
self.pragma = 'no-cache'
else:
cache_control.properties.clear()
cache_control.max_age = seconds
self.expires = datetime.utcnow() + timedelta(seconds=seconds)
self.pragma = None
for name, value in kw.items():
setattr(cache_control, name, value)
cache_expires = property(lambda self: self._cache_expires, _cache_expires)
#
# encode_content, decode_content, md5_etag
#
def encode_content(self, encoding='gzip', lazy=False):
"""
Encode the content with the given encoding (only gzip and
identity are supported).
"""
assert encoding in ('identity', 'gzip'), \
"Unknown encoding: %r" % encoding
if encoding == 'identity':
self.decode_content()
return
if self.content_encoding == 'gzip':
return
if lazy:
self.app_iter = gzip_app_iter(self._app_iter)
self.content_length = None
else:
self.app_iter = list(gzip_app_iter(self._app_iter))
self.content_length = sum(map(len, self._app_iter))
self.content_encoding = 'gzip'
def decode_content(self):
content_encoding = self.content_encoding or 'identity'
if content_encoding == 'identity':
return
if content_encoding not in ('gzip', 'deflate'):
raise ValueError(
"I don't know how to decode the content %s" % content_encoding)
if content_encoding == 'gzip':
from gzip import GzipFile
from io import BytesIO
gzip_f = GzipFile(filename='', mode='r', fileobj=BytesIO(self.body))
self.body = gzip_f.read()
self.content_encoding = None
gzip_f.close()
else:
# Weird feature: http://bugs.python.org/issue5784
self.body = zlib.decompress(self.body, -15)
self.content_encoding = None
def md5_etag(self, body=None, set_content_md5=False):
"""
Generate an etag for the response object using an MD5 hash of
the body (the body parameter, or ``self.body`` if not given)
Sets ``self.etag``
If ``set_content_md5`` is True sets ``self.content_md5`` as well
"""
if body is None:
body = self.body
md5_digest = md5(body).digest()
md5_digest = b64encode(md5_digest)
md5_digest = md5_digest.replace(b'\n', b'')
md5_digest = native_(md5_digest)
self.etag = md5_digest.strip('=')
if set_content_md5:
self.content_md5 = md5_digest
#
# __call__, conditional_response_app
#
def __call__(self, environ, start_response):
"""
WSGI application interface
"""
if self.conditional_response:
return self.conditional_response_app(environ, start_response)
headerlist = self._abs_headerlist(environ)
start_response(self.status, headerlist)
if environ['REQUEST_METHOD'] == 'HEAD':
# Special case here...
return EmptyResponse(self._app_iter)
return self._app_iter
def _abs_headerlist(self, environ):
"""Returns a headerlist, with the Location header possibly
made absolute given the request environ.
"""
headerlist = list(self.headerlist)
for i, (name, value) in enumerate(headerlist):
if name.lower() == 'location':
if SCHEME_RE.search(value):
break
new_location = urlparse.urljoin(_request_uri(environ), value)
headerlist[i] = (name, new_location)
break
return headerlist
_safe_methods = ('GET', 'HEAD')
def conditional_response_app(self, environ, start_response):
"""
Like the normal __call__ interface, but checks conditional headers:
* If-Modified-Since (304 Not Modified; only on GET, HEAD)
* If-None-Match (304 Not Modified; only on GET, HEAD)
* Range (406 Partial Content; only on GET, HEAD)
"""
req = BaseRequest(environ)
headerlist = self._abs_headerlist(environ)
method = environ.get('REQUEST_METHOD', 'GET')
if method in self._safe_methods:
status304 = False
if req.if_none_match and self.etag:
status304 = self.etag in req.if_none_match
elif req.if_modified_since and self.last_modified:
status304 = self.last_modified <= req.if_modified_since
if status304:
start_response('304 Not Modified', filter_headers(headerlist))
return EmptyResponse(self._app_iter)
if (req.range and self in req.if_range
and self.content_range is None
and method in ('HEAD', 'GET')
and self.status_code == 200
and self.content_length is not None
):
content_range = req.range.content_range(self.content_length)
if content_range is None:
iter_close(self._app_iter)
body = bytes_("Requested range not satisfiable: %s" % req.range)
headerlist = [
('Content-Length', str(len(body))),
('Content-Range', str(ContentRange(None, None,
self.content_length))),
('Content-Type', 'text/plain'),
] + filter_headers(headerlist)
start_response('416 Requested Range Not Satisfiable',
headerlist)
if method == 'HEAD':
return ()
return [body]
else:
app_iter = self.app_iter_range(content_range.start,
content_range.stop)
if app_iter is not None:
# the following should be guaranteed by
# Range.range_for_length(length)
assert content_range.start is not None
headerlist = [
('Content-Length',
str(content_range.stop - content_range.start)),
('Content-Range', str(content_range)),
] + filter_headers(headerlist, ('content-length',))
start_response('206 Partial Content', headerlist)
if method == 'HEAD':
return EmptyResponse(app_iter)
return app_iter
start_response(self.status, headerlist)
if method == 'HEAD':
return EmptyResponse(self._app_iter)
return self._app_iter
def app_iter_range(self, start, stop):
"""
Return a new app_iter built from the response app_iter, that
serves up only the given ``start:stop`` range.
"""
app_iter = self._app_iter
if hasattr(app_iter, 'app_iter_range'):
return app_iter.app_iter_range(start, stop)
return AppIterRange(app_iter, start, stop)
def filter_headers(hlist, remove_headers=('content-length', 'content-type')):
return [h for h in hlist if (h[0].lower() not in remove_headers)]
def iter_file(file, block_size=1<<18): # 256Kb
while True:
data = file.read(block_size)
if not data:
break
yield data
class ResponseBodyFile(object):
mode = 'wb'
closed = False
def __init__(self, response):
self.response = response
self.write = response.write
def __repr__(self):
return '<body_file for %r>' % self.response
encoding = property(
lambda self: self.response.charset,
doc="The encoding of the file (inherited from response.charset)"
)
def writelines(self, seq):
for item in seq:
self.write(item)
def close(self):
raise NotImplementedError("Response bodies cannot be closed")
def flush(self):
pass
class AppIterRange(object):
"""
Wraps an app_iter, returning just a range of bytes
"""
def __init__(self, app_iter, start, stop):
assert start >= 0, "Bad start: %r" % start
assert stop is None or (stop >= 0 and stop >= start), (
"Bad stop: %r" % stop)
self.app_iter = iter(app_iter)
self._pos = 0 # position in app_iter
self.start = start
self.stop = stop
def __iter__(self):
return self
def _skip_start(self):
start, stop = self.start, self.stop
for chunk in self.app_iter:
self._pos += len(chunk)
if self._pos < start:
continue
elif self._pos == start:
return b''
else:
chunk = chunk[start-self._pos:]
if stop is not None and self._pos > stop:
chunk = chunk[:stop-self._pos]
assert len(chunk) == stop - start
return chunk
else:
raise StopIteration()
def next(self):
if self._pos < self.start:
# need to skip some leading bytes
return self._skip_start()
stop = self.stop
if stop is not None and self._pos >= stop:
raise StopIteration
chunk = next(self.app_iter)
self._pos += len(chunk)
if stop is None or self._pos <= stop:
return chunk
else:
return chunk[:stop-self._pos]
__next__ = next # py3
def close(self):
iter_close(self.app_iter)
class EmptyResponse(object):
"""An empty WSGI response.
An iterator that immediately stops. Optionally provides a close
method to close an underlying app_iter it replaces.
"""
def __init__(self, app_iter=None):
if app_iter is not None and hasattr(app_iter, 'close'):
self.close = app_iter.close
def __iter__(self):
return self
def __len__(self):
return 0
def next(self):
raise StopIteration()
__next__ = next # py3
def _request_uri(environ):
"""Like wsgiref.url.request_uri, except eliminates :80 ports
Return the full request URI"""
url = environ['wsgi.url_scheme']+'://'
if environ.get('HTTP_HOST'):
url += environ['HTTP_HOST']
else:
url += environ['SERVER_NAME'] + ':' + environ['SERVER_PORT']
if url.endswith(':80') and environ['wsgi.url_scheme'] == 'http':
url = url[:-3]
elif url.endswith(':443') and environ['wsgi.url_scheme'] == 'https':
url = url[:-4]
if PY3: # pragma: no cover
script_name = bytes_(environ.get('SCRIPT_NAME', '/'), 'latin-1')
path_info = bytes_(environ.get('PATH_INFO', ''), 'latin-1')
else:
script_name = environ.get('SCRIPT_NAME', '/')
path_info = environ.get('PATH_INFO', '')
url += url_quote(script_name)
qpath_info = url_quote(path_info)
if not 'SCRIPT_NAME' in environ:
url += qpath_info[1:]
else:
url += qpath_info
return url
def iter_close(iter):
if hasattr(iter, 'close'):
iter.close()
def gzip_app_iter(app_iter):
size = 0
crc = zlib.crc32(b"") & 0xffffffff
compress = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS,
zlib.DEF_MEM_LEVEL, 0)
yield _gzip_header
for item in app_iter:
size += len(item)
crc = zlib.crc32(item, crc) & 0xffffffff
# The compress function may return zero length bytes if the input is
# small enough; it buffers the input for the next iteration or for a
# flush.
result = compress.compress(item)
if result:
yield result
# Similarly, flush may also not yield a value.
result = compress.flush()
if result:
yield result
yield struct.pack("<2L", crc, size & 0xffffffff)
def _error_unicode_in_app_iter(app_iter, body):
app_iter_repr = repr(app_iter)
if len(app_iter_repr) > 50:
app_iter_repr = (
app_iter_repr[:30] + '...' + app_iter_repr[-10:])
raise TypeError(
'An item of the app_iter (%s) was text, causing a '
'text body: %r' % (app_iter_repr, body))