jenkins-bot has submitted this change and it was merged.
Change subject: Add Live RC Pages generator ......................................................................
Add Live RC Pages generator
Requires socketIO-client; raises an ImportError if the module is not installed.
Bug: T66828 Change-Id: I9e4679430942855763b969495a72d6bba1138637 --- A pywikibot/comms/rcstream.py M pywikibot/family.py M pywikibot/pagegenerators.py M setup.py M tests/pagegenerators_tests.py M tox.ini 6 files changed, 277 insertions(+), 3 deletions(-)
Approvals: John Vandenberg: Looks good to me, approved jenkins-bot: Verified
diff --git a/pywikibot/comms/rcstream.py b/pywikibot/comms/rcstream.py new file mode 100644 index 0000000..f3282ac --- /dev/null +++ b/pywikibot/comms/rcstream.py @@ -0,0 +1,210 @@ +""" +SocketIO-based rcstream client. + +(c) 2014 Merlijn van Deen + +This file is part of the Pywikibot framework, and is licensed under the MIT license. + +This module requires socketIO_client to be installed: + pip install socketIO_client +""" + +import sys +import threading + +if sys.version_info[0] > 2: + from queue import Queue, Empty +else: + from Queue import Queue, Empty + +from pywikibot.bot import debug, warning + +_logger = 'pywikibot.rcstream' + + +class RcListenerThread(threading.Thread): + + """ + Low-level RC Listener Thread, which reads the RC stream and pushes them to its internal queue. + + @param wikihost: the hostname of the wiki we want to get changes for. This + is passed to rcstream using a 'subscribe' command. Pass + '*' to listen to all wikis for a given rc host. + @param rchost: the recent changes stream host to connect to. For Wikimedia + wikis, this is 'stream.wikimedia.org' + @param rcport: the port to connect to (default: 80) + @param rcpath: the socket.io path. For Wikimedia wikis, this is '/rc'. + (default: '/rc') + @param total: the maximum number of entries to return. The underlying + thread is shut down when this number is reached. + + This part of the rc listener runs in a Thread. It makes the actual + socketIO/websockets connection to the rc stream server, subscribes + to a single site and pushes those entries into a queue.
+ + Usage: + >>> t = RcListenerThread('en.wikipedia.org', 'stream.wikimedia.org') + >>> t.start() + >>> change = t.queue.get() + >>> change + {'server_name': 'en.wikipedia.org', 'wiki': 'enwiki', 'minor': True, + 'length': {'new': 2038, 'old': 2009}, 'timestamp': 1419964350, + 'server_script_path': '/w', 'bot': False, 'user': 'Od Mishehu', + 'comment': 'stub sorting', 'title': 'Bradwell Bay Wilderness', + 'server_url': 'http://en.wikipedia.org', 'id': 703158386, + 'revision': {'new': 640271171, 'old': 468264850}, + 'type': 'edit', 'namespace': 0} + >>> t.stop() # optional, the thread will shut down on exiting python as well + """ + + def __init__(self, wikihost, rchost, rcport=80, rcpath='/rc', total=None): + """ Constructor for RcListenerThread. """ + super(RcListenerThread, self).__init__() + self.rchost = rchost + self.rcport = rcport + self.rcpath = rcpath + self.wikihost = wikihost + + self.daemon = True + self.running = False + self.queue = Queue() + + self.warn_queue_length = 100 + + self.total = total + self.count = 0 + + import socketIO_client + debug('Opening connection to %r' % self, _logger) + self.client = socketIO_client.SocketIO(rchost, rcport) + + thread = self + + class RCListener(socketIO_client.BaseNamespace): + def on_change(self, change): + debug('Received change %r' % change, _logger) + + if thread.total is not None and thread.count >= thread.total: + # all requested changes are already queued; make sure the + # event loop stops and drop this extra change. + thread.stop() + return + + thread.count += 1 + thread.queue.put(change) + if thread.queue.qsize() > thread.warn_queue_length: + warning('%r queue length exceeded %i' + % (thread, + thread.warn_queue_length), + _logger=_logger) + thread.warn_queue_length = thread.warn_queue_length + 100 + + if thread.total is not None and thread.count >= thread.total: + # stop right after queuing the last requested change instead + # of waiting for one more change to arrive. + thread.stop() + + def on_connect(self): + debug('Connected to %r; subscribing to %s' + % (thread, thread.wikihost), + _logger) + self.emit('subscribe', thread.wikihost) + debug('Subscribed to %s' % thread.wikihost, _logger) + + self.client.define(RCListener, rcpath) + + def __repr__(self): + """ Return representation.
""" + return "<rcstream for socketio://%s@%s:%s%s>" % ( + self.wikihost, self.rchost, self.rcport, self.rcpath + ) + + def run(self): + """ Threaded function. Runs inside the thread when started with .start(). """ + self.running = True + while self.running: + self.client.wait(seconds=0.1) + debug('Shut down event loop for %r' % self, _logger) + self.client.disconnect() + debug('Disconnected %r' % self, _logger) + self.queue.put(None) + + def stop(self): + """ Stop the thread. """ + self.running = False + + +def rc_listener(wikihost, rchost, rcport=80, rcpath='/rc', total=None): + """ RC Changes Generator. Yields changes received from RCstream. + + @param wikihost: the hostname of the wiki we want to get changes for. This + is passed to rcstream using a 'subscribe' command. Pass + '*' to listen to all wikis for a given rc host. + @param rchost: the recent changes stream host to connect to. For Wikimedia + wikis, this is 'stream.wikimedia.org' + @param rcport: the port to connect to (default: 80) + @param rcpath: the socket.io path. For Wikimedia wikis, this is '/rc'. + (default: '/rc') + @param total: the maximum number of entries to return. The underlying thread + is shut down when this number is reached. + + @yields dict: dict as formatted by MediaWiki's MachineReadableRCFeedFormatter[1], + which consists of at least id (recent changes id), type ('edit', + 'new', 'log' or 'external'), namespace, title, comment, timestamp, + user and bot (bot flag for the change). See [1] for more details. + + @raises ImportError: socketIO_client is not installed + + [1] https://github.com/wikimedia/mediawiki/blob/master/includes/rcfeed/MachineRe... + + """ + try: + # this is just to test whether socketIO_client is installed or not, + # so that a missing module fails early with an install hint instead + # of a bare ImportError from RcListenerThread. We don't use it, so + # we silence flake8 with noqa.
+ import socketIO_client # noqa + except ImportError: + raise ImportError('socketIO_client is required for the rc stream; ' + 'install it with pip install socketIO_client') + + rc_thread = RcListenerThread( + wikihost=wikihost, + rchost=rchost, rcport=rcport, rcpath=rcpath, + total=total + ) + + debug('Starting rcstream thread %r' % rc_thread, + _logger) + rc_thread.start() + + try: + while True: + try: + element = rc_thread.queue.get(timeout=0.1) + except Empty: + continue + if element is None: + return + yield element + finally: + # also stop the thread when the consumer stops iterating early, + # otherwise it keeps listening and queuing changes until python exits. + rc_thread.stop() + + +def site_rc_listener(site, total=None): + """ RC Changes Generator. Yields changes received from RCstream. + + @param site: the Pywikibot.Site object to yield live recent changes for + @type site: pywikibot.BaseSite + @param total: the maximum number of changes to return + @type total: int + + @returns pywikibot.comms.rcstream.rc_listener configured for the given site + """ + return rc_listener( + wikihost=site.hostname(), + rchost=site.rcstream_host(), + total=total, + ) diff --git a/pywikibot/family.py b/pywikibot/family.py index 412996e..bf04dbb 100644 --- a/pywikibot/family.py +++ b/pywikibot/family.py @@ -1025,6 +1025,10 @@ def nicepath(self, code): return '/wiki/'
+ def rcstream_host(self, code): + """Return the RCStream host; raises NotImplementedError by default.""" + raise NotImplementedError("This family does not support RCStream") + def nice_get_address(self, code, title): return '%s%s' % (self.nicepath(code), title)
@@ -1226,6 +1230,10 @@ """Return 'https' as the protocol.""" return 'https'
+ def rcstream_host(self, code): + """Return 'stream.wikimedia.org' as the RCStream host.""" + return 'stream.wikimedia.org' +
class AutoFamily(Family):
diff --git a/pywikibot/pagegenerators.py b/pywikibot/pagegenerators.py index d863e18..aeb2d15 100644 --- a/pywikibot/pagegenerators.py +++ b/pywikibot/pagegenerators.py @@ -130,6 +130,9 @@ -links Work on all pages that are linked from a certain page. Argument can also be given as "-links:linkingpagetitle".
+-liverecentchanges Work on pages from the live recent changes feed. If used as + -liverecentchanges:x, work on x recent changes. + -imagesused Work on all images that contained on a certain page. Argument can also be given as "-imagesused:linkingpagetitle".
@@ -491,6 +494,11 @@ total=60, site=self.site) gen = DuplicateFilterPageGenerator(gen) + elif arg.startswith('-liverecentchanges'): + if len(arg) > 19: + gen = LiveRCPageGenerator(self.site, total=int(arg[19:])) + else: + gen = LiveRCPageGenerator(self.site) elif arg.startswith('-file'): textfilename = arg[6:] if not textfilename: @@ -1890,6 +1898,31 @@ yield pywikibot.Page(site, result)
+def LiveRCPageGenerator(site=None, total=None): + """ + Yield pages from a socket.io RC stream. + + Generates pages based on the socket.io recent changes stream. + The Page objects will have an extra attribute ._rcinfo containing the + literal rc data. This can be used to e.g. filter only new pages. See + `pywikibot.comms.rcstream.rc_listener` for details on the ._rcinfo format. + + @param site: site to return recent changes for + @type site: pywikibot.BaseSite + @param total: the maximum number of changes to return + @type total: int + """ + if site is None: + site = pywikibot.Site() + + from pywikibot.comms.rcstream import site_rc_listener + + for entry in site_rc_listener(site, total=total): + page = pywikibot.Page(site, entry['title'], entry['namespace']) + page._rcinfo = entry + yield page + + # following classes just ported from version 1 without revision; not tested
diff --git a/setup.py b/setup.py index 951aa65..874c557 100644 --- a/setup.py +++ b/setup.py @@ -25,7 +25,8 @@ 'Google': ['google'], 'IRC': ['irc'], 'mwparserfromhell': ['mwparserfromhell>=0.3.3'], - 'Tkinter': ['Pillow'] + 'Tkinter': ['Pillow'], + 'rcstream': ['socketIO-client'], }
if sys.version_info[0] == 2: diff --git a/tests/pagegenerators_tests.py b/tests/pagegenerators_tests.py index 39f5108..b0ee13e 100755 --- a/tests/pagegenerators_tests.py +++ b/tests/pagegenerators_tests.py @@ -25,6 +25,7 @@ TestCase, WikidataTestCase, DefaultSiteTestCase, + WikimediaDefaultSiteTestCase, ) from tests.thread_tests import GeneratorIntersectTestCase
@@ -603,6 +604,35 @@ ])
+class LiveRCPageGeneratorTestCase(WikimediaDefaultSiteTestCase): + + """Test case for Live Recent Changes pagegenerator. + + Works best on a busy site, as three changes are requested. + """ + + length = 3 + + def test_RC_pagegenerator_result(self): + """Test LiveRCPageGenerator yields pages carrying their rc info.""" + site = self.get_site() + pagegenerator = pagegenerators.LiveRCPageGenerator(site, total=self.length) + entries = list(pagegenerator) + self.assertEqual(len(entries), self.length) + + testentry = entries[0] + self.assertEqual(testentry.site, site) + self.assertTrue(hasattr(testentry, '_rcinfo')) + + rcinfo = testentry._rcinfo + self.assertEqual(rcinfo['server_name'], site.hostname()) + self.assertEqual(rcinfo['wiki'], site.dbName()) + + for key in ["id", "type", "namespace", "title", "comment", "timestamp", + "user", "bot"]: + self.assertIn(key, rcinfo) + + if __name__ == "__main__": try: unittest.main() diff --git a/tox.ini b/tox.ini index c040f75..d0f6ba2 100644 --- a/tox.ini +++ b/tox.ini @@ -3,6 +3,9 @@ skipsdist = True envlist = flake8,flake83,flake8-docstrings,py26,py27,py34
+[params] +nose_skip = --ignore-files=(gui.py|botirc.py|rcstream.py) + [testenv] setenv = VIRTUAL_ENV={envdir} usedevelop = True @@ -37,6 +40,7 @@ pywikibot/bot.py \ pywikibot/comms/__init__.py \ pywikibot/comms/http.py \ + pywikibot/comms/rcstream.py \ pywikibot/compat/ \ pywikibot/config2.py \ pywikibot/data/__init__.py \ @@ -103,7 +107,9 @@
[testenv:nose] setenv = PYWIKIBOT2_NO_USER_CONFIG=1 -commands = nosetests --with-doctest --with-doctest-ignore-unicode -v -a "!net" tests pywikibot --ignore-files=gui.py --ignore-files=botirc.py +commands = + nosetests --version + nosetests --with-doctest --with-doctest-ignore-unicode -v -a "!net" tests pywikibot {[params]nose_skip} deps = nose doctest-ignore-unicode @@ -111,7 +117,9 @@ [testenv:nose34] basepython = python3 setenv = PYWIKIBOT2_NO_USER_CONFIG=1 -commands = nosetests --with-doctest --with-doctest-ignore-unicode -v -a "!net" tests pywikibot --ignore-files=gui.py --ignore-files=botirc.py +commands = + nosetests --version + nosetests --with-doctest --with-doctest-ignore-unicode -v -a "!net" tests pywikibot {[params]nose_skip} deps = nose doctest-ignore-unicode