efro.util

Small handy bits of functionality.

  1# Released under the MIT License. See LICENSE for details.
  2#
  3"""Small handy bits of functionality."""
  4
  5from __future__ import annotations
  6
  7import os
  8import time
  9import weakref
 10import datetime
 11import functools
 12from enum import Enum
 13from typing import TYPE_CHECKING, cast, TypeVar, Generic
 14
 15if TYPE_CHECKING:
 16    import asyncio
 17    from efro.call import Call as Call  # 'as Call' so we re-export.
 18    from typing import Any, Callable
 19
 20T = TypeVar('T')
 21ValT = TypeVar('ValT')
 22ArgT = TypeVar('ArgT')
 23SelfT = TypeVar('SelfT')
 24RetT = TypeVar('RetT')
 25EnumT = TypeVar('EnumT', bound=Enum)
 26
 27
 28class _EmptyObj:
 29    pass
 30
 31
 32# A dead weak-ref should be immutable, right? So we can create exactly
 33# one and return it for all cases that need an empty weak-ref.
 34_g_empty_weak_ref = weakref.ref(_EmptyObj())
 35assert _g_empty_weak_ref() is None
 36
 37
 38# TODO: kill this and just use efro.call.tpartial
 39if TYPE_CHECKING:
 40    Call = Call
 41else:
 42    Call = functools.partial
 43
 44
 45def explicit_bool(val: bool) -> bool:
 46    """Return a non-inferable boolean value.
 47
 48    Useful to be able to disable blocks of code without type checkers
 49    complaining/etc.
 50    """
 51    # pylint: disable=no-else-return
 52    if TYPE_CHECKING:
 53        # infer this! <boom>
 54        import random
 55
 56        return random.random() < 0.5
 57    else:
 58        return val
 59
 60
 61def snake_case_to_title(val: str) -> str:
 62    """Given a snake-case string 'foo_bar', returns 'Foo Bar'."""
 63    # Kill empty words resulting from leading/trailing/multiple underscores.
 64    return ' '.join(w for w in val.split('_') if w).title()
 65
 66
 67def snake_case_to_camel_case(val: str) -> str:
 68    """Given a snake-case string 'foo_bar', returns camel-case 'FooBar'."""
 69    # Replace underscores with spaces; capitalize words; kill spaces.
 70    # Not sure about efficiency, but logically simple.
 71    return val.replace('_', ' ').title().replace(' ', '')
 72
 73
 74def enum_by_value(cls: type[EnumT], value: Any) -> EnumT:
 75    """Create an enum from a value.
 76
 77    This is basically the same as doing 'obj = EnumType(value)' except
 78    that it works around an issue where a reference loop is created
 79    if an exception is thrown due to an invalid value. Since we disable
 80    the cyclic garbage collector for most of the time, such loops can lead
 81    to our objects sticking around longer than we want.
 82    This issue has been submitted to Python as a bug so hopefully we can
 83    remove this eventually if it gets fixed: https://bugs.python.org/issue42248
 84    UPDATE: This has been fixed as of later 3.8 builds, so we can kill this
 85    off once we are 3.9+ across the board.
 86    """
 87
 88    # Note: we don't recreate *ALL* the functionality of the Enum constructor
 89    # such as the _missing_ hook; but this should cover our basic needs.
 90    value2member_map = getattr(cls, '_value2member_map_')
 91    assert value2member_map is not None
 92    try:
 93        out = value2member_map[value]
 94        assert isinstance(out, cls)
 95        return out
 96    except KeyError:
 97        # pylint: disable=consider-using-f-string
 98        raise ValueError(
 99            '%r is not a valid %s' % (value, cls.__name__)
100        ) from None
101
102
103def check_utc(value: datetime.datetime) -> None:
104    """Ensure a datetime value is timezone-aware utc."""
105    if value.tzinfo is not datetime.UTC:
106        raise ValueError(
107            'datetime value does not have timezone set as datetime.UTC'
108        )
109
110
111def utc_now() -> datetime.datetime:
112    """Get timezone-aware current utc time.
113
114    Just a shortcut for datetime.datetime.now(datetime.UTC).
115    Avoid datetime.datetime.utcnow() which is deprecated and gives naive
116    times.
117    """
118    return datetime.datetime.now(datetime.UTC)
119
120
121def utc_now_naive() -> datetime.datetime:
122    """Get naive utc time.
123
124    This can be used to replace datetime.utcnow(), which is now deprecated.
125    Most all code should migrate to use timezone-aware times instead of
126    this.
127    """
128    return datetime.datetime.now(datetime.UTC).replace(tzinfo=None)
129
130
131def utc_today() -> datetime.datetime:
132    """Get offset-aware midnight in the utc time zone."""
133    now = datetime.datetime.now(datetime.UTC)
134    return datetime.datetime(
135        year=now.year, month=now.month, day=now.day, tzinfo=now.tzinfo
136    )
137
138
139def utc_this_hour() -> datetime.datetime:
140    """Get offset-aware beginning of the current hour in the utc time zone."""
141    now = datetime.datetime.now(datetime.UTC)
142    return datetime.datetime(
143        year=now.year,
144        month=now.month,
145        day=now.day,
146        hour=now.hour,
147        tzinfo=now.tzinfo,
148    )
149
150
151def utc_this_minute() -> datetime.datetime:
152    """Get offset-aware beginning of current minute in the utc time zone."""
153    now = datetime.datetime.now(datetime.UTC)
154    return datetime.datetime(
155        year=now.year,
156        month=now.month,
157        day=now.day,
158        hour=now.hour,
159        minute=now.minute,
160        tzinfo=now.tzinfo,
161    )
162
163
164def empty_weakref(objtype: type[T]) -> weakref.ref[T]:
165    """Return an invalidated weak-reference for the specified type."""
166    # At runtime, all weakrefs are the same; our type arg is just
167    # for the static type checker.
168    del objtype  # Unused.
169
170    # Just create an object and let it die. Is there a cleaner way to do this?
171    # return weakref.ref(_EmptyObj())  # type: ignore
172
173    # Sharing a single ones seems at least a bit better.
174    return _g_empty_weak_ref  # type: ignore
175
176
177def data_size_str(bytecount: int, compact: bool = False) -> str:
178    """Given a size in bytes, returns a short human readable string.
179
180    In compact mode this should be 6 or fewer chars for most all
181    sane file sizes.
182    """
183    # pylint: disable=too-many-return-statements
184
185    # Special case: handle negatives.
186    if bytecount < 0:
187        val = data_size_str(-bytecount, compact=compact)
188        return f'-{val}'
189
190    if bytecount <= 999:
191        suffix = 'B' if compact else 'bytes'
192        return f'{bytecount} {suffix}'
193    kbytecount = bytecount / 1024
194    if round(kbytecount, 1) < 10.0:
195        return f'{kbytecount:.1f} KB'
196    if round(kbytecount, 0) < 999:
197        return f'{kbytecount:.0f} KB'
198    mbytecount = bytecount / (1024 * 1024)
199    if round(mbytecount, 1) < 10.0:
200        return f'{mbytecount:.1f} MB'
201    if round(mbytecount, 0) < 999:
202        return f'{mbytecount:.0f} MB'
203    gbytecount = bytecount / (1024 * 1024 * 1024)
204    if round(gbytecount, 1) < 10.0:
205        return f'{gbytecount:.1f} GB'
206    return f'{gbytecount:.0f} GB'
207
208
209class DirtyBit:
210    """Manages whether a thing is dirty and regulates attempts to clean it.
211
212    To use, simply set the 'dirty' value on this object to True when some
213    action is needed, and then check the 'should_update' value to regulate
214    when attempts to clean it should be made. Set 'dirty' back to False after
215    a successful update.
216    If 'use_lock' is True, an asyncio Lock will be created and incorporated
217    into update attempts to prevent simultaneous updates (should_update will
218    only return True when the lock is unlocked). Note that It is up to the user
219    to lock/unlock the lock during the actual update attempt.
220    If a value is passed for 'auto_dirty_seconds', the dirtybit will flip
221    itself back to dirty after being clean for the given amount of time.
222    'min_update_interval' can be used to enforce a minimum update
223    interval even when updates are successful (retry_interval only applies
224    when updates fail)
225    """
226
227    def __init__(
228        self,
229        dirty: bool = False,
230        retry_interval: float = 5.0,
231        use_lock: bool = False,
232        auto_dirty_seconds: float | None = None,
233        min_update_interval: float | None = None,
234    ):
235        curtime = time.monotonic()
236        self._retry_interval = retry_interval
237        self._auto_dirty_seconds = auto_dirty_seconds
238        self._min_update_interval = min_update_interval
239        self._dirty = dirty
240        self._next_update_time: float | None = curtime if dirty else None
241        self._last_update_time: float | None = None
242        self._next_auto_dirty_time: float | None = (
243            (curtime + self._auto_dirty_seconds)
244            if (not dirty and self._auto_dirty_seconds is not None)
245            else None
246        )
247        self._use_lock = use_lock
248        self.lock: asyncio.Lock
249        if self._use_lock:
250            import asyncio
251
252            self.lock = asyncio.Lock()
253
254    @property
255    def dirty(self) -> bool:
256        """Whether the target is currently dirty.
257
258        This should be set to False once an update is successful.
259        """
260        return self._dirty
261
262    @dirty.setter
263    def dirty(self, value: bool) -> None:
264        # If we're freshly clean, set our next auto-dirty time (if we have
265        # one).
266        if self._dirty and not value and self._auto_dirty_seconds is not None:
267            self._next_auto_dirty_time = (
268                time.monotonic() + self._auto_dirty_seconds
269            )
270
271        # If we're freshly dirty, schedule an immediate update.
272        if not self._dirty and value:
273            self._next_update_time = time.monotonic()
274
275            # If they want to enforce a minimum update interval,
276            # push out the next update time if it hasn't been long enough.
277            if (
278                self._min_update_interval is not None
279                and self._last_update_time is not None
280            ):
281                self._next_update_time = max(
282                    self._next_update_time,
283                    self._last_update_time + self._min_update_interval,
284                )
285
286        self._dirty = value
287
288    @property
289    def should_update(self) -> bool:
290        """Whether an attempt should be made to clean the target now.
291
292        Always returns False if the target is not dirty.
293        Takes into account the amount of time passed since the target
294        was marked dirty or since should_update last returned True.
295        """
296        curtime = time.monotonic()
297
298        # Auto-dirty ourself if we're into that.
299        if (
300            self._next_auto_dirty_time is not None
301            and curtime > self._next_auto_dirty_time
302        ):
303            self.dirty = True
304            self._next_auto_dirty_time = None
305        if not self._dirty:
306            return False
307        if self._use_lock and self.lock.locked():
308            return False
309        assert self._next_update_time is not None
310        if curtime > self._next_update_time:
311            self._next_update_time = curtime + self._retry_interval
312            self._last_update_time = curtime
313            return True
314        return False
315
316
317class DispatchMethodWrapper(Generic[ArgT, RetT]):
318    """Type-aware standin for the dispatch func returned by dispatchmethod."""
319
320    def __call__(self, arg: ArgT) -> RetT:
321        raise RuntimeError('Should not get here')
322
323    @staticmethod
324    def register(
325        func: Callable[[Any, Any], RetT]
326    ) -> Callable[[Any, Any], RetT]:
327        """Register a new dispatch handler for this dispatch-method."""
328        raise RuntimeError('Should not get here')
329
330    registry: dict[Any, Callable]
331
332
333# noinspection PyProtectedMember,PyTypeHints
334def dispatchmethod(
335    func: Callable[[Any, ArgT], RetT]
336) -> DispatchMethodWrapper[ArgT, RetT]:
337    """A variation of functools.singledispatch for methods.
338
339    Note: as of Python 3.9 there is now functools.singledispatchmethod,
340    but it currently (as of Jan 2021) is not type-aware (at least in mypy),
341    which gives us a reason to keep this one around for now.
342    """
343    from functools import singledispatch, update_wrapper
344
345    origwrapper: Any = singledispatch(func)
346
347    # Pull this out so hopefully origwrapper can die,
348    # otherwise we reference origwrapper in our wrapper.
349    dispatch = origwrapper.dispatch
350
351    # All we do here is recreate the end of functools.singledispatch
352    # where it returns a wrapper except instead of the wrapper using the
353    # first arg to the function ours uses the second (to skip 'self').
354    # This was made against Python 3.7; we should probably check up on
355    # this in later versions in case anything has changed.
356    # (or hopefully they'll add this functionality to their version)
357    # NOTE: sounds like we can use functools singledispatchmethod in 3.8
358    def wrapper(*args: Any, **kw: Any) -> Any:
359        if not args or len(args) < 2:
360            raise TypeError(
361                f'{funcname} requires at least ' '2 positional arguments'
362            )
363
364        return dispatch(args[1].__class__)(*args, **kw)
365
366    funcname = getattr(func, '__name__', 'dispatchmethod method')
367    wrapper.register = origwrapper.register  # type: ignore
368    wrapper.dispatch = dispatch  # type: ignore
369    wrapper.registry = origwrapper.registry  # type: ignore
370    # pylint: disable=protected-access
371    wrapper._clear_cache = origwrapper._clear_cache  # type: ignore
372    update_wrapper(wrapper, func)
373    # pylint: enable=protected-access
374    return cast(DispatchMethodWrapper, wrapper)
375
376
377def valuedispatch(call: Callable[[ValT], RetT]) -> ValueDispatcher[ValT, RetT]:
378    """Decorator for functions to allow dispatching based on a value.
379
380    This differs from functools.singledispatch in that it dispatches based
381    on the value of an argument, not based on its type.
382    The 'register' method of a value-dispatch function can be used
383    to assign new functions to handle particular values.
384    Unhandled values wind up in the original dispatch function."""
385    return ValueDispatcher(call)
386
387
388class ValueDispatcher(Generic[ValT, RetT]):
389    """Used by the valuedispatch decorator"""
390
391    def __init__(self, call: Callable[[ValT], RetT]) -> None:
392        self._base_call = call
393        self._handlers: dict[ValT, Callable[[], RetT]] = {}
394
395    def __call__(self, value: ValT) -> RetT:
396        handler = self._handlers.get(value)
397        if handler is not None:
398            return handler()
399        return self._base_call(value)
400
401    def _add_handler(
402        self, value: ValT, call: Callable[[], RetT]
403    ) -> Callable[[], RetT]:
404        if value in self._handlers:
405            raise RuntimeError(f'Duplicate handlers added for {value}')
406        self._handlers[value] = call
407        return call
408
409    def register(
410        self, value: ValT
411    ) -> Callable[[Callable[[], RetT]], Callable[[], RetT]]:
412        """Add a handler to the dispatcher."""
413        from functools import partial
414
415        return partial(self._add_handler, value)
416
417
418def valuedispatch1arg(
419    call: Callable[[ValT, ArgT], RetT]
420) -> ValueDispatcher1Arg[ValT, ArgT, RetT]:
421    """Like valuedispatch but for functions taking an extra argument."""
422    return ValueDispatcher1Arg(call)
423
424
425class ValueDispatcher1Arg(Generic[ValT, ArgT, RetT]):
426    """Used by the valuedispatch1arg decorator"""
427
428    def __init__(self, call: Callable[[ValT, ArgT], RetT]) -> None:
429        self._base_call = call
430        self._handlers: dict[ValT, Callable[[ArgT], RetT]] = {}
431
432    def __call__(self, value: ValT, arg: ArgT) -> RetT:
433        handler = self._handlers.get(value)
434        if handler is not None:
435            return handler(arg)
436        return self._base_call(value, arg)
437
438    def _add_handler(
439        self, value: ValT, call: Callable[[ArgT], RetT]
440    ) -> Callable[[ArgT], RetT]:
441        if value in self._handlers:
442            raise RuntimeError(f'Duplicate handlers added for {value}')
443        self._handlers[value] = call
444        return call
445
446    def register(
447        self, value: ValT
448    ) -> Callable[[Callable[[ArgT], RetT]], Callable[[ArgT], RetT]]:
449        """Add a handler to the dispatcher."""
450        from functools import partial
451
452        return partial(self._add_handler, value)
453
454
455if TYPE_CHECKING:
456
457    class ValueDispatcherMethod(Generic[ValT, RetT]):
458        """Used by the valuedispatchmethod decorator."""
459
460        def __call__(self, value: ValT) -> RetT: ...
461
462        def register(
463            self, value: ValT
464        ) -> Callable[[Callable[[SelfT], RetT]], Callable[[SelfT], RetT]]:
465            """Add a handler to the dispatcher."""
466            ...
467
468
469def valuedispatchmethod(
470    call: Callable[[SelfT, ValT], RetT]
471) -> ValueDispatcherMethod[ValT, RetT]:
472    """Like valuedispatch but works with methods instead of functions."""
473
474    # NOTE: It seems that to wrap a method with a decorator and have self
475    # dispatching do the right thing, we must return a function and not
476    # an executable object. So for this version we store our data here
477    # in the function call dict and simply return a call.
478
479    _base_call = call
480    _handlers: dict[ValT, Callable[[SelfT], RetT]] = {}
481
482    def _add_handler(value: ValT, addcall: Callable[[SelfT], RetT]) -> None:
483        if value in _handlers:
484            raise RuntimeError(f'Duplicate handlers added for {value}')
485        _handlers[value] = addcall
486
487    def _register(value: ValT) -> Callable[[Callable[[SelfT], RetT]], None]:
488        from functools import partial
489
490        return partial(_add_handler, value)
491
492    def _call_wrapper(self: SelfT, value: ValT) -> RetT:
493        handler = _handlers.get(value)
494        if handler is not None:
495            return handler(self)
496        return _base_call(self, value)
497
498    # We still want to use our returned object to register handlers, but we're
499    # actually just returning a function. So manually stuff the call onto it.
500    setattr(_call_wrapper, 'register', _register)
501
502    # To the type checker's eyes we return a ValueDispatchMethod instance;
503    # this lets it know about our register func and type-check its usage.
504    # In reality we just return a raw function call (for reasons listed above).
505    # pylint: disable=undefined-variable, no-else-return
506    if TYPE_CHECKING:
507        return ValueDispatcherMethod[ValT, RetT]()
508    else:
509        return _call_wrapper
510
511
512def make_hash(obj: Any) -> int:
513    """Makes a hash from a dictionary, list, tuple or set to any level,
514    that contains only other hashable types (including any lists, tuples,
515    sets, and dictionaries).
516
517    Note that this uses Python's hash() function internally so collisions/etc.
518    may be more common than with fancy cryptographic hashes.
519
520    Also be aware that Python's hash() output varies across processes, so
521    this should only be used for values that will remain in a single process.
522    """
523    import copy
524
525    if isinstance(obj, (set, tuple, list)):
526        return hash(tuple(make_hash(e) for e in obj))
527    if not isinstance(obj, dict):
528        return hash(obj)
529
530    new_obj = copy.deepcopy(obj)
531    for k, v in new_obj.items():
532        new_obj[k] = make_hash(v)
533
534    # NOTE: there is sorted works correctly because it compares only
535    # unique first values (i.e. dict keys)
536    return hash(tuple(frozenset(sorted(new_obj.items()))))
537
538
539def asserttype(obj: Any, typ: type[T]) -> T:
540    """Return an object typed as a given type.
541
542    Assert is used to check its actual type, so only use this when
543    failures are not expected. Otherwise use checktype.
544    """
545    assert isinstance(typ, type), 'only actual types accepted'
546    assert isinstance(obj, typ)
547    return obj
548
549
550def asserttype_o(obj: Any, typ: type[T]) -> T | None:
551    """Return an object typed as a given optional type.
552
553    Assert is used to check its actual type, so only use this when
554    failures are not expected. Otherwise use checktype.
555    """
556    assert isinstance(typ, type), 'only actual types accepted'
557    assert isinstance(obj, (typ, type(None)))
558    return obj
559
560
561def checktype(obj: Any, typ: type[T]) -> T:
562    """Return an object typed as a given type.
563
564    Always checks the type at runtime with isinstance and throws a TypeError
565    on failure. Use asserttype for more efficient (but less safe) equivalent.
566    """
567    assert isinstance(typ, type), 'only actual types accepted'
568    if not isinstance(obj, typ):
569        raise TypeError(f'Expected a {typ}; got a {type(obj)}.')
570    return obj
571
572
573def checktype_o(obj: Any, typ: type[T]) -> T | None:
574    """Return an object typed as a given optional type.
575
576    Always checks the type at runtime with isinstance and throws a TypeError
577    on failure. Use asserttype for more efficient (but less safe) equivalent.
578    """
579    assert isinstance(typ, type), 'only actual types accepted'
580    if not isinstance(obj, (typ, type(None))):
581        raise TypeError(f'Expected a {typ} or None; got a {type(obj)}.')
582    return obj
583
584
585def warntype(obj: Any, typ: type[T]) -> T:
586    """Return an object typed as a given type.
587
588    Always checks the type at runtime and simply logs a warning if it is
589    not what is expected.
590    """
591    assert isinstance(typ, type), 'only actual types accepted'
592    if not isinstance(obj, typ):
593        import logging
594
595        logging.warning('warntype: expected a %s, got a %s', typ, type(obj))
596    return obj  # type: ignore
597
598
599def warntype_o(obj: Any, typ: type[T]) -> T | None:
600    """Return an object typed as a given type.
601
602    Always checks the type at runtime and simply logs a warning if it is
603    not what is expected.
604    """
605    assert isinstance(typ, type), 'only actual types accepted'
606    if not isinstance(obj, (typ, type(None))):
607        import logging
608
609        logging.warning(
610            'warntype: expected a %s or None, got a %s', typ, type(obj)
611        )
612    return obj  # type: ignore
613
614
615def assert_non_optional(obj: T | None) -> T:
616    """Return an object with Optional typing removed.
617
618    Assert is used to check its actual type, so only use this when
619    failures are not expected. Use check_non_optional otherwise.
620    """
621    assert obj is not None
622    return obj
623
624
625def check_non_optional(obj: T | None) -> T:
626    """Return an object with Optional typing removed.
627
628    Always checks the actual type and throws a TypeError on failure.
629    Use assert_non_optional for a more efficient (but less safe) equivalent.
630    """
631    if obj is None:
632        raise ValueError('Got None value in check_non_optional.')
633    return obj
634
635
636def smoothstep(edge0: float, edge1: float, x: float) -> float:
637    """A smooth transition function.
638
639    Returns a value that smoothly moves from 0 to 1 as we go between edges.
640    Values outside of the range return 0 or 1.
641    """
642    y = min(1.0, max(0.0, (x - edge0) / (edge1 - edge0)))
643    return y * y * (3.0 - 2.0 * y)
644
645
646def linearstep(edge0: float, edge1: float, x: float) -> float:
647    """A linear transition function.
648
649    Returns a value that linearly moves from 0 to 1 as we go between edges.
650    Values outside of the range return 0 or 1.
651    """
652    return max(0.0, min(1.0, (x - edge0) / (edge1 - edge0)))
653
654
655def _compact_id(num: int, chars: str) -> str:
656    if num < 0:
657        raise ValueError('Negative integers not allowed.')
658
659    # Chars must be in sorted order for sorting to work correctly
660    # on our output.
661    assert ''.join(sorted(list(chars))) == chars
662
663    base = len(chars)
664    out = ''
665    while num:
666        out += chars[num % base]
667        num //= base
668    return out[::-1] or '0'
669
670
671def human_readable_compact_id(num: int) -> str:
672    """Given a positive int, return a compact string representation for it.
673
674    Handy for visualizing unique numeric ids using as few as possible chars.
675    This representation uses only lowercase letters and numbers (minus the
676    following letters for readability):
677     's' is excluded due to similarity to '5'.
678     'l' is excluded due to similarity to '1'.
679     'i' is excluded due to similarity to '1'.
680     'o' is excluded due to similarity to '0'.
681     'z' is excluded due to similarity to '2'.
682
683    Therefore for n chars this can store values of 21^n.
684
685    When reading human input consisting of these IDs, it may be desirable
686    to map the disallowed chars to their corresponding allowed ones
687    ('o' -> '0', etc).
688
689    Sort order for these ids is the same as the original numbers.
690
691    If more compactness is desired at the expense of readability,
692    use compact_id() instead.
693    """
694    return _compact_id(num, '0123456789abcdefghjkmnpqrtuvwxy')
695
696
697def compact_id(num: int) -> str:
698    """Given a positive int, return a compact string representation for it.
699
700    Handy for visualizing unique numeric ids using as few as possible chars.
701    This version is more compact than human_readable_compact_id() but less
702    friendly to humans due to using both capital and lowercase letters,
703    both 'O' and '0', etc.
704
705    Therefore for n chars this can store values of 62^n.
706
707    Sort order for these ids is the same as the original numbers.
708    """
709    return _compact_id(
710        num, '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'
711    )
712
713
714def caller_source_location() -> str:
715    """Returns source file name and line of the code calling us.
716
717    Example: 'mymodule.py:23'
718    """
719    try:
720        import inspect
721
722        frame = inspect.currentframe()
723        for _i in range(2):
724            if frame is None:
725                raise RuntimeError()
726            frame = frame.f_back
727        if frame is None:
728            raise RuntimeError()
729        fname = os.path.basename(frame.f_code.co_filename)
730        return f'{fname}:{frame.f_lineno}'
731    except Exception:
732        return '<unknown source location>'
733
734
735def unchanging_hostname() -> str:
736    """Return an unchanging name for the local device.
737
738    Similar to the `hostname` call (or os.uname().nodename in Python)
739    except attempts to give a name that doesn't change depending on
740    network conditions. (A Mac will tend to go from Foo to Foo.local,
741    Foo.lan etc. throughout its various adventures)
742    """
743    import platform
744    import subprocess
745
746    # On Mac, this should give the computer name assigned in System Prefs.
747    if platform.system() == 'Darwin':
748        return (
749            subprocess.run(
750                ['scutil', '--get', 'ComputerName'],
751                check=True,
752                capture_output=True,
753            )
754            .stdout.decode()
755            .strip()
756            .replace(' ', '-')
757        )
758    return os.uname().nodename
759
760
761def set_canonical_module_names(module_globals: dict[str, Any]) -> None:
762    """Do the thing."""
763    if os.environ.get('EFRO_SUPPRESS_SET_CANONICAL_MODULE_NAMES') == '1':
764        return
765
766    modulename = module_globals.get('__name__')
767    if not isinstance(modulename, str):
768        raise RuntimeError('Unable to get module name.')
769    assert not modulename.startswith('_')
770    modulename_prefix = f'{modulename}.'
771    modulename_prefix_2 = f'_{modulename}.'
772
773    for name, obj in module_globals.items():
774        if name.startswith('_'):
775            continue
776        existing = getattr(obj, '__module__', None)
777        try:
778            # Override the module ONLY if it lives under us somewhere.
779            # So ourpackage._submodule.Foo becomes ourpackage.Foo
780            # but otherpackage._submodule.Foo remains untouched.
781            if existing is not None and (
782                existing.startswith(modulename_prefix)
783                or existing.startswith(modulename_prefix_2)
784            ):
785                obj.__module__ = modulename
786        except Exception:
787            import logging
788
789            logging.warning(
790                'set_canonical_module_names: unable to change __module__'
791                " from '%s' to '%s' on %s object at '%s'.",
792                existing,
793                modulename,
794                type(obj),
795                name,
796            )
797
798
799def timedelta_str(
800    timeval: datetime.timedelta | float, maxparts: int = 2, decimals: int = 0
801) -> str:
802    """Return a simple human readable time string for a length of time.
803
804    Time can be given as a timedelta or a float representing seconds.
805    Example output:
806      "23d 1h 2m 32s" (with maxparts == 4)
807      "23d 1h" (with maxparts == 2)
808      "23d 1.08h" (with maxparts == 2 and decimals == 2)
809
810    Note that this is hard-coded in English and probably not especially
811    performant.
812    """
813    # pylint: disable=too-many-locals
814
815    if isinstance(timeval, float):
816        timevalfin = datetime.timedelta(seconds=timeval)
817    else:
818        timevalfin = timeval
819
820    # Internally we only handle positive values.
821    if timevalfin.total_seconds() < 0:
822        return f'-{timedelta_str(timeval=-timeval, maxparts=maxparts)}'
823
824    years = timevalfin.days // 365
825    days = timevalfin.days % 365
826    hours = timevalfin.seconds // 3600
827    hour_remainder = timevalfin.seconds % 3600
828    minutes = hour_remainder // 60
829    seconds = hour_remainder % 60
830
831    # Now, if we want decimal places for our last value,
832    # calc fractional parts.
833    if decimals:
834        # Calc totals of each type.
835        t_seconds = timevalfin.total_seconds()
836        t_minutes = t_seconds / 60
837        t_hours = t_minutes / 60
838        t_days = t_hours / 24
839        t_years = t_days / 365
840
841        # Calc fractional parts that exclude all whole values to their left.
842        years_covered = years
843        years_f = t_years - years_covered
844        days_covered = years_covered * 365 + days
845        days_f = t_days - days_covered
846        hours_covered = days_covered * 24 + hours
847        hours_f = t_hours - hours_covered
848        minutes_covered = hours_covered * 60 + minutes
849        minutes_f = t_minutes - minutes_covered
850        seconds_covered = minutes_covered * 60 + seconds
851        seconds_f = t_seconds - seconds_covered
852    else:
853        years_f = days_f = hours_f = minutes_f = seconds_f = 0.0
854
855    parts: list[str] = []
856    for part, part_f, suffix in (
857        (years, years_f, 'y'),
858        (days, days_f, 'd'),
859        (hours, hours_f, 'h'),
860        (minutes, minutes_f, 'm'),
861        (seconds, seconds_f, 's'),
862    ):
863        if part or parts or (not parts and suffix == 's'):
864            # Do decimal version only for the last part.
865            if decimals and (len(parts) >= maxparts - 1 or suffix == 's'):
866                parts.append(f'{part+part_f:.{decimals}f}{suffix}')
867            else:
868                parts.append(f'{part}{suffix}')
869            if len(parts) >= maxparts:
870                break
871    return ' '.join(parts)
872
873
874def ago_str(
875    timeval: datetime.datetime,
876    maxparts: int = 1,
877    now: datetime.datetime | None = None,
878    decimals: int = 0,
879) -> str:
880    """Given a datetime, return a clean human readable 'ago' str.
881
882    Note that this is hard-coded in English so should not be used
883    for visible in-game elements; only tools/etc.
884
885    If now is not passed, efro.util.utc_now() is used.
886    """
887    if now is None:
888        now = utc_now()
889    return (
890        timedelta_str(now - timeval, maxparts=maxparts, decimals=decimals)
891        + ' ago'
892    )
893
894
895def split_list(input_list: list[T], max_length: int) -> list[list[T]]:
896    """Split a single list into smaller lists."""
897    return [
898        input_list[i : i + max_length]
899        for i in range(0, len(input_list), max_length)
900    ]
def explicit_bool(val: bool) -> bool:
46def explicit_bool(val: bool) -> bool:
47    """Return a non-inferable boolean value.
48
49    Useful to be able to disable blocks of code without type checkers
50    complaining/etc.
51    """
52    # pylint: disable=no-else-return
53    if TYPE_CHECKING:
54        # infer this! <boom>
55        import random
56
57        return random.random() < 0.5
58    else:
59        return val

Return a non-inferable boolean value.

Useful to be able to disable blocks of code without type checkers complaining/etc.

def snake_case_to_title(val: str) -> str:
62def snake_case_to_title(val: str) -> str:
63    """Given a snake-case string 'foo_bar', returns 'Foo Bar'."""
64    # Kill empty words resulting from leading/trailing/multiple underscores.
65    return ' '.join(w for w in val.split('_') if w).title()

Given a snake-case string 'foo_bar', returns 'Foo Bar'.

def snake_case_to_camel_case(val: str) -> str:
68def snake_case_to_camel_case(val: str) -> str:
69    """Given a snake-case string 'foo_bar', returns camel-case 'FooBar'."""
70    # Replace underscores with spaces; capitalize words; kill spaces.
71    # Not sure about efficiency, but logically simple.
72    return val.replace('_', ' ').title().replace(' ', '')

Given a snake-case string 'foo_bar', returns camel-case 'FooBar'.

def enum_by_value(cls: type[~EnumT], value: Any) -> ~EnumT:
 75def enum_by_value(cls: type[EnumT], value: Any) -> EnumT:
 76    """Create an enum from a value.
 77
 78    This is basically the same as doing 'obj = EnumType(value)' except
 79    that it works around an issue where a reference loop is created
 80    if an exception is thrown due to an invalid value. Since we disable
 81    the cyclic garbage collector for most of the time, such loops can lead
 82    to our objects sticking around longer than we want.
 83    This issue has been submitted to Python as a bug so hopefully we can
 84    remove this eventually if it gets fixed: https://bugs.python.org/issue42248
 85    UPDATE: This has been fixed as of later 3.8 builds, so we can kill this
 86    off once we are 3.9+ across the board.
 87    """
 88
 89    # Note: we don't recreate *ALL* the functionality of the Enum constructor
 90    # such as the _missing_ hook; but this should cover our basic needs.
 91    value2member_map = getattr(cls, '_value2member_map_')
 92    assert value2member_map is not None
 93    try:
 94        out = value2member_map[value]
 95        assert isinstance(out, cls)
 96        return out
 97    except KeyError:
 98        # pylint: disable=consider-using-f-string
 99        raise ValueError(
100            '%r is not a valid %s' % (value, cls.__name__)
101        ) from None

Create an enum from a value.

This is basically the same as doing 'obj = EnumType(value)' except that it works around an issue where a reference loop is created if an exception is thrown due to an invalid value. Since we disable the cyclic garbage collector for most of the time, such loops can lead to our objects sticking around longer than we want. This issue has been submitted to Python as a bug so hopefully we can remove this eventually if it gets fixed: https://bugs.python.org/issue42248 UPDATE: This has been fixed as of later 3.8 builds, so we can kill this off once we are 3.9+ across the board.

def check_utc(value: datetime.datetime) -> None:
104def check_utc(value: datetime.datetime) -> None:
105    """Ensure a datetime value is timezone-aware utc."""
106    if value.tzinfo is not datetime.UTC:
107        raise ValueError(
108            'datetime value does not have timezone set as datetime.UTC'
109        )

Ensure a datetime value is timezone-aware utc.

def utc_now() -> datetime.datetime:
112def utc_now() -> datetime.datetime:
113    """Get timezone-aware current utc time.
114
115    Just a shortcut for datetime.datetime.now(datetime.UTC).
116    Avoid datetime.datetime.utcnow() which is deprecated and gives naive
117    times.
118    """
119    return datetime.datetime.now(datetime.UTC)

Get timezone-aware current utc time.

Just a shortcut for datetime.datetime.now(datetime.UTC). Avoid datetime.datetime.utcnow() which is deprecated and gives naive times.

def utc_now_naive() -> datetime.datetime:
122def utc_now_naive() -> datetime.datetime:
123    """Get naive utc time.
124
125    This can be used to replace datetime.utcnow(), which is now deprecated.
126    Most all code should migrate to use timezone-aware times instead of
127    this.
128    """
129    return datetime.datetime.now(datetime.UTC).replace(tzinfo=None)

Get naive utc time.

This can be used to replace datetime.utcnow(), which is now deprecated. Most all code should migrate to use timezone-aware times instead of this.

def utc_today() -> datetime.datetime:
132def utc_today() -> datetime.datetime:
133    """Get offset-aware midnight in the utc time zone."""
134    now = datetime.datetime.now(datetime.UTC)
135    return datetime.datetime(
136        year=now.year, month=now.month, day=now.day, tzinfo=now.tzinfo
137    )

Get offset-aware midnight in the utc time zone.

def utc_this_hour() -> datetime.datetime:
140def utc_this_hour() -> datetime.datetime:
141    """Get offset-aware beginning of the current hour in the utc time zone."""
142    now = datetime.datetime.now(datetime.UTC)
143    return datetime.datetime(
144        year=now.year,
145        month=now.month,
146        day=now.day,
147        hour=now.hour,
148        tzinfo=now.tzinfo,
149    )

Get offset-aware beginning of the current hour in the utc time zone.

def utc_this_minute() -> datetime.datetime:
152def utc_this_minute() -> datetime.datetime:
153    """Get offset-aware beginning of current minute in the utc time zone."""
154    now = datetime.datetime.now(datetime.UTC)
155    return datetime.datetime(
156        year=now.year,
157        month=now.month,
158        day=now.day,
159        hour=now.hour,
160        minute=now.minute,
161        tzinfo=now.tzinfo,
162    )

Get offset-aware beginning of current minute in the utc time zone.

def empty_weakref(objtype: type[~T]) -> weakref.ReferenceType[~T]:
165def empty_weakref(objtype: type[T]) -> weakref.ref[T]:
166    """Return an invalidated weak-reference for the specified type."""
167    # At runtime, all weakrefs are the same; our type arg is just
168    # for the static type checker.
169    del objtype  # Unused.
170
171    # Just create an object and let it die. Is there a cleaner way to do this?
172    # return weakref.ref(_EmptyObj())  # type: ignore
173
174    # Sharing a single ones seems at least a bit better.
175    return _g_empty_weak_ref  # type: ignore

Return an invalidated weak-reference for the specified type.

def data_size_str(bytecount: int, compact: bool = False) -> str:
178def data_size_str(bytecount: int, compact: bool = False) -> str:
179    """Given a size in bytes, returns a short human readable string.
180
181    In compact mode this should be 6 or fewer chars for most all
182    sane file sizes.
183    """
184    # pylint: disable=too-many-return-statements
185
186    # Special case: handle negatives.
187    if bytecount < 0:
188        val = data_size_str(-bytecount, compact=compact)
189        return f'-{val}'
190
191    if bytecount <= 999:
192        suffix = 'B' if compact else 'bytes'
193        return f'{bytecount} {suffix}'
194    kbytecount = bytecount / 1024
195    if round(kbytecount, 1) < 10.0:
196        return f'{kbytecount:.1f} KB'
197    if round(kbytecount, 0) < 999:
198        return f'{kbytecount:.0f} KB'
199    mbytecount = bytecount / (1024 * 1024)
200    if round(mbytecount, 1) < 10.0:
201        return f'{mbytecount:.1f} MB'
202    if round(mbytecount, 0) < 999:
203        return f'{mbytecount:.0f} MB'
204    gbytecount = bytecount / (1024 * 1024 * 1024)
205    if round(gbytecount, 1) < 10.0:
206        return f'{gbytecount:.1f} GB'
207    return f'{gbytecount:.0f} GB'

Given a size in bytes, returns a short human readable string.

In compact mode this should be 6 or fewer chars for most all sane file sizes.

class DirtyBit:
210class DirtyBit:
211    """Manages whether a thing is dirty and regulates attempts to clean it.
212
213    To use, simply set the 'dirty' value on this object to True when some
214    action is needed, and then check the 'should_update' value to regulate
215    when attempts to clean it should be made. Set 'dirty' back to False after
216    a successful update.
217    If 'use_lock' is True, an asyncio Lock will be created and incorporated
218    into update attempts to prevent simultaneous updates (should_update will
219    only return True when the lock is unlocked). Note that It is up to the user
220    to lock/unlock the lock during the actual update attempt.
221    If a value is passed for 'auto_dirty_seconds', the dirtybit will flip
222    itself back to dirty after being clean for the given amount of time.
223    'min_update_interval' can be used to enforce a minimum update
224    interval even when updates are successful (retry_interval only applies
225    when updates fail)
226    """
227
228    def __init__(
229        self,
230        dirty: bool = False,
231        retry_interval: float = 5.0,
232        use_lock: bool = False,
233        auto_dirty_seconds: float | None = None,
234        min_update_interval: float | None = None,
235    ):
236        curtime = time.monotonic()
237        self._retry_interval = retry_interval
238        self._auto_dirty_seconds = auto_dirty_seconds
239        self._min_update_interval = min_update_interval
240        self._dirty = dirty
241        self._next_update_time: float | None = curtime if dirty else None
242        self._last_update_time: float | None = None
243        self._next_auto_dirty_time: float | None = (
244            (curtime + self._auto_dirty_seconds)
245            if (not dirty and self._auto_dirty_seconds is not None)
246            else None
247        )
248        self._use_lock = use_lock
249        self.lock: asyncio.Lock
250        if self._use_lock:
251            import asyncio
252
253            self.lock = asyncio.Lock()
254
255    @property
256    def dirty(self) -> bool:
257        """Whether the target is currently dirty.
258
259        This should be set to False once an update is successful.
260        """
261        return self._dirty
262
263    @dirty.setter
264    def dirty(self, value: bool) -> None:
265        # If we're freshly clean, set our next auto-dirty time (if we have
266        # one).
267        if self._dirty and not value and self._auto_dirty_seconds is not None:
268            self._next_auto_dirty_time = (
269                time.monotonic() + self._auto_dirty_seconds
270            )
271
272        # If we're freshly dirty, schedule an immediate update.
273        if not self._dirty and value:
274            self._next_update_time = time.monotonic()
275
276            # If they want to enforce a minimum update interval,
277            # push out the next update time if it hasn't been long enough.
278            if (
279                self._min_update_interval is not None
280                and self._last_update_time is not None
281            ):
282                self._next_update_time = max(
283                    self._next_update_time,
284                    self._last_update_time + self._min_update_interval,
285                )
286
287        self._dirty = value
288
289    @property
290    def should_update(self) -> bool:
291        """Whether an attempt should be made to clean the target now.
292
293        Always returns False if the target is not dirty.
294        Takes into account the amount of time passed since the target
295        was marked dirty or since should_update last returned True.
296        """
297        curtime = time.monotonic()
298
299        # Auto-dirty ourself if we're into that.
300        if (
301            self._next_auto_dirty_time is not None
302            and curtime > self._next_auto_dirty_time
303        ):
304            self.dirty = True
305            self._next_auto_dirty_time = None
306        if not self._dirty:
307            return False
308        if self._use_lock and self.lock.locked():
309            return False
310        assert self._next_update_time is not None
311        if curtime > self._next_update_time:
312            self._next_update_time = curtime + self._retry_interval
313            self._last_update_time = curtime
314            return True
315        return False

Manages whether a thing is dirty and regulates attempts to clean it.

To use, simply set the 'dirty' value on this object to True when some action is needed, and then check the 'should_update' value to regulate when attempts to clean it should be made. Set 'dirty' back to False after a successful update. If 'use_lock' is True, an asyncio Lock will be created and incorporated into update attempts to prevent simultaneous updates (should_update will only return True when the lock is unlocked). Note that It is up to the user to lock/unlock the lock during the actual update attempt. If a value is passed for 'auto_dirty_seconds', the dirtybit will flip itself back to dirty after being clean for the given amount of time. 'min_update_interval' can be used to enforce a minimum update interval even when updates are successful (retry_interval only applies when updates fail)

DirtyBit( dirty: bool = False, retry_interval: float = 5.0, use_lock: bool = False, auto_dirty_seconds: float | None = None, min_update_interval: float | None = None)
228    def __init__(
229        self,
230        dirty: bool = False,
231        retry_interval: float = 5.0,
232        use_lock: bool = False,
233        auto_dirty_seconds: float | None = None,
234        min_update_interval: float | None = None,
235    ):
236        curtime = time.monotonic()
237        self._retry_interval = retry_interval
238        self._auto_dirty_seconds = auto_dirty_seconds
239        self._min_update_interval = min_update_interval
240        self._dirty = dirty
241        self._next_update_time: float | None = curtime if dirty else None
242        self._last_update_time: float | None = None
243        self._next_auto_dirty_time: float | None = (
244            (curtime + self._auto_dirty_seconds)
245            if (not dirty and self._auto_dirty_seconds is not None)
246            else None
247        )
248        self._use_lock = use_lock
249        self.lock: asyncio.Lock
250        if self._use_lock:
251            import asyncio
252
253            self.lock = asyncio.Lock()
lock: asyncio.locks.Lock
dirty: bool
255    @property
256    def dirty(self) -> bool:
257        """Whether the target is currently dirty.
258
259        This should be set to False once an update is successful.
260        """
261        return self._dirty

Whether the target is currently dirty.

This should be set to False once an update is successful.

should_update: bool
289    @property
290    def should_update(self) -> bool:
291        """Whether an attempt should be made to clean the target now.
292
293        Always returns False if the target is not dirty.
294        Takes into account the amount of time passed since the target
295        was marked dirty or since should_update last returned True.
296        """
297        curtime = time.monotonic()
298
299        # Auto-dirty ourself if we're into that.
300        if (
301            self._next_auto_dirty_time is not None
302            and curtime > self._next_auto_dirty_time
303        ):
304            self.dirty = True
305            self._next_auto_dirty_time = None
306        if not self._dirty:
307            return False
308        if self._use_lock and self.lock.locked():
309            return False
310        assert self._next_update_time is not None
311        if curtime > self._next_update_time:
312            self._next_update_time = curtime + self._retry_interval
313            self._last_update_time = curtime
314            return True
315        return False

Whether an attempt should be made to clean the target now.

Always returns False if the target is not dirty. Takes into account the amount of time passed since the target was marked dirty or since should_update last returned True.

class DispatchMethodWrapper(typing.Generic[~ArgT, ~RetT]):
318class DispatchMethodWrapper(Generic[ArgT, RetT]):
319    """Type-aware standin for the dispatch func returned by dispatchmethod."""
320
321    def __call__(self, arg: ArgT) -> RetT:
322        raise RuntimeError('Should not get here')
323
324    @staticmethod
325    def register(
326        func: Callable[[Any, Any], RetT]
327    ) -> Callable[[Any, Any], RetT]:
328        """Register a new dispatch handler for this dispatch-method."""
329        raise RuntimeError('Should not get here')
330
331    registry: dict[Any, Callable]

Type-aware standin for the dispatch func returned by dispatchmethod.

@staticmethod
def register(func: Callable[[Any, Any], ~RetT]) -> Callable[[Any, Any], ~RetT]:
324    @staticmethod
325    def register(
326        func: Callable[[Any, Any], RetT]
327    ) -> Callable[[Any, Any], RetT]:
328        """Register a new dispatch handler for this dispatch-method."""
329        raise RuntimeError('Should not get here')

Register a new dispatch handler for this dispatch-method.

registry: dict[typing.Any, typing.Callable]
def dispatchmethod( func: Callable[[Any, ~ArgT], ~RetT]) -> DispatchMethodWrapper[~ArgT, ~RetT]:
335def dispatchmethod(
336    func: Callable[[Any, ArgT], RetT]
337) -> DispatchMethodWrapper[ArgT, RetT]:
338    """A variation of functools.singledispatch for methods.
339
340    Note: as of Python 3.9 there is now functools.singledispatchmethod,
341    but it currently (as of Jan 2021) is not type-aware (at least in mypy),
342    which gives us a reason to keep this one around for now.
343    """
344    from functools import singledispatch, update_wrapper
345
346    origwrapper: Any = singledispatch(func)
347
348    # Pull this out so hopefully origwrapper can die,
349    # otherwise we reference origwrapper in our wrapper.
350    dispatch = origwrapper.dispatch
351
352    # All we do here is recreate the end of functools.singledispatch
353    # where it returns a wrapper except instead of the wrapper using the
354    # first arg to the function ours uses the second (to skip 'self').
355    # This was made against Python 3.7; we should probably check up on
356    # this in later versions in case anything has changed.
357    # (or hopefully they'll add this functionality to their version)
358    # NOTE: sounds like we can use functools singledispatchmethod in 3.8
359    def wrapper(*args: Any, **kw: Any) -> Any:
360        if not args or len(args) < 2:
361            raise TypeError(
362                f'{funcname} requires at least ' '2 positional arguments'
363            )
364
365        return dispatch(args[1].__class__)(*args, **kw)
366
367    funcname = getattr(func, '__name__', 'dispatchmethod method')
368    wrapper.register = origwrapper.register  # type: ignore
369    wrapper.dispatch = dispatch  # type: ignore
370    wrapper.registry = origwrapper.registry  # type: ignore
371    # pylint: disable=protected-access
372    wrapper._clear_cache = origwrapper._clear_cache  # type: ignore
373    update_wrapper(wrapper, func)
374    # pylint: enable=protected-access
375    return cast(DispatchMethodWrapper, wrapper)

A variation of functools.singledispatch for methods.

Note: as of Python 3.9 there is now functools.singledispatchmethod, but it currently (as of Jan 2021) is not type-aware (at least in mypy), which gives us a reason to keep this one around for now.

def valuedispatch( call: Callable[[~ValT], ~RetT]) -> ValueDispatcher[~ValT, ~RetT]:
378def valuedispatch(call: Callable[[ValT], RetT]) -> ValueDispatcher[ValT, RetT]:
379    """Decorator for functions to allow dispatching based on a value.
380
381    This differs from functools.singledispatch in that it dispatches based
382    on the value of an argument, not based on its type.
383    The 'register' method of a value-dispatch function can be used
384    to assign new functions to handle particular values.
385    Unhandled values wind up in the original dispatch function."""
386    return ValueDispatcher(call)

Decorator for functions to allow dispatching based on a value.

This differs from functools.singledispatch in that it dispatches based on the value of an argument, not based on its type. The 'register' method of a value-dispatch function can be used to assign new functions to handle particular values. Unhandled values wind up in the original dispatch function.

class ValueDispatcher(typing.Generic[~ValT, ~RetT]):
389class ValueDispatcher(Generic[ValT, RetT]):
390    """Used by the valuedispatch decorator"""
391
392    def __init__(self, call: Callable[[ValT], RetT]) -> None:
393        self._base_call = call
394        self._handlers: dict[ValT, Callable[[], RetT]] = {}
395
396    def __call__(self, value: ValT) -> RetT:
397        handler = self._handlers.get(value)
398        if handler is not None:
399            return handler()
400        return self._base_call(value)
401
402    def _add_handler(
403        self, value: ValT, call: Callable[[], RetT]
404    ) -> Callable[[], RetT]:
405        if value in self._handlers:
406            raise RuntimeError(f'Duplicate handlers added for {value}')
407        self._handlers[value] = call
408        return call
409
410    def register(
411        self, value: ValT
412    ) -> Callable[[Callable[[], RetT]], Callable[[], RetT]]:
413        """Add a handler to the dispatcher."""
414        from functools import partial
415
416        return partial(self._add_handler, value)

Used by the valuedispatch decorator

ValueDispatcher(call: Callable[[~ValT], ~RetT])
392    def __init__(self, call: Callable[[ValT], RetT]) -> None:
393        self._base_call = call
394        self._handlers: dict[ValT, Callable[[], RetT]] = {}
def register( self, value: ~ValT) -> Callable[[Callable[[], ~RetT]], Callable[[], ~RetT]]:
410    def register(
411        self, value: ValT
412    ) -> Callable[[Callable[[], RetT]], Callable[[], RetT]]:
413        """Add a handler to the dispatcher."""
414        from functools import partial
415
416        return partial(self._add_handler, value)

Add a handler to the dispatcher.

def valuedispatch1arg( call: Callable[[~ValT, ~ArgT], ~RetT]) -> ValueDispatcher1Arg[~ValT, ~ArgT, ~RetT]:
419def valuedispatch1arg(
420    call: Callable[[ValT, ArgT], RetT]
421) -> ValueDispatcher1Arg[ValT, ArgT, RetT]:
422    """Like valuedispatch but for functions taking an extra argument."""
423    return ValueDispatcher1Arg(call)

Like valuedispatch but for functions taking an extra argument.

class ValueDispatcher1Arg(typing.Generic[~ValT, ~ArgT, ~RetT]):
426class ValueDispatcher1Arg(Generic[ValT, ArgT, RetT]):
427    """Used by the valuedispatch1arg decorator"""
428
429    def __init__(self, call: Callable[[ValT, ArgT], RetT]) -> None:
430        self._base_call = call
431        self._handlers: dict[ValT, Callable[[ArgT], RetT]] = {}
432
433    def __call__(self, value: ValT, arg: ArgT) -> RetT:
434        handler = self._handlers.get(value)
435        if handler is not None:
436            return handler(arg)
437        return self._base_call(value, arg)
438
439    def _add_handler(
440        self, value: ValT, call: Callable[[ArgT], RetT]
441    ) -> Callable[[ArgT], RetT]:
442        if value in self._handlers:
443            raise RuntimeError(f'Duplicate handlers added for {value}')
444        self._handlers[value] = call
445        return call
446
447    def register(
448        self, value: ValT
449    ) -> Callable[[Callable[[ArgT], RetT]], Callable[[ArgT], RetT]]:
450        """Add a handler to the dispatcher."""
451        from functools import partial
452
453        return partial(self._add_handler, value)

Used by the valuedispatch1arg decorator

ValueDispatcher1Arg(call: Callable[[~ValT, ~ArgT], ~RetT])
429    def __init__(self, call: Callable[[ValT, ArgT], RetT]) -> None:
430        self._base_call = call
431        self._handlers: dict[ValT, Callable[[ArgT], RetT]] = {}
def register( self, value: ~ValT) -> Callable[[Callable[[~ArgT], ~RetT]], Callable[[~ArgT], ~RetT]]:
447    def register(
448        self, value: ValT
449    ) -> Callable[[Callable[[ArgT], RetT]], Callable[[ArgT], RetT]]:
450        """Add a handler to the dispatcher."""
451        from functools import partial
452
453        return partial(self._add_handler, value)

Add a handler to the dispatcher.

def valuedispatchmethod( call: Callable[[~SelfT, ~ValT], ~RetT]) -> efro.util.ValueDispatcherMethod[~ValT, ~RetT]:
470def valuedispatchmethod(
471    call: Callable[[SelfT, ValT], RetT]
472) -> ValueDispatcherMethod[ValT, RetT]:
473    """Like valuedispatch but works with methods instead of functions."""
474
475    # NOTE: It seems that to wrap a method with a decorator and have self
476    # dispatching do the right thing, we must return a function and not
477    # an executable object. So for this version we store our data here
478    # in the function call dict and simply return a call.
479
480    _base_call = call
481    _handlers: dict[ValT, Callable[[SelfT], RetT]] = {}
482
483    def _add_handler(value: ValT, addcall: Callable[[SelfT], RetT]) -> None:
484        if value in _handlers:
485            raise RuntimeError(f'Duplicate handlers added for {value}')
486        _handlers[value] = addcall
487
488    def _register(value: ValT) -> Callable[[Callable[[SelfT], RetT]], None]:
489        from functools import partial
490
491        return partial(_add_handler, value)
492
493    def _call_wrapper(self: SelfT, value: ValT) -> RetT:
494        handler = _handlers.get(value)
495        if handler is not None:
496            return handler(self)
497        return _base_call(self, value)
498
499    # We still want to use our returned object to register handlers, but we're
500    # actually just returning a function. So manually stuff the call onto it.
501    setattr(_call_wrapper, 'register', _register)
502
503    # To the type checker's eyes we return a ValueDispatchMethod instance;
504    # this lets it know about our register func and type-check its usage.
505    # In reality we just return a raw function call (for reasons listed above).
506    # pylint: disable=undefined-variable, no-else-return
507    if TYPE_CHECKING:
508        return ValueDispatcherMethod[ValT, RetT]()
509    else:
510        return _call_wrapper

Like valuedispatch but works with methods instead of functions.

def make_hash(obj: Any) -> int:
513def make_hash(obj: Any) -> int:
514    """Makes a hash from a dictionary, list, tuple or set to any level,
515    that contains only other hashable types (including any lists, tuples,
516    sets, and dictionaries).
517
518    Note that this uses Python's hash() function internally so collisions/etc.
519    may be more common than with fancy cryptographic hashes.
520
521    Also be aware that Python's hash() output varies across processes, so
522    this should only be used for values that will remain in a single process.
523    """
524    import copy
525
526    if isinstance(obj, (set, tuple, list)):
527        return hash(tuple(make_hash(e) for e in obj))
528    if not isinstance(obj, dict):
529        return hash(obj)
530
531    new_obj = copy.deepcopy(obj)
532    for k, v in new_obj.items():
533        new_obj[k] = make_hash(v)
534
535    # NOTE: there is sorted works correctly because it compares only
536    # unique first values (i.e. dict keys)
537    return hash(tuple(frozenset(sorted(new_obj.items()))))

Makes a hash from a dictionary, list, tuple or set to any level, that contains only other hashable types (including any lists, tuples, sets, and dictionaries).

Note that this uses Python's hash() function internally so collisions/etc. may be more common than with fancy cryptographic hashes.

Also be aware that Python's hash() output varies across processes, so this should only be used for values that will remain in a single process.

def asserttype(obj: Any, typ: type[~T]) -> ~T:
540def asserttype(obj: Any, typ: type[T]) -> T:
541    """Return an object typed as a given type.
542
543    Assert is used to check its actual type, so only use this when
544    failures are not expected. Otherwise use checktype.
545    """
546    assert isinstance(typ, type), 'only actual types accepted'
547    assert isinstance(obj, typ)
548    return obj

Return an object typed as a given type.

Assert is used to check its actual type, so only use this when failures are not expected. Otherwise use checktype.

def asserttype_o(obj: Any, typ: type[~T]) -> Optional[~T]:
551def asserttype_o(obj: Any, typ: type[T]) -> T | None:
552    """Return an object typed as a given optional type.
553
554    Assert is used to check its actual type, so only use this when
555    failures are not expected. Otherwise use checktype.
556    """
557    assert isinstance(typ, type), 'only actual types accepted'
558    assert isinstance(obj, (typ, type(None)))
559    return obj

Return an object typed as a given optional type.

Assert is used to check its actual type, so only use this when failures are not expected. Otherwise use checktype.

def checktype(obj: Any, typ: type[~T]) -> ~T:
562def checktype(obj: Any, typ: type[T]) -> T:
563    """Return an object typed as a given type.
564
565    Always checks the type at runtime with isinstance and throws a TypeError
566    on failure. Use asserttype for more efficient (but less safe) equivalent.
567    """
568    assert isinstance(typ, type), 'only actual types accepted'
569    if not isinstance(obj, typ):
570        raise TypeError(f'Expected a {typ}; got a {type(obj)}.')
571    return obj

Return an object typed as a given type.

Always checks the type at runtime with isinstance and throws a TypeError on failure. Use asserttype for more efficient (but less safe) equivalent.

def checktype_o(obj: Any, typ: type[~T]) -> Optional[~T]:
574def checktype_o(obj: Any, typ: type[T]) -> T | None:
575    """Return an object typed as a given optional type.
576
577    Always checks the type at runtime with isinstance and throws a TypeError
578    on failure. Use asserttype for more efficient (but less safe) equivalent.
579    """
580    assert isinstance(typ, type), 'only actual types accepted'
581    if not isinstance(obj, (typ, type(None))):
582        raise TypeError(f'Expected a {typ} or None; got a {type(obj)}.')
583    return obj

Return an object typed as a given optional type.

Always checks the type at runtime with isinstance and throws a TypeError on failure. Use asserttype for more efficient (but less safe) equivalent.

def warntype(obj: Any, typ: type[~T]) -> ~T:
586def warntype(obj: Any, typ: type[T]) -> T:
587    """Return an object typed as a given type.
588
589    Always checks the type at runtime and simply logs a warning if it is
590    not what is expected.
591    """
592    assert isinstance(typ, type), 'only actual types accepted'
593    if not isinstance(obj, typ):
594        import logging
595
596        logging.warning('warntype: expected a %s, got a %s', typ, type(obj))
597    return obj  # type: ignore

Return an object typed as a given type.

Always checks the type at runtime and simply logs a warning if it is not what is expected.

def warntype_o(obj: Any, typ: type[~T]) -> Optional[~T]:
600def warntype_o(obj: Any, typ: type[T]) -> T | None:
601    """Return an object typed as a given type.
602
603    Always checks the type at runtime and simply logs a warning if it is
604    not what is expected.
605    """
606    assert isinstance(typ, type), 'only actual types accepted'
607    if not isinstance(obj, (typ, type(None))):
608        import logging
609
610        logging.warning(
611            'warntype: expected a %s or None, got a %s', typ, type(obj)
612        )
613    return obj  # type: ignore

Return an object typed as a given type.

Always checks the type at runtime and simply logs a warning if it is not what is expected.

def assert_non_optional(obj: Optional[~T]) -> ~T:
616def assert_non_optional(obj: T | None) -> T:
617    """Return an object with Optional typing removed.
618
619    Assert is used to check its actual type, so only use this when
620    failures are not expected. Use check_non_optional otherwise.
621    """
622    assert obj is not None
623    return obj

Return an object with Optional typing removed.

Assert is used to check its actual type, so only use this when failures are not expected. Use check_non_optional otherwise.

def check_non_optional(obj: Optional[~T]) -> ~T:
626def check_non_optional(obj: T | None) -> T:
627    """Return an object with Optional typing removed.
628
629    Always checks the actual type and throws a TypeError on failure.
630    Use assert_non_optional for a more efficient (but less safe) equivalent.
631    """
632    if obj is None:
633        raise ValueError('Got None value in check_non_optional.')
634    return obj

Return an object with Optional typing removed.

Always checks the actual type and throws a TypeError on failure. Use assert_non_optional for a more efficient (but less safe) equivalent.

def smoothstep(edge0: float, edge1: float, x: float) -> float:
637def smoothstep(edge0: float, edge1: float, x: float) -> float:
638    """A smooth transition function.
639
640    Returns a value that smoothly moves from 0 to 1 as we go between edges.
641    Values outside of the range return 0 or 1.
642    """
643    y = min(1.0, max(0.0, (x - edge0) / (edge1 - edge0)))
644    return y * y * (3.0 - 2.0 * y)

A smooth transition function.

Returns a value that smoothly moves from 0 to 1 as we go between edges. Values outside of the range return 0 or 1.

def linearstep(edge0: float, edge1: float, x: float) -> float:
647def linearstep(edge0: float, edge1: float, x: float) -> float:
648    """A linear transition function.
649
650    Returns a value that linearly moves from 0 to 1 as we go between edges.
651    Values outside of the range return 0 or 1.
652    """
653    return max(0.0, min(1.0, (x - edge0) / (edge1 - edge0)))

A linear transition function.

Returns a value that linearly moves from 0 to 1 as we go between edges. Values outside of the range return 0 or 1.

def human_readable_compact_id(num: int) -> str:
672def human_readable_compact_id(num: int) -> str:
673    """Given a positive int, return a compact string representation for it.
674
675    Handy for visualizing unique numeric ids using as few as possible chars.
676    This representation uses only lowercase letters and numbers (minus the
677    following letters for readability):
678     's' is excluded due to similarity to '5'.
679     'l' is excluded due to similarity to '1'.
680     'i' is excluded due to similarity to '1'.
681     'o' is excluded due to similarity to '0'.
682     'z' is excluded due to similarity to '2'.
683
684    Therefore for n chars this can store values of 21^n.
685
686    When reading human input consisting of these IDs, it may be desirable
687    to map the disallowed chars to their corresponding allowed ones
688    ('o' -> '0', etc).
689
690    Sort order for these ids is the same as the original numbers.
691
692    If more compactness is desired at the expense of readability,
693    use compact_id() instead.
694    """
695    return _compact_id(num, '0123456789abcdefghjkmnpqrtuvwxy')

Given a positive int, return a compact string representation for it.

Handy for visualizing unique numeric ids using as few as possible chars. This representation uses only lowercase letters and numbers (minus the following letters for readability): 's' is excluded due to similarity to '5'. 'l' is excluded due to similarity to '1'. 'i' is excluded due to similarity to '1'. 'o' is excluded due to similarity to '0'. 'z' is excluded due to similarity to '2'.

Therefore for n chars this can store values of 21^n.

When reading human input consisting of these IDs, it may be desirable to map the disallowed chars to their corresponding allowed ones ('o' -> '0', etc).

Sort order for these ids is the same as the original numbers.

If more compactness is desired at the expense of readability, use compact_id() instead.

def compact_id(num: int) -> str:
698def compact_id(num: int) -> str:
699    """Given a positive int, return a compact string representation for it.
700
701    Handy for visualizing unique numeric ids using as few as possible chars.
702    This version is more compact than human_readable_compact_id() but less
703    friendly to humans due to using both capital and lowercase letters,
704    both 'O' and '0', etc.
705
706    Therefore for n chars this can store values of 62^n.
707
708    Sort order for these ids is the same as the original numbers.
709    """
710    return _compact_id(
711        num, '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'
712    )

Given a positive int, return a compact string representation for it.

Handy for visualizing unique numeric ids using as few as possible chars. This version is more compact than human_readable_compact_id() but less friendly to humans due to using both capital and lowercase letters, both 'O' and '0', etc.

Therefore for n chars this can store values of 62^n.

Sort order for these ids is the same as the original numbers.

def caller_source_location() -> str:
715def caller_source_location() -> str:
716    """Returns source file name and line of the code calling us.
717
718    Example: 'mymodule.py:23'
719    """
720    try:
721        import inspect
722
723        frame = inspect.currentframe()
724        for _i in range(2):
725            if frame is None:
726                raise RuntimeError()
727            frame = frame.f_back
728        if frame is None:
729            raise RuntimeError()
730        fname = os.path.basename(frame.f_code.co_filename)
731        return f'{fname}:{frame.f_lineno}'
732    except Exception:
733        return '<unknown source location>'

Returns source file name and line of the code calling us.

Example: 'mymodule.py:23'

def unchanging_hostname() -> str:
736def unchanging_hostname() -> str:
737    """Return an unchanging name for the local device.
738
739    Similar to the `hostname` call (or os.uname().nodename in Python)
740    except attempts to give a name that doesn't change depending on
741    network conditions. (A Mac will tend to go from Foo to Foo.local,
742    Foo.lan etc. throughout its various adventures)
743    """
744    import platform
745    import subprocess
746
747    # On Mac, this should give the computer name assigned in System Prefs.
748    if platform.system() == 'Darwin':
749        return (
750            subprocess.run(
751                ['scutil', '--get', 'ComputerName'],
752                check=True,
753                capture_output=True,
754            )
755            .stdout.decode()
756            .strip()
757            .replace(' ', '-')
758        )
759    return os.uname().nodename

Return an unchanging name for the local device.

Similar to the hostname call (or os.uname().nodename in Python) except attempts to give a name that doesn't change depending on network conditions. (A Mac will tend to go from Foo to Foo.local, Foo.lan etc. throughout its various adventures)

def set_canonical_module_names(module_globals: dict[str, typing.Any]) -> None:
762def set_canonical_module_names(module_globals: dict[str, Any]) -> None:
763    """Do the thing."""
764    if os.environ.get('EFRO_SUPPRESS_SET_CANONICAL_MODULE_NAMES') == '1':
765        return
766
767    modulename = module_globals.get('__name__')
768    if not isinstance(modulename, str):
769        raise RuntimeError('Unable to get module name.')
770    assert not modulename.startswith('_')
771    modulename_prefix = f'{modulename}.'
772    modulename_prefix_2 = f'_{modulename}.'
773
774    for name, obj in module_globals.items():
775        if name.startswith('_'):
776            continue
777        existing = getattr(obj, '__module__', None)
778        try:
779            # Override the module ONLY if it lives under us somewhere.
780            # So ourpackage._submodule.Foo becomes ourpackage.Foo
781            # but otherpackage._submodule.Foo remains untouched.
782            if existing is not None and (
783                existing.startswith(modulename_prefix)
784                or existing.startswith(modulename_prefix_2)
785            ):
786                obj.__module__ = modulename
787        except Exception:
788            import logging
789
790            logging.warning(
791                'set_canonical_module_names: unable to change __module__'
792                " from '%s' to '%s' on %s object at '%s'.",
793                existing,
794                modulename,
795                type(obj),
796                name,
797            )

Do the thing.

def timedelta_str( timeval: datetime.timedelta | float, maxparts: int = 2, decimals: int = 0) -> str:
800def timedelta_str(
801    timeval: datetime.timedelta | float, maxparts: int = 2, decimals: int = 0
802) -> str:
803    """Return a simple human readable time string for a length of time.
804
805    Time can be given as a timedelta or a float representing seconds.
806    Example output:
807      "23d 1h 2m 32s" (with maxparts == 4)
808      "23d 1h" (with maxparts == 2)
809      "23d 1.08h" (with maxparts == 2 and decimals == 2)
810
811    Note that this is hard-coded in English and probably not especially
812    performant.
813    """
814    # pylint: disable=too-many-locals
815
816    if isinstance(timeval, float):
817        timevalfin = datetime.timedelta(seconds=timeval)
818    else:
819        timevalfin = timeval
820
821    # Internally we only handle positive values.
822    if timevalfin.total_seconds() < 0:
823        return f'-{timedelta_str(timeval=-timeval, maxparts=maxparts)}'
824
825    years = timevalfin.days // 365
826    days = timevalfin.days % 365
827    hours = timevalfin.seconds // 3600
828    hour_remainder = timevalfin.seconds % 3600
829    minutes = hour_remainder // 60
830    seconds = hour_remainder % 60
831
832    # Now, if we want decimal places for our last value,
833    # calc fractional parts.
834    if decimals:
835        # Calc totals of each type.
836        t_seconds = timevalfin.total_seconds()
837        t_minutes = t_seconds / 60
838        t_hours = t_minutes / 60
839        t_days = t_hours / 24
840        t_years = t_days / 365
841
842        # Calc fractional parts that exclude all whole values to their left.
843        years_covered = years
844        years_f = t_years - years_covered
845        days_covered = years_covered * 365 + days
846        days_f = t_days - days_covered
847        hours_covered = days_covered * 24 + hours
848        hours_f = t_hours - hours_covered
849        minutes_covered = hours_covered * 60 + minutes
850        minutes_f = t_minutes - minutes_covered
851        seconds_covered = minutes_covered * 60 + seconds
852        seconds_f = t_seconds - seconds_covered
853    else:
854        years_f = days_f = hours_f = minutes_f = seconds_f = 0.0
855
856    parts: list[str] = []
857    for part, part_f, suffix in (
858        (years, years_f, 'y'),
859        (days, days_f, 'd'),
860        (hours, hours_f, 'h'),
861        (minutes, minutes_f, 'm'),
862        (seconds, seconds_f, 's'),
863    ):
864        if part or parts or (not parts and suffix == 's'):
865            # Do decimal version only for the last part.
866            if decimals and (len(parts) >= maxparts - 1 or suffix == 's'):
867                parts.append(f'{part+part_f:.{decimals}f}{suffix}')
868            else:
869                parts.append(f'{part}{suffix}')
870            if len(parts) >= maxparts:
871                break
872    return ' '.join(parts)

Return a simple human readable time string for a length of time.

Time can be given as a timedelta or a float representing seconds. Example output: "23d 1h 2m 32s" (with maxparts == 4) "23d 1h" (with maxparts == 2) "23d 1.08h" (with maxparts == 2 and decimals == 2)

Note that this is hard-coded in English and probably not especially performant.

def ago_str( timeval: datetime.datetime, maxparts: int = 1, now: datetime.datetime | None = None, decimals: int = 0) -> str:
875def ago_str(
876    timeval: datetime.datetime,
877    maxparts: int = 1,
878    now: datetime.datetime | None = None,
879    decimals: int = 0,
880) -> str:
881    """Given a datetime, return a clean human readable 'ago' str.
882
883    Note that this is hard-coded in English so should not be used
884    for visible in-game elements; only tools/etc.
885
886    If now is not passed, efro.util.utc_now() is used.
887    """
888    if now is None:
889        now = utc_now()
890    return (
891        timedelta_str(now - timeval, maxparts=maxparts, decimals=decimals)
892        + ' ago'
893    )

Given a datetime, return a clean human readable 'ago' str.

Note that this is hard-coded in English so should not be used for visible in-game elements; only tools/etc.

If now is not passed, utc_now() is used.

def split_list(input_list: list[~T], max_length: int) -> list[list[~T]]:
896def split_list(input_list: list[T], max_length: int) -> list[list[T]]:
897    """Split a single list into smaller lists."""
898    return [
899        input_list[i : i + max_length]
900        for i in range(0, len(input_list), max_length)
901    ]

Split a single list into smaller lists.