efro.util
Small handy bits of functionality.
# Released under the MIT License. See LICENSE for details.
#
"""Small handy bits of functionality."""

from __future__ import annotations

import os
import time
import weakref
import datetime
from enum import Enum
from typing import TYPE_CHECKING, cast, TypeVar, Generic, overload

if TYPE_CHECKING:
    import asyncio
    from typing import Any, Callable, Literal

T = TypeVar('T')
ValT = TypeVar('ValT')
ArgT = TypeVar('ArgT')
SelfT = TypeVar('SelfT')
RetT = TypeVar('RetT')
EnumT = TypeVar('EnumT', bound=Enum)


class _EmptyObj:
    pass


# A dead weak-ref should be immutable, right? So we can create exactly
# one and return it for all cases that need an empty weak-ref.
_g_empty_weak_ref = weakref.ref(_EmptyObj())
assert _g_empty_weak_ref() is None


def explicit_bool(val: bool) -> bool:
    """Return a non-inferable boolean value.

    Useful to be able to disable blocks of code without type checkers
    complaining/etc.
    """
    # pylint: disable=no-else-return
    if TYPE_CHECKING:
        # infer this! <boom>
        import random

        return random.random() < 0.5
    else:
        return val


def snake_case_to_title(val: str) -> str:
    """Given a snake-case string 'foo_bar', returns 'Foo Bar'."""
    # Kill empty words resulting from leading/trailing/multiple underscores.
    return ' '.join(w for w in val.split('_') if w).title()


def snake_case_to_camel_case(val: str) -> str:
    """Given a snake-case string 'foo_bar', returns camel-case 'FooBar'."""
    # Replace underscores with spaces; capitalize words; kill spaces.
    # Not sure about efficiency, but logically simple.
    return val.replace('_', ' ').title().replace(' ', '')


def check_utc(value: datetime.datetime) -> None:
    """Ensure a datetime value is timezone-aware utc."""
    if value.tzinfo is not datetime.UTC:
        raise ValueError(
            'datetime value does not have timezone set as datetime.UTC'
        )


def utc_now() -> datetime.datetime:
    """Get timezone-aware current utc time.

    Just a shortcut for datetime.datetime.now(datetime.UTC).
    Avoid datetime.datetime.utcnow() which is deprecated and gives naive
    times.
    """
    return datetime.datetime.now(datetime.UTC)


def utc_now_naive() -> datetime.datetime:
    """Get naive utc time.

    This can be used to replace datetime.utcnow(), which is now deprecated.
    Most all code should migrate to use timezone-aware times instead of
    this.
    """
    return datetime.datetime.now(datetime.UTC).replace(tzinfo=None)


def utc_today() -> datetime.datetime:
    """Get offset-aware midnight in the utc time zone."""
    now = datetime.datetime.now(datetime.UTC)
    return datetime.datetime(
        year=now.year, month=now.month, day=now.day, tzinfo=now.tzinfo
    )


def utc_this_hour() -> datetime.datetime:
    """Get offset-aware beginning of the current hour in the utc time zone."""
    now = datetime.datetime.now(datetime.UTC)
    return datetime.datetime(
        year=now.year,
        month=now.month,
        day=now.day,
        hour=now.hour,
        tzinfo=now.tzinfo,
    )


def utc_this_minute() -> datetime.datetime:
    """Get offset-aware beginning of current minute in the utc time zone."""
    now = datetime.datetime.now(datetime.UTC)
    return datetime.datetime(
        year=now.year,
        month=now.month,
        day=now.day,
        hour=now.hour,
        minute=now.minute,
        tzinfo=now.tzinfo,
    )


def empty_weakref(objtype: type[T]) -> weakref.ref[T]:
    """Return an invalidated weak-reference for the specified type."""
    # At runtime, all weakrefs are the same; our type arg is just
    # for the static type checker.
    del objtype  # Unused.

    # Just create an object and let it die. Is there a cleaner way to do this?
    # return weakref.ref(_EmptyObj())  # type: ignore

    # Sharing a single one seems at least a bit better.
    return _g_empty_weak_ref  # type: ignore


def data_size_str(bytecount: int, compact: bool = False) -> str:
    """Given a size in bytes, returns a short human readable string.

    In compact mode this should be 6 or fewer chars for most all
    sane file sizes.
    """
    # pylint: disable=too-many-return-statements

    # Special case: handle negatives.
    if bytecount < 0:
        val = data_size_str(-bytecount, compact=compact)
        return f'-{val}'

    if bytecount <= 999:
        suffix = 'B' if compact else 'bytes'
        return f'{bytecount} {suffix}'
    kbytecount = bytecount / 1024
    if round(kbytecount, 1) < 10.0:
        return f'{kbytecount:.1f} KB'
    if round(kbytecount, 0) < 999:
        return f'{kbytecount:.0f} KB'
    mbytecount = bytecount / (1024 * 1024)
    if round(mbytecount, 1) < 10.0:
        return f'{mbytecount:.1f} MB'
    if round(mbytecount, 0) < 999:
        return f'{mbytecount:.0f} MB'
    gbytecount = bytecount / (1024 * 1024 * 1024)
    if round(gbytecount, 1) < 10.0:
        return f'{gbytecount:.1f} GB'
    return f'{gbytecount:.0f} GB'


class DirtyBit:
    """Manages whether a thing is dirty and regulates attempts to clean it.

    To use, simply set the 'dirty' value on this object to True when some
    action is needed, and then check the 'should_update' value to regulate
    when attempts to clean it should be made. Set 'dirty' back to False after
    a successful update.
    If 'use_lock' is True, an asyncio Lock will be created and incorporated
    into update attempts to prevent simultaneous updates (should_update will
    only return True when the lock is unlocked). Note that it is up to the user
    to lock/unlock the lock during the actual update attempt.
    If a value is passed for 'auto_dirty_seconds', the dirtybit will flip
    itself back to dirty after being clean for the given amount of time.
    'min_update_interval' can be used to enforce a minimum update
    interval even when updates are successful (retry_interval only applies
    when updates fail).
    """

    def __init__(
        self,
        dirty: bool = False,
        retry_interval: float = 5.0,
        *,
        use_lock: bool = False,
        auto_dirty_seconds: float | None = None,
        min_update_interval: float | None = None,
    ):
        curtime = time.monotonic()
        self._retry_interval = retry_interval
        self._auto_dirty_seconds = auto_dirty_seconds
        self._min_update_interval = min_update_interval
        self._dirty = dirty
        self._next_update_time: float | None = curtime if dirty else None
        self._last_update_time: float | None = None
        self._next_auto_dirty_time: float | None = (
            (curtime + self._auto_dirty_seconds)
            if (not dirty and self._auto_dirty_seconds is not None)
            else None
        )
        self._use_lock = use_lock
        self.lock: asyncio.Lock
        if self._use_lock:
            import asyncio

            self.lock = asyncio.Lock()

    @property
    def dirty(self) -> bool:
        """Whether the target is currently dirty.

        This should be set to False once an update is successful.
        """
        return self._dirty

    @dirty.setter
    def dirty(self, value: bool) -> None:
        # If we're freshly clean, set our next auto-dirty time (if we have
        # one).
        if self._dirty and not value and self._auto_dirty_seconds is not None:
            self._next_auto_dirty_time = (
                time.monotonic() + self._auto_dirty_seconds
            )

        # If we're freshly dirty, schedule an immediate update.
        if not self._dirty and value:
            self._next_update_time = time.monotonic()

            # If they want to enforce a minimum update interval,
            # push out the next update time if it hasn't been long enough.
            if (
                self._min_update_interval is not None
                and self._last_update_time is not None
            ):
                self._next_update_time = max(
                    self._next_update_time,
                    self._last_update_time + self._min_update_interval,
                )

        self._dirty = value

    @property
    def should_update(self) -> bool:
        """Whether an attempt should be made to clean the target now.

        Always returns False if the target is not dirty.
        Takes into account the amount of time passed since the target
        was marked dirty or since should_update last returned True.
        """
        curtime = time.monotonic()

        # Auto-dirty ourself if we're into that.
        if (
            self._next_auto_dirty_time is not None
            and curtime > self._next_auto_dirty_time
        ):
            self.dirty = True
            self._next_auto_dirty_time = None
        if not self._dirty:
            return False
        if self._use_lock and self.lock.locked():
            return False
        assert self._next_update_time is not None
        if curtime > self._next_update_time:
            self._next_update_time = curtime + self._retry_interval
            self._last_update_time = curtime
            return True
        return False


class DispatchMethodWrapper(Generic[ArgT, RetT]):
    """Type-aware standin for the dispatch func returned by dispatchmethod."""

    def __call__(self, arg: ArgT) -> RetT:
        raise RuntimeError('Should not get here')

    @staticmethod
    def register(
        func: Callable[[Any, Any], RetT]
    ) -> Callable[[Any, Any], RetT]:
        """Register a new dispatch handler for this dispatch-method."""
        raise RuntimeError('Should not get here')

    registry: dict[Any, Callable]


# noinspection PyProtectedMember,PyTypeHints
def dispatchmethod(
    func: Callable[[Any, ArgT], RetT]
) -> DispatchMethodWrapper[ArgT, RetT]:
    """A variation of functools.singledispatch for methods.

    Note: as of Python 3.9 there is now functools.singledispatchmethod,
    but it currently (as of Jan 2021) is not type-aware (at least in mypy),
    which gives us a reason to keep this one around for now.
    """
    from functools import singledispatch, update_wrapper

    origwrapper: Any = singledispatch(func)

    # Pull this out so hopefully origwrapper can die,
    # otherwise we reference origwrapper in our wrapper.
    dispatch = origwrapper.dispatch

    # All we do here is recreate the end of functools.singledispatch
    # where it returns a wrapper except instead of the wrapper using the
    # first arg to the function ours uses the second (to skip 'self').
    # This was made against Python 3.7; we should probably check up on
    # this in later versions in case anything has changed.
    # (or hopefully they'll add this functionality to their version)
    # NOTE: sounds like we can use functools singledispatchmethod in 3.8
    def wrapper(*args: Any, **kw: Any) -> Any:
        if not args or len(args) < 2:
            raise TypeError(
                f'{funcname} requires at least ' '2 positional arguments'
            )

        return dispatch(args[1].__class__)(*args, **kw)

    funcname = getattr(func, '__name__', 'dispatchmethod method')
    wrapper.register = origwrapper.register  # type: ignore
    wrapper.dispatch = dispatch  # type: ignore
    wrapper.registry = origwrapper.registry  # type: ignore
    # pylint: disable=protected-access
    wrapper._clear_cache = origwrapper._clear_cache  # type: ignore
    update_wrapper(wrapper, func)
    # pylint: enable=protected-access
    return cast(DispatchMethodWrapper, wrapper)


def valuedispatch(call: Callable[[ValT], RetT]) -> ValueDispatcher[ValT, RetT]:
    """Decorator for functions to allow dispatching based on a value.

    This differs from functools.singledispatch in that it dispatches based
    on the value of an argument, not based on its type.
    The 'register' method of a value-dispatch function can be used
    to assign new functions to handle particular values.
    Unhandled values wind up in the original dispatch function."""
    return ValueDispatcher(call)


class ValueDispatcher(Generic[ValT, RetT]):
    """Used by the valuedispatch decorator"""

    def __init__(self, call: Callable[[ValT], RetT]) -> None:
        self._base_call = call
        self._handlers: dict[ValT, Callable[[], RetT]] = {}

    def __call__(self, value: ValT) -> RetT:
        handler = self._handlers.get(value)
        if handler is not None:
            return handler()
        return self._base_call(value)

    def _add_handler(
        self, value: ValT, call: Callable[[], RetT]
    ) -> Callable[[], RetT]:
        if value in self._handlers:
            raise RuntimeError(f'Duplicate handlers added for {value}')
        self._handlers[value] = call
        return call

    def register(
        self, value: ValT
    ) -> Callable[[Callable[[], RetT]], Callable[[], RetT]]:
        """Add a handler to the dispatcher."""
        from functools import partial

        return partial(self._add_handler, value)


def valuedispatch1arg(
    call: Callable[[ValT, ArgT], RetT]
) -> ValueDispatcher1Arg[ValT, ArgT, RetT]:
    """Like valuedispatch but for functions taking an extra argument."""
    return ValueDispatcher1Arg(call)


class ValueDispatcher1Arg(Generic[ValT, ArgT, RetT]):
    """Used by the valuedispatch1arg decorator"""

    def __init__(self, call: Callable[[ValT, ArgT], RetT]) -> None:
        self._base_call = call
        self._handlers: dict[ValT, Callable[[ArgT], RetT]] = {}

    def __call__(self, value: ValT, arg: ArgT) -> RetT:
        handler = self._handlers.get(value)
        if handler is not None:
            return handler(arg)
        return self._base_call(value, arg)

    def _add_handler(
        self, value: ValT, call: Callable[[ArgT], RetT]
    ) -> Callable[[ArgT], RetT]:
        if value in self._handlers:
            raise RuntimeError(f'Duplicate handlers added for {value}')
        self._handlers[value] = call
        return call

    def register(
        self, value: ValT
    ) -> Callable[[Callable[[ArgT], RetT]], Callable[[ArgT], RetT]]:
        """Add a handler to the dispatcher."""
        from functools import partial

        return partial(self._add_handler, value)


if TYPE_CHECKING:

    class ValueDispatcherMethod(Generic[ValT, RetT]):
        """Used by the valuedispatchmethod decorator."""

        def __call__(self, value: ValT) -> RetT: ...

        def register(
            self, value: ValT
        ) -> Callable[[Callable[[SelfT], RetT]], Callable[[SelfT], RetT]]:
            """Add a handler to the dispatcher."""
            ...


def valuedispatchmethod(
    call: Callable[[SelfT, ValT], RetT]
) -> ValueDispatcherMethod[ValT, RetT]:
    """Like valuedispatch but works with methods instead of functions."""

    # NOTE: It seems that to wrap a method with a decorator and have self
    # dispatching do the right thing, we must return a function and not
    # an executable object. So for this version we store our data here
    # in the function call dict and simply return a call.

    _base_call = call
    _handlers: dict[ValT, Callable[[SelfT], RetT]] = {}

    def _add_handler(value: ValT, addcall: Callable[[SelfT], RetT]) -> None:
        if value in _handlers:
            raise RuntimeError(f'Duplicate handlers added for {value}')
        _handlers[value] = addcall

    def _register(value: ValT) -> Callable[[Callable[[SelfT], RetT]], None]:
        from functools import partial

        return partial(_add_handler, value)

    def _call_wrapper(self: SelfT, value: ValT) -> RetT:
        handler = _handlers.get(value)
        if handler is not None:
            return handler(self)
        return _base_call(self, value)

    # We still want to use our returned object to register handlers, but we're
    # actually just returning a function. So manually stuff the call onto it.
    setattr(_call_wrapper, 'register', _register)

    # To the type checker's eyes we return a ValueDispatchMethod instance;
    # this lets it know about our register func and type-check its usage.
    # In reality we just return a raw function call (for reasons listed above).
    # pylint: disable=undefined-variable, no-else-return
    if TYPE_CHECKING:
        return ValueDispatcherMethod[ValT, RetT]()
    else:
        return _call_wrapper


def make_hash(obj: Any) -> int:
    """Makes a hash from a dictionary, list, tuple, or set, nested to any
    level, that contains only other hashable types (including any lists,
    tuples, sets, and dictionaries).

    Note that this uses Python's hash() function internally so collisions/etc.
    may be more common than with fancy cryptographic hashes.

    Also be aware that Python's hash() output varies across processes, so
    this should only be used for values that will remain in a single process.
    """
    import copy

    if isinstance(obj, (set, tuple, list)):
        return hash(tuple(make_hash(e) for e in obj))
    if not isinstance(obj, dict):
        return hash(obj)

    new_obj = copy.deepcopy(obj)
    for k, v in new_obj.items():
        new_obj[k] = make_hash(v)

    # NOTE: the sort here works correctly because it compares only
    # unique first values (i.e. dict keys).
    return hash(tuple(frozenset(sorted(new_obj.items()))))


def float_hash_from_string(s: str) -> float:
    """Given a string value, returns a float between 0 and 1.

    Is consistent across processes. Can be useful for assigning shard
    values to db ids for efficient parallel processing.
    """
    import hashlib

    hash_bytes = hashlib.md5(s.encode()).digest()

    # Generate a random 64 bit int from hash digest bytes.
    ival = int.from_bytes(hash_bytes[:8])
    return ival / ((1 << 64) - 1)


def asserttype(obj: Any, typ: type[T]) -> T:
    """Return an object typed as a given type.

    Assert is used to check its actual type, so only use this when
    failures are not expected. Otherwise use checktype.
    """
    assert isinstance(typ, type), 'only actual types accepted'
    assert isinstance(obj, typ)
    return obj


def asserttype_o(obj: Any, typ: type[T]) -> T | None:
    """Return an object typed as a given optional type.

    Assert is used to check its actual type, so only use this when
    failures are not expected. Otherwise use checktype.
    """
    assert isinstance(typ, type), 'only actual types accepted'
    assert isinstance(obj, (typ, type(None)))
    return obj


def checktype(obj: Any, typ: type[T]) -> T:
    """Return an object typed as a given type.

    Always checks the type at runtime with isinstance and throws a TypeError
    on failure. Use asserttype for a more efficient (but less safe)
    equivalent.
    """
    assert isinstance(typ, type), 'only actual types accepted'
    if not isinstance(obj, typ):
        raise TypeError(f'Expected a {typ}; got a {type(obj)}.')
    return obj


def checktype_o(obj: Any, typ: type[T]) -> T | None:
    """Return an object typed as a given optional type.

    Always checks the type at runtime with isinstance and throws a TypeError
    on failure. Use asserttype for a more efficient (but less safe)
    equivalent.
    """
    assert isinstance(typ, type), 'only actual types accepted'
    if not isinstance(obj, (typ, type(None))):
        raise TypeError(f'Expected a {typ} or None; got a {type(obj)}.')
    return obj


def warntype(obj: Any, typ: type[T]) -> T:
    """Return an object typed as a given type.

    Always checks the type at runtime and simply logs a warning if it is
    not what is expected.
    """
    assert isinstance(typ, type), 'only actual types accepted'
    if not isinstance(obj, typ):
        import logging

        logging.warning('warntype: expected a %s, got a %s', typ, type(obj))
    return obj  # type: ignore


def warntype_o(obj: Any, typ: type[T]) -> T | None:
    """Return an object typed as a given type.

    Always checks the type at runtime and simply logs a warning if it is
    not what is expected.
    """
    assert isinstance(typ, type), 'only actual types accepted'
    if not isinstance(obj, (typ, type(None))):
        import logging

        logging.warning(
            'warntype: expected a %s or None, got a %s', typ, type(obj)
        )
    return obj  # type: ignore


def assert_non_optional(obj: T | None) -> T:
    """Return an object with Optional typing removed.

    Assert is used to check its actual type, so only use this when
    failures are not expected. Use check_non_optional otherwise.
    """
    assert obj is not None
    return obj


def check_non_optional(obj: T | None) -> T:
    """Return an object with Optional typing removed.

    Always checks the actual type and throws a ValueError on failure.
    Use assert_non_optional for a more efficient (but less safe) equivalent.
    """
    if obj is None:
        raise ValueError('Got None value in check_non_optional.')
    return obj


def smoothstep(edge0: float, edge1: float, x: float) -> float:
    """A smooth transition function.

    Returns a value that smoothly moves from 0 to 1 as we go between edges.
    Values outside of the range return 0 or 1.
    """
    y = min(1.0, max(0.0, (x - edge0) / (edge1 - edge0)))
    return y * y * (3.0 - 2.0 * y)


def linearstep(edge0: float, edge1: float, x: float) -> float:
    """A linear transition function.

    Returns a value that linearly moves from 0 to 1 as we go between edges.
    Values outside of the range return 0 or 1.
    """
    return max(0.0, min(1.0, (x - edge0) / (edge1 - edge0)))


def _compact_id(num: int, chars: str) -> str:
    if num < 0:
        raise ValueError('Negative integers not allowed.')

    # Chars must be in sorted order for sorting to work correctly
    # on our output.
    assert ''.join(sorted(list(chars))) == chars

    base = len(chars)
    out = ''
    while num:
        out += chars[num % base]
        num //= base
    return out[::-1] or '0'


def human_readable_compact_id(num: int) -> str:
    """Given a positive int, return a compact string representation for it.

    Handy for visualizing unique numeric ids using as few as possible chars.
    This representation uses only lowercase letters and numbers (minus the
    following letters for readability):
    's' is excluded due to similarity to '5'.
    'l' is excluded due to similarity to '1'.
    'i' is excluded due to similarity to '1'.
    'o' is excluded due to similarity to '0'.
    'z' is excluded due to similarity to '2'.

    Therefore for n chars this can store values of 31^n.

    When reading human input consisting of these IDs, it may be desirable
    to map the disallowed chars to their corresponding allowed ones
    ('o' -> '0', etc).

    Sort order for these ids is the same as the original numbers.

    If more compactness is desired at the expense of readability,
    use compact_id() instead.
    """
    return _compact_id(num, '0123456789abcdefghjkmnpqrtuvwxy')


def compact_id(num: int) -> str:
    """Given a positive int, return a compact string representation for it.

    Handy for visualizing unique numeric ids using as few as possible chars.
    This version is more compact than human_readable_compact_id() but less
    friendly to humans due to using both capital and lowercase letters,
    both 'O' and '0', etc.

    Therefore for n chars this can store values of 62^n.

    Sort order for these ids is the same as the original numbers.
    """
    return _compact_id(
        num, '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'
    )


def caller_source_location() -> str:
    """Returns source file name and line of the code calling us.

    Example: 'mymodule.py:23'
    """
    try:
        import inspect

        frame = inspect.currentframe()
        for _i in range(2):
            if frame is None:
                raise RuntimeError()
            frame = frame.f_back
        if frame is None:
            raise RuntimeError()
        fname = os.path.basename(frame.f_code.co_filename)
        return f'{fname}:{frame.f_lineno}'
    except Exception:
        return '<unknown source location>'


def unchanging_hostname() -> str:
    """Return an unchanging name for the local device.

    Similar to the `hostname` call (or os.uname().nodename in Python)
    except attempts to give a name that doesn't change depending on
    network conditions. (A Mac will tend to go from Foo to Foo.local,
    Foo.lan etc. throughout its various adventures)
    """
    import platform
    import subprocess

    # On Mac, this should give the computer name assigned in System Prefs.
    if platform.system() == 'Darwin':
        return (
            subprocess.run(
                ['scutil', '--get', 'ComputerName'],
                check=True,
                capture_output=True,
            )
            .stdout.decode()
            .strip()
            .replace(' ', '-')
        )
    return os.uname().nodename


def set_canonical_module_names(module_globals: dict[str, Any]) -> None:
    """Set public objects' __module__ to the canonical module name.

    Useful so that objects defined in private submodules (e.g.
    ourpackage._submodule.Foo) present themselves as living in the
    public module (ourpackage.Foo).
    """
    if os.environ.get('EFRO_SUPPRESS_SET_CANONICAL_MODULE_NAMES') == '1':
        return

    modulename = module_globals.get('__name__')
    if not isinstance(modulename, str):
        raise RuntimeError('Unable to get module name.')
    assert not modulename.startswith('_')
    modulename_prefix = f'{modulename}.'
    modulename_prefix_2 = f'_{modulename}.'

    for name, obj in module_globals.items():
        if name.startswith('_'):
            continue
        existing = getattr(obj, '__module__', None)
        try:
            # Override the module ONLY if it lives under us somewhere.
            # So ourpackage._submodule.Foo becomes ourpackage.Foo
            # but otherpackage._submodule.Foo remains untouched.
            if existing is not None and (
                existing.startswith(modulename_prefix)
                or existing.startswith(modulename_prefix_2)
            ):
                obj.__module__ = modulename
        except Exception:
            import logging

            logging.warning(
                'set_canonical_module_names: unable to change __module__'
                " from '%s' to '%s' on %s object at '%s'.",
                existing,
                modulename,
                type(obj),
                name,
            )


def timedelta_str(
    timeval: datetime.timedelta | float, maxparts: int = 2, decimals: int = 0
) -> str:
    """Return a simple human readable time string for a length of time.

    Time can be given as a timedelta or a float representing seconds.
    Example output:
    "23d 1h 2m 32s" (with maxparts == 4)
    "23d 1h" (with maxparts == 2)
    "23d 1.08h" (with maxparts == 2 and decimals == 2)

    Note that this is hard-coded in English and probably not especially
    performant.
    """
    # pylint: disable=too-many-locals

    if isinstance(timeval, float):
        timevalfin = datetime.timedelta(seconds=timeval)
    else:
        timevalfin = timeval

    # Internally we only handle positive values.
    if timevalfin.total_seconds() < 0:
        return f'-{timedelta_str(timeval=-timeval, maxparts=maxparts)}'

    years = timevalfin.days // 365
    days = timevalfin.days % 365
    hours = timevalfin.seconds // 3600
    hour_remainder = timevalfin.seconds % 3600
    minutes = hour_remainder // 60
    seconds = hour_remainder % 60

    # Now, if we want decimal places for our last value,
    # calc fractional parts.
    if decimals:
        # Calc totals of each type.
        t_seconds = timevalfin.total_seconds()
        t_minutes = t_seconds / 60
        t_hours = t_minutes / 60
        t_days = t_hours / 24
        t_years = t_days / 365

        # Calc fractional parts that exclude all whole values to their left.
        years_covered = years
        years_f = t_years - years_covered
        days_covered = years_covered * 365 + days
        days_f = t_days - days_covered
        hours_covered = days_covered * 24 + hours
        hours_f = t_hours - hours_covered
        minutes_covered = hours_covered * 60 + minutes
        minutes_f = t_minutes - minutes_covered
        seconds_covered = minutes_covered * 60 + seconds
        seconds_f = t_seconds - seconds_covered
    else:
        years_f = days_f = hours_f = minutes_f = seconds_f = 0.0

    parts: list[str] = []
    for part, part_f, suffix in (
        (years, years_f, 'y'),
        (days, days_f, 'd'),
        (hours, hours_f, 'h'),
        (minutes, minutes_f, 'm'),
        (seconds, seconds_f, 's'),
    ):
        if part or parts or (not parts and suffix == 's'):
            # Do decimal version only for the last part.
            if decimals and (len(parts) >= maxparts - 1 or suffix == 's'):
                parts.append(f'{part+part_f:.{decimals}f}{suffix}')
            else:
                parts.append(f'{part}{suffix}')
            if len(parts) >= maxparts:
                break
    return ' '.join(parts)


def ago_str(
    timeval: datetime.datetime,
    maxparts: int = 1,
    now: datetime.datetime | None = None,
    decimals: int = 0,
) -> str:
    """Given a datetime, return a clean human readable 'ago' str.

    Note that this is hard-coded in English so should not be used
    for visible in-game elements; only tools/etc.

    If now is not passed, efro.util.utc_now() is used.
    """
    if now is None:
        now = utc_now()
    return (
        timedelta_str(now - timeval, maxparts=maxparts, decimals=decimals)
        + ' ago'
    )


def split_list(input_list: list[T], max_length: int) -> list[list[T]]:
    """Split a single list into smaller lists."""
    return [
        input_list[i : i + max_length]
        for i in range(0, len(input_list), max_length)
    ]


def extract_flag(args: list[str], name: str) -> bool:
    """Given a list of args and a flag name, returns whether it is present.

    The arg flag, if present, is removed from the arg list.
    """
    from efro.error import CleanError

    count = args.count(name)
    if count > 1:
        raise CleanError(f'Flag {name} passed multiple times.')
    if not count:
        return False
    args.remove(name)
    return True


@overload
def extract_arg(
    args: list[str], name: str, required: Literal[False] = False
) -> str | None: ...


@overload
def extract_arg(args: list[str], name: str, required: Literal[True]) -> str: ...


def extract_arg(
    args: list[str], name: str, required: bool = False
) -> str | None:
    """Given a list of args and an arg name, returns a value.

    The arg flag and value are removed from the arg list.
    Raises CleanError on any problems.
    """
    from efro.error import CleanError

    count = args.count(name)
    if not count:
        if required:
            raise CleanError(f'Required argument {name} not passed.')
        return None

    if count > 1:
        raise CleanError(f'Arg {name} passed multiple times.')

    argindex = args.index(name)
    if argindex + 1 >= len(args):
        raise CleanError(f'No value passed after {name} arg.')

    val = args[argindex + 1]
    del args[argindex : argindex + 2]

    return val
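A quick usage sketch (mine, not from the original docs) for a few helpers defined above that don't get individual entries below; the expected outputs follow the formatting rules in timedelta_str and the list-mutation behavior of extract_flag/extract_arg:

import datetime

from efro.util import (
    timedelta_str,
    ago_str,
    utc_now,
    extract_flag,
    extract_arg,
)

# 90272 seconds is 1 day, 1 hour, 4 minutes, 32 seconds; maxparts limits
# how many units are shown.
print(timedelta_str(90272.0, maxparts=2))  # -> '1d 1h'
print(timedelta_str(90272.0, maxparts=4))  # -> '1d 1h 4m 32s'

# 'ago' strings are built on timedelta_str.
started = utc_now() - datetime.timedelta(hours=3)
print(ago_str(started))  # -> something like '3h ago'

# Simple command-line style arg handling; both helpers mutate the list.
args = ['--verbose', '--name', 'foo']
print(extract_flag(args, '--verbose'))  # -> True
print(extract_arg(args, '--name'))      # -> 'foo'
print(args)                             # -> []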
def explicit_bool(val: bool) -> bool:
Return a non-inferable boolean value.
Useful to be able to disable blocks of code without type checkers complaining/etc.
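A small illustrative sketch (mine): wrapping a constant in explicit_bool() keeps type checkers from flagging the disabled branch as unreachable.

from efro.util import explicit_bool

# Type checkers see an un-inferable bool here, so neither branch is
# reported as dead code; at runtime this is simply False.
if explicit_bool(False):
    print('debug path (currently disabled)')
else:
    print('normal path')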
def snake_case_to_title(val: str) -> str:
Given a snake-case string 'foo_bar', returns 'Foo Bar'.
def snake_case_to_camel_case(val: str) -> str:
Given a snake-case string 'foo_bar', returns camel-case 'FooBar'.
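A quick sketch (mine) of the two snake-case helpers, with the outputs expected from the code above:

from efro.util import snake_case_to_title, snake_case_to_camel_case

print(snake_case_to_title('foo_bar'))       # -> 'Foo Bar'
print(snake_case_to_title('__foo__bar__'))  # -> 'Foo Bar' (empty words dropped)
print(snake_case_to_camel_case('foo_bar'))  # -> 'FooBar'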
def check_utc(value: datetime.datetime) -> None:
Ensure a datetime value is timezone-aware utc.
def utc_now() -> datetime.datetime:
Get timezone-aware current utc time.
Just a shortcut for datetime.datetime.now(datetime.UTC). Avoid datetime.datetime.utcnow() which is deprecated and gives naive times.
def utc_now_naive() -> datetime.datetime:
Get naive utc time.
This can be used to replace datetime.utcnow(), which is now deprecated. Most all code should migrate to use timezone-aware times instead of this.
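A short sketch of mine showing how these fit together (assumes Python 3.11+ for datetime.UTC):

import datetime

from efro.util import check_utc, utc_now, utc_now_naive

now = utc_now()
check_utc(now)  # Passes; tzinfo is datetime.UTC.

naive = utc_now_naive()
assert naive.tzinfo is None

# A naive datetime raises ValueError.
try:
    check_utc(datetime.datetime.now())
except ValueError as exc:
    print(exc)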
def utc_today() -> datetime.datetime:
Get offset-aware midnight in the utc time zone.
def utc_this_hour() -> datetime.datetime:
Get offset-aware beginning of the current hour in the utc time zone.
def utc_this_minute() -> datetime.datetime:
Get offset-aware beginning of current minute in the utc time zone.
def empty_weakref(objtype: type[T]) -> weakref.ref[T]:
Return an invalidated weak-reference for the specified type.
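A sketch (mine) of the typical pattern this enables: initializing a weak-ref attribute without declaring it Optional.

import weakref

from efro.util import empty_weakref

class Node:
    def __init__(self) -> None:
        # A dead-but-typed ref; calling it yields None until a real
        # ref is assigned.
        self.parent_ref: weakref.ref[Node] = empty_weakref(Node)

parent = Node()
child = Node()
assert child.parent_ref() is None
child.parent_ref = weakref.ref(parent)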
def data_size_str(bytecount: int, compact: bool = False) -> str:
Given a size in bytes, returns a short human readable string.
In compact mode this should be 6 or fewer chars for most all sane file sizes.
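Expected outputs per the thresholds in the code above (a quick sketch of mine):

from efro.util import data_size_str

print(data_size_str(0))                  # -> '0 bytes'
print(data_size_str(567, compact=True))  # -> '567 B'
print(data_size_str(15 * 1024))          # -> '15 KB'
print(data_size_str(4 * 1024 * 1024))    # -> '4.0 MB'
print(data_size_str(-2048))              # -> '-2.0 KB'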
class DirtyBit:
Manages whether a thing is dirty and regulates attempts to clean it.
To use, simply set the 'dirty' value on this object to True when some action is needed, and then check the 'should_update' value to regulate when attempts to clean it should be made. Set 'dirty' back to False after a successful update.

If 'use_lock' is True, an asyncio Lock will be created and incorporated into update attempts to prevent simultaneous updates (should_update will only return True when the lock is unlocked). Note that it is up to the user to lock/unlock the lock during the actual update attempt.

If a value is passed for 'auto_dirty_seconds', the dirtybit will flip itself back to dirty after being clean for the given amount of time.

'min_update_interval' can be used to enforce a minimum update interval even when updates are successful (retry_interval only applies when updates fail).
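A minimal usage sketch of mine (the try_flush_state() helper is a hypothetical stand-in for some real flush-to-server call):

import random
import time

from efro.util import DirtyBit

def try_flush_state() -> bool:
    """Hypothetical flush attempt; returns True on success."""
    return random.random() < 0.5

state_dirty = DirtyBit(retry_interval=2.0)

state_dirty.dirty = True  # Something changed; schedule an update.
while state_dirty.dirty:
    if state_dirty.should_update:
        if try_flush_state():
            state_dirty.dirty = False  # Success; stop attempting.
        # On failure, should_update returns False for roughly
        # retry_interval seconds before allowing another attempt.
    time.sleep(0.1)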
def __init__(self, dirty: bool = False, retry_interval: float = 5.0, *, use_lock: bool = False, auto_dirty_seconds: float | None = None, min_update_interval: float | None = None):
@property
def dirty(self) -> bool:
Whether the target is currently dirty.
This should be set to False once an update is successful.
@property
def should_update(self) -> bool:
Whether an attempt should be made to clean the target now.
Always returns False if the target is not dirty. Takes into account the amount of time passed since the target was marked dirty or since should_update last returned True.
class DispatchMethodWrapper(Generic[ArgT, RetT]):
Type-aware standin for the dispatch func returned by dispatchmethod.
@staticmethod
def register(func: Callable[[Any, Any], RetT]) -> Callable[[Any, Any], RetT]:
Register a new dispatch handler for this dispatch-method.
def dispatchmethod(func: Callable[[Any, ArgT], RetT]) -> DispatchMethodWrapper[ArgT, RetT]:
A variation of functools.singledispatch for methods.
Note: as of Python 3.9 there is now functools.singledispatchmethod, but it currently (as of Jan 2021) is not type-aware (at least in mypy), which gives us a reason to keep this one around for now.
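A usage sketch of mine (not from the original docs); handlers are registered here with explicit types, which the wrapped functools.singledispatch register accepts, and dispatch happens on the argument after self:

from efro.util import dispatchmethod

class Renderer:
    @dispatchmethod
    def render(self, value) -> str:
        return f'<unknown {type(value).__name__}>'

    @render.register(int)
    def _render_int(self, value) -> str:
        return f'int: {value}'

    @render.register(str)
    def _render_str(self, value) -> str:
        return f'str: {value!r}'

r = Renderer()
print(r.render(3))     # -> 'int: 3'
print(r.render('hi'))  # -> "str: 'hi'"
print(r.render(2.5))   # -> '<unknown float>' (falls through to the base).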
def valuedispatch(call: Callable[[ValT], RetT]) -> ValueDispatcher[ValT, RetT]:
Decorator for functions to allow dispatching based on a value.
This differs from functools.singledispatch in that it dispatches based on the value of an argument, not based on its type. The 'register' method of a value-dispatch function can be used to assign new functions to handle particular values. Unhandled values wind up in the original dispatch function.
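A sketch of mine showing the intent: handlers take no arguments and are keyed on the dispatched value.

from enum import Enum

from efro.util import valuedispatch

class Color(Enum):
    RED = 'red'
    GREEN = 'green'

@valuedispatch
def describe(color: Color) -> str:
    return 'some other color'

@describe.register(Color.RED)
def _describe_red() -> str:
    return 'warm'

print(describe(Color.RED))    # -> 'warm'
print(describe(Color.GREEN))  # -> 'some other color'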
class ValueDispatcher(Generic[ValT, RetT]):
Used by the valuedispatch decorator
def register(self, value: ValT) -> Callable[[Callable[[], RetT]], Callable[[], RetT]]:
Add a handler to the dispatcher.
def valuedispatch1arg(call: Callable[[ValT, ArgT], RetT]) -> ValueDispatcher1Arg[ValT, ArgT, RetT]:
Like valuedispatch but for functions taking an extra argument.
class ValueDispatcher1Arg(Generic[ValT, ArgT, RetT]):
Used by the valuedispatch1arg decorator
def register(self, value: ValT) -> Callable[[Callable[[ArgT], RetT]], Callable[[ArgT], RetT]]:
Add a handler to the dispatcher.
def valuedispatchmethod(call: Callable[[SelfT, ValT], RetT]) -> ValueDispatcherMethod[ValT, RetT]:
Like valuedispatch but works with methods instead of functions.
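A sketch of mine for the method variant; registered handlers receive only self, and unhandled values fall through to the decorated method:

from enum import Enum

from efro.util import valuedispatchmethod

class Mode(Enum):
    IDLE = 0
    RUNNING = 1

class Machine:
    @valuedispatchmethod
    def label(self, mode: Mode) -> str:
        return f'unhandled mode {mode}'

    @label.register(Mode.IDLE)
    def _label_idle(self) -> str:
        return 'idle'

m = Machine()
print(m.label(Mode.IDLE))     # -> 'idle'
print(m.label(Mode.RUNNING))  # -> 'unhandled mode Mode.RUNNING'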
def make_hash(obj: Any) -> int:
Makes a hash from a dictionary, list, tuple, or set, nested to any level, that contains only other hashable types (including any lists, tuples, sets, and dictionaries).
Note that this uses Python's hash() function internally so collisions/etc. may be more common than with fancy cryptographic hashes.
Also be aware that Python's hash() output varies across processes, so this should only be used for values that will remain in a single process.
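A brief sketch (mine) of the main property this gives you:

from efro.util import make_hash

config_a = {'name': 'foo', 'tags': ['a', 'b'], 'nested': {'x': 1}}
config_b = {'nested': {'x': 1}, 'tags': ['a', 'b'], 'name': 'foo'}

# Equal structures hash equal within a single process (key order does
# not matter); the actual value differs between runs due to Python's
# hash randomization.
assert make_hash(config_a) == make_hash(config_b)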
def float_hash_from_string(s: str) -> float:
Given a string value, returns a float between 0 and 1.
Is consistent across processes. Can be useful for assigning shard values to db ids for efficient parallel processing.
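A sketch of mine showing the stable-bucketing use case:

from efro.util import float_hash_from_string

val = float_hash_from_string('user-12345')
assert 0.0 <= val <= 1.0

# md5 is deterministic, so the same string maps to the same float in
# any process; handy for stable sharding.
assert val == float_hash_from_string('user-12345')

shard = min(7, int(val * 8))  # e.g. spread ids across 8 worker shards.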
def asserttype(obj: Any, typ: type[T]) -> T:
Return an object typed as a given type.
Assert is used to check its actual type, so only use this when failures are not expected. Otherwise use checktype.
def asserttype_o(obj: Any, typ: type[T]) -> T | None:
Return an object typed as a given optional type.
Assert is used to check its actual type, so only use this when failures are not expected. Otherwise use checktype.
def checktype(obj: Any, typ: type[T]) -> T:
Return an object typed as a given type.
Always checks the type at runtime with isinstance and throws a TypeError on failure. Use asserttype for a more efficient (but less safe) equivalent.
def checktype_o(obj: Any, typ: type[T]) -> T | None:
Return an object typed as a given optional type.
Always checks the type at runtime with isinstance and throws a TypeError on failure. Use asserttype for a more efficient (but less safe) equivalent.
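A sketch of mine illustrating the checktype family when narrowing values typed as object/Any:

from efro.util import checktype, checktype_o, asserttype

def lookup(config: dict[str, object], key: str) -> str:
    # Raises TypeError if the stored value is not a str.
    return checktype(config[key], str)

config: dict[str, object] = {'title': 'hello', 'count': 3}
print(lookup(config, 'title'))  # -> 'hello'

# The _o variants also allow None through.
maybe = checktype_o(config.get('missing'), str)
assert maybe is None

# asserttype is the cheap variant: its isinstance check compiles away
# under -O, so use it only where a wrong type would be a bug.
count = asserttype(config['count'], int)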
def warntype(obj: Any, typ: type[T]) -> T:
Return an object typed as a given type.
Always checks the type at runtime and simply logs a warning if it is not what is expected.
def warntype_o(obj: Any, typ: type[T]) -> T | None:
Return an object typed as a given type.
Always checks the type at runtime and simply logs a warning if it is not what is expected.
def assert_non_optional(obj: T | None) -> T:
Return an object with Optional typing removed.
Assert is used to check its actual type, so only use this when failures are not expected. Use check_non_optional otherwise.
def check_non_optional(obj: T | None) -> T:
Return an object with Optional typing removed.
Always checks the value at runtime and raises a ValueError if it is None. Use assert_non_optional for a more efficient (but less safe) equivalent.
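A minimal sketch contrasting the two Optional-stripping helpers (the users dict is hypothetical):

from efro.util import assert_non_optional, check_non_optional

users = {'alice': 42}

# Assert flavor: a miss here would indicate a bug in our own code.
uid = assert_non_optional(users.get('alice'))

# Checked flavor: a miss is possible and should surface as a clean
# runtime error.
uid2 = check_non_optional(users.get('bob'))  # Raises ValueError.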
def smoothstep(edge0: float, edge1: float, x: float) -> float:
    """A smooth transition function.

    Returns a value that smoothly moves from 0 to 1 as we go between edges.
    Values outside of the range return 0 or 1.
    """
    y = min(1.0, max(0.0, (x - edge0) / (edge1 - edge0)))
    return y * y * (3.0 - 2.0 * y)
A smooth transition function.
Returns a value that smoothly moves from 0 to 1 as we go between edges. Values outside of the range return 0 or 1.
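A few sample values, computed directly from the formula above:

from efro.util import smoothstep

print(smoothstep(0.0, 10.0, -3.0))  # 0.0 (clamped below the range)
print(smoothstep(0.0, 10.0, 2.5))   # 0.15625 (easing in)
print(smoothstep(0.0, 10.0, 5.0))   # 0.5 (midpoint)
print(smoothstep(0.0, 10.0, 12.0))  # 1.0 (clamped above the range)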
def linearstep(edge0: float, edge1: float, x: float) -> float:
    """A linear transition function.

    Returns a value that linearly moves from 0 to 1 as we go between edges.
    Values outside of the range return 0 or 1.
    """
    return max(0.0, min(1.0, (x - edge0) / (edge1 - edge0)))
A linear transition function.
Returns a value that linearly moves from 0 to 1 as we go between edges. Values outside of the range return 0 or 1.
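The same sample points as above, for comparison with smoothstep:

from efro.util import linearstep

print(linearstep(0.0, 10.0, 2.5))   # 0.25 (no easing; straight ramp)
print(linearstep(0.0, 10.0, 5.0))   # 0.5
print(linearstep(0.0, 10.0, 15.0))  # 1.0 (clamped)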
def human_readable_compact_id(num: int) -> str:
    """Given a positive int, return a compact string representation for it.

    Handy for visualizing unique numeric ids using as few as possible chars.
    This representation uses only lowercase letters and numbers (minus the
    following letters for readability):
    's' is excluded due to similarity to '5'.
    'l' is excluded due to similarity to '1'.
    'i' is excluded due to similarity to '1'.
    'o' is excluded due to similarity to '0'.
    'z' is excluded due to similarity to '2'.

    Therefore for n chars this can store values of 31^n.

    When reading human input consisting of these IDs, it may be desirable
    to map the disallowed chars to their corresponding allowed ones
    ('o' -> '0', etc).

    Sort order for these ids is the same as the original numbers.

    If more compactness is desired at the expense of readability,
    use compact_id() instead.
    """
    return _compact_id(num, '0123456789abcdefghjkmnpqrtuvwxy')
Given a positive int, return a compact string representation for it.
Handy for visualizing unique numeric ids using as few as possible chars. This representation uses only lowercase letters and numbers (minus the following letters for readability):
's' is excluded due to similarity to '5'.
'l' is excluded due to similarity to '1'.
'i' is excluded due to similarity to '1'.
'o' is excluded due to similarity to '0'.
'z' is excluded due to similarity to '2'.
Therefore for n chars this can store values of 31^n (the alphabet has 10 digits plus 21 letters).
When reading human input consisting of these IDs, it may be desirable to map the disallowed chars to their corresponding allowed ones ('o' -> '0', etc).
Sort order for these ids is the same as the original numbers.
If more compactness is desired at the expense of readability, use compact_id() instead.
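A usage sketch; exact output strings depend on the private _compact_id encoding, so only the documented guarantees are asserted here:

from efro.util import human_readable_compact_id

ids = [human_readable_compact_id(n) for n in (0, 1, 12345, 9999999)]

# Encoded strings sort the same way the original numbers do.
assert ids == sorted(ids)

# None of the easily-confused characters appear in the output.
assert not any(c in 'slioz' for encoded in ids for c in encoded)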
def compact_id(num: int) -> str:
    """Given a positive int, return a compact string representation for it.

    Handy for visualizing unique numeric ids using as few as possible chars.
    This version is more compact than human_readable_compact_id() but less
    friendly to humans due to using both capital and lowercase letters,
    both 'O' and '0', etc.

    Therefore for n chars this can store values of 62^n.

    Sort order for these ids is the same as the original numbers.
    """
    return _compact_id(
        num, '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'
    )
Given a positive int, return a compact string representation for it.
Handy for visualizing unique numeric ids using as few as possible chars. This version is more compact than human_readable_compact_id() but less friendly to humans due to using both capital and lowercase letters, both 'O' and '0', etc.
Therefore for n chars this can store values of 62^n.
Sort order for these ids is the same as the original numbers.
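A rough comparison sketch; assuming both functions share the same positional encoding scheme (only the alphabet differs), the 62-char alphabet should never need more characters than the 31-char one for a given value:

from efro.util import compact_id, human_readable_compact_id

n = 10**12
assert len(compact_id(n)) <= len(human_readable_compact_id(n))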
def caller_source_location() -> str:
    """Returns source file name and line of the code calling us.

    Example: 'mymodule.py:23'
    """
    try:
        import inspect

        frame = inspect.currentframe()
        for _i in range(2):
            if frame is None:
                raise RuntimeError()
            frame = frame.f_back
        if frame is None:
            raise RuntimeError()
        fname = os.path.basename(frame.f_code.co_filename)
        return f'{fname}:{frame.f_lineno}'
    except Exception:
        return '<unknown source location>'
Returns source file name and line of the code calling us.
Example: 'mymodule.py:23'
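Because the function walks two frames up the stack, it is meant to be called from inside a helper that wants to report its own caller's location. A sketch of a hypothetical logging helper:

from efro.util import caller_source_location

def log_with_source(msg: str) -> None:
    # Prints the file name and line of whoever called log_with_source,
    # e.g. 'myscript.py:12: loading config'.
    print(f'{caller_source_location()}: {msg}')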
def unchanging_hostname() -> str:
    """Return an unchanging name for the local device.

    Similar to the `hostname` call (or os.uname().nodename in Python),
    except it attempts to give a name that doesn't change depending on
    network conditions. (A Mac will tend to go from Foo to Foo.local,
    Foo.lan, etc. throughout its various adventures.)
    """
    import platform
    import subprocess

    # On Mac, this should give the computer name assigned in System Prefs.
    if platform.system() == 'Darwin':
        return (
            subprocess.run(
                ['scutil', '--get', 'ComputerName'],
                check=True,
                capture_output=True,
            )
            .stdout.decode()
            .strip()
            .replace(' ', '-')
        )
    return os.uname().nodename
Return an unchanging name for the local device.
Similar to the `hostname` call (or os.uname().nodename in Python), except it attempts to give a name that doesn't change depending on network conditions. (A Mac will tend to go from Foo to Foo.local, Foo.lan, etc. throughout its various adventures.)
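A usage sketch; the returned name is machine-specific (spaces are replaced with dashes on macOS):

from efro.util import unchanging_hostname

# Handy as a stable per-device key for things like build caches or
# device-scoped config files.
device_key = f'buildcache-{unchanging_hostname()}'
print(device_key)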
def set_canonical_module_names(module_globals: dict[str, Any]) -> None:
    """Set public objects' __module__ to the canonical (package) name.

    Intended to be called from a package __init__.py after importing
    names from private submodules; anything whose __module__ points at
    one of our own submodules is rewritten to point at the package
    itself. Can be suppressed by setting the
    EFRO_SUPPRESS_SET_CANONICAL_MODULE_NAMES env var to 1.
    """
    if os.environ.get('EFRO_SUPPRESS_SET_CANONICAL_MODULE_NAMES') == '1':
        return

    modulename = module_globals.get('__name__')
    if not isinstance(modulename, str):
        raise RuntimeError('Unable to get module name.')
    assert not modulename.startswith('_')
    modulename_prefix = f'{modulename}.'
    modulename_prefix_2 = f'_{modulename}.'

    for name, obj in module_globals.items():
        if name.startswith('_'):
            continue
        existing = getattr(obj, '__module__', None)
        try:
            # Override the module ONLY if it lives under us somewhere.
            # So ourpackage._submodule.Foo becomes ourpackage.Foo
            # but otherpackage._submodule.Foo remains untouched.
            if existing is not None and (
                existing.startswith(modulename_prefix)
                or existing.startswith(modulename_prefix_2)
            ):
                obj.__module__ = modulename
        except Exception:
            import logging

            logging.warning(
                'set_canonical_module_names: unable to change __module__'
                " from '%s' to '%s' on %s object at '%s'.",
                existing,
                modulename,
                type(obj),
                name,
            )
Set public objects' __module__ to the canonical (package) name. Intended to be called from a package __init__.py after importing names from private submodules; anything whose __module__ points at one of our own submodules (ourpackage._submodule.Foo) is rewritten to point at the package (ourpackage.Foo). Can be suppressed by setting the EFRO_SUPPRESS_SET_CANONICAL_MODULE_NAMES env var to 1.
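Typical usage in a package __init__.py that re-exports symbols from private submodules (mypackage, Widget, and make_widget are hypothetical names):

# mypackage/__init__.py
from efro.util import set_canonical_module_names

from mypackage._internal import Widget, make_widget

# After this call, Widget.__module__ reads 'mypackage' rather than
# 'mypackage._internal', so reprs and docs show the public import path.
set_canonical_module_names(globals())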
def timedelta_str(
    timeval: datetime.timedelta | float, maxparts: int = 2, decimals: int = 0
) -> str:
    """Return a simple human readable time string for a length of time.

    Time can be given as a timedelta or a float representing seconds.
    Example output:
    "23d 1h 2m 32s" (with maxparts == 4)
    "23d 1h" (with maxparts == 2)
    "23d 1.08h" (with maxparts == 2 and decimals == 2)

    Note that this is hard-coded in English and probably not especially
    performant.
    """
    # pylint: disable=too-many-locals

    if isinstance(timeval, float):
        timevalfin = datetime.timedelta(seconds=timeval)
    else:
        timevalfin = timeval

    # Internally we only handle positive values.
    if timevalfin.total_seconds() < 0:
        # Pass decimals through so negative values format the same as
        # positive ones.
        posstr = timedelta_str(
            timeval=-timeval, maxparts=maxparts, decimals=decimals
        )
        return f'-{posstr}'

    years = timevalfin.days // 365
    days = timevalfin.days % 365
    hours = timevalfin.seconds // 3600
    hour_remainder = timevalfin.seconds % 3600
    minutes = hour_remainder // 60
    seconds = hour_remainder % 60

    # Now, if we want decimal places for our last value,
    # calc fractional parts.
    if decimals:
        # Calc totals of each type.
        t_seconds = timevalfin.total_seconds()
        t_minutes = t_seconds / 60
        t_hours = t_minutes / 60
        t_days = t_hours / 24
        t_years = t_days / 365

        # Calc fractional parts that exclude all whole values to their left.
        years_covered = years
        years_f = t_years - years_covered
        days_covered = years_covered * 365 + days
        days_f = t_days - days_covered
        hours_covered = days_covered * 24 + hours
        hours_f = t_hours - hours_covered
        minutes_covered = hours_covered * 60 + minutes
        minutes_f = t_minutes - minutes_covered
        seconds_covered = minutes_covered * 60 + seconds
        seconds_f = t_seconds - seconds_covered
    else:
        years_f = days_f = hours_f = minutes_f = seconds_f = 0.0

    parts: list[str] = []
    for part, part_f, suffix in (
        (years, years_f, 'y'),
        (days, days_f, 'd'),
        (hours, hours_f, 'h'),
        (minutes, minutes_f, 'm'),
        (seconds, seconds_f, 's'),
    ):
        if part or parts or (not parts and suffix == 's'):
            # Do decimal version only for the last part.
            if decimals and (len(parts) >= maxparts - 1 or suffix == 's'):
                parts.append(f'{part+part_f:.{decimals}f}{suffix}')
            else:
                parts.append(f'{part}{suffix}')
            if len(parts) >= maxparts:
                break
    return ' '.join(parts)
Return a simple human readable time string for a length of time.
Time can be given as a timedelta or a float representing seconds. Example output:
"23d 1h 2m 32s" (with maxparts == 4)
"23d 1h" (with maxparts == 2)
"23d 1.08h" (with maxparts == 2 and decimals == 2)
Note that this is hard-coded in English and probably not especially performant.
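A few concrete calls; the outputs follow from the logic above:

import datetime
from efro.util import timedelta_str

delta = datetime.timedelta(days=23, hours=1, minutes=2, seconds=32)

print(timedelta_str(delta, maxparts=4))              # '23d 1h 2m 32s'
print(timedelta_str(delta, maxparts=2))              # '23d 1h'
print(timedelta_str(delta, maxparts=2, decimals=2))  # '23d 1.04h' (2m 32s is ~0.04h)
print(timedelta_str(90.5, decimals=1))               # '1m 30.5s'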
def ago_str(
    timeval: datetime.datetime,
    maxparts: int = 1,
    now: datetime.datetime | None = None,
    decimals: int = 0,
) -> str:
    """Given a datetime, return a clean human readable 'ago' str.

    Note that this is hard-coded in English so should not be used
    for visible in-game elements; only tools/etc.

    If now is not passed, efro.util.utc_now() is used.
    """
    if now is None:
        now = utc_now()
    return (
        timedelta_str(now - timeval, maxparts=maxparts, decimals=decimals)
        + ' ago'
    )
Given a datetime, return a clean human readable 'ago' str.
Note that this is hard-coded in English so should not be used for visible in-game elements; only tools/etc.
If now is not passed, efro.util.utc_now() is used.
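A short sketch; the timestamp here is constructed relative to the current time:

import datetime
from efro.util import ago_str, utc_now

event_time = utc_now() - datetime.timedelta(hours=3, minutes=12)

print(ago_str(event_time))              # '3h ago'
print(ago_str(event_time, maxparts=2))  # '3h 12m ago'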
def split_list(input_list: list[T], max_length: int) -> list[list[T]]:
    """Split a list into smaller lists of at most max_length items."""
    return [
        input_list[i : i + max_length]
        for i in range(0, len(input_list), max_length)
    ]
Split a list into smaller lists of at most max_length items.
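For example, splitting a seven-item list into chunks of three:

from efro.util import split_list

print(split_list([1, 2, 3, 4, 5, 6, 7], max_length=3))
# [[1, 2, 3], [4, 5, 6], [7]]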
def extract_flag(args: list[str], name: str) -> bool:
    """Given a list of args and a flag name, returns whether it is present.

    The arg flag, if present, is removed from the arg list.
    """
    from efro.error import CleanError

    count = args.count(name)
    if count > 1:
        raise CleanError(f'Flag {name} passed multiple times.')
    if not count:
        return False
    args.remove(name)
    return True
Given a list of args and a flag name, returns whether it is present.
The arg flag, if present, is removed from the arg list.
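Typically applied to a copy of sys.argv before handing the remaining args to further parsing; a minimal sketch (the '--verbose' flag is hypothetical):

import sys
from efro.util import extract_flag

args = sys.argv[1:]

# True if '--verbose' was passed; if present, the flag is also removed
# from the args list in place.
verbose = extract_flag(args, '--verbose')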
def extract_arg(
    args: list[str], name: str, required: bool = False
) -> str | None:
    """Given a list of args and an arg name, returns a value.

    The arg flag and value are removed from the arg list.
    Raises CleanError on any problems.
    """
    from efro.error import CleanError

    count = args.count(name)
    if not count:
        if required:
            raise CleanError(f'Required argument {name} not passed.')
        return None

    if count > 1:
        raise CleanError(f'Arg {name} passed multiple times.')

    argindex = args.index(name)
    if argindex + 1 >= len(args):
        raise CleanError(f'No value passed after {name} arg.')

    val = args[argindex + 1]
    del args[argindex : argindex + 2]

    return val
Given a list of args and an arg name, returns a value.
The arg flag and value are removed from the arg list. Raises CleanError on any problems.
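A companion sketch to the extract_flag example above; the '--config' and '--out' arg names are hypothetical:

import sys
from efro.util import extract_arg

args = sys.argv[1:]

# Optional: returns None if '--config' was not passed.
config_path = extract_arg(args, '--config')

# Required: raises CleanError with a readable message if missing.
out_path = extract_arg(args, '--out', required=True)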