jenkins-bot submitted this change.

View Change

Approvals: Xqt: Looks good to me, approved jenkins-bot: Verified
[cleanup] Refactor TokenWallet

Since MW 1.24 a new token system was introduced. All tokens can be
retrieved at once.

_tokenwallet.py:
- derive TokenWallet from collections.abc.Container
- give up the user key in TokenWallet._tokens because usually a Site
object is for only one user after user/sysop dualism was given up.
Keep a _currentuser attribute for sanity check.
- raise KeyError with TokenWallet if a key is not in the collection
instead of pywikibot.Error as suggested by Python documentation.
- keep the token replacement for outdated tokens but print a
FutureWarning in such cases.
- give up failed_cache attribute because __getitem__ lazy loads all
tokens at once if one token was wanted. APISite.get_tokens() is used
to get all tokens.
- add a new method 'clear()' to clear the internal cache
- deprecate 'load_tokens' method; just call clear() with it to enable
a lazy loaded refresh of then tokens cache.
- __repr__ method now gives a new result string which looks like a
valid Python expression.
- update_tokens was added to renew all tokens of a given list.
- this changes should also solve T270380

_apisite.py:
- deprecate validate_tokens() which is no longer needed.
- remove warnhandler in get_tokens which didn't work since MW 1.24 change
- deprecate 'all' parameter of get_tokens; all tokens are retrieved if
list of 'types' is empty
- add a tokens property to enable deleting the tokens cache; this is a
variant of calling 'clear()' method

requests.py:
- use TokenWallet.update_tokens() to renew tokens if needed in _bad_token()
- remove logging and simplify the code; we already have API warnings.

api/_login.py:
- deprecate get_login_token function and use self.tokens instead
- remove pre 1.27 code

others:
- update replaced tokens
- update tests
- update documentation

Bug: T306637
Bug: T270380
Change-Id: I12102055da723545f0f41408363cb45732b47967
---
M pywikibot/data/api/__init__.py
M pywikibot/data/api/_login.py
M pywikibot/data/api/_requests.py
M pywikibot/page/_user.py
M pywikibot/site/_apisite.py
M pywikibot/site/_datasite.py
M pywikibot/site/_extensions.py
M pywikibot/site/_tokenwallet.py
M pywikibot/site/_upload.py
M scripts/change_pagelang.py
M tests/aspects.py
M tests/token_tests.py
12 files changed, 310 insertions(+), 280 deletions(-)

diff --git a/pywikibot/data/api/__init__.py b/pywikibot/data/api/__init__.py
index dbd6b4c..6445c8c 100644
--- a/pywikibot/data/api/__init__.py
+++ b/pywikibot/data/api/__init__.py
@@ -47,7 +47,7 @@
"""
Clear cookies for site's second level domain.

- get_login_token() will generate new cookies needed.
+ The http module takes care of all the cookie stuff.
This is a workaround for requests bug, see :phab:`T224712`
and https://github.com/psf/requests/issues/5411
for more details.
diff --git a/pywikibot/data/api/_login.py b/pywikibot/data/api/_login.py
index 11de1ae..f91745e 100644
--- a/pywikibot/data/api/_login.py
+++ b/pywikibot/data/api/_login.py
@@ -12,6 +12,7 @@
from pywikibot import login
from pywikibot.backports import Dict
from pywikibot.login import LoginStatus
+from pywikibot.tools import deprecated

__all__ = ['LoginManager']

@@ -39,14 +40,11 @@
def _login_parameters(self, *, botpassword: bool = False
) -> Dict[str, str]:
"""Return login parameters."""
- # Since MW 1.27 only for bot passwords.
- self.action = 'login'
- if not botpassword:
- # get token using meta=tokens if supported
- token = self.get_login_token()
- if token:
- # Standard login request since MW 1.27
- self.action = 'clientlogin'
+ if botpassword:
+ self.action = 'login'
+ else:
+ token = self.site.tokens['login']
+ self.action = 'clientlogin'

# prepare default login parameters
parameters = {'action': self.action,
@@ -70,7 +68,6 @@
Note, this doesn't do anything with cookies. The http module
takes care of all the cookie stuff. Throws exception on failure.
"""
- self.below_mw_1_27 = False
if hasattr(self, '_waituntil') \
and datetime.datetime.now() < self._waituntil:
diff = self._waituntil - datetime.datetime.now()
@@ -114,21 +111,16 @@
return

if status in ('NeedToken', 'WrongToken', 'badtoken'):
- token = response.get('token')
- if token and self.below_mw_1_27: # pragma: no cover
- # fetched token using action=login
- login_request['lgtoken'] = token
- pywikibot.log('Received login token, proceed with login.')
- else:
- # if incorrect login token was used,
- # force relogin and generate fresh one
- pywikibot.error('Received incorrect login token. '
- 'Forcing re-login.')
- # invalidate superior wiki cookies (T224712)
- pywikibot.data.api._invalidate_superior_cookies(
- self.site.family)
- login_request[
- self.keyword('token')] = self.get_login_token()
+ # if incorrect login token was used,
+ # force relogin and generate fresh one
+ pywikibot.error('Received incorrect login token. '
+ 'Forcing re-login.')
+ # invalidate superior wiki cookies (T224712)
+ pywikibot.data.api._invalidate_superior_cookies(
+ self.site.family)
+ self.site.tokens.clear()
+ login_request[
+ self.keyword('token')] = self.site.tokens['login']
continue

# messagecode was introduced with 1.29.0-wmf.14
@@ -155,19 +147,12 @@

raise pywikibot.exceptions.APIError(code=status, info=fail_reason)

+ @deprecated("site.tokens['login']", since='8.0.0')
def get_login_token(self) -> Optional[str]:
"""Fetch login token for MediaWiki 1.27+.

+ .. deprecated:: 8.0
+
:return: login token
"""
- login_token_request = self.site._request(
- use_get=False,
- parameters={'action': 'query', 'meta': 'tokens', 'type': 'login'},
- )
- login_token_result = login_token_request.submit()
- # check if we have to use old implementation of mw < 1.27
- if 'query' in login_token_result:
- return login_token_result['query']['tokens'].get('logintoken')
-
- self.below_mw_1_27 = True # pragma: no cover
- return None
+ return self.site.tokens['login']
diff --git a/pywikibot/data/api/_requests.py b/pywikibot/data/api/_requests.py
index f8dce8c..8d656c8 100644
--- a/pywikibot/data/api/_requests.py
+++ b/pywikibot/data/api/_requests.py
@@ -917,7 +917,12 @@
self.wait(delay)

def _bad_token(self, code) -> bool:
- """Check for bad token."""
+ """Check for bad token.
+
+ Check for bad tokens, call :meth:`TokenWallet.update_tokens()
+ <pywikibot.site._tokenwallet.TokenWallet.update_tokens>` method
+ to update the bunch of tokens and continue loop in :meth:`submit`.
+ """
if code != 'badtoken': # Other code not handled here
return False

@@ -926,40 +931,12 @@
.format(self.site._loginstatus.name))
return False

- user_tokens = self.site.tokens._tokens[self.site.user()]
- # all token values mapped to their type
- tokens = {token: t_type for t_type, token in user_tokens.items()}
- # determine which tokens are bad
- invalid_param = {name: tokens[param[0]]
- for name, param in self._params.items()
- if len(param) == 1 and param[0] in tokens}
- # doesn't care about the cache so can directly load them
- if invalid_param:
- pywikibot.log(
- 'Bad token error for {}. Tokens for "{}" used in request; '
- 'invalidated them.'
- .format(self.site.user(),
- '", "'.join(sorted(set(invalid_param.values())))))
- # invalidate superior wiki cookies (T224712)
- pywikibot.data.api._invalidate_superior_cookies(self.site.family)
- # request new token(s) instead of invalid
- self.site.tokens.load_tokens(set(invalid_param.values()))
- # fix parameters; lets hope that it doesn't mistake actual
- # parameters as tokens
- for name, t_type in invalid_param.items():
- self[name] = self.site.tokens[t_type]
- return True
-
- # otherwise couldn't find any … weird there is nothing what
- # can be done here because it doesn't know which parameters
- # to fix
- pywikibot.log(
- 'Bad token error for {} but no parameter is using a '
- 'token. Current tokens: {}'
- .format(self.site.user(),
- ', '.join('{}: {}'.format(*e)
- for e in user_tokens.items())))
- return False
+ # invalidate superior wiki cookies (T224712)
+ pywikibot.data.api._invalidate_superior_cookies(self.site.family)
+ # update tokens
+ tokens = self.site.tokens.update_tokens(self._params['token'])
+ self._params['token'] = tokens
+ return True

def submit(self) -> dict:
"""
diff --git a/pywikibot/page/_user.py b/pywikibot/page/_user.py
index f12b507..9ac8e03 100644
--- a/pywikibot/page/_user.py
+++ b/pywikibot/page/_user.py
@@ -249,7 +249,7 @@
params = {
'action': 'emailuser',
'target': self.username,
- 'token': self.site.tokens['email'],
+ 'token': self.site.tokens['csrf'],
'subject': subject,
'text': text,
}
diff --git a/pywikibot/site/_apisite.py b/pywikibot/site/_apisite.py
index a3305ff..f45b6c0 100644
--- a/pywikibot/site/_apisite.py
+++ b/pywikibot/site/_apisite.py
@@ -14,9 +14,17 @@
from typing import Any, Iterable, Optional, Type, TypeVar, Union

import pywikibot
-from pywikibot.backports import DefaultDict, Dict, List, Match
+from pywikibot.backports import (
+ DefaultDict,
+ Dict,
+ List,
+ Match,
+ Pattern,
+ Set,
+ Tuple,
+ removesuffix,
+)
from pywikibot.backports import OrderedDict as OrderedDictType
-from pywikibot.backports import Pattern, Set, Tuple
from pywikibot.comms.http import get_authentication
from pywikibot.data import api
from pywikibot.exceptions import (
@@ -72,6 +80,7 @@
MediaWikiVersion,
cached,
deprecated,
+ issue_deprecation_warning,
merge_unique_dicts,
normalize_username,
)
@@ -121,20 +130,20 @@
self._msgcache: Dict[str, str] = {}
self._paraminfo = api.ParamInfo(self)
self._siteinfo = Siteinfo(self)
- self.tokens = TokenWallet(self)
+ self._tokens = TokenWallet(self)

def __getstate__(self) -> Dict[str, Any]:
"""Remove TokenWallet before pickling, for security reasons."""
- new = super().__getstate__()
- del new['tokens']
- del new['_interwikimap']
- return new
+ state = super().__getstate__()
+ del state['_tokens']
+ del state['_interwikimap']
+ return state

- def __setstate__(self, attrs: Dict[str, Any]) -> None:
+ def __setstate__(self, state: Dict[str, Any]) -> None:
"""Restore things removed in __getstate__."""
- super().__setstate__(attrs)
+ super().__setstate__(state)
self._interwikimap = _InterwikiMap(self)
- self.tokens = TokenWallet(self)
+ self._tokens = TokenWallet(self)

def interwiki(self, prefix: str) -> BaseSite:
"""
@@ -447,7 +456,7 @@

# Reset tokens and user properties
del self.userinfo
- self.tokens = TokenWallet(self)
+ self.tokens.clear()
self._paraminfo = api.ParamInfo(self)

# Clear also cookies for site's second level domain (T224712)
@@ -503,7 +512,7 @@
- :meth:`logged_in` to verify the user is loggend in to a site

.. seealso:: :api:`Userinfo`
- .. versionchanged:: 8.0.0
+ .. versionchanged:: 8.0
Use API formatversion 2.

:return: A dict with the following keys and values:
@@ -1528,68 +1537,127 @@

return page._redirtarget

+ @deprecated(since='8.0.0')
def validate_tokens(self, types: List[str]) -> List[str]:
- """Validate if requested tokens are acceptable."""
+ """Validate if requested tokens are acceptable.
+
+ Valid tokens may depend on mw version.
+
+ .. deprecated:: 8.0
+ """
data = self._paraminfo.parameter('query+tokens', 'type')
assert data is not None
return [token for token in types if token in data['type']]

- def get_tokens(
- self,
- types: List[str],
- all: bool = False
- ) -> Dict[str, str]:
- """Preload one or multiple tokens.
+ def get_tokens(self, types: List[str], *args, **kwargs) -> Dict[str, str]:
+ r"""Preload one or multiple tokens.

- For MediaWiki versions since 1.24wmfXXX a new token
- system was introduced which reduced the amount of tokens available.
- Most of them were merged into the 'csrf' token. If the token type in
- the parameter is not known it will default to the 'csrf' token.
+ **Usage**

- The other token types available are:
- - createaccount
- - deleteglobalaccount
- - login
- - patrol
- - rollback
- - setglobalaccountstatus
- - userrights
- - watch
+ >>> site = pywikibot.Site()
+ >>> tokens = site.get_tokens([]) # get all tokens
+ >>> list(tokens.keys()) # result depends on user
+ ['createaccount', 'login']
+ >>> tokens = site.get_tokens(['csrf', 'patrol'])
+ >>> list(tokens.keys()) # doctest: +SKIP
+ ['csrf', 'patrol']
+ >>> token = site.get_tokens(['csrf']).get('csrf') # get a single token
+ >>> token # doctest: +SKIP
+ 'a9f...0a0+\\'
+ >>> token = site.get_tokens(['unknown']) # try an invalid token
+ ... # doctest: +SKIP
+ ... # invalid token names shows a warnig and the key is not in result
+ ...
+ WARNING: API warning (tokens) of unknown format:
+ ... {'warnings': 'Unrecognized value for parameter "type": foo'}
+ {}

+ You should not call this method directly, especially if you only
+ need a specific token. Use :attr:`tokens` property instead.
+
+ .. versionchanged:: 8.0
+ ``all`` parameter is deprecated. Use an empty list for
+ ``types`` instead.
+ .. note:: ``args`` and ``kwargs`` are not used for deprecation
+ warning only.
.. seealso:: :api:`Tokens`

- :param types: the types of token (e.g., "edit", "move", "delete");
- see API documentation for full list of types
- :param all: load all available tokens, if None only if it can be done
- in one request.
-
- return: a dict with retrieved valid tokens.
+ :param types: the types of token (e.g., "csrf", "login", "patrol").
+ If the list is empty all available tokens are loaded. See
+ API documentation for full list of types.
+ :return: a dict with retrieved valid tokens.
"""
- def warn_handler(mod: str, text: str) -> Optional[Match[str]]:
- """Filter warnings for not available tokens."""
- return re.match(
- r'Action \'\w+\' is not allowed for the current user', text)
+ # deprecate 'all' parameter
+ if args or kwargs:
+ issue_deprecation_warning("'all' parameter",
+ "empty list for 'types' parameter",
+ since='8.0.0')
+ load_all = kwargs.get('all', args[0] if args else False)
+ else:
+ load_all = False

- user_tokens = {}
- if all is not False:
+ if not types or load_all is not False:
pdata = self._paraminfo.parameter('query+tokens', 'type')
assert pdata is not None
- types.extend(pdata['type'])
+ types = pdata['type']

req = self.simple_request(action='query', meta='tokens',
- type=self.validate_tokens(types))
+ type=types, formatversion=2)

- req._warning_handler = warn_handler
data = req.submit()
data = data.get('query', data)

+ user_tokens = {}
if 'tokens' in data and data['tokens']:
- user_tokens = {key[:-5]: val
+ user_tokens = {removesuffix(key, 'token'): val
for key, val in data['tokens'].items()
if val != '+\\'}

return user_tokens

+ @property
+ def tokens(self) -> 'pywikibot.site._tokenwallet.TokenWallet':
+ r"""Return the TokenWallet collection.
+
+ :class:`TokenWallet<pywikibot.site._tokenwallet.TokenWallet>`
+ collection holds all available tokens. The tokens are loaded
+ via :meth:`get_tokens` method with the first token request and
+ is retained until the TokenWallet is cleared.
+
+ **Usage:**
+
+ >>> site = pywikibot.Site()
+ >>> token = site.tokens['csrf'] # doctest: +SKIP
+ >>> token # doctest: +SKIP
+ 'df8...9e6+\\'
+ >>> 'csrf' in site.tokens # doctest: +SKIP
+ ... # Check whether the token exists
+ True
+ >>> 'invalid' in site.tokens # doctest: +SKIP
+ False
+ >>> token = site.tokens['invalid'] # doctest: +SKIP
+ Traceback (most recent call last):
+ ...
+ KeyError: "Invalid token 'invalid' for user ...
+ >>> site.tokens.clear() # clears the internal cache
+ >>> site.tokens['csrf'] # doctest: +SKIP
+ ... # get a new token
+ '1c8...9d3+\\'
+ >>> del site.tokens # another variant to clear the cache
+
+ .. versionchanged:: 8.0
+ ``tokens`` attribute became a property to enable deleter.
+ .. warning:: A deprecation warning is shown if the token name is
+ outdated, see :api:`Tokens (action)`.
+ .. seealso:: :api:`Tokens` for valid token types
+ """
+ return self._tokens
+
+ @tokens.deleter
+ def tokens(self) -> None:
+ """Deleter method to clear the TokenWallet collection."""
+ self._tokens.clear()
+
# TODO: expand support to other parameters of action=parse?
def get_parsed_page(self, page: 'pywikibot.page.BasePage') -> str:
"""Retrieve parsed text of the page using action=parse.
@@ -1680,7 +1748,7 @@
elif target:
page = pywikibot.Page(self, target)

- token = self.tokens['delete']
+ token = self.tokens['csrf']
params = {
'action': 'revisiondelete',
'token': token,
@@ -1841,7 +1909,7 @@
if not recreate:
raise

- token = self.tokens['edit']
+ token = self.tokens['csrf']
if bot is None:
bot = self.has_right('bot')
params = dict(action='edit', title=page,
@@ -2143,7 +2211,7 @@
raise NoPageError(page,
'Cannot move page {page} because it '
'does not exist on {site}.')
- token = self.tokens['move']
+ token = self.tokens['csrf']
self.lock_page(page)
req = self.simple_request(action='move',
noredirect=noredirect,
@@ -2332,7 +2400,7 @@
raise TypeError("'page' must be a FilePage not a '{}'"
.format(page.__class__.__name__))

- token = self.tokens['delete']
+ token = self.tokens['csrf']
params = {
'action': 'delete',
'token': token,
@@ -2404,7 +2472,7 @@
If None, restores all revisions.
:param fileids: List of fileids to restore.
"""
- token = self.tokens['delete']
+ token = self.tokens['csrf']
params = {
'action': 'undelete',
'title': page,
@@ -2489,7 +2557,7 @@
applied to all protections. If None, 'infinite', 'indefinite',
'never', or '' is given, there is no expiry.
"""
- token = self.tokens['protect']
+ token = self.tokens['csrf']
self.lock_page(page)

protections_list = [ptype + '=' + level
@@ -2570,7 +2638,7 @@
blocked.
:return: The data retrieved from the API request.
"""
- token = self.tokens['block']
+ token = self.tokens['csrf']
if expiry is False:
expiry = 'never'
req = self.simple_request(action='block', user=user.username,
@@ -2598,7 +2666,7 @@
"""
req = self.simple_request(action='unblock',
user=user.username,
- token=self.tokens['block'],
+ token=self.tokens['csrf'],
reason=reason)

data = req.submit()
@@ -2698,7 +2766,7 @@
# TODO: is there another way?
req = self._request(throttle=False,
parameters={'action': 'upload',
- 'token': self.tokens['edit']})
+ 'token': self.tokens['csrf']})
try:
req.submit()
except APIError as error:
diff --git a/pywikibot/site/_datasite.py b/pywikibot/site/_datasite.py
index 4d870d8..75dce37 100644
--- a/pywikibot/site/_datasite.py
+++ b/pywikibot/site/_datasite.py
@@ -285,7 +285,7 @@
params['bot'] = 1
if 'baserevid' in kwargs and kwargs['baserevid']:
params['baserevid'] = kwargs['baserevid']
- params['token'] = self.tokens['edit']
+ params['token'] = self.tokens['csrf']

for arg in kwargs:
if arg in ['clear', 'summary']:
@@ -317,7 +317,7 @@
'claim': json.dumps(claim.toJSON()),
'baserevid': entity.latest_revision_id,
'summary': summary,
- 'token': self.tokens['edit'],
+ 'token': self.tokens['csrf'],
'bot': bot,
}
req = self.simple_request(**params)
@@ -350,7 +350,7 @@
raise NoPageError(claim)
params = {'action': 'wbsetclaimvalue', 'claim': claim.snak,
'snaktype': snaktype, 'summary': summary, 'bot': bot,
- 'token': self.tokens['edit']}
+ 'token': self.tokens['csrf']}

if snaktype == 'value':
params['value'] = json.dumps(claim._formatValue())
@@ -377,7 +377,7 @@
raise NoPageError(claim)
params = {'action': 'wbsetclaim',
'claim': json.dumps(claim.toJSON()),
- 'token': self.tokens['edit'],
+ 'token': self.tokens['csrf'],
'baserevid': claim.on_item.latest_revision_id,
'summary': summary,
'bot': bot,
@@ -411,7 +411,7 @@
raise ValueError('The claim cannot have a source.')
params = {'action': 'wbsetreference', 'statement': claim.snak,
'baserevid': claim.on_item.latest_revision_id,
- 'summary': summary, 'bot': bot, 'token': self.tokens['edit']}
+ 'summary': summary, 'bot': bot, 'token': self.tokens['csrf']}

# build up the snak
if isinstance(source, list):
@@ -468,7 +468,7 @@
if (not new and hasattr(qualifier, 'hash')
and qualifier.hash is not None):
params['snakhash'] = qualifier.hash
- params['token'] = self.tokens['edit']
+ params['token'] = self.tokens['csrf']
# build up the snak
if qualifier.getSnakType() == 'value':
params['value'] = json.dumps(qualifier._formatValue())
@@ -505,7 +505,7 @@
'summary': summary,
'bot': bot,
'claim': '|'.join(claim.snak for claim in claims),
- 'token': self.tokens['edit'],
+ 'token': self.tokens['csrf'],
}

req = self.simple_request(**params)
@@ -534,7 +534,7 @@
'summary': summary, 'bot': bot,
'statement': claim.snak,
'references': '|'.join(source.hash for source in sources),
- 'token': self.tokens['edit'],
+ 'token': self.tokens['csrf'],
}

req = self.simple_request(**params)
@@ -564,7 +564,7 @@
'summary': summary,
'bot': bot,
'qualifiers': [qualifier.hash for qualifier in qualifiers],
- 'token': self.tokens['edit']
+ 'token': self.tokens['csrf']
}

req = self.simple_request(**params)
@@ -589,7 +589,7 @@
'totitle': page1.title(),
'fromsite': page2.site.dbName(),
'fromtitle': page2.title(),
- 'token': self.tokens['edit']
+ 'token': self.tokens['csrf']
}
if bot:
params['bot'] = 1
@@ -621,7 +621,7 @@
'fromid': from_item.getID(),
'toid': to_item.getID(),
'ignoreconflicts': ignore_conflicts,
- 'token': self.tokens['edit'],
+ 'token': self.tokens['csrf'],
'summary': summary,
}
if bot:
@@ -649,7 +649,7 @@
'action': 'wblmergelexemes',
'source': from_lexeme.getID(),
'target': to_lexeme.getID(),
- 'token': self.tokens['edit'],
+ 'token': self.tokens['csrf'],
'summary': summary,
}
if bot:
@@ -673,7 +673,7 @@
'action': 'wbcreateredirect',
'from': from_item.getID(),
'to': to_item.getID(),
- 'token': self.tokens['edit'],
+ 'token': self.tokens['csrf'],
'bot': bot,
}
req = self.simple_request(**params)
@@ -870,7 +870,7 @@
params.update(
{'baserevid': baserevid,
'action': action,
- 'token': self.tokens['edit'],
+ 'token': self.tokens['csrf'],
'bot': kwargs.pop('bot', True),
})
params.update(prepare_data(action, action_data))
@@ -940,7 +940,7 @@
'lexemeId': lexeme.getID(),
'data': json.dumps(form.toJSON()),
'bot': bot,
- 'token': self.tokens['edit'],
+ 'token': self.tokens['csrf'],
}
if baserevid:
params['baserevid'] = baserevid
@@ -965,7 +965,7 @@
'action': 'wblremoveform',
'id': form.getID(),
'bot': bot,
- 'token': self.tokens['edit'],
+ 'token': self.tokens['csrf'],
}
if baserevid:
params['baserevid'] = baserevid
@@ -995,7 +995,7 @@
'formId': form.getID(),
'data': json.dumps(data),
'bot': bot,
- 'token': self.tokens['edit'],
+ 'token': self.tokens['csrf'],
}
if baserevid:
params['baserevid'] = baserevid
diff --git a/pywikibot/site/_extensions.py b/pywikibot/site/_extensions.py
index 8232a9e..6c5cf58 100644
--- a/pywikibot/site/_extensions.py
+++ b/pywikibot/site/_extensions.py
@@ -65,7 +65,7 @@
# TODO: ensure that the 'echomarkread' action
# is supported by the site
kwargs = merge_unique_dicts(kwargs, action='echomarkread',
- token=self.tokens['edit'])
+ token=self.tokens['csrf'])
req = self.simple_request(**kwargs)
data = req.submit()
try:
diff --git a/pywikibot/site/_tokenwallet.py b/pywikibot/site/_tokenwallet.py
index 9e32f87..1ec829d 100644
--- a/pywikibot/site/_tokenwallet.py
+++ b/pywikibot/site/_tokenwallet.py
@@ -4,92 +4,132 @@
#
# Distributed under the terms of the MIT license.
#
-from pywikibot import debug
-from pywikibot.exceptions import Error
+from collections.abc import Container
+from typing import Any, Optional, TYPE_CHECKING
+
+from pywikibot.backports import Dict, List
+from pywikibot.tools import issue_deprecation_warning, deprecated
+
+if TYPE_CHECKING:
+ from pywikibot.site import APISite


-class TokenWallet:
+class TokenWallet(Container):

- """Container for tokens."""
+ """Container for tokens.

- def __init__(self, site) -> None:
- """Initializer.
+ You should not use this container class directly; use
+ :attr:`APISite.tokens<pywikibot.site._apisite.APISite.tokens>`
+ instead which gives access to the site's TokenWallet instance.
+ """

- :type site: pywikibot.site.APISite
- """
- self.site = site
- self._tokens = {}
- self.failed_cache = set() # cache unavailable tokens.
+ def __init__(self, site: 'APISite') -> None:
+ """Initializer."""
+ self.site: APISite = site
+ self._tokens: Dict[str, str] = {}
+ self._currentuser: Optional[str] = site.user()

- def load_tokens(self, types, all: bool = False) -> None:
- """
- Preload one or multiple tokens.
-
- :param types: the types of token.
- :type types: iterable
- :param all: load all available tokens, if None only if it can be done
- in one request.
- """
- if self.site.user() is None:
- self.site.login()
-
- self._tokens.setdefault(self.site.user(), {}).update(
- self.site.get_tokens(types, all=all))
-
- # Preload all only the first time.
- # When all=True types is extended in site.get_tokens().
- # Keys not recognised as tokens, are cached so they are not requested
- # any longer.
- if all is not False:
- for key in types:
- if key not in self._tokens[self.site.user()]:
- self.failed_cache.add((self.site.user(), key))
-
- def __getitem__(self, key):
+ def __getitem__(self, key: str) -> str:
"""Get token value for the given key."""
- if self.site.user() is None:
+ if self.site.user() is None and key != 'login':
self.site.login()

- user_tokens = self._tokens.setdefault(self.site.user(), {})
- # always preload all for users without tokens
- failed_cache_key = (self.site.user(), key)
+ if self.site.user() != self._currentuser:
+ self._currentuser = self.site.user()
+ self.clear()

- # redirect old tokens to be compatible with older MW version
+ if not self._tokens:
+ self._tokens = self.site.get_tokens([])
+
+ # Redirect old tokens which were used by outdated MediaWiki versions
+ # but show a FutureWarning for this usage:
# https://www.mediawiki.org/wiki/MediaWiki_1.37/Deprecation_of_legacy_API_token_parameters
- if self.site.mw_version >= '1.24wmf19' \
- and key in {'edit', 'delete', 'protect', 'move', 'block', 'unblock',
- 'email', 'import', 'options'}:
- debug('Token {!r} was replaced by {!r}'.format(key, 'csrf'))
+ if key in {'edit', 'delete', 'protect', 'move', 'block', 'unblock',
+ 'email', 'import', 'options'}:
+ issue_deprecation_warning(
+ f'Token {key!r}', "'csrf'", since='8.0.0')
key = 'csrf'

try:
- key = self.site.validate_tokens([key])[0]
- except IndexError:
- raise Error(
- "Requested token '{}' is invalid on {} wiki."
- .format(key, self.site))
+ token = self._tokens[key]
+ except KeyError:
+ raise KeyError(
+ f'Invalid token {key!r} for user {self._currentuser!r} on '
+ f'{self.site} wiki.') from None

- if (key not in user_tokens
- and failed_cache_key not in self.failed_cache):
- self.load_tokens([key], all=False if user_tokens else None)
-
- if key in user_tokens:
- return user_tokens[key]
- # token not allowed for self.site.user() on self.site
- self.failed_cache.add(failed_cache_key)
- # to be changed back to a plain KeyError?
- raise Error(
- "Action '{}' is not allowed for user {} on {} wiki."
- .format(key, self.site.user(), self.site))
+ return token

def __contains__(self, key) -> bool:
- """Return True if the given token name is cached."""
- return key in self._tokens.setdefault(self.site.user(), {})
+ """Return True if the token name is cached for the current user."""
+ try:
+ self[key]
+ except KeyError:
+ return False
+ return True

def __str__(self) -> str:
"""Return a str representation of the internal tokens dictionary."""
- return self._tokens.__str__()
+ return str(self._tokens)

def __repr__(self) -> str:
- """Return a representation of the internal tokens dictionary."""
- return self._tokens.__repr__()
+ """Return a representation of the TokenWallet.
+
+ >>> import pywikibot
+ >>> site = pywikibot.Site('wikipedia:test')
+ >>> repr(site.tokens)
+ "TokenWallet(pywikibot.Site('wikipedia:test'))"
+
+ .. versionchanged:: 8.0
+ Provide a string which looks like a valid Python expression.
+ """
+ user = f', user={self._currentuser!r}' if self._currentuser else ''
+ return (f'{type(self).__name__}'
+ f'(pywikibot.Site({self.site.sitename!r}{user}))')
+
+ def clear(self):
+ """Clear the self._tokens cache. Tokens are reloaded when needed.
+
+ .. versionadded:: 8.0
+ """
+ self._tokens.clear()
+
+ def update_tokens(self, tokens: List[str]) -> List[str]:
+ """Return a list of new tokens for a given list of tokens.
+
+ This method can be used if a token is outdated and has to be
+ renewed but the token type is unknown and we only have the old
+ token. It first gets the token names from all given tokens,
+ clears the cache and returns fresh new tokens of the found types.
+
+ **Usage:**
+
+ >>> import pywikibot
+ >>> site = pywikibot.Site()
+ >>> tokens = [site.tokens['csrf']] # doctest: +SKIP
+ >>> new_tokens = site.tokens.update_tokens(tokens) # doctest: +SKIP
+
+ .. code-block:: Python
+ :caption: An example for replacing request token parameters
+
+ r._params['token'] = r.site.tokens.update_tokens(r._params['token'])
+
+ .. versionadded:: 8.0
+ """
+ # find the token types
+ types = [key
+ for key, value in self._tokens.items() for token in tokens
+ if value == token]
+ self.clear() # clear the cache
+ return [self[token_type] for token_type in types]
+
+ @deprecated('clear()', since='8.0.0')
+ def load_tokens(self, *args: Any, **kwargs: Any) -> None:
+ """Clear cache to lazy load tokens when needed.
+
+ .. deprecated:: 8.0
+ Use :meth:`clear` instead.
+ .. versionchanged:: 8.0
+ Clear the cache instead of loading tokens. All parameters are
+ ignored.
+ """
+ self.clear()
diff --git a/pywikibot/site/_upload.py b/pywikibot/site/_upload.py
index 0d23428..2dd2c32 100644
--- a/pywikibot/site/_upload.py
+++ b/pywikibot/site/_upload.py
@@ -171,7 +171,7 @@

ignore_all_warnings = not callable(ignore_warnings) and ignore_warnings

- token = self.site.tokens['edit']
+ token = self.site.tokens['csrf']
result = None
file_page_title = self.filepage.title(with_ns=False)
file_size = None
diff --git a/scripts/change_pagelang.py b/scripts/change_pagelang.py
index 780007c..06817bf 100755
--- a/scripts/change_pagelang.py
+++ b/scripts/change_pagelang.py
@@ -60,11 +60,10 @@
:param page: The page to update and save
:type page: pywikibot.page.BasePage
"""
- token = self.site.get_tokens(['csrf']).get('csrf')
parameters = {'action': 'setpagelanguage',
'title': page.title(),
'lang': self.opt.setlang,
- 'token': token}
+ 'token': self.site.tokens['csrf']}
r = self.site.simple_request(**parameters)
r.submit()
pywikibot.info(f'<<lightpurple>>{page}<<default>>: Setting '
diff --git a/tests/aspects.py b/tests/aspects.py
index 8f47518..431a387 100644
--- a/tests/aspects.py
+++ b/tests/aspects.py
@@ -1404,7 +1404,8 @@
"""Test cases for deprecation function in the tools module."""

_generic_match = re.compile(
- r'.* is deprecated(?: for \d+ [^;]*)?(; use .* instead)?\.')
+ r'.* is deprecated(?: since release [\d.]+ [^;]*)?'
+ r'(; use .* instead)?\.')

source_adjustment_skips = [
unittest.case._AssertRaisesContext,
diff --git a/tests/token_tests.py b/tests/token_tests.py
index 5ba8cef..8accc51 100755
--- a/tests/token_tests.py
+++ b/tests/token_tests.py
@@ -5,15 +5,20 @@
#
# Distributed under the terms of the MIT license.
#
+import unittest
from contextlib import suppress

from pywikibot.exceptions import APIError, Error
from pywikibot.site import TokenWallet
-from pywikibot.tools import MediaWikiVersion
-from tests.aspects import DefaultSiteTestCase, TestCase, TestCaseBase, unittest
+from tests.aspects import (
+ DefaultSiteTestCase,
+ DeprecationTestCase,
+ TestCase,
+ TestCaseBase,
+)


-class TestSiteTokens(DefaultSiteTestCase):
+class TestSiteTokens(DeprecationTestCase, DefaultSiteTestCase):

"""Test cases for tokens in Site methods.

@@ -26,67 +31,22 @@

login = True

- def setUp(self):
- """Store version."""
- super().setUp()
- self.mysite = self.get_site()
- self._version = self.mysite.mw_version
- self.orig_version = self.mysite.version
-
- def tearDown(self):
- """Restore version."""
- super().tearDown()
- self.mysite.version = self.orig_version
-
- def _test_tokens(self, version, test_version, additional_token):
+ def test_tokens(self):
"""Test tokens."""
- if version and (self._version < version
- or self._version < test_version):
- raise unittest.SkipTest(
- 'Site {} version {} is too low for this tests.'
- .format(self.mysite, self._version))
-
- self.mysite.version = lambda: test_version
- del self.mysite._mw_version_time # remove cached mw_version
-
redirected_tokens = ['edit', 'move', 'delete']
- for ttype in redirected_tokens + ['patrol', additional_token]:
- try:
- token = self.mysite.tokens[ttype]
- except Error as error_msg:
- if self.mysite.validate_tokens([ttype]):
- pattern = ("Action '[a-z]+' is not allowed "
- 'for user .* on .* wiki.')
- else:
- pattern = "Requested token '[a-z]+' is invalid on .* wiki."
-
- self.assertRegex(str(error_msg), pattern)
-
- else:
- self.assertIsInstance(token, str)
- self.assertEqual(token, self.mysite.tokens[ttype])
- # test __contains__
- if test_version < '1.24wmf19':
- self.assertIn(ttype, self.mysite.tokens)
- elif ttype in redirected_tokens:
- self.assertEqual(self.mysite.tokens[ttype],
- self.mysite.tokens['csrf'])
-
- def test_tokens_in_mw_123_124wmf18(self):
- """Test ability to get page tokens."""
- if MediaWikiVersion(self.orig_version()) >= '1.37wmf24':
- self.skipTest('Site {} version {} is too new for this tests.'
- .format(self.mysite, self._version))
- self._test_tokens('1.23', '1.24wmf18', 'deleteglobalaccount')
-
- def test_tokens_in_mw_124wmf19(self):
- """Test ability to get page tokens."""
- self._test_tokens('1.24wmf19', '1.24wmf20', 'deleteglobalaccount')
+ for ttype in redirected_tokens + ['patrol', 'deleteglobalaccount']:
+ self.assertIsInstance(self.site.tokens[ttype], str)
+ self.assertIn(ttype, self.site.tokens) # test __contains__
+ if ttype in redirected_tokens:
+ self.assertEqual(self.site.tokens[ttype],
+ self.site.tokens['csrf'])
+ self._do_test_warning_filename = False
+ self.assertDeprecationParts(f'Token {ttype!r}', "'csrf'")

def test_invalid_token(self):
"""Test invalid token."""
- with self.assertRaises(Error):
- self.mysite.tokens['invalidtype']
+ with self.assertRaises(KeyError):
+ self.site.tokens['invalidtype']


class TokenTestBase(TestCaseBase):
@@ -100,11 +60,11 @@
ttype = self.token_type
try:
token = mysite.tokens[ttype]
- except Error as error_msg:
+ except KeyError as error_msg:
self.assertRegex(
str(error_msg),
- "Action '[a-z]+' is not allowed for user .* on .* wiki.")
- self.assertNotIn(self.token_type, self.site.tokens)
+ f'Invalid token {ttype!r} for user .+ on {mysite} wiki.')
+ self.assertNotIn(ttype, self.site.tokens)
self.skipTest(error_msg)

self.token = token

To view, visit change 840669. To unsubscribe, or for help writing mail filters, visit settings.

Gerrit-Project: pywikibot/core
Gerrit-Branch: master
Gerrit-Change-Id: I12102055da723545f0f41408363cb45732b47967
Gerrit-Change-Number: 840669
Gerrit-PatchSet: 18
Gerrit-Owner: Xqt <info@gno.de>
Gerrit-Reviewer: JJMC89 <JJMC89.Wikimedia@gmail.com>
Gerrit-Reviewer: Mpaa <mpaa.wiki@gmail.com>
Gerrit-Reviewer: Xqt <info@gno.de>
Gerrit-Reviewer: jenkins-bot
Gerrit-MessageType: merged