http://www.mediawiki.org/wiki/Special:Code/pywikipedia/9244
Revision: 9244 Author: xqt Date: 2011-05-12 14:56:49 +0000 (Thu, 12 May 2011) Log Message: ----------- move ThreadedGenerator from pagegenerators to pywikibot/tools from rewrite r9067
Modified Paths: -------------- trunk/pywikipedia/pagegenerators.py
Added Paths: ----------- trunk/pywikipedia/pywikibot/tools.py
Modified: trunk/pywikipedia/pagegenerators.py =================================================================== --- trunk/pywikipedia/pagegenerators.py 2011-05-12 14:18:20 UTC (rev 9243) +++ trunk/pywikipedia/pagegenerators.py 2011-05-12 14:56:49 UTC (rev 9244) @@ -24,10 +24,8 @@ import config
import traceback -import Queue import re import sys -import threading import codecs
import urllib, urllib2, time @@ -208,91 +206,6 @@ class GeneratorExit(Exception): pass
-class ThreadedGenerator(threading.Thread): - """Look-ahead generator class. - - Runs a generator in a separate thread and queues the results; can - be called like a regular generator. - - Subclasses should override self.generator, _not_ self.run - - Important: the generator thread will stop itself if the generator's - internal queue is exhausted; but, if the calling program does not use - all the generated values, it must call the generator's stop() method to - stop the background thread. Example usage: - - >>> gen = ThreadedGenerator(target=foo) - >>> try: - ... for data in gen: - ... do_work(data) - ... finally: - ... gen.stop() - - """ - - def __init__(self, group=None, target=None, name="GeneratorThread", - args=(), kwargs=None, qsize=65536): - """Constructor. Takes same keyword arguments as threading.Thread. - - target must be a generator function (or other callable that returns - an iterable object). - - @param qsize: The size of the lookahead queue. The larger the qsize, - the more values will be computed in advance of use (which can eat - up memory and processor time). - @type qsize: int - - """ - if kwargs is None: - kwargs = {} - if target: - self.generator = target - if not hasattr(self, "generator"): - raise RuntimeError("No generator for ThreadedGenerator to run.") - self.args, self.kwargs = args, kwargs - threading.Thread.__init__(self, group=group, name=name) - self.queue = Queue.Queue(qsize) - self.finished = threading.Event() - - def __iter__(self): - """Iterate results from the queue.""" - if not self.isAlive() and not self.finished.isSet(): - self.start() - # if there is an item in the queue, yield it, otherwise wait - while not self.finished.isSet(): - try: - yield self.queue.get(True, 0.25) - except Queue.Empty: - pass - except KeyboardInterrupt: - self.stop() - - def stop(self): - """Stop the background thread.""" -## if not self.finished.isSet(): -## pywikibot.output("DEBUG: signalling %s to stop." % self) - self.finished.set() - - def run(self): - """Run the generator and store the results on the queue.""" - self.__gen = self.generator(*self.args, **self.kwargs) - for result in self.__gen: - while True: - if self.finished.isSet(): -## pywikibot.output("DEBUG: %s received stop signal." % self) - return - try: - self.queue.put_nowait(result) - except Queue.Full: - time.sleep(0.25) - continue - break - # wait for queue to be emptied, then kill the thread - while not self.finished.isSet() and not self.queue.empty(): - time.sleep(0.25) - self.stop() -## pywikibot.output("DEBUG: %s stopped because generator exhausted." % self) - class GeneratorFactory(object): """Process command line arguments and return appropriate page generator. This factory is responsible for processing command line arguments
Copied: trunk/pywikipedia/pywikibot/tools.py (from rev 9242, branches/rewrite/pywikibot/tools.py) =================================================================== --- trunk/pywikipedia/pywikibot/tools.py (rev 0) +++ trunk/pywikipedia/pywikibot/tools.py 2011-05-12 14:56:49 UTC (rev 9244) @@ -0,0 +1,174 @@ +# -*- coding: utf-8 -*- +"""Miscellaneous helper functions (not wiki-dependent)""" +# +# (C) Pywikipedia bot team, 2008 +# +# Distributed under the terms of the MIT license. +# +__version__ = '$Id$' + +import sys +import threading +import time +import Queue + + +class ThreadedGenerator(threading.Thread): + """Look-ahead generator class. + + Runs a generator in a separate thread and queues the results; can + be called like a regular generator. + + Subclasses should override self.generator, I{not} self.run + + Important: the generator thread will stop itself if the generator's + internal queue is exhausted; but, if the calling program does not use + all the generated values, it must call the generator's stop() method to + stop the background thread. Example usage: + + >>> gen = ThreadedGenerator(target=xrange, args=(20,)) + >>> try: + ... for data in gen: + ... print data, + ... finally: + ... gen.stop() + 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 + + """ + + def __init__(self, group=None, target=None, name="GeneratorThread", + args=(), kwargs=None, qsize=65536): + """Constructor. Takes same keyword arguments as threading.Thread. + + target must be a generator function (or other callable that returns + an iterable object). + + @param qsize: The size of the lookahead queue. The larger the qsize, + the more values will be computed in advance of use (which can eat + up memory and processor time). + @type qsize: int + + """ + if kwargs is None: + kwargs = {} + if target: + self.generator = target + if not hasattr(self, "generator"): + raise RuntimeError("No generator for ThreadedGenerator to run.") + self.args, self.kwargs = args, kwargs + threading.Thread.__init__(self, group=group, name=name) + self.queue = Queue.Queue(qsize) + self.finished = threading.Event() + + def __iter__(self): + """Iterate results from the queue.""" + if not self.isAlive() and not self.finished.isSet(): + self.start() + # if there is an item in the queue, yield it, otherwise wait + while not self.finished.isSet(): + try: + yield self.queue.get(True, 0.25) + except Queue.Empty: + pass + except KeyboardInterrupt: + self.stop() + + def stop(self): + """Stop the background thread.""" + self.finished.set() + + def run(self): + """Run the generator and store the results on the queue.""" + self.__gen = self.generator(*self.args, **self.kwargs) + for result in self.__gen: + while True: + if self.finished.isSet(): + return + try: + self.queue.put_nowait(result) + except Queue.Full: + time.sleep(0.25) + continue + break + # wait for queue to be emptied, then kill the thread + while not self.finished.isSet() and not self.queue.empty(): + time.sleep(0.25) + self.stop() + + +def itergroup(iterable, size): + """Make an iterator that returns lists of (up to) size items from iterable. + + Example: + + >>> i = itergroup(xrange(25), 10) + >>> print i.next() + [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] + >>> print i.next() + [10, 11, 12, 13, 14, 15, 16, 17, 18, 19] + >>> print i.next() + [20, 21, 22, 23, 24] + >>> print i.next() + Traceback (most recent call last): + ... + StopIteration + + """ + group = [] + for item in iterable: + group.append(item) + if len(group) == size: + yield group + group = [] + if group: + yield group + + +class ThreadList(list): + """A simple threadpool class to limit the number of simultaneous threads. + + Any threading.Thread object can be added to the pool using the append() + method. If the maximum number of simultaneous threads has not been reached, + the Thread object will be started immediately; if not, the append() call + will block until the thread is able to start. + + >>> pool = ThreadList(limit=10) + >>> def work(): + ... time.sleep(1) + ... + >>> for x in xrange(20): + ... pool.append(threading.Thread(target=work)) + ... + + """ + def __init__(self, limit=sys.maxint, *args): + self.limit = limit + list.__init__(self, *args) + for item in list(self): + if not isinstance(threading.Thread, item): + raise TypeError("Cannot add '%s' to ThreadList" % type(item)) + + def active_count(self): + """Return the number of alive threads, and delete all non-alive ones.""" + count = 0 + for item in list(self): + if item.isAlive(): + count += 1 + else: + self.remove(item) + return count + + def append(self, thd): + if not isinstance(thd, threading.Thread): + raise TypeError("Cannot append '%s' to ThreadList" % type(thd)) + while self.active_count() >= self.limit: + time.sleep(2) + list.append(self, thd) + thd.start() + + +if __name__ == "__main__": + def _test(): + import doctest + doctest.testmod() + _test()