efro.util

Small handy bits of functionality.

  1# Released under the MIT License. See LICENSE for details.
  2#
  3"""Small handy bits of functionality."""
  4
  5from __future__ import annotations
  6
  7import os
  8import time
  9import weakref
 10import datetime
 11from enum import Enum
 12from typing import TYPE_CHECKING, cast, TypeVar, Generic, overload
 13
 14if TYPE_CHECKING:
 15    import asyncio
 16    from typing import Any, Callable, Literal
 17
 18T = TypeVar('T')
 19ValT = TypeVar('ValT')
 20ArgT = TypeVar('ArgT')
 21SelfT = TypeVar('SelfT')
 22RetT = TypeVar('RetT')
 23EnumT = TypeVar('EnumT', bound=Enum)
 24
 25
 26class _EmptyObj:
 27    pass
 28
 29
 30# A dead weak-ref should be immutable, right? So we can create exactly
 31# one and return it for all cases that need an empty weak-ref.
 32_g_empty_weak_ref = weakref.ref(_EmptyObj())
 33assert _g_empty_weak_ref() is None
 34
 35
 36def explicit_bool(val: bool) -> bool:
 37    """Return a non-inferable boolean value.
 38
 39    Useful to be able to disable blocks of code without type checkers
 40    complaining/etc.
 41    """
 42    # pylint: disable=no-else-return
 43    if TYPE_CHECKING:
 44        # infer this! <boom>
 45        import random
 46
 47        return random.random() < 0.5
 48    else:
 49        return val
 50
 51
 52def snake_case_to_title(val: str) -> str:
 53    """Given a snake-case string 'foo_bar', returns 'Foo Bar'."""
 54    # Kill empty words resulting from leading/trailing/multiple underscores.
 55    return ' '.join(w for w in val.split('_') if w).title()
 56
 57
 58def snake_case_to_camel_case(val: str) -> str:
 59    """Given a snake-case string 'foo_bar', returns camel-case 'FooBar'."""
 60    # Replace underscores with spaces; capitalize words; kill spaces.
 61    # Not sure about efficiency, but logically simple.
 62    return val.replace('_', ' ').title().replace(' ', '')
 63
 64
 65def check_utc(value: datetime.datetime) -> None:
 66    """Ensure a datetime value is timezone-aware utc."""
 67    if value.tzinfo is not datetime.UTC:
 68        raise ValueError(
 69            'datetime value does not have timezone set as datetime.UTC'
 70        )
 71
 72
 73def utc_now() -> datetime.datetime:
 74    """Get timezone-aware current utc time.
 75
 76    Just a shortcut for datetime.datetime.now(datetime.UTC).
 77    Avoid datetime.datetime.utcnow() which is deprecated and gives naive
 78    times.
 79    """
 80    return datetime.datetime.now(datetime.UTC)
 81
 82
 83def utc_now_naive() -> datetime.datetime:
 84    """Get naive utc time.
 85
 86    This can be used to replace datetime.utcnow(), which is now deprecated.
 87    Most all code should migrate to use timezone-aware times instead of
 88    this.
 89    """
 90    return datetime.datetime.now(datetime.UTC).replace(tzinfo=None)
 91
 92
 93def utc_today() -> datetime.datetime:
 94    """Get offset-aware midnight in the utc time zone."""
 95    now = datetime.datetime.now(datetime.UTC)
 96    return datetime.datetime(
 97        year=now.year, month=now.month, day=now.day, tzinfo=now.tzinfo
 98    )
 99
100
101def utc_this_hour() -> datetime.datetime:
102    """Get offset-aware beginning of the current hour in the utc time zone."""
103    now = datetime.datetime.now(datetime.UTC)
104    return datetime.datetime(
105        year=now.year,
106        month=now.month,
107        day=now.day,
108        hour=now.hour,
109        tzinfo=now.tzinfo,
110    )
111
112
113def utc_this_minute() -> datetime.datetime:
114    """Get offset-aware beginning of current minute in the utc time zone."""
115    now = datetime.datetime.now(datetime.UTC)
116    return datetime.datetime(
117        year=now.year,
118        month=now.month,
119        day=now.day,
120        hour=now.hour,
121        minute=now.minute,
122        tzinfo=now.tzinfo,
123    )
124
125
126def empty_weakref(objtype: type[T]) -> weakref.ref[T]:
127    """Return an invalidated weak-reference for the specified type."""
128    # At runtime, all weakrefs are the same; our type arg is just
129    # for the static type checker.
130    del objtype  # Unused.
131
132    # Just create an object and let it die. Is there a cleaner way to do this?
133    # return weakref.ref(_EmptyObj())  # type: ignore
134
135    # Sharing a single one seems at least a bit better.
136    return _g_empty_weak_ref  # type: ignore
137
138
139def data_size_str(bytecount: int, compact: bool = False) -> str:
140    """Given a size in bytes, returns a short human readable string.
141
142    In compact mode this should be 6 or fewer chars for most all
143    sane file sizes.
144    """
145    # pylint: disable=too-many-return-statements
146
147    # Special case: handle negatives.
148    if bytecount < 0:
149        val = data_size_str(-bytecount, compact=compact)
150        return f'-{val}'
151
152    if bytecount <= 999:
153        suffix = 'B' if compact else 'bytes'
154        return f'{bytecount} {suffix}'
155    kbytecount = bytecount / 1024
156    if round(kbytecount, 1) < 10.0:
157        return f'{kbytecount:.1f} KB'
158    if round(kbytecount, 0) < 999:
159        return f'{kbytecount:.0f} KB'
160    mbytecount = bytecount / (1024 * 1024)
161    if round(mbytecount, 1) < 10.0:
162        return f'{mbytecount:.1f} MB'
163    if round(mbytecount, 0) < 999:
164        return f'{mbytecount:.0f} MB'
165    gbytecount = bytecount / (1024 * 1024 * 1024)
166    if round(gbytecount, 1) < 10.0:
167        return f'{gbytecount:.1f} GB'
168    return f'{gbytecount:.0f} GB'
169
170
171class DirtyBit:
172    """Manages whether a thing is dirty and regulates attempts to clean it.
173
174    To use, simply set the 'dirty' value on this object to True when some
175    action is needed, and then check the 'should_update' value to regulate
176    when attempts to clean it should be made. Set 'dirty' back to False after
177    a successful update.
178    If 'use_lock' is True, an asyncio Lock will be created and incorporated
179    into update attempts to prevent simultaneous updates (should_update will
180    only return True when the lock is unlocked). Note that it is up to the user
181    to lock/unlock the lock during the actual update attempt.
182    If a value is passed for 'auto_dirty_seconds', the dirtybit will flip
183    itself back to dirty after being clean for the given amount of time.
184    'min_update_interval' can be used to enforce a minimum update
185    interval even when updates are successful (retry_interval only applies
186    when updates fail)
187    """
188
189    def __init__(
190        self,
191        dirty: bool = False,
192        retry_interval: float = 5.0,
193        *,
194        use_lock: bool = False,
195        auto_dirty_seconds: float | None = None,
196        min_update_interval: float | None = None,
197    ):
198        curtime = time.monotonic()
199        self._retry_interval = retry_interval
200        self._auto_dirty_seconds = auto_dirty_seconds
201        self._min_update_interval = min_update_interval
202        self._dirty = dirty
203        self._next_update_time: float | None = curtime if dirty else None
204        self._last_update_time: float | None = None
205        self._next_auto_dirty_time: float | None = (
206            (curtime + self._auto_dirty_seconds)
207            if (not dirty and self._auto_dirty_seconds is not None)
208            else None
209        )
210        self._use_lock = use_lock
211        self.lock: asyncio.Lock
212        if self._use_lock:
213            import asyncio
214
215            self.lock = asyncio.Lock()
216
217    @property
218    def dirty(self) -> bool:
219        """Whether the target is currently dirty.
220
221        This should be set to False once an update is successful.
222        """
223        return self._dirty
224
225    @dirty.setter
226    def dirty(self, value: bool) -> None:
227        # If we're freshly clean, set our next auto-dirty time (if we have
228        # one).
229        if self._dirty and not value and self._auto_dirty_seconds is not None:
230            self._next_auto_dirty_time = (
231                time.monotonic() + self._auto_dirty_seconds
232            )
233
234        # If we're freshly dirty, schedule an immediate update.
235        if not self._dirty and value:
236            self._next_update_time = time.monotonic()
237
238            # If they want to enforce a minimum update interval,
239            # push out the next update time if it hasn't been long enough.
240            if (
241                self._min_update_interval is not None
242                and self._last_update_time is not None
243            ):
244                self._next_update_time = max(
245                    self._next_update_time,
246                    self._last_update_time + self._min_update_interval,
247                )
248
249        self._dirty = value
250
251    @property
252    def should_update(self) -> bool:
253        """Whether an attempt should be made to clean the target now.
254
255        Always returns False if the target is not dirty.
256        Takes into account the amount of time passed since the target
257        was marked dirty or since should_update last returned True.
258        """
259        curtime = time.monotonic()
260
261        # Auto-dirty ourself if we're into that.
262        if (
263            self._next_auto_dirty_time is not None
264            and curtime > self._next_auto_dirty_time
265        ):
266            self.dirty = True
267            self._next_auto_dirty_time = None
268        if not self._dirty:
269            return False
270        if self._use_lock and self.lock.locked():
271            return False
272        assert self._next_update_time is not None
273        if curtime > self._next_update_time:
274            self._next_update_time = curtime + self._retry_interval
275            self._last_update_time = curtime
276            return True
277        return False
278
279
280class DispatchMethodWrapper(Generic[ArgT, RetT]):
281    """Type-aware standin for the dispatch func returned by dispatchmethod."""
282
283    def __call__(self, arg: ArgT) -> RetT:
284        raise RuntimeError('Should not get here')
285
286    @staticmethod
287    def register(
288        func: Callable[[Any, Any], RetT]
289    ) -> Callable[[Any, Any], RetT]:
290        """Register a new dispatch handler for this dispatch-method."""
291        raise RuntimeError('Should not get here')
292
293    registry: dict[Any, Callable]
294
295
296# noinspection PyProtectedMember,PyTypeHints
297def dispatchmethod(
298    func: Callable[[Any, ArgT], RetT]
299) -> DispatchMethodWrapper[ArgT, RetT]:
300    """A variation of functools.singledispatch for methods.
301
302    Note: as of Python 3.9 there is now functools.singledispatchmethod,
303    but it currently (as of Jan 2021) is not type-aware (at least in mypy),
304    which gives us a reason to keep this one around for now.
305    """
306    from functools import singledispatch, update_wrapper
307
308    origwrapper: Any = singledispatch(func)
309
310    # Pull this out so hopefully origwrapper can die,
311    # otherwise we reference origwrapper in our wrapper.
312    dispatch = origwrapper.dispatch
313
314    # All we do here is recreate the end of functools.singledispatch
315    # where it returns a wrapper except instead of the wrapper using the
316    # first arg to the function ours uses the second (to skip 'self').
317    # This was made against Python 3.7; we should probably check up on
318    # this in later versions in case anything has changed.
319    # (or hopefully they'll add this functionality to their version)
320    # NOTE: sounds like we can use functools singledispatchmethod in 3.8
321    def wrapper(*args: Any, **kw: Any) -> Any:
322        if not args or len(args) < 2:
323            raise TypeError(
324                f'{funcname} requires at least ' '2 positional arguments'
325            )
326
327        return dispatch(args[1].__class__)(*args, **kw)
328
329    funcname = getattr(func, '__name__', 'dispatchmethod method')
330    wrapper.register = origwrapper.register  # type: ignore
331    wrapper.dispatch = dispatch  # type: ignore
332    wrapper.registry = origwrapper.registry  # type: ignore
333    # pylint: disable=protected-access
334    wrapper._clear_cache = origwrapper._clear_cache  # type: ignore
335    update_wrapper(wrapper, func)
336    # pylint: enable=protected-access
337    return cast(DispatchMethodWrapper, wrapper)
338
339
340def valuedispatch(call: Callable[[ValT], RetT]) -> ValueDispatcher[ValT, RetT]:
341    """Decorator for functions to allow dispatching based on a value.
342
343    This differs from functools.singledispatch in that it dispatches based
344    on the value of an argument, not based on its type.
345    The 'register' method of a value-dispatch function can be used
346    to assign new functions to handle particular values.
347    Unhandled values wind up in the original dispatch function."""
348    return ValueDispatcher(call)
349
350
351class ValueDispatcher(Generic[ValT, RetT]):
352    """Used by the valuedispatch decorator"""
353
354    def __init__(self, call: Callable[[ValT], RetT]) -> None:
355        self._base_call = call
356        self._handlers: dict[ValT, Callable[[], RetT]] = {}
357
358    def __call__(self, value: ValT) -> RetT:
359        handler = self._handlers.get(value)
360        if handler is not None:
361            return handler()
362        return self._base_call(value)
363
364    def _add_handler(
365        self, value: ValT, call: Callable[[], RetT]
366    ) -> Callable[[], RetT]:
367        if value in self._handlers:
368            raise RuntimeError(f'Duplicate handlers added for {value}')
369        self._handlers[value] = call
370        return call
371
372    def register(
373        self, value: ValT
374    ) -> Callable[[Callable[[], RetT]], Callable[[], RetT]]:
375        """Add a handler to the dispatcher."""
376        from functools import partial
377
378        return partial(self._add_handler, value)
379
380
381def valuedispatch1arg(
382    call: Callable[[ValT, ArgT], RetT]
383) -> ValueDispatcher1Arg[ValT, ArgT, RetT]:
384    """Like valuedispatch but for functions taking an extra argument."""
385    return ValueDispatcher1Arg(call)
386
387
388class ValueDispatcher1Arg(Generic[ValT, ArgT, RetT]):
389    """Used by the valuedispatch1arg decorator"""
390
391    def __init__(self, call: Callable[[ValT, ArgT], RetT]) -> None:
392        self._base_call = call
393        self._handlers: dict[ValT, Callable[[ArgT], RetT]] = {}
394
395    def __call__(self, value: ValT, arg: ArgT) -> RetT:
396        handler = self._handlers.get(value)
397        if handler is not None:
398            return handler(arg)
399        return self._base_call(value, arg)
400
401    def _add_handler(
402        self, value: ValT, call: Callable[[ArgT], RetT]
403    ) -> Callable[[ArgT], RetT]:
404        if value in self._handlers:
405            raise RuntimeError(f'Duplicate handlers added for {value}')
406        self._handlers[value] = call
407        return call
408
409    def register(
410        self, value: ValT
411    ) -> Callable[[Callable[[ArgT], RetT]], Callable[[ArgT], RetT]]:
412        """Add a handler to the dispatcher."""
413        from functools import partial
414
415        return partial(self._add_handler, value)
416
417
418if TYPE_CHECKING:
419
420    class ValueDispatcherMethod(Generic[ValT, RetT]):
421        """Used by the valuedispatchmethod decorator."""
422
423        def __call__(self, value: ValT) -> RetT: ...
424
425        def register(
426            self, value: ValT
427        ) -> Callable[[Callable[[SelfT], RetT]], Callable[[SelfT], RetT]]:
428            """Add a handler to the dispatcher."""
429            ...
430
431
432def valuedispatchmethod(
433    call: Callable[[SelfT, ValT], RetT]
434) -> ValueDispatcherMethod[ValT, RetT]:
435    """Like valuedispatch but works with methods instead of functions."""
436
437    # NOTE: It seems that to wrap a method with a decorator and have self
438    # dispatching do the right thing, we must return a function and not
439    # an executable object. So for this version we store our data here
440    # in the function call dict and simply return a call.
441
442    _base_call = call
443    _handlers: dict[ValT, Callable[[SelfT], RetT]] = {}
444
445    def _add_handler(value: ValT, addcall: Callable[[SelfT], RetT]) -> None:
446        if value in _handlers:
447            raise RuntimeError(f'Duplicate handlers added for {value}')
448        _handlers[value] = addcall
449
450    def _register(value: ValT) -> Callable[[Callable[[SelfT], RetT]], None]:
451        from functools import partial
452
453        return partial(_add_handler, value)
454
455    def _call_wrapper(self: SelfT, value: ValT) -> RetT:
456        handler = _handlers.get(value)
457        if handler is not None:
458            return handler(self)
459        return _base_call(self, value)
460
461    # We still want to use our returned object to register handlers, but we're
462    # actually just returning a function. So manually stuff the call onto it.
463    setattr(_call_wrapper, 'register', _register)
464
465    # To the type checker's eyes we return a ValueDispatcherMethod instance;
466    # this lets it know about our register func and type-check its usage.
467    # In reality we just return a raw function call (for reasons listed above).
468    # pylint: disable=undefined-variable, no-else-return
469    if TYPE_CHECKING:
470        return ValueDispatcherMethod[ValT, RetT]()
471    else:
472        return _call_wrapper
473
474
475def make_hash(obj: Any) -> int:
476    """Makes a hash from a dictionary, list, tuple or set to any level,
477    that contains only other hashable types (including any lists, tuples,
478    sets, and dictionaries).
479
480    Note that this uses Python's hash() function internally so collisions/etc.
481    may be more common than with fancy cryptographic hashes.
482
483    Also be aware that Python's hash() output varies across processes, so
484    this should only be used for values that will remain in a single process.
485    """
486    import copy
487
488    if isinstance(obj, (set, tuple, list)):
489        return hash(tuple(make_hash(e) for e in obj))
490    if not isinstance(obj, dict):
491        return hash(obj)
492
493    new_obj = copy.deepcopy(obj)
494    for k, v in new_obj.items():
495        new_obj[k] = make_hash(v)
496
497    # NOTE: sorted() works correctly here because it compares only
498    # unique first values (i.e. dict keys).
499    return hash(tuple(frozenset(sorted(new_obj.items()))))
500
501
502def float_hash_from_string(s: str) -> float:
503    """Given a string value, returns a float between 0 and 1.
504
505    Is consistent across processes. Can be useful for assigning shard
506    values to db ids for efficient parallel processing.
507    """
508    import hashlib
509
510    hash_bytes = hashlib.md5(s.encode()).digest()
511
512    # Generate a 64 bit int from the hash digest bytes.
513    ival = int.from_bytes(hash_bytes[:8])
514    return ival / ((1 << 64) - 1)
515
516
517def asserttype(obj: Any, typ: type[T]) -> T:
518    """Return an object typed as a given type.
519
520    Assert is used to check its actual type, so only use this when
521    failures are not expected. Otherwise use checktype.
522    """
523    assert isinstance(typ, type), 'only actual types accepted'
524    assert isinstance(obj, typ)
525    return obj
526
527
528def asserttype_o(obj: Any, typ: type[T]) -> T | None:
529    """Return an object typed as a given optional type.
530
531    Assert is used to check its actual type, so only use this when
532    failures are not expected. Otherwise use checktype.
533    """
534    assert isinstance(typ, type), 'only actual types accepted'
535    assert isinstance(obj, (typ, type(None)))
536    return obj
537
538
539def checktype(obj: Any, typ: type[T]) -> T:
540    """Return an object typed as a given type.
541
542    Always checks the type at runtime with isinstance and throws a TypeError
543    on failure. Use asserttype for a more efficient (but less safe) equivalent.
544    """
545    assert isinstance(typ, type), 'only actual types accepted'
546    if not isinstance(obj, typ):
547        raise TypeError(f'Expected a {typ}; got a {type(obj)}.')
548    return obj
549
550
551def checktype_o(obj: Any, typ: type[T]) -> T | None:
552    """Return an object typed as a given optional type.
553
554    Always checks the type at runtime with isinstance and throws a TypeError
555    on failure. Use asserttype for a more efficient (but less safe) equivalent.
556    """
557    assert isinstance(typ, type), 'only actual types accepted'
558    if not isinstance(obj, (typ, type(None))):
559        raise TypeError(f'Expected a {typ} or None; got a {type(obj)}.')
560    return obj
561
562
563def warntype(obj: Any, typ: type[T]) -> T:
564    """Return an object typed as a given type.
565
566    Always checks the type at runtime and simply logs a warning if it is
567    not what is expected.
568    """
569    assert isinstance(typ, type), 'only actual types accepted'
570    if not isinstance(obj, typ):
571        import logging
572
573        logging.warning('warntype: expected a %s, got a %s', typ, type(obj))
574    return obj  # type: ignore
575
576
577def warntype_o(obj: Any, typ: type[T]) -> T | None:
578    """Return an object typed as a given type.
579
580    Always checks the type at runtime and simply logs a warning if it is
581    not what is expected.
582    """
583    assert isinstance(typ, type), 'only actual types accepted'
584    if not isinstance(obj, (typ, type(None))):
585        import logging
586
587        logging.warning(
588            'warntype: expected a %s or None, got a %s', typ, type(obj)
589        )
590    return obj  # type: ignore
591
592
593def assert_non_optional(obj: T | None) -> T:
594    """Return an object with Optional typing removed.
595
596    Assert is used to check its actual type, so only use this when
597    failures are not expected. Use check_non_optional otherwise.
598    """
599    assert obj is not None
600    return obj
601
602
603def check_non_optional(obj: T | None) -> T:
604    """Return an object with Optional typing removed.
605
606    Always checks the actual type and throws a ValueError on failure.
607    Use assert_non_optional for a more efficient (but less safe) equivalent.
608    """
609    if obj is None:
610        raise ValueError('Got None value in check_non_optional.')
611    return obj
612
613
614def smoothstep(edge0: float, edge1: float, x: float) -> float:
615    """A smooth transition function.
616
617    Returns a value that smoothly moves from 0 to 1 as we go between edges.
618    Values outside of the range return 0 or 1.
619    """
620    y = min(1.0, max(0.0, (x - edge0) / (edge1 - edge0)))
621    return y * y * (3.0 - 2.0 * y)
622
623
624def linearstep(edge0: float, edge1: float, x: float) -> float:
625    """A linear transition function.
626
627    Returns a value that linearly moves from 0 to 1 as we go between edges.
628    Values outside of the range return 0 or 1.
629    """
630    return max(0.0, min(1.0, (x - edge0) / (edge1 - edge0)))
631
632
633def _compact_id(num: int, chars: str) -> str:
634    if num < 0:
635        raise ValueError('Negative integers not allowed.')
636
637    # Chars must be in sorted order for sorting to work correctly
638    # on our output.
639    assert ''.join(sorted(list(chars))) == chars
640
641    base = len(chars)
642    out = ''
643    while num:
644        out += chars[num % base]
645        num //= base
646    return out[::-1] or '0'
647
648
649def human_readable_compact_id(num: int) -> str:
650    """Given a positive int, return a compact string representation for it.
651
652    Handy for visualizing unique numeric ids using as few as possible chars.
653    This representation uses only lowercase letters and numbers (minus the
654    following letters for readability):
655     's' is excluded due to similarity to '5'.
656     'l' is excluded due to similarity to '1'.
657     'i' is excluded due to similarity to '1'.
658     'o' is excluded due to similarity to '0'.
659     'z' is excluded due to similarity to '2'.
660
661    Therefore for n chars this can store 31^n distinct values.
662
663    When reading human input consisting of these IDs, it may be desirable
664    to map the disallowed chars to their corresponding allowed ones
665    ('o' -> '0', etc).
666
667    Sort order for these ids is the same as the original numbers.
668
669    If more compactness is desired at the expense of readability,
670    use compact_id() instead.
671    """
672    return _compact_id(num, '0123456789abcdefghjkmnpqrtuvwxy')
673
674
675def compact_id(num: int) -> str:
676    """Given a positive int, return a compact string representation for it.
677
678    Handy for visualizing unique numeric ids using as few as possible chars.
679    This version is more compact than human_readable_compact_id() but less
680    friendly to humans due to using both capital and lowercase letters,
681    both 'O' and '0', etc.
682
683    Therefore for n chars this can store values of 62^n.
684
685    Sort order for these ids is the same as the original numbers.
686    """
687    return _compact_id(
688        num, '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'
689    )
690
691
692def caller_source_location() -> str:
693    """Returns source file name and line of the code calling us.
694
695    Example: 'mymodule.py:23'
696    """
697    try:
698        import inspect
699
700        frame = inspect.currentframe()
701        for _i in range(2):
702            if frame is None:
703                raise RuntimeError()
704            frame = frame.f_back
705        if frame is None:
706            raise RuntimeError()
707        fname = os.path.basename(frame.f_code.co_filename)
708        return f'{fname}:{frame.f_lineno}'
709    except Exception:
710        return '<unknown source location>'
711
712
713def unchanging_hostname() -> str:
714    """Return an unchanging name for the local device.
715
716    Similar to the `hostname` call (or os.uname().nodename in Python)
717    except attempts to give a name that doesn't change depending on
718    network conditions. (A Mac will tend to go from Foo to Foo.local,
719    Foo.lan etc. throughout its various adventures)
720    """
721    import platform
722    import subprocess
723
724    # On Mac, this should give the computer name assigned in System Prefs.
725    if platform.system() == 'Darwin':
726        return (
727            subprocess.run(
728                ['scutil', '--get', 'ComputerName'],
729                check=True,
730                capture_output=True,
731            )
732            .stdout.decode()
733            .strip()
734            .replace(' ', '-')
735        )
736    return os.uname().nodename
737
738
739def set_canonical_module_names(module_globals: dict[str, Any]) -> None:
740    """Do the thing."""
741    if os.environ.get('EFRO_SUPPRESS_SET_CANONICAL_MODULE_NAMES') == '1':
742        return
743
744    modulename = module_globals.get('__name__')
745    if not isinstance(modulename, str):
746        raise RuntimeError('Unable to get module name.')
747    assert not modulename.startswith('_')
748    modulename_prefix = f'{modulename}.'
749    modulename_prefix_2 = f'_{modulename}.'
750
751    for name, obj in module_globals.items():
752        if name.startswith('_'):
753            continue
754        existing = getattr(obj, '__module__', None)
755        try:
756            # Override the module ONLY if it lives under us somewhere.
757            # So ourpackage._submodule.Foo becomes ourpackage.Foo
758            # but otherpackage._submodule.Foo remains untouched.
759            if existing is not None and (
760                existing.startswith(modulename_prefix)
761                or existing.startswith(modulename_prefix_2)
762            ):
763                obj.__module__ = modulename
764        except Exception:
765            import logging
766
767            logging.warning(
768                'set_canonical_module_names: unable to change __module__'
769                " from '%s' to '%s' on %s object at '%s'.",
770                existing,
771                modulename,
772                type(obj),
773                name,
774            )
775
776
777def timedelta_str(
778    timeval: datetime.timedelta | float, maxparts: int = 2, decimals: int = 0
779) -> str:
780    """Return a simple human readable time string for a length of time.
781
782    Time can be given as a timedelta or a float representing seconds.
783    Example output:
784      "23d 1h 2m 32s" (with maxparts == 4)
785      "23d 1h" (with maxparts == 2)
786      "23d 1.08h" (with maxparts == 2 and decimals == 2)
787
788    Note that this is hard-coded in English and probably not especially
789    performant.
790    """
791    # pylint: disable=too-many-locals
792
793    if isinstance(timeval, float):
794        timevalfin = datetime.timedelta(seconds=timeval)
795    else:
796        timevalfin = timeval
797
798    # Internally we only handle positive values.
799    if timevalfin.total_seconds() < 0:
800        return f'-{timedelta_str(-timeval, maxparts=maxparts, decimals=decimals)}'
801
802    years = timevalfin.days // 365
803    days = timevalfin.days % 365
804    hours = timevalfin.seconds // 3600
805    hour_remainder = timevalfin.seconds % 3600
806    minutes = hour_remainder // 60
807    seconds = hour_remainder % 60
808
809    # Now, if we want decimal places for our last value,
810    # calc fractional parts.
811    if decimals:
812        # Calc totals of each type.
813        t_seconds = timevalfin.total_seconds()
814        t_minutes = t_seconds / 60
815        t_hours = t_minutes / 60
816        t_days = t_hours / 24
817        t_years = t_days / 365
818
819        # Calc fractional parts that exclude all whole values to their left.
820        years_covered = years
821        years_f = t_years - years_covered
822        days_covered = years_covered * 365 + days
823        days_f = t_days - days_covered
824        hours_covered = days_covered * 24 + hours
825        hours_f = t_hours - hours_covered
826        minutes_covered = hours_covered * 60 + minutes
827        minutes_f = t_minutes - minutes_covered
828        seconds_covered = minutes_covered * 60 + seconds
829        seconds_f = t_seconds - seconds_covered
830    else:
831        years_f = days_f = hours_f = minutes_f = seconds_f = 0.0
832
833    parts: list[str] = []
834    for part, part_f, suffix in (
835        (years, years_f, 'y'),
836        (days, days_f, 'd'),
837        (hours, hours_f, 'h'),
838        (minutes, minutes_f, 'm'),
839        (seconds, seconds_f, 's'),
840    ):
841        if part or parts or (not parts and suffix == 's'):
842            # Do decimal version only for the last part.
843            if decimals and (len(parts) >= maxparts - 1 or suffix == 's'):
844                parts.append(f'{part+part_f:.{decimals}f}{suffix}')
845            else:
846                parts.append(f'{part}{suffix}')
847            if len(parts) >= maxparts:
848                break
849    return ' '.join(parts)
850
851
852def ago_str(
853    timeval: datetime.datetime,
854    maxparts: int = 1,
855    now: datetime.datetime | None = None,
856    decimals: int = 0,
857) -> str:
858    """Given a datetime, return a clean human readable 'ago' str.
859
860    Note that this is hard-coded in English so should not be used
861    for visible in-game elements; only tools/etc.
862
863    If now is not passed, efro.util.utc_now() is used.
864    """
865    if now is None:
866        now = utc_now()
867    return (
868        timedelta_str(now - timeval, maxparts=maxparts, decimals=decimals)
869        + ' ago'
870    )
871
872
873def split_list(input_list: list[T], max_length: int) -> list[list[T]]:
874    """Split a single list into smaller lists."""
875    return [
876        input_list[i : i + max_length]
877        for i in range(0, len(input_list), max_length)
878    ]
879
880
881def extract_flag(args: list[str], name: str) -> bool:
882    """Given a list of args and a flag name, returns whether it is present.
883
884    The arg flag, if present, is removed from the arg list.
885    """
886    from efro.error import CleanError
887
888    count = args.count(name)
889    if count > 1:
890        raise CleanError(f'Flag {name} passed multiple times.')
891    if not count:
892        return False
893    args.remove(name)
894    return True
895
896
897@overload
898def extract_arg(
899    args: list[str], name: str, required: Literal[False] = False
900) -> str | None: ...
901
902
903@overload
904def extract_arg(args: list[str], name: str, required: Literal[True]) -> str: ...
905
906
907def extract_arg(
908    args: list[str], name: str, required: bool = False
909) -> str | None:
910    """Given a list of args and an arg name, returns a value.
911
912    The arg flag and value are removed from the arg list.
913    Raises CleanError on any problems.
914    """
915    from efro.error import CleanError
916
917    count = args.count(name)
918    if not count:
919        if required:
920            raise CleanError(f'Required argument {name} not passed.')
921        return None
922
923    if count > 1:
924        raise CleanError(f'Arg {name} passed multiple times.')
925
926    argindex = args.index(name)
927    if argindex + 1 >= len(args):
928        raise CleanError(f'No value passed after {name} arg.')
929
930    val = args[argindex + 1]
931    del args[argindex : argindex + 2]
932
933    return val
def explicit_bool(val: bool) -> bool:
37def explicit_bool(val: bool) -> bool:
38    """Return a non-inferable boolean value.
39
40    Useful to be able to disable blocks of code without type checkers
41    complaining/etc.
42    """
43    # pylint: disable=no-else-return
44    if TYPE_CHECKING:
45        # infer this! <boom>
46        import random
47
48        return random.random() < 0.5
49    else:
50        return val

Return a non-inferable boolean value.

Useful to be able to disable blocks of code without type checkers complaining/etc.
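
For illustration, a small usage sketch (not from the source; the guarded print is just a stand-in for temporarily disabled code):

    from efro.util import explicit_bool

    if explicit_bool(False):
        # A type checker would normally treat this branch as unreachable
        # dead code; explicit_bool() keeps it from inferring the constant.
        print('temporarily disabled diagnostics')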

def snake_case_to_title(val: str) -> str:
53def snake_case_to_title(val: str) -> str:
54    """Given a snake-case string 'foo_bar', returns 'Foo Bar'."""
55    # Kill empty words resulting from leading/trailing/multiple underscores.
56    return ' '.join(w for w in val.split('_') if w).title()

Given a snake-case string 'foo_bar', returns 'Foo Bar'.

def snake_case_to_camel_case(val: str) -> str:
59def snake_case_to_camel_case(val: str) -> str:
60    """Given a snake-case string 'foo_bar', returns camel-case 'FooBar'."""
61    # Replace underscores with spaces; capitalize words; kill spaces.
62    # Not sure about efficiency, but logically simple.
63    return val.replace('_', ' ').title().replace(' ', '')

Given a snake-case string 'foo_bar', returns camel-case 'FooBar'.

def check_utc(value: datetime.datetime) -> None:
66def check_utc(value: datetime.datetime) -> None:
67    """Ensure a datetime value is timezone-aware utc."""
68    if value.tzinfo is not datetime.UTC:
69        raise ValueError(
70            'datetime value does not have timezone set as datetime.UTC'
71        )

Ensure a datetime value is timezone-aware utc.
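
A quick illustration of the intended check (assumed usage; not from the source):

    import datetime
    from efro.util import check_utc, utc_now

    check_utc(utc_now())  # Passes: timezone-aware utc time.
    try:
        check_utc(datetime.datetime.now())  # Naive local time.
    except ValueError:
        print('naive datetimes are rejected')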

def utc_now() -> datetime.datetime:
74def utc_now() -> datetime.datetime:
75    """Get timezone-aware current utc time.
76
77    Just a shortcut for datetime.datetime.now(datetime.UTC).
78    Avoid datetime.datetime.utcnow() which is deprecated and gives naive
79    times.
80    """
81    return datetime.datetime.now(datetime.UTC)

Get timezone-aware current utc time.

Just a shortcut for datetime.datetime.now(datetime.UTC). Avoid datetime.datetime.utcnow() which is deprecated and gives naive times.

def utc_now_naive() -> datetime.datetime:
84def utc_now_naive() -> datetime.datetime:
85    """Get naive utc time.
86
87    This can be used to replace datetime.utcnow(), which is now deprecated.
88    Most all code should migrate to use timezone-aware times instead of
89    this.
90    """
91    return datetime.datetime.now(datetime.UTC).replace(tzinfo=None)

Get naive utc time.

This can be used to replace datetime.utcnow(), which is now deprecated. Most all code should migrate to use timezone-aware times instead of this.

def utc_today() -> datetime.datetime:
94def utc_today() -> datetime.datetime:
95    """Get offset-aware midnight in the utc time zone."""
96    now = datetime.datetime.now(datetime.UTC)
97    return datetime.datetime(
98        year=now.year, month=now.month, day=now.day, tzinfo=now.tzinfo
99    )

Get offset-aware midnight in the utc time zone.

def utc_this_hour() -> datetime.datetime:
102def utc_this_hour() -> datetime.datetime:
103    """Get offset-aware beginning of the current hour in the utc time zone."""
104    now = datetime.datetime.now(datetime.UTC)
105    return datetime.datetime(
106        year=now.year,
107        month=now.month,
108        day=now.day,
109        hour=now.hour,
110        tzinfo=now.tzinfo,
111    )

Get offset-aware beginning of the current hour in the utc time zone.

def utc_this_minute() -> datetime.datetime:
114def utc_this_minute() -> datetime.datetime:
115    """Get offset-aware beginning of current minute in the utc time zone."""
116    now = datetime.datetime.now(datetime.UTC)
117    return datetime.datetime(
118        year=now.year,
119        month=now.month,
120        day=now.day,
121        hour=now.hour,
122        minute=now.minute,
123        tzinfo=now.tzinfo,
124    )

Get offset-aware beginning of current minute in the utc time zone.
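
Taken together, utc_today(), utc_this_hour(), and utc_this_minute() simply truncate the current utc time at different granularities. A rough sketch (the timestamps shown are illustrative):

    from efro.util import utc_now, utc_today, utc_this_hour, utc_this_minute

    utc_now()          # e.g. 2024-05-01 13:42:17.123456+00:00
    utc_today()        # e.g. 2024-05-01 00:00:00+00:00
    utc_this_hour()    # e.g. 2024-05-01 13:00:00+00:00
    utc_this_minute()  # e.g. 2024-05-01 13:42:00+00:00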

def empty_weakref(objtype: type[~T]) -> weakref.ReferenceType[~T]:
127def empty_weakref(objtype: type[T]) -> weakref.ref[T]:
128    """Return an invalidated weak-reference for the specified type."""
129    # At runtime, all weakrefs are the same; our type arg is just
130    # for the static type checker.
131    del objtype  # Unused.
132
133    # Just create an object and let it die. Is there a cleaner way to do this?
134    # return weakref.ref(_EmptyObj())  # type: ignore
135
136    # Sharing a single one seems at least a bit better.
137    return _g_empty_weak_ref  # type: ignore

Return an invalidated weak-reference for the specified type.
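
A sketch of the intended pattern (the Widget/Manager classes are hypothetical): an attribute can always hold a weakref, so callers dereference it uniformly instead of special-casing None.

    import weakref
    from efro.util import empty_weakref

    class Widget:
        pass

    class Manager:
        def __init__(self) -> None:
            # Starts out as a dead ref rather than None.
            self._selected: weakref.ref[Widget] = empty_weakref(Widget)

        def select(self, widget: Widget) -> None:
            self._selected = weakref.ref(widget)

        def selected(self) -> Widget | None:
            # Dereferencing works the same whether or not anything
            # has been selected yet.
            return self._selected()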

def data_size_str(bytecount: int, compact: bool = False) -> str:
140def data_size_str(bytecount: int, compact: bool = False) -> str:
141    """Given a size in bytes, returns a short human readable string.
142
143    In compact mode this should be 6 or fewer chars for most all
144    sane file sizes.
145    """
146    # pylint: disable=too-many-return-statements
147
148    # Special case: handle negatives.
149    if bytecount < 0:
150        val = data_size_str(-bytecount, compact=compact)
151        return f'-{val}'
152
153    if bytecount <= 999:
154        suffix = 'B' if compact else 'bytes'
155        return f'{bytecount} {suffix}'
156    kbytecount = bytecount / 1024
157    if round(kbytecount, 1) < 10.0:
158        return f'{kbytecount:.1f} KB'
159    if round(kbytecount, 0) < 999:
160        return f'{kbytecount:.0f} KB'
161    mbytecount = bytecount / (1024 * 1024)
162    if round(mbytecount, 1) < 10.0:
163        return f'{mbytecount:.1f} MB'
164    if round(mbytecount, 0) < 999:
165        return f'{mbytecount:.0f} MB'
166    gbytecount = bytecount / (1024 * 1024 * 1024)
167    if round(gbytecount, 1) < 10.0:
168        return f'{gbytecount:.1f} GB'
169    return f'{gbytecount:.0f} GB'

Given a size in bytes, returns a short human readable string.

In compact mode this should be 6 or fewer chars for most all sane file sizes.
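
A few illustrative outputs (following the thresholds in the source above):

    from efro.util import data_size_str

    data_size_str(500)                # '500 bytes'
    data_size_str(500, compact=True)  # '500 B'
    data_size_str(2048)               # '2.0 KB'
    data_size_str(5_000_000)          # '4.8 MB'
    data_size_str(-2048)              # '-2.0 KB'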

class DirtyBit:
172class DirtyBit:
173    """Manages whether a thing is dirty and regulates attempts to clean it.
174
175    To use, simply set the 'dirty' value on this object to True when some
176    action is needed, and then check the 'should_update' value to regulate
177    when attempts to clean it should be made. Set 'dirty' back to False after
178    a successful update.
179    If 'use_lock' is True, an asyncio Lock will be created and incorporated
180    into update attempts to prevent simultaneous updates (should_update will
181    only return True when the lock is unlocked). Note that it is up to the user
182    to lock/unlock the lock during the actual update attempt.
183    If a value is passed for 'auto_dirty_seconds', the dirtybit will flip
184    itself back to dirty after being clean for the given amount of time.
185    'min_update_interval' can be used to enforce a minimum update
186    interval even when updates are successful (retry_interval only applies
187    when updates fail)
188    """
189
190    def __init__(
191        self,
192        dirty: bool = False,
193        retry_interval: float = 5.0,
194        *,
195        use_lock: bool = False,
196        auto_dirty_seconds: float | None = None,
197        min_update_interval: float | None = None,
198    ):
199        curtime = time.monotonic()
200        self._retry_interval = retry_interval
201        self._auto_dirty_seconds = auto_dirty_seconds
202        self._min_update_interval = min_update_interval
203        self._dirty = dirty
204        self._next_update_time: float | None = curtime if dirty else None
205        self._last_update_time: float | None = None
206        self._next_auto_dirty_time: float | None = (
207            (curtime + self._auto_dirty_seconds)
208            if (not dirty and self._auto_dirty_seconds is not None)
209            else None
210        )
211        self._use_lock = use_lock
212        self.lock: asyncio.Lock
213        if self._use_lock:
214            import asyncio
215
216            self.lock = asyncio.Lock()
217
218    @property
219    def dirty(self) -> bool:
220        """Whether the target is currently dirty.
221
222        This should be set to False once an update is successful.
223        """
224        return self._dirty
225
226    @dirty.setter
227    def dirty(self, value: bool) -> None:
228        # If we're freshly clean, set our next auto-dirty time (if we have
229        # one).
230        if self._dirty and not value and self._auto_dirty_seconds is not None:
231            self._next_auto_dirty_time = (
232                time.monotonic() + self._auto_dirty_seconds
233            )
234
235        # If we're freshly dirty, schedule an immediate update.
236        if not self._dirty and value:
237            self._next_update_time = time.monotonic()
238
239            # If they want to enforce a minimum update interval,
240            # push out the next update time if it hasn't been long enough.
241            if (
242                self._min_update_interval is not None
243                and self._last_update_time is not None
244            ):
245                self._next_update_time = max(
246                    self._next_update_time,
247                    self._last_update_time + self._min_update_interval,
248                )
249
250        self._dirty = value
251
252    @property
253    def should_update(self) -> bool:
254        """Whether an attempt should be made to clean the target now.
255
256        Always returns False if the target is not dirty.
257        Takes into account the amount of time passed since the target
258        was marked dirty or since should_update last returned True.
259        """
260        curtime = time.monotonic()
261
262        # Auto-dirty ourself if we're into that.
263        if (
264            self._next_auto_dirty_time is not None
265            and curtime > self._next_auto_dirty_time
266        ):
267            self.dirty = True
268            self._next_auto_dirty_time = None
269        if not self._dirty:
270            return False
271        if self._use_lock and self.lock.locked():
272            return False
273        assert self._next_update_time is not None
274        if curtime > self._next_update_time:
275            self._next_update_time = curtime + self._retry_interval
276            self._last_update_time = curtime
277            return True
278        return False

Manages whether a thing is dirty and regulates attempts to clean it.

To use, simply set the 'dirty' value on this object to True when some action is needed, and then check the 'should_update' value to regulate when attempts to clean it should be made. Set 'dirty' back to False after a successful update.

If 'use_lock' is True, an asyncio Lock will be created and incorporated into update attempts to prevent simultaneous updates (should_update will only return True when the lock is unlocked). Note that it is up to the user to lock/unlock the lock during the actual update attempt.

If a value is passed for 'auto_dirty_seconds', the dirtybit will flip itself back to dirty after being clean for the given amount of time.

'min_update_interval' can be used to enforce a minimum update interval even when updates are successful (retry_interval only applies when updates fail).

DirtyBit( dirty: bool = False, retry_interval: float = 5.0, *, use_lock: bool = False, auto_dirty_seconds: float | None = None, min_update_interval: float | None = None)
190    def __init__(
191        self,
192        dirty: bool = False,
193        retry_interval: float = 5.0,
194        *,
195        use_lock: bool = False,
196        auto_dirty_seconds: float | None = None,
197        min_update_interval: float | None = None,
198    ):
199        curtime = time.monotonic()
200        self._retry_interval = retry_interval
201        self._auto_dirty_seconds = auto_dirty_seconds
202        self._min_update_interval = min_update_interval
203        self._dirty = dirty
204        self._next_update_time: float | None = curtime if dirty else None
205        self._last_update_time: float | None = None
206        self._next_auto_dirty_time: float | None = (
207            (curtime + self._auto_dirty_seconds)
208            if (not dirty and self._auto_dirty_seconds is not None)
209            else None
210        )
211        self._use_lock = use_lock
212        self.lock: asyncio.Lock
213        if self._use_lock:
214            import asyncio
215
216            self.lock = asyncio.Lock()
lock: asyncio.locks.Lock
dirty: bool
218    @property
219    def dirty(self) -> bool:
220        """Whether the target is currently dirty.
221
222        This should be set to False once an update is successful.
223        """
224        return self._dirty

Whether the target is currently dirty.

This should be set to False once an update is successful.

should_update: bool
252    @property
253    def should_update(self) -> bool:
254        """Whether an attempt should be made to clean the target now.
255
256        Always returns False if the target is not dirty.
257        Takes into account the amount of time passed since the target
258        was marked dirty or since should_update last returned True.
259        """
260        curtime = time.monotonic()
261
262        # Auto-dirty ourself if we're into that.
263        if (
264            self._next_auto_dirty_time is not None
265            and curtime > self._next_auto_dirty_time
266        ):
267            self.dirty = True
268            self._next_auto_dirty_time = None
269        if not self._dirty:
270            return False
271        if self._use_lock and self.lock.locked():
272            return False
273        assert self._next_update_time is not None
274        if curtime > self._next_update_time:
275            self._next_update_time = curtime + self._retry_interval
276            self._last_update_time = curtime
277            return True
278        return False

Whether an attempt should be made to clean the target now.

Always returns False if the target is not dirty. Takes into account the amount of time passed since the target was marked dirty or since should_update last returned True.
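
Putting the pieces above together, a minimal sketch of the intended polling loop (illustrative only; push_state() is a hypothetical upload call):

    import time
    from efro.util import DirtyBit

    def push_state() -> bool:
        """Hypothetical network push; returns True on success."""
        return True

    state_dirty = DirtyBit(dirty=True, retry_interval=5.0)

    for _ in range(100):
        if state_dirty.should_update:
            if push_state():
                # Mark clean only after a successful update; failures
                # will be retried after retry_interval seconds.
                state_dirty.dirty = False
        time.sleep(0.1)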

class DispatchMethodWrapper(typing.Generic[~ArgT, ~RetT]):
281class DispatchMethodWrapper(Generic[ArgT, RetT]):
282    """Type-aware standin for the dispatch func returned by dispatchmethod."""
283
284    def __call__(self, arg: ArgT) -> RetT:
285        raise RuntimeError('Should not get here')
286
287    @staticmethod
288    def register(
289        func: Callable[[Any, Any], RetT]
290    ) -> Callable[[Any, Any], RetT]:
291        """Register a new dispatch handler for this dispatch-method."""
292        raise RuntimeError('Should not get here')
293
294    registry: dict[Any, Callable]

Type-aware standin for the dispatch func returned by dispatchmethod.

@staticmethod
def register(func: Callable[[Any, Any], ~RetT]) -> Callable[[Any, Any], ~RetT]:
287    @staticmethod
288    def register(
289        func: Callable[[Any, Any], RetT]
290    ) -> Callable[[Any, Any], RetT]:
291        """Register a new dispatch handler for this dispatch-method."""
292        raise RuntimeError('Should not get here')

Register a new dispatch handler for this dispatch-method.

registry: dict[typing.Any, typing.Callable]
def dispatchmethod( func: Callable[[Any, ~ArgT], ~RetT]) -> DispatchMethodWrapper[~ArgT, ~RetT]:
298def dispatchmethod(
299    func: Callable[[Any, ArgT], RetT]
300) -> DispatchMethodWrapper[ArgT, RetT]:
301    """A variation of functools.singledispatch for methods.
302
303    Note: as of Python 3.9 there is now functools.singledispatchmethod,
304    but it currently (as of Jan 2021) is not type-aware (at least in mypy),
305    which gives us a reason to keep this one around for now.
306    """
307    from functools import singledispatch, update_wrapper
308
309    origwrapper: Any = singledispatch(func)
310
311    # Pull this out so hopefully origwrapper can die,
312    # otherwise we reference origwrapper in our wrapper.
313    dispatch = origwrapper.dispatch
314
315    # All we do here is recreate the end of functools.singledispatch
316    # where it returns a wrapper except instead of the wrapper using the
317    # first arg to the function ours uses the second (to skip 'self').
318    # This was made against Python 3.7; we should probably check up on
319    # this in later versions in case anything has changed.
320    # (or hopefully they'll add this functionality to their version)
321    # NOTE: sounds like we can use functools singledispatchmethod in 3.8
322    def wrapper(*args: Any, **kw: Any) -> Any:
323        if not args or len(args) < 2:
324            raise TypeError(
325                f'{funcname} requires at least ' '2 positional arguments'
326            )
327
328        return dispatch(args[1].__class__)(*args, **kw)
329
330    funcname = getattr(func, '__name__', 'dispatchmethod method')
331    wrapper.register = origwrapper.register  # type: ignore
332    wrapper.dispatch = dispatch  # type: ignore
333    wrapper.registry = origwrapper.registry  # type: ignore
334    # pylint: disable=protected-access
335    wrapper._clear_cache = origwrapper._clear_cache  # type: ignore
336    update_wrapper(wrapper, func)
337    # pylint: enable=protected-access
338    return cast(DispatchMethodWrapper, wrapper)

A variation of functools.singledispatch for methods.

Note: as of Python 3.9 there is now functools.singledispatchmethod, but it currently (as of Jan 2021) is not type-aware (at least in mypy), which gives us a reason to keep this one around for now.
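
A usage sketch (assumed usage; the Renderer class is hypothetical). As with functools.singledispatch, handlers registered via a bare '@draw.register' are keyed off the annotation of their dispatch argument:

    from efro.util import dispatchmethod

    class Renderer:
        @dispatchmethod
        def draw(self, shape: object) -> str:
            # Fallback for unregistered types.
            return f'unknown shape: {shape!r}'

        @draw.register
        def _draw_int(self, shape: int) -> str:
            return f'drawing the int {shape}'

        @draw.register
        def _draw_str(self, shape: str) -> str:
            return f'drawing the string {shape!r}'

    r = Renderer()
    r.draw(3)      # 'drawing the int 3'
    r.draw('box')  # "drawing the string 'box'"
    r.draw(1.5)    # "unknown shape: 1.5"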

def valuedispatch( call: Callable[[~ValT], ~RetT]) -> ValueDispatcher[~ValT, ~RetT]:
341def valuedispatch(call: Callable[[ValT], RetT]) -> ValueDispatcher[ValT, RetT]:
342    """Decorator for functions to allow dispatching based on a value.
343
344    This differs from functools.singledispatch in that it dispatches based
345    on the value of an argument, not based on its type.
346    The 'register' method of a value-dispatch function can be used
347    to assign new functions to handle particular values.
348    Unhandled values wind up in the original dispatch function."""
349    return ValueDispatcher(call)

Decorator for functions to allow dispatching based on a value.

This differs from functools.singledispatch in that it dispatches based on the value of an argument, not based on its type. The 'register' method of a value-dispatch function can be used to assign new functions to handle particular values. Unhandled values wind up in the original dispatch function.
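
A short sketch (hypothetical enum and handlers) showing dispatch on value rather than type; note that registered handlers take no arguments:

    from enum import Enum
    from efro.util import valuedispatch

    class Color(Enum):
        RED = 'red'
        BLUE = 'blue'

    @valuedispatch
    def describe(color: Color) -> str:
        # Fallback for values with no registered handler.
        return 'some other color'

    @describe.register(Color.RED)
    def _describe_red() -> str:
        return 'the warm one'

    describe(Color.RED)   # 'the warm one'
    describe(Color.BLUE)  # 'some other color'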

class ValueDispatcher(typing.Generic[~ValT, ~RetT]):
352class ValueDispatcher(Generic[ValT, RetT]):
353    """Used by the valuedispatch decorator"""
354
355    def __init__(self, call: Callable[[ValT], RetT]) -> None:
356        self._base_call = call
357        self._handlers: dict[ValT, Callable[[], RetT]] = {}
358
359    def __call__(self, value: ValT) -> RetT:
360        handler = self._handlers.get(value)
361        if handler is not None:
362            return handler()
363        return self._base_call(value)
364
365    def _add_handler(
366        self, value: ValT, call: Callable[[], RetT]
367    ) -> Callable[[], RetT]:
368        if value in self._handlers:
369            raise RuntimeError(f'Duplicate handlers added for {value}')
370        self._handlers[value] = call
371        return call
372
373    def register(
374        self, value: ValT
375    ) -> Callable[[Callable[[], RetT]], Callable[[], RetT]]:
376        """Add a handler to the dispatcher."""
377        from functools import partial
378
379        return partial(self._add_handler, value)

Used by the valuedispatch decorator

ValueDispatcher(call: Callable[[~ValT], ~RetT])
355    def __init__(self, call: Callable[[ValT], RetT]) -> None:
356        self._base_call = call
357        self._handlers: dict[ValT, Callable[[], RetT]] = {}
def register( self, value: ~ValT) -> Callable[[Callable[[], ~RetT]], Callable[[], ~RetT]]:
373    def register(
374        self, value: ValT
375    ) -> Callable[[Callable[[], RetT]], Callable[[], RetT]]:
376        """Add a handler to the dispatcher."""
377        from functools import partial
378
379        return partial(self._add_handler, value)

Add a handler to the dispatcher.

def valuedispatch1arg( call: Callable[[~ValT, ~ArgT], ~RetT]) -> ValueDispatcher1Arg[~ValT, ~ArgT, ~RetT]:
382def valuedispatch1arg(
383    call: Callable[[ValT, ArgT], RetT]
384) -> ValueDispatcher1Arg[ValT, ArgT, RetT]:
385    """Like valuedispatch but for functions taking an extra argument."""
386    return ValueDispatcher1Arg(call)

Like valuedispatch but for functions taking an extra argument.
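
Same idea as valuedispatch, except registered handlers receive the extra argument (hypothetical example):

    from enum import Enum
    from efro.util import valuedispatch1arg

    class Op(Enum):
        DOUBLE = 'double'
        SQUARE = 'square'

    @valuedispatch1arg
    def apply_op(op: Op, value: int) -> int:
        raise ValueError(f'unhandled op: {op}')

    @apply_op.register(Op.DOUBLE)
    def _double(value: int) -> int:
        return value * 2

    @apply_op.register(Op.SQUARE)
    def _square(value: int) -> int:
        return value * value

    apply_op(Op.DOUBLE, 5)  # 10
    apply_op(Op.SQUARE, 5)  # 25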

class ValueDispatcher1Arg(typing.Generic[~ValT, ~ArgT, ~RetT]):
389class ValueDispatcher1Arg(Generic[ValT, ArgT, RetT]):
390    """Used by the valuedispatch1arg decorator"""
391
392    def __init__(self, call: Callable[[ValT, ArgT], RetT]) -> None:
393        self._base_call = call
394        self._handlers: dict[ValT, Callable[[ArgT], RetT]] = {}
395
396    def __call__(self, value: ValT, arg: ArgT) -> RetT:
397        handler = self._handlers.get(value)
398        if handler is not None:
399            return handler(arg)
400        return self._base_call(value, arg)
401
402    def _add_handler(
403        self, value: ValT, call: Callable[[ArgT], RetT]
404    ) -> Callable[[ArgT], RetT]:
405        if value in self._handlers:
406            raise RuntimeError(f'Duplicate handlers added for {value}')
407        self._handlers[value] = call
408        return call
409
410    def register(
411        self, value: ValT
412    ) -> Callable[[Callable[[ArgT], RetT]], Callable[[ArgT], RetT]]:
413        """Add a handler to the dispatcher."""
414        from functools import partial
415
416        return partial(self._add_handler, value)

Used by the valuedispatch1arg decorator

ValueDispatcher1Arg(call: Callable[[~ValT, ~ArgT], ~RetT])
392    def __init__(self, call: Callable[[ValT, ArgT], RetT]) -> None:
393        self._base_call = call
394        self._handlers: dict[ValT, Callable[[ArgT], RetT]] = {}
def register( self, value: ~ValT) -> Callable[[Callable[[~ArgT], ~RetT]], Callable[[~ArgT], ~RetT]]:
410    def register(
411        self, value: ValT
412    ) -> Callable[[Callable[[ArgT], RetT]], Callable[[ArgT], RetT]]:
413        """Add a handler to the dispatcher."""
414        from functools import partial
415
416        return partial(self._add_handler, value)

Add a handler to the dispatcher.

def valuedispatchmethod( call: Callable[[~SelfT, ~ValT], ~RetT]) -> efro.util.ValueDispatcherMethod[~ValT, ~RetT]:
433def valuedispatchmethod(
434    call: Callable[[SelfT, ValT], RetT]
435) -> ValueDispatcherMethod[ValT, RetT]:
436    """Like valuedispatch but works with methods instead of functions."""
437
438    # NOTE: It seems that to wrap a method with a decorator and have self
439    # dispatching do the right thing, we must return a function and not
440    # an executable object. So for this version we store our data here
441    # in the function call dict and simply return a call.
442
443    _base_call = call
444    _handlers: dict[ValT, Callable[[SelfT], RetT]] = {}
445
446    def _add_handler(value: ValT, addcall: Callable[[SelfT], RetT]) -> None:
447        if value in _handlers:
448            raise RuntimeError(f'Duplicate handlers added for {value}')
449        _handlers[value] = addcall
450
451    def _register(value: ValT) -> Callable[[Callable[[SelfT], RetT]], None]:
452        from functools import partial
453
454        return partial(_add_handler, value)
455
456    def _call_wrapper(self: SelfT, value: ValT) -> RetT:
457        handler = _handlers.get(value)
458        if handler is not None:
459            return handler(self)
460        return _base_call(self, value)
461
462    # We still want to use our returned object to register handlers, but we're
463    # actually just returning a function. So manually stuff the call onto it.
464    setattr(_call_wrapper, 'register', _register)
465
466    # To the type checker's eyes we return a ValueDispatchMethod instance;
467    # this lets it know about our register func and type-check its usage.
468    # In reality we just return a raw function call (for reasons listed above).
469    # pylint: disable=undefined-variable, no-else-return
470    if TYPE_CHECKING:
471        return ValueDispatcherMethod[ValT, RetT]()
472    else:
473        return _call_wrapper

Like valuedispatch but works with methods instead of functions.
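
A minimal sketch of how this can look on a class (the Mode enum and Runner class are hypothetical). Note that register() here returns None, so the decorated handler name ends up bound to None in the class body; the handler itself lives in the dispatcher's internal table.

    from enum import Enum
    from efro.util import valuedispatchmethod

    class Mode(Enum):
        FAST = 0
        SLOW = 1

    class Runner:
        @valuedispatchmethod
        def speed(self, mode: Mode) -> int:
            # Fallback for values with no registered handler.
            return 1

        @speed.register(Mode.FAST)
        def _speed_fast(self) -> int:
            return 10

    runner = Runner()
    print(runner.speed(Mode.FAST))  # 10
    print(runner.speed(Mode.SLOW))  # 1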

def make_hash(obj: Any) -> int:
476def make_hash(obj: Any) -> int:
477    """Makes a hash from a dictionary, list, tuple or set to any level,
478    that contains only other hashable types (including any lists, tuples,
479    sets, and dictionaries).
480
481    Note that this uses Python's hash() function internally so collisions/etc.
482    may be more common than with fancy cryptographic hashes.
483
484    Also be aware that Python's hash() output varies across processes, so
485    this should only be used for values that will remain in a single process.
486    """
487    import copy
488
489    if isinstance(obj, (set, tuple, list)):
490        return hash(tuple(make_hash(e) for e in obj))
491    if not isinstance(obj, dict):
492        return hash(obj)
493
494    new_obj = copy.deepcopy(obj)
495    for k, v in new_obj.items():
496        new_obj[k] = make_hash(v)
497
498    # NOTE: sorted() works correctly here because it only ever compares
499    # the unique first elements (i.e. the dict keys).
500    return hash(tuple(frozenset(sorted(new_obj.items()))))

Make a hash from a dictionary, list, tuple, or set, nested to any level, as long as it contains only other hashable types (including nested lists, tuples, sets, and dictionaries).

Note that this uses Python's hash() function internally so collisions/etc. may be more common than with fancy cryptographic hashes.

Also be aware that Python's hash() output varies across processes, so this should only be used for values that will remain in a single process.
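
For instance (a small sanity check of the documented behavior; the values are arbitrary), two equal nested structures hash equally within a single process regardless of dict key order:

    from efro.util import make_hash

    config_a = {'name': 'foo', 'tags': ['a', 'b'], 'opts': {'x': 1}}
    config_b = {'opts': {'x': 1}, 'tags': ['a', 'b'], 'name': 'foo'}

    # Same contents, different key order -> same hash (in this process).
    assert make_hash(config_a) == make_hash(config_b)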

def float_hash_from_string(s: str) -> float:
503def float_hash_from_string(s: str) -> float:
504    """Given a string value, returns a float between 0 and 1.
505
506    Is consistent across processes. Can be useful for assigning shard
507    values to db ids for efficient parallel processing.
508    """
509    import hashlib
510
511    hash_bytes = hashlib.md5(s.encode()).digest()
512
513    # Build a uniformly distributed 64 bit int from the hash digest bytes.
514    ival = int.from_bytes(hash_bytes[:8])
515    return ival / ((1 << 64) - 1)

Given a string value, returns a float between 0 and 1.

Is consistent across processes. Can be useful for assigning shard values to db ids for efficient parallel processing.
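
As an illustration of the sharding use case (the id string and shard count are hypothetical):

    from efro.util import float_hash_from_string

    num_shards = 8
    # The returned float covers 0..1 inclusive, so clamp the top edge.
    shard = min(
        int(float_hash_from_string('user-12345') * num_shards), num_shards - 1
    )
    assert 0 <= shard < num_shards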

def asserttype(obj: Any, typ: type[~T]) -> ~T:
518def asserttype(obj: Any, typ: type[T]) -> T:
519    """Return an object typed as a given type.
520
521    Assert is used to check its actual type, so only use this when
522    failures are not expected. Otherwise use checktype.
523    """
524    assert isinstance(typ, type), 'only actual types accepted'
525    assert isinstance(obj, typ)
526    return obj

Return an object typed as a given type.

Assert is used to check its actual type, so only use this when failures are not expected. Otherwise use checktype.

def asserttype_o(obj: Any, typ: type[~T]) -> Optional[~T]:
529def asserttype_o(obj: Any, typ: type[T]) -> T | None:
530    """Return an object typed as a given optional type.
531
532    Assert is used to check its actual type, so only use this when
533    failures are not expected. Otherwise use checktype_o.
534    """
535    assert isinstance(typ, type), 'only actual types accepted'
536    assert isinstance(obj, (typ, type(None)))
537    return obj

Return an object typed as a given optional type.

Assert is used to check its actual type, so only use this when failures are not expected. Otherwise use checktype_o.

def checktype(obj: Any, typ: type[~T]) -> ~T:
540def checktype(obj: Any, typ: type[T]) -> T:
541    """Return an object typed as a given type.
542
543    Always checks the type at runtime with isinstance and throws a TypeError
544    on failure. Use asserttype for more efficient (but less safe) equivalent.
545    """
546    assert isinstance(typ, type), 'only actual types accepted'
547    if not isinstance(obj, typ):
548        raise TypeError(f'Expected a {typ}; got a {type(obj)}.')
549    return obj

Return an object typed as a given type.

Always checks the type at runtime with isinstance and throws a TypeError on failure. Use asserttype for a more efficient (but less safe) equivalent.
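
A small sketch contrasting the two (the handle() function is hypothetical): asserttype is essentially free when assertions are disabled, while checktype always validates.

    from efro.util import asserttype, checktype

    def handle(msg: object) -> str:
        # Raises TypeError at runtime if msg is not actually a str.
        text = checktype(msg, str)
        return text.upper()

    print(handle('hello'))  # 'HELLO'

    value: object = 123
    num = asserttype(value, int)  # Checked only when assertions are enabled.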

def checktype_o(obj: Any, typ: type[~T]) -> Optional[~T]:
552def checktype_o(obj: Any, typ: type[T]) -> T | None:
553    """Return an object typed as a given optional type.
554
555    Always checks the type at runtime with isinstance and throws a TypeError
556    on failure. Use asserttype_o for a more efficient (but less safe) equivalent.
557    """
558    assert isinstance(typ, type), 'only actual types accepted'
559    if not isinstance(obj, (typ, type(None))):
560        raise TypeError(f'Expected a {typ} or None; got a {type(obj)}.')
561    return obj

Return an object typed as a given optional type.

Always checks the type at runtime with isinstance and throws a TypeError on failure. Use asserttype_o for a more efficient (but less safe) equivalent.

def warntype(obj: Any, typ: type[~T]) -> ~T:
564def warntype(obj: Any, typ: type[T]) -> T:
565    """Return an object typed as a given type.
566
567    Always checks the type at runtime and simply logs a warning if it is
568    not what is expected.
569    """
570    assert isinstance(typ, type), 'only actual types accepted'
571    if not isinstance(obj, typ):
572        import logging
573
574        logging.warning('warntype: expected a %s, got a %s', typ, type(obj))
575    return obj  # type: ignore

Return an object typed as a given type.

Always checks the type at runtime and simply logs a warning if it is not what is expected.

def warntype_o(obj: Any, typ: type[~T]) -> Optional[~T]:
578def warntype_o(obj: Any, typ: type[T]) -> T | None:
579    """Return an object typed as a given type.
580
581    Always checks the type at runtime and simply logs a warning if it is
582    not what is expected.
583    """
584    assert isinstance(typ, type), 'only actual types accepted'
585    if not isinstance(obj, (typ, type(None))):
586        import logging
587
588        logging.warning(
589            'warntype: expected a %s or None, got a %s', typ, type(obj)
590        )
591    return obj  # type: ignore

Return an object typed as a given optional type.

Always checks the type at runtime and simply logs a warning if it is not what is expected.

def assert_non_optional(obj: Optional[~T]) -> ~T:
594def assert_non_optional(obj: T | None) -> T:
595    """Return an object with Optional typing removed.
596
597    Assert is used to check its actual type, so only use this when
598    failures are not expected. Use check_non_optional otherwise.
599    """
600    assert obj is not None
601    return obj

Return an object with Optional typing removed.

Assert is used to check its actual type, so only use this when failures are not expected. Use check_non_optional otherwise.

def check_non_optional(obj: Optional[~T]) -> ~T:
604def check_non_optional(obj: T | None) -> T:
605    """Return an object with Optional typing removed.
606
607    Always checks the actual value and raises a ValueError on failure.
608    Use assert_non_optional for a more efficient (but less safe) equivalent.
609    """
610    if obj is None:
611        raise ValueError('Got None value in check_non_optional.')
612    return obj

Return an object with Optional typing removed.

Always checks the actual value and raises a ValueError on failure. Use assert_non_optional for a more efficient (but less safe) equivalent.
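
A typical use is narrowing an 'X | None' value that should always be present (the lookup dict here is hypothetical):

    from efro.util import check_non_optional

    lookup: dict[str, int] = {'a': 1}

    # dict.get() is typed as 'int | None'; this narrows it to 'int'
    # and raises ValueError if the value is actually missing.
    value: int = check_non_optional(lookup.get('a'))
    print(value)  # 1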

def smoothstep(edge0: float, edge1: float, x: float) -> float:
615def smoothstep(edge0: float, edge1: float, x: float) -> float:
616    """A smooth transition function.
617
618    Returns a value that smoothly moves from 0 to 1 as we go between edges.
619    Values outside of the range return 0 or 1.
620    """
621    y = min(1.0, max(0.0, (x - edge0) / (edge1 - edge0)))
622    return y * y * (3.0 - 2.0 * y)

A smooth transition function.

Returns a value that smoothly moves from 0 to 1 as we go between edges. Values outside of the range return 0 or 1.

def linearstep(edge0: float, edge1: float, x: float) -> float:
625def linearstep(edge0: float, edge1: float, x: float) -> float:
626    """A linear transition function.
627
628    Returns a value that linearly moves from 0 to 1 as we go between edges.
629    Values outside of the range return 0 or 1.
630    """
631    return max(0.0, min(1.0, (x - edge0) / (edge1 - edge0)))

A linear transition function.

Returns a value that linearly moves from 0 to 1 as we go between edges. Values outside of the range return 0 or 1.
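
A quick comparison of the two transition functions over a hypothetical 2.0 -> 4.0 fade range:

    from efro.util import linearstep, smoothstep

    for x in (1.0, 2.0, 3.0, 4.0, 5.0):
        print(x, linearstep(2.0, 4.0, x), smoothstep(2.0, 4.0, x))
    # Both hit 0.5 at the midpoint (x=3.0); smoothstep eases in and out
    # near the edges instead of changing linearly.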

def human_readable_compact_id(num: int) -> str:
650def human_readable_compact_id(num: int) -> str:
651    """Given a positive int, return a compact string representation for it.
652
653    Handy for visualizing unique numeric ids using as few chars as possible.
654    This representation uses only lowercase letters and numbers (minus the
655    following letters for readability):
656     's' is excluded due to similarity to '5'.
657     'l' is excluded due to similarity to '1'.
658     'i' is excluded due to similarity to '1'.
659     'o' is excluded due to similarity to '0'.
660     'z' is excluded due to similarity to '2'.
661
662    Therefore for n chars this can store 31^n distinct values.
663
664    When reading human input consisting of these IDs, it may be desirable
665    to map the disallowed chars to their corresponding allowed ones
666    ('o' -> '0', etc).
667
668    Sort order for these ids is the same as the original numbers.
669
670    If more compactness is desired at the expense of readability,
671    use compact_id() instead.
672    """
673    return _compact_id(num, '0123456789abcdefghjkmnpqrtuvwxy')

Given a positive int, return a compact string representation for it.

Handy for visualizing unique numeric ids using as few chars as possible. This representation uses only lowercase letters and numbers, minus the following letters for readability:
 's' is excluded due to similarity to '5'.
 'l' is excluded due to similarity to '1'.
 'i' is excluded due to similarity to '1'.
 'o' is excluded due to similarity to '0'.
 'z' is excluded due to similarity to '2'.

Therefore for n chars this can store 31^n distinct values.

When reading human input consisting of these IDs, it may be desirable to map the disallowed chars to their corresponding allowed ones ('o' -> '0', etc).

Sort order for these ids is the same as the original numbers.

If more compactness is desired at the expense of readability, use compact_id() instead.

def compact_id(num: int) -> str:
676def compact_id(num: int) -> str:
677    """Given a positive int, return a compact string representation for it.
678
679    Handy for visualizing unique numeric ids using as few chars as possible.
680    This version is more compact than human_readable_compact_id() but less
681    friendly to humans due to using both capital and lowercase letters,
682    both 'O' and '0', etc.
683
684    Therefore for n chars this can store 62^n distinct values.
685
686    Sort order for these ids is the same as the original numbers.
687    """
688    return _compact_id(
689        num, '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'
690    )

Given a positive int, return a compact string representation for it.

Handy for visualizing unique numeric ids using as few chars as possible. This version is more compact than human_readable_compact_id() but less friendly to humans due to using both capital and lowercase letters, both 'O' and '0', etc.

Therefore for n chars this can store 62^n distinct values.

Sort order for these ids is the same as the original numbers.
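
A quick sketch printing both representations for a few arbitrary ids (expected outputs omitted since they depend on the exact encoding):

    from efro.util import compact_id, human_readable_compact_id

    for num in (0, 1, 20, 12345):
        print(num, human_readable_compact_id(num), compact_id(num))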

def caller_source_location() -> str:
693def caller_source_location() -> str:
694    """Returns source file name and line of the code calling us.
695
696    Example: 'mymodule.py:23'
697    """
698    try:
699        import inspect
700
701        frame = inspect.currentframe()
702        for _i in range(2):
703            if frame is None:
704                raise RuntimeError()
705            frame = frame.f_back
706        if frame is None:
707            raise RuntimeError()
708        fname = os.path.basename(frame.f_code.co_filename)
709        return f'{fname}:{frame.f_lineno}'
710    except Exception:
711        return '<unknown source location>'

Returns source file name and line of the code calling us.

Example: 'mymodule.py:23'
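
A common pattern is a small logging helper that tags each message with its own caller's location (the log_debug() helper below is hypothetical):

    from efro.util import caller_source_location

    def log_debug(msg: str) -> None:
        # Reports the file:line of whoever called log_debug().
        print(f'[{caller_source_location()}] {msg}')

    log_debug('starting up')  # e.g. '[myscript.py:7] starting up'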

def unchanging_hostname() -> str:
714def unchanging_hostname() -> str:
715    """Return an unchanging name for the local device.
716
717    Similar to the `hostname` call (or os.uname().nodename in Python)
718    except attempts to give a name that doesn't change depending on
719    network conditions. (A Mac will tend to go from Foo to Foo.local,
720    Foo.lan etc. throughout its various adventures)
721    """
722    import platform
723    import subprocess
724
725    # On Mac, this should give the computer name assigned in System Prefs.
726    if platform.system() == 'Darwin':
727        return (
728            subprocess.run(
729                ['scutil', '--get', 'ComputerName'],
730                check=True,
731                capture_output=True,
732            )
733            .stdout.decode()
734            .strip()
735            .replace(' ', '-')
736        )
737    return os.uname().nodename

Return an unchanging name for the local device.

Similar to the hostname call (or os.uname().nodename in Python) except attempts to give a name that doesn't change depending on network conditions. (A Mac will tend to go from Foo to Foo.local, Foo.lan etc. throughout its various adventures)

def set_canonical_module_names(module_globals: dict[str, typing.Any]) -> None:
740def set_canonical_module_names(module_globals: dict[str, Any]) -> None:
741    """Do the thing."""
742    if os.environ.get('EFRO_SUPPRESS_SET_CANONICAL_MODULE_NAMES') == '1':
743        return
744
745    modulename = module_globals.get('__name__')
746    if not isinstance(modulename, str):
747        raise RuntimeError('Unable to get module name.')
748    assert not modulename.startswith('_')
749    modulename_prefix = f'{modulename}.'
750    modulename_prefix_2 = f'_{modulename}.'
751
752    for name, obj in module_globals.items():
753        if name.startswith('_'):
754            continue
755        existing = getattr(obj, '__module__', None)
756        try:
757            # Override the module ONLY if it lives under us somewhere.
758            # So ourpackage._submodule.Foo becomes ourpackage.Foo
759            # but otherpackage._submodule.Foo remains untouched.
760            if existing is not None and (
761                existing.startswith(modulename_prefix)
762                or existing.startswith(modulename_prefix_2)
763            ):
764                obj.__module__ = modulename
765        except Exception:
766            import logging
767
768            logging.warning(
769                'set_canonical_module_names: unable to change __module__'
770                " from '%s' to '%s' on %s object at '%s'.",
771                existing,
772                modulename,
773                type(obj),
774                name,
775            )

Set __module__ on re-exported public objects to this module's name. This makes a name defined in a private submodule (ourpackage._submodule.Foo) present itself as living in the package that re-exports it (ourpackage.Foo).
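
A minimal self-contained sketch of the effect (the 'mypkg' names are hypothetical; in real use the call would sit at the bottom of a package's __init__.py and be passed globals()):

    from efro.util import set_canonical_module_names

    class Widget:
        pass

    # Pretend Widget was defined in a private submodule.
    Widget.__module__ = 'mypkg._impl'

    # Stand-in for the __init__.py globals of package 'mypkg'.
    fake_init_globals = {'__name__': 'mypkg', 'Widget': Widget}
    set_canonical_module_names(fake_init_globals)

    print(Widget.__module__)  # 'mypkg'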

def timedelta_str( timeval: datetime.timedelta | float, maxparts: int = 2, decimals: int = 0) -> str:
778def timedelta_str(
779    timeval: datetime.timedelta | float, maxparts: int = 2, decimals: int = 0
780) -> str:
781    """Return a simple human readable time string for a length of time.
782
783    Time can be given as a timedelta or a float representing seconds.
784    Example output:
785      "23d 1h 2m 32s" (with maxparts == 4)
786      "23d 1h" (with maxparts == 2)
787      "23d 1.08h" (with maxparts == 2 and decimals == 2)
788
789    Note that this is hard-coded in English and probably not especially
790    performant.
791    """
792    # pylint: disable=too-many-locals
793
794    if isinstance(timeval, float):
795        timevalfin = datetime.timedelta(seconds=timeval)
796    else:
797        timevalfin = timeval
798
799    # Internally we only handle positive values.
800    if timevalfin.total_seconds() < 0:
801        return f'-{timedelta_str(-timeval, maxparts=maxparts, decimals=decimals)}'
802
803    years = timevalfin.days // 365
804    days = timevalfin.days % 365
805    hours = timevalfin.seconds // 3600
806    hour_remainder = timevalfin.seconds % 3600
807    minutes = hour_remainder // 60
808    seconds = hour_remainder % 60
809
810    # Now, if we want decimal places for our last value,
811    # calc fractional parts.
812    if decimals:
813        # Calc totals of each type.
814        t_seconds = timevalfin.total_seconds()
815        t_minutes = t_seconds / 60
816        t_hours = t_minutes / 60
817        t_days = t_hours / 24
818        t_years = t_days / 365
819
820        # Calc fractional parts that exclude all whole values to their left.
821        years_covered = years
822        years_f = t_years - years_covered
823        days_covered = years_covered * 365 + days
824        days_f = t_days - days_covered
825        hours_covered = days_covered * 24 + hours
826        hours_f = t_hours - hours_covered
827        minutes_covered = hours_covered * 60 + minutes
828        minutes_f = t_minutes - minutes_covered
829        seconds_covered = minutes_covered * 60 + seconds
830        seconds_f = t_seconds - seconds_covered
831    else:
832        years_f = days_f = hours_f = minutes_f = seconds_f = 0.0
833
834    parts: list[str] = []
835    for part, part_f, suffix in (
836        (years, years_f, 'y'),
837        (days, days_f, 'd'),
838        (hours, hours_f, 'h'),
839        (minutes, minutes_f, 'm'),
840        (seconds, seconds_f, 's'),
841    ):
842        if part or parts or (not parts and suffix == 's'):
843            # Do decimal version only for the last part.
844            if decimals and (len(parts) >= maxparts - 1 or suffix == 's'):
845                parts.append(f'{part+part_f:.{decimals}f}{suffix}')
846            else:
847                parts.append(f'{part}{suffix}')
848            if len(parts) >= maxparts:
849                break
850    return ' '.join(parts)

Return a simple human readable time string for a length of time.

Time can be given as a timedelta or a float representing seconds. Example output:
  "23d 1h 2m 32s" (with maxparts == 4)
  "23d 1h" (with maxparts == 2)
  "23d 1.08h" (with maxparts == 2 and decimals == 2)

Note that this is hard-coded in English and probably not especially performant.
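
A couple of concrete calls matching the documented behavior:

    import datetime
    from efro.util import timedelta_str

    delta = datetime.timedelta(days=388, hours=1, minutes=2, seconds=32)
    print(timedelta_str(delta, maxparts=4))             # '1y 23d 1h 2m'
    print(timedelta_str(90.5, maxparts=2, decimals=1))  # '1m 30.5s'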

def ago_str( timeval: datetime.datetime, maxparts: int = 1, now: datetime.datetime | None = None, decimals: int = 0) -> str:
853def ago_str(
854    timeval: datetime.datetime,
855    maxparts: int = 1,
856    now: datetime.datetime | None = None,
857    decimals: int = 0,
858) -> str:
859    """Given a datetime, return a clean human readable 'ago' str.
860
861    Note that this is hard-coded in English so should not be used
862    for visible in-game elements; only tools/etc.
863
864    If now is not passed, efro.util.utc_now() is used.
865    """
866    if now is None:
867        now = utc_now()
868    return (
869        timedelta_str(now - timeval, maxparts=maxparts, decimals=decimals)
870        + ' ago'
871    )

Given a datetime, return a clean human readable 'ago' str.

Note that this is hard-coded in English so should not be used for visible in-game elements; only tools/etc.

If now is not passed, efro.util.utc_now() is used.
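
For example (using a timestamp constructed 26 hours in the past):

    import datetime
    from efro.util import ago_str, utc_now

    timestamp = utc_now() - datetime.timedelta(hours=26)
    print(ago_str(timestamp))              # '1d ago'
    print(ago_str(timestamp, maxparts=2))  # '1d 2h ago'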

def split_list(input_list: list[~T], max_length: int) -> list[list[~T]]:
874def split_list(input_list: list[T], max_length: int) -> list[list[T]]:
875    """Split a single list into smaller lists."""
876    return [
877        input_list[i : i + max_length]
878        for i in range(0, len(input_list), max_length)
879    ]

Split a single list into smaller lists.
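
For example:

    from efro.util import split_list

    print(split_list([1, 2, 3, 4, 5], max_length=2))
    # [[1, 2], [3, 4], [5]]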

def extract_flag(args: list[str], name: str) -> bool:
882def extract_flag(args: list[str], name: str) -> bool:
883    """Given a list of args and a flag name, returns whether it is present.
884
885    The arg flag, if present, is removed from the arg list.
886    """
887    from efro.error import CleanError
888
889    count = args.count(name)
890    if count > 1:
891        raise CleanError(f'Flag {name} passed multiple times.')
892    if not count:
893        return False
894    args.remove(name)
895    return True

Given a list of args and a flag name, returns whether it is present.

The arg flag, if present, is removed from the arg list.
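
A small sketch on a hand-built arg list (in real use this would typically be a mutable copy of sys.argv[1:]):

    from efro.util import extract_flag

    args = ['--verbose', 'build', '--clean']
    verbose = extract_flag(args, '--verbose')
    print(verbose, args)  # True ['build', '--clean']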

def extract_arg(args: list[str], name: str, required: bool = False) -> str | None:
908def extract_arg(
909    args: list[str], name: str, required: bool = False
910) -> str | None:
911    """Given a list of args and an arg name, returns a value.
912
913    The arg flag and value are removed from the arg list.
914    Raises CleanError on any problems.
915    """
916    from efro.error import CleanError
917
918    count = args.count(name)
919    if not count:
920        if required:
921            raise CleanError(f'Required argument {name} not passed.')
922        return None
923
924    if count > 1:
925        raise CleanError(f'Arg {name} passed multiple times.')
926
927    argindex = args.index(name)
928    if argindex + 1 >= len(args):
929        raise CleanError(f'No value passed after {name} arg.')
930
931    val = args[argindex + 1]
932    del args[argindex : argindex + 2]
933
934    return val

Given a list of args and an arg name, returns a value.

The arg flag and value are removed from the arg list. Raises CleanError on any problems.
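
A small sketch on a hand-built arg list:

    from efro.util import extract_arg

    args = ['build', '--target', 'linux', '--fast']
    target = extract_arg(args, '--target')
    print(target, args)  # linux ['build', '--fast']

    # Absent args yield None unless required=True, in which case a
    # CleanError is raised.
    print(extract_arg(args, '--config'))  # None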