Revision: 5189
Author:   russblau
Date:     2008-04-07 12:28:23 +0000 (Mon, 07 Apr 2008)

Log Message:
-----------
pagegenerators.py: generalize look-ahead generator object and implement in
PreloadingGenerator; other files: minor cleanup and bugfixes

Modified Paths:
--------------
    trunk/pywikipedia/pagegenerators.py
    trunk/pywikipedia/replace.py
    trunk/pywikipedia/xmlreader.py
Modified: trunk/pywikipedia/pagegenerators.py
===================================================================
--- trunk/pywikipedia/pagegenerators.py	2008-04-06 22:15:30 UTC (rev 5188)
+++ trunk/pywikipedia/pagegenerators.py	2008-04-07 12:28:23 UTC (rev 5189)
@@ -42,7 +42,7 @@
 -uncatfiles       Work on all files which are not categorised.

 -file             Read a list of pages to treat from the named text file.
-                  Page titles in the file must be enclosed with [[brackets]].
+                  Page titles in the file must be enclosed with [[brackets]]. Argument can also be given as "-file:filename".

 -filelinks        Work on all pages that use a certain image/media file.

@@ -130,6 +130,93 @@
 import wikipedia, date, catlib
 import config

+
+class ThreadedGenerator(threading.Thread):
+    """Look-ahead generator class.
+
+    Runs a generator in a separate thread and queues the results; can
+    be called like a regular generator.
+
+    Subclasses should override self.generator, _not_ self.run
+
+    Important: the generator thread will stop itself if the generator's
+    internal queue is exhausted; but, if the calling program does not use
+    all the generated values, it must call the generator's stop() method to
+    stop the background thread. Example usage:
+
+    >>> gen = ThreadedGenerator(target=foo)
+    >>> try:
+    ...     for data in gen:
+    ...         do_work(data)
+    ... finally:
+    ...     gen.stop()
+
+    """
+
+    def __init__(self, group=None, target=None, name="GeneratorThread",
+                 args=(), kwargs=None, qsize=65536):
+        """Constructor. Takes same keyword arguments as threading.Thread.
+
+        target must be a generator function (or other callable that returns
+        an iterable object).
+
+        @param qsize: The size of the lookahead queue. The larger the qsize,
+        the more values will be computed in advance of use (which can eat
+        up memory and processor time).
+        @type qsize: int
+
+        """
+        if kwargs is None:
+            kwargs = {}
+        if target:
+            self.generator = target
+        if not hasattr(self, "generator"):
+            raise RuntimeError("No generator for ThreadedGenerator to run.")
+        self.args, self.kwargs = args, kwargs
+        threading.Thread.__init__(self, group=group, name=name)
+        self.queue = Queue.Queue(qsize)
+        self.finished = threading.Event()
+
+    def __iter__(self):
+        """Iterate results from the queue."""
+        if not self.isAlive() and not self.finished.isSet():
+            self.start()
+        # if there is an item in the queue, yield it, otherwise wait
+        while not self.finished.isSet():
+            try:
+                yield self.queue.get(True, 0.25)
+            except Queue.Empty:
+                pass
+            except KeyboardInterrupt:
+                self.stop()
+
+    def stop(self):
+        """Stop the background thread."""
+##        if not self.finished.isSet():
+##            wikipedia.output("DEBUG: signalling %s to stop." % self)
+        self.finished.set()
+
+    def run(self):
+        """Run the generator and store the results on the queue."""
+        self.__gen = self.generator(*self.args, **self.kwargs)
+        for result in self.__gen:
+            while True:
+                if self.finished.isSet():
+##                    wikipedia.output("DEBUG: %s received stop signal." % self)
+                    return
+                try:
+                    self.queue.put_nowait(result)
+                except Queue.Full:
+                    time.sleep(0.25)
+                    continue
+                break
+        # wait for queue to be emptied, then kill the thread
+        while not self.finished.isSet() and not self.queue.empty():
+            time.sleep(0.25)
+        self.stop()
+##        wikipedia.output("DEBUG: %s stopped because generator exhausted." % self)
+
+
 def AllpagesPageGenerator(start ='!', namespace = None, includeredirects = True, site = None):
     """
     Using the Allpages special page, retrieve all articles' titles, and yield
@@ -565,20 +652,20 @@

 def RegexFilterPageGenerator(generator, regex):
     """
-    Wraps around another generator. Yields only thos pages, which titles are positively
-    matched to regex.
+    Wraps around another generator. Yields only thos pages, which titles are
+    positively matched to regex.
     """
     reg = re.compile(regex, re.I)

     for page in generator:
-        if reg.match(page.titleWithoutNamespace()):
+        if reg.match(page.titleWithoutNamespace()):
             yield page

 def CombinedPageGenerator(generators):
     """
-    Wraps around a list of other generators. Yields all pages generated by the
-    first generator; when the first generator stops yielding pages, yields those
-    generated by the second generator, etc.
+    Wraps around a list of other generators. Yields all pages generated by
+    the first generator; when the first generator stops yielding pages,
+    yields those generated by the second generator, etc.
     """
     for generator in generators:
         for page in generator:
@@ -595,8 +682,8 @@

 def PageWithTalkPageGenerator(generator):
     """
-    Wraps around another generator. Yields the same pages, but for non-talk pages, it
-    also includes associated talk pages.
+    Wraps around another generator. Yields the same pages, but for non-talk
+    pages, it also includes associated talk pages.
     This generator does not check if the talk page in fact exists.
     """
     for page in generator:
@@ -604,93 +691,69 @@
         if not page.isTalkPage():
             yield page.toggleTalkPage()

-class _Preloader(threading.Thread):
-    def __init__(self, queue, generator, pageNumber):
-        threading.Thread.__init__(self)
-        self.queue = queue
-        self.generator = generator
+class PreloadingGenerator(ThreadedGenerator):
+    """
+    Yields the same pages as generator generator. Retrieves 60 pages (or
+    another number specified by pageNumber), loads them using
+    Special:Export, and yields them one after the other. Then retrieves more
+    pages, etc. Thus, it is not necessary to load each page separately.
+    Operates asynchronously, so the next batch of pages is loaded in the
+    background before the first batch is fully consumed.
+    """
+    def __init__(self, generator, pageNumber=60, lookahead=10):
+        self.wrapped_gen = generator
         self.pageNumber = pageNumber
-        # identification for debugging purposes
-        self.setName('Preloader-Thread')
-        # This thread dies when the main program terminates
-        self.setDaemon(True)
+        ThreadedGenerator.__init__(self, name="Preloading-Thread",
+                                   qsize=lookahead)

-    def preload(self, pages):
+    def generator(self):
         try:
-            while len(pages) > 0:
-                # It might be that the pages are on different sites,
-                # e.g. because the -interwiki parameter was used.
-                # Query the sites one by one.
-                site = pages[0].site()
-                pagesThisSite = [page for page in pages if page.site() == site]
-                pages = [page for page in pages if page.site() != site]
-                wikipedia.getall(site, pagesThisSite, throttle=False)
-                for page in pagesThisSite:
-                    yield page
-        except IndexError:
-            # Can happen if the pages list is empty. Don't care.
-            pass
-        except wikipedia.SaxError:
-            # Ignore this error, and get the pages the traditional way later.
-            pass
-
-    def run(self):
-        try:
            # this array will contain up to pageNumber pages and will be flushed
            # after these pages have been preloaded and yielded.
            somePages = []
-            for page in self.generator:
+            for page in self.wrapped_gen:
+                if self.finished.isSet():
+                    return
                somePages.append(page)
                # We don't want to load too many pages at once using XML export.
                # We only get a maximum number at a time.
                if len(somePages) >= self.pageNumber:
-                    for refpage in self.preload(somePages):
-                        self.queue.put(refpage)
+                    for loaded_page in self.preload(somePages):
+                        yield loaded_page
                    somePages = []
            if somePages:
+                # wrapped generator is exhausted but some pages still unloaded
                # preload remaining pages
-                for refpage in self.preload(somePages):
-                    self.queue.put(refpage)
-            self.queue.put(None)    # to signal end of list
+                for loaded_page in self.preload(somePages):
+                    yield loaded_page
         except Exception, e:
             wikipedia.output(unicode(e))
-            self.queue.put(None)    # to signal end of list
+        finally:
+            if hasattr(self.wrapped_gen, "stop"):
+                self.wrapped_gen.stop()

-def PreloadingGenerator(generator, pageNumber = 60):
-    """
-    Yields the same pages as generator generator. Retrieves 60 pages (or
-    another number specified by pageNumber), loads them using
-    Special:Export, and yields them one after the other. Then retrieves more
-    pages, etc. Thus, it is not necessary to load each page separately.
-    Operates asynchronously, so the next batch of pages is loaded in the
-    background before the first batch is fully consumed.
-    """
-    if pageNumber < 2:
-        raise ValueError("PreloadingGenerator needs to load more than 1 page.")
-    pagequeue = Queue.Queue(min(pageNumber//2, 10))
-    # Note: queue size will determine how quickly the Preloader goes back for
-    # more pages. If the queue size is unlimited, it will preload all pages
-    # before yielding any of them to the consumer. If the queue size is small,
-    # it will wait until most pages have been yielded before preloading the
-    # next batch. This value tries to strike a compromise, but may need
-    # adjustment based upon experience.
-    preloader = _Preloader(pagequeue, generator, pageNumber)
-    preloader.start()
-    while True:
-        # Queue.get() blocks the main thread. This means that the
-        # program wouldn't react to CTRL-C while it is waiting for
-        # a queue element.
-        # Thus, there is a timeout to the blocking, so that Python
-        # can check once a second if there is a KeyboardInterrupt.
+    def preload(self, page_list):
         try:
-            p = pagequeue.get(timeout = 1)
-        except Queue.Empty:
-            # This is expected. Keep waiting.
-            continue
-        if p is None:
-            return
-        yield p
+            while len(page_list) > 0:
+                # It might be that the pages are on different sites,
+                # e.g. because the -interwiki parameter was used.
+                # Query the sites one by one.
+                site = page_list[0].site()
+                pagesThisSite = [page for page in page_list
+                                 if page.site() == site]
+                page_list = [page for page in page_list
+                             if page.site() != site]
+                wikipedia.getall(site, pagesThisSite)
+                for page in pagesThisSite:
+                    yield page
+        except IndexError:
+            # Can happen if the pages list is empty. Don't care.
+            pass
+        except wikipedia.SaxError:
+            # Ignore this error, and get the pages the traditional way later.
+            pass

+
 class GeneratorFactory:
     """
     This factory is responsible for processing command line arguments
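
For reference, a minimal usage sketch of the two classes added above; the
page_source generator and do_work callable are placeholders for illustration
only, not part of this changeset:

    import wikipedia, pagegenerators

    def page_source():
        # placeholder generator; any callable returning an iterable of
        # Page objects can be passed as target=
        for title in [u"Foo", u"Bar", u"Baz"]:
            yield wikipedia.Page(wikipedia.getSite(), title)

    def do_work(page):
        wikipedia.output(page.title())      # placeholder consumer

    # ThreadedGenerator runs page_source() in a background thread and keeps
    # up to qsize results queued ahead of the consumer.
    gen = pagegenerators.ThreadedGenerator(target=page_source, qsize=10)
    try:
        for page in gen:
            do_work(page)
    finally:
        gen.stop()      # always stop the background thread explicitly

    # PreloadingGenerator is now a ThreadedGenerator subclass that overrides
    # self.generator: it batches pageNumber pages at a time through
    # wikipedia.getall() and keeps up to `lookahead` loaded pages queued.
    for page in pagegenerators.PreloadingGenerator(page_source(),
                                                   pageNumber=60, lookahead=10):
        do_work(page)
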

Modified: trunk/pywikipedia/replace.py
===================================================================
--- trunk/pywikipedia/replace.py	2008-04-06 22:15:30 UTC (rev 5188)
+++ trunk/pywikipedia/replace.py	2008-04-07 12:28:23 UTC (rev 5189)
@@ -102,7 +102,7 @@
     python replace.py referer referrer -file:typos.txt -excepttext:HTTP
 """
 #
-# (C) Daniel Herding, 2004
+# (C) Daniel Herding & the Pywikipediabot Team, 2004-2008
 #
 # Distributed under the terms of the MIT license.
 #
@@ -156,6 +156,7 @@
     'zh': u'機器人:執行文字代換作業 %s',
 }

+
 class XmlDumpReplacePageGenerator:
     """
     Iterator that will yield Pages that might contain text to replace.
@@ -190,9 +191,6 @@
         self.parser = dump.parse()

     def __iter__(self):
-        return self
-
-    def next(self):
         try:
             for entry in self.parser:
                 if self.skipping:
@@ -205,17 +203,16 @@
                 for old, new in self.replacements:
                     new_text = wikipedia.replaceExcept(
                         new_text, old, new, self.excsInside)
-                if new_text != entry.text:
-                    return wikipedia.Page(self.site, entry.title)
+                if new_text != entry.text:
+                    yield wikipedia.Page(self.site, entry.title)
         except KeyboardInterrupt:
             try:
                 if not self.skipping:
                     wikipedia.output(
-                        'To resume, use "-xmlstart:%s" on the command line.'
-                        % entry.title)
+                        u'To resume, use "-xmlstart:%s" on the command line.'
+                        % entry.title)
             except NameError:
                 pass
-        raise KeyboardInterrupt

     def isTitleExcepted(self, title):
         if self.exceptions.has_key('title'):
@@ -231,6 +228,7 @@
                 return True
         return False

+
 class ReplaceRobot:
     """
     A bot that can do text replacements.
@@ -336,7 +334,8 @@
                 # Load the page's text from the wiki
                 original_text = page.get()
                 if not page.canBeEdited():
-                    wikipedia.output(u"You can't edit page %s" % page.aslink())
+                    wikipedia.output(u"You can't edit page %s"
+                                     % page.aslink())
                     continue
             except wikipedia.NoPage:
                 wikipedia.output(u'Page %s not found' % page.aslink())
@@ -363,7 +362,8 @@
                 cats = page.categories()
                 if self.addedCat not in cats:
                     cats.append(self.addedCat)
-                    new_text = wikipedia.replaceCategoryLinks(new_text, cats)
+                    new_text = wikipedia.replaceCategoryLinks(new_text,
+                                                              cats)
             # Show the title of the page we're working on.
             # Highlight the title in purple.
             wikipedia.output(u"\n\n>>> \03{lightpurple}%s\03{default} <<<"
@@ -388,7 +388,8 @@
                     u'Cannot change %s because of blacklist entry %s'
                     % (page.title(), e.url))
             except wikipedia.PageNotSaved, error:
-                wikipedia.output(u'Error putting page: %s' % (error.args,))
+                wikipedia.output(u'Error putting page: %s'
+                                 % (error.args,))
             except wikipedia.LockedPage:
                 wikipedia.output(u'Skipping %s (locked page)'
                                  % (page.title(),))
@@ -613,9 +614,13 @@
         gen = XmlDumpReplacePageGenerator(xmlFilename, xmlStart, replacements,
                                           exceptions)
     elif useSql:
-        whereClause = 'WHERE (%s)' % ' OR '.join(["old_text RLIKE '%s'" % prepareRegexForMySQL(old.pattern) for (old, new) in replacements])
+        whereClause = 'WHERE (%s)' % ' OR '.join(
+            ["old_text RLIKE '%s'" % prepareRegexForMySQL(old.pattern)
+             for (old, new) in replacements])
         if exceptions:
-            exceptClause = 'AND NOT (%s)' % ' OR '.join(["old_text RLIKE '%s'" % prepareRegexForMySQL(exc.pattern) for exc in exceptions])
+            exceptClause = 'AND NOT (%s)' % ' OR '.join(
+                ["old_text RLIKE '%s'" % prepareRegexForMySQL(exc.pattern)
+                 for exc in exceptions])
         else:
             exceptClause = ''
         query = u"""
@@ -640,10 +645,12 @@
     if namespaces != []:
         gen = pagegenerators.NamespaceFilterPageGenerator(gen, namespaces)
     if xmlFilename:
-        # XML parsing can be quite slow, so we preload less pages each time.
-        preloadingGen = pagegenerators.PreloadingGenerator(gen, pageNumber = 20)
+        # XML parsing can be quite slow, so use smaller batches and
+        # longer lookahead.
+        preloadingGen = pagegenerators.PreloadingGenerator(gen,
+                                        pageNumber=20, lookahead=1000)
     else:
-        preloadingGen = pagegenerators.PreloadingGenerator(gen, pageNumber = 60)
+        preloadingGen = pagegenerators.PreloadingGenerator(gen, pageNumber=60)
     bot = ReplaceRobot(preloadingGen, replacements, exceptions, acceptall,
                        allowoverlap, recursive, None, sleep)
     bot.run()
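
The XmlDumpReplacePageGenerator change above drops the hand-written iterator
protocol (__iter__ returning self plus a next() method that had to re-raise
KeyboardInterrupt) in favour of a single generator-style __iter__. A minimal
sketch of the same refactoring pattern, using a made-up Squares class purely
for illustration:

    class Squares:
        """Old style needed __iter__ returning self and a next() method that
        tracked its own position and raised StopIteration by hand. With a
        generator-style __iter__, iteration state and StopIteration are
        handled by Python, and exceptions such as KeyboardInterrupt simply
        propagate to the caller."""

        def __init__(self, limit):
            self.limit = limit

        def __iter__(self):
            for i in xrange(self.limit):
                yield i * i

    for n in Squares(5):
        print n,        # prints: 0 1 4 9 16

The same file also switches the XML-driven run to pageNumber=20 with
lookahead=1000, so the slow XML scan can queue work well ahead of the
editing loop.
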

Modified: trunk/pywikipedia/xmlreader.py
===================================================================
--- trunk/pywikipedia/xmlreader.py	2008-04-06 22:15:30 UTC (rev 5188)
+++ trunk/pywikipedia/xmlreader.py	2008-04-07 12:28:23 UTC (rev 5189)
@@ -51,6 +51,7 @@
         moveRestriction = 'sysop'
     return editRestriction, moveRestriction

+
 class XmlEntry:
     """
     Represents a page.
@@ -67,6 +68,7 @@
         self.moveRestriction = moveRestriction
         self.revisionid = revisionid

+
 class XmlHeaderEntry:
     """
     Represents a header entry
@@ -78,6 +80,7 @@
         self.case = u''
         self.namespaces = {}

+
 class MediaWikiXmlHandler(xml.sax.handler.ContentHandler):
     def __init__(self):
         xml.sax.handler.ContentHandler.__init__(self)
@@ -239,7 +242,9 @@
         """Return a generator that will yield XmlEntry objects"""
         print 'Reading XML dump...'
         if not 'iterparse' in globals():
-            wikipedia.output(u'NOTE: cElementTree not found. Using slower fallback solution. Consider installing the python-celementtree package.')
+            wikipedia.output(
+u'''WARNING: cElementTree not found. Using slower fallback solution.
+Consider installing the python-celementtree package.''')
             return self.regex_parse()
         else:
             return self.new_parse()
@@ -271,13 +276,9 @@
             text = revision.findtext("{%s}text" % uri)
             editRestriction, moveRestriction \
                     = parseRestrictions(restrictions)
-
-            yield XmlEntry(title=title,
-                           id=pageid,
-                           text=text or u'',
-                           username=username,
-                           ipedit=bool(ipeditor),
-                           timestamp= timestamp,
+            yield XmlEntry(title=title, id=pageid, text=text or u'',
+                           username=username, ipedit=bool(ipeditor),
+                           timestamp=timestamp,
                            editRestriction=editRestriction,
                            moveRestriction=moveRestriction,
                            revisionid=revisionid
@@ -344,11 +345,10 @@
                 username = m.group('ip')
                 ipedit = True
             yield XmlEntry(title = m.group('title'),
-                           id = m.group('pageid'),
-                           text = text,
-                           username = username,
-                           ipedit=ipedit,
-                           timestamp = m.group('timestamp'),
+                           id=m.group('pageid'), text=text,
+                           username=username, ipedit=ipedit,
+                           timestamp=m.group('timestamp'),
                            editRestriction = editRestriction,
-                           moveRestriction = moveRestriction,
-                           revisionid = m.group('revisionid'))
+                           moveRestriction=moveRestriction,
+                           revisionid=m.group('revisionid')
+                           )
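
The reworded warning in XmlDump.parse() fires when iterparse could not be
imported at module load time; parse() then falls back to the slower
regex-based parser. A rough sketch of that optional-dependency pattern
follows (the exact import chain used by xmlreader.py is an assumption here,
not shown in this diff):

    # Try the fast C parser first; leave 'iterparse' undefined otherwise.
    try:
        from xml.etree.cElementTree import iterparse
    except ImportError:
        try:
            from cElementTree import iterparse
        except ImportError:
            pass

    def choose_parser(dump):
        # mirrors XmlDump.parse(): prefer iterparse, else the slow fallback
        if not 'iterparse' in globals():
            return dump.regex_parse()
        return dump.new_parse()
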