jenkins-bot has submitted this change. ( https://gerrit.wikimedia.org/r/c/pywikibot/core/+/840669 )
Change subject: [cleanup] Refactor TokenWallet ......................................................................
[cleanup] Refactor TokenWallet
A new token system was introduced in MW 1.24. All tokens can be retrieved at once.
_tokenwallet.py: - derive TokenWallet from collections.abc.Container - give up the user key in TokenWallet._tokens because usually a Site object is for only one user after user/sysop dualism was given up. Keep a _currentuser attribute for sanity check. - raise KeyError with TokenWallet if a key is not in the collection instead of pywikibot.Error as suggested by Python documentation. - keep the token replacement for outdated tokens but print a FutureWarning in such cases. - give up failed_cache attribute because __getitem__ lazy loads all tokens at once if one token was wanted. APISite.get_tokens() is used to get all tokens. - add a new method 'clear()' to clear the internal cache - deprecate 'load_tokens' method; just call clear() with it to enable a lazy loaded refresh of the tokens cache. - __repr__ method now gives a new result string which looks like a valid Python expression. - update_tokens was added to renew all tokens of a given list. - these changes should also solve T270380
_apisite.py: - deprecate validate_tokens() which is no longer needed. - remove warnhandler in get_tokens which didn't work since MW 1.24 change - deprecate 'all' parameter of get_tokens; all tokens are retrieved if list of 'types' is empty - add a tokens property to enable deleting the tokens cache; this is a variant of calling 'clear()' method
requests.py: - use TokenWallet.update_tokens() to renew tokens if needed in _bad_token() - remove logging and simplify the code; we already have API warnings.
api/_login.py: - deprecate get_login_token function and use self.tokens instead - remove pre 1.27 code
others: - update replaced tokens - update tests - update documentation
Bug: T306637 Bug: T270380 Change-Id: I12102055da723545f0f41408363cb45732b47967 --- M pywikibot/data/api/__init__.py M pywikibot/data/api/_login.py M pywikibot/data/api/_requests.py M pywikibot/page/_user.py M pywikibot/site/_apisite.py M pywikibot/site/_datasite.py M pywikibot/site/_extensions.py M pywikibot/site/_tokenwallet.py M pywikibot/site/_upload.py M scripts/change_pagelang.py M tests/aspects.py M tests/token_tests.py 12 files changed, 310 insertions(+), 280 deletions(-)
Approvals: Xqt: Looks good to me, approved jenkins-bot: Verified
diff --git a/pywikibot/data/api/__init__.py b/pywikibot/data/api/__init__.py index dbd6b4c..6445c8c 100644 --- a/pywikibot/data/api/__init__.py +++ b/pywikibot/data/api/__init__.py @@ -47,7 +47,7 @@ """ Clear cookies for site's second level domain.
- get_login_token() will generate new cookies needed. + The http module takes care of all the cookie stuff. This is a workaround for requests bug, see :phab:`T224712` and https://github.com/psf/requests/issues/5411 for more details. diff --git a/pywikibot/data/api/_login.py b/pywikibot/data/api/_login.py index 11de1ae..f91745e 100644 --- a/pywikibot/data/api/_login.py +++ b/pywikibot/data/api/_login.py @@ -12,6 +12,7 @@ from pywikibot import login from pywikibot.backports import Dict from pywikibot.login import LoginStatus +from pywikibot.tools import deprecated
__all__ = ['LoginManager']
@@ -39,14 +40,11 @@ def _login_parameters(self, *, botpassword: bool = False ) -> Dict[str, str]: """Return login parameters.""" - # Since MW 1.27 only for bot passwords. - self.action = 'login' - if not botpassword: - # get token using meta=tokens if supported - token = self.get_login_token() - if token: - # Standard login request since MW 1.27 - self.action = 'clientlogin' + if botpassword: + self.action = 'login' + else: + token = self.site.tokens['login'] + self.action = 'clientlogin'
# prepare default login parameters parameters = {'action': self.action, @@ -70,7 +68,6 @@ Note, this doesn't do anything with cookies. The http module takes care of all the cookie stuff. Throws exception on failure. """ - self.below_mw_1_27 = False if hasattr(self, '_waituntil') \ and datetime.datetime.now() < self._waituntil: diff = self._waituntil - datetime.datetime.now() @@ -114,21 +111,16 @@ return
if status in ('NeedToken', 'WrongToken', 'badtoken'): - token = response.get('token') - if token and self.below_mw_1_27: # pragma: no cover - # fetched token using action=login - login_request['lgtoken'] = token - pywikibot.log('Received login token, proceed with login.') - else: - # if incorrect login token was used, - # force relogin and generate fresh one - pywikibot.error('Received incorrect login token. ' - 'Forcing re-login.') - # invalidate superior wiki cookies (T224712) - pywikibot.data.api._invalidate_superior_cookies( - self.site.family) - login_request[ - self.keyword('token')] = self.get_login_token() + # if incorrect login token was used, + # force relogin and generate fresh one + pywikibot.error('Received incorrect login token. ' + 'Forcing re-login.') + # invalidate superior wiki cookies (T224712) + pywikibot.data.api._invalidate_superior_cookies( + self.site.family) + self.site.tokens.clear() + login_request[ + self.keyword('token')] = self.site.tokens['login'] continue
# messagecode was introduced with 1.29.0-wmf.14 @@ -155,19 +147,12 @@
raise pywikibot.exceptions.APIError(code=status, info=fail_reason)
+ @deprecated("site.tokens['login']", since='8.0.0') def get_login_token(self) -> Optional[str]: """Fetch login token for MediaWiki 1.27+.
+ .. deprecated:: 8.0 + :return: login token """ - login_token_request = self.site._request( - use_get=False, - parameters={'action': 'query', 'meta': 'tokens', 'type': 'login'}, - ) - login_token_result = login_token_request.submit() - # check if we have to use old implementation of mw < 1.27 - if 'query' in login_token_result: - return login_token_result['query']['tokens'].get('logintoken') - - self.below_mw_1_27 = True # pragma: no cover - return None + return self.site.tokens['login'] diff --git a/pywikibot/data/api/_requests.py b/pywikibot/data/api/_requests.py index f8dce8c..8d656c8 100644 --- a/pywikibot/data/api/_requests.py +++ b/pywikibot/data/api/_requests.py @@ -917,7 +917,12 @@ self.wait(delay)
def _bad_token(self, code) -> bool: - """Check for bad token.""" + """Check for bad token. + + Check for bad tokens, call :meth:`TokenWallet.update_tokens() + <pywikibot.site._tokenwallet.TokenWallet.update_tokens>` method + to update the bunch of tokens and continue loop in :meth:`submit`. + """ if code != 'badtoken': # Other code not handled here return False
@@ -926,40 +931,12 @@ .format(self.site._loginstatus.name)) return False
- user_tokens = self.site.tokens._tokens[self.site.user()] - # all token values mapped to their type - tokens = {token: t_type for t_type, token in user_tokens.items()} - # determine which tokens are bad - invalid_param = {name: tokens[param[0]] - for name, param in self._params.items() - if len(param) == 1 and param[0] in tokens} - # doesn't care about the cache so can directly load them - if invalid_param: - pywikibot.log( - 'Bad token error for {}. Tokens for "{}" used in request; ' - 'invalidated them.' - .format(self.site.user(), - '", "'.join(sorted(set(invalid_param.values()))))) - # invalidate superior wiki cookies (T224712) - pywikibot.data.api._invalidate_superior_cookies(self.site.family) - # request new token(s) instead of invalid - self.site.tokens.load_tokens(set(invalid_param.values())) - # fix parameters; lets hope that it doesn't mistake actual - # parameters as tokens - for name, t_type in invalid_param.items(): - self[name] = self.site.tokens[t_type] - return True - - # otherwise couldn't find any … weird there is nothing what - # can be done here because it doesn't know which parameters - # to fix - pywikibot.log( - 'Bad token error for {} but no parameter is using a ' - 'token. Current tokens: {}' - .format(self.site.user(), - ', '.join('{}: {}'.format(*e) - for e in user_tokens.items()))) - return False + # invalidate superior wiki cookies (T224712) + pywikibot.data.api._invalidate_superior_cookies(self.site.family) + # update tokens + tokens = self.site.tokens.update_tokens(self._params['token']) + self._params['token'] = tokens + return True
def submit(self) -> dict: """ diff --git a/pywikibot/page/_user.py b/pywikibot/page/_user.py index f12b507..9ac8e03 100644 --- a/pywikibot/page/_user.py +++ b/pywikibot/page/_user.py @@ -249,7 +249,7 @@ params = { 'action': 'emailuser', 'target': self.username, - 'token': self.site.tokens['email'], + 'token': self.site.tokens['csrf'], 'subject': subject, 'text': text, } diff --git a/pywikibot/site/_apisite.py b/pywikibot/site/_apisite.py index a3305ff..f45b6c0 100644 --- a/pywikibot/site/_apisite.py +++ b/pywikibot/site/_apisite.py @@ -14,9 +14,17 @@ from typing import Any, Iterable, Optional, Type, TypeVar, Union
import pywikibot -from pywikibot.backports import DefaultDict, Dict, List, Match +from pywikibot.backports import ( + DefaultDict, + Dict, + List, + Match, + Pattern, + Set, + Tuple, + removesuffix, +) from pywikibot.backports import OrderedDict as OrderedDictType -from pywikibot.backports import Pattern, Set, Tuple from pywikibot.comms.http import get_authentication from pywikibot.data import api from pywikibot.exceptions import ( @@ -72,6 +80,7 @@ MediaWikiVersion, cached, deprecated, + issue_deprecation_warning, merge_unique_dicts, normalize_username, ) @@ -121,20 +130,20 @@ self._msgcache: Dict[str, str] = {} self._paraminfo = api.ParamInfo(self) self._siteinfo = Siteinfo(self) - self.tokens = TokenWallet(self) + self._tokens = TokenWallet(self)
def __getstate__(self) -> Dict[str, Any]: """Remove TokenWallet before pickling, for security reasons.""" - new = super().__getstate__() - del new['tokens'] - del new['_interwikimap'] - return new + state = super().__getstate__() + del state['_tokens'] + del state['_interwikimap'] + return state
- def __setstate__(self, attrs: Dict[str, Any]) -> None: + def __setstate__(self, state: Dict[str, Any]) -> None: """Restore things removed in __getstate__.""" - super().__setstate__(attrs) + super().__setstate__(state) self._interwikimap = _InterwikiMap(self) - self.tokens = TokenWallet(self) + self._tokens = TokenWallet(self)
def interwiki(self, prefix: str) -> BaseSite: """ @@ -447,7 +456,7 @@
# Reset tokens and user properties del self.userinfo - self.tokens = TokenWallet(self) + self.tokens.clear() self._paraminfo = api.ParamInfo(self)
# Clear also cookies for site's second level domain (T224712) @@ -503,7 +512,7 @@ - :meth:`logged_in` to verify the user is loggend in to a site
.. seealso:: :api:`Userinfo` - .. versionchanged:: 8.0.0 + .. versionchanged:: 8.0 Use API formatversion 2.
:return: A dict with the following keys and values: @@ -1528,68 +1537,127 @@
return page._redirtarget
+ @deprecated(since='8.0.0') def validate_tokens(self, types: List[str]) -> List[str]: - """Validate if requested tokens are acceptable.""" + """Validate if requested tokens are acceptable. + + Valid tokens may depend on mw version. + + .. deprecated:: 8.0 + """ data = self._paraminfo.parameter('query+tokens', 'type') assert data is not None return [token for token in types if token in data['type']]
- def get_tokens( - self, - types: List[str], - all: bool = False - ) -> Dict[str, str]: - """Preload one or multiple tokens. + def get_tokens(self, types: List[str], *args, **kwargs) -> Dict[str, str]: + r"""Preload one or multiple tokens.
- For MediaWiki versions since 1.24wmfXXX a new token - system was introduced which reduced the amount of tokens available. - Most of them were merged into the 'csrf' token. If the token type in - the parameter is not known it will default to the 'csrf' token. + **Usage**
- The other token types available are: - - createaccount - - deleteglobalaccount - - login - - patrol - - rollback - - setglobalaccountstatus - - userrights - - watch + >>> site = pywikibot.Site() + >>> tokens = site.get_tokens([]) # get all tokens + >>> list(tokens.keys()) # result depends on user + ['createaccount', 'login'] + >>> tokens = site.get_tokens(['csrf', 'patrol']) + >>> list(tokens.keys()) # doctest: +SKIP + ['csrf', 'patrol'] + >>> token = site.get_tokens(['csrf']).get('csrf') # get a single token + >>> token # doctest: +SKIP + 'a9f...0a0+\' + >>> token = site.get_tokens(['unknown']) # try an invalid token + ... # doctest: +SKIP + ... # invalid token names shows a warnig and the key is not in result + ... + WARNING: API warning (tokens) of unknown format: + ... {'warnings': 'Unrecognized value for parameter "type": foo'} + {}
+ You should not call this method directly, especially if you only + need a specific token. Use :attr:`tokens` property instead. + + .. versionchanged:: 8.0 + ``all`` parameter is deprecated. Use an empty list for + ``types`` instead. + .. note:: ``args`` and ``kwargs`` are not used for deprecation + warning only. .. seealso:: :api:`Tokens`
- :param types: the types of token (e.g., "edit", "move", "delete"); - see API documentation for full list of types - :param all: load all available tokens, if None only if it can be done - in one request. - - return: a dict with retrieved valid tokens. + :param types: the types of token (e.g., "csrf", "login", "patrol"). + If the list is empty all available tokens are loaded. See + API documentation for full list of types. + :return: a dict with retrieved valid tokens. """ - def warn_handler(mod: str, text: str) -> Optional[Match[str]]: - """Filter warnings for not available tokens.""" - return re.match( - r'Action '\w+' is not allowed for the current user', text) + # deprecate 'all' parameter + if args or kwargs: + issue_deprecation_warning("'all' parameter", + "empty list for 'types' parameter", + since='8.0.0') + load_all = kwargs.get('all', args[0] if args else False) + else: + load_all = False
- user_tokens = {} - if all is not False: + if not types or load_all is not False: pdata = self._paraminfo.parameter('query+tokens', 'type') assert pdata is not None - types.extend(pdata['type']) + types = pdata['type']
req = self.simple_request(action='query', meta='tokens', - type=self.validate_tokens(types)) + type=types, formatversion=2)
- req._warning_handler = warn_handler data = req.submit() data = data.get('query', data)
+ user_tokens = {} if 'tokens' in data and data['tokens']: - user_tokens = {key[:-5]: val + user_tokens = {removesuffix(key, 'token'): val for key, val in data['tokens'].items() if val != '+\'}
return user_tokens
+ @property + def tokens(self) -> 'pywikibot.site._tokenwallet.TokenWallet': + r"""Return the TokenWallet collection. + + :class:`TokenWallet<pywikibot.site._tokenwallet.TokenWallet>` + collection holds all available tokens. The tokens are loaded + via :meth:`get_tokens` method with the first token request and + is retained until the TokenWallet is cleared. + + **Usage:** + + >>> site = pywikibot.Site() + >>> token = site.tokens['csrf'] # doctest: +SKIP + >>> token # doctest: +SKIP + 'df8...9e6+\' + >>> 'csrf' in site.tokens # doctest: +SKIP + ... # Check whether the token exists + True + >>> 'invalid' in site.tokens # doctest: +SKIP + False + >>> token = site.tokens['invalid'] # doctest: +SKIP + Traceback (most recent call last): + ... + KeyError: "Invalid token 'invalid' for user ... + >>> site.tokens.clear() # clears the internal cache + >>> site.tokens['csrf'] # doctest: +SKIP + ... # get a new token + '1c8...9d3+\' + >>> del site.tokens # another variant to clear the cache + + .. versionchanged:: 8.0 + ``tokens`` attribute became a property to enable deleter. + .. warning:: A deprecation warning is shown if the token name is + outdated, see :api:`Tokens (action)`. + .. seealso:: :api:`Tokens` for valid token types + """ + return self._tokens + + @tokens.deleter + def tokens(self) -> None: + """Deleter method to clear the TokenWallet collection.""" + self._tokens.clear() + # TODO: expand support to other parameters of action=parse? def get_parsed_page(self, page: 'pywikibot.page.BasePage') -> str: """Retrieve parsed text of the page using action=parse. @@ -1680,7 +1748,7 @@ elif target: page = pywikibot.Page(self, target)
- token = self.tokens['delete'] + token = self.tokens['csrf'] params = { 'action': 'revisiondelete', 'token': token, @@ -1841,7 +1909,7 @@ if not recreate: raise
- token = self.tokens['edit'] + token = self.tokens['csrf'] if bot is None: bot = self.has_right('bot') params = dict(action='edit', title=page, @@ -2143,7 +2211,7 @@ raise NoPageError(page, 'Cannot move page {page} because it ' 'does not exist on {site}.') - token = self.tokens['move'] + token = self.tokens['csrf'] self.lock_page(page) req = self.simple_request(action='move', noredirect=noredirect, @@ -2332,7 +2400,7 @@ raise TypeError("'page' must be a FilePage not a '{}'" .format(page.__class__.__name__))
- token = self.tokens['delete'] + token = self.tokens['csrf'] params = { 'action': 'delete', 'token': token, @@ -2404,7 +2472,7 @@ If None, restores all revisions. :param fileids: List of fileids to restore. """ - token = self.tokens['delete'] + token = self.tokens['csrf'] params = { 'action': 'undelete', 'title': page, @@ -2489,7 +2557,7 @@ applied to all protections. If None, 'infinite', 'indefinite', 'never', or '' is given, there is no expiry. """ - token = self.tokens['protect'] + token = self.tokens['csrf'] self.lock_page(page)
protections_list = [ptype + '=' + level @@ -2570,7 +2638,7 @@ blocked. :return: The data retrieved from the API request. """ - token = self.tokens['block'] + token = self.tokens['csrf'] if expiry is False: expiry = 'never' req = self.simple_request(action='block', user=user.username, @@ -2598,7 +2666,7 @@ """ req = self.simple_request(action='unblock', user=user.username, - token=self.tokens['block'], + token=self.tokens['csrf'], reason=reason)
data = req.submit() @@ -2698,7 +2766,7 @@ # TODO: is there another way? req = self._request(throttle=False, parameters={'action': 'upload', - 'token': self.tokens['edit']}) + 'token': self.tokens['csrf']}) try: req.submit() except APIError as error: diff --git a/pywikibot/site/_datasite.py b/pywikibot/site/_datasite.py index 4d870d8..75dce37 100644 --- a/pywikibot/site/_datasite.py +++ b/pywikibot/site/_datasite.py @@ -285,7 +285,7 @@ params['bot'] = 1 if 'baserevid' in kwargs and kwargs['baserevid']: params['baserevid'] = kwargs['baserevid'] - params['token'] = self.tokens['edit'] + params['token'] = self.tokens['csrf']
for arg in kwargs: if arg in ['clear', 'summary']: @@ -317,7 +317,7 @@ 'claim': json.dumps(claim.toJSON()), 'baserevid': entity.latest_revision_id, 'summary': summary, - 'token': self.tokens['edit'], + 'token': self.tokens['csrf'], 'bot': bot, } req = self.simple_request(**params) @@ -350,7 +350,7 @@ raise NoPageError(claim) params = {'action': 'wbsetclaimvalue', 'claim': claim.snak, 'snaktype': snaktype, 'summary': summary, 'bot': bot, - 'token': self.tokens['edit']} + 'token': self.tokens['csrf']}
if snaktype == 'value': params['value'] = json.dumps(claim._formatValue()) @@ -377,7 +377,7 @@ raise NoPageError(claim) params = {'action': 'wbsetclaim', 'claim': json.dumps(claim.toJSON()), - 'token': self.tokens['edit'], + 'token': self.tokens['csrf'], 'baserevid': claim.on_item.latest_revision_id, 'summary': summary, 'bot': bot, @@ -411,7 +411,7 @@ raise ValueError('The claim cannot have a source.') params = {'action': 'wbsetreference', 'statement': claim.snak, 'baserevid': claim.on_item.latest_revision_id, - 'summary': summary, 'bot': bot, 'token': self.tokens['edit']} + 'summary': summary, 'bot': bot, 'token': self.tokens['csrf']}
# build up the snak if isinstance(source, list): @@ -468,7 +468,7 @@ if (not new and hasattr(qualifier, 'hash') and qualifier.hash is not None): params['snakhash'] = qualifier.hash - params['token'] = self.tokens['edit'] + params['token'] = self.tokens['csrf'] # build up the snak if qualifier.getSnakType() == 'value': params['value'] = json.dumps(qualifier._formatValue()) @@ -505,7 +505,7 @@ 'summary': summary, 'bot': bot, 'claim': '|'.join(claim.snak for claim in claims), - 'token': self.tokens['edit'], + 'token': self.tokens['csrf'], }
req = self.simple_request(**params) @@ -534,7 +534,7 @@ 'summary': summary, 'bot': bot, 'statement': claim.snak, 'references': '|'.join(source.hash for source in sources), - 'token': self.tokens['edit'], + 'token': self.tokens['csrf'], }
req = self.simple_request(**params) @@ -564,7 +564,7 @@ 'summary': summary, 'bot': bot, 'qualifiers': [qualifier.hash for qualifier in qualifiers], - 'token': self.tokens['edit'] + 'token': self.tokens['csrf'] }
req = self.simple_request(**params) @@ -589,7 +589,7 @@ 'totitle': page1.title(), 'fromsite': page2.site.dbName(), 'fromtitle': page2.title(), - 'token': self.tokens['edit'] + 'token': self.tokens['csrf'] } if bot: params['bot'] = 1 @@ -621,7 +621,7 @@ 'fromid': from_item.getID(), 'toid': to_item.getID(), 'ignoreconflicts': ignore_conflicts, - 'token': self.tokens['edit'], + 'token': self.tokens['csrf'], 'summary': summary, } if bot: @@ -649,7 +649,7 @@ 'action': 'wblmergelexemes', 'source': from_lexeme.getID(), 'target': to_lexeme.getID(), - 'token': self.tokens['edit'], + 'token': self.tokens['csrf'], 'summary': summary, } if bot: @@ -673,7 +673,7 @@ 'action': 'wbcreateredirect', 'from': from_item.getID(), 'to': to_item.getID(), - 'token': self.tokens['edit'], + 'token': self.tokens['csrf'], 'bot': bot, } req = self.simple_request(**params) @@ -870,7 +870,7 @@ params.update( {'baserevid': baserevid, 'action': action, - 'token': self.tokens['edit'], + 'token': self.tokens['csrf'], 'bot': kwargs.pop('bot', True), }) params.update(prepare_data(action, action_data)) @@ -940,7 +940,7 @@ 'lexemeId': lexeme.getID(), 'data': json.dumps(form.toJSON()), 'bot': bot, - 'token': self.tokens['edit'], + 'token': self.tokens['csrf'], } if baserevid: params['baserevid'] = baserevid @@ -965,7 +965,7 @@ 'action': 'wblremoveform', 'id': form.getID(), 'bot': bot, - 'token': self.tokens['edit'], + 'token': self.tokens['csrf'], } if baserevid: params['baserevid'] = baserevid @@ -995,7 +995,7 @@ 'formId': form.getID(), 'data': json.dumps(data), 'bot': bot, - 'token': self.tokens['edit'], + 'token': self.tokens['csrf'], } if baserevid: params['baserevid'] = baserevid diff --git a/pywikibot/site/_extensions.py b/pywikibot/site/_extensions.py index 8232a9e..6c5cf58 100644 --- a/pywikibot/site/_extensions.py +++ b/pywikibot/site/_extensions.py @@ -65,7 +65,7 @@ # TODO: ensure that the 'echomarkread' action # is supported by the site kwargs = merge_unique_dicts(kwargs, action='echomarkread', 
- token=self.tokens['edit']) + token=self.tokens['csrf']) req = self.simple_request(**kwargs) data = req.submit() try: diff --git a/pywikibot/site/_tokenwallet.py b/pywikibot/site/_tokenwallet.py index 9e32f87..1ec829d 100644 --- a/pywikibot/site/_tokenwallet.py +++ b/pywikibot/site/_tokenwallet.py @@ -4,92 +4,132 @@ # # Distributed under the terms of the MIT license. # -from pywikibot import debug -from pywikibot.exceptions import Error +from collections.abc import Container +from typing import Any, Optional, TYPE_CHECKING + +from pywikibot.backports import Dict, List +from pywikibot.tools import issue_deprecation_warning, deprecated + +if TYPE_CHECKING: + from pywikibot.site import APISite
-class TokenWallet: +class TokenWallet(Container):
- """Container for tokens.""" + """Container for tokens.
- def __init__(self, site) -> None: - """Initializer. + You should not use this container class directly; use + :attr:`APISite.tokens<pywikibot.site._apisite.APISite.tokens>` + instead which gives access to the site's TokenWallet instance. + """
- :type site: pywikibot.site.APISite - """ - self.site = site - self._tokens = {} - self.failed_cache = set() # cache unavailable tokens. + def __init__(self, site: 'APISite') -> None: + """Initializer.""" + self.site: APISite = site + self._tokens: Dict[str, str] = {} + self._currentuser: Optional[str] = site.user()
- def load_tokens(self, types, all: bool = False) -> None: - """ - Preload one or multiple tokens. - - :param types: the types of token. - :type types: iterable - :param all: load all available tokens, if None only if it can be done - in one request. - """ - if self.site.user() is None: - self.site.login() - - self._tokens.setdefault(self.site.user(), {}).update( - self.site.get_tokens(types, all=all)) - - # Preload all only the first time. - # When all=True types is extended in site.get_tokens(). - # Keys not recognised as tokens, are cached so they are not requested - # any longer. - if all is not False: - for key in types: - if key not in self._tokens[self.site.user()]: - self.failed_cache.add((self.site.user(), key)) - - def __getitem__(self, key): + def __getitem__(self, key: str) -> str: """Get token value for the given key.""" - if self.site.user() is None: + if self.site.user() is None and key != 'login': self.site.login()
- user_tokens = self._tokens.setdefault(self.site.user(), {}) - # always preload all for users without tokens - failed_cache_key = (self.site.user(), key) + if self.site.user() != self._currentuser: + self._currentuser = self.site.user() + self.clear()
- # redirect old tokens to be compatible with older MW version + if not self._tokens: + self._tokens = self.site.get_tokens([]) + + # Redirect old tokens which were used by outdated MediaWiki versions + # but show a FutureWarning for this usage: # https://www.mediawiki.org/wiki/MediaWiki_1.37/Deprecation_of_legacy_API_toke... - if self.site.mw_version >= '1.24wmf19' \ - and key in {'edit', 'delete', 'protect', 'move', 'block', 'unblock', - 'email', 'import', 'options'}: - debug('Token {!r} was replaced by {!r}'.format(key, 'csrf')) + if key in {'edit', 'delete', 'protect', 'move', 'block', 'unblock', + 'email', 'import', 'options'}: + issue_deprecation_warning( + f'Token {key!r}', "'csrf'", since='8.0.0') key = 'csrf'
try: - key = self.site.validate_tokens([key])[0] - except IndexError: - raise Error( - "Requested token '{}' is invalid on {} wiki." - .format(key, self.site)) + token = self._tokens[key] + except KeyError: + raise KeyError( + f'Invalid token {key!r} for user {self._currentuser!r} on ' + f'{self.site} wiki.') from None
- if (key not in user_tokens - and failed_cache_key not in self.failed_cache): - self.load_tokens([key], all=False if user_tokens else None) - - if key in user_tokens: - return user_tokens[key] - # token not allowed for self.site.user() on self.site - self.failed_cache.add(failed_cache_key) - # to be changed back to a plain KeyError? - raise Error( - "Action '{}' is not allowed for user {} on {} wiki." - .format(key, self.site.user(), self.site)) + return token
def __contains__(self, key) -> bool: - """Return True if the given token name is cached.""" - return key in self._tokens.setdefault(self.site.user(), {}) + """Return True if the token name is cached for the current user.""" + try: + self[key] + except KeyError: + return False + return True
def __str__(self) -> str: """Return a str representation of the internal tokens dictionary.""" - return self._tokens.__str__() + return str(self._tokens)
def __repr__(self) -> str: - """Return a representation of the internal tokens dictionary.""" - return self._tokens.__repr__() + """Return a representation of the TokenWallet. + + >>> import pywikibot + >>> site = pywikibot.Site('wikipedia:test') + >>> repr(site.tokens) + "TokenWallet(pywikibot.Site('wikipedia:test'))" + + .. versionchanged:: 8.0 + Provide a string which looks like a valid Python expression. + """ + user = f', user={self._currentuser!r}' if self._currentuser else '' + return (f'{type(self).__name__}' + f'(pywikibot.Site({self.site.sitename!r}{user}))') + + def clear(self): + """Clear the self._tokens cache. Tokens are reloaded when needed. + + .. versionadded:: 8.0 + """ + self._tokens.clear() + + def update_tokens(self, tokens: List[str]) -> List[str]: + """Return a list of new tokens for a given list of tokens. + + This method can be used if a token is outdated and has to be + renewed but the token type is unknown and we only have the old + token. It first gets the token names from all given tokens, + clears the cache and returns fresh new tokens of the found types. + + **Usage:** + + >>> import pywikibot + >>> site = pywikibot.Site() + >>> tokens = [site.tokens['csrf']] # doctest: +SKIP + >>> new_tokens = site.tokens.update_tokens(tokens) # doctest: +SKIP + + .. code-block:: Python + :caption: An example for replacing request token parameters + + r._params['token'] = r.site.tokens.update_tokens(r._params['token']) + + .. versionadded:: 8.0 + """ + # find the token types + types = [key + for key, value in self._tokens.items() for token in tokens + if value == token] + self.clear() # clear the cache + return [self[token_type] for token_type in types] + + @deprecated('clear()', since='8.0.0') + def load_tokens(self, *args: Any, **kwargs: Any) -> None: + """Clear cache to lazy load tokens when needed. + + .. deprecated:: 8.0 + Use :meth:`clear` instead. + .. versionchanged:: 8.0 + Clear the cache instead of loading tokens. 
All parameters are + ignored. + """ + self.clear() diff --git a/pywikibot/site/_upload.py b/pywikibot/site/_upload.py index 0d23428..2dd2c32 100644 --- a/pywikibot/site/_upload.py +++ b/pywikibot/site/_upload.py @@ -171,7 +171,7 @@
ignore_all_warnings = not callable(ignore_warnings) and ignore_warnings
- token = self.site.tokens['edit'] + token = self.site.tokens['csrf'] result = None file_page_title = self.filepage.title(with_ns=False) file_size = None diff --git a/scripts/change_pagelang.py b/scripts/change_pagelang.py index 780007c..06817bf 100755 --- a/scripts/change_pagelang.py +++ b/scripts/change_pagelang.py @@ -60,11 +60,10 @@ :param page: The page to update and save :type page: pywikibot.page.BasePage """ - token = self.site.get_tokens(['csrf']).get('csrf') parameters = {'action': 'setpagelanguage', 'title': page.title(), 'lang': self.opt.setlang, - 'token': token} + 'token': self.site.tokens['csrf']} r = self.site.simple_request(**parameters) r.submit() pywikibot.info(f'<<lightpurple>>{page}<<default>>: Setting ' diff --git a/tests/aspects.py b/tests/aspects.py index 8f47518..431a387 100644 --- a/tests/aspects.py +++ b/tests/aspects.py @@ -1404,7 +1404,8 @@ """Test cases for deprecation function in the tools module."""
_generic_match = re.compile( - r'.* is deprecated(?: for \d+ [^;]*)?(; use .* instead)?.') + r'.* is deprecated(?: since release [\d.]+ [^;]*)?' + r'(; use .* instead)?.')
source_adjustment_skips = [ unittest.case._AssertRaisesContext, diff --git a/tests/token_tests.py b/tests/token_tests.py index 5ba8cef..8accc51 100755 --- a/tests/token_tests.py +++ b/tests/token_tests.py @@ -5,15 +5,20 @@ # # Distributed under the terms of the MIT license. # +import unittest from contextlib import suppress
from pywikibot.exceptions import APIError, Error from pywikibot.site import TokenWallet -from pywikibot.tools import MediaWikiVersion -from tests.aspects import DefaultSiteTestCase, TestCase, TestCaseBase, unittest +from tests.aspects import ( + DefaultSiteTestCase, + DeprecationTestCase, + TestCase, + TestCaseBase, +)
-class TestSiteTokens(DefaultSiteTestCase): +class TestSiteTokens(DeprecationTestCase, DefaultSiteTestCase):
"""Test cases for tokens in Site methods.
@@ -26,67 +31,22 @@
login = True
- def setUp(self): - """Store version.""" - super().setUp() - self.mysite = self.get_site() - self._version = self.mysite.mw_version - self.orig_version = self.mysite.version - - def tearDown(self): - """Restore version.""" - super().tearDown() - self.mysite.version = self.orig_version - - def _test_tokens(self, version, test_version, additional_token): + def test_tokens(self): """Test tokens.""" - if version and (self._version < version - or self._version < test_version): - raise unittest.SkipTest( - 'Site {} version {} is too low for this tests.' - .format(self.mysite, self._version)) - - self.mysite.version = lambda: test_version - del self.mysite._mw_version_time # remove cached mw_version - redirected_tokens = ['edit', 'move', 'delete'] - for ttype in redirected_tokens + ['patrol', additional_token]: - try: - token = self.mysite.tokens[ttype] - except Error as error_msg: - if self.mysite.validate_tokens([ttype]): - pattern = ("Action '[a-z]+' is not allowed " - 'for user .* on .* wiki.') - else: - pattern = "Requested token '[a-z]+' is invalid on .* wiki." - - self.assertRegex(str(error_msg), pattern) - - else: - self.assertIsInstance(token, str) - self.assertEqual(token, self.mysite.tokens[ttype]) - # test __contains__ - if test_version < '1.24wmf19': - self.assertIn(ttype, self.mysite.tokens) - elif ttype in redirected_tokens: - self.assertEqual(self.mysite.tokens[ttype], - self.mysite.tokens['csrf']) - - def test_tokens_in_mw_123_124wmf18(self): - """Test ability to get page tokens.""" - if MediaWikiVersion(self.orig_version()) >= '1.37wmf24': - self.skipTest('Site {} version {} is too new for this tests.' 
- .format(self.mysite, self._version)) - self._test_tokens('1.23', '1.24wmf18', 'deleteglobalaccount') - - def test_tokens_in_mw_124wmf19(self): - """Test ability to get page tokens.""" - self._test_tokens('1.24wmf19', '1.24wmf20', 'deleteglobalaccount') + for ttype in redirected_tokens + ['patrol', 'deleteglobalaccount']: + self.assertIsInstance(self.site.tokens[ttype], str) + self.assertIn(ttype, self.site.tokens) # test __contains__ + if ttype in redirected_tokens: + self.assertEqual(self.site.tokens[ttype], + self.site.tokens['csrf']) + self._do_test_warning_filename = False + self.assertDeprecationParts(f'Token {ttype!r}', "'csrf'")
def test_invalid_token(self): """Test invalid token.""" - with self.assertRaises(Error): - self.mysite.tokens['invalidtype'] + with self.assertRaises(KeyError): + self.site.tokens['invalidtype']
class TokenTestBase(TestCaseBase): @@ -100,11 +60,11 @@ ttype = self.token_type try: token = mysite.tokens[ttype] - except Error as error_msg: + except KeyError as error_msg: self.assertRegex( str(error_msg), - "Action '[a-z]+' is not allowed for user .* on .* wiki.") - self.assertNotIn(self.token_type, self.site.tokens) + f'Invalid token {ttype!r} for user .+ on {mysite} wiki.') + self.assertNotIn(ttype, self.site.tokens) self.skipTest(error_msg)
self.token = token