efro.log
Logging functionality.
# Released under the MIT License. See LICENSE for details.
#
"""Logging functionality."""
from __future__ import annotations

import sys
import time
import asyncio
import logging
import datetime
import itertools
from enum import Enum
from collections import deque
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Annotated
from threading import Thread, current_thread, Lock

from efro.util import utc_now
from efro.call import tpartial
from efro.terminal import Clr
from efro.dataclassio import ioprepped, IOAttrs, dataclass_to_json

if TYPE_CHECKING:
    from pathlib import Path
    from typing import Any, Callable, TextIO


class LogLevel(Enum):
    """Severity level for a log entry.

    These enums have numeric values so they can be compared in severity.
    Note that these values are not currently interchangeable with the
    logging.ERROR, logging.DEBUG, etc. values.
    """

    DEBUG = 0
    INFO = 1
    WARNING = 2
    ERROR = 3
    CRITICAL = 4

    @property
    def python_logging_level(self) -> int:
        """Give the corresponding logging level."""
        return LOG_LEVEL_LEVELNOS[self]

    @classmethod
    def from_python_logging_level(cls, levelno: int) -> LogLevel:
        """Given a Python logging level, return a LogLevel."""
        return LEVELNO_LOG_LEVELS[levelno]


# Python logging levels from LogLevels
LOG_LEVEL_LEVELNOS = {
    LogLevel.DEBUG: logging.DEBUG,
    LogLevel.INFO: logging.INFO,
    LogLevel.WARNING: logging.WARNING,
    LogLevel.ERROR: logging.ERROR,
    LogLevel.CRITICAL: logging.CRITICAL,
}

# LogLevels from Python logging levels
LEVELNO_LOG_LEVELS = {
    logging.DEBUG: LogLevel.DEBUG,
    logging.INFO: LogLevel.INFO,
    logging.WARNING: LogLevel.WARNING,
    logging.ERROR: LogLevel.ERROR,
    logging.CRITICAL: LogLevel.CRITICAL,
}

LEVELNO_COLOR_CODES: dict[int, tuple[str, str]] = {
    logging.DEBUG: (Clr.CYN, Clr.RST),
    logging.INFO: ('', ''),
    logging.WARNING: (Clr.YLW, Clr.RST),
    logging.ERROR: (Clr.RED, Clr.RST),
    logging.CRITICAL: (Clr.SMAG + Clr.BLD + Clr.BLK, Clr.RST),
}


@ioprepped
@dataclass
class LogEntry:
    """Single logged message."""

    name: Annotated[str, IOAttrs('n', soft_default='root', store_default=False)]
    message: Annotated[str, IOAttrs('m')]
    level: Annotated[LogLevel, IOAttrs('l')]
    time: Annotated[datetime.datetime, IOAttrs('t')]

    # We support arbitrary string labels per log entry which can be
    # incorporated into custom log processing. To populate this, our
    # LogHandler class looks for a 'labels' dict passed in the optional
    # 'extra' dict arg to standard Python log calls.
    labels: Annotated[
        dict[str, str], IOAttrs('la', store_default=False)
    ] = field(default_factory=dict)


@ioprepped
@dataclass
class LogArchive:
    """Info and data for a log."""

    # Total number of entries submitted to the log.
    log_size: Annotated[int, IOAttrs('t')]

    # Offset for the entries contained here.
    # (10 means our first entry is the 10th in the log, etc.)
    start_index: Annotated[int, IOAttrs('c')]

    entries: Annotated[list[LogEntry], IOAttrs('e')]


class LogHandler(logging.Handler):
    """Fancy-pants handler for logging output.

    Writes logs to disk in structured json format and echoes them
    to stdout/stderr with pretty colors.
    """

    _event_loop: asyncio.AbstractEventLoop

    # IMPORTANT: Any debug prints we do here should ONLY go to echofile.
    # Otherwise we can get infinite loops as those prints come back to us
    # as new log entries.

    def __init__(
        self,
        path: str | Path | None,
        echofile: TextIO | None,
        suppress_non_root_debug: bool,
        cache_size_limit: int,
        cache_time_limit: datetime.timedelta | None,
    ):
        super().__init__()
        # pylint: disable=consider-using-with
        self._file = None if path is None else open(path, 'w', encoding='utf-8')
        self._echofile = echofile
        self._callbacks: list[Callable[[LogEntry], None]] = []
        self._suppress_non_root_debug = suppress_non_root_debug
        self._file_chunks: dict[str, list[str]] = {'stdout': [], 'stderr': []}
        self._file_chunk_ship_task: dict[str, asyncio.Task | None] = {
            'stdout': None,
            'stderr': None,
        }
        self._cache_size = 0
        assert cache_size_limit >= 0
        self._cache_size_limit = cache_size_limit
        self._cache_time_limit = cache_time_limit
        self._cache = deque[tuple[int, LogEntry]]()
        self._cache_index_offset = 0
        self._cache_lock = Lock()
        self._printed_callback_error = False
        self._thread_bootstrapped = False
        self._thread = Thread(target=self._log_thread_main, daemon=True)
        if __debug__:
            self._last_slow_emit_warning_time: float | None = None
        self._thread.start()

        # Spin until our thread is up and running; otherwise we could
        # wind up trying to push stuff to our event loop before the
        # loop exists.
        while not self._thread_bootstrapped:
            time.sleep(0.001)

    def add_callback(
        self, call: Callable[[LogEntry], None], feed_existing_logs: bool = False
    ) -> None:
        """Add a callback to be run for each LogEntry.

        Note that this callback will always run in a background thread.
        Passing True for feed_existing_logs will cause all cached logs
        in the handler to be fed to the callback (still in the
        background thread though).
        """

        # Kick this over to our bg thread to add the callback and
        # process cached entries at the same time to ensure there are no
        # race conditions that could cause entries to be skipped/etc.
        self._event_loop.call_soon_threadsafe(
            tpartial(self._add_callback_in_thread, call, feed_existing_logs)
        )

    def _add_callback_in_thread(
        self, call: Callable[[LogEntry], None], feed_existing_logs: bool
    ) -> None:
        """Add a callback to be run for each LogEntry.

        Note that this callback will always run in a background thread.
        Passing True for feed_existing_logs will cause all cached logs
        in the handler to be fed to the callback (still in the
        background thread though).
        """
        assert current_thread() is self._thread
        self._callbacks.append(call)

        # Run all of our cached entries through the new callback if desired.
        if feed_existing_logs and self._cache_size_limit > 0:
            with self._cache_lock:
                for _id, entry in self._cache:
                    self._run_callback_on_entry(call, entry)

    def _log_thread_main(self) -> None:
        self._event_loop = asyncio.new_event_loop()

        # In our background thread event loop we do a fair amount of
        # slow synchronous stuff such as mucking with the log cache.
        # Let's avoid getting tons of warnings about this in debug mode.
        self._event_loop.slow_callback_duration = 2.0  # Default is 0.1

        # NOTE: if we ever use default threadpool at all we should allow
        # setting it for our loop.
        asyncio.set_event_loop(self._event_loop)
        self._thread_bootstrapped = True
        try:
            if self._cache_time_limit is not None:
                _prunetask = self._event_loop.create_task(
                    self._time_prune_cache()
                )
            self._event_loop.run_forever()
        except BaseException:
            # If this ever goes down we're in trouble; we won't be able
            # to log about it though. Try to make some noise however we
            # can.
            print('LogHandler died!!!', file=sys.stderr)
            import traceback

            traceback.print_exc()
            raise

    async def _time_prune_cache(self) -> None:
        assert self._cache_time_limit is not None
        while bool(True):
            await asyncio.sleep(61.27)
            now = utc_now()
            with self._cache_lock:
                # Prune the oldest entry as long as there is a first one that
                # is too old.
                while (
                    self._cache
                    and (now - self._cache[0][1].time) >= self._cache_time_limit
                ):
                    popped = self._cache.popleft()
                    self._cache_size -= popped[0]
                    self._cache_index_offset += 1

    def get_cached(
        self, start_index: int = 0, max_entries: int | None = None
    ) -> LogArchive:
        """Build and return an archive of cached log entries.

        This will only include entries that have been processed by the
        background thread, so may not include just-submitted logs or
        entries for partially written stdout/stderr lines.
        Entries from the range [start_index:start_index+max_entries]
        which are still present in the cache will be returned.
        """

        assert start_index >= 0
        if max_entries is not None:
            assert max_entries >= 0
        with self._cache_lock:
            # Transform start_index to our present cache space.
            start_index -= self._cache_index_offset
            # Calc end-index in our present cache space.
            end_index = (
                len(self._cache)
                if max_entries is None
                else start_index + max_entries
            )

            # Clamp both indexes to both ends of our present space.
            start_index = max(0, min(start_index, len(self._cache)))
            end_index = max(0, min(end_index, len(self._cache)))

            return LogArchive(
                log_size=self._cache_index_offset + len(self._cache),
                start_index=start_index + self._cache_index_offset,
                entries=self._cache_slice(start_index, end_index),
            )

    def _cache_slice(
        self, start: int, end: int, step: int = 1
    ) -> list[LogEntry]:
        # Deque doesn't natively support slicing but we can do it manually.
        # It sounds like rotating the deque and pulling from the beginning
        # is the most efficient way to do this. The downside is the deque
        # gets temporarily modified in the process so we need to make sure
        # we're holding the lock.
        assert self._cache_lock.locked()
        cache = self._cache
        cache.rotate(-start)
        slc = [e[1] for e in itertools.islice(cache, 0, end - start, step)]
        cache.rotate(start)
        return slc

    @classmethod
    def _is_immutable_log_data(cls, data: Any) -> bool:
        if isinstance(data, (str, bool, int, float, bytes)):
            return True
        if isinstance(data, tuple):
            return all(cls._is_immutable_log_data(x) for x in data)
        return False

    def emit(self, record: logging.LogRecord) -> None:
        # pylint: disable=too-many-branches
        if __debug__:
            starttime = time.monotonic()

        # Called by logging to send us records.

        # TODO - kill this.
        if (
            self._suppress_non_root_debug
            and record.name != 'root'
            and record.levelname == 'DEBUG'
        ):
            return

        # Optimization: if our log args are all simple immutable values,
        # we can just kick the whole thing over to our background thread to
        # be formatted there at our leisure. If anything is mutable and
        # thus could possibly change between now and then or if we want
        # to do immediate file echoing then we need to bite the bullet
        # and do that stuff here at the call site.
        fast_path = self._echofile is None and self._is_immutable_log_data(
            record.args
        )

        # Note: just assuming types are correct here, but they'll be
        # checked properly when the resulting LogEntry gets exported.
        labels: dict[str, str] | None = getattr(record, 'labels', None)
        if labels is None:
            labels = {}

        if fast_path:
            if __debug__:
                formattime = echotime = time.monotonic()
            self._event_loop.call_soon_threadsafe(
                tpartial(
                    self._emit_in_thread,
                    record.name,
                    record.levelno,
                    record.created,
                    record,
                    labels,
                )
            )
        else:
            # Slow case; do formatting and echoing here at the log call
            # site.
            msg = self.format(record)

            if __debug__:
                formattime = time.monotonic()

            # Also immediately print pretty colored output to our echo file
            # (generally stderr). We do this part here instead of in our bg
            # thread because the delay can throw off command line prompts or
            # make tight debugging harder.
            if self._echofile is not None:
                ends = LEVELNO_COLOR_CODES.get(record.levelno)
                namepre = f'{Clr.WHT}{record.name}:{Clr.RST} '
                if ends is not None:
                    self._echofile.write(f'{namepre}{ends[0]}{msg}{ends[1]}\n')
                else:
                    self._echofile.write(f'{namepre}{msg}\n')
                self._echofile.flush()

            if __debug__:
                echotime = time.monotonic()

            self._event_loop.call_soon_threadsafe(
                tpartial(
                    self._emit_in_thread,
                    record.name,
                    record.levelno,
                    record.created,
                    msg,
                    labels,
                )
            )

        if __debug__:
            # Make noise if we're taking a significant amount of time here.
            # Limit the noise to once every so often though; otherwise we
            # could get a feedback loop where every log emit results in a
            # warning log which results in another, etc.
            now = time.monotonic()
            # noinspection PyUnboundLocalVariable
            duration = now - starttime  # pyright: ignore
            # noinspection PyUnboundLocalVariable
            format_duration = formattime - starttime  # pyright: ignore
            # noinspection PyUnboundLocalVariable
            echo_duration = echotime - formattime  # pyright: ignore
            if duration > 0.05 and (
                self._last_slow_emit_warning_time is None
                or now > self._last_slow_emit_warning_time + 10.0
            ):
                # Logging calls from *within* a logging handler
                # sounds sketchy, so let's just kick this over to
                # the bg event loop thread we've already got.
                self._last_slow_emit_warning_time = now
                self._event_loop.call_soon_threadsafe(
                    tpartial(
                        logging.warning,
                        'efro.log.LogHandler emit took too long'
                        ' (%.2fs total; %.2fs format, %.2fs echo,'
                        ' fast_path=%s).',
                        duration,
                        format_duration,
                        echo_duration,
                        fast_path,
                    )
                )

    def _emit_in_thread(
        self,
        name: str,
        levelno: int,
        created: float,
        message: str | logging.LogRecord,
        labels: dict[str, str],
    ) -> None:
        try:
            # If they passed a raw record here, bake it down to a string.
            if isinstance(message, logging.LogRecord):
                message = self.format(message)

            self._emit_entry(
                LogEntry(
                    name=name,
                    message=message,
                    level=LEVELNO_LOG_LEVELS.get(levelno, LogLevel.INFO),
                    time=datetime.datetime.fromtimestamp(
                        created, datetime.timezone.utc
                    ),
                    labels=labels,
                )
            )
        except Exception:
            import traceback

            traceback.print_exc(file=self._echofile)

    def file_write(self, name: str, output: str) -> None:
        """Send raw stdout/stderr output to the logger to be collated."""

        self._event_loop.call_soon_threadsafe(
            tpartial(self._file_write_in_thread, name, output)
        )

    def _file_write_in_thread(self, name: str, output: str) -> None:
        try:
            assert name in ('stdout', 'stderr')

            # Here we try to be somewhat smart about breaking arbitrary
            # print output into discrete log entries.

            self._file_chunks[name].append(output)

            # Individual parts of a print come across as separate writes,
            # and the end of a print will be a standalone '\n' by default.
            # Let's use that as a hint that we're likely at the end of
            # a full print statement and ship what we've got.
            if output == '\n':
                self._ship_file_chunks(name, cancel_ship_task=True)
            else:
                # By default just keep adding chunks.
                # However we keep a timer running anytime we've got
                # unshipped chunks so that we can ship what we've got
                # after a short bit if we never get a newline.
                ship_task = self._file_chunk_ship_task[name]
                if ship_task is None:
                    self._file_chunk_ship_task[
                        name
                    ] = self._event_loop.create_task(
                        self._ship_chunks_task(name),
                        name='log ship file chunks',
                    )

        except Exception:
            import traceback

            traceback.print_exc(file=self._echofile)

    def file_flush(self, name: str) -> None:
        """Send raw stdout/stderr flush to the logger to be collated."""

        self._event_loop.call_soon_threadsafe(
            tpartial(self._file_flush_in_thread, name)
        )

    def _file_flush_in_thread(self, name: str) -> None:
        try:
            assert name in ('stdout', 'stderr')

            # Immediately ship whatever chunks we've got.
            if self._file_chunks[name]:
                self._ship_file_chunks(name, cancel_ship_task=True)

        except Exception:
            import traceback

            traceback.print_exc(file=self._echofile)

    async def _ship_chunks_task(self, name: str) -> None:
        self._ship_file_chunks(name, cancel_ship_task=False)

    def _ship_file_chunks(self, name: str, cancel_ship_task: bool) -> None:
        # Note: Raw print input generally ends in a newline, but that is
        # redundant when we break things into log entries and results
        # in extra empty lines. So strip off a single trailing newline.
        text = ''.join(self._file_chunks[name]).removesuffix('\n')

        self._emit_entry(
            LogEntry(
                name=name, message=text, level=LogLevel.INFO, time=utc_now()
            )
        )
        self._file_chunks[name] = []
        ship_task = self._file_chunk_ship_task[name]
        if cancel_ship_task and ship_task is not None:
            ship_task.cancel()
        self._file_chunk_ship_task[name] = None

    def _emit_entry(self, entry: LogEntry) -> None:
        assert current_thread() is self._thread

        # Store to our cache.
        if self._cache_size_limit > 0:
            with self._cache_lock:
                # Do a rough calc of how many bytes this entry consumes.
                entry_size = sum(
                    sys.getsizeof(x)
                    for x in (
                        entry,
                        entry.name,
                        entry.message,
                        entry.level,
                        entry.time,
                    )
                )
                self._cache.append((entry_size, entry))
                self._cache_size += entry_size

                # Prune old until we are back at or under our limit.
                while self._cache_size > self._cache_size_limit:
                    popped = self._cache.popleft()
                    self._cache_size -= popped[0]
                    self._cache_index_offset += 1

        # Pass to callbacks.
        for call in self._callbacks:
            self._run_callback_on_entry(call, entry)

        # Dump to our structured log file.
        # TODO: should set a timer for flushing; don't flush every line.
        if self._file is not None:
            entry_s = dataclass_to_json(entry)
            assert '\n' not in entry_s  # Make sure its a single line.
            print(entry_s, file=self._file, flush=True)

    def _run_callback_on_entry(
        self, callback: Callable[[LogEntry], None], entry: LogEntry
    ) -> None:
        """Run a callback and handle any errors."""
        try:
            callback(entry)
        except Exception:
            # Only print the first callback error to avoid insanity.
            if not self._printed_callback_error:
                import traceback

                traceback.print_exc(file=self._echofile)
                self._printed_callback_error = True


class FileLogEcho:
    """A file-like object for forwarding stdout/stderr to a LogHandler."""

    def __init__(
        self, original: TextIO, name: str, handler: LogHandler
    ) -> None:
        assert name in ('stdout', 'stderr')
        self._original = original
        self._name = name
        self._handler = handler

    def write(self, output: Any) -> None:
        """Override standard write call."""
        self._original.write(output)
        self._handler.file_write(self._name, output)

    def flush(self) -> None:
        """Flush the file."""
        self._original.flush()

        # We also use this as a hint to ship whatever file chunks
        # we've accumulated (we have to try and be smart about breaking
        # our arbitrary file output into discrete entries).
        self._handler.file_flush(self._name)

    def isatty(self) -> bool:
        """Are we a terminal?"""
        return self._original.isatty()


def setup_logging(
    log_path: str | Path | None,
    level: LogLevel,
    suppress_non_root_debug: bool = False,
    log_stdout_stderr: bool = False,
    echo_to_stderr: bool = True,
    cache_size_limit: int = 0,
    cache_time_limit: datetime.timedelta | None = None,
) -> LogHandler:
    """Set up our logging environment.

    Returns the custom handler which can be used to fetch information
    about logs that have passed through it. (worst log-levels, caches, etc.).
    """

    lmap = {
        LogLevel.DEBUG: logging.DEBUG,
        LogLevel.INFO: logging.INFO,
        LogLevel.WARNING: logging.WARNING,
        LogLevel.ERROR: logging.ERROR,
        LogLevel.CRITICAL: logging.CRITICAL,
    }

    # Wire logger output to go to a structured log file.
    # Also echo it to stderr IF we're running in a terminal.
    # UPDATE: Actually gonna always go to stderr. Is there a
    # reason we shouldn't? This makes debugging possible if all
    # we have is access to a non-interactive terminal or file dump.
    # We could add a '--quiet' arg or whatnot to change this behavior.

    # Note: by passing in the *original* stderr here before we
    # (potentially) replace it, we ensure that our log echos
    # won't themselves be intercepted and sent to the logger
    # which would create an infinite loop.
    loghandler = LogHandler(
        log_path,
        echofile=sys.stderr if echo_to_stderr else None,
        suppress_non_root_debug=suppress_non_root_debug,
        cache_size_limit=cache_size_limit,
        cache_time_limit=cache_time_limit,
    )

    # Note: going ahead with force=True here so that we replace any
    # existing logger. Though we warn if it looks like we are doing
    # that so we can try to avoid creating the first one.
    had_previous_handlers = bool(logging.root.handlers)
    logging.basicConfig(
        level=lmap[level],
        # format='%(name)s: %(message)s',
        # We dump *only* the message here. We pass various log record bits
        # around and format things fancier where they end up.
        format='%(message)s',
        handlers=[loghandler],
        force=True,
    )
    if had_previous_handlers:
        logging.warning(
            'setup_logging: Replacing existing handlers.'
            ' Something may have logged before expected.'
        )

    # Optionally intercept Python's stdout/stderr output and generate
    # log entries from it.
    if log_stdout_stderr:
        sys.stdout = FileLogEcho(  # type: ignore
            sys.stdout, 'stdout', loghandler
        )
        sys.stderr = FileLogEcho(  # type: ignore
            sys.stderr, 'stderr', loghandler
        )

    return loghandler
class LogLevel(Enum):
Severity level for a log entry.
These enums have numeric values so they can be compared in severity. Note that these values are not currently interchangeable with the logging.ERROR, logging.DEBUG, etc. values.
@classmethod
def from_python_logging_level(cls, levelno: int) -> LogLevel:
Given a Python logging level, return a LogLevel.
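For illustration, a round trip between the two level systems using the mappings above might look like this (a minimal sketch):

import logging

from efro.log import LogLevel

# LogLevel -> stdlib logging level number.
assert LogLevel.WARNING.python_logging_level == logging.WARNING

# Stdlib logging level number -> LogLevel.
assert LogLevel.from_python_logging_level(logging.ERROR) is LogLevel.ERROR

# The enum values are ordered by severity, so they compare numerically.
assert LogLevel.ERROR.value > LogLevel.INFO.value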
@ioprepped
@dataclass
class LogEntry:
Single logged message.
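As the field comment in the source notes, LogHandler populates labels from a 'labels' dict passed via the standard 'extra' argument on logging calls; a small illustrative sketch (the logger name and label keys here are arbitrary):

import logging

# The 'labels' dict lands on the LogRecord and is copied into
# LogEntry.labels by LogHandler.emit().
logging.getLogger('game.assets').warning(
    'Slow asset load: %s', 'map_01',
    extra={'labels': {'subsystem': 'assets', 'phase': 'startup'}},
)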
@ioprepped
@dataclass
class LogArchive:
Info and data for a log.
class LogHandler(logging.Handler):
Fancy-pants handler for logging output.
Writes logs to disk in structured json format and echoes them to stdout/stderr with pretty colors.
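The setup_logging() helper below is the usual way to install one of these, but a handler can also be constructed and attached by hand; a minimal sketch (the path and cache size are illustrative):

import sys
import logging

from efro.log import LogHandler

handler = LogHandler(
    path='app-log.jsonl',          # illustrative path; pass None for no file
    echofile=sys.stderr,           # pretty colored echoes go here
    suppress_non_root_debug=False,
    cache_size_limit=1024 * 1024,  # rough byte budget for get_cached()
    cache_time_limit=None,
)
logging.basicConfig(
    level=logging.INFO, format='%(message)s', handlers=[handler], force=True
)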
def __init__(
    self,
    path: str | Path | None,
    echofile: TextIO | None,
    suppress_non_root_debug: bool,
    cache_size_limit: int,
    cache_time_limit: datetime.timedelta | None,
):
Create the handler. Optionally opens a structured-log file at path, stores the echo file and cache limits, and starts the background logging thread, blocking briefly until its event loop is ready.
def add_callback(
    self, call: Callable[[LogEntry], None], feed_existing_logs: bool = False
) -> None:
Add a callback to be run for each LogEntry.
Note that this callback will always run in a background thread. Passing True for feed_existing_logs will cause all cached logs in the handler to be fed to the callback (still in the background thread though).
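For example, a callback that simply collects entries for later inspection (a sketch; `handler` is assumed to be a LogHandler such as the one returned by setup_logging()):

from efro.log import LogEntry

collected: list[LogEntry] = []

def on_entry(entry: LogEntry) -> None:
    # Runs in the handler's background thread; keep it quick and avoid
    # logging from inside it.
    collected.append(entry)

# feed_existing_logs only has an effect if the handler was created with
# a nonzero cache_size_limit.
handler.add_callback(on_entry, feed_existing_logs=True)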
def get_cached(
    self, start_index: int = 0, max_entries: int | None = None
) -> LogArchive:
Build and return an archive of cached log entries.
This will only include entries that have been processed by the background thread, so may not include just-submitted logs or entries for partially written stdout/stderr lines. Entries from the range [start_index:start_index+max_entries] which are still present in the cache will be returned.
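One way to poll the cache incrementally is to feed each archive's end position back in as the next start_index; a sketch (assumes `handler` is a LogHandler created with a nonzero cache_size_limit):

archive = handler.get_cached(start_index=0, max_entries=100)
for entry in archive.entries:
    print(entry.time, entry.level.name, entry.name, entry.message)

# Pick up where this batch left off on the next poll.
next_start = archive.start_index + len(archive.entries)
archive = handler.get_cached(start_index=next_start, max_entries=100)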
def emit(self, record: logging.LogRecord) -> None:
Handle a log record passed to us by the logging module. If the record's args are simple immutable values (and no echo file is set), formatting is deferred to the background thread; otherwise the record is formatted and echoed here at the call site before being shipped to that thread as a LogEntry.
def file_write(self, name: str, output: str) -> None:
Send raw stdout/stderr output to the logger to be collated.
def file_flush(self, name: str) -> None:
Send raw stdout/stderr flush to the logger to be collated.
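A rough sketch of how redirected print output flows through these two calls (`handler` is a LogHandler instance; FileLogEcho below does this wiring automatically):

# Pieces of one print() arrive as separate writes...
handler.file_write('stdout', 'loading level ')
handler.file_write('stdout', '3')

# ...and the standalone trailing newline is the hint to collate them
# into a single INFO LogEntry named 'stdout'.
handler.file_write('stdout', '\n')

# A flush ships any partial chunks that are still pending.
handler.file_flush('stdout')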
class FileLogEcho:
A file-like object for forwarding stdout/stderr to a LogHandler.
def write(self, output: Any) -> None:
Override standard write call.
def flush(self) -> None:
Flush the file.
def setup_logging(
    log_path: str | Path | None,
    level: LogLevel,
    suppress_non_root_debug: bool = False,
    log_stdout_stderr: bool = False,
    echo_to_stderr: bool = True,
    cache_size_limit: int = 0,
    cache_time_limit: datetime.timedelta | None = None,
) -> LogHandler:
Set up our logging environment.
Returns the custom handler, which can be used to fetch information about logs that have passed through it (worst log-levels, caches, etc.).
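Typical usage at process startup might look something like this (the path and limits are illustrative, not library defaults):

import datetime
import logging

from efro.log import LogLevel, setup_logging

handler = setup_logging(
    log_path='run-log.jsonl',           # structured JSON, one entry per line
    level=LogLevel.INFO,
    log_stdout_stderr=True,             # turn print() output into log entries
    echo_to_stderr=True,                # keep colored echoes on stderr
    cache_size_limit=2 * 1024 * 1024,   # keep roughly 2MB of entries cached
    cache_time_limit=datetime.timedelta(minutes=30),
)

logging.getLogger(__name__).info('Logging is up.')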