efro.log

Logging functionality.
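
A minimal usage sketch (the level, cache limits, and label values here are
illustrative, not something the module prescribes):

    import datetime
    import logging

    from efro.log import LogLevel, setup_logging

    # Install the handler on the root logger; cache roughly the last 1MB
    # of entries and drop anything older than ten minutes.
    handler = setup_logging(
        log_path=None,  # Pass a path to also write structured JSON to disk.
        level=LogLevel.INFO,
        cache_size_limit=1024 * 1024,
        cache_time_limit=datetime.timedelta(minutes=10),
    )

    # Standard logging calls flow through the handler. An optional
    # 'labels' dict passed via 'extra' is attached to the resulting
    # LogEntry.
    logging.info('hello world', extra={'labels': {'source': 'example'}})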

  1# Released under the MIT License. See LICENSE for details.
  2#
  3"""Logging functionality."""
  4from __future__ import annotations
  5
  6import sys
  7import time
  8import asyncio
  9import logging
 10import datetime
 11import itertools
 12from enum import Enum
 13from functools import partial
 14from collections import deque
 15from dataclasses import dataclass, field
 16from typing import TYPE_CHECKING, Annotated, override
 17from threading import Thread, current_thread, Lock
 18
 19from efro.util import utc_now
 20from efro.terminal import Clr
 21from efro.dataclassio import ioprepped, IOAttrs, dataclass_to_json
 22
 23if TYPE_CHECKING:
 24    from pathlib import Path
 25    from typing import Any, Callable, TextIO
 26
 27
 28class LogLevel(Enum):
 29    """Severity level for a log entry.
 30
 31    These enums have numeric values so they can be compared by severity.
 32    Note that these values are not currently interchangeable with the
 33    logging.ERROR, logging.DEBUG, etc. values.
 34    """
 35
 36    DEBUG = 0
 37    INFO = 1
 38    WARNING = 2
 39    ERROR = 3
 40    CRITICAL = 4
 41
 42    @property
 43    def python_logging_level(self) -> int:
 44        """Return the corresponding Python logging level."""
 45        return LOG_LEVEL_LEVELNOS[self]
 46
 47    @classmethod
 48    def from_python_logging_level(cls, levelno: int) -> LogLevel:
 49        """Given a Python logging level, return a LogLevel."""
 50        return LEVELNO_LOG_LEVELS[levelno]
 51
 52
 53# Python logging levels from LogLevels
 54LOG_LEVEL_LEVELNOS = {
 55    LogLevel.DEBUG: logging.DEBUG,
 56    LogLevel.INFO: logging.INFO,
 57    LogLevel.WARNING: logging.WARNING,
 58    LogLevel.ERROR: logging.ERROR,
 59    LogLevel.CRITICAL: logging.CRITICAL,
 60}
 61
 62# LogLevels from Python logging levels
 63LEVELNO_LOG_LEVELS = {
 64    logging.DEBUG: LogLevel.DEBUG,
 65    logging.INFO: LogLevel.INFO,
 66    logging.WARNING: LogLevel.WARNING,
 67    logging.ERROR: LogLevel.ERROR,
 68    logging.CRITICAL: LogLevel.CRITICAL,
 69}
 70
 71LEVELNO_COLOR_CODES: dict[int, tuple[str, str]] = {
 72    logging.DEBUG: (Clr.CYN, Clr.RST),
 73    logging.INFO: ('', ''),
 74    logging.WARNING: (Clr.YLW, Clr.RST),
 75    logging.ERROR: (Clr.RED, Clr.RST),
 76    logging.CRITICAL: (Clr.SMAG + Clr.BLD + Clr.BLK, Clr.RST),
 77}
 78
 79
 80@ioprepped
 81@dataclass
 82class LogEntry:
 83    """Single logged message."""
 84
 85    name: Annotated[str, IOAttrs('n', soft_default='root', store_default=False)]
 86    message: Annotated[str, IOAttrs('m')]
 87    level: Annotated[LogLevel, IOAttrs('l')]
 88    time: Annotated[datetime.datetime, IOAttrs('t')]
 89
 90    # We support arbitrary string labels per log entry which can be
 91    # incorporated into custom log processing. To populate this, our
 92    # LogHandler class looks for a 'labels' dict passed in the optional
 93    # 'extra' dict arg to standard Python log calls.
 94    labels: Annotated[dict[str, str], IOAttrs('la', store_default=False)] = (
 95        field(default_factory=dict)
 96    )
 97
 98
 99@ioprepped
100@dataclass
101class LogArchive:
102    """Info and data for a log."""
103
104    # Total number of entries submitted to the log.
105    log_size: Annotated[int, IOAttrs('t')]
106
107    # Offset for the entries contained here.
108    # (10 means our first entry is the 10th in the log, etc.)
109    start_index: Annotated[int, IOAttrs('c')]
110
111    entries: Annotated[list[LogEntry], IOAttrs('e')]
112
113
114class LogHandler(logging.Handler):
115    """Fancy-pants handler for logging output.
116
117    Writes logs to disk in structured json format and echoes them
118    to stdout/stderr with pretty colors.
119    """
120
121    _event_loop: asyncio.AbstractEventLoop
122
123    # IMPORTANT: Any debug prints we do here should ONLY go to echofile.
124    # Otherwise we can get infinite loops as those prints come back to us
125    # as new log entries.
126
127    def __init__(
128        self,
129        path: str | Path | None,
130        echofile: TextIO | None,
131        suppress_non_root_debug: bool,
132        cache_size_limit: int,
133        cache_time_limit: datetime.timedelta | None,
134    ):
135        super().__init__()
136        # pylint: disable=consider-using-with
137        self._file = None if path is None else open(path, 'w', encoding='utf-8')
138        self._echofile = echofile
139        self._callbacks: list[Callable[[LogEntry], None]] = []
140        self._suppress_non_root_debug = suppress_non_root_debug
141        self._file_chunks: dict[str, list[str]] = {'stdout': [], 'stderr': []}
142        self._file_chunk_ship_task: dict[str, asyncio.Task | None] = {
143            'stdout': None,
144            'stderr': None,
145        }
146        self._cache_size = 0
147        assert cache_size_limit >= 0
148        self._cache_size_limit = cache_size_limit
149        self._cache_time_limit = cache_time_limit
150        self._cache = deque[tuple[int, LogEntry]]()
151        self._cache_index_offset = 0
152        self._cache_lock = Lock()
153        self._printed_callback_error = False
154        self._thread_bootstrapped = False
155        self._thread = Thread(target=self._log_thread_main, daemon=True)
156        if __debug__:
157            self._last_slow_emit_warning_time: float | None = None
158        self._thread.start()
159
160        # Spin until our thread is up and running; otherwise we could
161        # wind up trying to push stuff to our event loop before the
162        # loop exists.
163        while not self._thread_bootstrapped:
164            time.sleep(0.001)
165
166    def add_callback(
167        self, call: Callable[[LogEntry], None], feed_existing_logs: bool = False
168    ) -> None:
169        """Add a callback to be run for each LogEntry.
170
171        Note that this callback will always run in a background thread.
172        Passing True for feed_existing_logs will cause all cached logs
173        in the handler to be fed to the callback (still in the
174        background thread though).
175        """
176
177        # Kick this over to our bg thread to add the callback and
178        # process cached entries at the same time to ensure there are no
179        # race conditions that could cause entries to be skipped/etc.
180        self._event_loop.call_soon_threadsafe(
181            partial(self._add_callback_in_thread, call, feed_existing_logs)
182        )
183
184    def _add_callback_in_thread(
185        self, call: Callable[[LogEntry], None], feed_existing_logs: bool
186    ) -> None:
187        """Add a callback to be run for each LogEntry.
188
189        Note that this callback will always run in a background thread.
190        Passing True for feed_existing_logs will cause all cached logs
191        in the handler to be fed to the callback (still in the
192        background thread though).
193        """
194        assert current_thread() is self._thread
195        self._callbacks.append(call)
196
197        # Run all of our cached entries through the new callback if desired.
198        if feed_existing_logs and self._cache_size_limit > 0:
199            with self._cache_lock:
200                for _id, entry in self._cache:
201                    self._run_callback_on_entry(call, entry)
202
203    def _log_thread_main(self) -> None:
204        self._event_loop = asyncio.new_event_loop()
205
206        # In our background thread event loop we do a fair amount of
207        # slow synchronous stuff such as mucking with the log cache.
208        # Let's avoid getting tons of warnings about this in debug mode.
209        self._event_loop.slow_callback_duration = 2.0  # Default is 0.1
210
211        # NOTE: if we ever use the default threadpool at all, we should
212        # allow setting it for our loop.
213        asyncio.set_event_loop(self._event_loop)
214        self._thread_bootstrapped = True
215        try:
216            if self._cache_time_limit is not None:
217                _prunetask = self._event_loop.create_task(
218                    self._time_prune_cache()
219                )
220            self._event_loop.run_forever()
221        except BaseException:
222            # If this ever goes down we're in trouble; we won't be able
223            # to log about it though. Try to make some noise however we
224            # can.
225            print('LogHandler died!!!', file=sys.stderr)
226            import traceback
227
228            traceback.print_exc()
229            raise
230
231    async def _time_prune_cache(self) -> None:
232        assert self._cache_time_limit is not None
233        while True:
234            await asyncio.sleep(61.27)
235            now = utc_now()
236            with self._cache_lock:
237                # Keep pruning the oldest entry as long as the front of
238                # the cache has aged past our time limit.
239                while (
240                    self._cache
241                    and (now - self._cache[0][1].time) >= self._cache_time_limit
242                ):
243                    popped = self._cache.popleft()
244                    self._cache_size -= popped[0]
245                    self._cache_index_offset += 1
246
247    def get_cached(
248        self, start_index: int = 0, max_entries: int | None = None
249    ) -> LogArchive:
250        """Build and return an archive of cached log entries.
251
252        This will only include entries that have been processed by the
253        background thread, so may not include just-submitted logs or
254        entries for partially written stdout/stderr lines.
255        Entries from the range [start_index:start_index+max_entries]
256        which are still present in the cache will be returned.
257        """
258
259        assert start_index >= 0
260        if max_entries is not None:
261            assert max_entries >= 0
262        with self._cache_lock:
263            # Transform start_index to our present cache space.
264            start_index -= self._cache_index_offset
265            # Calc end-index in our present cache space.
266            end_index = (
267                len(self._cache)
268                if max_entries is None
269                else start_index + max_entries
270            )
271
272            # Clamp both indexes to both ends of our present space.
273            start_index = max(0, min(start_index, len(self._cache)))
274            end_index = max(0, min(end_index, len(self._cache)))
275
276            return LogArchive(
277                log_size=self._cache_index_offset + len(self._cache),
278                start_index=start_index + self._cache_index_offset,
279                entries=self._cache_slice(start_index, end_index),
280            )
281
282    def _cache_slice(
283        self, start: int, end: int, step: int = 1
284    ) -> list[LogEntry]:
285        # Deque doesn't natively support slicing but we can do it manually.
286        # It sounds like rotating the deque and pulling from the beginning
287        # is the most efficient way to do this. The downside is the deque
288        # gets temporarily modified in the process so we need to make sure
289        # we're holding the lock.
290        assert self._cache_lock.locked()
291        cache = self._cache
292        cache.rotate(-start)
293        slc = [e[1] for e in itertools.islice(cache, 0, end - start, step)]
294        cache.rotate(start)
295        return slc
296
297    @classmethod
298    def _is_immutable_log_data(cls, data: Any) -> bool:
299        if isinstance(data, (str, bool, int, float, bytes)):
300            return True
301        if isinstance(data, tuple):
302            return all(cls._is_immutable_log_data(x) for x in data)
303        return False
304
305    def call_in_thread(self, call: Callable[[], Any]) -> None:
306        """Submit a call to be run in the logging background thread."""
307        self._event_loop.call_soon_threadsafe(call)
308
309    @override
310    def emit(self, record: logging.LogRecord) -> None:
311        # pylint: disable=too-many-branches
312        if __debug__:
313            starttime = time.monotonic()
314
315        # Called by logging to send us records.
316
317        # TODO - kill this.
318        if (
319            self._suppress_non_root_debug
320            and record.name != 'root'
321            and record.levelname == 'DEBUG'
322        ):
323            return
324
325        # Optimization: if our log args are all simple immutable values,
326        # we can just kick the whole thing over to our background thread
327        # to be formatted there at our leisure. If anything is mutable,
328        # and thus could possibly change between now and then, or if we
329        # want to do immediate file echoing, then we need to bite the
330        # bullet and do that stuff here at the call site.
331        fast_path = self._echofile is None and self._is_immutable_log_data(
332            record.args
333        )
334
335        # Note: just assuming types are correct here, but they'll be
336        # checked properly when the resulting LogEntry gets exported.
337        labels: dict[str, str] | None = getattr(record, 'labels', None)
338        if labels is None:
339            labels = {}
340
341        if fast_path:
342            if __debug__:
343                formattime = echotime = time.monotonic()
344            self._event_loop.call_soon_threadsafe(
345                partial(
346                    self._emit_in_thread,
347                    record.name,
348                    record.levelno,
349                    record.created,
350                    record,
351                    labels,
352                )
353            )
354        else:
355            # Slow case; do formatting and echoing here at the log call
356            # site.
357            msg = self.format(record)
358
359            if __debug__:
360                formattime = time.monotonic()
361
362            # Also immediately print pretty colored output to our echo file
363            # (generally stderr). We do this part here instead of in our bg
364            # thread because the delay can throw off command line prompts or
365            # make tight debugging harder.
366            if self._echofile is not None:
367                # try:
368                # if self._report_blocking_io_on_echo_error:
369                #     premsg = (
370                #         'WARNING: BlockingIOError ON LOG ECHO OUTPUT;'
371                #         ' YOU ARE PROBABLY MISSING LOGS\n'
372                #     )
373                #     self._report_blocking_io_on_echo_error = False
374                # else:
375                #     premsg = ''
376                ends = LEVELNO_COLOR_CODES.get(record.levelno)
377                namepre = f'{Clr.WHT}{record.name}:{Clr.RST} '
378                if ends is not None:
379                    self._echofile.write(
380                        f'{namepre}{ends[0]}'
381                        f'{msg}{ends[1]}\n'
382                        # f'{namepre}{ends[0]}' f'{premsg}{msg}{ends[1]}\n'
383                    )
384                else:
385                    self._echofile.write(f'{namepre}{msg}\n')
386                self._echofile.flush()
387                # except BlockingIOError:
388                #     # Ran into this when doing a bunch of logging; assuming
389                #     # this is asyncio's doing?.. For now trying to survive
390                #     # the error but telling the user something is probably
391                #     # missing in their output.
392                #     self._report_blocking_io_on_echo_error = True
393
394            if __debug__:
395                echotime = time.monotonic()
396
397            self._event_loop.call_soon_threadsafe(
398                partial(
399                    self._emit_in_thread,
400                    record.name,
401                    record.levelno,
402                    record.created,
403                    msg,
404                    labels,
405                )
406            )
407
408        if __debug__:
409            # pylint: disable=used-before-assignment
410            # Make noise if we're taking a significant amount of time here.
411            # Limit the noise to once every so often though; otherwise we
412            # could get a feedback loop where every log emit results in a
413            # warning log which results in another, etc.
414            now = time.monotonic()
415            # noinspection PyUnboundLocalVariable
416            duration = now - starttime  # pyright: ignore
417            # noinspection PyUnboundLocalVariable
418            format_duration = formattime - starttime  # pyright: ignore
419            # noinspection PyUnboundLocalVariable
420            echo_duration = echotime - formattime  # pyright: ignore
421            if duration > 0.05 and (
422                self._last_slow_emit_warning_time is None
423                or now > self._last_slow_emit_warning_time + 10.0
424            ):
425                # Logging calls from *within* a logging handler
426                # sounds sketchy, so let's just kick this over to
427                # the bg event loop thread we've already got.
428                self._last_slow_emit_warning_time = now
429                self._event_loop.call_soon_threadsafe(
430                    partial(
431                        logging.warning,
432                        'efro.log.LogHandler emit took too long'
433                        ' (%.2fs total; %.2fs format, %.2fs echo,'
434                        ' fast_path=%s).',
435                        duration,
436                        format_duration,
437                        echo_duration,
438                        fast_path,
439                    )
440                )
441
442    def _emit_in_thread(
443        self,
444        name: str,
445        levelno: int,
446        created: float,
447        message: str | logging.LogRecord,
448        labels: dict[str, str],
449    ) -> None:
450        try:
451            # If they passed a raw record here, bake it down to a string.
452            if isinstance(message, logging.LogRecord):
453                message = self.format(message)
454
455            self._emit_entry(
456                LogEntry(
457                    name=name,
458                    message=message,
459                    level=LEVELNO_LOG_LEVELS.get(levelno, LogLevel.INFO),
460                    time=datetime.datetime.fromtimestamp(
461                        created, datetime.timezone.utc
462                    ),
463                    labels=labels,
464                )
465            )
466        except Exception:
467            import traceback
468
469            traceback.print_exc(file=self._echofile)
470
471    def file_write(self, name: str, output: str) -> None:
472        """Send raw stdout/stderr output to the logger to be collated."""
473
474        # Note to self: it turns out that things like '^^^^^^^^^^^^^^'
475        # lines in stack traces get written as lots of individual '^'
476        # writes. It feels a bit dirty to be pushing a deferred call to
477        # another thread for each character. Perhaps should do some sort
478        # of basic accumulation here?
479        self._event_loop.call_soon_threadsafe(
480            partial(self._file_write_in_thread, name, output)
481        )
482
483    def _file_write_in_thread(self, name: str, output: str) -> None:
484        try:
485            assert name in ('stdout', 'stderr')
486
487            # Here we try to be somewhat smart about breaking arbitrary
488            # print output into discrete log entries.
489
490            self._file_chunks[name].append(output)
491
492            # Individual parts of a print come across as separate writes,
493            # and the end of a print will be a standalone '\n' by default.
494            # Let's use that as a hint that we're likely at the end of
495            # a full print statement and ship what we've got.
496            if output == '\n':
497                self._ship_file_chunks(name, cancel_ship_task=True)
498            else:
499                # By default just keep adding chunks.
500                # However we keep a timer running anytime we've got
501                # unshipped chunks so that we can ship what we've got
502                # after a short bit if we never get a newline.
503                ship_task = self._file_chunk_ship_task[name]
504                if ship_task is None:
505                    self._file_chunk_ship_task[name] = (
506                        self._event_loop.create_task(
507                            self._ship_chunks_task(name),
508                            name='log ship file chunks',
509                        )
510                    )
511
512        except Exception:
513            import traceback
514
515            traceback.print_exc(file=self._echofile)
516
517    def shutdown(self) -> None:
518        """Block until all pending logs/prints are done."""
519        done = False
520        self.file_flush('stdout')
521        self.file_flush('stderr')
522
523        def _set_done() -> None:
524            nonlocal done
525            done = True
526
527        self._event_loop.call_soon_threadsafe(_set_done)
528
529        starttime = time.monotonic()
530        while not done:
531            if time.monotonic() - starttime > 5.0:
532                print('LogHandler shutdown hung!!!', file=sys.stderr)
533                break
534            time.sleep(0.01)
535
536    def file_flush(self, name: str) -> None:
537        """Send raw stdout/stderr flush to the logger to be collated."""
538
539        self._event_loop.call_soon_threadsafe(
540            partial(self._file_flush_in_thread, name)
541        )
542
543    def _file_flush_in_thread(self, name: str) -> None:
544        try:
545            assert name in ('stdout', 'stderr')
546
547            # Immediately ship whatever chunks we've got.
548            if self._file_chunks[name]:
549                self._ship_file_chunks(name, cancel_ship_task=True)
550
551        except Exception:
552            import traceback
553
554            traceback.print_exc(file=self._echofile)
555
556    async def _ship_chunks_task(self, name: str) -> None:
557        # Note: it's important we sleep here for a moment. Otherwise,
558        # things like '^^^^^^^^^^^^' lines in stack traces, which come
559        # through as lots of individual '^' writes, tend to get broken
560        # into lots of tiny little lines by us.
561        await asyncio.sleep(0.01)
562        self._ship_file_chunks(name, cancel_ship_task=False)
563
564    def _ship_file_chunks(self, name: str, cancel_ship_task: bool) -> None:
565        # Note: Raw print input generally ends in a newline, but that is
566        # redundant when we break things into log entries and results in
567        # extra empty lines. So strip off a single trailing newline if
568        # one is present.
569        text = ''.join(self._file_chunks[name]).removesuffix('\n')
570
571        self._emit_entry(
572            LogEntry(
573                name=name, message=text, level=LogLevel.INFO, time=utc_now()
574            )
575        )
576        self._file_chunks[name] = []
577        ship_task = self._file_chunk_ship_task[name]
578        if cancel_ship_task and ship_task is not None:
579            ship_task.cancel()
580        self._file_chunk_ship_task[name] = None
581
582    def _emit_entry(self, entry: LogEntry) -> None:
583        assert current_thread() is self._thread
584
585        # Store to our cache.
586        if self._cache_size_limit > 0:
587            with self._cache_lock:
588                # Do a rough calc of how many bytes this entry consumes.
589                entry_size = sum(
590                    sys.getsizeof(x)
591                    for x in (
592                        entry,
593                        entry.name,
594                        entry.message,
595                        entry.level,
596                        entry.time,
597                    )
598                )
599                self._cache.append((entry_size, entry))
600                self._cache_size += entry_size
601
602                # Prune old until we are back at or under our limit.
603                while self._cache_size > self._cache_size_limit:
604                    popped = self._cache.popleft()
605                    self._cache_size -= popped[0]
606                    self._cache_index_offset += 1
607
608        # Pass to callbacks.
609        for call in self._callbacks:
610            self._run_callback_on_entry(call, entry)
611
612        # Dump to our structured log file.
613        # TODO: should set a timer for flushing; don't flush every line.
614        if self._file is not None:
615            entry_s = dataclass_to_json(entry)
616            assert '\n' not in entry_s  # Make sure it's a single line.
617            print(entry_s, file=self._file, flush=True)
618
619    def _run_callback_on_entry(
620        self, callback: Callable[[LogEntry], None], entry: LogEntry
621    ) -> None:
622        """Run a callback and handle any errors."""
623        try:
624            callback(entry)
625        except Exception:
626            # Only print the first callback error to avoid insanity.
627            if not self._printed_callback_error:
628                import traceback
629
630                traceback.print_exc(file=self._echofile)
631                self._printed_callback_error = True
632
633
634class FileLogEcho:
635    """A file-like object for forwarding stdout/stderr to a LogHandler."""
636
637    def __init__(
638        self, original: TextIO, name: str, handler: LogHandler
639    ) -> None:
640        assert name in ('stdout', 'stderr')
641        self._original = original
642        self._name = name
643        self._handler = handler
644
645        # Think this was a result of setting non-blocking stdin somehow;
646        # probably not needed.
647        # self._report_blocking_io_error = False
648
649    def write(self, output: Any) -> None:
650        """Override standard write call."""
651        # try:
652        # if self._report_blocking_io_error:
653        #     self._report_blocking_io_error = False
654        #     self._original.write(
655        #         'WARNING: BlockingIOError ENCOUNTERED;'
656        #         ' OUTPUT IS PROBABLY MISSING'
657        #     )
658
659        self._original.write(output)
660        # except BlockingIOError:
661        #     self._report_blocking_io_error = True
662        self._handler.file_write(self._name, output)
663
664    def flush(self) -> None:
665        """Flush the file."""
666        self._original.flush()
667
668        # We also use this as a hint to ship whatever file chunks
669        # we've accumulated (we have to try and be smart about breaking
670        # our arbitrary file output into discrete entries).
671        self._handler.file_flush(self._name)
672
673    def isatty(self) -> bool:
674        """Are we a terminal?"""
675        return self._original.isatty()
676
677
678def setup_logging(
679    log_path: str | Path | None,
680    level: LogLevel,
681    suppress_non_root_debug: bool = False,
682    log_stdout_stderr: bool = False,
683    echo_to_stderr: bool = True,
684    cache_size_limit: int = 0,
685    cache_time_limit: datetime.timedelta | None = None,
686) -> LogHandler:
687    """Set up our logging environment.
688
689    Returns the custom handler which can be used to fetch information
690    about logs that have passed through it (worst log levels, caches, etc.).
691    """
692
693    lmap = {
694        LogLevel.DEBUG: logging.DEBUG,
695        LogLevel.INFO: logging.INFO,
696        LogLevel.WARNING: logging.WARNING,
697        LogLevel.ERROR: logging.ERROR,
698        LogLevel.CRITICAL: logging.CRITICAL,
699    }
700
701    # Wire logger output to go to a structured log file.
702    # Also echo it to stderr IF we're running in a terminal.
703    # UPDATE: Actually gonna always go to stderr. Is there a
704    # reason we shouldn't? This makes debugging possible if all
705    # we have is access to a non-interactive terminal or file dump.
706    # We could add a '--quiet' arg or whatnot to change this behavior.
707
708    # Note: by passing in the *original* stderr here before we
709    # (potentially) replace it, we ensure that our log echoes
710    # won't themselves be intercepted and sent to the logger
711    # which would create an infinite loop.
712    loghandler = LogHandler(
713        log_path,
714        echofile=sys.stderr if echo_to_stderr else None,
715        suppress_non_root_debug=suppress_non_root_debug,
716        cache_size_limit=cache_size_limit,
717        cache_time_limit=cache_time_limit,
718    )
719
720    # Note: going ahead with force=True here so that we replace any
721    # existing handlers. We warn when that happens, though, so that
722    # the earlier handler's creation can hopefully be avoided.
723    had_previous_handlers = bool(logging.root.handlers)
724    logging.basicConfig(
725        level=lmap[level],
726        # format='%(name)s: %(message)s',
727        # We dump *only* the message here. We pass various log record bits
728        # around and format things fancier where they end up.
729        format='%(message)s',
730        handlers=[loghandler],
731        force=True,
732    )
733    if had_previous_handlers:
734        logging.warning(
735            'setup_logging: Replacing existing handlers.'
736            ' Something may have logged before expected.'
737        )
738
739    # Optionally intercept Python's stdout/stderr output and generate
740    # log entries from it.
741    if log_stdout_stderr:
742        sys.stdout = FileLogEcho(sys.stdout, 'stdout', loghandler)
743        sys.stderr = FileLogEcho(sys.stderr, 'stderr', loghandler)
744
745    return loghandler
class LogLevel(enum.Enum):
29class LogLevel(Enum):
30    """Severity level for a log entry.
31
32    These enums have numeric values so they can be compared in severity.
33    Note that these values are not currently interchangeable with the
34    logging.ERROR, logging.DEBUG, etc. values.
35    """
36
37    DEBUG = 0
38    INFO = 1
39    WARNING = 2
40    ERROR = 3
41    CRITICAL = 4
42
43    @property
44    def python_logging_level(self) -> int:
45        """Give the corresponding logging level."""
46        return LOG_LEVEL_LEVELNOS[self]
47
48    @classmethod
49    def from_python_logging_level(cls, levelno: int) -> LogLevel:
50        """Given a Python logging level, return a LogLevel."""
51        return LEVELNO_LOG_LEVELS[levelno]

Severity level for a log entry.

These enums have numeric values so they can be compared in severity. Note that these values are not currently interchangeable with the logging.ERROR, logging.DEBUG, etc. values.

DEBUG = <LogLevel.DEBUG: 0>
INFO = <LogLevel.INFO: 1>
WARNING = <LogLevel.WARNING: 2>
ERROR = <LogLevel.ERROR: 3>
CRITICAL = <LogLevel.CRITICAL: 4>
python_logging_level: int
43    @property
44    def python_logging_level(self) -> int:
45        """Give the corresponding logging level."""
46        return LOG_LEVEL_LEVELNOS[self]

Give the corresponding logging level.

@classmethod
def from_python_logging_level(cls, levelno: int) -> LogLevel:
48    @classmethod
49    def from_python_logging_level(cls, levelno: int) -> LogLevel:
50        """Given a Python logging level, return a LogLevel."""
51        return LEVELNO_LOG_LEVELS[levelno]

Given a Python logging level, return a LogLevel.

Inherited Members
enum.Enum
name
value
LOG_LEVEL_LEVELNOS = {<LogLevel.DEBUG: 0>: 10, <LogLevel.INFO: 1>: 20, <LogLevel.WARNING: 2>: 30, <LogLevel.ERROR: 3>: 40, <LogLevel.CRITICAL: 4>: 50}
LEVELNO_LOG_LEVELS = {10: <LogLevel.DEBUG: 0>, 20: <LogLevel.INFO: 1>, 30: <LogLevel.WARNING: 2>, 40: <LogLevel.ERROR: 3>, 50: <LogLevel.CRITICAL: 4>}
LEVELNO_COLOR_CODES: dict[int, tuple[str, str]] = {10: ('', ''), 20: ('', ''), 30: ('', ''), 40: ('', ''), 50: ('', '')}
@ioprepped
@dataclass
class LogEntry:
81@ioprepped
82@dataclass
83class LogEntry:
84    """Single logged message."""
85
86    name: Annotated[str, IOAttrs('n', soft_default='root', store_default=False)]
87    message: Annotated[str, IOAttrs('m')]
88    level: Annotated[LogLevel, IOAttrs('l')]
89    time: Annotated[datetime.datetime, IOAttrs('t')]
90
91    # We support arbitrary string labels per log entry which can be
92    # incorporated into custom log processing. To populate this, our
93    # LogHandler class looks for a 'labels' dict passed in the optional
94    # 'extra' dict arg to standard Python log calls.
95    labels: Annotated[dict[str, str], IOAttrs('la', store_default=False)] = (
96        field(default_factory=dict)
97    )

Single logged message.

LogEntry( name: Annotated[str, <efro.dataclassio._base.IOAttrs object>], message: Annotated[str, <efro.dataclassio._base.IOAttrs object>], level: Annotated[LogLevel, <efro.dataclassio._base.IOAttrs object>], time: Annotated[datetime.datetime, <efro.dataclassio._base.IOAttrs object>], labels: Annotated[dict[str, str], <efro.dataclassio._base.IOAttrs object>] = <factory>)
name: Annotated[str, <efro.dataclassio._base.IOAttrs object at 0x1173ea3f0>]
message: Annotated[str, <efro.dataclassio._base.IOAttrs object at 0x1173ea840>]
level: Annotated[LogLevel, <efro.dataclassio._base.IOAttrs object at 0x1173ea990>]
time: Annotated[datetime.datetime, <efro.dataclassio._base.IOAttrs object at 0x1173eab10>]
labels: Annotated[dict[str, str], <efro.dataclassio._base.IOAttrs object at 0x1173eaed0>]
@ioprepped
@dataclass
class LogArchive:
100@ioprepped
101@dataclass
102class LogArchive:
103    """Info and data for a log."""
104
105    # Total number of entries submitted to the log.
106    log_size: Annotated[int, IOAttrs('t')]
107
108    # Offset for the entries contained here.
109    # (10 means our first entry is the 10th in the log, etc.)
110    start_index: Annotated[int, IOAttrs('c')]
111
112    entries: Annotated[list[LogEntry], IOAttrs('e')]

Info and data for a log.

LogArchive( log_size: Annotated[int, <efro.dataclassio._base.IOAttrs object>], start_index: Annotated[int, <efro.dataclassio._base.IOAttrs object>], entries: Annotated[list[LogEntry], <efro.dataclassio._base.IOAttrs object>])
log_size: Annotated[int, <efro.dataclassio._base.IOAttrs object at 0x1173efcb0>]
start_index: Annotated[int, <efro.dataclassio._base.IOAttrs object at 0x1173ed4c0>]
entries: Annotated[list[LogEntry], <efro.dataclassio._base.IOAttrs object at 0x1173ed4f0>]
class LogHandler(logging.Handler):
115class LogHandler(logging.Handler):
116    """Fancy-pants handler for logging output.
117
118    Writes logs to disk in structured json format and echoes them
119    to stdout/stderr with pretty colors.
120    """
121
122    _event_loop: asyncio.AbstractEventLoop
123
124    # IMPORTANT: Any debug prints we do here should ONLY go to echofile.
125    # Otherwise we can get infinite loops as those prints come back to us
126    # as new log entries.
127
128    def __init__(
129        self,
130        path: str | Path | None,
131        echofile: TextIO | None,
132        suppress_non_root_debug: bool,
133        cache_size_limit: int,
134        cache_time_limit: datetime.timedelta | None,
135    ):
136        super().__init__()
137        # pylint: disable=consider-using-with
138        self._file = None if path is None else open(path, 'w', encoding='utf-8')
139        self._echofile = echofile
140        self._callbacks: list[Callable[[LogEntry], None]] = []
141        self._suppress_non_root_debug = suppress_non_root_debug
142        self._file_chunks: dict[str, list[str]] = {'stdout': [], 'stderr': []}
143        self._file_chunk_ship_task: dict[str, asyncio.Task | None] = {
144            'stdout': None,
145            'stderr': None,
146        }
147        self._cache_size = 0
148        assert cache_size_limit >= 0
149        self._cache_size_limit = cache_size_limit
150        self._cache_time_limit = cache_time_limit
151        self._cache = deque[tuple[int, LogEntry]]()
152        self._cache_index_offset = 0
153        self._cache_lock = Lock()
154        self._printed_callback_error = False
155        self._thread_bootstrapped = False
156        self._thread = Thread(target=self._log_thread_main, daemon=True)
157        if __debug__:
158            self._last_slow_emit_warning_time: float | None = None
159        self._thread.start()
160
161        # Spin until our thread is up and running; otherwise we could
162        # wind up trying to push stuff to our event loop before the
163        # loop exists.
164        while not self._thread_bootstrapped:
165            time.sleep(0.001)
166
167    def add_callback(
168        self, call: Callable[[LogEntry], None], feed_existing_logs: bool = False
169    ) -> None:
170        """Add a callback to be run for each LogEntry.
171
172        Note that this callback will always run in a background thread.
173        Passing True for feed_existing_logs will cause all cached logs
174        in the handler to be fed to the callback (still in the
175        background thread though).
176        """
177
178        # Kick this over to our bg thread to add the callback and
179        # process cached entries at the same time to ensure there are no
180        # race conditions that could cause entries to be skipped/etc.
181        self._event_loop.call_soon_threadsafe(
182            partial(self._add_callback_in_thread, call, feed_existing_logs)
183        )
184
185    def _add_callback_in_thread(
186        self, call: Callable[[LogEntry], None], feed_existing_logs: bool
187    ) -> None:
188        """Add a callback to be run for each LogEntry.
189
190        Note that this callback will always run in a background thread.
191        Passing True for feed_existing_logs will cause all cached logs
192        in the handler to be fed to the callback (still in the
193        background thread though).
194        """
195        assert current_thread() is self._thread
196        self._callbacks.append(call)
197
198        # Run all of our cached entries through the new callback if desired.
199        if feed_existing_logs and self._cache_size_limit > 0:
200            with self._cache_lock:
201                for _id, entry in self._cache:
202                    self._run_callback_on_entry(call, entry)
203
204    def _log_thread_main(self) -> None:
205        self._event_loop = asyncio.new_event_loop()
206
207        # In our background thread event loop we do a fair amount of
208        # slow synchronous stuff such as mucking with the log cache.
209        # Let's avoid getting tons of warnings about this in debug mode.
210        self._event_loop.slow_callback_duration = 2.0  # Default is 0.1
211
212        # NOTE: if we ever use default threadpool at all we should allow
213        # setting it for our loop.
214        asyncio.set_event_loop(self._event_loop)
215        self._thread_bootstrapped = True
216        try:
217            if self._cache_time_limit is not None:
218                _prunetask = self._event_loop.create_task(
219                    self._time_prune_cache()
220                )
221            self._event_loop.run_forever()
222        except BaseException:
223            # If this ever goes down we're in trouble; we won't be able
224            # to log about it though. Try to make some noise however we
225            # can.
226            print('LogHandler died!!!', file=sys.stderr)
227            import traceback
228
229            traceback.print_exc()
230            raise
231
232    async def _time_prune_cache(self) -> None:
233        assert self._cache_time_limit is not None
234        while bool(True):
235            await asyncio.sleep(61.27)
236            now = utc_now()
237            with self._cache_lock:
238                # Prune the oldest entry as long as there is a first one that
239                # is too old.
240                while (
241                    self._cache
242                    and (now - self._cache[0][1].time) >= self._cache_time_limit
243                ):
244                    popped = self._cache.popleft()
245                    self._cache_size -= popped[0]
246                    self._cache_index_offset += 1
247
248    def get_cached(
249        self, start_index: int = 0, max_entries: int | None = None
250    ) -> LogArchive:
251        """Build and return an archive of cached log entries.
252
253        This will only include entries that have been processed by the
254        background thread, so may not include just-submitted logs or
255        entries for partially written stdout/stderr lines.
256        Entries from the range [start_index:start_index+max_entries]
257        which are still present in the cache will be returned.
258        """
259
260        assert start_index >= 0
261        if max_entries is not None:
262            assert max_entries >= 0
263        with self._cache_lock:
264            # Transform start_index to our present cache space.
265            start_index -= self._cache_index_offset
266            # Calc end-index in our present cache space.
267            end_index = (
268                len(self._cache)
269                if max_entries is None
270                else start_index + max_entries
271            )
272
273            # Clamp both indexes to both ends of our present space.
274            start_index = max(0, min(start_index, len(self._cache)))
275            end_index = max(0, min(end_index, len(self._cache)))
276
277            return LogArchive(
278                log_size=self._cache_index_offset + len(self._cache),
279                start_index=start_index + self._cache_index_offset,
280                entries=self._cache_slice(start_index, end_index),
281            )
282
283    def _cache_slice(
284        self, start: int, end: int, step: int = 1
285    ) -> list[LogEntry]:
286        # Deque doesn't natively support slicing but we can do it manually.
287        # It sounds like rotating the deque and pulling from the beginning
288        # is the most efficient way to do this. The downside is the deque
289        # gets temporarily modified in the process so we need to make sure
290        # we're holding the lock.
291        assert self._cache_lock.locked()
292        cache = self._cache
293        cache.rotate(-start)
294        slc = [e[1] for e in itertools.islice(cache, 0, end - start, step)]
295        cache.rotate(start)
296        return slc
297
298    @classmethod
299    def _is_immutable_log_data(cls, data: Any) -> bool:
300        if isinstance(data, (str, bool, int, float, bytes)):
301            return True
302        if isinstance(data, tuple):
303            return all(cls._is_immutable_log_data(x) for x in data)
304        return False
305
306    def call_in_thread(self, call: Callable[[], Any]) -> None:
307        """Submit a call to be run in the logging background thread."""
308        self._event_loop.call_soon_threadsafe(call)
309
310    @override
311    def emit(self, record: logging.LogRecord) -> None:
312        # pylint: disable=too-many-branches
313        if __debug__:
314            starttime = time.monotonic()
315
316        # Called by logging to send us records.
317
318        # TODO - kill this.
319        if (
320            self._suppress_non_root_debug
321            and record.name != 'root'
322            and record.levelname == 'DEBUG'
323        ):
324            return
325
326        # Optimization: if our log args are all simple immutable values,
327        # we can just kick the whole thing over to our background thread to
328        # be formatted there at our leisure. If anything is mutable and
329        # thus could possibly change between now and then or if we want
330        # to do immediate file echoing then we need to bite the bullet
331        # and do that stuff here at the call site.
332        fast_path = self._echofile is None and self._is_immutable_log_data(
333            record.args
334        )
335
336        # Note: just assuming types are correct here, but they'll be
337        # checked properly when the resulting LogEntry gets exported.
338        labels: dict[str, str] | None = getattr(record, 'labels', None)
339        if labels is None:
340            labels = {}
341
342        if fast_path:
343            if __debug__:
344                formattime = echotime = time.monotonic()
345            self._event_loop.call_soon_threadsafe(
346                partial(
347                    self._emit_in_thread,
348                    record.name,
349                    record.levelno,
350                    record.created,
351                    record,
352                    labels,
353                )
354            )
355        else:
356            # Slow case; do formatting and echoing here at the log call
357            # site.
358            msg = self.format(record)
359
360            if __debug__:
361                formattime = time.monotonic()
362
363            # Also immediately print pretty colored output to our echo file
364            # (generally stderr). We do this part here instead of in our bg
365            # thread because the delay can throw off command line prompts or
366            # make tight debugging harder.
367            if self._echofile is not None:
368                # try:
369                # if self._report_blocking_io_on_echo_error:
370                #     premsg = (
371                #         'WARNING: BlockingIOError ON LOG ECHO OUTPUT;'
372                #         ' YOU ARE PROBABLY MISSING LOGS\n'
373                #     )
374                #     self._report_blocking_io_on_echo_error = False
375                # else:
376                #     premsg = ''
377                ends = LEVELNO_COLOR_CODES.get(record.levelno)
378                namepre = f'{Clr.WHT}{record.name}:{Clr.RST} '
379                if ends is not None:
380                    self._echofile.write(
381                        f'{namepre}{ends[0]}'
382                        f'{msg}{ends[1]}\n'
383                        # f'{namepre}{ends[0]}' f'{premsg}{msg}{ends[1]}\n'
384                    )
385                else:
386                    self._echofile.write(f'{namepre}{msg}\n')
387                self._echofile.flush()
388                # except BlockingIOError:
389                #     # Ran into this when doing a bunch of logging; assuming
390                #     # this is asyncio's doing?.. For now trying to survive
391                #     # the error but telling the user something is probably
392                #     # missing in their output.
393                #     self._report_blocking_io_on_echo_error = True
394
395            if __debug__:
396                echotime = time.monotonic()
397
398            self._event_loop.call_soon_threadsafe(
399                partial(
400                    self._emit_in_thread,
401                    record.name,
402                    record.levelno,
403                    record.created,
404                    msg,
405                    labels,
406                )
407            )
408
409        if __debug__:
410            # pylint: disable=used-before-assignment
411            # Make noise if we're taking a significant amount of time here.
412            # Limit the noise to once every so often though; otherwise we
413            # could get a feedback loop where every log emit results in a
414            # warning log which results in another, etc.
415            now = time.monotonic()
416            # noinspection PyUnboundLocalVariable
417            duration = now - starttime  # pyright: ignore
418            # noinspection PyUnboundLocalVariable
419            format_duration = formattime - starttime  # pyright: ignore
420            # noinspection PyUnboundLocalVariable
421            echo_duration = echotime - formattime  # pyright: ignore
422            if duration > 0.05 and (
423                self._last_slow_emit_warning_time is None
424                or now > self._last_slow_emit_warning_time + 10.0
425            ):
426                # Logging calls from *within* a logging handler
427                # sounds sketchy, so let's just kick this over to
428                # the bg event loop thread we've already got.
429                self._last_slow_emit_warning_time = now
430                self._event_loop.call_soon_threadsafe(
431                    partial(
432                        logging.warning,
433                        'efro.log.LogHandler emit took too long'
434                        ' (%.2fs total; %.2fs format, %.2fs echo,'
435                        ' fast_path=%s).',
436                        duration,
437                        format_duration,
438                        echo_duration,
439                        fast_path,
440                    )
441                )
442
443    def _emit_in_thread(
444        self,
445        name: str,
446        levelno: int,
447        created: float,
448        message: str | logging.LogRecord,
449        labels: dict[str, str],
450    ) -> None:
451        try:
452            # If they passed a raw record here, bake it down to a string.
453            if isinstance(message, logging.LogRecord):
454                message = self.format(message)
455
456            self._emit_entry(
457                LogEntry(
458                    name=name,
459                    message=message,
460                    level=LEVELNO_LOG_LEVELS.get(levelno, LogLevel.INFO),
461                    time=datetime.datetime.fromtimestamp(
462                        created, datetime.timezone.utc
463                    ),
464                    labels=labels,
465                )
466            )
467        except Exception:
468            import traceback
469
470            traceback.print_exc(file=self._echofile)
471
472    def file_write(self, name: str, output: str) -> None:
473        """Send raw stdout/stderr output to the logger to be collated."""
474
475        # Note to self: it turns out that things like '^^^^^^^^^^^^^^'
476        # lines in stack traces get written as lots of individual '^'
477        # writes. It feels a bit dirty to be pushing a deferred call to
478        # another thread for each character. Perhaps should do some sort
479        # of basic accumulation here?
480        self._event_loop.call_soon_threadsafe(
481            partial(self._file_write_in_thread, name, output)
482        )
483
484    def _file_write_in_thread(self, name: str, output: str) -> None:
485        try:
486            assert name in ('stdout', 'stderr')
487
488            # Here we try to be somewhat smart about breaking arbitrary
489            # print output into discrete log entries.
490
491            self._file_chunks[name].append(output)
492
493            # Individual parts of a print come across as separate writes,
494            # and the end of a print will be a standalone '\n' by default.
495            # Let's use that as a hint that we're likely at the end of
496            # a full print statement and ship what we've got.
497            if output == '\n':
498                self._ship_file_chunks(name, cancel_ship_task=True)
499            else:
500                # By default just keep adding chunks.
501                # However we keep a timer running anytime we've got
502                # unshipped chunks so that we can ship what we've got
503                # after a short bit if we never get a newline.
504                ship_task = self._file_chunk_ship_task[name]
505                if ship_task is None:
506                    self._file_chunk_ship_task[name] = (
507                        self._event_loop.create_task(
508                            self._ship_chunks_task(name),
509                            name='log ship file chunks',
510                        )
511                    )
512
513        except Exception:
514            import traceback
515
516            traceback.print_exc(file=self._echofile)
517
518    def shutdown(self) -> None:
519        """Block until all pending logs/prints are done."""
520        done = False
521        self.file_flush('stdout')
522        self.file_flush('stderr')
523
524        def _set_done() -> None:
525            nonlocal done
526            done = True
527
528        self._event_loop.call_soon_threadsafe(_set_done)
529
530        starttime = time.monotonic()
531        while not done:
532            if time.monotonic() - starttime > 5.0:
533                print('LogHandler shutdown hung!!!', file=sys.stderr)
534                break
535            time.sleep(0.01)
536
537    def file_flush(self, name: str) -> None:
538        """Send raw stdout/stderr flush to the logger to be collated."""
539
540        self._event_loop.call_soon_threadsafe(
541            partial(self._file_flush_in_thread, name)
542        )
543
544    def _file_flush_in_thread(self, name: str) -> None:
545        try:
546            assert name in ('stdout', 'stderr')
547
548            # Immediately ship whatever chunks we've got.
549            if self._file_chunks[name]:
550                self._ship_file_chunks(name, cancel_ship_task=True)
551
552        except Exception:
553            import traceback
554
555            traceback.print_exc(file=self._echofile)
556
557    async def _ship_chunks_task(self, name: str) -> None:
558        # Note: it's important we sleep here for a moment. Otherwise,
559        # things like '^^^^^^^^^^^^' lines in stack traces, which come
560        # through as lots of individual '^' writes, tend to get broken
561        # into lots of tiny little lines by us.
562        await asyncio.sleep(0.01)
563        self._ship_file_chunks(name, cancel_ship_task=False)
564
565    def _ship_file_chunks(self, name: str, cancel_ship_task: bool) -> None:
566        # Note: Raw print input generally ends in a newline, but that is
567        # redundant when we break things into log entries and results in
568        # extra empty lines. So strip off a single trailing newline if
569        # one is present.
570        text = ''.join(self._file_chunks[name]).removesuffix('\n')
571
572        self._emit_entry(
573            LogEntry(
574                name=name, message=text, level=LogLevel.INFO, time=utc_now()
575            )
576        )
577        self._file_chunks[name] = []
578        ship_task = self._file_chunk_ship_task[name]
579        if cancel_ship_task and ship_task is not None:
580            ship_task.cancel()
581        self._file_chunk_ship_task[name] = None
582
583    def _emit_entry(self, entry: LogEntry) -> None:
584        assert current_thread() is self._thread
585
586        # Store to our cache.
587        if self._cache_size_limit > 0:
588            with self._cache_lock:
589                # Do a rough calc of how many bytes this entry consumes.
590                entry_size = sum(
591                    sys.getsizeof(x)
592                    for x in (
593                        entry,
594                        entry.name,
595                        entry.message,
596                        entry.level,
597                        entry.time,
598                    )
599                )
600                self._cache.append((entry_size, entry))
601                self._cache_size += entry_size
602
603                # Prune old until we are back at or under our limit.
604                while self._cache_size > self._cache_size_limit:
605                    popped = self._cache.popleft()
606                    self._cache_size -= popped[0]
607                    self._cache_index_offset += 1
608
609        # Pass to callbacks.
610        for call in self._callbacks:
611            self._run_callback_on_entry(call, entry)
612
613        # Dump to our structured log file.
614        # TODO: should set a timer for flushing; don't flush every line.
615        if self._file is not None:
616            entry_s = dataclass_to_json(entry)
617            assert '\n' not in entry_s  # Make sure its a single line.
618            print(entry_s, file=self._file, flush=True)
619
620    def _run_callback_on_entry(
621        self, callback: Callable[[LogEntry], None], entry: LogEntry
622    ) -> None:
623        """Run a callback and handle any errors."""
624        try:
625            callback(entry)
626        except Exception:
627            # Only print the first callback error to avoid insanity.
628            if not self._printed_callback_error:
629                import traceback
630
631                traceback.print_exc(file=self._echofile)
632                self._printed_callback_error = True

Fancy-pants handler for logging output.

Writes logs to disk in structured JSON format and echoes them to stdout/stderr with pretty colors.

LogHandler(path: str | pathlib.Path | None, echofile: typing.TextIO | None, suppress_non_root_debug: bool, cache_size_limit: int, cache_time_limit: datetime.timedelta | None)
128    def __init__(
129        self,
130        path: str | Path | None,
131        echofile: TextIO | None,
132        suppress_non_root_debug: bool,
133        cache_size_limit: int,
134        cache_time_limit: datetime.timedelta | None,
135    ):
136        super().__init__()
137        # pylint: disable=consider-using-with
138        self._file = None if path is None else open(path, 'w', encoding='utf-8')
139        self._echofile = echofile
140        self._callbacks: list[Callable[[LogEntry], None]] = []
141        self._suppress_non_root_debug = suppress_non_root_debug
142        self._file_chunks: dict[str, list[str]] = {'stdout': [], 'stderr': []}
143        self._file_chunk_ship_task: dict[str, asyncio.Task | None] = {
144            'stdout': None,
145            'stderr': None,
146        }
147        self._cache_size = 0
148        assert cache_size_limit >= 0
149        self._cache_size_limit = cache_size_limit
150        self._cache_time_limit = cache_time_limit
151        self._cache = deque[tuple[int, LogEntry]]()
152        self._cache_index_offset = 0
153        self._cache_lock = Lock()
154        self._printed_callback_error = False
155        self._thread_bootstrapped = False
156        self._thread = Thread(target=self._log_thread_main, daemon=True)
157        if __debug__:
158            self._last_slow_emit_warning_time: float | None = None
159        self._thread.start()
160
161        # Spin until our thread is up and running; otherwise we could
162        # wind up trying to push stuff to our event loop before the
163        # loop exists.
164        while not self._thread_bootstrapped:
165            time.sleep(0.001)

Initializes the handler: opens the structured log file (if a path was given), sets up the in-memory entry cache, and starts the background logging thread, spinning briefly until that thread's event loop is up.
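
For illustration, a minimal sketch of constructing a handler directly; the path and limits below are placeholders, and most code will instead call setup_logging() (documented at the end of this page), which builds and installs one for you:

    import datetime
    import sys

    from efro.log import LogHandler

    handler = LogHandler(
        path='app.jsonl',              # hypothetical output path; None skips the file
        echofile=sys.stderr,           # pretty colored echoes (None to disable)
        suppress_non_root_debug=False,
        cache_size_limit=1024 * 1024,  # keep roughly 1MB of recent entries cached
        cache_time_limit=datetime.timedelta(minutes=5),
    )

Note that a handler built this way still needs to be attached to Python's logging system; setup_logging() does that via logging.basicConfig().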

def add_callback(self, call: Callable[[LogEntry], None], feed_existing_logs: bool = False) -> None:
167    def add_callback(
168        self, call: Callable[[LogEntry], None], feed_existing_logs: bool = False
169    ) -> None:
170        """Add a callback to be run for each LogEntry.
171
172        Note that this callback will always run in a background thread.
173        Passing True for feed_existing_logs will cause all cached logs
174        in the handler to be fed to the callback (still in the
175        background thread though).
176        """
177
178        # Kick this over to our bg thread to add the callback and
179        # process cached entries at the same time to ensure there are no
180        # race conditions that could cause entries to be skipped/etc.
181        self._event_loop.call_soon_threadsafe(
182            partial(self._add_callback_in_thread, call, feed_existing_logs)
183        )

Add a callback to be run for each LogEntry.

Note that this callback will always run in a background thread. Passing True for feed_existing_logs will cause all cached logs in the handler to be fed to the callback (still in the background thread though).
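
A minimal sketch of registering a callback, assuming 'handler' is the LogHandler returned by setup_logging() and forward_to_monitoring is a hypothetical helper of your own:

    from efro.log import LogEntry, LogLevel

    def on_entry(entry: LogEntry) -> None:
        # Runs in the handler's background thread; keep work here short.
        if entry.level.value >= LogLevel.WARNING.value:
            forward_to_monitoring(entry.name, entry.message)  # hypothetical helper

    # feed_existing_logs=True also replays anything already cached.
    handler.add_callback(on_entry, feed_existing_logs=True)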

def get_cached(self, start_index: int = 0, max_entries: int | None = None) -> LogArchive:
248    def get_cached(
249        self, start_index: int = 0, max_entries: int | None = None
250    ) -> LogArchive:
251        """Build and return an archive of cached log entries.
252
253        This will only include entries that have been processed by the
254        background thread, so may not include just-submitted logs or
255        entries for partially written stdout/stderr lines.
256        Entries from the range [start_index:start_index+max_entries]
257        which are still present in the cache will be returned.
258        """
259
260        assert start_index >= 0
261        if max_entries is not None:
262            assert max_entries >= 0
263        with self._cache_lock:
264            # Transform start_index to our present cache space.
265            start_index -= self._cache_index_offset
266            # Calc end-index in our present cache space.
267            end_index = (
268                len(self._cache)
269                if max_entries is None
270                else start_index + max_entries
271            )
272
273            # Clamp both indexes to both ends of our present space.
274            start_index = max(0, min(start_index, len(self._cache)))
275            end_index = max(0, min(end_index, len(self._cache)))
276
277            return LogArchive(
278                log_size=self._cache_index_offset + len(self._cache),
279                start_index=start_index + self._cache_index_offset,
280                entries=self._cache_slice(start_index, end_index),
281            )

Build and return an archive of cached log entries.

This will only include entries that have been processed by the background thread, so may not include just-submitted logs or entries for partially written stdout/stderr lines. Entries from the range [start_index:start_index+max_entries] which are still present in the cache will be returned.
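
A sketch of pulling the cache, again assuming 'handler' came from setup_logging(); indices are absolute positions in the overall log, so paging resumes from start_index plus the number of entries returned:

    archive = handler.get_cached(max_entries=100)

    print(f'{archive.log_size} entries have been logged in total.')
    for index, entry in enumerate(archive.entries, start=archive.start_index):
        print(index, entry.time.isoformat(), entry.level.name, entry.message)

    # Page through later entries by resuming at the next absolute index.
    later = handler.get_cached(start_index=archive.start_index + len(archive.entries))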

def call_in_thread(self, call: Callable[[], Any]) -> None:
306    def call_in_thread(self, call: Callable[[], Any]) -> None:
307        """Submit a call to be run in the logging background thread."""
308        self._event_loop.call_soon_threadsafe(call)

Submit a call to be run in the logging background thread.
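
For example (assuming 'handler' as above):

    def bg_work() -> None:
        # Runs in the logging background thread, serialized with log processing.
        print('hello from the log thread')

    handler.call_in_thread(bg_work)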

@override
def emit(self, record: logging.LogRecord) -> None:
310    @override
311    def emit(self, record: logging.LogRecord) -> None:
312        # pylint: disable=too-many-branches
313        if __debug__:
314            starttime = time.monotonic()
315
316        # Called by logging to send us records.
317
318        # TODO - kill this.
319        if (
320            self._suppress_non_root_debug
321            and record.name != 'root'
322            and record.levelname == 'DEBUG'
323        ):
324            return
325
326        # Optimization: if our log args are all simple immutable values,
327        # we can just kick the whole thing over to our background thread to
328        # be formatted there at our leisure. If anything is mutable and
329        # thus could possibly change between now and then or if we want
330        # to do immediate file echoing then we need to bite the bullet
331        # and do that stuff here at the call site.
332        fast_path = self._echofile is None and self._is_immutable_log_data(
333            record.args
334        )
335
336        # Note: just assuming types are correct here, but they'll be
337        # checked properly when the resulting LogEntry gets exported.
338        labels: dict[str, str] | None = getattr(record, 'labels', None)
339        if labels is None:
340            labels = {}
341
342        if fast_path:
343            if __debug__:
344                formattime = echotime = time.monotonic()
345            self._event_loop.call_soon_threadsafe(
346                partial(
347                    self._emit_in_thread,
348                    record.name,
349                    record.levelno,
350                    record.created,
351                    record,
352                    labels,
353                )
354            )
355        else:
356            # Slow case; do formatting and echoing here at the log call
357            # site.
358            msg = self.format(record)
359
360            if __debug__:
361                formattime = time.monotonic()
362
363            # Also immediately print pretty colored output to our echo file
364            # (generally stderr). We do this part here instead of in our bg
365            # thread because the delay can throw off command line prompts or
366            # make tight debugging harder.
367            if self._echofile is not None:
368                # try:
369                # if self._report_blocking_io_on_echo_error:
370                #     premsg = (
371                #         'WARNING: BlockingIOError ON LOG ECHO OUTPUT;'
372                #         ' YOU ARE PROBABLY MISSING LOGS\n'
373                #     )
374                #     self._report_blocking_io_on_echo_error = False
375                # else:
376                #     premsg = ''
377                ends = LEVELNO_COLOR_CODES.get(record.levelno)
378                namepre = f'{Clr.WHT}{record.name}:{Clr.RST} '
379                if ends is not None:
380                    self._echofile.write(
381                        f'{namepre}{ends[0]}'
382                        f'{msg}{ends[1]}\n'
383                        # f'{namepre}{ends[0]}' f'{premsg}{msg}{ends[1]}\n'
384                    )
385                else:
386                    self._echofile.write(f'{namepre}{msg}\n')
387                self._echofile.flush()
388                # except BlockingIOError:
389                #     # Ran into this when doing a bunch of logging; assuming
390                #     # this is asyncio's doing?.. For now trying to survive
391                #     # the error but telling the user something is probably
392                #     # missing in their output.
393                #     self._report_blocking_io_on_echo_error = True
394
395            if __debug__:
396                echotime = time.monotonic()
397
398            self._event_loop.call_soon_threadsafe(
399                partial(
400                    self._emit_in_thread,
401                    record.name,
402                    record.levelno,
403                    record.created,
404                    msg,
405                    labels,
406                )
407            )
408
409        if __debug__:
410            # pylint: disable=used-before-assignment
411            # Make noise if we're taking a significant amount of time here.
412            # Limit the noise to once every so often though; otherwise we
413            # could get a feedback loop where every log emit results in a
414            # warning log which results in another, etc.
415            now = time.monotonic()
416            # noinspection PyUnboundLocalVariable
417            duration = now - starttime  # pyright: ignore
418            # noinspection PyUnboundLocalVariable
419            format_duration = formattime - starttime  # pyright: ignore
420            # noinspection PyUnboundLocalVariable
421            echo_duration = echotime - formattime  # pyright: ignore
422            if duration > 0.05 and (
423                self._last_slow_emit_warning_time is None
424                or now > self._last_slow_emit_warning_time + 10.0
425            ):
426                # Logging calls from *within* a logging handler
427                # sounds sketchy, so let's just kick this over to
428                # the bg event loop thread we've already got.
429                self._last_slow_emit_warning_time = now
430                self._event_loop.call_soon_threadsafe(
431                    partial(
432                        logging.warning,
433                        'efro.log.LogHandler emit took too long'
434                        ' (%.2fs total; %.2fs format, %.2fs echo,'
435                        ' fast_path=%s).',
436                        duration,
437                        format_duration,
438                        echo_duration,
439                        fast_path,
440                    )
441                )

Called by Python's logging machinery to hand this handler a record.

When possible the record is shipped as-is to the background thread and formatted there; otherwise it is formatted at the call site, echoed with colors to the echo file (if any), and then forwarded. In debug builds a warning is logged if this takes a significant amount of time.
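
emit() is not called directly by user code; the way to influence it from a log call site is the 'labels' dict it reads from the record (see LogEntry.labels). A small sketch, with an illustrative logger name and label values:

    import logging

    # The handler picks up a 'labels' dict passed via the standard 'extra'
    # arg and attaches it to the resulting LogEntry.
    logging.getLogger('myapp').warning(
        'cache miss for %s', 'some-key',
        extra={'labels': {'subsystem': 'cache'}},
    )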

def file_write(self, name: str, output: str) -> None:
472    def file_write(self, name: str, output: str) -> None:
473        """Send raw stdout/stderr output to the logger to be collated."""
474
475        # Note to self: it turns out that things like '^^^^^^^^^^^^^^'
476        # lines in stack traces get written as lots of individual '^'
477        # writes. It feels a bit dirty to be pushing a deferred call to
478        # another thread for each character. Perhaps should do some sort
479        # of basic accumulation here?
480        self._event_loop.call_soon_threadsafe(
481            partial(self._file_write_in_thread, name, output)
482        )

Send raw stdout/stderr output to the logger to be collated.
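
A tiny sketch, assuming 'handler' from setup_logging(); the FileLogEcho class below does exactly this for real stdout/stderr output:

    # Forward a raw chunk of output; chunks accumulate on the background
    # thread and are turned into LogEntry objects when flushed or complete.
    handler.file_write('stdout', 'some raw print output\n')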

def shutdown(self) -> None:
518    def shutdown(self) -> None:
519        """Block until all pending logs/prints are done."""
520        done = False
521        self.file_flush('stdout')
522        self.file_flush('stderr')
523
524        def _set_done() -> None:
525            nonlocal done
526            done = True
527
528        self._event_loop.call_soon_threadsafe(_set_done)
529
530        starttime = time.monotonic()
531        while not done:
532            if time.monotonic() - starttime > 5.0:
533                print('LogHandler shutdown hung!!!', file=sys.stderr)
534                break
535            time.sleep(0.01)

Block until all pending logs/prints are done.
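
One hedged way to wire this up, assuming 'handler' from setup_logging(), is to register it to run at interpreter exit:

    import atexit

    # Flush pending stdout/stderr chunks and block (up to ~5 seconds) until
    # everything submitted so far has been processed.
    atexit.register(handler.shutdown)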

def file_flush(self, name: str) -> None:
537    def file_flush(self, name: str) -> None:
538        """Send raw stdout/stderr flush to the logger to be collated."""
539
540        self._event_loop.call_soon_threadsafe(
541            partial(self._file_flush_in_thread, name)
542        )

Send raw stdout/stderr flush to the logger to be collated.
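
For example (assuming 'handler' as above):

    # Hint that accumulated 'stdout' chunks should be shipped as an entry now
    # rather than waiting for more output to arrive.
    handler.file_flush('stdout')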

Inherited Members
logging.Handler
level
formatter
get_name
set_name
name
createLock
acquire
release
setLevel
format
handle
setFormatter
flush
close
handleError
logging.Filterer
filters
addFilter
removeFilter
filter
class FileLogEcho:
635class FileLogEcho:
636    """A file-like object for forwarding stdout/stderr to a LogHandler."""
637
638    def __init__(
639        self, original: TextIO, name: str, handler: LogHandler
640    ) -> None:
641        assert name in ('stdout', 'stderr')
642        self._original = original
643        self._name = name
644        self._handler = handler
645
646        # Think this was a result of setting non-blocking stdin somehow;
647        # probably not needed.
648        # self._report_blocking_io_error = False
649
650    def write(self, output: Any) -> None:
651        """Override standard write call."""
652        # try:
653        # if self._report_blocking_io_error:
654        #     self._report_blocking_io_error = False
655        #     self._original.write(
656        #         'WARNING: BlockingIOError ENCOUNTERED;'
657        #         ' OUTPUT IS PROBABLY MISSING'
658        #     )
659
660        self._original.write(output)
661        # except BlockingIOError:
662        #     self._report_blocking_io_error = True
663        self._handler.file_write(self._name, output)
664
665    def flush(self) -> None:
666        """Flush the file."""
667        self._original.flush()
668
669        # We also use this as a hint to ship whatever file chunks
670        # we've accumulated (we have to try and be smart about breaking
671        # our arbitrary file output into discrete entries).
672        self._handler.file_flush(self._name)
673
674    def isatty(self) -> bool:
675        """Are we a terminal?"""
676        return self._original.isatty()

A file-like object for forwarding stdout/stderr to a LogHandler.
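
A sketch of installing the wrapper manually, assuming 'handler' is an existing LogHandler; this mirrors what setup_logging() does when log_stdout_stderr=True:

    import sys

    from efro.log import FileLogEcho

    # Wrap the original streams so output still reaches them while also
    # being forwarded to the handler.
    sys.stdout = FileLogEcho(sys.stdout, 'stdout', handler)
    sys.stderr = FileLogEcho(sys.stderr, 'stderr', handler)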

FileLogEcho(original: TextIO, name: str, handler: LogHandler)
638    def __init__(
639        self, original: TextIO, name: str, handler: LogHandler
640    ) -> None:
641        assert name in ('stdout', 'stderr')
642        self._original = original
643        self._name = name
644        self._handler = handler
645
646        # Think this was a result of setting non-blocking stdin somehow;
647        # probably not needed.
648        # self._report_blocking_io_error = False
def write(self, output: Any) -> None:
650    def write(self, output: Any) -> None:
651        """Override standard write call."""
652        # try:
653        # if self._report_blocking_io_error:
654        #     self._report_blocking_io_error = False
655        #     self._original.write(
656        #         'WARNING: BlockingIOError ENCOUNTERED;'
657        #         ' OUTPUT IS PROBABLY MISSING'
658        #     )
659
660        self._original.write(output)
661        # except BlockingIOError:
662        #     self._report_blocking_io_error = True
663        self._handler.file_write(self._name, output)

Override standard write call.

def flush(self) -> None:
665    def flush(self) -> None:
666        """Flush the file."""
667        self._original.flush()
668
669        # We also use this as a hint to ship whatever file chunks
670        # we've accumulated (we have to try and be smart about breaking
671        # our arbitrary file output into discrete entries).
672        self._handler.file_flush(self._name)

Flush the file.

def isatty(self) -> bool:
674    def isatty(self) -> bool:
675        """Are we a terminal?"""
676        return self._original.isatty()

Are we a terminal?

def setup_logging(log_path: str | pathlib.Path | None, level: LogLevel, suppress_non_root_debug: bool = False, log_stdout_stderr: bool = False, echo_to_stderr: bool = True, cache_size_limit: int = 0, cache_time_limit: datetime.timedelta | None = None) -> LogHandler:
679def setup_logging(
680    log_path: str | Path | None,
681    level: LogLevel,
682    suppress_non_root_debug: bool = False,
683    log_stdout_stderr: bool = False,
684    echo_to_stderr: bool = True,
685    cache_size_limit: int = 0,
686    cache_time_limit: datetime.timedelta | None = None,
687) -> LogHandler:
688    """Set up our logging environment.
689
690    Returns the custom handler which can be used to fetch information
691    about logs that have passed through it. (worst log-levels, caches, etc.).
692    """
693
694    lmap = {
695        LogLevel.DEBUG: logging.DEBUG,
696        LogLevel.INFO: logging.INFO,
697        LogLevel.WARNING: logging.WARNING,
698        LogLevel.ERROR: logging.ERROR,
699        LogLevel.CRITICAL: logging.CRITICAL,
700    }
701
702    # Wire logger output to go to a structured log file.
703    # Also echo it to stderr IF we're running in a terminal.
704    # UPDATE: Actually gonna always go to stderr. Is there a
705    # reason we shouldn't? This makes debugging possible if all
706    # we have is access to a non-interactive terminal or file dump.
707    # We could add a '--quiet' arg or whatnot to change this behavior.
708
709    # Note: by passing in the *original* stderr here before we
710    # (potentially) replace it, we ensure that our log echos
711    # won't themselves be intercepted and sent to the logger
712    # which would create an infinite loop.
713    loghandler = LogHandler(
714        log_path,
715        echofile=sys.stderr if echo_to_stderr else None,
716        suppress_non_root_debug=suppress_non_root_debug,
717        cache_size_limit=cache_size_limit,
718        cache_time_limit=cache_time_limit,
719    )
720
721    # Note: going ahead with force=True here so that we replace any
722    # existing logger. Though we warn if it looks like we are doing
723    # that so we can try to avoid creating the first one.
724    had_previous_handlers = bool(logging.root.handlers)
725    logging.basicConfig(
726        level=lmap[level],
727        # format='%(name)s: %(message)s',
728        # We dump *only* the message here. We pass various log record bits
729        # around and format things fancier where they end up.
730        format='%(message)s',
731        handlers=[loghandler],
732        force=True,
733    )
734    if had_previous_handlers:
735        logging.warning(
736            'setup_logging: Replacing existing handlers.'
737            ' Something may have logged before expected.'
738        )
739
740    # Optionally intercept Python's stdout/stderr output and generate
741    # log entries from it.
742    if log_stdout_stderr:
743        sys.stdout = FileLogEcho(sys.stdout, 'stdout', loghandler)
744        sys.stderr = FileLogEcho(sys.stderr, 'stderr', loghandler)
745
746    return loghandler

Set up our logging environment.

Returns the custom handler, which can be used to fetch information about logs that have passed through it (worst log levels, caches, etc.).
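
A minimal usage sketch; the log path and cache size are illustrative:

    import logging

    from efro.log import LogLevel, setup_logging

    # Pass log_path=None to skip the structured log file entirely.
    handler = setup_logging(
        log_path='app.jsonl',
        level=LogLevel.INFO,
        log_stdout_stderr=True,   # also capture raw print() output
        echo_to_stderr=True,      # keep pretty colored echoes on stderr
        cache_size_limit=512 * 1024,
    )

    logging.info('hello structured logs')

    # Later, inspect what has passed through the handler.
    archive = handler.get_cached()
    print('total entries:', archive.log_size)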