import unittest | |
from test.test_support import verbose, run_unittest | |
import sys | |
import gc | |
import weakref | |
### Support code
###############################################################################

# Bug 1055820 has several tests of longstanding bugs involving weakrefs and
# cyclic gc.

# An instance of C1055820 has a self-loop, so it becomes cyclic trash when
# unreachable.
class C1055820(object):
    """Helper object that refers to itself.

    Each instance stores the value it was built with in ``i`` and a reference
    to itself in ``loop``, so once nothing else refers to it the instance is
    part of a reference cycle.
    """

    def __init__(self, i):
        # The self-reference is what creates the cycle.
        self.loop = self
        self.i = i
class GC_Detector(object):
    """Flag object that records when its piece of cyclic trash is collected.

    After construction ``gc_happened`` is False.  It is switched to True by a
    weakref callback attached to a throw-away C1055820 instance, i.e. when
    that instance is reclaimed.
    """

    def __init__(self):
        self.gc_happened = False

        def _note_collection(_dead_ref):
            # Weakref callback; the dead reference it receives is not used.
            self.gc_happened = True

        # The C1055820 instance is referenced only by its own self-loop, so it
        # is cyclic trash right away; the weakref (kept alive on self.wr)
        # fires _note_collection when that trash is collected.
        self.wr = weakref.ref(C1055820(666), _note_collection)
### Tests
###############################################################################
class GCTests(unittest.TestCase):
    # Tests that run explicit collections through gc.collect() and check the
    # count it returns, the contents of gc.garbage, or simply that nothing
    # crashes.
    #
    # Most tests call gc.collect() once before dropping their last reference,
    # presumably so that unrelated garbage is not counted by the assertion
    # that follows.  Several tests compare exact counts, so the statement
    # order inside them matters.

    def test_list(self):
        # A list that contains itself: one object is expected to be collected.
        l = []
        l.append(l)
        gc.collect()
        del l
        self.assertEqual(gc.collect(), 1)

    def test_dict(self):
        # A dict that holds itself as a value: one object is expected.
        d = {}
        d[1] = d
        gc.collect()
        del d
        self.assertEqual(gc.collect(), 1)

    def test_tuple(self):
        # since tuples are immutable we close the loop with a list
        l = []
        t = (l,)
        l.append(t)
        gc.collect()
        del t
        del l
        # Both the tuple and the list are expected to be collected.
        self.assertEqual(gc.collect(), 2)

    def test_class(self):
        # An old-style class that refers to itself through a class attribute.
        class A:
            pass
        A.a = A
        gc.collect()
        del A
        self.assertNotEqual(gc.collect(), 0)

    def test_newstyleclass(self):
        # No explicit self-reference is created here; the test expects the
        # collector to find something anyway once the new-style class is
        # deleted.
        class A(object):
            pass
        gc.collect()
        del A
        self.assertNotEqual(gc.collect(), 0)

    def test_instance(self):
        # An old-style instance that refers to itself through an attribute.
        class A:
            pass
        a = A()
        a.a = a
        gc.collect()
        del a
        self.assertNotEqual(gc.collect(), 0)

    def test_newinstance(self):
        # Same as test_instance, but with new-style classes.
        class A(object):
            pass
        a = A()
        a.a = a
        gc.collect()
        del a
        self.assertNotEqual(gc.collect(), 0)
        # Repeat with an instance of a class that inherits from both a
        # built-in container type and A.
        class B(list):
            pass
        class C(B, A):
            pass
        a = C()
        a.a = a
        gc.collect()
        del a
        self.assertNotEqual(gc.collect(), 0)
        # Dropping the subclasses themselves must also free something.
        del B, C
        self.assertNotEqual(gc.collect(), 0)
        # Finally, store an instance of A on A itself and drop the class.
        A.a = A()
        del A
        self.assertNotEqual(gc.collect(), 0)
        # Nothing should be left over for a second collection.
        self.assertEqual(gc.collect(), 0)

    def test_method(self):
        # Tricky: self.__init__ is a bound method, it references the instance.
        class A:
            def __init__(self):
                self.init = self.__init__
        a = A()
        gc.collect()
        del a
        self.assertNotEqual(gc.collect(), 0)

    def test_finalizer(self):
        # A() is uncollectable if it is part of a cycle, make sure it shows up
        # in gc.garbage.
        class A:
            def __del__(self): pass
        class B:
            pass
        a = A()
        a.a = a
        # Remember the identity so the object can be found in gc.garbage
        # after the name is deleted.
        id_a = id(a)
        b = B()
        b.b = b
        gc.collect()
        del a
        del b
        self.assertNotEqual(gc.collect(), 0)
        for obj in gc.garbage:
            if id(obj) == id_a:
                # Break the cycle by hand.
                del obj.a
                break
        else:
            self.fail("didn't find obj in garbage (finalizer)")
        # Clean up: take the object back out of gc.garbage.
        gc.garbage.remove(obj)

    def test_finalizer_newclass(self):
        # A() is uncollectable if it is part of a cycle, make sure it shows up
        # in gc.garbage.
        class A(object):
            def __del__(self): pass
        class B(object):
            pass
        a = A()
        a.a = a
        # Remember the identity so the object can be found in gc.garbage
        # after the name is deleted.
        id_a = id(a)
        b = B()
        b.b = b
        gc.collect()
        del a
        del b
        self.assertNotEqual(gc.collect(), 0)
        for obj in gc.garbage:
            if id(obj) == id_a:
                # Break the cycle by hand.
                del obj.a
                break
        else:
            self.fail("didn't find obj in garbage (finalizer)")
        # Clean up: take the object back out of gc.garbage.
        gc.garbage.remove(obj)

    def test_function(self):
        # Tricky: f -> d -> f, code should call d.clear() after the exec to
        # break the cycle.
        d = {}
        exec("def f(): pass\n") in d
        gc.collect()
        del d
        # Two objects are expected to be collected.
        self.assertEqual(gc.collect(), 2)

    def test_frame(self):
        # f() stores its own frame object in a local variable.
        def f():
            frame = sys._getframe()
        gc.collect()
        f()
        # One object is expected to be collected after the call.
        self.assertEqual(gc.collect(), 1)

    def test_saveall(self):
        # Verify that cyclic garbage like lists show up in gc.garbage if the
        # SAVEALL option is enabled.

        # First make sure we don't save away other stuff that just happens to
        # be waiting for collection.
        gc.collect()
        # if this fails, someone else created immortal trash
        self.assertEqual(gc.garbage, [])

        L = []
        L.append(L)
        id_L = id(L)

        # Turn DEBUG_SAVEALL on only for the one collection below, then put
        # the previous debug flags back.
        debug = gc.get_debug()
        gc.set_debug(debug | gc.DEBUG_SAVEALL)
        del L
        gc.collect()
        gc.set_debug(debug)

        self.assertEqual(len(gc.garbage), 1)
        # Popping the entry also leaves gc.garbage empty again.
        obj = gc.garbage.pop()
        self.assertEqual(id(obj), id_L)

    def test_del(self):
        # __del__ methods can trigger collection; make this happen.
        # The test passes if nothing crashes; there is no assertion.
        thresholds = gc.get_threshold()
        gc.enable()
        gc.set_threshold(1)

        class A:
            def __del__(self):
                dir(self)
        a = A()
        del a

        # Put automatic collection and the thresholds back.
        gc.disable()
        gc.set_threshold(*thresholds)

    def test_del_newclass(self):
        # __del__ methods can trigger collection; make this happen.
        # The test passes if nothing crashes; there is no assertion.
        thresholds = gc.get_threshold()
        gc.enable()
        gc.set_threshold(1)

        class A(object):
            def __del__(self):
                dir(self)
        a = A()
        del a

        # Put automatic collection and the thresholds back.
        gc.disable()
        gc.set_threshold(*thresholds)

    # The following two tests are fragile:
    # They precisely count the number of allocations,
    # which is highly implementation-dependent.
    # For example:
    # - disposed tuples are not freed, but reused
    # - the call to assertEqual somehow avoids building its args tuple
    def test_get_count(self):
        # Avoid future allocation of method object
        assertEqual = self._baseAssertEqual
        gc.collect()
        assertEqual(gc.get_count(), (0, 0, 0))
        a = dict()
        # since gc.collect(), we created two objects:
        # the dict, and the tuple returned by get_count()
        assertEqual(gc.get_count(), (2, 0, 0))

    def test_collect_generations(self):
        # Avoid future allocation of method object
        assertEqual = self.assertEqual
        gc.collect()
        a = dict()
        # Collect one generation at a time and check how the counts reported
        # by gc.get_count() change after each step.
        gc.collect(0)
        assertEqual(gc.get_count(), (0, 1, 0))
        gc.collect(1)
        assertEqual(gc.get_count(), (0, 0, 1))
        gc.collect(2)
        assertEqual(gc.get_count(), (0, 0, 0))

    def test_trashcan(self):
        # Ouch counts its finalizer calls on the class and forces a full
        # collection from inside __del__ on every 17th call.
        class Ouch:
            n = 0
            def __del__(self):
                Ouch.n = Ouch.n + 1
                if Ouch.n % 17 == 0:
                    gc.collect()

        # "trashcan" is a hack to prevent stack overflow when deallocating
        # very deeply nested tuples etc.  It works in part by abusing the
        # type pointer and refcount fields, and that can yield horrible
        # problems when gc tries to traverse the structures.
        # If this test fails (as it does in 2.0, 2.1 and 2.2), it will
        # most likely die via segfault.

        # Note:  In 2.3 the possibility for compiling without cyclic gc was
        # removed, and that in turn allows the trashcan mechanism to work
        # via much simpler means (e.g., it never abuses the type pointer or
        # refcount fields anymore).  Since it's much less likely to cause a
        # problem now, the various constants in this expensive (we force a lot
        # of full collections) test are cut back from the 2.2 version.
        gc.enable()
        N = 150
        for count in range(2):
            # Build three deeply nested structures (two of lists, one of
            # dicts), each level holding an Ouch instance.  Rebinding t, u
            # and v on the next pass (or at function exit) releases them.
            t = []
            for i in range(N):
                t = [t, Ouch()]
            u = []
            for i in range(N):
                u = [u, Ouch()]
            v = {}
            for i in range(N):
                v = {1: v, 2: Ouch()}
        gc.disable()

    def test_boom(self):
        class Boom:
            def __getattr__(self, someattribute):
                del self.attr
                raise AttributeError

        a = Boom()
        b = Boom()
        a.attr = b
        b.attr = a

        gc.collect()
        # Baseline, so that the final check only looks at what this test adds.
        garbagelen = len(gc.garbage)
        del a, b
        # a<->b are in a trash cycle now.  Collection will invoke
        # Boom.__getattr__ (to see whether a and b have __del__ methods), and
        # __getattr__ deletes the internal "attr" attributes as a side effect.
        # That causes the trash cycle to get reclaimed via refcounts falling to
        # 0, thus mutating the trash graph as a side effect of merely asking
        # whether __del__ exists.  This used to (before 2.3b1) crash Python.
        # Now __getattr__ isn't called.
        self.assertEqual(gc.collect(), 4)
        self.assertEqual(len(gc.garbage), garbagelen)

    def test_boom2(self):
        class Boom2:
            def __init__(self):
                self.x = 0

            def __getattr__(self, someattribute):
                self.x += 1
                if self.x > 1:
                    del self.attr
                raise AttributeError

        a = Boom2()
        b = Boom2()
        a.attr = b
        b.attr = a

        gc.collect()
        # Baseline, so that the final check only looks at what this test adds.
        garbagelen = len(gc.garbage)
        del a, b
        # Much like test_boom(), except that __getattr__ doesn't break the
        # cycle until the second time gc checks for __del__.  As of 2.3b1,
        # there isn't a second time, so this simply cleans up the trash cycle.
        # We expect a, b, a.__dict__ and b.__dict__ (4 objects) to get
        # reclaimed this way.
        self.assertEqual(gc.collect(), 4)
        self.assertEqual(len(gc.garbage), garbagelen)

    def test_boom_new(self):
        # boom__new and boom2_new are exactly like boom and boom2, except use
        # new-style classes.

        class Boom_New(object):
            def __getattr__(self, someattribute):
                del self.attr
                raise AttributeError

        a = Boom_New()
        b = Boom_New()
        a.attr = b
        b.attr = a

        gc.collect()
        # Baseline, so that the final check only looks at what this test adds.
        garbagelen = len(gc.garbage)
        del a, b
        self.assertEqual(gc.collect(), 4)
        self.assertEqual(len(gc.garbage), garbagelen)

    def test_boom2_new(self):
        # New-style counterpart of test_boom2.
        class Boom2_New(object):
            def __init__(self):
                self.x = 0

            def __getattr__(self, someattribute):
                self.x += 1
                if self.x > 1:
                    del self.attr
                raise AttributeError

        a = Boom2_New()
        b = Boom2_New()
        a.attr = b
        b.attr = a

        gc.collect()
        # Baseline, so that the final check only looks at what this test adds.
        garbagelen = len(gc.garbage)
        del a, b
        self.assertEqual(gc.collect(), 4)
        self.assertEqual(len(gc.garbage), garbagelen)

    def test_get_referents(self):
        # Results are sorted before each comparison, so the order in which
        # gc.get_referents() reports objects is not relied upon.
        alist = [1, 3, 5]
        got = gc.get_referents(alist)
        got.sort()
        self.assertEqual(got, alist)

        atuple = tuple(alist)
        got = gc.get_referents(atuple)
        got.sort()
        self.assertEqual(got, alist)

        # For a dict, both keys and values are expected.
        adict = {1: 3, 5: 7}
        expected = [1, 3, 5, 7]
        got = gc.get_referents(adict)
        got.sort()
        self.assertEqual(got, expected)

        # Several arguments: the referents of all of them are combined.
        got = gc.get_referents([1, 2], {3: 4}, (0, 0, 0))
        got.sort()
        self.assertEqual(got, [0, 0] + range(5))

        # These atomic objects are expected to have no referents.
        self.assertEqual(gc.get_referents(1, 'a', 4j), [])

    def test_is_tracked(self):
        # Atomic built-in types are not tracked, user-defined objects and
        # mutable containers are.
        # NOTE: types with special optimizations (e.g. tuple) have tests
        # in their own test files instead.
        self.assertFalse(gc.is_tracked(None))
        self.assertFalse(gc.is_tracked(1))
        self.assertFalse(gc.is_tracked(1.0))
        self.assertFalse(gc.is_tracked(1.0 + 5.0j))
        self.assertFalse(gc.is_tracked(True))
        self.assertFalse(gc.is_tracked(False))
        self.assertFalse(gc.is_tracked("a"))
        self.assertFalse(gc.is_tracked(u"a"))
        self.assertFalse(gc.is_tracked(bytearray("a")))
        self.assertFalse(gc.is_tracked(type))
        self.assertFalse(gc.is_tracked(int))
        self.assertFalse(gc.is_tracked(object))
        self.assertFalse(gc.is_tracked(object()))

        class OldStyle:
            pass
        class NewStyle(object):
            pass
        self.assertTrue(gc.is_tracked(gc))
        self.assertTrue(gc.is_tracked(OldStyle))
        self.assertTrue(gc.is_tracked(OldStyle()))
        self.assertTrue(gc.is_tracked(NewStyle))
        self.assertTrue(gc.is_tracked(NewStyle()))
        self.assertTrue(gc.is_tracked([]))
        self.assertTrue(gc.is_tracked(set()))

    def test_bug1055820b(self):
        # Corresponds to temp2b.py in the bug report.

        ouch = []
        def callback(ignored):
            # Try to get a strong reference to every instance through its
            # weakref and record the results.
            ouch[:] = [wr() for wr in WRs]

        Cs = [C1055820(i) for i in range(2)]
        WRs = [weakref.ref(c, callback) for c in Cs]
        # In Python 2 the list comprehension variable leaks into this scope;
        # drop the strong reference it still holds to the last instance.
        c = None

        gc.collect()
        self.assertEqual(len(ouch), 0)
        # Make the two instances trash, and collect again.  The bug was that
        # the callback materialized a strong reference to an instance, but gc
        # cleared the instance's dict anyway.
        Cs = None
        gc.collect()
        self.assertEqual(len(ouch), 2)  # else the callbacks didn't run
        for x in ouch:
            # If the callback resurrected one of these guys, the instance
            # would be damaged, with an empty __dict__.
            self.assertEqual(x, None)
class GCTogglingTests(unittest.TestCase):
    # Tests that need automatic collection switched on: they wait for a
    # collection to happen by itself instead of calling gc.collect() at the
    # critical point.  setUp() enables gc for each test and tearDown()
    # disables it again afterwards.

    def setUp(self):
        gc.enable()

    def tearDown(self):
        gc.disable()

    def test_bug1055820c(self):
        # Corresponds to temp2c.py in the bug report.  This is pretty
        # elaborate.

        c0 = C1055820(0)
        # Move c0 into generation 2.
        gc.collect()

        c1 = C1055820(1)
        c1.keep_c0_alive = c0
        del c0.loop  # now only c1 keeps c0 alive

        c2 = C1055820(2)
        c2wr = weakref.ref(c2)  # no callback!

        ouch = []
        def callback(ignored):
            ouch[:] = [c2wr()]

        # The callback gets associated with a wr on an object in generation 2.
        c0wr = weakref.ref(c0, callback)

        c0 = c1 = c2 = None

        # What we've set up:  c0, c1, and c2 are all trash now.  c0 is in
        # generation 2.  The only thing keeping it alive is that c1 points to
        # it.  c1 and c2 are in generation 0, and are in self-loops.  There's a
        # global weakref to c2 (c2wr), but that weakref has no callback.
        # There's also a global weakref to c0 (c0wr), and that does have a
        # callback, and that callback references c2 via c2wr().
        #
        #               c0 has a wr with callback, which references c2wr
        #               ^
        #               |
        #               |     Generation 2 above dots
        #. . . . . . . .|. . . . . . . . . . . . . . . . . . . . . . . .
        #               |     Generation 0 below dots
        #               |
        #               |
        #            ^->c1   ^->c2 has a wr but no callback
        #            |  |    |  |
        #            <--v    <--v
        #
        # So this is the nightmare:  when generation 0 gets collected, we see
        # that c2 has a callback-free weakref, and c1 doesn't even have a
        # weakref.  Collecting generation 0 doesn't see c0 at all, and c0 is
        # the only object that has a weakref with a callback.  gc clears c1
        # and c2.  Clearing c1 has the side effect of dropping the refcount on
        # c0 to 0, so c0 goes away (despite that it's in an older generation)
        # and c0's wr callback triggers.  That in turn materializes a reference
        # to c2 via c2wr(), but c2 gets cleared anyway by gc.

        # We want to let gc happen "naturally", to preserve the distinction
        # between generations.
        junk = []
        i = 0
        detector = GC_Detector()
        while not detector.gc_happened:
            i += 1
            # Give up instead of looping forever if no collection is seen.
            if i > 10000:
                self.fail("gc didn't happen after 10000 iterations")
            self.assertEqual(len(ouch), 0)
            junk.append([])  # this will eventually trigger gc

        self.assertEqual(len(ouch), 1)  # else the callback wasn't invoked
        for x in ouch:
            # If the callback resurrected c2, the instance would be damaged,
            # with an empty __dict__.
            self.assertEqual(x, None)

    def test_bug1055820d(self):
        # Corresponds to temp2d.py in the bug report.  This is very much like
        # test_bug1055820c, but uses a __del__ method instead of a weakref
        # callback to sneak in a resurrection of cyclic trash.

        ouch = []
        class D(C1055820):
            def __del__(self):
                ouch[:] = [c2wr()]

        d0 = D(0)
        # Move all the above into generation 2.
        gc.collect()

        c1 = C1055820(1)
        c1.keep_d0_alive = d0
        del d0.loop  # now only c1 keeps d0 alive

        c2 = C1055820(2)
        c2wr = weakref.ref(c2)  # no callback!

        d0 = c1 = c2 = None

        # What we've set up:  d0, c1, and c2 are all trash now.  d0 is in
        # generation 2.  The only thing keeping it alive is that c1 points to
        # it.  c1 and c2 are in generation 0, and are in self-loops.  There's
        # a global weakref to c2 (c2wr), but that weakref has no callback.
        # There are no other weakrefs.
        #
        #               d0 has a __del__ method that references c2wr
        #               ^
        #               |
        #               |     Generation 2 above dots
        #. . . . . . . .|. . . . . . . . . . . . . . . . . . . . . . . .
        #               |     Generation 0 below dots
        #               |
        #               |
        #            ^->c1   ^->c2 has a wr but no callback
        #            |  |    |  |
        #            <--v    <--v
        #
        # So this is the nightmare:  when generation 0 gets collected, we see
        # that c2 has a callback-free weakref, and c1 doesn't even have a
        # weakref.  Collecting generation 0 doesn't see d0 at all.  gc clears
        # c1 and c2.  Clearing c1 has the side effect of dropping the refcount
        # on d0 to 0, so d0 goes away (despite that it's in an older
        # generation) and d0's __del__ triggers.  That in turn materializes
        # a reference to c2 via c2wr(), but c2 gets cleared anyway by gc.

        # We want to let gc happen "naturally", to preserve the distinction
        # between generations.
        detector = GC_Detector()
        junk = []
        i = 0
        while not detector.gc_happened:
            i += 1
            # Give up instead of looping forever if no collection is seen.
            if i > 10000:
                self.fail("gc didn't happen after 10000 iterations")
            self.assertEqual(len(ouch), 0)
            junk.append([])  # this will eventually trigger gc

        self.assertEqual(len(ouch), 1)  # else __del__ wasn't invoked
        for x in ouch:
            # If __del__ resurrected c2, the instance would be damaged, with an
            # empty __dict__.
            self.assertEqual(x, None)
def test_main():
    """Run both test classes with automatic collection switched off.

    The collector's enabled state and debug flags are saved first.  The
    debug flags are restored afterwards, and the enabled state is put back
    to what it was, even if the test run raises.
    """
    was_enabled = gc.isenabled()
    gc.disable()
    assert not gc.isenabled()
    saved_debug = gc.get_debug()
    gc.set_debug(saved_debug & ~gc.DEBUG_LEAK)  # this test is supposed to leak

    try:
        gc.collect()  # Delete 2nd generation garbage
        run_unittest(GCTests, GCTogglingTests)
    finally:
        gc.set_debug(saved_debug)
        # Exercise gc.enable() on every run, even when gc was disabled by
        # default, and only then return to the caller's original state.
        if verbose:
            print("restoring automatic collection")
        gc.enable()
        assert gc.isenabled()
        if not was_enabled:
            gc.disable()
# Run the tests when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()