efro.util

Small handy bits of functionality.

  1# Released under the MIT License. See LICENSE for details.
  2#
  3"""Small handy bits of functionality."""
  4
  5from __future__ import annotations
  6
  7import os
  8import time
  9import weakref
 10import datetime
 11from enum import Enum
 12from typing import TYPE_CHECKING, cast, TypeVar, Generic, overload
 13
 14if TYPE_CHECKING:
 15    import asyncio
 16    from typing import Any, Callable, Literal
 17
 18T = TypeVar('T')
 19ValT = TypeVar('ValT')
 20ArgT = TypeVar('ArgT')
 21SelfT = TypeVar('SelfT')
 22RetT = TypeVar('RetT')
 23EnumT = TypeVar('EnumT', bound=Enum)
 24
 25
 26class _EmptyObj:
 27    pass
 28
 29
 30# A dead weak-ref should be immutable, right? So we can create exactly
 31# one and return it for all cases that need an empty weak-ref.
 32_g_empty_weak_ref = weakref.ref(_EmptyObj())
 33assert _g_empty_weak_ref() is None
 34
 35
 36def explicit_bool(val: bool) -> bool:
 37    """Return a non-inferable boolean value.
 38
 39    Useful to be able to disable blocks of code without type checkers
 40    complaining/etc.
 41    """
 42    # pylint: disable=no-else-return
 43    if TYPE_CHECKING:
 44        # infer this! <boom>
 45        import random
 46
 47        return random.random() < 0.5
 48    else:
 49        return val
 50
 51
 52def snake_case_to_title(val: str) -> str:
 53    """Given a snake-case string 'foo_bar', returns 'Foo Bar'."""
 54    # Kill empty words resulting from leading/trailing/multiple underscores.
 55    return ' '.join(w for w in val.split('_') if w).title()
 56
 57
 58def snake_case_to_camel_case(val: str) -> str:
 59    """Given a snake-case string 'foo_bar', returns camel-case 'FooBar'."""
 60    # Replace underscores with spaces; capitalize words; kill spaces.
 61    # Not sure about efficiency, but logically simple.
 62    return val.replace('_', ' ').title().replace(' ', '')
 63
 64
 65def check_utc(value: datetime.datetime) -> None:
 66    """Ensure a datetime value is timezone-aware utc."""
 67    if value.tzinfo is not datetime.UTC:
 68        raise ValueError(
 69            'datetime value does not have timezone set as datetime.UTC'
 70        )
 71
 72
 73def utc_now() -> datetime.datetime:
 74    """Get timezone-aware current utc time.
 75
 76    Just a shortcut for datetime.datetime.now(datetime.UTC).
 77    Avoid datetime.datetime.utcnow() which is deprecated and gives naive
 78    times.
 79    """
 80    return datetime.datetime.now(datetime.UTC)
 81
 82
 83def utc_now_naive() -> datetime.datetime:
 84    """Get naive utc time.
 85
 86    This can be used to replace datetime.utcnow(), which is now deprecated.
 87    Most all code should migrate to use timezone-aware times instead of
 88    this.
 89    """
 90    return datetime.datetime.now(datetime.UTC).replace(tzinfo=None)
 91
 92
 93def utc_today() -> datetime.datetime:
 94    """Get offset-aware midnight in the utc time zone."""
 95    now = datetime.datetime.now(datetime.UTC)
 96    return datetime.datetime(
 97        year=now.year, month=now.month, day=now.day, tzinfo=now.tzinfo
 98    )
 99
100
101def utc_this_hour() -> datetime.datetime:
102    """Get offset-aware beginning of the current hour in the utc time zone."""
103    now = datetime.datetime.now(datetime.UTC)
104    return datetime.datetime(
105        year=now.year,
106        month=now.month,
107        day=now.day,
108        hour=now.hour,
109        tzinfo=now.tzinfo,
110    )
111
112
113def utc_this_minute() -> datetime.datetime:
114    """Get offset-aware beginning of current minute in the utc time zone."""
115    now = datetime.datetime.now(datetime.UTC)
116    return datetime.datetime(
117        year=now.year,
118        month=now.month,
119        day=now.day,
120        hour=now.hour,
121        minute=now.minute,
122        tzinfo=now.tzinfo,
123    )
124
125
126def empty_weakref(objtype: type[T]) -> weakref.ref[T]:
127    """Return an invalidated weak-reference for the specified type."""
128    # At runtime, all weakrefs are the same; our type arg is just
129    # for the static type checker.
130    del objtype  # Unused.
131
132    # Just create an object and let it die. Is there a cleaner way to do this?
133    # return weakref.ref(_EmptyObj())  # type: ignore
134
135    # Sharing a single one seems at least a bit better.
136    return _g_empty_weak_ref  # type: ignore
137
138
139def data_size_str(bytecount: int, compact: bool = False) -> str:
140    """Given a size in bytes, returns a short human readable string.
141
142    In compact mode this should be 6 or fewer chars for most all
143    sane file sizes.
144    """
145    # pylint: disable=too-many-return-statements
146
147    # Special case: handle negatives.
148    if bytecount < 0:
149        val = data_size_str(-bytecount, compact=compact)
150        return f'-{val}'
151
152    if bytecount <= 999:
153        suffix = 'B' if compact else 'bytes'
154        return f'{bytecount} {suffix}'
155    kbytecount = bytecount / 1024
156    if round(kbytecount, 1) < 10.0:
157        return f'{kbytecount:.1f} KB'
158    if round(kbytecount, 0) < 999:
159        return f'{kbytecount:.0f} KB'
160    mbytecount = bytecount / (1024 * 1024)
161    if round(mbytecount, 1) < 10.0:
162        return f'{mbytecount:.1f} MB'
163    if round(mbytecount, 0) < 999:
164        return f'{mbytecount:.0f} MB'
165    gbytecount = bytecount / (1024 * 1024 * 1024)
166    if round(gbytecount, 1) < 10.0:
167        return f'{gbytecount:.1f} GB'
168    return f'{gbytecount:.0f} GB'
169
170
171class DirtyBit:
172    """Manages whether a thing is dirty and regulates attempts to clean it.
173
174    To use, simply set the 'dirty' value on this object to True when some
175    action is needed, and then check the 'should_update' value to regulate
176    when attempts to clean it should be made. Set 'dirty' back to False after
177    a successful update.
178    If 'use_lock' is True, an asyncio Lock will be created and incorporated
179    into update attempts to prevent simultaneous updates (should_update will
180    only return True when the lock is unlocked). Note that it is up to the user
181    to lock/unlock the lock during the actual update attempt.
182    If a value is passed for 'auto_dirty_seconds', the dirtybit will flip
183    itself back to dirty after being clean for the given amount of time.
184    'min_update_interval' can be used to enforce a minimum update
185    interval even when updates are successful (retry_interval only applies
186    when updates fail).
187    """
188
189    def __init__(
190        self,
191        dirty: bool = False,
192        retry_interval: float = 5.0,
193        use_lock: bool = False,
194        auto_dirty_seconds: float | None = None,
195        min_update_interval: float | None = None,
196    ):
197        curtime = time.monotonic()
198        self._retry_interval = retry_interval
199        self._auto_dirty_seconds = auto_dirty_seconds
200        self._min_update_interval = min_update_interval
201        self._dirty = dirty
202        self._next_update_time: float | None = curtime if dirty else None
203        self._last_update_time: float | None = None
204        self._next_auto_dirty_time: float | None = (
205            (curtime + self._auto_dirty_seconds)
206            if (not dirty and self._auto_dirty_seconds is not None)
207            else None
208        )
209        self._use_lock = use_lock
210        self.lock: asyncio.Lock
211        if self._use_lock:
212            import asyncio
213
214            self.lock = asyncio.Lock()
215
216    @property
217    def dirty(self) -> bool:
218        """Whether the target is currently dirty.
219
220        This should be set to False once an update is successful.
221        """
222        return self._dirty
223
224    @dirty.setter
225    def dirty(self, value: bool) -> None:
226        # If we're freshly clean, set our next auto-dirty time (if we have
227        # one).
228        if self._dirty and not value and self._auto_dirty_seconds is not None:
229            self._next_auto_dirty_time = (
230                time.monotonic() + self._auto_dirty_seconds
231            )
232
233        # If we're freshly dirty, schedule an immediate update.
234        if not self._dirty and value:
235            self._next_update_time = time.monotonic()
236
237            # If they want to enforce a minimum update interval,
238            # push out the next update time if it hasn't been long enough.
239            if (
240                self._min_update_interval is not None
241                and self._last_update_time is not None
242            ):
243                self._next_update_time = max(
244                    self._next_update_time,
245                    self._last_update_time + self._min_update_interval,
246                )
247
248        self._dirty = value
249
250    @property
251    def should_update(self) -> bool:
252        """Whether an attempt should be made to clean the target now.
253
254        Always returns False if the target is not dirty.
255        Takes into account the amount of time passed since the target
256        was marked dirty or since should_update last returned True.
257        """
258        curtime = time.monotonic()
259
260        # Auto-dirty ourself if we're into that.
261        if (
262            self._next_auto_dirty_time is not None
263            and curtime > self._next_auto_dirty_time
264        ):
265            self.dirty = True
266            self._next_auto_dirty_time = None
267        if not self._dirty:
268            return False
269        if self._use_lock and self.lock.locked():
270            return False
271        assert self._next_update_time is not None
272        if curtime > self._next_update_time:
273            self._next_update_time = curtime + self._retry_interval
274            self._last_update_time = curtime
275            return True
276        return False
277
278
279class DispatchMethodWrapper(Generic[ArgT, RetT]):
280    """Type-aware standin for the dispatch func returned by dispatchmethod."""
281
282    def __call__(self, arg: ArgT) -> RetT:
283        raise RuntimeError('Should not get here')
284
285    @staticmethod
286    def register(
287        func: Callable[[Any, Any], RetT]
288    ) -> Callable[[Any, Any], RetT]:
289        """Register a new dispatch handler for this dispatch-method."""
290        raise RuntimeError('Should not get here')
291
292    registry: dict[Any, Callable]
293
294
295# noinspection PyProtectedMember,PyTypeHints
296def dispatchmethod(
297    func: Callable[[Any, ArgT], RetT]
298) -> DispatchMethodWrapper[ArgT, RetT]:
299    """A variation of functools.singledispatch for methods.
300
301    Note: as of Python 3.8 there is now functools.singledispatchmethod,
302    but it currently (as of Jan 2021) is not type-aware (at least in mypy),
303    which gives us a reason to keep this one around for now.
304    """
305    from functools import singledispatch, update_wrapper
306
307    origwrapper: Any = singledispatch(func)
308
309    # Pull this out so hopefully origwrapper can die,
310    # otherwise we reference origwrapper in our wrapper.
311    dispatch = origwrapper.dispatch
312
313    # All we do here is recreate the end of functools.singledispatch
314    # where it returns a wrapper except instead of the wrapper using the
315    # first arg to the function ours uses the second (to skip 'self').
316    # This was made against Python 3.7; we should probably check up on
317    # this in later versions in case anything has changed.
318    # (or hopefully they'll add this functionality to their version)
319    # NOTE: sounds like we can use functools singledispatchmethod in 3.8
320    def wrapper(*args: Any, **kw: Any) -> Any:
321        if not args or len(args) < 2:
322            raise TypeError(
323                f'{funcname} requires at least ' '2 positional arguments'
324            )
325
326        return dispatch(args[1].__class__)(*args, **kw)
327
328    funcname = getattr(func, '__name__', 'dispatchmethod method')
329    wrapper.register = origwrapper.register  # type: ignore
330    wrapper.dispatch = dispatch  # type: ignore
331    wrapper.registry = origwrapper.registry  # type: ignore
332    # pylint: disable=protected-access
333    wrapper._clear_cache = origwrapper._clear_cache  # type: ignore
334    update_wrapper(wrapper, func)
335    # pylint: enable=protected-access
336    return cast(DispatchMethodWrapper, wrapper)
337
338
339def valuedispatch(call: Callable[[ValT], RetT]) -> ValueDispatcher[ValT, RetT]:
340    """Decorator for functions to allow dispatching based on a value.
341
342    This differs from functools.singledispatch in that it dispatches based
343    on the value of an argument, not based on its type.
344    The 'register' method of a value-dispatch function can be used
345    to assign new functions to handle particular values.
346    Unhandled values wind up in the original dispatch function."""
347    return ValueDispatcher(call)
348
349
350class ValueDispatcher(Generic[ValT, RetT]):
351    """Used by the valuedispatch decorator"""
352
353    def __init__(self, call: Callable[[ValT], RetT]) -> None:
354        self._base_call = call
355        self._handlers: dict[ValT, Callable[[], RetT]] = {}
356
357    def __call__(self, value: ValT) -> RetT:
358        handler = self._handlers.get(value)
359        if handler is not None:
360            return handler()
361        return self._base_call(value)
362
363    def _add_handler(
364        self, value: ValT, call: Callable[[], RetT]
365    ) -> Callable[[], RetT]:
366        if value in self._handlers:
367            raise RuntimeError(f'Duplicate handlers added for {value}')
368        self._handlers[value] = call
369        return call
370
371    def register(
372        self, value: ValT
373    ) -> Callable[[Callable[[], RetT]], Callable[[], RetT]]:
374        """Add a handler to the dispatcher."""
375        from functools import partial
376
377        return partial(self._add_handler, value)
378
379
380def valuedispatch1arg(
381    call: Callable[[ValT, ArgT], RetT]
382) -> ValueDispatcher1Arg[ValT, ArgT, RetT]:
383    """Like valuedispatch but for functions taking an extra argument."""
384    return ValueDispatcher1Arg(call)
385
386
387class ValueDispatcher1Arg(Generic[ValT, ArgT, RetT]):
388    """Used by the valuedispatch1arg decorator"""
389
390    def __init__(self, call: Callable[[ValT, ArgT], RetT]) -> None:
391        self._base_call = call
392        self._handlers: dict[ValT, Callable[[ArgT], RetT]] = {}
393
394    def __call__(self, value: ValT, arg: ArgT) -> RetT:
395        handler = self._handlers.get(value)
396        if handler is not None:
397            return handler(arg)
398        return self._base_call(value, arg)
399
400    def _add_handler(
401        self, value: ValT, call: Callable[[ArgT], RetT]
402    ) -> Callable[[ArgT], RetT]:
403        if value in self._handlers:
404            raise RuntimeError(f'Duplicate handlers added for {value}')
405        self._handlers[value] = call
406        return call
407
408    def register(
409        self, value: ValT
410    ) -> Callable[[Callable[[ArgT], RetT]], Callable[[ArgT], RetT]]:
411        """Add a handler to the dispatcher."""
412        from functools import partial
413
414        return partial(self._add_handler, value)
415
416
417if TYPE_CHECKING:
418
419    class ValueDispatcherMethod(Generic[ValT, RetT]):
420        """Used by the valuedispatchmethod decorator."""
421
422        def __call__(self, value: ValT) -> RetT: ...
423
424        def register(
425            self, value: ValT
426        ) -> Callable[[Callable[[SelfT], RetT]], Callable[[SelfT], RetT]]:
427            """Add a handler to the dispatcher."""
428            ...
429
430
431def valuedispatchmethod(
432    call: Callable[[SelfT, ValT], RetT]
433) -> ValueDispatcherMethod[ValT, RetT]:
434    """Like valuedispatch but works with methods instead of functions."""
435
436    # NOTE: It seems that to wrap a method with a decorator and have self
437    # dispatching do the right thing, we must return a function and not
438    # an executable object. So for this version we store our data here
439    # in the function call dict and simply return a call.
440
441    _base_call = call
442    _handlers: dict[ValT, Callable[[SelfT], RetT]] = {}
443
444    def _add_handler(value: ValT, addcall: Callable[[SelfT], RetT]) -> None:
445        if value in _handlers:
446            raise RuntimeError(f'Duplicate handlers added for {value}')
447        _handlers[value] = addcall
448
449    def _register(value: ValT) -> Callable[[Callable[[SelfT], RetT]], None]:
450        from functools import partial
451
452        return partial(_add_handler, value)
453
454    def _call_wrapper(self: SelfT, value: ValT) -> RetT:
455        handler = _handlers.get(value)
456        if handler is not None:
457            return handler(self)
458        return _base_call(self, value)
459
460    # We still want to use our returned object to register handlers, but we're
461    # actually just returning a function. So manually stuff the call onto it.
462    setattr(_call_wrapper, 'register', _register)
463
464    # To the type checker's eyes we return a ValueDispatchMethod instance;
465    # this lets it know about our register func and type-check its usage.
466    # In reality we just return a raw function call (for reasons listed above).
467    # pylint: disable=undefined-variable, no-else-return
468    if TYPE_CHECKING:
469        return ValueDispatcherMethod[ValT, RetT]()
470    else:
471        return _call_wrapper
472
473
474def make_hash(obj: Any) -> int:
475    """Makes a hash from a dictionary, list, tuple or set to any level,
476    that contains only other hashable types (including any lists, tuples,
477    sets, and dictionaries).
478
479    Note that this uses Python's hash() function internally so collisions/etc.
480    may be more common than with fancy cryptographic hashes.
481
482    Also be aware that Python's hash() output varies across processes, so
483    this should only be used for values that will remain in a single process.
484    """
485    import copy
486
487    if isinstance(obj, (set, tuple, list)):
488        return hash(tuple(make_hash(e) for e in obj))
489    if not isinstance(obj, dict):
490        return hash(obj)
491
492    new_obj = copy.deepcopy(obj)
493    for k, v in new_obj.items():
494        new_obj[k] = make_hash(v)
495
496    # NOTE: the sorted() here works correctly because it compares only
497    # unique first values (i.e. dict keys).
498    return hash(tuple(frozenset(sorted(new_obj.items()))))
499
500
501def float_hash_from_string(s: str) -> float:
502    """Given a string value, returns a float between 0 and 1.
503
504    Is consistent across processes. Can be useful for assigning shard
505    values to db ids for efficient parallel processing.
506    """
507    import hashlib
508
509    hash_bytes = hashlib.md5(s.encode()).digest()
510
511    # Generate a random 64 bit int from hash digest bytes.
512    ival = int.from_bytes(hash_bytes[:8])
513    return ival / ((1 << 64) - 1)
514
515
516def asserttype(obj: Any, typ: type[T]) -> T:
517    """Return an object typed as a given type.
518
519    Assert is used to check its actual type, so only use this when
520    failures are not expected. Otherwise use checktype.
521    """
522    assert isinstance(typ, type), 'only actual types accepted'
523    assert isinstance(obj, typ)
524    return obj
525
526
527def asserttype_o(obj: Any, typ: type[T]) -> T | None:
528    """Return an object typed as a given optional type.
529
530    Assert is used to check its actual type, so only use this when
531    failures are not expected. Otherwise use checktype_o.
532    """
533    assert isinstance(typ, type), 'only actual types accepted'
534    assert isinstance(obj, (typ, type(None)))
535    return obj
536
537
538def checktype(obj: Any, typ: type[T]) -> T:
539    """Return an object typed as a given type.
540
541    Always checks the type at runtime with isinstance and throws a TypeError
542    on failure. Use asserttype for a more efficient (but less safe) equivalent.
543    """
544    assert isinstance(typ, type), 'only actual types accepted'
545    if not isinstance(obj, typ):
546        raise TypeError(f'Expected a {typ}; got a {type(obj)}.')
547    return obj
548
549
550def checktype_o(obj: Any, typ: type[T]) -> T | None:
551    """Return an object typed as a given optional type.
552
553    Always checks the type at runtime with isinstance and throws a TypeError
554    on failure. Use asserttype_o for a more efficient (but less safe) equivalent.
555    """
556    assert isinstance(typ, type), 'only actual types accepted'
557    if not isinstance(obj, (typ, type(None))):
558        raise TypeError(f'Expected a {typ} or None; got a {type(obj)}.')
559    return obj
560
561
562def warntype(obj: Any, typ: type[T]) -> T:
563    """Return an object typed as a given type.
564
565    Always checks the type at runtime and simply logs a warning if it is
566    not what is expected.
567    """
568    assert isinstance(typ, type), 'only actual types accepted'
569    if not isinstance(obj, typ):
570        import logging
571
572        logging.warning('warntype: expected a %s, got a %s', typ, type(obj))
573    return obj  # type: ignore
574
575
576def warntype_o(obj: Any, typ: type[T]) -> T | None:
577    """Return an object typed as a given type.
578
579    Always checks the type at runtime and simply logs a warning if it is
580    not what is expected.
581    """
582    assert isinstance(typ, type), 'only actual types accepted'
583    if not isinstance(obj, (typ, type(None))):
584        import logging
585
586        logging.warning(
587            'warntype: expected a %s or None, got a %s', typ, type(obj)
588        )
589    return obj  # type: ignore
590
591
592def assert_non_optional(obj: T | None) -> T:
593    """Return an object with Optional typing removed.
594
595    Assert is used to check its actual type, so only use this when
596    failures are not expected. Use check_non_optional otherwise.
597    """
598    assert obj is not None
599    return obj
600
601
602def check_non_optional(obj: T | None) -> T:
603    """Return an object with Optional typing removed.
604
605    Always checks the actual value and raises a ValueError on failure.
606    Use assert_non_optional for a more efficient (but less safe) equivalent.
607    """
608    if obj is None:
609        raise ValueError('Got None value in check_non_optional.')
610    return obj
611
612
613def smoothstep(edge0: float, edge1: float, x: float) -> float:
614    """A smooth transition function.
615
616    Returns a value that smoothly moves from 0 to 1 as we go between edges.
617    Values outside of the range return 0 or 1.
618    """
619    y = min(1.0, max(0.0, (x - edge0) / (edge1 - edge0)))
620    return y * y * (3.0 - 2.0 * y)
621
622
623def linearstep(edge0: float, edge1: float, x: float) -> float:
624    """A linear transition function.
625
626    Returns a value that linearly moves from 0 to 1 as we go between edges.
627    Values outside of the range return 0 or 1.
628    """
629    return max(0.0, min(1.0, (x - edge0) / (edge1 - edge0)))
630
631
632def _compact_id(num: int, chars: str) -> str:
633    if num < 0:
634        raise ValueError('Negative integers not allowed.')
635
636    # Chars must be in sorted order for sorting to work correctly
637    # on our output.
638    assert ''.join(sorted(list(chars))) == chars
639
640    base = len(chars)
641    out = ''
642    while num:
643        out += chars[num % base]
644        num //= base
645    return out[::-1] or '0'
646
647
648def human_readable_compact_id(num: int) -> str:
649    """Given a positive int, return a compact string representation for it.
650
651    Handy for visualizing unique numeric ids using as few as possible chars.
652    This representation uses only lowercase letters and numbers (minus the
653    following letters for readability):
654     's' is excluded due to similarity to '5'.
655     'l' is excluded due to similarity to '1'.
656     'i' is excluded due to similarity to '1'.
657     'o' is excluded due to similarity to '0'.
658     'z' is excluded due to similarity to '2'.
659
660    Therefore for n chars this can store values of 31^n.
661
662    When reading human input consisting of these IDs, it may be desirable
663    to map the disallowed chars to their corresponding allowed ones
664    ('o' -> '0', etc).
665
666    Sort order for these ids is the same as the original numbers.
667
668    If more compactness is desired at the expense of readability,
669    use compact_id() instead.
670    """
671    return _compact_id(num, '0123456789abcdefghjkmnpqrtuvwxy')
672
673
674def compact_id(num: int) -> str:
675    """Given a positive int, return a compact string representation for it.
676
677    Handy for visualizing unique numeric ids using as few as possible chars.
678    This version is more compact than human_readable_compact_id() but less
679    friendly to humans due to using both capital and lowercase letters,
680    both 'O' and '0', etc.
681
682    Therefore for n chars this can store values of 62^n.
683
684    Sort order for these ids is the same as the original numbers.
685    """
686    return _compact_id(
687        num, '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'
688    )
689
690
691def caller_source_location() -> str:
692    """Returns source file name and line of the code calling us.
693
694    Example: 'mymodule.py:23'
695    """
696    try:
697        import inspect
698
699        frame = inspect.currentframe()
700        for _i in range(2):
701            if frame is None:
702                raise RuntimeError()
703            frame = frame.f_back
704        if frame is None:
705            raise RuntimeError()
706        fname = os.path.basename(frame.f_code.co_filename)
707        return f'{fname}:{frame.f_lineno}'
708    except Exception:
709        return '<unknown source location>'
710
711
712def unchanging_hostname() -> str:
713    """Return an unchanging name for the local device.
714
715    Similar to the `hostname` call (or os.uname().nodename in Python)
716    except attempts to give a name that doesn't change depending on
717    network conditions. (A Mac will tend to go from Foo to Foo.local,
718    Foo.lan etc. throughout its various adventures)
719    """
720    import platform
721    import subprocess
722
723    # On Mac, this should give the computer name assigned in System Prefs.
724    if platform.system() == 'Darwin':
725        return (
726            subprocess.run(
727                ['scutil', '--get', 'ComputerName'],
728                check=True,
729                capture_output=True,
730            )
731            .stdout.decode()
732            .strip()
733            .replace(' ', '-')
734        )
735    return os.uname().nodename
736
737
738def set_canonical_module_names(module_globals: dict[str, Any]) -> None:
739    """Do the thing."""
740    if os.environ.get('EFRO_SUPPRESS_SET_CANONICAL_MODULE_NAMES') == '1':
741        return
742
743    modulename = module_globals.get('__name__')
744    if not isinstance(modulename, str):
745        raise RuntimeError('Unable to get module name.')
746    assert not modulename.startswith('_')
747    modulename_prefix = f'{modulename}.'
748    modulename_prefix_2 = f'_{modulename}.'
749
750    for name, obj in module_globals.items():
751        if name.startswith('_'):
752            continue
753        existing = getattr(obj, '__module__', None)
754        try:
755            # Override the module ONLY if it lives under us somewhere.
756            # So ourpackage._submodule.Foo becomes ourpackage.Foo
757            # but otherpackage._submodule.Foo remains untouched.
758            if existing is not None and (
759                existing.startswith(modulename_prefix)
760                or existing.startswith(modulename_prefix_2)
761            ):
762                obj.__module__ = modulename
763        except Exception:
764            import logging
765
766            logging.warning(
767                'set_canonical_module_names: unable to change __module__'
768                " from '%s' to '%s' on %s object at '%s'.",
769                existing,
770                modulename,
771                type(obj),
772                name,
773            )
774
775
776def timedelta_str(
777    timeval: datetime.timedelta | float, maxparts: int = 2, decimals: int = 0
778) -> str:
779    """Return a simple human readable time string for a length of time.
780
781    Time can be given as a timedelta or a float representing seconds.
782    Example output:
783      "23d 1h 2m 32s" (with maxparts == 4)
784      "23d 1h" (with maxparts == 2)
785      "23d 1.08h" (with maxparts == 2 and decimals == 2)
786
787    Note that this is hard-coded in English and probably not especially
788    performant.
789    """
790    # pylint: disable=too-many-locals
791
792    if isinstance(timeval, float):
793        timevalfin = datetime.timedelta(seconds=timeval)
794    else:
795        timevalfin = timeval
796
797    # Internally we only handle positive values.
798    if timevalfin.total_seconds() < 0:
799        return f'-{timedelta_str(timeval=-timeval, maxparts=maxparts, decimals=decimals)}'
800
801    years = timevalfin.days // 365
802    days = timevalfin.days % 365
803    hours = timevalfin.seconds // 3600
804    hour_remainder = timevalfin.seconds % 3600
805    minutes = hour_remainder // 60
806    seconds = hour_remainder % 60
807
808    # Now, if we want decimal places for our last value,
809    # calc fractional parts.
810    if decimals:
811        # Calc totals of each type.
812        t_seconds = timevalfin.total_seconds()
813        t_minutes = t_seconds / 60
814        t_hours = t_minutes / 60
815        t_days = t_hours / 24
816        t_years = t_days / 365
817
818        # Calc fractional parts that exclude all whole values to their left.
819        years_covered = years
820        years_f = t_years - years_covered
821        days_covered = years_covered * 365 + days
822        days_f = t_days - days_covered
823        hours_covered = days_covered * 24 + hours
824        hours_f = t_hours - hours_covered
825        minutes_covered = hours_covered * 60 + minutes
826        minutes_f = t_minutes - minutes_covered
827        seconds_covered = minutes_covered * 60 + seconds
828        seconds_f = t_seconds - seconds_covered
829    else:
830        years_f = days_f = hours_f = minutes_f = seconds_f = 0.0
831
832    parts: list[str] = []
833    for part, part_f, suffix in (
834        (years, years_f, 'y'),
835        (days, days_f, 'd'),
836        (hours, hours_f, 'h'),
837        (minutes, minutes_f, 'm'),
838        (seconds, seconds_f, 's'),
839    ):
840        if part or parts or (not parts and suffix == 's'):
841            # Do decimal version only for the last part.
842            if decimals and (len(parts) >= maxparts - 1 or suffix == 's'):
843                parts.append(f'{part+part_f:.{decimals}f}{suffix}')
844            else:
845                parts.append(f'{part}{suffix}')
846            if len(parts) >= maxparts:
847                break
848    return ' '.join(parts)
849
850
851def ago_str(
852    timeval: datetime.datetime,
853    maxparts: int = 1,
854    now: datetime.datetime | None = None,
855    decimals: int = 0,
856) -> str:
857    """Given a datetime, return a clean human readable 'ago' str.
858
859    Note that this is hard-coded in English so should not be used
860    for visible in-game elements; only tools/etc.
861
862    If now is not passed, efro.util.utc_now() is used.
863    """
864    if now is None:
865        now = utc_now()
866    return (
867        timedelta_str(now - timeval, maxparts=maxparts, decimals=decimals)
868        + ' ago'
869    )
870
871
872def split_list(input_list: list[T], max_length: int) -> list[list[T]]:
873    """Split a single list into smaller lists."""
874    return [
875        input_list[i : i + max_length]
876        for i in range(0, len(input_list), max_length)
877    ]
878
879
880def extract_flag(args: list[str], name: str) -> bool:
881    """Given a list of args and a flag name, returns whether it is present.
882
883    The arg flag, if present, is removed from the arg list.
884    """
885    from efro.error import CleanError
886
887    count = args.count(name)
888    if count > 1:
889        raise CleanError(f'Flag {name} passed multiple times.')
890    if not count:
891        return False
892    args.remove(name)
893    return True
894
895
896@overload
897def extract_arg(
898    args: list[str], name: str, required: Literal[False] = False
899) -> str | None: ...
900
901
902@overload
903def extract_arg(args: list[str], name: str, required: Literal[True]) -> str: ...
904
905
906def extract_arg(
907    args: list[str], name: str, required: bool = False
908) -> str | None:
909    """Given a list of args and an arg name, returns a value.
910
911    The arg flag and value are removed from the arg list.
912    Raises CleanError on any problems.
913    """
914    from efro.error import CleanError
915
916    count = args.count(name)
917    if not count:
918        if required:
919            raise CleanError(f'Required argument {name} not passed.')
920        return None
921
922    if count > 1:
923        raise CleanError(f'Arg {name} passed multiple times.')
924
925    argindex = args.index(name)
926    if argindex + 1 >= len(args):
927        raise CleanError(f'No value passed after {name} arg.')
928
929    val = args[argindex + 1]
930    del args[argindex : argindex + 2]
931
932    return val
def explicit_bool(val: bool) -> bool:
37def explicit_bool(val: bool) -> bool:
38    """Return a non-inferable boolean value.
39
40    Useful to be able to disable blocks of code without type checkers
41    complaining/etc.
42    """
43    # pylint: disable=no-else-return
44    if TYPE_CHECKING:
45        # infer this! <boom>
46        import random
47
48        return random.random() < 0.5
49    else:
50        return val

Return a non-inferable boolean value.

Useful to be able to disable blocks of code without type checkers complaining/etc.
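
For instance, a block that should stay dead at runtime but still be analyzed by type checkers might look like this (an illustrative sketch; run_debug_checks is a hypothetical helper):

    from efro.util import explicit_bool

    def run_debug_checks() -> None:
        """Hypothetical expensive checks we normally want disabled."""

    if explicit_bool(False):
        # Type checkers still analyze this branch instead of flagging it
        # as unreachable, but it never runs.
        run_debug_checks()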

def snake_case_to_title(val: str) -> str:
53def snake_case_to_title(val: str) -> str:
54    """Given a snake-case string 'foo_bar', returns 'Foo Bar'."""
55    # Kill empty words resulting from leading/trailing/multiple underscores.
56    return ' '.join(w for w in val.split('_') if w).title()

Given a snake-case string 'foo_bar', returns 'Foo Bar'.

def snake_case_to_camel_case(val: str) -> str:
59def snake_case_to_camel_case(val: str) -> str:
60    """Given a snake-case string 'foo_bar', returns camel-case 'FooBar'."""
61    # Replace underscores with spaces; capitalize words; kill spaces.
62    # Not sure about efficiency, but logically simple.
63    return val.replace('_', ' ').title().replace(' ', '')

Given a snake-case string 'foo_bar', returns camel-case 'FooBar'.
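
A quick illustration of both conversions (not part of the module's own docs):

    from efro.util import snake_case_to_camel_case, snake_case_to_title

    assert snake_case_to_title('foo_bar') == 'Foo Bar'
    assert snake_case_to_title('__foo__bar__') == 'Foo Bar'
    assert snake_case_to_camel_case('foo_bar') == 'FooBar'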

def check_utc(value: datetime.datetime) -> None:
66def check_utc(value: datetime.datetime) -> None:
67    """Ensure a datetime value is timezone-aware utc."""
68    if value.tzinfo is not datetime.UTC:
69        raise ValueError(
70            'datetime value does not have timezone set as datetime.UTC'
71        )

Ensure a datetime value is timezone-aware utc.
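
A small sketch of the intended guard behavior:

    import datetime
    from efro.util import check_utc, utc_now

    check_utc(utc_now())  # Fine; tzinfo is datetime.UTC.

    try:
        check_utc(datetime.datetime.now())  # Naive time; rejected.
    except ValueError:
        print('naive datetime rejected')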

def utc_now() -> datetime.datetime:
74def utc_now() -> datetime.datetime:
75    """Get timezone-aware current utc time.
76
77    Just a shortcut for datetime.datetime.now(datetime.UTC).
78    Avoid datetime.datetime.utcnow() which is deprecated and gives naive
79    times.
80    """
81    return datetime.datetime.now(datetime.UTC)

Get timezone-aware current utc time.

Just a shortcut for datetime.datetime.now(datetime.UTC). Avoid datetime.datetime.utcnow() which is deprecated and gives naive times.

def utc_now_naive() -> datetime.datetime:
84def utc_now_naive() -> datetime.datetime:
85    """Get naive utc time.
86
87    This can be used to replace datetime.utcnow(), which is now deprecated.
88    Most all code should migrate to use timezone-aware times instead of
89    this.
90    """
91    return datetime.datetime.now(datetime.UTC).replace(tzinfo=None)

Get naive utc time.

This can be used to replace datetime.utcnow(), which is now deprecated. Most all code should migrate to use timezone-aware times instead of this.

def utc_today() -> datetime.datetime:
94def utc_today() -> datetime.datetime:
95    """Get offset-aware midnight in the utc time zone."""
96    now = datetime.datetime.now(datetime.UTC)
97    return datetime.datetime(
98        year=now.year, month=now.month, day=now.day, tzinfo=now.tzinfo
99    )

Get offset-aware midnight in the utc time zone.

def utc_this_hour() -> datetime.datetime:
102def utc_this_hour() -> datetime.datetime:
103    """Get offset-aware beginning of the current hour in the utc time zone."""
104    now = datetime.datetime.now(datetime.UTC)
105    return datetime.datetime(
106        year=now.year,
107        month=now.month,
108        day=now.day,
109        hour=now.hour,
110        tzinfo=now.tzinfo,
111    )

Get offset-aware beginning of the current hour in the utc time zone.

def utc_this_minute() -> datetime.datetime:
114def utc_this_minute() -> datetime.datetime:
115    """Get offset-aware beginning of current minute in the utc time zone."""
116    now = datetime.datetime.now(datetime.UTC)
117    return datetime.datetime(
118        year=now.year,
119        month=now.month,
120        day=now.day,
121        hour=now.hour,
122        minute=now.minute,
123        tzinfo=now.tzinfo,
124    )

Get offset-aware beginning of current minute in the utc time zone.

def empty_weakref(objtype: type[~T]) -> weakref.ReferenceType[~T]:
127def empty_weakref(objtype: type[T]) -> weakref.ref[T]:
128    """Return an invalidated weak-reference for the specified type."""
129    # At runtime, all weakrefs are the same; our type arg is just
130    # for the static type checker.
131    del objtype  # Unused.
132
133    # Just create an object and let it die. Is there a cleaner way to do this?
134    # return weakref.ref(_EmptyObj())  # type: ignore
135
136    # Sharing a single one seems at least a bit better.
137    return _g_empty_weak_ref  # type: ignore

Return an invalidated weak-reference for the specified type.
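
Handy for initializing attributes that will later hold real weak references; a minimal sketch (Node and Graph are made-up names):

    import weakref
    from efro.util import empty_weakref

    class Node:
        pass

    class Graph:
        def __init__(self) -> None:
            # Starts out dead; swapped for a live ref once something
            # is selected.
            self.selected: weakref.ref[Node] = empty_weakref(Node)

    graph = Graph()
    assert graph.selected() is None  # Dead refs dereference to None.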

def data_size_str(bytecount: int, compact: bool = False) -> str:
140def data_size_str(bytecount: int, compact: bool = False) -> str:
141    """Given a size in bytes, returns a short human readable string.
142
143    In compact mode this should be 6 or fewer chars for most all
144    sane file sizes.
145    """
146    # pylint: disable=too-many-return-statements
147
148    # Special case: handle negatives.
149    if bytecount < 0:
150        val = data_size_str(-bytecount, compact=compact)
151        return f'-{val}'
152
153    if bytecount <= 999:
154        suffix = 'B' if compact else 'bytes'
155        return f'{bytecount} {suffix}'
156    kbytecount = bytecount / 1024
157    if round(kbytecount, 1) < 10.0:
158        return f'{kbytecount:.1f} KB'
159    if round(kbytecount, 0) < 999:
160        return f'{kbytecount:.0f} KB'
161    mbytecount = bytecount / (1024 * 1024)
162    if round(mbytecount, 1) < 10.0:
163        return f'{mbytecount:.1f} MB'
164    if round(mbytecount, 0) < 999:
165        return f'{mbytecount:.0f} MB'
166    gbytecount = bytecount / (1024 * 1024 * 1024)
167    if round(gbytecount, 1) < 10.0:
168        return f'{gbytecount:.1f} GB'
169    return f'{gbytecount:.0f} GB'

Given a size in bytes, returns a short human readable string.

In compact mode this should be 6 or fewer chars for most all sane file sizes.
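
A few example values, as the logic above formats them (illustrative, not exhaustive):

    from efro.util import data_size_str

    assert data_size_str(500) == '500 bytes'
    assert data_size_str(500, compact=True) == '500 B'
    assert data_size_str(2048) == '2.0 KB'
    assert data_size_str(1234567) == '1.2 MB'
    assert data_size_str(-2048) == '-2.0 KB'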

class DirtyBit:
172class DirtyBit:
173    """Manages whether a thing is dirty and regulates attempts to clean it.
174
175    To use, simply set the 'dirty' value on this object to True when some
176    action is needed, and then check the 'should_update' value to regulate
177    when attempts to clean it should be made. Set 'dirty' back to False after
178    a successful update.
179    If 'use_lock' is True, an asyncio Lock will be created and incorporated
180    into update attempts to prevent simultaneous updates (should_update will
181    only return True when the lock is unlocked). Note that it is up to the user
182    to lock/unlock the lock during the actual update attempt.
183    If a value is passed for 'auto_dirty_seconds', the dirtybit will flip
184    itself back to dirty after being clean for the given amount of time.
185    'min_update_interval' can be used to enforce a minimum update
186    interval even when updates are successful (retry_interval only applies
187    when updates fail).
188    """
189
190    def __init__(
191        self,
192        dirty: bool = False,
193        retry_interval: float = 5.0,
194        use_lock: bool = False,
195        auto_dirty_seconds: float | None = None,
196        min_update_interval: float | None = None,
197    ):
198        curtime = time.monotonic()
199        self._retry_interval = retry_interval
200        self._auto_dirty_seconds = auto_dirty_seconds
201        self._min_update_interval = min_update_interval
202        self._dirty = dirty
203        self._next_update_time: float | None = curtime if dirty else None
204        self._last_update_time: float | None = None
205        self._next_auto_dirty_time: float | None = (
206            (curtime + self._auto_dirty_seconds)
207            if (not dirty and self._auto_dirty_seconds is not None)
208            else None
209        )
210        self._use_lock = use_lock
211        self.lock: asyncio.Lock
212        if self._use_lock:
213            import asyncio
214
215            self.lock = asyncio.Lock()
216
217    @property
218    def dirty(self) -> bool:
219        """Whether the target is currently dirty.
220
221        This should be set to False once an update is successful.
222        """
223        return self._dirty
224
225    @dirty.setter
226    def dirty(self, value: bool) -> None:
227        # If we're freshly clean, set our next auto-dirty time (if we have
228        # one).
229        if self._dirty and not value and self._auto_dirty_seconds is not None:
230            self._next_auto_dirty_time = (
231                time.monotonic() + self._auto_dirty_seconds
232            )
233
234        # If we're freshly dirty, schedule an immediate update.
235        if not self._dirty and value:
236            self._next_update_time = time.monotonic()
237
238            # If they want to enforce a minimum update interval,
239            # push out the next update time if it hasn't been long enough.
240            if (
241                self._min_update_interval is not None
242                and self._last_update_time is not None
243            ):
244                self._next_update_time = max(
245                    self._next_update_time,
246                    self._last_update_time + self._min_update_interval,
247                )
248
249        self._dirty = value
250
251    @property
252    def should_update(self) -> bool:
253        """Whether an attempt should be made to clean the target now.
254
255        Always returns False if the target is not dirty.
256        Takes into account the amount of time passed since the target
257        was marked dirty or since should_update last returned True.
258        """
259        curtime = time.monotonic()
260
261        # Auto-dirty ourself if we're into that.
262        if (
263            self._next_auto_dirty_time is not None
264            and curtime > self._next_auto_dirty_time
265        ):
266            self.dirty = True
267            self._next_auto_dirty_time = None
268        if not self._dirty:
269            return False
270        if self._use_lock and self.lock.locked():
271            return False
272        assert self._next_update_time is not None
273        if curtime > self._next_update_time:
274            self._next_update_time = curtime + self._retry_interval
275            self._last_update_time = curtime
276            return True
277        return False

Manages whether a thing is dirty and regulates attempts to clean it.

To use, simply set the 'dirty' value on this object to True when some action is needed, and then check the 'should_update' value to regulate when attempts to clean it should be made. Set 'dirty' back to False after a successful update.

If 'use_lock' is True, an asyncio Lock will be created and incorporated into update attempts to prevent simultaneous updates (should_update will only return True when the lock is unlocked). Note that it is up to the user to lock/unlock the lock during the actual update attempt.

If a value is passed for 'auto_dirty_seconds', the dirtybit will flip itself back to dirty after being clean for the given amount of time.

'min_update_interval' can be used to enforce a minimum update interval even when updates are successful (retry_interval only applies when updates fail).

DirtyBit(dirty: bool = False, retry_interval: float = 5.0, use_lock: bool = False, auto_dirty_seconds: float | None = None, min_update_interval: float | None = None)
190    def __init__(
191        self,
192        dirty: bool = False,
193        retry_interval: float = 5.0,
194        use_lock: bool = False,
195        auto_dirty_seconds: float | None = None,
196        min_update_interval: float | None = None,
197    ):
198        curtime = time.monotonic()
199        self._retry_interval = retry_interval
200        self._auto_dirty_seconds = auto_dirty_seconds
201        self._min_update_interval = min_update_interval
202        self._dirty = dirty
203        self._next_update_time: float | None = curtime if dirty else None
204        self._last_update_time: float | None = None
205        self._next_auto_dirty_time: float | None = (
206            (curtime + self._auto_dirty_seconds)
207            if (not dirty and self._auto_dirty_seconds is not None)
208            else None
209        )
210        self._use_lock = use_lock
211        self.lock: asyncio.Lock
212        if self._use_lock:
213            import asyncio
214
215            self.lock = asyncio.Lock()
lock: asyncio.locks.Lock
dirty: bool
217    @property
218    def dirty(self) -> bool:
219        """Whether the target is currently dirty.
220
221        This should be set to False once an update is successful.
222        """
223        return self._dirty

Whether the target is currently dirty.

This should be set to False once an update is successful.

should_update: bool
251    @property
252    def should_update(self) -> bool:
253        """Whether an attempt should be made to clean the target now.
254
255        Always returns False if the target is not dirty.
256        Takes into account the amount of time passed since the target
257        was marked dirty or since should_update last returned True.
258        """
259        curtime = time.monotonic()
260
261        # Auto-dirty ourself if we're into that.
262        if (
263            self._next_auto_dirty_time is not None
264            and curtime > self._next_auto_dirty_time
265        ):
266            self.dirty = True
267            self._next_auto_dirty_time = None
268        if not self._dirty:
269            return False
270        if self._use_lock and self.lock.locked():
271            return False
272        assert self._next_update_time is not None
273        if curtime > self._next_update_time:
274            self._next_update_time = curtime + self._retry_interval
275            self._last_update_time = curtime
276            return True
277        return False

Whether an attempt should be made to clean the target now.

Always returns False if the target is not dirty. Takes into account the amount of time passed since the target was marked dirty or since should_update last returned True.
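
Putting the pieces together, a rough usage sketch (sync_to_server is a hypothetical stand-in for whatever work actually cleans the target):

    from efro.util import DirtyBit

    def sync_to_server() -> None:
        """Hypothetical stand-in for the real update work."""

    state_dirty = DirtyBit(retry_interval=5.0, min_update_interval=30.0)

    # Whenever the underlying state changes:
    state_dirty.dirty = True

    # In some periodic poll (e.g. once per second):
    if state_dirty.should_update:
        try:
            sync_to_server()
            state_dirty.dirty = False  # Success; mark clean.
        except OSError:
            # Leave dirty; should_update will flip back to True once
            # retry_interval has elapsed.
            pass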

class DispatchMethodWrapper(typing.Generic[~ArgT, ~RetT]):
280class DispatchMethodWrapper(Generic[ArgT, RetT]):
281    """Type-aware standin for the dispatch func returned by dispatchmethod."""
282
283    def __call__(self, arg: ArgT) -> RetT:
284        raise RuntimeError('Should not get here')
285
286    @staticmethod
287    def register(
288        func: Callable[[Any, Any], RetT]
289    ) -> Callable[[Any, Any], RetT]:
290        """Register a new dispatch handler for this dispatch-method."""
291        raise RuntimeError('Should not get here')
292
293    registry: dict[Any, Callable]

Type-aware standin for the dispatch func returned by dispatchmethod.

@staticmethod
def register(func: Callable[[Any, Any], ~RetT]) -> Callable[[Any, Any], ~RetT]:
286    @staticmethod
287    def register(
288        func: Callable[[Any, Any], RetT]
289    ) -> Callable[[Any, Any], RetT]:
290        """Register a new dispatch handler for this dispatch-method."""
291        raise RuntimeError('Should not get here')

Register a new dispatch handler for this dispatch-method.

registry: dict[typing.Any, typing.Callable]
def dispatchmethod(func: Callable[[Any, ~ArgT], ~RetT]) -> DispatchMethodWrapper[~ArgT, ~RetT]:
297def dispatchmethod(
298    func: Callable[[Any, ArgT], RetT]
299) -> DispatchMethodWrapper[ArgT, RetT]:
300    """A variation of functools.singledispatch for methods.
301
302    Note: as of Python 3.8 there is now functools.singledispatchmethod,
303    but it currently (as of Jan 2021) is not type-aware (at least in mypy),
304    which gives us a reason to keep this one around for now.
305    """
306    from functools import singledispatch, update_wrapper
307
308    origwrapper: Any = singledispatch(func)
309
310    # Pull this out so hopefully origwrapper can die,
311    # otherwise we reference origwrapper in our wrapper.
312    dispatch = origwrapper.dispatch
313
314    # All we do here is recreate the end of functools.singledispatch
315    # where it returns a wrapper except instead of the wrapper using the
316    # first arg to the function ours uses the second (to skip 'self').
317    # This was made against Python 3.7; we should probably check up on
318    # this in later versions in case anything has changed.
319    # (or hopefully they'll add this functionality to their version)
320    # NOTE: sounds like we can use functools singledispatchmethod in 3.8
321    def wrapper(*args: Any, **kw: Any) -> Any:
322        if not args or len(args) < 2:
323            raise TypeError(
324                f'{funcname} requires at least ' '2 positional arguments'
325            )
326
327        return dispatch(args[1].__class__)(*args, **kw)
328
329    funcname = getattr(func, '__name__', 'dispatchmethod method')
330    wrapper.register = origwrapper.register  # type: ignore
331    wrapper.dispatch = dispatch  # type: ignore
332    wrapper.registry = origwrapper.registry  # type: ignore
333    # pylint: disable=protected-access
334    wrapper._clear_cache = origwrapper._clear_cache  # type: ignore
335    update_wrapper(wrapper, func)
336    # pylint: enable=protected-access
337    return cast(DispatchMethodWrapper, wrapper)

A variation of functools.singledispatch for methods.

Note: as of Python 3.8 there is now functools.singledispatchmethod, but it currently (as of Jan 2021) is not type-aware (at least in mypy), which gives us a reason to keep this one around for now.
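
Usage mirrors functools.singledispatchmethod; a minimal sketch:

    from efro.util import dispatchmethod

    class Renderer:
        @dispatchmethod
        def draw(self, shape: object) -> str:
            return f'<unknown {type(shape).__name__}>'

        @draw.register(int)
        def _draw_int(self, shape: int) -> str:
            return f'int {shape}'

        @draw.register(str)
        def _draw_str(self, shape: str) -> str:
            return f'str {shape!r}'

    renderer = Renderer()
    assert renderer.draw(3) == 'int 3'
    assert renderer.draw('hi') == "str 'hi'"
    assert renderer.draw(1.5) == '<unknown float>'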

def valuedispatch(call: Callable[[~ValT], ~RetT]) -> ValueDispatcher[~ValT, ~RetT]:
340def valuedispatch(call: Callable[[ValT], RetT]) -> ValueDispatcher[ValT, RetT]:
341    """Decorator for functions to allow dispatching based on a value.
342
343    This differs from functools.singledispatch in that it dispatches based
344    on the value of an argument, not based on its type.
345    The 'register' method of a value-dispatch function can be used
346    to assign new functions to handle particular values.
347    Unhandled values wind up in the original dispatch function."""
348    return ValueDispatcher(call)

Decorator for functions to allow dispatching based on a value.

This differs from functools.singledispatch in that it dispatches based on the value of an argument, not based on its type. The 'register' method of a value-dispatch function can be used to assign new functions to handle particular values. Unhandled values wind up in the original dispatch function.
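
A minimal sketch:

    from enum import Enum
    from efro.util import valuedispatch

    class Color(Enum):
        RED = 'red'
        BLUE = 'blue'

    @valuedispatch
    def describe(color: Color) -> str:
        return 'some other color'

    @describe.register(Color.RED)
    def _describe_red() -> str:
        return 'warm'

    assert describe(Color.RED) == 'warm'
    assert describe(Color.BLUE) == 'some other color'

Note that registered handlers take no arguments; the dispatch value itself is implied by the registration.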

class ValueDispatcher(typing.Generic[~ValT, ~RetT]):
351class ValueDispatcher(Generic[ValT, RetT]):
352    """Used by the valuedispatch decorator"""
353
354    def __init__(self, call: Callable[[ValT], RetT]) -> None:
355        self._base_call = call
356        self._handlers: dict[ValT, Callable[[], RetT]] = {}
357
358    def __call__(self, value: ValT) -> RetT:
359        handler = self._handlers.get(value)
360        if handler is not None:
361            return handler()
362        return self._base_call(value)
363
364    def _add_handler(
365        self, value: ValT, call: Callable[[], RetT]
366    ) -> Callable[[], RetT]:
367        if value in self._handlers:
368            raise RuntimeError(f'Duplicate handlers added for {value}')
369        self._handlers[value] = call
370        return call
371
372    def register(
373        self, value: ValT
374    ) -> Callable[[Callable[[], RetT]], Callable[[], RetT]]:
375        """Add a handler to the dispatcher."""
376        from functools import partial
377
378        return partial(self._add_handler, value)

Used by the valuedispatch decorator

ValueDispatcher(call: Callable[[~ValT], ~RetT])
354    def __init__(self, call: Callable[[ValT], RetT]) -> None:
355        self._base_call = call
356        self._handlers: dict[ValT, Callable[[], RetT]] = {}
def register(self, value: ~ValT) -> Callable[[Callable[[], ~RetT]], Callable[[], ~RetT]]:
372    def register(
373        self, value: ValT
374    ) -> Callable[[Callable[[], RetT]], Callable[[], RetT]]:
375        """Add a handler to the dispatcher."""
376        from functools import partial
377
378        return partial(self._add_handler, value)

Add a handler to the dispatcher.

def valuedispatch1arg(call: Callable[[~ValT, ~ArgT], ~RetT]) -> ValueDispatcher1Arg[~ValT, ~ArgT, ~RetT]:
381def valuedispatch1arg(
382    call: Callable[[ValT, ArgT], RetT]
383) -> ValueDispatcher1Arg[ValT, ArgT, RetT]:
384    """Like valuedispatch but for functions taking an extra argument."""
385    return ValueDispatcher1Arg(call)

Like valuedispatch but for functions taking an extra argument.
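
Same pattern as valuedispatch, except registered handlers receive the extra argument; a sketch:

    from enum import Enum
    from efro.util import valuedispatch1arg

    class Op(Enum):
        DOUBLE = 'double'
        SQUARE = 'square'

    @valuedispatch1arg
    def apply_op(op: Op, value: int) -> int:
        raise ValueError(f'unhandled op: {op}')

    @apply_op.register(Op.DOUBLE)
    def _double(value: int) -> int:
        return value * 2

    assert apply_op(Op.DOUBLE, 5) == 10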

class ValueDispatcher1Arg(typing.Generic[~ValT, ~ArgT, ~RetT]):
388class ValueDispatcher1Arg(Generic[ValT, ArgT, RetT]):
389    """Used by the valuedispatch1arg decorator"""
390
391    def __init__(self, call: Callable[[ValT, ArgT], RetT]) -> None:
392        self._base_call = call
393        self._handlers: dict[ValT, Callable[[ArgT], RetT]] = {}
394
395    def __call__(self, value: ValT, arg: ArgT) -> RetT:
396        handler = self._handlers.get(value)
397        if handler is not None:
398            return handler(arg)
399        return self._base_call(value, arg)
400
401    def _add_handler(
402        self, value: ValT, call: Callable[[ArgT], RetT]
403    ) -> Callable[[ArgT], RetT]:
404        if value in self._handlers:
405            raise RuntimeError(f'Duplicate handlers added for {value}')
406        self._handlers[value] = call
407        return call
408
409    def register(
410        self, value: ValT
411    ) -> Callable[[Callable[[ArgT], RetT]], Callable[[ArgT], RetT]]:
412        """Add a handler to the dispatcher."""
413        from functools import partial
414
415        return partial(self._add_handler, value)

Used by the valuedispatch1arg decorator

ValueDispatcher1Arg(call: Callable[[~ValT, ~ArgT], ~RetT])
391    def __init__(self, call: Callable[[ValT, ArgT], RetT]) -> None:
392        self._base_call = call
393        self._handlers: dict[ValT, Callable[[ArgT], RetT]] = {}
def register(self, value: ~ValT) -> Callable[[Callable[[~ArgT], ~RetT]], Callable[[~ArgT], ~RetT]]:
409    def register(
410        self, value: ValT
411    ) -> Callable[[Callable[[ArgT], RetT]], Callable[[ArgT], RetT]]:
412        """Add a handler to the dispatcher."""
413        from functools import partial
414
415        return partial(self._add_handler, value)

Add a handler to the dispatcher.

def valuedispatchmethod(call: Callable[[~SelfT, ~ValT], ~RetT]) -> efro.util.ValueDispatcherMethod[~ValT, ~RetT]:
432def valuedispatchmethod(
433    call: Callable[[SelfT, ValT], RetT]
434) -> ValueDispatcherMethod[ValT, RetT]:
435    """Like valuedispatch but works with methods instead of functions."""
436
437    # NOTE: It seems that to wrap a method with a decorator and have self
438    # dispatching do the right thing, we must return a function and not
439    # an executable object. So for this version we store our data here
440    # in the function call dict and simply return a call.
441
442    _base_call = call
443    _handlers: dict[ValT, Callable[[SelfT], RetT]] = {}
444
445    def _add_handler(value: ValT, addcall: Callable[[SelfT], RetT]) -> None:
446        if value in _handlers:
447            raise RuntimeError(f'Duplicate handlers added for {value}')
448        _handlers[value] = addcall
449
450    def _register(value: ValT) -> Callable[[Callable[[SelfT], RetT]], None]:
451        from functools import partial
452
453        return partial(_add_handler, value)
454
455    def _call_wrapper(self: SelfT, value: ValT) -> RetT:
456        handler = _handlers.get(value)
457        if handler is not None:
458            return handler(self)
459        return _base_call(self, value)
460
461    # We still want to use our returned object to register handlers, but we're
462    # actually just returning a function. So manually stuff the call onto it.
463    setattr(_call_wrapper, 'register', _register)
464
465    # To the type checker's eyes we return a ValueDispatchMethod instance;
466    # this lets it know about our register func and type-check its usage.
467    # In reality we just return a raw function call (for reasons listed above).
468    # pylint: disable=undefined-variable, no-else-return
469    if TYPE_CHECKING:
470        return ValueDispatcherMethod[ValT, RetT]()
471    else:
472        return _call_wrapper

Like valuedispatch but works with methods instead of functions.
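
A usage sketch for the method flavor (the Fruit enum and Pricer class are hypothetical):

    from enum import Enum
    from efro.util import valuedispatchmethod

    class Fruit(Enum):
        APPLE = 'apple'
        PEAR = 'pear'

    class Pricer:
        @valuedispatchmethod
        def price(self, fruit: Fruit) -> int:
            # Base call; used when no handler is registered for the value.
            return 1

        @price.register(Fruit.PEAR)
        def _price_pear(self) -> int:
            return 3

    assert Pricer().price(Fruit.PEAR) == 3   # Registered handler.
    assert Pricer().price(Fruit.APPLE) == 1  # Base call.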

def make_hash(obj: Any) -> int:
475def make_hash(obj: Any) -> int:
476    """Makes a hash from a dictionary, list, tuple or set to any level,
477    that contains only other hashable types (including any lists, tuples,
478    sets, and dictionaries).
479
480    Note that this uses Python's hash() function internally so collisions/etc.
481    may be more common than with fancy cryptographic hashes.
482
483    Also be aware that Python's hash() output varies across processes, so
484    this should only be used for values that will remain in a single process.
485    """
486    import copy
487
488    if isinstance(obj, (set, tuple, list)):
489        return hash(tuple(make_hash(e) for e in obj))
490    if not isinstance(obj, dict):
491        return hash(obj)
492
493    new_obj = copy.deepcopy(obj)
494    for k, v in new_obj.items():
495        new_obj[k] = make_hash(v)
496
497    # NOTE: sorted() works correctly here because it compares only the
498    # unique first values (i.e. dict keys).
499    return hash(tuple(frozenset(sorted(new_obj.items()))))

Makes a hash from a dictionary, list, tuple, or set, nested to any level, as long as it contains only other hashable types (including any lists, tuples, sets, and dictionaries).

Note that this uses Python's hash() function internally so collisions/etc. may be more common than with fancy cryptographic hashes.

Also be aware that Python's hash() output varies across processes, so this should only be used for values that will remain in a single process.
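
A quick sketch showing the order-independence for dict keys (the config values here are made up):

    from efro.util import make_hash

    config_a = {'name': 'test', 'values': [1, 2, 3], 'nested': {'x': 1.5}}
    config_b = {'nested': {'x': 1.5}, 'values': [1, 2, 3], 'name': 'test'}

    # Key order does not matter; nested containers are hashed recursively.
    # (Remember the result is only stable within a single process.)
    assert make_hash(config_a) == make_hash(config_b)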

def float_hash_from_string(s: str) -> float:
502def float_hash_from_string(s: str) -> float:
503    """Given a string value, returns a float between 0 and 1.
504
505    Is consistent across processes. Can be useful for assigning shard
506    values to db ids for efficient parallel processing.
507    """
508    import hashlib
509
510    hash_bytes = hashlib.md5(s.encode()).digest()
511
512    # Generate a random 64 bit int from hash digest bytes.
513    ival = int.from_bytes(hash_bytes[:8])
514    return ival / ((1 << 64) - 1)

Given a string value, returns a float between 0 and 1.

Is consistent across processes. Can be useful for assigning shard values to db ids for efficient parallel processing.
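
A small sharding sketch (the id string and shard count are arbitrary):

    from efro.util import float_hash_from_string

    # Unlike Python's built-in hash(), this is stable across processes.
    val = float_hash_from_string('user-12345')
    assert 0.0 <= val <= 1.0

    # For example, spread ids across 10 shards:
    shard = int(val * 10) % 10
    print(f'user-12345 -> shard {shard}')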

def asserttype(obj: Any, typ: type[~T]) -> ~T:
517def asserttype(obj: Any, typ: type[T]) -> T:
518    """Return an object typed as a given type.
519
520    Assert is used to check its actual type, so only use this when
521    failures are not expected. Otherwise use checktype.
522    """
523    assert isinstance(typ, type), 'only actual types accepted'
524    assert isinstance(obj, typ)
525    return obj

Return an object typed as a given type.

Assert is used to check its actual type, so only use this when failures are not expected. Otherwise use checktype.
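
A typical use is narrowing an Any value for the type checker (the settings dict here is hypothetical):

    from typing import Any
    from efro.util import asserttype

    def get_max_players(settings: dict[str, Any]) -> int:
        # We expect this to always be an int; asserttype documents that,
        # verifies it when asserts are enabled, and satisfies type checkers.
        return asserttype(settings['max_players'], int)

    assert get_max_players({'max_players': 8}) == 8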

def asserttype_o(obj: Any, typ: type[~T]) -> Optional[~T]:
528def asserttype_o(obj: Any, typ: type[T]) -> T | None:
529    """Return an object typed as a given optional type.
530
531    Assert is used to check its actual type, so only use this when
532    failures are not expected. Otherwise use checktype.
533    """
534    assert isinstance(typ, type), 'only actual types accepted'
535    assert isinstance(obj, (typ, type(None)))
536    return obj

Return an object typed as a given optional type.

Assert is used to check its actual type, so only use this when failures are not expected. Otherwise use checktype.

def checktype(obj: Any, typ: type[~T]) -> ~T:
539def checktype(obj: Any, typ: type[T]) -> T:
540    """Return an object typed as a given type.
541
542    Always checks the type at runtime with isinstance and throws a TypeError
543    on failure. Use asserttype for more efficient (but less safe) equivalent.
544    """
545    assert isinstance(typ, type), 'only actual types accepted'
546    if not isinstance(obj, typ):
547        raise TypeError(f'Expected a {typ}; got a {type(obj)}.')
548    return obj

Return an object typed as a given type.

Always checks the type at runtime with isinstance and throws a TypeError on failure. Use asserttype for a more efficient (but less safe) equivalent.
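
A sketch contrasting this with asserttype (the config dict is hypothetical):

    from typing import Any
    from efro.util import checktype

    def load_port(config: dict[str, Any]) -> int:
        # Unlike asserttype, this check still runs with asserts disabled
        # (python -O) and raises TypeError on bad data.
        return checktype(config.get('port', 8080), int)

    assert load_port({'port': 4343}) == 4343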

def checktype_o(obj: Any, typ: type[~T]) -> Optional[~T]:
551def checktype_o(obj: Any, typ: type[T]) -> T | None:
552    """Return an object typed as a given optional type.
553
554    Always checks the type at runtime with isinstance and throws a TypeError
555    on failure. Use asserttype for more efficient (but less safe) equivalent.
556    """
557    assert isinstance(typ, type), 'only actual types accepted'
558    if not isinstance(obj, (typ, type(None))):
559        raise TypeError(f'Expected a {typ} or None; got a {type(obj)}.')
560    return obj

Return an object typed as a given optional type.

Always checks the type at runtime with isinstance and throws a TypeError on failure. Use asserttype for a more efficient (but less safe) equivalent.

def warntype(obj: Any, typ: type[~T]) -> ~T:
563def warntype(obj: Any, typ: type[T]) -> T:
564    """Return an object typed as a given type.
565
566    Always checks the type at runtime and simply logs a warning if it is
567    not what is expected.
568    """
569    assert isinstance(typ, type), 'only actual types accepted'
570    if not isinstance(obj, typ):
571        import logging
572
573        logging.warning('warntype: expected a %s, got a %s', typ, type(obj))
574    return obj  # type: ignore

Return an object typed as a given type.

Always checks the type at runtime and simply logs a warning if it is not what is expected.

def warntype_o(obj: Any, typ: type[~T]) -> Optional[~T]:
577def warntype_o(obj: Any, typ: type[T]) -> T | None:
578    """Return an object typed as a given type.
579
580    Always checks the type at runtime and simply logs a warning if it is
581    not what is expected.
582    """
583    assert isinstance(typ, type), 'only actual types accepted'
584    if not isinstance(obj, (typ, type(None))):
585        import logging
586
587        logging.warning(
588            'warntype: expected a %s or None, got a %s', typ, type(obj)
589        )
590    return obj  # type: ignore

Return an object typed as a given optional type.

Always checks the type at runtime and simply logs a warning if it is not what is expected.

def assert_non_optional(obj: Optional[~T]) -> ~T:
593def assert_non_optional(obj: T | None) -> T:
594    """Return an object with Optional typing removed.
595
596    Assert is used to check its actual type, so only use this when
597    failures are not expected. Use check_non_optional otherwise.
598    """
599    assert obj is not None
600    return obj

Return an object with Optional typing removed.

Assert is used to check its actual type, so only use this when failures are not expected. Use check_non_optional otherwise.

def check_non_optional(obj: Optional[~T]) -> ~T:
603def check_non_optional(obj: T | None) -> T:
604    """Return an object with Optional typing removed.
605
606    Always checks the actual type and throws a ValueError on failure.
607    Use assert_non_optional for a more efficient (but less safe) equivalent.
608    """
609    if obj is None:
610        raise ValueError('Got None value in check_non_optional.')
611    return obj

Return an object with Optional typing removed.

Always checks the actual type and throws a ValueError on failure. Use assert_non_optional for a more efficient (but less safe) equivalent.
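
A short sketch of narrowing an optional value (the lookup helper is hypothetical):

    from efro.util import check_non_optional

    def lookup(values: dict[str, str], key: str) -> str:
        # dict.get() gives str | None; this narrows it to str and raises
        # if the key is genuinely missing.
        return check_non_optional(values.get(key))

    assert lookup({'a': 'apple'}, 'a') == 'apple'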

def smoothstep(edge0: float, edge1: float, x: float) -> float:
614def smoothstep(edge0: float, edge1: float, x: float) -> float:
615    """A smooth transition function.
616
617    Returns a value that smoothly moves from 0 to 1 as we go between edges.
618    Values outside of the range return 0 or 1.
619    """
620    y = min(1.0, max(0.0, (x - edge0) / (edge1 - edge0)))
621    return y * y * (3.0 - 2.0 * y)

A smooth transition function.

Returns a value that smoothly moves from 0 to 1 as we go between edges. Values outside of the range return 0 or 1.

def linearstep(edge0: float, edge1: float, x: float) -> float:
624def linearstep(edge0: float, edge1: float, x: float) -> float:
625    """A linear transition function.
626
627    Returns a value that linearly moves from 0 to 1 as we go between edges.
628    Values outside of the range return 0 or 1.
629    """
630    return max(0.0, min(1.0, (x - edge0) / (edge1 - edge0)))

A linear transition function.

Returns a value that linearly moves from 0 to 1 as we go between edges. Values outside of the range return 0 or 1.
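
A few worked values for both transition functions (edges chosen arbitrarily):

    from efro.util import linearstep, smoothstep

    # Both map x within [edge0, edge1] onto [0, 1] and clamp outside it.
    assert linearstep(0.0, 10.0, 5.0) == 0.5
    assert linearstep(0.0, 10.0, -3.0) == 0.0      # Clamped below the range.
    assert smoothstep(0.0, 10.0, 5.0) == 0.5       # 0.5*0.5*(3 - 2*0.5)
    assert smoothstep(0.0, 10.0, 2.5) == 0.15625   # Eases in near the edges.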

def human_readable_compact_id(num: int) -> str:
649def human_readable_compact_id(num: int) -> str:
650    """Given a positive int, return a compact string representation for it.
651
652    Handy for visualizing unique numeric ids using as few as possible chars.
653    This representation uses only lowercase letters and numbers (minus the
654    following letters for readability):
655     's' is excluded due to similarity to '5'.
656     'l' is excluded due to similarity to '1'.
657     'i' is excluded due to similarity to '1'.
658     'o' is excluded due to similarity to '0'.
659     'z' is excluded due to similarity to '2'.
660
661    Therefore for n chars this can store values of 31^n.
662
663    When reading human input consisting of these IDs, it may be desirable
664    to map the disallowed chars to their corresponding allowed ones
665    ('o' -> '0', etc).
666
667    Sort order for these ids is the same as the original numbers.
668
669    If more compactness is desired at the expense of readability,
670    use compact_id() instead.
671    """
672    return _compact_id(num, '0123456789abcdefghjkmnpqrtuvwxy')

Given a positive int, return a compact string representation for it.

Handy for visualizing unique numeric ids using as few as possible chars. This representation uses only lowercase letters and numbers (minus the following letters for readability):
 's' is excluded due to similarity to '5'.
 'l' is excluded due to similarity to '1'.
 'i' is excluded due to similarity to '1'.
 'o' is excluded due to similarity to '0'.
 'z' is excluded due to similarity to '2'.

Therefore for n chars this can store values of 31^n.

When reading human input consisting of these IDs, it may be desirable to map the disallowed chars to their corresponding allowed ones ('o' -> '0', etc).

Sort order for these ids is the same as the original numbers.

If more compactness is desired at the expense of readability, use compact_id() instead.

def compact_id(num: int) -> str:
675def compact_id(num: int) -> str:
676    """Given a positive int, return a compact string representation for it.
677
678    Handy for visualizing unique numeric ids using as few as possible chars.
679    This version is more compact than human_readable_compact_id() but less
680    friendly to humans due to using both capital and lowercase letters,
681    both 'O' and '0', etc.
682
683    Therefore for n chars this can store values of 62^n.
684
685    Sort order for these ids is the same as the original numbers.
686    """
687    return _compact_id(
688        num, '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'
689    )

Given a positive int, return a compact string representation for it.

Handy for visualizing unique numeric ids using as few as possible chars. This version is more compact than human_readable_compact_id() but less friendly to humans due to using both capital and lowercase letters, both 'O' and '0', etc.

Therefore for n chars this can store values of 62^n.

Sort order for these ids is the same as the original numbers.
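
A quick comparison of the two encodings (the numbers are arbitrary; exact output strings depend on the internal _compact_id encoding, so none are asserted here):

    from efro.util import compact_id, human_readable_compact_id

    # Both forms stay short and sort in the same order as the numbers.
    for num in (0, 42, 123456789):
        print(num, compact_id(num), human_readable_compact_id(num))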

def caller_source_location() -> str:
692def caller_source_location() -> str:
693    """Returns source file name and line of the code calling us.
694
695    Example: 'mymodule.py:23'
696    """
697    try:
698        import inspect
699
700        frame = inspect.currentframe()
701        for _i in range(2):
702            if frame is None:
703                raise RuntimeError()
704            frame = frame.f_back
705        if frame is None:
706            raise RuntimeError()
707        fname = os.path.basename(frame.f_code.co_filename)
708        return f'{fname}:{frame.f_lineno}'
709    except Exception:
710        return '<unknown source location>'

Returns source file name and line of the code calling us.

Example: 'mymodule.py:23'
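
A sketch of the intended pattern: call this inside a helper so it reports where that helper was invoked (warn_deprecated is hypothetical):

    from efro.util import caller_source_location

    def warn_deprecated(name: str) -> None:
        # Reports the location of warn_deprecated's caller, e.g. 'mygame.py:57'.
        print(f'{name} is deprecated (called from {caller_source_location()})')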

def unchanging_hostname() -> str:
713def unchanging_hostname() -> str:
714    """Return an unchanging name for the local device.
715
716    Similar to the `hostname` call (or os.uname().nodename in Python)
717    except attempts to give a name that doesn't change depending on
718    network conditions. (A Mac will tend to go from Foo to Foo.local,
719    Foo.lan etc. throughout its various adventures)
720    """
721    import platform
722    import subprocess
723
724    # On Mac, this should give the computer name assigned in System Prefs.
725    if platform.system() == 'Darwin':
726        return (
727            subprocess.run(
728                ['scutil', '--get', 'ComputerName'],
729                check=True,
730                capture_output=True,
731            )
732            .stdout.decode()
733            .strip()
734            .replace(' ', '-')
735        )
736    return os.uname().nodename

Return an unchanging name for the local device.

Similar to the hostname call (or os.uname().nodename in Python) except attempts to give a name that doesn't change depending on network conditions. (A Mac will tend to go from Foo to Foo.local, Foo.lan etc. throughout its various adventures)

def set_canonical_module_names(module_globals: dict[str, typing.Any]) -> None:
739def set_canonical_module_names(module_globals: dict[str, Any]) -> None:
740    """Do the thing."""
741    if os.environ.get('EFRO_SUPPRESS_SET_CANONICAL_MODULE_NAMES') == '1':
742        return
743
744    modulename = module_globals.get('__name__')
745    if not isinstance(modulename, str):
746        raise RuntimeError('Unable to get module name.')
747    assert not modulename.startswith('_')
748    modulename_prefix = f'{modulename}.'
749    modulename_prefix_2 = f'_{modulename}.'
750
751    for name, obj in module_globals.items():
752        if name.startswith('_'):
753            continue
754        existing = getattr(obj, '__module__', None)
755        try:
756            # Override the module ONLY if it lives under us somewhere.
757            # So ourpackage._submodule.Foo becomes ourpackage.Foo
758            # but otherpackage._submodule.Foo remains untouched.
759            if existing is not None and (
760                existing.startswith(modulename_prefix)
761                or existing.startswith(modulename_prefix_2)
762            ):
763                obj.__module__ = modulename
764        except Exception:
765            import logging
766
767            logging.warning(
768                'set_canonical_module_names: unable to change __module__'
769                " from '%s' to '%s' on %s object at '%s'.",
770                existing,
771                modulename,
772                type(obj),
773                name,
774            )

Set the canonical __module__ on a module's public objects. Any public object whose __module__ lives under the given module (e.g. ourpackage._submodule.Foo) is updated to report that module directly (ourpackage.Foo). Setting the EFRO_SUPPRESS_SET_CANONICAL_MODULE_NAMES environment variable to '1' disables this.
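
A sketch of typical usage in a package __init__.py (mypackage and its _internal submodule are hypothetical):

    # mypackage/__init__.py
    from efro.util import set_canonical_module_names

    from mypackage._internal import Widget, Gadget

    # After this, Widget.__module__ is 'mypackage' instead of
    # 'mypackage._internal'. Set EFRO_SUPPRESS_SET_CANONICAL_MODULE_NAMES=1
    # to leave everything untouched.
    set_canonical_module_names(globals())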

def timedelta_str( timeval: datetime.timedelta | float, maxparts: int = 2, decimals: int = 0) -> str:
777def timedelta_str(
778    timeval: datetime.timedelta | float, maxparts: int = 2, decimals: int = 0
779) -> str:
780    """Return a simple human readable time string for a length of time.
781
782    Time can be given as a timedelta or a float representing seconds.
783    Example output:
784      "23d 1h 2m 32s" (with maxparts == 4)
785      "23d 1h" (with maxparts == 2)
786      "23d 1.08h" (with maxparts == 2 and decimals == 2)
787
788    Note that this is hard-coded in English and probably not especially
789    performant.
790    """
791    # pylint: disable=too-many-locals
792
793    if isinstance(timeval, float):
794        timevalfin = datetime.timedelta(seconds=timeval)
795    else:
796        timevalfin = timeval
797
798    # Internally we only handle positive values.
799    if timevalfin.total_seconds() < 0:
800        return f'-{timedelta_str(-timeval, maxparts=maxparts, decimals=decimals)}'
801
802    years = timevalfin.days // 365
803    days = timevalfin.days % 365
804    hours = timevalfin.seconds // 3600
805    hour_remainder = timevalfin.seconds % 3600
806    minutes = hour_remainder // 60
807    seconds = hour_remainder % 60
808
809    # Now, if we want decimal places for our last value,
810    # calc fractional parts.
811    if decimals:
812        # Calc totals of each type.
813        t_seconds = timevalfin.total_seconds()
814        t_minutes = t_seconds / 60
815        t_hours = t_minutes / 60
816        t_days = t_hours / 24
817        t_years = t_days / 365
818
819        # Calc fractional parts that exclude all whole values to their left.
820        years_covered = years
821        years_f = t_years - years_covered
822        days_covered = years_covered * 365 + days
823        days_f = t_days - days_covered
824        hours_covered = days_covered * 24 + hours
825        hours_f = t_hours - hours_covered
826        minutes_covered = hours_covered * 60 + minutes
827        minutes_f = t_minutes - minutes_covered
828        seconds_covered = minutes_covered * 60 + seconds
829        seconds_f = t_seconds - seconds_covered
830    else:
831        years_f = days_f = hours_f = minutes_f = seconds_f = 0.0
832
833    parts: list[str] = []
834    for part, part_f, suffix in (
835        (years, years_f, 'y'),
836        (days, days_f, 'd'),
837        (hours, hours_f, 'h'),
838        (minutes, minutes_f, 'm'),
839        (seconds, seconds_f, 's'),
840    ):
841        if part or parts or (not parts and suffix == 's'):
842            # Do decimal version only for the last part.
843            if decimals and (len(parts) >= maxparts - 1 or suffix == 's'):
844                parts.append(f'{part+part_f:.{decimals}f}{suffix}')
845            else:
846                parts.append(f'{part}{suffix}')
847            if len(parts) >= maxparts:
848                break
849    return ' '.join(parts)

Return a simple human readable time string for a length of time.

Time can be given as a timedelta or a float representing seconds. Example output:
  "23d 1h 2m 32s" (with maxparts == 4)
  "23d 1h" (with maxparts == 2)
  "23d 1.08h" (with maxparts == 2 and decimals == 2)

Note that this is hard-coded in English and probably not especially performant.
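
A few concrete calls, the first two matching the example output listed above (the float case is an extra illustration):

    import datetime
    from efro.util import timedelta_str

    span = datetime.timedelta(days=23, hours=1, minutes=2, seconds=32)
    assert timedelta_str(span, maxparts=4) == '23d 1h 2m 32s'
    assert timedelta_str(span, maxparts=2) == '23d 1h'

    # Floats are treated as seconds; decimals apply to the last part shown.
    assert timedelta_str(90.5, maxparts=2, decimals=1) == '1m 30.5s'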

def ago_str( timeval: datetime.datetime, maxparts: int = 1, now: datetime.datetime | None = None, decimals: int = 0) -> str:
852def ago_str(
853    timeval: datetime.datetime,
854    maxparts: int = 1,
855    now: datetime.datetime | None = None,
856    decimals: int = 0,
857) -> str:
858    """Given a datetime, return a clean human readable 'ago' str.
859
860    Note that this is hard-coded in English so should not be used
861    for visible in-game elements; only tools/etc.
862
863    If now is not passed, efro.util.utc_now() is used.
864    """
865    if now is None:
866        now = utc_now()
867    return (
868        timedelta_str(now - timeval, maxparts=maxparts, decimals=decimals)
869        + ' ago'
870    )

Given a datetime, return a clean human readable 'ago' str.

Note that this is hard-coded in English so should not be used for visible in-game elements; only tools/etc.

If now is not passed, utc_now() is used.
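
A small sketch (output shown as comments since it depends on the current time):

    import datetime
    from efro.util import ago_str, utc_now

    event = utc_now() - datetime.timedelta(hours=3, minutes=12)
    print(ago_str(event))              # e.g. '3h ago'
    print(ago_str(event, maxparts=2))  # e.g. '3h 12m ago'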

def split_list(input_list: list[~T], max_length: int) -> list[list[~T]]:
873def split_list(input_list: list[T], max_length: int) -> list[list[T]]:
874    """Split a single list into smaller lists."""
875    return [
876        input_list[i : i + max_length]
877        for i in range(0, len(input_list), max_length)
878    ]

Split a single list into smaller lists.
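
For example:

    from efro.util import split_list

    assert split_list([1, 2, 3, 4, 5, 6, 7], max_length=3) == [
        [1, 2, 3],
        [4, 5, 6],
        [7],
    ]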

def extract_flag(args: list[str], name: str) -> bool:
881def extract_flag(args: list[str], name: str) -> bool:
882    """Given a list of args and a flag name, returns whether it is present.
883
884    The arg flag, if present, is removed from the arg list.
885    """
886    from efro.error import CleanError
887
888    count = args.count(name)
889    if count > 1:
890        raise CleanError(f'Flag {name} passed multiple times.')
891    if not count:
892        return False
893    args.remove(name)
894    return True

Given a list of args and a flag name, returns whether it is present.

The arg flag, if present, is removed from the arg list.
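
A quick sketch (the flag names are made up):

    from efro.util import extract_flag

    args = ['--verbose', 'build', '--clean']
    assert extract_flag(args, '--clean') is True
    assert extract_flag(args, '--dry-run') is False
    assert args == ['--verbose', 'build']  # The found flag was removed.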

def extract_arg(args: list[str], name: str, required: bool = False) -> str | None:
907def extract_arg(
908    args: list[str], name: str, required: bool = False
909) -> str | None:
910    """Given a list of args and an arg name, returns a value.
911
912    The arg flag and value are removed from the arg list.
913    Raises CleanError on any problems.
914    """
915    from efro.error import CleanError
916
917    count = args.count(name)
918    if not count:
919        if required:
920            raise CleanError(f'Required argument {name} not passed.')
921        return None
922
923    if count > 1:
924        raise CleanError(f'Arg {name} passed multiple times.')
925
926    argindex = args.index(name)
927    if argindex + 1 >= len(args):
928        raise CleanError(f'No value passed after {name} arg.')
929
930    val = args[argindex + 1]
931    del args[argindex : argindex + 2]
932
933    return val

Given a list of args and an arg name, returns a value.

The arg flag and value are removed from the arg list. Raises CleanError on any problems.
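
A quick sketch (the arg names are made up):

    from efro.util import extract_arg

    args = ['deploy', '--env', 'staging', '--fast']
    assert extract_arg(args, '--env') == 'staging'
    assert args == ['deploy', '--fast']           # Flag and value removed.
    assert extract_arg(args, '--region') is None  # Missing and not required.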