Revision: 5239
Author: btongminh
Date: 2008-04-19 19:46:02 +0000 (Sat, 19 Apr 2008)
Log Message:
-----------
Fix some minor bugs. Starve threads when we can't feed them.
Modified Paths:
--------------
trunk/pywikipedia/commonsdelinker/checkusage.py
trunk/pywikipedia/commonsdelinker/delinker.py
trunk/pywikipedia/commonsdelinker/threadpool.py
Modified: trunk/pywikipedia/commonsdelinker/checkusage.py
===================================================================
--- trunk/pywikipedia/commonsdelinker/checkusage.py 2008-04-19 17:40:05 UTC (rev 5238)
+++ trunk/pywikipedia/commonsdelinker/checkusage.py 2008-04-19 19:46:02 UTC (rev 5239)
@@ -233,8 +233,8 @@
if no_db: return
self.mysql_host_prefix = mysql_host_prefix
- if 'host' in mysql_kwargs: del mysql_kwargs['host']
- self.mysql_kwargs = mysql_kwargs
+ self.mysql_kwargs = mysql_kwargs.copy() # To be safe
+ if 'host' in self.mysql_kwargs: del self.mysql_kwargs['host']
self.use_autoconn = use_autoconn
self.mysql_retry_timeout = mysql_retry_timeout
self.mysql_max_retries = mysql_max_retries
@@ -292,7 +292,7 @@
**self.mysql_kwargs)
else:
database = MySQLdb.connect(use_unicode = False,
- host = host, **mysql_kwargs)
+ host = host, **self.mysql_kwargs)
cursor = database.cursor()
self.connections.append((database, cursor))
return database, cursor
@@ -380,5 +380,5 @@
except:
pass
if self.http:
- self.conn.close()
+ self.http.close()
\ No newline at end of file
Modified: trunk/pywikipedia/commonsdelinker/delinker.py
===================================================================
--- trunk/pywikipedia/commonsdelinker/delinker.py 2008-04-19 17:40:05 UTC (rev 5238)
+++ trunk/pywikipedia/commonsdelinker/delinker.py 2008-04-19 19:46:02 UTC (rev 5239)
@@ -18,7 +18,7 @@
#
# (C) Kyle/Orgullomoore, 2006-2007
# (C) Siebrand Mazeland, 2006-2007
-# (C) Bryan Tong Minh, 2007
+# (C) Bryan Tong Minh, 2007-2008
#
# Distributed under the terms of the MIT license.
#
@@ -40,6 +40,7 @@
import sys, os, threading, time
import traceback
import re, cgitb
+import threading
import threadpool
import checkusage
@@ -48,9 +49,10 @@
import config
def wait_callback(object):
- output(u'Connection has been lost in %s. Attempting reconnection.' %
repr(object), False)
+ output(u'%s Connection has been lost in %s. Attempting reconnection.' %
(threading.currentThread(), repr(object)), False)
if hasattr(object, 'error'):
output(u'Error was %s: %s' % tuple(object.error))
+
def universal_unicode(s):
if type(s) is str:
return s.decode('utf-8', 'ignore')
@@ -432,6 +434,7 @@
pass
class CheckUsage(threadpool.Thread):
+ timeout = 30
def __init__(self, pool, CommonsDelinker):
threadpool.Thread.__init__(self, pool)
self.CommonsDelinker = CommonsDelinker
@@ -443,6 +446,7 @@
threadpool.Thread.run(self)
def connect(self):
+ output(u'%s Connecting to databases' % self)
config = self.CommonsDelinker.config
if config['global']:
# Note: global use requires MySQL
@@ -516,6 +520,19 @@
traceback.print_exc(file = sys.stderr)
self.exit()
self.CommonsDelinker.thread_died()
+
+ def starve(self):
+ self.pool.jobLock.acquire()
+ try:
+ if self.pool[id(self)].isSet(): return False
+
+ output(u'%s Starving' % self)
+ self.CheckUsage.close()
+ del self.pool[id(self)]
+ self.pool.threads.remove(self)
+ return True
+ finally:
+ self.pool.jobLock.release()
class Logger(threadpool.Thread):
def __init__(self, pool, CommonsDelinker):
@@ -529,6 +546,7 @@
threadpool.Thread.run(self)
def connect(self):
+ output(u'%s Connecting to log database' % self)
self.database = connect_database()
self.cursor = self.database.cursor()
@@ -597,16 +615,12 @@
self.site.forceLogin()
# Initialize workers
- self.CheckUsages = threadpool.ThreadPool(CheckUsage)
- [self.CheckUsages.add_thread(self) for i in
xrange(self.config['checkusage_instances'])]
-
- self.Delinkers = threadpool.ThreadPool(Delinker)
- [self.Delinkers.add_thread(self) for i in
xrange(self.config['delinker_instances'])]
-
- self.Loggers = threadpool.ThreadPool(Logger)
+ self.CheckUsages = threadpool.ThreadPool(CheckUsage,
self.config['checkusage_instances'], self)
+ self.Delinkers = threadpool.ThreadPool(Delinker,
self.config['delinker_instances'], self)
if self.config.get('enable_logging', True):
- [self.Loggers.add_thread(self) for i in
xrange(self.config['logger_instances'])]
+ self.Loggers = threadpool.ThreadPool(Logger, self.config['logger_instances'],
self)
else:
+ self.Loggers = threadpool.ThreadPool(Logger, 1, self)
self.Loggers.add_thread(self)
self.http = checkusage.HTTP(self.site.hostname())
Modified: trunk/pywikipedia/commonsdelinker/threadpool.py
===================================================================
--- trunk/pywikipedia/commonsdelinker/threadpool.py 2008-04-19 17:40:05 UTC (rev 5238)
+++ trunk/pywikipedia/commonsdelinker/threadpool.py 2008-04-19 19:46:02 UTC (rev 5239)
@@ -18,7 +18,7 @@
may cause thread unsafety!
"""
#
-# (C) Bryan Tong Minh, 2007
+# (C) Bryan Tong Minh, 2007-2008
#
# Distributed under the terms of the MIT license.
#
@@ -29,23 +29,28 @@
class ThreadPool(dict):
pools = []
- def __init__(self, worker):
+ def __init__(self, worker, max_threads, *args, **kwargs):
dict.__init__(self)
-
+
self.jobLock = threading.Lock()
self.jobQueue = []
self.worker = worker
self.threads = []
-
+
+ self.max_threads = max_threads
+ self.args = args
+ self.kwargs = kwargs
+
self.pools.append(self)
-
+
def append(self, job):
self.jobLock.acquire()
+ counter = 0
try:
self.jobQueue.append(job)
+ # The amount of workers needed to be unlocked
unlock_workers = len(self.jobQueue)
-
- counter = 0
+
for event in self.itervalues():
if not event.isSet():
event.set()
@@ -54,16 +59,19 @@
break
finally:
self.jobLock.release()
-
- def add_thread(self, *args, **kwargs):
+ if counter == 0 and len(self.threads) < self.max_threads:
+ self.add_thread()
+ self.start()
+
+ def add_thread(self):
self.jobLock.acquire()
try:
- thread = self.worker(self, *args, **kwargs)
+ thread = self.worker(self, *self.args, **self.kwargs)
self.threads.append(thread)
self[id(thread)] = threading.Event()
finally:
self.jobLock.release()
-
+
def start(self):
for thread in self.threads:
if not thread.isAlive():
@@ -77,13 +85,14 @@
self[id(thread)].set()
finally:
self.jobLock.release()
-
+
class Thread(threading.Thread):
+ timeout = None
def __init__(self, pool):
threading.Thread.__init__(self)
self.pool = pool
self.quit = False
-
+
def run(self):
while True:
# No try..finally: lock.release() here:
@@ -100,30 +109,37 @@
if not self.pool.jobQueue:
# In case no job is available, wait for the pool
- # to call and do not start a busy while loop.
+ # to call and do not start a busy while loop.
event = self.pool[id(self)]
self.pool.jobLock.release()
event.clear()
- event.wait()
+ event.wait(self.timeout)
+ if not event.isSet() and self.timeout != None:
+ if self.starve(): return
continue
job = self.pool.jobQueue.pop(0)
self.pool.jobLock.release()
self.do(job)
-
+
def exit(self):
self.pool.jobLock.acquire()
try:
self.quit = True
self.pool[id(self)].set()
+ del self.pool[id(self)]
+ self.pool.threads.remove(self)
finally:
self.pool.jobLock.release()
-
+
+ def starve(self):
+ pass
+
def catch_signals():
import signal
signal.signal(signal.SIGINT, sig_handler)
signal.signal(signal.SIGTERM, sig_handler)
-
+
def sig_handler(signalnum, stack):
import signal
for pool in ThreadPool.pools:
@@ -138,28 +154,28 @@
# Maybe not a good idea, will also kill child processes
import signal
os.kill(0, signal.SIGTERM)
-
+
if __name__ == '__main__':
import time
# Test cases
-
+
class Worker(Thread):
def do(self, args):
print 'Working', self
time.sleep(10)
print 'Done', self
-
+
pool = ThreadPool(Worker)
print 'Spawning 5 threads'
[pool.add_thread() for i in xrange(5)]
pool.start()
-
+
print 'Doing 25 jobs'
for i in xrange(25):
print 'Job', i
pool.append(i)
time.sleep(i % 6)
-
+
for thread in pool.threads:
thread.exit()