| """Tests for futures.py.""" |
| |
| import concurrent.futures |
| import gc |
| import re |
| import sys |
| import threading |
| import unittest |
| from unittest import mock |
| |
| import asyncio |
| from asyncio import futures |
| from test.test_asyncio import utils as test_utils |
| from test import support |
| |
| |
| def tearDownModule(): |
| asyncio.set_event_loop_policy(None) |
| |
| |
| def _fakefunc(f): |
| return f |
| |
| |
| def first_cb(): |
| pass |
| |
| |
| def last_cb(): |
| pass |
| |
| |
| class DuckFuture: |
| # Class that does not inherit from Future but aims to be duck-type |
| # compatible with it. |
| |
| _asyncio_future_blocking = False |
| __cancelled = False |
| __result = None |
| __exception = None |
| |
| def cancel(self): |
| if self.done(): |
| return False |
| self.__cancelled = True |
| return True |
| |
| def cancelled(self): |
| return self.__cancelled |
| |
| def done(self): |
| return (self.__cancelled |
| or self.__result is not None |
| or self.__exception is not None) |
| |
| def result(self): |
| assert not self.cancelled() |
| if self.__exception is not None: |
| raise self.__exception |
| return self.__result |
| |
| def exception(self): |
| assert not self.cancelled() |
| return self.__exception |
| |
| def set_result(self, result): |
| assert not self.done() |
| assert result is not None |
| self.__result = result |
| |
| def set_exception(self, exception): |
| assert not self.done() |
| assert exception is not None |
| self.__exception = exception |
| |
| def __iter__(self): |
| if not self.done(): |
| self._asyncio_future_blocking = True |
| yield self |
| assert self.done() |
| return self.result() |
| |
| |
| class DuckTests(test_utils.TestCase): |
| |
| def setUp(self): |
| super().setUp() |
| self.loop = self.new_test_loop() |
| self.addCleanup(self.loop.close) |
| |
| def test_wrap_future(self): |
| f = DuckFuture() |
| g = asyncio.wrap_future(f) |
| assert g is f |
| |
| def test_ensure_future(self): |
| f = DuckFuture() |
| g = asyncio.ensure_future(f) |
| assert g is f |
| |
| |
| class BaseFutureTests: |
| |
| def _new_future(self, *args, **kwargs): |
| return self.cls(*args, **kwargs) |
| |
| def setUp(self): |
| super().setUp() |
| self.loop = self.new_test_loop() |
| self.addCleanup(self.loop.close) |
| |
| def test_isfuture(self): |
| class MyFuture: |
| _asyncio_future_blocking = None |
| |
| def __init__(self): |
| self._asyncio_future_blocking = False |
| |
| self.assertFalse(asyncio.isfuture(MyFuture)) |
| self.assertTrue(asyncio.isfuture(MyFuture())) |
| self.assertFalse(asyncio.isfuture(1)) |
| |
| # As `isinstance(Mock(), Future)` returns `False` |
| self.assertFalse(asyncio.isfuture(mock.Mock())) |
| |
| f = self._new_future(loop=self.loop) |
| self.assertTrue(asyncio.isfuture(f)) |
| self.assertFalse(asyncio.isfuture(type(f))) |
| |
| # As `isinstance(Mock(Future), Future)` returns `True` |
| self.assertTrue(asyncio.isfuture(mock.Mock(type(f)))) |
| |
| f.cancel() |
| |
| def test_initial_state(self): |
| f = self._new_future(loop=self.loop) |
| self.assertFalse(f.cancelled()) |
| self.assertFalse(f.done()) |
| f.cancel() |
| self.assertTrue(f.cancelled()) |
| |
| def test_init_constructor_default_loop(self): |
| asyncio.set_event_loop(self.loop) |
| f = self._new_future() |
| self.assertIs(f._loop, self.loop) |
| self.assertIs(f.get_loop(), self.loop) |
| |
| def test_constructor_positional(self): |
| # Make sure Future doesn't accept a positional argument |
| self.assertRaises(TypeError, self._new_future, 42) |
| |
| def test_uninitialized(self): |
| # Test that C Future doesn't crash when Future.__init__() |
| # call was skipped. |
| |
| fut = self.cls.__new__(self.cls, loop=self.loop) |
| self.assertRaises(asyncio.InvalidStateError, fut.result) |
| |
| fut = self.cls.__new__(self.cls, loop=self.loop) |
| self.assertRaises(asyncio.InvalidStateError, fut.exception) |
| |
| fut = self.cls.__new__(self.cls, loop=self.loop) |
| with self.assertRaises((RuntimeError, AttributeError)): |
| fut.set_result(None) |
| |
| fut = self.cls.__new__(self.cls, loop=self.loop) |
| with self.assertRaises((RuntimeError, AttributeError)): |
| fut.set_exception(Exception) |
| |
| fut = self.cls.__new__(self.cls, loop=self.loop) |
| with self.assertRaises((RuntimeError, AttributeError)): |
| fut.cancel() |
| |
| fut = self.cls.__new__(self.cls, loop=self.loop) |
| with self.assertRaises((RuntimeError, AttributeError)): |
| fut.add_done_callback(lambda f: None) |
| |
| fut = self.cls.__new__(self.cls, loop=self.loop) |
| with self.assertRaises((RuntimeError, AttributeError)): |
| fut.remove_done_callback(lambda f: None) |
| |
| fut = self.cls.__new__(self.cls, loop=self.loop) |
| try: |
| repr(fut) |
| except (RuntimeError, AttributeError): |
| pass |
| |
| fut = self.cls.__new__(self.cls, loop=self.loop) |
| try: |
| fut.__await__() |
| except RuntimeError: |
| pass |
| |
| fut = self.cls.__new__(self.cls, loop=self.loop) |
| try: |
| iter(fut) |
| except RuntimeError: |
| pass |
| |
| fut = self.cls.__new__(self.cls, loop=self.loop) |
| self.assertFalse(fut.cancelled()) |
| self.assertFalse(fut.done()) |
| |
| def test_future_cancel_message_getter(self): |
| f = self._new_future(loop=self.loop) |
| self.assertTrue(hasattr(f, '_cancel_message')) |
| self.assertEqual(f._cancel_message, None) |
| |
| f.cancel('my message') |
| with self.assertRaises(asyncio.CancelledError): |
| self.loop.run_until_complete(f) |
| self.assertEqual(f._cancel_message, 'my message') |
| |
| def test_future_cancel_message_setter(self): |
| f = self._new_future(loop=self.loop) |
| f.cancel('my message') |
| f._cancel_message = 'my new message' |
| self.assertEqual(f._cancel_message, 'my new message') |
| |
| # Also check that the value is used for cancel(). |
| with self.assertRaises(asyncio.CancelledError): |
| self.loop.run_until_complete(f) |
| self.assertEqual(f._cancel_message, 'my new message') |
| |
| def test_cancel(self): |
| f = self._new_future(loop=self.loop) |
| self.assertTrue(f.cancel()) |
| self.assertTrue(f.cancelled()) |
| self.assertTrue(f.done()) |
| self.assertRaises(asyncio.CancelledError, f.result) |
| self.assertRaises(asyncio.CancelledError, f.exception) |
| self.assertRaises(asyncio.InvalidStateError, f.set_result, None) |
| self.assertRaises(asyncio.InvalidStateError, f.set_exception, None) |
| self.assertFalse(f.cancel()) |
| |
| def test_result(self): |
| f = self._new_future(loop=self.loop) |
| self.assertRaises(asyncio.InvalidStateError, f.result) |
| |
| f.set_result(42) |
| self.assertFalse(f.cancelled()) |
| self.assertTrue(f.done()) |
| self.assertEqual(f.result(), 42) |
| self.assertEqual(f.exception(), None) |
| self.assertRaises(asyncio.InvalidStateError, f.set_result, None) |
| self.assertRaises(asyncio.InvalidStateError, f.set_exception, None) |
| self.assertFalse(f.cancel()) |
| |
| def test_exception(self): |
| exc = RuntimeError() |
| f = self._new_future(loop=self.loop) |
| self.assertRaises(asyncio.InvalidStateError, f.exception) |
| |
| # StopIteration cannot be raised into a Future - CPython issue26221 |
| self.assertRaisesRegex(TypeError, "StopIteration .* cannot be raised", |
| f.set_exception, StopIteration) |
| |
| f.set_exception(exc) |
| self.assertFalse(f.cancelled()) |
| self.assertTrue(f.done()) |
| self.assertRaises(RuntimeError, f.result) |
| self.assertEqual(f.exception(), exc) |
| self.assertRaises(asyncio.InvalidStateError, f.set_result, None) |
| self.assertRaises(asyncio.InvalidStateError, f.set_exception, None) |
| self.assertFalse(f.cancel()) |
| |
| def test_exception_class(self): |
| f = self._new_future(loop=self.loop) |
| f.set_exception(RuntimeError) |
| self.assertIsInstance(f.exception(), RuntimeError) |
| |
| def test_yield_from_twice(self): |
| f = self._new_future(loop=self.loop) |
| |
| def fixture(): |
| yield 'A' |
| x = yield from f |
| yield 'B', x |
| y = yield from f |
| yield 'C', y |
| |
| g = fixture() |
| self.assertEqual(next(g), 'A') # yield 'A'. |
| self.assertEqual(next(g), f) # First yield from f. |
| f.set_result(42) |
| self.assertEqual(next(g), ('B', 42)) # yield 'B', x. |
| # The second "yield from f" does not yield f. |
| self.assertEqual(next(g), ('C', 42)) # yield 'C', y. |
| |
| def test_future_repr(self): |
| self.loop.set_debug(True) |
| f_pending_debug = self._new_future(loop=self.loop) |
| frame = f_pending_debug._source_traceback[-1] |
| self.assertEqual( |
| repr(f_pending_debug), |
| f'<{self.cls.__name__} pending created at {frame[0]}:{frame[1]}>') |
| f_pending_debug.cancel() |
| |
| self.loop.set_debug(False) |
| f_pending = self._new_future(loop=self.loop) |
| self.assertEqual(repr(f_pending), f'<{self.cls.__name__} pending>') |
| f_pending.cancel() |
| |
| f_cancelled = self._new_future(loop=self.loop) |
| f_cancelled.cancel() |
| self.assertEqual(repr(f_cancelled), f'<{self.cls.__name__} cancelled>') |
| |
| f_result = self._new_future(loop=self.loop) |
| f_result.set_result(4) |
| self.assertEqual( |
| repr(f_result), f'<{self.cls.__name__} finished result=4>') |
| self.assertEqual(f_result.result(), 4) |
| |
| exc = RuntimeError() |
| f_exception = self._new_future(loop=self.loop) |
| f_exception.set_exception(exc) |
| self.assertEqual( |
| repr(f_exception), |
| f'<{self.cls.__name__} finished exception=RuntimeError()>') |
| self.assertIs(f_exception.exception(), exc) |
| |
| def func_repr(func): |
| filename, lineno = test_utils.get_function_source(func) |
| text = '%s() at %s:%s' % (func.__qualname__, filename, lineno) |
| return re.escape(text) |
| |
| f_one_callbacks = self._new_future(loop=self.loop) |
| f_one_callbacks.add_done_callback(_fakefunc) |
| fake_repr = func_repr(_fakefunc) |
| self.assertRegex( |
| repr(f_one_callbacks), |
| r'<' + self.cls.__name__ + r' pending cb=\[%s\]>' % fake_repr) |
| f_one_callbacks.cancel() |
| self.assertEqual(repr(f_one_callbacks), |
| f'<{self.cls.__name__} cancelled>') |
| |
| f_two_callbacks = self._new_future(loop=self.loop) |
| f_two_callbacks.add_done_callback(first_cb) |
| f_two_callbacks.add_done_callback(last_cb) |
| first_repr = func_repr(first_cb) |
| last_repr = func_repr(last_cb) |
| self.assertRegex(repr(f_two_callbacks), |
| r'<' + self.cls.__name__ + r' pending cb=\[%s, %s\]>' |
| % (first_repr, last_repr)) |
| |
| f_many_callbacks = self._new_future(loop=self.loop) |
| f_many_callbacks.add_done_callback(first_cb) |
| for i in range(8): |
| f_many_callbacks.add_done_callback(_fakefunc) |
| f_many_callbacks.add_done_callback(last_cb) |
| cb_regex = r'%s, <8 more>, %s' % (first_repr, last_repr) |
| self.assertRegex( |
| repr(f_many_callbacks), |
| r'<' + self.cls.__name__ + r' pending cb=\[%s\]>' % cb_regex) |
| f_many_callbacks.cancel() |
| self.assertEqual(repr(f_many_callbacks), |
| f'<{self.cls.__name__} cancelled>') |
| |
| def test_copy_state(self): |
| from asyncio.futures import _copy_future_state |
| |
| f = self._new_future(loop=self.loop) |
| f.set_result(10) |
| |
| newf = self._new_future(loop=self.loop) |
| _copy_future_state(f, newf) |
| self.assertTrue(newf.done()) |
| self.assertEqual(newf.result(), 10) |
| |
| f_exception = self._new_future(loop=self.loop) |
| f_exception.set_exception(RuntimeError()) |
| |
| newf_exception = self._new_future(loop=self.loop) |
| _copy_future_state(f_exception, newf_exception) |
| self.assertTrue(newf_exception.done()) |
| self.assertRaises(RuntimeError, newf_exception.result) |
| |
| f_cancelled = self._new_future(loop=self.loop) |
| f_cancelled.cancel() |
| |
| newf_cancelled = self._new_future(loop=self.loop) |
| _copy_future_state(f_cancelled, newf_cancelled) |
| self.assertTrue(newf_cancelled.cancelled()) |
| |
| def test_iter(self): |
| fut = self._new_future(loop=self.loop) |
| |
| def coro(): |
| yield from fut |
| |
| def test(): |
| arg1, arg2 = coro() |
| |
| with self.assertRaisesRegex(RuntimeError, "await wasn't used"): |
| test() |
| fut.cancel() |
| |
| def test_log_traceback(self): |
| fut = self._new_future(loop=self.loop) |
| with self.assertRaisesRegex(ValueError, 'can only be set to False'): |
| fut._log_traceback = True |
| |
| @mock.patch('asyncio.base_events.logger') |
| def test_tb_logger_abandoned(self, m_log): |
| fut = self._new_future(loop=self.loop) |
| del fut |
| self.assertFalse(m_log.error.called) |
| |
| @mock.patch('asyncio.base_events.logger') |
| def test_tb_logger_not_called_after_cancel(self, m_log): |
| fut = self._new_future(loop=self.loop) |
| fut.set_exception(Exception()) |
| fut.cancel() |
| del fut |
| self.assertFalse(m_log.error.called) |
| |
| @mock.patch('asyncio.base_events.logger') |
| def test_tb_logger_result_unretrieved(self, m_log): |
| fut = self._new_future(loop=self.loop) |
| fut.set_result(42) |
| del fut |
| self.assertFalse(m_log.error.called) |
| |
| @mock.patch('asyncio.base_events.logger') |
| def test_tb_logger_result_retrieved(self, m_log): |
| fut = self._new_future(loop=self.loop) |
| fut.set_result(42) |
| fut.result() |
| del fut |
| self.assertFalse(m_log.error.called) |
| |
| @mock.patch('asyncio.base_events.logger') |
| def test_tb_logger_exception_unretrieved(self, m_log): |
| fut = self._new_future(loop=self.loop) |
| fut.set_exception(RuntimeError('boom')) |
| del fut |
| test_utils.run_briefly(self.loop) |
| support.gc_collect() |
| self.assertTrue(m_log.error.called) |
| |
| @mock.patch('asyncio.base_events.logger') |
| def test_tb_logger_exception_retrieved(self, m_log): |
| fut = self._new_future(loop=self.loop) |
| fut.set_exception(RuntimeError('boom')) |
| fut.exception() |
| del fut |
| self.assertFalse(m_log.error.called) |
| |
| @mock.patch('asyncio.base_events.logger') |
| def test_tb_logger_exception_result_retrieved(self, m_log): |
| fut = self._new_future(loop=self.loop) |
| fut.set_exception(RuntimeError('boom')) |
| self.assertRaises(RuntimeError, fut.result) |
| del fut |
| self.assertFalse(m_log.error.called) |
| |
| def test_wrap_future(self): |
| |
| def run(arg): |
| return (arg, threading.get_ident()) |
| ex = concurrent.futures.ThreadPoolExecutor(1) |
| f1 = ex.submit(run, 'oi') |
| f2 = asyncio.wrap_future(f1, loop=self.loop) |
| res, ident = self.loop.run_until_complete(f2) |
| self.assertTrue(asyncio.isfuture(f2)) |
| self.assertEqual(res, 'oi') |
| self.assertNotEqual(ident, threading.get_ident()) |
| ex.shutdown(wait=True) |
| |
| def test_wrap_future_future(self): |
| f1 = self._new_future(loop=self.loop) |
| f2 = asyncio.wrap_future(f1) |
| self.assertIs(f1, f2) |
| |
| def test_wrap_future_use_global_loop(self): |
| with mock.patch('asyncio.futures.events') as events: |
| events.get_event_loop = lambda: self.loop |
| def run(arg): |
| return (arg, threading.get_ident()) |
| ex = concurrent.futures.ThreadPoolExecutor(1) |
| f1 = ex.submit(run, 'oi') |
| f2 = asyncio.wrap_future(f1) |
| self.assertIs(self.loop, f2._loop) |
| ex.shutdown(wait=True) |
| |
| def test_wrap_future_cancel(self): |
| f1 = concurrent.futures.Future() |
| f2 = asyncio.wrap_future(f1, loop=self.loop) |
| f2.cancel() |
| test_utils.run_briefly(self.loop) |
| self.assertTrue(f1.cancelled()) |
| self.assertTrue(f2.cancelled()) |
| |
| def test_wrap_future_cancel2(self): |
| f1 = concurrent.futures.Future() |
| f2 = asyncio.wrap_future(f1, loop=self.loop) |
| f1.set_result(42) |
| f2.cancel() |
| test_utils.run_briefly(self.loop) |
| self.assertFalse(f1.cancelled()) |
| self.assertEqual(f1.result(), 42) |
| self.assertTrue(f2.cancelled()) |
| |
| def test_future_source_traceback(self): |
| self.loop.set_debug(True) |
| |
| future = self._new_future(loop=self.loop) |
| lineno = sys._getframe().f_lineno - 1 |
| self.assertIsInstance(future._source_traceback, list) |
| self.assertEqual(future._source_traceback[-2][:3], |
| (__file__, |
| lineno, |
| 'test_future_source_traceback')) |
| |
| @mock.patch('asyncio.base_events.logger') |
| def check_future_exception_never_retrieved(self, debug, m_log): |
| self.loop.set_debug(debug) |
| |
| def memory_error(): |
| try: |
| raise MemoryError() |
| except BaseException as exc: |
| return exc |
| exc = memory_error() |
| |
| future = self._new_future(loop=self.loop) |
| future.set_exception(exc) |
| future = None |
| test_utils.run_briefly(self.loop) |
| support.gc_collect() |
| |
| if sys.version_info >= (3, 4): |
| regex = f'^{self.cls.__name__} exception was never retrieved\n' |
| exc_info = (type(exc), exc, exc.__traceback__) |
| m_log.error.assert_called_once_with(mock.ANY, exc_info=exc_info) |
| else: |
| regex = r'^Future/Task exception was never retrieved\n' |
| m_log.error.assert_called_once_with(mock.ANY, exc_info=False) |
| message = m_log.error.call_args[0][0] |
| self.assertRegex(message, re.compile(regex, re.DOTALL)) |
| |
| def test_future_exception_never_retrieved(self): |
| self.check_future_exception_never_retrieved(False) |
| |
| def test_future_exception_never_retrieved_debug(self): |
| self.check_future_exception_never_retrieved(True) |
| |
| def test_set_result_unless_cancelled(self): |
| fut = self._new_future(loop=self.loop) |
| fut.cancel() |
| futures._set_result_unless_cancelled(fut, 2) |
| self.assertTrue(fut.cancelled()) |
| |
| def test_future_stop_iteration_args(self): |
| fut = self._new_future(loop=self.loop) |
| fut.set_result((1, 2)) |
| fi = fut.__iter__() |
| result = None |
| try: |
| fi.send(None) |
| except StopIteration as ex: |
| result = ex.args[0] |
| else: |
| self.fail('StopIteration was expected') |
| self.assertEqual(result, (1, 2)) |
| |
| def test_future_iter_throw(self): |
| fut = self._new_future(loop=self.loop) |
| fi = iter(fut) |
| self.assertRaises(TypeError, fi.throw, |
| Exception, Exception("elephant"), 32) |
| self.assertRaises(TypeError, fi.throw, |
| Exception("elephant"), Exception("elephant")) |
| self.assertRaises(TypeError, fi.throw, list) |
| |
| def test_future_del_collect(self): |
| class Evil: |
| def __del__(self): |
| gc.collect() |
| |
| for i in range(100): |
| fut = self._new_future(loop=self.loop) |
| fut.set_result(Evil()) |
| |
| |
| @unittest.skipUnless(hasattr(futures, '_CFuture'), |
| 'requires the C _asyncio module') |
| class CFutureTests(BaseFutureTests, test_utils.TestCase): |
| try: |
| cls = futures._CFuture |
| except AttributeError: |
| cls = None |
| |
| def test_future_del_segfault(self): |
| fut = self._new_future(loop=self.loop) |
| with self.assertRaises(AttributeError): |
| del fut._asyncio_future_blocking |
| with self.assertRaises(AttributeError): |
| del fut._log_traceback |
| |
| |
| @unittest.skipUnless(hasattr(futures, '_CFuture'), |
| 'requires the C _asyncio module') |
| class CSubFutureTests(BaseFutureTests, test_utils.TestCase): |
| try: |
| class CSubFuture(futures._CFuture): |
| pass |
| |
| cls = CSubFuture |
| except AttributeError: |
| cls = None |
| |
| |
| class PyFutureTests(BaseFutureTests, test_utils.TestCase): |
| cls = futures._PyFuture |
| |
| |
| class BaseFutureDoneCallbackTests(): |
| |
| def setUp(self): |
| super().setUp() |
| self.loop = self.new_test_loop() |
| |
| def run_briefly(self): |
| test_utils.run_briefly(self.loop) |
| |
| def _make_callback(self, bag, thing): |
| # Create a callback function that appends thing to bag. |
| def bag_appender(future): |
| bag.append(thing) |
| return bag_appender |
| |
| def _new_future(self): |
| raise NotImplementedError |
| |
| def test_callbacks_remove_first_callback(self): |
| bag = [] |
| f = self._new_future() |
| |
| cb1 = self._make_callback(bag, 42) |
| cb2 = self._make_callback(bag, 17) |
| cb3 = self._make_callback(bag, 100) |
| |
| f.add_done_callback(cb1) |
| f.add_done_callback(cb2) |
| f.add_done_callback(cb3) |
| |
| f.remove_done_callback(cb1) |
| f.remove_done_callback(cb1) |
| |
| self.assertEqual(bag, []) |
| f.set_result('foo') |
| |
| self.run_briefly() |
| |
| self.assertEqual(bag, [17, 100]) |
| self.assertEqual(f.result(), 'foo') |
| |
| def test_callbacks_remove_first_and_second_callback(self): |
| bag = [] |
| f = self._new_future() |
| |
| cb1 = self._make_callback(bag, 42) |
| cb2 = self._make_callback(bag, 17) |
| cb3 = self._make_callback(bag, 100) |
| |
| f.add_done_callback(cb1) |
| f.add_done_callback(cb2) |
| f.add_done_callback(cb3) |
| |
| f.remove_done_callback(cb1) |
| f.remove_done_callback(cb2) |
| f.remove_done_callback(cb1) |
| |
| self.assertEqual(bag, []) |
| f.set_result('foo') |
| |
| self.run_briefly() |
| |
| self.assertEqual(bag, [100]) |
| self.assertEqual(f.result(), 'foo') |
| |
| def test_callbacks_remove_third_callback(self): |
| bag = [] |
| f = self._new_future() |
| |
| cb1 = self._make_callback(bag, 42) |
| cb2 = self._make_callback(bag, 17) |
| cb3 = self._make_callback(bag, 100) |
| |
| f.add_done_callback(cb1) |
| f.add_done_callback(cb2) |
| f.add_done_callback(cb3) |
| |
| f.remove_done_callback(cb3) |
| f.remove_done_callback(cb3) |
| |
| self.assertEqual(bag, []) |
| f.set_result('foo') |
| |
| self.run_briefly() |
| |
| self.assertEqual(bag, [42, 17]) |
| self.assertEqual(f.result(), 'foo') |
| |
| def test_callbacks_invoked_on_set_result(self): |
| bag = [] |
| f = self._new_future() |
| f.add_done_callback(self._make_callback(bag, 42)) |
| f.add_done_callback(self._make_callback(bag, 17)) |
| |
| self.assertEqual(bag, []) |
| f.set_result('foo') |
| |
| self.run_briefly() |
| |
| self.assertEqual(bag, [42, 17]) |
| self.assertEqual(f.result(), 'foo') |
| |
| def test_callbacks_invoked_on_set_exception(self): |
| bag = [] |
| f = self._new_future() |
| f.add_done_callback(self._make_callback(bag, 100)) |
| |
| self.assertEqual(bag, []) |
| exc = RuntimeError() |
| f.set_exception(exc) |
| |
| self.run_briefly() |
| |
| self.assertEqual(bag, [100]) |
| self.assertEqual(f.exception(), exc) |
| |
| def test_remove_done_callback(self): |
| bag = [] |
| f = self._new_future() |
| cb1 = self._make_callback(bag, 1) |
| cb2 = self._make_callback(bag, 2) |
| cb3 = self._make_callback(bag, 3) |
| |
| # Add one cb1 and one cb2. |
| f.add_done_callback(cb1) |
| f.add_done_callback(cb2) |
| |
| # One instance of cb2 removed. Now there's only one cb1. |
| self.assertEqual(f.remove_done_callback(cb2), 1) |
| |
| # Never had any cb3 in there. |
| self.assertEqual(f.remove_done_callback(cb3), 0) |
| |
| # After this there will be 6 instances of cb1 and one of cb2. |
| f.add_done_callback(cb2) |
| for i in range(5): |
| f.add_done_callback(cb1) |
| |
| # Remove all instances of cb1. One cb2 remains. |
| self.assertEqual(f.remove_done_callback(cb1), 6) |
| |
| self.assertEqual(bag, []) |
| f.set_result('foo') |
| |
| self.run_briefly() |
| |
| self.assertEqual(bag, [2]) |
| self.assertEqual(f.result(), 'foo') |
| |
| def test_remove_done_callbacks_list_mutation(self): |
| # see http://bugs.python.org/issue28963 for details |
| |
| fut = self._new_future() |
| fut.add_done_callback(str) |
| |
| for _ in range(63): |
| fut.add_done_callback(id) |
| |
| class evil: |
| def __eq__(self, other): |
| fut.remove_done_callback(id) |
| return False |
| |
| fut.remove_done_callback(evil()) |
| |
| def test_schedule_callbacks_list_mutation_1(self): |
| # see http://bugs.python.org/issue28963 for details |
| |
| def mut(f): |
| f.remove_done_callback(str) |
| |
| fut = self._new_future() |
| fut.add_done_callback(mut) |
| fut.add_done_callback(str) |
| fut.add_done_callback(str) |
| fut.set_result(1) |
| test_utils.run_briefly(self.loop) |
| |
| def test_schedule_callbacks_list_mutation_2(self): |
| # see http://bugs.python.org/issue30828 for details |
| |
| fut = self._new_future() |
| fut.add_done_callback(str) |
| |
| for _ in range(63): |
| fut.add_done_callback(id) |
| |
| max_extra_cbs = 100 |
| extra_cbs = 0 |
| |
| class evil: |
| def __eq__(self, other): |
| nonlocal extra_cbs |
| extra_cbs += 1 |
| if extra_cbs < max_extra_cbs: |
| fut.add_done_callback(id) |
| return False |
| |
| fut.remove_done_callback(evil()) |
| |
| |
| @unittest.skipUnless(hasattr(futures, '_CFuture'), |
| 'requires the C _asyncio module') |
| class CFutureDoneCallbackTests(BaseFutureDoneCallbackTests, |
| test_utils.TestCase): |
| |
| def _new_future(self): |
| return futures._CFuture(loop=self.loop) |
| |
| |
| @unittest.skipUnless(hasattr(futures, '_CFuture'), |
| 'requires the C _asyncio module') |
| class CSubFutureDoneCallbackTests(BaseFutureDoneCallbackTests, |
| test_utils.TestCase): |
| |
| def _new_future(self): |
| class CSubFuture(futures._CFuture): |
| pass |
| return CSubFuture(loop=self.loop) |
| |
| |
| class PyFutureDoneCallbackTests(BaseFutureDoneCallbackTests, |
| test_utils.TestCase): |
| |
| def _new_future(self): |
| return futures._PyFuture(loop=self.loop) |
| |
| |
| class BaseFutureInheritanceTests: |
| |
| def _get_future_cls(self): |
| raise NotImplementedError |
| |
| def setUp(self): |
| super().setUp() |
| self.loop = self.new_test_loop() |
| self.addCleanup(self.loop.close) |
| |
| def test_inherit_without_calling_super_init(self): |
| # See https://bugs.python.org/issue38785 for the context |
| cls = self._get_future_cls() |
| |
| class MyFut(cls): |
| def __init__(self, *args, **kwargs): |
| # don't call super().__init__() |
| pass |
| |
| fut = MyFut(loop=self.loop) |
| with self.assertRaisesRegex( |
| RuntimeError, |
| "Future object is not initialized." |
| ): |
| fut.get_loop() |
| |
| |
| class PyFutureInheritanceTests(BaseFutureInheritanceTests, |
| test_utils.TestCase): |
| def _get_future_cls(self): |
| return futures._PyFuture |
| |
| |
| class CFutureInheritanceTests(BaseFutureInheritanceTests, |
| test_utils.TestCase): |
| def _get_future_cls(self): |
| return futures._CFuture |
| |
| |
| if __name__ == '__main__': |
| unittest.main() |