Revision: 4052
Author: btongminh
Date: 2007-08-16 20:30:19 +0000 (Thu, 16 Aug 2007)
Log Message:
-----------
Large parts rewritten; Pywikipedia-fy: replace raw domain strings with
pywikipedia Site and Family objects throughout checkusage.py and delinker.py.
Modified Paths:
--------------
    trunk/pywikipedia/checkusage.py
    trunk/pywikipedia/delinker.py
Modified: trunk/pywikipedia/checkusage.py
===================================================================
--- trunk/pywikipedia/checkusage.py	2007-08-16 17:03:49 UTC (rev 4051)
+++ trunk/pywikipedia/checkusage.py	2007-08-16 20:30:19 UTC (rev 4052)
@@ -41,6 +41,8 @@
 import httplib, urlparse, socket, time
 from urllib import urlencode
 import simplejson
+
+import wikipedia
 try:
 	import MySQLdb
@@ -61,7 +63,25 @@
 	if title.startswith('Image:'):
 		return strip_ns(title)
 	return title
-	
+
+def family(domain):
+	wiki = domain.split('.')
+	# Standard family
+	if wiki[1] in ('wikipedia', 'wiktionary', 'wikibooks',
+		'wikiquote', 'wikisource', 'wikinews', 'wikiversity'):
+		return wiki[0], wiki[1]
+	# Family on own domain
+	if wiki[0] == 'www':
+		return wiki[1], wiki[1]
+	# Special Wikimedia wiki
+	if wiki[1] == 'wikimedia':
+		return wiki[0], wiki[0]
+	# Multilingual wikisource
+	if domain == 'wikisource.org':
+		return '-', 'wikisource'
+	raise RuntimeError('Unknown family ' + domain)
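
For reference, the mapping this helper produces on a few illustrative
domains (example inputs, not taken from the commit):

	family('de.wikipedia.org')       # -> ('de', 'wikipedia')
	family('www.mediawiki.org')      # -> ('mediawiki', 'mediawiki')
	family('commons.wikimedia.org')  # -> ('commons', 'commons')
	family('wikisource.org')         # -> ('-', 'wikisource')
	family('example.org')            # raises RuntimeError: Unknown family example.org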
+
+# Not sure whether this belongs here
 class HTTP(object):
 	def __init__(self, host):
 		self.host = host
@@ -94,14 +114,14 @@
 		return res
-	def query_api(self, api, host, **kwargs):
+	def query_api(self, api, host, path, **kwargs):
 		data = urlencode([(k, v.encode('utf-8')) for k, v in kwargs.iteritems()])
 		if api == 'query':
-			query_string = '/w/query.php?format=json&' + data
+			query_string = '%s?format=json&%s' % (path, data)
 			method = 'GET'
 			data = ''
 		elif api == 'api':
-			query_string = '/w/api.php?format=json'
+			query_string = '%s?format=json' % path
 			method = 'POST'
 		else:
 			raise ValueError('Unknown api %s' % repr(api))
@@ -131,11 +151,11 @@
 		list.__init__(self, ())
 
-	def query_api(self, api, host, **kwargs):
+	def query_api(self, api, host, path, **kwargs):
 		conn = self.find_conn(host)
 		while True:
 			try:
-				res = conn.query_api(api, host, **kwargs)
+				res = conn.query_api(api, host, path, **kwargs)
 				self.current_retry = 0
 				return res
 			except RuntimeError:
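
With the script path supplied by the caller, the same pool can serve wikis
with non-standard layouts. A minimal sketch of the new calling convention
(hostname and title are illustrative; HTTPPool.close() is assumed to exist,
as CheckUsage.close() relies on it):

	pool = HTTPPool(retry_timeout = 30, max_retries = 5)
	res = pool.query_api('api', 'commons.wikimedia.org', '/w/api.php',
		action = 'query', titles = 'Image:Example.jpg')
	print res['query']['pages']
	pool.close()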
@@ -181,12 +201,9 @@
 class CheckUsage(object):
-	#LIVE = ['enwiki_p']
-	#IGNORE = []
-	
 	def __init__(self, limit = 100,
-		sql_host = 'sql', sql_user = '', sql_pass = '',
-		sql_host_prefix = 'sql-s', no_db = False, use_autoconn = False,
+		mysql_default_server = 2, mysql_host_prefix = 'sql-s', mysql_kwargs = {},
+		no_db = False, use_autoconn = False,
 
 		http_retry_timeout = 30, http_max_retries = -1,
 		http_callback = lambda *args: None,
@@ -194,86 +211,111 @@
 		mysql_retry_timeout = 60, mysql_max_retries = -1,
 		mysql_callback = lambda *args: None):
-		self.conn = HTTPPool(retry_timeout = http_retry_timeout,
-			max_retries = http_max_retries, callback = http_callback)
+		# The HTTP pool is created lazily; keep its settings so that
+		# no_db instances can still use the live API methods.
+		self.http = None
+		self.http_retry_timeout = http_retry_timeout
+		self.http_max_retries = http_max_retries
+		self.http_callback = http_callback
 		if no_db: return
 
-		self.sql_host, self.sql_host_prefix = sql_host, sql_host_prefix
-		self.sql_user, self.sql_pass = sql_user, sql_pass
+		self.mysql_host_prefix = mysql_host_prefix
+		self.mysql_kwargs = mysql_kwargs
 		self.use_autoconn = use_autoconn
 		self.mysql_retry_timeout = mysql_retry_timeout
 		self.mysql_max_retries = mysql_max_retries
 		self.mysql_callback = mysql_callback
 
 		self.connections = []
-		
+
+		# Mapping database name -> mysql connection
 		self.databases = {}
-		self.clusters = {}
-		self.domains = {}
+		# Mapping server id -> mysql connection
+		self.servers = {}
+		# Mapping database name -> (lang, family object)
+		self.families = {}
 
-		database, cursor = self.connect(sql_host)
+		database, cursor = self.connect_mysql(mysql_host_prefix + str(mysql_default_server))
+		self.servers[mysql_default_server] = (database, cursor)
 
+		# Find where the databases are located
 		cursor.execute('SELECT dbname, domain, server FROM toolserver.wiki ORDER BY size DESC LIMIT %s', (limit, ))
 		for dbname, domain, server in cursor.fetchall():
-			if server not in self.clusters:
-				for _database, _cursor in self.connections:
-					try:
-						_cursor.execute('USE ' + dbname)
-					except MySQLdb.Error, e:
-						if e[0] != 1049: raise
-					else:
-						self.clusters[server] = (_database, _cursor)
-				if not server in self.clusters:
-					self.clusters[server] = self.connect(sql_host_prefix + str(server))
+			if server not in self.servers:
+				# Disabled: Multiple clusters per server
+				#for _database, _cursor in self.connections:
+				#	try:
+				#		_cursor.execute('USE ' + dbname)
+				#	except MySQLdb.Error, e:
+				#		if e[0] != 1049: raise
+				#	else:
+				#		self.clusters[server] = (_database, _cursor)
+				#if not server in self.clusters:
+				#	self.clusters[server] = self.connect(sql_host_prefix + str(server))
+				self.servers[server] = self.connect_mysql(mysql_host_prefix + str(server))
 
-			self.domains[dbname] = domain
-			self.databases[dbname] = self.clusters[server]
+			self.databases[dbname] = self.servers[server]
+			lang, fam = family(domain)
+			self.families[dbname] = (lang, wikipedia.Family(fam))
 
-		cursor.execute('SELECT dbname, ns_id, ns_name FROM toolserver.namespace')
-		self.namespaces = dict((((i[0], i[1]), i[2].decode('utf-8')) for i in cursor))
+		# Localized namespaces are now resolved through the Family objects
+		#cursor.execute('SELECT dbname, ns_id, ns_name FROM toolserver.namespace')
+		#self.namespaces = dict((((i[0], i[1]), i[2].decode('utf-8')) for i in cursor))
 
-	def connect(self, host):
+	def connect_mysql(self, host):
 		# A bug in MySQLdb 1.2.1_p will force you to set
 		# all your connections to use_unicode = False.
 		# Please upgrade to MySQLdb 1.2.2 or higher.
 		if self.use_autoconn:
 			database = mysql_autoconnection.connect(
-				use_unicode = False, user = self.sql_user,
-				passwd = self.sql_pass, host = host,
+				use_unicode = False, host = host,
 				retry_timeout = self.mysql_retry_timeout,
 				max_retries = self.mysql_max_retries,
-				callback = self.mysql_callback)
+				callback = self.mysql_callback,
+				**self.mysql_kwargs)
 		else:
 			database = MySQLdb.connect(use_unicode = False,
-				user = self.sql_user,
-				passwd = self.sql_pass, host = host)
+				host = host, **self.mysql_kwargs)
 		cursor = database.cursor()
 		self.connections.append((database, cursor))
 		return database, cursor
 
+	def connect_http(self):
+		if not self.http:
+			self.http = HTTPPool(retry_timeout = self.http_retry_timeout,
+				max_retries = self.http_max_retries, callback = self.http_callback)
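
Connection credentials now travel in mysql_kwargs and are passed through to
MySQLdb.connect() or mysql_autoconnection.connect() unchanged, mirroring how
delinker.py now forwards config['sql_config']. A sketch (the user and passwd
values are illustrative):

	import checkusage
	cu = checkusage.CheckUsage(limit = 100,
		mysql_kwargs = {'user': 'delinker', 'passwd': 'secret'},
		use_autoconn = True)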
 
 	def get_usage(self, image):
 		for dbname in self.databases:
 			for link in self.get_usage_db(dbname, image):
-				yield self.domains[dbname], link
+				yield self.families[dbname], link
 
-	def get_usage_db(self, database, image):
-		image = strip_image(image)
+	def get_usage_db(self, dbname, image):
+		#image = strip_image(image)
+		lang, family = self.families[dbname]
+
+		if family.shared_image_repository() == (lang, family.name):
+			# The wiki hosts its own images; there is no shared
+			# repository to check against, so skip the join.
+			left_join = ''
+			img_null = ''
+		else:
+			left_join = 'LEFT JOIN %s.image ON (il_to = img_name)' % dbname
+			img_null = 'img_name IS NULL AND'
 		query = """SELECT page_namespace, page_title FROM %s.page, %s.imagelinks
-			LEFT JOIN %s.image ON (il_to = img_name) WHERE img_name IS NULL AND
-			page_id = il_from AND il_to = %%s"""
-		self.databases[database][1].execute(query % (database, database, database),
+			%s WHERE %s page_id = il_from AND il_to = %%s"""
+		self.databases[dbname][1].execute(query % (dbname, dbname, left_join, img_null),
 			(image.encode('utf-8', 'ignore'), ))
-		for item in self.databases[database][1]:
-			stripped_title = item[1].decode('utf-8', 'ignore')
-			if item[0] != 0:
-				title = self.namespaces[(database, item[0])] + u':' + stripped_title
+		for page_namespace, page_title in self.databases[dbname][1]:
+			stripped_title = page_title.decode('utf-8', 'ignore')
+			if page_namespace != 0:
+				title = family.namespace(lang, page_namespace) + u':' + stripped_title
 			else:
 				title = stripped_title
-			yield item[0], stripped_title, title
+			yield page_namespace, stripped_title, title
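
For a wiki whose images live on a shared repository (hypothetical dbname
dewiki_p), the query renders as:

	SELECT page_namespace, page_title FROM dewiki_p.page, dewiki_p.imagelinks
		LEFT JOIN dewiki_p.image ON (il_to = img_name)
		WHERE img_name IS NULL AND page_id = il_from AND il_to = %s

so only pages linking to an image that has no local copy are returned; on a
wiki that is its own image repository, the join and the img_name test are
dropped.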
 
-	def get_usage_live(self, domain, image):
-		image = strip_image(image)
-		res = self.conn.query_api('api', domain, action = 'query', list = 'imageusage',
+	def get_usage_live(self, site, image):
+		self.connect_http()
+
+		#image = strip_image(image)
+		res = self.http.query_api('api', site.hostname(), site.apipath(),
+			action = 'query', list = 'imageusage',
 			prop = 'info', iulimit = '500',
 			titles = 'Image:' + image)
 		if '-1' in res['query']['pages']:
 			for usage in res['query'].get('imageusage', ()):
@@ -284,27 +326,15 @@
 			else:
 				stripped_title = title
 			yield namespace, stripped_title, title
-
-	def get_usage_multi(self, images):
-		res = {}
-		for image in images:
-			res[image] = self.get_usage(image)
-		return res
-
-	'''
-	def get_replag(self, db):
-		query = """SELECT UNIX_TIMESTAMP() - UNIX_TIMESTAMP(rc_timestamp)
-			FROM %s.recentchanges ORDER BY rc_timestamp DESC LIMIT 1"""
-		if self.cursor.execute(query) != 1: raise RuntimeError
-		return self.cursor.fetchone()[0]
-	'''
 
-	def exists(self, domain, image):
+	def exists(self, site, image):
+		self.connect_http()
 		# Check whether the image still is deleted on Commons.
 		# BUG: This also returns true for images with a page, but
 		# without the image itself. Can be fixed by querying query.php
 		# instead of api.php.
-		return '-1' not in self.conn.query_api('api', domain,
+		return '-1' not in self.http.query_api('api', site.hostname(), site.apipath(),
 			action = 'query', titles = 'Image:' + image)['query']['pages']
@@ -314,5 +344,6 @@
 				connection.close()
 			except: pass
-		self.conn.close()
+		if self.http:
+			self.http.close()
\ No newline at end of file
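
Callers now pass a pywikipedia Site object instead of a bare domain string.
A sketch of live usage (the image name is illustrative):

	import wikipedia, checkusage
	cu = checkusage.CheckUsage(no_db = True)
	commons = wikipedia.getSite('commons', 'commons')
	if not cu.exists(commons, u'Example.jpg'):
		for namespace, stripped, title in cu.get_usage_live(commons, u'Example.jpg'):
			print namespace, title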
Modified: trunk/pywikipedia/delinker.py
===================================================================
--- trunk/pywikipedia/delinker.py	2007-08-16 17:03:49 UTC (rev 4051)
+++ trunk/pywikipedia/delinker.py	2007-08-16 20:30:19 UTC (rev 4052)
@@ -44,28 +44,6 @@
 import wikipedia
 import config
 
-def family(domain):
-	wiki = domain.split('.')
-	# Standard family
-	if wiki[1] in ('wikipedia', 'wiktionary', 'wikibooks',
-		'wikiquote', 'wikisource', 'wikinews', 'wikiversity'):
-		return wiki[0], wiki[1]
-	# Family on own domain
-	if wiki[0] == 'www':
-		return wiki[1], wiki[1]
-	# Special Wikimedia wiki
-	if wiki[1] == 'wikimedia':
-		return wiki[0], wiki[0]
-	# Multilingual wikisource
-	if domain == 'wikisource.org':
-		return '-', 'wikisource'
-	# My local test wiki
-	if wiki[-1] == 'localhost':
-		if len(wiki) == 1:
-			return 'pool', 'local'
-		return wiki[0], 'local'
-	raise RuntimeError('Unknown family ' + domain)
-
 def wait_callback(object):
 	output(u'Connection has been lost in %s. Attempting reconnection.' % repr(object), False)
 	if hasattr(object, 'error'):
@@ -93,12 +71,9 @@
 		return mysql_autoconnection.connect(**kwargs)
 	# TODO: Add support for sqlite3
 	raise RuntimeError('Unsupported database engine %s' % engine)
-
-# TODO: Make configurable
-SHARED = 'commons.wikimedia.org'
-
 class Delinker(threadpool.Thread):
+	# TODO: Method names could use some clean up
 	def __init__(self, pool, CommonsDelinker):
 		threadpool.Thread.__init__(self, pool)
 		self.CommonsDelinker = CommonsDelinker
@@ -109,30 +84,31 @@
 		output(u'%s Usage of %s: %s' % (self, image, usage))
 
 		skipped_images = {}
-		for domain, pages in usage.iteritems():
-			# Some code is needed to magically convert the domain
-			# FIXME: Try some other magic
-			site = wikipedia.Site(*family(domain))
+		for (lang, family), pages in usage.iteritems():
+			site = self.CommonsDelinker.get_site(lang, family)
 
-			summary = self.get_summary(site, domain, image, admin, reason, replacement)
-
-			for page_namespace, page_title, title in pages:
-				if self.CommonsDelinker.set_edit(domain, title):
-					# The page is currently being editted. Postpone.
-					if domain not in skipped_images:
-						skipped_images[domain] = []
-					skipped_images[domain].append((page_namespace, page_title, title))
-				else:
-					# Delink the image
-					output(u'%s Delinking %s from %s' % (self, image, domain))
-
-					try:
-						result = self.replace_image(image, site, title, summary, replacement)
-					finally:
-						self.CommonsDelinker.unset_edit(domain, title)
-					# Add to logging queue
-					self.CommonsDelinker.Loggers.append((timestamp, image, domain,
-						page_namespace, page_title, result, replacement))
+			try:
+				summary = self.get_summary(site, image, admin, reason, replacement)
+
+				for page_namespace, page_title, title in pages:
+					if self.CommonsDelinker.set_edit(str(site), title):
+						# The page is currently being edited. Postpone.
+						if (lang, family) not in skipped_images:
+							skipped_images[(lang, family)] = []
+						skipped_images[(lang, family)].append((page_namespace, page_title, title))
+					else:
+						# Delink the image
+						output(u'%s Delinking %s from %s' % (self, image, site))
+
+						try:
+							result = self.replace_image(image, site, title, summary, replacement)
+						finally:
+							self.CommonsDelinker.unset_edit(str(site), title)
+						# Add to logging queue
+						self.CommonsDelinker.Loggers.append((timestamp, image, site.hostname(),
+							page_namespace, page_title, result, replacement))
+			finally:
+				self.CommonsDelinker.unlock_site(site)
 
 		if skipped_images:
 			time.sleep(self.CommonsDelinker.config['timeout'])
@@ -257,13 +233,10 @@
 			page.put(new_text, summary)
 			return 'ok'
 		except wikipedia.EditConflict:
-			# Try once, if this fails as well, we will
-			# return "failed"
-			# BUG: Should not discard, but rather try again
-			try:
-				page.put(new_text, summary)
-			except:
-				return 'failed'
+			# Try again; re-entering replace_image re-fetches the text
+			output(u'Got EditConflict trying to remove %s from %s:%s.' % \
+				(image, site, page_title))
+			return self.replace_image(image, site, page_title, summary, replacement)
 		except (wikipedia.LockedPage, wikipedia.PageNotSaved):
 			return 'failed'
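
The new EditConflict branch retries by re-entering replace_image(), but it
does so without a depth bound. A bounded variant could wrap the save in a
generic helper (a sketch, not part of this commit; retry_conflicts and its
retries parameter are hypothetical):

	def retry_conflicts(action, retries = 3):
		# Run `action` (a callable that re-fetches and saves the page),
		# retrying on edit conflicts at most `retries` times.
		for attempt in range(retries):
			try:
				return action()
			except wikipedia.EditConflict:
				output(u'Edit conflict, %d retries left' % (retries - attempt - 1))
		return 'failed'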
 		else:
@@ -278,16 +251,14 @@
 		except:
 			output(u'An exception occured in %s' % self, False)
 			traceback.print_exc(file = sys.stderr)
-			#print >>sys.stderr, cgitb.text(sys.exc_info())
 
-	def get_summary(self, site, domain, image, admin, reason, replacement):
+	def get_summary(self, site, image, admin, reason, replacement):
 		""" Get the summary template and substitute the
 		correct values."""
-		# FIXME: site/domain is redundant.
 		if replacement:
-			tlp = self.CommonsDelinker.SummaryCache.get(site, domain, 'replace-I18n')
+			tlp = self.CommonsDelinker.SummaryCache.get(site, str(site), 'replace-I18n')
 		else:
-			tlp = self.CommonsDelinker.SummaryCache.get(site, domain, 'summary-I18n')
+			tlp = self.CommonsDelinker.SummaryCache.get(site, str(site), 'summary-I18n')
 		tlp = tlp.replace('$1', image)
 		if replacement:
@@ -307,19 +278,19 @@
 		self.lock = threading.Lock()
 		self.CommonsDelinker = CommonsDelinker
 
-	def get(self, site, domain, type):
+	def get(self, site, key, type):
 		# This can probably also provide something for
 		# localised settings, but then it first needs to
 		# check whether the page is sysop only.
 		self.lock.acquire()
 		try:
 			if type not in self.summaries:
 				self.summaries[type] = {}
-			if domain in self.summaries[type]:
-				if (time.time() - self.summaries[type][domain][1]) < \
+			if key in self.summaries[type]:
+				if (time.time() - self.summaries[type][key][1]) < \
 					self.CommonsDelinker.config['summary_cache']:
 					# Return cached result
-					return self.summaries[type][domain][0]
+					return self.summaries[type][key][0]
 
 			output(u'%s Fetching new summary for %s' % (self, site))
@@ -330,7 +301,7 @@
 			try:
 				# Fetch the summary template, follow redirects
 				i18n = page.get(get_redirect = True)
-				self.summaries[type][domain] = (i18n, time.time())
+				self.summaries[type][key] = (i18n, time.time())
 				return i18n
 			except wikipedia.NoPage:
 				pass
@@ -344,16 +315,11 @@
 		# like mediawiki.org and meta and species.
-		output(u'%s Using default summary for %s' % (self, domain))
+		output(u'%s Using default summary for %s' % (self, site))
 
-		if not 'wikipedia' in domain and self.CommonsDelinker.config['global']:
-			s = domain.split('.')[0]
-			lang = s[0]
-			project = s[1]
-
-			if project in ('wiktionary', 'wikibooks', 'wikiquote',
-				'wikisource', 'wikinews', 'wikiversity'):
-
-				newsite = wikipedia.Site(lang, 'wikipedia')
-				return self.get(newsite, lang + '.wikipedia.org', type)
+		if site.family.name != 'wikipedia' and self.CommonsDelinker.config['global']:
+			if site.family.name in ('wiktionary', 'wikibooks', 'wikiquote',
+				'wikisource', 'wikinews', 'wikiversity'):
+				newsite = wikipedia.getSite(site.lang, 'wikipedia')
+				return self.get(newsite, str(newsite), type)
 		return self.CommonsDelinker.config['default_settings'].get(type, '')
 
 	def check_user_page(self, site):
 		# Check whether the bot is enabled on this wiki
 		# Make sure the userpage is not empty
 		# Note: if wikis delete the userpage, it's their own fault
 		filename = 'canedit.cdl'
-		# Removed utf-8 opening. Is not needed in combination with
-		# str(sitio), which always returns an 8 bit string.
-		f = open(filename, 'r')
+		try:
+			f = open(filename, 'r')
+		except IOError:
+			# Don't care
+			return
 		ftxt = f.read()
 		f.close()
 		if not '#' + str(site) in ftxt:
@@ -390,6 +358,7 @@
 	def __init__(self, pool, CommonsDelinker):
 		threadpool.Thread.__init__(self, pool)
 		self.CommonsDelinker = CommonsDelinker
+		# Not really thread safe, but we should only do read operations...
 		self.site = CommonsDelinker.site
 
 	def run(self):
@@ -400,10 +369,8 @@
 		config = self.CommonsDelinker.config
 		if config['global']:
 			# Note: global use requires MySQL
-			self.CheckUsage = checkusage.CheckUsage(sys.maxint,
-				sql_host = config['sql_config']['host'],
-				sql_user = config['sql_config']['user'],
-				sql_pass = config['sql_config']['passwd'],
+			self.CheckUsage = checkusage.CheckUsage(limit = sys.maxint,
+				mysql_kwargs = config['sql_config'],
 				use_autoconn = True,
 				http_callback = wait_callback,
 				mysql_callback = wait_callback)
@@ -420,8 +387,8 @@
 		# without the image itself. Can be fixed by querying query.php
 		# instead of api.php.
 		# Also, should this be made an exists() method
 		# of checkusage.CheckUsage?
-		if self.CheckUsage.exists(SHARED, image) and not bool(replacement):
-			output(u'%s %s exists on Commons!' % (self, image))
+		shared = wikipedia.getSite(*self.site.shared_image_repository())
+		if self.CheckUsage.exists(shared, image) and not bool(replacement):
+			output(u'%s %s exists on the shared image repository!' % (self, image))
 			return
-		if self.CheckUsage.exists(self.site.hostname(), image) and \
+		if self.CheckUsage.exists(self.site, image) and \
 			not bool(replacement):
@@ -435,12 +402,13 @@
 			count = 0
 			# Sort usage per site
-			for domain, page in usage:
-				if domain not in usage_domains:
-					usage_domains[domain] = []
-				usage_domains[domain].append(page)
+			for (lang, family), (page_namespace, page_title, title) in usage:
+				if (lang, family) not in usage_domains:
+					usage_domains[(lang, family)] = []
+				usage_domains[(lang, family)].append((page_namespace, page_title, title))
 				count += 1
 		else:
-			usage_domains = {self.site.hostname(): list(
-				self.CheckUsage.get_usage_live(self.site.hostname(),
-				image))}
+			usage_domains = {(self.site.lang, self.site.family): list(
+				self.CheckUsage.get_usage_live(self.site, image))}
@@ -453,6 +421,7 @@
 			self.CommonsDelinker.Delinkers.append((image, usage_domains,
 				timestamp, admin, reason, replacement))
 		elif replacement:
+			# Record the replacement as done
 			self.CommonsDelinker.Loggers.append((timestamp, image, replacement))
 
 	def do(self, args):
@@ -481,6 +450,7 @@
 	def log_result(self, timestamp, image, domain, namespace, page, status = "ok", newimage = None):
 		# TODO: Make sqlite3 ready
+		# FIXME: logging by domain is a leftover; should use Site objects
+		# like the rest of the refactoring
 
 		# The original delinker code cached log results,
 		# in order to limit the number of connections.
@@ -542,6 +512,9 @@
 		self.edit_list = []
 		self.editLock = threading.Lock()
 
+		self.sites = {}
+		self.siteLock = threading.Lock()
+
 		self.SummaryCache = SummaryCache(self)
 
 		if self.config.get('enable_replacer', False):
@@ -578,7 +551,31 @@
 		self.edit_list.remove((domain, page))
 		self.editLock.release()
 
+	def get_site(self, code, fam):
+		# Threadsafe replacement of wikipedia.getSite.
+		# fam may be a family name or a Family object.
+		key = '%s:%s' % (code, getattr(fam, 'name', fam))
+		self.siteLock.acquire()
+		try:
+			if key not in self.sites:
+				self.sites[key] = []
+			for i, (site, used) in enumerate(self.sites[key]):
+				if not used:
+					self.sites[key][i] = (site, True)
+					return site
+			site = wikipedia.Site(code, fam)
+			self.sites[key].append((site, True))
+			return site
+		finally:
+			self.siteLock.release()
+
+	def unlock_site(self, site):
+		key = '%s:%s' % (site.lang, site.family.name)
+		self.siteLock.acquire()
+		try:
+			sites = self.sites[key]
+			sites[sites.index((site, True))] = (site, False)
+		finally:
+			self.siteLock.release()
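
The pool hands out one Site object per thread and recycles it once unlocked,
so callers must pair get_site() with unlock_site() in try/finally, as
delink_image() now does. A sketch (the 'de'/'wikipedia' arguments are
illustrative):

	site = self.CommonsDelinker.get_site('de', 'wikipedia')
	try:
		page = wikipedia.Page(site, u'Example')
		# ... edit the page ...
	finally:
		self.CommonsDelinker.unlock_site(site)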
 
 	def read_deletion_log(self):
 		ts_format = '%Y-%m-%dT%H:%M:%SZ'
 		wait = self.config['delink_wait']
@@ -598,7 +595,7 @@
 		# Assume less than 500 deletions have been made between
 		# this and the previous check of the log. If this is not
 		# the case, timeout should be set lower.
-		result = self.http.query_api('api', self.site.hostname(),
+		result = self.http.query_api('api', self.site.hostname(), self.site.apipath(),
 			action = 'query', list = 'logevents', letype = 'delete',
 			lelimit = self.log_limit, lestart = ts_from_s, leend = ts_end_s,
 			ledir = 'newer')
@@ -628,47 +625,10 @@
 					None))
 			else:
 				output(u'Skipping deleted image: %s' % logevent['title'])
 
-	def read_deletion_log_db(self):
-		# NOTE: Unused method currently, and not likely
-		# to be used in the nearby future. Should be removed?
-
-		if not self.cursor.execute("""SELECT UNIX_TIMESTAMP() - last_activity
-			FROM %s WHERE name = 'deletion_log_updated'""" % \
-			self.config['bot_table']):
-			return False
-		if cursor.fetchone()[0] > 120: return False
-
-		ts_format = '%Y%m%d%H%M%S'
-		wait = self.config['delink_wait']
-		exclusion = self.config['exclude_string']
-
-		ts_from = self.last_check
-		# Truncate -> int()
-		ts_end = int(time.time())
-		self.last_check = ts_end
-
-		# Format as a Mediawiki timestamp and substract a
-		# certain wait period.
-		ts_from_s = time.strftime(ts_format, time.gmtime(ts_from - wait + 1))
-		ts_end_s = time.strftime(ts_format, time.gmtime(ts_end - wait))
-
-		self.cursor.execute("""SELECT timestamp, page_title, user, comment
-			FROM %s WHERE page_namespace = 6 AND action = 'delete'
-			AND timestamp >= %%s AND timestamp <= %%s""" % \
-			self.config['deletion_log_table'], (ts_from_s, ts_end_s))
-
-		for timestamp, page_title, comment in self.cursor:
-			if exclusion not in comment:
-				output(u'Deleted image: %s' % page_title)
-				self.CheckUsages.append((page_title, timestamp,
-					user, comment, None))
-			else:
-				output(u'Skipping deleted image: %s' % page_title)
-		return True
-
 	def read_replacement_log(self):
 		# TODO: Make sqlite3 ready
+		# TODO: Single process replacer
 		update = """UPDATE %s SET status = %%s WHERE id = %%s""" % \
 			self.config['replacer_table']
 		self.cursor.execute("""SELECT id, timestamp, old_image, new_image, user, comment
@@ -720,14 +680,6 @@
 		if self.config.get('enable_replacer', False):
 			self.read_replacement_log()
 
-		if 'bot_table' in self.config:
-			# NOTE: Unused
-			self.cursor.execute("""UPDATE %s SET
-				last_activity = UNIX_TIMESTAMP()
-				WHERE name = 'CommonsDelinker'""" % \
-				self.config['bot_table'])
-			self.database.commit()
-
 		time.sleep(self.config['timeout'])
 
 	def thread_died(self):
@@ -754,7 +706,7 @@
 		if cu <= 1:
 			output(u'ERROR!!! Too few CheckUsages left to function', False)
 			threadpool.terminate()
-		if lg < 0:
+		if lg <= 0:
 			output(u'ERROR!!! Too few Loggers left to function', False)
 			print >>sys.stderr, 'Currently unlogged:', unlogged
 			threadpool.terminate()