blob: c4fab1bbfa86333df429016c3fb96d4c3aeaf794 [file] [log] [blame]
"""Utilities for with-statement contexts. See PEP 343."""
import sys
__all__ = ["contextmanager", "nested", "closing"]
class GeneratorContextManager(object):
"""Helper for @contextmanager decorator."""
def __init__(self, gen):
self.gen = gen
def __enter__(self):
try:
return self.gen.next()
except StopIteration:
raise RuntimeError("generator didn't yield")
def __exit__(self, type, value, traceback):
if type is None:
try:
self.gen.next()
except StopIteration:
return
else:
raise RuntimeError("generator didn't stop")
else:
if value is None:
# Need to force instantiation so we can reliably
# tell if we get the same exception back
value = type()
try:
self.gen.throw(type, value, traceback)
raise RuntimeError("generator didn't stop after throw()")
except StopIteration, exc:
# Suppress the exception *unless* it's the same exception that
# was passed to throw(). This prevents a StopIteration
# raised inside the "with" statement from being suppressed
return exc is not value
except:
# only re-raise if it's *not* the exception that was
# passed to throw(), because __exit__() must not raise
# an exception unless __exit__() itself failed. But throw()
# has to raise the exception to signal propagation, so this
# fixes the impedance mismatch between the throw() protocol
# and the __exit__() protocol.
#
if sys.exc_info()[1] is not value:
raise
def contextmanager(func):
"""@contextmanager decorator.
Typical usage:
@contextmanager
def some_generator(<arguments>):
<setup>
try:
yield <value>
finally:
<cleanup>
This makes this:
with some_generator(<arguments>) as <variable>:
<body>
equivalent to this:
<setup>
try:
<variable> = <value>
<body>
finally:
<cleanup>
"""
def helper(*args, **kwds):
return GeneratorContextManager(func(*args, **kwds))
try:
helper.__name__ = func.__name__
helper.__doc__ = func.__doc__
helper.__dict__ = func.__dict__
except:
pass
return helper
@contextmanager
def nested(*managers):
"""Support multiple context managers in a single with-statement.
Code like this:
with nested(A, B, C) as (X, Y, Z):
<body>
is equivalent to this:
with A as X:
with B as Y:
with C as Z:
<body>
"""
exits = []
vars = []
exc = (None, None, None)
try:
try:
for mgr in managers:
exit = mgr.__exit__
enter = mgr.__enter__
vars.append(enter())
exits.append(exit)
yield vars
except:
exc = sys.exc_info()
finally:
while exits:
exit = exits.pop()
try:
if exit(*exc):
exc = (None, None, None)
except:
exc = sys.exc_info()
if exc != (None, None, None):
# Don't rely on sys.exc_info() still containing
# the right information. Another exception may
# have been raised and caught by an exit method
raise exc[0], exc[1], exc[2]
class closing(object):
"""Context to automatically close something at the end of a block.
Code like this:
with closing(<module>.open(<arguments>)) as f:
<block>
is equivalent to this:
f = <module>.open(<arguments>)
try:
<block>
finally:
f.close()
"""
def __init__(self, thing):
self.thing = thing
def __enter__(self):
return self.thing
def __exit__(self, *exc_info):
self.thing.close()