efro.util
Small handy bits of functionality.
1# Released under the MIT License. See LICENSE for details. 2# 3"""Small handy bits of functionality.""" 4 5from __future__ import annotations 6 7import os 8import time 9import weakref 10import datetime 11from enum import Enum 12from typing import TYPE_CHECKING, cast, TypeVar, Generic, overload 13 14if TYPE_CHECKING: 15 import asyncio 16 from typing import Any, Callable, Literal 17 18T = TypeVar('T') 19ValT = TypeVar('ValT') 20ArgT = TypeVar('ArgT') 21SelfT = TypeVar('SelfT') 22RetT = TypeVar('RetT') 23EnumT = TypeVar('EnumT', bound=Enum) 24 25 26class _EmptyObj: 27 pass 28 29 30# A dead weak-ref should be immutable, right? So we can create exactly 31# one and return it for all cases that need an empty weak-ref. 32_g_empty_weak_ref = weakref.ref(_EmptyObj()) 33assert _g_empty_weak_ref() is None 34 35 36def explicit_bool(val: bool) -> bool: 37 """Return a non-inferable boolean value. 38 39 Useful to be able to disable blocks of code without type checkers 40 complaining/etc. 41 """ 42 # pylint: disable=no-else-return 43 if TYPE_CHECKING: 44 # infer this! <boom> 45 import random 46 47 return random.random() < 0.5 48 else: 49 return val 50 51 52def snake_case_to_title(val: str) -> str: 53 """Given a snake-case string 'foo_bar', returns 'Foo Bar'.""" 54 # Kill empty words resulting from leading/trailing/multiple underscores. 55 return ' '.join(w for w in val.split('_') if w).title() 56 57 58def snake_case_to_camel_case(val: str) -> str: 59 """Given a snake-case string 'foo_bar', returns camel-case 'FooBar'.""" 60 # Replace underscores with spaces; capitalize words; kill spaces. 61 # Not sure about efficiency, but logically simple. 62 return val.replace('_', ' ').title().replace(' ', '') 63 64 65def enum_by_value(cls: type[EnumT], value: Any) -> EnumT: 66 """Create an enum from a value. 
67 68 This is basically the same as doing 'obj = EnumType(value)' except 69 that it works around an issue where a reference loop is created 70 if an exception is thrown due to an invalid value. Since we disable 71 the cyclic garbage collector for most of the time, such loops can lead 72 to our objects sticking around longer than we want. 73 This issue has been submitted to Python as a bug so hopefully we can 74 remove this eventually if it gets fixed: https://bugs.python.org/issue42248 75 UPDATE: This has been fixed as of later 3.8 builds, so we can kill this 76 off once we are 3.9+ across the board. 77 """ 78 79 # Note: we don't recreate *ALL* the functionality of the Enum constructor 80 # such as the _missing_ hook; but this should cover our basic needs. 81 value2member_map = getattr(cls, '_value2member_map_') 82 assert value2member_map is not None 83 try: 84 out = value2member_map[value] 85 assert isinstance(out, cls) 86 return out 87 except KeyError: 88 # pylint: disable=consider-using-f-string 89 raise ValueError( 90 '%r is not a valid %s' % (value, cls.__name__) 91 ) from None 92 93 94def check_utc(value: datetime.datetime) -> None: 95 """Ensure a datetime value is timezone-aware utc.""" 96 if value.tzinfo is not datetime.UTC: 97 raise ValueError( 98 'datetime value does not have timezone set as datetime.UTC' 99 ) 100 101 102def utc_now() -> datetime.datetime: 103 """Get timezone-aware current utc time. 104 105 Just a shortcut for datetime.datetime.now(datetime.UTC). 106 Avoid datetime.datetime.utcnow() which is deprecated and gives naive 107 times. 108 """ 109 return datetime.datetime.now(datetime.UTC) 110 111 112def utc_now_naive() -> datetime.datetime: 113 """Get naive utc time. 114 115 This can be used to replace datetime.utcnow(), which is now deprecated. 116 Most all code should migrate to use timezone-aware times instead of 117 this. 
118 """ 119 return datetime.datetime.now(datetime.UTC).replace(tzinfo=None) 120 121 122def utc_today() -> datetime.datetime: 123 """Get offset-aware midnight in the utc time zone.""" 124 now = datetime.datetime.now(datetime.UTC) 125 return datetime.datetime( 126 year=now.year, month=now.month, day=now.day, tzinfo=now.tzinfo 127 ) 128 129 130def utc_this_hour() -> datetime.datetime: 131 """Get offset-aware beginning of the current hour in the utc time zone.""" 132 now = datetime.datetime.now(datetime.UTC) 133 return datetime.datetime( 134 year=now.year, 135 month=now.month, 136 day=now.day, 137 hour=now.hour, 138 tzinfo=now.tzinfo, 139 ) 140 141 142def utc_this_minute() -> datetime.datetime: 143 """Get offset-aware beginning of current minute in the utc time zone.""" 144 now = datetime.datetime.now(datetime.UTC) 145 return datetime.datetime( 146 year=now.year, 147 month=now.month, 148 day=now.day, 149 hour=now.hour, 150 minute=now.minute, 151 tzinfo=now.tzinfo, 152 ) 153 154 155def empty_weakref(objtype: type[T]) -> weakref.ref[T]: 156 """Return an invalidated weak-reference for the specified type.""" 157 # At runtime, all weakrefs are the same; our type arg is just 158 # for the static type checker. 159 del objtype # Unused. 160 161 # Just create an object and let it die. Is there a cleaner way to do this? 162 # return weakref.ref(_EmptyObj()) # type: ignore 163 164 # Sharing a single ones seems at least a bit better. 165 return _g_empty_weak_ref # type: ignore 166 167 168def data_size_str(bytecount: int, compact: bool = False) -> str: 169 """Given a size in bytes, returns a short human readable string. 170 171 In compact mode this should be 6 or fewer chars for most all 172 sane file sizes. 173 """ 174 # pylint: disable=too-many-return-statements 175 176 # Special case: handle negatives. 
177 if bytecount < 0: 178 val = data_size_str(-bytecount, compact=compact) 179 return f'-{val}' 180 181 if bytecount <= 999: 182 suffix = 'B' if compact else 'bytes' 183 return f'{bytecount} {suffix}' 184 kbytecount = bytecount / 1024 185 if round(kbytecount, 1) < 10.0: 186 return f'{kbytecount:.1f} KB' 187 if round(kbytecount, 0) < 999: 188 return f'{kbytecount:.0f} KB' 189 mbytecount = bytecount / (1024 * 1024) 190 if round(mbytecount, 1) < 10.0: 191 return f'{mbytecount:.1f} MB' 192 if round(mbytecount, 0) < 999: 193 return f'{mbytecount:.0f} MB' 194 gbytecount = bytecount / (1024 * 1024 * 1024) 195 if round(gbytecount, 1) < 10.0: 196 return f'{gbytecount:.1f} GB' 197 return f'{gbytecount:.0f} GB' 198 199 200class DirtyBit: 201 """Manages whether a thing is dirty and regulates attempts to clean it. 202 203 To use, simply set the 'dirty' value on this object to True when some 204 action is needed, and then check the 'should_update' value to regulate 205 when attempts to clean it should be made. Set 'dirty' back to False after 206 a successful update. 207 If 'use_lock' is True, an asyncio Lock will be created and incorporated 208 into update attempts to prevent simultaneous updates (should_update will 209 only return True when the lock is unlocked). Note that It is up to the user 210 to lock/unlock the lock during the actual update attempt. 211 If a value is passed for 'auto_dirty_seconds', the dirtybit will flip 212 itself back to dirty after being clean for the given amount of time. 
213 'min_update_interval' can be used to enforce a minimum update 214 interval even when updates are successful (retry_interval only applies 215 when updates fail) 216 """ 217 218 def __init__( 219 self, 220 dirty: bool = False, 221 retry_interval: float = 5.0, 222 use_lock: bool = False, 223 auto_dirty_seconds: float | None = None, 224 min_update_interval: float | None = None, 225 ): 226 curtime = time.monotonic() 227 self._retry_interval = retry_interval 228 self._auto_dirty_seconds = auto_dirty_seconds 229 self._min_update_interval = min_update_interval 230 self._dirty = dirty 231 self._next_update_time: float | None = curtime if dirty else None 232 self._last_update_time: float | None = None 233 self._next_auto_dirty_time: float | None = ( 234 (curtime + self._auto_dirty_seconds) 235 if (not dirty and self._auto_dirty_seconds is not None) 236 else None 237 ) 238 self._use_lock = use_lock 239 self.lock: asyncio.Lock 240 if self._use_lock: 241 import asyncio 242 243 self.lock = asyncio.Lock() 244 245 @property 246 def dirty(self) -> bool: 247 """Whether the target is currently dirty. 248 249 This should be set to False once an update is successful. 250 """ 251 return self._dirty 252 253 @dirty.setter 254 def dirty(self, value: bool) -> None: 255 # If we're freshly clean, set our next auto-dirty time (if we have 256 # one). 257 if self._dirty and not value and self._auto_dirty_seconds is not None: 258 self._next_auto_dirty_time = ( 259 time.monotonic() + self._auto_dirty_seconds 260 ) 261 262 # If we're freshly dirty, schedule an immediate update. 263 if not self._dirty and value: 264 self._next_update_time = time.monotonic() 265 266 # If they want to enforce a minimum update interval, 267 # push out the next update time if it hasn't been long enough. 
268 if ( 269 self._min_update_interval is not None 270 and self._last_update_time is not None 271 ): 272 self._next_update_time = max( 273 self._next_update_time, 274 self._last_update_time + self._min_update_interval, 275 ) 276 277 self._dirty = value 278 279 @property 280 def should_update(self) -> bool: 281 """Whether an attempt should be made to clean the target now. 282 283 Always returns False if the target is not dirty. 284 Takes into account the amount of time passed since the target 285 was marked dirty or since should_update last returned True. 286 """ 287 curtime = time.monotonic() 288 289 # Auto-dirty ourself if we're into that. 290 if ( 291 self._next_auto_dirty_time is not None 292 and curtime > self._next_auto_dirty_time 293 ): 294 self.dirty = True 295 self._next_auto_dirty_time = None 296 if not self._dirty: 297 return False 298 if self._use_lock and self.lock.locked(): 299 return False 300 assert self._next_update_time is not None 301 if curtime > self._next_update_time: 302 self._next_update_time = curtime + self._retry_interval 303 self._last_update_time = curtime 304 return True 305 return False 306 307 308class DispatchMethodWrapper(Generic[ArgT, RetT]): 309 """Type-aware standin for the dispatch func returned by dispatchmethod.""" 310 311 def __call__(self, arg: ArgT) -> RetT: 312 raise RuntimeError('Should not get here') 313 314 @staticmethod 315 def register( 316 func: Callable[[Any, Any], RetT] 317 ) -> Callable[[Any, Any], RetT]: 318 """Register a new dispatch handler for this dispatch-method.""" 319 raise RuntimeError('Should not get here') 320 321 registry: dict[Any, Callable] 322 323 324# noinspection PyProtectedMember,PyTypeHints 325def dispatchmethod( 326 func: Callable[[Any, ArgT], RetT] 327) -> DispatchMethodWrapper[ArgT, RetT]: 328 """A variation of functools.singledispatch for methods. 
329 330 Note: as of Python 3.9 there is now functools.singledispatchmethod, 331 but it currently (as of Jan 2021) is not type-aware (at least in mypy), 332 which gives us a reason to keep this one around for now. 333 """ 334 from functools import singledispatch, update_wrapper 335 336 origwrapper: Any = singledispatch(func) 337 338 # Pull this out so hopefully origwrapper can die, 339 # otherwise we reference origwrapper in our wrapper. 340 dispatch = origwrapper.dispatch 341 342 # All we do here is recreate the end of functools.singledispatch 343 # where it returns a wrapper except instead of the wrapper using the 344 # first arg to the function ours uses the second (to skip 'self'). 345 # This was made against Python 3.7; we should probably check up on 346 # this in later versions in case anything has changed. 347 # (or hopefully they'll add this functionality to their version) 348 # NOTE: sounds like we can use functools singledispatchmethod in 3.8 349 def wrapper(*args: Any, **kw: Any) -> Any: 350 if not args or len(args) < 2: 351 raise TypeError( 352 f'{funcname} requires at least ' '2 positional arguments' 353 ) 354 355 return dispatch(args[1].__class__)(*args, **kw) 356 357 funcname = getattr(func, '__name__', 'dispatchmethod method') 358 wrapper.register = origwrapper.register # type: ignore 359 wrapper.dispatch = dispatch # type: ignore 360 wrapper.registry = origwrapper.registry # type: ignore 361 # pylint: disable=protected-access 362 wrapper._clear_cache = origwrapper._clear_cache # type: ignore 363 update_wrapper(wrapper, func) 364 # pylint: enable=protected-access 365 return cast(DispatchMethodWrapper, wrapper) 366 367 368def valuedispatch(call: Callable[[ValT], RetT]) -> ValueDispatcher[ValT, RetT]: 369 """Decorator for functions to allow dispatching based on a value. 370 371 This differs from functools.singledispatch in that it dispatches based 372 on the value of an argument, not based on its type. 
373 The 'register' method of a value-dispatch function can be used 374 to assign new functions to handle particular values. 375 Unhandled values wind up in the original dispatch function.""" 376 return ValueDispatcher(call) 377 378 379class ValueDispatcher(Generic[ValT, RetT]): 380 """Used by the valuedispatch decorator""" 381 382 def __init__(self, call: Callable[[ValT], RetT]) -> None: 383 self._base_call = call 384 self._handlers: dict[ValT, Callable[[], RetT]] = {} 385 386 def __call__(self, value: ValT) -> RetT: 387 handler = self._handlers.get(value) 388 if handler is not None: 389 return handler() 390 return self._base_call(value) 391 392 def _add_handler( 393 self, value: ValT, call: Callable[[], RetT] 394 ) -> Callable[[], RetT]: 395 if value in self._handlers: 396 raise RuntimeError(f'Duplicate handlers added for {value}') 397 self._handlers[value] = call 398 return call 399 400 def register( 401 self, value: ValT 402 ) -> Callable[[Callable[[], RetT]], Callable[[], RetT]]: 403 """Add a handler to the dispatcher.""" 404 from functools import partial 405 406 return partial(self._add_handler, value) 407 408 409def valuedispatch1arg( 410 call: Callable[[ValT, ArgT], RetT] 411) -> ValueDispatcher1Arg[ValT, ArgT, RetT]: 412 """Like valuedispatch but for functions taking an extra argument.""" 413 return ValueDispatcher1Arg(call) 414 415 416class ValueDispatcher1Arg(Generic[ValT, ArgT, RetT]): 417 """Used by the valuedispatch1arg decorator""" 418 419 def __init__(self, call: Callable[[ValT, ArgT], RetT]) -> None: 420 self._base_call = call 421 self._handlers: dict[ValT, Callable[[ArgT], RetT]] = {} 422 423 def __call__(self, value: ValT, arg: ArgT) -> RetT: 424 handler = self._handlers.get(value) 425 if handler is not None: 426 return handler(arg) 427 return self._base_call(value, arg) 428 429 def _add_handler( 430 self, value: ValT, call: Callable[[ArgT], RetT] 431 ) -> Callable[[ArgT], RetT]: 432 if value in self._handlers: 433 raise RuntimeError(f'Duplicate 
handlers added for {value}') 434 self._handlers[value] = call 435 return call 436 437 def register( 438 self, value: ValT 439 ) -> Callable[[Callable[[ArgT], RetT]], Callable[[ArgT], RetT]]: 440 """Add a handler to the dispatcher.""" 441 from functools import partial 442 443 return partial(self._add_handler, value) 444 445 446if TYPE_CHECKING: 447 448 class ValueDispatcherMethod(Generic[ValT, RetT]): 449 """Used by the valuedispatchmethod decorator.""" 450 451 def __call__(self, value: ValT) -> RetT: ... 452 453 def register( 454 self, value: ValT 455 ) -> Callable[[Callable[[SelfT], RetT]], Callable[[SelfT], RetT]]: 456 """Add a handler to the dispatcher.""" 457 ... 458 459 460def valuedispatchmethod( 461 call: Callable[[SelfT, ValT], RetT] 462) -> ValueDispatcherMethod[ValT, RetT]: 463 """Like valuedispatch but works with methods instead of functions.""" 464 465 # NOTE: It seems that to wrap a method with a decorator and have self 466 # dispatching do the right thing, we must return a function and not 467 # an executable object. So for this version we store our data here 468 # in the function call dict and simply return a call. 469 470 _base_call = call 471 _handlers: dict[ValT, Callable[[SelfT], RetT]] = {} 472 473 def _add_handler(value: ValT, addcall: Callable[[SelfT], RetT]) -> None: 474 if value in _handlers: 475 raise RuntimeError(f'Duplicate handlers added for {value}') 476 _handlers[value] = addcall 477 478 def _register(value: ValT) -> Callable[[Callable[[SelfT], RetT]], None]: 479 from functools import partial 480 481 return partial(_add_handler, value) 482 483 def _call_wrapper(self: SelfT, value: ValT) -> RetT: 484 handler = _handlers.get(value) 485 if handler is not None: 486 return handler(self) 487 return _base_call(self, value) 488 489 # We still want to use our returned object to register handlers, but we're 490 # actually just returning a function. So manually stuff the call onto it. 
491 setattr(_call_wrapper, 'register', _register) 492 493 # To the type checker's eyes we return a ValueDispatchMethod instance; 494 # this lets it know about our register func and type-check its usage. 495 # In reality we just return a raw function call (for reasons listed above). 496 # pylint: disable=undefined-variable, no-else-return 497 if TYPE_CHECKING: 498 return ValueDispatcherMethod[ValT, RetT]() 499 else: 500 return _call_wrapper 501 502 503def make_hash(obj: Any) -> int: 504 """Makes a hash from a dictionary, list, tuple or set to any level, 505 that contains only other hashable types (including any lists, tuples, 506 sets, and dictionaries). 507 508 Note that this uses Python's hash() function internally so collisions/etc. 509 may be more common than with fancy cryptographic hashes. 510 511 Also be aware that Python's hash() output varies across processes, so 512 this should only be used for values that will remain in a single process. 513 """ 514 import copy 515 516 if isinstance(obj, (set, tuple, list)): 517 return hash(tuple(make_hash(e) for e in obj)) 518 if not isinstance(obj, dict): 519 return hash(obj) 520 521 new_obj = copy.deepcopy(obj) 522 for k, v in new_obj.items(): 523 new_obj[k] = make_hash(v) 524 525 # NOTE: there is sorted works correctly because it compares only 526 # unique first values (i.e. dict keys) 527 return hash(tuple(frozenset(sorted(new_obj.items())))) 528 529 530def float_hash_from_string(s: str) -> float: 531 """Given a string value, returns a float between 0 and 1. 532 533 If consistent across processes. Can be useful for assigning db ids 534 shard values for efficient parallel processing. 535 """ 536 import hashlib 537 538 hash_bytes = hashlib.md5(s.encode()).digest() 539 540 # Generate a random 64 bit int from hash digest bytes. 541 ival = int.from_bytes(hash_bytes[:8]) 542 return ival / ((1 << 64) - 1) 543 544 545def asserttype(obj: Any, typ: type[T]) -> T: 546 """Return an object typed as a given type. 
547 548 Assert is used to check its actual type, so only use this when 549 failures are not expected. Otherwise use checktype. 550 """ 551 assert isinstance(typ, type), 'only actual types accepted' 552 assert isinstance(obj, typ) 553 return obj 554 555 556def asserttype_o(obj: Any, typ: type[T]) -> T | None: 557 """Return an object typed as a given optional type. 558 559 Assert is used to check its actual type, so only use this when 560 failures are not expected. Otherwise use checktype. 561 """ 562 assert isinstance(typ, type), 'only actual types accepted' 563 assert isinstance(obj, (typ, type(None))) 564 return obj 565 566 567def checktype(obj: Any, typ: type[T]) -> T: 568 """Return an object typed as a given type. 569 570 Always checks the type at runtime with isinstance and throws a TypeError 571 on failure. Use asserttype for more efficient (but less safe) equivalent. 572 """ 573 assert isinstance(typ, type), 'only actual types accepted' 574 if not isinstance(obj, typ): 575 raise TypeError(f'Expected a {typ}; got a {type(obj)}.') 576 return obj 577 578 579def checktype_o(obj: Any, typ: type[T]) -> T | None: 580 """Return an object typed as a given optional type. 581 582 Always checks the type at runtime with isinstance and throws a TypeError 583 on failure. Use asserttype for more efficient (but less safe) equivalent. 584 """ 585 assert isinstance(typ, type), 'only actual types accepted' 586 if not isinstance(obj, (typ, type(None))): 587 raise TypeError(f'Expected a {typ} or None; got a {type(obj)}.') 588 return obj 589 590 591def warntype(obj: Any, typ: type[T]) -> T: 592 """Return an object typed as a given type. 593 594 Always checks the type at runtime and simply logs a warning if it is 595 not what is expected. 
596 """ 597 assert isinstance(typ, type), 'only actual types accepted' 598 if not isinstance(obj, typ): 599 import logging 600 601 logging.warning('warntype: expected a %s, got a %s', typ, type(obj)) 602 return obj # type: ignore 603 604 605def warntype_o(obj: Any, typ: type[T]) -> T | None: 606 """Return an object typed as a given type. 607 608 Always checks the type at runtime and simply logs a warning if it is 609 not what is expected. 610 """ 611 assert isinstance(typ, type), 'only actual types accepted' 612 if not isinstance(obj, (typ, type(None))): 613 import logging 614 615 logging.warning( 616 'warntype: expected a %s or None, got a %s', typ, type(obj) 617 ) 618 return obj # type: ignore 619 620 621def assert_non_optional(obj: T | None) -> T: 622 """Return an object with Optional typing removed. 623 624 Assert is used to check its actual type, so only use this when 625 failures are not expected. Use check_non_optional otherwise. 626 """ 627 assert obj is not None 628 return obj 629 630 631def check_non_optional(obj: T | None) -> T: 632 """Return an object with Optional typing removed. 633 634 Always checks the actual type and throws a TypeError on failure. 635 Use assert_non_optional for a more efficient (but less safe) equivalent. 636 """ 637 if obj is None: 638 raise ValueError('Got None value in check_non_optional.') 639 return obj 640 641 642def smoothstep(edge0: float, edge1: float, x: float) -> float: 643 """A smooth transition function. 644 645 Returns a value that smoothly moves from 0 to 1 as we go between edges. 646 Values outside of the range return 0 or 1. 647 """ 648 y = min(1.0, max(0.0, (x - edge0) / (edge1 - edge0))) 649 return y * y * (3.0 - 2.0 * y) 650 651 652def linearstep(edge0: float, edge1: float, x: float) -> float: 653 """A linear transition function. 654 655 Returns a value that linearly moves from 0 to 1 as we go between edges. 656 Values outside of the range return 0 or 1. 
657 """ 658 return max(0.0, min(1.0, (x - edge0) / (edge1 - edge0))) 659 660 661def _compact_id(num: int, chars: str) -> str: 662 if num < 0: 663 raise ValueError('Negative integers not allowed.') 664 665 # Chars must be in sorted order for sorting to work correctly 666 # on our output. 667 assert ''.join(sorted(list(chars))) == chars 668 669 base = len(chars) 670 out = '' 671 while num: 672 out += chars[num % base] 673 num //= base 674 return out[::-1] or '0' 675 676 677def human_readable_compact_id(num: int) -> str: 678 """Given a positive int, return a compact string representation for it. 679 680 Handy for visualizing unique numeric ids using as few as possible chars. 681 This representation uses only lowercase letters and numbers (minus the 682 following letters for readability): 683 's' is excluded due to similarity to '5'. 684 'l' is excluded due to similarity to '1'. 685 'i' is excluded due to similarity to '1'. 686 'o' is excluded due to similarity to '0'. 687 'z' is excluded due to similarity to '2'. 688 689 Therefore for n chars this can store values of 21^n. 690 691 When reading human input consisting of these IDs, it may be desirable 692 to map the disallowed chars to their corresponding allowed ones 693 ('o' -> '0', etc). 694 695 Sort order for these ids is the same as the original numbers. 696 697 If more compactness is desired at the expense of readability, 698 use compact_id() instead. 699 """ 700 return _compact_id(num, '0123456789abcdefghjkmnpqrtuvwxy') 701 702 703def compact_id(num: int) -> str: 704 """Given a positive int, return a compact string representation for it. 705 706 Handy for visualizing unique numeric ids using as few as possible chars. 707 This version is more compact than human_readable_compact_id() but less 708 friendly to humans due to using both capital and lowercase letters, 709 both 'O' and '0', etc. 710 711 Therefore for n chars this can store values of 62^n. 
712 713 Sort order for these ids is the same as the original numbers. 714 """ 715 return _compact_id( 716 num, '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz' 717 ) 718 719 720def caller_source_location() -> str: 721 """Returns source file name and line of the code calling us. 722 723 Example: 'mymodule.py:23' 724 """ 725 try: 726 import inspect 727 728 frame = inspect.currentframe() 729 for _i in range(2): 730 if frame is None: 731 raise RuntimeError() 732 frame = frame.f_back 733 if frame is None: 734 raise RuntimeError() 735 fname = os.path.basename(frame.f_code.co_filename) 736 return f'{fname}:{frame.f_lineno}' 737 except Exception: 738 return '<unknown source location>' 739 740 741def unchanging_hostname() -> str: 742 """Return an unchanging name for the local device. 743 744 Similar to the `hostname` call (or os.uname().nodename in Python) 745 except attempts to give a name that doesn't change depending on 746 network conditions. (A Mac will tend to go from Foo to Foo.local, 747 Foo.lan etc. throughout its various adventures) 748 """ 749 import platform 750 import subprocess 751 752 # On Mac, this should give the computer name assigned in System Prefs. 753 if platform.system() == 'Darwin': 754 return ( 755 subprocess.run( 756 ['scutil', '--get', 'ComputerName'], 757 check=True, 758 capture_output=True, 759 ) 760 .stdout.decode() 761 .strip() 762 .replace(' ', '-') 763 ) 764 return os.uname().nodename 765 766 767def set_canonical_module_names(module_globals: dict[str, Any]) -> None: 768 """Do the thing.""" 769 if os.environ.get('EFRO_SUPPRESS_SET_CANONICAL_MODULE_NAMES') == '1': 770 return 771 772 modulename = module_globals.get('__name__') 773 if not isinstance(modulename, str): 774 raise RuntimeError('Unable to get module name.') 775 assert not modulename.startswith('_') 776 modulename_prefix = f'{modulename}.' 777 modulename_prefix_2 = f'_{modulename}.' 
778 779 for name, obj in module_globals.items(): 780 if name.startswith('_'): 781 continue 782 existing = getattr(obj, '__module__', None) 783 try: 784 # Override the module ONLY if it lives under us somewhere. 785 # So ourpackage._submodule.Foo becomes ourpackage.Foo 786 # but otherpackage._submodule.Foo remains untouched. 787 if existing is not None and ( 788 existing.startswith(modulename_prefix) 789 or existing.startswith(modulename_prefix_2) 790 ): 791 obj.__module__ = modulename 792 except Exception: 793 import logging 794 795 logging.warning( 796 'set_canonical_module_names: unable to change __module__' 797 " from '%s' to '%s' on %s object at '%s'.", 798 existing, 799 modulename, 800 type(obj), 801 name, 802 ) 803 804 805def timedelta_str( 806 timeval: datetime.timedelta | float, maxparts: int = 2, decimals: int = 0 807) -> str: 808 """Return a simple human readable time string for a length of time. 809 810 Time can be given as a timedelta or a float representing seconds. 811 Example output: 812 "23d 1h 2m 32s" (with maxparts == 4) 813 "23d 1h" (with maxparts == 2) 814 "23d 1.08h" (with maxparts == 2 and decimals == 2) 815 816 Note that this is hard-coded in English and probably not especially 817 performant. 818 """ 819 # pylint: disable=too-many-locals 820 821 if isinstance(timeval, float): 822 timevalfin = datetime.timedelta(seconds=timeval) 823 else: 824 timevalfin = timeval 825 826 # Internally we only handle positive values. 827 if timevalfin.total_seconds() < 0: 828 return f'-{timedelta_str(timeval=-timeval, maxparts=maxparts)}' 829 830 years = timevalfin.days // 365 831 days = timevalfin.days % 365 832 hours = timevalfin.seconds // 3600 833 hour_remainder = timevalfin.seconds % 3600 834 minutes = hour_remainder // 60 835 seconds = hour_remainder % 60 836 837 # Now, if we want decimal places for our last value, 838 # calc fractional parts. 839 if decimals: 840 # Calc totals of each type. 
841 t_seconds = timevalfin.total_seconds() 842 t_minutes = t_seconds / 60 843 t_hours = t_minutes / 60 844 t_days = t_hours / 24 845 t_years = t_days / 365 846 847 # Calc fractional parts that exclude all whole values to their left. 848 years_covered = years 849 years_f = t_years - years_covered 850 days_covered = years_covered * 365 + days 851 days_f = t_days - days_covered 852 hours_covered = days_covered * 24 + hours 853 hours_f = t_hours - hours_covered 854 minutes_covered = hours_covered * 60 + minutes 855 minutes_f = t_minutes - minutes_covered 856 seconds_covered = minutes_covered * 60 + seconds 857 seconds_f = t_seconds - seconds_covered 858 else: 859 years_f = days_f = hours_f = minutes_f = seconds_f = 0.0 860 861 parts: list[str] = [] 862 for part, part_f, suffix in ( 863 (years, years_f, 'y'), 864 (days, days_f, 'd'), 865 (hours, hours_f, 'h'), 866 (minutes, minutes_f, 'm'), 867 (seconds, seconds_f, 's'), 868 ): 869 if part or parts or (not parts and suffix == 's'): 870 # Do decimal version only for the last part. 871 if decimals and (len(parts) >= maxparts - 1 or suffix == 's'): 872 parts.append(f'{part+part_f:.{decimals}f}{suffix}') 873 else: 874 parts.append(f'{part}{suffix}') 875 if len(parts) >= maxparts: 876 break 877 return ' '.join(parts) 878 879 880def ago_str( 881 timeval: datetime.datetime, 882 maxparts: int = 1, 883 now: datetime.datetime | None = None, 884 decimals: int = 0, 885) -> str: 886 """Given a datetime, return a clean human readable 'ago' str. 887 888 Note that this is hard-coded in English so should not be used 889 for visible in-game elements; only tools/etc. 890 891 If now is not passed, efro.util.utc_now() is used. 
892 """ 893 if now is None: 894 now = utc_now() 895 return ( 896 timedelta_str(now - timeval, maxparts=maxparts, decimals=decimals) 897 + ' ago' 898 ) 899 900 901def split_list(input_list: list[T], max_length: int) -> list[list[T]]: 902 """Split a single list into smaller lists.""" 903 return [ 904 input_list[i : i + max_length] 905 for i in range(0, len(input_list), max_length) 906 ] 907 908 909def extract_flag(args: list[str], name: str) -> bool: 910 """Given a list of args and a flag name, returns whether it is present. 911 912 The arg flag, if present, is removed from the arg list. 913 """ 914 from efro.error import CleanError 915 916 count = args.count(name) 917 if count > 1: 918 raise CleanError(f'Flag {name} passed multiple times.') 919 if not count: 920 return False 921 args.remove(name) 922 return True 923 924 925@overload 926def extract_arg( 927 args: list[str], name: str, required: Literal[False] = False 928) -> str | None: ... 929 930 931@overload 932def extract_arg(args: list[str], name: str, required: Literal[True]) -> str: ... 933 934 935def extract_arg( 936 args: list[str], name: str, required: bool = False 937) -> str | None: 938 """Given a list of args and an arg name, returns a value. 939 940 The arg flag and value are removed from the arg list. 941 raises CleanErrors on any problems. 942 """ 943 from efro.error import CleanError 944 945 count = args.count(name) 946 if not count: 947 if required: 948 raise CleanError(f'Required argument {name} not passed.') 949 return None 950 951 if count > 1: 952 raise CleanError(f'Arg {name} passed multiple times.') 953 954 argindex = args.index(name) 955 if argindex + 1 >= len(args): 956 raise CleanError(f'No value passed after {name} arg.') 957 958 val = args[argindex + 1] 959 del args[argindex : argindex + 2] 960 961 return val
37def explicit_bool(val: bool) -> bool: 38 """Return a non-inferable boolean value. 39 40 Useful to be able to disable blocks of code without type checkers 41 complaining/etc. 42 """ 43 # pylint: disable=no-else-return 44 if TYPE_CHECKING: 45 # infer this! <boom> 46 import random 47 48 return random.random() < 0.5 49 else: 50 return val
Return a non-inferable boolean value.
Useful to be able to disable blocks of code without type checkers complaining/etc.
def snake_case_to_title(val: str) -> str:
    """Given a snake-case string 'foo_bar', returns 'Foo Bar'."""
    # Splitting on '_' yields empty strings for leading/trailing or
    # doubled underscores; filter those out before joining.
    words = [piece for piece in val.split('_') if piece]
    return ' '.join(words).title()
Given a snake-case string 'foo_bar', returns 'Foo Bar'.
def snake_case_to_camel_case(val: str) -> str:
    """Given a snake-case string 'foo_bar', returns camel-case 'FooBar'."""
    # Turn underscores into spaces so title() capitalizes each word,
    # then squeeze the spaces back out.
    spaced = val.replace('_', ' ')
    return spaced.title().replace(' ', '')
Given a snake-case string 'foo_bar', returns camel-case 'FooBar'.
def enum_by_value(cls: type[EnumT], value: Any) -> EnumT:
    """Create an enum from a value.

    This is basically the same as doing 'obj = EnumType(value)' except
    that it works around an issue where a reference loop is created
    if an exception is thrown due to an invalid value. Since we disable
    the cyclic garbage collector for most of the time, such loops can lead
    to our objects sticking around longer than we want.
    This issue has been submitted to Python as a bug so hopefully we can
    remove this eventually if it gets fixed: https://bugs.python.org/issue42248
    UPDATE: This has been fixed as of later 3.8 builds, so we can kill this
    off once we are 3.9+ across the board.
    """

    # Note: we skip some of the Enum constructor's extra functionality
    # here (the _missing_ hook for instance); this covers basic needs.
    members = getattr(cls, '_value2member_map_')
    assert members is not None
    if value in members:
        member = members[value]
        assert isinstance(member, cls)
        return member
    # pylint: disable=consider-using-f-string
    raise ValueError('%r is not a valid %s' % (value, cls.__name__))
Create an enum from a value.
This is basically the same as doing 'obj = EnumType(value)' except that it works around an issue where a reference loop is created if an exception is thrown due to an invalid value. Since we disable the cyclic garbage collector for most of the time, such loops can lead to our objects sticking around longer than we want. This issue has been submitted to Python as a bug so hopefully we can remove this eventually if it gets fixed: https://bugs.python.org/issue42248 UPDATE: This has been fixed as of later 3.8 builds, so we can kill this off once we are 3.9+ across the board.
95def check_utc(value: datetime.datetime) -> None: 96 """Ensure a datetime value is timezone-aware utc.""" 97 if value.tzinfo is not datetime.UTC: 98 raise ValueError( 99 'datetime value does not have timezone set as datetime.UTC' 100 )
Ensure a datetime value is timezone-aware utc.
103def utc_now() -> datetime.datetime: 104 """Get timezone-aware current utc time. 105 106 Just a shortcut for datetime.datetime.now(datetime.UTC). 107 Avoid datetime.datetime.utcnow() which is deprecated and gives naive 108 times. 109 """ 110 return datetime.datetime.now(datetime.UTC)
Get timezone-aware current utc time.
Just a shortcut for datetime.datetime.now(datetime.UTC). Avoid datetime.datetime.utcnow() which is deprecated and gives naive times.
113def utc_now_naive() -> datetime.datetime: 114 """Get naive utc time. 115 116 This can be used to replace datetime.utcnow(), which is now deprecated. 117 Most all code should migrate to use timezone-aware times instead of 118 this. 119 """ 120 return datetime.datetime.now(datetime.UTC).replace(tzinfo=None)
Get naive utc time.
This can be used to replace datetime.utcnow(), which is now deprecated. Most all code should migrate to use timezone-aware times instead of this.
123def utc_today() -> datetime.datetime: 124 """Get offset-aware midnight in the utc time zone.""" 125 now = datetime.datetime.now(datetime.UTC) 126 return datetime.datetime( 127 year=now.year, month=now.month, day=now.day, tzinfo=now.tzinfo 128 )
Get offset-aware midnight in the utc time zone.
131def utc_this_hour() -> datetime.datetime: 132 """Get offset-aware beginning of the current hour in the utc time zone.""" 133 now = datetime.datetime.now(datetime.UTC) 134 return datetime.datetime( 135 year=now.year, 136 month=now.month, 137 day=now.day, 138 hour=now.hour, 139 tzinfo=now.tzinfo, 140 )
Get offset-aware beginning of the current hour in the utc time zone.
143def utc_this_minute() -> datetime.datetime: 144 """Get offset-aware beginning of current minute in the utc time zone.""" 145 now = datetime.datetime.now(datetime.UTC) 146 return datetime.datetime( 147 year=now.year, 148 month=now.month, 149 day=now.day, 150 hour=now.hour, 151 minute=now.minute, 152 tzinfo=now.tzinfo, 153 )
Get offset-aware beginning of current minute in the utc time zone.
def empty_weakref(objtype: type[T]) -> weakref.ref[T]:
    """Return an invalidated weak-reference for the specified type."""
    # All dead weakrefs behave identically at runtime; the type arg
    # exists purely to inform the static type checker.
    del objtype  # Unused.

    # Hand out our single shared dead ref rather than minting a fresh
    # one (weakref.ref(_EmptyObj())) per call.
    return _g_empty_weak_ref  # type: ignore
Return an invalidated weak-reference for the specified type.
def data_size_str(bytecount: int, compact: bool = False) -> str:
    """Given a size in bytes, returns a short human readable string.

    In compact mode this should be 6 or fewer chars for most all
    sane file sizes.
    """
    # pylint: disable=too-many-return-statements

    # Negative sizes: format the magnitude and prepend a sign.
    if bytecount < 0:
        return '-' + data_size_str(-bytecount, compact=compact)

    if bytecount <= 999:
        return f'{bytecount} B' if compact else f'{bytecount} bytes'

    kbs = bytecount / 1024
    if round(kbs, 1) < 10.0:
        return f'{kbs:.1f} KB'
    if round(kbs, 0) < 999:
        return f'{kbs:.0f} KB'

    mbs = bytecount / (1024 * 1024)
    if round(mbs, 1) < 10.0:
        return f'{mbs:.1f} MB'
    if round(mbs, 0) < 999:
        return f'{mbs:.0f} MB'

    gbs = bytecount / (1024 * 1024 * 1024)
    if round(gbs, 1) < 10.0:
        return f'{gbs:.1f} GB'
    return f'{gbs:.0f} GB'
Given a size in bytes, returns a short human readable string.
In compact mode this should be 6 or fewer chars for most all sane file sizes.
class DirtyBit:
    """Manages whether a thing is dirty and regulates attempts to clean it.

    To use, simply set the 'dirty' value on this object to True when some
    action is needed, and then check the 'should_update' value to regulate
    when attempts to clean it should be made. Set 'dirty' back to False
    after a successful update.

    If 'use_lock' is True, an asyncio Lock will be created and incorporated
    into update attempts to prevent simultaneous updates (should_update
    will only return True when the lock is unlocked). Note that it is up
    to the user to lock/unlock the lock during the actual update attempt.

    If a value is passed for 'auto_dirty_seconds', the dirtybit will flip
    itself back to dirty after being clean for the given amount of time.

    'min_update_interval' can be used to enforce a minimum update interval
    even when updates are successful (retry_interval only applies when
    updates fail).
    """

    def __init__(
        self,
        dirty: bool = False,
        retry_interval: float = 5.0,
        use_lock: bool = False,
        auto_dirty_seconds: float | None = None,
        min_update_interval: float | None = None,
    ):
        curtime = time.monotonic()
        self._retry_interval = retry_interval
        self._auto_dirty_seconds = auto_dirty_seconds
        self._min_update_interval = min_update_interval
        self._dirty = dirty
        # Earliest monotonic time should_update may fire; None while clean.
        self._next_update_time: float | None = curtime if dirty else None
        # When should_update last returned True (for min_update_interval).
        self._last_update_time: float | None = None
        # When to auto-flip back to dirty; only scheduled while clean and
        # only when auto_dirty_seconds was provided.
        self._next_auto_dirty_time: float | None = (
            (curtime + self._auto_dirty_seconds)
            if (not dirty and self._auto_dirty_seconds is not None)
            else None
        )
        self._use_lock = use_lock
        self.lock: asyncio.Lock
        if self._use_lock:
            # Imported on demand so the module has no hard asyncio dep.
            import asyncio

            self.lock = asyncio.Lock()

    @property
    def dirty(self) -> bool:
        """Whether the target is currently dirty.

        This should be set to False once an update is successful.
        """
        return self._dirty

    @dirty.setter
    def dirty(self, value: bool) -> None:
        # If we're freshly clean, set our next auto-dirty time (if we have
        # one).
        if self._dirty and not value and self._auto_dirty_seconds is not None:
            self._next_auto_dirty_time = (
                time.monotonic() + self._auto_dirty_seconds
            )

        # If we're freshly dirty, schedule an immediate update.
        if not self._dirty and value:
            self._next_update_time = time.monotonic()

            # If they want to enforce a minimum update interval,
            # push out the next update time if it hasn't been long enough.
            if (
                self._min_update_interval is not None
                and self._last_update_time is not None
            ):
                self._next_update_time = max(
                    self._next_update_time,
                    self._last_update_time + self._min_update_interval,
                )

        self._dirty = value

    @property
    def should_update(self) -> bool:
        """Whether an attempt should be made to clean the target now.

        Always returns False if the target is not dirty.
        Takes into account the amount of time passed since the target
        was marked dirty or since should_update last returned True.
        """
        curtime = time.monotonic()

        # Auto-dirty ourself if we're into that.
        if (
            self._next_auto_dirty_time is not None
            and curtime > self._next_auto_dirty_time
        ):
            self.dirty = True
            self._next_auto_dirty_time = None
        if not self._dirty:
            return False
        if self._use_lock and self.lock.locked():
            return False
        assert self._next_update_time is not None
        if curtime > self._next_update_time:
            # Pencil in the next retry attempt and remember when we fired.
            self._next_update_time = curtime + self._retry_interval
            self._last_update_time = curtime
            return True
        return False
Manages whether a thing is dirty and regulates attempts to clean it.
To use, simply set the 'dirty' value on this object to True when some action is needed, and then check the 'should_update' value to regulate when attempts to clean it should be made. Set 'dirty' back to False after a successful update. If 'use_lock' is True, an asyncio Lock will be created and incorporated into update attempts to prevent simultaneous updates (should_update will only return True when the lock is unlocked). Note that it is up to the user to lock/unlock the lock during the actual update attempt. If a value is passed for 'auto_dirty_seconds', the dirtybit will flip itself back to dirty after being clean for the given amount of time. 'min_update_interval' can be used to enforce a minimum update interval even when updates are successful (retry_interval only applies when updates fail).
    def __init__(
        self,
        dirty: bool = False,
        retry_interval: float = 5.0,
        use_lock: bool = False,
        auto_dirty_seconds: float | None = None,
        min_update_interval: float | None = None,
    ):
        curtime = time.monotonic()
        self._retry_interval = retry_interval
        self._auto_dirty_seconds = auto_dirty_seconds
        self._min_update_interval = min_update_interval
        self._dirty = dirty
        # Earliest monotonic time should_update may fire; None while clean.
        self._next_update_time: float | None = curtime if dirty else None
        # When should_update last returned True (for min_update_interval).
        self._last_update_time: float | None = None
        # When to auto-flip back to dirty; only scheduled while clean and
        # only when auto_dirty_seconds was provided.
        self._next_auto_dirty_time: float | None = (
            (curtime + self._auto_dirty_seconds)
            if (not dirty and self._auto_dirty_seconds is not None)
            else None
        )
        self._use_lock = use_lock
        self.lock: asyncio.Lock
        if self._use_lock:
            # Imported on demand so the module has no hard asyncio dep.
            import asyncio

            self.lock = asyncio.Lock()
    @property
    def dirty(self) -> bool:
        """Whether the target is currently dirty.

        This should be set to False once an update is successful.
        Flipping it to True schedules an update attempt (see
        should_update).
        """
        return self._dirty
Whether the target is currently dirty.
This should be set to False once an update is successful.
    @property
    def should_update(self) -> bool:
        """Whether an attempt should be made to clean the target now.

        Always returns False if the target is not dirty.
        Takes into account the amount of time passed since the target
        was marked dirty or since should_update last returned True.
        """
        curtime = time.monotonic()

        # Auto-dirty ourself if we're into that.
        if (
            self._next_auto_dirty_time is not None
            and curtime > self._next_auto_dirty_time
        ):
            self.dirty = True
            self._next_auto_dirty_time = None
        if not self._dirty:
            return False
        # Never report True while someone holds the update lock.
        if self._use_lock and self.lock.locked():
            return False
        assert self._next_update_time is not None
        if curtime > self._next_update_time:
            # Pencil in the next retry attempt and remember when we fired.
            self._next_update_time = curtime + self._retry_interval
            self._last_update_time = curtime
            return True
        return False
Whether an attempt should be made to clean the target now.
Always returns False if the target is not dirty. Takes into account the amount of time passed since the target was marked dirty or since should_update last returned True.
class DispatchMethodWrapper(Generic[ArgT, RetT]):
    """Type-aware standin for the dispatch func returned by dispatchmethod."""

    # Mapping of registered dispatch types to handlers (filled in by the
    # real wrapper; this stub exists only for type checkers).
    registry: dict[Any, Callable]

    def __call__(self, arg: ArgT) -> RetT:
        raise RuntimeError('Should not get here')

    @staticmethod
    def register(
        func: Callable[[Any, Any], RetT]
    ) -> Callable[[Any, Any], RetT]:
        """Register a new dispatch handler for this dispatch-method."""
        raise RuntimeError('Should not get here')
Type-aware standin for the dispatch func returned by dispatchmethod.
    @staticmethod
    def register(
        func: Callable[[Any, Any], RetT]
    ) -> Callable[[Any, Any], RetT]:
        """Register a new dispatch handler for this dispatch-method.

        Stub for type checkers only; calling it at runtime is an error.
        """
        raise RuntimeError('Should not get here')
Register a new dispatch handler for this dispatch-method.
def dispatchmethod(
    func: Callable[[Any, ArgT], RetT]
) -> DispatchMethodWrapper[ArgT, RetT]:
    """A variation of functools.singledispatch for methods.

    Note: as of Python 3.9 there is now functools.singledispatchmethod,
    but it currently (as of Jan 2021) is not type-aware (at least in mypy),
    which gives us a reason to keep this one around for now.
    """
    from functools import singledispatch, update_wrapper

    origwrapper: Any = singledispatch(func)

    # Grab this up front so our closure doesn't need to keep
    # origwrapper itself alive.
    dispatch = origwrapper.dispatch

    funcname = getattr(func, '__name__', 'dispatchmethod method')

    # This mirrors the tail end of functools.singledispatch except that
    # our wrapper dispatches on the *second* positional arg (skipping
    # 'self'). Written against Python 3.7; worth re-checking against
    # newer versions in case their implementation changes.
    # NOTE: sounds like we can use functools singledispatchmethod in 3.8.
    def wrapper(*args: Any, **kw: Any) -> Any:
        if len(args) < 2:
            raise TypeError(
                f'{funcname} requires at least 2 positional arguments'
            )
        return dispatch(args[1].__class__)(*args, **kw)

    wrapper.register = origwrapper.register  # type: ignore
    wrapper.dispatch = dispatch  # type: ignore
    wrapper.registry = origwrapper.registry  # type: ignore
    # pylint: disable=protected-access
    wrapper._clear_cache = origwrapper._clear_cache  # type: ignore
    update_wrapper(wrapper, func)
    # pylint: enable=protected-access
    return cast(DispatchMethodWrapper, wrapper)
A variation of functools.singledispatch for methods.
Note: as of Python 3.9 there is now functools.singledispatchmethod, but it currently (as of Jan 2021) is not type-aware (at least in mypy), which gives us a reason to keep this one around for now.
def valuedispatch(call: Callable[[ValT], RetT]) -> ValueDispatcher[ValT, RetT]:
    """Decorator for functions to allow dispatching based on a value.

    This differs from functools.singledispatch in that it dispatches based
    on the value of an argument, not based on its type.
    The 'register' method of a value-dispatch function can be used
    to assign new functions to handle particular values.
    Unhandled values wind up in the original dispatch function.
    """
    return ValueDispatcher(call)
Decorator for functions to allow dispatching based on a value.
This differs from functools.singledispatch in that it dispatches based on the value of an argument, not based on its type. The 'register' method of a value-dispatch function can be used to assign new functions to handle particular values. Unhandled values wind up in the original dispatch function.
class ValueDispatcher(Generic[ValT, RetT]):
    """Used by the valuedispatch decorator"""

    def __init__(self, call: Callable[[ValT], RetT]) -> None:
        # Fallback for values with no registered handler.
        self._base_call = call
        self._handlers: dict[ValT, Callable[[], RetT]] = {}

    def __call__(self, value: ValT) -> RetT:
        if value in self._handlers:
            return self._handlers[value]()
        return self._base_call(value)

    def _add_handler(
        self, value: ValT, call: Callable[[], RetT]
    ) -> Callable[[], RetT]:
        if value in self._handlers:
            raise RuntimeError(f'Duplicate handlers added for {value}')
        self._handlers[value] = call
        return call

    def register(
        self, value: ValT
    ) -> Callable[[Callable[[], RetT]], Callable[[], RetT]]:
        """Add a handler to the dispatcher."""
        from functools import partial

        return partial(self._add_handler, value)
Used by the valuedispatch decorator
    def register(
        self, value: ValT
    ) -> Callable[[Callable[[], RetT]], Callable[[], RetT]]:
        """Add a handler to the dispatcher.

        Returns a decorator that registers the decorated zero-arg
        callable to handle 'value'.
        """
        from functools import partial

        return partial(self._add_handler, value)
Add a handler to the dispatcher.
def valuedispatch1arg(
    call: Callable[[ValT, ArgT], RetT]
) -> ValueDispatcher1Arg[ValT, ArgT, RetT]:
    """Like valuedispatch but for functions taking an extra argument.

    Handlers registered on the returned dispatcher receive the extra
    argument (but not the dispatch value itself).
    """
    return ValueDispatcher1Arg(call)
Like valuedispatch but for functions taking an extra argument.
class ValueDispatcher1Arg(Generic[ValT, ArgT, RetT]):
    """Used by the valuedispatch1arg decorator"""

    def __init__(self, call: Callable[[ValT, ArgT], RetT]) -> None:
        # Fallback for values with no registered handler.
        self._base_call = call
        self._handlers: dict[ValT, Callable[[ArgT], RetT]] = {}

    def __call__(self, value: ValT, arg: ArgT) -> RetT:
        if value in self._handlers:
            return self._handlers[value](arg)
        return self._base_call(value, arg)

    def _add_handler(
        self, value: ValT, call: Callable[[ArgT], RetT]
    ) -> Callable[[ArgT], RetT]:
        if value in self._handlers:
            raise RuntimeError(f'Duplicate handlers added for {value}')
        self._handlers[value] = call
        return call

    def register(
        self, value: ValT
    ) -> Callable[[Callable[[ArgT], RetT]], Callable[[ArgT], RetT]]:
        """Add a handler to the dispatcher."""
        from functools import partial

        return partial(self._add_handler, value)
Used by the valuedispatch1arg decorator
    def register(
        self, value: ValT
    ) -> Callable[[Callable[[ArgT], RetT]], Callable[[ArgT], RetT]]:
        """Add a handler to the dispatcher.

        Returns a decorator that registers the decorated one-arg
        callable to handle 'value'.
        """
        from functools import partial

        return partial(self._add_handler, value)
Add a handler to the dispatcher.
def valuedispatchmethod(
    call: Callable[[SelfT, ValT], RetT]
) -> ValueDispatcherMethod[ValT, RetT]:
    """Like valuedispatch but works with methods instead of functions."""

    # NOTE: To wrap a method with a decorator and have self dispatching
    # behave, we must hand back a plain function rather than an
    # executable object, so we keep our state in this closure.

    _base_call = call
    _handlers: dict[ValT, Callable[[SelfT], RetT]] = {}

    def _add_handler(value: ValT, addcall: Callable[[SelfT], RetT]) -> None:
        if value in _handlers:
            raise RuntimeError(f'Duplicate handlers added for {value}')
        _handlers[value] = addcall

    def _register(value: ValT) -> Callable[[Callable[[SelfT], RetT]], None]:
        from functools import partial

        return partial(_add_handler, value)

    def _call_wrapper(self: SelfT, value: ValT) -> RetT:
        handler = _handlers.get(value)
        if handler is None:
            return _base_call(self, value)
        return handler(self)

    # Since we return a raw function, manually attach the register call
    # our users will need.
    setattr(_call_wrapper, 'register', _register)

    # Tell the type checker we return a ValueDispatcherMethod so use of
    # .register type-checks; at runtime it's really just the function.
    # pylint: disable=undefined-variable, no-else-return
    if TYPE_CHECKING:
        return ValueDispatcherMethod[ValT, RetT]()
    else:
        return _call_wrapper
Like valuedispatch but works with methods instead of functions.
def make_hash(obj: Any) -> int:
    """Makes a hash from a dictionary, list, tuple or set to any level,
    provided it contains only other hashable types (including any lists,
    tuples, sets, and dictionaries).

    Note that this uses Python's hash() function internally so collisions/etc.
    may be more common than with fancy cryptographic hashes.

    Also be aware that Python's hash() output varies across processes, so
    this should only be used for values that will remain in a single process.
    """
    if isinstance(obj, (set, tuple, list)):
        return hash(tuple(make_hash(e) for e in obj))
    if not isinstance(obj, dict):
        return hash(obj)

    # Build a parallel dict with values replaced by their hashes instead
    # of deep-copying the original and mutating it; this gives the same
    # final hash without copying nested data we'd immediately overwrite.
    hashed_items = {key: make_hash(val) for key, val in obj.items()}

    # NOTE: sorted() works here because it only ever compares the unique
    # first elements of the item tuples (i.e. the dict keys).
    return hash(tuple(frozenset(sorted(hashed_items.items()))))
Makes a hash from a dictionary, list, tuple or set to any level, provided it contains only other hashable types (including any lists, tuples, sets, and dictionaries).
Note that this uses Python's hash() function internally so collisions/etc. may be more common than with fancy cryptographic hashes.
Also be aware that Python's hash() output varies across processes, so this should only be used for values that will remain in a single process.
def float_hash_from_string(s: str) -> float:
    """Given a string value, returns a float between 0 and 1.

    Is consistent across processes. Can be useful for assigning db id
    shard values for efficient parallel processing.
    """
    import hashlib

    hash_bytes = hashlib.md5(s.encode()).digest()

    # Build a 64 bit int from the first 8 digest bytes. The explicit
    # byteorder matches the 3.11+ default and keeps this working on
    # older Python versions where the argument is required.
    ival = int.from_bytes(hash_bytes[:8], byteorder='big')
    return ival / ((1 << 64) - 1)
Given a string value, returns a float between 0 and 1.
Is consistent across processes. Can be useful for assigning db id shard values for efficient parallel processing.
def asserttype(obj: Any, typ: type[T]) -> T:
    """Return an object typed as a given type.

    Assert is used to check its actual type, so only use this when
    failures are not expected. Otherwise use checktype.

    Note: asserts are stripped under -O, in which case no check occurs.
    """
    assert isinstance(typ, type), 'only actual types accepted'
    assert isinstance(obj, typ)
    return obj
Return an object typed as a given type.
Assert is used to check its actual type, so only use this when failures are not expected. Otherwise use checktype.
def asserttype_o(obj: Any, typ: type[T]) -> T | None:
    """Return an object typed as a given optional type.

    Assert is used to check its actual type, so only use this when
    failures are not expected. Otherwise use checktype.

    None passes the check; anything else must be an instance of typ.
    """
    assert isinstance(typ, type), 'only actual types accepted'
    assert isinstance(obj, (typ, type(None)))
    return obj
Return an object typed as a given optional type.
Assert is used to check its actual type, so only use this when failures are not expected. Otherwise use checktype.
def checktype(obj: Any, typ: type[T]) -> T:
    """Return an object typed as a given type.

    Always checks the type at runtime with isinstance and throws a TypeError
    on failure. Use asserttype for more efficient (but less safe) equivalent.
    """
    assert isinstance(typ, type), 'only actual types accepted'
    if isinstance(obj, typ):
        return obj
    raise TypeError(f'Expected a {typ}; got a {type(obj)}.')
Return an object typed as a given type.
Always checks the type at runtime with isinstance and throws a TypeError on failure. Use asserttype for more efficient (but less safe) equivalent.
def checktype_o(obj: Any, typ: type[T]) -> T | None:
    """Return an object typed as a given optional type.

    Always checks the type at runtime with isinstance and throws a TypeError
    on failure. Use asserttype for more efficient (but less safe) equivalent.
    """
    assert isinstance(typ, type), 'only actual types accepted'
    if isinstance(obj, (typ, type(None))):
        return obj
    raise TypeError(f'Expected a {typ} or None; got a {type(obj)}.')
Return an object typed as a given optional type.
Always checks the type at runtime with isinstance and throws a TypeError on failure. Use asserttype for more efficient (but less safe) equivalent.
def warntype(obj: Any, typ: type[T]) -> T:
    """Return an object typed as a given type.

    Always checks the type at runtime and simply logs a warning if it is
    not what is expected; the object is returned either way.
    """
    assert isinstance(typ, type), 'only actual types accepted'
    if isinstance(obj, typ):
        return obj
    import logging

    logging.warning('warntype: expected a %s, got a %s', typ, type(obj))
    return obj  # type: ignore
Return an object typed as a given type.
Always checks the type at runtime and simply logs a warning if it is not what is expected.
def warntype_o(obj: Any, typ: type[T]) -> T | None:
    """Return an object typed as a given type.

    Always checks the type at runtime and simply logs a warning if it is
    not what is expected (None is accepted); the object is returned
    either way.
    """
    assert isinstance(typ, type), 'only actual types accepted'
    if isinstance(obj, (typ, type(None))):
        return obj
    import logging

    logging.warning(
        'warntype: expected a %s or None, got a %s', typ, type(obj)
    )
    return obj  # type: ignore
Return an object typed as a given type.
Always checks the type at runtime and simply logs a warning if it is not what is expected.
def assert_non_optional(obj: T | None) -> T:
    """Return an object with Optional typing removed.

    Assert is used to check its actual type, so only use this when
    failures are not expected. Use check_non_optional otherwise.

    Note: asserts are stripped under -O, in which case no check occurs.
    """
    assert obj is not None
    return obj
Return an object with Optional typing removed.
Assert is used to check its actual type, so only use this when failures are not expected. Use check_non_optional otherwise.
def check_non_optional(obj: T | None) -> T:
    """Return an object with Optional typing removed.

    Always checks the actual value and raises a ValueError if it is None.
    Use assert_non_optional for a more efficient (but less safe)
    equivalent.
    """
    if obj is None:
        raise ValueError('Got None value in check_non_optional.')
    return obj
Return an object with Optional typing removed.
Always checks the actual value and raises a ValueError if it is None. Use assert_non_optional for a more efficient (but less safe) equivalent.
def smoothstep(edge0: float, edge1: float, x: float) -> float:
    """A smooth transition function.

    Returns a value that smoothly moves from 0 to 1 as we go between edges.
    Values outside of the range return 0 or 1.
    """
    # Normalize x into [0, 1] between the edges, clamping outliers.
    t = min(1.0, max(0.0, (x - edge0) / (edge1 - edge0)))
    # Classic Hermite smoothstep polynomial: 3t^2 - 2t^3.
    return t * t * (3.0 - 2.0 * t)
A smooth transition function.
Returns a value that smoothly moves from 0 to 1 as we go between edges. Values outside of the range return 0 or 1.
def linearstep(edge0: float, edge1: float, x: float) -> float:
    """A linear transition function.

    Returns a value that linearly moves from 0 to 1 as we go between edges.
    Values outside of the range return 0 or 1.
    """
    # Just the clamped normalized position between the edges.
    fraction = (x - edge0) / (edge1 - edge0)
    return max(0.0, min(1.0, fraction))
A linear transition function.
Returns a value that linearly moves from 0 to 1 as we go between edges. Values outside of the range return 0 or 1.
def human_readable_compact_id(num: int) -> str:
    """Given a positive int, return a compact string representation for it.

    Handy for visualizing unique numeric ids using as few as possible chars.
    This representation uses only lowercase letters and numbers (minus the
    following letters for readability):
    's' is excluded due to similarity to '5'.
    'l' is excluded due to similarity to '1'.
    'i' is excluded due to similarity to '1'.
    'o' is excluded due to similarity to '0'.
    'z' is excluded due to similarity to '2'.

    Therefore for n chars this can store values of 31^n
    (10 digits plus 21 allowed letters).

    When reading human input consisting of these IDs, it may be desirable
    to map the disallowed chars to their corresponding allowed ones
    ('o' -> '0', etc).

    Sort order for these ids is the same as the original numbers.

    If more compactness is desired at the expense of readability,
    use compact_id() instead.
    """
    return _compact_id(num, '0123456789abcdefghjkmnpqrtuvwxy')
Given a positive int, return a compact string representation for it.
Handy for visualizing unique numeric ids using as few as possible chars. This representation uses only lowercase letters and numbers (minus the following letters for readability): 's' is excluded due to similarity to '5'. 'l' is excluded due to similarity to '1'. 'i' is excluded due to similarity to '1'. 'o' is excluded due to similarity to '0'. 'z' is excluded due to similarity to '2'.
Therefore for n chars this can store values of 31^n (10 digits plus 21 allowed letters).
When reading human input consisting of these IDs, it may be desirable to map the disallowed chars to their corresponding allowed ones ('o' -> '0', etc).
Sort order for these ids is the same as the original numbers.
If more compactness is desired at the expense of readability, use compact_id() instead.
def compact_id(num: int) -> str:
    """Given a positive int, return a compact string representation for it.

    Handy for visualizing unique numeric ids using as few as possible chars.
    This version is more compact than human_readable_compact_id() but less
    friendly to humans due to using both capital and lowercase letters,
    both 'O' and '0', etc.

    Therefore for n chars this can store values of 62^n
    (10 digits plus 26 uppercase plus 26 lowercase letters).

    Sort order for these ids is the same as the original numbers.
    """
    return _compact_id(
        num, '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'
    )
Given a positive int, return a compact string representation for it.
Handy for visualizing unique numeric ids using as few as possible chars. This version is more compact than human_readable_compact_id() but less friendly to humans due to using both capital and lowercase letters, both 'O' and '0', etc.
Therefore for n chars this can store values of 62^n.
Sort order for these ids is the same as the original numbers.
def caller_source_location() -> str:
    """Returns source file name and line of the code calling us.

    Example: 'mymodule.py:23'
    """
    try:
        import inspect

        frame = inspect.currentframe()
        # Step out of this function and then out of our direct caller.
        for _step in range(2):
            if frame is None:
                raise RuntimeError()
            frame = frame.f_back
        if frame is None:
            raise RuntimeError()
        filename = os.path.basename(frame.f_code.co_filename)
        return f'{filename}:{frame.f_lineno}'
    except Exception:
        # Best effort only; never let this blow up the caller.
        return '<unknown source location>'
Returns source file name and line of the code calling us.
Example: 'mymodule.py:23'
742def unchanging_hostname() -> str: 743 """Return an unchanging name for the local device. 744 745 Similar to the `hostname` call (or os.uname().nodename in Python) 746 except attempts to give a name that doesn't change depending on 747 network conditions. (A Mac will tend to go from Foo to Foo.local, 748 Foo.lan etc. throughout its various adventures) 749 """ 750 import platform 751 import subprocess 752 753 # On Mac, this should give the computer name assigned in System Prefs. 754 if platform.system() == 'Darwin': 755 return ( 756 subprocess.run( 757 ['scutil', '--get', 'ComputerName'], 758 check=True, 759 capture_output=True, 760 ) 761 .stdout.decode() 762 .strip() 763 .replace(' ', '-') 764 ) 765 return os.uname().nodename
Return an unchanging name for the local device.
Similar to the `hostname` call (or os.uname().nodename in Python)
except attempts to give a name that doesn't change depending on
network conditions. (A Mac will tend to go from Foo to Foo.local,
Foo.lan etc. throughout its various adventures)
768def set_canonical_module_names(module_globals: dict[str, Any]) -> None: 769 """Do the thing.""" 770 if os.environ.get('EFRO_SUPPRESS_SET_CANONICAL_MODULE_NAMES') == '1': 771 return 772 773 modulename = module_globals.get('__name__') 774 if not isinstance(modulename, str): 775 raise RuntimeError('Unable to get module name.') 776 assert not modulename.startswith('_') 777 modulename_prefix = f'{modulename}.' 778 modulename_prefix_2 = f'_{modulename}.' 779 780 for name, obj in module_globals.items(): 781 if name.startswith('_'): 782 continue 783 existing = getattr(obj, '__module__', None) 784 try: 785 # Override the module ONLY if it lives under us somewhere. 786 # So ourpackage._submodule.Foo becomes ourpackage.Foo 787 # but otherpackage._submodule.Foo remains untouched. 788 if existing is not None and ( 789 existing.startswith(modulename_prefix) 790 or existing.startswith(modulename_prefix_2) 791 ): 792 obj.__module__ = modulename 793 except Exception: 794 import logging 795 796 logging.warning( 797 'set_canonical_module_names: unable to change __module__' 798 " from '%s' to '%s' on %s object at '%s'.", 799 existing, 800 modulename, 801 type(obj), 802 name, 803 )
Rewrite the __module__ attribute of a module's public objects from private submodule paths (e.g. 'pkg._impl') to the module's own name.
806def timedelta_str( 807 timeval: datetime.timedelta | float, maxparts: int = 2, decimals: int = 0 808) -> str: 809 """Return a simple human readable time string for a length of time. 810 811 Time can be given as a timedelta or a float representing seconds. 812 Example output: 813 "23d 1h 2m 32s" (with maxparts == 4) 814 "23d 1h" (with maxparts == 2) 815 "23d 1.08h" (with maxparts == 2 and decimals == 2) 816 817 Note that this is hard-coded in English and probably not especially 818 performant. 819 """ 820 # pylint: disable=too-many-locals 821 822 if isinstance(timeval, float): 823 timevalfin = datetime.timedelta(seconds=timeval) 824 else: 825 timevalfin = timeval 826 827 # Internally we only handle positive values. 828 if timevalfin.total_seconds() < 0: 829 return f'-{timedelta_str(timeval=-timeval, maxparts=maxparts)}' 830 831 years = timevalfin.days // 365 832 days = timevalfin.days % 365 833 hours = timevalfin.seconds // 3600 834 hour_remainder = timevalfin.seconds % 3600 835 minutes = hour_remainder // 60 836 seconds = hour_remainder % 60 837 838 # Now, if we want decimal places for our last value, 839 # calc fractional parts. 840 if decimals: 841 # Calc totals of each type. 842 t_seconds = timevalfin.total_seconds() 843 t_minutes = t_seconds / 60 844 t_hours = t_minutes / 60 845 t_days = t_hours / 24 846 t_years = t_days / 365 847 848 # Calc fractional parts that exclude all whole values to their left. 
849 years_covered = years 850 years_f = t_years - years_covered 851 days_covered = years_covered * 365 + days 852 days_f = t_days - days_covered 853 hours_covered = days_covered * 24 + hours 854 hours_f = t_hours - hours_covered 855 minutes_covered = hours_covered * 60 + minutes 856 minutes_f = t_minutes - minutes_covered 857 seconds_covered = minutes_covered * 60 + seconds 858 seconds_f = t_seconds - seconds_covered 859 else: 860 years_f = days_f = hours_f = minutes_f = seconds_f = 0.0 861 862 parts: list[str] = [] 863 for part, part_f, suffix in ( 864 (years, years_f, 'y'), 865 (days, days_f, 'd'), 866 (hours, hours_f, 'h'), 867 (minutes, minutes_f, 'm'), 868 (seconds, seconds_f, 's'), 869 ): 870 if part or parts or (not parts and suffix == 's'): 871 # Do decimal version only for the last part. 872 if decimals and (len(parts) >= maxparts - 1 or suffix == 's'): 873 parts.append(f'{part+part_f:.{decimals}f}{suffix}') 874 else: 875 parts.append(f'{part}{suffix}') 876 if len(parts) >= maxparts: 877 break 878 return ' '.join(parts)
Return a simple human readable time string for a length of time.
Time can be given as a timedelta or a float representing seconds. Example output: "23d 1h 2m 32s" (with maxparts == 4) "23d 1h" (with maxparts == 2) "23d 1.08h" (with maxparts == 2 and decimals == 2)
Note that this is hard-coded in English and probably not especially performant.
881def ago_str( 882 timeval: datetime.datetime, 883 maxparts: int = 1, 884 now: datetime.datetime | None = None, 885 decimals: int = 0, 886) -> str: 887 """Given a datetime, return a clean human readable 'ago' str. 888 889 Note that this is hard-coded in English so should not be used 890 for visible in-game elements; only tools/etc. 891 892 If now is not passed, efro.util.utc_now() is used. 893 """ 894 if now is None: 895 now = utc_now() 896 return ( 897 timedelta_str(now - timeval, maxparts=maxparts, decimals=decimals) 898 + ' ago' 899 )
Given a datetime, return a clean human readable 'ago' str.
Note that this is hard-coded in English so should not be used for visible in-game elements; only tools/etc.
If now is not passed, utc_now() is used.
902def split_list(input_list: list[T], max_length: int) -> list[list[T]]: 903 """Split a single list into smaller lists.""" 904 return [ 905 input_list[i : i + max_length] 906 for i in range(0, len(input_list), max_length) 907 ]
Split a single list into smaller lists.
910def extract_flag(args: list[str], name: str) -> bool: 911 """Given a list of args and a flag name, returns whether it is present. 912 913 The arg flag, if present, is removed from the arg list. 914 """ 915 from efro.error import CleanError 916 917 count = args.count(name) 918 if count > 1: 919 raise CleanError(f'Flag {name} passed multiple times.') 920 if not count: 921 return False 922 args.remove(name) 923 return True
Given a list of args and a flag name, returns whether it is present.
The arg flag, if present, is removed from the arg list.
936def extract_arg( 937 args: list[str], name: str, required: bool = False 938) -> str | None: 939 """Given a list of args and an arg name, returns a value. 940 941 The arg flag and value are removed from the arg list. 942 raises CleanErrors on any problems. 943 """ 944 from efro.error import CleanError 945 946 count = args.count(name) 947 if not count: 948 if required: 949 raise CleanError(f'Required argument {name} not passed.') 950 return None 951 952 if count > 1: 953 raise CleanError(f'Arg {name} passed multiple times.') 954 955 argindex = args.index(name) 956 if argindex + 1 >= len(args): 957 raise CleanError(f'No value passed after {name} arg.') 958 959 val = args[argindex + 1] 960 del args[argindex : argindex + 2] 961 962 return val
Given a list of args and an arg name, returns a value.
The arg flag and value are removed from the arg list. raises CleanErrors on any problems.