| import asyncio |
| import unittest |
| |
| from unittest import mock |
| from . import utils as test_utils |
| |
| |
class TestPolicy(asyncio.AbstractEventLoopPolicy):
    """Event loop policy that builds loops through a supplied factory.

    The last non-None loop handed to set_event_loop() is remembered in
    ``self.loop``.
    """

    def __init__(self, loop_factory):
        # No loop has been installed through this policy yet.
        self.loop = None
        self.loop_factory = loop_factory

    def get_event_loop(self):
        # asyncio.run() is expected never to end up here.
        raise RuntimeError()

    def new_event_loop(self):
        make_loop = self.loop_factory
        return make_loop()

    def set_event_loop(self, loop):
        if loop is None:
            # Ignore resets: the previously stored loop is kept so that
            # BaseTest.tearDown can still check whether it was closed.
            return
        self.loop = loop
| |
| |
class BaseTest(unittest.TestCase):
    """Base class that installs a TestPolicy around every test.

    setUp() installs a policy whose loops come from new_loop();
    tearDown() verifies the loop recorded by the policy (if any) and
    always restores the default policy.
    """

    def new_loop(self):
        """Return a BaseEventLoop with its I/O hooks replaced by mocks."""
        loop = asyncio.BaseEventLoop()
        loop._process_events = mock.Mock()
        loop._selector = mock.Mock()
        # The mocked selector reports no ready events.
        loop._selector.select.return_value = ()
        loop.shutdown_ag_run = False

        async def shutdown_asyncgens():
            # Only record that the shutdown hook was awaited.
            loop.shutdown_ag_run = True
        loop.shutdown_asyncgens = shutdown_asyncgens

        return loop

    def setUp(self):
        """Install a TestPolicy that creates loops via new_loop()."""
        super().setUp()

        policy = TestPolicy(self.new_loop)
        asyncio.set_event_loop_policy(policy)

    def tearDown(self):
        """Check the recorded loop, then reset the global policy.

        The reset runs in ``finally`` so that a failing check cannot
        leave the custom policy installed for later tests.
        """
        try:
            policy = asyncio.get_event_loop_policy()
            if policy.loop is not None:
                self.assertTrue(policy.loop.is_closed())
                self.assertTrue(policy.loop.shutdown_ag_run)
        finally:
            asyncio.set_event_loop_policy(None)
            super().tearDown()
| |
| |
class RunTests(BaseTest):
    """Tests for asyncio.run()."""

    def test_asyncio_run_return(self):
        """asyncio.run() returns the coroutine's result."""
        async def main():
            await asyncio.sleep(0)
            return 42

        self.assertEqual(asyncio.run(main()), 42)

    def test_asyncio_run_raises(self):
        """An exception raised by the coroutine propagates to the caller."""
        async def main():
            await asyncio.sleep(0)
            raise ValueError('spam')

        with self.assertRaisesRegex(ValueError, 'spam'):
            asyncio.run(main())

    def test_asyncio_run_only_coro(self):
        """Non-coroutine arguments are rejected with ValueError."""
        # A tuple keeps the subtest order deterministic.
        for o in (1, lambda: None):
            with self.subTest(obj=o), \
                    self.assertRaisesRegex(ValueError,
                                           'a coroutine was expected'):
                asyncio.run(o)

    def test_asyncio_run_debug(self):
        """The ``debug`` argument controls the loop's debug flag."""
        async def main(expected):
            loop = asyncio.get_running_loop()
            self.assertIs(loop.get_debug(), expected)

        # Pass debug=False explicitly: where the default is None the loop
        # would inherit the global debug mode and the check could fail
        # under ``-X dev`` / PYTHONASYNCIODEBUG.
        asyncio.run(main(False), debug=False)
        asyncio.run(main(True), debug=True)

    def test_asyncio_run_from_running_loop(self):
        """Calling asyncio.run() inside a running loop raises RuntimeError."""
        async def main():
            coro = main()
            try:
                asyncio.run(coro)
            finally:
                coro.close()  # Suppress ResourceWarning

        with self.assertRaisesRegex(RuntimeError,
                                    'cannot be called from a running'):
            asyncio.run(main())

    def test_asyncio_run_cancels_hanging_tasks(self):
        """A task still pending when main() returns is done afterwards."""
        lo_task = None

        async def leftover():
            await asyncio.sleep(0.1)

        async def main():
            nonlocal lo_task
            lo_task = asyncio.create_task(leftover())
            return 123

        self.assertEqual(asyncio.run(main()), 123)
        self.assertTrue(lo_task.done())

    def test_asyncio_run_reports_hanging_tasks_errors(self):
        """Errors raised by a leftover task reach the exception handler."""
        lo_task = None
        call_exc_handler_mock = mock.Mock()

        async def leftover():
            try:
                await asyncio.sleep(0.1)
            except asyncio.CancelledError:
                # Turn the cancellation into a different error on purpose.
                1 / 0

        async def main():
            loop = asyncio.get_running_loop()
            loop.call_exception_handler = call_exc_handler_mock

            nonlocal lo_task
            lo_task = asyncio.create_task(leftover())
            return 123

        self.assertEqual(asyncio.run(main()), 123)
        self.assertTrue(lo_task.done())

        call_exc_handler_mock.assert_called_with({
            'message': test_utils.MockPattern(r'asyncio.run.*shutdown'),
            'task': lo_task,
            'exception': test_utils.MockInstanceOf(ZeroDivisionError)
        })

    def test_asyncio_run_closes_gens_after_hanging_tasks_errors(self):
        """The async generator is finished even if a leftover task errors."""
        spinner = None
        lazyboy = None

        class FancyExit(Exception):
            pass

        async def fidget():
            while True:
                yield 1
                await asyncio.sleep(1)

        async def spin():
            nonlocal spinner
            spinner = fidget()
            try:
                async for the_meaning_of_life in spinner:  # NoQA
                    pass
            except asyncio.CancelledError:
                # Turn the cancellation into a different error on purpose.
                1 / 0

        async def main():
            loop = asyncio.get_running_loop()
            # Silence the handler; this test only inspects the generator.
            loop.call_exception_handler = mock.Mock()

            nonlocal lazyboy
            lazyboy = asyncio.create_task(spin())
            raise FancyExit

        with self.assertRaises(FancyExit):
            asyncio.run(main())

        self.assertTrue(lazyboy.done())

        self.assertIsNone(spinner.ag_frame)
        self.assertFalse(spinner.ag_running)