| # (c) 2005 Ian Bicking, Clark C. Evans and contributors |
| # This module is part of the Python Paste Project and is released under |
| # the MIT License: http://www.opensource.org/licenses/mit-license.php |
| import time |
| import random |
| import os |
| import tempfile |
| try: |
| # Python 3 |
| from email.utils import parsedate_tz, mktime_tz |
| except ImportError: |
| # Python 2 |
| from rfc822 import parsedate_tz, mktime_tz |
| import six |
| |
| from paste import fileapp |
| from paste.fileapp import * |
| from paste.fixture import * |
| |
# NOTE(haypo): the alphabet is spelled out by hand instead of taken from
# string.letters, because the relative order of lower and upper case letters
# there changes the first time locale.setlocale() is called.
LETTERS = (
    'abcdefghijklmnopqrstuvwxyz'
    'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
)
| |
def test_data():
    """DataApp serves its bytes payload and picks up set_content() changes."""
    client = TestApp(DataApp(b'mycontent'))

    first = client.get("/")
    # The app was built from the payload alone, and the test expects the
    # generic binary content type together with the payload's byte length.
    assert first.header('content-type') == 'application/octet-stream'
    assert first.header('content-length') == '9'
    assert repr(first) == "<Response 200 OK 'mycontent'>"

    # Swap the payload on the wrapped application and request it again.
    client.app.set_content(b"bingles")
    second = client.get("/")
    assert repr(second) == "<Response 200 OK 'bingles'>"
| |
def test_cache():
    """Check the Cache-Control and Expires headers set up by cache_control().

    Each scenario builds a fresh DataApp, applies ``cache_control`` with the
    given arguments and inspects the headers of a GET response.
    """

    def build(*args, **kwargs):
        # A new app per scenario, so settings never carry over between cases.
        app = DataApp(b"SomeContent")
        app.cache_control(*args, **kwargs)
        return TestApp(app).get("/")

    def expires_of(res):
        # Parse the Expires header into seconds since the epoch.
        return mktime_tz(parsedate_tz(res.header('expires')))

    res = build()
    assert 'public' == res.header('cache-control')
    assert not res.header('expires', None)

    res = build(private=True)
    assert 'private' == res.header('cache-control')
    assert expires_of(res) < time.time()

    res = build(no_cache=True)
    assert 'no-cache' == res.header('cache-control')
    assert expires_of(res) < time.time()

    res = build(max_age=60, s_maxage=30)
    assert 'public, max-age=60, s-maxage=30' == res.header('cache-control')
    expires = expires_of(res)
    # Small tolerance around "now + max_age" for time elapsed in the request.
    assert expires > time.time()+58 and expires < time.time()+61

    res = build(private=True, max_age=60, no_transform=True, no_store=True)
    assert 'private, no-store, no-transform, max-age=60' == \
        res.header('cache-control')
    # Even with max_age set, a private response is expected to carry an
    # Expires value that lies in the past.
    assert expires_of(res) < time.time()
| |
def test_disposition():
    """Verify the headers produced by DataApp.content_disposition().

    For every argument combination the test checks both the
    Content-Disposition header and the Content-Type header that accompanies
    it, then confirms that asking for ``inline`` and ``attachment`` at the
    same time is rejected with an AssertionError.
    """

    def fetch(*args, **kwargs):
        data_app = DataApp(b"SomeContent")
        data_app.content_disposition(*args, **kwargs)
        return TestApp(data_app).get("/")

    def check(response, disposition, content_type):
        assert disposition == response.header('content-disposition')
        assert content_type == response.header('content-type')

    check(fetch(), 'attachment', 'application/octet-stream')
    check(fetch(filename="bing.txt"),
          'attachment; filename="bing.txt"', 'text/plain')
    check(fetch(inline=True), 'inline', 'application/octet-stream')
    # Only the base name of a full path is expected in the header.
    check(fetch(inline=True, filename="/some/path/bing.txt"),
          'inline; filename="bing.txt"', 'text/plain')

    rejected = False
    try:
        fetch(inline=True, attachment=True)
    except AssertionError:
        rejected = True
    assert rejected, "should be an exception"
| |
def test_modified():
    """Exercise If-Modified-Since handling of DataApp.

    Covers: an exact Last-Modified echo (304), the same value with a
    ``; length=...`` suffix appended (304), an unparsable value (400) and a
    timestamp that lies in the future (400).
    """
    # Local import: ``formatdate`` lives in email.utils on Python 2 and 3.
    from email.utils import formatdate

    harness = TestApp(DataApp(b'mycontent'))
    res = harness.get("/")
    assert "<Response 200 OK 'mycontent'>" == repr(res)
    last_modified = res.header('last-modified')

    res = harness.get("/", headers={'if-modified-since': last_modified})
    assert "<Response 304 Not Modified ''>" == repr(res)

    res = harness.get("/", headers={'if-modified-since': last_modified +
                                    '; length=1506'})
    assert "<Response 304 Not Modified ''>" == repr(res)

    res = harness.get("/", status=400,
                      headers={'if-modified-since': 'garbage'})
    assert 400 == res.status and b"ill-formed timestamp" in res.body

    # Build the "future" timestamp relative to the current clock instead of
    # hard-coding a calendar date, which would stop being in the future once
    # that date has passed and silently break this test.
    one_year = 365 * 24 * 60 * 60
    future = formatdate(time.time() + one_year, usegmt=True)
    res = harness.get("/", status=400,
                      headers={'if-modified-since': future})
    assert 400 == res.status and b"check your system clock" in res.body
| |
def test_file():
    """FileApp serves a file from disk and follows changes made to it.

    The file is grown twice: first by ten bytes (``app.content`` is expected
    to stay populated), then by more than ``fileapp.CACHE_SIZE`` bytes
    (``app.content`` is expected to be empty afterwards).
    """
    # Named ``path`` so the imported ``tempfile`` module is not shadowed.
    path = "test_fileapp.%s.txt" % (random.random())
    content = LETTERS * 20
    if six.PY3:
        content = content.encode('utf8')
    with open(path, "wb") as fp:
        fp.write(content)
    try:
        app = fileapp.FileApp(path)
        res = TestApp(app).get("/")
        assert len(content) == int(res.header('content-length'))
        assert 'text/plain' == res.header('content-type')
        assert content == res.body
        assert content == app.content  # this is cached
        # The value is not needed, but the lookup is kept so the header is
        # still requested (presumably this fails when it is absent).
        res.header('last-modified')

        print("updating", path)
        # Append in binary mode to match how the file was created; the
        # context manager closes the handle even if the write fails.
        with open(path, "ab") as fp:
            fp.write(b"0123456789")
        res = TestApp(app).get("/", headers={'Cache-Control': 'max-age=0'})
        assert len(content)+10 == int(res.header('content-length'))
        assert 'text/plain' == res.header('content-type')
        assert content + b"0123456789" == res.body
        assert app.content  # we are still cached

        with open(path, "ab") as fp:
            fp.write(b"X" * fileapp.CACHE_SIZE)  # exceed the cache size
            fp.write(b"YZ")
        res = TestApp(app).get("/", headers={'Cache-Control': 'max-age=0'})
        # original letters + 10 digits + CACHE_SIZE "X" bytes + "YZ"
        newsize = fileapp.CACHE_SIZE + len(content)+12
        assert newsize == int(res.header('content-length'))
        assert newsize == len(res.body)
        assert res.body.startswith(content) and res.body.endswith(b'XYZ')
        assert not app.content  # we are no longer cached
    finally:
        os.unlink(path)
| |
def test_dir():
    """DirectoryApp serves a regular file and refuses everything else.

    A scratch directory holding one file and one sub-directory is created.
    Requests for the root (including ``.``/``..`` variants) must yield 403,
    requests for missing names or the sub-directory must yield 404, and the
    file itself must be returned verbatim.
    """
    # Local import: only this test needs shutil.
    import shutil

    tmpdir = tempfile.mkdtemp()
    try:
        tmpfile = os.path.join(tmpdir, 'file')
        tmpsubdir = os.path.join(tmpdir, 'dir')
        with open(tmpfile, 'w') as fp:
            fp.write('abcd')
        os.mkdir(tmpsubdir)

        app = fileapp.DirectoryApp(tmpdir)
        for path in ['/', '', '//', '/..', '/.', '/../..']:
            assert TestApp(app).get(path, status=403).status == 403, ValueError(path)
        for path in ['/~', '/foo', '/dir', '/dir/']:
            assert TestApp(app).get(path, status=404).status == 404, ValueError(path)
        assert TestApp(app).get('/file').body == b'abcd'
    finally:
        # Remove the whole tree in one step so cleanup succeeds no matter how
        # far setup got, and cannot mask the original failure with a
        # "directory not empty" error.
        shutil.rmtree(tmpdir)
| |
| def _excercize_range(build,content): |
| # full content request, but using ranges' |
| res = build("bytes=0-%d" % (len(content)-1)) |
| assert res.header('accept-ranges') == 'bytes' |
| assert res.body == content |
| assert res.header('content-length') == str(len(content)) |
| res = build("bytes=-%d" % (len(content)-1)) |
| assert res.body == content |
| assert res.header('content-length') == str(len(content)) |
| res = build("bytes=0-") |
| assert res.body == content |
| assert res.header('content-length') == str(len(content)) |
| # partial content requests |
| res = build("bytes=0-9", status=206) |
| assert res.body == content[:10] |
| assert res.header('content-length') == '10' |
| res = build("bytes=%d-" % (len(content)-1), status=206) |
| assert res.body == b'Z' |
| assert res.header('content-length') == '1' |
| res = build("bytes=%d-%d" % (3,17), status=206) |
| assert res.body == content[3:18] |
| assert res.header('content-length') == '15' |
| |
def test_range():
    """Run the shared range checks against an in-memory DataApp."""
    payload = LETTERS * 5
    if six.PY3:
        payload = payload.encode('utf8')

    def build(spec, status=206):
        # A new app and harness for every request.
        client = TestApp(DataApp(payload))
        return client.get("/", headers={'Range': spec}, status=status)

    _excercize_range(build, payload)

    # An end offset beyond the content is expected to be answered with 416.
    overshoot = 'bytes=0-%d' % (len(payload) + 1)
    build(overshoot, 416)
| |
def test_file_range():
    """Run the shared range checks against FileApp with a large file.

    The file is made larger than ``fileapp.CACHE_SIZE``.  The checks are run
    once with the default ``fileapp.BLOCK_SIZE`` and then again with several
    small block sizes, including ones equal to and just below the length of
    the repeating ``LETTERS`` pattern.
    """
    # Remember the module-wide block size so it can be put back afterwards;
    # otherwise the last value set here would leak into later tests.
    saved_block_size = fileapp.BLOCK_SIZE
    # Named ``path`` so the imported ``tempfile`` module is not shadowed.
    path = "test_fileapp.%s.txt" % (random.random())
    content = LETTERS * (1+(fileapp.CACHE_SIZE // len(LETTERS)))
    if six.PY3:
        content = content.encode('utf8')
    assert len(content) > fileapp.CACHE_SIZE
    with open(path, "wb") as fp:
        fp.write(content)
    try:
        def build(range, status=206):
            app = fileapp.FileApp(path)
            return TestApp(app).get("/", headers={'Range': range},
                                    status=status)
        _excercize_range(build, content)
        for size in (13, len(LETTERS), len(LETTERS)-1):
            fileapp.BLOCK_SIZE = size
            _excercize_range(build, content)
    finally:
        fileapp.BLOCK_SIZE = saved_block_size
        os.unlink(path)
| |
def test_file_cache():
    """Conditional GETs against FileApp using ETag and Last-Modified.

    The validators are taken from an initial plain GET; each following
    request sends one conditional header and states the status it expects.
    """
    fixture = os.path.join(os.path.dirname(__file__),
                           'urlparser_data', 'secured.txt')
    client = TestApp(fileapp.FileApp(fixture))

    initial = client.get('/')
    etag = initial.header('ETag')
    last_mod = initial.header('Last-Modified')

    # (request headers, expected status), issued in this order.
    scenarios = [
        ({'If-Modified-Since': last_mod}, 304),
        ({'If-None-Match': etag}, 304),
        ({'If-None-Match': 'asdf'}, 200),
        ({'If-Modified-Since': 'Sat, 1 Jan 2005 12:00:00 GMT'}, 200),
        ({'If-Modified-Since': last_mod + '; length=100'}, 304),
        ({'If-Modified-Since': 'invalid date'}, 400),
    ]
    for request_headers, expected_status in scenarios:
        client.get('/', headers=request_headers, status=expected_status)
| |
def test_methods():
    """HEAD must mirror GET's headers with no body; POST must yield 405."""
    here = os.path.dirname(__file__)
    target = os.path.join(here, 'urlparser_data', 'secured.txt')
    client = TestApp(fileapp.FileApp(target))

    via_get = client.get('')
    # HEAD is issued by overriding REQUEST_METHOD on an otherwise normal get().
    via_head = client.get('', extra_environ={'REQUEST_METHOD': 'HEAD'})
    assert via_head.headers == via_get.headers
    assert not via_head.body

    client.post('', status=405)  # Method Not Allowed
| |