
Small handy bits of functionality.

  1# Released under the MIT License. See LICENSE for details.
  3"""Small handy bits of functionality."""
  5from __future__ import annotations
  7import os
  8import time
  9import weakref
 10import datetime
 11import functools
 12from enum import Enum
 13from typing import TYPE_CHECKING, cast, TypeVar, Generic
 16    import asyncio
 17    from efro.call import Call as Call  # 'as Call' so we re-export.
 18    from typing import Any, Callable
# Generic type variables used in the annotations of the helpers below.
T = TypeVar('T')
ValT = TypeVar('ValT')  # Type of a value being dispatched on.
ArgT = TypeVar('ArgT')  # Type of an argument handed to a dispatched call.
SelfT = TypeVar('SelfT')  # Type of 'self' for wrapped methods.
RetT = TypeVar('RetT')  # Return type of a wrapped callable.
EnumT = TypeVar('EnumT', bound=Enum)  # Any Enum subclass.
class _EmptyObj:
    """Trivial weak-referenceable class.

    A temporary instance of this is used to create the module's shared
    dead weak-reference.
    """

    pass
# A dead weak-ref should be immutable, right? So we can create exactly
# one and return it for all cases that need an empty weak-ref.
# The temporary _EmptyObj instance has no other references, so the ref is
# expected to be dead immediately; the assert below verifies that.
_g_empty_weak_ref = weakref.ref(_EmptyObj())
assert _g_empty_weak_ref() is None
 38# TODO: kill this and just use efro.call.tpartial
 40    Call = Call
 42    Call = functools.partial
 45def explicit_bool(val: bool) -> bool:
 46    """Return a non-inferable boolean value.
 48    Useful to be able to disable blocks of code without type checkers
 49    complaining/etc.
 50    """
 51    # pylint: disable=no-else-return
 52    if TYPE_CHECKING:
 53        # infer this! <boom>
 54        import random
 56        return random.random() < 0.5
 57    else:
 58        return val
 61def snake_case_to_title(val: str) -> str:
 62    """Given a snake-case string 'foo_bar', returns 'Foo Bar'."""
 63    # Kill empty words resulting from leading/trailing/multiple underscores.
 64    return ' '.join(w for w in val.split('_') if w).title()
 67def snake_case_to_camel_case(val: str) -> str:
 68    """Given a snake-case string 'foo_bar', returns camel-case 'FooBar'."""
 69    # Replace underscores with spaces; capitalize words; kill spaces.
 70    # Not sure about efficiency, but logically simple.
 71    return val.replace('_', ' ').title().replace(' ', '')
 74def enum_by_value(cls: type[EnumT], value: Any) -> EnumT:
 75    """Create an enum from a value.
 77    This is basically the same as doing 'obj = EnumType(value)' except
 78    that it works around an issue where a reference loop is created
 79    if an exception is thrown due to an invalid value. Since we disable
 80    the cyclic garbage collector for most of the time, such loops can lead
 81    to our objects sticking around longer than we want.
 82    This issue has been submitted to Python as a bug so hopefully we can
 83    remove this eventually if it gets fixed: https://bugs.python.org/issue42248
 84    UPDATE: This has been fixed as of later 3.8 builds, so we can kill this
 85    off once we are 3.9+ across the board.
 86    """
 88    # Note: we don't recreate *ALL* the functionality of the Enum constructor
 89    # such as the _missing_ hook; but this should cover our basic needs.
 90    value2member_map = getattr(cls, '_value2member_map_')
 91    assert value2member_map is not None
 92    try:
 93        out = value2member_map[value]
 94        assert isinstance(out, cls)
 95        return out
 96    except KeyError:
 97        # pylint: disable=consider-using-f-string
 98        raise ValueError(
 99            '%r is not a valid %s' % (value, cls.__name__)
100        ) from None
103def check_utc(value: datetime.datetime) -> None:
104    """Ensure a datetime value is timezone-aware utc."""
105    if value.tzinfo is not datetime.UTC:
106        raise ValueError(
107            'datetime value does not have timezone set as datetime.UTC'
108        )
111def utc_now() -> datetime.datetime:
112    """Get timezone-aware current utc time.
114    Just a shortcut for datetime.datetime.now(datetime.UTC).
115    Avoid datetime.datetime.utcnow() which is deprecated and gives naive
116    times.
117    """
118    return datetime.datetime.now(datetime.UTC)
121def utc_now_naive() -> datetime.datetime:
122    """Get naive utc time.
124    This can be used to replace datetime.utcnow(), which is now deprecated.
125    Most all code should migrate to use timezone-aware times instead of
126    this.
127    """
128    return datetime.datetime.now(datetime.UTC).replace(tzinfo=None)
131def utc_today() -> datetime.datetime:
132    """Get offset-aware midnight in the utc time zone."""
133    now = datetime.datetime.now(datetime.UTC)
134    return datetime.datetime(
135        year=now.year, month=now.month, day=now.day, tzinfo=now.tzinfo
136    )
139def utc_this_hour() -> datetime.datetime:
140    """Get offset-aware beginning of the current hour in the utc time zone."""
141    now = datetime.datetime.now(datetime.UTC)
142    return datetime.datetime(
143        year=now.year,
144        month=now.month,
145        day=now.day,
146        hour=now.hour,
147        tzinfo=now.tzinfo,
148    )
151def utc_this_minute() -> datetime.datetime:
152    """Get offset-aware beginning of current minute in the utc time zone."""
153    now = datetime.datetime.now(datetime.UTC)
154    return datetime.datetime(
155        year=now.year,
156        month=now.month,
157        day=now.day,
158        hour=now.hour,
159        minute=now.minute,
160        tzinfo=now.tzinfo,
161    )
def empty_weakref(objtype: type[T]) -> weakref.ref[T]:
    """Return an already-dead weak-reference typed for the given type."""
    # The type argument exists purely for the static type checker's
    # benefit; nothing is done with it at runtime.
    del objtype  # Unused.

    # Hand back the one shared dead ref created at module level instead
    # of building (and immediately killing) a fresh object on every call.
    return _g_empty_weak_ref  # type: ignore
def data_size_str(bytecount: int, compact: bool = False) -> str:
    """Return a short human readable string for a size in bytes.

    With compact enabled, the result should be 6 or fewer chars for
    pretty much any sane file size.
    """
    # Negative sizes: format the magnitude and tack on a minus sign.
    if bytecount < 0:
        return '-' + data_size_str(-bytecount, compact=compact)

    # Small values are shown as raw bytes.
    if bytecount <= 999:
        unit = 'B' if compact else 'bytes'
        return f'{bytecount} {unit}'

    # KB and MB follow the same pattern: one decimal place below 10,
    # whole numbers below 999, otherwise move on to the next unit up.
    for power, unit in ((1, 'KB'), (2, 'MB')):
        scaled = bytecount / 1024**power
        if round(scaled, 1) < 10.0:
            return f'{scaled:.1f} {unit}'
        if round(scaled, 0) < 999:
            return f'{scaled:.0f} {unit}'

    # GB is our largest unit, so whole-number values have no upper bound.
    gigs = bytecount / 1024**3
    if round(gigs, 1) < 10.0:
        return f'{gigs:.1f} GB'
    return f'{gigs:.0f} GB'
class DirtyBit:
    """Tracks whether something is dirty and paces attempts to clean it.

    Usage: set 'dirty' to True whenever work is needed, poll
    'should_update' to find out when a cleaning attempt should be made,
    and set 'dirty' back to False once an attempt succeeds.

    If 'use_lock' is True, an asyncio Lock is created and factored into
    update attempts so they do not overlap ('should_update' only returns
    True while the lock is unlocked). Note that it is up to the user to
    lock/unlock the lock during the actual update attempt.

    If 'auto_dirty_seconds' is provided, the dirtybit flips itself back
    to dirty after having been clean for that long.

    'min_update_interval' enforces a minimum spacing between updates
    even when they succeed ('retry_interval' only applies when updates
    fail).
    """

    def __init__(
        self,
        dirty: bool = False,
        retry_interval: float = 5.0,
        use_lock: bool = False,
        auto_dirty_seconds: float | None = None,
        min_update_interval: float | None = None,
    ):
        now = time.monotonic()
        self._retry_interval = retry_interval
        self._auto_dirty_seconds = auto_dirty_seconds
        self._min_update_interval = min_update_interval
        self._dirty = dirty
        self._last_update_time: float | None = None

        # Starting dirty means an update is due right away; starting
        # clean means the auto-dirty countdown (if any) begins now.
        self._next_update_time: float | None = None
        self._next_auto_dirty_time: float | None = None
        if dirty:
            self._next_update_time = now
        elif auto_dirty_seconds is not None:
            self._next_auto_dirty_time = now + auto_dirty_seconds

        self._use_lock = use_lock
        self.lock: asyncio.Lock
        if use_lock:
            import asyncio

            self.lock = asyncio.Lock()

    @property
    def dirty(self) -> bool:
        """Whether the target is currently dirty.

        Set this to False once an update has succeeded.
        """
        return self._dirty

    @dirty.setter
    def dirty(self, value: bool) -> None:
        was_dirty = self._dirty

        # Dirty -> clean: start the auto-dirty countdown (if we have one).
        if was_dirty and not value and self._auto_dirty_seconds is not None:
            self._next_auto_dirty_time = (
                time.monotonic() + self._auto_dirty_seconds
            )

        # Clean -> dirty: an update is wanted immediately...
        if value and not was_dirty:
            when = time.monotonic()

            # ...unless a minimum update interval is being enforced and
            # not enough time has passed since the last update.
            if (
                self._min_update_interval is not None
                and self._last_update_time is not None
            ):
                when = max(
                    when, self._last_update_time + self._min_update_interval
                )
            self._next_update_time = when

        self._dirty = value

    @property
    def should_update(self) -> bool:
        """Whether an attempt to clean the target should be made now.

        Always False when the target is not dirty. Otherwise depends on
        how long it has been since the target was marked dirty or since
        this last returned True.
        """
        now = time.monotonic()

        # Flip ourself to dirty if the auto-dirty time has come.
        auto_time = self._next_auto_dirty_time
        if auto_time is not None and now > auto_time:
            self.dirty = True
            self._next_auto_dirty_time = None

        if not self._dirty:
            return False
        if self._use_lock and self.lock.locked():
            return False

        assert self._next_update_time is not None
        due = now > self._next_update_time
        if due:
            # Assume this attempt might fail; the next one becomes
            # available once the retry interval has elapsed.
            self._next_update_time = now + self._retry_interval
            self._last_update_time = now
        return due
class DispatchMethodWrapper(Generic[ArgT, RetT]):
    """Type-aware standin for the dispatch func returned by dispatchmethod.

    dispatchmethod() casts the plain function it builds to this type so
    type checkers know about its call signature and its 'register' and
    'registry' attributes. The method bodies here only raise.
    """

    def __call__(self, arg: ArgT) -> RetT:
        # Placeholder body; raises if it is ever actually invoked.
        raise RuntimeError('Should not get here')

    @staticmethod
    def register(
        func: Callable[[Any, Any], RetT]
    ) -> Callable[[Any, Any], RetT]:
        """Register a new dispatch handler for this dispatch-method."""
        # Placeholder body; raises if it is ever actually invoked.
        raise RuntimeError('Should not get here')

    # Declared for type checkers only; no value is assigned here.
    registry: dict[Any, Callable]
# noinspection PyProtectedMember,PyTypeHints
def dispatchmethod(
    func: Callable[[Any, ArgT], RetT]
) -> DispatchMethodWrapper[ArgT, RetT]:
    """A flavor of functools.singledispatch that works on methods.

    Note: as of Python 3.9 there is now functools.singledispatchmethod,
    but it currently (as of Jan 2021) is not type-aware (at least in mypy),
    which gives us a reason to keep this one around for now.
    """
    base: Any = functools.singledispatch(func)

    # Grab the lookup call up front so our closure holds on to only it
    # and not to the whole singledispatch wrapper.
    lookup = base.dispatch
    funcname = getattr(func, '__name__', 'dispatchmethod method')

    # This mirrors the tail end of functools.singledispatch, where it
    # builds the wrapper it returns, except that we dispatch on the
    # *second* positional arg instead of the first (to skip 'self').
    # It was written against Python 3.7, so it is worth rechecking
    # against later versions in case anything has changed.
    def wrapper(*args: Any, **kw: Any) -> Any:
        if len(args) < 2:
            raise TypeError(
                f'{funcname} requires at least ' '2 positional arguments'
            )
        handler = lookup(args[1].__class__)
        return handler(*args, **kw)

    # Expose the same extras that a singledispatch function carries.
    # pylint: disable=protected-access
    for attrname, attrval in (
        ('register', base.register),
        ('dispatch', lookup),
        ('registry', base.registry),
        ('_clear_cache', base._clear_cache),
    ):
        setattr(wrapper, attrname, attrval)
    # pylint: enable=protected-access
    functools.update_wrapper(wrapper, func)
    return cast(DispatchMethodWrapper, wrapper)
def valuedispatch(call: Callable[[ValT], RetT]) -> ValueDispatcher[ValT, RetT]:
    """Decorator enabling a function to dispatch based on a value.

    Unlike functools.singledispatch, which picks a handler from the
    *type* of an argument, this picks one from the argument's *value*.

    Use the 'register' method of the resulting value-dispatch function
    to assign functions handling particular values. Values without a
    registered handler are passed to the original function.
    """
    dispatcher = ValueDispatcher(call)
    return dispatcher
class ValueDispatcher(Generic[ValT, RetT]):
    """Callable object backing the valuedispatch decorator."""

    def __init__(self, call: Callable[[ValT], RetT]) -> None:
        # Handlers for specific values; anything not found here is
        # routed to the base call.
        self._handlers: dict[ValT, Callable[[], RetT]] = {}
        self._base_call = call

    def __call__(self, value: ValT) -> RetT:
        registered = self._handlers.get(value)
        if registered is None:
            return self._base_call(value)
        return registered()

    def _add_handler(
        self, value: ValT, call: Callable[[], RetT]
    ) -> Callable[[], RetT]:
        # Each value gets at most one handler.
        if value in self._handlers:
            raise RuntimeError(f'Duplicate handlers added for {value}')
        self._handlers[value] = call
        return call

    def register(
        self, value: ValT
    ) -> Callable[[Callable[[], RetT]], Callable[[], RetT]]:
        """Return a decorator registering a handler for the given value."""
        return functools.partial(self._add_handler, value)
def valuedispatch1arg(
    call: Callable[[ValT, ArgT], RetT]
) -> ValueDispatcher1Arg[ValT, ArgT, RetT]:
    """Variant of valuedispatch for functions with one extra argument."""
    dispatcher = ValueDispatcher1Arg(call)
    return dispatcher
class ValueDispatcher1Arg(Generic[ValT, ArgT, RetT]):
    """Callable object backing the valuedispatch1arg decorator."""

    def __init__(self, call: Callable[[ValT, ArgT], RetT]) -> None:
        # Handlers for specific values; anything not found here is
        # routed to the base call.
        self._handlers: dict[ValT, Callable[[ArgT], RetT]] = {}
        self._base_call = call

    def __call__(self, value: ValT, arg: ArgT) -> RetT:
        registered = self._handlers.get(value)
        if registered is None:
            return self._base_call(value, arg)
        # Registered handlers receive only the extra arg, not the value.
        return registered(arg)

    def _add_handler(
        self, value: ValT, call: Callable[[ArgT], RetT]
    ) -> Callable[[ArgT], RetT]:
        # Each value gets at most one handler.
        if value in self._handlers:
            raise RuntimeError(f'Duplicate handlers added for {value}')
        self._handlers[value] = call
        return call

    def register(
        self, value: ValT
    ) -> Callable[[Callable[[ArgT], RetT]], Callable[[ArgT], RetT]]:
        """Return a decorator registering a handler for the given value."""
        return functools.partial(self._add_handler, value)
    class ValueDispatcherMethod(Generic[ValT, RetT]):
        """Used by the valuedispatchmethod decorator.

        A typing stub: it describes the call signature and 'register'
        method of what valuedispatchmethod returns. The bodies are
        intentionally empty.
        """

        # Signature of the dispatching call itself (stub; no body).
        def __call__(self, value: ValT) -> RetT: ...

        def register(
            self, value: ValT
        ) -> Callable[[Callable[[SelfT], RetT]], Callable[[SelfT], RetT]]:
            """Add a handler to the dispatcher."""
            ...
def valuedispatchmethod(
    call: Callable[[SelfT, ValT], RetT]
) -> ValueDispatcherMethod[ValT, RetT]:
    """Like valuedispatch but works with methods instead of functions.

    The returned callable has a 'register' attribute; 'register(value)'
    gives a decorator that installs the decorated method as the handler
    for that value and returns it unchanged. Handlers are invoked with
    only 'self'; values with no registered handler go to the original
    method as '(self, value)'.

    Raises RuntimeError (at registration time) if a second handler is
    registered for the same value.
    """

    # NOTE: It seems that to wrap a method with a decorator and have self
    # dispatching do the right thing, we must return a function and not
    # an executable object. So for this version we keep our data here in
    # the enclosing function's scope and simply return a call.

    _base_call = call
    _handlers: dict[ValT, Callable[[SelfT], RetT]] = {}

    def _add_handler(
        value: ValT, addcall: Callable[[SelfT], RetT]
    ) -> Callable[[SelfT], RetT]:
        if value in _handlers:
            raise RuntimeError(f'Duplicate handlers added for {value}')
        _handlers[value] = addcall

        # Hand the handler back so that the name being decorated stays
        # bound to it instead of becoming None. This matches both the
        # ValueDispatcherMethod typing stub and the other dispatchers.
        return addcall

    def _register(
        value: ValT,
    ) -> Callable[[Callable[[SelfT], RetT]], Callable[[SelfT], RetT]]:
        from functools import partial

        return partial(_add_handler, value)

    def _call_wrapper(self: SelfT, value: ValT) -> RetT:
        handler = _handlers.get(value)
        if handler is not None:
            return handler(self)
        return _base_call(self, value)

    # We still want to use our returned object to register handlers, but we're
    # actually just returning a function. So manually stuff the call onto it.
    setattr(_call_wrapper, 'register', _register)

    # To the type checker's eyes we return a ValueDispatcherMethod instance;
    # this lets it know about our register func and type-check its usage.
    # In reality we just return a raw function call (for reasons listed above).
    # pylint: disable=undefined-variable, no-else-return
    if TYPE_CHECKING:
        return ValueDispatcherMethod[ValT, RetT]()
    else:
        return _call_wrapper
def make_hash(obj: Any) -> int:
    """Makes a hash from a dictionary, list, tuple or set to any level,
    that contains only other hashable types (including any lists, tuples,
    sets, and dictionaries).

    Lists and tuples are hashed in order (so a list and a tuple with the
    same elements give the same result). Sets and dicts are hashed in an
    order-independent way, so equal sets or equal dicts always give equal
    results no matter how they were built up. Dict keys do not need to
    be sortable.

    Note that this uses Python's hash() function internally so collisions/etc.
    may be more common than with fancy cryptographic hashes.

    Also be aware that Python's hash() output varies across processes, so
    this should only be used for values that will remain in a single process.
    """
    if isinstance(obj, (tuple, list)):
        return hash(tuple(make_hash(e) for e in obj))

    if isinstance(obj, set):
        # Two equal sets can iterate in different orders (it can depend
        # on insertion history), so hash element hashes as a frozenset
        # to stay independent of iteration order.
        return hash(frozenset(make_hash(e) for e in obj))

    if isinstance(obj, dict):
        # Keys are hashable by definition; values may be arbitrarily
        # nested containers so those get reduced to hashes first.
        # Hashing the pairs as a frozenset keeps this order-independent
        # without having to sort (or copy) anything.
        return hash(frozenset((k, make_hash(v)) for k, v in obj.items()))

    return hash(obj)
def asserttype(obj: Any, typ: type[T]) -> T:
    """Return the passed object, typed as the provided type.

    The actual type is verified using assert, so this is only for
    cases where a mismatch is never expected. Use checktype otherwise.
    """
    assert isinstance(typ, type), 'only actual types accepted'
    matches = isinstance(obj, typ)
    assert matches
    return obj
def asserttype_o(obj: Any, typ: type[T]) -> T | None:
    """Return the passed object, typed as the provided optional type.

    The actual type is verified using assert, so this is only for
    cases where a mismatch is never expected. Use checktype_o otherwise.
    """
    assert isinstance(typ, type), 'only actual types accepted'
    allowed = (typ, type(None))
    assert isinstance(obj, allowed)
    return obj
def checktype(obj: Any, typ: type[T]) -> T:
    """Return the passed object, typed as the provided type.

    The type is always verified at runtime via isinstance, with a
    TypeError raised on mismatch. asserttype is a more efficient (but
    less safe) equivalent.
    """
    assert isinstance(typ, type), 'only actual types accepted'
    if isinstance(obj, typ):
        return obj
    raise TypeError(f'Expected a {typ}; got a {type(obj)}.')
def checktype_o(obj: Any, typ: type[T]) -> T | None:
    """Return the passed object, typed as the provided optional type.

    The type is always verified at runtime via isinstance, with a
    TypeError raised on mismatch. asserttype_o is a more efficient (but
    less safe) equivalent.
    """
    assert isinstance(typ, type), 'only actual types accepted'
    if isinstance(obj, (typ, type(None))):
        return obj
    raise TypeError(f'Expected a {typ} or None; got a {type(obj)}.')
def warntype(obj: Any, typ: type[T]) -> T:
    """Return the passed object, typed as the provided type.

    The type is always checked at runtime, but a mismatch only results
    in a logged warning; the object is returned either way.
    """
    assert isinstance(typ, type), 'only actual types accepted'
    if isinstance(obj, typ):
        return obj

    import logging

    logging.warning('warntype: expected a %s, got a %s', typ, type(obj))
    return obj  # type: ignore
def warntype_o(obj: Any, typ: type[T]) -> T | None:
    """Return the passed object, typed as the provided optional type.

    The type is always checked at runtime, but a mismatch only results
    in a logged warning; the object is returned either way.
    """
    assert isinstance(typ, type), 'only actual types accepted'
    if isinstance(obj, (typ, type(None))):
        return obj  # type: ignore

    import logging

    logging.warning(
        'warntype: expected a %s or None, got a %s', typ, type(obj)
    )
    return obj  # type: ignore
def assert_non_optional(obj: T | None) -> T:
    """Return the passed object with its Optional typing stripped.

    The value is verified using assert, so this is only for cases
    where None is never expected. Use check_non_optional otherwise.
    """
    has_value = obj is not None
    assert has_value
    return obj  # type: ignore
def check_non_optional(obj: T | None) -> T:
    """Return the passed object with its Optional typing stripped.

    The value is always checked at runtime and a ValueError is raised
    if it is None. assert_non_optional is a more efficient (but less
    safe) equivalent.
    """
    if obj is not None:
        return obj
    raise ValueError('Got None value in check_non_optional.')
def smoothstep(edge0: float, edge1: float, x: float) -> float:
    """A smooth transition function.

    The result eases from 0 to 1 as x travels from edge0 to edge1;
    x values outside that range give 0 or 1.
    """
    # Where x sits between the edges, clamped to the 0-1 range.
    ratio = (x - edge0) / (edge1 - edge0)
    clamped = min(1.0, max(0.0, ratio))

    # Cubic ease in/out.
    return clamped * clamped * (3.0 - 2.0 * clamped)
def linearstep(edge0: float, edge1: float, x: float) -> float:
    """A linear transition function.

    The result moves linearly from 0 to 1 as x travels from edge0 to
    edge1; x values outside that range give 0 or 1.
    """
    # Where x sits between the edges, clamped to the 0-1 range.
    ratio = (x - edge0) / (edge1 - edge0)
    capped = min(1.0, ratio)
    return max(0.0, capped)
def _compact_id(num: int, chars: str) -> str:
    """Encode a non-negative int using the given chars as digits.

    The base used is the number of chars provided. Raises ValueError
    for negative values.
    """
    if num < 0:
        raise ValueError('Negative integers not allowed.')

    # Our output only sorts the same way as the source numbers if the
    # digit chars themselves are provided in sorted order.
    assert ''.join(sorted(list(chars))) == chars

    base = len(chars)

    # Peel digits off from least significant to most significant.
    digits: list[str] = []
    while num:
        num, remainder = divmod(num, base)
        digits.append(chars[remainder])

    # Flip to most-significant-first; zero yields no digits at all so
    # it gets a literal '0'.
    return ''.join(reversed(digits)) or '0'
def human_readable_compact_id(num: int) -> str:
    """Given a non-negative int, return a compact string representation.

    Handy for visualizing unique numeric ids using as few as possible chars.
    This representation uses only lowercase letters and numbers (minus the
    following letters for readability):
     's' is excluded due to similarity to '5'.
     'l' is excluded due to similarity to '1'.
     'i' is excluded due to similarity to '1'.
     'o' is excluded due to similarity to '0'.
     'z' is excluded due to similarity to '2'.

    That leaves 31 usable chars (10 digits plus 21 letters), so n chars
    can store 31^n distinct values.

    When reading human input consisting of these IDs, it may be desirable
    to map the disallowed chars to their corresponding allowed ones
    ('o' -> '0', etc).

    Sort order for these ids is the same as the original numbers.

    If more compactness is desired at the expense of readability,
    use compact_id() instead.
    """
    readable_chars = '0123456789abcdefghjkmnpqrtuvwxy'
    return _compact_id(num, readable_chars)
def compact_id(num: int) -> str:
    """Given a non-negative int, return a compact string representation.

    Handy for visualizing unique numeric ids using as few as possible chars.
    This version is more compact than human_readable_compact_id() but less
    friendly to humans due to using both capital and lowercase letters,
    both 'O' and '0', etc.

    With 62 usable chars, n chars can store 62^n distinct values.

    Sort order for these ids is the same as the original numbers.
    """
    all_chars = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'
    return _compact_id(num, all_chars)
def unchanging_hostname() -> str:
    """Return a stable name for the local device.

    Similar to the `hostname` call (or os.uname().nodename in Python)
    except attempts to give a name that doesn't change depending on
    network conditions. (A Mac will tend to go from Foo to Foo.local,
    Foo.lan etc. throughout its various adventures)
    """
    import platform
    import subprocess

    # Everywhere except Mac, the regular node name is used as-is.
    if platform.system() != 'Darwin':
        return os.uname().nodename

    # On Mac, this should give the computer name assigned in System Prefs.
    result = subprocess.run(
        ['scutil', '--get', 'ComputerName'],
        check=True,
        capture_output=True,
    )
    computer_name = result.stdout.decode().strip()
    return computer_name.replace(' ', '-')
def set_canonical_module_names(module_globals: dict[str, Any]) -> None:
    """Point '__module__' of a module's public objects at the module itself.

    Goes through the passed globals dict (skipping names beginning with
    an underscore) and, for each object whose '__module__' is located
    under the dict's '__name__' module, sets '__module__' to that module
    name. An object that refuses the change gets a logged warning.

    Does nothing if the EFRO_SUPPRESS_SET_CANONICAL_MODULE_NAMES
    environment variable is set to '1'. Raises RuntimeError if the dict
    has no string '__name__' value.
    """
    if os.environ.get('EFRO_SUPPRESS_SET_CANONICAL_MODULE_NAMES') == '1':
        return

    modulename = module_globals.get('__name__')
    if not isinstance(modulename, str):
        raise RuntimeError('Unable to get module name.')
    assert not modulename.startswith('_')

    # Something lives 'under us' if its module path starts with either
    # 'ourname.' or '_ourname.'.
    owned_prefixes = (f'{modulename}.', f'_{modulename}.')

    for name, obj in module_globals.items():
        if name.startswith('_'):
            continue
        existing = getattr(obj, '__module__', None)
        try:
            # Override the module ONLY if it lives under us somewhere.
            # So ourpackage._submodule.Foo becomes ourpackage.Foo
            # but otherpackage._submodule.Foo remains untouched.
            if existing is not None and existing.startswith(owned_prefixes):
                obj.__module__ = modulename
        except Exception:
            import logging

            logging.warning(
                'set_canonical_module_names: unable to change __module__'
                " from '%s' to '%s' on %s object at '%s'.",
                existing,
                modulename,
                type(obj),
                name,
            )
def timedelta_str(
    timeval: datetime.timedelta | float, maxparts: int = 2, decimals: int = 0
) -> str:
    """Return a simple human readable time string for a length of time.

    Time can be given as a timedelta or as a number (float or int)
    representing seconds. At most 'maxparts' units are included,
    starting from the largest non-zero one. If 'decimals' is non-zero,
    the final unit shown includes that many decimal places. Negative
    lengths of time are formatted as their positive equivalent with a
    leading '-'.

    Example output:
      "23d 1h 2m 32s" (with maxparts == 4)
      "23d 1h" (with maxparts == 2)
      "23d 1.04h" (with maxparts == 2 and decimals == 2)

    Note that this is hard-coded in English and probably not especially
    performant.
    """
    # pylint: disable=too-many-locals

    # Accept ints as well as floats for seconds; type checkers allow
    # ints wherever floats are expected so callers will pass them.
    if isinstance(timeval, (int, float)):
        timevalfin = datetime.timedelta(seconds=timeval)
    else:
        timevalfin = timeval

    # Internally we only handle positive values. Make sure to carry
    # *all* formatting options along when flipping the sign.
    if timevalfin.total_seconds() < 0:
        flipped = timedelta_str(
            -timevalfin, maxparts=maxparts, decimals=decimals
        )
        return f'-{flipped}'

    years = timevalfin.days // 365
    days = timevalfin.days % 365
    hours = timevalfin.seconds // 3600
    hour_remainder = timevalfin.seconds % 3600
    minutes = hour_remainder // 60
    seconds = hour_remainder % 60

    # Now, if we want decimal places for our last value,
    # calc fractional parts.
    if decimals:
        # Calc totals of each type.
        t_seconds = timevalfin.total_seconds()
        t_minutes = t_seconds / 60
        t_hours = t_minutes / 60
        t_days = t_hours / 24
        t_years = t_days / 365

        # Calc fractional parts that exclude all whole values to their left.
        years_covered = years
        years_f = t_years - years_covered
        days_covered = years_covered * 365 + days
        days_f = t_days - days_covered
        hours_covered = days_covered * 24 + hours
        hours_f = t_hours - hours_covered
        minutes_covered = hours_covered * 60 + minutes
        minutes_f = t_minutes - minutes_covered
        seconds_covered = minutes_covered * 60 + seconds
        seconds_f = t_seconds - seconds_covered
    else:
        years_f = days_f = hours_f = minutes_f = seconds_f = 0.0

    parts: list[str] = []
    for part, part_f, suffix in (
        (years, years_f, 'y'),
        (days, days_f, 'd'),
        (hours, hours_f, 'h'),
        (minutes, minutes_f, 'm'),
        (seconds, seconds_f, 's'),
    ):
        # Skip leading zero units, but once we've started emitting parts
        # keep going; and always emit at least seconds.
        if part or parts or suffix == 's':
            # Do decimal version only for the last part.
            if decimals and (len(parts) >= maxparts - 1 or suffix == 's'):
                parts.append(f'{part+part_f:.{decimals}f}{suffix}')
            else:
                parts.append(f'{part}{suffix}')
            if len(parts) >= maxparts:
                break
    return ' '.join(parts)
def ago_str(
    timeval: datetime.datetime,
    maxparts: int = 1,
    now: datetime.datetime | None = None,
    decimals: int = 0,
) -> str:
    """Return a clean human readable 'ago' string for a datetime.

    Note that this is hard-coded in English so should not be used
    for visible in-game elements; only tools/etc.

    If now is not passed, efro.util.utc_now() is used.
    """
    reference = utc_now() if now is None else now
    elapsed = reference - timeval
    elapsed_str = timedelta_str(elapsed, maxparts=maxparts, decimals=decimals)
    return elapsed_str + ' ago'
def split_list(input_list: list[T], max_length: int) -> list[list[T]]:
    """Split a single list into smaller lists.

    Each returned list holds up to max_length consecutive items from
    the input, in their original order; only the last one may be
    shorter. An empty input gives an empty result.

    Raises ValueError if max_length is less than 1.
    """
    # A zero step would make range() itself raise ValueError, but a
    # negative one would quietly give an empty range and lose every
    # item, so reject both explicitly.
    if max_length < 1:
        raise ValueError(
            f'max_length must be at least 1; got {max_length}.'
        )
    return [
        input_list[i : i + max_length]
        for i in range(0, len(input_list), max_length)
    ]
