efro.log

Logging functionality.

# Released under the MIT License. See LICENSE for details.
#
"""Logging functionality."""
from __future__ import annotations

import sys
import time
import asyncio
import logging
import datetime
import itertools
from enum import Enum
from functools import partial
from collections import deque
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Annotated, override
from threading import Thread, current_thread, Lock

from efro.util import utc_now
from efro.terminal import Clr
from efro.dataclassio import ioprepped, IOAttrs, dataclass_to_json

if TYPE_CHECKING:
    from pathlib import Path
    from typing import Any, Callable, TextIO


class LogLevel(Enum):
    """Severity level for a log entry.

    These enums have numeric values so they can be compared in severity.
    Note that these values are not currently interchangeable with the
    logging.ERROR, logging.DEBUG, etc. values.
    """

    DEBUG = 0
    INFO = 1
    WARNING = 2
    ERROR = 3
    CRITICAL = 4

    @property
    def python_logging_level(self) -> int:
        """Give the corresponding logging level."""
        return LOG_LEVEL_LEVELNOS[self]

    @classmethod
    def from_python_logging_level(cls, levelno: int) -> LogLevel:
        """Given a Python logging level, return a LogLevel."""
        return LEVELNO_LOG_LEVELS[levelno]
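
    # Illustrative conversions (the right-hand values are the stdlib
    # logging constants; this comment is explanatory, not part of the API):
    #
    #   LogLevel.WARNING.python_logging_level -> logging.WARNING
    #   LogLevel.from_python_logging_level(logging.ERROR) -> LogLevel.ERROR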


# Python logging levels from LogLevels
LOG_LEVEL_LEVELNOS = {
    LogLevel.DEBUG: logging.DEBUG,
    LogLevel.INFO: logging.INFO,
    LogLevel.WARNING: logging.WARNING,
    LogLevel.ERROR: logging.ERROR,
    LogLevel.CRITICAL: logging.CRITICAL,
}

# LogLevels from Python logging levels
LEVELNO_LOG_LEVELS = {
    logging.DEBUG: LogLevel.DEBUG,
    logging.INFO: LogLevel.INFO,
    logging.WARNING: LogLevel.WARNING,
    logging.ERROR: LogLevel.ERROR,
    logging.CRITICAL: LogLevel.CRITICAL,
}

LEVELNO_COLOR_CODES: dict[int, tuple[str, str]] = {
    logging.DEBUG: (Clr.CYN, Clr.RST),
    logging.INFO: ('', ''),
    logging.WARNING: (Clr.YLW, Clr.RST),
    logging.ERROR: (Clr.RED, Clr.RST),
    logging.CRITICAL: (Clr.SMAG + Clr.BLD + Clr.BLK, Clr.RST),
}


@ioprepped
@dataclass
class LogEntry:
    """Single logged message."""

    name: Annotated[str, IOAttrs('n', soft_default='root', store_default=False)]
    message: Annotated[str, IOAttrs('m')]
    level: Annotated[LogLevel, IOAttrs('l')]
    time: Annotated[datetime.datetime, IOAttrs('t')]

    # We support arbitrary string labels per log entry which can be
    # incorporated into custom log processing. To populate this, our
    # LogHandler class looks for a 'labels' dict passed in the optional
    # 'extra' dict arg to standard Python log calls.
    labels: Annotated[dict[str, str], IOAttrs('la', store_default=False)] = (
        field(default_factory=dict)
    )
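
    # Illustrative label population (logger name and label keys here are
    # hypothetical); LogHandler copies such a 'labels' dict off the record
    # into LogEntry.labels:
    #
    #   logging.getLogger('myapp').warning(
    #       'disk nearly full', extra={'labels': {'host': 'server-1'}}
    #   )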


@ioprepped
@dataclass
class LogArchive:
    """Info and data for a log."""

    # Total number of entries submitted to the log.
    log_size: Annotated[int, IOAttrs('t')]

    # Offset for the entries contained here.
    # (10 means our first entry is the 10th in the log, etc.)
    start_index: Annotated[int, IOAttrs('c')]

    entries: Annotated[list[LogEntry], IOAttrs('e')]
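
    # Illustrative serialization via efro.dataclassio ('handler' being a
    # LogHandler with caching enabled; names here are hypothetical):
    #
    #   archive_json = dataclass_to_json(handler.get_cached(max_entries=50))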


class LogHandler(logging.Handler):
    """Fancy-pants handler for logging output.

    Writes logs to disk in structured json format and echoes them
    to stdout/stderr with pretty colors.
    """

    _event_loop: asyncio.AbstractEventLoop

    # IMPORTANT: Any debug prints we do here should ONLY go to echofile.
    # Otherwise we can get infinite loops as those prints come back to us
    # as new log entries.

    def __init__(
        self,
        *,
        path: str | Path | None,
        echofile: TextIO | None,
        suppress_non_root_debug: bool,
        cache_size_limit: int,
        cache_time_limit: datetime.timedelta | None,
    ):
        super().__init__()
        # pylint: disable=consider-using-with
        self._file = None if path is None else open(path, 'w', encoding='utf-8')
        self._echofile = echofile
        self._callbacks: list[Callable[[LogEntry], None]] = []
        self._suppress_non_root_debug = suppress_non_root_debug
        self._file_chunks: dict[str, list[str]] = {'stdout': [], 'stderr': []}
        self._file_chunk_ship_task: dict[str, asyncio.Task | None] = {
            'stdout': None,
            'stderr': None,
        }
        self._cache_size = 0
        assert cache_size_limit >= 0
        self._cache_size_limit = cache_size_limit
        self._cache_time_limit = cache_time_limit
        self._cache = deque[tuple[int, LogEntry]]()
        self._cache_index_offset = 0
        self._cache_lock = Lock()
        self._printed_callback_error = False
        self._thread_bootstrapped = False
        self._thread = Thread(target=self._log_thread_main, daemon=True)
        if __debug__:
            self._last_slow_emit_warning_time: float | None = None
        self._thread.start()

        # Spin until our thread is up and running; otherwise we could
        # wind up trying to push stuff to our event loop before the
        # loop exists.
        while not self._thread_bootstrapped:
            time.sleep(0.001)

    def add_callback(
        self, call: Callable[[LogEntry], None], feed_existing_logs: bool = False
    ) -> None:
        """Add a callback to be run for each LogEntry.

        Note that this callback will always run in a background thread.
        Passing True for feed_existing_logs will cause all cached logs
        in the handler to be fed to the callback (still in the
        background thread though).
        """

        # Kick this over to our bg thread to add the callback and
        # process cached entries at the same time to ensure there are no
        # race conditions that could cause entries to be skipped/etc.
        self._event_loop.call_soon_threadsafe(
            partial(self._add_callback_in_thread, call, feed_existing_logs)
        )
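
    # Illustrative registration ('handler' is a LogHandler instance); the
    # callback runs on the handler's background thread, so it should be
    # thread-safe and should not itself print to captured stdout/stderr:
    #
    #   collected: list[LogEntry] = []
    #   handler.add_callback(collected.append, feed_existing_logs=True)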

    def _add_callback_in_thread(
        self, call: Callable[[LogEntry], None], feed_existing_logs: bool
    ) -> None:
        """Add a callback to be run for each LogEntry.

        Note that this callback will always run in a background thread.
        Passing True for feed_existing_logs will cause all cached logs
        in the handler to be fed to the callback (still in the
        background thread though).
        """
        assert current_thread() is self._thread
        self._callbacks.append(call)

        # Run all of our cached entries through the new callback if desired.
        if feed_existing_logs and self._cache_size_limit > 0:
            with self._cache_lock:
                for _id, entry in self._cache:
                    self._run_callback_on_entry(call, entry)

    def _log_thread_main(self) -> None:
        self._event_loop = asyncio.new_event_loop()

        # In our background thread event loop we do a fair amount of
        # slow synchronous stuff such as mucking with the log cache.
        # Let's avoid getting tons of warnings about this in debug mode.
        self._event_loop.slow_callback_duration = 2.0  # Default is 0.1

        # NOTE: if we ever use default threadpool at all we should allow
        # setting it for our loop.
        asyncio.set_event_loop(self._event_loop)
        self._thread_bootstrapped = True
        try:
            if self._cache_time_limit is not None:
                _prunetask = self._event_loop.create_task(
                    self._time_prune_cache()
                )
            self._event_loop.run_forever()
        except BaseException:
            # If this ever goes down we're in trouble; we won't be able
            # to log about it though. Try to make some noise however we
            # can.
            print('LogHandler died!!!', file=sys.stderr)
            import traceback

            traceback.print_exc()
            raise

    async def _time_prune_cache(self) -> None:
        assert self._cache_time_limit is not None
        while True:
            await asyncio.sleep(61.27)
            now = utc_now()
            with self._cache_lock:
                # Prune the oldest entry as long as there is a first one that
                # is too old.
                while (
                    self._cache
                    and (now - self._cache[0][1].time) >= self._cache_time_limit
                ):
                    popped = self._cache.popleft()
                    self._cache_size -= popped[0]
                    self._cache_index_offset += 1

    def get_cached(
        self, start_index: int = 0, max_entries: int | None = None
    ) -> LogArchive:
        """Build and return an archive of cached log entries.

        This will only include entries that have been processed by the
        background thread, so may not include just-submitted logs or
        entries for partially written stdout/stderr lines.
        Entries from the range [start_index:start_index+max_entries]
        which are still present in the cache will be returned.
        """

        assert start_index >= 0
        if max_entries is not None:
            assert max_entries >= 0
        with self._cache_lock:
            # Transform start_index to our present cache space.
            start_index -= self._cache_index_offset
            # Calc end-index in our present cache space.
            end_index = (
                len(self._cache)
                if max_entries is None
                else start_index + max_entries
            )

            # Clamp both indexes to both ends of our present space.
            start_index = max(0, min(start_index, len(self._cache)))
            end_index = max(0, min(end_index, len(self._cache)))

            return LogArchive(
                log_size=self._cache_index_offset + len(self._cache),
                start_index=start_index + self._cache_index_offset,
                entries=self._cache_slice(start_index, end_index),
            )
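
    # Illustrative incremental polling ('handler' and 'next_index' are
    # caller-side names, not part of this class): feed the previous
    # archive's end index back in to fetch only newer entries.
    #
    #   archive = handler.get_cached(start_index=next_index, max_entries=100)
    #   next_index = archive.start_index + len(archive.entries)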

    def _cache_slice(
        self, start: int, end: int, step: int = 1
    ) -> list[LogEntry]:
        # Deque doesn't natively support slicing but we can do it manually.
        # It sounds like rotating the deque and pulling from the beginning
        # is the most efficient way to do this. The downside is the deque
        # gets temporarily modified in the process so we need to make sure
        # we're holding the lock.
        assert self._cache_lock.locked()
        cache = self._cache
        cache.rotate(-start)
        slc = [e[1] for e in itertools.islice(cache, 0, end - start, step)]
        cache.rotate(start)
        return slc

    @classmethod
    def _is_immutable_log_data(cls, data: Any) -> bool:
        if isinstance(data, (str, bool, int, float, bytes)):
            return True
        if isinstance(data, tuple):
            return all(cls._is_immutable_log_data(x) for x in data)
        return False

    def call_in_thread(self, call: Callable[[], Any]) -> None:
        """Submit a call to be run in the logging background thread."""
        self._event_loop.call_soon_threadsafe(call)

    @override
    def emit(self, record: logging.LogRecord) -> None:
        # pylint: disable=too-many-branches
        if __debug__:
            starttime = time.monotonic()

        # Called by logging to send us records.

        # TODO - kill this.
        if (
            self._suppress_non_root_debug
            and record.name != 'root'
            and record.levelname == 'DEBUG'
        ):
            return

        # Optimization: if our log args are all simple immutable values,
        # we can just kick the whole thing over to our background thread to
        # be formatted there at our leisure. If anything is mutable and
        # thus could possibly change between now and then or if we want
        # to do immediate file echoing then we need to bite the bullet
        # and do that stuff here at the call site.
        fast_path = self._echofile is None and self._is_immutable_log_data(
            record.args
        )

        # Note: just assuming types are correct here, but they'll be
        # checked properly when the resulting LogEntry gets exported.
        labels: dict[str, str] | None = getattr(record, 'labels', None)
        if labels is None:
            labels = {}

        if fast_path:
            if __debug__:
                formattime = echotime = time.monotonic()
            self._event_loop.call_soon_threadsafe(
                partial(
                    self._emit_in_thread,
                    record.name,
                    record.levelno,
                    record.created,
                    record,
                    labels,
                )
            )
        else:
            # Slow case; do formatting and echoing here at the log call
            # site.
            msg = self.format(record)

            if __debug__:
                formattime = time.monotonic()

            # Also immediately print pretty colored output to our echo file
            # (generally stderr). We do this part here instead of in our bg
            # thread because the delay can throw off command line prompts or
            # make tight debugging harder.
            if self._echofile is not None:
                # try:
                # if self._report_blocking_io_on_echo_error:
                #     premsg = (
                #         'WARNING: BlockingIOError ON LOG ECHO OUTPUT;'
                #         ' YOU ARE PROBABLY MISSING LOGS\n'
                #     )
                #     self._report_blocking_io_on_echo_error = False
                # else:
                #     premsg = ''
                ends = LEVELNO_COLOR_CODES.get(record.levelno)
                namepre = f'{Clr.WHT}{record.name}:{Clr.RST} '
                if ends is not None:
                    self._echofile.write(
                        f'{namepre}{ends[0]}'
                        f'{msg}{ends[1]}\n'
                        # f'{namepre}{ends[0]}' f'{premsg}{msg}{ends[1]}\n'
                    )
                else:
                    self._echofile.write(f'{namepre}{msg}\n')
                self._echofile.flush()
                # except BlockingIOError:
                #     # Ran into this when doing a bunch of logging; assuming
                #     # this is asyncio's doing?.. For now trying to survive
                #     # the error but telling the user something is probably
                #     # missing in their output.
                #     self._report_blocking_io_on_echo_error = True

            if __debug__:
                echotime = time.monotonic()

            self._event_loop.call_soon_threadsafe(
                partial(
                    self._emit_in_thread,
                    record.name,
                    record.levelno,
                    record.created,
                    msg,
                    labels,
                )
            )

        if __debug__:
            # pylint: disable=used-before-assignment
            # Make noise if we're taking a significant amount of time here.
            # Limit the noise to once every so often though; otherwise we
            # could get a feedback loop where every log emit results in a
            # warning log which results in another, etc.
            now = time.monotonic()
            # noinspection PyUnboundLocalVariable
            duration = now - starttime  # pyright: ignore
            # noinspection PyUnboundLocalVariable
            format_duration = formattime - starttime  # pyright: ignore
            # noinspection PyUnboundLocalVariable
            echo_duration = echotime - formattime  # pyright: ignore
            if duration > 0.05 and (
                self._last_slow_emit_warning_time is None
                or now > self._last_slow_emit_warning_time + 10.0
            ):
                # Logging calls from *within* a logging handler
                # sounds sketchy, so let's just kick this over to
                # the bg event loop thread we've already got.
                self._last_slow_emit_warning_time = now
                self._event_loop.call_soon_threadsafe(
                    partial(
                        logging.warning,
                        'efro.log.LogHandler emit took too long'
                        ' (%.2fs total; %.2fs format, %.2fs echo,'
                        ' fast_path=%s).',
                        duration,
                        format_duration,
                        echo_duration,
                        fast_path,
                    )
                )

    def _emit_in_thread(
        self,
        name: str,
        levelno: int,
        created: float,
        message: str | logging.LogRecord,
        labels: dict[str, str],
    ) -> None:
        # pylint: disable=too-many-positional-arguments
        try:
            # If they passed a raw record here, bake it down to a string.
            if isinstance(message, logging.LogRecord):
                message = self.format(message)

            self._emit_entry(
                LogEntry(
                    name=name,
                    message=message,
                    level=LEVELNO_LOG_LEVELS.get(levelno, LogLevel.INFO),
                    time=datetime.datetime.fromtimestamp(
                        created, datetime.timezone.utc
                    ),
                    labels=labels,
                )
            )
        except Exception:
            import traceback

            traceback.print_exc(file=self._echofile)

    def file_write(self, name: str, output: str) -> None:
        """Send raw stdout/stderr output to the logger to be collated."""

        # Note to self: it turns out that things like '^^^^^^^^^^^^^^'
        # lines in stack traces get written as lots of individual '^'
        # writes. It feels a bit dirty to be pushing a deferred call to
        # another thread for each character. Perhaps should do some sort
        # of basic accumulation here?
        self._event_loop.call_soon_threadsafe(
            partial(self._file_write_in_thread, name, output)
        )

    def _file_write_in_thread(self, name: str, output: str) -> None:
        try:
            assert name in ('stdout', 'stderr')

            # Here we try to be somewhat smart about breaking arbitrary
            # print output into discrete log entries.

            self._file_chunks[name].append(output)

            # Individual parts of a print come across as separate writes,
            # and the end of a print will be a standalone '\n' by default.
            # Let's use that as a hint that we're likely at the end of
            # a full print statement and ship what we've got.
            if output == '\n':
                self._ship_file_chunks(name, cancel_ship_task=True)
            else:
                # By default just keep adding chunks.
                # However we keep a timer running anytime we've got
                # unshipped chunks so that we can ship what we've got
                # after a short bit if we never get a newline.
                ship_task = self._file_chunk_ship_task[name]
                if ship_task is None:
                    self._file_chunk_ship_task[name] = (
                        self._event_loop.create_task(
                            self._ship_chunks_task(name),
                            name='log ship file chunks',
                        )
                    )

        except Exception:
            import traceback

            traceback.print_exc(file=self._echofile)

    def shutdown(self) -> None:
        """Block until all pending logs/prints are done."""
        done = False
        self.file_flush('stdout')
        self.file_flush('stderr')

        def _set_done() -> None:
            nonlocal done
            done = True

        self._event_loop.call_soon_threadsafe(_set_done)

        starttime = time.monotonic()
        while not done:
            if time.monotonic() - starttime > 5.0:
                print('LogHandler shutdown hung!!!', file=sys.stderr)
                break
            time.sleep(0.01)

    def file_flush(self, name: str) -> None:
        """Send raw stdout/stderr flush to the logger to be collated."""

        self._event_loop.call_soon_threadsafe(
            partial(self._file_flush_in_thread, name)
        )

    def _file_flush_in_thread(self, name: str) -> None:
        try:
            assert name in ('stdout', 'stderr')

            # Immediately ship whatever chunks we've got.
            if self._file_chunks[name]:
                self._ship_file_chunks(name, cancel_ship_task=True)

        except Exception:
            import traceback

            traceback.print_exc(file=self._echofile)

    async def _ship_chunks_task(self, name: str) -> None:
        # Note: it's important we sleep here for a moment. Otherwise,
        # things like '^^^^^^^^^^^^' lines in stack traces, which come
        # through as lots of individual '^' writes, tend to get broken
        # into lots of tiny little lines by us.
        await asyncio.sleep(0.01)
        self._ship_file_chunks(name, cancel_ship_task=False)

    def _ship_file_chunks(self, name: str, cancel_ship_task: bool) -> None:
        # Note: Raw print input generally ends in a newline, but that is
        # redundant when we break things into log entries and results in
        # extra empty lines. So strip off a single trailing newline if
        # one is present.
        text = ''.join(self._file_chunks[name]).removesuffix('\n')

        self._emit_entry(
            LogEntry(
                name=name, message=text, level=LogLevel.INFO, time=utc_now()
            )
        )
        self._file_chunks[name] = []
        ship_task = self._file_chunk_ship_task[name]
        if cancel_ship_task and ship_task is not None:
            ship_task.cancel()
        self._file_chunk_ship_task[name] = None

    def _emit_entry(self, entry: LogEntry) -> None:
        assert current_thread() is self._thread

        # Store to our cache.
        if self._cache_size_limit > 0:
            with self._cache_lock:
                # Do a rough calc of how many bytes this entry consumes.
                entry_size = sum(
                    sys.getsizeof(x)
                    for x in (
                        entry,
                        entry.name,
                        entry.message,
                        entry.level,
                        entry.time,
                    )
                )
                self._cache.append((entry_size, entry))
                self._cache_size += entry_size

                # Prune old until we are back at or under our limit.
                while self._cache_size > self._cache_size_limit:
                    popped = self._cache.popleft()
                    self._cache_size -= popped[0]
                    self._cache_index_offset += 1

        # Pass to callbacks.
        for call in self._callbacks:
            self._run_callback_on_entry(call, entry)

        # Dump to our structured log file.
        # TODO: should set a timer for flushing; don't flush every line.
        if self._file is not None:
            entry_s = dataclass_to_json(entry)
            assert '\n' not in entry_s  # Make sure it's a single line.
            print(entry_s, file=self._file, flush=True)

    def _run_callback_on_entry(
        self, callback: Callable[[LogEntry], None], entry: LogEntry
    ) -> None:
        """Run a callback and handle any errors."""
        try:
            callback(entry)
        except Exception:
            # Only print the first callback error to avoid insanity.
            if not self._printed_callback_error:
                import traceback

                traceback.print_exc(file=self._echofile)
                self._printed_callback_error = True


class FileLogEcho:
    """A file-like object for forwarding stdout/stderr to a LogHandler."""

    def __init__(
        self, original: TextIO, name: str, handler: LogHandler
    ) -> None:
        assert name in ('stdout', 'stderr')
        self._original = original
        self._name = name
        self._handler = handler

        # Think this was a result of setting non-blocking stdin somehow;
        # probably not needed.
        # self._report_blocking_io_error = False

    def write(self, output: Any) -> None:
        """Override standard write call."""
        # try:
        # if self._report_blocking_io_error:
        #     self._report_blocking_io_error = False
        #     self._original.write(
        #         'WARNING: BlockingIOError ENCOUNTERED;'
        #         ' OUTPUT IS PROBABLY MISSING'
        #     )

        self._original.write(output)
        # except BlockingIOError:
        #     self._report_blocking_io_error = True
        self._handler.file_write(self._name, output)

    def flush(self) -> None:
        """Flush the file."""
        self._original.flush()

        # We also use this as a hint to ship whatever file chunks
        # we've accumulated (we have to try and be smart about breaking
        # our arbitrary file output into discrete entries).
        self._handler.file_flush(self._name)

    def isatty(self) -> bool:
        """Are we a terminal?"""
        return self._original.isatty()


def setup_logging(
    log_path: str | Path | None,
    level: LogLevel,
    *,
    suppress_non_root_debug: bool = False,
    log_stdout_stderr: bool = False,
    echo_to_stderr: bool = True,
    cache_size_limit: int = 0,
    cache_time_limit: datetime.timedelta | None = None,
) -> LogHandler:
    """Set up our logging environment.

    Returns the custom handler, which can be used to fetch information
    about logs that have passed through it (worst log levels, caches, etc.).
    """

    lmap = {
        LogLevel.DEBUG: logging.DEBUG,
        LogLevel.INFO: logging.INFO,
        LogLevel.WARNING: logging.WARNING,
        LogLevel.ERROR: logging.ERROR,
        LogLevel.CRITICAL: logging.CRITICAL,
    }

    # Wire logger output to go to a structured log file.
    # Also echo it to stderr IF we're running in a terminal.
    # UPDATE: Actually gonna always go to stderr. Is there a
    # reason we shouldn't? This makes debugging possible if all
    # we have is access to a non-interactive terminal or file dump.
    # We could add a '--quiet' arg or whatnot to change this behavior.

    # Note: by passing in the *original* stderr here before we
    # (potentially) replace it, we ensure that our log echos
    # won't themselves be intercepted and sent to the logger
    # which would create an infinite loop.
    loghandler = LogHandler(
        path=log_path,
        echofile=sys.stderr if echo_to_stderr else None,
        suppress_non_root_debug=suppress_non_root_debug,
        cache_size_limit=cache_size_limit,
        cache_time_limit=cache_time_limit,
    )

    # Note: going ahead with force=True here so that we replace any
    # existing logger. Though we warn if it looks like we are doing
    # that so we can try to avoid creating the first one.
    had_previous_handlers = bool(logging.root.handlers)
    logging.basicConfig(
        level=lmap[level],
        # format='%(name)s: %(message)s',
        # We dump *only* the message here. We pass various log record bits
        # around and format things fancier where they end up.
        format='%(message)s',
        handlers=[loghandler],
        force=True,
    )
    if had_previous_handlers:
        logging.warning(
            'setup_logging: Replacing existing handlers.'
            ' Something may have logged before expected.'
        )

    # Optionally intercept Python's stdout/stderr output and generate
    # log entries from it.
    if log_stdout_stderr:
        sys.stdout = FileLogEcho(sys.stdout, 'stdout', loghandler)
        sys.stderr = FileLogEcho(sys.stderr, 'stderr', loghandler)

    return loghandler
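
A minimal usage sketch (not part of the module source): the log path, cache limits, and messages below are illustrative placeholders rather than defaults.

import datetime
import logging

from efro.log import LogLevel, setup_logging

# Install the handler on the root logger and also capture raw prints.
handler = setup_logging(
    'app.log',  # Illustrative path; pass None to skip the on-disk file.
    LogLevel.INFO,
    log_stdout_stderr=True,
    cache_size_limit=1024 * 1024,  # Keep roughly 1MB of recent entries cached.
    cache_time_limit=datetime.timedelta(minutes=10),
)

logging.info('Hello from the root logger.')
print('Raw prints are captured too.')

# Snapshot recently cached entries, then block until pending output is processed.
archive = handler.get_cached()
handler.shutdown()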
class LogLevel(enum.Enum):
29class LogLevel(Enum):
30    """Severity level for a log entry.
31
32    These enums have numeric values so they can be compared in severity.
33    Note that these values are not currently interchangeable with the
34    logging.ERROR, logging.DEBUG, etc. values.
35    """
36
37    DEBUG = 0
38    INFO = 1
39    WARNING = 2
40    ERROR = 3
41    CRITICAL = 4
42
43    @property
44    def python_logging_level(self) -> int:
45        """Give the corresponding logging level."""
46        return LOG_LEVEL_LEVELNOS[self]
47
48    @classmethod
49    def from_python_logging_level(cls, levelno: int) -> LogLevel:
50        """Given a Python logging level, return a LogLevel."""
51        return LEVELNO_LOG_LEVELS[levelno]

Severity level for a log entry.

These enums have numeric values so they can be compared in severity. Note that these values are not currently interchangeable with the logging.ERROR, logging.DEBUG, etc. values.

DEBUG = <LogLevel.DEBUG: 0>
INFO = <LogLevel.INFO: 1>
WARNING = <LogLevel.WARNING: 2>
ERROR = <LogLevel.ERROR: 3>
CRITICAL = <LogLevel.CRITICAL: 4>
python_logging_level: int
43    @property
44    def python_logging_level(self) -> int:
45        """Give the corresponding logging level."""
46        return LOG_LEVEL_LEVELNOS[self]

Give the corresponding logging level.

@classmethod
def from_python_logging_level(cls, levelno: int) -> LogLevel:
48    @classmethod
49    def from_python_logging_level(cls, levelno: int) -> LogLevel:
50        """Given a Python logging level, return a LogLevel."""
51        return LEVELNO_LOG_LEVELS[levelno]

Given a Python logging level, return a LogLevel.

Inherited Members
enum.Enum
name
value
LOG_LEVEL_LEVELNOS = {<LogLevel.DEBUG: 0>: 10, <LogLevel.INFO: 1>: 20, <LogLevel.WARNING: 2>: 30, <LogLevel.ERROR: 3>: 40, <LogLevel.CRITICAL: 4>: 50}
LEVELNO_LOG_LEVELS = {10: <LogLevel.DEBUG: 0>, 20: <LogLevel.INFO: 1>, 30: <LogLevel.WARNING: 2>, 40: <LogLevel.ERROR: 3>, 50: <LogLevel.CRITICAL: 4>}
LEVELNO_COLOR_CODES: dict[int, tuple[str, str]] = {10: ('', ''), 20: ('', ''), 30: ('', ''), 40: ('', ''), 50: ('', '')}
@ioprepped
@dataclass
class LogEntry:
81@ioprepped
82@dataclass
83class LogEntry:
84    """Single logged message."""
85
86    name: Annotated[str, IOAttrs('n', soft_default='root', store_default=False)]
87    message: Annotated[str, IOAttrs('m')]
88    level: Annotated[LogLevel, IOAttrs('l')]
89    time: Annotated[datetime.datetime, IOAttrs('t')]
90
91    # We support arbitrary string labels per log entry which can be
92    # incorporated into custom log processing. To populate this, our
93    # LogHandler class looks for a 'labels' dict passed in the optional
94    # 'extra' dict arg to standard Python log calls.
95    labels: Annotated[dict[str, str], IOAttrs('la', store_default=False)] = (
96        field(default_factory=dict)
97    )

Single logged message.

LogEntry( name: Annotated[str, <efro.dataclassio.IOAttrs object>], message: Annotated[str, <efro.dataclassio.IOAttrs object>], level: Annotated[LogLevel, <efro.dataclassio.IOAttrs object>], time: Annotated[datetime.datetime, <efro.dataclassio.IOAttrs object>], labels: Annotated[dict[str, str], <efro.dataclassio.IOAttrs object>] = <factory>)
name: Annotated[str, <efro.dataclassio.IOAttrs object at 0x116d17e30>]
message: Annotated[str, <efro.dataclassio.IOAttrs object at 0x116d16b70>]
level: Annotated[LogLevel, <efro.dataclassio.IOAttrs object at 0x116d14aa0>]
time: Annotated[datetime.datetime, <efro.dataclassio.IOAttrs object at 0x116d141d0>]
labels: Annotated[dict[str, str], <efro.dataclassio.IOAttrs object at 0x116d16180>]
@ioprepped
@dataclass
class LogArchive:
100@ioprepped
101@dataclass
102class LogArchive:
103    """Info and data for a log."""
104
105    # Total number of entries submitted to the log.
106    log_size: Annotated[int, IOAttrs('t')]
107
108    # Offset for the entries contained here.
109    # (10 means our first entry is the 10th in the log, etc.)
110    start_index: Annotated[int, IOAttrs('c')]
111
112    entries: Annotated[list[LogEntry], IOAttrs('e')]

Info and data for a log.

LogArchive( log_size: Annotated[int, <efro.dataclassio.IOAttrs object>], start_index: Annotated[int, <efro.dataclassio.IOAttrs object>], entries: Annotated[list[LogEntry], <efro.dataclassio.IOAttrs object>])
log_size: Annotated[int, <efro.dataclassio.IOAttrs object at 0x116d147a0>]
start_index: Annotated[int, <efro.dataclassio.IOAttrs object at 0x116d15370>]
entries: Annotated[list[LogEntry], <efro.dataclassio.IOAttrs object at 0x116d153d0>]
class LogHandler(logging.Handler):
115class LogHandler(logging.Handler):
116    """Fancy-pants handler for logging output.
117
118    Writes logs to disk in structured json format and echoes them
119    to stdout/stderr with pretty colors.
120    """
121
122    _event_loop: asyncio.AbstractEventLoop
123
124    # IMPORTANT: Any debug prints we do here should ONLY go to echofile.
125    # Otherwise we can get infinite loops as those prints come back to us
126    # as new log entries.
127
128    def __init__(
129        self,
130        *,
131        path: str | Path | None,
132        echofile: TextIO | None,
133        suppress_non_root_debug: bool,
134        cache_size_limit: int,
135        cache_time_limit: datetime.timedelta | None,
136    ):
137        super().__init__()
138        # pylint: disable=consider-using-with
139        self._file = None if path is None else open(path, 'w', encoding='utf-8')
140        self._echofile = echofile
141        self._callbacks: list[Callable[[LogEntry], None]] = []
142        self._suppress_non_root_debug = suppress_non_root_debug
143        self._file_chunks: dict[str, list[str]] = {'stdout': [], 'stderr': []}
144        self._file_chunk_ship_task: dict[str, asyncio.Task | None] = {
145            'stdout': None,
146            'stderr': None,
147        }
148        self._cache_size = 0
149        assert cache_size_limit >= 0
150        self._cache_size_limit = cache_size_limit
151        self._cache_time_limit = cache_time_limit
152        self._cache = deque[tuple[int, LogEntry]]()
153        self._cache_index_offset = 0
154        self._cache_lock = Lock()
155        self._printed_callback_error = False
156        self._thread_bootstrapped = False
157        self._thread = Thread(target=self._log_thread_main, daemon=True)
158        if __debug__:
159            self._last_slow_emit_warning_time: float | None = None
160        self._thread.start()
161
162        # Spin until our thread is up and running; otherwise we could
163        # wind up trying to push stuff to our event loop before the
164        # loop exists.
165        while not self._thread_bootstrapped:
166            time.sleep(0.001)
167
168    def add_callback(
169        self, call: Callable[[LogEntry], None], feed_existing_logs: bool = False
170    ) -> None:
171        """Add a callback to be run for each LogEntry.
172
173        Note that this callback will always run in a background thread.
174        Passing True for feed_existing_logs will cause all cached logs
175        in the handler to be fed to the callback (still in the
176        background thread though).
177        """
178
179        # Kick this over to our bg thread to add the callback and
180        # process cached entries at the same time to ensure there are no
181        # race conditions that could cause entries to be skipped/etc.
182        self._event_loop.call_soon_threadsafe(
183            partial(self._add_callback_in_thread, call, feed_existing_logs)
184        )
185
186    def _add_callback_in_thread(
187        self, call: Callable[[LogEntry], None], feed_existing_logs: bool
188    ) -> None:
189        """Add a callback to be run for each LogEntry.
190
191        Note that this callback will always run in a background thread.
192        Passing True for feed_existing_logs will cause all cached logs
193        in the handler to be fed to the callback (still in the
194        background thread though).
195        """
196        assert current_thread() is self._thread
197        self._callbacks.append(call)
198
199        # Run all of our cached entries through the new callback if desired.
200        if feed_existing_logs and self._cache_size_limit > 0:
201            with self._cache_lock:
202                for _id, entry in self._cache:
203                    self._run_callback_on_entry(call, entry)
204
205    def _log_thread_main(self) -> None:
206        self._event_loop = asyncio.new_event_loop()
207
208        # In our background thread event loop we do a fair amount of
209        # slow synchronous stuff such as mucking with the log cache.
210        # Let's avoid getting tons of warnings about this in debug mode.
211        self._event_loop.slow_callback_duration = 2.0  # Default is 0.1
212
213        # NOTE: if we ever use default threadpool at all we should allow
214        # setting it for our loop.
215        asyncio.set_event_loop(self._event_loop)
216        self._thread_bootstrapped = True
217        try:
218            if self._cache_time_limit is not None:
219                _prunetask = self._event_loop.create_task(
220                    self._time_prune_cache()
221                )
222            self._event_loop.run_forever()
223        except BaseException:
224            # If this ever goes down we're in trouble; we won't be able
225            # to log about it though. Try to make some noise however we
226            # can.
227            print('LogHandler died!!!', file=sys.stderr)
228            import traceback
229
230            traceback.print_exc()
231            raise
232
233    async def _time_prune_cache(self) -> None:
234        assert self._cache_time_limit is not None
235        while bool(True):
236            await asyncio.sleep(61.27)
237            now = utc_now()
238            with self._cache_lock:
239                # Prune the oldest entry as long as there is a first one that
240                # is too old.
241                while (
242                    self._cache
243                    and (now - self._cache[0][1].time) >= self._cache_time_limit
244                ):
245                    popped = self._cache.popleft()
246                    self._cache_size -= popped[0]
247                    self._cache_index_offset += 1
248
249    def get_cached(
250        self, start_index: int = 0, max_entries: int | None = None
251    ) -> LogArchive:
252        """Build and return an archive of cached log entries.
253
254        This will only include entries that have been processed by the
255        background thread, so may not include just-submitted logs or
256        entries for partially written stdout/stderr lines.
257        Entries from the range [start_index:start_index+max_entries]
258        which are still present in the cache will be returned.
259        """
260
261        assert start_index >= 0
262        if max_entries is not None:
263            assert max_entries >= 0
264        with self._cache_lock:
265            # Transform start_index to our present cache space.
266            start_index -= self._cache_index_offset
267            # Calc end-index in our present cache space.
268            end_index = (
269                len(self._cache)
270                if max_entries is None
271                else start_index + max_entries
272            )
273
274            # Clamp both indexes to both ends of our present space.
275            start_index = max(0, min(start_index, len(self._cache)))
276            end_index = max(0, min(end_index, len(self._cache)))
277
278            return LogArchive(
279                log_size=self._cache_index_offset + len(self._cache),
280                start_index=start_index + self._cache_index_offset,
281                entries=self._cache_slice(start_index, end_index),
282            )
283
284    def _cache_slice(
285        self, start: int, end: int, step: int = 1
286    ) -> list[LogEntry]:
287        # Deque doesn't natively support slicing but we can do it manually.
288        # It sounds like rotating the deque and pulling from the beginning
289        # is the most efficient way to do this. The downside is the deque
290        # gets temporarily modified in the process so we need to make sure
291        # we're holding the lock.
292        assert self._cache_lock.locked()
293        cache = self._cache
294        cache.rotate(-start)
295        slc = [e[1] for e in itertools.islice(cache, 0, end - start, step)]
296        cache.rotate(start)
297        return slc
298
299    @classmethod
300    def _is_immutable_log_data(cls, data: Any) -> bool:
301        if isinstance(data, (str, bool, int, float, bytes)):
302            return True
303        if isinstance(data, tuple):
304            return all(cls._is_immutable_log_data(x) for x in data)
305        return False
306
307    def call_in_thread(self, call: Callable[[], Any]) -> None:
308        """Submit a call to be run in the logging background thread."""
309        self._event_loop.call_soon_threadsafe(call)
310
311    @override
312    def emit(self, record: logging.LogRecord) -> None:
313        # pylint: disable=too-many-branches
314        if __debug__:
315            starttime = time.monotonic()
316
317        # Called by logging to send us records.
318
319        # TODO - kill this.
320        if (
321            self._suppress_non_root_debug
322            and record.name != 'root'
323            and record.levelname == 'DEBUG'
324        ):
325            return
326
327        # Optimization: if our log args are all simple immutable values,
328        # we can just kick the whole thing over to our background thread to
329        # be formatted there at our leisure. If anything is mutable and
330        # thus could possibly change between now and then or if we want
331        # to do immediate file echoing then we need to bite the bullet
332        # and do that stuff here at the call site.
333        fast_path = self._echofile is None and self._is_immutable_log_data(
334            record.args
335        )
336
337        # Note: just assuming types are correct here, but they'll be
338        # checked properly when the resulting LogEntry gets exported.
339        labels: dict[str, str] | None = getattr(record, 'labels', None)
340        if labels is None:
341            labels = {}
342
343        if fast_path:
344            if __debug__:
345                formattime = echotime = time.monotonic()
346            self._event_loop.call_soon_threadsafe(
347                partial(
348                    self._emit_in_thread,
349                    record.name,
350                    record.levelno,
351                    record.created,
352                    record,
353                    labels,
354                )
355            )
356        else:
357            # Slow case; do formatting and echoing here at the log call
358            # site.
359            msg = self.format(record)
360
361            if __debug__:
362                formattime = time.monotonic()
363
364            # Also immediately print pretty colored output to our echo file
365            # (generally stderr). We do this part here instead of in our bg
366            # thread because the delay can throw off command line prompts or
367            # make tight debugging harder.
368            if self._echofile is not None:
369                # try:
370                # if self._report_blocking_io_on_echo_error:
371                #     premsg = (
372                #         'WARNING: BlockingIOError ON LOG ECHO OUTPUT;'
373                #         ' YOU ARE PROBABLY MISSING LOGS\n'
374                #     )
375                #     self._report_blocking_io_on_echo_error = False
376                # else:
377                #     premsg = ''
378                ends = LEVELNO_COLOR_CODES.get(record.levelno)
379                namepre = f'{Clr.WHT}{record.name}:{Clr.RST} '
380                if ends is not None:
381                    self._echofile.write(
382                        f'{namepre}{ends[0]}'
383                        f'{msg}{ends[1]}\n'
384                        # f'{namepre}{ends[0]}' f'{premsg}{msg}{ends[1]}\n'
385                    )
386                else:
387                    self._echofile.write(f'{namepre}{msg}\n')
388                self._echofile.flush()
389                # except BlockingIOError:
390                #     # Ran into this when doing a bunch of logging; assuming
391                #     # this is asyncio's doing?.. For now trying to survive
392                #     # the error but telling the user something is probably
393                #     # missing in their output.
394                #     self._report_blocking_io_on_echo_error = True
395
396            if __debug__:
397                echotime = time.monotonic()
398
399            self._event_loop.call_soon_threadsafe(
400                partial(
401                    self._emit_in_thread,
402                    record.name,
403                    record.levelno,
404                    record.created,
405                    msg,
406                    labels,
407                )
408            )
409
410        if __debug__:
411            # pylint: disable=used-before-assignment
412            # Make noise if we're taking a significant amount of time here.
413            # Limit the noise to once every so often though; otherwise we
414            # could get a feedback loop where every log emit results in a
415            # warning log which results in another, etc.
416            now = time.monotonic()
417            # noinspection PyUnboundLocalVariable
418            duration = now - starttime  # pyright: ignore
419            # noinspection PyUnboundLocalVariable
420            format_duration = formattime - starttime  # pyright: ignore
421            # noinspection PyUnboundLocalVariable
422            echo_duration = echotime - formattime  # pyright: ignore
423            if duration > 0.05 and (
424                self._last_slow_emit_warning_time is None
425                or now > self._last_slow_emit_warning_time + 10.0
426            ):
427                # Logging calls from *within* a logging handler
428                # sounds sketchy, so let's just kick this over to
429                # the bg event loop thread we've already got.
430                self._last_slow_emit_warning_time = now
431                self._event_loop.call_soon_threadsafe(
432                    partial(
433                        logging.warning,
434                        'efro.log.LogHandler emit took too long'
435                        ' (%.2fs total; %.2fs format, %.2fs echo,'
436                        ' fast_path=%s).',
437                        duration,
438                        format_duration,
439                        echo_duration,
440                        fast_path,
441                    )
442                )
443
444    def _emit_in_thread(
445        self,
446        name: str,
447        levelno: int,
448        created: float,
449        message: str | logging.LogRecord,
450        labels: dict[str, str],
451    ) -> None:
452        # pylint: disable=too-many-positional-arguments
453        try:
454            # If they passed a raw record here, bake it down to a string.
455            if isinstance(message, logging.LogRecord):
456                message = self.format(message)
457
458            self._emit_entry(
459                LogEntry(
460                    name=name,
461                    message=message,
462                    level=LEVELNO_LOG_LEVELS.get(levelno, LogLevel.INFO),
463                    time=datetime.datetime.fromtimestamp(
464                        created, datetime.timezone.utc
465                    ),
466                    labels=labels,
467                )
468            )
469        except Exception:
470            import traceback
471
472            traceback.print_exc(file=self._echofile)
473
474    def file_write(self, name: str, output: str) -> None:
475        """Send raw stdout/stderr output to the logger to be collated."""
476
477        # Note to self: it turns out that things like '^^^^^^^^^^^^^^'
478        # lines in stack traces get written as lots of individual '^'
479        # writes. It feels a bit dirty to be pushing a deferred call to
480        # another thread for each character. Perhaps should do some sort
481        # of basic accumulation here?
482        self._event_loop.call_soon_threadsafe(
483            partial(self._file_write_in_thread, name, output)
484        )
485
486    def _file_write_in_thread(self, name: str, output: str) -> None:
487        try:
488            assert name in ('stdout', 'stderr')
489
490            # Here we try to be somewhat smart about breaking arbitrary
491            # print output into discrete log entries.
492
493            self._file_chunks[name].append(output)
494
495            # Individual parts of a print come across as separate writes,
496            # and the end of a print will be a standalone '\n' by default.
497            # Let's use that as a hint that we're likely at the end of
498            # a full print statement and ship what we've got.
499            if output == '\n':
500                self._ship_file_chunks(name, cancel_ship_task=True)
501            else:
502                # By default just keep adding chunks.
503                # However we keep a timer running anytime we've got
504                # unshipped chunks so that we can ship what we've got
505                # after a short bit if we never get a newline.
506                ship_task = self._file_chunk_ship_task[name]
507                if ship_task is None:
508                    self._file_chunk_ship_task[name] = (
509                        self._event_loop.create_task(
510                            self._ship_chunks_task(name),
511                            name='log ship file chunks',
512                        )
513                    )
514
515        except Exception:
516            import traceback
517
518            traceback.print_exc(file=self._echofile)
519
520    def shutdown(self) -> None:
521        """Block until all pending logs/prints are done."""
522        done = False
523        self.file_flush('stdout')
524        self.file_flush('stderr')
525
526        def _set_done() -> None:
527            nonlocal done
528            done = True
529
530        self._event_loop.call_soon_threadsafe(_set_done)
531
532        starttime = time.monotonic()
533        while not done:
534            if time.monotonic() - starttime > 5.0:
535                print('LogHandler shutdown hung!!!', file=sys.stderr)
536                break
537            time.sleep(0.01)
538
539    def file_flush(self, name: str) -> None:
540        """Send raw stdout/stderr flush to the logger to be collated."""
541
542        self._event_loop.call_soon_threadsafe(
543            partial(self._file_flush_in_thread, name)
544        )
545
546    def _file_flush_in_thread(self, name: str) -> None:
547        try:
548            assert name in ('stdout', 'stderr')
549
550            # Immediately ship whatever chunks we've got.
551            if self._file_chunks[name]:
552                self._ship_file_chunks(name, cancel_ship_task=True)
553
554        except Exception:
555            import traceback
556
557            traceback.print_exc(file=self._echofile)
558
559    async def _ship_chunks_task(self, name: str) -> None:
560        # Note: it's important we sleep here for a moment. Otherwise,
561        # things like '^^^^^^^^^^^^' lines in stack traces, which come
562        # through as lots of individual '^' writes, tend to get broken
563        # into lots of tiny little lines by us.
564        await asyncio.sleep(0.01)
565        self._ship_file_chunks(name, cancel_ship_task=False)
566
567    def _ship_file_chunks(self, name: str, cancel_ship_task: bool) -> None:
568        # Note: Raw print input generally ends in a newline, but that is
569        # redundant when we break things into log entries and results in
570        # extra empty lines. So strip off a single trailing newline if
571        # one is present.
572        text = ''.join(self._file_chunks[name]).removesuffix('\n')
573
574        self._emit_entry(
575            LogEntry(
576                name=name, message=text, level=LogLevel.INFO, time=utc_now()
577            )
578        )
579        self._file_chunks[name] = []
580        ship_task = self._file_chunk_ship_task[name]
581        if cancel_ship_task and ship_task is not None:
582            ship_task.cancel()
583        self._file_chunk_ship_task[name] = None
584
585    def _emit_entry(self, entry: LogEntry) -> None:
586        assert current_thread() is self._thread
587
588        # Store to our cache.
589        if self._cache_size_limit > 0:
590            with self._cache_lock:
591                # Do a rough calc of how many bytes this entry consumes.
592                entry_size = sum(
593                    sys.getsizeof(x)
594                    for x in (
595                        entry,
596                        entry.name,
597                        entry.message,
598                        entry.level,
599                        entry.time,
600                    )
601                )
602                self._cache.append((entry_size, entry))
603                self._cache_size += entry_size
604
605                # Prune old until we are back at or under our limit.
606                while self._cache_size > self._cache_size_limit:
607                    popped = self._cache.popleft()
608                    self._cache_size -= popped[0]
609                    self._cache_index_offset += 1
610
611        # Pass to callbacks.
612        for call in self._callbacks:
613            self._run_callback_on_entry(call, entry)
614
615        # Dump to our structured log file.
616        # TODO: should set a timer for flushing; don't flush every line.
617        if self._file is not None:
618            entry_s = dataclass_to_json(entry)
619            assert '\n' not in entry_s  # Make sure its a single line.
620            print(entry_s, file=self._file, flush=True)
621
622    def _run_callback_on_entry(
623        self, callback: Callable[[LogEntry], None], entry: LogEntry
624    ) -> None:
625        """Run a callback and handle any errors."""
626        try:
627            callback(entry)
628        except Exception:
629            # Only print the first callback error to avoid insanity.
630            if not self._printed_callback_error:
631                import traceback
632
633                traceback.print_exc(file=self._echofile)
634                self._printed_callback_error = True

Fancy-pants handler for logging output.

Writes logs to disk in structured json format and echoes them to stdout/stderr with pretty colors.

LogHandler(*, path: str | pathlib.Path | None, echofile: typing.TextIO | None, suppress_non_root_debug: bool, cache_size_limit: int, cache_time_limit: datetime.timedelta | None)
128    def __init__(
129        self,
130        *,
131        path: str | Path | None,
132        echofile: TextIO | None,
133        suppress_non_root_debug: bool,
134        cache_size_limit: int,
135        cache_time_limit: datetime.timedelta | None,
136    ):
137        super().__init__()
138        # pylint: disable=consider-using-with
139        self._file = None if path is None else open(path, 'w', encoding='utf-8')
140        self._echofile = echofile
141        self._callbacks: list[Callable[[LogEntry], None]] = []
142        self._suppress_non_root_debug = suppress_non_root_debug
143        self._file_chunks: dict[str, list[str]] = {'stdout': [], 'stderr': []}
144        self._file_chunk_ship_task: dict[str, asyncio.Task | None] = {
145            'stdout': None,
146            'stderr': None,
147        }
148        self._cache_size = 0
149        assert cache_size_limit >= 0
150        self._cache_size_limit = cache_size_limit
151        self._cache_time_limit = cache_time_limit
152        self._cache = deque[tuple[int, LogEntry]]()
153        self._cache_index_offset = 0
154        self._cache_lock = Lock()
155        self._printed_callback_error = False
156        self._thread_bootstrapped = False
157        self._thread = Thread(target=self._log_thread_main, daemon=True)
158        if __debug__:
159            self._last_slow_emit_warning_time: float | None = None
160        self._thread.start()
161
162        # Spin until our thread is up and running; otherwise we could
163        # wind up trying to push stuff to our event loop before the
164        # loop exists.
165        while not self._thread_bootstrapped:
166            time.sleep(0.001)

Initializes the handler: opens the structured log file if a path was given, sets up the in-memory entry cache, and starts the background logging thread, blocking briefly until that thread's event loop is running.
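
In practice this handler is usually created via setup_logging() (shown at the end of this page), but direct construction looks roughly like the following sketch; the path and limit values are placeholders, not recommendations:

    import datetime
    import sys

    from efro.log import LogHandler

    handler = LogHandler(
        path='app.log',                # structured JSON-lines output (or None)
        echofile=sys.stderr,           # colored echo of each entry (or None)
        suppress_non_root_debug=False,
        cache_size_limit=1024 * 1024,  # keep roughly the last 1 MB of entries
        cache_time_limit=datetime.timedelta(minutes=5),
    )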

def add_callback(self, call: Callable[[LogEntry], None], feed_existing_logs: bool = False) -> None:
168    def add_callback(
169        self, call: Callable[[LogEntry], None], feed_existing_logs: bool = False
170    ) -> None:
171        """Add a callback to be run for each LogEntry.
172
173        Note that this callback will always run in a background thread.
174        Passing True for feed_existing_logs will cause all cached logs
175        in the handler to be fed to the callback (still in the
176        background thread though).
177        """
178
179        # Kick this over to our bg thread to add the callback and
180        # process cached entries at the same time to ensure there are no
181        # race conditions that could cause entries to be skipped/etc.
182        self._event_loop.call_soon_threadsafe(
183            partial(self._add_callback_in_thread, call, feed_existing_logs)
184        )

Add a callback to be run for each LogEntry.

Note that this callback will always run in a background thread. Passing True for feed_existing_logs will cause all cached logs in the handler to be fed to the callback (still in the background thread though).
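
As a rough sketch, a callback that watches for serious entries might look like this (notify_ops is a hypothetical helper, not part of efro.log):

    from efro.log import LogEntry, LogLevel

    def _on_entry(entry: LogEntry) -> None:
        # Runs in the handler's background thread; keep it quick and
        # thread-safe.
        if entry.level in (LogLevel.ERROR, LogLevel.CRITICAL):
            notify_ops(f'{entry.name}: {entry.message}')  # hypothetical helper

    handler.add_callback(_on_entry, feed_existing_logs=True)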

def get_cached(self, start_index: int = 0, max_entries: int | None = None) -> LogArchive:
249    def get_cached(
250        self, start_index: int = 0, max_entries: int | None = None
251    ) -> LogArchive:
252        """Build and return an archive of cached log entries.
253
254        This will only include entries that have been processed by the
255        background thread, so may not include just-submitted logs or
256        entries for partially written stdout/stderr lines.
257        Entries from the range [start_index:start_index+max_entries]
258        which are still present in the cache will be returned.
259        """
260
261        assert start_index >= 0
262        if max_entries is not None:
263            assert max_entries >= 0
264        with self._cache_lock:
265            # Transform start_index to our present cache space.
266            start_index -= self._cache_index_offset
267            # Calc end-index in our present cache space.
268            end_index = (
269                len(self._cache)
270                if max_entries is None
271                else start_index + max_entries
272            )
273
274            # Clamp both indexes to both ends of our present space.
275            start_index = max(0, min(start_index, len(self._cache)))
276            end_index = max(0, min(end_index, len(self._cache)))
277
278            return LogArchive(
279                log_size=self._cache_index_offset + len(self._cache),
280                start_index=start_index + self._cache_index_offset,
281                entries=self._cache_slice(start_index, end_index),
282            )

Build and return an archive of cached log entries.

This will only include entries that have been processed by the background thread, so may not include just-submitted logs or entries for partially written stdout/stderr lines. Entries from the range [start_index:start_index+max_entries] which are still present in the cache will be returned.
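
Because log_size and start_index are absolute positions in the overall log, a caller can page through new entries by remembering where the previous archive ended. An illustrative sketch:

    archive = handler.get_cached()
    next_index = archive.start_index + len(archive.entries)

    # ...some time later, fetch only what has arrived since...
    newer = handler.get_cached(start_index=next_index)
    for entry in newer.entries:
        print(entry.time, entry.name, entry.message)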

def call_in_thread(self, call: Callable[[], Any]) -> None:
307    def call_in_thread(self, call: Callable[[], Any]) -> None:
308        """Submit a call to be run in the logging background thread."""
309        self._event_loop.call_soon_threadsafe(call)

Submit a call to be run in the logging background thread.
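
For instance, to schedule an arbitrary piece of work on the same background event-loop thread that processes log entries:

    handler.call_in_thread(lambda: print('running in the logging thread'))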

@override
def emit(self, record: logging.LogRecord) -> None:
311    @override
312    def emit(self, record: logging.LogRecord) -> None:
313        # pylint: disable=too-many-branches
314        if __debug__:
315            starttime = time.monotonic()
316
317        # Called by logging to send us records.
318
319        # TODO - kill this.
320        if (
321            self._suppress_non_root_debug
322            and record.name != 'root'
323            and record.levelname == 'DEBUG'
324        ):
325            return
326
327        # Optimization: if our log args are all simple immutable values,
328        # we can just kick the whole thing over to our background thread to
329        # be formatted there at our leisure. If anything is mutable and
330        # thus could possibly change between now and then or if we want
331        # to do immediate file echoing then we need to bite the bullet
332        # and do that stuff here at the call site.
333        fast_path = self._echofile is None and self._is_immutable_log_data(
334            record.args
335        )
336
337        # Note: just assuming types are correct here, but they'll be
338        # checked properly when the resulting LogEntry gets exported.
339        labels: dict[str, str] | None = getattr(record, 'labels', None)
340        if labels is None:
341            labels = {}
342
343        if fast_path:
344            if __debug__:
345                formattime = echotime = time.monotonic()
346            self._event_loop.call_soon_threadsafe(
347                partial(
348                    self._emit_in_thread,
349                    record.name,
350                    record.levelno,
351                    record.created,
352                    record,
353                    labels,
354                )
355            )
356        else:
357            # Slow case; do formatting and echoing here at the log call
358            # site.
359            msg = self.format(record)
360
361            if __debug__:
362                formattime = time.monotonic()
363
364            # Also immediately print pretty colored output to our echo file
365            # (generally stderr). We do this part here instead of in our bg
366            # thread because the delay can throw off command line prompts or
367            # make tight debugging harder.
368            if self._echofile is not None:
369                # try:
370                # if self._report_blocking_io_on_echo_error:
371                #     premsg = (
372                #         'WARNING: BlockingIOError ON LOG ECHO OUTPUT;'
373                #         ' YOU ARE PROBABLY MISSING LOGS\n'
374                #     )
375                #     self._report_blocking_io_on_echo_error = False
376                # else:
377                #     premsg = ''
378                ends = LEVELNO_COLOR_CODES.get(record.levelno)
379                namepre = f'{Clr.WHT}{record.name}:{Clr.RST} '
380                if ends is not None:
381                    self._echofile.write(
382                        f'{namepre}{ends[0]}'
383                        f'{msg}{ends[1]}\n'
384                        # f'{namepre}{ends[0]}' f'{premsg}{msg}{ends[1]}\n'
385                    )
386                else:
387                    self._echofile.write(f'{namepre}{msg}\n')
388                self._echofile.flush()
389                # except BlockingIOError:
390                #     # Ran into this when doing a bunch of logging; assuming
391                #     # this is asyncio's doing?.. For now trying to survive
392                #     # the error but telling the user something is probably
393                #     # missing in their output.
394                #     self._report_blocking_io_on_echo_error = True
395
396            if __debug__:
397                echotime = time.monotonic()
398
399            self._event_loop.call_soon_threadsafe(
400                partial(
401                    self._emit_in_thread,
402                    record.name,
403                    record.levelno,
404                    record.created,
405                    msg,
406                    labels,
407                )
408            )
409
410        if __debug__:
411            # pylint: disable=used-before-assignment
412            # Make noise if we're taking a significant amount of time here.
413            # Limit the noise to once every so often though; otherwise we
414            # could get a feedback loop where every log emit results in a
415            # warning log which results in another, etc.
416            now = time.monotonic()
417            # noinspection PyUnboundLocalVariable
418            duration = now - starttime  # pyright: ignore
419            # noinspection PyUnboundLocalVariable
420            format_duration = formattime - starttime  # pyright: ignore
421            # noinspection PyUnboundLocalVariable
422            echo_duration = echotime - formattime  # pyright: ignore
423            if duration > 0.05 and (
424                self._last_slow_emit_warning_time is None
425                or now > self._last_slow_emit_warning_time + 10.0
426            ):
427                # Logging calls from *within* a logging handler
428                # sounds sketchy, so let's just kick this over to
429                # the bg event loop thread we've already got.
430                self._last_slow_emit_warning_time = now
431                self._event_loop.call_soon_threadsafe(
432                    partial(
433                        logging.warning,
434                        'efro.log.LogHandler emit took too long'
435                        ' (%.2fs total; %.2fs format, %.2fs echo,'
436                        ' fast_path=%s).',
437                        duration,
438                        format_duration,
439                        echo_duration,
440                        fast_path,
441                    )
442                )

Called by the logging machinery to hand records to this handler.

This override formats and (optionally) color-echoes the record at the call site when necessary, then forwards it to the background thread to be cached, passed to callbacks, and written to the structured log file.
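
Since emit() pulls label data off the record via a 'labels' attribute, callers can attach per-entry labels through the standard logging extra mechanism. A small sketch (the label keys here are made up):

    import logging

    logging.getLogger('server').info(
        'player connected',
        extra={'labels': {'session': 'abc123', 'region': 'us-west'}},
    )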

def file_write(self, name: str, output: str) -> None:
474    def file_write(self, name: str, output: str) -> None:
475        """Send raw stdout/stderr output to the logger to be collated."""
476
477        # Note to self: it turns out that things like '^^^^^^^^^^^^^^'
478        # lines in stack traces get written as lots of individual '^'
479        # writes. It feels a bit dirty to be pushing a deferred call to
480        # another thread for each character. Perhaps should do some sort
481        # of basic accumulation here?
482        self._event_loop.call_soon_threadsafe(
483            partial(self._file_write_in_thread, name, output)
484        )

Send raw stdout/stderr output to the logger to be collated.

def shutdown(self) -> None:
520    def shutdown(self) -> None:
521        """Block until all pending logs/prints are done."""
522        done = False
523        self.file_flush('stdout')
524        self.file_flush('stderr')
525
526        def _set_done() -> None:
527            nonlocal done
528            done = True
529
530        self._event_loop.call_soon_threadsafe(_set_done)
531
532        starttime = time.monotonic()
533        while not done:
534            if time.monotonic() - starttime > 5.0:
535                print('LogHandler shutdown hung!!!', file=sys.stderr)
536                break
537            time.sleep(0.01)

Block until all pending logs/prints are done.
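
A typical pattern (an assumption, not something efro.log does automatically) is to register this at process exit so buffered prints and entries reach disk:

    import atexit

    atexit.register(handler.shutdown)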

def file_flush(self, name: str) -> None:
539    def file_flush(self, name: str) -> None:
540        """Send raw stdout/stderr flush to the logger to be collated."""
541
542        self._event_loop.call_soon_threadsafe(
543            partial(self._file_flush_in_thread, name)
544        )

Send raw stdout/stderr flush to the logger to be collated.

Inherited Members
    logging.Handler: level, formatter, get_name, set_name, name, createLock, acquire, release, setLevel, format, handle, setFormatter, flush, close, handleError
    logging.Filterer: filters, addFilter, removeFilter, filter
class FileLogEcho:
637class FileLogEcho:
638    """A file-like object for forwarding stdout/stderr to a LogHandler."""
639
640    def __init__(
641        self, original: TextIO, name: str, handler: LogHandler
642    ) -> None:
643        assert name in ('stdout', 'stderr')
644        self._original = original
645        self._name = name
646        self._handler = handler
647
648        # Think this was a result of setting non-blocking stdin somehow;
649        # probably not needed.
650        # self._report_blocking_io_error = False
651
652    def write(self, output: Any) -> None:
653        """Override standard write call."""
654        # try:
655        # if self._report_blocking_io_error:
656        #     self._report_blocking_io_error = False
657        #     self._original.write(
658        #         'WARNING: BlockingIOError ENCOUNTERED;'
659        #         ' OUTPUT IS PROBABLY MISSING'
660        #     )
661
662        self._original.write(output)
663        # except BlockingIOError:
664        #     self._report_blocking_io_error = True
665        self._handler.file_write(self._name, output)
666
667    def flush(self) -> None:
668        """Flush the file."""
669        self._original.flush()
670
671        # We also use this as a hint to ship whatever file chunks
672        # we've accumulated (we have to try and be smart about breaking
673        # our arbitrary file output into discrete entries).
674        self._handler.file_flush(self._name)
675
676    def isatty(self) -> bool:
677        """Are we a terminal?"""
678        return self._original.isatty()

A file-like object for forwarding stdout/stderr to a LogHandler.

FileLogEcho(original: TextIO, name: str, handler: LogHandler)
640    def __init__(
641        self, original: TextIO, name: str, handler: LogHandler
642    ) -> None:
643        assert name in ('stdout', 'stderr')
644        self._original = original
645        self._name = name
646        self._handler = handler
647
648        # Think this was a result of setting non-blocking stdin somehow;
649        # probably not needed.
650        # self._report_blocking_io_error = False
def write(self, output: Any) -> None:
652    def write(self, output: Any) -> None:
653        """Override standard write call."""
654        # try:
655        # if self._report_blocking_io_error:
656        #     self._report_blocking_io_error = False
657        #     self._original.write(
658        #         'WARNING: BlockingIOError ENCOUNTERED;'
659        #         ' OUTPUT IS PROBABLY MISSING'
660        #     )
661
662        self._original.write(output)
663        # except BlockingIOError:
664        #     self._report_blocking_io_error = True
665        self._handler.file_write(self._name, output)

Override standard write call.

def flush(self) -> None:
667    def flush(self) -> None:
668        """Flush the file."""
669        self._original.flush()
670
671        # We also use this as a hint to ship whatever file chunks
672        # we've accumulated (we have to try and be smart about breaking
673        # our arbitrary file output into discrete entries).
674        self._handler.file_flush(self._name)

Flush the file.

def isatty(self) -> bool:
676    def isatty(self) -> bool:
677        """Are we a terminal?"""
678        return self._original.isatty()

Are we a terminal?
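
Wiring the standard streams through this wrapper by hand looks roughly as follows; setup_logging() below performs the same swap when log_stdout_stderr=True:

    import sys

    sys.stdout = FileLogEcho(sys.stdout, 'stdout', handler)
    sys.stderr = FileLogEcho(sys.stderr, 'stderr', handler)

    print('this print now also lands in the structured log')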

def setup_logging(log_path: str | pathlib.Path | None, level: LogLevel, *, suppress_non_root_debug: bool = False, log_stdout_stderr: bool = False, echo_to_stderr: bool = True, cache_size_limit: int = 0, cache_time_limit: datetime.timedelta | None = None) -> LogHandler:
681def setup_logging(
682    log_path: str | Path | None,
683    level: LogLevel,
684    *,
685    suppress_non_root_debug: bool = False,
686    log_stdout_stderr: bool = False,
687    echo_to_stderr: bool = True,
688    cache_size_limit: int = 0,
689    cache_time_limit: datetime.timedelta | None = None,
690) -> LogHandler:
691    """Set up our logging environment.
692
693    Returns the custom handler which can be used to fetch information
694    about logs that have passed through it. (worst log-levels, caches, etc.).
695    """
696
697    lmap = {
698        LogLevel.DEBUG: logging.DEBUG,
699        LogLevel.INFO: logging.INFO,
700        LogLevel.WARNING: logging.WARNING,
701        LogLevel.ERROR: logging.ERROR,
702        LogLevel.CRITICAL: logging.CRITICAL,
703    }
704
705    # Wire logger output to go to a structured log file.
706    # Also echo it to stderr IF we're running in a terminal.
707    # UPDATE: Actually gonna always go to stderr. Is there a
708    # reason we shouldn't? This makes debugging possible if all
709    # we have is access to a non-interactive terminal or file dump.
710    # We could add a '--quiet' arg or whatnot to change this behavior.
711
712    # Note: by passing in the *original* stderr here before we
713    # (potentially) replace it, we ensure that our log echos
714    # won't themselves be intercepted and sent to the logger
715    # which would create an infinite loop.
716    loghandler = LogHandler(
717        path=log_path,
718        echofile=sys.stderr if echo_to_stderr else None,
719        suppress_non_root_debug=suppress_non_root_debug,
720        cache_size_limit=cache_size_limit,
721        cache_time_limit=cache_time_limit,
722    )
723
724    # Note: going ahead with force=True here so that we replace any
725    # existing logger. Though we warn if it looks like we are doing
726    # that so we can try to avoid creating the first one.
727    had_previous_handlers = bool(logging.root.handlers)
728    logging.basicConfig(
729        level=lmap[level],
730        # format='%(name)s: %(message)s',
731        # We dump *only* the message here. We pass various log record bits
732        # around and format things fancier where they end up.
733        format='%(message)s',
734        handlers=[loghandler],
735        force=True,
736    )
737    if had_previous_handlers:
738        logging.warning(
739            'setup_logging: Replacing existing handlers.'
740            ' Something may have logged before expected.'
741        )
742
743    # Optionally intercept Python's stdout/stderr output and generate
744    # log entries from it.
745    if log_stdout_stderr:
746        sys.stdout = FileLogEcho(sys.stdout, 'stdout', loghandler)
747        sys.stderr = FileLogEcho(sys.stderr, 'stderr', loghandler)
748
749    return loghandler

Set up our logging environment.

Returns the custom handler, which can be used to fetch information about logs that have passed through it (worst log levels, caches, etc.).
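
A minimal end-to-end sketch of typical usage (the path and limits here are placeholder values):

    import logging

    from efro.log import LogLevel, setup_logging

    handler = setup_logging(
        log_path='app.log',
        level=LogLevel.INFO,
        log_stdout_stderr=True,  # also capture raw print() output
        cache_size_limit=512 * 1024,
    )

    logging.info('logging is now configured')

    # Later, e.g. when assembling a crash report:
    archive = handler.get_cached()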