jenkins-bot has submitted this change. ( https://gerrit.wikimedia.org/r/c/pywikibot/core/+/769911 )
Change subject: Restore "logging: Fix redirection for terminal interfaces" ......................................................................
Restore "logging: Fix redirection for terminal interfaces"
This reverts commit 55ae880d0980e774a6c8e6a8383c551d4d8769c9.
The redirection context managers contextlib.redirect_stderr and contextlib.redirect_stdout redirect output from the respective streams by temporarily rebinding the global variables sys.stderr and sys.stdout. 'Caching' these variables as instance attributes in the UI means that such rebinding is not tracked, leading to unexpected redirect behavior.
One solution might be to explicitly refer to sys.stderr and sys.stdout in the UI code, but handlers initialized with these in the UI will still face the same 'caching' issue.
The solution here is to simply resolve the target stream to (potentially redirected) sys.stderr or sys.stdout at write time. This obviously allows writing to monkey-patched sys streams, which is hopefully not an issue.
Bug: T283808 Change-Id: Ibfd4916525367b6698a05088e6e679da9e0aaea2 --- M pywikibot/userinterfaces/terminal_interface_base.py M tests/ui_tests.py 2 files changed, 146 insertions(+), 145 deletions(-)
Approvals: Xqt: Looks good to me, approved jenkins-bot: Verified
diff --git a/pywikibot/userinterfaces/terminal_interface_base.py b/pywikibot/userinterfaces/terminal_interface_base.py index b855863..6b3c64b 100755 --- a/pywikibot/userinterfaces/terminal_interface_base.py +++ b/pywikibot/userinterfaces/terminal_interface_base.py @@ -71,10 +71,15 @@
This caches the std-streams locally so any attempts to monkey-patch the streams later will not work. + + .. versionchanged:: 7.1 + memorize original streams """ - self.stdin = sys.stdin - self.stdout = sys.stdout - self.stderr = sys.stderr + # for Windows GUI they can be None under some conditions + self.stdin = sys.__stdin__ or sys.stdin + self.stdout = sys.__stdout__ or sys.stdout + self.stderr = sys.__stderr__ or sys.stderr + self.argv = sys.argv self.encoding = config.console_encoding self.transliteration_target = config.transliteration_target @@ -145,9 +150,37 @@ """ return cls.split_col_pat.search(color).groups()
- def _write(self, text, target_stream) -> None: - """Optionally encode and write the text to the target stream.""" - target_stream.write(text) + def _write(self, text: str, target_stream) -> None: + """Write the text to the target stream. + + sys.stderr and sys.stdout are frozen upon initialization to + original streams (which are stored in the sys module as + `sys.__stderr__` and `sys.__stdout__`). This works fine except + when using `redirect_stderr` or `redirect_stdout` context + managers, where values of global `sys.stderr` and `sys.stdout` + are temporarily changed to a redirecting `StringIO` stream, in + which case writing to the frozen streams (which are still set to + `sys.__stderr__` / `sys.__stdout__`) will not write to the + redirecting stream as expected. So, we check the target stream + against the frozen streams, and then write to the (potentially + redirected) `sys.stderr` or `sys.stdout` stream. + + .. versionchanged:: 7.1 + instead of writing to `target_stream`, dispatch to + `sys.stderr` or `sys.stdout`. + """ + if target_stream == self.stderr: + sys.stderr.write(text) + elif target_stream == self.stdout: + sys.stdout.write(text) + else: + try: + out, err = self.stdout.name, self.stderr.name + except AttributeError: + out, err = self.stdout, self.stderr + raise OSError( + 'Target stream {} is neither stdin ({}) nor stderr ({})' + .format(target_stream.name, out, err))
def support_color(self, target_stream) -> bool: """Return whether the target stream does support colors.""" diff --git a/tests/ui_tests.py b/tests/ui_tests.py index c9d8109..9f443fd 100644 --- a/tests/ui_tests.py +++ b/tests/ui_tests.py @@ -8,9 +8,9 @@ import io import logging import os -import sys import unittest -from contextlib import suppress +from contextlib import redirect_stdout, suppress +from unittest.mock import patch
import pywikibot from pywikibot.bot import ( @@ -33,42 +33,6 @@ from tests.aspects import TestCase, TestCaseBase
-class Stream: - - """Handler for a StringIO instance able to patch itself.""" - - def __init__(self, name: str, patched_streams: dict): - """ - Create a new stream with a StringIO instance. - - :param name: The part after 'std' (e.g. 'err'). - :param patched_streams: A mapping which maps the original stream to - the patched stream. - """ - self._stream = io.StringIO() - self._name = 'std{}'.format(name) - self._original = getattr(sys, self._name) - patched_streams[self._original] = self._stream - - def __repr__(self): - return '<patched {} {!r} wrapping {!r}>'.format( - self._name, self._stream, self._original) - - def reset(self): - """Reset own stream.""" - self._stream.truncate(0) - self._stream.seek(0) - - -patched_streams = {} -strout = Stream('out', patched_streams) -strerr = Stream('err', patched_streams) -strin = Stream('in', {}) - -newstdout = strout._stream -newstderr = strerr._stream -newstdin = strin._stream - logger = logging.getLogger('pywiki') loggingcontext = {'caller_name': 'ui_tests', 'caller_file': 'ui_tests', @@ -83,45 +47,42 @@ net = False
def setUp(self): + """Setup test. + + Here we patch standard input, output, and errors, essentially + redirecting to `StringIO` streams. + """ super().setUp() + self.stdout_patcher = patch('sys.stdout', new_callable=io.StringIO) + self.strout = self.stdout_patcher.start() + self.stderr_patcher = patch('sys.stderr', new_callable=io.StringIO) + self.strerr = self.stderr_patcher.start() + self.stdin_patcher = patch('sys.stdin', new_callable=io.StringIO) + self.strin = self.stdin_patcher.start()
pywikibot.bot.set_interface('terminal')
- self.org_print = pywikibot.bot.ui._print self.org_input = pywikibot.bot.ui._raw_input - - pywikibot.bot.ui._print = self._patched_print pywikibot.bot.ui._raw_input = self._patched_input
- strout.reset() - strerr.reset() - strin.reset() - pywikibot.config.colorized_output = True pywikibot.config.transliterate = False pywikibot.ui.transliteration_target = None pywikibot.ui.encoding = 'utf-8'
def tearDown(self): + """Cleanup test.""" super().tearDown()
- pywikibot.bot.ui._print = self.org_print - pywikibot.bot.ui._raw_input = self.org_input + self.stdout_patcher.stop() + self.stderr_patcher.stop() + self.stdin_patcher.stop()
+ pywikibot.bot.ui._raw_input = self.org_input pywikibot.bot.set_interface('buffer')
- def _patched_print(self, text, target_stream): - try: - stream = patched_streams[target_stream] - except KeyError: - expected = pywikibot.userinterfaces.win32_unicode.UnicodeOutput - assert isinstance(target_stream, expected) - assert target_stream._stream - stream = patched_streams[target_stream._stream] - self.org_print(text, stream) - def _patched_input(self): - return strin._stream.readline().strip() + return self.strin.readline().strip()
class TestExceptionError(Exception): @@ -149,56 +110,56 @@ for text, level, out, err in self.tests: with self.subTest(test=text): logger.log(level, text, extra=loggingcontext) - self.assertEqual(newstdout.getvalue(), out) - self.assertEqual(newstderr.getvalue(), err) + self.assertEqual(self.strout.getvalue(), out) + self.assertEqual(self.strerr.getvalue(), err)
# reset terminal files - strout.reset() - strerr.reset() - strin.reset() + for stream in [self.strout, self.strerr, self.strin]: + stream.truncate(0) + stream.seek(0)
def test_output(self): pywikibot.output('output') - self.assertEqual(newstdout.getvalue(), '') - self.assertEqual(newstderr.getvalue(), 'output\n') + self.assertEqual(self.strout.getvalue(), '') + self.assertEqual(self.strerr.getvalue(), 'output\n')
def test_stdout(self): pywikibot.stdout('output') - self.assertEqual(newstdout.getvalue(), 'output\n') - self.assertEqual(newstderr.getvalue(), '') + self.assertEqual(self.strout.getvalue(), 'output\n') + self.assertEqual(self.strerr.getvalue(), '')
def test_warning(self): pywikibot.warning('warning') - self.assertEqual(newstdout.getvalue(), '') - self.assertEqual(newstderr.getvalue(), 'WARNING: warning\n') + self.assertEqual(self.strout.getvalue(), '') + self.assertEqual(self.strerr.getvalue(), 'WARNING: warning\n')
def test_error(self): pywikibot.error('error') - self.assertEqual(newstdout.getvalue(), '') - self.assertEqual(newstderr.getvalue(), 'ERROR: error\n') + self.assertEqual(self.strout.getvalue(), '') + self.assertEqual(self.strerr.getvalue(), 'ERROR: error\n')
def test_log(self): pywikibot.log('log') - self.assertEqual(newstdout.getvalue(), '') - self.assertEqual(newstderr.getvalue(), '') + self.assertEqual(self.strout.getvalue(), '') + self.assertEqual(self.strerr.getvalue(), '')
def test_critical(self): pywikibot.critical('critical') - self.assertEqual(newstdout.getvalue(), '') - self.assertEqual(newstderr.getvalue(), 'CRITICAL: critical\n') + self.assertEqual(self.strout.getvalue(), '') + self.assertEqual(self.strerr.getvalue(), 'CRITICAL: critical\n')
def test_debug(self): pywikibot.debug('debug', 'test') - self.assertEqual(newstdout.getvalue(), '') - self.assertEqual(newstderr.getvalue(), '') + self.assertEqual(self.strout.getvalue(), '') + self.assertEqual(self.strerr.getvalue(), '')
def test_exception(self): try: raise TestExceptionError('Testing Exception') except TestExceptionError: pywikibot.exception('exception') - self.assertEqual(newstdout.getvalue(), '') - self.assertEqual(newstderr.getvalue(), + self.assertEqual(self.strout.getvalue(), '') + self.assertEqual(self.strerr.getvalue(), 'ERROR: TestExceptionError: Testing Exception\n')
def test_exception_tb(self): @@ -206,8 +167,8 @@ raise TestExceptionError('Testing Exception') except TestExceptionError: pywikibot.exception('exception', tb=True) - self.assertEqual(newstdout.getvalue(), '') - stderrlines = newstderr.getvalue().split('\n') + self.assertEqual(self.strout.getvalue(), '') + stderrlines = self.strerr.getvalue().split('\n') self.assertEqual(stderrlines[0], 'ERROR: TestExceptionError: Testing Exception') self.assertEqual(stderrlines[1], 'Traceback (most recent call last):') @@ -230,22 +191,22 @@ input_choice_output = 'question ([A]nswer 1, a[n]swer 2, an[s]wer 3): '
def testInput(self): - newstdin.write('input to read\n') - newstdin.seek(0) + self.strin.write('input to read\n') + self.strin.seek(0) returned = pywikibot.input('question')
- self.assertEqual(newstdout.getvalue(), '') - self.assertEqual(newstderr.getvalue(), 'question: ') + self.assertEqual(self.strout.getvalue(), '') + self.assertEqual(self.strerr.getvalue(), 'question: ') self.assertIsInstance(returned, str) self.assertEqual(returned, 'input to read')
def test_input_yn(self): - newstdin.write('\n') - newstdin.seek(0) + self.strin.write('\n') + self.strin.seek(0) returned = pywikibot.input_yn('question', False, automatic_quit=False)
- self.assertEqual(newstdout.getvalue(), '') - self.assertEqual(newstderr.getvalue(), 'question ([y]es, [N]o): ') + self.assertEqual(self.strout.getvalue(), '') + self.assertEqual(self.strerr.getvalue(), 'question ([y]es, [N]o): ') self.assertFalse(returned)
def _call_input_choice(self): @@ -257,39 +218,39 @@ 'A', automatic_quit=False)
- self.assertEqual(newstdout.getvalue(), '') + self.assertEqual(self.strout.getvalue(), '') self.assertIsInstance(rv, str) return rv
def testInputChoiceDefault(self): - newstdin.write('\n') - newstdin.seek(0) + self.strin.write('\n') + self.strin.seek(0) returned = self._call_input_choice()
self.assertEqual(returned, 'a')
def testInputChoiceCapital(self): - newstdin.write('N\n') - newstdin.seek(0) + self.strin.write('N\n') + self.strin.seek(0) returned = self._call_input_choice()
- self.assertEqual(newstderr.getvalue(), self.input_choice_output) + self.assertEqual(self.strerr.getvalue(), self.input_choice_output) self.assertEqual(returned, 'n')
def testInputChoiceNonCapital(self): - newstdin.write('n\n') - newstdin.seek(0) + self.strin.write('n\n') + self.strin.seek(0) returned = self._call_input_choice()
- self.assertEqual(newstderr.getvalue(), self.input_choice_output) + self.assertEqual(self.strerr.getvalue(), self.input_choice_output) self.assertEqual(returned, 'n')
def testInputChoiceIncorrectAnswer(self): - newstdin.write('X\nN\n') - newstdin.seek(0) + self.strin.write('X\nN\n') + self.strin.seek(0) returned = self._call_input_choice()
- self.assertEqual(newstderr.getvalue(), self.input_choice_output * 2) + self.assertEqual(self.strerr.getvalue(), self.input_choice_output * 2) self.assertEqual(returned, 'n')
def test_input_list_choice(self): @@ -297,9 +258,9 @@ options = ('answer 1', 'answer 2', 'answer 3') rv = pywikibot.bot.input_list_choice('question', options, '2')
- self.assertEqual(newstdout.getvalue(), '') + self.assertEqual(self.strout.getvalue(), '') self.assertEqual( - newstderr.getvalue(), + self.strerr.getvalue(), ''.join('{}: {}\n'.format(num, items) for num, items in enumerate(options, start=1)) + 'question (default: 2): ') @@ -315,17 +276,17 @@
def testOutputColorizedText(self): pywikibot.output(self.str1) - self.assertEqual(newstdout.getvalue(), '') + self.assertEqual(self.strout.getvalue(), '') self.assertEqual( - newstderr.getvalue(), + self.strerr.getvalue(), 'text \x1b[95mlight purple text\x1b[0m text\n')
def testOutputNoncolorizedText(self): pywikibot.config.colorized_output = False pywikibot.output(self.str1) - self.assertEqual(newstdout.getvalue(), '') + self.assertEqual(self.strout.getvalue(), '') self.assertEqual( - newstderr.getvalue(), + self.strerr.getvalue(), 'text light purple text text ***\n')
str2 = ('normal text \03{lightpurple} light purple ' @@ -335,9 +296,9 @@ def testOutputColorCascade_incorrect(self): """Test incorrect behavior of testOutputColorCascade.""" pywikibot.output(self.str2) - self.assertEqual(newstdout.getvalue(), '') + self.assertEqual(self.strout.getvalue(), '') self.assertEqual( - newstderr.getvalue(), + self.strerr.getvalue(), 'normal text \x1b[95m light purple ' '\x1b[94m light blue \x1b[95m light purple ' '\x1b[0m normal text\n') @@ -350,18 +311,18 @@
def testOutputUnicodeText(self): pywikibot.output('Заглавная_страница') - self.assertEqual(newstdout.getvalue(), '') - self.assertEqual(newstderr.getvalue(), 'Заглавная_страница\n') + self.assertEqual(self.strout.getvalue(), '') + self.assertEqual(self.strerr.getvalue(), 'Заглавная_страница\n')
def testInputUnicodeText(self): - newstdin.write('Заглавная_страница\n') - newstdin.seek(0) + self.strin.write('Заглавная_страница\n') + self.strin.seek(0)
returned = pywikibot.input('Википедию? ')
- self.assertEqual(newstdout.getvalue(), '') + self.assertEqual(self.strout.getvalue(), '') self.assertEqual( - newstderr.getvalue(), 'Википедию? ') + self.strerr.getvalue(), 'Википедию? ')
self.assertIsInstance(returned, str) self.assertEqual(returned, 'Заглавная_страница') @@ -376,9 +337,9 @@ pywikibot.bot.ui.encoding = 'latin-1' pywikibot.config.transliterate = True pywikibot.output('abcd АБГД αβγδ あいうえお') - self.assertEqual(newstdout.getvalue(), '') + self.assertEqual(self.strout.getvalue(), '') self.assertEqual( - newstderr.getvalue(), + self.strerr.getvalue(), 'abcd \x1b[93mA\x1b[0m\x1b[93mB\x1b[0m\x1b[93mG\x1b[0m' '\x1b[93mD\x1b[0m \x1b[93ma\x1b[0m\x1b[93mb\x1b[0m\x1b[93mg' '\x1b[0m\x1b[93md\x1b[0m \x1b[93ma\x1b[0m\x1b[93mi\x1b[0m' @@ -421,8 +382,9 @@ def setUp(self): """Create dummy instances for the test and patch encounter_color.""" super().setUp() - self.stream = io.StringIO() self.ui_obj = self.ui_class() + # Write to sys.stdout stream, which we'll redirect to the stream below + self.redirect = io.StringIO() self._orig_encounter_color = self.ui_obj.encounter_color self.ui_obj.encounter_color = self._encounter_color self._index = 0 @@ -434,10 +396,6 @@ self.assertEqual(self._index, len(self._colors) if self.expect_color else 0)
- def _getvalue(self): - """Get the value of the stream.""" - return self.stream.getvalue() - def _encounter_color(self, color, target_stream): """Patched encounter_color method.""" raise AssertionError('This method should not be invoked') @@ -445,41 +403,51 @@ def test_no_color(self): """Test a string without any colors.""" self._colors = () - self.ui_obj._print('Hello world you!', self.stream) - self.assertEqual(self._getvalue(), 'Hello world you!') + with redirect_stdout(self.redirect) as f: + self.ui_obj._print('Hello world you!', self.ui_obj.stdout) + self.assertEqual(f.getvalue(), 'Hello world you!')
def test_one_color(self): """Test a string using one color.""" self._colors = (('red', 6), ('default', 10)) - self.ui_obj._print('Hello \03{red}world you!', self.stream) - self.assertEqual(self._getvalue(), self.expected) + with redirect_stdout(self.redirect) as f: + self.ui_obj._print('Hello \03{red}world you!', self.ui_obj.stdout) + self.assertEqual(f.getvalue(), self.expected)
def test_flat_color(self): """Test using colors with defaulting in between.""" self._colors = (('red', 6), ('default', 6), ('yellow', 3), ('default', 1)) - self.ui_obj._print('Hello \03{red}world \03{default}you\03{yellow}!', - self.stream) - self.assertEqual(self._getvalue(), self.expected) + with redirect_stdout(self.redirect) as f: + self.ui_obj._print( + 'Hello \03{red}world \03{default}you\03{yellow}!', + self.ui_obj.stdout) + self.assertEqual(f.getvalue(), self.expected)
def test_stack_with_pop_color(self): """Test using stacked colors and just popping the latest color.""" self._colors = (('red', 6), ('yellow', 6), ('red', 3), ('default', 1)) - self.ui_obj._print('Hello \03{red}world \03{yellow}you\03{previous}!', - self.stream) - self.assertEqual(self._getvalue(), self.expected) + with redirect_stdout(self.redirect) as f: + self.ui_obj._print( + 'Hello \03{red}world \03{yellow}you\03{previous}!', + self.ui_obj.stdout) + self.assertEqual(f.getvalue(), self.expected)
def test_stack_implicit_color(self): """Test using stacked colors without popping any.""" self._colors = (('red', 6), ('yellow', 6), ('default', 4)) - self.ui_obj._print('Hello \03{red}world \03{yellow}you!', self.stream) - self.assertEqual(self._getvalue(), self.expected) + with redirect_stdout(self.redirect) as f: + self.ui_obj._print('Hello \03{red}world \03{yellow}you!', + self.ui_obj.stdout) + self.assertEqual(f.getvalue(), self.expected)
def test_one_color_newline(self): """Test with trailing new line and one color.""" self._colors = (('red', 6), ('default', 11)) - self.ui_obj._print('Hello \03{red}world you!\n', self.stream) - self.assertEqual(self._getvalue(), self.expected + '\n') + with redirect_stdout(self.redirect) as f: + self.ui_obj._print('Hello \03{red}world you!\n', + self.ui_obj.stdout) + self.assertEqual(f.getvalue(), self.expected + '\n')
class FakeUIColorizedTestBase(TestCase): @@ -504,11 +472,11 @@
def _encounter_color(self, color, target_stream): """Verify that the written data, color and stream are correct.""" - self.assertIs(target_stream, self.stream) + self.assertIs(target_stream, self.ui_obj.stdout) expected_color = self._colors[self._index][0] self._index += 1 self.assertEqual(color, expected_color) - self.assertLength(self.stream.getvalue(), + self.assertLength(self.redirect.getvalue(), sum(e[1] for e in self._colors[:self._index]))
@@ -534,7 +502,7 @@ def setUp(self): """Patch the ctypes import and initialize a stream and UI instance.""" super().setUp() - self.stream.isatty = lambda: self.expect_color + self.ui_obj.stdout.isatty = lambda: self.expect_color
class FakeWin32UncolorizedTest(FakeWin32Test):
pywikibot-commits@lists.wikimedia.org