jenkins-bot has submitted this change. ( https://gerrit.wikimedia.org/r/c/pywikibot/core/+/1106560?usp=email )
Change subject: cleanup: remove unintentional *args in ThreadList
......................................................................
cleanup: remove unintentional *args in ThreadList
threading.ThreadList has *args parameter by which Threads could be
added to the ThreadList. But they were never started and also never
completed. Thus they only clogged up the list. Therefore remove it.
Also use dataclass for parameters and update documentation.
Bug: T382787
Change-Id: I3fbb73a5288dedddc026ae77e3463d04938dda0c
---
M pywikibot/tools/threading.py
1 file changed, 30 insertions(+), 26 deletions(-)
Approvals:
Xqt: Looks good to me, approved
jenkins-bot: Verified
diff --git a/pywikibot/tools/threading.py b/pywikibot/tools/threading.py
index b93b6d9..8ef6c11 100644
--- a/pywikibot/tools/threading.py
+++ b/pywikibot/tools/threading.py
@@ -7,6 +7,7 @@
from __future__ import annotations
import concurrent.futures as futures
+import dataclasses
import importlib
import queue
import re
@@ -177,41 +178,41 @@
self.stop()
+@dataclasses.dataclass(repr=False, eq=False)
class ThreadList(list):
"""A simple threadpool class to limit the number of simultaneous threads.
- Any threading.Thread object can be added to the pool using the append()
- method. If the maximum number of simultaneous threads has not been reached,
- the Thread object will be started immediately; if not, the append() call
- will block until the thread is able to start.
+ Any threading.Thread object can be added to the pool using the
+ :meth:`append` method. If the maximum number of simultaneous threads
+ has not been reached, the Thread object will be started immediately;
+ if not, the append() call will block until the thread is able to
+ start.
- >>> pool = ThreadList(limit=10)
- >>> def work():
- ... time.sleep(1)
- ...
- >>> for x in range(20):
- ... pool.append(threading.Thread(target=work))
- ...
+ Example:
+
+ .. code-block:: python
+
+ pool = ThreadList(limit=10)
+ def work():
+ time.sleep(1)
+
+ for x in range(20):
+ pool.append(threading.Thread(target=work))
+
+ .. versionchanged:: 10.0
+ the unintentional and undocumented *args* parameter was removed.
.. seealso:: :class:`BoundedPoolExecutor`
+ :param limit: the number of simultaneous threads
+ :param wait_time: how long to wait if active threads exceeds limit
"""
- def __init__(self, limit: int = 128, wait_time: float = 2, *args) -> None:
- """Initializer.
+ limit: int = 128 #: :meta private:
+ wait_time: float = 2.0 #: :meta private:
- :param limit: the number of simultaneous threads
- :param wait_time: how long to wait if active threads exceeds limit
- """
- self.limit = limit
- self.wait_time = wait_time
- super().__init__(*args)
- for item in self:
- if not isinstance(item, threading.Thread):
- raise TypeError(f"Cannot add '{type(item)}' to ThreadList")
-
- def active_count(self):
+ def active_count(self) -> int:
"""Return the number of alive threads and delete all non-alive ones."""
cnt = 0
for item in self[:]:
@@ -221,8 +222,11 @@
self.remove(item)
return cnt
- def append(self, thd):
- """Add a thread to the pool and start it."""
+ def append(self, thd: threading.Thread) -> None:
+ """Add a thread to the pool and start it.
+
+ :param thd: the Thread to be appended to the ThreadList.
+ """
if not isinstance(thd, threading.Thread):
raise TypeError(f"Cannot append '{type(thd)}' to ThreadList")
--
To view, visit https://gerrit.wikimedia.org/r/c/pywikibot/core/+/1106560?usp=email
To unsubscribe, or for help writing mail filters, visit https://gerrit.wikimedia.org/r/settings?usp=email
Gerrit-MessageType: merged
Gerrit-Project: pywikibot/core
Gerrit-Branch: master
Gerrit-Change-Id: I3fbb73a5288dedddc026ae77e3463d04938dda0c
Gerrit-Change-Number: 1106560
Gerrit-PatchSet: 11
Gerrit-Owner: Xqt <info(a)gno.de>
Gerrit-Reviewer: Xqt <info(a)gno.de>
Gerrit-Reviewer: jenkins-bot
jenkins-bot has submitted this change. ( https://gerrit.wikimedia.org/r/c/pywikibot/core/+/1105000?usp=email )
Change subject: IMPR: Add BoundedPoolExecutor
......................................................................
IMPR: Add BoundedPoolExecutor
- add BoundedPoolExecutor to tools.threading which inherits from every
concurrent.futures.Executor subclass but uses BoundedSemaphore
to limit the items added to worker in submit() function.
- test BoundedPoolExecutor
- update all scripts using ThreadPoolExecutor and use the new
BoundedThreadPoolExecutor instead.
Bug: T333741
Change-Id: I47bb2b4743f44dcd04c5d4df57978749791bf99e
---
M pywikibot/scripts/login.py
M pywikibot/tools/threading.py
M scripts/archivebot.py
M scripts/fixing_redirects.py
M scripts/watchlist.py
M tests/tools_threading_tests.py
6 files changed, 232 insertions(+), 15 deletions(-)
Approvals:
Xqt: Looks good to me, approved
jenkins-bot: Verified
diff --git a/pywikibot/scripts/login.py b/pywikibot/scripts/login.py
index 3cc1bf8..c1af7f6 100755
--- a/pywikibot/scripts/login.py
+++ b/pywikibot/scripts/login.py
@@ -51,13 +51,13 @@
from __future__ import annotations
import datetime
-from concurrent.futures import ThreadPoolExecutor
from contextlib import nullcontext, suppress
import pywikibot
from pywikibot import config
from pywikibot.exceptions import NoUsernameError, SiteDefinitionError
from pywikibot.login import OauthLoginManager
+from pywikibot.tools.threading import BoundedPoolExecutor
def _get_consumer_token(site) -> tuple[str, str]:
@@ -157,8 +157,9 @@
namedict = {site.family.name: {site.code: None}}
params = oauth, logout, autocreate
- context = ThreadPoolExecutor if asynchronous else nullcontext
- with context() as executor:
+ context = (nullcontext(),
+ BoundedPoolExecutor('ThreadPoolExecutor'))[asynchronous]
+ with context as executor:
for family_name in namedict:
for lang in namedict[family_name]:
if asynchronous:
diff --git a/pywikibot/tools/threading.py b/pywikibot/tools/threading.py
index 7539561..b93b6d9 100644
--- a/pywikibot/tools/threading.py
+++ b/pywikibot/tools/threading.py
@@ -1,20 +1,25 @@
"""Classes which can be used for threading."""
#
-# (C) Pywikibot team, 2008-2022
+# (C) Pywikibot team, 2008-2024
#
# Distributed under the terms of the MIT license.
#
from __future__ import annotations
+import concurrent.futures as futures
+import importlib
import queue
import re
import threading
import time
+from typing import Any
import pywikibot # T306760
+from pywikibot.tools import SPHINX_RUNNING
__all__ = (
+ 'BoundedPoolExecutor',
'RLock',
'ThreadedGenerator',
'ThreadList',
@@ -189,6 +194,8 @@
... pool.append(threading.Thread(target=work))
...
+ .. seealso:: :class:`BoundedPoolExecutor`
+
"""
def __init__(self, limit: int = 128, wait_time: float = 2, *args) -> None:
@@ -225,3 +232,121 @@
super().append(thd)
thd.start()
pywikibot.logging.debug(f"thread {len(self)} ('{type(thd)}') started")
+
+
+class BoundedPoolExecutor(futures.Executor):
+
+ """A bounded Executor which limits prefetched Futures.
+
+ BoundedThreadPoolExecutor behaves like other executors derived from
+ :pylib:`concurrent.futures.Executor
+ <concurrent.futures.html#concurrent.futures.Executor>` but will
+ block further items on :meth:`submit` calls to be added to workers
+ queue if the *max_bound* limit is reached.
+
+ .. versionadded:: 10.0
+
+ .. seealso::
+ - :pylib:`concurrent.futures.html#executor-objects`
+ - :class:`ThreadList`
+
+ :param executor: One of the executors found in ``concurrent.futures``.
+ The parameter may be given as class type or its name.
+ :param max_bound: the maximum number of items in the workers queue.
+ If not given or None, the number is set to *max_workers*.
+ :param args: Any positional argument for the given *executor*
+ :param kwargs: Any keyword argument for the given *executor*
+ :raises AttributeError: given *executor* is not found in
+ concurrent.futures.
+ :raises TypeError: given *executor* is not a class or not a real
+ subclass of concurrent.futures.Executor.
+ :raises ValueError: minimum *max_bound* is 1.
+ """
+
+ def __new__(
+ cls,
+ executor: futures.Executor | str,
+ /,
+ max_bound: int | None = None,
+ *args: Any,
+ **kwargs: Any
+ ) -> BoundedPoolExecutor:
+ """Create a new BoundedPoolExecutor subclass.
+
+ The class inherits from :class:`BoundedPoolExecutor` and the
+ given *executor*. The class name is composed of "Bounded" and
+ the name of the *executor*.
+ """
+ module = 'concurrent.futures'
+ if isinstance(executor, str):
+ base = getattr(
+ importlib.import_module(module), executor)
+ else:
+ base = executor
+
+ if base is futures.Executor or not issubclass(base, futures.Executor):
+ raise TypeError(
+ f'expected a real subclass of {module + ".Executor"!r} or the '
+ f'class name for executor parameter, not {base.__name__!r}'
+ )
+ new = type('Bounded' + base.__name__, (cls, base), {})
+ return super().__new__(new)
+
+ def __init__(self, executor, /, max_bound=None, *args, **kwargs) -> None:
+ """Initializer."""
+ if max_bound is not None and max_bound < 1:
+ raise ValueError("Minimum 'max_bound' is 1")
+
+ super().__init__(*args, **kwargs)
+ self._bound_semaphore = threading.BoundedSemaphore(
+ max_bound or self._max_workers)
+
+ def submit(self, fn, /, *args, **kwargs) -> futures.Future:
+ """Schedules callable *fn* to be executed as ``fn(*args, **kwargs)``.
+
+ .. code-block:: python
+
+ with BoundedPoolExecutor('ThreadPoolExecutor',
+ max_bound=5,
+ max_workers=1) as executor:
+ future = executor.submit(pow, 323, 1235)
+ print(future.result())
+
+ """
+ self._bound_semaphore.acquire()
+
+ try:
+ f = super().submit(fn, *args, **kwargs)
+ except futures.BrokenExecutor:
+ self._bound_semaphore.release()
+ raise
+
+ f.add_done_callback(lambda _f: self._bound_semaphore.release())
+ return f
+
+ if not SPHINX_RUNNING:
+ submit.__doc__ = futures.Executor.submit.__doc__
+
+ def _bound(self, sep: str = '') -> str:
+ """Helper method for str and repr."""
+ if not hasattr(self, '_bound_semaphore'):
+ # class is not fully initialized
+ return ''
+
+ bound = self._bound_semaphore._initial_value
+ return '' if bound == self._max_workers else f'{sep}{bound}'
+
+ def __str__(self):
+ """String of current BoundedPoolExecutor type.
+
+ Includes *max_bound* if necessary.
+ """
+ return f'{type(self).__name__}({self._bound()})'
+
+ def __repr__(self):
+ """Representation string of BoundedPoolExecutor.
+
+ Includes the *executor* and *max_bound* if necessary.
+ """
+ base, executor = type(self).__bases__
+ return f'{base.__name__}({executor.__name__!r}{self._bound(", ")})'
diff --git a/scripts/archivebot.py b/scripts/archivebot.py
index ac8b0fc..ccabe1f 100755
--- a/scripts/archivebot.py
+++ b/scripts/archivebot.py
@@ -164,7 +164,6 @@
import threading
import time
from collections import OrderedDict, defaultdict
-from concurrent.futures import ThreadPoolExecutor
from contextlib import nullcontext
from hashlib import md5
from math import ceil
@@ -185,6 +184,7 @@
)
from pywikibot.time import MW_KEYS, parse_duration, str2timedelta
from pywikibot.tools import PYTHON_VERSION
+from pywikibot.tools.threading import BoundedPoolExecutor
class ArchiveBotSiteConfigError(Error):
@@ -970,9 +970,9 @@
if asynchronous:
signal.signal(signal.SIGINT, signal_handler)
- context = ThreadPoolExecutor
+ context = BoundedPoolExecutor('ThreadPoolExecutor')
else:
- context = nullcontext
+ context = nullcontext()
for template_name in templates:
tmpl = pywikibot.Page(site, template_name, ns=10)
@@ -992,7 +992,7 @@
botargs = tmpl, salt, force, keep, sort
futures = [] # needed for Python < 3.9
- with context() as executor:
+ with context as executor:
for pg in gen:
if asynchronous:
future = executor.submit(process_page, pg, *botargs)
diff --git a/scripts/fixing_redirects.py b/scripts/fixing_redirects.py
index d327621..dba56d8 100755
--- a/scripts/fixing_redirects.py
+++ b/scripts/fixing_redirects.py
@@ -23,7 +23,7 @@
from __future__ import annotations
import re
-from concurrent.futures import ThreadPoolExecutor, as_completed
+from concurrent.futures import as_completed
from contextlib import suppress
import pywikibot
@@ -45,6 +45,7 @@
from pywikibot.textlib import isDisabled
from pywikibot.tools import first_lower
from pywikibot.tools import first_upper as firstcap
+from pywikibot.tools.threading import BoundedPoolExecutor
# This is required for the text that is shown when you run this script
@@ -187,7 +188,7 @@
pywikibot.error(e)
return
- with ThreadPoolExecutor() as executor:
+ with BoundedPoolExecutor('ThreadPoolExecutor') as executor:
futures = {executor.submit(self.get_target, p)
for p in self.current_page.linkedPages()}
for future in as_completed(futures):
diff --git a/scripts/watchlist.py b/scripts/watchlist.py
index d5a2e1c..c6d1a61 100755
--- a/scripts/watchlist.py
+++ b/scripts/watchlist.py
@@ -33,12 +33,13 @@
import datetime
import os
-from concurrent.futures import ThreadPoolExecutor, as_completed
+from concurrent.futures import as_completed
import pywikibot
from pywikibot import config
from pywikibot.data.api import CachedRequest
from pywikibot.exceptions import InvalidTitleError
+from pywikibot.tools.threading import BoundedPoolExecutor
try:
@@ -67,7 +68,7 @@
if not quiet:
pywikibot.info('Counting pages in watchlists of all wikis...')
- with ThreadPoolExecutor() as executor:
+ with BoundedPoolExecutor('ThreadPoolExecutor') as executor:
futures = {executor.submit(refresh, pywikibot.Site(lang, family))
for family in config.usernames
for lang in config.usernames[family]}
@@ -95,7 +96,7 @@
cache_path = CachedRequest._get_cache_dir()
files = os.scandir(cache_path)
seen = set()
- with ThreadPoolExecutor() as executor:
+ with BoundedPoolExecutor('ThreadPoolExecutor') as executor:
for filename in files:
entry = CacheEntry(cache_path, filename)
entry._load_cache()
diff --git a/tests/tools_threading_tests.py b/tests/tools_threading_tests.py
index 4808895..d14981e 100755
--- a/tests/tools_threading_tests.py
+++ b/tests/tools_threading_tests.py
@@ -1,16 +1,24 @@
#!/usr/bin/env python3
"""Tests for threading tools."""
#
-# (C) Pywikibot team, 2014-2022
+# (C) Pywikibot team, 2014-2024
#
# Distributed under the terms of the MIT license.
#
from __future__ import annotations
+import time
import unittest
+from concurrent.futures import (
+ Executor,
+ Future,
+ ProcessPoolExecutor,
+ ThreadPoolExecutor,
+)
from contextlib import suppress
-from pywikibot.tools.threading import ThreadedGenerator
+from pywikibot.tools import PYTHON_VERSION
+from pywikibot.tools.threading import BoundedPoolExecutor, ThreadedGenerator
from tests.aspects import TestCase
@@ -41,6 +49,87 @@
self.assertEqual(list(thd_gen), list(iterable))
+class BoundedThreadPoolTests(TestCase):
+
+ """BoundedThreadPool test cases."""
+
+ net = False
+
+ def test_strings(self):
+ """Test string and repr methods for executor strings."""
+ executors = ['ThreadPoolExecutor', 'ProcessPoolExecutor']
+ if PYTHON_VERSION >= (3, 14):
+ executors.append('InterpreterPoolExecutor')
+
+ for executor in executors:
+ with self.subTest(executor=executor):
+ pool = BoundedPoolExecutor(executor)
+ self.assertEqual(str(pool), f'Bounded{executor}()')
+ self.assertEqual(repr(pool),
+ f'BoundedPoolExecutor({executor!r})')
+ self.assertEqual(pool._bound_semaphore._initial_value,
+ pool._max_workers)
+
+ def test_class(self):
+ """Test string and repr methods for a executor class."""
+ executors = [ThreadPoolExecutor, ProcessPoolExecutor]
+ if PYTHON_VERSION >= (3, 14):
+ from concurrent.futures import InterpreterPoolExecutor
+ executors.append(InterpreterPoolExecutor)
+
+ for executor in executors:
+ with self.subTest(executor=executor):
+ pool = BoundedPoolExecutor(executor)
+ self.assertEqual(str(pool), f'Bounded{executor.__name__}()')
+ self.assertEqual(repr(pool),
+ f'BoundedPoolExecutor({executor.__name__!r})')
+ self.assertEqual(pool._bound_semaphore._initial_value,
+ pool._max_workers)
+
+ def test_run(self):
+ """Test examples for Executor during run."""
+ for bound in (2, 5, 7):
+ futures = []
+ with self.subTest(bound=bound), \
+ BoundedPoolExecutor('ThreadPoolExecutor',
+ max_bound=bound,
+ max_workers=5) as pool:
+ for _ in range(10):
+ future = pool.submit(time.sleep, 1)
+ self.assertIsInstance(future, Future)
+ futures.append(future)
+
+ self.assertLength(futures, 10)
+ for future in futures:
+ self.assertTrue(future.done())
+ self.assertIsNone(future.result())
+
+ def test_exceptions(self):
+ """Test exceptions when creating a bounded executor."""
+ with self.assertRaisesRegex(TypeError,
+ r'issubclass\(\) arg 1 must be a class'):
+ BoundedPoolExecutor(PYTHON_VERSION)
+ with self.assertRaisesRegex(TypeError,
+ 'expected a real subclass of '
+ r"'concurrent\.futures\.Executor'"):
+ BoundedPoolExecutor(TestCase)
+ with self.assertRaisesRegex(TypeError,
+ 'expected a real subclass of '
+ r"'concurrent\.futures\.Executor'"):
+ BoundedPoolExecutor(Future)
+ with self.assertRaisesRegex(TypeError,
+ 'expected a real subclass of '
+ r"'concurrent\.futures\.Executor'"):
+ BoundedPoolExecutor(Executor)
+ with self.assertRaisesRegex(
+ TypeError, "duplicate base class '?BoundedPoolExecutor'?"):
+ BoundedPoolExecutor(BoundedPoolExecutor)
+ with self.assertRaisesRegex(ValueError, "Minimum 'max_bound' is 1"):
+ BoundedPoolExecutor('ThreadPoolExecutor', 0)
+ with self.assertRaisesRegex(ValueError, "Minimum 'max_bound' is 1"):
+ BoundedPoolExecutor('ThreadPoolExecutor', max_bound=0)
+
+
if __name__ == '__main__':
with suppress(SystemExit):
unittest.main()
--
To view, visit https://gerrit.wikimedia.org/r/c/pywikibot/core/+/1105000?usp=email
To unsubscribe, or for help writing mail filters, visit https://gerrit.wikimedia.org/r/settings?usp=email
Gerrit-MessageType: merged
Gerrit-Project: pywikibot/core
Gerrit-Branch: master
Gerrit-Change-Id: I47bb2b4743f44dcd04c5d4df57978749791bf99e
Gerrit-Change-Number: 1105000
Gerrit-PatchSet: 6
Gerrit-Owner: Xqt <info(a)gno.de>
Gerrit-Reviewer: Xqt <info(a)gno.de>
Gerrit-Reviewer: jenkins-bot