jenkins-bot submitted this change.

View Change


Approvals: Xqt: Looks good to me, approved; jenkins-bot: Verified
[IMPR] Refactor xmlreader

1) remove state from XmlDump class
2) keep common logic in one place
3) move parse_restrictions() inside XmlDump class
4) since Elements are analyzed in 'page' blocks, keep
event logic and memory cleanup in one place.
5) simplify XmlEntry() code
6) add type hints

Change-Id: I8f67f0cdb360fc548f6dc36e63621d270afdd7c5
---
M pywikibot/xmlreader.py
1 file changed, 199 insertions(+), 129 deletions(-)

diff --git a/pywikibot/xmlreader.py b/pywikibot/xmlreader.py
index ea675a6..fa5f53b 100644
--- a/pywikibot/xmlreader.py
+++ b/pywikibot/xmlreader.py
@@ -18,6 +18,9 @@
from __future__ import annotations

import re
+from dataclasses import dataclass
+from typing import NamedTuple
+from xml.etree.ElementTree import Element


try:
@@ -25,54 +28,53 @@
except ImportError:
from xml.etree.ElementTree import iterparse, ParseError

-from pywikibot.backports import Callable
-from pywikibot.tools import issue_deprecation_warning, open_archive
+from pywikibot.backports import Callable, Iterator
+from pywikibot.tools import (
+ ModuleDeprecationWrapper,
+ issue_deprecation_warning,
+ open_archive,
+)


-def parseRestrictions(restrictions):
- """
- Parse the characters within a restrictions tag.
-
- Returns strings representing user groups allowed to edit and
- to move a page, where None means there are no restrictions.
- """
- if not restrictions:
- return None, None
- editRestriction = None
- moveRestriction = None
- editLockMatch = re.search('edit=([^:]*)', restrictions)
- if editLockMatch:
- editRestriction = editLockMatch[1]
- moveLockMatch = re.search('move=([^:]*)', restrictions)
- if moveLockMatch:
- moveRestriction = moveLockMatch[1]
- if restrictions == 'sysop':
- editRestriction = 'sysop'
- moveRestriction = 'sysop'
- return editRestriction, moveRestriction
-
-
+@dataclass
class XmlEntry:

"""Represent a page."""

- def __init__(self, title, ns, id, text, username, ipedit, timestamp,
- editRestriction, moveRestriction, revisionid, comment,
- redirect) -> None:
- """Initializer."""
- # TODO: there are more tags we can read.
- self.title = title
- self.ns = ns
- self.id = id
- self.text = text
- self.username = username.strip()
- self.ipedit = ipedit
- self.timestamp = timestamp
- self.editRestriction = editRestriction
- self.moveRestriction = moveRestriction
- self.revisionid = revisionid
- self.comment = comment
- self.isredirect = redirect
+ # TODO: there are more tags we can read.
+ title: str
+ ns: str
+ id: str
+ text: str
+ username: str
+ ipedit: bool
+ timestamp: str
+ editRestriction: str # noqa: N815
+ moveRestriction: str # noqa: N815
+ revisionid: str
+ comment: str
+ isredirect: bool
+
+
+class Headers(NamedTuple):
+
+ """Represent the common info of a page."""
+
+ title: str
+ ns: str
+ pageid: str
+ isredirect: bool
+ edit_restriction: str
+ move_restriction: str
+
+
+class RawRev(NamedTuple):
+
+ """Represent a raw revision."""
+
+ headers: Headers
+ revision: Element
+ revid: int


class XmlDump:
@@ -119,12 +121,15 @@
Default: `first_found`
"""

- def __init__(self, filename, *,
- allrevisions: bool | str = None,
- # when allrevisions removed, revisions can default to 'latest'
- revisions: str = 'first_found',
- on_error: None | (
- Callable[[type[BaseException]], None]) = None) -> None:
+ def __init__(
+ self,
+ filename,
+ *,
+ allrevisions: bool | str = None,
+ # when allrevisions removed, revisions can default to 'latest'
+ revisions: str = 'first_found',
+ on_error: Callable[[ParseError], None] | None = None,
+ ) -> None:
"""Initializer."""
self.filename = filename
self.on_error = on_error
@@ -154,9 +159,10 @@
actions = str(list(self.rev_actions.keys())).strip('[]')
raise ValueError(f"'revisions' must be one of {actions}.")

- self._parse = self.rev_actions.get(revisions)
+ self._parse = self.rev_actions[revisions]
+ self.uri = None

- def parse(self):
+ def parse(self) -> Iterator[XmlEntry]:
"""Generator using ElementTree iterparse function.

.. versionchanged:: 7.2
@@ -165,7 +171,8 @@
"""
with open_archive(self.filename) as source:
context = iterparse(source, events=('start', 'end', 'start-ns'))
- self.root = None
+ root = None
+
while True:
try:
event, elem = next(context)
@@ -178,98 +185,144 @@
raise

if event == 'start-ns' and elem[0] == '':
- self.uri = elem[1]
+ self.uri = f'{{{elem[1]}}}'
continue
- if event == 'start' and self.root is None:
- self.root = elem
- continue
- yield from self._parse(event, elem)

- def _parse_only_first_found(self, event, elem):
+ # get the root element
+ if event == 'start' and root is None:
+ root = elem
+
+ if not (event == 'end' and elem.tag == f'{self.uri}page'):
+ continue
+
+ yield from self._parse(elem)
+
+ # clear references in the root, to allow garbage collection.
+ elem.clear()
+ root.clear()
+
+ def _parse_only_first_found(self, elem: Element) -> Iterator[XmlEntry]:
"""
Deprecated parser that yields the first revision found.

Documentation had wrongly indicated it returned the latest revision.
:phab: `T340804`
"""
- if event == 'end' and elem.tag == f'{{{self.uri}}}page':
- self._headers(elem)
- revision = elem.find(f'{{{self.uri}}}revision')
- yield self._create_revision(revision)
- elem.clear()
- self.root.clear()
+ raw_revs = self._fetch_revs(elem)
+ try:
+ raw_rev = next(raw_revs)
+ yield self._create_revision(raw_rev.headers, raw_rev.revision)
+ except StopIteration:
+ return

- def _parse_only_latest(self, event, elem):
+ def _parse_only_latest(self, elem: Element) -> Iterator[XmlEntry]:
"""Parser that yields only the latest revision."""
- if event == 'end' and elem.tag == f'{{{self.uri}}}page':
- self._headers(elem)
- latest_revision = None
- latest_revisionid = 0
- for revision in elem.findall(f'{{{self.uri}}}revision'):
- revisionid = int(revision.findtext(f'{{{self.uri}}}id'))
- if latest_revision is None or revisionid > latest_revisionid:
- latest_revision = revision
- latest_revisionid = revisionid
- if latest_revision is not None:
- yield self._create_revision(latest_revision)
- elem.clear()
- self.root.clear()
+ raw_revs = self._fetch_revs(elem, with_id=True)
+ raw_rev = max(raw_revs, default=None, key=lambda rev: rev.revid)
+ if raw_rev is not None:
+ yield self._create_revision(raw_rev.headers, raw_rev.revision)

- def _parse_only_earliest(self, event, elem):
+ def _parse_only_earliest(self, elem: Element) -> Iterator[XmlEntry]:
"""Parser that yields only the earliest revision."""
- if event == 'end' and elem.tag == f'{{{self.uri}}}page':
- self._headers(elem)
- earliest_revision = None
- earliest_revisionid = float('inf') # Initialize positive infinity
- for revision in elem.findall(f'{{{self.uri}}}revision'):
- revisionid = int(revision.findtext(f'{{{self.uri}}}id'))
- if revisionid < earliest_revisionid:
- earliest_revisionid = revisionid
- earliest_revision = revision
- if earliest_revision is not None:
- yield self._create_revision(earliest_revision)
- elem.clear()
- self.root.clear()
+ raw_revs = self._fetch_revs(elem, with_id=True)
+ raw_rev = min(raw_revs, default=None, key=lambda rev: rev.revid)
+ if raw_rev is not None:
+ yield self._create_revision(raw_rev.headers, raw_rev.revision)

- def _parse_all(self, event, elem):
+ def _parse_all(self, elem: Element) -> Iterator[XmlEntry]:
"""Parser that yields all revisions."""
- if event == 'end' and elem.tag == f'{{{self.uri}}}page':
- self._headers(elem)
- for revision in elem.findall(f'{{{self.uri}}}revision'):
- yield self._create_revision(revision)
- elem.clear()
- self.root.clear()
+ raw_revs = self._fetch_revs(elem)
+ for raw_rev in raw_revs:
+ yield self._create_revision(raw_rev.headers, raw_rev.revision)

- def _headers(self, elem) -> None:
+ def _fetch_revs(self, elem: Element, with_id=False) -> Iterator[RawRev]:
+ """Yield all revisions in a page."""
+ uri = self.uri
+ headers = self._headers(elem)
+ for revision in elem.findall(f'{uri}revision'):
+ if with_id:
+ revid = int(revision.findtext(f'{uri}id'))
+ else:
+ revid = 0
+ yield RawRev(headers, revision, revid)
+
+ @staticmethod
+ def parse_restrictions(restrictions: str) -> tuple[str | None, str | None]:
+ """
+ Parse the characters within a restrictions tag.
+
+ Returns strings representing user groups allowed to edit and
+ to move a page, where None means there are no restrictions.
+ """
+ if not restrictions:
+ return None, None
+
+ edit_restriction, move_restriction = None, None
+
+ edit_lock_match = re.search('edit=([^:]*)', restrictions)
+ if edit_lock_match:
+ edit_restriction = edit_lock_match[1]
+
+ move_lock_match = re.search('move=([^:]*)', restrictions)
+ if move_lock_match:
+ move_restriction = move_lock_match[1]
+
+ if restrictions == 'sysop':
+ edit_restriction = 'sysop'
+ move_restriction = 'sysop'
+
+ return edit_restriction, move_restriction
+
+ def _headers(self, elem: Element) -> Headers:
"""Extract headers from XML chunk."""
- self.title = elem.findtext(f'{{{self.uri}}}title')
- self.ns = elem.findtext(f'{{{self.uri}}}ns')
- self.pageid = elem.findtext(f'{{{self.uri}}}id')
- self.restrictions = elem.findtext(f'{{{self.uri}}}restrictions')
- self.isredirect = elem.findtext(f'{{{self.uri}}}redirect') is not None
- self.editRestriction, self.moveRestriction = parseRestrictions(
- self.restrictions)
+ uri = self.uri
+ edit_restriction, move_restriction = self.parse_restrictions(
+ elem.findtext(f'{uri}restrictions')
+ )

- def _create_revision(self, revision):
- """Create a single revision."""
- revisionid = revision.findtext(f'{{{self.uri}}}id')
- timestamp = revision.findtext(f'{{{self.uri}}}timestamp')
- comment = revision.findtext(f'{{{self.uri}}}comment')
- contributor = revision.find(f'{{{self.uri}}}contributor')
- ipeditor = contributor.findtext(f'{{{self.uri}}}ip')
- username = ipeditor or contributor.findtext(f'{{{self.uri}}}username')
- # could get comment, minor as well
- text = revision.findtext(f'{{{self.uri}}}text')
- return XmlEntry(title=self.title,
- ns=self.ns,
- id=self.pageid,
- text=text or '',
- username=username or '', # username might be deleted
- ipedit=bool(ipeditor),
- timestamp=timestamp,
- editRestriction=self.editRestriction,
- moveRestriction=self.moveRestriction,
- revisionid=revisionid,
- comment=comment,
- redirect=self.isredirect
- )
+ headers = Headers(
+ title=elem.findtext(f'{uri}title'),
+ ns=elem.findtext(f'{uri}ns'),
+ pageid=elem.findtext(f'{uri}id'),
+ isredirect=elem.findtext(f'{uri}redirect') is not None,
+ edit_restriction=edit_restriction,
+ move_restriction=move_restriction,
+ )
+
+ return headers
+
+ def _create_revision(
+ self, headers: Headers, revision: Element
+ ) -> XmlEntry:
+ """Create a Single revision."""
+ uri = self.uri
+ contributor = revision.find(f'{uri}contributor')
+ ip_editor = contributor.findtext(f'{uri}ip')
+ username = ip_editor or contributor.findtext(f'{uri}username')
+ username = username or '' # username might be deleted
+
+ xml_entry = XmlEntry(
+ title=headers.title,
+ ns=headers.ns,
+ id=headers.pageid,
+ editRestriction=headers.edit_restriction,
+ moveRestriction=headers.move_restriction,
+ isredirect=headers.isredirect,
+ text=revision.findtext(f'{uri}text'),
+ username=username,
+ ipedit=bool(ip_editor),
+ timestamp=revision.findtext(f'{uri}timestamp'),
+ revisionid=revision.findtext(f'{uri}id'),
+ comment=revision.findtext(f'{uri}comment'),
+ # could get comment, minor as well
+ )
+
+ return xml_entry
+
+
+wrapper = ModuleDeprecationWrapper(__name__)
+wrapper.add_deprecated_attr(
+ 'parseRestrictions',
+ XmlDump.parse_restrictions,
+ replacement_name='pywikibot.xmlreader.XmlDump.parseRestrictions',
+ since='9.0.0')

To view, visit change 986614. To unsubscribe, or for help writing mail filters, visit settings.

Gerrit-Project: pywikibot/core
Gerrit-Branch: master
Gerrit-Change-Id: I8f67f0cdb360fc548f6dc36e63621d270afdd7c5
Gerrit-Change-Number: 986614
Gerrit-PatchSet: 12
Gerrit-Owner: Mpaa <mpaa.wiki@gmail.com>
Gerrit-Reviewer: Xqt <info@gno.de>
Gerrit-Reviewer: jenkins-bot
Gerrit-MessageType: merged