jenkins-bot submitted this change.

View Change

Approvals: Xqt: Looks good to me, approved jenkins-bot: Verified
IMPR: Add BoundedPoolExecutor

- add BoundedPoolExecutor to tools.threading which inherits from every
concurrent.futures.Executor subclass but uses BoundedSemaphore
to limit the items added to worker in submit() function.
- test BoundedPoolExecutor
- update all scripts using ThreadPoolExecutor and use the new
BoundedThreadPoolExecutor instead.

Bug: T333741
Change-Id: I47bb2b4743f44dcd04c5d4df57978749791bf99e
---
M pywikibot/scripts/login.py
M pywikibot/tools/threading.py
M scripts/archivebot.py
M scripts/fixing_redirects.py
M scripts/watchlist.py
M tests/tools_threading_tests.py
6 files changed, 232 insertions(+), 15 deletions(-)

diff --git a/pywikibot/scripts/login.py b/pywikibot/scripts/login.py
index 3cc1bf8..c1af7f6 100755
--- a/pywikibot/scripts/login.py
+++ b/pywikibot/scripts/login.py
@@ -51,13 +51,13 @@
from __future__ import annotations

import datetime
-from concurrent.futures import ThreadPoolExecutor
from contextlib import nullcontext, suppress

import pywikibot
from pywikibot import config
from pywikibot.exceptions import NoUsernameError, SiteDefinitionError
from pywikibot.login import OauthLoginManager
+from pywikibot.tools.threading import BoundedPoolExecutor


def _get_consumer_token(site) -> tuple[str, str]:
@@ -157,8 +157,9 @@
namedict = {site.family.name: {site.code: None}}

params = oauth, logout, autocreate
- context = ThreadPoolExecutor if asynchronous else nullcontext
- with context() as executor:
+ context = (nullcontext(),
+ BoundedPoolExecutor('ThreadPoolExecutor'))[asynchronous]
+ with context as executor:
for family_name in namedict:
for lang in namedict[family_name]:
if asynchronous:
diff --git a/pywikibot/tools/threading.py b/pywikibot/tools/threading.py
index 7539561..b93b6d9 100644
--- a/pywikibot/tools/threading.py
+++ b/pywikibot/tools/threading.py
@@ -1,20 +1,25 @@
"""Classes which can be used for threading."""
#
-# (C) Pywikibot team, 2008-2022
+# (C) Pywikibot team, 2008-2024
#
# Distributed under the terms of the MIT license.
#
from __future__ import annotations

+import concurrent.futures as futures
+import importlib
import queue
import re
import threading
import time
+from typing import Any

import pywikibot # T306760
+from pywikibot.tools import SPHINX_RUNNING


__all__ = (
+ 'BoundedPoolExecutor',
'RLock',
'ThreadedGenerator',
'ThreadList',
@@ -189,6 +194,8 @@
... pool.append(threading.Thread(target=work))
...

+ .. seealso:: :class:`BoundedPoolExecutor`
+
"""

def __init__(self, limit: int = 128, wait_time: float = 2, *args) -> None:
@@ -225,3 +232,121 @@
super().append(thd)
thd.start()
pywikibot.logging.debug(f"thread {len(self)} ('{type(thd)}') started")
+
+
+class BoundedPoolExecutor(futures.Executor):
+
+ """A bounded Executor which limits prefetched Futures.
+
+ BoundedThreadPoolExecutor behaves like other executors derived from
+ :pylib:`concurrent.futures.Executor
+ <concurrent.futures.html#concurrent.futures.Executor>` but will
+ block further items on :meth:`submit` calls to be added to workers
+ queue if the *max_bound* limit is reached.
+
+ .. versionadded:: 10.0
+
+ .. seealso::
+ - :pylib:`concurrent.futures.html#executor-objects`
+ - :class:`ThreadList`
+
+ :param executor: One of the executors found in ``concurrent.futures``.
+ The parameter may be given as class type or its name.
+ :param max_bound: the maximum number of items in the workers queue.
+ If not given or None, the number is set to *max_workers*.
+ :param args: Any positional argument for the given *executor*
+ :param kwargs: Any keyword argument for the given *executor*
+ :raises AttributeError: given *executor* is not found in
+ concurrent.futures.
+ :raises TypeError: given *executor* is not a class or not a real
+ subclass of concurrent.futures.Executor.
+ :raises ValueError: minimum *max_bound* is 1.
+ """
+
+ def __new__(
+ cls,
+ executor: futures.Executor | str,
+ /,
+ max_bound: int | None = None,
+ *args: Any,
+ **kwargs: Any
+ ) -> BoundedPoolExecutor:
+ """Create a new BoundedPoolExecutor subclass.
+
+ The class inherits from :class:`BoundedPoolExecutor` and the
+ given *executor*. The class name is composed of "Bounded" and
+ the name of the *executor*.
+ """
+ module = 'concurrent.futures'
+ if isinstance(executor, str):
+ base = getattr(
+ importlib.import_module(module), executor)
+ else:
+ base = executor
+
+ if base is futures.Executor or not issubclass(base, futures.Executor):
+ raise TypeError(
+ f'expected a real subclass of {module + ".Executor"!r} or the '
+ f'class name for executor parameter, not {base.__name__!r}'
+ )
+ new = type('Bounded' + base.__name__, (cls, base), {})
+ return super().__new__(new)
+
+ def __init__(self, executor, /, max_bound=None, *args, **kwargs) -> None:
+ """Initializer."""
+ if max_bound is not None and max_bound < 1:
+ raise ValueError("Minimum 'max_bound' is 1")
+
+ super().__init__(*args, **kwargs)
+ self._bound_semaphore = threading.BoundedSemaphore(
+ max_bound or self._max_workers)
+
+ def submit(self, fn, /, *args, **kwargs) -> futures.Future:
+ """Schedules callable *fn* to be executed as ``fn(*args, **kwargs)``.
+
+ .. code-block:: python
+
+ with BoundedPoolExecutor('ThreadPoolExecutor',
+ max_bound=5,
+ max_workers=1) as executor:
+ future = executor.submit(pow, 323, 1235)
+ print(future.result())
+
+ """
+ self._bound_semaphore.acquire()
+
+ try:
+ f = super().submit(fn, *args, **kwargs)
+ except futures.BrokenExecutor:
+ self._bound_semaphore.release()
+ raise
+
+ f.add_done_callback(lambda _f: self._bound_semaphore.release())
+ return f
+
+ if not SPHINX_RUNNING:
+ submit.__doc__ = futures.Executor.submit.__doc__
+
+ def _bound(self, sep: str = '') -> str:
+ """Helper method for str and repr."""
+ if not hasattr(self, '_bound_semaphore'):
+ # class is not fully initialized
+ return ''
+
+ bound = self._bound_semaphore._initial_value
+ return '' if bound == self._max_workers else f'{sep}{bound}'
+
+ def __str__(self):
+ """String of current BoundedPoolExecutor type.
+
+ Includes *max_bound* if necessary.
+ """
+ return f'{type(self).__name__}({self._bound()})'
+
+ def __repr__(self):
+ """Representation string of BoundedPoolExecutor.
+
+ Includes the *executor* and *max_bound* if necessary.
+ """
+ base, executor = type(self).__bases__
+ return f'{base.__name__}({executor.__name__!r}{self._bound(", ")})'
diff --git a/scripts/archivebot.py b/scripts/archivebot.py
index ac8b0fc..ccabe1f 100755
--- a/scripts/archivebot.py
+++ b/scripts/archivebot.py
@@ -164,7 +164,6 @@
import threading
import time
from collections import OrderedDict, defaultdict
-from concurrent.futures import ThreadPoolExecutor
from contextlib import nullcontext
from hashlib import md5
from math import ceil
@@ -185,6 +184,7 @@
)
from pywikibot.time import MW_KEYS, parse_duration, str2timedelta
from pywikibot.tools import PYTHON_VERSION
+from pywikibot.tools.threading import BoundedPoolExecutor


class ArchiveBotSiteConfigError(Error):
@@ -970,9 +970,9 @@

if asynchronous:
signal.signal(signal.SIGINT, signal_handler)
- context = ThreadPoolExecutor
+ context = BoundedPoolExecutor('ThreadPoolExecutor')
else:
- context = nullcontext
+ context = nullcontext()

for template_name in templates:
tmpl = pywikibot.Page(site, template_name, ns=10)
@@ -992,7 +992,7 @@

botargs = tmpl, salt, force, keep, sort
futures = [] # needed for Python < 3.9
- with context() as executor:
+ with context as executor:
for pg in gen:
if asynchronous:
future = executor.submit(process_page, pg, *botargs)
diff --git a/scripts/fixing_redirects.py b/scripts/fixing_redirects.py
index d327621..dba56d8 100755
--- a/scripts/fixing_redirects.py
+++ b/scripts/fixing_redirects.py
@@ -23,7 +23,7 @@
from __future__ import annotations

import re
-from concurrent.futures import ThreadPoolExecutor, as_completed
+from concurrent.futures import as_completed
from contextlib import suppress

import pywikibot
@@ -45,6 +45,7 @@
from pywikibot.textlib import isDisabled
from pywikibot.tools import first_lower
from pywikibot.tools import first_upper as firstcap
+from pywikibot.tools.threading import BoundedPoolExecutor


# This is required for the text that is shown when you run this script
@@ -187,7 +188,7 @@
pywikibot.error(e)
return

- with ThreadPoolExecutor() as executor:
+ with BoundedPoolExecutor('ThreadPoolExecutor') as executor:
futures = {executor.submit(self.get_target, p)
for p in self.current_page.linkedPages()}
for future in as_completed(futures):
diff --git a/scripts/watchlist.py b/scripts/watchlist.py
index d5a2e1c..c6d1a61 100755
--- a/scripts/watchlist.py
+++ b/scripts/watchlist.py
@@ -33,12 +33,13 @@

import datetime
import os
-from concurrent.futures import ThreadPoolExecutor, as_completed
+from concurrent.futures import as_completed

import pywikibot
from pywikibot import config
from pywikibot.data.api import CachedRequest
from pywikibot.exceptions import InvalidTitleError
+from pywikibot.tools.threading import BoundedPoolExecutor


try:
@@ -67,7 +68,7 @@
if not quiet:
pywikibot.info('Counting pages in watchlists of all wikis...')

- with ThreadPoolExecutor() as executor:
+ with BoundedPoolExecutor('ThreadPoolExecutor') as executor:
futures = {executor.submit(refresh, pywikibot.Site(lang, family))
for family in config.usernames
for lang in config.usernames[family]}
@@ -95,7 +96,7 @@
cache_path = CachedRequest._get_cache_dir()
files = os.scandir(cache_path)
seen = set()
- with ThreadPoolExecutor() as executor:
+ with BoundedPoolExecutor('ThreadPoolExecutor') as executor:
for filename in files:
entry = CacheEntry(cache_path, filename)
entry._load_cache()
diff --git a/tests/tools_threading_tests.py b/tests/tools_threading_tests.py
index 4808895..d14981e 100755
--- a/tests/tools_threading_tests.py
+++ b/tests/tools_threading_tests.py
@@ -1,16 +1,24 @@
#!/usr/bin/env python3
"""Tests for threading tools."""
#
-# (C) Pywikibot team, 2014-2022
+# (C) Pywikibot team, 2014-2024
#
# Distributed under the terms of the MIT license.
#
from __future__ import annotations

+import time
import unittest
+from concurrent.futures import (
+ Executor,
+ Future,
+ ProcessPoolExecutor,
+ ThreadPoolExecutor,
+)
from contextlib import suppress

-from pywikibot.tools.threading import ThreadedGenerator
+from pywikibot.tools import PYTHON_VERSION
+from pywikibot.tools.threading import BoundedPoolExecutor, ThreadedGenerator
from tests.aspects import TestCase


@@ -41,6 +49,87 @@
self.assertEqual(list(thd_gen), list(iterable))


+class BoundedThreadPoolTests(TestCase):
+
+ """BoundedThreadPool test cases."""
+
+ net = False
+
+ def test_strings(self):
+ """Test string and repr methods for executor strings."""
+ executors = ['ThreadPoolExecutor', 'ProcessPoolExecutor']
+ if PYTHON_VERSION >= (3, 14):
+ executors.append('InterpreterPoolExecutor')
+
+ for executor in executors:
+ with self.subTest(executor=executor):
+ pool = BoundedPoolExecutor(executor)
+ self.assertEqual(str(pool), f'Bounded{executor}()')
+ self.assertEqual(repr(pool),
+ f'BoundedPoolExecutor({executor!r})')
+ self.assertEqual(pool._bound_semaphore._initial_value,
+ pool._max_workers)
+
+ def test_class(self):
+ """Test string and repr methods for a executor class."""
+ executors = [ThreadPoolExecutor, ProcessPoolExecutor]
+ if PYTHON_VERSION >= (3, 14):
+ from concurrent.futures import InterpreterPoolExecutor
+ executors.append(InterpreterPoolExecutor)
+
+ for executor in executors:
+ with self.subTest(executor=executor):
+ pool = BoundedPoolExecutor(executor)
+ self.assertEqual(str(pool), f'Bounded{executor.__name__}()')
+ self.assertEqual(repr(pool),
+ f'BoundedPoolExecutor({executor.__name__!r})')
+ self.assertEqual(pool._bound_semaphore._initial_value,
+ pool._max_workers)
+
+ def test_run(self):
+ """Test examples for Executor during run."""
+ for bound in (2, 5, 7):
+ futures = []
+ with self.subTest(bound=bound), \
+ BoundedPoolExecutor('ThreadPoolExecutor',
+ max_bound=bound,
+ max_workers=5) as pool:
+ for _ in range(10):
+ future = pool.submit(time.sleep, 1)
+ self.assertIsInstance(future, Future)
+ futures.append(future)
+
+ self.assertLength(futures, 10)
+ for future in futures:
+ self.assertTrue(future.done())
+ self.assertIsNone(future.result())
+
+ def test_exceptions(self):
+ """Test exceptions when creating a bounded executor."""
+ with self.assertRaisesRegex(TypeError,
+ r'issubclass\(\) arg 1 must be a class'):
+ BoundedPoolExecutor(PYTHON_VERSION)
+ with self.assertRaisesRegex(TypeError,
+ 'expected a real subclass of '
+ r"'concurrent\.futures\.Executor'"):
+ BoundedPoolExecutor(TestCase)
+ with self.assertRaisesRegex(TypeError,
+ 'expected a real subclass of '
+ r"'concurrent\.futures\.Executor'"):
+ BoundedPoolExecutor(Future)
+ with self.assertRaisesRegex(TypeError,
+ 'expected a real subclass of '
+ r"'concurrent\.futures\.Executor'"):
+ BoundedPoolExecutor(Executor)
+ with self.assertRaisesRegex(
+ TypeError, "duplicate base class '?BoundedPoolExecutor'?"):
+ BoundedPoolExecutor(BoundedPoolExecutor)
+ with self.assertRaisesRegex(ValueError, "Minimum 'max_bound' is 1"):
+ BoundedPoolExecutor('ThreadPoolExecutor', 0)
+ with self.assertRaisesRegex(ValueError, "Minimum 'max_bound' is 1"):
+ BoundedPoolExecutor('ThreadPoolExecutor', max_bound=0)
+
+
if __name__ == '__main__':
with suppress(SystemExit):
unittest.main()

To view, visit change 1105000. To unsubscribe, or for help writing mail filters, visit settings.

Gerrit-MessageType: merged
Gerrit-Project: pywikibot/core
Gerrit-Branch: master
Gerrit-Change-Id: I47bb2b4743f44dcd04c5d4df57978749791bf99e
Gerrit-Change-Number: 1105000
Gerrit-PatchSet: 6
Gerrit-Owner: Xqt <info@gno.de>
Gerrit-Reviewer: Xqt <info@gno.de>
Gerrit-Reviewer: jenkins-bot