efro.util

Small handy bits of functionality.

  1# Released under the MIT License. See LICENSE for details.
  2#
  3"""Small handy bits of functionality."""
  4
  5from __future__ import annotations
  6
  7import os
  8import time
  9import weakref
 10import datetime
 11import functools
 12from enum import Enum
 13from typing import TYPE_CHECKING, cast, TypeVar, Generic
 14
 15_pytz_utc: Any
 16
 17# We don't *require* pytz, but we want to support it for tzinfos if available.
 18try:
 19    import pytz
 20
 21    _pytz_utc = pytz.utc
 22except ModuleNotFoundError:
 23    _pytz_utc = None  # pylint: disable=invalid-name
 24
 25if TYPE_CHECKING:
 26    import asyncio
 27    from efro.call import Call as Call  # 'as Call' so we re-export.
 28    from typing import Any, Callable
 29
 30T = TypeVar('T')
 31ValT = TypeVar('ValT')
 32ArgT = TypeVar('ArgT')
 33SelfT = TypeVar('SelfT')
 34RetT = TypeVar('RetT')
 35EnumT = TypeVar('EnumT', bound=Enum)
 36
 37
 38class _EmptyObj:
 39    pass
 40
 41
 42# A dead weak-ref should be immutable, right? So we can create exactly
 43# one and return it for all cases that need an empty weak-ref.
 44_g_empty_weak_ref = weakref.ref(_EmptyObj())
 45assert _g_empty_weak_ref() is None
 46
 47
 48# TODO: kill this and just use efro.call.tpartial
 49if TYPE_CHECKING:
 50    Call = Call
 51else:
 52    Call = functools.partial
 53
 54
 55def snake_case_to_title(val: str) -> str:
 56    """Given a snake-case string 'foo_bar', returns 'Foo Bar'."""
 57    # Kill empty words resulting from leading/trailing/multiple underscores.
 58    return ' '.join(w for w in val.split('_') if w).title()
 59
 60
 61def snake_case_to_camel_case(val: str) -> str:
 62    """Given a snake-case string 'foo_bar', returns camel-case 'FooBar'."""
 63    # Replace underscores with spaces; capitalize words; kill spaces.
 64    # Not sure about efficiency, but logically simple.
 65    return val.replace('_', ' ').title().replace(' ', '')
 66
 67
 68def enum_by_value(cls: type[EnumT], value: Any) -> EnumT:
 69    """Create an enum from a value.
 70
 71    This is basically the same as doing 'obj = EnumType(value)' except
 72    that it works around an issue where a reference loop is created
 73    if an exception is thrown due to an invalid value. Since we disable
 74    the cyclic garbage collector for most of the time, such loops can lead
 75    to our objects sticking around longer than we want.
 76    This issue has been submitted to Python as a bug so hopefully we can
 77    remove this eventually if it gets fixed: https://bugs.python.org/issue42248
 78    UPDATE: This has been fixed as of later 3.8 builds, so we can kill this
 79    off once we are 3.9+ across the board.
 80    """
 81
 82    # Note: we don't recreate *ALL* the functionality of the Enum constructor
 83    # such as the _missing_ hook; but this should cover our basic needs.
 84    value2member_map = getattr(cls, '_value2member_map_')
 85    assert value2member_map is not None
 86    try:
 87        out = value2member_map[value]
 88        assert isinstance(out, cls)
 89        return out
 90    except KeyError:
 91        # pylint: disable=consider-using-f-string
 92        raise ValueError(
 93            '%r is not a valid %s' % (value, cls.__name__)
 94        ) from None
 95
 96
 97def check_utc(value: datetime.datetime) -> None:
 98    """Ensure a datetime value is timezone-aware utc."""
 99    if value.tzinfo is not datetime.timezone.utc and (
100        _pytz_utc is None or value.tzinfo is not _pytz_utc
101    ):
102        raise ValueError(
103            'datetime value does not have timezone set as'
104            ' datetime.timezone.utc'
105        )
106
107
108def utc_now() -> datetime.datetime:
109    """Get offset-aware current utc time.
110
111    This should be used for all datetimes getting sent over the network,
112    used with the entity system, etc.
113    (datetime.utcnow() gives a utc time value, but it is not timezone-aware
114    which makes it less safe to use)
115    """
116    return datetime.datetime.now(datetime.timezone.utc)
117
118
119def utc_today() -> datetime.datetime:
120    """Get offset-aware midnight in the utc time zone."""
121    now = datetime.datetime.now(datetime.timezone.utc)
122    return datetime.datetime(
123        year=now.year, month=now.month, day=now.day, tzinfo=now.tzinfo
124    )
125
126
127def utc_this_hour() -> datetime.datetime:
128    """Get offset-aware beginning of the current hour in the utc time zone."""
129    now = datetime.datetime.now(datetime.timezone.utc)
130    return datetime.datetime(
131        year=now.year,
132        month=now.month,
133        day=now.day,
134        hour=now.hour,
135        tzinfo=now.tzinfo,
136    )
137
138
139def utc_this_minute() -> datetime.datetime:
140    """Get offset-aware beginning of current minute in the utc time zone."""
141    now = datetime.datetime.now(datetime.timezone.utc)
142    return datetime.datetime(
143        year=now.year,
144        month=now.month,
145        day=now.day,
146        hour=now.hour,
147        minute=now.minute,
148        tzinfo=now.tzinfo,
149    )
150
151
152def empty_weakref(objtype: type[T]) -> weakref.ref[T]:
153    """Return an invalidated weak-reference for the specified type."""
154    # At runtime, all weakrefs are the same; our type arg is just
155    # for the static type checker.
156    del objtype  # Unused.
157
158    # Just create an object and let it die. Is there a cleaner way to do this?
159    # return weakref.ref(_EmptyObj())  # type: ignore
160
161    return _g_empty_weak_ref  # type: ignore
162
163
164def data_size_str(bytecount: int) -> str:
165    """Given a size in bytes, returns a short human readable string.
166
167    This should be 6 or fewer chars for most all sane file sizes.
168    """
169    # pylint: disable=too-many-return-statements
170    if bytecount <= 999:
171        return f'{bytecount} B'
172    kbytecount = bytecount / 1024
173    if round(kbytecount, 1) < 10.0:
174        return f'{kbytecount:.1f} KB'
175    if round(kbytecount, 0) < 999:
176        return f'{kbytecount:.0f} KB'
177    mbytecount = bytecount / (1024 * 1024)
178    if round(mbytecount, 1) < 10.0:
179        return f'{mbytecount:.1f} MB'
180    if round(mbytecount, 0) < 999:
181        return f'{mbytecount:.0f} MB'
182    gbytecount = bytecount / (1024 * 1024 * 1024)
183    if round(gbytecount, 1) < 10.0:
184        return f'{mbytecount:.1f} GB'
185    return f'{gbytecount:.0f} GB'
186
187
188class DirtyBit:
189    """Manages whether a thing is dirty and regulates attempts to clean it.
190
191    To use, simply set the 'dirty' value on this object to True when some
192    action is needed, and then check the 'should_update' value to regulate
193    when attempts to clean it should be made. Set 'dirty' back to False after
194    a successful update.
195    If 'use_lock' is True, an asyncio Lock will be created and incorporated
196    into update attempts to prevent simultaneous updates (should_update will
197    only return True when the lock is unlocked). Note that It is up to the user
198    to lock/unlock the lock during the actual update attempt.
199    If a value is passed for 'auto_dirty_seconds', the dirtybit will flip
200    itself back to dirty after being clean for the given amount of time.
201    'min_update_interval' can be used to enforce a minimum update
202    interval even when updates are successful (retry_interval only applies
203    when updates fail)
204    """
205
206    def __init__(
207        self,
208        dirty: bool = False,
209        retry_interval: float = 5.0,
210        use_lock: bool = False,
211        auto_dirty_seconds: float | None = None,
212        min_update_interval: float | None = None,
213    ):
214        curtime = time.time()
215        self._retry_interval = retry_interval
216        self._auto_dirty_seconds = auto_dirty_seconds
217        self._min_update_interval = min_update_interval
218        self._dirty = dirty
219        self._next_update_time: float | None = curtime if dirty else None
220        self._last_update_time: float | None = None
221        self._next_auto_dirty_time: float | None = (
222            (curtime + self._auto_dirty_seconds)
223            if (not dirty and self._auto_dirty_seconds is not None)
224            else None
225        )
226        self._use_lock = use_lock
227        self.lock: asyncio.Lock
228        if self._use_lock:
229            import asyncio
230
231            self.lock = asyncio.Lock()
232
233    @property
234    def dirty(self) -> bool:
235        """Whether the target is currently dirty.
236
237        This should be set to False once an update is successful.
238        """
239        return self._dirty
240
241    @dirty.setter
242    def dirty(self, value: bool) -> None:
243        # If we're freshly clean, set our next auto-dirty time (if we have
244        # one).
245        if self._dirty and not value and self._auto_dirty_seconds is not None:
246            self._next_auto_dirty_time = time.time() + self._auto_dirty_seconds
247
248        # If we're freshly dirty, schedule an immediate update.
249        if not self._dirty and value:
250            self._next_update_time = time.time()
251
252            # If they want to enforce a minimum update interval,
253            # push out the next update time if it hasn't been long enough.
254            if (
255                self._min_update_interval is not None
256                and self._last_update_time is not None
257            ):
258                self._next_update_time = max(
259                    self._next_update_time,
260                    self._last_update_time + self._min_update_interval,
261                )
262
263        self._dirty = value
264
265    @property
266    def should_update(self) -> bool:
267        """Whether an attempt should be made to clean the target now.
268
269        Always returns False if the target is not dirty.
270        Takes into account the amount of time passed since the target
271        was marked dirty or since should_update last returned True.
272        """
273        curtime = time.time()
274
275        # Auto-dirty ourself if we're into that.
276        if (
277            self._next_auto_dirty_time is not None
278            and curtime > self._next_auto_dirty_time
279        ):
280            self.dirty = True
281            self._next_auto_dirty_time = None
282        if not self._dirty:
283            return False
284        if self._use_lock and self.lock.locked():
285            return False
286        assert self._next_update_time is not None
287        if curtime > self._next_update_time:
288            self._next_update_time = curtime + self._retry_interval
289            self._last_update_time = curtime
290            return True
291        return False
292
293
294class DispatchMethodWrapper(Generic[ArgT, RetT]):
295    """Type-aware standin for the dispatch func returned by dispatchmethod."""
296
297    def __call__(self, arg: ArgT) -> RetT:
298        raise RuntimeError('Should not get here')
299
300    @staticmethod
301    def register(
302        func: Callable[[Any, Any], RetT]
303    ) -> Callable[[Any, Any], RetT]:
304        """Register a new dispatch handler for this dispatch-method."""
305        raise RuntimeError('Should not get here')
306
307    registry: dict[Any, Callable]
308
309
310# noinspection PyProtectedMember,PyTypeHints
311def dispatchmethod(
312    func: Callable[[Any, ArgT], RetT]
313) -> DispatchMethodWrapper[ArgT, RetT]:
314    """A variation of functools.singledispatch for methods.
315
316    Note: as of Python 3.9 there is now functools.singledispatchmethod,
317    but it currently (as of Jan 2021) is not type-aware (at least in mypy),
318    which gives us a reason to keep this one around for now.
319    """
320    from functools import singledispatch, update_wrapper
321
322    origwrapper: Any = singledispatch(func)
323
324    # Pull this out so hopefully origwrapper can die,
325    # otherwise we reference origwrapper in our wrapper.
326    dispatch = origwrapper.dispatch
327
328    # All we do here is recreate the end of functools.singledispatch
329    # where it returns a wrapper except instead of the wrapper using the
330    # first arg to the function ours uses the second (to skip 'self').
331    # This was made against Python 3.7; we should probably check up on
332    # this in later versions in case anything has changed.
333    # (or hopefully they'll add this functionality to their version)
334    # NOTE: sounds like we can use functools singledispatchmethod in 3.8
335    def wrapper(*args: Any, **kw: Any) -> Any:
336        if not args or len(args) < 2:
337            raise TypeError(
338                f'{funcname} requires at least ' '2 positional arguments'
339            )
340
341        return dispatch(args[1].__class__)(*args, **kw)
342
343    funcname = getattr(func, '__name__', 'dispatchmethod method')
344    wrapper.register = origwrapper.register  # type: ignore
345    wrapper.dispatch = dispatch  # type: ignore
346    wrapper.registry = origwrapper.registry  # type: ignore
347    # pylint: disable=protected-access
348    wrapper._clear_cache = origwrapper._clear_cache  # type: ignore
349    update_wrapper(wrapper, func)
350    # pylint: enable=protected-access
351    return cast(DispatchMethodWrapper, wrapper)
352
353
354def valuedispatch(call: Callable[[ValT], RetT]) -> ValueDispatcher[ValT, RetT]:
355    """Decorator for functions to allow dispatching based on a value.
356
357    This differs from functools.singledispatch in that it dispatches based
358    on the value of an argument, not based on its type.
359    The 'register' method of a value-dispatch function can be used
360    to assign new functions to handle particular values.
361    Unhandled values wind up in the original dispatch function."""
362    return ValueDispatcher(call)
363
364
365class ValueDispatcher(Generic[ValT, RetT]):
366    """Used by the valuedispatch decorator"""
367
368    def __init__(self, call: Callable[[ValT], RetT]) -> None:
369        self._base_call = call
370        self._handlers: dict[ValT, Callable[[], RetT]] = {}
371
372    def __call__(self, value: ValT) -> RetT:
373        handler = self._handlers.get(value)
374        if handler is not None:
375            return handler()
376        return self._base_call(value)
377
378    def _add_handler(
379        self, value: ValT, call: Callable[[], RetT]
380    ) -> Callable[[], RetT]:
381        if value in self._handlers:
382            raise RuntimeError(f'Duplicate handlers added for {value}')
383        self._handlers[value] = call
384        return call
385
386    def register(
387        self, value: ValT
388    ) -> Callable[[Callable[[], RetT]], Callable[[], RetT]]:
389        """Add a handler to the dispatcher."""
390        from functools import partial
391
392        return partial(self._add_handler, value)
393
394
395def valuedispatch1arg(
396    call: Callable[[ValT, ArgT], RetT]
397) -> ValueDispatcher1Arg[ValT, ArgT, RetT]:
398    """Like valuedispatch but for functions taking an extra argument."""
399    return ValueDispatcher1Arg(call)
400
401
402class ValueDispatcher1Arg(Generic[ValT, ArgT, RetT]):
403    """Used by the valuedispatch1arg decorator"""
404
405    def __init__(self, call: Callable[[ValT, ArgT], RetT]) -> None:
406        self._base_call = call
407        self._handlers: dict[ValT, Callable[[ArgT], RetT]] = {}
408
409    def __call__(self, value: ValT, arg: ArgT) -> RetT:
410        handler = self._handlers.get(value)
411        if handler is not None:
412            return handler(arg)
413        return self._base_call(value, arg)
414
415    def _add_handler(
416        self, value: ValT, call: Callable[[ArgT], RetT]
417    ) -> Callable[[ArgT], RetT]:
418        if value in self._handlers:
419            raise RuntimeError(f'Duplicate handlers added for {value}')
420        self._handlers[value] = call
421        return call
422
423    def register(
424        self, value: ValT
425    ) -> Callable[[Callable[[ArgT], RetT]], Callable[[ArgT], RetT]]:
426        """Add a handler to the dispatcher."""
427        from functools import partial
428
429        return partial(self._add_handler, value)
430
431
432if TYPE_CHECKING:
433
434    class ValueDispatcherMethod(Generic[ValT, RetT]):
435        """Used by the valuedispatchmethod decorator."""
436
437        def __call__(self, value: ValT) -> RetT:
438            ...
439
440        def register(
441            self, value: ValT
442        ) -> Callable[[Callable[[SelfT], RetT]], Callable[[SelfT], RetT]]:
443            """Add a handler to the dispatcher."""
444            ...
445
446
447def valuedispatchmethod(
448    call: Callable[[SelfT, ValT], RetT]
449) -> ValueDispatcherMethod[ValT, RetT]:
450    """Like valuedispatch but works with methods instead of functions."""
451
452    # NOTE: It seems that to wrap a method with a decorator and have self
453    # dispatching do the right thing, we must return a function and not
454    # an executable object. So for this version we store our data here
455    # in the function call dict and simply return a call.
456
457    _base_call = call
458    _handlers: dict[ValT, Callable[[SelfT], RetT]] = {}
459
460    def _add_handler(value: ValT, addcall: Callable[[SelfT], RetT]) -> None:
461        if value in _handlers:
462            raise RuntimeError(f'Duplicate handlers added for {value}')
463        _handlers[value] = addcall
464
465    def _register(value: ValT) -> Callable[[Callable[[SelfT], RetT]], None]:
466        from functools import partial
467
468        return partial(_add_handler, value)
469
470    def _call_wrapper(self: SelfT, value: ValT) -> RetT:
471        handler = _handlers.get(value)
472        if handler is not None:
473            return handler(self)
474        return _base_call(self, value)
475
476    # We still want to use our returned object to register handlers, but we're
477    # actually just returning a function. So manually stuff the call onto it.
478    setattr(_call_wrapper, 'register', _register)
479
480    # To the type checker's eyes we return a ValueDispatchMethod instance;
481    # this lets it know about our register func and type-check its usage.
482    # In reality we just return a raw function call (for reasons listed above).
483    # pylint: disable=undefined-variable, no-else-return
484    if TYPE_CHECKING:
485        return ValueDispatcherMethod[ValT, RetT]()
486    else:
487        return _call_wrapper
488
489
490def make_hash(obj: Any) -> int:
491    """Makes a hash from a dictionary, list, tuple or set to any level,
492    that contains only other hashable types (including any lists, tuples,
493    sets, and dictionaries).
494
495    Note that this uses Python's hash() function internally so collisions/etc.
496    may be more common than with fancy cryptographic hashes.
497
498    Also be aware that Python's hash() output varies across processes, so
499    this should only be used for values that will remain in a single process.
500    """
501    import copy
502
503    if isinstance(obj, (set, tuple, list)):
504        return hash(tuple(make_hash(e) for e in obj))
505    if not isinstance(obj, dict):
506        return hash(obj)
507
508    new_obj = copy.deepcopy(obj)
509    for k, v in new_obj.items():
510        new_obj[k] = make_hash(v)
511
512    # NOTE: there is sorted works correctly because it compares only
513    # unique first values (i.e. dict keys)
514    return hash(tuple(frozenset(sorted(new_obj.items()))))
515
516
517def asserttype(obj: Any, typ: type[T]) -> T:
518    """Return an object typed as a given type.
519
520    Assert is used to check its actual type, so only use this when
521    failures are not expected. Otherwise use checktype.
522    """
523    assert isinstance(typ, type), 'only actual types accepted'
524    assert isinstance(obj, typ)
525    return obj
526
527
528def asserttype_o(obj: Any, typ: type[T]) -> T | None:
529    """Return an object typed as a given optional type.
530
531    Assert is used to check its actual type, so only use this when
532    failures are not expected. Otherwise use checktype.
533    """
534    assert isinstance(typ, type), 'only actual types accepted'
535    assert isinstance(obj, (typ, type(None)))
536    return obj
537
538
539def checktype(obj: Any, typ: type[T]) -> T:
540    """Return an object typed as a given type.
541
542    Always checks the type at runtime with isinstance and throws a TypeError
543    on failure. Use asserttype for more efficient (but less safe) equivalent.
544    """
545    assert isinstance(typ, type), 'only actual types accepted'
546    if not isinstance(obj, typ):
547        raise TypeError(f'Expected a {typ}; got a {type(obj)}.')
548    return obj
549
550
551def checktype_o(obj: Any, typ: type[T]) -> T | None:
552    """Return an object typed as a given optional type.
553
554    Always checks the type at runtime with isinstance and throws a TypeError
555    on failure. Use asserttype for more efficient (but less safe) equivalent.
556    """
557    assert isinstance(typ, type), 'only actual types accepted'
558    if not isinstance(obj, (typ, type(None))):
559        raise TypeError(f'Expected a {typ} or None; got a {type(obj)}.')
560    return obj
561
562
563def warntype(obj: Any, typ: type[T]) -> T:
564    """Return an object typed as a given type.
565
566    Always checks the type at runtime and simply logs a warning if it is
567    not what is expected.
568    """
569    assert isinstance(typ, type), 'only actual types accepted'
570    if not isinstance(obj, typ):
571        import logging
572
573        logging.warning('warntype: expected a %s, got a %s', typ, type(obj))
574    return obj  # type: ignore
575
576
577def warntype_o(obj: Any, typ: type[T]) -> T | None:
578    """Return an object typed as a given type.
579
580    Always checks the type at runtime and simply logs a warning if it is
581    not what is expected.
582    """
583    assert isinstance(typ, type), 'only actual types accepted'
584    if not isinstance(obj, (typ, type(None))):
585        import logging
586
587        logging.warning(
588            'warntype: expected a %s or None, got a %s', typ, type(obj)
589        )
590    return obj  # type: ignore
591
592
593def assert_non_optional(obj: T | None) -> T:
594    """Return an object with Optional typing removed.
595
596    Assert is used to check its actual type, so only use this when
597    failures are not expected. Use check_non_optional otherwise.
598    """
599    assert obj is not None
600    return obj
601
602
603def check_non_optional(obj: T | None) -> T:
604    """Return an object with Optional typing removed.
605
606    Always checks the actual type and throws a TypeError on failure.
607    Use assert_non_optional for a more efficient (but less safe) equivalent.
608    """
609    if obj is None:
610        raise TypeError('Got None value in check_non_optional.')
611    return obj
612
613
614def smoothstep(edge0: float, edge1: float, x: float) -> float:
615    """A smooth transition function.
616
617    Returns a value that smoothly moves from 0 to 1 as we go between edges.
618    Values outside of the range return 0 or 1.
619    """
620    y = min(1.0, max(0.0, (x - edge0) / (edge1 - edge0)))
621    return y * y * (3.0 - 2.0 * y)
622
623
624def linearstep(edge0: float, edge1: float, x: float) -> float:
625    """A linear transition function.
626
627    Returns a value that linearly moves from 0 to 1 as we go between edges.
628    Values outside of the range return 0 or 1.
629    """
630    return max(0.0, min(1.0, (x - edge0) / (edge1 - edge0)))
631
632
633def _compact_id(num: int, chars: str) -> str:
634    if num < 0:
635        raise ValueError('Negative integers not allowed.')
636
637    # Chars must be in sorted order for sorting to work correctly
638    # on our output.
639    assert ''.join(sorted(list(chars))) == chars
640
641    base = len(chars)
642    out = ''
643    while num:
644        out += chars[num % base]
645        num //= base
646    return out[::-1] or '0'
647
648
649def human_readable_compact_id(num: int) -> str:
650    """Given a positive int, return a compact string representation for it.
651
652    Handy for visualizing unique numeric ids using as few as possible chars.
653    This representation uses only lowercase letters and numbers (minus the
654    following letters for readability):
655     's' is excluded due to similarity to '5'.
656     'l' is excluded due to similarity to '1'.
657     'i' is excluded due to similarity to '1'.
658     'o' is excluded due to similarity to '0'.
659     'z' is excluded due to similarity to '2'.
660
661    Therefore for n chars this can store values of 21^n.
662
663    When reading human input consisting of these IDs, it may be desirable
664    to map the disallowed chars to their corresponding allowed ones
665    ('o' -> '0', etc).
666
667    Sort order for these ids is the same as the original numbers.
668
669    If more compactness is desired at the expense of readability,
670    use compact_id() instead.
671    """
672    return _compact_id(num, '0123456789abcdefghjkmnpqrtuvwxy')
673
674
675def compact_id(num: int) -> str:
676    """Given a positive int, return a compact string representation for it.
677
678    Handy for visualizing unique numeric ids using as few as possible chars.
679    This version is more compact than human_readable_compact_id() but less
680    friendly to humans due to using both capital and lowercase letters,
681    both 'O' and '0', etc.
682
683    Therefore for n chars this can store values of 62^n.
684
685    Sort order for these ids is the same as the original numbers.
686    """
687    return _compact_id(
688        num, '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'
689    )
690
691
692def unchanging_hostname() -> str:
693    """Return an unchanging name for the local device.
694
695    Similar to the `hostname` call (or os.uname().nodename in Python)
696    except attempts to give a name that doesn't change depending on
697    network conditions. (A Mac will tend to go from Foo to Foo.local,
698    Foo.lan etc. throughout its various adventures)
699    """
700    import platform
701    import subprocess
702
703    # On Mac, this should give the computer name assigned in System Prefs.
704    if platform.system() == 'Darwin':
705        return (
706            subprocess.run(
707                ['scutil', '--get', 'ComputerName'],
708                check=True,
709                capture_output=True,
710            )
711            .stdout.decode()
712            .strip()
713            .replace(' ', '-')
714        )
715    return os.uname().nodename
716
717
718def set_canonical_module_names(module_globals: dict[str, Any]) -> None:
719    """Do the thing."""
720    if os.environ.get('EFRO_SUPPRESS_SET_CANONICAL_MODULE_NAMES') == '1':
721        return
722
723    modulename = module_globals.get('__name__')
724    if not isinstance(modulename, str):
725        raise RuntimeError('Unable to get module name.')
726    assert not modulename.startswith('_')
727    modulename_prefix = f'{modulename}.'
728    modulename_prefix_2 = f'_{modulename}.'
729
730    for name, obj in module_globals.items():
731        if name.startswith('_'):
732            continue
733        existing = getattr(obj, '__module__', None)
734        try:
735            # Override the module ONLY if it lives under us somewhere.
736            # So ourpackage._submodule.Foo becomes ourpackage.Foo
737            # but otherpackage._submodule.Foo remains untouched.
738            if existing is not None and (
739                existing.startswith(modulename_prefix)
740                or existing.startswith(modulename_prefix_2)
741            ):
742                obj.__module__ = modulename
743        except Exception:
744            import logging
745
746            logging.warning(
747                'set_canonical_module_names: unable to change __module__'
748                " from '%s' to '%s' on %s object at '%s'.",
749                existing,
750                modulename,
751                type(obj),
752                name,
753            )
754
755
756def timedelta_str(
757    timeval: datetime.timedelta | float, maxparts: int = 2, decimals: int = 0
758) -> str:
759    """Return a simple human readable time string for a length of time.
760
761    Time can be given as a timedelta or a float representing seconds.
762    Example output:
763      "23d 1h 2m 32s" (with maxparts == 4)
764      "23d 1h" (with maxparts == 2)
765      "23d 1.08h" (with maxparts == 2 and decimals == 2)
766
767    Note that this is hard-coded in English and probably not especially
768    performant.
769    """
770    # pylint: disable=too-many-locals
771
772    if isinstance(timeval, float):
773        timevalfin = datetime.timedelta(seconds=timeval)
774    else:
775        timevalfin = timeval
776
777    # Internally we only handle positive values.
778    if timevalfin.total_seconds() < 0:
779        return f'-{timedelta_str(timeval=-timeval, maxparts=maxparts)}'
780
781    years = timevalfin.days // 365
782    days = timevalfin.days % 365
783    hours = timevalfin.seconds // 3600
784    hour_remainder = timevalfin.seconds % 3600
785    minutes = hour_remainder // 60
786    seconds = hour_remainder % 60
787
788    # Now, if we want decimal places for our last value,
789    # calc fractional parts.
790    if decimals:
791        # Calc totals of each type.
792        t_seconds = timevalfin.total_seconds()
793        t_minutes = t_seconds / 60
794        t_hours = t_minutes / 60
795        t_days = t_hours / 24
796        t_years = t_days / 365
797
798        # Calc fractional parts that exclude all whole values to their left.
799        years_covered = years
800        years_f = t_years - years_covered
801        days_covered = years_covered * 365 + days
802        days_f = t_days - days_covered
803        hours_covered = days_covered * 24 + hours
804        hours_f = t_hours - hours_covered
805        minutes_covered = hours_covered * 60 + minutes
806        minutes_f = t_minutes - minutes_covered
807        seconds_covered = minutes_covered * 60 + seconds
808        seconds_f = t_seconds - seconds_covered
809    else:
810        years_f = days_f = hours_f = minutes_f = seconds_f = 0.0
811
812    parts: list[str] = []
813    for part, part_f, suffix in (
814        (years, years_f, 'y'),
815        (days, days_f, 'd'),
816        (hours, hours_f, 'h'),
817        (minutes, minutes_f, 'm'),
818        (seconds, seconds_f, 's'),
819    ):
820        if part or parts or (not parts and suffix == 's'):
821            # Do decimal version only for the last part.
822            if decimals and (len(parts) >= maxparts - 1 or suffix == 's'):
823                parts.append(f'{part+part_f:.{decimals}f}{suffix}')
824            else:
825                parts.append(f'{part}{suffix}')
826            if len(parts) >= maxparts:
827                break
828    return ' '.join(parts)
829
830
831def ago_str(
832    timeval: datetime.datetime,
833    maxparts: int = 1,
834    now: datetime.datetime | None = None,
835    decimals: int = 0,
836) -> str:
837    """Given a datetime, return a clean human readable 'ago' str.
838
839    Note that this is hard-coded in English so should not be used
840    for visible in-game elements; only tools/etc.
841
842    If now is not passed, efro.util.utc_now() is used.
843    """
844    if now is None:
845        now = utc_now()
846    return (
847        timedelta_str(now - timeval, maxparts=maxparts, decimals=decimals)
848        + ' ago'
849    )
def snake_case_to_title(val: str) -> str:
56def snake_case_to_title(val: str) -> str:
57    """Given a snake-case string 'foo_bar', returns 'Foo Bar'."""
58    # Kill empty words resulting from leading/trailing/multiple underscores.
59    return ' '.join(w for w in val.split('_') if w).title()

Given a snake-case string 'foo_bar', returns 'Foo Bar'.

def snake_case_to_camel_case(val: str) -> str:
62def snake_case_to_camel_case(val: str) -> str:
63    """Given a snake-case string 'foo_bar', returns camel-case 'FooBar'."""
64    # Replace underscores with spaces; capitalize words; kill spaces.
65    # Not sure about efficiency, but logically simple.
66    return val.replace('_', ' ').title().replace(' ', '')

Given a snake-case string 'foo_bar', returns camel-case 'FooBar'.

def enum_by_value(cls: type[~EnumT], value: 'Any') -> ~EnumT:
69def enum_by_value(cls: type[EnumT], value: Any) -> EnumT:
70    """Create an enum from a value.
71
72    This is basically the same as doing 'obj = EnumType(value)' except
73    that it works around an issue where a reference loop is created
74    if an exception is thrown due to an invalid value. Since we disable
75    the cyclic garbage collector for most of the time, such loops can lead
76    to our objects sticking around longer than we want.
77    This issue has been submitted to Python as a bug so hopefully we can
78    remove this eventually if it gets fixed: https://bugs.python.org/issue42248
79    UPDATE: This has been fixed as of later 3.8 builds, so we can kill this
80    off once we are 3.9+ across the board.
81    """
82
83    # Note: we don't recreate *ALL* the functionality of the Enum constructor
84    # such as the _missing_ hook; but this should cover our basic needs.
85    value2member_map = getattr(cls, '_value2member_map_')
86    assert value2member_map is not None
87    try:
88        out = value2member_map[value]
89        assert isinstance(out, cls)
90        return out
91    except KeyError:
92        # pylint: disable=consider-using-f-string
93        raise ValueError(
94            '%r is not a valid %s' % (value, cls.__name__)
95        ) from None

Create an enum from a value.

This is basically the same as doing 'obj = EnumType(value)' except that it works around an issue where a reference loop is created if an exception is thrown due to an invalid value. Since we disable the cyclic garbage collector for most of the time, such loops can lead to our objects sticking around longer than we want. This issue has been submitted to Python as a bug so hopefully we can remove this eventually if it gets fixed: https://bugs.python.org/issue42248 UPDATE: This has been fixed as of later 3.8 builds, so we can kill this off once we are 3.9+ across the board.

def check_utc(value: datetime.datetime) -> None:
 98def check_utc(value: datetime.datetime) -> None:
 99    """Ensure a datetime value is timezone-aware utc."""
100    if value.tzinfo is not datetime.timezone.utc and (
101        _pytz_utc is None or value.tzinfo is not _pytz_utc
102    ):
103        raise ValueError(
104            'datetime value does not have timezone set as'
105            ' datetime.timezone.utc'
106        )

Ensure a datetime value is timezone-aware utc.

def utc_now() -> datetime.datetime:
109def utc_now() -> datetime.datetime:
110    """Get offset-aware current utc time.
111
112    This should be used for all datetimes getting sent over the network,
113    used with the entity system, etc.
114    (datetime.utcnow() gives a utc time value, but it is not timezone-aware
115    which makes it less safe to use)
116    """
117    return datetime.datetime.now(datetime.timezone.utc)

Get offset-aware current utc time.

This should be used for all datetimes getting sent over the network, used with the entity system, etc. (datetime.utcnow() gives a utc time value, but it is not timezone-aware which makes it less safe to use)

def utc_today() -> datetime.datetime:
120def utc_today() -> datetime.datetime:
121    """Get offset-aware midnight in the utc time zone."""
122    now = datetime.datetime.now(datetime.timezone.utc)
123    return datetime.datetime(
124        year=now.year, month=now.month, day=now.day, tzinfo=now.tzinfo
125    )

Get offset-aware midnight in the utc time zone.

def utc_this_hour() -> datetime.datetime:
128def utc_this_hour() -> datetime.datetime:
129    """Get offset-aware beginning of the current hour in the utc time zone."""
130    now = datetime.datetime.now(datetime.timezone.utc)
131    return datetime.datetime(
132        year=now.year,
133        month=now.month,
134        day=now.day,
135        hour=now.hour,
136        tzinfo=now.tzinfo,
137    )

Get offset-aware beginning of the current hour in the utc time zone.

def utc_this_minute() -> datetime.datetime:
140def utc_this_minute() -> datetime.datetime:
141    """Get offset-aware beginning of current minute in the utc time zone."""
142    now = datetime.datetime.now(datetime.timezone.utc)
143    return datetime.datetime(
144        year=now.year,
145        month=now.month,
146        day=now.day,
147        hour=now.hour,
148        minute=now.minute,
149        tzinfo=now.tzinfo,
150    )

Get offset-aware beginning of current minute in the utc time zone.

def empty_weakref(objtype: type[~T]) -> weakref.ReferenceType[~T]:
153def empty_weakref(objtype: type[T]) -> weakref.ref[T]:
154    """Return an invalidated weak-reference for the specified type."""
155    # At runtime, all weakrefs are the same; our type arg is just
156    # for the static type checker.
157    del objtype  # Unused.
158
159    # Just create an object and let it die. Is there a cleaner way to do this?
160    # return weakref.ref(_EmptyObj())  # type: ignore
161
162    return _g_empty_weak_ref  # type: ignore

Return an invalidated weak-reference for the specified type.

def data_size_str(bytecount: int) -> str:
165def data_size_str(bytecount: int) -> str:
166    """Given a size in bytes, returns a short human readable string.
167
168    This should be 6 or fewer chars for most all sane file sizes.
169    """
170    # pylint: disable=too-many-return-statements
171    if bytecount <= 999:
172        return f'{bytecount} B'
173    kbytecount = bytecount / 1024
174    if round(kbytecount, 1) < 10.0:
175        return f'{kbytecount:.1f} KB'
176    if round(kbytecount, 0) < 999:
177        return f'{kbytecount:.0f} KB'
178    mbytecount = bytecount / (1024 * 1024)
179    if round(mbytecount, 1) < 10.0:
180        return f'{mbytecount:.1f} MB'
181    if round(mbytecount, 0) < 999:
182        return f'{mbytecount:.0f} MB'
183    gbytecount = bytecount / (1024 * 1024 * 1024)
184    if round(gbytecount, 1) < 10.0:
185        return f'{mbytecount:.1f} GB'
186    return f'{gbytecount:.0f} GB'

Given a size in bytes, returns a short human readable string.

This should be 6 or fewer chars for most all sane file sizes.

class DirtyBit:
189class DirtyBit:
190    """Manages whether a thing is dirty and regulates attempts to clean it.
191
192    To use, simply set the 'dirty' value on this object to True when some
193    action is needed, and then check the 'should_update' value to regulate
194    when attempts to clean it should be made. Set 'dirty' back to False after
195    a successful update.
196    If 'use_lock' is True, an asyncio Lock will be created and incorporated
197    into update attempts to prevent simultaneous updates (should_update will
198    only return True when the lock is unlocked). Note that It is up to the user
199    to lock/unlock the lock during the actual update attempt.
200    If a value is passed for 'auto_dirty_seconds', the dirtybit will flip
201    itself back to dirty after being clean for the given amount of time.
202    'min_update_interval' can be used to enforce a minimum update
203    interval even when updates are successful (retry_interval only applies
204    when updates fail)
205    """
206
207    def __init__(
208        self,
209        dirty: bool = False,
210        retry_interval: float = 5.0,
211        use_lock: bool = False,
212        auto_dirty_seconds: float | None = None,
213        min_update_interval: float | None = None,
214    ):
215        curtime = time.time()
216        self._retry_interval = retry_interval
217        self._auto_dirty_seconds = auto_dirty_seconds
218        self._min_update_interval = min_update_interval
219        self._dirty = dirty
220        self._next_update_time: float | None = curtime if dirty else None
221        self._last_update_time: float | None = None
222        self._next_auto_dirty_time: float | None = (
223            (curtime + self._auto_dirty_seconds)
224            if (not dirty and self._auto_dirty_seconds is not None)
225            else None
226        )
227        self._use_lock = use_lock
228        self.lock: asyncio.Lock
229        if self._use_lock:
230            import asyncio
231
232            self.lock = asyncio.Lock()
233
234    @property
235    def dirty(self) -> bool:
236        """Whether the target is currently dirty.
237
238        This should be set to False once an update is successful.
239        """
240        return self._dirty
241
242    @dirty.setter
243    def dirty(self, value: bool) -> None:
244        # If we're freshly clean, set our next auto-dirty time (if we have
245        # one).
246        if self._dirty and not value and self._auto_dirty_seconds is not None:
247            self._next_auto_dirty_time = time.time() + self._auto_dirty_seconds
248
249        # If we're freshly dirty, schedule an immediate update.
250        if not self._dirty and value:
251            self._next_update_time = time.time()
252
253            # If they want to enforce a minimum update interval,
254            # push out the next update time if it hasn't been long enough.
255            if (
256                self._min_update_interval is not None
257                and self._last_update_time is not None
258            ):
259                self._next_update_time = max(
260                    self._next_update_time,
261                    self._last_update_time + self._min_update_interval,
262                )
263
264        self._dirty = value
265
266    @property
267    def should_update(self) -> bool:
268        """Whether an attempt should be made to clean the target now.
269
270        Always returns False if the target is not dirty.
271        Takes into account the amount of time passed since the target
272        was marked dirty or since should_update last returned True.
273        """
274        curtime = time.time()
275
276        # Auto-dirty ourself if we're into that.
277        if (
278            self._next_auto_dirty_time is not None
279            and curtime > self._next_auto_dirty_time
280        ):
281            self.dirty = True
282            self._next_auto_dirty_time = None
283        if not self._dirty:
284            return False
285        if self._use_lock and self.lock.locked():
286            return False
287        assert self._next_update_time is not None
288        if curtime > self._next_update_time:
289            self._next_update_time = curtime + self._retry_interval
290            self._last_update_time = curtime
291            return True
292        return False

Manages whether a thing is dirty and regulates attempts to clean it.

To use, simply set the 'dirty' value on this object to True when some action is needed, and then check the 'should_update' value to regulate when attempts to clean it should be made. Set 'dirty' back to False after a successful update. If 'use_lock' is True, an asyncio Lock will be created and incorporated into update attempts to prevent simultaneous updates (should_update will only return True when the lock is unlocked). Note that It is up to the user to lock/unlock the lock during the actual update attempt. If a value is passed for 'auto_dirty_seconds', the dirtybit will flip itself back to dirty after being clean for the given amount of time. 'min_update_interval' can be used to enforce a minimum update interval even when updates are successful (retry_interval only applies when updates fail)

DirtyBit( dirty: bool = False, retry_interval: float = 5.0, use_lock: bool = False, auto_dirty_seconds: float | None = None, min_update_interval: float | None = None)
207    def __init__(
208        self,
209        dirty: bool = False,
210        retry_interval: float = 5.0,
211        use_lock: bool = False,
212        auto_dirty_seconds: float | None = None,
213        min_update_interval: float | None = None,
214    ):
215        curtime = time.time()
216        self._retry_interval = retry_interval
217        self._auto_dirty_seconds = auto_dirty_seconds
218        self._min_update_interval = min_update_interval
219        self._dirty = dirty
220        self._next_update_time: float | None = curtime if dirty else None
221        self._last_update_time: float | None = None
222        self._next_auto_dirty_time: float | None = (
223            (curtime + self._auto_dirty_seconds)
224            if (not dirty and self._auto_dirty_seconds is not None)
225            else None
226        )
227        self._use_lock = use_lock
228        self.lock: asyncio.Lock
229        if self._use_lock:
230            import asyncio
231
232            self.lock = asyncio.Lock()
lock: asyncio.locks.Lock
dirty: bool

Whether the target is currently dirty.

This should be set to False once an update is successful.

should_update: bool

Whether an attempt should be made to clean the target now.

Always returns False if the target is not dirty. Takes into account the amount of time passed since the target was marked dirty or since should_update last returned True.

class DispatchMethodWrapper(typing.Generic[~ArgT, ~RetT]):
295class DispatchMethodWrapper(Generic[ArgT, RetT]):
296    """Type-aware standin for the dispatch func returned by dispatchmethod."""
297
298    def __call__(self, arg: ArgT) -> RetT:
299        raise RuntimeError('Should not get here')
300
301    @staticmethod
302    def register(
303        func: Callable[[Any, Any], RetT]
304    ) -> Callable[[Any, Any], RetT]:
305        """Register a new dispatch handler for this dispatch-method."""
306        raise RuntimeError('Should not get here')
307
308    registry: dict[Any, Callable]

Type-aware standin for the dispatch func returned by dispatchmethod.

@staticmethod
def register(func: 'Callable[[Any, Any], RetT]') -> 'Callable[[Any, Any], RetT]':
301    @staticmethod
302    def register(
303        func: Callable[[Any, Any], RetT]
304    ) -> Callable[[Any, Any], RetT]:
305        """Register a new dispatch handler for this dispatch-method."""
306        raise RuntimeError('Should not get here')

Register a new dispatch handler for this dispatch-method.

registry: 'dict[Any, Callable]'
def dispatchmethod( func: 'Callable[[Any, ArgT], RetT]') -> DispatchMethodWrapper[~ArgT, ~RetT]:
312def dispatchmethod(
313    func: Callable[[Any, ArgT], RetT]
314) -> DispatchMethodWrapper[ArgT, RetT]:
315    """A variation of functools.singledispatch for methods.
316
317    Note: as of Python 3.9 there is now functools.singledispatchmethod,
318    but it currently (as of Jan 2021) is not type-aware (at least in mypy),
319    which gives us a reason to keep this one around for now.
320    """
321    from functools import singledispatch, update_wrapper
322
323    origwrapper: Any = singledispatch(func)
324
325    # Pull this out so hopefully origwrapper can die,
326    # otherwise we reference origwrapper in our wrapper.
327    dispatch = origwrapper.dispatch
328
329    # All we do here is recreate the end of functools.singledispatch
330    # where it returns a wrapper except instead of the wrapper using the
331    # first arg to the function ours uses the second (to skip 'self').
332    # This was made against Python 3.7; we should probably check up on
333    # this in later versions in case anything has changed.
334    # (or hopefully they'll add this functionality to their version)
335    # NOTE: sounds like we can use functools singledispatchmethod in 3.8
336    def wrapper(*args: Any, **kw: Any) -> Any:
337        if not args or len(args) < 2:
338            raise TypeError(
339                f'{funcname} requires at least ' '2 positional arguments'
340            )
341
342        return dispatch(args[1].__class__)(*args, **kw)
343
344    funcname = getattr(func, '__name__', 'dispatchmethod method')
345    wrapper.register = origwrapper.register  # type: ignore
346    wrapper.dispatch = dispatch  # type: ignore
347    wrapper.registry = origwrapper.registry  # type: ignore
348    # pylint: disable=protected-access
349    wrapper._clear_cache = origwrapper._clear_cache  # type: ignore
350    update_wrapper(wrapper, func)
351    # pylint: enable=protected-access
352    return cast(DispatchMethodWrapper, wrapper)

A variation of functools.singledispatch for methods.

Note: as of Python 3.9 there is now functools.singledispatchmethod, but it currently (as of Jan 2021) is not type-aware (at least in mypy), which gives us a reason to keep this one around for now.

def valuedispatch( call: 'Callable[[ValT], RetT]') -> ValueDispatcher[~ValT, ~RetT]:
355def valuedispatch(call: Callable[[ValT], RetT]) -> ValueDispatcher[ValT, RetT]:
356    """Decorator for functions to allow dispatching based on a value.
357
358    This differs from functools.singledispatch in that it dispatches based
359    on the value of an argument, not based on its type.
360    The 'register' method of a value-dispatch function can be used
361    to assign new functions to handle particular values.
362    Unhandled values wind up in the original dispatch function."""
363    return ValueDispatcher(call)

Decorator for functions to allow dispatching based on a value.

This differs from functools.singledispatch in that it dispatches based on the value of an argument, not based on its type. The 'register' method of a value-dispatch function can be used to assign new functions to handle particular values. Unhandled values wind up in the original dispatch function.

class ValueDispatcher(typing.Generic[~ValT, ~RetT]):
366class ValueDispatcher(Generic[ValT, RetT]):
367    """Used by the valuedispatch decorator"""
368
369    def __init__(self, call: Callable[[ValT], RetT]) -> None:
370        self._base_call = call
371        self._handlers: dict[ValT, Callable[[], RetT]] = {}
372
373    def __call__(self, value: ValT) -> RetT:
374        handler = self._handlers.get(value)
375        if handler is not None:
376            return handler()
377        return self._base_call(value)
378
379    def _add_handler(
380        self, value: ValT, call: Callable[[], RetT]
381    ) -> Callable[[], RetT]:
382        if value in self._handlers:
383            raise RuntimeError(f'Duplicate handlers added for {value}')
384        self._handlers[value] = call
385        return call
386
387    def register(
388        self, value: ValT
389    ) -> Callable[[Callable[[], RetT]], Callable[[], RetT]]:
390        """Add a handler to the dispatcher."""
391        from functools import partial
392
393        return partial(self._add_handler, value)

Used by the valuedispatch decorator

ValueDispatcher(call: 'Callable[[ValT], RetT]')
369    def __init__(self, call: Callable[[ValT], RetT]) -> None:
370        self._base_call = call
371        self._handlers: dict[ValT, Callable[[], RetT]] = {}
def register( self, value: ~ValT) -> 'Callable[[Callable[[], RetT]], Callable[[], RetT]]':
387    def register(
388        self, value: ValT
389    ) -> Callable[[Callable[[], RetT]], Callable[[], RetT]]:
390        """Add a handler to the dispatcher."""
391        from functools import partial
392
393        return partial(self._add_handler, value)

Add a handler to the dispatcher.

def valuedispatch1arg( call: 'Callable[[ValT, ArgT], RetT]') -> ValueDispatcher1Arg[~ValT, ~ArgT, ~RetT]:
396def valuedispatch1arg(
397    call: Callable[[ValT, ArgT], RetT]
398) -> ValueDispatcher1Arg[ValT, ArgT, RetT]:
399    """Like valuedispatch but for functions taking an extra argument."""
400    return ValueDispatcher1Arg(call)

Like valuedispatch but for functions taking an extra argument.

class ValueDispatcher1Arg(typing.Generic[~ValT, ~ArgT, ~RetT]):
403class ValueDispatcher1Arg(Generic[ValT, ArgT, RetT]):
404    """Used by the valuedispatch1arg decorator"""
405
406    def __init__(self, call: Callable[[ValT, ArgT], RetT]) -> None:
407        self._base_call = call
408        self._handlers: dict[ValT, Callable[[ArgT], RetT]] = {}
409
410    def __call__(self, value: ValT, arg: ArgT) -> RetT:
411        handler = self._handlers.get(value)
412        if handler is not None:
413            return handler(arg)
414        return self._base_call(value, arg)
415
416    def _add_handler(
417        self, value: ValT, call: Callable[[ArgT], RetT]
418    ) -> Callable[[ArgT], RetT]:
419        if value in self._handlers:
420            raise RuntimeError(f'Duplicate handlers added for {value}')
421        self._handlers[value] = call
422        return call
423
424    def register(
425        self, value: ValT
426    ) -> Callable[[Callable[[ArgT], RetT]], Callable[[ArgT], RetT]]:
427        """Add a handler to the dispatcher."""
428        from functools import partial
429
430        return partial(self._add_handler, value)

Used by the valuedispatch1arg decorator

ValueDispatcher1Arg(call: 'Callable[[ValT, ArgT], RetT]')
406    def __init__(self, call: Callable[[ValT, ArgT], RetT]) -> None:
407        self._base_call = call
408        self._handlers: dict[ValT, Callable[[ArgT], RetT]] = {}
def register( self, value: ~ValT) -> 'Callable[[Callable[[ArgT], RetT]], Callable[[ArgT], RetT]]':
424    def register(
425        self, value: ValT
426    ) -> Callable[[Callable[[ArgT], RetT]], Callable[[ArgT], RetT]]:
427        """Add a handler to the dispatcher."""
428        from functools import partial
429
430        return partial(self._add_handler, value)

Add a handler to the dispatcher.

def valuedispatchmethod( call: 'Callable[[SelfT, ValT], RetT]') -> 'ValueDispatcherMethod[ValT, RetT]':
448def valuedispatchmethod(
449    call: Callable[[SelfT, ValT], RetT]
450) -> ValueDispatcherMethod[ValT, RetT]:
451    """Like valuedispatch but works with methods instead of functions."""
452
453    # NOTE: It seems that to wrap a method with a decorator and have self
454    # dispatching do the right thing, we must return a function and not
455    # an executable object. So for this version we store our data here
456    # in the function call dict and simply return a call.
457
458    _base_call = call
459    _handlers: dict[ValT, Callable[[SelfT], RetT]] = {}
460
461    def _add_handler(value: ValT, addcall: Callable[[SelfT], RetT]) -> None:
462        if value in _handlers:
463            raise RuntimeError(f'Duplicate handlers added for {value}')
464        _handlers[value] = addcall
465
466    def _register(value: ValT) -> Callable[[Callable[[SelfT], RetT]], None]:
467        from functools import partial
468
469        return partial(_add_handler, value)
470
471    def _call_wrapper(self: SelfT, value: ValT) -> RetT:
472        handler = _handlers.get(value)
473        if handler is not None:
474            return handler(self)
475        return _base_call(self, value)
476
477    # We still want to use our returned object to register handlers, but we're
478    # actually just returning a function. So manually stuff the call onto it.
479    setattr(_call_wrapper, 'register', _register)
480
481    # To the type checker's eyes we return a ValueDispatchMethod instance;
482    # this lets it know about our register func and type-check its usage.
483    # In reality we just return a raw function call (for reasons listed above).
484    # pylint: disable=undefined-variable, no-else-return
485    if TYPE_CHECKING:
486        return ValueDispatcherMethod[ValT, RetT]()
487    else:
488        return _call_wrapper

Like valuedispatch but works with methods instead of functions.

def make_hash(obj: 'Any') -> int:
491def make_hash(obj: Any) -> int:
492    """Makes a hash from a dictionary, list, tuple or set to any level,
493    that contains only other hashable types (including any lists, tuples,
494    sets, and dictionaries).
495
496    Note that this uses Python's hash() function internally so collisions/etc.
497    may be more common than with fancy cryptographic hashes.
498
499    Also be aware that Python's hash() output varies across processes, so
500    this should only be used for values that will remain in a single process.
501    """
502    import copy
503
504    if isinstance(obj, (set, tuple, list)):
505        return hash(tuple(make_hash(e) for e in obj))
506    if not isinstance(obj, dict):
507        return hash(obj)
508
509    new_obj = copy.deepcopy(obj)
510    for k, v in new_obj.items():
511        new_obj[k] = make_hash(v)
512
513    # NOTE: there is sorted works correctly because it compares only
514    # unique first values (i.e. dict keys)
515    return hash(tuple(frozenset(sorted(new_obj.items()))))

Makes a hash from a dictionary, list, tuple or set to any level, that contains only other hashable types (including any lists, tuples, sets, and dictionaries).

Note that this uses Python's hash() function internally so collisions/etc. may be more common than with fancy cryptographic hashes.

Also be aware that Python's hash() output varies across processes, so this should only be used for values that will remain in a single process.

def asserttype(obj: 'Any', typ: type[~T]) -> ~T:
518def asserttype(obj: Any, typ: type[T]) -> T:
519    """Return an object typed as a given type.
520
521    Assert is used to check its actual type, so only use this when
522    failures are not expected. Otherwise use checktype.
523    """
524    assert isinstance(typ, type), 'only actual types accepted'
525    assert isinstance(obj, typ)
526    return obj

Return an object typed as a given type.

Assert is used to check its actual type, so only use this when failures are not expected. Otherwise use checktype.

def asserttype_o(obj: 'Any', typ: type[~T]) -> Optional[~T]:
529def asserttype_o(obj: Any, typ: type[T]) -> T | None:
530    """Return an object typed as a given optional type.
531
532    Assert is used to check its actual type, so only use this when
533    failures are not expected. Otherwise use checktype.
534    """
535    assert isinstance(typ, type), 'only actual types accepted'
536    assert isinstance(obj, (typ, type(None)))
537    return obj

Return an object typed as a given optional type.

Assert is used to check its actual type, so only use this when failures are not expected. Otherwise use checktype.

def checktype(obj: 'Any', typ: type[~T]) -> ~T:
540def checktype(obj: Any, typ: type[T]) -> T:
541    """Return an object typed as a given type.
542
543    Always checks the type at runtime with isinstance and throws a TypeError
544    on failure. Use asserttype for more efficient (but less safe) equivalent.
545    """
546    assert isinstance(typ, type), 'only actual types accepted'
547    if not isinstance(obj, typ):
548        raise TypeError(f'Expected a {typ}; got a {type(obj)}.')
549    return obj

Return an object typed as a given type.

Always checks the type at runtime with isinstance and throws a TypeError on failure. Use asserttype for more efficient (but less safe) equivalent.

def checktype_o(obj: 'Any', typ: type[~T]) -> Optional[~T]:
552def checktype_o(obj: Any, typ: type[T]) -> T | None:
553    """Return an object typed as a given optional type.
554
555    Always checks the type at runtime with isinstance and throws a TypeError
556    on failure. Use asserttype for more efficient (but less safe) equivalent.
557    """
558    assert isinstance(typ, type), 'only actual types accepted'
559    if not isinstance(obj, (typ, type(None))):
560        raise TypeError(f'Expected a {typ} or None; got a {type(obj)}.')
561    return obj

Return an object typed as a given optional type.

Always checks the type at runtime with isinstance and throws a TypeError on failure. Use asserttype for more efficient (but less safe) equivalent.

def warntype(obj: 'Any', typ: type[~T]) -> ~T:
564def warntype(obj: Any, typ: type[T]) -> T:
565    """Return an object typed as a given type.
566
567    Always checks the type at runtime and simply logs a warning if it is
568    not what is expected.
569    """
570    assert isinstance(typ, type), 'only actual types accepted'
571    if not isinstance(obj, typ):
572        import logging
573
574        logging.warning('warntype: expected a %s, got a %s', typ, type(obj))
575    return obj  # type: ignore

Return an object typed as a given type.

Always checks the type at runtime and simply logs a warning if it is not what is expected.

def warntype_o(obj: 'Any', typ: type[~T]) -> Optional[~T]:
578def warntype_o(obj: Any, typ: type[T]) -> T | None:
579    """Return an object typed as a given type.
580
581    Always checks the type at runtime and simply logs a warning if it is
582    not what is expected.
583    """
584    assert isinstance(typ, type), 'only actual types accepted'
585    if not isinstance(obj, (typ, type(None))):
586        import logging
587
588        logging.warning(
589            'warntype: expected a %s or None, got a %s', typ, type(obj)
590        )
591    return obj  # type: ignore

Return an object typed as a given type.

Always checks the type at runtime and simply logs a warning if it is not what is expected.

def assert_non_optional(obj: Optional[~T]) -> ~T:
594def assert_non_optional(obj: T | None) -> T:
595    """Return an object with Optional typing removed.
596
597    Assert is used to check its actual type, so only use this when
598    failures are not expected. Use check_non_optional otherwise.
599    """
600    assert obj is not None
601    return obj

Return an object with Optional typing removed.

Assert is used to check its actual type, so only use this when failures are not expected. Use check_non_optional otherwise.

def check_non_optional(obj: Optional[~T]) -> ~T:
604def check_non_optional(obj: T | None) -> T:
605    """Return an object with Optional typing removed.
606
607    Always checks the actual type and throws a TypeError on failure.
608    Use assert_non_optional for a more efficient (but less safe) equivalent.
609    """
610    if obj is None:
611        raise TypeError('Got None value in check_non_optional.')
612    return obj

Return an object with Optional typing removed.

Always checks the actual type and throws a TypeError on failure. Use assert_non_optional for a more efficient (but less safe) equivalent.

def smoothstep(edge0: float, edge1: float, x: float) -> float:
615def smoothstep(edge0: float, edge1: float, x: float) -> float:
616    """A smooth transition function.
617
618    Returns a value that smoothly moves from 0 to 1 as we go between edges.
619    Values outside of the range return 0 or 1.
620    """
621    y = min(1.0, max(0.0, (x - edge0) / (edge1 - edge0)))
622    return y * y * (3.0 - 2.0 * y)

A smooth transition function.

Returns a value that smoothly moves from 0 to 1 as we go between edges. Values outside of the range return 0 or 1.

def linearstep(edge0: float, edge1: float, x: float) -> float:
625def linearstep(edge0: float, edge1: float, x: float) -> float:
626    """A linear transition function.
627
628    Returns a value that linearly moves from 0 to 1 as we go between edges.
629    Values outside of the range return 0 or 1.
630    """
631    return max(0.0, min(1.0, (x - edge0) / (edge1 - edge0)))

A linear transition function.

Returns a value that linearly moves from 0 to 1 as we go between edges. Values outside of the range return 0 or 1.

def human_readable_compact_id(num: int) -> str:
650def human_readable_compact_id(num: int) -> str:
651    """Given a positive int, return a compact string representation for it.
652
653    Handy for visualizing unique numeric ids using as few as possible chars.
654    This representation uses only lowercase letters and numbers (minus the
655    following letters for readability):
656     's' is excluded due to similarity to '5'.
657     'l' is excluded due to similarity to '1'.
658     'i' is excluded due to similarity to '1'.
659     'o' is excluded due to similarity to '0'.
660     'z' is excluded due to similarity to '2'.
661
662    Therefore for n chars this can store values of 21^n.
663
664    When reading human input consisting of these IDs, it may be desirable
665    to map the disallowed chars to their corresponding allowed ones
666    ('o' -> '0', etc).
667
668    Sort order for these ids is the same as the original numbers.
669
670    If more compactness is desired at the expense of readability,
671    use compact_id() instead.
672    """
673    return _compact_id(num, '0123456789abcdefghjkmnpqrtuvwxy')

Given a positive int, return a compact string representation for it.

Handy for visualizing unique numeric ids using as few as possible chars. This representation uses only lowercase letters and numbers (minus the following letters for readability): 's' is excluded due to similarity to '5'. 'l' is excluded due to similarity to '1'. 'i' is excluded due to similarity to '1'. 'o' is excluded due to similarity to '0'. 'z' is excluded due to similarity to '2'.

Therefore for n chars this can store values of 21^n.

When reading human input consisting of these IDs, it may be desirable to map the disallowed chars to their corresponding allowed ones ('o' -> '0', etc).

Sort order for these ids is the same as the original numbers.

If more compactness is desired at the expense of readability, use compact_id() instead.

def compact_id(num: int) -> str:
676def compact_id(num: int) -> str:
677    """Given a positive int, return a compact string representation for it.
678
679    Handy for visualizing unique numeric ids using as few as possible chars.
680    This version is more compact than human_readable_compact_id() but less
681    friendly to humans due to using both capital and lowercase letters,
682    both 'O' and '0', etc.
683
684    Therefore for n chars this can store values of 62^n.
685
686    Sort order for these ids is the same as the original numbers.
687    """
688    return _compact_id(
689        num, '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'
690    )

Given a positive int, return a compact string representation for it.

Handy for visualizing unique numeric ids using as few as possible chars. This version is more compact than human_readable_compact_id() but less friendly to humans due to using both capital and lowercase letters, both 'O' and '0', etc.

Therefore for n chars this can store values of 62^n.

Sort order for these ids is the same as the original numbers.

def unchanging_hostname() -> str:
693def unchanging_hostname() -> str:
694    """Return an unchanging name for the local device.
695
696    Similar to the `hostname` call (or os.uname().nodename in Python)
697    except attempts to give a name that doesn't change depending on
698    network conditions. (A Mac will tend to go from Foo to Foo.local,
699    Foo.lan etc. throughout its various adventures)
700    """
701    import platform
702    import subprocess
703
704    # On Mac, this should give the computer name assigned in System Prefs.
705    if platform.system() == 'Darwin':
706        return (
707            subprocess.run(
708                ['scutil', '--get', 'ComputerName'],
709                check=True,
710                capture_output=True,
711            )
712            .stdout.decode()
713            .strip()
714            .replace(' ', '-')
715        )
716    return os.uname().nodename

Return an unchanging name for the local device.

Similar to the hostname call (or os.uname().nodename in Python) except attempts to give a name that doesn't change depending on network conditions. (A Mac will tend to go from Foo to Foo.local, Foo.lan etc. throughout its various adventures)

def set_canonical_module_names(module_globals: 'dict[str, Any]') -> None:
719def set_canonical_module_names(module_globals: dict[str, Any]) -> None:
720    """Do the thing."""
721    if os.environ.get('EFRO_SUPPRESS_SET_CANONICAL_MODULE_NAMES') == '1':
722        return
723
724    modulename = module_globals.get('__name__')
725    if not isinstance(modulename, str):
726        raise RuntimeError('Unable to get module name.')
727    assert not modulename.startswith('_')
728    modulename_prefix = f'{modulename}.'
729    modulename_prefix_2 = f'_{modulename}.'
730
731    for name, obj in module_globals.items():
732        if name.startswith('_'):
733            continue
734        existing = getattr(obj, '__module__', None)
735        try:
736            # Override the module ONLY if it lives under us somewhere.
737            # So ourpackage._submodule.Foo becomes ourpackage.Foo
738            # but otherpackage._submodule.Foo remains untouched.
739            if existing is not None and (
740                existing.startswith(modulename_prefix)
741                or existing.startswith(modulename_prefix_2)
742            ):
743                obj.__module__ = modulename
744        except Exception:
745            import logging
746
747            logging.warning(
748                'set_canonical_module_names: unable to change __module__'
749                " from '%s' to '%s' on %s object at '%s'.",
750                existing,
751                modulename,
752                type(obj),
753                name,
754            )

Do the thing.

def timedelta_str( timeval: datetime.timedelta | float, maxparts: int = 2, decimals: int = 0) -> str:
757def timedelta_str(
758    timeval: datetime.timedelta | float, maxparts: int = 2, decimals: int = 0
759) -> str:
760    """Return a simple human readable time string for a length of time.
761
762    Time can be given as a timedelta or a float representing seconds.
763    Example output:
764      "23d 1h 2m 32s" (with maxparts == 4)
765      "23d 1h" (with maxparts == 2)
766      "23d 1.08h" (with maxparts == 2 and decimals == 2)
767
768    Note that this is hard-coded in English and probably not especially
769    performant.
770    """
771    # pylint: disable=too-many-locals
772
773    if isinstance(timeval, float):
774        timevalfin = datetime.timedelta(seconds=timeval)
775    else:
776        timevalfin = timeval
777
778    # Internally we only handle positive values.
779    if timevalfin.total_seconds() < 0:
780        return f'-{timedelta_str(timeval=-timeval, maxparts=maxparts)}'
781
782    years = timevalfin.days // 365
783    days = timevalfin.days % 365
784    hours = timevalfin.seconds // 3600
785    hour_remainder = timevalfin.seconds % 3600
786    minutes = hour_remainder // 60
787    seconds = hour_remainder % 60
788
789    # Now, if we want decimal places for our last value,
790    # calc fractional parts.
791    if decimals:
792        # Calc totals of each type.
793        t_seconds = timevalfin.total_seconds()
794        t_minutes = t_seconds / 60
795        t_hours = t_minutes / 60
796        t_days = t_hours / 24
797        t_years = t_days / 365
798
799        # Calc fractional parts that exclude all whole values to their left.
800        years_covered = years
801        years_f = t_years - years_covered
802        days_covered = years_covered * 365 + days
803        days_f = t_days - days_covered
804        hours_covered = days_covered * 24 + hours
805        hours_f = t_hours - hours_covered
806        minutes_covered = hours_covered * 60 + minutes
807        minutes_f = t_minutes - minutes_covered
808        seconds_covered = minutes_covered * 60 + seconds
809        seconds_f = t_seconds - seconds_covered
810    else:
811        years_f = days_f = hours_f = minutes_f = seconds_f = 0.0
812
813    parts: list[str] = []
814    for part, part_f, suffix in (
815        (years, years_f, 'y'),
816        (days, days_f, 'd'),
817        (hours, hours_f, 'h'),
818        (minutes, minutes_f, 'm'),
819        (seconds, seconds_f, 's'),
820    ):
821        if part or parts or (not parts and suffix == 's'):
822            # Do decimal version only for the last part.
823            if decimals and (len(parts) >= maxparts - 1 or suffix == 's'):
824                parts.append(f'{part+part_f:.{decimals}f}{suffix}')
825            else:
826                parts.append(f'{part}{suffix}')
827            if len(parts) >= maxparts:
828                break
829    return ' '.join(parts)

Return a simple human readable time string for a length of time.

Time can be given as a timedelta or a float representing seconds. Example output: "23d 1h 2m 32s" (with maxparts == 4) "23d 1h" (with maxparts == 2) "23d 1.08h" (with maxparts == 2 and decimals == 2)

Note that this is hard-coded in English and probably not especially performant.

def ago_str( timeval: datetime.datetime, maxparts: int = 1, now: datetime.datetime | None = None, decimals: int = 0) -> str:
832def ago_str(
833    timeval: datetime.datetime,
834    maxparts: int = 1,
835    now: datetime.datetime | None = None,
836    decimals: int = 0,
837) -> str:
838    """Given a datetime, return a clean human readable 'ago' str.
839
840    Note that this is hard-coded in English so should not be used
841    for visible in-game elements; only tools/etc.
842
843    If now is not passed, efro.util.utc_now() is used.
844    """
845    if now is None:
846        now = utc_now()
847    return (
848        timedelta_str(now - timeval, maxparts=maxparts, decimals=decimals)
849        + ' ago'
850    )

Given a datetime, return a clean human readable 'ago' str.

Note that this is hard-coded in English so should not be used for visible in-game elements; only tools/etc.

If now is not passed, utc_now() is used.