Xqt has submitted this change. ( https://gerrit.wikimedia.org/r/c/pywikibot/core/+/770146 )
Change subject: apisite: Add typing ......................................................................
apisite: Add typing
Change-Id: I5acc6664512cce786426a06915fdcbb072cd9ec3 --- M pywikibot/data/api.py M pywikibot/site/_apisite.py M pywikibot/site/_basesite.py M pywikibot/site/_siteinfo.py 4 files changed, 367 insertions(+), 258 deletions(-)
Approvals: Xqt: Verified; Looks good to me, approved
diff --git a/pywikibot/data/api.py b/pywikibot/data/api.py index f03af4d..70c7f9c 100644 --- a/pywikibot/data/api.py +++ b/pywikibot/data/api.py @@ -18,13 +18,13 @@ from email.mime.nonmultipart import MIMENonMultipart from inspect import getfullargspec from io import BytesIO -from typing import Optional, Union +from typing import Any, Optional, Union from urllib.parse import unquote, urlencode from warnings import warn
import pywikibot from pywikibot import config, login -from pywikibot.backports import Dict, Tuple, removeprefix +from pywikibot.backports import Callable, Dict, Match, Tuple, removeprefix from pywikibot.comms import http from pywikibot.exceptions import ( Error, @@ -551,7 +551,11 @@ """Return number of cached modules.""" return len(self._paraminfo)
- def parameter(self, module: str, param_name: str) -> Optional[dict]: + def parameter( + self, + module: str, + param_name: str + ) -> Optional[Dict[str, Any]]: """ Get details about one modules parameter.
@@ -1011,7 +1015,7 @@ raise ValueError("'action' specification missing from Request.") self.action = parameters['action'] self.update(parameters) # also convert all parameter values to lists - self._warning_handler = None + self._warning_handler = None # type: Optional[Callable[[str, str], Union[Match[str], bool, None]]] # noqa: E501 # Actions that imply database updates on the server, used for various # things like throttling or skipping actions when we're in simulation # mode @@ -2051,6 +2055,9 @@ kwargs['parameters'].update(mw_api_args) return kwargs
+ def set_maximum_items(self, value: Union[int, str, None]) -> None: + raise NotImplementedError +
class APIGenerator(_RequestWrapper):
diff --git a/pywikibot/site/_apisite.py b/pywikibot/site/_apisite.py index d79e8f5..df15811 100644 --- a/pywikibot/site/_apisite.py +++ b/pywikibot/site/_apisite.py @@ -11,10 +11,12 @@ from collections import OrderedDict, defaultdict, namedtuple from contextlib import suppress from textwrap import fill -from typing import Any, Optional, Union +from typing import Any, Iterable, Optional, Type, TypeVar, Union
import pywikibot -from pywikibot.backports import Dict, List +from pywikibot.backports import DefaultDict, Dict, List, Match +from pywikibot.backports import OrderedDict as OrderedDictType +from pywikibot.backports import Pattern, Set, Tuple from pywikibot.comms.http import get_authentication from pywikibot.data import api from pywikibot.exceptions import ( @@ -62,7 +64,7 @@ ) from pywikibot.site._generators import GeneratorsMixin from pywikibot.site._interwikimap import _InterwikiMap -from pywikibot.site._namespace import Namespace +from pywikibot.site._namespace import Namespace, NamespaceArgType from pywikibot.site._siteinfo import Siteinfo from pywikibot.site._tokenwallet import TokenWallet from pywikibot.site._upload import Uploader @@ -76,7 +78,11 @@
__all__ = ('APISite', ) _logger = 'wiki.apisite' -_mw_msg_cache = defaultdict(dict) +_mw_msg_cache = defaultdict(dict) # type: DefaultDict[str, Dict[str, str]] + + +_CompType = Union[int, str, 'pywikibot.page.Page', 'pywikibot.page.Revision'] +_RequestWrapperT = TypeVar('_RequestWrapperT', bound='api._RequestWrapper')
class APISite( @@ -101,31 +107,36 @@ Do not instantiate directly; use :py:obj:`pywikibot.Site` function. """
- def __init__(self, code, fam=None, user=None) -> None: + def __init__( + self, + code: str, + fam: Union[str, 'pywikibot.family.Family', None] = None, + user: Optional[str] = None + ) -> None: """Initializer.""" super().__init__(code, fam, user) - self._globaluserinfo = {} + self._globaluserinfo = {} # type: Dict[Union[int, str], Any] self._interwikimap = _InterwikiMap(self) self._loginstatus = _LoginStatus.NOT_ATTEMPTED - self._msgcache = {} + self._msgcache = {} # type: Dict[str, str] self._paraminfo = api.ParamInfo(self) self._siteinfo = Siteinfo(self) self.tokens = TokenWallet(self)
- def __getstate__(self): + def __getstate__(self) -> Dict[str, Any]: """Remove TokenWallet before pickling, for security reasons.""" new = super().__getstate__() del new['tokens'] del new['_interwikimap'] return new
- def __setstate__(self, attrs) -> None: + def __setstate__(self, attrs: Dict[str, Any]) -> None: """Restore things removed in __getstate__.""" super().__setstate__(attrs) self._interwikimap = _InterwikiMap(self) self.tokens = TokenWallet(self)
- def interwiki(self, prefix): + def interwiki(self, prefix: str) -> BaseSite: """ Return the site for a corresponding interwiki prefix.
@@ -135,7 +146,7 @@ """ return self._interwikimap[prefix].site
- def interwiki_prefix(self, site): + def interwiki_prefix(self, site: BaseSite) -> List[str]: """ Return the interwiki prefixes going to that site.
@@ -145,9 +156,6 @@ function).
:param site: The targeted site, which might be it's own. - :type site: :py:obj:`BaseSite` - :return: The interwiki prefixes - :rtype: list (guaranteed to be not empty) :raises KeyError: if there is no interwiki prefix for that site. """ assert site is not None, 'Site must not be None' @@ -159,7 +167,7 @@ "There is no interwiki prefix to '{}'".format(site)) return sorted(prefixes, key=lambda p: (len(p), p))
- def local_interwiki(self, prefix): + def local_interwiki(self, prefix: str) -> bool: """ Return whether the interwiki prefix is local.
@@ -174,16 +182,17 @@ return self._interwikimap[prefix].local
@classmethod - def fromDBName(cls, dbname, site=None): # noqa: N802 + def fromDBName( # noqa: N802 + cls, + dbname: str, + site: Optional[BaseSite] = None + ) -> BaseSite: """ Create a site from a database name using the sitematrix.
:param dbname: database name - :type dbname: str :param site: Site to load sitematrix from. (Default meta.wikimedia.org) - :type site: pywikibot.site.APISite :return: site object for the database name - :rtype: pywikibot.site.APISite """ # TODO this only works for some WMF sites if not site: @@ -196,19 +205,26 @@ continue if 'code' in val: lang = val['code'] - for site in val['site']: - if site['dbname'] == dbname: - if site['code'] == 'wiki': - site['code'] = 'wikipedia' - return pywikibot.Site(lang, site['code']) + for m_site in val['site']: + if m_site['dbname'] == dbname: + if m_site['code'] == 'wiki': + m_site['code'] = 'wikipedia' + return pywikibot.Site(lang, m_site['code']) else: # key == 'specials' - for site in val: - if site['dbname'] == dbname: - return pywikibot.Site(url=site['url'] + '/w/index.php') + for m_site in val: + if m_site['dbname'] == dbname: + url = m_site['url'] + '/w/index.php' + return pywikibot.Site(url=url) raise ValueError('Cannot parse a site out of {}.'.format(dbname))
- def _generator(self, gen_class, type_arg: Optional[str] = None, - namespaces=None, total: Optional[int] = None, **args): + def _generator( + self, + gen_class: Type[_RequestWrapperT], + type_arg: Optional[str] = None, + namespaces: NamespaceArgType = None, + total: Optional[int] = None, + **args: Any + ) -> _RequestWrapperT: """Convenience method that returns an API generator.
All generic keyword arguments are passed as MW API parameter @@ -221,18 +237,14 @@ constructor unchanged (not all types require this) :param namespaces: if not None, limit the query to namespaces in this list - :type namespaces: iterable of str or Namespace key, - or a single instance of those types. May be a '|' separated - list of namespace identifiers. :param total: if not None, limit the generator to yielding this many items in total :return: iterable with parameters set - :rtype: _RequestWrapper :raises KeyError: a namespace identifier was not resolved :raises TypeError: a namespace identifier has an inappropriate type such as NoneType or bool """ - req_args = {'site': self} + req_args = {'site': self} # type: Dict[str, Any] if 'g_content' in args: req_args['g_content'] = args.pop('g_content') if 'parameters' in args: @@ -249,7 +261,7 @@ return gen
@staticmethod - def _request_class(kwargs): + def _request_class(kwargs: Dict[str, Any]) -> Type[api.Request]: """ Get the appropriate class.
@@ -263,7 +275,7 @@ return api.CachedRequest return api.Request
- def _request(self, **kwargs): + def _request(self, **kwargs: Any) -> api.Request: """Create a request by forwarding all parameters directly.""" if 'expiry' in kwargs and kwargs['expiry'] is None: del kwargs['expiry'] @@ -271,11 +283,11 @@ return self._request_class(kwargs)(site=self, **kwargs)
@deprecated('simple_request', since='7.1.0') - def _simple_request(self, **kwargs): + def _simple_request(self, **kwargs: Any) -> api.Request: """DEPRECATED. Create a request using all kwargs as parameters.""" return self.simple_request(**kwargs)
- def simple_request(self, **kwargs): + def simple_request(self, **kwargs: Any) -> api.Request: """Create a request by defining all kwargs as parameters.
.. versionchanged:: 7.1 @@ -289,8 +301,6 @@
The expected usernames are those provided as the user parameter at instantiation. - - :rtype: bool """ if not hasattr(self, '_userinfo'): return False @@ -305,16 +315,16 @@
return True
- def is_oauth_token_available(self): - """ - Check whether OAuth token is set for this site. - - :rtype: bool - """ + def is_oauth_token_available(self) -> bool: + """Check whether OAuth token is set for this site.""" auth_token = get_authentication(self.base_url('')) return auth_token is not None and len(auth_token) == 4
- def login(self, autocreate: bool = False, user: Optional[str] = None): + def login( + self, + autocreate: bool = False, + user: Optional[str] = None + ) -> None: """ Log the user in if not already logged in.
@@ -443,19 +453,20 @@ api._invalidate_superior_cookies(self.family)
@property - def maxlimit(self): + def maxlimit(self) -> int: """Get the maximum limit of pages to be retrieved.
.. versionadded:: 7.0 """ parameter = self._paraminfo.parameter('query+info', 'prop') + assert parameter is not None if self.logged_in() and self.has_right('apihighlimits'): return int(parameter['highlimit'])
return int(parameter['limit']) # T78333, T161783
@property - def userinfo(self): + def userinfo(self) -> Dict[str, Any]: """Retrieve userinfo from site and store in _userinfo attribute.
To force retrieving userinfo ignoring cache, just delete this @@ -521,9 +532,10 @@
:raises TypeError: Inappropriate argument type of 'user' """ + param = {} # type: Dict[str, Union[int, str]] if user is None: user = self.username() - param = {} + assert isinstance(user, str) elif isinstance(user, str): param = {'guiuser': user} elif isinstance(user, int): @@ -567,8 +579,10 @@
..versionadded:: 7.0 """ + username = self.username() + assert username is not None with suppress(KeyError): - del self._globaluserinfo[self.username()] + del self._globaluserinfo[username]
def is_blocked(self, force: bool = False) -> bool: """Return True when logged in user is blocked. @@ -599,7 +613,7 @@ """ return 'locked' in self.get_globaluserinfo(user, force)
- def get_searched_namespaces(self, force: bool = False): + def get_searched_namespaces(self, force: bool = False) -> Set[Namespace]: """ Retrieve the default searched namespaces for the user.
@@ -609,7 +623,6 @@
:param force: Whether the cache should be discarded. :return: The namespaces which are searched by default. - :rtype: ``set`` of :py:obj:`Namespace` """ # TODO: Integrate into _userinfo if (force or not hasattr(self, '_useroptions') @@ -624,7 +637,7 @@ "API userinfo response lacks 'query' key" assert 'userinfo' in uidata['query'], \ "API userinfo response lacks 'userinfo' key" - self._useroptions = uidata['query']['userinfo']['options'] + self._useroptions = uidata['query']['userinfo']['options'] # type: Dict[str, Any] # noqa: E501 # To determine if user name has changed self._useroptions['_name'] = ( None if 'anon' in uidata['query']['userinfo'] else @@ -633,9 +646,9 @@ and self._useroptions['searchNs{}'.format(ns.id)] in ['1', True]}
- @property + @property # type: ignore[misc] @deprecated('articlepath', since='7.0.0') - def article_path(self): + def article_path(self) -> str: """Get the nice article path without $1.
.. deprecated:: 7.0 @@ -644,7 +657,7 @@ return self.articlepath[:-2]
@property - def articlepath(self): + def articlepath(self) -> str: """Get the nice article path with placeholder.
.. versionadded:: 7.0 @@ -656,19 +669,29 @@ return path.replace('$1', '{}')
@staticmethod - def assert_valid_iter_params(msg_prefix, start, end, reverse: bool, - is_ts: bool = True) -> None: + def assert_valid_iter_params( + msg_prefix: str, + start: Union[datetime.datetime, int, str], + end: Union[datetime.datetime, int, str], + reverse: bool, + is_ts: bool = True + ) -> None: """Validate iterating API parameters.
:param msg_prefix: The calling method name - :type msg_prefix: str :param start: The start value to compare :param end: The end value to compare :param reverse: The reverse option :param is_ts: When comparing timestamps (with is_ts=True) the start is usually greater than end. Comparing titles this is vice versa. - :raises AssertionError: start/end values are in wrong order + :raises AssertionError: start/end values are not comparable types or + are in the wrong order """ + if not (isinstance(end, type(start)) or isinstance(start, type(end))): + raise TypeError( + 'start ({!r}) and end ({!r}) must be comparable' + .format(start, end) + ) if reverse ^ is_ts: low, high = end, start order = 'follow' @@ -678,22 +701,25 @@ msg = ('{method}: "start" must {order} "end" ' 'with reverse={reverse} and is_ts={is_ts} ' 'but "start" is "{start}" and "end" is "{end}".') - assert low < high, fill(msg.format(method=msg_prefix, order=order, - start=start, end=end, - reverse=reverse, is_ts=is_ts)) + assert low < high, fill(msg.format( # type: ignore[operator] + method=msg_prefix, + order=order, + start=start, + end=end, + reverse=reverse, + is_ts=is_ts))
- def has_right(self, right): + def has_right(self, right: str) -> bool: """Return true if and only if the user has a specific right.
Possible values of 'right' may vary depending on wiki settings. https://www.mediawiki.org/wiki/API:Userinfo
:param right: a specific right to be validated - :type right: str """ return right.lower() in self.userinfo['rights']
- def has_group(self, group): + def has_group(self, group: str) -> bool: """Return true if and only if the user is a member of specified group.
Possible values of 'group' may vary depending on wiki settings, @@ -702,11 +728,15 @@ """ return group.lower() in self.userinfo['groups']
- def messages(self): + def messages(self) -> bool: """Return true if the user has new messages, and false otherwise.""" return 'messages' in self.userinfo
- def mediawiki_messages(self, keys, lang: Optional[str] = None): + def mediawiki_messages( + self, + keys: Iterable[str], + lang: Optional[str] = None + ) -> OrderedDictType[str, str]: """Fetch the text of a set of MediaWiki messages.
The returned dict uses each key to store the associated message. @@ -714,9 +744,7 @@ :see: https://www.mediawiki.org/wiki/API:Allmessages
:param keys: MediaWiki messages to fetch - :type keys: iterable of str :param lang: a language code, default is self.lang - :rtype: OrderedDict """ amlang = lang or self.lang if not all(amlang in _mw_msg_cache @@ -744,37 +772,39 @@
return OrderedDict((key, _mw_msg_cache[amlang][key]) for key in keys)
- def mediawiki_message(self, key, lang=None) -> str: + def mediawiki_message( + self, + key: str, + lang: Optional[str] = None + ) -> str: """Fetch the text for a MediaWiki message.
:param key: name of MediaWiki message - :type key: str :param lang: a language code, default is self.lang - :type lang: str or None """ return self.mediawiki_messages([key], lang=lang)[key]
- def has_mediawiki_message(self, key, lang=None): + def has_mediawiki_message( + self, + key: str, + lang: Optional[str] = None + ) -> bool: """Determine if the site defines a MediaWiki message.
:param key: name of MediaWiki message - :type key: str :param lang: a language code, default is self.lang - :type lang: str or None - - :rtype: bool """ return self.has_all_mediawiki_messages([key], lang=lang)
- def has_all_mediawiki_messages(self, keys, lang=None) -> bool: + def has_all_mediawiki_messages( + self, + keys: Iterable[str], + lang: Optional[str] = None + ) -> bool: """Confirm that the site defines a set of MediaWiki messages.
:param keys: names of MediaWiki messages - :type keys: iterable of str :param lang: a language code, default is self.lang - :type lang: str or None - - :rtype: bool """ try: self.mediawiki_messages(keys, lang=lang) @@ -783,14 +813,13 @@ return True
@property - def months_names(self): + def months_names(self) -> List[Tuple[str, str]]: """Obtain month names from the site messages.
The list is zero-indexed, ordered by month in calendar, and should be in the original site language.
:return: list of tuples (month name, abbreviation) - :rtype: list """ if hasattr(self, '_months_names'): return self._months_names @@ -804,7 +833,7 @@
months = self.mediawiki_messages(months_long + months_short)
- self._months_names = [] + self._months_names = [] # type: List[Tuple[str, str]] for m_l, m_s in zip(months_long, months_short): self._months_names.append((months[m_l], months[m_s]))
@@ -835,7 +864,12 @@ return msgs['comma-separator'].join( args[:-2] + [concat.join(args[-2:])])
- def expand_text(self, text: str, title=None, includecomments=None) -> str: + def expand_text( + self, + text: str, + title: Optional[str] = None, + includecomments: Optional[bool] = None + ) -> str: """Parse the given text for preprocessing and rendering.
e.g expand templates and strip comments if includecomments @@ -844,11 +878,8 @@ magic parser words like {{CURRENTTIMESTAMP}}.
:param text: text to be expanded - :type text: str :param title: page title without section - :type title: str :param includecomments: if True do not strip comments - :type includecomments: bool """ if not isinstance(text, str): raise ValueError('text must be a string') @@ -866,19 +897,18 @@ key = '*' return req.submit()['expandtemplates'][key]
- def getcurrenttimestamp(self): + def getcurrenttimestamp(self) -> str: """ Return the server time as a MediaWiki timestamp string.
It calls :py:obj:`server_time` first so it queries the server to get the current server time.
- :return: the server time - :rtype: str (as 'yyyymmddhhmmss') + :return: the server time (as 'yyyymmddhhmmss') """ return self.server_time().totimestampformat()
- def server_time(self): + def server_time(self) -> 'pywikibot.Timestamp': """ Return a Timestamp object representing the current server time.
@@ -886,12 +916,11 @@ reload before returning the time.
:return: the current server time - :rtype: :py:obj:`Timestamp` """ return pywikibot.Timestamp.fromISOformat( self.siteinfo.get('time', expiry=True))
- def getmagicwords(self, word): + def getmagicwords(self, word: str) -> List[str]: """Return list of localized "word" magic words for the site.""" if not hasattr(self, '_magicwords'): magicwords = self.siteinfo.get('magicwords', cache=False) @@ -902,18 +931,18 @@ return self._magicwords[word] return [word]
- def redirect(self): + def redirect(self) -> str: """Return the localized #REDIRECT keyword.""" # return the magic word without the preceding '#' character return self.getmagicwords('redirect')[0].lstrip('#')
@deprecated('redirect_regex', since='5.5.0') - def redirectRegex(self): # noqa: N802 + def redirectRegex(self) -> Pattern[str]: # noqa: N802 """Return a compiled regular expression matching on redirect pages.""" return self.redirect_regex
@property - def redirect_regex(self): + def redirect_regex(self) -> Pattern[str]: """Return a compiled regular expression matching on redirect pages.
Group 1 in the regex match object will be the target title. @@ -929,15 +958,15 @@ pattern = None return super().redirectRegex(pattern)
- def pagenamecodes(self): + def pagenamecodes(self) -> List[str]: """Return list of localized PAGENAME tags for the site.""" return self.getmagicwords('pagename')
- def pagename2codes(self): + def pagename2codes(self) -> List[str]: """Return list of localized PAGENAMEE tags for the site.""" return self.getmagicwords('pagenamee')
- def _build_namespaces(self): + def _build_namespaces(self) -> Dict[int, Namespace]: _namespaces = {}
for nsdata in self.siteinfo.get('namespaces', cache=False).values(): @@ -959,7 +988,7 @@ namespace = Namespace(ns, canonical_name, custom_name, **nsdata) _namespaces[ns] = namespace
- for item in self.siteinfo.get('namespacealiases'): ns = int(item['id']) try: namespace = _namespaces[ns] @@ -973,13 +1002,11 @@
return _namespaces
- def has_extension(self, name) -> bool: + def has_extension(self, name: str) -> bool: """Determine whether extension `name` is loaded.
:param name: The extension to check for, case sensitive - :type name: str :return: If the extension is loaded - :rtype: bool """ extensions = self.siteinfo['extensions'] for ext in extensions: @@ -988,16 +1015,16 @@ return False
@property - def siteinfo(self): + def siteinfo(self) -> Siteinfo: """Site information dict.""" return self._siteinfo
- def dbName(self): # noqa: N802 + def dbName(self) -> str: # noqa: N802 """Return this site's internal id.""" return self.siteinfo['wikiid']
@property - def lang(self): + def lang(self) -> str: """Return the code for the language of this Site.""" return self.siteinfo['lang']
@@ -1023,30 +1050,33 @@ return version
@property - def mw_version(self): + def mw_version(self) -> MediaWikiVersion: """Return self.version() as a MediaWikiVersion object.
Cache the result for 24 hours. - :rtype: MediaWikiVersion """ mw_ver, cache_time = getattr(self, '_mw_version_time', (None, None)) - if mw_ver is None or time.time() - cache_time > 60 * 60 * 24: + if ( + mw_ver is None + or cache_time is None + or time.time() - cache_time > 60 * 60 * 24 + ): mw_ver = MediaWikiVersion(self.version()) self._mw_version_time = mw_ver, time.time() return mw_ver
@property - def has_image_repository(self): + def has_image_repository(self) -> bool: """Return True if site has a shared image repository like Commons.""" code, fam = self.shared_image_repository() return bool(code or fam)
@property - def has_data_repository(self): + def has_data_repository(self) -> bool: """Return True if site has a shared data repository like Wikidata.""" return self.data_repository() is not None
- def image_repository(self): + def image_repository(self) -> Optional[BaseSite]: """Return Site object for image repository e.g. commons.""" code, fam = self.shared_image_repository() if bool(code or fam): @@ -1054,14 +1084,16 @@
return None
- def data_repository(self): + def data_repository(self) -> Optional['pywikibot.site.DataSite']: """ Return the data repository connected to this site.
:return: The data repository if one is connected or None otherwise. - :rtype: pywikibot.site.DataSite or None """ - def handle_warning(mod, warning): + def handle_warning( + mod: str, + warning: str + ) -> Union[Match[str], bool, None]: return (mod == 'query' and re.match( r'Unrecognized value for parameter [\'"]meta[\'"]: wikibase', warning)) @@ -1084,24 +1116,25 @@ assert 'warnings' in data return None
- def is_image_repository(self): + def is_image_repository(self) -> bool: """Return True if Site object is the image repository.""" return self is self.image_repository()
- def is_data_repository(self): + def is_data_repository(self) -> bool: """Return True if its data repository is itself.""" # fixme: this was an identity check return self == self.data_repository()
- def page_from_repository(self, item): + def page_from_repository( + self, + item: str + ) -> Optional['pywikibot.page.Page']: """ Return a Page for this site object specified by Wikibase item.
:param item: id number of item, "Q###", - :type item: str :return: Page, or Category object given by Wikibase item number for this site object. - :rtype: pywikibot.Page or None
:raises pywikibot.exceptions.UnknownExtensionError: site has no Wikibase extension @@ -1125,12 +1158,12 @@ page = pywikibot.Category(page) return page
- def nice_get_address(self, title): + def nice_get_address(self, title: str) -> str: """Return shorter URL path to retrieve page titled 'title'.""" # 'title' is expected to be URL-encoded already return self.siteinfo['articlepath'].replace('$1', title)
- def namespace(self, num: int, all: bool = False): + def namespace(self, num: int, all: bool = False) -> Union[str, Namespace]: """Return string containing local name of namespace 'num'.
If optional argument 'all' is true, return all recognized @@ -1140,17 +1173,21 @@ :param all: If True return a Namespace object. Otherwise return the namespace name. :return: local name or Namespace object - :rtype: str or Namespace """ if all: return self.namespaces[num] return self.namespaces[num][0]
- def _update_page(self, page, query, verify_imageinfo: bool = False): + def _update_page( + self, + page: 'pywikibot.page.BasePage', + query: api.PropertyGenerator, + verify_imageinfo: bool = False + ) -> None: """Update page attributes.
:param page: page object to be updated - :param query: a api.QueryGenerator + :param query: API query generator :param verify_imageinfo: if given, every pageitem is checked whether 'imageinfo' is missing. In that case an exception is raised. @@ -1170,7 +1207,11 @@ raise PageRelatedError( page, 'loadimageinfo: Query on {} returned no imageinfo')
- def loadpageinfo(self, page, preload: bool = False) -> None: + def loadpageinfo( + self, + page: 'pywikibot.page.BasePage', + preload: bool = False + ) -> None: """Load page info from api and store in page attributes.
:see: https://www.mediawiki.org/wiki/API:Info @@ -1186,7 +1227,7 @@ inprop=inprop) self._update_page(page, query)
- def loadpageprops(self, page) -> None: + def loadpageprops(self, page: 'pywikibot.page.BasePage') -> None: """Load page props for the given page.""" title = page.title(with_section=False) query = self._generator(api.PropertyGenerator, @@ -1195,8 +1236,14 @@ ) self._update_page(page, query)
- def loadimageinfo(self, page, history: bool = False, - url_width=None, url_height=None, url_param=None) -> None: + def loadimageinfo( + self, + page: 'pywikibot.page.FilePage', + history: bool = False, + url_width: Optional[int] = None, + url_height: Optional[int] = None, + url_param: Optional[str] = None + ) -> None: """Load image info from api and save in page attributes.
Parameters correspond to iiprops in: @@ -1210,39 +1257,41 @@ :param url_param: see iiurlparam in [1]
""" - title = page.title(with_section=False) - args = {'titles': title, + args = {'titles': page.title(with_section=False), 'iiurlwidth': url_width, 'iiurlheight': url_height, 'iiurlparam': url_param, + 'iiprop': ['timestamp', 'user', 'comment', 'url', 'size', + 'sha1', 'mime', 'metadata', 'archivename'] } if not history: args['total'] = 1 query = self._generator(api.PropertyGenerator, type_arg='imageinfo', - iiprop=['timestamp', 'user', 'comment', - 'url', 'size', 'sha1', 'mime', - 'metadata', 'archivename'], **args) self._update_page(page, query, verify_imageinfo=True)
- def page_restrictions(self, page): + def page_restrictions( + self, + page: 'pywikibot.page.BasePage' + ) -> Dict[str, Tuple[str, str]]: """Return a dictionary reflecting page protections.""" if not hasattr(page, '_protection'): self.loadpageinfo(page) return page._protection
- def page_can_be_edited(self, page, action: str = 'edit') -> bool: + def page_can_be_edited( + self, + page: 'pywikibot.page.BasePage', + action: str = 'edit' + ) -> bool: """Determine if the page can be modified.
Return True if the bot has the permission of needed restriction level for the given action type.
- :param page: a pywikibot.Page object - :type page: pywikibot.Page + :param page: a pywikibot.page.BasePage object :param action: a valid restriction type like 'edit', 'move' - :type action: str - :rtype: bool
:raises ValueError: invalid action parameter """ @@ -1262,21 +1311,22 @@ return True return False
- def page_isredirect(self, page): + def page_isredirect(self, page: 'pywikibot.page.BasePage') -> bool: """Return True if and only if page is a redirect.""" if not hasattr(page, '_isredir'): page._isredir = False # bug T56684 self.loadpageinfo(page) return page._isredir
- def getredirtarget(self, page): + def getredirtarget( + self, + page: 'pywikibot.page.BasePage' + ) -> 'pywikibot.page.Page': """ Return page object for the redirect target of page.
:param page: page to search redirects for - :type page: pywikibot.page.BasePage :return: redirect target of page - :rtype: pywikibot.Page
:raises pywikibot.exceptions.IsNotRedirectPageError: page is not a redirect @@ -1376,10 +1426,15 @@ Valid tokens depend on mw version. """ query = 'tokens' if self.mw_version < '1.24wmf19' else 'query+tokens' - token_types = self._paraminfo.parameter(query, 'type')['type'] - return [token for token in types if token in token_types] + data = self._paraminfo.parameter(query, 'type') + assert data is not None + return [token for token in types if token in data['type']]
- def get_tokens(self, types: List[str], all: bool = False) -> dict: + def get_tokens( + self, + types: List[str], + all: bool = False + ) -> Dict[str, str]: """Preload one or multiple tokens.
For MediaWiki version 1.23, only one token can be retrieved at once. @@ -1407,7 +1462,7 @@
return: a dict with retrieved valid tokens. """ - def warn_handler(mod, text): + def warn_handler(mod: str, text: str) -> Optional[Match[str]]: """Filter warnings for not available tokens.""" return re.match( r'Action \'\w+\' is not allowed for the current user', text) @@ -1415,16 +1470,16 @@ user_tokens = {} if self.mw_version < '1.24wmf19': if all is not False: - types_wiki = self._paraminfo.parameter('tokens', - 'type')['type'] - types.extend(types_wiki) + pdata = self._paraminfo.parameter('tokens', 'type') + assert pdata is not None + types.extend(pdata['type']) req = self.simple_request(action='tokens', type=self.validate_tokens(types)) else: if all is not False: - types_wiki = self._paraminfo.parameter('query+tokens', - 'type')['type'] - types.extend(types_wiki) + pdata = self._paraminfo.parameter('query+tokens', 'type') + assert pdata is not None + types.extend(pdata['type'])
req = self.simple_request(action='query', meta='tokens', type=self.validate_tokens(types)) @@ -1443,7 +1498,7 @@ return user_tokens
# TODO: expand support to other parameters of action=parse? - def get_parsed_page(self, page: 'pywikibot.Page') -> str: + def get_parsed_page(self, page: 'pywikibot.page.BasePage') -> str: """Retrieve parsed text of the page using action=parse.
.. versionchanged:: 7.1 @@ -1461,7 +1516,7 @@ raise KeyError('API parse response lacks {} key'.format(e)) return parsed_text
- def getcategoryinfo(self, category) -> None: + def getcategoryinfo(self, category: 'pywikibot.page.Category') -> None: """Retrieve data on contents of category.
:see: https://www.mediawiki.org/wiki/API:Categoryinfo @@ -1472,7 +1527,10 @@ titles=cititle.encode(self.encoding())) self._update_page(category, ciquery)
- def categoryinfo(self, category): + def categoryinfo( + self, + category: 'pywikibot.page.Category' + ) -> Dict[str, int]: """Retrieve data on contents of category.""" if not hasattr(category, '_catinfo'): self.getcategoryinfo(category) @@ -1482,19 +1540,28 @@ 'subcats': 0} return category._catinfo
- def isBot(self, username): # noqa: N802 + def isBot(self, username: str) -> bool: # noqa: N802 """Return True is username is a bot user.""" return username in (userdata['name'] for userdata in self.botusers())
@property - def logtypes(self): + def logtypes(self) -> Set[str]: """Return a set of log types available on current site.""" - return set(filter(None, self._paraminfo.parameter( - 'query+logevents', 'type')['type'])) + data = self._paraminfo.parameter('query+logevents', 'type') + assert data is not None + return set(filter(None, data['type']))
@need_right('deleterevision') - def deleterevs(self, targettype: str, ids, *, - hide=None, show=None, reason: str = '', target=None): + def deleterevs( + self, + targettype: str, + ids: Union[int, str, List[Union[int, str]]], + *, + hide: Union[str, List[str], None] = None, + show: Union[str, List[str], None] = None, + reason: str = '', + target: Union['pywikibot.page.Page', str, None] = None + ) -> None: """Delete or undelete specified page revisions, file versions or logs.
:see: https://www.mediawiki.org/wiki/API:Revisiondelete @@ -1507,13 +1574,10 @@ :param targettype: Type of target. One of "archive", "filearchive", "logging", "oldimage", "revision". :param ids: Identifiers for the revision, log, file version or archive. - :type ids: int, str, or list of int or str :param hide: What to delete. Can be "comment", "content", "user" or a combination of them in pipe-separate form such as "comment|user". - :type hide: str or list of str :param show: What to undelete. Can be "comment", "content", "user" or a combination of them in pipe-separate form such as "comment|user". - :type show: str or list of str :param reason: Deletion reason. :param target: Page object or page title, if required for the type. """ @@ -1595,22 +1659,22 @@ 'titleblacklist-forbidden': TitleblacklistError, 'spamblacklist': SpamblacklistError, 'abusefilter-disallowed': AbuseFilterDisallowedError, - } + } # type: Dict[str, Union[str, Type[PageSaveRelatedError]]] _ep_text_overrides = {'appendtext', 'prependtext', 'undo'}
@need_right('edit') def editpage( self, - page, - summary=None, + page: 'pywikibot.page.BasePage', + summary: Optional[str] = None, minor: bool = True, notminor: bool = False, bot: bool = True, recreate: bool = True, createonly: bool = False, nocreate: bool = False, - watch=None, - **kwargs + watch: Optional[str] = None, + **kwargs: Any ) -> bool: """Submit an edit to be saved to the wiki.
@@ -1837,8 +1901,13 @@
@need_right('mergehistory') @need_version('1.27.0-wmf.13') - def merge_history(self, source, dest, timestamp=None, - reason: Optional[str] = None): + def merge_history( + self, + source: 'pywikibot.page.BasePage', + dest: 'pywikibot.page.BasePage', + timestamp: Optional['pywikibot.Timestamp'] = None, + reason: Optional[str] = None + ) -> None: """Merge revisions from one page into another.
:see: https://www.mediawiki.org/wiki/API:Mergehistory @@ -1849,13 +1918,10 @@ revisions must be dated before the earliest dest revision).
:param source: Source page from which revisions will be merged - :type source: pywikibot.Page :param dest: Destination page to which revisions will be merged - :type dest: pywikibot.Page :param timestamp: Revisions from this page dating up to this timestamp will be merged into the destination page (if not given or False, all revisions will be merged) - :type timestamp: pywikibot.Timestamp :param reason: Optional reason for the history merge """ # Data for error messages @@ -1880,8 +1946,9 @@
if source == dest: # Same pages raise PageSaveRelatedError( - 'Cannot merge revisions of {source} to itself' - .format_map(errdata)) + page=source, + message='Cannot merge revisions of {page} to itself' + )
# Send the merge API request token = self.tokens['csrf'] @@ -1949,11 +2016,17 @@ '[[{newtitle}]] file extension does not match content of ' '[[{oldtitle}]]', 'missingtitle': "{oldtitle} doesn't exist", - } + } # type: Dict[str, Union[str, OnErrorExc]]
@need_right('move') - def movepage(self, page, newtitle: str, summary, movetalk: bool = True, - noredirect: bool = False): + def movepage( + self, + page: 'pywikibot.page.BasePage', + newtitle: str, + summary: str, + movetalk: bool = True, + noredirect: bool = False + ) -> 'pywikibot.page.Page': """Move a Page to a new title.
:see: https://www.mediawiki.org/wiki/API:Move @@ -1965,7 +2038,6 @@ :param noredirect: if True, suppress creation of a redirect from the old title to the new one :return: Page object with the new title - :rtype: pywikibot.Page """ oldtitle = page.title(with_section=False) newlink = pywikibot.Link(newtitle, self) @@ -2002,7 +2074,7 @@ _logger) if err.code in self._mv_errors: on_error = self._mv_errors[err.code] - if hasattr(on_error, 'exception'): + if not isinstance(on_error, str): # LockedPageError can be raised both if "from" or "to" page # are locked for the user. # Both pages locked is not considered @@ -2056,7 +2128,11 @@ } # other errors shouldn't arise because we check for those errors
@need_right('rollback') - def rollbackpage(self, page, **kwargs): + def rollbackpage( + self, + page: 'pywikibot.page.BasePage', + **kwargs: Any + ) -> None: """Roll back page to version before last user's edits.
:see: https://www.mediawiki.org/wiki/API:Rollback @@ -2177,11 +2253,10 @@
if isinstance(page, pywikibot.page.BasePage): params['title'] = page - msg = page.title(with_section=False) + title = page.title(with_section=False) else: - pageid = int(page) - params['pageid'] = pageid - msg = pageid + params['pageid'] = int(page) + title = str(page)
if deletetalk: if self.mw_version < '1.38wmf24': @@ -2198,7 +2273,7 @@ except APIError as err: errdata = { 'site': self, - 'title': msg, + 'title': title, 'user': self.user(), } if err.code in self._dl_errors: @@ -2210,12 +2285,20 @@ _logger) raise else: - page.clear_cache() + if isinstance(page, pywikibot.page.BasePage): + page.clear_cache() finally: self.unlock_page(page)
@need_right('undelete') - def undelete(self, page, reason: str, *, revisions=None, fileids=None): + def undelete( + self, + page: 'pywikibot.page.BasePage', + reason: str, + *, + revisions: Optional[List[str]] = None, + fileids: Optional[List[Union[int, str]]] = None + ) -> None: """Undelete page from the wiki. Requires appropriate privilege level.
:see: https://www.mediawiki.org/wiki/API:Undelete @@ -2228,13 +2311,10 @@ keyword argument required for *revisions*.
:param page: Page to be deleted. - :type page: pywikibot.BasePage :param reason: Undeletion reason. :param revisions: List of timestamps to restore. If None, restores all revisions. - :type revisions: list :param fileids: List of fileids to restore. - :type fileids: list """ token = self.tokens['delete'] params = { @@ -2278,30 +2358,34 @@ 'protect-invalidlevel': 'Invalid protection level' }
- def protection_types(self): + def protection_types(self) -> Set[str]: """ Return the protection types available on this site.
:return: protection types available - :rtype: set of str instances :see: :py:obj:`Siteinfo._get_default()` """ return set(self.siteinfo.get('restrictions')['types'])
- def protection_levels(self): + def protection_levels(self) -> Set[str]: """ Return the protection levels available on this site.
:return: protection types available - :rtype: set of str instances :see: :py:obj:`Siteinfo._get_default()` """ # implemented in b73b5883d486db0e9278ef16733551f28d9e096d return set(self.siteinfo.get('restrictions')['levels'])
@need_right('protect') - def protect(self, page, protections: dict, - reason: str, expiry=None, **kwargs): + def protect( + self, + page: 'pywikibot.page.BasePage', + protections: Dict[str, Optional[str]], + reason: str, + expiry: Union[datetime.datetime, str, None] = None, + **kwargs: Any + ) -> None: """(Un)protect a wiki page. Requires administrator status.
:see: https://www.mediawiki.org/wiki/API:Protect @@ -2315,18 +2399,17 @@ :param expiry: When the block should expire. This expiry will be applied to all protections. If None, 'infinite', 'indefinite', 'never', or '' is given, there is no expiry. - :type expiry: pywikibot.Timestamp, string in GNU timestamp format - (including ISO 8601). """ token = self.tokens['protect'] self.lock_page(page)
- protections = [ptype + '=' + level - for ptype, level in protections.items() - if level is not None] + protections_list = [ptype + '=' + level + for ptype, level in protections.items() + if level is not None] parameters = merge_unique_dicts(kwargs, action='protect', title=page, token=token, - protections=protections, reason=reason, + protections=protections_list, + reason=reason, expiry=expiry)
req = self.simple_request(**parameters) @@ -2361,8 +2444,8 @@ @need_right('block') def blockuser( self, - user, - expiry, + user: 'pywikibot.page.User', + expiry: Union[datetime.datetime, str, bool], reason: str, anononly: bool = True, nocreate: bool = True, @@ -2370,14 +2453,13 @@ noemail: bool = False, reblock: bool = False, allowusertalk: bool = False - ): + ) -> Dict[str, Any]: """ Block a user for certain amount of time and for a certain reason.
:see: https://www.mediawiki.org/wiki/API:Block
:param user: The username/IP to be blocked without a namespace. - :type user: :py:obj:`pywikibot.User` :param expiry: The length or date/time when the block expires. If 'never', 'infinite', 'indefinite' it never does. If the value is given as a str it's parsed by php's strtotime function: @@ -2390,8 +2472,6 @@
It is recommended to not use a str if possible to be independent of the API. - :type expiry: Timestamp/datetime (absolute), - str (relative/absolute) or False ('never') :param reason: The reason for the block. :param anononly: Disable anonymous edits for this IP. :param nocreate: Prevent account creation. @@ -2403,7 +2483,6 @@ :param allowusertalk: Whether the user can edit their talk page while blocked. :return: The data retrieved from the API request. - :rtype: dict """ token = self.tokens['block'] if expiry is False: @@ -2418,14 +2497,17 @@ return data
@need_right('unblock') - def unblockuser(self, user, reason: Optional[str] = None): + def unblockuser( + self, + user: 'pywikibot.page.User', + reason: Optional[str] = None + ) -> Dict[str, Any]: """ Remove the block for the user.
:see: https://www.mediawiki.org/wiki/API:Block
:param user: The username/IP without a namespace. - :type user: :py:obj:`pywikibot.User` :param reason: Reason for the unblock. """ req = self.simple_request(action='unblock', @@ -2437,14 +2519,19 @@ return data
@need_right('editmywatchlist') - def watch(self, pages, unwatch: bool = False) -> bool: + def watch( + self, + pages: Union['pywikibot.page.BasePage', + str, + List[Union['pywikibot.page.BasePage', str]] + ], + unwatch: bool = False + ) -> bool: """Add or remove pages from watchlist.
:see: https://www.mediawiki.org/wiki/API:Watch
:param pages: A single page or a sequence of pages. - :type pages: A page object, a page-title string, or sequence of them. - Also accepts a single pipe-separated string like 'title1|title2'. :param unwatch: If True, remove pages from watchlist; if False add them (default). :return: True if API returned expected response; False otherwise @@ -2458,20 +2545,23 @@ } req = self.simple_request(**parameters) results = req.submit() - unwatch = 'unwatched' if unwatch else 'watched' - return all(unwatch in r for r in results['watch']) + unwatch_s = 'unwatched' if unwatch else 'watched' + return all(unwatch_s in r for r in results['watch'])
@need_right('purge') - def purgepages(self, pages, forcelinkupdate: bool = False, - forcerecursivelinkupdate: bool = False, - converttitles: bool = False, redirects: bool = False - ) -> bool: + def purgepages( + self, + pages: List['pywikibot.page.BasePage'], + forcelinkupdate: bool = False, + forcerecursivelinkupdate: bool = False, + converttitles: bool = False, + redirects: bool = False + ) -> bool: """ Purge the server's cache for one or multiple pages.
:param pages: list of Page objects :param redirects: Automatically resolve redirects. - :type redirects: bool :param converttitles: Convert titles to other variants if necessary. Only works if the wiki's content language supports variant conversion. @@ -2503,7 +2593,7 @@ return True
@need_right('edit') - def is_uploaddisabled(self): + def is_uploaddisabled(self) -> bool: """Return True if upload is disabled on site.
When the version is at least 1.27wmf9, uses general siteinfo. @@ -2541,18 +2631,26 @@ raise RuntimeError( 'Unexpected success of upload action without parameters.')
- def stash_info(self, file_key, props=None): + def stash_info( + self, + file_key: str, + props: Optional[List[str]] = None + ) -> Dict[str, Any]: """Get the stash info for a given file key.
:see: https://www.mediawiki.org/wiki/API:Stashimageinfo """ - props = props or False + props = props or None req = self.simple_request(action='query', prop='stashimageinfo', siifilekey=file_key, siiprop=props) return req.submit()['query']['stashimageinfo'][0]
@need_right('upload') - def upload(self, filepage, **kwargs) -> bool: + def upload( + self, + filepage: 'pywikibot.page.FilePage', + **kwargs: Any + ) -> bool: """Upload a file to the wiki.
:see: https://www.mediawiki.org/wiki/API:Upload @@ -2574,7 +2672,7 @@ """ return Uploader(self, filepage, **kwargs).upload()
- def get_property_names(self, force: bool = False): + def get_property_names(self, force: bool = False) -> List[str]: """ Get property names for pages_with_property().
@@ -2587,7 +2685,7 @@ self._property_names = [pn['propname'] for pn in ppngen] return self._property_names
- def compare(self, old, diff): + def compare(self, old: _CompType, diff: _CompType) -> str: """ Corresponding method to the 'action=compare' API action.
@@ -2596,14 +2694,11 @@ See: https://en.wikipedia.org/w/api.php?action=help&modules=compare Use pywikibot.diff's html_comparator() method to parse result. :param old: starting revision ID, title, Page, or Revision - :type old: int, str, pywikibot.Page, or pywikibot.Page.Revision :param diff: ending revision ID, title, Page, or Revision - :type diff: int, str, pywikibot.Page, or pywikibot.Page.Revision :return: Returns an HTML string of a diff between two revisions. - :rtype: str """ # check old and diff types - def get_param(item): + def get_param(item: object) -> Optional[Tuple[str, Union[str, int]]]: if isinstance(item, str): return 'title', item if isinstance(item, pywikibot.Page): @@ -2614,16 +2709,16 @@ return 'rev', item.revid return None
- old = get_param(old) - if not old: + old_t = get_param(old) + if not old_t: raise TypeError('old parameter is of invalid type') - diff = get_param(diff) - if not diff: + diff_t = get_param(diff) + if not diff_t: raise TypeError('diff parameter is of invalid type')
params = {'action': 'compare', - 'from{}'.format(old[0]): old[1], - 'to{}'.format(diff[0]): diff[1]} + 'from{}'.format(old_t[0]): old_t[1], + 'to{}'.format(diff_t[0]): diff_t[1]}
req = self.simple_request(**params) data = req.submit() diff --git a/pywikibot/site/_basesite.py b/pywikibot/site/_basesite.py index 19f3250..85da2fa 100644 --- a/pywikibot/site/_basesite.py +++ b/pywikibot/site/_basesite.py @@ -7,9 +7,11 @@ import functools import re import threading +from typing import Optional from warnings import warn
import pywikibot +from pywikibot.backports import Pattern from pywikibot.exceptions import ( Error, FamilyMaintenanceWarning, @@ -177,13 +179,13 @@ self.__dict__.update(attrs) self._pagemutex = threading.Condition()
- def user(self): + def user(self) -> Optional[str]: """Return the currently-logged in bot username, or None.""" if self.logged_in(): return self.username() return None
- def username(self): + def username(self) -> Optional[str]: """Return the username used for the site.""" return self._username
@@ -347,7 +349,10 @@ linkfam, linkcode = pywikibot.Link(text, self).parse_site() return linkfam != self.family.name or linkcode != self.code
- def redirectRegex(self, pattern=None): # noqa: N802 + def redirectRegex( # noqa: N802 + self, + pattern: Optional[str] = None + ) -> Pattern[str]: """Return a compiled regular expression matching on redirect pages.
Group 1 in the regex match object will be the target title. diff --git a/pywikibot/site/_siteinfo.py b/pywikibot/site/_siteinfo.py index e7c3bc5..a92311b 100644 --- a/pywikibot/site/_siteinfo.py +++ b/pywikibot/site/_siteinfo.py @@ -9,7 +9,7 @@ import re from collections.abc import Container from contextlib import suppress -from typing import Optional +from typing import Any, Optional, Union
import pywikibot from pywikibot.exceptions import APIError @@ -254,8 +254,13 @@ """Return a siteinfo property, caching and not forcing it.""" return self.get(key, False) # caches and doesn't force it
- def get(self, key: str, get_default: bool = True, cache: bool = True, - expiry=False): + def get( + self, + key: str, + get_default: bool = True, + cache: bool = True, + expiry: Union[datetime.datetime, float, bool] = False + ) -> Any: """ Return a siteinfo property.
@@ -268,10 +273,7 @@ this method won't query the server. :param expiry: If the cache is older than the expiry it ignores the cache and queries the server to get the newest value. - :type expiry: int/float (days), :py:obj:`datetime.timedelta`, - False (never expired), True (always expired) :return: The gathered property - :rtype: various :raises KeyError: If the key is not a valid siteinfo property and the get_default option is set to False. :see: :py:obj:`_get_siteinfo`
pywikibot-commits@lists.wikimedia.org