jenkins-bot submitted this change.

View Change

Approvals: Xqt: Looks good to me, approved jenkins-bot: Verified
logging: Fix redirection for terminal interfaces

Redirection context managers contextlib.stderr and contextlib.stdout
redirect from respective streams by temporarily changing the values of
global variables sys.stderr or sys.stdout. 'Caching' these variables as
instance attributes in UI means that these variable changes are not
tracked, leading to unexpected redirect behavior.

One solution might be to explicitly refer to sys.stderr and sys.stdout
in the UI code, but handlers initialized with these in the UI will still
face the same 'caching' issue.

The solution here is to simply resolve the target stream to (potentially
redirected) sys.stderr or sys.stdout at write time. This obviously
allows writing to monkey-patched sys streams, which is hopefully not an
issue.

Bug: T283808
Change-Id: Ia11ae4dfcc87216ad9f5f97aacb1478a44142a66
---
M pywikibot/userinterfaces/terminal_interface_base.py
M pywikibot/userinterfaces/terminal_interface_unix.py
M tests/ui_tests.py
3 files changed, 127 insertions(+), 150 deletions(-)

diff --git a/pywikibot/userinterfaces/terminal_interface_base.py b/pywikibot/userinterfaces/terminal_interface_base.py
index c288bdc..1d87b67 100755
--- a/pywikibot/userinterfaces/terminal_interface_base.py
+++ b/pywikibot/userinterfaces/terminal_interface_base.py
@@ -71,9 +71,9 @@
This caches the std-streams locally so any attempts to
monkey-patch the streams later will not work.
"""
- self.stdin = sys.stdin
- self.stdout = sys.stdout
- self.stderr = sys.stderr
+ self.stdin = sys.__stdin__
+ self.stdout = sys.__stdout__
+ self.stderr = sys.__stderr__
self.argv = sys.argv
self.encoding = config.console_encoding
self.transliteration_target = config.transliteration_target
@@ -141,8 +141,24 @@
return cls.split_col_pat.search(color).groups()

def _write(self, text, target_stream):
- """Optionally encode and write the text to the target stream."""
- target_stream.write(text)
+ """
+ Optionally encode and write the text to the target stream.
+
+ sys.stderr and sys.stdout are frozen upon initialization to original
+ streams (which are stored in the sys module as sys.__stderr__ and
+ sys.__stdout__). This works fine except when using redirect_stderr or
+ redirect_stdout context managers, where values of global sys.stderr and
+ sys.stdout are temporarily changed to a redirecting StringIO stream,
+ in which case writing to the frozen streams (which are still set to
+ sys.__stderr__ / sys.__stdout__) will not write to the redirecting
+ stream as expected. So, we check the target stream against the frozen
+ streams, and then write to the (potentially redirected) sys.stderr or
+ sys.stdout stream.
+ """
+ if target_stream == self.stderr:
+ sys.stderr.write(text)
+ elif target_stream == self.stdout:
+ sys.stdout.write(text)

def support_color(self, target_stream):
"""Return whether the target stream does support colors."""
diff --git a/pywikibot/userinterfaces/terminal_interface_unix.py b/pywikibot/userinterfaces/terminal_interface_unix.py
index 8d82f32..eaba92a 100755
--- a/pywikibot/userinterfaces/terminal_interface_unix.py
+++ b/pywikibot/userinterfaces/terminal_interface_unix.py
@@ -51,7 +51,3 @@
if bg is not None:
bg = unixColors[bg]
self._write(self.make_unix_bg_color(bg), target_stream)
-
- def _write(self, text, target_stream):
- """Optionally encode and write the text to the target stream."""
- target_stream.write(text)
diff --git a/tests/ui_tests.py b/tests/ui_tests.py
index e9af702..7eaa8b3 100644
--- a/tests/ui_tests.py
+++ b/tests/ui_tests.py
@@ -7,10 +7,9 @@
import io
import logging
import os
-import sys
import unittest
-
-from contextlib import suppress
+from contextlib import redirect_stdout, suppress
+from unittest.mock import patch

import pywikibot
from pywikibot.bot import (
@@ -33,42 +32,6 @@
from tests.utils import FakeModule


-class Stream:
-
- """Handler for a StringIO instance able to patch itself."""
-
- def __init__(self, name: str, patched_streams: dict):
- """
- Create a new stream with a StringIO instance.
-
- :param name: The part after 'std' (e.g. 'err').
- :param patched_streams: A mapping which maps the original stream to
- the patched stream.
- """
- self._stream = io.StringIO()
- self._name = 'std{}'.format(name)
- self._original = getattr(sys, self._name)
- patched_streams[self._original] = self._stream
-
- def __repr__(self):
- return '<patched {} {!r} wrapping {!r}>'.format(
- self._name, self._stream, self._original)
-
- def reset(self):
- """Reset own stream."""
- self._stream.truncate(0)
- self._stream.seek(0)
-
-
-patched_streams = {}
-strout = Stream('out', patched_streams)
-strerr = Stream('err', patched_streams)
-strin = Stream('in', {})
-
-newstdout = strout._stream
-newstderr = strerr._stream
-newstdin = strin._stream
-
logger = logging.getLogger('pywiki')
loggingcontext = {'caller_name': 'ui_tests',
'caller_file': 'ui_tests',
@@ -84,19 +47,21 @@

def setUp(self):
super().setUp()
+ # Here we patch standard input, output, and errors, essentially
+ # redirecting to StringIO streams
+ self.stdout_patcher = patch('sys.stdout', new_callable=io.StringIO)
+ self.strout = self.stdout_patcher.start()
+ self.stderr_patcher = patch('sys.stderr', new_callable=io.StringIO)
+ self.strerr = self.stderr_patcher.start()
+ self.stdin_patcher = patch('sys.stdin', new_callable=io.StringIO)
+ self.strin = self.stdin_patcher.start()

pywikibot.bot.set_interface('terminal')

- self.org_print = pywikibot.bot.ui._print
self.org_input = pywikibot.bot.ui._raw_input

- pywikibot.bot.ui._print = self._patched_print
pywikibot.bot.ui._raw_input = self._patched_input

- strout.reset()
- strerr.reset()
- strin.reset()
-
pywikibot.config.colorized_output = True
pywikibot.config.transliterate = False
pywikibot.ui.transliteration_target = None
@@ -105,23 +70,16 @@
def tearDown(self):
super().tearDown()

- pywikibot.bot.ui._print = self.org_print
+ self.stdout_patcher.stop()
+ self.stderr_patcher.stop()
+ self.stdin_patcher.stop()
+
pywikibot.bot.ui._raw_input = self.org_input

pywikibot.bot.set_interface('buffer')

- def _patched_print(self, text, target_stream):
- try:
- stream = patched_streams[target_stream]
- except KeyError:
- expected = pywikibot.userinterfaces.win32_unicode.UnicodeOutput
- assert isinstance(target_stream, expected)
- assert target_stream._stream
- stream = patched_streams[target_stream._stream]
- self.org_print(text, stream)
-
def _patched_input(self):
- return strin._stream.readline().strip()
+ return self.strin.readline().strip()


class TestExceptionError(Exception):
@@ -149,56 +107,56 @@
for text, level, out, err in self.tests:
with self.subTest(test=text):
logger.log(level, text, extra=loggingcontext)
- self.assertEqual(newstdout.getvalue(), out)
- self.assertEqual(newstderr.getvalue(), err)
+ self.assertEqual(self.strout.getvalue(), out)
+ self.assertEqual(self.strerr.getvalue(), err)

# reset terminal files
- strout.reset()
- strerr.reset()
- strin.reset()
+ for stream in [self.strout, self.strerr, self.strin]:
+ stream.truncate(0)
+ stream.seek(0)

def test_output(self):
pywikibot.output('output')
- self.assertEqual(newstdout.getvalue(), '')
- self.assertEqual(newstderr.getvalue(), 'output\n')
+ self.assertEqual(self.strout.getvalue(), '')
+ self.assertEqual(self.strerr.getvalue(), 'output\n')

def test_stdout(self):
pywikibot.stdout('output')
- self.assertEqual(newstdout.getvalue(), 'output\n')
- self.assertEqual(newstderr.getvalue(), '')
+ self.assertEqual(self.strout.getvalue(), 'output\n')
+ self.assertEqual(self.strerr.getvalue(), '')

def test_warning(self):
pywikibot.warning('warning')
- self.assertEqual(newstdout.getvalue(), '')
- self.assertEqual(newstderr.getvalue(), 'WARNING: warning\n')
+ self.assertEqual(self.strout.getvalue(), '')
+ self.assertEqual(self.strerr.getvalue(), 'WARNING: warning\n')

def test_error(self):
pywikibot.error('error')
- self.assertEqual(newstdout.getvalue(), '')
- self.assertEqual(newstderr.getvalue(), 'ERROR: error\n')
+ self.assertEqual(self.strout.getvalue(), '')
+ self.assertEqual(self.strerr.getvalue(), 'ERROR: error\n')

def test_log(self):
pywikibot.log('log')
- self.assertEqual(newstdout.getvalue(), '')
- self.assertEqual(newstderr.getvalue(), '')
+ self.assertEqual(self.strout.getvalue(), '')
+ self.assertEqual(self.strerr.getvalue(), '')

def test_critical(self):
pywikibot.critical('critical')
- self.assertEqual(newstdout.getvalue(), '')
- self.assertEqual(newstderr.getvalue(), 'CRITICAL: critical\n')
+ self.assertEqual(self.strout.getvalue(), '')
+ self.assertEqual(self.strerr.getvalue(), 'CRITICAL: critical\n')

def test_debug(self):
pywikibot.debug('debug', 'test')
- self.assertEqual(newstdout.getvalue(), '')
- self.assertEqual(newstderr.getvalue(), '')
+ self.assertEqual(self.strout.getvalue(), '')
+ self.assertEqual(self.strerr.getvalue(), '')

def test_exception(self):
try:
raise TestExceptionError('Testing Exception')
except TestExceptionError:
pywikibot.exception('exception')
- self.assertEqual(newstdout.getvalue(), '')
- self.assertEqual(newstderr.getvalue(),
+ self.assertEqual(self.strout.getvalue(), '')
+ self.assertEqual(self.strerr.getvalue(),
'ERROR: TestExceptionError: Testing Exception\n')

def test_exception_tb(self):
@@ -206,8 +164,8 @@
raise TestExceptionError('Testing Exception')
except TestExceptionError:
pywikibot.exception('exception', tb=True)
- self.assertEqual(newstdout.getvalue(), '')
- stderrlines = newstderr.getvalue().split('\n')
+ self.assertEqual(self.strout.getvalue(), '')
+ stderrlines = self.strerr.getvalue().split('\n')
self.assertEqual(stderrlines[0],
'ERROR: TestExceptionError: Testing Exception')
self.assertEqual(stderrlines[1], 'Traceback (most recent call last):')
@@ -230,22 +188,22 @@
input_choice_output = 'question ([A]nswer 1, a[n]swer 2, an[s]wer 3): '

def testInput(self):
- newstdin.write('input to read\n')
- newstdin.seek(0)
+ self.strin.write('input to read\n')
+ self.strin.seek(0)
returned = pywikibot.input('question')

- self.assertEqual(newstdout.getvalue(), '')
- self.assertEqual(newstderr.getvalue(), 'question: ')
+ self.assertEqual(self.strout.getvalue(), '')
+ self.assertEqual(self.strerr.getvalue(), 'question: ')
self.assertIsInstance(returned, str)
self.assertEqual(returned, 'input to read')

def test_input_yn(self):
- newstdin.write('\n')
- newstdin.seek(0)
+ self.strin.write('\n')
+ self.strin.seek(0)
returned = pywikibot.input_yn('question', False, automatic_quit=False)

- self.assertEqual(newstdout.getvalue(), '')
- self.assertEqual(newstderr.getvalue(), 'question ([y]es, [N]o): ')
+ self.assertEqual(self.strout.getvalue(), '')
+ self.assertEqual(self.strerr.getvalue(), 'question ([y]es, [N]o): ')
self.assertFalse(returned)

def _call_input_choice(self):
@@ -257,39 +215,39 @@
'A',
automatic_quit=False)

- self.assertEqual(newstdout.getvalue(), '')
+ self.assertEqual(self.strout.getvalue(), '')
self.assertIsInstance(rv, str)
return rv

def testInputChoiceDefault(self):
- newstdin.write('\n')
- newstdin.seek(0)
+ self.strin.write('\n')
+ self.strin.seek(0)
returned = self._call_input_choice()

self.assertEqual(returned, 'a')

def testInputChoiceCapital(self):
- newstdin.write('N\n')
- newstdin.seek(0)
+ self.strin.write('N\n')
+ self.strin.seek(0)
returned = self._call_input_choice()

- self.assertEqual(newstderr.getvalue(), self.input_choice_output)
+ self.assertEqual(self.strerr.getvalue(), self.input_choice_output)
self.assertEqual(returned, 'n')

def testInputChoiceNonCapital(self):
- newstdin.write('n\n')
- newstdin.seek(0)
+ self.strin.write('n\n')
+ self.strin.seek(0)
returned = self._call_input_choice()

- self.assertEqual(newstderr.getvalue(), self.input_choice_output)
+ self.assertEqual(self.strerr.getvalue(), self.input_choice_output)
self.assertEqual(returned, 'n')

def testInputChoiceIncorrectAnswer(self):
- newstdin.write('X\nN\n')
- newstdin.seek(0)
+ self.strin.write('X\nN\n')
+ self.strin.seek(0)
returned = self._call_input_choice()

- self.assertEqual(newstderr.getvalue(), self.input_choice_output * 2)
+ self.assertEqual(self.strerr.getvalue(), self.input_choice_output * 2)
self.assertEqual(returned, 'n')

def test_input_list_choice(self):
@@ -297,9 +255,9 @@
options = ('answer 1', 'answer 2', 'answer 3')
rv = pywikibot.bot.input_list_choice('question', options, '2')

- self.assertEqual(newstdout.getvalue(), '')
+ self.assertEqual(self.strout.getvalue(), '')
self.assertEqual(
- newstderr.getvalue(),
+ self.strerr.getvalue(),
''.join('{}: {}\n'.format(num, items)
for num, items in enumerate(options, start=1))
+ 'question (default: 2): ')
@@ -315,17 +273,17 @@

def testOutputColorizedText(self):
pywikibot.output(self.str1)
- self.assertEqual(newstdout.getvalue(), '')
+ self.assertEqual(self.strout.getvalue(), '')
self.assertEqual(
- newstderr.getvalue(),
+ self.strerr.getvalue(),
'text \x1b[95mlight purple text\x1b[0m text\n')

def testOutputNoncolorizedText(self):
pywikibot.config.colorized_output = False
pywikibot.output(self.str1)
- self.assertEqual(newstdout.getvalue(), '')
+ self.assertEqual(self.strout.getvalue(), '')
self.assertEqual(
- newstderr.getvalue(),
+ self.strerr.getvalue(),
'text light purple text text ***\n')

str2 = ('normal text \03{lightpurple} light purple '
@@ -335,9 +293,9 @@
def testOutputColorCascade_incorrect(self):
"""Test incorrect behavior of testOutputColorCascade."""
pywikibot.output(self.str2)
- self.assertEqual(newstdout.getvalue(), '')
+ self.assertEqual(self.strout.getvalue(), '')
self.assertEqual(
- newstderr.getvalue(),
+ self.strerr.getvalue(),
'normal text \x1b[95m light purple '
'\x1b[94m light blue \x1b[95m light purple '
'\x1b[0m normal text\n')
@@ -350,18 +308,18 @@

def testOutputUnicodeText(self):
pywikibot.output('Заглавная_страница')
- self.assertEqual(newstdout.getvalue(), '')
- self.assertEqual(newstderr.getvalue(), 'Заглавная_страница\n')
+ self.assertEqual(self.strout.getvalue(), '')
+ self.assertEqual(self.strerr.getvalue(), 'Заглавная_страница\n')

def testInputUnicodeText(self):
- newstdin.write('Заглавная_страница\n')
- newstdin.seek(0)
+ self.strin.write('Заглавная_страница\n')
+ self.strin.seek(0)

returned = pywikibot.input('Википедию? ')

- self.assertEqual(newstdout.getvalue(), '')
+ self.assertEqual(self.strout.getvalue(), '')
self.assertEqual(
- newstderr.getvalue(), 'Википедию? ')
+ self.strerr.getvalue(), 'Википедию? ')

self.assertIsInstance(returned, str)
self.assertEqual(returned, 'Заглавная_страница')
@@ -376,9 +334,9 @@
pywikibot.bot.ui.encoding = 'latin-1'
pywikibot.config.transliterate = True
pywikibot.output('abcd АБГД αβγδ あいうえお')
- self.assertEqual(newstdout.getvalue(), '')
+ self.assertEqual(self.strout.getvalue(), '')
self.assertEqual(
- newstderr.getvalue(),
+ self.strerr.getvalue(),
'abcd \x1b[93mA\x1b[0m\x1b[93mB\x1b[0m\x1b[93mG\x1b[0m'
'\x1b[93mD\x1b[0m \x1b[93ma\x1b[0m\x1b[93mb\x1b[0m\x1b[93mg'
'\x1b[0m\x1b[93md\x1b[0m \x1b[93ma\x1b[0m\x1b[93mi\x1b[0m'
@@ -399,8 +357,9 @@
def setUp(self):
"""Create dummy instances for the test and patch encounter_color."""
super().setUp()
- self.stream = io.StringIO()
self.ui_obj = self.ui_class()
+ # Write to sys.stdout stream, which we'll redirect to the stream below
+ self.redirect = io.StringIO()
self._orig_encounter_color = self.ui_obj.encounter_color
self.ui_obj.encounter_color = self._encounter_color
self._index = 0
@@ -412,10 +371,6 @@
self.assertEqual(self._index,
len(self._colors) if self.expect_color else 0)

- def _getvalue(self):
- """Get the value of the stream."""
- return self.stream.getvalue()
-
def _encounter_color(self, color, target_stream):
"""Patched encounter_color method."""
raise AssertionError('This method should not be invoked')
@@ -423,41 +378,51 @@
def test_no_color(self):
"""Test a string without any colors."""
self._colors = ()
- self.ui_obj._print('Hello world you!', self.stream)
- self.assertEqual(self._getvalue(), 'Hello world you!')
+ with redirect_stdout(self.redirect) as f:
+ self.ui_obj._print('Hello world you!', self.ui_obj.stdout)
+ self.assertEqual(f.getvalue(), 'Hello world you!')

def test_one_color(self):
"""Test a string using one color."""
self._colors = (('red', 6), ('default', 10))
- self.ui_obj._print('Hello \03{red}world you!', self.stream)
- self.assertEqual(self._getvalue(), self.expected)
+ with redirect_stdout(self.redirect) as f:
+ self.ui_obj._print('Hello \03{red}world you!', self.ui_obj.stdout)
+ self.assertEqual(f.getvalue(), self.expected)

def test_flat_color(self):
"""Test using colors with defaulting in between."""
self._colors = (('red', 6), ('default', 6), ('yellow', 3),
('default', 1))
- self.ui_obj._print('Hello \03{red}world \03{default}you\03{yellow}!',
- self.stream)
- self.assertEqual(self._getvalue(), self.expected)
+ with redirect_stdout(self.redirect) as f:
+ self.ui_obj._print(
+ 'Hello \03{red}world \03{default}you\03{yellow}!',
+ self.ui_obj.stdout)
+ self.assertEqual(f.getvalue(), self.expected)

def test_stack_with_pop_color(self):
"""Test using stacked colors and just popping the latest color."""
self._colors = (('red', 6), ('yellow', 6), ('red', 3), ('default', 1))
- self.ui_obj._print('Hello \03{red}world \03{yellow}you\03{previous}!',
- self.stream)
- self.assertEqual(self._getvalue(), self.expected)
+ with redirect_stdout(self.redirect) as f:
+ self.ui_obj._print(
+ 'Hello \03{red}world \03{yellow}you\03{previous}!',
+ self.ui_obj.stdout)
+ self.assertEqual(f.getvalue(), self.expected)

def test_stack_implicit_color(self):
"""Test using stacked colors without popping any."""
self._colors = (('red', 6), ('yellow', 6), ('default', 4))
- self.ui_obj._print('Hello \03{red}world \03{yellow}you!', self.stream)
- self.assertEqual(self._getvalue(), self.expected)
+ with redirect_stdout(self.redirect) as f:
+ self.ui_obj._print('Hello \03{red}world \03{yellow}you!',
+ self.ui_obj.stdout)
+ self.assertEqual(f.getvalue(), self.expected)

def test_one_color_newline(self):
"""Test with trailing new line and one color."""
self._colors = (('red', 6), ('default', 11))
- self.ui_obj._print('Hello \03{red}world you!\n', self.stream)
- self.assertEqual(self._getvalue(), self.expected + '\n')
+ with redirect_stdout(self.redirect) as f:
+ self.ui_obj._print('Hello \03{red}world you!\n',
+ self.ui_obj.stdout)
+ self.assertEqual(f.getvalue(), self.expected + '\n')


class FakeUIColorizedTestBase(TestCase):
@@ -489,11 +454,11 @@

def _encounter_color(self, color, target_stream):
"""Verify that the written data, color and stream are correct."""
- self.assertIs(target_stream, self.stream)
+ self.assertIs(target_stream, self.ui_obj.stdout)
expected_color = self._colors[self._index][0]
self._index += 1
self.assertEqual(color, expected_color)
- self.assertLength(self.stream.getvalue(),
+ self.assertLength(self.redirect.getvalue(),
sum(e[1] for e in self._colors[:self._index]))


@@ -519,7 +484,7 @@
ctypes = FakeModule.create_dotted('ctypes.windll.kernel32')
ctypes.windll.kernel32.SetConsoleTextAttribute = self._handle_setattr
terminal_interface_win32.ctypes = ctypes
- self.stream._hConsole = object()
+ self.ui_obj.stdout._hConsole = object()

def tearDown(self):
"""Unpatch the ctypes import and check that all colors were used."""
@@ -532,12 +497,12 @@

def _handle_setattr(self, handle, attribute):
"""Dummy method to handle SetConsoleTextAttribute."""
- self.assertIs(handle, self.stream._hConsole)
+ self.assertIs(handle, self.ui_obj.stdout._hConsole)
color = self._colors[self._index][0]
self._index += 1
color = terminal_interface_win32.windowsColors[color]
self.assertEqual(attribute, color)
- self.assertLength(self.stream.getvalue(),
+ self.assertLength(self.redirect.getvalue(),
sum(e[1] for e in self._colors[:self._index]))


@@ -553,7 +518,7 @@
def setUp(self):
"""Change the local stream's console to None to disable colors."""
super().setUp()
- self.stream._hConsole = None
+ self.ui_obj.stdout._hConsole = None


if __name__ == '__main__': # pragma: no cover

To view, visit change 743725. To unsubscribe, or for help writing mail filters, visit settings.

Gerrit-Project: pywikibot/core
Gerrit-Branch: master
Gerrit-Change-Id: Ia11ae4dfcc87216ad9f5f97aacb1478a44142a66
Gerrit-Change-Number: 743725
Gerrit-PatchSet: 3
Gerrit-Owner: Chris Maynor <cmchrismaynor@gmail.com>
Gerrit-Reviewer: Merlijn van Deen <valhallasw@arctus.nl>
Gerrit-Reviewer: Xqt <info@gno.de>
Gerrit-Reviewer: jenkins-bot
Gerrit-MessageType: merged