jenkins-bot has submitted this change. ( https://gerrit.wikimedia.org/r/c/pywikibot/core/+/1105000?usp=email )
Change subject: IMPR: Add BoundedPoolExecutor ......................................................................
IMPR: Add BoundedPoolExecutor
- add BoundedPoolExecutor to tools.threading; it is combined with a given concurrent.futures.Executor subclass and uses a BoundedSemaphore in its submit() method to limit the number of items queued for the workers. - test BoundedPoolExecutor - update all scripts using ThreadPoolExecutor to use the new BoundedPoolExecutor('ThreadPoolExecutor') instead.
Bug: T333741 Change-Id: I47bb2b4743f44dcd04c5d4df57978749791bf99e --- M pywikibot/scripts/login.py M pywikibot/tools/threading.py M scripts/archivebot.py M scripts/fixing_redirects.py M scripts/watchlist.py M tests/tools_threading_tests.py 6 files changed, 232 insertions(+), 15 deletions(-)
Approvals: Xqt: Looks good to me, approved jenkins-bot: Verified
diff --git a/pywikibot/scripts/login.py b/pywikibot/scripts/login.py index 3cc1bf8..c1af7f6 100755 --- a/pywikibot/scripts/login.py +++ b/pywikibot/scripts/login.py @@ -51,13 +51,13 @@ from __future__ import annotations
import datetime -from concurrent.futures import ThreadPoolExecutor from contextlib import nullcontext, suppress
import pywikibot from pywikibot import config from pywikibot.exceptions import NoUsernameError, SiteDefinitionError from pywikibot.login import OauthLoginManager +from pywikibot.tools.threading import BoundedPoolExecutor
def _get_consumer_token(site) -> tuple[str, str]: @@ -157,8 +157,9 @@ namedict = {site.family.name: {site.code: None}}
params = oauth, logout, autocreate - context = ThreadPoolExecutor if asynchronous else nullcontext - with context() as executor: + context = (nullcontext(), + BoundedPoolExecutor('ThreadPoolExecutor'))[asynchronous] + with context as executor: for family_name in namedict: for lang in namedict[family_name]: if asynchronous: diff --git a/pywikibot/tools/threading.py b/pywikibot/tools/threading.py index 7539561..b93b6d9 100644 --- a/pywikibot/tools/threading.py +++ b/pywikibot/tools/threading.py @@ -1,20 +1,25 @@ """Classes which can be used for threading.""" # -# (C) Pywikibot team, 2008-2022 +# (C) Pywikibot team, 2008-2024 # # Distributed under the terms of the MIT license. # from __future__ import annotations
+import concurrent.futures as futures +import importlib import queue import re import threading import time +from typing import Any
import pywikibot # T306760 +from pywikibot.tools import SPHINX_RUNNING
__all__ = ( + 'BoundedPoolExecutor', 'RLock', 'ThreadedGenerator', 'ThreadList', @@ -189,6 +194,8 @@ ... pool.append(threading.Thread(target=work)) ...
+ .. seealso:: :class:`BoundedPoolExecutor` + """
def __init__(self, limit: int = 128, wait_time: float = 2, *args) -> None: @@ -225,3 +232,121 @@ super().append(thd) thd.start() pywikibot.logging.debug(f"thread {len(self)} ('{type(thd)}') started") + + +class BoundedPoolExecutor(futures.Executor): + + """A bounded Executor which limits prefetched Futures. + + BoundedPoolExecutor behaves like other executors derived from + :pylib:`concurrent.futures.Executor + <concurrent.futures.html#concurrent.futures.Executor>` but will + block :meth:`submit` calls from adding further items to the workers + queue once the *max_bound* limit is reached. + + .. versionadded:: 10.0 + + .. seealso:: + - :pylib:`concurrent.futures.html#executor-objects` + - :class:`ThreadList` + + :param executor: One of the executors found in ``concurrent.futures``. + The parameter may be given as class type or its name. + :param max_bound: the maximum number of submitted but unfinished items. + If not given or None, the number is set to *max_workers*. + :param args: Any positional argument for the given *executor* + :param kwargs: Any keyword argument for the given *executor* + :raises AttributeError: given *executor* is not found in + concurrent.futures. + :raises TypeError: given *executor* is not a class or not a real + subclass of concurrent.futures.Executor. + :raises ValueError: minimum *max_bound* is 1. + """ + + def __new__( + cls, + executor: type[futures.Executor] | str, + /, + max_bound: int | None = None, + *args: Any, + **kwargs: Any + ) -> BoundedPoolExecutor: + """Create a new BoundedPoolExecutor subclass. + + The class inherits from :class:`BoundedPoolExecutor` and the + given *executor*. The class name is composed of "Bounded" and + the name of the *executor*.
+ """ + module = 'concurrent.futures' + if isinstance(executor, str): + base = getattr( + importlib.import_module(module), executor) + else: + base = executor + + if base is futures.Executor or not issubclass(base, futures.Executor): + raise TypeError( + f'expected a real subclass of {module + ".Executor"!r} or the ' + f'class name for executor parameter, not {base.__name__!r}' + ) + new = type('Bounded' + base.__name__, (cls, base), {}) + return super().__new__(new) + + def __init__(self, executor, /, max_bound=None, *args, **kwargs) -> None: + """Initializer.""" + if max_bound is not None and max_bound < 1: + raise ValueError("Minimum 'max_bound' is 1") + + super().__init__(*args, **kwargs) + self._bound_semaphore = threading.BoundedSemaphore( + max_bound or self._max_workers) + + def submit(self, fn, /, *args, **kwargs) -> futures.Future: + """Schedules callable *fn* to be executed as ``fn(*args, **kwargs)``. + + .. code-block:: python + + with BoundedPoolExecutor('ThreadPoolExecutor', + max_bound=5, + max_workers=1) as executor: + future = executor.submit(pow, 323, 1235) + print(future.result()) + + """ + self._bound_semaphore.acquire() + + try: + f = super().submit(fn, *args, **kwargs) + except BaseException:  # e.g. RuntimeError after shutdown + self._bound_semaphore.release() + raise + + f.add_done_callback(lambda _f: self._bound_semaphore.release()) + return f + + if not SPHINX_RUNNING: + submit.__doc__ = futures.Executor.submit.__doc__ + + def _bound(self, sep: str = '') -> str: + """Helper method for str and repr.""" + if not hasattr(self, '_bound_semaphore'): + # class is not fully initialized + return '' + + bound = self._bound_semaphore._initial_value + return '' if bound == self._max_workers else f'{sep}{bound}' + + def __str__(self): + """String of current BoundedPoolExecutor type. + + Includes *max_bound* if necessary. + """ + return f'{type(self).__name__}({self._bound()})' + + def __repr__(self): + """Representation string of BoundedPoolExecutor.
+ + Includes the *executor* and *max_bound* if necessary. + """ + base, executor = type(self).__bases__ + return f'{base.__name__}({executor.__name__!r}{self._bound(", ")})' diff --git a/scripts/archivebot.py b/scripts/archivebot.py index ac8b0fc..ccabe1f 100755 --- a/scripts/archivebot.py +++ b/scripts/archivebot.py @@ -164,7 +164,6 @@ import threading import time from collections import OrderedDict, defaultdict -from concurrent.futures import ThreadPoolExecutor from contextlib import nullcontext from hashlib import md5 from math import ceil @@ -185,6 +184,7 @@ ) from pywikibot.time import MW_KEYS, parse_duration, str2timedelta from pywikibot.tools import PYTHON_VERSION +from pywikibot.tools.threading import BoundedPoolExecutor
class ArchiveBotSiteConfigError(Error): @@ -970,9 +970,9 @@
if asynchronous: signal.signal(signal.SIGINT, signal_handler) - context = ThreadPoolExecutor + context = BoundedPoolExecutor('ThreadPoolExecutor') else: - context = nullcontext + context = nullcontext()
for template_name in templates: tmpl = pywikibot.Page(site, template_name, ns=10) @@ -992,7 +992,7 @@
botargs = tmpl, salt, force, keep, sort futures = [] # needed for Python < 3.9 - with context() as executor: + with context as executor: for pg in gen: if asynchronous: future = executor.submit(process_page, pg, *botargs) diff --git a/scripts/fixing_redirects.py b/scripts/fixing_redirects.py index d327621..dba56d8 100755 --- a/scripts/fixing_redirects.py +++ b/scripts/fixing_redirects.py @@ -23,7 +23,7 @@ from __future__ import annotations
import re -from concurrent.futures import ThreadPoolExecutor, as_completed +from concurrent.futures import as_completed from contextlib import suppress
import pywikibot @@ -45,6 +45,7 @@ from pywikibot.textlib import isDisabled from pywikibot.tools import first_lower from pywikibot.tools import first_upper as firstcap +from pywikibot.tools.threading import BoundedPoolExecutor
# This is required for the text that is shown when you run this script @@ -187,7 +188,7 @@ pywikibot.error(e) return
- with ThreadPoolExecutor() as executor: + with BoundedPoolExecutor('ThreadPoolExecutor') as executor: futures = {executor.submit(self.get_target, p) for p in self.current_page.linkedPages()} for future in as_completed(futures): diff --git a/scripts/watchlist.py b/scripts/watchlist.py index d5a2e1c..c6d1a61 100755 --- a/scripts/watchlist.py +++ b/scripts/watchlist.py @@ -33,12 +33,13 @@
import datetime import os -from concurrent.futures import ThreadPoolExecutor, as_completed +from concurrent.futures import as_completed
import pywikibot from pywikibot import config from pywikibot.data.api import CachedRequest from pywikibot.exceptions import InvalidTitleError +from pywikibot.tools.threading import BoundedPoolExecutor
try: @@ -67,7 +68,7 @@ if not quiet: pywikibot.info('Counting pages in watchlists of all wikis...')
- with ThreadPoolExecutor() as executor: + with BoundedPoolExecutor('ThreadPoolExecutor') as executor: futures = {executor.submit(refresh, pywikibot.Site(lang, family)) for family in config.usernames for lang in config.usernames[family]} @@ -95,7 +96,7 @@ cache_path = CachedRequest._get_cache_dir() files = os.scandir(cache_path) seen = set() - with ThreadPoolExecutor() as executor: + with BoundedPoolExecutor('ThreadPoolExecutor') as executor: for filename in files: entry = CacheEntry(cache_path, filename) entry._load_cache() diff --git a/tests/tools_threading_tests.py b/tests/tools_threading_tests.py index 4808895..d14981e 100755 --- a/tests/tools_threading_tests.py +++ b/tests/tools_threading_tests.py @@ -1,16 +1,24 @@ #!/usr/bin/env python3 """Tests for threading tools.""" # -# (C) Pywikibot team, 2014-2022 +# (C) Pywikibot team, 2014-2024 # # Distributed under the terms of the MIT license. # from __future__ import annotations
+import time import unittest +from concurrent.futures import ( + Executor, + Future, + ProcessPoolExecutor, + ThreadPoolExecutor, +) from contextlib import suppress
-from pywikibot.tools.threading import ThreadedGenerator +from pywikibot.tools import PYTHON_VERSION +from pywikibot.tools.threading import BoundedPoolExecutor, ThreadedGenerator from tests.aspects import TestCase
@@ -41,6 +49,87 @@ self.assertEqual(list(thd_gen), list(iterable))
+class BoundedThreadPoolTests(TestCase): + + """BoundedPoolExecutor test cases.""" + + net = False + + def test_strings(self): + """Test string and repr methods for executor strings.""" + executors = ['ThreadPoolExecutor', 'ProcessPoolExecutor'] + if PYTHON_VERSION >= (3, 14): + executors.append('InterpreterPoolExecutor') + + for executor in executors: + with self.subTest(executor=executor): + pool = BoundedPoolExecutor(executor) + self.assertEqual(str(pool), f'Bounded{executor}()') + self.assertEqual(repr(pool), + f'BoundedPoolExecutor({executor!r})') + self.assertEqual(pool._bound_semaphore._initial_value, + pool._max_workers) + + def test_class(self): + """Test string and repr methods for an executor class.""" + executors = [ThreadPoolExecutor, ProcessPoolExecutor] + if PYTHON_VERSION >= (3, 14): + from concurrent.futures import InterpreterPoolExecutor + executors.append(InterpreterPoolExecutor) + + for executor in executors: + with self.subTest(executor=executor): + pool = BoundedPoolExecutor(executor) + self.assertEqual(str(pool), f'Bounded{executor.__name__}()') + self.assertEqual(repr(pool), + f'BoundedPoolExecutor({executor.__name__!r})') + self.assertEqual(pool._bound_semaphore._initial_value, + pool._max_workers) + + def test_run(self): + """Test examples for Executor during run.""" + for bound in (2, 5, 7): + futures = [] + with self.subTest(bound=bound), \ + BoundedPoolExecutor('ThreadPoolExecutor', + max_bound=bound, + max_workers=5) as pool: + for _ in range(10): + future = pool.submit(time.sleep, 1) + self.assertIsInstance(future, Future) + futures.append(future) + + self.assertLength(futures, 10) + for future in futures: + self.assertTrue(future.done()) + self.assertIsNone(future.result()) + + def test_exceptions(self): + """Test exceptions when creating a bounded executor.""" + with self.assertRaisesRegex(TypeError, + r'issubclass\(\) arg 1 must be a class'): + BoundedPoolExecutor(PYTHON_VERSION) + with self.assertRaisesRegex(TypeError, + 'expected a
 real subclass of ' + r"'concurrent.futures.Executor'"): + BoundedPoolExecutor(TestCase) + with self.assertRaisesRegex(TypeError, + 'expected a real subclass of ' + r"'concurrent.futures.Executor'"): + BoundedPoolExecutor(Future) + with self.assertRaisesRegex(TypeError, + 'expected a real subclass of ' + r"'concurrent.futures.Executor'"): + BoundedPoolExecutor(Executor) + with self.assertRaisesRegex( + TypeError, "duplicate base class '?BoundedPoolExecutor'?"): + BoundedPoolExecutor(BoundedPoolExecutor) + with self.assertRaisesRegex(ValueError, "Minimum 'max_bound' is 1"): + BoundedPoolExecutor('ThreadPoolExecutor', 0) + with self.assertRaisesRegex(ValueError, "Minimum 'max_bound' is 1"): + BoundedPoolExecutor('ThreadPoolExecutor', max_bound=0) + + if __name__ == '__main__': with suppress(SystemExit): unittest.main()