Rebase on Python 3.5 contextlib

This commit is contained in:
Nick Coghlan 2016-01-12 18:51:33 +10:00
parent 939293da14
commit 6d77165d63
2 changed files with 266 additions and 239 deletions

View file

@ -1,30 +1,19 @@
"""contextlib2 - backports and enhancements to the contextlib module"""
import sys
import warnings
from collections import deque
from functools import wraps
# Standard library exports
__all__ = ["contextmanager", "closing", "ContextDecorator", "ExitStack",
"redirect_stdout", "suppress"]
"redirect_stdout", "redirect_stderr", "suppress"]
# contextlib2 only exports (current and legacy experimental interfaces)
# Backwards compatibility
__all__ += ["ContextStack"]
class ContextDecorator(object):
"A base class or mixin that enables context managers to work as decorators."
def _recreate_cm(self):
"""Return a recreated instance of self.
Allows an otherwise one-shot context manager like
_GeneratorContextManager to support use as
a decorator via implicit recreation.
This is a private interface just for _GeneratorContextManager.
See issue #11647 for details.
"""
return self
def refresh_cm(self):
"""Returns the context manager used to actually wrap the call to the
decorated function.
@ -34,13 +23,30 @@ class ContextDecorator(object):
Overriding this method allows otherwise one-shot context managers
like _GeneratorContextManager to support use as decorators via
implicit recreation.
DEPRECATED: refresh_cm was never added to the standard library's
ContextDecorator API
"""
warnings.warn("refresh_cm was never added to the standard library",
DeprecationWarning)
return self._recreate_cm()
def _recreate_cm(self):
"""Return a recreated instance of self.
Allows an otherwise one-shot context manager like
_GeneratorContextManager to support use as
a decorator via implicit recreation.
This is a private interface just for _GeneratorContextManager.
See issue #11647 for details.
"""
return self
def __call__(self, func):
@wraps(func)
def inner(*args, **kwds):
with self.refresh_cm():
with self._recreate_cm():
return func(*args, **kwds)
return inner
@ -48,7 +54,7 @@ class ContextDecorator(object):
class _GeneratorContextManager(ContextDecorator):
"""Helper for @contextmanager decorator."""
def __init__(self, func, *args, **kwds):
def __init__(self, func, args, kwds):
self.gen = func(*args, **kwds)
self.func, self.args, self.kwds = func, args, kwds
# Issue 19330: ensure context manager instances have good docstrings
@ -62,11 +68,11 @@ class _GeneratorContextManager(ContextDecorator):
# for the class instead.
# See http://bugs.python.org/issue19404 for more details.
def refresh_cm(self):
def _recreate_cm(self):
# _GCM instances are one-shot context managers, so the
# CM must be recreated each time a decorated function is
# called
return self.__class__(self.func, *self.args, **self.kwds)
return self.__class__(self.func, self.args, self.kwds)
def __enter__(self):
try:
@ -91,10 +97,17 @@ class _GeneratorContextManager(ContextDecorator):
self.gen.throw(type, value, traceback)
raise RuntimeError("generator didn't stop after throw()")
except StopIteration as exc:
# Suppress the exception *unless* it's the same exception that
# Suppress StopIteration *unless* it's the same exception that
# was passed to throw(). This prevents a StopIteration
# raised inside the "with" statement from being suppressed
# raised inside the "with" statement from being suppressed.
return exc is not value
except RuntimeError as exc:
# Likewise, avoid suppressing if a StopIteration exception
# was passed to throw() and later wrapped into a RuntimeError
# (see PEP 479).
if _HAVE_EXCEPTION_CHAINING and exc.__cause__ is value:
return False
raise
except:
# only re-raise if it's *not* the exception that was
# passed to throw(), because __exit__() must not raise
@ -137,7 +150,7 @@ def contextmanager(func):
"""
@wraps(func)
def helper(*args, **kwds):
return _GeneratorContextManager(func, *args, **kwds)
return _GeneratorContextManager(func, args, kwds)
return helper
@ -165,8 +178,27 @@ class closing(object):
def __exit__(self, *exc_info):
self.thing.close()
class redirect_stdout:
"""Context manager for temporarily redirecting stdout to another file
class _RedirectStream:
_stream = None
def __init__(self, new_target):
self._new_target = new_target
# We use a list of old targets to make this CM re-entrant
self._old_targets = []
def __enter__(self):
self._old_targets.append(getattr(sys, self._stream))
setattr(sys, self._stream, self._new_target)
return self._new_target
def __exit__(self, exctype, excinst, exctb):
setattr(sys, self._stream, self._old_targets.pop())
class redirect_stdout(_RedirectStream):
"""Context manager for temporarily redirecting stdout to another file.
# How to send help() to stderr
with redirect_stdout(sys.stderr):
@ -178,22 +210,13 @@ class redirect_stdout:
help(pow)
"""
def __init__(self, new_target):
self._new_target = new_target
self._old_target = self._sentinel = object()
_stream = "stdout"
def __enter__(self):
if self._old_target is not self._sentinel:
raise RuntimeError("Cannot reenter {0!r}".format(self))
self._old_target = sys.stdout
sys.stdout = self._new_target
return self._new_target
def __exit__(self, exctype, excinst, exctb):
restore_stdout = self._old_target
self._old_target = self._sentinel
sys.stdout = restore_stdout
class redirect_stderr(_RedirectStream):
"""Context manager for temporarily redirecting stderr to another file."""
_stream = "stderr"
class suppress:
@ -227,14 +250,21 @@ class suppress:
# Context manipulation is Python 3 only
if str is not bytes:
_HAVE_EXCEPTION_CHAINING = sys.version_info.major >= 3
if _HAVE_EXCEPTION_CHAINING:
def _make_context_fixer(frame_exc):
def _fix_exception_context(new_exc, old_exc):
# Context may not be correct, so find the end of the chain
while 1:
exc_context = new_exc.__context__
if exc_context in (None, frame_exc):
if exc_context is old_exc:
# Context is already set correctly (see issue 20317)
return
if exc_context is None or exc_context is frame_exc:
break
new_exc = exc_context
# Change the end of the chain to point to the exception
# we expect it to reference
new_exc.__context__ = old_exc
return _fix_exception_context
@ -258,7 +288,6 @@ else:
exc_type, exc_value, exc_tb = exc_details
exec ("raise exc_type, exc_value, exc_tb")
# Inspired by discussions on http://bugs.python.org/issue13585
class ExitStack(object):
"""Context manager for dynamic management of a stack of exit callbacks
@ -375,6 +404,11 @@ class ExitStack(object):
class ContextStack(ExitStack):
"""Backwards compatibility alias for ExitStack"""
def __init__(self):
warnings.warn("ContextStack has been renamed to ExitStack",
DeprecationWarning)
super(ContextStack, self).__init__()
def register_exit(self, callback):
return self.push(callback)

View file

@ -1,50 +1,21 @@
#!/usr/bin/env python
"""Unit tests for contextlib2"""
"""Unit tests for contextlib2.py"""
from __future__ import print_function
from __future__ import unicode_literals
import io
import sys
# Check for a sufficiently recent unittest module
import unittest
if not hasattr(unittest, "skipIf"):
import unittest2 as unittest
# Handle 2/3 compatibility for redirect_stdout testing,
# checking 3.x only implicit exception chaining behaviour
# and checking for exception details in test cases
if str is bytes:
# Python 2
from io import BytesIO as StrIO
check_exception_chaining = False
def check_exception_details(case, exc_type, regex):
return case.assertRaisesRegexp(exc_type, regex)
else:
# Python 3
from io import StringIO as StrIO
check_exception_chaining = True
def check_exception_details(case, exc_type, regex):
return case.assertRaisesRegex(exc_type, regex)
import __future__ # For PEP 479 conditional test
import contextlib2
from contextlib2 import * # Tests __all__
class ContextManagerTestCase(unittest.TestCase):
if not hasattr(unittest.TestCase, "assertRaisesRegex"):
import unittest2 as unittest
def test_instance_docstring_given_function_docstring(self):
# Issue 19330: ensure context manager instances have good docstrings
# See http://bugs.python.org/issue19404 for why this doesn't currently
# affect help() output :(
def gen_with_docstring():
"""This has a docstring"""
yield
gen_docstring = gen_with_docstring.__doc__
cm_with_docstring = contextmanager(gen_with_docstring)
self.assertEqual(cm_with_docstring.__doc__, gen_docstring)
obj = cm_with_docstring()
self.assertEqual(obj.__doc__, gen_docstring)
self.assertNotEqual(obj.__doc__, type(obj).__doc__)
requires_docstrings = unittest.skipIf(sys.flags.optimize >= 2,
"Test requires docstrings")
class ContextManagerTestCase(unittest.TestCase):
def test_contextmanager_plain(self):
state = []
@ -115,6 +86,44 @@ class ContextManagerTestCase(unittest.TestCase):
raise ZeroDivisionError(999)
self.assertEqual(state, [1, 42, 999])
def test_contextmanager_except_stopiter(self):
stop_exc = StopIteration('spam')
@contextmanager
def woohoo():
yield
try:
with self.assertWarnsRegex(PendingDeprecationWarning,
"StopIteration"):
with woohoo():
raise stop_exc
except Exception as ex:
self.assertIs(ex, stop_exc)
else:
self.fail('StopIteration was suppressed')
@unittest.skipUnless(hasattr(__future__, "generator_stop"),
"Test only valid for versions implementing PEP 479")
def test_contextmanager_except_pep479(self):
code = """\
from __future__ import generator_stop
from contextlib import contextmanager
@contextmanager
def woohoo():
yield
"""
locals = {}
exec(code, locals, locals)
woohoo = locals['woohoo']
stop_exc = StopIteration('spam')
try:
with woohoo():
raise stop_exc
except Exception as ex:
self.assertIs(ex, stop_exc)
else:
self.fail('StopIteration was suppressed')
def _create_contextmanager_attribs(self):
def attribs(**kw):
def decorate(func):
@ -133,14 +142,28 @@ class ContextManagerTestCase(unittest.TestCase):
self.assertEqual(baz.__name__,'baz')
self.assertEqual(baz.foo, 'bar')
@unittest.skipIf(sys.flags.optimize >= 2,
"Docstrings are omitted with -O2 and above")
@requires_docstrings
def test_contextmanager_doc_attrib(self):
baz = self._create_contextmanager_attribs()
self.assertEqual(baz.__doc__, "Whee!")
@requires_docstrings
def test_instance_docstring_given_cm_docstring(self):
baz = self._create_contextmanager_attribs()(None)
self.assertEqual(baz.__doc__, "Whee!")
def test_keywords(self):
# Ensure no keyword arguments are inhibited
@contextmanager
def woohoo(self, func, args, kwds):
yield (self, func, args, kwds)
with woohoo(self=11, func=22, args=33, kwds=44) as target:
self.assertEqual(target, (11, 22, 33, 44))
class ClosingTestCase(unittest.TestCase):
@requires_docstrings
def test_instance_docs(self):
# Issue 19330: ensure context manager instances have good docstrings
cm_docstring = closing.__doc__
@ -189,6 +212,7 @@ class mycontext(ContextDecorator):
class TestContextDecorator(unittest.TestCase):
@requires_docstrings
def test_instance_docs(self):
# Issue 19330: ensure context manager instances have good docstrings
cm_docstring = mycontext.__doc__
@ -207,7 +231,7 @@ class TestContextDecorator(unittest.TestCase):
def test_contextdecorator_with_exception(self):
context = mycontext()
with check_exception_details(self, NameError, 'foo'):
with self.assertRaisesRegex(NameError, 'foo'):
with context:
raise NameError('foo')
self.assertIsNotNone(context.exc)
@ -241,7 +265,7 @@ class TestContextDecorator(unittest.TestCase):
self.assertTrue(context.started)
raise NameError('foo')
with check_exception_details(self, NameError, 'foo'):
with self.assertRaisesRegex(NameError, 'foo'):
test()
self.assertIsNotNone(context.exc)
self.assertIs(context.exc[0], NameError)
@ -345,9 +369,12 @@ class TestContextDecorator(unittest.TestCase):
test('something else')
self.assertEqual(state, [1, 'something else', 999])
# Detailed exception chaining checks only make sense on Python 3
check_exception_chaining = contextlib2._HAVE_EXCEPTION_CHAINING
class TestExitStack(unittest.TestCase):
@requires_docstrings
def test_instance_docs(self):
# Issue 19330: ensure context manager instances have good docstrings
cm_docstring = ExitStack.__doc__
@ -479,10 +506,7 @@ class TestExitStack(unittest.TestCase):
def test_exit_exception_chaining_reference(self):
# Sanity check to make sure that ExitStack chaining matches
# actual nested with statements
# We still run this under Py2, but the only thing it actually
# checks in that case is that the outermost exception is IndexError
# and that the inner ValueError was suppressed
class RaiseExc(object):
class RaiseExc:
def __init__(self, exc):
self.exc = exc
def __enter__(self):
@ -490,7 +514,7 @@ class TestExitStack(unittest.TestCase):
def __exit__(self, *exc_details):
raise self.exc
class RaiseExcWithContext(object):
class RaiseExcWithContext:
def __init__(self, outer, inner):
self.outer = outer
self.inner = inner
@ -502,11 +526,11 @@ class TestExitStack(unittest.TestCase):
except:
raise self.outer
class SuppressExc(object):
class SuppressExc:
def __enter__(self):
return self
def __exit__(self, *exc_details):
type(self).saved_details = exc_details
self.__class__.saved_details = exc_details
return True
try:
@ -531,15 +555,12 @@ class TestExitStack(unittest.TestCase):
def test_exit_exception_chaining(self):
# Ensure exception chaining matches the reference behaviour
# We still run this under Py2, but the only thing it actually
# checks in that case is that the outermost exception is IndexError
# and that the inner ValueError was suppressed
def raise_exc(exc):
raise exc
saved_details = []
saved_details = [None]
def suppress_exc(*exc_details):
saved_details[:] = [exc_details]
saved_details[0] = exc_details
return True
try:
@ -591,6 +612,69 @@ class TestExitStack(unittest.TestCase):
else:
self.fail("Expected KeyError, but no exception was raised")
def test_exit_exception_with_correct_context(self):
# http://bugs.python.org/issue20317
@contextmanager
def gets_the_context_right(exc):
try:
yield
finally:
raise exc
exc1 = Exception(1)
exc2 = Exception(2)
exc3 = Exception(3)
exc4 = Exception(4)
# The contextmanager already fixes the context, so prior to the
# fix, ExitStack would try to fix it *again* and get into an
# infinite self-referential loop
try:
with ExitStack() as stack:
stack.enter_context(gets_the_context_right(exc4))
stack.enter_context(gets_the_context_right(exc3))
stack.enter_context(gets_the_context_right(exc2))
raise exc1
except Exception as exc:
self.assertIs(exc, exc4)
if check_exception_chaining:
self.assertIs(exc.__context__, exc3)
self.assertIs(exc.__context__.__context__, exc2)
self.assertIs(exc.__context__.__context__.__context__, exc1)
self.assertIsNone(
exc.__context__.__context__.__context__.__context__)
def test_exit_exception_with_existing_context(self):
# Addresses a lack of test coverage discovered after checking in a
# fix for issue 20317 that still contained debugging code.
def raise_nested(inner_exc, outer_exc):
try:
raise inner_exc
finally:
raise outer_exc
exc1 = Exception(1)
exc2 = Exception(2)
exc3 = Exception(3)
exc4 = Exception(4)
exc5 = Exception(5)
try:
with ExitStack() as stack:
stack.callback(raise_nested, exc4, exc5)
stack.callback(raise_nested, exc2, exc3)
raise exc1
except Exception as exc:
self.assertIs(exc, exc5)
if check_exception_chaining:
self.assertIs(exc.__context__, exc4)
self.assertIs(exc.__context__.__context__, exc3)
self.assertIs(exc.__context__.__context__.__context__, exc2)
self.assertIs(
exc.__context__.__context__.__context__.__context__, exc1)
self.assertIsNone(
exc.__context__.__context__.__context__.__context__.__context__)
def test_body_exception_suppress(self):
def suppress_exc(*exc_details):
return True
@ -622,51 +706,79 @@ class TestExitStack(unittest.TestCase):
stack.push(cm)
self.assertIs(stack._exit_callbacks[-1], cm)
class TestRedirectStdout(unittest.TestCase):
class TestRedirectStream:
redirect_stream = None
orig_stream = None
@requires_docstrings
def test_instance_docs(self):
# Issue 19330: ensure context manager instances have good docstrings
cm_docstring = redirect_stdout.__doc__
obj = redirect_stdout(None)
cm_docstring = self.redirect_stream.__doc__
obj = self.redirect_stream(None)
self.assertEqual(obj.__doc__, cm_docstring)
def test_no_redirect_in_init(self):
orig_stdout = getattr(sys, self.orig_stream)
self.redirect_stream(None)
self.assertIs(getattr(sys, self.orig_stream), orig_stdout)
def test_redirect_to_string_io(self):
f = StrIO()
f = io.StringIO()
msg = "Consider an API like help(), which prints directly to stdout"
with redirect_stdout(f):
print(msg)
orig_stdout = getattr(sys, self.orig_stream)
with self.redirect_stream(f):
print(msg, file=getattr(sys, self.orig_stream))
self.assertIs(getattr(sys, self.orig_stream), orig_stdout)
s = f.getvalue().strip()
self.assertEqual(s, msg)
def test_enter_result_is_target(self):
f = StrIO()
with redirect_stdout(f) as enter_result:
f = io.StringIO()
with self.redirect_stream(f) as enter_result:
self.assertIs(enter_result, f)
def test_cm_is_reusable(self):
f = StrIO()
write_to_f = redirect_stdout(f)
f = io.StringIO()
write_to_f = self.redirect_stream(f)
orig_stdout = getattr(sys, self.orig_stream)
with write_to_f:
print("Hello")
print("Hello", end=" ", file=getattr(sys, self.orig_stream))
with write_to_f:
print("World!")
print("World!", file=getattr(sys, self.orig_stream))
self.assertIs(getattr(sys, self.orig_stream), orig_stdout)
s = f.getvalue()
self.assertEqual(s, "Hello\nWorld!\n")
self.assertEqual(s, "Hello World!\n")
# If this is ever made reentrant, update the reusable-but-not-reentrant
# example at the end of the contextlib docs accordingly.
def test_nested_reentry_fails(self):
f = StrIO()
write_to_f = redirect_stdout(f)
with check_exception_details(self, RuntimeError, "Cannot reenter"):
def test_cm_is_reentrant(self):
f = io.StringIO()
write_to_f = self.redirect_stream(f)
orig_stdout = getattr(sys, self.orig_stream)
with write_to_f:
print("Hello", end=" ", file=getattr(sys, self.orig_stream))
with write_to_f:
print("Hello")
with write_to_f:
print("World!")
print("World!", file=getattr(sys, self.orig_stream))
self.assertIs(getattr(sys, self.orig_stream), orig_stdout)
s = f.getvalue()
self.assertEqual(s, "Hello World!\n")
class TestRedirectStdout(TestRedirectStream, unittest.TestCase):
redirect_stream = redirect_stdout
orig_stream = "stdout"
class TestRedirectStderr(TestRedirectStream, unittest.TestCase):
redirect_stream = redirect_stderr
orig_stream = "stderr"
class TestSuppress(unittest.TestCase):
@requires_docstrings
def test_instance_docs(self):
# Issue 19330: ensure context manager instances have good docstrings
cm_docstring = suppress.__doc__
@ -712,131 +824,12 @@ class TestSuppress(unittest.TestCase):
with ignore_exceptions:
len(5)
with ignore_exceptions:
1/0
with ignore_exceptions: # Check nested usage
len(5)
class TestContextStack(unittest.TestCase):
def test_no_resources(self):
with ContextStack():
pass
def test_register(self):
expected = [
((), {}),
((1,), {}),
((1,2), {}),
((), dict(example=1)),
((1,), dict(example=1)),
((1,2), dict(example=1)),
]
result = []
def _exit(*args, **kwds):
"""Test metadata propagation"""
result.append((args, kwds))
with ContextStack() as stack:
for args, kwds in reversed(expected):
if args and kwds:
f = stack.register(_exit, *args, **kwds)
elif args:
f = stack.register(_exit, *args)
elif kwds:
f = stack.register(_exit, **kwds)
else:
f = stack.register(_exit)
self.assertIs(f, _exit)
for wrapper in stack._exit_callbacks:
self.assertIs(wrapper.__wrapped__, _exit)
self.assertNotEqual(wrapper.__name__, _exit.__name__)
self.assertIsNone(wrapper.__doc__, _exit.__doc__)
self.assertEqual(result, expected)
def test_register_exit(self):
exc_raised = ZeroDivisionError
def _expect_exc(exc_type, exc, exc_tb):
self.assertIs(exc_type, exc_raised)
def _suppress_exc(*exc_details):
return True
def _expect_ok(exc_type, exc, exc_tb):
self.assertIsNone(exc_type)
self.assertIsNone(exc)
self.assertIsNone(exc_tb)
class ExitCM(object):
def __init__(self, check_exc):
self.check_exc = check_exc
def __enter__(self):
self.fail("Should not be called!")
def __exit__(self, *exc_details):
self.check_exc(*exc_details)
with ContextStack() as stack:
stack.register_exit(_expect_ok)
self.assertIs(stack._exit_callbacks[-1], _expect_ok)
cm = ExitCM(_expect_ok)
stack.register_exit(cm)
self.assertIs(stack._exit_callbacks[-1].__self__, cm)
stack.register_exit(_suppress_exc)
self.assertIs(stack._exit_callbacks[-1], _suppress_exc)
cm = ExitCM(_expect_exc)
stack.register_exit(cm)
self.assertIs(stack._exit_callbacks[-1].__self__, cm)
stack.register_exit(_expect_exc)
self.assertIs(stack._exit_callbacks[-1], _expect_exc)
stack.register_exit(_expect_exc)
self.assertIs(stack._exit_callbacks[-1], _expect_exc)
outer_continued = True
1/0
def test_enter_context(self):
class TestCM(object):
def __enter__(self):
result.append(1)
def __exit__(self, *exc_details):
result.append(3)
result = []
cm = TestCM()
with ContextStack() as stack:
@stack.register # Registered first => cleaned up last
def _exit():
result.append(4)
self.assertIsNotNone(_exit)
stack.enter_context(cm)
self.assertIs(stack._exit_callbacks[-1].__self__, cm)
result.append(2)
self.assertEqual(result, [1, 2, 3, 4])
def test_close(self):
result = []
with ContextStack() as stack:
@stack.register
def _exit():
result.append(1)
self.assertIsNotNone(_exit)
stack.close()
result.append(2)
self.assertEqual(result, [1, 2])
def test_preserve(self):
result = []
with ContextStack() as stack:
@stack.register
def _exit():
result.append(3)
self.assertIsNotNone(_exit)
new_stack = stack.preserve()
result.append(1)
result.append(2)
new_stack.close()
self.assertEqual(result, [1, 2, 3])
def test_instance_bypass(self):
class Example(object): pass
cm = Example()
cm.__exit__ = object()
stack = ContextStack()
self.assertRaises(AttributeError, stack.enter_context, cm)
stack.register_exit(cm)
self.assertIs(stack._exit_callbacks[-1], cm)
self.assertTrue(outer_continued)
if __name__ == "__main__":
import unittest
unittest.main()