jenkins-bot has submitted this change. ( https://gerrit.wikimedia.org/r/c/pywikibot/core/+/986614 )
Change subject: [IMPR] Refactor xmlreader ......................................................................
[IMPR] Refactor xmlreader
1) remove state from XmlDump class 2) keep common logic in one place 3) move parse_restrictions() inside XmlDump class 4) since Elements are analyzed in 'page' blocks, keep event logic and memory cleanup in one place. 5) simplify XmlEntry() code 6) add type hints
Change-Id: I8f67f0cdb360fc548f6dc36e63621d270afdd7c5 --- M pywikibot/xmlreader.py 1 file changed, 199 insertions(+), 129 deletions(-)
Approvals: Xqt: Looks good to me, approved jenkins-bot: Verified
diff --git a/pywikibot/xmlreader.py b/pywikibot/xmlreader.py index ea675a6..fa5f53b 100644 --- a/pywikibot/xmlreader.py +++ b/pywikibot/xmlreader.py @@ -18,6 +18,9 @@ from __future__ import annotations
import re +from dataclasses import dataclass +from typing import NamedTuple +from xml.etree.ElementTree import Element
try: @@ -25,54 +28,53 @@ except ImportError: from xml.etree.ElementTree import iterparse, ParseError
-from pywikibot.backports import Callable -from pywikibot.tools import issue_deprecation_warning, open_archive +from pywikibot.backports import Callable, Iterator +from pywikibot.tools import ( + ModuleDeprecationWrapper, + issue_deprecation_warning, + open_archive, +)
-def parseRestrictions(restrictions): - """ - Parse the characters within a restrictions tag. - - Returns strings representing user groups allowed to edit and - to move a page, where None means there are no restrictions. - """ - if not restrictions: - return None, None - editRestriction = None - moveRestriction = None - editLockMatch = re.search('edit=([^:]*)', restrictions) - if editLockMatch: - editRestriction = editLockMatch[1] - moveLockMatch = re.search('move=([^:]*)', restrictions) - if moveLockMatch: - moveRestriction = moveLockMatch[1] - if restrictions == 'sysop': - editRestriction = 'sysop' - moveRestriction = 'sysop' - return editRestriction, moveRestriction - - +@dataclass class XmlEntry:
"""Represent a page."""
- def __init__(self, title, ns, id, text, username, ipedit, timestamp, - editRestriction, moveRestriction, revisionid, comment, - redirect) -> None: - """Initializer.""" - # TODO: there are more tags we can read. - self.title = title - self.ns = ns - self.id = id - self.text = text - self.username = username.strip() - self.ipedit = ipedit - self.timestamp = timestamp - self.editRestriction = editRestriction - self.moveRestriction = moveRestriction - self.revisionid = revisionid - self.comment = comment - self.isredirect = redirect + # TODO: there are more tags we can read. + title: str + ns: str + id: str + text: str + username: str + ipedit: bool + timestamp: str + editRestriction: str # noqa: N815 + moveRestriction: str # noqa: N815 + revisionid: str + comment: str + isredirect: bool + + +class Headers(NamedTuple): + + """Represent the common info of a page.""" + + title: str + ns: str + pageid: str + isredirect: bool + edit_restriction: str + move_restriction: str + + +class RawRev(NamedTuple): + + """Represent a raw revision.""" + + headers: Headers + revision: Element + revid: int
class XmlDump: @@ -119,12 +121,15 @@ Default: `first_found` """
- def __init__(self, filename, *, - allrevisions: bool | str = None, - # when allrevisions removed, revisions can default to 'latest' - revisions: str = 'first_found', - on_error: None | ( - Callable[[type[BaseException]], None]) = None) -> None: + def __init__( + self, + filename, + *, + allrevisions: bool | str = None, + # when allrevisions removed, revisions can default to 'latest' + revisions: str = 'first_found', + on_error: Callable[[ParseError], None] | None = None, + ) -> None: """Initializer.""" self.filename = filename self.on_error = on_error @@ -154,9 +159,10 @@ actions = str(list(self.rev_actions.keys())).strip('[]') raise ValueError(f"'revisions' must be one of {actions}.")
- self._parse = self.rev_actions.get(revisions) + self._parse = self.rev_actions[revisions] + self.uri = None
- def parse(self): + def parse(self) -> Iterator[XmlEntry]: """Generator using ElementTree iterparse function.
.. versionchanged:: 7.2 @@ -165,7 +171,8 @@ """ with open_archive(self.filename) as source: context = iterparse(source, events=('start', 'end', 'start-ns')) - self.root = None + root = None + while True: try: event, elem = next(context) @@ -178,98 +185,144 @@ raise
if event == 'start-ns' and elem[0] == '': - self.uri = elem[1] + self.uri = f'{{{elem[1]}}}' continue - if event == 'start' and self.root is None: - self.root = elem - continue - yield from self._parse(event, elem)
- def _parse_only_first_found(self, event, elem): + # get the root element + if event == 'start' and root is None: + root = elem + + if not (event == 'end' and elem.tag == f'{self.uri}page'): + continue + + yield from self._parse(elem) + + # clear references in the root, to allow garbage collection. + elem.clear() + root.clear() + + def _parse_only_first_found(self, elem: Element) -> Iterator[XmlEntry]: """ Deprecated parser that yields the first revision found.
Documentation had wrongly indicated it returned the latest revision. :phab: `T340804` """ - if event == 'end' and elem.tag == f'{{{self.uri}}}page': - self._headers(elem) - revision = elem.find(f'{{{self.uri}}}revision') - yield self._create_revision(revision) - elem.clear() - self.root.clear() + raw_revs = self._fetch_revs(elem) + try: + raw_rev = next(raw_revs) + yield self._create_revision(raw_rev.headers, raw_rev.revision) + except StopIteration: + return
- def _parse_only_latest(self, event, elem): + def _parse_only_latest(self, elem: Element) -> Iterator[XmlEntry]: """Parser that yields only the latest revision.""" - if event == 'end' and elem.tag == f'{{{self.uri}}}page': - self._headers(elem) - latest_revision = None - latest_revisionid = 0 - for revision in elem.findall(f'{{{self.uri}}}revision'): - revisionid = int(revision.findtext(f'{{{self.uri}}}id')) - if latest_revision is None or revisionid > latest_revisionid: - latest_revision = revision - latest_revisionid = revisionid - if latest_revision is not None: - yield self._create_revision(latest_revision) - elem.clear() - self.root.clear() + raw_revs = self._fetch_revs(elem, with_id=True) + raw_rev = max(raw_revs, default=None, key=lambda rev: rev.revid) + if raw_rev is not None: + yield self._create_revision(raw_rev.headers, raw_rev.revision)
- def _parse_only_earliest(self, event, elem): + def _parse_only_earliest(self, elem: Element) -> Iterator[XmlEntry]: """Parser that yields only the earliest revision.""" - if event == 'end' and elem.tag == f'{{{self.uri}}}page': - self._headers(elem) - earliest_revision = None - earliest_revisionid = float('inf') # Initialize positive infinity - for revision in elem.findall(f'{{{self.uri}}}revision'): - revisionid = int(revision.findtext(f'{{{self.uri}}}id')) - if revisionid < earliest_revisionid: - earliest_revisionid = revisionid - earliest_revision = revision - if earliest_revision is not None: - yield self._create_revision(earliest_revision) - elem.clear() - self.root.clear() + raw_revs = self._fetch_revs(elem, with_id=True) + raw_rev = min(raw_revs, default=None, key=lambda rev: rev.revid) + if raw_rev is not None: + yield self._create_revision(raw_rev.headers, raw_rev.revision)
- def _parse_all(self, event, elem): + def _parse_all(self, elem: Element) -> Iterator[XmlEntry]: """Parser that yields all revisions.""" - if event == 'end' and elem.tag == f'{{{self.uri}}}page': - self._headers(elem) - for revision in elem.findall(f'{{{self.uri}}}revision'): - yield self._create_revision(revision) - elem.clear() - self.root.clear() + raw_revs = self._fetch_revs(elem) + for raw_rev in raw_revs: + yield self._create_revision(raw_rev.headers, raw_rev.revision)
- def _headers(self, elem) -> None: + def _fetch_revs(self, elem: Element, with_id=False) -> Iterator[RawRev]: + """Yield all revisions in a page.""" + uri = self.uri + headers = self._headers(elem) + for revision in elem.findall(f'{uri}revision'): + if with_id: + revid = int(revision.findtext(f'{uri}id')) + else: + revid = 0 + yield RawRev(headers, revision, revid) + + @staticmethod + def parse_restrictions(restrictions: str) -> tuple[str | None, str | None]: + """ + Parse the characters within a restrictions tag. + + Returns strings representing user groups allowed to edit and + to move a page, where None means there are no restrictions. + """ + if not restrictions: + return None, None + + edit_restriction, move_restriction = None, None + + edit_lock_match = re.search('edit=([^:]*)', restrictions) + if edit_lock_match: + edit_restriction = edit_lock_match[1] + + move_lock_match = re.search('move=([^:]*)', restrictions) + if move_lock_match: + move_restriction = move_lock_match[1] + + if restrictions == 'sysop': + edit_restriction = 'sysop' + move_restriction = 'sysop' + + return edit_restriction, move_restriction + + def _headers(self, elem: Element) -> Headers: """Extract headers from XML chunk.""" - self.title = elem.findtext(f'{{{self.uri}}}title') - self.ns = elem.findtext(f'{{{self.uri}}}ns') - self.pageid = elem.findtext(f'{{{self.uri}}}id') - self.restrictions = elem.findtext(f'{{{self.uri}}}restrictions') - self.isredirect = elem.findtext(f'{{{self.uri}}}redirect') is not None - self.editRestriction, self.moveRestriction = parseRestrictions( - self.restrictions) + uri = self.uri + edit_restriction, move_restriction = self.parse_restrictions( + elem.findtext(f'{uri}restrictions') + )
- def _create_revision(self, revision): - """Create a single revision.""" - revisionid = revision.findtext(f'{{{self.uri}}}id') - timestamp = revision.findtext(f'{{{self.uri}}}timestamp') - comment = revision.findtext(f'{{{self.uri}}}comment') - contributor = revision.find(f'{{{self.uri}}}contributor') - ipeditor = contributor.findtext(f'{{{self.uri}}}ip') - username = ipeditor or contributor.findtext(f'{{{self.uri}}}username') - # could get comment, minor as well - text = revision.findtext(f'{{{self.uri}}}text') - return XmlEntry(title=self.title, - ns=self.ns, - id=self.pageid, - text=text or '', - username=username or '', # username might be deleted - ipedit=bool(ipeditor), - timestamp=timestamp, - editRestriction=self.editRestriction, - moveRestriction=self.moveRestriction, - revisionid=revisionid, - comment=comment, - redirect=self.isredirect - ) + headers = Headers( + title=elem.findtext(f'{uri}title'), + ns=elem.findtext(f'{uri}ns'), + pageid=elem.findtext(f'{uri}id'), + isredirect=elem.findtext(f'{uri}redirect') is not None, + edit_restriction=edit_restriction, + move_restriction=move_restriction, + ) + + return headers + + def _create_revision( + self, headers: Headers, revision: Element + ) -> XmlEntry: + """Create a Single revision.""" + uri = self.uri + contributor = revision.find(f'{uri}contributor') + ip_editor = contributor.findtext(f'{uri}ip') + username = ip_editor or contributor.findtext(f'{uri}username') + username = username or '' # username might be deleted + + xml_entry = XmlEntry( + title=headers.title, + ns=headers.ns, + id=headers.pageid, + editRestriction=headers.edit_restriction, + moveRestriction=headers.move_restriction, + isredirect=headers.isredirect, + text=revision.findtext(f'{uri}text'), + username=username, + ipedit=bool(ip_editor), + timestamp=revision.findtext(f'{uri}timestamp'), + revisionid=revision.findtext(f'{uri}id'), + comment=revision.findtext(f'{uri}comment'), + # could get comment, minor as well + ) + + return xml_entry 
+ + +wrapper = ModuleDeprecationWrapper(__name__) +wrapper.add_deprecated_attr( + 'parseRestrictions', + XmlDump.parse_restrictions, + replacement_name='pywikibot.xmlreader.XmlDump.parseRestrictions', + since='9.0.0')
pywikibot-commits@lists.wikimedia.org