efro.debug

Utilities for debugging memory leaks or other issues.

IMPORTANT - these functions use the gc module which looks 'under the hood' at Python and sometimes returns not-fully-initialized objects, which may cause crashes or errors due to suddenly having references to them that they didn't expect, etc. See https://github.com/python/cpython/issues/59313. For this reason, these methods should NEVER be called in production code. Enable them only for debugging situations and be aware that their use may itself cause problems. The same is true for the gc module itself.

# Released under the MIT License. See LICENSE for details.
#
"""Utilities for debugging memory leaks or other issues.

IMPORTANT - these functions use the gc module which looks 'under the hood'
at Python and sometimes returns not-fully-initialized objects, which may
cause crashes or errors due to suddenly having references to them that they
didn't expect, etc. See https://github.com/python/cpython/issues/59313.
For this reason, these methods should NEVER be called in production code.
Enable them only for debugging situations and be aware that their use may
itself cause problems. The same is true for the gc module itself.
"""
from __future__ import annotations

import gc
import sys
import time
import types
import weakref
import threading
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from typing import Any, TextIO

    from logging import Logger

ABS_MAX_LEVEL = 10

# NOTE: In general we want this toolset to allow us to explore
# which objects are holding references to others so we can diagnose
# leaks/etc. It is a bit tricky to do that, however, without
# affecting the objects we are looking at by adding temporary references
# from module dicts, function scopes, etc. So we need to try to be
# careful about cleaning up after ourselves and explicitly avoiding
# returning these temporary references wherever possible.

# A good test is running printrefs() repeatedly on some object that is
# known to be static. If the list of references or the ids of any of
# the listed references changes with each run, it's a good sign that
# we're showing some temporary objects that we should be ignoring.


def getobjs(
    cls: type | str, contains: str | None = None, expanded: bool = False
) -> list[Any]:
    """Return all garbage-collected objects matching criteria.

    'cls' can be an actual type or a string, in which case objects
    whose type names contain that string will be returned.

    If 'contains' is provided, objects will be filtered to those
    containing it in their str() representations.

    If 'expanded' is True, the scan also walks gc.get_referents()
    recursively, which is slower but catches objects the garbage
    collector does not track directly.
    """

    # Don't wanna return stuff waiting to be garbage-collected.
    gc.collect()

    if not isinstance(cls, type | str):
        raise TypeError('Expected a type or string for cls')
    if not isinstance(contains, str | None):
        raise TypeError('Expected a string or None for contains')

    allobjs = _get_all_objects(expanded=expanded)

    if isinstance(cls, str):
        objs = [o for o in allobjs if cls in str(type(o))]
    else:
        objs = [o for o in allobjs if isinstance(o, cls)]
    if contains is not None:
        objs = [o for o in objs if contains in str(o)]

    return objs


# Recursively expand slist's objects into olist, using seen to track
# already-processed objects.
def _getr(slist: list[Any], olist: list[Any], seen: set[int]) -> None:
    for obj in slist:
        if id(obj) in seen:
            continue
        seen.add(id(obj))
        olist.append(obj)
        tll = gc.get_referents(obj)
        if tll:
            _getr(tll, olist, seen)


def _get_all_objects(expanded: bool) -> list[Any]:
    """Return an expanded list of all objects.

    See https://utcc.utoronto.ca/~cks/space/blog/python/GetAllObjects
    """
    gcl = gc.get_objects()
    if not expanded:
        return gcl
    olist: list[Any] = []
    seen: set[int] = set()
    # Just in case:
    seen.add(id(gcl))
    seen.add(id(olist))
    seen.add(id(seen))
    # _getr does the real work.
    _getr(gcl, olist, seen)
    return olist


def getobj(objid: int, expanded: bool = False) -> Any:
    """Return a garbage-collected object by its id.

    Remember that this is VERY inefficient and should only ever be used
    for debugging.
    """
    if not isinstance(objid, int):
        raise TypeError(f'Expected an int for objid; got a {type(objid)}.')

    # Don't wanna return stuff waiting to be garbage-collected.
    gc.collect()

    allobjs = _get_all_objects(expanded=expanded)
    for obj in allobjs:
        if id(obj) == objid:
            return obj
    raise RuntimeError(f'Object with id {objid} not found.')


def getrefs(obj: Any) -> list[Any]:
    """Given an object, return things referencing it."""
    v = vars()  # Ignore ref coming from locals.
    return [o for o in gc.get_referrers(obj) if o is not v]


def printfiles(file: TextIO | None = None) -> None:
    """Print info about open files in the current app."""
    import io

    file = sys.stderr if file is None else file
    try:
        import psutil
    except ImportError:
        print(
            "Error: printfiles requires the 'psutil' module to be installed.",
            file=file,
        )
        return

    proc = psutil.Process()

    # Let's grab all Python file handles so we can associate raw files
    # with their Python objects when possible.
    fileio_ids = {obj.fileno(): obj for obj in getobjs(io.FileIO)}
    textio_ids = {obj.fileno(): obj for obj in getobjs(io.TextIOWrapper)}

    # FIXME: we could do a more limited version of this when psutil is
    # not present that simply includes Python's files.
    print('Files open by this app (not limited to Python\'s):', file=file)
    for i, ofile in enumerate(proc.open_files()):
        # Mypy doesn't know about mode, apparently. (We can't use a
        # 'type: ignore' comment because we don't require psutil, and
        # mypy would then complain about an unused ignore when it's
        # not present.)
        mode = getattr(ofile, 'mode')
        assert isinstance(mode, str)
        textio = textio_ids.get(ofile.fd)
        textio_s = id(textio) if textio is not None else '<not found>'
        fileio = fileio_ids.get(ofile.fd)
        fileio_s = id(fileio) if fileio is not None else '<not found>'
        print(
            f'#{i+1}: path={ofile.path!r},'
            f' fd={ofile.fd}, mode={mode!r}, TextIOWrapper={textio_s},'
            f' FileIO={fileio_s}',
            file=file,
        )


def printrefs(
    obj: Any,
    max_level: int = 2,
    exclude_objs: list[Any] | None = None,
    expand_ids: list[int] | None = None,
    file: TextIO | None = None,
) -> None:
    """Print a human-readable list of objects referring to an object.

    'max_level' specifies how many levels of recursion are printed.
    'exclude_objs' can be a list of exact objects to skip if found in the
      referrers list. This can be useful to avoid printing the local context
      where the object was passed in from (locals(), etc).
    'expand_ids' can be a list of object ids; if that particular object is
      found, it will always be expanded even if max_level has been reached.
    """
    _printrefs(
        obj,
        level=0,
        max_level=max_level,
        exclude_objs=[] if exclude_objs is None else exclude_objs,
        expand_ids=[] if expand_ids is None else expand_ids,
        file=sys.stderr if file is None else file,
    )


def printtypes(
    limit: int = 50, file: TextIO | None = None, expanded: bool = False
) -> None:
    """Print a human-readable list of which types have the most instances."""
    assert limit > 0
    objtypes: dict[str, int] = {}
    gc.collect()  # Recommended before get_objects().
    allobjs = _get_all_objects(expanded=expanded)
    allobjc = len(allobjs)
    for obj in allobjs:
        modname = type(obj).__module__
        tpname = type(obj).__qualname__
        if modname != 'builtins':
            tpname = f'{modname}.{tpname}'
        objtypes[tpname] = objtypes.get(tpname, 0) + 1

    # Presumably allobjs contains stack-frame/dict type stuff from this
    # function call, which in turn contains refs to allobjs. Let's try
    # to prevent these huge lists from accumulating until the cyclic
    # collector (hopefully) gets to them.
    allobjs.clear()
    del allobjs

    print(f'Types most allocated ({allobjc} total objects):', file=file)
    for i, tpitem in enumerate(
        sorted(objtypes.items(), key=lambda x: x[1], reverse=True)[:limit]
    ):
        tpname, tpval = tpitem
        percent = tpval / allobjc * 100.0
        print(f'{i+1}: {tpname}: {tpval} ({percent:.2f}%)', file=file)


def printsizes(
    limit: int = 50, file: TextIO | None = None, expanded: bool = False
) -> None:
    """Print total allocated sizes of different types."""
    assert limit > 0
    objsizes: dict[str, int] = {}
    gc.collect()  # Recommended before get_objects().
    allobjs = _get_all_objects(expanded=expanded)
    totalobjsize = 0

    for obj in allobjs:
        modname = type(obj).__module__
        tpname = type(obj).__qualname__
        if modname != 'builtins':
            tpname = f'{modname}.{tpname}'
        objsize = sys.getsizeof(obj)
        objsizes[tpname] = objsizes.get(tpname, 0) + objsize
        totalobjsize += objsize

    totalobjmb = totalobjsize / (1024 * 1024)
    print(
        f'Types with most allocated bytes ({totalobjmb:.2f} MB total):',
        file=file,
    )
    for i, tpitem in enumerate(
        sorted(objsizes.items(), key=lambda x: x[1], reverse=True)[:limit]
    ):
        tpname, tpval = tpitem
        percent = tpval / totalobjsize * 100.0
        print(f'{i+1}: {tpname}: {tpval} ({percent:.2f}%)', file=file)


def _desctype(obj: Any) -> str:
    cls = type(obj)
    # noinspection PyPep8
    if cls is types.ModuleType:
        return f'{type(obj).__name__} {obj.__name__}'
    # noinspection PyPep8
    if cls is types.MethodType:
        bnd = 'bound' if hasattr(obj, '__self__') else 'unbound'
        return f'{bnd} {type(obj).__name__} {obj.__name__}'
    return f'{type(obj).__name__}'


def _desc(obj: Any) -> str:
    extra: str | None = None
    if isinstance(obj, list | tuple):
        # Print length and the first few types.
        tps = [_desctype(i) for i in obj[:3]]
        tpsj = ', '.join(tps)
        tpss = (
            f', contains [{tpsj}, ...]'
            if len(obj) > 3
            else f', contains [{tpsj}]' if tps else ''
        )
        extra = f' (len {len(obj)}{tpss})'
    elif isinstance(obj, dict):
        # If it seems to be the vars() for a type or module,
        # try to identify which one.
        for ref in getrefs(obj):
            if hasattr(ref, '__dict__') and vars(ref) is obj:
                extra = f' (vars for {_desctype(ref)} @ {id(ref)})'

        # Generic dict: print length and the first few key:type pairs.
        if extra is None:
            pairs = [
                f'{repr(n)}: {_desctype(v)}' for n, v in list(obj.items())[:3]
            ]
            pairsj = ', '.join(pairs)
            pairss = (
                f', contains {{{pairsj}, ...}}'
                if len(obj) > 3
                else f', contains {{{pairsj}}}' if pairs else ''
            )
            extra = f' (len {len(obj)}{pairss})'
    if extra is None:
        extra = ''
    return f'{_desctype(obj)} @ {id(obj)}{extra}'


def _printrefs(
    obj: Any,
    level: int,
    max_level: int,
    exclude_objs: list[Any],
    expand_ids: list[int],
    file: TextIO,
) -> None:
    ind = '  ' * level
    print(ind + _desc(obj), file=file)
    v = vars()
    if level < max_level or (id(obj) in expand_ids and level < ABS_MAX_LEVEL):
        refs = getrefs(obj)
        for ref in refs:
            # We tend to get a transient cell object whose contents are
            # set to obj. It would be nice to understand why that
            # happens, but we just ignore it for now.
            if isinstance(ref, types.CellType) and ref.cell_contents is obj:
                continue

            # Ignore anything we were asked to ignore.
            if any(ref is eobj for eobj in exclude_objs):
                continue

            # Ignore references from our own locals.
            if ref is v:
                continue

            # The 'refs' list we just made will be listed as a referrer
            # of this obj, so explicitly exclude it from the obj's listing.
            _printrefs(
                ref,
                level=level + 1,
                max_level=max_level,
                exclude_objs=exclude_objs + [refs],
                expand_ids=expand_ids,
                file=file,
            )


class DeadlockDumper:
    """Dumps thread states if still around after timeout seconds.

    This uses low-level Python functionality so should still fire
    even in the case of a deadlock.
    """

    # faulthandler has a single traceback-dump-later state, so only
    # one of us can control it at a time.
    lock = threading.Lock()
    watch_in_progress = False

    def __init__(self, timeout: float) -> None:
        import faulthandler

        cls = type(self)

        with cls.lock:
            if cls.watch_in_progress:
                print(
                    'Existing DeadlockDumper found; new one will be a no-op.',
                    file=sys.stderr,
                )
                self.active = False
                return

            # Ok; no watch is in progress; we can be the active one.
            cls.watch_in_progress = True
            self.active = True
            faulthandler.dump_traceback_later(timeout=timeout)

    def __del__(self) -> None:
        import faulthandler

        cls = type(self)

        # If we made it to here, call off the dump - but only if we
        # are the active dumper.
        if self.active:
            faulthandler.cancel_dump_traceback_later()
            cls.watch_in_progress = False


class DeadlockWatcher:
    """Individual watcher for deadlock conditions.

    Use enable_deadlock_watchers() to enable this system.

    Next, create these in contexts where they will be torn down after
    some operation completes. If any is not torn down within the
    timeout period, a traceback of all threads will be dumped.

    Note that the checker thread runs a cycle every ~5 seconds, so
    something must remain stuck for roughly that long to be reliably
    caught.
    """

    watchers_lock: threading.Lock | None = None
    watchers: list[weakref.ref[DeadlockWatcher]] | None = None

    def __init__(
        self,
        timeout: float = 10.0,
        logger: Logger | None = None,
        logextra: dict | None = None,
    ) -> None:
        # pylint: disable=not-context-manager
        cls = type(self)
        if cls.watchers_lock is None or cls.watchers is None:
            print(
                'DeadlockWatcher created without watchers enabled.',
                file=sys.stderr,
            )
            return

        # All we do is record when we were made and how long we have
        # until we expire.
        self.create_time = time.monotonic()
        self.timeout = timeout
        self.noted_expire = False
        self.logger = logger
        self.logextra = logextra

        with cls.watchers_lock:
            cls.watchers.append(weakref.ref(self))

    @classmethod
    def enable_deadlock_watchers(cls) -> None:
        """Spins up deadlock-watcher functionality.

        Must be explicitly called before any DeadlockWatchers are
        created.
        """
        assert cls.watchers_lock is None
        cls.watchers_lock = threading.Lock()
        assert cls.watchers is None
        cls.watchers = []

        threading.Thread(
            target=cls._deadlock_watcher_thread_main, daemon=True
        ).start()

    @classmethod
    def _deadlock_watcher_thread_main(cls) -> None:
        # pylint: disable=not-context-manager
        # pylint: disable=not-an-iterable
        assert cls.watchers_lock is not None and cls.watchers is not None

        # Spin in a loop, checking our watchers periodically and dumping
        # state if any have timed out. The trick here is that we don't
        # dump state directly; rather we set up a "dead man's switch"
        # that does so after some amount of time if we don't explicitly
        # cancel it. This way we'll hopefully get state dumps even for
        # things like total GIL deadlocks.
        while True:

            # Set a dead man's switch for this pass.
            dumper = DeadlockDumper(timeout=5.171)

            # Sleep most of the way through it but give ourselves time
            # to turn it off if we're still responsive.
            time.sleep(4.129)
            now = time.monotonic()

            found_fresh_expired = False

            # If any watcher is still alive but expired, sleep past the
            # timeout to force the dumper to do its thing.
            with cls.watchers_lock:

                for wref in cls.watchers:
                    w = wref()
                    if (
                        w is not None
                        and now - w.create_time > w.timeout
                        and not w.noted_expire
                    ):
                        # If they supplied a logger, let them know they
                        # should check stderr for a dump.
                        if w.logger is not None:
                            w.logger.error(
                                'DeadlockWatcher with time %.2f expired;'
                                ' check stderr for stack traces.',
                                w.timeout,
                                extra=w.logextra,
                            )
                        found_fresh_expired = True
                        w.noted_expire = True

                    # Important to clear this ref; otherwise we can keep
                    # a random watcher alive until our next time through.
                    w = None

                # Prune dead watchers and reset for the next pass.
                cls.watchers = [w for w in cls.watchers if w() is not None]

            if found_fresh_expired:
                # Push us over the dumper's time limit, which gives us a
                # lovely dump. Technically we could just do an immediate
                # dump here instead, but that would give us two paths to
                # maintain instead of this single one.
                time.sleep(2.0)

            # Call off the dump if it hasn't happened yet.
            del dumper
ABS_MAX_LEVEL = 10
def getobjs(cls: type | str, contains: str | None = None, expanded: bool = False) -> list[typing.Any]:

Return all garbage-collected objects matching criteria.

'cls' can be an actual type or a string, in which case objects whose type names contain that string will be returned.

If 'contains' is provided, objects will be filtered to those containing it in their str() representations.

If 'expanded' is True, the scan also walks gc.get_referents() recursively, which is slower but catches objects the garbage collector does not track directly.

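For example, in an interactive debugging session (the Leaky class below is a hypothetical stand-in for whatever type you suspect is leaking):

    from efro.debug import getobjs

    class Leaky:
        def __str__(self) -> str:
            return 'Leaky(main-menu)'

    keep = [Leaky(), Leaky()]
    print(len(getobjs(Leaky)))  # Match by exact type.
    print(len(getobjs('Leaky')))  # Match by type-name substring.
    print(len(getobjs('Leaky', contains='main-menu')))  # Filter by str().
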
def getobj(objid: int, expanded: bool = False) -> Any:

Return a garbage-collected object by its id.

Remember that this is VERY inefficient and should only ever be used for debugging.

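A typical pattern is to note an object's id from earlier getobjs()/printrefs() output and recover the live object later without holding a direct reference in the meantime (Leaky and registry here are hypothetical):

    from efro.debug import getobj

    class Leaky:
        pass

    registry = [Leaky()]  # Something elsewhere keeps the object alive.
    objid = id(registry[0])  # Noted during an earlier debugging pass.
    obj = getobj(objid)  # Recover it by id alone; slow, debugging only.
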
def getrefs(obj: Any) -> list[typing.Any]:

Given an object, return things referencing it.

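A quick sketch of walking one level of referrers by hand (Leaky and holder are hypothetical):

    from efro.debug import getrefs

    class Leaky:
        pass

    suspect = Leaky()
    holder = [suspect]  # One referrer we expect to see listed.
    for referrer in getrefs(suspect):
        print(type(referrer).__name__, id(referrer))
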
def printfiles(file: typing.TextIO | None = None) -> None:

Print info about open files in the current app.

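Minimal usage sketch; psutil must be installed, and the report goes to stderr unless a file is passed:

    import io
    from efro.debug import printfiles

    printfiles()  # Report to stderr.

    buf = io.StringIO()  # Or capture the report as a string.
    printfiles(file=buf)
    report = buf.getvalue()
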
def printrefs(obj: Any, max_level: int = 2, exclude_objs: list[typing.Any] | None = None, expand_ids: list[int] | None = None, file: typing.TextIO | None = None) -> None:

Print a human-readable list of objects referring to an object.

'max_level' specifies how many levels of recursion are printed. 'exclude_objs' can be a list of exact objects to skip if found in the referrers list. This can be useful to avoid printing the local context where the object was passed in from (locals(), etc). 'expand_ids' can be a list of object ids; if that particular object is found, it will always be expanded even if max_level has been reached.

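For example, to look three levels up the referrer chain while keeping our own calling context out of the output (Leaky is a hypothetical suspect type):

    from efro.debug import printrefs

    class Leaky:
        pass

    suspect = Leaky()
    holder = [suspect]
    # Pass our own namespace via exclude_objs so it isn't listed.
    printrefs(suspect, max_level=3, exclude_objs=[locals()])
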
def printtypes(limit: int = 50, file: typing.TextIO | None = None, expanded: bool = False) -> None:

Print a human-readable list of which types have the most instances.

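Usage sketch; comparing two snapshots taken over time is a simple way to spot a type whose instance count only ever grows:

    from efro.debug import printtypes

    printtypes(limit=10)  # The ten most numerous types.
    printtypes(limit=10, expanded=True)  # Slower but more thorough scan.
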
def printsizes(limit: int = 50, file: typing.TextIO | None = None, expanded: bool = False) -> None:

Print total allocated sizes of different types.

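Note that sizes come from sys.getsizeof(), so they are shallow; a container's total does not include the objects it refers to. Usage mirrors printtypes():

    from efro.debug import printsizes

    printsizes(limit=10)
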
class DeadlockDumper:

Dumps thread states if still around after timeout seconds.

This uses low-level Python functionality so should still fire even in the case of a deadlock.

DeadlockDumper(timeout: float)
lock = <unlocked _thread.lock object>
watch_in_progress = False
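
A rough usage sketch; the dump fires only if the dumper instance is still alive after the timeout (time.sleep() here is a stand-in for work that might hang):

    import time
    from efro.debug import DeadlockDumper

    dumper = DeadlockDumper(timeout=10.0)
    time.sleep(2.0)  # Stand-in for possibly-hanging work.
    del dumper  # Finished in time; cancel the pending dump.
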
class DeadlockWatcher:

Individual watcher for deadlock conditions.

Use enable_deadlock_watchers() to enable this system.

Next, create these in contexts where they will be torn down after some operation completes. If any is not torn down within the timeout period, a traceback of all threads will be dumped.

Note that the checker thread runs a cycle every ~5 seconds, so something must remain stuck for roughly that long to be reliably caught.

DeadlockWatcher(timeout: float = 10.0, logger: logging.Logger | None = None, logextra: dict | None = None)
watchers_lock: 'threading.Lock | None' = None
watchers: list[weakref.ReferenceType[DeadlockWatcher]] | None = None
create_time
timeout
noted_expire
logger
logextra
@classmethod
def enable_deadlock_watchers(cls) -> None:

Spins up deadlock-watcher functionality.

Must be explicitly called before any DeadlockWatchers are created.
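
A minimal sketch of the intended flow (handle_request and its sleep are hypothetical stand-ins):

    import time
    from efro.debug import DeadlockWatcher

    DeadlockWatcher.enable_deadlock_watchers()  # Once, at startup.

    def handle_request() -> None:
        watcher = DeadlockWatcher(timeout=10.0)
        time.sleep(1.0)  # Stand-in for work that might deadlock.
        del watcher  # Completed in time; no dump occurs.

    handle_request()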