efro.log
Logging functionality.
# Released under the MIT License. See LICENSE for details.
#
"""Logging functionality."""
from __future__ import annotations

import sys
import time
import asyncio
import logging
import datetime
import itertools
from enum import Enum
from collections import deque
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Annotated, override
from threading import Thread, current_thread, Lock

from efro.util import utc_now
from efro.call import tpartial
from efro.terminal import Clr
from efro.dataclassio import ioprepped, IOAttrs, dataclass_to_json

if TYPE_CHECKING:
    from pathlib import Path
    from typing import Any, Callable, TextIO


class LogLevel(Enum):
    """Severity level for a log entry.

    These enums have numeric values so they can be compared in severity.
    Note that these values are not currently interchangeable with the
    logging.ERROR, logging.DEBUG, etc. values.
    """

    DEBUG = 0
    INFO = 1
    WARNING = 2
    ERROR = 3
    CRITICAL = 4

    @property
    def python_logging_level(self) -> int:
        """Give the corresponding logging level."""
        return LOG_LEVEL_LEVELNOS[self]

    @classmethod
    def from_python_logging_level(cls, levelno: int) -> LogLevel:
        """Given a Python logging level, return a LogLevel."""
        return LEVELNO_LOG_LEVELS[levelno]


# Python logging levels from LogLevels
LOG_LEVEL_LEVELNOS = {
    LogLevel.DEBUG: logging.DEBUG,
    LogLevel.INFO: logging.INFO,
    LogLevel.WARNING: logging.WARNING,
    LogLevel.ERROR: logging.ERROR,
    LogLevel.CRITICAL: logging.CRITICAL,
}

# LogLevels from Python logging levels
LEVELNO_LOG_LEVELS = {
    logging.DEBUG: LogLevel.DEBUG,
    logging.INFO: LogLevel.INFO,
    logging.WARNING: LogLevel.WARNING,
    logging.ERROR: LogLevel.ERROR,
    logging.CRITICAL: LogLevel.CRITICAL,
}

LEVELNO_COLOR_CODES: dict[int, tuple[str, str]] = {
    logging.DEBUG: (Clr.CYN, Clr.RST),
    logging.INFO: ('', ''),
    logging.WARNING: (Clr.YLW, Clr.RST),
    logging.ERROR: (Clr.RED, Clr.RST),
    logging.CRITICAL: (Clr.SMAG + Clr.BLD + Clr.BLK, Clr.RST),
}


@ioprepped
@dataclass
class LogEntry:
    """Single logged message."""

    name: Annotated[str, IOAttrs('n', soft_default='root', store_default=False)]
    message: Annotated[str, IOAttrs('m')]
    level: Annotated[LogLevel, IOAttrs('l')]
    time: Annotated[datetime.datetime, IOAttrs('t')]

    # We support arbitrary string labels per log entry which can be
    # incorporated into custom log processing. To populate this, our
    # LogHandler class looks for a 'labels' dict passed in the optional
    # 'extra' dict arg to standard Python log calls.
    labels: Annotated[dict[str, str], IOAttrs('la', store_default=False)] = (
        field(default_factory=dict)
    )


@ioprepped
@dataclass
class LogArchive:
    """Info and data for a log."""

    # Total number of entries submitted to the log.
    log_size: Annotated[int, IOAttrs('t')]

    # Offset for the entries contained here.
    # (10 means our first entry is the 10th in the log, etc.)
    start_index: Annotated[int, IOAttrs('c')]

    entries: Annotated[list[LogEntry], IOAttrs('e')]


class LogHandler(logging.Handler):
    """Fancy-pants handler for logging output.

    Writes logs to disk in structured json format and echoes them
    to stdout/stderr with pretty colors.
    """

    _event_loop: asyncio.AbstractEventLoop

    # IMPORTANT: Any debug prints we do here should ONLY go to echofile.
    # Otherwise we can get infinite loops as those prints come back to us
    # as new log entries.

    def __init__(
        self,
        path: str | Path | None,
        echofile: TextIO | None,
        suppress_non_root_debug: bool,
        cache_size_limit: int,
        cache_time_limit: datetime.timedelta | None,
    ):
        super().__init__()
        # pylint: disable=consider-using-with
        self._file = None if path is None else open(path, 'w', encoding='utf-8')
        self._echofile = echofile
        self._callbacks: list[Callable[[LogEntry], None]] = []
        self._suppress_non_root_debug = suppress_non_root_debug
        self._file_chunks: dict[str, list[str]] = {'stdout': [], 'stderr': []}
        self._file_chunk_ship_task: dict[str, asyncio.Task | None] = {
            'stdout': None,
            'stderr': None,
        }
        self._cache_size = 0
        assert cache_size_limit >= 0
        self._cache_size_limit = cache_size_limit
        self._cache_time_limit = cache_time_limit
        self._cache = deque[tuple[int, LogEntry]]()
        self._cache_index_offset = 0
        self._cache_lock = Lock()
        self._printed_callback_error = False
        self._thread_bootstrapped = False
        self._thread = Thread(target=self._log_thread_main, daemon=True)
        if __debug__:
            self._last_slow_emit_warning_time: float | None = None
        self._thread.start()

        # Spin until our thread is up and running; otherwise we could
        # wind up trying to push stuff to our event loop before the
        # loop exists.
        while not self._thread_bootstrapped:
            time.sleep(0.001)

    def add_callback(
        self, call: Callable[[LogEntry], None], feed_existing_logs: bool = False
    ) -> None:
        """Add a callback to be run for each LogEntry.

        Note that this callback will always run in a background thread.
        Passing True for feed_existing_logs will cause all cached logs
        in the handler to be fed to the callback (still in the
        background thread though).
        """

        # Kick this over to our bg thread to add the callback and
        # process cached entries at the same time to ensure there are no
        # race conditions that could cause entries to be skipped/etc.
        self._event_loop.call_soon_threadsafe(
            tpartial(self._add_callback_in_thread, call, feed_existing_logs)
        )

    def _add_callback_in_thread(
        self, call: Callable[[LogEntry], None], feed_existing_logs: bool
    ) -> None:
        """Add a callback to be run for each LogEntry.

        Note that this callback will always run in a background thread.
        Passing True for feed_existing_logs will cause all cached logs
        in the handler to be fed to the callback (still in the
        background thread though).
        """
        assert current_thread() is self._thread
        self._callbacks.append(call)

        # Run all of our cached entries through the new callback if desired.
        if feed_existing_logs and self._cache_size_limit > 0:
            with self._cache_lock:
                for _id, entry in self._cache:
                    self._run_callback_on_entry(call, entry)

    def _log_thread_main(self) -> None:
        self._event_loop = asyncio.new_event_loop()

        # In our background thread event loop we do a fair amount of
        # slow synchronous stuff such as mucking with the log cache.
        # Let's avoid getting tons of warnings about this in debug mode.
        self._event_loop.slow_callback_duration = 2.0  # Default is 0.1

        # NOTE: if we ever use default threadpool at all we should allow
        # setting it for our loop.
        asyncio.set_event_loop(self._event_loop)
        self._thread_bootstrapped = True
        try:
            if self._cache_time_limit is not None:
                _prunetask = self._event_loop.create_task(
                    self._time_prune_cache()
                )
            self._event_loop.run_forever()
        except BaseException:
            # If this ever goes down we're in trouble; we won't be able
            # to log about it though. Try to make some noise however we
            # can.
            print('LogHandler died!!!', file=sys.stderr)
            import traceback

            traceback.print_exc()
            raise

    async def _time_prune_cache(self) -> None:
        assert self._cache_time_limit is not None
        while bool(True):
            await asyncio.sleep(61.27)
            now = utc_now()
            with self._cache_lock:
                # Prune the oldest entry as long as there is a first one that
                # is too old.
                while (
                    self._cache
                    and (now - self._cache[0][1].time) >= self._cache_time_limit
                ):
                    popped = self._cache.popleft()
                    self._cache_size -= popped[0]
                    self._cache_index_offset += 1

    def get_cached(
        self, start_index: int = 0, max_entries: int | None = None
    ) -> LogArchive:
        """Build and return an archive of cached log entries.

        This will only include entries that have been processed by the
        background thread, so may not include just-submitted logs or
        entries for partially written stdout/stderr lines.
        Entries from the range [start_index:start_index+max_entries]
        which are still present in the cache will be returned.
        """

        assert start_index >= 0
        if max_entries is not None:
            assert max_entries >= 0
        with self._cache_lock:
            # Transform start_index to our present cache space.
            start_index -= self._cache_index_offset
            # Calc end-index in our present cache space.
            end_index = (
                len(self._cache)
                if max_entries is None
                else start_index + max_entries
            )

            # Clamp both indexes to both ends of our present space.
            start_index = max(0, min(start_index, len(self._cache)))
            end_index = max(0, min(end_index, len(self._cache)))

            return LogArchive(
                log_size=self._cache_index_offset + len(self._cache),
                start_index=start_index + self._cache_index_offset,
                entries=self._cache_slice(start_index, end_index),
            )

    def _cache_slice(
        self, start: int, end: int, step: int = 1
    ) -> list[LogEntry]:
        # Deque doesn't natively support slicing but we can do it manually.
        # It sounds like rotating the deque and pulling from the beginning
        # is the most efficient way to do this. The downside is the deque
        # gets temporarily modified in the process so we need to make sure
        # we're holding the lock.
        assert self._cache_lock.locked()
        cache = self._cache
        cache.rotate(-start)
        slc = [e[1] for e in itertools.islice(cache, 0, end - start, step)]
        cache.rotate(start)
        return slc

    @classmethod
    def _is_immutable_log_data(cls, data: Any) -> bool:
        if isinstance(data, (str, bool, int, float, bytes)):
            return True
        if isinstance(data, tuple):
            return all(cls._is_immutable_log_data(x) for x in data)
        return False

    def call_in_thread(self, call: Callable[[], Any]) -> None:
        """Submit a call to be run in the logging background thread."""
        self._event_loop.call_soon_threadsafe(call)

    @override
    def emit(self, record: logging.LogRecord) -> None:
        # pylint: disable=too-many-branches
        if __debug__:
            starttime = time.monotonic()

        # Called by logging to send us records.

        # TODO - kill this.
        if (
            self._suppress_non_root_debug
            and record.name != 'root'
            and record.levelname == 'DEBUG'
        ):
            return

        # Optimization: if our log args are all simple immutable values,
        # we can just kick the whole thing over to our background thread to
        # be formatted there at our leisure. If anything is mutable and
        # thus could possibly change between now and then or if we want
        # to do immediate file echoing then we need to bite the bullet
        # and do that stuff here at the call site.
        fast_path = self._echofile is None and self._is_immutable_log_data(
            record.args
        )

        # Note: just assuming types are correct here, but they'll be
        # checked properly when the resulting LogEntry gets exported.
        labels: dict[str, str] | None = getattr(record, 'labels', None)
        if labels is None:
            labels = {}

        if fast_path:
            if __debug__:
                formattime = echotime = time.monotonic()
            self._event_loop.call_soon_threadsafe(
                tpartial(
                    self._emit_in_thread,
                    record.name,
                    record.levelno,
                    record.created,
                    record,
                    labels,
                )
            )
        else:
            # Slow case; do formatting and echoing here at the log call
            # site.
            msg = self.format(record)

            if __debug__:
                formattime = time.monotonic()

            # Also immediately print pretty colored output to our echo file
            # (generally stderr). We do this part here instead of in our bg
            # thread because the delay can throw off command line prompts or
            # make tight debugging harder.
            if self._echofile is not None:
                ends = LEVELNO_COLOR_CODES.get(record.levelno)
                namepre = f'{Clr.WHT}{record.name}:{Clr.RST} '
                if ends is not None:
                    self._echofile.write(f'{namepre}{ends[0]}{msg}{ends[1]}\n')
                else:
                    self._echofile.write(f'{namepre}{msg}\n')
                self._echofile.flush()

            if __debug__:
                echotime = time.monotonic()

            self._event_loop.call_soon_threadsafe(
                tpartial(
                    self._emit_in_thread,
                    record.name,
                    record.levelno,
                    record.created,
                    msg,
                    labels,
                )
            )

        if __debug__:
            # Make noise if we're taking a significant amount of time here.
            # Limit the noise to once every so often though; otherwise we
            # could get a feedback loop where every log emit results in a
            # warning log which results in another, etc.
            now = time.monotonic()
            # noinspection PyUnboundLocalVariable
            duration = now - starttime  # pyright: ignore
            # noinspection PyUnboundLocalVariable
            format_duration = formattime - starttime  # pyright: ignore
            # noinspection PyUnboundLocalVariable
            echo_duration = echotime - formattime  # pyright: ignore
            if duration > 0.05 and (
                self._last_slow_emit_warning_time is None
                or now > self._last_slow_emit_warning_time + 10.0
            ):
                # Logging calls from *within* a logging handler
                # sounds sketchy, so let's just kick this over to
                # the bg event loop thread we've already got.
                self._last_slow_emit_warning_time = now
                self._event_loop.call_soon_threadsafe(
                    tpartial(
                        logging.warning,
                        'efro.log.LogHandler emit took too long'
                        ' (%.2fs total; %.2fs format, %.2fs echo,'
                        ' fast_path=%s).',
                        duration,
                        format_duration,
                        echo_duration,
                        fast_path,
                    )
                )

    def _emit_in_thread(
        self,
        name: str,
        levelno: int,
        created: float,
        message: str | logging.LogRecord,
        labels: dict[str, str],
    ) -> None:
        try:
            # If they passed a raw record here, bake it down to a string.
            if isinstance(message, logging.LogRecord):
                message = self.format(message)

            self._emit_entry(
                LogEntry(
                    name=name,
                    message=message,
                    level=LEVELNO_LOG_LEVELS.get(levelno, LogLevel.INFO),
                    time=datetime.datetime.fromtimestamp(
                        created, datetime.timezone.utc
                    ),
                    labels=labels,
                )
            )
        except Exception:
            import traceback

            traceback.print_exc(file=self._echofile)

    def file_write(self, name: str, output: str) -> None:
        """Send raw stdout/stderr output to the logger to be collated."""

        # Note to self: it turns out that things like '^^^^^^^^^^^^^^'
        # lines in stack traces get written as lots of individual '^'
        # writes. It feels a bit dirty to be pushing a deferred call to
        # another thread for each character. Perhaps should do some sort
        # of basic accumulation here?
        self._event_loop.call_soon_threadsafe(
            tpartial(self._file_write_in_thread, name, output)
        )

    def _file_write_in_thread(self, name: str, output: str) -> None:
        try:
            assert name in ('stdout', 'stderr')

            # Here we try to be somewhat smart about breaking arbitrary
            # print output into discrete log entries.

            self._file_chunks[name].append(output)

            # Individual parts of a print come across as separate writes,
            # and the end of a print will be a standalone '\n' by default.
            # Let's use that as a hint that we're likely at the end of
            # a full print statement and ship what we've got.
            if output == '\n':
                self._ship_file_chunks(name, cancel_ship_task=True)
            else:
                # By default just keep adding chunks.
                # However we keep a timer running anytime we've got
                # unshipped chunks so that we can ship what we've got
                # after a short bit if we never get a newline.
                ship_task = self._file_chunk_ship_task[name]
                if ship_task is None:
                    self._file_chunk_ship_task[name] = (
                        self._event_loop.create_task(
                            self._ship_chunks_task(name),
                            name='log ship file chunks',
                        )
                    )

        except Exception:
            import traceback

            traceback.print_exc(file=self._echofile)

    def file_flush(self, name: str) -> None:
        """Send raw stdout/stderr flush to the logger to be collated."""

        self._event_loop.call_soon_threadsafe(
            tpartial(self._file_flush_in_thread, name)
        )

    def _file_flush_in_thread(self, name: str) -> None:
        try:
            assert name in ('stdout', 'stderr')

            # Immediately ship whatever chunks we've got.
            if self._file_chunks[name]:
                self._ship_file_chunks(name, cancel_ship_task=True)

        except Exception:
            import traceback

            traceback.print_exc(file=self._echofile)

    async def _ship_chunks_task(self, name: str) -> None:
        # Note: it's important we sleep here for a moment. Otherwise,
        # things like '^^^^^^^^^^^^' lines in stack traces, which come
        # through as lots of individual '^' writes, tend to get broken
        # into lots of tiny little lines by us.
        await asyncio.sleep(0.01)
        self._ship_file_chunks(name, cancel_ship_task=False)

    def _ship_file_chunks(self, name: str, cancel_ship_task: bool) -> None:
        # Note: Raw print input generally ends in a newline, but that is
        # redundant when we break things into log entries and results in
        # extra empty lines. So strip off a single trailing newline if
        # one is present.
        text = ''.join(self._file_chunks[name]).removesuffix('\n')

        self._emit_entry(
            LogEntry(
                name=name, message=text, level=LogLevel.INFO, time=utc_now()
            )
        )
        self._file_chunks[name] = []
        ship_task = self._file_chunk_ship_task[name]
        if cancel_ship_task and ship_task is not None:
            ship_task.cancel()
        self._file_chunk_ship_task[name] = None

    def _emit_entry(self, entry: LogEntry) -> None:
        assert current_thread() is self._thread

        # Store to our cache.
        if self._cache_size_limit > 0:
            with self._cache_lock:
                # Do a rough calc of how many bytes this entry consumes.
                entry_size = sum(
                    sys.getsizeof(x)
                    for x in (
                        entry,
                        entry.name,
                        entry.message,
                        entry.level,
                        entry.time,
                    )
                )
                self._cache.append((entry_size, entry))
                self._cache_size += entry_size

                # Prune old until we are back at or under our limit.
                while self._cache_size > self._cache_size_limit:
                    popped = self._cache.popleft()
                    self._cache_size -= popped[0]
                    self._cache_index_offset += 1

        # Pass to callbacks.
        for call in self._callbacks:
            self._run_callback_on_entry(call, entry)

        # Dump to our structured log file.
        # TODO: should set a timer for flushing; don't flush every line.
        if self._file is not None:
            entry_s = dataclass_to_json(entry)
            assert '\n' not in entry_s  # Make sure it's a single line.
            print(entry_s, file=self._file, flush=True)

    def _run_callback_on_entry(
        self, callback: Callable[[LogEntry], None], entry: LogEntry
    ) -> None:
        """Run a callback and handle any errors."""
        try:
            callback(entry)
        except Exception:
            # Only print the first callback error to avoid insanity.
            if not self._printed_callback_error:
                import traceback

                traceback.print_exc(file=self._echofile)
                self._printed_callback_error = True


class FileLogEcho:
    """A file-like object for forwarding stdout/stderr to a LogHandler."""

    def __init__(
        self, original: TextIO, name: str, handler: LogHandler
    ) -> None:
        assert name in ('stdout', 'stderr')
        self._original = original
        self._name = name
        self._handler = handler

    def write(self, output: Any) -> None:
        """Override standard write call."""
        self._original.write(output)
        self._handler.file_write(self._name, output)

    def flush(self) -> None:
        """Flush the file."""
        self._original.flush()

        # We also use this as a hint to ship whatever file chunks
        # we've accumulated (we have to try and be smart about breaking
        # our arbitrary file output into discrete entries).
        self._handler.file_flush(self._name)

    def isatty(self) -> bool:
        """Are we a terminal?"""
        return self._original.isatty()


def setup_logging(
    log_path: str | Path | None,
    level: LogLevel,
    suppress_non_root_debug: bool = False,
    log_stdout_stderr: bool = False,
    echo_to_stderr: bool = True,
    cache_size_limit: int = 0,
    cache_time_limit: datetime.timedelta | None = None,
) -> LogHandler:
    """Set up our logging environment.

    Returns the custom handler which can be used to fetch information
    about logs that have passed through it (worst log-levels, caches, etc.).
    """

    lmap = {
        LogLevel.DEBUG: logging.DEBUG,
        LogLevel.INFO: logging.INFO,
        LogLevel.WARNING: logging.WARNING,
        LogLevel.ERROR: logging.ERROR,
        LogLevel.CRITICAL: logging.CRITICAL,
    }

    # Wire logger output to go to a structured log file.
    # Also echo it to stderr IF we're running in a terminal.
    # UPDATE: Actually gonna always go to stderr. Is there a
    # reason we shouldn't? This makes debugging possible if all
    # we have is access to a non-interactive terminal or file dump.
    # We could add a '--quiet' arg or whatnot to change this behavior.

    # Note: by passing in the *original* stderr here before we
    # (potentially) replace it, we ensure that our log echoes
    # won't themselves be intercepted and sent to the logger
    # which would create an infinite loop.
    loghandler = LogHandler(
        log_path,
        echofile=sys.stderr if echo_to_stderr else None,
        suppress_non_root_debug=suppress_non_root_debug,
        cache_size_limit=cache_size_limit,
        cache_time_limit=cache_time_limit,
    )

    # Note: going ahead with force=True here so that we replace any
    # existing logger. Though we warn if it looks like we are doing
    # that so we can try to avoid creating the first one.
    had_previous_handlers = bool(logging.root.handlers)
    logging.basicConfig(
        level=lmap[level],
        # format='%(name)s: %(message)s',
        # We dump *only* the message here. We pass various log record bits
        # around and format things fancier where they end up.
        format='%(message)s',
        handlers=[loghandler],
        force=True,
    )
    if had_previous_handlers:
        logging.warning(
            'setup_logging: Replacing existing handlers.'
            ' Something may have logged before expected.'
        )

    # Optionally intercept Python's stdout/stderr output and generate
    # log entries from it.
    if log_stdout_stderr:
        sys.stdout = FileLogEcho(  # type: ignore
            sys.stdout, 'stdout', loghandler
        )
        sys.stderr = FileLogEcho(  # type: ignore
            sys.stderr, 'stderr', loghandler
        )

    return loghandler
class LogLevel(Enum)
Severity level for a log entry.
These enums have numeric values so they can be compared in severity. Note that these values are not currently interchangeable with the logging.ERROR, logging.DEBUG, etc. values.
@property
def python_logging_level(self) -> int
Give the corresponding logging level.
@classmethod
def from_python_logging_level(cls, levelno: int) -> LogLevel
Given a Python logging level, return a LogLevel.
Inherited Members
- enum.Enum
- name
- value
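To illustrate the severity ordering and the two conversion helpers above, a minimal sketch (assuming efro.log is importable):

    import logging

    from efro.log import LogLevel

    # Numeric values order by severity, so levels can be compared via .value.
    assert LogLevel.ERROR.value > LogLevel.WARNING.value

    # Convert to and from the stdlib logging level numbers.
    assert LogLevel.WARNING.python_logging_level == logging.WARNING
    assert LogLevel.from_python_logging_level(logging.DEBUG) is LogLevel.DEBUG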
@ioprepped
@dataclass
class LogEntry
Single logged message.
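Per the comment in the source above, the labels field is populated from a 'labels' dict passed via the standard 'extra' argument to logging calls. A minimal sketch (the label names here are made up, and a handler set up via setup_logging is assumed):

    import logging

    from efro.log import LogLevel, setup_logging

    setup_logging(log_path=None, level=LogLevel.INFO)

    # The stdlib copies 'extra' keys onto the LogRecord; LogHandler then
    # looks for a 'labels' dict there and stores it on the resulting
    # LogEntry.
    logging.getLogger(__name__).warning(
        'cache miss for %s', 'some-key', extra={'labels': {'subsystem': 'cache'}}
    )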
@ioprepped
@dataclass
class LogArchive
Info and data for a log.
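A small illustrative sketch of how the fields relate (the numbers and entries are hypothetical):

    from datetime import datetime, timezone

    from efro.log import LogArchive, LogEntry, LogLevel

    # Hypothetical archive: 120 entries logged overall; the cache still
    # holds the last two, starting at overall index 118.
    archive = LogArchive(
        log_size=120,
        start_index=118,
        entries=[
            LogEntry(name='root', message='first', level=LogLevel.INFO,
                     time=datetime.now(timezone.utc)),
            LogEntry(name='root', message='second', level=LogLevel.INFO,
                     time=datetime.now(timezone.utc)),
        ],
    )

    # entries[i] corresponds to overall log index start_index + i.
    assert archive.start_index + len(archive.entries) == archive.log_size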
class LogHandler(logging.Handler)
Fancy-pants handler for logging output.
Writes logs to disk in structured json format and echoes them to stdout/stderr with pretty colors.
def __init__(
    self,
    path: str | Path | None,
    echofile: TextIO | None,
    suppress_non_root_debug: bool,
    cache_size_limit: int,
    cache_time_limit: datetime.timedelta | None,
)
Initializes the handler: opens the structured-log file if a path is given, sets up the in-memory entry cache, and starts the background logging thread (the constructor spins briefly until that thread's event loop is ready).
def add_callback(
    self, call: Callable[[LogEntry], None], feed_existing_logs: bool = False
) -> None
Add a callback to be run for each LogEntry.
Note that this callback will always run in a background thread. Passing True for feed_existing_logs will cause all cached logs in the handler to be fed to the callback (still in the background thread though).
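A sketch of registering a callback (assumes a handler obtained from setup_logging; the callback body here is a placeholder):

    from efro.log import LogEntry, LogLevel, setup_logging

    handler = setup_logging(
        log_path=None, level=LogLevel.INFO, cache_size_limit=1024 * 1024
    )

    def on_entry(entry: LogEntry) -> None:
        # Runs on the handler's background thread; keep this quick and
        # thread-safe.
        if entry.level is LogLevel.ERROR:
            pass  # e.g. forward to an error reporter (placeholder).

    # feed_existing_logs=True also replays whatever is already cached.
    handler.add_callback(on_entry, feed_existing_logs=True)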
def get_cached(
    self, start_index: int = 0, max_entries: int | None = None
) -> LogArchive
Build and return an archive of cached log entries.
This will only include entries that have been processed by the background thread, so may not include just-submitted logs or entries for partially written stdout/stderr lines. Entries from the range [start_index:start_index+max_entries] which are still present in the cache will be returned.
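One possible polling pattern built on this (a sketch; assumes the handler was created with a nonzero cache_size_limit):

    import time

    from efro.log import LogLevel, setup_logging

    handler = setup_logging(
        log_path=None, level=LogLevel.INFO, cache_size_limit=1024 * 1024
    )

    next_index = 0
    for _ in range(3):
        archive = handler.get_cached(start_index=next_index)
        for entry in archive.entries:
            print(entry.level.name, entry.message)
        # Resume from the end of what we just received; entries pruned
        # from the cache in the meantime are simply skipped.
        next_index = archive.start_index + len(archive.entries)
        time.sleep(1.0)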
def call_in_thread(self, call: Callable[[], Any]) -> None
Submit a call to be run in the logging background thread.
@override
def emit(self, record: logging.LogRecord) -> None
Do whatever it takes to actually log the specified logging record.
In this handler that means optionally echoing a formatted, colorized copy to the echo file right away, then handing the record off to the background thread where it is cached, passed to callbacks, and written to the structured log file.
def file_write(self, name: str, output: str) -> None
Send raw stdout/stderr output to the logger to be collated.
def file_flush(self, name: str) -> None
Send raw stdout/stderr flush to the logger to be collated.
Inherited Members
- logging.Handler
- level
- formatter
- get_name
- set_name
- name
- createLock
- acquire
- release
- setLevel
- format
- handle
- setFormatter
- flush
- close
- handleError
- logging.Filterer
- filters
- addFilter
- removeFilter
- filter
class FileLogEcho
A file-like object for forwarding stdout/stderr to a LogHandler.
def write(self, output: Any) -> None
Override standard write call.
def flush(self) -> None
Flush the file.
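setup_logging installs these wrappers itself when log_stdout_stderr=True; a hand-rolled sketch of the same wiring, for illustration only:

    import sys

    from efro.log import FileLogEcho, LogLevel, setup_logging

    handler = setup_logging(log_path=None, level=LogLevel.INFO)

    # Wrap the real streams so raw print output becomes log entries too.
    # The handler already captured the *original* stderr for echoing, so
    # its echoes do not loop back through the logger.
    sys.stdout = FileLogEcho(sys.stdout, 'stdout', handler)  # type: ignore
    sys.stderr = FileLogEcho(sys.stderr, 'stderr', handler)  # type: ignore

    print('hello')  # now also lands in the log as a 'stdout' entry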
def setup_logging(
    log_path: str | Path | None,
    level: LogLevel,
    suppress_non_root_debug: bool = False,
    log_stdout_stderr: bool = False,
    echo_to_stderr: bool = True,
    cache_size_limit: int = 0,
    cache_time_limit: datetime.timedelta | None = None,
) -> LogHandler
Set up our logging environment.
Returns the custom handler which can be used to fetch information about logs that have passed through it (worst log-levels, caches, etc.).
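A minimal usage sketch (the path and cache limits are illustrative, not defaults):

    import datetime
    import logging

    from efro.log import LogLevel, setup_logging

    handler = setup_logging(
        log_path='app.log',  # structured JSON entries, one per line
        level=LogLevel.DEBUG,
        log_stdout_stderr=True,  # also turn raw print output into entries
        cache_size_limit=1024 * 1024,  # keep roughly the last 1MB in memory
        cache_time_limit=datetime.timedelta(minutes=5),
    )

    logging.getLogger(__name__).info('hello from %s', __name__)

    # Later, pull recent entries back out of the in-memory cache.
    archive = handler.get_cached()
    print(archive.log_size, len(archive.entries))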