efro.util
Small handy bits of functionality.
# Released under the MIT License. See LICENSE for details.
#
"""Small handy bits of functionality."""

from __future__ import annotations

import os
import time
import weakref
import datetime
import functools
from enum import Enum
from typing import TYPE_CHECKING, cast, TypeVar, Generic

_pytz_utc: Any

# We don't *require* pytz, but we want to support it for tzinfos if available.
try:
    import pytz

    _pytz_utc = pytz.utc
except ModuleNotFoundError:
    _pytz_utc = None  # pylint: disable=invalid-name

if TYPE_CHECKING:
    import asyncio
    from efro.call import Call as Call  # 'as Call' so we re-export.
    from typing import Any, Callable

T = TypeVar('T')
ValT = TypeVar('ValT')
ArgT = TypeVar('ArgT')
SelfT = TypeVar('SelfT')
RetT = TypeVar('RetT')
EnumT = TypeVar('EnumT', bound=Enum)


class _EmptyObj:
    pass


# A dead weak-ref should be immutable, right? So we can create exactly
# one and return it for all cases that need an empty weak-ref.
_g_empty_weak_ref = weakref.ref(_EmptyObj())
assert _g_empty_weak_ref() is None


# TODO: kill this and just use efro.call.tpartial
if TYPE_CHECKING:
    Call = Call
else:
    Call = functools.partial


def snake_case_to_title(val: str) -> str:
    """Given a snake-case string 'foo_bar', returns 'Foo Bar'."""
    # Kill empty words resulting from leading/trailing/multiple underscores.
    return ' '.join(w for w in val.split('_') if w).title()


def snake_case_to_camel_case(val: str) -> str:
    """Given a snake-case string 'foo_bar', returns camel-case 'FooBar'."""
    # Replace underscores with spaces; capitalize words; kill spaces.
    # Not sure about efficiency, but logically simple.
    return val.replace('_', ' ').title().replace(' ', '')


def enum_by_value(cls: type[EnumT], value: Any) -> EnumT:
    """Create an enum from a value.

    This is basically the same as doing 'obj = EnumType(value)' except
    that it works around an issue where a reference loop is created
    if an exception is thrown due to an invalid value. Since we disable
    the cyclic garbage collector for most of the time, such loops can lead
    to our objects sticking around longer than we want.
    This issue has been submitted to Python as a bug so hopefully we can
    remove this eventually if it gets fixed: https://bugs.python.org/issue42248
    UPDATE: This has been fixed as of later 3.8 builds, so we can kill this
    off once we are 3.9+ across the board.

    Raises ValueError if no member of 'cls' has the given value.
    """

    # Note: we don't recreate *ALL* the functionality of the Enum constructor
    # such as the _missing_ hook; but this should cover our basic needs.
    value2member_map = getattr(cls, '_value2member_map_')
    assert value2member_map is not None
    try:
        out = value2member_map[value]
        assert isinstance(out, cls)
        return out
    except KeyError:
        # 'from None' drops the KeyError context from the raised error.
        # pylint: disable=consider-using-f-string
        raise ValueError(
            '%r is not a valid %s' % (value, cls.__name__)
        ) from None


def check_utc(value: datetime.datetime) -> None:
    """Ensure a datetime value is timezone-aware utc.

    Raises ValueError unless the value's tzinfo is datetime.timezone.utc
    (or pytz.utc when pytz is available). The check is by identity.
    """
    if value.tzinfo is not datetime.timezone.utc and (
        _pytz_utc is None or value.tzinfo is not _pytz_utc
    ):
        raise ValueError(
            'datetime value does not have timezone set as'
            ' datetime.timezone.utc'
        )


def utc_now() -> datetime.datetime:
    """Get offset-aware current utc time.

    This should be used for all datetimes getting sent over the network,
    used with the entity system, etc.
    (datetime.utcnow() gives a utc time value, but it is not timezone-aware
    which makes it less safe to use)
    """
    return datetime.datetime.now(datetime.timezone.utc)


def utc_today() -> datetime.datetime:
    """Get offset-aware midnight in the utc time zone."""
    now = datetime.datetime.now(datetime.timezone.utc)
    return datetime.datetime(
        year=now.year, month=now.month, day=now.day, tzinfo=now.tzinfo
    )


def utc_this_hour() -> datetime.datetime:
    """Get offset-aware beginning of the current hour in the utc time zone."""
    now = datetime.datetime.now(datetime.timezone.utc)
    return datetime.datetime(
        year=now.year,
        month=now.month,
        day=now.day,
        hour=now.hour,
        tzinfo=now.tzinfo,
    )


def utc_this_minute() -> datetime.datetime:
    """Get offset-aware beginning of current minute in the utc time zone."""
    now = datetime.datetime.now(datetime.timezone.utc)
    return datetime.datetime(
        year=now.year,
        month=now.month,
        day=now.day,
        hour=now.hour,
        minute=now.minute,
        tzinfo=now.tzinfo,
    )


def empty_weakref(objtype: type[T]) -> weakref.ref[T]:
    """Return an invalidated weak-reference for the specified type."""
    # At runtime, all weakrefs are the same; our type arg is just
    # for the static type checker.
    del objtype  # Unused.

    # Hand back the single shared dead ref created at import time instead
    # of creating (and immediately dropping) a new object on every call.
    return _g_empty_weak_ref  # type: ignore


def data_size_str(bytecount: int) -> str:
    """Given a size in bytes, returns a short human readable string.

    This should be 6 or fewer chars for nearly all sane file sizes.
    Units are binary multiples (1 KB == 1024 B here).
    """
    # pylint: disable=too-many-return-statements
    if bytecount <= 999:
        return f'{bytecount} B'
    kbytecount = bytecount / 1024
    if round(kbytecount, 1) < 10.0:
        return f'{kbytecount:.1f} KB'
    if round(kbytecount, 0) < 999:
        return f'{kbytecount:.0f} KB'
    mbytecount = bytecount / (1024 * 1024)
    if round(mbytecount, 1) < 10.0:
        return f'{mbytecount:.1f} MB'
    if round(mbytecount, 0) < 999:
        return f'{mbytecount:.0f} MB'
    gbytecount = bytecount / (1024 * 1024 * 1024)
    if round(gbytecount, 1) < 10.0:
        # Must format the gigabyte count here (not the megabyte count).
        return f'{gbytecount:.1f} GB'
    return f'{gbytecount:.0f} GB'


class DirtyBit:
    """Manages whether a thing is dirty and regulates attempts to clean it.

    To use, simply set the 'dirty' value on this object to True when some
    action is needed, and then check the 'should_update' value to regulate
    when attempts to clean it should be made. Set 'dirty' back to False after
    a successful update.
    If 'use_lock' is True, an asyncio Lock will be created and incorporated
    into update attempts to prevent simultaneous updates (should_update will
    only return True when the lock is unlocked). Note that it is up to the
    user to lock/unlock the lock during the actual update attempt.
    If a value is passed for 'auto_dirty_seconds', the dirtybit will flip
    itself back to dirty after being clean for the given amount of time.
    'min_update_interval' can be used to enforce a minimum update
    interval even when updates are successful (retry_interval only applies
    when updates fail)
    """

    def __init__(
        self,
        dirty: bool = False,
        retry_interval: float = 5.0,
        use_lock: bool = False,
        auto_dirty_seconds: float | None = None,
        min_update_interval: float | None = None,
    ):
        curtime = time.time()
        self._retry_interval = retry_interval
        self._auto_dirty_seconds = auto_dirty_seconds
        self._min_update_interval = min_update_interval
        self._dirty = dirty
        # Starting out dirty means an update is due right away.
        self._next_update_time: float | None = curtime if dirty else None
        self._last_update_time: float | None = None
        # Starting out clean arms the auto-dirty timer (if enabled).
        self._next_auto_dirty_time: float | None = (
            (curtime + self._auto_dirty_seconds)
            if (not dirty and self._auto_dirty_seconds is not None)
            else None
        )
        self._use_lock = use_lock
        # Only assigned when use_lock is True.
        self.lock: asyncio.Lock
        if self._use_lock:
            import asyncio

            self.lock = asyncio.Lock()

    @property
    def dirty(self) -> bool:
        """Whether the target is currently dirty.

        This should be set to False once an update is successful.
        """
        return self._dirty

    @dirty.setter
    def dirty(self, value: bool) -> None:
        # If we're freshly clean, set our next auto-dirty time (if we have
        # one).
        if self._dirty and not value and self._auto_dirty_seconds is not None:
            self._next_auto_dirty_time = time.time() + self._auto_dirty_seconds

        # If we're freshly dirty, schedule an immediate update.
        if not self._dirty and value:
            self._next_update_time = time.time()

            # If they want to enforce a minimum update interval,
            # push out the next update time if it hasn't been long enough.
            if (
                self._min_update_interval is not None
                and self._last_update_time is not None
            ):
                self._next_update_time = max(
                    self._next_update_time,
                    self._last_update_time + self._min_update_interval,
                )

        self._dirty = value

    @property
    def should_update(self) -> bool:
        """Whether an attempt should be made to clean the target now.

        Always returns False if the target is not dirty.
        Takes into account the amount of time passed since the target
        was marked dirty or since should_update last returned True.
        """
        curtime = time.time()

        # Auto-dirty ourself if we're into that.
        if (
            self._next_auto_dirty_time is not None
            and curtime > self._next_auto_dirty_time
        ):
            self.dirty = True
            self._next_auto_dirty_time = None
        if not self._dirty:
            return False
        if self._use_lock and self.lock.locked():
            return False
        assert self._next_update_time is not None
        if curtime > self._next_update_time:
            # Assume this attempt may fail; schedule the retry now.
            self._next_update_time = curtime + self._retry_interval
            self._last_update_time = curtime
            return True
        return False


class DispatchMethodWrapper(Generic[ArgT, RetT]):
    """Type-aware standin for the dispatch func returned by dispatchmethod."""

    def __call__(self, arg: ArgT) -> RetT:
        raise RuntimeError('Should not get here')

    @staticmethod
    def register(
        func: Callable[[Any, Any], RetT]
    ) -> Callable[[Any, Any], RetT]:
        """Register a new dispatch handler for this dispatch-method."""
        raise RuntimeError('Should not get here')

    registry: dict[Any, Callable]


# noinspection PyProtectedMember,PyTypeHints
def dispatchmethod(
    func: Callable[[Any, ArgT], RetT]
) -> DispatchMethodWrapper[ArgT, RetT]:
    """A variation of functools.singledispatch for methods.

    Note: as of Python 3.9 there is now functools.singledispatchmethod,
    but it currently (as of Jan 2021) is not type-aware (at least in mypy),
    which gives us a reason to keep this one around for now.
    """
    from functools import singledispatch, update_wrapper

    origwrapper: Any = singledispatch(func)

    # Pull this out so hopefully origwrapper can die,
    # otherwise we reference origwrapper in our wrapper.
    dispatch = origwrapper.dispatch

    # All we do here is recreate the end of functools.singledispatch
    # where it returns a wrapper except instead of the wrapper using the
    # first arg to the function ours uses the second (to skip 'self').
    # This was made against Python 3.7; we should probably check up on
    # this in later versions in case anything has changed.
    # (or hopefully they'll add this functionality to their version)
    # NOTE: sounds like we can use functools singledispatchmethod in 3.8
    def wrapper(*args: Any, **kw: Any) -> Any:
        if len(args) < 2:
            raise TypeError(
                f'{funcname} requires at least 2 positional arguments'
            )

        return dispatch(args[1].__class__)(*args, **kw)

    funcname = getattr(func, '__name__', 'dispatchmethod method')
    wrapper.register = origwrapper.register  # type: ignore
    wrapper.dispatch = dispatch  # type: ignore
    wrapper.registry = origwrapper.registry  # type: ignore
    # pylint: disable=protected-access
    wrapper._clear_cache = origwrapper._clear_cache  # type: ignore
    update_wrapper(wrapper, func)
    # pylint: enable=protected-access
    return cast(DispatchMethodWrapper, wrapper)


def valuedispatch(call: Callable[[ValT], RetT]) -> ValueDispatcher[ValT, RetT]:
    """Decorator for functions to allow dispatching based on a value.

    This differs from functools.singledispatch in that it dispatches based
    on the value of an argument, not based on its type.
    The 'register' method of a value-dispatch function can be used
    to assign new functions to handle particular values.
    Unhandled values wind up in the original dispatch function."""
    return ValueDispatcher(call)


class ValueDispatcher(Generic[ValT, RetT]):
    """Used by the valuedispatch decorator"""

    def __init__(self, call: Callable[[ValT], RetT]) -> None:
        self._base_call = call
        self._handlers: dict[ValT, Callable[[], RetT]] = {}

    def __call__(self, value: ValT) -> RetT:
        # Registered handlers take no args; the value itself selected them.
        handler = self._handlers.get(value)
        if handler is not None:
            return handler()
        return self._base_call(value)

    def _add_handler(
        self, value: ValT, call: Callable[[], RetT]
    ) -> Callable[[], RetT]:
        if value in self._handlers:
            raise RuntimeError(f'Duplicate handlers added for {value}')
        self._handlers[value] = call
        return call

    def register(
        self, value: ValT
    ) -> Callable[[Callable[[], RetT]], Callable[[], RetT]]:
        """Add a handler to the dispatcher."""
        from functools import partial

        return partial(self._add_handler, value)


def valuedispatch1arg(
    call: Callable[[ValT, ArgT], RetT]
) -> ValueDispatcher1Arg[ValT, ArgT, RetT]:
    """Like valuedispatch but for functions taking an extra argument."""
    return ValueDispatcher1Arg(call)


class ValueDispatcher1Arg(Generic[ValT, ArgT, RetT]):
    """Used by the valuedispatch1arg decorator"""

    def __init__(self, call: Callable[[ValT, ArgT], RetT]) -> None:
        self._base_call = call
        self._handlers: dict[ValT, Callable[[ArgT], RetT]] = {}

    def __call__(self, value: ValT, arg: ArgT) -> RetT:
        # Registered handlers receive only the extra arg, not the value.
        handler = self._handlers.get(value)
        if handler is not None:
            return handler(arg)
        return self._base_call(value, arg)

    def _add_handler(
        self, value: ValT, call: Callable[[ArgT], RetT]
    ) -> Callable[[ArgT], RetT]:
        if value in self._handlers:
            raise RuntimeError(f'Duplicate handlers added for {value}')
        self._handlers[value] = call
        return call

    def register(
        self, value: ValT
    ) -> Callable[[Callable[[ArgT], RetT]], Callable[[ArgT], RetT]]:
        """Add a handler to the dispatcher."""
        from functools import partial

        return partial(self._add_handler, value)


if TYPE_CHECKING:

    class ValueDispatcherMethod(Generic[ValT, RetT]):
        """Used by the valuedispatchmethod decorator."""

        def __call__(self, value: ValT) -> RetT:
            ...

        def register(
            self, value: ValT
        ) -> Callable[[Callable[[SelfT], RetT]], Callable[[SelfT], RetT]]:
            """Add a handler to the dispatcher."""
            ...


def valuedispatchmethod(
    call: Callable[[SelfT, ValT], RetT]
) -> ValueDispatcherMethod[ValT, RetT]:
    """Like valuedispatch but works with methods instead of functions."""

    # NOTE: It seems that to wrap a method with a decorator and have self
    # dispatching do the right thing, we must return a function and not
    # an executable object. So for this version we store our data here
    # in the function call dict and simply return a call.

    _base_call = call
    _handlers: dict[ValT, Callable[[SelfT], RetT]] = {}

    def _add_handler(value: ValT, addcall: Callable[[SelfT], RetT]) -> None:
        if value in _handlers:
            raise RuntimeError(f'Duplicate handlers added for {value}')
        _handlers[value] = addcall

    def _register(value: ValT) -> Callable[[Callable[[SelfT], RetT]], None]:
        from functools import partial

        return partial(_add_handler, value)

    def _call_wrapper(self: SelfT, value: ValT) -> RetT:
        handler = _handlers.get(value)
        if handler is not None:
            return handler(self)
        return _base_call(self, value)

    # We still want to use our returned object to register handlers, but we're
    # actually just returning a function. So manually stuff the call onto it.
    setattr(_call_wrapper, 'register', _register)

    # To the type checker's eyes we return a ValueDispatchMethod instance;
    # this lets it know about our register func and type-check its usage.
    # In reality we just return a raw function call (for reasons listed above).
    # pylint: disable=undefined-variable, no-else-return
    if TYPE_CHECKING:
        return ValueDispatcherMethod[ValT, RetT]()
    else:
        return _call_wrapper


def make_hash(obj: Any) -> int:
    """Makes a hash from a dictionary, list, tuple or set to any level,
    that contains only other hashable types (including any lists, tuples,
    sets, and dictionaries).

    Note that this uses Python's hash() function internally so collisions/etc.
    may be more common than with fancy cryptographic hashes.

    Also be aware that Python's hash() output varies across processes, so
    this should only be used for values that will remain in a single process.
    """
    if isinstance(obj, (set, tuple, list)):
        return hash(tuple(make_hash(e) for e in obj))
    if not isinstance(obj, dict):
        return hash(obj)

    # Hash the original keys/values directly instead of deep-copying the
    # dict first; copies of objects with identity-based hashes would hash
    # differently from the originals (and the copy is wasted work anyway).
    hashed_items = [(k, make_hash(v)) for k, v in obj.items()]

    # NOTE: sorted() works correctly here because dict keys are unique, so
    # ordering is decided by the keys alone (keys must be mutually
    # orderable).
    return hash(tuple(frozenset(sorted(hashed_items))))


def asserttype(obj: Any, typ: type[T]) -> T:
    """Return an object typed as a given type.

    Assert is used to check its actual type, so only use this when
    failures are not expected. Otherwise use checktype.
    """
    assert isinstance(typ, type), 'only actual types accepted'
    assert isinstance(obj, typ)
    return obj


def asserttype_o(obj: Any, typ: type[T]) -> T | None:
    """Return an object typed as a given optional type.

    Assert is used to check its actual type, so only use this when
    failures are not expected. Otherwise use checktype.
    """
    assert isinstance(typ, type), 'only actual types accepted'
    assert isinstance(obj, (typ, type(None)))
    return obj


def checktype(obj: Any, typ: type[T]) -> T:
    """Return an object typed as a given type.

    Always checks the type at runtime with isinstance and throws a TypeError
    on failure. Use asserttype for more efficient (but less safe) equivalent.
    """
    assert isinstance(typ, type), 'only actual types accepted'
    if not isinstance(obj, typ):
        raise TypeError(f'Expected a {typ}; got a {type(obj)}.')
    return obj


def checktype_o(obj: Any, typ: type[T]) -> T | None:
    """Return an object typed as a given optional type.

    Always checks the type at runtime with isinstance and throws a TypeError
    on failure. Use asserttype for more efficient (but less safe) equivalent.
    """
    assert isinstance(typ, type), 'only actual types accepted'
    if not isinstance(obj, (typ, type(None))):
        raise TypeError(f'Expected a {typ} or None; got a {type(obj)}.')
    return obj


def warntype(obj: Any, typ: type[T]) -> T:
    """Return an object typed as a given type.

    Always checks the type at runtime and simply logs a warning if it is
    not what is expected.
    """
    assert isinstance(typ, type), 'only actual types accepted'
    if not isinstance(obj, typ):
        import logging

        logging.warning('warntype: expected a %s, got a %s', typ, type(obj))
    return obj  # type: ignore


def warntype_o(obj: Any, typ: type[T]) -> T | None:
    """Return an object typed as a given optional type.

    Always checks the type at runtime and simply logs a warning if it is
    not what is expected.
    """
    assert isinstance(typ, type), 'only actual types accepted'
    if not isinstance(obj, (typ, type(None))):
        import logging

        logging.warning(
            'warntype: expected a %s or None, got a %s', typ, type(obj)
        )
    return obj  # type: ignore


def assert_non_optional(obj: T | None) -> T:
    """Return an object with Optional typing removed.

    Assert is used to check its actual type, so only use this when
    failures are not expected. Use check_non_optional otherwise.
    """
    assert obj is not None
    return obj


def check_non_optional(obj: T | None) -> T:
    """Return an object with Optional typing removed.

    Always checks the actual type and throws a TypeError on failure.
    Use assert_non_optional for a more efficient (but less safe) equivalent.
    """
    if obj is None:
        raise TypeError('Got None value in check_non_optional.')
    return obj


def smoothstep(edge0: float, edge1: float, x: float) -> float:
    """A smooth transition function.

    Returns a value that smoothly moves from 0 to 1 as we go between edges.
    Values outside of the range return 0 or 1.
    Note that edge0 and edge1 must differ (equal edges divide by zero).
    """
    y = min(1.0, max(0.0, (x - edge0) / (edge1 - edge0)))
    return y * y * (3.0 - 2.0 * y)


def linearstep(edge0: float, edge1: float, x: float) -> float:
    """A linear transition function.

    Returns a value that linearly moves from 0 to 1 as we go between edges.
    Values outside of the range return 0 or 1.
    Note that edge0 and edge1 must differ (equal edges divide by zero).
    """
    return max(0.0, min(1.0, (x - edge0) / (edge1 - edge0)))


def _compact_id(num: int, chars: str) -> str:
    """Encode a non-negative int as a base-len(chars) string."""
    if num < 0:
        raise ValueError('Negative integers not allowed.')

    # Chars must be in sorted order for sorting to work correctly
    # on our output.
    assert ''.join(sorted(list(chars))) == chars

    base = len(chars)
    digits: list[str] = []
    while num:
        # Collect least-significant digit first; reversed below.
        num, remainder = divmod(num, base)
        digits.append(chars[remainder])
    return ''.join(reversed(digits)) or '0'


def human_readable_compact_id(num: int) -> str:
    """Given a positive int, return a compact string representation for it.

    Handy for visualizing unique numeric ids using as few as possible chars.
    This representation uses only lowercase letters and numbers (minus the
    following letters for readability):
     's' is excluded due to similarity to '5'.
     'l' is excluded due to similarity to '1'.
     'i' is excluded due to similarity to '1'.
     'o' is excluded due to similarity to '0'.
     'z' is excluded due to similarity to '2'.

    That leaves 31 usable characters (10 digits plus 21 letters), so
    n chars can store 31^n distinct values.

    When reading human input consisting of these IDs, it may be desirable
    to map the disallowed chars to their corresponding allowed ones
    ('o' -> '0', etc).

    Sort order for these ids is the same as the original numbers.

    If more compactness is desired at the expense of readability,
    use compact_id() instead.
    """
    return _compact_id(num, '0123456789abcdefghjkmnpqrtuvwxy')


def compact_id(num: int) -> str:
    """Given a positive int, return a compact string representation for it.

    Handy for visualizing unique numeric ids using as few as possible chars.
    This version is more compact than human_readable_compact_id() but less
    friendly to humans due to using both capital and lowercase letters,
    both 'O' and '0', etc.

    Therefore for n chars this can store values of 62^n.

    Sort order for these ids is the same as the original numbers.
    """
    return _compact_id(
        num, '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'
    )


def unchanging_hostname() -> str:
    """Return an unchanging name for the local device.

    Similar to the `hostname` call (or os.uname().nodename in Python)
    except attempts to give a name that doesn't change depending on
    network conditions. (A Mac will tend to go from Foo to Foo.local,
    Foo.lan etc. throughout its various adventures)
    """
    import platform
    import subprocess

    # On Mac, this should give the computer name assigned in System Prefs.
    if platform.system() == 'Darwin':
        return (
            subprocess.run(
                ['scutil', '--get', 'ComputerName'],
                check=True,
                capture_output=True,
            )
            .stdout.decode()
            .strip()
            .replace(' ', '-')
        )
    return os.uname().nodename


def set_canonical_module_names(module_globals: dict[str, Any]) -> None:
    """Point public objects' __module__ at the module that exports them.

    Given a module's globals dict, walks every public (non-underscore)
    name in it and, if the object's current __module__ lives somewhere
    under this module ('<name>.…' or '_<name>.…'), rewrites it to the
    module's own name. Objects from other packages are left untouched.
    Failures to assign are logged as warnings instead of raised.

    Does nothing if the EFRO_SUPPRESS_SET_CANONICAL_MODULE_NAMES
    environment variable is set to '1'.

    Raises RuntimeError if the globals dict has no string '__name__'.
    """
    if os.environ.get('EFRO_SUPPRESS_SET_CANONICAL_MODULE_NAMES') == '1':
        return

    modulename = module_globals.get('__name__')
    if not isinstance(modulename, str):
        raise RuntimeError('Unable to get module name.')
    assert not modulename.startswith('_')
    modulename_prefix = f'{modulename}.'
    modulename_prefix_2 = f'_{modulename}.'

    for name, obj in module_globals.items():
        if name.startswith('_'):
            continue
        existing = getattr(obj, '__module__', None)
        try:
            # Override the module ONLY if it lives under us somewhere.
            # So ourpackage._submodule.Foo becomes ourpackage.Foo
            # but otherpackage._submodule.Foo remains untouched.
            if existing is not None and existing.startswith(
                (modulename_prefix, modulename_prefix_2)
            ):
                obj.__module__ = modulename
        except Exception:
            import logging

            logging.warning(
                'set_canonical_module_names: unable to change __module__'
                " from '%s' to '%s' on %s object at '%s'.",
                existing,
                modulename,
                type(obj),
                name,
            )


def timedelta_str(
    timeval: datetime.timedelta | float, maxparts: int = 2, decimals: int = 0
) -> str:
    """Return a simple human readable time string for a length of time.

    Time can be given as a timedelta or a number representing seconds.
    Example output:
      "23d 1h 2m 32s" (with maxparts == 4)
      "23d 1h" (with maxparts == 2)
      "23d 1.08h" (with maxparts == 2 and decimals == 2)

    Negative durations are rendered as the positive form prefixed with '-'.

    Note that this is hard-coded in English and probably not especially
    performant.
    """
    # pylint: disable=too-many-locals

    # Accept ints as well as floats for plain-seconds values.
    if isinstance(timeval, (int, float)):
        timevalfin = datetime.timedelta(seconds=timeval)
    else:
        timevalfin = timeval

    # Internally we only handle positive values.
    if timevalfin.total_seconds() < 0:
        # Pass decimals through so negative values format like positives.
        negated = timedelta_str(
            -timevalfin, maxparts=maxparts, decimals=decimals
        )
        return f'-{negated}'

    years = timevalfin.days // 365
    days = timevalfin.days % 365
    hours = timevalfin.seconds // 3600
    hour_remainder = timevalfin.seconds % 3600
    minutes = hour_remainder // 60
    seconds = hour_remainder % 60

    # Now, if we want decimal places for our last value,
    # calc fractional parts.
    if decimals:
        # Calc totals of each type.
        t_seconds = timevalfin.total_seconds()
        t_minutes = t_seconds / 60
        t_hours = t_minutes / 60
        t_days = t_hours / 24
        t_years = t_days / 365

        # Calc fractional parts that exclude all whole values to their left.
        years_covered = years
        years_f = t_years - years_covered
        days_covered = years_covered * 365 + days
        days_f = t_days - days_covered
        hours_covered = days_covered * 24 + hours
        hours_f = t_hours - hours_covered
        minutes_covered = hours_covered * 60 + minutes
        minutes_f = t_minutes - minutes_covered
        seconds_covered = minutes_covered * 60 + seconds
        seconds_f = t_seconds - seconds_covered
    else:
        years_f = days_f = hours_f = minutes_f = seconds_f = 0.0

    parts: list[str] = []
    for part, part_f, suffix in (
        (years, years_f, 'y'),
        (days, days_f, 'd'),
        (hours, hours_f, 'h'),
        (minutes, minutes_f, 'm'),
        (seconds, seconds_f, 's'),
    ):
        # Skip leading zero-valued units, but always emit seconds if
        # nothing else has been emitted.
        if part or parts or (not parts and suffix == 's'):
            # Do decimal version only for the last part.
            if decimals and (len(parts) >= maxparts - 1 or suffix == 's'):
                parts.append(f'{part+part_f:.{decimals}f}{suffix}')
            else:
                parts.append(f'{part}{suffix}')
            if len(parts) >= maxparts:
                break
    return ' '.join(parts)


def ago_str(
    timeval: datetime.datetime,
    maxparts: int = 1,
    now: datetime.datetime | None = None,
    decimals: int = 0,
) -> str:
    """Given a datetime, return a clean human readable 'ago' str.

    Note that this is hard-coded in English so should not be used
    for visible in-game elements; only tools/etc.

    If now is not passed, efro.util.utc_now() is used.
    """
    if now is None:
        now = utc_now()
    return (
        timedelta_str(now - timeval, maxparts=maxparts, decimals=decimals)
        + ' ago'
    )
def snake_case_to_title(val: str) -> str:
    """Convert a snake-case string such as 'foo_bar' to title form 'Foo Bar'."""
    # Leading, trailing, or doubled underscores produce empty pieces when
    # splitting; filter those out so we don't get stray spaces.
    words = filter(None, val.split('_'))
    spaced = ' '.join(words)
    return spaced.title()
Given a snake-case string 'foo_bar', returns 'Foo Bar'.
def snake_case_to_camel_case(val: str) -> str:
    """Convert a snake-case string such as 'foo_bar' to camel-case 'FooBar'."""
    # Treat underscores as word breaks, capitalize each word, then close
    # up the gaps. Simple rather than fast.
    spaced = val.replace('_', ' ')
    titled = spaced.title()
    return ''.join(titled.split(' '))
Given a snake-case string 'foo_bar', returns camel-case 'FooBar'.
def enum_by_value(cls: type[EnumT], value: Any) -> EnumT:
    """Look up the member of an enum type that has the given value.

    Roughly equivalent to 'EnumType(value)', but sidesteps a reference
    loop that the stock constructor created when raising for an invalid
    value (https://bugs.python.org/issue42248). With the cyclic garbage
    collector disabled most of the time, such loops keep objects alive
    longer than wanted. The upstream issue is fixed in later 3.8 builds,
    so this can be retired once everything is on 3.9+.

    Raises ValueError if no member of 'cls' has the given value.
    """

    # This intentionally skips parts of the real Enum constructor (such
    # as the _missing_ hook); a plain value lookup covers our needs.
    members = getattr(cls, '_value2member_map_')
    assert members is not None
    try:
        member = members[value]
    except KeyError:
        # pylint: disable=consider-using-f-string
        message = '%r is not a valid %s' % (value, cls.__name__)
        raise ValueError(message) from None
    assert isinstance(member, cls)
    return member
Create an enum from a value.
This is basically the same as doing 'obj = EnumType(value)' except that it works around an issue where a reference loop is created if an exception is thrown due to an invalid value. Since we disable the cyclic garbage collector for most of the time, such loops can lead to our objects sticking around longer than we want. This issue has been submitted to Python as a bug so hopefully we can remove this eventually if it gets fixed: https://bugs.python.org/issue42248 UPDATE: This has been fixed as of later 3.8 builds, so we can kill this off once we are 3.9+ across the board.
def check_utc(value: datetime.datetime) -> None:
    """Verify that a datetime is timezone-aware and set to utc.

    Raises ValueError unless the value's tzinfo is the
    datetime.timezone.utc object (or pytz's utc object, when pytz is
    available). The comparison is by identity.
    """
    tzinfo = value.tzinfo
    if tzinfo is datetime.timezone.utc:
        return
    # pytz is optional; only accept its utc object if we managed to load it.
    if _pytz_utc is not None and tzinfo is _pytz_utc:
        return
    raise ValueError(
        'datetime value does not have timezone set as'
        ' datetime.timezone.utc'
    )
Ensure a datetime value is timezone-aware utc.
def utc_now() -> datetime.datetime:
    """Return the current time as an offset-aware utc datetime.

    Prefer this for any datetime that gets sent over the network, used
    with the entity system, etc. (datetime.utcnow() also yields a utc
    time value, but it is not timezone-aware, which makes it less safe.)
    """
    utc = datetime.timezone.utc
    return datetime.datetime.now(tz=utc)
Get offset-aware current utc time.
This should be used for all datetimes getting sent over the network, used with the entity system, etc. (datetime.utcnow() also gives a utc time value, but it is not timezone-aware, which makes it less safe to use.)
def utc_today() -> datetime.datetime:
    """Return offset-aware midnight of the current day in utc."""
    now = datetime.datetime.now(datetime.timezone.utc)

    # Zero out everything below the day; tzinfo carries over from 'now'.
    return now.replace(hour=0, minute=0, second=0, microsecond=0)
Get offset-aware midnight in the utc time zone.
def utc_this_hour() -> datetime.datetime:
    """Return the offset-aware start of the current hour in utc."""
    now = datetime.datetime.now(datetime.timezone.utc)

    # Zero out everything below the hour; tzinfo carries over from 'now'.
    return now.replace(minute=0, second=0, microsecond=0)
Get offset-aware beginning of the current hour in the utc time zone.
def utc_this_minute() -> datetime.datetime:
    """Return the offset-aware start of the current minute in utc."""
    now = datetime.datetime.now(datetime.timezone.utc)

    # Zero out everything below the minute; tzinfo carries over from 'now'.
    return now.replace(second=0, microsecond=0)
Get offset-aware beginning of current minute in the utc time zone.
def empty_weakref(objtype: type[T]) -> weakref.ref[T]:
    """Return a dead weak-reference typed for the given object type."""
    # The type argument exists purely for the static type checker;
    # at runtime every caller gets the same shared dead reference.
    del objtype  # Unused.

    return _g_empty_weak_ref  # type: ignore
Return an invalidated weak-reference for the specified type.
def data_size_str(bytecount: int) -> str:
    """Given a size in bytes, returns a short human readable string.

    This should be 6 or fewer chars for most all sane file sizes.

    Values up to 999 are shown as whole bytes. Larger values are shown
    in KB, MB, or GB (powers of 1024), with one decimal place while the
    rounded value is under 10 and no decimals otherwise.
    """
    # pylint: disable=too-many-return-statements
    if bytecount <= 999:
        return f'{bytecount} B'
    kbytecount = bytecount / 1024
    if round(kbytecount, 1) < 10.0:
        return f'{kbytecount:.1f} KB'
    if round(kbytecount, 0) < 999:
        return f'{kbytecount:.0f} KB'
    mbytecount = bytecount / (1024 * 1024)
    if round(mbytecount, 1) < 10.0:
        return f'{mbytecount:.1f} MB'
    if round(mbytecount, 0) < 999:
        return f'{mbytecount:.0f} MB'
    gbytecount = bytecount / (1024 * 1024 * 1024)
    if round(gbytecount, 1) < 10.0:
        # Format the gigabyte value here (not the megabyte one).
        return f'{gbytecount:.1f} GB'
    return f'{gbytecount:.0f} GB'
Given a size in bytes, returns a short human readable string.
This should be 6 or fewer chars for most all sane file sizes.
class DirtyBit:
    """Tracks a dirty/clean flag and throttles attempts to clean it.

    Set 'dirty' to True when some action is needed, then poll
    'should_update' to decide when to attempt that action. Set 'dirty'
    back to False once an update succeeds.
    If 'use_lock' is True, an asyncio Lock is created and should_update
    only returns True while that lock is unlocked. Acquiring/releasing
    the lock around the actual update is left to the user.
    If 'auto_dirty_seconds' is provided, the bit flips itself back to
    dirty after having been clean for that long.
    'min_update_interval' enforces a minimum spacing between updates
    even when they succeed ('retry_interval' only applies to repeated
    attempts while still dirty).
    """

    def __init__(
        self,
        dirty: bool = False,
        retry_interval: float = 5.0,
        use_lock: bool = False,
        auto_dirty_seconds: float | None = None,
        min_update_interval: float | None = None,
    ):
        now = time.time()
        self._dirty = dirty
        self._retry_interval = retry_interval
        self._auto_dirty_seconds = auto_dirty_seconds
        self._min_update_interval = min_update_interval
        self._last_update_time: float | None = None

        # Starting out dirty means an update is due right away.
        self._next_update_time: float | None = now if dirty else None

        # Starting out clean kicks off the auto-dirty countdown (if any).
        self._next_auto_dirty_time: float | None = None
        if not dirty and auto_dirty_seconds is not None:
            self._next_auto_dirty_time = now + auto_dirty_seconds

        self._use_lock = use_lock
        self.lock: asyncio.Lock
        if use_lock:
            import asyncio

            self.lock = asyncio.Lock()

    @property
    def dirty(self) -> bool:
        """Whether the target is currently dirty.

        This should be set to False once an update is successful.
        """
        return self._dirty

    @dirty.setter
    def dirty(self, value: bool) -> None:
        was_dirty = self._dirty

        if was_dirty and not value:
            # Freshly clean: start the auto-dirty countdown (if we have one).
            if self._auto_dirty_seconds is not None:
                self._next_auto_dirty_time = (
                    time.time() + self._auto_dirty_seconds
                )
        elif value and not was_dirty:
            # Freshly dirty: schedule an immediate update...
            scheduled = time.time()

            # ...unless a minimum update interval says the previous
            # update was too recent, in which case push it out.
            if (
                self._min_update_interval is not None
                and self._last_update_time is not None
            ):
                scheduled = max(
                    scheduled,
                    self._last_update_time + self._min_update_interval,
                )
            self._next_update_time = scheduled

        self._dirty = value

    @property
    def should_update(self) -> bool:
        """Whether an attempt should be made to clean the target now.

        Always returns False if the target is not dirty.
        Takes into account the amount of time passed since the target
        was marked dirty or since should_update last returned True.
        """
        now = time.time()

        # Flip ourself to dirty if our auto-dirty time has passed.
        auto_dirty_at = self._next_auto_dirty_time
        if auto_dirty_at is not None and now > auto_dirty_at:
            self.dirty = True
            self._next_auto_dirty_time = None

        if not self._dirty:
            return False
        if self._use_lock and self.lock.locked():
            return False

        assert self._next_update_time is not None
        if now <= self._next_update_time:
            return False

        # Go time; the next attempt (if still dirty) waits retry_interval.
        self._next_update_time = now + self._retry_interval
        self._last_update_time = now
        return True
Manages whether a thing is dirty and regulates attempts to clean it.
To use, simply set the 'dirty' value on this object to True when some action is needed, and then check the 'should_update' value to regulate when attempts to clean it should be made. Set 'dirty' back to False after a successful update. If 'use_lock' is True, an asyncio Lock will be created and incorporated into update attempts to prevent simultaneous updates (should_update will only return True when the lock is unlocked). Note that it is up to the user to lock/unlock the lock during the actual update attempt. If a value is passed for 'auto_dirty_seconds', the dirtybit will flip itself back to dirty after being clean for the given amount of time. 'min_update_interval' can be used to enforce a minimum update interval even when updates are successful (retry_interval only applies when updates fail).
def __init__(
    self,
    dirty: bool = False,
    retry_interval: float = 5.0,
    use_lock: bool = False,
    auto_dirty_seconds: float | None = None,
    min_update_interval: float | None = None,
):
    """Set up initial dirty state, timing values, and the optional lock."""
    now = time.time()
    self._dirty = dirty
    self._retry_interval = retry_interval
    self._auto_dirty_seconds = auto_dirty_seconds
    self._min_update_interval = min_update_interval
    self._last_update_time: float | None = None

    # Starting out dirty means an update is due right away.
    self._next_update_time: float | None = now if dirty else None

    # Starting out clean kicks off the auto-dirty countdown (if any).
    self._next_auto_dirty_time: float | None = None
    if not dirty and auto_dirty_seconds is not None:
        self._next_auto_dirty_time = now + auto_dirty_seconds

    self._use_lock = use_lock
    self.lock: asyncio.Lock
    if use_lock:
        import asyncio

        self.lock = asyncio.Lock()
class DispatchMethodWrapper(Generic[ArgT, RetT]):
    """Type-aware stand-in for the dispatch func returned by dispatchmethod.

    This exists to describe the call signature, 'register' decorator,
    and 'registry' attribute to the type checker; its methods simply
    raise if they are ever actually invoked.
    """

    # Declared for type checking only; no value is assigned here.
    registry: dict[Any, Callable]

    def __call__(self, arg: ArgT) -> RetT:
        raise RuntimeError('Should not get here')

    @staticmethod
    def register(
        func: Callable[[Any, Any], RetT]
    ) -> Callable[[Any, Any], RetT]:
        """Register a new dispatch handler for this dispatch-method."""
        raise RuntimeError('Should not get here')
Type-aware standin for the dispatch func returned by dispatchmethod.
301 @staticmethod 302 def register( 303 func: Callable[[Any, Any], RetT] 304 ) -> Callable[[Any, Any], RetT]: 305 """Register a new dispatch handler for this dispatch-method.""" 306 raise RuntimeError('Should not get here')
Register a new dispatch handler for this dispatch-method.
def dispatchmethod(
    func: Callable[[Any, ArgT], RetT]
) -> DispatchMethodWrapper[ArgT, RetT]:
    """A variation of functools.singledispatch for methods.

    Dispatch is based on the class of the second positional argument
    (the first one being 'self').

    Note: as of Python 3.9 there is now functools.singledispatchmethod,
    but it currently (as of Jan 2021) is not type-aware (at least in mypy),
    which gives us a reason to keep this one around for now.
    """
    from functools import singledispatch, update_wrapper

    base: Any = singledispatch(func)

    # Grab the dispatch call on its own so our wrapper doesn't need to
    # hold a reference to the original singledispatch wrapper.
    dispatch = base.dispatch

    funcname = getattr(func, '__name__', 'dispatchmethod method')

    # This mirrors the wrapper that functools.singledispatch itself builds,
    # except that we key off the second arg instead of the first in order
    # to skip 'self'. (Originally written against Python 3.7; worth
    # re-checking against later versions of functools.)
    def wrapper(*args: Any, **kw: Any) -> Any:
        if len(args) < 2:
            raise TypeError(
                f'{funcname} requires at least 2 positional arguments'
            )
        handler = dispatch(args[1].__class__)
        return handler(*args, **kw)

    # Expose the same extras the stock singledispatch wrapper carries.
    setattr(wrapper, 'register', base.register)
    setattr(wrapper, 'dispatch', dispatch)
    setattr(wrapper, 'registry', base.registry)
    # pylint: disable=protected-access
    setattr(wrapper, '_clear_cache', base._clear_cache)
    # pylint: enable=protected-access
    update_wrapper(wrapper, func)
    return cast(DispatchMethodWrapper, wrapper)
A variation of functools.singledispatch for methods.
Note: as of Python 3.9 there is now functools.singledispatchmethod, but it currently (as of Jan 2021) is not type-aware (at least in mypy), which gives us a reason to keep this one around for now.
def valuedispatch(call: Callable[[ValT], RetT]) -> ValueDispatcher[ValT, RetT]:
    """Decorator for functions to allow dispatching based on a value.

    Unlike functools.singledispatch, which picks a handler based on an
    argument's type, this picks one based on the argument's value.
    Use the 'register' method of the resulting value-dispatch function
    to assign handlers for particular values; any value without a
    handler is passed to the original decorated function.
    """
    dispatcher = ValueDispatcher(call)
    return dispatcher
Decorator for functions to allow dispatching based on a value.
This differs from functools.singledispatch in that it dispatches based on the value of an argument, not based on its type. The 'register' method of a value-dispatch function can be used to assign new functions to handle particular values. Unhandled values wind up in the original dispatch function.
class ValueDispatcher(Generic[ValT, RetT]):
    """Used by the valuedispatch decorator"""

    def __init__(self, call: Callable[[ValT], RetT]) -> None:
        # Fallback for values with no registered handler.
        self._base_call = call
        # Per-value handlers; these take no args.
        self._handlers: dict[ValT, Callable[[], RetT]] = {}

    def __call__(self, value: ValT) -> RetT:
        handler = self._handlers.get(value)
        if handler is None:
            return self._base_call(value)
        return handler()

    def _add_handler(
        self, value: ValT, call: Callable[[], RetT]
    ) -> Callable[[], RetT]:
        # Each value gets at most one handler.
        if value in self._handlers:
            raise RuntimeError(f'Duplicate handlers added for {value}')
        self._handlers[value] = call
        return call

    def register(
        self, value: ValT
    ) -> Callable[[Callable[[], RetT]], Callable[[], RetT]]:
        """Add a handler to the dispatcher."""
        from functools import partial

        add_handler = self._add_handler
        return partial(add_handler, value)
Used by the valuedispatch decorator
def register(
    self, value: ValT
) -> Callable[[Callable[[], RetT]], Callable[[], RetT]]:
    """Add a handler to the dispatcher.

    Returns a decorator that registers the decorated call as the
    handler for the given value.
    """
    from functools import partial

    add_handler = self._add_handler
    return partial(add_handler, value)
Add a handler to the dispatcher.
def valuedispatch1arg(
    call: Callable[[ValT, ArgT], RetT]
) -> ValueDispatcher1Arg[ValT, ArgT, RetT]:
    """Like valuedispatch but for functions taking an extra argument."""
    dispatcher = ValueDispatcher1Arg(call)
    return dispatcher
Like valuedispatch but for functions taking an extra argument.
class ValueDispatcher1Arg(Generic[ValT, ArgT, RetT]):
    """Used by the valuedispatch1arg decorator"""

    def __init__(self, call: Callable[[ValT, ArgT], RetT]) -> None:
        # Fallback for values with no registered handler.
        self._base_call = call
        # Per-value handlers; these receive only the extra arg.
        self._handlers: dict[ValT, Callable[[ArgT], RetT]] = {}

    def __call__(self, value: ValT, arg: ArgT) -> RetT:
        handler = self._handlers.get(value)
        if handler is None:
            return self._base_call(value, arg)
        return handler(arg)

    def _add_handler(
        self, value: ValT, call: Callable[[ArgT], RetT]
    ) -> Callable[[ArgT], RetT]:
        # Each value gets at most one handler.
        if value in self._handlers:
            raise RuntimeError(f'Duplicate handlers added for {value}')
        self._handlers[value] = call
        return call

    def register(
        self, value: ValT
    ) -> Callable[[Callable[[ArgT], RetT]], Callable[[ArgT], RetT]]:
        """Add a handler to the dispatcher."""
        from functools import partial

        add_handler = self._add_handler
        return partial(add_handler, value)
Used by the valuedispatch1arg decorator
def register(
    self, value: ValT
) -> Callable[[Callable[[ArgT], RetT]], Callable[[ArgT], RetT]]:
    """Add a handler to the dispatcher.

    Returns a decorator that registers the decorated call as the
    handler for the given value.
    """
    from functools import partial

    add_handler = self._add_handler
    return partial(add_handler, value)
Add a handler to the dispatcher.
def valuedispatchmethod(
    call: Callable[[SelfT, ValT], RetT]
) -> ValueDispatcherMethod[ValT, RetT]:
    """Like valuedispatch but works with methods instead of functions."""

    # NOTE: To wrap a method with a decorator and still have 'self' get
    # passed properly, it seems we need to return a plain function and
    # not a callable object. So for this version our state lives in
    # this enclosing scope and we simply return a call.

    handlers: dict[ValT, Callable[[SelfT], RetT]] = {}

    def _add_handler(value: ValT, addcall: Callable[[SelfT], RetT]) -> None:
        # Each value gets at most one handler.
        if value in handlers:
            raise RuntimeError(f'Duplicate handlers added for {value}')
        handlers[value] = addcall

    def _register(value: ValT) -> Callable[[Callable[[SelfT], RetT]], None]:
        from functools import partial

        return partial(_add_handler, value)

    def _call_wrapper(self: SelfT, value: ValT) -> RetT:
        handler = handlers.get(value)
        if handler is None:
            # Nothing registered for this value; use the original method.
            return call(self, value)
        return handler(self)

    # Handlers get registered through the object we return, which is
    # just a function; so manually hang the register call off of it.
    setattr(_call_wrapper, 'register', _register)

    # As far as the type checker is concerned we return a
    # ValueDispatcherMethod instance, which tells it about 'register'
    # and lets it type-check usage. In reality we return the raw
    # function (for the reasons listed above).
    # pylint: disable=undefined-variable, no-else-return
    if TYPE_CHECKING:
        return ValueDispatcherMethod[ValT, RetT]()
    else:
        return _call_wrapper
Like valuedispatch but works with methods instead of functions.
def make_hash(obj: Any) -> int:
    """Makes a hash from a dictionary, list, tuple or set to any level,
    that contains only other hashable types (including any lists, tuples,
    sets, and dictionaries).

    Dicts and sets are hashed independently of their iteration order;
    lists and tuples are hashed in order.

    Note that this uses Python's hash() function internally so collisions/etc.
    may be more common than with fancy cryptographic hashes.

    Also be aware that Python's hash() output varies across processes, so
    this should only be used for values that will remain in a single process.

    Raises:
        TypeError: If a dict key or a non-container value is unhashable.
    """
    if isinstance(obj, dict):
        # A frozenset of (key, value-hash) pairs is order-independent,
        # so there is no need to sort (which would require mutually
        # comparable keys) or to copy the dict.
        return hash(
            frozenset((key, make_hash(val)) for key, val in obj.items())
        )
    if isinstance(obj, set):
        # Likewise, avoid depending on set iteration order.
        return hash(frozenset(make_hash(elem) for elem in obj))
    if isinstance(obj, (tuple, list)):
        return hash(tuple(make_hash(elem) for elem in obj))
    return hash(obj)
Makes a hash from a dictionary, list, tuple or set to any level, that contains only other hashable types (including any lists, tuples, sets, and dictionaries).
Note that this uses Python's hash() function internally so collisions/etc. may be more common than with fancy cryptographic hashes.
Also be aware that Python's hash() output varies across processes, so this should only be used for values that will remain in a single process.
def asserttype(obj: Any, typ: type[T]) -> T:
    """Return the passed object, typed as the given type.

    The object's actual type is verified only via assert, so use this
    where failures are not expected. Otherwise use checktype.
    """
    # Both checks are plain asserts and vanish when asserts are disabled.
    assert isinstance(typ, type), 'only actual types accepted'
    assert isinstance(obj, typ)
    return obj
Return an object typed as a given type.
Assert is used to check its actual type, so only use this when failures are not expected. Otherwise use checktype.
def asserttype_o(obj: Any, typ: type[T]) -> T | None:
    """Return the passed object, typed as the given optional type.

    The object's actual type is verified only via assert, so use this
    where failures are not expected. Otherwise use checktype_o.
    """
    # Both checks are plain asserts and vanish when asserts are disabled.
    assert isinstance(typ, type), 'only actual types accepted'
    assert obj is None or isinstance(obj, typ)
    return obj
Return an object typed as a given optional type.
Assert is used to check its actual type, so only use this when failures are not expected. Otherwise use checktype_o.
def checktype(obj: Any, typ: type[T]) -> T:
    """Return the passed object, typed as the given type.

    The type is always verified at runtime via isinstance, and a
    TypeError is raised on a mismatch. Use asserttype for a more
    efficient (but less safe) equivalent.
    """
    assert isinstance(typ, type), 'only actual types accepted'
    if isinstance(obj, typ):
        return obj
    raise TypeError(f'Expected a {typ}; got a {type(obj)}.')
Return an object typed as a given type.
Always checks the type at runtime with isinstance and throws a TypeError on failure. Use asserttype for more efficient (but less safe) equivalent.
def checktype_o(obj: Any, typ: type[T]) -> T | None:
    """Return the passed object, typed as the given optional type.

    The type is always verified at runtime via isinstance, and a
    TypeError is raised on a mismatch (None is accepted). Use
    asserttype_o for a more efficient (but less safe) equivalent.
    """
    assert isinstance(typ, type), 'only actual types accepted'
    if obj is None or isinstance(obj, typ):
        return obj
    raise TypeError(f'Expected a {typ} or None; got a {type(obj)}.')
Return an object typed as a given optional type.
Always checks the type at runtime with isinstance and throws a TypeError on failure. Use asserttype_o for a more efficient (but less safe) equivalent.
def warntype(obj: Any, typ: type[T]) -> T:
    """Return the passed object, typed as the given type.

    The type is always checked at runtime, but a mismatch only results
    in a logged warning; the object is returned either way.
    """
    assert isinstance(typ, type), 'only actual types accepted'
    if isinstance(obj, typ):
        return obj

    import logging

    logging.warning('warntype: expected a %s, got a %s', typ, type(obj))
    return obj  # type: ignore
Return an object typed as a given type.
Always checks the type at runtime and simply logs a warning if it is not what is expected.
def warntype_o(obj: Any, typ: type[T]) -> T | None:
    """Return the passed object, typed as the given optional type.

    The type is always checked at runtime, but a mismatch only results
    in a logged warning; the object is returned either way. None is
    accepted without a warning.
    """
    assert isinstance(typ, type), 'only actual types accepted'
    if obj is None or isinstance(obj, typ):
        return obj

    import logging

    logging.warning(
        'warntype: expected a %s or None, got a %s', typ, type(obj)
    )
    return obj  # type: ignore
Return an object typed as a given optional type.
Always checks the type at runtime and simply logs a warning if it is not what is expected.
def assert_non_optional(obj: T | None) -> T:
    """Return the passed object with Optional typing stripped.

    The value is verified only via assert, so use this where a None
    value is not expected. Use check_non_optional otherwise.
    """
    # A plain assert; this vanishes when asserts are disabled.
    assert obj is not None
    return obj
Return an object with Optional typing removed.
Assert is used to check its actual type, so only use this when failures are not expected. Use check_non_optional otherwise.
def check_non_optional(obj: T | None) -> T:
    """Return the passed object with Optional typing stripped.

    The value is always checked at runtime and a TypeError is raised
    if it is None. Use assert_non_optional for a more efficient
    (but less safe) equivalent.
    """
    if obj is not None:
        return obj
    raise TypeError('Got None value in check_non_optional.')
Return an object with Optional typing removed.
Always checks the actual type and throws a TypeError on failure. Use assert_non_optional for a more efficient (but less safe) equivalent.
def smoothstep(edge0: float, edge1: float, x: float) -> float:
    """A smooth transition function.

    The result eases from 0 to 1 as x travels from edge0 to edge1;
    x values outside of that range give 0 or 1.
    Note that edge0 and edge1 must differ (their difference is used
    as a divisor).
    """
    # Normalized position between the edges, clamped to [0, 1].
    ratio = (x - edge0) / (edge1 - edge0)
    clamped = min(1.0, max(0.0, ratio))

    # Cubic ease: 3t^2 - 2t^3.
    return clamped * clamped * (3.0 - 2.0 * clamped)
A smooth transition function.
Returns a value that smoothly moves from 0 to 1 as we go between edges. Values outside of the range return 0 or 1.
def linearstep(edge0: float, edge1: float, x: float) -> float:
    """A linear transition function.

    The result moves linearly from 0 to 1 as x travels from edge0 to
    edge1; x values outside of that range give 0 or 1.
    Note that edge0 and edge1 must differ (their difference is used
    as a divisor).
    """
    # Normalized position between the edges, clamped to [0, 1].
    ratio = (x - edge0) / (edge1 - edge0)
    return max(0.0, min(1.0, ratio))
A linear transition function.
Returns a value that linearly moves from 0 to 1 as we go between edges. Values outside of the range return 0 or 1.
def human_readable_compact_id(num: int) -> str:
    """Given a positive int, return a compact string representation for it.

    Handy for visualizing unique numeric ids using as few as possible chars.
    This representation uses only lowercase letters and numbers (minus the
    following letters for readability):
     's' is excluded due to similarity to '5'.
     'l' is excluded due to similarity to '1'.
     'i' is excluded due to similarity to '1'.
     'o' is excluded due to similarity to '0'.
     'z' is excluded due to similarity to '2'.

    That leaves 31 symbols (10 digits plus 21 letters), so n chars can
    store 31^n values.

    When reading human input consisting of these IDs, it may be desirable
    to map the disallowed chars to their corresponding allowed ones
    ('o' -> '0', etc).

    Sort order for these ids is the same as the original numbers.

    If more compactness is desired at the expense of readability,
    use compact_id() instead.
    """
    # Digits followed by the lowercase letters, minus 'i', 'l', 'o', 's'
    # and 'z'.
    alphabet = '0123456789abcdefghjkmnpqrtuvwxy'
    return _compact_id(num, alphabet)
Given a positive int, return a compact string representation for it.
Handy for visualizing unique numeric ids using as few as possible chars. This representation uses only lowercase letters and numbers (minus the following letters for readability): 's' is excluded due to similarity to '5'. 'l' is excluded due to similarity to '1'. 'i' is excluded due to similarity to '1'. 'o' is excluded due to similarity to '0'. 'z' is excluded due to similarity to '2'.
Therefore for n chars this can store values of 31^n (the alphabet has 10 digits plus 21 letters).
When reading human input consisting of these IDs, it may be desirable to map the disallowed chars to their corresponding allowed ones ('o' -> '0', etc).
Sort order for these ids is the same as the original numbers.
If more compactness is desired at the expense of readability, use compact_id() instead.
def compact_id(num: int) -> str:
    """Given a positive int, return a compact string representation for it.

    Handy for visualizing unique numeric ids using as few as possible chars.
    This is more compact than human_readable_compact_id() but harder
    on human eyes, since it mixes capital and lowercase letters, includes
    both 'O' and '0', etc.

    Therefore for n chars this can store values of 62^n.

    Sort order for these ids is the same as the original numbers.
    """
    # Digits, then uppercase letters, then lowercase letters.
    alphabet = (
        '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'
    )
    return _compact_id(num, alphabet)
Given a positive int, return a compact string representation for it.
Handy for visualizing unique numeric ids using as few as possible chars. This version is more compact than human_readable_compact_id() but less friendly to humans due to using both capital and lowercase letters, both 'O' and '0', etc.
Therefore for n chars this can store values of 62^n.
Sort order for these ids is the same as the original numbers.
def unchanging_hostname() -> str:
    """Return an unchanging name for the local device.

    Similar to the `hostname` call (or os.uname().nodename in Python)
    except attempts to give a name that doesn't change depending on
    network conditions. (A Mac will tend to go from Foo to Foo.local,
    Foo.lan etc. throughout its various adventures)

    Raises:
        subprocess.CalledProcessError: On Mac, if the scutil command fails.
    """
    import platform
    import subprocess

    # On Mac, this should give the computer name assigned in System Prefs.
    if platform.system() == 'Darwin':
        result = subprocess.run(
            ['scutil', '--get', 'ComputerName'],
            check=True,
            capture_output=True,
        )
        return result.stdout.decode().strip().replace(' ', '-')

    # os.uname() only exists on Unix-like platforms; go through the
    # platform module elsewhere instead of failing with AttributeError.
    if hasattr(os, 'uname'):
        return os.uname().nodename
    return platform.node()
Return an unchanging name for the local device.
Similar to the `hostname` call (or os.uname().nodename in Python) except attempts to give a name that doesn't change depending on network conditions. (A Mac will tend to go from Foo to Foo.local, Foo.lan etc. throughout its various adventures)
def set_canonical_module_names(module_globals: dict[str, Any]) -> None:
    """Point public objects' __module__ at the module that exposes them.

    For each non-underscore-prefixed name in the passed module globals,
    if the object's __module__ lives somewhere under the module named by
    the globals' '__name__' (or under its underscore-prefixed
    counterpart), __module__ is reassigned to that module name.
    Objects from elsewhere are left alone, and failures to reassign are
    logged as warnings.

    Does nothing if the EFRO_SUPPRESS_SET_CANONICAL_MODULE_NAMES
    environment variable is set to '1'.

    Raises:
        RuntimeError: If the globals contain no string '__name__'.
    """
    if os.environ.get('EFRO_SUPPRESS_SET_CANONICAL_MODULE_NAMES') == '1':
        return

    modulename = module_globals.get('__name__')
    if not isinstance(modulename, str):
        raise RuntimeError('Unable to get module name.')
    assert not modulename.startswith('_')

    # Only objects living under us somewhere get overridden.
    # So ourpackage._submodule.Foo becomes ourpackage.Foo
    # but otherpackage._submodule.Foo remains untouched.
    our_prefixes = (f'{modulename}.', f'_{modulename}.')

    for name, obj in module_globals.items():
        if name.startswith('_'):
            continue
        existing = getattr(obj, '__module__', None)
        try:
            if existing is not None and existing.startswith(our_prefixes):
                obj.__module__ = modulename
        except Exception:
            import logging

            logging.warning(
                'set_canonical_module_names: unable to change __module__'
                " from '%s' to '%s' on %s object at '%s'.",
                existing,
                modulename,
                type(obj),
                name,
            )
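Reassign __module__ on the public objects in the passed module globals to the module's own name, for objects whose existing __module__ lives under that module (or its underscore-prefixed counterpart).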
def timedelta_str(
    timeval: datetime.timedelta | float, maxparts: int = 2, decimals: int = 0
) -> str:
    """Return a simple human readable time string for a length of time.

    Time can be given as a timedelta or a number representing seconds.
    Example output:
      "23d 1h 2m 32s" (with maxparts == 4)
      "23d 1h" (with maxparts == 2)
      "23d 1.04h" (with maxparts == 2 and decimals == 2)

    Negative lengths of time are formatted as their positive equivalent
    with a leading '-'.

    Note that this is hard-coded in English and probably not especially
    performant.

    Args:
        timeval: The length of time as a timedelta or a seconds value.
        maxparts: The maximum number of unit parts to include.
        decimals: Decimal places to show on the final part (0 for none).
    """
    # pylint: disable=too-many-locals

    # Accept ints as well as floats for seconds values.
    if isinstance(timeval, (int, float)):
        timevalfin = datetime.timedelta(seconds=timeval)
    else:
        timevalfin = timeval

    # Internally we only handle positive values. Make sure to carry
    # *all* formatting options through when handling the flipped value.
    if timevalfin.total_seconds() < 0:
        positive = timedelta_str(
            -timevalfin, maxparts=maxparts, decimals=decimals
        )
        return f'-{positive}'

    years = timevalfin.days // 365
    days = timevalfin.days % 365
    hours = timevalfin.seconds // 3600
    hour_remainder = timevalfin.seconds % 3600
    minutes = hour_remainder // 60
    seconds = hour_remainder % 60

    # Now, if we want decimal places for our last value,
    # calc fractional parts.
    if decimals:
        # Calc totals of each type.
        t_seconds = timevalfin.total_seconds()
        t_minutes = t_seconds / 60
        t_hours = t_minutes / 60
        t_days = t_hours / 24
        t_years = t_days / 365

        # Calc fractional parts that exclude all whole values to their left.
        years_covered = years
        years_f = t_years - years_covered
        days_covered = years_covered * 365 + days
        days_f = t_days - days_covered
        hours_covered = days_covered * 24 + hours
        hours_f = t_hours - hours_covered
        minutes_covered = hours_covered * 60 + minutes
        minutes_f = t_minutes - minutes_covered
        seconds_covered = minutes_covered * 60 + seconds
        seconds_f = t_seconds - seconds_covered
    else:
        years_f = days_f = hours_f = minutes_f = seconds_f = 0.0

    parts: list[str] = []
    for part, part_f, suffix in (
        (years, years_f, 'y'),
        (days, days_f, 'd'),
        (hours, hours_f, 'h'),
        (minutes, minutes_f, 'm'),
        (seconds, seconds_f, 's'),
    ):
        # Skip leading zero-value units, but always emit at least seconds.
        if part or parts or (not parts and suffix == 's'):
            # Do decimal version only for the last part.
            if decimals and (len(parts) >= maxparts - 1 or suffix == 's'):
                parts.append(f'{part+part_f:.{decimals}f}{suffix}')
            else:
                parts.append(f'{part}{suffix}')
            if len(parts) >= maxparts:
                break
    return ' '.join(parts)
Return a simple human readable time string for a length of time.
Time can be given as a timedelta or a float representing seconds. Example output: "23d 1h 2m 32s" (with maxparts == 4) "23d 1h" (with maxparts == 2) "23d 1.08h" (with maxparts == 2 and decimals == 2)
Note that this is hard-coded in English and probably not especially performant.
def ago_str(
    timeval: datetime.datetime,
    maxparts: int = 1,
    now: datetime.datetime | None = None,
    decimals: int = 0,
) -> str:
    """Given a datetime, return a clean human readable 'ago' str.

    The elapsed time between timeval and now is formatted via
    timedelta_str() (using the passed maxparts and decimals) with
    ' ago' appended.

    Note that this is hard-coded in English so should not be used
    for visible in-game elements; only tools/etc.

    If now is not passed, efro.util.utc_now() is used.
    """
    reference = utc_now() if now is None else now
    elapsed = reference - timeval
    elapsed_str = timedelta_str(elapsed, maxparts=maxparts, decimals=decimals)
    return elapsed_str + ' ago'
Given a datetime, return a clean human readable 'ago' str.
Note that this is hard-coded in English so should not be used for visible in-game elements; only tools/etc.
If now is not passed, utc_now() is used.