efro.log
Logging functionality.
```python
# Released under the MIT License. See LICENSE for details.
#
"""Logging functionality."""
from __future__ import annotations

import sys
import time
import asyncio
import logging
import datetime
import itertools
from enum import Enum
from functools import partial
from collections import deque
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Annotated, override
from threading import Thread, current_thread, Lock

from efro.util import utc_now
from efro.terminal import Clr
from efro.dataclassio import ioprepped, IOAttrs, dataclass_to_json

if TYPE_CHECKING:
    from pathlib import Path
    from typing import Any, Callable, TextIO


class LogLevel(Enum):
    """Severity level for a log entry.

    These enums have numeric values so they can be compared in severity.
    Note that these values are not currently interchangeable with the
    logging.ERROR, logging.DEBUG, etc. values.
    """

    DEBUG = 0
    INFO = 1
    WARNING = 2
    ERROR = 3
    CRITICAL = 4

    @property
    def python_logging_level(self) -> int:
        """Give the corresponding logging level."""
        return LOG_LEVEL_LEVELNOS[self]

    @classmethod
    def from_python_logging_level(cls, levelno: int) -> LogLevel:
        """Given a Python logging level, return a LogLevel."""
        return LEVELNO_LOG_LEVELS[levelno]


# Python logging levels from LogLevels
LOG_LEVEL_LEVELNOS = {
    LogLevel.DEBUG: logging.DEBUG,
    LogLevel.INFO: logging.INFO,
    LogLevel.WARNING: logging.WARNING,
    LogLevel.ERROR: logging.ERROR,
    LogLevel.CRITICAL: logging.CRITICAL,
}

# LogLevels from Python logging levels
LEVELNO_LOG_LEVELS = {
    logging.DEBUG: LogLevel.DEBUG,
    logging.INFO: LogLevel.INFO,
    logging.WARNING: LogLevel.WARNING,
    logging.ERROR: LogLevel.ERROR,
    logging.CRITICAL: LogLevel.CRITICAL,
}

LEVELNO_COLOR_CODES: dict[int, tuple[str, str]] = {
    logging.DEBUG: (Clr.CYN, Clr.RST),
    logging.INFO: ('', ''),
    logging.WARNING: (Clr.YLW, Clr.RST),
    logging.ERROR: (Clr.RED, Clr.RST),
    logging.CRITICAL: (Clr.SMAG + Clr.BLD + Clr.BLK, Clr.RST),
}


@ioprepped
@dataclass
class LogEntry:
    """Single logged message."""

    name: Annotated[str, IOAttrs('n', soft_default='root', store_default=False)]
    message: Annotated[str, IOAttrs('m')]
    level: Annotated[LogLevel, IOAttrs('l')]
    time: Annotated[datetime.datetime, IOAttrs('t')]

    # We support arbitrary string labels per log entry which can be
    # incorporated into custom log processing. To populate this, our
    # LogHandler class looks for a 'labels' dict passed in the optional
    # 'extra' dict arg to standard Python log calls.
    labels: Annotated[dict[str, str], IOAttrs('la', store_default=False)] = (
        field(default_factory=dict)
    )


@ioprepped
@dataclass
class LogArchive:
    """Info and data for a log."""

    # Total number of entries submitted to the log.
    log_size: Annotated[int, IOAttrs('t')]

    # Offset for the entries contained here.
    # (10 means our first entry is the 10th in the log, etc.)
    start_index: Annotated[int, IOAttrs('c')]

    entries: Annotated[list[LogEntry], IOAttrs('e')]


class LogHandler(logging.Handler):
    """Fancy-pants handler for logging output.

    Writes logs to disk in structured json format and echoes them
    to stdout/stderr with pretty colors.
    """

    _event_loop: asyncio.AbstractEventLoop

    # IMPORTANT: Any debug prints we do here should ONLY go to echofile.
    # Otherwise we can get infinite loops as those prints come back to us
    # as new log entries.

    def __init__(
        self,
        *,
        path: str | Path | None,
        echofile: TextIO | None,
        suppress_non_root_debug: bool,
        cache_size_limit: int,
        cache_time_limit: datetime.timedelta | None,
    ):
        super().__init__()
        # pylint: disable=consider-using-with
        self._file = None if path is None else open(path, 'w', encoding='utf-8')
        self._echofile = echofile
        self._callbacks: list[Callable[[LogEntry], None]] = []
        self._suppress_non_root_debug = suppress_non_root_debug
        self._file_chunks: dict[str, list[str]] = {'stdout': [], 'stderr': []}
        self._file_chunk_ship_task: dict[str, asyncio.Task | None] = {
            'stdout': None,
            'stderr': None,
        }
        self._cache_size = 0
        assert cache_size_limit >= 0
        self._cache_size_limit = cache_size_limit
        self._cache_time_limit = cache_time_limit
        self._cache = deque[tuple[int, LogEntry]]()
        self._cache_index_offset = 0
        self._cache_lock = Lock()
        self._printed_callback_error = False
        self._thread_bootstrapped = False
        self._thread = Thread(target=self._log_thread_main, daemon=True)
        if __debug__:
            self._last_slow_emit_warning_time: float | None = None
        self._thread.start()

        # Spin until our thread is up and running; otherwise we could
        # wind up trying to push stuff to our event loop before the
        # loop exists.
        while not self._thread_bootstrapped:
            time.sleep(0.001)

    def add_callback(
        self, call: Callable[[LogEntry], None], feed_existing_logs: bool = False
    ) -> None:
        """Add a callback to be run for each LogEntry.

        Note that this callback will always run in a background thread.
        Passing True for feed_existing_logs will cause all cached logs
        in the handler to be fed to the callback (still in the
        background thread though).
        """

        # Kick this over to our bg thread to add the callback and
        # process cached entries at the same time to ensure there are no
        # race conditions that could cause entries to be skipped/etc.
        self._event_loop.call_soon_threadsafe(
            partial(self._add_callback_in_thread, call, feed_existing_logs)
        )

    def _add_callback_in_thread(
        self, call: Callable[[LogEntry], None], feed_existing_logs: bool
    ) -> None:
        """Add a callback to be run for each LogEntry.

        Note that this callback will always run in a background thread.
        Passing True for feed_existing_logs will cause all cached logs
        in the handler to be fed to the callback (still in the
        background thread though).
        """
        assert current_thread() is self._thread
        self._callbacks.append(call)

        # Run all of our cached entries through the new callback if desired.
        if feed_existing_logs and self._cache_size_limit > 0:
            with self._cache_lock:
                for _id, entry in self._cache:
                    self._run_callback_on_entry(call, entry)

    def _log_thread_main(self) -> None:
        self._event_loop = asyncio.new_event_loop()

        # In our background thread event loop we do a fair amount of
        # slow synchronous stuff such as mucking with the log cache.
        # Let's avoid getting tons of warnings about this in debug mode.
        self._event_loop.slow_callback_duration = 2.0  # Default is 0.1

        # NOTE: if we ever use default threadpool at all we should allow
        # setting it for our loop.
        asyncio.set_event_loop(self._event_loop)
        self._thread_bootstrapped = True
        try:
            if self._cache_time_limit is not None:
                _prunetask = self._event_loop.create_task(
                    self._time_prune_cache()
                )
            self._event_loop.run_forever()
        except BaseException:
            # If this ever goes down we're in trouble; we won't be able
            # to log about it though. Try to make some noise however we
            # can.
            print('LogHandler died!!!', file=sys.stderr)
            import traceback

            traceback.print_exc()
            raise

    async def _time_prune_cache(self) -> None:
        assert self._cache_time_limit is not None
        while bool(True):
            await asyncio.sleep(61.27)
            now = utc_now()
            with self._cache_lock:
                # Prune the oldest entry as long as there is a first one that
                # is too old.
                while (
                    self._cache
                    and (now - self._cache[0][1].time) >= self._cache_time_limit
                ):
                    popped = self._cache.popleft()
                    self._cache_size -= popped[0]
                    self._cache_index_offset += 1

    def get_cached(
        self, start_index: int = 0, max_entries: int | None = None
    ) -> LogArchive:
        """Build and return an archive of cached log entries.

        This will only include entries that have been processed by the
        background thread, so may not include just-submitted logs or
        entries for partially written stdout/stderr lines.
        Entries from the range [start_index:start_index+max_entries]
        which are still present in the cache will be returned.
        """

        assert start_index >= 0
        if max_entries is not None:
            assert max_entries >= 0
        with self._cache_lock:
            # Transform start_index to our present cache space.
            start_index -= self._cache_index_offset
            # Calc end-index in our present cache space.
            end_index = (
                len(self._cache)
                if max_entries is None
                else start_index + max_entries
            )

            # Clamp both indexes to both ends of our present space.
            start_index = max(0, min(start_index, len(self._cache)))
            end_index = max(0, min(end_index, len(self._cache)))

            return LogArchive(
                log_size=self._cache_index_offset + len(self._cache),
                start_index=start_index + self._cache_index_offset,
                entries=self._cache_slice(start_index, end_index),
            )

    def _cache_slice(
        self, start: int, end: int, step: int = 1
    ) -> list[LogEntry]:
        # Deque doesn't natively support slicing but we can do it manually.
        # It sounds like rotating the deque and pulling from the beginning
        # is the most efficient way to do this. The downside is the deque
        # gets temporarily modified in the process so we need to make sure
        # we're holding the lock.
        assert self._cache_lock.locked()
        cache = self._cache
        cache.rotate(-start)
        slc = [e[1] for e in itertools.islice(cache, 0, end - start, step)]
        cache.rotate(start)
        return slc

    @classmethod
    def _is_immutable_log_data(cls, data: Any) -> bool:
        if isinstance(data, (str, bool, int, float, bytes)):
            return True
        if isinstance(data, tuple):
            return all(cls._is_immutable_log_data(x) for x in data)
        return False

    def call_in_thread(self, call: Callable[[], Any]) -> None:
        """Submit a call to be run in the logging background thread."""
        self._event_loop.call_soon_threadsafe(call)

    @override
    def emit(self, record: logging.LogRecord) -> None:
        # pylint: disable=too-many-branches
        if __debug__:
            starttime = time.monotonic()

        # Called by logging to send us records.

        # TODO - kill this.
        if (
            self._suppress_non_root_debug
            and record.name != 'root'
            and record.levelname == 'DEBUG'
        ):
            return

        # Optimization: if our log args are all simple immutable values,
        # we can just kick the whole thing over to our background thread to
        # be formatted there at our leisure. If anything is mutable and
        # thus could possibly change between now and then or if we want
        # to do immediate file echoing then we need to bite the bullet
        # and do that stuff here at the call site.
        fast_path = self._echofile is None and self._is_immutable_log_data(
            record.args
        )

        # Note: just assuming types are correct here, but they'll be
        # checked properly when the resulting LogEntry gets exported.
        labels: dict[str, str] | None = getattr(record, 'labels', None)
        if labels is None:
            labels = {}

        if fast_path:
            if __debug__:
                formattime = echotime = time.monotonic()
            self._event_loop.call_soon_threadsafe(
                partial(
                    self._emit_in_thread,
                    record.name,
                    record.levelno,
                    record.created,
                    record,
                    labels,
                )
            )
        else:
            # Slow case; do formatting and echoing here at the log call
            # site.
            msg = self.format(record)

            if __debug__:
                formattime = time.monotonic()

            # Also immediately print pretty colored output to our echo file
            # (generally stderr). We do this part here instead of in our bg
            # thread because the delay can throw off command line prompts or
            # make tight debugging harder.
            if self._echofile is not None:
                # try:
                # if self._report_blocking_io_on_echo_error:
                #     premsg = (
                #         'WARNING: BlockingIOError ON LOG ECHO OUTPUT;'
                #         ' YOU ARE PROBABLY MISSING LOGS\n'
                #     )
                #     self._report_blocking_io_on_echo_error = False
                # else:
                #     premsg = ''
                ends = LEVELNO_COLOR_CODES.get(record.levelno)
                namepre = f'{Clr.WHT}{record.name}:{Clr.RST} '
                if ends is not None:
                    self._echofile.write(
                        f'{namepre}{ends[0]}'
                        f'{msg}{ends[1]}\n'
                        # f'{namepre}{ends[0]}' f'{premsg}{msg}{ends[1]}\n'
                    )
                else:
                    self._echofile.write(f'{namepre}{msg}\n')
                self._echofile.flush()
                # except BlockingIOError:
                #     # Ran into this when doing a bunch of logging; assuming
                #     # this is asyncio's doing?.. For now trying to survive
                #     # the error but telling the user something is probably
                #     # missing in their output.
                #     self._report_blocking_io_on_echo_error = True

            if __debug__:
                echotime = time.monotonic()

            self._event_loop.call_soon_threadsafe(
                partial(
                    self._emit_in_thread,
                    record.name,
                    record.levelno,
                    record.created,
                    msg,
                    labels,
                )
            )

        if __debug__:
            # pylint: disable=used-before-assignment
            # Make noise if we're taking a significant amount of time here.
            # Limit the noise to once every so often though; otherwise we
            # could get a feedback loop where every log emit results in a
            # warning log which results in another, etc.
            now = time.monotonic()
            # noinspection PyUnboundLocalVariable
            duration = now - starttime  # pyright: ignore
            # noinspection PyUnboundLocalVariable
            format_duration = formattime - starttime  # pyright: ignore
            # noinspection PyUnboundLocalVariable
            echo_duration = echotime - formattime  # pyright: ignore
            if duration > 0.05 and (
                self._last_slow_emit_warning_time is None
                or now > self._last_slow_emit_warning_time + 10.0
            ):
                # Logging calls from *within* a logging handler
                # sounds sketchy, so let's just kick this over to
                # the bg event loop thread we've already got.
                self._last_slow_emit_warning_time = now
                self._event_loop.call_soon_threadsafe(
                    partial(
                        logging.warning,
                        'efro.log.LogHandler emit took too long'
                        ' (%.2fs total; %.2fs format, %.2fs echo,'
                        ' fast_path=%s).',
                        duration,
                        format_duration,
                        echo_duration,
                        fast_path,
                    )
                )

    def _emit_in_thread(
        self,
        name: str,
        levelno: int,
        created: float,
        message: str | logging.LogRecord,
        labels: dict[str, str],
    ) -> None:
        # pylint: disable=too-many-positional-arguments
        try:
            # If they passed a raw record here, bake it down to a string.
            if isinstance(message, logging.LogRecord):
                message = self.format(message)

            self._emit_entry(
                LogEntry(
                    name=name,
                    message=message,
                    level=LEVELNO_LOG_LEVELS.get(levelno, LogLevel.INFO),
                    time=datetime.datetime.fromtimestamp(
                        created, datetime.timezone.utc
                    ),
                    labels=labels,
                )
            )
        except Exception:
            import traceback

            traceback.print_exc(file=self._echofile)

    def file_write(self, name: str, output: str) -> None:
        """Send raw stdout/stderr output to the logger to be collated."""

        # Note to self: it turns out that things like '^^^^^^^^^^^^^^'
        # lines in stack traces get written as lots of individual '^'
        # writes. It feels a bit dirty to be pushing a deferred call to
        # another thread for each character. Perhaps should do some sort
        # of basic accumulation here?
        self._event_loop.call_soon_threadsafe(
            partial(self._file_write_in_thread, name, output)
        )

    def _file_write_in_thread(self, name: str, output: str) -> None:
        try:
            assert name in ('stdout', 'stderr')

            # Here we try to be somewhat smart about breaking arbitrary
            # print output into discrete log entries.

            self._file_chunks[name].append(output)

            # Individual parts of a print come across as separate writes,
            # and the end of a print will be a standalone '\n' by default.
            # Let's use that as a hint that we're likely at the end of
            # a full print statement and ship what we've got.
            if output == '\n':
                self._ship_file_chunks(name, cancel_ship_task=True)
            else:
                # By default just keep adding chunks.
                # However we keep a timer running anytime we've got
                # unshipped chunks so that we can ship what we've got
                # after a short bit if we never get a newline.
                ship_task = self._file_chunk_ship_task[name]
                if ship_task is None:
                    self._file_chunk_ship_task[name] = (
                        self._event_loop.create_task(
                            self._ship_chunks_task(name),
                            name='log ship file chunks',
                        )
                    )

        except Exception:
            import traceback

            traceback.print_exc(file=self._echofile)

    def shutdown(self) -> None:
        """Block until all pending logs/prints are done."""
        done = False
        self.file_flush('stdout')
        self.file_flush('stderr')

        def _set_done() -> None:
            nonlocal done
            done = True

        self._event_loop.call_soon_threadsafe(_set_done)

        starttime = time.monotonic()
        while not done:
            if time.monotonic() - starttime > 5.0:
                print('LogHandler shutdown hung!!!', file=sys.stderr)
                break
            time.sleep(0.01)

    def file_flush(self, name: str) -> None:
        """Send raw stdout/stderr flush to the logger to be collated."""

        self._event_loop.call_soon_threadsafe(
            partial(self._file_flush_in_thread, name)
        )

    def _file_flush_in_thread(self, name: str) -> None:
        try:
            assert name in ('stdout', 'stderr')

            # Immediately ship whatever chunks we've got.
            if self._file_chunks[name]:
                self._ship_file_chunks(name, cancel_ship_task=True)

        except Exception:
            import traceback

            traceback.print_exc(file=self._echofile)

    async def _ship_chunks_task(self, name: str) -> None:
        # Note: it's important we sleep here for a moment. Otherwise,
        # things like '^^^^^^^^^^^^' lines in stack traces, which come
        # through as lots of individual '^' writes, tend to get broken
        # into lots of tiny little lines by us.
        await asyncio.sleep(0.01)
        self._ship_file_chunks(name, cancel_ship_task=False)

    def _ship_file_chunks(self, name: str, cancel_ship_task: bool) -> None:
        # Note: Raw print input generally ends in a newline, but that is
        # redundant when we break things into log entries and results in
        # extra empty lines. So strip off a single trailing newline if
        # one is present.
        text = ''.join(self._file_chunks[name]).removesuffix('\n')

        self._emit_entry(
            LogEntry(
                name=name, message=text, level=LogLevel.INFO, time=utc_now()
            )
        )
        self._file_chunks[name] = []
        ship_task = self._file_chunk_ship_task[name]
        if cancel_ship_task and ship_task is not None:
            ship_task.cancel()
        self._file_chunk_ship_task[name] = None

    def _emit_entry(self, entry: LogEntry) -> None:
        assert current_thread() is self._thread

        # Store to our cache.
        if self._cache_size_limit > 0:
            with self._cache_lock:
                # Do a rough calc of how many bytes this entry consumes.
                entry_size = sum(
                    sys.getsizeof(x)
                    for x in (
                        entry,
                        entry.name,
                        entry.message,
                        entry.level,
                        entry.time,
                    )
                )
                self._cache.append((entry_size, entry))
                self._cache_size += entry_size

                # Prune old until we are back at or under our limit.
                while self._cache_size > self._cache_size_limit:
                    popped = self._cache.popleft()
                    self._cache_size -= popped[0]
                    self._cache_index_offset += 1

        # Pass to callbacks.
        for call in self._callbacks:
            self._run_callback_on_entry(call, entry)

        # Dump to our structured log file.
        # TODO: should set a timer for flushing; don't flush every line.
        if self._file is not None:
            entry_s = dataclass_to_json(entry)
            assert '\n' not in entry_s  # Make sure its a single line.
            print(entry_s, file=self._file, flush=True)

    def _run_callback_on_entry(
        self, callback: Callable[[LogEntry], None], entry: LogEntry
    ) -> None:
        """Run a callback and handle any errors."""
        try:
            callback(entry)
        except Exception:
            # Only print the first callback error to avoid insanity.
            if not self._printed_callback_error:
                import traceback

                traceback.print_exc(file=self._echofile)
                self._printed_callback_error = True


class FileLogEcho:
    """A file-like object for forwarding stdout/stderr to a LogHandler."""

    def __init__(
        self, original: TextIO, name: str, handler: LogHandler
    ) -> None:
        assert name in ('stdout', 'stderr')
        self._original = original
        self._name = name
        self._handler = handler

        # Think this was a result of setting non-blocking stdin somehow;
        # probably not needed.
        # self._report_blocking_io_error = False

    def write(self, output: Any) -> None:
        """Override standard write call."""
        # try:
        #     if self._report_blocking_io_error:
        #         self._report_blocking_io_error = False
        #         self._original.write(
        #             'WARNING: BlockingIOError ENCOUNTERED;'
        #             ' OUTPUT IS PROBABLY MISSING'
        #         )

        self._original.write(output)
        # except BlockingIOError:
        #     self._report_blocking_io_error = True
        self._handler.file_write(self._name, output)

    def flush(self) -> None:
        """Flush the file."""
        self._original.flush()

        # We also use this as a hint to ship whatever file chunks
        # we've accumulated (we have to try and be smart about breaking
        # our arbitrary file output into discrete entries).
        self._handler.file_flush(self._name)

    def isatty(self) -> bool:
        """Are we a terminal?"""
        return self._original.isatty()


def setup_logging(
    log_path: str | Path | None,
    level: LogLevel,
    *,
    suppress_non_root_debug: bool = False,
    log_stdout_stderr: bool = False,
    echo_to_stderr: bool = True,
    cache_size_limit: int = 0,
    cache_time_limit: datetime.timedelta | None = None,
) -> LogHandler:
    """Set up our logging environment.

    Returns the custom handler which can be used to fetch information
    about logs that have passed through it. (worst log-levels, caches, etc.).
    """

    lmap = {
        LogLevel.DEBUG: logging.DEBUG,
        LogLevel.INFO: logging.INFO,
        LogLevel.WARNING: logging.WARNING,
        LogLevel.ERROR: logging.ERROR,
        LogLevel.CRITICAL: logging.CRITICAL,
    }

    # Wire logger output to go to a structured log file.
    # Also echo it to stderr IF we're running in a terminal.
    # UPDATE: Actually gonna always go to stderr. Is there a
    # reason we shouldn't? This makes debugging possible if all
    # we have is access to a non-interactive terminal or file dump.
    # We could add a '--quiet' arg or whatnot to change this behavior.

    # Note: by passing in the *original* stderr here before we
    # (potentially) replace it, we ensure that our log echos
    # won't themselves be intercepted and sent to the logger
    # which would create an infinite loop.
    loghandler = LogHandler(
        path=log_path,
        echofile=sys.stderr if echo_to_stderr else None,
        suppress_non_root_debug=suppress_non_root_debug,
        cache_size_limit=cache_size_limit,
        cache_time_limit=cache_time_limit,
    )

    # Note: going ahead with force=True here so that we replace any
    # existing logger. Though we warn if it looks like we are doing
    # that so we can try to avoid creating the first one.
    had_previous_handlers = bool(logging.root.handlers)
    logging.basicConfig(
        level=lmap[level],
        # format='%(name)s: %(message)s',
        # We dump *only* the message here. We pass various log record bits
        # around and format things fancier where they end up.
        format='%(message)s',
        handlers=[loghandler],
        force=True,
    )
    if had_previous_handlers:
        logging.warning(
            'setup_logging: Replacing existing handlers.'
            ' Something may have logged before expected.'
        )

    # Optionally intercept Python's stdout/stderr output and generate
    # log entries from it.
    if log_stdout_stderr:
        sys.stdout = FileLogEcho(sys.stdout, 'stdout', loghandler)
        sys.stderr = FileLogEcho(sys.stderr, 'stderr', loghandler)

    return loghandler
```
class LogLevel(Enum):
Severity level for a log entry.
These enums have numeric values so they can be compared in severity. Note that these values are not currently interchangeable with the logging.ERROR, logging.DEBUG, etc. values.
python_logging_level: int
Give the corresponding logging level.
@classmethod
def from_python_logging_level(cls, levelno: int) -> LogLevel:
Given a Python logging level, return a LogLevel.
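A quick sketch of the two conversions (assuming the efro package is importable):

```python
import logging

from efro.log import LogLevel

# LogLevel -> stdlib logging level number.
assert LogLevel.WARNING.python_logging_level == logging.WARNING

# Stdlib logging level number -> LogLevel.
assert LogLevel.from_python_logging_level(logging.ERROR) is LogLevel.ERROR

# The enum values themselves are only ordered by severity; they are
# NOT the stdlib level numbers (WARNING is 2 here, not 30).
assert LogLevel.DEBUG.value < LogLevel.WARNING.value < LogLevel.CRITICAL.value
```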
Inherited Members
- enum.Enum
- name
- value
@ioprepped
@dataclass
class LogEntry:
Single logged message.
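A minimal sketch of building an entry by hand and round-tripping it through the compact JSON form used for the structured log file, plus the `extra={'labels': ...}` convention LogHandler looks for on standard logging calls. (`dataclass_from_json` is assumed to be the efro.dataclassio counterpart of `dataclass_to_json`.)

```python
import logging
import datetime

from efro.log import LogEntry, LogLevel
from efro.dataclassio import dataclass_to_json, dataclass_from_json

# Hand-built entry; dataclassio expects a timezone-aware UTC time.
entry = LogEntry(
    name='myapp',
    message='hello world',
    level=LogLevel.INFO,
    time=datetime.datetime.now(datetime.timezone.utc),
    labels={'request_id': '1234'},
)

json_str = dataclass_to_json(entry)      # One compact line, as written to disk.
restored = dataclass_from_json(LogEntry, json_str)  # Assumed round-trip helper.

# When logging through the stdlib, labels ride along in the 'extra' dict
# and get picked up by LogHandler when it builds the LogEntry.
logging.getLogger('myapp').info(
    'hello world', extra={'labels': {'request_id': '1234'}}
)
```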
@ioprepped
@dataclass
class LogArchive:
Info and data for a log.
class LogHandler(logging.Handler):
Fancy-pants handler for logging output.
Writes logs to disk in structured json format and echoes them to stdout/stderr with pretty colors.
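setup_logging() below is the usual way to install one of these, but a handler can also be constructed and registered directly. A rough sketch (the path and cache limits here are placeholder values, not defaults):

```python
import sys
import logging
import datetime

from efro.log import LogHandler

handler = LogHandler(
    path='app.log',                # Structured JSON log file (or None).
    echofile=sys.stderr,           # Colored echo output (or None).
    suppress_non_root_debug=False,
    cache_size_limit=1024 * 1024,  # Keep roughly 1MB of recent entries.
    cache_time_limit=datetime.timedelta(hours=1),
)

# Mirror what setup_logging() does: format only the bare message here;
# names/levels/times travel on the record and are stored separately.
logging.basicConfig(
    level=logging.INFO, format='%(message)s', handlers=[handler], force=True
)

logging.getLogger(__name__).info('structured logging is live')
```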
def __init__(
    self,
    *,
    path: str | Path | None,
    echofile: TextIO | None,
    suppress_non_root_debug: bool,
    cache_size_limit: int,
    cache_time_limit: datetime.timedelta | None,
):
Set up the handler: open the structured log file if a path was given, initialize the in-memory entry cache, and start the background daemon thread that runs the handler's asyncio event loop (the constructor waits briefly for that loop to come up).
def add_callback(self, call: Callable[[LogEntry], None], feed_existing_logs: bool = False) -> None:
Add a callback to be run for each LogEntry.
Note that this callback will always run in a background thread. Passing True for feed_existing_logs will cause all cached logs in the handler to be fed to the callback (still in the background thread though).
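For example, a callback that collects warnings and errors for later inspection might look roughly like this (here `handler` is assumed to be the LogHandler returned by setup_logging(); remember the callback runs on the handler's background thread, so keep it quick):

```python
from efro.log import LogEntry, LogLevel

collected: list[LogEntry] = []

def _on_entry(entry: LogEntry) -> None:
    # Runs in the logging background thread; keep this lightweight.
    if entry.level.value >= LogLevel.WARNING.value:
        collected.append(entry)

# Feed previously cached entries through the callback too so nothing
# logged before registration is missed (requires a nonzero cache size).
handler.add_callback(_on_entry, feed_existing_logs=True)
```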
def get_cached(self, start_index: int = 0, max_entries: int | None = None) -> LogArchive:
Build and return an archive of cached log entries.
This will only include entries that have been processed by the background thread, so may not include just-submitted logs or entries for partially written stdout/stderr lines. Entries from the range [start_index:start_index+max_entries] which are still present in the cache will be returned.
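A small polling sketch showing how LogArchive's start_index and log_size can drive incremental paging (again assuming `handler` came from setup_logging() with a nonzero cache_size_limit):

```python
next_index = 0
while True:
    archive = handler.get_cached(start_index=next_index, max_entries=50)
    for entry in archive.entries:
        print(f'{entry.time} {entry.level.name} {entry.name}: {entry.message}')

    # Continue just past the last entry received. If old entries were
    # pruned, archive.start_index may have jumped ahead of what we asked for.
    next_index = archive.start_index + len(archive.entries)
    if next_index >= archive.log_size:
        break
```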
def call_in_thread(self, call: Callable[[], Any]) -> None:
Submit a call to be run in the logging background thread.
@override
def emit(self, record: logging.LogRecord) -> None:
Called by the logging machinery to hand us a record. When the record's args are all simple immutable values and no echo file is in use, formatting is deferred to the background thread; otherwise the record is formatted (and echoed with colors) here at the call site before being forwarded to the background thread as a LogEntry.
def file_write(self, name: str, output: str) -> None:
Send raw stdout/stderr output to the logger to be collated.
def shutdown(self) -> None:
Block until all pending logs/prints are done.
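Since the actual writing happens on a daemon thread, it can be useful to block on this at process exit; one possible sketch:

```python
import atexit

# Ensure queued log entries and stdout/stderr chunks get written before
# the process exits (shutdown gives up after a few seconds if stuck).
atexit.register(handler.shutdown)
```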
def file_flush(self, name: str) -> None:
Send raw stdout/stderr flush to the logger to be collated.
Inherited Members
- logging.Handler
- level
- formatter
- get_name
- set_name
- name
- createLock
- acquire
- release
- setLevel
- format
- handle
- setFormatter
- flush
- close
- handleError
- logging.Filterer
- filters
- addFilter
- removeFilter
- filter
class FileLogEcho:
A file-like object for forwarding stdout/stderr to a LogHandler.
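setup_logging(..., log_stdout_stderr=True) installs these wrappers automatically, but the same thing can be done by hand; a sketch, with `handler` assumed to be an existing LogHandler:

```python
import sys

from efro.log import FileLogEcho

# Wrap the current streams so everything printed still reaches the
# original stdout/stderr and also becomes log entries named
# 'stdout'/'stderr' in the handler.
sys.stdout = FileLogEcho(sys.stdout, 'stdout', handler)
sys.stderr = FileLogEcho(sys.stderr, 'stderr', handler)

print('this line is echoed to the original stdout and also logged')
```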
def __init__(self, original: TextIO, name: str, handler: LogHandler) -> None:
def write(self, output: Any) -> None:
Override standard write call.
def flush(self) -> None:
Flush the file.
def setup_logging(
    log_path: str | Path | None,
    level: LogLevel,
    *,
    suppress_non_root_debug: bool = False,
    log_stdout_stderr: bool = False,
    echo_to_stderr: bool = True,
    cache_size_limit: int = 0,
    cache_time_limit: datetime.timedelta | None = None,
) -> LogHandler:
Set up our logging environment.
Returns the custom handler, which can be used to fetch information about logs that have passed through it (worst log-levels, caches, etc.).
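A typical setup at program start might look something like this (the path and cache limits are illustrative, not defaults):

```python
import logging
import datetime

from efro.log import setup_logging, LogLevel

handler = setup_logging(
    log_path='myapp.log',            # Structured JSON entries, one per line.
    level=LogLevel.INFO,
    log_stdout_stderr=True,          # Turn print() output into log entries.
    echo_to_stderr=True,             # Pretty colored echo of each record.
    cache_size_limit=1024 * 1024,    # Keep ~1MB of recent entries in memory.
    cache_time_limit=datetime.timedelta(minutes=10),
)

logging.info('hello from structured logging')
print('this print is captured too')

# Later: grab recently cached entries (for a crash report, say) and make
# sure everything pending has hit disk before exiting.
archive = handler.get_cached()
handler.shutdown()
```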