jenkins-bot merged this change.

View Change

Approvals: Framawiki: Looks good to me, approved; jenkins-bot: Verified
[IMPR] Enable multiple streams for EventStreams

- Wikimedia supports multiple streams, see https://stream.wikimedia.org/?doc
- Multiple streams may be composed,
see https://wikitech.wikimedia.org/wiki/EventStreams#Stream_selection
- rename "stream" to "streams" to indicate multiple streams may be
composed
- allow streams as a string with comma-separated stream types or an
iterable of strings. Combine the iterable into a string internally.
- add tests for multiple streams
- update docs

Bug: T205114
Change-Id: Ia1805ad6070920821d679ddf46080c5c0026ae9d
---
M pywikibot/comms/eventstreams.py
M tests/eventstreams_tests.py
2 files changed, 48 insertions(+), 25 deletions(-)

diff --git a/pywikibot/comms/eventstreams.py b/pywikibot/comms/eventstreams.py
index b8ae34f..7824e5b 100644
--- a/pywikibot/comms/eventstreams.py
+++ b/pywikibot/comms/eventstreams.py
@@ -30,7 +30,7 @@
EventSource = e

from pywikibot import config, debug, Site, warning
-from pywikibot.tools import StringTypes
+from pywikibot.tools import deprecated_args, StringTypes

# requests >= 2.9 is required for eventstreams (T184713)
if LooseVersion(requests.__version__) < LooseVersion('2.9'):
@@ -75,21 +75,27 @@
>>> del stream
"""

+ @deprecated_args(stream='streams')
def __init__(self, **kwargs):
"""Initializer.

@keyword site: a project site object. Used when no url is given
@type site: APISite
- @keyword stream: event stream type. Used when no url is given.
- @type stream: str
+ @keyword streams: event stream types. Mandatory when no url is given.
+ Multiple streams may be given as a string with comma separated
+ stream types or an iterable of strings
+ Refer https://stream.wikimedia.org/?doc for available
+ wikimedia stream types.
+ @type streams: str or iterable
@keyword timeout: a timeout value indication how long to wait to send
data before giving up
@type timeout: int, float or a tuple of two values of int or float
@keyword url: an url retrieving events from. Will be set up to a
- default url using _site.family settings and streamtype
+ default url using _site.family settings and stream types
@type url: str
@param kwargs: keyword arguments passed to SSEClient and requests lib
@raises ImportError: sseclient is not installed
+ @raises NotImplementedError: no stream types specified
"""
if isinstance(EventSource, Exception):
raise ImportError('sseclient is required for EventStreams;\n'
@@ -97,7 +103,9 @@
self.filter = {'all': [], 'any': [], 'none': []}
self._total = None
self._site = kwargs.pop('site', Site())
- self._stream = kwargs.pop('stream', None)
+ self._streams = kwargs.pop('streams', None)
+ if self._streams and not isinstance(self._streams, StringTypes):
+ self._streams = ','.join(self._streams)
self._url = kwargs.get('url') or self.url
kwargs.setdefault('url', self._url)
kwargs.setdefault('timeout', config.socket_timeout)
@@ -108,8 +116,8 @@
kwargs = self.sse_kwargs.copy()
if self._site != Site():
kwargs['site'] = self._site
- if self._stream:
- kwargs['stream'] = self._stream
+ if self._streams:
+ kwargs['streams'] = self._streams
kwargs.pop('url')
if kwargs['timeout'] == config.socket_timeout:
kwargs.pop('timeout')
@@ -120,16 +128,16 @@
def url(self):
"""Get the EventStream's url.

- @raises NotImplementedError: streamtype is not specified
+ @raises NotImplementedError: no stream types specified
"""
if not hasattr(self, '_url'):
- if self._stream is None:
+ if self._streams is None:
raise NotImplementedError(
- 'No stream specified for class {0}'
+ 'No streams specified for class {0}'
.format(self.__class__.__name__))
self._url = ('{0}{1}/{2}'.format(self._site.eventstreams_host(),
self._site.eventstreams_path(),
- self._stream))
+ self._streams))
return self._url

def set_maximum_items(self, value):
diff --git a/tests/eventstreams_tests.py b/tests/eventstreams_tests.py
index 5f903e8..5e677b8 100644
--- a/tests/eventstreams_tests.py
+++ b/tests/eventstreams_tests.py
@@ -41,28 +41,29 @@
self.assertEqual(e._url, e.url)
self.assertEqual(e._url, e.sse_kwargs.get('url'))
self.assertIsNone(e._total)
- self.assertIsNone(e._stream)
+ self.assertIsNone(e._streams)

def test_url_from_site(self, key):
"""Test EventStreams with url from site."""
site = self.get_site(key)
- stream = 'recentchange'
- e = EventStreams(site=site, stream=stream)
+ streams = 'recentchange'
+ e = EventStreams(site=site, streams=streams)
self.assertEqual(
- e._url, 'https://stream.wikimedia.org/v2/stream/' + stream)
+ e._url, 'https://stream.wikimedia.org/v2/stream/' + streams)
self.assertEqual(e._url, e.url)
self.assertEqual(e._url, e.sse_kwargs.get('url'))
self.assertIsNone(e._total)
- self.assertEqual(e._stream, stream)
+ self.assertEqual(e._streams, streams)


@mock.patch('pywikibot.comms.eventstreams.EventSource', new=mock.MagicMock())
-class TestEventStreamsStreamTests(DefaultSiteTestCase):
+class TestEventStreamsStreamsTests(DefaultSiteTestCase):

"""Stream tests for eventstreams module."""

- def test_url_with_stream(self):
- """Test EventStreams with url from default site."""
+ def setUp(self):
+ """Setup tests."""
+ super(TestEventStreamsStreamsTests, self).setUp()
site = self.get_site()
fam = site.family
if not isinstance(fam, WikimediaFamily):
@@ -70,17 +71,31 @@
"Family '{0}' of site '{1}' is not a WikimediaFamily."
.format(fam, site))

- stream = 'recentchange'
- e = EventStreams(stream=stream)
+ def test_url_with_streams(self):
+ """Test EventStreams with url from default site."""
+ streams = 'recentchange'
+ e = EventStreams(streams=streams)
self.assertEqual(
- e._url, 'https://stream.wikimedia.org/v2/stream/' + stream)
+ e._url, 'https://stream.wikimedia.org/v2/stream/' + streams)
self.assertEqual(e._url, e.url)
self.assertEqual(e._url, e.sse_kwargs.get('url'))
self.assertIsNone(e._total)
- self.assertEqual(e._stream, stream)
+ self.assertEqual(e._streams, streams)

- def test_url_missing_stream(self):
- """Test EventStreams with url from site with missing stream."""
+ def test_multiple_streams(self):
+ """Test EventStreams with multiple streams."""
+ streams = ('page-create', 'page-move', 'page-delete')
+ e = EventStreams(streams=streams)
+ combined_streams = ','.join(streams)
+ self.assertEqual(
+ e._url,
+ 'https://stream.wikimedia.org/v2/stream/' + combined_streams)
+ self.assertEqual(e._url, e.url)
+ self.assertEqual(e._url, e.sse_kwargs.get('url'))
+ self.assertEqual(e._streams, combined_streams)
+
+ def test_url_missing_streams(self):
+ """Test EventStreams with url from site with missing streams."""
with self.assertRaises(NotImplementedError):
EventStreams()


To view, visit change 461986. To unsubscribe, or for help writing mail filters, visit settings.

Gerrit-Project: pywikibot/core
Gerrit-Branch: master
Gerrit-MessageType: merged
Gerrit-Change-Id: Ia1805ad6070920821d679ddf46080c5c0026ae9d
Gerrit-Change-Number: 461986
Gerrit-PatchSet: 5
Gerrit-Owner: Xqt <info@gno.de>
Gerrit-Reviewer: Framawiki <framawiki@tools.wmflabs.org>
Gerrit-Reviewer: John Vandenberg <jayvdb@gmail.com>
Gerrit-Reviewer: Ottomata <aotto@wikimedia.org>
Gerrit-Reviewer: jenkins-bot (75)