contextlib2/contextlib2.py

227 lines
7.2 KiB
Python
Raw Normal View History

2011-12-13 12:41:17 +00:00
"""contextlib2 - backports and enhancements to the contextlib module"""
import sys
from collections import deque
from functools import wraps
__all__ = ["contextmanager", "closing", "ContextDecorator", "ContextStack"]
2011-12-13 12:41:17 +00:00
class ContextDecorator(object):
"A base class or mixin that enables context managers to work as decorators."
def refresh_cm(self):
"""Returns the context manager used to actually wrap the call to the
decorated function.
The default implementation just returns *self*.
Overriding this method allows otherwise one-shot context managers
like _GeneratorContextManager to support use as decorators via
implicit recreation.
"""
return self
def __call__(self, func):
@wraps(func)
def inner(*args, **kwds):
with self.refresh_cm():
return func(*args, **kwds)
return inner
class _GeneratorContextManager(ContextDecorator):
"""Helper for @contextmanager decorator."""
def __init__(self, func, *args, **kwds):
self.gen = func(*args, **kwds)
self.func, self.args, self.kwds = func, args, kwds
def refresh_cm(self):
# _GCM instances are one-shot context managers, so the
# CM must be recreated each time a decorated function is
# called
return self.__class__(self.func, *self.args, **self.kwds)
def __enter__(self):
try:
return next(self.gen)
except StopIteration:
raise RuntimeError("generator didn't yield")
def __exit__(self, type, value, traceback):
if type is None:
try:
next(self.gen)
except StopIteration:
return
else:
raise RuntimeError("generator didn't stop")
else:
if value is None:
# Need to force instantiation so we can reliably
# tell if we get the same exception back
value = type()
try:
self.gen.throw(type, value, traceback)
raise RuntimeError("generator didn't stop after throw()")
except StopIteration as exc:
# Suppress the exception *unless* it's the same exception that
# was passed to throw(). This prevents a StopIteration
# raised inside the "with" statement from being suppressed
return exc is not value
except:
# only re-raise if it's *not* the exception that was
# passed to throw(), because __exit__() must not raise
# an exception unless __exit__() itself failed. But throw()
# has to raise the exception to signal propagation, so this
# fixes the impedance mismatch between the throw() protocol
# and the __exit__() protocol.
#
if sys.exc_info()[1] is not value:
raise
def contextmanager(func):
"""@contextmanager decorator.
Typical usage:
@contextmanager
def some_generator(<arguments>):
<setup>
try:
yield <value>
finally:
<cleanup>
This makes this:
with some_generator(<arguments>) as <variable>:
<body>
equivalent to this:
<setup>
try:
<variable> = <value>
<body>
finally:
<cleanup>
"""
@wraps(func)
def helper(*args, **kwds):
return _GeneratorContextManager(func, *args, **kwds)
return helper
class closing(object):
"""Context to automatically close something at the end of a block.
Code like this:
with closing(<module>.open(<arguments>)) as f:
<block>
is equivalent to this:
f = <module>.open(<arguments>)
try:
<block>
finally:
f.close()
"""
def __init__(self, thing):
self.thing = thing
def __enter__(self):
return self.thing
def __exit__(self, *exc_info):
self.thing.close()
class ContextStack(object):
2011-12-13 12:41:17 +00:00
"""Context for programmatic management of resource cleanup
For example:
with ContextStack() as stack:
files = [stack.enter_context(fname) for fname in filenames]
2011-12-13 12:41:17 +00:00
# All opened files will automatically be closed at the end of
# the with statement, even if attempts to open files later
# in the list throw an exception
"""
def __init__(self):
self._callbacks = deque()
def register_exit(self, callback):
"""Registers a callback with the standard __exit__ method signature
2011-12-13 12:41:17 +00:00
Can suppress exceptions the same way __exit__ methods can.
2011-12-13 12:41:17 +00:00
"""
self._callbacks.append(callback)
return callback # Allow use as a decorator
2011-12-13 12:41:17 +00:00
def register(self, callback, *args, **kwds):
"""Registers an arbitrary callback and arguments.
Cannot suppress exceptions.
"""
2011-12-13 12:41:17 +00:00
def _wrapper(exc_type, exc, tb):
callback(*args, **kwds)
2011-12-13 12:41:17 +00:00
return self.register_exit(_wrapper)
def enter_context(self, cm):
"""Enters the supplied context manager
If successful, also registers its __exit__ method as a callback and
returns the result of the __enter__ method.
"""
2011-12-13 12:41:17 +00:00
# We look up the special methods on the type to match the with statement
_cm_type = type(cm)
_exit = _cm_type.__exit__
result = _cm_type.__enter__(cm)
def _exit_wrapper(*exc_details):
return _exit(cm, *exc_details)
self.register_exit(_exit_wrapper)
return result
def close(self):
"""Immediately unwind the context stack"""
2011-12-13 12:41:17 +00:00
self.__exit__(None, None, None)
def __enter__(self):
return self
def __exit__(self, *exc_details):
if not self._callbacks:
return
# This looks complicated, but it is really just
# setting up a chain of try-expect statements to ensure
# that outer callbacks still get invoked even if an
# inner one throws an exception
def _invoke_next_callback(exc_details):
# Callbacks are removed from the list in FIFO order
# but the recursion means they're invoked in LIFO order
2011-12-13 12:41:17 +00:00
cb = self._callbacks.popleft()
if not self._callbacks:
# Innermost callback is invoked directly
return cb(*exc_details)
# More callbacks left, so descend another level in the stack
try:
suppress_exc = _invoke_next_callback(exc_details)
except:
suppress_exc = cb(*sys.exc_info())
# Check if this cb suppressed the inner exception
if not suppress_exc:
raise
else:
# Check if inner cb suppressed the original exception
if suppress_exc:
exc_details = (None, None, None)
suppress_exc = cb(*exc_details) or suppress_exc
return suppress_exc
# Kick off the recursive chain
return _invoke_next_callback(exc_details)