efro.util

Small handy bits of functionality.

  1# Released under the MIT License. See LICENSE for details.
  2#
  3"""Small handy bits of functionality."""
  4
  5from __future__ import annotations
  6
  7import os
  8import time
  9import weakref
 10import functools
 11import datetime
 12from enum import Enum
 13from typing import TYPE_CHECKING, cast, TypeVar, Generic, overload, ParamSpec
 14
 15if TYPE_CHECKING:
 16    import asyncio
 17    from typing import Any, Callable, Literal
 18
 19T = TypeVar('T')
 20ValT = TypeVar('ValT')
 21ArgT = TypeVar('ArgT')
 22SelfT = TypeVar('SelfT')
 23RetT = TypeVar('RetT')
 24EnumT = TypeVar('EnumT', bound=Enum)
 25
 26P = ParamSpec('P')
 27
 28
 29class _EmptyObj:
 30    pass
 31
 32
 33# A dead weak-ref should be immutable, right? So we can create exactly
 34# one and return it for all cases that need an empty weak-ref.
 35_g_empty_weak_ref = weakref.ref(_EmptyObj())
 36assert _g_empty_weak_ref() is None
 37
 38# Note to self: adding a special form of partial for when we don't need
 39# to pass further args/kwargs (which I think is most cases). Even though
 40# partial is now type-checked in Mypy (as of Nov 2024) there are still some
 41# pitfalls that this avoids (see func docs below). Perhaps it would make
 42# sense to simply define a Call class for this purpose; it might be more
 43# efficient than wrapping partial anyway (should test this).
 44if TYPE_CHECKING:
 45
 46    def strict_partial(
 47        func: Callable[P, T], *args: P.args, **kwargs: P.kwargs
 48    ) -> Callable[[], T]:
 49        """A version of functools.partial requiring all args to be passed.
 50
 51        This helps avoid pitfalls where a function is wrapped in a
 52        partial but then an extra required arg is added to the function
 53        but no type checking error is triggered at usage sites because
 54        vanilla partial assumes that extra arg will be provided at call
 55        time.
 56
 57        Note: it would seem like this pitfall could also be avoided on
 58        the back end by ensuring that the thing accepting the partial
 59        asks for Callable[[], None] instead of just Callable, but as of
 60        Nov 2024 it seems that Mypy does not support this; it in fact
 61        allows partials to be passed for any callable signature(!).
 62        """
 63        ...
 64
 65else:
 66    strict_partial = functools.partial
 67
 68
 69def explicit_bool(val: bool) -> bool:
 70    """Return a non-inferable boolean value.
 71
 72    Useful to be able to disable blocks of code without type checkers
 73    complaining/etc.
 74    """
 75    # pylint: disable=no-else-return
 76    if TYPE_CHECKING:
 77        # infer this! <boom>
 78        import random
 79
 80        return random.random() < 0.5
 81    else:
 82        return val
 83
 84
 85def snake_case_to_title(val: str) -> str:
 86    """Given a snake-case string 'foo_bar', returns 'Foo Bar'."""
 87    # Kill empty words resulting from leading/trailing/multiple underscores.
 88    return ' '.join(w for w in val.split('_') if w).title()
 89
 90
 91def snake_case_to_camel_case(val: str) -> str:
 92    """Given a snake-case string 'foo_bar', returns camel-case 'FooBar'."""
 93    # Replace underscores with spaces; capitalize words; kill spaces.
 94    # Not sure about efficiency, but logically simple.
 95    return val.replace('_', ' ').title().replace(' ', '')
 96
 97
 98def check_utc(value: datetime.datetime) -> None:
 99    """Ensure a datetime value is timezone-aware utc."""
100    if value.tzinfo is not datetime.UTC:
101        raise ValueError(
102            'datetime value does not have timezone set as datetime.UTC'
103        )
104
105
106def utc_now() -> datetime.datetime:
107    """Get timezone-aware current utc time.
108
109    Just a shortcut for datetime.datetime.now(datetime.UTC).
110    Avoid datetime.datetime.utcnow() which is deprecated and gives naive
111    times.
112    """
113    return datetime.datetime.now(datetime.UTC)
114
115
116def utc_now_naive() -> datetime.datetime:
117    """Get naive utc time.
118
119    This can be used to replace datetime.utcnow(), which is now deprecated.
120    Most all code should migrate to use timezone-aware times instead of
121    relying on this.
122    """
123    return datetime.datetime.now(datetime.UTC).replace(tzinfo=None)
124
125
126def utc_from_timestamp_naive(timestamp: float) -> datetime.datetime:
127    """Get a naive utc time from a timestamp.
128
129    This can be used to replace datetime.utcfromtimestamp(), which is now
130    deprecated. Most all code should migrate to use timezone-aware times
131    instead of relying on this.
132    """
133
134    return datetime.datetime.fromtimestamp(timestamp, tz=datetime.UTC).replace(
135        tzinfo=None
136    )
137
138
139def utc_today() -> datetime.datetime:
140    """Get offset-aware midnight in the utc time zone."""
141    now = datetime.datetime.now(datetime.UTC)
142    return datetime.datetime(
143        year=now.year, month=now.month, day=now.day, tzinfo=now.tzinfo
144    )
145
146
147def utc_this_hour() -> datetime.datetime:
148    """Get offset-aware beginning of the current hour in the utc time zone."""
149    now = datetime.datetime.now(datetime.UTC)
150    return datetime.datetime(
151        year=now.year,
152        month=now.month,
153        day=now.day,
154        hour=now.hour,
155        tzinfo=now.tzinfo,
156    )
157
158
159def utc_this_minute() -> datetime.datetime:
160    """Get offset-aware beginning of current minute in the utc time zone."""
161    now = datetime.datetime.now(datetime.UTC)
162    return datetime.datetime(
163        year=now.year,
164        month=now.month,
165        day=now.day,
166        hour=now.hour,
167        minute=now.minute,
168        tzinfo=now.tzinfo,
169    )
170
171
172def empty_weakref(objtype: type[T]) -> weakref.ref[T]:
173    """Return an invalidated weak-reference for the specified type."""
174    # At runtime, all weakrefs are the same; our type arg is just
175    # for the static type checker.
176    del objtype  # Unused.
177
178    # Just create an object and let it die. Is there a cleaner way to do this?
179    # return weakref.ref(_EmptyObj())  # type: ignore
180
181    # Sharing a single ones seems at least a bit better.
182    return _g_empty_weak_ref  # type: ignore
183
184
185def data_size_str(bytecount: int, compact: bool = False) -> str:
186    """Given a size in bytes, returns a short human readable string.
187
188    In compact mode this should be 6 or fewer chars for most all
189    sane file sizes.
190    """
191    # pylint: disable=too-many-return-statements
192
193    # Special case: handle negatives.
194    if bytecount < 0:
195        val = data_size_str(-bytecount, compact=compact)
196        return f'-{val}'
197
198    if bytecount <= 999:
199        suffix = 'B' if compact else 'bytes'
200        return f'{bytecount} {suffix}'
201    kbytecount = bytecount / 1024
202    if round(kbytecount, 1) < 10.0:
203        return f'{kbytecount:.1f} KB'
204    if round(kbytecount, 0) < 999:
205        return f'{kbytecount:.0f} KB'
206    mbytecount = bytecount / (1024 * 1024)
207    if round(mbytecount, 1) < 10.0:
208        return f'{mbytecount:.1f} MB'
209    if round(mbytecount, 0) < 999:
210        return f'{mbytecount:.0f} MB'
211    gbytecount = bytecount / (1024 * 1024 * 1024)
212    if round(gbytecount, 1) < 10.0:
213        return f'{gbytecount:.1f} GB'
214    return f'{gbytecount:.0f} GB'
215
216
217class DirtyBit:
218    """Manages whether a thing is dirty and regulates cleaning it.
219
220    To use, simply set the 'dirty' value on this object to True when
221    some update is needed, and then check the 'should_update' value to
222    regulate when the actual update should occur. Set 'dirty' back to
223    False after a successful update.
224
225    If 'use_lock' is True, an asyncio Lock will be created and
226    incorporated into update attempts to prevent simultaneous updates
227    (should_update will only return True when the lock is unlocked).
228    Note that It is up to the user to lock/unlock the lock during the
229    actual update attempt.
230
231    If a value is passed for 'auto_dirty_seconds', the dirtybit will
232    flip itself back to dirty after being clean for the given amount of
233    time.
234
235    'min_update_interval' can be used to enforce a minimum update
236    interval even when updates are successful (retry_interval only
237    applies when updates fail)
238    """
239
240    def __init__(
241        self,
242        dirty: bool = False,
243        retry_interval: float = 5.0,
244        *,
245        use_lock: bool = False,
246        auto_dirty_seconds: float | None = None,
247        min_update_interval: float | None = None,
248    ):
249        curtime = time.monotonic()
250        self._retry_interval = retry_interval
251        self._auto_dirty_seconds = auto_dirty_seconds
252        self._min_update_interval = min_update_interval
253        self._dirty = dirty
254        self._next_update_time: float | None = curtime if dirty else None
255        self._last_update_time: float | None = None
256        self._next_auto_dirty_time: float | None = (
257            (curtime + self._auto_dirty_seconds)
258            if (not dirty and self._auto_dirty_seconds is not None)
259            else None
260        )
261        self._use_lock = use_lock
262        self.lock: asyncio.Lock
263        if self._use_lock:
264            import asyncio
265
266            self.lock = asyncio.Lock()
267
268    @property
269    def dirty(self) -> bool:
270        """Whether the target is currently dirty.
271
272        This should be set to False once an update is successful.
273        """
274        return self._dirty
275
276    @dirty.setter
277    def dirty(self, value: bool) -> None:
278        # If we're freshly clean, set our next auto-dirty time (if we have
279        # one).
280        if self._dirty and not value and self._auto_dirty_seconds is not None:
281            self._next_auto_dirty_time = (
282                time.monotonic() + self._auto_dirty_seconds
283            )
284
285        # If we're freshly dirty, schedule an immediate update.
286        if not self._dirty and value:
287            self._next_update_time = time.monotonic()
288
289            # If they want to enforce a minimum update interval,
290            # push out the next update time if it hasn't been long enough.
291            if (
292                self._min_update_interval is not None
293                and self._last_update_time is not None
294            ):
295                self._next_update_time = max(
296                    self._next_update_time,
297                    self._last_update_time + self._min_update_interval,
298                )
299
300        self._dirty = value
301
302    @property
303    def should_update(self) -> bool:
304        """Whether an attempt should be made to clean the target now.
305
306        Always returns False if the target is not dirty.
307        Takes into account the amount of time passed since the target
308        was marked dirty or since should_update last returned True.
309        """
310        curtime = time.monotonic()
311
312        # Auto-dirty ourself if we're into that.
313        if (
314            self._next_auto_dirty_time is not None
315            and curtime > self._next_auto_dirty_time
316        ):
317            self.dirty = True
318            self._next_auto_dirty_time = None
319        if not self._dirty:
320            return False
321        if self._use_lock and self.lock.locked():
322            return False
323        assert self._next_update_time is not None
324        if curtime > self._next_update_time:
325            self._next_update_time = curtime + self._retry_interval
326            self._last_update_time = curtime
327            return True
328        return False
329
330
331class DispatchMethodWrapper(Generic[ArgT, RetT]):
332    """Type-aware standin for the dispatch func returned by dispatchmethod."""
333
334    def __call__(self, arg: ArgT) -> RetT:
335        raise RuntimeError('Should not get here')
336
337    @staticmethod
338    def register(
339        func: Callable[[Any, Any], RetT]
340    ) -> Callable[[Any, Any], RetT]:
341        """Register a new dispatch handler for this dispatch-method."""
342        raise RuntimeError('Should not get here')
343
344    registry: dict[Any, Callable]
345
346
347# noinspection PyProtectedMember,PyTypeHints
348def dispatchmethod(
349    func: Callable[[Any, ArgT], RetT]
350) -> DispatchMethodWrapper[ArgT, RetT]:
351    """A variation of functools.singledispatch for methods.
352
353    Note: as of Python 3.9 there is now functools.singledispatchmethod,
354    but it currently (as of Jan 2021) is not type-aware (at least in mypy),
355    which gives us a reason to keep this one around for now.
356    """
357    from functools import singledispatch, update_wrapper
358
359    origwrapper: Any = singledispatch(func)
360
361    # Pull this out so hopefully origwrapper can die,
362    # otherwise we reference origwrapper in our wrapper.
363    dispatch = origwrapper.dispatch
364
365    # All we do here is recreate the end of functools.singledispatch
366    # where it returns a wrapper except instead of the wrapper using the
367    # first arg to the function ours uses the second (to skip 'self').
368    # This was made against Python 3.7; we should probably check up on
369    # this in later versions in case anything has changed.
370    # (or hopefully they'll add this functionality to their version)
371    # NOTE: sounds like we can use functools singledispatchmethod in 3.8
372    def wrapper(*args: Any, **kw: Any) -> Any:
373        if not args or len(args) < 2:
374            raise TypeError(
375                f'{funcname} requires at least ' '2 positional arguments'
376            )
377
378        return dispatch(args[1].__class__)(*args, **kw)
379
380    funcname = getattr(func, '__name__', 'dispatchmethod method')
381    wrapper.register = origwrapper.register  # type: ignore
382    wrapper.dispatch = dispatch  # type: ignore
383    wrapper.registry = origwrapper.registry  # type: ignore
384    # pylint: disable=protected-access
385    wrapper._clear_cache = origwrapper._clear_cache  # type: ignore
386    update_wrapper(wrapper, func)
387    # pylint: enable=protected-access
388    return cast(DispatchMethodWrapper, wrapper)
389
390
391def valuedispatch(call: Callable[[ValT], RetT]) -> ValueDispatcher[ValT, RetT]:
392    """Decorator for functions to allow dispatching based on a value.
393
394    This differs from functools.singledispatch in that it dispatches based
395    on the value of an argument, not based on its type.
396    The 'register' method of a value-dispatch function can be used
397    to assign new functions to handle particular values.
398    Unhandled values wind up in the original dispatch function."""
399    return ValueDispatcher(call)
400
401
402class ValueDispatcher(Generic[ValT, RetT]):
403    """Used by the valuedispatch decorator"""
404
405    def __init__(self, call: Callable[[ValT], RetT]) -> None:
406        self._base_call = call
407        self._handlers: dict[ValT, Callable[[], RetT]] = {}
408
409    def __call__(self, value: ValT) -> RetT:
410        handler = self._handlers.get(value)
411        if handler is not None:
412            return handler()
413        return self._base_call(value)
414
415    def _add_handler(
416        self, value: ValT, call: Callable[[], RetT]
417    ) -> Callable[[], RetT]:
418        if value in self._handlers:
419            raise RuntimeError(f'Duplicate handlers added for {value}')
420        self._handlers[value] = call
421        return call
422
423    def register(
424        self, value: ValT
425    ) -> Callable[[Callable[[], RetT]], Callable[[], RetT]]:
426        """Add a handler to the dispatcher."""
427        from functools import partial
428
429        return partial(self._add_handler, value)
430
431
432def valuedispatch1arg(
433    call: Callable[[ValT, ArgT], RetT]
434) -> ValueDispatcher1Arg[ValT, ArgT, RetT]:
435    """Like valuedispatch but for functions taking an extra argument."""
436    return ValueDispatcher1Arg(call)
437
438
439class ValueDispatcher1Arg(Generic[ValT, ArgT, RetT]):
440    """Used by the valuedispatch1arg decorator"""
441
442    def __init__(self, call: Callable[[ValT, ArgT], RetT]) -> None:
443        self._base_call = call
444        self._handlers: dict[ValT, Callable[[ArgT], RetT]] = {}
445
446    def __call__(self, value: ValT, arg: ArgT) -> RetT:
447        handler = self._handlers.get(value)
448        if handler is not None:
449            return handler(arg)
450        return self._base_call(value, arg)
451
452    def _add_handler(
453        self, value: ValT, call: Callable[[ArgT], RetT]
454    ) -> Callable[[ArgT], RetT]:
455        if value in self._handlers:
456            raise RuntimeError(f'Duplicate handlers added for {value}')
457        self._handlers[value] = call
458        return call
459
460    def register(
461        self, value: ValT
462    ) -> Callable[[Callable[[ArgT], RetT]], Callable[[ArgT], RetT]]:
463        """Add a handler to the dispatcher."""
464        from functools import partial
465
466        return partial(self._add_handler, value)
467
468
469if TYPE_CHECKING:
470
471    class ValueDispatcherMethod(Generic[ValT, RetT]):
472        """Used by the valuedispatchmethod decorator."""
473
474        def __call__(self, value: ValT) -> RetT: ...
475
476        def register(
477            self, value: ValT
478        ) -> Callable[[Callable[[SelfT], RetT]], Callable[[SelfT], RetT]]:
479            """Add a handler to the dispatcher."""
480            ...
481
482
483def valuedispatchmethod(
484    call: Callable[[SelfT, ValT], RetT]
485) -> ValueDispatcherMethod[ValT, RetT]:
486    """Like valuedispatch but works with methods instead of functions."""
487
488    # NOTE: It seems that to wrap a method with a decorator and have self
489    # dispatching do the right thing, we must return a function and not
490    # an executable object. So for this version we store our data here
491    # in the function call dict and simply return a call.
492
493    _base_call = call
494    _handlers: dict[ValT, Callable[[SelfT], RetT]] = {}
495
496    def _add_handler(value: ValT, addcall: Callable[[SelfT], RetT]) -> None:
497        if value in _handlers:
498            raise RuntimeError(f'Duplicate handlers added for {value}')
499        _handlers[value] = addcall
500
501    def _register(value: ValT) -> Callable[[Callable[[SelfT], RetT]], None]:
502        from functools import partial
503
504        return partial(_add_handler, value)
505
506    def _call_wrapper(self: SelfT, value: ValT) -> RetT:
507        handler = _handlers.get(value)
508        if handler is not None:
509            return handler(self)
510        return _base_call(self, value)
511
512    # We still want to use our returned object to register handlers, but we're
513    # actually just returning a function. So manually stuff the call onto it.
514    setattr(_call_wrapper, 'register', _register)
515
516    # To the type checker's eyes we return a ValueDispatchMethod instance;
517    # this lets it know about our register func and type-check its usage.
518    # In reality we just return a raw function call (for reasons listed above).
519    # pylint: disable=undefined-variable, no-else-return
520    if TYPE_CHECKING:
521        return ValueDispatcherMethod[ValT, RetT]()
522    else:
523        return _call_wrapper
524
525
526def make_hash(obj: Any) -> int:
527    """Makes a hash from a dictionary, list, tuple or set to any level,
528    that contains only other hashable types (including any lists, tuples,
529    sets, and dictionaries).
530
531    Note that this uses Python's hash() function internally so collisions/etc.
532    may be more common than with fancy cryptographic hashes.
533
534    Also be aware that Python's hash() output varies across processes, so
535    this should only be used for values that will remain in a single process.
536    """
537    import copy
538
539    if isinstance(obj, (set, tuple, list)):
540        return hash(tuple(make_hash(e) for e in obj))
541    if not isinstance(obj, dict):
542        return hash(obj)
543
544    new_obj = copy.deepcopy(obj)
545    for k, v in new_obj.items():
546        new_obj[k] = make_hash(v)
547
548    # NOTE: there is sorted works correctly because it compares only
549    # unique first values (i.e. dict keys)
550    return hash(tuple(frozenset(sorted(new_obj.items()))))
551
552
553def float_hash_from_string(s: str) -> float:
554    """Given a string value, returns a float between 0 and 1.
555
556    If consistent across processes. Can be useful for assigning db ids
557    shard values for efficient parallel processing.
558    """
559    import hashlib
560
561    hash_bytes = hashlib.md5(s.encode()).digest()
562
563    # Generate a random 64 bit int from hash digest bytes.
564    ival = int.from_bytes(hash_bytes[:8])
565    return ival / ((1 << 64) - 1)
566
567
568def asserttype(obj: Any, typ: type[T]) -> T:
569    """Return an object typed as a given type.
570
571    Assert is used to check its actual type, so only use this when
572    failures are not expected. Otherwise use checktype.
573    """
574    assert isinstance(typ, type), 'only actual types accepted'
575    assert isinstance(obj, typ)
576    return obj
577
578
579def asserttype_o(obj: Any, typ: type[T]) -> T | None:
580    """Return an object typed as a given optional type.
581
582    Assert is used to check its actual type, so only use this when
583    failures are not expected. Otherwise use checktype.
584    """
585    assert isinstance(typ, type), 'only actual types accepted'
586    assert isinstance(obj, (typ, type(None)))
587    return obj
588
589
590def checktype(obj: Any, typ: type[T]) -> T:
591    """Return an object typed as a given type.
592
593    Always checks the type at runtime with isinstance and throws a TypeError
594    on failure. Use asserttype for more efficient (but less safe) equivalent.
595    """
596    assert isinstance(typ, type), 'only actual types accepted'
597    if not isinstance(obj, typ):
598        raise TypeError(f'Expected a {typ}; got a {type(obj)}.')
599    return obj
600
601
602def checktype_o(obj: Any, typ: type[T]) -> T | None:
603    """Return an object typed as a given optional type.
604
605    Always checks the type at runtime with isinstance and throws a TypeError
606    on failure. Use asserttype for more efficient (but less safe) equivalent.
607    """
608    assert isinstance(typ, type), 'only actual types accepted'
609    if not isinstance(obj, (typ, type(None))):
610        raise TypeError(f'Expected a {typ} or None; got a {type(obj)}.')
611    return obj
612
613
614def warntype(obj: Any, typ: type[T]) -> T:
615    """Return an object typed as a given type.
616
617    Always checks the type at runtime and simply logs a warning if it is
618    not what is expected.
619    """
620    assert isinstance(typ, type), 'only actual types accepted'
621    if not isinstance(obj, typ):
622        import logging
623
624        logging.warning('warntype: expected a %s, got a %s', typ, type(obj))
625    return obj  # type: ignore
626
627
628def warntype_o(obj: Any, typ: type[T]) -> T | None:
629    """Return an object typed as a given type.
630
631    Always checks the type at runtime and simply logs a warning if it is
632    not what is expected.
633    """
634    assert isinstance(typ, type), 'only actual types accepted'
635    if not isinstance(obj, (typ, type(None))):
636        import logging
637
638        logging.warning(
639            'warntype: expected a %s or None, got a %s', typ, type(obj)
640        )
641    return obj  # type: ignore
642
643
644def assert_non_optional(obj: T | None) -> T:
645    """Return an object with Optional typing removed.
646
647    Assert is used to check its actual type, so only use this when
648    failures are not expected. Use check_non_optional otherwise.
649    """
650    assert obj is not None
651    return obj
652
653
654def check_non_optional(obj: T | None) -> T:
655    """Return an object with Optional typing removed.
656
657    Always checks the actual type and throws a TypeError on failure.
658    Use assert_non_optional for a more efficient (but less safe) equivalent.
659    """
660    if obj is None:
661        raise ValueError('Got None value in check_non_optional.')
662    return obj
663
664
665def smoothstep(edge0: float, edge1: float, x: float) -> float:
666    """A smooth transition function.
667
668    Returns a value that smoothly moves from 0 to 1 as we go between edges.
669    Values outside of the range return 0 or 1.
670    """
671    y = min(1.0, max(0.0, (x - edge0) / (edge1 - edge0)))
672    return y * y * (3.0 - 2.0 * y)
673
674
675def linearstep(edge0: float, edge1: float, x: float) -> float:
676    """A linear transition function.
677
678    Returns a value that linearly moves from 0 to 1 as we go between edges.
679    Values outside of the range return 0 or 1.
680    """
681    return max(0.0, min(1.0, (x - edge0) / (edge1 - edge0)))
682
683
684def _compact_id(num: int, chars: str) -> str:
685    if num < 0:
686        raise ValueError('Negative integers not allowed.')
687
688    # Chars must be in sorted order for sorting to work correctly
689    # on our output.
690    assert ''.join(sorted(list(chars))) == chars
691
692    base = len(chars)
693    out = ''
694    while num:
695        out += chars[num % base]
696        num //= base
697    return out[::-1] or '0'
698
699
700def human_readable_compact_id(num: int) -> str:
701    """Given a positive int, return a compact string representation for it.
702
703    Handy for visualizing unique numeric ids using as few as possible chars.
704    This representation uses only lowercase letters and numbers (minus the
705    following letters for readability):
706     's' is excluded due to similarity to '5'.
707     'l' is excluded due to similarity to '1'.
708     'i' is excluded due to similarity to '1'.
709     'o' is excluded due to similarity to '0'.
710     'z' is excluded due to similarity to '2'.
711
712    Therefore for n chars this can store values of 21^n.
713
714    When reading human input consisting of these IDs, it may be desirable
715    to map the disallowed chars to their corresponding allowed ones
716    ('o' -> '0', etc).
717
718    Sort order for these ids is the same as the original numbers.
719
720    If more compactness is desired at the expense of readability,
721    use compact_id() instead.
722    """
723    return _compact_id(num, '0123456789abcdefghjkmnpqrtuvwxy')
724
725
726def compact_id(num: int) -> str:
727    """Given a positive int, return a compact string representation for it.
728
729    Handy for visualizing unique numeric ids using as few as possible chars.
730    This version is more compact than human_readable_compact_id() but less
731    friendly to humans due to using both capital and lowercase letters,
732    both 'O' and '0', etc.
733
734    Therefore for n chars this can store values of 62^n.
735
736    Sort order for these ids is the same as the original numbers.
737    """
738    return _compact_id(
739        num, '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'
740    )
741
742
743def caller_source_location() -> str:
744    """Returns source file name and line of the code calling us.
745
746    Example: 'mymodule.py:23'
747    """
748    try:
749        import inspect
750
751        frame = inspect.currentframe()
752        for _i in range(2):
753            if frame is None:
754                raise RuntimeError()
755            frame = frame.f_back
756        if frame is None:
757            raise RuntimeError()
758        fname = os.path.basename(frame.f_code.co_filename)
759        return f'{fname}:{frame.f_lineno}'
760    except Exception:
761        return '<unknown source location>'
762
763
764def unchanging_hostname() -> str:
765    """Return an unchanging name for the local device.
766
767    Similar to the `hostname` call (or os.uname().nodename in Python)
768    except attempts to give a name that doesn't change depending on
769    network conditions. (A Mac will tend to go from Foo to Foo.local,
770    Foo.lan etc. throughout its various adventures)
771    """
772    import platform
773    import subprocess
774
775    # On Mac, this should give the computer name assigned in System Prefs.
776    if platform.system() == 'Darwin':
777        return (
778            subprocess.run(
779                ['scutil', '--get', 'ComputerName'],
780                check=True,
781                capture_output=True,
782            )
783            .stdout.decode()
784            .strip()
785            .replace(' ', '-')
786        )
787    return os.uname().nodename
788
789
790def set_canonical_module_names(module_globals: dict[str, Any]) -> None:
791    """Do the thing."""
792    if os.environ.get('EFRO_SUPPRESS_SET_CANONICAL_MODULE_NAMES') == '1':
793        return
794
795    modulename = module_globals.get('__name__')
796    if not isinstance(modulename, str):
797        raise RuntimeError('Unable to get module name.')
798    assert not modulename.startswith('_')
799    modulename_prefix = f'{modulename}.'
800    modulename_prefix_2 = f'_{modulename}.'
801
802    for name, obj in module_globals.items():
803        if name.startswith('_'):
804            continue
805        existing = getattr(obj, '__module__', None)
806        try:
807            # Override the module ONLY if it lives under us somewhere.
808            # So ourpackage._submodule.Foo becomes ourpackage.Foo
809            # but otherpackage._submodule.Foo remains untouched.
810            if existing is not None and (
811                existing.startswith(modulename_prefix)
812                or existing.startswith(modulename_prefix_2)
813            ):
814                obj.__module__ = modulename
815        except Exception:
816            import logging
817
818            logging.warning(
819                'set_canonical_module_names: unable to change __module__'
820                " from '%s' to '%s' on %s object at '%s'.",
821                existing,
822                modulename,
823                type(obj),
824                name,
825            )
826
827
828def timedelta_str(
829    timeval: datetime.timedelta | float, maxparts: int = 2, decimals: int = 0
830) -> str:
831    """Return a simple human readable time string for a length of time.
832
833    Time can be given as a timedelta or a float representing seconds.
834    Example output:
835      "23d 1h 2m 32s" (with maxparts == 4)
836      "23d 1h" (with maxparts == 2)
837      "23d 1.08h" (with maxparts == 2 and decimals == 2)
838
839    Note that this is hard-coded in English and probably not especially
840    performant.
841    """
842    # pylint: disable=too-many-locals
843
844    if isinstance(timeval, float):
845        timevalfin = datetime.timedelta(seconds=timeval)
846    else:
847        timevalfin = timeval
848
849    # Internally we only handle positive values.
850    if timevalfin.total_seconds() < 0:
851        return f'-{timedelta_str(timeval=-timeval, maxparts=maxparts)}'
852
853    years = timevalfin.days // 365
854    days = timevalfin.days % 365
855    hours = timevalfin.seconds // 3600
856    hour_remainder = timevalfin.seconds % 3600
857    minutes = hour_remainder // 60
858    seconds = hour_remainder % 60
859
860    # Now, if we want decimal places for our last value,
861    # calc fractional parts.
862    if decimals:
863        # Calc totals of each type.
864        t_seconds = timevalfin.total_seconds()
865        t_minutes = t_seconds / 60
866        t_hours = t_minutes / 60
867        t_days = t_hours / 24
868        t_years = t_days / 365
869
870        # Calc fractional parts that exclude all whole values to their left.
871        years_covered = years
872        years_f = t_years - years_covered
873        days_covered = years_covered * 365 + days
874        days_f = t_days - days_covered
875        hours_covered = days_covered * 24 + hours
876        hours_f = t_hours - hours_covered
877        minutes_covered = hours_covered * 60 + minutes
878        minutes_f = t_minutes - minutes_covered
879        seconds_covered = minutes_covered * 60 + seconds
880        seconds_f = t_seconds - seconds_covered
881    else:
882        years_f = days_f = hours_f = minutes_f = seconds_f = 0.0
883
884    parts: list[str] = []
885    for part, part_f, suffix in (
886        (years, years_f, 'y'),
887        (days, days_f, 'd'),
888        (hours, hours_f, 'h'),
889        (minutes, minutes_f, 'm'),
890        (seconds, seconds_f, 's'),
891    ):
892        if part or parts or (not parts and suffix == 's'):
893            # Do decimal version only for the last part.
894            if decimals and (len(parts) >= maxparts - 1 or suffix == 's'):
895                parts.append(f'{part+part_f:.{decimals}f}{suffix}')
896            else:
897                parts.append(f'{part}{suffix}')
898            if len(parts) >= maxparts:
899                break
900    return ' '.join(parts)
901
902
903def ago_str(
904    timeval: datetime.datetime,
905    maxparts: int = 1,
906    now: datetime.datetime | None = None,
907    decimals: int = 0,
908) -> str:
909    """Given a datetime, return a clean human readable 'ago' str.
910
911    Note that this is hard-coded in English so should not be used
912    for visible in-game elements; only tools/etc.
913
914    If now is not passed, efro.util.utc_now() is used.
915    """
916    if now is None:
917        now = utc_now()
918    return (
919        timedelta_str(now - timeval, maxparts=maxparts, decimals=decimals)
920        + ' ago'
921    )
922
923
924def split_list(input_list: list[T], max_length: int) -> list[list[T]]:
925    """Split a single list into smaller lists."""
926    return [
927        input_list[i : i + max_length]
928        for i in range(0, len(input_list), max_length)
929    ]
930
931
932def extract_flag(args: list[str], name: str) -> bool:
933    """Given a list of args and a flag name, returns whether it is present.
934
935    The arg flag, if present, is removed from the arg list.
936    """
937    from efro.error import CleanError
938
939    count = args.count(name)
940    if count > 1:
941        raise CleanError(f'Flag {name} passed multiple times.')
942    if not count:
943        return False
944    args.remove(name)
945    return True
946
947
948@overload
949def extract_arg(
950    args: list[str], name: str, required: Literal[False] = False
951) -> str | None: ...
952
953
954@overload
955def extract_arg(args: list[str], name: str, required: Literal[True]) -> str: ...
956
957
958def extract_arg(
959    args: list[str], name: str, required: bool = False
960) -> str | None:
961    """Given a list of args and an arg name, returns a value.
962
963    The arg flag and value are removed from the arg list.
964    raises CleanErrors on any problems.
965    """
966    from efro.error import CleanError
967
968    count = args.count(name)
969    if not count:
970        if required:
971            raise CleanError(f'Required argument {name} not passed.')
972        return None
973
974    if count > 1:
975        raise CleanError(f'Arg {name} passed multiple times.')
976
977    argindex = args.index(name)
978    if argindex + 1 >= len(args):
979        raise CleanError(f'No value passed after {name} arg.')
980
981    val = args[argindex + 1]
982    del args[argindex : argindex + 2]
983
984    return val
P = ~P
def explicit_bool(val: bool) -> bool:
70def explicit_bool(val: bool) -> bool:
71    """Return a non-inferable boolean value.
72
73    Useful to be able to disable blocks of code without type checkers
74    complaining/etc.
75    """
76    # pylint: disable=no-else-return
77    if TYPE_CHECKING:
78        # infer this! <boom>
79        import random
80
81        return random.random() < 0.5
82    else:
83        return val

Return a non-inferable boolean value.

Useful to be able to disable blocks of code without type checkers complaining/etc.

def snake_case_to_title(val: str) -> str:
86def snake_case_to_title(val: str) -> str:
87    """Given a snake-case string 'foo_bar', returns 'Foo Bar'."""
88    # Kill empty words resulting from leading/trailing/multiple underscores.
89    return ' '.join(w for w in val.split('_') if w).title()

Given a snake-case string 'foo_bar', returns 'Foo Bar'.

def snake_case_to_camel_case(val: str) -> str:
92def snake_case_to_camel_case(val: str) -> str:
93    """Given a snake-case string 'foo_bar', returns camel-case 'FooBar'."""
94    # Replace underscores with spaces; capitalize words; kill spaces.
95    # Not sure about efficiency, but logically simple.
96    return val.replace('_', ' ').title().replace(' ', '')

Given a snake-case string 'foo_bar', returns camel-case 'FooBar'.

def check_utc(value: datetime.datetime) -> None:
 99def check_utc(value: datetime.datetime) -> None:
100    """Ensure a datetime value is timezone-aware utc."""
101    if value.tzinfo is not datetime.UTC:
102        raise ValueError(
103            'datetime value does not have timezone set as datetime.UTC'
104        )

Ensure a datetime value is timezone-aware utc.

def utc_now() -> datetime.datetime:
107def utc_now() -> datetime.datetime:
108    """Get timezone-aware current utc time.
109
110    Just a shortcut for datetime.datetime.now(datetime.UTC).
111    Avoid datetime.datetime.utcnow() which is deprecated and gives naive
112    times.
113    """
114    return datetime.datetime.now(datetime.UTC)

Get timezone-aware current utc time.

Just a shortcut for datetime.datetime.now(datetime.UTC). Avoid datetime.datetime.utcnow() which is deprecated and gives naive times.

def utc_now_naive() -> datetime.datetime:
117def utc_now_naive() -> datetime.datetime:
118    """Get naive utc time.
119
120    This can be used to replace datetime.utcnow(), which is now deprecated.
121    Most all code should migrate to use timezone-aware times instead of
122    relying on this.
123    """
124    return datetime.datetime.now(datetime.UTC).replace(tzinfo=None)

Get naive utc time.

This can be used to replace datetime.utcnow(), which is now deprecated. Most all code should migrate to use timezone-aware times instead of relying on this.

def utc_from_timestamp_naive(timestamp: float) -> datetime.datetime:
127def utc_from_timestamp_naive(timestamp: float) -> datetime.datetime:
128    """Get a naive utc time from a timestamp.
129
130    This can be used to replace datetime.utcfromtimestamp(), which is now
131    deprecated. Most all code should migrate to use timezone-aware times
132    instead of relying on this.
133    """
134
135    return datetime.datetime.fromtimestamp(timestamp, tz=datetime.UTC).replace(
136        tzinfo=None
137    )

Get a naive utc time from a timestamp.

This can be used to replace datetime.utcfromtimestamp(), which is now deprecated. Most all code should migrate to use timezone-aware times instead of relying on this.

def utc_today() -> datetime.datetime:
140def utc_today() -> datetime.datetime:
141    """Get offset-aware midnight in the utc time zone."""
142    now = datetime.datetime.now(datetime.UTC)
143    return datetime.datetime(
144        year=now.year, month=now.month, day=now.day, tzinfo=now.tzinfo
145    )

Get offset-aware midnight in the utc time zone.

def utc_this_hour() -> datetime.datetime:
148def utc_this_hour() -> datetime.datetime:
149    """Get offset-aware beginning of the current hour in the utc time zone."""
150    now = datetime.datetime.now(datetime.UTC)
151    return datetime.datetime(
152        year=now.year,
153        month=now.month,
154        day=now.day,
155        hour=now.hour,
156        tzinfo=now.tzinfo,
157    )

Get offset-aware beginning of the current hour in the utc time zone.

def utc_this_minute() -> datetime.datetime:
160def utc_this_minute() -> datetime.datetime:
161    """Get offset-aware beginning of current minute in the utc time zone."""
162    now = datetime.datetime.now(datetime.UTC)
163    return datetime.datetime(
164        year=now.year,
165        month=now.month,
166        day=now.day,
167        hour=now.hour,
168        minute=now.minute,
169        tzinfo=now.tzinfo,
170    )

Get offset-aware beginning of current minute in the utc time zone.

def empty_weakref(objtype: type[~T]) -> weakref.ReferenceType[~T]:
173def empty_weakref(objtype: type[T]) -> weakref.ref[T]:
174    """Return an invalidated weak-reference for the specified type."""
175    # At runtime, all weakrefs are the same; our type arg is just
176    # for the static type checker.
177    del objtype  # Unused.
178
179    # Just create an object and let it die. Is there a cleaner way to do this?
180    # return weakref.ref(_EmptyObj())  # type: ignore
181
182    # Sharing a single ones seems at least a bit better.
183    return _g_empty_weak_ref  # type: ignore

Return an invalidated weak-reference for the specified type.

def data_size_str(bytecount: int, compact: bool = False) -> str:
186def data_size_str(bytecount: int, compact: bool = False) -> str:
187    """Given a size in bytes, returns a short human readable string.
188
189    In compact mode this should be 6 or fewer chars for most all
190    sane file sizes.
191    """
192    # pylint: disable=too-many-return-statements
193
194    # Special case: handle negatives.
195    if bytecount < 0:
196        val = data_size_str(-bytecount, compact=compact)
197        return f'-{val}'
198
199    if bytecount <= 999:
200        suffix = 'B' if compact else 'bytes'
201        return f'{bytecount} {suffix}'
202    kbytecount = bytecount / 1024
203    if round(kbytecount, 1) < 10.0:
204        return f'{kbytecount:.1f} KB'
205    if round(kbytecount, 0) < 999:
206        return f'{kbytecount:.0f} KB'
207    mbytecount = bytecount / (1024 * 1024)
208    if round(mbytecount, 1) < 10.0:
209        return f'{mbytecount:.1f} MB'
210    if round(mbytecount, 0) < 999:
211        return f'{mbytecount:.0f} MB'
212    gbytecount = bytecount / (1024 * 1024 * 1024)
213    if round(gbytecount, 1) < 10.0:
214        return f'{gbytecount:.1f} GB'
215    return f'{gbytecount:.0f} GB'

Given a size in bytes, returns a short human readable string.

In compact mode this should be 6 or fewer chars for most all sane file sizes.

class DirtyBit:
218class DirtyBit:
219    """Manages whether a thing is dirty and regulates cleaning it.
220
221    To use, simply set the 'dirty' value on this object to True when
222    some update is needed, and then check the 'should_update' value to
223    regulate when the actual update should occur. Set 'dirty' back to
224    False after a successful update.
225
226    If 'use_lock' is True, an asyncio Lock will be created and
227    incorporated into update attempts to prevent simultaneous updates
228    (should_update will only return True when the lock is unlocked).
229    Note that It is up to the user to lock/unlock the lock during the
230    actual update attempt.
231
232    If a value is passed for 'auto_dirty_seconds', the dirtybit will
233    flip itself back to dirty after being clean for the given amount of
234    time.
235
236    'min_update_interval' can be used to enforce a minimum update
237    interval even when updates are successful (retry_interval only
238    applies when updates fail)
239    """
240
241    def __init__(
242        self,
243        dirty: bool = False,
244        retry_interval: float = 5.0,
245        *,
246        use_lock: bool = False,
247        auto_dirty_seconds: float | None = None,
248        min_update_interval: float | None = None,
249    ):
250        curtime = time.monotonic()
251        self._retry_interval = retry_interval
252        self._auto_dirty_seconds = auto_dirty_seconds
253        self._min_update_interval = min_update_interval
254        self._dirty = dirty
255        self._next_update_time: float | None = curtime if dirty else None
256        self._last_update_time: float | None = None
257        self._next_auto_dirty_time: float | None = (
258            (curtime + self._auto_dirty_seconds)
259            if (not dirty and self._auto_dirty_seconds is not None)
260            else None
261        )
262        self._use_lock = use_lock
263        self.lock: asyncio.Lock
264        if self._use_lock:
265            import asyncio
266
267            self.lock = asyncio.Lock()
268
269    @property
270    def dirty(self) -> bool:
271        """Whether the target is currently dirty.
272
273        This should be set to False once an update is successful.
274        """
275        return self._dirty
276
277    @dirty.setter
278    def dirty(self, value: bool) -> None:
279        # If we're freshly clean, set our next auto-dirty time (if we have
280        # one).
281        if self._dirty and not value and self._auto_dirty_seconds is not None:
282            self._next_auto_dirty_time = (
283                time.monotonic() + self._auto_dirty_seconds
284            )
285
286        # If we're freshly dirty, schedule an immediate update.
287        if not self._dirty and value:
288            self._next_update_time = time.monotonic()
289
290            # If they want to enforce a minimum update interval,
291            # push out the next update time if it hasn't been long enough.
292            if (
293                self._min_update_interval is not None
294                and self._last_update_time is not None
295            ):
296                self._next_update_time = max(
297                    self._next_update_time,
298                    self._last_update_time + self._min_update_interval,
299                )
300
301        self._dirty = value
302
303    @property
304    def should_update(self) -> bool:
305        """Whether an attempt should be made to clean the target now.
306
307        Always returns False if the target is not dirty.
308        Takes into account the amount of time passed since the target
309        was marked dirty or since should_update last returned True.
310        """
311        curtime = time.monotonic()
312
313        # Auto-dirty ourself if we're into that.
314        if (
315            self._next_auto_dirty_time is not None
316            and curtime > self._next_auto_dirty_time
317        ):
318            self.dirty = True
319            self._next_auto_dirty_time = None
320        if not self._dirty:
321            return False
322        if self._use_lock and self.lock.locked():
323            return False
324        assert self._next_update_time is not None
325        if curtime > self._next_update_time:
326            self._next_update_time = curtime + self._retry_interval
327            self._last_update_time = curtime
328            return True
329        return False

Manages whether a thing is dirty and regulates cleaning it.

To use, simply set the 'dirty' value on this object to True when some update is needed, and then check the 'should_update' value to regulate when the actual update should occur. Set 'dirty' back to False after a successful update.

If 'use_lock' is True, an asyncio Lock will be created and incorporated into update attempts to prevent simultaneous updates (should_update will only return True when the lock is unlocked). Note that It is up to the user to lock/unlock the lock during the actual update attempt.

If a value is passed for 'auto_dirty_seconds', the dirtybit will flip itself back to dirty after being clean for the given amount of time.

'min_update_interval' can be used to enforce a minimum update interval even when updates are successful (retry_interval only applies when updates fail)

DirtyBit( dirty: bool = False, retry_interval: float = 5.0, *, use_lock: bool = False, auto_dirty_seconds: float | None = None, min_update_interval: float | None = None)
241    def __init__(
242        self,
243        dirty: bool = False,
244        retry_interval: float = 5.0,
245        *,
246        use_lock: bool = False,
247        auto_dirty_seconds: float | None = None,
248        min_update_interval: float | None = None,
249    ):
250        curtime = time.monotonic()
251        self._retry_interval = retry_interval
252        self._auto_dirty_seconds = auto_dirty_seconds
253        self._min_update_interval = min_update_interval
254        self._dirty = dirty
255        self._next_update_time: float | None = curtime if dirty else None
256        self._last_update_time: float | None = None
257        self._next_auto_dirty_time: float | None = (
258            (curtime + self._auto_dirty_seconds)
259            if (not dirty and self._auto_dirty_seconds is not None)
260            else None
261        )
262        self._use_lock = use_lock
263        self.lock: asyncio.Lock
264        if self._use_lock:
265            import asyncio
266
267            self.lock = asyncio.Lock()
lock: asyncio.locks.Lock
dirty: bool
269    @property
270    def dirty(self) -> bool:
271        """Whether the target is currently dirty.
272
273        This should be set to False once an update is successful.
274        """
275        return self._dirty

Whether the target is currently dirty.

This should be set to False once an update is successful.

should_update: bool
303    @property
304    def should_update(self) -> bool:
305        """Whether an attempt should be made to clean the target now.
306
307        Always returns False if the target is not dirty.
308        Takes into account the amount of time passed since the target
309        was marked dirty or since should_update last returned True.
310        """
311        curtime = time.monotonic()
312
313        # Auto-dirty ourself if we're into that.
314        if (
315            self._next_auto_dirty_time is not None
316            and curtime > self._next_auto_dirty_time
317        ):
318            self.dirty = True
319            self._next_auto_dirty_time = None
320        if not self._dirty:
321            return False
322        if self._use_lock and self.lock.locked():
323            return False
324        assert self._next_update_time is not None
325        if curtime > self._next_update_time:
326            self._next_update_time = curtime + self._retry_interval
327            self._last_update_time = curtime
328            return True
329        return False

Whether an attempt should be made to clean the target now.

Always returns False if the target is not dirty. Takes into account the amount of time passed since the target was marked dirty or since should_update last returned True.

class DispatchMethodWrapper(typing.Generic[~ArgT, ~RetT]):
332class DispatchMethodWrapper(Generic[ArgT, RetT]):
333    """Type-aware standin for the dispatch func returned by dispatchmethod."""
334
335    def __call__(self, arg: ArgT) -> RetT:
336        raise RuntimeError('Should not get here')
337
338    @staticmethod
339    def register(
340        func: Callable[[Any, Any], RetT]
341    ) -> Callable[[Any, Any], RetT]:
342        """Register a new dispatch handler for this dispatch-method."""
343        raise RuntimeError('Should not get here')
344
345    registry: dict[Any, Callable]

Type-aware standin for the dispatch func returned by dispatchmethod.

@staticmethod
def register(func: Callable[[Any, Any], ~RetT]) -> Callable[[Any, Any], ~RetT]:
338    @staticmethod
339    def register(
340        func: Callable[[Any, Any], RetT]
341    ) -> Callable[[Any, Any], RetT]:
342        """Register a new dispatch handler for this dispatch-method."""
343        raise RuntimeError('Should not get here')

Register a new dispatch handler for this dispatch-method.

registry: dict[typing.Any, typing.Callable]
def dispatchmethod( func: Callable[[Any, ~ArgT], ~RetT]) -> DispatchMethodWrapper[~ArgT, ~RetT]:
349def dispatchmethod(
350    func: Callable[[Any, ArgT], RetT]
351) -> DispatchMethodWrapper[ArgT, RetT]:
352    """A variation of functools.singledispatch for methods.
353
354    Note: as of Python 3.9 there is now functools.singledispatchmethod,
355    but it currently (as of Jan 2021) is not type-aware (at least in mypy),
356    which gives us a reason to keep this one around for now.
357    """
358    from functools import singledispatch, update_wrapper
359
360    origwrapper: Any = singledispatch(func)
361
362    # Pull this out so hopefully origwrapper can die,
363    # otherwise we reference origwrapper in our wrapper.
364    dispatch = origwrapper.dispatch
365
366    # All we do here is recreate the end of functools.singledispatch
367    # where it returns a wrapper except instead of the wrapper using the
368    # first arg to the function ours uses the second (to skip 'self').
369    # This was made against Python 3.7; we should probably check up on
370    # this in later versions in case anything has changed.
371    # (or hopefully they'll add this functionality to their version)
372    # NOTE: sounds like we can use functools singledispatchmethod in 3.8
373    def wrapper(*args: Any, **kw: Any) -> Any:
374        if not args or len(args) < 2:
375            raise TypeError(
376                f'{funcname} requires at least ' '2 positional arguments'
377            )
378
379        return dispatch(args[1].__class__)(*args, **kw)
380
381    funcname = getattr(func, '__name__', 'dispatchmethod method')
382    wrapper.register = origwrapper.register  # type: ignore
383    wrapper.dispatch = dispatch  # type: ignore
384    wrapper.registry = origwrapper.registry  # type: ignore
385    # pylint: disable=protected-access
386    wrapper._clear_cache = origwrapper._clear_cache  # type: ignore
387    update_wrapper(wrapper, func)
388    # pylint: enable=protected-access
389    return cast(DispatchMethodWrapper, wrapper)

A variation of functools.singledispatch for methods.

Note: as of Python 3.9 there is now functools.singledispatchmethod, but it currently (as of Jan 2021) is not type-aware (at least in mypy), which gives us a reason to keep this one around for now.

def valuedispatch( call: Callable[[~ValT], ~RetT]) -> ValueDispatcher[~ValT, ~RetT]:
392def valuedispatch(call: Callable[[ValT], RetT]) -> ValueDispatcher[ValT, RetT]:
393    """Decorator for functions to allow dispatching based on a value.
394
395    This differs from functools.singledispatch in that it dispatches based
396    on the value of an argument, not based on its type.
397    The 'register' method of a value-dispatch function can be used
398    to assign new functions to handle particular values.
399    Unhandled values wind up in the original dispatch function."""
400    return ValueDispatcher(call)

Decorator for functions to allow dispatching based on a value.

This differs from functools.singledispatch in that it dispatches based on the value of an argument, not based on its type. The 'register' method of a value-dispatch function can be used to assign new functions to handle particular values. Unhandled values wind up in the original dispatch function.

class ValueDispatcher(typing.Generic[~ValT, ~RetT]):
403class ValueDispatcher(Generic[ValT, RetT]):
404    """Used by the valuedispatch decorator"""
405
406    def __init__(self, call: Callable[[ValT], RetT]) -> None:
407        self._base_call = call
408        self._handlers: dict[ValT, Callable[[], RetT]] = {}
409
410    def __call__(self, value: ValT) -> RetT:
411        handler = self._handlers.get(value)
412        if handler is not None:
413            return handler()
414        return self._base_call(value)
415
416    def _add_handler(
417        self, value: ValT, call: Callable[[], RetT]
418    ) -> Callable[[], RetT]:
419        if value in self._handlers:
420            raise RuntimeError(f'Duplicate handlers added for {value}')
421        self._handlers[value] = call
422        return call
423
424    def register(
425        self, value: ValT
426    ) -> Callable[[Callable[[], RetT]], Callable[[], RetT]]:
427        """Add a handler to the dispatcher."""
428        from functools import partial
429
430        return partial(self._add_handler, value)

Used by the valuedispatch decorator

ValueDispatcher(call: Callable[[~ValT], ~RetT])
406    def __init__(self, call: Callable[[ValT], RetT]) -> None:
407        self._base_call = call
408        self._handlers: dict[ValT, Callable[[], RetT]] = {}
def register( self, value: ~ValT) -> Callable[[Callable[[], ~RetT]], Callable[[], ~RetT]]:
424    def register(
425        self, value: ValT
426    ) -> Callable[[Callable[[], RetT]], Callable[[], RetT]]:
427        """Add a handler to the dispatcher."""
428        from functools import partial
429
430        return partial(self._add_handler, value)

Add a handler to the dispatcher.

def valuedispatch1arg( call: Callable[[~ValT, ~ArgT], ~RetT]) -> ValueDispatcher1Arg[~ValT, ~ArgT, ~RetT]:
433def valuedispatch1arg(
434    call: Callable[[ValT, ArgT], RetT]
435) -> ValueDispatcher1Arg[ValT, ArgT, RetT]:
436    """Like valuedispatch but for functions taking an extra argument."""
437    return ValueDispatcher1Arg(call)

Like valuedispatch but for functions taking an extra argument.

class ValueDispatcher1Arg(typing.Generic[~ValT, ~ArgT, ~RetT]):
440class ValueDispatcher1Arg(Generic[ValT, ArgT, RetT]):
441    """Used by the valuedispatch1arg decorator"""
442
443    def __init__(self, call: Callable[[ValT, ArgT], RetT]) -> None:
444        self._base_call = call
445        self._handlers: dict[ValT, Callable[[ArgT], RetT]] = {}
446
447    def __call__(self, value: ValT, arg: ArgT) -> RetT:
448        handler = self._handlers.get(value)
449        if handler is not None:
450            return handler(arg)
451        return self._base_call(value, arg)
452
453    def _add_handler(
454        self, value: ValT, call: Callable[[ArgT], RetT]
455    ) -> Callable[[ArgT], RetT]:
456        if value in self._handlers:
457            raise RuntimeError(f'Duplicate handlers added for {value}')
458        self._handlers[value] = call
459        return call
460
461    def register(
462        self, value: ValT
463    ) -> Callable[[Callable[[ArgT], RetT]], Callable[[ArgT], RetT]]:
464        """Add a handler to the dispatcher."""
465        from functools import partial
466
467        return partial(self._add_handler, value)

Used by the valuedispatch1arg decorator

ValueDispatcher1Arg(call: Callable[[~ValT, ~ArgT], ~RetT])
443    def __init__(self, call: Callable[[ValT, ArgT], RetT]) -> None:
444        self._base_call = call
445        self._handlers: dict[ValT, Callable[[ArgT], RetT]] = {}
def register( self, value: ~ValT) -> Callable[[Callable[[~ArgT], ~RetT]], Callable[[~ArgT], ~RetT]]:
461    def register(
462        self, value: ValT
463    ) -> Callable[[Callable[[ArgT], RetT]], Callable[[ArgT], RetT]]:
464        """Add a handler to the dispatcher."""
465        from functools import partial
466
467        return partial(self._add_handler, value)

Add a handler to the dispatcher.

def valuedispatchmethod( call: Callable[[~SelfT, ~ValT], ~RetT]) -> efro.util.ValueDispatcherMethod[~ValT, ~RetT]:
484def valuedispatchmethod(
485    call: Callable[[SelfT, ValT], RetT]
486) -> ValueDispatcherMethod[ValT, RetT]:
487    """Like valuedispatch but works with methods instead of functions."""
488
489    # NOTE: It seems that to wrap a method with a decorator and have self
490    # dispatching do the right thing, we must return a function and not
491    # an executable object. So for this version we store our data here
492    # in the function call dict and simply return a call.
493
494    _base_call = call
495    _handlers: dict[ValT, Callable[[SelfT], RetT]] = {}
496
497    def _add_handler(value: ValT, addcall: Callable[[SelfT], RetT]) -> None:
498        if value in _handlers:
499            raise RuntimeError(f'Duplicate handlers added for {value}')
500        _handlers[value] = addcall
501
502    def _register(value: ValT) -> Callable[[Callable[[SelfT], RetT]], None]:
503        from functools import partial
504
505        return partial(_add_handler, value)
506
507    def _call_wrapper(self: SelfT, value: ValT) -> RetT:
508        handler = _handlers.get(value)
509        if handler is not None:
510            return handler(self)
511        return _base_call(self, value)
512
513    # We still want to use our returned object to register handlers, but we're
514    # actually just returning a function. So manually stuff the call onto it.
515    setattr(_call_wrapper, 'register', _register)
516
517    # To the type checker's eyes we return a ValueDispatchMethod instance;
518    # this lets it know about our register func and type-check its usage.
519    # In reality we just return a raw function call (for reasons listed above).
520    # pylint: disable=undefined-variable, no-else-return
521    if TYPE_CHECKING:
522        return ValueDispatcherMethod[ValT, RetT]()
523    else:
524        return _call_wrapper

Like valuedispatch but works with methods instead of functions.

def make_hash(obj: Any) -> int:
527def make_hash(obj: Any) -> int:
528    """Makes a hash from a dictionary, list, tuple or set to any level,
529    that contains only other hashable types (including any lists, tuples,
530    sets, and dictionaries).
531
532    Note that this uses Python's hash() function internally so collisions/etc.
533    may be more common than with fancy cryptographic hashes.
534
535    Also be aware that Python's hash() output varies across processes, so
536    this should only be used for values that will remain in a single process.
537    """
538    import copy
539
540    if isinstance(obj, (set, tuple, list)):
541        return hash(tuple(make_hash(e) for e in obj))
542    if not isinstance(obj, dict):
543        return hash(obj)
544
545    new_obj = copy.deepcopy(obj)
546    for k, v in new_obj.items():
547        new_obj[k] = make_hash(v)
548
549    # NOTE: there is sorted works correctly because it compares only
550    # unique first values (i.e. dict keys)
551    return hash(tuple(frozenset(sorted(new_obj.items()))))

Makes a hash from a dictionary, list, tuple or set to any level, that contains only other hashable types (including any lists, tuples, sets, and dictionaries).

Note that this uses Python's hash() function internally so collisions/etc. may be more common than with fancy cryptographic hashes.

Also be aware that Python's hash() output varies across processes, so this should only be used for values that will remain in a single process.

def float_hash_from_string(s: str) -> float:
554def float_hash_from_string(s: str) -> float:
555    """Given a string value, returns a float between 0 and 1.
556
557    If consistent across processes. Can be useful for assigning db ids
558    shard values for efficient parallel processing.
559    """
560    import hashlib
561
562    hash_bytes = hashlib.md5(s.encode()).digest()
563
564    # Generate a random 64 bit int from hash digest bytes.
565    ival = int.from_bytes(hash_bytes[:8])
566    return ival / ((1 << 64) - 1)

Given a string value, returns a float between 0 and 1.

If consistent across processes. Can be useful for assigning db ids shard values for efficient parallel processing.

def asserttype(obj: Any, typ: type[~T]) -> ~T:
569def asserttype(obj: Any, typ: type[T]) -> T:
570    """Return an object typed as a given type.
571
572    Assert is used to check its actual type, so only use this when
573    failures are not expected. Otherwise use checktype.
574    """
575    assert isinstance(typ, type), 'only actual types accepted'
576    assert isinstance(obj, typ)
577    return obj

Return an object typed as a given type.

Assert is used to check its actual type, so only use this when failures are not expected. Otherwise use checktype.

def asserttype_o(obj: Any, typ: type[~T]) -> Optional[~T]:
580def asserttype_o(obj: Any, typ: type[T]) -> T | None:
581    """Return an object typed as a given optional type.
582
583    Assert is used to check its actual type, so only use this when
584    failures are not expected. Otherwise use checktype.
585    """
586    assert isinstance(typ, type), 'only actual types accepted'
587    assert isinstance(obj, (typ, type(None)))
588    return obj

Return an object typed as a given optional type.

Assert is used to check its actual type, so only use this when failures are not expected. Otherwise use checktype.

def checktype(obj: Any, typ: type[~T]) -> ~T:
591def checktype(obj: Any, typ: type[T]) -> T:
592    """Return an object typed as a given type.
593
594    Always checks the type at runtime with isinstance and throws a TypeError
595    on failure. Use asserttype for more efficient (but less safe) equivalent.
596    """
597    assert isinstance(typ, type), 'only actual types accepted'
598    if not isinstance(obj, typ):
599        raise TypeError(f'Expected a {typ}; got a {type(obj)}.')
600    return obj

Return an object typed as a given type.

Always checks the type at runtime with isinstance and throws a TypeError on failure. Use asserttype for more efficient (but less safe) equivalent.

def checktype_o(obj: Any, typ: type[~T]) -> Optional[~T]:
603def checktype_o(obj: Any, typ: type[T]) -> T | None:
604    """Return an object typed as a given optional type.
605
606    Always checks the type at runtime with isinstance and throws a TypeError
607    on failure. Use asserttype for more efficient (but less safe) equivalent.
608    """
609    assert isinstance(typ, type), 'only actual types accepted'
610    if not isinstance(obj, (typ, type(None))):
611        raise TypeError(f'Expected a {typ} or None; got a {type(obj)}.')
612    return obj

Return an object typed as a given optional type.

Always checks the type at runtime with isinstance and throws a TypeError on failure. Use asserttype for more efficient (but less safe) equivalent.

def warntype(obj: Any, typ: type[~T]) -> ~T:
615def warntype(obj: Any, typ: type[T]) -> T:
616    """Return an object typed as a given type.
617
618    Always checks the type at runtime and simply logs a warning if it is
619    not what is expected.
620    """
621    assert isinstance(typ, type), 'only actual types accepted'
622    if not isinstance(obj, typ):
623        import logging
624
625        logging.warning('warntype: expected a %s, got a %s', typ, type(obj))
626    return obj  # type: ignore

Return an object typed as a given type.

Always checks the type at runtime and simply logs a warning if it is not what is expected.

def warntype_o(obj: Any, typ: type[~T]) -> Optional[~T]:
629def warntype_o(obj: Any, typ: type[T]) -> T | None:
630    """Return an object typed as a given type.
631
632    Always checks the type at runtime and simply logs a warning if it is
633    not what is expected.
634    """
635    assert isinstance(typ, type), 'only actual types accepted'
636    if not isinstance(obj, (typ, type(None))):
637        import logging
638
639        logging.warning(
640            'warntype: expected a %s or None, got a %s', typ, type(obj)
641        )
642    return obj  # type: ignore

Return an object typed as a given type.

Always checks the type at runtime and simply logs a warning if it is not what is expected.

def assert_non_optional(obj: Optional[~T]) -> ~T:
645def assert_non_optional(obj: T | None) -> T:
646    """Return an object with Optional typing removed.
647
648    Assert is used to check its actual type, so only use this when
649    failures are not expected. Use check_non_optional otherwise.
650    """
651    assert obj is not None
652    return obj

Return an object with Optional typing removed.

Assert is used to check its actual type, so only use this when failures are not expected. Use check_non_optional otherwise.

def check_non_optional(obj: Optional[~T]) -> ~T:
655def check_non_optional(obj: T | None) -> T:
656    """Return an object with Optional typing removed.
657
658    Always checks the actual type and throws a TypeError on failure.
659    Use assert_non_optional for a more efficient (but less safe) equivalent.
660    """
661    if obj is None:
662        raise ValueError('Got None value in check_non_optional.')
663    return obj

Return an object with Optional typing removed.

Always checks the actual type and throws a TypeError on failure. Use assert_non_optional for a more efficient (but less safe) equivalent.

def smoothstep(edge0: float, edge1: float, x: float) -> float:
666def smoothstep(edge0: float, edge1: float, x: float) -> float:
667    """A smooth transition function.
668
669    Returns a value that smoothly moves from 0 to 1 as we go between edges.
670    Values outside of the range return 0 or 1.
671    """
672    y = min(1.0, max(0.0, (x - edge0) / (edge1 - edge0)))
673    return y * y * (3.0 - 2.0 * y)

A smooth transition function.

Returns a value that smoothly moves from 0 to 1 as we go between edges. Values outside of the range return 0 or 1.

def linearstep(edge0: float, edge1: float, x: float) -> float:
676def linearstep(edge0: float, edge1: float, x: float) -> float:
677    """A linear transition function.
678
679    Returns a value that linearly moves from 0 to 1 as we go between edges.
680    Values outside of the range return 0 or 1.
681    """
682    return max(0.0, min(1.0, (x - edge0) / (edge1 - edge0)))

A linear transition function.

Returns a value that linearly moves from 0 to 1 as we go between edges. Values outside of the range return 0 or 1.

def human_readable_compact_id(num: int) -> str:
701def human_readable_compact_id(num: int) -> str:
702    """Given a positive int, return a compact string representation for it.
703
704    Handy for visualizing unique numeric ids using as few as possible chars.
705    This representation uses only lowercase letters and numbers (minus the
706    following letters for readability):
707     's' is excluded due to similarity to '5'.
708     'l' is excluded due to similarity to '1'.
709     'i' is excluded due to similarity to '1'.
710     'o' is excluded due to similarity to '0'.
711     'z' is excluded due to similarity to '2'.
712
713    Therefore for n chars this can store values of 21^n.
714
715    When reading human input consisting of these IDs, it may be desirable
716    to map the disallowed chars to their corresponding allowed ones
717    ('o' -> '0', etc).
718
719    Sort order for these ids is the same as the original numbers.
720
721    If more compactness is desired at the expense of readability,
722    use compact_id() instead.
723    """
724    return _compact_id(num, '0123456789abcdefghjkmnpqrtuvwxy')

Given a positive int, return a compact string representation for it.

Handy for visualizing unique numeric ids using as few as possible chars. This representation uses only lowercase letters and numbers (minus the following letters for readability): 's' is excluded due to similarity to '5'. 'l' is excluded due to similarity to '1'. 'i' is excluded due to similarity to '1'. 'o' is excluded due to similarity to '0'. 'z' is excluded due to similarity to '2'.

Therefore for n chars this can store values of 21^n.

When reading human input consisting of these IDs, it may be desirable to map the disallowed chars to their corresponding allowed ones ('o' -> '0', etc).

Sort order for these ids is the same as the original numbers.

If more compactness is desired at the expense of readability, use compact_id() instead.

def compact_id(num: int) -> str:
727def compact_id(num: int) -> str:
728    """Given a positive int, return a compact string representation for it.
729
730    Handy for visualizing unique numeric ids using as few as possible chars.
731    This version is more compact than human_readable_compact_id() but less
732    friendly to humans due to using both capital and lowercase letters,
733    both 'O' and '0', etc.
734
735    Therefore for n chars this can store values of 62^n.
736
737    Sort order for these ids is the same as the original numbers.
738    """
739    return _compact_id(
740        num, '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'
741    )

Given a positive int, return a compact string representation for it.

Handy for visualizing unique numeric ids using as few as possible chars. This version is more compact than human_readable_compact_id() but less friendly to humans due to using both capital and lowercase letters, both 'O' and '0', etc.

Therefore for n chars this can store values of 62^n.

Sort order for these ids is the same as the original numbers.

def caller_source_location() -> str:
744def caller_source_location() -> str:
745    """Returns source file name and line of the code calling us.
746
747    Example: 'mymodule.py:23'
748    """
749    try:
750        import inspect
751
752        frame = inspect.currentframe()
753        for _i in range(2):
754            if frame is None:
755                raise RuntimeError()
756            frame = frame.f_back
757        if frame is None:
758            raise RuntimeError()
759        fname = os.path.basename(frame.f_code.co_filename)
760        return f'{fname}:{frame.f_lineno}'
761    except Exception:
762        return '<unknown source location>'

Returns source file name and line of the code calling us.

Example: 'mymodule.py:23'

def unchanging_hostname() -> str:
765def unchanging_hostname() -> str:
766    """Return an unchanging name for the local device.
767
768    Similar to the `hostname` call (or os.uname().nodename in Python)
769    except attempts to give a name that doesn't change depending on
770    network conditions. (A Mac will tend to go from Foo to Foo.local,
771    Foo.lan etc. throughout its various adventures)
772    """
773    import platform
774    import subprocess
775
776    # On Mac, this should give the computer name assigned in System Prefs.
777    if platform.system() == 'Darwin':
778        return (
779            subprocess.run(
780                ['scutil', '--get', 'ComputerName'],
781                check=True,
782                capture_output=True,
783            )
784            .stdout.decode()
785            .strip()
786            .replace(' ', '-')
787        )
788    return os.uname().nodename

Return an unchanging name for the local device.

Similar to the hostname call (or os.uname().nodename in Python) except attempts to give a name that doesn't change depending on network conditions. (A Mac will tend to go from Foo to Foo.local, Foo.lan etc. throughout its various adventures)

def set_canonical_module_names(module_globals: dict[str, typing.Any]) -> None:
791def set_canonical_module_names(module_globals: dict[str, Any]) -> None:
792    """Do the thing."""
793    if os.environ.get('EFRO_SUPPRESS_SET_CANONICAL_MODULE_NAMES') == '1':
794        return
795
796    modulename = module_globals.get('__name__')
797    if not isinstance(modulename, str):
798        raise RuntimeError('Unable to get module name.')
799    assert not modulename.startswith('_')
800    modulename_prefix = f'{modulename}.'
801    modulename_prefix_2 = f'_{modulename}.'
802
803    for name, obj in module_globals.items():
804        if name.startswith('_'):
805            continue
806        existing = getattr(obj, '__module__', None)
807        try:
808            # Override the module ONLY if it lives under us somewhere.
809            # So ourpackage._submodule.Foo becomes ourpackage.Foo
810            # but otherpackage._submodule.Foo remains untouched.
811            if existing is not None and (
812                existing.startswith(modulename_prefix)
813                or existing.startswith(modulename_prefix_2)
814            ):
815                obj.__module__ = modulename
816        except Exception:
817            import logging
818
819            logging.warning(
820                'set_canonical_module_names: unable to change __module__'
821                " from '%s' to '%s' on %s object at '%s'.",
822                existing,
823                modulename,
824                type(obj),
825                name,
826            )

Do the thing.

def timedelta_str( timeval: datetime.timedelta | float, maxparts: int = 2, decimals: int = 0) -> str:
829def timedelta_str(
830    timeval: datetime.timedelta | float, maxparts: int = 2, decimals: int = 0
831) -> str:
832    """Return a simple human readable time string for a length of time.
833
834    Time can be given as a timedelta or a float representing seconds.
835    Example output:
836      "23d 1h 2m 32s" (with maxparts == 4)
837      "23d 1h" (with maxparts == 2)
838      "23d 1.08h" (with maxparts == 2 and decimals == 2)
839
840    Note that this is hard-coded in English and probably not especially
841    performant.
842    """
843    # pylint: disable=too-many-locals
844
845    if isinstance(timeval, float):
846        timevalfin = datetime.timedelta(seconds=timeval)
847    else:
848        timevalfin = timeval
849
850    # Internally we only handle positive values.
851    if timevalfin.total_seconds() < 0:
852        return f'-{timedelta_str(timeval=-timeval, maxparts=maxparts)}'
853
854    years = timevalfin.days // 365
855    days = timevalfin.days % 365
856    hours = timevalfin.seconds // 3600
857    hour_remainder = timevalfin.seconds % 3600
858    minutes = hour_remainder // 60
859    seconds = hour_remainder % 60
860
861    # Now, if we want decimal places for our last value,
862    # calc fractional parts.
863    if decimals:
864        # Calc totals of each type.
865        t_seconds = timevalfin.total_seconds()
866        t_minutes = t_seconds / 60
867        t_hours = t_minutes / 60
868        t_days = t_hours / 24
869        t_years = t_days / 365
870
871        # Calc fractional parts that exclude all whole values to their left.
872        years_covered = years
873        years_f = t_years - years_covered
874        days_covered = years_covered * 365 + days
875        days_f = t_days - days_covered
876        hours_covered = days_covered * 24 + hours
877        hours_f = t_hours - hours_covered
878        minutes_covered = hours_covered * 60 + minutes
879        minutes_f = t_minutes - minutes_covered
880        seconds_covered = minutes_covered * 60 + seconds
881        seconds_f = t_seconds - seconds_covered
882    else:
883        years_f = days_f = hours_f = minutes_f = seconds_f = 0.0
884
885    parts: list[str] = []
886    for part, part_f, suffix in (
887        (years, years_f, 'y'),
888        (days, days_f, 'd'),
889        (hours, hours_f, 'h'),
890        (minutes, minutes_f, 'm'),
891        (seconds, seconds_f, 's'),
892    ):
893        if part or parts or (not parts and suffix == 's'):
894            # Do decimal version only for the last part.
895            if decimals and (len(parts) >= maxparts - 1 or suffix == 's'):
896                parts.append(f'{part+part_f:.{decimals}f}{suffix}')
897            else:
898                parts.append(f'{part}{suffix}')
899            if len(parts) >= maxparts:
900                break
901    return ' '.join(parts)

Return a simple human readable time string for a length of time.

Time can be given as a timedelta or a float representing seconds. Example output: "23d 1h 2m 32s" (with maxparts == 4) "23d 1h" (with maxparts == 2) "23d 1.08h" (with maxparts == 2 and decimals == 2)

Note that this is hard-coded in English and probably not especially performant.

def ago_str( timeval: datetime.datetime, maxparts: int = 1, now: datetime.datetime | None = None, decimals: int = 0) -> str:
904def ago_str(
905    timeval: datetime.datetime,
906    maxparts: int = 1,
907    now: datetime.datetime | None = None,
908    decimals: int = 0,
909) -> str:
910    """Given a datetime, return a clean human readable 'ago' str.
911
912    Note that this is hard-coded in English so should not be used
913    for visible in-game elements; only tools/etc.
914
915    If now is not passed, efro.util.utc_now() is used.
916    """
917    if now is None:
918        now = utc_now()
919    return (
920        timedelta_str(now - timeval, maxparts=maxparts, decimals=decimals)
921        + ' ago'
922    )

Given a datetime, return a clean human readable 'ago' str.

Note that this is hard-coded in English so should not be used for visible in-game elements; only tools/etc.

If now is not passed, efro.util.utc_now() is used.

def split_list(input_list: list[~T], max_length: int) -> list[list[~T]]:
925def split_list(input_list: list[T], max_length: int) -> list[list[T]]:
926    """Split a single list into smaller lists."""
927    return [
928        input_list[i : i + max_length]
929        for i in range(0, len(input_list), max_length)
930    ]

Split a single list into smaller lists.

def extract_flag(args: list[str], name: str) -> bool:
933def extract_flag(args: list[str], name: str) -> bool:
934    """Given a list of args and a flag name, returns whether it is present.
935
936    The arg flag, if present, is removed from the arg list.
937    """
938    from efro.error import CleanError
939
940    count = args.count(name)
941    if count > 1:
942        raise CleanError(f'Flag {name} passed multiple times.')
943    if not count:
944        return False
945    args.remove(name)
946    return True

Given a list of args and a flag name, returns whether it is present.

The arg flag, if present, is removed from the arg list.

def extract_arg(args: list[str], name: str, required: bool = False) -> str | None:
959def extract_arg(
960    args: list[str], name: str, required: bool = False
961) -> str | None:
962    """Given a list of args and an arg name, returns a value.
963
964    The arg flag and value are removed from the arg list.
965    raises CleanErrors on any problems.
966    """
967    from efro.error import CleanError
968
969    count = args.count(name)
970    if not count:
971        if required:
972            raise CleanError(f'Required argument {name} not passed.')
973        return None
974
975    if count > 1:
976        raise CleanError(f'Arg {name} passed multiple times.')
977
978    argindex = args.index(name)
979    if argindex + 1 >= len(args):
980        raise CleanError(f'No value passed after {name} arg.')
981
982    val = args[argindex + 1]
983    del args[argindex : argindex + 2]
984
985    return val

Given a list of args and an arg name, returns a value.

The arg flag and value are removed from the arg list. raises CleanErrors on any problems.