jenkins-bot has submitted this change and it was merged.
Change subject: Asynchronous HTTP requests
......................................................................
Asynchronous HTTP requests
The HttpRequest object has been expanded into a usable HTTP interface providing a 'promise'-style response, allowing both synchronous and asynchronous requests, with error reporting, an unlimited number of callbacks, and access to both the raw and decoded response data.
Added the module-level functions _enqueue and fetch to pywikibot.comms.http as simple utility wrappers around HttpRequest.
Bug: T57889
Change-Id: I4f71b39d6402abefed5841d4ea67a7ac47cc46a2
---
M pywikibot/comms/http.py
M pywikibot/comms/threadedhttp.py
M tests/http_tests.py
3 files changed, 276 insertions(+), 47 deletions(-)
Approvals:
  XZise: Looks good to me, approved
  jenkins-bot: Verified
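For orientation, here is a minimal usage sketch of the two entry points this change adds to pywikibot.comms.http; the URL and the callback name are illustrative only, not part of the change:

    from pywikibot.comms import http

    # Blocking: fetch() waits for the response and runs the default
    # error handling callback in the caller's thread, so exceptions
    # such as Server504Error can be caught with try/except.
    r = http.fetch('http://www.wikipedia.org/')
    print(r.status)   # HTTP status code as an int, e.g. 200
    text = r.content  # body decoded using the detected charset
    data = r.raw      # undecoded body bytes

    # Non-blocking: _enqueue() returns the HttpRequest 'promise'
    # immediately; the callback runs in the HTTP thread once the
    # response arrives.
    def on_done(request):  # illustrative callback
        if request.exception is None:
            print(request.status)

    r = http._enqueue('http://www.wikipedia.org/', callback=on_done)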
diff --git a/pywikibot/comms/http.py b/pywikibot/comms/http.py
index 9e65821..7907165 100644
--- a/pywikibot/comms/http.py
+++ b/pywikibot/comms/http.py
@@ -209,48 +209,55 @@
 @deprecate_arg('ssl', None)
 def request(site=None, uri=None, *args, **kwargs):
-    """Queue a request to be submitted to Site.
+    """
+    Request to Site with default error handling and response decoding.

-    All parameters not listed below are the same as
-    L{httplib2.Http.request}.
+    See L{httplib2.Http.request} for additional parameters.

-    If the site argument is provided, the uri is relative to the site's
-    scriptpath.
+    If the site argument is provided, the uri is a relative uri from
+    and including the document root '/'.

-    If the site argument is None, the uri must be absolute, and is
-    used for requests to non wiki pages.
+    If the site argument is None, the uri must be absolute.

     @param site: The Site to connect to
     @type site: L{pywikibot.site.BaseSite}
     @param uri: the URI to retrieve
     @type uri: str
-    @return: The received data (a unicode string).
-
+    @return: The received data
+    @rtype: unicode
     """
     assert(site or uri)
-    if site:
-        proto = site.protocol()
-        if proto == 'https':
-            host = site.ssl_hostname()
-            uri = site.ssl_pathprefix() + uri
-        else:
-            host = site.hostname()
-        baseuri = urlparse.urljoin("%s://%s" % (proto, host), uri)
+    if not site:
+        # TODO: deprecate this usage, once the library code has been
+        # migrated to using the other request methods.
+        r = fetch(uri, *args, **kwargs)
+        return r.content

-        kwargs.setdefault("disable_ssl_certificate_validation",
-                          site.ignore_certificate_error())
+    proto = site.protocol()
+    if proto == 'https':
+        host = site.ssl_hostname()
+        uri = site.ssl_pathprefix() + uri
     else:
-        baseuri = uri
-        host = urlparse.urlparse(uri).netloc
+        host = site.hostname()
+    baseuri = urlparse.urljoin("%s://%s" % (proto, host), uri)
+
+    kwargs.setdefault("disable_ssl_certificate_validation",
+                      site.ignore_certificate_error())

     format_string = kwargs.setdefault("headers", {}).get("user-agent")
     kwargs["headers"]["user-agent"] = user_agent(site, format_string)

-    request = threadedhttp.HttpRequest(baseuri, *args, **kwargs)
-    http_queue.put(request)
-    while not request.lock.acquire(False):
-        time.sleep(0.1)
+    r = fetch(baseuri, *args, **kwargs)
+    return r.content

+
+def error_handling_callback(request):
+    """
+    Raise exceptions and log alerts.
+
+    @param request: Request that has completed
+    @rtype request: L{threadedhttp.HttpRequest}
+    """
     # TODO: do some error correcting stuff
     if isinstance(request.data, SSLHandshakeError):
         if SSL_CERT_VERIFY_FAILED_MSG in str(request.data):
@@ -260,25 +267,86 @@
     if isinstance(request.data, Exception):
         raise request.data

-    if request.data[0].status == 504:
-        raise Server504Error("Server %s timed out" % host)
+    if request.status == 504:
+        raise Server504Error("Server %s timed out" % request.hostname)

-    if request.data[0].status == 414:
+    if request.status == 414:
         raise Server414Error('Too long GET request')

     # HTTP status 207 is also a success status for Webdav FINDPROP,
     # used by the version module.
-    if request.data[0].status not in (200, 207):
+    if request.status not in (200, 207):
         pywikibot.warning(u"Http response status %(status)s"
                           % {'status': request.data[0].status})

-    pos = request.data[0]['content-type'].find('charset=')
-    if pos >= 0:
-        pos += len('charset=')
-        encoding = request.data[0]['content-type'][pos:]
-    else:
-        encoding = 'ascii'
-        # Don't warn, many pages don't contain one
-        pywikibot.log(u"Http response doesn't contain a charset.")
-
-    return request.data[1].decode(encoding)
+
+def _enqueue(uri, method="GET", body=None, headers=None, **kwargs):
+    """
+    Enqueue non-blocking threaded HTTP request with callback.
+
+    Callbacks, including the default error handler if enabled, are run in the
+    HTTP thread, where exceptions are logged but are not able to be caught.
+    The default error handler is called first, then 'callback' (singular),
+    followed by each callback in 'callbacks' (plural). All callbacks are
+    invoked, even if the default error handler detects a problem, so they
+    must check request.exception before using the response data.
+
+    Note: multiple async requests do not automatically run concurrently,
+    as they are limited by the number of http threads in L{numthreads},
+    which is set to 1 by default.
+
+    @see: L{httplib2.Http.request} for parameters.
+
+    @kwarg default_error_handling: Use default error handling
+    @type default_error_handling: bool
+    @kwarg callback: Method to call once data is fetched
+    @type callback: callable
+    @kwarg callbacks: Methods to call once data is fetched
+    @type callbacks: list of callable
+    @rtype: L{threadedhttp.HttpRequest}
+    """
+    default_error_handling = kwargs.pop('default_error_handling', None)
+    callback = kwargs.pop('callback', None)
+
+    callbacks = []
+    if default_error_handling:
+        callbacks.append(error_handling_callback)
+    if callback:
+        callbacks.append(callback)
+
+    callbacks += kwargs.pop('callbacks', [])
+
+    if not headers:
+        headers = {}
+
+    user_agent_format_string = headers.get("user-agent", None)
+    if not user_agent_format_string or '{' in user_agent_format_string:
+        headers["user-agent"] = user_agent(None, user_agent_format_string)
+
+    request = threadedhttp.HttpRequest(
+        uri, method, body, headers, callbacks, **kwargs)
+    http_queue.put(request)
+    return request
+
+
+def fetch(uri, method="GET", body=None, headers=None,
+          default_error_handling=True, **kwargs):
+    """
+    Blocking HTTP request.
+
+    Note: The callback runs in the HTTP thread, where exceptions are logged
+    but are not able to be caught.
+
+    See L{httplib2.Http.request} for parameters.
+
+    @param default_error_handling: Use default error handling
+    @type default_error_handling: bool
+    @rtype: L{threadedhttp.HttpRequest}
+    """
+    request = _enqueue(uri, method, body, headers, **kwargs)
+    request._join()  # wait for it
+    # Run the error handling callback in the callers thread so exceptions
+    # may be caught.
+    if default_error_handling:
+        error_handling_callback(request)
+    return request
diff --git a/pywikibot/comms/threadedhttp.py b/pywikibot/comms/threadedhttp.py
index 32e55bd..b0369df 100644
--- a/pywikibot/comms/threadedhttp.py
+++ b/pywikibot/comms/threadedhttp.py
@@ -21,19 +21,24 @@
 __docformat__ = 'epytext'

 # standard python libraries
-import sys
 import re
+import sys
 import threading

 if sys.version_info[0] > 2:
     from http import cookiejar as cookielib
-    from urllib.parse import splittype, splithost, unquote
+    from urllib.parse import splittype, splithost, unquote, urlparse
+    unicode = str
 else:
     import cookielib
+    import urlparse
     from urllib import splittype, splithost, unquote

 import pywikibot
+
 from pywikibot import config
+
+from pywikibot.tools import UnicodeMixin

 _logger = "comm.threadedhttp"

@@ -300,7 +305,7 @@
                           response, content)

-class HttpRequest(object):
+class HttpRequest(UnicodeMixin):

     """Object wrapper for HTTP requests that need to block origin thread.

@@ -321,6 +326,7 @@
     <class 'httplib2.ServerNotFoundError'>
     >>> print(request.data)
     Unable to find the server at hostname.invalid
+    >>> queue.put(None)  # Stop the http processor thread

     C{request.lock.acquire()} will block until the data is available.

@@ -329,17 +335,118 @@
     * an exception
     """

-    def __init__(self, *args, **kwargs):
+    def __init__(self, uri, method="GET", body=None, headers=None,
+                 callbacks=None, **kwargs):
         """
         Constructor.

         See C{Http.request} for parameters.
         """
-        self.args = args
+        self.uri = uri
+        self.method = method
+        self.body = body
+        self.headers = headers
+
+        self.callbacks = callbacks
+
+        self.args = [uri, method, body, headers]
         self.kwargs = kwargs
-        self.data = None
+
+        self._parsed_uri = None
+        self._data = None
         self.lock = threading.Semaphore(0)

+    def _join(self):
+        """Block until response has arrived."""
+        self.lock.acquire(True)
+
+    @property
+    def data(self):
+        """Return the httplib2 response tuple."""
+        if not self._data:
+            self._join()
+
+        assert(self._data)
+        return self._data
+
+    @data.setter
+    def data(self, value):
+        """Set the httplib2 response and invoke each callback."""
+        self._data = value
+
+        if self.callbacks:
+            for callback in self.callbacks:
+                callback(self)
+
+    @property
+    def exception(self):
+        """Get the exception raised by httplib2, if any."""
+        if isinstance(self.data, Exception):
+            return self.data
+
+    @property
+    def response_headers(self):
+        """Return the response headers."""
+        if not self.exception:
+            return self.data[0]
+
+    @property
+    def raw(self):
+        """Return the raw response body."""
+        if not self.exception:
+            return self.data[1]
+
+    @property
+    def parsed_uri(self):
+        """Return the parsed requested uri."""
+        if not self._parsed_uri:
+            self._parsed_uri = urlparse(self.uri)
+        return self._parsed_uri
+
+    @property
+    def hostname(self):
+        """Return the host of the request."""
+        return self.parsed_uri.netloc
+
+    @property
+    def status(self):
+        """HTTP response status.
+
+        @rtype: int
+        """
+        return self.response_headers.status
+
+    @property
+    def encoding(self):
+        """Detect the response encoding."""
+        pos = self.response_headers['content-type'].find('charset=')
+        if pos >= 0:
+            pos += len('charset=')
+            encoding = self.response_headers['content-type'][pos:]
+        else:
+            encoding = 'ascii'
+            # Don't warn, many pages don't contain one
+            pywikibot.log(u"Http response doesn't contain a charset.")
+
+        return encoding
+
+    def decode(self, encoding):
+        """Return the decoded response."""
+        return self.raw.decode(encoding)
+
+    @property
+    def content(self):
+        """Return the response decoded by the detected encoding."""
+        return self.decode(self.encoding)
+
+    def __unicode__(self):
+        """Return the response decoded by the detected encoding."""
+        return self.content
+
+    def __bytes__(self):
+        """Return the undecoded response."""
+        return self.raw
+

 class HttpProcessor(threading.Thread):
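As the _enqueue docstring above notes, all callbacks run in the HTTP thread and are invoked even when the default error handler has detected a problem, so each one must check request.exception before touching the response. A sketch of chaining several callbacks under those rules (the callback names and the output filename are invented for the example):

    from pywikibot.comms import http

    def log_status(request):
        if request.exception is None:
            print('status: %d' % request.status)

    def save_body(request):
        if request.exception is None:
            with open('wikipedia.html', 'wb') as f:
                f.write(request.raw)

    r = http._enqueue('http://www.wikipedia.org/',
                      default_error_handling=True,
                      callback=log_status,
                      callbacks=[save_body])
    # The processor thread assigns request.data, which fires the
    # callbacks, before releasing the lock, so once _join() returns
    # both callbacks should have run.
    r._join()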
diff --git a/tests/http_tests.py b/tests/http_tests.py
index 28bb87a..0263d62 100644
--- a/tests/http_tests.py
+++ b/tests/http_tests.py
@@ -8,6 +8,7 @@
 __version__ = '$Id$'

 import sys
+
 import pywikibot
 from pywikibot.comms import http, threadedhttp
 from pywikibot import config2 as config
@@ -15,7 +16,11 @@
 from tests.utils import expectedFailureIf

 if sys.version_info[0] > 2:
+    import queue as Queue
+    unicode = str
+else:
+    import Queue

 class HttpTestCase(TestCase):
@@ -23,6 +28,24 @@
     """Tests for http module."""

     net = True
+
+    def test_async(self):
+        """Test http request_async function."""
+        r = http._enqueue('http://www.wikipedia.org/')
+        self.assertIsInstance(r, threadedhttp.HttpRequest)
+        self.assertEqual(r.status, 200)
+        self.assertIn('<html lang="mul"', r.content)
+        self.assertIsInstance(r.content, unicode)
+        self.assertIsInstance(r.raw, bytes)
+
+    def test_fetch(self):
+        """Test http fetch function."""
+        r = http.fetch('http://www.wikipedia.org/')
+        self.assertIsInstance(r, threadedhttp.HttpRequest)
+        self.assertEqual(r.status, 200)
+        self.assertIn('<html lang="mul"', r.content)
+        self.assertIsInstance(r.content, unicode)
+        self.assertIsInstance(r.raw, bytes)

     def test_http(self):
         """Test http request function."""
@@ -88,7 +111,7 @@

 class ThreadedHttpTestCase(TestCase):

-    """Tests for threadedhttp module."""
+    """Tests for threadedhttp module Http class."""

     net = True

@@ -102,7 +125,7 @@
         self.assertIsInstance(r[0]['status'], str)
         self.assertEqual(r[0]['status'], '200')
-        self.assertIsInstance(r[1], bytes if sys.version_info[0] > 2 else str)
+        self.assertIsInstance(r[1], bytes)
         self.assertIn(b'<html lang="mul"', r[1])
         self.assertEqual(int(r[0]['content-length']), len(r[1]))
@@ -116,7 +139,7 @@
         self.assertIsInstance(r[0]['status'], str)
         self.assertEqual(r[0]['status'], '200')
-        self.assertIsInstance(r[1], bytes if sys.version_info[0] > 2 else str)
+        self.assertIsInstance(r[1], bytes)
         self.assertIn(b'<html lang="mul"', r[1])
         self.assertEqual(int(r[0]['content-length']), len(r[1]))
@@ -136,6 +159,37 @@
         self.assertEqual(r[0]['-content-encoding'], 'gzip')


+class ThreadedHttpRequestTestCase(TestCase):
+
+    """Tests for threadedhttp module threaded HttpRequest."""
+
+    net = True
+
+    def test_threading(self):
+        queue = Queue.Queue()
+        cookiejar = threadedhttp.LockableCookieJar()
+        connection_pool = threadedhttp.ConnectionPool()
+        proc = threadedhttp.HttpProcessor(queue, cookiejar, connection_pool)
+        proc.setDaemon(True)
+        proc.start()
+        r = threadedhttp.HttpRequest('http://www.wikipedia.org/')
+        queue.put(r)
+
+        self.assertNotIsInstance(r.exception, Exception)
+        self.assertIsInstance(r.data, tuple)
+        self.assertIsInstance(r.response_headers, dict)
+        self.assertIn('status', r.response_headers)
+        self.assertIsInstance(r.response_headers['status'], str)
+        self.assertEqual(r.response_headers['status'], '200')
+        self.assertEqual(r.status, 200)
+
+        self.assertIsInstance(r.raw, bytes)
+        self.assertIn(b'<html lang="mul"', r.raw)
+        self.assertEqual(int(r.response_headers['content-length']), len(r.raw))
+
+        queue.put(None)  # Stop the http processor thread
+
+
 class UserAgentTestCase(TestCase):

     """User agent formatting tests using a format string."""