| # (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org) |
| # Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php |
| """ |
| Routines for testing WSGI applications. |
| |
| Most interesting is the `TestApp <class-paste.fixture.TestApp.html>`_ |
| for testing WSGI applications, and the `TestFileEnvironment |
| <class-paste.fixture.TestFileEnvironment.html>`_ class for testing the |
| effects of command-line scripts. |
| """ |
| |
| from __future__ import print_function |
| |
| import sys |
| import random |
| import mimetypes |
| import time |
| import os |
| import shutil |
| import smtplib |
| import shlex |
| import re |
| import six |
| import subprocess |
| from six.moves import cStringIO as StringIO |
| from six.moves.urllib.parse import urlencode |
| from six.moves.urllib import parse as urlparse |
| try: |
| # Python 3 |
| from http.cookies import BaseCookie |
| from urllib.parse import splittype, splithost |
| except ImportError: |
| # Python 2 |
| from Cookie import BaseCookie |
| from urllib import splittype, splithost |
| |
| from paste import wsgilib |
| from paste import lint |
| from paste.response import HeaderDict |
| |
def tempnam_no_warning(*args):
    """
    Return a not-yet-existing temporary file name without warning.

    Accepts the same optional positional arguments as the Python 2
    ``os.tempnam``: ``(dir, prefix)``.  On interpreters that still
    provide ``os.tempnam`` it is called with its ``RuntimeWarning``
    suppressed; elsewhere (Python 3 removed ``os.tempnam``) the name
    comes from ``tempfile.mktemp``.

    The name is only reserved by convention -- nothing is created on
    disk -- so this is not safe against a hostile local user.
    """
    import warnings

    legacy_tempnam = getattr(os, 'tempnam', None)
    if legacy_tempnam is not None:
        with warnings.catch_warnings():
            # os.tempnam always emits a RuntimeWarning about its race
            # condition; silencing it is this function's whole purpose.
            warnings.simplefilter('ignore', RuntimeWarning)
            return legacy_tempnam(*args)

    import tempfile

    if len(args) > 2:
        raise TypeError(
            "tempnam_no_warning() takes at most 2 arguments (%d given)"
            % len(args))
    options = {}
    # ``None`` means "use the default" for both arguments, as with
    # os.tempnam, so only forward values that were really supplied.
    if len(args) > 0 and args[0] is not None:
        options['dir'] = args[0]
    if len(args) > 1 and args[1] is not None:
        options['prefix'] = args[1]
    return tempfile.mktemp(**options)
| |
class NoDefault(object):
    """
    Marker class used as a default argument value meaning "the caller
    supplied no default"; it is compared by identity and never
    instantiated by the code that uses it.
    """
| |
def sorted(l):
    """
    Return a new list holding the items of iterable ``l`` in ascending
    order; the argument itself is left untouched.
    """
    ordered = []
    ordered.extend(l)
    ordered.sort()
    return ordered
| |
class Dummy_smtplib(object):
    """
    Deprecated stand-in for ``smtplib.SMTP`` that records the last
    message instead of sending it.

    The most recently created instance is kept in the class attribute
    ``existing`` until ``reset()`` is called on it.
    """

    # The instance currently "connected", or None.
    existing = None

    def __init__(self, server):
        """Record ``server`` and register this instance as ``existing``."""
        import warnings
        warnings.warn(
            'Dummy_smtplib is not maintained and is deprecated',
            DeprecationWarning, 2)
        assert not self.existing, (
            "smtplib.SMTP() called again before Dummy_smtplib.existing.reset() "
            "called.")
        self.server = server
        self.open = True
        type(self).existing = self

    def quit(self):
        """Mark the fake connection closed; calling it twice is an error."""
        assert self.open, (
            "Called %s.quit() twice" % self)
        self.open = False

    def sendmail(self, from_address, to_addresses, msg):
        """Store the envelope and message on the instance for inspection."""
        self.message = msg
        self.to_addresses = to_addresses
        self.from_address = from_address

    @classmethod
    def install(cls):
        """Replace ``smtplib.SMTP`` with this class."""
        smtplib.SMTP = cls

    def reset(self):
        """Forget the registered instance; the connection must be quit first."""
        assert not self.open, (
            "SMTP connection not quit")
        type(self).existing = None
| |
class AppError(Exception):
    """
    Raised by ``TestApp`` when a response has an unexpected status or
    the application wrote to its error stream.
    """
| |
class TestApp(object):
    """
    Wraps a WSGI application in a more convenient interface for testing.

    Requests are issued with ``.get()``, ``.post()``, ``.put()`` and
    ``.delete()``.  Cookies set by responses are remembered on the
    instance and sent with later requests until ``.reset()`` is called.
    """

    # for py.test: ``disabled`` is the historical marker, ``__test__`` is
    # the attribute current pytest releases look at; set both so this
    # helper is never collected as a test class.
    disabled = True
    __test__ = False

    def __init__(self, app, namespace=None, relative_to=None,
                 extra_environ=None, pre_request_hook=None,
                 post_request_hook=None):
        """
        Wraps a WSGI application in a more convenient interface for
        testing.

        ``app`` may be an application, or a Paste Deploy app
        URI, like ``'config:filename.ini#test'``.

        ``namespace`` is a dictionary that will be written to (if
        provided).  This can be used with doctest or some other
        system, and the variable ``res`` will be assigned every time
        you make a request (instead of returning the response).

        ``relative_to`` is a directory, and filenames used for file
        uploads are calculated relative to this.  Also ``config:``
        URIs that aren't absolute.

        ``extra_environ`` is a dictionary of values that should go
        into the environment for each request.  These can provide a
        communication channel with the application.

        ``pre_request_hook`` is a function to be called prior to
        making requests (such as ``post`` or ``get``). This function
        must take one argument (the instance of the TestApp).

        ``post_request_hook`` is a function, similar to
        ``pre_request_hook``, to be called after requests are made.
        """
        if isinstance(app, (six.binary_type, six.text_type)):
            from paste.deploy import loadapp
            # @@: Should pick up relative_to from calling module's
            # __file__
            app = loadapp(app, relative_to=relative_to)
        self.app = app
        self.namespace = namespace
        self.relative_to = relative_to
        if extra_environ is None:
            extra_environ = {}
        self.extra_environ = extra_environ
        self.pre_request_hook = pre_request_hook
        self.post_request_hook = post_request_hook
        self.reset()

    def reset(self):
        """
        Resets the state of the application; currently just clears
        saved cookies.
        """
        self.cookies = {}

    def _make_environ(self):
        """
        Return a fresh environ dict: a copy of ``extra_environ`` with
        ``paste.throw_errors`` switched on.
        """
        environ = self.extra_environ.copy()
        environ['paste.throw_errors'] = True
        return environ

    def get(self, url, params=None, headers=None, extra_environ=None,
            status=None, expect_errors=False):
        """
        Get the given url (well, actually a path like
        ``'/page.html'``).

        ``params``:
            A query string, or a dictionary that will be encoded
            into a query string.  You may also include a query
            string on the ``url``.

        ``headers``:
            A dictionary of extra headers to send.

        ``extra_environ``:
            A dictionary of environmental variables that should
            be added to the request.

        ``status``:
            The integer status code you expect (if not 200 or 3xx).
            If you expect a 404 response, for instance, you must give
            ``status=404`` or it will be an error.  You can also give
            a wildcard, like ``'3*'`` or ``'*'``.

        ``expect_errors``:
            If this is not true, then if anything is written to
            ``wsgi.errors`` it will be an error.  If it is true, then
            non-200/3xx responses are also okay.

        Returns a `response object
        <class-paste.fixture.TestResponse.html>`_
        """
        if extra_environ is None:
            extra_environ = {}
        # Hide from py.test:
        __tracebackhide__ = True
        if params:
            if not isinstance(params, (six.binary_type, six.text_type)):
                params = urlencode(params, doseq=True)
            if '?' in url:
                url += '&'
            else:
                url += '?'
            url += params
        environ = self._make_environ()
        url = str(url)
        if '?' in url:
            url, environ['QUERY_STRING'] = url.split('?', 1)
        else:
            environ['QUERY_STRING'] = ''
        self._set_headers(headers, environ)
        # Per-request values win over headers and instance-wide values.
        environ.update(extra_environ)
        req = TestRequest(url, environ, expect_errors)
        return self.do_request(req, status=status)

    def _gen_request(self, method, url, params=b'', headers=None, extra_environ=None,
                     status=None, upload_files=None, expect_errors=False):
        """
        Do a generic request with a body.

        ``params`` may be a byte or text string (used as the body), or a
        list/tuple/dict/multi-dict that is url-encoded.  Text is always
        encoded as UTF-8 before it is handed to ``wsgi.input``.
        """
        if headers is None:
            headers = {}
        if extra_environ is None:
            extra_environ = {}
        environ = self._make_environ()
        # @@: Should this be all non-strings?
        if isinstance(params, (list, tuple, dict)):
            params = urlencode(params)
        if hasattr(params, 'items'):
            # Some other multi-dict like format
            params = urlencode(params.items())
        if upload_files:
            # parse_qsl keeps the input's type (text or bytes);
            # encode_multipart copes with either.
            params = urlparse.parse_qsl(params, keep_blank_values=True)
            content_type, params = self.encode_multipart(
                params, upload_files)
            environ['CONTENT_TYPE'] = content_type
        else:
            # wsgi.input is a byte stream, so text bodies must be encoded
            # (unconditionally calling .encode() breaks on bytes input).
            if isinstance(params, six.text_type):
                params = params.encode('utf8')
            if params:
                environ.setdefault('CONTENT_TYPE',
                                   'application/x-www-form-urlencoded')
        if '?' in url:
            url, environ['QUERY_STRING'] = url.split('?', 1)
        else:
            environ['QUERY_STRING'] = ''
        environ['CONTENT_LENGTH'] = str(len(params))
        environ['REQUEST_METHOD'] = method
        environ['wsgi.input'] = six.BytesIO(params)
        self._set_headers(headers, environ)
        environ.update(extra_environ)
        req = TestRequest(url, environ, expect_errors)
        return self.do_request(req, status=status)

    def post(self, url, params=b'', headers=None, extra_environ=None,
             status=None, upload_files=None, expect_errors=False):
        """
        Do a POST request.  Very like the ``.get()`` method.
        ``params`` are put in the body of the request.

        ``upload_files`` is for file uploads.  It should be a list of
        ``[(fieldname, filename, file_content)]``.  You can also use
        just ``[(fieldname, filename)]`` and the file content will be
        read from disk.

        Returns a `response object
        <class-paste.fixture.TestResponse.html>`_
        """
        return self._gen_request('POST', url, params=params, headers=headers,
                                 extra_environ=extra_environ, status=status,
                                 upload_files=upload_files,
                                 expect_errors=expect_errors)

    def put(self, url, params=b'', headers=None, extra_environ=None,
            status=None, upload_files=None, expect_errors=False):
        """
        Do a PUT request.  Very like the ``.get()`` method.
        ``params`` are put in the body of the request.

        ``upload_files`` is for file uploads.  It should be a list of
        ``[(fieldname, filename, file_content)]``.  You can also use
        just ``[(fieldname, filename)]`` and the file content will be
        read from disk.

        Returns a `response object
        <class-paste.fixture.TestResponse.html>`_
        """
        return self._gen_request('PUT', url, params=params, headers=headers,
                                 extra_environ=extra_environ, status=status,
                                 upload_files=upload_files,
                                 expect_errors=expect_errors)

    def delete(self, url, params=b'', headers=None, extra_environ=None,
               status=None, expect_errors=False):
        """
        Do a DELETE request.  Very like the ``.get()`` method.
        ``params`` are put in the body of the request.

        Returns a `response object
        <class-paste.fixture.TestResponse.html>`_
        """
        return self._gen_request('DELETE', url, params=params, headers=headers,
                                 extra_environ=extra_environ, status=status,
                                 upload_files=None, expect_errors=expect_errors)

    def _set_headers(self, headers, environ):
        """
        Turn any headers into environ variables: ``Content-Type`` and
        ``Content-Length`` map to their CGI names, everything else to
        ``HTTP_<NAME>``.  ``environ`` is modified in place.
        """
        if not headers:
            return
        for header, value in headers.items():
            lowered = header.lower()
            if lowered == 'content-type':
                var = 'CONTENT_TYPE'
            elif lowered == 'content-length':
                var = 'CONTENT_LENGTH'
            else:
                var = 'HTTP_%s' % header.replace('-', '_').upper()
            environ[var] = value

    @staticmethod
    def _to_bytes(value):
        """Return ``value`` as a byte string, encoding text as UTF-8."""
        if isinstance(value, six.binary_type):
            return value
        if not isinstance(value, six.text_type):
            value = six.text_type(value)
        return value.encode('utf8')

    @staticmethod
    def _printable_body(res):
        """Return ``res.body`` in a form suitable for an error message."""
        body = res.body
        if six.PY3 and isinstance(body, six.binary_type):
            # 'replace' rather than 'xmlcharrefreplace': the latter is an
            # encode-only handler and raises TypeError when decoding.
            body = body.decode('utf8', 'replace')
        return body

    def encode_multipart(self, params, files):
        """
        Encodes a set of parameters (typically a name/value list) and
        a set of files (a list of (name, filename, file_body)) into a
        typical POST body, returning the (content_type, body).

        Names, values, filenames and file bodies may each be text or
        bytes; the returned body is always a byte string.
        """
        to_bytes = self._to_bytes
        boundary = '----------a_BoUnDaRy%s$' % random.random()
        content_type = 'multipart/form-data; boundary=%s' % boundary
        boundary = to_bytes(boundary)
        delimiter = b'--' + boundary

        lines = []
        for key, value in params:
            lines.append(delimiter)
            lines.append(b'Content-Disposition: form-data; name="'
                         + to_bytes(key) + b'"')
            lines.append(b'')
            lines.append(to_bytes(value))
        for file_info in files:
            key, filename, value = self._get_file_info(file_info)
            lines.append(delimiter)
            lines.append(b'Content-Disposition: form-data; name="'
                         + to_bytes(key) + b'"; filename="'
                         + to_bytes(filename) + b'"')
            guess_name = filename
            if six.PY3 and isinstance(guess_name, six.binary_type):
                # mimetypes expects a text path on Python 3.
                guess_name = guess_name.decode('utf8', 'replace')
            fcontent = mimetypes.guess_type(guess_name)[0]
            lines.append(to_bytes(
                'Content-Type: %s' % (fcontent or 'application/octet-stream')))
            lines.append(b'')
            lines.append(to_bytes(value))
        lines.append(delimiter + b'--')
        lines.append(b'')
        body = b'\r\n'.join(lines)
        return content_type, body

    def _get_file_info(self, file_info):
        """
        Normalise one ``upload_files`` entry to
        ``(fieldname, filename, content)``.

        A two-item entry has its content read from disk (relative to
        ``self.relative_to`` when that is set); a three-item entry is
        returned unchanged.  Anything else raises ``ValueError``.
        """
        if len(file_info) == 2:
            # It only has a filename
            filename = file_info[1]
            if self.relative_to:
                filename = os.path.join(self.relative_to, filename)
            with open(filename, 'rb') as f:
                content = f.read()
            return (file_info[0], filename, content)
        elif len(file_info) == 3:
            return file_info
        else:
            raise ValueError(
                "upload_files need to be a list of tuples of (fieldname, "
                "filename, filecontent) or (fieldname, filename); "
                "you gave: %r"
                % repr(file_info)[:100])

    def do_request(self, req, status):
        """
        Executes the given request (``req``), with the expected
        ``status``.  Generally ``.get()`` and ``.post()`` are used
        instead.

        Returns the response, unless a ``namespace`` was given to the
        constructor, in which case the response is stored there as
        ``res`` and ``None`` is returned.
        """
        if self.pre_request_hook:
            self.pre_request_hook(self)
        __tracebackhide__ = True
        if self.cookies:
            c = BaseCookie()
            for name, value in self.cookies.items():
                c[name] = value
            hc = '; '.join(['='.join([m.key, m.value]) for m in c.values()])
            req.environ['HTTP_COOKIE'] = hc
        req.environ['paste.testing'] = True
        req.environ['paste.testing_variables'] = {}
        app = lint.middleware(self.app)
        old_stdout = sys.stdout
        out = CaptureStdout(old_stdout)
        try:
            sys.stdout = out
            start_time = time.time()
            raise_on_wsgi_error = not req.expect_errors
            raw_res = wsgilib.raw_interactive(
                app, req.url,
                raise_on_wsgi_error=raise_on_wsgi_error,
                **req.environ)
            end_time = time.time()
        finally:
            # Always restore stdout, and echo whatever the application
            # printed to stderr so it is visible in test output.
            sys.stdout = old_stdout
            sys.stderr.write(out.getvalue())
        res = self._make_response(raw_res, end_time - start_time)
        res.request = req
        # The application may export variables to the test through
        # environ['paste.testing_variables']; expose them on the response.
        for name, value in req.environ['paste.testing_variables'].items():
            if hasattr(res, name):
                raise ValueError(
                    "paste.testing_variables contains the variable %r, but "
                    "the response object already has an attribute by that "
                    "name" % name)
            setattr(res, name, value)
        if self.namespace is not None:
            self.namespace['res'] = res
        if not req.expect_errors:
            self._check_status(status, res)
            self._check_errors(res)
        res.cookies_set = {}
        for header in res.all_headers('set-cookie'):
            c = BaseCookie(header)
            for key, morsel in c.items():
                self.cookies[key] = morsel.value
                res.cookies_set[key] = morsel.value
        if self.post_request_hook:
            self.post_request_hook(self)
        if self.namespace is None:
            # It's annoying to return the response in doctests, as it'll
            # be printed, so we only return it if we couldn't assign
            # it anywhere
            return res

    def _check_status(self, status, res):
        """
        Raise ``AppError`` unless ``res.status`` is acceptable.

        ``status`` may be ``None`` (any 2xx/3xx is fine), ``'*'``
        (anything is fine), a list/tuple of allowed codes, a single
        code, or a prefix wildcard such as ``'3*'``.
        """
        __tracebackhide__ = True
        if status == '*':
            return
        if isinstance(status, (list, tuple)):
            if res.status not in status:
                raise AppError(
                    "Bad response: %s (not one of %s for %s)\n%s"
                    % (res.full_status, ', '.join(map(str, status)),
                       res.request.url, self._printable_body(res)))
            return
        if status is None:
            if res.status >= 200 and res.status < 400:
                return
            raise AppError(
                "Bad response: %s (not 200 OK or 3xx redirect for %s)\n%s"
                % (res.full_status, res.request.url,
                   self._printable_body(res)))
        if status == res.status:
            return
        # Wildcards like '3*' are documented on .get(); match on the
        # leading digits of the numeric status.
        pattern = str(status)
        if pattern.endswith('*') and str(res.status).startswith(pattern[:-1]):
            return
        raise AppError(
            "Bad response: %s (not %s)" % (res.full_status, status))

    def _check_errors(self, res):
        """Raise ``AppError`` if anything was captured in ``res.errors``."""
        if res.errors:
            raise AppError(
                "Application had errors logged:\n%s" % res.errors)

    def _make_response(self, resp, total_time):
        """
        Build a ``TestResponse`` from the ``(status, headers, body,
        errors)`` tuple in ``resp`` and the elapsed ``total_time``.
        """
        status, headers, body, errors = resp
        return TestResponse(self, status, headers, body, errors,
                            total_time)
| |
class CaptureStdout(object):
    """
    File-like object that forwards everything written to it to the
    stream ``actual`` while also keeping a copy that can be read back
    with ``getvalue()``.
    """

    def __init__(self, actual):
        """Start with an empty capture buffer in front of ``actual``."""
        self.captured = StringIO()
        self.actual = actual

    def write(self, s):
        """Record ``s`` in the buffer, then pass it on to the real stream."""
        for stream in (self.captured, self.actual):
            stream.write(s)

    def flush(self):
        """Flush the real stream (the buffer needs no flushing)."""
        self.actual.flush()

    def writelines(self, lines):
        """Write every item of ``lines`` through ``write()``."""
        for chunk in lines:
            self.write(chunk)

    def getvalue(self):
        """Return everything captured so far."""
        return self.captured.getvalue()
| |
class TestResponse(object):
    """
    Instances of this class are returned by `TestApp
    <class-paste.fixture.TestApp.html>`_
    """

    # for py.test: ``disabled`` is the historical marker, ``__test__`` is
    # the attribute current pytest releases look at.
    disabled = True
    __test__ = False

    def __init__(self, test_app, status, headers, body, errors,
                 total_time):
        """
        ``status`` is the full status line (its first whitespace-separated
        token is parsed as the integer ``.status``); ``headers`` is a list
        of ``(name, value)`` pairs; ``body`` and ``errors`` are stored
        as given; ``total_time`` is stored as ``.time``.
        """
        self.test_app = test_app
        self.status = int(status.split()[0])
        self.full_status = status
        self.headers = headers
        self.header_dict = HeaderDict.fromlist(self.headers)
        self.body = body
        self.errors = errors
        self._normal_body = None
        self.time = total_time
        self._forms_indexed = None

    def _text_body(self):
        """
        Return the body as text for HTML scanning.

        On Python 3 a bytes body is decoded as UTF-8 (undecodable bytes
        are replaced) so that the text regular expressions used for form
        and link parsing can be applied to it.
        """
        body = self.body
        if six.PY3 and isinstance(body, six.binary_type):
            body = body.decode('utf8', 'replace')
        return body

    def forms__get(self):
        """
        Returns a dictionary of ``Form`` objects.  Indexes are both in
        order (from zero) and by form id (if the form is given an id).
        """
        if self._forms_indexed is None:
            self._parse_forms()
        return self._forms_indexed

    forms = property(forms__get,
                     doc="""
                     A dictionary of the <form>s found on the page
                     (instances of `Form <class-paste.fixture.Form.html>`_),
                     keyed by position and by id.
                     """)

    def form__get(self):
        """Return the only form on the page; ``TypeError`` otherwise."""
        forms = self.forms
        if not forms:
            raise TypeError(
                "You used response.form, but no forms exist")
        if 1 in forms:
            # There is more than one form
            raise TypeError(
                "You used response.form, but more than one form exists")
        return forms[0]

    form = property(form__get,
                    doc="""
                    Returns a single `Form
                    <class-paste.fixture.Form.html>`_ instance; it
                    is an error if there are multiple forms on the
                    page.
                    """)

    _tag_re = re.compile(r'<(/?)([:a-z0-9_\-]*)(.*?)>', re.S|re.I)

    def _parse_forms(self):
        """
        Scan the body for ``<form>...</form>`` spans and fill
        ``self._forms_indexed`` with a ``Form`` per span.
        """
        forms = self._forms_indexed = {}
        form_texts = []
        body = self._text_body()
        # Offset of the currently open <form>, or None.  Compared with
        # ``is None`` because offset 0 is a legitimate start position.
        started = None
        for match in self._tag_re.finditer(body):
            end = match.group(1) == '/'
            tag = match.group(2).lower()
            if tag != 'form':
                continue
            if end:
                assert started is not None, (
                    "</form> unexpected at %s" % match.start())
                form_texts.append(body[started:match.end()])
                started = None
            else:
                assert started is None, (
                    "Nested form tags at %s" % match.start())
                started = match.start()
        assert started is None, (
            "Dangling form: %r" % body[started:])
        for i, text in enumerate(form_texts):
            form = Form(self, text)
            forms[i] = form
            if form.id:
                forms[form.id] = form

    def header(self, name, default=NoDefault):
        """
        Returns the named header; an error if there is not exactly one
        matching header (unless you give a default -- always an error
        if there is more than one header)
        """
        found = None
        wanted = name.lower()
        for cur_name, value in self.headers:
            if cur_name.lower() == wanted:
                # ``is None`` so that an empty first value still counts
                # as "already found".
                assert found is None, (
                    "Ambiguous header: %s matches %r and %r"
                    % (name, found, value))
                found = value
        if found is None:
            if default is NoDefault:
                raise KeyError(
                    "No header found: %r (from %s)"
                    % (name, ', '.join([n for n, v in self.headers])))
            else:
                return default
        return found

    def all_headers(self, name):
        """
        Gets all headers by the ``name``, returns as a list
        """
        wanted = name.lower()
        return [value for cur_name, value in self.headers
                if cur_name.lower() == wanted]

    def follow(self, **kw):
        """
        If this request is a redirect, follow that redirect.  It
        is an error if this is not a redirect response.  Returns
        another response object.
        """
        assert self.status >= 300 and self.status < 400, (
            "You can only follow redirect responses (not %s)"
            % self.full_status)
        location = self.header('location')
        # @@: We should test that it's not a remote redirect
        return self.test_app.get(location, **kw)

    def click(self, description=None, linkid=None, href=None,
              anchor=None, index=None, verbose=False):
        """
        Click the link as described.  Each of ``description``,
        ``linkid``, and ``href`` are *patterns*, meaning that they are
        either strings (regular expressions), compiled regular
        expressions (objects with a ``search`` method), or callables
        returning true or false.

        All the given patterns are ANDed together:

        * ``description`` is a pattern that matches the contents of the
          anchor (HTML and all -- everything between ``<a...>`` and
          ``</a>``)

        * ``linkid`` is a pattern that matches the ``id`` attribute of
          the anchor.  It will receive the empty string if no id is
          given.

        * ``href`` is a pattern that matches the ``href`` of the anchor;
          the literal content of that attribute, not the fully qualified
          attribute.

        * ``anchor`` is a pattern that matches the entire anchor, with
          its contents.

        If more than one link matches, then the ``index`` link is
        followed.  If ``index`` is not given and more than one link
        matches, or if no link matches, then ``IndexError`` will be
        raised.

        If you give ``verbose`` then messages will be printed about
        each link, and why it does or doesn't match.  If you use
        ``app.click(verbose=True)`` you'll see a list of all the
        links.

        You can use multiple criteria to essentially assert multiple
        aspects about the link, e.g., where the link's destination is.
        """
        __tracebackhide__ = True
        found_html, found_desc, found_attrs = self._find_element(
            tag='a', href_attr='href',
            href_extract=None,
            content=description,
            id=linkid,
            href_pattern=href,
            html_pattern=anchor,
            index=index, verbose=verbose)
        return self.goto(found_attrs['uri'])

    def clickbutton(self, description=None, buttonid=None, href=None,
                    button=None, index=None, verbose=False):
        """
        Like ``.click()``, except looks for link-like buttons.
        This kind of button should look like
        ``<button onclick="...location.href='url'...">``.
        """
        __tracebackhide__ = True
        found_html, found_desc, found_attrs = self._find_element(
            tag='button', href_attr='onclick',
            href_extract=re.compile(r"location\.href='(.*?)'"),
            content=description,
            id=buttonid,
            href_pattern=href,
            html_pattern=button,
            index=index, verbose=verbose)
        return self.goto(found_attrs['uri'])

    def _find_element(self, tag, href_attr, href_extract,
                      content, id,
                      href_pattern,
                      html_pattern,
                      index, verbose):
        """
        Find the single ``<tag ...>...</tag>`` element matching all the
        given patterns and return ``(html, content, attrs)`` for it;
        ``attrs['uri']`` holds the extracted target.

        Raises ``IndexError`` when nothing matches, when several
        elements match and no ``index`` is given, or when ``index`` is
        out of range.
        """
        content_pat = _make_pattern(content)
        id_pat = _make_pattern(id)
        href_pat = _make_pattern(href_pattern)
        html_pat = _make_pattern(html_pattern)

        _tag_re = re.compile(r'<%s\s+(.*?)>(.*?)</%s>' % (tag, tag),
                             re.I+re.S)

        def printlog(s):
            if verbose:
                print(s)

        found_links = []
        total_links = 0
        for match in _tag_re.finditer(self._text_body()):
            el_html = match.group(0)
            el_attr = match.group(1)
            el_content = match.group(2)
            attrs = _parse_attrs(el_attr)
            printlog('Element: %r' % el_html)
            if not attrs.get(href_attr):
                printlog('  Skipped: no %s attribute' % href_attr)
                continue
            el_href = attrs[href_attr]
            if href_extract:
                m = href_extract.search(el_href)
                if not m:
                    printlog("  Skipped: doesn't match extract pattern")
                    continue
                el_href = m.group(1)
            attrs['uri'] = el_href
            if el_href.startswith('#'):
                printlog('  Skipped: only internal fragment href')
                continue
            if el_href.startswith('javascript:'):
                printlog('  Skipped: cannot follow javascript:')
                continue
            total_links += 1
            if content_pat and not content_pat(el_content):
                printlog("  Skipped: doesn't match description")
                continue
            if id_pat and not id_pat(attrs.get('id', '')):
                printlog("  Skipped: doesn't match id")
                continue
            if href_pat and not href_pat(el_href):
                printlog("  Skipped: doesn't match href")
                continue
            if html_pat and not html_pat(el_html):
                printlog("  Skipped: doesn't match html")
                continue
            printlog("  Accepted")
            found_links.append((el_html, el_content, attrs))
        if not found_links:
            raise IndexError(
                "No matching elements found (from %s possible)"
                % total_links)
        if index is None:
            if len(found_links) > 1:
                raise IndexError(
                    "Multiple links match: %s"
                    % ', '.join([repr(anc) for anc, d, attr in found_links]))
            found_link = found_links[0]
        else:
            try:
                found_link = found_links[index]
            except IndexError:
                raise IndexError(
                    "Only %s (out of %s) links match; index %s out of range"
                    % (len(found_links), total_links, index))
        return found_link

    def goto(self, href, method='get', **args):
        """
        Go to the (potentially relative) link ``href``, using the
        given method (``'get'`` or ``'post'``) and any extra arguments
        you want to pass to the ``app.get()`` or ``app.post()``
        methods.

        All hostnames and schemes will be ignored.
        """
        scheme, host, path, query, fragment = urlparse.urlsplit(href)
        # Drop scheme, host and fragment: only path and query are used.
        scheme = host = fragment = ''
        href = urlparse.urlunsplit((scheme, host, path, query, fragment))
        href = urlparse.urljoin(self.request.full_url, href)
        method = method.lower()
        assert method in ('get', 'post'), (
            'Only "get" or "post" are allowed for method (you gave %r)'
            % method)
        if method == 'get':
            method = self.test_app.get
        else:
            method = self.test_app.post
        return method(href, **args)

    _normal_body_regex = re.compile(br'[ \n\r\t]+')

    def normal_body__get(self):
        """Compute (once) and return the whitespace-normalized body."""
        if self._normal_body is None:
            self._normal_body = self._normal_body_regex.sub(
                b' ', self.body)
        return self._normal_body

    normal_body = property(normal_body__get,
                           doc="""
                           Return the whitespace-normalized body
                           """)

    def __contains__(self, s):
        """
        A response 'contains' a string if it is present in the body
        of the response.  Whitespace is normalized when searching
        for a string.
        """
        if not isinstance(s, (six.binary_type, six.text_type)):
            s = str(s)
        if isinstance(s, six.text_type):
            ## FIXME: we don't know that this response uses utf8:
            s = s.encode('utf8')
        return (self.body.find(s) != -1
                or self.normal_body.find(s) != -1)

    def mustcontain(self, *strings, **kw):
        """
        Assert that the response contains all of the strings passed
        in as arguments.

        Equivalent to::

            assert string in res

        The only keyword argument allowed is ``no``: a string, or a
        list of strings, that must *not* appear in the response.
        ``IndexError`` is raised when either check fails.
        """
        if 'no' in kw:
            no = kw['no']
            del kw['no']
            if isinstance(no, (six.binary_type, six.text_type)):
                no = [no]
        else:
            no = []
        if kw:
            raise TypeError(
                "The only keyword argument allowed is 'no'")
        for s in strings:
            if s not in self:
                print("Actual response (no %r):" % s, file=sys.stderr)
                print(self, file=sys.stderr)
                raise IndexError(
                    "Body does not contain string %r" % s)
        for no_s in no:
            if no_s in self:
                print("Actual response (has %r)" % no_s, file=sys.stderr)
                print(self, file=sys.stderr)
                # Report the forbidden string that was found (not the
                # last required string, which may not even be defined).
                raise IndexError(
                    "Body contains string %r" % no_s)

    def __repr__(self):
        body = self.body
        if six.PY3 and isinstance(body, six.binary_type):
            # 'xmlcharrefreplace' is only valid for encoding; 'replace'
            # works for decoding arbitrary bytes.
            body = body.decode('utf8', 'replace')
        body = body[:20]
        return '<Response %s %r>' % (self.full_status, body)

    def __str__(self):
        simple_body = b'\n'.join([l for l in self.body.splitlines()
                                  if l.strip()])
        if six.PY3:
            simple_body = simple_body.decode('utf8', 'replace')
        return 'Response: %s\n%s\n%s' % (
            self.status,
            '\n'.join(['%s: %s' % (n, v) for n, v in self.headers]),
            simple_body)

    def showbrowser(self):
        """
        Show this response in a browser window (for debugging purposes,
        when it's hard to read the HTML).
        """
        import tempfile
        import webbrowser
        # mkstemp creates the file atomically and works on every Python
        # version (os.tempnam is gone from Python 3).
        fd, fn = tempfile.mkstemp(prefix='paste-fixture', suffix='.html')
        with os.fdopen(fd, 'wb') as f:
            f.write(self.body)
        url = 'file:' + fn.replace(os.sep, '/')
        webbrowser.open_new(url)
| |
class TestRequest(object):
    """
    Instances of this class are created by `TestApp
    <class-paste.fixture.TestApp.html>`_ with the ``.get()`` and
    ``.post()`` methods, and are consumed there by ``.do_request()``.

    Instances are also available as a ``.request`` attribute on
    `TestResponse <class-paste.fixture.TestResponse.html>`_ instances.

    Useful attributes:

    ``url``:
        The url (actually usually the path) of the request, without
        query string.

    ``environ``:
        The environment dictionary used for the request.

    ``full_url``:
        The url/path, with query string.

    ``expect_errors``:
        The flag given to the constructor.
    """

    # for py.test: ``disabled`` is the historical marker, ``__test__`` is
    # the attribute current pytest releases look at.
    disabled = True
    __test__ = False

    def __init__(self, url, environ, expect_errors=False):
        """
        Store the request data.  A leading ``http://localhost`` is
        stripped from ``url``, and ``full_url`` gets the query string
        from ``environ['QUERY_STRING']`` appended when it is non-empty.
        """
        local_prefix = 'http://localhost'
        if url.startswith(local_prefix):
            url = url[len(local_prefix):]
        self.url = url
        self.environ = environ
        query = environ.get('QUERY_STRING')
        if query:
            self.full_url = url + '?' + query
        else:
            self.full_url = url
        self.expect_errors = expect_errors
| |
| |
| class Form(object): |
| |
| """ |
| This object represents a form that has been found in a page. |
| This has a couple useful attributes: |
| |
| ``text``: |
| the full HTML of the form. |
| |
| ``action``: |
| the relative URI of the action. |
| |
| ``method``: |
| the method (e.g., ``'GET'``). |
| |
| ``id``: |
| the id, or None if not given. |
| |
| ``fields``: |
| a dictionary of fields, each value is a list of fields by |
| that name. ``<input type=\"radio\">`` and ``<select>`` are |
| both represented as single fields with multiple options. |
| """ |
| |
| # @@: This really should be using Mechanize/ClientForm or |
| # something... |
| |
| _tag_re = re.compile(r'<(/?)([:a-z0-9_\-]*)([^>]*?)>', re.I) |
| |
| def __init__(self, response, text): |
| self.response = response |
| self.text = text |
| self._parse_fields() |
| self._parse_action() |
| |
def _parse_fields(self):
    """
    Scan ``self.text`` for form controls and store them in
    ``self.fields``, a dict mapping each field name (or ``None`` for
    unnamed controls) to the list of field objects with that name.

    ``<option>`` tags are collected onto the enclosing ``<select>``;
    radio inputs sharing a name are merged into one ``Radio`` field;
    a ``<textarea>`` gets its value from the text up to its closing
    tag.  Malformed nesting fails an assertion.
    """
    in_select = None
    # (field, offset just past the opening tag) of the open <textarea>.
    in_textarea = None
    fields = {}
    for match in self._tag_re.finditer(self.text):
        end = match.group(1) == '/'
        tag = match.group(2).lower()
        if tag not in ('input', 'select', 'option', 'textarea',
                       'button'):
            continue
        if tag == 'select' and end:
            assert in_select, (
                '%r without starting select' % match.group(0))
            in_select = None
            continue
        if tag == 'textarea' and end:
            assert in_textarea, (
                "</textarea> with no <textarea> at %s" % match.start())
            in_textarea[0].value = html_unquote(
                self.text[in_textarea[1]:match.start()])
            in_textarea = None
            continue
        if end:
            # Closing tags of the other controls carry no information.
            continue
        attrs = _parse_attrs(match.group(3))
        # The name is passed positionally to the field classes, so it
        # must not also be present in the keyword attributes.
        name = attrs.pop('name', None)
        if tag == 'option':
            # Fail with a readable message instead of an AttributeError
            # on None when the markup has a stray <option>.
            assert in_select is not None, (
                "<option> outside of <select> at %s" % match.start())
            in_select.options.append((attrs.get('value'),
                                      'selected' in attrs))
            continue
        # The type attribute is compared case-insensitively, matching
        # the lower-casing applied to every other input type below.
        if tag == 'input' and (attrs.get('type') or '').lower() == 'radio':
            field = fields.get(name)
            if not field:
                field = Radio(self, tag, name, match.start(), **attrs)
                fields.setdefault(name, []).append(field)
            else:
                field = field[0]
                assert isinstance(field, Radio)
            field.options.append((attrs.get('value'),
                                  'checked' in attrs))
            continue
        tag_type = tag
        if tag == 'input':
            tag_type = attrs.get('type', 'text').lower()
        FieldClass = Field.classes.get(tag_type, Field)
        field = FieldClass(self, tag, name, match.start(), **attrs)
        if tag == 'textarea':
            assert not in_textarea, (
                "Nested textareas: %r and %r"
                % (in_textarea, match.group(0)))
            in_textarea = field, match.end()
        elif tag == 'select':
            assert not in_select, (
                "Nested selects: %r and %r"
                % (in_select, match.group(0)))
            in_select = field
        fields.setdefault(name, []).append(field)
    self.fields = fields
| |
def _parse_action(self):
    """
    Read ``action``, ``method`` and ``id`` from the opening ``<form>``
    tag in ``self.text``; scanning stops at the first ``</form>``.
    Fails an assertion when either tag is missing.
    """
    self.action = None
    saw_closing_tag = False
    for found in self._tag_re.finditer(self.text):
        slash, tag_name, raw_attrs = found.groups()
        if tag_name.lower() != 'form':
            continue
        if slash == '/':
            saw_closing_tag = True
            break
        form_attrs = _parse_attrs(raw_attrs)
        self.action = form_attrs.get('action', '')
        self.method = form_attrs.get('method', 'GET')
        self.id = form_attrs.get('id')
        # @@: enctype?
    assert saw_closing_tag, "No </form> tag found"
    assert self.action is not None, (
        "No <form> tag found")
| |
| def __setitem__(self, name, value): |
| """ |
| Set the value of the named field. If there is 0 or multiple |
| fields by that name, it is an error. |
| |
| Setting the value of a ``<select>`` selects the given option |
| (and confirms it is an option). Setting radio fields does the |
| same. Checkboxes get boolean values. You cannot set hidden |
| fields or buttons. |
| |
| Use ``.set()`` if there is any ambiguity and you must provide |
| an index. |
| """ |
| fields = self.fields.get(name) |
| assert fields is not None, ( |
| "No field by the name %r found (fields: %s)" |
| % (name, ', '.join(map(repr, self.fields.keys())))) |
| assert len(fields) == 1, ( |
| "Multiple fields match %r: %s" |
| % (name, ', '.join(map(repr, fields)))) |
| fields[0].value = value |
| |
| def __getitem__(self, name): |
| """ |
| Get the named field object (ambiguity is an error). |
| """ |
| fields = self.fields.get(name) |
| assert fields is not None, ( |
| "No field by the name %r found" % name) |
| assert len(fields) == 1, ( |
| "Multiple fields match %r: %s" |
| % (name, ', '.join(map(repr, fields)))) |
| return fields[0] |
| |
| def set(self, name, value, index=None): |
| """ |
| Set the given name, using ``index`` to disambiguate. |
| """ |
| if index is None: |
| self[name] = value |
| else: |
| fields = self.fields.get(name) |
| assert fields is not None, ( |
| "No fields found matching %r" % name) |
| field = fields[index] |
| field.value = value |
| |
| def get(self, name, index=None, default=NoDefault): |
| """ |
| Get the named/indexed field object, or ``default`` if no field |
| is found. |
| """ |
| fields = self.fields.get(name) |
| if fields is None and default is not NoDefault: |
| return default |
| if index is None: |
| return self[name] |
| else: |
| fields = self.fields.get(name) |
| assert fields is not None, ( |
| "No fields found matching %r" % name) |
| field = fields[index] |
| return field |
| |
| def select(self, name, value, index=None): |
| """ |
| Like ``.set()``, except also confirms the target is a |
| ``<select>``. |
| """ |
| field = self.get(name, index=index) |
| assert isinstance(field, Select) |
| field.value = value |
| |
| def submit(self, name=None, index=None, **args): |
| """ |
| Submits the form. If ``name`` is given, then also select that |
| button (using ``index`` to disambiguate)``. |
| |
| Any extra keyword arguments are passed to the ``.get()`` or |
| ``.post()`` method. |
| |
| Returns a response object. |
| """ |
| fields = self.submit_fields(name, index=index) |
| return self.response.goto(self.action, method=self.method, |
| params=fields, **args) |
| |
| def submit_fields(self, name=None, index=None): |
| """ |
| Return a list of ``[(name, value), ...]`` for the current |
| state of the form. |
| """ |
| submit = [] |
| if name is not None: |
| field = self.get(name, index=index) |
| submit.append((field.name, field.value_if_submitted())) |
| for name, fields in self.fields.items(): |
| if name is None: |
| continue |
| for field in fields: |
| value = field.value |
| if value is None: |
| continue |
| submit.append((name, value)) |
| return submit |
| |
| |
# Matches one attribute inside a tag: a name, optionally followed by
# ``=`` and a value.  The value may be double-quoted (group 2),
# single-quoted (group 3) or bare (group 4).  The bare alternative is
# tried last, so an unterminated single quote still falls back to it.
_attr_re = re.compile(
    r'([^= \n\r\t]+)[ \n\r\t]*'
    r'(?:=[ \n\r\t]*(?:"([^"]*)"|\'([^\']*)\'|([^"][^ \n\r\t>]*)))?',
    re.S)

def _parse_attrs(text):
    """
    Parse the attribute portion of a tag into a dictionary.

    Attribute names are lower-cased.  Values are passed through
    ``html_unquote``; an attribute without a value (or with an empty
    one) maps to ``''``.  Both ``"double"`` and ``'single'`` quoted
    values are returned without their surrounding quotes.
    """
    attrs = {}
    for match in _attr_re.finditer(text):
        attr_name = match.group(1).lower()
        # At most one of the three value groups takes part in a match.
        attr_body = match.group(2) or match.group(3) or match.group(4)
        attrs[attr_name] = html_unquote(attr_body or '')
    return attrs
| |
class Field(object):

    """
    Base class for a form field.

    Holds the owning ``form``, the ``tag`` it came from, its ``name``,
    its position ``pos``, an ``id`` and any remaining tag attributes
    (``attrs``).  The current value is exposed through ``value``.
    """

    # Dictionary of field types (select, radio, etc) to classes
    classes = {}

    # Subclasses set this to False to reject assignment to ``value``.
    settable = True

    def __init__(self, form, tag, name, pos,
                 value=None, id=None, **attrs):
        self.form, self.tag = form, tag
        self.name, self.pos = name, pos
        self.id = id
        self.attrs = attrs
        self._value = value

    def value__get(self):
        return self._value

    def value__set(self, value):
        if self.settable:
            self._value = value
            return
        raise AttributeError(
            "You cannot set the value of the <%s> field %r"
            % (self.tag, self.name))

    def force_value(self, value):
        """
        Store ``value`` without consulting ``settable``, so it also
        works for fields that normally refuse assignment.
        """
        self._value = value

    value = property(fget=value__get, fset=value__set)
| |
class Select(Field):

    """
    Field representing ``<select>``.

    ``options`` is a list of ``(value, flag)`` pairs appended by the
    form parser; ``selectedIndex`` records an explicit choice made by
    assigning to ``value``.
    """

    def __init__(self, *args, **attrs):
        super(Select, self).__init__(*args, **attrs)
        self.options = []
        # NOTE(review): a bare ``multiple`` attribute appears to be
        # parsed as '' (falsy), so this guard would only fire for
        # ``multiple="..."`` -- confirm before relying on it.
        self.multiple = attrs.get('multiple')
        assert not self.multiple, "<select multiple> not yet supported"
        # Undetermined yet:
        self.selectedIndex = None

    def value__get(self):
        # An explicit selection wins; then the first flagged option;
        # then simply the first option; otherwise there is no value.
        if self.selectedIndex is not None:
            return self.options[self.selectedIndex][0]
        for candidate, flagged in self.options:
            if flagged:
                return candidate
        if self.options:
            return self.options[0][0]
        return None

    def value__set(self, value):
        for position, (candidate, flagged) in enumerate(self.options):
            if candidate == str(value):
                self.selectedIndex = position
                return
        raise ValueError(
            "Option %r not found (from %s)"
            % (value, ', '.join(
                [repr(o) for o, c in self.options])))

    value = property(value__get, value__set)

Field.classes.update(select=Select)
| |
class Radio(Select):

    """
    Field representing a group of ``<input type="radio">`` tags.

    Adds nothing of its own: value handling is inherited unchanged
    from ``Select``.
    """

Field.classes.update(radio=Radio)
| |
class Checkbox(Field):

    """
    Field representing ``<input type="checkbox">``.

    Assigning to ``value`` only toggles ``checked``; reading it gives
    the tag's value (``'on'`` when the tag had none) while checked,
    and ``None`` while unchecked.
    """

    def __init__(self, *args, **attrs):
        super(Checkbox, self).__init__(*args, **attrs)
        self.checked = 'checked' in attrs

    def value__get(self):
        if not self.checked:
            return None
        if self._value is None:
            return 'on'
        return self._value

    def value__set(self, value):
        self.checked = bool(value)

    value = property(value__get, value__set)

Field.classes.update(checkbox=Checkbox)
| |
class Text(Field):
    """
    Field representing ``<input type="text">``.

    Identical to ``Field`` except that a missing value defaults to
    the empty string instead of ``None``.
    """
    def __init__(self, form, tag, name, pos,
                 value='', id=None, **attrs):
        # Only the default for ``value`` differs from the base class.
        super(Text, self).__init__(
            form, tag, name, pos, value=value, id=id, **attrs)

Field.classes.update(text=Text)
| |
class Textarea(Text):
    """
    Field representing ``<textarea>``; behaves exactly like ``Text``.
    """

Field.classes.update(textarea=Textarea)
| |
class Hidden(Text):
    """
    Field representing ``<input type="hidden">``; behaves exactly
    like ``Text``.
    """

Field.classes.update(hidden=Hidden)
| |
class Submit(Field):
    """
    Field representing ``<input type="submit">`` and ``<button>``
    (also registered for ``image`` inputs).

    Its ``value`` always reads as ``None`` and cannot be assigned;
    the stored value is only available via ``value_if_submitted()``.
    """

    settable = False

    def value_if_submitted(self):
        """Return the value stored for this button."""
        return self._value

    def value__get(self):
        return None

    # Read-only: no setter is supplied.
    value = property(value__get)

Field.classes.update(submit=Submit, button=Submit, image=Submit)
| |
| ############################################################ |
| ## Command-line testing |
| ############################################################ |
| |
| |
class TestFileEnvironment(object):

    """
    This represents an environment in which files will be written, and
    scripts will be run.
    """

    # for py.test
    disabled = True

    def __init__(self, base_path, template_path=None,
                 script_path=None,
                 environ=None, cwd=None, start_clear=True,
                 ignore_paths=None, ignore_hidden=True):
        """
        Creates an environment.  ``base_path`` is used as the current
        working directory, and generally where changes are looked for.

        ``template_path`` is the directory to look for *template*
        files, which are files you'll explicitly add to the
        environment.  This is done with ``.writefile()``.

        ``script_path`` is the list of directories searched for
        executables.  By default it is taken from ``PATH`` in
        ``environ``.

        ``environ`` is the operating system environment, a copy of
        ``os.environ`` if not given.

        ``cwd`` is the working directory, ``base_path`` by default.

        If ``start_clear`` is true (default) then the ``base_path``
        will be cleared (all files deleted) when an instance is
        created.  You can also use ``.clear()`` to clear the files.
        Otherwise ``base_path`` is created if it does not exist.

        ``ignore_paths`` is a set of specific filenames that should be
        ignored when created in the environment.  ``ignore_hidden``
        means, if true (default) that filenames and directories
        starting with ``'.'`` will be ignored.
        """
        self.base_path = base_path
        self.template_path = template_path
        if environ is None:
            environ = os.environ.copy()
        self.environ = environ
        if script_path is None:
            # os.pathsep is the platform's PATH separator
            # (';' on Windows, ':' elsewhere).
            script_path = environ.get('PATH', '').split(os.pathsep)
        self.script_path = script_path
        if cwd is None:
            cwd = base_path
        self.cwd = cwd
        if start_clear:
            self.clear()
        elif not os.path.exists(base_path):
            os.makedirs(base_path)
        self.ignore_paths = ignore_paths or []
        self.ignore_hidden = ignore_hidden

    def run(self, script, *args, **kw):
        """
        Run the command, with the given arguments.  The ``script``
        argument can have space-separated arguments, or you can use
        the positional arguments.

        Keywords allowed are:

        ``expect_error``: (default False)
            Don't raise an exception in case of errors
        ``expect_stderr``: (default ``expect_error``)
            Don't raise an exception if anything is printed to stderr
        ``stdin``: (default ``None``, i.e. no input)
            Input to the script
        ``printresult``: (default True)
            Print the result after running
        ``cwd``: (default ``self.cwd``)
            The working directory to run in

        Returns a `ProcResponse
        <class-paste.fixture.ProcResponse.html>`_ object.
        """
        __tracebackhide__ = True
        expect_error = _popget(kw, 'expect_error', False)
        expect_stderr = _popget(kw, 'expect_stderr', expect_error)
        cwd = _popget(kw, 'cwd', self.cwd)
        stdin = _popget(kw, 'stdin', None)
        printresult = _popget(kw, 'printresult', True)
        args = list(map(str, args))
        assert not kw, (
            "Arguments not expected: %s" % ', '.join(kw.keys()))
        if ' ' in script:
            assert not args, (
                "You cannot give a multi-argument script (%r) "
                "and arguments (%s)" % (script, args))
            # Split off the program name; the remainder (if any) is
            # parsed shell-style.  A script with only trailing
            # whitespace has no remainder.
            parts = script.split(None, 1)
            script = parts[0]
            args = shlex.split(parts[1]) if len(parts) > 1 else []
        script = self._find_exe(script)
        command = [script] + args
        # Snapshot the tree before and after so the result can report
        # created/deleted/updated files.
        files_before = self._find_files()
        proc = subprocess.Popen(command, stdin=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdout=subprocess.PIPE,
                                cwd=cwd,
                                env=self.environ)
        stdout, stderr = proc.communicate(stdin)
        files_after = self._find_files()
        result = ProcResult(
            self, command, stdin, stdout, stderr,
            returncode=proc.returncode,
            files_before=files_before,
            files_after=files_after)
        if printresult:
            print(result)
            print('-'*40)
        if not expect_error:
            result.assert_no_error()
        if not expect_stderr:
            result.assert_no_stderr()
        return result

    def _find_exe(self, script_name):
        """
        Resolve ``script_name`` to an existing path.

        With ``self.script_path`` set to ``None`` the name is looked
        up relative to ``self.cwd``; otherwise each directory in
        ``self.script_path`` is tried in order.  Raises ``OSError``
        when nothing is found.
        """
        if self.script_path is None:
            script_name = os.path.join(self.cwd, script_name)
            if not os.path.exists(script_name):
                raise OSError(
                    "Script %s does not exist" % script_name)
            return script_name
        for path in self.script_path:
            fn = os.path.join(path, script_name)
            if os.path.exists(fn):
                return fn
        raise OSError(
            "Script %s could not be found in %s"
            % (script_name, ':'.join(self.script_path)))

    def _find_files(self):
        """
        Return a dictionary mapping every non-ignored path below
        ``self.base_path`` (relative to it) to a ``FoundFile`` or
        ``FoundDir``.
        """
        result = {}
        for fn in os.listdir(self.base_path):
            if self._ignore_file(fn):
                continue
            self._find_traverse(fn, result)
        return result

    def _ignore_file(self, fn):
        """
        True when ``fn`` is listed in ``self.ignore_paths``, or when
        ``self.ignore_hidden`` is set and its basename starts with
        ``'.'``.
        """
        if fn in self.ignore_paths:
            return True
        if self.ignore_hidden and os.path.basename(fn).startswith('.'):
            return True
        return False

    def _find_traverse(self, path, result):
        """
        Record ``path`` (relative to ``self.base_path``) in ``result``
        and recurse into it when it is a directory.
        """
        full = os.path.join(self.base_path, path)
        if os.path.isdir(full):
            result[path] = FoundDir(self.base_path, path)
            for fn in os.listdir(full):
                fn = os.path.join(path, fn)
                if self._ignore_file(fn):
                    continue
                self._find_traverse(fn, result)
        else:
            result[path] = FoundFile(self.base_path, path)

    def clear(self):
        """
        Delete all the files in the base directory, leaving an empty
        directory behind (created, with any missing parents, if it
        did not exist).
        """
        if os.path.exists(self.base_path):
            shutil.rmtree(self.base_path)
        # makedirs rather than mkdir: base_path may be nested inside
        # directories that do not exist yet.
        os.makedirs(self.base_path)

    def writefile(self, path, content=None,
                  frompath=None):
        """
        Write a file to the given path.  If ``content`` is given then
        that is written (text is encoded as UTF-8), and if
        ``frompath`` is given the contents of that file are appended.
        ``frompath`` is relative to ``self.template_path``.

        Returns a ``FoundFile`` for the written file.
        """
        full = os.path.join(self.base_path, path)
        parent = os.path.dirname(full)
        if parent and not os.path.exists(parent):
            os.makedirs(parent)
        if isinstance(content, six.text_type):
            # The file is opened in binary mode, which rejects text
            # on Python 3.
            content = content.encode('utf-8')
        with open(full, 'wb') as f:
            if content is not None:
                f.write(content)
            if frompath is not None:
                if self.template_path:
                    frompath = os.path.join(self.template_path, frompath)
                with open(frompath, 'rb') as f2:
                    f.write(f2.read())
        return FoundFile(self.base_path, path)
| |
class ProcResult(object):

    """
    Represents the results of running a command in
    `TestFileEnvironment
    <class-paste.fixture.TestFileEnvironment.html>`_.

    Attributes to pay particular attention to:

    ``stdout``, ``stderr``:
        What is produced

    ``files_created``, ``files_deleted``, ``files_updated``:
        Dictionaries mapping filenames (relative to the ``base_path``)
        to `FoundFile <class-paste.fixture.FoundFile.html>`_ or
        `FoundDir <class-paste.fixture.FoundDir.html>`_ objects.
    """

    def __init__(self, test_env, args, stdin, stdout, stderr,
                 returncode, files_before, files_after):
        self.test_env = test_env
        self.args = args
        self.stdin = stdin
        self.stdout = stdout
        self.stderr = stderr
        self.returncode = returncode
        self.files_before = files_before
        self.files_after = files_after
        self.files_deleted = {}
        self.files_updated = {}
        # Start from everything present afterwards and remove what
        # already existed before the run.
        self.files_created = files_after.copy()
        for path, f in files_before.items():
            if path not in files_after:
                self.files_deleted[path] = f
                continue
            del self.files_created[path]
            if f.mtime < files_after[path].mtime:
                self.files_updated[path] = files_after[path]

    def assert_no_error(self):
        """Raise ``AssertionError`` unless the return code is 0."""
        __tracebackhide__ = True
        assert self.returncode == 0, (
            "Script returned code: %s" % self.returncode)

    def assert_no_stderr(self):
        """
        Print the captured stderr and raise ``AssertionError`` if
        anything was written to it.
        """
        __tracebackhide__ = True
        if self.stderr:
            print('Error output:')
            print(self.stderr)
            raise AssertionError("stderr output not expected")

    @staticmethod
    def _as_text(output):
        """
        Return ``output`` in a form that can be joined with ``str``:
        byte strings that are not already ``str`` (i.e. on Python 3)
        are decoded as UTF-8, replacing undecodable bytes.
        """
        if isinstance(output, bytes) and not isinstance(output, str):
            return output.decode('utf-8', 'replace')
        return output

    def __str__(self):
        """
        Human-readable summary: command line, non-zero return code,
        captured stderr/stdout, and the created/deleted/updated files
        in sorted order.
        """
        s = ['Script result: %s' % ' '.join(self.args)]
        if self.returncode:
            s.append(' return code: %s' % self.returncode)
        if self.stderr:
            s.append('-- stderr: --------------------')
            s.append(self._as_text(self.stderr))
        if self.stdout:
            s.append('-- stdout: --------------------')
            s.append(self._as_text(self.stdout))
        for name, files in [
            ('created', self.files_created),
            ('deleted', self.files_deleted),
            ('updated', self.files_updated)]:
            if not files:
                continue
            s.append('-- %s: -------------------' % name)
            last = ''
            # sorted() instead of .items().sort(): dictionary views
            # have no sort() method on Python 3.
            for path, f in sorted(files.items()):
                t = ' %s' % _space_prefix(last, path, indent=4,
                                          include_sep=False)
                last = path
                # Directories report their size as 'N/A'.
                if f.size != 'N/A':
                    t += ' (%s bytes)' % f.size
                s.append(t)
        return '\n'.join(s)
| |
class FoundFile(object):

    """
    Represents a single file found as the result of a command.

    Has attributes:

    ``path``:
        The path of the file, relative to the ``base_path``

    ``full``:
        The full path

    ``stat``:
        The results of ``os.stat``.  Also ``mtime`` and ``size``
        contain the ``.st_mtime`` and ``st_size`` of the stat.

    ``bytes``:
        The contents of the file, read in binary mode on first
        access and cached afterwards.

    You may use the ``in`` operator with these objects (tested against
    the contents of the file), and the ``.mustcontain()`` method.
    """

    file = True
    dir = False

    def __init__(self, base_path, path):
        self.base_path = base_path
        self.path = path
        self.full = os.path.join(base_path, path)
        self.stat = os.stat(self.full)
        self.mtime = self.stat.st_mtime
        self.size = self.stat.st_size
        # Filled in lazily by the ``bytes`` property.
        self._bytes = None

    def bytes__get(self):
        if self._bytes is None:
            # The context manager closes the handle even when
            # read() raises.
            with open(self.full, 'rb') as f:
                self._bytes = f.read()
        return self._bytes
    bytes = property(bytes__get)

    def __contains__(self, s):
        return s in self.bytes

    def mustcontain(self, s):
        """
        Assert that ``s`` occurs in the file contents, printing the
        contents first when it does not.
        """
        __tracebackhide__ = True
        bytes_ = self.bytes
        if s not in bytes_:
            print('Could not find %r in:' % s)
            print(bytes_)
            assert s in bytes_

    def __repr__(self):
        return '<%s %s:%s>' % (
            self.__class__.__name__,
            self.base_path, self.path)
| |
class FoundDir(object):

    """
    Represents a directory found as the result of a command.

    Carries ``base_path``, the relative ``path`` and the joined
    ``full`` path.  ``size`` and ``mtime`` are the placeholder string
    ``'N/A'``.
    """

    file = False
    dir = True

    def __init__(self, base_path, path):
        self.base_path, self.path = base_path, path
        self.full = os.path.join(base_path, path)
        # Directories carry no meaningful size or mtime here.
        self.size = self.mtime = 'N/A'

    def __repr__(self):
        kind = self.__class__.__name__
        return '<%s %s:%s>' % (kind, self.base_path, self.path)
| |
def _popget(d, key, default=None):
    """
    Remove ``key`` from ``d`` and return its value; when the key is
    absent, leave ``d`` alone and return ``default``.
    """
    return d.pop(key, default)
| |
def _space_prefix(pref, full, sep=None, indent=None, include_sep=True):
    """
    Return ``full`` with the leading path components it shares with
    ``pref`` replaced by spaces.

    ``sep`` defaults to ``os.path.sep``.  Each shared component is
    replaced by ``indent`` spaces, or, when ``indent`` is ``None``,
    by as many spaces as the component plus one separator occupy.
    With ``include_sep`` a separator is placed between the padding
    and the remainder.  When nothing is shared, ``full`` comes back
    unchanged.
    """
    if sep is None:
        sep = os.path.sep
    pref_parts = pref.split(sep)
    full_parts = full.split(sep)
    # Count the leading components both paths have in common.
    shared = 0
    for ours, theirs in zip(pref_parts, full_parts):
        if ours != theirs:
            break
        shared += 1
    if not shared:
        return sep.join(full_parts)
    if indent is None:
        widths = [len(part) + len(sep) for part in full_parts[:shared]]
    else:
        widths = [indent] * shared
    blank = ''.join([' ' * width for width in widths])
    remainder = sep.join(full_parts[shared:])
    if include_sep:
        return blank + sep + remainder
    return blank + remainder
| |
def _make_pattern(pat):
    """
    Turn ``pat`` into a callable matcher.

    ``None`` is passed through.  Strings (binary or text) are
    compiled as regular expressions.  Anything with a ``search``
    attribute yields that attribute; any other callable is returned
    as-is.  Everything else trips an assertion.
    """
    if pat is None:
        return None
    string_types = (six.binary_type, six.text_type)
    matcher = re.compile(pat) if isinstance(pat, string_types) else pat
    if hasattr(matcher, 'search'):
        return matcher.search
    if callable(matcher):
        return matcher
    assert False, (
        "Cannot make callable pattern object out of %r" % matcher)
| |
def setup_module(module=None):
    """
    This is used by py.test if it is in the module, so you can
    import this directly.

    Use like::

        from paste.fixture import setup_module

    Emits a ``DeprecationWarning``.  ``module`` may be a module
    object or a module name; when omitted, the calling module is
    used.  If the module has a ``reset_state`` attribute it is
    called.
    """
    # Deprecated June 2008
    import warnings
    warnings.warn(
        'setup_module is deprecated',
        DeprecationWarning, 2)
    target = module
    if target is None:
        # The module we were called from must be the module...
        # (frame 1 is our direct caller, so this lookup has to stay
        # in this function)
        target = sys._getframe(1).f_globals['__name__']
    if isinstance(target, (six.binary_type, six.text_type)):
        target = sys.modules[target]
    if hasattr(target, 'reset_state'):
        target.reset_state()
| |
def html_unquote(v):
    """
    Unquote (some) entities in HTML.  (incomplete)

    Handles ``&nbsp;``, ``&gt;``, ``&lt;``, ``&quot;`` and ``&amp;``;
    any other entity is left untouched.  Returns the unquoted string.
    """
    # ``&amp;`` is replaced last so that an escaped entity such as
    # ``&amp;lt;`` becomes the literal text ``&lt;`` and is not
    # unquoted a second time.
    for ent, repl in [('&nbsp;', ' '), ('&gt;', '>'),
                      ('&lt;', '<'), ('&quot;', '"'),
                      ('&amp;', '&')]:
        v = v.replace(ent, repl)
    return v