efro.log

Logging functionality.
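
A minimal usage sketch (based on the setup_logging, LogLevel, and LogHandler
definitions in the source below; the cache size shown is an arbitrary
illustration value):

    import logging
    from efro.log import setup_logging, LogLevel

    # Install the custom handler on the root logger. The returned LogHandler
    # can be queried later for cached entries.
    handler = setup_logging(
        log_path=None,               # or a path; one JSON entry per line
        level=LogLevel.INFO,
        cache_size_limit=1_000_000,  # keep roughly 1 MB of recent entries
    )

    logging.info('hello from efro.log')

    # Entries are processed in a background thread, so a just-submitted log
    # may not show up in the cache immediately.
    archive = handler.get_cached()
    print(archive.log_size, len(archive.entries))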

  1# Released under the MIT License. See LICENSE for details.
  2#
  3"""Logging functionality."""
  4from __future__ import annotations
  5
  6import sys
  7import time
  8import asyncio
  9import logging
 10import datetime
 11import itertools
 12from enum import Enum
 13from collections import deque
 14from dataclasses import dataclass, field
 15from typing import TYPE_CHECKING, Annotated
 16from threading import Thread, current_thread, Lock
 17
 18from efro.util import utc_now
 19from efro.call import tpartial
 20from efro.terminal import Clr
 21from efro.dataclassio import ioprepped, IOAttrs, dataclass_to_json
 22
 23if TYPE_CHECKING:
 24    from pathlib import Path
 25    from typing import Any, Callable, TextIO
 26
 27
 28class LogLevel(Enum):
 29    """Severity level for a log entry.
 30
 31    These enums have numeric values so they can be compared by severity.
 32    Note that these values are not currently interchangeable with the
 33    logging.ERROR, logging.DEBUG, etc. values.
 34    """
 35
 36    DEBUG = 0
 37    INFO = 1
 38    WARNING = 2
 39    ERROR = 3
 40    CRITICAL = 4
 41
 42    @property
 43    def python_logging_level(self) -> int:
 44        """Return the corresponding Python logging level."""
 45        return LOG_LEVEL_LEVELNOS[self]
 46
 47    @classmethod
 48    def from_python_logging_level(cls, levelno: int) -> LogLevel:
 49        """Given a Python logging level, return a LogLevel."""
 50        return LEVELNO_LOG_LEVELS[levelno]
 51
 52
 53# Python logging levels from LogLevels
 54LOG_LEVEL_LEVELNOS = {
 55    LogLevel.DEBUG: logging.DEBUG,
 56    LogLevel.INFO: logging.INFO,
 57    LogLevel.WARNING: logging.WARNING,
 58    LogLevel.ERROR: logging.ERROR,
 59    LogLevel.CRITICAL: logging.CRITICAL,
 60}
 61
 62# LogLevels from Python logging levels
 63LEVELNO_LOG_LEVELS = {
 64    logging.DEBUG: LogLevel.DEBUG,
 65    logging.INFO: LogLevel.INFO,
 66    logging.WARNING: LogLevel.WARNING,
 67    logging.ERROR: LogLevel.ERROR,
 68    logging.CRITICAL: LogLevel.CRITICAL,
 69}
 70
 71LEVELNO_COLOR_CODES: dict[int, tuple[str, str]] = {
 72    logging.DEBUG: (Clr.CYN, Clr.RST),
 73    logging.INFO: ('', ''),
 74    logging.WARNING: (Clr.YLW, Clr.RST),
 75    logging.ERROR: (Clr.RED, Clr.RST),
 76    logging.CRITICAL: (Clr.SMAG + Clr.BLD + Clr.BLK, Clr.RST),
 77}
 78
 79
 80@ioprepped
 81@dataclass
 82class LogEntry:
 83    """Single logged message."""
 84
 85    name: Annotated[str, IOAttrs('n', soft_default='root', store_default=False)]
 86    message: Annotated[str, IOAttrs('m')]
 87    level: Annotated[LogLevel, IOAttrs('l')]
 88    time: Annotated[datetime.datetime, IOAttrs('t')]
 89
 90    # We support arbitrary string labels per log entry which can be
 91    # incorporated into custom log processing. To populate this, our
 92    # LogHandler class looks for a 'labels' dict passed in the optional
 93    # 'extra' dict arg to standard Python log calls.
 94    labels: Annotated[
 95        dict[str, str], IOAttrs('la', store_default=False)
 96    ] = field(default_factory=dict)
 97
 98
 99@ioprepped
100@dataclass
101class LogArchive:
102    """Info and data for a log."""
103
104    # Total number of entries submitted to the log.
105    log_size: Annotated[int, IOAttrs('t')]
106
107    # Offset for the entries contained here.
108    # (10 means our first entry is the 10th in the log, etc.)
109    start_index: Annotated[int, IOAttrs('c')]
110
111    entries: Annotated[list[LogEntry], IOAttrs('e')]
112
113
114class LogHandler(logging.Handler):
115    """Fancy-pants handler for logging output.
116
117    Writes logs to disk in structured JSON format and echoes them
118    to stdout/stderr with pretty colors.
119    """
120
121    _event_loop: asyncio.AbstractEventLoop
122
123    # IMPORTANT: Any debug prints we do here should ONLY go to echofile.
124    # Otherwise we can get infinite loops as those prints come back to us
125    # as new log entries.
126
127    def __init__(
128        self,
129        path: str | Path | None,
130        echofile: TextIO | None,
131        suppress_non_root_debug: bool,
132        cache_size_limit: int,
133        cache_time_limit: datetime.timedelta | None,
134    ):
135        super().__init__()
136        # pylint: disable=consider-using-with
137        self._file = None if path is None else open(path, 'w', encoding='utf-8')
138        self._echofile = echofile
139        self._callbacks: list[Callable[[LogEntry], None]] = []
140        self._suppress_non_root_debug = suppress_non_root_debug
141        self._file_chunks: dict[str, list[str]] = {'stdout': [], 'stderr': []}
142        self._file_chunk_ship_task: dict[str, asyncio.Task | None] = {
143            'stdout': None,
144            'stderr': None,
145        }
146        self._cache_size = 0
147        assert cache_size_limit >= 0
148        self._cache_size_limit = cache_size_limit
149        self._cache_time_limit = cache_time_limit
150        self._cache = deque[tuple[int, LogEntry]]()
151        self._cache_index_offset = 0
152        self._cache_lock = Lock()
153        self._printed_callback_error = False
154        self._thread_bootstrapped = False
155        self._thread = Thread(target=self._log_thread_main, daemon=True)
156        if __debug__:
157            self._last_slow_emit_warning_time: float | None = None
158        self._thread.start()
159
160        # Spin until our thread is up and running; otherwise we could
161        # wind up trying to push stuff to our event loop before the
162        # loop exists.
163        while not self._thread_bootstrapped:
164            time.sleep(0.001)
165
166    def add_callback(
167        self, call: Callable[[LogEntry], None], feed_existing_logs: bool = False
168    ) -> None:
169        """Add a callback to be run for each LogEntry.
170
171        Note that this callback will always run in a background thread.
172        Passing True for feed_existing_logs will cause all cached logs
173        in the handler to be fed to the callback (still in the
174        background thread though).
175        """
176
177        # Kick this over to our bg thread to add the callback and
178        # process cached entries at the same time to ensure there are no
179        # race conditions that could cause entries to be skipped/etc.
180        self._event_loop.call_soon_threadsafe(
181            tpartial(self._add_callback_in_thread, call, feed_existing_logs)
182        )
183
184    def _add_callback_in_thread(
185        self, call: Callable[[LogEntry], None], feed_existing_logs: bool
186    ) -> None:
187        """Add a callback to be run for each LogEntry.
188
189        Note that this callback will always run in a background thread.
190        Passing True for feed_existing_logs will cause all cached logs
191        in the handler to be fed to the callback (still in the
192        background thread though).
193        """
194        assert current_thread() is self._thread
195        self._callbacks.append(call)
196
197        # Run all of our cached entries through the new callback if desired.
198        if feed_existing_logs and self._cache_size_limit > 0:
199            with self._cache_lock:
200                for _id, entry in self._cache:
201                    self._run_callback_on_entry(call, entry)
202
203    def _log_thread_main(self) -> None:
204        self._event_loop = asyncio.new_event_loop()
205
206        # In our background thread event loop we do a fair amount of
207        # slow synchronous stuff such as mucking with the log cache.
208        # Let's avoid getting tons of warnings about this in debug mode.
209        self._event_loop.slow_callback_duration = 2.0  # Default is 0.1
210
211        # NOTE: if we ever use the default threadpool at all we should allow
212        # setting it for our loop.
213        asyncio.set_event_loop(self._event_loop)
214        self._thread_bootstrapped = True
215        try:
216            if self._cache_time_limit is not None:
217                _prunetask = self._event_loop.create_task(
218                    self._time_prune_cache()
219                )
220            self._event_loop.run_forever()
221        except BaseException:
222            # If this ever goes down we're in trouble; we won't be able
223            # to log about it though. Try to make some noise however we
224            # can.
225            print('LogHandler died!!!', file=sys.stderr)
226            import traceback
227
228            traceback.print_exc()
229            raise
230
231    async def _time_prune_cache(self) -> None:
232        assert self._cache_time_limit is not None
233        while True:
234            await asyncio.sleep(61.27)
235            now = utc_now()
236            with self._cache_lock:
237                # Keep pruning the oldest entry while it is older than
238                # our time limit.
239                while (
240                    self._cache
241                    and (now - self._cache[0][1].time) >= self._cache_time_limit
242                ):
243                    popped = self._cache.popleft()
244                    self._cache_size -= popped[0]
245                    self._cache_index_offset += 1
246
247    def get_cached(
248        self, start_index: int = 0, max_entries: int | None = None
249    ) -> LogArchive:
250        """Build and return an archive of cached log entries.
251
252        This will only include entries that have been processed by the
253        background thread, so may not include just-submitted logs or
254        entries for partially written stdout/stderr lines.
255        Entries from the range [start_index:start_index+max_entries]
256        which are still present in the cache will be returned.
257        """
258
259        assert start_index >= 0
260        if max_entries is not None:
261            assert max_entries >= 0
262        with self._cache_lock:
263            # Transform start_index to our present cache space.
264            start_index -= self._cache_index_offset
265            # Calc end-index in our present cache space.
266            end_index = (
267                len(self._cache)
268                if max_entries is None
269                else start_index + max_entries
270            )
271
272            # Clamp both indexes to both ends of our present space.
273            start_index = max(0, min(start_index, len(self._cache)))
274            end_index = max(0, min(end_index, len(self._cache)))
275
276            return LogArchive(
277                log_size=self._cache_index_offset + len(self._cache),
278                start_index=start_index + self._cache_index_offset,
279                entries=self._cache_slice(start_index, end_index),
280            )
281
282    def _cache_slice(
283        self, start: int, end: int, step: int = 1
284    ) -> list[LogEntry]:
285        # Deque doesn't natively support slicing but we can do it manually.
286        # It sounds like rotating the deque and pulling from the beginning
287        # is the most efficient way to do this. The downside is the deque
288        # gets temporarily modified in the process so we need to make sure
289        # we're holding the lock.
290        assert self._cache_lock.locked()
291        cache = self._cache
292        cache.rotate(-start)
293        slc = [e[1] for e in itertools.islice(cache, 0, end - start, step)]
294        cache.rotate(start)
295        return slc
296
297    @classmethod
298    def _is_immutable_log_data(cls, data: Any) -> bool:
299        if isinstance(data, (str, bool, int, float, bytes)):
300            return True
301        if isinstance(data, tuple):
302            return all(cls._is_immutable_log_data(x) for x in data)
303        return False
304
305    def emit(self, record: logging.LogRecord) -> None:
306        # pylint: disable=too-many-branches
307        if __debug__:
308            starttime = time.monotonic()
309
310        # Called by logging to send us records.
311
312        # TODO - kill this.
313        if (
314            self._suppress_non_root_debug
315            and record.name != 'root'
316            and record.levelname == 'DEBUG'
317        ):
318            return
319
320        # Optimization: if our log args are all simple immutable values,
321        # we can just kick the whole thing over to our background thread to
322        # be formatted there at our leisure. If anything is mutable and
323        # thus could possibly change between now and then or if we want
324        # to do immediate file echoing then we need to bite the bullet
325        # and do that stuff here at the call site.
326        fast_path = self._echofile is None and self._is_immutable_log_data(
327            record.args
328        )
329
330        # Note: just assuming types are correct here, but they'll be
331        # checked properly when the resulting LogEntry gets exported.
332        labels: dict[str, str] | None = getattr(record, 'labels', None)
333        if labels is None:
334            labels = {}
335
336        if fast_path:
337            if __debug__:
338                formattime = echotime = time.monotonic()
339            self._event_loop.call_soon_threadsafe(
340                tpartial(
341                    self._emit_in_thread,
342                    record.name,
343                    record.levelno,
344                    record.created,
345                    record,
346                    labels,
347                )
348            )
349        else:
350            # Slow case; do formatting and echoing here at the log call
351            # site.
352            msg = self.format(record)
353
354            if __debug__:
355                formattime = time.monotonic()
356
357            # Also immediately print pretty colored output to our echo file
358            # (generally stderr). We do this part here instead of in our bg
359            # thread because the delay can throw off command line prompts or
360            # make tight debugging harder.
361            if self._echofile is not None:
362                ends = LEVELNO_COLOR_CODES.get(record.levelno)
363                namepre = f'{Clr.WHT}{record.name}:{Clr.RST} '
364                if ends is not None:
365                    self._echofile.write(f'{namepre}{ends[0]}{msg}{ends[1]}\n')
366                else:
367                    self._echofile.write(f'{namepre}{msg}\n')
368                self._echofile.flush()
369
370            if __debug__:
371                echotime = time.monotonic()
372
373            self._event_loop.call_soon_threadsafe(
374                tpartial(
375                    self._emit_in_thread,
376                    record.name,
377                    record.levelno,
378                    record.created,
379                    msg,
380                    labels,
381                )
382            )
383
384        if __debug__:
385            # Make noise if we're taking a significant amount of time here.
386            # Limit the noise to once every so often though; otherwise we
387            # could get a feedback loop where every log emit results in a
388            # warning log which results in another, etc.
389            now = time.monotonic()
390            # noinspection PyUnboundLocalVariable
391            duration = now - starttime  # pyright: ignore
392            # noinspection PyUnboundLocalVariable
393            format_duration = formattime - starttime  # pyright: ignore
394            # noinspection PyUnboundLocalVariable
395            echo_duration = echotime - formattime  # pyright: ignore
396            if duration > 0.05 and (
397                self._last_slow_emit_warning_time is None
398                or now > self._last_slow_emit_warning_time + 10.0
399            ):
400                # Logging calls from *within* a logging handler
401                # sounds sketchy, so let's just kick this over to
402                # the bg event loop thread we've already got.
403                self._last_slow_emit_warning_time = now
404                self._event_loop.call_soon_threadsafe(
405                    tpartial(
406                        logging.warning,
407                        'efro.log.LogHandler emit took too long'
408                        ' (%.2fs total; %.2fs format, %.2fs echo,'
409                        ' fast_path=%s).',
410                        duration,
411                        format_duration,
412                        echo_duration,
413                        fast_path,
414                    )
415                )
416
417    def _emit_in_thread(
418        self,
419        name: str,
420        levelno: int,
421        created: float,
422        message: str | logging.LogRecord,
423        labels: dict[str, str],
424    ) -> None:
425        try:
426            # If they passed a raw record here, bake it down to a string.
427            if isinstance(message, logging.LogRecord):
428                message = self.format(message)
429
430            self._emit_entry(
431                LogEntry(
432                    name=name,
433                    message=message,
434                    level=LEVELNO_LOG_LEVELS.get(levelno, LogLevel.INFO),
435                    time=datetime.datetime.fromtimestamp(
436                        created, datetime.timezone.utc
437                    ),
438                    labels=labels,
439                )
440            )
441        except Exception:
442            import traceback
443
444            traceback.print_exc(file=self._echofile)
445
446    def file_write(self, name: str, output: str) -> None:
447        """Send raw stdout/stderr output to the logger to be collated."""
448
449        self._event_loop.call_soon_threadsafe(
450            tpartial(self._file_write_in_thread, name, output)
451        )
452
453    def _file_write_in_thread(self, name: str, output: str) -> None:
454        try:
455            assert name in ('stdout', 'stderr')
456
457            # Here we try to be somewhat smart about breaking arbitrary
458            # print output into discrete log entries.
459
460            self._file_chunks[name].append(output)
461
462            # Individual parts of a print come across as separate writes,
463            # and the end of a print will be a standalone '\n' by default.
464            # Let's use that as a hint that we're likely at the end of
465            # a full print statement and ship what we've got.
466            if output == '\n':
467                self._ship_file_chunks(name, cancel_ship_task=True)
468            else:
469                # By default just keep adding chunks.
470                # However we keep a timer running anytime we've got
471                # unshipped chunks so that we can ship what we've got
472                # after a short bit if we never get a newline.
473                ship_task = self._file_chunk_ship_task[name]
474                if ship_task is None:
475                    self._file_chunk_ship_task[
476                        name
477                    ] = self._event_loop.create_task(
478                        self._ship_chunks_task(name),
479                        name='log ship file chunks',
480                    )
481
482        except Exception:
483            import traceback
484
485            traceback.print_exc(file=self._echofile)
486
487    def file_flush(self, name: str) -> None:
488        """Send raw stdout/stderr flush to the logger to be collated."""
489
490        self._event_loop.call_soon_threadsafe(
491            tpartial(self._file_flush_in_thread, name)
492        )
493
494    def _file_flush_in_thread(self, name: str) -> None:
495        try:
496            assert name in ('stdout', 'stderr')
497
498            # Immediately ship whatever chunks we've got.
499            if self._file_chunks[name]:
500                self._ship_file_chunks(name, cancel_ship_task=True)
501
502        except Exception:
503            import traceback
504
505            traceback.print_exc(file=self._echofile)
506
507    async def _ship_chunks_task(self, name: str) -> None:
508        self._ship_file_chunks(name, cancel_ship_task=False)
509
510    def _ship_file_chunks(self, name: str, cancel_ship_task: bool) -> None:
511        # Note: Raw print input generally ends in a newline, but that is
512        # redundant when we break things into log entries and results
513        # in extra empty lines. So strip off a single trailing newline.
514        text = ''.join(self._file_chunks[name]).removesuffix('\n')
515
516        self._emit_entry(
517            LogEntry(
518                name=name, message=text, level=LogLevel.INFO, time=utc_now()
519            )
520        )
521        self._file_chunks[name] = []
522        ship_task = self._file_chunk_ship_task[name]
523        if cancel_ship_task and ship_task is not None:
524            ship_task.cancel()
525        self._file_chunk_ship_task[name] = None
526
527    def _emit_entry(self, entry: LogEntry) -> None:
528        assert current_thread() is self._thread
529
530        # Store to our cache.
531        if self._cache_size_limit > 0:
532            with self._cache_lock:
533                # Do a rough calc of how many bytes this entry consumes.
534                entry_size = sum(
535                    sys.getsizeof(x)
536                    for x in (
537                        entry,
538                        entry.name,
539                        entry.message,
540                        entry.level,
541                        entry.time,
542                    )
543                )
544                self._cache.append((entry_size, entry))
545                self._cache_size += entry_size
546
547                # Prune old until we are back at or under our limit.
548                while self._cache_size > self._cache_size_limit:
549                    popped = self._cache.popleft()
550                    self._cache_size -= popped[0]
551                    self._cache_index_offset += 1
552
553        # Pass to callbacks.
554        for call in self._callbacks:
555            self._run_callback_on_entry(call, entry)
556
557        # Dump to our structured log file.
558        # TODO: should set a timer for flushing; don't flush every line.
559        if self._file is not None:
560            entry_s = dataclass_to_json(entry)
561            assert '\n' not in entry_s  # Make sure it's a single line.
562            print(entry_s, file=self._file, flush=True)
563
564    def _run_callback_on_entry(
565        self, callback: Callable[[LogEntry], None], entry: LogEntry
566    ) -> None:
567        """Run a callback and handle any errors."""
568        try:
569            callback(entry)
570        except Exception:
571            # Only print the first callback error to avoid insanity.
572            if not self._printed_callback_error:
573                import traceback
574
575                traceback.print_exc(file=self._echofile)
576                self._printed_callback_error = True
577
578
579class FileLogEcho:
580    """A file-like object for forwarding stdout/stderr to a LogHandler."""
581
582    def __init__(
583        self, original: TextIO, name: str, handler: LogHandler
584    ) -> None:
585        assert name in ('stdout', 'stderr')
586        self._original = original
587        self._name = name
588        self._handler = handler
589
590    def write(self, output: Any) -> None:
591        """Override standard write call."""
592        self._original.write(output)
593        self._handler.file_write(self._name, output)
594
595    def flush(self) -> None:
596        """Flush the file."""
597        self._original.flush()
598
599        # We also use this as a hint to ship whatever file chunks
600        # we've accumulated (we have to try and be smart about breaking
601        # our arbitrary file output into discrete entries).
602        self._handler.file_flush(self._name)
603
604    def isatty(self) -> bool:
605        """Are we a terminal?"""
606        return self._original.isatty()
607
608
609def setup_logging(
610    log_path: str | Path | None,
611    level: LogLevel,
612    suppress_non_root_debug: bool = False,
613    log_stdout_stderr: bool = False,
614    echo_to_stderr: bool = True,
615    cache_size_limit: int = 0,
616    cache_time_limit: datetime.timedelta | None = None,
617) -> LogHandler:
618    """Set up our logging environment.
619
620    Returns the custom handler which can be used to fetch information
621    about logs that have passed through it (worst log levels, caches, etc.).
622    """
623
624    lmap = {
625        LogLevel.DEBUG: logging.DEBUG,
626        LogLevel.INFO: logging.INFO,
627        LogLevel.WARNING: logging.WARNING,
628        LogLevel.ERROR: logging.ERROR,
629        LogLevel.CRITICAL: logging.CRITICAL,
630    }
631
632    # Wire logger output to go to a structured log file.
633    # Also echo it to stderr IF we're running in a terminal.
634    # UPDATE: Actually gonna always go to stderr. Is there a
635    # reason we shouldn't? This makes debugging possible if all
636    # we have is access to a non-interactive terminal or file dump.
637    # We could add a '--quiet' arg or whatnot to change this behavior.
638
639    # Note: by passing in the *original* stderr here before we
640    # (potentially) replace it, we ensure that our log echos
641    # won't themselves be intercepted and sent to the logger
642    # which would create an infinite loop.
643    loghandler = LogHandler(
644        log_path,
645        echofile=sys.stderr if echo_to_stderr else None,
646        suppress_non_root_debug=suppress_non_root_debug,
647        cache_size_limit=cache_size_limit,
648        cache_time_limit=cache_time_limit,
649    )
650
651    # Note: going ahead with force=True here so that we replace any
652    # existing handlers. Though we warn if it looks like we are doing
653    # that, so we can try to avoid the earlier ones being created at all.
654    had_previous_handlers = bool(logging.root.handlers)
655    logging.basicConfig(
656        level=lmap[level],
657        # format='%(name)s: %(message)s',
658        # We dump *only* the message here. We pass various log record bits
659        # around and format things fancier where they end up.
660        format='%(message)s',
661        handlers=[loghandler],
662        force=True,
663    )
664    if had_previous_handlers:
665        logging.warning(
666            'setup_logging: Replacing existing handlers.'
667            ' Something may have logged before expected.'
668        )
669
670    # Optionally intercept Python's stdout/stderr output and generate
671    # log entries from it.
672    if log_stdout_stderr:
673        sys.stdout = FileLogEcho(  # type: ignore
674            sys.stdout, 'stdout', loghandler
675        )
676        sys.stderr = FileLogEcho(  # type: ignore
677            sys.stderr, 'stderr', loghandler
678        )
679
680    return loghandler
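
Callbacks and per-entry labels (a sketch assuming the same public API shown
above; the 'labels' dict is picked up from the standard 'extra' argument to
log calls, as described in the LogEntry comments):

    import logging
    from efro.log import setup_logging, LogLevel, LogEntry

    handler = setup_logging(None, LogLevel.DEBUG, cache_size_limit=100_000)

    def on_entry(entry: LogEntry) -> None:
        # Callbacks always run in the handler's background thread.
        print(f'{entry.time} {entry.level.name} {entry.name}: '
              f'{entry.message} {entry.labels}')

    # feed_existing_logs=True also replays anything already cached.
    handler.add_callback(on_entry, feed_existing_logs=True)

    # Arbitrary string labels ride along via the standard 'extra' mechanism.
    logging.warning('disk low', extra={'labels': {'subsystem': 'storage'}})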
397            if duration > 0.05 and (
398                self._last_slow_emit_warning_time is None
399                or now > self._last_slow_emit_warning_time + 10.0
400            ):
401                # Logging calls from *within* a logging handler
402                # sounds sketchy, so let's just kick this over to
403                # the bg event loop thread we've already got.
404                self._last_slow_emit_warning_time = now
405                self._event_loop.call_soon_threadsafe(
406                    tpartial(
407                        logging.warning,
408                        'efro.log.LogHandler emit took too long'
409                        ' (%.2fs total; %.2fs format, %.2fs echo,'
410                        ' fast_path=%s).',
411                        duration,
412                        format_duration,
413                        echo_duration,
414                        fast_path,
415                    )
416                )

Called by the logging machinery for each record. If the record's args are immutable and no echo file is configured, formatting is deferred to the background thread (the fast path); otherwise the record is formatted and echoed with color codes at the call site before being handed off. In debug builds, a rate-limited warning is logged when an emit takes noticeably long.
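
One detail worth noting from the source above: the handler reads an optional 'labels' dict off each record, so standard log calls can attach labels via the 'extra' argument. A small sketch (the logger name and label values are illustrative):

    import logging

    logging.getLogger('myapp').warning(
        'cache miss for %s', 'user:42',
        extra={'labels': {'subsystem': 'cache'}},
    )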

def file_write(self, name: str, output: str) -> None:
447    def file_write(self, name: str, output: str) -> None:
448        """Send raw stdout/stderr output to the logger to be collated."""
449
450        self._event_loop.call_soon_threadsafe(
451            tpartial(self._file_write_in_thread, name, output)
452        )

Send raw stdout/stderr output to the logger to be collated.

def file_flush(self, name: str) -> None:
488    def file_flush(self, name: str) -> None:
489        """Send raw stdout/stderr flush to the logger to be collated."""
490
491        self._event_loop.call_soon_threadsafe(
492            tpartial(self._file_flush_in_thread, name)
493        )

Send raw stdout/stderr flush to the logger to be collated.

Inherited Members
    logging.Handler: level, formatter, get_name, set_name, name, createLock, acquire, release, setLevel, format, handle, setFormatter, flush, close, handleError
    logging.Filterer: filters, addFilter, removeFilter, filter
class FileLogEcho:
580class FileLogEcho:
581    """A file-like object for forwarding stdout/stderr to a LogHandler."""
582
583    def __init__(
584        self, original: TextIO, name: str, handler: LogHandler
585    ) -> None:
586        assert name in ('stdout', 'stderr')
587        self._original = original
588        self._name = name
589        self._handler = handler
590
591    def write(self, output: Any) -> None:
592        """Override standard write call."""
593        self._original.write(output)
594        self._handler.file_write(self._name, output)
595
596    def flush(self) -> None:
597        """Flush the file."""
598        self._original.flush()
599
600        # We also use this as a hint to ship whatever file chunks
601        # we've accumulated (we have to try and be smart about breaking
602        # our arbitrary file output into discrete entries).
603        self._handler.file_flush(self._name)
604
605    def isatty(self) -> bool:
606        """Are we a terminal?"""
607        return self._original.isatty()

A file-like object for forwarding stdout/stderr to a LogHandler.

FileLogEcho(original: TextIO, name: str, handler: LogHandler)
583    def __init__(
584        self, original: TextIO, name: str, handler: LogHandler
585    ) -> None:
586        assert name in ('stdout', 'stderr')
587        self._original = original
588        self._name = name
589        self._handler = handler
def write(self, output: Any) -> None:
591    def write(self, output: Any) -> None:
592        """Override standard write call."""
593        self._original.write(output)
594        self._handler.file_write(self._name, output)

Override standard write call.

def flush(self) -> None:
596    def flush(self) -> None:
597        """Flush the file."""
598        self._original.flush()
599
600        # We also use this as a hint to ship whatever file chunks
601        # we've accumulated (we have to try and be smart about breaking
602        # our arbitrary file output into discrete entries).
603        self._handler.file_flush(self._name)

Flush the file.

def isatty(self) -> bool:
605    def isatty(self) -> bool:
606        """Are we a terminal?"""
607        return self._original.isatty()

Are we a terminal?
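
A hypothetical manual wiring of this class (setup_logging(log_stdout_stderr=True), shown below, does the equivalent automatically; handler is assumed to be an existing LogHandler):

    import sys

    # Pass the *original* stream so echoed log output is not captured again.
    sys.stdout = FileLogEcho(sys.stdout, 'stdout', handler)  # type: ignore
    print('this line is forwarded to the LogHandler as raw stdout output')
    sys.stdout.flush()  # hints the handler to ship accumulated chunks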

def setup_logging( log_path: str | pathlib.Path | None, level: LogLevel, suppress_non_root_debug: bool = False, log_stdout_stderr: bool = False, echo_to_stderr: bool = True, cache_size_limit: int = 0, cache_time_limit: datetime.timedelta | None = None) -> LogHandler:
610def setup_logging(
611    log_path: str | Path | None,
612    level: LogLevel,
613    suppress_non_root_debug: bool = False,
614    log_stdout_stderr: bool = False,
615    echo_to_stderr: bool = True,
616    cache_size_limit: int = 0,
617    cache_time_limit: datetime.timedelta | None = None,
618) -> LogHandler:
619    """Set up our logging environment.
620
621    Returns the custom handler which can be used to fetch information
622    about logs that have passed through it. (worst log-levels, caches, etc.).
623    """
624
625    lmap = {
626        LogLevel.DEBUG: logging.DEBUG,
627        LogLevel.INFO: logging.INFO,
628        LogLevel.WARNING: logging.WARNING,
629        LogLevel.ERROR: logging.ERROR,
630        LogLevel.CRITICAL: logging.CRITICAL,
631    }
632
633    # Wire logger output to go to a structured log file.
634    # Also echo it to stderr IF we're running in a terminal.
635    # UPDATE: Actually gonna always go to stderr. Is there a
636    # reason we shouldn't? This makes debugging possible if all
637    # we have is access to a non-interactive terminal or file dump.
638    # We could add a '--quiet' arg or whatnot to change this behavior.
639
640    # Note: by passing in the *original* stderr here before we
641    # (potentially) replace it, we ensure that our log echos
642    # won't themselves be intercepted and sent to the logger
643    # which would create an infinite loop.
644    loghandler = LogHandler(
645        log_path,
646        echofile=sys.stderr if echo_to_stderr else None,
647        suppress_non_root_debug=suppress_non_root_debug,
648        cache_size_limit=cache_size_limit,
649        cache_time_limit=cache_time_limit,
650    )
651
652    # Note: going ahead with force=True here so that we replace any
653    # existing logger. Though we warn if it looks like we are doing
654    # that so we can try to avoid creating the first one.
655    had_previous_handlers = bool(logging.root.handlers)
656    logging.basicConfig(
657        level=lmap[level],
658        # format='%(name)s: %(message)s',
659        # We dump *only* the message here. We pass various log record bits
660        # around and format things fancier where they end up.
661        format='%(message)s',
662        handlers=[loghandler],
663        force=True,
664    )
665    if had_previous_handlers:
666        logging.warning(
667            'setup_logging: Replacing existing handlers.'
668            ' Something may have logged before expected.'
669        )
670
671    # Optionally intercept Python's stdout/stderr output and generate
672    # log entries from it.
673    if log_stdout_stderr:
674        sys.stdout = FileLogEcho(  # type: ignore
675            sys.stdout, 'stdout', loghandler
676        )
677        sys.stderr = FileLogEcho(  # type: ignore
678            sys.stderr, 'stderr', loghandler
679        )
680
681    return loghandler

Set up our logging environment.

Returns the custom handler, which can be used to fetch information about logs that have passed through it (worst log-levels, caches, etc.).
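
A minimal setup sketch (the limits shown are purely illustrative):

    import logging
    import datetime
    from efro.log import setup_logging, LogLevel

    handler = setup_logging(
        log_path=None,                  # or a file path for structured output
        level=LogLevel.DEBUG,
        log_stdout_stderr=True,         # also capture raw stdout/stderr writes
        cache_size_limit=1_000_000,     # keep a bounded cache of recent entries
        cache_time_limit=datetime.timedelta(minutes=5),
    )
    logging.info('logging is wired up')
    print('stdout is captured too')

    # Later, inspect what has passed through the handler.
    archive = handler.get_cached(max_entries=10)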