efro.logging
Logging functionality.
1# Released under the MIT License. See LICENSE for details. 2# 3"""Logging functionality.""" 4from __future__ import annotations 5 6import sys 7import time 8import asyncio 9import logging 10import datetime 11import itertools 12from enum import Enum 13from functools import partial 14from collections import deque 15from dataclasses import dataclass, field 16from typing import TYPE_CHECKING, Annotated, override 17from threading import Thread, current_thread, Lock 18 19from efro.util import utc_now 20from efro.terminal import Clr, color_enabled 21from efro.dataclassio import ioprepped, IOAttrs, dataclass_to_json 22 23if TYPE_CHECKING: 24 from pathlib import Path 25 from typing import Any, Callable, TextIO, Literal 26 27 28class LogLevel(Enum): 29 """Severity level for a log entry. 30 31 These enums have numeric values so they can be compared in severity. 32 Note that these values are not currently interchangeable with the 33 logging.ERROR, logging.DEBUG, etc. values. 34 """ 35 36 DEBUG = 0 37 INFO = 1 38 WARNING = 2 39 ERROR = 3 40 CRITICAL = 4 41 42 @property 43 def python_logging_level(self) -> int: 44 """Give the corresponding logging level.""" 45 return LOG_LEVEL_LEVELNOS[self] 46 47 @classmethod 48 def from_python_logging_level(cls, levelno: int) -> LogLevel: 49 """Given a Python logging level, return a LogLevel.""" 50 return LEVELNO_LOG_LEVELS[levelno] 51 52 53# Python logging levels from LogLevels 54LOG_LEVEL_LEVELNOS = { 55 LogLevel.DEBUG: logging.DEBUG, 56 LogLevel.INFO: logging.INFO, 57 LogLevel.WARNING: logging.WARNING, 58 LogLevel.ERROR: logging.ERROR, 59 LogLevel.CRITICAL: logging.CRITICAL, 60} 61 62# LogLevels from Python logging levels 63LEVELNO_LOG_LEVELS = { 64 logging.DEBUG: LogLevel.DEBUG, 65 logging.INFO: LogLevel.INFO, 66 logging.WARNING: LogLevel.WARNING, 67 logging.ERROR: LogLevel.ERROR, 68 logging.CRITICAL: LogLevel.CRITICAL, 69} 70 71LEVELNO_COLOR_CODES: dict[int, tuple[str, str]] = { 72 logging.DEBUG: (Clr.CYN, Clr.RST), 73 logging.INFO: ('', ''), 74 logging.WARNING: (Clr.YLW, Clr.RST), 75 logging.ERROR: (Clr.RED, Clr.RST), 76 logging.CRITICAL: (Clr.SMAG + Clr.BLD + Clr.BLK, Clr.RST), 77} 78 79 80@ioprepped 81@dataclass 82class LogEntry: 83 """Single logged message.""" 84 85 name: Annotated[str, IOAttrs('n', soft_default='root', store_default=False)] 86 message: Annotated[str, IOAttrs('m')] 87 level: Annotated[LogLevel, IOAttrs('l')] 88 time: Annotated[datetime.datetime, IOAttrs('t')] 89 90 # We support arbitrary string labels per log entry which can be 91 # incorporated into custom log processing. To populate this, our 92 # LogHandler class looks for a 'labels' dict passed in the optional 93 # 'extra' dict arg to standard Python log calls. 94 labels: Annotated[dict[str, str], IOAttrs('la', store_default=False)] = ( 95 field(default_factory=dict) 96 ) 97 98 99@ioprepped 100@dataclass 101class LogArchive: 102 """Info and data for a log.""" 103 104 # Total number of entries submitted to the log. 105 log_size: Annotated[int, IOAttrs('t')] 106 107 # Offset for the entries contained here. 108 # (10 means our first entry is the 10th in the log, etc.) 109 start_index: Annotated[int, IOAttrs('c')] 110 111 entries: Annotated[list[LogEntry], IOAttrs('e')] 112 113 114class LogHandler(logging.Handler): 115 """Fancy-pants handler for logging output. 116 117 Writes logs to disk in structured json format and echoes them 118 to stdout/stderr with pretty colors. 119 """ 120 121 _event_loop: asyncio.AbstractEventLoop 122 123 # IMPORTANT: Any debug prints we do here should ONLY go to echofile. 124 # Otherwise we can get infinite loops as those prints come back to us 125 # as new log entries. 126 127 def __init__( 128 self, 129 *, 130 path: str | Path | None, 131 echofile: TextIO | None, 132 cache_size_limit: int, 133 cache_time_limit: datetime.timedelta | None, 134 echofile_timestamp_format: Literal['default', 'relative'] = 'default', 135 launch_time: float | None = None, 136 ): 137 super().__init__() 138 # pylint: disable=consider-using-with 139 self._file = None if path is None else open(path, 'w', encoding='utf-8') 140 self._echofile = echofile 141 self._echofile_timestamp_format = echofile_timestamp_format 142 self._callbacks: list[Callable[[LogEntry], None]] = [] 143 self._file_chunks: dict[str, list[str]] = {'stdout': [], 'stderr': []} 144 self._file_chunk_ship_task: dict[str, asyncio.Task | None] = { 145 'stdout': None, 146 'stderr': None, 147 } 148 self._launch_time = time.time() if launch_time is None else launch_time 149 self._cache_size = 0 150 assert cache_size_limit >= 0 151 self._cache_size_limit = cache_size_limit 152 self._cache_time_limit = cache_time_limit 153 self._cache = deque[tuple[int, LogEntry]]() 154 self._cache_index_offset = 0 155 self._cache_lock = Lock() 156 self._printed_callback_error = False 157 self._thread_bootstrapped = False 158 self._thread = Thread(target=self._log_thread_main, daemon=True) 159 if __debug__: 160 self._last_slow_emit_warning_time: float | None = None 161 self._thread.start() 162 163 # Spin until our thread is up and running; otherwise we could 164 # wind up trying to push stuff to our event loop before the loop 165 # exists. 166 while not self._thread_bootstrapped: 167 time.sleep(0.001) 168 169 def add_callback( 170 self, call: Callable[[LogEntry], None], feed_existing_logs: bool = False 171 ) -> None: 172 """Add a callback to be run for each LogEntry. 173 174 Note that this callback will always run in a background thread. 175 Passing True for feed_existing_logs will cause all cached logs 176 in the handler to be fed to the callback (still in the 177 background thread though). 178 """ 179 180 # Kick this over to our bg thread to add the callback and 181 # process cached entries at the same time to ensure there are no 182 # race conditions that could cause entries to be skipped/etc. 183 self._event_loop.call_soon_threadsafe( 184 partial(self._add_callback_in_thread, call, feed_existing_logs) 185 ) 186 187 def _add_callback_in_thread( 188 self, call: Callable[[LogEntry], None], feed_existing_logs: bool 189 ) -> None: 190 """Add a callback to be run for each LogEntry. 191 192 Note that this callback will always run in a background thread. 193 Passing True for feed_existing_logs will cause all cached logs 194 in the handler to be fed to the callback (still in the 195 background thread though). 196 """ 197 assert current_thread() is self._thread 198 self._callbacks.append(call) 199 200 # Run all of our cached entries through the new callback if desired. 201 if feed_existing_logs and self._cache_size_limit > 0: 202 with self._cache_lock: 203 for _id, entry in self._cache: 204 self._run_callback_on_entry(call, entry) 205 206 def _log_thread_main(self) -> None: 207 self._event_loop = asyncio.new_event_loop() 208 209 # In our background thread event loop we do a fair amount of 210 # slow synchronous stuff such as mucking with the log cache. 211 # Let's avoid getting tons of warnings about this in debug mode. 212 self._event_loop.slow_callback_duration = 2.0 # Default is 0.1 213 214 # NOTE: if we ever use default threadpool at all we should allow 215 # setting it for our loop. 216 asyncio.set_event_loop(self._event_loop) 217 self._thread_bootstrapped = True 218 try: 219 if self._cache_time_limit is not None: 220 _prunetask = self._event_loop.create_task( 221 self._time_prune_cache() 222 ) 223 self._event_loop.run_forever() 224 except BaseException: 225 # If this ever goes down we're in trouble; we won't be able 226 # to log about it though. Try to make some noise however we 227 # can. 228 print('LogHandler died!!!', file=sys.stderr) 229 import traceback 230 231 traceback.print_exc() 232 raise 233 234 async def _time_prune_cache(self) -> None: 235 assert self._cache_time_limit is not None 236 while bool(True): 237 await asyncio.sleep(61.27) 238 now = utc_now() 239 with self._cache_lock: 240 # Prune the oldest entry as long as there is a first one 241 # that is too old. 242 while ( 243 self._cache 244 and (now - self._cache[0][1].time) >= self._cache_time_limit 245 ): 246 popped = self._cache.popleft() 247 self._cache_size -= popped[0] 248 self._cache_index_offset += 1 249 250 def get_cached( 251 self, start_index: int = 0, max_entries: int | None = None 252 ) -> LogArchive: 253 """Build and return an archive of cached log entries. 254 255 This will only include entries that have been processed by the 256 background thread, so may not include just-submitted logs or 257 entries for partially written stdout/stderr lines. Entries from 258 the range [start_index:start_index+max_entries] which are still 259 present in the cache will be returned. 260 """ 261 262 assert start_index >= 0 263 if max_entries is not None: 264 assert max_entries >= 0 265 with self._cache_lock: 266 # Transform start_index to our present cache space. 267 start_index -= self._cache_index_offset 268 # Calc end-index in our present cache space. 269 end_index = ( 270 len(self._cache) 271 if max_entries is None 272 else start_index + max_entries 273 ) 274 275 # Clamp both indexes to both ends of our present space. 276 start_index = max(0, min(start_index, len(self._cache))) 277 end_index = max(0, min(end_index, len(self._cache))) 278 279 return LogArchive( 280 log_size=self._cache_index_offset + len(self._cache), 281 start_index=start_index + self._cache_index_offset, 282 entries=self._cache_slice(start_index, end_index), 283 ) 284 285 def _cache_slice( 286 self, start: int, end: int, step: int = 1 287 ) -> list[LogEntry]: 288 # Deque doesn't natively support slicing but we can do it 289 # manually. It sounds like rotating the deque and pulling from 290 # the beginning is the most efficient way to do this. The 291 # downside is the deque gets temporarily modified in the process 292 # so we need to make sure we're holding the lock. 293 assert self._cache_lock.locked() 294 cache = self._cache 295 cache.rotate(-start) 296 slc = [e[1] for e in itertools.islice(cache, 0, end - start, step)] 297 cache.rotate(start) 298 return slc 299 300 @classmethod 301 def _is_immutable_log_data(cls, data: Any) -> bool: 302 if isinstance(data, (str, bool, int, float, bytes)): 303 return True 304 if isinstance(data, tuple): 305 return all(cls._is_immutable_log_data(x) for x in data) 306 return False 307 308 def call_in_thread(self, call: Callable[[], Any]) -> None: 309 """Submit a call to be run in the logging background thread.""" 310 self._event_loop.call_soon_threadsafe(call) 311 312 @override 313 def emit(self, record: logging.LogRecord) -> None: 314 # pylint: disable=too-many-branches 315 # pylint: disable=too-many-locals 316 317 if __debug__: 318 starttime = time.monotonic() 319 320 # Called by logging to send us records. 321 322 # Optimization: if our log args are all simple immutable values, 323 # we can just kick the whole thing over to our background thread 324 # to be formatted there at our leisure. If anything is mutable 325 # and thus could possibly change between now and then or if we 326 # want to do immediate file echoing then we need to bite the 327 # bullet and do that stuff here at the call site. 328 fast_path = self._echofile is None and self._is_immutable_log_data( 329 record.args 330 ) 331 332 # Note: just assuming types are correct here, but they'll be 333 # checked properly when the resulting LogEntry gets exported. 334 labels: dict[str, str] | None = getattr(record, 'labels', None) 335 if labels is None: 336 labels = {} 337 338 if fast_path: 339 if __debug__: 340 formattime = echotime = time.monotonic() 341 self._event_loop.call_soon_threadsafe( 342 partial( 343 self._emit_in_thread, 344 record.name, 345 record.levelno, 346 record.created, 347 record, 348 labels, 349 ) 350 ) 351 else: 352 # Slow case; do formatting and echoing here at the log call 353 # site. 354 msg = self.format(record) 355 356 if __debug__: 357 formattime = time.monotonic() 358 359 # Also immediately print pretty colored output to our echo 360 # file (generally stderr). We do this part here instead of 361 # in our bg thread because the delay can throw off command 362 # line prompts or make tight debugging harder. 363 if self._echofile is not None: 364 if self._echofile_timestamp_format == 'relative': 365 timestamp = f'{record.created - self._launch_time:.3f}' 366 else: 367 timestamp = ( 368 datetime.datetime.fromtimestamp( 369 record.created, tz=datetime.UTC 370 ).strftime('%H:%M:%S') 371 + f'.{int(record.msecs):03d}' 372 ) 373 374 # If color printing is disabled, show level through text 375 # instead of color. 376 lvlnameex = ( 377 '' 378 if color_enabled 379 else f' {logging.getLevelName(record.levelno)}' 380 ) 381 382 preinfo = ( 383 f'{Clr.WHT}{timestamp} {record.name}{lvlnameex}:' 384 f'{Clr.RST} ' 385 ) 386 ends = LEVELNO_COLOR_CODES.get(record.levelno) 387 if ends is not None: 388 self._echofile.write(f'{preinfo}{ends[0]}{msg}{ends[1]}\n') 389 else: 390 self._echofile.write(f'{preinfo}{msg}\n') 391 self._echofile.flush() 392 393 if __debug__: 394 echotime = time.monotonic() 395 396 self._event_loop.call_soon_threadsafe( 397 partial( 398 self._emit_in_thread, 399 record.name, 400 record.levelno, 401 record.created, 402 msg, 403 labels, 404 ) 405 ) 406 407 if __debug__: 408 # pylint: disable=used-before-assignment 409 # 410 # Make noise if we're taking a significant amount of time 411 # here. Limit the noise to once every so often though; 412 # otherwise we could get a feedback loop where every log 413 # emit results in a warning log which results in another, 414 # etc. 415 now = time.monotonic() 416 duration = now - starttime 417 format_duration = formattime - starttime 418 echo_duration = echotime - formattime 419 if duration > 0.05 and ( 420 self._last_slow_emit_warning_time is None 421 or now > self._last_slow_emit_warning_time + 10.0 422 ): 423 # Logging calls from *within* a logging handler sounds 424 # sketchy, so let's just kick this over to the bg event 425 # loop thread we've already got. 426 self._last_slow_emit_warning_time = now 427 self._event_loop.call_soon_threadsafe( 428 partial( 429 logging.warning, 430 'efro.logging.LogHandler emit took too long' 431 ' (%.2fs total; %.2fs format, %.2fs echo,' 432 ' fast_path=%s).', 433 duration, 434 format_duration, 435 echo_duration, 436 fast_path, 437 ) 438 ) 439 440 def _emit_in_thread( 441 self, 442 name: str, 443 levelno: int, 444 created: float, 445 message: str | logging.LogRecord, 446 labels: dict[str, str], 447 ) -> None: 448 # pylint: disable=too-many-positional-arguments 449 try: 450 # If they passed a raw record here, bake it down to a string. 451 if isinstance(message, logging.LogRecord): 452 message = self.format(message) 453 454 self._emit_entry( 455 LogEntry( 456 name=name, 457 message=message, 458 level=LEVELNO_LOG_LEVELS.get(levelno, LogLevel.INFO), 459 time=datetime.datetime.fromtimestamp( 460 created, datetime.timezone.utc 461 ), 462 labels=labels, 463 ) 464 ) 465 except Exception: 466 import traceback 467 468 traceback.print_exc(file=self._echofile) 469 470 def file_write(self, name: str, output: str) -> None: 471 """Send raw stdout/stderr output to the logger to be collated.""" 472 473 # Note to self: it turns out that things like '^^^^^^^^^^^^^^' 474 # lines in stack traces get written as lots of individual '^' 475 # writes. It feels a bit dirty to be pushing a deferred call to 476 # another thread for each character. Perhaps should do some sort 477 # of basic accumulation here? 478 self._event_loop.call_soon_threadsafe( 479 partial(self._file_write_in_thread, name, output) 480 ) 481 482 def _file_write_in_thread(self, name: str, output: str) -> None: 483 try: 484 assert name in ('stdout', 'stderr') 485 486 # Here we try to be somewhat smart about breaking arbitrary 487 # print output into discrete log entries. 488 489 self._file_chunks[name].append(output) 490 491 # Individual parts of a print come across as separate 492 # writes, and the end of a print will be a standalone '\n' 493 # by default. Let's use that as a hint that we're likely at 494 # the end of a full print statement and ship what we've got. 495 if output == '\n': 496 self._ship_file_chunks(name, cancel_ship_task=True) 497 else: 498 # By default just keep adding chunks. However we keep a 499 # timer running anytime we've got unshipped chunks so 500 # that we can ship what we've got after a short bit if 501 # we never get a newline. 502 ship_task = self._file_chunk_ship_task[name] 503 if ship_task is None: 504 self._file_chunk_ship_task[name] = ( 505 self._event_loop.create_task( 506 self._ship_chunks_task(name), 507 name='log ship file chunks', 508 ) 509 ) 510 511 except Exception: 512 import traceback 513 514 traceback.print_exc(file=self._echofile) 515 516 def shutdown(self) -> None: 517 """Block until all pending logs/prints are done.""" 518 done = False 519 self.file_flush('stdout') 520 self.file_flush('stderr') 521 522 def _set_done() -> None: 523 nonlocal done 524 done = True 525 526 self._event_loop.call_soon_threadsafe(_set_done) 527 528 starttime = time.monotonic() 529 while not done: 530 if time.monotonic() - starttime > 5.0: 531 print('LogHandler shutdown hung!!!', file=sys.stderr) 532 break 533 time.sleep(0.01) 534 535 def file_flush(self, name: str) -> None: 536 """Send raw stdout/stderr flush to the logger to be collated.""" 537 538 self._event_loop.call_soon_threadsafe( 539 partial(self._file_flush_in_thread, name) 540 ) 541 542 def _file_flush_in_thread(self, name: str) -> None: 543 try: 544 assert name in ('stdout', 'stderr') 545 546 # Immediately ship whatever chunks we've got. 547 if self._file_chunks[name]: 548 self._ship_file_chunks(name, cancel_ship_task=True) 549 550 except Exception: 551 import traceback 552 553 traceback.print_exc(file=self._echofile) 554 555 async def _ship_chunks_task(self, name: str) -> None: 556 # Note: it's important we sleep here for a moment. Otherwise, 557 # things like '^^^^^^^^^^^^' lines in stack traces, which come 558 # through as lots of individual '^' writes, tend to get broken 559 # into lots of tiny little lines by us. 560 await asyncio.sleep(0.01) 561 self._ship_file_chunks(name, cancel_ship_task=False) 562 563 def _ship_file_chunks(self, name: str, cancel_ship_task: bool) -> None: 564 # Note: Raw print input generally ends in a newline, but that is 565 # redundant when we break things into log entries and results in 566 # extra empty lines. So strip off a single trailing newline if 567 # one is present. 568 text = ''.join(self._file_chunks[name]).removesuffix('\n') 569 570 self._emit_entry( 571 LogEntry( 572 name=name, message=text, level=LogLevel.INFO, time=utc_now() 573 ) 574 ) 575 self._file_chunks[name] = [] 576 ship_task = self._file_chunk_ship_task[name] 577 if cancel_ship_task and ship_task is not None: 578 ship_task.cancel() 579 self._file_chunk_ship_task[name] = None 580 581 def _emit_entry(self, entry: LogEntry) -> None: 582 assert current_thread() is self._thread 583 584 # Store to our cache. 585 if self._cache_size_limit > 0: 586 with self._cache_lock: 587 # Do a rough calc of how many bytes this entry consumes. 588 entry_size = sum( 589 sys.getsizeof(x) 590 for x in ( 591 entry, 592 entry.name, 593 entry.message, 594 entry.level, 595 entry.time, 596 ) 597 ) 598 self._cache.append((entry_size, entry)) 599 self._cache_size += entry_size 600 601 # Prune old until we are back at or under our limit. 602 while self._cache_size > self._cache_size_limit: 603 popped = self._cache.popleft() 604 self._cache_size -= popped[0] 605 self._cache_index_offset += 1 606 607 # Pass to callbacks. 608 for call in self._callbacks: 609 self._run_callback_on_entry(call, entry) 610 611 # Dump to our structured log file. 612 # 613 # TODO: should set a timer for flushing; don't flush every line. 614 if self._file is not None: 615 entry_s = dataclass_to_json(entry) 616 assert '\n' not in entry_s # Make sure its a single line. 617 print(entry_s, file=self._file, flush=True) 618 619 def _run_callback_on_entry( 620 self, callback: Callable[[LogEntry], None], entry: LogEntry 621 ) -> None: 622 """Run a callback and handle any errors.""" 623 try: 624 callback(entry) 625 except Exception: 626 # Only print the first callback error to avoid insanity. 627 if not self._printed_callback_error: 628 import traceback 629 630 traceback.print_exc(file=self._echofile) 631 self._printed_callback_error = True 632 633 634class FileLogEcho: 635 """A file-like object for forwarding stdout/stderr to a LogHandler.""" 636 637 def __init__( 638 self, original: TextIO, name: str, handler: LogHandler 639 ) -> None: 640 assert name in ('stdout', 'stderr') 641 self._original = original 642 self._name = name 643 self._handler = handler 644 645 def write(self, output: Any) -> None: 646 """Override standard write call.""" 647 self._original.write(output) 648 self._handler.file_write(self._name, output) 649 650 def flush(self) -> None: 651 """Flush the file.""" 652 self._original.flush() 653 654 # We also use this as a hint to ship whatever file chunks 655 # we've accumulated (we have to try and be smart about breaking 656 # our arbitrary file output into discrete entries). 657 self._handler.file_flush(self._name) 658 659 def isatty(self) -> bool: 660 """Are we a terminal?""" 661 return self._original.isatty() 662 663 664def setup_logging( 665 log_path: str | Path | None, 666 level: LogLevel, 667 *, 668 log_stdout_stderr: bool = False, 669 echo_to_stderr: bool = True, 670 cache_size_limit: int = 0, 671 cache_time_limit: datetime.timedelta | None = None, 672 launch_time: float | None = None, 673) -> LogHandler: 674 """Set up our logging environment. 675 676 Returns the custom handler which can be used to fetch information 677 about logs that have passed through it. (worst log-levels, caches, etc.). 678 """ 679 680 lmap = { 681 LogLevel.DEBUG: logging.DEBUG, 682 LogLevel.INFO: logging.INFO, 683 LogLevel.WARNING: logging.WARNING, 684 LogLevel.ERROR: logging.ERROR, 685 LogLevel.CRITICAL: logging.CRITICAL, 686 } 687 688 # Wire logger output to go to a structured log file. Also echo it to 689 # stderr IF we're running in a terminal. 690 # 691 # UPDATE: Actually gonna always go to stderr. Is there a reason we 692 # shouldn't? This makes debugging possible if all we have is access 693 # to a non-interactive terminal or file dump. We could add a 694 # '--quiet' arg or whatnot to change this behavior. 695 696 # Note: by passing in the *original* stderr here before we 697 # (potentially) replace it, we ensure that our log echos won't 698 # themselves be intercepted and sent to the logger which would 699 # create an infinite loop. 700 loghandler = LogHandler( 701 path=log_path, 702 echofile=sys.stderr if echo_to_stderr else None, 703 echofile_timestamp_format='relative', 704 cache_size_limit=cache_size_limit, 705 cache_time_limit=cache_time_limit, 706 launch_time=launch_time, 707 ) 708 709 # Note: going ahead with force=True here so that we replace any 710 # existing logger. Though we warn if it looks like we are doing that 711 # so we can try to avoid creating the first one. 712 had_previous_handlers = bool(logging.root.handlers) 713 logging.basicConfig( 714 level=lmap[level], 715 # We dump *only* the message here. We pass various log record 716 # bits around so we can write rich logs or format things later. 717 format='%(message)s', 718 handlers=[loghandler], 719 force=True, 720 ) 721 if had_previous_handlers: 722 logging.warning( 723 'setup_logging: Replacing existing handlers.' 724 ' Something may have logged before expected.' 725 ) 726 727 # Optionally intercept Python's stdout/stderr output and generate 728 # log entries from it. 729 if log_stdout_stderr: 730 sys.stdout = FileLogEcho(sys.stdout, 'stdout', loghandler) 731 sys.stderr = FileLogEcho(sys.stderr, 'stderr', loghandler) 732 733 return loghandler
29class LogLevel(Enum): 30 """Severity level for a log entry. 31 32 These enums have numeric values so they can be compared in severity. 33 Note that these values are not currently interchangeable with the 34 logging.ERROR, logging.DEBUG, etc. values. 35 """ 36 37 DEBUG = 0 38 INFO = 1 39 WARNING = 2 40 ERROR = 3 41 CRITICAL = 4 42 43 @property 44 def python_logging_level(self) -> int: 45 """Give the corresponding logging level.""" 46 return LOG_LEVEL_LEVELNOS[self] 47 48 @classmethod 49 def from_python_logging_level(cls, levelno: int) -> LogLevel: 50 """Given a Python logging level, return a LogLevel.""" 51 return LEVELNO_LOG_LEVELS[levelno]
Severity level for a log entry.
These enums have numeric values so they can be compared in severity. Note that these values are not currently interchangeable with the logging.ERROR, logging.DEBUG, etc. values.
43 @property 44 def python_logging_level(self) -> int: 45 """Give the corresponding logging level.""" 46 return LOG_LEVEL_LEVELNOS[self]
Give the corresponding logging level.
81@ioprepped 82@dataclass 83class LogEntry: 84 """Single logged message.""" 85 86 name: Annotated[str, IOAttrs('n', soft_default='root', store_default=False)] 87 message: Annotated[str, IOAttrs('m')] 88 level: Annotated[LogLevel, IOAttrs('l')] 89 time: Annotated[datetime.datetime, IOAttrs('t')] 90 91 # We support arbitrary string labels per log entry which can be 92 # incorporated into custom log processing. To populate this, our 93 # LogHandler class looks for a 'labels' dict passed in the optional 94 # 'extra' dict arg to standard Python log calls. 95 labels: Annotated[dict[str, str], IOAttrs('la', store_default=False)] = ( 96 field(default_factory=dict) 97 )
Single logged message.
100@ioprepped 101@dataclass 102class LogArchive: 103 """Info and data for a log.""" 104 105 # Total number of entries submitted to the log. 106 log_size: Annotated[int, IOAttrs('t')] 107 108 # Offset for the entries contained here. 109 # (10 means our first entry is the 10th in the log, etc.) 110 start_index: Annotated[int, IOAttrs('c')] 111 112 entries: Annotated[list[LogEntry], IOAttrs('e')]
Info and data for a log.
115class LogHandler(logging.Handler): 116 """Fancy-pants handler for logging output. 117 118 Writes logs to disk in structured json format and echoes them 119 to stdout/stderr with pretty colors. 120 """ 121 122 _event_loop: asyncio.AbstractEventLoop 123 124 # IMPORTANT: Any debug prints we do here should ONLY go to echofile. 125 # Otherwise we can get infinite loops as those prints come back to us 126 # as new log entries. 127 128 def __init__( 129 self, 130 *, 131 path: str | Path | None, 132 echofile: TextIO | None, 133 cache_size_limit: int, 134 cache_time_limit: datetime.timedelta | None, 135 echofile_timestamp_format: Literal['default', 'relative'] = 'default', 136 launch_time: float | None = None, 137 ): 138 super().__init__() 139 # pylint: disable=consider-using-with 140 self._file = None if path is None else open(path, 'w', encoding='utf-8') 141 self._echofile = echofile 142 self._echofile_timestamp_format = echofile_timestamp_format 143 self._callbacks: list[Callable[[LogEntry], None]] = [] 144 self._file_chunks: dict[str, list[str]] = {'stdout': [], 'stderr': []} 145 self._file_chunk_ship_task: dict[str, asyncio.Task | None] = { 146 'stdout': None, 147 'stderr': None, 148 } 149 self._launch_time = time.time() if launch_time is None else launch_time 150 self._cache_size = 0 151 assert cache_size_limit >= 0 152 self._cache_size_limit = cache_size_limit 153 self._cache_time_limit = cache_time_limit 154 self._cache = deque[tuple[int, LogEntry]]() 155 self._cache_index_offset = 0 156 self._cache_lock = Lock() 157 self._printed_callback_error = False 158 self._thread_bootstrapped = False 159 self._thread = Thread(target=self._log_thread_main, daemon=True) 160 if __debug__: 161 self._last_slow_emit_warning_time: float | None = None 162 self._thread.start() 163 164 # Spin until our thread is up and running; otherwise we could 165 # wind up trying to push stuff to our event loop before the loop 166 # exists. 167 while not self._thread_bootstrapped: 168 time.sleep(0.001) 169 170 def add_callback( 171 self, call: Callable[[LogEntry], None], feed_existing_logs: bool = False 172 ) -> None: 173 """Add a callback to be run for each LogEntry. 174 175 Note that this callback will always run in a background thread. 176 Passing True for feed_existing_logs will cause all cached logs 177 in the handler to be fed to the callback (still in the 178 background thread though). 179 """ 180 181 # Kick this over to our bg thread to add the callback and 182 # process cached entries at the same time to ensure there are no 183 # race conditions that could cause entries to be skipped/etc. 184 self._event_loop.call_soon_threadsafe( 185 partial(self._add_callback_in_thread, call, feed_existing_logs) 186 ) 187 188 def _add_callback_in_thread( 189 self, call: Callable[[LogEntry], None], feed_existing_logs: bool 190 ) -> None: 191 """Add a callback to be run for each LogEntry. 192 193 Note that this callback will always run in a background thread. 194 Passing True for feed_existing_logs will cause all cached logs 195 in the handler to be fed to the callback (still in the 196 background thread though). 197 """ 198 assert current_thread() is self._thread 199 self._callbacks.append(call) 200 201 # Run all of our cached entries through the new callback if desired. 202 if feed_existing_logs and self._cache_size_limit > 0: 203 with self._cache_lock: 204 for _id, entry in self._cache: 205 self._run_callback_on_entry(call, entry) 206 207 def _log_thread_main(self) -> None: 208 self._event_loop = asyncio.new_event_loop() 209 210 # In our background thread event loop we do a fair amount of 211 # slow synchronous stuff such as mucking with the log cache. 212 # Let's avoid getting tons of warnings about this in debug mode. 213 self._event_loop.slow_callback_duration = 2.0 # Default is 0.1 214 215 # NOTE: if we ever use default threadpool at all we should allow 216 # setting it for our loop. 217 asyncio.set_event_loop(self._event_loop) 218 self._thread_bootstrapped = True 219 try: 220 if self._cache_time_limit is not None: 221 _prunetask = self._event_loop.create_task( 222 self._time_prune_cache() 223 ) 224 self._event_loop.run_forever() 225 except BaseException: 226 # If this ever goes down we're in trouble; we won't be able 227 # to log about it though. Try to make some noise however we 228 # can. 229 print('LogHandler died!!!', file=sys.stderr) 230 import traceback 231 232 traceback.print_exc() 233 raise 234 235 async def _time_prune_cache(self) -> None: 236 assert self._cache_time_limit is not None 237 while bool(True): 238 await asyncio.sleep(61.27) 239 now = utc_now() 240 with self._cache_lock: 241 # Prune the oldest entry as long as there is a first one 242 # that is too old. 243 while ( 244 self._cache 245 and (now - self._cache[0][1].time) >= self._cache_time_limit 246 ): 247 popped = self._cache.popleft() 248 self._cache_size -= popped[0] 249 self._cache_index_offset += 1 250 251 def get_cached( 252 self, start_index: int = 0, max_entries: int | None = None 253 ) -> LogArchive: 254 """Build and return an archive of cached log entries. 255 256 This will only include entries that have been processed by the 257 background thread, so may not include just-submitted logs or 258 entries for partially written stdout/stderr lines. Entries from 259 the range [start_index:start_index+max_entries] which are still 260 present in the cache will be returned. 261 """ 262 263 assert start_index >= 0 264 if max_entries is not None: 265 assert max_entries >= 0 266 with self._cache_lock: 267 # Transform start_index to our present cache space. 268 start_index -= self._cache_index_offset 269 # Calc end-index in our present cache space. 270 end_index = ( 271 len(self._cache) 272 if max_entries is None 273 else start_index + max_entries 274 ) 275 276 # Clamp both indexes to both ends of our present space. 277 start_index = max(0, min(start_index, len(self._cache))) 278 end_index = max(0, min(end_index, len(self._cache))) 279 280 return LogArchive( 281 log_size=self._cache_index_offset + len(self._cache), 282 start_index=start_index + self._cache_index_offset, 283 entries=self._cache_slice(start_index, end_index), 284 ) 285 286 def _cache_slice( 287 self, start: int, end: int, step: int = 1 288 ) -> list[LogEntry]: 289 # Deque doesn't natively support slicing but we can do it 290 # manually. It sounds like rotating the deque and pulling from 291 # the beginning is the most efficient way to do this. The 292 # downside is the deque gets temporarily modified in the process 293 # so we need to make sure we're holding the lock. 294 assert self._cache_lock.locked() 295 cache = self._cache 296 cache.rotate(-start) 297 slc = [e[1] for e in itertools.islice(cache, 0, end - start, step)] 298 cache.rotate(start) 299 return slc 300 301 @classmethod 302 def _is_immutable_log_data(cls, data: Any) -> bool: 303 if isinstance(data, (str, bool, int, float, bytes)): 304 return True 305 if isinstance(data, tuple): 306 return all(cls._is_immutable_log_data(x) for x in data) 307 return False 308 309 def call_in_thread(self, call: Callable[[], Any]) -> None: 310 """Submit a call to be run in the logging background thread.""" 311 self._event_loop.call_soon_threadsafe(call) 312 313 @override 314 def emit(self, record: logging.LogRecord) -> None: 315 # pylint: disable=too-many-branches 316 # pylint: disable=too-many-locals 317 318 if __debug__: 319 starttime = time.monotonic() 320 321 # Called by logging to send us records. 322 323 # Optimization: if our log args are all simple immutable values, 324 # we can just kick the whole thing over to our background thread 325 # to be formatted there at our leisure. If anything is mutable 326 # and thus could possibly change between now and then or if we 327 # want to do immediate file echoing then we need to bite the 328 # bullet and do that stuff here at the call site. 329 fast_path = self._echofile is None and self._is_immutable_log_data( 330 record.args 331 ) 332 333 # Note: just assuming types are correct here, but they'll be 334 # checked properly when the resulting LogEntry gets exported. 335 labels: dict[str, str] | None = getattr(record, 'labels', None) 336 if labels is None: 337 labels = {} 338 339 if fast_path: 340 if __debug__: 341 formattime = echotime = time.monotonic() 342 self._event_loop.call_soon_threadsafe( 343 partial( 344 self._emit_in_thread, 345 record.name, 346 record.levelno, 347 record.created, 348 record, 349 labels, 350 ) 351 ) 352 else: 353 # Slow case; do formatting and echoing here at the log call 354 # site. 355 msg = self.format(record) 356 357 if __debug__: 358 formattime = time.monotonic() 359 360 # Also immediately print pretty colored output to our echo 361 # file (generally stderr). We do this part here instead of 362 # in our bg thread because the delay can throw off command 363 # line prompts or make tight debugging harder. 364 if self._echofile is not None: 365 if self._echofile_timestamp_format == 'relative': 366 timestamp = f'{record.created - self._launch_time:.3f}' 367 else: 368 timestamp = ( 369 datetime.datetime.fromtimestamp( 370 record.created, tz=datetime.UTC 371 ).strftime('%H:%M:%S') 372 + f'.{int(record.msecs):03d}' 373 ) 374 375 # If color printing is disabled, show level through text 376 # instead of color. 377 lvlnameex = ( 378 '' 379 if color_enabled 380 else f' {logging.getLevelName(record.levelno)}' 381 ) 382 383 preinfo = ( 384 f'{Clr.WHT}{timestamp} {record.name}{lvlnameex}:' 385 f'{Clr.RST} ' 386 ) 387 ends = LEVELNO_COLOR_CODES.get(record.levelno) 388 if ends is not None: 389 self._echofile.write(f'{preinfo}{ends[0]}{msg}{ends[1]}\n') 390 else: 391 self._echofile.write(f'{preinfo}{msg}\n') 392 self._echofile.flush() 393 394 if __debug__: 395 echotime = time.monotonic() 396 397 self._event_loop.call_soon_threadsafe( 398 partial( 399 self._emit_in_thread, 400 record.name, 401 record.levelno, 402 record.created, 403 msg, 404 labels, 405 ) 406 ) 407 408 if __debug__: 409 # pylint: disable=used-before-assignment 410 # 411 # Make noise if we're taking a significant amount of time 412 # here. Limit the noise to once every so often though; 413 # otherwise we could get a feedback loop where every log 414 # emit results in a warning log which results in another, 415 # etc. 416 now = time.monotonic() 417 duration = now - starttime 418 format_duration = formattime - starttime 419 echo_duration = echotime - formattime 420 if duration > 0.05 and ( 421 self._last_slow_emit_warning_time is None 422 or now > self._last_slow_emit_warning_time + 10.0 423 ): 424 # Logging calls from *within* a logging handler sounds 425 # sketchy, so let's just kick this over to the bg event 426 # loop thread we've already got. 427 self._last_slow_emit_warning_time = now 428 self._event_loop.call_soon_threadsafe( 429 partial( 430 logging.warning, 431 'efro.logging.LogHandler emit took too long' 432 ' (%.2fs total; %.2fs format, %.2fs echo,' 433 ' fast_path=%s).', 434 duration, 435 format_duration, 436 echo_duration, 437 fast_path, 438 ) 439 ) 440 441 def _emit_in_thread( 442 self, 443 name: str, 444 levelno: int, 445 created: float, 446 message: str | logging.LogRecord, 447 labels: dict[str, str], 448 ) -> None: 449 # pylint: disable=too-many-positional-arguments 450 try: 451 # If they passed a raw record here, bake it down to a string. 452 if isinstance(message, logging.LogRecord): 453 message = self.format(message) 454 455 self._emit_entry( 456 LogEntry( 457 name=name, 458 message=message, 459 level=LEVELNO_LOG_LEVELS.get(levelno, LogLevel.INFO), 460 time=datetime.datetime.fromtimestamp( 461 created, datetime.timezone.utc 462 ), 463 labels=labels, 464 ) 465 ) 466 except Exception: 467 import traceback 468 469 traceback.print_exc(file=self._echofile) 470 471 def file_write(self, name: str, output: str) -> None: 472 """Send raw stdout/stderr output to the logger to be collated.""" 473 474 # Note to self: it turns out that things like '^^^^^^^^^^^^^^' 475 # lines in stack traces get written as lots of individual '^' 476 # writes. It feels a bit dirty to be pushing a deferred call to 477 # another thread for each character. Perhaps should do some sort 478 # of basic accumulation here? 479 self._event_loop.call_soon_threadsafe( 480 partial(self._file_write_in_thread, name, output) 481 ) 482 483 def _file_write_in_thread(self, name: str, output: str) -> None: 484 try: 485 assert name in ('stdout', 'stderr') 486 487 # Here we try to be somewhat smart about breaking arbitrary 488 # print output into discrete log entries. 489 490 self._file_chunks[name].append(output) 491 492 # Individual parts of a print come across as separate 493 # writes, and the end of a print will be a standalone '\n' 494 # by default. Let's use that as a hint that we're likely at 495 # the end of a full print statement and ship what we've got. 496 if output == '\n': 497 self._ship_file_chunks(name, cancel_ship_task=True) 498 else: 499 # By default just keep adding chunks. However we keep a 500 # timer running anytime we've got unshipped chunks so 501 # that we can ship what we've got after a short bit if 502 # we never get a newline. 503 ship_task = self._file_chunk_ship_task[name] 504 if ship_task is None: 505 self._file_chunk_ship_task[name] = ( 506 self._event_loop.create_task( 507 self._ship_chunks_task(name), 508 name='log ship file chunks', 509 ) 510 ) 511 512 except Exception: 513 import traceback 514 515 traceback.print_exc(file=self._echofile) 516 517 def shutdown(self) -> None: 518 """Block until all pending logs/prints are done.""" 519 done = False 520 self.file_flush('stdout') 521 self.file_flush('stderr') 522 523 def _set_done() -> None: 524 nonlocal done 525 done = True 526 527 self._event_loop.call_soon_threadsafe(_set_done) 528 529 starttime = time.monotonic() 530 while not done: 531 if time.monotonic() - starttime > 5.0: 532 print('LogHandler shutdown hung!!!', file=sys.stderr) 533 break 534 time.sleep(0.01) 535 536 def file_flush(self, name: str) -> None: 537 """Send raw stdout/stderr flush to the logger to be collated.""" 538 539 self._event_loop.call_soon_threadsafe( 540 partial(self._file_flush_in_thread, name) 541 ) 542 543 def _file_flush_in_thread(self, name: str) -> None: 544 try: 545 assert name in ('stdout', 'stderr') 546 547 # Immediately ship whatever chunks we've got. 548 if self._file_chunks[name]: 549 self._ship_file_chunks(name, cancel_ship_task=True) 550 551 except Exception: 552 import traceback 553 554 traceback.print_exc(file=self._echofile) 555 556 async def _ship_chunks_task(self, name: str) -> None: 557 # Note: it's important we sleep here for a moment. Otherwise, 558 # things like '^^^^^^^^^^^^' lines in stack traces, which come 559 # through as lots of individual '^' writes, tend to get broken 560 # into lots of tiny little lines by us. 561 await asyncio.sleep(0.01) 562 self._ship_file_chunks(name, cancel_ship_task=False) 563 564 def _ship_file_chunks(self, name: str, cancel_ship_task: bool) -> None: 565 # Note: Raw print input generally ends in a newline, but that is 566 # redundant when we break things into log entries and results in 567 # extra empty lines. So strip off a single trailing newline if 568 # one is present. 569 text = ''.join(self._file_chunks[name]).removesuffix('\n') 570 571 self._emit_entry( 572 LogEntry( 573 name=name, message=text, level=LogLevel.INFO, time=utc_now() 574 ) 575 ) 576 self._file_chunks[name] = [] 577 ship_task = self._file_chunk_ship_task[name] 578 if cancel_ship_task and ship_task is not None: 579 ship_task.cancel() 580 self._file_chunk_ship_task[name] = None 581 582 def _emit_entry(self, entry: LogEntry) -> None: 583 assert current_thread() is self._thread 584 585 # Store to our cache. 586 if self._cache_size_limit > 0: 587 with self._cache_lock: 588 # Do a rough calc of how many bytes this entry consumes. 589 entry_size = sum( 590 sys.getsizeof(x) 591 for x in ( 592 entry, 593 entry.name, 594 entry.message, 595 entry.level, 596 entry.time, 597 ) 598 ) 599 self._cache.append((entry_size, entry)) 600 self._cache_size += entry_size 601 602 # Prune old until we are back at or under our limit. 603 while self._cache_size > self._cache_size_limit: 604 popped = self._cache.popleft() 605 self._cache_size -= popped[0] 606 self._cache_index_offset += 1 607 608 # Pass to callbacks. 609 for call in self._callbacks: 610 self._run_callback_on_entry(call, entry) 611 612 # Dump to our structured log file. 613 # 614 # TODO: should set a timer for flushing; don't flush every line. 615 if self._file is not None: 616 entry_s = dataclass_to_json(entry) 617 assert '\n' not in entry_s # Make sure its a single line. 618 print(entry_s, file=self._file, flush=True) 619 620 def _run_callback_on_entry( 621 self, callback: Callable[[LogEntry], None], entry: LogEntry 622 ) -> None: 623 """Run a callback and handle any errors.""" 624 try: 625 callback(entry) 626 except Exception: 627 # Only print the first callback error to avoid insanity. 628 if not self._printed_callback_error: 629 import traceback 630 631 traceback.print_exc(file=self._echofile) 632 self._printed_callback_error = True
Fancy-pants handler for logging output.
Writes logs to disk in structured json format and echoes them to stdout/stderr with pretty colors.
128 def __init__( 129 self, 130 *, 131 path: str | Path | None, 132 echofile: TextIO | None, 133 cache_size_limit: int, 134 cache_time_limit: datetime.timedelta | None, 135 echofile_timestamp_format: Literal['default', 'relative'] = 'default', 136 launch_time: float | None = None, 137 ): 138 super().__init__() 139 # pylint: disable=consider-using-with 140 self._file = None if path is None else open(path, 'w', encoding='utf-8') 141 self._echofile = echofile 142 self._echofile_timestamp_format = echofile_timestamp_format 143 self._callbacks: list[Callable[[LogEntry], None]] = [] 144 self._file_chunks: dict[str, list[str]] = {'stdout': [], 'stderr': []} 145 self._file_chunk_ship_task: dict[str, asyncio.Task | None] = { 146 'stdout': None, 147 'stderr': None, 148 } 149 self._launch_time = time.time() if launch_time is None else launch_time 150 self._cache_size = 0 151 assert cache_size_limit >= 0 152 self._cache_size_limit = cache_size_limit 153 self._cache_time_limit = cache_time_limit 154 self._cache = deque[tuple[int, LogEntry]]() 155 self._cache_index_offset = 0 156 self._cache_lock = Lock() 157 self._printed_callback_error = False 158 self._thread_bootstrapped = False 159 self._thread = Thread(target=self._log_thread_main, daemon=True) 160 if __debug__: 161 self._last_slow_emit_warning_time: float | None = None 162 self._thread.start() 163 164 # Spin until our thread is up and running; otherwise we could 165 # wind up trying to push stuff to our event loop before the loop 166 # exists. 167 while not self._thread_bootstrapped: 168 time.sleep(0.001)
Initializes the instance - basically setting the formatter to None and the filter list to empty.
170 def add_callback( 171 self, call: Callable[[LogEntry], None], feed_existing_logs: bool = False 172 ) -> None: 173 """Add a callback to be run for each LogEntry. 174 175 Note that this callback will always run in a background thread. 176 Passing True for feed_existing_logs will cause all cached logs 177 in the handler to be fed to the callback (still in the 178 background thread though). 179 """ 180 181 # Kick this over to our bg thread to add the callback and 182 # process cached entries at the same time to ensure there are no 183 # race conditions that could cause entries to be skipped/etc. 184 self._event_loop.call_soon_threadsafe( 185 partial(self._add_callback_in_thread, call, feed_existing_logs) 186 )
Add a callback to be run for each LogEntry.
Note that this callback will always run in a background thread. Passing True for feed_existing_logs will cause all cached logs in the handler to be fed to the callback (still in the background thread though).
251 def get_cached( 252 self, start_index: int = 0, max_entries: int | None = None 253 ) -> LogArchive: 254 """Build and return an archive of cached log entries. 255 256 This will only include entries that have been processed by the 257 background thread, so may not include just-submitted logs or 258 entries for partially written stdout/stderr lines. Entries from 259 the range [start_index:start_index+max_entries] which are still 260 present in the cache will be returned. 261 """ 262 263 assert start_index >= 0 264 if max_entries is not None: 265 assert max_entries >= 0 266 with self._cache_lock: 267 # Transform start_index to our present cache space. 268 start_index -= self._cache_index_offset 269 # Calc end-index in our present cache space. 270 end_index = ( 271 len(self._cache) 272 if max_entries is None 273 else start_index + max_entries 274 ) 275 276 # Clamp both indexes to both ends of our present space. 277 start_index = max(0, min(start_index, len(self._cache))) 278 end_index = max(0, min(end_index, len(self._cache))) 279 280 return LogArchive( 281 log_size=self._cache_index_offset + len(self._cache), 282 start_index=start_index + self._cache_index_offset, 283 entries=self._cache_slice(start_index, end_index), 284 )
Build and return an archive of cached log entries.
This will only include entries that have been processed by the background thread, so may not include just-submitted logs or entries for partially written stdout/stderr lines. Entries from the range [start_index:start_index+max_entries] which are still present in the cache will be returned.
309 def call_in_thread(self, call: Callable[[], Any]) -> None: 310 """Submit a call to be run in the logging background thread.""" 311 self._event_loop.call_soon_threadsafe(call)
Submit a call to be run in the logging background thread.
313 @override 314 def emit(self, record: logging.LogRecord) -> None: 315 # pylint: disable=too-many-branches 316 # pylint: disable=too-many-locals 317 318 if __debug__: 319 starttime = time.monotonic() 320 321 # Called by logging to send us records. 322 323 # Optimization: if our log args are all simple immutable values, 324 # we can just kick the whole thing over to our background thread 325 # to be formatted there at our leisure. If anything is mutable 326 # and thus could possibly change between now and then or if we 327 # want to do immediate file echoing then we need to bite the 328 # bullet and do that stuff here at the call site. 329 fast_path = self._echofile is None and self._is_immutable_log_data( 330 record.args 331 ) 332 333 # Note: just assuming types are correct here, but they'll be 334 # checked properly when the resulting LogEntry gets exported. 335 labels: dict[str, str] | None = getattr(record, 'labels', None) 336 if labels is None: 337 labels = {} 338 339 if fast_path: 340 if __debug__: 341 formattime = echotime = time.monotonic() 342 self._event_loop.call_soon_threadsafe( 343 partial( 344 self._emit_in_thread, 345 record.name, 346 record.levelno, 347 record.created, 348 record, 349 labels, 350 ) 351 ) 352 else: 353 # Slow case; do formatting and echoing here at the log call 354 # site. 355 msg = self.format(record) 356 357 if __debug__: 358 formattime = time.monotonic() 359 360 # Also immediately print pretty colored output to our echo 361 # file (generally stderr). We do this part here instead of 362 # in our bg thread because the delay can throw off command 363 # line prompts or make tight debugging harder. 364 if self._echofile is not None: 365 if self._echofile_timestamp_format == 'relative': 366 timestamp = f'{record.created - self._launch_time:.3f}' 367 else: 368 timestamp = ( 369 datetime.datetime.fromtimestamp( 370 record.created, tz=datetime.UTC 371 ).strftime('%H:%M:%S') 372 + f'.{int(record.msecs):03d}' 373 ) 374 375 # If color printing is disabled, show level through text 376 # instead of color. 377 lvlnameex = ( 378 '' 379 if color_enabled 380 else f' {logging.getLevelName(record.levelno)}' 381 ) 382 383 preinfo = ( 384 f'{Clr.WHT}{timestamp} {record.name}{lvlnameex}:' 385 f'{Clr.RST} ' 386 ) 387 ends = LEVELNO_COLOR_CODES.get(record.levelno) 388 if ends is not None: 389 self._echofile.write(f'{preinfo}{ends[0]}{msg}{ends[1]}\n') 390 else: 391 self._echofile.write(f'{preinfo}{msg}\n') 392 self._echofile.flush() 393 394 if __debug__: 395 echotime = time.monotonic() 396 397 self._event_loop.call_soon_threadsafe( 398 partial( 399 self._emit_in_thread, 400 record.name, 401 record.levelno, 402 record.created, 403 msg, 404 labels, 405 ) 406 ) 407 408 if __debug__: 409 # pylint: disable=used-before-assignment 410 # 411 # Make noise if we're taking a significant amount of time 412 # here. Limit the noise to once every so often though; 413 # otherwise we could get a feedback loop where every log 414 # emit results in a warning log which results in another, 415 # etc. 416 now = time.monotonic() 417 duration = now - starttime 418 format_duration = formattime - starttime 419 echo_duration = echotime - formattime 420 if duration > 0.05 and ( 421 self._last_slow_emit_warning_time is None 422 or now > self._last_slow_emit_warning_time + 10.0 423 ): 424 # Logging calls from *within* a logging handler sounds 425 # sketchy, so let's just kick this over to the bg event 426 # loop thread we've already got. 427 self._last_slow_emit_warning_time = now 428 self._event_loop.call_soon_threadsafe( 429 partial( 430 logging.warning, 431 'efro.logging.LogHandler emit took too long' 432 ' (%.2fs total; %.2fs format, %.2fs echo,' 433 ' fast_path=%s).', 434 duration, 435 format_duration, 436 echo_duration, 437 fast_path, 438 ) 439 )
Do whatever it takes to actually log the specified logging record.
This version is intended to be implemented by subclasses and so raises a NotImplementedError.
471 def file_write(self, name: str, output: str) -> None: 472 """Send raw stdout/stderr output to the logger to be collated.""" 473 474 # Note to self: it turns out that things like '^^^^^^^^^^^^^^' 475 # lines in stack traces get written as lots of individual '^' 476 # writes. It feels a bit dirty to be pushing a deferred call to 477 # another thread for each character. Perhaps should do some sort 478 # of basic accumulation here? 479 self._event_loop.call_soon_threadsafe( 480 partial(self._file_write_in_thread, name, output) 481 )
Send raw stdout/stderr output to the logger to be collated.
517 def shutdown(self) -> None: 518 """Block until all pending logs/prints are done.""" 519 done = False 520 self.file_flush('stdout') 521 self.file_flush('stderr') 522 523 def _set_done() -> None: 524 nonlocal done 525 done = True 526 527 self._event_loop.call_soon_threadsafe(_set_done) 528 529 starttime = time.monotonic() 530 while not done: 531 if time.monotonic() - starttime > 5.0: 532 print('LogHandler shutdown hung!!!', file=sys.stderr) 533 break 534 time.sleep(0.01)
Block until all pending logs/prints are done.
536 def file_flush(self, name: str) -> None: 537 """Send raw stdout/stderr flush to the logger to be collated.""" 538 539 self._event_loop.call_soon_threadsafe( 540 partial(self._file_flush_in_thread, name) 541 )
Send raw stdout/stderr flush to the logger to be collated.
635class FileLogEcho: 636 """A file-like object for forwarding stdout/stderr to a LogHandler.""" 637 638 def __init__( 639 self, original: TextIO, name: str, handler: LogHandler 640 ) -> None: 641 assert name in ('stdout', 'stderr') 642 self._original = original 643 self._name = name 644 self._handler = handler 645 646 def write(self, output: Any) -> None: 647 """Override standard write call.""" 648 self._original.write(output) 649 self._handler.file_write(self._name, output) 650 651 def flush(self) -> None: 652 """Flush the file.""" 653 self._original.flush() 654 655 # We also use this as a hint to ship whatever file chunks 656 # we've accumulated (we have to try and be smart about breaking 657 # our arbitrary file output into discrete entries). 658 self._handler.file_flush(self._name) 659 660 def isatty(self) -> bool: 661 """Are we a terminal?""" 662 return self._original.isatty()
A file-like object for forwarding stdout/stderr to a LogHandler.
646 def write(self, output: Any) -> None: 647 """Override standard write call.""" 648 self._original.write(output) 649 self._handler.file_write(self._name, output)
Override standard write call.
651 def flush(self) -> None: 652 """Flush the file.""" 653 self._original.flush() 654 655 # We also use this as a hint to ship whatever file chunks 656 # we've accumulated (we have to try and be smart about breaking 657 # our arbitrary file output into discrete entries). 658 self._handler.file_flush(self._name)
Flush the file.
665def setup_logging( 666 log_path: str | Path | None, 667 level: LogLevel, 668 *, 669 log_stdout_stderr: bool = False, 670 echo_to_stderr: bool = True, 671 cache_size_limit: int = 0, 672 cache_time_limit: datetime.timedelta | None = None, 673 launch_time: float | None = None, 674) -> LogHandler: 675 """Set up our logging environment. 676 677 Returns the custom handler which can be used to fetch information 678 about logs that have passed through it. (worst log-levels, caches, etc.). 679 """ 680 681 lmap = { 682 LogLevel.DEBUG: logging.DEBUG, 683 LogLevel.INFO: logging.INFO, 684 LogLevel.WARNING: logging.WARNING, 685 LogLevel.ERROR: logging.ERROR, 686 LogLevel.CRITICAL: logging.CRITICAL, 687 } 688 689 # Wire logger output to go to a structured log file. Also echo it to 690 # stderr IF we're running in a terminal. 691 # 692 # UPDATE: Actually gonna always go to stderr. Is there a reason we 693 # shouldn't? This makes debugging possible if all we have is access 694 # to a non-interactive terminal or file dump. We could add a 695 # '--quiet' arg or whatnot to change this behavior. 696 697 # Note: by passing in the *original* stderr here before we 698 # (potentially) replace it, we ensure that our log echos won't 699 # themselves be intercepted and sent to the logger which would 700 # create an infinite loop. 701 loghandler = LogHandler( 702 path=log_path, 703 echofile=sys.stderr if echo_to_stderr else None, 704 echofile_timestamp_format='relative', 705 cache_size_limit=cache_size_limit, 706 cache_time_limit=cache_time_limit, 707 launch_time=launch_time, 708 ) 709 710 # Note: going ahead with force=True here so that we replace any 711 # existing logger. Though we warn if it looks like we are doing that 712 # so we can try to avoid creating the first one. 713 had_previous_handlers = bool(logging.root.handlers) 714 logging.basicConfig( 715 level=lmap[level], 716 # We dump *only* the message here. We pass various log record 717 # bits around so we can write rich logs or format things later. 718 format='%(message)s', 719 handlers=[loghandler], 720 force=True, 721 ) 722 if had_previous_handlers: 723 logging.warning( 724 'setup_logging: Replacing existing handlers.' 725 ' Something may have logged before expected.' 726 ) 727 728 # Optionally intercept Python's stdout/stderr output and generate 729 # log entries from it. 730 if log_stdout_stderr: 731 sys.stdout = FileLogEcho(sys.stdout, 'stdout', loghandler) 732 sys.stderr = FileLogEcho(sys.stderr, 'stderr', loghandler) 733 734 return loghandler
Set up our logging environment.
Returns the custom handler which can be used to fetch information about logs that have passed through it. (worst log-levels, caches, etc.).