import asyncio
import time
import unittest


def tearDownModule():
    """Reset the asyncio event loop policy after this module's tests.

    ``unittest`` runs this module-level hook once the tests in the module
    have finished.  Passing ``None`` asks asyncio to go back to its default
    event loop policy.
    """
    # NOTE(review): the event loop policy API is deprecated as of
    # Python 3.14 -- confirm which Python versions this file must support.
    asyncio.set_event_loop_policy(None)


# The following value can be used as a very small timeout:
# it passes the check "timeout > 0", but has almost
# no effect on the test performance.
_EPSILON = 0.0001


class SlowTask:
    """Task body that runs for TASK_TIMEOUT seconds, ignoring cancel requests.

    ``exited`` starts out False and becomes True only after ``run()`` has
    reached its deadline and fallen out of its loop.
    """

    # How long run() keeps going, in seconds of time.monotonic().
    TASK_TIMEOUT = 0.2

    def __init__(self):
        self.exited = False

    async def run(self):
        """Sleep until the deadline, swallowing any CancelledError on the way."""
        exitat = time.monotonic() + self.TASK_TIMEOUT

        while True:
            # Recompute the remaining time on every pass so that a sleep cut
            # short by a cancellation is resumed for only what is left.
            tosleep = exitat - time.monotonic()
            if tosleep <= 0:
                break

            try:
                await asyncio.sleep(tosleep)
            except asyncio.CancelledError:
                # Deliberately ignored: this helper exists to model a task
                # that does not stop when it is cancelled.
                pass

        self.exited = True


class AsyncioWaitForTest(unittest.IsolatedAsyncioTestCase):
    """Tests for asyncio.wait_for(): timeouts, cancellation and results.

    The individual tests are described with comments rather than docstrings
    so that unittest keeps reporting them by method name.
    """

    async def test_asyncio_wait_for_cancelled(self):
        # Cancel a task that is running wait_for() around a SlowTask and
        # check that the SlowTask coroutine still ran to completion.
        t = SlowTask()

        waitfortask = asyncio.create_task(
            asyncio.wait_for(t.run(), t.TASK_TIMEOUT * 2))
        # Yield to the event loop once before cancelling.
        await asyncio.sleep(0)
        waitfortask.cancel()
        await asyncio.wait({waitfortask})

        self.assertTrue(t.exited)

    async def test_asyncio_wait_for_timeout(self):
        # The timeout is half of the SlowTask run time.  A TimeoutError is
        # tolerated but not required; what is checked is that the SlowTask
        # coroutine has finished by the time wait_for() is done.
        t = SlowTask()

        try:
            await asyncio.wait_for(t.run(), t.TASK_TIMEOUT / 2)
        except asyncio.TimeoutError:
            pass

        self.assertTrue(t.exited)

    async def test_wait_for_timeout_less_then_0_or_0_future_done(self):
        # A future that already has a result must be returned even with a
        # zero timeout, and without any noticeable delay.
        loop = asyncio.get_running_loop()

        fut = loop.create_future()
        fut.set_result('done')

        t0 = loop.time()
        ret = await asyncio.wait_for(fut, 0)
        t1 = loop.time()

        self.assertEqual(ret, 'done')
        self.assertTrue(fut.done())
        self.assertLess(t1 - t0, 0.1)

    async def test_wait_for_timeout_less_then_0_or_0_coroutine_do_not_started(self):
        # With a zero timeout and a bare coroutine, TimeoutError is expected
        # and the coroutine body must never have been entered.
        loop = asyncio.get_running_loop()

        foo_started = False

        async def foo():
            nonlocal foo_started
            foo_started = True

        with self.assertRaises(asyncio.TimeoutError):
            t0 = loop.time()
            await asyncio.wait_for(foo(), 0)
        # Taken after the block: wait_for() is expected to raise, so nothing
        # placed after it inside the block would run.
        t1 = loop.time()

        self.assertEqual(foo_started, False)
        self.assertLess(t1 - t0, 0.1)

    async def test_wait_for_timeout_less_then_0_or_0(self):
        # For a zero and a negative timeout, waiting on a task that is
        # already running must time out, cancel the task and let its
        # ``finally`` clause run.
        loop = asyncio.get_running_loop()

        for timeout in [0, -1]:
            with self.subTest(timeout=timeout):
                foo_running = None
                started = loop.create_future()

                async def foo():
                    nonlocal foo_running
                    foo_running = True
                    started.set_result(None)
                    try:
                        await asyncio.sleep(10)
                    finally:
                        foo_running = False
                    return 'done'

                fut = asyncio.create_task(foo())
                # Make sure foo() is really running before wait_for() is
                # called on it.
                await started

                with self.assertRaises(asyncio.TimeoutError):
                    t0 = loop.time()
                    await asyncio.wait_for(fut, timeout)
                # Taken after the block, because wait_for() raises.
                t1 = loop.time()

                self.assertTrue(fut.done())
                # it should have been cancelled due to the timeout
                self.assertTrue(fut.cancelled())
                self.assertEqual(foo_running, False)
                self.assertLess(t1 - t0, 0.1)

    async def test_wait_for(self):
        # A 0.1 s timeout on a task sleeping for 10 s: expect TimeoutError,
        # a cancelled task, and the task's ``finally`` clause executed.
        loop = asyncio.get_running_loop()
        foo_running = None

        async def foo():
            nonlocal foo_running
            foo_running = True
            try:
                await asyncio.sleep(10)
            finally:
                foo_running = False
            return 'done'

        fut = asyncio.create_task(foo())

        with self.assertRaises(asyncio.TimeoutError):
            t0 = loop.time()
            await asyncio.wait_for(fut, 0.1)
        # Taken after the block, because wait_for() raises.
        t1 = loop.time()
        self.assertTrue(fut.done())
        # it should have been cancelled due to the timeout
        self.assertTrue(fut.cancelled())
        self.assertLess(t1 - t0, 0.2)
        self.assertEqual(foo_running, False)

    async def test_wait_for_blocking(self):
        # timeout=None: the coroutine's result is simply returned.
        async def coro():
            return 'done'

        res = await asyncio.wait_for(coro(), timeout=None)
        self.assertEqual(res, 'done')

    async def test_wait_for_race_condition(self):
        # The future gets its result (after 0.1 s) before the 0.2 s timeout
        # expires, so the result is expected rather than a TimeoutError.
        loop = asyncio.get_running_loop()

        fut = loop.create_future()
        task = asyncio.wait_for(fut, timeout=0.2)
        loop.call_later(0.1, fut.set_result, "ok")
        res = await task
        self.assertEqual(res, "ok")

    async def test_wait_for_cancellation_race_condition(self):
        # inner() absorbs the CancelledError it receives and returns a
        # value; wait_for() is expected to hand that value back.
        async def inner():
            with self.assertRaises(asyncio.CancelledError):
                await asyncio.sleep(1)
            return 1

        result = await asyncio.wait_for(inner(), timeout=0.01)
        self.assertEqual(result, 1)

    async def test_wait_for_waits_for_task_cancellation(self):
        # inner() takes a little extra time while handling its cancellation.
        # wait_for() must not return before inner() has finished, and the
        # TimeoutError must carry the CancelledError as its context.
        task_done = False

        async def inner():
            nonlocal task_done
            try:
                await asyncio.sleep(10)
            except asyncio.CancelledError:
                await asyncio.sleep(_EPSILON)
                raise
            finally:
                task_done = True

        inner_task = asyncio.create_task(inner())

        with self.assertRaises(asyncio.TimeoutError) as cm:
            await asyncio.wait_for(inner_task, timeout=_EPSILON)

        self.assertTrue(task_done)
        chained = cm.exception.__context__
        self.assertEqual(type(chained), asyncio.CancelledError)

    async def test_wait_for_waits_for_task_cancellation_w_timeout_0(self):
        # Same expectations as the test above, but with timeout=0 and with
        # the inner task given a moment to start before wait_for() is called.
        task_done = False

        async def foo():
            async def inner():
                nonlocal task_done
                try:
                    await asyncio.sleep(10)
                except asyncio.CancelledError:
                    await asyncio.sleep(_EPSILON)
                    raise
                finally:
                    task_done = True

            inner_task = asyncio.create_task(inner())
            await asyncio.sleep(_EPSILON)
            await asyncio.wait_for(inner_task, timeout=0)

        with self.assertRaises(asyncio.TimeoutError) as cm:
            await foo()

        self.assertTrue(task_done)
        chained = cm.exception.__context__
        self.assertEqual(type(chained), asyncio.CancelledError)

    async def test_wait_for_reraises_exception_during_cancellation(self):
        # inner() raises its own exception from ``finally`` while being
        # cancelled; that exception is expected to propagate out of
        # wait_for().
        class FooException(Exception):
            pass

        async def foo():
            async def inner():
                try:
                    await asyncio.sleep(0.2)
                finally:
                    raise FooException

            inner_task = asyncio.create_task(inner())

            await asyncio.wait_for(inner_task, timeout=_EPSILON)

        with self.assertRaises(FooException):
            await foo()

    async def test_wait_for_self_cancellation(self):
        # inner() absorbs two cancellations in a row and still returns 42.
        async def inner():
            try:
                await asyncio.sleep(0.3)
            except asyncio.CancelledError:
                try:
                    await asyncio.sleep(0.3)
                except asyncio.CancelledError:
                    await asyncio.sleep(0.3)

            return 42

        inner_task = asyncio.create_task(inner())

        wait = asyncio.wait_for(inner_task, timeout=0.1)

        # Test that wait_for itself is properly cancellable
        # even when the initial task holds up the initial cancellation.
        task = asyncio.create_task(wait)
        await asyncio.sleep(0.2)
        task.cancel()

        with self.assertRaises(asyncio.CancelledError):
            await task

        self.assertEqual(await inner_task, 42)



# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()