jenkins-bot has submitted this change. ( https://gerrit.wikimedia.org/r/c/pywikibot/core/+/743725 )
Change subject: logging: Fix redirection for terminal interfaces ......................................................................
logging: Fix redirection for terminal interfaces
Redirection context managers contextlib.stderr and contextlib.stdout redirect from respective streams by temporarily changing the values of global variables sys.stderr or sys.stdout. 'Caching' these variables as instance attributes in UI means that these variable changes are not tracked, leading to unexpected redirect behavior.
One solution might be to explicitly refer to sys.stderr and sys.stdout in the UI code, but handlers initialized with these in the UI will still face the same 'caching' issue.
The solution here is to simply resolve the target stream to (potentially redirected) sys.stderr or sys.stdout at write time. This obviously allows writing to monkey-patched sys streams, which is hopefully not an issue.
Bug: T283808 Change-Id: Ia11ae4dfcc87216ad9f5f97aacb1478a44142a66 --- M pywikibot/userinterfaces/terminal_interface_base.py M pywikibot/userinterfaces/terminal_interface_unix.py M tests/ui_tests.py 3 files changed, 127 insertions(+), 150 deletions(-)
Approvals: Xqt: Looks good to me, approved jenkins-bot: Verified
diff --git a/pywikibot/userinterfaces/terminal_interface_base.py b/pywikibot/userinterfaces/terminal_interface_base.py index c288bdc..1d87b67 100755 --- a/pywikibot/userinterfaces/terminal_interface_base.py +++ b/pywikibot/userinterfaces/terminal_interface_base.py @@ -71,9 +71,9 @@ This caches the std-streams locally so any attempts to monkey-patch the streams later will not work. """ - self.stdin = sys.stdin - self.stdout = sys.stdout - self.stderr = sys.stderr + self.stdin = sys.__stdin__ + self.stdout = sys.__stdout__ + self.stderr = sys.__stderr__ self.argv = sys.argv self.encoding = config.console_encoding self.transliteration_target = config.transliteration_target @@ -141,8 +141,24 @@ return cls.split_col_pat.search(color).groups()
def _write(self, text, target_stream): - """Optionally encode and write the text to the target stream.""" - target_stream.write(text) + """ + Optionally encode and write the text to the target stream. + + sys.stderr and sys.stdout are frozen upon initialization to original + streams (which are stored in the sys module as sys.__stderr__ and + sys.__stdout__). This works fine except when using redirect_stderr or + redirect_stdout context managers, where values of global sys.stderr and + sys.stdout are temporarily changed to a redirecting StringIO stream, + in which case writing to the frozen streams (which are still set to + sys.__stderr__ / sys.__stdout__) will not write to the redirecting + stream as expected. So, we check the target stream against the frozen + streams, and then write to the (potentially redirected) sys.stderr or + sys.stdout stream. + """ + if target_stream == self.stderr: + sys.stderr.write(text) + elif target_stream == self.stdout: + sys.stdout.write(text)
def support_color(self, target_stream): """Return whether the target stream does support colors.""" diff --git a/pywikibot/userinterfaces/terminal_interface_unix.py b/pywikibot/userinterfaces/terminal_interface_unix.py index 8d82f32..eaba92a 100755 --- a/pywikibot/userinterfaces/terminal_interface_unix.py +++ b/pywikibot/userinterfaces/terminal_interface_unix.py @@ -51,7 +51,3 @@ if bg is not None: bg = unixColors[bg] self._write(self.make_unix_bg_color(bg), target_stream) - - def _write(self, text, target_stream): - """Optionally encode and write the text to the target stream.""" - target_stream.write(text) diff --git a/tests/ui_tests.py b/tests/ui_tests.py index e9af702..7eaa8b3 100644 --- a/tests/ui_tests.py +++ b/tests/ui_tests.py @@ -7,10 +7,9 @@ import io import logging import os -import sys import unittest - -from contextlib import suppress +from contextlib import redirect_stdout, suppress +from unittest.mock import patch
import pywikibot from pywikibot.bot import ( @@ -33,42 +32,6 @@ from tests.utils import FakeModule
-class Stream: - - """Handler for a StringIO instance able to patch itself.""" - - def __init__(self, name: str, patched_streams: dict): - """ - Create a new stream with a StringIO instance. - - :param name: The part after 'std' (e.g. 'err'). - :param patched_streams: A mapping which maps the original stream to - the patched stream. - """ - self._stream = io.StringIO() - self._name = 'std{}'.format(name) - self._original = getattr(sys, self._name) - patched_streams[self._original] = self._stream - - def __repr__(self): - return '<patched {} {!r} wrapping {!r}>'.format( - self._name, self._stream, self._original) - - def reset(self): - """Reset own stream.""" - self._stream.truncate(0) - self._stream.seek(0) - - -patched_streams = {} -strout = Stream('out', patched_streams) -strerr = Stream('err', patched_streams) -strin = Stream('in', {}) - -newstdout = strout._stream -newstderr = strerr._stream -newstdin = strin._stream - logger = logging.getLogger('pywiki') loggingcontext = {'caller_name': 'ui_tests', 'caller_file': 'ui_tests', @@ -84,19 +47,21 @@
def setUp(self): super().setUp() + # Here we patch standard input, output, and errors, essentially + # redirecting to StringIO streams + self.stdout_patcher = patch('sys.stdout', new_callable=io.StringIO) + self.strout = self.stdout_patcher.start() + self.stderr_patcher = patch('sys.stderr', new_callable=io.StringIO) + self.strerr = self.stderr_patcher.start() + self.stdin_patcher = patch('sys.stdin', new_callable=io.StringIO) + self.strin = self.stdin_patcher.start()
pywikibot.bot.set_interface('terminal')
- self.org_print = pywikibot.bot.ui._print self.org_input = pywikibot.bot.ui._raw_input
- pywikibot.bot.ui._print = self._patched_print pywikibot.bot.ui._raw_input = self._patched_input
- strout.reset() - strerr.reset() - strin.reset() - pywikibot.config.colorized_output = True pywikibot.config.transliterate = False pywikibot.ui.transliteration_target = None @@ -105,23 +70,16 @@ def tearDown(self): super().tearDown()
- pywikibot.bot.ui._print = self.org_print + self.stdout_patcher.stop() + self.stderr_patcher.stop() + self.stdin_patcher.stop() + pywikibot.bot.ui._raw_input = self.org_input
pywikibot.bot.set_interface('buffer')
- def _patched_print(self, text, target_stream): - try: - stream = patched_streams[target_stream] - except KeyError: - expected = pywikibot.userinterfaces.win32_unicode.UnicodeOutput - assert isinstance(target_stream, expected) - assert target_stream._stream - stream = patched_streams[target_stream._stream] - self.org_print(text, stream) - def _patched_input(self): - return strin._stream.readline().strip() + return self.strin.readline().strip()
class TestExceptionError(Exception): @@ -149,56 +107,56 @@ for text, level, out, err in self.tests: with self.subTest(test=text): logger.log(level, text, extra=loggingcontext) - self.assertEqual(newstdout.getvalue(), out) - self.assertEqual(newstderr.getvalue(), err) + self.assertEqual(self.strout.getvalue(), out) + self.assertEqual(self.strerr.getvalue(), err)
# reset terminal files - strout.reset() - strerr.reset() - strin.reset() + for stream in [self.strout, self.strerr, self.strin]: + stream.truncate(0) + stream.seek(0)
def test_output(self): pywikibot.output('output') - self.assertEqual(newstdout.getvalue(), '') - self.assertEqual(newstderr.getvalue(), 'output\n') + self.assertEqual(self.strout.getvalue(), '') + self.assertEqual(self.strerr.getvalue(), 'output\n')
def test_stdout(self): pywikibot.stdout('output') - self.assertEqual(newstdout.getvalue(), 'output\n') - self.assertEqual(newstderr.getvalue(), '') + self.assertEqual(self.strout.getvalue(), 'output\n') + self.assertEqual(self.strerr.getvalue(), '')
def test_warning(self): pywikibot.warning('warning') - self.assertEqual(newstdout.getvalue(), '') - self.assertEqual(newstderr.getvalue(), 'WARNING: warning\n') + self.assertEqual(self.strout.getvalue(), '') + self.assertEqual(self.strerr.getvalue(), 'WARNING: warning\n')
def test_error(self): pywikibot.error('error') - self.assertEqual(newstdout.getvalue(), '') - self.assertEqual(newstderr.getvalue(), 'ERROR: error\n') + self.assertEqual(self.strout.getvalue(), '') + self.assertEqual(self.strerr.getvalue(), 'ERROR: error\n')
def test_log(self): pywikibot.log('log') - self.assertEqual(newstdout.getvalue(), '') - self.assertEqual(newstderr.getvalue(), '') + self.assertEqual(self.strout.getvalue(), '') + self.assertEqual(self.strerr.getvalue(), '')
def test_critical(self): pywikibot.critical('critical') - self.assertEqual(newstdout.getvalue(), '') - self.assertEqual(newstderr.getvalue(), 'CRITICAL: critical\n') + self.assertEqual(self.strout.getvalue(), '') + self.assertEqual(self.strerr.getvalue(), 'CRITICAL: critical\n')
def test_debug(self): pywikibot.debug('debug', 'test') - self.assertEqual(newstdout.getvalue(), '') - self.assertEqual(newstderr.getvalue(), '') + self.assertEqual(self.strout.getvalue(), '') + self.assertEqual(self.strerr.getvalue(), '')
def test_exception(self): try: raise TestExceptionError('Testing Exception') except TestExceptionError: pywikibot.exception('exception') - self.assertEqual(newstdout.getvalue(), '') - self.assertEqual(newstderr.getvalue(), + self.assertEqual(self.strout.getvalue(), '') + self.assertEqual(self.strerr.getvalue(), 'ERROR: TestExceptionError: Testing Exception\n')
def test_exception_tb(self): @@ -206,8 +164,8 @@ raise TestExceptionError('Testing Exception') except TestExceptionError: pywikibot.exception('exception', tb=True) - self.assertEqual(newstdout.getvalue(), '') - stderrlines = newstderr.getvalue().split('\n') + self.assertEqual(self.strout.getvalue(), '') + stderrlines = self.strerr.getvalue().split('\n') self.assertEqual(stderrlines[0], 'ERROR: TestExceptionError: Testing Exception') self.assertEqual(stderrlines[1], 'Traceback (most recent call last):') @@ -230,22 +188,22 @@ input_choice_output = 'question ([A]nswer 1, a[n]swer 2, an[s]wer 3): '
def testInput(self): - newstdin.write('input to read\n') - newstdin.seek(0) + self.strin.write('input to read\n') + self.strin.seek(0) returned = pywikibot.input('question')
- self.assertEqual(newstdout.getvalue(), '') - self.assertEqual(newstderr.getvalue(), 'question: ') + self.assertEqual(self.strout.getvalue(), '') + self.assertEqual(self.strerr.getvalue(), 'question: ') self.assertIsInstance(returned, str) self.assertEqual(returned, 'input to read')
def test_input_yn(self): - newstdin.write('\n') - newstdin.seek(0) + self.strin.write('\n') + self.strin.seek(0) returned = pywikibot.input_yn('question', False, automatic_quit=False)
- self.assertEqual(newstdout.getvalue(), '') - self.assertEqual(newstderr.getvalue(), 'question ([y]es, [N]o): ') + self.assertEqual(self.strout.getvalue(), '') + self.assertEqual(self.strerr.getvalue(), 'question ([y]es, [N]o): ') self.assertFalse(returned)
def _call_input_choice(self): @@ -257,39 +215,39 @@ 'A', automatic_quit=False)
- self.assertEqual(newstdout.getvalue(), '') + self.assertEqual(self.strout.getvalue(), '') self.assertIsInstance(rv, str) return rv
def testInputChoiceDefault(self): - newstdin.write('\n') - newstdin.seek(0) + self.strin.write('\n') + self.strin.seek(0) returned = self._call_input_choice()
self.assertEqual(returned, 'a')
def testInputChoiceCapital(self): - newstdin.write('N\n') - newstdin.seek(0) + self.strin.write('N\n') + self.strin.seek(0) returned = self._call_input_choice()
- self.assertEqual(newstderr.getvalue(), self.input_choice_output) + self.assertEqual(self.strerr.getvalue(), self.input_choice_output) self.assertEqual(returned, 'n')
def testInputChoiceNonCapital(self): - newstdin.write('n\n') - newstdin.seek(0) + self.strin.write('n\n') + self.strin.seek(0) returned = self._call_input_choice()
- self.assertEqual(newstderr.getvalue(), self.input_choice_output) + self.assertEqual(self.strerr.getvalue(), self.input_choice_output) self.assertEqual(returned, 'n')
def testInputChoiceIncorrectAnswer(self): - newstdin.write('X\nN\n') - newstdin.seek(0) + self.strin.write('X\nN\n') + self.strin.seek(0) returned = self._call_input_choice()
- self.assertEqual(newstderr.getvalue(), self.input_choice_output * 2) + self.assertEqual(self.strerr.getvalue(), self.input_choice_output * 2) self.assertEqual(returned, 'n')
def test_input_list_choice(self): @@ -297,9 +255,9 @@ options = ('answer 1', 'answer 2', 'answer 3') rv = pywikibot.bot.input_list_choice('question', options, '2')
- self.assertEqual(newstdout.getvalue(), '') + self.assertEqual(self.strout.getvalue(), '') self.assertEqual( - newstderr.getvalue(), + self.strerr.getvalue(), ''.join('{}: {}\n'.format(num, items) for num, items in enumerate(options, start=1)) + 'question (default: 2): ') @@ -315,17 +273,17 @@
def testOutputColorizedText(self): pywikibot.output(self.str1) - self.assertEqual(newstdout.getvalue(), '') + self.assertEqual(self.strout.getvalue(), '') self.assertEqual( - newstderr.getvalue(), + self.strerr.getvalue(), 'text \x1b[95mlight purple text\x1b[0m text\n')
def testOutputNoncolorizedText(self): pywikibot.config.colorized_output = False pywikibot.output(self.str1) - self.assertEqual(newstdout.getvalue(), '') + self.assertEqual(self.strout.getvalue(), '') self.assertEqual( - newstderr.getvalue(), + self.strerr.getvalue(), 'text light purple text text ***\n')
str2 = ('normal text \03{lightpurple} light purple ' @@ -335,9 +293,9 @@ def testOutputColorCascade_incorrect(self): """Test incorrect behavior of testOutputColorCascade.""" pywikibot.output(self.str2) - self.assertEqual(newstdout.getvalue(), '') + self.assertEqual(self.strout.getvalue(), '') self.assertEqual( - newstderr.getvalue(), + self.strerr.getvalue(), 'normal text \x1b[95m light purple ' '\x1b[94m light blue \x1b[95m light purple ' '\x1b[0m normal text\n') @@ -350,18 +308,18 @@
def testOutputUnicodeText(self): pywikibot.output('Заглавная_страница') - self.assertEqual(newstdout.getvalue(), '') - self.assertEqual(newstderr.getvalue(), 'Заглавная_страница\n') + self.assertEqual(self.strout.getvalue(), '') + self.assertEqual(self.strerr.getvalue(), 'Заглавная_страница\n')
def testInputUnicodeText(self): - newstdin.write('Заглавная_страница\n') - newstdin.seek(0) + self.strin.write('Заглавная_страница\n') + self.strin.seek(0)
returned = pywikibot.input('Википедию? ')
- self.assertEqual(newstdout.getvalue(), '') + self.assertEqual(self.strout.getvalue(), '') self.assertEqual( - newstderr.getvalue(), 'Википедию? ') + self.strerr.getvalue(), 'Википедию? ')
self.assertIsInstance(returned, str) self.assertEqual(returned, 'Заглавная_страница') @@ -376,9 +334,9 @@ pywikibot.bot.ui.encoding = 'latin-1' pywikibot.config.transliterate = True pywikibot.output('abcd АБГД αβγδ あいうえお') - self.assertEqual(newstdout.getvalue(), '') + self.assertEqual(self.strout.getvalue(), '') self.assertEqual( - newstderr.getvalue(), + self.strerr.getvalue(), 'abcd \x1b[93mA\x1b[0m\x1b[93mB\x1b[0m\x1b[93mG\x1b[0m' '\x1b[93mD\x1b[0m \x1b[93ma\x1b[0m\x1b[93mb\x1b[0m\x1b[93mg' '\x1b[0m\x1b[93md\x1b[0m \x1b[93ma\x1b[0m\x1b[93mi\x1b[0m' @@ -399,8 +357,9 @@ def setUp(self): """Create dummy instances for the test and patch encounter_color.""" super().setUp() - self.stream = io.StringIO() self.ui_obj = self.ui_class() + # Write to sys.stdout stream, which we'll redirect to the stream below + self.redirect = io.StringIO() self._orig_encounter_color = self.ui_obj.encounter_color self.ui_obj.encounter_color = self._encounter_color self._index = 0 @@ -412,10 +371,6 @@ self.assertEqual(self._index, len(self._colors) if self.expect_color else 0)
- def _getvalue(self): - """Get the value of the stream.""" - return self.stream.getvalue() - def _encounter_color(self, color, target_stream): """Patched encounter_color method.""" raise AssertionError('This method should not be invoked') @@ -423,41 +378,51 @@ def test_no_color(self): """Test a string without any colors.""" self._colors = () - self.ui_obj._print('Hello world you!', self.stream) - self.assertEqual(self._getvalue(), 'Hello world you!') + with redirect_stdout(self.redirect) as f: + self.ui_obj._print('Hello world you!', self.ui_obj.stdout) + self.assertEqual(f.getvalue(), 'Hello world you!')
def test_one_color(self): """Test a string using one color.""" self._colors = (('red', 6), ('default', 10)) - self.ui_obj._print('Hello \03{red}world you!', self.stream) - self.assertEqual(self._getvalue(), self.expected) + with redirect_stdout(self.redirect) as f: + self.ui_obj._print('Hello \03{red}world you!', self.ui_obj.stdout) + self.assertEqual(f.getvalue(), self.expected)
def test_flat_color(self): """Test using colors with defaulting in between.""" self._colors = (('red', 6), ('default', 6), ('yellow', 3), ('default', 1)) - self.ui_obj._print('Hello \03{red}world \03{default}you\03{yellow}!', - self.stream) - self.assertEqual(self._getvalue(), self.expected) + with redirect_stdout(self.redirect) as f: + self.ui_obj._print( + 'Hello \03{red}world \03{default}you\03{yellow}!', + self.ui_obj.stdout) + self.assertEqual(f.getvalue(), self.expected)
def test_stack_with_pop_color(self): """Test using stacked colors and just popping the latest color.""" self._colors = (('red', 6), ('yellow', 6), ('red', 3), ('default', 1)) - self.ui_obj._print('Hello \03{red}world \03{yellow}you\03{previous}!', - self.stream) - self.assertEqual(self._getvalue(), self.expected) + with redirect_stdout(self.redirect) as f: + self.ui_obj._print( + 'Hello \03{red}world \03{yellow}you\03{previous}!', + self.ui_obj.stdout) + self.assertEqual(f.getvalue(), self.expected)
def test_stack_implicit_color(self): """Test using stacked colors without popping any.""" self._colors = (('red', 6), ('yellow', 6), ('default', 4)) - self.ui_obj._print('Hello \03{red}world \03{yellow}you!', self.stream) - self.assertEqual(self._getvalue(), self.expected) + with redirect_stdout(self.redirect) as f: + self.ui_obj._print('Hello \03{red}world \03{yellow}you!', + self.ui_obj.stdout) + self.assertEqual(f.getvalue(), self.expected)
def test_one_color_newline(self): """Test with trailing new line and one color.""" self._colors = (('red', 6), ('default', 11)) - self.ui_obj._print('Hello \03{red}world you!\n', self.stream) - self.assertEqual(self._getvalue(), self.expected + '\n') + with redirect_stdout(self.redirect) as f: + self.ui_obj._print('Hello \03{red}world you!\n', + self.ui_obj.stdout) + self.assertEqual(f.getvalue(), self.expected + '\n')
class FakeUIColorizedTestBase(TestCase): @@ -489,11 +454,11 @@
def _encounter_color(self, color, target_stream): """Verify that the written data, color and stream are correct.""" - self.assertIs(target_stream, self.stream) + self.assertIs(target_stream, self.ui_obj.stdout) expected_color = self._colors[self._index][0] self._index += 1 self.assertEqual(color, expected_color) - self.assertLength(self.stream.getvalue(), + self.assertLength(self.redirect.getvalue(), sum(e[1] for e in self._colors[:self._index]))
@@ -519,7 +484,7 @@ ctypes = FakeModule.create_dotted('ctypes.windll.kernel32') ctypes.windll.kernel32.SetConsoleTextAttribute = self._handle_setattr terminal_interface_win32.ctypes = ctypes - self.stream._hConsole = object() + self.ui_obj.stdout._hConsole = object()
def tearDown(self): """Unpatch the ctypes import and check that all colors were used.""" @@ -532,12 +497,12 @@
def _handle_setattr(self, handle, attribute): """Dummy method to handle SetConsoleTextAttribute.""" - self.assertIs(handle, self.stream._hConsole) + self.assertIs(handle, self.ui_obj.stdout._hConsole) color = self._colors[self._index][0] self._index += 1 color = terminal_interface_win32.windowsColors[color] self.assertEqual(attribute, color) - self.assertLength(self.stream.getvalue(), + self.assertLength(self.redirect.getvalue(), sum(e[1] for e in self._colors[:self._index]))
@@ -553,7 +518,7 @@ def setUp(self): """Change the local stream's console to None to disable colors.""" super().setUp() - self.stream._hConsole = None + self.ui_obj.stdout._hConsole = None
if __name__ == '__main__': # pragma: no cover
pywikibot-commits@lists.wikimedia.org