| import unittest |
| |
| from contextlib import contextmanager, ExitStack |
| from test.support import catch_unraisable_exception, import_helper |
| |
| |
# Skip every test in this module if the _testcapi extension module isn't
# available; all helpers below are thin wrappers around its functions.
_testcapi = import_helper.import_module('_testcapi')
| |
| |
class TestDictWatchers(unittest.TestCase):
    """Exercise the dict watcher C API through the ``_testcapi`` helpers."""

    # types of watchers testcapimodule can add:
    EVENTS = 0   # appends dict events as strings to global event list
    ERROR = 1    # unconditionally sets and signals an error ("boom!")
    SECOND = 2   # always appends "second" to global event list

    # duplicating the C constant
    DICT_MAX_WATCHERS = 8

    def add_watcher(self, kind=EVENTS):
        """Register a dict watcher of the given kind and return its ID."""
        return _testcapi.add_dict_watcher(kind)

    def clear_watcher(self, watcher_id):
        """Unregister the dict watcher identified by *watcher_id*."""
        _testcapi.clear_dict_watcher(watcher_id)

    @contextmanager
    def watcher(self, kind=EVENTS):
        """Register a watcher, yield its ID, and always clear it on exit."""
        wid = self.add_watcher(kind)
        try:
            yield wid
        finally:
            self.clear_watcher(wid)

    def assert_events(self, expected):
        """Assert that the recorded dict watcher events equal *expected*."""
        actual = _testcapi.get_dict_watcher_events()
        self.assertEqual(actual, expected)

    def watch(self, wid, d):
        """Start watching *d* with the watcher identified by *wid*."""
        _testcapi.watch_dict(wid, d)

    def unwatch(self, wid, d):
        """Stop watching *d* with the watcher identified by *wid*."""
        _testcapi.unwatch_dict(wid, d)

    def test_set_new_item(self):
        d = {}
        with self.watcher() as wid:
            self.watch(wid, d)
            d["foo"] = "bar"
            self.assert_events(["new:foo:bar"])

    def test_set_existing_item(self):
        d = {"foo": "bar"}
        with self.watcher() as wid:
            self.watch(wid, d)
            d["foo"] = "baz"
            self.assert_events(["mod:foo:baz"])

    def test_clone(self):
        d = {}
        d2 = {"foo": "bar"}
        with self.watcher() as wid:
            self.watch(wid, d)
            # updating an empty dict from another dict is reported as "clone"
            d.update(d2)
            self.assert_events(["clone"])

    def test_no_event_if_not_watched(self):
        d = {}
        # a watcher is registered, but the dict is never attached to it
        with self.watcher():
            d["foo"] = "bar"
            self.assert_events([])

    def test_del(self):
        d = {"foo": "bar"}
        with self.watcher() as wid:
            self.watch(wid, d)
            del d["foo"]
            self.assert_events(["del:foo"])

    def test_pop(self):
        d = {"foo": "bar"}
        with self.watcher() as wid:
            self.watch(wid, d)
            d.pop("foo")
            self.assert_events(["del:foo"])

    def test_clear(self):
        d = {"foo": "bar"}
        with self.watcher() as wid:
            self.watch(wid, d)
            d.clear()
            self.assert_events(["clear"])

    def test_dealloc(self):
        d = {"foo": "bar"}
        with self.watcher() as wid:
            self.watch(wid, d)
            # dropping the only reference is expected to emit "dealloc"
            del d
            self.assert_events(["dealloc"])

    def test_unwatch(self):
        d = {}
        with self.watcher() as wid:
            self.watch(wid, d)
            d["foo"] = "bar"
            self.unwatch(wid, d)
            # no event is expected for modifications made after unwatching
            d["hmm"] = "baz"
            self.assert_events(["new:foo:bar"])

    def test_error(self):
        d = {}
        with self.watcher(kind=self.ERROR) as wid:
            self.watch(wid, d)
            # the callback's error cannot propagate; it is reported as
            # unraisable instead
            with catch_unraisable_exception() as cm:
                d["foo"] = "bar"
                self.assertIn(
                    "PyDict_EVENT_ADDED watcher callback for <dict at",
                    cm.unraisable.object
                )
                self.assertEqual(str(cm.unraisable.exc_value), "boom!")
            self.assert_events([])

    def test_dealloc_error(self):
        d = {}
        with self.watcher(kind=self.ERROR) as wid:
            self.watch(wid, d)
            with catch_unraisable_exception() as cm:
                del d
                self.assertEqual(str(cm.unraisable.exc_value), "boom!")

    def test_two_watchers(self):
        d1 = {}
        d2 = {}
        with self.watcher() as wid1:
            with self.watcher(kind=self.SECOND) as wid2:
                # two simultaneously registered watchers need distinct IDs
                self.assertNotEqual(wid1, wid2)
                self.watch(wid1, d1)
                self.watch(wid2, d2)
                d1["foo"] = "bar"
                d2["hmm"] = "baz"
                self.assert_events(["new:foo:bar", "second"])

    def test_watch_non_dict(self):
        with self.watcher() as wid:
            with self.assertRaisesRegex(ValueError, r"Cannot watch non-dictionary"):
                self.watch(wid, 1)

    def test_watch_out_of_range_watcher_id(self):
        d = {}
        with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID -1"):
            self.watch(-1, d)
        with self.assertRaisesRegex(
                ValueError,
                rf"Invalid dict watcher ID {self.DICT_MAX_WATCHERS}"):
            self.watch(self.DICT_MAX_WATCHERS, d)

    def test_watch_unassigned_watcher_id(self):
        d = {}
        with self.assertRaisesRegex(ValueError, r"No dict watcher set for ID 1"):
            self.watch(1, d)

    def test_unwatch_non_dict(self):
        with self.watcher() as wid:
            # unwatching reports the same "Cannot watch" message as watching
            with self.assertRaisesRegex(ValueError, r"Cannot watch non-dictionary"):
                self.unwatch(wid, 1)

    def test_unwatch_out_of_range_watcher_id(self):
        d = {}
        with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID -1"):
            self.unwatch(-1, d)
        with self.assertRaisesRegex(
                ValueError,
                rf"Invalid dict watcher ID {self.DICT_MAX_WATCHERS}"):
            self.unwatch(self.DICT_MAX_WATCHERS, d)

    def test_unwatch_unassigned_watcher_id(self):
        d = {}
        with self.assertRaisesRegex(ValueError, r"No dict watcher set for ID 1"):
            self.unwatch(1, d)

    def test_clear_out_of_range_watcher_id(self):
        with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID -1"):
            self.clear_watcher(-1)
        with self.assertRaisesRegex(
                ValueError,
                rf"Invalid dict watcher ID {self.DICT_MAX_WATCHERS}"):
            self.clear_watcher(self.DICT_MAX_WATCHERS)

    def test_clear_unassigned_watcher_id(self):
        with self.assertRaisesRegex(ValueError, r"No dict watcher set for ID 1"):
            self.clear_watcher(1)
| |
| |
class TestTypeWatchers(unittest.TestCase):
    """Exercise the type watcher C API through the ``_testcapi`` helpers."""

    # types of watchers testcapimodule can add:
    TYPES = 0    # appends modified types to global event list
    ERROR = 1    # unconditionally sets and signals an error ("boom!")
    WRAP = 2     # appends modified type wrapped in list to global event list

    # duplicating the C constant
    TYPE_MAX_WATCHERS = 8

    def add_watcher(self, kind=TYPES):
        """Register a type watcher of the given kind and return its ID."""
        return _testcapi.add_type_watcher(kind)

    def clear_watcher(self, watcher_id):
        """Unregister the type watcher identified by *watcher_id*."""
        _testcapi.clear_type_watcher(watcher_id)

    @contextmanager
    def watcher(self, kind=TYPES):
        """Register a watcher, yield its ID, and always clear it on exit."""
        wid = self.add_watcher(kind)
        try:
            yield wid
        finally:
            self.clear_watcher(wid)

    def assert_events(self, expected):
        """Assert that the recorded type-modified events equal *expected*."""
        actual = _testcapi.get_type_modified_events()
        self.assertEqual(actual, expected)

    def watch(self, wid, t):
        """Start watching type *t* with the watcher identified by *wid*."""
        _testcapi.watch_type(wid, t)

    def unwatch(self, wid, t):
        """Stop watching type *t* with the watcher identified by *wid*."""
        _testcapi.unwatch_type(wid, t)

    def test_watch_type(self):
        class C: pass
        with self.watcher() as wid:
            self.watch(wid, C)
            C.foo = "bar"
            self.assert_events([C])

    def test_event_aggregation(self):
        class C: pass
        with self.watcher() as wid:
            self.watch(wid, C)
            C.foo = "bar"
            C.bar = "baz"
            # only one event registered for both modifications
            self.assert_events([C])

    def test_lookup_resets_aggregation(self):
        class C: pass
        with self.watcher() as wid:
            self.watch(wid, C)
            C.foo = "bar"
            # lookup resets type version tag
            self.assertEqual(C.foo, "bar")
            C.bar = "baz"
            # both events registered
            self.assert_events([C, C])

    def test_unwatch_type(self):
        class C: pass
        with self.watcher() as wid:
            self.watch(wid, C)
            C.foo = "bar"
            self.assertEqual(C.foo, "bar")
            self.assert_events([C])
            self.unwatch(wid, C)
            # no new event is expected once the type is unwatched
            C.bar = "baz"
            self.assert_events([C])

    def test_clear_watcher(self):
        class C: pass
        # outer watcher is unused, it's just to keep events list alive
        with self.watcher():
            with self.watcher() as wid:
                self.watch(wid, C)
                C.foo = "bar"
                self.assertEqual(C.foo, "bar")
                self.assert_events([C])
            C.bar = "baz"
            # Watcher on C has been cleared, no new event
            self.assert_events([C])

    def test_watch_type_subclass(self):
        class C: pass
        class D(C): pass
        with self.watcher() as wid:
            self.watch(wid, D)
            # modifying the base class is reported for the watched subclass
            C.foo = "bar"
            self.assert_events([D])

    def test_error(self):
        class C: pass
        with self.watcher(kind=self.ERROR) as wid:
            self.watch(wid, C)
            # the callback's error cannot propagate; it is reported as
            # unraisable instead
            with catch_unraisable_exception() as cm:
                C.foo = "bar"
                self.assertIs(cm.unraisable.object, C)
                self.assertEqual(str(cm.unraisable.exc_value), "boom!")
            self.assert_events([])

    def test_two_watchers(self):
        class C1: pass
        class C2: pass
        with self.watcher() as wid1:
            with self.watcher(kind=self.WRAP) as wid2:
                self.assertNotEqual(wid1, wid2)
                self.watch(wid1, C1)
                self.watch(wid2, C2)
                C1.foo = "bar"
                C2.hmm = "baz"
                self.assert_events([C1, [C2]])

    def test_watch_non_type(self):
        with self.watcher() as wid:
            with self.assertRaisesRegex(ValueError, r"Cannot watch non-type"):
                self.watch(wid, 1)

    def test_watch_out_of_range_watcher_id(self):
        class C: pass
        with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID -1"):
            self.watch(-1, C)
        with self.assertRaisesRegex(
                ValueError,
                rf"Invalid type watcher ID {self.TYPE_MAX_WATCHERS}"):
            self.watch(self.TYPE_MAX_WATCHERS, C)

    def test_watch_unassigned_watcher_id(self):
        class C: pass
        with self.assertRaisesRegex(ValueError, r"No type watcher set for ID 1"):
            self.watch(1, C)

    def test_unwatch_non_type(self):
        with self.watcher() as wid:
            # unwatching reports the same "Cannot watch" message as watching
            with self.assertRaisesRegex(ValueError, r"Cannot watch non-type"):
                self.unwatch(wid, 1)

    def test_unwatch_out_of_range_watcher_id(self):
        class C: pass
        with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID -1"):
            self.unwatch(-1, C)
        with self.assertRaisesRegex(
                ValueError,
                rf"Invalid type watcher ID {self.TYPE_MAX_WATCHERS}"):
            self.unwatch(self.TYPE_MAX_WATCHERS, C)

    def test_unwatch_unassigned_watcher_id(self):
        class C: pass
        with self.assertRaisesRegex(ValueError, r"No type watcher set for ID 1"):
            self.unwatch(1, C)

    def test_clear_out_of_range_watcher_id(self):
        with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID -1"):
            self.clear_watcher(-1)
        with self.assertRaisesRegex(
                ValueError,
                rf"Invalid type watcher ID {self.TYPE_MAX_WATCHERS}"):
            self.clear_watcher(self.TYPE_MAX_WATCHERS)

    def test_clear_unassigned_watcher_id(self):
        with self.assertRaisesRegex(ValueError, r"No type watcher set for ID 1"):
            self.clear_watcher(1)

    def test_no_more_ids_available(self):
        # Keep registering watchers until the ID space is exhausted.  The
        # registrations happen inside assertRaisesRegex so the test does not
        # depend on every slot being free when it starts; ExitStack clears
        # whatever was registered before the error propagates.
        with self.assertRaisesRegex(RuntimeError, r"no more type watcher IDs"):
            with ExitStack() as stack:
                for _ in range(self.TYPE_MAX_WATCHERS + 1):
                    stack.enter_context(self.watcher())
| |
| |
class TestCodeObjectWatchers(unittest.TestCase):
    """Exercise the code object watcher C API through ``_testcapi``."""

    # watcher selectors passed to _testcapi.add_code_watcher() by these tests:
    FIRST = 0    # events are counted under index 0
    SECOND = 1   # events are counted under index 1
    ERROR = 2    # callback signals an error ("boom!")

    # duplicating the C constant
    CODE_MAX_WATCHERS = 8

    @contextmanager
    def code_watcher(self, which_watcher):
        """Register the selected code watcher, yield its ID, clear on exit."""
        wid = _testcapi.add_code_watcher(which_watcher)
        try:
            yield wid
        finally:
            _testcapi.clear_code_watcher(wid)

    def assert_event_counts(self, exp_created_0, exp_destroyed_0,
                            exp_created_1, exp_destroyed_1):
        """Assert the created/destroyed event counts of watchers 0 and 1.

        Each assertion carries a label so that a failure identifies which of
        the four counters is wrong.
        """
        self.assertEqual(
            exp_created_0,
            _testcapi.get_code_watcher_num_created_events(self.FIRST),
            "created events of watcher 0")
        self.assertEqual(
            exp_destroyed_0,
            _testcapi.get_code_watcher_num_destroyed_events(self.FIRST),
            "destroyed events of watcher 0")
        self.assertEqual(
            exp_created_1,
            _testcapi.get_code_watcher_num_created_events(self.SECOND),
            "created events of watcher 1")
        self.assertEqual(
            exp_destroyed_1,
            _testcapi.get_code_watcher_num_destroyed_events(self.SECOND),
            "destroyed events of watcher 1")

    def test_code_object_events_dispatched(self):
        # verify that all counts are zero before any watchers are registered
        self.assert_event_counts(0, 0, 0, 0)

        # verify that all counts remain zero when a code object is
        # created and destroyed with no watchers registered
        co1 = _testcapi.code_newempty("test_watchers", "dummy1", 0)
        self.assert_event_counts(0, 0, 0, 0)
        del co1
        self.assert_event_counts(0, 0, 0, 0)

        # verify counts are as expected when first watcher is registered
        with self.code_watcher(self.FIRST):
            self.assert_event_counts(0, 0, 0, 0)
            co2 = _testcapi.code_newempty("test_watchers", "dummy2", 0)
            self.assert_event_counts(1, 0, 0, 0)
            del co2
            self.assert_event_counts(1, 1, 0, 0)

            # again with second watcher registered
            with self.code_watcher(self.SECOND):
                self.assert_event_counts(1, 1, 0, 0)
                co3 = _testcapi.code_newempty("test_watchers", "dummy3", 0)
                self.assert_event_counts(2, 1, 1, 0)
                del co3
                self.assert_event_counts(2, 2, 1, 1)

        # verify counts are reset and don't change after both watchers are cleared
        co4 = _testcapi.code_newempty("test_watchers", "dummy4", 0)
        self.assert_event_counts(0, 0, 0, 0)
        del co4
        self.assert_event_counts(0, 0, 0, 0)

    def test_error(self):
        with self.code_watcher(self.ERROR):
            # the callback's error cannot propagate; it is reported as
            # unraisable instead
            with catch_unraisable_exception() as cm:
                co = _testcapi.code_newempty("test_watchers", "dummy0", 0)

                self.assertEqual(
                    cm.unraisable.object,
                    f"PY_CODE_EVENT_CREATE watcher callback for {co!r}"
                )
                self.assertEqual(str(cm.unraisable.exc_value), "boom!")

    def test_dealloc_error(self):
        co = _testcapi.code_newempty("test_watchers", "dummy0", 0)
        with self.code_watcher(self.ERROR):
            with catch_unraisable_exception() as cm:
                del co

                self.assertEqual(str(cm.unraisable.exc_value), "boom!")

    def test_clear_out_of_range_watcher_id(self):
        with self.assertRaisesRegex(ValueError, r"Invalid code watcher ID -1"):
            _testcapi.clear_code_watcher(-1)
        with self.assertRaisesRegex(
                ValueError,
                rf"Invalid code watcher ID {self.CODE_MAX_WATCHERS}"):
            _testcapi.clear_code_watcher(self.CODE_MAX_WATCHERS)

    def test_clear_unassigned_watcher_id(self):
        with self.assertRaisesRegex(ValueError, r"No code watcher set for ID 1"):
            _testcapi.clear_code_watcher(1)

    def test_allocate_too_many_watchers(self):
        with self.assertRaisesRegex(RuntimeError, r"no more code watcher IDs available"):
            _testcapi.allocate_too_many_code_watchers()
| |
| |
class TestFuncWatchers(unittest.TestCase):
    """Exercise the function watcher C API through ``_testcapi``."""

    # duplicating the C constant
    FUNC_MAX_WATCHERS = 8

    @contextmanager
    def add_watcher(self, func):
        """Register *func* as a function watcher and clear it on exit.

        Yields the watcher ID, matching the context managers of the other
        watcher test classes; callers that do not need it can ignore it.
        """
        wid = _testcapi.add_func_watcher(func)
        try:
            yield wid
        finally:
            _testcapi.clear_func_watcher(wid)

    def test_func_events_dispatched(self):
        events = []
        def watcher(*args):
            events.append(args)

        with self.add_watcher(watcher):
            def myfunc():
                pass
            self.assertIn((_testcapi.PYFUNC_EVENT_CREATE, myfunc, None), events)
            # the destroy event is matched by id, since the function object
            # itself is gone by then
            myfunc_id = id(myfunc)

            new_code = self.test_func_events_dispatched.__code__
            myfunc.__code__ = new_code
            self.assertIn((_testcapi.PYFUNC_EVENT_MODIFY_CODE, myfunc, new_code), events)

            new_defaults = (123,)
            myfunc.__defaults__ = new_defaults
            self.assertIn((_testcapi.PYFUNC_EVENT_MODIFY_DEFAULTS, myfunc, new_defaults), events)

            new_defaults = (456,)
            _testcapi.set_func_defaults_via_capi(myfunc, new_defaults)
            self.assertIn((_testcapi.PYFUNC_EVENT_MODIFY_DEFAULTS, myfunc, new_defaults), events)

            new_kwdefaults = {"self": 123}
            myfunc.__kwdefaults__ = new_kwdefaults
            self.assertIn((_testcapi.PYFUNC_EVENT_MODIFY_KWDEFAULTS, myfunc, new_kwdefaults), events)

            new_kwdefaults = {"self": 456}
            _testcapi.set_func_kwdefaults_via_capi(myfunc, new_kwdefaults)
            self.assertIn((_testcapi.PYFUNC_EVENT_MODIFY_KWDEFAULTS, myfunc, new_kwdefaults), events)

            # Clear events reference to func, so that ``del`` below drops the
            # last reference; the watcher keeps appending to this same list.
            events.clear()
            del myfunc
            self.assertIn((_testcapi.PYFUNC_EVENT_DESTROY, myfunc_id, None), events)

    def test_multiple_watchers(self):
        events0 = []
        def first_watcher(*args):
            events0.append(args)

        events1 = []
        def second_watcher(*args):
            events1.append(args)

        with self.add_watcher(first_watcher):
            with self.add_watcher(second_watcher):
                def myfunc():
                    pass

                # both registered watchers must see the same creation event
                event = (_testcapi.PYFUNC_EVENT_CREATE, myfunc, None)
                self.assertIn(event, events0)
                self.assertIn(event, events1)

    def test_watcher_raises_error(self):
        class MyError(Exception):
            pass

        def watcher(*args):
            raise MyError("testing 123")

        with self.add_watcher(watcher):
            # the watcher's exception cannot propagate out of the function
            # definition; it is reported as unraisable instead
            with catch_unraisable_exception() as cm:
                def myfunc():
                    pass

                self.assertEqual(
                    cm.unraisable.object,
                    f"PyFunction_EVENT_CREATE watcher callback for {myfunc!r}"
                )
                # the reported exception must be the one the watcher raised
                self.assertIsInstance(cm.unraisable.exc_value, MyError)

    def test_dealloc_watcher_raises_error(self):
        class MyError(Exception):
            pass

        def watcher(*args):
            raise MyError("testing 123")

        def myfunc():
            pass

        with self.add_watcher(watcher):
            with catch_unraisable_exception() as cm:
                del myfunc

                self.assertIsInstance(cm.unraisable.exc_value, MyError)

    def test_clear_out_of_range_watcher_id(self):
        with self.assertRaisesRegex(ValueError, r"invalid func watcher ID -1"):
            _testcapi.clear_func_watcher(-1)
        with self.assertRaisesRegex(
                ValueError,
                rf"invalid func watcher ID {self.FUNC_MAX_WATCHERS}"):
            _testcapi.clear_func_watcher(self.FUNC_MAX_WATCHERS)

    def test_clear_unassigned_watcher_id(self):
        with self.assertRaisesRegex(ValueError, r"no func watcher set for ID 1"):
            _testcapi.clear_func_watcher(1)

    def test_allocate_too_many_watchers(self):
        with self.assertRaisesRegex(RuntimeError, r"no more func watcher IDs"):
            _testcapi.allocate_too_many_func_watchers()
| |
| |
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()