efro.logging

Logging functionality.
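
A minimal usage sketch is below (hedged: the log path, level choice, and label values are made up for illustration; setup_logging, LogLevel, and the 'labels'-via-'extra' mechanism are the module's own, as shown in the source listing that follows):

    import logging

    from efro.logging import LogLevel, setup_logging

    # Install the handler on the root logger; the returned LogHandler
    # can be queried later for cached entries.
    handler = setup_logging('app.log', LogLevel.INFO, cache_size_limit=1_000_000)

    # Standard logging calls now flow through the handler. A 'labels'
    # dict passed via the 'extra' arg ends up on the resulting LogEntry.
    logging.info('hello world', extra={'labels': {'phase': 'startup'}})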

  1# Released under the MIT License. See LICENSE for details.
  2#
  3"""Logging functionality."""
  4from __future__ import annotations
  5
  6import sys
  7import time
  8import asyncio
  9import logging
 10import datetime
 11import itertools
 12from enum import Enum
 13from functools import partial
 14from collections import deque
 15from dataclasses import dataclass, field
 16from typing import TYPE_CHECKING, Annotated, override
 17from threading import Thread, current_thread, Lock
 18
 19from efro.util import utc_now
 20from efro.terminal import Clr, color_enabled
 21from efro.dataclassio import ioprepped, IOAttrs, dataclass_to_json
 22
 23if TYPE_CHECKING:
 24    from pathlib import Path
 25    from typing import Any, Callable, TextIO, Literal
 26
 27
 28class LogLevel(Enum):
 29    """Severity level for a log entry.
 30
 31    These enums have numeric values so they can be compared by severity.
 32    Note that these values are not currently interchangeable with the
 33    logging.ERROR, logging.DEBUG, etc. values.
 34    """
 35
 36    DEBUG = 0
 37    INFO = 1
 38    WARNING = 2
 39    ERROR = 3
 40    CRITICAL = 4
 41
 42    @property
 43    def python_logging_level(self) -> int:
 44        """Return the corresponding Python logging level."""
 45        return LOG_LEVEL_LEVELNOS[self]
 46
 47    @classmethod
 48    def from_python_logging_level(cls, levelno: int) -> LogLevel:
 49        """Given a Python logging level, return a LogLevel."""
 50        return LEVELNO_LOG_LEVELS[levelno]
 51
 52
 53# Python logging levels from LogLevels
 54LOG_LEVEL_LEVELNOS = {
 55    LogLevel.DEBUG: logging.DEBUG,
 56    LogLevel.INFO: logging.INFO,
 57    LogLevel.WARNING: logging.WARNING,
 58    LogLevel.ERROR: logging.ERROR,
 59    LogLevel.CRITICAL: logging.CRITICAL,
 60}
 61
 62# LogLevels from Python logging levels
 63LEVELNO_LOG_LEVELS = {
 64    logging.DEBUG: LogLevel.DEBUG,
 65    logging.INFO: LogLevel.INFO,
 66    logging.WARNING: LogLevel.WARNING,
 67    logging.ERROR: LogLevel.ERROR,
 68    logging.CRITICAL: LogLevel.CRITICAL,
 69}
 70
 71LEVELNO_COLOR_CODES: dict[int, tuple[str, str]] = {
 72    logging.DEBUG: (Clr.CYN, Clr.RST),
 73    logging.INFO: ('', ''),
 74    logging.WARNING: (Clr.YLW, Clr.RST),
 75    logging.ERROR: (Clr.RED, Clr.RST),
 76    logging.CRITICAL: (Clr.SMAG + Clr.BLD + Clr.BLK, Clr.RST),
 77}
 78
 79
 80@ioprepped
 81@dataclass
 82class LogEntry:
 83    """Single logged message."""
 84
 85    name: Annotated[str, IOAttrs('n', soft_default='root', store_default=False)]
 86    message: Annotated[str, IOAttrs('m')]
 87    level: Annotated[LogLevel, IOAttrs('l')]
 88    time: Annotated[datetime.datetime, IOAttrs('t')]
 89
 90    # We support arbitrary string labels per log entry which can be
 91    # incorporated into custom log processing. To populate this, our
 92    # LogHandler class looks for a 'labels' dict passed in the optional
 93    # 'extra' dict arg to standard Python log calls.
 94    labels: Annotated[dict[str, str], IOAttrs('la', store_default=False)] = (
 95        field(default_factory=dict)
 96    )
 97
 98
 99@ioprepped
100@dataclass
101class LogArchive:
102    """A batch of log entries along with info about the complete log."""
103
104    # Total number of entries submitted to the log.
105    log_size: Annotated[int, IOAttrs('t')]
106
107    # Offset for the entries contained here.
108    # (10 means our first entry is the 10th in the log, etc.)
109    start_index: Annotated[int, IOAttrs('c')]
110
111    entries: Annotated[list[LogEntry], IOAttrs('e')]
112
113
114class LogHandler(logging.Handler):
115    """Fancy-pants handler for logging output.
116
117    Writes logs to disk in structured JSON format and echoes them
118    to stdout/stderr with pretty colors.
119    """
120
121    _event_loop: asyncio.AbstractEventLoop
122
123    # IMPORTANT: Any debug prints we do here should ONLY go to echofile.
124    # Otherwise we can get infinite loops as those prints come back to us
125    # as new log entries.
126
127    def __init__(
128        self,
129        *,
130        path: str | Path | None,
131        echofile: TextIO | None,
132        cache_size_limit: int,
133        cache_time_limit: datetime.timedelta | None,
134        echofile_timestamp_format: Literal['default', 'relative'] = 'default',
135        launch_time: float | None = None,
136    ):
137        super().__init__()
138        # pylint: disable=consider-using-with
139        self._file = None if path is None else open(path, 'w', encoding='utf-8')
140        self._echofile = echofile
141        self._echofile_timestamp_format = echofile_timestamp_format
142        self._callbacks: list[Callable[[LogEntry], None]] = []
143        self._file_chunks: dict[str, list[str]] = {'stdout': [], 'stderr': []}
144        self._file_chunk_ship_task: dict[str, asyncio.Task | None] = {
145            'stdout': None,
146            'stderr': None,
147        }
148        self._launch_time = time.time() if launch_time is None else launch_time
149        self._cache_size = 0
150        assert cache_size_limit >= 0
151        self._cache_size_limit = cache_size_limit
152        self._cache_time_limit = cache_time_limit
153        self._cache = deque[tuple[int, LogEntry]]()
154        self._cache_index_offset = 0
155        self._cache_lock = Lock()
156        self._printed_callback_error = False
157        self._thread_bootstrapped = False
158        self._thread = Thread(target=self._log_thread_main, daemon=True)
159        if __debug__:
160            self._last_slow_emit_warning_time: float | None = None
161        self._thread.start()
162
163        # Spin until our thread is up and running; otherwise we could
164        # wind up trying to push stuff to our event loop before the loop
165        # exists.
166        while not self._thread_bootstrapped:
167            time.sleep(0.001)
168
169    def add_callback(
170        self, call: Callable[[LogEntry], None], feed_existing_logs: bool = False
171    ) -> None:
172        """Add a callback to be run for each LogEntry.
173
174        Note that this callback will always run in a background thread.
175        Passing True for feed_existing_logs will cause all cached logs
176        in the handler to be fed to the callback (still in the
177        background thread though).
178        """
179
180        # Kick this over to our bg thread to add the callback and
181        # process cached entries at the same time to ensure there are no
182        # race conditions that could cause entries to be skipped/etc.
183        self._event_loop.call_soon_threadsafe(
184            partial(self._add_callback_in_thread, call, feed_existing_logs)
185        )
186
187    def _add_callback_in_thread(
188        self, call: Callable[[LogEntry], None], feed_existing_logs: bool
189    ) -> None:
190        """Add a callback to be run for each LogEntry.
191
192        Note that this callback will always run in a background thread.
193        Passing True for feed_existing_logs will cause all cached logs
194        in the handler to be fed to the callback (still in the
195        background thread though).
196        """
197        assert current_thread() is self._thread
198        self._callbacks.append(call)
199
200        # Run all of our cached entries through the new callback if desired.
201        if feed_existing_logs and self._cache_size_limit > 0:
202            with self._cache_lock:
203                for _id, entry in self._cache:
204                    self._run_callback_on_entry(call, entry)
205
206    def _log_thread_main(self) -> None:
207        self._event_loop = asyncio.new_event_loop()
208
209        # In our background thread event loop we do a fair amount of
210        # slow synchronous stuff such as mucking with the log cache.
211        # Let's avoid getting tons of warnings about this in debug mode.
212        self._event_loop.slow_callback_duration = 2.0  # Default is 0.1
213
214        # NOTE: if we ever use default threadpool at all we should allow
215        # setting it for our loop.
216        asyncio.set_event_loop(self._event_loop)
217        self._thread_bootstrapped = True
218        try:
219            if self._cache_time_limit is not None:
220                _prunetask = self._event_loop.create_task(
221                    self._time_prune_cache()
222                )
223            self._event_loop.run_forever()
224        except BaseException:
225            # If this ever goes down we're in trouble; we won't be able
226            # to log about it though. Try to make some noise however we
227            # can.
228            print('LogHandler died!!!', file=sys.stderr)
229            import traceback
230
231            traceback.print_exc()
232            raise
233
234    async def _time_prune_cache(self) -> None:
235        assert self._cache_time_limit is not None
236        while bool(True):
237            await asyncio.sleep(61.27)
238            now = utc_now()
239            with self._cache_lock:
240                # Prune the oldest entry as long as there is a first one
241                # that is too old.
242                while (
243                    self._cache
244                    and (now - self._cache[0][1].time) >= self._cache_time_limit
245                ):
246                    popped = self._cache.popleft()
247                    self._cache_size -= popped[0]
248                    self._cache_index_offset += 1
249
250    def get_cached(
251        self, start_index: int = 0, max_entries: int | None = None
252    ) -> LogArchive:
253        """Build and return an archive of cached log entries.
254
255        This will only include entries that have been processed by the
256        background thread, so may not include just-submitted logs or
257        entries for partially written stdout/stderr lines. Entries from
258        the range [start_index:start_index+max_entries] which are still
259        present in the cache will be returned.
260        """
261
262        assert start_index >= 0
263        if max_entries is not None:
264            assert max_entries >= 0
265        with self._cache_lock:
266            # Transform start_index to our present cache space.
267            start_index -= self._cache_index_offset
268            # Calc end-index in our present cache space.
269            end_index = (
270                len(self._cache)
271                if max_entries is None
272                else start_index + max_entries
273            )
274
275            # Clamp both indexes to both ends of our present space.
276            start_index = max(0, min(start_index, len(self._cache)))
277            end_index = max(0, min(end_index, len(self._cache)))
278
279            return LogArchive(
280                log_size=self._cache_index_offset + len(self._cache),
281                start_index=start_index + self._cache_index_offset,
282                entries=self._cache_slice(start_index, end_index),
283            )
284
285    def _cache_slice(
286        self, start: int, end: int, step: int = 1
287    ) -> list[LogEntry]:
288        # Deque doesn't natively support slicing but we can do it
289        # manually. It sounds like rotating the deque and pulling from
290        # the beginning is the most efficient way to do this. The
291        # downside is the deque gets temporarily modified in the process
292        # so we need to make sure we're holding the lock.
293        assert self._cache_lock.locked()
294        cache = self._cache
295        cache.rotate(-start)
296        slc = [e[1] for e in itertools.islice(cache, 0, end - start, step)]
297        cache.rotate(start)
298        return slc
299
300    @classmethod
301    def _is_immutable_log_data(cls, data: Any) -> bool:
302        if isinstance(data, (str, bool, int, float, bytes)):
303            return True
304        if isinstance(data, tuple):
305            return all(cls._is_immutable_log_data(x) for x in data)
306        return False
307
308    def call_in_thread(self, call: Callable[[], Any]) -> None:
309        """Submit a call to be run in the logging background thread."""
310        self._event_loop.call_soon_threadsafe(call)
311
312    @override
313    def emit(self, record: logging.LogRecord) -> None:
314        # pylint: disable=too-many-branches
315        # pylint: disable=too-many-locals
316
317        if __debug__:
318            starttime = time.monotonic()
319
320        # Called by logging to send us records.
321
322        # Optimization: if our log args are all simple immutable values,
323        # we can just kick the whole thing over to our background thread
324        # to be formatted there at our leisure. If anything is mutable
325        # and thus could possibly change between now and then or if we
326        # want to do immediate file echoing then we need to bite the
327        # bullet and do that stuff here at the call site.
328        fast_path = self._echofile is None and self._is_immutable_log_data(
329            record.args
330        )
331
332        # Note: just assuming types are correct here, but they'll be
333        # checked properly when the resulting LogEntry gets exported.
334        labels: dict[str, str] | None = getattr(record, 'labels', None)
335        if labels is None:
336            labels = {}
337
338        if fast_path:
339            if __debug__:
340                formattime = echotime = time.monotonic()
341            self._event_loop.call_soon_threadsafe(
342                partial(
343                    self._emit_in_thread,
344                    record.name,
345                    record.levelno,
346                    record.created,
347                    record,
348                    labels,
349                )
350            )
351        else:
352            # Slow case; do formatting and echoing here at the log call
353            # site.
354            msg = self.format(record)
355
356            if __debug__:
357                formattime = time.monotonic()
358
359            # Also immediately print pretty colored output to our echo
360            # file (generally stderr). We do this part here instead of
361            # in our bg thread because the delay can throw off command
362            # line prompts or make tight debugging harder.
363            if self._echofile is not None:
364                if self._echofile_timestamp_format == 'relative':
365                    timestamp = f'{record.created - self._launch_time:.3f}'
366                else:
367                    timestamp = (
368                        datetime.datetime.fromtimestamp(
369                            record.created, tz=datetime.UTC
370                        ).strftime('%H:%M:%S')
371                        + f'.{int(record.msecs):03d}'
372                    )
373
374                # If color printing is disabled, show level through text
375                # instead of color.
376                lvlnameex = (
377                    ''
378                    if color_enabled
379                    else f' {logging.getLevelName(record.levelno)}'
380                )
381
382                preinfo = (
383                    f'{Clr.WHT}{timestamp} {record.name}{lvlnameex}:'
384                    f'{Clr.RST} '
385                )
386                ends = LEVELNO_COLOR_CODES.get(record.levelno)
387                if ends is not None:
388                    self._echofile.write(f'{preinfo}{ends[0]}{msg}{ends[1]}\n')
389                else:
390                    self._echofile.write(f'{preinfo}{msg}\n')
391                self._echofile.flush()
392
393            if __debug__:
394                echotime = time.monotonic()
395
396            self._event_loop.call_soon_threadsafe(
397                partial(
398                    self._emit_in_thread,
399                    record.name,
400                    record.levelno,
401                    record.created,
402                    msg,
403                    labels,
404                )
405            )
406
407        if __debug__:
408            # pylint: disable=used-before-assignment
409            #
410            # Make noise if we're taking a significant amount of time
411            # here. Limit the noise to once every so often though;
412            # otherwise we could get a feedback loop where every log
413            # emit results in a warning log which results in another,
414            # etc.
415            now = time.monotonic()
416            duration = now - starttime
417            format_duration = formattime - starttime
418            echo_duration = echotime - formattime
419            if duration > 0.05 and (
420                self._last_slow_emit_warning_time is None
421                or now > self._last_slow_emit_warning_time + 10.0
422            ):
423                # Logging calls from *within* a logging handler sounds
424                # sketchy, so let's just kick this over to the bg event
425                # loop thread we've already got.
426                self._last_slow_emit_warning_time = now
427                self._event_loop.call_soon_threadsafe(
428                    partial(
429                        logging.warning,
430                        'efro.logging.LogHandler emit took too long'
431                        ' (%.2fs total; %.2fs format, %.2fs echo,'
432                        ' fast_path=%s).',
433                        duration,
434                        format_duration,
435                        echo_duration,
436                        fast_path,
437                    )
438                )
439
440    def _emit_in_thread(
441        self,
442        name: str,
443        levelno: int,
444        created: float,
445        message: str | logging.LogRecord,
446        labels: dict[str, str],
447    ) -> None:
448        # pylint: disable=too-many-positional-arguments
449        try:
450            # If they passed a raw record here, bake it down to a string.
451            if isinstance(message, logging.LogRecord):
452                message = self.format(message)
453
454            self._emit_entry(
455                LogEntry(
456                    name=name,
457                    message=message,
458                    level=LEVELNO_LOG_LEVELS.get(levelno, LogLevel.INFO),
459                    time=datetime.datetime.fromtimestamp(
460                        created, datetime.timezone.utc
461                    ),
462                    labels=labels,
463                )
464            )
465        except Exception:
466            import traceback
467
468            traceback.print_exc(file=self._echofile)
469
470    def file_write(self, name: str, output: str) -> None:
471        """Send raw stdout/stderr output to the logger to be collated."""
472
473        # Note to self: it turns out that things like '^^^^^^^^^^^^^^'
474        # lines in stack traces get written as lots of individual '^'
475        # writes. It feels a bit dirty to be pushing a deferred call to
476        # another thread for each character. Perhaps should do some sort
477        # of basic accumulation here?
478        self._event_loop.call_soon_threadsafe(
479            partial(self._file_write_in_thread, name, output)
480        )
481
482    def _file_write_in_thread(self, name: str, output: str) -> None:
483        try:
484            assert name in ('stdout', 'stderr')
485
486            # Here we try to be somewhat smart about breaking arbitrary
487            # print output into discrete log entries.
488
489            self._file_chunks[name].append(output)
490
491            # Individual parts of a print come across as separate
492            # writes, and the end of a print will be a standalone '\n'
493            # by default. Let's use that as a hint that we're likely at
494            # the end of a full print statement and ship what we've got.
495            if output == '\n':
496                self._ship_file_chunks(name, cancel_ship_task=True)
497            else:
498                # By default just keep adding chunks. However we keep a
499                # timer running anytime we've got unshipped chunks so
500                # that we can ship what we've got after a short bit if
501                # we never get a newline.
502                ship_task = self._file_chunk_ship_task[name]
503                if ship_task is None:
504                    self._file_chunk_ship_task[name] = (
505                        self._event_loop.create_task(
506                            self._ship_chunks_task(name),
507                            name='log ship file chunks',
508                        )
509                    )
510
511        except Exception:
512            import traceback
513
514            traceback.print_exc(file=self._echofile)
515
516    def shutdown(self) -> None:
517        """Block until all pending logs/prints are done."""
518        done = False
519        self.file_flush('stdout')
520        self.file_flush('stderr')
521
522        def _set_done() -> None:
523            nonlocal done
524            done = True
525
526        self._event_loop.call_soon_threadsafe(_set_done)
527
528        starttime = time.monotonic()
529        while not done:
530            if time.monotonic() - starttime > 5.0:
531                print('LogHandler shutdown hung!!!', file=sys.stderr)
532                break
533            time.sleep(0.01)
534
535    def file_flush(self, name: str) -> None:
536        """Send raw stdout/stderr flush to the logger to be collated."""
537
538        self._event_loop.call_soon_threadsafe(
539            partial(self._file_flush_in_thread, name)
540        )
541
542    def _file_flush_in_thread(self, name: str) -> None:
543        try:
544            assert name in ('stdout', 'stderr')
545
546            # Immediately ship whatever chunks we've got.
547            if self._file_chunks[name]:
548                self._ship_file_chunks(name, cancel_ship_task=True)
549
550        except Exception:
551            import traceback
552
553            traceback.print_exc(file=self._echofile)
554
555    async def _ship_chunks_task(self, name: str) -> None:
556        # Note: it's important we sleep here for a moment. Otherwise,
557        # things like '^^^^^^^^^^^^' lines in stack traces, which come
558        # through as lots of individual '^' writes, tend to get broken
559        # into lots of tiny little lines by us.
560        await asyncio.sleep(0.01)
561        self._ship_file_chunks(name, cancel_ship_task=False)
562
563    def _ship_file_chunks(self, name: str, cancel_ship_task: bool) -> None:
564        # Note: Raw print input generally ends in a newline, but that is
565        # redundant when we break things into log entries and results in
566        # extra empty lines. So strip off a single trailing newline if
567        # one is present.
568        text = ''.join(self._file_chunks[name]).removesuffix('\n')
569
570        self._emit_entry(
571            LogEntry(
572                name=name, message=text, level=LogLevel.INFO, time=utc_now()
573            )
574        )
575        self._file_chunks[name] = []
576        ship_task = self._file_chunk_ship_task[name]
577        if cancel_ship_task and ship_task is not None:
578            ship_task.cancel()
579        self._file_chunk_ship_task[name] = None
580
581    def _emit_entry(self, entry: LogEntry) -> None:
582        assert current_thread() is self._thread
583
584        # Store to our cache.
585        if self._cache_size_limit > 0:
586            with self._cache_lock:
587                # Do a rough calc of how many bytes this entry consumes.
588                entry_size = sum(
589                    sys.getsizeof(x)
590                    for x in (
591                        entry,
592                        entry.name,
593                        entry.message,
594                        entry.level,
595                        entry.time,
596                    )
597                )
598                self._cache.append((entry_size, entry))
599                self._cache_size += entry_size
600
601                # Prune old until we are back at or under our limit.
602                while self._cache_size > self._cache_size_limit:
603                    popped = self._cache.popleft()
604                    self._cache_size -= popped[0]
605                    self._cache_index_offset += 1
606
607        # Pass to callbacks.
608        for call in self._callbacks:
609            self._run_callback_on_entry(call, entry)
610
611        # Dump to our structured log file.
612        #
613        # TODO: should set a timer for flushing; don't flush every line.
614        if self._file is not None:
615            entry_s = dataclass_to_json(entry)
616            assert '\n' not in entry_s  # Make sure it's a single line.
617            print(entry_s, file=self._file, flush=True)
618
619    def _run_callback_on_entry(
620        self, callback: Callable[[LogEntry], None], entry: LogEntry
621    ) -> None:
622        """Run a callback and handle any errors."""
623        try:
624            callback(entry)
625        except Exception:
626            # Only print the first callback error to avoid insanity.
627            if not self._printed_callback_error:
628                import traceback
629
630                traceback.print_exc(file=self._echofile)
631                self._printed_callback_error = True
632
633
634class FileLogEcho:
635    """A file-like object for forwarding stdout/stderr to a LogHandler."""
636
637    def __init__(
638        self, original: TextIO, name: str, handler: LogHandler
639    ) -> None:
640        assert name in ('stdout', 'stderr')
641        self._original = original
642        self._name = name
643        self._handler = handler
644
645    def write(self, output: Any) -> None:
646        """Override standard write call."""
647        self._original.write(output)
648        self._handler.file_write(self._name, output)
649
650    def flush(self) -> None:
651        """Flush the file."""
652        self._original.flush()
653
654        # We also use this as a hint to ship whatever file chunks
655        # we've accumulated (we have to try and be smart about breaking
656        # our arbitrary file output into discrete entries).
657        self._handler.file_flush(self._name)
658
659    def isatty(self) -> bool:
660        """Are we a terminal?"""
661        return self._original.isatty()
662
663
664def setup_logging(
665    log_path: str | Path | None,
666    level: LogLevel,
667    *,
668    log_stdout_stderr: bool = False,
669    echo_to_stderr: bool = True,
670    cache_size_limit: int = 0,
671    cache_time_limit: datetime.timedelta | None = None,
672    launch_time: float | None = None,
673) -> LogHandler:
674    """Set up our logging environment.
675
676    Returns the custom handler, which can be used to fetch information
677    about logs that have passed through it (worst log-levels, caches, etc.).
678    """
679
680    lmap = {
681        LogLevel.DEBUG: logging.DEBUG,
682        LogLevel.INFO: logging.INFO,
683        LogLevel.WARNING: logging.WARNING,
684        LogLevel.ERROR: logging.ERROR,
685        LogLevel.CRITICAL: logging.CRITICAL,
686    }
687
688    # Wire logger output to go to a structured log file. Also echo it to
689    # stderr IF we're running in a terminal.
690    #
691    # UPDATE: Actually gonna always go to stderr. Is there a reason we
692    # shouldn't? This makes debugging possible if all we have is access
693    # to a non-interactive terminal or file dump. We could add a
694    # '--quiet' arg or whatnot to change this behavior.
695
696    # Note: by passing in the *original* stderr here before we
697    # (potentially) replace it, we ensure that our log echos won't
698    # themselves be intercepted and sent to the logger which would
699    # create an infinite loop.
700    loghandler = LogHandler(
701        path=log_path,
702        echofile=sys.stderr if echo_to_stderr else None,
703        echofile_timestamp_format='relative',
704        cache_size_limit=cache_size_limit,
705        cache_time_limit=cache_time_limit,
706        launch_time=launch_time,
707    )
708
709    # Note: going ahead with force=True here so that we replace any
710    # existing handlers. Though we warn if it looks like we are doing
711    # that, so whatever created the first one can be tracked down.
712    had_previous_handlers = bool(logging.root.handlers)
713    logging.basicConfig(
714        level=lmap[level],
715        # We dump *only* the message here. We pass various log record
716        # bits around so we can write rich logs or format things later.
717        format='%(message)s',
718        handlers=[loghandler],
719        force=True,
720    )
721    if had_previous_handlers:
722        logging.warning(
723            'setup_logging: Replacing existing handlers.'
724            ' Something may have logged before expected.'
725        )
726
727    # Optionally intercept Python's stdout/stderr output and generate
728    # log entries from it.
729    if log_stdout_stderr:
730        sys.stdout = FileLogEcho(sys.stdout, 'stdout', loghandler)
731        sys.stderr = FileLogEcho(sys.stderr, 'stderr', loghandler)
732
733    return loghandler
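
Building on the sketch above, reading entries back out of the handler might look roughly like this (illustrative only; note that cache_size_limit must be nonzero for anything to be cached):

    from efro.logging import LogEntry

    collected: list[LogEntry] = []

    def on_entry(entry: LogEntry) -> None:
        # Callbacks always run in the handler's background thread.
        collected.append(entry)

    # Feed already-cached entries to the callback in addition to new ones.
    handler.add_callback(on_entry, feed_existing_logs=True)

    # Or grab a structured snapshot of the cache directly.
    archive = handler.get_cached(start_index=0, max_entries=50)
    for entry in archive.entries:
        print(entry.time, entry.name, entry.level.name, entry.message)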
class LogLevel(enum.Enum):
29class LogLevel(Enum):
30    """Severity level for a log entry.
31
32    These enums have numeric values so they can be compared in severity.
33    Note that these values are not currently interchangeable with the
34    logging.ERROR, logging.DEBUG, etc. values.
35    """
36
37    DEBUG = 0
38    INFO = 1
39    WARNING = 2
40    ERROR = 3
41    CRITICAL = 4
42
43    @property
44    def python_logging_level(self) -> int:
45        """Give the corresponding logging level."""
46        return LOG_LEVEL_LEVELNOS[self]
47
48    @classmethod
49    def from_python_logging_level(cls, levelno: int) -> LogLevel:
50        """Given a Python logging level, return a LogLevel."""
51        return LEVELNO_LOG_LEVELS[levelno]

Severity level for a log entry.

These enums have numeric values so they can be compared in severity. Note that these values are not currently interchangeable with the logging.ERROR, logging.DEBUG, etc. values.

DEBUG = <LogLevel.DEBUG: 0>
INFO = <LogLevel.INFO: 1>
WARNING = <LogLevel.WARNING: 2>
ERROR = <LogLevel.ERROR: 3>
CRITICAL = <LogLevel.CRITICAL: 4>
python_logging_level: int
43    @property
44    def python_logging_level(self) -> int:
45        """Give the corresponding logging level."""
46        return LOG_LEVEL_LEVELNOS[self]

Give the corresponding logging level.

@classmethod
def from_python_logging_level(cls, levelno: int) -> LogLevel:
48    @classmethod
49    def from_python_logging_level(cls, levelno: int) -> LogLevel:
50        """Given a Python logging level, return a LogLevel."""
51        return LEVELNO_LOG_LEVELS[levelno]

Given a Python logging level, return a LogLevel.

LOG_LEVEL_LEVELNOS = {<LogLevel.DEBUG: 0>: 10, <LogLevel.INFO: 1>: 20, <LogLevel.WARNING: 2>: 30, <LogLevel.ERROR: 3>: 40, <LogLevel.CRITICAL: 4>: 50}
LEVELNO_LOG_LEVELS = {10: <LogLevel.DEBUG: 0>, 20: <LogLevel.INFO: 1>, 30: <LogLevel.WARNING: 2>, 40: <LogLevel.ERROR: 3>, 50: <LogLevel.CRITICAL: 4>}
LEVELNO_COLOR_CODES: dict[int, tuple[str, str]] = {10: ('', ''), 20: ('', ''), 30: ('', ''), 40: ('', ''), 50: ('', '')}
@ioprepped
@dataclass
class LogEntry:
81@ioprepped
82@dataclass
83class LogEntry:
84    """Single logged message."""
85
86    name: Annotated[str, IOAttrs('n', soft_default='root', store_default=False)]
87    message: Annotated[str, IOAttrs('m')]
88    level: Annotated[LogLevel, IOAttrs('l')]
89    time: Annotated[datetime.datetime, IOAttrs('t')]
90
91    # We support arbitrary string labels per log entry which can be
92    # incorporated into custom log processing. To populate this, our
93    # LogHandler class looks for a 'labels' dict passed in the optional
94    # 'extra' dict arg to standard Python log calls.
95    labels: Annotated[dict[str, str], IOAttrs('la', store_default=False)] = (
96        field(default_factory=dict)
97    )

Single logged message.

LogEntry( name: Annotated[str, <efro.dataclassio.IOAttrs object>], message: Annotated[str, <efro.dataclassio.IOAttrs object>], level: Annotated[LogLevel, <efro.dataclassio.IOAttrs object>], time: Annotated[datetime.datetime, <efro.dataclassio.IOAttrs object>], labels: Annotated[dict[str, str], <efro.dataclassio.IOAttrs object>] = <factory>)
name: Annotated[str, <efro.dataclassio.IOAttrs object at 0x12ab93bc0>]
message: Annotated[str, <efro.dataclassio.IOAttrs object at 0x12ab93500>]
level: Annotated[LogLevel, <efro.dataclassio.IOAttrs object at 0x12ab91130>]
time: Annotated[datetime.datetime, <efro.dataclassio.IOAttrs object at 0x12ab90f20>]
labels: Annotated[dict[str, str], <efro.dataclassio.IOAttrs object at 0x12ab90050>]
@ioprepped
@dataclass
class LogArchive:
100@ioprepped
101@dataclass
102class LogArchive:
103    """Info and data for a log."""
104
105    # Total number of entries submitted to the log.
106    log_size: Annotated[int, IOAttrs('t')]
107
108    # Offset for the entries contained here.
109    # (10 means our first entry is the 10th in the log, etc.)
110    start_index: Annotated[int, IOAttrs('c')]
111
112    entries: Annotated[list[LogEntry], IOAttrs('e')]

Info and data for a log.

LogArchive( log_size: Annotated[int, <efro.dataclassio.IOAttrs object>], start_index: Annotated[int, <efro.dataclassio.IOAttrs object>], entries: Annotated[list[LogEntry], <efro.dataclassio.IOAttrs object>])
log_size: Annotated[int, <efro.dataclassio.IOAttrs object at 0x12ab901d0>]
start_index: Annotated[int, <efro.dataclassio.IOAttrs object at 0x12ab91280>]
entries: Annotated[list[LogEntry], <efro.dataclassio.IOAttrs object at 0x12ab912e0>]
class LogHandler(logging.Handler):
115class LogHandler(logging.Handler):
116    """Fancy-pants handler for logging output.
117
118    Writes logs to disk in structured json format and echoes them
119    to stdout/stderr with pretty colors.
120    """
121
122    _event_loop: asyncio.AbstractEventLoop
123
124    # IMPORTANT: Any debug prints we do here should ONLY go to echofile.
125    # Otherwise we can get infinite loops as those prints come back to us
126    # as new log entries.
127
128    def __init__(
129        self,
130        *,
131        path: str | Path | None,
132        echofile: TextIO | None,
133        cache_size_limit: int,
134        cache_time_limit: datetime.timedelta | None,
135        echofile_timestamp_format: Literal['default', 'relative'] = 'default',
136        launch_time: float | None = None,
137    ):
138        super().__init__()
139        # pylint: disable=consider-using-with
140        self._file = None if path is None else open(path, 'w', encoding='utf-8')
141        self._echofile = echofile
142        self._echofile_timestamp_format = echofile_timestamp_format
143        self._callbacks: list[Callable[[LogEntry], None]] = []
144        self._file_chunks: dict[str, list[str]] = {'stdout': [], 'stderr': []}
145        self._file_chunk_ship_task: dict[str, asyncio.Task | None] = {
146            'stdout': None,
147            'stderr': None,
148        }
149        self._launch_time = time.time() if launch_time is None else launch_time
150        self._cache_size = 0
151        assert cache_size_limit >= 0
152        self._cache_size_limit = cache_size_limit
153        self._cache_time_limit = cache_time_limit
154        self._cache = deque[tuple[int, LogEntry]]()
155        self._cache_index_offset = 0
156        self._cache_lock = Lock()
157        self._printed_callback_error = False
158        self._thread_bootstrapped = False
159        self._thread = Thread(target=self._log_thread_main, daemon=True)
160        if __debug__:
161            self._last_slow_emit_warning_time: float | None = None
162        self._thread.start()
163
164        # Spin until our thread is up and running; otherwise we could
165        # wind up trying to push stuff to our event loop before the loop
166        # exists.
167        while not self._thread_bootstrapped:
168            time.sleep(0.001)
169
170    def add_callback(
171        self, call: Callable[[LogEntry], None], feed_existing_logs: bool = False
172    ) -> None:
173        """Add a callback to be run for each LogEntry.
174
175        Note that this callback will always run in a background thread.
176        Passing True for feed_existing_logs will cause all cached logs
177        in the handler to be fed to the callback (still in the
178        background thread though).
179        """
180
181        # Kick this over to our bg thread to add the callback and
182        # process cached entries at the same time to ensure there are no
183        # race conditions that could cause entries to be skipped/etc.
184        self._event_loop.call_soon_threadsafe(
185            partial(self._add_callback_in_thread, call, feed_existing_logs)
186        )
187
188    def _add_callback_in_thread(
189        self, call: Callable[[LogEntry], None], feed_existing_logs: bool
190    ) -> None:
191        """Add a callback to be run for each LogEntry.
192
193        Note that this callback will always run in a background thread.
194        Passing True for feed_existing_logs will cause all cached logs
195        in the handler to be fed to the callback (still in the
196        background thread though).
197        """
198        assert current_thread() is self._thread
199        self._callbacks.append(call)
200
201        # Run all of our cached entries through the new callback if desired.
202        if feed_existing_logs and self._cache_size_limit > 0:
203            with self._cache_lock:
204                for _id, entry in self._cache:
205                    self._run_callback_on_entry(call, entry)
206
207    def _log_thread_main(self) -> None:
208        self._event_loop = asyncio.new_event_loop()
209
210        # In our background thread event loop we do a fair amount of
211        # slow synchronous stuff such as mucking with the log cache.
212        # Let's avoid getting tons of warnings about this in debug mode.
213        self._event_loop.slow_callback_duration = 2.0  # Default is 0.1
214
215        # NOTE: if we ever use default threadpool at all we should allow
216        # setting it for our loop.
217        asyncio.set_event_loop(self._event_loop)
218        self._thread_bootstrapped = True
219        try:
220            if self._cache_time_limit is not None:
221                _prunetask = self._event_loop.create_task(
222                    self._time_prune_cache()
223                )
224            self._event_loop.run_forever()
225        except BaseException:
226            # If this ever goes down we're in trouble; we won't be able
227            # to log about it though. Try to make some noise however we
228            # can.
229            print('LogHandler died!!!', file=sys.stderr)
230            import traceback
231
232            traceback.print_exc()
233            raise
234
235    async def _time_prune_cache(self) -> None:
236        assert self._cache_time_limit is not None
237        while bool(True):
238            await asyncio.sleep(61.27)
239            now = utc_now()
240            with self._cache_lock:
241                # Prune the oldest entry as long as there is a first one
242                # that is too old.
243                while (
244                    self._cache
245                    and (now - self._cache[0][1].time) >= self._cache_time_limit
246                ):
247                    popped = self._cache.popleft()
248                    self._cache_size -= popped[0]
249                    self._cache_index_offset += 1
250
251    def get_cached(
252        self, start_index: int = 0, max_entries: int | None = None
253    ) -> LogArchive:
254        """Build and return an archive of cached log entries.
255
256        This will only include entries that have been processed by the
257        background thread, so may not include just-submitted logs or
258        entries for partially written stdout/stderr lines. Entries from
259        the range [start_index:start_index+max_entries] which are still
260        present in the cache will be returned.
261        """
262
263        assert start_index >= 0
264        if max_entries is not None:
265            assert max_entries >= 0
266        with self._cache_lock:
267            # Transform start_index to our present cache space.
268            start_index -= self._cache_index_offset
269            # Calc end-index in our present cache space.
270            end_index = (
271                len(self._cache)
272                if max_entries is None
273                else start_index + max_entries
274            )
275
276            # Clamp both indexes to both ends of our present space.
277            start_index = max(0, min(start_index, len(self._cache)))
278            end_index = max(0, min(end_index, len(self._cache)))
279
280            return LogArchive(
281                log_size=self._cache_index_offset + len(self._cache),
282                start_index=start_index + self._cache_index_offset,
283                entries=self._cache_slice(start_index, end_index),
284            )
285
286    def _cache_slice(
287        self, start: int, end: int, step: int = 1
288    ) -> list[LogEntry]:
289        # Deque doesn't natively support slicing but we can do it
290        # manually. It sounds like rotating the deque and pulling from
291        # the beginning is the most efficient way to do this. The
292        # downside is the deque gets temporarily modified in the process
293        # so we need to make sure we're holding the lock.
294        assert self._cache_lock.locked()
295        cache = self._cache
296        cache.rotate(-start)
297        slc = [e[1] for e in itertools.islice(cache, 0, end - start, step)]
298        cache.rotate(start)
299        return slc
300
301    @classmethod
302    def _is_immutable_log_data(cls, data: Any) -> bool:
303        if isinstance(data, (str, bool, int, float, bytes)):
304            return True
305        if isinstance(data, tuple):
306            return all(cls._is_immutable_log_data(x) for x in data)
307        return False
308
309    def call_in_thread(self, call: Callable[[], Any]) -> None:
310        """Submit a call to be run in the logging background thread."""
311        self._event_loop.call_soon_threadsafe(call)
312
313    @override
314    def emit(self, record: logging.LogRecord) -> None:
315        # pylint: disable=too-many-branches
316        # pylint: disable=too-many-locals
317
318        if __debug__:
319            starttime = time.monotonic()
320
321        # Called by logging to send us records.
322
323        # Optimization: if our log args are all simple immutable values,
324        # we can just kick the whole thing over to our background thread
325        # to be formatted there at our leisure. If anything is mutable
326        # and thus could possibly change between now and then or if we
327        # want to do immediate file echoing then we need to bite the
328        # bullet and do that stuff here at the call site.
329        fast_path = self._echofile is None and self._is_immutable_log_data(
330            record.args
331        )
332
333        # Note: just assuming types are correct here, but they'll be
334        # checked properly when the resulting LogEntry gets exported.
335        labels: dict[str, str] | None = getattr(record, 'labels', None)
336        if labels is None:
337            labels = {}
338
339        if fast_path:
340            if __debug__:
341                formattime = echotime = time.monotonic()
342            self._event_loop.call_soon_threadsafe(
343                partial(
344                    self._emit_in_thread,
345                    record.name,
346                    record.levelno,
347                    record.created,
348                    record,
349                    labels,
350                )
351            )
352        else:
353            # Slow case; do formatting and echoing here at the log call
354            # site.
355            msg = self.format(record)
356
357            if __debug__:
358                formattime = time.monotonic()
359
360            # Also immediately print pretty colored output to our echo
361            # file (generally stderr). We do this part here instead of
362            # in our bg thread because the delay can throw off command
363            # line prompts or make tight debugging harder.
364            if self._echofile is not None:
365                if self._echofile_timestamp_format == 'relative':
366                    timestamp = f'{record.created - self._launch_time:.3f}'
367                else:
368                    timestamp = (
369                        datetime.datetime.fromtimestamp(
370                            record.created, tz=datetime.UTC
371                        ).strftime('%H:%M:%S')
372                        + f'.{int(record.msecs):03d}'
373                    )
374
375                # If color printing is disabled, show level through text
376                # instead of color.
377                lvlnameex = (
378                    ''
379                    if color_enabled
380                    else f' {logging.getLevelName(record.levelno)}'
381                )
382
383                preinfo = (
384                    f'{Clr.WHT}{timestamp} {record.name}{lvlnameex}:'
385                    f'{Clr.RST} '
386                )
387                ends = LEVELNO_COLOR_CODES.get(record.levelno)
388                if ends is not None:
389                    self._echofile.write(f'{preinfo}{ends[0]}{msg}{ends[1]}\n')
390                else:
391                    self._echofile.write(f'{preinfo}{msg}\n')
392                self._echofile.flush()
393
394            if __debug__:
395                echotime = time.monotonic()
396
397            self._event_loop.call_soon_threadsafe(
398                partial(
399                    self._emit_in_thread,
400                    record.name,
401                    record.levelno,
402                    record.created,
403                    msg,
404                    labels,
405                )
406            )
407
408        if __debug__:
409            # pylint: disable=used-before-assignment
410            #
411            # Make noise if we're taking a significant amount of time
412            # here. Limit the noise to once every so often though;
413            # otherwise we could get a feedback loop where every log
414            # emit results in a warning log which results in another,
415            # etc.
416            now = time.monotonic()
417            duration = now - starttime
418            format_duration = formattime - starttime
419            echo_duration = echotime - formattime
420            if duration > 0.05 and (
421                self._last_slow_emit_warning_time is None
422                or now > self._last_slow_emit_warning_time + 10.0
423            ):
424                # Logging calls from *within* a logging handler sounds
425                # sketchy, so let's just kick this over to the bg event
426                # loop thread we've already got.
427                self._last_slow_emit_warning_time = now
428                self._event_loop.call_soon_threadsafe(
429                    partial(
430                        logging.warning,
431                        'efro.logging.LogHandler emit took too long'
432                        ' (%.2fs total; %.2fs format, %.2fs echo,'
433                        ' fast_path=%s).',
434                        duration,
435                        format_duration,
436                        echo_duration,
437                        fast_path,
438                    )
439                )
440
441    def _emit_in_thread(
442        self,
443        name: str,
444        levelno: int,
445        created: float,
446        message: str | logging.LogRecord,
447        labels: dict[str, str],
448    ) -> None:
449        # pylint: disable=too-many-positional-arguments
450        try:
451            # If they passed a raw record here, bake it down to a string.
452            if isinstance(message, logging.LogRecord):
453                message = self.format(message)
454
455            self._emit_entry(
456                LogEntry(
457                    name=name,
458                    message=message,
459                    level=LEVELNO_LOG_LEVELS.get(levelno, LogLevel.INFO),
460                    time=datetime.datetime.fromtimestamp(
461                        created, datetime.timezone.utc
462                    ),
463                    labels=labels,
464                )
465            )
466        except Exception:
467            import traceback
468
469            traceback.print_exc(file=self._echofile)
470
471    def file_write(self, name: str, output: str) -> None:
472        """Send raw stdout/stderr output to the logger to be collated."""
473
474        # Note to self: it turns out that things like '^^^^^^^^^^^^^^'
475        # lines in stack traces get written as lots of individual '^'
476        # writes. It feels a bit dirty to be pushing a deferred call to
477        # another thread for each character. Perhaps should do some sort
478        # of basic accumulation here?
479        self._event_loop.call_soon_threadsafe(
480            partial(self._file_write_in_thread, name, output)
481        )
482
483    def _file_write_in_thread(self, name: str, output: str) -> None:
484        try:
485            assert name in ('stdout', 'stderr')
486
487            # Here we try to be somewhat smart about breaking arbitrary
488            # print output into discrete log entries.
489
490            self._file_chunks[name].append(output)
491
492            # Individual parts of a print come across as separate
493            # writes, and the end of a print will be a standalone '\n'
494            # by default. Let's use that as a hint that we're likely at
495            # the end of a full print statement and ship what we've got.
496            if output == '\n':
497                self._ship_file_chunks(name, cancel_ship_task=True)
498            else:
499                # By default just keep adding chunks. However we keep a
500                # timer running anytime we've got unshipped chunks so
501                # that we can ship what we've got after a short bit if
502                # we never get a newline.
503                ship_task = self._file_chunk_ship_task[name]
504                if ship_task is None:
505                    self._file_chunk_ship_task[name] = (
506                        self._event_loop.create_task(
507                            self._ship_chunks_task(name),
508                            name='log ship file chunks',
509                        )
510                    )
511
512        except Exception:
513            import traceback
514
515            traceback.print_exc(file=self._echofile)
516
517    def shutdown(self) -> None:
518        """Block until all pending logs/prints are done."""
519        done = False
520        self.file_flush('stdout')
521        self.file_flush('stderr')
522
523        def _set_done() -> None:
524            nonlocal done
525            done = True
526
527        self._event_loop.call_soon_threadsafe(_set_done)
528
529        starttime = time.monotonic()
530        while not done:
531            if time.monotonic() - starttime > 5.0:
532                print('LogHandler shutdown hung!!!', file=sys.stderr)
533                break
534            time.sleep(0.01)
535
536    def file_flush(self, name: str) -> None:
537        """Send raw stdout/stderr flush to the logger to be collated."""
538
539        self._event_loop.call_soon_threadsafe(
540            partial(self._file_flush_in_thread, name)
541        )
542
543    def _file_flush_in_thread(self, name: str) -> None:
544        try:
545            assert name in ('stdout', 'stderr')
546
547            # Immediately ship whatever chunks we've got.
548            if self._file_chunks[name]:
549                self._ship_file_chunks(name, cancel_ship_task=True)
550
551        except Exception:
552            import traceback
553
554            traceback.print_exc(file=self._echofile)
555
556    async def _ship_chunks_task(self, name: str) -> None:
557        # Note: it's important we sleep here for a moment. Otherwise,
558        # things like '^^^^^^^^^^^^' lines in stack traces, which come
559        # through as lots of individual '^' writes, tend to get broken
560        # into lots of tiny little lines by us.
561        await asyncio.sleep(0.01)
562        self._ship_file_chunks(name, cancel_ship_task=False)
563
564    def _ship_file_chunks(self, name: str, cancel_ship_task: bool) -> None:
565        # Note: Raw print input generally ends in a newline, but that is
566        # redundant when we break things into log entries and results in
567        # extra empty lines. So strip off a single trailing newline if
568        # one is present.
569        text = ''.join(self._file_chunks[name]).removesuffix('\n')
570
571        self._emit_entry(
572            LogEntry(
573                name=name, message=text, level=LogLevel.INFO, time=utc_now()
574            )
575        )
576        self._file_chunks[name] = []
577        ship_task = self._file_chunk_ship_task[name]
578        if cancel_ship_task and ship_task is not None:
579            ship_task.cancel()
580        self._file_chunk_ship_task[name] = None
581
582    def _emit_entry(self, entry: LogEntry) -> None:
583        assert current_thread() is self._thread
584
585        # Store to our cache.
586        if self._cache_size_limit > 0:
587            with self._cache_lock:
588                # Do a rough calc of how many bytes this entry consumes.
589                entry_size = sum(
590                    sys.getsizeof(x)
591                    for x in (
592                        entry,
593                        entry.name,
594                        entry.message,
595                        entry.level,
596                        entry.time,
597                    )
598                )
599                self._cache.append((entry_size, entry))
600                self._cache_size += entry_size
601
602                # Prune old until we are back at or under our limit.
603                while self._cache_size > self._cache_size_limit:
604                    popped = self._cache.popleft()
605                    self._cache_size -= popped[0]
606                    self._cache_index_offset += 1
607
608        # Pass to callbacks.
609        for call in self._callbacks:
610            self._run_callback_on_entry(call, entry)
611
612        # Dump to our structured log file.
613        #
614        # TODO: should set a timer for flushing; don't flush every line.
615        if self._file is not None:
616            entry_s = dataclass_to_json(entry)
617            assert '\n' not in entry_s  # Make sure its a single line.
618            print(entry_s, file=self._file, flush=True)
619
620    def _run_callback_on_entry(
621        self, callback: Callable[[LogEntry], None], entry: LogEntry
622    ) -> None:
623        """Run a callback and handle any errors."""
624        try:
625            callback(entry)
626        except Exception:
627            # Only print the first callback error to avoid insanity.
628            if not self._printed_callback_error:
629                import traceback
630
631                traceback.print_exc(file=self._echofile)
632                self._printed_callback_error = True

Fancy-pants handler for logging output.

Writes logs to disk in structured json format and echoes them to stdout/stderr with pretty colors.

LogHandler( *, path: str | pathlib.Path | None, echofile: typing.TextIO | None, cache_size_limit: int, cache_time_limit: datetime.timedelta | None, echofile_timestamp_format: Literal['default', 'relative'] = 'default', launch_time: float | None = None)
128    def __init__(
129        self,
130        *,
131        path: str | Path | None,
132        echofile: TextIO | None,
133        cache_size_limit: int,
134        cache_time_limit: datetime.timedelta | None,
135        echofile_timestamp_format: Literal['default', 'relative'] = 'default',
136        launch_time: float | None = None,
137    ):
138        super().__init__()
139        # pylint: disable=consider-using-with
140        self._file = None if path is None else open(path, 'w', encoding='utf-8')
141        self._echofile = echofile
142        self._echofile_timestamp_format = echofile_timestamp_format
143        self._callbacks: list[Callable[[LogEntry], None]] = []
144        self._file_chunks: dict[str, list[str]] = {'stdout': [], 'stderr': []}
145        self._file_chunk_ship_task: dict[str, asyncio.Task | None] = {
146            'stdout': None,
147            'stderr': None,
148        }
149        self._launch_time = time.time() if launch_time is None else launch_time
150        self._cache_size = 0
151        assert cache_size_limit >= 0
152        self._cache_size_limit = cache_size_limit
153        self._cache_time_limit = cache_time_limit
154        self._cache = deque[tuple[int, LogEntry]]()
155        self._cache_index_offset = 0
156        self._cache_lock = Lock()
157        self._printed_callback_error = False
158        self._thread_bootstrapped = False
159        self._thread = Thread(target=self._log_thread_main, daemon=True)
160        if __debug__:
161            self._last_slow_emit_warning_time: float | None = None
162        self._thread.start()
163
164        # Spin until our thread is up and running; otherwise we could
165        # wind up trying to push stuff to our event loop before the loop
166        # exists.
167        while not self._thread_bootstrapped:
168            time.sleep(0.001)

Initializes the handler: opens the structured log file if a path was given, sets up the echo file, callback list, and entry cache, and starts the background logging thread, blocking briefly until that thread's event loop is ready.
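
A minimal sketch of constructing a LogHandler directly (setup_logging below is the usual entry point). The file name and cache limits here are illustrative values, not defaults.

    import datetime
    import sys

    from efro.logging import LogHandler

    handler = LogHandler(
        path='app.log',                # structured json-lines output file
        echofile=sys.stderr,           # colored echo of each entry
        echofile_timestamp_format='relative',
        cache_size_limit=1024 * 1024,  # keep roughly 1 MB of recent entries
        cache_time_limit=datetime.timedelta(minutes=5),
    )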

def add_callback( self, call: Callable[[LogEntry], None], feed_existing_logs: bool = False) -> None:
170    def add_callback(
171        self, call: Callable[[LogEntry], None], feed_existing_logs: bool = False
172    ) -> None:
173        """Add a callback to be run for each LogEntry.
174
175        Note that this callback will always run in a background thread.
176        Passing True for feed_existing_logs will cause all cached logs
177        in the handler to be fed to the callback (still in the
178        background thread though).
179        """
180
181        # Kick this over to our bg thread to add the callback and
182        # process cached entries at the same time to ensure there are no
183        # race conditions that could cause entries to be skipped/etc.
184        self._event_loop.call_soon_threadsafe(
185            partial(self._add_callback_in_thread, call, feed_existing_logs)
186        )

Add a callback to be run for each LogEntry.

Note that this callback will always run in a background thread. Passing True for feed_existing_logs will cause all cached logs in the handler to be fed to the callback (still in the background thread though).
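
A small sketch of registering a callback, reusing the handler from the constructor sketch above; the severity filtering shown is just illustrative. Because the callback runs on the handler's background thread, it should be quick and thread-safe.

    import sys

    from efro.logging import LogEntry, LogLevel

    def on_entry(entry: LogEntry) -> None:
        # Only act on errors and above (LogLevel numeric values order by severity).
        if entry.level.value >= LogLevel.ERROR.value:
            print(f'error logged: {entry.message}', file=sys.stderr)

    handler.add_callback(on_entry, feed_existing_logs=True)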

def get_cached( self, start_index: int = 0, max_entries: int | None = None) -> LogArchive:
251    def get_cached(
252        self, start_index: int = 0, max_entries: int | None = None
253    ) -> LogArchive:
254        """Build and return an archive of cached log entries.
255
256        This will only include entries that have been processed by the
257        background thread, so may not include just-submitted logs or
258        entries for partially written stdout/stderr lines. Entries from
259        the range [start_index:start_index+max_entries] which are still
260        present in the cache will be returned.
261        """
262
263        assert start_index >= 0
264        if max_entries is not None:
265            assert max_entries >= 0
266        with self._cache_lock:
267            # Transform start_index to our present cache space.
268            start_index -= self._cache_index_offset
269            # Calc end-index in our present cache space.
270            end_index = (
271                len(self._cache)
272                if max_entries is None
273                else start_index + max_entries
274            )
275
276            # Clamp both indexes to both ends of our present space.
277            start_index = max(0, min(start_index, len(self._cache)))
278            end_index = max(0, min(end_index, len(self._cache)))
279
280            return LogArchive(
281                log_size=self._cache_index_offset + len(self._cache),
282                start_index=start_index + self._cache_index_offset,
283                entries=self._cache_slice(start_index, end_index),
284            )

Build and return an archive of cached log entries.

This will only include entries that have been processed by the background thread, so may not include just-submitted logs or entries for partially written stdout/stderr lines. Entries from the range [start_index:start_index+max_entries] which are still present in the cache will be returned.
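
A sketch of polling the cache incrementally, assuming the handler was built with a nonzero cache_size_limit; each call resumes where the previous archive ended, so entries are seen at most once even as old ones are pruned.

    from efro.dataclassio import dataclass_to_json

    archive = handler.get_cached()
    for entry in archive.entries:
        print(dataclass_to_json(entry))

    # Later, pick up only entries that arrived after the previous call.
    next_start = archive.start_index + len(archive.entries)
    newer = handler.get_cached(start_index=next_start)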

def call_in_thread(self, call: Callable[[], Any]) -> None:
309    def call_in_thread(self, call: Callable[[], Any]) -> None:
310        """Submit a call to be run in the logging background thread."""
311        self._event_loop.call_soon_threadsafe(call)

Submit a call to be run in the logging background thread.
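
For example, a marker submitted here runs on the logging thread, which can be handy for loosely ordering work after log entries already queued from the same thread:

    handler.call_in_thread(lambda: print('logging thread reached this point'))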

@override
def emit(self, record: logging.LogRecord) -> None:
313    @override
314    def emit(self, record: logging.LogRecord) -> None:
315        # pylint: disable=too-many-branches
316        # pylint: disable=too-many-locals
317
318        if __debug__:
319            starttime = time.monotonic()
320
321        # Called by logging to send us records.
322
323        # Optimization: if our log args are all simple immutable values,
324        # we can just kick the whole thing over to our background thread
325        # to be formatted there at our leisure. If anything is mutable
326        # and thus could possibly change between now and then or if we
327        # want to do immediate file echoing then we need to bite the
328        # bullet and do that stuff here at the call site.
329        fast_path = self._echofile is None and self._is_immutable_log_data(
330            record.args
331        )
332
333        # Note: just assuming types are correct here, but they'll be
334        # checked properly when the resulting LogEntry gets exported.
335        labels: dict[str, str] | None = getattr(record, 'labels', None)
336        if labels is None:
337            labels = {}
338
339        if fast_path:
340            if __debug__:
341                formattime = echotime = time.monotonic()
342            self._event_loop.call_soon_threadsafe(
343                partial(
344                    self._emit_in_thread,
345                    record.name,
346                    record.levelno,
347                    record.created,
348                    record,
349                    labels,
350                )
351            )
352        else:
353            # Slow case; do formatting and echoing here at the log call
354            # site.
355            msg = self.format(record)
356
357            if __debug__:
358                formattime = time.monotonic()
359
360            # Also immediately print pretty colored output to our echo
361            # file (generally stderr). We do this part here instead of
362            # in our bg thread because the delay can throw off command
363            # line prompts or make tight debugging harder.
364            if self._echofile is not None:
365                if self._echofile_timestamp_format == 'relative':
366                    timestamp = f'{record.created - self._launch_time:.3f}'
367                else:
368                    timestamp = (
369                        datetime.datetime.fromtimestamp(
370                            record.created, tz=datetime.UTC
371                        ).strftime('%H:%M:%S')
372                        + f'.{int(record.msecs):03d}'
373                    )
374
375                # If color printing is disabled, show level through text
376                # instead of color.
377                lvlnameex = (
378                    ''
379                    if color_enabled
380                    else f' {logging.getLevelName(record.levelno)}'
381                )
382
383                preinfo = (
384                    f'{Clr.WHT}{timestamp} {record.name}{lvlnameex}:'
385                    f'{Clr.RST} '
386                )
387                ends = LEVELNO_COLOR_CODES.get(record.levelno)
388                if ends is not None:
389                    self._echofile.write(f'{preinfo}{ends[0]}{msg}{ends[1]}\n')
390                else:
391                    self._echofile.write(f'{preinfo}{msg}\n')
392                self._echofile.flush()
393
394            if __debug__:
395                echotime = time.monotonic()
396
397            self._event_loop.call_soon_threadsafe(
398                partial(
399                    self._emit_in_thread,
400                    record.name,
401                    record.levelno,
402                    record.created,
403                    msg,
404                    labels,
405                )
406            )
407
408        if __debug__:
409            # pylint: disable=used-before-assignment
410            #
411            # Make noise if we're taking a significant amount of time
412            # here. Limit the noise to once every so often though;
413            # otherwise we could get a feedback loop where every log
414            # emit results in a warning log which results in another,
415            # etc.
416            now = time.monotonic()
417            duration = now - starttime
418            format_duration = formattime - starttime
419            echo_duration = echotime - formattime
420            if duration > 0.05 and (
421                self._last_slow_emit_warning_time is None
422                or now > self._last_slow_emit_warning_time + 10.0
423            ):
424                # Logging calls from *within* a logging handler sounds
425                # sketchy, so let's just kick this over to the bg event
426                # loop thread we've already got.
427                self._last_slow_emit_warning_time = now
428                self._event_loop.call_soon_threadsafe(
429                    partial(
430                        logging.warning,
431                        'efro.logging.LogHandler emit took too long'
432                        ' (%.2fs total; %.2fs format, %.2fs echo,'
433                        ' fast_path=%s).',
434                        duration,
435                        format_duration,
436                        echo_duration,
437                        fast_path,
438                    )
439                )

Called by the standard logging machinery to hand the handler a record.

If no echo file is set and the record's args are all simple immutable values, formatting is deferred to the background thread (the fast path); otherwise the record is formatted and echoed with colors here at the call site before being queued to the background thread. In debug builds, a warning is logged if an emit takes unusually long.
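
emit() is not called directly; records reach it through normal logging calls. Per-entry labels can be attached via logging's standard 'extra' mechanism, which the handler reads back off the record (the logger name and label keys below are hypothetical):

    import logging

    logging.getLogger('myapp.net').warning(
        'request to %s failed', 'example.com',
        extra={'labels': {'subsystem': 'net'}},
    )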

def file_write(self, name: str, output: str) -> None:
471    def file_write(self, name: str, output: str) -> None:
472        """Send raw stdout/stderr output to the logger to be collated."""
473
474        # Note to self: it turns out that things like '^^^^^^^^^^^^^^'
475        # lines in stack traces get written as lots of individual '^'
476        # writes. It feels a bit dirty to be pushing a deferred call to
477        # another thread for each character. Perhaps should do some sort
478        # of basic accumulation here?
479        self._event_loop.call_soon_threadsafe(
480            partial(self._file_write_in_thread, name, output)
481        )

Send raw stdout/stderr output to the logger to be collated.

def shutdown(self) -> None:
517    def shutdown(self) -> None:
518        """Block until all pending logs/prints are done."""
519        done = False
520        self.file_flush('stdout')
521        self.file_flush('stderr')
522
523        def _set_done() -> None:
524            nonlocal done
525            done = True
526
527        self._event_loop.call_soon_threadsafe(_set_done)
528
529        starttime = time.monotonic()
530        while not done:
531            if time.monotonic() - starttime > 5.0:
532                print('LogHandler shutdown hung!!!', file=sys.stderr)
533                break
534            time.sleep(0.01)

Block until all pending logs/prints are done.

def file_flush(self, name: str) -> None:
536    def file_flush(self, name: str) -> None:
537        """Send raw stdout/stderr flush to the logger to be collated."""
538
539        self._event_loop.call_soon_threadsafe(
540            partial(self._file_flush_in_thread, name)
541        )

Send raw stdout/stderr flush to the logger to be collated.

class FileLogEcho:
635class FileLogEcho:
636    """A file-like object for forwarding stdout/stderr to a LogHandler."""
637
638    def __init__(
639        self, original: TextIO, name: str, handler: LogHandler
640    ) -> None:
641        assert name in ('stdout', 'stderr')
642        self._original = original
643        self._name = name
644        self._handler = handler
645
646    def write(self, output: Any) -> None:
647        """Override standard write call."""
648        self._original.write(output)
649        self._handler.file_write(self._name, output)
650
651    def flush(self) -> None:
652        """Flush the file."""
653        self._original.flush()
654
655        # We also use this as a hint to ship whatever file chunks
656        # we've accumulated (we have to try and be smart about breaking
657        # our arbitrary file output into discrete entries).
658        self._handler.file_flush(self._name)
659
660    def isatty(self) -> bool:
661        """Are we a terminal?"""
662        return self._original.isatty()

A file-like object for forwarding stdout/stderr to a LogHandler.

FileLogEcho( original: TextIO, name: str, handler: LogHandler)
638    def __init__(
639        self, original: TextIO, name: str, handler: LogHandler
640    ) -> None:
641        assert name in ('stdout', 'stderr')
642        self._original = original
643        self._name = name
644        self._handler = handler
def write(self, output: Any) -> None:
646    def write(self, output: Any) -> None:
647        """Override standard write call."""
648        self._original.write(output)
649        self._handler.file_write(self._name, output)

Override standard write call.

def flush(self) -> None:
651    def flush(self) -> None:
652        """Flush the file."""
653        self._original.flush()
654
655        # We also use this as a hint to ship whatever file chunks
656        # we've accumulated (we have to try and be smart about breaking
657        # our arbitrary file output into discrete entries).
658        self._handler.file_flush(self._name)

Flush the file.

def isatty(self) -> bool:
660    def isatty(self) -> bool:
661        """Are we a terminal?"""
662        return self._original.isatty()

Are we a terminal?
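
A sketch of wrapping stdout by hand using the handler from the earlier sketch; setup_logging below does this automatically when log_stdout_stderr=True.

    import sys

    sys.stdout = FileLogEcho(sys.stdout, 'stdout', handler)
    print('this line is printed normally and also forwarded to the handler')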

def setup_logging( log_path: str | pathlib.Path | None, level: LogLevel, *, log_stdout_stderr: bool = False, echo_to_stderr: bool = True, cache_size_limit: int = 0, cache_time_limit: datetime.timedelta | None = None, launch_time: float | None = None) -> LogHandler:
665def setup_logging(
666    log_path: str | Path | None,
667    level: LogLevel,
668    *,
669    log_stdout_stderr: bool = False,
670    echo_to_stderr: bool = True,
671    cache_size_limit: int = 0,
672    cache_time_limit: datetime.timedelta | None = None,
673    launch_time: float | None = None,
674) -> LogHandler:
675    """Set up our logging environment.
676
677    Returns the custom handler which can be used to fetch information
678    about logs that have passed through it (worst log-levels, caches, etc.).
679    """
680
681    lmap = {
682        LogLevel.DEBUG: logging.DEBUG,
683        LogLevel.INFO: logging.INFO,
684        LogLevel.WARNING: logging.WARNING,
685        LogLevel.ERROR: logging.ERROR,
686        LogLevel.CRITICAL: logging.CRITICAL,
687    }
688
689    # Wire logger output to go to a structured log file. Also echo it to
690    # stderr IF we're running in a terminal.
691    #
692    # UPDATE: Actually gonna always go to stderr. Is there a reason we
693    # shouldn't? This makes debugging possible if all we have is access
694    # to a non-interactive terminal or file dump. We could add a
695    # '--quiet' arg or whatnot to change this behavior.
696
697    # Note: by passing in the *original* stderr here before we
698    # (potentially) replace it, we ensure that our log echos won't
699    # themselves be intercepted and sent to the logger which would
700    # create an infinite loop.
701    loghandler = LogHandler(
702        path=log_path,
703        echofile=sys.stderr if echo_to_stderr else None,
704        echofile_timestamp_format='relative',
705        cache_size_limit=cache_size_limit,
706        cache_time_limit=cache_time_limit,
707        launch_time=launch_time,
708    )
709
710    # Note: going ahead with force=True here so that we replace any
711    # existing logger. Though we warn if it looks like we are doing that
712    # so we can try to avoid creating the first one.
713    had_previous_handlers = bool(logging.root.handlers)
714    logging.basicConfig(
715        level=lmap[level],
716        # We dump *only* the message here. We pass various log record
717        # bits around so we can write rich logs or format things later.
718        format='%(message)s',
719        handlers=[loghandler],
720        force=True,
721    )
722    if had_previous_handlers:
723        logging.warning(
724            'setup_logging: Replacing existing handlers.'
725            ' Something may have logged before expected.'
726        )
727
728    # Optionally intercept Python's stdout/stderr output and generate
729    # log entries from it.
730    if log_stdout_stderr:
731        sys.stdout = FileLogEcho(sys.stdout, 'stdout', loghandler)
732        sys.stderr = FileLogEcho(sys.stderr, 'stderr', loghandler)
733
734    return loghandler

Set up our logging environment.

Returns the custom handler, which can be used to fetch information about logs that have passed through it (worst log-levels, caches, etc.).
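
An end-to-end sketch with assumed values: structured entries go to 'app.log', colored echoes go to stderr, raw prints are captured, and roughly the last megabyte of entries stays queryable via get_cached().

    import logging

    from efro.logging import LogLevel, setup_logging

    handler = setup_logging(
        'app.log',
        LogLevel.INFO,
        log_stdout_stderr=True,
        cache_size_limit=1024 * 1024,
    )

    logging.info('hello from the root logger')
    print('raw prints are captured too')

    # Block until everything queued has been written and echoed.
    handler.shutdown()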