From ccfa938f6799a3ea15f321ede2f5d4232de31753 Mon Sep 17 00:00:00 2001 From: CPython Developers <> Date: Mon, 23 Feb 2026 02:13:15 +0900 Subject: [PATCH] Update test_gc from v3.14.3 --- Lib/test/test_gc.py | 1622 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1622 insertions(+) create mode 100644 Lib/test/test_gc.py diff --git a/Lib/test/test_gc.py b/Lib/test/test_gc.py new file mode 100644 index 000000000..3e3092dca --- /dev/null +++ b/Lib/test/test_gc.py @@ -0,0 +1,1622 @@ +import unittest +import unittest.mock +from test import support +from test.support import (verbose, refcount_test, + cpython_only, requires_subprocess, + requires_gil_enabled, + Py_GIL_DISABLED) +from test.support.import_helper import import_module +from test.support.os_helper import temp_dir, TESTFN, unlink +from test.support.script_helper import assert_python_ok, make_script, run_test_script +from test.support import threading_helper, gc_threshold + +import gc +import sys +import sysconfig +import textwrap +import threading +import time +import weakref + +try: + import _testcapi + from _testcapi import with_tp_del + from _testcapi import ContainerNoGC +except ImportError: + _testcapi = None + def with_tp_del(cls): + class C(object): + def __new__(cls, *args, **kwargs): + raise unittest.SkipTest('requires _testcapi.with_tp_del') + return C + ContainerNoGC = None + +try: + import _testinternalcapi +except ImportError: + _testinternalcapi = None + +### Support code +############################################################################### + +# Bug 1055820 has several tests of longstanding bugs involving weakrefs and +# cyclic gc. + +# An instance of C1055820 has a self-loop, so becomes cyclic trash when +# unreachable. +class C1055820(object): + def __init__(self, i): + self.i = i + self.loop = self + +class GC_Detector(object): + # Create an instance I. Then gc hasn't happened again so long as + # I.gc_happened is false. + + def __init__(self): + self.gc_happened = False + + def it_happened(ignored): + self.gc_happened = True + + # Create a piece of cyclic trash that triggers it_happened when + # gc collects it. + self.wr = weakref.ref(C1055820(666), it_happened) + +@with_tp_del +class Uncollectable(object): + """Create a reference cycle with multiple __del__ methods. + + An object in a reference cycle will never have zero references, + and so must be garbage collected. If one or more objects in the + cycle have __del__ methods, the gc refuses to guess an order, + and leaves the cycle uncollected.""" + def __init__(self, partner=None): + if partner is None: + self.partner = Uncollectable(partner=self) + else: + self.partner = partner + def __tp_del__(self): + pass + +if sysconfig.get_config_vars().get('PY_CFLAGS', ''): + BUILD_WITH_NDEBUG = ('-DNDEBUG' in sysconfig.get_config_vars()['PY_CFLAGS']) +else: + # Usually, sys.gettotalrefcount() is only present if Python has been + # compiled in debug mode. If it's missing, expect that Python has + # been released in release mode: with NDEBUG defined. + BUILD_WITH_NDEBUG = (not hasattr(sys, 'gettotalrefcount')) + +### Tests +############################################################################### + +class GCTests(unittest.TestCase): + def test_list(self): + l = [] + l.append(l) + gc.collect() + del l + self.assertEqual(gc.collect(), 1) + + def test_dict(self): + d = {} + d[1] = d + gc.collect() + del d + self.assertEqual(gc.collect(), 1) + + def test_tuple(self): + # since tuples are immutable we close the loop with a list + l = [] + t = (l,) + l.append(t) + gc.collect() + del t + del l + self.assertEqual(gc.collect(), 2) + + def test_class(self): + class A: + pass + A.a = A + gc.collect() + del A + self.assertNotEqual(gc.collect(), 0) + + def test_newstyleclass(self): + class A(object): + pass + gc.collect() + del A + self.assertNotEqual(gc.collect(), 0) + + def test_instance(self): + class A: + pass + a = A() + a.a = a + gc.collect() + del a + self.assertNotEqual(gc.collect(), 0) + + def test_newinstance(self): + class A(object): + pass + a = A() + a.a = a + gc.collect() + del a + self.assertNotEqual(gc.collect(), 0) + class B(list): + pass + class C(B, A): + pass + a = C() + a.a = a + gc.collect() + del a + self.assertNotEqual(gc.collect(), 0) + del B, C + self.assertNotEqual(gc.collect(), 0) + A.a = A() + del A + self.assertNotEqual(gc.collect(), 0) + self.assertEqual(gc.collect(), 0) + + def test_method(self): + # Tricky: self.__init__ is a bound method, it references the instance. + class A: + def __init__(self): + self.init = self.__init__ + a = A() + gc.collect() + del a + self.assertNotEqual(gc.collect(), 0) + + @cpython_only + def test_legacy_finalizer(self): + # A() is uncollectable if it is part of a cycle, make sure it shows up + # in gc.garbage. + @with_tp_del + class A: + def __tp_del__(self): pass + class B: + pass + a = A() + a.a = a + id_a = id(a) + b = B() + b.b = b + gc.collect() + del a + del b + self.assertNotEqual(gc.collect(), 0) + for obj in gc.garbage: + if id(obj) == id_a: + del obj.a + break + else: + self.fail("didn't find obj in garbage (finalizer)") + gc.garbage.remove(obj) + + @cpython_only + def test_legacy_finalizer_newclass(self): + # A() is uncollectable if it is part of a cycle, make sure it shows up + # in gc.garbage. + @with_tp_del + class A(object): + def __tp_del__(self): pass + class B(object): + pass + a = A() + a.a = a + id_a = id(a) + b = B() + b.b = b + gc.collect() + del a + del b + self.assertNotEqual(gc.collect(), 0) + for obj in gc.garbage: + if id(obj) == id_a: + del obj.a + break + else: + self.fail("didn't find obj in garbage (finalizer)") + gc.garbage.remove(obj) + + def test_function(self): + # Tricky: f -> d -> f, code should call d.clear() after the exec to + # break the cycle. + d = {} + exec("def f(): pass\n", d) + gc.collect() + del d + # In the free-threaded build, the count returned by `gc.collect()` + # is 3 because it includes f's code object. + self.assertIn(gc.collect(), (2, 3)) + + def test_function_tp_clear_leaves_consistent_state(self): + # https://github.com/python/cpython/issues/91636 + code = """if 1: + + import gc + import weakref + + class LateFin: + __slots__ = ('ref',) + + def __del__(self): + + # 8. Now `latefin`'s finalizer is called. Here we + # obtain a reference to `func`, which is currently + # undergoing `tp_clear`. + global func + func = self.ref() + + class Cyclic(tuple): + __slots__ = () + + # 4. The finalizers of all garbage objects are called. In + # this case this is only us as `func` doesn't have a + # finalizer. + def __del__(self): + + # 5. Create a weakref to `func` now. If we had created + # it earlier, it would have been cleared by the + # garbage collector before calling the finalizers. + self[1].ref = weakref.ref(self[0]) + + # 6. Drop the global reference to `latefin`. The only + # remaining reference is the one we have. + global latefin + del latefin + + # 7. Now `func` is `tp_clear`-ed. This drops the last + # reference to `Cyclic`, which gets `tp_dealloc`-ed. + # This drops the last reference to `latefin`. + + latefin = LateFin() + def func(): + pass + cyc = tuple.__new__(Cyclic, (func, latefin)) + + # 1. Create a reference cycle of `cyc` and `func`. + func.__module__ = cyc + + # 2. Make the cycle unreachable, but keep the global reference + # to `latefin` so that it isn't detected as garbage. This + # way its finalizer will not be called immediately. + del func, cyc + + # 3. Invoke garbage collection, + # which will find `cyc` and `func` as garbage. + gc.collect() + + # 9. Previously, this would crash because `func_qualname` + # had been NULL-ed out by func_clear(). + print(f"{func=}") + """ + # We're mostly just checking that this doesn't crash. + rc, stdout, stderr = assert_python_ok("-c", code) + self.assertEqual(rc, 0) + self.assertRegex(stdout, rb"""\A\s*func=\s*\z""") + self.assertFalse(stderr) + + @refcount_test + def test_frame(self): + def f(): + frame = sys._getframe() + gc.collect() + f() + self.assertEqual(gc.collect(), 1) + + def test_saveall(self): + # Verify that cyclic garbage like lists show up in gc.garbage if the + # SAVEALL option is enabled. + + # First make sure we don't save away other stuff that just happens to + # be waiting for collection. + gc.collect() + # if this fails, someone else created immortal trash + self.assertEqual(gc.garbage, []) + + L = [] + L.append(L) + id_L = id(L) + + debug = gc.get_debug() + gc.set_debug(debug | gc.DEBUG_SAVEALL) + del L + gc.collect() + gc.set_debug(debug) + + self.assertEqual(len(gc.garbage), 1) + obj = gc.garbage.pop() + self.assertEqual(id(obj), id_L) + + def test_del(self): + # __del__ methods can trigger collection, make this to happen + thresholds = gc.get_threshold() + gc.enable() + gc.set_threshold(1) + + class A: + def __del__(self): + dir(self) + a = A() + del a + + gc.disable() + gc.set_threshold(*thresholds) + + def test_del_newclass(self): + # __del__ methods can trigger collection, make this to happen + thresholds = gc.get_threshold() + gc.enable() + gc.set_threshold(1) + + class A(object): + def __del__(self): + dir(self) + a = A() + del a + + gc.disable() + gc.set_threshold(*thresholds) + + # The following two tests are fragile: + # They precisely count the number of allocations, + # which is highly implementation-dependent. + # For example, disposed tuples are not freed, but reused. + # To minimize variations, though, we first store the get_count() results + # and check them at the end. + @refcount_test + @requires_gil_enabled('needs precise allocation counts') + def test_get_count(self): + gc.collect() + a, b, c = gc.get_count() + x = [] + d, e, f = gc.get_count() + self.assertEqual((b, c), (0, 0)) + self.assertEqual((e, f), (0, 0)) + # This is less fragile than asserting that a equals 0. + self.assertLess(a, 5) + # Between the two calls to get_count(), at least one object was + # created (the list). + self.assertGreater(d, a) + + @refcount_test + def test_collect_generations(self): + gc.collect() + # This object will "trickle" into generation N + 1 after + # each call to collect(N) + x = [] + gc.collect(0) + # x is now in the old gen + a, b, c = gc.get_count() + # We don't check a since its exact values depends on + # internal implementation details of the interpreter. + self.assertEqual((b, c), (1, 0)) + + def test_trashcan(self): + class Ouch: + n = 0 + def __del__(self): + Ouch.n = Ouch.n + 1 + if Ouch.n % 17 == 0: + gc.collect() + + # "trashcan" is a hack to prevent stack overflow when deallocating + # very deeply nested tuples etc. It works in part by abusing the + # type pointer and refcount fields, and that can yield horrible + # problems when gc tries to traverse the structures. + # If this test fails (as it does in 2.0, 2.1 and 2.2), it will + # most likely die via segfault. + + # Note: In 2.3 the possibility for compiling without cyclic gc was + # removed, and that in turn allows the trashcan mechanism to work + # via much simpler means (e.g., it never abuses the type pointer or + # refcount fields anymore). Since it's much less likely to cause a + # problem now, the various constants in this expensive (we force a lot + # of full collections) test are cut back from the 2.2 version. + gc.enable() + N = 150 + for count in range(2): + t = [] + for i in range(N): + t = [t, Ouch()] + u = [] + for i in range(N): + u = [u, Ouch()] + v = {} + for i in range(N): + v = {1: v, 2: Ouch()} + gc.disable() + + @threading_helper.requires_working_threading() + def test_trashcan_threads(self): + # Issue #13992: trashcan mechanism should be thread-safe + NESTING = 60 + N_THREADS = 2 + + def sleeper_gen(): + """A generator that releases the GIL when closed or dealloc'ed.""" + try: + yield + finally: + time.sleep(0.000001) + + class C(list): + # Appending to a list is atomic, which avoids the use of a lock. + inits = [] + dels = [] + def __init__(self, alist): + self[:] = alist + C.inits.append(None) + def __del__(self): + # This __del__ is called by subtype_dealloc(). + C.dels.append(None) + # `g` will release the GIL when garbage-collected. This + # helps assert subtype_dealloc's behaviour when threads + # switch in the middle of it. + g = sleeper_gen() + next(g) + # Now that __del__ is finished, subtype_dealloc will proceed + # to call list_dealloc, which also uses the trashcan mechanism. + + def make_nested(): + """Create a sufficiently nested container object so that the + trashcan mechanism is invoked when deallocating it.""" + x = C([]) + for i in range(NESTING): + x = [C([x])] + del x + + def run_thread(): + """Exercise make_nested() in a loop.""" + while not exit: + make_nested() + + old_switchinterval = sys.getswitchinterval() + support.setswitchinterval(1e-5) + try: + exit = [] + threads = [] + for i in range(N_THREADS): + t = threading.Thread(target=run_thread) + threads.append(t) + with threading_helper.start_threads(threads, lambda: exit.append(1)): + time.sleep(1.0) + finally: + sys.setswitchinterval(old_switchinterval) + gc.collect() + self.assertEqual(len(C.inits), len(C.dels)) + + def test_boom(self): + class Boom: + def __getattr__(self, someattribute): + del self.attr + raise AttributeError + + a = Boom() + b = Boom() + a.attr = b + b.attr = a + + gc.collect() + garbagelen = len(gc.garbage) + del a, b + # a<->b are in a trash cycle now. Collection will invoke + # Boom.__getattr__ (to see whether a and b have __del__ methods), and + # __getattr__ deletes the internal "attr" attributes as a side effect. + # That causes the trash cycle to get reclaimed via refcounts falling to + # 0, thus mutating the trash graph as a side effect of merely asking + # whether __del__ exists. This used to (before 2.3b1) crash Python. + # Now __getattr__ isn't called. + self.assertEqual(gc.collect(), 2) + self.assertEqual(len(gc.garbage), garbagelen) + + def test_boom2(self): + class Boom2: + def __init__(self): + self.x = 0 + + def __getattr__(self, someattribute): + self.x += 1 + if self.x > 1: + del self.attr + raise AttributeError + + a = Boom2() + b = Boom2() + a.attr = b + b.attr = a + + gc.collect() + garbagelen = len(gc.garbage) + del a, b + # Much like test_boom(), except that __getattr__ doesn't break the + # cycle until the second time gc checks for __del__. As of 2.3b1, + # there isn't a second time, so this simply cleans up the trash cycle. + # We expect a, b, a.__dict__ and b.__dict__ (4 objects) to get + # reclaimed this way. + self.assertEqual(gc.collect(), 2) + self.assertEqual(len(gc.garbage), garbagelen) + + def test_get_referents(self): + alist = [1, 3, 5] + got = gc.get_referents(alist) + got.sort() + self.assertEqual(got, alist) + + atuple = tuple(alist) + got = gc.get_referents(atuple) + got.sort() + self.assertEqual(got, alist) + + adict = {1: 3, 5: 7} + expected = [1, 3, 5, 7] + got = gc.get_referents(adict) + got.sort() + self.assertEqual(got, expected) + + got = gc.get_referents([1, 2], {3: 4}, (0, 0, 0)) + got.sort() + self.assertEqual(got, [0, 0] + list(range(5))) + + self.assertEqual(gc.get_referents(1, 'a', 4j), []) + + def test_is_tracked(self): + # Atomic built-in types are not tracked, user-defined objects and + # mutable containers are. + # NOTE: types with special optimizations (e.g. tuple) have tests + # in their own test files instead. + self.assertFalse(gc.is_tracked(None)) + self.assertFalse(gc.is_tracked(1)) + self.assertFalse(gc.is_tracked(1.0)) + self.assertFalse(gc.is_tracked(1.0 + 5.0j)) + self.assertFalse(gc.is_tracked(True)) + self.assertFalse(gc.is_tracked(False)) + self.assertFalse(gc.is_tracked(b"a")) + self.assertFalse(gc.is_tracked("a")) + self.assertFalse(gc.is_tracked(bytearray(b"a"))) + self.assertFalse(gc.is_tracked(type)) + self.assertFalse(gc.is_tracked(int)) + self.assertFalse(gc.is_tracked(object)) + self.assertFalse(gc.is_tracked(object())) + + class UserClass: + pass + + class UserInt(int): + pass + + # Base class is object; no extra fields. + class UserClassSlots: + __slots__ = () + + # Base class is fixed size larger than object; no extra fields. + class UserFloatSlots(float): + __slots__ = () + + # Base class is variable size; no extra fields. + class UserIntSlots(int): + __slots__ = () + + self.assertTrue(gc.is_tracked(gc)) + self.assertTrue(gc.is_tracked(UserClass)) + self.assertTrue(gc.is_tracked(UserClass())) + self.assertTrue(gc.is_tracked(UserInt())) + self.assertTrue(gc.is_tracked([])) + self.assertTrue(gc.is_tracked(set())) + self.assertTrue(gc.is_tracked(UserClassSlots())) + self.assertTrue(gc.is_tracked(UserFloatSlots())) + self.assertTrue(gc.is_tracked(UserIntSlots())) + + def test_is_finalized(self): + # Objects not tracked by the always gc return false + self.assertFalse(gc.is_finalized(3)) + + storage = [] + class Lazarus: + def __del__(self): + storage.append(self) + + lazarus = Lazarus() + self.assertFalse(gc.is_finalized(lazarus)) + + del lazarus + gc.collect() + + lazarus = storage.pop() + self.assertTrue(gc.is_finalized(lazarus)) + + def test_bug1055820b(self): + # Corresponds to temp2b.py in the bug report. + + ouch = [] + def callback(ignored): + ouch[:] = [wr() for wr in WRs] + + Cs = [C1055820(i) for i in range(2)] + WRs = [weakref.ref(c, callback) for c in Cs] + c = None + + gc.collect() + self.assertEqual(len(ouch), 0) + # Make the two instances trash, and collect again. The bug was that + # the callback materialized a strong reference to an instance, but gc + # cleared the instance's dict anyway. + Cs = None + gc.collect() + self.assertEqual(len(ouch), 2) # else the callbacks didn't run + for x in ouch: + # If the callback resurrected one of these guys, the instance + # would be damaged, with an empty __dict__. + self.assertEqual(x, None) + + def test_bug21435(self): + # This is a poor test - its only virtue is that it happened to + # segfault on Tim's Windows box before the patch for 21435 was + # applied. That's a nasty bug relying on specific pieces of cyclic + # trash appearing in exactly the right order in finalize_garbage()'s + # input list. + # But there's no reliable way to force that order from Python code, + # so over time chances are good this test won't really be testing much + # of anything anymore. Still, if it blows up, there's _some_ + # problem ;-) + gc.collect() + + class A: + pass + + class B: + def __init__(self, x): + self.x = x + + def __del__(self): + self.attr = None + + def do_work(): + a = A() + b = B(A()) + + a.attr = b + b.attr = a + + do_work() + gc.collect() # this blows up (bad C pointer) when it fails + + @cpython_only + @requires_subprocess() + @unittest.skipIf(_testcapi is None, "requires _testcapi") + def test_garbage_at_shutdown(self): + import subprocess + code = """if 1: + import gc + import _testcapi + @_testcapi.with_tp_del + class X: + def __init__(self, name): + self.name = name + def __repr__(self): + return "" %% self.name + def __tp_del__(self): + pass + + x = X('first') + x.x = x + x.y = X('second') + del x + gc.set_debug(%s) + """ + def run_command(code): + p = subprocess.Popen([sys.executable, "-Wd", "-c", code], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + stdout, stderr = p.communicate() + p.stdout.close() + p.stderr.close() + self.assertEqual(p.returncode, 0) + self.assertEqual(stdout, b"") + return stderr + + stderr = run_command(code % "0") + self.assertIn(b"ResourceWarning: gc: 2 uncollectable objects at " + b"shutdown; use", stderr) + self.assertNotIn(b"", stderr) + one_line_re = b"gc: uncollectable " + expected_re = one_line_re + b"\r?\n" + one_line_re + self.assertNotRegex(stderr, expected_re) + # With DEBUG_UNCOLLECTABLE, the garbage list gets printed + stderr = run_command(code % "gc.DEBUG_UNCOLLECTABLE") + self.assertIn(b"ResourceWarning: gc: 2 uncollectable objects at " + b"shutdown", stderr) + self.assertTrue( + (b"[, ]" in stderr) or + (b"[, ]" in stderr), stderr) + # we expect two lines with uncollectable objects + self.assertRegex(stderr, expected_re) + # With DEBUG_SAVEALL, no additional message should get printed + # (because gc.garbage also contains normally reclaimable cyclic + # references, and its elements get printed at runtime anyway). + stderr = run_command(code % "gc.DEBUG_SAVEALL") + self.assertNotIn(b"uncollectable objects at shutdown", stderr) + + def test_gc_main_module_at_shutdown(self): + # Create a reference cycle through the __main__ module and check + # it gets collected at interpreter shutdown. + code = """if 1: + class C: + def __del__(self): + print('__del__ called') + l = [C()] + l.append(l) + """ + rc, out, err = assert_python_ok('-c', code) + self.assertEqual(out.strip(), b'__del__ called') + + def test_gc_ordinary_module_at_shutdown(self): + # Same as above, but with a non-__main__ module. + with temp_dir() as script_dir: + module = """if 1: + class C: + def __del__(self): + print('__del__ called') + l = [C()] + l.append(l) + """ + code = """if 1: + import sys + sys.path.insert(0, %r) + import gctest + """ % (script_dir,) + make_script(script_dir, 'gctest', module) + rc, out, err = assert_python_ok('-c', code) + self.assertEqual(out.strip(), b'__del__ called') + + @unittest.skipIf(Py_GIL_DISABLED, "requires GC generations or increments") + def test_gc_debug_stats(self): + # Checks that debug information is printed to stderr + # when DEBUG_STATS is set. + code = """if 1: + import gc + gc.set_debug(%s) + gc.collect() + """ + _, _, err = assert_python_ok("-c", code % "gc.DEBUG_STATS") + self.assertRegex(err, b"gc: collecting generation [0-9]+") + self.assertRegex( + err, + b"gc: objects in each generation: [0-9]+ [0-9]+ [0-9]+", + ) + self.assertRegex( + err, b"gc: objects in permanent generation: [0-9]+" + ) + self.assertRegex( + err, + b"gc: done, .* unreachable, .* uncollectable, .* elapsed", + ) + + _, _, err = assert_python_ok("-c", code % "0") + self.assertNotIn(b"elapsed", err) + + def test_global_del_SystemExit(self): + code = """if 1: + class ClassWithDel: + def __del__(self): + print('__del__ called') + a = ClassWithDel() + a.link = a + raise SystemExit(0)""" + self.addCleanup(unlink, TESTFN) + with open(TESTFN, 'w', encoding="utf-8") as script: + script.write(code) + rc, out, err = assert_python_ok(TESTFN) + self.assertEqual(out.strip(), b'__del__ called') + + def test_get_stats(self): + stats = gc.get_stats() + self.assertEqual(len(stats), 3) + for st in stats: + self.assertIsInstance(st, dict) + self.assertEqual(set(st), + {"collected", "collections", "uncollectable"}) + self.assertGreaterEqual(st["collected"], 0) + self.assertGreaterEqual(st["collections"], 0) + self.assertGreaterEqual(st["uncollectable"], 0) + # Check that collection counts are incremented correctly + if gc.isenabled(): + self.addCleanup(gc.enable) + gc.disable() + old = gc.get_stats() + gc.collect(0) + new = gc.get_stats() + self.assertEqual(new[0]["collections"], old[0]["collections"] + 1) + self.assertEqual(new[1]["collections"], old[1]["collections"]) + self.assertEqual(new[2]["collections"], old[2]["collections"]) + gc.collect(2) + new = gc.get_stats() + self.assertEqual(new[0]["collections"], old[0]["collections"] + 1) + self.assertEqual(new[1]["collections"], old[1]["collections"]) + self.assertEqual(new[2]["collections"], old[2]["collections"] + 1) + + def test_freeze(self): + gc.freeze() + self.assertGreater(gc.get_freeze_count(), 0) + gc.unfreeze() + self.assertEqual(gc.get_freeze_count(), 0) + + def test_get_objects(self): + gc.collect() + l = [] + l.append(l) + self.assertTrue( + any(l is element for element in gc.get_objects()) + ) + + @requires_gil_enabled('need generational GC') + def test_get_objects_generations(self): + gc.collect() + l = [] + l.append(l) + self.assertTrue( + any(l is element for element in gc.get_objects(generation=0)) + ) + gc.collect() + self.assertFalse( + any(l is element for element in gc.get_objects(generation=0)) + ) + del l + gc.collect() + + def test_get_objects_arguments(self): + gc.collect() + self.assertEqual(len(gc.get_objects()), + len(gc.get_objects(generation=None))) + + self.assertRaises(ValueError, gc.get_objects, 1000) + self.assertRaises(ValueError, gc.get_objects, -1000) + self.assertRaises(TypeError, gc.get_objects, "1") + self.assertRaises(TypeError, gc.get_objects, 1.234) + + def test_resurrection_only_happens_once_per_object(self): + class A: # simple self-loop + def __init__(self): + self.me = self + + class Lazarus(A): + resurrected = 0 + resurrected_instances = [] + + def __del__(self): + Lazarus.resurrected += 1 + Lazarus.resurrected_instances.append(self) + + gc.collect() + gc.disable() + + # We start with 0 resurrections + laz = Lazarus() + self.assertEqual(Lazarus.resurrected, 0) + + # Deleting the instance and triggering a collection + # resurrects the object + del laz + gc.collect() + self.assertEqual(Lazarus.resurrected, 1) + self.assertEqual(len(Lazarus.resurrected_instances), 1) + + # Clearing the references and forcing a collection + # should not resurrect the object again. + Lazarus.resurrected_instances.clear() + self.assertEqual(Lazarus.resurrected, 1) + gc.collect() + self.assertEqual(Lazarus.resurrected, 1) + + gc.enable() + + def test_resurrection_is_transitive(self): + class Cargo: + def __init__(self): + self.me = self + + class Lazarus: + resurrected_instances = [] + + def __del__(self): + Lazarus.resurrected_instances.append(self) + + gc.collect() + gc.disable() + + laz = Lazarus() + cargo = Cargo() + cargo_id = id(cargo) + + # Create a cycle between cargo and laz + laz.cargo = cargo + cargo.laz = laz + + # Drop the references, force a collection and check that + # everything was resurrected. + del laz, cargo + gc.collect() + self.assertEqual(len(Lazarus.resurrected_instances), 1) + instance = Lazarus.resurrected_instances.pop() + self.assertHasAttr(instance, "cargo") + self.assertEqual(id(instance.cargo), cargo_id) + + gc.collect() + gc.enable() + + def test_resurrection_does_not_block_cleanup_of_other_objects(self): + + # When a finalizer resurrects objects, stats were reporting them as + # having been collected. This affected both collect()'s return + # value and the dicts returned by get_stats(). + N = 100 + + class A: # simple self-loop + def __init__(self): + self.me = self + + class Z(A): # resurrecting __del__ + def __del__(self): + zs.append(self) + + zs = [] + + def getstats(): + d = gc.get_stats()[-1] + return d['collected'], d['uncollectable'] + + gc.collect() + gc.disable() + + # No problems if just collecting A() instances. + oldc, oldnc = getstats() + for i in range(N): + A() + t = gc.collect() + c, nc = getstats() + self.assertEqual(t, N) # instance objects + self.assertEqual(c - oldc, N) + self.assertEqual(nc - oldnc, 0) + + # But Z() is not actually collected. + oldc, oldnc = c, nc + Z() + # Nothing is collected - Z() is merely resurrected. + t = gc.collect() + c, nc = getstats() + self.assertEqual(t, 0) + self.assertEqual(c - oldc, 0) + self.assertEqual(nc - oldnc, 0) + + # Z() should not prevent anything else from being collected. + oldc, oldnc = c, nc + for i in range(N): + A() + Z() + t = gc.collect() + c, nc = getstats() + self.assertEqual(t, N) + self.assertEqual(c - oldc, N) + self.assertEqual(nc - oldnc, 0) + + # The A() trash should have been reclaimed already but the + # 2 copies of Z are still in zs (and the associated dicts). + oldc, oldnc = c, nc + zs.clear() + t = gc.collect() + c, nc = getstats() + self.assertEqual(t, 2) + self.assertEqual(c - oldc, 2) + self.assertEqual(nc - oldnc, 0) + + gc.enable() + + @unittest.skipIf(ContainerNoGC is None, + 'requires ContainerNoGC extension type') + def test_trash_weakref_clear(self): + # Test that trash weakrefs are properly cleared (bpo-38006). + # + # Structure we are creating: + # + # Z <- Y <- A--+--> WZ -> C + # ^ | + # +--+ + # where: + # WZ is a weakref to Z with callback C + # Y doesn't implement tp_traverse + # A contains a reference to itself, Y and WZ + # + # A, Y, Z, WZ are all trash. The GC doesn't know that Z is trash + # because Y does not implement tp_traverse. To show the bug, WZ needs + # to live long enough so that Z is deallocated before it. Then, if + # gcmodule is buggy, when Z is being deallocated, C will run. + # + # To ensure WZ lives long enough, we put it in a second reference + # cycle. That trick only works due to the ordering of the GC prev/next + # linked lists. So, this test is a bit fragile. + # + # The bug reported in bpo-38006 is caused because the GC did not + # clear WZ before starting the process of calling tp_clear on the + # trash. Normally, handle_weakrefs() would find the weakref via Z and + # clear it. However, since the GC cannot find Z, WR is not cleared and + # it can execute during delete_garbage(). That can lead to disaster + # since the callback might tinker with objects that have already had + # tp_clear called on them (leaving them in possibly invalid states). + + callback = unittest.mock.Mock() + + class A: + __slots__ = ['a', 'y', 'wz'] + + class Z: + pass + + # setup required object graph, as described above + a = A() + a.a = a + a.y = ContainerNoGC(Z()) + a.wz = weakref.ref(a.y.value, callback) + # create second cycle to keep WZ alive longer + wr_cycle = [a.wz] + wr_cycle.append(wr_cycle) + # ensure trash unrelated to this test is gone + gc.collect() + gc.disable() + # release references and create trash + del a, wr_cycle + gc.collect() + # if called, it means there is a bug in the GC. The weakref should be + # cleared before Z dies. + callback.assert_not_called() + gc.enable() + + @cpython_only + def test_get_referents_on_capsule(self): + # gh-124538: Calling gc.get_referents() on an untracked capsule must not crash. + import _datetime + import _socket + untracked_capsule = _datetime.datetime_CAPI + tracked_capsule = _socket.CAPI + + # For whoever sees this in the future: if this is failing + # after making datetime's capsule tracked, that's fine -- this isn't something + # users are relying on. Just find a different capsule that is untracked. + self.assertFalse(gc.is_tracked(untracked_capsule)) + self.assertTrue(gc.is_tracked(tracked_capsule)) + + self.assertEqual(len(gc.get_referents(untracked_capsule)), 0) + gc.get_referents(tracked_capsule) + + @cpython_only + def test_get_objects_during_gc(self): + # gh-125859: Calling gc.get_objects() or gc.get_referrers() during a + # collection should not crash. + test = self + collected = False + + class GetObjectsOnDel: + def __del__(self): + nonlocal collected + collected = True + objs = gc.get_objects() + # NB: can't use "in" here because some objects override __eq__ + for obj in objs: + test.assertTrue(obj is not self) + test.assertEqual(gc.get_referrers(self), []) + + obj = GetObjectsOnDel() + obj.cycle = obj + del obj + + gc.collect() + self.assertTrue(collected) + + def test_traverse_frozen_objects(self): + # See GH-126312: Objects that were not frozen could traverse over + # a frozen object on the free-threaded build, which would cause + # a negative reference count. + x = [1, 2, 3] + gc.freeze() + y = [x] + y.append(y) + del y + gc.collect() + gc.unfreeze() + + def test_deferred_refcount_frozen(self): + # Also from GH-126312: objects that use deferred reference counting + # weren't ignored if they were frozen. Unfortunately, it's pretty + # difficult to come up with a case that triggers this. + # + # Calling gc.collect() while the garbage collector is frozen doesn't + # trigger this normally, but it *does* if it's inside unittest for whatever + # reason. We can't call unittest from inside a test, so it has to be + # in a subprocess. + source = textwrap.dedent(""" + import gc + import unittest + + + class Test(unittest.TestCase): + def test_something(self): + gc.freeze() + gc.collect() + gc.unfreeze() + + + if __name__ == "__main__": + unittest.main() + """) + assert_python_ok("-c", source) + + + @unittest.skipUnless(Py_GIL_DISABLED, "requires free-threaded GC") + @unittest.skipIf(_testinternalcapi is None, "requires _testinternalcapi") + def test_tuple_untrack_counts(self): + # This ensures that the free-threaded GC is counting untracked tuples + # in the "long_lived_total" count. This is required to avoid + # performance issues from running the GC too frequently. See + # GH-142531 as an example. + gc.collect() + count = _testinternalcapi.get_long_lived_total() + n = 20_000 + tuples = [(x,) for x in range(n)] + gc.collect() + new_count = _testinternalcapi.get_long_lived_total() + self.assertFalse(gc.is_tracked(tuples[0])) + # Use n // 2 just in case some other objects were collected. + self.assertTrue(new_count - count > (n // 2)) + + +class IncrementalGCTests(unittest.TestCase): + @unittest.skipIf(_testinternalcapi is None, "requires _testinternalcapi") + @requires_gil_enabled("Free threading does not support incremental GC") + def test_incremental_gc_handles_fast_cycle_creation(self): + # Run this test in a fresh process. The number of alive objects (which can + # be from unit tests run before this one) can influence how quickly cyclic + # garbage is found. + script = support.findfile("_test_gc_fast_cycles.py") + run_test_script(script) + + +class GCCallbackTests(unittest.TestCase): + def setUp(self): + # Save gc state and disable it. + self.enabled = gc.isenabled() + gc.disable() + self.debug = gc.get_debug() + gc.set_debug(0) + gc.callbacks.append(self.cb1) + gc.callbacks.append(self.cb2) + self.othergarbage = [] + + def tearDown(self): + # Restore gc state + del self.visit + gc.callbacks.remove(self.cb1) + gc.callbacks.remove(self.cb2) + gc.set_debug(self.debug) + if self.enabled: + gc.enable() + # destroy any uncollectables + gc.collect() + for obj in gc.garbage: + if isinstance(obj, Uncollectable): + obj.partner = None + del gc.garbage[:] + del self.othergarbage + gc.collect() + + def preclean(self): + # Remove all fluff from the system. Invoke this function + # manually rather than through self.setUp() for maximum + # safety. + self.visit = [] + gc.collect() + garbage, gc.garbage[:] = gc.garbage[:], [] + self.othergarbage.append(garbage) + self.visit = [] + + def cb1(self, phase, info): + self.visit.append((1, phase, dict(info))) + + def cb2(self, phase, info): + self.visit.append((2, phase, dict(info))) + if phase == "stop" and hasattr(self, "cleanup"): + # Clean Uncollectable from garbage + uc = [e for e in gc.garbage if isinstance(e, Uncollectable)] + gc.garbage[:] = [e for e in gc.garbage + if not isinstance(e, Uncollectable)] + for e in uc: + e.partner = None + + def test_collect(self): + self.preclean() + gc.collect() + # Algorithmically verify the contents of self.visit + # because it is long and tortuous. + + # Count the number of visits to each callback + n = [v[0] for v in self.visit] + n1 = [i for i in n if i == 1] + n2 = [i for i in n if i == 2] + self.assertEqual(n1, [1]*2) + self.assertEqual(n2, [2]*2) + + # Count that we got the right number of start and stop callbacks. + n = [v[1] for v in self.visit] + n1 = [i for i in n if i == "start"] + n2 = [i for i in n if i == "stop"] + self.assertEqual(n1, ["start"]*2) + self.assertEqual(n2, ["stop"]*2) + + # Check that we got the right info dict for all callbacks + for v in self.visit: + info = v[2] + self.assertTrue("generation" in info) + self.assertTrue("collected" in info) + self.assertTrue("uncollectable" in info) + + def test_collect_generation(self): + self.preclean() + gc.collect(2) + for v in self.visit: + info = v[2] + self.assertEqual(info["generation"], 2) + + @cpython_only + def test_collect_garbage(self): + self.preclean() + # Each of these cause two objects to be garbage: + Uncollectable() + Uncollectable() + C1055820(666) + gc.collect() + for v in self.visit: + if v[1] != "stop": + continue + info = v[2] + self.assertEqual(info["collected"], 1) + self.assertEqual(info["uncollectable"], 4) + + # We should now have the Uncollectables in gc.garbage + self.assertEqual(len(gc.garbage), 4) + for e in gc.garbage: + self.assertIsInstance(e, Uncollectable) + + # Now, let our callback handle the Uncollectable instances + self.cleanup=True + self.visit = [] + gc.garbage[:] = [] + gc.collect() + for v in self.visit: + if v[1] != "stop": + continue + info = v[2] + self.assertEqual(info["collected"], 0) + self.assertEqual(info["uncollectable"], 2) + + # Uncollectables should be gone + self.assertEqual(len(gc.garbage), 0) + + + @requires_subprocess() + @unittest.skipIf(BUILD_WITH_NDEBUG, + 'built with -NDEBUG') + def test_refcount_errors(self): + self.preclean() + # Verify the "handling" of objects with broken refcounts + + # Skip the test if ctypes is not available + import_module("ctypes") + + import subprocess + code = textwrap.dedent(''' + from test.support import gc_collect, SuppressCrashReport + + a = [1, 2, 3] + b = [a, a] + a.append(b) + + # Avoid coredump when Py_FatalError() calls abort() + SuppressCrashReport().__enter__() + + # Simulate the refcount of "a" being too low (compared to the + # references held on it by live data), but keeping it above zero + # (to avoid deallocating it): + import ctypes + ctypes.pythonapi.Py_DecRef(ctypes.py_object(a)) + del a + del b + + # The garbage collector should now have a fatal error + # when it reaches the broken object + gc_collect() + ''') + p = subprocess.Popen([sys.executable, "-c", code], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + stdout, stderr = p.communicate() + p.stdout.close() + p.stderr.close() + # Verify that stderr has a useful error message: + self.assertRegex(stderr, + br'gc.*\.c:[0-9]+: .*: Assertion "gc_get_refs\(.+\) .*" failed.') + self.assertRegex(stderr, + br'refcount is too small') + # "address : 0x7fb5062efc18" + # "address : 7FB5062EFC18" + address_regex = br'[0-9a-fA-Fx]+' + self.assertRegex(stderr, + br'object address : ' + address_regex) + self.assertRegex(stderr, + br'object refcount : 1') + self.assertRegex(stderr, + br'object type : ' + address_regex) + self.assertRegex(stderr, + br'object type name: list') + self.assertRegex(stderr, + br'object repr : \[1, 2, 3, \[\[...\], \[...\]\]\]') + + +class GCTogglingTests(unittest.TestCase): + def setUp(self): + gc.enable() + + def tearDown(self): + gc.disable() + + @unittest.skipIf(Py_GIL_DISABLED, "requires GC generations or increments") + def test_bug1055820c(self): + # Corresponds to temp2c.py in the bug report. This is pretty + # elaborate. + + c0 = C1055820(0) + # Move c0 into generation 2. + gc.collect() + + c1 = C1055820(1) + c1.keep_c0_alive = c0 + del c0.loop # now only c1 keeps c0 alive + + c2 = C1055820(2) + c2wr = weakref.ref(c2) # no callback! + + ouch = [] + def callback(ignored): + ouch[:] = [c2wr()] + + # The callback gets associated with a wr on an object in generation 2. + c0wr = weakref.ref(c0, callback) + + c0 = c1 = c2 = None + + # What we've set up: c0, c1, and c2 are all trash now. c0 is in + # generation 2. The only thing keeping it alive is that c1 points to + # it. c1 and c2 are in generation 0, and are in self-loops. There's a + # global weakref to c2 (c2wr), but that weakref has no callback. + # There's also a global weakref to c0 (c0wr), and that does have a + # callback, and that callback references c2 via c2wr(). + # + # c0 has a wr with callback, which references c2wr + # ^ + # | + # | Generation 2 above dots + #. . . . . . . .|. . . . . . . . . . . . . . . . . . . . . . . . + # | Generation 0 below dots + # | + # | + # ^->c1 ^->c2 has a wr but no callback + # | | | | + # <--v <--v + # + # So this is the nightmare: when generation 0 gets collected, we see + # that c2 has a callback-free weakref, and c1 doesn't even have a + # weakref. Collecting generation 0 doesn't see c0 at all, and c0 is + # the only object that has a weakref with a callback. gc clears c1 + # and c2. Clearing c1 has the side effect of dropping the refcount on + # c0 to 0, so c0 goes away (despite that it's in an older generation) + # and c0's wr callback triggers. That in turn materializes a reference + # to c2 via c2wr(), but c2 gets cleared anyway by gc. + + # We want to let gc happen "naturally", to preserve the distinction + # between generations. + junk = [] + i = 0 + detector = GC_Detector() + if Py_GIL_DISABLED: + # The free-threaded build doesn't have multiple generations, so + # just trigger a GC manually. + gc.collect() + assert not detector.gc_happened + while not detector.gc_happened: + i += 1 + if i > 100000: + self.fail("gc didn't happen after 100000 iterations") + self.assertEqual(len(ouch), 0) + junk.append([]) # this will eventually trigger gc + + self.assertEqual(len(ouch), 1) # else the callback wasn't invoked + for x in ouch: + # If the callback resurrected c2, the instance would be damaged, + # with an empty __dict__. + self.assertEqual(x, None) + + @gc_threshold(1000, 0, 0) + def test_bug1055820d(self): + # Corresponds to temp2d.py in the bug report. This is very much like + # test_bug1055820c, but uses a __del__ method instead of a weakref + # callback to sneak in a resurrection of cyclic trash. + + ouch = [] + class D(C1055820): + def __del__(self): + ouch[:] = [c2wr()] + + d0 = D(0) + # Move all the above into generation 2. + gc.collect() + + c1 = C1055820(1) + c1.keep_d0_alive = d0 + del d0.loop # now only c1 keeps d0 alive + + c2 = C1055820(2) + c2wr = weakref.ref(c2) # no callback! + + d0 = c1 = c2 = None + + # What we've set up: d0, c1, and c2 are all trash now. d0 is in + # generation 2. The only thing keeping it alive is that c1 points to + # it. c1 and c2 are in generation 0, and are in self-loops. There's + # a global weakref to c2 (c2wr), but that weakref has no callback. + # There are no other weakrefs. + # + # d0 has a __del__ method that references c2wr + # ^ + # | + # | Generation 2 above dots + #. . . . . . . .|. . . . . . . . . . . . . . . . . . . . . . . . + # | Generation 0 below dots + # | + # | + # ^->c1 ^->c2 has a wr but no callback + # | | | | + # <--v <--v + # + # So this is the nightmare: when generation 0 gets collected, we see + # that c2 has a callback-free weakref, and c1 doesn't even have a + # weakref. Collecting generation 0 doesn't see d0 at all. gc clears + # c1 and c2. Clearing c1 has the side effect of dropping the refcount + # on d0 to 0, so d0 goes away (despite that it's in an older + # generation) and d0's __del__ triggers. That in turn materializes + # a reference to c2 via c2wr(), but c2 gets cleared anyway by gc. + + # We want to let gc happen "naturally", to preserve the distinction + # between generations. + detector = GC_Detector() + junk = [] + i = 0 + if Py_GIL_DISABLED: + # The free-threaded build doesn't have multiple generations, so + # just trigger a GC manually. + gc.collect() + while not detector.gc_happened: + i += 1 + if i > 50000: + self.fail("gc didn't happen after 50000 iterations") + self.assertEqual(len(ouch), 0) + junk.append([]) # this will eventually trigger gc + + self.assertEqual(len(ouch), 1) # else __del__ wasn't invoked + for x in ouch: + # If __del__ resurrected c2, the instance would be damaged, with an + # empty __dict__. + self.assertEqual(x, None) + + @gc_threshold(1000, 0, 0) + def test_indirect_calls_with_gc_disabled(self): + junk = [] + i = 0 + detector = GC_Detector() + while not detector.gc_happened: + i += 1 + if i > 100000: + self.fail("gc didn't happen after 100000 iterations") + junk.append([]) # this will eventually trigger gc + + try: + gc.disable() + junk = [] + i = 0 + detector = GC_Detector() + while not detector.gc_happened: + i += 1 + if i > 100000: + break + junk.append([]) # this may eventually trigger gc (if it is enabled) + + self.assertEqual(i, 100001) + finally: + gc.enable() + + # Ensure that setting *threshold0* to zero disables collection. + @gc_threshold(0) + def test_threshold_zero(self): + junk = [] + i = 0 + detector = GC_Detector() + while not detector.gc_happened: + i += 1 + if i > 50000: + break + junk.append([]) # this may eventually trigger gc (if it is enabled) + + self.assertEqual(i, 50001) + + +class PythonFinalizationTests(unittest.TestCase): + def test_ast_fini(self): + # bpo-44184: Regression test for subtype_dealloc() when deallocating + # an AST instance also destroy its AST type: subtype_dealloc() must + # not access the type memory after deallocating the instance, since + # the type memory can be freed as well. The test is also related to + # _PyAST_Fini() which clears references to AST types. + code = textwrap.dedent(""" + import ast + import codecs + from test import support + + # Small AST tree to keep their AST types alive + tree = ast.parse("def f(x, y): return 2*x-y") + + # Store the tree somewhere to survive until the last GC collection + support.late_deletion(tree) + """) + assert_python_ok("-c", code) + + def test_warnings_fini(self): + # See https://github.com/python/cpython/issues/137384 + code = textwrap.dedent(''' + import asyncio + from contextvars import ContextVar + + context_loop = ContextVar("context_loop", default=None) + loop = asyncio.new_event_loop() + context_loop.set(loop) + ''') + + assert_python_ok("-c", code) + + +def setUpModule(): + global enabled, debug + enabled = gc.isenabled() + gc.disable() + assert not gc.isenabled() + debug = gc.get_debug() + gc.set_debug(debug & ~gc.DEBUG_LEAK) # this test is supposed to leak + gc.collect() # Delete 2nd generation garbage + + +def tearDownModule(): + gc.set_debug(debug) + # test gc.enable() even if GC is disabled by default + if verbose: + print("restoring automatic collection") + # make sure to always test gc.enable() + gc.enable() + assert gc.isenabled() + if not enabled: + gc.disable() + + +if __name__ == "__main__": + unittest.main()