efro.util
Small handy bits of functionality.
1# Released under the MIT License. See LICENSE for details. 2# 3"""Small handy bits of functionality.""" 4 5from __future__ import annotations 6 7import os 8import time 9import weakref 10import functools 11import datetime 12from enum import Enum 13from typing import TYPE_CHECKING, cast, TypeVar, Generic, overload, ParamSpec 14 15if TYPE_CHECKING: 16 import asyncio 17 from typing import Any, Callable, Literal 18 19T = TypeVar('T') 20ValT = TypeVar('ValT') 21ArgT = TypeVar('ArgT') 22SelfT = TypeVar('SelfT') 23RetT = TypeVar('RetT') 24EnumT = TypeVar('EnumT', bound=Enum) 25 26P = ParamSpec('P') 27 28 29class _EmptyObj: 30 pass 31 32 33# A dead weak-ref should be immutable, right? So we can create exactly 34# one and return it for all cases that need an empty weak-ref. 35_g_empty_weak_ref = weakref.ref(_EmptyObj()) 36assert _g_empty_weak_ref() is None 37 38# Note to self: adding a special form of partial for when we don't need 39# to pass further args/kwargs (which I think is most cases). Even though 40# partial is now type-checked in Mypy (as of Nov 2024) there are still some 41# pitfalls that this avoids (see func docs below). Perhaps it would make 42# sense to simply define a Call class for this purpose; it might be more 43# efficient than wrapping partial anyway (should test this). 44if TYPE_CHECKING: 45 46 def strict_partial( 47 func: Callable[P, T], *args: P.args, **kwargs: P.kwargs 48 ) -> Callable[[], T]: 49 """A version of functools.partial requiring all args to be passed. 50 51 This helps avoid pitfalls where a function is wrapped in a 52 partial but then an extra required arg is added to the function 53 but no type checking error is triggered at usage sites because 54 vanilla partial assumes that extra arg will be provided at call 55 time. 56 57 Note: it would seem like this pitfall could also be avoided on 58 the back end by ensuring that the thing accepting the partial 59 asks for Callable[[], None] instead of just Callable, but as of 60 Nov 2024 it seems that Mypy does not support this; it in fact 61 allows partials to be passed for any callable signature(!). 62 """ 63 ... 64 65else: 66 strict_partial = functools.partial 67 68 69def explicit_bool(val: bool) -> bool: 70 """Return a non-inferable boolean value. 71 72 Useful to be able to disable blocks of code without type checkers 73 complaining/etc. 74 """ 75 # pylint: disable=no-else-return 76 if TYPE_CHECKING: 77 # infer this! <boom> 78 import random 79 80 return random.random() < 0.5 81 else: 82 return val 83 84 85def snake_case_to_title(val: str) -> str: 86 """Given a snake-case string 'foo_bar', returns 'Foo Bar'.""" 87 # Kill empty words resulting from leading/trailing/multiple underscores. 88 return ' '.join(w for w in val.split('_') if w).title() 89 90 91def snake_case_to_camel_case(val: str) -> str: 92 """Given a snake-case string 'foo_bar', returns camel-case 'FooBar'.""" 93 # Replace underscores with spaces; capitalize words; kill spaces. 94 # Not sure about efficiency, but logically simple. 95 return val.replace('_', ' ').title().replace(' ', '') 96 97 98def check_utc(value: datetime.datetime) -> None: 99 """Ensure a datetime value is timezone-aware utc.""" 100 if value.tzinfo is not datetime.UTC: 101 raise ValueError( 102 'datetime value does not have timezone set as datetime.UTC' 103 ) 104 105 106def utc_now() -> datetime.datetime: 107 """Get timezone-aware current utc time. 108 109 Just a shortcut for datetime.datetime.now(datetime.UTC). 110 Avoid datetime.datetime.utcnow() which is deprecated and gives naive 111 times. 112 """ 113 return datetime.datetime.now(datetime.UTC) 114 115 116def utc_now_naive() -> datetime.datetime: 117 """Get naive utc time. 118 119 This can be used to replace datetime.utcnow(), which is now deprecated. 120 Most all code should migrate to use timezone-aware times instead of 121 relying on this. 122 """ 123 return datetime.datetime.now(datetime.UTC).replace(tzinfo=None) 124 125 126def utc_from_timestamp_naive(timestamp: float) -> datetime.datetime: 127 """Get a naive utc time from a timestamp. 128 129 This can be used to replace datetime.utcfromtimestamp(), which is now 130 deprecated. Most all code should migrate to use timezone-aware times 131 instead of relying on this. 132 """ 133 134 return datetime.datetime.fromtimestamp(timestamp, tz=datetime.UTC).replace( 135 tzinfo=None 136 ) 137 138 139def utc_today() -> datetime.datetime: 140 """Get offset-aware midnight in the utc time zone.""" 141 now = datetime.datetime.now(datetime.UTC) 142 return datetime.datetime( 143 year=now.year, month=now.month, day=now.day, tzinfo=now.tzinfo 144 ) 145 146 147def utc_this_hour() -> datetime.datetime: 148 """Get offset-aware beginning of the current hour in the utc time zone.""" 149 now = datetime.datetime.now(datetime.UTC) 150 return datetime.datetime( 151 year=now.year, 152 month=now.month, 153 day=now.day, 154 hour=now.hour, 155 tzinfo=now.tzinfo, 156 ) 157 158 159def utc_this_minute() -> datetime.datetime: 160 """Get offset-aware beginning of current minute in the utc time zone.""" 161 now = datetime.datetime.now(datetime.UTC) 162 return datetime.datetime( 163 year=now.year, 164 month=now.month, 165 day=now.day, 166 hour=now.hour, 167 minute=now.minute, 168 tzinfo=now.tzinfo, 169 ) 170 171 172def empty_weakref(objtype: type[T]) -> weakref.ref[T]: 173 """Return an invalidated weak-reference for the specified type.""" 174 # At runtime, all weakrefs are the same; our type arg is just 175 # for the static type checker. 176 del objtype # Unused. 177 178 # Just create an object and let it die. Is there a cleaner way to do this? 179 # return weakref.ref(_EmptyObj()) # type: ignore 180 181 # Sharing a single ones seems at least a bit better. 182 return _g_empty_weak_ref # type: ignore 183 184 185def data_size_str(bytecount: int, compact: bool = False) -> str: 186 """Given a size in bytes, returns a short human readable string. 187 188 In compact mode this should be 6 or fewer chars for most all 189 sane file sizes. 190 """ 191 # pylint: disable=too-many-return-statements 192 193 # Special case: handle negatives. 194 if bytecount < 0: 195 val = data_size_str(-bytecount, compact=compact) 196 return f'-{val}' 197 198 if bytecount <= 999: 199 suffix = 'B' if compact else 'bytes' 200 return f'{bytecount} {suffix}' 201 kbytecount = bytecount / 1024 202 if round(kbytecount, 1) < 10.0: 203 return f'{kbytecount:.1f} KB' 204 if round(kbytecount, 0) < 999: 205 return f'{kbytecount:.0f} KB' 206 mbytecount = bytecount / (1024 * 1024) 207 if round(mbytecount, 1) < 10.0: 208 return f'{mbytecount:.1f} MB' 209 if round(mbytecount, 0) < 999: 210 return f'{mbytecount:.0f} MB' 211 gbytecount = bytecount / (1024 * 1024 * 1024) 212 if round(gbytecount, 1) < 10.0: 213 return f'{gbytecount:.1f} GB' 214 return f'{gbytecount:.0f} GB' 215 216 217class DirtyBit: 218 """Manages whether a thing is dirty and regulates cleaning it. 219 220 To use, simply set the 'dirty' value on this object to True when 221 some update is needed, and then check the 'should_update' value to 222 regulate when the actual update should occur. Set 'dirty' back to 223 False after a successful update. 224 225 If 'use_lock' is True, an asyncio Lock will be created and 226 incorporated into update attempts to prevent simultaneous updates 227 (should_update will only return True when the lock is unlocked). 228 Note that It is up to the user to lock/unlock the lock during the 229 actual update attempt. 230 231 If a value is passed for 'auto_dirty_seconds', the dirtybit will 232 flip itself back to dirty after being clean for the given amount of 233 time. 234 235 'min_update_interval' can be used to enforce a minimum update 236 interval even when updates are successful (retry_interval only 237 applies when updates fail) 238 """ 239 240 def __init__( 241 self, 242 dirty: bool = False, 243 retry_interval: float = 5.0, 244 *, 245 use_lock: bool = False, 246 auto_dirty_seconds: float | None = None, 247 min_update_interval: float | None = None, 248 ): 249 curtime = time.monotonic() 250 self._retry_interval = retry_interval 251 self._auto_dirty_seconds = auto_dirty_seconds 252 self._min_update_interval = min_update_interval 253 self._dirty = dirty 254 self._next_update_time: float | None = curtime if dirty else None 255 self._last_update_time: float | None = None 256 self._next_auto_dirty_time: float | None = ( 257 (curtime + self._auto_dirty_seconds) 258 if (not dirty and self._auto_dirty_seconds is not None) 259 else None 260 ) 261 self._use_lock = use_lock 262 self.lock: asyncio.Lock 263 if self._use_lock: 264 import asyncio 265 266 self.lock = asyncio.Lock() 267 268 @property 269 def dirty(self) -> bool: 270 """Whether the target is currently dirty. 271 272 This should be set to False once an update is successful. 273 """ 274 return self._dirty 275 276 @dirty.setter 277 def dirty(self, value: bool) -> None: 278 # If we're freshly clean, set our next auto-dirty time (if we have 279 # one). 280 if self._dirty and not value and self._auto_dirty_seconds is not None: 281 self._next_auto_dirty_time = ( 282 time.monotonic() + self._auto_dirty_seconds 283 ) 284 285 # If we're freshly dirty, schedule an immediate update. 286 if not self._dirty and value: 287 self._next_update_time = time.monotonic() 288 289 # If they want to enforce a minimum update interval, 290 # push out the next update time if it hasn't been long enough. 291 if ( 292 self._min_update_interval is not None 293 and self._last_update_time is not None 294 ): 295 self._next_update_time = max( 296 self._next_update_time, 297 self._last_update_time + self._min_update_interval, 298 ) 299 300 self._dirty = value 301 302 @property 303 def should_update(self) -> bool: 304 """Whether an attempt should be made to clean the target now. 305 306 Always returns False if the target is not dirty. 307 Takes into account the amount of time passed since the target 308 was marked dirty or since should_update last returned True. 309 """ 310 curtime = time.monotonic() 311 312 # Auto-dirty ourself if we're into that. 313 if ( 314 self._next_auto_dirty_time is not None 315 and curtime > self._next_auto_dirty_time 316 ): 317 self.dirty = True 318 self._next_auto_dirty_time = None 319 if not self._dirty: 320 return False 321 if self._use_lock and self.lock.locked(): 322 return False 323 assert self._next_update_time is not None 324 if curtime > self._next_update_time: 325 self._next_update_time = curtime + self._retry_interval 326 self._last_update_time = curtime 327 return True 328 return False 329 330 331class DispatchMethodWrapper(Generic[ArgT, RetT]): 332 """Type-aware standin for the dispatch func returned by dispatchmethod.""" 333 334 def __call__(self, arg: ArgT) -> RetT: 335 raise RuntimeError('Should not get here') 336 337 @staticmethod 338 def register( 339 func: Callable[[Any, Any], RetT] 340 ) -> Callable[[Any, Any], RetT]: 341 """Register a new dispatch handler for this dispatch-method.""" 342 raise RuntimeError('Should not get here') 343 344 registry: dict[Any, Callable] 345 346 347# noinspection PyProtectedMember,PyTypeHints 348def dispatchmethod( 349 func: Callable[[Any, ArgT], RetT] 350) -> DispatchMethodWrapper[ArgT, RetT]: 351 """A variation of functools.singledispatch for methods. 352 353 Note: as of Python 3.9 there is now functools.singledispatchmethod, 354 but it currently (as of Jan 2021) is not type-aware (at least in mypy), 355 which gives us a reason to keep this one around for now. 356 """ 357 from functools import singledispatch, update_wrapper 358 359 origwrapper: Any = singledispatch(func) 360 361 # Pull this out so hopefully origwrapper can die, 362 # otherwise we reference origwrapper in our wrapper. 363 dispatch = origwrapper.dispatch 364 365 # All we do here is recreate the end of functools.singledispatch 366 # where it returns a wrapper except instead of the wrapper using the 367 # first arg to the function ours uses the second (to skip 'self'). 368 # This was made against Python 3.7; we should probably check up on 369 # this in later versions in case anything has changed. 370 # (or hopefully they'll add this functionality to their version) 371 # NOTE: sounds like we can use functools singledispatchmethod in 3.8 372 def wrapper(*args: Any, **kw: Any) -> Any: 373 if not args or len(args) < 2: 374 raise TypeError( 375 f'{funcname} requires at least ' '2 positional arguments' 376 ) 377 378 return dispatch(args[1].__class__)(*args, **kw) 379 380 funcname = getattr(func, '__name__', 'dispatchmethod method') 381 wrapper.register = origwrapper.register # type: ignore 382 wrapper.dispatch = dispatch # type: ignore 383 wrapper.registry = origwrapper.registry # type: ignore 384 # pylint: disable=protected-access 385 wrapper._clear_cache = origwrapper._clear_cache # type: ignore 386 update_wrapper(wrapper, func) 387 # pylint: enable=protected-access 388 return cast(DispatchMethodWrapper, wrapper) 389 390 391def valuedispatch(call: Callable[[ValT], RetT]) -> ValueDispatcher[ValT, RetT]: 392 """Decorator for functions to allow dispatching based on a value. 393 394 This differs from functools.singledispatch in that it dispatches based 395 on the value of an argument, not based on its type. 396 The 'register' method of a value-dispatch function can be used 397 to assign new functions to handle particular values. 398 Unhandled values wind up in the original dispatch function.""" 399 return ValueDispatcher(call) 400 401 402class ValueDispatcher(Generic[ValT, RetT]): 403 """Used by the valuedispatch decorator""" 404 405 def __init__(self, call: Callable[[ValT], RetT]) -> None: 406 self._base_call = call 407 self._handlers: dict[ValT, Callable[[], RetT]] = {} 408 409 def __call__(self, value: ValT) -> RetT: 410 handler = self._handlers.get(value) 411 if handler is not None: 412 return handler() 413 return self._base_call(value) 414 415 def _add_handler( 416 self, value: ValT, call: Callable[[], RetT] 417 ) -> Callable[[], RetT]: 418 if value in self._handlers: 419 raise RuntimeError(f'Duplicate handlers added for {value}') 420 self._handlers[value] = call 421 return call 422 423 def register( 424 self, value: ValT 425 ) -> Callable[[Callable[[], RetT]], Callable[[], RetT]]: 426 """Add a handler to the dispatcher.""" 427 from functools import partial 428 429 return partial(self._add_handler, value) 430 431 432def valuedispatch1arg( 433 call: Callable[[ValT, ArgT], RetT] 434) -> ValueDispatcher1Arg[ValT, ArgT, RetT]: 435 """Like valuedispatch but for functions taking an extra argument.""" 436 return ValueDispatcher1Arg(call) 437 438 439class ValueDispatcher1Arg(Generic[ValT, ArgT, RetT]): 440 """Used by the valuedispatch1arg decorator""" 441 442 def __init__(self, call: Callable[[ValT, ArgT], RetT]) -> None: 443 self._base_call = call 444 self._handlers: dict[ValT, Callable[[ArgT], RetT]] = {} 445 446 def __call__(self, value: ValT, arg: ArgT) -> RetT: 447 handler = self._handlers.get(value) 448 if handler is not None: 449 return handler(arg) 450 return self._base_call(value, arg) 451 452 def _add_handler( 453 self, value: ValT, call: Callable[[ArgT], RetT] 454 ) -> Callable[[ArgT], RetT]: 455 if value in self._handlers: 456 raise RuntimeError(f'Duplicate handlers added for {value}') 457 self._handlers[value] = call 458 return call 459 460 def register( 461 self, value: ValT 462 ) -> Callable[[Callable[[ArgT], RetT]], Callable[[ArgT], RetT]]: 463 """Add a handler to the dispatcher.""" 464 from functools import partial 465 466 return partial(self._add_handler, value) 467 468 469if TYPE_CHECKING: 470 471 class ValueDispatcherMethod(Generic[ValT, RetT]): 472 """Used by the valuedispatchmethod decorator.""" 473 474 def __call__(self, value: ValT) -> RetT: ... 475 476 def register( 477 self, value: ValT 478 ) -> Callable[[Callable[[SelfT], RetT]], Callable[[SelfT], RetT]]: 479 """Add a handler to the dispatcher.""" 480 ... 481 482 483def valuedispatchmethod( 484 call: Callable[[SelfT, ValT], RetT] 485) -> ValueDispatcherMethod[ValT, RetT]: 486 """Like valuedispatch but works with methods instead of functions.""" 487 488 # NOTE: It seems that to wrap a method with a decorator and have self 489 # dispatching do the right thing, we must return a function and not 490 # an executable object. So for this version we store our data here 491 # in the function call dict and simply return a call. 492 493 _base_call = call 494 _handlers: dict[ValT, Callable[[SelfT], RetT]] = {} 495 496 def _add_handler(value: ValT, addcall: Callable[[SelfT], RetT]) -> None: 497 if value in _handlers: 498 raise RuntimeError(f'Duplicate handlers added for {value}') 499 _handlers[value] = addcall 500 501 def _register(value: ValT) -> Callable[[Callable[[SelfT], RetT]], None]: 502 from functools import partial 503 504 return partial(_add_handler, value) 505 506 def _call_wrapper(self: SelfT, value: ValT) -> RetT: 507 handler = _handlers.get(value) 508 if handler is not None: 509 return handler(self) 510 return _base_call(self, value) 511 512 # We still want to use our returned object to register handlers, but we're 513 # actually just returning a function. So manually stuff the call onto it. 514 setattr(_call_wrapper, 'register', _register) 515 516 # To the type checker's eyes we return a ValueDispatchMethod instance; 517 # this lets it know about our register func and type-check its usage. 518 # In reality we just return a raw function call (for reasons listed above). 519 # pylint: disable=undefined-variable, no-else-return 520 if TYPE_CHECKING: 521 return ValueDispatcherMethod[ValT, RetT]() 522 else: 523 return _call_wrapper 524 525 526def make_hash(obj: Any) -> int: 527 """Makes a hash from a dictionary, list, tuple or set to any level, 528 that contains only other hashable types (including any lists, tuples, 529 sets, and dictionaries). 530 531 Note that this uses Python's hash() function internally so collisions/etc. 532 may be more common than with fancy cryptographic hashes. 533 534 Also be aware that Python's hash() output varies across processes, so 535 this should only be used for values that will remain in a single process. 536 """ 537 import copy 538 539 if isinstance(obj, (set, tuple, list)): 540 return hash(tuple(make_hash(e) for e in obj)) 541 if not isinstance(obj, dict): 542 return hash(obj) 543 544 new_obj = copy.deepcopy(obj) 545 for k, v in new_obj.items(): 546 new_obj[k] = make_hash(v) 547 548 # NOTE: there is sorted works correctly because it compares only 549 # unique first values (i.e. dict keys) 550 return hash(tuple(frozenset(sorted(new_obj.items())))) 551 552 553def float_hash_from_string(s: str) -> float: 554 """Given a string value, returns a float between 0 and 1. 555 556 If consistent across processes. Can be useful for assigning db ids 557 shard values for efficient parallel processing. 558 """ 559 import hashlib 560 561 hash_bytes = hashlib.md5(s.encode()).digest() 562 563 # Generate a random 64 bit int from hash digest bytes. 564 ival = int.from_bytes(hash_bytes[:8]) 565 return ival / ((1 << 64) - 1) 566 567 568def asserttype(obj: Any, typ: type[T]) -> T: 569 """Return an object typed as a given type. 570 571 Assert is used to check its actual type, so only use this when 572 failures are not expected. Otherwise use checktype. 573 """ 574 assert isinstance(typ, type), 'only actual types accepted' 575 assert isinstance(obj, typ) 576 return obj 577 578 579def asserttype_o(obj: Any, typ: type[T]) -> T | None: 580 """Return an object typed as a given optional type. 581 582 Assert is used to check its actual type, so only use this when 583 failures are not expected. Otherwise use checktype. 584 """ 585 assert isinstance(typ, type), 'only actual types accepted' 586 assert isinstance(obj, (typ, type(None))) 587 return obj 588 589 590def checktype(obj: Any, typ: type[T]) -> T: 591 """Return an object typed as a given type. 592 593 Always checks the type at runtime with isinstance and throws a TypeError 594 on failure. Use asserttype for more efficient (but less safe) equivalent. 595 """ 596 assert isinstance(typ, type), 'only actual types accepted' 597 if not isinstance(obj, typ): 598 raise TypeError(f'Expected a {typ}; got a {type(obj)}.') 599 return obj 600 601 602def checktype_o(obj: Any, typ: type[T]) -> T | None: 603 """Return an object typed as a given optional type. 604 605 Always checks the type at runtime with isinstance and throws a TypeError 606 on failure. Use asserttype for more efficient (but less safe) equivalent. 607 """ 608 assert isinstance(typ, type), 'only actual types accepted' 609 if not isinstance(obj, (typ, type(None))): 610 raise TypeError(f'Expected a {typ} or None; got a {type(obj)}.') 611 return obj 612 613 614def warntype(obj: Any, typ: type[T]) -> T: 615 """Return an object typed as a given type. 616 617 Always checks the type at runtime and simply logs a warning if it is 618 not what is expected. 619 """ 620 assert isinstance(typ, type), 'only actual types accepted' 621 if not isinstance(obj, typ): 622 import logging 623 624 logging.warning('warntype: expected a %s, got a %s', typ, type(obj)) 625 return obj # type: ignore 626 627 628def warntype_o(obj: Any, typ: type[T]) -> T | None: 629 """Return an object typed as a given type. 630 631 Always checks the type at runtime and simply logs a warning if it is 632 not what is expected. 633 """ 634 assert isinstance(typ, type), 'only actual types accepted' 635 if not isinstance(obj, (typ, type(None))): 636 import logging 637 638 logging.warning( 639 'warntype: expected a %s or None, got a %s', typ, type(obj) 640 ) 641 return obj # type: ignore 642 643 644def assert_non_optional(obj: T | None) -> T: 645 """Return an object with Optional typing removed. 646 647 Assert is used to check its actual type, so only use this when 648 failures are not expected. Use check_non_optional otherwise. 649 """ 650 assert obj is not None 651 return obj 652 653 654def check_non_optional(obj: T | None) -> T: 655 """Return an object with Optional typing removed. 656 657 Always checks the actual type and throws a TypeError on failure. 658 Use assert_non_optional for a more efficient (but less safe) equivalent. 659 """ 660 if obj is None: 661 raise ValueError('Got None value in check_non_optional.') 662 return obj 663 664 665def smoothstep(edge0: float, edge1: float, x: float) -> float: 666 """A smooth transition function. 667 668 Returns a value that smoothly moves from 0 to 1 as we go between edges. 669 Values outside of the range return 0 or 1. 670 """ 671 y = min(1.0, max(0.0, (x - edge0) / (edge1 - edge0))) 672 return y * y * (3.0 - 2.0 * y) 673 674 675def linearstep(edge0: float, edge1: float, x: float) -> float: 676 """A linear transition function. 677 678 Returns a value that linearly moves from 0 to 1 as we go between edges. 679 Values outside of the range return 0 or 1. 680 """ 681 return max(0.0, min(1.0, (x - edge0) / (edge1 - edge0))) 682 683 684def _compact_id(num: int, chars: str) -> str: 685 if num < 0: 686 raise ValueError('Negative integers not allowed.') 687 688 # Chars must be in sorted order for sorting to work correctly 689 # on our output. 690 assert ''.join(sorted(list(chars))) == chars 691 692 base = len(chars) 693 out = '' 694 while num: 695 out += chars[num % base] 696 num //= base 697 return out[::-1] or '0' 698 699 700def human_readable_compact_id(num: int) -> str: 701 """Given a positive int, return a compact string representation for it. 702 703 Handy for visualizing unique numeric ids using as few as possible chars. 704 This representation uses only lowercase letters and numbers (minus the 705 following letters for readability): 706 's' is excluded due to similarity to '5'. 707 'l' is excluded due to similarity to '1'. 708 'i' is excluded due to similarity to '1'. 709 'o' is excluded due to similarity to '0'. 710 'z' is excluded due to similarity to '2'. 711 712 Therefore for n chars this can store values of 21^n. 713 714 When reading human input consisting of these IDs, it may be desirable 715 to map the disallowed chars to their corresponding allowed ones 716 ('o' -> '0', etc). 717 718 Sort order for these ids is the same as the original numbers. 719 720 If more compactness is desired at the expense of readability, 721 use compact_id() instead. 722 """ 723 return _compact_id(num, '0123456789abcdefghjkmnpqrtuvwxy') 724 725 726def compact_id(num: int) -> str: 727 """Given a positive int, return a compact string representation for it. 728 729 Handy for visualizing unique numeric ids using as few as possible chars. 730 This version is more compact than human_readable_compact_id() but less 731 friendly to humans due to using both capital and lowercase letters, 732 both 'O' and '0', etc. 733 734 Therefore for n chars this can store values of 62^n. 735 736 Sort order for these ids is the same as the original numbers. 737 """ 738 return _compact_id( 739 num, '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz' 740 ) 741 742 743def caller_source_location() -> str: 744 """Returns source file name and line of the code calling us. 745 746 Example: 'mymodule.py:23' 747 """ 748 try: 749 import inspect 750 751 frame = inspect.currentframe() 752 for _i in range(2): 753 if frame is None: 754 raise RuntimeError() 755 frame = frame.f_back 756 if frame is None: 757 raise RuntimeError() 758 fname = os.path.basename(frame.f_code.co_filename) 759 return f'{fname}:{frame.f_lineno}' 760 except Exception: 761 return '<unknown source location>' 762 763 764def unchanging_hostname() -> str: 765 """Return an unchanging name for the local device. 766 767 Similar to the `hostname` call (or os.uname().nodename in Python) 768 except attempts to give a name that doesn't change depending on 769 network conditions. (A Mac will tend to go from Foo to Foo.local, 770 Foo.lan etc. throughout its various adventures) 771 """ 772 import platform 773 import subprocess 774 775 # On Mac, this should give the computer name assigned in System Prefs. 776 if platform.system() == 'Darwin': 777 return ( 778 subprocess.run( 779 ['scutil', '--get', 'ComputerName'], 780 check=True, 781 capture_output=True, 782 ) 783 .stdout.decode() 784 .strip() 785 .replace(' ', '-') 786 ) 787 return os.uname().nodename 788 789 790def set_canonical_module_names(module_globals: dict[str, Any]) -> None: 791 """Do the thing.""" 792 if os.environ.get('EFRO_SUPPRESS_SET_CANONICAL_MODULE_NAMES') == '1': 793 return 794 795 modulename = module_globals.get('__name__') 796 if not isinstance(modulename, str): 797 raise RuntimeError('Unable to get module name.') 798 assert not modulename.startswith('_') 799 modulename_prefix = f'{modulename}.' 800 modulename_prefix_2 = f'_{modulename}.' 801 802 for name, obj in module_globals.items(): 803 if name.startswith('_'): 804 continue 805 existing = getattr(obj, '__module__', None) 806 try: 807 # Override the module ONLY if it lives under us somewhere. 808 # So ourpackage._submodule.Foo becomes ourpackage.Foo 809 # but otherpackage._submodule.Foo remains untouched. 810 if existing is not None and ( 811 existing.startswith(modulename_prefix) 812 or existing.startswith(modulename_prefix_2) 813 ): 814 obj.__module__ = modulename 815 except Exception: 816 import logging 817 818 logging.warning( 819 'set_canonical_module_names: unable to change __module__' 820 " from '%s' to '%s' on %s object at '%s'.", 821 existing, 822 modulename, 823 type(obj), 824 name, 825 ) 826 827 828def timedelta_str( 829 timeval: datetime.timedelta | float, maxparts: int = 2, decimals: int = 0 830) -> str: 831 """Return a simple human readable time string for a length of time. 832 833 Time can be given as a timedelta or a float representing seconds. 834 Example output: 835 "23d 1h 2m 32s" (with maxparts == 4) 836 "23d 1h" (with maxparts == 2) 837 "23d 1.08h" (with maxparts == 2 and decimals == 2) 838 839 Note that this is hard-coded in English and probably not especially 840 performant. 841 """ 842 # pylint: disable=too-many-locals 843 844 if isinstance(timeval, float): 845 timevalfin = datetime.timedelta(seconds=timeval) 846 else: 847 timevalfin = timeval 848 849 # Internally we only handle positive values. 850 if timevalfin.total_seconds() < 0: 851 return f'-{timedelta_str(timeval=-timeval, maxparts=maxparts)}' 852 853 years = timevalfin.days // 365 854 days = timevalfin.days % 365 855 hours = timevalfin.seconds // 3600 856 hour_remainder = timevalfin.seconds % 3600 857 minutes = hour_remainder // 60 858 seconds = hour_remainder % 60 859 860 # Now, if we want decimal places for our last value, 861 # calc fractional parts. 862 if decimals: 863 # Calc totals of each type. 864 t_seconds = timevalfin.total_seconds() 865 t_minutes = t_seconds / 60 866 t_hours = t_minutes / 60 867 t_days = t_hours / 24 868 t_years = t_days / 365 869 870 # Calc fractional parts that exclude all whole values to their left. 871 years_covered = years 872 years_f = t_years - years_covered 873 days_covered = years_covered * 365 + days 874 days_f = t_days - days_covered 875 hours_covered = days_covered * 24 + hours 876 hours_f = t_hours - hours_covered 877 minutes_covered = hours_covered * 60 + minutes 878 minutes_f = t_minutes - minutes_covered 879 seconds_covered = minutes_covered * 60 + seconds 880 seconds_f = t_seconds - seconds_covered 881 else: 882 years_f = days_f = hours_f = minutes_f = seconds_f = 0.0 883 884 parts: list[str] = [] 885 for part, part_f, suffix in ( 886 (years, years_f, 'y'), 887 (days, days_f, 'd'), 888 (hours, hours_f, 'h'), 889 (minutes, minutes_f, 'm'), 890 (seconds, seconds_f, 's'), 891 ): 892 if part or parts or (not parts and suffix == 's'): 893 # Do decimal version only for the last part. 894 if decimals and (len(parts) >= maxparts - 1 or suffix == 's'): 895 parts.append(f'{part+part_f:.{decimals}f}{suffix}') 896 else: 897 parts.append(f'{part}{suffix}') 898 if len(parts) >= maxparts: 899 break 900 return ' '.join(parts) 901 902 903def ago_str( 904 timeval: datetime.datetime, 905 maxparts: int = 1, 906 now: datetime.datetime | None = None, 907 decimals: int = 0, 908) -> str: 909 """Given a datetime, return a clean human readable 'ago' str. 910 911 Note that this is hard-coded in English so should not be used 912 for visible in-game elements; only tools/etc. 913 914 If now is not passed, efro.util.utc_now() is used. 915 """ 916 if now is None: 917 now = utc_now() 918 return ( 919 timedelta_str(now - timeval, maxparts=maxparts, decimals=decimals) 920 + ' ago' 921 ) 922 923 924def split_list(input_list: list[T], max_length: int) -> list[list[T]]: 925 """Split a single list into smaller lists.""" 926 return [ 927 input_list[i : i + max_length] 928 for i in range(0, len(input_list), max_length) 929 ] 930 931 932def extract_flag(args: list[str], name: str) -> bool: 933 """Given a list of args and a flag name, returns whether it is present. 934 935 The arg flag, if present, is removed from the arg list. 936 """ 937 from efro.error import CleanError 938 939 count = args.count(name) 940 if count > 1: 941 raise CleanError(f'Flag {name} passed multiple times.') 942 if not count: 943 return False 944 args.remove(name) 945 return True 946 947 948@overload 949def extract_arg( 950 args: list[str], name: str, required: Literal[False] = False 951) -> str | None: ... 952 953 954@overload 955def extract_arg(args: list[str], name: str, required: Literal[True]) -> str: ... 956 957 958def extract_arg( 959 args: list[str], name: str, required: bool = False 960) -> str | None: 961 """Given a list of args and an arg name, returns a value. 962 963 The arg flag and value are removed from the arg list. 964 raises CleanErrors on any problems. 965 """ 966 from efro.error import CleanError 967 968 count = args.count(name) 969 if not count: 970 if required: 971 raise CleanError(f'Required argument {name} not passed.') 972 return None 973 974 if count > 1: 975 raise CleanError(f'Arg {name} passed multiple times.') 976 977 argindex = args.index(name) 978 if argindex + 1 >= len(args): 979 raise CleanError(f'No value passed after {name} arg.') 980 981 val = args[argindex + 1] 982 del args[argindex : argindex + 2] 983 984 return val
70def explicit_bool(val: bool) -> bool: 71 """Return a non-inferable boolean value. 72 73 Useful to be able to disable blocks of code without type checkers 74 complaining/etc. 75 """ 76 # pylint: disable=no-else-return 77 if TYPE_CHECKING: 78 # infer this! <boom> 79 import random 80 81 return random.random() < 0.5 82 else: 83 return val
Return a non-inferable boolean value.
Useful to be able to disable blocks of code without type checkers complaining/etc.
86def snake_case_to_title(val: str) -> str: 87 """Given a snake-case string 'foo_bar', returns 'Foo Bar'.""" 88 # Kill empty words resulting from leading/trailing/multiple underscores. 89 return ' '.join(w for w in val.split('_') if w).title()
Given a snake-case string 'foo_bar', returns 'Foo Bar'.
92def snake_case_to_camel_case(val: str) -> str: 93 """Given a snake-case string 'foo_bar', returns camel-case 'FooBar'.""" 94 # Replace underscores with spaces; capitalize words; kill spaces. 95 # Not sure about efficiency, but logically simple. 96 return val.replace('_', ' ').title().replace(' ', '')
Given a snake-case string 'foo_bar', returns camel-case 'FooBar'.
99def check_utc(value: datetime.datetime) -> None: 100 """Ensure a datetime value is timezone-aware utc.""" 101 if value.tzinfo is not datetime.UTC: 102 raise ValueError( 103 'datetime value does not have timezone set as datetime.UTC' 104 )
Ensure a datetime value is timezone-aware utc.
107def utc_now() -> datetime.datetime: 108 """Get timezone-aware current utc time. 109 110 Just a shortcut for datetime.datetime.now(datetime.UTC). 111 Avoid datetime.datetime.utcnow() which is deprecated and gives naive 112 times. 113 """ 114 return datetime.datetime.now(datetime.UTC)
Get timezone-aware current utc time.
Just a shortcut for datetime.datetime.now(datetime.UTC). Avoid datetime.datetime.utcnow() which is deprecated and gives naive times.
117def utc_now_naive() -> datetime.datetime: 118 """Get naive utc time. 119 120 This can be used to replace datetime.utcnow(), which is now deprecated. 121 Most all code should migrate to use timezone-aware times instead of 122 relying on this. 123 """ 124 return datetime.datetime.now(datetime.UTC).replace(tzinfo=None)
Get naive utc time.
This can be used to replace datetime.utcnow(), which is now deprecated. Most all code should migrate to use timezone-aware times instead of relying on this.
127def utc_from_timestamp_naive(timestamp: float) -> datetime.datetime: 128 """Get a naive utc time from a timestamp. 129 130 This can be used to replace datetime.utcfromtimestamp(), which is now 131 deprecated. Most all code should migrate to use timezone-aware times 132 instead of relying on this. 133 """ 134 135 return datetime.datetime.fromtimestamp(timestamp, tz=datetime.UTC).replace( 136 tzinfo=None 137 )
Get a naive utc time from a timestamp.
This can be used to replace datetime.utcfromtimestamp(), which is now deprecated. Most all code should migrate to use timezone-aware times instead of relying on this.
140def utc_today() -> datetime.datetime: 141 """Get offset-aware midnight in the utc time zone.""" 142 now = datetime.datetime.now(datetime.UTC) 143 return datetime.datetime( 144 year=now.year, month=now.month, day=now.day, tzinfo=now.tzinfo 145 )
Get offset-aware midnight in the utc time zone.
148def utc_this_hour() -> datetime.datetime: 149 """Get offset-aware beginning of the current hour in the utc time zone.""" 150 now = datetime.datetime.now(datetime.UTC) 151 return datetime.datetime( 152 year=now.year, 153 month=now.month, 154 day=now.day, 155 hour=now.hour, 156 tzinfo=now.tzinfo, 157 )
Get offset-aware beginning of the current hour in the utc time zone.
160def utc_this_minute() -> datetime.datetime: 161 """Get offset-aware beginning of current minute in the utc time zone.""" 162 now = datetime.datetime.now(datetime.UTC) 163 return datetime.datetime( 164 year=now.year, 165 month=now.month, 166 day=now.day, 167 hour=now.hour, 168 minute=now.minute, 169 tzinfo=now.tzinfo, 170 )
Get offset-aware beginning of current minute in the utc time zone.
173def empty_weakref(objtype: type[T]) -> weakref.ref[T]: 174 """Return an invalidated weak-reference for the specified type.""" 175 # At runtime, all weakrefs are the same; our type arg is just 176 # for the static type checker. 177 del objtype # Unused. 178 179 # Just create an object and let it die. Is there a cleaner way to do this? 180 # return weakref.ref(_EmptyObj()) # type: ignore 181 182 # Sharing a single ones seems at least a bit better. 183 return _g_empty_weak_ref # type: ignore
Return an invalidated weak-reference for the specified type.
186def data_size_str(bytecount: int, compact: bool = False) -> str: 187 """Given a size in bytes, returns a short human readable string. 188 189 In compact mode this should be 6 or fewer chars for most all 190 sane file sizes. 191 """ 192 # pylint: disable=too-many-return-statements 193 194 # Special case: handle negatives. 195 if bytecount < 0: 196 val = data_size_str(-bytecount, compact=compact) 197 return f'-{val}' 198 199 if bytecount <= 999: 200 suffix = 'B' if compact else 'bytes' 201 return f'{bytecount} {suffix}' 202 kbytecount = bytecount / 1024 203 if round(kbytecount, 1) < 10.0: 204 return f'{kbytecount:.1f} KB' 205 if round(kbytecount, 0) < 999: 206 return f'{kbytecount:.0f} KB' 207 mbytecount = bytecount / (1024 * 1024) 208 if round(mbytecount, 1) < 10.0: 209 return f'{mbytecount:.1f} MB' 210 if round(mbytecount, 0) < 999: 211 return f'{mbytecount:.0f} MB' 212 gbytecount = bytecount / (1024 * 1024 * 1024) 213 if round(gbytecount, 1) < 10.0: 214 return f'{gbytecount:.1f} GB' 215 return f'{gbytecount:.0f} GB'
Given a size in bytes, returns a short human readable string.
In compact mode this should be 6 or fewer chars for most all sane file sizes.
218class DirtyBit: 219 """Manages whether a thing is dirty and regulates cleaning it. 220 221 To use, simply set the 'dirty' value on this object to True when 222 some update is needed, and then check the 'should_update' value to 223 regulate when the actual update should occur. Set 'dirty' back to 224 False after a successful update. 225 226 If 'use_lock' is True, an asyncio Lock will be created and 227 incorporated into update attempts to prevent simultaneous updates 228 (should_update will only return True when the lock is unlocked). 229 Note that It is up to the user to lock/unlock the lock during the 230 actual update attempt. 231 232 If a value is passed for 'auto_dirty_seconds', the dirtybit will 233 flip itself back to dirty after being clean for the given amount of 234 time. 235 236 'min_update_interval' can be used to enforce a minimum update 237 interval even when updates are successful (retry_interval only 238 applies when updates fail) 239 """ 240 241 def __init__( 242 self, 243 dirty: bool = False, 244 retry_interval: float = 5.0, 245 *, 246 use_lock: bool = False, 247 auto_dirty_seconds: float | None = None, 248 min_update_interval: float | None = None, 249 ): 250 curtime = time.monotonic() 251 self._retry_interval = retry_interval 252 self._auto_dirty_seconds = auto_dirty_seconds 253 self._min_update_interval = min_update_interval 254 self._dirty = dirty 255 self._next_update_time: float | None = curtime if dirty else None 256 self._last_update_time: float | None = None 257 self._next_auto_dirty_time: float | None = ( 258 (curtime + self._auto_dirty_seconds) 259 if (not dirty and self._auto_dirty_seconds is not None) 260 else None 261 ) 262 self._use_lock = use_lock 263 self.lock: asyncio.Lock 264 if self._use_lock: 265 import asyncio 266 267 self.lock = asyncio.Lock() 268 269 @property 270 def dirty(self) -> bool: 271 """Whether the target is currently dirty. 272 273 This should be set to False once an update is successful. 274 """ 275 return self._dirty 276 277 @dirty.setter 278 def dirty(self, value: bool) -> None: 279 # If we're freshly clean, set our next auto-dirty time (if we have 280 # one). 281 if self._dirty and not value and self._auto_dirty_seconds is not None: 282 self._next_auto_dirty_time = ( 283 time.monotonic() + self._auto_dirty_seconds 284 ) 285 286 # If we're freshly dirty, schedule an immediate update. 287 if not self._dirty and value: 288 self._next_update_time = time.monotonic() 289 290 # If they want to enforce a minimum update interval, 291 # push out the next update time if it hasn't been long enough. 292 if ( 293 self._min_update_interval is not None 294 and self._last_update_time is not None 295 ): 296 self._next_update_time = max( 297 self._next_update_time, 298 self._last_update_time + self._min_update_interval, 299 ) 300 301 self._dirty = value 302 303 @property 304 def should_update(self) -> bool: 305 """Whether an attempt should be made to clean the target now. 306 307 Always returns False if the target is not dirty. 308 Takes into account the amount of time passed since the target 309 was marked dirty or since should_update last returned True. 310 """ 311 curtime = time.monotonic() 312 313 # Auto-dirty ourself if we're into that. 314 if ( 315 self._next_auto_dirty_time is not None 316 and curtime > self._next_auto_dirty_time 317 ): 318 self.dirty = True 319 self._next_auto_dirty_time = None 320 if not self._dirty: 321 return False 322 if self._use_lock and self.lock.locked(): 323 return False 324 assert self._next_update_time is not None 325 if curtime > self._next_update_time: 326 self._next_update_time = curtime + self._retry_interval 327 self._last_update_time = curtime 328 return True 329 return False
Manages whether a thing is dirty and regulates cleaning it.
To use, simply set the 'dirty' value on this object to True when some update is needed, and then check the 'should_update' value to regulate when the actual update should occur. Set 'dirty' back to False after a successful update.
If 'use_lock' is True, an asyncio Lock will be created and incorporated into update attempts to prevent simultaneous updates (should_update will only return True when the lock is unlocked). Note that It is up to the user to lock/unlock the lock during the actual update attempt.
If a value is passed for 'auto_dirty_seconds', the dirtybit will flip itself back to dirty after being clean for the given amount of time.
'min_update_interval' can be used to enforce a minimum update interval even when updates are successful (retry_interval only applies when updates fail)
241 def __init__( 242 self, 243 dirty: bool = False, 244 retry_interval: float = 5.0, 245 *, 246 use_lock: bool = False, 247 auto_dirty_seconds: float | None = None, 248 min_update_interval: float | None = None, 249 ): 250 curtime = time.monotonic() 251 self._retry_interval = retry_interval 252 self._auto_dirty_seconds = auto_dirty_seconds 253 self._min_update_interval = min_update_interval 254 self._dirty = dirty 255 self._next_update_time: float | None = curtime if dirty else None 256 self._last_update_time: float | None = None 257 self._next_auto_dirty_time: float | None = ( 258 (curtime + self._auto_dirty_seconds) 259 if (not dirty and self._auto_dirty_seconds is not None) 260 else None 261 ) 262 self._use_lock = use_lock 263 self.lock: asyncio.Lock 264 if self._use_lock: 265 import asyncio 266 267 self.lock = asyncio.Lock()
269 @property 270 def dirty(self) -> bool: 271 """Whether the target is currently dirty. 272 273 This should be set to False once an update is successful. 274 """ 275 return self._dirty
Whether the target is currently dirty.
This should be set to False once an update is successful.
303 @property 304 def should_update(self) -> bool: 305 """Whether an attempt should be made to clean the target now. 306 307 Always returns False if the target is not dirty. 308 Takes into account the amount of time passed since the target 309 was marked dirty or since should_update last returned True. 310 """ 311 curtime = time.monotonic() 312 313 # Auto-dirty ourself if we're into that. 314 if ( 315 self._next_auto_dirty_time is not None 316 and curtime > self._next_auto_dirty_time 317 ): 318 self.dirty = True 319 self._next_auto_dirty_time = None 320 if not self._dirty: 321 return False 322 if self._use_lock and self.lock.locked(): 323 return False 324 assert self._next_update_time is not None 325 if curtime > self._next_update_time: 326 self._next_update_time = curtime + self._retry_interval 327 self._last_update_time = curtime 328 return True 329 return False
Whether an attempt should be made to clean the target now.
Always returns False if the target is not dirty. Takes into account the amount of time passed since the target was marked dirty or since should_update last returned True.
332class DispatchMethodWrapper(Generic[ArgT, RetT]): 333 """Type-aware standin for the dispatch func returned by dispatchmethod.""" 334 335 def __call__(self, arg: ArgT) -> RetT: 336 raise RuntimeError('Should not get here') 337 338 @staticmethod 339 def register( 340 func: Callable[[Any, Any], RetT] 341 ) -> Callable[[Any, Any], RetT]: 342 """Register a new dispatch handler for this dispatch-method.""" 343 raise RuntimeError('Should not get here') 344 345 registry: dict[Any, Callable]
Type-aware standin for the dispatch func returned by dispatchmethod.
338 @staticmethod 339 def register( 340 func: Callable[[Any, Any], RetT] 341 ) -> Callable[[Any, Any], RetT]: 342 """Register a new dispatch handler for this dispatch-method.""" 343 raise RuntimeError('Should not get here')
Register a new dispatch handler for this dispatch-method.
349def dispatchmethod( 350 func: Callable[[Any, ArgT], RetT] 351) -> DispatchMethodWrapper[ArgT, RetT]: 352 """A variation of functools.singledispatch for methods. 353 354 Note: as of Python 3.9 there is now functools.singledispatchmethod, 355 but it currently (as of Jan 2021) is not type-aware (at least in mypy), 356 which gives us a reason to keep this one around for now. 357 """ 358 from functools import singledispatch, update_wrapper 359 360 origwrapper: Any = singledispatch(func) 361 362 # Pull this out so hopefully origwrapper can die, 363 # otherwise we reference origwrapper in our wrapper. 364 dispatch = origwrapper.dispatch 365 366 # All we do here is recreate the end of functools.singledispatch 367 # where it returns a wrapper except instead of the wrapper using the 368 # first arg to the function ours uses the second (to skip 'self'). 369 # This was made against Python 3.7; we should probably check up on 370 # this in later versions in case anything has changed. 371 # (or hopefully they'll add this functionality to their version) 372 # NOTE: sounds like we can use functools singledispatchmethod in 3.8 373 def wrapper(*args: Any, **kw: Any) -> Any: 374 if not args or len(args) < 2: 375 raise TypeError( 376 f'{funcname} requires at least ' '2 positional arguments' 377 ) 378 379 return dispatch(args[1].__class__)(*args, **kw) 380 381 funcname = getattr(func, '__name__', 'dispatchmethod method') 382 wrapper.register = origwrapper.register # type: ignore 383 wrapper.dispatch = dispatch # type: ignore 384 wrapper.registry = origwrapper.registry # type: ignore 385 # pylint: disable=protected-access 386 wrapper._clear_cache = origwrapper._clear_cache # type: ignore 387 update_wrapper(wrapper, func) 388 # pylint: enable=protected-access 389 return cast(DispatchMethodWrapper, wrapper)
A variation of functools.singledispatch for methods.
Note: as of Python 3.9 there is now functools.singledispatchmethod, but it currently (as of Jan 2021) is not type-aware (at least in mypy), which gives us a reason to keep this one around for now.
392def valuedispatch(call: Callable[[ValT], RetT]) -> ValueDispatcher[ValT, RetT]: 393 """Decorator for functions to allow dispatching based on a value. 394 395 This differs from functools.singledispatch in that it dispatches based 396 on the value of an argument, not based on its type. 397 The 'register' method of a value-dispatch function can be used 398 to assign new functions to handle particular values. 399 Unhandled values wind up in the original dispatch function.""" 400 return ValueDispatcher(call)
Decorator for functions to allow dispatching based on a value.
This differs from functools.singledispatch in that it dispatches based on the value of an argument, not based on its type. The 'register' method of a value-dispatch function can be used to assign new functions to handle particular values. Unhandled values wind up in the original dispatch function.
403class ValueDispatcher(Generic[ValT, RetT]): 404 """Used by the valuedispatch decorator""" 405 406 def __init__(self, call: Callable[[ValT], RetT]) -> None: 407 self._base_call = call 408 self._handlers: dict[ValT, Callable[[], RetT]] = {} 409 410 def __call__(self, value: ValT) -> RetT: 411 handler = self._handlers.get(value) 412 if handler is not None: 413 return handler() 414 return self._base_call(value) 415 416 def _add_handler( 417 self, value: ValT, call: Callable[[], RetT] 418 ) -> Callable[[], RetT]: 419 if value in self._handlers: 420 raise RuntimeError(f'Duplicate handlers added for {value}') 421 self._handlers[value] = call 422 return call 423 424 def register( 425 self, value: ValT 426 ) -> Callable[[Callable[[], RetT]], Callable[[], RetT]]: 427 """Add a handler to the dispatcher.""" 428 from functools import partial 429 430 return partial(self._add_handler, value)
Used by the valuedispatch decorator
424 def register( 425 self, value: ValT 426 ) -> Callable[[Callable[[], RetT]], Callable[[], RetT]]: 427 """Add a handler to the dispatcher.""" 428 from functools import partial 429 430 return partial(self._add_handler, value)
Add a handler to the dispatcher.
433def valuedispatch1arg( 434 call: Callable[[ValT, ArgT], RetT] 435) -> ValueDispatcher1Arg[ValT, ArgT, RetT]: 436 """Like valuedispatch but for functions taking an extra argument.""" 437 return ValueDispatcher1Arg(call)
Like valuedispatch but for functions taking an extra argument.
440class ValueDispatcher1Arg(Generic[ValT, ArgT, RetT]): 441 """Used by the valuedispatch1arg decorator""" 442 443 def __init__(self, call: Callable[[ValT, ArgT], RetT]) -> None: 444 self._base_call = call 445 self._handlers: dict[ValT, Callable[[ArgT], RetT]] = {} 446 447 def __call__(self, value: ValT, arg: ArgT) -> RetT: 448 handler = self._handlers.get(value) 449 if handler is not None: 450 return handler(arg) 451 return self._base_call(value, arg) 452 453 def _add_handler( 454 self, value: ValT, call: Callable[[ArgT], RetT] 455 ) -> Callable[[ArgT], RetT]: 456 if value in self._handlers: 457 raise RuntimeError(f'Duplicate handlers added for {value}') 458 self._handlers[value] = call 459 return call 460 461 def register( 462 self, value: ValT 463 ) -> Callable[[Callable[[ArgT], RetT]], Callable[[ArgT], RetT]]: 464 """Add a handler to the dispatcher.""" 465 from functools import partial 466 467 return partial(self._add_handler, value)
Used by the valuedispatch1arg decorator
461 def register( 462 self, value: ValT 463 ) -> Callable[[Callable[[ArgT], RetT]], Callable[[ArgT], RetT]]: 464 """Add a handler to the dispatcher.""" 465 from functools import partial 466 467 return partial(self._add_handler, value)
Add a handler to the dispatcher.
484def valuedispatchmethod( 485 call: Callable[[SelfT, ValT], RetT] 486) -> ValueDispatcherMethod[ValT, RetT]: 487 """Like valuedispatch but works with methods instead of functions.""" 488 489 # NOTE: It seems that to wrap a method with a decorator and have self 490 # dispatching do the right thing, we must return a function and not 491 # an executable object. So for this version we store our data here 492 # in the function call dict and simply return a call. 493 494 _base_call = call 495 _handlers: dict[ValT, Callable[[SelfT], RetT]] = {} 496 497 def _add_handler(value: ValT, addcall: Callable[[SelfT], RetT]) -> None: 498 if value in _handlers: 499 raise RuntimeError(f'Duplicate handlers added for {value}') 500 _handlers[value] = addcall 501 502 def _register(value: ValT) -> Callable[[Callable[[SelfT], RetT]], None]: 503 from functools import partial 504 505 return partial(_add_handler, value) 506 507 def _call_wrapper(self: SelfT, value: ValT) -> RetT: 508 handler = _handlers.get(value) 509 if handler is not None: 510 return handler(self) 511 return _base_call(self, value) 512 513 # We still want to use our returned object to register handlers, but we're 514 # actually just returning a function. So manually stuff the call onto it. 515 setattr(_call_wrapper, 'register', _register) 516 517 # To the type checker's eyes we return a ValueDispatchMethod instance; 518 # this lets it know about our register func and type-check its usage. 519 # In reality we just return a raw function call (for reasons listed above). 520 # pylint: disable=undefined-variable, no-else-return 521 if TYPE_CHECKING: 522 return ValueDispatcherMethod[ValT, RetT]() 523 else: 524 return _call_wrapper
Like valuedispatch but works with methods instead of functions.
527def make_hash(obj: Any) -> int: 528 """Makes a hash from a dictionary, list, tuple or set to any level, 529 that contains only other hashable types (including any lists, tuples, 530 sets, and dictionaries). 531 532 Note that this uses Python's hash() function internally so collisions/etc. 533 may be more common than with fancy cryptographic hashes. 534 535 Also be aware that Python's hash() output varies across processes, so 536 this should only be used for values that will remain in a single process. 537 """ 538 import copy 539 540 if isinstance(obj, (set, tuple, list)): 541 return hash(tuple(make_hash(e) for e in obj)) 542 if not isinstance(obj, dict): 543 return hash(obj) 544 545 new_obj = copy.deepcopy(obj) 546 for k, v in new_obj.items(): 547 new_obj[k] = make_hash(v) 548 549 # NOTE: there is sorted works correctly because it compares only 550 # unique first values (i.e. dict keys) 551 return hash(tuple(frozenset(sorted(new_obj.items()))))
Makes a hash from a dictionary, list, tuple or set to any level, that contains only other hashable types (including any lists, tuples, sets, and dictionaries).
Note that this uses Python's hash() function internally so collisions/etc. may be more common than with fancy cryptographic hashes.
Also be aware that Python's hash() output varies across processes, so this should only be used for values that will remain in a single process.
554def float_hash_from_string(s: str) -> float: 555 """Given a string value, returns a float between 0 and 1. 556 557 If consistent across processes. Can be useful for assigning db ids 558 shard values for efficient parallel processing. 559 """ 560 import hashlib 561 562 hash_bytes = hashlib.md5(s.encode()).digest() 563 564 # Generate a random 64 bit int from hash digest bytes. 565 ival = int.from_bytes(hash_bytes[:8]) 566 return ival / ((1 << 64) - 1)
Given a string value, returns a float between 0 and 1.
If consistent across processes. Can be useful for assigning db ids shard values for efficient parallel processing.
569def asserttype(obj: Any, typ: type[T]) -> T: 570 """Return an object typed as a given type. 571 572 Assert is used to check its actual type, so only use this when 573 failures are not expected. Otherwise use checktype. 574 """ 575 assert isinstance(typ, type), 'only actual types accepted' 576 assert isinstance(obj, typ) 577 return obj
Return an object typed as a given type.
Assert is used to check its actual type, so only use this when failures are not expected. Otherwise use checktype.
580def asserttype_o(obj: Any, typ: type[T]) -> T | None: 581 """Return an object typed as a given optional type. 582 583 Assert is used to check its actual type, so only use this when 584 failures are not expected. Otherwise use checktype. 585 """ 586 assert isinstance(typ, type), 'only actual types accepted' 587 assert isinstance(obj, (typ, type(None))) 588 return obj
Return an object typed as a given optional type.
Assert is used to check its actual type, so only use this when failures are not expected. Otherwise use checktype.
591def checktype(obj: Any, typ: type[T]) -> T: 592 """Return an object typed as a given type. 593 594 Always checks the type at runtime with isinstance and throws a TypeError 595 on failure. Use asserttype for more efficient (but less safe) equivalent. 596 """ 597 assert isinstance(typ, type), 'only actual types accepted' 598 if not isinstance(obj, typ): 599 raise TypeError(f'Expected a {typ}; got a {type(obj)}.') 600 return obj
Return an object typed as a given type.
Always checks the type at runtime with isinstance and throws a TypeError on failure. Use asserttype for more efficient (but less safe) equivalent.
603def checktype_o(obj: Any, typ: type[T]) -> T | None: 604 """Return an object typed as a given optional type. 605 606 Always checks the type at runtime with isinstance and throws a TypeError 607 on failure. Use asserttype for more efficient (but less safe) equivalent. 608 """ 609 assert isinstance(typ, type), 'only actual types accepted' 610 if not isinstance(obj, (typ, type(None))): 611 raise TypeError(f'Expected a {typ} or None; got a {type(obj)}.') 612 return obj
Return an object typed as a given optional type.
Always checks the type at runtime with isinstance and throws a TypeError on failure. Use asserttype for more efficient (but less safe) equivalent.
615def warntype(obj: Any, typ: type[T]) -> T: 616 """Return an object typed as a given type. 617 618 Always checks the type at runtime and simply logs a warning if it is 619 not what is expected. 620 """ 621 assert isinstance(typ, type), 'only actual types accepted' 622 if not isinstance(obj, typ): 623 import logging 624 625 logging.warning('warntype: expected a %s, got a %s', typ, type(obj)) 626 return obj # type: ignore
Return an object typed as a given type.
Always checks the type at runtime and simply logs a warning if it is not what is expected.
629def warntype_o(obj: Any, typ: type[T]) -> T | None: 630 """Return an object typed as a given type. 631 632 Always checks the type at runtime and simply logs a warning if it is 633 not what is expected. 634 """ 635 assert isinstance(typ, type), 'only actual types accepted' 636 if not isinstance(obj, (typ, type(None))): 637 import logging 638 639 logging.warning( 640 'warntype: expected a %s or None, got a %s', typ, type(obj) 641 ) 642 return obj # type: ignore
Return an object typed as a given type.
Always checks the type at runtime and simply logs a warning if it is not what is expected.
645def assert_non_optional(obj: T | None) -> T: 646 """Return an object with Optional typing removed. 647 648 Assert is used to check its actual type, so only use this when 649 failures are not expected. Use check_non_optional otherwise. 650 """ 651 assert obj is not None 652 return obj
Return an object with Optional typing removed.
Assert is used to check its actual type, so only use this when failures are not expected. Use check_non_optional otherwise.
655def check_non_optional(obj: T | None) -> T: 656 """Return an object with Optional typing removed. 657 658 Always checks the actual type and throws a TypeError on failure. 659 Use assert_non_optional for a more efficient (but less safe) equivalent. 660 """ 661 if obj is None: 662 raise ValueError('Got None value in check_non_optional.') 663 return obj
Return an object with Optional typing removed.
Always checks the actual type and throws a TypeError on failure. Use assert_non_optional for a more efficient (but less safe) equivalent.
666def smoothstep(edge0: float, edge1: float, x: float) -> float: 667 """A smooth transition function. 668 669 Returns a value that smoothly moves from 0 to 1 as we go between edges. 670 Values outside of the range return 0 or 1. 671 """ 672 y = min(1.0, max(0.0, (x - edge0) / (edge1 - edge0))) 673 return y * y * (3.0 - 2.0 * y)
A smooth transition function.
Returns a value that smoothly moves from 0 to 1 as we go between edges. Values outside of the range return 0 or 1.
676def linearstep(edge0: float, edge1: float, x: float) -> float: 677 """A linear transition function. 678 679 Returns a value that linearly moves from 0 to 1 as we go between edges. 680 Values outside of the range return 0 or 1. 681 """ 682 return max(0.0, min(1.0, (x - edge0) / (edge1 - edge0)))
A linear transition function.
Returns a value that linearly moves from 0 to 1 as we go between edges. Values outside of the range return 0 or 1.
701def human_readable_compact_id(num: int) -> str: 702 """Given a positive int, return a compact string representation for it. 703 704 Handy for visualizing unique numeric ids using as few as possible chars. 705 This representation uses only lowercase letters and numbers (minus the 706 following letters for readability): 707 's' is excluded due to similarity to '5'. 708 'l' is excluded due to similarity to '1'. 709 'i' is excluded due to similarity to '1'. 710 'o' is excluded due to similarity to '0'. 711 'z' is excluded due to similarity to '2'. 712 713 Therefore for n chars this can store values of 21^n. 714 715 When reading human input consisting of these IDs, it may be desirable 716 to map the disallowed chars to their corresponding allowed ones 717 ('o' -> '0', etc). 718 719 Sort order for these ids is the same as the original numbers. 720 721 If more compactness is desired at the expense of readability, 722 use compact_id() instead. 723 """ 724 return _compact_id(num, '0123456789abcdefghjkmnpqrtuvwxy')
Given a positive int, return a compact string representation for it.
Handy for visualizing unique numeric ids using as few as possible chars. This representation uses only lowercase letters and numbers (minus the following letters for readability): 's' is excluded due to similarity to '5'. 'l' is excluded due to similarity to '1'. 'i' is excluded due to similarity to '1'. 'o' is excluded due to similarity to '0'. 'z' is excluded due to similarity to '2'.
Therefore for n chars this can store values of 21^n.
When reading human input consisting of these IDs, it may be desirable to map the disallowed chars to their corresponding allowed ones ('o' -> '0', etc).
Sort order for these ids is the same as the original numbers.
If more compactness is desired at the expense of readability, use compact_id() instead.
727def compact_id(num: int) -> str: 728 """Given a positive int, return a compact string representation for it. 729 730 Handy for visualizing unique numeric ids using as few as possible chars. 731 This version is more compact than human_readable_compact_id() but less 732 friendly to humans due to using both capital and lowercase letters, 733 both 'O' and '0', etc. 734 735 Therefore for n chars this can store values of 62^n. 736 737 Sort order for these ids is the same as the original numbers. 738 """ 739 return _compact_id( 740 num, '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz' 741 )
Given a positive int, return a compact string representation for it.
Handy for visualizing unique numeric ids using as few as possible chars. This version is more compact than human_readable_compact_id() but less friendly to humans due to using both capital and lowercase letters, both 'O' and '0', etc.
Therefore for n chars this can store values of 62^n.
Sort order for these ids is the same as the original numbers.
744def caller_source_location() -> str: 745 """Returns source file name and line of the code calling us. 746 747 Example: 'mymodule.py:23' 748 """ 749 try: 750 import inspect 751 752 frame = inspect.currentframe() 753 for _i in range(2): 754 if frame is None: 755 raise RuntimeError() 756 frame = frame.f_back 757 if frame is None: 758 raise RuntimeError() 759 fname = os.path.basename(frame.f_code.co_filename) 760 return f'{fname}:{frame.f_lineno}' 761 except Exception: 762 return '<unknown source location>'
Returns source file name and line of the code calling us.
Example: 'mymodule.py:23'
765def unchanging_hostname() -> str: 766 """Return an unchanging name for the local device. 767 768 Similar to the `hostname` call (or os.uname().nodename in Python) 769 except attempts to give a name that doesn't change depending on 770 network conditions. (A Mac will tend to go from Foo to Foo.local, 771 Foo.lan etc. throughout its various adventures) 772 """ 773 import platform 774 import subprocess 775 776 # On Mac, this should give the computer name assigned in System Prefs. 777 if platform.system() == 'Darwin': 778 return ( 779 subprocess.run( 780 ['scutil', '--get', 'ComputerName'], 781 check=True, 782 capture_output=True, 783 ) 784 .stdout.decode() 785 .strip() 786 .replace(' ', '-') 787 ) 788 return os.uname().nodename
Return an unchanging name for the local device.
Similar to the hostname
call (or os.uname().nodename in Python)
except attempts to give a name that doesn't change depending on
network conditions. (A Mac will tend to go from Foo to Foo.local,
Foo.lan etc. throughout its various adventures)
791def set_canonical_module_names(module_globals: dict[str, Any]) -> None: 792 """Do the thing.""" 793 if os.environ.get('EFRO_SUPPRESS_SET_CANONICAL_MODULE_NAMES') == '1': 794 return 795 796 modulename = module_globals.get('__name__') 797 if not isinstance(modulename, str): 798 raise RuntimeError('Unable to get module name.') 799 assert not modulename.startswith('_') 800 modulename_prefix = f'{modulename}.' 801 modulename_prefix_2 = f'_{modulename}.' 802 803 for name, obj in module_globals.items(): 804 if name.startswith('_'): 805 continue 806 existing = getattr(obj, '__module__', None) 807 try: 808 # Override the module ONLY if it lives under us somewhere. 809 # So ourpackage._submodule.Foo becomes ourpackage.Foo 810 # but otherpackage._submodule.Foo remains untouched. 811 if existing is not None and ( 812 existing.startswith(modulename_prefix) 813 or existing.startswith(modulename_prefix_2) 814 ): 815 obj.__module__ = modulename 816 except Exception: 817 import logging 818 819 logging.warning( 820 'set_canonical_module_names: unable to change __module__' 821 " from '%s' to '%s' on %s object at '%s'.", 822 existing, 823 modulename, 824 type(obj), 825 name, 826 )
Do the thing.
829def timedelta_str( 830 timeval: datetime.timedelta | float, maxparts: int = 2, decimals: int = 0 831) -> str: 832 """Return a simple human readable time string for a length of time. 833 834 Time can be given as a timedelta or a float representing seconds. 835 Example output: 836 "23d 1h 2m 32s" (with maxparts == 4) 837 "23d 1h" (with maxparts == 2) 838 "23d 1.08h" (with maxparts == 2 and decimals == 2) 839 840 Note that this is hard-coded in English and probably not especially 841 performant. 842 """ 843 # pylint: disable=too-many-locals 844 845 if isinstance(timeval, float): 846 timevalfin = datetime.timedelta(seconds=timeval) 847 else: 848 timevalfin = timeval 849 850 # Internally we only handle positive values. 851 if timevalfin.total_seconds() < 0: 852 return f'-{timedelta_str(timeval=-timeval, maxparts=maxparts)}' 853 854 years = timevalfin.days // 365 855 days = timevalfin.days % 365 856 hours = timevalfin.seconds // 3600 857 hour_remainder = timevalfin.seconds % 3600 858 minutes = hour_remainder // 60 859 seconds = hour_remainder % 60 860 861 # Now, if we want decimal places for our last value, 862 # calc fractional parts. 863 if decimals: 864 # Calc totals of each type. 865 t_seconds = timevalfin.total_seconds() 866 t_minutes = t_seconds / 60 867 t_hours = t_minutes / 60 868 t_days = t_hours / 24 869 t_years = t_days / 365 870 871 # Calc fractional parts that exclude all whole values to their left. 872 years_covered = years 873 years_f = t_years - years_covered 874 days_covered = years_covered * 365 + days 875 days_f = t_days - days_covered 876 hours_covered = days_covered * 24 + hours 877 hours_f = t_hours - hours_covered 878 minutes_covered = hours_covered * 60 + minutes 879 minutes_f = t_minutes - minutes_covered 880 seconds_covered = minutes_covered * 60 + seconds 881 seconds_f = t_seconds - seconds_covered 882 else: 883 years_f = days_f = hours_f = minutes_f = seconds_f = 0.0 884 885 parts: list[str] = [] 886 for part, part_f, suffix in ( 887 (years, years_f, 'y'), 888 (days, days_f, 'd'), 889 (hours, hours_f, 'h'), 890 (minutes, minutes_f, 'm'), 891 (seconds, seconds_f, 's'), 892 ): 893 if part or parts or (not parts and suffix == 's'): 894 # Do decimal version only for the last part. 895 if decimals and (len(parts) >= maxparts - 1 or suffix == 's'): 896 parts.append(f'{part+part_f:.{decimals}f}{suffix}') 897 else: 898 parts.append(f'{part}{suffix}') 899 if len(parts) >= maxparts: 900 break 901 return ' '.join(parts)
Return a simple human readable time string for a length of time.
Time can be given as a timedelta or a float representing seconds. Example output: "23d 1h 2m 32s" (with maxparts == 4) "23d 1h" (with maxparts == 2) "23d 1.08h" (with maxparts == 2 and decimals == 2)
Note that this is hard-coded in English and probably not especially performant.
904def ago_str( 905 timeval: datetime.datetime, 906 maxparts: int = 1, 907 now: datetime.datetime | None = None, 908 decimals: int = 0, 909) -> str: 910 """Given a datetime, return a clean human readable 'ago' str. 911 912 Note that this is hard-coded in English so should not be used 913 for visible in-game elements; only tools/etc. 914 915 If now is not passed, efro.util.utc_now() is used. 916 """ 917 if now is None: 918 now = utc_now() 919 return ( 920 timedelta_str(now - timeval, maxparts=maxparts, decimals=decimals) 921 + ' ago' 922 )
Given a datetime, return a clean human readable 'ago' str.
Note that this is hard-coded in English so should not be used for visible in-game elements; only tools/etc.
If now is not passed, efro.util.utc_now() is used.
925def split_list(input_list: list[T], max_length: int) -> list[list[T]]: 926 """Split a single list into smaller lists.""" 927 return [ 928 input_list[i : i + max_length] 929 for i in range(0, len(input_list), max_length) 930 ]
Split a single list into smaller lists.
933def extract_flag(args: list[str], name: str) -> bool: 934 """Given a list of args and a flag name, returns whether it is present. 935 936 The arg flag, if present, is removed from the arg list. 937 """ 938 from efro.error import CleanError 939 940 count = args.count(name) 941 if count > 1: 942 raise CleanError(f'Flag {name} passed multiple times.') 943 if not count: 944 return False 945 args.remove(name) 946 return True
Given a list of args and a flag name, returns whether it is present.
The arg flag, if present, is removed from the arg list.
959def extract_arg( 960 args: list[str], name: str, required: bool = False 961) -> str | None: 962 """Given a list of args and an arg name, returns a value. 963 964 The arg flag and value are removed from the arg list. 965 raises CleanErrors on any problems. 966 """ 967 from efro.error import CleanError 968 969 count = args.count(name) 970 if not count: 971 if required: 972 raise CleanError(f'Required argument {name} not passed.') 973 return None 974 975 if count > 1: 976 raise CleanError(f'Arg {name} passed multiple times.') 977 978 argindex = args.index(name) 979 if argindex + 1 >= len(args): 980 raise CleanError(f'No value passed after {name} arg.') 981 982 val = args[argindex + 1] 983 del args[argindex : argindex + 2] 984 985 return val
Given a list of args and an arg name, returns a value.
The arg flag and value are removed from the arg list. raises CleanErrors on any problems.