Revision: 5239 Author: btongminh Date: 2008-04-19 19:46:02 +0000 (Sat, 19 Apr 2008)
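Log Message: ----------- Fix some minor bugs: copy mysql_kwargs before removing its 'host' key and pass self.mysql_kwargs when connecting, and close self.http rather than the nonexistent self.conn. Let thread pools spawn workers on demand up to a maximum, and starve (shut down) idle CheckUsage threads when we can't feed them a job within the timeout.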
Modified Paths: -------------- trunk/pywikipedia/commonsdelinker/checkusage.py trunk/pywikipedia/commonsdelinker/delinker.py trunk/pywikipedia/commonsdelinker/threadpool.py
Modified: trunk/pywikipedia/commonsdelinker/checkusage.py =================================================================== --- trunk/pywikipedia/commonsdelinker/checkusage.py 2008-04-19 17:40:05 UTC (rev 5238) +++ trunk/pywikipedia/commonsdelinker/checkusage.py 2008-04-19 19:46:02 UTC (rev 5239) @@ -233,8 +233,8 @@ if no_db: return
self.mysql_host_prefix = mysql_host_prefix - if 'host' in mysql_kwargs: del mysql_kwargs['host'] - self.mysql_kwargs = mysql_kwargs + self.mysql_kwargs = mysql_kwargs.copy() # To be safe + if 'host' in self.mysql_kwargs: del self.mysql_kwargs['host'] self.use_autoconn = use_autoconn self.mysql_retry_timeout = mysql_retry_timeout self.mysql_max_retries = mysql_max_retries @@ -292,7 +292,7 @@ **self.mysql_kwargs) else: database = MySQLdb.connect(use_unicode = False, - host = host, **mysql_kwargs) + host = host, **self.mysql_kwargs) cursor = database.cursor() self.connections.append((database, cursor)) return database, cursor @@ -380,5 +380,5 @@ except: pass if self.http: - self.conn.close() + self.http.close() \ No newline at end of file
Modified: trunk/pywikipedia/commonsdelinker/delinker.py =================================================================== --- trunk/pywikipedia/commonsdelinker/delinker.py 2008-04-19 17:40:05 UTC (rev 5238) +++ trunk/pywikipedia/commonsdelinker/delinker.py 2008-04-19 19:46:02 UTC (rev 5239) @@ -18,7 +18,7 @@ # # (C) Kyle/Orgullomoore, 2006-2007 # (C) Siebrand Mazeland, 2006-2007 -# (C) Bryan Tong Minh, 2007 +# (C) Bryan Tong Minh, 2007-2008 # # Distributed under the terms of the MIT license. # @@ -40,6 +40,7 @@ import sys, os, threading, time import traceback import re, cgitb +import threading
import threadpool import checkusage @@ -48,9 +49,10 @@ import config def wait_callback(object): - output(u'Connection has been lost in %s. Attempting reconnection.' % repr(object), False) + output(u'%s Connection has been lost in %s. Attempting reconnection.' % (threading.currentThread(), repr(object)), False) if hasattr(object, 'error'): output(u'Error was %s: %s' % tuple(object.error)) + def universal_unicode(s): if type(s) is str: return s.decode('utf-8', 'ignore') @@ -432,6 +434,7 @@ pass class CheckUsage(threadpool.Thread): + timeout = 30 def __init__(self, pool, CommonsDelinker): threadpool.Thread.__init__(self, pool) self.CommonsDelinker = CommonsDelinker @@ -443,6 +446,7 @@ threadpool.Thread.run(self) def connect(self): + output(u'%s Connecting to databases' % self) config = self.CommonsDelinker.config if config['global']: # Note: global use requires MySQL @@ -516,6 +520,19 @@ traceback.print_exc(file = sys.stderr) self.exit() self.CommonsDelinker.thread_died() + + def starve(self): + self.pool.jobLock.acquire() + try: + if self.pool[id(self)].isSet(): return False + + output(u'%s Starving' % self) + self.CheckUsage.close() + del self.pool[id(self)] + self.pool.threads.remove(self) + return True + finally: + self.pool.jobLock.release() class Logger(threadpool.Thread): def __init__(self, pool, CommonsDelinker): @@ -529,6 +546,7 @@ threadpool.Thread.run(self) def connect(self): + output(u'%s Connecting to log database' % self) self.database = connect_database() self.cursor = self.database.cursor() @@ -597,16 +615,12 @@ self.site.forceLogin() # Initialize workers - self.CheckUsages = threadpool.ThreadPool(CheckUsage) - [self.CheckUsages.add_thread(self) for i in xrange(self.config['checkusage_instances'])] - - self.Delinkers = threadpool.ThreadPool(Delinker) - [self.Delinkers.add_thread(self) for i in xrange(self.config['delinker_instances'])] - - self.Loggers = threadpool.ThreadPool(Logger) + self.CheckUsages = threadpool.ThreadPool(CheckUsage, 
self.config['checkusage_instances'], self) + self.Delinkers = threadpool.ThreadPool(Delinker, self.config['delinker_instances'], self) if self.config.get('enable_logging', True): - [self.Loggers.add_thread(self) for i in xrange(self.config['logger_instances'])] + self.Loggers = threadpool.ThreadPool(Logger, self.config['logger_instances'], self) else: + self.Loggers = threadpool.ThreadPool(Logger, 1, self) self.Loggers.add_thread(self) self.http = checkusage.HTTP(self.site.hostname())
Modified: trunk/pywikipedia/commonsdelinker/threadpool.py =================================================================== --- trunk/pywikipedia/commonsdelinker/threadpool.py 2008-04-19 17:40:05 UTC (rev 5238) +++ trunk/pywikipedia/commonsdelinker/threadpool.py 2008-04-19 19:46:02 UTC (rev 5239) @@ -18,7 +18,7 @@ may cause thread unsafety! """ # -# (C) Bryan Tong Minh, 2007 +# (C) Bryan Tong Minh, 2007-2008 # # Distributed under the terms of the MIT license. # @@ -29,23 +29,28 @@
class ThreadPool(dict): pools = [] - def __init__(self, worker): + def __init__(self, worker, max_threads, *args, **kwargs): dict.__init__(self) - + self.jobLock = threading.Lock() self.jobQueue = [] self.worker = worker self.threads = [] - + + self.max_threads = max_threads + self.args = args + self.kwargs = kwargs + self.pools.append(self) - + def append(self, job): self.jobLock.acquire() + counter = 0 try: self.jobQueue.append(job) + # The amount of workers needed to be unlocked unlock_workers = len(self.jobQueue) - - counter = 0 + for event in self.itervalues(): if not event.isSet(): event.set() @@ -54,16 +59,19 @@ break finally: self.jobLock.release() - - def add_thread(self, *args, **kwargs): + if counter == 0 and len(self.threads) < self.max_threads: + self.add_thread() + self.start() + + def add_thread(self): self.jobLock.acquire() try: - thread = self.worker(self, *args, **kwargs) + thread = self.worker(self, *self.args, **self.kwargs) self.threads.append(thread) self[id(thread)] = threading.Event() finally: self.jobLock.release() - + def start(self): for thread in self.threads: if not thread.isAlive(): @@ -77,13 +85,14 @@ self[id(thread)].set() finally: self.jobLock.release() - + class Thread(threading.Thread): + timeout = None def __init__(self, pool): threading.Thread.__init__(self) self.pool = pool self.quit = False - + def run(self): while True: # No try..finally: lock.release() here: @@ -100,30 +109,37 @@ if not self.pool.jobQueue: # In case no job is available, wait for the pool - # to call and do not start a busy while loop. + # to call and do not start a busy while loop. 
event = self.pool[id(self)] self.pool.jobLock.release() event.clear() - event.wait() + event.wait(self.timeout) + if not event.isSet() and self.timeout != None: + if self.starve(): return continue job = self.pool.jobQueue.pop(0) self.pool.jobLock.release() self.do(job) - + def exit(self): self.pool.jobLock.acquire() try: self.quit = True self.pool[id(self)].set() + del self.pool[id(self)] + self.pool.threads.remove(self) finally: self.pool.jobLock.release() - + + def starve(self): + pass + def catch_signals(): import signal signal.signal(signal.SIGINT, sig_handler) signal.signal(signal.SIGTERM, sig_handler) - + def sig_handler(signalnum, stack): import signal for pool in ThreadPool.pools: @@ -138,28 +154,28 @@ # Maybe not a good idea, will also kill child processes import signal os.kill(0, signal.SIGTERM) - + if __name__ == '__main__': import time # Test cases - + class Worker(Thread): def do(self, args): print 'Working', self time.sleep(10) print 'Done', self - + pool = ThreadPool(Worker) print 'Spawning 5 threads' [pool.add_thread() for i in xrange(5)] pool.start() - + print 'Doing 25 jobs' for i in xrange(25): print 'Job', i pool.append(i) time.sleep(i % 6) - + for thread in pool.threads: thread.exit()