jenkins-bot submitted this change.

View Change

Approvals: Xqt: Looks good to me, approved; jenkins-bot: Verified
Revert "logging: Fix redirection for terminal interfaces"

This reverts commit ba527de7dc13cb38a1fc904d92197a3c96ca4a54.

Reason for revert: See the comment on T283808: output is missing or delayed.

Bug: T283808
Change-Id: Ic5a82622444f47aad9ee17548fb9bf9c54489f42
---
M pywikibot/userinterfaces/terminal_interface_base.py
M pywikibot/userinterfaces/terminal_interface_unix.py
M tests/ui_tests.py
3 files changed, 150 insertions(+), 127 deletions(-)

diff --git a/pywikibot/userinterfaces/terminal_interface_base.py b/pywikibot/userinterfaces/terminal_interface_base.py
index 1d87b67..c288bdc 100755
--- a/pywikibot/userinterfaces/terminal_interface_base.py
+++ b/pywikibot/userinterfaces/terminal_interface_base.py
@@ -71,9 +71,9 @@
This caches the std-streams locally so any attempts to
monkey-patch the streams later will not work.
"""
- self.stdin = sys.__stdin__
- self.stdout = sys.__stdout__
- self.stderr = sys.__stderr__
+ self.stdin = sys.stdin
+ self.stdout = sys.stdout
+ self.stderr = sys.stderr
self.argv = sys.argv
self.encoding = config.console_encoding
self.transliteration_target = config.transliteration_target
@@ -141,24 +141,8 @@
return cls.split_col_pat.search(color).groups()

def _write(self, text, target_stream):
- """
- Optionally encode and write the text to the target stream.
-
- sys.stderr and sys.stdout are frozen upon initialization to original
- streams (which are stored in the sys module as sys.__stderr__ and
- sys.__stdout__). This works fine except when using redirect_stderr or
- redirect_stdout context managers, where values of global sys.stderr and
- sys.stdout are temporarily changed to a redirecting StringIO stream,
- in which case writing to the frozen streams (which are still set to
- sys.__stderr__ / sys.__stdout__) will not write to the redirecting
- stream as expected. So, we check the target stream against the frozen
- streams, and then write to the (potentially redirected) sys.stderr or
- sys.stdout stream.
- """
- if target_stream == self.stderr:
- sys.stderr.write(text)
- elif target_stream == self.stdout:
- sys.stdout.write(text)
+ """Optionally encode and write the text to the target stream."""
+ target_stream.write(text)

def support_color(self, target_stream):
"""Return whether the target stream does support colors."""
diff --git a/pywikibot/userinterfaces/terminal_interface_unix.py b/pywikibot/userinterfaces/terminal_interface_unix.py
index eaba92a..8d82f32 100755
--- a/pywikibot/userinterfaces/terminal_interface_unix.py
+++ b/pywikibot/userinterfaces/terminal_interface_unix.py
@@ -51,3 +51,7 @@
if bg is not None:
bg = unixColors[bg]
self._write(self.make_unix_bg_color(bg), target_stream)
+
+ def _write(self, text, target_stream):
+ """Optionally encode and write the text to the target stream."""
+ target_stream.write(text)
diff --git a/tests/ui_tests.py b/tests/ui_tests.py
index 7eaa8b3..e9af702 100644
--- a/tests/ui_tests.py
+++ b/tests/ui_tests.py
@@ -7,9 +7,10 @@
import io
import logging
import os
+import sys
import unittest
-from contextlib import redirect_stdout, suppress
-from unittest.mock import patch
+
+from contextlib import suppress

import pywikibot
from pywikibot.bot import (
@@ -32,6 +33,42 @@
from tests.utils import FakeModule


+class Stream:
+
+ """Handler for a StringIO instance able to patch itself."""
+
+ def __init__(self, name: str, patched_streams: dict):
+ """
+ Create a new stream with a StringIO instance.
+
+ :param name: The part after 'std' (e.g. 'err').
+ :param patched_streams: A mapping which maps the original stream to
+ the patched stream.
+ """
+ self._stream = io.StringIO()
+ self._name = 'std{}'.format(name)
+ self._original = getattr(sys, self._name)
+ patched_streams[self._original] = self._stream
+
+ def __repr__(self):
+ return '<patched {} {!r} wrapping {!r}>'.format(
+ self._name, self._stream, self._original)
+
+ def reset(self):
+ """Reset own stream."""
+ self._stream.truncate(0)
+ self._stream.seek(0)
+
+
+patched_streams = {}
+strout = Stream('out', patched_streams)
+strerr = Stream('err', patched_streams)
+strin = Stream('in', {})
+
+newstdout = strout._stream
+newstderr = strerr._stream
+newstdin = strin._stream
+
logger = logging.getLogger('pywiki')
loggingcontext = {'caller_name': 'ui_tests',
'caller_file': 'ui_tests',
@@ -47,21 +84,19 @@

def setUp(self):
super().setUp()
- # Here we patch standard input, output, and errors, essentially
- # redirecting to StringIO streams
- self.stdout_patcher = patch('sys.stdout', new_callable=io.StringIO)
- self.strout = self.stdout_patcher.start()
- self.stderr_patcher = patch('sys.stderr', new_callable=io.StringIO)
- self.strerr = self.stderr_patcher.start()
- self.stdin_patcher = patch('sys.stdin', new_callable=io.StringIO)
- self.strin = self.stdin_patcher.start()

pywikibot.bot.set_interface('terminal')

+ self.org_print = pywikibot.bot.ui._print
self.org_input = pywikibot.bot.ui._raw_input

+ pywikibot.bot.ui._print = self._patched_print
pywikibot.bot.ui._raw_input = self._patched_input

+ strout.reset()
+ strerr.reset()
+ strin.reset()
+
pywikibot.config.colorized_output = True
pywikibot.config.transliterate = False
pywikibot.ui.transliteration_target = None
@@ -70,16 +105,23 @@
def tearDown(self):
super().tearDown()

- self.stdout_patcher.stop()
- self.stderr_patcher.stop()
- self.stdin_patcher.stop()
-
+ pywikibot.bot.ui._print = self.org_print
pywikibot.bot.ui._raw_input = self.org_input

pywikibot.bot.set_interface('buffer')

+ def _patched_print(self, text, target_stream):
+ try:
+ stream = patched_streams[target_stream]
+ except KeyError:
+ expected = pywikibot.userinterfaces.win32_unicode.UnicodeOutput
+ assert isinstance(target_stream, expected)
+ assert target_stream._stream
+ stream = patched_streams[target_stream._stream]
+ self.org_print(text, stream)
+
def _patched_input(self):
- return self.strin.readline().strip()
+ return strin._stream.readline().strip()


class TestExceptionError(Exception):
@@ -107,56 +149,56 @@
for text, level, out, err in self.tests:
with self.subTest(test=text):
logger.log(level, text, extra=loggingcontext)
- self.assertEqual(self.strout.getvalue(), out)
- self.assertEqual(self.strerr.getvalue(), err)
+ self.assertEqual(newstdout.getvalue(), out)
+ self.assertEqual(newstderr.getvalue(), err)

# reset terminal files
- for stream in [self.strout, self.strerr, self.strin]:
- stream.truncate(0)
- stream.seek(0)
+ strout.reset()
+ strerr.reset()
+ strin.reset()

def test_output(self):
pywikibot.output('output')
- self.assertEqual(self.strout.getvalue(), '')
- self.assertEqual(self.strerr.getvalue(), 'output\n')
+ self.assertEqual(newstdout.getvalue(), '')
+ self.assertEqual(newstderr.getvalue(), 'output\n')

def test_stdout(self):
pywikibot.stdout('output')
- self.assertEqual(self.strout.getvalue(), 'output\n')
- self.assertEqual(self.strerr.getvalue(), '')
+ self.assertEqual(newstdout.getvalue(), 'output\n')
+ self.assertEqual(newstderr.getvalue(), '')

def test_warning(self):
pywikibot.warning('warning')
- self.assertEqual(self.strout.getvalue(), '')
- self.assertEqual(self.strerr.getvalue(), 'WARNING: warning\n')
+ self.assertEqual(newstdout.getvalue(), '')
+ self.assertEqual(newstderr.getvalue(), 'WARNING: warning\n')

def test_error(self):
pywikibot.error('error')
- self.assertEqual(self.strout.getvalue(), '')
- self.assertEqual(self.strerr.getvalue(), 'ERROR: error\n')
+ self.assertEqual(newstdout.getvalue(), '')
+ self.assertEqual(newstderr.getvalue(), 'ERROR: error\n')

def test_log(self):
pywikibot.log('log')
- self.assertEqual(self.strout.getvalue(), '')
- self.assertEqual(self.strerr.getvalue(), '')
+ self.assertEqual(newstdout.getvalue(), '')
+ self.assertEqual(newstderr.getvalue(), '')

def test_critical(self):
pywikibot.critical('critical')
- self.assertEqual(self.strout.getvalue(), '')
- self.assertEqual(self.strerr.getvalue(), 'CRITICAL: critical\n')
+ self.assertEqual(newstdout.getvalue(), '')
+ self.assertEqual(newstderr.getvalue(), 'CRITICAL: critical\n')

def test_debug(self):
pywikibot.debug('debug', 'test')
- self.assertEqual(self.strout.getvalue(), '')
- self.assertEqual(self.strerr.getvalue(), '')
+ self.assertEqual(newstdout.getvalue(), '')
+ self.assertEqual(newstderr.getvalue(), '')

def test_exception(self):
try:
raise TestExceptionError('Testing Exception')
except TestExceptionError:
pywikibot.exception('exception')
- self.assertEqual(self.strout.getvalue(), '')
- self.assertEqual(self.strerr.getvalue(),
+ self.assertEqual(newstdout.getvalue(), '')
+ self.assertEqual(newstderr.getvalue(),
'ERROR: TestExceptionError: Testing Exception\n')

def test_exception_tb(self):
@@ -164,8 +206,8 @@
raise TestExceptionError('Testing Exception')
except TestExceptionError:
pywikibot.exception('exception', tb=True)
- self.assertEqual(self.strout.getvalue(), '')
- stderrlines = self.strerr.getvalue().split('\n')
+ self.assertEqual(newstdout.getvalue(), '')
+ stderrlines = newstderr.getvalue().split('\n')
self.assertEqual(stderrlines[0],
'ERROR: TestExceptionError: Testing Exception')
self.assertEqual(stderrlines[1], 'Traceback (most recent call last):')
@@ -188,22 +230,22 @@
input_choice_output = 'question ([A]nswer 1, a[n]swer 2, an[s]wer 3): '

def testInput(self):
- self.strin.write('input to read\n')
- self.strin.seek(0)
+ newstdin.write('input to read\n')
+ newstdin.seek(0)
returned = pywikibot.input('question')

- self.assertEqual(self.strout.getvalue(), '')
- self.assertEqual(self.strerr.getvalue(), 'question: ')
+ self.assertEqual(newstdout.getvalue(), '')
+ self.assertEqual(newstderr.getvalue(), 'question: ')
self.assertIsInstance(returned, str)
self.assertEqual(returned, 'input to read')

def test_input_yn(self):
- self.strin.write('\n')
- self.strin.seek(0)
+ newstdin.write('\n')
+ newstdin.seek(0)
returned = pywikibot.input_yn('question', False, automatic_quit=False)

- self.assertEqual(self.strout.getvalue(), '')
- self.assertEqual(self.strerr.getvalue(), 'question ([y]es, [N]o): ')
+ self.assertEqual(newstdout.getvalue(), '')
+ self.assertEqual(newstderr.getvalue(), 'question ([y]es, [N]o): ')
self.assertFalse(returned)

def _call_input_choice(self):
@@ -215,39 +257,39 @@
'A',
automatic_quit=False)

- self.assertEqual(self.strout.getvalue(), '')
+ self.assertEqual(newstdout.getvalue(), '')
self.assertIsInstance(rv, str)
return rv

def testInputChoiceDefault(self):
- self.strin.write('\n')
- self.strin.seek(0)
+ newstdin.write('\n')
+ newstdin.seek(0)
returned = self._call_input_choice()

self.assertEqual(returned, 'a')

def testInputChoiceCapital(self):
- self.strin.write('N\n')
- self.strin.seek(0)
+ newstdin.write('N\n')
+ newstdin.seek(0)
returned = self._call_input_choice()

- self.assertEqual(self.strerr.getvalue(), self.input_choice_output)
+ self.assertEqual(newstderr.getvalue(), self.input_choice_output)
self.assertEqual(returned, 'n')

def testInputChoiceNonCapital(self):
- self.strin.write('n\n')
- self.strin.seek(0)
+ newstdin.write('n\n')
+ newstdin.seek(0)
returned = self._call_input_choice()

- self.assertEqual(self.strerr.getvalue(), self.input_choice_output)
+ self.assertEqual(newstderr.getvalue(), self.input_choice_output)
self.assertEqual(returned, 'n')

def testInputChoiceIncorrectAnswer(self):
- self.strin.write('X\nN\n')
- self.strin.seek(0)
+ newstdin.write('X\nN\n')
+ newstdin.seek(0)
returned = self._call_input_choice()

- self.assertEqual(self.strerr.getvalue(), self.input_choice_output * 2)
+ self.assertEqual(newstderr.getvalue(), self.input_choice_output * 2)
self.assertEqual(returned, 'n')

def test_input_list_choice(self):
@@ -255,9 +297,9 @@
options = ('answer 1', 'answer 2', 'answer 3')
rv = pywikibot.bot.input_list_choice('question', options, '2')

- self.assertEqual(self.strout.getvalue(), '')
+ self.assertEqual(newstdout.getvalue(), '')
self.assertEqual(
- self.strerr.getvalue(),
+ newstderr.getvalue(),
''.join('{}: {}\n'.format(num, items)
for num, items in enumerate(options, start=1))
+ 'question (default: 2): ')
@@ -273,17 +315,17 @@

def testOutputColorizedText(self):
pywikibot.output(self.str1)
- self.assertEqual(self.strout.getvalue(), '')
+ self.assertEqual(newstdout.getvalue(), '')
self.assertEqual(
- self.strerr.getvalue(),
+ newstderr.getvalue(),
'text \x1b[95mlight purple text\x1b[0m text\n')

def testOutputNoncolorizedText(self):
pywikibot.config.colorized_output = False
pywikibot.output(self.str1)
- self.assertEqual(self.strout.getvalue(), '')
+ self.assertEqual(newstdout.getvalue(), '')
self.assertEqual(
- self.strerr.getvalue(),
+ newstderr.getvalue(),
'text light purple text text ***\n')

str2 = ('normal text \03{lightpurple} light purple '
@@ -293,9 +335,9 @@
def testOutputColorCascade_incorrect(self):
"""Test incorrect behavior of testOutputColorCascade."""
pywikibot.output(self.str2)
- self.assertEqual(self.strout.getvalue(), '')
+ self.assertEqual(newstdout.getvalue(), '')
self.assertEqual(
- self.strerr.getvalue(),
+ newstderr.getvalue(),
'normal text \x1b[95m light purple '
'\x1b[94m light blue \x1b[95m light purple '
'\x1b[0m normal text\n')
@@ -308,18 +350,18 @@

def testOutputUnicodeText(self):
pywikibot.output('Заглавная_страница')
- self.assertEqual(self.strout.getvalue(), '')
- self.assertEqual(self.strerr.getvalue(), 'Заглавная_страница\n')
+ self.assertEqual(newstdout.getvalue(), '')
+ self.assertEqual(newstderr.getvalue(), 'Заглавная_страница\n')

def testInputUnicodeText(self):
- self.strin.write('Заглавная_страница\n')
- self.strin.seek(0)
+ newstdin.write('Заглавная_страница\n')
+ newstdin.seek(0)

returned = pywikibot.input('Википедию? ')

- self.assertEqual(self.strout.getvalue(), '')
+ self.assertEqual(newstdout.getvalue(), '')
self.assertEqual(
- self.strerr.getvalue(), 'Википедию? ')
+ newstderr.getvalue(), 'Википедию? ')

self.assertIsInstance(returned, str)
self.assertEqual(returned, 'Заглавная_страница')
@@ -334,9 +376,9 @@
pywikibot.bot.ui.encoding = 'latin-1'
pywikibot.config.transliterate = True
pywikibot.output('abcd АБГД αβγδ あいうえお')
- self.assertEqual(self.strout.getvalue(), '')
+ self.assertEqual(newstdout.getvalue(), '')
self.assertEqual(
- self.strerr.getvalue(),
+ newstderr.getvalue(),
'abcd \x1b[93mA\x1b[0m\x1b[93mB\x1b[0m\x1b[93mG\x1b[0m'
'\x1b[93mD\x1b[0m \x1b[93ma\x1b[0m\x1b[93mb\x1b[0m\x1b[93mg'
'\x1b[0m\x1b[93md\x1b[0m \x1b[93ma\x1b[0m\x1b[93mi\x1b[0m'
@@ -357,9 +399,8 @@
def setUp(self):
"""Create dummy instances for the test and patch encounter_color."""
super().setUp()
+ self.stream = io.StringIO()
self.ui_obj = self.ui_class()
- # Write to sys.stdout stream, which we'll redirect to the stream below
- self.redirect = io.StringIO()
self._orig_encounter_color = self.ui_obj.encounter_color
self.ui_obj.encounter_color = self._encounter_color
self._index = 0
@@ -371,6 +412,10 @@
self.assertEqual(self._index,
len(self._colors) if self.expect_color else 0)

+ def _getvalue(self):
+ """Get the value of the stream."""
+ return self.stream.getvalue()
+
def _encounter_color(self, color, target_stream):
"""Patched encounter_color method."""
raise AssertionError('This method should not be invoked')
@@ -378,51 +423,41 @@
def test_no_color(self):
"""Test a string without any colors."""
self._colors = ()
- with redirect_stdout(self.redirect) as f:
- self.ui_obj._print('Hello world you!', self.ui_obj.stdout)
- self.assertEqual(f.getvalue(), 'Hello world you!')
+ self.ui_obj._print('Hello world you!', self.stream)
+ self.assertEqual(self._getvalue(), 'Hello world you!')

def test_one_color(self):
"""Test a string using one color."""
self._colors = (('red', 6), ('default', 10))
- with redirect_stdout(self.redirect) as f:
- self.ui_obj._print('Hello \03{red}world you!', self.ui_obj.stdout)
- self.assertEqual(f.getvalue(), self.expected)
+ self.ui_obj._print('Hello \03{red}world you!', self.stream)
+ self.assertEqual(self._getvalue(), self.expected)

def test_flat_color(self):
"""Test using colors with defaulting in between."""
self._colors = (('red', 6), ('default', 6), ('yellow', 3),
('default', 1))
- with redirect_stdout(self.redirect) as f:
- self.ui_obj._print(
- 'Hello \03{red}world \03{default}you\03{yellow}!',
- self.ui_obj.stdout)
- self.assertEqual(f.getvalue(), self.expected)
+ self.ui_obj._print('Hello \03{red}world \03{default}you\03{yellow}!',
+ self.stream)
+ self.assertEqual(self._getvalue(), self.expected)

def test_stack_with_pop_color(self):
"""Test using stacked colors and just popping the latest color."""
self._colors = (('red', 6), ('yellow', 6), ('red', 3), ('default', 1))
- with redirect_stdout(self.redirect) as f:
- self.ui_obj._print(
- 'Hello \03{red}world \03{yellow}you\03{previous}!',
- self.ui_obj.stdout)
- self.assertEqual(f.getvalue(), self.expected)
+ self.ui_obj._print('Hello \03{red}world \03{yellow}you\03{previous}!',
+ self.stream)
+ self.assertEqual(self._getvalue(), self.expected)

def test_stack_implicit_color(self):
"""Test using stacked colors without popping any."""
self._colors = (('red', 6), ('yellow', 6), ('default', 4))
- with redirect_stdout(self.redirect) as f:
- self.ui_obj._print('Hello \03{red}world \03{yellow}you!',
- self.ui_obj.stdout)
- self.assertEqual(f.getvalue(), self.expected)
+ self.ui_obj._print('Hello \03{red}world \03{yellow}you!', self.stream)
+ self.assertEqual(self._getvalue(), self.expected)

def test_one_color_newline(self):
"""Test with trailing new line and one color."""
self._colors = (('red', 6), ('default', 11))
- with redirect_stdout(self.redirect) as f:
- self.ui_obj._print('Hello \03{red}world you!\n',
- self.ui_obj.stdout)
- self.assertEqual(f.getvalue(), self.expected + '\n')
+ self.ui_obj._print('Hello \03{red}world you!\n', self.stream)
+ self.assertEqual(self._getvalue(), self.expected + '\n')


class FakeUIColorizedTestBase(TestCase):
@@ -454,11 +489,11 @@

def _encounter_color(self, color, target_stream):
"""Verify that the written data, color and stream are correct."""
- self.assertIs(target_stream, self.ui_obj.stdout)
+ self.assertIs(target_stream, self.stream)
expected_color = self._colors[self._index][0]
self._index += 1
self.assertEqual(color, expected_color)
- self.assertLength(self.redirect.getvalue(),
+ self.assertLength(self.stream.getvalue(),
sum(e[1] for e in self._colors[:self._index]))


@@ -484,7 +519,7 @@
ctypes = FakeModule.create_dotted('ctypes.windll.kernel32')
ctypes.windll.kernel32.SetConsoleTextAttribute = self._handle_setattr
terminal_interface_win32.ctypes = ctypes
- self.ui_obj.stdout._hConsole = object()
+ self.stream._hConsole = object()

def tearDown(self):
"""Unpatch the ctypes import and check that all colors were used."""
@@ -497,12 +532,12 @@

def _handle_setattr(self, handle, attribute):
"""Dummy method to handle SetConsoleTextAttribute."""
- self.assertIs(handle, self.ui_obj.stdout._hConsole)
+ self.assertIs(handle, self.stream._hConsole)
color = self._colors[self._index][0]
self._index += 1
color = terminal_interface_win32.windowsColors[color]
self.assertEqual(attribute, color)
- self.assertLength(self.redirect.getvalue(),
+ self.assertLength(self.stream.getvalue(),
sum(e[1] for e in self._colors[:self._index]))


@@ -518,7 +553,7 @@
def setUp(self):
"""Change the local stream's console to None to disable colors."""
super().setUp()
- self.ui_obj.stdout._hConsole = None
+ self.stream._hConsole = None


if __name__ == '__main__': # pragma: no cover

To view, visit change 748274. To unsubscribe, or for help writing mail filters, visit settings.

Gerrit-Project: pywikibot/core
Gerrit-Branch: master
Gerrit-Change-Id: Ic5a82622444f47aad9ee17548fb9bf9c54489f42
Gerrit-Change-Number: 748274
Gerrit-PatchSet: 2
Gerrit-Owner: Xqt <info@gno.de>
Gerrit-Reviewer: Chris Maynor <cmchrismaynor@gmail.com>
Gerrit-Reviewer: Merlijn van Deen <valhallasw@arctus.nl>
Gerrit-Reviewer: Xqt <info@gno.de>
Gerrit-Reviewer: jenkins-bot
Gerrit-MessageType: merged