jenkins-bot has submitted this change. ( https://gerrit.wikimedia.org/r/c/pywikibot/core/+/1046603?usp=email )
Change subject: Add superset pagegenerator ......................................................................
Add superset pagegenerator
Add support for SQL queries through https://superset.wmcloud.org and use the results as a page generator.
Bug: T367684 Change-Id: I3d3b3e4cc09497e3806f9552f5e0bc81700cdcbf --- A pywikibot/data/superset.py M pywikibot/pagegenerators/__init__.py M pywikibot/pagegenerators/_factory.py M pywikibot/pagegenerators/_generators.py A tests/superset_tests.py 5 files changed, 570 insertions(+), 0 deletions(-)
Approvals: Xqt: Looks good to me, approved jenkins-bot: Verified
"""Superset Query interface."""
#
# (C) Pywikibot team, 2024
#
# Distributed under the terms of the MIT license.
#
from textwrap import fill

import pywikibot
from pywikibot.comms import http
from pywikibot.data import WaitingMixin
from pywikibot.exceptions import NoUsernameError, ServerError


class SupersetQuery(WaitingMixin):

    """Superset Query class.

    This class allows to run SQL queries against wikimedia superset
    service.
    """

    def __init__(self, schema_name=None,
                 site=None, database_id=None):
        """Create superset endpoint with initial defaults.

        Either site OR schema_name is required. Site and schema_name are
        mutually exclusive. Database id will be retrieved automatically
        if needed.

        :param schema_name: superset database schema name.
            Example value "enwiki_p"
        :type schema_name: str, optional
        :param site: The mediawiki site to be queried
        :type site: pywikibot.Site, optional
        :param database_id: superset database id.
        :type database_id: int, optional
        :raises TypeError: if site and schema_name are both defined,
            or if database_id is not an integer
        """
        if site and schema_name:
            msg = 'Only one of schema_name and site parameters can be defined'
            raise TypeError(msg)

        # Validate database_id. None means that it will be determined
        # automatically from the schema name on the first query.
        if database_id is not None and not isinstance(database_id, int):
            msg = f'database_id should be integer, but got "{database_id}"'
            raise TypeError(msg)

        self.site = site
        self.schema_name = schema_name
        self.database_id = database_id

        self.connected = False
        # Last http response; kept for debugging and error reporting.
        self.last_response = None
        self.superset_url = 'https://superset.wmcloud.org'

    def login(self):
        """Login to superset.

        Logs in first to meta.wikimedia.org and then does an OAUTH
        login to superset.wmcloud.org.

        Working login expects that the user has manually permitted
        the username to login to the superset.

        :raises NoUsernameError: if not logged in.
        :raises ServerError: for other errors
        :return: True if user has been logged to superset
        :rtype: bool
        """
        # superset uses meta for OAUTH authentication
        loginsite = pywikibot.Site('meta')
        if not loginsite.logged_in():
            loginsite.login()
        if not loginsite.logged_in():
            msg = 'User is not logged in on meta.wikimedia.org'
            raise NoUsernameError(msg)

        # Superset oauth login
        url = f'{self.superset_url}/login/mediawiki?next='
        self.last_response = http.fetch(url)

        # Test if the user has been successfully logged in
        url = f'{self.superset_url}/api/v1/me/'
        self.last_response = http.fetch(url)

        # Handle error cases
        if self.last_response.status_code == 200:
            self.connected = True
        elif self.last_response.status_code == 401:
            self.connected = False
            raise NoUsernameError(fill(
                'User not logged in. You need to log in to '
                'meta.wikimedia.org and give OAUTH permission. '
                'Open https://superset.wmcloud.org/login/ '
                'with browser to login and give permission.'
            ))
        else:
            self.connected = False
            status_code = self.last_response.status_code
            raise ServerError(f'Unknown error: {status_code}')

        return self.connected

    def get_csrf_token(self):
        """Get superset CSRF token.

        Retrieves a CSRF token from the Superset service. If the
        instance is not connected, it attempts to log in first.

        :raises ServerError: for any http errors
        :return: CSRF token string
        :rtype: str
        """
        if not self.connected:
            self.login()

        # Load CSRF token
        url = f'{self.superset_url}/api/v1/security/csrf_token/'
        self.last_response = http.fetch(url)

        if self.last_response.status_code != 200:
            status_code = self.last_response.status_code
            raise ServerError(f'CSRF token error: {status_code}')

        return self.last_response.json()['result']

    def get_database_id_by_schema_name(self, schema_name):
        """Get superset database_id using superset schema name.

        :param schema_name: superset database schema name.
            Example value "enwiki_p"
        :type schema_name: str
        :raises KeyError: if the database ID could not be found
        :raises ServerError: for any other http errors
        :return: database id
        :rtype: int
        """
        if not self.connected:
            self.login()

        # Superset has no schema -> database lookup endpoint, so probe
        # database ids sequentially until a 404 marks the end of the list.
        for database_id in range(1, 20):
            url = self.superset_url
            url += f'/api/v1/database/{database_id}/schemas/?q=(force:!f)'
            self.last_response = http.fetch(url)

            if self.last_response.status_code == 200:
                schemas = self.last_response.json()['result']
                if schema_name in schemas:
                    return database_id

            elif self.last_response.status_code == 404:
                break
            else:
                status_code = self.last_response.status_code
                raise ServerError(f'Unknown error: {status_code}')

        url = self.superset_url
        raise KeyError(f'Schema "{schema_name}" not found in {url}.')

    def merge_query_arguments(self,
                              database_id=None,
                              schema_name=None,
                              site=None):
        """Determine and validate the database_id and schema_name.

        Arguments given here override the instance defaults passed to
        :meth:`__init__`.

        :param database_id: The superset database ID.
        :type database_id: int, optional
        :param schema_name: The superset schema name.
        :type schema_name: str, optional
        :param site: The target site
        :type site: pywikibot.Site, optional
        :raises TypeError: if site and schema_name are both defined
        :raises TypeError: if neither site nor schema_name is determined
        :raises TypeError: if determined database_id is not an integer
        :return: a tuple containing database_id and schema_name
        :rtype: tuple
        """
        if site and schema_name:
            msg = 'Only one of schema_name and site parameters can be defined'
            raise TypeError(msg)

        # Determine schema_name; explicit arguments win over defaults.
        if not schema_name:
            if site:
                schema_name = f'{site.dbName()}_p'
            elif self.schema_name:
                schema_name = self.schema_name
            elif self.site:
                schema_name = f'{self.site.dbName()}_p'

        # Fail early with the documented TypeError instead of asking
        # Superset for a schema named "None" later.
        if not schema_name:
            raise TypeError('Either site or schema_name must be provided')

        # Determine database_id
        if not database_id:
            if self.database_id:
                database_id = int(self.database_id)
            else:
                database_id = self.get_database_id_by_schema_name(schema_name)

        # Validate database_id
        if not isinstance(database_id, int):
            msg = f'database_id should be integer, but got "{database_id}"'
            raise TypeError(msg)

        return database_id, schema_name

    def query(self, sql, database_id=None, schema_name=None, site=None):
        """Execute SQL queries on Superset.

        :param sql: The SQL query to execute.
        :type sql: str
        :param database_id: The database ID.
        :type database_id: int, optional
        :param schema_name: The schema name.
        :type schema_name: str, optional
        :param site: The site whose database is queried.
        :type site: pywikibot.Site, optional
        :raises RuntimeError: if the query execution fails
        :return: The data returned from the query execution.
        :rtype: list
        """
        if not self.connected:
            self.login()

        token = self.get_csrf_token()

        headers = {
            'X-CSRFToken': token,
            'Content-Type': 'application/json',
            'referer': 'https://superset.wmcloud.org/sqllab/'
        }

        database_id, schema_name = self.merge_query_arguments(database_id,
                                                              schema_name,
                                                              site)

        sql_query_payload = {
            'database_id': database_id,
            'schema': schema_name,
            'sql': sql,
            'json': True,
            'runAsync': False,
        }

        url = f'{self.superset_url}/api/v1/sqllab/execute/'
        try:
            self.last_response = http.fetch(uri=url,
                                            json=sql_query_payload,
                                            method='POST',
                                            headers=headers)
            self.last_response.raise_for_status()
            result = self.last_response.json()
            return result['data']
        except Exception as e:
            # Chain the original exception so the http failure detail
            # is not lost.
            raise RuntimeError(f'Failed to execute query: {e}') from e
+-supersetquery    Takes a SQL query string like
+                  "SELECT page_namespace, page_title FROM page
+                  WHERE page_namespace = 0", runs it on
+                  https://superset.wmcloud.org/ and treats
+                  the resulting pages.
+
 -sparql           Takes a SPARQL SELECT query string including ?item
                   and works on the resulting pages.
diff --git a/pywikibot/pagegenerators/_factory.py b/pywikibot/pagegenerators/_factory.py index adb10cd..0e2c135 100644 --- a/pywikibot/pagegenerators/_factory.py +++ b/pywikibot/pagegenerators/_factory.py @@ -46,6 +46,7 @@ PrefixingPageGenerator, RecentChangesPageGenerator, SubCategoriesPageGenerator, + SupersetPageGenerator, TextIOPageGenerator, UserContributionsGenerator, WikibaseSearchItemPageGenerator, @@ -901,6 +902,12 @@ value = pywikibot.input('Mysql query string:') return MySQLPageGenerator(value, site=self.site)
+ def _handle_supersetquery(self, value: str) -> HANDLER_GEN_TYPE: + """Handle `-supersetquery` argument.""" + if not value: + value = pywikibot.input('Superset SQL query string:') + return SupersetPageGenerator(value, site=self.site) + def _handle_intersect(self, value: str) -> Literal[True]: """Handle `-intersect` argument.""" self.intersect = True diff --git a/pywikibot/pagegenerators/_generators.py b/pywikibot/pagegenerators/_generators.py index af59e3d..bc9ed2c 100644 --- a/pywikibot/pagegenerators/_generators.py +++ b/pywikibot/pagegenerators/_generators.py @@ -922,6 +922,125 @@ yield page
def SupersetPageGenerator(query: str,
                          site: BaseSite | None = None,
                          schema_name: str | None = None,
                          database_id: int | None = None
                          ) -> Iterator[pywikibot.page.Page]:
    """Generate pages that result from the given SQL query.

    Pages are generated using site in following order:

    1. site retrieved using page_wikidb column in SQL result
    2. site as parameter
    3. site retrieved using schema_name

    SQL columns used are

    - page_id
    - page_namespace
    - page_title
    - page_wikidb

    Example SQL queries

    .. code-block:: sql

       SELECT
           gil_wiki AS page_wikidb,
           gil_page AS page_id
       FROM globalimagelinks
       GROUP BY gil_wiki
       LIMIT 10

    OR

    .. code-block:: sql

       SELECT
           page_id
       FROM page
       LIMIT 10

    OR

    .. code-block:: sql

       SELECT
           page_namespace,
           page_title
       FROM page
       LIMIT 10

    :param query: the SQL query string.
    :param site: Site for generator results.
    :param schema_name: target superset schema name
    :param database_id: target superset database id
    :raises TypeError: if neither site nor schema_name is given
    :raises ValueError: if a result row has none of the supported columns
    """
    from pywikibot.data.superset import SupersetQuery

    # Do not pass site to superset if schema_name is defined.
    # The user may use schema_name to point to different
    # wikimedia db on purpose and use site for
    # generating result pages.
    superset_site = None if schema_name else site

    superset = SupersetQuery(site=superset_site,
                             schema_name=schema_name,
                             database_id=database_id)

    try:
        rows = superset.query(query)
    except Exception as e:
        pywikibot.error(f'Error executing query: {query}\n{e}')
        return

    # Cache of wikidb name -> APISite for rows with a page_wikidb column.
    sites = {}

    # If there is no site then retrieve it using schema_name
    if not site:
        if not schema_name:
            raise TypeError('Schema name or site must be provided.')

        # remove "_p" suffix
        wikidb = re.sub('_p$', '', schema_name)
        site = pywikibot.site.APISite.fromDBName(wikidb)

    for row in rows:
        # If page_wikidb column in SQL result then use it to retrieve site
        if 'page_wikidb' in row:
            # remove "_p" suffix
            wikidb = re.sub('_p$', '', row['page_wikidb'])

            # Caching sites
            if wikidb not in sites:
                try:
                    sites[wikidb] = pywikibot.site.APISite.fromDBName(wikidb)
                except ValueError:
                    msg = f'Cannot parse a site from {wikidb} for {row}.'
                    pywikibot.warning(msg)
                    continue
            site = sites[wikidb]

        # Generate page objects

        # Create page object from page_id
        if 'page_id' in row:
            yield from site.load_pages_from_pageids([row['page_id']])

        # Create page object from page_namespace + page_title.
        # Check the current row, not rows[0]: result rows are not
        # guaranteed to share the same columns.
        elif 'page_title' in row and 'page_namespace' in row:
            page_namespace = int(row['page_namespace'])
            page_title = row['page_title']
            yield pywikibot.Page(site, page_title, ns=page_namespace)

        else:
            raise ValueError('The SQL result is in wrong format.')
diff --git a/tests/superset_tests.py b/tests/superset_tests.py new file mode 100755 index 0000000..772fa05 --- /dev/null +++ b/tests/superset_tests.py @@ -0,0 +1,161 @@ +#!/usr/bin/env python3 +"""Tests for superset module.""" +# +# (C) Pywikibot team, 2024 +# +# Distributed under the terms of the MIT license. +# +from __future__ import annotations + +import unittest +from contextlib import suppress +import pywikibot +from pywikibot.data.superset import SupersetQuery +from pywikibot.exceptions import NoUsernameError +from tests.aspects import TestCase +from pywikibot.pagegenerators import SupersetPageGenerator + + +class TestSupersetWithoutAuth(TestCase): + """Test Superset without auth.""" + + family = 'meta' + code = 'meta' + + def test_init(self): + """Test init validation functions.""" + # Test initial database_id parameter in wrong format + site = self.get_site() + + superset = SupersetQuery(schema_name='fiwiki_p') + self.assertIsInstance(superset, SupersetQuery) + + msg = 'Only one of schema_name and site parameters can be defined' + with self.assertRaisesRegex(TypeError, msg): + superset = SupersetQuery(schema_name='enwiki_p', + site=site) + + msg = 'database_id should be integer' + with self.assertRaisesRegex(TypeError, msg): + superset = SupersetQuery(schema_name='enwiki_p', + database_id='foo') + + +class TestSupersetWithAuth(TestCase): + """Test Superset with auth.""" + + login = True + family = 'meta' + code = 'meta' + + def test_login_and_oauth_permisson(self): + """Superset login and queries.""" + sql = 'SELECT page_id, page_title FROM page LIMIT 2;' + site = self.get_site() + site.login() + self.assertTrue(site.logged_in()) + + # Test login and initial site parameters + try: + superset = SupersetQuery(site=site) + superset.login() + except NoUsernameError: + pywikibot.warning('Oauth permission is missing. 
SKIPPING TESTS.') + return + + self.assertTrue(superset.connected) + rows = superset.query(sql) + self.assertLength(rows, 2) + + # Test initial schema_name parameter + superset = SupersetQuery(schema_name='fiwiki_p') + rows = superset.query(sql) + self.assertLength(rows, 2) + + # Test initial schema_name and database_id parameters + superset = SupersetQuery(schema_name='enwiki_p', database_id=1) + rows = superset.query(sql) + self.assertLength(rows, 2) + + # Test get_database_id_by_schema_name() + database_id = superset.get_database_id_by_schema_name('fiwiki_p') + self.assertEqual(database_id, 2) + + # Test incorrect initial schema_name parameter + superset = SupersetQuery(schema_name='foowiki_p') + msg = 'Schema "foowiki_p" not found in https://superset.wmcloud.org.' + with self.assertRaisesRegex(KeyError, msg): + rows = superset.query(sql) + + # Test incorrect initial database_id parameter + # superset.wmcloud.org fails with 500 server error + # so this is expected to be changed when server side + # is updated + + superset = SupersetQuery(schema_name='enwiki_p', database_id=2) + with self.assertRaises(RuntimeError): + rows = superset.query(sql) + + # Test overriding database_id in query + rows = superset.query(sql, database_id=1) + self.assertLength(rows, 2) + + # Test overriding schema_name in query + rows = superset.query(sql, schema_name='fiwiki_p') + self.assertLength(rows, 2) + + # Test overriding schema using site + testsite = pywikibot.Site('fi', 'wikipedia') + rows = superset.query(sql, site=testsite) + self.assertLength(rows, 2) + + # Test that overriding both schema_name and site fails + msg = 'Only one of schema_name and site parameters can be defined' + with self.assertRaisesRegex(TypeError, msg): + rows = superset.query(sql, schema_name='fiwiki_p', site=site) + + def test_superset_generators(self): + """Superset generator.""" + site = self.get_site() + query = 'SELECT page_id FROM page LIMIT 2' + gen = SupersetPageGenerator(query, site=site) + for 
page in gen: + t = str(page) + self.assertTrue(t) + + query = 'SELECT page_title, page_namespace FROM page LIMIT 2' + gen = SupersetPageGenerator(query, site=site) + for page in gen: + t = str(page) + self.assertTrue(t) + + query = 'SELECT page_namespace, page_title FROM page LIMIT 2' + gen = SupersetPageGenerator(query, schema_name='fiwiki_p') + for page in gen: + t = str(page) + self.assertTrue(t) + + query = ('SELECT * FROM (' + 'SELECT gil_wiki AS page_wikidb, gil_page AS page_id ' + 'FROM globalimagelinks GROUP BY gil_wiki ' + ') AS t LIMIT 2') + gen = SupersetPageGenerator(query, schema_name='commonswiki_p') + for page in gen: + t = str(page) + self.assertTrue(t) + + query = ('SELECT * FROM ( ' + 'SELECT gil_wiki AS page_wikidb, ' + 'gil_page_namespace_id AS page_namespace, ' + 'gil_page_title AS page_title ' + 'FROM globalimagelinks GROUP BY gil_wiki ' + ') AS t LIMIT 2') + gen = SupersetPageGenerator(query, schema_name='commonswiki_p') + for page in gen: + t = str(page) + self.assertTrue(t) + + +if __name__ == '__main__': + with suppress(SystemExit): + unittest.main()
pywikibot-commits@lists.wikimedia.org