efro.log

Logging functionality.

  1# Released under the MIT License. See LICENSE for details.
  2#
  3"""Logging functionality."""
  4from __future__ import annotations
  5
  6import sys
  7import time
  8import asyncio
  9import logging
 10import datetime
 11import itertools
 12from enum import Enum
 13from collections import deque
 14from dataclasses import dataclass, field
 15from typing import TYPE_CHECKING, Annotated
 16from threading import Thread, current_thread, Lock
 17
 18from typing_extensions import override
 19from efro.util import utc_now
 20from efro.call import tpartial
 21from efro.terminal import Clr
 22from efro.dataclassio import ioprepped, IOAttrs, dataclass_to_json
 23
 24if TYPE_CHECKING:
 25    from pathlib import Path
 26    from typing import Any, Callable, TextIO
 27
 28
 29class LogLevel(Enum):
 30    """Severity level for a log entry.
 31
  32    These enums have numeric values so they can be compared by severity.
 33    Note that these values are not currently interchangeable with the
 34    logging.ERROR, logging.DEBUG, etc. values.
 35    """
 36
 37    DEBUG = 0
 38    INFO = 1
 39    WARNING = 2
 40    ERROR = 3
 41    CRITICAL = 4
 42
 43    @property
 44    def python_logging_level(self) -> int:
  45        """Return the corresponding Python logging level."""
 46        return LOG_LEVEL_LEVELNOS[self]
 47
 48    @classmethod
 49    def from_python_logging_level(cls, levelno: int) -> LogLevel:
 50        """Given a Python logging level, return a LogLevel."""
 51        return LEVELNO_LOG_LEVELS[levelno]
 52
 53
 54# Python logging levels from LogLevels
 55LOG_LEVEL_LEVELNOS = {
 56    LogLevel.DEBUG: logging.DEBUG,
 57    LogLevel.INFO: logging.INFO,
 58    LogLevel.WARNING: logging.WARNING,
 59    LogLevel.ERROR: logging.ERROR,
 60    LogLevel.CRITICAL: logging.CRITICAL,
 61}
 62
 63# LogLevels from Python logging levels
 64LEVELNO_LOG_LEVELS = {
 65    logging.DEBUG: LogLevel.DEBUG,
 66    logging.INFO: LogLevel.INFO,
 67    logging.WARNING: LogLevel.WARNING,
 68    logging.ERROR: LogLevel.ERROR,
 69    logging.CRITICAL: LogLevel.CRITICAL,
 70}
 71
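# A quick sketch of the conversions the class property/classmethod and the
# two maps above support (illustrative only; not part of this module's source):
#
#     import logging
#     from efro.log import LogLevel
#
#     assert LogLevel.WARNING.python_logging_level == logging.WARNING
#     assert LogLevel.from_python_logging_level(logging.ERROR) is LogLevel.ERROR
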
 72LEVELNO_COLOR_CODES: dict[int, tuple[str, str]] = {
 73    logging.DEBUG: (Clr.CYN, Clr.RST),
 74    logging.INFO: ('', ''),
 75    logging.WARNING: (Clr.YLW, Clr.RST),
 76    logging.ERROR: (Clr.RED, Clr.RST),
 77    logging.CRITICAL: (Clr.SMAG + Clr.BLD + Clr.BLK, Clr.RST),
 78}
 79
 80
 81@ioprepped
 82@dataclass
 83class LogEntry:
 84    """Single logged message."""
 85
 86    name: Annotated[str, IOAttrs('n', soft_default='root', store_default=False)]
 87    message: Annotated[str, IOAttrs('m')]
 88    level: Annotated[LogLevel, IOAttrs('l')]
 89    time: Annotated[datetime.datetime, IOAttrs('t')]
 90
 91    # We support arbitrary string labels per log entry which can be
 92    # incorporated into custom log processing. To populate this, our
 93    # LogHandler class looks for a 'labels' dict passed in the optional
 94    # 'extra' dict arg to standard Python log calls.
 95    labels: Annotated[dict[str, str], IOAttrs('la', store_default=False)] = (
 96        field(default_factory=dict)
 97    )
 98
 99
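# Sketch of how the 'labels' field gets populated from a standard logging
# call, per the comment above (illustrative only; the logger name and label
# names here are made up):
#
#     import logging
#
#     logging.getLogger('myapp').warning(
#         'low disk space on %s', '/dev/sda1',
#         extra={'labels': {'subsystem': 'storage'}},
#     )
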
100@ioprepped
101@dataclass
102class LogArchive:
103    """Info and data for a log."""
104
105    # Total number of entries submitted to the log.
106    log_size: Annotated[int, IOAttrs('t')]
107
108    # Offset for the entries contained here.
109    # (10 means our first entry is the 10th in the log, etc.)
110    start_index: Annotated[int, IOAttrs('c')]
111
112    entries: Annotated[list[LogEntry], IOAttrs('e')]
113
114
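# Sketch of how log_size/start_index support incremental fetching from a
# LogHandler (illustrative only; assumes a handler instance named 'handler'):
#
#     archive = handler.get_cached()
#     next_index = archive.start_index + len(archive.entries)
#     # ...later, ask only for entries submitted since then:
#     newer = handler.get_cached(start_index=next_index)
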
115class LogHandler(logging.Handler):
116    """Fancy-pants handler for logging output.
117
118    Writes logs to disk in structured json format and echoes them
119    to stdout/stderr with pretty colors.
120    """
121
122    _event_loop: asyncio.AbstractEventLoop
123
124    # IMPORTANT: Any debug prints we do here should ONLY go to echofile.
125    # Otherwise we can get infinite loops as those prints come back to us
126    # as new log entries.
127
128    def __init__(
129        self,
130        path: str | Path | None,
131        echofile: TextIO | None,
132        suppress_non_root_debug: bool,
133        cache_size_limit: int,
134        cache_time_limit: datetime.timedelta | None,
135    ):
136        super().__init__()
137        # pylint: disable=consider-using-with
138        self._file = None if path is None else open(path, 'w', encoding='utf-8')
139        self._echofile = echofile
140        self._callbacks: list[Callable[[LogEntry], None]] = []
141        self._suppress_non_root_debug = suppress_non_root_debug
142        self._file_chunks: dict[str, list[str]] = {'stdout': [], 'stderr': []}
143        self._file_chunk_ship_task: dict[str, asyncio.Task | None] = {
144            'stdout': None,
145            'stderr': None,
146        }
147        self._cache_size = 0
148        assert cache_size_limit >= 0
149        self._cache_size_limit = cache_size_limit
150        self._cache_time_limit = cache_time_limit
151        self._cache = deque[tuple[int, LogEntry]]()
152        self._cache_index_offset = 0
153        self._cache_lock = Lock()
154        self._printed_callback_error = False
155        self._thread_bootstrapped = False
156        self._thread = Thread(target=self._log_thread_main, daemon=True)
157        if __debug__:
158            self._last_slow_emit_warning_time: float | None = None
159        self._thread.start()
160
161        # Spin until our thread is up and running; otherwise we could
162        # wind up trying to push stuff to our event loop before the
163        # loop exists.
164        while not self._thread_bootstrapped:
165            time.sleep(0.001)
166
167    def add_callback(
168        self, call: Callable[[LogEntry], None], feed_existing_logs: bool = False
169    ) -> None:
170        """Add a callback to be run for each LogEntry.
171
172        Note that this callback will always run in a background thread.
173        Passing True for feed_existing_logs will cause all cached logs
174        in the handler to be fed to the callback (still in the
175        background thread though).
176        """
177
178        # Kick this over to our bg thread to add the callback and
179        # process cached entries at the same time to ensure there are no
180        # race conditions that could cause entries to be skipped/etc.
181        self._event_loop.call_soon_threadsafe(
182            tpartial(self._add_callback_in_thread, call, feed_existing_logs)
183        )
184
185    def _add_callback_in_thread(
186        self, call: Callable[[LogEntry], None], feed_existing_logs: bool
187    ) -> None:
188        """Add a callback to be run for each LogEntry.
189
190        Note that this callback will always run in a background thread.
191        Passing True for feed_existing_logs will cause all cached logs
192        in the handler to be fed to the callback (still in the
193        background thread though).
194        """
195        assert current_thread() is self._thread
196        self._callbacks.append(call)
197
198        # Run all of our cached entries through the new callback if desired.
199        if feed_existing_logs and self._cache_size_limit > 0:
200            with self._cache_lock:
201                for _id, entry in self._cache:
202                    self._run_callback_on_entry(call, entry)
203
204    def _log_thread_main(self) -> None:
205        self._event_loop = asyncio.new_event_loop()
206
207        # In our background thread event loop we do a fair amount of
208        # slow synchronous stuff such as mucking with the log cache.
209        # Let's avoid getting tons of warnings about this in debug mode.
210        self._event_loop.slow_callback_duration = 2.0  # Default is 0.1
211
212        # NOTE: if we ever use the default threadpool at all, we should allow
213        # setting it for our loop.
214        asyncio.set_event_loop(self._event_loop)
215        self._thread_bootstrapped = True
216        try:
217            if self._cache_time_limit is not None:
218                _prunetask = self._event_loop.create_task(
219                    self._time_prune_cache()
220                )
221            self._event_loop.run_forever()
222        except BaseException:
223            # If this ever goes down we're in trouble; we won't be able
224            # to log about it though. Try to make some noise however we
225            # can.
226            print('LogHandler died!!!', file=sys.stderr)
227            import traceback
228
229            traceback.print_exc()
230            raise
231
232    async def _time_prune_cache(self) -> None:
233        assert self._cache_time_limit is not None
235        while True:
235            await asyncio.sleep(61.27)
236            now = utc_now()
237            with self._cache_lock:
238                # Prune the oldest entry as long as there is a first one that
239                # is too old.
240                while (
241                    self._cache
242                    and (now - self._cache[0][1].time) >= self._cache_time_limit
243                ):
244                    popped = self._cache.popleft()
245                    self._cache_size -= popped[0]
246                    self._cache_index_offset += 1
247
248    def get_cached(
249        self, start_index: int = 0, max_entries: int | None = None
250    ) -> LogArchive:
251        """Build and return an archive of cached log entries.
252
253        This will only include entries that have been processed by the
254        background thread, so may not include just-submitted logs or
255        entries for partially written stdout/stderr lines.
256        Entries from the range [start_index:start_index+max_entries]
257        which are still present in the cache will be returned.
258        """
259
260        assert start_index >= 0
261        if max_entries is not None:
262            assert max_entries >= 0
263        with self._cache_lock:
264            # Transform start_index to our present cache space.
265            start_index -= self._cache_index_offset
266            # Calc end-index in our present cache space.
267            end_index = (
268                len(self._cache)
269                if max_entries is None
270                else start_index + max_entries
271            )
272
273            # Clamp both indexes to both ends of our present space.
274            start_index = max(0, min(start_index, len(self._cache)))
275            end_index = max(0, min(end_index, len(self._cache)))
276
277            return LogArchive(
278                log_size=self._cache_index_offset + len(self._cache),
279                start_index=start_index + self._cache_index_offset,
280                entries=self._cache_slice(start_index, end_index),
281            )
282
283    def _cache_slice(
284        self, start: int, end: int, step: int = 1
285    ) -> list[LogEntry]:
286        # Deque doesn't natively support slicing but we can do it manually.
287        # It sounds like rotating the deque and pulling from the beginning
288        # is the most efficient way to do this. The downside is the deque
289        # gets temporarily modified in the process so we need to make sure
290        # we're holding the lock.
291        assert self._cache_lock.locked()
292        cache = self._cache
293        cache.rotate(-start)
294        slc = [e[1] for e in itertools.islice(cache, 0, end - start, step)]
295        cache.rotate(start)
296        return slc
297
298    @classmethod
299    def _is_immutable_log_data(cls, data: Any) -> bool:
300        if isinstance(data, (str, bool, int, float, bytes)):
301            return True
302        if isinstance(data, tuple):
303            return all(cls._is_immutable_log_data(x) for x in data)
304        return False
305
306    def call_in_thread(self, call: Callable[[], Any]) -> None:
307        """Submit a call to be run in the logging background thread."""
308        self._event_loop.call_soon_threadsafe(call)
309
310    @override
311    def emit(self, record: logging.LogRecord) -> None:
312        # pylint: disable=too-many-branches
313        if __debug__:
314            starttime = time.monotonic()
315
316        # Called by logging to send us records.
317
318        # TODO - kill this.
319        if (
320            self._suppress_non_root_debug
321            and record.name != 'root'
322            and record.levelname == 'DEBUG'
323        ):
324            return
325
326        # Optimization: if our log args are all simple immutable values,
327        # we can just kick the whole thing over to our background thread to
328        # be formatted there at our leisure. If anything is mutable and
330        # thus could possibly change between now and then, or if we want
331        # to do immediate file echoing, then we need to bite the bullet
332        # and do that stuff here at the call site.
332        fast_path = self._echofile is None and self._is_immutable_log_data(
333            record.args
334        )
335
336        # Note: just assuming types are correct here, but they'll be
337        # checked properly when the resulting LogEntry gets exported.
338        labels: dict[str, str] | None = getattr(record, 'labels', None)
339        if labels is None:
340            labels = {}
341
342        if fast_path:
343            if __debug__:
344                formattime = echotime = time.monotonic()
345            self._event_loop.call_soon_threadsafe(
346                tpartial(
347                    self._emit_in_thread,
348                    record.name,
349                    record.levelno,
350                    record.created,
351                    record,
352                    labels,
353                )
354            )
355        else:
356            # Slow case; do formatting and echoing here at the log call
357            # site.
358            msg = self.format(record)
359
360            if __debug__:
361                formattime = time.monotonic()
362
363            # Also immediately print pretty colored output to our echo file
364            # (generally stderr). We do this part here instead of in our bg
365            # thread because the delay can throw off command line prompts or
366            # make tight debugging harder.
367            if self._echofile is not None:
368                ends = LEVELNO_COLOR_CODES.get(record.levelno)
369                namepre = f'{Clr.WHT}{record.name}:{Clr.RST} '
370                if ends is not None:
371                    self._echofile.write(f'{namepre}{ends[0]}{msg}{ends[1]}\n')
372                else:
373                    self._echofile.write(f'{namepre}{msg}\n')
374                self._echofile.flush()
375
376            if __debug__:
377                echotime = time.monotonic()
378
379            self._event_loop.call_soon_threadsafe(
380                tpartial(
381                    self._emit_in_thread,
382                    record.name,
383                    record.levelno,
384                    record.created,
385                    msg,
386                    labels,
387                )
388            )
389
390        if __debug__:
391            # Make noise if we're taking a significant amount of time here.
392            # Limit the noise to once every so often though; otherwise we
393            # could get a feedback loop where every log emit results in a
394            # warning log which results in another, etc.
395            now = time.monotonic()
396            # noinspection PyUnboundLocalVariable
397            duration = now - starttime  # pyright: ignore
398            # noinspection PyUnboundLocalVariable
399            format_duration = formattime - starttime  # pyright: ignore
400            # noinspection PyUnboundLocalVariable
401            echo_duration = echotime - formattime  # pyright: ignore
402            if duration > 0.05 and (
403                self._last_slow_emit_warning_time is None
404                or now > self._last_slow_emit_warning_time + 10.0
405            ):
406                # Logging calls from *within* a logging handler
407                # sounds sketchy, so let's just kick this over to
408                # the bg event loop thread we've already got.
409                self._last_slow_emit_warning_time = now
410                self._event_loop.call_soon_threadsafe(
411                    tpartial(
412                        logging.warning,
413                        'efro.log.LogHandler emit took too long'
414                        ' (%.2fs total; %.2fs format, %.2fs echo,'
415                        ' fast_path=%s).',
416                        duration,
417                        format_duration,
418                        echo_duration,
419                        fast_path,
420                    )
421                )
422
423    def _emit_in_thread(
424        self,
425        name: str,
426        levelno: int,
427        created: float,
428        message: str | logging.LogRecord,
429        labels: dict[str, str],
430    ) -> None:
431        try:
432            # If they passed a raw record here, bake it down to a string.
433            if isinstance(message, logging.LogRecord):
434                message = self.format(message)
435
436            self._emit_entry(
437                LogEntry(
438                    name=name,
439                    message=message,
440                    level=LEVELNO_LOG_LEVELS.get(levelno, LogLevel.INFO),
441                    time=datetime.datetime.fromtimestamp(
442                        created, datetime.timezone.utc
443                    ),
444                    labels=labels,
445                )
446            )
447        except Exception:
448            import traceback
449
450            traceback.print_exc(file=self._echofile)
451
452    def file_write(self, name: str, output: str) -> None:
453        """Send raw stdout/stderr output to the logger to be collated."""
454
455        # Note to self: it turns out that things like '^^^^^^^^^^^^^^'
456        # lines in stack traces get written as lots of individual '^'
457        # writes. It feels a bit dirty to be pushing a deferred call to
458        # another thread for each character. Perhaps should do some sort
459        # of basic accumulation here?
460        self._event_loop.call_soon_threadsafe(
461            tpartial(self._file_write_in_thread, name, output)
462        )
463
464    def _file_write_in_thread(self, name: str, output: str) -> None:
465        try:
466            assert name in ('stdout', 'stderr')
467
468            # Here we try to be somewhat smart about breaking arbitrary
469            # print output into discrete log entries.
470
471            self._file_chunks[name].append(output)
472
473            # Individual parts of a print come across as separate writes,
474            # and the end of a print will be a standalone '\n' by default.
475            # Let's use that as a hint that we're likely at the end of
476            # a full print statement and ship what we've got.
477            if output == '\n':
478                self._ship_file_chunks(name, cancel_ship_task=True)
479            else:
480                # By default just keep adding chunks.
481                # However we keep a timer running anytime we've got
482                # unshipped chunks so that we can ship what we've got
483                # after a short bit if we never get a newline.
484                ship_task = self._file_chunk_ship_task[name]
485                if ship_task is None:
486                    self._file_chunk_ship_task[name] = (
487                        self._event_loop.create_task(
488                            self._ship_chunks_task(name),
489                            name='log ship file chunks',
490                        )
491                    )
492
493        except Exception:
494            import traceback
495
496            traceback.print_exc(file=self._echofile)
497
498    def file_flush(self, name: str) -> None:
499        """Send raw stdout/stderr flush to the logger to be collated."""
500
501        self._event_loop.call_soon_threadsafe(
502            tpartial(self._file_flush_in_thread, name)
503        )
504
505    def _file_flush_in_thread(self, name: str) -> None:
506        try:
507            assert name in ('stdout', 'stderr')
508
509            # Immediately ship whatever chunks we've got.
510            if self._file_chunks[name]:
511                self._ship_file_chunks(name, cancel_ship_task=True)
512
513        except Exception:
514            import traceback
515
516            traceback.print_exc(file=self._echofile)
517
518    async def _ship_chunks_task(self, name: str) -> None:
519        # Note: it's important we sleep here for a moment. Otherwise,
520        # things like '^^^^^^^^^^^^' lines in stack traces, which come
521        # through as lots of individual '^' writes, tend to get broken
522        # into lots of tiny little lines by us.
523        await asyncio.sleep(0.01)
524        self._ship_file_chunks(name, cancel_ship_task=False)
525
526    def _ship_file_chunks(self, name: str, cancel_ship_task: bool) -> None:
527        # Note: Raw print input generally ends in a newline, but that is
528        # redundant when we break things into log entries and results in
529        # extra empty lines. So strip off a single trailing newline if
530        # one is present.
531        text = ''.join(self._file_chunks[name]).removesuffix('\n')
532
533        self._emit_entry(
534            LogEntry(
535                name=name, message=text, level=LogLevel.INFO, time=utc_now()
536            )
537        )
538        self._file_chunks[name] = []
539        ship_task = self._file_chunk_ship_task[name]
540        if cancel_ship_task and ship_task is not None:
541            ship_task.cancel()
542        self._file_chunk_ship_task[name] = None
543
544    def _emit_entry(self, entry: LogEntry) -> None:
545        assert current_thread() is self._thread
546
547        # Store to our cache.
548        if self._cache_size_limit > 0:
549            with self._cache_lock:
550                # Do a rough calc of how many bytes this entry consumes.
551                entry_size = sum(
552                    sys.getsizeof(x)
553                    for x in (
554                        entry,
555                        entry.name,
556                        entry.message,
557                        entry.level,
558                        entry.time,
559                    )
560                )
561                self._cache.append((entry_size, entry))
562                self._cache_size += entry_size
563
564                # Prune old entries until we are back at or under our limit.
565                while self._cache_size > self._cache_size_limit:
566                    popped = self._cache.popleft()
567                    self._cache_size -= popped[0]
568                    self._cache_index_offset += 1
569
570        # Pass to callbacks.
571        for call in self._callbacks:
572            self._run_callback_on_entry(call, entry)
573
574        # Dump to our structured log file.
575        # TODO: should set a timer for flushing; don't flush every line.
576        if self._file is not None:
577            entry_s = dataclass_to_json(entry)
578            assert '\n' not in entry_s  # Make sure it's a single line.
579            print(entry_s, file=self._file, flush=True)
580
581    def _run_callback_on_entry(
582        self, callback: Callable[[LogEntry], None], entry: LogEntry
583    ) -> None:
584        """Run a callback and handle any errors."""
585        try:
586            callback(entry)
587        except Exception:
588            # Only print the first callback error to avoid insanity.
589            if not self._printed_callback_error:
590                import traceback
591
592                traceback.print_exc(file=self._echofile)
593                self._printed_callback_error = True
594
595
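# Sketch of attaching a callback to a LogHandler instance (illustrative only;
# 'handler' and 'error_messages' are hypothetical names). Per add_callback's
# docstring the callback runs in the handler's background thread, so it
# should be quick and thread-safe:
#
#     error_messages: list[str] = []
#
#     def _on_entry(entry: LogEntry) -> None:
#         if entry.level is LogLevel.ERROR:
#             error_messages.append(entry.message)
#
#     handler.add_callback(_on_entry, feed_existing_logs=True)
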
596class FileLogEcho:
597    """A file-like object for forwarding stdout/stderr to a LogHandler."""
598
599    def __init__(
600        self, original: TextIO, name: str, handler: LogHandler
601    ) -> None:
602        assert name in ('stdout', 'stderr')
603        self._original = original
604        self._name = name
605        self._handler = handler
606
607    def write(self, output: Any) -> None:
608        """Override standard write call."""
609        self._original.write(output)
610        self._handler.file_write(self._name, output)
611
612    def flush(self) -> None:
613        """Flush the file."""
614        self._original.flush()
615
616        # We also use this as a hint to ship whatever file chunks
617        # we've accumulated (we have to try and be smart about breaking
618        # our arbitrary file output into discrete entries).
619        self._handler.file_flush(self._name)
620
621    def isatty(self) -> bool:
622        """Are we a terminal?"""
623        return self._original.isatty()
624
625
626def setup_logging(
627    log_path: str | Path | None,
628    level: LogLevel,
629    suppress_non_root_debug: bool = False,
630    log_stdout_stderr: bool = False,
631    echo_to_stderr: bool = True,
632    cache_size_limit: int = 0,
633    cache_time_limit: datetime.timedelta | None = None,
634) -> LogHandler:
635    """Set up our logging environment.
636
637    Returns the custom handler which can be used to fetch information
638    about logs that have passed through it (worst log-levels, caches, etc.).
639    """
640
641    lmap = {
642        LogLevel.DEBUG: logging.DEBUG,
643        LogLevel.INFO: logging.INFO,
644        LogLevel.WARNING: logging.WARNING,
645        LogLevel.ERROR: logging.ERROR,
646        LogLevel.CRITICAL: logging.CRITICAL,
647    }
648
649    # Wire logger output to go to a structured log file.
650    # Also echo it to stderr IF we're running in a terminal.
651    # UPDATE: Actually gonna always go to stderr. Is there a
652    # reason we shouldn't? This makes debugging possible if all
653    # we have is access to a non-interactive terminal or file dump.
654    # We could add a '--quiet' arg or whatnot to change this behavior.
655
656    # Note: by passing in the *original* stderr here before we
657    # (potentially) replace it, we ensure that our log echoes
658    # won't themselves be intercepted and sent to the logger
659    # which would create an infinite loop.
660    loghandler = LogHandler(
661        log_path,
662        echofile=sys.stderr if echo_to_stderr else None,
663        suppress_non_root_debug=suppress_non_root_debug,
664        cache_size_limit=cache_size_limit,
665        cache_time_limit=cache_time_limit,
666    )
667
668    # Note: going ahead with force=True here so that we replace any
669    # existing handlers. Though we warn if it looks like we are doing
670    # that so we can try to avoid creating the first one.
671    had_previous_handlers = bool(logging.root.handlers)
672    logging.basicConfig(
673        level=lmap[level],
674        # format='%(name)s: %(message)s',
675        # We dump *only* the message here. We pass various log record bits
676        # around and format things fancier where they end up.
677        format='%(message)s',
678        handlers=[loghandler],
679        force=True,
680    )
681    if had_previous_handlers:
682        logging.warning(
683            'setup_logging: Replacing existing handlers.'
684            ' Something may have logged before expected.'
685        )
686
687    # Optionally intercept Python's stdout/stderr output and generate
688    # log entries from it.
689    if log_stdout_stderr:
690        sys.stdout = FileLogEcho(  # type: ignore
691            sys.stdout, 'stdout', loghandler
692        )
693        sys.stderr = FileLogEcho(  # type: ignore
694            sys.stderr, 'stderr', loghandler
695        )
696
697    return loghandler
class LogLevel(enum.Enum):
30class LogLevel(Enum):
31    """Severity level for a log entry.
32
33    These enums have numeric values so they can be compared in severity.
34    Note that these values are not currently interchangeable with the
35    logging.ERROR, logging.DEBUG, etc. values.
36    """
37
38    DEBUG = 0
39    INFO = 1
40    WARNING = 2
41    ERROR = 3
42    CRITICAL = 4
43
44    @property
45    def python_logging_level(self) -> int:
46        """Give the corresponding logging level."""
47        return LOG_LEVEL_LEVELNOS[self]
48
49    @classmethod
50    def from_python_logging_level(cls, levelno: int) -> LogLevel:
51        """Given a Python logging level, return a LogLevel."""
52        return LEVELNO_LOG_LEVELS[levelno]

Severity level for a log entry.

These enums have numeric values so they can be compared in severity. Note that these values are not currently interchangeable with the logging.ERROR, logging.DEBUG, etc. values.

DEBUG = <LogLevel.DEBUG: 0>
INFO = <LogLevel.INFO: 1>
WARNING = <LogLevel.WARNING: 2>
ERROR = <LogLevel.ERROR: 3>
CRITICAL = <LogLevel.CRITICAL: 4>
python_logging_level: int
44    @property
45    def python_logging_level(self) -> int:
46        """Give the corresponding logging level."""
47        return LOG_LEVEL_LEVELNOS[self]

Give the corresponding logging level.

@classmethod
def from_python_logging_level(cls, levelno: int) -> LogLevel:
49    @classmethod
50    def from_python_logging_level(cls, levelno: int) -> LogLevel:
51        """Given a Python logging level, return a LogLevel."""
52        return LEVELNO_LOG_LEVELS[levelno]

Given a Python logging level, return a LogLevel.

Inherited Members
enum.Enum
name
value
LOG_LEVEL_LEVELNOS = {<LogLevel.DEBUG: 0>: 10, <LogLevel.INFO: 1>: 20, <LogLevel.WARNING: 2>: 30, <LogLevel.ERROR: 3>: 40, <LogLevel.CRITICAL: 4>: 50}
LEVELNO_LOG_LEVELS = {10: <LogLevel.DEBUG: 0>, 20: <LogLevel.INFO: 1>, 30: <LogLevel.WARNING: 2>, 40: <LogLevel.ERROR: 3>, 50: <LogLevel.CRITICAL: 4>}
LEVELNO_COLOR_CODES: dict[int, tuple[str, str]] = {10: ('', ''), 20: ('', ''), 30: ('', ''), 40: ('', ''), 50: ('', '')}
@ioprepped
@dataclass
class LogEntry:
82@ioprepped
83@dataclass
84class LogEntry:
85    """Single logged message."""
86
87    name: Annotated[str, IOAttrs('n', soft_default='root', store_default=False)]
88    message: Annotated[str, IOAttrs('m')]
89    level: Annotated[LogLevel, IOAttrs('l')]
90    time: Annotated[datetime.datetime, IOAttrs('t')]
91
92    # We support arbitrary string labels per log entry which can be
93    # incorporated into custom log processing. To populate this, our
94    # LogHandler class looks for a 'labels' dict passed in the optional
95    # 'extra' dict arg to standard Python log calls.
96    labels: Annotated[dict[str, str], IOAttrs('la', store_default=False)] = (
97        field(default_factory=dict)
98    )

Single logged message.

LogEntry( name: typing.Annotated[str, <efro.dataclassio._base.IOAttrs object>], message: typing.Annotated[str, <efro.dataclassio._base.IOAttrs object>], level: typing.Annotated[LogLevel, <efro.dataclassio._base.IOAttrs object>], time: typing.Annotated[datetime.datetime, <efro.dataclassio._base.IOAttrs object>], labels: typing.Annotated[dict[str, str], <efro.dataclassio._base.IOAttrs object>] = <factory>)
name: typing.Annotated[str, <efro.dataclassio._base.IOAttrs object at 0x10f70ea50>]
message: typing.Annotated[str, <efro.dataclassio._base.IOAttrs object at 0x114402650>]
level: typing.Annotated[LogLevel, <efro.dataclassio._base.IOAttrs object at 0x10d81e2d0>]
time: typing.Annotated[datetime.datetime, <efro.dataclassio._base.IOAttrs object at 0x114643910>]
labels: typing.Annotated[dict[str, str], <efro.dataclassio._base.IOAttrs object at 0x113ae5c10>]
@ioprepped
@dataclass
class LogArchive:
101@ioprepped
102@dataclass
103class LogArchive:
104    """Info and data for a log."""
105
106    # Total number of entries submitted to the log.
107    log_size: Annotated[int, IOAttrs('t')]
108
109    # Offset for the entries contained here.
110    # (10 means our first entry is the 10th in the log, etc.)
111    start_index: Annotated[int, IOAttrs('c')]
112
113    entries: Annotated[list[LogEntry], IOAttrs('e')]

Info and data for a log.

LogArchive( log_size: typing.Annotated[int, <efro.dataclassio._base.IOAttrs object>], start_index: typing.Annotated[int, <efro.dataclassio._base.IOAttrs object>], entries: typing.Annotated[list[LogEntry], <efro.dataclassio._base.IOAttrs object>])
log_size: typing.Annotated[int, <efro.dataclassio._base.IOAttrs object at 0x10f70ec90>]
start_index: typing.Annotated[int, <efro.dataclassio._base.IOAttrs object at 0x1131ddb10>]
entries: typing.Annotated[list[LogEntry], <efro.dataclassio._base.IOAttrs object at 0x1147c6890>]
class LogHandler(logging.Handler):
116class LogHandler(logging.Handler):
117    """Fancy-pants handler for logging output.
118
119    Writes logs to disk in structured json format and echoes them
120    to stdout/stderr with pretty colors.
121    """
122
123    _event_loop: asyncio.AbstractEventLoop
124
125    # IMPORTANT: Any debug prints we do here should ONLY go to echofile.
126    # Otherwise we can get infinite loops as those prints come back to us
127    # as new log entries.
128
129    def __init__(
130        self,
131        path: str | Path | None,
132        echofile: TextIO | None,
133        suppress_non_root_debug: bool,
134        cache_size_limit: int,
135        cache_time_limit: datetime.timedelta | None,
136    ):
137        super().__init__()
138        # pylint: disable=consider-using-with
139        self._file = None if path is None else open(path, 'w', encoding='utf-8')
140        self._echofile = echofile
141        self._callbacks: list[Callable[[LogEntry], None]] = []
142        self._suppress_non_root_debug = suppress_non_root_debug
143        self._file_chunks: dict[str, list[str]] = {'stdout': [], 'stderr': []}
144        self._file_chunk_ship_task: dict[str, asyncio.Task | None] = {
145            'stdout': None,
146            'stderr': None,
147        }
148        self._cache_size = 0
149        assert cache_size_limit >= 0
150        self._cache_size_limit = cache_size_limit
151        self._cache_time_limit = cache_time_limit
152        self._cache = deque[tuple[int, LogEntry]]()
153        self._cache_index_offset = 0
154        self._cache_lock = Lock()
155        self._printed_callback_error = False
156        self._thread_bootstrapped = False
157        self._thread = Thread(target=self._log_thread_main, daemon=True)
158        if __debug__:
159            self._last_slow_emit_warning_time: float | None = None
160        self._thread.start()
161
162        # Spin until our thread is up and running; otherwise we could
163        # wind up trying to push stuff to our event loop before the
164        # loop exists.
165        while not self._thread_bootstrapped:
166            time.sleep(0.001)
167
168    def add_callback(
169        self, call: Callable[[LogEntry], None], feed_existing_logs: bool = False
170    ) -> None:
171        """Add a callback to be run for each LogEntry.
172
173        Note that this callback will always run in a background thread.
174        Passing True for feed_existing_logs will cause all cached logs
175        in the handler to be fed to the callback (still in the
176        background thread though).
177        """
178
179        # Kick this over to our bg thread to add the callback and
180        # process cached entries at the same time to ensure there are no
181        # race conditions that could cause entries to be skipped/etc.
182        self._event_loop.call_soon_threadsafe(
183            tpartial(self._add_callback_in_thread, call, feed_existing_logs)
184        )
185
186    def _add_callback_in_thread(
187        self, call: Callable[[LogEntry], None], feed_existing_logs: bool
188    ) -> None:
189        """Add a callback to be run for each LogEntry.
190
191        Note that this callback will always run in a background thread.
192        Passing True for feed_existing_logs will cause all cached logs
193        in the handler to be fed to the callback (still in the
194        background thread though).
195        """
196        assert current_thread() is self._thread
197        self._callbacks.append(call)
198
199        # Run all of our cached entries through the new callback if desired.
200        if feed_existing_logs and self._cache_size_limit > 0:
201            with self._cache_lock:
202                for _id, entry in self._cache:
203                    self._run_callback_on_entry(call, entry)
204
205    def _log_thread_main(self) -> None:
206        self._event_loop = asyncio.new_event_loop()
207
208        # In our background thread event loop we do a fair amount of
209        # slow synchronous stuff such as mucking with the log cache.
210        # Let's avoid getting tons of warnings about this in debug mode.
211        self._event_loop.slow_callback_duration = 2.0  # Default is 0.1
212
213        # NOTE: if we ever use default threadpool at all we should allow
214        # setting it for our loop.
215        asyncio.set_event_loop(self._event_loop)
216        self._thread_bootstrapped = True
217        try:
218            if self._cache_time_limit is not None:
219                _prunetask = self._event_loop.create_task(
220                    self._time_prune_cache()
221                )
222            self._event_loop.run_forever()
223        except BaseException:
224            # If this ever goes down we're in trouble; we won't be able
225            # to log about it though. Try to make some noise however we
226            # can.
227            print('LogHandler died!!!', file=sys.stderr)
228            import traceback
229
230            traceback.print_exc()
231            raise
232
233    async def _time_prune_cache(self) -> None:
234        assert self._cache_time_limit is not None
235        while bool(True):
236            await asyncio.sleep(61.27)
237            now = utc_now()
238            with self._cache_lock:
239                # Prune the oldest entry as long as there is a first one that
240                # is too old.
241                while (
242                    self._cache
243                    and (now - self._cache[0][1].time) >= self._cache_time_limit
244                ):
245                    popped = self._cache.popleft()
246                    self._cache_size -= popped[0]
247                    self._cache_index_offset += 1
248
249    def get_cached(
250        self, start_index: int = 0, max_entries: int | None = None
251    ) -> LogArchive:
252        """Build and return an archive of cached log entries.
253
254        This will only include entries that have been processed by the
255        background thread, so may not include just-submitted logs or
256        entries for partially written stdout/stderr lines.
257        Entries from the range [start_index:start_index+max_entries]
258        which are still present in the cache will be returned.
259        """
260
261        assert start_index >= 0
262        if max_entries is not None:
263            assert max_entries >= 0
264        with self._cache_lock:
265            # Transform start_index to our present cache space.
266            start_index -= self._cache_index_offset
267            # Calc end-index in our present cache space.
268            end_index = (
269                len(self._cache)
270                if max_entries is None
271                else start_index + max_entries
272            )
273
274            # Clamp both indexes to both ends of our present space.
275            start_index = max(0, min(start_index, len(self._cache)))
276            end_index = max(0, min(end_index, len(self._cache)))
277
278            return LogArchive(
279                log_size=self._cache_index_offset + len(self._cache),
280                start_index=start_index + self._cache_index_offset,
281                entries=self._cache_slice(start_index, end_index),
282            )
283
284    def _cache_slice(
285        self, start: int, end: int, step: int = 1
286    ) -> list[LogEntry]:
287        # Deque doesn't natively support slicing but we can do it manually.
288        # It sounds like rotating the deque and pulling from the beginning
289        # is the most efficient way to do this. The downside is the deque
290        # gets temporarily modified in the process so we need to make sure
291        # we're holding the lock.
292        assert self._cache_lock.locked()
293        cache = self._cache
294        cache.rotate(-start)
295        slc = [e[1] for e in itertools.islice(cache, 0, end - start, step)]
296        cache.rotate(start)
297        return slc
298
299    @classmethod
300    def _is_immutable_log_data(cls, data: Any) -> bool:
301        if isinstance(data, (str, bool, int, float, bytes)):
302            return True
303        if isinstance(data, tuple):
304            return all(cls._is_immutable_log_data(x) for x in data)
305        return False
306
307    def call_in_thread(self, call: Callable[[], Any]) -> None:
308        """Submit a call to be run in the logging background thread."""
309        self._event_loop.call_soon_threadsafe(call)
310
311    @override
312    def emit(self, record: logging.LogRecord) -> None:
313        # pylint: disable=too-many-branches
314        if __debug__:
315            starttime = time.monotonic()
316
317        # Called by logging to send us records.
318
319        # TODO - kill this.
320        if (
321            self._suppress_non_root_debug
322            and record.name != 'root'
323            and record.levelname == 'DEBUG'
324        ):
325            return
326
327        # Optimization: if our log args are all simple immutable values,
328        # we can just kick the whole thing over to our background thread to
329        # be formatted there at our leisure. If anything is mutable and
330        # thus could possibly change between now and then or if we want
331        # to do immediate file echoing then we need to bite the bullet
332        # and do that stuff here at the call site.
333        fast_path = self._echofile is None and self._is_immutable_log_data(
334            record.args
335        )
336
337        # Note: just assuming types are correct here, but they'll be
338        # checked properly when the resulting LogEntry gets exported.
339        labels: dict[str, str] | None = getattr(record, 'labels', None)
340        if labels is None:
341            labels = {}
342
343        if fast_path:
344            if __debug__:
345                formattime = echotime = time.monotonic()
346            self._event_loop.call_soon_threadsafe(
347                tpartial(
348                    self._emit_in_thread,
349                    record.name,
350                    record.levelno,
351                    record.created,
352                    record,
353                    labels,
354                )
355            )
356        else:
357            # Slow case; do formatting and echoing here at the log call
358            # site.
359            msg = self.format(record)
360
361            if __debug__:
362                formattime = time.monotonic()
363
364            # Also immediately print pretty colored output to our echo file
365            # (generally stderr). We do this part here instead of in our bg
366            # thread because the delay can throw off command line prompts or
367            # make tight debugging harder.
368            if self._echofile is not None:
369                ends = LEVELNO_COLOR_CODES.get(record.levelno)
370                namepre = f'{Clr.WHT}{record.name}:{Clr.RST} '
371                if ends is not None:
372                    self._echofile.write(f'{namepre}{ends[0]}{msg}{ends[1]}\n')
373                else:
374                    self._echofile.write(f'{namepre}{msg}\n')
375                self._echofile.flush()
376
377            if __debug__:
378                echotime = time.monotonic()
379
380            self._event_loop.call_soon_threadsafe(
381                tpartial(
382                    self._emit_in_thread,
383                    record.name,
384                    record.levelno,
385                    record.created,
386                    msg,
387                    labels,
388                )
389            )
390
391        if __debug__:
392            # Make noise if we're taking a significant amount of time here.
393            # Limit the noise to once every so often though; otherwise we
394            # could get a feedback loop where every log emit results in a
395            # warning log which results in another, etc.
396            now = time.monotonic()
397            # noinspection PyUnboundLocalVariable
398            duration = now - starttime  # pyright: ignore
399            # noinspection PyUnboundLocalVariable
400            format_duration = formattime - starttime  # pyright: ignore
401            # noinspection PyUnboundLocalVariable
402            echo_duration = echotime - formattime  # pyright: ignore
403            if duration > 0.05 and (
404                self._last_slow_emit_warning_time is None
405                or now > self._last_slow_emit_warning_time + 10.0
406            ):
407                # Logging calls from *within* a logging handler
408                # sounds sketchy, so let's just kick this over to
409                # the bg event loop thread we've already got.
410                self._last_slow_emit_warning_time = now
411                self._event_loop.call_soon_threadsafe(
412                    tpartial(
413                        logging.warning,
414                        'efro.log.LogHandler emit took too long'
415                        ' (%.2fs total; %.2fs format, %.2fs echo,'
416                        ' fast_path=%s).',
417                        duration,
418                        format_duration,
419                        echo_duration,
420                        fast_path,
421                    )
422                )
423
424    def _emit_in_thread(
425        self,
426        name: str,
427        levelno: int,
428        created: float,
429        message: str | logging.LogRecord,
430        labels: dict[str, str],
431    ) -> None:
432        try:
433            # If they passed a raw record here, bake it down to a string.
434            if isinstance(message, logging.LogRecord):
435                message = self.format(message)
436
437            self._emit_entry(
438                LogEntry(
439                    name=name,
440                    message=message,
441                    level=LEVELNO_LOG_LEVELS.get(levelno, LogLevel.INFO),
442                    time=datetime.datetime.fromtimestamp(
443                        created, datetime.timezone.utc
444                    ),
445                    labels=labels,
446                )
447            )
448        except Exception:
449            import traceback
450
451            traceback.print_exc(file=self._echofile)
452
453    def file_write(self, name: str, output: str) -> None:
454        """Send raw stdout/stderr output to the logger to be collated."""
455
456        # Note to self: it turns out that things like '^^^^^^^^^^^^^^'
457        # lines in stack traces get written as lots of individual '^'
458        # writes. It feels a bit dirty to be pushing a deferred call to
459        # another thread for each character. Perhaps should do some sort
460        # of basic accumulation here?
461        self._event_loop.call_soon_threadsafe(
462            tpartial(self._file_write_in_thread, name, output)
463        )
464
465    def _file_write_in_thread(self, name: str, output: str) -> None:
466        try:
467            assert name in ('stdout', 'stderr')
468
469            # Here we try to be somewhat smart about breaking arbitrary
470            # print output into discrete log entries.
471
472            self._file_chunks[name].append(output)
473
474            # Individual parts of a print come across as separate writes,
475            # and the end of a print will be a standalone '\n' by default.
476            # Let's use that as a hint that we're likely at the end of
477            # a full print statement and ship what we've got.
478            if output == '\n':
479                self._ship_file_chunks(name, cancel_ship_task=True)
480            else:
481                # By default just keep adding chunks.
482                # However we keep a timer running anytime we've got
483                # unshipped chunks so that we can ship what we've got
484                # after a short bit if we never get a newline.
485                ship_task = self._file_chunk_ship_task[name]
486                if ship_task is None:
487                    self._file_chunk_ship_task[name] = (
488                        self._event_loop.create_task(
489                            self._ship_chunks_task(name),
490                            name='log ship file chunks',
491                        )
492                    )
493
494        except Exception:
495            import traceback
496
497            traceback.print_exc(file=self._echofile)
498
499    def file_flush(self, name: str) -> None:
500        """Send raw stdout/stderr flush to the logger to be collated."""
501
502        self._event_loop.call_soon_threadsafe(
503            tpartial(self._file_flush_in_thread, name)
504        )
505
506    def _file_flush_in_thread(self, name: str) -> None:
507        try:
508            assert name in ('stdout', 'stderr')
509
510            # Immediately ship whatever chunks we've got.
511            if self._file_chunks[name]:
512                self._ship_file_chunks(name, cancel_ship_task=True)
513
514        except Exception:
515            import traceback
516
517            traceback.print_exc(file=self._echofile)
518
519    async def _ship_chunks_task(self, name: str) -> None:
520        # Note: it's important we sleep here for a moment. Otherwise,
521        # things like '^^^^^^^^^^^^' lines in stack traces, which come
522        # through as lots of individual '^' writes, tend to get broken
523        # into lots of tiny little lines by us.
524        await asyncio.sleep(0.01)
525        self._ship_file_chunks(name, cancel_ship_task=False)
526
527    def _ship_file_chunks(self, name: str, cancel_ship_task: bool) -> None:
528        # Note: Raw print input generally ends in a newline, but that is
529        # redundant when we break things into log entries and results in
530        # extra empty lines. So strip off a single trailing newline if
531        # one is present.
532        text = ''.join(self._file_chunks[name]).removesuffix('\n')
533
534        self._emit_entry(
535            LogEntry(
536                name=name, message=text, level=LogLevel.INFO, time=utc_now()
537            )
538        )
539        self._file_chunks[name] = []
540        ship_task = self._file_chunk_ship_task[name]
541        if cancel_ship_task and ship_task is not None:
542            ship_task.cancel()
543        self._file_chunk_ship_task[name] = None
544
545    def _emit_entry(self, entry: LogEntry) -> None:
546        assert current_thread() is self._thread
547
548        # Store to our cache.
549        if self._cache_size_limit > 0:
550            with self._cache_lock:
551                # Do a rough calc of how many bytes this entry consumes.
552                entry_size = sum(
553                    sys.getsizeof(x)
554                    for x in (
555                        entry,
556                        entry.name,
557                        entry.message,
558                        entry.level,
559                        entry.time,
560                    )
561                )
562                self._cache.append((entry_size, entry))
563                self._cache_size += entry_size
564
565                # Prune old until we are back at or under our limit.
566                while self._cache_size > self._cache_size_limit:
567                    popped = self._cache.popleft()
568                    self._cache_size -= popped[0]
569                    self._cache_index_offset += 1
570
571        # Pass to callbacks.
572        for call in self._callbacks:
573            self._run_callback_on_entry(call, entry)
574
575        # Dump to our structured log file.
576        # TODO: should set a timer for flushing; don't flush every line.
577        if self._file is not None:
578            entry_s = dataclass_to_json(entry)
579            assert '\n' not in entry_s  # Make sure its a single line.
580            print(entry_s, file=self._file, flush=True)
581
582    def _run_callback_on_entry(
583        self, callback: Callable[[LogEntry], None], entry: LogEntry
584    ) -> None:
585        """Run a callback and handle any errors."""
586        try:
587            callback(entry)
588        except Exception:
589            # Only print the first callback error to avoid insanity.
590            if not self._printed_callback_error:
591                import traceback
592
593                traceback.print_exc(file=self._echofile)
594                self._printed_callback_error = True

Fancy-pants handler for logging output.

Writes logs to disk in structured json format and echoes them to stdout/stderr with pretty colors.

LogHandler( path: str | pathlib.Path | None, echofile: typing.TextIO | None, suppress_non_root_debug: bool, cache_size_limit: int, cache_time_limit: datetime.timedelta | None)
129    def __init__(
130        self,
131        path: str | Path | None,
132        echofile: TextIO | None,
133        suppress_non_root_debug: bool,
134        cache_size_limit: int,
135        cache_time_limit: datetime.timedelta | None,
136    ):
137        super().__init__()
138        # pylint: disable=consider-using-with
139        self._file = None if path is None else open(path, 'w', encoding='utf-8')
140        self._echofile = echofile
141        self._callbacks: list[Callable[[LogEntry], None]] = []
142        self._suppress_non_root_debug = suppress_non_root_debug
143        self._file_chunks: dict[str, list[str]] = {'stdout': [], 'stderr': []}
144        self._file_chunk_ship_task: dict[str, asyncio.Task | None] = {
145            'stdout': None,
146            'stderr': None,
147        }
148        self._cache_size = 0
149        assert cache_size_limit >= 0
150        self._cache_size_limit = cache_size_limit
151        self._cache_time_limit = cache_time_limit
152        self._cache = deque[tuple[int, LogEntry]]()
153        self._cache_index_offset = 0
154        self._cache_lock = Lock()
155        self._printed_callback_error = False
156        self._thread_bootstrapped = False
157        self._thread = Thread(target=self._log_thread_main, daemon=True)
158        if __debug__:
159            self._last_slow_emit_warning_time: float | None = None
160        self._thread.start()
161
162        # Spin until our thread is up and running; otherwise we could
163        # wind up trying to push stuff to our event loop before the
164        # loop exists.
165        while not self._thread_bootstrapped:
166            time.sleep(0.001)

Initializes the instance - basically setting the formatter to None and the filter list to empty.

def add_callback( self, call: Callable[[LogEntry], NoneType], feed_existing_logs: bool = False) -> None:
168    def add_callback(
169        self, call: Callable[[LogEntry], None], feed_existing_logs: bool = False
170    ) -> None:
171        """Add a callback to be run for each LogEntry.
172
173        Note that this callback will always run in a background thread.
174        Passing True for feed_existing_logs will cause all cached logs
175        in the handler to be fed to the callback (still in the
176        background thread though).
177        """
178
179        # Kick this over to our bg thread to add the callback and
180        # process cached entries at the same time to ensure there are no
181        # race conditions that could cause entries to be skipped/etc.
182        self._event_loop.call_soon_threadsafe(
183            tpartial(self._add_callback_in_thread, call, feed_existing_logs)
184        )

Add a callback to be run for each LogEntry.

Note that this callback will always run in a background thread. Passing True for feed_existing_logs will cause all cached logs in the handler to be fed to the callback (still in the background thread though).

def get_cached( self, start_index: int = 0, max_entries: int | None = None) -> LogArchive:
249    def get_cached(
250        self, start_index: int = 0, max_entries: int | None = None
251    ) -> LogArchive:
252        """Build and return an archive of cached log entries.
253
254        This will only include entries that have been processed by the
255        background thread, so may not include just-submitted logs or
256        entries for partially written stdout/stderr lines.
257        Entries from the range [start_index:start_index+max_entries]
258        which are still present in the cache will be returned.
259        """
260
261        assert start_index >= 0
262        if max_entries is not None:
263            assert max_entries >= 0
264        with self._cache_lock:
265            # Transform start_index to our present cache space.
266            start_index -= self._cache_index_offset
267            # Calc end-index in our present cache space.
268            end_index = (
269                len(self._cache)
270                if max_entries is None
271                else start_index + max_entries
272            )
273
274            # Clamp both indexes to both ends of our present space.
275            start_index = max(0, min(start_index, len(self._cache)))
276            end_index = max(0, min(end_index, len(self._cache)))
277
278            return LogArchive(
279                log_size=self._cache_index_offset + len(self._cache),
280                start_index=start_index + self._cache_index_offset,
281                entries=self._cache_slice(start_index, end_index),
282            )

Build and return an archive of cached log entries.

This will only include entries that have been processed by the background thread, so may not include just-submitted logs or entries for partially written stdout/stderr lines. Entries from the range [start_index:start_index+max_entries] which are still present in the cache will be returned.
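A sketch of fetching and serializing a slice of the cache, assuming caching was enabled (nonzero cache_size_limit) on the handler; LogArchive is an @ioprepped dataclass, so efro.dataclassio can serialize it directly:

from efro.dataclassio import dataclass_to_json

# Grab up to 100 entries starting from the beginning of the log.
archive = handler.get_cached(start_index=0, max_entries=100)
print(
    f'log holds {archive.log_size} entries;'
    f' this slice starts at {archive.start_index}'
    f' and contains {len(archive.entries)} of them.'
)

# Serialize for shipping/storage.
print(dataclass_to_json(archive))

# To poll for newer entries later, continue where this slice left off.
next_start = archive.start_index + len(archive.entries)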

def call_in_thread(self, call: Callable[[], Any]) -> None:
307    def call_in_thread(self, call: Callable[[], Any]) -> None:
308        """Submit a call to be run in the logging background thread."""
309        self._event_loop.call_soon_threadsafe(call)

Submit a call to be run in the logging background thread.
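For example (a trivial sketch):

# Runs after anything already queued on the logging thread's event loop.
handler.call_in_thread(lambda: print('hello from the logging thread'))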

@override
def emit(self, record: logging.LogRecord) -> None:
311    @override
312    def emit(self, record: logging.LogRecord) -> None:
313        # pylint: disable=too-many-branches
314        if __debug__:
315            starttime = time.monotonic()
316
317        # Called by logging to send us records.
318
319        # TODO - kill this.
320        if (
321            self._suppress_non_root_debug
322            and record.name != 'root'
323            and record.levelname == 'DEBUG'
324        ):
325            return
326
327        # Optimization: if our log args are all simple immutable values,
328        # we can just kick the whole thing over to our background thread to
329        # be formatted there at our leisure. If anything is mutable and
330        # thus could possibly change between now and then or if we want
331        # to do immediate file echoing then we need to bite the bullet
332        # and do that stuff here at the call site.
333        fast_path = self._echofile is None and self._is_immutable_log_data(
334            record.args
335        )
336
337        # Note: just assuming types are correct here, but they'll be
338        # checked properly when the resulting LogEntry gets exported.
339        labels: dict[str, str] | None = getattr(record, 'labels', None)
340        if labels is None:
341            labels = {}
342
343        if fast_path:
344            if __debug__:
345                formattime = echotime = time.monotonic()
346            self._event_loop.call_soon_threadsafe(
347                tpartial(
348                    self._emit_in_thread,
349                    record.name,
350                    record.levelno,
351                    record.created,
352                    record,
353                    labels,
354                )
355            )
356        else:
357            # Slow case; do formatting and echoing here at the log call
358            # site.
359            msg = self.format(record)
360
361            if __debug__:
362                formattime = time.monotonic()
363
364            # Also immediately print pretty colored output to our echo file
365            # (generally stderr). We do this part here instead of in our bg
366            # thread because the delay can throw off command line prompts or
367            # make tight debugging harder.
368            if self._echofile is not None:
369                ends = LEVELNO_COLOR_CODES.get(record.levelno)
370                namepre = f'{Clr.WHT}{record.name}:{Clr.RST} '
371                if ends is not None:
372                    self._echofile.write(f'{namepre}{ends[0]}{msg}{ends[1]}\n')
373                else:
374                    self._echofile.write(f'{namepre}{msg}\n')
375                self._echofile.flush()
376
377            if __debug__:
378                echotime = time.monotonic()
379
380            self._event_loop.call_soon_threadsafe(
381                tpartial(
382                    self._emit_in_thread,
383                    record.name,
384                    record.levelno,
385                    record.created,
386                    msg,
387                    labels,
388                )
389            )
390
391        if __debug__:
392            # Make noise if we're taking a significant amount of time here.
393            # Limit the noise to once every so often though; otherwise we
394            # could get a feedback loop where every log emit results in a
395            # warning log which results in another, etc.
396            now = time.monotonic()
397            # noinspection PyUnboundLocalVariable
398            duration = now - starttime  # pyright: ignore
399            # noinspection PyUnboundLocalVariable
400            format_duration = formattime - starttime  # pyright: ignore
401            # noinspection PyUnboundLocalVariable
402            echo_duration = echotime - formattime  # pyright: ignore
403            if duration > 0.05 and (
404                self._last_slow_emit_warning_time is None
405                or now > self._last_slow_emit_warning_time + 10.0
406            ):
407                # Logging calls from *within* a logging handler
408                # sounds sketchy, so let's just kick this over to
409                # the bg event loop thread we've already got.
410                self._last_slow_emit_warning_time = now
411                self._event_loop.call_soon_threadsafe(
412                    tpartial(
413                        logging.warning,
414                        'efro.log.LogHandler emit took too long'
415                        ' (%.2fs total; %.2fs format, %.2fs echo,'
416                        ' fast_path=%s).',
417                        duration,
418                        format_duration,
419                        echo_duration,
420                        fast_path,
421                    )
422                )

Called by the logging machinery for each record handed to the handler.

When possible (no echo file and only immutable log args), the record is shipped straight to the background thread and formatted there; otherwise formatting and colored echoing happen immediately at the call site before the entry is forwarded. In debug builds, a warning is logged (at most once every ten seconds) if an emit call takes longer than 50 ms.
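As the source above shows, emit() also picks up an optional 'labels' dict from the record, which callers can attach through the standard logging 'extra' argument; the label keys and values in this sketch are assumptions:

import logging

# Ends up in LogEntry.labels for custom log processing.
logging.getLogger('server').warning(
    'client disconnected unexpectedly',
    extra={'labels': {'client_id': '1234', 'region': 'eu'}},
)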

def file_write(self, name: str, output: str) -> None:
453    def file_write(self, name: str, output: str) -> None:
454        """Send raw stdout/stderr output to the logger to be collated."""
455
456        # Note to self: it turns out that things like '^^^^^^^^^^^^^^'
457        # lines in stack traces get written as lots of individual '^'
458        # writes. It feels a bit dirty to be pushing a deferred call to
459        # another thread for each character. Perhaps should do some sort
460        # of basic accumulation here?
461        self._event_loop.call_soon_threadsafe(
462            tpartial(self._file_write_in_thread, name, output)
463        )

Send raw stdout/stderr output to the logger to be collated.

def file_flush(self, name: str) -> None:
499    def file_flush(self, name: str) -> None:
500        """Send raw stdout/stderr flush to the logger to be collated."""
501
502        self._event_loop.call_soon_threadsafe(
503            tpartial(self._file_flush_in_thread, name)
504        )

Send raw stdout/stderr flush to the logger to be collated.

Inherited Members
logging.Handler
level, formatter, get_name, set_name, name, createLock, acquire, release, setLevel, format, handle, setFormatter, flush, close, handleError
logging.Filterer
filters, addFilter, removeFilter, filter
class FileLogEcho:
597class FileLogEcho:
598    """A file-like object for forwarding stdout/stderr to a LogHandler."""
599
600    def __init__(
601        self, original: TextIO, name: str, handler: LogHandler
602    ) -> None:
603        assert name in ('stdout', 'stderr')
604        self._original = original
605        self._name = name
606        self._handler = handler
607
608    def write(self, output: Any) -> None:
609        """Override standard write call."""
610        self._original.write(output)
611        self._handler.file_write(self._name, output)
612
613    def flush(self) -> None:
614        """Flush the file."""
615        self._original.flush()
616
617        # We also use this as a hint to ship whatever file chunks
618        # we've accumulated (we have to try and be smart about breaking
619        # our arbitrary file output into discrete entries).
620        self._handler.file_flush(self._name)
621
622    def isatty(self) -> bool:
623        """Are we a terminal?"""
624        return self._original.isatty()

A file-like object for forwarding stdout/stderr to a LogHandler.

FileLogEcho(original: typing.TextIO, name: str, handler: LogHandler)
600    def __init__(
601        self, original: TextIO, name: str, handler: LogHandler
602    ) -> None:
603        assert name in ('stdout', 'stderr')
604        self._original = original
605        self._name = name
606        self._handler = handler
def write(self, output: Any) -> None:
608    def write(self, output: Any) -> None:
609        """Override standard write call."""
610        self._original.write(output)
611        self._handler.file_write(self._name, output)

Override standard write call.

def flush(self) -> None:
613    def flush(self) -> None:
614        """Flush the file."""
615        self._original.flush()
616
617        # We also use this as a hint to ship whatever file chunks
618        # we've accumulated (we have to try and be smart about breaking
619        # our arbitrary file output into discrete entries).
620        self._handler.file_flush(self._name)

Flush the file.

def isatty(self) -> bool:
622    def isatty(self) -> bool:
623        """Are we a terminal?"""
624        return self._original.isatty()

Are we a terminal?
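setup_logging(log_stdout_stderr=True) installs these wrappers automatically (see below), but a manual sketch, assuming an existing handler, looks like this:

import sys

from efro.log import FileLogEcho

# Each wrapper keeps writing to the original stream while also forwarding
# the output to the handler for collation into log entries.
sys.stdout = FileLogEcho(sys.stdout, 'stdout', handler)  # type: ignore
sys.stderr = FileLogEcho(sys.stderr, 'stderr', handler)  # type: ignore

print('this still reaches the real stdout and also becomes a log entry')
sys.stdout.flush()  # flushing also hints the handler to ship buffered chunks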

def setup_logging( log_path: str | pathlib.Path | None, level: LogLevel, suppress_non_root_debug: bool = False, log_stdout_stderr: bool = False, echo_to_stderr: bool = True, cache_size_limit: int = 0, cache_time_limit: datetime.timedelta | None = None) -> LogHandler:
627def setup_logging(
628    log_path: str | Path | None,
629    level: LogLevel,
630    suppress_non_root_debug: bool = False,
631    log_stdout_stderr: bool = False,
632    echo_to_stderr: bool = True,
633    cache_size_limit: int = 0,
634    cache_time_limit: datetime.timedelta | None = None,
635) -> LogHandler:
636    """Set up our logging environment.
637
638    Returns the custom handler which can be used to fetch information
639    about logs that have passed through it. (worst log-levels, caches, etc.).
640    """
641
642    lmap = {
643        LogLevel.DEBUG: logging.DEBUG,
644        LogLevel.INFO: logging.INFO,
645        LogLevel.WARNING: logging.WARNING,
646        LogLevel.ERROR: logging.ERROR,
647        LogLevel.CRITICAL: logging.CRITICAL,
648    }
649
650    # Wire logger output to go to a structured log file.
651    # Also echo it to stderr IF we're running in a terminal.
652    # UPDATE: Actually gonna always go to stderr. Is there a
653    # reason we shouldn't? This makes debugging possible if all
654    # we have is access to a non-interactive terminal or file dump.
655    # We could add a '--quiet' arg or whatnot to change this behavior.
656
657    # Note: by passing in the *original* stderr here before we
658    # (potentially) replace it, we ensure that our log echos
659    # won't themselves be intercepted and sent to the logger
660    # which would create an infinite loop.
661    loghandler = LogHandler(
662        log_path,
663        echofile=sys.stderr if echo_to_stderr else None,
664        suppress_non_root_debug=suppress_non_root_debug,
665        cache_size_limit=cache_size_limit,
666        cache_time_limit=cache_time_limit,
667    )
668
669    # Note: going ahead with force=True here so that we replace any
670    # existing logger. Though we warn if it looks like we are doing
671    # that so we can try to avoid creating the first one.
672    had_previous_handlers = bool(logging.root.handlers)
673    logging.basicConfig(
674        level=lmap[level],
675        # format='%(name)s: %(message)s',
676        # We dump *only* the message here. We pass various log record bits
677        # around and format things fancier where they end up.
678        format='%(message)s',
679        handlers=[loghandler],
680        force=True,
681    )
682    if had_previous_handlers:
683        logging.warning(
684            'setup_logging: Replacing existing handlers.'
685            ' Something may have logged before expected.'
686        )
687
688    # Optionally intercept Python's stdout/stderr output and generate
689    # log entries from it.
690    if log_stdout_stderr:
691        sys.stdout = FileLogEcho(  # type: ignore
692            sys.stdout, 'stdout', loghandler
693        )
694        sys.stderr = FileLogEcho(  # type: ignore
695            sys.stderr, 'stderr', loghandler
696        )
697
698    return loghandler

Set up our logging environment.

Returns the custom handler, which can be used to fetch information about logs that have passed through it (worst log levels, caches, etc.).
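A typical invocation might look like this sketch (argument values are assumptions):

import logging

from efro.log import LogLevel, setup_logging

handler = setup_logging(
    log_path='app.log',       # or None to skip the structured log file
    level=LogLevel.INFO,
    log_stdout_stderr=True,   # also turn raw stdout/stderr output into entries
    cache_size_limit=1024,    # nonzero so get_cached() has entries to return
)

logging.info('server starting')
print('raw prints are captured too when log_stdout_stderr=True')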