# blob: 8618d8d0cbce149512357b90ebb4b7ae53941c5a
import pytest
import asyncio
from jinja2 import Template, Environment, DictLoader
from jinja2.exceptions import TemplateNotFound, TemplatesNotFound, \
UndefinedError
def run(coro):
    """Run *coro* to completion on an event loop and return its result.

    The thread's current event loop is reused when one is available.  If
    no loop is set for the thread, or the current loop has already been
    closed, a new loop is created and installed as the current one so
    that later calls can reuse it.

    :param coro: the awaitable to drive to completion.
    :return: whatever *coro* returns.
    """
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # No current loop for this thread (e.g. it was reset to ``None``).
        loop = None
    if loop is None or loop.is_closed():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    return loop.run_until_complete(coro)
def test_basic_async():
    """A literal ``for`` loop renders as expected through ``render_async``."""
    template = Template(
        '{% for item in [1, 2, 3] %}[{{ item }}]{% endfor %}',
        enable_async=True,
    )

    async def render():
        return await template.render_async()

    assert run(render()) == '[1][2][3]'
def test_await_on_calls():
    """Coroutine and plain callables can be mixed in one expression."""
    template = Template(
        '{{ async_func() + normal_func() }}',
        enable_async=True,
    )

    async def forty_two():
        return 42

    def twenty_three():
        return 23

    async def render():
        context = {'async_func': forty_two, 'normal_func': twenty_three}
        return await template.render_async(**context)

    result = run(render())
    assert result == '65'
def test_await_on_calls_normal_render():
    """Same expression as ``test_await_on_calls`` but via plain ``render``."""
    template = Template(
        '{{ async_func() + normal_func() }}',
        enable_async=True,
    )

    async def forty_two():
        return 42

    def twenty_three():
        return 23

    context = {'async_func': forty_two, 'normal_func': twenty_three}
    result = template.render(**context)
    assert result == '65'
def test_await_and_macros():
    """A coroutine function called from inside a macro body is awaited."""
    source = (
        '{% macro foo(x) %}[{{ x }}][{{ async_func() }}]'
        '{% endmacro %}{{ foo(42) }}'
    )
    template = Template(source, enable_async=True)

    async def forty_two():
        return 42

    async def render():
        return await template.render_async(async_func=forty_two)

    result = run(render())
    assert result == '[42][42]'
def test_async_blocks():
    """A block and a ``self.foo()`` call to it both render with autoescape on."""
    template = Template(
        '{% block foo %}<Test>{% endblock %}{{ self.foo() }}',
        enable_async=True,
        autoescape=True,
    )

    async def render():
        return await template.render_async()

    result = run(render())
    assert result == '<Test><Test>'
def test_async_generate():
    """``generate()`` on an async-enabled template yields one chunk per item."""
    template = Template(
        '{% for x in [1, 2, 3] %}{{ x }}{% endfor %}',
        enable_async=True,
    )
    chunks = list(template.generate())
    assert chunks == ['1', '2', '3']
def test_async_iteration_in_templates():
    """A ``for`` loop in a template can consume an async generator."""
    template = Template(
        '{% for x in rng %}{{ x }}{% endfor %}',
        enable_async=True,
    )

    async def numbers():
        for value in [1, 2, 3]:
            yield value

    chunks = list(template.generate(rng=numbers()))
    assert chunks == ['1', '2', '3']
def test_async_iteration_in_templates_extended():
    """``loop.index0`` is available while iterating an async generator."""
    template = Template(
        '{% for x in rng %}{{ loop.index0 }}/{{ x }}{% endfor %}',
        enable_async=True,
    )

    async def numbers():
        for value in [1, 2, 3]:
            yield value

    chunks = list(template.generate(rng=numbers()))
    assert chunks == ['0/1', '1/2', '2/3']
@pytest.fixture
def test_env_async():
    """Async-enabled environment with three in-memory templates.

    The loader provides ``module``, ``header`` and ``o_printer``; the
    global ``bar`` is set to ``23``.
    """
    templates = {
        'module': '{% macro test() %}[{{ foo }}|{{ bar }}]{% endmacro %}',
        'header': '[{{ foo }}|{{ 23 }}]',
        'o_printer': '({{ o }})',
    }
    environment = Environment(
        loader=DictLoader(templates),
        enable_async=True,
    )
    environment.globals['bar'] = 23
    return environment
@pytest.mark.imports
class TestAsyncImports(object):
    """Checks for ``import`` / ``from ... import`` in an async environment."""

    def test_context_imports(self, test_env_async):
        """Imported macros see the render context only ``with context``."""
        cases = [
            ('{% import "module" as m %}{{ m.test() }}',
             '[|23]'),
            ('{% import "module" as m without context %}{{ m.test() }}',
             '[|23]'),
            ('{% import "module" as m with context %}{{ m.test() }}',
             '[42|23]'),
            ('{% from "module" import test %}{{ test() }}',
             '[|23]'),
            ('{% from "module" import test without context %}{{ test() }}',
             '[|23]'),
            ('{% from "module" import test with context %}{{ test() }}',
             '[42|23]'),
        ]
        for source, expected in cases:
            template = test_env_async.from_string(source)
            assert template.render(foo=42) == expected

    def test_trailing_comma(self, test_env_async):
        """These ``from ... import`` spellings must all compile."""
        sources = [
            '{% from "foo" import bar, baz with context %}',
            '{% from "foo" import bar, baz, with context %}',
            '{% from "foo" import bar, with context %}',
            '{% from "foo" import bar, with, context %}',
            '{% from "foo" import bar, with with context %}',
        ]
        for source in sources:
            # Only compilation is checked; nothing is rendered.
            test_env_async.from_string(source)

    def test_exports(self, test_env_async):
        """Only public top-level names are exposed on the template module."""
        source = (
            '\n'
            '{% macro toplevel() %}...{% endmacro %}\n'
            '{% macro __private() %}...{% endmacro %}\n'
            '{% set variable = 42 %}\n'
            '{% for item in [1] %}\n'
            '{% macro notthere() %}{% endmacro %}\n'
            '{% endfor %}\n'
        )
        template = test_env_async.from_string(source)
        m = run(template._get_default_module_async())
        assert run(m.toplevel()) == '...'
        # The underscore-prefixed macro defined above must not be exported;
        # previously only a name that was never defined was checked.
        assert not hasattr(m, '__private')
        assert not hasattr(m, '__missing')
        assert m.variable == 42
        # Macros defined inside a loop are not top-level and are not exported.
        assert not hasattr(m, 'notthere')
@pytest.mark.imports
@pytest.mark.includes
class TestAsyncIncludes:
    """Checks for the ``include`` tag in an async environment."""

    def test_context_include(self, test_env_async):
        """Includes get the context unless ``without context`` is given."""
        cases = [
            ('{% include "header" %}', '[42|23]'),
            ('{% include "header" with context %}', '[42|23]'),
            ('{% include "header" without context %}', '[|23]'),
        ]
        for source, expected in cases:
            template = test_env_async.from_string(source)
            assert template.render(foo=42) == expected

    def test_choice_includes(self, test_env_async):
        """A list of candidates includes the first template that exists."""
        from_string = test_env_async.from_string

        template = from_string('{% include ["missing", "header"] %}')
        assert template.render(foo=42) == '[42|23]'

        template = from_string(
            '{% include ["missing", "missing2"] ignore missing %}'
        )
        assert template.render(foo=42) == ''

        template = from_string('{% include ["missing", "missing2"] %}')
        with pytest.raises(TemplateNotFound):
            template.render()
        try:
            template.render()
        except TemplatesNotFound as error:
            assert error.templates == ['missing', 'missing2']
            assert error.name == 'missing2'
        else:
            assert False, 'thou shalt raise'

        def check_renders_header(template, **context):
            context['foo'] = 42
            assert template.render(context) == '[42|23]'

        # (template source, extra context) pairs, checked in this order.
        cases = [
            ('{% include ["missing", "header"] %}', {}),
            ('{% include x %}', {'x': ['missing', 'header']}),
            ('{% include [x, "header"] %}', {'x': 'missing'}),
            ('{% include x %}', {'x': 'header'}),
            ('{% include x %}', {'x': 'header'}),
            ('{% include [x] %}', {'x': 'header'}),
        ]
        for source, extra in cases:
            check_renders_header(from_string(source), **extra)

    def test_include_ignoring_missing(self, test_env_async):
        """``ignore missing`` turns a missing include into empty output."""
        template = test_env_async.from_string('{% include "missing" %}')
        with pytest.raises(TemplateNotFound):
            template.render()

        for extra in ('', 'with context', 'without context'):
            source = '{%% include "missing" ignore missing %s %%}' % extra
            template = test_env_async.from_string(source)
            assert template.render() == ''

    def test_context_include_with_overrides(self, test_env_async):
        """An included template sees the enclosing loop variable."""
        # NOTE(review): this environment is created without
        # ``enable_async=True`` and the ``test_env_async`` fixture is not
        # used, so this check presumably does not exercise the async code
        # path -- confirm whether that is intended.
        templates = {
            'main': "{% for item in [1, 2, 3] %}{% include 'item' %}{% endfor %}",
            'item': "{{ item }}",
        }
        env = Environment(loader=DictLoader(templates))
        rendered = env.get_template("main").render()
        assert rendered == "123"

    def test_unoptimized_scopes(self, test_env_async):
        """An include inside nested macros sees the outer macro's argument."""
        source = (
            '\n'
            '{% macro outer(o) %}\n'
            '{% macro inner() %}\n'
            '{% include "o_printer" %}\n'
            '{% endmacro %}\n'
            '{{ inner() }}\n'
            '{% endmacro %}\n'
            '{{ outer("FOO") }}\n'
        )
        template = test_env_async.from_string(source)
        assert template.render().strip() == '(FOO)'
@pytest.mark.core_tags
@pytest.mark.for_loop
class TestAsyncForLoop:
    """Checks for the ``for`` tag in an async environment."""

    def test_simple(self, test_env_async):
        """Every item of a list is rendered in order."""
        template = test_env_async.from_string(
            '{% for item in seq %}{{ item }}{% endfor %}'
        )
        rendered = template.render(seq=list(range(10)))
        assert rendered == '0123456789'

    def test_else(self, test_env_async):
        """The ``else`` branch renders when ``seq`` is not supplied."""
        template = test_env_async.from_string(
            '{% for item in seq %}XXX{% else %}...{% endfor %}'
        )
        rendered = template.render()
        assert rendered == '...'

    def test_empty_blocks(self, test_env_async):
        """Empty loop and ``else`` bodies produce no output."""
        template = test_env_async.from_string(
            '<{% for item in seq %}{% else %}{% endfor %}>'
        )
        rendered = template.render()
        assert rendered == '<>'

    def test_context_vars(self, test_env_async):
        """``loop`` attributes are correct for several kinds of iterable."""
        source = (
            '{% for item in seq -%}\n'
            '{{ loop.index }}|{{ loop.index0 }}|{{ loop.revindex }}|{{\n'
            'loop.revindex0 }}|{{ loop.first }}|{{ loop.last }}|{{\n'
            'loop.length }}###{% endfor %}'
        )
        values = [42, 24]
        iterables = [
            values,
            iter(values),
            reversed(values),
            (_ for _ in values),
        ]
        for seq in iterables:
            template = test_env_async.from_string(source)
            first, second, _ = template.render(seq=seq).split('###')
            (index_a, index0_a, revindex_a, revindex0_a,
             first_a, last_a, length_a) = first.split('|')
            (index_b, index0_b, revindex_b, revindex0_b,
             first_b, last_b, length_b) = second.split('|')

            assert (int(index_a), int(index_b)) == (1, 2)
            assert (int(index0_a), int(index0_b)) == (0, 1)
            assert (int(revindex_a), int(revindex_b)) == (2, 1)
            assert (int(revindex0_a), int(revindex0_b)) == (1, 0)
            assert (first_a, first_b) == ('True', 'False')
            assert (last_a, last_b) == ('False', 'True')
            assert (length_a, length_b) == ('2', '2')

    def test_cycling(self, test_env_async):
        """``loop.cycle`` alternates through its arguments."""
        source = (
            "{% for item in seq %}{{\n"
            "loop.cycle('<1>', '<2>') }}{% endfor %}{%\n"
            "for item in seq %}{{ loop.cycle(*through) }}{% endfor %}"
        )
        template = test_env_async.from_string(source)
        rendered = template.render(
            seq=list(range(4)),
            through=('<1>', '<2>'),
        )
        assert rendered == '<1><2>' * 4

    def test_scope(self, test_env_async):
        """The loop variable renders as empty after the loop has ended."""
        template = test_env_async.from_string(
            '{% for item in seq %}{% endfor %}{{ item }}'
        )
        rendered = template.render(seq=list(range(10)))
        assert not rendered

    def test_varlen(self, test_env_async):
        """A plain generator can be iterated."""
        def numbers():
            yield from range(5)

        template = test_env_async.from_string(
            '{% for item in iter %}{{ item }}{% endfor %}'
        )
        rendered = template.render(iter=numbers())
        assert rendered == '01234'

    def test_noniter(self, test_env_async):
        """Looping over ``none`` raises ``TypeError``."""
        template = test_env_async.from_string(
            '{% for item in none %}...{% endfor %}'
        )
        with pytest.raises(TypeError):
            template.render()

    def test_recursive(self, test_env_async):
        """``loop(...)`` recurses into nested sequences."""
        source = (
            '{% for item in seq recursive -%}\n'
            '[{{ item.a }}{% if item.b %}<{{ loop(item.b) }}>{% endif %}]\n'
            '{%- endfor %}'
        )
        tree = [
            {'a': 1, 'b': [{'a': 1}, {'a': 2}]},
            {'a': 2, 'b': [{'a': 1}, {'a': 2}]},
            {'a': 3, 'b': [{'a': 'a'}]},
        ]
        rendered = test_env_async.from_string(source).render(seq=tree)
        assert rendered == '[1<[1][2]>][2<[1][2]>][3<[a]>]'

    def test_recursive_depth0(self, test_env_async):
        """``loop.depth0`` starts at 0 and is 1 in the nested loop."""
        source = (
            '{% for item in seq recursive -%}\n'
            '[{{ loop.depth0 }}:{{ item.a }}{% if item.b %}'
            '<{{ loop(item.b) }}>{% endif %}]\n'
            '{%- endfor %}'
        )
        tree = [
            {'a': 1, 'b': [{'a': 1}, {'a': 2}]},
            {'a': 2, 'b': [{'a': 1}, {'a': 2}]},
            {'a': 3, 'b': [{'a': 'a'}]},
        ]
        rendered = test_env_async.from_string(source).render(seq=tree)
        assert rendered == '[0:1<[1:1][1:2]>][0:2<[1:1][1:2]>][0:3<[1:a]>]'

    def test_recursive_depth(self, test_env_async):
        """``loop.depth`` starts at 1 and is 2 in the nested loop."""
        source = (
            '{% for item in seq recursive -%}\n'
            '[{{ loop.depth }}:{{ item.a }}{% if item.b %}'
            '<{{ loop(item.b) }}>{% endif %}]\n'
            '{%- endfor %}'
        )
        tree = [
            {'a': 1, 'b': [{'a': 1}, {'a': 2}]},
            {'a': 2, 'b': [{'a': 1}, {'a': 2}]},
            {'a': 3, 'b': [{'a': 'a'}]},
        ]
        rendered = test_env_async.from_string(source).render(seq=tree)
        assert rendered == '[1:1<[2:1][2:2]>][1:2<[2:1][2:2]>][1:3<[2:a]>]'

    def test_looploop(self, test_env_async):
        """An outer ``loop`` saved with ``set`` is usable in the inner loop."""
        source = (
            '{% for row in table %}\n'
            '{%- set rowloop = loop -%}\n'
            '{% for cell in row -%}\n'
            '[{{ rowloop.index }}|{{ loop.index }}]\n'
            '{%- endfor %}\n'
            '{%- endfor %}'
        )
        rendered = test_env_async.from_string(source).render(
            table=['ab', 'cd'],
        )
        assert rendered == '[1|1][1|2][2|1][2|2]'

    def test_reversed_bug(self, test_env_async):
        """``loop.last`` is correct when iterating a ``reversed`` object."""
        source = (
            '{% for i in items %}{{ i }}'
            '{% if not loop.last %},{% endif %}'
            '{% endfor %}'
        )
        template = test_env_async.from_string(source)
        rendered = template.render(items=reversed([3, 2, 1]))
        assert rendered == '1,2,3'

    def test_loop_errors(self, test_env_async):
        """``loop`` is undefined in the filter and empty in ``else``."""
        in_filter = test_env_async.from_string(
            '{% for item in [1] if loop.index\n'
            '== 0 %}...{% endfor %}'
        )
        with pytest.raises(UndefinedError):
            in_filter.render()

        in_else = test_env_async.from_string(
            '{% for item in [] %}...{% else\n'
            '%}{{ loop }}{% endfor %}'
        )
        assert in_else.render() == ''

    def test_loop_filter(self, test_env_async):
        """An inline ``if`` filters items; ``loop.index`` counts kept items."""
        plain = test_env_async.from_string(
            '{% for item in range(10) if item is even %}'
            '[{{ item }}]{% endfor %}'
        )
        assert plain.render() == '[0][2][4][6][8]'

        indexed = test_env_async.from_string(
            '\n'
            '{%- for item in range(10) if item is even %}[{{\n'
            'loop.index }}:{{ item }}]{% endfor %}'
        )
        assert indexed.render() == '[1:0][2:2][3:4][4:6][5:8]'

    def test_scoped_special_var(self, test_env_async):
        """Each nesting level has its own ``loop.first``."""
        source = (
            '{% for s in seq %}[{{ loop.first }}'
            '{% for c in s %}|{{ loop.first }}{% endfor %}'
            ']{% endfor %}'
        )
        template = test_env_async.from_string(source)
        rendered = template.render(seq=('ab', 'cd'))
        assert rendered == '[True|True|False][False|True|False]'

    def test_scoped_loop_var(self, test_env_async):
        """``loop`` refers to the loop whose body it appears in."""
        outer_first = test_env_async.from_string(
            '{% for x in seq %}{{ loop.first }}'
            '{% for y in seq %}{% endfor %}'
            '{% endfor %}'
        )
        assert outer_first.render(seq='ab') == 'TrueFalse'

        inner_first = test_env_async.from_string(
            '{% for x in seq %}'
            '{% for y in seq %}{{ loop.first }}{% endfor %}'
            '{% endfor %}'
        )
        assert inner_first.render(seq='ab') == 'TrueFalseTrueFalse'

    def test_recursive_empty_loop_iter(self, test_env_async):
        """A recursive loop over an empty list renders nothing."""
        source = (
            '\n'
            '{%- for item in foo recursive -%}{%- endfor -%}\n'
        )
        template = test_env_async.from_string(source)
        rendered = template.render({'foo': []})
        assert rendered == ''

    def test_call_in_loop(self, test_env_async):
        """A ``call`` block inside a loop sees the loop variable."""
        source = (
            '\n'
            '{%- macro do_something() -%}\n'
            '[{{ caller() }}]\n'
            '{%- endmacro %}\n'
            '{%- for i in [1, 2, 3] %}\n'
            '{%- call do_something() -%}\n'
            '{{ i }}\n'
            '{%- endcall %}\n'
            '{%- endfor -%}\n'
        )
        template = test_env_async.from_string(source)
        assert template.render() == '[1][2][3]'

    def test_scoping_bug(self, test_env_async):
        """A macro may share its name with an earlier loop variable."""
        source = (
            '\n'
            '{%- for item in foo %}...{{ item }}...{% endfor %}\n'
            '{%- macro item(a) %}...{{ a }}...{% endmacro %}\n'
            '{{- item(2) -}}\n'
        )
        template = test_env_async.from_string(source)
        rendered = template.render(foo=(1,))
        assert rendered == '...1......2...'

    def test_unpacking(self, test_env_async):
        """Loop targets can unpack each item into several names."""
        source = (
            '{% for a, b, c in [[1, 2, 3]] %}'
            '{{ a }}|{{ b }}|{{ c }}'
            '{% endfor %}'
        )
        template = test_env_async.from_string(source)
        assert template.render() == '1|2|3'

    def test_recursive_loop_filter(self, test_env_async):
        """The inline filter also applies to recursive ``loop(...)`` calls."""
        source = (
            '\n'
            '<?xml version="1.0" encoding="UTF-8"?>\n'
            '<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">\n'
            '{%- for page in [site.root] if page.url != this recursive %}\n'
            '<url><loc>{{ page.url }}</loc></url>\n'
            '{{- loop(page.children) }}\n'
            '{%- endfor %}\n'
            '</urlset>\n'
        )
        site = {
            'root': {
                'url': '/',
                'children': [
                    {'url': '/foo'},
                    {'url': '/bar'},
                ],
            },
        }
        sitemap = test_env_async.from_string(source).render(
            this='/foo',
            site=site,
        )
        # Compare non-blank lines only, ignoring surrounding whitespace.
        stripped = (line.strip() for line in sitemap.splitlines())
        lines = [line for line in stripped if line]
        assert lines == [
            '<?xml version="1.0" encoding="UTF-8"?>',
            '<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">',
            '<url><loc>/</loc></url>',
            '<url><loc>/bar</loc></url>',
            '</urlset>',
        ]

    def test_nonrecursive_loop_filter(self, test_env_async):
        """The inline filter drops the page whose URL equals ``this``."""
        source = (
            '\n'
            '<?xml version="1.0" encoding="UTF-8"?>\n'
            '<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">\n'
            '{%- for page in items if page.url != this %}\n'
            '<url><loc>{{ page.url }}</loc></url>\n'
            '{%- endfor %}\n'
            '</urlset>\n'
        )
        pages = [
            {'url': '/'},
            {'url': '/foo'},
            {'url': '/bar'},
        ]
        sitemap = test_env_async.from_string(source).render(
            this='/foo',
            items=pages,
        )
        # Compare non-blank lines only, ignoring surrounding whitespace.
        stripped = (line.strip() for line in sitemap.splitlines())
        lines = [line for line in stripped if line]
        assert lines == [
            '<?xml version="1.0" encoding="UTF-8"?>',
            '<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">',
            '<url><loc>/</loc></url>',
            '<url><loc>/bar</loc></url>',
            '</urlset>',
        ]

    def test_bare_async(self, test_env_async):
        """A template consisting only of ``extends`` renders its parent."""
        template = test_env_async.from_string('{% extends "header" %}')
        rendered = template.render(foo=42)
        assert rendered == '[42|23]'