efro.log

Logging functionality.

  1# Released under the MIT License. See LICENSE for details.
  2#
  3"""Logging functionality."""
  4from __future__ import annotations
  5
  6import sys
  7import time
  8import asyncio
  9import logging
 10import datetime
 11import itertools
 12from enum import Enum
 13from collections import deque
 14from dataclasses import dataclass, field
 15from typing import TYPE_CHECKING, Annotated, override
 16from threading import Thread, current_thread, Lock
 17
 18from efro.util import utc_now
 19from efro.call import tpartial
 20from efro.terminal import Clr
 21from efro.dataclassio import ioprepped, IOAttrs, dataclass_to_json
 22
 23if TYPE_CHECKING:
 24    from pathlib import Path
 25    from typing import Any, Callable, TextIO
 26
 27
 28class LogLevel(Enum):
 29    """Severity level for a log entry.
 30
 31    These enums have numeric values so they can be compared in severity.
 32    Note that these values are not currently interchangeable with the
 33    logging.ERROR, logging.DEBUG, etc. values.
 34    """
 35
 36    DEBUG = 0
 37    INFO = 1
 38    WARNING = 2
 39    ERROR = 3
 40    CRITICAL = 4
 41
 42    @property
 43    def python_logging_level(self) -> int:
 44        """Give the corresponding logging level."""
 45        return LOG_LEVEL_LEVELNOS[self]
 46
 47    @classmethod
 48    def from_python_logging_level(cls, levelno: int) -> LogLevel:
 49        """Given a Python logging level, return a LogLevel."""
 50        return LEVELNO_LOG_LEVELS[levelno]
 51
 52
 53# Python logging levels from LogLevels
 54LOG_LEVEL_LEVELNOS = {
 55    LogLevel.DEBUG: logging.DEBUG,
 56    LogLevel.INFO: logging.INFO,
 57    LogLevel.WARNING: logging.WARNING,
 58    LogLevel.ERROR: logging.ERROR,
 59    LogLevel.CRITICAL: logging.CRITICAL,
 60}
 61
 62# LogLevels from Python logging levels
 63LEVELNO_LOG_LEVELS = {
 64    logging.DEBUG: LogLevel.DEBUG,
 65    logging.INFO: LogLevel.INFO,
 66    logging.WARNING: LogLevel.WARNING,
 67    logging.ERROR: LogLevel.ERROR,
 68    logging.CRITICAL: LogLevel.CRITICAL,
 69}
 70
 71LEVELNO_COLOR_CODES: dict[int, tuple[str, str]] = {
 72    logging.DEBUG: (Clr.CYN, Clr.RST),
 73    logging.INFO: ('', ''),
 74    logging.WARNING: (Clr.YLW, Clr.RST),
 75    logging.ERROR: (Clr.RED, Clr.RST),
 76    logging.CRITICAL: (Clr.SMAG + Clr.BLD + Clr.BLK, Clr.RST),
 77}
 78
 79
 80@ioprepped
 81@dataclass
 82class LogEntry:
 83    """Single logged message."""
 84
 85    name: Annotated[str, IOAttrs('n', soft_default='root', store_default=False)]
 86    message: Annotated[str, IOAttrs('m')]
 87    level: Annotated[LogLevel, IOAttrs('l')]
 88    time: Annotated[datetime.datetime, IOAttrs('t')]
 89
 90    # We support arbitrary string labels per log entry which can be
 91    # incorporated into custom log processing. To populate this, our
 92    # LogHandler class looks for a 'labels' dict passed in the optional
 93    # 'extra' dict arg to standard Python log calls.
 94    labels: Annotated[dict[str, str], IOAttrs('la', store_default=False)] = (
 95        field(default_factory=dict)
 96    )
 97
 98
 99@ioprepped
100@dataclass
101class LogArchive:
102    """Info and data for a log."""
103
104    # Total number of entries submitted to the log.
105    log_size: Annotated[int, IOAttrs('t')]
106
107    # Offset for the entries contained here.
108    # (10 means our first entry is the 10th in the log, etc.)
109    start_index: Annotated[int, IOAttrs('c')]
110
111    entries: Annotated[list[LogEntry], IOAttrs('e')]
112
113
114class LogHandler(logging.Handler):
115    """Fancy-pants handler for logging output.
116
117    Writes logs to disk in structured json format and echoes them
118    to stdout/stderr with pretty colors.
119    """
120
121    _event_loop: asyncio.AbstractEventLoop
122
123    # IMPORTANT: Any debug prints we do here should ONLY go to echofile.
124    # Otherwise we can get infinite loops as those prints come back to us
125    # as new log entries.
126
127    def __init__(
128        self,
129        path: str | Path | None,
130        echofile: TextIO | None,
131        suppress_non_root_debug: bool,
132        cache_size_limit: int,
133        cache_time_limit: datetime.timedelta | None,
134    ):
135        super().__init__()
136        # pylint: disable=consider-using-with
137        self._file = None if path is None else open(path, 'w', encoding='utf-8')
138        self._echofile = echofile
139        self._callbacks: list[Callable[[LogEntry], None]] = []
140        self._suppress_non_root_debug = suppress_non_root_debug
141        self._file_chunks: dict[str, list[str]] = {'stdout': [], 'stderr': []}
142        self._file_chunk_ship_task: dict[str, asyncio.Task | None] = {
143            'stdout': None,
144            'stderr': None,
145        }
146        self._cache_size = 0
147        assert cache_size_limit >= 0
148        self._cache_size_limit = cache_size_limit
149        self._cache_time_limit = cache_time_limit
150        self._cache = deque[tuple[int, LogEntry]]()
151        self._cache_index_offset = 0
152        self._cache_lock = Lock()
153        self._printed_callback_error = False
154        self._thread_bootstrapped = False
155        self._thread = Thread(target=self._log_thread_main, daemon=True)
156        if __debug__:
157            self._last_slow_emit_warning_time: float | None = None
158        self._thread.start()
159
160        # Spin until our thread is up and running; otherwise we could
161        # wind up trying to push stuff to our event loop before the
162        # loop exists.
163        while not self._thread_bootstrapped:
164            time.sleep(0.001)
165
166    def add_callback(
167        self, call: Callable[[LogEntry], None], feed_existing_logs: bool = False
168    ) -> None:
169        """Add a callback to be run for each LogEntry.
170
171        Note that this callback will always run in a background thread.
172        Passing True for feed_existing_logs will cause all cached logs
173        in the handler to be fed to the callback (still in the
174        background thread though).
175        """
176
177        # Kick this over to our bg thread to add the callback and
178        # process cached entries at the same time to ensure there are no
179        # race conditions that could cause entries to be skipped/etc.
180        self._event_loop.call_soon_threadsafe(
181            tpartial(self._add_callback_in_thread, call, feed_existing_logs)
182        )
183
184    def _add_callback_in_thread(
185        self, call: Callable[[LogEntry], None], feed_existing_logs: bool
186    ) -> None:
187        """Add a callback to be run for each LogEntry.
188
189        Note that this callback will always run in a background thread.
190        Passing True for feed_existing_logs will cause all cached logs
191        in the handler to be fed to the callback (still in the
192        background thread though).
193        """
194        assert current_thread() is self._thread
195        self._callbacks.append(call)
196
197        # Run all of our cached entries through the new callback if desired.
198        if feed_existing_logs and self._cache_size_limit > 0:
199            with self._cache_lock:
200                for _id, entry in self._cache:
201                    self._run_callback_on_entry(call, entry)
202
203    def _log_thread_main(self) -> None:
204        self._event_loop = asyncio.new_event_loop()
205
206        # In our background thread event loop we do a fair amount of
207        # slow synchronous stuff such as mucking with the log cache.
208        # Let's avoid getting tons of warnings about this in debug mode.
209        self._event_loop.slow_callback_duration = 2.0  # Default is 0.1
210
211        # NOTE: if we ever use default threadpool at all we should allow
212        # setting it for our loop.
213        asyncio.set_event_loop(self._event_loop)
214        self._thread_bootstrapped = True
215        try:
216            if self._cache_time_limit is not None:
217                _prunetask = self._event_loop.create_task(
218                    self._time_prune_cache()
219                )
220            self._event_loop.run_forever()
221        except BaseException:
222            # If this ever goes down we're in trouble; we won't be able
223            # to log about it though. Try to make some noise however we
224            # can.
225            print('LogHandler died!!!', file=sys.stderr)
226            import traceback
227
228            traceback.print_exc()
229            raise
230
231    async def _time_prune_cache(self) -> None:
232        assert self._cache_time_limit is not None
233        while True:
234            await asyncio.sleep(61.27)
235            now = utc_now()
236            with self._cache_lock:
237                # Prune the oldest entry as long as there is a first one that
238                # is too old.
239                while (
240                    self._cache
241                    and (now - self._cache[0][1].time) >= self._cache_time_limit
242                ):
243                    popped = self._cache.popleft()
244                    self._cache_size -= popped[0]
245                    self._cache_index_offset += 1
246
247    def get_cached(
248        self, start_index: int = 0, max_entries: int | None = None
249    ) -> LogArchive:
250        """Build and return an archive of cached log entries.
251
252        This will only include entries that have been processed by the
253        background thread, so may not include just-submitted logs or
254        entries for partially written stdout/stderr lines.
255        Entries from the range [start_index:start_index+max_entries]
256        which are still present in the cache will be returned.
257        """
258
259        assert start_index >= 0
260        if max_entries is not None:
261            assert max_entries >= 0
262        with self._cache_lock:
263            # Transform start_index to our present cache space.
264            start_index -= self._cache_index_offset
265            # Calc end-index in our present cache space.
266            end_index = (
267                len(self._cache)
268                if max_entries is None
269                else start_index + max_entries
270            )
271
272            # Clamp both indexes to both ends of our present space.
273            start_index = max(0, min(start_index, len(self._cache)))
274            end_index = max(0, min(end_index, len(self._cache)))
275
276            return LogArchive(
277                log_size=self._cache_index_offset + len(self._cache),
278                start_index=start_index + self._cache_index_offset,
279                entries=self._cache_slice(start_index, end_index),
280            )
281
282    def _cache_slice(
283        self, start: int, end: int, step: int = 1
284    ) -> list[LogEntry]:
285        # Deque doesn't natively support slicing but we can do it manually.
286        # It sounds like rotating the deque and pulling from the beginning
287        # is the most efficient way to do this. The downside is the deque
288        # gets temporarily modified in the process so we need to make sure
289        # we're holding the lock.
290        assert self._cache_lock.locked()
291        cache = self._cache
292        cache.rotate(-start)
293        slc = [e[1] for e in itertools.islice(cache, 0, end - start, step)]
294        cache.rotate(start)
295        return slc
296
297    @classmethod
298    def _is_immutable_log_data(cls, data: Any) -> bool:
299        if isinstance(data, (str, bool, int, float, bytes)):
300            return True
301        if isinstance(data, tuple):
302            return all(cls._is_immutable_log_data(x) for x in data)
303        return False
304
305    def call_in_thread(self, call: Callable[[], Any]) -> None:
306        """Submit a call to be run in the logging background thread."""
307        self._event_loop.call_soon_threadsafe(call)
308
309    @override
310    def emit(self, record: logging.LogRecord) -> None:
311        # pylint: disable=too-many-branches
312        if __debug__:
313            starttime = time.monotonic()
314
315        # Called by logging to send us records.
316
317        # TODO - kill this.
318        if (
319            self._suppress_non_root_debug
320            and record.name != 'root'
321            and record.levelname == 'DEBUG'
322        ):
323            return
324
325        # Optimization: if our log args are all simple immutable values,
326        # we can just kick the whole thing over to our background thread to
327        # be formatted there at our leisure. If anything is mutable and
328        # thus could possibly change between now and then or if we want
329        # to do immediate file echoing then we need to bite the bullet
330        # and do that stuff here at the call site.
331        fast_path = self._echofile is None and self._is_immutable_log_data(
332            record.args
333        )
334
335        # Note: just assuming types are correct here, but they'll be
336        # checked properly when the resulting LogEntry gets exported.
337        labels: dict[str, str] | None = getattr(record, 'labels', None)
338        if labels is None:
339            labels = {}
340
341        if fast_path:
342            if __debug__:
343                formattime = echotime = time.monotonic()
344            self._event_loop.call_soon_threadsafe(
345                tpartial(
346                    self._emit_in_thread,
347                    record.name,
348                    record.levelno,
349                    record.created,
350                    record,
351                    labels,
352                )
353            )
354        else:
355            # Slow case; do formatting and echoing here at the log call
356            # site.
357            msg = self.format(record)
358
359            if __debug__:
360                formattime = time.monotonic()
361
362            # Also immediately print pretty colored output to our echo file
363            # (generally stderr). We do this part here instead of in our bg
364            # thread because the delay can throw off command line prompts or
365            # make tight debugging harder.
366            if self._echofile is not None:
367                ends = LEVELNO_COLOR_CODES.get(record.levelno)
368                namepre = f'{Clr.WHT}{record.name}:{Clr.RST} '
369                if ends is not None:
370                    self._echofile.write(f'{namepre}{ends[0]}{msg}{ends[1]}\n')
371                else:
372                    self._echofile.write(f'{namepre}{msg}\n')
373                self._echofile.flush()
374
375            if __debug__:
376                echotime = time.monotonic()
377
378            self._event_loop.call_soon_threadsafe(
379                tpartial(
380                    self._emit_in_thread,
381                    record.name,
382                    record.levelno,
383                    record.created,
384                    msg,
385                    labels,
386                )
387            )
388
389        if __debug__:
390            # Make noise if we're taking a significant amount of time here.
391            # Limit the noise to once every so often though; otherwise we
392            # could get a feedback loop where every log emit results in a
393            # warning log which results in another, etc.
394            now = time.monotonic()
395            # noinspection PyUnboundLocalVariable
396            duration = now - starttime  # pyright: ignore
397            # noinspection PyUnboundLocalVariable
398            format_duration = formattime - starttime  # pyright: ignore
399            # noinspection PyUnboundLocalVariable
400            echo_duration = echotime - formattime  # pyright: ignore
401            if duration > 0.05 and (
402                self._last_slow_emit_warning_time is None
403                or now > self._last_slow_emit_warning_time + 10.0
404            ):
405                # Logging calls from *within* a logging handler
406                # sounds sketchy, so let's just kick this over to
407                # the bg event loop thread we've already got.
408                self._last_slow_emit_warning_time = now
409                self._event_loop.call_soon_threadsafe(
410                    tpartial(
411                        logging.warning,
412                        'efro.log.LogHandler emit took too long'
413                        ' (%.2fs total; %.2fs format, %.2fs echo,'
414                        ' fast_path=%s).',
415                        duration,
416                        format_duration,
417                        echo_duration,
418                        fast_path,
419                    )
420                )
421
422    def _emit_in_thread(
423        self,
424        name: str,
425        levelno: int,
426        created: float,
427        message: str | logging.LogRecord,
428        labels: dict[str, str],
429    ) -> None:
430        try:
431            # If they passed a raw record here, bake it down to a string.
432            if isinstance(message, logging.LogRecord):
433                message = self.format(message)
434
435            self._emit_entry(
436                LogEntry(
437                    name=name,
438                    message=message,
439                    level=LEVELNO_LOG_LEVELS.get(levelno, LogLevel.INFO),
440                    time=datetime.datetime.fromtimestamp(
441                        created, datetime.timezone.utc
442                    ),
443                    labels=labels,
444                )
445            )
446        except Exception:
447            import traceback
448
449            traceback.print_exc(file=self._echofile)
450
451    def file_write(self, name: str, output: str) -> None:
452        """Send raw stdout/stderr output to the logger to be collated."""
453
454        # Note to self: it turns out that things like '^^^^^^^^^^^^^^'
455        # lines in stack traces get written as lots of individual '^'
456        # writes. It feels a bit dirty to be pushing a deferred call to
457        # another thread for each character. Perhaps should do some sort
458        # of basic accumulation here?
459        self._event_loop.call_soon_threadsafe(
460            tpartial(self._file_write_in_thread, name, output)
461        )
462
463    def _file_write_in_thread(self, name: str, output: str) -> None:
464        try:
465            assert name in ('stdout', 'stderr')
466
467            # Here we try to be somewhat smart about breaking arbitrary
468            # print output into discrete log entries.
469
470            self._file_chunks[name].append(output)
471
472            # Individual parts of a print come across as separate writes,
473            # and the end of a print will be a standalone '\n' by default.
474            # Let's use that as a hint that we're likely at the end of
475            # a full print statement and ship what we've got.
476            if output == '\n':
477                self._ship_file_chunks(name, cancel_ship_task=True)
478            else:
479                # By default just keep adding chunks.
480                # However we keep a timer running anytime we've got
481                # unshipped chunks so that we can ship what we've got
482                # after a short bit if we never get a newline.
483                ship_task = self._file_chunk_ship_task[name]
484                if ship_task is None:
485                    self._file_chunk_ship_task[name] = (
486                        self._event_loop.create_task(
487                            self._ship_chunks_task(name),
488                            name='log ship file chunks',
489                        )
490                    )
491
492        except Exception:
493            import traceback
494
495            traceback.print_exc(file=self._echofile)
496
497    def file_flush(self, name: str) -> None:
498        """Send raw stdout/stderr flush to the logger to be collated."""
499
500        self._event_loop.call_soon_threadsafe(
501            tpartial(self._file_flush_in_thread, name)
502        )
503
504    def _file_flush_in_thread(self, name: str) -> None:
505        try:
506            assert name in ('stdout', 'stderr')
507
508            # Immediately ship whatever chunks we've got.
509            if self._file_chunks[name]:
510                self._ship_file_chunks(name, cancel_ship_task=True)
511
512        except Exception:
513            import traceback
514
515            traceback.print_exc(file=self._echofile)
516
517    async def _ship_chunks_task(self, name: str) -> None:
518        # Note: it's important we sleep here for a moment. Otherwise,
519        # things like '^^^^^^^^^^^^' lines in stack traces, which come
520        # through as lots of individual '^' writes, tend to get broken
521        # into lots of tiny little lines by us.
522        await asyncio.sleep(0.01)
523        self._ship_file_chunks(name, cancel_ship_task=False)
524
525    def _ship_file_chunks(self, name: str, cancel_ship_task: bool) -> None:
526        # Note: Raw print input generally ends in a newline, but that is
527        # redundant when we break things into log entries and results in
528        # extra empty lines. So strip off a single trailing newline if
529        # one is present.
530        text = ''.join(self._file_chunks[name]).removesuffix('\n')
531
532        self._emit_entry(
533            LogEntry(
534                name=name, message=text, level=LogLevel.INFO, time=utc_now()
535            )
536        )
537        self._file_chunks[name] = []
538        ship_task = self._file_chunk_ship_task[name]
539        if cancel_ship_task and ship_task is not None:
540            ship_task.cancel()
541        self._file_chunk_ship_task[name] = None
542
543    def _emit_entry(self, entry: LogEntry) -> None:
544        assert current_thread() is self._thread
545
546        # Store to our cache.
547        if self._cache_size_limit > 0:
548            with self._cache_lock:
549                # Do a rough calc of how many bytes this entry consumes.
550                entry_size = sum(
551                    sys.getsizeof(x)
552                    for x in (
553                        entry,
554                        entry.name,
555                        entry.message,
556                        entry.level,
557                        entry.time,
558                    )
559                )
560                self._cache.append((entry_size, entry))
561                self._cache_size += entry_size
562
563                # Prune old until we are back at or under our limit.
564                while self._cache_size > self._cache_size_limit:
565                    popped = self._cache.popleft()
566                    self._cache_size -= popped[0]
567                    self._cache_index_offset += 1
568
569        # Pass to callbacks.
570        for call in self._callbacks:
571            self._run_callback_on_entry(call, entry)
572
573        # Dump to our structured log file.
574        # TODO: should set a timer for flushing; don't flush every line.
575        if self._file is not None:
576            entry_s = dataclass_to_json(entry)
577            assert '\n' not in entry_s  # Make sure it's a single line.
578            print(entry_s, file=self._file, flush=True)
579
580    def _run_callback_on_entry(
581        self, callback: Callable[[LogEntry], None], entry: LogEntry
582    ) -> None:
583        """Run a callback and handle any errors."""
584        try:
585            callback(entry)
586        except Exception:
587            # Only print the first callback error to avoid insanity.
588            if not self._printed_callback_error:
589                import traceback
590
591                traceback.print_exc(file=self._echofile)
592                self._printed_callback_error = True
593
594
595class FileLogEcho:
596    """A file-like object for forwarding stdout/stderr to a LogHandler."""
597
598    def __init__(
599        self, original: TextIO, name: str, handler: LogHandler
600    ) -> None:
601        assert name in ('stdout', 'stderr')
602        self._original = original
603        self._name = name
604        self._handler = handler
605
606    def write(self, output: Any) -> None:
607        """Override standard write call."""
608        self._original.write(output)
609        self._handler.file_write(self._name, output)
610
611    def flush(self) -> None:
612        """Flush the file."""
613        self._original.flush()
614
615        # We also use this as a hint to ship whatever file chunks
616        # we've accumulated (we have to try and be smart about breaking
617        # our arbitrary file output into discrete entries).
618        self._handler.file_flush(self._name)
619
620    def isatty(self) -> bool:
621        """Are we a terminal?"""
622        return self._original.isatty()
623
624
625def setup_logging(
626    log_path: str | Path | None,
627    level: LogLevel,
628    suppress_non_root_debug: bool = False,
629    log_stdout_stderr: bool = False,
630    echo_to_stderr: bool = True,
631    cache_size_limit: int = 0,
632    cache_time_limit: datetime.timedelta | None = None,
633) -> LogHandler:
634    """Set up our logging environment.
635
636    Returns the custom handler which can be used to fetch information
637    about logs that have passed through it (worst log-levels, caches, etc.).
638    """
639
640    lmap = {
641        LogLevel.DEBUG: logging.DEBUG,
642        LogLevel.INFO: logging.INFO,
643        LogLevel.WARNING: logging.WARNING,
644        LogLevel.ERROR: logging.ERROR,
645        LogLevel.CRITICAL: logging.CRITICAL,
646    }
647
648    # Wire logger output to go to a structured log file.
649    # Also echo it to stderr IF we're running in a terminal.
650    # UPDATE: Actually gonna always go to stderr. Is there a
651    # reason we shouldn't? This makes debugging possible if all
652    # we have is access to a non-interactive terminal or file dump.
653    # We could add a '--quiet' arg or whatnot to change this behavior.
654
655    # Note: by passing in the *original* stderr here before we
656    # (potentially) replace it, we ensure that our log echos
657    # won't themselves be intercepted and sent to the logger
658    # which would create an infinite loop.
659    loghandler = LogHandler(
660        log_path,
661        echofile=sys.stderr if echo_to_stderr else None,
662        suppress_non_root_debug=suppress_non_root_debug,
663        cache_size_limit=cache_size_limit,
664        cache_time_limit=cache_time_limit,
665    )
666
667    # Note: going ahead with force=True here so that we replace any
668    # existing logger. Though we warn if it looks like we are doing
669    # that so we can try to avoid creating the first one.
670    had_previous_handlers = bool(logging.root.handlers)
671    logging.basicConfig(
672        level=lmap[level],
673        # format='%(name)s: %(message)s',
674        # We dump *only* the message here. We pass various log record bits
675        # around and format things fancier where they end up.
676        format='%(message)s',
677        handlers=[loghandler],
678        force=True,
679    )
680    if had_previous_handlers:
681        logging.warning(
682            'setup_logging: Replacing existing handlers.'
683            ' Something may have logged before expected.'
684        )
685
686    # Optionally intercept Python's stdout/stderr output and generate
687    # log entries from it.
688    if log_stdout_stderr:
689        sys.stdout = FileLogEcho(  # type: ignore
690            sys.stdout, 'stdout', loghandler
691        )
692        sys.stderr = FileLogEcho(  # type: ignore
693            sys.stderr, 'stderr', loghandler
694        )
695
696    return loghandler
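
A few usage sketches follow; file names, label values, and size/time limits are illustrative placeholders rather than values required by the module. First, a minimal setup based on the setup_logging() signature above:

    import datetime
    import logging

    from efro.log import LogLevel, setup_logging

    # 'app.log' and the cache limits are illustrative placeholders.
    handler = setup_logging(
        log_path='app.log',            # structured JSON log file (or None to skip)
        level=LogLevel.INFO,           # threshold applied to the root logger
        log_stdout_stderr=True,        # also turn print()/stderr output into entries
        echo_to_stderr=True,           # pretty colored echo to the original stderr
        cache_size_limit=1024 * 1024,  # keep roughly 1MB of recent entries in memory
        cache_time_limit=datetime.timedelta(minutes=5),
    )

    logging.info('logging is now wired up')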
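
As noted in the LogEntry comments, per-entry labels can be attached through the standard 'extra' argument on any logging call; the label keys and values below are made-up examples:

    import logging

    # The handler looks for a 'labels' dict in the standard 'extra' record data;
    # 'subsystem' and 'request' are arbitrary example labels.
    logging.warning(
        'slow response detected',
        extra={'labels': {'subsystem': 'netcode', 'request': 'abc123'}},
    )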
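
A sketch of consuming entries as they arrive and fetching the cached archive afterward; it assumes a handler created with a nonzero cache_size_limit as in the earlier sketch. Callbacks run in the handler's background thread, so this one just appends to a list:

    from efro.log import LogEntry

    collected: list[LogEntry] = []

    def on_entry(entry: LogEntry) -> None:
        # Runs in the logging background thread; keep it lightweight and avoid
        # logging/printing here so entries don't feed back into the handler.
        collected.append(entry)

    # 'handler' is the LogHandler returned by setup_logging() in the earlier sketch.
    # Also replay anything already sitting in the cache through the callback.
    handler.add_callback(on_entry, feed_existing_logs=True)

    # Later: fetch everything the background thread has processed so far.
    archive = handler.get_cached()
    print(f'{archive.log_size} total entries;'
          f' returned {len(archive.entries)} starting at {archive.start_index}.')

    # Subsequent calls can resume where the previous archive left off.
    next_archive = handler.get_cached(
        start_index=archive.start_index + len(archive.entries)
    )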
class LogHandler(logging.Handler):
115class LogHandler(logging.Handler):
116    """Fancy-pants handler for logging output.
117
118    Writes logs to disk in structured json format and echoes them
119    to stdout/stderr with pretty colors.
120    """
121
122    _event_loop: asyncio.AbstractEventLoop
123
124    # IMPORTANT: Any debug prints we do here should ONLY go to echofile.
125    # Otherwise we can get infinite loops as those prints come back to us
126    # as new log entries.
127
128    def __init__(
129        self,
130        path: str | Path | None,
131        echofile: TextIO | None,
132        suppress_non_root_debug: bool,
133        cache_size_limit: int,
134        cache_time_limit: datetime.timedelta | None,
135    ):
136        super().__init__()
137        # pylint: disable=consider-using-with
138        self._file = None if path is None else open(path, 'w', encoding='utf-8')
139        self._echofile = echofile
140        self._callbacks: list[Callable[[LogEntry], None]] = []
141        self._suppress_non_root_debug = suppress_non_root_debug
142        self._file_chunks: dict[str, list[str]] = {'stdout': [], 'stderr': []}
143        self._file_chunk_ship_task: dict[str, asyncio.Task | None] = {
144            'stdout': None,
145            'stderr': None,
146        }
147        self._cache_size = 0
148        assert cache_size_limit >= 0
149        self._cache_size_limit = cache_size_limit
150        self._cache_time_limit = cache_time_limit
151        self._cache = deque[tuple[int, LogEntry]]()
152        self._cache_index_offset = 0
153        self._cache_lock = Lock()
154        self._printed_callback_error = False
155        self._thread_bootstrapped = False
156        self._thread = Thread(target=self._log_thread_main, daemon=True)
157        if __debug__:
158            self._last_slow_emit_warning_time: float | None = None
159        self._thread.start()
160
161        # Spin until our thread is up and running; otherwise we could
162        # wind up trying to push stuff to our event loop before the
163        # loop exists.
164        while not self._thread_bootstrapped:
165            time.sleep(0.001)
166
167    def add_callback(
168        self, call: Callable[[LogEntry], None], feed_existing_logs: bool = False
169    ) -> None:
170        """Add a callback to be run for each LogEntry.
171
172        Note that this callback will always run in a background thread.
173        Passing True for feed_existing_logs will cause all cached logs
174        in the handler to be fed to the callback (still in the
175        background thread though).
176        """
177
178        # Kick this over to our bg thread to add the callback and
179        # process cached entries at the same time to ensure there are no
180        # race conditions that could cause entries to be skipped/etc.
181        self._event_loop.call_soon_threadsafe(
182            tpartial(self._add_callback_in_thread, call, feed_existing_logs)
183        )
184
185    def _add_callback_in_thread(
186        self, call: Callable[[LogEntry], None], feed_existing_logs: bool
187    ) -> None:
188        """Add a callback to be run for each LogEntry.
189
190        Note that this callback will always run in a background thread.
191        Passing True for feed_existing_logs will cause all cached logs
192        in the handler to be fed to the callback (still in the
193        background thread though).
194        """
195        assert current_thread() is self._thread
196        self._callbacks.append(call)
197
198        # Run all of our cached entries through the new callback if desired.
199        if feed_existing_logs and self._cache_size_limit > 0:
200            with self._cache_lock:
201                for _id, entry in self._cache:
202                    self._run_callback_on_entry(call, entry)
203
204    def _log_thread_main(self) -> None:
205        self._event_loop = asyncio.new_event_loop()
206
207        # In our background thread event loop we do a fair amount of
208        # slow synchronous stuff such as mucking with the log cache.
209        # Let's avoid getting tons of warnings about this in debug mode.
210        self._event_loop.slow_callback_duration = 2.0  # Default is 0.1
211
212        # NOTE: if we ever use default threadpool at all we should allow
213        # setting it for our loop.
214        asyncio.set_event_loop(self._event_loop)
215        self._thread_bootstrapped = True
216        try:
217            if self._cache_time_limit is not None:
218                _prunetask = self._event_loop.create_task(
219                    self._time_prune_cache()
220                )
221            self._event_loop.run_forever()
222        except BaseException:
223            # If this ever goes down we're in trouble; we won't be able
224            # to log about it though. Try to make some noise however we
225            # can.
226            print('LogHandler died!!!', file=sys.stderr)
227            import traceback
228
229            traceback.print_exc()
230            raise
231
232    async def _time_prune_cache(self) -> None:
233        assert self._cache_time_limit is not None
234        while bool(True):
235            await asyncio.sleep(61.27)
236            now = utc_now()
237            with self._cache_lock:
238                # Prune the oldest entry as long as there is a first one that
239                # is too old.
240                while (
241                    self._cache
242                    and (now - self._cache[0][1].time) >= self._cache_time_limit
243                ):
244                    popped = self._cache.popleft()
245                    self._cache_size -= popped[0]
246                    self._cache_index_offset += 1
247
248    def get_cached(
249        self, start_index: int = 0, max_entries: int | None = None
250    ) -> LogArchive:
251        """Build and return an archive of cached log entries.
252
253        This will only include entries that have been processed by the
254        background thread, so may not include just-submitted logs or
255        entries for partially written stdout/stderr lines.
256        Entries from the range [start_index:start_index+max_entries]
257        which are still present in the cache will be returned.
258        """
259
260        assert start_index >= 0
261        if max_entries is not None:
262            assert max_entries >= 0
263        with self._cache_lock:
264            # Transform start_index to our present cache space.
265            start_index -= self._cache_index_offset
266            # Calc end-index in our present cache space.
267            end_index = (
268                len(self._cache)
269                if max_entries is None
270                else start_index + max_entries
271            )
272
273            # Clamp both indexes to both ends of our present space.
274            start_index = max(0, min(start_index, len(self._cache)))
275            end_index = max(0, min(end_index, len(self._cache)))
276
277            return LogArchive(
278                log_size=self._cache_index_offset + len(self._cache),
279                start_index=start_index + self._cache_index_offset,
280                entries=self._cache_slice(start_index, end_index),
281            )
282
283    def _cache_slice(
284        self, start: int, end: int, step: int = 1
285    ) -> list[LogEntry]:
286        # Deque doesn't natively support slicing but we can do it manually.
287        # It sounds like rotating the deque and pulling from the beginning
288        # is the most efficient way to do this. The downside is the deque
289        # gets temporarily modified in the process so we need to make sure
290        # we're holding the lock.
291        assert self._cache_lock.locked()
292        cache = self._cache
293        cache.rotate(-start)
294        slc = [e[1] for e in itertools.islice(cache, 0, end - start, step)]
295        cache.rotate(start)
296        return slc
297
298    @classmethod
299    def _is_immutable_log_data(cls, data: Any) -> bool:
300        if isinstance(data, (str, bool, int, float, bytes)):
301            return True
302        if isinstance(data, tuple):
303            return all(cls._is_immutable_log_data(x) for x in data)
304        return False
305
306    def call_in_thread(self, call: Callable[[], Any]) -> None:
307        """Submit a call to be run in the logging background thread."""
308        self._event_loop.call_soon_threadsafe(call)
309
310    @override
311    def emit(self, record: logging.LogRecord) -> None:
312        # pylint: disable=too-many-branches
313        if __debug__:
314            starttime = time.monotonic()
315
316        # Called by logging to send us records.
317
318        # TODO - kill this.
319        if (
320            self._suppress_non_root_debug
321            and record.name != 'root'
322            and record.levelname == 'DEBUG'
323        ):
324            return
325
326        # Optimization: if our log args are all simple immutable values,
327        # we can just kick the whole thing over to our background thread to
328        # be formatted there at our leisure. If anything is mutable and
329        # thus could possibly change between now and then or if we want
330        # to do immediate file echoing then we need to bite the bullet
331        # and do that stuff here at the call site.
332        fast_path = self._echofile is None and self._is_immutable_log_data(
333            record.args
334        )
335
336        # Note: just assuming types are correct here, but they'll be
337        # checked properly when the resulting LogEntry gets exported.
338        labels: dict[str, str] | None = getattr(record, 'labels', None)
339        if labels is None:
340            labels = {}
341
342        if fast_path:
343            if __debug__:
344                formattime = echotime = time.monotonic()
345            self._event_loop.call_soon_threadsafe(
346                tpartial(
347                    self._emit_in_thread,
348                    record.name,
349                    record.levelno,
350                    record.created,
351                    record,
352                    labels,
353                )
354            )
355        else:
356            # Slow case; do formatting and echoing here at the log call
357            # site.
358            msg = self.format(record)
359
360            if __debug__:
361                formattime = time.monotonic()
362
363            # Also immediately print pretty colored output to our echo file
364            # (generally stderr). We do this part here instead of in our bg
365            # thread because the delay can throw off command line prompts or
366            # make tight debugging harder.
367            if self._echofile is not None:
368                ends = LEVELNO_COLOR_CODES.get(record.levelno)
369                namepre = f'{Clr.WHT}{record.name}:{Clr.RST} '
370                if ends is not None:
371                    self._echofile.write(f'{namepre}{ends[0]}{msg}{ends[1]}\n')
372                else:
373                    self._echofile.write(f'{namepre}{msg}\n')
374                self._echofile.flush()
375
376            if __debug__:
377                echotime = time.monotonic()
378
379            self._event_loop.call_soon_threadsafe(
380                tpartial(
381                    self._emit_in_thread,
382                    record.name,
383                    record.levelno,
384                    record.created,
385                    msg,
386                    labels,
387                )
388            )
389
390        if __debug__:
391            # Make noise if we're taking a significant amount of time here.
392            # Limit the noise to once every so often though; otherwise we
393            # could get a feedback loop where every log emit results in a
394            # warning log which results in another, etc.
395            now = time.monotonic()
396            # noinspection PyUnboundLocalVariable
397            duration = now - starttime  # pyright: ignore
398            # noinspection PyUnboundLocalVariable
399            format_duration = formattime - starttime  # pyright: ignore
400            # noinspection PyUnboundLocalVariable
401            echo_duration = echotime - formattime  # pyright: ignore
402            if duration > 0.05 and (
403                self._last_slow_emit_warning_time is None
404                or now > self._last_slow_emit_warning_time + 10.0
405            ):
406                # Logging calls from *within* a logging handler
407                # sounds sketchy, so let's just kick this over to
408                # the bg event loop thread we've already got.
409                self._last_slow_emit_warning_time = now
410                self._event_loop.call_soon_threadsafe(
411                    tpartial(
412                        logging.warning,
413                        'efro.log.LogHandler emit took too long'
414                        ' (%.2fs total; %.2fs format, %.2fs echo,'
415                        ' fast_path=%s).',
416                        duration,
417                        format_duration,
418                        echo_duration,
419                        fast_path,
420                    )
421                )
422
423    def _emit_in_thread(
424        self,
425        name: str,
426        levelno: int,
427        created: float,
428        message: str | logging.LogRecord,
429        labels: dict[str, str],
430    ) -> None:
431        try:
432            # If they passed a raw record here, bake it down to a string.
433            if isinstance(message, logging.LogRecord):
434                message = self.format(message)
435
436            self._emit_entry(
437                LogEntry(
438                    name=name,
439                    message=message,
440                    level=LEVELNO_LOG_LEVELS.get(levelno, LogLevel.INFO),
441                    time=datetime.datetime.fromtimestamp(
442                        created, datetime.timezone.utc
443                    ),
444                    labels=labels,
445                )
446            )
447        except Exception:
448            import traceback
449
450            traceback.print_exc(file=self._echofile)
451
452    def file_write(self, name: str, output: str) -> None:
453        """Send raw stdout/stderr output to the logger to be collated."""
454
455        # Note to self: it turns out that things like '^^^^^^^^^^^^^^'
456        # lines in stack traces get written as lots of individual '^'
457        # writes. It feels a bit dirty to be pushing a deferred call to
458        # another thread for each character. Perhaps should do some sort
459        # of basic accumulation here?
460        self._event_loop.call_soon_threadsafe(
461            tpartial(self._file_write_in_thread, name, output)
462        )
463
464    def _file_write_in_thread(self, name: str, output: str) -> None:
465        try:
466            assert name in ('stdout', 'stderr')
467
468            # Here we try to be somewhat smart about breaking arbitrary
469            # print output into discrete log entries.
470
471            self._file_chunks[name].append(output)
472
473            # Individual parts of a print come across as separate writes,
474            # and the end of a print will be a standalone '\n' by default.
475            # Let's use that as a hint that we're likely at the end of
476            # a full print statement and ship what we've got.
477            if output == '\n':
478                self._ship_file_chunks(name, cancel_ship_task=True)
479            else:
480                # By default just keep adding chunks.
481                # However we keep a timer running anytime we've got
482                # unshipped chunks so that we can ship what we've got
483                # after a short bit if we never get a newline.
484                ship_task = self._file_chunk_ship_task[name]
485                if ship_task is None:
486                    self._file_chunk_ship_task[name] = (
487                        self._event_loop.create_task(
488                            self._ship_chunks_task(name),
489                            name='log ship file chunks',
490                        )
491                    )
492
493        except Exception:
494            import traceback
495
496            traceback.print_exc(file=self._echofile)
497
498    def file_flush(self, name: str) -> None:
499        """Send raw stdout/stderr flush to the logger to be collated."""
500
501        self._event_loop.call_soon_threadsafe(
502            tpartial(self._file_flush_in_thread, name)
503        )
504
505    def _file_flush_in_thread(self, name: str) -> None:
506        try:
507            assert name in ('stdout', 'stderr')
508
509            # Immediately ship whatever chunks we've got.
510            if self._file_chunks[name]:
511                self._ship_file_chunks(name, cancel_ship_task=True)
512
513        except Exception:
514            import traceback
515
516            traceback.print_exc(file=self._echofile)
517
518    async def _ship_chunks_task(self, name: str) -> None:
519        # Note: it's important we sleep here for a moment. Otherwise,
520        # things like '^^^^^^^^^^^^' lines in stack traces, which come
521        # through as lots of individual '^' writes, tend to get broken
522        # into lots of tiny little lines by us.
523        await asyncio.sleep(0.01)
524        self._ship_file_chunks(name, cancel_ship_task=False)
525
526    def _ship_file_chunks(self, name: str, cancel_ship_task: bool) -> None:
527        # Note: Raw print input generally ends in a newline, but that is
528        # redundant when we break things into log entries and results in
529        # extra empty lines. So strip off a single trailing newline if
530        # one is present.
531        text = ''.join(self._file_chunks[name]).removesuffix('\n')
532
533        self._emit_entry(
534            LogEntry(
535                name=name, message=text, level=LogLevel.INFO, time=utc_now()
536            )
537        )
538        self._file_chunks[name] = []
539        ship_task = self._file_chunk_ship_task[name]
540        if cancel_ship_task and ship_task is not None:
541            ship_task.cancel()
542        self._file_chunk_ship_task[name] = None
543
544    def _emit_entry(self, entry: LogEntry) -> None:
545        assert current_thread() is self._thread
546
547        # Store to our cache.
548        if self._cache_size_limit > 0:
549            with self._cache_lock:
550                # Do a rough calc of how many bytes this entry consumes.
551                entry_size = sum(
552                    sys.getsizeof(x)
553                    for x in (
554                        entry,
555                        entry.name,
556                        entry.message,
557                        entry.level,
558                        entry.time,
559                    )
560                )
561                self._cache.append((entry_size, entry))
562                self._cache_size += entry_size
563
564                # Prune old until we are back at or under our limit.
565                while self._cache_size > self._cache_size_limit:
566                    popped = self._cache.popleft()
567                    self._cache_size -= popped[0]
568                    self._cache_index_offset += 1
569
570        # Pass to callbacks.
571        for call in self._callbacks:
572            self._run_callback_on_entry(call, entry)
573
574        # Dump to our structured log file.
575        # TODO: should set a timer for flushing; don't flush every line.
576        if self._file is not None:
577            entry_s = dataclass_to_json(entry)
578            assert '\n' not in entry_s  # Make sure its a single line.
579            print(entry_s, file=self._file, flush=True)
580
581    def _run_callback_on_entry(
582        self, callback: Callable[[LogEntry], None], entry: LogEntry
583    ) -> None:
584        """Run a callback and handle any errors."""
585        try:
586            callback(entry)
587        except Exception:
588            # Only print the first callback error to avoid insanity.
589            if not self._printed_callback_error:
590                import traceback
591
592                traceback.print_exc(file=self._echofile)
593                self._printed_callback_error = True

Fancy-pants handler for logging output.

Writes logs to disk in structured json format and echoes them to stdout/stderr with pretty colors.

LogHandler( path: str | pathlib.Path | None, echofile: typing.TextIO | None, suppress_non_root_debug: bool, cache_size_limit: int, cache_time_limit: datetime.timedelta | None)
128    def __init__(
129        self,
130        path: str | Path | None,
131        echofile: TextIO | None,
132        suppress_non_root_debug: bool,
133        cache_size_limit: int,
134        cache_time_limit: datetime.timedelta | None,
135    ):
136        super().__init__()
137        # pylint: disable=consider-using-with
138        self._file = None if path is None else open(path, 'w', encoding='utf-8')
139        self._echofile = echofile
140        self._callbacks: list[Callable[[LogEntry], None]] = []
141        self._suppress_non_root_debug = suppress_non_root_debug
142        self._file_chunks: dict[str, list[str]] = {'stdout': [], 'stderr': []}
143        self._file_chunk_ship_task: dict[str, asyncio.Task | None] = {
144            'stdout': None,
145            'stderr': None,
146        }
147        self._cache_size = 0
148        assert cache_size_limit >= 0
149        self._cache_size_limit = cache_size_limit
150        self._cache_time_limit = cache_time_limit
151        self._cache = deque[tuple[int, LogEntry]]()
152        self._cache_index_offset = 0
153        self._cache_lock = Lock()
154        self._printed_callback_error = False
155        self._thread_bootstrapped = False
156        self._thread = Thread(target=self._log_thread_main, daemon=True)
157        if __debug__:
158            self._last_slow_emit_warning_time: float | None = None
159        self._thread.start()
160
161        # Spin until our thread is up and running; otherwise we could
162        # wind up trying to push stuff to our event loop before the
163        # loop exists.
164        while not self._thread_bootstrapped:
165            time.sleep(0.001)

Initializes the instance - basically setting the formatter to None and the filter list to empty.

def add_callback( self, call: Callable[[LogEntry], NoneType], feed_existing_logs: bool = False) -> None:
167    def add_callback(
168        self, call: Callable[[LogEntry], None], feed_existing_logs: bool = False
169    ) -> None:
170        """Add a callback to be run for each LogEntry.
171
172        Note that this callback will always run in a background thread.
173        Passing True for feed_existing_logs will cause all cached logs
174        in the handler to be fed to the callback (still in the
175        background thread though).
176        """
177
178        # Kick this over to our bg thread to add the callback and
179        # process cached entries at the same time to ensure there are no
180        # race conditions that could cause entries to be skipped/etc.
181        self._event_loop.call_soon_threadsafe(
182            tpartial(self._add_callback_in_thread, call, feed_existing_logs)
183        )

Add a callback to be run for each LogEntry.

Note that this callback will always run in a background thread. Passing True for feed_existing_logs will cause all cached logs in the handler to be fed to the callback (still in the background thread though).

def get_cached( self, start_index: int = 0, max_entries: int | None = None) -> LogArchive:
248    def get_cached(
249        self, start_index: int = 0, max_entries: int | None = None
250    ) -> LogArchive:
251        """Build and return an archive of cached log entries.
252
253        This will only include entries that have been processed by the
254        background thread, so may not include just-submitted logs or
255        entries for partially written stdout/stderr lines.
256        Entries from the range [start_index:start_index+max_entries]
257        which are still present in the cache will be returned.
258        """
259
260        assert start_index >= 0
261        if max_entries is not None:
262            assert max_entries >= 0
263        with self._cache_lock:
264            # Transform start_index to our present cache space.
265            start_index -= self._cache_index_offset
266            # Calc end-index in our present cache space.
267            end_index = (
268                len(self._cache)
269                if max_entries is None
270                else start_index + max_entries
271            )
272
273            # Clamp both indexes to both ends of our present space.
274            start_index = max(0, min(start_index, len(self._cache)))
275            end_index = max(0, min(end_index, len(self._cache)))
276
277            return LogArchive(
278                log_size=self._cache_index_offset + len(self._cache),
279                start_index=start_index + self._cache_index_offset,
280                entries=self._cache_slice(start_index, end_index),
281            )

Build and return an archive of cached log entries.

This will only include entries that have been processed by the background thread, so may not include just-submitted logs or entries for partially written stdout/stderr lines. Entries from the range [start_index:start_index+max_entries] which are still present in the cache will be returned.
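
A sketch of periodically draining the cache; it assumes 'handler' is a LogHandler created with a nonzero cache_size_limit:

    import time

    next_index = 0
    while True:
        archive = handler.get_cached(start_index=next_index)
        for entry in archive.entries:
            print(entry.time, entry.name, entry.message)
        # Resume after the newest entry we received; older entries may
        # already have been pruned from the cache.
        next_index = archive.start_index + len(archive.entries)
        time.sleep(5.0)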

def call_in_thread(self, call: Callable[[], Any]) -> None:
306    def call_in_thread(self, call: Callable[[], Any]) -> None:
307        """Submit a call to be run in the logging background thread."""
308        self._event_loop.call_soon_threadsafe(call)

Submit a call to be run in the logging background thread.
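
For instance (a sketch; 'handler' is assumed to be a LogHandler):

    # Runs on the logging thread's event loop, not the calling thread.
    handler.call_in_thread(lambda: print('hello from the log thread'))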

@override
def emit(self, record: logging.LogRecord) -> None:
310    @override
311    def emit(self, record: logging.LogRecord) -> None:
312        # pylint: disable=too-many-branches
313        if __debug__:
314            starttime = time.monotonic()
315
316        # Called by logging to send us records.
317
318        # TODO - kill this.
319        if (
320            self._suppress_non_root_debug
321            and record.name != 'root'
322            and record.levelname == 'DEBUG'
323        ):
324            return
325
326        # Optimization: if our log args are all simple immutable values,
327        # we can just kick the whole thing over to our background thread to
328        # be formatted there at our leisure. If anything is mutable and
329        # thus could possibly change between now and then or if we want
330        # to do immediate file echoing then we need to bite the bullet
331        # and do that stuff here at the call site.
332        fast_path = self._echofile is None and self._is_immutable_log_data(
333            record.args
334        )
335
336        # Note: just assuming types are correct here, but they'll be
337        # checked properly when the resulting LogEntry gets exported.
338        labels: dict[str, str] | None = getattr(record, 'labels', None)
339        if labels is None:
340            labels = {}
341
342        if fast_path:
343            if __debug__:
344                formattime = echotime = time.monotonic()
345            self._event_loop.call_soon_threadsafe(
346                tpartial(
347                    self._emit_in_thread,
348                    record.name,
349                    record.levelno,
350                    record.created,
351                    record,
352                    labels,
353                )
354            )
355        else:
356            # Slow case; do formatting and echoing here at the log call
357            # site.
358            msg = self.format(record)
359
360            if __debug__:
361                formattime = time.monotonic()
362
363            # Also immediately print pretty colored output to our echo file
364            # (generally stderr). We do this part here instead of in our bg
365            # thread because the delay can throw off command line prompts or
366            # make tight debugging harder.
367            if self._echofile is not None:
368                ends = LEVELNO_COLOR_CODES.get(record.levelno)
369                namepre = f'{Clr.WHT}{record.name}:{Clr.RST} '
370                if ends is not None:
371                    self._echofile.write(f'{namepre}{ends[0]}{msg}{ends[1]}\n')
372                else:
373                    self._echofile.write(f'{namepre}{msg}\n')
374                self._echofile.flush()
375
376            if __debug__:
377                echotime = time.monotonic()
378
379            self._event_loop.call_soon_threadsafe(
380                tpartial(
381                    self._emit_in_thread,
382                    record.name,
383                    record.levelno,
384                    record.created,
385                    msg,
386                    labels,
387                )
388            )
389
390        if __debug__:
391            # Make noise if we're taking a significant amount of time here.
392            # Limit the noise to once every so often though; otherwise we
393            # could get a feedback loop where every log emit results in a
394            # warning log which results in another, etc.
395            now = time.monotonic()
396            # noinspection PyUnboundLocalVariable
397            duration = now - starttime  # pyright: ignore
398            # noinspection PyUnboundLocalVariable
399            format_duration = formattime - starttime  # pyright: ignore
400            # noinspection PyUnboundLocalVariable
401            echo_duration = echotime - formattime  # pyright: ignore
402            if duration > 0.05 and (
403                self._last_slow_emit_warning_time is None
404                or now > self._last_slow_emit_warning_time + 10.0
405            ):
406                # Logging calls from *within* a logging handler
407                # sounds sketchy, so let's just kick this over to
408                # the bg event loop thread we've already got.
409                self._last_slow_emit_warning_time = now
410                self._event_loop.call_soon_threadsafe(
411                    tpartial(
412                        logging.warning,
413                        'efro.log.LogHandler emit took too long'
414                        ' (%.2fs total; %.2fs format, %.2fs echo,'
415                        ' fast_path=%s).',
416                        duration,
417                        format_duration,
418                        echo_duration,
419                        fast_path,
420                    )
421                )

Do whatever it takes to actually log the specified logging record.

This override does any needed formatting and echo-file output immediately at the call site (so terminal output stays timely), then hands the record off to the background thread's event loop to be cached and fed to callbacks.
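
emit() is normally invoked by the logging machinery rather than called directly; the labels it reads come through the standard 'extra' argument. A sketch (logger name and label values are illustrative):

    import logging

    # LogHandler.emit() picks up a 'labels' dict attached to the record
    # via the standard 'extra' mechanism.
    logging.getLogger('server.net').warning(
        'connection dropped', extra={'labels': {'session': 'abc123'}}
    )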

def file_write(self, name: str, output: str) -> None:
452    def file_write(self, name: str, output: str) -> None:
453        """Send raw stdout/stderr output to the logger to be collated."""
454
455        # Note to self: it turns out that things like '^^^^^^^^^^^^^^'
456        # lines in stack traces get written as lots of individual '^'
457        # writes. It feels a bit dirty to be pushing a deferred call to
458        # another thread for each character. Perhaps should do some sort
459        # of basic accumulation here?
460        self._event_loop.call_soon_threadsafe(
461            tpartial(self._file_write_in_thread, name, output)
462        )

Send raw stdout/stderr output to the logger to be collated.

def file_flush(self, name: str) -> None:
498    def file_flush(self, name: str) -> None:
499        """Send raw stdout/stderr flush to the logger to be collated."""
500
501        self._event_loop.call_soon_threadsafe(
502            tpartial(self._file_flush_in_thread, name)
503        )

Send raw stdout/stderr flush to the logger to be collated.

Inherited Members
logging.Handler
level
formatter
get_name
set_name
name
createLock
acquire
release
setLevel
format
handle
setFormatter
flush
close
handleError
logging.Filterer
filters
addFilter
removeFilter
filter
class FileLogEcho:
596class FileLogEcho:
597    """A file-like object for forwarding stdout/stderr to a LogHandler."""
598
599    def __init__(
600        self, original: TextIO, name: str, handler: LogHandler
601    ) -> None:
602        assert name in ('stdout', 'stderr')
603        self._original = original
604        self._name = name
605        self._handler = handler
606
607    def write(self, output: Any) -> None:
608        """Override standard write call."""
609        self._original.write(output)
610        self._handler.file_write(self._name, output)
611
612    def flush(self) -> None:
613        """Flush the file."""
614        self._original.flush()
615
616        # We also use this as a hint to ship whatever file chunks
617        # we've accumulated (we have to try and be smart about breaking
618        # our arbitrary file output into discrete entries).
619        self._handler.file_flush(self._name)
620
621    def isatty(self) -> bool:
622        """Are we a terminal?"""
623        return self._original.isatty()

A file-like object for forwarding stdout/stderr to a LogHandler.
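
A sketch of wiring the redirect up by hand ('handler' is assumed to be an existing LogHandler; setup_logging() below does this automatically when log_stdout_stderr=True):

    import sys

    from efro.log import FileLogEcho

    # Writes still reach the real stdout; copies go to the handler.
    sys.stdout = FileLogEcho(sys.stdout, 'stdout', handler)  # type: ignore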

FileLogEcho(original: typing.TextIO, name: str, handler: LogHandler)
599    def __init__(
600        self, original: TextIO, name: str, handler: LogHandler
601    ) -> None:
602        assert name in ('stdout', 'stderr')
603        self._original = original
604        self._name = name
605        self._handler = handler
def write(self, output: Any) -> None:
607    def write(self, output: Any) -> None:
608        """Override standard write call."""
609        self._original.write(output)
610        self._handler.file_write(self._name, output)

Override standard write call.

def flush(self) -> None:
612    def flush(self) -> None:
613        """Flush the file."""
614        self._original.flush()
615
616        # We also use this as a hint to ship whatever file chunks
617        # we've accumulated (we have to try and be smart about breaking
618        # our arbitrary file output into discrete entries).
619        self._handler.file_flush(self._name)

Flush the file.

def isatty(self) -> bool:
621    def isatty(self) -> bool:
622        """Are we a terminal?"""
623        return self._original.isatty()

Are we a terminal?

def setup_logging( log_path: str | pathlib.Path | None, level: LogLevel, suppress_non_root_debug: bool = False, log_stdout_stderr: bool = False, echo_to_stderr: bool = True, cache_size_limit: int = 0, cache_time_limit: datetime.timedelta | None = None) -> LogHandler:
626def setup_logging(
627    log_path: str | Path | None,
628    level: LogLevel,
629    suppress_non_root_debug: bool = False,
630    log_stdout_stderr: bool = False,
631    echo_to_stderr: bool = True,
632    cache_size_limit: int = 0,
633    cache_time_limit: datetime.timedelta | None = None,
634) -> LogHandler:
635    """Set up our logging environment.
636
637    Returns the custom handler which can be used to fetch information
638    about logs that have passed through it (worst log levels, caches, etc.).
639    """
640
641    lmap = {
642        LogLevel.DEBUG: logging.DEBUG,
643        LogLevel.INFO: logging.INFO,
644        LogLevel.WARNING: logging.WARNING,
645        LogLevel.ERROR: logging.ERROR,
646        LogLevel.CRITICAL: logging.CRITICAL,
647    }
648
649    # Wire logger output to go to a structured log file.
650    # Also echo it to stderr IF we're running in a terminal.
651    # UPDATE: Actually gonna always go to stderr. Is there a
652    # reason we shouldn't? This makes debugging possible if all
653    # we have is access to a non-interactive terminal or file dump.
654    # We could add a '--quiet' arg or whatnot to change this behavior.
655
656    # Note: by passing in the *original* stderr here before we
657    # (potentially) replace it, we ensure that our log echos
658    # won't themselves be intercepted and sent to the logger
659    # which would create an infinite loop.
660    loghandler = LogHandler(
661        log_path,
662        echofile=sys.stderr if echo_to_stderr else None,
663        suppress_non_root_debug=suppress_non_root_debug,
664        cache_size_limit=cache_size_limit,
665        cache_time_limit=cache_time_limit,
666    )
667
668    # Note: going ahead with force=True here so that we replace any
669    # existing logger. Though we warn if it looks like we are doing
670    # that so we can try to avoid creating the first one.
671    had_previous_handlers = bool(logging.root.handlers)
672    logging.basicConfig(
673        level=lmap[level],
674        # format='%(name)s: %(message)s',
675        # We dump *only* the message here. We pass various log record bits
676        # around and format things fancier where they end up.
677        format='%(message)s',
678        handlers=[loghandler],
679        force=True,
680    )
681    if had_previous_handlers:
682        logging.warning(
683            'setup_logging: Replacing existing handlers.'
684            ' Something may have logged before expected.'
685        )
686
687    # Optionally intercept Python's stdout/stderr output and generate
688    # log entries from it.
689    if log_stdout_stderr:
690        sys.stdout = FileLogEcho(  # type: ignore
691            sys.stdout, 'stdout', loghandler
692        )
693        sys.stderr = FileLogEcho(  # type: ignore
694            sys.stderr, 'stderr', loghandler
695        )
696
697    return loghandler

Set up our logging environment.

Returns the custom handler, which can be used to fetch information about logs that have passed through it (worst log levels, caches, etc.).
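
A typical call, sketched (path, level, and limits are illustrative):

    import logging

    from efro.log import LogLevel, setup_logging

    handler = setup_logging(
        log_path='app.log',
        level=LogLevel.INFO,
        log_stdout_stderr=True,
        cache_size_limit=1024 * 1024,  # retain recent entries for get_cached()
    )
    logging.info('logging is up')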