| import _thread |
| import asyncio |
| import contextvars |
| import gc |
| import re |
| import signal |
| import threading |
| import unittest |
| |
| from unittest import mock |
| from unittest.mock import patch |
| from test.test_asyncio import utils as test_utils |
| |
| |
| def tearDownModule(): |
| asyncio.set_event_loop_policy(None) |
| |
| |
| def interrupt_self(): |
| _thread.interrupt_main() |
| |
| |
| class TestPolicy(asyncio.AbstractEventLoopPolicy): |
| |
| def __init__(self, loop_factory): |
| self.loop_factory = loop_factory |
| self.loop = None |
| |
| def get_event_loop(self): |
| # shouldn't ever be called by asyncio.run() |
| raise RuntimeError |
| |
| def new_event_loop(self): |
| return self.loop_factory() |
| |
| def set_event_loop(self, loop): |
| if loop is not None: |
| # we want to check if the loop is closed |
| # in BaseTest.tearDown |
| self.loop = loop |
| |
| |
| class BaseTest(unittest.TestCase): |
| |
| def new_loop(self): |
| loop = asyncio.BaseEventLoop() |
| loop._process_events = mock.Mock() |
| loop._selector = mock.Mock() |
| loop._selector.select.return_value = () |
| loop.shutdown_ag_run = False |
| |
| async def shutdown_asyncgens(): |
| loop.shutdown_ag_run = True |
| loop.shutdown_asyncgens = shutdown_asyncgens |
| |
| return loop |
| |
| def setUp(self): |
| super().setUp() |
| |
| policy = TestPolicy(self.new_loop) |
| asyncio.set_event_loop_policy(policy) |
| |
| def tearDown(self): |
| policy = asyncio.get_event_loop_policy() |
| if policy.loop is not None: |
| self.assertTrue(policy.loop.is_closed()) |
| self.assertTrue(policy.loop.shutdown_ag_run) |
| |
| asyncio.set_event_loop_policy(None) |
| super().tearDown() |
| |
| |
| class RunTests(BaseTest): |
| |
| def test_asyncio_run_return(self): |
| async def main(): |
| await asyncio.sleep(0) |
| return 42 |
| |
| self.assertEqual(asyncio.run(main()), 42) |
| |
| def test_asyncio_run_raises(self): |
| async def main(): |
| await asyncio.sleep(0) |
| raise ValueError('spam') |
| |
| with self.assertRaisesRegex(ValueError, 'spam'): |
| asyncio.run(main()) |
| |
| def test_asyncio_run_only_coro(self): |
| for o in {1, lambda: None}: |
| with self.subTest(obj=o), \ |
| self.assertRaisesRegex(ValueError, |
| 'a coroutine was expected'): |
| asyncio.run(o) |
| |
| def test_asyncio_run_debug(self): |
| async def main(expected): |
| loop = asyncio.get_event_loop() |
| self.assertIs(loop.get_debug(), expected) |
| |
| asyncio.run(main(False)) |
| asyncio.run(main(True), debug=True) |
| with mock.patch('asyncio.coroutines._is_debug_mode', lambda: True): |
| asyncio.run(main(True)) |
| asyncio.run(main(False), debug=False) |
| |
| def test_asyncio_run_from_running_loop(self): |
| async def main(): |
| coro = main() |
| try: |
| asyncio.run(coro) |
| finally: |
| coro.close() # Suppress ResourceWarning |
| |
| with self.assertRaisesRegex(RuntimeError, |
| 'cannot be called from a running'): |
| asyncio.run(main()) |
| |
| def test_asyncio_run_cancels_hanging_tasks(self): |
| lo_task = None |
| |
| async def leftover(): |
| await asyncio.sleep(0.1) |
| |
| async def main(): |
| nonlocal lo_task |
| lo_task = asyncio.create_task(leftover()) |
| return 123 |
| |
| self.assertEqual(asyncio.run(main()), 123) |
| self.assertTrue(lo_task.done()) |
| |
| def test_asyncio_run_reports_hanging_tasks_errors(self): |
| lo_task = None |
| call_exc_handler_mock = mock.Mock() |
| |
| async def leftover(): |
| try: |
| await asyncio.sleep(0.1) |
| except asyncio.CancelledError: |
| 1 / 0 |
| |
| async def main(): |
| loop = asyncio.get_running_loop() |
| loop.call_exception_handler = call_exc_handler_mock |
| |
| nonlocal lo_task |
| lo_task = asyncio.create_task(leftover()) |
| return 123 |
| |
| self.assertEqual(asyncio.run(main()), 123) |
| self.assertTrue(lo_task.done()) |
| |
| call_exc_handler_mock.assert_called_with({ |
| 'message': test_utils.MockPattern(r'asyncio.run.*shutdown'), |
| 'task': lo_task, |
| 'exception': test_utils.MockInstanceOf(ZeroDivisionError) |
| }) |
| |
| def test_asyncio_run_closes_gens_after_hanging_tasks_errors(self): |
| spinner = None |
| lazyboy = None |
| |
| class FancyExit(Exception): |
| pass |
| |
| async def fidget(): |
| while True: |
| yield 1 |
| await asyncio.sleep(1) |
| |
| async def spin(): |
| nonlocal spinner |
| spinner = fidget() |
| try: |
| async for the_meaning_of_life in spinner: # NoQA |
| pass |
| except asyncio.CancelledError: |
| 1 / 0 |
| |
| async def main(): |
| loop = asyncio.get_running_loop() |
| loop.call_exception_handler = mock.Mock() |
| |
| nonlocal lazyboy |
| lazyboy = asyncio.create_task(spin()) |
| raise FancyExit |
| |
| with self.assertRaises(FancyExit): |
| asyncio.run(main()) |
| |
| self.assertTrue(lazyboy.done()) |
| |
| self.assertIsNone(spinner.ag_frame) |
| self.assertFalse(spinner.ag_running) |
| |
| |
| class RunnerTests(BaseTest): |
| |
| def test_non_debug(self): |
| with asyncio.Runner(debug=False) as runner: |
| self.assertFalse(runner.get_loop().get_debug()) |
| |
| def test_debug(self): |
| with asyncio.Runner(debug=True) as runner: |
| self.assertTrue(runner.get_loop().get_debug()) |
| |
| def test_custom_factory(self): |
| loop = mock.Mock() |
| with asyncio.Runner(loop_factory=lambda: loop) as runner: |
| self.assertIs(runner.get_loop(), loop) |
| |
| def test_run(self): |
| async def f(): |
| await asyncio.sleep(0) |
| return 'done' |
| |
| with asyncio.Runner() as runner: |
| self.assertEqual('done', runner.run(f())) |
| loop = runner.get_loop() |
| |
| with self.assertRaisesRegex( |
| RuntimeError, |
| "Runner is closed" |
| ): |
| runner.get_loop() |
| |
| self.assertTrue(loop.is_closed()) |
| |
| def test_run_non_coro(self): |
| with asyncio.Runner() as runner: |
| with self.assertRaisesRegex( |
| ValueError, |
| "a coroutine was expected" |
| ): |
| runner.run(123) |
| |
| def test_run_future(self): |
| with asyncio.Runner() as runner: |
| with self.assertRaisesRegex( |
| ValueError, |
| "a coroutine was expected" |
| ): |
| fut = runner.get_loop().create_future() |
| runner.run(fut) |
| |
| def test_explicit_close(self): |
| runner = asyncio.Runner() |
| loop = runner.get_loop() |
| runner.close() |
| with self.assertRaisesRegex( |
| RuntimeError, |
| "Runner is closed" |
| ): |
| runner.get_loop() |
| |
| self.assertTrue(loop.is_closed()) |
| |
| def test_double_close(self): |
| runner = asyncio.Runner() |
| loop = runner.get_loop() |
| |
| runner.close() |
| self.assertTrue(loop.is_closed()) |
| |
| # the second call is no-op |
| runner.close() |
| self.assertTrue(loop.is_closed()) |
| |
| def test_second_with_block_raises(self): |
| ret = [] |
| |
| async def f(arg): |
| ret.append(arg) |
| |
| runner = asyncio.Runner() |
| with runner: |
| runner.run(f(1)) |
| |
| with self.assertRaisesRegex( |
| RuntimeError, |
| "Runner is closed" |
| ): |
| with runner: |
| runner.run(f(2)) |
| |
| self.assertEqual([1], ret) |
| |
| def test_run_keeps_context(self): |
| cvar = contextvars.ContextVar("cvar", default=-1) |
| |
| async def f(val): |
| old = cvar.get() |
| await asyncio.sleep(0) |
| cvar.set(val) |
| return old |
| |
| async def get_context(): |
| return contextvars.copy_context() |
| |
| with asyncio.Runner() as runner: |
| self.assertEqual(-1, runner.run(f(1))) |
| self.assertEqual(1, runner.run(f(2))) |
| |
| self.assertEqual(2, runner.run(get_context()).get(cvar)) |
| |
| def test_recursive_run(self): |
| async def g(): |
| pass |
| |
| async def f(): |
| runner.run(g()) |
| |
| with asyncio.Runner() as runner: |
| with self.assertWarnsRegex( |
| RuntimeWarning, |
| "coroutine .+ was never awaited", |
| ): |
| with self.assertRaisesRegex( |
| RuntimeError, |
| re.escape( |
| "Runner.run() cannot be called from a running event loop" |
| ), |
| ): |
| runner.run(f()) |
| |
| def test_interrupt_call_soon(self): |
| # The only case when task is not suspended by waiting a future |
| # or another task |
| assert threading.current_thread() is threading.main_thread() |
| |
| async def coro(): |
| with self.assertRaises(asyncio.CancelledError): |
| while True: |
| await asyncio.sleep(0) |
| raise asyncio.CancelledError() |
| |
| with asyncio.Runner() as runner: |
| runner.get_loop().call_later(0.1, interrupt_self) |
| with self.assertRaises(KeyboardInterrupt): |
| runner.run(coro()) |
| |
| def test_interrupt_wait(self): |
| # interrupting when waiting a future cancels both future and main task |
| assert threading.current_thread() is threading.main_thread() |
| |
| async def coro(fut): |
| with self.assertRaises(asyncio.CancelledError): |
| await fut |
| raise asyncio.CancelledError() |
| |
| with asyncio.Runner() as runner: |
| fut = runner.get_loop().create_future() |
| runner.get_loop().call_later(0.1, interrupt_self) |
| |
| with self.assertRaises(KeyboardInterrupt): |
| runner.run(coro(fut)) |
| |
| self.assertTrue(fut.cancelled()) |
| |
| def test_interrupt_cancelled_task(self): |
| # interrupting cancelled main task doesn't raise KeyboardInterrupt |
| assert threading.current_thread() is threading.main_thread() |
| |
| async def subtask(task): |
| await asyncio.sleep(0) |
| task.cancel() |
| interrupt_self() |
| |
| async def coro(): |
| asyncio.create_task(subtask(asyncio.current_task())) |
| await asyncio.sleep(10) |
| |
| with asyncio.Runner() as runner: |
| with self.assertRaises(asyncio.CancelledError): |
| runner.run(coro()) |
| |
| def test_signal_install_not_supported_ok(self): |
| # signal.signal() can throw if the "main thread" doensn't have signals enabled |
| assert threading.current_thread() is threading.main_thread() |
| |
| async def coro(): |
| pass |
| |
| with asyncio.Runner() as runner: |
| with patch.object( |
| signal, |
| "signal", |
| side_effect=ValueError( |
| "signal only works in main thread of the main interpreter" |
| ) |
| ): |
| runner.run(coro()) |
| |
| |
| if __name__ == '__main__': |
| unittest.main() |