efro.util
Small handy bits of functionality.
def explicit_bool(val: bool) -> bool:
    """Return a non-inferable boolean value.

    Useful to be able to disable blocks of code without type checkers
    complaining/etc.
    """
    # pylint: disable=no-else-return
    if TYPE_CHECKING:
        # infer this! <boom>
        import random

        return random.random() < 0.5
    else:
        return val
Return a non-inferable boolean value.
Useful for disabling blocks of code without type checkers complaining about unreachable code and the like.
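For instance, a sketch of guarding a temporarily-disabled code path (hypothetical usage):

from efro.util import explicit_bool

# A plain 'if False:' would let type checkers flag the body as
# unreachable; explicit_bool keeps the body checked while still
# skipping it at runtime.
if explicit_bool(False):
    print('Disabled debug path; still fully type-checked.')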
def snake_case_to_title(val: str) -> str:
    """Given a snake-case string 'foo_bar', returns 'Foo Bar'."""
    # Kill empty words resulting from leading/trailing/multiple underscores.
    return ' '.join(w for w in val.split('_') if w).title()
Given a snake-case string 'foo_bar', returns 'Foo Bar'.
def snake_case_to_camel_case(val: str) -> str:
    """Given a snake-case string 'foo_bar', returns camel-case 'FooBar'."""
    # Replace underscores with spaces; capitalize words; kill spaces.
    # Not sure about efficiency, but logically simple.
    return val.replace('_', ' ').title().replace(' ', '')
Given a snake-case string 'foo_bar', returns camel-case 'FooBar'.
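A quick sketch of both conversions:

from efro.util import snake_case_to_title, snake_case_to_camel_case

assert snake_case_to_title('foo_bar') == 'Foo Bar'
assert snake_case_to_camel_case('foo_bar') == 'FooBar'

# Leading/trailing/repeated underscores are tolerated by the title form.
assert snake_case_to_title('__leading_and__multiple__') == 'Leading And Multiple'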
def check_utc(value: datetime.datetime) -> None:
    """Ensure a datetime value is timezone-aware utc."""
    if value.tzinfo is not datetime.UTC:
        raise ValueError(
            'datetime value does not have timezone set as datetime.UTC'
        )
Ensure a datetime value is timezone-aware utc.
def utc_now() -> datetime.datetime:
    """Get timezone-aware current utc time.

    Just a shortcut for datetime.datetime.now(datetime.UTC).
    Avoid datetime.datetime.utcnow() which is deprecated and gives naive
    times.
    """
    return datetime.datetime.now(datetime.UTC)
Get timezone-aware current utc time.
Just a shortcut for datetime.datetime.now(datetime.UTC). Avoid datetime.datetime.utcnow(), which is deprecated and gives naive times.
def utc_now_naive() -> datetime.datetime:
    """Get naive utc time.

    This can be used to replace datetime.utcnow(), which is now deprecated.
    Most all code should migrate to use timezone-aware times instead of
    this.
    """
    return datetime.datetime.now(datetime.UTC).replace(tzinfo=None)
Get naive utc time.
This can be used to replace datetime.utcnow(), which is now deprecated. Most code should migrate to timezone-aware times instead of this.
def utc_today() -> datetime.datetime:
    """Get offset-aware midnight in the utc time zone."""
    now = datetime.datetime.now(datetime.UTC)
    return datetime.datetime(
        year=now.year, month=now.month, day=now.day, tzinfo=now.tzinfo
    )
Get offset-aware midnight in the utc time zone.
def utc_this_hour() -> datetime.datetime:
    """Get offset-aware beginning of the current hour in the utc time zone."""
    now = datetime.datetime.now(datetime.UTC)
    return datetime.datetime(
        year=now.year,
        month=now.month,
        day=now.day,
        hour=now.hour,
        tzinfo=now.tzinfo,
    )
Get offset-aware beginning of the current hour in the utc time zone.
def utc_this_minute() -> datetime.datetime:
    """Get offset-aware beginning of current minute in the utc time zone."""
    now = datetime.datetime.now(datetime.UTC)
    return datetime.datetime(
        year=now.year,
        month=now.month,
        day=now.day,
        hour=now.hour,
        minute=now.minute,
        tzinfo=now.tzinfo,
    )
Get offset-aware beginning of current minute in the utc time zone.
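A small sketch tying the utc helpers together (assumes Python 3.11+, which provides datetime.UTC):

import datetime
from efro.util import check_utc, utc_now, utc_this_hour, utc_today

start_of_day = utc_today()
start_of_hour = utc_this_hour()
now = utc_now()

check_utc(now)  # Passes; tzinfo is datetime.UTC.
assert start_of_day <= start_of_hour <= now

# Naive datetimes (or other tzinfos) are rejected:
try:
    check_utc(datetime.datetime.now())
except ValueError:
    print('not timezone-aware utc')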
def empty_weakref(objtype: type[T]) -> weakref.ref[T]:
    """Return an invalidated weak-reference for the specified type."""
    # At runtime, all weakrefs are the same; our type arg is just
    # for the static type checker.
    del objtype  # Unused.

    # Just create an object and let it die. Is there a cleaner way to do this?
    # return weakref.ref(_EmptyObj())  # type: ignore

    # Sharing a single one seems at least a bit better.
    return _g_empty_weak_ref  # type: ignore
Return an invalidated weak-reference for the specified type.
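For example (Node is a hypothetical class):

import weakref
from efro.util import empty_weakref

class Node:
    pass

# Start with a dead ref of the right type instead of Optional[weakref.ref].
parent_ref: weakref.ref[Node] = empty_weakref(Node)
assert parent_ref() is None

node = Node()
parent_ref = weakref.ref(node)
assert parent_ref() is node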
def data_size_str(bytecount: int, compact: bool = False) -> str:
    """Given a size in bytes, returns a short human readable string.

    In compact mode this should be 6 or fewer chars for most all
    sane file sizes.
    """
    # pylint: disable=too-many-return-statements

    # Special case: handle negatives.
    if bytecount < 0:
        val = data_size_str(-bytecount, compact=compact)
        return f'-{val}'

    if bytecount <= 999:
        suffix = 'B' if compact else 'bytes'
        return f'{bytecount} {suffix}'
    kbytecount = bytecount / 1024
    if round(kbytecount, 1) < 10.0:
        return f'{kbytecount:.1f} KB'
    if round(kbytecount, 0) < 999:
        return f'{kbytecount:.0f} KB'
    mbytecount = bytecount / (1024 * 1024)
    if round(mbytecount, 1) < 10.0:
        return f'{mbytecount:.1f} MB'
    if round(mbytecount, 0) < 999:
        return f'{mbytecount:.0f} MB'
    gbytecount = bytecount / (1024 * 1024 * 1024)
    if round(gbytecount, 1) < 10.0:
        return f'{gbytecount:.1f} GB'
    return f'{gbytecount:.0f} GB'
Given a size in bytes, returns a short human readable string.
In compact mode this should be 6 or fewer chars for most sane file sizes.
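A few sample values (sketch; exact strings follow the thresholds in the source above):

from efro.util import data_size_str

print(data_size_str(0))                   # '0 bytes'
print(data_size_str(1500))                # '1.5 KB'
print(data_size_str(1024 * 1024 * 3))     # '3.0 MB'
print(data_size_str(50, compact=True))    # '50 B'
print(data_size_str(1500, compact=True))  # '1.5 KB'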
class DirtyBit:
    """Manages whether a thing is dirty and regulates attempts to clean it.

    To use, simply set the 'dirty' value on this object to True when some
    action is needed, and then check the 'should_update' value to regulate
    when attempts to clean it should be made. Set 'dirty' back to False after
    a successful update.
    If 'use_lock' is True, an asyncio Lock will be created and incorporated
    into update attempts to prevent simultaneous updates (should_update will
    only return True when the lock is unlocked). Note that it is up to the
    user to lock/unlock the lock during the actual update attempt.
    If a value is passed for 'auto_dirty_seconds', the dirtybit will flip
    itself back to dirty after being clean for the given amount of time.
    'min_update_interval' can be used to enforce a minimum update
    interval even when updates are successful (retry_interval only applies
    when updates fail).
    """

    def __init__(
        self,
        dirty: bool = False,
        retry_interval: float = 5.0,
        use_lock: bool = False,
        auto_dirty_seconds: float | None = None,
        min_update_interval: float | None = None,
    ):
        curtime = time.monotonic()
        self._retry_interval = retry_interval
        self._auto_dirty_seconds = auto_dirty_seconds
        self._min_update_interval = min_update_interval
        self._dirty = dirty
        self._next_update_time: float | None = curtime if dirty else None
        self._last_update_time: float | None = None
        self._next_auto_dirty_time: float | None = (
            (curtime + self._auto_dirty_seconds)
            if (not dirty and self._auto_dirty_seconds is not None)
            else None
        )
        self._use_lock = use_lock
        self.lock: asyncio.Lock
        if self._use_lock:
            import asyncio

            self.lock = asyncio.Lock()

    @property
    def dirty(self) -> bool:
        """Whether the target is currently dirty.

        This should be set to False once an update is successful.
        """
        return self._dirty

    @dirty.setter
    def dirty(self, value: bool) -> None:
        # If we're freshly clean, set our next auto-dirty time (if we have
        # one).
        if self._dirty and not value and self._auto_dirty_seconds is not None:
            self._next_auto_dirty_time = (
                time.monotonic() + self._auto_dirty_seconds
            )

        # If we're freshly dirty, schedule an immediate update.
        if not self._dirty and value:
            self._next_update_time = time.monotonic()

            # If they want to enforce a minimum update interval,
            # push out the next update time if it hasn't been long enough.
            if (
                self._min_update_interval is not None
                and self._last_update_time is not None
            ):
                self._next_update_time = max(
                    self._next_update_time,
                    self._last_update_time + self._min_update_interval,
                )

        self._dirty = value

    @property
    def should_update(self) -> bool:
        """Whether an attempt should be made to clean the target now.

        Always returns False if the target is not dirty.
        Takes into account the amount of time passed since the target
        was marked dirty or since should_update last returned True.
        """
        curtime = time.monotonic()

        # Auto-dirty ourself if we're into that.
        if (
            self._next_auto_dirty_time is not None
            and curtime > self._next_auto_dirty_time
        ):
            self.dirty = True
            self._next_auto_dirty_time = None
        if not self._dirty:
            return False
        if self._use_lock and self.lock.locked():
            return False
        assert self._next_update_time is not None
        if curtime > self._next_update_time:
            self._next_update_time = curtime + self._retry_interval
            self._last_update_time = curtime
            return True
        return False
Manages whether a thing is dirty and regulates attempts to clean it.
To use, simply set the 'dirty' value on this object to True when some action is needed, and then check the 'should_update' value to regulate when attempts to clean it should be made. Set 'dirty' back to False after a successful update.

If 'use_lock' is True, an asyncio Lock will be created and incorporated into update attempts to prevent simultaneous updates (should_update will only return True when the lock is unlocked). Note that it is up to the user to lock/unlock the lock during the actual update attempt.

If a value is passed for 'auto_dirty_seconds', the DirtyBit will flip itself back to dirty after being clean for the given amount of time.

'min_update_interval' can be used to enforce a minimum update interval even when updates are successful (retry_interval only applies when updates fail).
@property
def dirty(self) -> bool:
    """Whether the target is currently dirty.

    This should be set to False once an update is successful.
    """
    return self._dirty
Whether the target is currently dirty.
This should be set to False once an update is successful.
@property
def should_update(self) -> bool:
    """Whether an attempt should be made to clean the target now.

    Always returns False if the target is not dirty.
    Takes into account the amount of time passed since the target
    was marked dirty or since should_update last returned True.
    """
    curtime = time.monotonic()

    # Auto-dirty ourself if we're into that.
    if (
        self._next_auto_dirty_time is not None
        and curtime > self._next_auto_dirty_time
    ):
        self.dirty = True
        self._next_auto_dirty_time = None
    if not self._dirty:
        return False
    if self._use_lock and self.lock.locked():
        return False
    assert self._next_update_time is not None
    if curtime > self._next_update_time:
        self._next_update_time = curtime + self._retry_interval
        self._last_update_time = curtime
        return True
    return False
Whether an attempt should be made to clean the target now.
Always returns False if the target is not dirty. Takes into account the amount of time passed since the target was marked dirty or since should_update last returned True.
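A minimal polling-loop sketch; sync_to_server is a hypothetical stand-in for whatever cleanup/upload work applies:

import time
from efro.util import DirtyBit

def sync_to_server(state: dict) -> bool:
    """Hypothetical upload; returns True on success."""
    return True

state = {'score': 0}
bit = DirtyBit(retry_interval=5.0)

state['score'] += 1
bit.dirty = True  # Something changed; flag it.

while bit.dirty:
    if bit.should_update:  # Rate-limits retries to every 5 seconds.
        if sync_to_server(state):
            bit.dirty = False  # Clean again.
    time.sleep(0.1)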
class DispatchMethodWrapper(Generic[ArgT, RetT]):
    """Type-aware standin for the dispatch func returned by dispatchmethod."""

    def __call__(self, arg: ArgT) -> RetT:
        raise RuntimeError('Should not get here')

    @staticmethod
    def register(
        func: Callable[[Any, Any], RetT]
    ) -> Callable[[Any, Any], RetT]:
        """Register a new dispatch handler for this dispatch-method."""
        raise RuntimeError('Should not get here')

    registry: dict[Any, Callable]
Type-aware standin for the dispatch func returned by dispatchmethod.
@staticmethod
def register(
    func: Callable[[Any, Any], RetT]
) -> Callable[[Any, Any], RetT]:
    """Register a new dispatch handler for this dispatch-method."""
    raise RuntimeError('Should not get here')
Register a new dispatch handler for this dispatch-method.
def dispatchmethod(
    func: Callable[[Any, ArgT], RetT]
) -> DispatchMethodWrapper[ArgT, RetT]:
    """A variation of functools.singledispatch for methods.

    Note: as of Python 3.9 there is now functools.singledispatchmethod,
    but it currently (as of Jan 2021) is not type-aware (at least in mypy),
    which gives us a reason to keep this one around for now.
    """
    from functools import singledispatch, update_wrapper

    origwrapper: Any = singledispatch(func)

    # Pull this out so hopefully origwrapper can die,
    # otherwise we reference origwrapper in our wrapper.
    dispatch = origwrapper.dispatch

    # All we do here is recreate the end of functools.singledispatch
    # where it returns a wrapper except instead of the wrapper using the
    # first arg to the function ours uses the second (to skip 'self').
    # This was made against Python 3.7; we should probably check up on
    # this in later versions in case anything has changed.
    # (or hopefully they'll add this functionality to their version)
    # NOTE: sounds like we can use functools singledispatchmethod in 3.8
    def wrapper(*args: Any, **kw: Any) -> Any:
        if not args or len(args) < 2:
            raise TypeError(
                f'{funcname} requires at least 2 positional arguments'
            )

        return dispatch(args[1].__class__)(*args, **kw)

    funcname = getattr(func, '__name__', 'dispatchmethod method')
    wrapper.register = origwrapper.register  # type: ignore
    wrapper.dispatch = dispatch  # type: ignore
    wrapper.registry = origwrapper.registry  # type: ignore
    # pylint: disable=protected-access
    wrapper._clear_cache = origwrapper._clear_cache  # type: ignore
    update_wrapper(wrapper, func)
    # pylint: enable=protected-access
    return cast(DispatchMethodWrapper, wrapper)
A variation of functools.singledispatch for methods.
Note: as of Python 3.9 there is now functools.singledispatchmethod, but it currently (as of Jan 2021) is not type-aware (at least in mypy), which gives us a reason to keep this one around for now.
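A sketch of declaring and extending a dispatch-method (Renderer is a hypothetical class; handlers are keyed by the type of the second argument):

from efro.util import dispatchmethod

class Renderer:
    @dispatchmethod
    def render(self, value: object) -> str:
        return f'<{value!r}>'

    @render.register
    def _render_int(self, value: int) -> str:
        return f'int:{value}'

    @render.register
    def _render_str(self, value: str) -> str:
        return f'str:{value}'

r = Renderer()
assert r.render(3) == 'int:3'
assert r.render('hi') == 'str:hi'
assert r.render(1.5) == '<1.5>'  # Falls back to the base implementation.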
def valuedispatch(call: Callable[[ValT], RetT]) -> ValueDispatcher[ValT, RetT]:
    """Decorator for functions to allow dispatching based on a value.

    This differs from functools.singledispatch in that it dispatches based
    on the value of an argument, not based on its type.
    The 'register' method of a value-dispatch function can be used
    to assign new functions to handle particular values.
    Unhandled values wind up in the original dispatch function."""
    return ValueDispatcher(call)
Decorator for functions to allow dispatching based on a value.
This differs from functools.singledispatch in that it dispatches based on the value of an argument, not based on its type. The 'register' method of a value-dispatch function can be used to assign new functions to handle particular values. Unhandled values wind up in the original dispatch function.
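A sketch using a hypothetical enum; registered handlers take no arguments and unregistered values fall through to the base function:

from enum import Enum
from efro.util import valuedispatch

class Color(Enum):
    RED = 'red'
    GREEN = 'green'
    BLUE = 'blue'

@valuedispatch
def describe(color: Color) -> str:
    return 'some other color'

@describe.register(Color.RED)
def _describe_red() -> str:
    return 'warm'

@describe.register(Color.BLUE)
def _describe_blue() -> str:
    return 'cool'

assert describe(Color.RED) == 'warm'
assert describe(Color.GREEN) == 'some other color'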
class ValueDispatcher(Generic[ValT, RetT]):
    """Used by the valuedispatch decorator"""

    def __init__(self, call: Callable[[ValT], RetT]) -> None:
        self._base_call = call
        self._handlers: dict[ValT, Callable[[], RetT]] = {}

    def __call__(self, value: ValT) -> RetT:
        handler = self._handlers.get(value)
        if handler is not None:
            return handler()
        return self._base_call(value)

    def _add_handler(
        self, value: ValT, call: Callable[[], RetT]
    ) -> Callable[[], RetT]:
        if value in self._handlers:
            raise RuntimeError(f'Duplicate handlers added for {value}')
        self._handlers[value] = call
        return call

    def register(
        self, value: ValT
    ) -> Callable[[Callable[[], RetT]], Callable[[], RetT]]:
        """Add a handler to the dispatcher."""
        from functools import partial

        return partial(self._add_handler, value)
Used by the valuedispatch decorator.
def register(
    self, value: ValT
) -> Callable[[Callable[[], RetT]], Callable[[], RetT]]:
    """Add a handler to the dispatcher."""
    from functools import partial

    return partial(self._add_handler, value)
Add a handler to the dispatcher.
def valuedispatch1arg(
    call: Callable[[ValT, ArgT], RetT]
) -> ValueDispatcher1Arg[ValT, ArgT, RetT]:
    """Like valuedispatch but for functions taking an extra argument."""
    return ValueDispatcher1Arg(call)
Like valuedispatch but for functions taking an extra argument.
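A sketch with a hypothetical enum and an extra int argument; handlers receive only that extra argument:

from enum import Enum
from efro.util import valuedispatch1arg

class Op(Enum):
    DOUBLE = 0
    SQUARE = 1

@valuedispatch1arg
def apply(op: Op, value: int) -> int:
    raise ValueError(f'Unhandled op: {op}')

@apply.register(Op.DOUBLE)
def _apply_double(value: int) -> int:
    return value * 2

@apply.register(Op.SQUARE)
def _apply_square(value: int) -> int:
    return value * value

assert apply(Op.DOUBLE, 5) == 10
assert apply(Op.SQUARE, 5) == 25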
class ValueDispatcher1Arg(Generic[ValT, ArgT, RetT]):
    """Used by the valuedispatch1arg decorator"""

    def __init__(self, call: Callable[[ValT, ArgT], RetT]) -> None:
        self._base_call = call
        self._handlers: dict[ValT, Callable[[ArgT], RetT]] = {}

    def __call__(self, value: ValT, arg: ArgT) -> RetT:
        handler = self._handlers.get(value)
        if handler is not None:
            return handler(arg)
        return self._base_call(value, arg)

    def _add_handler(
        self, value: ValT, call: Callable[[ArgT], RetT]
    ) -> Callable[[ArgT], RetT]:
        if value in self._handlers:
            raise RuntimeError(f'Duplicate handlers added for {value}')
        self._handlers[value] = call
        return call

    def register(
        self, value: ValT
    ) -> Callable[[Callable[[ArgT], RetT]], Callable[[ArgT], RetT]]:
        """Add a handler to the dispatcher."""
        from functools import partial

        return partial(self._add_handler, value)
Used by the valuedispatch1arg decorator.
def register(
    self, value: ValT
) -> Callable[[Callable[[ArgT], RetT]], Callable[[ArgT], RetT]]:
    """Add a handler to the dispatcher."""
    from functools import partial

    return partial(self._add_handler, value)
Add a handler to the dispatcher.
def valuedispatchmethod(
    call: Callable[[SelfT, ValT], RetT]
) -> ValueDispatcherMethod[ValT, RetT]:
    """Like valuedispatch but works with methods instead of functions."""

    # NOTE: It seems that to wrap a method with a decorator and have self
    # dispatching do the right thing, we must return a function and not
    # an executable object. So for this version we store our data here
    # in the function call dict and simply return a call.

    _base_call = call
    _handlers: dict[ValT, Callable[[SelfT], RetT]] = {}

    def _add_handler(value: ValT, addcall: Callable[[SelfT], RetT]) -> None:
        if value in _handlers:
            raise RuntimeError(f'Duplicate handlers added for {value}')
        _handlers[value] = addcall

    def _register(value: ValT) -> Callable[[Callable[[SelfT], RetT]], None]:
        from functools import partial

        return partial(_add_handler, value)

    def _call_wrapper(self: SelfT, value: ValT) -> RetT:
        handler = _handlers.get(value)
        if handler is not None:
            return handler(self)
        return _base_call(self, value)

    # We still want to use our returned object to register handlers, but we're
    # actually just returning a function. So manually stuff the call onto it.
    setattr(_call_wrapper, 'register', _register)

    # To the type checker's eyes we return a ValueDispatchMethod instance;
    # this lets it know about our register func and type-check its usage.
    # In reality we just return a raw function call (for reasons listed above).
    # pylint: disable=undefined-variable, no-else-return
    if TYPE_CHECKING:
        return ValueDispatcherMethod[ValT, RetT]()
    else:
        return _call_wrapper
Like valuedispatch but works with methods instead of functions.
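A sketch with a hypothetical class and enum; handlers receive only self, and unregistered values hit the base method:

from enum import Enum
from efro.util import valuedispatchmethod

class Mode(Enum):
    EASY = 0
    HARD = 1

class Game:
    @valuedispatchmethod
    def loot_count(self, mode: Mode) -> int:
        return 1

    @loot_count.register(Mode.HARD)
    def _loot_count_hard(self) -> int:
        return 5

game = Game()
assert game.loot_count(Mode.EASY) == 1
assert game.loot_count(Mode.HARD) == 5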
def make_hash(obj: Any) -> int:
    """Makes a hash from a dictionary, list, tuple or set to any level,
    that contains only other hashable types (including any lists, tuples,
    sets, and dictionaries).

    Note that this uses Python's hash() function internally so collisions/etc.
    may be more common than with fancy cryptographic hashes.

    Also be aware that Python's hash() output varies across processes, so
    this should only be used for values that will remain in a single process.
    """
    import copy

    if isinstance(obj, (set, tuple, list)):
        return hash(tuple(make_hash(e) for e in obj))
    if not isinstance(obj, dict):
        return hash(obj)

    new_obj = copy.deepcopy(obj)
    for k, v in new_obj.items():
        new_obj[k] = make_hash(v)

    # NOTE: the sorted() here works correctly because it compares only
    # unique first values (i.e. dict keys).
    return hash(tuple(frozenset(sorted(new_obj.items()))))
Makes a hash from a dictionary, list, tuple, or set (nested to any level) that contains only other hashable types (including any lists, tuples, sets, and dictionaries).
Note that this uses Python's hash() function internally so collisions/etc. may be more common than with fancy cryptographic hashes.
Also be aware that Python's hash() output varies across processes, so this should only be used for values that will remain in a single process.
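For example, two equal nested structures hash equally regardless of dict key order (sketch; the values themselves are process-specific):

from efro.util import make_hash

config_a = {'name': 'foo', 'tags': ['a', 'b'], 'limits': {'max': 10}}
config_b = {'limits': {'max': 10}, 'tags': ['a', 'b'], 'name': 'foo'}

assert make_hash(config_a) == make_hash(config_b)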
def float_hash_from_string(s: str) -> float:
    """Given a string value, returns a float between 0 and 1.

    Is consistent across processes. Can be useful for assigning shard
    values to db ids for efficient parallel processing.
    """
    import hashlib

    hash_bytes = hashlib.md5(s.encode()).digest()

    # Generate a random 64 bit int from hash digest bytes.
    ival = int.from_bytes(hash_bytes[:8])
    return ival / ((1 << 64) - 1)
Given a string value, returns a float between 0 and 1.
Is consistent across processes. Can be useful for assigning shard values to db ids for efficient parallel processing.
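A sketch of bucketing ids by their stable hash value (the id string is hypothetical):

from efro.util import float_hash_from_string

# Stable across runs and processes (unlike the built-in hash()).
val = float_hash_from_string('user-12345')
assert 0.0 <= val <= 1.0

# e.g. route roughly 10% of ids to an experimental code path:
if val < 0.1:
    print('in experiment bucket')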
def asserttype(obj: Any, typ: type[T]) -> T:
    """Return an object typed as a given type.

    Assert is used to check its actual type, so only use this when
    failures are not expected. Otherwise use checktype.
    """
    assert isinstance(typ, type), 'only actual types accepted'
    assert isinstance(obj, typ)
    return obj
Return an object typed as a given type.
Assert is used to check its actual type, so only use this when failures are not expected. Otherwise use checktype.
def asserttype_o(obj: Any, typ: type[T]) -> T | None:
    """Return an object typed as a given optional type.

    Assert is used to check its actual type, so only use this when
    failures are not expected. Otherwise use checktype.
    """
    assert isinstance(typ, type), 'only actual types accepted'
    assert isinstance(obj, (typ, type(None)))
    return obj
Return an object typed as a given optional type.
Assert is used to check its actual type, so only use this when failures are not expected. Otherwise use checktype.
def checktype(obj: Any, typ: type[T]) -> T:
    """Return an object typed as a given type.

    Always checks the type at runtime with isinstance and throws a TypeError
    on failure. Use asserttype for more efficient (but less safe) equivalent.
    """
    assert isinstance(typ, type), 'only actual types accepted'
    if not isinstance(obj, typ):
        raise TypeError(f'Expected a {typ}; got a {type(obj)}.')
    return obj
Return an object typed as a given type.
Always checks the type at runtime with isinstance and throws a TypeError on failure. Use asserttype for a more efficient (but less safe) equivalent.
def checktype_o(obj: Any, typ: type[T]) -> T | None:
    """Return an object typed as a given optional type.

    Always checks the type at runtime with isinstance and throws a TypeError
    on failure. Use asserttype for more efficient (but less safe) equivalent.
    """
    assert isinstance(typ, type), 'only actual types accepted'
    if not isinstance(obj, (typ, type(None))):
        raise TypeError(f'Expected a {typ} or None; got a {type(obj)}.')
    return obj
Return an object typed as a given optional type.
Always checks the type at runtime with isinstance and throws a TypeError on failure. Use asserttype for a more efficient (but less safe) equivalent.
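A sketch contrasting the check/assert flavors (lookup is a hypothetical untyped call):

from efro.util import asserttype, checktype, checktype_o

def lookup(key: str) -> object:
    """Hypothetical call returning an untyped value."""
    return {'port': 8080}.get(key)

# checktype: full runtime check; raises TypeError on mismatch.
port: int = checktype(lookup('port'), int)

# checktype_o: same, but None is allowed through.
maybe: int | None = checktype_o(lookup('missing'), int)
assert maybe is None

# asserttype: cheaper; the isinstance check disappears under -O.
port2: int = asserttype(lookup('port'), int)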
def warntype(obj: Any, typ: type[T]) -> T:
    """Return an object typed as a given type.

    Always checks the type at runtime and simply logs a warning if it is
    not what is expected.
    """
    assert isinstance(typ, type), 'only actual types accepted'
    if not isinstance(obj, typ):
        import logging

        logging.warning('warntype: expected a %s, got a %s', typ, type(obj))
    return obj  # type: ignore
Return an object typed as a given type.
Always checks the type at runtime and simply logs a warning if it is not what is expected.
def warntype_o(obj: Any, typ: type[T]) -> T | None:
    """Return an object typed as a given type.

    Always checks the type at runtime and simply logs a warning if it is
    not what is expected.
    """
    assert isinstance(typ, type), 'only actual types accepted'
    if not isinstance(obj, (typ, type(None))):
        import logging

        logging.warning(
            'warntype: expected a %s or None, got a %s', typ, type(obj)
        )
    return obj  # type: ignore
Return an object typed as a given optional type.
Always checks the type at runtime and simply logs a warning if it is not what is expected.
def assert_non_optional(obj: T | None) -> T:
    """Return an object with Optional typing removed.

    Assert is used to check its actual type, so only use this when
    failures are not expected. Use check_non_optional otherwise.
    """
    assert obj is not None
    return obj
Return an object with Optional typing removed.
Assert is used to check its actual type, so only use this when failures are not expected. Use check_non_optional otherwise.
def check_non_optional(obj: T | None) -> T:
    """Return an object with Optional typing removed.

    Always checks the actual type and throws a ValueError on failure.
    Use assert_non_optional for a more efficient (but less safe) equivalent.
    """
    if obj is None:
        raise ValueError('Got None value in check_non_optional.')
    return obj
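For illustration, a small hypothetical usage of the non-optional helpers:

    from efro.util import assert_non_optional, check_non_optional

    lookup = {'a': 1}

    val = check_non_optional(lookup.get('a'))  # 1; ValueError if it were None.
    val2 = assert_non_optional(lookup.get('a'))  # Same, checked only via assert.
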
def smoothstep(edge0: float, edge1: float, x: float) -> float:
    """A smooth transition function.

    Returns a value that smoothly moves from 0 to 1 as we go between
    edges. Values outside of the range return 0 or 1.
    """
    y = min(1.0, max(0.0, (x - edge0) / (edge1 - edge0)))
    return y * y * (3.0 - 2.0 * y)

def linearstep(edge0: float, edge1: float, x: float) -> float:
    """A linear transition function.

    Returns a value that linearly moves from 0 to 1 as we go between
    edges. Values outside of the range return 0 or 1.
    """
    return max(0.0, min(1.0, (x - edge0) / (edge1 - edge0)))

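A few sample evaluations of the two transition functions (values follow directly from the formulas above):

    from efro.util import linearstep, smoothstep

    linearstep(0.0, 1.0, 0.25)  # -> 0.25 (straight ramp)
    smoothstep(0.0, 1.0, 0.25)  # -> 0.15625 (eased ramp)
    smoothstep(0.0, 1.0, 0.5)   # -> 0.5
    smoothstep(0.0, 1.0, 1.5)   # -> 1.0 (clamped outside the edges)
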
def human_readable_compact_id(num: int) -> str:
    """Given a positive int, return a compact string representation for it.

    Handy for visualizing unique numeric ids using as few as possible
    chars. This representation uses only lowercase letters and numbers
    (minus the following letters for readability):
    's' is excluded due to similarity to '5'.
    'l' is excluded due to similarity to '1'.
    'i' is excluded due to similarity to '1'.
    'o' is excluded due to similarity to '0'.
    'z' is excluded due to similarity to '2'.

    This leaves an alphabet of 31 characters (10 digits plus 21
    letters), so n chars can store 31^n distinct values.

    When reading human input consisting of these IDs, it may be
    desirable to map the disallowed chars to their corresponding
    allowed ones ('o' -> '0', etc).

    Sort order for these ids is the same as the original numbers.

    If more compactness is desired at the expense of readability,
    use compact_id() instead.
    """
    return _compact_id(num, '0123456789abcdefghjkmnpqrtuvwxy')

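The suggested mapping of disallowed look-alike characters when reading human input could be done along these lines; this helper is purely illustrative and not part of the module:

    # Map excluded look-alike chars to their allowed equivalents.
    _ID_FIXUPS = str.maketrans({'s': '5', 'l': '1', 'i': '1', 'o': '0', 'z': '2'})

    def normalize_human_readable_compact_id(val: str) -> str:
        """Normalize human-typed input before looking an id up."""
        return val.lower().translate(_ID_FIXUPS)

    normalize_human_readable_compact_id('hOwdy')  # -> 'h0wdy'
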
def compact_id(num: int) -> str:
    """Given a positive int, return a compact string representation for it.

    Handy for visualizing unique numeric ids using as few as possible
    chars. This version is more compact than human_readable_compact_id()
    but less friendly to humans due to using both capital and lowercase
    letters, both 'O' and '0', etc.

    Therefore for n chars this can store values of 62^n.

    Sort order for these ids is the same as the original numbers.
    """
    return _compact_id(
        num, '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'
    )

def caller_source_location() -> str:
    """Returns source file name and line of the code calling us.

    Example: 'mymodule.py:23'
    """
    try:
        import inspect

        frame = inspect.currentframe()
        for _i in range(2):
            if frame is None:
                raise RuntimeError()
            frame = frame.f_back
        if frame is None:
            raise RuntimeError()
        fname = os.path.basename(frame.f_code.co_filename)
        return f'{fname}:{frame.f_lineno}'
    except Exception:
        return '<unknown source location>'

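One plausible use for this is tagging debug output with its call site; the helper below is illustrative only:

    from efro.util import caller_source_location

    def debug_print(msg: str) -> None:
        # Prefix messages with the file:line of whoever called debug_print().
        print(f'[{caller_source_location()}] {msg}')
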
def unchanging_hostname() -> str:
    """Return an unchanging name for the local device.

    Similar to the `hostname` call (or os.uname().nodename in Python)
    except attempts to give a name that doesn't change depending on
    network conditions. (A Mac will tend to go from Foo to Foo.local,
    Foo.lan etc. throughout its various adventures)
    """
    import platform
    import subprocess

    # On Mac, this should give the computer name assigned in System Prefs.
    if platform.system() == 'Darwin':
        return (
            subprocess.run(
                ['scutil', '--get', 'ComputerName'],
                check=True,
                capture_output=True,
            )
            .stdout.decode()
            .strip()
            .replace(' ', '-')
        )
    return os.uname().nodename

def set_canonical_module_names(module_globals: dict[str, Any]) -> None:
    """Set the __module__ attr of a module's public objects to that module.

    Useful in a package's __init__.py that re-exports things defined in
    private submodules; after this call they present themselves as
    living in the package itself (ourpackage.Foo instead of
    ourpackage._submodule.Foo). Can be disabled by setting the
    EFRO_SUPPRESS_SET_CANONICAL_MODULE_NAMES env var to '1'.
    """
    if os.environ.get('EFRO_SUPPRESS_SET_CANONICAL_MODULE_NAMES') == '1':
        return

    modulename = module_globals.get('__name__')
    if not isinstance(modulename, str):
        raise RuntimeError('Unable to get module name.')
    assert not modulename.startswith('_')
    modulename_prefix = f'{modulename}.'
    modulename_prefix_2 = f'_{modulename}.'

    for name, obj in module_globals.items():
        if name.startswith('_'):
            continue
        existing = getattr(obj, '__module__', None)
        try:
            # Override the module ONLY if it lives under us somewhere.
            # So ourpackage._submodule.Foo becomes ourpackage.Foo
            # but otherpackage._submodule.Foo remains untouched.
            if existing is not None and (
                existing.startswith(modulename_prefix)
                or existing.startswith(modulename_prefix_2)
            ):
                obj.__module__ = modulename
        except Exception:
            import logging

            logging.warning(
                'set_canonical_module_names: unable to change __module__'
                " from '%s' to '%s' on %s object at '%s'.",
                existing,
                modulename,
                type(obj),
                name,
            )

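This would typically be called at the bottom of a package's __init__.py that re-exports names from private submodules; the package and class names below are hypothetical:

    # mypackage/__init__.py
    from efro.util import set_canonical_module_names

    from mypackage._internal import Widget

    set_canonical_module_names(globals())

    # Widget.__module__ now reads 'mypackage' instead of 'mypackage._internal'.
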
def timedelta_str(
    timeval: datetime.timedelta | float, maxparts: int = 2, decimals: int = 0
) -> str:
    """Return a simple human readable time string for a length of time.

    Time can be given as a timedelta or a float representing seconds.
    Example output:
      "23d 1h 2m 32s" (with maxparts == 4)
      "23d 1h" (with maxparts == 2)
      "23d 1.04h" (with maxparts == 2 and decimals == 2)

    Note that this is hard-coded in English and probably not especially
    performant.
    """
    # pylint: disable=too-many-locals

    if isinstance(timeval, float):
        timevalfin = datetime.timedelta(seconds=timeval)
    else:
        timevalfin = timeval

    # Internally we only handle positive values.
    if timevalfin.total_seconds() < 0:
        # (Pass decimals through so negative values format the same way.)
        val = timedelta_str(
            timeval=-timeval, maxparts=maxparts, decimals=decimals
        )
        return f'-{val}'

    years = timevalfin.days // 365
    days = timevalfin.days % 365
    hours = timevalfin.seconds // 3600
    hour_remainder = timevalfin.seconds % 3600
    minutes = hour_remainder // 60
    seconds = hour_remainder % 60

    # Now, if we want decimal places for our last value,
    # calc fractional parts.
    if decimals:
        # Calc totals of each type.
        t_seconds = timevalfin.total_seconds()
        t_minutes = t_seconds / 60
        t_hours = t_minutes / 60
        t_days = t_hours / 24
        t_years = t_days / 365

        # Calc fractional parts that exclude all whole values to their left.
        years_covered = years
        years_f = t_years - years_covered
        days_covered = years_covered * 365 + days
        days_f = t_days - days_covered
        hours_covered = days_covered * 24 + hours
        hours_f = t_hours - hours_covered
        minutes_covered = hours_covered * 60 + minutes
        minutes_f = t_minutes - minutes_covered
        seconds_covered = minutes_covered * 60 + seconds
        seconds_f = t_seconds - seconds_covered
    else:
        years_f = days_f = hours_f = minutes_f = seconds_f = 0.0

    parts: list[str] = []
    for part, part_f, suffix in (
        (years, years_f, 'y'),
        (days, days_f, 'd'),
        (hours, hours_f, 'h'),
        (minutes, minutes_f, 'm'),
        (seconds, seconds_f, 's'),
    ):
        if part or parts or (not parts and suffix == 's'):
            # Do the decimal version only for the last part.
            if decimals and (len(parts) >= maxparts - 1 or suffix == 's'):
                parts.append(f'{part + part_f:.{decimals}f}{suffix}')
            else:
                parts.append(f'{part}{suffix}')
            if len(parts) >= maxparts:
                break
    return ' '.join(parts)

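The docstring examples, expressed as concrete calls (the input is chosen to match; outputs follow from the logic above):

    import datetime
    from efro.util import timedelta_str

    delta = datetime.timedelta(days=23, hours=1, minutes=2, seconds=32)

    timedelta_str(delta, maxparts=4)              # -> '23d 1h 2m 32s'
    timedelta_str(delta, maxparts=2)              # -> '23d 1h'
    timedelta_str(delta, maxparts=2, decimals=2)  # -> '23d 1.04h'
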
def ago_str(
    timeval: datetime.datetime,
    maxparts: int = 1,
    now: datetime.datetime | None = None,
    decimals: int = 0,
) -> str:
    """Given a datetime, return a clean human readable 'ago' str.

    Note that this is hard-coded in English so should not be used
    for visible in-game elements; only tools/etc.

    If now is not passed, efro.util.utc_now() is used.
    """
    if now is None:
        now = utc_now()
    return (
        timedelta_str(now - timeval, maxparts=maxparts, decimals=decimals)
        + ' ago'
    )

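A quick hypothetical example (the exact output depends on the moment it runs, but roughly):

    import datetime
    from efro.util import ago_str, utc_now

    then = utc_now() - datetime.timedelta(hours=3, minutes=12)

    ago_str(then)              # -> '3h ago'
    ago_str(then, maxparts=2)  # -> '3h 12m ago'
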
def split_list(input_list: list[T], max_length: int) -> list[list[T]]:
    """Split a single list into smaller lists."""
    return [
        input_list[i : i + max_length]
        for i in range(0, len(input_list), max_length)
    ]

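For example:

    from efro.util import split_list

    split_list([1, 2, 3, 4, 5], max_length=2)  # -> [[1, 2], [3, 4], [5]]
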
def extract_flag(args: list[str], name: str) -> bool:
    """Given a list of args and a flag name, returns whether it is present.

    The arg flag, if present, is removed from the arg list.
    """
    from efro.error import CleanError

    count = args.count(name)
    if count > 1:
        raise CleanError(f'Flag {name} passed multiple times.')
    if not count:
        return False
    args.remove(name)
    return True

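A small usage sketch (the flag names are hypothetical):

    from efro.util import extract_flag

    args = ['--verbose', 'build', '--clean']

    extract_flag(args, '--verbose')  # -> True; args is now ['build', '--clean']
    extract_flag(args, '--debug')    # -> False; args is unchanged
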
def extract_arg(
    args: list[str], name: str, required: bool = False
) -> str | None:
    """Given a list of args and an arg name, returns a value.

    The arg flag and value are removed from the arg list.
    Raises CleanError on any problems.
    """
    from efro.error import CleanError

    count = args.count(name)
    if not count:
        if required:
            raise CleanError(f'Required argument {name} not passed.')
        return None

    if count > 1:
        raise CleanError(f'Arg {name} passed multiple times.')

    argindex = args.index(name)
    if argindex + 1 >= len(args):
        raise CleanError(f'No value passed after {name} arg.')

    val = args[argindex + 1]
    del args[argindex : argindex + 2]

    return val

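And likewise for named args with values (names and values here are hypothetical):

    from efro.util import extract_arg

    args = ['build', '--target', 'mac', '--fast']

    extract_arg(args, '--target')              # -> 'mac'; args: ['build', '--fast']
    extract_arg(args, '--out')                 # -> None (not present)
    extract_arg(args, '--out', required=True)  # Raises CleanError.
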