| import pytest |
| import asyncio |
| |
| from jinja2 import Template, Environment, DictLoader |
| from jinja2.exceptions import TemplateNotFound, TemplatesNotFound, \ |
| UndefinedError |
| |
| |
def run(coro):
    """Run *coro* to completion on this thread's event loop and return its result.

    The current event loop is reused when one is available.  If
    ``asyncio.get_event_loop()`` raises ``RuntimeError`` (no current loop
    is set for the thread) or hands back a loop that has already been
    closed, a new loop is created and installed as the current one so that
    this and later calls keep working.

    :param coro: an awaitable accepted by ``loop.run_until_complete``.
    :return: whatever the awaitable evaluates to.
    """
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # No current event loop for this thread.
        loop = None

    if loop is None or loop.is_closed():
        loop = asyncio.new_event_loop()
        # Install it so code that looks up the current loop sees the same one.
        asyncio.set_event_loop(loop)

    return loop.run_until_complete(coro)
| |
| |
def test_basic_async():
    """A plain for-loop template renders through ``render_async``."""
    template = Template(
        '{% for item in [1, 2, 3] %}[{{ item }}]{% endfor %}',
        enable_async=True,
    )

    rendered = run(template.render_async())

    assert rendered == '[1][2][3]'
| |
| |
def test_await_on_calls():
    """Mixing a coroutine function and a plain function in one expression.

    The template adds the results of both calls; rendering through
    ``render_async`` is expected to yield ``'65'`` (42 + 23).
    """
    async def async_func():
        return 42

    def normal_func():
        return 23

    template = Template(
        '{{ async_func() + normal_func() }}',
        enable_async=True,
    )
    context = {'async_func': async_func, 'normal_func': normal_func}

    rendered = run(template.render_async(**context))

    assert rendered == '65'
| |
| |
def test_await_on_calls_normal_render():
    """Same expression as ``test_await_on_calls`` but via synchronous ``render``.

    The template is still created with ``enable_async=True``; only the
    entry point differs.
    """
    async def async_func():
        return 42

    def normal_func():
        return 23

    template = Template(
        '{{ async_func() + normal_func() }}',
        enable_async=True,
    )
    context = {'async_func': async_func, 'normal_func': normal_func}

    rendered = template.render(**context)

    assert rendered == '65'
| |
| |
def test_await_and_macros():
    """A coroutine function called from inside a macro body is rendered."""
    async def async_func():
        return 42

    source = (
        '{% macro foo(x) %}[{{ x }}][{{ async_func() }}]'
        '{% endmacro %}{{ foo(42) }}'
    )
    template = Template(source, enable_async=True)

    rendered = run(template.render_async(async_func=async_func))

    assert rendered == '[42][42]'
| |
| |
def test_async_blocks():
    """A block rendered in place and again through ``self.foo()``.

    Autoescaping is switched on; the expected output still contains the
    literal ``<Test>`` markup twice.
    """
    template = Template(
        '{% block foo %}<Test>{% endblock %}{{ self.foo() }}',
        enable_async=True,
        autoescape=True,
    )

    rendered = run(template.render_async())

    assert rendered == '<Test><Test>'
| |
| |
def test_async_generate():
    """``generate()`` on an async-enabled template yields one chunk per item."""
    template = Template(
        '{% for x in [1, 2, 3] %}{{ x }}{% endfor %}',
        enable_async=True,
    )

    chunks = list(template.generate())

    assert chunks == ['1', '2', '3']
| |
| |
def test_async_iteration_in_templates():
    """A for-loop in the template iterates over an async generator."""
    async def async_iterator():
        for value in (1, 2, 3):
            yield value

    template = Template(
        '{% for x in rng %}{{ x }}{% endfor %}',
        enable_async=True,
    )

    chunks = list(template.generate(rng=async_iterator()))

    assert chunks == ['1', '2', '3']
| |
| |
def test_async_iteration_in_templates_extended():
    """Looping over an async generator while also reading ``loop.index0``."""
    async def async_iterator():
        for value in (1, 2, 3):
            yield value

    template = Template(
        '{% for x in rng %}{{ loop.index0 }}/{{ x }}{% endfor %}',
        enable_async=True,
    )

    chunks = list(template.generate(rng=async_iterator()))

    assert chunks == ['0/1', '1/2', '2/3']
| |
| |
@pytest.fixture
def test_env_async():
    """Provide an async-enabled environment backed by in-memory templates.

    The loader serves three templates (``module``, ``header`` and
    ``o_printer``) and the environment exposes a global ``bar`` set to 23.
    """
    templates = {
        'module': '{% macro test() %}[{{ foo }}|{{ bar }}]{% endmacro %}',
        'header': '[{{ foo }}|{{ 23 }}]',
        'o_printer': '({{ o }})',
    }
    environment = Environment(loader=DictLoader(templates), enable_async=True)
    environment.globals['bar'] = 23
    return environment
| |
| |
@pytest.mark.imports
class TestAsyncImports(object):
    """Import-statement tests run against the async-enabled environment."""

    def test_context_imports(self, test_env_async):
        """Check which import forms let the ``module`` macro see ``foo``.

        Every template is rendered with ``foo=42``; only the
        ``with context`` variants are expected to output it.  The ``23``
        comes from the environment global ``bar``.
        """
        # (template source, expected output)
        cases = [
            ('{% import "module" as m %}{{ m.test() }}',
             '[|23]'),
            ('{% import "module" as m without context %}{{ m.test() }}',
             '[|23]'),
            ('{% import "module" as m with context %}{{ m.test() }}',
             '[42|23]'),
            ('{% from "module" import test %}{{ test() }}',
             '[|23]'),
            ('{% from "module" import test without context %}{{ test() }}',
             '[|23]'),
            ('{% from "module" import test with context %}{{ test() }}',
             '[42|23]'),
        ]
        for source, expected in cases:
            t = test_env_async.from_string(source)
            assert t.render(foo=42) == expected, source

    def test_trailing_comma(self, test_env_async):
        """All of these ``from ... import`` spellings must compile.

        The templates are only parsed and compiled, never rendered, so the
        missing ``foo`` template is irrelevant.
        """
        sources = [
            '{% from "foo" import bar, baz with context %}',
            '{% from "foo" import bar, baz, with context %}',
            '{% from "foo" import bar, with context %}',
            '{% from "foo" import bar, with, context %}',
            '{% from "foo" import bar, with with context %}',
        ]
        for source in sources:
            test_env_async.from_string(source)

    def test_exports(self, test_env_async):
        """Check what the async default module of a template exposes."""
        template = test_env_async.from_string('''
            {% macro toplevel() %}...{% endmacro %}
            {% macro __private() %}...{% endmacro %}
            {% set variable = 42 %}
            {% for item in [1] %}
                {% macro notthere() %}{% endmacro %}
            {% endfor %}
        ''')
        m = run(template._get_default_module_async())

        # Macros on the async module are coroutines and have to be driven.
        assert run(m.toplevel()) == '...'
        # The underscore-prefixed macro defined above must stay private;
        # the original check only looked for a name that is never defined.
        assert not hasattr(m, '__private')
        assert not hasattr(m, '__missing')
        assert m.variable == 42
        # Defined inside the for-loop, i.e. not at the top level.
        assert not hasattr(m, 'notthere')
| |
| |
@pytest.mark.imports
@pytest.mark.includes
class TestAsyncIncludes(object):
    """Include-statement tests run against async-enabled environments."""

    def test_context_include(self, test_env_async):
        """Included templates see ``foo`` unless ``without context`` is used."""
        cases = [
            ('{% include "header" %}', '[42|23]'),
            ('{% include "header" with context %}', '[42|23]'),
            ('{% include "header" without context %}', '[|23]'),
        ]
        for source, expected in cases:
            t = test_env_async.from_string(source)
            assert t.render(foo=42) == expected, source

    def test_choice_includes(self, test_env_async):
        """Includes given a list of candidate template names."""
        t = test_env_async.from_string('{% include ["missing", "header"] %}')
        assert t.render(foo=42) == '[42|23]'

        t = test_env_async.from_string(
            '{% include ["missing", "missing2"] ignore missing %}'
        )
        assert t.render(foo=42) == ''

        t = test_env_async.from_string('{% include ["missing", "missing2"] %}')
        pytest.raises(TemplateNotFound, t.render)
        # The more specific exception carries every name that was tried.
        with pytest.raises(TemplatesNotFound) as excinfo:
            t.render()
        assert excinfo.value.templates == ['missing', 'missing2']
        assert excinfo.value.name == 'missing2'

        def assert_renders_header(template, **ctx):
            # Render with foo=42 plus the extra context; the "header"
            # template must be the one that ends up included.
            ctx['foo'] = 42
            assert template.render(ctx) == '[42|23]'

        # (template source, extra context)
        cases = [
            ('{% include ["missing", "header"] %}', {}),
            ('{% include x %}', {'x': ['missing', 'header']}),
            ('{% include [x, "header"] %}', {'x': 'missing'}),
            ('{% include x %}', {'x': 'header'}),
            ('{% include x %}', {'x': 'header'}),
            ('{% include [x] %}', {'x': 'header'}),
        ]
        for source, extra in cases:
            t = test_env_async.from_string(source)
            assert_renders_header(t, **extra)

    def test_include_ignoring_missing(self, test_env_async):
        """``ignore missing`` turns a missing include into empty output."""
        t = test_env_async.from_string('{% include "missing" %}')
        pytest.raises(TemplateNotFound, t.render)
        for extra in '', 'with context', 'without context':
            t = test_env_async.from_string('{% include "missing" ignore missing ' +
                                           extra + ' %}')
            assert t.render() == ''

    def test_context_include_with_overrides(self, test_env_async):
        """The loop variable of the including template reaches the include.

        A dedicated environment is needed because the fixture's loader does
        not provide ``main``/``item``.  It is created with
        ``enable_async=True`` so that this test, like the rest of the
        class, exercises the async code path.
        """
        env = Environment(loader=DictLoader(dict(
            main="{% for item in [1, 2, 3] %}{% include 'item' %}{% endfor %}",
            item="{{ item }}"
        )), enable_async=True)
        assert env.get_template("main").render() == "123"

    def test_unoptimized_scopes(self, test_env_async):
        """An include inside a nested macro sees the outer macro's argument."""
        t = test_env_async.from_string("""
            {% macro outer(o) %}
            {% macro inner() %}
            {% include "o_printer" %}
            {% endmacro %}
            {{ inner() }}
            {% endmacro %}
            {{ outer("FOO") }}
        """)
        assert t.render().strip() == '(FOO)'
| |
| |
@pytest.mark.core_tags
@pytest.mark.for_loop
class TestAsyncForLoop(object):
    """For-loop tests run against the async-enabled environment."""

    def test_simple(self, test_env_async):
        """Items of a list are rendered in order."""
        template = test_env_async.from_string(
            '{% for item in seq %}{{ item }}{% endfor %}')
        rendered = template.render(seq=list(range(10)))
        assert rendered == '0123456789'

    def test_else(self, test_env_async):
        """The else branch is rendered when ``seq`` is not supplied."""
        template = test_env_async.from_string(
            '{% for item in seq %}XXX{% else %}...{% endfor %}')
        rendered = template.render()
        assert rendered == '...'

    def test_empty_blocks(self, test_env_async):
        """Empty loop body and empty else body produce no output."""
        template = test_env_async.from_string(
            '<{% for item in seq %}{% else %}{% endfor %}>')
        rendered = template.render()
        assert rendered == '<>'

    def test_context_vars(self, test_env_async):
        """``loop.*`` attributes for a list, two iterators and a generator."""
        source = '''{% for item in seq -%}
            {{ loop.index }}|{{ loop.index0 }}|{{ loop.revindex }}|{{
                loop.revindex0 }}|{{ loop.first }}|{{ loop.last }}|{{
               loop.length }}###{% endfor %}'''
        values = [42, 24]
        iterables = [values, iter(values), reversed(values),
                     (_ for _ in values)]
        for seq in iterables:
            template = test_env_async.from_string(source)
            # Two iterations, each terminated by '###'.
            first_row, second_row, _ = template.render(seq=seq).split('###')
            (index_a, index0_a, revindex_a, revindex0_a, is_first_a,
             is_last_a, length_a) = first_row.split('|')
            (index_b, index0_b, revindex_b, revindex0_b, is_first_b,
             is_last_b, length_b) = second_row.split('|')

            assert (int(index_a), int(index_b)) == (1, 2)
            assert (int(index0_a), int(index0_b)) == (0, 1)
            assert (int(revindex_a), int(revindex_b)) == (2, 1)
            assert (int(revindex0_a), int(revindex0_b)) == (1, 0)
            assert (is_first_a, is_first_b) == ('True', 'False')
            assert (is_last_a, is_last_b) == ('False', 'True')
            assert (length_a, length_b) == ('2', '2')

    def test_cycling(self, test_env_async):
        """``loop.cycle`` with literal arguments and with an unpacked tuple."""
        template = test_env_async.from_string('''{% for item in seq %}{{
            loop.cycle('<1>', '<2>') }}{% endfor %}{%
            for item in seq %}{{ loop.cycle(*through) }}{% endfor %}''')
        rendered = template.render(seq=list(range(4)),
                                   through=('<1>', '<2>'))
        assert rendered == '<1><2>' * 4

    def test_scope(self, test_env_async):
        """The loop variable renders as nothing after the loop."""
        template = test_env_async.from_string(
            '{% for item in seq %}{% endfor %}{{ item }}')
        rendered = template.render(seq=list(range(10)))
        assert not rendered

    def test_varlen(self, test_env_async):
        """Looping over a plain generator."""
        def five_items():
            yield from range(5)

        template = test_env_async.from_string(
            '{% for item in iter %}{{ item }}{% endfor %}')
        rendered = template.render(iter=five_items())
        assert rendered == '01234'

    def test_noniter(self, test_env_async):
        """Looping over ``none`` raises ``TypeError``."""
        template = test_env_async.from_string(
            '{% for item in none %}...{% endfor %}')
        with pytest.raises(TypeError):
            template.render()

    def test_recursive(self, test_env_async):
        """Recursive loop over a two-level structure."""
        template = test_env_async.from_string('''{% for item in seq recursive -%}
            [{{ item.a }}{% if item.b %}<{{ loop(item.b) }}>{% endif %}]
        {%- endfor %}''')
        tree = [
            dict(a=1, b=[dict(a=1), dict(a=2)]),
            dict(a=2, b=[dict(a=1), dict(a=2)]),
            dict(a=3, b=[dict(a='a')]),
        ]
        rendered = template.render(seq=tree)
        assert rendered == '[1<[1][2]>][2<[1][2]>][3<[a]>]'

    def test_recursive_depth0(self, test_env_async):
        """``loop.depth0`` in a recursive loop."""
        template = test_env_async.from_string('''{% for item in seq recursive -%}
            [{{ loop.depth0 }}:{{ item.a }}{% if item.b %}<{{ loop(item.b) }}>{% endif %}]
        {%- endfor %}''')
        tree = [
            dict(a=1, b=[dict(a=1), dict(a=2)]),
            dict(a=2, b=[dict(a=1), dict(a=2)]),
            dict(a=3, b=[dict(a='a')]),
        ]
        rendered = template.render(seq=tree)
        assert rendered == '[0:1<[1:1][1:2]>][0:2<[1:1][1:2]>][0:3<[1:a]>]'

    def test_recursive_depth(self, test_env_async):
        """``loop.depth`` in a recursive loop."""
        template = test_env_async.from_string('''{% for item in seq recursive -%}
            [{{ loop.depth }}:{{ item.a }}{% if item.b %}<{{ loop(item.b) }}>{% endif %}]
        {%- endfor %}''')
        tree = [
            dict(a=1, b=[dict(a=1), dict(a=2)]),
            dict(a=2, b=[dict(a=1), dict(a=2)]),
            dict(a=3, b=[dict(a='a')]),
        ]
        rendered = template.render(seq=tree)
        assert rendered == '[1:1<[2:1][2:2]>][1:2<[2:1][2:2]>][1:3<[2:a]>]'

    def test_looploop(self, test_env_async):
        """An outer ``loop`` saved with ``set`` is usable in the inner loop."""
        template = test_env_async.from_string('''{% for row in table %}
            {%- set rowloop = loop -%}
            {% for cell in row -%}
                [{{ rowloop.index }}|{{ loop.index }}]
            {%- endfor %}
        {%- endfor %}''')
        rendered = template.render(table=['ab', 'cd'])
        assert rendered == '[1|1][1|2][2|1][2|2]'

    def test_reversed_bug(self, test_env_async):
        """``loop.last`` when iterating over a ``reversed`` object."""
        template = test_env_async.from_string('{% for i in items %}{{ i }}'
                                              '{% if not loop.last %}'
                                              ',{% endif %}{% endfor %}')
        rendered = template.render(items=reversed([3, 2, 1]))
        assert rendered == '1,2,3'

    def test_loop_errors(self, test_env_async):
        """``loop`` used in the loop filter and in the else branch."""
        template = test_env_async.from_string('''{% for item in [1] if loop.index
                                      == 0 %}...{% endfor %}''')
        with pytest.raises(UndefinedError):
            template.render()

        template = test_env_async.from_string('''{% for item in [] %}...{% else
            %}{{ loop }}{% endfor %}''')
        rendered = template.render()
        assert rendered == ''

    def test_loop_filter(self, test_env_async):
        """Filtered loops, including ``loop.index`` of the filtered items."""
        template = test_env_async.from_string('{% for item in range(10) if item '
                                              'is even %}[{{ item }}]{% endfor %}')
        rendered = template.render()
        assert rendered == '[0][2][4][6][8]'

        template = test_env_async.from_string('''
            {%- for item in range(10) if item is even %}[{{
                loop.index }}:{{ item }}]{% endfor %}''')
        rendered = template.render()
        assert rendered == '[1:0][2:2][3:4][4:6][5:8]'

    def test_scoped_special_var(self, test_env_async):
        """``loop.first`` in nested loops, read in both loop bodies."""
        template = test_env_async.from_string(
            '{% for s in seq %}[{{ loop.first }}{% for c in s %}'
            '|{{ loop.first }}{% endfor %}]{% endfor %}')
        rendered = template.render(seq=('ab', 'cd'))
        assert rendered == '[True|True|False][False|True|False]'

    def test_scoped_loop_var(self, test_env_async):
        """``loop.first`` read in the outer body, then in the inner body."""
        template = test_env_async.from_string(
            '{% for x in seq %}{{ loop.first }}'
            '{% for y in seq %}{% endfor %}{% endfor %}')
        rendered = template.render(seq='ab')
        assert rendered == 'TrueFalse'

        template = test_env_async.from_string(
            '{% for x in seq %}{% for y in seq %}'
            '{{ loop.first }}{% endfor %}{% endfor %}')
        rendered = template.render(seq='ab')
        assert rendered == 'TrueFalseTrueFalse'

    def test_recursive_empty_loop_iter(self, test_env_async):
        """A recursive loop over an empty list renders nothing."""
        template = test_env_async.from_string('''
        {%- for item in foo recursive -%}{%- endfor -%}
        ''')
        rendered = template.render(dict(foo=[]))
        assert rendered == ''

    def test_call_in_loop(self, test_env_async):
        """A ``call`` block inside a loop reads the loop variable."""
        template = test_env_async.from_string('''
        {%- macro do_something() -%}
            [{{ caller() }}]
        {%- endmacro %}

        {%- for i in [1, 2, 3] %}
            {%- call do_something() -%}
                {{ i }}
            {%- endcall %}
        {%- endfor -%}
        ''')
        rendered = template.render()
        assert rendered == '[1][2][3]'

    def test_scoping_bug(self, test_env_async):
        """A macro named like the loop variable of an earlier loop."""
        template = test_env_async.from_string('''
        {%- for item in foo %}...{{ item }}...{% endfor %}
        {%- macro item(a) %}...{{ a }}...{% endmacro %}
        {{- item(2) -}}
        ''')
        rendered = template.render(foo=(1,))
        assert rendered == '...1......2...'

    def test_unpacking(self, test_env_async):
        """Tuple unpacking in the loop target."""
        template = test_env_async.from_string(
            '{% for a, b, c in [[1, 2, 3]] %}'
            '{{ a }}|{{ b }}|{{ c }}{% endfor %}')
        rendered = template.render()
        assert rendered == '1|2|3'

    def test_recursive_loop_filter(self, test_env_async):
        """A loop filter combined with ``recursive``."""
        template = test_env_async.from_string('''
        <?xml version="1.0" encoding="UTF-8"?>
        <urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">
          {%- for page in [site.root] if page.url != this recursive %}
          <url><loc>{{ page.url }}</loc></url>
          {{- loop(page.children) }}
          {%- endfor %}
        </urlset>
        ''')
        site = {'root': {
            'url': '/',
            'children': [
                {'url': '/foo'},
                {'url': '/bar'},
            ]
        }}
        sitemap = template.render(this='/foo', site=site)
        # Compare only the non-blank lines, ignoring surrounding whitespace.
        stripped = (line.strip() for line in sitemap.splitlines())
        lines = [line for line in stripped if line]
        assert lines == [
            '<?xml version="1.0" encoding="UTF-8"?>',
            '<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">',
            '<url><loc>/</loc></url>',
            '<url><loc>/bar</loc></url>',
            '</urlset>',
        ]

    def test_nonrecursive_loop_filter(self, test_env_async):
        """A loop filter on a flat, non-recursive loop."""
        template = test_env_async.from_string('''
        <?xml version="1.0" encoding="UTF-8"?>
        <urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">
          {%- for page in items if page.url != this %}
          <url><loc>{{ page.url }}</loc></url>
          {%- endfor %}
        </urlset>
        ''')
        pages = [
            {'url': '/'},
            {'url': '/foo'},
            {'url': '/bar'},
        ]
        sitemap = template.render(this='/foo', items=pages)
        # Compare only the non-blank lines, ignoring surrounding whitespace.
        stripped = (line.strip() for line in sitemap.splitlines())
        lines = [line for line in stripped if line]
        assert lines == [
            '<?xml version="1.0" encoding="UTF-8"?>',
            '<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">',
            '<url><loc>/</loc></url>',
            '<url><loc>/bar</loc></url>',
            '</urlset>',
        ]

    def test_bare_async(self, test_env_async):
        """A template consisting only of ``extends`` renders its parent."""
        template = test_env_async.from_string('{% extends "header" %}')
        rendered = template.render(foo=42)
        assert rendered == '[42|23]'