jenkins-bot has submitted this change. ( https://gerrit.wikimedia.org/r/c/pywikibot/core/+/748274 )
Change subject: Revert "logging: Fix redirection for terminal interfaces" ......................................................................
Revert "logging: Fix redirection for terminal interfaces"
This reverts commit ba527de7dc13cb38a1fc904d92197a3c96ca4a54.
Reason for revert: See comment of T283808, missing or delayed output.
Bug: T283808 Change-Id: Ic5a82622444f47aad9ee17548fb9bf9c54489f42 --- M pywikibot/userinterfaces/terminal_interface_base.py M pywikibot/userinterfaces/terminal_interface_unix.py M tests/ui_tests.py 3 files changed, 150 insertions(+), 127 deletions(-)
Approvals: Xqt: Looks good to me, approved jenkins-bot: Verified
diff --git a/pywikibot/userinterfaces/terminal_interface_base.py b/pywikibot/userinterfaces/terminal_interface_base.py index 1d87b67..c288bdc 100755 --- a/pywikibot/userinterfaces/terminal_interface_base.py +++ b/pywikibot/userinterfaces/terminal_interface_base.py @@ -71,9 +71,9 @@ This caches the std-streams locally so any attempts to monkey-patch the streams later will not work. """ - self.stdin = sys.__stdin__ - self.stdout = sys.__stdout__ - self.stderr = sys.__stderr__ + self.stdin = sys.stdin + self.stdout = sys.stdout + self.stderr = sys.stderr self.argv = sys.argv self.encoding = config.console_encoding self.transliteration_target = config.transliteration_target @@ -141,24 +141,8 @@ return cls.split_col_pat.search(color).groups()
def _write(self, text, target_stream): - """ - Optionally encode and write the text to the target stream. - - sys.stderr and sys.stdout are frozen upon initialization to original - streams (which are stored in the sys module as sys.__stderr__ and - sys.__stdout__). This works fine except when using redirect_stderr or - redirect_stdout context managers, where values of global sys.stderr and - sys.stdout are temporarily changed to a redirecting StringIO stream, - in which case writing to the frozen streams (which are still set to - sys.__stderr__ / sys.__stdout__) will not write to the redirecting - stream as expected. So, we check the target stream against the frozen - streams, and then write to the (potentially redirected) sys.stderr or - sys.stdout stream. - """ - if target_stream == self.stderr: - sys.stderr.write(text) - elif target_stream == self.stdout: - sys.stdout.write(text) + """Optionally encode and write the text to the target stream.""" + target_stream.write(text)
def support_color(self, target_stream): """Return whether the target stream does support colors.""" diff --git a/pywikibot/userinterfaces/terminal_interface_unix.py b/pywikibot/userinterfaces/terminal_interface_unix.py index eaba92a..8d82f32 100755 --- a/pywikibot/userinterfaces/terminal_interface_unix.py +++ b/pywikibot/userinterfaces/terminal_interface_unix.py @@ -51,3 +51,7 @@ if bg is not None: bg = unixColors[bg] self._write(self.make_unix_bg_color(bg), target_stream) + + def _write(self, text, target_stream): + """Optionally encode and write the text to the target stream.""" + target_stream.write(text) diff --git a/tests/ui_tests.py b/tests/ui_tests.py index 7eaa8b3..e9af702 100644 --- a/tests/ui_tests.py +++ b/tests/ui_tests.py @@ -7,9 +7,10 @@ import io import logging import os +import sys import unittest -from contextlib import redirect_stdout, suppress -from unittest.mock import patch + +from contextlib import suppress
import pywikibot from pywikibot.bot import ( @@ -32,6 +33,42 @@ from tests.utils import FakeModule
+class Stream: + + """Handler for a StringIO instance able to patch itself.""" + + def __init__(self, name: str, patched_streams: dict): + """ + Create a new stream with a StringIO instance. + + :param name: The part after 'std' (e.g. 'err'). + :param patched_streams: A mapping which maps the original stream to + the patched stream. + """ + self._stream = io.StringIO() + self._name = 'std{}'.format(name) + self._original = getattr(sys, self._name) + patched_streams[self._original] = self._stream + + def __repr__(self): + return '<patched {} {!r} wrapping {!r}>'.format( + self._name, self._stream, self._original) + + def reset(self): + """Reset own stream.""" + self._stream.truncate(0) + self._stream.seek(0) + + +patched_streams = {} +strout = Stream('out', patched_streams) +strerr = Stream('err', patched_streams) +strin = Stream('in', {}) + +newstdout = strout._stream +newstderr = strerr._stream +newstdin = strin._stream + logger = logging.getLogger('pywiki') loggingcontext = {'caller_name': 'ui_tests', 'caller_file': 'ui_tests', @@ -47,21 +84,19 @@
def setUp(self): super().setUp() - # Here we patch standard input, output, and errors, essentially - # redirecting to StringIO streams - self.stdout_patcher = patch('sys.stdout', new_callable=io.StringIO) - self.strout = self.stdout_patcher.start() - self.stderr_patcher = patch('sys.stderr', new_callable=io.StringIO) - self.strerr = self.stderr_patcher.start() - self.stdin_patcher = patch('sys.stdin', new_callable=io.StringIO) - self.strin = self.stdin_patcher.start()
pywikibot.bot.set_interface('terminal')
+ self.org_print = pywikibot.bot.ui._print self.org_input = pywikibot.bot.ui._raw_input
+ pywikibot.bot.ui._print = self._patched_print pywikibot.bot.ui._raw_input = self._patched_input
+ strout.reset() + strerr.reset() + strin.reset() + pywikibot.config.colorized_output = True pywikibot.config.transliterate = False pywikibot.ui.transliteration_target = None @@ -70,16 +105,23 @@ def tearDown(self): super().tearDown()
- self.stdout_patcher.stop() - self.stderr_patcher.stop() - self.stdin_patcher.stop() - + pywikibot.bot.ui._print = self.org_print pywikibot.bot.ui._raw_input = self.org_input
pywikibot.bot.set_interface('buffer')
+ def _patched_print(self, text, target_stream): + try: + stream = patched_streams[target_stream] + except KeyError: + expected = pywikibot.userinterfaces.win32_unicode.UnicodeOutput + assert isinstance(target_stream, expected) + assert target_stream._stream + stream = patched_streams[target_stream._stream] + self.org_print(text, stream) + def _patched_input(self): - return self.strin.readline().strip() + return strin._stream.readline().strip()
class TestExceptionError(Exception): @@ -107,56 +149,56 @@ for text, level, out, err in self.tests: with self.subTest(test=text): logger.log(level, text, extra=loggingcontext) - self.assertEqual(self.strout.getvalue(), out) - self.assertEqual(self.strerr.getvalue(), err) + self.assertEqual(newstdout.getvalue(), out) + self.assertEqual(newstderr.getvalue(), err)
# reset terminal files - for stream in [self.strout, self.strerr, self.strin]: - stream.truncate(0) - stream.seek(0) + strout.reset() + strerr.reset() + strin.reset()
def test_output(self): pywikibot.output('output') - self.assertEqual(self.strout.getvalue(), '') - self.assertEqual(self.strerr.getvalue(), 'output\n') + self.assertEqual(newstdout.getvalue(), '') + self.assertEqual(newstderr.getvalue(), 'output\n')
def test_stdout(self): pywikibot.stdout('output') - self.assertEqual(self.strout.getvalue(), 'output\n') - self.assertEqual(self.strerr.getvalue(), '') + self.assertEqual(newstdout.getvalue(), 'output\n') + self.assertEqual(newstderr.getvalue(), '')
def test_warning(self): pywikibot.warning('warning') - self.assertEqual(self.strout.getvalue(), '') - self.assertEqual(self.strerr.getvalue(), 'WARNING: warning\n') + self.assertEqual(newstdout.getvalue(), '') + self.assertEqual(newstderr.getvalue(), 'WARNING: warning\n')
def test_error(self): pywikibot.error('error') - self.assertEqual(self.strout.getvalue(), '') - self.assertEqual(self.strerr.getvalue(), 'ERROR: error\n') + self.assertEqual(newstdout.getvalue(), '') + self.assertEqual(newstderr.getvalue(), 'ERROR: error\n')
def test_log(self): pywikibot.log('log') - self.assertEqual(self.strout.getvalue(), '') - self.assertEqual(self.strerr.getvalue(), '') + self.assertEqual(newstdout.getvalue(), '') + self.assertEqual(newstderr.getvalue(), '')
def test_critical(self): pywikibot.critical('critical') - self.assertEqual(self.strout.getvalue(), '') - self.assertEqual(self.strerr.getvalue(), 'CRITICAL: critical\n') + self.assertEqual(newstdout.getvalue(), '') + self.assertEqual(newstderr.getvalue(), 'CRITICAL: critical\n')
def test_debug(self): pywikibot.debug('debug', 'test') - self.assertEqual(self.strout.getvalue(), '') - self.assertEqual(self.strerr.getvalue(), '') + self.assertEqual(newstdout.getvalue(), '') + self.assertEqual(newstderr.getvalue(), '')
def test_exception(self): try: raise TestExceptionError('Testing Exception') except TestExceptionError: pywikibot.exception('exception') - self.assertEqual(self.strout.getvalue(), '') - self.assertEqual(self.strerr.getvalue(), + self.assertEqual(newstdout.getvalue(), '') + self.assertEqual(newstderr.getvalue(), 'ERROR: TestExceptionError: Testing Exception\n')
def test_exception_tb(self): @@ -164,8 +206,8 @@ raise TestExceptionError('Testing Exception') except TestExceptionError: pywikibot.exception('exception', tb=True) - self.assertEqual(self.strout.getvalue(), '') - stderrlines = self.strerr.getvalue().split('\n') + self.assertEqual(newstdout.getvalue(), '') + stderrlines = newstderr.getvalue().split('\n') self.assertEqual(stderrlines[0], 'ERROR: TestExceptionError: Testing Exception') self.assertEqual(stderrlines[1], 'Traceback (most recent call last):') @@ -188,22 +230,22 @@ input_choice_output = 'question ([A]nswer 1, a[n]swer 2, an[s]wer 3): '
def testInput(self): - self.strin.write('input to read\n') - self.strin.seek(0) + newstdin.write('input to read\n') + newstdin.seek(0) returned = pywikibot.input('question')
- self.assertEqual(self.strout.getvalue(), '') - self.assertEqual(self.strerr.getvalue(), 'question: ') + self.assertEqual(newstdout.getvalue(), '') + self.assertEqual(newstderr.getvalue(), 'question: ') self.assertIsInstance(returned, str) self.assertEqual(returned, 'input to read')
def test_input_yn(self): - self.strin.write('\n') - self.strin.seek(0) + newstdin.write('\n') + newstdin.seek(0) returned = pywikibot.input_yn('question', False, automatic_quit=False)
- self.assertEqual(self.strout.getvalue(), '') - self.assertEqual(self.strerr.getvalue(), 'question ([y]es, [N]o): ') + self.assertEqual(newstdout.getvalue(), '') + self.assertEqual(newstderr.getvalue(), 'question ([y]es, [N]o): ') self.assertFalse(returned)
def _call_input_choice(self): @@ -215,39 +257,39 @@ 'A', automatic_quit=False)
- self.assertEqual(self.strout.getvalue(), '') + self.assertEqual(newstdout.getvalue(), '') self.assertIsInstance(rv, str) return rv
def testInputChoiceDefault(self): - self.strin.write('\n') - self.strin.seek(0) + newstdin.write('\n') + newstdin.seek(0) returned = self._call_input_choice()
self.assertEqual(returned, 'a')
def testInputChoiceCapital(self): - self.strin.write('N\n') - self.strin.seek(0) + newstdin.write('N\n') + newstdin.seek(0) returned = self._call_input_choice()
- self.assertEqual(self.strerr.getvalue(), self.input_choice_output) + self.assertEqual(newstderr.getvalue(), self.input_choice_output) self.assertEqual(returned, 'n')
def testInputChoiceNonCapital(self): - self.strin.write('n\n') - self.strin.seek(0) + newstdin.write('n\n') + newstdin.seek(0) returned = self._call_input_choice()
- self.assertEqual(self.strerr.getvalue(), self.input_choice_output) + self.assertEqual(newstderr.getvalue(), self.input_choice_output) self.assertEqual(returned, 'n')
def testInputChoiceIncorrectAnswer(self): - self.strin.write('X\nN\n') - self.strin.seek(0) + newstdin.write('X\nN\n') + newstdin.seek(0) returned = self._call_input_choice()
- self.assertEqual(self.strerr.getvalue(), self.input_choice_output * 2) + self.assertEqual(newstderr.getvalue(), self.input_choice_output * 2) self.assertEqual(returned, 'n')
def test_input_list_choice(self): @@ -255,9 +297,9 @@ options = ('answer 1', 'answer 2', 'answer 3') rv = pywikibot.bot.input_list_choice('question', options, '2')
- self.assertEqual(self.strout.getvalue(), '') + self.assertEqual(newstdout.getvalue(), '') self.assertEqual( - self.strerr.getvalue(), + newstderr.getvalue(), ''.join('{}: {}\n'.format(num, items) for num, items in enumerate(options, start=1)) + 'question (default: 2): ') @@ -273,17 +315,17 @@
def testOutputColorizedText(self): pywikibot.output(self.str1) - self.assertEqual(self.strout.getvalue(), '') + self.assertEqual(newstdout.getvalue(), '') self.assertEqual( - self.strerr.getvalue(), + newstderr.getvalue(), 'text \x1b[95mlight purple text\x1b[0m text\n')
def testOutputNoncolorizedText(self): pywikibot.config.colorized_output = False pywikibot.output(self.str1) - self.assertEqual(self.strout.getvalue(), '') + self.assertEqual(newstdout.getvalue(), '') self.assertEqual( - self.strerr.getvalue(), + newstderr.getvalue(), 'text light purple text text ***\n')
str2 = ('normal text \03{lightpurple} light purple ' @@ -293,9 +335,9 @@ def testOutputColorCascade_incorrect(self): """Test incorrect behavior of testOutputColorCascade.""" pywikibot.output(self.str2) - self.assertEqual(self.strout.getvalue(), '') + self.assertEqual(newstdout.getvalue(), '') self.assertEqual( - self.strerr.getvalue(), + newstderr.getvalue(), 'normal text \x1b[95m light purple ' '\x1b[94m light blue \x1b[95m light purple ' '\x1b[0m normal text\n') @@ -308,18 +350,18 @@
def testOutputUnicodeText(self): pywikibot.output('Заглавная_страница') - self.assertEqual(self.strout.getvalue(), '') - self.assertEqual(self.strerr.getvalue(), 'Заглавная_страница\n') + self.assertEqual(newstdout.getvalue(), '') + self.assertEqual(newstderr.getvalue(), 'Заглавная_страница\n')
def testInputUnicodeText(self): - self.strin.write('Заглавная_страница\n') - self.strin.seek(0) + newstdin.write('Заглавная_страница\n') + newstdin.seek(0)
returned = pywikibot.input('Википедию? ')
- self.assertEqual(self.strout.getvalue(), '') + self.assertEqual(newstdout.getvalue(), '') self.assertEqual( - self.strerr.getvalue(), 'Википедию? ') + newstderr.getvalue(), 'Википедию? ')
self.assertIsInstance(returned, str) self.assertEqual(returned, 'Заглавная_страница') @@ -334,9 +376,9 @@ pywikibot.bot.ui.encoding = 'latin-1' pywikibot.config.transliterate = True pywikibot.output('abcd АБГД αβγδ あいうえお') - self.assertEqual(self.strout.getvalue(), '') + self.assertEqual(newstdout.getvalue(), '') self.assertEqual( - self.strerr.getvalue(), + newstderr.getvalue(), 'abcd \x1b[93mA\x1b[0m\x1b[93mB\x1b[0m\x1b[93mG\x1b[0m' '\x1b[93mD\x1b[0m \x1b[93ma\x1b[0m\x1b[93mb\x1b[0m\x1b[93mg' '\x1b[0m\x1b[93md\x1b[0m \x1b[93ma\x1b[0m\x1b[93mi\x1b[0m' @@ -357,9 +399,8 @@ def setUp(self): """Create dummy instances for the test and patch encounter_color.""" super().setUp() + self.stream = io.StringIO() self.ui_obj = self.ui_class() - # Write to sys.stdout stream, which we'll redirect to the stream below - self.redirect = io.StringIO() self._orig_encounter_color = self.ui_obj.encounter_color self.ui_obj.encounter_color = self._encounter_color self._index = 0 @@ -371,6 +412,10 @@ self.assertEqual(self._index, len(self._colors) if self.expect_color else 0)
+ def _getvalue(self): + """Get the value of the stream.""" + return self.stream.getvalue() + def _encounter_color(self, color, target_stream): """Patched encounter_color method.""" raise AssertionError('This method should not be invoked') @@ -378,51 +423,41 @@ def test_no_color(self): """Test a string without any colors.""" self._colors = () - with redirect_stdout(self.redirect) as f: - self.ui_obj._print('Hello world you!', self.ui_obj.stdout) - self.assertEqual(f.getvalue(), 'Hello world you!') + self.ui_obj._print('Hello world you!', self.stream) + self.assertEqual(self._getvalue(), 'Hello world you!')
def test_one_color(self): """Test a string using one color.""" self._colors = (('red', 6), ('default', 10)) - with redirect_stdout(self.redirect) as f: - self.ui_obj._print('Hello \03{red}world you!', self.ui_obj.stdout) - self.assertEqual(f.getvalue(), self.expected) + self.ui_obj._print('Hello \03{red}world you!', self.stream) + self.assertEqual(self._getvalue(), self.expected)
def test_flat_color(self): """Test using colors with defaulting in between.""" self._colors = (('red', 6), ('default', 6), ('yellow', 3), ('default', 1)) - with redirect_stdout(self.redirect) as f: - self.ui_obj._print( - 'Hello \03{red}world \03{default}you\03{yellow}!', - self.ui_obj.stdout) - self.assertEqual(f.getvalue(), self.expected) + self.ui_obj._print('Hello \03{red}world \03{default}you\03{yellow}!', + self.stream) + self.assertEqual(self._getvalue(), self.expected)
def test_stack_with_pop_color(self): """Test using stacked colors and just popping the latest color.""" self._colors = (('red', 6), ('yellow', 6), ('red', 3), ('default', 1)) - with redirect_stdout(self.redirect) as f: - self.ui_obj._print( - 'Hello \03{red}world \03{yellow}you\03{previous}!', - self.ui_obj.stdout) - self.assertEqual(f.getvalue(), self.expected) + self.ui_obj._print('Hello \03{red}world \03{yellow}you\03{previous}!', + self.stream) + self.assertEqual(self._getvalue(), self.expected)
def test_stack_implicit_color(self): """Test using stacked colors without popping any.""" self._colors = (('red', 6), ('yellow', 6), ('default', 4)) - with redirect_stdout(self.redirect) as f: - self.ui_obj._print('Hello \03{red}world \03{yellow}you!', - self.ui_obj.stdout) - self.assertEqual(f.getvalue(), self.expected) + self.ui_obj._print('Hello \03{red}world \03{yellow}you!', self.stream) + self.assertEqual(self._getvalue(), self.expected)
def test_one_color_newline(self): """Test with trailing new line and one color.""" self._colors = (('red', 6), ('default', 11)) - with redirect_stdout(self.redirect) as f: - self.ui_obj._print('Hello \03{red}world you!\n', - self.ui_obj.stdout) - self.assertEqual(f.getvalue(), self.expected + '\n') + self.ui_obj._print('Hello \03{red}world you!\n', self.stream) + self.assertEqual(self._getvalue(), self.expected + '\n')
class FakeUIColorizedTestBase(TestCase): @@ -454,11 +489,11 @@
def _encounter_color(self, color, target_stream): """Verify that the written data, color and stream are correct.""" - self.assertIs(target_stream, self.ui_obj.stdout) + self.assertIs(target_stream, self.stream) expected_color = self._colors[self._index][0] self._index += 1 self.assertEqual(color, expected_color) - self.assertLength(self.redirect.getvalue(), + self.assertLength(self.stream.getvalue(), sum(e[1] for e in self._colors[:self._index]))
@@ -484,7 +519,7 @@ ctypes = FakeModule.create_dotted('ctypes.windll.kernel32') ctypes.windll.kernel32.SetConsoleTextAttribute = self._handle_setattr terminal_interface_win32.ctypes = ctypes - self.ui_obj.stdout._hConsole = object() + self.stream._hConsole = object()
def tearDown(self): """Unpatch the ctypes import and check that all colors were used.""" @@ -497,12 +532,12 @@
def _handle_setattr(self, handle, attribute): """Dummy method to handle SetConsoleTextAttribute.""" - self.assertIs(handle, self.ui_obj.stdout._hConsole) + self.assertIs(handle, self.stream._hConsole) color = self._colors[self._index][0] self._index += 1 color = terminal_interface_win32.windowsColors[color] self.assertEqual(attribute, color) - self.assertLength(self.redirect.getvalue(), + self.assertLength(self.stream.getvalue(), sum(e[1] for e in self._colors[:self._index]))
@@ -518,7 +553,7 @@ def setUp(self): """Change the local stream's console to None to disable colors.""" super().setUp() - self.ui_obj.stdout._hConsole = None + self.stream._hConsole = None
if __name__ == '__main__': # pragma: no cover
pywikibot-commits@lists.wikimedia.org