jenkins-bot has submitted this change and it was merged.
Change subject: [FEAT] open_compressed: Read magic number
......................................................................
[FEAT] open_compressed: Read magic number
Instead of relying on the filename extension, open_compressed now reads the file's magic number (up to the first 8 bytes) and uses it to decide the decompression strategy.
Change-Id: I5a8a66877e779eac5ea7de2b497f87cd75feb3a1 --- M pywikibot/tools/__init__.py M tests/tools_tests.py 2 files changed, 22 insertions(+), 5 deletions(-)
Approvals: John Vandenberg: Looks good to me, approved jenkins-bot: Verified
diff --git a/pywikibot/tools/__init__.py b/pywikibot/tools/__init__.py index 41714aa..1e88a27 100644 --- a/pywikibot/tools/__init__.py +++ b/pywikibot/tools/__init__.py @@ -702,7 +702,7 @@ setattr(self._wrapped, name, value)
-def open_compressed(filename): +def open_compressed(filename, use_extension=False): """ Open a file and uncompress it if needed.
@@ -731,11 +731,27 @@ else: return wrapped
- if filename.endswith('.bz2'): + if use_extension: + # if '.' not in filename, it'll be 1 character long but otherwise + # contain the period + extension = filename[filename.rfind('.'):][1:] + else: + with open(filename, 'rb') as f: + magic_number = f.read(8) + if magic_number.startswith(b'BZh'): + extension = 'bz2' + elif magic_number.startswith(b'\x1F\x8B\x08'): + extension = 'gz' + elif magic_number.startswith(b"7z\xBC\xAF'\x1C"): + extension = '7z' + else: + extension = '' + + if extension == 'bz2': return wrap(bz2.BZ2File(filename)) - elif filename.endswith('.gz'): + elif extension == 'gz': return wrap(gzip.open(filename)) - elif filename.endswith('.7z'): + elif extension == '7z': try: process = subprocess.Popen(['7za', 'e', '-bd', '-so', filename], stdout=subprocess.PIPE, diff --git a/tests/tools_tests.py b/tests/tools_tests.py index 9ece411..ceb42cb 100644 --- a/tests/tools_tests.py +++ b/tests/tools_tests.py @@ -87,6 +87,7 @@ self.assertEqual(self._get_content(self.base_file), self.original_content) self.assertEqual(self._get_content(self.base_file + '.bz2'), self.original_content) self.assertEqual(self._get_content(self.base_file + '.gz'), self.original_content) + self.assertEqual(self._get_content(self.base_file + '.bz2', True), self.original_content)
def test_open_compressed_7z(self): """Test open_compressed with 7za if installed.""" @@ -95,7 +96,7 @@ except OSError: raise unittest.SkipTest('7za not installed') self.assertEqual(self._get_content(self.base_file + '.7z'), self.original_content) - self.assertRaises(OSError, self._get_content, self.base_file + '_invalid.7z') + self.assertRaises(OSError, self._get_content, self.base_file + '_invalid.7z', True)
if __name__ == '__main__':
pywikibot-commits@lists.wikimedia.org