efro.log
Logging functionality.
1# Released under the MIT License. See LICENSE for details. 2# 3"""Logging functionality.""" 4from __future__ import annotations 5 6import sys 7import time 8import asyncio 9import logging 10import datetime 11import itertools 12from enum import Enum 13from functools import partial 14from collections import deque 15from dataclasses import dataclass, field 16from typing import TYPE_CHECKING, Annotated, override 17from threading import Thread, current_thread, Lock 18 19from efro.util import utc_now 20from efro.terminal import Clr 21from efro.dataclassio import ioprepped, IOAttrs, dataclass_to_json 22 23if TYPE_CHECKING: 24 from pathlib import Path 25 from typing import Any, Callable, TextIO 26 27 28class LogLevel(Enum): 29 """Severity level for a log entry. 30 31 These enums have numeric values so they can be compared in severity. 32 Note that these values are not currently interchangeable with the 33 logging.ERROR, logging.DEBUG, etc. values. 34 """ 35 36 DEBUG = 0 37 INFO = 1 38 WARNING = 2 39 ERROR = 3 40 CRITICAL = 4 41 42 @property 43 def python_logging_level(self) -> int: 44 """Give the corresponding logging level.""" 45 return LOG_LEVEL_LEVELNOS[self] 46 47 @classmethod 48 def from_python_logging_level(cls, levelno: int) -> LogLevel: 49 """Given a Python logging level, return a LogLevel.""" 50 return LEVELNO_LOG_LEVELS[levelno] 51 52 53# Python logging levels from LogLevels 54LOG_LEVEL_LEVELNOS = { 55 LogLevel.DEBUG: logging.DEBUG, 56 LogLevel.INFO: logging.INFO, 57 LogLevel.WARNING: logging.WARNING, 58 LogLevel.ERROR: logging.ERROR, 59 LogLevel.CRITICAL: logging.CRITICAL, 60} 61 62# LogLevels from Python logging levels 63LEVELNO_LOG_LEVELS = { 64 logging.DEBUG: LogLevel.DEBUG, 65 logging.INFO: LogLevel.INFO, 66 logging.WARNING: LogLevel.WARNING, 67 logging.ERROR: LogLevel.ERROR, 68 logging.CRITICAL: LogLevel.CRITICAL, 69} 70 71LEVELNO_COLOR_CODES: dict[int, tuple[str, str]] = { 72 logging.DEBUG: (Clr.CYN, Clr.RST), 73 logging.INFO: ('', ''), 74 logging.WARNING: (Clr.YLW, Clr.RST), 75 logging.ERROR: (Clr.RED, Clr.RST), 76 logging.CRITICAL: (Clr.SMAG + Clr.BLD + Clr.BLK, Clr.RST), 77} 78 79 80@ioprepped 81@dataclass 82class LogEntry: 83 """Single logged message.""" 84 85 name: Annotated[str, IOAttrs('n', soft_default='root', store_default=False)] 86 message: Annotated[str, IOAttrs('m')] 87 level: Annotated[LogLevel, IOAttrs('l')] 88 time: Annotated[datetime.datetime, IOAttrs('t')] 89 90 # We support arbitrary string labels per log entry which can be 91 # incorporated into custom log processing. To populate this, our 92 # LogHandler class looks for a 'labels' dict passed in the optional 93 # 'extra' dict arg to standard Python log calls. 94 labels: Annotated[dict[str, str], IOAttrs('la', store_default=False)] = ( 95 field(default_factory=dict) 96 ) 97 98 99@ioprepped 100@dataclass 101class LogArchive: 102 """Info and data for a log.""" 103 104 # Total number of entries submitted to the log. 105 log_size: Annotated[int, IOAttrs('t')] 106 107 # Offset for the entries contained here. 108 # (10 means our first entry is the 10th in the log, etc.) 109 start_index: Annotated[int, IOAttrs('c')] 110 111 entries: Annotated[list[LogEntry], IOAttrs('e')] 112 113 114class LogHandler(logging.Handler): 115 """Fancy-pants handler for logging output. 116 117 Writes logs to disk in structured json format and echoes them 118 to stdout/stderr with pretty colors. 119 """ 120 121 _event_loop: asyncio.AbstractEventLoop 122 123 # IMPORTANT: Any debug prints we do here should ONLY go to echofile. 124 # Otherwise we can get infinite loops as those prints come back to us 125 # as new log entries. 126 127 def __init__( 128 self, 129 path: str | Path | None, 130 echofile: TextIO | None, 131 suppress_non_root_debug: bool, 132 cache_size_limit: int, 133 cache_time_limit: datetime.timedelta | None, 134 ): 135 super().__init__() 136 # pylint: disable=consider-using-with 137 self._file = None if path is None else open(path, 'w', encoding='utf-8') 138 self._echofile = echofile 139 self._callbacks: list[Callable[[LogEntry], None]] = [] 140 self._suppress_non_root_debug = suppress_non_root_debug 141 self._file_chunks: dict[str, list[str]] = {'stdout': [], 'stderr': []} 142 self._file_chunk_ship_task: dict[str, asyncio.Task | None] = { 143 'stdout': None, 144 'stderr': None, 145 } 146 self._cache_size = 0 147 assert cache_size_limit >= 0 148 self._cache_size_limit = cache_size_limit 149 self._cache_time_limit = cache_time_limit 150 self._cache = deque[tuple[int, LogEntry]]() 151 self._cache_index_offset = 0 152 self._cache_lock = Lock() 153 self._printed_callback_error = False 154 self._thread_bootstrapped = False 155 self._thread = Thread(target=self._log_thread_main, daemon=True) 156 if __debug__: 157 self._last_slow_emit_warning_time: float | None = None 158 self._thread.start() 159 160 # Spin until our thread is up and running; otherwise we could 161 # wind up trying to push stuff to our event loop before the 162 # loop exists. 163 while not self._thread_bootstrapped: 164 time.sleep(0.001) 165 166 def add_callback( 167 self, call: Callable[[LogEntry], None], feed_existing_logs: bool = False 168 ) -> None: 169 """Add a callback to be run for each LogEntry. 170 171 Note that this callback will always run in a background thread. 172 Passing True for feed_existing_logs will cause all cached logs 173 in the handler to be fed to the callback (still in the 174 background thread though). 175 """ 176 177 # Kick this over to our bg thread to add the callback and 178 # process cached entries at the same time to ensure there are no 179 # race conditions that could cause entries to be skipped/etc. 180 self._event_loop.call_soon_threadsafe( 181 partial(self._add_callback_in_thread, call, feed_existing_logs) 182 ) 183 184 def _add_callback_in_thread( 185 self, call: Callable[[LogEntry], None], feed_existing_logs: bool 186 ) -> None: 187 """Add a callback to be run for each LogEntry. 188 189 Note that this callback will always run in a background thread. 190 Passing True for feed_existing_logs will cause all cached logs 191 in the handler to be fed to the callback (still in the 192 background thread though). 193 """ 194 assert current_thread() is self._thread 195 self._callbacks.append(call) 196 197 # Run all of our cached entries through the new callback if desired. 198 if feed_existing_logs and self._cache_size_limit > 0: 199 with self._cache_lock: 200 for _id, entry in self._cache: 201 self._run_callback_on_entry(call, entry) 202 203 def _log_thread_main(self) -> None: 204 self._event_loop = asyncio.new_event_loop() 205 206 # In our background thread event loop we do a fair amount of 207 # slow synchronous stuff such as mucking with the log cache. 208 # Let's avoid getting tons of warnings about this in debug mode. 209 self._event_loop.slow_callback_duration = 2.0 # Default is 0.1 210 211 # NOTE: if we ever use default threadpool at all we should allow 212 # setting it for our loop. 213 asyncio.set_event_loop(self._event_loop) 214 self._thread_bootstrapped = True 215 try: 216 if self._cache_time_limit is not None: 217 _prunetask = self._event_loop.create_task( 218 self._time_prune_cache() 219 ) 220 self._event_loop.run_forever() 221 except BaseException: 222 # If this ever goes down we're in trouble; we won't be able 223 # to log about it though. Try to make some noise however we 224 # can. 225 print('LogHandler died!!!', file=sys.stderr) 226 import traceback 227 228 traceback.print_exc() 229 raise 230 231 async def _time_prune_cache(self) -> None: 232 assert self._cache_time_limit is not None 233 while bool(True): 234 await asyncio.sleep(61.27) 235 now = utc_now() 236 with self._cache_lock: 237 # Prune the oldest entry as long as there is a first one that 238 # is too old. 239 while ( 240 self._cache 241 and (now - self._cache[0][1].time) >= self._cache_time_limit 242 ): 243 popped = self._cache.popleft() 244 self._cache_size -= popped[0] 245 self._cache_index_offset += 1 246 247 def get_cached( 248 self, start_index: int = 0, max_entries: int | None = None 249 ) -> LogArchive: 250 """Build and return an archive of cached log entries. 251 252 This will only include entries that have been processed by the 253 background thread, so may not include just-submitted logs or 254 entries for partially written stdout/stderr lines. 255 Entries from the range [start_index:start_index+max_entries] 256 which are still present in the cache will be returned. 257 """ 258 259 assert start_index >= 0 260 if max_entries is not None: 261 assert max_entries >= 0 262 with self._cache_lock: 263 # Transform start_index to our present cache space. 264 start_index -= self._cache_index_offset 265 # Calc end-index in our present cache space. 266 end_index = ( 267 len(self._cache) 268 if max_entries is None 269 else start_index + max_entries 270 ) 271 272 # Clamp both indexes to both ends of our present space. 273 start_index = max(0, min(start_index, len(self._cache))) 274 end_index = max(0, min(end_index, len(self._cache))) 275 276 return LogArchive( 277 log_size=self._cache_index_offset + len(self._cache), 278 start_index=start_index + self._cache_index_offset, 279 entries=self._cache_slice(start_index, end_index), 280 ) 281 282 def _cache_slice( 283 self, start: int, end: int, step: int = 1 284 ) -> list[LogEntry]: 285 # Deque doesn't natively support slicing but we can do it manually. 286 # It sounds like rotating the deque and pulling from the beginning 287 # is the most efficient way to do this. The downside is the deque 288 # gets temporarily modified in the process so we need to make sure 289 # we're holding the lock. 290 assert self._cache_lock.locked() 291 cache = self._cache 292 cache.rotate(-start) 293 slc = [e[1] for e in itertools.islice(cache, 0, end - start, step)] 294 cache.rotate(start) 295 return slc 296 297 @classmethod 298 def _is_immutable_log_data(cls, data: Any) -> bool: 299 if isinstance(data, (str, bool, int, float, bytes)): 300 return True 301 if isinstance(data, tuple): 302 return all(cls._is_immutable_log_data(x) for x in data) 303 return False 304 305 def call_in_thread(self, call: Callable[[], Any]) -> None: 306 """Submit a call to be run in the logging background thread.""" 307 self._event_loop.call_soon_threadsafe(call) 308 309 @override 310 def emit(self, record: logging.LogRecord) -> None: 311 # pylint: disable=too-many-branches 312 if __debug__: 313 starttime = time.monotonic() 314 315 # Called by logging to send us records. 316 317 # TODO - kill this. 318 if ( 319 self._suppress_non_root_debug 320 and record.name != 'root' 321 and record.levelname == 'DEBUG' 322 ): 323 return 324 325 # Optimization: if our log args are all simple immutable values, 326 # we can just kick the whole thing over to our background thread to 327 # be formatted there at our leisure. If anything is mutable and 328 # thus could possibly change between now and then or if we want 329 # to do immediate file echoing then we need to bite the bullet 330 # and do that stuff here at the call site. 331 fast_path = self._echofile is None and self._is_immutable_log_data( 332 record.args 333 ) 334 335 # Note: just assuming types are correct here, but they'll be 336 # checked properly when the resulting LogEntry gets exported. 337 labels: dict[str, str] | None = getattr(record, 'labels', None) 338 if labels is None: 339 labels = {} 340 341 if fast_path: 342 if __debug__: 343 formattime = echotime = time.monotonic() 344 self._event_loop.call_soon_threadsafe( 345 partial( 346 self._emit_in_thread, 347 record.name, 348 record.levelno, 349 record.created, 350 record, 351 labels, 352 ) 353 ) 354 else: 355 # Slow case; do formatting and echoing here at the log call 356 # site. 357 msg = self.format(record) 358 359 if __debug__: 360 formattime = time.monotonic() 361 362 # Also immediately print pretty colored output to our echo file 363 # (generally stderr). We do this part here instead of in our bg 364 # thread because the delay can throw off command line prompts or 365 # make tight debugging harder. 366 if self._echofile is not None: 367 # try: 368 # if self._report_blocking_io_on_echo_error: 369 # premsg = ( 370 # 'WARNING: BlockingIOError ON LOG ECHO OUTPUT;' 371 # ' YOU ARE PROBABLY MISSING LOGS\n' 372 # ) 373 # self._report_blocking_io_on_echo_error = False 374 # else: 375 # premsg = '' 376 ends = LEVELNO_COLOR_CODES.get(record.levelno) 377 namepre = f'{Clr.WHT}{record.name}:{Clr.RST} ' 378 if ends is not None: 379 self._echofile.write( 380 f'{namepre}{ends[0]}' 381 f'{msg}{ends[1]}\n' 382 # f'{namepre}{ends[0]}' f'{premsg}{msg}{ends[1]}\n' 383 ) 384 else: 385 self._echofile.write(f'{namepre}{msg}\n') 386 self._echofile.flush() 387 # except BlockingIOError: 388 # # Ran into this when doing a bunch of logging; assuming 389 # # this is asyncio's doing?.. For now trying to survive 390 # # the error but telling the user something is probably 391 # # missing in their output. 392 # self._report_blocking_io_on_echo_error = True 393 394 if __debug__: 395 echotime = time.monotonic() 396 397 self._event_loop.call_soon_threadsafe( 398 partial( 399 self._emit_in_thread, 400 record.name, 401 record.levelno, 402 record.created, 403 msg, 404 labels, 405 ) 406 ) 407 408 if __debug__: 409 # pylint: disable=used-before-assignment 410 # Make noise if we're taking a significant amount of time here. 411 # Limit the noise to once every so often though; otherwise we 412 # could get a feedback loop where every log emit results in a 413 # warning log which results in another, etc. 414 now = time.monotonic() 415 # noinspection PyUnboundLocalVariable 416 duration = now - starttime # pyright: ignore 417 # noinspection PyUnboundLocalVariable 418 format_duration = formattime - starttime # pyright: ignore 419 # noinspection PyUnboundLocalVariable 420 echo_duration = echotime - formattime # pyright: ignore 421 if duration > 0.05 and ( 422 self._last_slow_emit_warning_time is None 423 or now > self._last_slow_emit_warning_time + 10.0 424 ): 425 # Logging calls from *within* a logging handler 426 # sounds sketchy, so let's just kick this over to 427 # the bg event loop thread we've already got. 428 self._last_slow_emit_warning_time = now 429 self._event_loop.call_soon_threadsafe( 430 partial( 431 logging.warning, 432 'efro.log.LogHandler emit took too long' 433 ' (%.2fs total; %.2fs format, %.2fs echo,' 434 ' fast_path=%s).', 435 duration, 436 format_duration, 437 echo_duration, 438 fast_path, 439 ) 440 ) 441 442 def _emit_in_thread( 443 self, 444 name: str, 445 levelno: int, 446 created: float, 447 message: str | logging.LogRecord, 448 labels: dict[str, str], 449 ) -> None: 450 try: 451 # If they passed a raw record here, bake it down to a string. 452 if isinstance(message, logging.LogRecord): 453 message = self.format(message) 454 455 self._emit_entry( 456 LogEntry( 457 name=name, 458 message=message, 459 level=LEVELNO_LOG_LEVELS.get(levelno, LogLevel.INFO), 460 time=datetime.datetime.fromtimestamp( 461 created, datetime.timezone.utc 462 ), 463 labels=labels, 464 ) 465 ) 466 except Exception: 467 import traceback 468 469 traceback.print_exc(file=self._echofile) 470 471 def file_write(self, name: str, output: str) -> None: 472 """Send raw stdout/stderr output to the logger to be collated.""" 473 474 # Note to self: it turns out that things like '^^^^^^^^^^^^^^' 475 # lines in stack traces get written as lots of individual '^' 476 # writes. It feels a bit dirty to be pushing a deferred call to 477 # another thread for each character. Perhaps should do some sort 478 # of basic accumulation here? 479 self._event_loop.call_soon_threadsafe( 480 partial(self._file_write_in_thread, name, output) 481 ) 482 483 def _file_write_in_thread(self, name: str, output: str) -> None: 484 try: 485 assert name in ('stdout', 'stderr') 486 487 # Here we try to be somewhat smart about breaking arbitrary 488 # print output into discrete log entries. 489 490 self._file_chunks[name].append(output) 491 492 # Individual parts of a print come across as separate writes, 493 # and the end of a print will be a standalone '\n' by default. 494 # Let's use that as a hint that we're likely at the end of 495 # a full print statement and ship what we've got. 496 if output == '\n': 497 self._ship_file_chunks(name, cancel_ship_task=True) 498 else: 499 # By default just keep adding chunks. 500 # However we keep a timer running anytime we've got 501 # unshipped chunks so that we can ship what we've got 502 # after a short bit if we never get a newline. 503 ship_task = self._file_chunk_ship_task[name] 504 if ship_task is None: 505 self._file_chunk_ship_task[name] = ( 506 self._event_loop.create_task( 507 self._ship_chunks_task(name), 508 name='log ship file chunks', 509 ) 510 ) 511 512 except Exception: 513 import traceback 514 515 traceback.print_exc(file=self._echofile) 516 517 def shutdown(self) -> None: 518 """Block until all pending logs/prints are done.""" 519 done = False 520 self.file_flush('stdout') 521 self.file_flush('stderr') 522 523 def _set_done() -> None: 524 nonlocal done 525 done = True 526 527 self._event_loop.call_soon_threadsafe(_set_done) 528 529 starttime = time.monotonic() 530 while not done: 531 if time.monotonic() - starttime > 5.0: 532 print('LogHandler shutdown hung!!!', file=sys.stderr) 533 break 534 time.sleep(0.01) 535 536 def file_flush(self, name: str) -> None: 537 """Send raw stdout/stderr flush to the logger to be collated.""" 538 539 self._event_loop.call_soon_threadsafe( 540 partial(self._file_flush_in_thread, name) 541 ) 542 543 def _file_flush_in_thread(self, name: str) -> None: 544 try: 545 assert name in ('stdout', 'stderr') 546 547 # Immediately ship whatever chunks we've got. 548 if self._file_chunks[name]: 549 self._ship_file_chunks(name, cancel_ship_task=True) 550 551 except Exception: 552 import traceback 553 554 traceback.print_exc(file=self._echofile) 555 556 async def _ship_chunks_task(self, name: str) -> None: 557 # Note: it's important we sleep here for a moment. Otherwise, 558 # things like '^^^^^^^^^^^^' lines in stack traces, which come 559 # through as lots of individual '^' writes, tend to get broken 560 # into lots of tiny little lines by us. 561 await asyncio.sleep(0.01) 562 self._ship_file_chunks(name, cancel_ship_task=False) 563 564 def _ship_file_chunks(self, name: str, cancel_ship_task: bool) -> None: 565 # Note: Raw print input generally ends in a newline, but that is 566 # redundant when we break things into log entries and results in 567 # extra empty lines. So strip off a single trailing newline if 568 # one is present. 569 text = ''.join(self._file_chunks[name]).removesuffix('\n') 570 571 self._emit_entry( 572 LogEntry( 573 name=name, message=text, level=LogLevel.INFO, time=utc_now() 574 ) 575 ) 576 self._file_chunks[name] = [] 577 ship_task = self._file_chunk_ship_task[name] 578 if cancel_ship_task and ship_task is not None: 579 ship_task.cancel() 580 self._file_chunk_ship_task[name] = None 581 582 def _emit_entry(self, entry: LogEntry) -> None: 583 assert current_thread() is self._thread 584 585 # Store to our cache. 586 if self._cache_size_limit > 0: 587 with self._cache_lock: 588 # Do a rough calc of how many bytes this entry consumes. 589 entry_size = sum( 590 sys.getsizeof(x) 591 for x in ( 592 entry, 593 entry.name, 594 entry.message, 595 entry.level, 596 entry.time, 597 ) 598 ) 599 self._cache.append((entry_size, entry)) 600 self._cache_size += entry_size 601 602 # Prune old until we are back at or under our limit. 603 while self._cache_size > self._cache_size_limit: 604 popped = self._cache.popleft() 605 self._cache_size -= popped[0] 606 self._cache_index_offset += 1 607 608 # Pass to callbacks. 609 for call in self._callbacks: 610 self._run_callback_on_entry(call, entry) 611 612 # Dump to our structured log file. 613 # TODO: should set a timer for flushing; don't flush every line. 614 if self._file is not None: 615 entry_s = dataclass_to_json(entry) 616 assert '\n' not in entry_s # Make sure its a single line. 617 print(entry_s, file=self._file, flush=True) 618 619 def _run_callback_on_entry( 620 self, callback: Callable[[LogEntry], None], entry: LogEntry 621 ) -> None: 622 """Run a callback and handle any errors.""" 623 try: 624 callback(entry) 625 except Exception: 626 # Only print the first callback error to avoid insanity. 627 if not self._printed_callback_error: 628 import traceback 629 630 traceback.print_exc(file=self._echofile) 631 self._printed_callback_error = True 632 633 634class FileLogEcho: 635 """A file-like object for forwarding stdout/stderr to a LogHandler.""" 636 637 def __init__( 638 self, original: TextIO, name: str, handler: LogHandler 639 ) -> None: 640 assert name in ('stdout', 'stderr') 641 self._original = original 642 self._name = name 643 self._handler = handler 644 645 # Think this was a result of setting non-blocking stdin somehow; 646 # probably not needed. 647 # self._report_blocking_io_error = False 648 649 def write(self, output: Any) -> None: 650 """Override standard write call.""" 651 # try: 652 # if self._report_blocking_io_error: 653 # self._report_blocking_io_error = False 654 # self._original.write( 655 # 'WARNING: BlockingIOError ENCOUNTERED;' 656 # ' OUTPUT IS PROBABLY MISSING' 657 # ) 658 659 self._original.write(output) 660 # except BlockingIOError: 661 # self._report_blocking_io_error = True 662 self._handler.file_write(self._name, output) 663 664 def flush(self) -> None: 665 """Flush the file.""" 666 self._original.flush() 667 668 # We also use this as a hint to ship whatever file chunks 669 # we've accumulated (we have to try and be smart about breaking 670 # our arbitrary file output into discrete entries). 671 self._handler.file_flush(self._name) 672 673 def isatty(self) -> bool: 674 """Are we a terminal?""" 675 return self._original.isatty() 676 677 678def setup_logging( 679 log_path: str | Path | None, 680 level: LogLevel, 681 suppress_non_root_debug: bool = False, 682 log_stdout_stderr: bool = False, 683 echo_to_stderr: bool = True, 684 cache_size_limit: int = 0, 685 cache_time_limit: datetime.timedelta | None = None, 686) -> LogHandler: 687 """Set up our logging environment. 688 689 Returns the custom handler which can be used to fetch information 690 about logs that have passed through it. (worst log-levels, caches, etc.). 691 """ 692 693 lmap = { 694 LogLevel.DEBUG: logging.DEBUG, 695 LogLevel.INFO: logging.INFO, 696 LogLevel.WARNING: logging.WARNING, 697 LogLevel.ERROR: logging.ERROR, 698 LogLevel.CRITICAL: logging.CRITICAL, 699 } 700 701 # Wire logger output to go to a structured log file. 702 # Also echo it to stderr IF we're running in a terminal. 703 # UPDATE: Actually gonna always go to stderr. Is there a 704 # reason we shouldn't? This makes debugging possible if all 705 # we have is access to a non-interactive terminal or file dump. 706 # We could add a '--quiet' arg or whatnot to change this behavior. 707 708 # Note: by passing in the *original* stderr here before we 709 # (potentially) replace it, we ensure that our log echos 710 # won't themselves be intercepted and sent to the logger 711 # which would create an infinite loop. 712 loghandler = LogHandler( 713 log_path, 714 echofile=sys.stderr if echo_to_stderr else None, 715 suppress_non_root_debug=suppress_non_root_debug, 716 cache_size_limit=cache_size_limit, 717 cache_time_limit=cache_time_limit, 718 ) 719 720 # Note: going ahead with force=True here so that we replace any 721 # existing logger. Though we warn if it looks like we are doing 722 # that so we can try to avoid creating the first one. 723 had_previous_handlers = bool(logging.root.handlers) 724 logging.basicConfig( 725 level=lmap[level], 726 # format='%(name)s: %(message)s', 727 # We dump *only* the message here. We pass various log record bits 728 # around and format things fancier where they end up. 729 format='%(message)s', 730 handlers=[loghandler], 731 force=True, 732 ) 733 if had_previous_handlers: 734 logging.warning( 735 'setup_logging: Replacing existing handlers.' 736 ' Something may have logged before expected.' 737 ) 738 739 # Optionally intercept Python's stdout/stderr output and generate 740 # log entries from it. 741 if log_stdout_stderr: 742 sys.stdout = FileLogEcho(sys.stdout, 'stdout', loghandler) 743 sys.stderr = FileLogEcho(sys.stderr, 'stderr', loghandler) 744 745 return loghandler
29class LogLevel(Enum): 30 """Severity level for a log entry. 31 32 These enums have numeric values so they can be compared in severity. 33 Note that these values are not currently interchangeable with the 34 logging.ERROR, logging.DEBUG, etc. values. 35 """ 36 37 DEBUG = 0 38 INFO = 1 39 WARNING = 2 40 ERROR = 3 41 CRITICAL = 4 42 43 @property 44 def python_logging_level(self) -> int: 45 """Give the corresponding logging level.""" 46 return LOG_LEVEL_LEVELNOS[self] 47 48 @classmethod 49 def from_python_logging_level(cls, levelno: int) -> LogLevel: 50 """Given a Python logging level, return a LogLevel.""" 51 return LEVELNO_LOG_LEVELS[levelno]
Severity level for a log entry.
These enums have numeric values so they can be compared in severity. Note that these values are not currently interchangeable with the logging.ERROR, logging.DEBUG, etc. values.
43 @property 44 def python_logging_level(self) -> int: 45 """Give the corresponding logging level.""" 46 return LOG_LEVEL_LEVELNOS[self]
Give the corresponding logging level.
48 @classmethod 49 def from_python_logging_level(cls, levelno: int) -> LogLevel: 50 """Given a Python logging level, return a LogLevel.""" 51 return LEVELNO_LOG_LEVELS[levelno]
Given a Python logging level, return a LogLevel.
Inherited Members
- enum.Enum
- name
- value
81@ioprepped 82@dataclass 83class LogEntry: 84 """Single logged message.""" 85 86 name: Annotated[str, IOAttrs('n', soft_default='root', store_default=False)] 87 message: Annotated[str, IOAttrs('m')] 88 level: Annotated[LogLevel, IOAttrs('l')] 89 time: Annotated[datetime.datetime, IOAttrs('t')] 90 91 # We support arbitrary string labels per log entry which can be 92 # incorporated into custom log processing. To populate this, our 93 # LogHandler class looks for a 'labels' dict passed in the optional 94 # 'extra' dict arg to standard Python log calls. 95 labels: Annotated[dict[str, str], IOAttrs('la', store_default=False)] = ( 96 field(default_factory=dict) 97 )
Single logged message.
100@ioprepped 101@dataclass 102class LogArchive: 103 """Info and data for a log.""" 104 105 # Total number of entries submitted to the log. 106 log_size: Annotated[int, IOAttrs('t')] 107 108 # Offset for the entries contained here. 109 # (10 means our first entry is the 10th in the log, etc.) 110 start_index: Annotated[int, IOAttrs('c')] 111 112 entries: Annotated[list[LogEntry], IOAttrs('e')]
Info and data for a log.
115class LogHandler(logging.Handler): 116 """Fancy-pants handler for logging output. 117 118 Writes logs to disk in structured json format and echoes them 119 to stdout/stderr with pretty colors. 120 """ 121 122 _event_loop: asyncio.AbstractEventLoop 123 124 # IMPORTANT: Any debug prints we do here should ONLY go to echofile. 125 # Otherwise we can get infinite loops as those prints come back to us 126 # as new log entries. 127 128 def __init__( 129 self, 130 path: str | Path | None, 131 echofile: TextIO | None, 132 suppress_non_root_debug: bool, 133 cache_size_limit: int, 134 cache_time_limit: datetime.timedelta | None, 135 ): 136 super().__init__() 137 # pylint: disable=consider-using-with 138 self._file = None if path is None else open(path, 'w', encoding='utf-8') 139 self._echofile = echofile 140 self._callbacks: list[Callable[[LogEntry], None]] = [] 141 self._suppress_non_root_debug = suppress_non_root_debug 142 self._file_chunks: dict[str, list[str]] = {'stdout': [], 'stderr': []} 143 self._file_chunk_ship_task: dict[str, asyncio.Task | None] = { 144 'stdout': None, 145 'stderr': None, 146 } 147 self._cache_size = 0 148 assert cache_size_limit >= 0 149 self._cache_size_limit = cache_size_limit 150 self._cache_time_limit = cache_time_limit 151 self._cache = deque[tuple[int, LogEntry]]() 152 self._cache_index_offset = 0 153 self._cache_lock = Lock() 154 self._printed_callback_error = False 155 self._thread_bootstrapped = False 156 self._thread = Thread(target=self._log_thread_main, daemon=True) 157 if __debug__: 158 self._last_slow_emit_warning_time: float | None = None 159 self._thread.start() 160 161 # Spin until our thread is up and running; otherwise we could 162 # wind up trying to push stuff to our event loop before the 163 # loop exists. 164 while not self._thread_bootstrapped: 165 time.sleep(0.001) 166 167 def add_callback( 168 self, call: Callable[[LogEntry], None], feed_existing_logs: bool = False 169 ) -> None: 170 """Add a callback to be run for each LogEntry. 171 172 Note that this callback will always run in a background thread. 173 Passing True for feed_existing_logs will cause all cached logs 174 in the handler to be fed to the callback (still in the 175 background thread though). 176 """ 177 178 # Kick this over to our bg thread to add the callback and 179 # process cached entries at the same time to ensure there are no 180 # race conditions that could cause entries to be skipped/etc. 181 self._event_loop.call_soon_threadsafe( 182 partial(self._add_callback_in_thread, call, feed_existing_logs) 183 ) 184 185 def _add_callback_in_thread( 186 self, call: Callable[[LogEntry], None], feed_existing_logs: bool 187 ) -> None: 188 """Add a callback to be run for each LogEntry. 189 190 Note that this callback will always run in a background thread. 191 Passing True for feed_existing_logs will cause all cached logs 192 in the handler to be fed to the callback (still in the 193 background thread though). 194 """ 195 assert current_thread() is self._thread 196 self._callbacks.append(call) 197 198 # Run all of our cached entries through the new callback if desired. 199 if feed_existing_logs and self._cache_size_limit > 0: 200 with self._cache_lock: 201 for _id, entry in self._cache: 202 self._run_callback_on_entry(call, entry) 203 204 def _log_thread_main(self) -> None: 205 self._event_loop = asyncio.new_event_loop() 206 207 # In our background thread event loop we do a fair amount of 208 # slow synchronous stuff such as mucking with the log cache. 209 # Let's avoid getting tons of warnings about this in debug mode. 210 self._event_loop.slow_callback_duration = 2.0 # Default is 0.1 211 212 # NOTE: if we ever use default threadpool at all we should allow 213 # setting it for our loop. 214 asyncio.set_event_loop(self._event_loop) 215 self._thread_bootstrapped = True 216 try: 217 if self._cache_time_limit is not None: 218 _prunetask = self._event_loop.create_task( 219 self._time_prune_cache() 220 ) 221 self._event_loop.run_forever() 222 except BaseException: 223 # If this ever goes down we're in trouble; we won't be able 224 # to log about it though. Try to make some noise however we 225 # can. 226 print('LogHandler died!!!', file=sys.stderr) 227 import traceback 228 229 traceback.print_exc() 230 raise 231 232 async def _time_prune_cache(self) -> None: 233 assert self._cache_time_limit is not None 234 while bool(True): 235 await asyncio.sleep(61.27) 236 now = utc_now() 237 with self._cache_lock: 238 # Prune the oldest entry as long as there is a first one that 239 # is too old. 240 while ( 241 self._cache 242 and (now - self._cache[0][1].time) >= self._cache_time_limit 243 ): 244 popped = self._cache.popleft() 245 self._cache_size -= popped[0] 246 self._cache_index_offset += 1 247 248 def get_cached( 249 self, start_index: int = 0, max_entries: int | None = None 250 ) -> LogArchive: 251 """Build and return an archive of cached log entries. 252 253 This will only include entries that have been processed by the 254 background thread, so may not include just-submitted logs or 255 entries for partially written stdout/stderr lines. 256 Entries from the range [start_index:start_index+max_entries] 257 which are still present in the cache will be returned. 258 """ 259 260 assert start_index >= 0 261 if max_entries is not None: 262 assert max_entries >= 0 263 with self._cache_lock: 264 # Transform start_index to our present cache space. 265 start_index -= self._cache_index_offset 266 # Calc end-index in our present cache space. 267 end_index = ( 268 len(self._cache) 269 if max_entries is None 270 else start_index + max_entries 271 ) 272 273 # Clamp both indexes to both ends of our present space. 274 start_index = max(0, min(start_index, len(self._cache))) 275 end_index = max(0, min(end_index, len(self._cache))) 276 277 return LogArchive( 278 log_size=self._cache_index_offset + len(self._cache), 279 start_index=start_index + self._cache_index_offset, 280 entries=self._cache_slice(start_index, end_index), 281 ) 282 283 def _cache_slice( 284 self, start: int, end: int, step: int = 1 285 ) -> list[LogEntry]: 286 # Deque doesn't natively support slicing but we can do it manually. 287 # It sounds like rotating the deque and pulling from the beginning 288 # is the most efficient way to do this. The downside is the deque 289 # gets temporarily modified in the process so we need to make sure 290 # we're holding the lock. 291 assert self._cache_lock.locked() 292 cache = self._cache 293 cache.rotate(-start) 294 slc = [e[1] for e in itertools.islice(cache, 0, end - start, step)] 295 cache.rotate(start) 296 return slc 297 298 @classmethod 299 def _is_immutable_log_data(cls, data: Any) -> bool: 300 if isinstance(data, (str, bool, int, float, bytes)): 301 return True 302 if isinstance(data, tuple): 303 return all(cls._is_immutable_log_data(x) for x in data) 304 return False 305 306 def call_in_thread(self, call: Callable[[], Any]) -> None: 307 """Submit a call to be run in the logging background thread.""" 308 self._event_loop.call_soon_threadsafe(call) 309 310 @override 311 def emit(self, record: logging.LogRecord) -> None: 312 # pylint: disable=too-many-branches 313 if __debug__: 314 starttime = time.monotonic() 315 316 # Called by logging to send us records. 317 318 # TODO - kill this. 319 if ( 320 self._suppress_non_root_debug 321 and record.name != 'root' 322 and record.levelname == 'DEBUG' 323 ): 324 return 325 326 # Optimization: if our log args are all simple immutable values, 327 # we can just kick the whole thing over to our background thread to 328 # be formatted there at our leisure. If anything is mutable and 329 # thus could possibly change between now and then or if we want 330 # to do immediate file echoing then we need to bite the bullet 331 # and do that stuff here at the call site. 332 fast_path = self._echofile is None and self._is_immutable_log_data( 333 record.args 334 ) 335 336 # Note: just assuming types are correct here, but they'll be 337 # checked properly when the resulting LogEntry gets exported. 338 labels: dict[str, str] | None = getattr(record, 'labels', None) 339 if labels is None: 340 labels = {} 341 342 if fast_path: 343 if __debug__: 344 formattime = echotime = time.monotonic() 345 self._event_loop.call_soon_threadsafe( 346 partial( 347 self._emit_in_thread, 348 record.name, 349 record.levelno, 350 record.created, 351 record, 352 labels, 353 ) 354 ) 355 else: 356 # Slow case; do formatting and echoing here at the log call 357 # site. 358 msg = self.format(record) 359 360 if __debug__: 361 formattime = time.monotonic() 362 363 # Also immediately print pretty colored output to our echo file 364 # (generally stderr). We do this part here instead of in our bg 365 # thread because the delay can throw off command line prompts or 366 # make tight debugging harder. 367 if self._echofile is not None: 368 # try: 369 # if self._report_blocking_io_on_echo_error: 370 # premsg = ( 371 # 'WARNING: BlockingIOError ON LOG ECHO OUTPUT;' 372 # ' YOU ARE PROBABLY MISSING LOGS\n' 373 # ) 374 # self._report_blocking_io_on_echo_error = False 375 # else: 376 # premsg = '' 377 ends = LEVELNO_COLOR_CODES.get(record.levelno) 378 namepre = f'{Clr.WHT}{record.name}:{Clr.RST} ' 379 if ends is not None: 380 self._echofile.write( 381 f'{namepre}{ends[0]}' 382 f'{msg}{ends[1]}\n' 383 # f'{namepre}{ends[0]}' f'{premsg}{msg}{ends[1]}\n' 384 ) 385 else: 386 self._echofile.write(f'{namepre}{msg}\n') 387 self._echofile.flush() 388 # except BlockingIOError: 389 # # Ran into this when doing a bunch of logging; assuming 390 # # this is asyncio's doing?.. For now trying to survive 391 # # the error but telling the user something is probably 392 # # missing in their output. 393 # self._report_blocking_io_on_echo_error = True 394 395 if __debug__: 396 echotime = time.monotonic() 397 398 self._event_loop.call_soon_threadsafe( 399 partial( 400 self._emit_in_thread, 401 record.name, 402 record.levelno, 403 record.created, 404 msg, 405 labels, 406 ) 407 ) 408 409 if __debug__: 410 # pylint: disable=used-before-assignment 411 # Make noise if we're taking a significant amount of time here. 412 # Limit the noise to once every so often though; otherwise we 413 # could get a feedback loop where every log emit results in a 414 # warning log which results in another, etc. 415 now = time.monotonic() 416 # noinspection PyUnboundLocalVariable 417 duration = now - starttime # pyright: ignore 418 # noinspection PyUnboundLocalVariable 419 format_duration = formattime - starttime # pyright: ignore 420 # noinspection PyUnboundLocalVariable 421 echo_duration = echotime - formattime # pyright: ignore 422 if duration > 0.05 and ( 423 self._last_slow_emit_warning_time is None 424 or now > self._last_slow_emit_warning_time + 10.0 425 ): 426 # Logging calls from *within* a logging handler 427 # sounds sketchy, so let's just kick this over to 428 # the bg event loop thread we've already got. 429 self._last_slow_emit_warning_time = now 430 self._event_loop.call_soon_threadsafe( 431 partial( 432 logging.warning, 433 'efro.log.LogHandler emit took too long' 434 ' (%.2fs total; %.2fs format, %.2fs echo,' 435 ' fast_path=%s).', 436 duration, 437 format_duration, 438 echo_duration, 439 fast_path, 440 ) 441 ) 442 443 def _emit_in_thread( 444 self, 445 name: str, 446 levelno: int, 447 created: float, 448 message: str | logging.LogRecord, 449 labels: dict[str, str], 450 ) -> None: 451 try: 452 # If they passed a raw record here, bake it down to a string. 453 if isinstance(message, logging.LogRecord): 454 message = self.format(message) 455 456 self._emit_entry( 457 LogEntry( 458 name=name, 459 message=message, 460 level=LEVELNO_LOG_LEVELS.get(levelno, LogLevel.INFO), 461 time=datetime.datetime.fromtimestamp( 462 created, datetime.timezone.utc 463 ), 464 labels=labels, 465 ) 466 ) 467 except Exception: 468 import traceback 469 470 traceback.print_exc(file=self._echofile) 471 472 def file_write(self, name: str, output: str) -> None: 473 """Send raw stdout/stderr output to the logger to be collated.""" 474 475 # Note to self: it turns out that things like '^^^^^^^^^^^^^^' 476 # lines in stack traces get written as lots of individual '^' 477 # writes. It feels a bit dirty to be pushing a deferred call to 478 # another thread for each character. Perhaps should do some sort 479 # of basic accumulation here? 480 self._event_loop.call_soon_threadsafe( 481 partial(self._file_write_in_thread, name, output) 482 ) 483 484 def _file_write_in_thread(self, name: str, output: str) -> None: 485 try: 486 assert name in ('stdout', 'stderr') 487 488 # Here we try to be somewhat smart about breaking arbitrary 489 # print output into discrete log entries. 490 491 self._file_chunks[name].append(output) 492 493 # Individual parts of a print come across as separate writes, 494 # and the end of a print will be a standalone '\n' by default. 495 # Let's use that as a hint that we're likely at the end of 496 # a full print statement and ship what we've got. 497 if output == '\n': 498 self._ship_file_chunks(name, cancel_ship_task=True) 499 else: 500 # By default just keep adding chunks. 501 # However we keep a timer running anytime we've got 502 # unshipped chunks so that we can ship what we've got 503 # after a short bit if we never get a newline. 504 ship_task = self._file_chunk_ship_task[name] 505 if ship_task is None: 506 self._file_chunk_ship_task[name] = ( 507 self._event_loop.create_task( 508 self._ship_chunks_task(name), 509 name='log ship file chunks', 510 ) 511 ) 512 513 except Exception: 514 import traceback 515 516 traceback.print_exc(file=self._echofile) 517 518 def shutdown(self) -> None: 519 """Block until all pending logs/prints are done.""" 520 done = False 521 self.file_flush('stdout') 522 self.file_flush('stderr') 523 524 def _set_done() -> None: 525 nonlocal done 526 done = True 527 528 self._event_loop.call_soon_threadsafe(_set_done) 529 530 starttime = time.monotonic() 531 while not done: 532 if time.monotonic() - starttime > 5.0: 533 print('LogHandler shutdown hung!!!', file=sys.stderr) 534 break 535 time.sleep(0.01) 536 537 def file_flush(self, name: str) -> None: 538 """Send raw stdout/stderr flush to the logger to be collated.""" 539 540 self._event_loop.call_soon_threadsafe( 541 partial(self._file_flush_in_thread, name) 542 ) 543 544 def _file_flush_in_thread(self, name: str) -> None: 545 try: 546 assert name in ('stdout', 'stderr') 547 548 # Immediately ship whatever chunks we've got. 549 if self._file_chunks[name]: 550 self._ship_file_chunks(name, cancel_ship_task=True) 551 552 except Exception: 553 import traceback 554 555 traceback.print_exc(file=self._echofile) 556 557 async def _ship_chunks_task(self, name: str) -> None: 558 # Note: it's important we sleep here for a moment. Otherwise, 559 # things like '^^^^^^^^^^^^' lines in stack traces, which come 560 # through as lots of individual '^' writes, tend to get broken 561 # into lots of tiny little lines by us. 562 await asyncio.sleep(0.01) 563 self._ship_file_chunks(name, cancel_ship_task=False) 564 565 def _ship_file_chunks(self, name: str, cancel_ship_task: bool) -> None: 566 # Note: Raw print input generally ends in a newline, but that is 567 # redundant when we break things into log entries and results in 568 # extra empty lines. So strip off a single trailing newline if 569 # one is present. 570 text = ''.join(self._file_chunks[name]).removesuffix('\n') 571 572 self._emit_entry( 573 LogEntry( 574 name=name, message=text, level=LogLevel.INFO, time=utc_now() 575 ) 576 ) 577 self._file_chunks[name] = [] 578 ship_task = self._file_chunk_ship_task[name] 579 if cancel_ship_task and ship_task is not None: 580 ship_task.cancel() 581 self._file_chunk_ship_task[name] = None 582 583 def _emit_entry(self, entry: LogEntry) -> None: 584 assert current_thread() is self._thread 585 586 # Store to our cache. 587 if self._cache_size_limit > 0: 588 with self._cache_lock: 589 # Do a rough calc of how many bytes this entry consumes. 590 entry_size = sum( 591 sys.getsizeof(x) 592 for x in ( 593 entry, 594 entry.name, 595 entry.message, 596 entry.level, 597 entry.time, 598 ) 599 ) 600 self._cache.append((entry_size, entry)) 601 self._cache_size += entry_size 602 603 # Prune old until we are back at or under our limit. 604 while self._cache_size > self._cache_size_limit: 605 popped = self._cache.popleft() 606 self._cache_size -= popped[0] 607 self._cache_index_offset += 1 608 609 # Pass to callbacks. 610 for call in self._callbacks: 611 self._run_callback_on_entry(call, entry) 612 613 # Dump to our structured log file. 614 # TODO: should set a timer for flushing; don't flush every line. 615 if self._file is not None: 616 entry_s = dataclass_to_json(entry) 617 assert '\n' not in entry_s # Make sure its a single line. 618 print(entry_s, file=self._file, flush=True) 619 620 def _run_callback_on_entry( 621 self, callback: Callable[[LogEntry], None], entry: LogEntry 622 ) -> None: 623 """Run a callback and handle any errors.""" 624 try: 625 callback(entry) 626 except Exception: 627 # Only print the first callback error to avoid insanity. 628 if not self._printed_callback_error: 629 import traceback 630 631 traceback.print_exc(file=self._echofile) 632 self._printed_callback_error = True
Fancy-pants handler for logging output.
Writes logs to disk in structured json format and echoes them to stdout/stderr with pretty colors.
128 def __init__( 129 self, 130 path: str | Path | None, 131 echofile: TextIO | None, 132 suppress_non_root_debug: bool, 133 cache_size_limit: int, 134 cache_time_limit: datetime.timedelta | None, 135 ): 136 super().__init__() 137 # pylint: disable=consider-using-with 138 self._file = None if path is None else open(path, 'w', encoding='utf-8') 139 self._echofile = echofile 140 self._callbacks: list[Callable[[LogEntry], None]] = [] 141 self._suppress_non_root_debug = suppress_non_root_debug 142 self._file_chunks: dict[str, list[str]] = {'stdout': [], 'stderr': []} 143 self._file_chunk_ship_task: dict[str, asyncio.Task | None] = { 144 'stdout': None, 145 'stderr': None, 146 } 147 self._cache_size = 0 148 assert cache_size_limit >= 0 149 self._cache_size_limit = cache_size_limit 150 self._cache_time_limit = cache_time_limit 151 self._cache = deque[tuple[int, LogEntry]]() 152 self._cache_index_offset = 0 153 self._cache_lock = Lock() 154 self._printed_callback_error = False 155 self._thread_bootstrapped = False 156 self._thread = Thread(target=self._log_thread_main, daemon=True) 157 if __debug__: 158 self._last_slow_emit_warning_time: float | None = None 159 self._thread.start() 160 161 # Spin until our thread is up and running; otherwise we could 162 # wind up trying to push stuff to our event loop before the 163 # loop exists. 164 while not self._thread_bootstrapped: 165 time.sleep(0.001)
Initializes the instance - basically setting the formatter to None and the filter list to empty.
167 def add_callback( 168 self, call: Callable[[LogEntry], None], feed_existing_logs: bool = False 169 ) -> None: 170 """Add a callback to be run for each LogEntry. 171 172 Note that this callback will always run in a background thread. 173 Passing True for feed_existing_logs will cause all cached logs 174 in the handler to be fed to the callback (still in the 175 background thread though). 176 """ 177 178 # Kick this over to our bg thread to add the callback and 179 # process cached entries at the same time to ensure there are no 180 # race conditions that could cause entries to be skipped/etc. 181 self._event_loop.call_soon_threadsafe( 182 partial(self._add_callback_in_thread, call, feed_existing_logs) 183 )
Add a callback to be run for each LogEntry.
Note that this callback will always run in a background thread. Passing True for feed_existing_logs will cause all cached logs in the handler to be fed to the callback (still in the background thread though).
248 def get_cached( 249 self, start_index: int = 0, max_entries: int | None = None 250 ) -> LogArchive: 251 """Build and return an archive of cached log entries. 252 253 This will only include entries that have been processed by the 254 background thread, so may not include just-submitted logs or 255 entries for partially written stdout/stderr lines. 256 Entries from the range [start_index:start_index+max_entries] 257 which are still present in the cache will be returned. 258 """ 259 260 assert start_index >= 0 261 if max_entries is not None: 262 assert max_entries >= 0 263 with self._cache_lock: 264 # Transform start_index to our present cache space. 265 start_index -= self._cache_index_offset 266 # Calc end-index in our present cache space. 267 end_index = ( 268 len(self._cache) 269 if max_entries is None 270 else start_index + max_entries 271 ) 272 273 # Clamp both indexes to both ends of our present space. 274 start_index = max(0, min(start_index, len(self._cache))) 275 end_index = max(0, min(end_index, len(self._cache))) 276 277 return LogArchive( 278 log_size=self._cache_index_offset + len(self._cache), 279 start_index=start_index + self._cache_index_offset, 280 entries=self._cache_slice(start_index, end_index), 281 )
Build and return an archive of cached log entries.
This will only include entries that have been processed by the background thread, so may not include just-submitted logs or entries for partially written stdout/stderr lines. Entries from the range [start_index:start_index+max_entries] which are still present in the cache will be returned.
306 def call_in_thread(self, call: Callable[[], Any]) -> None: 307 """Submit a call to be run in the logging background thread.""" 308 self._event_loop.call_soon_threadsafe(call)
Submit a call to be run in the logging background thread.
310 @override 311 def emit(self, record: logging.LogRecord) -> None: 312 # pylint: disable=too-many-branches 313 if __debug__: 314 starttime = time.monotonic() 315 316 # Called by logging to send us records. 317 318 # TODO - kill this. 319 if ( 320 self._suppress_non_root_debug 321 and record.name != 'root' 322 and record.levelname == 'DEBUG' 323 ): 324 return 325 326 # Optimization: if our log args are all simple immutable values, 327 # we can just kick the whole thing over to our background thread to 328 # be formatted there at our leisure. If anything is mutable and 329 # thus could possibly change between now and then or if we want 330 # to do immediate file echoing then we need to bite the bullet 331 # and do that stuff here at the call site. 332 fast_path = self._echofile is None and self._is_immutable_log_data( 333 record.args 334 ) 335 336 # Note: just assuming types are correct here, but they'll be 337 # checked properly when the resulting LogEntry gets exported. 338 labels: dict[str, str] | None = getattr(record, 'labels', None) 339 if labels is None: 340 labels = {} 341 342 if fast_path: 343 if __debug__: 344 formattime = echotime = time.monotonic() 345 self._event_loop.call_soon_threadsafe( 346 partial( 347 self._emit_in_thread, 348 record.name, 349 record.levelno, 350 record.created, 351 record, 352 labels, 353 ) 354 ) 355 else: 356 # Slow case; do formatting and echoing here at the log call 357 # site. 358 msg = self.format(record) 359 360 if __debug__: 361 formattime = time.monotonic() 362 363 # Also immediately print pretty colored output to our echo file 364 # (generally stderr). We do this part here instead of in our bg 365 # thread because the delay can throw off command line prompts or 366 # make tight debugging harder. 367 if self._echofile is not None: 368 # try: 369 # if self._report_blocking_io_on_echo_error: 370 # premsg = ( 371 # 'WARNING: BlockingIOError ON LOG ECHO OUTPUT;' 372 # ' YOU ARE PROBABLY MISSING LOGS\n' 373 # ) 374 # self._report_blocking_io_on_echo_error = False 375 # else: 376 # premsg = '' 377 ends = LEVELNO_COLOR_CODES.get(record.levelno) 378 namepre = f'{Clr.WHT}{record.name}:{Clr.RST} ' 379 if ends is not None: 380 self._echofile.write( 381 f'{namepre}{ends[0]}' 382 f'{msg}{ends[1]}\n' 383 # f'{namepre}{ends[0]}' f'{premsg}{msg}{ends[1]}\n' 384 ) 385 else: 386 self._echofile.write(f'{namepre}{msg}\n') 387 self._echofile.flush() 388 # except BlockingIOError: 389 # # Ran into this when doing a bunch of logging; assuming 390 # # this is asyncio's doing?.. For now trying to survive 391 # # the error but telling the user something is probably 392 # # missing in their output. 393 # self._report_blocking_io_on_echo_error = True 394 395 if __debug__: 396 echotime = time.monotonic() 397 398 self._event_loop.call_soon_threadsafe( 399 partial( 400 self._emit_in_thread, 401 record.name, 402 record.levelno, 403 record.created, 404 msg, 405 labels, 406 ) 407 ) 408 409 if __debug__: 410 # pylint: disable=used-before-assignment 411 # Make noise if we're taking a significant amount of time here. 412 # Limit the noise to once every so often though; otherwise we 413 # could get a feedback loop where every log emit results in a 414 # warning log which results in another, etc. 415 now = time.monotonic() 416 # noinspection PyUnboundLocalVariable 417 duration = now - starttime # pyright: ignore 418 # noinspection PyUnboundLocalVariable 419 format_duration = formattime - starttime # pyright: ignore 420 # noinspection PyUnboundLocalVariable 421 echo_duration = echotime - formattime # pyright: ignore 422 if duration > 0.05 and ( 423 self._last_slow_emit_warning_time is None 424 or now > self._last_slow_emit_warning_time + 10.0 425 ): 426 # Logging calls from *within* a logging handler 427 # sounds sketchy, so let's just kick this over to 428 # the bg event loop thread we've already got. 429 self._last_slow_emit_warning_time = now 430 self._event_loop.call_soon_threadsafe( 431 partial( 432 logging.warning, 433 'efro.log.LogHandler emit took too long' 434 ' (%.2fs total; %.2fs format, %.2fs echo,' 435 ' fast_path=%s).', 436 duration, 437 format_duration, 438 echo_duration, 439 fast_path, 440 ) 441 )
Do whatever it takes to actually log the specified logging record.
This version is intended to be implemented by subclasses and so raises a NotImplementedError.
472 def file_write(self, name: str, output: str) -> None: 473 """Send raw stdout/stderr output to the logger to be collated.""" 474 475 # Note to self: it turns out that things like '^^^^^^^^^^^^^^' 476 # lines in stack traces get written as lots of individual '^' 477 # writes. It feels a bit dirty to be pushing a deferred call to 478 # another thread for each character. Perhaps should do some sort 479 # of basic accumulation here? 480 self._event_loop.call_soon_threadsafe( 481 partial(self._file_write_in_thread, name, output) 482 )
Send raw stdout/stderr output to the logger to be collated.
518 def shutdown(self) -> None: 519 """Block until all pending logs/prints are done.""" 520 done = False 521 self.file_flush('stdout') 522 self.file_flush('stderr') 523 524 def _set_done() -> None: 525 nonlocal done 526 done = True 527 528 self._event_loop.call_soon_threadsafe(_set_done) 529 530 starttime = time.monotonic() 531 while not done: 532 if time.monotonic() - starttime > 5.0: 533 print('LogHandler shutdown hung!!!', file=sys.stderr) 534 break 535 time.sleep(0.01)
Block until all pending logs/prints are done.
537 def file_flush(self, name: str) -> None: 538 """Send raw stdout/stderr flush to the logger to be collated.""" 539 540 self._event_loop.call_soon_threadsafe( 541 partial(self._file_flush_in_thread, name) 542 )
Send raw stdout/stderr flush to the logger to be collated.
Inherited Members
- logging.Handler
- level
- formatter
- get_name
- set_name
- name
- createLock
- acquire
- release
- setLevel
- format
- handle
- setFormatter
- flush
- close
- handleError
- logging.Filterer
- filters
- addFilter
- removeFilter
- filter
635class FileLogEcho: 636 """A file-like object for forwarding stdout/stderr to a LogHandler.""" 637 638 def __init__( 639 self, original: TextIO, name: str, handler: LogHandler 640 ) -> None: 641 assert name in ('stdout', 'stderr') 642 self._original = original 643 self._name = name 644 self._handler = handler 645 646 # Think this was a result of setting non-blocking stdin somehow; 647 # probably not needed. 648 # self._report_blocking_io_error = False 649 650 def write(self, output: Any) -> None: 651 """Override standard write call.""" 652 # try: 653 # if self._report_blocking_io_error: 654 # self._report_blocking_io_error = False 655 # self._original.write( 656 # 'WARNING: BlockingIOError ENCOUNTERED;' 657 # ' OUTPUT IS PROBABLY MISSING' 658 # ) 659 660 self._original.write(output) 661 # except BlockingIOError: 662 # self._report_blocking_io_error = True 663 self._handler.file_write(self._name, output) 664 665 def flush(self) -> None: 666 """Flush the file.""" 667 self._original.flush() 668 669 # We also use this as a hint to ship whatever file chunks 670 # we've accumulated (we have to try and be smart about breaking 671 # our arbitrary file output into discrete entries). 672 self._handler.file_flush(self._name) 673 674 def isatty(self) -> bool: 675 """Are we a terminal?""" 676 return self._original.isatty()
A file-like object for forwarding stdout/stderr to a LogHandler.
638 def __init__( 639 self, original: TextIO, name: str, handler: LogHandler 640 ) -> None: 641 assert name in ('stdout', 'stderr') 642 self._original = original 643 self._name = name 644 self._handler = handler 645 646 # Think this was a result of setting non-blocking stdin somehow; 647 # probably not needed. 648 # self._report_blocking_io_error = False
650 def write(self, output: Any) -> None: 651 """Override standard write call.""" 652 # try: 653 # if self._report_blocking_io_error: 654 # self._report_blocking_io_error = False 655 # self._original.write( 656 # 'WARNING: BlockingIOError ENCOUNTERED;' 657 # ' OUTPUT IS PROBABLY MISSING' 658 # ) 659 660 self._original.write(output) 661 # except BlockingIOError: 662 # self._report_blocking_io_error = True 663 self._handler.file_write(self._name, output)
Override standard write call.
665 def flush(self) -> None: 666 """Flush the file.""" 667 self._original.flush() 668 669 # We also use this as a hint to ship whatever file chunks 670 # we've accumulated (we have to try and be smart about breaking 671 # our arbitrary file output into discrete entries). 672 self._handler.file_flush(self._name)
Flush the file.
679def setup_logging( 680 log_path: str | Path | None, 681 level: LogLevel, 682 suppress_non_root_debug: bool = False, 683 log_stdout_stderr: bool = False, 684 echo_to_stderr: bool = True, 685 cache_size_limit: int = 0, 686 cache_time_limit: datetime.timedelta | None = None, 687) -> LogHandler: 688 """Set up our logging environment. 689 690 Returns the custom handler which can be used to fetch information 691 about logs that have passed through it. (worst log-levels, caches, etc.). 692 """ 693 694 lmap = { 695 LogLevel.DEBUG: logging.DEBUG, 696 LogLevel.INFO: logging.INFO, 697 LogLevel.WARNING: logging.WARNING, 698 LogLevel.ERROR: logging.ERROR, 699 LogLevel.CRITICAL: logging.CRITICAL, 700 } 701 702 # Wire logger output to go to a structured log file. 703 # Also echo it to stderr IF we're running in a terminal. 704 # UPDATE: Actually gonna always go to stderr. Is there a 705 # reason we shouldn't? This makes debugging possible if all 706 # we have is access to a non-interactive terminal or file dump. 707 # We could add a '--quiet' arg or whatnot to change this behavior. 708 709 # Note: by passing in the *original* stderr here before we 710 # (potentially) replace it, we ensure that our log echos 711 # won't themselves be intercepted and sent to the logger 712 # which would create an infinite loop. 713 loghandler = LogHandler( 714 log_path, 715 echofile=sys.stderr if echo_to_stderr else None, 716 suppress_non_root_debug=suppress_non_root_debug, 717 cache_size_limit=cache_size_limit, 718 cache_time_limit=cache_time_limit, 719 ) 720 721 # Note: going ahead with force=True here so that we replace any 722 # existing logger. Though we warn if it looks like we are doing 723 # that so we can try to avoid creating the first one. 724 had_previous_handlers = bool(logging.root.handlers) 725 logging.basicConfig( 726 level=lmap[level], 727 # format='%(name)s: %(message)s', 728 # We dump *only* the message here. We pass various log record bits 729 # around and format things fancier where they end up. 730 format='%(message)s', 731 handlers=[loghandler], 732 force=True, 733 ) 734 if had_previous_handlers: 735 logging.warning( 736 'setup_logging: Replacing existing handlers.' 737 ' Something may have logged before expected.' 738 ) 739 740 # Optionally intercept Python's stdout/stderr output and generate 741 # log entries from it. 742 if log_stdout_stderr: 743 sys.stdout = FileLogEcho(sys.stdout, 'stdout', loghandler) 744 sys.stderr = FileLogEcho(sys.stderr, 'stderr', loghandler) 745 746 return loghandler
Set up our logging environment.
Returns the custom handler which can be used to fetch information about logs that have passed through it. (worst log-levels, caches, etc.).