efro.util

Small handy bits of functionality.

  1# Released under the MIT License. See LICENSE for details.
  2#
  3"""Small handy bits of functionality."""
  4
  5from __future__ import annotations
  6
  7import os
  8import time
  9import weakref
 10import datetime
 11from enum import Enum
 12from typing import TYPE_CHECKING, cast, TypeVar, Generic, overload
 13
 14if TYPE_CHECKING:
 15    import asyncio
 16    from typing import Any, Callable, Literal
 17
 18T = TypeVar('T')
 19ValT = TypeVar('ValT')
 20ArgT = TypeVar('ArgT')
 21SelfT = TypeVar('SelfT')
 22RetT = TypeVar('RetT')
 23EnumT = TypeVar('EnumT', bound=Enum)
 24
 25
 26class _EmptyObj:
 27    pass
 28
 29
 30# A dead weak-ref should be immutable, right? So we can create exactly
 31# one and return it for all cases that need an empty weak-ref.
 32_g_empty_weak_ref = weakref.ref(_EmptyObj())
 33assert _g_empty_weak_ref() is None
 34
 35
 36def explicit_bool(val: bool) -> bool:
 37    """Return a non-inferable boolean value.
 38
 39    Useful to be able to disable blocks of code without type checkers
 40    complaining/etc.
 41    """
 42    # pylint: disable=no-else-return
 43    if TYPE_CHECKING:
 44        # infer this! <boom>
 45        import random
 46
 47        return random.random() < 0.5
 48    else:
 49        return val
 50
 51
 52def snake_case_to_title(val: str) -> str:
 53    """Given a snake-case string 'foo_bar', returns 'Foo Bar'."""
 54    # Kill empty words resulting from leading/trailing/multiple underscores.
 55    return ' '.join(w for w in val.split('_') if w).title()
 56
 57
 58def snake_case_to_camel_case(val: str) -> str:
 59    """Given a snake-case string 'foo_bar', returns camel-case 'FooBar'."""
 60    # Replace underscores with spaces; capitalize words; kill spaces.
 61    # Not sure about efficiency, but logically simple.
 62    return val.replace('_', ' ').title().replace(' ', '')
 63
 64
 65def enum_by_value(cls: type[EnumT], value: Any) -> EnumT:
 66    """Create an enum from a value.
 67
 68    This is basically the same as doing 'obj = EnumType(value)' except
 69    that it works around an issue where a reference loop is created
 70    if an exception is thrown due to an invalid value. Since we disable
 71    the cyclic garbage collector for most of the time, such loops can lead
 72    to our objects sticking around longer than we want.
 73    This issue has been submitted to Python as a bug so hopefully we can
 74    remove this eventually if it gets fixed: https://bugs.python.org/issue42248
 75    UPDATE: This has been fixed as of later 3.8 builds, so we can kill this
 76    off once we are 3.9+ across the board.
 77    """
 78
 79    # Note: we don't recreate *ALL* the functionality of the Enum constructor
 80    # such as the _missing_ hook; but this should cover our basic needs.
 81    value2member_map = getattr(cls, '_value2member_map_')
 82    assert value2member_map is not None
 83    try:
 84        out = value2member_map[value]
 85        assert isinstance(out, cls)
 86        return out
 87    except KeyError:
 88        # pylint: disable=consider-using-f-string
 89        raise ValueError(
 90            '%r is not a valid %s' % (value, cls.__name__)
 91        ) from None
 92
 93
 94def check_utc(value: datetime.datetime) -> None:
 95    """Ensure a datetime value is timezone-aware utc."""
 96    if value.tzinfo is not datetime.UTC:
 97        raise ValueError(
 98            'datetime value does not have timezone set as datetime.UTC'
 99        )
100
101
102def utc_now() -> datetime.datetime:
103    """Get timezone-aware current utc time.
104
105    Just a shortcut for datetime.datetime.now(datetime.UTC).
106    Avoid datetime.datetime.utcnow() which is deprecated and gives naive
107    times.
108    """
109    return datetime.datetime.now(datetime.UTC)
110
111
112def utc_now_naive() -> datetime.datetime:
113    """Get naive utc time.
114
115    This can be used to replace datetime.utcnow(), which is now deprecated.
116    Most all code should migrate to use timezone-aware times instead of
117    this.
118    """
119    return datetime.datetime.now(datetime.UTC).replace(tzinfo=None)
120
121
122def utc_today() -> datetime.datetime:
123    """Get offset-aware midnight in the utc time zone."""
124    now = datetime.datetime.now(datetime.UTC)
125    return datetime.datetime(
126        year=now.year, month=now.month, day=now.day, tzinfo=now.tzinfo
127    )
128
129
130def utc_this_hour() -> datetime.datetime:
131    """Get offset-aware beginning of the current hour in the utc time zone."""
132    now = datetime.datetime.now(datetime.UTC)
133    return datetime.datetime(
134        year=now.year,
135        month=now.month,
136        day=now.day,
137        hour=now.hour,
138        tzinfo=now.tzinfo,
139    )
140
141
142def utc_this_minute() -> datetime.datetime:
143    """Get offset-aware beginning of current minute in the utc time zone."""
144    now = datetime.datetime.now(datetime.UTC)
145    return datetime.datetime(
146        year=now.year,
147        month=now.month,
148        day=now.day,
149        hour=now.hour,
150        minute=now.minute,
151        tzinfo=now.tzinfo,
152    )
153
154
155def empty_weakref(objtype: type[T]) -> weakref.ref[T]:
156    """Return an invalidated weak-reference for the specified type."""
157    # At runtime, all weakrefs are the same; our type arg is just
158    # for the static type checker.
159    del objtype  # Unused.
160
161    # Just create an object and let it die. Is there a cleaner way to do this?
162    # return weakref.ref(_EmptyObj())  # type: ignore
163
164    # Sharing a single one seems at least a bit better.
165    return _g_empty_weak_ref  # type: ignore
166
167
168def data_size_str(bytecount: int, compact: bool = False) -> str:
169    """Given a size in bytes, returns a short human readable string.
170
171    In compact mode this should be 6 or fewer chars for most all
172    sane file sizes.
173    """
174    # pylint: disable=too-many-return-statements
175
176    # Special case: handle negatives.
177    if bytecount < 0:
178        val = data_size_str(-bytecount, compact=compact)
179        return f'-{val}'
180
181    if bytecount <= 999:
182        suffix = 'B' if compact else 'bytes'
183        return f'{bytecount} {suffix}'
184    kbytecount = bytecount / 1024
185    if round(kbytecount, 1) < 10.0:
186        return f'{kbytecount:.1f} KB'
187    if round(kbytecount, 0) < 999:
188        return f'{kbytecount:.0f} KB'
189    mbytecount = bytecount / (1024 * 1024)
190    if round(mbytecount, 1) < 10.0:
191        return f'{mbytecount:.1f} MB'
192    if round(mbytecount, 0) < 999:
193        return f'{mbytecount:.0f} MB'
194    gbytecount = bytecount / (1024 * 1024 * 1024)
195    if round(gbytecount, 1) < 10.0:
196        return f'{gbytecount:.1f} GB'
197    return f'{gbytecount:.0f} GB'
198
199
200class DirtyBit:
201    """Manages whether a thing is dirty and regulates attempts to clean it.
202
203    To use, simply set the 'dirty' value on this object to True when some
204    action is needed, and then check the 'should_update' value to regulate
205    when attempts to clean it should be made. Set 'dirty' back to False after
206    a successful update.
207    If 'use_lock' is True, an asyncio Lock will be created and incorporated
208    into update attempts to prevent simultaneous updates (should_update will
209    only return True when the lock is unlocked). Note that it is up to the user
210    to lock/unlock the lock during the actual update attempt.
211    If a value is passed for 'auto_dirty_seconds', the dirtybit will flip
212    itself back to dirty after being clean for the given amount of time.
213    'min_update_interval' can be used to enforce a minimum update
214    interval even when updates are successful (retry_interval only applies
215    when updates fail).
216    """
217
218    def __init__(
219        self,
220        dirty: bool = False,
221        retry_interval: float = 5.0,
222        use_lock: bool = False,
223        auto_dirty_seconds: float | None = None,
224        min_update_interval: float | None = None,
225    ):
226        curtime = time.monotonic()
227        self._retry_interval = retry_interval
228        self._auto_dirty_seconds = auto_dirty_seconds
229        self._min_update_interval = min_update_interval
230        self._dirty = dirty
231        self._next_update_time: float | None = curtime if dirty else None
232        self._last_update_time: float | None = None
233        self._next_auto_dirty_time: float | None = (
234            (curtime + self._auto_dirty_seconds)
235            if (not dirty and self._auto_dirty_seconds is not None)
236            else None
237        )
238        self._use_lock = use_lock
239        self.lock: asyncio.Lock
240        if self._use_lock:
241            import asyncio
242
243            self.lock = asyncio.Lock()
244
245    @property
246    def dirty(self) -> bool:
247        """Whether the target is currently dirty.
248
249        This should be set to False once an update is successful.
250        """
251        return self._dirty
252
253    @dirty.setter
254    def dirty(self, value: bool) -> None:
255        # If we're freshly clean, set our next auto-dirty time (if we have
256        # one).
257        if self._dirty and not value and self._auto_dirty_seconds is not None:
258            self._next_auto_dirty_time = (
259                time.monotonic() + self._auto_dirty_seconds
260            )
261
262        # If we're freshly dirty, schedule an immediate update.
263        if not self._dirty and value:
264            self._next_update_time = time.monotonic()
265
266            # If they want to enforce a minimum update interval,
267            # push out the next update time if it hasn't been long enough.
268            if (
269                self._min_update_interval is not None
270                and self._last_update_time is not None
271            ):
272                self._next_update_time = max(
273                    self._next_update_time,
274                    self._last_update_time + self._min_update_interval,
275                )
276
277        self._dirty = value
278
279    @property
280    def should_update(self) -> bool:
281        """Whether an attempt should be made to clean the target now.
282
283        Always returns False if the target is not dirty.
284        Takes into account the amount of time passed since the target
285        was marked dirty or since should_update last returned True.
286        """
287        curtime = time.monotonic()
288
289        # Auto-dirty ourself if we're into that.
290        if (
291            self._next_auto_dirty_time is not None
292            and curtime > self._next_auto_dirty_time
293        ):
294            self.dirty = True
295            self._next_auto_dirty_time = None
296        if not self._dirty:
297            return False
298        if self._use_lock and self.lock.locked():
299            return False
300        assert self._next_update_time is not None
301        if curtime > self._next_update_time:
302            self._next_update_time = curtime + self._retry_interval
303            self._last_update_time = curtime
304            return True
305        return False
306
307
308class DispatchMethodWrapper(Generic[ArgT, RetT]):
309    """Type-aware standin for the dispatch func returned by dispatchmethod."""
310
311    def __call__(self, arg: ArgT) -> RetT:
312        raise RuntimeError('Should not get here')
313
314    @staticmethod
315    def register(
316        func: Callable[[Any, Any], RetT]
317    ) -> Callable[[Any, Any], RetT]:
318        """Register a new dispatch handler for this dispatch-method."""
319        raise RuntimeError('Should not get here')
320
321    registry: dict[Any, Callable]
322
323
324# noinspection PyProtectedMember,PyTypeHints
325def dispatchmethod(
326    func: Callable[[Any, ArgT], RetT]
327) -> DispatchMethodWrapper[ArgT, RetT]:
328    """A variation of functools.singledispatch for methods.
329
330    Note: as of Python 3.9 there is now functools.singledispatchmethod,
331    but it currently (as of Jan 2021) is not type-aware (at least in mypy),
332    which gives us a reason to keep this one around for now.
333    """
334    from functools import singledispatch, update_wrapper
335
336    origwrapper: Any = singledispatch(func)
337
338    # Pull this out so hopefully origwrapper can die,
339    # otherwise we reference origwrapper in our wrapper.
340    dispatch = origwrapper.dispatch
341
342    # All we do here is recreate the end of functools.singledispatch
343    # where it returns a wrapper except instead of the wrapper using the
344    # first arg to the function ours uses the second (to skip 'self').
345    # This was made against Python 3.7; we should probably check up on
346    # this in later versions in case anything has changed.
347    # (or hopefully they'll add this functionality to their version)
348    # NOTE: sounds like we can use functools singledispatchmethod in 3.8
349    def wrapper(*args: Any, **kw: Any) -> Any:
350        if not args or len(args) < 2:
351            raise TypeError(
352                f'{funcname} requires at least ' '2 positional arguments'
353            )
354
355        return dispatch(args[1].__class__)(*args, **kw)
356
357    funcname = getattr(func, '__name__', 'dispatchmethod method')
358    wrapper.register = origwrapper.register  # type: ignore
359    wrapper.dispatch = dispatch  # type: ignore
360    wrapper.registry = origwrapper.registry  # type: ignore
361    # pylint: disable=protected-access
362    wrapper._clear_cache = origwrapper._clear_cache  # type: ignore
363    update_wrapper(wrapper, func)
364    # pylint: enable=protected-access
365    return cast(DispatchMethodWrapper, wrapper)
366
367
368def valuedispatch(call: Callable[[ValT], RetT]) -> ValueDispatcher[ValT, RetT]:
369    """Decorator for functions to allow dispatching based on a value.
370
371    This differs from functools.singledispatch in that it dispatches based
372    on the value of an argument, not based on its type.
373    The 'register' method of a value-dispatch function can be used
374    to assign new functions to handle particular values.
375    Unhandled values wind up in the original dispatch function."""
376    return ValueDispatcher(call)
377
378
379class ValueDispatcher(Generic[ValT, RetT]):
380    """Used by the valuedispatch decorator"""
381
382    def __init__(self, call: Callable[[ValT], RetT]) -> None:
383        self._base_call = call
384        self._handlers: dict[ValT, Callable[[], RetT]] = {}
385
386    def __call__(self, value: ValT) -> RetT:
387        handler = self._handlers.get(value)
388        if handler is not None:
389            return handler()
390        return self._base_call(value)
391
392    def _add_handler(
393        self, value: ValT, call: Callable[[], RetT]
394    ) -> Callable[[], RetT]:
395        if value in self._handlers:
396            raise RuntimeError(f'Duplicate handlers added for {value}')
397        self._handlers[value] = call
398        return call
399
400    def register(
401        self, value: ValT
402    ) -> Callable[[Callable[[], RetT]], Callable[[], RetT]]:
403        """Add a handler to the dispatcher."""
404        from functools import partial
405
406        return partial(self._add_handler, value)
407
408
409def valuedispatch1arg(
410    call: Callable[[ValT, ArgT], RetT]
411) -> ValueDispatcher1Arg[ValT, ArgT, RetT]:
412    """Like valuedispatch but for functions taking an extra argument."""
413    return ValueDispatcher1Arg(call)
414
415
416class ValueDispatcher1Arg(Generic[ValT, ArgT, RetT]):
417    """Used by the valuedispatch1arg decorator"""
418
419    def __init__(self, call: Callable[[ValT, ArgT], RetT]) -> None:
420        self._base_call = call
421        self._handlers: dict[ValT, Callable[[ArgT], RetT]] = {}
422
423    def __call__(self, value: ValT, arg: ArgT) -> RetT:
424        handler = self._handlers.get(value)
425        if handler is not None:
426            return handler(arg)
427        return self._base_call(value, arg)
428
429    def _add_handler(
430        self, value: ValT, call: Callable[[ArgT], RetT]
431    ) -> Callable[[ArgT], RetT]:
432        if value in self._handlers:
433            raise RuntimeError(f'Duplicate handlers added for {value}')
434        self._handlers[value] = call
435        return call
436
437    def register(
438        self, value: ValT
439    ) -> Callable[[Callable[[ArgT], RetT]], Callable[[ArgT], RetT]]:
440        """Add a handler to the dispatcher."""
441        from functools import partial
442
443        return partial(self._add_handler, value)
444
445
446if TYPE_CHECKING:
447
448    class ValueDispatcherMethod(Generic[ValT, RetT]):
449        """Used by the valuedispatchmethod decorator."""
450
451        def __call__(self, value: ValT) -> RetT: ...
452
453        def register(
454            self, value: ValT
455        ) -> Callable[[Callable[[SelfT], RetT]], Callable[[SelfT], RetT]]:
456            """Add a handler to the dispatcher."""
457            ...
458
459
460def valuedispatchmethod(
461    call: Callable[[SelfT, ValT], RetT]
462) -> ValueDispatcherMethod[ValT, RetT]:
463    """Like valuedispatch but works with methods instead of functions."""
464
465    # NOTE: It seems that to wrap a method with a decorator and have self
466    # dispatching do the right thing, we must return a function and not
467    # an executable object. So for this version we store our data here
468    # in the function call dict and simply return a call.
469
470    _base_call = call
471    _handlers: dict[ValT, Callable[[SelfT], RetT]] = {}
472
473    def _add_handler(value: ValT, addcall: Callable[[SelfT], RetT]) -> None:
474        if value in _handlers:
475            raise RuntimeError(f'Duplicate handlers added for {value}')
476        _handlers[value] = addcall
477
478    def _register(value: ValT) -> Callable[[Callable[[SelfT], RetT]], None]:
479        from functools import partial
480
481        return partial(_add_handler, value)
482
483    def _call_wrapper(self: SelfT, value: ValT) -> RetT:
484        handler = _handlers.get(value)
485        if handler is not None:
486            return handler(self)
487        return _base_call(self, value)
488
489    # We still want to use our returned object to register handlers, but we're
490    # actually just returning a function. So manually stuff the call onto it.
491    setattr(_call_wrapper, 'register', _register)
492
493    # To the type checker's eyes we return a ValueDispatchMethod instance;
494    # this lets it know about our register func and type-check its usage.
495    # In reality we just return a raw function call (for reasons listed above).
496    # pylint: disable=undefined-variable, no-else-return
497    if TYPE_CHECKING:
498        return ValueDispatcherMethod[ValT, RetT]()
499    else:
500        return _call_wrapper
501
502
503def make_hash(obj: Any) -> int:
504    """Makes a hash from a dictionary, list, tuple or set to any level,
505    that contains only other hashable types (including any lists, tuples,
506    sets, and dictionaries).
507
508    Note that this uses Python's hash() function internally so collisions/etc.
509    may be more common than with fancy cryptographic hashes.
510
511    Also be aware that Python's hash() output varies across processes, so
512    this should only be used for values that will remain in a single process.
513    """
514    import copy
515
516    if isinstance(obj, (set, tuple, list)):
517        return hash(tuple(make_hash(e) for e in obj))
518    if not isinstance(obj, dict):
519        return hash(obj)
520
521    new_obj = copy.deepcopy(obj)
522    for k, v in new_obj.items():
523        new_obj[k] = make_hash(v)
524
525    # NOTE: sorted() works correctly here because it compares only
526    # unique first values (i.e. dict keys).
527    return hash(tuple(frozenset(sorted(new_obj.items()))))
528
529
530def float_hash_from_string(s: str) -> float:
531    """Given a string value, returns a float between 0 and 1.
532
533    Is consistent across processes. Can be useful for assigning shard
534    values to db ids for efficient parallel processing.
535    """
536    import hashlib
537
538    hash_bytes = hashlib.md5(s.encode()).digest()
539
540    # Generate a random 64 bit int from hash digest bytes.
541    ival = int.from_bytes(hash_bytes[:8])
542    return ival / ((1 << 64) - 1)
543
544
545def asserttype(obj: Any, typ: type[T]) -> T:
546    """Return an object typed as a given type.
547
548    Assert is used to check its actual type, so only use this when
549    failures are not expected. Otherwise use checktype.
550    """
551    assert isinstance(typ, type), 'only actual types accepted'
552    assert isinstance(obj, typ)
553    return obj
554
555
556def asserttype_o(obj: Any, typ: type[T]) -> T | None:
557    """Return an object typed as a given optional type.
558
559    Assert is used to check its actual type, so only use this when
560    failures are not expected. Otherwise use checktype.
561    """
562    assert isinstance(typ, type), 'only actual types accepted'
563    assert isinstance(obj, (typ, type(None)))
564    return obj
565
566
567def checktype(obj: Any, typ: type[T]) -> T:
568    """Return an object typed as a given type.
569
570    Always checks the type at runtime with isinstance and throws a TypeError
571    on failure. Use asserttype for a more efficient (but less safe) equivalent.
572    """
573    assert isinstance(typ, type), 'only actual types accepted'
574    if not isinstance(obj, typ):
575        raise TypeError(f'Expected a {typ}; got a {type(obj)}.')
576    return obj
577
578
579def checktype_o(obj: Any, typ: type[T]) -> T | None:
580    """Return an object typed as a given optional type.
581
582    Always checks the type at runtime with isinstance and throws a TypeError
583    on failure. Use asserttype_o for a more efficient (but less safe) equivalent.
584    """
585    assert isinstance(typ, type), 'only actual types accepted'
586    if not isinstance(obj, (typ, type(None))):
587        raise TypeError(f'Expected a {typ} or None; got a {type(obj)}.')
588    return obj
589
590
591def warntype(obj: Any, typ: type[T]) -> T:
592    """Return an object typed as a given type.
593
594    Always checks the type at runtime and simply logs a warning if it is
595    not what is expected.
596    """
597    assert isinstance(typ, type), 'only actual types accepted'
598    if not isinstance(obj, typ):
599        import logging
600
601        logging.warning('warntype: expected a %s, got a %s', typ, type(obj))
602    return obj  # type: ignore
603
604
605def warntype_o(obj: Any, typ: type[T]) -> T | None:
606    """Return an object typed as a given type.
607
608    Always checks the type at runtime and simply logs a warning if it is
609    not what is expected.
610    """
611    assert isinstance(typ, type), 'only actual types accepted'
612    if not isinstance(obj, (typ, type(None))):
613        import logging
614
615        logging.warning(
616            'warntype: expected a %s or None, got a %s', typ, type(obj)
617        )
618    return obj  # type: ignore
619
620
621def assert_non_optional(obj: T | None) -> T:
622    """Return an object with Optional typing removed.
623
624    Assert is used to check its actual type, so only use this when
625    failures are not expected. Use check_non_optional otherwise.
626    """
627    assert obj is not None
628    return obj
629
630
631def check_non_optional(obj: T | None) -> T:
632    """Return an object with Optional typing removed.
633
634    Always checks the actual value and raises a ValueError if it is None.
635    Use assert_non_optional for a more efficient (but less safe) equivalent.
636    """
637    if obj is None:
638        raise ValueError('Got None value in check_non_optional.')
639    return obj
640
641
642def smoothstep(edge0: float, edge1: float, x: float) -> float:
643    """A smooth transition function.
644
645    Returns a value that smoothly moves from 0 to 1 as we go between edges.
646    Values outside of the range return 0 or 1.
647    """
648    y = min(1.0, max(0.0, (x - edge0) / (edge1 - edge0)))
649    return y * y * (3.0 - 2.0 * y)
650
651
652def linearstep(edge0: float, edge1: float, x: float) -> float:
653    """A linear transition function.
654
655    Returns a value that linearly moves from 0 to 1 as we go between edges.
656    Values outside of the range return 0 or 1.
657    """
658    return max(0.0, min(1.0, (x - edge0) / (edge1 - edge0)))
659
660
661def _compact_id(num: int, chars: str) -> str:
662    if num < 0:
663        raise ValueError('Negative integers not allowed.')
664
665    # Chars must be in sorted order for sorting to work correctly
666    # on our output.
667    assert ''.join(sorted(list(chars))) == chars
668
669    base = len(chars)
670    out = ''
671    while num:
672        out += chars[num % base]
673        num //= base
674    return out[::-1] or '0'
675
676
677def human_readable_compact_id(num: int) -> str:
678    """Given a positive int, return a compact string representation for it.
679
680    Handy for visualizing unique numeric ids using as few as possible chars.
681    This representation uses only lowercase letters and numbers (minus the
682    following letters for readability):
683     's' is excluded due to similarity to '5'.
684     'l' is excluded due to similarity to '1'.
685     'i' is excluded due to similarity to '1'.
686     'o' is excluded due to similarity to '0'.
687     'z' is excluded due to similarity to '2'.
688
689    Therefore for n chars this can store 31^n distinct values.
690
691    When reading human input consisting of these IDs, it may be desirable
692    to map the disallowed chars to their corresponding allowed ones
693    ('o' -> '0', etc).
694
695    Sort order for these ids is the same as the original numbers.
696
697    If more compactness is desired at the expense of readability,
698    use compact_id() instead.
699    """
700    return _compact_id(num, '0123456789abcdefghjkmnpqrtuvwxy')
701
702
703def compact_id(num: int) -> str:
704    """Given a positive int, return a compact string representation for it.
705
706    Handy for visualizing unique numeric ids using as few as possible chars.
707    This version is more compact than human_readable_compact_id() but less
708    friendly to humans due to using both capital and lowercase letters,
709    both 'O' and '0', etc.
710
711    Therefore for n chars this can store 62^n distinct values.
712
713    Sort order for these ids is the same as the original numbers.
714    """
715    return _compact_id(
716        num, '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'
717    )
718
719
720def caller_source_location() -> str:
721    """Returns source file name and line of the code calling us.
722
723    Example: 'mymodule.py:23'
724    """
725    try:
726        import inspect
727
728        frame = inspect.currentframe()
729        for _i in range(2):
730            if frame is None:
731                raise RuntimeError()
732            frame = frame.f_back
733        if frame is None:
734            raise RuntimeError()
735        fname = os.path.basename(frame.f_code.co_filename)
736        return f'{fname}:{frame.f_lineno}'
737    except Exception:
738        return '<unknown source location>'
739
740
741def unchanging_hostname() -> str:
742    """Return an unchanging name for the local device.
743
744    Similar to the `hostname` call (or os.uname().nodename in Python)
745    except attempts to give a name that doesn't change depending on
746    network conditions. (A Mac will tend to go from Foo to Foo.local,
747    Foo.lan etc. throughout its various adventures)
748    """
749    import platform
750    import subprocess
751
752    # On Mac, this should give the computer name assigned in System Prefs.
753    if platform.system() == 'Darwin':
754        return (
755            subprocess.run(
756                ['scutil', '--get', 'ComputerName'],
757                check=True,
758                capture_output=True,
759            )
760            .stdout.decode()
761            .strip()
762            .replace(' ', '-')
763        )
764    return os.uname().nodename
765
766
767def set_canonical_module_names(module_globals: dict[str, Any]) -> None:
768    """Do the thing."""
769    if os.environ.get('EFRO_SUPPRESS_SET_CANONICAL_MODULE_NAMES') == '1':
770        return
771
772    modulename = module_globals.get('__name__')
773    if not isinstance(modulename, str):
774        raise RuntimeError('Unable to get module name.')
775    assert not modulename.startswith('_')
776    modulename_prefix = f'{modulename}.'
777    modulename_prefix_2 = f'_{modulename}.'
778
779    for name, obj in module_globals.items():
780        if name.startswith('_'):
781            continue
782        existing = getattr(obj, '__module__', None)
783        try:
784            # Override the module ONLY if it lives under us somewhere.
785            # So ourpackage._submodule.Foo becomes ourpackage.Foo
786            # but otherpackage._submodule.Foo remains untouched.
787            if existing is not None and (
788                existing.startswith(modulename_prefix)
789                or existing.startswith(modulename_prefix_2)
790            ):
791                obj.__module__ = modulename
792        except Exception:
793            import logging
794
795            logging.warning(
796                'set_canonical_module_names: unable to change __module__'
797                " from '%s' to '%s' on %s object at '%s'.",
798                existing,
799                modulename,
800                type(obj),
801                name,
802            )
803
804
805def timedelta_str(
806    timeval: datetime.timedelta | float, maxparts: int = 2, decimals: int = 0
807) -> str:
808    """Return a simple human readable time string for a length of time.
809
810    Time can be given as a timedelta or a float representing seconds.
811    Example output:
812      "23d 1h 2m 32s" (with maxparts == 4)
813      "23d 1h" (with maxparts == 2)
814      "23d 1.08h" (with maxparts == 2 and decimals == 2)
815
816    Note that this is hard-coded in English and probably not especially
817    performant.
818    """
819    # pylint: disable=too-many-locals
820
821    if isinstance(timeval, float):
822        timevalfin = datetime.timedelta(seconds=timeval)
823    else:
824        timevalfin = timeval
825
826    # Internally we only handle positive values.
827    if timevalfin.total_seconds() < 0:
828        return f'-{timedelta_str(-timeval, maxparts=maxparts, decimals=decimals)}'
829
830    years = timevalfin.days // 365
831    days = timevalfin.days % 365
832    hours = timevalfin.seconds // 3600
833    hour_remainder = timevalfin.seconds % 3600
834    minutes = hour_remainder // 60
835    seconds = hour_remainder % 60
836
837    # Now, if we want decimal places for our last value,
838    # calc fractional parts.
839    if decimals:
840        # Calc totals of each type.
841        t_seconds = timevalfin.total_seconds()
842        t_minutes = t_seconds / 60
843        t_hours = t_minutes / 60
844        t_days = t_hours / 24
845        t_years = t_days / 365
846
847        # Calc fractional parts that exclude all whole values to their left.
848        years_covered = years
849        years_f = t_years - years_covered
850        days_covered = years_covered * 365 + days
851        days_f = t_days - days_covered
852        hours_covered = days_covered * 24 + hours
853        hours_f = t_hours - hours_covered
854        minutes_covered = hours_covered * 60 + minutes
855        minutes_f = t_minutes - minutes_covered
856        seconds_covered = minutes_covered * 60 + seconds
857        seconds_f = t_seconds - seconds_covered
858    else:
859        years_f = days_f = hours_f = minutes_f = seconds_f = 0.0
860
861    parts: list[str] = []
862    for part, part_f, suffix in (
863        (years, years_f, 'y'),
864        (days, days_f, 'd'),
865        (hours, hours_f, 'h'),
866        (minutes, minutes_f, 'm'),
867        (seconds, seconds_f, 's'),
868    ):
869        if part or parts or (not parts and suffix == 's'):
870            # Do decimal version only for the last part.
871            if decimals and (len(parts) >= maxparts - 1 or suffix == 's'):
872                parts.append(f'{part+part_f:.{decimals}f}{suffix}')
873            else:
874                parts.append(f'{part}{suffix}')
875            if len(parts) >= maxparts:
876                break
877    return ' '.join(parts)
878
879
880def ago_str(
881    timeval: datetime.datetime,
882    maxparts: int = 1,
883    now: datetime.datetime | None = None,
884    decimals: int = 0,
885) -> str:
886    """Given a datetime, return a clean human readable 'ago' str.
887
888    Note that this is hard-coded in English so should not be used
889    for visible in-game elements; only tools/etc.
890
891    If now is not passed, efro.util.utc_now() is used.
892    """
893    if now is None:
894        now = utc_now()
895    return (
896        timedelta_str(now - timeval, maxparts=maxparts, decimals=decimals)
897        + ' ago'
898    )
899
900
901def split_list(input_list: list[T], max_length: int) -> list[list[T]]:
902    """Split a single list into smaller lists."""
903    return [
904        input_list[i : i + max_length]
905        for i in range(0, len(input_list), max_length)
906    ]
907
908
909def extract_flag(args: list[str], name: str) -> bool:
910    """Given a list of args and a flag name, returns whether it is present.
911
912    The arg flag, if present, is removed from the arg list.
913    """
914    from efro.error import CleanError
915
916    count = args.count(name)
917    if count > 1:
918        raise CleanError(f'Flag {name} passed multiple times.')
919    if not count:
920        return False
921    args.remove(name)
922    return True
923
924
925@overload
926def extract_arg(
927    args: list[str], name: str, required: Literal[False] = False
928) -> str | None: ...
929
930
931@overload
932def extract_arg(args: list[str], name: str, required: Literal[True]) -> str: ...
933
934
935def extract_arg(
936    args: list[str], name: str, required: bool = False
937) -> str | None:
938    """Given a list of args and an arg name, returns a value.
939
940    The arg flag and value are removed from the arg list.
941    Raises CleanError on any problems.
942    """
943    from efro.error import CleanError
944
945    count = args.count(name)
946    if not count:
947        if required:
948            raise CleanError(f'Required argument {name} not passed.')
949        return None
950
951    if count > 1:
952        raise CleanError(f'Arg {name} passed multiple times.')
953
954    argindex = args.index(name)
955    if argindex + 1 >= len(args):
956        raise CleanError(f'No value passed after {name} arg.')
957
958    val = args[argindex + 1]
959    del args[argindex : argindex + 2]
960
961    return val
def explicit_bool(val: bool) -> bool:
37def explicit_bool(val: bool) -> bool:
38    """Return a non-inferable boolean value.
39
40    Useful to be able to disable blocks of code without type checkers
41    complaining/etc.
42    """
43    # pylint: disable=no-else-return
44    if TYPE_CHECKING:
45        # infer this! <boom>
46        import random
47
48        return random.random() < 0.5
49    else:
50        return val

Return a non-inferable boolean value.

Useful to be able to disable blocks of code without type checkers complaining/etc.
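
For instance (a hypothetical snippet, not part of the module), wrapping a constant in explicit_bool() lets you switch off a code path without mypy flagging the branch as unreachable:

    from efro.util import explicit_bool

    if explicit_bool(False):
        # Temporarily disabled path; the type checker can't prove this
        # branch is unreachable, so it stays quiet about it.
        print('legacy codepath')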

def snake_case_to_title(val: str) -> str:
53def snake_case_to_title(val: str) -> str:
54    """Given a snake-case string 'foo_bar', returns 'Foo Bar'."""
55    # Kill empty words resulting from leading/trailing/multiple underscores.
56    return ' '.join(w for w in val.split('_') if w).title()

Given a snake-case string 'foo_bar', returns 'Foo Bar'.

def snake_case_to_camel_case(val: str) -> str:
59def snake_case_to_camel_case(val: str) -> str:
60    """Given a snake-case string 'foo_bar', returns camel-case 'FooBar'."""
61    # Replace underscores with spaces; capitalize words; kill spaces.
62    # Not sure about efficiency, but logically simple.
63    return val.replace('_', ' ').title().replace(' ', '')

Given a snake-case string 'foo_bar', returns camel-case 'FooBar'.

def enum_by_value(cls: type[~EnumT], value: Any) -> ~EnumT:
66def enum_by_value(cls: type[EnumT], value: Any) -> EnumT:
67    """Create an enum from a value.
68
69    This is basically the same as doing 'obj = EnumType(value)' except
70    that it works around an issue where a reference loop is created
71    if an exception is thrown due to an invalid value. Since we disable
72    the cyclic garbage collector for most of the time, such loops can lead
73    to our objects sticking around longer than we want.
74    This issue has been submitted to Python as a bug so hopefully we can
75    remove this eventually if it gets fixed: https://bugs.python.org/issue42248
76    UPDATE: This has been fixed as of later 3.8 builds, so we can kill this
77    off once we are 3.9+ across the board.
78    """
79
80    # Note: we don't recreate *ALL* the functionality of the Enum constructor
81    # such as the _missing_ hook; but this should cover our basic needs.
82    value2member_map = getattr(cls, '_value2member_map_')
83    assert value2member_map is not None
84    try:
85        out = value2member_map[value]
86        assert isinstance(out, cls)
87        return out
88    except KeyError:
89        # pylint: disable=consider-using-f-string
90        raise ValueError(
91            '%r is not a valid %s' % (value, cls.__name__)
92        ) from None

Create an enum from a value.

This is basically the same as doing 'obj = EnumType(value)' except that it works around an issue where a reference loop is created if an exception is thrown due to an invalid value. Since we disable the cyclic garbage collector for most of the time, such loops can lead to our objects sticking around longer than we want. This issue has been submitted to Python as a bug so hopefully we can remove this eventually if it gets fixed: https://bugs.python.org/issue42248 UPDATE: This has been fixed as of later 3.8 builds, so we can kill this off once we are 3.9+ across the board.
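
A small usage sketch (the Color enum here is made up for illustration):

    from enum import Enum
    from efro.util import enum_by_value

    class Color(Enum):
        RED = 'red'
        GREEN = 'green'

    assert enum_by_value(Color, 'red') is Color.RED

    # Invalid values raise ValueError (as Color('purple') would) but
    # without creating a reference loop in the process.
    try:
        enum_by_value(Color, 'purple')
    except ValueError:
        pass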

def check_utc(value: datetime.datetime) -> None:
 95def check_utc(value: datetime.datetime) -> None:
 96    """Ensure a datetime value is timezone-aware utc."""
 97    if value.tzinfo is not datetime.UTC:
 98        raise ValueError(
 99            'datetime value does not have timezone set as datetime.UTC'
100        )

Ensure a datetime value is timezone-aware utc.

def utc_now() -> datetime.datetime:
103def utc_now() -> datetime.datetime:
104    """Get timezone-aware current utc time.
105
106    Just a shortcut for datetime.datetime.now(datetime.UTC).
107    Avoid datetime.datetime.utcnow() which is deprecated and gives naive
108    times.
109    """
110    return datetime.datetime.now(datetime.UTC)

Get timezone-aware current utc time.

Just a shortcut for datetime.datetime.now(datetime.UTC). Avoid datetime.datetime.utcnow() which is deprecated and gives naive times.

def utc_now_naive() -> datetime.datetime:
113def utc_now_naive() -> datetime.datetime:
114    """Get naive utc time.
115
116    This can be used to replace datetime.utcnow(), which is now deprecated.
117    Most all code should migrate to use timezone-aware times instead of
118    this.
119    """
120    return datetime.datetime.now(datetime.UTC).replace(tzinfo=None)

Get naive utc time.

This can be used to replace datetime.utcnow(), which is now deprecated. Most all code should migrate to use timezone-aware times instead of this.

def utc_today() -> datetime.datetime:
123def utc_today() -> datetime.datetime:
124    """Get offset-aware midnight in the utc time zone."""
125    now = datetime.datetime.now(datetime.UTC)
126    return datetime.datetime(
127        year=now.year, month=now.month, day=now.day, tzinfo=now.tzinfo
128    )

Get offset-aware midnight in the utc time zone.

def utc_this_hour() -> datetime.datetime:
131def utc_this_hour() -> datetime.datetime:
132    """Get offset-aware beginning of the current hour in the utc time zone."""
133    now = datetime.datetime.now(datetime.UTC)
134    return datetime.datetime(
135        year=now.year,
136        month=now.month,
137        day=now.day,
138        hour=now.hour,
139        tzinfo=now.tzinfo,
140    )

Get offset-aware beginning of the current hour in the utc time zone.

def utc_this_minute() -> datetime.datetime:
143def utc_this_minute() -> datetime.datetime:
144    """Get offset-aware beginning of current minute in the utc time zone."""
145    now = datetime.datetime.now(datetime.UTC)
146    return datetime.datetime(
147        year=now.year,
148        month=now.month,
149        day=now.day,
150        hour=now.hour,
151        minute=now.minute,
152        tzinfo=now.tzinfo,
153    )

Get offset-aware beginning of current minute in the utc time zone.
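
As a rough illustration of these truncation helpers (the printed values are hypothetical and depend on when the code runs):

    from efro.util import utc_now, utc_today, utc_this_hour, utc_this_minute

    print(utc_now())          # e.g. 2025-01-02 13:45:27.123456+00:00
    print(utc_today())        # ->   2025-01-02 00:00:00+00:00
    print(utc_this_hour())    # ->   2025-01-02 13:00:00+00:00
    print(utc_this_minute())  # ->   2025-01-02 13:45:00+00:00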

def empty_weakref(objtype: type[~T]) -> weakref.ReferenceType[~T]:
156def empty_weakref(objtype: type[T]) -> weakref.ref[T]:
157    """Return an invalidated weak-reference for the specified type."""
158    # At runtime, all weakrefs are the same; our type arg is just
159    # for the static type checker.
160    del objtype  # Unused.
161
162    # Just create an object and let it die. Is there a cleaner way to do this?
163    # return weakref.ref(_EmptyObj())  # type: ignore
164
165    # Sharing a single one seems at least a bit better.
166    return _g_empty_weak_ref  # type: ignore

Return an invalidated weak-reference for the specified type.
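
One hedged sketch of the intended pattern: using a dead-but-typed weakref as a placeholder so an attribute never needs to hold None (the Node and Tracker classes are hypothetical):

    import weakref

    from efro.util import empty_weakref

    class Node:
        pass

    class Tracker:
        def __init__(self) -> None:
            # Starts out referencing nothing, but with a concrete type.
            self._target: weakref.ref[Node] = empty_weakref(Node)

        def set_target(self, node: Node) -> None:
            self._target = weakref.ref(node)

        def target(self) -> Node | None:
            # A dead weakref simply returns None when called.
            return self._target()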

def data_size_str(bytecount: int, compact: bool = False) -> str:
169def data_size_str(bytecount: int, compact: bool = False) -> str:
170    """Given a size in bytes, returns a short human readable string.
171
172    In compact mode this should be 6 or fewer chars for most all
173    sane file sizes.
174    """
175    # pylint: disable=too-many-return-statements
176
177    # Special case: handle negatives.
178    if bytecount < 0:
179        val = data_size_str(-bytecount, compact=compact)
180        return f'-{val}'
181
182    if bytecount <= 999:
183        suffix = 'B' if compact else 'bytes'
184        return f'{bytecount} {suffix}'
185    kbytecount = bytecount / 1024
186    if round(kbytecount, 1) < 10.0:
187        return f'{kbytecount:.1f} KB'
188    if round(kbytecount, 0) < 999:
189        return f'{kbytecount:.0f} KB'
190    mbytecount = bytecount / (1024 * 1024)
191    if round(mbytecount, 1) < 10.0:
192        return f'{mbytecount:.1f} MB'
193    if round(mbytecount, 0) < 999:
194        return f'{mbytecount:.0f} MB'
195    gbytecount = bytecount / (1024 * 1024 * 1024)
196    if round(gbytecount, 1) < 10.0:
197        return f'{gbytecount:.1f} GB'
198    return f'{gbytecount:.0f} GB'

Given a size in bytes, returns a short human readable string.

In compact mode this should be 6 or fewer chars for most all sane file sizes.
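
A few illustrative calls (outputs follow from the thresholds in the source above):

    from efro.util import data_size_str

    print(data_size_str(500))                # '500 bytes'
    print(data_size_str(500, compact=True))  # '500 B'
    print(data_size_str(250_000))            # '244 KB'
    print(data_size_str(3_600_000_000))      # '3.4 GB'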

class DirtyBit:
201class DirtyBit:
202    """Manages whether a thing is dirty and regulates attempts to clean it.
203
204    To use, simply set the 'dirty' value on this object to True when some
205    action is needed, and then check the 'should_update' value to regulate
206    when attempts to clean it should be made. Set 'dirty' back to False after
207    a successful update.
208    If 'use_lock' is True, an asyncio Lock will be created and incorporated
209    into update attempts to prevent simultaneous updates (should_update will
210    only return True when the lock is unlocked). Note that it is up to the user
211    to lock/unlock the lock during the actual update attempt.
212    If a value is passed for 'auto_dirty_seconds', the dirtybit will flip
213    itself back to dirty after being clean for the given amount of time.
214    'min_update_interval' can be used to enforce a minimum update
215    interval even when updates are successful (retry_interval only applies
216    when updates fail).
217    """
218
219    def __init__(
220        self,
221        dirty: bool = False,
222        retry_interval: float = 5.0,
223        use_lock: bool = False,
224        auto_dirty_seconds: float | None = None,
225        min_update_interval: float | None = None,
226    ):
227        curtime = time.monotonic()
228        self._retry_interval = retry_interval
229        self._auto_dirty_seconds = auto_dirty_seconds
230        self._min_update_interval = min_update_interval
231        self._dirty = dirty
232        self._next_update_time: float | None = curtime if dirty else None
233        self._last_update_time: float | None = None
234        self._next_auto_dirty_time: float | None = (
235            (curtime + self._auto_dirty_seconds)
236            if (not dirty and self._auto_dirty_seconds is not None)
237            else None
238        )
239        self._use_lock = use_lock
240        self.lock: asyncio.Lock
241        if self._use_lock:
242            import asyncio
243
244            self.lock = asyncio.Lock()
245
246    @property
247    def dirty(self) -> bool:
248        """Whether the target is currently dirty.
249
250        This should be set to False once an update is successful.
251        """
252        return self._dirty
253
254    @dirty.setter
255    def dirty(self, value: bool) -> None:
256        # If we're freshly clean, set our next auto-dirty time (if we have
257        # one).
258        if self._dirty and not value and self._auto_dirty_seconds is not None:
259            self._next_auto_dirty_time = (
260                time.monotonic() + self._auto_dirty_seconds
261            )
262
263        # If we're freshly dirty, schedule an immediate update.
264        if not self._dirty and value:
265            self._next_update_time = time.monotonic()
266
267            # If they want to enforce a minimum update interval,
268            # push out the next update time if it hasn't been long enough.
269            if (
270                self._min_update_interval is not None
271                and self._last_update_time is not None
272            ):
273                self._next_update_time = max(
274                    self._next_update_time,
275                    self._last_update_time + self._min_update_interval,
276                )
277
278        self._dirty = value
279
280    @property
281    def should_update(self) -> bool:
282        """Whether an attempt should be made to clean the target now.
283
284        Always returns False if the target is not dirty.
285        Takes into account the amount of time passed since the target
286        was marked dirty or since should_update last returned True.
287        """
288        curtime = time.monotonic()
289
290        # Auto-dirty ourself if we're into that.
291        if (
292            self._next_auto_dirty_time is not None
293            and curtime > self._next_auto_dirty_time
294        ):
295            self.dirty = True
296            self._next_auto_dirty_time = None
297        if not self._dirty:
298            return False
299        if self._use_lock and self.lock.locked():
300            return False
301        assert self._next_update_time is not None
302        if curtime > self._next_update_time:
303            self._next_update_time = curtime + self._retry_interval
304            self._last_update_time = curtime
305            return True
306        return False

Manages whether a thing is dirty and regulates attempts to clean it.

To use, simply set the 'dirty' value on this object to True when some action is needed, and then check the 'should_update' value to regulate when attempts to clean it should be made. Set 'dirty' back to False after a successful update. If 'use_lock' is True, an asyncio Lock will be created and incorporated into update attempts to prevent simultaneous updates (should_update will only return True when the lock is unlocked). Note that it is up to the user to lock/unlock the lock during the actual update attempt. If a value is passed for 'auto_dirty_seconds', the dirtybit will flip itself back to dirty after being clean for the given amount of time. 'min_update_interval' can be used to enforce a minimum update interval even when updates are successful (retry_interval only applies when updates fail).
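
A minimal polling-loop sketch of the intended usage; the push_state() helper and the timing values here are hypothetical:

    import time

    from efro.util import DirtyBit

    def push_state() -> None:
        """Stand-in for whatever real update work needs to happen."""
        print('pushed state at', time.monotonic())

    state_dirty = DirtyBit(dirty=True, retry_interval=5.0)

    for _ in range(100):  # Stand-in for an app's main loop.
        if state_dirty.should_update:
            try:
                push_state()
                # Mark clean only after a successful update.
                state_dirty.dirty = False
            except Exception:
                # Stay dirty; should_update flips back to True once
                # retry_interval has elapsed.
                pass
        time.sleep(0.1)

    # Elsewhere, whenever the underlying data changes again:
    state_dirty.dirty = True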

DirtyBit( dirty: bool = False, retry_interval: float = 5.0, use_lock: bool = False, auto_dirty_seconds: float | None = None, min_update_interval: float | None = None)
219    def __init__(
220        self,
221        dirty: bool = False,
222        retry_interval: float = 5.0,
223        use_lock: bool = False,
224        auto_dirty_seconds: float | None = None,
225        min_update_interval: float | None = None,
226    ):
227        curtime = time.monotonic()
228        self._retry_interval = retry_interval
229        self._auto_dirty_seconds = auto_dirty_seconds
230        self._min_update_interval = min_update_interval
231        self._dirty = dirty
232        self._next_update_time: float | None = curtime if dirty else None
233        self._last_update_time: float | None = None
234        self._next_auto_dirty_time: float | None = (
235            (curtime + self._auto_dirty_seconds)
236            if (not dirty and self._auto_dirty_seconds is not None)
237            else None
238        )
239        self._use_lock = use_lock
240        self.lock: asyncio.Lock
241        if self._use_lock:
242            import asyncio
243
244            self.lock = asyncio.Lock()
lock: asyncio.locks.Lock
dirty: bool
246    @property
247    def dirty(self) -> bool:
248        """Whether the target is currently dirty.
249
250        This should be set to False once an update is successful.
251        """
252        return self._dirty

Whether the target is currently dirty.

This should be set to False once an update is successful.

should_update: bool
280    @property
281    def should_update(self) -> bool:
282        """Whether an attempt should be made to clean the target now.
283
284        Always returns False if the target is not dirty.
285        Takes into account the amount of time passed since the target
286        was marked dirty or since should_update last returned True.
287        """
288        curtime = time.monotonic()
289
290        # Auto-dirty ourself if we're into that.
291        if (
292            self._next_auto_dirty_time is not None
293            and curtime > self._next_auto_dirty_time
294        ):
295            self.dirty = True
296            self._next_auto_dirty_time = None
297        if not self._dirty:
298            return False
299        if self._use_lock and self.lock.locked():
300            return False
301        assert self._next_update_time is not None
302        if curtime > self._next_update_time:
303            self._next_update_time = curtime + self._retry_interval
304            self._last_update_time = curtime
305            return True
306        return False

Whether an attempt should be made to clean the target now.

Always returns False if the target is not dirty. Takes into account the amount of time passed since the target was marked dirty or since should_update last returned True.

class DispatchMethodWrapper(typing.Generic[~ArgT, ~RetT]):
309class DispatchMethodWrapper(Generic[ArgT, RetT]):
310    """Type-aware standin for the dispatch func returned by dispatchmethod."""
311
312    def __call__(self, arg: ArgT) -> RetT:
313        raise RuntimeError('Should not get here')
314
315    @staticmethod
316    def register(
317        func: Callable[[Any, Any], RetT]
318    ) -> Callable[[Any, Any], RetT]:
319        """Register a new dispatch handler for this dispatch-method."""
320        raise RuntimeError('Should not get here')
321
322    registry: dict[Any, Callable]

Type-aware stand-in for the dispatch func returned by dispatchmethod.

@staticmethod
def register(func: Callable[[Any, Any], ~RetT]) -> Callable[[Any, Any], ~RetT]:
315    @staticmethod
316    def register(
317        func: Callable[[Any, Any], RetT]
318    ) -> Callable[[Any, Any], RetT]:
319        """Register a new dispatch handler for this dispatch-method."""
320        raise RuntimeError('Should not get here')

Register a new dispatch handler for this dispatch-method.

registry: dict[typing.Any, typing.Callable]
def dispatchmethod( func: Callable[[Any, ~ArgT], ~RetT]) -> DispatchMethodWrapper[~ArgT, ~RetT]:
326def dispatchmethod(
327    func: Callable[[Any, ArgT], RetT]
328) -> DispatchMethodWrapper[ArgT, RetT]:
329    """A variation of functools.singledispatch for methods.
330
331    Note: as of Python 3.9 there is now functools.singledispatchmethod,
332    but it currently (as of Jan 2021) is not type-aware (at least in mypy),
333    which gives us a reason to keep this one around for now.
334    """
335    from functools import singledispatch, update_wrapper
336
337    origwrapper: Any = singledispatch(func)
338
339    # Pull this out so hopefully origwrapper can die,
340    # otherwise we reference origwrapper in our wrapper.
341    dispatch = origwrapper.dispatch
342
343    # All we do here is recreate the end of functools.singledispatch
344    # where it returns a wrapper except instead of the wrapper using the
345    # first arg to the function ours uses the second (to skip 'self').
346    # This was made against Python 3.7; we should probably check up on
347    # this in later versions in case anything has changed.
348    # (or hopefully they'll add this functionality to their version)
349    # NOTE: sounds like we can use functools singledispatchmethod in 3.8
350    def wrapper(*args: Any, **kw: Any) -> Any:
351        if not args or len(args) < 2:
352            raise TypeError(
353                f'{funcname} requires at least ' '2 positional arguments'
354            )
355
356        return dispatch(args[1].__class__)(*args, **kw)
357
358    funcname = getattr(func, '__name__', 'dispatchmethod method')
359    wrapper.register = origwrapper.register  # type: ignore
360    wrapper.dispatch = dispatch  # type: ignore
361    wrapper.registry = origwrapper.registry  # type: ignore
362    # pylint: disable=protected-access
363    wrapper._clear_cache = origwrapper._clear_cache  # type: ignore
364    update_wrapper(wrapper, func)
365    # pylint: enable=protected-access
366    return cast(DispatchMethodWrapper, wrapper)

A variation of functools.singledispatch for methods.

Note: as of Python 3.9 there is now functools.singledispatchmethod, but it currently (as of Jan 2021) is not type-aware (at least in mypy), which gives us a reason to keep this one around for now.
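
A usage sketch; the Renderer class and its handlers are made up for illustration:

    from efro.util import dispatchmethod

    class Renderer:
        @dispatchmethod
        def render(self, value) -> str:
            # Fallback for types with no registered handler.
            return f'<{value!r}>'

        @render.register
        def _render_int(self, value: int) -> str:
            return f'int:{value}'

        @render.register
        def _render_str(self, value: str) -> str:
            return f'str:{value}'

    r = Renderer()
    assert r.render(3) == 'int:3'
    assert r.render('hi') == 'str:hi'
    assert r.render(1.5) == '<1.5>'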

def valuedispatch( call: Callable[[~ValT], ~RetT]) -> ValueDispatcher[~ValT, ~RetT]:
369def valuedispatch(call: Callable[[ValT], RetT]) -> ValueDispatcher[ValT, RetT]:
370    """Decorator for functions to allow dispatching based on a value.
371
372    This differs from functools.singledispatch in that it dispatches based
373    on the value of an argument, not based on its type.
374    The 'register' method of a value-dispatch function can be used
375    to assign new functions to handle particular values.
376    Unhandled values wind up in the original dispatch function."""
377    return ValueDispatcher(call)

Decorator for functions to allow dispatching based on a value.

This differs from functools.singledispatch in that it dispatches based on the value of an argument, not based on its type. The 'register' method of a value-dispatch function can be used to assign new functions to handle particular values. Unhandled values wind up in the original dispatch function.
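
A sketch of value-based dispatch, keyed here on a hypothetical enum; note that registered handlers take no arguments:

    from enum import Enum

    from efro.util import valuedispatch

    class Mode(Enum):
        IDLE = 0
        RUN = 1

    @valuedispatch
    def describe(mode: Mode) -> str:
        return 'unhandled mode'  # Fallback for unregistered values.

    @describe.register(Mode.RUN)
    def _describe_run() -> str:
        return 'running'

    assert describe(Mode.RUN) == 'running'
    assert describe(Mode.IDLE) == 'unhandled mode'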

class ValueDispatcher(typing.Generic[~ValT, ~RetT]):
380class ValueDispatcher(Generic[ValT, RetT]):
381    """Used by the valuedispatch decorator"""
382
383    def __init__(self, call: Callable[[ValT], RetT]) -> None:
384        self._base_call = call
385        self._handlers: dict[ValT, Callable[[], RetT]] = {}
386
387    def __call__(self, value: ValT) -> RetT:
388        handler = self._handlers.get(value)
389        if handler is not None:
390            return handler()
391        return self._base_call(value)
392
393    def _add_handler(
394        self, value: ValT, call: Callable[[], RetT]
395    ) -> Callable[[], RetT]:
396        if value in self._handlers:
397            raise RuntimeError(f'Duplicate handlers added for {value}')
398        self._handlers[value] = call
399        return call
400
401    def register(
402        self, value: ValT
403    ) -> Callable[[Callable[[], RetT]], Callable[[], RetT]]:
404        """Add a handler to the dispatcher."""
405        from functools import partial
406
407        return partial(self._add_handler, value)

Used by the valuedispatch decorator

ValueDispatcher(call: Callable[[~ValT], ~RetT])
383    def __init__(self, call: Callable[[ValT], RetT]) -> None:
384        self._base_call = call
385        self._handlers: dict[ValT, Callable[[], RetT]] = {}
def register( self, value: ~ValT) -> Callable[[Callable[[], ~RetT]], Callable[[], ~RetT]]:
401    def register(
402        self, value: ValT
403    ) -> Callable[[Callable[[], RetT]], Callable[[], RetT]]:
404        """Add a handler to the dispatcher."""
405        from functools import partial
406
407        return partial(self._add_handler, value)

Add a handler to the dispatcher.

def valuedispatch1arg( call: Callable[[~ValT, ~ArgT], ~RetT]) -> ValueDispatcher1Arg[~ValT, ~ArgT, ~RetT]:
410def valuedispatch1arg(
411    call: Callable[[ValT, ArgT], RetT]
412) -> ValueDispatcher1Arg[ValT, ArgT, RetT]:
413    """Like valuedispatch but for functions taking an extra argument."""
414    return ValueDispatcher1Arg(call)

Like valuedispatch but for functions taking an extra argument.
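
A similar sketch for the one-extra-argument flavor (apply_op and its op names are hypothetical); handlers receive the extra argument but not the dispatch value itself:

from efro.util import valuedispatch1arg

@valuedispatch1arg
def apply_op(op: str, x: float) -> float:
    # Fallback for unregistered op names.
    raise ValueError(f'unknown op: {op!r}')

@apply_op.register('double')
def _double(x: float) -> float:
    return x * 2.0

@apply_op.register('negate')
def _negate(x: float) -> float:
    return -x

assert apply_op('double', 3.0) == 6.0
assert apply_op('negate', 3.0) == -3.0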

class ValueDispatcher1Arg(typing.Generic[~ValT, ~ArgT, ~RetT]):
417class ValueDispatcher1Arg(Generic[ValT, ArgT, RetT]):
418    """Used by the valuedispatch1arg decorator"""
419
420    def __init__(self, call: Callable[[ValT, ArgT], RetT]) -> None:
421        self._base_call = call
422        self._handlers: dict[ValT, Callable[[ArgT], RetT]] = {}
423
424    def __call__(self, value: ValT, arg: ArgT) -> RetT:
425        handler = self._handlers.get(value)
426        if handler is not None:
427            return handler(arg)
428        return self._base_call(value, arg)
429
430    def _add_handler(
431        self, value: ValT, call: Callable[[ArgT], RetT]
432    ) -> Callable[[ArgT], RetT]:
433        if value in self._handlers:
434            raise RuntimeError(f'Duplicate handlers added for {value}')
435        self._handlers[value] = call
436        return call
437
438    def register(
439        self, value: ValT
440    ) -> Callable[[Callable[[ArgT], RetT]], Callable[[ArgT], RetT]]:
441        """Add a handler to the dispatcher."""
442        from functools import partial
443
444        return partial(self._add_handler, value)

Used by the valuedispatch1arg decorator

ValueDispatcher1Arg(call: Callable[[~ValT, ~ArgT], ~RetT])
420    def __init__(self, call: Callable[[ValT, ArgT], RetT]) -> None:
421        self._base_call = call
422        self._handlers: dict[ValT, Callable[[ArgT], RetT]] = {}
def register( self, value: ~ValT) -> Callable[[Callable[[~ArgT], ~RetT]], Callable[[~ArgT], ~RetT]]:
438    def register(
439        self, value: ValT
440    ) -> Callable[[Callable[[ArgT], RetT]], Callable[[ArgT], RetT]]:
441        """Add a handler to the dispatcher."""
442        from functools import partial
443
444        return partial(self._add_handler, value)

Add a handler to the dispatcher.

def valuedispatchmethod( call: Callable[[~SelfT, ~ValT], ~RetT]) -> efro.util.ValueDispatcherMethod[~ValT, ~RetT]:
461def valuedispatchmethod(
462    call: Callable[[SelfT, ValT], RetT]
463) -> ValueDispatcherMethod[ValT, RetT]:
464    """Like valuedispatch but works with methods instead of functions."""
465
466    # NOTE: It seems that to wrap a method with a decorator and have self
467    # dispatching do the right thing, we must return a function and not
468    # an executable object. So for this version we store our data here
469    # in the function call dict and simply return a call.
470
471    _base_call = call
472    _handlers: dict[ValT, Callable[[SelfT], RetT]] = {}
473
474    def _add_handler(value: ValT, addcall: Callable[[SelfT], RetT]) -> None:
475        if value in _handlers:
476            raise RuntimeError(f'Duplicate handlers added for {value}')
477        _handlers[value] = addcall
478
479    def _register(value: ValT) -> Callable[[Callable[[SelfT], RetT]], None]:
480        from functools import partial
481
482        return partial(_add_handler, value)
483
484    def _call_wrapper(self: SelfT, value: ValT) -> RetT:
485        handler = _handlers.get(value)
486        if handler is not None:
487            return handler(self)
488        return _base_call(self, value)
489
490    # We still want to use our returned object to register handlers, but we're
491    # actually just returning a function. So manually stuff the call onto it.
492    setattr(_call_wrapper, 'register', _register)
493
494    # To the type checker's eyes we return a ValueDispatchMethod instance;
495    # this lets it know about our register func and type-check its usage.
496    # In reality we just return a raw function call (for reasons listed above).
497    # pylint: disable=undefined-variable, no-else-return
498    if TYPE_CHECKING:
499        return ValueDispatcherMethod[ValT, RetT]()
500    else:
501        return _call_wrapper

Like valuedispatch but works with methods instead of functions.
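
A rough sketch of the intended usage (the Door class and its states are invented for this example):

from efro.util import valuedispatchmethod

class Door:
    @valuedispatchmethod
    def handle_state(self, state: str) -> str:
        # Fallback for states with no registered handler.
        return f'unhandled state: {state!r}'

    # Note: register() stores the handler internally and returns None, so
    # the _handle_* names below end up as None on the class; the handlers
    # are only reachable through handle_state().
    @handle_state.register('open')
    def _handle_open(self) -> str:
        return 'closing the door'

    @handle_state.register('locked')
    def _handle_locked(self) -> str:
        return 'looking for the key'

door = Door()
assert door.handle_state('open') == 'closing the door'
assert door.handle_state('ajar') == "unhandled state: 'ajar'"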

def make_hash(obj: Any) -> int:
504def make_hash(obj: Any) -> int:
505    """Makes a hash from a dictionary, list, tuple or set to any level,
506    that contains only other hashable types (including any lists, tuples,
507    sets, and dictionaries).
508
509    Note that this uses Python's hash() function internally so collisions/etc.
510    may be more common than with fancy cryptographic hashes.
511
512    Also be aware that Python's hash() output varies across processes, so
513    this should only be used for values that will remain in a single process.
514    """
515    import copy
516
517    if isinstance(obj, (set, tuple, list)):
518        return hash(tuple(make_hash(e) for e in obj))
519    if not isinstance(obj, dict):
520        return hash(obj)
521
522    new_obj = copy.deepcopy(obj)
523    for k, v in new_obj.items():
524        new_obj[k] = make_hash(v)
525
526    # NOTE: sorted() works correctly here because it compares only
527    # unique first values (i.e. dict keys).
528    return hash(tuple(frozenset(sorted(new_obj.items()))))

Makes a hash from a dictionary, list, tuple or set to any level, that contains only other hashable types (including any lists, tuples, sets, and dictionaries).

Note that this uses Python's hash() function internally so collisions/etc. may be more common than with fancy cryptographic hashes.

Also be aware that Python's hash() output varies across processes, so this should only be used for values that will remain in a single process.
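
A quick illustration with arbitrary config dicts; since items are sorted, dict insertion order does not affect the result, though the value is only stable within a single process:

from efro.util import make_hash

config_a = {'name': 'foo', 'tags': ['a', 'b'], 'opts': {'x': 1}}
config_b = {'opts': {'x': 1}, 'tags': ['a', 'b'], 'name': 'foo'}

# Same nested contents -> same hash, regardless of dict insertion order.
assert make_hash(config_a) == make_hash(config_b)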

def float_hash_from_string(s: str) -> float:
531def float_hash_from_string(s: str) -> float:
532    """Given a string value, returns a float between 0 and 1.
533
534    Is consistent across processes. Can be useful for assigning shard
535    values to db ids for efficient parallel processing.
536    """
537    import hashlib
538
539    hash_bytes = hashlib.md5(s.encode()).digest()
540
541    # Generate a random 64 bit int from hash digest bytes.
542    ival = int.from_bytes(hash_bytes[:8])
543    return ival / ((1 << 64) - 1)

Given a string value, returns a float between 0 and 1.

Is consistent across processes. Can be useful for assigning shard values to db ids for efficient parallel processing.
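
For instance (the id string and the 10% threshold are arbitrary), the value can be used to deterministically bucket ids:

from efro.util import float_hash_from_string

val = float_hash_from_string('user-12345')
assert 0.0 <= val <= 1.0

# Deterministically route roughly 10% of ids to a bucket, stable across runs.
in_sample = val < 0.10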

def asserttype(obj: Any, typ: type[~T]) -> ~T:
546def asserttype(obj: Any, typ: type[T]) -> T:
547    """Return an object typed as a given type.
548
549    Assert is used to check its actual type, so only use this when
550    failures are not expected. Otherwise use checktype.
551    """
552    assert isinstance(typ, type), 'only actual types accepted'
553    assert isinstance(obj, typ)
554    return obj

Return an object typed as a given type.

Assert is used to check its actual type, so only use this when failures are not expected. Otherwise use checktype.

def asserttype_o(obj: Any, typ: type[~T]) -> Optional[~T]:
557def asserttype_o(obj: Any, typ: type[T]) -> T | None:
558    """Return an object typed as a given optional type.
559
560    Assert is used to check its actual type, so only use this when
561    failures are not expected. Otherwise use checktype.
562    """
563    assert isinstance(typ, type), 'only actual types accepted'
564    assert isinstance(obj, (typ, type(None)))
565    return obj

Return an object typed as a given optional type.

Assert is used to check its actual type, so only use this when failures are not expected. Otherwise use checktype.

def checktype(obj: Any, typ: type[~T]) -> ~T:
568def checktype(obj: Any, typ: type[T]) -> T:
569    """Return an object typed as a given type.
570
571    Always checks the type at runtime with isinstance and throws a TypeError
572    on failure. Use asserttype for more efficient (but less safe) equivalent.
573    """
574    assert isinstance(typ, type), 'only actual types accepted'
575    if not isinstance(obj, typ):
576        raise TypeError(f'Expected a {typ}; got a {type(obj)}.')
577    return obj

Return an object typed as a given type.

Always checks the type at runtime with isinstance and throws a TypeError on failure. Use asserttype for more efficient (but less safe) equivalent.
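
A small sketch of when to reach for each (load_port and load_user_port are hypothetical helpers):

from typing import Any
from efro.util import asserttype, checktype

def load_port(config: dict[str, Any]) -> int:
    # Value we fully control; a wrong type here is a programming error.
    return asserttype(config['port'], int)

def load_user_port(raw: Any) -> int:
    # Untrusted input; get a clean TypeError instead of an assert.
    return checktype(raw, int)

assert load_port({'port': 8080}) == 8080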

def checktype_o(obj: Any, typ: type[~T]) -> Optional[~T]:
580def checktype_o(obj: Any, typ: type[T]) -> T | None:
581    """Return an object typed as a given optional type.
582
583    Always checks the type at runtime with isinstance and throws a TypeError
584    on failure. Use asserttype for more efficient (but less safe) equivalent.
585    """
586    assert isinstance(typ, type), 'only actual types accepted'
587    if not isinstance(obj, (typ, type(None))):
588        raise TypeError(f'Expected a {typ} or None; got a {type(obj)}.')
589    return obj

Return an object typed as a given optional type.

Always checks the type at runtime with isinstance and throws a TypeError on failure. Use asserttype for more efficient (but less safe) equivalent.

def warntype(obj: Any, typ: type[~T]) -> ~T:
592def warntype(obj: Any, typ: type[T]) -> T:
593    """Return an object typed as a given type.
594
595    Always checks the type at runtime and simply logs a warning if it is
596    not what is expected.
597    """
598    assert isinstance(typ, type), 'only actual types accepted'
599    if not isinstance(obj, typ):
600        import logging
601
602        logging.warning('warntype: expected a %s, got a %s', typ, type(obj))
603    return obj  # type: ignore

Return an object typed as a given type.

Always checks the type at runtime and simply logs a warning if it is not what is expected.

def warntype_o(obj: Any, typ: type[~T]) -> Optional[~T]:
606def warntype_o(obj: Any, typ: type[T]) -> T | None:
607    """Return an object typed as a given type.
608
609    Always checks the type at runtime and simply logs a warning if it is
610    not what is expected.
611    """
612    assert isinstance(typ, type), 'only actual types accepted'
613    if not isinstance(obj, (typ, type(None))):
614        import logging
615
616        logging.warning(
617            'warntype: expected a %s or None, got a %s', typ, type(obj)
618        )
619    return obj  # type: ignore

Return an object typed as a given type.

Always checks the type at runtime and simply logs a warning if it is not what is expected.

def assert_non_optional(obj: Optional[~T]) -> ~T:
622def assert_non_optional(obj: T | None) -> T:
623    """Return an object with Optional typing removed.
624
625    Assert is used to check its actual type, so only use this when
626    failures are not expected. Use check_non_optional otherwise.
627    """
628    assert obj is not None
629    return obj

Return an object with Optional typing removed.

Assert is used to check its actual type, so only use this when failures are not expected. Use check_non_optional otherwise.

def check_non_optional(obj: Optional[~T]) -> ~T:
632def check_non_optional(obj: T | None) -> T:
633    """Return an object with Optional typing removed.
634
635    Always checks the actual value and raises a ValueError if it is None.
636    Use assert_non_optional for a more efficient (but less safe) equivalent.
637    """
638    if obj is None:
639        raise ValueError('Got None value in check_non_optional.')
640    return obj

Return an object with Optional typing removed.

Always checks the actual value and raises a ValueError if it is None. Use assert_non_optional for a more efficient (but less safe) equivalent.
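
For example (require_env is a made-up helper), this is handy for turning an Optional into a definite value at a boundary:

import os
from efro.util import check_non_optional

def require_env(name: str) -> str:
    # Raises if the variable is unset; narrows str | None to str for mypy.
    return check_non_optional(os.environ.get(name))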

def smoothstep(edge0: float, edge1: float, x: float) -> float:
643def smoothstep(edge0: float, edge1: float, x: float) -> float:
644    """A smooth transition function.
645
646    Returns a value that smoothly moves from 0 to 1 as we go between edges.
647    Values outside of the range return 0 or 1.
648    """
649    y = min(1.0, max(0.0, (x - edge0) / (edge1 - edge0)))
650    return y * y * (3.0 - 2.0 * y)

A smooth transition function.

Returns a value that smoothly moves from 0 to 1 as we go between edges. Values outside of the range return 0 or 1.

def linearstep(edge0: float, edge1: float, x: float) -> float:
653def linearstep(edge0: float, edge1: float, x: float) -> float:
654    """A linear transition function.
655
656    Returns a value that linearly moves from 0 to 1 as we go between edges.
657    Values outside of the range return 0 or 1.
658    """
659    return max(0.0, min(1.0, (x - edge0) / (edge1 - edge0)))

A linear transition function.

Returns a value that linearly moves from 0 to 1 as we go between edges. Values outside of the range return 0 or 1.
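
A few concrete values, fading over the arbitrary range 2.0 to 5.0:

from efro.util import linearstep, smoothstep

assert linearstep(2.0, 5.0, 1.0) == 0.0   # below the range
assert linearstep(2.0, 5.0, 6.0) == 1.0   # above the range
assert abs(linearstep(2.0, 5.0, 3.5) - 0.5) < 1e-9  # midpoint
assert abs(smoothstep(2.0, 5.0, 3.5) - 0.5) < 1e-9  # same midpoint, eased ends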

def human_readable_compact_id(num: int) -> str:
678def human_readable_compact_id(num: int) -> str:
679    """Given a positive int, return a compact string representation for it.
680
681    Handy for visualizing unique numeric ids using as few as possible chars.
682    This representation uses only lowercase letters and numbers (minus the
683    following letters for readability):
684     's' is excluded due to similarity to '5'.
685     'l' is excluded due to similarity to '1'.
686     'i' is excluded due to similarity to '1'.
687     'o' is excluded due to similarity to '0'.
688     'z' is excluded due to similarity to '2'.
689
690    Therefore for n chars this can store values of 31^n.
691
692    When reading human input consisting of these IDs, it may be desirable
693    to map the disallowed chars to their corresponding allowed ones
694    ('o' -> '0', etc).
695
696    Sort order for these ids is the same as the original numbers.
697
698    If more compactness is desired at the expense of readability,
699    use compact_id() instead.
700    """
701    return _compact_id(num, '0123456789abcdefghjkmnpqrtuvwxy')

Given a positive int, return a compact string representation for it.

Handy for visualizing unique numeric ids using as few as possible chars. This representation uses only lowercase letters and numbers (minus the following letters for readability):
 's' is excluded due to similarity to '5'.
 'l' is excluded due to similarity to '1'.
 'i' is excluded due to similarity to '1'.
 'o' is excluded due to similarity to '0'.
 'z' is excluded due to similarity to '2'.

Therefore for n chars this can store values of 31^n.

When reading human input consisting of these IDs, it may be desirable to map the disallowed chars to their corresponding allowed ones ('o' -> '0', etc).

Sort order for these ids is the same as the original numbers.

If more compactness is desired at the expense of readability, use compact_id() instead.

def compact_id(num: int) -> str:
704def compact_id(num: int) -> str:
705    """Given a positive int, return a compact string representation for it.
706
707    Handy for visualizing unique numeric ids using as few as possible chars.
708    This version is more compact than human_readable_compact_id() but less
709    friendly to humans due to using both capital and lowercase letters,
710    both 'O' and '0', etc.
711
712    Therefore for n chars this can store values of 62^n.
713
714    Sort order for these ids is the same as the original numbers.
715    """
716    return _compact_id(
717        num, '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'
718    )

Given a positive int, return a compact string representation for it.

Handy for visualizing unique numeric ids using as few as possible chars. This version is more compact than human_readable_compact_id() but less friendly to humans due to using both capital and lowercase letters, both 'O' and '0', etc.

Therefore for n chars this can store values of 62^n.

Sort order for these ids is the same as the original numbers.
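
A quick way to compare the two encodings (exact output strings depend on _compact_id and are not shown here):

from efro.util import compact_id, human_readable_compact_id

for num in (1, 42, 12345, 10**12):
    print(num, human_readable_compact_id(num), compact_id(num))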

def caller_source_location() -> str:
721def caller_source_location() -> str:
722    """Returns source file name and line of the code calling us.
723
724    Example: 'mymodule.py:23'
725    """
726    try:
727        import inspect
728
729        frame = inspect.currentframe()
730        for _i in range(2):
731            if frame is None:
732                raise RuntimeError()
733            frame = frame.f_back
734        if frame is None:
735            raise RuntimeError()
736        fname = os.path.basename(frame.f_code.co_filename)
737        return f'{fname}:{frame.f_lineno}'
738    except Exception:
739        return '<unknown source location>'

Returns source file name and line of the code calling us.

Example: 'mymodule.py:23'
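
Since the implementation walks two frames back, a natural use is inside a small helper, where the reported location becomes the helper's call site (log_debug is a made-up example):

from efro.util import caller_source_location

def log_debug(msg: str) -> None:
    # Tag the message with wherever log_debug() was called from.
    print(f'[{caller_source_location()}] {msg}')

log_debug('starting up')  # e.g. "[myscript.py:9] starting up"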

def unchanging_hostname() -> str:
742def unchanging_hostname() -> str:
743    """Return an unchanging name for the local device.
744
745    Similar to the `hostname` call (or os.uname().nodename in Python)
746    except attempts to give a name that doesn't change depending on
747    network conditions. (A Mac will tend to go from Foo to Foo.local,
748    Foo.lan etc. throughout its various adventures)
749    """
750    import platform
751    import subprocess
752
753    # On Mac, this should give the computer name assigned in System Prefs.
754    if platform.system() == 'Darwin':
755        return (
756            subprocess.run(
757                ['scutil', '--get', 'ComputerName'],
758                check=True,
759                capture_output=True,
760            )
761            .stdout.decode()
762            .strip()
763            .replace(' ', '-')
764        )
765    return os.uname().nodename

Return an unchanging name for the local device.

Similar to the hostname call (or os.uname().nodename in Python) except attempts to give a name that doesn't change depending on network conditions. (A Mac will tend to go from Foo to Foo.local, Foo.lan etc. throughout its various adventures)

def set_canonical_module_names(module_globals: dict[str, typing.Any]) -> None:
768def set_canonical_module_names(module_globals: dict[str, Any]) -> None:
769    """Do the thing."""
770    if os.environ.get('EFRO_SUPPRESS_SET_CANONICAL_MODULE_NAMES') == '1':
771        return
772
773    modulename = module_globals.get('__name__')
774    if not isinstance(modulename, str):
775        raise RuntimeError('Unable to get module name.')
776    assert not modulename.startswith('_')
777    modulename_prefix = f'{modulename}.'
778    modulename_prefix_2 = f'_{modulename}.'
779
780    for name, obj in module_globals.items():
781        if name.startswith('_'):
782            continue
783        existing = getattr(obj, '__module__', None)
784        try:
785            # Override the module ONLY if it lives under us somewhere.
786            # So ourpackage._submodule.Foo becomes ourpackage.Foo
787            # but otherpackage._submodule.Foo remains untouched.
788            if existing is not None and (
789                existing.startswith(modulename_prefix)
790                or existing.startswith(modulename_prefix_2)
791            ):
792                obj.__module__ = modulename
793        except Exception:
794            import logging
795
796            logging.warning(
797                'set_canonical_module_names: unable to change __module__'
798                " from '%s' to '%s' on %s object at '%s'.",
799                existing,
800                modulename,
801                type(obj),
802                name,
803            )

Make a package's objects appear to live at the package level. Public objects whose __module__ lies under the package (or its underscore-prefixed private counterpart) get their __module__ rewritten to the package itself (e.g. ourpackage._submodule.Foo becomes ourpackage.Foo). Setting the EFRO_SUPPRESS_SET_CANONICAL_MODULE_NAMES environment variable to 1 disables this.
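
A sketch of a typical call site, assuming a hypothetical package 'mypkg' that re-exports names from a private submodule in its __init__.py:

# mypkg/__init__.py (hypothetical package)
from efro.util import set_canonical_module_names

from mypkg._internal import Widget, make_widget  # noqa: F401

# Widget.__module__ was 'mypkg._internal'; after this call it reads 'mypkg',
# so docs and reprs point users at the public location.
set_canonical_module_names(globals())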

def timedelta_str( timeval: datetime.timedelta | float, maxparts: int = 2, decimals: int = 0) -> str:
806def timedelta_str(
807    timeval: datetime.timedelta | float, maxparts: int = 2, decimals: int = 0
808) -> str:
809    """Return a simple human readable time string for a length of time.
810
811    Time can be given as a timedelta or a float representing seconds.
812    Example output:
813      "23d 1h 2m 32s" (with maxparts == 4)
814      "23d 1h" (with maxparts == 2)
815      "23d 1.08h" (with maxparts == 2 and decimals == 2)
816
817    Note that this is hard-coded in English and probably not especially
818    performant.
819    """
820    # pylint: disable=too-many-locals
821
822    if isinstance(timeval, float):
823        timevalfin = datetime.timedelta(seconds=timeval)
824    else:
825        timevalfin = timeval
826
827    # Internally we only handle positive values.
828    if timevalfin.total_seconds() < 0:
829        return f'-{timedelta_str(-timeval, maxparts=maxparts, decimals=decimals)}'
830
831    years = timevalfin.days // 365
832    days = timevalfin.days % 365
833    hours = timevalfin.seconds // 3600
834    hour_remainder = timevalfin.seconds % 3600
835    minutes = hour_remainder // 60
836    seconds = hour_remainder % 60
837
838    # Now, if we want decimal places for our last value,
839    # calc fractional parts.
840    if decimals:
841        # Calc totals of each type.
842        t_seconds = timevalfin.total_seconds()
843        t_minutes = t_seconds / 60
844        t_hours = t_minutes / 60
845        t_days = t_hours / 24
846        t_years = t_days / 365
847
848        # Calc fractional parts that exclude all whole values to their left.
849        years_covered = years
850        years_f = t_years - years_covered
851        days_covered = years_covered * 365 + days
852        days_f = t_days - days_covered
853        hours_covered = days_covered * 24 + hours
854        hours_f = t_hours - hours_covered
855        minutes_covered = hours_covered * 60 + minutes
856        minutes_f = t_minutes - minutes_covered
857        seconds_covered = minutes_covered * 60 + seconds
858        seconds_f = t_seconds - seconds_covered
859    else:
860        years_f = days_f = hours_f = minutes_f = seconds_f = 0.0
861
862    parts: list[str] = []
863    for part, part_f, suffix in (
864        (years, years_f, 'y'),
865        (days, days_f, 'd'),
866        (hours, hours_f, 'h'),
867        (minutes, minutes_f, 'm'),
868        (seconds, seconds_f, 's'),
869    ):
870        if part or parts or (not parts and suffix == 's'):
871            # Do decimal version only for the last part.
872            if decimals and (len(parts) >= maxparts - 1 or suffix == 's'):
873                parts.append(f'{part+part_f:.{decimals}f}{suffix}')
874            else:
875                parts.append(f'{part}{suffix}')
876            if len(parts) >= maxparts:
877                break
878    return ' '.join(parts)

Return a simple human readable time string for a length of time.

Time can be given as a timedelta or a float representing seconds. Example output:
  "23d 1h 2m 32s" (with maxparts == 4)
  "23d 1h" (with maxparts == 2)
  "23d 1.08h" (with maxparts == 2 and decimals == 2)

Note that this is hard-coded in English and probably not especially performant.
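
A few concrete calls (values chosen arbitrarily; the expected strings follow from the implementation shown above):

import datetime
from efro.util import timedelta_str

assert timedelta_str(90.0) == '1m 30s'
assert timedelta_str(93784.0, maxparts=4) == '1d 2h 3m 4s'
assert timedelta_str(datetime.timedelta(hours=26), decimals=2) == '1d 2.00h'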

def ago_str( timeval: datetime.datetime, maxparts: int = 1, now: datetime.datetime | None = None, decimals: int = 0) -> str:
881def ago_str(
882    timeval: datetime.datetime,
883    maxparts: int = 1,
884    now: datetime.datetime | None = None,
885    decimals: int = 0,
886) -> str:
887    """Given a datetime, return a clean human readable 'ago' str.
888
889    Note that this is hard-coded in English so should not be used
890    for visible in-game elements; only tools/etc.
891
892    If now is not passed, efro.util.utc_now() is used.
893    """
894    if now is None:
895        now = utc_now()
896    return (
897        timedelta_str(now - timeval, maxparts=maxparts, decimals=decimals)
898        + ' ago'
899    )

Given a datetime, return a clean human readable 'ago' str.

Note that this is hard-coded in English so should not be used for visible in-game elements; only tools/etc.

If now is not passed, utc_now() is used.
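
For example (event_time is fabricated a few hours in the past):

import datetime
from efro.util import ago_str, utc_now

event_time = utc_now() - datetime.timedelta(hours=3, minutes=12)
print(ago_str(event_time))              # '3h ago'
print(ago_str(event_time, maxparts=2))  # '3h 12m ago'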

def split_list(input_list: list[~T], max_length: int) -> list[list[~T]]:
902def split_list(input_list: list[T], max_length: int) -> list[list[T]]:
903    """Split a single list into smaller lists."""
904    return [
905        input_list[i : i + max_length]
906        for i in range(0, len(input_list), max_length)
907    ]

Split a single list into smaller lists.
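
For example:

from efro.util import split_list

assert split_list([1, 2, 3, 4, 5], max_length=2) == [[1, 2], [3, 4], [5]]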

def extract_flag(args: list[str], name: str) -> bool:
910def extract_flag(args: list[str], name: str) -> bool:
911    """Given a list of args and a flag name, returns whether it is present.
912
913    The arg flag, if present, is removed from the arg list.
914    """
915    from efro.error import CleanError
916
917    count = args.count(name)
918    if count > 1:
919        raise CleanError(f'Flag {name} passed multiple times.')
920    if not count:
921        return False
922    args.remove(name)
923    return True

Given a list of args and a flag name, returns whether it is present.

The arg flag, if present, is removed from the arg list.
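
For example, in a small command-line script (the flag name is arbitrary):

import sys
from efro.util import extract_flag

args = sys.argv[1:]
verbose = extract_flag(args, '--verbose')
# '--verbose' (if present) has now been removed from args; the remaining
# args can be handed to whatever parsing comes next.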

def extract_arg(args: list[str], name: str, required: bool = False) -> str | None:
936def extract_arg(
937    args: list[str], name: str, required: bool = False
938) -> str | None:
939    """Given a list of args and an arg name, returns a value.
940
941    The arg flag and value are removed from the arg list.
942    Raises CleanError on any problems.
943    """
944    from efro.error import CleanError
945
946    count = args.count(name)
947    if not count:
948        if required:
949            raise CleanError(f'Required argument {name} not passed.')
950        return None
951
952    if count > 1:
953        raise CleanError(f'Arg {name} passed multiple times.')
954
955    argindex = args.index(name)
956    if argindex + 1 >= len(args):
957        raise CleanError(f'No value passed after {name} arg.')
958
959    val = args[argindex + 1]
960    del args[argindex : argindex + 2]
961
962    return val

Given a list of args and an arg name, returns a value.

The arg flag and value are removed from the arg list. Raises CleanError on any problems.
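
A short example with a made-up arg list:

from efro.util import extract_arg

args = ['build', '--config', 'release', '--verbose']

config = extract_arg(args, '--config')   # 'release'
out = extract_arg(args, '--out')         # None (absent and not required)

assert config == 'release'
assert args == ['build', '--verbose']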