jenkins-bot has submitted this change. (
https://gerrit.wikimedia.org/r/c/pywikibot/core/+/651946 )
Change subject: [IMPR] http.fetch() return requests.Response()
......................................................................
[IMPR] http.fetch() return requests.Response()
http.fetch() return requests.Response() instead of
threadedhttp.HttpRequest().
Done:
- move encoding detection in http.
- return response instead of request
- deprecate threadedhttp.HttpRequest() with FutureWarning
- introduce a temporary class ONLY for deprecation purposes.
Bug: T265206
Change-Id: Ia46683bf39f75790baeb6eed8e49e5fcab2de501
---
M pywikibot/comms/http.py
M pywikibot/comms/threadedhttp.py
M pywikibot/page/__init__.py
M pywikibot/site_detect.py
M pywikibot/specialbots/_upload.py
M scripts/download_dump.py
M scripts/imagecopy.py
M scripts/reflinks.py
M tests/aspects.py
M tests/http_tests.py
M tests/site_detect_tests.py
11 files changed, 265 insertions(+), 154 deletions(-)
Approvals:
Xqt: Looks good to me, approved
jenkins-bot: Verified
diff --git a/pywikibot/comms/http.py b/pywikibot/comms/http.py
index 9716b0a..1c7f6e2 100644
--- a/pywikibot/comms/http.py
+++ b/pywikibot/comms/http.py
@@ -16,6 +16,8 @@
# Distributed under the terms of the MIT license.
#
import atexit
+import codecs
+import re
import sys
from contextlib import suppress
@@ -30,7 +32,6 @@
import pywikibot
from pywikibot.backports import Tuple
-from pywikibot.comms import threadedhttp
from pywikibot import config2 as config
from pywikibot.exceptions import (
FatalServerError, Server504Error, Server414Error
@@ -252,7 +253,7 @@
baseuri = site.base_url(uri)
r = fetch(baseuri, headers=headers, **kwargs)
- site.throttle.retry_after = int(r.response_headers.get('retry-after', 0))
+ site.throttle.retry_after = int(r.headers.get('retry-after', 0))
return r.text
@@ -313,15 +314,24 @@
if response.status_code not in (200, 207):
warning('Http response status {}'.format(response.status_code))
+ if isinstance(response.encoding, UnicodeDecodeError):
+ error('An error occurred for uri {}: '
+ 'no encoding detected!'.format(response.request.url))
+ raise response.encoding from None
+
@deprecated_args(callback=True, body='data')
-def fetch(uri, method='GET', headers=None, default_error_handling: bool = True,
+def fetch(uri: str, method: str = 'GET', headers: Optional[dict] = None,
+ default_error_handling: bool = True,
use_fake_user_agent: Union[bool, str] = False, **kwargs):
"""
HTTP request.
See L{requests.Session.request} for parameters.
+ @param uri: URL to send
+ @param method: HTTP method of the request (default: GET)
+ @param headers: dictionary of headers of the request
@param default_error_handling: Use default error handling
@param use_fake_user_agent: Set to True to use fake UA, False to use
pywikibot's UA, str to specify own UA. This behaviour might be
@@ -335,7 +345,7 @@
@type verify: bool or path to certificates
@kwarg callbacks: Methods to call once data is fetched
@type callbacks: list of callable
- @rtype: L{threadedhttp.HttpRequest}
+ @rtype: L{requests.Response}
"""
# Change user agent depending on fake UA settings.
# Set header to new UA if needed.
@@ -373,11 +383,11 @@
headers['user-agent'] = assign_user_agent(headers.get('user-agent'))
callbacks = kwargs.pop('callbacks', [])
+ # error_handling_callback will be executed first.
if default_error_handling:
- callbacks.append(error_handling_callback)
+ callbacks.insert(0, error_handling_callback)
charset = kwargs.pop('charset', None)
- request = threadedhttp.HttpRequest(charset=charset)
auth = get_authentication(uri)
if auth is not None and len(auth) == 4:
@@ -406,18 +416,107 @@
headers=headers, auth=auth, timeout=timeout,
**kwargs)
except Exception as e:
- request.data = e
response = e
else:
- request.data = response
+ response.encoding = _decide_encoding(response, charset)
for callback in callbacks:
callback(response)
- # if there's no data in the answer we're in trouble
- try:
- request.data
- except AssertionError as e:
- raise e
+ return _ResponseDeprecationWrapper(response)
- return request
+
+class _ResponseDeprecationWrapper(requests.Response):
+
+ """Helper class for the deprecation of HttpRequests.
+
+ This class will be removed ASAP. Its only purpose is to allow
+ a graceful deprecation of HttpRequests.
+ DO NOT USE!
+
+ """
+
+ def __init__(self, response):
+ self.__response = response
+
+ def __getattr__(self, attr):
+ return getattr(self.__response, attr)
+
+ def __setattr__(self, attr, val):
+ if attr == '_ResponseDeprecationWrapper__response':
+ object.__setattr__(self, attr, val)
+
+ return setattr(self.__response, attr, val)
+
+ @property
+ @deprecated('attribute/methods of Response(), '
+ 'which is now returned from http.fetch()',
+ since='20210110', future_warning=True)
+ def data(self):
+ return self
+
+
+def _get_encoding_from_response_headers(response):
+ """Return charset given by the response header."""
+ content_type = response.headers.get('content-type')
+
+ if not content_type:
+ return None
+
+ m = re.search('charset=(?P<charset>.*?$)', content_type)
+ if m:
+ header_encoding = m.group('charset')
+ elif 'json' in content_type:
+ # application/json | application/sparql-results+json
+ header_encoding = 'utf-8'
+ elif 'xml' in content_type:
+ header = response.content[:100].splitlines()[0] # bytes
+ m = re.search(
+ br'encoding=(["\'])(?P<encoding>.+?)\1', header)
+ if m:
+ header_encoding = m.group('encoding').decode('utf-8')
+ else:
+ header_encoding = 'utf-8'
+ else:
+ header_encoding = None
+
+ return header_encoding
+
+
+def _decide_encoding(response, charset):
+ """Detect the response encoding."""
+ def _try_decode(content, encoding):
+ """Helper function to try decoding."""
+ content.decode(encoding)
+ return encoding
+
+ header_encoding = _get_encoding_from_response_headers(response)
+ if header_encoding is None:
+ pywikibot.log('Http response does not contain a charset.')
+
+ if charset is None:
+ charset = response.request.headers.get('accept-charset')
+
+ # No charset requested, or in request headers or response headers.
+ # Defaults to latin1.
+ if charset is None and header_encoding is None:
+ return _try_decode(response.content, 'latin1')
+
+ if charset is None and header_encoding is not None:
+ return _try_decode(response.content, header_encoding)
+
+ if charset is not None and header_encoding is None:
+ return _try_decode(response.content, charset)
+
+ # Both charset and header_encoding are available.
+ if codecs.lookup(header_encoding) != codecs.lookup(charset):
+ pywikibot.warning(
+            'Encoding "{}" requested but "{}" received in the '
+            'response header.'.format(charset, header_encoding))
+
+ try:
+ _encoding = _try_decode(response.content, header_encoding)
+ except UnicodeDecodeError:
+ _encoding = _try_decode(response.content, charset)
+
+ return _encoding
diff --git a/pywikibot/comms/threadedhttp.py b/pywikibot/comms/threadedhttp.py
index 0f0d6e0..d5c26f5 100644
--- a/pywikibot/comms/threadedhttp.py
+++ b/pywikibot/comms/threadedhttp.py
@@ -17,6 +17,7 @@
deprecated,
deprecated_args,
issue_deprecation_warning,
+ ModuleDeprecationWrapper,
)
@@ -294,3 +295,8 @@
def __bytes__(self) -> Optional[bytes]: # pragma: no cover
"""Return the undecoded response."""
return self.content
+
+
+wrapper = ModuleDeprecationWrapper(__name__)
+wrapper._add_deprecated_attr('HttpRequest', replacement_name='',
+ since='20201226', future_warning=True)
diff --git a/pywikibot/page/__init__.py b/pywikibot/page/__init__.py
index 3dc9015..0b276bd 100644
--- a/pywikibot/page/__init__.py
+++ b/pywikibot/page/__init__.py
@@ -2528,7 +2528,7 @@
if req.status_code == 200:
try:
with open(filename, 'wb') as f:
- for chunk in req.data.iter_content(chunk_size):
+ for chunk in req.iter_content(chunk_size):
f.write(chunk)
except IOError as e:
raise e
diff --git a/pywikibot/site_detect.py b/pywikibot/site_detect.py
index 8b46594..167468d 100644
--- a/pywikibot/site_detect.py
+++ b/pywikibot/site_detect.py
@@ -43,9 +43,9 @@
r = fetch(fromurl)
check_response(r)
- if fromurl != r.data.url:
- pywikibot.log('{} redirected to {}'.format(fromurl, r.data.url))
- fromurl = r.data.url
+ if fromurl != r.url:
+ pywikibot.log('{} redirected to {}'.format(fromurl, r.url))
+ fromurl = r.url
self.fromurl = fromurl
diff --git a/pywikibot/specialbots/_upload.py b/pywikibot/specialbots/_upload.py
index 5161adb..cf96e67 100644
--- a/pywikibot/specialbots/_upload.py
+++ b/pywikibot/specialbots/_upload.py
@@ -132,8 +132,8 @@
with open(path, 'ab') as fd:
os.lseek(handle, file_len, 0)
try:
- r = http.fetch(file_url, stream=True, headers=headers)
- response = r.data
+ response = http.fetch(file_url, stream=True,
+ headers=headers)
response.raise_for_status()
# get download info, if available
diff --git a/scripts/download_dump.py b/scripts/download_dump.py
index b0d1e03..74e9a8d 100644
--- a/scripts/download_dump.py
+++ b/scripts/download_dump.py
@@ -118,8 +118,7 @@
return
with open(file_current_storepath, 'wb') as result_file:
- total = int(response.response_headers.get(
- 'content-length', -1))
+ total = int(response.headers['content-length'])
if total == -1:
pywikibot.warning("'content-length' missing in
"
'response headers')
@@ -128,7 +127,7 @@
display_string = ''
pywikibot.output('')
- for data in response.data.iter_content(100 * 1024):
+ for data in response.iter_content(100 * 1024):
result_file.write(data)
if total <= 0:
diff --git a/scripts/imagecopy.py b/scripts/imagecopy.py
index ae631b4..f949196 100644
--- a/scripts/imagecopy.py
+++ b/scripts/imagecopy.py
@@ -215,15 +215,14 @@
@return: A CommonHelper description message.
@rtype: str
"""
- gotInfo = False
- while not gotInfo:
+ while True:
try:
commonsHelperPage = fetch(
'https://commonshelper.toolforge.org/',
method='POST',
data=parameters)
- data = commonsHelperPage.data.content.decode('utf-8')
- gotInfo = True
+ data = commonsHelperPage.content.decode('utf-8')
+ break
except RequestException:
pywikibot.output("Got a RequestException, let's try again")
return data
diff --git a/scripts/reflinks.py b/scripts/reflinks.py
index 9ac0292..78ded1a 100755
--- a/scripts/reflinks.py
+++ b/scripts/reflinks.py
@@ -537,7 +537,7 @@
ref.url, use_fake_user_agent=self._use_fake_user_agent)
# Try to get Content-Type from server
- content_type = f.response_headers.get('content-type')
+ content_type = f.headers.get('content-type')
if content_type and not self.MIME.search(content_type):
if ref.link.lower().endswith('.pdf') \
and not self.opt.ignorepdf:
@@ -564,7 +564,7 @@
return
# Get the real url where we end (http redirects !)
- redir = f.data.url
+ redir = f.url
if redir != ref.link \
and domain.findall(redir) == domain.findall(link):
if soft404.search(redir) \
@@ -656,7 +656,7 @@
new_text = new_text.replace(match.group(), repl)
return
- u = f.data.text
+ u = f.text
# Retrieves the first non empty string inside <title> tags
for m in self.TITLE.finditer(u):
diff --git a/tests/aspects.py b/tests/aspects.py
index f7e3fec..ff35577 100644
--- a/tests/aspects.py
+++ b/tests/aspects.py
@@ -457,17 +457,12 @@
r = http.fetch(hostname,
method='HEAD',
default_error_handling=False)
- if r.exception:
- e = r.exception
- else:
- if r.status_code not in {200, 301, 302, 303, 307, 308}:
- raise ServerError('HTTP status: {}'
- .format(r.status_code))
- except Exception as e2:
+ if r.status_code not in {200, 301, 302, 303, 307, 308}:
+ raise ServerError('HTTP status: {}'.format(r.status_code))
+ except Exception as e:
pywikibot.error('{}: accessing {} caused exception:'
.format(cls.__name__, hostname))
- pywikibot.exception(e2, tb=True)
- e = e2
+ pywikibot.exception(e, tb=True)
if e:
cls._checked_hostnames[hostname] = e
diff --git a/tests/http_tests.py b/tests/http_tests.py
index ec85e77..d9c0b53 100644
--- a/tests/http_tests.py
+++ b/tests/http_tests.py
@@ -16,7 +16,8 @@
import pywikibot
from pywikibot import config2 as config
-from pywikibot.comms import http, threadedhttp
+
+from pywikibot.comms import http
from pywikibot.tools import PYTHON_VERSION, suppress_warnings
from tests import join_images_path, patch
@@ -41,7 +42,7 @@
def test_fetch(self):
"""Test http.fetch using
http://www.wikipedia.org/."""
r = http.fetch('http://www.wikipedia.org/')
- self.assertIsInstance(r, threadedhttp.HttpRequest)
+ self.assertIsInstance(r, requests.Response)
self.assertEqual(r.status_code, 200)
self.assertIn('<html lang="mul"', r.text)
self.assertIsInstance(r.text, str)
@@ -106,9 +107,8 @@
response = http.fetch(
'https://testssl-expire-r2i2.disig.sk/index.en.html',
verify=False)
- r = response.text
- self.assertIsInstance(r, str)
- self.assertTrue(re.search(r'<title>.*</title>', r))
+ self.assertIsInstance(response.text, str)
+ self.assertTrue(re.search(r'<title>.*</title>',
response.text))
http.session.close() # clear the connection
# Verify that it now fails again
@@ -166,14 +166,12 @@
# The following will redirect from ' ' -> '_', and maybe to
https://
r = http.fetch('http://en.wikipedia.org/wiki/Main%20Page')
self.assertEqual(r.status_code, 200)
- self.assertIsNotNone(r.data.history)
-
- self.assertIn('//en.wikipedia.org/wiki/Main_Page',
- r.data.url)
+ self.assertIsNotNone(r.history)
+
+ self.assertIn('//en.wikipedia.org/wiki/Main_Page', r.url)
r = http.fetch('http://en.wikia.com')
self.assertEqual(r.status_code, 200)
- self.assertEqual(r.data.url,
- 'https://www.fandom.com/explore')
+ self.assertEqual(r.url, 'https://www.fandom.com/explore')
class UserAgentTestCase(TestCase):
@@ -282,19 +280,19 @@
r = http.fetch(
self.get_httpbin_url('/status/200'),
headers={'user-agent': 'EXISTING'})
- self.assertEqual(r.headers['user-agent'], 'EXISTING')
+ self.assertEqual(r.request.headers['user-agent'], 'EXISTING')
# Argument value changes
r = http.fetch(self.get_httpbin_url('/status/200'),
use_fake_user_agent=True)
- self.assertNotEqual(r.headers['user-agent'], http.user_agent())
+ self.assertNotEqual(r.request.headers['user-agent'], http.user_agent())
r = http.fetch(self.get_httpbin_url('/status/200'),
use_fake_user_agent=False)
- self.assertEqual(r.headers['user-agent'], http.user_agent())
+ self.assertEqual(r.request.headers['user-agent'], http.user_agent())
r = http.fetch(
self.get_httpbin_url('/status/200'),
use_fake_user_agent='ARBITRARY')
- self.assertEqual(r.headers['user-agent'], 'ARBITRARY')
+ self.assertEqual(r.request.headers['user-agent'], 'ARBITRARY')
# Empty value
self.assertRaisesRegex(ValueError,
@@ -313,7 +311,7 @@
self.get_httpbin_hostname(): 'OVERRIDDEN'}
r = http.fetch(
self.get_httpbin_url('/status/200'), use_fake_user_agent=False)
- self.assertEqual(r.headers['user-agent'], 'OVERRIDDEN')
+ self.assertEqual(r.request.headers['user-agent'], 'OVERRIDDEN')
@require_modules('fake_useragent')
def test_fetch_with_fake_useragent(self):
@@ -370,153 +368,170 @@
UTF8_BYTES = STR.encode('utf8')
@staticmethod
- def _create_request(charset=None, data=UTF8_BYTES):
+ def _create_response(headers=None, data=UTF8_BYTES):
"""Helper method."""
- req = threadedhttp.HttpRequest(charset=charset)
resp = requests.Response()
- resp.headers = {'content-type': 'charset=utf-8'}
+ resp.request = requests.Request()
+ if headers is not None:
+ resp.headers = headers
+ else:
+ resp.headers = {'content-type': 'charset=utf-8'}
resp._content = data[:]
- req.data = resp
- return req
+ return resp
def test_no_content_type(self):
"""Test decoding without content-type (and then no
charset)."""
- req = threadedhttp.HttpRequest('')
- resp = requests.Response()
- resp.headers = {}
- resp._content = CharsetTestCase.LATIN1_BYTES[:]
- req._data = resp
- self.assertIsNone(req.charset)
- self.assertEqual('latin1', req.encoding)
- self.assertEqual(req.content, CharsetTestCase.LATIN1_BYTES)
- self.assertEqual(req.text, CharsetTestCase.STR)
+ charset = None
+ resp = CharsetTestCase._create_response(
+ headers={},
+ data=CharsetTestCase.LATIN1_BYTES)
+ resp.encoding = http._decide_encoding(resp, charset)
+ self.assertEqual('latin1', resp.encoding)
+ self.assertEqual(resp.content, CharsetTestCase.LATIN1_BYTES)
+ self.assertEqual(resp.text, CharsetTestCase.STR)
def test_no_charset(self):
"""Test decoding without explicit charset."""
- req = threadedhttp.HttpRequest('')
- resp = requests.Response()
- resp.headers = {'content-type': ''}
- resp._content = CharsetTestCase.LATIN1_BYTES[:]
- req._data = resp
- self.assertIsNone(req.charset)
- self.assertEqual('latin1', req.encoding)
- self.assertEqual(req.content, CharsetTestCase.LATIN1_BYTES)
- self.assertEqual(req.text, CharsetTestCase.STR)
+ charset = None
+ resp = CharsetTestCase._create_response(
+ headers={'content-type': ''},
+ data=CharsetTestCase.LATIN1_BYTES)
+ resp.encoding = http._decide_encoding(resp, charset)
+ self.assertEqual('latin1', resp.encoding)
+ self.assertEqual(resp.content, CharsetTestCase.LATIN1_BYTES)
+ self.assertEqual(resp.text, CharsetTestCase.STR)
def test_content_type_application_json_without_charset(self):
"""Test decoding without explicit charset but JSON
content."""
- req = CharsetTestCase._create_request()
- resp = requests.Response()
- req._data = resp
- resp._content = CharsetTestCase.UTF8_BYTES[:]
- resp.headers = {'content-type': 'application/json'}
- self.assertIsNone(req.charset)
- self.assertEqual('utf-8', req.encoding)
+ charset = None
+ resp = CharsetTestCase._create_response(
+ headers={'content-type': 'application/json'},
+ data=CharsetTestCase.UTF8_BYTES)
+ resp.encoding = http._decide_encoding(resp, charset)
+ self.assertEqual('utf-8', resp.encoding)
def test_content_type_sparql_json_without_charset(self):
"""Test decoding without explicit charset but JSON
content."""
- req = CharsetTestCase._create_request()
- resp = requests.Response()
- req._data = resp
- resp._content = CharsetTestCase.UTF8_BYTES[:]
- resp.headers = {'content-type':
'application/sparql-results+json'}
- self.assertIsNone(req.charset)
- self.assertEqual('utf-8', req.encoding)
+ charset = None
+ resp = CharsetTestCase._create_response(
+ headers={'content-type': 'application/sparql-results+json'},
+ data=CharsetTestCase.UTF8_BYTES)
+ resp.encoding = http._decide_encoding(resp, charset)
+ self.assertEqual('utf-8', resp.encoding)
def test_content_type_xml_without_charset(self):
"""Test decoding without explicit charset but xml
content."""
- req = CharsetTestCase._create_request()
- resp = requests.Response()
- req._data = resp
- resp._content = CharsetTestCase.UTF8_BYTES[:]
- resp.headers = {'content-type': 'text/xml'}
- self.assertIsNone(req.charset)
- self.assertEqual('utf-8', req.encoding)
+ charset = None
+ resp = CharsetTestCase._create_response(
+ headers={'content-type': 'application/xml'},
+ data=CharsetTestCase.UTF8_BYTES)
+ resp.encoding = http._decide_encoding(resp, charset)
+ self.assertEqual('utf-8', resp.encoding)
def test_content_type_xml_with_charset(self):
"""Test xml content with utf-8 encoding given in
content."""
- req = CharsetTestCase._create_request()
- resp = requests.Response()
- req._data = resp
- resp._content = '<?xml version="1.0"
encoding="UTF-8"?>'.encode(
- 'utf-8')
- resp.headers = {'content-type': 'text/xml'}
- self.assertIsNone(req.charset)
- self.assertEqual('UTF-8', req.encoding)
+ charset = None
+ resp = CharsetTestCase._create_response(
+ headers={'content-type': 'application/xml'},
+ data='<?xml version="1.0"
encoding="UTF-8"?>'.encode('utf-8'))
+ resp.encoding = http._decide_encoding(resp, charset)
+ self.assertEqual('UTF-8', resp.encoding)
def test_content_type_xml_with_charset_and_more_data(self):
"""Test xml content with utf-8 encoding given in
content."""
- req = CharsetTestCase._create_request()
- resp = requests.Response()
- req._data = resp
- resp._content = (
- '<?xml version="1.0" encoding="UTF-8"
someparam="ignored"?>'
- .encode('utf-8'))
- resp.headers = {'content-type': 'text/xml'}
- self.assertIsNone(req.charset)
- self.assertEqual('UTF-8', req.encoding)
+ charset = None
+ resp = CharsetTestCase._create_response(
+ headers={'content-type': 'application/xml'},
+ data='<?xml version="1.0" encoding="UTF-8" '
+ 'someparam="ignored"?>'.encode('utf-8'))
+ resp.encoding = http._decide_encoding(resp, charset)
+ self.assertEqual('UTF-8', resp.encoding)
def test_content_type_xml_with_variant_charset(self):
"""Test xml content with latin1 encoding given in
content."""
- req = CharsetTestCase._create_request()
- resp = requests.Response()
- req._data = resp
- resp._content = "<?xml version='1.0'
encoding='latin1'?>".encode(
- 'latin1')
- resp.headers = {'content-type': 'text/xml'}
- self.assertIsNone(req.charset)
- self.assertEqual('latin1', req.encoding)
+ charset = None
+ resp = CharsetTestCase._create_response(
+ headers={'content-type': 'application/xml'},
+ data="<?xml version='1.0'
encoding='latin1'?>".encode('latin1'))
+ resp.encoding = http._decide_encoding(resp, charset)
+ self.assertEqual('latin1', resp.encoding)
def test_server_charset(self):
"""Test decoding with server explicit charset."""
- req = CharsetTestCase._create_request()
- self.assertIsNone(req.charset)
- self.assertEqual('utf-8', req.encoding)
- self.assertEqual(req.content, CharsetTestCase.UTF8_BYTES)
- self.assertEqual(req.text, CharsetTestCase.STR)
+ charset = None
+ resp = CharsetTestCase._create_response()
+ resp.encoding = http._decide_encoding(resp, charset)
+ self.assertEqual('utf-8', resp.encoding)
+ self.assertEqual(resp.content, CharsetTestCase.UTF8_BYTES)
+ self.assertEqual(resp.text, CharsetTestCase.STR)
def test_same_charset(self):
"""Test decoding with explicit and equal
charsets."""
- req = CharsetTestCase._create_request('utf-8')
- self.assertEqual('utf-8', req.charset)
- self.assertEqual('utf-8', req.encoding)
- self.assertEqual(req.content, CharsetTestCase.UTF8_BYTES)
- self.assertEqual(req.text, CharsetTestCase.STR)
+ charset = 'utf-8'
+ resp = CharsetTestCase._create_response()
+ resp.encoding = http._decide_encoding(resp, charset)
+ self.assertEqual('utf-8', resp.encoding)
+ self.assertEqual(resp.content, CharsetTestCase.UTF8_BYTES)
+ self.assertEqual(resp.text, CharsetTestCase.STR)
def test_header_charset(self):
"""Test decoding with different charsets and valid header
charset."""
- req = CharsetTestCase._create_request('latin1')
- self.assertEqual('latin1', req.charset)
+ charset = 'latin1'
+ resp = CharsetTestCase._create_response()
+ resp.encoding = http._decide_encoding(resp, charset)
# Ignore WARNING: Encoding "latin1" requested but "utf-8"
received
with patch('pywikibot.warning'):
- self.assertEqual('utf-8', req.encoding)
- self.assertEqual(req.content, CharsetTestCase.UTF8_BYTES)
- self.assertEqual(req.text, CharsetTestCase.STR)
+ self.assertEqual('utf-8', resp.encoding)
+ self.assertEqual(resp.content, CharsetTestCase.UTF8_BYTES)
+ self.assertEqual(resp.text, CharsetTestCase.STR)
def test_code_charset(self):
"""Test decoding with different charsets and invalid header
charset."""
- req = CharsetTestCase._create_request('latin1',
- CharsetTestCase.LATIN1_BYTES)
- self.assertEqual('latin1', req.charset)
+ charset = 'latin1'
+ resp = CharsetTestCase._create_response(
+ data=CharsetTestCase.LATIN1_BYTES)
+ resp.encoding = http._decide_encoding(resp, charset)
# Ignore WARNING: Encoding "latin1" requested but "utf-8"
received
with patch('pywikibot.warning'):
- self.assertEqual('latin1', req.encoding)
- self.assertEqual(req.content, CharsetTestCase.LATIN1_BYTES)
- self.assertEqual(req.text, CharsetTestCase.STR)
+ self.assertEqual('latin1', resp.encoding)
+ self.assertEqual(resp.content, CharsetTestCase.LATIN1_BYTES)
+ self.assertEqual(resp.text, CharsetTestCase.STR)
def test_invalid_charset(self):
"""Test decoding with different and invalid
charsets."""
- req = CharsetTestCase._create_request('utf16',
- CharsetTestCase.LATIN1_BYTES)
- self.assertEqual('utf16', req.charset)
+ charset = 'utf16'
+ resp = CharsetTestCase._create_response(
+ data=CharsetTestCase.LATIN1_BYTES)
# Ignore WARNING: Encoding "utf16" requested but "utf-8"
received
with patch('pywikibot.warning'):
self.assertRaisesRegex(
UnicodeDecodeError, self.CODEC_CANT_DECODE_RE,
- lambda: req.encoding)
- self.assertEqual(req.content, CharsetTestCase.LATIN1_BYTES)
- self.assertRaisesRegex(
- UnicodeDecodeError, self.CODEC_CANT_DECODE_RE, lambda: req.text)
+ http._decide_encoding, resp, charset)
+ self.assertEqual(resp.content, CharsetTestCase.LATIN1_BYTES)
+
+ try:
+ resp.encoding = http._decide_encoding(resp, charset)
+ except UnicodeDecodeError as e:
+ resp.encoding = e
+
+ with patch('pywikibot.error'):
+ self.assertRaisesRegex(
+ UnicodeDecodeError, self.CODEC_CANT_DECODE_RE,
+ http.error_handling_callback, resp)
+
+ # TODO: this is a breaking change
+ # self.assertRaisesRegex(
+ # UnicodeDecodeError, self.CODEC_CANT_DECODE_RE, lambda: resp.text)
+
+ # Response() would do:
+ # encoding = UnicodeDecodeError -> str(self.content,
errors='replace')
+ self.assertEqual(
+ resp.text, str(resp.content, errors='replace'))
+ # encoding = None -> str(resp.content, resp.encoding,
errors='replace')
+ resp.encoding = None
+ self.assertEqual(
+ resp.text,
+ str(resp.content, resp.apparent_encoding, errors='replace'))
class BinaryTestCase(TestCase):
diff --git a/tests/site_detect_tests.py b/tests/site_detect_tests.py
index e401db9..36f608d 100644
--- a/tests/site_detect_tests.py
+++ b/tests/site_detect_tests.py
@@ -291,9 +291,7 @@
(object,),
{'status_code': 200,
'text': self._responses[parsed_url.path],
- 'data': type(str('ResponseData'),
- (object,),
- {'url': url})})
+ 'url': url})
@PatchingTestCase.patched(pywikibot, 'input')
def input(self, question, *args, **kwargs):
--
To view, visit
https://gerrit.wikimedia.org/r/c/pywikibot/core/+/651946
To unsubscribe, or for help writing mail filters, visit
https://gerrit.wikimedia.org/r/settings
Gerrit-Project: pywikibot/core
Gerrit-Branch: master
Gerrit-Change-Id: Ia46683bf39f75790baeb6eed8e49e5fcab2de501
Gerrit-Change-Number: 651946
Gerrit-PatchSet: 20
Gerrit-Owner: Mpaa <mpaa.wiki(a)gmail.com>
Gerrit-Reviewer: Xqt <info(a)gno.de>
Gerrit-Reviewer: jenkins-bot
Gerrit-MessageType: merged