Revision: 4924
Author: russblau
Date: 2008-01-23 14:48:13 +0000 (Wed, 23 Jan 2008)

Log Message:
-----------
Improve error handling, retry waits, and docstring formatting.

Modified Paths:
--------------
    branches/rewrite/pywikibot/data/api.py
    branches/rewrite/pywikibot/data/http.py
    branches/rewrite/pywikibot/data/threadedhttp.py
Modified: branches/rewrite/pywikibot/data/api.py
===================================================================
--- branches/rewrite/pywikibot/data/api.py    2008-01-22 14:57:59 UTC (rev 4923)
+++ branches/rewrite/pywikibot/data/api.py    2008-01-23 14:48:13 UTC (rev 4924)
@@ -9,14 +9,18 @@
 #
 __version__ = '$Id: $'
 
-
 from UserDict import DictMixin
-import urllib
 import http
 import simplejson as json
-import warnings
+import logging
+import re
+import traceback
+import time
+import urllib
 
+lagpattern = re.compile(r"Waiting for [\d.]+: (?P<lag>\d+) seconds? lagged")
+
 class APIError(Exception):
     """The wiki site returned an error message."""
     def __init__(self, code, info, **kwargs):
@@ -30,17 +34,26 @@
         return "%(code)s: %(info)s" % self.__dict__
 
+class TimeoutError(Exception):
+    pass
+
+
 class Request(DictMixin):
     """A request to a Site's api.php interface.
 
-    Attributes of this object get passed as commands to api.php, and can be
-    get or set using the dict interface. All attributes must be strings
-    (unicode). Attributes supplied without values are passed to the API as
-    keys.
+    Attributes of this object (except for the special parameters listed
+    below) get passed as commands to api.php, and can be get or set using
+    the dict interface. All attributes must be strings (unicode).
+    Attributes supplied without values are passed to the API as keys.
 
-    @param site: The Site to which the request will be submitted. If not
-        supplied, uses the user's configured default Site.
+    @param site: The Site to which the request will be submitted. If not
+          supplied, uses the user's configured default Site.
     @param format: (optional) Defaults to "json"
+    @param max_retries: (optional) Maximum number of times to retry after
+          errors, defaults to 25
+    @param retry_wait: (optional) Minimum time to wait after an error,
+          defaults to 5 seconds (doubles each retry until max of 120 is
+          reached)
Example:
@@ -60,13 +73,15 @@
     """
     def __init__(self, *args, **kwargs):
-        if "site" in kwargs:
-            self.site = kwargs["site"]
-            del kwargs["site"]
+        self.site = kwargs.pop("site", None)  # else use defaultSite() ... when written
+        self.max_retries = kwargs.pop("max_retries", 25)
+        self.retry_wait = kwargs.pop("retry_wait", 5)
         self.params = {}
-        if not "format" in kwargs:
+        if "format" not in kwargs:
             self.params["format"] = "json"
+        if "maxlag" not in kwargs:
+            self.params["maxlag"] = "5"
         self.update(*args, **kwargs)
 
     # implement dict interface
@@ -101,7 +116,7 @@
     def submit(self):
         """Submit a query and parse the response.
 
-            @return: The data retrieved from api.php (a dict)
+        @return: The data retrieved from api.php (a dict)
 
         """
         if self.params['format'] != 'json':
@@ -112,52 +127,70 @@
         while True:
             # TODO wait on errors
             # TODO catch http errors
-            if self.params.get("action", "") in ("login",):
-                rawdata = http.request(self.site, uri, method="POST",
+            try:
+                if self.params.get("action", "") in ("login",):
+                    rawdata = http.request(self.site, uri, method="POST",
                                 headers={'Content-Type':
-                                         'application/x-www-form-urlencoded'},
+                                        'application/x-www-form-urlencoded'},
                                 body=params)
-                return rawdata
-            else:
-                uri = uri + "?" + params
-                rawdata = http.request(self.site, uri)
+                else:
+                    uri = uri + "?" + params
+                    rawdata = http.request(self.site, uri)
+            except Exception, e: #TODO: what exceptions can occur here?
+                logging.warning(traceback.format_exc())
+                self.wait()
+                continue
             if rawdata.startswith(u"unknown_action"):
-                e = {'code': data[:14], 'info': data[16:]}
-                raise APIError(e)
+                raise APIError(rawdata[:14], rawdata[16:])
             try:
                 result = json.loads(rawdata)
             except ValueError:
                 # if the result isn't valid JSON, there must be a server
                 # problem. Wait a few seconds and try again
                 # TODO: implement a throttle
-                warnings.warn(
+                logging.warning(
 "Non-JSON response received from server %s; the server may be down."
                                 % self.site)
                 print rawdata
+                self.wait(max_retries, retry_wait)
                 continue
             if not result:
-                return {}
-            if type(result) is dict:
-                if "error" in result:
-                    if "code" in result["error"]:
-                        code = result["error"]["code"]
-                        del result["error"]["code"]
-                    else:
-                        code = "Unknown"
-                    if "info" in result["error"]:
-                        info = result["error"]["info"]
-                        del result["error"]["info"]
-                    else:
-                        info = None
-                    # raise error
-                    raise APIError(code, info, **result["error"])
+                result = {}
+            if type(result) is not dict:
+                raise APIError("Unknown",
+                               "Unable to process query response of type %s."
+                               % type(result),
+                               {'data': result})
+            if "error" not in result:
                 return result
-            raise APIError("Unknown",
-                           "Unable to process query response of type %s."
-                           % type(result),
-                           {'data': result})
+            code = result["error"].pop("code", "Unknown")
+            info = result["error"].pop("info", None)
+            if code == "maxlag":
+                lag = lagpattern.search(info)
+                if lag:
+                    logging.info(
+                        "Pausing due to database lag: " + info)
+                    self.wait(int(lag.group("lag")))
+                    continue
+            # raise error
+            raise APIError(code, info, **result["error"])
+
+    def wait(self, lag=None):
+        """Determine how long to wait after a failed request."""
+        self.max_retries -= 1
+        if self.max_retries < 0:
+            raise TimeoutError("Maximum retries attempted without success.")
+
+        if lag is not None:
+            if lag > 2 * self.retry_wait:
+                self.retry_wait = min(120, lag // 2)
+        logging.warn("Waiting %s seconds before retrying." % self.retry_wait)
+        time.sleep(self.retry_wait)
+        self.retry_wait = min(120, self.retry_wait * 2)
+
+
 if __name__ == "__main__":
     from pywikibot.tests.dummy import TestSite as Site
     mysite = Site("en.wikipedia.org")
-
+    logging.getLogger().setLevel(logging.DEBUG)
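The retry logic added above amounts to a capped exponential backoff: wait() starts at retry_wait (5 seconds by default), doubles the pause after every failure up to 120 seconds, raises the starting point when the server reports a large replication lag, and gives up with TimeoutError once max_retries is exhausted. The following is a minimal self-contained sketch of that behaviour, not the pywikibot code itself; the RetryState class name and the sample lag string are illustrative only.

    import re
    import time

    lagpattern = re.compile(r"Waiting for [\d.]+: (?P<lag>\d+) seconds? lagged")


    class TimeoutError(Exception):
        # mirrors the TimeoutError added in api.py (shadows the Py3 builtin here)
        pass


    class RetryState(object):
        """Illustrative stand-in for the retry bookkeeping kept on Request."""

        def __init__(self, max_retries=25, retry_wait=5):
            self.max_retries = max_retries
            self.retry_wait = retry_wait

        def wait(self, lag=None):
            """Sleep before the next attempt, doubling the wait up to 120 s."""
            self.max_retries -= 1
            if self.max_retries < 0:
                raise TimeoutError("Maximum retries attempted without success.")
            if lag is not None and lag > 2 * self.retry_wait:
                # a large reported lag raises the starting point of the backoff
                self.retry_wait = min(120, lag // 2)
            print("Waiting %s seconds before retrying." % self.retry_wait)
            time.sleep(self.retry_wait)
            self.retry_wait = min(120, self.retry_wait * 2)


    if __name__ == "__main__":
        state = RetryState(max_retries=3, retry_wait=1)
        # a maxlag error from api.php carries the lag in its info string
        info = "Waiting for 10.0.0.1: 7 seconds lagged"
        match = lagpattern.search(info)
        if match:
            state.wait(int(match.group("lag")))   # sleeps 3 s (7 // 2)
        state.wait()                              # sleeps 6 s; next call would sleep 12 s

Capping both the lag-derived wait and the doubling at 120 seconds keeps a badly lagged or flaky server from stalling the bot indefinitely between attempts, while the maxlag=5 default sent with every request lets the server shed bot load early.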
Modified: branches/rewrite/pywikibot/data/http.py
===================================================================
--- branches/rewrite/pywikibot/data/http.py    2008-01-22 14:57:59 UTC (rev 4923)
+++ branches/rewrite/pywikibot/data/http.py    2008-01-23 14:48:13 UTC (rev 4924)
@@ -5,11 +5,12 @@
 This module handles communication between the bot and the HTTP threads.
 
 This module is responsible for
-    * Setting up a connection pool
-    * Providing a (blocking) interface for HTTP requests
-    * Translate site objects with query strings into urls
-    * Urlencoding all data
-    * Basic HTTP error handling
+
+- Setting up a connection pool
+- Providing a (blocking) interface for HTTP requests
+- Translate site objects with query strings into urls
+- Urlencoding all data
+- Basic HTTP error handling
 """
 
 #
@@ -68,7 +69,14 @@
     request = threadedhttp.HttpRequest(uri, *args, **kwargs)
     http_queue.put(request)
    request.lock.acquire()
-
+
     #do some error correcting stuff
-
+
+    #if all else fails
+    if isinstance(request.data, Exception):
+        raise request.data
+
+    if request.data[0].status != 200:
+        logging.warning("Http response status %s" % request.data[0].status)
+
     return request.data[1]
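The new error handling in request() relies on a contract with the HTTP worker threads: request.data ends up holding either a (response, content) tuple or an exception instance, and the caller re-raises the latter in its own thread. Below is a rough sketch of that calling pattern under those assumptions, in Python 3 spelling (the original targets Python 2); the queue, fake_worker, blocking_request, and Response names are illustrative and not part of the pywikibot modules.

    import queue                # the 2008 code uses the Python 2 "Queue" module
    import threading
    from collections import namedtuple

    Response = namedtuple("Response", "status")   # stand-in for an httplib2 response


    class HttpRequest:
        """Minimal stand-in for threadedhttp.HttpRequest."""
        def __init__(self, uri):
            self.uri = uri
            self.data = None
            self.lock = threading.Semaphore(0)    # acquire() blocks until release()


    def fake_worker(http_queue):
        """Stand-in for an HttpProcessor thread: resolve one request, then signal."""
        request = http_queue.get()
        try:
            # a real worker would perform the HTTP request here
            request.data = (Response(status=200), b"<html>...</html>")
        except Exception as exc:
            # errors are handed back as data, not raised across threads
            request.data = exc
        finally:
            request.lock.release()


    def blocking_request(http_queue, uri):
        """Mirror of the blocking pattern used by data/http.py's request()."""
        request = HttpRequest(uri)
        http_queue.put(request)
        request.lock.acquire()                    # block until the worker is done
        if isinstance(request.data, Exception):
            raise request.data                    # re-raise in the calling thread
        if request.data[0].status != 200:
            print("Http response status %s" % request.data[0].status)
        return request.data[1]


    http_queue = queue.Queue()
    threading.Thread(target=fake_worker, args=(http_queue,), daemon=True).start()
    print(blocking_request(http_queue, "http://example.org"))

Returning the exception object instead of raising it in the worker keeps the worker thread alive for the next queued request while still surfacing the failure to the code that asked for the page.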
Modified: branches/rewrite/pywikibot/data/threadedhttp.py
===================================================================
--- branches/rewrite/pywikibot/data/threadedhttp.py    2008-01-22 14:57:59 UTC (rev 4923)
+++ branches/rewrite/pywikibot/data/threadedhttp.py    2008-01-23 14:48:13 UTC (rev 4924)
@@ -1,10 +1,12 @@
-# -*- coding: utf-8 -*-
+# -*- coding: utf-8 -*-
 """
 Httplib2 threaded cookie layer
-    This class extends Httplib2, adding support for:
-        * Cookies, guarded for cross-site redirects
-        * Thread safe ConnectionPool and LockableCookieJar classes
-        * HttpProcessor thread class
-        * HttpRequest object
+
+This class extends Httplib2, adding support for:
+- Cookies, guarded for cross-site redirects
+- Thread safe ConnectionPool and LockableCookieJar classes
+- HttpProcessor thread class
+- HttpRequest object
+
 """
 
 # (C) 2007 Pywikipedia bot team, 2007
@@ -34,17 +36,19 @@
 
 class ConnectionPool(object):
-    """ A thread-safe connection pool """
+    """A thread-safe connection pool.
+
+    @param maxnum: Maximum number of connections per identifier.
+        The pool drops excessive connections added.
+
+    """
     def __init__(self, maxnum=5):
-        """ @param maxnum: Maximum number of connections per identifier.
-              The pool drops excessive connections added.
-        """
         self.connections = {}
         self.lock = threading.Lock()
         self.maxnum = maxnum
 
     def __del__(self):
-        """ Destructor to close all connections in the pool """
+        """Destructor to close all connections in the pool."""
         self.lock.acquire()
         try:
             for key in self.connections:
@@ -57,9 +61,11 @@
         return self.connections.__repr__()
 
     def pop_connection(self, identifier):
-        """ Gets a connection from identifiers connection pool
-            @param identifier The pool identifier
-            @returns A connection object if found, None otherwise
+        """Get a connection from identifier's connection pool.
+
+        @param identifier: The pool identifier
+        @return: A connection object if found, None otherwise
+
         """
         self.lock.acquire()
         try:
@@ -71,9 +77,11 @@
             self.lock.release()
 
     def push_connection(self, identifier, connection):
-        """ Adds a connection to identifiers connection pool
-            @param identifier The pool identifier
-            @param connection The connection to add to the pool
+        """Add a connection to identifier's connection pool.
+
+        @param identifier: The pool identifier
+        @param connection: The connection to add to the pool
+
        """
         self.lock.acquire()
         try:
@@ -96,31 +104,42 @@
         self.lock = threading.Lock()
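The pop_connection()/push_connection() pair documented above gives each identifier (scheme plus authority) its own small stack of reusable connections, guarded by one lock. The following is a simplified model of that contract, not the pywikibot class itself; the surplus-dropping behaviour follows the maxnum description in the docstring, and the conn_key string format is taken from Http.request() below.

    import threading


    class ConnectionPool:
        """Simplified model of the thread-safe pool documented above."""

        def __init__(self, maxnum=5):
            self.connections = {}          # identifier -> list of connections
            self.lock = threading.Lock()
            self.maxnum = maxnum

        def pop_connection(self, identifier):
            """Return a pooled connection for identifier, or None."""
            with self.lock:
                conns = self.connections.get(identifier)
                if conns:
                    return conns.pop()
                return None

        def push_connection(self, identifier, connection):
            """Return a connection to the pool; drop it if the pool is full."""
            with self.lock:
                conns = self.connections.setdefault(identifier, [])
                if len(conns) < self.maxnum:
                    conns.append(connection)
                # else: the excess connection is simply discarded


    pool = ConnectionPool(maxnum=1)
    conn_key = "http:en.wikipedia.org"       # scheme + ":" + authority, as in request()
    assert pool.pop_connection(conn_key) is None   # empty pool
    pool.push_connection(conn_key, object())       # kept
    pool.push_connection(conn_key, object())       # dropped: pool already holds maxnum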
class Http(httplib2.Http): - """ Subclass of httplib2.Http that uses a `LockableCookieJar` to store cookies. - Overrides httplib2s internal redirect support to prevent cookies - being eaten by the wrong sites. + """Subclass of httplib2.Http that stores cookies. + + Overrides httplib2's internal redirect support to prevent cookies being + eaten by the wrong sites. + + @param cookiejar: (optional) CookieJar to use. A new one will be used + when not supplied. + @param connection_pool: (optional) Connection pool to use. A new one + will be used when not supplied. + @param max_redirects: (optional) The maximum number of redirects to + follow. 5 is default. + """ def __init__(self, *args, **kwargs): - """ @param cookiejar: (optional) CookieJar to use. A new one will be used when not supplied. - @param connection_pool: (optional) Connection pool to use. A new one will be used when not supplied. - @param max_redirects: (optional) The maximum number of redirects to follow. 5 is default. - """ self.cookiejar = kwargs.pop('cookiejar', LockableCookieJar()) self.connection_pool = kwargs.pop('connection_pool', ConnectionPool()) self.max_redirects = kwargs.pop('max_redirects', 5) httplib2.Http.__init__(self, *args, **kwargs)
- def request(self, uri, method="GET", body=None, headers=None, max_redirects=None, connection_type=None): - """ Starts an HTTP request. - @param uri: The uri to retrieve - @param method: (optional) The HTTP method to use. Default is 'GET' - @param body: (optional) The request body. Default is no body. - @param headers: (optional) Additional headers to send. Defaults include - C{connection: keep-alive}, C{user-agent} and C{content-type}. - @param max_redirects: (optional) The maximum number of redirects to use for this request. - The class instances max_redirects is default - @param connection_type: (optional) ? - @returns: (response, content) tuple + def request(self, uri, method="GET", body=None, headers=None, + max_redirects=None, connection_type=None): + """Start an HTTP request. + + @param uri: The uri to retrieve + @param method: (optional) The HTTP method to use. Default is 'GET' + @param body: (optional) The request body. Default is no body. + @param headers: (optional) Additional headers to send. Defaults + include C{connection: keep-alive}, C{user-agent} and + C{content-type}. + @param max_redirects: (optional) The maximum number of redirects to + use for this request. The class instance's max_redirects is + default + @param connection_type: (optional) see L{httplib2.Http.request} + + @return: (response, content) tuple + """ if max_redirects is None: max_redirects = self.max_redirects @@ -136,11 +155,13 @@ self.cookiejar.lock.release() headers = req.headers
- # Wikimedia squids: add connection: keep-alive to request headers unless overridden + # Wikimedia squids: add connection: keep-alive to request headers + # unless overridden headers['connection'] = headers.pop('connection', 'keep-alive')
# determine connection pool key and fetch connection - (scheme, authority, request_uri, defrag_uri) = httplib2.urlnorm(httplib2.iri2uri(uri)) + (scheme, authority, request_uri, defrag_uri) = httplib2.urlnorm( + httplib2.iri2uri(uri)) conn_key = scheme+":"+authority
connection = self.connection_pool.pop_connection(conn_key) @@ -150,12 +171,20 @@ # Redirect hack: we want to regulate redirects follow_redirects = self.follow_redirects self.follow_redirects = False - logging.debug('%r' % ((uri, method, headers, max_redirects, connection_type),)) - (response, content) = httplib2.Http.request(self, uri, method, body, headers, max_redirects, connection_type) + logging.debug('%r' % ( + (uri, method, body, headers, max_redirects, connection_type),)) + try: + (response, content) = httplib2.Http.request( + self, uri, method, body, headers, + max_redirects, connection_type) + except Exception, e: # what types? + # return exception instance to be retrieved by the calling thread + return e self.follow_redirects = follow_redirects
# return connection to pool - self.connection_pool.push_connection(conn_key, self.connections[conn_key]) + self.connection_pool.push_connection(conn_key, + self.connections[conn_key]) del self.connections[conn_key]
# First write cookies @@ -167,15 +196,20 @@
# Check for possible redirects redirectable_response = ((response.status == 303) or - (response.status in [300, 301, 302, 307] and method in ["GET", "HEAD"])) - if self.follow_redirects and (max_redirects > 0) and redirectable_response: - (response, content) = self._follow_redirect(uri, method, body, headers, response, content, max_redirects) + (response.status in [300, 301, 302, 307] and + method in ["GET", "HEAD"])) + if self.follow_redirects and (max_redirects > 0) \ + and redirectable_response: + (response, content) = self._follow_redirect( + uri, method, body, headers, response, content, max_redirects)
return (response, content)
- def _follow_redirect(self, uri, method, body, headers, response, content, max_redirects): - """ Internal function to follow a redirect recieved by L{request} """ - (scheme, authority, absolute_uri, defrag_uri) = httplib2.urlnorm(httplib2.iri2uri(uri)) + def _follow_redirect(self, uri, method, body, headers, response, + content, max_redirects): + """Internal function to follow a redirect recieved by L{request}""" + (scheme, authority, absolute_uri, defrag_uri) = httplib2.urlnorm( + httplib2.iri2uri(uri)) if self.cache: cachekey = defrag_uri else: @@ -184,39 +218,54 @@ # Pick out the location header and basically start from the beginning # remembering first to strip the ETag header and decrement our 'depth' if not response.has_key('location') and response.status != 300: - raise httplib2.RedirectMissingLocation("Redirected but the response is missing a Location: header.", response, content) + raise httplib2.RedirectMissingLocation( + "Redirected but the response is missing a Location: header.", + response, content) # Fix-up relative redirects (which violate an RFC 2616 MUST) if response.has_key('location'): location = response['location'] - (scheme, authority, path, query, fragment) = httplib2.parse_uri(location) + (scheme, authority, path, query, fragment) = httplib2.parse_uri( + location) if authority == None: response['location'] = httplib2.urlparse.urljoin(uri, location) - logging.debug('Relative redirect: changed [%s] to [%s]' % (location, response['location'])) + logging.debug('Relative redirect: changed [%s] to [%s]' + % (location, response['location'])) if response.status == 301 and method in ["GET", "HEAD"]: response['-x-permanent-redirect-url'] = response['location'] if not response.has_key('content-location'): response['content-location'] = absolute_uri - httplib2._updateCache(headers, response, content, self.cache, cachekey) + httplib2._updateCache(headers, response, content, self.cache, + cachekey)
headers.pop('if-none-match', None) headers.pop('if-modified-since', None)
if response.has_key('location'): location = response['location'] - redirect_method = ((response.status == 303) and (method not in ["GET", "HEAD"])) and "GET" or method - return self.request(location, redirect_method, body=body, headers = headers, max_redirects = max_redirects - 1) + redirect_method = ((response.status == 303) and + (method not in ["GET", "HEAD"]) + ) and "GET" or method + return self.request(location, redirect_method, body=body, + headers=headers, + max_redirects=max_redirects - 1) else: - raise RedirectLimit("Redirected more times than redirection_limit allows.", response, content) + raise RedirectLimit( + "Redirected more times than redirection_limit allows.", + response, content)
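The `... and "GET" or method` expression used when re-issuing a redirected request is the pre-2.5 spelling of a conditional: a 303 response to anything other than GET or HEAD is retried as GET, and every other redirect keeps the original method. Spelled out as a plain function (the function name is illustrative only):

    def choose_redirect_method(status, method):
        """Return the method to use when following a redirect.

        A 303 (See Other) response to a non-GET/HEAD request is retried as
        GET; every other redirect keeps the original method.  This mirrors
        the ``... and "GET" or method`` expression in _follow_redirect().
        """
        if status == 303 and method not in ["GET", "HEAD"]:
            return "GET"
        return method


    assert choose_redirect_method(303, "POST") == "GET"
    assert choose_redirect_method(301, "POST") == "POST"
    assert choose_redirect_method(303, "GET") == "GET"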
+
 class HttpRequest(object):
-    """ Object wrapper for HTTP requests that need to block the requesters thread.
-        Usage:
-        >>> request = HttpRequest('http://www.google.com')
-        >>> queue.put(request)
-        >>> request.lock.acquire()
-        >>> print request.data
-
-        C{request.lock.acquire()} will block until the data is available.
+    """Object wrapper for HTTP requests that need to block origin thread.
+
+    Usage:
+
+    >>> request = HttpRequest('http://www.google.com')
+    >>> queue.put(request)
+    >>> request.lock.acquire()
+    >>> print request.data
+
+    C{request.lock.acquire()} will block until the data is available.
+
+    """
     def __init__(self, *args, **kwargs):
         self.args = args
@@ -224,13 +273,19 @@
         self.data = None
         self.lock = threading.Semaphore(0)
+
 class HttpProcessor(threading.Thread):
-    """ Thread object to spawn multiple HTTP connection threads """
+    """ Thread object to spawn multiple HTTP connection threads.
+
+    @param queue: The C{Queue.Queue} object that contains L{HttpRequest}
+        objects.
+    @param cookiejar: The C{LockableCookieJar} cookie object to share among
+        requests.
+    @param connection_pool: The C{ConnectionPool} object which contains
+        connections to share among requests.
+
+    """
     def __init__(self, queue, cookiejar, connection_pool):
-        """ @param queue: The C{Queue.Queue} object that contains L{HttpRequest} objects.
-            @param cookiejar: The C{LockableCookieJar} cookie object to share among requests.
-            @param connection_pool: The C{ConnectionPool} object which contains connections to share among requests.
-        """
         threading.Thread.__init__(self)
         self.queue = queue
         self.http = Http(cookiejar=cookiejar, connection_pool=connection_pool)
@@ -352,4 +407,4 @@
         # as part of the expires= date format. so we have
         # to split carefully here - header.split(',') won't do it.
         HEADERVAL= re.compile(r'\s*(([^,]|(,\s*\d))+)')
-        return [h[0] for h in HEADERVAL.findall(self.response[k])]
\ No newline at end of file
+        return [h[0] for h in HEADERVAL.findall(self.response[k])]
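HttpProcessor's run() loop is not part of this diff, but the classes above imply its shape: block on the shared queue, perform the request, store the result (or the exception) on the HttpRequest, and release its semaphore so the waiting caller can continue. The sketch below is a plausible minimal version under those assumptions, in Python 3 spelling; the HttpWorker class, the fetch callable, and the None shutdown sentinel are illustrative stand-ins, not taken from the module.

    import queue                     # Python 2 spells this "Queue"
    import threading


    class HttpRequest:
        """Reduced version of the wrapper documented above."""
        def __init__(self, *args, **kwargs):
            self.args = args
            self.kwargs = kwargs
            self.data = None
            self.lock = threading.Semaphore(0)


    class HttpWorker(threading.Thread):
        """Sketch of an HttpProcessor-style thread consuming requests from a queue."""

        def __init__(self, work_queue, fetch):
            threading.Thread.__init__(self)
            self.queue = work_queue
            self.fetch = fetch               # placeholder for Http.request()

        def run(self):
            while True:
                request = self.queue.get()
                if request is None:          # None acts as a shutdown sentinel
                    break
                try:
                    request.data = self.fetch(*request.args, **request.kwargs)
                except Exception as exc:
                    request.data = exc       # hand errors back to the caller
                finally:
                    request.lock.release()   # wake the thread blocked in acquire()


    work_queue = queue.Queue()
    worker = HttpWorker(work_queue, fetch=lambda uri: ("200 OK", "body of " + uri))
    worker.start()

    req = HttpRequest("http://example.org")
    work_queue.put(req)
    req.lock.acquire()                       # blocks until the worker releases
    print(req.data)                          # ('200 OK', 'body of http://example.org')

    work_queue.put(None)                     # let the worker exit cleanly
    worker.join()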