diff --git a/contextlib2.py b/contextlib2.py index acb093b..a618f23 100644 --- a/contextlib2.py +++ b/contextlib2.py @@ -1,30 +1,19 @@ """contextlib2 - backports and enhancements to the contextlib module""" import sys +import warnings from collections import deque from functools import wraps -# Standard library exports __all__ = ["contextmanager", "closing", "ContextDecorator", "ExitStack", - "redirect_stdout", "suppress"] + "redirect_stdout", "redirect_stderr", "suppress"] -# contextlib2 only exports (current and legacy experimental interfaces) +# Backwards compatibility __all__ += ["ContextStack"] class ContextDecorator(object): "A base class or mixin that enables context managers to work as decorators." - def _recreate_cm(self): - """Return a recreated instance of self. - Allows an otherwise one-shot context manager like - _GeneratorContextManager to support use as - a decorator via implicit recreation. - - This is a private interface just for _GeneratorContextManager. - See issue #11647 for details. - """ - return self - def refresh_cm(self): """Returns the context manager used to actually wrap the call to the decorated function. @@ -34,13 +23,30 @@ class ContextDecorator(object): Overriding this method allows otherwise one-shot context managers like _GeneratorContextManager to support use as decorators via implicit recreation. + + DEPRECATED: refresh_cm was never added to the standard library's + ContextDecorator API + """ + warnings.warn("refresh_cm was never added to the standard library", + DeprecationWarning) + return self._recreate_cm() + + def _recreate_cm(self): + """Return a recreated instance of self. + + Allows an otherwise one-shot context manager like + _GeneratorContextManager to support use as + a decorator via implicit recreation. + + This is a private interface just for _GeneratorContextManager. + See issue #11647 for details. """ return self def __call__(self, func): @wraps(func) def inner(*args, **kwds): - with self.refresh_cm(): + with self._recreate_cm(): return func(*args, **kwds) return inner @@ -48,7 +54,7 @@ class ContextDecorator(object): class _GeneratorContextManager(ContextDecorator): """Helper for @contextmanager decorator.""" - def __init__(self, func, *args, **kwds): + def __init__(self, func, args, kwds): self.gen = func(*args, **kwds) self.func, self.args, self.kwds = func, args, kwds # Issue 19330: ensure context manager instances have good docstrings @@ -62,11 +68,11 @@ class _GeneratorContextManager(ContextDecorator): # for the class instead. # See http://bugs.python.org/issue19404 for more details. - def refresh_cm(self): + def _recreate_cm(self): # _GCM instances are one-shot context managers, so the # CM must be recreated each time a decorated function is # called - return self.__class__(self.func, *self.args, **self.kwds) + return self.__class__(self.func, self.args, self.kwds) def __enter__(self): try: @@ -91,10 +97,17 @@ class _GeneratorContextManager(ContextDecorator): self.gen.throw(type, value, traceback) raise RuntimeError("generator didn't stop after throw()") except StopIteration as exc: - # Suppress the exception *unless* it's the same exception that + # Suppress StopIteration *unless* it's the same exception that # was passed to throw(). This prevents a StopIteration - # raised inside the "with" statement from being suppressed + # raised inside the "with" statement from being suppressed. return exc is not value + except RuntimeError as exc: + # Likewise, avoid suppressing if a StopIteration exception + # was passed to throw() and later wrapped into a RuntimeError + # (see PEP 479). + if _HAVE_EXCEPTION_CHAINING and exc.__cause__ is value: + return False + raise except: # only re-raise if it's *not* the exception that was # passed to throw(), because __exit__() must not raise @@ -137,7 +150,7 @@ def contextmanager(func): """ @wraps(func) def helper(*args, **kwds): - return _GeneratorContextManager(func, *args, **kwds) + return _GeneratorContextManager(func, args, kwds) return helper @@ -165,8 +178,27 @@ class closing(object): def __exit__(self, *exc_info): self.thing.close() -class redirect_stdout: - """Context manager for temporarily redirecting stdout to another file + +class _RedirectStream: + + _stream = None + + def __init__(self, new_target): + self._new_target = new_target + # We use a list of old targets to make this CM re-entrant + self._old_targets = [] + + def __enter__(self): + self._old_targets.append(getattr(sys, self._stream)) + setattr(sys, self._stream, self._new_target) + return self._new_target + + def __exit__(self, exctype, excinst, exctb): + setattr(sys, self._stream, self._old_targets.pop()) + + +class redirect_stdout(_RedirectStream): + """Context manager for temporarily redirecting stdout to another file. # How to send help() to stderr with redirect_stdout(sys.stderr): @@ -178,22 +210,13 @@ class redirect_stdout: help(pow) """ - def __init__(self, new_target): - self._new_target = new_target - self._old_target = self._sentinel = object() + _stream = "stdout" - def __enter__(self): - if self._old_target is not self._sentinel: - raise RuntimeError("Cannot reenter {0!r}".format(self)) - self._old_target = sys.stdout - sys.stdout = self._new_target - return self._new_target - def __exit__(self, exctype, excinst, exctb): - restore_stdout = self._old_target - self._old_target = self._sentinel - sys.stdout = restore_stdout +class redirect_stderr(_RedirectStream): + """Context manager for temporarily redirecting stderr to another file.""" + _stream = "stderr" class suppress: @@ -227,14 +250,21 @@ class suppress: # Context manipulation is Python 3 only -if str is not bytes: +_HAVE_EXCEPTION_CHAINING = sys.version_info.major >= 3 +if _HAVE_EXCEPTION_CHAINING: def _make_context_fixer(frame_exc): def _fix_exception_context(new_exc, old_exc): + # Context may not be correct, so find the end of the chain while 1: exc_context = new_exc.__context__ - if exc_context in (None, frame_exc): + if exc_context is old_exc: + # Context is already set correctly (see issue 20317) + return + if exc_context is None or exc_context is frame_exc: break new_exc = exc_context + # Change the end of the chain to point to the exception + # we expect it to reference new_exc.__context__ = old_exc return _fix_exception_context @@ -258,7 +288,6 @@ else: exc_type, exc_value, exc_tb = exc_details exec ("raise exc_type, exc_value, exc_tb") - # Inspired by discussions on http://bugs.python.org/issue13585 class ExitStack(object): """Context manager for dynamic management of a stack of exit callbacks @@ -375,6 +404,11 @@ class ExitStack(object): class ContextStack(ExitStack): """Backwards compatibility alias for ExitStack""" + def __init__(self): + warnings.warn("ContextStack has been renamed to ExitStack", + DeprecationWarning) + super(ContextStack, self).__init__() + def register_exit(self, callback): return self.push(callback) diff --git a/test_contextlib2.py b/test_contextlib2.py index 2b9d315..3a12585 100755 --- a/test_contextlib2.py +++ b/test_contextlib2.py @@ -1,50 +1,21 @@ -#!/usr/bin/env python -"""Unit tests for contextlib2""" +"""Unit tests for contextlib2.py""" +from __future__ import print_function +from __future__ import unicode_literals import io import sys - -# Check for a sufficiently recent unittest module import unittest -if not hasattr(unittest, "skipIf"): - import unittest2 as unittest - -# Handle 2/3 compatibility for redirect_stdout testing, -# checking 3.x only implicit exception chaining behaviour -# and checking for exception details in test cases -if str is bytes: - # Python 2 - from io import BytesIO as StrIO - check_exception_chaining = False - - def check_exception_details(case, exc_type, regex): - return case.assertRaisesRegexp(exc_type, regex) -else: - # Python 3 - from io import StringIO as StrIO - check_exception_chaining = True - - def check_exception_details(case, exc_type, regex): - return case.assertRaisesRegex(exc_type, regex) - - +import __future__ # For PEP 479 conditional test +import contextlib2 from contextlib2 import * # Tests __all__ -class ContextManagerTestCase(unittest.TestCase): +if not hasattr(unittest.TestCase, "assertRaisesRegex"): + import unittest2 as unittest - def test_instance_docstring_given_function_docstring(self): - # Issue 19330: ensure context manager instances have good docstrings - # See http://bugs.python.org/issue19404 for why this doesn't currently - # affect help() output :( - def gen_with_docstring(): - """This has a docstring""" - yield - gen_docstring = gen_with_docstring.__doc__ - cm_with_docstring = contextmanager(gen_with_docstring) - self.assertEqual(cm_with_docstring.__doc__, gen_docstring) - obj = cm_with_docstring() - self.assertEqual(obj.__doc__, gen_docstring) - self.assertNotEqual(obj.__doc__, type(obj).__doc__) +requires_docstrings = unittest.skipIf(sys.flags.optimize >= 2, + "Test requires docstrings") + +class ContextManagerTestCase(unittest.TestCase): def test_contextmanager_plain(self): state = [] @@ -115,6 +86,44 @@ class ContextManagerTestCase(unittest.TestCase): raise ZeroDivisionError(999) self.assertEqual(state, [1, 42, 999]) + def test_contextmanager_except_stopiter(self): + stop_exc = StopIteration('spam') + @contextmanager + def woohoo(): + yield + try: + with self.assertWarnsRegex(PendingDeprecationWarning, + "StopIteration"): + with woohoo(): + raise stop_exc + except Exception as ex: + self.assertIs(ex, stop_exc) + else: + self.fail('StopIteration was suppressed') + + @unittest.skipUnless(hasattr(__future__, "generator_stop"), + "Test only valid for versions implementing PEP 479") + def test_contextmanager_except_pep479(self): + code = """\ +from __future__ import generator_stop +from contextlib import contextmanager +@contextmanager +def woohoo(): + yield +""" + locals = {} + exec(code, locals, locals) + woohoo = locals['woohoo'] + + stop_exc = StopIteration('spam') + try: + with woohoo(): + raise stop_exc + except Exception as ex: + self.assertIs(ex, stop_exc) + else: + self.fail('StopIteration was suppressed') + def _create_contextmanager_attribs(self): def attribs(**kw): def decorate(func): @@ -133,14 +142,28 @@ class ContextManagerTestCase(unittest.TestCase): self.assertEqual(baz.__name__,'baz') self.assertEqual(baz.foo, 'bar') - @unittest.skipIf(sys.flags.optimize >= 2, - "Docstrings are omitted with -O2 and above") + @requires_docstrings def test_contextmanager_doc_attrib(self): baz = self._create_contextmanager_attribs() self.assertEqual(baz.__doc__, "Whee!") + @requires_docstrings + def test_instance_docstring_given_cm_docstring(self): + baz = self._create_contextmanager_attribs()(None) + self.assertEqual(baz.__doc__, "Whee!") + + def test_keywords(self): + # Ensure no keyword arguments are inhibited + @contextmanager + def woohoo(self, func, args, kwds): + yield (self, func, args, kwds) + with woohoo(self=11, func=22, args=33, kwds=44) as target: + self.assertEqual(target, (11, 22, 33, 44)) + + class ClosingTestCase(unittest.TestCase): + @requires_docstrings def test_instance_docs(self): # Issue 19330: ensure context manager instances have good docstrings cm_docstring = closing.__doc__ @@ -189,6 +212,7 @@ class mycontext(ContextDecorator): class TestContextDecorator(unittest.TestCase): + @requires_docstrings def test_instance_docs(self): # Issue 19330: ensure context manager instances have good docstrings cm_docstring = mycontext.__doc__ @@ -207,7 +231,7 @@ class TestContextDecorator(unittest.TestCase): def test_contextdecorator_with_exception(self): context = mycontext() - with check_exception_details(self, NameError, 'foo'): + with self.assertRaisesRegex(NameError, 'foo'): with context: raise NameError('foo') self.assertIsNotNone(context.exc) @@ -241,7 +265,7 @@ class TestContextDecorator(unittest.TestCase): self.assertTrue(context.started) raise NameError('foo') - with check_exception_details(self, NameError, 'foo'): + with self.assertRaisesRegex(NameError, 'foo'): test() self.assertIsNotNone(context.exc) self.assertIs(context.exc[0], NameError) @@ -345,9 +369,12 @@ class TestContextDecorator(unittest.TestCase): test('something else') self.assertEqual(state, [1, 'something else', 999]) +# Detailed exception chaining checks only make sense on Python 3 +check_exception_chaining = contextlib2._HAVE_EXCEPTION_CHAINING class TestExitStack(unittest.TestCase): + @requires_docstrings def test_instance_docs(self): # Issue 19330: ensure context manager instances have good docstrings cm_docstring = ExitStack.__doc__ @@ -479,10 +506,7 @@ class TestExitStack(unittest.TestCase): def test_exit_exception_chaining_reference(self): # Sanity check to make sure that ExitStack chaining matches # actual nested with statements - # We still run this under Py2, but the only thing it actually - # checks in that case is that the outermost exception is IndexError - # and that the inner ValueError was suppressed - class RaiseExc(object): + class RaiseExc: def __init__(self, exc): self.exc = exc def __enter__(self): @@ -490,7 +514,7 @@ class TestExitStack(unittest.TestCase): def __exit__(self, *exc_details): raise self.exc - class RaiseExcWithContext(object): + class RaiseExcWithContext: def __init__(self, outer, inner): self.outer = outer self.inner = inner @@ -502,11 +526,11 @@ class TestExitStack(unittest.TestCase): except: raise self.outer - class SuppressExc(object): + class SuppressExc: def __enter__(self): return self def __exit__(self, *exc_details): - type(self).saved_details = exc_details + self.__class__.saved_details = exc_details return True try: @@ -531,15 +555,12 @@ class TestExitStack(unittest.TestCase): def test_exit_exception_chaining(self): # Ensure exception chaining matches the reference behaviour - # We still run this under Py2, but the only thing it actually - # checks in that case is that the outermost exception is IndexError - # and that the inner ValueError was suppressed def raise_exc(exc): raise exc - saved_details = [] + saved_details = [None] def suppress_exc(*exc_details): - saved_details[:] = [exc_details] + saved_details[0] = exc_details return True try: @@ -591,6 +612,69 @@ class TestExitStack(unittest.TestCase): else: self.fail("Expected KeyError, but no exception was raised") + def test_exit_exception_with_correct_context(self): + # http://bugs.python.org/issue20317 + @contextmanager + def gets_the_context_right(exc): + try: + yield + finally: + raise exc + + exc1 = Exception(1) + exc2 = Exception(2) + exc3 = Exception(3) + exc4 = Exception(4) + + # The contextmanager already fixes the context, so prior to the + # fix, ExitStack would try to fix it *again* and get into an + # infinite self-referential loop + try: + with ExitStack() as stack: + stack.enter_context(gets_the_context_right(exc4)) + stack.enter_context(gets_the_context_right(exc3)) + stack.enter_context(gets_the_context_right(exc2)) + raise exc1 + except Exception as exc: + self.assertIs(exc, exc4) + if check_exception_chaining: + self.assertIs(exc.__context__, exc3) + self.assertIs(exc.__context__.__context__, exc2) + self.assertIs(exc.__context__.__context__.__context__, exc1) + self.assertIsNone( + exc.__context__.__context__.__context__.__context__) + + def test_exit_exception_with_existing_context(self): + # Addresses a lack of test coverage discovered after checking in a + # fix for issue 20317 that still contained debugging code. + def raise_nested(inner_exc, outer_exc): + try: + raise inner_exc + finally: + raise outer_exc + exc1 = Exception(1) + exc2 = Exception(2) + exc3 = Exception(3) + exc4 = Exception(4) + exc5 = Exception(5) + try: + with ExitStack() as stack: + stack.callback(raise_nested, exc4, exc5) + stack.callback(raise_nested, exc2, exc3) + raise exc1 + except Exception as exc: + self.assertIs(exc, exc5) + if check_exception_chaining: + self.assertIs(exc.__context__, exc4) + self.assertIs(exc.__context__.__context__, exc3) + self.assertIs(exc.__context__.__context__.__context__, exc2) + self.assertIs( + exc.__context__.__context__.__context__.__context__, exc1) + self.assertIsNone( + exc.__context__.__context__.__context__.__context__.__context__) + + + def test_body_exception_suppress(self): def suppress_exc(*exc_details): return True @@ -622,51 +706,79 @@ class TestExitStack(unittest.TestCase): stack.push(cm) self.assertIs(stack._exit_callbacks[-1], cm) -class TestRedirectStdout(unittest.TestCase): +class TestRedirectStream: + + redirect_stream = None + orig_stream = None + + @requires_docstrings def test_instance_docs(self): # Issue 19330: ensure context manager instances have good docstrings - cm_docstring = redirect_stdout.__doc__ - obj = redirect_stdout(None) + cm_docstring = self.redirect_stream.__doc__ + obj = self.redirect_stream(None) self.assertEqual(obj.__doc__, cm_docstring) + def test_no_redirect_in_init(self): + orig_stdout = getattr(sys, self.orig_stream) + self.redirect_stream(None) + self.assertIs(getattr(sys, self.orig_stream), orig_stdout) + def test_redirect_to_string_io(self): - f = StrIO() + f = io.StringIO() msg = "Consider an API like help(), which prints directly to stdout" - with redirect_stdout(f): - print(msg) + orig_stdout = getattr(sys, self.orig_stream) + with self.redirect_stream(f): + print(msg, file=getattr(sys, self.orig_stream)) + self.assertIs(getattr(sys, self.orig_stream), orig_stdout) s = f.getvalue().strip() self.assertEqual(s, msg) def test_enter_result_is_target(self): - f = StrIO() - with redirect_stdout(f) as enter_result: + f = io.StringIO() + with self.redirect_stream(f) as enter_result: self.assertIs(enter_result, f) def test_cm_is_reusable(self): - f = StrIO() - write_to_f = redirect_stdout(f) + f = io.StringIO() + write_to_f = self.redirect_stream(f) + orig_stdout = getattr(sys, self.orig_stream) with write_to_f: - print("Hello") + print("Hello", end=" ", file=getattr(sys, self.orig_stream)) with write_to_f: - print("World!") + print("World!", file=getattr(sys, self.orig_stream)) + self.assertIs(getattr(sys, self.orig_stream), orig_stdout) s = f.getvalue() - self.assertEqual(s, "Hello\nWorld!\n") + self.assertEqual(s, "Hello World!\n") - # If this is ever made reentrant, update the reusable-but-not-reentrant - # example at the end of the contextlib docs accordingly. - def test_nested_reentry_fails(self): - f = StrIO() - write_to_f = redirect_stdout(f) - with check_exception_details(self, RuntimeError, "Cannot reenter"): + def test_cm_is_reentrant(self): + f = io.StringIO() + write_to_f = self.redirect_stream(f) + orig_stdout = getattr(sys, self.orig_stream) + with write_to_f: + print("Hello", end=" ", file=getattr(sys, self.orig_stream)) with write_to_f: - print("Hello") - with write_to_f: - print("World!") + print("World!", file=getattr(sys, self.orig_stream)) + self.assertIs(getattr(sys, self.orig_stream), orig_stdout) + s = f.getvalue() + self.assertEqual(s, "Hello World!\n") + + +class TestRedirectStdout(TestRedirectStream, unittest.TestCase): + + redirect_stream = redirect_stdout + orig_stream = "stdout" + + +class TestRedirectStderr(TestRedirectStream, unittest.TestCase): + + redirect_stream = redirect_stderr + orig_stream = "stderr" class TestSuppress(unittest.TestCase): + @requires_docstrings def test_instance_docs(self): # Issue 19330: ensure context manager instances have good docstrings cm_docstring = suppress.__doc__ @@ -712,131 +824,12 @@ class TestSuppress(unittest.TestCase): with ignore_exceptions: len(5) with ignore_exceptions: - 1/0 with ignore_exceptions: # Check nested usage len(5) - -class TestContextStack(unittest.TestCase): - - def test_no_resources(self): - with ContextStack(): - pass - - def test_register(self): - expected = [ - ((), {}), - ((1,), {}), - ((1,2), {}), - ((), dict(example=1)), - ((1,), dict(example=1)), - ((1,2), dict(example=1)), - ] - result = [] - def _exit(*args, **kwds): - """Test metadata propagation""" - result.append((args, kwds)) - with ContextStack() as stack: - for args, kwds in reversed(expected): - if args and kwds: - f = stack.register(_exit, *args, **kwds) - elif args: - f = stack.register(_exit, *args) - elif kwds: - f = stack.register(_exit, **kwds) - else: - f = stack.register(_exit) - self.assertIs(f, _exit) - for wrapper in stack._exit_callbacks: - self.assertIs(wrapper.__wrapped__, _exit) - self.assertNotEqual(wrapper.__name__, _exit.__name__) - self.assertIsNone(wrapper.__doc__, _exit.__doc__) - self.assertEqual(result, expected) - - def test_register_exit(self): - exc_raised = ZeroDivisionError - def _expect_exc(exc_type, exc, exc_tb): - self.assertIs(exc_type, exc_raised) - def _suppress_exc(*exc_details): - return True - def _expect_ok(exc_type, exc, exc_tb): - self.assertIsNone(exc_type) - self.assertIsNone(exc) - self.assertIsNone(exc_tb) - class ExitCM(object): - def __init__(self, check_exc): - self.check_exc = check_exc - def __enter__(self): - self.fail("Should not be called!") - def __exit__(self, *exc_details): - self.check_exc(*exc_details) - with ContextStack() as stack: - stack.register_exit(_expect_ok) - self.assertIs(stack._exit_callbacks[-1], _expect_ok) - cm = ExitCM(_expect_ok) - stack.register_exit(cm) - self.assertIs(stack._exit_callbacks[-1].__self__, cm) - stack.register_exit(_suppress_exc) - self.assertIs(stack._exit_callbacks[-1], _suppress_exc) - cm = ExitCM(_expect_exc) - stack.register_exit(cm) - self.assertIs(stack._exit_callbacks[-1].__self__, cm) - stack.register_exit(_expect_exc) - self.assertIs(stack._exit_callbacks[-1], _expect_exc) - stack.register_exit(_expect_exc) - self.assertIs(stack._exit_callbacks[-1], _expect_exc) + outer_continued = True 1/0 - - def test_enter_context(self): - class TestCM(object): - def __enter__(self): - result.append(1) - def __exit__(self, *exc_details): - result.append(3) - - result = [] - cm = TestCM() - with ContextStack() as stack: - @stack.register # Registered first => cleaned up last - def _exit(): - result.append(4) - self.assertIsNotNone(_exit) - stack.enter_context(cm) - self.assertIs(stack._exit_callbacks[-1].__self__, cm) - result.append(2) - self.assertEqual(result, [1, 2, 3, 4]) - - def test_close(self): - result = [] - with ContextStack() as stack: - @stack.register - def _exit(): - result.append(1) - self.assertIsNotNone(_exit) - stack.close() - result.append(2) - self.assertEqual(result, [1, 2]) - - def test_preserve(self): - result = [] - with ContextStack() as stack: - @stack.register - def _exit(): - result.append(3) - self.assertIsNotNone(_exit) - new_stack = stack.preserve() - result.append(1) - result.append(2) - new_stack.close() - self.assertEqual(result, [1, 2, 3]) - - def test_instance_bypass(self): - class Example(object): pass - cm = Example() - cm.__exit__ = object() - stack = ContextStack() - self.assertRaises(AttributeError, stack.enter_context, cm) - stack.register_exit(cm) - self.assertIs(stack._exit_callbacks[-1], cm) + self.assertTrue(outer_continued) if __name__ == "__main__": + import unittest unittest.main()