efro.util
Small handy bits of functionality.
# Released under the MIT License. See LICENSE for details.
#
"""Small handy bits of functionality."""

from __future__ import annotations

import os
import time
import weakref
import datetime
import functools
from enum import Enum
from typing import TYPE_CHECKING, cast, TypeVar, Generic

if TYPE_CHECKING:
    import asyncio
    from efro.call import Call as Call  # 'as Call' so we re-export.
    from typing import Any, Callable

T = TypeVar('T')
ValT = TypeVar('ValT')
ArgT = TypeVar('ArgT')
SelfT = TypeVar('SelfT')
RetT = TypeVar('RetT')
EnumT = TypeVar('EnumT', bound=Enum)


class _EmptyObj:
    pass


# A dead weak-ref should be immutable, right? So we can create exactly
# one and return it for all cases that need an empty weak-ref.
_g_empty_weak_ref = weakref.ref(_EmptyObj())
assert _g_empty_weak_ref() is None


# TODO: kill this and just use efro.call.tpartial
if TYPE_CHECKING:
    Call = Call
else:
    Call = functools.partial


def explicit_bool(val: bool) -> bool:
    """Return a non-inferable boolean value.

    Useful to be able to disable blocks of code without type checkers
    complaining/etc. At runtime the passed value is returned unchanged.
    """
    # pylint: disable=no-else-return
    if TYPE_CHECKING:
        # infer this! <boom>
        import random

        return random.random() < 0.5
    else:
        return val


def snake_case_to_title(val: str) -> str:
    """Given a snake-case string 'foo_bar', returns 'Foo Bar'."""
    # Kill empty words resulting from leading/trailing/multiple underscores.
    return ' '.join(w for w in val.split('_') if w).title()


def snake_case_to_camel_case(val: str) -> str:
    """Given a snake-case string 'foo_bar', returns camel-case 'FooBar'."""
    # Replace underscores with spaces; capitalize words; kill spaces.
    # Not sure about efficiency, but logically simple.
    return val.replace('_', ' ').title().replace(' ', '')


def enum_by_value(cls: type[EnumT], value: Any) -> EnumT:
    """Create an enum from a value.

    This is basically the same as doing 'obj = EnumType(value)' except
    that it works around an issue where a reference loop is created
    if an exception is thrown due to an invalid value. Since we disable
    the cyclic garbage collector for most of the time, such loops can lead
    to our objects sticking around longer than we want.
    This issue has been submitted to Python as a bug so hopefully we can
    remove this eventually if it gets fixed: https://bugs.python.org/issue42248
    UPDATE: This has been fixed as of later 3.8 builds, so we can kill this
    off once we are 3.9+ across the board.

    Raises ValueError if no member of 'cls' has the given value.
    """

    # Note: we don't recreate *ALL* the functionality of the Enum constructor
    # such as the _missing_ hook; but this should cover our basic needs.
    value2member_map = getattr(cls, '_value2member_map_')
    assert value2member_map is not None
    try:
        out = value2member_map[value]
        assert isinstance(out, cls)
        return out
    except KeyError:
        # pylint: disable=consider-using-f-string
        raise ValueError(
            '%r is not a valid %s' % (value, cls.__name__)
        ) from None


def check_utc(value: datetime.datetime) -> None:
    """Ensure a datetime value is timezone-aware utc.

    Raises ValueError unless the value's tzinfo is the datetime.UTC
    singleton itself.
    """
    if value.tzinfo is not datetime.UTC:
        raise ValueError(
            'datetime value does not have timezone set as datetime.UTC'
        )


def utc_now() -> datetime.datetime:
    """Get timezone-aware current utc time.

    Just a shortcut for datetime.datetime.now(datetime.UTC).
    Avoid datetime.datetime.utcnow() which is deprecated and gives naive
    times.
    """
    return datetime.datetime.now(datetime.UTC)


def utc_now_naive() -> datetime.datetime:
    """Get naive utc time.

    This can be used to replace datetime.utcnow(), which is now deprecated.
    Nearly all code should migrate to use timezone-aware times instead of
    this.
    """
    return datetime.datetime.now(datetime.UTC).replace(tzinfo=None)


def utc_today() -> datetime.datetime:
    """Get offset-aware midnight in the utc time zone."""
    now = datetime.datetime.now(datetime.UTC)
    return datetime.datetime(
        year=now.year, month=now.month, day=now.day, tzinfo=now.tzinfo
    )


def utc_this_hour() -> datetime.datetime:
    """Get offset-aware beginning of the current hour in the utc time zone."""
    now = datetime.datetime.now(datetime.UTC)
    return datetime.datetime(
        year=now.year,
        month=now.month,
        day=now.day,
        hour=now.hour,
        tzinfo=now.tzinfo,
    )


def utc_this_minute() -> datetime.datetime:
    """Get offset-aware beginning of current minute in the utc time zone."""
    now = datetime.datetime.now(datetime.UTC)
    return datetime.datetime(
        year=now.year,
        month=now.month,
        day=now.day,
        hour=now.hour,
        minute=now.minute,
        tzinfo=now.tzinfo,
    )


def empty_weakref(objtype: type[T]) -> weakref.ref[T]:
    """Return an invalidated weak-reference for the specified type."""
    # At runtime, all weakrefs are the same; our type arg is just
    # for the static type checker.
    del objtype  # Unused.

    # Just create an object and let it die. Is there a cleaner way to do this?
    # return weakref.ref(_EmptyObj())  # type: ignore

    # Sharing a single one seems at least a bit better.
    return _g_empty_weak_ref  # type: ignore


def data_size_str(bytecount: int, compact: bool = False) -> str:
    """Given a size in bytes, returns a short human readable string.

    In compact mode this should be 6 or fewer chars for most all
    sane file sizes.
    """
    # pylint: disable=too-many-return-statements

    # Special case: handle negatives.
    if bytecount < 0:
        val = data_size_str(-bytecount, compact=compact)
        return f'-{val}'

    if bytecount <= 999:
        suffix = 'B' if compact else 'bytes'
        return f'{bytecount} {suffix}'
    kbytecount = bytecount / 1024
    if round(kbytecount, 1) < 10.0:
        return f'{kbytecount:.1f} KB'
    if round(kbytecount, 0) < 999:
        return f'{kbytecount:.0f} KB'
    mbytecount = bytecount / (1024 * 1024)
    if round(mbytecount, 1) < 10.0:
        return f'{mbytecount:.1f} MB'
    if round(mbytecount, 0) < 999:
        return f'{mbytecount:.0f} MB'
    gbytecount = bytecount / (1024 * 1024 * 1024)
    if round(gbytecount, 1) < 10.0:
        return f'{gbytecount:.1f} GB'
    return f'{gbytecount:.0f} GB'


class DirtyBit:
    """Manages whether a thing is dirty and regulates attempts to clean it.

    To use, simply set the 'dirty' value on this object to True when some
    action is needed, and then check the 'should_update' value to regulate
    when attempts to clean it should be made. Set 'dirty' back to False after
    a successful update.
    If 'use_lock' is True, an asyncio Lock will be created and incorporated
    into update attempts to prevent simultaneous updates (should_update will
    only return True when the lock is unlocked). Note that it is up to the
    user to lock/unlock the lock during the actual update attempt.
    If a value is passed for 'auto_dirty_seconds', the dirtybit will flip
    itself back to dirty after being clean for the given amount of time.
    'min_update_interval' can be used to enforce a minimum update
    interval even when updates are successful (retry_interval only applies
    when updates fail)
    """

    def __init__(
        self,
        dirty: bool = False,
        retry_interval: float = 5.0,
        use_lock: bool = False,
        auto_dirty_seconds: float | None = None,
        min_update_interval: float | None = None,
    ):
        curtime = time.monotonic()
        self._retry_interval = retry_interval
        self._auto_dirty_seconds = auto_dirty_seconds
        self._min_update_interval = min_update_interval
        self._dirty = dirty
        self._next_update_time: float | None = curtime if dirty else None
        self._last_update_time: float | None = None
        self._next_auto_dirty_time: float | None = (
            (curtime + self._auto_dirty_seconds)
            if (not dirty and self._auto_dirty_seconds is not None)
            else None
        )
        self._use_lock = use_lock
        # Only assigned when use_lock is True.
        self.lock: asyncio.Lock
        if self._use_lock:
            import asyncio

            self.lock = asyncio.Lock()

    @property
    def dirty(self) -> bool:
        """Whether the target is currently dirty.

        This should be set to False once an update is successful.
        """
        return self._dirty

    @dirty.setter
    def dirty(self, value: bool) -> None:
        # If we're freshly clean, set our next auto-dirty time (if we have
        # one).
        if self._dirty and not value and self._auto_dirty_seconds is not None:
            self._next_auto_dirty_time = (
                time.monotonic() + self._auto_dirty_seconds
            )

        # If we're freshly dirty, schedule an immediate update.
        if not self._dirty and value:
            self._next_update_time = time.monotonic()

            # If they want to enforce a minimum update interval,
            # push out the next update time if it hasn't been long enough.
            if (
                self._min_update_interval is not None
                and self._last_update_time is not None
            ):
                self._next_update_time = max(
                    self._next_update_time,
                    self._last_update_time + self._min_update_interval,
                )

        self._dirty = value

    @property
    def should_update(self) -> bool:
        """Whether an attempt should be made to clean the target now.

        Always returns False if the target is not dirty.
        Takes into account the amount of time passed since the target
        was marked dirty or since should_update last returned True.
        """
        curtime = time.monotonic()

        # Auto-dirty ourself if we're into that.
        if (
            self._next_auto_dirty_time is not None
            and curtime > self._next_auto_dirty_time
        ):
            self.dirty = True
            self._next_auto_dirty_time = None
        if not self._dirty:
            return False
        if self._use_lock and self.lock.locked():
            return False
        assert self._next_update_time is not None
        if curtime > self._next_update_time:
            # Returning True counts as an attempt; if it fails (dirty stays
            # set) the next attempt is allowed after retry_interval.
            self._next_update_time = curtime + self._retry_interval
            self._last_update_time = curtime
            return True
        return False


class DispatchMethodWrapper(Generic[ArgT, RetT]):
    """Type-aware standin for the dispatch func returned by dispatchmethod."""

    def __call__(self, arg: ArgT) -> RetT:
        raise RuntimeError('Should not get here')

    @staticmethod
    def register(
        func: Callable[[Any, Any], RetT]
    ) -> Callable[[Any, Any], RetT]:
        """Register a new dispatch handler for this dispatch-method."""
        raise RuntimeError('Should not get here')

    registry: dict[Any, Callable]


# noinspection PyProtectedMember,PyTypeHints
def dispatchmethod(
    func: Callable[[Any, ArgT], RetT]
) -> DispatchMethodWrapper[ArgT, RetT]:
    """A variation of functools.singledispatch for methods.

    Note: as of Python 3.9 there is now functools.singledispatchmethod,
    but it currently (as of Jan 2021) is not type-aware (at least in mypy),
    which gives us a reason to keep this one around for now.
    """
    from functools import singledispatch, update_wrapper

    origwrapper: Any = singledispatch(func)

    # Pull this out so hopefully origwrapper can die,
    # otherwise we reference origwrapper in our wrapper.
    dispatch = origwrapper.dispatch

    # All we do here is recreate the end of functools.singledispatch
    # where it returns a wrapper except instead of the wrapper using the
    # first arg to the function ours uses the second (to skip 'self').
    # This was made against Python 3.7; we should probably check up on
    # this in later versions in case anything has changed.
    # (or hopefully they'll add this functionality to their version)
    # NOTE: sounds like we can use functools singledispatchmethod in 3.8
    def wrapper(*args: Any, **kw: Any) -> Any:
        # We need both 'self' and the value being dispatched on.
        if len(args) < 2:
            raise TypeError(
                f'{funcname} requires at least ' '2 positional arguments'
            )

        return dispatch(args[1].__class__)(*args, **kw)

    funcname = getattr(func, '__name__', 'dispatchmethod method')
    wrapper.register = origwrapper.register  # type: ignore
    wrapper.dispatch = dispatch  # type: ignore
    wrapper.registry = origwrapper.registry  # type: ignore
    # pylint: disable=protected-access
    wrapper._clear_cache = origwrapper._clear_cache  # type: ignore
    update_wrapper(wrapper, func)
    # pylint: enable=protected-access
    return cast(DispatchMethodWrapper, wrapper)


def valuedispatch(call: Callable[[ValT], RetT]) -> ValueDispatcher[ValT, RetT]:
    """Decorator for functions to allow dispatching based on a value.

    This differs from functools.singledispatch in that it dispatches based
    on the value of an argument, not based on its type.
    The 'register' method of a value-dispatch function can be used
    to assign new functions to handle particular values.
    Unhandled values wind up in the original dispatch function."""
    return ValueDispatcher(call)


class ValueDispatcher(Generic[ValT, RetT]):
    """Used by the valuedispatch decorator"""

    def __init__(self, call: Callable[[ValT], RetT]) -> None:
        self._base_call = call
        self._handlers: dict[ValT, Callable[[], RetT]] = {}

    def __call__(self, value: ValT) -> RetT:
        handler = self._handlers.get(value)
        if handler is not None:
            return handler()
        return self._base_call(value)

    def _add_handler(
        self, value: ValT, call: Callable[[], RetT]
    ) -> Callable[[], RetT]:
        if value in self._handlers:
            raise RuntimeError(f'Duplicate handlers added for {value}')
        self._handlers[value] = call
        return call

    def register(
        self, value: ValT
    ) -> Callable[[Callable[[], RetT]], Callable[[], RetT]]:
        """Add a handler to the dispatcher."""
        from functools import partial

        return partial(self._add_handler, value)


def valuedispatch1arg(
    call: Callable[[ValT, ArgT], RetT]
) -> ValueDispatcher1Arg[ValT, ArgT, RetT]:
    """Like valuedispatch but for functions taking an extra argument."""
    return ValueDispatcher1Arg(call)


class ValueDispatcher1Arg(Generic[ValT, ArgT, RetT]):
    """Used by the valuedispatch1arg decorator"""

    def __init__(self, call: Callable[[ValT, ArgT], RetT]) -> None:
        self._base_call = call
        self._handlers: dict[ValT, Callable[[ArgT], RetT]] = {}

    def __call__(self, value: ValT, arg: ArgT) -> RetT:
        handler = self._handlers.get(value)
        if handler is not None:
            return handler(arg)
        return self._base_call(value, arg)

    def _add_handler(
        self, value: ValT, call: Callable[[ArgT], RetT]
    ) -> Callable[[ArgT], RetT]:
        if value in self._handlers:
            raise RuntimeError(f'Duplicate handlers added for {value}')
        self._handlers[value] = call
        return call

    def register(
        self, value: ValT
    ) -> Callable[[Callable[[ArgT], RetT]], Callable[[ArgT], RetT]]:
        """Add a handler to the dispatcher."""
        from functools import partial

        return partial(self._add_handler, value)


if TYPE_CHECKING:

    class ValueDispatcherMethod(Generic[ValT, RetT]):
        """Used by the valuedispatchmethod decorator."""

        def __call__(self, value: ValT) -> RetT: ...

        def register(
            self, value: ValT
        ) -> Callable[[Callable[[SelfT], RetT]], Callable[[SelfT], RetT]]:
            """Add a handler to the dispatcher."""
            ...


def valuedispatchmethod(
    call: Callable[[SelfT, ValT], RetT]
) -> ValueDispatcherMethod[ValT, RetT]:
    """Like valuedispatch but works with methods instead of functions."""

    # NOTE: It seems that to wrap a method with a decorator and have self
    # dispatching do the right thing, we must return a function and not
    # an executable object. So for this version we store our data here
    # in the function call dict and simply return a call.

    _base_call = call
    _handlers: dict[ValT, Callable[[SelfT], RetT]] = {}

    def _add_handler(value: ValT, addcall: Callable[[SelfT], RetT]) -> None:
        if value in _handlers:
            raise RuntimeError(f'Duplicate handlers added for {value}')
        _handlers[value] = addcall

    def _register(value: ValT) -> Callable[[Callable[[SelfT], RetT]], None]:
        from functools import partial

        return partial(_add_handler, value)

    def _call_wrapper(self: SelfT, value: ValT) -> RetT:
        handler = _handlers.get(value)
        if handler is not None:
            return handler(self)
        return _base_call(self, value)

    # We still want to use our returned object to register handlers, but we're
    # actually just returning a function. So manually stuff the call onto it.
    setattr(_call_wrapper, 'register', _register)

    # To the type checker's eyes we return a ValueDispatcherMethod instance;
    # this lets it know about our register func and type-check its usage.
    # In reality we just return a raw function call (for reasons listed above).
    # pylint: disable=undefined-variable, no-else-return
    if TYPE_CHECKING:
        return ValueDispatcherMethod[ValT, RetT]()
    else:
        return _call_wrapper


def make_hash(obj: Any) -> int:
    """Makes a hash from a dictionary, list, tuple or set to any level,
    that contains only other hashable types (including any lists, tuples,
    sets, and dictionaries).

    Lists and tuples hash in an order-sensitive way; sets and dicts hash
    independently of their iteration order, so equal sets/dicts always
    give equal results.

    Note that this uses Python's hash() function internally so collisions/etc.
    may be more common than with fancy cryptographic hashes.

    Also be aware that Python's hash() output varies across processes, so
    this should only be used for values that will remain in a single process.
    """
    if isinstance(obj, (tuple, list)):
        return hash(tuple(make_hash(e) for e in obj))
    if isinstance(obj, set):
        # Equal sets can iterate in different orders (depending on how
        # they were built), so combine element hashes order-independently.
        return hash(frozenset(make_hash(e) for e in obj))
    if not isinstance(obj, dict):
        return hash(obj)

    # Keys are hashable by definition; values get reduced to their hashes.
    # A frozenset of (key, value-hash) pairs is order-independent and does
    # not require keys to be sortable.
    return hash(frozenset((k, make_hash(v)) for k, v in obj.items()))


def asserttype(obj: Any, typ: type[T]) -> T:
    """Return an object typed as a given type.

    Assert is used to check its actual type, so only use this when
    failures are not expected. Otherwise use checktype.
    """
    assert isinstance(typ, type), 'only actual types accepted'
    assert isinstance(obj, typ)
    return obj


def asserttype_o(obj: Any, typ: type[T]) -> T | None:
    """Return an object typed as a given optional type.

    Assert is used to check its actual type, so only use this when
    failures are not expected. Otherwise use checktype_o.
    """
    assert isinstance(typ, type), 'only actual types accepted'
    assert isinstance(obj, (typ, type(None)))
    return obj


def checktype(obj: Any, typ: type[T]) -> T:
    """Return an object typed as a given type.

    Always checks the type at runtime with isinstance and throws a TypeError
    on failure. Use asserttype for more efficient (but less safe) equivalent.
    """
    assert isinstance(typ, type), 'only actual types accepted'
    if not isinstance(obj, typ):
        raise TypeError(f'Expected a {typ}; got a {type(obj)}.')
    return obj


def checktype_o(obj: Any, typ: type[T]) -> T | None:
    """Return an object typed as a given optional type.

    Always checks the type at runtime with isinstance and throws a TypeError
    on failure. Use asserttype_o for more efficient (but less safe)
    equivalent.
    """
    assert isinstance(typ, type), 'only actual types accepted'
    if not isinstance(obj, (typ, type(None))):
        raise TypeError(f'Expected a {typ} or None; got a {type(obj)}.')
    return obj


def warntype(obj: Any, typ: type[T]) -> T:
    """Return an object typed as a given type.

    Always checks the type at runtime and simply logs a warning if it is
    not what is expected.
    """
    assert isinstance(typ, type), 'only actual types accepted'
    if not isinstance(obj, typ):
        import logging

        logging.warning('warntype: expected a %s, got a %s', typ, type(obj))
    return obj  # type: ignore


def warntype_o(obj: Any, typ: type[T]) -> T | None:
    """Return an object typed as a given optional type.

    Always checks the type at runtime and simply logs a warning if it is
    not what is expected.
    """
    assert isinstance(typ, type), 'only actual types accepted'
    if not isinstance(obj, (typ, type(None))):
        import logging

        logging.warning(
            'warntype: expected a %s or None, got a %s', typ, type(obj)
        )
    return obj  # type: ignore


def assert_non_optional(obj: T | None) -> T:
    """Return an object with Optional typing removed.

    Assert is used to check its actual type, so only use this when
    failures are not expected. Use check_non_optional otherwise.
    """
    assert obj is not None
    return obj


def check_non_optional(obj: T | None) -> T:
    """Return an object with Optional typing removed.

    Always checks the actual value and throws a ValueError if it is None.
    Use assert_non_optional for a more efficient (but less safe) equivalent.
    """
    if obj is None:
        raise ValueError('Got None value in check_non_optional.')
    return obj


def smoothstep(edge0: float, edge1: float, x: float) -> float:
    """A smooth transition function.

    Returns a value that smoothly moves from 0 to 1 as we go between edges.
    Values outside of the range return 0 or 1.
    If both edges are equal this acts as a hard step at that edge.
    """
    if edge0 == edge1:
        # Zero-width range; avoid dividing by zero.
        return 0.0 if x < edge0 else 1.0
    y = min(1.0, max(0.0, (x - edge0) / (edge1 - edge0)))
    return y * y * (3.0 - 2.0 * y)


def linearstep(edge0: float, edge1: float, x: float) -> float:
    """A linear transition function.

    Returns a value that linearly moves from 0 to 1 as we go between edges.
    Values outside of the range return 0 or 1.
    If both edges are equal this acts as a hard step at that edge.
    """
    if edge0 == edge1:
        # Zero-width range; avoid dividing by zero.
        return 0.0 if x < edge0 else 1.0
    return max(0.0, min(1.0, (x - edge0) / (edge1 - edge0)))


def _compact_id(num: int, chars: str) -> str:
    """Encode a non-negative int using 'chars' as the digit alphabet."""
    if num < 0:
        raise ValueError('Negative integers not allowed.')

    # Chars must be in sorted order for sorting to work correctly
    # on our output.
    assert ''.join(sorted(chars)) == chars

    base = len(chars)
    digits: list[str] = []
    while num:
        num, remainder = divmod(num, base)
        digits.append(chars[remainder])

    # Digits were generated least-significant first.
    return ''.join(reversed(digits)) or '0'


def human_readable_compact_id(num: int) -> str:
    """Given a positive int, return a compact string representation for it.

    Handy for visualizing unique numeric ids using as few as possible chars.
    This representation uses only lowercase letters and numbers (minus the
    following letters for readability):
     's' is excluded due to similarity to '5'.
     'l' is excluded due to similarity to '1'.
     'i' is excluded due to similarity to '1'.
     'o' is excluded due to similarity to '0'.
     'z' is excluded due to similarity to '2'.

    That leaves 31 usable chars (10 digits plus 21 letters), so n chars
    can store 31^n distinct values.

    When reading human input consisting of these IDs, it may be desirable
    to map the disallowed chars to their corresponding allowed ones
    ('o' -> '0', etc).

    Sort order for these ids is the same as the original numbers.

    If more compactness is desired at the expense of readability,
    use compact_id() instead.
    """
    return _compact_id(num, '0123456789abcdefghjkmnpqrtuvwxy')


def compact_id(num: int) -> str:
    """Given a positive int, return a compact string representation for it.

    Handy for visualizing unique numeric ids using as few as possible chars.
    This version is more compact than human_readable_compact_id() but less
    friendly to humans due to using both capital and lowercase letters,
    both 'O' and '0', etc.

    Therefore for n chars this can store values of 62^n.

    Sort order for these ids is the same as the original numbers.
    """
    return _compact_id(
        num, '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'
    )


def unchanging_hostname() -> str:
    """Return an unchanging name for the local device.

    Similar to the `hostname` call (or os.uname().nodename in Python)
    except attempts to give a name that doesn't change depending on
    network conditions. (A Mac will tend to go from Foo to Foo.local,
    Foo.lan etc. throughout its various adventures)
    """
    import platform
    import subprocess

    # On Mac, this should give the computer name assigned in System Prefs.
    if platform.system() == 'Darwin':
        return (
            subprocess.run(
                ['scutil', '--get', 'ComputerName'],
                check=True,
                capture_output=True,
            )
            .stdout.decode()
            .strip()
            .replace(' ', '-')
        )

    # os.uname() does not exist on all platforms (Windows, for instance);
    # fall back to platform.node() there.
    if hasattr(os, 'uname'):
        return os.uname().nodename
    return platform.node()


def set_canonical_module_names(module_globals: dict[str, Any]) -> None:
    """Point public objects in a module's globals at that module.

    For each non-underscore-prefixed global whose __module__ lives
    somewhere under the passed module (either 'ourmodule.*' or
    '_ourmodule.*'), __module__ is overwritten with the module's own name.
    Objects from other packages are left untouched. Failures to assign are
    logged as warnings and otherwise ignored.

    Does nothing if the EFRO_SUPPRESS_SET_CANONICAL_MODULE_NAMES
    environment variable is set to '1'.
    """
    if os.environ.get('EFRO_SUPPRESS_SET_CANONICAL_MODULE_NAMES') == '1':
        return

    modulename = module_globals.get('__name__')
    if not isinstance(modulename, str):
        raise RuntimeError('Unable to get module name.')
    assert not modulename.startswith('_')
    modulename_prefix = f'{modulename}.'
    modulename_prefix_2 = f'_{modulename}.'

    for name, obj in module_globals.items():
        if name.startswith('_'):
            continue
        existing = getattr(obj, '__module__', None)
        try:
            # Override the module ONLY if it lives under us somewhere.
            # So ourpackage._submodule.Foo becomes ourpackage.Foo
            # but otherpackage._submodule.Foo remains untouched.
            if existing is not None and existing.startswith(
                (modulename_prefix, modulename_prefix_2)
            ):
                obj.__module__ = modulename
        except Exception:
            import logging

            logging.warning(
                'set_canonical_module_names: unable to change __module__'
                " from '%s' to '%s' on %s object at '%s'.",
                existing,
                modulename,
                type(obj),
                name,
            )


def timedelta_str(
    timeval: datetime.timedelta | float, maxparts: int = 2, decimals: int = 0
) -> str:
    """Return a simple human readable time string for a length of time.

    Time can be given as a timedelta or a number (int or float)
    representing seconds.
    Example output:
      "23d 1h 2m 32s" (with maxparts == 4)
      "23d 1h" (with maxparts == 2)
      "23d 1.08h" (with maxparts == 2 and decimals == 2)

    Negative values are formatted as their positive counterpart with a
    leading '-'.

    Note that this is hard-coded in English and probably not especially
    performant.
    """
    # pylint: disable=too-many-locals

    if isinstance(timeval, (int, float)):
        timevalfin = datetime.timedelta(seconds=timeval)
    else:
        timevalfin = timeval

    # Internally we only handle positive values.
    if timevalfin.total_seconds() < 0:
        positive = timedelta_str(
            -timevalfin, maxparts=maxparts, decimals=decimals
        )
        return f'-{positive}'

    years = timevalfin.days // 365
    days = timevalfin.days % 365
    hours = timevalfin.seconds // 3600
    hour_remainder = timevalfin.seconds % 3600
    minutes = hour_remainder // 60
    seconds = hour_remainder % 60

    # Now, if we want decimal places for our last value,
    # calc fractional parts.
    if decimals:
        # Calc totals of each type.
        t_seconds = timevalfin.total_seconds()
        t_minutes = t_seconds / 60
        t_hours = t_minutes / 60
        t_days = t_hours / 24
        t_years = t_days / 365

        # Calc fractional parts that exclude all whole values to their left.
        years_covered = years
        years_f = t_years - years_covered
        days_covered = years_covered * 365 + days
        days_f = t_days - days_covered
        hours_covered = days_covered * 24 + hours
        hours_f = t_hours - hours_covered
        minutes_covered = hours_covered * 60 + minutes
        minutes_f = t_minutes - minutes_covered
        seconds_covered = minutes_covered * 60 + seconds
        seconds_f = t_seconds - seconds_covered
    else:
        years_f = days_f = hours_f = minutes_f = seconds_f = 0.0

    parts: list[str] = []
    for part, part_f, suffix in (
        (years, years_f, 'y'),
        (days, days_f, 'd'),
        (hours, hours_f, 'h'),
        (minutes, minutes_f, 'm'),
        (seconds, seconds_f, 's'),
    ):
        # Skip leading zero-valued units, but always emit at least seconds.
        if part or parts or (not parts and suffix == 's'):
            # Do decimal version only for the last part.
            if decimals and (len(parts) >= maxparts - 1 or suffix == 's'):
                parts.append(f'{part+part_f:.{decimals}f}{suffix}')
            else:
                parts.append(f'{part}{suffix}')
            if len(parts) >= maxparts:
                break
    return ' '.join(parts)


def ago_str(
    timeval: datetime.datetime,
    maxparts: int = 1,
    now: datetime.datetime | None = None,
    decimals: int = 0,
) -> str:
    """Given a datetime, return a clean human readable 'ago' str.

    Note that this is hard-coded in English so should not be used
    for visible in-game elements; only tools/etc.

    If now is not passed, efro.util.utc_now() is used.
    """
    if now is None:
        now = utc_now()
    return (
        timedelta_str(now - timeval, maxparts=maxparts, decimals=decimals)
        + ' ago'
    )


def split_list(input_list: list[T], max_length: int) -> list[list[T]]:
    """Split a single list into smaller lists.

    Each returned list holds at most 'max_length' items, in original order.
    Raises ValueError if 'max_length' is less than 1.
    """
    if max_length < 1:
        raise ValueError('max_length must be at least 1.')
    return [
        input_list[i : i + max_length]
        for i in range(0, len(input_list), max_length)
    ]
def explicit_bool(val: bool) -> bool:
    """Hand back 'val' in a way static type checkers cannot see through.

    Handy for switching blocks of code off without type checkers
    complaining about unreachable code/etc.
    """
    if not TYPE_CHECKING:
        return val

    # Only type checkers ever 'see' this path; it exists purely to keep
    # them from inferring a constant result.
    import random

    return random.random() < 0.5
Return a non-inferable boolean value.
Useful to be able to disable blocks of code without type checkers complaining/etc.
def snake_case_to_title(val: str) -> str:
    """Convert a snake-case string such as 'foo_bar' into 'Foo Bar'."""
    # Leading, trailing, or doubled underscores produce empty chunks;
    # filter those out before joining.
    words = filter(None, val.split('_'))
    spaced = ' '.join(words)
    return spaced.title()
Given a snake-case string 'foo_bar', returns 'Foo Bar'.
def snake_case_to_camel_case(val: str) -> str:
    """Convert a snake-case string such as 'foo_bar' into 'FooBar'."""
    # Three simple passes: underscores become spaces, each word gets
    # capitalized, then the spaces are dropped again.
    spaced = val.replace('_', ' ')
    titled = spaced.title()
    return titled.replace(' ', '')
Given a snake-case string 'foo_bar', returns camel-case 'FooBar'.
def enum_by_value(cls: type[EnumT], value: Any) -> EnumT:
    """Look up the member of an enum type that has a given value.

    Equivalent to 'obj = EnumType(value)' for our purposes, but avoids an
    issue where constructing an enum from an invalid value leaves behind a
    reference loop via the raised exception. Since we run with the cyclic
    garbage collector disabled most of the time, such loops can keep
    objects alive longer than we want.
    Upstream bug: https://bugs.python.org/issue42248
    UPDATE: This has been fixed as of later 3.8 builds, so we can kill this
    off once we are 3.9+ across the board.

    Raises ValueError if no member has the given value.
    """

    # Note: this skips parts of the real Enum constructor (such as the
    # _missing_ hook), but covers our basic needs.
    members = getattr(cls, '_value2member_map_')
    assert members is not None
    try:
        member = members[value]
    except KeyError:
        # pylint: disable=consider-using-f-string
        message = '%r is not a valid %s' % (value, cls.__name__)
        raise ValueError(message) from None
    assert isinstance(member, cls)
    return member
Create an enum from a value.
This is basically the same as doing 'obj = EnumType(value)' except that it works around an issue where a reference loop is created if an exception is thrown due to an invalid value. Since we disable the cyclic garbage collector for most of the time, such loops can lead to our objects sticking around longer than we want. This issue has been submitted to Python as a bug so hopefully we can remove this eventually if it gets fixed: https://bugs.python.org/issue42248 UPDATE: This has been fixed as of later 3.8 builds, so we can kill this off once we are 3.9+ across the board.
104def check_utc(value: datetime.datetime) -> None: 105 """Ensure a datetime value is timezone-aware utc.""" 106 if value.tzinfo is not datetime.UTC: 107 raise ValueError( 108 'datetime value does not have timezone set as datetime.UTC' 109 )
Ensure a datetime value is timezone-aware utc.
112def utc_now() -> datetime.datetime: 113 """Get timezone-aware current utc time. 114 115 Just a shortcut for datetime.datetime.now(datetime.UTC). 116 Avoid datetime.datetime.utcnow() which is deprecated and gives naive 117 times. 118 """ 119 return datetime.datetime.now(datetime.UTC)
Get timezone-aware current utc time.
Just a shortcut for datetime.datetime.now(datetime.UTC). Avoid datetime.datetime.utcnow() which is deprecated and gives naive times.
122def utc_now_naive() -> datetime.datetime: 123 """Get naive utc time. 124 125 This can be used to replace datetime.utcnow(), which is now deprecated. 126 Most all code should migrate to use timezone-aware times instead of 127 this. 128 """ 129 return datetime.datetime.now(datetime.UTC).replace(tzinfo=None)
Get naive utc time.
This can be used to replace datetime.utcnow(), which is now deprecated. Most all code should migrate to use timezone-aware times instead of this.
132def utc_today() -> datetime.datetime: 133 """Get offset-aware midnight in the utc time zone.""" 134 now = datetime.datetime.now(datetime.UTC) 135 return datetime.datetime( 136 year=now.year, month=now.month, day=now.day, tzinfo=now.tzinfo 137 )
Get offset-aware midnight in the utc time zone.
140def utc_this_hour() -> datetime.datetime: 141 """Get offset-aware beginning of the current hour in the utc time zone.""" 142 now = datetime.datetime.now(datetime.UTC) 143 return datetime.datetime( 144 year=now.year, 145 month=now.month, 146 day=now.day, 147 hour=now.hour, 148 tzinfo=now.tzinfo, 149 )
Get offset-aware beginning of the current hour in the utc time zone.
152def utc_this_minute() -> datetime.datetime: 153 """Get offset-aware beginning of current minute in the utc time zone.""" 154 now = datetime.datetime.now(datetime.UTC) 155 return datetime.datetime( 156 year=now.year, 157 month=now.month, 158 day=now.day, 159 hour=now.hour, 160 minute=now.minute, 161 tzinfo=now.tzinfo, 162 )
Get offset-aware beginning of current minute in the utc time zone.
def empty_weakref(objtype: type[T]) -> weakref.ref[T]:
    """Return a dead weak-reference typed for the given class."""
    # The type argument exists purely for the static type checker; the
    # same module-level dead reference is handed out for every type
    # instead of creating (and immediately losing) a fresh object on
    # each call.
    del objtype  # Unused.
    dead_ref = _g_empty_weak_ref
    return dead_ref  # type: ignore
Return an invalidated weak-reference for the specified type.
def data_size_str(bytecount: int, compact: bool = False) -> str:
    """Return a short human readable string for a size in bytes.

    With compact set, plain byte counts use the suffix 'B' instead of
    'bytes'; the result should then be 6 or fewer chars for most all
    sane file sizes.
    """
    # Negative sizes: format the magnitude and put a sign in front.
    if bytecount < 0:
        return '-' + data_size_str(-bytecount, compact=compact)

    if bytecount <= 999:
        unit = 'B' if compact else 'bytes'
        return f'{bytecount} {unit}'

    # KB and MB follow the same rule: one decimal place below 10, whole
    # numbers below 999, otherwise move on to the next larger unit.
    for label, divisor in (('KB', 1024), ('MB', 1024 * 1024)):
        scaled = bytecount / divisor
        if round(scaled, 1) < 10.0:
            return f'{scaled:.1f} {label}'
        if round(scaled, 0) < 999:
            return f'{scaled:.0f} {label}'

    # GB is the largest unit, so it has no upper cutoff.
    gigs = bytecount / (1024 * 1024 * 1024)
    if round(gigs, 1) < 10.0:
        return f'{gigs:.1f} GB'
    return f'{gigs:.0f} GB'
Given a size in bytes, returns a short human readable string.
In compact mode this should be 6 or fewer chars for most all sane file sizes.
class DirtyBit:
    """Tracks whether something is dirty and paces attempts to clean it.

    Set 'dirty' to True when some action is needed, poll 'should_update'
    to decide when to attempt that action, and set 'dirty' back to False
    once an attempt succeeds.

    If 'use_lock' is True, an asyncio Lock is created and consulted:
    should_update only returns True while the lock is unlocked. Locking
    and unlocking around the actual update is up to the user.

    If 'auto_dirty_seconds' is given, the bit flips itself back to dirty
    after having been clean for that long.

    'min_update_interval' enforces a minimum spacing between updates
    even when updates succeed ('retry_interval' only applies when
    updates fail).
    """

    def __init__(
        self,
        dirty: bool = False,
        retry_interval: float = 5.0,
        use_lock: bool = False,
        auto_dirty_seconds: float | None = None,
        min_update_interval: float | None = None,
    ):
        now = time.monotonic()
        self._dirty = dirty
        self._retry_interval = retry_interval
        self._auto_dirty_seconds = auto_dirty_seconds
        self._min_update_interval = min_update_interval
        self._last_update_time: float | None = None

        # Starting out dirty means an update is due right away.
        self._next_update_time: float | None = now if dirty else None

        # Starting out clean starts the auto-dirty countdown (if any).
        self._next_auto_dirty_time: float | None = None
        if auto_dirty_seconds is not None and not dirty:
            self._next_auto_dirty_time = now + auto_dirty_seconds

        self._use_lock = use_lock
        self.lock: asyncio.Lock
        if use_lock:
            import asyncio

            self.lock = asyncio.Lock()

    @property
    def dirty(self) -> bool:
        """Whether the target is currently dirty.

        Set this to False once an update has succeeded.
        """
        return self._dirty

    @dirty.setter
    def dirty(self, value: bool) -> None:
        was_dirty = self._dirty

        # Dirty -> clean: restart the auto-dirty countdown (if any).
        if was_dirty and not value and self._auto_dirty_seconds is not None:
            self._next_auto_dirty_time = (
                time.monotonic() + self._auto_dirty_seconds
            )

        # Clean -> dirty: an update is due immediately, unless a minimum
        # update interval says the previous one was too recent.
        if value and not was_dirty:
            due = time.monotonic()
            if (
                self._min_update_interval is not None
                and self._last_update_time is not None
            ):
                due = max(
                    due, self._last_update_time + self._min_update_interval
                )
            self._next_update_time = due

        self._dirty = value

    @property
    def should_update(self) -> bool:
        """Whether an attempt should be made to clean the target now.

        Always False while the target is not dirty. Otherwise depends on
        the time passed since the target was marked dirty or since
        should_update last returned True.
        """
        now = time.monotonic()

        # Flip ourself to dirty if the auto-dirty deadline has passed.
        auto_deadline = self._next_auto_dirty_time
        if auto_deadline is not None and now > auto_deadline:
            self.dirty = True
            self._next_auto_dirty_time = None

        if not self._dirty:
            return False
        if self._use_lock and self.lock.locked():
            return False

        assert self._next_update_time is not None
        if now > self._next_update_time:
            # Assume this attempt may fail and schedule the retry now.
            self._next_update_time = now + self._retry_interval
            self._last_update_time = now
            return True
        return False
Manages whether a thing is dirty and regulates attempts to clean it.
To use, simply set the 'dirty' value on this object to True when some action is needed, and then check the 'should_update' value to regulate when attempts to clean it should be made. Set 'dirty' back to False after a successful update. If 'use_lock' is True, an asyncio Lock will be created and incorporated into update attempts to prevent simultaneous updates (should_update will only return True when the lock is unlocked). Note that it is up to the user to lock/unlock the lock during the actual update attempt. If a value is passed for 'auto_dirty_seconds', the dirtybit will flip itself back to dirty after being clean for the given amount of time. 'min_update_interval' can be used to enforce a minimum update interval even when updates are successful (retry_interval only applies when updates fail).
def __init__(
    self,
    dirty: bool = False,
    retry_interval: float = 5.0,
    use_lock: bool = False,
    auto_dirty_seconds: float | None = None,
    min_update_interval: float | None = None,
):
    """Set up the initial dirty state, timing values and optional lock."""
    now = time.monotonic()
    self._dirty = dirty
    self._retry_interval = retry_interval
    self._auto_dirty_seconds = auto_dirty_seconds
    self._min_update_interval = min_update_interval
    self._last_update_time: float | None = None

    # Starting out dirty means an update is due right away.
    self._next_update_time: float | None = now if dirty else None

    # Starting out clean starts the auto-dirty countdown (if any).
    self._next_auto_dirty_time: float | None = None
    if auto_dirty_seconds is not None and not dirty:
        self._next_auto_dirty_time = now + auto_dirty_seconds

    self._use_lock = use_lock
    self.lock: asyncio.Lock
    if use_lock:
        import asyncio

        self.lock = asyncio.Lock()
@property
def dirty(self) -> bool:
    """Whether the target is currently dirty.

    Set this to False once an update has succeeded.
    """
    return self._dirty
Whether the target is currently dirty.
This should be set to False once an update is successful.
@property
def should_update(self) -> bool:
    """Whether an attempt should be made to clean the target now.

    Always False while the target is not dirty. Otherwise depends on
    the time passed since the target was marked dirty or since
    should_update last returned True.
    """
    now = time.monotonic()

    # Flip ourself to dirty if the auto-dirty deadline has passed.
    auto_deadline = self._next_auto_dirty_time
    if auto_deadline is not None and now > auto_deadline:
        self.dirty = True
        self._next_auto_dirty_time = None

    if not self._dirty:
        return False
    if self._use_lock and self.lock.locked():
        return False

    assert self._next_update_time is not None
    if now > self._next_update_time:
        # Assume this attempt may fail and schedule the retry now.
        self._next_update_time = now + self._retry_interval
        self._last_update_time = now
        return True
    return False
Whether an attempt should be made to clean the target now.
Always returns False if the target is not dirty. Takes into account the amount of time passed since the target was marked dirty or since should_update last returned True.
class DispatchMethodWrapper(Generic[ArgT, RetT]):
    """Type-aware standin for the dispatch func returned by dispatchmethod.

    This class only describes an interface for type checkers; its
    methods raise RuntimeError if they are ever actually called.
    """

    # Annotation only; no value is assigned on this class.
    registry: dict[Any, Callable]

    def __call__(self, arg: ArgT) -> RetT:
        raise RuntimeError('Should not get here')

    @staticmethod
    def register(
        func: Callable[[Any, Any], RetT]
    ) -> Callable[[Any, Any], RetT]:
        """Register a new dispatch handler for this dispatch-method."""
        raise RuntimeError('Should not get here')
Type-aware standin for the dispatch func returned by dispatchmethod.
@staticmethod
def register(
    func: Callable[[Any, Any], RetT]
) -> Callable[[Any, Any], RetT]:
    """Register a new dispatch handler for this dispatch-method.

    This is a typing-only placeholder; calling it always raises
    RuntimeError.
    """
    raise RuntimeError('Should not get here')
Register a new dispatch handler for this dispatch-method.
def dispatchmethod(
    func: Callable[[Any, ArgT], RetT]
) -> DispatchMethodWrapper[ArgT, RetT]:
    """A variation of functools.singledispatch for methods.

    Dispatch is keyed on the class of the second positional argument
    (the first being 'self').

    Note: as of Python 3.9 there is functools.singledispatchmethod, but
    (as of Jan 2021) it is not type-aware, at least in mypy, which is
    the reason this one is still around.
    """
    base: Any = functools.singledispatch(func)

    # Hold on to just the dispatch call so our wrapper does not need to
    # reference the whole singledispatch object (which can then
    # hopefully die).
    dispatch = base.dispatch

    funcname = getattr(func, '__name__', 'dispatchmethod method')

    # This recreates the tail end of functools.singledispatch, except
    # the wrapper keys on the second argument instead of the first so
    # that 'self' is skipped. It was written against Python 3.7 and is
    # worth re-checking against newer versions.
    def wrapper(*args: Any, **kw: Any) -> Any:
        if len(args) < 2:
            raise TypeError(
                f'{funcname} requires at least ' '2 positional arguments'
            )
        return dispatch(args[1].__class__)(*args, **kw)

    wrapper.register = base.register  # type: ignore
    wrapper.dispatch = dispatch  # type: ignore
    wrapper.registry = base.registry  # type: ignore
    # pylint: disable=protected-access
    wrapper._clear_cache = base._clear_cache  # type: ignore
    functools.update_wrapper(wrapper, func)
    # pylint: enable=protected-access
    return cast(DispatchMethodWrapper, wrapper)
A variation of functools.singledispatch for methods.
Note: as of Python 3.9 there is now functools.singledispatchmethod, but it currently (as of Jan 2021) is not type-aware (at least in mypy), which gives us a reason to keep this one around for now.
def valuedispatch(call: Callable[[ValT], RetT]) -> ValueDispatcher[ValT, RetT]:
    """Decorator allowing a function to dispatch on an argument's value.

    Unlike functools.singledispatch, which picks a handler by argument
    type, this picks one by argument value. Use the 'register' method of
    the decorated function to assign handlers to particular values;
    values without a handler go to the original function.
    """
    dispatcher = ValueDispatcher(call)
    return dispatcher
Decorator for functions to allow dispatching based on a value.
This differs from functools.singledispatch in that it dispatches based on the value of an argument, not based on its type. The 'register' method of a value-dispatch function can be used to assign new functions to handle particular values. Unhandled values wind up in the original dispatch function.
class ValueDispatcher(Generic[ValT, RetT]):
    """Callable object produced by the valuedispatch decorator.

    Calls a registered zero-argument handler when one exists for the
    passed value and the wrapped base call otherwise.
    """

    def __init__(self, call: Callable[[ValT], RetT]) -> None:
        self._handlers: dict[ValT, Callable[[], RetT]] = {}
        self._base_call = call

    def __call__(self, value: ValT) -> RetT:
        handler = self._handlers.get(value)
        if handler is None:
            # Nothing registered for this value; use the base call.
            return self._base_call(value)
        return handler()

    def _add_handler(
        self, value: ValT, call: Callable[[], RetT]
    ) -> Callable[[], RetT]:
        # A value can only ever have a single handler.
        if value in self._handlers:
            raise RuntimeError(f'Duplicate handlers added for {value}')
        self._handlers[value] = call
        return call

    def register(
        self, value: ValT
    ) -> Callable[[Callable[[], RetT]], Callable[[], RetT]]:
        """Return a decorator that registers a handler for a value."""
        return functools.partial(self._add_handler, value)
Used by the valuedispatch decorator
def register(
    self, value: ValT
) -> Callable[[Callable[[], RetT]], Callable[[], RetT]]:
    """Return a decorator that registers a handler for a value."""
    # Bind the value now; the returned callable takes only the handler.
    return functools.partial(self._add_handler, value)
Add a handler to the dispatcher.
def valuedispatch1arg(
    call: Callable[[ValT, ArgT], RetT]
) -> ValueDispatcher1Arg[ValT, ArgT, RetT]:
    """Like valuedispatch, for functions that take one extra argument."""
    dispatcher = ValueDispatcher1Arg(call)
    return dispatcher
Like valuedispatch but for functions taking an extra argument.
class ValueDispatcher1Arg(Generic[ValT, ArgT, RetT]):
    """Callable object produced by the valuedispatch1arg decorator.

    Calls a registered handler with the extra argument when one exists
    for the passed value and the wrapped base call otherwise.
    """

    def __init__(self, call: Callable[[ValT, ArgT], RetT]) -> None:
        self._handlers: dict[ValT, Callable[[ArgT], RetT]] = {}
        self._base_call = call

    def __call__(self, value: ValT, arg: ArgT) -> RetT:
        handler = self._handlers.get(value)
        if handler is None:
            # Nothing registered for this value; use the base call.
            return self._base_call(value, arg)
        return handler(arg)

    def _add_handler(
        self, value: ValT, call: Callable[[ArgT], RetT]
    ) -> Callable[[ArgT], RetT]:
        # A value can only ever have a single handler.
        if value in self._handlers:
            raise RuntimeError(f'Duplicate handlers added for {value}')
        self._handlers[value] = call
        return call

    def register(
        self, value: ValT
    ) -> Callable[[Callable[[ArgT], RetT]], Callable[[ArgT], RetT]]:
        """Return a decorator that registers a handler for a value."""
        return functools.partial(self._add_handler, value)
Used by the valuedispatch1arg decorator
def register(
    self, value: ValT
) -> Callable[[Callable[[ArgT], RetT]], Callable[[ArgT], RetT]]:
    """Return a decorator that registers a handler for a value."""
    # Bind the value now; the returned callable takes only the handler.
    return functools.partial(self._add_handler, value)
Add a handler to the dispatcher.
def valuedispatchmethod(
    call: Callable[[SelfT, ValT], RetT]
) -> ValueDispatcherMethod[ValT, RetT]:
    """Like valuedispatch but works with methods instead of functions.

    The returned function has a 'register' attribute; 'register(value)'
    gives a decorator that stores a handler taking only 'self' for that
    value. Registering a second handler for a value raises RuntimeError.
    """

    # NOTE: For a decorated method to get 'self' bound properly it seems
    # we must return a plain function and not a callable object, so
    # state lives in this closure rather than on a dispatcher instance.
    handlers: dict[ValT, Callable[[SelfT], RetT]] = {}

    def _add_handler(value: ValT, addcall: Callable[[SelfT], RetT]) -> None:
        if value in handlers:
            raise RuntimeError(f'Duplicate handlers added for {value}')
        handlers[value] = addcall

    def _register(value: ValT) -> Callable[[Callable[[SelfT], RetT]], None]:
        return functools.partial(_add_handler, value)

    def _call_wrapper(self: SelfT, value: ValT) -> RetT:
        handler = handlers.get(value)
        if handler is None:
            # Nothing registered for this value; use the base method.
            return call(self, value)
        return handler(self)

    # Handlers are still registered through the returned object, so
    # attach the register call to the function by hand.
    setattr(_call_wrapper, 'register', _register)

    # Type checkers are told a ValueDispatcherMethod comes back so that
    # 'register' usage can be checked; at runtime the raw function is
    # returned (for the reason given above).
    # pylint: disable=undefined-variable, no-else-return
    if TYPE_CHECKING:
        return ValueDispatcherMethod[ValT, RetT]()
    else:
        return _call_wrapper
Like valuedispatch but works with methods instead of functions.
def make_hash(obj: Any) -> int:
    """Make a hash from a dictionary, list, tuple or set to any level.

    The containers may hold only other hashable values plus further
    lists, tuples, sets and dictionaries. Dict keys must be hashable.

    Equal inputs give equal results: dict and set contents are hashed
    independently of their iteration order, while list and tuple
    contents are hashed in order.

    Note that this uses Python's hash() function internally so
    collisions/etc. may be more common than with fancy cryptographic
    hashes.

    Also be aware that Python's hash() output varies across processes,
    so this should only be used for values that remain in one process.
    """
    if isinstance(obj, dict):
        # Hash the (key, value-hash) pairs as a frozenset. This avoids
        # sorting, so keys need not be orderable, and the input is read
        # without being copied or modified.
        return hash(
            frozenset((key, make_hash(val)) for key, val in obj.items())
        )
    if isinstance(obj, (set, frozenset)):
        # Equal sets may iterate in different orders, so their element
        # hashes must be combined order-independently.
        return hash(frozenset(make_hash(elem) for elem in obj))
    if isinstance(obj, (tuple, list)):
        return hash(tuple(make_hash(elem) for elem in obj))
    return hash(obj)
Makes a hash from a dictionary, list, tuple or set to any level, that contains only other hashable types (including any lists, tuples, sets, and dictionaries).
Note that this uses Python's hash() function internally so collisions/etc. may be more common than with fancy cryptographic hashes.
Also be aware that Python's hash() output varies across processes, so this should only be used for values that will remain in a single process.
def asserttype(obj: Any, typ: type[T]) -> T:
    """Return the passed object, typed as the given type.

    The runtime type is only verified through assert statements, so use
    this where a mismatch is not expected; otherwise use checktype.
    """
    assert isinstance(typ, type), 'only actual types accepted'
    matches = isinstance(obj, typ)
    assert matches
    return obj
Return an object typed as a given type.
Assert is used to check its actual type, so only use this when failures are not expected. Otherwise use checktype.
def asserttype_o(obj: Any, typ: type[T]) -> T | None:
    """Return the passed object, typed as the given type or None.

    The runtime type is only verified through assert statements, so use
    this where a mismatch is not expected; otherwise use checktype.
    """
    assert isinstance(typ, type), 'only actual types accepted'
    allowed = (typ, type(None))
    assert isinstance(obj, allowed)
    return obj
Return an object typed as a given optional type.
Assert is used to check its actual type, so only use this when failures are not expected. Otherwise use checktype.
def checktype(obj: Any, typ: type[T]) -> T:
    """Return the passed object, typed as the given type.

    The type is always verified at runtime via isinstance and a
    TypeError is raised on a mismatch. Use asserttype for a more
    efficient (but less safe) equivalent.
    """
    assert isinstance(typ, type), 'only actual types accepted'
    if isinstance(obj, typ):
        return obj
    raise TypeError(f'Expected a {typ}; got a {type(obj)}.')
Return an object typed as a given type.
Always checks the type at runtime with isinstance and throws a TypeError on failure. Use asserttype for more efficient (but less safe) equivalent.
def checktype_o(obj: Any, typ: type[T]) -> T | None:
    """Return the passed object, typed as the given type or None.

    The type is always verified at runtime via isinstance and a
    TypeError is raised on a mismatch. Use asserttype_o for a more
    efficient (but less safe) equivalent.
    """
    assert isinstance(typ, type), 'only actual types accepted'
    if isinstance(obj, (typ, type(None))):
        return obj
    raise TypeError(f'Expected a {typ} or None; got a {type(obj)}.')
Return an object typed as a given optional type.
Always checks the type at runtime with isinstance and throws a TypeError on failure. Use asserttype for more efficient (but less safe) equivalent.
def warntype(obj: Any, typ: type[T]) -> T:
    """Return the passed object, typed as the given type.

    The type is always checked at runtime, but a mismatch only logs a
    warning; the object is returned either way.
    """
    assert isinstance(typ, type), 'only actual types accepted'
    if isinstance(obj, typ):
        return obj

    import logging

    logging.warning('warntype: expected a %s, got a %s', typ, type(obj))
    return obj  # type: ignore
Return an object typed as a given type.
Always checks the type at runtime and simply logs a warning if it is not what is expected.
def warntype_o(obj: Any, typ: type[T]) -> T | None:
    """Return the passed object, typed as the given type or None.

    The type is always checked at runtime, but a mismatch only logs a
    warning; the object is returned either way.
    """
    assert isinstance(typ, type), 'only actual types accepted'
    if isinstance(obj, (typ, type(None))):
        return obj

    import logging

    logging.warning(
        'warntype: expected a %s or None, got a %s', typ, type(obj)
    )
    return obj  # type: ignore
Return an object typed as a given optional type.
Always checks the type at runtime and simply logs a warning if it is not what is expected.
def assert_non_optional(obj: T | None) -> T:
    """Return the passed object with Optional typing removed.

    The value is only verified through an assert statement, so use this
    where None is not expected; use check_non_optional otherwise.
    """
    is_set = obj is not None
    assert is_set
    return obj
Return an object with Optional typing removed.
Assert is used to check its actual type, so only use this when failures are not expected. Use check_non_optional otherwise.
def check_non_optional(obj: T | None) -> T:
    """Return the passed object with Optional typing removed.

    The value is always checked at runtime and a ValueError is raised
    if it is None. Use assert_non_optional for a more efficient (but
    less safe) equivalent.
    """
    if obj is not None:
        return obj
    raise ValueError('Got None value in check_non_optional.')
Return an object with Optional typing removed.
Always checks the actual value and raises a ValueError if it is None. Use assert_non_optional for a more efficient (but less safe) equivalent.
def smoothstep(edge0: float, edge1: float, x: float) -> float:
    """A smooth transition function.

    Returns a value that smoothly moves from 0 to 1 as x goes from
    edge0 to edge1. Values outside of that range return 0 or 1.

    If both edges are equal there is no range to interpolate over; the
    result is then a hard step: 0 for x below the edge, 1 otherwise.
    """
    span = edge1 - edge0
    if span == 0:
        # Degenerate range; avoid dividing by zero.
        return 0.0 if x < edge0 else 1.0
    y = min(1.0, max(0.0, (x - edge0) / span))
    # Cubic Hermite curve 3y^2 - 2y^3.
    return y * y * (3.0 - 2.0 * y)
A smooth transition function.
Returns a value that smoothly moves from 0 to 1 as we go between edges. Values outside of the range return 0 or 1.
def linearstep(edge0: float, edge1: float, x: float) -> float:
    """A linear transition function.

    Returns a value that linearly moves from 0 to 1 as x goes from
    edge0 to edge1. Values outside of that range return 0 or 1.

    If both edges are equal there is no range to interpolate over; the
    result is then a hard step: 0 for x below the edge, 1 otherwise.
    """
    span = edge1 - edge0
    if span == 0:
        # Degenerate range; avoid dividing by zero.
        return 0.0 if x < edge0 else 1.0
    return max(0.0, min(1.0, (x - edge0) / span))
A linear transition function.
Returns a value that linearly moves from 0 to 1 as we go between edges. Values outside of the range return 0 or 1.
def human_readable_compact_id(num: int) -> str:
    """Given a positive int, return a compact string representation for it.

    Handy for visualizing unique numeric ids using as few as possible
    chars. This representation uses only lowercase letters and numbers
    (minus the following letters for readability):
     's' is excluded due to similarity to '5'.
     'l' is excluded due to similarity to '1'.
     'i' is excluded due to similarity to '1'.
     'o' is excluded due to similarity to '0'.
     'z' is excluded due to similarity to '2'.

    That leaves 10 digits plus 21 letters, so n chars can represent
    31^n distinct values.

    When reading human input consisting of these IDs, it may be
    desirable to map the disallowed chars to their corresponding allowed
    ones ('o' -> '0', etc).

    Sort order for these ids is the same as the original numbers.

    If more compactness is desired at the expense of readability,
    use compact_id() instead.
    """
    # Digits followed by a-y minus the five excluded letters.
    alphabet = '0123456789abcdefghjkmnpqrtuvwxy'
    return _compact_id(num, alphabet)
Given a positive int, return a compact string representation for it.
Handy for visualizing unique numeric ids using as few as possible chars. This representation uses only lowercase letters and numbers (minus the following letters for readability): 's' is excluded due to similarity to '5'. 'l' is excluded due to similarity to '1'. 'i' is excluded due to similarity to '1'. 'o' is excluded due to similarity to '0'. 'z' is excluded due to similarity to '2'.
Therefore for n chars this can store values of 31^n (10 digits plus the 21 remaining letters).
When reading human input consisting of these IDs, it may be desirable to map the disallowed chars to their corresponding allowed ones ('o' -> '0', etc).
Sort order for these ids is the same as the original numbers.
If more compactness is desired at the expense of readability, use compact_id() instead.
def compact_id(num: int) -> str:
    """Given a positive int, return a compact string representation for it.

    Handy for visualizing unique numeric ids using as few as possible
    chars. More compact than human_readable_compact_id() but less
    friendly to humans, since it uses both capital and lowercase
    letters, both 'O' and '0', etc.

    With 62 symbols available, n chars can store values of 62^n.

    Sort order for these ids is the same as the original numbers.
    """
    # Digits, then uppercase, then lowercase.
    alphabet = (
        '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'
    )
    return _compact_id(num, alphabet)
Given a positive int, return a compact string representation for it.
Handy for visualizing unique numeric ids using as few as possible chars. This version is more compact than human_readable_compact_id() but less friendly to humans due to using both capital and lowercase letters, both 'O' and '0', etc.
Therefore for n chars this can store values of 62^n.
Sort order for these ids is the same as the original numbers.
def unchanging_hostname() -> str:
    """Return an unchanging name for the local device.

    Similar to the `hostname` call (or os.uname().nodename in Python)
    except attempts to give a name that doesn't change depending on
    network conditions. (A Mac will tend to go from Foo to Foo.local,
    Foo.lan etc. throughout its various adventures)

    On Mac this runs 'scutil --get ComputerName' and raises
    subprocess.CalledProcessError if that command fails. On platforms
    without os.uname() (it is only available on Unix) the value of
    platform.node() is returned instead.
    """
    import platform
    import subprocess

    # On Mac, this should give the computer name assigned in System Prefs.
    if platform.system() == 'Darwin':
        result = subprocess.run(
            ['scutil', '--get', 'ComputerName'],
            check=True,
            capture_output=True,
        )
        return result.stdout.decode().strip().replace(' ', '-')

    if hasattr(os, 'uname'):
        return os.uname().nodename

    # No os.uname() on this platform (Windows, for instance).
    return platform.node()
Return an unchanging name for the local device.
Similar to the `hostname` call (or os.uname().nodename in Python) except attempts to give a name that doesn't change depending on network conditions. (A Mac will tend to go from Foo to Foo.local, Foo.lan etc. throughout its various adventures)
def set_canonical_module_names(module_globals: dict[str, Any]) -> None:
    """Point public objects in a module's globals at the module itself.

    For every entry in module_globals whose name does not start with an
    underscore, the object's __module__ is set to the module's own name
    if its current __module__ lives under that module (or under the same
    name with a leading underscore). So ourpackage._submodule.Foo
    becomes ourpackage.Foo while otherpackage._submodule.Foo is left
    untouched.

    Does nothing if the EFRO_SUPPRESS_SET_CANONICAL_MODULE_NAMES
    environment variable is set to '1'. Raises RuntimeError if
    module_globals has no string '__name__' entry. Failures on
    individual objects are logged as warnings and skipped.
    """
    if os.environ.get('EFRO_SUPPRESS_SET_CANONICAL_MODULE_NAMES') == '1':
        return

    modulename = module_globals.get('__name__')
    if not isinstance(modulename, str):
        raise RuntimeError('Unable to get module name.')
    assert not modulename.startswith('_')

    # Prefixes that mark an object as living somewhere under us.
    prefixes = (f'{modulename}.', f'_{modulename}.')

    for name, obj in module_globals.items():
        if name.startswith('_'):
            continue
        existing = getattr(obj, '__module__', None)
        try:
            # Override the module ONLY if it lives under us somewhere.
            if existing is None or not existing.startswith(prefixes):
                continue
            obj.__module__ = modulename
        except Exception:
            import logging

            logging.warning(
                'set_canonical_module_names: unable to change __module__'
                " from '%s' to '%s' on %s object at '%s'.",
                existing,
                modulename,
                type(obj),
                name,
            )
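Set the __module__ of public objects in a module's globals to the module's own name when they were defined in one of its submodules (so ourpackage._submodule.Foo becomes ourpackage.Foo).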
def timedelta_str(
    timeval: datetime.timedelta | float, maxparts: int = 2, decimals: int = 0
) -> str:
    """Return a simple human readable time string for a length of time.

    Time can be given as a timedelta or as a number (int or float) of
    seconds. At most maxparts units are shown, starting from the largest
    nonzero one. If decimals is nonzero, the last unit shown gets that
    many decimal places. Negative lengths give the string for the
    positive length with a '-' in front.

    Example output for a length of 23 days, 1 hour, 2 minutes, 32 secs:
      "23d 1h 2m 32s" (with maxparts == 4)
      "23d 1h" (with maxparts == 2)
      "23d 1.04h" (with maxparts == 2 and decimals == 2)

    Years are counted as 365 days.

    Note that this is hard-coded in English and probably not especially
    performant.
    """
    # pylint: disable=too-many-locals

    # Ints are accepted along with floats as a number of seconds.
    if isinstance(timeval, (int, float)):
        timevalfin = datetime.timedelta(seconds=timeval)
    else:
        timevalfin = timeval

    # Internally we only handle positive values. Pass all formatting
    # options along so negative output matches positive output.
    if timevalfin.total_seconds() < 0:
        positive = timedelta_str(
            -timevalfin, maxparts=maxparts, decimals=decimals
        )
        return f'-{positive}'

    years = timevalfin.days // 365
    days = timevalfin.days % 365
    hours = timevalfin.seconds // 3600
    hour_remainder = timevalfin.seconds % 3600
    minutes = hour_remainder // 60
    seconds = hour_remainder % 60

    # Now, if we want decimal places for our last value,
    # calc fractional parts.
    if decimals:
        # Calc totals of each type.
        t_seconds = timevalfin.total_seconds()
        t_minutes = t_seconds / 60
        t_hours = t_minutes / 60
        t_days = t_hours / 24
        t_years = t_days / 365

        # Calc fractional parts that exclude all whole values to their
        # left.
        years_covered = years
        years_f = t_years - years_covered
        days_covered = years_covered * 365 + days
        days_f = t_days - days_covered
        hours_covered = days_covered * 24 + hours
        hours_f = t_hours - hours_covered
        minutes_covered = hours_covered * 60 + minutes
        minutes_f = t_minutes - minutes_covered
        seconds_covered = minutes_covered * 60 + seconds
        seconds_f = t_seconds - seconds_covered
    else:
        years_f = days_f = hours_f = minutes_f = seconds_f = 0.0

    parts: list[str] = []
    for part, part_f, suffix in (
        (years, years_f, 'y'),
        (days, days_f, 'd'),
        (hours, hours_f, 'h'),
        (minutes, minutes_f, 'm'),
        (seconds, seconds_f, 's'),
    ):
        # Skip leading zero units, but always emit seconds if nothing
        # else has been emitted so the result is never empty.
        if part or parts or (not parts and suffix == 's'):
            # Do decimal version only for the last part.
            if decimals and (len(parts) >= maxparts - 1 or suffix == 's'):
                parts.append(f'{part+part_f:.{decimals}f}{suffix}')
            else:
                parts.append(f'{part}{suffix}')
            if len(parts) >= maxparts:
                break
    return ' '.join(parts)
Return a simple human readable time string for a length of time.
Time can be given as a timedelta or a float representing seconds. Example output: "23d 1h 2m 32s" (with maxparts == 4) "23d 1h" (with maxparts == 2) "23d 1.08h" (with maxparts == 2 and decimals == 2)
Note that this is hard-coded in English and probably not especially performant.
def ago_str(
    timeval: datetime.datetime,
    maxparts: int = 1,
    now: datetime.datetime | None = None,
    decimals: int = 0,
) -> str:
    """Describe how long ago a datetime was, as a short readable string.

    The time elapsed between timeval and now is formatted by
    timedelta_str() (maxparts and decimals are passed straight through)
    and suffixed with ' ago'.

    If now is not passed, efro.util.utc_now() is used.

    Note that this is hard-coded in English so should not be used
    for visible in-game elements; only tools/etc.
    """
    reference = utc_now() if now is None else now
    elapsed = timedelta_str(
        reference - timeval, maxparts=maxparts, decimals=decimals
    )
    return f'{elapsed} ago'
Given a datetime, return a clean human readable 'ago' str.
Note that this is hard-coded in English so should not be used for visible in-game elements; only tools/etc.
If now is not passed, utc_now() is used.
def split_list(input_list: list[T], max_length: int) -> list[list[T]]:
    """Split a single list into smaller lists.

    Returns consecutive slices of input_list in their original order,
    each holding at most max_length items; only the final slice may be
    shorter. An empty input gives an empty result.

    Raises ValueError if max_length is less than 1.
    """
    # A zero step makes range() fail with an unrelated-looking message and
    # a negative step gives an empty range, which would silently drop
    # every item; reject both up front.
    if max_length < 1:
        raise ValueError(f'max_length must be at least 1; got {max_length}.')
    return [
        input_list[i : i + max_length]
        for i in range(0, len(input_list), max_length)
    ]
Split a single list into smaller lists.