efro.threadpool
Thread pool functionality.
1# Released under the MIT License. See LICENSE for details. 2# 3"""Thread pool functionality.""" 4 5from __future__ import annotations 6 7import time 8import logging 9import threading 10from typing import TYPE_CHECKING, ParamSpec 11from concurrent.futures import ThreadPoolExecutor 12 13if TYPE_CHECKING: 14 from typing import Any, Callable 15 from concurrent.futures import Future 16 17P = ParamSpec('P') 18 19logger = logging.getLogger(__name__) 20 21 22class ThreadPoolExecutorPlus(ThreadPoolExecutor): 23 """A ThreadPoolExecutor with additional functionality added.""" 24 25 def __init__( 26 self, 27 max_workers: int | None = None, 28 thread_name_prefix: str = '', 29 initializer: Callable[[], None] | None = None, 30 max_no_wait_count: int | None = None, 31 ) -> None: 32 super().__init__( 33 max_workers=max_workers, 34 thread_name_prefix=thread_name_prefix, 35 initializer=initializer, 36 ) 37 self.no_wait_count = 0 38 39 self._max_no_wait_count = ( 40 max_no_wait_count 41 if max_no_wait_count is not None 42 else 50 if max_workers is None else max_workers * 4 43 ) 44 self._last_no_wait_warn_time: float | None = None 45 self._no_wait_count_lock = threading.Lock() 46 47 def submit_no_wait( 48 self, call: Callable[P, Any], *args: P.args, **keywds: P.kwargs 49 ) -> None: 50 """Submit work to the threadpool with no expectation of waiting. 51 52 Any errors occurring in the passed callable will be logged. This 53 call will block and log a warning if the threadpool reaches its 54 max queued no-wait call count. 55 """ 56 # If we're too backlogged, issue a warning and block until we 57 # aren't. We don't bother with the lock here since this can be 58 # slightly inexact. In general we should aim to not hit this 59 # limit but it is good to have backpressure to avoid runaway 60 # queues in cases of network outages/etc. 61 if self.no_wait_count > self._max_no_wait_count: 62 now = time.monotonic() 63 if ( 64 self._last_no_wait_warn_time is None 65 or now - self._last_no_wait_warn_time > 10.0 66 ): 67 logger.warning( 68 'ThreadPoolExecutorPlus hit max no-wait limit of %s;' 69 ' blocking.', 70 self._max_no_wait_count, 71 ) 72 self._last_no_wait_warn_time = now 73 while self.no_wait_count > self._max_no_wait_count: 74 time.sleep(0.01) 75 76 fut = self.submit(call, *args, **keywds) 77 with self._no_wait_count_lock: 78 self.no_wait_count += 1 79 fut.add_done_callback(self._no_wait_done) 80 81 def _no_wait_done(self, fut: Future) -> None: 82 with self._no_wait_count_lock: 83 self.no_wait_count -= 1 84 try: 85 fut.result() 86 except Exception: 87 logger.exception('Error in work submitted via submit_no_wait().')
P =
~P
logger =
<Logger efro.threadpool (WARNING)>
class
ThreadPoolExecutorPlus(concurrent.futures.thread.ThreadPoolExecutor):
23class ThreadPoolExecutorPlus(ThreadPoolExecutor): 24 """A ThreadPoolExecutor with additional functionality added.""" 25 26 def __init__( 27 self, 28 max_workers: int | None = None, 29 thread_name_prefix: str = '', 30 initializer: Callable[[], None] | None = None, 31 max_no_wait_count: int | None = None, 32 ) -> None: 33 super().__init__( 34 max_workers=max_workers, 35 thread_name_prefix=thread_name_prefix, 36 initializer=initializer, 37 ) 38 self.no_wait_count = 0 39 40 self._max_no_wait_count = ( 41 max_no_wait_count 42 if max_no_wait_count is not None 43 else 50 if max_workers is None else max_workers * 4 44 ) 45 self._last_no_wait_warn_time: float | None = None 46 self._no_wait_count_lock = threading.Lock() 47 48 def submit_no_wait( 49 self, call: Callable[P, Any], *args: P.args, **keywds: P.kwargs 50 ) -> None: 51 """Submit work to the threadpool with no expectation of waiting. 52 53 Any errors occurring in the passed callable will be logged. This 54 call will block and log a warning if the threadpool reaches its 55 max queued no-wait call count. 56 """ 57 # If we're too backlogged, issue a warning and block until we 58 # aren't. We don't bother with the lock here since this can be 59 # slightly inexact. In general we should aim to not hit this 60 # limit but it is good to have backpressure to avoid runaway 61 # queues in cases of network outages/etc. 62 if self.no_wait_count > self._max_no_wait_count: 63 now = time.monotonic() 64 if ( 65 self._last_no_wait_warn_time is None 66 or now - self._last_no_wait_warn_time > 10.0 67 ): 68 logger.warning( 69 'ThreadPoolExecutorPlus hit max no-wait limit of %s;' 70 ' blocking.', 71 self._max_no_wait_count, 72 ) 73 self._last_no_wait_warn_time = now 74 while self.no_wait_count > self._max_no_wait_count: 75 time.sleep(0.01) 76 77 fut = self.submit(call, *args, **keywds) 78 with self._no_wait_count_lock: 79 self.no_wait_count += 1 80 fut.add_done_callback(self._no_wait_done) 81 82 def _no_wait_done(self, fut: Future) -> None: 83 with self._no_wait_count_lock: 84 self.no_wait_count -= 1 85 try: 86 fut.result() 87 except Exception: 88 logger.exception('Error in work submitted via submit_no_wait().')
A ThreadPoolExecutor with additional functionality added.
ThreadPoolExecutorPlus( max_workers: int | None = None, thread_name_prefix: str = '', initializer: Optional[Callable[[], NoneType]] = None, max_no_wait_count: int | None = None)
26 def __init__( 27 self, 28 max_workers: int | None = None, 29 thread_name_prefix: str = '', 30 initializer: Callable[[], None] | None = None, 31 max_no_wait_count: int | None = None, 32 ) -> None: 33 super().__init__( 34 max_workers=max_workers, 35 thread_name_prefix=thread_name_prefix, 36 initializer=initializer, 37 ) 38 self.no_wait_count = 0 39 40 self._max_no_wait_count = ( 41 max_no_wait_count 42 if max_no_wait_count is not None 43 else 50 if max_workers is None else max_workers * 4 44 ) 45 self._last_no_wait_warn_time: float | None = None 46 self._no_wait_count_lock = threading.Lock()
Initializes a new ThreadPoolExecutor instance.
Args: max_workers: The maximum number of threads that can be used to execute the given calls. thread_name_prefix: An optional name prefix to give our threads. initializer: A callable used to initialize worker threads. initargs: A tuple of arguments to pass to the initializer.
def
submit_no_wait(self, call: Callable[~P, Any], *args: P.args, **keywds: P.kwargs) -> None:
48 def submit_no_wait( 49 self, call: Callable[P, Any], *args: P.args, **keywds: P.kwargs 50 ) -> None: 51 """Submit work to the threadpool with no expectation of waiting. 52 53 Any errors occurring in the passed callable will be logged. This 54 call will block and log a warning if the threadpool reaches its 55 max queued no-wait call count. 56 """ 57 # If we're too backlogged, issue a warning and block until we 58 # aren't. We don't bother with the lock here since this can be 59 # slightly inexact. In general we should aim to not hit this 60 # limit but it is good to have backpressure to avoid runaway 61 # queues in cases of network outages/etc. 62 if self.no_wait_count > self._max_no_wait_count: 63 now = time.monotonic() 64 if ( 65 self._last_no_wait_warn_time is None 66 or now - self._last_no_wait_warn_time > 10.0 67 ): 68 logger.warning( 69 'ThreadPoolExecutorPlus hit max no-wait limit of %s;' 70 ' blocking.', 71 self._max_no_wait_count, 72 ) 73 self._last_no_wait_warn_time = now 74 while self.no_wait_count > self._max_no_wait_count: 75 time.sleep(0.01) 76 77 fut = self.submit(call, *args, **keywds) 78 with self._no_wait_count_lock: 79 self.no_wait_count += 1 80 fut.add_done_callback(self._no_wait_done)
Submit work to the threadpool with no expectation of waiting.
Any errors occurring in the passed callable will be logged. This call will block and log a warning if the threadpool reaches its max queued no-wait call count.