efro.threadpool

Thread pool functionality.

 1# Released under the MIT License. See LICENSE for details.
 2#
 3"""Thread pool functionality."""
 4
 5from __future__ import annotations
 6
 7import time
 8import logging
 9import threading
10from typing import TYPE_CHECKING, ParamSpec
11from concurrent.futures import ThreadPoolExecutor
12
13if TYPE_CHECKING:
14    from typing import Any, Callable
15    from concurrent.futures import Future
16
17P = ParamSpec('P')
18
19logger = logging.getLogger(__name__)
20
21
22class ThreadPoolExecutorPlus(ThreadPoolExecutor):
23    """A ThreadPoolExecutor with additional functionality added."""
24
25    def __init__(
26        self,
27        max_workers: int | None = None,
28        thread_name_prefix: str = '',
29        initializer: Callable[[], None] | None = None,
30        max_no_wait_count: int | None = None,
31    ) -> None:
32        super().__init__(
33            max_workers=max_workers,
34            thread_name_prefix=thread_name_prefix,
35            initializer=initializer,
36        )
37        self.no_wait_count = 0
38
39        self._max_no_wait_count = (
40            max_no_wait_count
41            if max_no_wait_count is not None
42            else 50 if max_workers is None else max_workers * 4
43        )
44        self._last_no_wait_warn_time: float | None = None
45        self._no_wait_count_lock = threading.Lock()
46
47    def submit_no_wait(
48        self, call: Callable[P, Any], *args: P.args, **keywds: P.kwargs
49    ) -> None:
50        """Submit work to the threadpool with no expectation of waiting.
51
52        Any errors occurring in the passed callable will be logged. This
53        call will block and log a warning if the threadpool reaches its
54        max queued no-wait call count.
55        """
56        # If we're too backlogged, issue a warning and block until we
57        # aren't. We don't bother with the lock here since this can be
58        # slightly inexact. In general we should aim to not hit this
59        # limit but it is good to have backpressure to avoid runaway
60        # queues in cases of network outages/etc.
61        if self.no_wait_count > self._max_no_wait_count:
62            now = time.monotonic()
63            if (
64                self._last_no_wait_warn_time is None
65                or now - self._last_no_wait_warn_time > 10.0
66            ):
67                logger.warning(
68                    'ThreadPoolExecutorPlus hit max no-wait limit of %s;'
69                    ' blocking.',
70                    self._max_no_wait_count,
71                )
72                self._last_no_wait_warn_time = now
73            while self.no_wait_count > self._max_no_wait_count:
74                time.sleep(0.01)
75
76        fut = self.submit(call, *args, **keywds)
77        with self._no_wait_count_lock:
78            self.no_wait_count += 1
79        fut.add_done_callback(self._no_wait_done)
80
81    def _no_wait_done(self, fut: Future) -> None:
82        with self._no_wait_count_lock:
83            self.no_wait_count -= 1
84        try:
85            fut.result()
86        except Exception:
87            logger.exception('Error in work submitted via submit_no_wait().')
P = ~P
logger = <Logger efro.threadpool (WARNING)>
class ThreadPoolExecutorPlus(concurrent.futures.thread.ThreadPoolExecutor):
23class ThreadPoolExecutorPlus(ThreadPoolExecutor):
24    """A ThreadPoolExecutor with additional functionality added."""
25
26    def __init__(
27        self,
28        max_workers: int | None = None,
29        thread_name_prefix: str = '',
30        initializer: Callable[[], None] | None = None,
31        max_no_wait_count: int | None = None,
32    ) -> None:
33        super().__init__(
34            max_workers=max_workers,
35            thread_name_prefix=thread_name_prefix,
36            initializer=initializer,
37        )
38        self.no_wait_count = 0
39
40        self._max_no_wait_count = (
41            max_no_wait_count
42            if max_no_wait_count is not None
43            else 50 if max_workers is None else max_workers * 4
44        )
45        self._last_no_wait_warn_time: float | None = None
46        self._no_wait_count_lock = threading.Lock()
47
48    def submit_no_wait(
49        self, call: Callable[P, Any], *args: P.args, **keywds: P.kwargs
50    ) -> None:
51        """Submit work to the threadpool with no expectation of waiting.
52
53        Any errors occurring in the passed callable will be logged. This
54        call will block and log a warning if the threadpool reaches its
55        max queued no-wait call count.
56        """
57        # If we're too backlogged, issue a warning and block until we
58        # aren't. We don't bother with the lock here since this can be
59        # slightly inexact. In general we should aim to not hit this
60        # limit but it is good to have backpressure to avoid runaway
61        # queues in cases of network outages/etc.
62        if self.no_wait_count > self._max_no_wait_count:
63            now = time.monotonic()
64            if (
65                self._last_no_wait_warn_time is None
66                or now - self._last_no_wait_warn_time > 10.0
67            ):
68                logger.warning(
69                    'ThreadPoolExecutorPlus hit max no-wait limit of %s;'
70                    ' blocking.',
71                    self._max_no_wait_count,
72                )
73                self._last_no_wait_warn_time = now
74            while self.no_wait_count > self._max_no_wait_count:
75                time.sleep(0.01)
76
77        fut = self.submit(call, *args, **keywds)
78        with self._no_wait_count_lock:
79            self.no_wait_count += 1
80        fut.add_done_callback(self._no_wait_done)
81
82    def _no_wait_done(self, fut: Future) -> None:
83        with self._no_wait_count_lock:
84            self.no_wait_count -= 1
85        try:
86            fut.result()
87        except Exception:
88            logger.exception('Error in work submitted via submit_no_wait().')

A ThreadPoolExecutor with additional functionality added.

ThreadPoolExecutorPlus( max_workers: int | None = None, thread_name_prefix: str = '', initializer: Optional[Callable[[], NoneType]] = None, max_no_wait_count: int | None = None)
26    def __init__(
27        self,
28        max_workers: int | None = None,
29        thread_name_prefix: str = '',
30        initializer: Callable[[], None] | None = None,
31        max_no_wait_count: int | None = None,
32    ) -> None:
33        super().__init__(
34            max_workers=max_workers,
35            thread_name_prefix=thread_name_prefix,
36            initializer=initializer,
37        )
38        self.no_wait_count = 0
39
40        self._max_no_wait_count = (
41            max_no_wait_count
42            if max_no_wait_count is not None
43            else 50 if max_workers is None else max_workers * 4
44        )
45        self._last_no_wait_warn_time: float | None = None
46        self._no_wait_count_lock = threading.Lock()

Initializes a new ThreadPoolExecutor instance.

Args: max_workers: The maximum number of threads that can be used to execute the given calls. thread_name_prefix: An optional name prefix to give our threads. initializer: A callable used to initialize worker threads. initargs: A tuple of arguments to pass to the initializer.

no_wait_count
def submit_no_wait(self, call: Callable[~P, Any], *args: P.args, **keywds: P.kwargs) -> None:
48    def submit_no_wait(
49        self, call: Callable[P, Any], *args: P.args, **keywds: P.kwargs
50    ) -> None:
51        """Submit work to the threadpool with no expectation of waiting.
52
53        Any errors occurring in the passed callable will be logged. This
54        call will block and log a warning if the threadpool reaches its
55        max queued no-wait call count.
56        """
57        # If we're too backlogged, issue a warning and block until we
58        # aren't. We don't bother with the lock here since this can be
59        # slightly inexact. In general we should aim to not hit this
60        # limit but it is good to have backpressure to avoid runaway
61        # queues in cases of network outages/etc.
62        if self.no_wait_count > self._max_no_wait_count:
63            now = time.monotonic()
64            if (
65                self._last_no_wait_warn_time is None
66                or now - self._last_no_wait_warn_time > 10.0
67            ):
68                logger.warning(
69                    'ThreadPoolExecutorPlus hit max no-wait limit of %s;'
70                    ' blocking.',
71                    self._max_no_wait_count,
72                )
73                self._last_no_wait_warn_time = now
74            while self.no_wait_count > self._max_no_wait_count:
75                time.sleep(0.01)
76
77        fut = self.submit(call, *args, **keywds)
78        with self._no_wait_count_lock:
79            self.no_wait_count += 1
80        fut.add_done_callback(self._no_wait_done)

Submit work to the threadpool with no expectation of waiting.

Any errors occurring in the passed callable will be logged. This call will block and log a warning if the threadpool reaches its max queued no-wait call count.